PythonでジョブをQueueにいれつつMulti Processでさばいていく方法

2017-12-20
このエントリーをはてなブックマークに追加

先日書いた「PythonでたまったQueueをMulti Threadでさばいていく方法」のMulti Process版

ProcessPoolExecutorではプロセス間で通信できないようだったのでmultiprocessingを使って書きました

import time
from datetime import datetime
from multiprocessing import Process, JoinableQueue


def worker(queue):
    while True:
        _job = queue.get()
        if _job is None:
            queue.task_done()
            break

        print(datetime.now())
        print(_job)
        queue.task_done()
        time.sleep(1)

def main():
    _process = []
    _queue = JoinableQueue()
    _docs = ["a", "b", "c", "d", "e", "f", "g"]
    for _i in range(2):
        _p = Process(target=worker, args=(_queue,))
        _p.start()
        _process.append(_p)

    for _d in _docs:
        _queue.put(_d)

    for _i in range(2):
        _queue.put(None)

    _queue.join()

if __name__ == "__main__":
    main()

思っていたよりマルチスレッド版とは違いが出ました

ジョブの投入が終わってからNoneを入れてるのは、「Noneっていうジョブが来たら終了してね」っていうサインを送ったっていう意図です、Queueが空になったくらいじゃ返ってきてくれません

2017-12-20 09:12:35.840879
a
2017-12-20 09:12:35.841184
b
2017-12-20 09:12:36.842196
c
2017-12-20 09:12:36.842427
d
2017-12-20 09:12:37.843443
e
2017-12-20 09:12:37.843636
f
2017-12-20 09:12:38.844696
g

PythonでたまったQueueをMulti Threadでさばいていく方法

2017-12-18
このエントリーをはてなブックマークに追加

Queueにはいったジョブを指定したThread数で順番にさばいていきたいよ、っていうときに使います
あんまり大量にQueueにいれるとメモリからあふれて落ちるんだろうな

import threading
import queue
import time
from datetime import datetime

QUEUE = queue.Queue()


def worker():
    while True:
        _docs = QUEUE.get()
        job_worker(_docs)
        QUEUE.task_done()

def job_worker(docs):
    print(datetime.now())
    print(docs)
    time.sleep(1)
    return None

def main():

    _threads = []
    for _i in range(2):
        _t = threading.Thread(target=worker)
        _t.daemon = True
        _t.start()
        _threads.append(_t)

    _docs = ["a", "b", "c", "d", "e", "f", "g"]
    for _doc in _docs:
        QUEUE.put(_doc)

    QUEUE.join()

if __name__ == "__main__":
    main()

実行するとこんな出力になります、threadがふたつなのでふたつずつ処理されてますね

2017-12-17 17:55:49.769058
a
2017-12-17 17:55:49.769201
b
2017-12-17 17:55:50.770286
c
2017-12-17 17:55:50.770461
d
2017-12-17 17:55:51.771527
e
2017-12-17 17:55:51.771687
f
2017-12-17 17:55:52.772768
g

Python3.6で書いたサンプルです