Menu

[r415]: / framework / trunk / tcp_cache_server.cpp  Maximize  Restore  History

Download this file

249 lines (237 with data), 5.8 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
#include "config.h"
#ifdef __CYGWIN__
// Cygwin ASIO works only with win32 sockets
#define _WIN32_WINNT 1
#define __USE_W32_SOCKETS 1
#endif
#ifdef USE_BOOST_ASIO
#include <boost/asio.hpp>
namespace aio = boost::asio;
using boost::system::error_code;
#else
#include <asio.hpp>
namespace aio = asio;
using asio::error_code;
#endif
#include "tcp_cache_protocol.h"
#include "archive.h"
#include "thread_cache.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <cerrno>
#include <climits>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <exception>
#include <iostream>
#include <set>
#include <string>
#include <vector>
using namespace std;
using namespace cppcms;
using aio::ip::tcp;
using boost::shared_ptr;
// One client connection to the cache server.
//
// Protocol as implemented below: the client sends a fixed-size
// tcp_operation_header (hin) followed by hin.size bytes of payload; the
// server answers with a header (hout) followed by hout.size bytes taken from
// data_out.  After each reply the session waits for the next request.
//
// Every asynchronous operation binds shared_from_this() into its completion
// handler, so a reference to the session is held while an operation is
// pending.  A handler that returns without starting another operation (all of
// them do so on error) releases that reference.
class session : public boost::enable_shared_from_this<session> {
    vector<char> data_in;               // payload of the request being handled
    string data_out;                    // payload of the reply being sent
    cppcms::tcp_operation_header hout;  // reply header
    cppcms::tcp_operation_header hin;   // request header
public:
    tcp::socket socket_;
    base_cache &cache;

    session(aio::io_service &srv,base_cache &c) : socket_(srv), cache(c) {}

    // Start (or restart) the request cycle by reading the next request header.
    void run()
    {
        aio::async_read(socket_,aio::buffer(&hin,sizeof(hin)),
                boost::bind(&session::on_header_in,shared_from_this(),
                        aio::placeholders::error));
    }

    // Header received: size the input buffer and read the payload.
    void on_header_in(error_code const &e)
    {
        // A failed read leaves hin stale or only partially filled; using its
        // size field would allocate and wait for an arbitrary amount of data.
        if(e) return;
        // NOTE(review): hin.size comes straight from the peer and is not
        // capped here, so a client can request a very large allocation.
        // Consider enforcing a protocol-level maximum.
        data_in.clear();
        data_in.resize(hin.size);
        aio::async_read(socket_,aio::buffer(data_in,hin.size),
                boost::bind(&session::on_data_in,shared_from_this(),
                        aio::placeholders::error));
    }

    // fetch_page: the whole payload is the key.  On a hit the reply carries
    // the page in data_out, otherwise the reply opcode is no_data.
    void fetch_page()
    {
        string const key(data_in.begin(),data_in.end());
        if(cache.fetch_page(key,data_out,hin.operations.fetch_page.gzip)) {
            hout.opcode=opcodes::page_data;
            hout.size=data_out.size();
            hout.operations.page_data.strlen=data_out.size();
        }
        else {
            hout.opcode=opcodes::no_data;
        }
    }

    // fetch: the whole payload is the key.  On a hit the reply payload is the
    // archived data followed by every tag as a NUL-terminated string; the two
    // lengths are reported separately in the reply header.
    void fetch()
    {
        archive a;
        set<string> tags;
        string const key(data_in.begin(),data_in.end());
        if(!cache.fetch(key,a,tags)) {
            hout.opcode=opcodes::no_data;
            return;
        }
        hout.opcode=opcodes::data;
        data_out=a.get();
        hout.operations.data.data_len=data_out.size();
        for(set<string>::const_iterator p=tags.begin(),e=tags.end();p!=e;++p) {
            // size()+1 copies the terminating NUL that separates the tags.
            data_out.append(p->c_str(),p->size()+1);
        }
        hout.operations.data.triggers_len=data_out.size()-hout.operations.data.data_len;
        hout.size=data_out.size();
    }

    // rise: the whole payload is the key handed to cache.rise().
    void rise()
    {
        string const key(data_in.begin(),data_in.end());
        cache.rise(key);
        hout.opcode=opcodes::done;
    }

    // clear: forwards to cache.clear().
    void clear()
    {
        cache.clear();
        hout.opcode=opcodes::done;
    }

    // stats: reports the key and trigger counts obtained from cache.stats().
    void stats()
    {
        // Initialised so that defined values are sent even if the cache
        // implementation leaves an output parameter untouched.
        unsigned k=0,t=0;
        cache.stats(k,t);
        hout.opcode=opcodes::out_stats;
        hout.operations.out_stats.keys=k;
        hout.operations.out_stats.triggers=t;
    }

    // Split a block of NUL-separated strings into a set.
    //
    // triggers: receives every string found.
    // start/len: the block to parse; never read beyond start+len.
    // Returns false if an empty string is encountered, true otherwise.  A
    // final entry that is not NUL-terminated is accepted and runs to the end
    // of the block.
    bool load_triggers(set<string> &triggers,char const *start,unsigned len)
    {
        char const *const end=start+len;
        while(start<end) {
            size_t const left=end-start;
            void const *nul=memchr(start,0,left);
            size_t const size= nul ? static_cast<char const *>(nul)-start : left;
            if(size==0) {
                return false;
            }
            triggers.insert(string(start,size));
            // Skip the separator only if there was one, so that start never
            // moves past end.
            start+= (size<left) ? size+1 : size;
        }
        return true;
    }

    // store: the payload is key | data | triggers with the three lengths given
    // in the request header.  Replies with opcode error when the lengths are
    // inconsistent with the payload, the key is empty, or the trigger block
    // contains an empty string.
    void store()
    {
        // Validate each length against what is really in the buffer instead
        // of comparing their sum with hin.size: the sum can wrap around and
        // then match a small payload while the individual offsets point far
        // outside data_in.
        size_t const total=data_in.size();
        size_t const key_len=hin.operations.store.key_len;
        size_t const data_len=hin.operations.store.data_len;
        size_t const triggers_len=hin.operations.store.triggers_len;
        if( key_len == 0
            || key_len > total
            || data_len > total - key_len
            || triggers_len != total - key_len - data_len)
        {
            hout.opcode=opcodes::error;
            return;
        }

        vector<char>::const_iterator const key_begin=data_in.begin();
        vector<char>::const_iterator const data_begin=key_begin+key_len;
        vector<char>::const_iterator const trig_begin=data_begin+data_len;

        set<string> triggers;
        string const ts(trig_begin,trig_begin+triggers_len);
        if(!load_triggers(triggers,ts.c_str(),hin.operations.store.triggers_len)) {
            hout.opcode=opcodes::error;
            return;
        }

        // The request carries a timeout that is added to the current time.
        time_t now;
        std::time(&now);
        time_t timeout=now+(time_t)hin.operations.store.timeout;

        string const key(key_begin,data_begin);
        string const data(data_begin,trig_begin);
        archive a(data);
        cache.store(key,triggers,timeout,a);
        hout.opcode=opcodes::done;
    }

    // Payload received: run the requested operation and send the reply header.
    void on_data_in(error_code const &e)
    {
        if(e) return;
        // Start from an all-zero header; handlers set only the fields they
        // need, and hout.size==0 means "no reply payload".
        memset(&hout,0,sizeof(hout));
        switch(hin.opcode){
        case opcodes::fetch_page:   fetch_page(); break;
        case opcodes::fetch:        fetch(); break;
        case opcodes::rise:         rise(); break;
        case opcodes::clear:        clear(); break;
        case opcodes::store:        store(); break;
        case opcodes::stats:        stats(); break;
        default:
            hout.opcode=opcodes::error;
        }
        async_write(socket_,aio::buffer(&hout,sizeof(hout)),
                boost::bind(&session::on_header_out,shared_from_this(),
                        aio::placeholders::error));
    }

    // Reply header sent: send the payload if there is one, otherwise wait for
    // the next request.
    void on_header_out(error_code const &e)
    {
        if(e) return;
        if(hout.size==0) {
            run();
            return ;
        }
        async_write(socket_,aio::buffer(data_out.c_str(),hout.size),
                boost::bind(&session::on_data_out,shared_from_this(),
                        aio::placeholders::error));
    }

    // Reply payload sent: wait for the next request.
    void on_data_out(error_code const &e)
    {
        if(e) return;
        run();
    }
};
class tcp_cache_server {
tcp::acceptor acceptor_;
base_cache &cache;
void on_accept(error_code const &e,shared_ptr<session> s)
{
if(!e) {
tcp::no_delay nd(true);
s->socket_.set_option(nd);
s->run();
start_accept();
}
}
void start_accept()
{
shared_ptr<session> s(new session(acceptor_.io_service(),cache));
acceptor_.async_accept(s->socket_,boost::bind(&tcp_cache_server::on_accept,this,aio::placeholders::error,s));
}
public:
tcp_cache_server( aio::io_service &io,
string ip,
int port,
base_cache &c) :
acceptor_(io,
tcp::endpoint(aio::ip::address::from_string(ip),
port)),
cache(c)
{
start_accept();
}
};
int main(int argc,char **argv)
{
if(argc!=4) {
cerr<<"Usage: tcp_cache_server ip port entries-limit"<<endl;
return 1;
}
try
{
aio::io_service io;
thread_cache cache(atoi(argv[3]));
tcp_cache_server srv_cache(io,argv[1],atoi(argv[2]),cache);
io.run();
}
catch(std::exception const &e) {
cerr<<"Error:"<<e.what()<<endl;
return 1;
}
return 0;
}
MongoDB Logo MongoDB