非同期 I/O (4/4) I/O 完了ポート
5. I/O 完了ポート
5.1 概要
I/O 完了ポート (I/O Completion Port, IOCP) を使った非同期 I/O は、特に高性能なサーバー開発に欠かせない知識です。ワーカースレッドプールと I/O 処理がうまく協調して動く、大変優れた方法です。
IOCP を使う典型的なコードは次 のような流れになります。
- IOCP を作成
・ 同時実行数 M 個を指定
・ CreateIoCompletionPort API で作成 - ワーカースレッドプールを作成
・ ワーカースレッドは GetQueuedCompletionStatus API の呼び出しでブロックする
・ スレッド数は 同時実行数 M 個+N 個 =M + N 個 (後述)
・ スレッドは普通に _beginthreadex や CreateThread で作成。 - ファイルを開く
・ CreateFile など (ソケットの場合 WSASocket) - ファイルハンドルを IOCP に関連付ける
・ CreateIoCompletionPort API を用いて 1 で取得した IOCP ハンドルと 3 で取得したファイルハンドルを関連付ける - I/O 要求を行う
・ ReadFile など (ソケットの場合 WSARecv や WSASend)
・ このとき OVERLAPPED 構造体を渡す (ソケットの場合 WSAOVERLAPPED) - I/O が完了したときに、ワーカースレッドの
GetQueuedCompletionStatus が制御を返す
・ このとき 5 で渡した OVERLAPPED 構造体へのアドレスや転送されたバイト数が取得できる
それでは、この流れを順番に見ていきます。ここでは概要と、私がコードを書いていて重要だと思った点にポイントを絞って説明します。
5.1.1 IOCP を作成
IOCP は CreateIoCompletionPort API を呼び出すことで作成します。
g_hIOCP =
CreateIoCompletionPort (
INVALID_HANDLE_VALUE,
NULL,
NULL,
NUMBER_OF_THREAD
- 2);
IOCP はこのような呼び出しで作成します。
第四パラメータは同時実行スレッド数を指定します。ここで NUMBER_OF_THREAD というのは 5.1.2 で説明するワーカースレッドプール内のワーカースレッド数です。ここでは、それより、少し少ない数 (-2) を指定していることに注目してください。ワーカースレッドについては、5.1.2 で説明します。
IOCP が作成されるとき、Windows は 5 つのデータを作成します。
- デバイスリスト
- I/O 完了キュー
- 待ちスレッドリスト
- リリーススレッドリスト
- 一時停止スレッドリスト
IOCP を作成した時点ではこれらのリストの中には何もエントリーが存在しません。
図1. デバイスリストと I/O 完了キュー
図2. スレッドの管理用データ構造
5.1.2 ワーカースレッドプールの作成
ワーカースレッドを作成します。
ワーカースレッドの起動は特に変わったことはなく、普通に _beginthreadex などで起動すれば OK です。
HANDLE
hThreads [NUMBER_OF_THREAD];
for ( int i=0; i <
NUMBER_OF_THREAD; i++ ) {
UINT uThreadId;
hThreads[i] = (HANDLE) _beginthreadex(
NULL,
NULL,
WorkerThreadFunc,
(void*) NULL,
NULL,
&uThreadId);
}
このサンプルでは NUMBER_OF_THREAD という識別子で定義した数のワーカースレッドを作成しています。また、起動したスレッドのスレッド関数は、WorkerThreadFunc という名前の関数です。
IOCP に関連付けされる典型的なワーカースレッドは次のような形をしています。
unsigned int __stdcall WorkerThreadFunc
(PVOID pv) {
FOO* pFoo;
LPOVERLAPPED pol = NULL;
DWORD cbNumberOfBytesTransferred = 0;
ULONG uCompletionKey;
BOOL bRet;
while (1) {
bRet = GetQueuedCompletionStatus
(
g_hIOCP,
&cbNumberOfBytesTransferred,
&uCompletionKey,
&pol,
INFINITE );
if ( !bRet ) {
//
Error
}
//
// OVERLAPPED 構造体のアドレスから、その OVERLAPPED データを
// 含むユーザー定義のデータ構造 (ここでは FOO としている) FOO への
// ポインタを取得する
//
pFoo
= (FOO*) CONTAINING_RECORD (pol, FOO, ol);
//
// コンテキストに応じた処理
// コンテキスト情報はユーザー定義の FOO に持たせておく。
//
...
}
return 0;
}
ワーカースレッドが開始すると、直ちに無限ループに入っています。そのはじめに GetQueuedCompletionStatus を呼び出します。この段階では何も I/O 要求を出していませんので、ここで GetQueuedCompletionStatus がブロックします。この時点で、このスレッドは上図 2 中の "待ちスレッドリスト (Waiting Thread List)" に入ります。
少し話は戻りますが、5.1.1 で IOCP を作成したときに IOCP の同時実行数 (M 個) は 実際のワーカースレッド数 (M+N 個) より少ない数を指定しました。これはどういうことでしょうか。上図2 を見てください。スレッドに関してはデータ構造が三つあります。待ちスレッドキュー、リリーススレッドリスト、一時停止スレッドリストです。スレッドを起 動した直後にスレッドは、待ちスレッドキューに入ります。何らかの I/O が完了すると、キューの中のいずれかの GetQueuedCompletionStatus が制御を返します。すると、そのスレッドはリリーススレッドリストに入ります。ここで処理が滞りなく終われば、ワーカースレッドはループの中にいますか ら、また GetQueuedCompletionStatus を呼び出して待ちスレッドキューに入ります。一方、リリーススレッドリストに入って処理が開始したときに、そこで何らかの待機関数を呼び出して処理がブ ロックすると、そのスレッドは一時停止スレッドリストに入ります。このとき、リリーススレッドリストのエントリが削除されます。
IOCP 作成時の同時実行数は、I/O が完了した時点でリリーススレッドリスト内に存在するスレッドの最大数のことです。待機し、一時停止しているスレッドはカウン トされません。ですから、例えば リリーススレッドリスト内に M 個のスレッドがあるときに、あるスレッドが一時停止リストからリリーススレッドリストに復帰すると、瞬間的に M 個以上のスレッドが同時実行することになります。
IOCP ではこのような微妙なスレッド数の調整が、自動的に行われます。
5.1.3 ファイルを開く
CreateFile するときには、やはり FILE_FLAG_OVERLAPPED フラグを指定して、非同期 I/O にします。
IOCP は特にサーバー系で使用されることが多いので、ソケットに関しても補足しますと WSASocket のフラグは WSA_FLAG_OVERLAPPED です。同じような名前なのでわかりやすいです。
5.1.4 ファイルハンドルを IOCP に関連付けする
ファイルハンドルを取得したら、それを IOCP に関連付けします。
hIOCP =
CreateIoCompletionPort ( hFile, g_hIOCP, NULL, 0);
if ( !hIOCP ) {
//
エラー
}
この操作によって、hFile で示したファイルハンドル (で識別されるデバイス) が、上図1のデバイスリストに入ります。
したがって、これで IOCP を介してワーカースレッドとファイルハンドルが関連付けされました。
5.1.5 I/O 要求を行う
I/O 要求は OVERLAPPED 構造体を渡して行います。これは従来の方法と変わりません。
BOOL bRet
= ReadFile (
hFile,
pContext->pBuffer,
BUFFER_SIZE,
NULL,
&pContext->ol );
if ( !bRet ) {
DWORD dwGle = GetLastError ();
if ( ERROR_IO_PENDING != dwGle ) {
//
エラー
}
}
ReadFile が TRUE を返せば直ちに読み出すデータがあることになりますし、FALSE を返し、かつ GetLastError が ERROR_IO_PENDING であれば正常に非同期 I/O が開始したことを示します。
5.1.6 I/O が完了したときに、ワーカースレッドの GetQueuedCompletionStatus が制御を返す
5.1.2 で見たように、ワーカースレッドそれぞれは GetQueuedCompletionStatus の呼び出しで停止しています。I/O が完了したときに、それが制御を返します。そのとき返ってくるのが、OVERLAPPED 構造体へのアドレスと、完了キー (Completion Key) や 転送バイト数などです。
通常は OVERLAPPED 構造体は、他のコンテキストデータに含めて使います。例えばこんな感じです。
typedef struct tagFOO {
STATUS status;
PBYTE pData;
OVERLAPPED ol;
} FOO, *PFOO;
GetQueuedCompletionStatus が返すのは ol のアドレスですから、次のようにして FOO の先頭アドレスを取得して、そのコンテキスト特有の動作を実行します。
PFOO pFoo = (PFOO) CONTAINING_RECORD ( pol, FOO, ol );
このあたりは、IOCP のお話ではなく IOCP を利用するときのテクニック的な話ですから、もっと良い方法があったらそれを使えば良いと思います。
5.2 コードの解説
サンプルコードはこちらです。
サンプルコー ドのダウンロード [iocp.zip, makefileを使用]
5.1 で要点は抑えてあると思いますので、ここではこのサンプルで使っているワーカースレッドの終了方法を説明します。
WinMain を終了するときに、次のように PostQueuedCompletionStatus を利用してスレッド数分 I/O 完了ステータスをポストします。
for ( int i=0; i<NUMBER_OF_THREAD;
i++ ) {
LPOVERLAPPED pOl = (LPOVERLAPPED) malloc
( sizeof (OVERLAPPED) );
if ( !pOl ) {
...
}
PostQueuedCompletionStatus
( g_hIOCP, 0, COMPKEY_EXIT,
pOl);
}
このとき、完了キーとして COMPKEY_EXIT という値をポストしています。これは単に 1 に定義されています。そしてワーカースレッドの方では、次のようにして GetQueuedCompletionStatus から渡された完了キーをチェックして、スレッド終了要求か確認しています。
unsigned int __stdcall WorkerThreadFunc
(PVOID pv) {
...
while (1) {
bRet = GetQueuedCompletionStatus
(
g_hIOCP,
&cbNumberOfBytesTransferred,
&uCompletionKey,
&pol,
INFINITE );
...
if ( COMPKEY_EXIT == uCompletionKey ) {
free ( pol );
break;
}
...
}
...
return 0;
}
以上、長々と非同期 I/O についてサンプルコードを含めて解説を試みてみました。