18.5.9. asyncio での開発

非同期プログラミングは古典的な “逐次” プログラミングとは異なります。このページでは非同期プログラミングで陥りやすい落とし穴の一覧とその回避方法について説明しています。

18.5.9.1. asyncio のデバッグモード

The implementation of asyncio has been written for performance. In order to ease the development of asynchronous code, you may wish to enable debug mode.

アプリケーションで全てのデバッグチェックを有効にするには:

  • Enable the asyncio debug mode globally by setting the environment variable PYTHONASYNCIODEBUG to 1, or by calling AbstractEventLoop.set_debug().
  • Set the log level of the asyncio logger to logging.DEBUG. For example, call logging.basicConfig(level=logging.DEBUG) at startup.
  • Configure the warnings module to display ResourceWarning warnings. For example, use the -Wdefault command line option of Python to display them.

デバッグチェックの例:

参考

AbstractEventLoop.set_debug() メソッドならびに asyncio logger

18.5.9.2. 取り消し

Cancellation of tasks is not common in classic programming. In asynchronous programming, not only it is something common, but you have to prepare your code to handle it.

Futures and tasks can be cancelled explicitly with their Future.cancel() method. The wait_for() function cancels the waited task when the timeout occurs. There are many other cases where a task can be cancelled indirectly.

Don’t call set_result() or set_exception() method of Future if the future is cancelled: it would fail with an exception. For example, write:

if not fut.cancelled():
    fut.set_result('done')

Don’t schedule directly a call to the set_result() or the set_exception() method of a future with AbstractEventLoop.call_soon(): the future can be cancelled before its method is called.

If you wait for a future, you should check early if the future was cancelled to avoid useless operations. Example:

@coroutine
def slow_operation(fut):
    if fut.cancelled():
        return
    # ... slow computation ...
    yield from fut
    # ...

The shield() function can also be used to ignore cancellation.

18.5.9.3. 並行処理とマルチスレッド処理

イベントループは 1 個のスレッド内で実行し、同じスレッド内ですべてのコールバックとタスクを実行します。1 個のタスクがイベントループ内で実行される間、他のタスクは同じスレッド内で実行されることはありません。ただし、タスクが yield from を使用するとそのタスクはサスペンドされ、イベントループは次のタスクを実行します。

異なるスレッドからコールバックをスケジュールする場合、 AbstractEventLoop.call_soon_threadsafe() メソッドを使用してください。例:

loop.call_soon_threadsafe(callback, *args)

ほとんどの asyncio オブジェクトはスレッドセーフではありません。イベントループの外からオブジェクトにアクセスしていないかどうかだけに注意してください。例えばフューチャーをキャンセルする場合にその Future.cancel() メソッドを直接呼び出すのではなく以下のようにします:

loop.call_soon_threadsafe(fut.cancel)

シグナルの処理やサブプロセスの実行を行うには、イベントループはメインスレッド内で実行しなければなりません。

別のスレッドからコルーチンオブジェクトをスケジュールする場合は、 run_coroutine_threadsafe() メソッドを使用してください。 run_coroutine_threadsafe() は結果にアクセスするための concurrent.futures.Future を返します:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
result = future.result(timeout)  # Wait for the result with a timeout

The AbstractEventLoop.run_in_executor() method can be used with a thread pool executor to execute a callback in different thread to not block the thread of the event loop.

参考

同期プリミティブ 節にはタスクの同期法が書かれています。

サブプロセスとスレッド 節では別スレッドからサブプロセスを実行する際の asyncio の限界を列挙しています。

18.5.9.4. ブロック関数を正しく扱う

ブロック関数を直接呼び出してはなりません。例えば、関数が 1 秒間ブロックした場合、他のタスクには 1 秒間の遅延が発生します。これは反応性において重大な影響が発生します。

ネットワークとサブプロセスには、asyncio モジュールは プロトコル のような高水準 API を提供しています。

An executor can be used to run a task in a different thread or even in a different process, to not block the thread of the event loop. See the AbstractEventLoop.run_in_executor() method.

参考

遅延呼び出し 節でイベントループで時間を扱う手順の詳細を説明しています。

18.5.9.5. ログ記録

asyncio モジュールは logging モジュールとともにロガー 'asyncio' の情報のログを記録します。

The default log level for the asyncio module is logging.INFO. For those not wanting such verbosity from asyncio the log level can be changed. For example, to change the level to logging.WARNING:

logging.getLogger('asyncio').setLevel(logging.WARNING)

18.5.9.6. スケジュールされなかったコルーチンオブジェクトの検出

When a coroutine function is called and its result is not passed to ensure_future() or to the AbstractEventLoop.create_task() method, the execution of the coroutine object will never be scheduled which is probably a bug. Enable the debug mode of asyncio to log a warning to detect it.

バグの例:

import asyncio

@asyncio.coroutine
def test():
    print("never scheduled")

test()

デバッグモードの出力:

Coroutine test() at test.py:3 was never yielded from
Coroutine object created at (most recent call last):
  File "test.py", line 7, in <module>
    test()

The fix is to call the ensure_future() function or the AbstractEventLoop.create_task() method with the coroutine object.

18.5.9.7. 未処理の例外の検出

Python は通常未処理の例外には sys.displayhook() を呼び出します。 Future.set_exception() が呼び出されたものの処理されなかった場合、 sys.displayhook() が呼び出されません。 代わりに、フューチャーがガベージコレクションで削除されたとき、例外発生場所のトレースバックとともに ログが記録され ます。

未処理の例外の例:

import asyncio

@asyncio.coroutine
def bug():
    raise Exception("not consumed")

loop = asyncio.get_event_loop()
asyncio.ensure_future(bug())
loop.run_forever()
loop.close()

出力:

Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at asyncio/coroutines.py:139> exception=Exception('not consumed',)>
Traceback (most recent call last):
  File "asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "test.py", line 5, in bug
    raise Exception("not consumed")
Exception: not consumed

asyncio のデバッグモードの有効化 によりタスクが生成したトレースバックを取得できます。デバッグモードの出力は以下のようになります:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3> exception=Exception('not consumed',) created at test.py:8>
source_traceback: Object created at (most recent call last):
  File "test.py", line 8, in <module>
    asyncio.ensure_future(bug())
Traceback (most recent call last):
  File "asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "asyncio/coroutines.py", line 79, in __next__
    return next(self.gen)
  File "asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "test.py", line 5, in bug
    raise Exception("not consumed")
Exception: not consumed

この問題を解決するには異なるオプションがあります。最初のオプションでは、別のコルーチン内でコルーチンをチェーンし、古典的な try/except を使用します:

@asyncio.coroutine
def handle_exception():
    try:
        yield from bug()
    except Exception:
        print("exception consumed")

loop = asyncio.get_event_loop()
asyncio.ensure_future(handle_exception())
loop.run_forever()
loop.close()

AbstractEventLoop.run_until_complete() 関数を使う別のオプション:

task = asyncio.ensure_future(bug())
try:
    loop.run_until_complete(task)
except Exception:
    print("exception consumed")

参考

Future.exception() メソッド。

18.5.9.8. コルーチンを正しくチェーンする

コルーチン関数が別のコルーチン関数かタスクを呼び出すとき、それらは yield from で明示的にチェーンされなければなりません。そうされなかった場合、逐次的に実行されることは保証されません。

asyncio.sleep() を使って処理速度の低下をシミュレートする異なるバグの例:

import asyncio

@asyncio.coroutine
def create():
    yield from asyncio.sleep(3.0)
    print("(1) create file")

@asyncio.coroutine
def write():
    yield from asyncio.sleep(1.0)
    print("(2) write into file")

@asyncio.coroutine
def close():
    print("(3) close file")

@asyncio.coroutine
def test():
    asyncio.ensure_future(create())
    asyncio.ensure_future(write())
    asyncio.ensure_future(close())
    yield from asyncio.sleep(2.0)
    loop.stop()

loop = asyncio.get_event_loop()
asyncio.ensure_future(test())
loop.run_forever()
print("Pending tasks at exit: %s" % asyncio.Task.all_tasks(loop))
loop.close()

予想される出力:

(1) create file
(2) write into file
(3) close file
Pending tasks at exit: set()

実際の出力:

(3) close file
(2) write into file
Pending tasks at exit: {<Task pending create() at test.py:7 wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending create() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>

create() が完了する前か、write() を呼び出す前に close() が呼び出されたか、その一方でコルーチン関数が create()write()close() の順で呼び出された場合、ループは停止します。

この問題を解決するには、タスクは yield from でマークされなければなりません:

@asyncio.coroutine
def test():
    yield from asyncio.ensure_future(create())
    yield from asyncio.ensure_future(write())
    yield from asyncio.ensure_future(close())
    yield from asyncio.sleep(2.0)
    loop.stop()

あるいは、asyncio.ensure_future() を使いません:

@asyncio.coroutine
def test():
    yield from create()
    yield from write()
    yield from close()
    yield from asyncio.sleep(2.0)
    loop.stop()

18.5.9.9. 未完のタスクの破棄

未完のタスクが破棄された場合、それがラップした コルーチン は完了しません。これがおそらくバグであり、そのため警告がログに記録されます。

Example of log:

Task was destroyed but it is pending!
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()]>>

Enable the debug mode of asyncio to get the traceback where the task was created. Example of log in debug mode:

Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "test.py", line 15, in <module>
    task = asyncio.ensure_future(coro, loop=loop)
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()] created at test.py:7> created at test.py:15>

18.5.9.10. トランスポートとイベントループを閉じる

When a transport is no more needed, call its close() method to release resources. Event loops must also be closed explicitly.

If a transport or an event loop is not closed explicitly, a ResourceWarning warning will be emitted in its destructor. By default, ResourceWarning warnings are ignored. The Debug mode of asyncio section explains how to display them.