17.2. multiprocessing — プロセスベースの並列処理¶
ソースコード: Lib/multiprocessing/
17.2.1. はじめに¶
multiprocessing は、 threading と似た API で複数のプロセスの生成をサポートするパッケージです。 multiprocessing パッケージは、ローカルとリモート両方の並行処理を提供します。また、このパッケージはスレッドの代わりにサブプロセスを使用することにより、グローバルインタープリタロック の問題を避ける工夫が行われています。このような特徴があるため multiprocessing モジュールを使うことで、マルチプロセッサーマシンの性能を最大限に活用することができるでしょう。なお、このモジュールは Unix と Windows の両方で動作します。
multiprocessing モジュールでは、threading モジュールには似たものが存在しない API も導入されています。その最たるものが Pool オブジェクトです。これは複数の入力データに対して、サブプロセス群に入力データを分配 (データ並列) して関数を並列実行するのに便利な手段を提供します。以下の例では、モジュール内で関数を定義して、子プロセスがそのモジュールを正常にインポートできるようにする一般的な方法を示します。 Pool を用いたデータ並列の基礎的な例は次の通りです:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
標準出力に以下が出力されます:
[1, 4, 9]
17.2.1.1. Process クラス¶
multiprocessing モジュールでは、プロセスは以下の手順によって生成されます。はじめに Process のオブジェクトを作成し、続いて start() メソッドを呼び出します。この Process クラスは threading.Thread クラスと同様の API を持っています。まずは、簡単な例をもとにマルチプロセスを使用したプログラムについてみていきましょう
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
実行された個々のプロセス ID を表示するために拡張したサンプルコードを以下に示します:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
なぜ if __name__ == '__main__' という記述が必要かは プログラミングガイドライン を参照してください。
17.2.1.2. コンテキストと開始方式¶
プラットフォームにもよりますが、multiprocessing はプロセスを開始するために 3 つの方法をサポートしています。それら 開始方式 は以下のとおりです
- spawn
親プロセスは新たに python インタープリタープロセスを開始します。子プロセスはプロセスオブジェクトの
run()メソッドの実行に必要なリソースのみ継承します。特に、親プロセスからの不要なファイル記述子とハンドルは継承されません。この方式を使用したプロセスの開始は fork や forkserver に比べ遅くなります。Unix と Windows で利用可能。Windows でのデフォルト。
- fork
親プロセスは
os.fork()を使用して Python インタープリターをフォークします。子プロセスはそれが開始されるとき、事実上親プロセスと同一になります。親プロセスのリソースはすべて子プロセスに継承されます。マルチスレッドプロセスのフォークは安全性に問題があることに注意してください。Unix でのみ利用可能。Unix でのデフォルト。
- forkserver
プログラムを開始するとき forkserver 方式を選択した場合、サーバープロセスが開始されます。それ以降、新しいプロセスが必要になったときはいつでも、親プロセスはサーバーに接続し、新しいプロセスのフォークを要求します。フォークサーバープロセスはシングルスレッドなので
os.fork()の使用に関しても安全です。不要なリソースは継承されません。Unix パイプを経由したファイル記述子の受け渡しをサポートする Unix で利用可能。
バージョン 3.4 で変更: すべての Unix プラットフォームで spawn が、一部のプラットフォームで forkserver が追加されました。Windows では親プロセスの継承可能な全ハンドルが子プロセスに継承されることがなくなりました。
Unix で開始方式に spawn あるいは forkserver を使用した場合は、プログラムのプロセスによって作成されたリンクされていない名前付きセマフォを追跡する セマフォトラッカー プロセスも開始されます。全プロセスが終了したときにセマフォトラッカーは残っているあらゆるセマフォのリンクを解除します。通常そういったことはないのですが、プロセスがシグナルによって kill されたときに “漏れた” セマフォが発生する場合があります。(名前付きセマフォのリンク解除は、システムが個数の上限のみ許可している場合に深刻な問題になるため、それらは再起動されるまで自動的にリンク解除されることはありません。)
開始方式はメインモジュールの if __name__ == '__main__' 節内で、関数 set_start_method() によって指定します。以下に例を示します:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
関数 set_start_method() はプログラム内で複数回使用してはいけません。
もうひとつの方法として、get_context() を使用してコンテキストオブジェクトを取得することができます。コンテキストオブジェクトは multiprocessing モジュールと同じ API を持ち、同じプログラム内で複数の開始方式を使用できます。
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
あるコンテキストに関連したオブジェクトは異なるコンテキストのプロセスとは互換性がない場合があることに注意してください。特に、fork コンテキストを使用して作成されたロックは、spawn あるいは forkserver を使用して開始されたプロセスに渡すことはできません。
特定の開始方式の使用を要求するライブラリは get_context() を使用してライブラリ利用者の選択を阻害しないようにするべきです。
17.2.1.3. プロセス間でのオブジェクト交換¶
multiprocessing モジュールでは、プロセス間通信の手段が2つ用意されています。それぞれ以下に詳細を示します:
キュー (Queue)
Queueクラスはqueue.Queueクラスとほとんど同じように使うことができます。以下に例を示します:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()キューはスレッドセーフであり、プロセスセーフです。
パイプ (Pipe)
Pipe()関数はパイプで繋がれたコネクションオブジェクトのペアを返します。デフォルトでは双方向性パイプを返します。以下に例を示します:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()パイプのそれぞれの端を表す2つのコネクションオブジェクトが
Pipe()関数から返されます。各コネクションオブジェクトには、send()、recv()、その他のメソッドがあります。2つのプロセス (またはスレッド) がパイプの 同じ 端で同時に読み込みや書き込みを行うと、パイプ内のデータが破損してしまうかもしれないことに注意してください。もちろん、各プロセスがパイプの別々の端を同時に使用するならば、データが破壊される危険性はありません。
17.2.1.4. プロセス間の同期¶
multiprocessing は threading モジュールと等価な同期プリミティブを備えています。以下の例では、ロックを使用して、一度に1つのプロセスしか標準出力に書き込まないようにしています:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
ロックを使用しないで標準出力に書き込んだ場合は、各プロセスからの出力がごちゃまぜになってしまいます。
17.2.1.5. プロセス間での状態の共有¶
これまでの話の流れで触れたとおり、並行プログラミングを行うときには、できるかぎり状態を共有しないのが定石です。複数のプロセスを使用するときは特にそうでしょう。
しかし、どうしてもプロセス間のデータ共有が必要な場合のために multiprocessing モジュールには2つの方法が用意されています。
共有メモリ (Shared memory)
データを共有メモリ上に保持するために
Valueクラス、もしくはArrayクラスを使用することができます。以下のサンプルコードを使って、この機能についてみていきましょうfrom multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])このサンプルコードを実行すると以下のように表示されます
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
numとarrを生成するときに使用されている、引数'd'と'i'はarrayモジュールにより使用される種別の型コードです。ここで使用されている'd'は倍精度浮動小数、'i'は符号付整数を表します。これらの共有オブジェクトは、プロセスセーフでありスレッドセーフです。共有メモリを使用して、さらに柔軟なプログラミングを行うには
multiprocessing.sharedctypesモジュールを使用します。このモジュールは共有メモリから割り当てられた任意の ctypes オブジェクトの生成をサポートします。
サーバープロセス (Server process)
Manager()関数により生成されたマネージャーオブジェクトはサーバープロセスを管理します。マネージャーオブジェクトは Python のオブジェクトを保持して、他のプロセスがプロキシ経由でその Python オブジェクトを操作することができます。
Manager()関数が返すマネージャはlist,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value,Arrayをサポートします。 以下にサンプルコードを示します。from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)このサンプルコードを実行すると以下のように表示されます
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]サーバープロセスのマネージャーオブジェクトは共有メモリのオブジェクトよりも柔軟であるといえます。それは、どのような型のオブジェクトでも使えるからです。また、1つのマネージャーオブジェクトはネットワーク経由で他のコンピューター上のプロセスによって共有することもできます。しかし、共有メモリより動作が遅いという欠点があります。
17.2.1.6. ワーカープロセスのプールを使用¶
Pool クラスは、ワーカープロセスをプールする機能を備えています。このクラスには、異なる方法でワーカープロセスへタスクを割り当てるいくつかのメソッドがあります。
例えば:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。
注釈
このパッケージに含まれる機能を使用するためには、子プロセスから __main__ モジュールをインポートできる必要があります。このことについては プログラミングガイドライン で触れていますが、ここであらためて強調しておきます。なぜかというと、いくつかのサンプルコード、例えば multiprocessing.pool.Pool のサンプルはインタラクティブシェル上では動作しないからです。以下に例を示します:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(このサンプルを試すと、3つのトレースバックすべてがほぼランダムに交互に重なって表示されます。そうなったら、なんとかしてマスタープロセスを止めましょう。)
17.2.2. リファレンス¶
multiprocessing パッケージは threading モジュールの API とほとんど同じです。
17.2.2.1. Process クラスと例外¶
-
class
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ Process オブジェクトは各プロセスの処理を表します。
Processクラスはthreading.Threadクラスのすべてのメソッドと同じインタフェースを提供します。コンストラクターは必ずキーワード引数で呼び出すべきです。引数 group には必ず
Noneを渡してください。 この引数はthreading.Threadクラスとの互換性のためだけに残されています。引数 target には、run()メソッドから呼び出される callable オブジェクトを渡します。この引数はデフォルトでNoneとなっており、何も呼び出されません。引数 name にはプロセス名を渡します (詳細はnameを見てください)。 args は対象の呼び出しに対する引数のタプルを渡します。 kwargs は対象の呼び出しに対するキーワード引数の辞書を渡します。もし提供されれば、キーワードのみの daemon 引数はプロセスのdaemonフラグをTrueまたはFalseにセットします。Noneの場合 (デフォルト)、このフラグは作成するプロセスから継承されます。デフォルトでは、target には引数が渡されないようになっています。
サブクラスがコンストラクターをオーバーライドする場合は、そのプロセスに対する処理を行う前に基底クラスのコンストラクター (
Process.__init__()) を実行しなければなりません。バージョン 3.3 で変更: daemon 引数が追加されました。
-
run()¶ プロセスが実行する処理を表すメソッドです。
このメソッドはサブクラスでオーバーライドすることができます。標準の
run()メソッドは、コンストラクターの target 引数として渡された呼び出し可能オブジェクトを呼び出します。もしコンストラクターに args もしくは kwargs 引数が渡されていれば、呼び出すオブジェクトにこれらの引数を渡します。
-
start()¶ プロセスの処理を開始するためのメソッドです。
各 Process オブジェクトに対し、このメソッドが2回以上呼び出されてはいけません。各プロセスでオブジェクトの
run()メソッドを呼び出す準備を行います。
-
join([timeout])¶ オプションの引数 timeout が
None(デフォルト) の場合、join()メソッドが呼ばれたプロセスは処理が終了するまでブロックします。 timeout が正の数である場合、最大 timeout 秒ブロックします。 プロセスが終了あるいはタイムアウトした場合、メソッドはNoneを返すことに注意してください。 プロセスのexitcodeを確認し終了したかどうかを判断してください。1つのプロセスは何回も join されることができます。
プロセスは自分自身を join することはできません。それはデッドロックを引き起こすことがあるからです。プロセスが start される前に join しようとするとエラーが発生します。
-
name¶ プロセスの名前。名前は識別のためだけに使用される文字列です。それ自体には特別な意味はありません。複数のプロセスに同じ名前が与えられても構いません。
最初の名前はコンストラクターによってセットされます。コンストラクターに明示的な名前が渡されない場合、 ‘Process-N1:N2:...:Nk‘ 形式の名前が構築されます。ここでそれぞれの Nk はその親のN番目の子供です。
-
daemon¶ デーモンプロセスであるかのフラグであり、ブール値です。この属性は
start()が呼び出される前に設定されている必要があります。初期値は作成するプロセスから継承します。
あるプロセスが終了するとき、そのプロセスはその子プロセスであるデーモンプロセスすべてを終了させようとします。
デーモンプロセスは子プロセスを作成できないことに注意してください。もし作成できてしまうと、そのデーモンプロセスの親プロセスが終了したときにデーモンプロセスの子プロセスが孤児になってしまう場合があるからです。さらに言えば、デーモンプロセスはUnix デーモンやサービスでは なく 通常のプロセスであり、非デーモンプロセスが終了すると終了されます (そして join されません)。
threading.Threadクラスの API に加えてProcessクラスのオブジェクトには以下の属性およびメソッドがあります:-
pid¶ プロセスIDを返します。プロセスの生成前は
Noneが設定されています。
-
exitcode¶ 子プロセスの終了コードです。子プロセスがまだ終了していない場合は
Noneが返されます。負の値 -N は子プロセスがシグナル N で終了したことを表します。
-
authkey¶ プロセスの認証キーです (バイト文字列です)。
multiprocessingモジュールがメインプロセスにより初期化される場合には、os.urandom()関数を使用してランダムな値が設定されます。Processクラスのオブジェクトの作成時にその親プロセスから認証キーを継承します。もしくはauthkeyに別のバイト文字列を設定することもできます。詳細は 認証キー を参照してください。
-
sentinel¶ プロセスが終了するときに “ready” となるシステムオブジェクトの数値ハンドル。
multiprocessing.connection.wait()を使用していくつかのイベントを同時に wait したい場合はこの値を使うことができます。それ以外の場合はjoin()を呼ぶ方がより単純です。Windows においては、これは
WaitForSingleObjectおよびWaitForMultipleObjectsファミリーの API 呼び出しで使用可能な OS ハンドルです。 Unix においては、これはselectモジュールのプリミティブで使用可能なファイル記述子です。バージョン 3.3 で追加.
-
terminate()¶ プロセスを終了します。Unix 環境では
SIGTERMシグナルを、 Windows 環境ではTerminateProcess()を使用して終了させます。終了ハンドラーや finally 節などは、実行されないことに注意してください。このメソッドにより終了するプロセスの子孫プロセスは、終了 しません。そういった子孫プロセスは単純に孤児になります。
警告
このメソッドの使用時に、関連付けられたプロセスがパイプやキューを使用している場合には、使用中のパイプやキューが破損して他のプロセスから使用できなくなる可能性があります。同様に、プロセスがロックやセマフォなどを取得している場合には、このプロセスが終了してしまうと他のプロセスのデッドロックの原因になるでしょう。
プロセスオブジェクトが作成したプロセスのみが
start(),join(),is_alive(),terminate()とexitcodeのメソッドを呼び出すべきです。以下の例では
Processのメソッドの使い方を示しています:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process(Process-1, initial)> False >>> p.start() >>> print(p, p.is_alive()) <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.ProcessError¶ すべての
multiprocessing例外の基底クラスです。
-
exception
multiprocessing.BufferTooShort¶ この例外は
Connection.recv_bytes_into()によって発生し、バッファーオブジェクトが小さすぎてメッセージが読み込めないことを示します。eがBufferTooShortのインスタンスであるとすると、e.args[0]はそのメッセージをバイト文字列で与えるものです。
-
exception
multiprocessing.AuthenticationError¶ 認証エラーがあった場合に送出されます。
-
exception
multiprocessing.TimeoutError¶ タイムアウトをサポートするメソッドでタイムアウトが過ぎたときに送出されます。
17.2.2.2. パイプ (Pipe) とキュー (Queue)¶
複数のプロセスを使う場合、一般的にはメッセージパッシングをプロセス間通信に使用し、ロックのような同期プリミティブを使用しないようにします。
メッセージのやりとりのために Pipe() (2つのプロセス間の通信用)、もしくはキュー (複数のメッセージ生成プロセス (producer)、消費プロセス (consumer) の実現用) を使うことができます。
Queue, SimpleQueue と JoinableQueue 型は複数プロセスから生成/消費を行う FIFO キューです。これらのキューは標準ライブラリの queue.Queue を模倣しています。 Queue には Python 2.5 の queue.Queue クラスで導入された task_done() と join() メソッドがないことが違う点です。
もし JoinableQueue を使用するなら、キューから削除される各タスクのために JoinableQueue.task_done() を呼び出さなければ なりません 。さもないと、いつか完了していないタスクを数えるためのセマフォがオーバーフローし、例外を発生させるでしょう。
管理オブジェクトを使用することで共有キューを作成できることも覚えておいてください。詳細は マネージャー を参照してください。
注釈
multiprocessing は、タイムアウトを伝えるために、通常の queue.Empty と queue.Full 例外を使用します。それらは multiprocessing の名前空間では利用できないため、queue からインポートする必要があります。
注釈
オブジェクトがキューに追加される際、そのオブジェクトは pickle 化されています。そのため、バックグラウンドのスレッドが後になって下位層のパイプに pickle 化されたデータをフラッシュすることがあります。これにより、少し驚くような結果になりますが、実際に問題になることはないはずです。これが問題になるような状況では、かわりに manager を使ってキューを作成することができるからです。
空のキューの中にオブジェクトを追加した後、キューの
empty()メソッドがFalseを返すまでの間にごくわずかな遅延が起きることがあり、get_nowait()がqueue.Emptyを発生させることなく制御が呼び出し元に返ってしまうことがあります。複数のプロセスがオブジェクトをキューに詰めている場合、キューの反対側ではオブジェクトが詰められたのとは違う順序で取得される可能性があります。ただし、同一のプロセスから詰め込まれたオブジェクトは、それらのオブジェクト間では、必ず期待どおりの順序になります。
警告
Queue を利用しようとしている最中にプロセスを Process.terminate() や os.kill() で終了させる場合、キューにあるデータは破損し易くなります。終了した後で他のプロセスがキューを利用しようとすると、例外を発生させる可能性があります。
警告
上述したように、もし子プロセスがキューへ要素を追加するなら (かつ JoinableQueue.cancel_join_thread を使用しないなら) そのプロセスはバッファーされたすべての要素がパイプへフラッシュされるまで終了しません。
これは、そのプロセスを join しようとする場合、キューに追加されたすべての要素が消費されたことが確実でないかぎり、デッドロックを発生させる可能性があることを意味します。似たような現象で、子プロセスが非デーモンプロセスの場合、親プロセスは終了時に非デーモンのすべての子プロセスを join しようとしてハングアップする可能性があります。
マネージャーを使用して作成されたキューではこの問題はありません。詳細は プログラミングガイドライン を参照してください。
プロセス間通信におけるキューの使用例を知りたいなら 使用例 を参照してください。
-
multiprocessing.Pipe([duplex])¶ パイプの両端を表す
Connectionオブジェクトのペア(conn1, conn2)を返します。duplex が
True(デフォルト) ならパイプは双方向性です。duplex がFalseならパイプは一方向性で、conn1はメッセージの受信専用、conn2はメッセージの送信専用になります。
-
class
multiprocessing.Queue([maxsize])¶ パイプや2~3個のロック/セマフォを使用して実装されたプロセス共有キューを返します。あるプロセスが最初に要素をキューへ追加するとき、バッファーからパイプの中へオブジェクトを転送する供給スレッドが開始されます。
標準ライブラリの
queueモジュールの通常のqueue.Emptyやqueue.Full例外がタイムアウトを伝えるために送出されます。Queueはtask_done()やjoin()を除くqueue.Queueのすべてのメソッドを実装します。-
qsize()¶ おおよそのキューのサイズを返します。マルチスレッディング/マルチプロセスの特性上、この数値は信用できません。
これは
sem_getvalue()が実装されていない Mac OS X のような Unix プラットホーム上でNotImplementedErrorを発生させる可能性があることを覚えておいてください。
-
empty()¶ キューが空っぽなら
Trueを、そうでなければFalseを返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。
-
full()¶ キューがいっぱいなら
Trueを、そうでなければFalseを返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。
-
put(obj[, block[, timeout]])¶ キューの中へ obj を追加します。オプションの引数 block が
True(デフォルト) 且つ timeout がNone(デフォルト) なら、空きスロットが利用可能になるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に空きスロットが利用できなかったらqueue.Full例外を発生させます。それ以外 (block がFalse) で、空きスロットがすぐに利用可能な場合はキューに要素を追加します。そうでなければqueue.Full例外が発生します(その場合 timeout は無視されます)。
-
put_nowait(obj)¶ put(obj, False)と等価です。
-
get([block[, timeout]])¶ キューから要素を取り出して削除します。オプションの引数 block が
True(デフォルト) 且つ timeout がNone(デフォルト) なら、要素が取り出せるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に要素が取り出せなかったらqueue.Empty例外を発生させます。それ以外 (block がFalse) で、要素がすぐに取り出せる場合は要素を返します。そうでなければqueue.Empty例外が発生します(その場合 timeout は無視されます)。
-
get_nowait()¶ get(False)と等価です。
multiprocessing.Queueはqueue.Queueにはない追加メソッドがあります。 これらのメソッドは通常、ほとんどのコードに必要ありません:-
close()¶ カレントプロセスからこのキューへそれ以上データが追加されないことを表します。バックグラウンドスレッドはパイプへバッファーされたすべてのデータをフラッシュするとすぐに終了します。これはキューがガベージコレクトされるときに自動的に呼び出されます。
-
join_thread()¶ バックグラウンドスレッドを join します。このメソッドは
close()が呼び出された後でのみ使用されます。バッファーされたすべてのデータがパイプへフラッシュされるのを保証するため、バックグラウンドスレッドが終了するまでブロックします。デフォルトでは、あるプロセスがキューを作成していない場合、終了時にキューのバックグラウンドスレッドを join しようとします。そのプロセスは
join_thread()が何もしないようにcancel_join_thread()を呼び出すことができます。
-
cancel_join_thread()¶ join_thread()がブロッキングするのを防ぎます。特にこれはバックグラウンドスレッドがそのプロセスの終了時に自動的に join されるのを防ぎます。詳細はjoin_thread()を参照してください。このメソッドは
allow_exit_without_flush()という名前のほうがよかったかもしれません。キューに追加されたデータが失われてしまいがちなため、このメソッドを使う必要はほぼ確実にないでしょう。本当にこれが必要になるのは、キューに追加されたデータを下位層のパイプにフラッシュすることなくカレントプロセスを直ちに終了する必要があり、かつ失われるデータに関心がない場合です。
注釈
このクラスに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォ (shared semaphore) を使用しているものがあります。これが使用できない場合には、このクラスが無効になり、
Queueをインスタンス化する時にImportErrorが発生します。詳細は bpo-3770 を参照してください。同様のことが、以下に列挙されている特殊なキューでも成り立ちます。-
-
class
multiprocessing.SimpleQueue¶ 単純化された
Queue型です。ロックされたPipeと非常に似ています。-
empty()¶ もしキューが空ならば
Trueを、そうでなければFalseを返します。
-
get()¶ キューからアイテムを取り除いて返します。
-
put(item)¶ item をキューに追加します。
-
-
class
multiprocessing.JoinableQueue([maxsize])¶ JoinableQueueはQueueのサブクラスであり、task_done()やjoin()メソッドが追加されているキューです。-
task_done()¶ 以前にキューへ追加されたタスクが完了したことを表します。キューのコンシューマによって使用されます。 タスクをフェッチするために使用されるそれぞれの
get()に対して、 後続のtask_done()呼び出しはタスクの処理が完了したことをキューへ伝えます。もし
join()がブロッキング状態なら、 すべての要素が処理されたときに復帰します(task_done()呼び出しが すべての要素からキュー内へput()されたと受け取ったことを意味します)。キューにある要素より多く呼び出された場合
ValueErrorが発生します。
-
join()¶ キューにあるすべてのアイテムが取り出されて処理されるまでブロックします。
キューに要素が追加されると未完了タスク数が増えます。コンシューマがキューの要素が取り出されてすべての処理が完了したことを表す
task_done()を呼び出すと数が減ります。 未完了タスク数がゼロになるとjoin()はブロッキングを解除します。
-
17.2.2.3. その他¶
-
multiprocessing.active_children()¶ カレントプロセスのすべてのアクティブな子プロセスのリストを返します。
これを呼び出すと “join” してすでに終了しているプロセスには副作用があります。
-
multiprocessing.cpu_count()¶ システムの CPU 数を返します。
NotImplementedErrorが送出される場合があります。
-
multiprocessing.current_process()¶ カレントプロセスに対応する
Processオブジェクトを返します。threading.current_thread()とよく似た関数です。
-
multiprocessing.freeze_support()¶ multiprocessingを使用しているプログラムをフリーズして Windows の実行可能形式を生成するためのサポートを追加します。(py2exe , PyInstaller や cx_Freeze でテストされています。)メインモジュールの
if __name__ == '__main__'の直後にこの関数を呼び出す必要があります。以下に例を示します:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
もし
freeze_support()の行がない場合、フリーズされた実行可能形式を実行しようとするとRuntimeErrorを発生させます。freeze_support()の呼び出しは Windows 以外の OS では効果がありません。さらに、もしモジュールが Windows の通常の Python インタプリタによって実行されているならば(プログラムがフリーズされていなければ)freeze_support()は効果がありません。
-
multiprocessing.get_all_start_methods()¶ サポートしている開始方式のリストを返します。先頭の要素がデフォルトを意味します。利用可能な開始方式には
'fork'、'spawn'および'forkserver'があります。Windows では'spawn'のみが利用可能です。Unix では'fork'および'spawn'は常にサポートされており、'fork'がデフォルトになります。バージョン 3.4 で追加.
-
multiprocessing.get_context(method=None)¶ multiprocessingモジュールと同じ属性を持つコンテキストオブジェクトを返します。method が
Noneの場合、デフォルトのコンテキストが返されます。その他の場合 method は'fork'、'spawn'あるいは'forkserver'でなければなりません。指定された開始方式が利用できない場合はValueErrorが送出されます。バージョン 3.4 で追加.
-
multiprocessing.get_start_method(allow_none=False)¶ 開始するプロセスで使用する開始方式名を返します。
開始方式がまだ確定しておらず、allow_none の値が偽の場合、開始方式はデフォルトに確定され、その名前が返されます。開始方式が確定しておらず、allow_none の値が真の場合、
Noneが返されます。返り値は
'fork'、'spawn'、'forkserver'あるいはNoneになります。Unix では'fork'が、Windows では'spawn'がデフォルトになります。バージョン 3.4 で追加.
-
multiprocessing.set_executable()¶ 子プロセスを開始するときに、使用する Python インタープリターのパスを設定します。(デフォルトでは
sys.executableが使用されます)。コードに組み込むときは、おそらく次のようにする必要がありますset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
子プロセスを作成する前に行ってください。
バージョン 3.4 で変更: Unix で開始方式に
'spawn'を使用している場合にサポートされました。
-
multiprocessing.set_start_method(method)¶ 子プロセスの開始方式を指定します。method には
'fork'、'spawn'あるいは'forkserver'を指定できます。これは一度しか呼び出すことができず、その場所もメインモジュールの
if __name__ == '__main__'節内で保護された状態でなければなりません。バージョン 3.4 で追加.
17.2.2.4. Connection オブジェクト¶
Connection オブジェクトは pickle でシリアライズ可能なオブジェクトか文字列を送ったり、受け取ったりします。そういったオブジェクトはメッセージ指向の接続ソケットと考えられます。
Connection オブジェクトは通常は Pipe() を使用して作成されます。 詳細は リスナーとクライアント も参照してください。
-
class
multiprocessing.Connection¶ -
send(obj)¶ コネクションの相手側へ
recv()を使用して読み込むオブジェクトを送ります。オブジェクトは pickle でシリアライズ可能でなければなりません。 pickle が極端に大きすぎる (OS にも依りますが、およそ 32 MB+) と、
ValueError例外が送出されることがあります。
-
recv()¶ コネクションの相手側から
send()を使用して送られたオブジェクトを返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合EOFErrorが発生します。
-
fileno()¶ コネクションが使用するハンドラーか、ファイル記述子を返します。
-
close()¶ コネクションをクローズします。
コネクションがガベージコレクトされるときに自動的に呼び出されます。
-
poll([timeout])¶ 読み込み可能なデータがあるかどうかを返します。
timeout が指定されていなければすぐに返します。timeout に数値を指定すると、最大指定した秒数をブロッキングします。timeout に
Noneを指定するとタイムアウトせずにずっとブロッキングします。multiprocessing.connection.wait()を使って複数のコネクションオブジェクトを同時にポーリングできることに注意してください。
-
send_bytes(buffer[, offset[, size]])¶ bytes-like object から完全なメッセージとしてバイトデータを送ります。
offset が指定されると buffer のその位置からデータが読み込まれます。 size が指定されるとバッファーからその量のデータが読み込まれます。非常に大きなバッファー (OS に依存しますが、およそ 32MB+) を指定すると、
ValueError例外が発生するかもしれません。
-
recv_bytes([maxlength])¶ コネクションの相手側から送られたバイトデータの完全なメッセージを文字列として返します。何か受け取るまでブロックします。受け取るデータが何も残っておらず、相手側がコネクションを閉じていた場合、
EOFErrorが送出されます。maxlength を指定していて、かつメッセージが maxlength より長い場合、
OSErrorが発生してコネクションからそれ以上読めなくなります。
-
recv_bytes_into(buffer[, offset])¶ コネクションの相手側から送られたバイトデータを buffer に読み込み、メッセージのバイト数を返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合
EOFErrorが発生します。buffer は書き込み可能な bytes-like object でなければなりません。 offset が与えられたら、その位置からバッファーへメッセージが書き込まれます。 オフセットは buffer バイトよりも小さい正の数でなければなりません。
バッファーがあまりに小さいと
BufferTooShort例外が発生します。eが例外インスタンスとすると完全なメッセージはe.args[0]で確認できます。
バージョン 3.3 で変更:
Connection.send()とConnection.recv()を使用して Connection オブジェクト自体をプロセス間で転送できるようになりました。バージョン 3.3 で追加: Connection オブジェクトがコンテキストマネージメント・プロトコルをサポートするようになりました。 – コンテキストマネージャ型 を参照してください。
__enter__()は Connection オブジェクトを返します。また__exit__()はclose()を呼び出します。-
例えば:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv() メソッドは受信したデータを自動的に unpickle 化します。それはメッセージを送ったプロセスが信頼できる場合を除いてセキュリティリスクになります。
そのため Pipe() を使用してコネクションオブジェクトを生成する場合を除いて、何らかの認証処理を実行した後で recv() や send() メソッドのみを使用すべきです。詳細は 認証キー を参照してください。
警告
もしプロセスがパイプの読み込みまたは書き込み中に kill されると、メッセージの境界がどこなのか分からなくなってしまうので、そのパイプ内のデータは破損してしまいがちです。
17.2.2.5. 同期プリミティブ¶
一般的にマルチプロセスプログラムは、マルチスレッドプログラムほどは同期プリミティブを必要としません。詳細は threading モジュールのドキュメントを参照してください。
マネージャーオブジェクトを使用して同期プリミティブを作成できることも覚えておいてください。詳細は マネージャー を参照してください。
-
class
multiprocessing.Barrier(parties[, action[, timeout]])¶ バリアーオブジェクト:
threading.Barrierのクローンです。バージョン 3.3 で追加.
-
class
multiprocessing.BoundedSemaphore([value])¶ 有限セマフォオブジェクト:
threading.BoundedSemaphoreの類似物です。よく似た
threading.BoundedSemaphoreとは、次の一点だけ異なります。acquireメソッドの第一引数名は block で、Lock.acquire()と一致しています。注釈
Mac OS X では
sem_getvalue()が実装されていないのでSemaphoreと区別がつきません。
-
class
multiprocessing.Condition([lock])¶ 状態変数:
threading.Conditionの別名です。lock を指定するなら
multiprocessingのLockかRLockオブジェクトにすべきです。バージョン 3.3 で変更:
wait_for()メソッドが追加されました。
-
class
multiprocessing.Event¶ threading.Eventのクローンです。
-
class
multiprocessing.Lock¶ 再帰しないロックオブジェクトで、
threading.Lock相当のものです。プロセスやスレッドがロックをいったん獲得 (acquire) すると、それに続くほかのプロセスやスレッドが獲得しようとする際、それが解放 (release) されるまではブロックされます。解放はどのプロセス、スレッドからも行えます。スレッドに対して適用されるthreading.Lockのコンセプトと振る舞いは、特筆すべきものがない限り、プロセスとスレッドに適用されるmultiprocessing.Lockに引き継がれています。Lockは実際にはファクトリ関数で、デフォルトコンテキストで初期化されたmultiprocessing.synchronize.Lockのインスタンスを返すことに注意してください。Lockは context manager プロトコルをサポートしています。つまりwith文で使うことができます。-
acquire(block=True, timeout=None)¶ ブロックあり、またはブロックなしでロックを獲得します。
引数 block を
True(デフォルト) に設定して呼び出した場合、ロックがアンロック状態になるまでブロックします。ブロックから抜けるとそれをロック状態にしてからTrueを返します。threading.Lock.acquire()の最初の引数とは名前が違っているので注意してください。引数 block の値を
Falseにして呼び出すとブロックしません。 現在ロック状態であれば、直ちにFalseを返します。それ以外の場合には、ロックをロック状態にしてTrueを返します。timeout として正の浮動小数点数を与えて呼び出すと、ロックが獲得できない限り、指定された秒数だけブロックします。 timeout 値に負数を与えると、ゼロを与えた場合と同じになります。 timeout 値の
None(デフォルト) を与えると、無限にブロックします。 timeout 引数の負数とNoneの扱いは、threading.Lock.acquire()に実装された動作と異なるので注意してください。 block がFalseの場合、 timeout は実際的な意味を持たなくなるので無視されます。ロックを獲得した場合はTrue、タイムアウトした場合はFalseで戻ります。
-
release()¶ ロックを解放します。これはロックを獲得したプロセスやスレッドだけでなく、任意のプロセスやスレッドから呼ぶことができます。
threading.Lock.release()と同じように振舞いますが、ロックされていない場合に呼び出すとValueErrorとなる点だけが違います。
-
-
class
multiprocessing.RLock¶ 再帰ロックオブジェクトで、
threading.RLock相当のものです。再帰ロックオブジェクトはそれを獲得 (acquire) したプロセスやスレッドが解放 (release) しなければなりません。プロセスやスレッドがロックをいったん獲得すると、同じプロセスやスレッドはブロックされずに再度獲得出来ます。そのプロセスやスレッドは獲得した回数ぶん解放しなければなりません。RLockは実際にはファクトリ関数で、デフォルトコンテキストで初期化されたmultiprocessing.synchronize.Lockのインスタンスを返すことに注意してください。RLockは context manager プロトコルをサポートしています。つまりwith文で使うことができます。-
acquire(block=True, timeout=None)¶ ブロックあり、またはブロックなしでロックを獲得します。
block 引数を
Trueにして呼び出した場合、ロックが既にカレントプロセスもしくはカレントスレッドが既に所有していない限りは、アンロック状態 (どのプロセス、スレッドも所有していない状態) になるまでブロックします。ブロックから抜けるとカレントプロセスもしくはカレントスレッドが (既に持っていなければ) 所有権を得て、再帰レベルをインクリメントし、Trueで戻ります。threading.RLock.acquire()の実装とはこの最初の引数の振る舞いが、その名前自身を始めとしていくつか違うので注意してください。block 引数を
Falseにして呼び出した場合、ブロックしません。ロックが他のプロセスもしくはスレッドにより獲得済み (つまり所有されている) であれば、カレントプロセスまたはカレントスレッドは所有権を得ず、再帰レベルも変更せずに、Falseで戻ります。ロックがアンロック状態の場合、カレントプロセスもしくはカレントスレッドは所有権を得て再帰レベルがインクリメントされ、Trueで戻ります。(—訳注: block の True/False 関係なくここでの説明では「所有権を持っている場合の2度目以降の aquire」の説明が欠けています。2度目以降の acquire では再帰レベルがインクリメントされて即座に返ります。全体読めばわかるとは思いますが一応。—)timeout 引数の使い方と振る舞いは
Lock.acquire()と同じです。 timeout 引数の振る舞いがいくつかの点でthreading.RLock.acquire()と異なるので注意してください。
-
release()¶ 再帰レベルをデクリメントしてロックを解放します。デクリメント後に再帰レベルがゼロになった場合、ロックの状態をアンロック (いかなるプロセス、いかなるスレッドにも所有されていない状態) にリセットし、ロックの状態がアンロックになるのを待ってブロックしているプロセスもしくはスレッドがある場合にはその中のただ一つだけが処理を進行できるようにします。デクリメント後も再帰レベルがゼロでない場合、ロックの状態はロックのままで、呼び出し側のプロセスもしくはスレッドに所有されたままになります。
このメソッドは呼び出しプロセスあるいはスレッドがロックを所有している場合に限り呼び出してください。所有者でないプロセスもしくはスレッドによって呼ばれるか、あるいはアンロック (未所有) 状態で呼ばれた場合、
AssertionErrorが送出されます。同じ状況でのthreading.RLock.release()実装とは例外の型が異なるので注意してください。
-
-
class
multiprocessing.Semaphore([value])¶ セマフォオブジェクト:
threading.Semaphoreのクローンです。よく似た
threading.BoundedSemaphoreとは、次の一点だけ異なります。acquireメソッドの第一引数名は block で、Lock.acquire()と一致しています。
注釈
Mac OS X では sem_timedwait がサポートされていないので、acquire() にタイムアウトを与えて呼ぶと、ループ内でスリープすることでこの関数がエミュレートされます。
注釈
メインスレッドが BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() 又は Condition.wait() を呼び出してブロッキング状態のときに Ctrl-C で生成される SIGINT シグナルを受け取ると、その呼び出しはすぐに中断されて KeyboardInterrupt が発生します。
これは同等のブロッキング呼び出しが実行中のときに SIGINT が無視される threading の振る舞いとは違っています。
注釈
このパッケージに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォを使用しているものがあります。これが使用できない場合には、multiprocessing.synchronize モジュールが無効になり、このモジュールのインポート時に ImportError が発生します。詳細は bpo-3770 を参照してください。
17.2.2.7. マネージャー¶
マネージャーは異なるプロセス間で共有されるデータの作成方法を提供します。これには別のマシン上で走るプロセス間のネットワーク越しの共有も含まれます。マネージャーオブジェクトは 共有オブジェクト を管理するサーバープロセスを制御します。他のプロセスはプロキシ経由で共有オブジェクトへアクセスすることができます。
プロセス間でオブジェクトを共有するために使用される
SyncManagerオブジェクトを返します。返されたマネージャーオブジェクトは生成される子プロセスに対応付けられ、共有オブジェクトを作成するメソッドや、共有オブジェクトに対応するプロキシを返すメソッドを持ちます。
マネージャープロセスは親プロセスが終了するか、ガベージコレクトされると停止します。マネージャークラスは multiprocessing.managers モジュールで定義されています:
-
class
multiprocessing.managers.BaseManager([address[, authkey]])¶ BaseManager オブジェクトを作成します。
作成後、
start()またはget_server().serve_forever()を呼び出して、マネージャーオブジェクトが、開始されたマネージャープロセスを確実に参照するようにしてください。address はマネージャープロセスが新たなコネクションを待ち受けるアドレスです。address が
Noneの場合、任意のアドレスが設定されます。authkey はサーバープロセスへ接続しようとするコネクションの正当性を検証するために 使用される認証キーです。authkey が
Noneの場合current_process().authkeyが使用されます。authkey を使用する場合はバイト文字列でなければなりません。-
start([initializer[, initargs]])¶ マネージャーを開始するためにサブプロセスを開始します。initializer が
Noneでなければ、サブプロセスは開始時にinitializer(*initargs)を呼び出します。
-
get_server()¶ マネージャーの制御下にある実際のサーバーを表す
Serverオブジェクトを返します。Serverオブジェクトはserve_forever()メソッドをサポートします:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Serverはさらにaddress属性も持っています。
-
connect()¶ ローカルからリモートのマネージャーオブジェクトへ接続します:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc') >>> m.connect()
-
register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ マネージャークラスで呼び出し可能オブジェクト(callable)や型を登録するために使用されるクラスメソッドです。
typeid は特に共有オブジェクトの型を識別するために使用される “型識別子” です。これは文字列でなければなりません。
callable はこの型識別子のオブジェクトを作成するために使用される呼び出し可能オブジェクトです。マネージャーインスタンスが
connect()メソッドを使ってサーバーに接続されているか、 create_method 引数がFalseの場合は、Noneでも構いません。proxytype はこの typeid で共有オブジェクトのプロキシを作成するために使用される
BaseProxyのサブクラスです。Noneの場合、プロキシクラスは自動的に作成されます。exposed は
BaseProxy._callmethod()を使用したアクセスが許されるべき typeid をプロキシするメソッド名のシーケンスを指定するために使用されます (exposed がNoneの場合proxytype._exposed_が存在すればそれが代わりに使用されます)。exposed リストが指定されない場合、共有オブジェクトのすべての “パブリックメソッド” がアクセス可能になります。 (ここでいう “パブリックメソッド” とは__call__()メソッドを持つものと名前が'_'で始まらないあらゆる属性を意味します。)method_to_typeid はプロキシが返す exposed メソッドの返り値の型を指定するために使用されるマッピングで、メソッド名を typeid 文字列にマップします。 (method_to_typeid が
Noneの場合proxytype._method_to_typeid_が存在すれば、それが代わりに使用されます。) メソッド名がこのマッピングのキーではないか、マッピングがNoneの場合、そのメソッドによって返されるオブジェクトが値として (by value) コピーされます。create_method は、共有オブジェクトを作成し、それに対するプロキシを返すようサーバープロセスに伝える、名前 typeid のメソッドを作成するかを決定します。デフォルトでは
Trueです。
BaseManagerインスタンスも読み取り専用属性を1つ持っています:-
address¶ マネージャーが使用するアドレスです。
バージョン 3.3 で変更: マネージャーオブジェクトはコンテキストマネージメント・プロトコルをサポートします – コンテキストマネージャ型 を参照してください。
__enter__()は (まだ開始していない場合) サーバープロセスを開始してから、マネージャーオブジェクトを返します。__exit__()はshutdown()を呼び出します。旧バージョンでは、
__enter__()はマネージャーのサーバープロセスがまだ開始していなかった場合でもプロセスを開始しませんでした。-
-
class
multiprocessing.managers.SyncManager¶ プロセス間の同期のために使用される
BaseManagerのサブクラスです。multiprocessing.Manager()はこの型のオブジェクトを返します。また共有のリストやディクショナリの作成もサポートします。
-
Barrier(parties[, action[, timeout]])¶ 共有
threading.Barrierオブジェクトを作成して、そのプロキシを返します。バージョン 3.3 で追加.
-
BoundedSemaphore([value])¶ 共有
threading.BoundedSemaphoreオブジェクトを作成して、そのプロキシを返します。
-
Condition([lock])¶ 共有
threading.Conditionオブジェクトを作成して、そのプロキシを返します。lock が提供される場合
threading.Lockかthreading.RLockオブジェクトのためのプロキシになります。バージョン 3.3 で変更:
wait_for()メソッドが追加されました。
-
Event()¶ 共有
threading.Eventオブジェクトを作成して、そのプロキシを返します。
-
Lock()¶ 共有
threading.Lockオブジェクトを作成して、そのプロキシを返します。
-
Queue([maxsize])¶ 共有
queue.Queueオブジェクトを作成して、そのプロキシを返します。
-
RLock()¶ 共有
threading.RLockオブジェクトを作成して、そのプロキシを返します。
-
Semaphore([value])¶ 共有
threading.Semaphoreオブジェクトを作成して、そのプロキシを返します。
-
Array(typecode, sequence)¶ 配列を作成して、そのプロキシを返します。
-
Value(typecode, value)¶ 書き込み可能な
value属性を作成して、そのプロキシを返します。
-
dict()¶ -
dict(mapping) -
dict(sequence) 共有
dictオブジェクトを作成して、そのプロキシを返します。
-
list()¶ -
list(sequence) 共有
listオブジェクトを作成して、そのプロキシを返します。
注釈
プロキシには、自身の持つ辞書とリストのミュータブルな値や項目がいつ変更されたのかを知る方法がないため、これらの値や項目の変更はマネージャーを通して伝播しません。このような要素を変更するには、コンテナーのプロキシに変更されたオブジェクトを再代入してください:
# create a list proxy and append a mutable object (a dictionary) lproxy = manager.list() lproxy.append({}) # now mutate the dictionary d = lproxy[0] d['a'] = 1 d['b'] = 2 # at this point, the changes to d are not yet synced, but by # reassigning the dictionary, the proxy is notified of the change lproxy[0] = d
-
-
class
multiprocessing.managers.Namespace¶ SyncManagerに登録することのできる型です。Namespace オブジェクトにはパブリックなメソッドはありませんが、書き込み可能な属性を持ちます。そのオブジェクト表現はその属性の値を表示します。
しかし、Namespace オブジェクトのためにプロキシを使用するとき
'_'が先頭に付く属性はプロキシの属性になり、参照対象の属性にはなりません:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
17.2.2.7.1. カスタマイズされたマネージャー¶
独自のマネージャーを作成するには、BaseManager のサブクラスを作成して、 マネージャークラスで呼び出し可能なオブジェクトか新たな型を登録するために register() クラスメソッドを使用します。例えば:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
17.2.2.7.2. リモートマネージャーを使用する¶
あるマシン上でマネージャーサーバーを実行して、他のマシンからそのサーバーを使用するクライアントを持つことができます(ファイアウォールを通過できることが前提)。
次のコマンドを実行することでリモートクライアントからアクセスを受け付ける1つの共有キューのためにサーバーを作成します:
>>> from multiprocessing.managers import BaseManager
>>> import queue
>>> queue = queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
あるクライアントからサーバーへのアクセスは次のようになります:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
別のクライアントもそれを使用することができます:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
ローカルプロセスもそのキューへアクセスすることができます。クライアント上で上述のコードを使用してアクセスします:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
17.2.2.8. Proxy オブジェクト¶
プロキシは別のプロセスで(おそらく)有効な共有オブジェクトを 参照する オブジェクトです。共有オブジェクトはプロキシの 参照対象 になるということができます。複数のプロキシオブジェクトが同じ参照対象を持つ可能性もあります。
プロキシオブジェクトはその参照対象が持つ対応メソッドを実行するメソッドを持ちます。(そうは言っても、参照対象のすべてのメソッドが必ずしもプロキシ経由で利用可能ではありません) プロキシは通常その参照対象ができることと同じ方法で使用されます:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
プロキシに str() を適用すると参照対象のオブジェクト表現を返すのに対して、 repr() を適用するとプロキシのオブジェクト表現を返すことに注意してください。
プロキシオブジェクトの重要な機能はプロセス間で受け渡し可能な pickle 化ができることです。しかし、プロキシが対応するマネージャープロセスに対して送信される場合、そのプロキシを unpickle するとその参照対象を生成することを覚えておいてください。例えば、これはある共有オブジェクトに別の共有オブジェクトが含められることを意味します:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[[]] []
>>> b.append('hello')
>>> print(a, b)
[['hello']] ['hello']
注釈
multiprocessing のプロキシ型は値による比較に対して何もサポートしません。そのため、例えば以下のようになります:
>>> manager.list([1,2,3]) == [1,2,3]
False
比較を行いたいときは参照対象のコピーを使用してください。
-
class
multiprocessing.managers.BaseProxy¶ プロキシオブジェクトは
BaseProxyのサブクラスのインスタンスです。-
_callmethod(methodname[, args[, kwds]])¶ プロキシの参照対象のメソッドの実行結果を返します。
proxyがプロキシで、プロキシ内の参照対象がobjならこの式proxy._callmethod(methodname, args, kwds)
はこの式を評価します
getattr(obj, methodname)(*args, **kwds)
(マネージャープロセス内の)。
返される値はその呼び出し結果のコピーか、新たな共有オブジェクトに対するプロキシになります。詳細は
BaseManager.register()の method_to_typeid 引数のドキュメントを参照してください。その呼び出しによって例外が発生した場合、
_callmethod()によってその例外は再送出されます。他の例外がマネージャープロセスで発生したなら、RemoteError例外に変換されたものが_callmethod()によって送出されます。特に methodname が 公開 されていない場合は例外が発生することに注意してください。
_callmethod()の使用例になります:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue()¶ 参照対象のコピーを返します。
参照対象が unpickle 化できるなら例外を発生します。
-
__repr__()¶ プロキシオブジェクトのオブジェクト表現を返します。
-
__str__()¶ 参照対象のオブジェクト表現を返します。
-
17.2.2.8.1. クリーンアップ¶
プロキシオブジェクトは弱参照(weakref)コールバックを使用します。プロキシオブジェクトがガベージコレクトされるときにその参照対象が所有するマネージャーからその登録を取り消せるようにするためです。
共有オブジェクトはプロキシが参照しなくなったときにマネージャープロセスから削除されます。
17.2.2.9. プロセスプール¶
Pool クラスでタスクを実行するプロセスのプールを作成することができます。
-
class
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ プロセスプールオブジェクトは、ジョブを送り込めるワーカープロセスのプールを制御します。タイムアウトやコールバックのある非同期の実行をサポートし、並列 map 実装を持ちます。
processes は使用するワーカープロセスの数です。processes が
Noneの場合os.cpu_count()が返す値を使用します。initializer が
Noneではない場合、各ワーカープロセスは開始時にinitializer(*initargs)を呼び出します。maxtasksperchild は、ワーカープロセスが exit して新たなワーカープロセスと置き替えられるまでの間に、ワーカープロセスが完了することのできるタスクの数です。この設定により未利用のリソースが解放されるようなります。デフォルトの maxtasksperchild は
Noneで、これはワーカープロセスがプールと同じ期間だけ生き続けるということを意味します。context はワーカープロセスを開始するために使用されるコンテキストの指定に使用できます。通常プールは関数
multiprocessing.Pool()かコンテキストオブジェクトのPool()メソッドを使用して作成されます。どちらの場合でも context は適切に設定されます。プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。
バージョン 3.2 で追加: maxtasksperchild
バージョン 3.4 で追加: context
注釈
Pool中のワーカープロセスは、典型的にはプールのワークキューの存続期間とちょうど同じだけ生き続けます。ワーカーに確保されたリソースを解放するために (Apache, mod_wsgi, などのような) 他のシステムによく見られるパターンは、プール内のワーカーが設定された量だけの仕事を完了したら exit とクリーンアップを行い、古いプロセスを置き換えるために新しいプロセスを生成するというものです。Poolの maxtasksperchild 引数は、この能力をエンドユーザーに提供します。-
apply(func[, args[, kwds]])¶ 引数 args とキーワード引数 kwds を伴って func を呼びます。結果が準備できるまでブロックします。このブロックがあるため、
apply_async()の方が並行作業により適しています。加えて、 func は、プール内の1つのワーカーだけで実行されます。
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶ apply()メソッドの派生版で結果オブジェクトを返します。callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。結果を返せるようになったときに callback が結果オブジェクトに対して適用されます。ただし呼び出しが失敗した場合は、代わりに error_callback が適用されます。
error_callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。対象の関数が失敗した場合、例外インスタンスを伴って error_callback が呼ばれます。
コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。
-
map(func, iterable[, chunksize])¶ map()組み込み関数の並列版です (iterable な引数を1つだけサポートするという違いはありますが)。結果が出るまでブロックします。このメソッドはイテラブルをいくつものチャンクに分割し、プロセスプールにそれぞれ独立したタスクとして送ります。(概算の) チャンクサイズは chunksize を正の整数に設定することで指定できます。
-
map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶ map()メソッドの派生版で結果オブジェクトを返します。callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。結果を返せるようになったときに callback が結果オブジェクトに対して適用されます。ただし呼び出しが失敗した場合は、代わりに error_callback が適用されます。
error_callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。対象の関数が失敗した場合、例外インスタンスを伴って error_callback が呼ばれます。
コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。
-
imap(func, iterable[, chunksize])¶ map()の遅延評価版です。chunksize 引数は
map()メソッドで使用されるものと同じです。 引数 iterable がとても長いなら chunksize に大きな値を指定して使用する方がデフォルト値の1を使用するよりもジョブの完了が かなり 速くなります。また chunksize が
1の場合imap()メソッドが返すイテレーターのnext()メソッドはオプションで timeout パラメーターを持ちます。next(timeout)は、その結果が timeout 秒以内に返されないときにmultiprocessing.TimeoutErrorを発生させます。
-
imap_unordered(func, iterable[, chunksize])¶ イテレーターが返す結果の順番が任意の順番で良いと見なされることを除けば
imap()と同じです。 (ワーカープロセスが1つしかない場合のみ “正しい” 順番になることが保証されます。)
-
starmap(func, iterable[, chunksize])¶ iterable の要素が、引数として unpack されるイテレート可能オブジェクトであると期待される以外は、
map()と似ています。そのため、iterable が
[(1,2), (3, 4)]なら、結果は[func(1,2), func(3,4)]になります。バージョン 3.3 で追加.
-
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])¶ starmap()とmap_async()の組み合わせです。 イテレート可能オブジェクトの iterable をイテレートして、 unpack したイテレート可能オブジェクトを伴って func を呼び出します。結果オブジェクトを返します。バージョン 3.3 で追加.
-
close()¶ これ以上プールでタスクが実行されないようにします。すべてのタスクが完了した後でワーカープロセスが終了します。
-
terminate()¶ 実行中の処理を完了させずにワーカープロセスをすぐに停止します。プールオブジェクトがガベージコレクトされるときに
terminate()が呼び出されます。
-
join()¶ ワーカープロセスが終了するのを待ちます。
join()を使用する前にclose()かterminate()を呼び出さなければなりません。
バージョン 3.3 で追加: Pool オブジェクトがコンテキストマネージメント・プロトコルをサポートするようになりました。 – コンテキストマネージャ型 を参照してください。
__enter__()は Pool オブジェクトを返します。また__exit__()はterminate()を呼び出します。-
-
class
multiprocessing.pool.AsyncResult¶ Pool.apply_async()やPool.map_async()で返される結果のクラスです。-
get([timeout])¶ 結果を受け取ったときに返します。 timeout が
Noneではなくて、その結果が timeout 秒以内に受け取れない場合multiprocessing.TimeoutErrorが発生します。リモートの呼び出しが例外を発生させる場合、その例外はget()が再発生させます。
-
wait([timeout])¶ その結果が有効になるか timeout 秒経つまで待ちます。
-
ready()¶ その呼び出しが完了しているかどうかを返します。
-
successful()¶ その呼び出しが例外を発生させることなく完了したかどうかを返します。その結果が返せる状態でない場合
AssertionErrorが発生します。
-
次の例はプールの使用例を紹介します:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
17.2.2.10. リスナーとクライアント¶
通常、プロセス間でメッセージを渡すにはキューを使用するか Pipe() が返す Connection オブジェクトを使用します。
しかし multiprocessing.connection モジュールにはさらに柔軟な仕組みがあります。 このモジュールは、基本的にはソケットもしくは Windows の名前付きパイプを扱う高レベルのメッセージ指向 API を提供します。また、 hmac モジュールを使用した ダイジェスト認証 や同時の複数接続のポーリングもサポートします。
-
multiprocessing.connection.deliver_challenge(connection, authkey)¶ ランダム生成したメッセージをコネクションの相手側へ送信して応答を待ちます。
その応答がキーとして authkey を使用するメッセージのダイジェストと一致する場合、 コネクションの相手側へ歓迎メッセージを送信します。 そうでなければ
AuthenticationErrorを発生させます。
-
multiprocessing.connection.answer_challenge(connection, authkey)¶ メッセージを受信して、そのキーとして authkey を使用するメッセージのダイジェストを計算し、ダイジェストを送り返します。
歓迎メッセージを受け取れない場合
AuthenticationErrorが発生します。
-
multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])¶ address で渡したアドレスを使用するリスナーに対してコネクションを確立しようとして
Connectionを返します。コネクション種別は family 引数で決定しますが、一般的には address のフォーマットから推測できるので、これは指定されません。 (アドレスフォーマット を参照してください)
authenticate が
Trueか authkey がバイト文字列の場合、 ダイジェスト認証が使用されます。認証に使用されるキーは authkey 、または authkey がNoneの場合はcurrent_process().authkeyのどちらかです。 認証が失敗した場合AuthenticationErrorが発生します。 認証キー を参照してください。
-
class
multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])¶ コネクションを ‘待ち受ける’ 束縛されたソケットか Windows の名前付きパイプのラッパーです。
address はリスナーオブジェクトの束縛されたソケットか名前付きパイプが使用するアドレスです。
注釈
‘0.0.0.0’ のアドレスを使用する場合、Windows 上の終点へ接続することができません。終点へ接続したい場合は ‘127.0.0.1’ を使用すべきです。
family は使用するソケット(名前付きパイプ)の種別です。これは
'AF_INET'(TCP ソケット),'AF_UNIX'(Unix ドメインソケット) または'AF_PIPE'(Windows 名前付きパイプ) という文字列のどれか1つになります。これらのうち'AF_INET'のみが利用可能であることが保証されています。 family がNoneの場合 address のフォーマットから推測されたものが使用されます。 address もNoneの場合はデフォルトが選択されます。詳細は アドレスフォーマット を参照してください。 family が'AF_UNIX'で address がNoneの場合tempfile.mkstemp()を使用して作成されたプライベートな一時ディレクトリにソケットが作成されます。リスナーオブジェクトがソケットを使用する場合、ソケットに束縛されるときに backlog (デフォルトでは1つ) がソケットの
listen()メソッドに対して渡されます。authenticate が
True(デフォルトではFalse) か authkey がNoneではない場合、ダイジェスト認証が使用されます。authkey がバイト文字列の場合、認証キーとして使用されます。そうでない場合は
Noneでなければいけません。authkey が
Noneかつ authenticate がTrueの場合current_process().authkeyが認証キーとして使用されます。 authkey がNoneかつ authenticate がFalseの場合、認証は行われません。 もし認証が失敗した場合AuthenticationErrorが発生します。 詳細については 認証キー を参照してください。-
accept()¶ リスナーオブジェクトの名前付きパイプか束縛されたソケット上でコネクションを 受け付けて
Connectionオブジェクトを返します。 認証が失敗した場合AuthenticationErrorが発生します。
-
close()¶ リスナーオブジェクトの名前付きパイプか束縛されたソケットをクローズします。これはリスナーがガベージコレクトされるときに自動的に呼ばれます。そうは言っても、明示的に close() を呼び出す方が望ましいです。
リスナーオブジェクトは次の読み取り専用属性を持っています:
-
address¶ リスナーオブジェクトが使用中のアドレスです。
-
last_accepted¶ 最後にコネクションを受け付けたアドレスです。有効なアドレスがない場合は
Noneになります。
バージョン 3.3 で追加: Listener オブジェクトがコンテキストマネージメント・プロトコルをサポートするようになりました。 – コンテキストマネージャ型 を参照してください。
__enter__()はリスナーオブジェクトを返します。また__exit__()はclose()を呼び出します。-
-
multiprocessing.connection.wait(object_list, timeout=None)¶ object_list 中のオブジェクトが準備ができるまで待機します。準備ができた object_list 中のオブジェクトのリストを返します。timeout が浮動小数点なら、最大でその秒数だけ呼び出しがブロックします。timeout が
Noneの場合、無制限の期間ブロックします。負のタイムアウトは0と等価です。Unix と Windows の両方で、 object_list には以下のオブジェクトを含めることが出来ます
読み取り可能な
Connectionオブジェクト;接続された読み取り可能な
socket.socketオブジェクト; または
読み取ることのできるデータがある場合、あるいは相手側の端が閉じられている場合、コネクションまたはソケットオブジェクトは準備ができています。
Unix:
wait(object_list, timeout)はselect.select(object_list, [], [], timeout)とほとんど等価です。違いは、select.select()がシグナルによって中断される場合、EINTRのエラー番号付きでOSErrorを上げるということです。wait()はそのようなことは行いません。Windows: object_list の要素は、 (Win32 関数
WaitForMultipleObjects()のドキュメントで使われている定義から) wait 可能な整数ハンドルか、ソケットハンドルまたはパイプハンドルを返すfileno()メソッドを持つオブジェクトのどちらかでなければなりません。 (パイプハンドルとソケットハンドラーは wait 可能なハンドルでは ない ことに注意してください。)バージョン 3.3 で追加.
例
次のサーバーコードは認証キーとして 'secret password' を使用するリスナーを作成します。このサーバーはコネクションを待ってクライアントへデータを送信します:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
次のコードはサーバーへ接続して、サーバーからデータを受信します:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
次のコードは wait() を使って複数のプロセスからのメッセージを同時に待ちます:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
17.2.2.10.1. アドレスフォーマット¶
'AF_INET'アドレスは(hostname, port)のタプルになります。hostname は文字列で port は整数です。'AF_UNIX'アドレスはファイルシステム上のファイル名の文字列です。'AF_PIPE'アドレスは、次の形式を持つ文字列ですr'\\.\pipe\PipeName'。 ServerName という名前のリモートコンピューター上の名前付きパイプに接続するためにClient()を使用するには、代わりにr'\\ServerName\pipe\PipeName'形式のアドレスを使用する必要があります。
デフォルトでは、2つのバックスラッシュで始まる文字列は 'AF_UNIX' よりも 'AF_PIPE' として推測されることに注意してください。
17.2.2.11. 認証キー¶
Connection.recv を使用するとき、データは自動的に unpickle されて受信します。 信頼できない接続元からのデータを unpickle することはセキュリティリスクがあります。 そのため Listener や Client() はダイジェスト認証を提供するために hmac モジュールを使用します。
認証キーはパスワードとして見なされるバイト文字列です。コネクションが確立すると、双方の終点で正しい接続先であることを証明するために 知っているお互いの認証キーを要求します。(双方の終点が同じキーを使用して通信しようとしても、コネクション上でそのキーを送信することは できません。)
認証が要求されているにもかかわらず認証キーが指定されていない場合 current_process().authkey の返す値が使用されます。 (詳細は Process を参照してください。) この値はカレントプロセスを作成する Process オブジェクトによって自動的に継承されます。 これは(デフォルトでは)複数プロセスのプログラムの全プロセスが相互にコネクションを 確立するときに使用される1つの認証キーを共有することを意味します。
適当な認証キーを os.urandom() を使用して生成することもできます。
17.2.2.12. ログ記録¶
ロギングのためにいくつかの機能が利用可能です。しかし logging パッケージは、 (ハンドラー種別に依存して)違うプロセスからのメッセージがごちゃ混ぜになるので、プロセスの共有ロックを使用しないことに注意してください。
-
multiprocessing.get_logger()¶ multiprocessingが使用するロガーを返します。必要に応じて新たなロガーを作成します。最初に作成するとき、ロガーはレベルに
logging.NOTSETが設定されていてデフォルトハンドラーがありません。このロガーへ送られるメッセージはデフォルトではルートロガーへ伝播されません。Windows 上では子プロセスが親プロセスのロガーレベルを継承しないことに注意してください。さらにその他のロガーのカスタマイズ内容もすべて継承されません。
-
multiprocessing.log_to_stderr()¶ この関数は
get_logger()に対する呼び出しを実行しますが、 get_logger によって作成されるロガーを返すことに加えて、'[%(levelname)s/%(processName)s] %(message)s'のフォーマットを使用してsys.stderrへ出力を送るハンドラーを追加します。
以下にロギングを有効にした例を紹介します:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
完全なロギングレベルの表については logging モジュールを参照してください。
17.2.2.13. multiprocessing.dummy モジュール¶
multiprocessing.dummy は multiprocessing の API を複製しますが threading モジュールのラッパーでしかありません。
17.2.3. プログラミングガイドライン¶
multiprocessing を使用するときに守るべき一定のガイドラインとイディオムを挙げます。
17.2.3.1. すべての開始方式について¶
以下はすべての開始方式に当てはまります。
共有状態を避ける
できるだけプロセス間で巨大なデータを移動することは避けるようにすべきです。
プロセス間の通信には、
threadingモジュールの低レベルな同期プリミティブを使うのではなく、キューやパイプを使うのが良いでしょう。
pickle 化の可能性
プロキシのメソッドへの引数は、pickle 化できるものにしてください。
プロキシのスレッドセーフ性
1 つのプロキシオブジェクトは、ロックで保護しないかぎり、2 つ以上のスレッドから使用してはいけません。
(異なるプロセスで 同じ プロキシを使用することは問題ではありません。)
ゾンビプロセスを join する
Unix 上ではプロセスが終了したときに join しないと、そのプロセスはゾンビになります。新たなプロセスが開始する (または
active_children()が呼ばれる) ときに、join されていないすべての完了プロセスが join されるので、あまり多くにはならないでしょう。また、終了したプロセスのProcess.is_aliveはそのプロセスを join します。そうは言っても、自分で開始したすべてのプロセスを明示的に join することはおそらく良いプラクティスです。
pickle/unpickle より継承する方が良い
開始方式に spawn あるいは forkserver を使用している場合、
multiprocessingから多くの型を pickle 化する必要があるため子プロセスはそれらを使うことができます。しかし、一般にパイプやキューを使用して共有オブジェクトを他のプロセスに送信することは避けるべきです。代わりに、共有リソースにアクセスする必要のあるプロセスは上位プロセスからそれらを継承するようにすべきです。
プロセスの強制終了を避ける
あるプロセスを停止するために
Process.terminateメソッドを使用すると、そのプロセスが現在使用されている (ロック、セマフォ、パイプやキューのような) 共有リソースを破壊したり他のプロセスから利用できない状態を引き起こし易いです。そのため、共有リソースを使用しないプロセスでのみ
Process.terminateを使用することを考慮することがおそらく最善の方法です。
キューを使用するプロセスを join する
キューに要素を追加するプロセスは、すべてのバッファーされた要素が “feeder” スレッドによって下位層のパイプに対してフィードされるまで終了を待つということを覚えておいてください。 (子プロセスはこの動作を避けるためにキューの
Queue.cancel_join_threadメソッドを呼ぶことができます。)これはキューを使用するときに、キューに追加されたすべての要素が最終的にそのプロセスが join される前に削除されていることを確認する必要があることを意味します。そうしないと、そのキューに要素が追加したプロセスの終了を保証できません。デーモンではないプロセスは自動的に join されることも覚えておいてください。
次の例はデッドロックを引き起こします:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()修正するには最後の2行を入れ替えます(または単純に
p.join()の行を削除します)。
明示的に子プロセスへリソースを渡す
Unix で開始方式に fork を使用している場合、子プロセスはグローバルリソースを使用した親プロセス内で作成された共有リソースを使用できます。しかし、オブジェクトを子プロセスのコンストラクターに引数として渡すべきです。
Windows や他の開始方式と (将来的にでも) 互換性のあるコードを書く場合は別として、これは子プロセスが実行中である限りは親プロセス内でオブジェクトがガベージコレクトされないことも保証します。これは親プロセス内でオブジェクトがガベージコレクトされたときに一部のリソースが開放されてしまう場合に重要かもしれません。
そのため、例えば
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()は、次のように書き直すべきです
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
sys.stdin を file-like オブジェクトに置き換えることに注意する
multiprocessingは元々無条件に:os.close(sys.stdin.fileno())を
multiprocessing.Process._bootstrap()メソッドの中で呼び出していました — これはプロセス内プロセス (processes-in-processes) で問題が起こしてしまいます。そこで、これは以下のように変更されました:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)これによってプロセス同士が衝突して bad file descripter エラーを起こすという根本的な問題は解決しましたが、アプリケーションの出力バッファーを
sys.stdin()から “file-like オブジェクト” に置き換えるという潜在的危険を持ち込んでしまいました。危険というのは、複数のプロセスが file-like オブジェクトのclose()を呼び出すと、オブジェクトに同じデータが何度もフラッシュされ、破損してしまう可能性がある、というものです。もし file-like オブジェクトを書いて独自のキャッシュを実装するなら、キャッシュするときに常に pid を記録しておき、pid が変わったらキュッシュを捨てることで、フォークセーフにできます。例:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
17.2.3.2. 開始方式が spawn および forkserver の場合¶
開始方式に fork を適用しない場合にいくつかの追加の制限事項があります。
さらなる pickle 化の可能性
Process.__init__()へのすべての引数は pickle 化できることを確認してください。またProcessをサブクラス化する場合、そのインスタンスがProcess.startメソッドが呼ばれたときに pickle 化できるようにしてください。
グローバル変数
子プロセスで実行されるコードがグローバル変数にアクセスしようとする場合、子プロセスが見るその値は
Process.startが呼ばれたときの親プロセスの値と同じではない可能性があります。しかし、単にモジュールレベルの定数であるグローバル変数なら問題にはなりません。
メインモジュールの安全なインポート
新たな Python インタープリターによって、意図しない副作用 (新たなプロセスを開始するなど) を起こさずにメインモジュールを安全にインポートできることを確認してください。
例えば、開始方式に spawn あるいは forkserver を使用した場合に以下のモジュールを実行すると
RuntimeErrorで失敗します:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()代わりに、次のように
if __name__ == '__main__':を使用してプログラムの “エントリポイント” を保護すべきです:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(プログラムをフリーズせずに通常通り実行するなら
freeze_support()行は取り除けます。)これは新たに生成された Python インタープリターがそのモジュールを安全にインポートして、モジュールの
foo()関数を実行します。プールまたはマネージャーがメインモジュールで作成される場合に似たような制限が適用されます。
17.2.4. 使用例¶
カスタマイズされたマネージャーやプロキシの作成方法と使用方法を紹介します:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Pool を使用する例です:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
ワーカープロセスのコレクションに対してタスクをフィードしてその結果をまとめるキューの使い方の例を紹介します:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
