ostep concurrency

ostep concurrency筆記


Introduce

  • 在thread中的context和在process的context非常像,不一樣的是process儲存在PCB而thrad儲存在TCB(thread control block)

  • 另一個不同點為stack,

    • 在single thread中也就是process只有一個stack(single stack)
    • 在multi thread process 中由於這些thread都是independently,因此每個thread都會有自己的stack來存放(thread-local storage)

The Heart Of The Problem

  • 在thread中有個嚴重的問題,假設有兩個thread(th1,th2)並且執行以下的asm
mov 0x8049a1c, %eax
add $0x1, %eax
mov %eax, 0x8049a1c
# 從0x8049a1c拿出一個register並進行加1,再放回0x8049a1c
# 預計%eax的值為52
  • 執行流程為,假設eax register為counter=50
    1. th1 獲得counter,counter=50
    2. counter+1=51,此時發生context,執行th2
    3. th2 獲得eax,但th2從address中拿到的register還是50(原因是thread拿到的是自己的private register,這個register為virtualized by the context-switch code that saves and restores)
    4. counter+1=51,此時另一個context又發生,th1回復執行
    5. th1將值放回address
    6. 原本counter預計拿到52但此時只有51
  • 此時這種問題稱為race condition,每次運行的結果都不一定一樣,稱為這個結果為不確定的(indeterminate)而非確定(deterministic)
  • 由於多個thread為race condition,因此稱為critical section
    • critical section是訪問共享variable的地方,一定不能由多個thread同時執行

關鍵術語

  1. critical section是訪問共享資源的一段程式,資源為變量或data structur
  2. race condition 出現在多個thread同時訪問critical section,他們都試圖更新資源,導致非期望結果
  3. indeterminate程式由一個或多個race condition組成,導致結果不是deterministic,並非我們期望的輸出
  4. mutual exclusion是為了避免這些問題,可以保證每次只有一個thread進入critical section,從而避免race

Locks

  • 在concurrency中鎖扮演非常重要的角色,而本身也很簡單,只有lock和unlock兩個函數
    • lock 調用時,如果其他thread沒有鎖則調用的thread就會獲得鎖並且進入critical section,此時這個獲得鎖的thread叫做owner
      • 當其他thread嘗試獲得鎖不會返回,這樣就能控制critical section中只會有一個thread
    • unlock 調用時,如果沒有其他thread嘗試獲得鎖,則會閒置變成可用的
      • 當有等待的thread時,會收到鎖狀態變化的通知,並且獲得鎖進入critical section
  • 而設計一個鎖需要考量以下3點
    1. mutual exclusion 為主要限制其他thread進入critical section的方法
    2. fairness 避免thread發生strave一直無法獲得鎖
    3. performance,其中包括了thread爭搶,釋放的開銷,以及單CPU上多個thread的競爭

Controlling Interrupts

  1. 需要將系統設置為privileged operation(turn interrupts on and off),可能會遇到惡意程式調用lock而不釋放
  2. 沒有辦法在multi process上進行,多個thread在不同cpu上運行並且訪問critical section,interrupts會不知道要對哪個進行
  3. 長時間interrupts會導致系統問題,想像CPU因為interrupts的關係而miss掉disk finish request

Spin lock

A Failed Attempt: Just Using Loads/Stores

typedef struct __lock_t { int flag; } lock_t;
 
void init(lock_t *mutex) {
  // 0 -> lock is available, 1 -> held
  mutex->flag = 0;
}
 
void lock(lock_t *mutex) {
  while (mutex->flag == 1) // TEST the flag;
    // spin-wait (do nothing)
  mutex->flag = 1; // now SET it!
}
 
void unlock(lock_t *mutex) {
  mutex->flag = 0;
}
  • 想法很簡單,當另一個thread調用lock時,嘗試獲得鎖的thread會不斷spin等待,值到unlock

Problem

  1. performance,由於thread未獲取鎖時一直spin等待鎖釋放浪費時間,在single process上甚至沒辦法運行
  2. mutual exclusion,可能多個thread同時被標示為1,導致超過一個thread進入critical section

Test-And-Set (Atomic exchange)

 int TestAndSet(int *old_ptr, int new) {
  int old = *old_ptr; // fetch old value at old_ptr
  *old_ptr = new; // store ’new’ into old_ptr
  return old; // return the old value
 }
 
typedef struct __lock_t {
  int flag;
 } lock_t;
 
void init(lock_t *lock) {
// 0: lock is available, 1: lock is held
  lock->flag = 0;
}
 
 void lock(lock_t *lock) {
  while (TestAndSet(&lock->flag, 1) == 1);
    // spin-wait (do nothing)
 }
 
 void unlock(lock_t *lock) {
  lock->flag = 0;
 }
  • TestAndSet主要做了以下事情,由於是atomic下運行,因此為同時發生
    1. 返回old_ptr指向的舊值
    2. 更新新值

這個鎖的作用

  1. 假設有一個thread在運行,調用lock而沒有其他thread擁有鎖,因此flag為0,跳出while獲取鎖,同時也會將flag設為1,標誌為鎖已被擁有
  • thread離開critical section時調用unlock將falg清0
  1. 當某一個thread已經有鎖時(flag=1),其他thread調用lock,TestAndSet返回1並且卡在while中spin
  • 當flag被清0後會再次調用TestAndSet,返回0並且atomic將flag設為1從而獲得鎖

Spin的問題

  • 利用CPU週期不斷spin值到有鎖可用,在single process上需要preemptive scheduling(不斷透過clock interrupts一個thread來運行其他thread),否則spin lock在單CPU上無法使用,因為一個spin的thread永遠不會放棄CPU

Evaluating Spin Locks

  1. mutual, 一次只能有一個thread進入critical section
  2. fairness, spin lock沒辦法保證fairness,spin的thread在race condition下可能會永遠spin,導致其他thread strave
  3. performance, 只有獲得lock的CPU不會spin,所以其他thread都會在unlock前浪費CPU時間
  • 不過在multi process下性能不差,由於critical section很短,因此很快就會有所能用,不會浪費CPU週期

Load-Linked and Store-Conditional

 int LoadLinked(int *ptr) {
  return *ptr;
 }
 int StoreConditional(int *ptr, int value) {
 
 if (no update to *ptr since LL to this addr) {
  *ptr = value;
  return 1; // success!
 } else {
    return 0; // failed to update
  }
 }
  • load從內存取出一個值放入register
  • store condition 只有上一次加載的地址期間都沒更新時才會成功,同時更新剛才Load-Linked加載地址的值
    • 成功時,返回1,並且將ptr指得值更新為value
    • 失敗時,返回0,不會有任何動作

Fetch-And-Add

int FetchAndAdd(int *ptr) {
   int old = *ptr;
   *ptr = old+1;
   return old;
 }
 
typedef struct __lock_t {
   int ticket ;
   int turn;
 }lock_t;
 
void lock_init(lock_t *lock) {
   lock->ticket = 0;
   lock->turn = 0;
}
 
void lock(lock_t *lock) {
  int myturn = FetchAndAdd(&lock->ticket);
  while (myturn != lock->turn)
    ; //spin
}
 
void unlock(lock_t *lock) {
  lock->turn = lock->turn + 1;
}
  1. 使用了ticket和turn來構建lock,如果thread希望獲得lock,會先對ticket值執行一個atomic的獲取並相加
  2. 這個值為該thread的turn(myturn),根據全局共享的lock->turn變量,當某一個thread(myturn == lock->turn)時,則輪到這個thread進入critical section
  3. unlock則是增加turn,從而下一個等待的thread可以進入critical section

重點改動

  • 這個方法保證了所有thread都能搶到lock,從而實現fairness,只要有一個thread獲得ticket值,他最後會被調用
  • 之前的方法則不會保證fairness,比如基於TestAndSet,一個thread可能一直spin,即使其他thread在獲得鎖和釋放鎖

自旋過多

  • 假設兩個thread在single process上執行,其中一個thread持有鎖,被中斷導致另一個thread嘗試獲取鎖,此時因為鎖已經被持有了,因此自值到thread 0重新被resume後釋放鎖,因此假設有N個thread競爭一個鎖會產生浪費N1N-1個time slice,導致過多的cpu time被浪費

Avoid spin

Just Yield Baby

  • 想法很簡單,spin時放棄CPU,相比原先雖然好一點,但由於等到有鎖時喚醒的context switch成本也很大,因此不是一個好方法
void init() {
    flag = 0;
}
 
void lock() {
  while(TestAndSet(&flag,1) == 1)
    yield(); //give up the CPU
}
 
void unlock() {
  flag = 0;
}

Using Queue: Sleep Instead Of Spin

  • 先前的方法總會遇到一個問題,spin util to get the lock,或是yield the CPU immediately,都會造成CPU waste且no prevention of starvation

  • 使用support provided by Solaris

    1. park: put a calling thread to sleep
    2. unpark(threadID): wake a particular thread by threadID
typedef struct __lock_t {
  int flag;
  int guard;
  queue_t *q;
}lock_t;
 
 
void lock_init(lock_t *m){
  m->flag = 0;
  m->guard = 0;
  queue_init(m->q);
}
 
void lock(lock_t *m) {
  while(TestAndSet(&m->guard,1) == 1); // acquire guard lock by spining
  if(m->flag == 0){
    m->flag = 1; // lock is acquired
    m->guard = 0;
  }else{
    queue_add(m->q,gettid());
    m->guard = 0;
    park();
  }
}
 
void unlock(lock_t *m) {
  while(TestAndSet(&m->guard,1) == 1); // acquire guard lock by spining
  if(queue_empty(m->q))
    m->flag = 0; // let go of lock; no one wants it
  else
    unpark(queue_remove(m->q)); // hold lock for next thread !
  m->guard = 0;
}

我們藉由以上程式碼完成兩件事情

  1. 使用TestAndSet實現高性能的lock
  2. 使用queue來control獲得鎖的邏輯,避免starvation
  • guard 基本上起到了spin的作用,但因為圍繞著flag和queue因此沒辦法完全avoid spin,但這個spin的time是有限的,因此不會造成不斷spin的狀況

  • 在lock函數中,如果flag != 0也代表無法獲取鎖,會將她加入queue中等待,並且將guard設為0來釋放CPU,需要注意的是如果我們在park後才將guard設為0

    • 如果先設為park再將guard設為0
      1. 由於先park後scheduler將它設為睡眠,此時訪問他的guard值時會讓其他scheduler誤以為他尚未sleep而導致錯誤

      2. 如果此時另一個thread嘗試unpark時,由於他的guard尚未清0,因此scheduler選擇不wake這一個thread

  • 在wake up另一個thread時並沒有將flag重新設為0是必須的,thread被wake後是從park的調用返回,此時沒有guard因此也不能將flag設為1,因此我們直接將lock release的thread傳遞給下一個gets lock的thread即可

  • 在目前的solutioin中存在race condition,在park調用之前如果有另一個thread也要park,assuming 這個thrad應該要sleep until gets lock,此時switch到了another thread,which thread have lock,這時該thread釋放鎖會導致第一個park的thread永遠沈睡,這種問題也稱為wakeup/waiting race

Setpark

  • Solaris 使用了setpark來解決這問題,當一個thread表明自己馬上要park時,如果剛好另一個thread被調度,並且調用了unpark,後續的park掉用就會直接返回,而不是一直sleep
queue_add(m->q,gettid());
setpark();
m->guard = 0;

Lock-based Concurrent Data Structures

Simple But Not Scalabe

typedef structur __counter_t {
  int value;
}counter_t;
 
void init(counter_t *c) {
  c->value = 0;
}
 
void inc(counter_t *c) {
  c->value ++;
}
 
void dec(counter_t *c) {
  c->vlaue --;
}
 
int get(counter_t *c) {
  return c->value;
}

Problem: How to let this code be thread safe

typedef struct __counter_t {
  int value;
  pthread_mutex_t lock;
} counter_t;
 
void init(counter_t *c) {
  c->value = 0;
  pthread_mutex_init(&c->lock, NULL);
}
 
void inc(counter_t *c) {
  pthread_mutex_lock(&c->lock);
  c->value ++;
  pthread_mutex_unlock(&c->lock);
}
 
void dec(counter_t *c) {
  pthread_mutex_lock(&c->lock);
  c->value --;
  pthread_mutex_unlock(&c->lock);
}
 
int get(counter_t *c) {
  pthread_mutex_lock(&c->lock);
  int rc = c->value;
  pthread_mutex_unlock(&c->lock);
  return rc;
}
  • 現在有了一個thread safe的data structur,但當thread增加時性能會大幅度下降,理想情況下在mutil process上運行會像single process一樣快

Sloppy counter

  • 簡單的思想就是每個thread都有自己的mutex來避免race condition發生,而全局上的counter會定期從這些局部counter(含有mutex的thread)上加上自己本身的來達到記數的效果
typedef struct __counter_t {
  int global; // global counter
  pthread_mutex_t glock; // global lock
  int local[NUMCPUS]; // local count (per cpu)
  pthread_mutex_t llock[NUMCPUS];
  int threshold; // update frequency
} counter_t;
 
// init: record threshold, init locks, init values of all local counter and global counter
 
void init(counter_t *c,int threshold) {
  c->threshold = threshold;
  c->global = 0;
  pthread_mutex_init(&c->glock, NULL);
  int i;
  for(i=0;i<NUMCPUS;i++){
    c->local[i] = 0;
    pthread_mutex_init(&c->llock[i], NULL);
  }
}
 
// update: usually, just grab local lock and update local amount,一但到了threshold才grab global lock and thransfer local values to global amount
 
void update(counter_t *c, int threadID, int amount) {
  int cpu = threadID % NUMCPUS;
  pthread_mutex_lock(&c->llock[cpu]);
  c->local[cpu] += amount;
  if(c->local[cpu] >= c->threshold)  {
    // transfer to global
    pthread_mutex_lock(&c->glock);
    c->global += c->local[cpu];
    pthread_mutex_unlock(&c->glock);
    c->local[cpu] = 0;
  }
  pthread_mutex_unlock(&c->llock[cpu]);
}
 
// get: just return the global amount
int get(counter_t *c) {
  pthread_mutex_lock(&c->lock);
  int val = c->global;
  pthread_mutex_unlock(&c->lock);
  return val;
}
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
 
typedef struct __node_t {
  int key;
  struct __node_t *next;
}node_t;
 
typedef struct __list_t {
  node_t *head;
  pthread_mutex_t lock;
}list_t;
 
void List_Init(list_t *l) {
  l->head = NULL;
  pthread_mutex_init(&l->lock,NULL);
}
 
int List_Insert(list_t *l,int key) {
  pthread_mutex_lock(&l->lock);
  node_t *new = malloc(sizeof(node_t));
  if (new == NULL) {
    perror("malloc");
    pthread_mutex_unlock(&l->lock);
    return -1;
  }
  new->key = key;
  new->next = l->head;
  l->head = new;
  pthread_mutex_unlock(&l->lock);
  return 0;
}
 
int List_Lookup(list_t *l,int key) {
  pthread_mutex_lock(&l->lock);
  node_t *curr = l->head;
  while(curr != NULL) {
    if(curr->key == key) {
      pthread_mutex_unlock(&l->lock);
      return 0;
    }
    curr = curr->next;
  }
  pthread_mutex_unlock(&l->lock);
  return -1;
}
 

目前的LinkList會有點問題

  • 在Insert的部分在進入點上了鎖,結束時釋放.但如果malloc發生了錯誤,就應該在發生錯誤時釋放鎖
    • 假定malloc是thread safe的,讓獲取鎖和釋放鎖只在真正的critical section時使用
int List_Insert(list_t *l,int key) {
  node_t *new = malloc(sizeof(node_t));
  if (new == NULL) {
    perror("malloc");
    pthread_mutex_unlock(&l->lock);
    return -1;
  }
  new->key = key;
  // just lock curitical section
  pthread_mutex_lock(&l->lock);
  new->next = l->head;
  l->head = new;
  pthread_mutex_unlock(&l->lock);
  return 0;
}
 
int List_Lookup(list_t *l,int key) {
  int rv = -1;
  pthread_mutex_lock(&l->lock);
  node_t *curr = l->head;
  while(curr != NULL) {
    if(curr->key == key) {
      rv = 0;
      pthread_mutex_unlock(&l->lock);
      return rv;
    }
    curr = curr->next;
  }
  pthread_mutex_unlock(&l->lock);
  return rv;
}
  • 使用hand-over-hand locking來避免擴展性不好的問題
  • 原理:
    • 每個node都有一個鎖以此替代整個LinkList只有一個鎖,在iterator的時後先搶佔下一個node上的鎖再釋放自己的
  • 問題:
    • 概念上增加了LinkList操作的併發程度,但因為在iterator時不斷地獲取和釋放會造成大量開銷,因此不一定會比單鎖還要快

queue

#include <pthread.h>
#include <assert.h>
 
typedef struct __node_t {
  int value;
  struct __node_t *next;
} node_t;
 
typedef struct __queue_t {
  node_t *head;
  node_t *tail;
  pthread_mutex_t head_lock, tail_lock;
} queue_t;
 
void Queue_Init(queue_t *q) {
  node_t *tmp = malloc(sizeof(node_t));
  tmp->next = NULL;
  q->head = q->tail = tmp;
  pthread_mutex_init(&q->head_lock, NULL);
  pthread_mutex_init(&q->tail_lock, NULL);
}
 
void Queue_Enqueue(queue_t *q,int value) {
  node_t *tmp = mallco(sizeof(node_t));
  assert(tmp != NULL);
  tmp->value = value;
  tmp->next = NULL;
 
  pthread_mutex_lock(&q->tail_lock);
  q->tail->next = tmp;
  q->tail = tmp;
  pthread_mutex_unlock(&q->tail_lock);
}
 
int Queue_Dequeue(queue_t *q,int *value) {
  pthread_mutex_lock(&q->head_lock);
  node_t *tmp = q->head;
  node_t *new_head = tmp->next;
  if (new_head == NULL) {
    pthread_mutex_unlock(&q->head_lock);
    return -1;
  }
  *value = new_head->value;
  q->head = new_head;
  pthread_mutex_unlock(&q->head_lock);
  free(tmp);
  return 0;
}

Condition Variables

  • 有時候我們需要檢查某condition滿足後程式再繼續運行

基於 spinLock

volatile int done = 0;
 
void *child(void *arg){
  printf("child\n");
  done = 1;
  return NULL;
}
 
int main(int argc,cahr **argv){
  printf("parent: begin\n");
  pthread_t c;
  Pthread_create(&c,NULL,child,NULL);
  while(done == 0); //spin
  printf("parent: end\n");
  return 0;
}
  • 問題是目前程式主thread會一直spin導致浪費CPU資源,因此我們希望有某種方式能讓main thread sleep until condition 滿足(即child thread完成執行)

Definition and Routines

  • thread 可以使用condition variable 來等待一個條件成真,condition variable為一個顯式隊列,當某condition不滿足時,thrad可以將自己加入queue來等待

  • 聲明condition variable

    1. pthread_cont_t c,這裏聲明c是一個condition variable
    2. condition variable也兩種操作
  • condition variable操作

    1. wait,讓thread進入sleep的方法
    2. singal,讓thread從sleep中喚醒的方法
    pthread_cond_wait(pthread_cond_t *c,pthread_mutex_t *m);
    pthread_cond_signal(pthread_cond_t *c);

使用condition variable instead spinlock

#include <pthread.h>
#include <stdio.h>
int done = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
 
void thr_exit() {
  pthread_mutex_lock(&m);
  done = 1;
  pthread_cond_signal(&c);
  pthread_mutex_unlock(&m);
}
 
void *child() {
  printf("child \n");
  thr_exit();
  return NULL;
}
 
void thr_join() {
  pthread_mutex_lock(&m);
  while(done == 0)
    pthread_cond_wait(&c,&m);
  pthread_mutex_unlock(&m);
}
 
int main() {
  printf("parent: begin \n");
  pthread_t p;
  pthread_create(&p,NULL,child,NULL);
  thr_join();
  printf("parent: end \n");
  return 0;
}
  • 由上面範例可以發現wait其實是在釋放鎖,以至於讓調用的thread sleep
  • 而singal則是讓thread被喚醒後重新獲得鎖

如果沒有done的話呢

void thr_exit() {
  pthread_mutex_lock(&m);
  pthread_cond_signal(&c);
  pthread_mutex_unlock(&m);
}
 
void thr_join(){
  pthread_mutex_lock(&m);
  pthread_cond_wait(&c,&m);
  pthread_mutex_unlock(&m);
}
  • 這會導致child thread立刻運行並且調用singal嘗試喚醒thread,此時沒有被喚醒的thread在所以凱著,而parent thread則會調用wait然後就一直卡住了

如果不加鎖

void thr_exit(){
  done = 1;
  pthread_cond_signal(&c);
}
 
void thr_join() {
  if(done == 0) {
    pthread_cond_wait(&c);
  }
}
  • parent thread調用thr_join後檢查為done為0準備wait,此時在進入wait前parent thread被interrupts,child thread改變done為1,但同樣沒有thread在wait,接著parent thread就會一直卡住了

生產者消費者問(Bound Buffer) Problem

  • For example, 在multi Web server 中 producer 將HTTP request into a work queue,並且consumer對這個queue進行消費
  • Bound buffer也用在pipe the output 上
    grep foo file.txt | wc -l
    • this example runs two process concurrently
  • 由於Bound Buffer會shared resource,因此我們需要保證他不會發生race condition

Producer/Consumer Threads v1

void *producer(void *arg) {
  int i;
  int loops = (int)arg;
  for(i=0;i<loops;i++){
    put(i);
  }
}
 
void *consumer(void *arg) {
  while(1) {
    int tmp = get();
    printf("%d\n",tmp);
  }
}
 

A Broken Solution

  • Single CV And If Statement
int loops; // must initialize somewhere...
 cond_t cond;
 mutex_t mutex;
 
 void *producer(void *arg) {
  int i;
  for (i = 0; i < loops; i++) {
    Pthread_mutex_lock(&mutex); // p1
  if (count == 1) // p2
    Pthread_cond_wait(&cond, &mutex); // p3
  put(i); // p4
  Pthread_cond_signal(&cond); // p5
  Pthread_mutex_unlock(&mutex); // p6
 }
}
 
 void *consumer(void *arg) {
  int i;
  for (i = 0; i < loops; i++) {
    Pthread_mutex_lock(&mutex); // c1
  if (count == 0) // c2
    Pthread_cond_wait(&cond, &mutex); // c3
  int tmp = get(); // c4
  Pthread_cond_signal(&cond); // c5
  Pthread_mutex_unlock(&mutex); // c6
  printf("%d\n", tmp);
 }
}
  • 目前會有一個問題,當producer和consumer同時運行時,producer在p2時會檢查count == 1,此時consumer也在c2檢查count == 0,這樣會導致兩個thread都會進入wait狀態,導致死鎖
  • 這個問題稱為lost wakeup,當producer在p2時,如果consumer在c2時,producer會將consumer從wait中喚醒,但consumer會在c2檢查count == 0,因此會再次進入wait狀態

Beter Solution

int loops; // must initialize somewhere...
 cond_t cond;
 mutex_t mutex;
 
 void *producer(void *arg) {
  int i;
  for (i = 0; i < loops; i++) {
    Pthread_mutex_lock(&mutex); // p1
  while (count == 1) // p2
    Pthread_cond_wait(&cond, &mutex); // p3
  put(i); // p4
  Pthread_cond_signal(&cond); // p5
  Pthread_mutex_unlock(&mutex); // p6
 }
}
 
 void *consumer(void *arg) {
  int i;
  for (i = 0; i < loops; i++) {
    Pthread_mutex_lock(&mutex); // c1
  while (count == 0) // c2
    Pthread_cond_wait(&cond, &mutex); // c3
  int tmp = get(); // c4
  Pthread_cond_signal(&cond); // c5
  Pthread_mutex_unlock(&mutex); // c6
  printf("%d\n", tmp);
 }
}
  • 這個方法的好處是,當producer在p2時,如果consumer在c2,producer會將consumer從wait中喚醒,但consumer會在c2檢查count == 0,因此會再次進入wait狀態
  • 這樣producer就不會進入wait狀態,而是會繼續運行,直到count != 1

Use condition variable to signal

cond_t empty,fill;
mutex_t mutex;
 
void *producer(void *arg) {
  int i;
  for (i = 0; i < loops; i++) {
    Pthread_mutex_lock(&mutex);
    while (count == 1)
      Pthread_cond_wait(&empty, &mutex);
    put(i);
    Pthread_cond_signal(&fill);
    Pthread_mutex_unlock(&mutex);
  }
}
 
void *consumer(void *arg) {
  int i;
  for (i = 0; i < loops; i++) {
    Pthread_mutex_lock(&mutex);
    while (count == 0)
      Pthread_cond_wait(&fill, &mutex);
    int tmp = get();
    Pthread_cond_signal(&empty);
    Pthread_mutex_unlock(&mutex);
    printf("%d\n", tmp);
  }
}
  • producer thread等待condition variable empty 發信號給fill
  • consumer thread等待condition variable fill 發信號給empty
  • 這樣producer和consumer就不會互相影響,而是各自等待各自的condition variable

使用While而非IF

  • 對condition variable使用while解決了spurios wakeup情況,某些thread庫由於實現細節可能會自出現,一個信號喚醒兩個thread的情況,再次檢查等待條件就是假喚醒的另一個遠因

Cover Condition

int byteLeft = MAX_HEAP_SIZE;
cond_t c;
mutex_t m;
 
void *allocate(int size) {
  pthread_mutex_lock(&m);
  while(byteLeft < size) {
    pthread_cond_wait(&c, &m);
  }
 
  void *ptr = ... // get memory from heap
  byteLeft -= size;
  pthread_mutex_unlock(&m);
  return ptr;
}
 
void *free(void *ptr, int size) {
  pthread_mutex_lock(&m);
  byteLeft += size;
  pthread_cond_signal(&c);
  pthread_mutex_unlock(&m);
}
  • 如果單純使用singal的話會有一個問題,假設P0調用allocate(100),接著P1調用allocate(10),此時因為沒有足夠的空閑內存,因此P0,P1都會在thread上等待
  • 此時第三個thread調用free(20),當他發送singal信號時,可能不會喚醒P1,而P0因為內存不夠因此繼續等待,因為不知道喚醒哪一個(或哪些)thread,因此無法正常工作

使用cover condition解決

  • 能夠使用pthread_cond_broadcast來喚醒所有的thread,解決了不知道要喚醒哪一個thread的問題,但同時因為全部thread都被喚醒,因此造成不必要的開銷

Semaphores

Semaphores Definition

  • semaphore是一個整數,可以調用sem_waitsem_post來進行操作
    • 調用sem_wait時,會將semaphore的值減1,如果值為0則會進入wait狀態
    • 調用sem_post時,會將semaphore的值加1,如果有thread在wait狀態則會喚醒他
  • 對於semaphore的初始化可為0和1
#include <semaphore.h>
sem_t s;
sem_init(&s, 0, 1); // init semaphore s to 1

Binary Semaphore (mutex)

  • 我們可以將critical section使用semaphore的wait和post環繞,但是否正確運作,semaphore的初始值非常重要
  • 如果一開始初始為0,因為調用wait導致semaphore值變為1-1然後等待,會造成任何thread都無法進入critical section
    • 因此初始化時的x=1x=1
#include <semaphore.h>
sem_t m ;
sem_init(&m, 0, x);
sem_wait(&m); // lock -> decrement
// critical section
sem_post(&m); // unlock -> increment

Semaphores for Ordering

  • semaphore也可以用來暫停某一個thread等待某一個條件成立
  • 這種情況下通常暫停的thread會等待試圖完成條件的thread發送信號,因此可以將semaphore當作condition variable
  • 此時應該要將x=0x=0,這樣當主thread調用wait時會進入wait狀態,直到子thread調用post發送信號才會喚醒
#include <semaphore.h>
sem_t s;
 
void *child() {
  printf("child \n");
  sem_post(&s); // signal
  return NULL;
}
 
int main() {
  sem_init(&s, 0, x);
  printf("parent: begin \n");
  pthread_t p;
  pthread_create(&p, NULL, child, NULL);
  sem_wait(&s); // wait
  printf("parent: end \n");
  return 0;
}

Producer Consumer Problem

  • 使用empty和full分別表示緩衝區空或滿
int buffer[BUFFER_SIZE];
int fill = 0;
int use = 0;
 
void put(int value) {
  buffer[fill] = value;
  fill = (fill + 1) % BUFFER_SIZE;
}
 
int get() {
  int value = buffer[use];
  use = (use + 1) % BUFFER_SIZE;
  return value;
}
 
sem_t empty, full;
void *producer(void *arg) {
  int i;
  for (i = 0; i < loops; i++) {
    sem_wait(&empty);
    put(i);
    sem_post(&full);
  }
}
 
void *consumer(void *arg) {
  int i;
  for (i = 0; i < loops; i++) {
    sem_wait(&full);
    int tmp = get();
    sem_post(&empty);
    printf("%d\n", tmp);
  }
}
 
int main() {
  // ...
  sem_init(&empty, 0, BUFFER_SIZE);
  sem_init(&full, 0, 0);
  // ...
}
  • 目前的設計會有一個問題
    • 第一次當producer調用wait暫停等待consumer信號正常運行
    • 但第二次的時候因為producer並不是變為負數,因此會跳過wait的狀態,讓porucer繼續運行
  • 這樣會導致producer和consumer都會同時使用同一個buffer

增加信號來修復問題

void *producer(void *arg) {
  int i;
  for(i=0;i<loops;i++){
    sem_wait(&mutex); //P0
    sem_wait(&empty);
    put(i);
    sem_post(&full);
    sem_post(&mutex);
  }
}
 
void *consumer(void *arg) {
  int i;
  for(i=0;i<loops;i++){
    sem_wait(&mutex); //P1
    sem_wait(&full);
    int tmp = get();
    sem_post(&empty);
    sem_post(&mutex);
  printf("%d\n",tmp);
  }
}
  • 目前的方案似乎因為使用mutex可以來保護buffer,但可能會發生死鎖問題
  • 這是因為producer第一行P0就先獲得mutex,然後發送給consumer信號時,consumer嘗試獲取鎖P1但producer還沒釋放,因此造成死鎖

改變鎖的順序

void *producer(void *arg) {
  int i;
  for(i=0;i<loops;i++){
    sem_wait(&empty);
    sem_wait(&mutex);
    put(i);
    sem_post(&mutex);
    sem_post(&full);
  }
}
 
void *consumer(void *arg) {
  int i;
  for(i=0;i<loops;i++){
    sem_wait(&full);
    sem_wait(&mutex);
    int tmp = get();
    sem_post(&mutex);
    sem_post(&empty);
  printf("%d\n",tmp);
  }
}
 
int main() {
  // ...
  sem_init(&empty, 0, BUFFER_SIZE); // MAX BUFFER_SIZE are empty to begin with...
  sem_init(&full, 0, 0); // ... and 0 are full
  sem_init(&mutex, 0, 1); // mutex=1 because it is lock
  // ...
}
  • 現在的方案producer會先釋放鎖再發送給consumer信號,這樣就不會造成死鎖的問題,同樣consumer也會先釋放鎖再發送給producer信號