#include "config.h"
#ifdef __CYGWIN__
// Cygwin ASIO works only with win32 sockets
#define _WIN32_WINNT 1
#define __USE_W32_SOCKETS 1
#endif
#ifdef USE_BOOST_ASIO
#include <boost/asio.hpp>
namespace aio = boost::asio;
using boost::system::error_code;
using boost::system::system_error;
#else
#include <asio.hpp>
namespace aio = asio;
using asio::error_code;
using asio::system_error;
#endif
#include "tcp_cache.h"
#include "tcp_cache_protocol.h"
#include "archive.h"
using aio::ip::tcp;
namespace cppcms {
class messenger : boost::noncopyable {
aio::io_service srv_;
tcp::socket socket_;
string ip_;
int port_;
public:
void connect(string ip,int port) {
ip_=ip;
port_=port;
error_code e;
socket_.connect(tcp::endpoint(aio::ip::address::from_string(ip),port),e);
if(e) throw cppcms_error("connect:"+e.message());
tcp::no_delay nd(true);
socket_.set_option(nd);
}
messenger(string ip,int port) :
socket_(srv_)
{
connect(ip,port);
}
messenger() : socket_(srv_) { };
void transmit(tcp_operation_header &h,string &data)
{
bool done=false;
int times=0;
do {
try {
aio::write(socket_,aio::buffer(&h,sizeof(h)));
if(h.size>0) {
aio::write(socket_,aio::buffer(data,h.size));
}
aio::read(socket_,aio::buffer(&h,sizeof(h)));
if(h.size>0) {
vector<char> d(h.size);
aio::read(socket_,aio::buffer(d,h.size));
data.assign(d.begin(),d.begin()+h.size);
}
done=true;
}
catch(system_error const &e) {
if(times) {
throw cppcms_error(string("tcp_cache:")+e.what());
}
socket_.close();
error_code er;
socket_.connect(
tcp::endpoint(
aio::ip::address::from_string(ip_),port_),er);
if(er) throw cppcms_error("reconnect:"+er.message());
times++;
}
}while(!done);
}
};
tcp_cache::tcp_cache(vector<string> const& ip,vector<long> const &port)
{
if(ip.size()<1 || port.size()!=ip.size()) {
throw cppcms_error("Incorrect parameters for tcp cache");
}
conns=ip.size();
tcp=new messenger[conns];
try {
for(int i=0;i<conns;i++) {
tcp[i].connect(ip[i],port[i]);
}
}
catch(...) {
delete [] tcp;
tcp=NULL;
throw;
}
}
tcp_cache::~tcp_cache()
{
delete [] tcp;
}
void tcp_cache::broadcast(tcp_operation_header &h,string &data)
{
int i;
for(i=0;i<conns;i++) {
tcp_operation_header ht=h;
string dt=data;
tcp[i].transmit(ht,data);
}
}
void tcp_cache::rise(string const &trigger)
{
tcp_operation_header h={0};
h.opcode=opcodes::rise;
h.size=trigger.size();
string data=trigger;
h.operations.rise.trigger_len=trigger.size();
broadcast(h,data);
}
void tcp_cache::clear()
{
tcp_operation_header h={0};
h.opcode=opcodes::clear;
h.size=0;
string empty;
broadcast(h,empty);
}
bool tcp_cache::fetch_page(string const &key,string &output,bool gzip)
{
string data=key;
tcp_operation_header h={0};
h.opcode=opcodes::fetch_page;
h.size=data.size();
h.operations.fetch_page.gzip=gzip;
h.operations.fetch_page.strlen=data.size();
get(key).transmit(h,data);
if(h.opcode==opcodes::page_data) {
output=data;
return true;
}
return false;
}
bool tcp_cache::fetch(string const &key,archive &a,set<string> &tags)
{
string data=key;
tcp_operation_header h={0};
h.opcode=opcodes::fetch;
h.size=data.size();
h.operations.fetch.key_len=data.size();
get(key).transmit(h,data);
if(h.opcode!=opcodes::data)
return false;
char const *ptr=data.c_str();
a.set(ptr,h.operations.data.data_len);
ptr+=h.operations.data.data_len;
int len=h.operations.data.triggers_len;
while(len>0) {
string tag;
unsigned tmp_len=strlen(ptr);
tag.assign(ptr,tmp_len);
ptr+=tmp_len+1;
len-=tmp_len+1;
tags.insert(tag);
}
return true;
}
void tcp_cache::stats(unsigned &keys,unsigned &triggers)
{
keys=0; triggers=0;
for(int i=0;i<conns;i++) {
tcp_operation_header h={0};
string data;
h.opcode=opcodes::stats;
tcp[i].transmit(h,data);
if(h.opcode==opcodes::out_stats) {
keys+=h.operations.out_stats.keys;
triggers+=h.operations.out_stats.triggers;
}
}
}
void tcp_cache::store(string const &key,set<string> const &triggers,time_t timeout,archive const &a)
{
tcp_operation_header h={0};
string data;
h.opcode=opcodes::store;
data.append(key);
h.operations.store.key_len=key.size();
data.append(a.get());
h.operations.store.data_len=a.get().size();
time_t now;
time(&now);
h.operations.store.timeout=timeout-now > 0 ? timeout-now : 0;
unsigned tlen=0;
for(set<string>::const_iterator p=triggers.begin(),e=triggers.end();p!=e;++p) {
tlen+=p->size()+1;
data.append(p->c_str(),p->size()+1);
}
h.operations.store.triggers_len=tlen;
h.size=data.size();
get(key).transmit(h,data);
}
messenger &tcp_cache::get(string const &key)
{
if(conns==1) return tcp[0];
unsigned val=0,i;
for(i=0;i<key.size();i++) {
val+=251*key[i]+103 % 307;
}
return tcp[val % conns];
}
}
#ifdef TCP_CACHE_UNIT_TEST
#include <assert.h>
#include <iostream>
#include <cstdlib>
int main(int argc,char **argv)
{
using namespace cppcms;
using namespace std;
if(argc!=3) {
cerr<<"Usage IP port"<<endl;
return 1;
}
try {
archive a;
set<string> s;
tcp_cache tcp(argv[1],atoi(argv[2]));
assert(tcp.fetch("something",a,s)==false);
time_t t;
time(&t);
t+=2;
a.set("data",4);
tcp.store("key",s,t,a);
unsigned keys,triggers;
tcp.stats(keys,triggers);
assert(keys==1);
assert(triggers==1);
s.clear();
a.set("");
assert(tcp.fetch("key",a,s)==true);
assert(s.size()==1);
assert(*(s.begin())=="key");
assert(a.get()=="data");
sleep(3);
assert(tcp.fetch("key",a,s)==false);
a.set("");
a<<string("msg1");
a<<string("msg2");
time(&t);
t+=50;
s.clear();
s.insert("a");
s.insert("b");
tcp.store("k",s,t,a);
string x;
assert(tcp.fetch_page("k",x,true)==true);
assert(x=="msg2");
assert(tcp.fetch_page("k",x,false)==true);
assert(x=="msg1");
a.set("");
s.clear();
assert(tcp.fetch("k",a,s)==true);
assert(s.size()==3);
set<string>::iterator ptr=s.begin();
assert(*ptr++=="a");
assert(*ptr++=="b");
assert(*ptr++=="k");
tcp.rise("a");
assert(tcp.fetch("k",a,s)==false);
a.set("Something");
s.clear();
tcp.store("bb",s,t,a);
assert(tcp.fetch("xx",a,s)==false);
assert(tcp.fetch("bb",a,s)==true);
tcp.clear();
assert(tcp.fetch("bb",a,s)==false);
cout<<"Done... OK!\n";
}
catch(std::exception const &e) {
cerr<<e.what()<<endl;
}
}
#endif