同步与互斥 Shepard-Wang

信号量

1、信号量的引入

#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>

// global shared varible
volatile int cnt = 0; // counter

// thread routine
void *thread(void* arg)
{
  int i, niters = *((int*)arg);
  
  for(int i = 0; i < niters; ++i)
    cnt++;
  
  return NULL;
}

int main(int argc, char* argv[])
{
  int niters;
  pthread_t tid1, tid2;

  if(argc != 2)
  {
    printf("usage: %s <niters>\n", argv[0]);
    exit(0);
  }

  niters = atoi(argv[1]);
  
  // create thread and wait for them to finish
  pthread_create(&tid1, NULL, thread, &niters);
  pthread_create(&tid2, NULL, thread, &niters);
  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);

  printf("cnt = %d\n", cnt);

  return 0;
}

当输入的 niters 足够大的时候,发现 cnt 并不等于 2 * niters:

[root@SuperhandsomeChuan synchronization]# ./a.out 100000000
cnt = 127028782
[root@SuperhandsomeChuan synchronization]# ./a.out 200000000
cnt = 255307600

问题就处在,我们无法预测线程的执行顺序

2、PV操作

信号量 s 是具有 非负整数值的全局变量 ,只能由两种特殊的操作来处理,这两种操作称为 РV :

  • P(s):如果 s 是非零的,那么 P 将 s 减 1,并且立即返回。如果 s 为零,那么就挂起这个线程,直到 s 变为非零,而一个 V 操作会重启这个线程。在重启之后, Р 操作将 s 减 1,并将控制返回给调用者。
  • V(s): V 操作将 s 加 1。如果有任何线程阻塞在 Р 操作等待s变成非零,那么 V 操作会重启这些线程中的一个,然后该线程将 s 减 1,完成它的 Р 操作。

P 中的 测试和减 1 操作是不可分割的,也就是说,一旦预测信号量 s 变为非负,就会将 s 减 1,不能有中断。V 中的 加 1 操作和测试 也是不可分割的,也就是 加载、加 1 和存储信号量 的过程中没有中断。注意,V 的定义中没有定义等待线程被重新启动的顺序。唯一的要求是 V 必须 只能重启一个 正在等待的线程。因此,当有多个线程在等待同一个信号量时,你不能预测 V 操作要重启哪一个线程

理解了 PV 操作,我们就可以修改 1 中的程序了。

首先了解一下 POSIX semaphore 接口:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);

我们来封装一下 PV 操作:

void P(sem_t* s) {sem_wait(s);}
void V(sem_t* s) {sem_post(s);}

修改后的程序:

#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#include<semaphore.h>

// global shared varible
volatile int cnt = 0; // counter

sem_t mutex;

void P(sem_t* s) {sem_wait(s);}
void V(sem_t* s) {sem_post(s);}

// thread routine
void *thread(void* arg)
{
  int i, niters = *((int*)arg);
  
  for(int i = 0; i < niters; ++i)
  {
    P(&mutex);
    cnt++;
    V(&mutex);
  }
  return NULL;
}

int main(int argc, char* argv[])
{
  int niters;
  pthread_t tid1, tid2;  
  sem_init(&mutex, 0, 1); // mutex = 1

  if(argc != 2)
  {
    printf("usage: %s <niters>\n", argv[0]);
    exit(0);
  }

  niters = atoi(argv[1]);
  
  // create thread and wait for them to finish
  pthread_create(&tid1, NULL, thread, &niters);
  pthread_create(&tid2, NULL, thread, &niters);
  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);
  
  printf("cnt = %d\n", cnt);

  return 0;
}

运行结果:

[root@SuperhandsomeChuan synchronization]# ./a.out 200000000
cnt = 400000000

3、二值信号量

sem_t m;
sem_init(&m, 0, 1); 

sem_wait(&m);
// critical section here
sem_post(&m);

如果线程 0 持有锁(即调用了 sem_wait()之后,调用 sem_post()之前),另一个线程(线程 1)调用 sem_wait()尝试进入临界区,那么更有趣的情况就发生了。这种情况下,线程 1把信号量减为-1,然后等待(自己睡眠,放弃处理器)。线程 0 再次运行,它最终调用 sem_post(),将信号量的值增加到 0,唤醒等待的线程(线程 1),然后线程 1 就可以获取锁。线程 1 执行结束时,再次增加信号量的值,将它恢复为 1。

二值信号量也可以理解为互斥锁

4、信号量用作条件变量

信号量也可以用在一个线程暂停执行,等待某一条件成立的场景。

sem_t s;

void *child(void *arg) {
	printf("child\n");
	sem_post(&s); // signal here: child is done
	return NULL;
}

int main(int argc, char *argv[]) {
	sem_init(&s, 0, 0); 
	printf("parent: begin\n");
	pthread_t c;
	Pthread_create(c, NULL, child, NULL);
	sem_wait(&s); // wait here for child
	printf("parent: end\n");
	return 0;
}

注意,为了让父进程等待子进程执行完成,这里信号量的初始值我们设置为 0 。

这个程序有两种可能的执行情况:

1)父进程 sem_wait 先于子进程 sem_post执行

这种情况下,首先父进程调用 sem_wait ,信号量的值减去 1,变成 -1,父进程进入睡眠状态。当子进程调用 sem_post 将信号量的值加上 1,变为 0 时,父进程进入运行状态。

2)子进程 sem_post 先于父进程 sem_wait执行

子进程首先调用 sem_post ,信号量从 0 变为 1 。父进程调用 sem_wait ,信号量变为 0 ,父进程继续执行。

5、生产者消费者模型(有界缓冲区问题)

因为插入和取出项目都涉及更新共享变量,所以我们必须保证对缓冲区的访问是互斥的。但是只保证互斥访问是不够的,我们还需要调度对缓冲区的访问。如果缓冲区是满的(没有空的槽位),那么生产者必须等待直到有一个槽位变为可用。与之相似,如果缓冲区是空的(没有可取用的项目),那么消费者必须等待直到有一个项目变为可用。

下面的一个例子是 CSAPP 中的示例,SBUF 包,用来构建生产者消费者模型。首先创建一个结构体 sbuf_t,内容如下:

typedef struct
{
    int *buf; 	 // 有限缓冲区
    int n;    	 // 有限缓冲区最大槽位
    int front;	 // 第一个可用项目所在的槽位的前一个
    int rear;	 // 最后一个可用项目
    sem_t mutex; // 对有限缓冲区的互斥锁
    sem_t slots; // 可用的槽位
    sem_t items; // 可用的项目数
}sbuf_t;

4 个函数的功能如下:

void sbuf_init(sbuf_t *sp, int n); // 初始化,分配 n 个槽位给缓冲区
void sbuf_init(sbuf_t *sp); // 销毁缓冲区
void sbuf_insert(sbuf_t *sp, int item); // 向缓冲区加入 item 
int sbuf_remove(sbuf_t *sp); // 从缓冲区中取出

完整程序(加测试):

#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<unistd.h>
#include<semaphore.h>
#include<time.h>

typedef struct
{
  int *buf;  
  int n;
  int front; // 指向第一个元素前面的位置
  int rear;  // 指向最后一个元素
  sem_t mutex;
  sem_t slots;
  sem_t items;
}sbuf_t;

// PV操作
void P(sem_t *s){sem_wait(s);}
void V(sem_t *s){sem_post(s);}

// 创建一个大小为 n 个槽,空的缓冲区
void sbuf_init(sbuf_t *sp, int n)
{
  sp->buf = (int*)calloc(n, sizeof(int));
  sp->n = n;
  sp->front = sp->rear = 0;
  sem_init(&sp->mutex, 0, 1);
  sem_init(&sp->slots, 0, n);
  sem_init(&sp->items, 0, 0);
}

// 释放缓冲区
void sbuf_deinit(sbuf_t *sp)
{
  free(sp->buf);
}

// 向缓冲区中加入 item 
void sbuf_insert(sbuf_t *sp, int item)
{
  P(&sp->slots);
  P(&sp->mutex);
  sp->buf[++sp->rear % sp->n] = item;
  V(&sp->mutex);
  V(&sp->items);
}

// 从缓冲区中取出 item 
int sbuf_remove(sbuf_t *sp)
{
  int ret;
  P(&sp->items);
  P(&sp->mutex);
  ret = sp->buf[(++sp->front) % sp->n];
  V(&sp->mutex);
  V(&sp->slots);

  return ret;
}

void *producer1(void* arg)
{
  while(1)
  {
    usleep(300); 
    int num = rand() % 10000;
    sbuf_insert((sbuf_t*)arg, num);
    printf("producer1 生产:%d\n", num);
  }
  return NULL;
}

void *consumer1(void* arg)
{
  while(1)
  {
    sleep(1);
    printf("consumer1 消费 %d\n", sbuf_remove((sbuf_t*)arg));
  }

  return NULL;
}

void *consumer2(void* arg)
{
  while(1)
  {
    sleep(2);
    printf("consumer2 消费 %d\n", sbuf_remove((sbuf_t*)arg));
  }

  return NULL;
}

int main()
{ 
  pthread_t tid1, tid2, tid3;
  sbuf_t sbuf;
  
  srand(time(NULL));
  sbuf_init(&sbuf, 5);

  pthread_create(&tid1, NULL, producer1, &sbuf);
  pthread_create(&tid2, NULL, consumer1, &sbuf);
  pthread_create(&tid3, NULL, consumer2, &sbuf);
  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);
  pthread_join(tid3, NULL);
  
  sbuf_deinit(&sbuf);

  return 0;
}

6、读者写者模型

第一类,读者优先级高于写者

要求不要让读者等待,除非已经有写者在写

// readcnt:记录读者的个数
// 信号量 w 保护临界资源(对文件的读或写)
// 信号量 mutex 保护 readcnt

reader()
{
	P(mutex);
    readcnt++;
    if(readcnt == 1) // 第一个读者
       P(w);
    V(mutex);
    
    // 执行读动作
    
    P(mutex);
    readcnt--;
    if(readcnt == 0) // 最后一个读者
        V(w);
    V(mutex);
}

writer()
{
	P(w);
    
    // 执行写动作
    
    V(w);
}
第二类,写者优先级高于读者

只要有读者想写,那么读进程就必须等待

// readcnt:记录读者的个数
// 信号量 w 保护临界资源(对文件的读或写)
// 信号量 mutex 保护 readcnt
// writecnt 记录写着的个数
// 信号量 mutex2 保护 writecnt
// writecnt 控制信号量 canread,用来表示读者是否可以读 

reader()
{
    // 如果没有写者,canread 可以放读者不断进来。一旦有了写者,就不会有增加的读者了、
    // canread 控制的是读者是否可以开始读
    P(canread);
	P(mutex);
    readcnt++;
    if(readcnt == 1) // 第一个读者
       P(w);
    V(mutex);
    V(canread);
    
    // 执行读动作
    
    P(mutex);
    readcnt--;
    if(readcnt == 0) // 最后一个读者
        V(w);
    V(mutex);
}

writer()
{
	P(mutex2);
    writecnt++;
    // 第一个写者进来时,阻塞后面的读者
    if(writecnt == 1) 
        P(canread);
    V(mutex2);
   	// w 控制 写写互斥,读写互斥
    P(w)
    // write
    V(w);
    P(mutex2);
    writecnt--;
    // 没有写着,唤醒读者
    if(writecnt == 0)
        V(canread);
    V(mutex2);
}

首先,当没有写者时,对读者不造成影响。

当有写者想写时,立马持有 canread(这里可能会和读者竞争 canread 锁)。之后所有的读者都阻塞,写着等待 canread 释放。读者不会增加,等待最后的读者读完,释放 w 锁。写者开始写,每个写者写的操作都是独立的(被 w 保护),当最后一个写着写完后,释放 canread 锁,读者又可以读了。

7、哲学家就餐问题

这个问题的基本情况是:假定有 5 位“哲学家”围着一个圆桌。每两位哲学家之间有一把餐叉(一共 5 把)。哲学家有时要思考一会,不需要餐叉;有时又要就餐。而一位哲学家只有同时拿到了左手边和右手边的两把餐叉,才能吃到东西。

每个哲学家线程:

while(1)
{
    think();
    getfork();
    eat();
    putfork();
}

关键的挑战就是如何实现 getforks()和 putforks()函数,保证没有死锁,没有哲学家饿死,并且并发度更高(尽可能让更多哲学家同时吃东西)。

每个哲学家都知道自己的编号:

int left(int p) { return p; }
int right(int p) { return (p + 1) % 5; }

我们需要一些信号量来解决这个问题。假设需要 5 个,每个餐叉一个: sem_t forks[5]

这个问题有很多种解决办法,比如杀死一个哲学家 每个哲学家要么同时拿起左右两个叉子,要么不拿,单数哲学家先拿左边的叉子,双数哲学家先拿右边的叉子等等。

下面这个做法是这样,让 0 ~ 3 号哲学家先拿左叉子,4 号哲学家先拿右叉子。

void getforks() {
  if (p == 4) {
 	 sem_wait(&forks[right(p)]);
  	 sem_wait(&forks[left(p)]);
  } else {
  	sem_wait(&forks[left(p)]);
  	sem_wait(&forks[right(p)]);
  }
 }

补充:System V 信号量 与 POSIX 信号量 的区别

Linux 中,对于信号量 semaphores 存在两个版本:System VPOSIX

使用中文搜索这两个版本的区别时,各大博客网站上的文章内容如出一辙。知乎上对于这个问题的解释也不详尽,于是到 StackOverFlow 上搜索发现这两个版本的主要区别在于:

  • System V 有信号量集(可以控制多个信号量,这个操作是原子的)
  • POSIX 信号量性能好,设计简单等一些优点(详细参考最后的参考文章)

但是最重要的一点,

POSIX semaphores provide no mechanism to wake a waiting process when a different process dies while holding a semaphore lock.This lack of cleanup can lead to zombie semaphores which will cause any other or subsequent process that tries to use them to deadlock.

当一个持有信号量的进程死亡时,POSIX 信号量没有办法唤醒等待这个信号量的进程。从而可能会造成死锁

从这个角度来说,System V 信号量更加安全。

参考文章: