Menu

[r342]: / framework / trunk / thread_pool.h  Maximize  Restore  History

Download this file

172 lines (142 with data), 3.9 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H
#include <pthread.h>
#include <string>
#include <memory>
#include "worker_thread.h"
#include "global_config.h"
#include <boost/shared_ptr.hpp>
namespace cppcms {
using boost::shared_ptr;
class base_factory {
public:
virtual shared_ptr<worker_thread> operator()() const = 0;
virtual ~base_factory() {};
};
template<typename T>
class simple_factory : public base_factory {
public:
virtual shared_ptr<worker_thread> operator()() const { return shared_ptr<worker_thread>(new T()); };
};
namespace details {
class fast_cgi_application {
fast_cgi_application static *handlers_owner;
protected:
// General control
int main_fd; //File descriptor associated with socket
int signal_pipe[2]; // Notification pipe
std::string socket;
static void handler(int id);
typedef enum { EXIT , ACCEPT } event_t;
event_t wait();
void set_signal_handlers();
static fast_cgi_application *get_instance() { return handlers_owner; };
public:
fast_cgi_application(char const *socket,int backlog);
virtual ~fast_cgi_application() {};
void shutdown();
virtual bool run() { return false; };
void execute();
};
class fast_cgi_single_threaded_app : public fast_cgi_application {
/* Single thread model -- one process runs */
FCGX_Request request;
shared_ptr<worker_thread> worker;
void setup();
public:
virtual bool run();
fast_cgi_single_threaded_app(base_factory const &factory,char const *socket=NULL);
virtual ~fast_cgi_single_threaded_app(){};
};
template <class T>
class sefe_set {
pthread_mutex_t access_mutex;
pthread_cond_t new_data_availible;
pthread_cond_t new_space_availible;
protected:
int max;
int size;
virtual T &get_int() throw() = 0;
virtual void put_int(T &val) throw() = 0;
public:
void init(int size){
pthread_mutex_init(&access_mutex,NULL);
pthread_cond_init(&new_data_availible,NULL);
pthread_cond_init(&new_space_availible,NULL);
max=size;
this->size=0;
};
sefe_set() {};
virtual ~sefe_set() {};
virtual void push(T val) {
pthread_mutex_lock(&access_mutex);
while(size>=max) {
pthread_cond_wait(&new_space_availible,&access_mutex);
}
put_int(val);
pthread_cond_signal(&new_data_availible);
pthread_mutex_unlock(&access_mutex);
};
T pop() {
pthread_mutex_lock(&access_mutex);
while(size==0) {
pthread_cond_wait(&new_data_availible,&access_mutex);
}
T data=get_int();
pthread_cond_signal(&new_space_availible);
pthread_mutex_unlock(&access_mutex);
return data;
};
};
template <class T>
class sefe_queue : public sefe_set<T>{
T *queue;
int head;
int tail;
int next(int x) { return (x+1)%this->max; };
protected:
virtual void put_int(T &val) throw() {
queue[head]=val;
head=next(head);
this->size++;
}
virtual T &get_int() throw () {
this->size--;
int ptr=tail;
tail=next(tail);
return queue[ptr];
}
public:
void init(int size) {
if(queue) return;
queue=new T [size];
sefe_set<T>::init(size);
}
sefe_queue() { queue = NULL; head=tail=0; };
virtual ~sefe_queue() { delete [] queue; };
};
class fast_cgi_multiple_threaded_app : public fast_cgi_application {
int size;
vector<shared_ptr<worker_thread> > workers;
sefe_queue<FCGX_Request*> requests_queue;
sefe_queue<FCGX_Request*> jobs_queue;
typedef pair<int,fast_cgi_multiple_threaded_app*> info_t;
FCGX_Request *requests;
info_t *threads_info;
pthread_t *pids;
static void *thread_func(void *p);
void start_threads();
void wait_threads();
public:
fast_cgi_multiple_threaded_app( int num,int buffer_len, base_factory const &facory,char const *socket=NULL);
virtual bool run();
virtual ~fast_cgi_multiple_threaded_app() {
delete [] requests;
delete [] pids;
delete [] threads_info;
};
};
} // END OF DETAILS
void run_application(int argc,char *argv[],base_factory const &factory);
}
#endif /* _THREAD_POOL_H */
MongoDB Logo MongoDB