先日書いた「PythonでたまったQueueをMulti Threadでさばいていく方法」のMulti Process版
ProcessPoolExecutorではプロセス間で通信できないようだったのでmultiprocessingを使って書きました
import time
from datetime import datetime
from multiprocessing import Process, JoinableQueue
def worker(queue):
while True:
_job = queue.get()
if _job is None:
queue.task_done()
break
print(datetime.now())
print(_job)
queue.task_done()
time.sleep(1)
def main():
_process = []
_queue = JoinableQueue()
_docs = ["a", "b", "c", "d", "e", "f", "g"]
for _i in range(2):
_p = Process(target=worker, args=(_queue,))
_p.start()
_process.append(_p)
for _d in _docs:
_queue.put(_d)
for _i in range(2):
_queue.put(None)
_queue.join()
if __name__ == "__main__":
main()
思っていたよりマルチスレッド版とは違いが出ました
ジョブの投入が終わってからNoneを入れてるのは、「Noneっていうジョブが来たら終了してね」っていうサインを送ったっていう意図です、Queueが空になったくらいじゃ返ってきてくれません
2017-12-20 09:12:35.840879
a
2017-12-20 09:12:35.841184
b
2017-12-20 09:12:36.842196
c
2017-12-20 09:12:36.842427
d
2017-12-20 09:12:37.843443
e
2017-12-20 09:12:37.843636
f
2017-12-20 09:12:38.844696
g
Queueにはいったジョブを指定したThread数で順番にさばいていきたいよ、っていうときに使います
あんまり大量にQueueにいれるとメモリからあふれて落ちるんだろうな
import threading
import queue
import time
from datetime import datetime
QUEUE = queue.Queue()
def worker():
while True:
_docs = QUEUE.get()
job_worker(_docs)
QUEUE.task_done()
def job_worker(docs):
print(datetime.now())
print(docs)
time.sleep(1)
return None
def main():
_threads = []
for _i in range(2):
_t = threading.Thread(target=worker)
_t.daemon = True
_t.start()
_threads.append(_t)
_docs = ["a", "b", "c", "d", "e", "f", "g"]
for _doc in _docs:
QUEUE.put(_doc)
QUEUE.join()
if __name__ == "__main__":
main()
実行するとこんな出力になります、threadがふたつなのでふたつずつ処理されてますね
2017-12-17 17:55:49.769058
a
2017-12-17 17:55:49.769201
b
2017-12-17 17:55:50.770286
c
2017-12-17 17:55:50.770461
d
2017-12-17 17:55:51.771527
e
2017-12-17 17:55:51.771687
f
2017-12-17 17:55:52.772768
g
Python3.6で書いたサンプルです
全Tweetの1%と言われるPublic streamsのsampleを受信してみます。
OAuthやらStreamやら、どう実装するかなーと調べてみるとRequests: 人間のためのHTTP があれば十分なことがわかったし、親切なサンプルコードも書かれてたのであっさり実装できてしまった。
OAuth 1 Authentication
Streaming Requests
import json
import requests
from requests_oauthlib import OAuth1
def main():
_auth = OAuth1(
API_KEY,
API_SECRET,
ACCESS_TOKEN,
ACCESS_SECRET
)
_stream = requests.get(
"https://stream.twitter.com/1.1/statuses/sample.json?language=ja",
auth=_auth,
stream=True,
)
for _line in _stream.iter_lines():
_doc = json.loads(_line.decode("utf-8"))
print(json.dumps(_doc, ensure_ascii=False, indent=2))
if __name__ == '__main__':
main()
日本語のツイートだけ取得したかったのでlanguage=jaを追加しています。Requests便利すぎ。
久々に新しくec2に私用でインスタンスたてたのでこれまでずっとAmazon Linux使ってたけどCentOS7にしてみた。ついでに先日リリースされたPython3.6.0をインストールしてみました。
cd /usr/local/src/
sudo wget https://www.python.org/ftp/python/3.6.0/Python-3.6.0.tgz
sudo tar zxfv Python-3.6.0.tgz
sudo chown -R centos. Python-3.6.0
cd Python-3.6.0
./configure --enable-shared --enable-optimizations
sudo make
sudo make install
「chownはなんのため?」と思われるでしょうが、tarボールを解凍したらパーミッションが501.gamesになっていて触れなかったのでcentosユーザーに変えました。なんでこうなったのかは知らんのですが、同じホストでPython3.5.2のソースコードをDLして試したらパーミッションはcentos.centosだったので、tarした環境の問題なんじゃないかと。
makeとmake installで30分以上待ちました。テストですごい時間かかってた。t2.microだったからなんだろうけど、3.5のころはここまで時間かかんなかった気がするんだよなぁ。
で、このまま起動しようとするとライブラリがみえないっていって怒られるので
sudo vi /etc/ld.so.conf
「/usr/local/lib」を追記
sudo /sbin/ldconfig
$ /usr/local/bin/python3
Python 3.6.0 (default, Dec 31 2016, 00:18:38)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux
Type "help", "copyright", "credits" or "license" for more information.
無事起動しました。
デフォルトのpythonを置き換えることはしなくて、併存です。どうせvenv使うんだし、それで十分です。最近はpyenv使う人が多いのかもしれないけど、標準でインストールされるvenvで十分なのであれこれインストールしたりはしません。
最近GCPが話題になっていて、AWSよりずっと安いっていう話も聞くんですけど、別にAWSにそんなに不満もないのでまだしばらくはAWS使おうかなと。
PythonでジョブキューするといえばやっぱりCeleryがスタンダードなんだろうか。RQというシンプルなジョブキューライブラリもあるんですけど、今回はCeleryを動かしてみる。
インストールは簡単。
pip install Celery
バックエンドはRabbitMQかRedisで使うのが良さそうです。手元の環境にRedisが入っていたのでそのままRedisを使ってみます。
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def task(message):
return "Hello {0}".format(message)
上記をjob.pyとして保存しておきます。で、このjob.pyをワーカーとして起動。このワーカーがデキューしてくれます。
$ celery -A job worker --loglevel=info
いろいろログを出力して起動してくれます。ワーカーの起動はいろいろオプションがあります。デーモンとして動かしたり並列で大量に立ち上げたり。非常に便利。
で、エンキューはもうインタラクティブシェルでもできます。
$ python
Python 3.4.2 (default, Oct 16 2014, 01:53:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from job import task
>>> task.delay("World.")
エンキューもこれだけ。
で、エンキューするとさっき立ち上げたワーカーの方でデキューしてジョブを捌いてくれます。
succeeded in 0.0014186919997882796s: 'Hello World.'
こんな簡単にジョブキューの仕組みが作れるなんて。
Python3っていうか、3.3以降です。
from concurrent.futures import ProcessPoolExecutor
import time
def work(job):
time.sleep(job)
print(job)
def main():
_jobs = [5, 1, 2]
_e = ProcessPoolExecutor(max_workers=2)
for _j in _jobs:
_e.submit(work, _j)
if __name__ == '__main__':
main()
簡単なサンプルなんですけど、ワーカー2つ作って並列にworkというメソッドを実行してくれます。
実行すると
$ python sample.py
1
2
5
こんな感じに出力されます。5,1,2って順番で処理投げ込んでるのに1,2,5の順番で終わってます。
Pythonでmultiprocessingを使って並列処理 で書いたサンプルも簡単でしたけど、ProcessPoolExecutor使うともっと簡単です。
ProcessPoolExecutor使って4並列で走るツール書いて実行したらこんな感じになりました。

これはいい。
ApachetとかNginxのログは圧縮して保存してケースが多いんですけど、それを読むときにいちいち解凍してるのもめんどくさいのでgzipのまんま読んでみるサンプル。
import gzip
def main():
_f = gzip.open('/path/to/logfile.gz', 'rb')
_line = _f.readline()
while _line:
_text = _line.decode('utf-8')
print(_text)
_line = _f.readline()
_f.close()
if __name__ = '__main__':
main()
ファイルを読み込むだけだと文字列はなくてバイト配列なのでそれをデコードしてます。テキストファイルを読み込む時となんら変わりはないのですが、デコードが必要なのでそれを忘れるとあれ?ってなってしまうので要注意。
最初.split(‘\t’)ができなくてあれ?ってなりました。
Python3.4で動かしたコードですけど、2.7でも動くはずです。
2015.01.28 追記
_f = gzip.open('/path/to/logfile.gz', 'rt')
_line = _f.readline()
while _line:
print(_line)
_line = _f.readline()
ファイル開くときにテキストモードで開けばもっとスマートです。@yosida95 に教えてもらいました。感謝!
テキストの類似度って言っても出現する単語の回数比較でしか無いので文意解釈はしてないです。あくまで「出現した文字の一致度」ですね。
Word2Vecとかを使ってテキストを拡張してあげれば少しは文意を加味した類似度といえるのかもです。
from scipy.spatial.distance import cosine
import unittest
class SentenceSimilarity(object):
def __init__(self):
self._A = None
self._B = None
@property
def A(self):
return self._A
@property
def B(self):
return self._B
@A.setter
def A(self, v):
self._A = [i for i in v.replace(' ', ',').split(',') if len(i) > 0]
@B.setter
def B(self, v):
self._B = [i for i in v.replace(' ', ',').split(',') if len(i) > 0]
def distance(self):
if self._A is None or self._B is None:
return False
if len(self._A) < 2 or len(self._B) < 2:
return False
return self._distance()
def _distance(self):
_words = []
_words.extend(self._A)
_words.extend(self._B)
_words = list(set(_words))
_words.sort()
_listA = [self._A.count(_w) for _w in _words]
_listB = [self._B.count(_w) for _w in _words]
try:
return 1 - cosine(_listA, _listB)
except:
return False
class TestSentenceSimilarity(unittest.TestCase):
def setUp(self):
pass
def test_0(self):
_sentence = SentenceSimilarity()
_sentence.A = '今期 業績 予想 未定 期限切れ 肉 問題 販売減'
_sentence.B = '都市 対抗 野球 西濃運輸 初優勝 佐伯 富士 重工'
self.assertGreater(_sentence.distance(), 0.0)
def test_1(self):
_sentence = SentenceSimilarity()
_sentence.A = '今期 業績 予想 未定 期限切れ 肉 問題 販売減'
self.assertEqual(_sentence.distance(), False)
def test_2(self):
_sentence = SentenceSimilarity()
_sentence.A = '今期'
_sentence.B = '都市'
self.assertEqual(_sentence.distance(), False)
def test_3(self):
_sentence = SentenceSimilarity()
_sentence.A = '今期,業績,予想,未定'
_sentence.B = '都市,業績,予想,未定'
_sentence.B = '業績 対抗 野球 西濃運輸 初優勝'
self.assertGreater(_sentence.distance(), 0.0)
if __name__ == '__main__':
unittest.main()
使い方はテストをみていただければご理解いただけるかと。
コサイン類似度の計算なんて
return 1 - cosine(_listA, _listB)
の部分でしかやってなくて、残りは全部下ごしらえです。
_sentence = SentenceSimilarity()
_sentence.A = ‘今期 業績 予想 未定 期限切れ 肉 問題 販売減’
_sentence.B = ‘都市 対抗 野球 西濃運輸 初優勝 佐伯 富士 重工’
インスタンス作って、AとB(命名が安直ですいません)にカンマ切り、もしくはスペース切りの文字列を指定します。なので事前に分かち書きをしておく必要があります。
まあ文章を渡してメソッドの中で分かち書きしてもいいんですけどね。手元にあったのが分かち書き済みだったので今回はそのまま使いました。
_sentence.distance()
で、類似度を求めます。類似度を求める前に_Aと_Bの和集合を作って、和集合の単語リストにある単語が_Aと_Bそれぞれで幾つずつ出現するかを求めます。
上記例だと和集合は[今期,業績,予想,未定,期限切れ,肉,問題,販売減,都市,対抗,野球,西濃運輸,初優勝,佐伯,富士,重工]となり、
_listAは[1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0]
_listBは[0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1]
になるはずです。で、これらをベクトルにしてコサイン類似度を求めます。今は0と1だけですが複数回出現する単語があれば当然2以上の値も出てきます。
コサイン類似度は0から1の値で返ってきて、値が大きければ大きいほど類似しています。上記サンプルだと0が返ってくるはずです。ぜんぜん違う、ってことですね。
アイテムベースのレコメンドに使えると思います。サービスとして提供するレコメンドエンジンであればこんな単純な計算じゃないでしょうね。
Python 3.4.2, scipy 0.14.0 で書きました。
使ったのはMovieLens 100k。
「100,000 ratings from 1000 users on 1700 movies.」
だそうです。
from sklearn.cluster import KMeans
def main():
_items = []
_f = open('./rating.csv' )
_lines = _f.readlines()
for _line in _lines:
_items.append(_line.split(','))
_f.close()
km = KMeans(init='k-means++')
km.fit(_items)
print(km.labels_)
if __name__ == '__main__':
main()
すげえ簡単。。。
CSVは1行が1人分で全部の映画のレーティング情報(0が観てない、評価は1から5)が入っています。1000ユーザーが1700本の映画を評価(未鑑賞含む)してます。
だいたいこんな感じ。
5,3,4,3,3,5,4,1,5,3,2,5,5,5,5,5,3,4,,,,,,,,
4,0,0,0,0,0,0,0,0,2,0,0,4,4,0,0,0,0,,,,,,,,
得られる結果は
[0 1 4 4 3 2 6 3 4 6 3 7 6 2 1 5 1 6 4 7 1 3 3 7,,,,,,,
インデックス2番のユーザーさんと3番のユーザーさんは好きな映画が似る傾向にありそうですね。
Python 3.4.2、scikit-learn 0.15.2 で試しました。
Clustering text documents using k-means (K平均法を使ってテキスト文書をクラスタリングする)というそのまんまのサンプルがあったので写経して最低限だけ削りだしてみた。
K平均法とかTF-IDFとか潜在意味解析の説明は割愛。
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sklearn.decomposition import TruncatedSVD
from sklearn.preprocessing import Normalizer
def main():
_items = [
'わたし まけ まし た わ',
'わたし まけ まし た わ',
'わたし まけ まし た わ',
'わたし まけ まし た わ',
'となり の きゃく は よく かき くう きゃく だ',
'にわ には にわ なかにわ には にわ にわとり が いる',
'バカ と テスト と 召喚獣',
'俺 の 妹 が こんな に 可愛い わけ が ない'
]
vectorizer = TfidfVectorizer(
use_idf=True
)
X = vectorizer.fit_transform(_items)
lsa = TruncatedSVD(10)
X = lsa.fit_transform(X)
X = Normalizer(copy=False).fit_transform(X)
km = KMeans(
init='k-means++',
)
km.fit(X)
print(km.labels_)
if __name__ == '__main__':
main()
テキストは分かち書きされてる状態(空白区切りになってます)でドキュメント単位でリストにいれておく。これを元データとしてTfidfVectorizerに突っ込みます。ここでドキュメントがベクトル化されます。
で、潜在意味解析を通した上でK平均法でクラスタリング。至って単純。
km.labels_を出力すると
[1 1 1 1 0 2 4 3]
こんな感じで出力されるのですが、これは「データセットのインデックス0,1,2,3は1番クラスタ、インデックス4は0番クラスタ、、、」ということです。これを使って元データをグルーピングしておきます。
すっごい簡単。使い方はこれであってる、と思う。たぶん。あ、ドキュメント数は少なくても1,000以上ないときれいにクラスタリングされなそうだし、テキストももっと長くないと納得のいく結果は出力されにくいんじゃないかと。
自分のツイートを使ったり青空文庫を使ったりで気軽に試せそうです。
Python 3.4.2、scikit-learn==0.15.2 で動かしました。
昨日Mahoutの記事書いたばっかりなのにね。