diff --git a/README.md b/README.md index fa5afb3..27c3279 100644 --- a/README.md +++ b/README.md @@ -1,43 +1,3 @@ # sim -C++ network server framework, `nc` and `telnet` friendly. - - -#demo - - #include "sim/sim.h" - - class MyHandler : public sim::Handler - { - public: - virtual sim::HandlerState proc(const sim::Request &req, sim::Response *resp){ - std::string cmd = req.msg.type(); - if(cmd == "ping"){ - resp->msg.add("ok"); - resp->msg.add("pong"); - }else{ - resp->msg.add("ok"); - resp->msg.add(cmd); - } - return this->resp(); - } - }; - - int main(int argc, char **argv){ - const char *ip = "127.0.0.1"; - int port = 8800; - sim::Server *serv = sim::Server::listen(ip, port); - if(!serv){ - log_fatal("%s", strerror(errno)); - exit(0); - } - log_info("server listen on %s:%d", ip, port); - - MyHandler handler; - serv->add_handler(&handler); - - serv->loop(); - return 0; - } - - +C++ network server framework. diff --git a/src/Makefile b/src/Makefile index 29408a4..7181d95 100644 --- a/src/Makefile +++ b/src/Makefile @@ -1,29 +1,16 @@ include ../build_config.mk -CFLAGS += -OBJS = sim.o message.o decoder.o link.o handler.o server.o \ - fde.o log.o app.o config.o -LIB=libsim.a -HEADER_FILES=sim.h message.h decoder.h fde.h link.h handler.h server.h -UTIL_HEADERS=util/thread.h util/strings.h util/log.h util/config.h util/app.h +CFLAGS += -I . +OBJS = +LIBS = util/libutil.a net/libnet.a line/libline.a core/libcore.a + +all: ${OBJS} + make clean + $(CXX) ${CFLAGS} test.cpp ${OBJS} ${LIBS} -all: $(OBJS) - mkdir -p $(HEADER_OUTPUT_DIR)/util - ar -cru $(OUTPUT_DIR)/$(LIB) ${OBJS} - cp $(HEADER_FILES) $(HEADER_OUTPUT_DIR) - cp $(UTIL_HEADERS) $(HEADER_OUTPUT_DIR)/util - -fde.o: fde.h fde.cpp fde_select.cpp fde_epoll.cpp - ${CXX} ${CFLAGS} -c fde.cpp -app.o: util/app.h util/app.cpp - $(CXX) ${CFLAGS} -c util/app.cpp -log.o: util/log.h util/log.cpp - $(CXX) ${CFLAGS} -c util/log.cpp -config.o: util/config.h util/config.cpp - $(CXX) ${CFLAGS} -c util/config.cpp .cpp.o: $(CXX) ${CFLAGS} -c $< -o $@ clean: - rm -rf *.o *.a *.out $(OUTPUT_DIR)/$(LIB) $(HEADER_OUTPUT_DIR) + rm -rf *.o *.a *.out diff --git a/src/client/Makefile b/src/client/Makefile deleted file mode 100644 index 2d36cde..0000000 --- a/src/client/Makefile +++ /dev/null @@ -1,16 +0,0 @@ -include ../../build_config.mk -CFLAGS += -I../ -OUTPUT_DIR=../../api/cpp - -all: lib - ${CXX} -I$(OUTPUT_DIR) -o demo demo.cpp $(OUTPUT_DIR)/libsim-client.a - -lib: client.h client.cpp - $(CXX) $(CFLAGS) -c client.cpp - ar -cru libsim-client.a\ - client.o\ - ../sim.o ../message.o ../decoder.o ../link.o - cp client.h ../message.h libsim-client.a ../../api/cpp - -clean: - rm -rf *.o *.a *.out diff --git a/src/client/client.cpp b/src/client/client.cpp deleted file mode 100644 index 9db414e..0000000 --- a/src/client/client.cpp +++ /dev/null @@ -1,52 +0,0 @@ -#include "client.h" -#include "link.h" - -namespace sim{ - -Client::Client(){ - link = NULL; -} - -Client::~Client(){ - delete link; -} - -Client* Client::connect(const char *ip, int port){ - return Client::connect(std::string(ip), port); -} - -Client* Client::connect(const std::string &ip, int port){ - Link *link = Link::connect(ip, port); - if(!link){ - return NULL; - } - Client *client = new Client(); - client->link = link; - return client; -} - -int Client::send(const Message &msg){ - this->link->send(msg); - return this->link->flush(); -} - -int Client::recv(Message *msg){ - while(1){ - int ret; - ret = link->recv(msg); - if(ret == -1){ - return -1; - }else if(ret == 1){ - return 1; - }else{ - // - } - - ret = link->read(); - if(ret <= 0){ - return -1; - } - } -} - -}; // namespace sim diff --git a/src/client/client.h b/src/client/client.h deleted file mode 100644 index 6b57e46..0000000 --- a/src/client/client.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef SIM_CLIENT_H_ -#define SIM_CLIENT_H_ - -#include "message.h" - -namespace sim{ - -class Link; -std::string encode(const std::string s, bool force_ascii=false); -std::string decode(const std::string s); - -class Client -{ -public: - static Client* connect(const char *ip, int port); - static Client* connect(const std::string &ip, int port); - - int send(const Message &msg); // blocking send - int recv(Message *msg); // blocking recv - - ~Client(); -private: - Client(); - Link *link; -}; - -}; // namespace sim - -#endif diff --git a/src/client/demo.cpp b/src/client/demo.cpp deleted file mode 100644 index a87b0ed..0000000 --- a/src/client/demo.cpp +++ /dev/null @@ -1,29 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include "client.h" - -int main(int argc, char **argv){ - printf("Usage: %s [ip] [port]\n", argv[0]); - const char *ip = (argc >= 2)? argv[1] : "127.0.0.1"; - int port = (argc >= 3)? atoi(argv[2]) : 8800; - - // connect to server - sim::Client *client = sim::Client::connect(ip, port); - if(client == NULL){ - printf("fail to connect to server!\n"); - return 0; - } - - sim::Message req, resp; - req.set_type("ping"); - client->send(req); - client->recv(&resp); - printf("resp: %s\n", resp.encode().c_str()); - - delete client; - return 0; -} diff --git a/src/core/Makefile b/src/core/Makefile new file mode 100644 index 0000000..5bae094 --- /dev/null +++ b/src/core/Makefile @@ -0,0 +1,19 @@ +include ../../build_config.mk + +CFLAGS += -I ../ +OBJS = event.o parser.o session.o transport_impl.o server.o + +all: ${OBJS} + ar -cru ./libcore.a ${OBJS} + +.cpp.o: + $(CXX) ${CFLAGS} -c $< -o $@ + +test: + make clean + make + $(CXX) ${CFLAGS} test.cpp ${OBJS} util/libutil.a net/libnet.a line/libline.a + +clean: + rm -rf *.o *.a *.out + diff --git a/src/core/event.cpp b/src/core/event.cpp new file mode 100644 index 0000000..20d4e90 --- /dev/null +++ b/src/core/event.cpp @@ -0,0 +1,41 @@ +#include "event.h" +#include "session.h" + +Event Event::new_event(Session *sess){ + return Event(sess->id(), 0); +} +Event Event::close_event(Session *sess){ + return Event(sess->id(), 1); +} +Event Event::read_event(Session *sess){ + return Event(sess->id(), 2); +} + +Event::Event(){ + _type = -1; +} + +Event::Event(int id, int type){ + _id = id; + _type = type; +} + +int Event::id() const{ + return _id; +} + +bool Event::is_none() const{ + return _type == -1; +} + +bool Event::is_new() const{ + return _type == 0; +} + +bool Event::is_close() const{ + return _type == 1; +} + +bool Event::is_read() const{ + return _type == 2; +} diff --git a/src/core/event.h b/src/core/event.h new file mode 100644 index 0000000..a657941 --- /dev/null +++ b/src/core/event.h @@ -0,0 +1,26 @@ +#ifndef SIM_EVENT_H +#define SIM_EVENT_H + +class Session; + +class Event +{ +public: + static Event new_event(Session *sess); + static Event close_event(Session *sess); + static Event read_event(Session *sess); + + Event(); + Event(int id, int type); + int id() const; + bool is_none() const; + bool is_new() const; + bool is_close() const; + bool is_read() const; // 收到一个报文 + +private: + int _id; + int _type; +}; + +#endif diff --git a/src/core/message.h b/src/core/message.h new file mode 100644 index 0000000..1727302 --- /dev/null +++ b/src/core/message.h @@ -0,0 +1,13 @@ +#ifndef SIM_MESSAGE_H +#define SIM_MESSAGE_H + +#include "util/buffer.h" + +class Message +{ +public: + virtual ~Message(){} + virtual int encode(Buffer *buffer) = 0; +}; + +#endif diff --git a/src/core/parser.cpp b/src/core/parser.cpp new file mode 100644 index 0000000..f9909ec --- /dev/null +++ b/src/core/parser.cpp @@ -0,0 +1,37 @@ +#include "parser.h" + +ParseState ParseState::none_state(){ + return ParseState(0); +} + +ParseState ParseState::ready_state(){ + return ParseState(1); +} + +ParseState ParseState::error_state(){ + return ParseState(-1); +} + +ParseState::ParseState(){ + _code = 0; +} + +ParseState::ParseState(int code){ + _code = code; +} + +ParseState::~ParseState(){ +} + +bool ParseState::none() const{ + return _code == 0; +} + +bool ParseState::ready() const{ + return _code == 1; +} + +bool ParseState::error() const{ + return _code == -1; +} + diff --git a/src/core/parser.h b/src/core/parser.h new file mode 100644 index 0000000..03124b9 --- /dev/null +++ b/src/core/parser.h @@ -0,0 +1,37 @@ +#ifndef SIM_PARSER_H +#define SIM_PARSER_H + +class Buffer; +class Message; +class ParseState; + +class Parser +{ +public: + virtual ~Parser(){}; + virtual ParseState parse(Buffer *buffer, Message **msg) = 0; +}; + +///////////////////////////////////////// + +class ParseState +{ +public: + static ParseState none_state(); + static ParseState ready_state(); + static ParseState error_state(); + + ParseState(); + ~ParseState(); + + bool none() const; + bool ready() const; + bool error() const; + +private: + ParseState(int code); + + int _code; +}; + +#endif diff --git a/src/core/server.cpp b/src/core/server.cpp new file mode 100644 index 0000000..6af1f18 --- /dev/null +++ b/src/core/server.cpp @@ -0,0 +1,28 @@ +#include "server.h" +#include +#include "util/log.h" +#include "net/link.h" +#include "net/tcp_link.h" +#include "session.h" + +Server::Server(){ + _link = NULL; +} + +Server::~Server(){ + delete _link; +} + +Link* Server::link() const{ + return _link; +} + +int Server::listen(const char *host, int port){ + TcpLink *tcp = TcpLink::listen(host, port); + if(!tcp){ + log_error("failed to listen at %s:%d, %s", host, port, strerror(errno)); + return -1; + } + this->_link = tcp; + return 0; +} diff --git a/src/core/server.h b/src/core/server.h new file mode 100644 index 0000000..98c272e --- /dev/null +++ b/src/core/server.h @@ -0,0 +1,25 @@ +#ifndef SIM_SERVER_H +#define SIM_SERVER_H + +class Link; +class Session; + +class Server +{ +public: + Server(); + virtual ~Server(); + + Link* link() const; + + virtual void init() = 0; + // 接受新连接 + virtual Session* accept() = 0; + + int listen(const char *host, int port); + +private: + Link *_link; +}; + +#endif diff --git a/src/core/session.cpp b/src/core/session.cpp new file mode 100644 index 0000000..2ca844b --- /dev/null +++ b/src/core/session.cpp @@ -0,0 +1,88 @@ +#include "session.h" +#include "util/log.h" +#include "net/link.h" +#include "parser.h" +#include "message.h" + +static int id_incr = 1; + +Session::Session(Link *link, Parser *parser){ + _id = id_incr++; + _link = link; + _parser = parser; +} + +Session::~Session(){ + delete _link; + delete _parser; + for(std::list::iterator it=_input.begin(); it!=_input.end(); it++){ + Message *msg = *it; + delete msg; + } + for(std::list::iterator it=_output.begin(); it!=_output.end(); it++){ + Message *msg = *it; + delete msg; + } +} + +int Session::id() const{ + return _id; +} + +Link* Session::link() const{ + return _link; +} + +Parser* Session::parser() const{ + return _parser; +} + +const std::list* Session::input() const{ + return &_input; +} + +const std::list* Session::output() const{ + return &_output; +} + +int Session::parse_input(){ + int ret = 0; + while(1){ + Message *msg; + ParseState s = _parser->parse(_link->input(), &msg); + if(s.ready()){ + _input.push_back(msg); + ret ++; + }else if(s.error()){ + return -1; + }else{ + break; + } + } + return ret; +} + +int Session::encode_output(){ + int ret = 0; + while(!_output.empty()){ + Message *msg = _output.front(); + _output.pop_front(); + msg->encode(_link->output()); + delete msg; + ret ++; + } + return ret; +} + +Message* Session::recv(){ + if(_input.empty()){ + return NULL; + } + Message *msg = _input.front(); + _input.pop_front(); + return msg; +} + +void Session::send(Message *msg){ + _output.push_back(msg); +} diff --git a/src/core/session.h b/src/core/session.h new file mode 100644 index 0000000..2d79fa7 --- /dev/null +++ b/src/core/session.h @@ -0,0 +1,40 @@ +#ifndef SIM_SESSION_H +#define SIM_SESSION_H + +#include + +class Link; +class Parser; +class Message; + +class Session +{ +public: + Session(Link *link, Parser *parser); + virtual ~Session(); + + int id() const; + Link* link() const; + Parser* parser() const; + + const std::list* input() const; + const std::list* output() const; + + // 返回解析成功的要接收的报文的数量,出错返回-1 + virtual int parse_input(); + // 返回编码成功的要发送的报文的数量,出错返回-1 + virtual int encode_output(); + + Message* recv(); + void send(Message *msg); + +private: + int _id; + Link *_link; + Parser *_parser; + + std::list _input; + std::list _output; +}; + +#endif diff --git a/src/core/transport.h b/src/core/transport.h new file mode 100644 index 0000000..4e6438f --- /dev/null +++ b/src/core/transport.h @@ -0,0 +1,35 @@ +#ifndef SIM_TRANSPORT_H +#define SIM_TRANSPORT_H + +#include + +class Event; +class Server; +class Message; + +class Transport +{ +public: + static Transport* create(); + + virtual ~Transport(){} + + virtual void add_server(Server *serv) = 0; + virtual void init() = 0; + + virtual const std::vector* wait(int timeout_ms) = 0; + + // multi-thread safe + virtual void accept(int id) = 0; + // multi-thread safe + virtual void close(int id) = 0; + // multi-thread safe + virtual Message* recv(int id) = 0; + // multi-thread safe + virtual void send(int id, Message *msg) = 0; + +protected: + Transport(){} +}; + +#endif diff --git a/src/core/transport_impl.cpp b/src/core/transport_impl.cpp new file mode 100644 index 0000000..9009d2d --- /dev/null +++ b/src/core/transport_impl.cpp @@ -0,0 +1,235 @@ +#include "transport_impl.h" +#include "util/log.h" +#include "net/fde.h" +#include "net/link.h" + +#define FDE_NUM_COMMON 0 +#define FDE_NUM_SERVER 1 +#define FDE_NUM_CLIENT 2 + +// static +Transport* Transport::create(){ + Transport *ret = new TransportImpl(); + return ret; +} + +TransportImpl::TransportImpl(){ + _fdes = new Fdevents(); +} + +TransportImpl::~TransportImpl(){ + delete _fdes; +} + +void TransportImpl::add_server(Server *serv){ + serv->init(); + _servers.push_back(serv); + _fdes->set(serv->link()->fd(), FDEVENT_IN, FDE_NUM_SERVER, serv); +} + +void TransportImpl::init(){ + _fdes->set(_accept_ids.fd(), FDEVENT_IN, FDE_NUM_COMMON, &_accept_ids); + _fdes->set(_close_ids.fd(), FDEVENT_IN, FDE_NUM_COMMON, &_close_ids); + _fdes->set(_send_ids.fd(), FDEVENT_IN, FDE_NUM_COMMON, &_send_ids); + +} + +void TransportImpl::accept(int id){ + this->_accept_ids.push(id); +} + +void TransportImpl::close(int id){ + Locking l(&_mutex); + if(_opening_list.find(id) != _opening_list.end()){ + Session *sess = _opening_list[id]; + _opening_list.erase(sess->id()); + _closing_list[sess->id()] = sess; + } + if(_working_list.find(id) != _working_list.end()){ + Session *sess = _working_list[id]; + _working_list.erase(sess->id()); + _closing_list[sess->id()] = sess; + } + + this->_close_ids.push(id); +} + +Message* TransportImpl::recv(int id){ + Locking l(&_mutex); + if(_working_list.find(id) != _working_list.end()){ + Session *sess = _working_list[id]; + return sess->recv(); + } + return NULL; +} + +void TransportImpl::handle_on_read(Session *sess){ + // log_debug("net read %s", sess->link()->address().c_str()); + + bool error = false; + int ret = sess->link()->net_read(); + if(ret <= 0){ + error = true; + }else{ + Locking l(&_mutex); + int num = sess->parse_input(); + if(num == -1){ + log_debug("parse error!"); + error = true; + }else{ + // log_debug("parsed %d message(s)", num); + for(int i=0; i_events.push_back(Event::read_event(sess)); + } + } + } + + if(error){ + this->handle_on_close(sess); + } +} + +void TransportImpl::send(int id, Message *msg){ + Locking l(&_mutex); + if(_working_list.find(id) != _working_list.end()){ + Session *sess = _working_list[id]; + sess->send(msg); + + // log_debug("output.size %d", sess->output()->size()); + this->_send_ids.push(id); + } +} + +void TransportImpl::handle_send_id(){ + Locking l(&_mutex); + + int id; + _send_ids.pop(&id); + + if(_working_list.find(id) != _working_list.end()){ + Session *sess = _working_list[id]; + if(!sess->output()->empty() && !_fdes->isset(sess->link()->fd(), FDEVENT_OUT)){ + // log_debug("fde.set(%d, OUT)", sess->id()); + _fdes->set(sess->link()->fd(), FDEVENT_OUT, FDE_NUM_CLIENT, sess); + } + } +} + +void TransportImpl::handle_on_write(Session *sess){ + bool error = false; + + { + Locking l(&_mutex); + if(!sess->output()->empty()){ + int num = sess->encode_output(); + if(num == -1){ + log_debug("encode error!"); + error = true; + }else{ + // log_debug("encoded %d message(s)", num); + } + } + } + + int ret = sess->link()->net_write(); + if(ret == 0){ + // log_debug("fde.clr(%d, OUT)", sess->id()); + _fdes->clr(sess->link()->fd(), FDEVENT_OUT); + }else if(ret == -1){ + error = true; + } + + if(error){ + this->handle_on_close(sess); + } +} + +void TransportImpl::handle_on_new(Session *sess){ + log_debug("on new %s", sess->link()->address().c_str()); + + Locking l(&_mutex); + _opening_list[sess->id()] = sess; + + this->_events.push_back(Event::new_event(sess)); +} + +void TransportImpl::handle_on_close(Session *sess){ + int id = sess->id(); + log_debug("on close %s", sess->link()->address().c_str()); + Locking l(&_mutex); + if(_working_list.find(id) != _working_list.end()){ + _working_list.erase(sess->id()); + _closing_list[sess->id()] = sess; + + _fdes->del(sess->link()->fd()); + this->_events.push_back(Event::close_event(sess)); + } +} + +void TransportImpl::handle_accept_id(){ + int id; + _accept_ids.pop(&id); + + Locking l(&_mutex); + if(_opening_list.find(id) != _opening_list.end()){ + Session *sess = _opening_list[id]; + log_debug("accept %s", sess->link()->address().c_str()); + _opening_list.erase(sess->id()); + _working_list[sess->id()] = sess; + + _fdes->set(sess->link()->fd(), FDEVENT_IN, FDE_NUM_CLIENT, sess); + } +} + +void TransportImpl::handle_close_id(){ + int id; + _close_ids.pop(&id); + + Locking l(&_mutex); + if(_closing_list.find(id) != _closing_list.end()){ + Session *sess = _closing_list[id]; + log_debug("close %s", sess->link()->address().c_str()); + _closing_list.erase(id); + + _fdes->del(sess->link()->fd()); + delete sess; + } +} + +const std::vector* TransportImpl::wait(int timeout_ms){ + _events.clear(); + + const Fdevents::events_t *events = _fdes->wait(timeout_ms); + for(int i=0; i<(int)events->size(); i++){ + const Fdevent *fde = events->at(i); + if(fde->data.ptr == &this->_accept_ids){ + this->handle_accept_id(); + }else if(fde->data.ptr == &this->_close_ids){ + this->handle_close_id(); + }else if(fde->data.ptr == &this->_send_ids){ + this->handle_send_id(); + }else{ + if(fde->data.num == FDE_NUM_SERVER){ + Server *serv = (Server *)fde->data.ptr; + Session *sess = serv->accept(); + if(sess){ + this->handle_on_new(sess); + }else{ + log_error("accept return NULL"); + } + }else{ + Session *sess = (Session *)fde->data.ptr; + if(sess){ // 防止已经被 fde_del + if(fde->events & FDEVENT_IN){ + this->handle_on_read(sess); + } + if(fde->events & FDEVENT_OUT){ + this->handle_on_write(sess); + } + } + } + } + } + + return &_events; +} diff --git a/src/core/transport_impl.h b/src/core/transport_impl.h new file mode 100644 index 0000000..4126c4e --- /dev/null +++ b/src/core/transport_impl.h @@ -0,0 +1,59 @@ +#ifndef SIM_TRANSPORT_IMPL_H +#define SIM_TRANSPORT_IMPL_H + +#include +#include +#include "util/thread.h" +#include "transport.h" +#include "event.h" +#include "server.h" +#include "session.h" + +namespace sim{ + class Fdevents; +}; +using namespace sim; + +class TransportImpl : public Transport +{ +public: + TransportImpl(); + ~TransportImpl(); + + virtual void add_server(Server *serv); + virtual void init(); + + virtual const std::vector* wait(int timeout_ms); + + virtual void accept(int id); + virtual void close(int id); + virtual Message* recv(int id); + virtual void send(int id, Message *msg); + +private: + Mutex _mutex; + + std::map _working_list; + std::map _opening_list; + std::map _closing_list; + + std::vector _events; + + Channel _accept_ids; + Channel _close_ids; + Channel _send_ids; + + void handle_on_new(Session *sess); + void handle_on_close(Session *sess); + void handle_on_read(Session *sess); + void handle_on_write(Session *sess); + void handle_accept_id(); + void handle_close_id(); + void handle_send_id(); + + Fdevents *_fdes; + + std::vector _servers; +}; + +#endif diff --git a/src/core/worker.h b/src/core/worker.h new file mode 100644 index 0000000..aa40bb0 --- /dev/null +++ b/src/core/worker.h @@ -0,0 +1,84 @@ +#ifndef SIM_WORKER_H +#define SIM_WORKER_H + +#include "util/thread.h" +#include "util/log.h" + +template +class Worker +{ +public: + Worker(); + virtual ~Worker(); + + void start(int num=1); + void stop(); + void add_task(T task); + + virtual void process(T task) = 0; + +private: + Queue _tasks; + Mutex _mutex; + bool _quit; + int _num; + static void* run(void *arg); +}; + +template +Worker::Worker(){ +} + +template +Worker::~Worker(){ +} + +template +void Worker::add_task(T task){ + _tasks.push(task); +} + +template +void Worker::start(int num){ + _quit = false; + _num = num; + for(int i=0; i +void Worker::stop(){ + _quit = true; + + for(int i=0; i<20; i++){ + usleep(10 * 1000); + Locking l(&_mutex); + if(_num == 0){ + break; + } + } +} + +template +void* Worker::run(void *arg){ + Worker *worker = (Worker *)arg; + + while(!worker->_quit){ + T event; + int ret = worker->_tasks.pop(&event, 20); + if(ret == 1){ + worker->process(event); + } + } + + Locking l(&worker->_mutex); + worker->_num --; + + return NULL; +} +#endif diff --git a/src/decoder.cpp b/src/decoder.cpp deleted file mode 100644 index aed34e2..0000000 --- a/src/decoder.cpp +++ /dev/null @@ -1,92 +0,0 @@ -#include "util/strings.h" -#include "util/log.h" -#include "sim.h" - -namespace sim{ - -const static int BUF_RESIZE_TRIGGER = 16 * 1024; - -int Decoder::push(const char *buf, int len){ - buffer.append(buf, len); - //log_debug("'%s'", str_escape(buffer).c_str()); - return len; -} - -int Decoder::parse(Message *msg){ - msg->reset(); - - if(buffer_offset >= BUF_RESIZE_TRIGGER){ - //log_debug("resize buffer"); - buffer = std::string(buffer.data() + buffer_offset, buffer.size() - buffer_offset); - buffer_offset = 0; - } - - while(buffer.size() > buffer_offset && isspace(buffer[buffer_offset])){ - buffer_offset ++; - } - if(buffer.size() == buffer_offset){ - return 0; - } - - const char *key = buffer.data() + buffer_offset; - const char *msg_end = (const char *)memchr(key, sim::MSG_END_BYTE, buffer.size() - buffer_offset); - if(!msg_end){ - return 0; - } - int msg_len = msg_end - key + 1; - int size = msg_len; - - int auto_tag = 0; - while(1){ - int key_len = 0; - int val_len; - int tag; - - const char *end; - end = (const char *)memchr(key, sim::KV_END_BYTE, size); - // 兼容最后一个 空格 被省略的情况 - if(end == NULL){ - end = msg_end; - } - - const char *val = (const char *)memchr(key, sim::KV_SEP_BYTE, end - key); - if(val == NULL){ - val = key; - tag = auto_tag; - }else{ - val ++; - key_len = val - key - 1; - size -= key_len + 1; - std::string key_s(key, key_len); - tag = str_to_int(key_s); - } - - val_len = end - val; - size -= val_len + 1; - - if(val_len > 0 && val[val_len - 1] == '\r'){ - val_len -= 1; - } - - //printf("%u key: %u, val: %u\n", __LINE__, key_len, val_len); - - std::string val_s(val, val_len); - msg->set(tag, val_s); - - key = end + 1; - auto_tag = tag + 1; - - if(key >= msg_end){ - std::map::iterator it; - for(it=msg->fields_.begin(); it!=msg->fields_.end(); it++){ - it->second = sim::decode(it->second); - } - buffer_offset += msg_len; - //log_debug("msg.len: %d, buffer.len: %d", msg_len, buffer.size()); - return 1; - } - } - return 0; -} - -}; // namespace sim diff --git a/src/decoder.h b/src/decoder.h deleted file mode 100644 index 08b6227..0000000 --- a/src/decoder.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef SIM_DECODER_H_ -#define SIM_DECODER_H_ - -#include -#include "message.h" - -namespace sim{ - -class Decoder{ -public: - Decoder(){ - buffer_offset = 0; - } - // 当你从 socket 中读到数据, 或者从文件中读到数据时, 将数据压入解码器的数据缓冲区 - int push(const char *buf, int len); - // 解析缓冲区中的数据, 如果解析出一个完整的 FIX 报文, 返回 1, - // 如果数据不足一个报文, 返回 0, 你应该继续读取数据并压入解码器. - // 如果出错(如系统错误), 返回 -1. - int parse(Message *msg); -private: - // TODO: 优化成 ring buffer, 直接从 socket 里读 - std::string buffer; - int buffer_offset; -}; - -}; // namespace sim - -#endif diff --git a/src/handler.cpp b/src/handler.cpp deleted file mode 100644 index a99d1d0..0000000 --- a/src/handler.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "util/log.h" -#include "handler.h" -#include "util/thread.h" - -namespace sim{ - -int Handler::m_init(){ - resps = new SelectableQueue(); - return this->init(); -} - -int Handler::m_free(){ - delete resps; - return this->free(); -} - -int Handler::fd(){ - return resps->fd(); -} - -HandlerState Handler::accept(const Session &sess){ - return HANDLE_OK; -} - -HandlerState Handler::close(const Session &sess){ - return HANDLE_OK; -} - -HandlerState Handler::proc(const Request &req, Response *resp){ - return HANDLE_OK; -} - -void Handler::async_send(Response *resp){ - this->resps->push(resp); -} - -Response* Handler::handle(){ - while(this->resps->size() > 0){ - Response *resp; - if(this->resps->pop(&resp) == 1 && resp != NULL){ - return resp; - } - } - return NULL; -} - -}; // namespace sim diff --git a/src/handler.h b/src/handler.h deleted file mode 100644 index 4e181a9..0000000 --- a/src/handler.h +++ /dev/null @@ -1,99 +0,0 @@ -#ifndef SIM_HANDLER_H_ -#define SIM_HANDLER_H_ - -#include "link.h" -#include "message.h" - -template -class SelectableQueue; - -namespace sim{ - -class Session -{ -public: - int64_t id; - Link *link; - - Session(){ - static int64_t inc = 0; - this->id = inc ++; - this->link = NULL; - } - ~Session(){ - } -}; - -class Request{ -public: - Message msg; - Session sess; - - double stime; - double time_wait; - double time_proc; -}; - -class Response{ -public: - Message msg; - Session sess; -}; - - -typedef enum{ - HANDLE_OK = 0, - HANDLE_FAIL = 1, - HANDLE_RESP = 1, -}HandlerState; - - -class Handler -{ -public: - Handler(){}; - virtual ~Handler(){}; - - // 当有新客户端进来时, 此方法被调用. 如果返回 HANDLE_FAIL, 连接将被关闭. - virtual HandlerState accept(const Session &sess); - // 当客户端被关闭时, 此方法被调用. - virtual HandlerState close(const Session &sess); - - // 当收到客户端的一个请求报文时, 调用此方法. - // 如果有响应需要立即返回给客户端, 将响应加到 resp 中, 并返回 HANDLE_RESP; - virtual HandlerState proc(const Request &req, Response *resp); - - virtual int init(){ return 0; } - virtual int free(){ return 0; } - //virtual void thread(); - - /***** 以下是特殊方法, 你一般不需要关心. *****/ - - // 此方法默认返回异步响应队列的 fd, 你可以重写此方法, 返回你自己的 fd. - virtual int fd(); - - // 当 fd() 有可读事件时, 本函数被调用. - // 如果此方法有响应需要立即返回, 请返回 Response 实例, 外面会负责释放内存. - // 如无响应, 返回 NULL. - virtual Response* handle(); - -protected: - // 将异步响应加入到队列中, 该响应会被发送给客户端. - // 如果 Handler 是多线程的, 可以会调用本方法将响应发给客户端. - void async_send(Response *resp); - - HandlerState ok(){ return HANDLE_OK; }; - HandlerState fail(){ return HANDLE_FAIL; }; - HandlerState resp(){ return HANDLE_RESP; }; - -private: - SelectableQueue *resps; - - int m_init(); - int m_free(); - friend class Server; -}; - -}; // namespace sim - -#endif diff --git a/src/http/Makefile b/src/http/Makefile new file mode 100644 index 0000000..8c3f543 --- /dev/null +++ b/src/http/Makefile @@ -0,0 +1,14 @@ +include ../../build_config.mk + +CFLAGS += -I ../ +OBJS = + +all: $(OBJS) + ar -cru ./libhttp.a ${OBJS} + +.cpp.o: + $(CXX) ${CFLAGS} -c $< -o $@ + +clean: + rm -rf *.o *.a *.out + diff --git a/src/line/Makefile b/src/line/Makefile new file mode 100644 index 0000000..3d158f9 --- /dev/null +++ b/src/line/Makefile @@ -0,0 +1,14 @@ +include ../../build_config.mk + +CFLAGS += -I ../ +OBJS = line_message.o line_parser.o line_server.o + +all: $(OBJS) + ar -cru ./libline.a ${OBJS} + +.cpp.o: + $(CXX) ${CFLAGS} -c $< -o $@ + +clean: + rm -rf *.o *.a *.out + diff --git a/src/line/line_message.cpp b/src/line/line_message.cpp new file mode 100644 index 0000000..067f8d5 --- /dev/null +++ b/src/line/line_message.cpp @@ -0,0 +1,15 @@ +#include "line_message.h" + +std::string LineMessage::text() const{ + return _text; +} + +void LineMessage::text(const std::string &s){ + _text = s; +} + +int LineMessage::encode(Buffer *buffer){ + buffer->append(_text.data(), _text.size()); + buffer->append("\n", 1); + return _text.size() + 1; +} diff --git a/src/line/line_message.h b/src/line/line_message.h new file mode 100644 index 0000000..1325643 --- /dev/null +++ b/src/line/line_message.h @@ -0,0 +1,20 @@ +#ifndef SIM_LINE_MESSAGE_H +#define SIM_LINE_MESSAGE_H + +#include +#include "core/message.h" +#include "util/buffer.h" + +class LineMessage : public Message +{ +public: + std::string text() const; + void text(const std::string &s); + + virtual int encode(Buffer *buffer); + +private: + std::string _text; +}; + +#endif diff --git a/src/line/line_parser.cpp b/src/line/line_parser.cpp new file mode 100644 index 0000000..32db61a --- /dev/null +++ b/src/line/line_parser.cpp @@ -0,0 +1,26 @@ +#include "line_parser.h" +#include +#include "line_message.h" + +ParseState LineParser::parse(Buffer *buffer, Message **msg){ + const char *data = buffer->data(); + char *end = (char *)memchr(data, '\n', buffer->size()); + if(!end){ + return ParseState::none_state(); + } + int total = end - data + 1; + int len = end - data; + // 兼容 \r\n + if(len >= 1 && data[len - 1] == '\r'){ + len -= 1; + } + + std::string s(data, len); + LineMessage *lm = new LineMessage(); + lm->text(s); + *msg = lm; + + buffer->remove(total); + + return ParseState::ready_state(); +} diff --git a/src/line/line_parser.h b/src/line/line_parser.h new file mode 100644 index 0000000..787d79c --- /dev/null +++ b/src/line/line_parser.h @@ -0,0 +1,15 @@ +#ifndef SIM_LINE_PARSER_H +#define SIM_LINE_PARSER_H + +#include "core/parser.h" +#include "core/message.h" +#include "core/session.h" +#include "util/buffer.h" + +class LineParser : public Parser +{ +public: + virtual ParseState parse(Buffer *buffer, Message **msg); +}; + +#endif diff --git a/src/line/line_server.cpp b/src/line/line_server.cpp new file mode 100644 index 0000000..4da3dee --- /dev/null +++ b/src/line/line_server.cpp @@ -0,0 +1,21 @@ +#include "line_server.h" +#include "line_parser.h" +#include "net/link.h" + +LineServer::LineServer(){ +} + +LineServer::~LineServer(){ +} + +void LineServer::init(){ +} + +Session* LineServer::accept(){ + Link *link = this->link()->accept(); + if(!link){ + return NULL; + } + Session *sess = new Session(link, new LineParser()); + return sess; +} diff --git a/src/line/line_server.h b/src/line/line_server.h new file mode 100644 index 0000000..87d7942 --- /dev/null +++ b/src/line/line_server.h @@ -0,0 +1,17 @@ +#ifndef SIM_LINE_SERVER_H +#define SIM_LINE_SERVER_H + +#include "core/server.h" +#include "core/session.h" + +class LineServer : public Server +{ +public: + LineServer(); + ~LineServer(); + + virtual void init(); + virtual Session* accept(); +}; + +#endif diff --git a/src/link.h b/src/link.h deleted file mode 100644 index dbabb83..0000000 --- a/src/link.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef SIM_LINK_H_ -#define SIM_LINK_H_ - -#include -#include -#include -#include -#include -#include -#include "decoder.h" - -namespace sim{ - -class Link{ -private: - int sock; - bool noblock_; - bool error_; - Decoder decoder_; - Link(bool is_server=false); - - // TODO: max_recv_buf_size, max_send_buf_size -public: - std::string output; - - char remote_ip[INET_ADDRSTRLEN]; - int remote_port; - - double create_time; - double active_time; - - ~Link(); - void close(); - void nodelay(bool enable=true); - // noblock(true) is supposed to corperate with IO Multiplex, - // otherwise, flush() may cause a lot unneccessary write calls. - void noblock(bool enable=true); - void keepalive(bool enable=true); - - int fd() const{ - return sock; - } - bool error() const{ - return error_; - } - void mark_error(){ - error_ = true; - } - - static Link* connect(const char *ip, int port); - static Link* connect(const std::string &ip, int port); - static Link* listen(const char *ip, int port); - static Link* listen(const std::string &ip, int port); - Link* accept(); - - // read network data info buffer - int read(); - int write(); - // flush buffered data to network - // REQUIRES: nonblock - int flush(); - - int recv(Message *msg); - int send(const Message &msg); -}; - - -}; // namespace sim - -#endif diff --git a/src/message.cpp b/src/message.cpp deleted file mode 100644 index dbac65f..0000000 --- a/src/message.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include "util/strings.h" -#include "sim.h" - -namespace sim{ - -Message::Message(){ -} - -Message::~Message(){ -} - -void Message::reset(){ - fields_.clear(); -} - -std::string Message::type() const{ - return type_; -} - -void Message::set_type(const std::string &type){ - type_ = type; -} - -void Message::set(int tag, int32_t val){ - this->set(tag, (int64_t)val); -} - -void Message::set(int tag, int64_t val){ - this->set(tag, str(val)); -} - -void Message::set(int tag, const char *val){ - this->set(tag, str(val)); -} - -void Message::set(int tag, const std::string &val){ - if(tag == 0){ - this->set_type(val); - } - fields_[tag] = val; -} - -void Message::add(const std::string &val){ - int tag; - std::map::const_reverse_iterator it; - it = fields_.rbegin(); - if(it == fields_.rend()){ - tag = 0; - }else{ - tag = it->first + 1; - } - this->set(tag, val); -} - -const std::string* Message::get(int tag) const{ - std::map::const_iterator it; - it = fields_.find(tag); - if(it == fields_.end()){ - return NULL; - } - return &(it->second); -} - -static std::string encode_field(int tag, const std::string &val){ - std::string buffer; - buffer.append(str(tag)); - buffer.push_back(sim::KV_SEP_BYTE); - buffer.append(sim::encode(val)); - buffer.push_back(sim::KV_END_BYTE); - return buffer; -} - -std::string Message::encode() const{ - std::string buffer; - buffer.append(encode_field(0, this->type())); - std::map::const_iterator it; - for(it=fields_.begin(); it!=fields_.end(); it++){ - int tag = it->first; - if(tag == 0){ - continue; - } - const std::string &val = it->second; - buffer.append(encode_field(tag, val)); - } - buffer[buffer.size()-1] = sim::MSG_END_BYTE; - return buffer; -} - -}; // namespace sim diff --git a/src/message.h b/src/message.h deleted file mode 100644 index 8ed7fd2..0000000 --- a/src/message.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef SIM_MESSAGE_H_ -#define SIM_MESSAGE_H_ - -#include -#include -#include -#include - -namespace sim{ - -class Message{ -public: - Message(); - ~Message(); - - void reset(); - - std::string type() const; - void set_type(const std::string &type); - - void add(const std::string &val); // 自增 tag, 从 0 开始 - void set(int tag, int32_t val); - void set(int tag, int64_t val); - void set(int tag, const char *val); - void set(int tag, const std::string &val); - const std::string* get(int tag) const; - - std::string encode() const; - const std::map* fields() const{ - return &fields_; - } - -private: - std::string type_; - std::map fields_; - friend class Decoder; -}; - -}; // namespace sim - -#endif diff --git a/src/net/Makefile b/src/net/Makefile new file mode 100644 index 0000000..d2691dc --- /dev/null +++ b/src/net/Makefile @@ -0,0 +1,14 @@ +include ../../build_config.mk + +CFLAGS += -I ../ +OBJS = fde.o link.o tcp_link.o + +all: $(OBJS) + ar -cru ./libnet.a ${OBJS} + +.cpp.o: + $(CXX) ${CFLAGS} -c $< -o $@ + +clean: + rm -rf *.o *.a *.out + diff --git a/src/fde.cpp b/src/net/fde.cpp similarity index 100% rename from src/fde.cpp rename to src/net/fde.cpp diff --git a/src/fde.h b/src/net/fde.h similarity index 96% rename from src/fde.h rename to src/net/fde.h index dc2432e..fddd428 100644 --- a/src/fde.h +++ b/src/net/fde.h @@ -57,7 +57,7 @@ struct Fdevent{ class Fdevents{ public: - typedef std::vector events_t; + typedef std::vector events_t; private: #ifdef HAVE_EPOLL static const int MAX_FDS = 8 * 1024; diff --git a/src/fde_epoll.cpp b/src/net/fde_epoll.cpp similarity index 98% rename from src/fde_epoll.cpp rename to src/net/fde_epoll.cpp index bddc4ec..b94fc1f 100644 --- a/src/fde_epoll.cpp +++ b/src/net/fde_epoll.cpp @@ -60,6 +60,8 @@ int Fdevents::del(int fd){ } struct Fdevent *fde = get_fde(fd); + fde->data.num = 0; + fde->data.ptr = NULL; fde->s_flags = FDEVENT_NONE; return 0; } diff --git a/src/fde_select.cpp b/src/net/fde_select.cpp similarity index 98% rename from src/fde_select.cpp rename to src/net/fde_select.cpp index 8a18ff0..8349c03 100644 --- a/src/fde_select.cpp +++ b/src/net/fde_select.cpp @@ -53,6 +53,8 @@ int Fdevents::del(int fd){ FD_CLR(fd, &writeset); struct Fdevent *fde = get_fde(fd); + fde->data.num = 0; + fde->data.ptr = NULL; fde->s_flags = FDEVENT_NONE; while(maxfd >= 0 && this->events[maxfd]->s_flags == 0){ maxfd --; diff --git a/src/net/link.cpp b/src/net/link.cpp new file mode 100644 index 0000000..3cae8d5 --- /dev/null +++ b/src/net/link.cpp @@ -0,0 +1,53 @@ +#include "link.h" +#include +#include +#include "util/buffer.h" + +Link::Link(){ + _fd = -1; + _nonblock = false; + _input = new Buffer(); + _output = new Buffer(); +} + +Link::~Link(){ + this->close(); + delete _input; + delete _output; +} + +int Link::fd() const{ + return _fd; +} + +Buffer* Link::input() const{ + return _input; +} + +Buffer* Link::output() const{ + return _output; +} + +std::string Link::address() const{ + return _address; +} + +void Link::close(){ + if(_fd >= 0){ + ::close(_fd); + _fd = -1; + } +} + +bool Link::nonblock() const{ + return _nonblock; +} + +void Link::nonblock(bool enable){ + _nonblock = enable; + if(enable){ + ::fcntl(_fd, F_SETFL, O_NONBLOCK | O_RDWR); + }else{ + ::fcntl(_fd, F_SETFL, O_RDWR); + } +} diff --git a/src/net/link.h b/src/net/link.h new file mode 100644 index 0000000..d0eddf6 --- /dev/null +++ b/src/net/link.h @@ -0,0 +1,38 @@ +#ifndef SIM_LINK_H +#define SIM_LINK_H + +#include +#include + +class Buffer; + +class Link +{ +public: + Link(); + virtual ~Link(); + + int fd() const; + Buffer* input() const; + Buffer* output() const; + std::string address() const; + + bool nonblock() const; + void nonblock(bool enable); + + void close(); + virtual Link* accept() = 0; + virtual int net_read() = 0; + virtual int net_write() = 0; + +protected: + int _fd; + Buffer* _input; + Buffer* _output; + std::string _address; + +private: + bool _nonblock; +}; + +#endif diff --git a/src/link.cpp b/src/net/tcp_link.cpp similarity index 58% rename from src/link.cpp rename to src/net/tcp_link.cpp index 1020b5b..60fdfa6 100644 --- a/src/link.cpp +++ b/src/net/tcp_link.cpp @@ -1,58 +1,36 @@ #include #include -#include #include #include #include +#include "tcp_link.h" #include "util/log.h" -#include "link.h" +#include "util/buffer.h" -namespace sim{ +// namespace sim{ -Link::Link(bool is_server){ - sock = -1; - noblock_ = false; - error_ = false; - remote_ip[0] = '\0'; - remote_port = -1; +TcpLink::TcpLink(bool is_server){ } -Link::~Link(){ - this->close(); +TcpLink::~TcpLink(){ } -void Link::close(){ - if(sock >= 0){ - ::close(sock); - sock = -1; - } -} - -void Link::nodelay(bool enable){ +void TcpLink::nodelay(bool enable){ int opt = enable? 1 : 0; - ::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)); + ::setsockopt(fd(), IPPROTO_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)); } -void Link::keepalive(bool enable){ +void TcpLink::keepalive(bool enable){ int opt = enable? 1 : 0; - ::setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&opt, sizeof(opt)); + ::setsockopt(fd(), SOL_SOCKET, SO_KEEPALIVE, (void *)&opt, sizeof(opt)); } -void Link::noblock(bool enable){ - noblock_ = enable; - if(enable){ - ::fcntl(sock, F_SETFL, O_NONBLOCK | O_RDWR); - }else{ - ::fcntl(sock, F_SETFL, O_RDWR); - } -} - -Link* Link::connect(const std::string &ip, int port){ +TcpLink* TcpLink::connect(const std::string &ip, int port){ return connect(ip.c_str(), port); } -Link* Link::connect(const char *ip, int port){ - Link *link; +TcpLink* TcpLink::connect(const char *ip, int port){ + TcpLink *link; int sock = -1; struct sockaddr_in addr; @@ -69,8 +47,8 @@ Link* Link::connect(const char *ip, int port){ } //log_debug("fd: %d, connect to %s:%d", sock, ip, port); - link = new Link(); - link->sock = sock; + link = new TcpLink(); + link->_fd = sock; link->keepalive(true); return link; sock_err: @@ -81,12 +59,12 @@ Link* Link::connect(const char *ip, int port){ return NULL; } -Link* Link::listen(const std::string &ip, int port){ +TcpLink* TcpLink::listen(const std::string &ip, int port){ return listen(ip.c_str(), port); } -Link* Link::listen(const char *ip, int port){ - Link *link; +TcpLink* TcpLink::listen(const char *ip, int port){ + TcpLink *link; int sock = -1; int opt = 1; @@ -109,11 +87,13 @@ Link* Link::listen(const char *ip, int port){ goto sock_err; } //log_debug("server socket fd: %d, listen on: %s:%d", sock, ip, port); + char s[INET_ADDRSTRLEN + 6]; + snprintf(s, sizeof(s), "%s:%d", ip, port); + + link = new TcpLink(true); + link->_fd = sock; + link->_address = s; - link = new Link(true); - link->sock = sock; - snprintf(link->remote_ip, sizeof(link->remote_ip), "%s", ip); - link->remote_port = port; return link; sock_err: //log_debug("listen %s:%d failed: %s", ip, port, strerror(errno)); @@ -123,13 +103,13 @@ Link* Link::listen(const char *ip, int port){ return NULL; } -Link* Link::accept(){ - Link *link; +Link* TcpLink::accept(){ + TcpLink *link; int client_sock; struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); - while((client_sock = ::accept(sock, (struct sockaddr *)&addr, &addrlen)) == -1){ + while((client_sock = ::accept(fd(), (struct sockaddr *)&addr, &addrlen)) == -1){ if(errno != EINTR){ //log_error("socket %d accept failed: %s", sock, strerror(errno)); return NULL; @@ -142,22 +122,26 @@ Link* Link::accept(){ //log_error("socket %d set linger failed: %s", client_sock, strerror(errno)); } - link = new Link(); - link->sock = client_sock; + char s[INET_ADDRSTRLEN + 6]; + inet_ntop(AF_INET, &addr.sin_addr, s, sizeof(s)); + int len = strlen(s); + snprintf(s+len, sizeof(s) - len, "%d", ntohs(addr.sin_port)); + + link = new TcpLink(); + link->_fd = client_sock; + link->_address = s; link->keepalive(true); - inet_ntop(AF_INET, &addr.sin_addr, link->remote_ip, sizeof(link->remote_ip)); - link->remote_port = ntohs(addr.sin_port); return link; } -int Link::read(){ +int TcpLink::net_read(){ int ret = 0; - char buf[16 * 1024]; + char buf[8 * 1024]; int want = sizeof(buf); while(1){ // test //want = 1; - int len = ::read(sock, buf, want); + int len = ::read(fd(), buf, want); if(len == -1){ if(errno == EINTR){ continue; @@ -173,9 +157,10 @@ int Link::read(){ return 0; } ret += len; - decoder_.push(buf, len); + + _input->append(buf, len); } - if(!noblock_){ + if(!nonblock()){ break; } } @@ -183,13 +168,13 @@ int Link::read(){ return ret; } -int Link::write(){ +int TcpLink::net_write(){ int ret = 0; int want; - while((want = output.size()) > 0){ + while((want = _output->size()) > 0){ // test - //want = 1; - int len = ::write(sock, output.data(), want); + // want = 1; + int len = ::write(fd(), _output->data(), want); if(len == -1){ if(errno == EINTR){ continue; @@ -200,41 +185,19 @@ int Link::write(){ return -1; } }else{ - //log_info("fd: %d, want: %d, write: %d", sock, want, len); + //log_debug("fd: %d, want: %d, write: %d", sock, want, len); if(len == 0){ // ? break; } ret += len; - output = std::string(output.data() + len, output.size() - len); + _output->remove(len); } - if(!noblock_){ + if(!nonblock()){ break; } } return ret; } -int Link::flush(){ - int len = 0; - while(!output.empty()){ - int ret = this->write(); - if(ret == -1){ - return -1; - } - len += ret; - } - return len; -} - -int Link::recv(Message *msg){ - return decoder_.parse(msg); -} - -int Link::send(const Message &msg){ - std::string s = msg.encode(); - output.append(s); - return (int)s.size(); -} - -}; // namespace sim +// }; // namespace sim diff --git a/src/net/tcp_link.h b/src/net/tcp_link.h new file mode 100644 index 0000000..63417ad --- /dev/null +++ b/src/net/tcp_link.h @@ -0,0 +1,37 @@ +#ifndef SIM_TCP_SOCKET_H_ +#define SIM_TCP_SOCKET_H_ + +#include +#include +#include +#include +#include +#include "link.h" + +// namespace sim{ + +class TcpLink : public Link{ +public: + virtual ~TcpLink(); + + void nodelay(bool enable=true); + void keepalive(bool enable=true); + + virtual Link* accept(); + virtual int net_read(); + virtual int net_write(); + + static TcpLink* connect(const char *ip, int port); + static TcpLink* connect(const std::string &ip, int port); + static TcpLink* listen(const char *ip, int port); + static TcpLink* listen(const std::string &ip, int port); + +private: + + TcpLink(bool is_server=false); +}; + + +// }; // namespace sim + +#endif diff --git a/src/server.cpp b/src/server.cpp deleted file mode 100644 index f69005b..0000000 --- a/src/server.cpp +++ /dev/null @@ -1,278 +0,0 @@ -#include -#include -#include -#include -#include -#include "util/log.h" -#include "sim.h" -#include "fde.h" -#include "server.h" - -#include "util/strings.h" - -static std::string msg_str(const sim::Message &msg){ - std::string ret; - const std::map *fields = msg.fields(); - std::map::const_iterator it; - char buf[50]; - int count = 0; - for(it=fields->begin(); it!=fields->end(); it++){ - if(ret.size() > 100){ - snprintf(buf, sizeof(buf), "[%d more...]", (int)fields->size() - count); - ret.append(buf); - break; - } - - int tag = it->first; - const std::string &val = it->second; - ret.append(str(tag)); - ret.push_back('='); - if(val.size() < 30){ - std::string h = sim::encode(val, true); - ret.append(h); - }else{ - sprintf(buf, "[%d]", (int)val.size()); - ret.append(buf); - } - - count ++; - if(count != (int)fields->size()){ - ret.push_back(' '); - } - } - return ret; -} - - -namespace sim{ - -const static int DEFAULT_TYPE = 0; -const static int HANDLER_TYPE = 1; - -Server::Server(){ - signal(SIGPIPE, SIG_IGN); - link_count = 0; - serv_link = NULL; - fdes = new Fdevents(); -} - -Server::~Server(){ - for(int i=0; ihandlers.size(); i++){ - Handler *handler = this->handlers[i]; - handler->m_free(); - delete handler; - } - this->handlers.clear(); - - delete serv_link; - delete fdes; -} - -Server* Server::listen(const std::string &ip, int port){ - Link *serv_link = Link::listen(ip, port); - if(!serv_link){ - return NULL; - } - - Server *ret = new Server(); - ret->serv_link = serv_link; - ret->fdes->set(serv_link->fd(), FDEVENT_IN, DEFAULT_TYPE, serv_link); - return ret; -} - -void Server::add_handler(Handler *handler){ - handler->m_init(); - this->handlers.push_back(handler); - if(handler->fd() > 0){ - fdes->set(handler->fd(), FDEVENT_IN, HANDLER_TYPE, handler); - } -} - -Session* Server::accept_session(){ - Link *link = serv_link->accept(); - if(link == NULL){ - log_error("accept failed! %s", strerror(errno)); - return NULL; - } - - link->nodelay(); - link->noblock(); - link->create_time = microtime(); - link->active_time = link->create_time; - - Session *sess = new Session(); - sess->link = link; - this->sessions[sess->id] = sess; - - for(int i=0; ihandlers.size(); i++){ - Handler *handler = this->handlers[i]; - HandlerState state = handler->accept(*sess); - if(state == HANDLE_FAIL){ - delete link; - delete sess; - return NULL; - } - } - - this->link_count ++; - log_debug("new link from %s:%d, fd: %d, links: %d", - link->remote_ip, link->remote_port, link->fd(), this->link_count); - fdes->set(link->fd(), FDEVENT_IN, DEFAULT_TYPE, sess); - - return sess; -} - -int Server::close_session(Session *sess){ - Link *link = sess->link; - for(int i=0; ihandlers.size(); i++){ - Handler *handler = this->handlers[i]; - handler->close(*sess); - } - - this->link_count --; - log_debug("delete link %s:%d, fd: %d, links: %d", - link->remote_ip, link->remote_port, link->fd(), this->link_count); - fdes->del(link->fd()); - - this->sessions.erase(sess->id); - delete link; - delete sess; - return 0; -} - -int Server::read_session(Session *sess){ - Link *link = sess->link; - if(link->error()){ - return 0; - } - - int len = link->read(); - if(len <= 0){ - this->close_session(sess); - return -1; - } - - while(1){ - Request req; - int ret = link->recv(&req.msg); - if(ret == -1){ - log_info("fd: %d, parse error, delete link", link->fd()); - this->close_session(sess); - return -1; - }else if(ret == 0){ - // 报文未就绪, 继续读网络 - break; - } - req.stime = microtime(); - req.sess = *sess; - - Response resp; - for(int i=0; ihandlers.size(); i++){ - Handler *handler = this->handlers[i]; - req.time_wait = 1000 * (microtime() - req.stime); - HandlerState state = handler->proc(req, &resp); - req.time_proc = 1000 * (microtime() - req.stime) - req.time_wait; - if(state == HANDLE_RESP){ - link->send(resp.msg); - if(link && !link->output.empty()){ - fdes->set(link->fd(), FDEVENT_OUT, DEFAULT_TYPE, sess); - } - - if(log_level() >= Logger::LEVEL_DEBUG){ - log_debug("w:%.3f,p:%.3f, req: %s resp: %s", - req.time_wait, req.time_proc, - msg_str(req.msg).c_str(), - msg_str(resp.msg).c_str()); - } - }else if(state == HANDLE_FAIL){ - this->close_session(sess); - return -1; - } - } - } - - return 0; -} - -int Server::write_session(Session *sess){ - Link *link = sess->link; - if(link->error()){ - return 0; - } - - int len = link->write(); - if(len <= 0){ - log_debug("fd: %d, write: %d, delete link", link->fd(), len); - this->close_session(sess); - return -1; - } - if(link->output.empty()){ - fdes->clr(link->fd(), FDEVENT_OUT); - } - return 0; -} - -Session* Server::get_session(int64_t sess_id){ - std::map::iterator it; - it = sessions.find(sess_id); - if(it == sessions.end()){ - return NULL; - } - return it->second; -} - -void Server::loop(){ - while(1){ - if(this->loop_once() == -1){ - break; - } - } -} - -int Server::loop_once(){ - const Fdevents::events_t *events; - events = fdes->wait(20); - if(events == NULL){ - log_fatal("events.wait error: %s", strerror(errno)); - return 0; - } - - for(int i=0; i<(int)events->size(); i++){ - const Fdevent *fde = events->at(i); - if(fde->data.ptr == serv_link){ - this->accept_session(); - }else if(fde->data.num == HANDLER_TYPE){ - Handler *handler = (Handler *)fde->data.ptr; - while(Response *resp = handler->handle()){ - Session *sess = this->get_session(resp->sess.id); - if(sess){ - Link *link = sess->link; - link->send(resp->msg); - if(link && !link->output.empty()){ - fdes->set(link->fd(), FDEVENT_OUT, DEFAULT_TYPE, sess); - } - } - delete resp; - } - }else{ - Session *sess = (Session *)fde->data.ptr; - Link *link = sess->link; - if(fde->events & FDEVENT_IN){ - if(this->read_session(sess) == -1){ - continue; - } - } - if(fde->events & FDEVENT_OUT){ - if(this->write_session(sess) == -1){ - continue; - } - } - if(link && !link->output.empty()){ - fdes->set(link->fd(), FDEVENT_OUT, DEFAULT_TYPE, sess); - } - } - } - return 0; -} - -}; // namespace sim diff --git a/src/server.h b/src/server.h deleted file mode 100644 index be07a9f..0000000 --- a/src/server.h +++ /dev/null @@ -1,44 +0,0 @@ -#ifndef SIM_SERVER_H_ -#define SIM_SERVER_H_ - -#include -#include -#include -#include "link.h" -#include "handler.h" - -namespace sim{ - -class Fdevents; - -class Server -{ -public: - static Server* listen(const std::string &ip, int port); - //static Server* create(...); - Server(); - ~Server(); - - void add_handler(Handler *handler); - void loop(); - int loop_once(); - - //int send(int64_t sess_id, const Message &msg); - //int send_all(const Message &msg); -private: - Fdevents *fdes; - Link *serv_link; - int link_count; - std::map sessions; - std::vector handlers; - - Session* accept_session(); - Session* get_session(int64_t sess_id); - int close_session(Session *sess); - int read_session(Session *sess); - int write_session(Session *sess); -}; - -}; // namespace sim - -#endif diff --git a/src/sim.cpp b/src/sim.cpp deleted file mode 100644 index 174f51a..0000000 --- a/src/sim.cpp +++ /dev/null @@ -1,136 +0,0 @@ -#include -#include "sim.h" - -namespace sim{ - -std::string version(){ - return "1.0"; -} - -std::string encode(const std::string s, bool force_ascii){ - std::string ret; - int size = (int)s.size(); - for(int i=0; i= '!' && c <= '~'){ - ret.push_back(c); - }else{ - ret.append("\\x"); - unsigned char d = c; - ret.push_back(hex[d >> 4]); - ret.push_back(hex[d & 0x0f]); - } - } - break; - } - } - return ret; -} - -inline static -int hex_int(char c){ - if(c >= '0' && c <= '9'){ - return c - '0'; - }else{ - return c - 'a' + 10; - } -} - -std::string decode(const std::string s){ - int size = (int)s.size(); - std::string ret; - for(int i=0; i= size - 1){ - break; - } - char c2 = s[++i]; - switch(c2){ - case 's': - ret.push_back(' '); - break; - case '\\': - ret.push_back('\\'); - break; - case 'a': - ret.push_back('\a'); - break; - case 'b': - ret.push_back('\b'); - break; - case 'f': - ret.push_back('\f'); - break; - case 'v': - ret.push_back('\v'); - break; - case 'r': - ret.push_back('\r'); - break; - case 'n': - ret.push_back('\n'); - break; - case 't': - ret.push_back('\t'); - break; - case '0': - ret.push_back('\0'); - break; - case 'x': - if(i < size - 2){ - char c3 = s[++i]; - char c4 = s[++i]; - ret.push_back((char)((hex_int(c3) << 4) + hex_int(c4))); - } - break; - default: - ret.push_back(c2); - break; - } - } - return ret; -} - -}; // namespace sim - diff --git a/src/sim.h b/src/sim.h deleted file mode 100644 index 2f641f1..0000000 --- a/src/sim.h +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef SIM_SIM_H_ -#define SIM_SIM_H_ - -#include -#include -#include "message.h" -#include "decoder.h" -#include "fde.h" -#include "link.h" -#include "server.h" -#include "handler.h" -#include "util/config.h" -#include "util/app.h" -#include "util/log.h" - -namespace sim{ - -const static char KV_SEP_BYTE = '='; -const static char KV_END_BYTE = ' '; -const static char MSG_END_BYTE = '\n'; - - -inline static -double microtime(){ - struct timeval now; - gettimeofday(&now, NULL); - double ret = now.tv_sec + now.tv_usec/1000.0/1000.0; - return ret; -} - -std::string encode(const std::string s, bool force_ascii=false); -std::string decode(const std::string s); - -}; // namespace sim - -#endif diff --git a/src/test.cpp b/src/test.cpp new file mode 100644 index 0000000..752e089 --- /dev/null +++ b/src/test.cpp @@ -0,0 +1,109 @@ +#include "util/log.h" +#include "core/event.h" +#include "core/transport.h" +#include "core/worker.h" +#include "line/line_message.h" +#include "line/line_server.h" + +// using namespace sim; + +#define TICK_INTERVAL 1000 // ms +volatile bool quit = false; + +void signal_handler(int sig){ + switch(sig){ + case SIGTERM: + case SIGINT:{ + quit = true; + break; + } + case SIGALRM:{ + break; + } + } +} + +class TestWorker : public Worker +{ +public: + Transport *trans; + + virtual void process(Event event){ + // log_debug("read"); + LineMessage *req = (LineMessage *)trans->recv(event.id()); + if(!req){ + // do nothing, the close event will finally be triggered + log_debug("recv NULL msg, detect session closed"); + return; + } + + log_debug("recv: %s", req->text().c_str()); + + std::string text = "req="; + text.append(req->text()); + LineMessage *resp = new LineMessage(); + resp->text(text); + trans->send(event.id(), resp); + + delete req; + } +}; + +int main(int argc, char **argv){ + // set_log_level("error"); + + signal(SIGPIPE, SIG_IGN); + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); +// #ifndef __CYGWIN__ +// signal(SIGALRM, signal_handler); +// { +// struct itimerval tv; +// tv.it_interval.tv_sec = (TICK_INTERVAL / 1000); +// tv.it_interval.tv_usec = (TICK_INTERVAL % 1000) * 1000; +// tv.it_value.tv_sec = 0; +// tv.it_value.tv_usec = TICK_INTERVAL * 1000; +// setitimer(ITIMER_REAL, &tv, NULL); +// } +// #endif + + const char *host = "127.0.0.1"; + int port = 8000; + + Server *serv = new LineServer(); + if(serv->listen(host, port) == -1){ + log_error("failed to listen at %s:%d, %s", host, port, strerror(errno)); + exit(0); + } + log_debug("server listen at %s:%d", host, port); + + Transport *trans = Transport::create(); + trans->add_server(serv); + trans->init(); + log_debug("transport setup"); + + TestWorker worker; + worker.trans = trans; + worker.start(10); + + while(!quit){ + const std::vector *events = trans->wait(200); + for(int i=0; i<(int)events->size(); i++){ + Event event = events->at(i); + + if(event.is_new()){ + log_debug("accept"); + trans->accept(event.id()); + }else if(event.is_close()){ + log_debug("close"); + trans->close(event.id()); + }else if(event.is_read()){ + worker.add_task(event); + } + } + } + + worker.stop(); + + return 0; +} diff --git a/src/util/Makefile b/src/util/Makefile new file mode 100644 index 0000000..c45126d --- /dev/null +++ b/src/util/Makefile @@ -0,0 +1,14 @@ +include ../../build_config.mk + +CFLAGS += -I ../ +OBJS = log.o buffer.o + +all: ${OBJS} + ar -cru ./libutil.a ${OBJS} + +.cpp.o: + $(CXX) ${CFLAGS} -c $< -o $@ + +clean: + rm -rf *.o *.a *.out + diff --git a/src/util/buffer.cpp b/src/util/buffer.cpp new file mode 100644 index 0000000..66fdb69 --- /dev/null +++ b/src/util/buffer.cpp @@ -0,0 +1,41 @@ +#include "buffer.h" +#include +#include + +Buffer::Buffer(){ + _size = 0; + _capacity = 1024; + _buf = (char *)malloc(_capacity); +} + +Buffer::~Buffer(){ + free(_buf); +} + +bool Buffer::empty() const{ + return _size == 0; +} + +int Buffer::size() const{ + return _size; +} + +char* Buffer::data(){ + return _buf; +} + +int Buffer::remove(int count){ + memmove(_buf, _buf + count, _size - count); + _size -= count; + return count; +} + +int Buffer::append(const char *buf, int count){ + if(_size + count > _capacity){ + _capacity = _size + count; + _buf = (char *)realloc(_buf, _capacity); + } + memcpy(_buf + _size, buf, count); + _size += count; + return count; +} diff --git a/src/util/buffer.h b/src/util/buffer.h new file mode 100644 index 0000000..72a3bd6 --- /dev/null +++ b/src/util/buffer.h @@ -0,0 +1,23 @@ +#ifndef SIM_BUFFER_H +#define SIM_BUFFER_H + +class Buffer +{ +public: + Buffer(); + ~Buffer(); + + bool empty() const; + int size() const; + char* data(); + + int remove(int count); + int append(const char *buf, int len); + +private: + int _size; + char *_buf; + int _capacity; +}; + +#endif diff --git a/src/util/thread.h b/src/util/thread.h index 24882b0..00d7709 100644 --- a/src/util/thread.h +++ b/src/util/thread.h @@ -14,6 +14,8 @@ found in the LICENSE file. #include #include #include +#include +#include class Mutex{ private: @@ -33,6 +35,37 @@ class Mutex{ } }; +class Semaphore{ +private: + pthread_mutex_t _mutex; + pthread_cond_t _cond; + int _count; +public: + Semaphore(){ + _count = 0; + pthread_mutex_init(&_mutex, NULL); + pthread_cond_init(&_cond, NULL); + } + ~Semaphore(){ + pthread_cond_destroy(&_cond); + pthread_mutex_destroy(&_mutex); + } + void wait(){ + pthread_mutex_lock(&_mutex); + while(_count == 0){ + pthread_cond_wait(&_cond, &_mutex); + } + _count--; + pthread_mutex_unlock(&_mutex); + } + void notify(){ + pthread_mutex_lock(&_mutex); + _count++; + pthread_mutex_unlock(&_mutex); + pthread_cond_signal(&_cond); + } +}; + class Locking{ private: Mutex *mutex; @@ -50,17 +83,124 @@ class Locking{ }; +// Thread safe queue +template +class Queue{ +private: + pthread_cond_t cond; + pthread_mutex_t mutex; + std::queue items; +public: + Queue(); + ~Queue(); + + bool empty(); + int size(); + int push(const T item); + int pop(T *data); + int pop(T *data, int timeout_ms=-1); +}; + +template +Queue::Queue(){ + pthread_cond_init(&cond, NULL); + pthread_mutex_init(&mutex, NULL); +} + +template +Queue::~Queue(){ + pthread_cond_destroy(&cond); + pthread_mutex_destroy(&mutex); +} + +template +bool Queue::empty(){ + bool ret = false; + if(pthread_mutex_lock(&mutex) != 0){ + return -1; + } + ret = items.empty(); + pthread_mutex_unlock(&mutex); + return ret; +} + +template +int Queue::size(){ + int ret = -1; + if(pthread_mutex_lock(&mutex) != 0){ + return -1; + } + ret = items.size(); + pthread_mutex_unlock(&mutex); + return ret; +} + +template +int Queue::push(const T item){ + if(pthread_mutex_lock(&mutex) != 0){ + return -1; + } + { + items.push(item); + } + pthread_mutex_unlock(&mutex); + pthread_cond_signal(&cond); + return 1; +} + +template +int Queue::pop(T *data, int timeout_ms){ + int ret = 0; + + if(pthread_mutex_lock(&mutex) != 0){ + return -1; + } + { + while(items.empty()){ + if(timeout_ms == -1){ + // 可能被系统中断 + pthread_cond_wait(&cond, &mutex); + }else{ + struct timeval now; + gettimeofday(&now, NULL); + + struct timespec tv; + tv.tv_sec = now.tv_sec + (timeout_ms / 1000); + tv.tv_nsec = (now.tv_usec + (timeout_ms % 1000) * 1000) * 1000; + tv.tv_sec += tv.tv_nsec / (1000 * 1000 * 1000); + tv.tv_nsec %= (1000 * 1000 * 1000); + // 超时精确度较差,误差在 5ms 级别 + // 可能被系统中断 + pthread_cond_timedwait(&cond, &mutex, &tv); + // pthread_cond_timedwait() == ETIMEDOUT + break; + } + } + if(items.empty()){ + ret = 0; + }else{ + ret = 1; + *data = items.front(); + items.pop(); + } + } + if(pthread_mutex_unlock(&mutex) != 0){ + return -1; + } + return ret; +} + // Selectable queue, multi writers, single reader template -class SelectableQueue{ +class Channel{ private: int fds[2]; public: Mutex mutex; std::queue items; - SelectableQueue(); - ~SelectableQueue(); + Channel(); + ~Channel(); int fd(){ return fds[0]; } @@ -73,26 +213,26 @@ class SelectableQueue{ template -SelectableQueue::SelectableQueue(){ +Channel::Channel(){ if(pipe(fds) == -1){ exit(0); } } template -SelectableQueue::~SelectableQueue(){ +Channel::~Channel(){ close(fds[0]); close(fds[1]); } template -int SelectableQueue::size(){ +int Channel::size(){ Locking l(&mutex); return items.size(); } template -int SelectableQueue::push(const T item){ +int Channel::push(const T item){ Locking l(&mutex); items.push(item); if(::write(fds[1], "1", 1) == -1){ @@ -102,7 +242,7 @@ int SelectableQueue::push(const T item){ } template -int SelectableQueue::pop(T *data){ +int Channel::pop(T *data){ int n, ret = 1; char buf[1];