Posix信号量
Nevermore 2022-05-21 OS
信号量s是一个具有非负整数值的全局变量,只有两个特殊的操作,称为P和V:P和V的名字来自荷兰语Proberen(测试)和Verhogen(增量)。P操作等待信号量s变为非零,然后将其递减。V操作增加s。信号量与每个共享资源相关联,记录了共享资源的个数。PV操作保证了线程的同步和互斥,这样多线程进行获取及并发执行的时候可以避免不安全的访问。
- P (s): while (s <= 0); s--;
- V (s): s++;
为了保证信号量的大小是有限的,并且在条件重新满足后线程可以重复访问临界资源(消费者释放的空间可以被生产者填充,生产者生产的数据可以被消费者获取),可以定义环形队列来进行数据的存储。判断生产者和消费者当前是否能够各自执行自己的操作(空间或数据剩余),可以使用信号量:生产者生产了数据就让空间-1,数据+1,空间不足就不能再生产,需要等待消费者进行消费;消费者同理。
Posix信号的系统调用:
int sem_init(sem_t *sem, int pshared, unsigned int value); //初始化,pshared为0表示被线程共享
int sem_destroy(sem_t *sem); //销毁
int sem_wait(sem_t *sem); //执行信号量-1的操作(P操作),表明资源被申请;若setm为0,则会阻塞等待。
int sem_post(sem_t *sem); // 执行信号量+1的操作(V操作);若sem不为0,则会唤醒sem_wait并为当前线程加锁。
一段代码
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
#include <cstdlib>
#include <unistd.h>
#include <ctime>
#include <pthread.h>
template<class T>
class CircularQueue //定义环形队列记录生产和消费的过程,当生产和消费指向同一个资源的时候,队列为满或为空。
{
public:
CircularQueue(const int capacity = 8)
:_capacity(capacity)
,_CQueue(capacity)
,_consumer_index(0)
,_producer_index(0)
{
sem_init(&_num_data_produced, 0, 0);
sem_init(&_num_space_remained, 0, _capacity); //剩余空间数为容量大小
pthread_mutex_init(&_Consumer_mutex, nullptr);
pthread_mutex_init(&_Producer_mutex, nullptr);
}
~CircularQueue()
{
sem_destroy(&_num_data_produced);
sem_destroy(&_num_space_remained);
pthread_mutex_destroy(&_Consumer_mutex);
pthread_mutex_destroy(&_Producer_mutex);
}
void Push(const T&x) //生产过程
{
sem_wait(&_num_space_remained); //生产者获取剩余的资源,P操作使得剩余的空间资源-1
//线程加锁,此时可能已经有多个生产者线程申请了信号量,但只能有一个能进行临界资源的访问
pthread_mutex_lock(&_Producer_mutex);
_CQueue[_producer_index] = x; //获取队列中的数据
++_producer_index; //生产数据后,将生产者的位置指向队列的下一个位置。
_producer_index = _producer_index % _capacity; //环形队列由_capacity控制
pthread_mutex_unlock(&_Producer_mutex);
sem_post(&_num_data_produced); //生产者生产了数据,V操作将数据量+1
}
void Pop(T&ret) //消费过程
{
sem_wait(&_num_data_produced); //消费者获取数据资源,P操作使得数据-1
pthread_mutex_lock(&_Consumer_mutex);
ret = _CQueue[_consumer_index];
++_consumer_index;
_consumer_index = _consumer_index % _capacity;
pthread_mutex_unlock(&_Consumer_mutex);
sem_post(&_num_space_remained);//消费者获取资源后,V操作使得剩余的空间+1
}
private:
std::vector<T> _CQueue;
int _capacity;
sem_t _num_data_produced; //生产者生产的数据,消费者读取
sem_t _num_space_remained;//剩余的空间,生产者读取
int _consumer_index; //定义index记录生产和消费的位置
int _producer_index;
pthread_mutex_t _Consumer_mutex; //定义锁用来控制多消费者对_consumer_index的访问
pthread_mutex_t _Producer_mutex;
};
void *ProduceMake(void* arg)
{
CircularQueue<int> *cq = (CircularQueue<int>*)arg;
while(1)
{
int input = rand() % 100;
std::cout << "Data Produced:" <<pthread_self() % 10 << " >:" << input << std::endl;
cq->Push(input);
}
}
void *ConsumeBuy(void *arg)
{
CircularQueue<int> *cq = (CircularQueue<int>*)arg;
while(1)
{
int ret;
cq->Pop(ret);
std::cout << "Data Consumed <" <<pthread_self() % 10 << " >:" << ret <<std::endl;
sleep(1);
}
}
int main()
{
srand((unsigned int)time(nullptr));
CircularQueue<int> *cq = new CircularQueue<int>();
pthread_t consumer1;
pthread_t consumer2;
pthread_t consumer3;
pthread_t producer1;
pthread_t producer2;
pthread_create(&producer1, nullptr, ProduceMake, (void*)cq);
pthread_create(&producer2, nullptr, ProduceMake, (void*)cq);
pthread_create(&consumer1, nullptr, ConsumeBuy, (void*)cq);
pthread_create(&consumer2, nullptr, ConsumeBuy, (void*)cq);
pthread_create(&consumer3, nullptr, ConsumeBuy, (void*)cq);
pthread_join(consumer1, nullptr);
pthread_join(consumer2, nullptr);
pthread_join(consumer3, nullptr);
pthread_join(producer1, nullptr);
pthread_join(producer2, nullptr);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131