Linux线程同步
2022-03-04 # 学习笔记 # Linux

Linux线程同步

前置补充内容

manpage 中无法查找pthread_mutex_*函数

解决方式:

1
sudo apt-get install manpages-posix-dev

互斥锁(mutex)

基础内容

说明:linux中的mutex互斥锁为建议锁,并不具备强制性。对于公共数据,即使一个线程使用了mutex变量上锁,其他线程也可以在没有检测锁或者没有使用lock在锁上阻塞的情况下,直接去访问修改公共数据。

主要函数:

1
2
3
4
5
pthread_mutex_init();
pthread_mutex_destory();
pthread_mutex_lock ();
pthread_mutex_trylock();
pthread_mutex_unlock();

以上 5 个函数的返回值都是:成功返回 0,失败返回错误号

使用步骤:

创建锁:

1
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr)

这里的 restrict 关键字,表示指针指向的内容只能通过这个指针进行修改 restrict 关键字: 用来限定指针变量。被该关键字限定的指针变量所指向的内存操作,必须由本指针完成。

1
2
3
pthread_mutex_t mutex;
1. pthread_mutex_init(&mutex, NULL); //动态初始化。
2. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //静态初始化。

Trylock:

lock函数为尝试上锁后不成功继续阻塞等待,直到锁被释放后再上锁,继续执行下面的操作。trylock则为尝试加锁,成功则上锁,不成功则返回错误号(如EBUSY),不阻塞。

1
pthread_mutex_trylock(&mutex);

注意事项:应尽量缩小锁的粒度,在使用完资源后尽可能的立即释放解锁。

实例:用互斥锁来实现两个线程内容的有序输出

需求:用两个线程交替输出“HELLO” “WORLD” “hello” “world”四个单词,要求HELLO 和 WORLD始终在同一行。hello 与 world 在同一行。

不使用互斥锁时的代码及输出效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>

void* tfn(void* arg)
{
while (1)
{
printf("hello");
sleep(rand() % 3);
printf("world\n");
sleep(rand() % 3);
}
}

int main(int argc, char* argv[])
{
pthread_t tid;
pthread_create(&tid, NULL, tfn, NULL);


while (1)
{
printf("HELLO");
sleep(rand() % 3);
printf("WORLD\n");
sleep(rand() % 3);
}
pthread_join(tid, NULL);
}

添加了互斥锁后的代码及输出效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>

pthread_mutex_t mutex;//创建互斥锁

void* tfn(void* arg)
{
while (1)
{
pthread_mutex_lock(&mutex);//加锁
printf("hello");
sleep(rand() % 3);
printf("world\n");
pthread_mutex_unlock(&mutex);//解锁
sleep(rand() % 3);
//pthread_mutex_unlock(&mutex)
//若将解锁位置放在此处,则会造成刚刚解锁完就继续加锁的情况。因此要放置于上方
//因此在使用互斥锁时应注意使用完立即释放,并且保持较小的粒度。
}
}

int main(int argc, char* argv[])
{
pthread_t tid;

pthread_mutex_init(&mutex, NULL);
pthread_create(&tid, NULL, tfn, NULL);


while (1)
{
pthread_mutex_lock(&mutex);//加锁
printf("HELLO");
sleep(rand() % 3);
printf("WORLD\n");
pthread_mutex_unlock(&mutex);//解锁
sleep(rand() % 3);

}
pthread_join(tid, NULL);
pthread_mutex_destroy(&mutex);
}

死锁:

互斥锁产生死锁的情况有以下两种:

  • 一个线程对一把锁进行了多次lock操作
  • 两个线程,各自持有一把锁的情况下,互相请求对方的那一把锁

读写锁

特性:

优势:相比于互斥锁,对数据访问时的具体操作做了区分,当读线程多的时候,能够明显的提高访问效率。

执行方式:

1.读写锁是“写模式加锁”时,解锁前,所有对该锁加锁的线程都会被阻塞。

2.读写锁是“读模式加锁”时,如果线程以读模式对其加锁会成功,如果线程以写模式加锁会阻塞。

3.读写锁是“读模式加锁”时,既有试图以写模式加锁的线程,也有试图以读模式加锁的线程。那么读写锁会阻塞随后的读模式锁请求。优先满足写模式锁。读锁、写锁并行阻塞,写锁优先级高。

读写锁也叫共享-独占锁。当读写锁以读模式锁住时,它是以共享模式锁住的,当它以写模式锁住时,它是以独占模式锁住的。写独占、读共享。

读写锁非常适合于对数据结构读的次数远大于写的情况。

总结:读共享,写独占,写锁优先级高。

主要函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//读写锁定义
pthread_rwlock_t rwlock;
//读写锁初始化
pthread_rwlock_init(&rwlock, NULL);
//加锁
pthread_rwlock_rdlock(&rwlock);
pthread_rwlock_wrlock(&rwlock);
//尝试加锁
pthread_rwlock_tryrdlock(&rwlock);
pthread_rwlock_trywrlock(&rwlock);
//解锁
pthread_rwlock_unlock(&rwlock);
//销毁锁
pthread_rwlock_destroy(&rwlock);

实例:

用读写锁来同步对共享变量的读写,验证读共享写独占的特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

pthread_rwlock_t rwlock;//定义读写锁
int counter;//要修改的共享变量

void* th_read(void* arg)
{
long i = (long)arg;
while (1)
{
pthread_rwlock_rdlock(&rwlock);//加锁
printf("th_read %d,counter = %d\n", i, counter);
pthread_rwlock_unlock(&rwlock);//解锁
sleep(2);
}
}

void* th_write(void* arg)
{
long i = (long)arg;
while (1)
{
pthread_rwlock_wrlock(&rwlock);//加锁
printf("th_write %d ,counter = %d,++counter = %d\n", i, counter, ++counter);
pthread_rwlock_unlock(&rwlock);//解锁
sleep(1);
}

}

int main(int argc, char* argv[])
{
pthread_t tid[8];
int i;
counter = 0;

//创建三个写线程
for (i = 0; i < 3; i++)
{
//传入参数表明该函数是第几个写线程
pthread_create(&tid[i], NULL, th_write, (void*)(i+1));
}
//创建五个读线程
for (i = 0; i < 5; i++)
{
//传入参数表明该函数是第几个读线程
pthread_create(&tid[i + 3], NULL, th_read, (void*)(i + 1));
}


//回收线程
for (i = 0; i < 8; i++)
{
pthread_join(tid[i], NULL);
}
//释放读写锁
pthread_rwlock_destroy(&rwlock);
}

可以看出,写进程的counter是唯一的。说明在写进程进行写入操作时,其他线程在阻塞等待。

而出现多个读线程读出同一个counter值的情况。说明读锁是共享的。

条件变量

为什么需要条件变量

条件变量的引入,主要是为了解决互斥量机制效率低下的问题:

如果只使用互斥量,那么一个线程在不知道另一个线程是否使用完了资源的情况下,就会在自己的时钟周期内一直阻塞在给互斥量加锁的操作上。而并不是把自己的时钟周期让出给其他线程从而尽快完成任务释放资源(伪代码表示如下)。

1
2
3
4
5
while(1){
加锁; //会一直阻塞在这里
执行对共享区的访问或修改;
解锁;
}

而我们也可能会考虑使用休眠的机制来让被阻塞的线程让出自己的时钟周期,例如下面的形式:

1
2
3
4
5
6
7
8
9
while(1){
if(已经被上锁了)
休眠10s
else{
加锁;
取元素;
解锁
}
}

但是此种方式也可能在被阻塞的线程刚进入休眠时,另一个线程释放了锁,但需要使用资源的线程仍然需要休眠完整个10s的时间。造成不必要的延迟。当然如果不sleep,也可以,但会造成不必要的CPU开销。使用基于条件变量的事件通知唤醒机制,就可以避免这些问题。一旦正在占用资源的线程完成操作后就执行pthread_cond_signal(),当前阻塞的线程就会立即被唤醒开始工作。

条件变量的使用方式:

主要应用函数:

1
2
3
4
5
6
7
8
9
10
//条件变量的定义,初始化,销毁
pthread_cond_t cond;
pthread_cond_init();
pthread_cond_destory();
//条件变量等待
pthread_cond_wait();//忙等
pthread_cond_timedwait();//有时限的等待
//发送信号
pthread_cond_sisgnal();//向至少一个进程发送
pthread_cond_broadcast();//向全部进程发送

初始化条件变量:

  1. pthread_cond_init(&cond, NULL); 动态初始化。

  2. pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 静态初始化。

阻塞等待条件:

pthread_cond_wait(&cond, &mutex);

作用:

  1. 阻塞等待条件变量满足

  2. 解锁已经加锁成功的信号量 (相当于 pthread_mutex_unlock(&mutex)),12 两步为 一个原子操作

  1. 当条件满足,函数返回时,解除阻塞并重新申请获取互斥锁。重新加锁信号量 (相当于, pthread_mutex_lock(&mutex);)

为什么条件变量需要结合互斥量来使用

在上面的wait函数中,我们可以看到参数有一个mutex互斥量。原因在于:

要实现wait和signal的功能,就需要有一个公共的条件(布尔)变量,调用wait的线程需要读取这个布尔表达式内数据,

同样,调用signal的线程需要修改这个布尔表达式的数据,让表达式为真。故而这两个线程必然访问至少一个公共变量。从而实现消息的传递。那么对于这样一个公共变量。就一定需要一个互斥锁来保护。

在下面的生产者消费者模型中,我们会发现,mutex还有着另一个作用,不仅可以保护上面提到的公共变量,同时也可以保护生产者消费者模型中的临界区。

wait函数具体流程

在wait函数执行之前,我们需要做的有:

  • 声明并初始化一个互斥量
  • 对互斥量进行上锁操作

接下来,wait函数会执行下面的几步

  • 完成准备工作(例如读取上面提到的条件变量)
  • 解锁上面提供的互斥量
  • 阻塞线程并等待条件满足

当条件满足时

  • 解除阻塞
  • 对提供的互斥量加锁
  • 完成剩余的操作

从过程中我们可以看出,通过一个mutex变量。在wait函数准备操作进行之前上锁,进行结束后解锁。成功的将wait函数的部分操作变为了原子操作,从而保证了数据一致性。

实例:用条件变量实现生产者消费者模型(多个消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <stdio.h> 
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

//静态初始化条件变量和互斥量
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

struct msg {
struct msg* next;
int num;
};

struct msg* head = NULL;

void* producer(void* arg)
{
while (1)
{
//产生新的节点
struct msg* temp = NULL;
temp = (struct msg*)malloc(sizeof(struct msg));
temp->num = rand() % 300;

//访问临界区,用mutex保护
pthread_mutex_lock(&lock);
//将新的节点连接到链表中
temp->next = head;
head = temp;
printf("producer: create %d\n", temp->num);
pthread_mutex_unlock(&lock);

//唤醒消费者
pthread_cond_signal(&has_product);
sleep(1);
}

return NULL;
}

void* consumer(void* arg)
{
long i = (long)arg;
while (1)
{
struct msg* temp;

pthread_mutex_lock(&lock);
//此处用while而不能用if
//因为可能会出现同时唤醒多个消费者的情况
while (head == NULL)
{
pthread_cond_wait(&has_product, &lock);
}

//访问临界区
temp = head;
head = temp->next;
pthread_mutex_unlock(&lock);

printf("consumer%d: %d\n",i ,temp->num);

free(temp);
sleep(rand() % 3 +2);
}

return NULL;
}

int main(int argc, char* argv[])
{
pthread_t pid, cid[2];

srand(time(NULL));

pthread_create(&pid, NULL, producer, NULL);

for (int i = 0; i < 2; i++)
{
pthread_create(&cid[i], NULL, consumer, (void*)(i + 1));
}

pthread_join(pid, NULL);

for (int i = 0; i < 2; i++)
{
pthread_join(cid[i], NULL);
}

return 0;
}

运行效果:

信号量

基本使用方式:

信号量应用于线程,进程间的同步。相当于值为N的互斥量。N值表示剩余资源的数量,或是可以同时访问共享数据区的线程数。

相关函数:

1
2
3
sem_t sem; //定义类型。 
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem_destroy(&sem);

init参数:

  • sem: 信号量

  • pshared: 0: 用于线程间同步 1: 用于进程间同步

  • value:N 值。

1
2
sem_wait(&sem); 
sem_post(&sem);

分别为消耗一个资源和添加一个资源。wait会使信号量–,post则会使信号量++。wait时若信号量为0则会阻塞。

实例:信号量实现生产者消费者模型(单个消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <stdio.h> 
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

#define NUM 5


int queue[NUM];//全局数组实现环形队列
sem_t blank_num, product_num;//剩余空格数量和产品数量


void* producer(void* arg)
{
int i = 0;
while (1)
{
sem_wait(&blank_num);
queue[i] = rand() % 1000 + 1;
printf("Produce %d\n", queue[i]);
sem_post(&product_num);

i = (i + 1) % NUM;
sleep(rand() % 1);

}
return NULL;
}

void* consumer(void* arg)
{
int i = 0;
while (1)
{
sem_wait(&product_num);
printf("Consume %d\n",queue[i]);
queue[i] = 0;
sem_post(&blank_num);

i = (i + 1) % NUM;
sleep(rand() % 3);
}

return NULL;
}


int main(int argc, char* argv)
{
pthread_t pid, cid;
sem_init(&blank_num, 0, NUM);
sem_init(&product_num, 0, 0);

pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);


pthread_join(pid, NULL);
pthread_join(cid, NULL);

sem_destroy(&blank_num);
sem_destroy(&product_num);

return 0;
}

注:使用信号量实现多个消费者模型需要借助互斥锁。