17.7. queue — 同期キュークラス¶
ソースコード: Lib/queue.py
queue モジュールは、複数プロデューサ-複数コンシューマ(multi-producer, multi-consumer)キューを実装します。これは、複数のスレッドの間で情報を安全に交換しなければならないときのマルチスレッドプログラミングで特に有益です。このモジュールの Queue クラスは、必要なすべてのロックセマンティクスを実装しています。これはPythonのスレッドサポートの状況に依存します。 threading モジュールを参照してください。
このモジュールでは3種類のキューが実装されています。それらはキューから取り出されるエントリの順番だけが違います。 FIFOキューでは、最初に追加されたエントリが最初に取り出されます。 LIFOキューでは、最後に追加されたエントリが最初に取り出されます(スタックのように振る舞います)。 優先順位付きキュー(priority queue)では、エントリは(heapq モジュールを利用して)ソートされ、 最も低い値のエントリが最初に取り出されます。
内部的には、このモジュールは競争スレッドを一時的にブロックするためにロックを使っています; しかし、スレッド内での再入を扱うようには設計されていません。
queue モジュールは以下のクラスと例外を定義します:
-
class
queue.Queue(maxsize=0)¶ FIFOキューのコンストラクタです。maxsize はキューに置くことのできる要素数の上限を設定する整数です。いったんこの大きさに達したら、挿入はキューの要素が消費されるまでブロックされます。もし maxsize が0以下であるならば、キューの大きさは無限です。
-
class
queue.LifoQueue(maxsize=0)¶ LIFOキューのコンストラクタです。maxsize はキューに置くことのできる要素数の上限を設定する整数です。いったんこの大きさに達したら、挿入はキューの要素が消費されるまでブロックされます。もし maxsize が0以下であるならば、キューの大きさは無限です。
-
class
queue.PriorityQueue(maxsize=0)¶ 優先順位付きキューのコンストラクタです。maxsize はキューに置くことのできる要素数の上限を設定する整数です。いったんこの大きさに達したら、挿入はキューの要素が消費されるまでブロックされます。もし maxsize が0以下であるならば、キューの大きさは無限です。
最小の値を持つ要素が最初に検索されます (最小の値を持つ値は、
sorted(list(entries))[0]によって返されるものです)。典型的な要素のパターンは、(priority_number, data)形式のタプルです。
-
exception
queue.Empty¶ 空の
Queueオブジェクトで、非ブロックメソッドget()(またはget_nowait()) が呼ばれたとき、送出される例外です。
-
exception
queue.Full¶ 満杯の
Queueオブジェクトで、非ブロックメソッドput()(またはput_nowait()) が呼ばれたとき、送出される例外です。
17.7.1. キューオブジェクト¶
キューオブジェクト(Queue, LifoQueue, PriorityQueue)は、以下のpublicメソッドを提供しています。
-
Queue.qsize()¶ キューの近似サイズを返します。ここで、qsize() > 0 は後続の get() がブロックしないことを保証しないこと、また qsize() < maxsize が put() がブロックしないことを保証しないことに注意してください。
-
Queue.empty()¶ キューが空の場合は
Trueを返し、そうでなければFalseを返します。empty() がTrueを返しても、後続の put() の呼び出しがブロックしないことは保証されません。同様に、empty() がFalseを返しても、後続の get() の呼び出しがブロックしないことは保証されません。
-
Queue.full()¶ キューが一杯の場合は
Trueを返し、そうでなければFalseを返します。full() がTrueを返しても、後続の get() の呼び出しがブロックしないことは保証されません。同様に、full() がFalseを返しても、後続の put() の呼び出しがブロックしないことは保証されません。
-
Queue.put(item, block=True, timeout=None)¶ item をキューに入れます。 もしオプション引数 block が真で timeout が
None(デフォルト) の場合は、必要であればフリースロットが利用可能になるまでブロックします。 timeout が正の数の場合は、最大で timeout 秒間ブロックし、その時間内に空きスロットが利用可能にならなければ、例外Fullを送出します。 そうでない場合 (block が偽) は、空きスロットが直ちに利用できるならば、キューにアイテムを置きます。 できないならば、例外Fullを送出します (この場合 timeout は無視されます)。
-
Queue.put_nowait(item)¶ put(item, False)と等価です。
-
Queue.get(block=True, timeout=None)¶ キューからアイテムを取り除き、それを返します。 オプション引数 block が真で timeout が
None(デフォルト) の場合は、必要であればアイテムが取り出せるようになるまでブロックします。 もし timeout が正の数の場合は、最大で timeout 秒間ブロックし、その時間内でアイテムが取り出せるようにならなければ、例外Emptyを送出します。 そうでない場合 (block が偽) は、直ちにアイテムが取り出せるならば、それを返します。 できないならば、例外Emptyを送出します (この場合 timeout は無視されます)。
-
Queue.get_nowait()¶ get(False)と等価です。
キューに入れられたタスクが全てコンシューマスレッドに処理されたかどうかを追跡するために 2つのメソッドが提供されます。
-
Queue.task_done()¶ 過去にキューに入れられたタスクが完了した事を示します。キューのコンシューマスレッドに利用されます。タスクの取り出しに使われた各
get()の後にtask_done()を呼び出すと、取り出したタスクに対する処理が完了した事をキューに教えます。join()がブロックされていた場合、全itemが処理された (キューにput()された全てのitemに対してtask_done()が呼び出されたことを意味します) 時に復帰します。キューにある要素より多く呼び出された場合
ValueErrorが発生します。
-
Queue.join()¶ キューにあるすべてのアイテムが取り出されて処理されるまでブロックします。
キューにitemが追加される度に、未完了タスクカウントが増やされます。コンシューマスレッドが
task_done()を呼び出して、itemを受け取ってそれに対する処理が完了した事を知らせる度に、未完了タスクカウントが減らされます。未完了タスクカウントが0になったときに、join()のブロックが解除されます。
キューに入れたタスクが完了するのを待つ例:
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
参考
multiprocessing.Queueクラス(マルチスレッドではなく) マルチプロセスの文脈で使用されるキュークラス。
collections.deque は、ロックなしで append() や popleft() といったアトミック操作が可能なキューの実装です。
