线程池实例代码
Nevermore 2022-05-31 OS
#include <iostream>
#include <queue>
#include <ctime>
#include <unistd.h>
#include <cstdlib>
class Task
{
public:
Task(const int &num = 0)
: _num(num)
{
}
void Run()
{
switch (_num)
{
case 0:
std::cout << "get num:" << _num << std::endl;
break;
case 1:
std::cout << "get num:" << _num << std::endl;
break;
case 2:
std::cout << "get num:" << _num << std::endl;
break;
default:
std::cout << "Input Wrong!" << std::endl;
break;
}
std::cout << std::endl;
}
private:
int _num;
};
template <class T>
class ThreadPool
{
private:
ThreadPool(const int &numberOfThread = 3)
: _thread_n(numberOfThread)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
//******************************************************************
// 申请一些成员函数供static函数访问成员变量
bool ThreadQueIsEmpty()
{
return _threadqueue.size() == 0;
}
void Lock()
{
pthread_mutex_lock(&_mutex);
}
void Unlock()
{
pthread_mutex_unlock(&_mutex);
}
void CondWait()
{
pthread_cond_wait(&_cond, &_mutex);
}
//******************************************************************
// 类成员函数有this指针,而线程任务函数只允许一个变量的传参。所以定义为static函数,该函数无法访问非静态的成员变量。
static void *myTask(void *arg)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = (ThreadPool<T> *)arg;
while (1)
{
tp->Lock();
while (tp->ThreadQueIsEmpty())
{
std::cout << "thread wait:" << pthread_self() << std::endl;
tp->CondWait(); // 等待线程状态就绪
}
T task;
tp->GetTaskFromQueue(task);
tp->Unlock();
std::cout << pthread_self() << ":Runing->";
task.Run(); // 解锁之后处理任务,可以让多个线程同时在处理
}
}
public:
static ThreadPool<T> *GetInstance()
{
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 当前单例对象还没有被创建
if (ins == nullptr) // 双判定,减少锁的争用,提高获取单例的效率!
{
pthread_mutex_lock(&lock);
if (ins == nullptr)
{
ins = new ThreadPool<T>();
ins->InitThreadPool();
std::cout << "首次加载对象" << std::endl;
}
pthread_mutex_unlock(&lock);
}
return ins;
}
void InitThreadPool()
{
pthread_t *p = new pthread_t[_thread_n]; // 创建多线程
for (int i = 0; i < _thread_n; i++)
{
pthread_create(p + i, nullptr, myTask, (void *)this); // 需要将this指针传入线程任务函数,因为static无法访问非静态成员,只能外部传入
}
// for (int i = 0; i < _thread_n; i++) //等待异常线程退出
// {
// pthread_join(p[i], nullptr);
// }
}
void AddTaskToQueue(const T &x) // 由主线程调用,为线程池中的线程添加任务
{
Lock();
_threadqueue.push(x);
Unlock();
pthread_cond_signal(&_cond); // 唤醒在当前条件下等待的消费者线程
}
void GetTaskFromQueue(T &x) // 线程池中的线程从任务队列中获取任务
{
x = _threadqueue.front();
_threadqueue.pop();
}
private:
int _thread_n;
std::queue<T> _threadqueue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> *ins;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::ins = nullptr;
int main()
{
srand((unsigned int)time(nullptr));
ThreadPool<Task> *tp = ThreadPool<Task>::GetInstance();
tp->InitThreadPool();
while (1)
{
Task t(rand() % 3);
tp->AddTaskToQueue(t);
sleep(1);
}
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173