Menu

[r755]: / framework / branches / refactoring / thread_pool.cpp  Maximize  Restore  History

Download this file

126 lines (100 with data), 2.0 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#define CPPCMS_SOURCE
#include "thread_pool.h"
#include <queue>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#if defined(CPPCMS_POSIX)
#include <signal.h>
#endif
namespace cppcms {
namespace impl {
class thread_pool : public util::noncopyable {
public:
void post(util::callback0 const &job)
{
boost::unique_lock<boost::mutex> lock(mutex_);
queue_.push(job);
cond_.notify_one();
}
thread_pool(int threads)
{
workers_.resize(threads);
#if defined(CPPCMS_POSIX)
sigset_t set,old;
sigfillset(&set);
pthread_sigmask(SIG_BLOCK,&set,&old);
#endif
for(int i=0;i<threads;i++) {
workers_[i].reset(new boost::thread(boost::bind(&thread_pool::worker,this)));
}
#if defined(CPPCMS_POSIX)
pthread_sigmask(SIG_SETMASK,&old,0);
#endif
}
void stop()
{
{
boost::unique_lock<boost::mutex> lock(mutex_);
shut_down_=true;
cond_.notify_all();
}
for(unsigned i=0;i<workers_.size();i++) {
boost::shared_ptr<boost::thread> thread=workers_[i];
workers_[i].reset();
if(thread)
thread->join();
}
}
~thread_pool()
{
try {
stop();
}
catch(...)
{
}
}
private:
void worker()
{
for(;;) {
util::callback0 job;
{
boost::unique_lock<boost::mutex> lock(mutex_);
if(shut_down_)
return;
if(!queue_.empty()) {
queue_.front().swap(job);
queue_.pop();
}
else {
cond_.wait(lock);
}
}
job();
}
}
boost::mutex mutex_;
boost::condition_variable cond_;
bool shut_down_;
std::queue<util::callback0> queue_;
std::vector<boost::shared_ptr<boost::thread> > workers_;
};
}
/// Construct the public pool object: allocates an impl::thread_pool with
/// \a n passed as its thread count and stores it in impl_.
thread_pool::thread_pool(int n) :
impl_(new impl::thread_pool(n))
{
}
/// Submit \a job for execution: forwards to the implementation object's
/// post(), which queues the job and notifies one worker thread.
void thread_pool::post(util::callback0 const &job)
{
impl_->post(job);
}
/// Stop the pool: forwards to the implementation object's stop(), which
/// sets the shutdown flag, wakes all workers and joins them.
void thread_pool::stop()
{
impl_->stop();
}
/// Destructor with an empty body: no explicit cleanup is performed here.
/// NOTE(review): releasing the object held by impl_ is presumably done by
/// impl_'s own type (declared in thread_pool.h, not visible here) — confirm.
thread_pool::~thread_pool()
{
}
} // cppcms
MongoDB Logo MongoDB