Skip to content

Commit

Permalink
fixed conv clear bug caused by rehash
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu- committed Oct 8, 2017
1 parent a3d3cf9 commit fb3edca
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 44 deletions.
3 changes: 2 additions & 1 deletion common.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ const i32_t max_fail_time=0;//disable

const u32_t heartbeat_interval=1000;

const u32_t timer_interval=400;//this should be smaller than heartbeat_interval and retry interval;
const u32_t timer_interval=50;//this should be smaller than heartbeat_interval and retry interval;

//const uint32_t conv_timeout=120000; //120 second
//const u32_t conv_timeout=120000; //for test
Expand Down Expand Up @@ -152,6 +152,7 @@ struct dest_t
dest_type type;
inner_t inner;
u32_t conv;
int cook=0;
};

struct fd_info_t
Expand Down
10 changes: 6 additions & 4 deletions connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ conn_manager_t conn_manager;
void server_clear_function(u64_t u64)//used in conv_manager in server mode.for server we have to use one udp fd for one conv(udp connection),
//so we have to close the fd when conv expires
{
int fd64=u64;
fd64_t fd64=u64;
assert(fd_manager.exist(fd64));
fd_manager.close(fd64);
fd_manager.fd64_close(fd64);
}

conv_manager_t::conv_manager_t()
Expand Down Expand Up @@ -103,12 +103,14 @@ conv_manager_t::~conv_manager_t()
}
int conv_manager_t::erase_conv(u32_t conv)
{
if(disable_conv_clear) return 0;
//if(disable_conv_clear) return 0;
assert(conv_last_active_time.find(conv)!=conv_last_active_time.end());
u64_t u64=conv_to_u64[conv];
if(program_mode==server_mode)
{
server_clear_function(u64);
}
assert(conv_to_u64.find(conv)!=conv_to_u64.end());
conv_to_u64.erase(conv);
u64_to_conv.erase(u64);
conv_last_active_time.erase(conv);
Expand Down Expand Up @@ -152,7 +154,7 @@ conv_manager_t::~conv_manager_t()
old_it=it;
it++;
u32_t conv= old_it->first;
erase_conv(old_it->first);
erase_conv(conv);
if(ip_port==0)
{
mylog(log_info,"conv %x cleared\n",conv);
Expand Down
51 changes: 33 additions & 18 deletions connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ struct conv_manager_t // manage the udp connections
long long last_clear_time;

conv_manager_t();
conv_manager_t(const conv_manager_t &b)
{
assert(0==1);
}
~conv_manager_t();
int get_size();
void reserve();
Expand All @@ -70,30 +74,41 @@ struct conn_info_t //stores info for a raw connection.for client ,there is o
conv_manager_t conv_manager;
fec_encode_manager_t fec_encode_manager;
fec_decode_manager_t fec_decode_manager;
fd64_t timer_fd;
my_timer_t timer;
ip_port_t ip_port;
conn_info_t()
{
}
conn_info_t(const conn_info_t &b)
{
assert(0==1);
}
};//g_conn_info;

struct conn_manager_t //manager for connections. for client,we dont need conn_manager since there is only one connection.for server we use one conn_manager for all connections
{

unordered_map<u64_t,conn_info_t*> mp;//<ip,port> to conn_info_t;
unordered_map<u64_t,conn_info_t*>::iterator clear_it;
long long last_clear_time;

conn_manager_t();
int exist(ip_port_t);
conn_info_t *& find_p(ip_port_t); //be aware,the adress may change after rehash
conn_info_t & find(ip_port_t) ; //be aware,the adress may change after rehash
int insert(ip_port_t);
/*
int exist_fd64(fd64_t fd64);
void insert_fd64(fd64_t fd64,ip_port_t);
ip_port_t find_by_fd64(fd64_t fd64);*/

int erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it);
int clear_inactive();
int clear_inactive0();
unordered_map<u64_t,conn_info_t*> mp;//<ip,port> to conn_info_t;
unordered_map<u64_t,conn_info_t*>::iterator clear_it;
long long last_clear_time;

conn_manager_t();
conn_manager_t(const conn_info_t &b)
{
assert(0==1);
}
int exist(ip_port_t);
conn_info_t *& find_p(ip_port_t); //be aware,the adress may change after rehash
conn_info_t & find(ip_port_t) ; //be aware,the adress may change after rehash
int insert(ip_port_t);
/*
int exist_fd64(fd64_t fd64);
void insert_fd64(fd64_t fd64,ip_port_t);
ip_port_t find_by_fd64(fd64_t fd64);*/

int erase(unordered_map<u64_t,conn_info_t*>::iterator erase_it);
int clear_inactive();
int clear_inactive0();

};

Expand Down
2 changes: 1 addition & 1 deletion delay_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ int delay_manager_t::add(my_time_t delay,const dest_t &dest,char *data,int len)
}

delay_data_t tmp=delay_data;
tmp.data=(char *)malloc(delay_data.len);
tmp.data=(char *)malloc(delay_data.len+100);

memcpy(tmp.data,delay_data.data,delay_data.len);

Expand Down
75 changes: 75 additions & 0 deletions delay_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "common.h"
#include "packet.h"
#include "log.h"

//enum delay_type_t {none=0,enum_sendto_u64,enum_send_fd,client_to_local,client_to_remote,server_to_local,server_to_remote};

Expand All @@ -27,6 +28,80 @@ union dest_t
u64_t u64;
};
*/

struct my_timer_t
{
int timer_fd;
fd64_t timer_fd64;
my_timer_t()
{
if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
{
mylog(log_fatal,"timer_fd create error");
myexit(1);
}
timer_fd64=fd_manager.create(timer_fd);
}
my_timer_t(const my_timer_t &b)
{
assert(0==1);
}
~my_timer_t()
{
fd_manager.fd64_close(timer_fd64);
}
int add_fd_to_epoll(int epoll_fd)
{
epoll_event ev;;
ev.events = EPOLLIN;
ev.data.u64 = timer_fd;
int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &ev);
if (ret!= 0) {
mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
myexit(-1);
}
return 0;
}
int add_fd64_to_epoll(int epoll_fd)
{
epoll_event ev;;
ev.events = EPOLLIN;
ev.data.u64 = timer_fd64;
int ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &ev);
if (ret!= 0) {
mylog(log_fatal,"add delay_manager.get_timer_fd() error\n");
myexit(-1);
}
return 0;
}
int get_timer_fd()
{
return timer_fd;
}
fd64_t get_timer_fd64()
{
return timer_fd64;
}
int set_timer_repeat_us(my_time_t my_time)
{
itimerspec its;
memset(&its,0,sizeof(its));
its.it_interval.tv_sec=my_time/1000000llu;
its.it_interval.tv_nsec=my_time%1000000llu*1000llu;
its.it_value.tv_nsec=1; //imidiately
timerfd_settime(timer_fd,0,&its,0);
return 0;
}
int set_timer_abs_us(my_time_t my_time)
{
itimerspec its;
memset(&its,0,sizeof(its));
its.it_value.tv_sec=my_time/1000000llu;
its.it_value.tv_nsec=my_time%1000000llu*1000llu;
timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
return 0;
}
};
struct delay_data_t
{
dest_t dest;
Expand Down
2 changes: 1 addition & 1 deletion fd_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void fd_manager_t::remove_fd(int fd)
fd64_to_fd_mp.erase(fd64);
//return 0;
}*/
void fd_manager_t::close(fd64_t fd64)
void fd_manager_t::fd64_close(fd64_t fd64)
{
assert(exist(fd64));
int fd=fd64_to_fd_mp[fd64];
Expand Down
2 changes: 1 addition & 1 deletion fd_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct fd_manager_t //conver fd to a uniq 64bit number,avoid fd value conflict
int exist_info(fd64_t);
int exist(fd64_t fd64);
int to_fd(fd64_t);
void close(fd64_t fd64);
void fd64_close(fd64_t fd64);
void reserve(int n);
u64_t create(int fd);
fd_manager_t();
Expand Down
2 changes: 1 addition & 1 deletion fec_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fec_encode_manager_t::fec_encode_manager_t()
}
fec_encode_manager_t::~fec_encode_manager_t()
{
fd_manager.close(timer_fd64);
fd_manager.fd64_close(timer_fd64);
}
u64_t fec_encode_manager_t::get_timer_fd64()
{
Expand Down
Loading

0 comments on commit fb3edca

Please sign in to comment.