如果这篇博客帮助到你,可以请我喝一杯咖啡~
CC BY 4.0 (除特别声明或转载文章外)
信号量
1、信号量的引入
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
// global shared varible
volatile int cnt = 0; // counter
// thread routine
void *thread(void* arg)
{
int i, niters = *((int*)arg);
for(int i = 0; i < niters; ++i)
cnt++;
return NULL;
}
int main(int argc, char* argv[])
{
int niters;
pthread_t tid1, tid2;
if(argc != 2)
{
printf("usage: %s <niters>\n", argv[0]);
exit(0);
}
niters = atoi(argv[1]);
// create thread and wait for them to finish
pthread_create(&tid1, NULL, thread, &niters);
pthread_create(&tid2, NULL, thread, &niters);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
printf("cnt = %d\n", cnt);
return 0;
}
当输入的 niters 足够大的时候,发现 cnt 并不等于 2 * niters
:
[root@SuperhandsomeChuan synchronization]# ./a.out 100000000
cnt = 127028782
[root@SuperhandsomeChuan synchronization]# ./a.out 200000000
cnt = 255307600
问题就处在,我们无法预测线程的执行顺序
2、PV操作
信号量 s
是具有 非负整数值的全局变量 ,只能由两种特殊的操作来处理,这两种操作称为 Р
和 V
:
P(s)
:如果 s 是非零的,那么 P 将 s 减 1,并且立即返回。如果 s 为零,那么就挂起这个线程,直到 s 变为非零,而一个 V 操作会重启这个线程。在重启之后, Р 操作将 s 减 1,并将控制返回给调用者。V(s)
: V 操作将 s 加 1。如果有任何线程阻塞在 Р 操作等待s变成非零,那么 V 操作会重启这些线程中的一个,然后该线程将 s 减 1,完成它的 Р 操作。
P 中的 测试和减 1 操作是不可分割的,也就是说,一旦预测信号量 s 变为非负,就会将 s 减 1,不能有中断。V 中的 加 1 操作和测试 也是不可分割的,也就是 加载、加 1 和存储信号量 的过程中没有中断。注意,V 的定义中没有定义等待线程被重新启动的顺序。唯一的要求是 V 必须 只能重启一个 正在等待的线程。因此,当有多个线程在等待同一个信号量时,你不能预测 V 操作要重启哪一个线程。
理解了 PV 操作,我们就可以修改 1 中的程序了。
首先了解一下 POSIX semaphore 接口:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
我们来封装一下 PV 操作:
void P(sem_t* s) {sem_wait(s);}
void V(sem_t* s) {sem_post(s);}
修改后的程序:
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#include<semaphore.h>
// global shared varible
volatile int cnt = 0; // counter
sem_t mutex;
void P(sem_t* s) {sem_wait(s);}
void V(sem_t* s) {sem_post(s);}
// thread routine
void *thread(void* arg)
{
int i, niters = *((int*)arg);
for(int i = 0; i < niters; ++i)
{
P(&mutex);
cnt++;
V(&mutex);
}
return NULL;
}
int main(int argc, char* argv[])
{
int niters;
pthread_t tid1, tid2;
sem_init(&mutex, 0, 1); // mutex = 1
if(argc != 2)
{
printf("usage: %s <niters>\n", argv[0]);
exit(0);
}
niters = atoi(argv[1]);
// create thread and wait for them to finish
pthread_create(&tid1, NULL, thread, &niters);
pthread_create(&tid2, NULL, thread, &niters);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
printf("cnt = %d\n", cnt);
return 0;
}
运行结果:
[root@SuperhandsomeChuan synchronization]# ./a.out 200000000
cnt = 400000000
3、二值信号量
sem_t m;
sem_init(&m, 0, 1);
sem_wait(&m);
// critical section here
sem_post(&m);
如果线程 0 持有锁(即调用了 sem_wait()之后,调用 sem_post()之前),另一个线程(线程 1)调用 sem_wait()尝试进入临界区,那么更有趣的情况就发生了。这种情况下,线程 1把信号量减为-1,然后等待(自己睡眠,放弃处理器)。线程 0 再次运行,它最终调用 sem_post(),将信号量的值增加到 0,唤醒等待的线程(线程 1),然后线程 1 就可以获取锁。线程 1 执行结束时,再次增加信号量的值,将它恢复为 1。
二值信号量也可以理解为互斥锁
4、信号量用作条件变量
信号量也可以用在一个线程暂停执行,等待某一条件成立的场景。
sem_t s;
void *child(void *arg) {
printf("child\n");
sem_post(&s); // signal here: child is done
return NULL;
}
int main(int argc, char *argv[]) {
sem_init(&s, 0, 0);
printf("parent: begin\n");
pthread_t c;
Pthread_create(c, NULL, child, NULL);
sem_wait(&s); // wait here for child
printf("parent: end\n");
return 0;
}
注意,为了让父进程等待子进程执行完成,这里信号量的初始值我们设置为 0 。
这个程序有两种可能的执行情况:
1)父进程 sem_wait
先于子进程 sem_post
执行
这种情况下,首先父进程调用 sem_wait
,信号量的值减去 1,变成 -1,父进程进入睡眠状态。当子进程调用 sem_post
将信号量的值加上 1,变为 0 时,父进程进入运行状态。
2)子进程 sem_post
先于父进程 sem_wait
执行
子进程首先调用 sem_post
,信号量从 0 变为 1 。父进程调用 sem_wait
,信号量变为 0 ,父进程继续执行。
5、生产者消费者模型(有界缓冲区问题)
因为插入和取出项目都涉及更新共享变量,所以我们必须保证对缓冲区的访问是互斥的。但是只保证互斥访问是不够的,我们还需要调度对缓冲区的访问。如果缓冲区是满的(没有空的槽位),那么生产者必须等待直到有一个槽位变为可用。与之相似,如果缓冲区是空的(没有可取用的项目),那么消费者必须等待直到有一个项目变为可用。
下面的一个例子是 CSAPP 中的示例,SBUF 包,用来构建生产者消费者模型。首先创建一个结构体 sbuf_t
,内容如下:
typedef struct
{
int *buf; // 有限缓冲区
int n; // 有限缓冲区最大槽位
int front; // 第一个可用项目所在的槽位的前一个
int rear; // 最后一个可用项目
sem_t mutex; // 对有限缓冲区的互斥锁
sem_t slots; // 可用的槽位
sem_t items; // 可用的项目数
}sbuf_t;
4 个函数的功能如下:
void sbuf_init(sbuf_t *sp, int n); // 初始化,分配 n 个槽位给缓冲区
void sbuf_init(sbuf_t *sp); // 销毁缓冲区
void sbuf_insert(sbuf_t *sp, int item); // 向缓冲区加入 item
int sbuf_remove(sbuf_t *sp); // 从缓冲区中取出
完整程序(加测试):
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<unistd.h>
#include<semaphore.h>
#include<time.h>
typedef struct
{
int *buf;
int n;
int front; // 指向第一个元素前面的位置
int rear; // 指向最后一个元素
sem_t mutex;
sem_t slots;
sem_t items;
}sbuf_t;
// PV操作
void P(sem_t *s){sem_wait(s);}
void V(sem_t *s){sem_post(s);}
// 创建一个大小为 n 个槽,空的缓冲区
void sbuf_init(sbuf_t *sp, int n)
{
sp->buf = (int*)calloc(n, sizeof(int));
sp->n = n;
sp->front = sp->rear = 0;
sem_init(&sp->mutex, 0, 1);
sem_init(&sp->slots, 0, n);
sem_init(&sp->items, 0, 0);
}
// 释放缓冲区
void sbuf_deinit(sbuf_t *sp)
{
free(sp->buf);
}
// 向缓冲区中加入 item
void sbuf_insert(sbuf_t *sp, int item)
{
P(&sp->slots);
P(&sp->mutex);
sp->buf[++sp->rear % sp->n] = item;
V(&sp->mutex);
V(&sp->items);
}
// 从缓冲区中取出 item
int sbuf_remove(sbuf_t *sp)
{
int ret;
P(&sp->items);
P(&sp->mutex);
ret = sp->buf[(++sp->front) % sp->n];
V(&sp->mutex);
V(&sp->slots);
return ret;
}
void *producer1(void* arg)
{
while(1)
{
usleep(300);
int num = rand() % 10000;
sbuf_insert((sbuf_t*)arg, num);
printf("producer1 生产:%d\n", num);
}
return NULL;
}
void *consumer1(void* arg)
{
while(1)
{
sleep(1);
printf("consumer1 消费 %d\n", sbuf_remove((sbuf_t*)arg));
}
return NULL;
}
void *consumer2(void* arg)
{
while(1)
{
sleep(2);
printf("consumer2 消费 %d\n", sbuf_remove((sbuf_t*)arg));
}
return NULL;
}
int main()
{
pthread_t tid1, tid2, tid3;
sbuf_t sbuf;
srand(time(NULL));
sbuf_init(&sbuf, 5);
pthread_create(&tid1, NULL, producer1, &sbuf);
pthread_create(&tid2, NULL, consumer1, &sbuf);
pthread_create(&tid3, NULL, consumer2, &sbuf);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
sbuf_deinit(&sbuf);
return 0;
}
6、读者写者模型
第一类,读者优先级高于写者
要求不要让读者等待,除非已经有写者在写
// readcnt:记录读者的个数
// 信号量 w 保护临界资源(对文件的读或写)
// 信号量 mutex 保护 readcnt
reader()
{
P(mutex);
readcnt++;
if(readcnt == 1) // 第一个读者
P(w);
V(mutex);
// 执行读动作
P(mutex);
readcnt--;
if(readcnt == 0) // 最后一个读者
V(w);
V(mutex);
}
writer()
{
P(w);
// 执行写动作
V(w);
}
第二类,写者优先级高于读者
只要有读者想写,那么读进程就必须等待
// readcnt:记录读者的个数
// 信号量 w 保护临界资源(对文件的读或写)
// 信号量 mutex 保护 readcnt
// writecnt 记录写着的个数
// 信号量 mutex2 保护 writecnt
// writecnt 控制信号量 canread,用来表示读者是否可以读
reader()
{
// 如果没有写者,canread 可以放读者不断进来。一旦有了写者,就不会有增加的读者了、
// canread 控制的是读者是否可以开始读
P(canread);
P(mutex);
readcnt++;
if(readcnt == 1) // 第一个读者
P(w);
V(mutex);
V(canread);
// 执行读动作
P(mutex);
readcnt--;
if(readcnt == 0) // 最后一个读者
V(w);
V(mutex);
}
writer()
{
P(mutex2);
writecnt++;
// 第一个写者进来时,阻塞后面的读者
if(writecnt == 1)
P(canread);
V(mutex2);
// w 控制 写写互斥,读写互斥
P(w)
// write
V(w);
P(mutex2);
writecnt--;
// 没有写着,唤醒读者
if(writecnt == 0)
V(canread);
V(mutex2);
}
首先,当没有写者时,对读者不造成影响。
当有写者想写时,立马持有 canread(这里可能会和读者竞争 canread 锁)。之后所有的读者都阻塞,写着等待 canread 释放。读者不会增加,等待最后的读者读完,释放 w 锁。写者开始写,每个写者写的操作都是独立的(被 w 保护),当最后一个写着写完后,释放 canread 锁,读者又可以读了。
7、哲学家就餐问题
这个问题的基本情况是:假定有 5 位“哲学家”围着一个圆桌。每两位哲学家之间有一把餐叉(一共 5 把)。哲学家有时要思考一会,不需要餐叉;有时又要就餐。而一位哲学家只有同时拿到了左手边和右手边的两把餐叉,才能吃到东西。
每个哲学家线程:
while(1)
{
think();
getfork();
eat();
putfork();
}
关键的挑战就是如何实现 getforks()和 putforks()函数,保证没有死锁,没有哲学家饿死,并且并发度更高(尽可能让更多哲学家同时吃东西)。
每个哲学家都知道自己的编号:
int left(int p) { return p; }
int right(int p) { return (p + 1) % 5; }
我们需要一些信号量来解决这个问题。假设需要 5 个,每个餐叉一个: sem_t forks[5]
这个问题有很多种解决办法,比如杀死一个哲学家 每个哲学家要么同时拿起左右两个叉子,要么不拿,单数哲学家先拿左边的叉子,双数哲学家先拿右边的叉子等等。
下面这个做法是这样,让 0 ~ 3 号哲学家先拿左叉子,4 号哲学家先拿右叉子。
void getforks() {
if (p == 4) {
sem_wait(&forks[right(p)]);
sem_wait(&forks[left(p)]);
} else {
sem_wait(&forks[left(p)]);
sem_wait(&forks[right(p)]);
}
}
补充:System V 信号量 与 POSIX 信号量 的区别
Linux 中,对于信号量 semaphores 存在两个版本:System V
和 POSIX
使用中文搜索这两个版本的区别时,各大博客网站上的文章内容如出一辙。知乎上对于这个问题的解释也不详尽,于是到 StackOverFlow 上搜索发现这两个版本的主要区别在于:
- System V 有信号量集(可以控制多个信号量,这个操作是原子的)
- POSIX 信号量性能好,设计简单等一些优点(详细参考最后的参考文章)
但是最重要的一点,
POSIX semaphores provide no mechanism to wake a waiting process when a different process dies while holding a semaphore lock.This lack of cleanup can lead to zombie semaphores which will cause any other or subsequent process that tries to use them to deadlock.
当一个持有信号量的进程死亡时,POSIX 信号量没有办法唤醒等待这个信号量的进程。从而可能会造成死锁。
从这个角度来说,System V 信号量更加安全。
参考文章:
- Differences between System V and Posix semaphores
- 《深入理解计算机系统》
- 《操作系统导论》