|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
#ifndef SRC_COMMON_BLOCKING_QUEUE_H_#define SRC_COMMON_BLOCKING_QUEUE_H_#include <boost/thread.hpp>#include <boost/noncopyable.hpp>#include <queue>template<typename T>
class xl_blocking_queue
:boost::noncopyable
{public:
xl_blocking_queue()
:mutex_(), queue_(), cond_()
{
}
~xl_blocking_queue(){}
void put(const T& func)
{
boost::unique_lock<boost::mutex> lock(mutex_);
queue_.push(func);
cond_.notify_all();
}
T get()
{
boost::unique_lock<boost::mutex> lock(mutex_);
if (queue_.size() == 0)
{
cond_.wait(lock);
}
T front(queue_.front());
queue_.pop();
return front;
}
unsigned size()
{
return queue_.size();
}
void notify_all()
{
cond_.notify_all();
}
private:
std::queue<T> queue_;
boost::condition_variable_any cond_;
boost::mutex mutex_;
};#endif |
xl_thread_pool.h
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
<strong>#ifndef SRC_COMMON_THREAD_POOL_H_#define SRC_COMMON_THREAD_POOL_H_#include <boost/thread.hpp>#include <boost/shared_ptr.hpp>#include <boost/noncopyable.hpp>#include <vector>#include "xl_blocking_queue.h"typedef boost::function<void (void)> thread_do_func;
class xl_thread_pool
:boost::noncopyable
{public:
xl_thread_pool(int thread_num)
:num_(thread_num), run_(false)
{
}
~xl_thread_pool()
{
if (run_)
{
stop();
}
}
void start()
{
if (num_ <= 0) return;
int i = 0;
run_ = true;
for(i=0;i<num_;i++)
{
boost::shared_ptr<boost::thread> thread(new boost::thread(boost::BOOST_BIND(&xl_thread_pool::run, this)));
thread_arr_.push_back(thread);
}
}
void stop()
{
run_ = false;
queue_.notify_all();
}
void post(const thread_do_func& task)
{
if (thread_arr_.size() == 0)
{
task();
}
else
{
queue_.put(task);
}
}
private:
xl_blocking_queue<thread_do_func> queue_;
std::vector<boost::shared_ptr<boost::thread> > thread_arr_;
int num_;
bool run_;
void run()
{
while(run_)
{
thread_do_func task = queue_.get();
task();
}
}
};#endif |