Web/DB プログラミング徹底解説

ホーム > 雑記帳 > マルチスレッド [Linux]

マルチスレッド [Linux]

スポンサーリンク

 Linux 上ではプロセスの起動コストが低いため、マルチプロセスのデザインが多いらしいですが、Windowsではプロセス起動はなるべく避けたいところです。マ ルチスレッドによる設計ができないと話になりません。動機は明確なので、さっそく本題に入ります。

まだざっとしか見ていませんが、Linux でのスレッディングについては "The Native POSIX Thread Library for Linux" (2003) などによれば、つい最近でもいろいろな問題点がある(あった)ようではありますが、これはまぁ、スレッドライブラリの呼び出し側の問題ではないのでここで は気にしないことにします。POSIX 標準のソースの互換性を保ちながら、カーネルがしっかりとサポートしてくれることを期待したいところです。ここでは POSIX pthread インターフェイスを使います。

1. スレッドの生成

int pthread_create (
    pthread_t *,
    const pthread_attr_t *,
    void *(*)(void *),
    void *);

ここで、パラメータは次の通りです。

  • pthread_t にはスレッドの起動が成功したときにスレッドの識別子が書き込まれます。現在のところ、pthread_t は sys/types.h で __uint32_t に define されています。
  • pthread_attr_t はスレッドの属性です。NULL を pthread_create に渡すとデフォルトの属性が使われます。
    使い方は pthread_attr_t を pthread_attr_init で初期化して、さらに pthread_attr_set* 系関数で属性を設定することになりますが、今のところここは深入りしないことにします。
  • void *(*)(void*) は void 型のポインタをひとつ引数に取り、void 型のポインタを返すスレッド関数です。スレッドのスタートアップとしてこちらが呼ばれます。
  • 第四引数はスレッド関数への引数です。

この関数は成功すると 0 を返します。

関数から抜けたときには、pthread_exit があたかも呼ばれたかのような振る舞いをすることになります。ちなみに、メインスレッドはこの方法とは異なり、exit() で返り値を返すのと同様に振舞います。

2. 同期

特定のスレッドの終了を待つときは次を使います。

int pthread_join (pthread_t, void **);

ここで引数は次の通り:

  • pthread_t は pthread_create の第一引数のスレッド識別子です。
  • 二つ目の引数には待っていたスレッドが pthread_exit に渡した返り値が入ります。

pthread_join は第一パラメータで指定したスレッドが終了するまで待ちます。

ここまでのサンプルコードを示します。

#include <pthread.h>
#include <iostream>
using namespace std;

void* thread_func (void* pv) {

    char* pch = (char*) pv;
    *pch = 'z';
   
    cout << "[child] " << *pch << endl;

}

int main (int argc, char* argv[]) {

    pthread_t         thread;
    pthread_attr_t  thread_attr;
    char                  chReturn       = 'a';
    void*                thread_result;
    int                    nRet             = 0;

    cout << "[main] " << chReturn << endl;
   
    nRet = pthread_create ( &thread, NULL, thread_func, (void*) &chReturn );

    if ( 0 != nRet ) {
        cout << "Failed to create a new thread." << endl;
    }

    pthread_join ( thread, &thread_result );

    cout << "[main] " << chReturn << endl;

    return 0;
}

ここでは上記のコードを thread.cpp としています。ビルドと実行結果は次の通り:

$ cc -D_REENTRANT thread.cpp -lstdc++ -lpthread -o threadtest.exe

$ ./threadtest.exe
[main] a
[child] z
[main] z

3. クリティカルセクションの保護

Win32 の場合は最も良く使われるのは最軽量の CRITICAL_SECTION でしょう。CRITICAL_SECTION に対して InitializeCriticalSection、EnterCriticalSectiton、LeaveCriticalSection および LeaveCriticalSection を使い、クリティカルセクションに対してスレッド間の排他制御を行います。POSIX のインターフェイスとしては、セマフォとミューテックスが用意されているようです。Win32 ではこれらはカーネルオブジェクトとして実装されているために少々重く、プロセス間の排他制御のシナリオなど必要に迫られて使うものでした。

Lock Convoy の検出と解決: ちょっと脱線かつ Win32 の話ですが、Lock convoy とか Boxcar 問題として知られる問題については次のブログに詳しく書いてます。
http://blogs.msdn.com/sloh/archive/2005/05/27/422605.aspx

3.1 ミューテックスを使う方法

クリティカルセクションへの排他制御を行うには、ミューテックス(Mutex)がもっとも単純でしょう。Mutex は POSIX pthread ライブラリの一部として定義されています。

mutex の作成と破棄

int pthread_mutex_init (
    pthread_mutex_t *,
    const pthread_mutexattr_t *);

  • 第一引数は初期化する mutex を指定します。ちなみに、pthread_mutex_t も現在は sys/types.h で __uint32_t に define されています。
  • 第二引数にミューテックスの属性を指定します。NULL はデフォルトの属性を表します。

成功すると 0 を返します。また、使い終わったらmutex を破棄しますが単に次の関数に渡すだけです。

int pthread_mutex_destroy (
    pthread_mutex_t *);

mutex の取得と解放

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

名前から明らかですね。lock と unlock です。成功するとともに 0 を返します。

mutex を使うサンプルコードを示します。ここでは、条件コンパイルによってグローバル変数である g_nTest を 「ミューテックスによって保護するバージョン」 と 「保護しないバージョン」 の二通りを作成できるようにしてあります。コードは少々ごちゃごちゃして読みにくいですが、やっていることは単純です。

#include <pthread.h>
#include <iostream>
using namespace std;

#define     THREAD_NUMBER (10)
pthread_t  g_ptThreadIds [THREAD_NUMBER];
int            g_nTest = 0;

#ifdef USEMUTEX
pthread_mutex_t g_Mutex;
#endif

void* funcinc (void* pv) {

    cout << "+";
   
    for ( int i=0; i<50000; i++ ) {
#ifdef USEMUTEX
        pthread_mutex_lock ( &g_Mutex );
#endif
        g_nTest++;
#ifdef USEMUTEX
        pthread_mutex_unlock ( &g_Mutex );
#endif
    }

    cout << "(+)";

}

void* funcdec ( void* pv ) {

    cout << "-";
   
    for ( int i=0; i<50000; i++) {

#ifdef USEMUTEX
        pthread_mutex_lock ( &g_Mutex );
#endif
        g_nTest--;
#ifdef USEMUTEX
        pthread_mutex_unlock ( &g_Mutex );
#endif
    }

    cout << "(-)";

}

void JoinMultipleThreads ( pthread_t* ppthread, int nThreadNum ) {

    void* thread_result;
   
    for ( int i=0; i<nThreadNum; i++ ) {
        pthread_join ( ppthread[i], &thread_result );
    }
   
}

int main (int argc, char* argv[]) {

    pthread_attr_t thread_attr;
    int nRet = 0;

#ifdef USEMUTEX
    nRet = pthread_mutex_init ( &g_Mutex, NULL);
#endif

    if ( nRet ) {
        cout << "Failed to create a mutex.";
        exit ( EXIT_FAILURE );
    }
   
    for ( int i=0; i<THREAD_NUMBER; i++ ) {
        if ( i % 2 ) {
            nRet = pthread_create ( &g_ptThreadIds[i], NULL, funcinc,  NULL );
        }
        else {
            nRet = pthread_create ( &g_ptThreadIds[i], NULL, funcdec,  NULL );
        }
        if ( 0 != nRet ) {
            cout << "Failed to create a new thread." << endl;
            abort ();
        }
    }

    JoinMultipleThreads ( g_ptThreadIds, THREAD_NUMBER );

    cout << endl;
    cout << "Result: " << g_nTest << endl;

#ifdef USEMUTEX
    pthread_mutex_destroy ( &g_Mutex );
#endif

    return 0;
}

$ cc -D_REENTRANT mutextest.cpp -lstdc++ -lpthread -o mutextest.exe

$ ./mutextest.exe
-++-+-+-+-(+)(+)(-)(+)(-)(+)(-)(+)(-)(-)
Result: -60949

$ ./mutextest.exe
-(-)+(+)-(-)+(+)-(-)+(+)-(-)+-(+)+(-)(+)
Result: 4840

$ ./mutextest.exe
-(-)+(+)-(-)+(+)-(-)+(+)-(-)+-(+)+(-)(+)
Result: -19533

結果は上記のように 不安定です。

$ cc -D_REENTRANT -DUSEMUTEX mutextest.cpp -lstdc++ -lpthread -o mutextest.exe

$ ./mutextest.exe
+-+--+-+-+(+)(-)(-)(+)(-)(-)(+)(+)(+)(-)
Result: 0

$ ./mutextest.exe
-+-+-+-+-+(-)(-)(+)(+)(+)(-)(-)(+)(+)(-)
Result: 0

$ ./mutextest.exe
-+-+-+-+-+(-)(+)(-)(+)(+)(-)(-)(+)(+)(-)
Result: 0

3.2 セマフォを使う方法

semaphore.h を使いますが、こちらはまた後で使う機会が来たら書き加えようと思います。

int sem_init (sem_t * sem, int pshared, unsigned int value);
int sem_destroy (sem_t * sem);
sem_t *sem_open (const char *name, int oflag, ...);
int sem_close (sem_t *sem);
int sem_wait (sem_t * sem);
int sem_trywait (sem_t * sem);
int sem_timedwait (sem_t * sem, const struct timespec *abstime);
int sem_post (sem_t * sem);
int sem_getvalue (sem_t * sem, int *sval);

スポンサーリンク