18.5.4. トランスポートとプロトコル (コールバックベースの API)¶
18.5.4.1. トランスポート¶
Transports are classes provided by asyncio in order to abstract
various kinds of communication channels. You generally won’t instantiate
a transport yourself; instead, you will call an AbstractEventLoop method
which will create the transport and try to initiate the underlying
communication channel, calling you back when it succeeds.
いったん通信チャンネルが確立されると、トランスポートは常に プロトコル インスタンスとのペアを成します。プロトコルはその後さまざまな用途のためトランスポートのメソッドを呼び出します。
asyncio は現在 TCP、UDP、SSL およびサブプロセスパイプのトランスポートを実装しています。利用可能なトランスポートのメソッドはトランスポートの種類に依存します。
トランスポートクラスは スレッド安全ではありません。
18.5.4.1.1. BaseTransport¶
-
class
asyncio.BaseTransport¶ トランスポートの基底クラスです。
-
close()¶ トランスポートをクローズします。トランスポートが発信データのバッファーを持っていた場合、バッファーされたデータは非同期にフラッシュされます。それ以降データは受信されません。バッファーされていたデータがすべてフラッシュされた後、そのプロトコルの
connection_lost()メソッドが引数Noneで呼び出されます。
-
is_closing()¶ トランスポートを閉じている最中か閉じていた場合
Trueを返します。バージョン 3.5.1 で追加.
-
get_extra_info(name, default=None)¶ オプションのトランスポート情報を返します。name は取得したトランスポート固有の情報を表す文字列で、default は情報が存在しなかったときに返す値になります。
このメソッドはトランスポートの実装に容易にチャンネル固有の情報を渡すことができます。
ソケット:
'peername': ソケットが接続されているリモートアドレスで、socket.socket.getpeername()の結果になります (エラーのときはNone)'socket':socket.socketのインスタンスになります'sockname': ソケット自身のアドレスで、socket.socket.getsockname()の結果になります
SSL ソケット:
'compression': 圧縮アルゴリズムで、ssl.SSLSocket.compression()の結果になります。圧縮されていないときはNoneになります'cipher': 3 個の値 (使用されている暗号アルゴリズムの名称、使用が定義されている SSL プロトコルのバージョン、および使用されている秘密鍵のビット数) からなるタプルで、ssl.SSLSocket.cipher()の結果になります'peercert': ピアの証明書で、ssl.SSLSocket.getpeercert()の結果になります'sslcontext':ssl.SSLContextのインスタンスになります'ssl_object':ssl.SSLObjectまたはssl.SSLSocketインスタンス
パイプ:
'pipe': パイプオブジェクトです
サブプロセス:
'subprocess':subprocess.Popenのインスタンスになります
-
set_protocol(protocol)¶ Set a new protocol. Switching protocol should only be done when both protocols are documented to support the switch.
バージョン 3.5.3 で追加.
-
get_protocol()¶ Return the current protocol.
バージョン 3.5.3 で追加.
バージョン 3.5.1 で変更:
'ssl_object'情報が SSL ソケットに追加されました。-
18.5.4.1.2. ReadTransport¶
-
class
asyncio.ReadTransport¶ 読み込み専用トランスポートのインターフェースです。
-
pause_reading()¶ トランスポートの受信側を一時停止します。
resume_reading()が呼び出されるまでそのプロトコルのdata_received()メソッドにデータは渡されません。
-
resume_reading()¶ 受信を再開します。読み込み可能データが存在した場合そのプロトコルの
data_received()メソッドが一度呼び出されます。
-
18.5.4.1.3. WriteTransport¶
-
class
asyncio.WriteTransport¶ 書き込み専用トランスポートのインターフェースです。
-
abort()¶ トランスポートを即座にクローズします。未完了の処理があってもそれを待ちません。バッファーされているデータは失われます。それ以降データは受信されません。最終的にそのプロトコルの
connection_lost()メソッドが引数Noneで呼び出されます。
-
can_write_eof()¶ トランスポートが
write_eof()をサポートしている場合Trueを、サポートしていない場合はFalseを返します。
-
get_write_buffer_size()¶ トランスポートで使用されている出力バッファーの現在のサイズを返します。
-
get_write_buffer_limits()¶ 書き込みフロー制御の 最高 および 最低 水位点 (high- and low-water limits) を取得します。
(low, high)のタプルを返します。low および high は整数のバイト列になります。水位点の設定は
set_write_buffer_limits()で行います。バージョン 3.4.2 で追加.
-
set_write_buffer_limits(high=None, low=None)¶ 書き込みフロー制御の 最高 および 最低 水位点 (high- and low-water limits) を設定します。
これら 2 個の値はプロトコルの
pause_writing()およびresume_writing()メソッドが呼ばれたときの振る舞いを制御します。指定する場合、low は high 以下でなければなりません。high も low も負数は指定できません。The defaults are implementation-specific. If only the high-water limit is given, the low-water limit defaults to an implementation-specific value less than or equal to the high-water limit. Setting high to zero forces low to zero as well, and causes
pause_writing()to be called whenever the buffer becomes non-empty. Setting low to zero causesresume_writing()to be called only once the buffer is empty. Use of zero for either limit is generally sub-optimal as it reduces opportunities for doing I/O and computation concurrently.水位点の取得には
get_write_buffer_limits()を使用します。
-
write(data)¶ トランスポートにバイト列 data を書き込みます。
このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。
-
writelines(list_of_data)¶ バイト列のデータのリスト (またはイテラブル) をトランスポートに書き込みます。この振る舞いはイテラブルを yield して各要素で
write()を呼び出すことと等価ですが、より効率的な実装となる場合があります。
-
write_eof()¶ バッファーされたデータをフラッシュした後トランスポートの送信側をクローズします。データは受信されます。
このメソッドはトランスポート (例えば SSL) がハーフクローズをサポートしていない場合
NotImplementedErrorを送出します。
-
18.5.4.1.4. DatagramTransport¶
18.5.4.1.5. BaseSubprocessTransport¶
-
class
asyncio.BaseSubprocessTransport¶ -
get_pid()¶ サブプロセスのプロセス ID (整数) を返します。
-
get_pipe_transport(fd)¶ 整数のファイル記述子 fd に該当する通信パイプのトランスポートを返します:
-
get_returncode()¶ サブプロセスのリターンコード (整数) を返します。リターンコードを持たない場合
Noneを返します。subprocess.Popen.returncode属性と同じです。
-
kill()¶ Kill the subprocess, as in
subprocess.Popen.kill().POSIX システムでは、この関数はサブプロセスに SIGKILL を送信します。Windows では、このメソッドは
terminate()の別名です。
-
send_signal(signal)¶ サブプロセスにシグナル signal を送信します。
subprocess.Popen.send_signal()と同じです。
-
terminate()¶ サブプロセスに停止を要求します。
subprocess.Popen.terminate()と同じです。このメソッドはclose()メソッドの別名です。POSIX システムでは、このメソッドはサブプロセスに SIGTERM を送信します。Windows では、Windows API 関数 TerminateProcess() が呼び出されます。
-
close()¶ サブプロセスがまだ返していない場合、
terminate()メソッドの呼び出しによってサブプロセスに停止を要求し、全パイプ (stdin、stdout および stderr) のトランスポートをクローズします。
-
18.5.4.2. プロトコル¶
asyncio はネットワークプロトコルの実装をサブクラス化する基底クラスを提供します。これらクラスは トランスポート と連動して使用されます: プロトコルは入力データの解析および出力データの書き込みのための問い合わせを行い、トランスポートは実際の I/O とバッファリングに責任を持ちます。
プロトコルクラスをサブクラス化するとき、いくつかのメソッドをオーバーライドすることを推奨します。これらメソッドはコールバックです: いくつかのイベントが発生したとき (例えばデータの受信など) に呼び出されます; あなたがトランスポートを実装する場合を除き、これらを直接呼び出すべきではありません。
注釈
すべてのコールバックはデフォルトで空の実装を持ちます。したがって、あなたが興味を持ったイベント用のコールバックのみ実装が必要になります。
18.5.4.2.1. プロトコルクラス群¶
-
class
asyncio.Protocol¶ (例えば TCP や SSL トランスポートとともに使用する) ストリーミングプロトコルを実装する基底クラスです。
-
class
asyncio.DatagramProtocol¶ (例えば UDP トランスポートともに使用する) データグラムプロトコルを実装する基底クラスです。
-
class
asyncio.SubprocessProtocol¶ 子プロセスと (一方向パイプを使用して) 通信するプロトコルを実装する基底クラスです。
18.5.4.2.2. コネクションコールバック¶
これらコールバックは Protocol、DatagramProtocol および SubprocessProtocol インスタンスから呼び出される場合があります:
-
BaseProtocol.connection_made(transport)¶ コネクションが作成されたときに呼び出されます。
引数 transport はコネクションを表すトランスポートです。必要であれば、それをどこに格納するか (例えば属性へ) を決めるのはあなたです。
-
BaseProtocol.connection_lost(exc)¶ コネクションが失われた、あるいはクローズされたときに呼び出されます。
引数は例外オブジェクトまたは
Noneになります。Noneのとき、通常の EOF が受信されたか、あるいはコネクションがこちら側から中止またはクローズされたことを意味します。
connection_made() および connection_lost() は接続が成功するたびに厳密に 1 回呼び出されます。その他のすべてのコールバックはこれら 2 つのメソッドの間に呼び出され、あなたのプロトコルの実装内のリソース管理を容易に行えます。
以下のコールバックを呼び出すのは SubprocessProtocol インスタンスのみかもしれません:
-
SubprocessProtocol.pipe_data_received(fd, data)¶ 子プロセスが自身の標準出力や標準エラー出力のパイプにデータを書き込んだときに呼び出されます。fd はパイプのファイル記述子 (整数) になります。data はデータを含む空ではないバイト列になります。
-
SubprocessProtocol.pipe_connection_lost(fd, exc)¶ 子プロセスと通信するパイプの一つがクローズされると呼び出されます。fd はクローズされたファイル記述子 (整数) になります。
-
SubprocessProtocol.process_exited()¶ 子プロセスが終了したときに呼び出されます。
18.5.4.2.3. ストリーミングプロトコル¶
以下のコールバックは Protocol インスタンス上で呼び出されます:
-
Protocol.data_received(data)¶ データを受信したときに呼び出されます。data は受信したデータを含む空ではないバイト列オブジェクトになります。
注釈
トランスポートによって、データのバッファー、チャンクあるいは再構築のどれかが行われます。一般に、固有のセマンティックを信頼すべきではなく、代わりに全体的かつ十分に柔軟な解析を行うべきです。ただし、データは常に正しい順序で受信されます。
-
Protocol.eof_received()¶ Called when the other end signals it won’t send any more data (for example by calling
write_eof(), if the other end also uses asyncio).This method may return a false value (including
None), in which case the transport will close itself. Conversely, if this method returns a true value, closing the transport is up to the protocol. Since the default implementation returnsNone, it implicitly closes the connection.注釈
SSL のような一部のトランスポートはハーフクローズ接続をサポートしていません。その場合このメソッドが真値を返すとコネクションのクローズを回避できません。
接続中、data_received() は複数回呼び出されえます。eof_received() が呼び出されるのは 1 回で、1 度呼び出されると、その後 data_received() が呼び出されることはありません。
State machine:
18.5.4.2.4. データグラムプロトコル¶
以下のコールバックは DatagramProtocol インスタンス上で呼び出されます。
-
DatagramProtocol.datagram_received(data, addr)¶ データグラムを受信したときに呼び出されます。data は受信データを含むバイトオブジェクトです。addr はデータを送信するピアのアドレスです; 正確な形式はトランスポートに依存します。
18.5.4.2.5. フロー制御コールバック¶
これらコールバックは Protocol、DatagramProtocol および SubprocessProtocol インスタンスから呼び出される場合があります:
-
BaseProtocol.pause_writing()¶ トランスポートのバッファーサイズが最高水位点 (High-Water Mark) を超えたときに呼び出されます。
-
BaseProtocol.resume_writing()¶ トランスポートのバッファーサイズが最低水位点 (Low-Water Mark) に達したきに呼び出されます。
pause_writing() および resume_writing() の呼び出しは対になります。– pause_writing() はバッファーが完全に最高水位点を超えたとき (後続の書き込みがさらにバッファーサイズを増やすとしても) 1 度呼び出され、バッファーサイズが最終的に最低水位点に達したときに resume_writing() が 1 度呼び出されます。
注釈
バッファーサイズが最高水位点と等しくなった時点では pause_writing() は呼び出されません – 完全に超えなければなりません。対して、バッファーサイズが最低水位点と等しくなったときは resume_writing() は呼び出されます。これら端末条件は各点がゼロになったとき予定通りに動作するかどうか確認するために重要です。
注釈
BSD システム (OS X、FreeBSD など) では、DatagramProtocol でのフロー制御は、大量のパケット送信による送信失敗を検知するのが容易ではないためサポートされていません。ソケットは常に ‘待機状態’ のように見え、超過分のパケットは破棄されます; エラー番号 errno.ENOBUFS が設定された OSError が送出されるときもあればされないときもあります; 送出された場合、DatagramProtocol.error_received() に通知されますが、送出されないと無視されます。
18.5.4.2.6. コルーチンとプロトコル¶
Coroutines can be scheduled in a protocol method using ensure_future(),
but there is no guarantee made about the execution order. Protocols are not
aware of coroutines created in protocol methods and so will not wait for them.
信頼できる実行順を持つには、コルーチンの yield from で ストリームオブジェクト を使用します。例えば、StreamWriter.drain() コルーチンは書き込みバッファーがフラッシュされるまで待機することができます。
18.5.4.3. プロトコルの例¶
18.5.4.3.1. TCP Echo クライアントプロトコル¶
TCP echo client using the AbstractEventLoop.create_connection() method, send
data and wait until the connection is closed:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, loop):
self.message = message
self.loop = loop
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()
loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
'127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
The event loop is running twice. The
run_until_complete() method is preferred in this short
example to raise an exception if the server is not listening, instead of
having to write a short coroutine to handle the exception and stop the
running loop. At run_until_complete() exit, the loop is
no longer running, so there is no need to stop the loop in case of an error.
参考
ストリームを使った TCP Echo クライアント の例では asyncio.open_connection() 関数を使用しています。
18.5.4.3.2. TCP Echo サーバープロトコル¶
TCP echo server using the AbstractEventLoop.create_server() method, send back
received data and close the connection:
import asyncio
class EchoServerClientProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
Transport.close() は、データがまだソケットに送信されていなくても、WriteTransport.write() の直後に呼び出されます: それぞれのメソッドは非同期です。これらトランスポートメソッドはコルーチンではないため、yield from は必要ありません。
参考
ストリームを使った TCP Echo サーバー の例では asyncio.start_server() 関数を使用しています。
18.5.4.3.3. UDP Echo クライアントプロトコル¶
UDP echo client using the AbstractEventLoop.create_datagram_endpoint()
method, send data and close the transport when we received the answer:
import asyncio
class EchoClientProtocol:
def __init__(self, message, loop):
self.message = message
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Socket closed, stop the event loop")
loop = asyncio.get_event_loop()
loop.stop()
loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, loop),
remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()
18.5.4.3.4. UDP Echo サーバープロトコル¶
UDP echo server using the AbstractEventLoop.create_datagram_endpoint()
method, send back received data:
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
transport.close()
loop.close()
18.5.4.3.5. プロトコルを使ってデータを待つオープンソケットの登録¶
Wait until a socket receives data using the
AbstractEventLoop.create_connection() method with a protocol, and then close
the event loop
import asyncio
try:
from socket import socketpair
except ImportError:
from asyncio.windows_utils import socketpair
# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()
class MyProtocol(asyncio.Protocol):
transport = None
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport (it will call connection_lost())
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed, stop the event loop
loop.stop()
# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Run the event loop
loop.run_forever()
# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()
参考
読み込みイベント用のファイル記述子の監視 の例では、ソケットのファイル記述子を登録するのに低水準の BaseEventLoop.add_reader() メソッドを使用しています。
ストリームを使ってデータを待つオープンソケットの登録 の例ではコルーチンの open_connection() 関数によって作成された高水準ストリームを使用しています。
