17.2. multiprocessing — プロセスベースの並列処理

ソースコード: Lib/multiprocessing/


17.2.1. はじめに

multiprocessing は、 threading と似た API で複数のプロセスの生成をサポートするパッケージです。 multiprocessing パッケージは、ローカルとリモート両方の並行処理を提供します。また、このパッケージはスレッドの代わりにサブプロセスを使用することにより、グローバルインタープリタロック の問題を避ける工夫が行われています。このような特徴があるため multiprocessing モジュールを使うことで、マルチプロセッサーマシンの性能を最大限に活用することができるでしょう。なお、このモジュールは Unix と Windows の両方で動作します。

multiprocessing モジュールでは、threading モジュールには似たものが存在しない API も導入されています。その最たるものが Pool オブジェクトです。これは複数の入力データに対して、サブプロセス群に入力データを分配 (データ並列) して関数を並列実行するのに便利な手段を提供します。以下の例では、モジュール内で関数を定義して、子プロセスがそのモジュールを正常にインポートできるようにする一般的な方法を示します。 Pool を用いたデータ並列の基礎的な例は次の通りです:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

標準出力に以下が出力されます:

[1, 4, 9]

17.2.1.1. Process クラス

multiprocessing モジュールでは、プロセスは以下の手順によって生成されます。はじめに Process のオブジェクトを作成し、続いて start() メソッドを呼び出します。この Process クラスは threading.Thread クラスと同様の API を持っています。まずは、簡単な例をもとにマルチプロセスを使用したプログラムについてみていきましょう

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

実行された個々のプロセス ID を表示するために拡張したサンプルコードを以下に示します:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

なぜ if __name__ == '__main__' という記述が必要かは プログラミングガイドライン を参照してください。

17.2.1.2. コンテキストと開始方式

プラットフォームにもよりますが、multiprocessing はプロセスを開始するために 3 つの方法をサポートしています。それら 開始方式 は以下のとおりです

spawn

親プロセスは新たに python インタープリタープロセスを開始します。子プロセスはプロセスオブジェクトの run() メソッドの実行に必要なリソースのみ継承します。特に、親プロセスからの不要なファイル記述子とハンドルは継承されません。この方式を使用したプロセスの開始は forkforkserver に比べ遅くなります。

Unix と Windows で利用可能。Windows でのデフォルト。

fork

親プロセスは os.fork() を使用して Python インタープリターをフォークします。子プロセスはそれが開始されるとき、事実上親プロセスと同一になります。親プロセスのリソースはすべて子プロセスに継承されます。マルチスレッドプロセスのフォークは安全性に問題があることに注意してください。

Unix でのみ利用可能。Unix でのデフォルト。

forkserver

プログラムを開始するとき forkserver 方式を選択した場合、サーバープロセスが開始されます。それ以降、新しいプロセスが必要になったときはいつでも、親プロセスはサーバーに接続し、新しいプロセスのフォークを要求します。フォークサーバープロセスはシングルスレッドなので os.fork() の使用に関しても安全です。不要なリソースは継承されません。

Unix パイプを経由したファイル記述子の受け渡しをサポートする Unix で利用可能。

バージョン 3.4 で変更: すべての Unix プラットフォームで spawn が、一部のプラットフォームで forkserver が追加されました。Windows では親プロセスの継承可能な全ハンドルが子プロセスに継承されることがなくなりました。

On Unix using the spawn or forkserver start methods will also start a semaphore tracker process which tracks the unlinked named semaphores created by processes of the program. When all processes have exited the semaphore tracker unlinks any remaining semaphores. Usually there should be none, but if a process was killed by a signal there may some "leaked" semaphores. (Unlinking the named semaphores is a serious matter since the system allows only a limited number, and they will not be automatically unlinked until the next reboot.)

開始方式はメインモジュールの if __name__ == '__main__' 節内で、関数 set_start_method() によって指定します。以下に例を示します:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

関数 set_start_method() はプログラム内で複数回使用してはいけません。

もうひとつの方法として、get_context() を使用してコンテキストオブジェクトを取得することができます。コンテキストオブジェクトは multiprocessing モジュールと同じ API を持ち、同じプログラム内で複数の開始方式を使用できます。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Note that objects related to one context may not be compatible with processes for a different context. In particular, locks created using the fork context cannot be passed to a processes started using the spawn or forkserver start methods.

特定の開始方式の使用を要求するライブラリは get_context() を使用してライブラリ利用者の選択を阻害しないようにするべきです。

17.2.1.3. プロセス間でのオブジェクト交換

multiprocessing モジュールでは、プロセス間通信の手段が2つ用意されています。それぞれ以下に詳細を示します:

キュー (Queue)

Queue クラスは queue.Queue クラスとほとんど同じように使うことができます。以下に例を示します:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

キューはスレッドセーフであり、プロセスセーフです。

パイプ (Pipe)

Pipe() 関数はパイプで繋がれたコネクションオブジェクトのペアを返します。デフォルトでは双方向性パイプを返します。以下に例を示します:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

パイプのそれぞれの端を表す2つのコネクションオブジェクトが Pipe() 関数から返されます。各コネクションオブジェクトには、 send()recv()、その他のメソッドがあります。2つのプロセス (またはスレッド) がパイプの 同じ 端で同時に読み込みや書き込みを行うと、パイプ内のデータが破損してしまうかもしれないことに注意してください。もちろん、各プロセスがパイプの別々の端を同時に使用するならば、データが破壊される危険性はありません。

17.2.1.4. プロセス間の同期

multiprocessingthreading モジュールと等価な同期プリミティブを備えています。以下の例では、ロックを使用して、一度に1つのプロセスしか標準出力に書き込まないようにしています:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

ロックを使用しないで標準出力に書き込んだ場合は、各プロセスからの出力がごちゃまぜになってしまいます。

17.2.1.5. プロセス間での状態の共有

これまでの話の流れで触れたとおり、並行プログラミングを行うときには、できるかぎり状態を共有しないのが定石です。複数のプロセスを使用するときは特にそうでしょう。

しかし、どうしてもプロセス間のデータ共有が必要な場合のために multiprocessing モジュールには2つの方法が用意されています。

共有メモリ (Shared memory)

データを共有メモリ上に保持するために Value クラス、もしくは Array クラスを使用することができます。以下のサンプルコードを使って、この機能についてみていきましょう

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

このサンプルコードを実行すると以下のように表示されます

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

numarr を生成するときに使用されている、引数 'd''i'array モジュールにより使用される種別の型コードです。ここで使用されている 'd' は倍精度浮動小数、 'i' は符号付整数を表します。これらの共有オブジェクトは、プロセスセーフでありスレッドセーフです。

共有メモリを使用して、さらに柔軟なプログラミングを行うには multiprocessing.sharedctypes モジュールを使用します。このモジュールは共有メモリから割り当てられた任意の ctypes オブジェクトの生成をサポートします。

サーバープロセス (Server process)

Manager() 関数により生成されたマネージャーオブジェクトはサーバープロセスを管理します。マネージャーオブジェクトは Python のオブジェクトを保持して、他のプロセスがプロキシ経由でその Python オブジェクトを操作することができます。

Manager() 関数が返すマネージャは list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array をサポートします。 以下にサンプルコードを示します。

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

このサンプルコードを実行すると以下のように表示されます

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

サーバープロセスのマネージャーオブジェクトは共有メモリのオブジェクトよりも柔軟であるといえます。それは、どのような型のオブジェクトでも使えるからです。また、1つのマネージャーオブジェクトはネットワーク経由で他のコンピューター上のプロセスによって共有することもできます。しかし、共有メモリより動作が遅いという欠点があります。

17.2.1.6. ワーカープロセスのプールを使用

Pool クラスは、ワーカープロセスをプールする機能を備えています。このクラスには、異なる方法でワーカープロセスへタスクを割り当てるいくつかのメソッドがあります。

例えば:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。

注釈

このパッケージに含まれる機能を使用するためには、子プロセスから __main__ モジュールをインポートできる必要があります。このことについては プログラミングガイドライン で触れていますが、ここであらためて強調しておきます。なぜかというと、いくつかのサンプルコード、例えば multiprocessing.pool.Pool のサンプルはインタラクティブシェル上では動作しないからです。以下に例を示します:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(このサンプルを試すと、3つのトレースバックすべてがほぼランダムに交互に重なって表示されます。そうなったら、なんとかしてマスタープロセスを止めましょう。)

17.2.2. リファレンス

multiprocessing パッケージは threading モジュールの API とほとんど同じです。

17.2.2.1. Process クラスと例外

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process オブジェクトは各プロセスの処理を表します。 Process クラスは threading.Thread クラスのすべてのメソッドと同じインタフェースを提供します。

コンストラクターは必ずキーワード引数で呼び出すべきです。引数 group には必ず None を渡してください。 この引数は threading.Thread クラスとの互換性のためだけに残されています。引数 target には、 run() メソッドから呼び出される callable オブジェクトを渡します。この引数はデフォルトで None となっており、何も呼び出されません。引数 name にはプロセス名を渡します (詳細は name を見てください)。 args は対象の呼び出しに対する引数のタプルを渡します。 kwargs は対象の呼び出しに対するキーワード引数の辞書を渡します。もし提供されれば、キーワードのみの daemon 引数はプロセスの daemon フラグを True または False にセットします。 None の場合 (デフォルト)、このフラグは作成するプロセスから継承されます。

デフォルトでは、target には引数が渡されないようになっています。

サブクラスがコンストラクターをオーバーライドする場合は、そのプロセスに対する処理を行う前に基底クラスのコンストラクター (Process.__init__()) を実行しなければなりません。

バージョン 3.3 で変更: daemon 引数が追加されました。

run()

プロセスが実行する処理を表すメソッドです。

このメソッドはサブクラスでオーバーライドすることができます。標準の run() メソッドは、コンストラクターの target 引数として渡された呼び出し可能オブジェクトを呼び出します。もしコンストラクターに args もしくは kwargs 引数が渡されていれば、呼び出すオブジェクトにこれらの引数を渡します。

start()

プロセスの処理を開始するためのメソッドです。

各 Process オブジェクトに対し、このメソッドが2回以上呼び出されてはいけません。各プロセスでオブジェクトの run() メソッドを呼び出す準備を行います。

join([timeout])

オプションの引数 timeoutNone (デフォルト) の場合、 join() メソッドが呼ばれたプロセスは処理が終了するまでブロックします。 timeout が正の数である場合、最大 timeout 秒ブロックします。 プロセスが終了あるいはタイムアウトした場合、メソッドは None を返すことに注意してください。 プロセスの exitcode を確認し終了したかどうかを判断してください。

1つのプロセスは何回も join されることができます。

プロセスは自分自身を join することはできません。それはデッドロックを引き起こすことがあるからです。プロセスが start される前に join しようとするとエラーが発生します。

name

プロセスの名前。名前は識別のためだけに使用される文字列です。それ自体には特別な意味はありません。複数のプロセスに同じ名前が与えられても構いません。

最初の名前はコンストラクターによってセットされます。コンストラクターに明示的な名前が渡されない場合、 'Process-N1:N2:…:Nk' 形式の名前が構築されます。ここでそれぞれの Nk はその親のN番目の子供です。

is_alive()

プロセスが実行中かを判別します。

おおまかに言って、プロセスオブジェクトは start() メソッドを呼び出してから子プロセス終了までの期間が実行中となります。

daemon

デーモンプロセスであるかのフラグであり、ブール値です。この属性は start() が呼び出される前に設定されている必要があります。

初期値は作成するプロセスから継承します。

あるプロセスが終了するとき、そのプロセスはその子プロセスであるデーモンプロセスすべてを終了させようとします。

デーモンプロセスは子プロセスを作成できないことに注意してください。もし作成できてしまうと、そのデーモンプロセスの親プロセスが終了したときにデーモンプロセスの子プロセスが孤児になってしまう場合があるからです。さらに言えば、デーモンプロセスはUnix デーモンやサービスでは なく 通常のプロセスであり、非デーモンプロセスが終了すると終了されます (そして join されません)。

threading.Thread クラスの API に加えて Process クラスのオブジェクトには以下の属性およびメソッドがあります:

pid

プロセスIDを返します。プロセスの生成前は None が設定されています。

exitcode

子プロセスの終了コードです。子プロセスがまだ終了していない場合は None が返されます。負の値 -N は子プロセスがシグナル N で終了したことを表します。

authkey

プロセスの認証キーです (バイト文字列です)。

multiprocessing モジュールがメインプロセスにより初期化される場合には、 os.urandom() 関数を使用してランダムな値が設定されます。

Process クラスのオブジェクトの作成時にその親プロセスから認証キーを継承します。もしくは authkey に別のバイト文字列を設定することもできます。

詳細は 認証キー を参照してください。

sentinel

プロセスが終了するときに "ready" となるシステムオブジェクトの数値ハンドル。

multiprocessing.connection.wait() を使用していくつかのイベントを同時に wait したい場合はこの値を使うことができます。それ以外の場合は join() を呼ぶ方がより単純です。

Windows においては、これは WaitForSingleObject および WaitForMultipleObjects ファミリーの API 呼び出しで使用可能な OS ハンドルです。 Unix においては、これは select モジュールのプリミティブで使用可能なファイル記述子です。

バージョン 3.3 で追加.

terminate()

プロセスを終了します。Unix 環境では SIGTERM シグナルを、 Windows 環境では TerminateProcess() を使用して終了させます。終了ハンドラーや finally 節などは、実行されないことに注意してください。

このメソッドにより終了するプロセスの子孫プロセスは、終了 しません 。そういった子孫プロセスは単純に孤児になります。

警告

このメソッドの使用時に、関連付けられたプロセスがパイプやキューを使用している場合には、使用中のパイプやキューが破損して他のプロセスから使用できなくなる可能性があります。同様に、プロセスがロックやセマフォなどを取得している場合には、このプロセスが終了してしまうと他のプロセスのデッドロックの原因になるでしょう。

プロセスオブジェクトが作成したプロセスのみが start(), join(), is_alive(), terminate()exitcode のメソッドを呼び出すべきです。

以下の例では Process のメソッドの使い方を示しています:

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p, p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

すべての multiprocessing 例外の基底クラスです。

exception multiprocessing.BufferTooShort

この例外は Connection.recv_bytes_into() によって発生し、バッファーオブジェクトが小さすぎてメッセージが読み込めないことを示します。

eBufferTooShort のインスタンスであるとすると、 e.args[0] はそのメッセージをバイト文字列で与えるものです。

exception multiprocessing.AuthenticationError

認証エラーがあった場合に送出されます。

exception multiprocessing.TimeoutError

タイムアウトをサポートするメソッドでタイムアウトが過ぎたときに送出されます。

17.2.2.2. パイプ (Pipe) とキュー (Queue)

複数のプロセスを使う場合、一般的にはメッセージパッシングをプロセス間通信に使用し、ロックのような同期プリミティブを使用しないようにします。

メッセージのやりとりのために Pipe() (2つのプロセス間の通信用)、もしくはキュー (複数のメッセージ生成プロセス (producer)、消費プロセス (consumer) の実現用) を使うことができます。

Queue, SimpleQueueJoinableQueue 型は複数プロセスから生成/消費を行う FIFO キューです。これらのキューは標準ライブラリの queue.Queue を模倣しています。 Queue には Python 2.5 の queue.Queue クラスで導入された task_done()join() メソッドがないことが違う点です。

もし JoinableQueue を使用するなら、キューから削除される各タスクのために JoinableQueue.task_done() を呼び出さなければ なりません 。さもないと、いつか完了していないタスクを数えるためのセマフォがオーバーフローし、例外を発生させるでしょう。

管理オブジェクトを使用することで共有キューを作成できることも覚えておいてください。詳細は マネージャー を参照してください。

注釈

multiprocessing は、タイムアウトを伝えるために、通常の queue.Emptyqueue.Full 例外を使用します。それらは multiprocessing の名前空間では利用できないため、queue からインポートする必要があります。

注釈

オブジェクトがキューに追加される際、そのオブジェクトは pickle 化されています。そのため、バックグラウンドのスレッドが後になって下位層のパイプに pickle 化されたデータをフラッシュすることがあります。これにより、少し驚くような結果になりますが、実際に問題になることはないはずです。これが問題になるような状況では、かわりに manager を使ってキューを作成することができるからです。

  1. 空のキューの中にオブジェクトを追加した後、キューの empty() メソッドが False を返すまでの間にごくわずかな遅延が起きることがあり、get_nowait()queue.Empty を発生させることなく制御が呼び出し元に返ってしまうことがあります。
  2. 複数のプロセスがオブジェクトをキューに詰めている場合、キューの反対側ではオブジェクトが詰められたのとは違う順序で取得される可能性があります。ただし、同一のプロセスから詰め込まれたオブジェクトは、それらのオブジェクト間では、必ず期待どおりの順序になります。

警告

Queue を利用しようとしている最中にプロセスを Process.terminate()os.kill() で終了させる場合、キューにあるデータは破損し易くなります。終了した後で他のプロセスがキューを利用しようとすると、例外を発生させる可能性があります。

警告

上述したように、もし子プロセスがキューへ要素を追加するなら (かつ JoinableQueue.cancel_join_thread を使用しないなら) そのプロセスはバッファーされたすべての要素がパイプへフラッシュされるまで終了しません。

これは、そのプロセスを join しようとする場合、キューに追加されたすべての要素が消費されたことが確実でないかぎり、デッドロックを発生させる可能性があることを意味します。似たような現象で、子プロセスが非デーモンプロセスの場合、親プロセスは終了時に非デーモンのすべての子プロセスを join しようとしてハングアップする可能性があります。

マネージャーを使用して作成されたキューではこの問題はありません。詳細は プログラミングガイドライン を参照してください。

プロセス間通信におけるキューの使用例を知りたいなら 使用例 を参照してください。

multiprocessing.Pipe([duplex])

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

duplexTrue (デフォルト) ならパイプは双方向性です。duplexFalse ならパイプは一方向性で、conn1 はメッセージの受信専用、conn2 はメッセージの送信専用になります。

class multiprocessing.Queue([maxsize])

パイプや2~3個のロック/セマフォを使用して実装されたプロセス共有キューを返します。あるプロセスが最初に要素をキューへ追加するとき、バッファーからパイプの中へオブジェクトを転送する供給スレッドが開始されます。

標準ライブラリの queue モジュールの通常の queue.Emptyqueue.Full 例外がタイムアウトを伝えるために送出されます。

Queuetask_done()join() を除く queue.Queue のすべてのメソッドを実装します。

qsize()

おおよそのキューのサイズを返します。マルチスレッディング/マルチプロセスの特性上、この数値は信用できません。

これは sem_getvalue() が実装されていない Mac OS X のような Unix プラットホーム上で NotImplementedError を発生させる可能性があることを覚えておいてください。

empty()

キューが空っぽなら True を、そうでなければ False を返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。

full()

キューがいっぱいなら True を、そうでなければ False を返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。

put(obj[, block[, timeout]])

キューの中へ obj を追加します。オプションの引数 blockTrue (デフォルト) 且つ timeoutNone (デフォルト) なら、空きスロットが利用可能になるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に空きスロットが利用できなかったら queue.Full 例外を発生させます。それ以外 (blockFalse) で、空きスロットがすぐに利用可能な場合はキューに要素を追加します。そうでなければ queue.Full 例外が発生します(その場合 timeout は無視されます)。

put_nowait(obj)

put(obj, False) と等価です。

get([block[, timeout]])

キューから要素を取り出して削除します。オプションの引数 blockTrue (デフォルト) 且つ timeoutNone (デフォルト) なら、要素が取り出せるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に要素が取り出せなかったら queue.Empty 例外を発生させます。それ以外 (blockFalse) で、要素がすぐに取り出せる場合は要素を返します。そうでなければ queue.Empty 例外が発生します(その場合 timeout は無視されます)。

get_nowait()

get(False) と等価です。

multiprocessing.Queuequeue.Queue にはない追加メソッドがあります。 これらのメソッドは通常、ほとんどのコードに必要ありません:

close()

カレントプロセスからこのキューへそれ以上データが追加されないことを表します。バックグラウンドスレッドはパイプへバッファーされたすべてのデータをフラッシュするとすぐに終了します。これはキューがガベージコレクトされるときに自動的に呼び出されます。

join_thread()

バックグラウンドスレッドを join します。このメソッドは close() が呼び出された後でのみ使用されます。バッファーされたすべてのデータがパイプへフラッシュされるのを保証するため、バックグラウンドスレッドが終了するまでブロックします。

デフォルトでは、あるプロセスがキューを作成していない場合、終了時にキューのバックグラウンドスレッドを join しようとします。そのプロセスは join_thread() が何もしないように cancel_join_thread() を呼び出すことができます。

cancel_join_thread()

join_thread() がブロッキングするのを防ぎます。特にこれはバックグラウンドスレッドがそのプロセスの終了時に自動的に join されるのを防ぎます。詳細は join_thread() を参照してください。

このメソッドは allow_exit_without_flush() という名前のほうがよかったかもしれません。キューに追加されたデータが失われてしまいがちなため、このメソッドを使う必要はほぼ確実にないでしょう。本当にこれが必要になるのは、キューに追加されたデータを下位層のパイプにフラッシュすることなくカレントプロセスを直ちに終了する必要があり、かつ失われるデータに関心がない場合です。

注釈

このクラスに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォ (shared semaphore) を使用しているものがあります。これが使用できない場合には、このクラスが無効になり、 Queue をインスタンス化する時に ImportError が発生します。詳細は bpo-3770 を参照してください。同様のことが、以下に列挙されている特殊なキューでも成り立ちます。

class multiprocessing.SimpleQueue

単純化された Queue 型です。ロックされた Pipe と非常に似ています。

empty()

キューが空ならば True を、そうでなければ False を返します。

get()

キューから要素を削除して返します。

put(item)

item をキューに追加します。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueueQueue のサブクラスであり、 task_done()join() メソッドが追加されているキューです。

task_done()

以前にキューへ追加されたタスクが完了したことを表します。キューのコンシューマによって使用されます。 タスクをフェッチするために使用されるそれぞれの get() に対して、 後続の task_done() 呼び出しはタスクの処理が完了したことをキューへ伝えます。

もし join() がブロッキング状態なら、 すべての要素が処理されたときに復帰します( task_done() 呼び出しが すべての要素からキュー内へ put() されたと受け取ったことを意味します)。

キューにある要素より多く呼び出された場合 ValueError が発生します。

join()

キューにあるすべてのアイテムが取り出されて処理されるまでブロックします。

キューに要素が追加されると未完了タスク数が増えます。コンシューマがキューの要素が取り出されてすべての処理が完了したことを表す task_done() を呼び出すと数が減ります。 未完了タスク数がゼロになると join() はブロッキングを解除します。

17.2.2.3. その他

multiprocessing.active_children()

カレントプロセスのすべてのアクティブな子プロセスのリストを返します。

これを呼び出すと "join" してすでに終了しているプロセスには副作用があります。

multiprocessing.cpu_count()

システムの CPU 数を返します。

この数は現在のプロセスが使える CPU 数と同じものではありません。 使用可能な CPU 数は len(os.sched_getaffinity(0)) で取得できます。

NotImplementedError を送出するかもしれません。

multiprocessing.current_process()

カレントプロセスに対応する Process オブジェクトを返します。

threading.current_thread() とよく似た関数です。

multiprocessing.freeze_support()

multiprocessing を使用しているプログラムをフリーズして Windows の実行可能形式を生成するためのサポートを追加します。(py2exe , PyInstallercx_Freeze でテストされています。)

メインモジュールの if __name__ == '__main__' の直後にこの関数を呼び出す必要があります。以下に例を示します:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

もし freeze_support() の行がない場合、フリーズされた実行可能形式を実行しようとすると RuntimeError を発生させます。

freeze_support() の呼び出しは Windows 以外の OS では効果がありません。さらに、もしモジュールが Windows の通常の Python インタプリタによって実行されているならば(プログラムがフリーズされていなければ) freeze_support() は効果がありません。

multiprocessing.get_all_start_methods()

サポートしている開始方式のリストを返します。先頭の要素がデフォルトを意味します。利用可能な開始方式には 'fork''spawn' および 'forkserver' があります。Windows では 'spawn' のみが利用可能です。Unix では 'fork' および 'spawn' は常にサポートされており、'fork' がデフォルトになります。

バージョン 3.4 で追加.

multiprocessing.get_context(method=None)

multiprocessing モジュールと同じ属性を持つコンテキストオブジェクトを返します。

methodNone の場合、デフォルトのコンテキストが返されます。その他の場合 method'fork''spawn' あるいは 'forkserver' でなければなりません。指定された開始方式が利用できない場合は ValueError が送出されます。

バージョン 3.4 で追加.

multiprocessing.get_start_method(allow_none=False)

開始するプロセスで使用する開始方式名を返します。

開始方式がまだ確定しておらず、allow_none の値が偽の場合、開始方式はデフォルトに確定され、その名前が返されます。開始方式が確定しておらず、allow_none の値が真の場合、 None が返されます。

返り値は 'fork''spawn''forkserver' あるいは None になります。Unix では 'fork' が、Windows では 'spawn' がデフォルトになります。

バージョン 3.4 で追加.

multiprocessing.set_executable()

子プロセスを開始するときに、使用する Python インタープリターのパスを設定します。(デフォルトでは sys.executable が使用されます)。コードに組み込むときは、おそらく次のようにする必要があります

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

子プロセスを作成する前に行ってください。

バージョン 3.4 で変更: Unix で開始方式に 'spawn' を使用している場合にサポートされました。

multiprocessing.set_start_method(method)

子プロセスの開始方式を指定します。method には 'fork''spawn' あるいは 'forkserver' を指定できます。

これは一度しか呼び出すことができず、その場所もメインモジュールの if __name__ == '__main__' 節内で保護された状態でなければなりません。

バージョン 3.4 で追加.

17.2.2.4. Connection オブジェクト

Connection オブジェクトは pickle でシリアライズ可能なオブジェクトか文字列を送ったり、受け取ったりします。そういったオブジェクトはメッセージ指向の接続ソケットと考えられます。

Connection objects are usually created using Pipe() – see also リスナーとクライアント.

class multiprocessing.Connection
send(obj)

コネクションの相手側へ recv() を使用して読み込むオブジェクトを送ります。

オブジェクトは pickle でシリアライズ可能でなければなりません。 pickle が極端に大きすぎる (OS にも依りますが、およそ 32 MB+) と、 ValueError 例外が送出されることがあります。

recv()

コネクションの相手側から send() を使用して送られたオブジェクトを返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合 EOFError が発生します。

fileno()

コネクションが使用するハンドラーか、ファイル記述子を返します。

close()

コネクションをクローズします。

コネクションがガベージコレクトされるときに自動的に呼び出されます。

poll([timeout])

読み込み可能なデータがあるかどうかを返します。

timeout が指定されていなければすぐに返します。 timeout に数値を指定すると、最大指定した秒数をブロッキングします。 timeoutNone を指定するとタイムアウトせずにずっとブロッキングします。

multiprocessing.connection.wait() を使って複数のコネクションオブジェクトを同時にポーリングできることに注意してください。

send_bytes(buffer[, offset[, size]])

bytes-like object から完全なメッセージとしてバイトデータを送ります。

offset が指定されると buffer のその位置からデータが読み込まれます。 size が指定されるとバッファーからその量のデータが読み込まれます。非常に大きなバッファー (OS に依存しますが、およそ 32MB+) を指定すると、 ValueError 例外が発生するかもしれません。

recv_bytes([maxlength])

コネクションの相手側から送られたバイトデータの完全なメッセージを文字列として返します。何か受け取るまでブロックします。受け取るデータが何も残っておらず、相手側がコネクションを閉じていた場合、 EOFError が送出されます。

maxlength を指定していて、かつメッセージが maxlength より長い場合、 OSError が発生してコネクションからそれ以上読めなくなります。

バージョン 3.3 で変更: この関数は以前は IOError を送出していました。今では OSError の別名です。

recv_bytes_into(buffer[, offset])

コネクションの相手側から送られたバイトデータを buffer に読み込み、メッセージのバイト数を返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合 EOFError が発生します。

buffer は書き込み可能な bytes-like object でなければなりません。 offset が与えられたら、その位置からバッファーへメッセージが書き込まれます。 オフセットは buffer バイトよりも小さい正の数でなければなりません。

バッファーがあまりに小さいと BufferTooShort 例外が発生します。 e が例外インスタンスとすると完全なメッセージは e.args[0] で確認できます。

バージョン 3.3 で変更: Connection.send()Connection.recv() を使用して Connection オブジェクト自体をプロセス間で転送できるようになりました。

バージョン 3.3 で追加: Connection オブジェクトがコンテキストマネージメント・プロトコルをサポートするようになりました。 – コンテキストマネージャ型 を参照してください。 __enter__() は Connection オブジェクトを返します。また __exit__()close() を呼び出します。

例えば:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv() メソッドは受信したデータを自動的に unpickle 化します。それはメッセージを送ったプロセスが信頼できる場合を除いてセキュリティリスクになります。

そのため Pipe() を使用してコネクションオブジェクトを生成する場合を除いて、何らかの認証処理を実行した後で recv()send() メソッドのみを使用すべきです。詳細は 認証キー を参照してください。

警告

もしプロセスがパイプの読み込みまたは書き込み中に kill されると、メッセージの境界がどこなのか分からなくなってしまうので、そのパイプ内のデータは破損してしまいがちです。

17.2.2.5. 同期プリミティブ

一般的にマルチプロセスプログラムは、マルチスレッドプログラムほどは同期プリミティブを必要としません。詳細は threading モジュールのドキュメントを参照してください。

マネージャーオブジェクトを使用して同期プリミティブを作成できることも覚えておいてください。詳細は マネージャー を参照してください。

class multiprocessing.Barrier(parties[, action[, timeout]])

バリアーオブジェクト: threading.Barrier のクローンです。

バージョン 3.3 で追加.

class multiprocessing.BoundedSemaphore([value])

有限セマフォオブジェクト: threading.BoundedSemaphore の類似物です。

よく似た threading.BoundedSemaphore とは、次の一点だけ異なります。 acquire メソッドの第一引数名は block で、Lock.acquire() と一致しています。

注釈

Mac OS X では sem_getvalue() が実装されていないので Semaphore と区別がつきません。

class multiprocessing.Condition([lock])

状態変数: threading.Condition の別名です。

lock を指定するなら multiprocessingLockRLock オブジェクトにすべきです。

バージョン 3.3 で変更: wait_for() メソッドが追加されました。

class multiprocessing.Event

threading.Event のクローンです。

class multiprocessing.Lock

再帰しないロックオブジェクトで、 threading.Lock 相当のものです。プロセスやスレッドがロックをいったん獲得 (acquire) すると、それに続くほかのプロセスやスレッドが獲得しようとする際、それが解放 (release) されるまではブロックされます。解放はどのプロセス、スレッドからも行えます。スレッドに対して適用される threading.Lock のコンセプトと振る舞いは、特筆すべきものがない限り、プロセスとスレッドに適用される multiprocessing.Lock に引き継がれています。

Lock は実際にはファクトリ関数で、デフォルトコンテキストで初期化された multiprocessing.synchronize.Lock のインスタンスを返すことに注意してください。

Lockcontext manager プロトコルをサポートしています。つまり with 文で使うことができます。

acquire(block=True, timeout=None)

ブロックあり、またはブロックなしでロックを獲得します。

引数 blockTrue (デフォルト) に設定して呼び出した場合、ロックがアンロック状態になるまでブロックします。ブロックから抜けるとそれをロック状態にしてから True を返します。 threading.Lock.acquire() の最初の引数とは名前が違っているので注意してください。

引数 block の値を False にして呼び出すとブロックしません。 現在ロック状態であれば、直ちに False を返します。それ以外の場合には、ロックをロック状態にして True を返します。

timeout として正の浮動小数点数を与えて呼び出すと、ロックが獲得できない限り、指定された秒数だけブロックします。 timeout 値に負数を与えると、ゼロを与えた場合と同じになります。 timeout 値の None (デフォルト) を与えると、無限にブロックします。 timeout 引数の負数と None の扱いは、 threading.Lock.acquire() に実装された動作と異なるので注意してください。 blockFalse の場合、 timeout は実際的な意味を持たなくなるので無視されます。ロックを獲得した場合は True 、タイムアウトした場合は False で戻ります。

release()

ロックを解放します。これはロックを獲得したプロセスやスレッドだけでなく、任意のプロセスやスレッドから呼ぶことができます。

threading.Lock.release() と同じように振舞いますが、ロックされていない場合に呼び出すと ValueError となる点だけが違います。

class multiprocessing.RLock

再帰ロックオブジェクトで、 threading.RLock 相当のものです。再帰ロックオブジェクトはそれを獲得 (acquire) したプロセスやスレッドが解放 (release) しなければなりません。プロセスやスレッドがロックをいったん獲得すると、同じプロセスやスレッドはブロックされずに再度獲得出来ます。そのプロセスやスレッドは獲得した回数ぶん解放しなければなりません。

RLock は実際にはファクトリ関数で、デフォルトコンテキストで初期化された multiprocessing.synchronize.Lock のインスタンスを返すことに注意してください。

RLockcontext manager プロトコルをサポートしています。つまり with 文で使うことができます。

acquire(block=True, timeout=None)

ブロックあり、またはブロックなしでロックを獲得します。

block 引数を True にして呼び出した場合、ロックが既にカレントプロセスもしくはカレントスレッドが既に所有していない限りは、アンロック状態 (どのプロセス、スレッドも所有していない状態) になるまでブロックします。ブロックから抜けるとカレントプロセスもしくはカレントスレッドが (既に持っていなければ) 所有権を得て、再帰レベルをインクリメントし、 True で戻ります。 threading.RLock.acquire() の実装とはこの最初の引数の振る舞いが、その名前自身を始めとしていくつか違うので注意してください。

block 引数を False にして呼び出した場合、ブロックしません。ロックが他のプロセスもしくはスレッドにより獲得済み (つまり所有されている) であれば、カレントプロセスまたはカレントスレッドは所有権を得ず、再帰レベルも変更せずに、 False で戻ります。ロックがアンロック状態の場合、カレントプロセスもしくはカレントスレッドは所有権を得て再帰レベルがインクリメントされ、 True で戻ります。(—訳注: block の True/False 関係なくここでの説明では「所有権を持っている場合の2度目以降の aquire」の説明が欠けています。2度目以降の acquire では再帰レベルがインクリメントされて即座に返ります。全体読めばわかるとは思いますが一応。—)

timeout 引数の使い方と振る舞いは Lock.acquire() と同じです。 timeout 引数の振る舞いがいくつかの点で threading.RLock.acquire() と異なるので注意してください。

release()

再帰レベルをデクリメントしてロックを解放します。デクリメント後に再帰レベルがゼロになった場合、ロックの状態をアンロック (いかなるプロセス、いかなるスレッドにも所有されていない状態) にリセットし、ロックの状態がアンロックになるのを待ってブロックしているプロセスもしくはスレッドがある場合にはその中のただ一つだけが処理を進行できるようにします。デクリメント後も再帰レベルがゼロでない場合、ロックの状態はロックのままで、呼び出し側のプロセスもしくはスレッドに所有されたままになります。

このメソッドは呼び出しプロセスあるいはスレッドがロックを所有している場合に限り呼び出してください。所有者でないプロセスもしくはスレッドによって呼ばれるか、あるいはアンロック (未所有) 状態で呼ばれた場合、 AssertionError が送出されます。同じ状況での threading.RLock.release() 実装とは例外の型が異なるので注意してください。

class multiprocessing.Semaphore([value])

セマフォオブジェクト: threading.Semaphore のクローンです。

よく似た threading.BoundedSemaphore とは、次の一点だけ異なります。 acquire メソッドの第一引数名は block で、Lock.acquire() と一致しています。

注釈

Mac OS X では sem_timedwait がサポートされていないので、acquire() にタイムアウトを与えて呼ぶと、ループ内でスリープすることでこの関数がエミュレートされます。

注釈

メインスレッドが BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() 又は Condition.wait() を呼び出してブロッキング状態のときに Ctrl-C で生成される SIGINT シグナルを受け取ると、その呼び出しはすぐに中断されて KeyboardInterrupt が発生します。

これは同等のブロッキング呼び出しが実行中のときに SIGINT が無視される threading の振る舞いとは違っています。

注釈

このパッケージに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォを使用しているものがあります。これが使用できない場合には、multiprocessing.synchronize モジュールが無効になり、このモジュールのインポート時に ImportError が発生します。詳細は bpo-3770 を参照してください。

17.2.2.6. 共有 ctypes オブジェクト

子プロセスにより継承される共有メモリを使用する共有オブジェクトを作成することができます。

multiprocessing.Value(typecode_or_type, *args, lock=True)

共有メモリから割り当てられた ctypes オブジェクトを返します。 デフォルトでは、返り値は実際のオブジェクトの同期ラッパーです。オブジェクトそれ自身は、 Valuevalue 属性によってアクセスできます。

typecode_or_type は返されるオブジェクトの型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードかのどちらか一方です。 *args は型のコンストラクターへ渡されます。

lockTrue (デフォルト) なら、値へ同期アクセスするために新たに再帰的なロックオブジェクトが作成されます。 lockLockRLock なら値への同期アクセスに使用されます。 lockFalse なら、返されたオブジェクトへのアクセスはロックにより自動的に保護されません。そのため、必ずしも "プロセスセーフ" ではありません。

+= のような演算は、読み込みと書き込みを含むためアトミックでありません。このため、たとえば自動的に共有の値を増加させたい場合、以下のようにするのでは不十分です

counter.value += 1

関連するロックが再帰的 (それがデフォルトです) なら、かわりに次のようにします

with counter.get_lock():
    counter.value += 1

lock はキーワード引数でのみ指定することに注意してください。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

共有メモリから割り当てられた ctypes 配列を返します。デフォルトでは、返り値は実際の配列の同期ラッパーです。

typecode_or_type は返される配列の要素の型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードかのどちらか一方です。 size_or_initializer が整数なら、配列の長さを決定し、その配列はゼロで初期化されます。別の使用方法として size_or_initializer は配列の初期化に使用されるシーケンスになり、そのシーケンス長が配列の長さを決定します。

lockTrue (デフォルト) なら、値へ同期アクセスするために新たなロックオブジェクトが作成されます。 lockLockRLock なら値への同期アクセスに使用されます。 lockFalse なら、返されたオブジェクトへのアクセスはロックにより自動的に保護されません。そのため、必ずしも "プロセスセーフ" ではありません。

lock はキーワード引数としてのみ利用可能なことに注意してください。

ctypes.c_char の配列は文字列を格納して取り出せる valueraw 属性を持っていることを覚えておいてください。

17.2.2.6.1. multiprocessing.sharedctypes モジュール

multiprocessing.sharedctypes モジュールは子プロセスに継承される共有メモリの ctypes オブジェクトを割り当てる関数を提供します。

注釈

共有メモリのポインターを格納することは可能ではありますが、特定プロセスのアドレス空間の位置を参照するということを覚えておいてください。しかし、そのポインターは別のプロセスのコンテキストにおいて無効になる確率が高いです。そして、別のプロセスからそのポインターを逆参照しようとするとクラッシュを引き起こす可能性があります。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

共有メモリから割り当てられた ctypes 配列を返します。

typecode_or_type は返される配列の要素の型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードのどちらか一方です。 size_or_initializer が整数なら、それが配列の長さになり、その配列はゼロで初期化されます。別の使用方法として size_or_initializer には配列の初期化に使用されるシーケンスを設定することもでき、その場合はシーケンスの長さが配列の長さになります。

要素を取得したり設定したりすることは潜在的に非アトミックであることに注意してください。ロックを使用して自動的に同期化されたアクセスを保証するには Array() を使用してください。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

共有メモリから割り当てられた ctypes オブジェクトを返します。

typecode_or_type は返されるオブジェクトの型を決めます。それは ctypes の型か array モジュールで使用されるような1文字の型コードかのどちらか一方です。 *args は型のコンストラクターへ渡されます。

値を取得したり設定したりすることは潜在的に非アトミックであることに注意してください。ロックを使用して自動的に同期化されたアクセスを保証するには Value() を使用してください。

ctypes.c_char の配列は文字列を格納して取り出せる valueraw 属性を持っていることを覚えておいてください。詳細は ctypes を参照してください。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

RawArray() と同様ですが、 lock の値によっては ctypes 配列をそのまま返す代わりに、プロセスセーフな同期ラッパーが返されます。

lockTrue (デフォルト) なら、値へ同期アクセスするために新たな ロックオブジェクトが作成されます。 lockLockRLock なら値への同期アクセスに使用されます。 lockFalse なら、返された オブジェクトへのアクセスはロックにより自動的に保護されません。 そのため、必ずしも "プロセスセーフ" ではありません。

lock はキーワード引数でのみ指定することに注意してください。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

RawValue() と同様ですが、 lock の値によっては ctypes オブジェクトをそのまま返す代わりに、プロセスセーフな同期ラッパーが返されます。

lockTrue (デフォルト) なら、値へ同期アクセスするために新たな ロックオブジェクトが作成されます。 lockLockRLock なら値への同期アクセスに使用されます。 lockFalse なら、返された オブジェクトへのアクセスはロックにより自動的に保護されません。 そのため、必ずしも "プロセスセーフ" ではありません。

lock はキーワード引数でのみ指定することに注意してください。

multiprocessing.sharedctypes.copy(obj)

共有メモリから割り当てられた ctypes オブジェクト obj をコピーしたオブジェクトを返します。

multiprocessing.sharedctypes.synchronized(obj[, lock])

同期アクセスに lock を使用する ctypes オブジェクトのためにプロセスセーフなラッパーオブジェクトを返します。 lockNone (デフォルト) なら、 multiprocessing.RLock オブジェクトが自動的に作成されます。

同期ラッパーがラップするオブジェクトに加えて2つのメソッドがあります。 get_obj() はラップされたオブジェクトを返します。 get_lock() は同期のために使用されるロックオブジェクトを返します。

ラッパー経由で ctypes オブジェクトにアクセスすることは raw ctypes オブジェクトへアクセスするよりずっと遅くなることに注意してください。

バージョン 3.5 で変更: synchronized オブジェクトは コンテクストマネージャ プロトコルをサポートしています。

次の表は通常の ctypes 構文で共有メモリから共有 ctypes オブジェクトを作成するための構文を比較します。 (MyStruct テーブル内には ctypes.Structure のサブクラスがあります。)

ctypes type を使用する sharedctypes typecode を使用する sharedctypes
c_double(2.4) RawValue(c_double, 2.4) RawValue('d', 2.4)
MyStruct(4, 6) RawValue(MyStruct, 4, 6)  
(c_short * 7)() RawArray(c_short, 7) RawArray('h', 7)
(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray('i', (9, 2, 8))

以下に子プロセスが多くの ctypes オブジェクトを変更する例を紹介します:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

結果は以下のように表示されます

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

17.2.2.7. マネージャー

マネージャーは異なるプロセス間で共有されるデータの作成方法を提供します。これには別のマシン上で走るプロセス間のネットワーク越しの共有も含まれます。マネージャーオブジェクトは 共有オブジェクト を管理するサーバープロセスを制御します。他のプロセスはプロキシ経由で共有オブジェクトへアクセスすることができます。

multiprocessing.Manager()

プロセス間でオブジェクトを共有するために使用される SyncManager オブジェクトを返します。返されたマネージャーオブジェクトは生成される子プロセスに対応付けられ、共有オブジェクトを作成するメソッドや、共有オブジェクトに対応するプロキシを返すメソッドを持ちます。

マネージャープロセスは親プロセスが終了するか、ガベージコレクトされると停止します。マネージャークラスは multiprocessing.managers モジュールで定義されています:

class multiprocessing.managers.BaseManager([address[, authkey]])

BaseManager オブジェクトを作成します。

作成後、start() または get_server().serve_forever() を呼び出して、マネージャーオブジェクトが、開始されたマネージャープロセスを確実に参照するようにしてください。

address はマネージャープロセスが新たなコネクションを待ち受けるアドレスです。addressNone の場合、任意のアドレスが設定されます。

authkey はサーバープロセスへ接続しようとするコネクションの正当性を検証するために 使用される認証キーです。authkeyNone の場合 current_process().authkey が使用されます。authkey を使用する場合はバイト文字列でなければなりません。

start([initializer[, initargs]])

マネージャーを開始するためにサブプロセスを開始します。initializerNone でなければ、サブプロセスは開始時に initializer(*initargs) を呼び出します。

get_server()

マネージャーの制御下にある実際のサーバーを表す Server オブジェクトを返します。 Server オブジェクトは serve_forever() メソッドをサポートします:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server はさらに address 属性も持っています。

connect()

ローカルからリモートのマネージャーオブジェクトへ接続します:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc')
>>> m.connect()
shutdown()

マネージャーが使用するプロセスを停止します。これはサーバープロセスを開始するために start() が使用された場合のみ有効です。

これは複数回呼び出すことができます。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

マネージャークラスで呼び出し可能オブジェクト(callable)や型を登録するために使用されるクラスメソッドです。

typeid は特に共有オブジェクトの型を識別するために使用される "型識別子" です。これは文字列でなければなりません。

callable はこの型識別子のオブジェクトを作成するために使用される呼び出し可能オブジェクトです。マネージャーインスタンスが connect() メソッドを使ってサーバーに接続されているか、 create_method 引数が False の場合は、 None でも構いません。

proxytype はこの typeid で共有オブジェクトのプロキシを作成するために使用される BaseProxy のサブクラスです。 None の場合、プロキシクラスは自動的に作成されます。

exposedBaseProxy._callmethod() を使用したアクセスが許されるべき typeid をプロキシするメソッド名のシーケンスを指定するために使用されます (exposedNone の場合 proxytype._exposed_ が存在すればそれが代わりに使用されます)。exposed リストが指定されない場合、共有オブジェクトのすべての "パブリックメソッド" がアクセス可能になります。 (ここでいう "パブリックメソッド" とは __call__() メソッドを持つものと名前が '_' で始まらないあらゆる属性を意味します。)

method_to_typeid はプロキシが返す exposed メソッドの返り値の型を指定するために使用されるマッピングで、メソッド名を typeid 文字列にマップします。 (method_to_typeidNone の場合 proxytype._method_to_typeid_ が存在すれば、それが代わりに使用されます。) メソッド名がこのマッピングのキーではないか、マッピングが None の場合、そのメソッドによって返されるオブジェクトが値として (by value) コピーされます。

create_method は、共有オブジェクトを作成し、それに対するプロキシを返すようサーバープロセスに伝える、名前 typeid のメソッドを作成するかを決定します。デフォルトでは True です。

BaseManager インスタンスも読み出し専用属性を1つ持っています:

address

マネージャーが使用するアドレスです。

バージョン 3.3 で変更: マネージャーオブジェクトはコンテキストマネージメント・プロトコルをサポートします – コンテキストマネージャ型 を参照してください。 __enter__() は (まだ開始していない場合) サーバープロセスを開始してから、マネージャーオブジェクトを返します。 __exit__()shutdown() を呼び出します。

旧バージョンでは、 __enter__() はマネージャーのサーバープロセスがまだ開始していなかった場合でもプロセスを開始しませんでした。

class multiprocessing.managers.SyncManager

プロセス間の同期のために使用される BaseManager のサブクラスです。 multiprocessing.Manager() はこの型のオブジェクトを返します。

Its methods create and return Proxy オブジェクト for a number of commonly used data types to be synchronized across processes. This notably includes shared lists and dictionaries.

Barrier(parties[, action[, timeout]])

共有 threading.Barrier オブジェクトを作成して、そのプロキシを返します。

バージョン 3.3 で追加.

BoundedSemaphore([value])

共有 threading.BoundedSemaphore オブジェクトを作成して、そのプロキシを返します。

Condition([lock])

共有 threading.Condition オブジェクトを作成して、そのプロキシを返します。

lock が提供される場合 threading.Lockthreading.RLock オブジェクトのためのプロキシになります。

バージョン 3.3 で変更: wait_for() メソッドが追加されました。

Event()

共有 threading.Event オブジェクトを作成して、そのプロキシを返します。

Lock()

共有 threading.Lock オブジェクトを作成して、そのプロキシを返します。

Namespace()

共有 Namespace オブジェクトを作成して、そのプロキシを返します。

Queue([maxsize])

共有 queue.Queue オブジェクトを作成して、そのプロキシを返します。

RLock()

共有 threading.RLock オブジェクトを作成して、そのプロキシを返します。

Semaphore([value])

共有 threading.Semaphore オブジェクトを作成して、そのプロキシを返します。

Array(typecode, sequence)

配列を作成して、そのプロキシを返します。

Value(typecode, value)

書き込み可能な value 属性を作成して、そのプロキシを返します。

dict()
dict(mapping)
dict(sequence)

共有 dict オブジェクトを作成して、そのプロキシを返します。

list()
list(sequence)

共有 list オブジェクトを作成して、そのプロキシを返します。

バージョン 3.6 で変更: 共有オブジェクトは入れ子もできます。 例えば、共有リストのような共有コンテナオブジェクトは、 SyncManager がまとめて管理し同期を取っている他の共有オブジェクトを保持できます。

class multiprocessing.managers.Namespace

SyncManager に登録することのできる型です。

Namespace オブジェクトにはパブリックなメソッドはありませんが、書き込み可能な属性を持ちます。そのオブジェクト表現はその属性の値を表示します。

しかし、Namespace オブジェクトのためにプロキシを使用するとき '_' が先頭に付く属性はプロキシの属性になり、参照対象の属性にはなりません:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

17.2.2.7.1. カスタマイズされたマネージャー

独自のマネージャーを作成するには、BaseManager のサブクラスを作成して、 マネージャークラスで呼び出し可能なオブジェクトか新たな型を登録するために register() クラスメソッドを使用します。例えば:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

17.2.2.7.2. リモートマネージャーを使用する

あるマシン上でマネージャーサーバーを実行して、他のマシンからそのサーバーを使用するクライアントを持つことができます(ファイアウォールを通過できることが前提)。

次のコマンドを実行することでリモートクライアントからアクセスを受け付ける1つの共有キューのためにサーバーを作成します:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

あるクライアントからサーバーへのアクセスは次のようになります:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

別のクライアントもそれを使用することができます:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

ローカルプロセスもそのキューへアクセスすることができます。クライアント上で上述のコードを使用してアクセスします:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

17.2.2.8. Proxy オブジェクト

プロキシは別のプロセスで(おそらく)有効な共有オブジェクトを 参照する オブジェクトです。共有オブジェクトはプロキシの 参照対象 になるということができます。複数のプロキシオブジェクトが同じ参照対象を持つ可能性もあります。

プロキシオブジェクトはその参照対象の対応するメソッドを呼び出すメソッドを持ちます (そうは言っても、参照対象のすべてのメソッドが必ずしもプロキシ経由で利用可能なわけではありません)。 この方法で、プロキシオブジェクトはまるでその参照先と同じように使えます:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

プロキシに str() を適用すると参照対象のオブジェクト表現を返すのに対して、 repr() を適用するとプロキシのオブジェクト表現を返すことに注意してください。

プロキシオブジェクトの重要な機能は pickle 化ができることで、これによりプロセス間での受け渡しができます。 そのため、参照対象が Proxy オブジェクト を持てます。 これによって管理されたリスト、辞書、その他 Proxy オブジェクト をネストできます:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

同様に、辞書とリストのプロキシも他のプロキシの内部に入れてネストできます:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

(プロキシでない) 標準の list オブジェクトや dict オブジェクトが参照対象に含まれていた場合、それらの可変な値の変更はマネージャーからは伝搬されません。 というのも、プロキシには参照対象の中に含まれる値がいつ変更されたかを知る術が無いのです。 しかし、コンテナプロキシに値を保存する (これはプロキシオブジェクトの __setitem__ を起動します) 場合はマネージャーを通して変更が伝搬され、その要素を実際に変更するために、コンテナプロキシに変更後の値が再代入されます:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

This approach is perhaps less convenient than employing nested Proxy オブジェクト for most use cases but also demonstrates a level of control over the synchronization.

注釈

multiprocessing のプロキシ型は値による比較に対して何もサポートしません。そのため、例えば以下のようになります:

>>> manager.list([1,2,3]) == [1,2,3]
False

比較を行いたいときは参照対象のコピーを使用してください。

class multiprocessing.managers.BaseProxy

プロキシオブジェクトは BaseProxy のサブクラスのインスタンスです。

_callmethod(methodname[, args[, kwds]])

プロキシの参照対象のメソッドの実行結果を返します。

proxy がプロキシで、プロキシ内の参照対象が obj ならこの式

proxy._callmethod(methodname, args, kwds)

はこの式を評価します

getattr(obj, methodname)(*args, **kwds)

(マネージャープロセス内の)。

返される値はその呼び出し結果のコピーか、新たな共有オブジェクトに対するプロキシになります。詳細は BaseManager.register()method_to_typeid 引数のドキュメントを参照してください。

その呼び出しによって例外が発生した場合、_callmethod() によってその例外は再送出されます。他の例外がマネージャープロセスで発生したなら、RemoteError 例外に変換されたものが _callmethod() によって送出されます。

特に methodname公開 されていない場合は例外が発生することに注意してください。

_callmethod() の使用例になります:

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

参照対象のコピーを返します。

参照対象が unpickle 化できるなら例外を発生します。

__repr__()

プロキシオブジェクトのオブジェクト表現を返します。

__str__()

参照対象のオブジェクト表現を返します。

17.2.2.8.1. クリーンアップ

プロキシオブジェクトは弱参照(weakref)コールバックを使用します。プロキシオブジェクトがガベージコレクトされるときにその参照対象が所有するマネージャーからその登録を取り消せるようにするためです。

共有オブジェクトはプロキシが参照しなくなったときにマネージャープロセスから削除されます。

17.2.2.9. プロセスプール

Pool クラスでタスクを実行するプロセスのプールを作成することができます。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

プロセスプールオブジェクトは、ジョブを送り込めるワーカープロセスのプールを制御します。タイムアウトやコールバックのある非同期の実行をサポートし、並列 map 実装を持ちます。

processes は使用するワーカープロセスの数です。processesNone の場合 os.cpu_count() が返す値を使用します。

initializerNone ではない場合、各ワーカープロセスは開始時に initializer(*initargs) を呼び出します。

maxtasksperchild は、ワーカープロセスが exit して新たなワーカープロセスと置き替えられるまでの間に、ワーカープロセスが完了することのできるタスクの数です。この設定により未利用のリソースが解放されるようなります。デフォルトの maxtasksperchildNone で、これはワーカープロセスがプールと同じ期間だけ生き続けるということを意味します。

context はワーカープロセスを開始するために使用されるコンテキストの指定に使用できます。通常プールは関数 multiprocessing.Pool() かコンテキストオブジェクトの Pool() メソッドを使用して作成されます。どちらの場合でも context は適切に設定されます。

プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。

バージョン 3.2 で追加: maxtasksperchild

バージョン 3.4 で追加: context

注釈

Pool 中のワーカープロセスは、典型的にはプールのワークキューの存続期間とちょうど同じだけ生き続けます。ワーカーに確保されたリソースを解放するために (Apache, mod_wsgi, などのような) 他のシステムによく見られるパターンは、プール内のワーカーが設定された量だけの仕事を完了したら exit とクリーンアップを行い、古いプロセスを置き換えるために新しいプロセスを生成するというものです。 Poolmaxtasksperchild 引数は、この能力をエンドユーザーに提供します。

apply(func[, args[, kwds]])

引数 args とキーワード引数 kwds を伴って func を呼びます。結果が準備できるまでブロックします。このブロックがあるため、 apply_async() の方が並行作業により適しています。加えて、 func は、プール内の1つのワーカーだけで実行されます。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

apply() メソッドの派生版で結果オブジェクトを返します。

callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。結果を返せるようになったときに callback が結果オブジェクトに対して適用されます。ただし呼び出しが失敗した場合は、代わりに error_callback が適用されます。

error_callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。対象の関数が失敗した場合、例外インスタンスを伴って error_callback が呼ばれます。

コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。

map(func, iterable[, chunksize])

map() 組み込み関数の並列版です (iterable な引数を1つだけサポートするという違いはありますが)。結果が出るまでブロックします。

このメソッドはイテラブルをいくつものチャンクに分割し、プロセスプールにそれぞれ独立したタスクとして送ります。(概算の) チャンクサイズは chunksize を正の整数に設定することで指定できます。

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

map() メソッドの派生版で結果オブジェクトを返します。

callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。結果を返せるようになったときに callback が結果オブジェクトに対して適用されます。ただし呼び出しが失敗した場合は、代わりに error_callback が適用されます。

error_callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。対象の関数が失敗した場合、例外インスタンスを伴って error_callback が呼ばれます。

コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。

imap(func, iterable[, chunksize])

map() の遅延評価版です。

chunksize 引数は map() メソッドで使用されるものと同じです。 引数 iterable がとても長いなら chunksize に大きな値を指定して使用する方がデフォルト値の 1 を使用するよりもジョブの完了が かなり 速くなります。

また chunksize1 の場合 imap() メソッドが返すイテレーターの next() メソッドはオプションで timeout パラメーターを持ちます。 next(timeout) は、その結果が timeout 秒以内に返されないときに multiprocessing.TimeoutError を発生させます。

imap_unordered(func, iterable[, chunksize])

イテレーターが返す結果の順番が任意の順番で良いと見なされることを除けば imap() と同じです。 (ワーカープロセスが1つしかない場合のみ "正しい" 順番になることが保証されます。)

starmap(func, iterable[, chunksize])

iterable の要素が、引数として unpack されるイテレート可能オブジェクトであると期待される以外は、 map() と似ています。

そのため、iterable[(1,2), (3, 4)] なら、結果は [func(1,2), func(3,4)] になります。

バージョン 3.3 で追加.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

starmap()map_async() の組み合わせです。 イテレート可能オブジェクトの iterable をイテレートして、 unpack したイテレート可能オブジェクトを伴って func を呼び出します。結果オブジェクトを返します。

バージョン 3.3 で追加.

close()

これ以上プールでタスクが実行されないようにします。すべてのタスクが完了した後でワーカープロセスが終了します。

terminate()

実行中の処理を完了させずにワーカープロセスをすぐに停止します。プールオブジェクトがガベージコレクトされるときに terminate() が呼び出されます。

join()

ワーカープロセスが終了するのを待ちます。 join() を使用する前に close()terminate() を呼び出さなければなりません。

バージョン 3.3 で追加: Pool オブジェクトがコンテキストマネージメント・プロトコルをサポートするようになりました。 – コンテキストマネージャ型 を参照してください。 __enter__() は Pool オブジェクトを返します。また __exit__()terminate() を呼び出します。

class multiprocessing.pool.AsyncResult

Pool.apply_async()Pool.map_async() で返される結果のクラスです。

get([timeout])

結果を受け取ったときに返します。 timeoutNone ではなくて、その結果が timeout 秒以内に受け取れない場合 multiprocessing.TimeoutError が発生します。リモートの呼び出しが例外を発生させる場合、その例外は get() が再発生させます。

wait([timeout])

その結果が有効になるか timeout 秒経つまで待ちます。

ready()

その呼び出しが完了しているかどうかを返します。

successful()

その呼び出しが例外を発生させることなく完了したかどうかを返します。その結果が返せる状態でない場合 AssertionError が発生します。

次の例はプールの使用例を紹介します:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

17.2.2.10. リスナーとクライアント

Usually message passing between processes is done using queues or by using Connection objects returned by Pipe().

しかし multiprocessing.connection モジュールにはさらに柔軟な仕組みがあります。 このモジュールは、基本的にはソケットもしくは Windows の名前付きパイプを扱う高レベルのメッセージ指向 API を提供します。また、 hmac モジュールを使用した ダイジェスト認証 や同時の複数接続のポーリングもサポートします。

multiprocessing.connection.deliver_challenge(connection, authkey)

ランダム生成したメッセージをコネクションの相手側へ送信して応答を待ちます。

その応答がキーとして authkey を使用するメッセージのダイジェストと一致する場合、 コネクションの相手側へ歓迎メッセージを送信します。 そうでなければ AuthenticationError を発生させます。

multiprocessing.connection.answer_challenge(connection, authkey)

メッセージを受信して、そのキーとして authkey を使用するメッセージのダイジェストを計算し、ダイジェストを送り返します。

歓迎メッセージを受け取れない場合 AuthenticationError が発生します。

multiprocessing.connection.Client(address[, family[, authkey]])

Attempt to set up a connection to the listener which is using address address, returning a Connection.

コネクション種別は family 引数で決定しますが、一般的には address のフォーマットから推測できるので、これは指定されません。 (アドレスフォーマット を参照してください)

If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See 認証キー.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

コネクションを '待ち受ける' 束縛されたソケットか Windows の名前付きパイプのラッパーです。

address はリスナーオブジェクトの束縛されたソケットか名前付きパイプが使用するアドレスです。

注釈

'0.0.0.0' のアドレスを使用する場合、Windows 上の終点へ接続することができません。終点へ接続したい場合は '127.0.0.1' を使用すべきです。

family は使用するソケット(名前付きパイプ)の種別です。これは 'AF_INET' (TCP ソケット), 'AF_UNIX' (Unix ドメインソケット) または 'AF_PIPE' (Windows 名前付きパイプ) という文字列のどれか1つになります。これらのうち 'AF_INET' のみが利用可能であることが保証されています。 familyNone の場合 address のフォーマットから推測されたものが使用されます。 addressNone の場合はデフォルトが選択されます。詳細は アドレスフォーマット を参照してください。 family'AF_UNIX'addressNone の場合 tempfile.mkstemp() を使用して作成されたプライベートな一時ディレクトリにソケットが作成されます。

リスナーオブジェクトがソケットを使用する場合、ソケットに束縛されるときに backlog (デフォルトでは1つ) がソケットの listen() メソッドに対して渡されます。

If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See 認証キー.

accept()

Accept a connection on the bound socket or named pipe of the listener object and return a Connection object. If authentication is attempted and fails, then AuthenticationError is raised.

close()

リスナーオブジェクトの名前付きパイプか束縛されたソケットをクローズします。これはリスナーがガベージコレクトされるときに自動的に呼ばれます。そうは言っても、明示的に close() を呼び出す方が望ましいです。

リスナーオブジェクトは次の読み出し専用属性を持っています:

address

リスナーオブジェクトが使用中のアドレスです。

last_accepted

最後にコネクションを受け付けたアドレスです。有効なアドレスがない場合は None になります。

バージョン 3.3 で追加: Listener オブジェクトがコンテキストマネージメント・プロトコルをサポートするようになりました。 – コンテキストマネージャ型 を参照してください。 __enter__() はリスナーオブジェクトを返します。また __exit__()close() を呼び出します。

multiprocessing.connection.wait(object_list, timeout=None)

object_list 中のオブジェクトが準備ができるまで待機します。準備ができた object_list 中のオブジェクトのリストを返します。timeout が浮動小数点なら、最大でその秒数だけ呼び出しがブロックします。timeoutNone の場合、無制限の期間ブロックします。負のタイムアウトは0と等価です。

Unix と Windows の両方で、 object_list には以下のオブジェクトを含めることが出来ます

読み取ることのできるデータがある場合、あるいは相手側の端が閉じられている場合、コネクションまたはソケットオブジェクトは準備ができています。

Unix: wait(object_list, timeout)select.select(object_list, [], [], timeout) とほとんど等価です。違いは、 select.select() がシグナルによって中断される場合、 EINTR のエラー番号付きで OSError を上げるということです。 wait() はそのようなことは行いません。

Windows: object_list の要素は、 (Win32 関数 WaitForMultipleObjects() のドキュメントで使われている定義から) wait 可能な整数ハンドルか、ソケットハンドルまたはパイプハンドルを返す fileno() メソッドを持つオブジェクトのどちらかでなければなりません。 (パイプハンドルとソケットハンドラーは wait 可能なハンドルでは ない ことに注意してください。)

バージョン 3.3 で追加.

次のサーバーコードは認証キーとして 'secret password' を使用するリスナーを作成します。このサーバーはコネクションを待ってクライアントへデータを送信します:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

次のコードはサーバーへ接続して、サーバーからデータを受信します:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

次のコードは wait() を使って複数のプロセスからのメッセージを同時に待ちます:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

17.2.2.10.1. アドレスフォーマット

  • 'AF_INET' アドレスは (hostname, port) のタプルになります。 hostname は文字列で port は整数です。
  • 'AF_UNIX' アドレスはファイルシステム上のファイル名の文字列です。
  • 'AF_PIPE' アドレスは、次の形式を持つ文字列です
    r'\\.\pipe\PipeName'ServerName という名前のリモートコンピューター上の名前付きパイプに接続するために Client() を使用するには、代わりに r'\\ServerName\pipe\PipeName' 形式のアドレスを使用する必要があります。

デフォルトでは、2つのバックスラッシュで始まる文字列は 'AF_UNIX' よりも 'AF_PIPE' として推測されることに注意してください。

17.2.2.11. 認証キー

When one uses Connection.recv, the data received is automatically unpickled. Unfortunately unpickling data from an untrusted source is a security risk. Therefore Listener and Client() use the hmac module to provide digest authentication.

認証キーはパスワードとして見なされるバイト文字列です。コネクションが確立すると、双方の終点で正しい接続先であることを証明するために 知っているお互いの認証キーを要求します。(双方の終点が同じキーを使用して通信しようとしても、コネクション上でそのキーを送信することは できません。)

認証が要求されているにもかかわらず認証キーが指定されていない場合 current_process().authkey の返す値が使用されます。 (詳細は Process を参照してください。) この値はカレントプロセスを作成する Process オブジェクトによって自動的に継承されます。 これは(デフォルトでは)複数プロセスのプログラムの全プロセスが相互にコネクションを 確立するときに使用される1つの認証キーを共有することを意味します。

適当な認証キーを os.urandom() を使用して生成することもできます。

17.2.2.12. ログ記録

ロギングのためにいくつかの機能が利用可能です。しかし logging パッケージは、 (ハンドラー種別に依存して)違うプロセスからのメッセージがごちゃ混ぜになるので、プロセスの共有ロックを使用しないことに注意してください。

multiprocessing.get_logger()

multiprocessing が使用するロガーを返します。必要に応じて新たなロガーを作成します。

最初に作成するとき、ロガーはレベルに logging.NOTSET が設定されていてデフォルトハンドラーがありません。このロガーへ送られるメッセージはデフォルトではルートロガーへ伝播されません。

Windows 上では子プロセスが親プロセスのロガーレベルを継承しないことに注意してください。さらにその他のロガーのカスタマイズ内容もすべて継承されません。

multiprocessing.log_to_stderr()

この関数は get_logger() に対する呼び出しを実行しますが、 get_logger によって作成されるロガーを返すことに加えて、 '[%(levelname)s/%(processName)s] %(message)s' のフォーマットを使用して sys.stderr へ出力を送るハンドラーを追加します。

以下にロギングを有効にした例を紹介します:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

完全なロギングレベルの表については logging モジュールを参照してください。

17.2.2.13. multiprocessing.dummy モジュール

multiprocessing.dummymultiprocessing の API を複製しますが threading モジュールのラッパーでしかありません。

17.2.3. プログラミングガイドライン

multiprocessing を使用するときに守るべき一定のガイドラインとイディオムを挙げます。

17.2.3.1. すべての開始方式について

以下はすべての開始方式に当てはまります。

共有状態を避ける

できるだけプロセス間で巨大なデータを移動することは避けるようにすべきです。

プロセス間の通信には、threading モジュールの低レベルな同期プリミティブを使うのではなく、キューやパイプを使うのが良いでしょう。

pickle 化の可能性

プロキシのメソッドへの引数は、 pickle 化できるものにしてください。

プロキシのスレッドセーフ性

1 つのプロキシオブジェクトは、ロックで保護しないかぎり、2 つ以上のスレッドから使用してはいけません。

(異なるプロセスで 同じ プロキシを使用することは問題ではありません。)

ゾンビプロセスを join する

Unix 上ではプロセスが終了したときに join しないと、そのプロセスはゾンビになります。新たなプロセスが開始する (または active_children() が呼ばれる) ときに、join されていないすべての完了プロセスが join されるので、あまり多くにはならないでしょう。また、終了したプロセスの Process.is_alive はそのプロセスを join します。そうは言っても、自分で開始したすべてのプロセスを明示的に join することはおそらく良いプラクティスです。

pickle/unpickle より継承する方が良い

開始方式に spawn あるいは forkserver を使用している場合、multiprocessing から多くの型を pickle 化する必要があるため子プロセスはそれらを使うことができます。しかし、一般にパイプやキューを使用して共有オブジェクトを他のプロセスに送信することは避けるべきです。代わりに、共有リソースにアクセスする必要のあるプロセスは上位プロセスからそれらを継承するようにすべきです。

プロセスの強制終了を避ける

あるプロセスを停止するために Process.terminate メソッドを使用すると、そのプロセスが現在使用されている (ロック、セマフォ、パイプやキューのような) 共有リソースを破壊したり他のプロセスから利用できない状態を引き起こし易いです。

そのため、共有リソースを使用しないプロセスでのみ Process.terminate を使用することを考慮することがおそらく最善の方法です。

キューを使用するプロセスを join する

キューに要素を追加するプロセスは、すべてのバッファーされた要素が "feeder" スレッドによって下位層のパイプに対してフィードされるまで終了を待つということを覚えておいてください。 (子プロセスはこの動作を避けるためにキューの Queue.cancel_join_thread メソッドを呼ぶことができます。)

これはキューを使用するときに、キューに追加されたすべての要素が最終的にそのプロセスが join される前に削除されていることを確認する必要があることを意味します。そうしないと、そのキューに要素が追加したプロセスの終了を保証できません。デーモンではないプロセスは自動的に join されることも覚えておいてください。

次の例はデッドロックを引き起こします:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

修正するには最後の2行を入れ替えます(または単純に p.join() の行を削除します)。

明示的に子プロセスへリソースを渡す

Unix で開始方式に fork を使用している場合、子プロセスはグローバルリソースを使用した親プロセス内で作成された共有リソースを使用できます。しかし、オブジェクトを子プロセスのコンストラクターに引数として渡すべきです。

Windows や他の開始方式と (将来的にでも) 互換性のあるコードを書く場合は別として、これは子プロセスが実行中である限りは親プロセス内でオブジェクトがガベージコレクトされないことも保証します。これは親プロセス内でオブジェクトがガベージコレクトされたときに一部のリソースが開放されてしまう場合に重要かもしれません。

そのため、例えば

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

は、次のように書き直すべきです

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

sys.stdin を file-like オブジェクトに置き換えることに注意する

multiprocessing は元々無条件に:

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() メソッドの中で呼び出していました — これはプロセス内プロセス (processes-in-processes) で問題が起こしてしまいます。そこで、これは以下のように変更されました:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

これによってプロセス同士が衝突して bad file descripter エラーを起こすという根本的な問題は解決しましたが、アプリケーションの出力バッファーを sys.stdin() から "file-like オブジェクト" に置き換えるという潜在的危険を持ち込んでしまいました。危険というのは、複数のプロセスが file-like オブジェクトの close() を呼び出すと、オブジェクトに同じデータが何度もフラッシュされ、破損してしまう可能性がある、というものです。

もし file-like オブジェクトを書いて独自のキャッシュを実装するなら、キャッシュするときに常に pid を記録しておき、pid が変わったらキュッシュを捨てることで、フォークセーフにできます。例:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

より詳しい情報は bpo-5155bpo-5313bpo-5331 を見てください

17.2.3.2. 開始方式が spawn および forkserver の場合

開始方式に fork を適用しない場合にいくつかの追加の制限事項があります。

さらなる pickle 化の可能性

Process.__init__() へのすべての引数は pickle 化できることを確認してください。また Process をサブクラス化する場合、そのインスタンスが Process.start メソッドが呼ばれたときに pickle 化できるようにしてください。

グローバル変数

子プロセスで実行されるコードがグローバル変数にアクセスしようとする場合、子プロセスが見るその値は Process.start が呼ばれたときの親プロセスの値と同じではない可能性があります。

しかし、単にモジュールレベルの定数であるグローバル変数なら問題にはなりません。

メインモジュールの安全なインポート

新たな Python インタプリタによるメインモジュールのインポートが、意図しない副作用 (新たなプロセスを開始する等) を起こさずできるようにしてください。

例えば、開始方式に spawn あるいは forkserver を使用した場合に以下のモジュールを実行すると RuntimeError で失敗します:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

代わりに、次のように if __name__ == '__main__': を使用してプログラムの "エントリポイント" を保護すべきです:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(プログラムをフリーズせずに通常通り実行するなら freeze_support() 行は取り除けます。)

これは新たに生成された Python インタープリターがそのモジュールを安全にインポートして、モジュールの foo() 関数を実行します。

プールまたはマネージャーがメインモジュールで作成される場合に似たような制限が適用されます。

17.2.4. 使用例

カスタマイズされたマネージャーやプロキシの作成方法と使用方法を紹介します:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Pool を使用する例です:

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

ワーカープロセスのコレクションに対してタスクをフィードしてその結果をまとめるキューの使い方の例を紹介します:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()