非同期 I/O (4/4) I/O 完了ポート

5. I/O 完了ポート

5.1 概要

I/O 完了ポート (I/O Completion Port, IOCP) を使った非同期 I/O は、特に高性能なサーバー開発に欠かせない知識です。ワーカースレッドプールと I/O 処理がうまく協調して動く、大変優れた方法です。

IOCP を使う典型的なコードは次 のような流れになります。

  1. IOCP を作成
    ・ 同時実行数 M 個を指定
    ・ CreateIoCompletionPort API で作成
  2. ワーカースレッドプールを作成
    ・ ワーカースレッドは GetQueuedCompletionStatus API の呼び出しでブロックする
    ・ スレッド数は 同時実行数 M 個+N 個 =M + N 個 (後述)
    ・ スレッドは普通に _beginthreadex や CreateThread で作成。
  3. ファイルを開く
    ・ CreateFile など (ソケットの場合 WSASocket)
  4. ファイルハンドルを IOCP に関連付ける
    ・ CreateIoCompletionPort API を用いて 1 で取得した IOCP ハンドルと 3 で取得したファイルハンドルを関連付ける
  5. I/O 要求を行う
    ・ ReadFile など (ソケットの場合 WSARecv や WSASend)
    ・ このとき OVERLAPPED 構造体を渡す (ソケットの場合 WSAOVERLAPPED)
  6. I/O が完了したときに、ワーカースレッドの GetQueuedCompletionStatus が制御を返す
    ・ このとき 5 で渡した OVERLAPPED 構造体へのアドレスや転送されたバイト数が取得できる

それでは、この流れを順番に見ていきます。ここでは概要と、私がコードを書いていて重要だと思った点にポイントを絞って説明します。

5.1.1 IOCP を作成

IOCP は CreateIoCompletionPort API を呼び出すことで作成します。

    g_hIOCP = CreateIoCompletionPort (
        INVALID_HANDLE_VALUE,
        NULL,
        NULL,
        NUMBER_OF_THREAD - 2);

IOCP はこのような呼び出しで作成します。

第四パラメータは同時実行スレッド数を指定します。ここで NUMBER_OF_THREAD というのは 5.1.2 で説明するワーカースレッドプール内のワーカースレッド数です。ここでは、それより、少し少ない数 (-2) を指定していることに注目してください。ワーカースレッドについては、5.1.2 で説明します。

IOCP が作成されるとき、Windows は 5 つのデータを作成します。

  • デバイスリスト
  • I/O 完了キュー
  • 待ちスレッドリスト
  • リリーススレッドリスト
  • 一時停止スレッドリスト

IOCP を作成した時点ではこれらのリストの中には何もエントリーが存在しません。


図1. デバイスリストと I/O 完了キュー 


図2. スレッドの管理用データ構造 

5.1.2 ワーカースレッドプールの作成

ワーカースレッドを作成します。

ワーカースレッドの起動は特に変わったことはなく、普通に _beginthreadex などで起動すれば OK です。

    HANDLE hThreads [NUMBER_OF_THREAD];
 
    for ( int i=0; i < NUMBER_OF_THREAD; i++ ) {
        UINT uThreadId;
        hThreads[i] = (HANDLE) _beginthreadex(
            NULL,
            NULL,
            WorkerThreadFunc,
            (void*) NULL,
            NULL,
            &uThreadId);
    }

このサンプルでは NUMBER_OF_THREAD という識別子で定義した数のワーカースレッドを作成しています。また、起動したスレッドのスレッド関数は、WorkerThreadFunc という名前の関数です。

IOCP に関連付けされる典型的なワーカースレッドは次のような形をしています。

unsigned int __stdcall WorkerThreadFunc (PVOID pv) {

    FOO* pFoo;
    LPOVERLAPPED pol = NULL;
    DWORD cbNumberOfBytesTransferred = 0;
    ULONG uCompletionKey;
    BOOL bRet;
 
    while (1) { 

        bRet = GetQueuedCompletionStatus (
            g_hIOCP
            &cbNumberOfBytesTransferred, 
            &uCompletionKey, 
            &pol, 
            INFINITE );
  
        if ( !bRet ) {
            // Error
        }

        //
        // OVERLAPPED 構造体のアドレスから、その OVERLAPPED データを
        // 含むユーザー定義のデータ構造 (ここでは FOO としている) FOO への
        // ポインタを取得する
        //
  
        pFoo = (FOO*) CONTAINING_RECORD (pol, FOO, ol);

        //
        // コンテキストに応じた処理
        // コンテキスト情報はユーザー定義の FOO に持たせておく。
        // 
        ...
    }

    return 0;
}

ワーカースレッドが開始すると、直ちに無限ループに入っています。そのはじめに GetQueuedCompletionStatus を呼び出します。この段階では何も I/O 要求を出していませんので、ここで GetQueuedCompletionStatus がブロックします。この時点で、このスレッドは上図 2 中の "待ちスレッドリスト (Waiting Thread List)"  に入ります。

少し話は戻りますが、5.1.1 で IOCP を作成したときに IOCP の同時実行数 (M 個) は 実際のワーカースレッド数 (M+N 個) より少ない数を指定しました。これはどういうことでしょうか。上図2 を見てください。スレッドに関してはデータ構造が三つあります。待ちスレッドキュー、リリーススレッドリスト、一時停止スレッドリストです。スレッドを起 動した直後にスレッドは、待ちスレッドキューに入ります。何らかの I/O が完了すると、キューの中のいずれかの GetQueuedCompletionStatus が制御を返します。すると、そのスレッドはリリーススレッドリストに入ります。ここで処理が滞りなく終われば、ワーカースレッドはループの中にいますか ら、また GetQueuedCompletionStatus を呼び出して待ちスレッドキューに入ります。一方、リリーススレッドリストに入って処理が開始したときに、そこで何らかの待機関数を呼び出して処理がブ ロックすると、そのスレッドは一時停止スレッドリストに入ります。このとき、リリーススレッドリストのエントリが削除されます。

IOCP 作成時の同時実行数は、I/O が完了した時点でリリーススレッドリスト内に存在するスレッドの最大数のことです。待機し、一時停止しているスレッドはカウン トされません。ですから、例えば リリーススレッドリスト内に M 個のスレッドがあるときに、あるスレッドが一時停止リストからリリーススレッドリストに復帰すると、瞬間的に M 個以上のスレッドが同時実行することになります。

IOCP ではこのような微妙なスレッド数の調整が、自動的に行われます。 

5.1.3 ファイルを開く

CreateFile するときには、やはり FILE_FLAG_OVERLAPPED フラグを指定して、非同期 I/O にします。

IOCP は特にサーバー系で使用されることが多いので、ソケットに関しても補足しますと WSASocket のフラグは WSA_FLAG_OVERLAPPED です。同じような名前なのでわかりやすいです。

5.1.4 ファイルハンドルを IOCP に関連付けする

ファイルハンドルを取得したら、それを IOCP に関連付けします。

    hIOCP = CreateIoCompletionPort ( hFile, g_hIOCP, NULL, 0);

    if ( !hIOCP ) {
        // エラー
    }

この操作によって、hFile で示したファイルハンドル (で識別されるデバイス) が、上図1のデバイスリストに入ります。

したがって、これで IOCP を介してワーカースレッドとファイルハンドルが関連付けされました。

5.1.5 I/O 要求を行う

I/O 要求は OVERLAPPED 構造体を渡して行います。これは従来の方法と変わりません。

    BOOL bRet = ReadFile (
        hFile,
        pContext->pBuffer,
        BUFFER_SIZE,
        NULL,
        &pContext->ol );
  
    if ( !bRet ) {
        DWORD dwGle = GetLastError (); 

        if ( ERROR_IO_PENDING != dwGle ) {
            // エラー
        }
    } 

ReadFile が TRUE を返せば直ちに読み出すデータがあることになりますし、FALSE を返し、かつ GetLastError が ERROR_IO_PENDING であれば正常に非同期 I/O が開始したことを示します。

5.1.6 I/O が完了したときに、ワーカースレッドの GetQueuedCompletionStatus が制御を返す

5.1.2 で見たように、ワーカースレッドそれぞれは GetQueuedCompletionStatus の呼び出しで停止しています。I/O が完了したときに、それが制御を返します。そのとき返ってくるのが、OVERLAPPED 構造体へのアドレスと、完了キー (Completion Key) や 転送バイト数などです。

通常は OVERLAPPED 構造体は、他のコンテキストデータに含めて使います。例えばこんな感じです。

typedef struct tagFOO {
     STATUS status;
     PBYTE    pData;
     OVERLAPPED ol;
} FOO, *PFOO;

GetQueuedCompletionStatus が返すのは ol のアドレスですから、次のようにして FOO の先頭アドレスを取得して、そのコンテキスト特有の動作を実行します。

    PFOO pFoo = (PFOO) CONTAINING_RECORD ( pol, FOO, ol );

このあたりは、IOCP のお話ではなく IOCP を利用するときのテクニック的な話ですから、もっと良い方法があったらそれを使えば良いと思います。 

5.2 コードの解説

サンプルコードはこちらです。

サンプルコー ドのダウンロード [iocp.zip, makefileを使用]

5.1 で要点は抑えてあると思いますので、ここではこのサンプルで使っているワーカースレッドの終了方法を説明します。

WinMain を終了するときに、次のように PostQueuedCompletionStatus を利用してスレッド数分 I/O 完了ステータスをポストします。

for ( int i=0; i<NUMBER_OF_THREAD; i++ ) {

    LPOVERLAPPED pOl = (LPOVERLAPPED) malloc ( sizeof (OVERLAPPED) );
    if ( !pOl ) {
        ...
    }
  
    PostQueuedCompletionStatus ( g_hIOCP, 0, COMPKEY_EXIT, pOl);
  
}

このとき、完了キーとして COMPKEY_EXIT という値をポストしています。これは単に 1 に定義されています。そしてワーカースレッドの方では、次のようにして GetQueuedCompletionStatus から渡された完了キーをチェックして、スレッド終了要求か確認しています。

unsigned int __stdcall WorkerThreadFunc (PVOID pv) {

    ... 

    while (1) {

        bRet = GetQueuedCompletionStatus (
            g_hIOCP, 
            &cbNumberOfBytesTransferred, 
            &uCompletionKey
            &pol, 
            INFINITE );
  
        ...

        if ( COMPKEY_EXIT == uCompletionKey ) {
            free ( pol );
            break;
        }

        ...
  
    }
    ...
    return 0;
}

 

以上、長々と非同期 I/O についてサンプルコードを含めて解説を試みてみました。

ここまでお読みいただき、誠にありがとうございます。SNS 等でこの記事をシェアしていただけますと、大変励みになります。どうぞよろしくお願いします。

© 2024 Web/DB プログラミング徹底解説