17.6. asyncore — 非同期ソケットハンドラ

ソースコード: Lib/asyncore.py


このモジュールは、非同期ソケットサービスのクライアント・サーバを開発するための基盤として使われます。

CPUが一つしかない場合、プログラムが"二つのことを同時に"実行する方法は二つしかありません。もっとも簡単で一般的なのはマルチスレッドを利用する方法ですが、これとはまったく異なるテクニックで、一つのスレッドだけでマルチスレッドと同じような効果を得られるテクニックがあります。このテクニックはI/O処理が中心である場合にのみ有効で、CPU負荷の高いプログラムでは効果が無く、この場合にはプリエンプティブなスケジューリングが可能なスレッドが有効でしょう。しかし、多くの場合、ネットワークサーバではCPU負荷よりはIO負荷が問題となります。

もしOSのI/Oライブラリがシステムコール select() をサポートしている場合(ほとんどの場合はサポートされている)、I/O処理は"バックグラウンド"で実行し、その間に他の処理を実行すれば、複数の通信チャネルを同時にこなすことができます。一見、この戦略は奇妙で複雑に思えるかもしれませんが、いろいろな面でマルチスレッドよりも理解しやすく、制御も容易です。 asyncore は多くの複雑な問題を解決済みなので、洗練され、パフォーマンスにも優れたネットワークサーバとクライアントを簡単に開発することができます。とくに、 asynchat のような、対話型のアプリケーションやプロトコルには非常に有効でしょう。

基本的には、この二つのモジュールを使う場合は一つ以上のネットワーク チャネルasyncore.dispatcher クラス、または asynchat.async_chat のインスタンスとして作成します。作成されたチャネルはグローバルマップに登録され、 loop() 関数で参照されます。 loop() には、専用の マップ を渡す事も可能です。

チャネルを生成後、 loop() を呼び出すとチャネル処理が開始し、最後のチャネル(非同期処理中にマップに追加されたチャネルを含む)が閉じるまで継続します。

asyncore.loop([timeout[, use_poll[, map[, count]]]])

ポーリングループを開始し、count 回が過ぎるか、全てのオープン済みチャネルがクローズされた場合のみ終了します。全ての引数はオプションです。引数 count のデフォルト値は None で、ループは全てのチャネルがクローズされた場合のみ終了します。引数 timeoutselect() または poll() の引数 timeout として渡され、秒単位で指定します。デフォルト値は 30 秒です。引数 use_poll が真の場合、 select() ではなく poll() が使われます (デフォルト値は False です)。

引数 map には、監視するチャネルをアイテムとして格納した辞書を指定します。チャネルがクローズされた時に map からそのチャネルが削除されます。 map が省略された場合、グローバルなマップが使用されます。チャネル (asyncore.dispatcher, asynchat.async_chat とそのサブクラス) は自由に混ぜて map に入れることができます。

class asyncore.dispatcher

dispatcher クラスは、低レベルソケットオブジェクトの薄いラッパーです。便宜上、非同期ループから呼び出されるイベント処理メソッドを追加していますが、これ以外の点では、non-blockingなソケットと同様です。

非同期ループ内で低レベルイベントが発生した場合、発生のタイミングやコネクションの状態から特定の高レベルイベントへと置き換えることができます。例えばソケットを他のホストに接続する場合、最初の書き込み可能イベントが発生すれば接続が完了した事が分かります(この時点で、ソケットへの書き込みは成功すると考えられる)。このように判定できる高レベルイベントを以下に示します:

Event 説明
handle_connect() 最初にreadもしくはwriteイベントが発生した時
handle_close() 読み込み可能なデータなしでreadイベントが発生した時
handle_accepted() listen中のソケットでreadイベントが発生した時

非同期処理中、マップに登録されたチャネルの readable() メソッドと writable() メソッドが呼び出され、 select()poll() でread/writeイベントを検出するリストに登録するか否かを判定します。

このようにして、チャネルでは低レベルなソケットイベントの種類より多くの種類のイベントを検出する事ができます。以下にあげるイベントは、サブクラスでオーバライドすることが可能です:

handle_read()

非同期ループで、チャネルのソケットの read() メソッドの呼び出しが成功した時に呼び出されます。

handle_write()

非同期ループで、書き込み可能ソケットが実際に書き込み可能になった時に呼び出される。このメソッドは、パフォーマンスの向上のためバッファリングを行う場合などに利用できます。例:

def handle_write(self):
    sent = self.send(self.buffer)
    self.buffer = self.buffer[sent:]
handle_expt()

out of band (OOB)データが検出された時に呼び出されます。OOBはあまりサポートされておらず、また滅多に使われないので、handle_expt() が呼び出されることはほとんどありません。

handle_connect()

ソケットの接続が確立した時に呼び出されます。"welcome"バナーの送信、プロトコルネゴシエーションの初期化などを行います。

handle_close()

ソケットが閉じた時に呼び出されます。

handle_error()

捕捉されない例外が発生した時に呼び出されます。デフォルトでは、短縮したトレースバック情報が出力されます。

handle_accept()

listen 中のチャネル (受動的にオープンしたもの) がリモートホストからの connect() で接続され、接続が確立した時に呼び出されます。

readable()

非同期ループ中に呼び出され、readイベントの監視リストに加えるか否かを決定します。デフォルトのメソッドでは True を返し、readイベントの発生を監視します。

writable()

非同期ループ中に呼び出され、writeイベントの監視リストに加えるか否かを決定します。デフォルトのメソッドでは True を返し、writeイベントの発生を監視します。

さらに、チャネルにはソケットのメソッドとほぼ同じメソッドがあり、チャネルはソケットのメソッドの多くを委譲・拡張しており、ソケットとほぼ同じメソッドを持っています。

create_socket(family, type)

引数も含め、通常のソケット生成と同じ。 socket モジュールを参照のこと。

connect(address)

通常のソケットオブジェクトと同様、address には一番目の値が接続先ホスト、2番目の値がポート番号であるタプルを指定します。

send(data)

リモート側の端点に data を送出します。

recv(buffer_size)

リモート側の端点より、最大 buffer_size バイトのデータを読み込みます。長さ0の文字列が返ってきた場合、チャネルはリモートから切断された事を示します。

recv()EAGAIN または EWOULDBLOCK による socket.error を起こしうることに注意してください。 select.select()select.poll() が、ソケットが読み込み出来る状態にあると報告したとしてもです。

listen(backlog)

ソケットへの接続を待つ。引数 backlog は、キューイングできるコネクションの最大数を指定します(1以上)。最大数はシステムに依存でします(通常は5)。

bind(address)

ソケットを address にバインドします。ソケットはバインド済みであってはなりません。 (address の形式は、アドレスファミリに依存します。 socket モジュールを参照のこと。) ソケットを再利用可能にする (SO_REUSEADDR オプションを設定する) には、 dispatcher オブジェクトの set_reuse_addr() メソッドを呼び出してください。

accept()

接続を受け入れます。ソケットはアドレスにバインド済みであり、listen() で接続待ち状態でなければなりません。戻り値は None(conn, address) のペアで、conn はデータの送受信を行う 新しい ソケットオブジェクト、address は接続先ソケットがバインドされているアドレスです。None が返された場合、接続が起こらなかったことを意味します。その場合、サーバーはこのイベントを無視して後続の接続を待ち続けるべきです。

close()

ソケットをクローズします。以降の全ての操作は失敗します。リモート端点では、キューに溜まったデータ以外、これ以降のデータ受信は行えません。ソケットはガベージコレクト時に自動的にクローズされます。

class asyncore.dispatcher_with_send

dispatcher のサブクラスで、シンプルなバッファされた出力を持ちます。シンプルなクライアントプログラムに適しています。もっと高レベルな場合には asynchat.async_chat を利用してください。

class asyncore.file_dispatcher

file_dispatcher はファイルデスクリプタか ファイルオブジェクト とオプションとして map を引数にとって、 poll()loop() 関数で利用できるようにラップします。与えられたファイルオブジェクトなどが fileno() メソッドを持っているとき、そのメソッドが呼び出されて戻り値が file_wrapper のコンストラクタに渡されます。利用できるプラットフォーム: UNIX。

class asyncore.file_wrapper

file_wrapper は整数のファイルデスクリプタを受け取って os.dup() を呼び出してハンドルを複製するので、元のハンドルは file_wrapper と独立してclose されます。このクラスは file_dispatcher クラスが使うために必要なソケットをエミュレートするメソッドを実装しています。利用できるプラットフォーム: UNIX。

17.6.1. asyncoreの例: 簡単なHTTPクライアント

基本的なサンプルとして、以下に非常に単純なHTTPクライアントを示します。このHTTPクライアントは dispatcher クラスでソケットを利用しています:

import asyncore, socket

class HTTPClient(asyncore.dispatcher):

    def __init__(self, host, path):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect( (host, 80) )
        self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path

    def handle_connect(self):
        pass

    def handle_close(self):
        self.close()

    def handle_read(self):
        print self.recv(8192)

    def writable(self):
        return (len(self.buffer) > 0)

    def handle_write(self):
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]


client = HTTPClient('www.python.org', '/')
asyncore.loop()

17.6.2. 基本的な echo サーバーの例

この例の基本的な echoサーバーは、 dispatcher を利用して接続を受けつけ、接続をハンドラーにディスパッチします:

import asyncore
import socket

class EchoHandler(asyncore.dispatcher_with_send):

    def handle_read(self):
        data = self.recv(8192)
        if data:
            self.send(data)

class EchoServer(asyncore.dispatcher):

    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            sock, addr = pair
            print 'Incoming connection from %s' % repr(addr)
            handler = EchoHandler(sock)

server = EchoServer('localhost', 8080)
asyncore.loop()