Skip to content

Commit

Permalink
Support very large batch commands
Browse files Browse the repository at this point in the history
  • Loading branch information
ideawu committed Mar 29, 2014
1 parent 62ee0cc commit 47c7056
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 100 deletions.
6 changes: 6 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
* 1.6.8.6 (2014-03-29)
* New features:
* Incompatible changes:
* Bug fixes:
- Redesign network flow, support very large batch commands

* 1.6.8.5 (2014-03-05)
* New features:
- Add qslice(lrange), qget(lindex, lget) commands.
Expand Down
1 change: 1 addition & 0 deletions src/link.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Link::Link(bool is_server){

sock = -1;
noblock_ = false;
error_ = false;
remote_ip[0] = '\0';
remote_port = -1;

Expand Down
7 changes: 7 additions & 0 deletions src/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Link{
private:
int sock;
bool noblock_;
bool error_;
std::vector<Bytes> recv_data;

RedisLink *redis;
Expand Down Expand Up @@ -43,6 +44,12 @@ class Link{
int fd() const{
return sock;
}
bool error() const{
return error_;
}
void mark_error(){
error_ = true;
}

static Link* connect(const char *ip, int port);
static Link* listen(const char *ip, int port);
Expand Down
204 changes: 108 additions & 96 deletions src/ssdb-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ Config *conf = NULL;
SSDB *ssdb = NULL;
Link *serv_link = NULL;
IpFilter *ip_filter = NULL;
Fdevents *fdes = NULL;
int link_count = 0;

typedef std::vector<Link *> ready_list_t;

Expand All @@ -55,8 +57,8 @@ int main(int argc, char **argv){
}
#endif

fdes = new Fdevents();
run(argc, argv);

remove_pidfile();

if(serv_link){
Expand All @@ -71,16 +73,41 @@ int main(int argc, char **argv){
log_debug("free conf");
delete conf;
}
if(fdes){
delete fdes;
}
log_info("ssdb server exit.");
return 0;
}

int proc_result(ProcJob &job, Fdevents &fdes, ready_list_t &ready_list){
Link* accept_link(){
Link *link = serv_link->accept();
if(link == NULL){
log_error("accept failed! %s", strerror(errno));
return NULL;
}
if(!ip_filter->check_pass(link->remote_ip)){
log_debug("ip_filter deny link from %s:%d", link->remote_ip, link->remote_port);
delete link;
return NULL;
}
link_count ++;
log_debug("new link from %s:%d, fd: %d, link_count: %d",
link->remote_ip, link->remote_port, link->fd(), link_count);

link->nodelay();
link->noblock();
link->create_time = millitime();
link->active_time = link->create_time;
return link;
}

int proc_result(ProcJob &job, ready_list_t &ready_list){
Link *link = job.link;

if(job.result == PROC_ERROR){
log_info("fd: %d, proc error, delete link", link->fd());
fdes.del(link->fd());
fdes->del(link->fd());
delete link;
return PROC_ERROR;
}
Expand All @@ -91,42 +118,78 @@ int proc_result(ProcJob &job, Fdevents &fdes, ready_list_t &ready_list){
}

if(!link->output->empty()){
//log_trace("add %d to fdes.out", link->fd());
fdes.set(link->fd(), FDEVENT_OUT, 1, link);
if(fdes.isset(link->fd(), FDEVENT_IN)){
//log_trace("delete %d from fdes.in", link->fd());
fdes.clr(link->fd(), FDEVENT_IN);
}
fdes->set(link->fd(), FDEVENT_OUT, 1, link);
}
if(link->input->empty()){
fdes->set(link->fd(), FDEVENT_IN, 1, link);
}else{
if(link->input->empty()){
if(!fdes.isset(link->fd(), FDEVENT_IN)){
//log_trace("add %d to fdes.in", link->fd());
fdes.set(link->fd(), FDEVENT_IN, 1, link);
}
}else{
if(fdes.isset(link->fd(), FDEVENT_IN)){
//log_trace("delete %d from fdes.in", link->fd());
fdes.clr(link->fd(), FDEVENT_IN);
}
ready_list.push_back(link);
}
fdes->clr(link->fd(), FDEVENT_IN);
ready_list.push_back(link);
}
return PROC_OK;
}

/*
event:
read => ready_list OR close
write => NONE
proc =>
done: write & (read OR ready_list)
async: stop (read & write)
1. When writing to a link, it may happen to be in the ready_list,
so we cannot close that link in write process, we could only
just mark it as closed.
2. When reading from a link, it is never in the ready_list, so it
is safe to close it in read process, also safe to put it into
ready_list.
3. Ignore FDEVENT_ERR
*/
int proc_client_event(const Fdevent *fde, ready_list_t &ready_list){
Link *link = (Link *)fde->data.ptr;
if(fde->events & FDEVENT_IN){
ready_list.push_back(link);
if(link->error()){
return 0;
}
int len = link->read();
//log_debug("fd: %d read: %d", link->fd(), len);
if(len <= 0){
log_debug("fd: %d, read: %d, delete link", link->fd(), len);
link->mark_error();
return 0;
}
}
if(fde->events & FDEVENT_OUT){
if(link->error()){
return 0;
}
int len = link->write();
if(len <= 0){
log_debug("fd: %d, write: %d, delete link", link->fd(), len);
link->mark_error();
return 0;
}
if(link->output->empty()){
fdes->clr(link->fd(), FDEVENT_OUT);
}
}
return 0;
}

void run(int argc, char **argv){
ready_list_t ready_list;
ready_list_t ready_list_2;
ready_list_t::iterator it;
const Fdevents::events_t *events;
Server serv(ssdb);

Fdevents fdes;
fdes.set(serv_link->fd(), FDEVENT_IN, 0, serv_link);
fdes.set(serv.reader->fd(), FDEVENT_IN, 0, serv.reader);
fdes.set(serv.writer->fd(), FDEVENT_IN, 0, serv.writer);
fdes->set(serv_link->fd(), FDEVENT_IN, 0, serv_link);
fdes->set(serv.reader->fd(), FDEVENT_IN, 0, serv.reader);
fdes->set(serv.writer->fd(), FDEVENT_IN, 0, serv.writer);

int link_count = 0;
uint32_t last_ticks = g_ticks;

while(!quit){
Expand All @@ -136,13 +199,14 @@ void run(int argc, char **argv){
log_info("ssdb working, links: %d", link_count);
}

ready_list.swap(ready_list_2);
ready_list_2.clear();

if(!ready_list.empty()){
// ready_list not empty, so we should return immediately
events = fdes.wait(0);
events = fdes->wait(0);
}else{
events = fdes.wait(50);
events = fdes->wait(50);
}
if(events == NULL){
log_fatal("events.wait error: %s", strerror(errno));
Expand All @@ -152,95 +216,44 @@ void run(int argc, char **argv){
for(int i=0; i<(int)events->size(); i++){
const Fdevent *fde = events->at(i);
if(fde->data.ptr == serv_link){
Link *link = serv_link->accept();
if(link == NULL){
log_error("accept failed! %s", strerror(errno));
continue;
}
if(!ip_filter->check_pass(link->remote_ip)){
log_debug("ip_filter deny link from %s:%d",
link->remote_ip, link->remote_port);
delete link;
continue;
Link *link = accept_link();
if(link){
fdes->set(link->fd(), FDEVENT_IN, 1, link);
}
link_count ++;
log_debug("new link from %s:%d, fd: %d, link_count: %d",
link->remote_ip, link->remote_port, link->fd(), link_count);

link->nodelay();
link->noblock();
link->create_time = millitime();
link->active_time = link->create_time;
fdes.set(link->fd(), FDEVENT_IN, 1, link);
}else if(fde->data.ptr == serv.reader || fde->data.ptr == serv.writer){
WorkerPool<Server::ProcWorker, ProcJob> *worker = (WorkerPool<Server::ProcWorker, ProcJob> *)fde->data.ptr;
ProcJob job;
if(worker->pop(&job) == 0){
log_fatal("reading result from workers error!");
exit(0);
}
if(proc_result(job, fdes, ready_list_2) == PROC_ERROR){
if(proc_result(job, ready_list) == PROC_ERROR){
link_count --;
}
}else{
Link *link = (Link *)fde->data.ptr;
// 不能同时监听读和写事件, 只能监听其中一个
if(fde->events & FDEVENT_ERR){
log_info("fd: %d error, delete link", link->fd());
link_count --;
fdes.del(link->fd());
delete link;
}else if(fde->events & FDEVENT_IN){
int len = link->read();
//log_trace("fd: %d read: %d", link->fd(), len);
if(len <= 0){
log_debug("fd: %d, read: %d, delete link", link->fd(), len);
link_count --;
fdes.del(link->fd());
delete link;
}else{
ready_list.push_back(link);
}
}else if(fde->events & FDEVENT_OUT){
int len = link->write();
//log_trace("fd: %d write: %d", link->fd(), len);
if(len <= 0){
log_debug("fd: %d, write: %d, delete link", link->fd(), len);
link_count --;
fdes.del(link->fd());
delete link;
}else if(link->output->empty()){
//log_trace("delete %d from fdes.out", link->fd());
fdes.clr(link->fd(), FDEVENT_OUT);
if(!link->input->empty()){
ready_list.push_back(link);
}else{
//log_trace("add %d to fdes.in", link->fd());
fdes.set(link->fd(), FDEVENT_IN, 1, link);
}
}else{
//log_trace("%d", link->output->size());
}
}
proc_client_event(fde, ready_list);
}
}

for(it = ready_list.begin(); it != ready_list.end(); it ++){
Link *link = *it;
if(link->error()){
link_count --;
fdes->del(link->fd());
delete link;
continue;
}

const Request *req = link->recv();
if(req == NULL){
log_warn("fd: %d, link parse error, delete link", link->fd());
link_count --;
fdes.del(link->fd());
fdes->del(link->fd());
delete link;
continue;
}
if(req->empty()){
if(!fdes.isset(link->fd(), FDEVENT_IN)){
//log_trace("add %d to fdes.in", link->fd());
fdes.set(link->fd(), FDEVENT_IN, 1, link);
}
fdes->set(link->fd(), FDEVENT_IN, 1, link);
continue;
}

Expand All @@ -250,20 +263,19 @@ void run(int argc, char **argv){
job.link = link;
serv.proc(&job);
if(job.result == PROC_THREAD){
fdes.del(link->fd());
fdes->del(link->fd());
continue;
}
if(job.result == PROC_BACKEND){
fdes.del(link->fd());
fdes->del(link->fd());
link_count --;
continue;
}

if(proc_result(job, fdes, ready_list_2) == PROC_ERROR){
if(proc_result(job, ready_list_2) == PROC_ERROR){
link_count --;
}
} // end foreach ready link
ready_list.swap(ready_list_2);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/util/fde_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ bool Fdevents::isset(int fd, int flag){

int Fdevents::set(int fd, int flags, int data_num, void *data_ptr){
struct Fdevent *fde = get_fde(fd);
if(fde->s_flags & flags){
return 0;
}
int ctl_op = fde->s_flags? EPOLL_CTL_MOD : EPOLL_CTL_ADD;

fde->s_flags |= flags;
Expand Down Expand Up @@ -57,6 +60,9 @@ int Fdevents::del(int fd){

int Fdevents::clr(int fd, int flags){
struct Fdevent *fde = get_fde(fd);
if(!(fde->s_flags & flags)){
return 0;
}

fde->s_flags &= ~flags;
int ctl_op = fde->s_flags? EPOLL_CTL_MOD: EPOLL_CTL_DEL;
Expand Down
Loading

0 comments on commit 47c7056

Please sign in to comment.