PythonでジョブをQueueにいれつつMulti Processでさばいていく方法

2017-12-20
このエントリーをはてなブックマークに追加

先日書いた「PythonでたまったQueueをMulti Threadでさばいていく方法」のMulti Process版

ProcessPoolExecutorではプロセス間で通信できないようだったのでmultiprocessingを使って書きました

import time
from datetime import datetime
from multiprocessing import Process, JoinableQueue


def worker(queue):
    while True:
        _job = queue.get()
        if _job is None:
            queue.task_done()
            break

        print(datetime.now())
        print(_job)
        queue.task_done()
        time.sleep(1)

def main():
    _process = []
    _queue = JoinableQueue()
    _docs = ["a", "b", "c", "d", "e", "f", "g"]
    for _i in range(2):
        _p = Process(target=worker, args=(_queue,))
        _p.start()
        _process.append(_p)

    for _d in _docs:
        _queue.put(_d)

    for _i in range(2):
        _queue.put(None)

    _queue.join()

if __name__ == "__main__":
    main()

思っていたよりマルチスレッド版とは違いが出ました

ジョブの投入が終わってからNoneを入れてるのは、「Noneっていうジョブが来たら終了してね」っていうサインを送ったっていう意図です、Queueが空になったくらいじゃ返ってきてくれません

2017-12-20 09:12:35.840879
a
2017-12-20 09:12:35.841184
b
2017-12-20 09:12:36.842196
c
2017-12-20 09:12:36.842427
d
2017-12-20 09:12:37.843443
e
2017-12-20 09:12:37.843636
f
2017-12-20 09:12:38.844696
g

PythonでたまったQueueをMulti Threadでさばいていく方法

2017-12-18
このエントリーをはてなブックマークに追加

Queueにはいったジョブを指定したThread数で順番にさばいていきたいよ、っていうときに使います
あんまり大量にQueueにいれるとメモリからあふれて落ちるんだろうな

import threading
import queue
import time
from datetime import datetime

QUEUE = queue.Queue()


def worker():
    while True:
        _docs = QUEUE.get()
        job_worker(_docs)
        QUEUE.task_done()

def job_worker(docs):
    print(datetime.now())
    print(docs)
    time.sleep(1)
    return None

def main():

    _threads = []
    for _i in range(2):
        _t = threading.Thread(target=worker)
        _t.daemon = True
        _t.start()
        _threads.append(_t)

    _docs = ["a", "b", "c", "d", "e", "f", "g"]
    for _doc in _docs:
        QUEUE.put(_doc)

    QUEUE.join()

if __name__ == "__main__":
    main()

実行するとこんな出力になります、threadがふたつなのでふたつずつ処理されてますね

2017-12-17 17:55:49.769058
a
2017-12-17 17:55:49.769201
b
2017-12-17 17:55:50.770286
c
2017-12-17 17:55:50.770461
d
2017-12-17 17:55:51.771527
e
2017-12-17 17:55:51.771687
f
2017-12-17 17:55:52.772768
g

Python3.6で書いたサンプルです

PythonのSciPyで値の標準化

2017-01-15
このエントリーをはてなブックマークに追加

「平均が0、分散が1」ってやつです。
平均引いて標準偏差で割ればいいので大した計算量ではないのですが、SciPyを使うと関数一発。便利すぎる。

import numpy as np
from scipy.stats import zscore


def main():
    _array = np.array([1, 2, 3, 4, 5])
    _n = zscore(_array)

    print(_n)
    print(np.mean(_n))
    print(np.var(_n))


if __name__ == "__main__":
    main()
$ python standardization.py
[-1.41421356 -0.70710678  0.          0.70710678  1.41421356]
0.0
1.0

PythonでTwitterのStreaming APIを受信する

2017-01-05
このエントリーをはてなブックマークに追加

全Tweetの1%と言われるPublic streamsのsampleを受信してみます。

OAuthやらStreamやら、どう実装するかなーと調べてみるとRequests: 人間のためのHTTP があれば十分なことがわかったし、親切なサンプルコードも書かれてたのであっさり実装できてしまった。

OAuth 1 Authentication

Streaming Requests

import json

import requests
from requests_oauthlib import OAuth1


def main():
    _auth = OAuth1(
        API_KEY,
        API_SECRET,
        ACCESS_TOKEN,
        ACCESS_SECRET
    )

    _stream = requests.get(
        "https://stream.twitter.com/1.1/statuses/sample.json?language=ja",
        auth=_auth,
        stream=True,
    )

    for _line in _stream.iter_lines():

        _doc = json.loads(_line.decode("utf-8"))
        print(json.dumps(_doc, ensure_ascii=False, indent=2))


if __name__ == '__main__':
    main()

日本語のツイートだけ取得したかったのでlanguage=jaを追加しています。Requests便利すぎ。

CentOS7にPython3.6.0をソースからインストールした

2016-12-31
このエントリーをはてなブックマークに追加

久々に新しくec2に私用でインスタンスたてたのでこれまでずっとAmazon Linux使ってたけどCentOS7にしてみた。ついでに先日リリースされたPython3.6.0をインストールしてみました。

cd /usr/local/src/
sudo wget https://www.python.org/ftp/python/3.6.0/Python-3.6.0.tgz
sudo tar zxfv Python-3.6.0.tgz
sudo chown -R centos. Python-3.6.0
cd Python-3.6.0
./configure --enable-shared --enable-optimizations
sudo make
sudo make install

「chownはなんのため?」と思われるでしょうが、tarボールを解凍したらパーミッションが501.gamesになっていて触れなかったのでcentosユーザーに変えました。なんでこうなったのかは知らんのですが、同じホストでPython3.5.2のソースコードをDLして試したらパーミッションはcentos.centosだったので、tarした環境の問題なんじゃないかと。

makeとmake installで30分以上待ちました。テストですごい時間かかってた。t2.microだったからなんだろうけど、3.5のころはここまで時間かかんなかった気がするんだよなぁ。

で、このまま起動しようとするとライブラリがみえないっていって怒られるので

sudo vi /etc/ld.so.conf

「/usr/local/lib」を追記

sudo /sbin/ldconfig
$ /usr/local/bin/python3
Python 3.6.0 (default, Dec 31 2016, 00:18:38)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux
Type "help", "copyright", "credits" or "license" for more information.

無事起動しました。

デフォルトのpythonを置き換えることはしなくて、併存です。どうせvenv使うんだし、それで十分です。最近はpyenv使う人が多いのかもしれないけど、標準でインストールされるvenvで十分なのであれこれインストールしたりはしません。

最近GCPが話題になっていて、AWSよりずっと安いっていう話も聞くんですけど、別にAWSにそんなに不満もないのでまだしばらくはAWS使おうかなと。

Pythonでitertoolsを使ってパラメータの総当り

2016-06-28
このエントリーをはてなブックマークに追加
import itertools

_x = ['0', '1', '2']
_y = ['a', 'b', 'c']

for _i, _j in itertools.product(_x, _y):
	print(_i, _j)

パラメータAには複数のパターン、パラメータBにも複数のパターン。それらの総当りパターンを出力したいっていう時に便利。

0 a
0 b
0 c
1 a
1 b
1 c
2 a
2 b
2 c
Tags:

Pythonでジョブキューしてみる

2015-08-02
このエントリーをはてなブックマークに追加

PythonでジョブキューするといえばやっぱりCeleryがスタンダードなんだろうか。RQというシンプルなジョブキューライブラリもあるんですけど、今回はCeleryを動かしてみる。

インストールは簡単。

pip install Celery

バックエンドはRabbitMQかRedisで使うのが良さそうです。手元の環境にRedisが入っていたのでそのままRedisを使ってみます。

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def task(message):
    return "Hello {0}".format(message)

上記をjob.pyとして保存しておきます。で、このjob.pyをワーカーとして起動。このワーカーがデキューしてくれます。

$ celery -A job worker --loglevel=info

いろいろログを出力して起動してくれます。ワーカーの起動はいろいろオプションがあります。デーモンとして動かしたり並列で大量に立ち上げたり。非常に便利。

で、エンキューはもうインタラクティブシェルでもできます。

$ python
Python 3.4.2 (default, Oct 16 2014, 01:53:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from job import task
>>> task.delay("World.")

エンキューもこれだけ。
で、エンキューするとさっき立ち上げたワーカーの方でデキューしてジョブを捌いてくれます。

succeeded in 0.0014186919997882796s: 'Hello World.'

こんな簡単にジョブキューの仕組みが作れるなんて。

Python3で並列処理してみた

2015-01-28
このエントリーをはてなブックマークに追加

Python3っていうか、3.3以降です。

from concurrent.futures import ProcessPoolExecutor
import time


def work(job):
	time.sleep(job)
	print(job)

def main():
	_jobs = [5, 1, 2]
	_e = ProcessPoolExecutor(max_workers=2)
	for _j in _jobs:
		_e.submit(work, _j)

if __name__ == '__main__':
	main()

簡単なサンプルなんですけど、ワーカー2つ作って並列にworkというメソッドを実行してくれます。
実行すると

$ python sample.py
1
2
5

こんな感じに出力されます。5,1,2って順番で処理投げ込んでるのに1,2,5の順番で終わってます。

Pythonでmultiprocessingを使って並列処理 で書いたサンプルも簡単でしたけど、ProcessPoolExecutor使うともっと簡単です。

ProcessPoolExecutor使って4並列で走るツール書いて実行したらこんな感じになりました。
ProcessPoolExecutor
これはいい。

Python3でGZipされたログファイルを読み込む

2014-12-25
このエントリーをはてなブックマークに追加

ApachetとかNginxのログは圧縮して保存してケースが多いんですけど、それを読むときにいちいち解凍してるのもめんどくさいのでgzipのまんま読んでみるサンプル。

import gzip


def main():

    _f = gzip.open('/path/to/logfile.gz', 'rb')
    _line = _f.readline()

    while _line:
        _text = _line.decode('utf-8')
        print(_text)
        _line = _f.readline()

    _f.close()


if __name__ = '__main__':
    main()

ファイルを読み込むだけだと文字列はなくてバイト配列なのでそれをデコードしてます。テキストファイルを読み込む時となんら変わりはないのですが、デコードが必要なのでそれを忘れるとあれ?ってなってしまうので要注意。

最初.split(‘\t’)ができなくてあれ?ってなりました。

Python3.4で動かしたコードですけど、2.7でも動くはずです。

2015.01.28 追記

    _f = gzip.open('/path/to/logfile.gz', 'rt')
    _line = _f.readline()

    while _line:
        print(_line)
        _line = _f.readline()

ファイル開くときにテキストモードで開けばもっとスマートです。@yosida95 に教えてもらいました。感謝!

Pythonでコサイン類似度を使ってテキストの類似度を計算する

2014-11-12
このエントリーをはてなブックマークに追加

テキストの類似度って言っても出現する単語の回数比較でしか無いので文意解釈はしてないです。あくまで「出現した文字の一致度」ですね。

Word2Vecとかを使ってテキストを拡張してあげれば少しは文意を加味した類似度といえるのかもです。

from scipy.spatial.distance import cosine
import unittest


class SentenceSimilarity(object):

    def __init__(self):
        self._A = None
        self._B = None

    @property
    def A(self):
        return self._A

    @property
    def B(self):
        return self._B

    @A.setter
    def A(self, v):
        self._A = [i for i in v.replace(' ', ',').split(',') if len(i) > 0]

    @B.setter
    def B(self, v):
        self._B = [i for i in v.replace(' ', ',').split(',') if len(i) > 0]

    def distance(self):

        if self._A is None or self._B is None:
            return False

        if len(self._A) < 2 or len(self._B) < 2:
            return False

        return self._distance()

    def _distance(self):

        _words = []
        _words.extend(self._A)
        _words.extend(self._B)

        _words = list(set(_words))
        _words.sort()

        _listA = [self._A.count(_w) for _w in _words]
        _listB = [self._B.count(_w) for _w in _words]

        try:
            return 1 - cosine(_listA, _listB)
        except:
            return False


class TestSentenceSimilarity(unittest.TestCase):

    def setUp(self):
        pass

    def test_0(self):

        _sentence = SentenceSimilarity()
        _sentence.A = '今期 業績 予想 未定 期限切れ 肉 問題 販売減'
        _sentence.B = '都市 対抗 野球 西濃運輸 初優勝 佐伯 富士 重工'
        self.assertGreater(_sentence.distance(), 0.0)


    def test_1(self):

        _sentence = SentenceSimilarity()
        _sentence.A = '今期 業績 予想 未定 期限切れ 肉 問題 販売減'

        self.assertEqual(_sentence.distance(), False)

    def test_2(self):

        _sentence = SentenceSimilarity()
        _sentence.A = '今期'
        _sentence.B = '都市'

        self.assertEqual(_sentence.distance(), False)

    def test_3(self):

        _sentence = SentenceSimilarity()
        _sentence.A = '今期,業績,予想,未定'
        _sentence.B = '都市,業績,予想,未定'

        _sentence.B = '業績 対抗 野球 西濃運輸 初優勝'

        self.assertGreater(_sentence.distance(), 0.0)

if __name__ == '__main__':
    unittest.main()

使い方はテストをみていただければご理解いただけるかと。

コサイン類似度の計算なんて

return 1 - cosine(_listA, _listB)

の部分でしかやってなくて、残りは全部下ごしらえです。

_sentence = SentenceSimilarity()
_sentence.A = ‘今期 業績 予想 未定 期限切れ 肉 問題 販売減’
_sentence.B = ‘都市 対抗 野球 西濃運輸 初優勝 佐伯 富士 重工’

インスタンス作って、AとB(命名が安直ですいません)にカンマ切り、もしくはスペース切りの文字列を指定します。なので事前に分かち書きをしておく必要があります。

まあ文章を渡してメソッドの中で分かち書きしてもいいんですけどね。手元にあったのが分かち書き済みだったので今回はそのまま使いました。

_sentence.distance()

で、類似度を求めます。類似度を求める前に_Aと_Bの和集合を作って、和集合の単語リストにある単語が_Aと_Bそれぞれで幾つずつ出現するかを求めます。

上記例だと和集合は[今期,業績,予想,未定,期限切れ,肉,問題,販売減,都市,対抗,野球,西濃運輸,初優勝,佐伯,富士,重工]となり、
_listAは[1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0]
_listBは[0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1]
になるはずです。で、これらをベクトルにしてコサイン類似度を求めます。今は0と1だけですが複数回出現する単語があれば当然2以上の値も出てきます。

コサイン類似度は0から1の値で返ってきて、値が大きければ大きいほど類似しています。上記サンプルだと0が返ってくるはずです。ぜんぜん違う、ってことですね。

アイテムベースのレコメンドに使えると思います。サービスとして提供するレコメンドエンジンであればこんな単純な計算じゃないでしょうね。

Python 3.4.2, scipy 0.14.0 で書きました。