- スレッドとは
- スレッドとプロセス
- Pthreads
- データ競合(data races)
- 相互排除(mutual exclusion)
- デッドロック(deadlock)
- 食事する哲学者(dining philosophers)
- 条件変数(condition variables)
- セマフォ(semaphore)
- 参考
スレッドとは
現代のコンピュータには通常、複数のプロセッサ(CPU)が内蔵されています。オペレーティングシステムは、スレッドと呼ばれる複数のプロセッサをユーザレベルで抽象化したようなものを提供しています。
スレッドとは、プロセス内部で実行されるひとまとまりの命令です。複数のスレッドを同時に動かすことで並行性(concurrency)を実現することができます。
プロセスは複数のスレッドを持つことができます。1つのプロセスには複数のスレッドを含めることができます。同じプロセス内のスレッドは、同じ仮想アドレス空間(テキスト、データ、ヒープ領域)にあるグローバルデータやファイルディスクリプタテーブルを共有します。
個々のスレッドにはプログラムカウンタ、スタック、レジスタなど、それぞれが実行する手続きのために必要な情報が分け与えられます。
スレッドとプロセス
OSのスケジューラがプロセスの間の切り替えを行うのとほとんど同じ方法で、スレッドマネージャはスレッドのタイムスライスを行います。そのほかにもスレッドとプロセスには類似点が多いため、スレッドは軽量プロセスとも呼ばれることがあります。
プロセス
- 隔離された仮想アドレス空間(セキュリティと安定性がある一方、情報の共有が難しい)
- fork-execスタイルで簡単に外部プログラムを実行できる
- 同じプログラム内での複数タスクの協調(coordination)が難しい
スレッド
- 仮想アドレス空間を共有(セキュリティと安定性は劣るが、情報の共有がしやすい)
- 外部プログラムを簡単に実行できない
- 1つのプログラム内で複数タスクの協調がしやすい
Pthreads
ANSI Cにはスレッドのネイティブサポートがありません。代わりにUNIXやLinuxはpthread(POSIX thread)を提供します。
一般的なスレッドの使い方を見てみましょう。複数のスレッドを使って1つの関数を並行して実行します。
#include <stdio.h> #include <pthread.h> static void *thread_routine(void *arg) { int *np = (int *)arg; for (int i = 0; i < 1000000; i++) *np += 1; return NULL; } int main(void) { pthread_t threads[4]; int n = 0; for (size_t i = 0; i < 4; i++) pthread_create(&threads[i], NULL, thread_routine, &n); for (size_t j = 0; j < 4; j++) pthread_join(threads[j], NULL); printf("n: %d\n", n); return 0; }
このコードは変数 n
を100万回インクリメントする関数 thread_routine
を4つのスレッドで並行して実行しています。
スレッドが最初に実行する関数の引数と返り値は共に void *
型であることが、pthreadによって定義されています。どのような用途であってもpthreadを使いやすくするための、C言語によるジェネリックプログラミングの一例です。
pthread_create
によりスレッドを生成します。
#include <pthread.h> int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_rtn)(void *), void *restrict arg);
- 各スレッドは
pthread_t
型のスレッドIDを持つ - 第1引数:生成したスレッドのIDを格納するための
pthread_t
へのポインタ - 第2引数:スレッドの優先順位や他の属性(attributes)を設定するために使用する。設定することがない場合はNULLを渡す
- 第3引数:スレッドが最初に実行する関数
- 第4引数:最初に実行する関数の引数
pthread_join
により、指定したスレッドが実行を終了するまでその場で待機することができます。待機をせず、main関数がリターンしプロセスが終了してしまうと、同じプロセスのアドレス空間にあるスレッドも全て終了してしまいます。
#include <pthread.h> int pthread_join(pthread_t thread, void **rval_ptr);
以下、コードの出力結果です。
$ ./a.out 1157101
4つのスレッドでnを100万回ずつインクリメントしているので、nは最終的に400万になるはずですが、実際には100万前後の値が出力されてしまいます。なぜでしょう。
データ競合(data races)
適切な同期(synchronization)がないと2つ以上のスレッドによる共有メモリへの書き込みは未定義の動作になります。
先ほどのプログラムでは、メインスレッドのスタック領域にある変数nへのポインタを複数のスレッドに渡して書き込みを行っていたことが、未定義の結果の原因になっています。
*np += 1;
という1つの命令は一見不可分(atomic)であるように見えますが、実はこの命令1つにも競合状態(race condition)があります。実際 gcc -S
で生成したアセンブリコードを覗くと、*np += 1;
の箇所が3つの命令に分かれていることがわかります。
movl (%rax), %ecx ; load
addl $1, %ecx ; add
movl %ecx, (%rax) ; store
(x86-64 AT&T syntax)
ALUによる演算はコア専用のレジスタの上で行われます。そのため一度 %rax
が持つアドレスにある変数の値(32bit)をメモリから %ecx
に取り出し、一時的なレジスタで加算し、ふたたびメモリに格納しています。
2つのスレッドがほぼ同時に変数 n
をインクリメントしていることを想像してみてください。
それぞれがメモリから同じ値をレジスタに転送し、個別のレジスタでインクリメントを行ってしまうと、同じ結果として上書きされてしまいます。
incl (%rax)
のような演算とデータ転送が1つの命令にまとめられているアセンブリコードにも競合状態が当てはまります。そのような場合であっても、プロセッサは複雑な命令を見えないところでより小さなマイクロコードに分割し、実行速度の向上などを図ります。 incl (%rax)
も、データ転送、演算、データ転送のようなフェーズに分けられ実行されます。
マルチスレッドにおける正しい同期の基本法則:
2つ以上のスレッドが並行して非アトミックなオブジェクトにアクセスする場合、それらはすべて読み出し操作(read)でなければならない。そうでなければ、プログラムは未定義の動作を引き起こす。
相互排除(mutual exclusion)
相互排除とは、ある資源(今回の場合は変数n)にアクセスできるスレッドの数を多くても1つにすることを意味します。最初のプログラムも、このポリシーを適用すればデータに一貫性をもたせることができます。
Pthreadでは pthread_mutex_t
型のmutexというオブジェクトを使うことで相互排除を実現できます。mutexはよくロック(lock)と呼ばれることがあります(正確にはその一種であり他にもロックはあります)。
mutexの使い方自体は単純です。共有資源にアクセスする前にロック(lock)し、完了したらロックを解除(unlock)します。このように相互排除したい領域のことを「クリティカルセクション(critical section)」「際どい部分」と呼びます。
mutexを使って先ほどのコードを修正します。
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; static void *thread_routine(void *args) { int *np = (int *)args; for (int i = 0; i < 1000000; i++) { pthread_mutex_lock(&lock); *np += 1; /* 際どい部分 */ pthread_mutex_unlock(&lock); } return NULL; } /* ... */
pthread_mutex_t
型のmutexを初期化するためには、最初に PTHREAD_MUTEX_INITIALIZER
を代入するか、 pthread_mutex_init
を呼び出す必要があります。
pthread_mutex_lock
でスレッドAがmutexをロックしている間にスレッドBが同じmutexをロックしようとすると、スレッドBはブロックしその場で待機します。
pthread_mutex_unlock
でmutexが解除されるときにその場で複数のスレッドが待機していた場合、それらのスレッドは再びプロセッサ時間の対象となります。その中で最初に実行されたスレッドのみがmutexをふたたびロックすることができ、次の処理へと続行します。mutexを取得できなかった他のスレッドは、それがふたたび解除されるまで待機状態に戻ります。このようにして、一度に1つのスレッドのみがクリティカルセクションへと進行します。
修正されたコードを実行すると、正しい結果になっていることが確認できます。また、相互排除により待ち時間が発生するため、全ての処理が終わるまでにより時間がかかることも確認できます。クリティカルセクションはボトルネックになることが大いにあるので、小さいほうが望ましいです。
デッドロック(deadlock)
マルチスレッドプログラミングを正しく行うためには、競合状態の他にもデッドロックの可能性を考慮する必要があります。
デッドロックとは、各スレッドが別のスレッドによって保持されている資源を(永久に)待機している状態のことを指します。
上の図は資源の所有権と待機のサイクルを描いています。このようなサイクルが発生すると、2つのスレッドが相互に保持している資源へのアクセスを待ってしまうため、どちらも進行できない状態になってしまいます。
そのほかにも標準のmutexの場合、1つのスレッドが同じmutexを連続でロックしようとするとデッドロックが発生します。そのスレッドは自分自身がmutexを解除するまで永久に待機してしまいます。
食事する哲学者(dining philosophers)
デッドロックの可能性とそれを回避する方法を説明するために昔からよく使われてきたシナリオです。仕組みは次のとおりです。
- 5人の哲学者が、考えながら円卓に座っている
- テーブルにはスパゲッティをのせた大皿があり、その周りには哲学者と同じ数のフォークが配置されている
- やがて個々の哲学者は空腹になり、食事しようとする
- スパゲッティをよそい食べるには左右2本のフォーク必要とする
- となりの哲学者が使用中のフォークは取得できない
- 哲学者は会話をせず、思考と食事を繰り返す
プログラム内のシミュレーションでは、1人の哲学者を1つのスレッドとし、1つのフォークを1つのmutexとします。哲学者をスレッドにすることで、一人ひとりが自立して思考と食事を繰り返すことを表現し、フォークをmutexにすることで、となりの哲学者がフォークを使用している間はその場で待機しなければならないことを表現します。
5人の哲学者を5つのスレッドに見立て、それぞれ並行に思考と食事のループを3回行います。
static const int num_philosophers = 5; static const int num_meals = 3; struct threadinfo { int id; pthread_mutex_t *left; pthread_mutex_t *right; }; /* ... */ static void *philosopher(void *arg) { struct threadinfo *tip = arg; for (int i = 0; i < num_meals; i++) { think(tip->id); eat(tip->id, tip->left, tip->right); } return NULL; } int main(void) { pthread_mutex_t forks[num_philosophers]; pthread_t philosophers[num_philosophers]; struct threadinfo ti[num_philosophers]; for (int i = 0; i < num_philosophers; i++) { pthread_mutex_init(&forks[i], NULL); ti[i].id = i; ti[i].left = &forks[i]; ti[i].right = &forks[(i + 1) % num_philosophers]; pthread_create(&philosophers[i], NULL, philosopher, (void *)&ti[i]); } for (int j = 0; j < num_philosophers; j++) pthread_join(philosophers[j], NULL); return 0; }
哲学者とフォークに0から4までのidを振り分けます。0番目の哲学者の場合、左に0番目のフォークがあり右に1番目のフォークがあります。円卓で食事するため、4番目の哲学者の右には0番目のフォークがあります。
思考と食事は以下のような関数で表現します。
static void think(int id) { printf("%d is thinking\n", id); usleep(get_think_time()); } static void eat(int id, pthread_mutex_t *left, pthread_mutex_t *right) { pthread_mutex_lock(left); pthread_mutex_lock(right); printf("%d is eating\n", id); usleep(get_eat_time()); pthread_mutex_unlock(left); pthread_mutex_unlock(right); }
哲学者が食事をするときは必ず左のフォークから手に取ります。思考と食事に要する時間は毎回ランダムです。
プログラムを数回実行してみても、正常に動作しているように見えます。しかしこのプログラムには1つ問題点があります。もし5人の哲学者が同時に左のフォークを取得することに成功したら、全員が使用中である右のフォークを待つことになり立ち往生するでしょう。
以下のように、左のフォークを取得する行のあとにsleepを差し込むと、デッドロックが起こる状況を再現することができます。
static void eat(int id, pthread_mutex_t *left, pthread_mutex_t *right) { pthread_mutex_lock(left); sleep(1); // 人工的にデッドロックを起こしたい pthread_mutex_lock(right); printf("%d is eating\n", id); pthread_mutex_unlock(left); pthread_mutex_unlock(right); }
ソースコード:dining_philosophers_deadlock.c
許可証(permits)による解法
デッドロックは共有資源をめぐって競争するスレッドの数を抑える仕組みを導入することで防ぐことができます。
食事する哲学者の場合、共有資源はフォークになります。全員が同時にフォークの取得を試みることがなければデッドロックは起きません。
そのためいくつか合理的な制約が考えられます。
例えば、5人の哲学者の場合フォークは5つしかないので、3人が同時に食事することは不可能です。同時に食事を試みる哲学者の数を2人に制限することが考えられます。
もしくは、最大4人の哲学者にフォークを取らせてもデッドロックは起きないといえます。5人全員がフォークを取ろうとすることがなければ、少なくとも最後の1人が両方のフォークを取得することに成功するからです。
今回は 哲学者の数 - 1
枚の許可証(permits)を導入します。哲学者は許可証を一枚取得しないと、左右のフォークを取りにいけないという設定を加えます。
struct threadinfo { /* ... */ int *permits; pthread_mutex_t *permits_lock; }; int main(int argc, char *argv[]) { /* ... */ int permits; pthread_mutex_t permits_lock; permits = num_philosophers - 1; pthread_mutex_init(&permits_lock, NULL); /* ... */ }
許可証の数をint型の変数permitsで管理します。2人が同時に許可証を取ることがないように、変数permitsのためのmutexも用意します。
哲学者は左右のフォークを取りにいく前に wait_for_permission
で許可証を一枚取ります。食事をすることに成功したら、フォークを戻す前に grant_permission
で許可証を一枚返却します。
static void eat(int id, pthread_mutex_t *left, pthread_mutex_t *right, int *permits, pthread_mutex_t *permits_lock) { wait_for_permission(permits, permits_lock); pthread_mutex_lock(left); pthread_mutex_lock(right); printf("%d is eating\n", id); usleep(get_eat_time()); grant_permission(permits, permits_lock); pthread_mutex_unlock(left); pthread_mutex_unlock(right); }
以下は許可証にアクセスするための関数です。
static void wait_for_permission(int *permits, pthread_mutex_t *permits_lock) { while (1) { pthread_mutex_lock(permits_lock); if (*permits > 0) break ; pthread_mutex_unlock(permits_lock); usleep(10); } (*permits)--; pthread_mutex_unlock(permits_lock); } static void grant_permission(int *permits, pthread_mutex_t *permits_lock) { pthread_mutex_lock(permits_lock); (*permits)++; pthread_mutex_unlock(permits_lock); }
wait_for_permission
を呼んだ哲学者は許可証が残っているかどうか確認します。許可証の数が0枚だった場合、10マイクロ秒スリープしてから許可証の数を再検証します。だれかが許可証を返却するまでこのループを繰り返します。
ソースコード:dining_philosophers_busywait.c
許可証(permits)による解法の改善
許可証を導入したことによりデッドロックは回避できました。一方で、10マイクロ秒の間隔があるとはいえ、ビジーウェイトはCPU資源を消費するので通常は避けたいものです。
理想は許可証がないことを知った哲学者がスリープ状態に入ることでしょう。その場合、許可証を返却するタイミングでスリープしている哲学者を起こす必要があります。
static void grant_permission(int *permits, pthread_mutex_t *permits_lock) { pthread_mutex_lock(permits_lock); (*permits)++; /* スリープしている哲学者に許可証を返却したことを通知したい */ pthread_mutex_unlock(permits_lock); }
このアイデアを実装するには、mutexのみでは不十分です。他の同期オブジェクトと組み合わせる必要があります。
条件変数(condition variables)
スレッドのために用意されたもう一つの同期オブジェクトとして条件変数があります。ある条件が発生するまでスレッドをスリープさせたい時に条件変数を使います。
Pthreadでの条件変数は pthread_cond_t
型になります。
条件変数の操作:
wait
ある条件が満たされるまで待つbroadcast
ある条件が満たされたことを伝える。待っているスレッドが全て起き上がるsignal
ある条件が満たされたことを伝える。待っているスレッドが1つだけ起き上がる
条件式を競合することなく評価するためにmutexと組み合わせて使う構造になっています。
条件変数の通常の用法:
mutex m; pthread_cond_t cv; pthread_mutex_lock(m); while(condition_is_false) pthread_cond_wait(cv, m); pthread_mutex_unlock(m);
まずmutexロックの保護下で条件式を評価します。条件式が偽のとき、それが真になるまでループに入ります。
pthread_cond_wait
を呼ぶとスレッドマネージャはそのスレッドをスリープ状態にすると同時にmutex m
のロックを解除します(アトミック操作)。ロックを解除してからスリープ状態に入ることで、他のスレッドも同じように pthread_cond_wait
の行まで進行し、スリープ状態に入ることができます。
条件変数がpthread_cond_broadcast
によって通知された時、スレッドマネージャはスリープしているスレッドを起こします。
別のスレッドが条件の値を変更し、それを通知するためにpthread_cond_broadcast
を呼ぶと、条件変数にシグナルが送られます。
その条件変数でスリープしていた(1つまたは全部の)スレッドは、そのシグナルによってスリープが解除され、もう一度内部でmutex m
のロックを獲得しようとします(アトミック操作)。スリープが解除されると同時にもう一度mutexロックを獲得することで、mutexロックの保護下で条件式を再評価できます。
ビジーウェイトの解法を条件変数を使ったものにリファクタしてみましょう。
struct threadinfo { int id; pthread_mutex_t *left; pthread_mutex_t *right; int *permits; pthread_cond_t *cv; pthread_mutex_t *m; }; /* ... */ int main(int argc, char *argv[]) { pthread_mutex_t forks[num_philosophers]; pthread_t philosophers[num_philosophers]; struct threadinfo ti[num_philosophers]; int permits; pthread_cond_t cv; // 追加 pthread_mutex_t m; // 追加 permits = num_philosophers - 1; pthread_mutex_init(&m, NULL); for (int i = 0; i < num_philosophers; i++) { pthread_mutex_init(&forks[i], NULL); ti[i].id = i; ti[i].left = &forks[i]; ti[i].right = &forks[(i + 1) % num_philosophers]; ti[i].permits = &permits; ti[i].cv = &cv; ti[i].m = &m; pthread_create(&philosophers[i], NULL, philosopher, (void *)&ti[i]); } for (int j = 0; j < num_philosophers; j++) pthread_join(philosophers[j], NULL); return 0; }
条件変数は内部でmutexを使用します。そのmutexは我々で用意する必要があるため、もう一つのmutex m
を追加で初期化します。
以下は条件変数を導入した wait_for_permission
と grant_permission
です。
static void wait_for_permission(int *permits, pthread_cond_t *cv, pthread_mutex_t *m) { pthread_mutex_lock(m); while (permits == 0) pthread_cond_wait(cv, m); (*permits)--; pthread_mutex_unlock(m); } static void grant_permission(int *permits, pthread_cond_t *cv, pthread_mutex_t *m) { pthread_mutex_lock(m); (*permits)++; if (*permits == 1) pthread_cond_broadcast(cv); pthread_mutex_unlock(m); }
許可証を得ようとするときpermits == 0
であった場合、スレッドはpthread_cond_wait
を介してスリープ状態に入ります。
許可証を返すとき *permits == 1
になった場合、 pthread_cond_broadcast
で条件に変更があったことを通知します。
ソースコード:dining_philosophers_cv.c
セマフォ(semaphore)
食事する哲学者の最初の解法として、int型のpermits
変数、条件変数 cv
、mutex m
を組み合わせた、「共有資源(許可証)のカウントを管理する仕組み」を作りました。
共有資源を管理するための汎用的なカウンタという考えは便利であるため、ほとんどのプログラミング言語にはよりジェネリックな同期オブジェクトが用意されています。それがセマフォです。
許可証のカウントを管理するwait_for_permission
と grant_permission
を一般化したら以下のような構造になるでしょう。
struct semaphore { size_t value; pthread_cond_t *cv; pthread_mutex_t *m; }; void sem_wait(struct semaphore *s) { pthread_mutex_lock(s->m); while (s->value == 0) pthread_cond_wait(s->cv, s->m); s->value--; pthread_mutex_unlock(s->m); } void sem_signal(struct semaphore *s) { pthread_mutex_lock(s->m); s->value++; if (s->value == 1) pthread_cond_broadcast(s->cv); pthread_mutex_unlock(s->m); }
これがセマフォの基本的な構造です。セマフォは単一の整数を持ち、その値を指定することで初期化します。
セマフォにはWAITとSIGNALという2つの操作でアクセスします。
wait(s)
操作:セマフォの値が正の場合、その値をデクリメントし、呼び出し元のプログラムにリターンする。セマフォの値が0の場合、それが正になるまで実行を中断し待つsignal(s)
操作:セマフォの値をインクリメントする
今回はPOSIXセマフォを使います。POSIXの名前付きセマフォを使うことで複数のプロセスからでも同じセマフォへのアクセスが可能になります。
#include <semaphore.h> sem_t *sem_open(const char *name, int oflag, ... /* mode_t mode, unsigned int value */ );
新しいまたはすでに存在する名前付きセマフォを使う。
int sem_wait(sem_t *sem);
s > 0であることを待ち、値を減らし、リターンする。
int sem_post(sem_t *sem);
値を増やす。
int sem_close(sem_t *sem);
リソースを解法する。
セマフォによる実行順序の制御
セマフォの値を0で初期化することで、単純な実行順序の制約を実装することができます。
それぞれのスレッドの中では命令は逐次(1度に1つ、順番に)実行されます。スレッドの間には実行順序の制約がないため、例えばB4はスレッドAのどの命令の前後に行われるのかわかりません。
B4が開始される前にA2の実行が完了していなければならないという制約の実装手順を見てみましょう。
void *thread_A(void *arg) { sem_t *s = arg; printf("A1\n"); printf("A2\n"); sem_post(s); printf("A3\n"); printf("A4\n"); return NULL; } void *thread_B(void *arg) { sem_t *s = arg; printf("B1\n"); printf("B2\n"); printf("B3\n"); sem_wait(s); printf("B4\n"); return NULL; } int main(void) { sem_t *s; pthread_t tid_A; pthread_t tid_B; s = sem_open("/zero_sem", O_CREAT|O_EXCL, S_IRWXU, 0); sem_unlink("/zero_sem"); pthread_create(&tid_A, NULL, thread_A, (void *)s); pthread_create(&tid_B, NULL, thread_B, (void *)s); pthread_join(tid_A, NULL); pthread_join(tid_B, NULL); sem_close(s); return 0; }
"/zero_sem"という名前のセマフォを0の値で初期化します。スレッドAではA2の実行後にSIGNAL操作を行い、セマフォの値を1にします。一方スレッドBではB4の手前にWAIT操作を配置します。これによりB4はA2の実行が完了したあとにのみ開始されることが保証されます。
sem_close
を呼ぶ前にプロセスが終了した場合、カーネルはopenされていたセマフォを自動的にcloseします。ただし名前付きセマフォをcloseしてもセマフォの値はそのままであることに注意すr必要があります。名前付きPOSIXセマフォを完全に破棄するにはsem_unlink
を呼び出さなければなりません。
int sem_unlink(const char *name);
sem_unlink
はセマフォの名前をシステムから削除します。このときsem_open
によるセマフォへの参照がなかったらセマフォを破棄します。参照があれば、それがcloseされるまでセマフォの破棄を先延ばしします。
ソースコード:sem_zero.c
セマフォによる共有資源の分配
セマフォの値を0以上の整数で初期化することで、共有資源プールの管理ができます。
以前の食事する哲学者の解法ではカウンタ変数、mutex、条件変数を組み合わせて「フォークを取得する前に許可を待つ」仕組みを作りました。同じことを1つのセマフォでより簡単に実現できます。
static void eat(int id, pthread_mutex_t *left, pthread_mutex_t *right, sem_t *permits) { sem_wait(permits); pthread_mutex_lock(left); pthread_mutex_lock(right); printf("%d is eating\n", id); usleep(get_eat_time()); sem_post(permits); pthread_mutex_unlock(left); pthread_mutex_unlock(right); } /* ... */ int main(int argc, char *argv[]) { pthread_mutex_t forks[num_philosophers]; pthread_t philosophers[num_philosophers]; struct threadinfo ti[num_philosophers]; sem_t *permits; permits = sem_open("/permits", O_CREAT|O_EXCL, S_IRWXU, num_philosophers - 1); sem_unlink("/permits"); for (int i = 0; i < num_philosophers; i++) { pthread_mutex_init(&forks[i], NULL); ti[i].id = i; ti[i].left = &forks[i]; ti[i].right = &forks[(i + 1) % 5]; ti[i].permits = permits; pthread_create(&philosophers[i], NULL, philosopher, (void *)&ti[i]); } for (int j = 0; j < 5; j++) pthread_join(philosophers[j], NULL); sem_close(permits); return 0; }
セマフォの値を許可証の数で初期化し、これまでのwait_for_permission
と grant_permission
をそれぞれ sem_wait
と sem_post
によって置き換えます。
ソースコード:dining_philosophers_sem.c
セマフォによる有限バッファ
有限バッファ(bounded buffer)はキャッシュの実装によく使われる構造です。プロセス間通信で使用するパイプのバッファやネットワークソケットなどの実装にも有限バッファが使われています。セマフォや条件変数を使えばスレッド間で同期された有限バッファを作ることができます。
有限バッファには生産者(writer)と消費者(reader)がいます。あらかじめ定めた容量(capacity)で、生産者はバッファが一杯になるまでデータをバッファに書き込むことができ、消費者はバッファが空になるまでデータを読み出すことができます。
まずは同期機能のない有限バッファの実装を見てみましょう。
struct threadinfo { char *buffer; size_t capacity; size_t iterations; }; void *write_to_buffer(void *arg) { struct threadinfo *tip = arg; printf("Writer: Ready to write.\n"); for (size_t i = 0; i < tip->iterations * tip->capacity; i++) { char ch = prepare_data(); tip->buffer[i % tip->capacity] = ch; printf("Writer: published data packet with character '%c'.\t\t", ch); } return NULL; } void *read_from_buffer(void *arg) { struct threadinfo *tip = arg; printf("Reader: Ready to read.\n"); for (size_t i = 0; i < tip->iterations * tip->capacity; i++) { char ch = tip->buffer[i % tip->capacity]; process_data(ch); tip->buffer[i % tip->capacity] = ' '; printf("Reader: consumed data packed with character '%c'.\t\t", ch); } return NULL; } int main(void) { char buffer[8]; pthread_t writer_thread; pthread_t reader_thread; struct threadinfo ti; memset(buffer, ' ', sizeof(buffer)); ti.buffer = buffer; ti.capacity = sizeof(buffer); ti.iterations = 3; pthread_create(&writer_thread, NULL, write_to_buffer, &ti); pthread_create(&reader_thread, NULL, read_from_buffer, &ti); pthread_join(writer_thread, NULL); pthread_join(reader_thread, NULL); return 0; }
生産者スレッドはランダムな文字を1バイトずつデータを有限バッファに書き込みます。消費者スレッドはそれと同じ順で1バイトずつデータを読み出します。コードを単純にするために、バッファの中身を参照する位置は必ずi % capacity
であることにします。
prepare_data
とprocess_data
関数ではデータの処理にかかる時間をシミュレートしてます。
このままだと2つのスレッドは、お互いが何バイト進んだのかわからないまま独立してIOを行ってしまいます。消費者は次のスロットに意味のあるデータが書き込まれているか知るすべがありませんし、生産者は一周まわって消費者がまだ読み込んでいないスロットを上書きしてしまうかもしれません。
$ ./a.out Writer: Ready to write. Reader: Ready to read. Writer: published data packet with character 'S'. BUFFER: |S| | | | | | | | Writer: published data packet with character 'W'. BUFFER: |S|W| | | | | | | Reader: consumed data packed with character ' '. BUFFER: | |W| | | | | | | Writer: published data packet with character 'C'. BUFFER: | |W|C| | | | | | Reader: consumed data packed with character 'W'. BUFFER: | | |C| | | | | | Reader: consumed data packed with character 'C'. BUFFER: | | | | | | | | | Writer: published data packet with character 'C'. BUFFER: | | | |C| | | | | Reader: consumed data packed with character ' '. BUFFER: | | | | | | | | | Reader: consumed data packed with character ' '. BUFFER: | | | | | | | | | Writer: published data packet with character 'I'. BUFFER: | | | | |I| | | | Writer: published data packet with character 'W'. BUFFER: | | | | |I|W| | | Writer: published data packet with character 'T'. BUFFER: | | | | |I|W|T| | Reader: consumed data packed with character ' '. BUFFER: | | | | |I| |T| | Reader: consumed data packed with character 'T'. BUFFER: | | | | |I| | | | Reader: consumed data packed with character ' '. BUFFER: | | | | |I| | | | Writer: published data packet with character 'E'. BUFFER: | | | | |I| | |E| Reader: consumed data packed with character ' '. BUFFER: | | | | |I| | |E| Writer: published data packet with character 'Q'. BUFFER: |Q| | | |I| | |E| Writer: published data packet with character 'G'. BUFFER: |Q|G| | |I| | |E| Writer: published data packet with character 'B'. BUFFER: |Q|G|B| |I| | |E| Writer: published data packet with character 'M'. BUFFER: |Q|G|B|M|I| | |E| Reader: consumed data packed with character ' '. BUFFER: |Q| |B|M|I| | |E| Writer: published data packet with character 'R'. BUFFER: |Q| |B|M|R| | |E| Reader: consumed data packed with character 'B'. BUFFER: |Q| | |M|R| | |E| ...
双方向のスレッド間の協調を可能にするためには2つのセマフォが必要になります。
有限バッファの生産者はバッファが一杯のときにブロックし、消費者が何かを読み出したことでバッファが一杯でなくなったことを条件に起き上がるべきです。
有限バッファの消費者はバッファが空のときにブロックし、生産者が何かを書き込んだことでバッファが空でなくなったことを条件に起き上がるべきです。
空の条件を管理する"/empty"という名前のセマフォと、一杯の条件を管理する"/full"という名前のセマフォを導入してみましょう。
struct threadinfo { char *buffer; sem_t *full; sem_t *empty; size_t capacity; size_t iterations; }; void *write_to_buffer(void *arg) { for (size_t i = 0; i < tip->iterations * tip->capacity; i++) { char ch = prepare_data(); sem_wait(tip->empty); tip->buffer[i % tip->capacity] = ch; sem_post(tip->full); /* ... */ } return NULL; } void *read_from_buffer(void *arg) { for (size_t i = 0; i < tip->iterations * tip->capacity; i++) { sem_wait(tip->full); char ch = tip->buffer[i % tip->capacity]; process_data(ch); tip->buffer[i % tip->capacity] = ' '; sem_post(tip->empty); /* ... */ } return NULL; } int main(void) { /* ... */ sem_t *full = sem_open("/full", O_CREAT|O_EXCL, S_IRWXU, 0); sem_t *empty = sem_open("/empty", O_CREAT|O_EXCL, S_IRWXU, sizeof(buffer)); sem_unlink("/full"); sem_unlink("/empty"); /* ... */ }
ソースコード:sem_reader_writer.c
$ ./a.out Writer: Ready to write. Reader: Ready to read. Writer: published data packet with character 'Z'. BUFFER: |Z| | | | | | | | Writer: published data packet with character 'O'. BUFFER: |Z|O| | | | | | | Reader: consumed data packed with character 'Z'. BUFFER: | |O| | | | | | | Reader: consumed data packed with character 'O'. BUFFER: | | | | | | | | | Writer: published data packet with character 'Z'. BUFFER: | | |Z| | | | | | Reader: consumed data packed with character 'Z'. BUFFER: | | | | | | | | | Writer: published data packet with character 'L'. BUFFER: | | | |L| | | | | Writer: published data packet with character 'O'. BUFFER: | | | |L|O| | | | Reader: consumed data packed with character 'L'. BUFFER: | | | | |O| | | | Writer: published data packet with character 'H'. BUFFER: | | | | |O|H| | | Reader: consumed data packed with character 'O'. BUFFER: | | | | | |H| | | Writer: published data packet with character 'G'. BUFFER: | | | | | |H|G| | Writer: published data packet with character 'F'. BUFFER: | | | | | |H|G|F| Reader: consumed data packed with character 'H'. BUFFER: | | | | | | |G|F| Writer: published data packet with character 'H'. BUFFER: |H| | | | | |G|F| Reader: consumed data packed with character 'G'. BUFFER: |H| | | | | | |F| Writer: published data packet with character 'Z'. BUFFER: |H|Z| | | | | |F| Reader: consumed data packed with character 'F'. BUFFER: |H|Z| | | | | | | Writer: published data packet with character 'E'. BUFFER: |H|Z|E| | | | | | Reader: consumed data packed with character 'H'. BUFFER: | |Z|E| | | | | | Writer: published data packet with character 'D'. BUFFER: | |Z|E|D| | | | | Reader: consumed data packed with character 'Z'. BUFFER: | | |E|D| | | | | Writer: published data packet with character 'M'. BUFFER: | | |E|D|M| | | | Writer: published data packet with character 'X'. BUFFER: | | |E|D|M|X| | | Writer: published data packet with character 'Z'. BUFFER: | | |E|D|M|X|Z| | Reader: consumed data packed with character 'E'. BUFFER: | | | |D|M|X|Z| | Writer: published data packet with character 'D'. BUFFER: | | | |D|M|X|Z|D| Reader: consumed data packed with character 'D'. BUFFER: | | | | |M|X|Z|D| Writer: published data packet with character 'R'. BUFFER: |R| | | |M|X|Z|D| Reader: consumed data packed with character 'M'. BUFFER: |R| | | | |X|Z|D| Writer: published data packet with character 'F'. BUFFER: |R|F| | | |X|Z|D| Reader: consumed data packed with character 'X'. BUFFER: |R|F| | | | |Z|D| Writer: published data packet with character 'P'. BUFFER: |R|F|P| | | |Z|D| Reader: consumed data packed with character 'Z'. BUFFER: |R|F|P| | | | |D| Writer: published data packet with character 'K'. BUFFER: |R|F|P|K| | | |D| Reader: consumed data packed with character 'D'. BUFFER: |R|F|P|K| | | | | Writer: published data packet with character 'Y'. BUFFER: |R|F|P|K|Y| | | | Reader: consumed data packed with character 'R'. BUFFER: | |F|P|K|Y| | | | Writer: published data packet with character 'N'. BUFFER: | |F|P|K|Y|N| | | Reader: consumed data packed with character 'F'. BUFFER: | | |P|K|Y|N| | | Writer: published data packet with character 'H'. BUFFER: | | |P|K|Y|N|H| | Writer: published data packet with character 'C'. BUFFER: | | |P|K|Y|N|H|C| Reader: consumed data packed with character 'P'. BUFFER: | | | |K|Y|N|H|C| Reader: consumed data packed with character 'K'. BUFFER: | | | | |Y|N|H|C| Reader: consumed data packed with character 'Y'. BUFFER: | | | | | |N|H|C| Reader: consumed data packed with character 'N'. BUFFER: | | | | | | |H|C| Reader: consumed data packed with character 'H'. BUFFER: | | | | | | | |C| Reader: consumed data packed with character 'C'. BUFFER: | | | | | | | | |
このように、同期オブジェクトは通信においてデータの流れを調整することに役立ちます。
参考
- Advanced Programming in the UNIX Environment, 3rd Edition by W. Richard Stevens
- Computer Systems: A Programmer's Perspective, 3rd Edition by Randal E. Bryant and David R. O'Hallaron, Carnegie Mellon University
- https://web.stanford.edu/class/cs110/summer-2021/
- http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2021/2021-04-16/index.html
- https://www.ieice-hbkb.org/files/06/06gun_05hen_02.pdf