forked from bytemaster/tornet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchannel.cpp
123 lines (107 loc) · 3.01 KB
/
channel.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include <tornet/net/detail/connection.hpp>
#include <tornet/net/channel.hpp>
#include <boost/cmt/log/log.hpp>
#include <boost/cmt/mutex.hpp>
namespace tornet {
namespace detail {
  /**
   *  State behind a channel handle; channel reaches it through its `my`
   *  pointer.
   *
   *  The connection is referenced through a wptr, so this object does not
   *  keep the connection alive; channel members test it with expired().
   */
  class channel_private {
    public:
      /**
       *  @param c  connection this channel belongs to; only a weak
       *            reference to it is stored.
       *
       *  The port numbers start at 0 so that they never hold an
       *  indeterminate value before the owning channel assigns them.
       */
      channel_private( const detail::connection::ptr& c )
      :con(c),closed(false),rport(0),lport(0){}

      detail::connection::wptr con;    // weak reference to the owning connection
      channel::recv_handler    rc;     // receive callback, set by channel::on_recv
      bool                     closed; // initialised to false; not read in this file
      uint16_t                 rport;  // returned by channel::remote_channel_num()
      uint16_t                 lport;  // returned by channel::local_channel_num()
      boost::cmt::mutex        mtx;    // guards rc (locked by on_recv and recv)
  };
}
/**
 *  Builds a channel handle on top of a connection.
 *
 *  @param con  connection the channel runs over
 *  @param r    remote channel number, stored as rport
 *  @param l    local channel number, stored as lport
 */
channel::channel( const detail::connection::ptr& con, uint16_t r, uint16_t l )
:my( new detail::channel_private( con ) )
{
  detail::channel_private& state = *my;
  state.lport = l;
  state.rport = r;
}
// Default constructor: allocates no private state; `my` is left
// default-constructed (presumably null -- the other members test `my`
// before using it).
channel::channel() { }
// Destructor: empty body. It does not call close(); members are released
// by their own destructors.
channel::~channel() { }
/**
 *  Returns the thread of the connection this channel belongs to.
 *
 *  @return pointer to the connection's thread, or 0 when the channel has no
 *          private state or its connection no longer exists.
 */
boost::cmt::thread* channel::get_thread()const {
  if( !my ) {wlog("my==0"); return 0;}

  // lock() yields an empty pointer for a dead connection (assuming wptr is a
  // boost::weak_ptr).  Constructing the ptr directly from the weak pointer,
  // as was done before, throws once the connection has expired, so the
  // "expired" case was logged and then blew up instead of returning.
  detail::connection::ptr c( my->con.lock() );
  if( !c ) {
    elog( "my->con expired" );
    return 0;
  }
  return &c->get_thread();
}
/**
 *  Closes this channel handle.
 *
 *  If the connection still exists it is told to close the channel; the
 *  receive callback is then cleared and the private state released.  Calling
 *  close() on a handle without private state only logs.
 */
void channel::close() {
  slog("close!!" );
  if( !my )
    return;

  // NOTE(review): my->mtx is deliberately not taken here (the lock was
  // commented out in the original) -- presumably because close() can be
  // reached from inside the receive callback; confirm before re-enabling.
  // boost::unique_lock<boost::cmt::mutex> lock( my->mtx );

  // A single lock() replaces the former expired()-then-construct pair, which
  // could throw if the connection went away between the two steps
  // (assuming wptr is a boost::weak_ptr).
  detail::connection::ptr c( my->con.lock() );
  if( c )
    c->close_channel(*this);

  my->rc = channel::recv_handler();
  my.reset();
}
/**
 *  Drops this channel's reference to its connection without closing it.
 *
 *  Safe to call on a handle that has no private state (default constructed
 *  or already closed); in that case only the log line is emitted.
 */
void channel::reset() {
  slog( "reset!" );
  // `my` is empty for default-constructed / closed handles; dereferencing it
  // unconditionally was a null-pointer access.
  if( my )
    my->con.reset();
}
/**
 *  Returns the id of the node at the other end of the connection.
 *
 *  @pre    the handle has private state (checked with BOOST_ASSERT).
 *  @return the connection's remote id.
 *  Raises "Channel Closed" through TORNET_THROW when the connection is gone.
 */
channel::node_id channel::remote_node()const {
  BOOST_ASSERT(my);
  // lock() returns an empty pointer for an expired connection (assuming wptr
  // is a boost::weak_ptr).  Direct construction from the weak pointer throws
  // instead, which made the null test and the "Channel Closed" error below
  // unreachable.
  detail::connection::ptr c( my->con.lock() );
  if( c )
    return c->get_remote_id();
  TORNET_THROW( "Channel Closed" );
}
/**
 *  Returns the rank reported by the connection for the remote node.
 *
 *  @pre    the handle has private state (checked with BOOST_ASSERT).
 *  @return the connection's remote rank.
 *  Raises "Channel Closed" through TORNET_THROW when the connection is gone.
 */
uint8_t channel::remote_rank()const {
  BOOST_ASSERT(my);
  // lock() returns an empty pointer for an expired connection (assuming wptr
  // is a boost::weak_ptr).  Direct construction from the weak pointer throws
  // instead, which made the null test and the "Channel Closed" error below
  // unreachable.
  detail::connection::ptr c( my->con.lock() );
  if( c )
    return c->get_remote_rank();
  TORNET_THROW( "Channel Closed" );
}
/**
 *  @pre    the handle has private state (checked with BOOST_ASSERT).
 *  @return the local channel number stored in lport.
 */
uint16_t channel::local_channel_num() const {
  BOOST_ASSERT(my);
  const uint16_t local_port = my->lport;
  return local_port;
}
/**
 *  @pre    the handle has private state (checked with BOOST_ASSERT).
 *  @return the remote channel number stored in rport.
 */
uint16_t channel::remote_channel_num() const {
  BOOST_ASSERT(my);
  const uint16_t remote_port = my->rport;
  return remote_port;
}
void channel::on_recv( const recv_handler& rc ) {
BOOST_ASSERT(my);
boost::unique_lock<boost::cmt::mutex> lock( my->mtx );
my->rc = rc;
}
/**
 *  Delivers received data (or an error) to the installed receive callback.
 *
 *  @param b   received buffer, passed through to the callback
 *  @param ec  error code, passed through to the callback
 *  @pre   the handle has private state (checked with BOOST_ASSERT).
 *
 *  Does nothing when no callback is installed.
 */
void channel::recv( const tornet::buffer& b, channel::error_code ec ) {
  BOOST_ASSERT(my);

  // on_recv() and recv() both touch the receive callback, so it is read
  // under the mutex -- but only copied there, not invoked.
  channel::recv_handler handler;
  {
    boost::unique_lock<boost::cmt::mutex> lock( my->mtx );
    handler = my->rc;
  }

  // The callback runs on the local copy with the mutex released because it
  // may call close(), which overwrites my->rc and resets `my` (previously
  // that replaced the handler while it was executing and could leave the
  // lock pointing at a destroyed mutex), or on_recv(), which takes the same
  // mutex.  `my` must not be touched after this call.
  // NOTE(review): a handler that mutates its own state now does so on the
  // copy rather than on the stored handler -- confirm no caller relies on it.
  if( handler ) {
    handler(b,ec);
  }
}
/**
 *  Sends a buffer to the remote side over this channel's connection.
 *
 *  @param b  data to send
 *
 *  Raises "Channel freed!" through TORNET_THROW when the handle has no
 *  private state, and "Channel Closed" when the connection is gone.
 */
void channel::send( const tornet::buffer& b ) {
  if( !my ) {
    TORNET_THROW( "Channel freed!" );
  }
  // lock() returns an empty pointer for an expired connection (assuming wptr
  // is a boost::weak_ptr).  Direct construction from the weak pointer throws
  // instead, which made the "Channel Closed" branch unreachable.
  detail::connection::ptr c( my->con.lock() );
  if( !c ) {
    TORNET_THROW( "Channel Closed" );
  }
  c->send(*this,b);
}
/**
 *  Tells whether this handle still refers to a live connection.
 *
 *  @return true when the handle has private state and its connection has
 *          not expired, false otherwise.
 *
 *  Side effect: when the connection has expired, the private state is
 *  released (through a const_cast) so the handle becomes empty.
 */
channel::operator bool()const {
  if( !my )
    return false;

  if( my->con.expired() ) {
    // connection is gone! This channel is bogus!
    const_cast<channel*>(this)->my.reset();
    return false;
  }
  return true;
}
/**
 *  Identity comparison: two handles are equal when their `my` pointers
 *  compare equal, i.e. they refer to the same private state (or both to
 *  none).
 */
bool channel::operator==(const channel& c )const {
  const bool same_state = ( my == c.my );
  return same_state;
}
} // namespace tornet