Skip to content

Commit

Permalink
nat traveral testing
Browse files Browse the repository at this point in the history
  • Loading branch information
bytemaster committed Sep 10, 2012
1 parent 8295a82 commit ac60bb2
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 26 deletions.
21 changes: 14 additions & 7 deletions include/tornet/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ namespace tn {
};

enum proto_message_type {
data_msg = 0,
auth_msg = 1,
auth_resp_msg = 2,
route_lookup_msg = 3,
route_msg = 4,
close_msg = 5,
update_rank = 6
data_msg = 0,
auth_msg = 1,
auth_resp_msg = 2,
route_lookup_msg = 3,
route_msg = 4,
close_msg = 5,
update_rank = 6,
req_reverse_connect_msg = 7
};

typedef fc::shared_ptr<connection> ptr;
Expand Down Expand Up @@ -92,17 +93,23 @@ namespace tn {
bool handle_route_msg( const tn::buffer& b );
bool handle_update_rank_msg( const tn::buffer& b );

// requests the remote host attempt to connect to ep
void request_reverse_connect( const fc::ip::endpoint& ep );
bool handle_request_reverse_connect_msg( const tn::buffer& b );

void generate_dh();
bool process_dh( const tn::buffer& b );
void send_dh();
void send_auth();
void send_auth_response(bool);
void send_update_rank();


void close_channel( const channel& c );
void send_close();
void send( const channel& c, const tn::buffer& b );


void set_remote_id( const node_id& nid );
void send( const char* c, uint32_t l, proto_message_type t );

Expand Down
6 changes: 3 additions & 3 deletions include/tornet/kad.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace tn {
*
* TODO: Document the thread this update occurs in...
*/
const std::map<fc::sha1,fc::sha1>& current_results()const {
const std::map<fc::sha1,host>& current_results()const {
return m_current_results;
}

Expand Down Expand Up @@ -82,8 +82,8 @@ namespace tn {
status m_cur_status;

/// stores endpoints that are on deck ordered by distance.
std::map<fc::sha1, fc::ip::endpoint> m_search_queue;
std::map<fc::sha1, fc::sha1> m_current_results;
std::map<fc::sha1, host> m_search_queue;
std::map<fc::sha1, host> m_current_results;
};


Expand Down
15 changes: 15 additions & 0 deletions include/tornet/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ namespace tn {
* @param limit - the maximum distance to consider, or unlimited distance if limit is 0
* @param n - the number of nodes to return
*
* @note The returned host.ids are distances from 'this' node, to get the real id
* you must perform host.id ^ this->node.id
*
* TODO: Add a method to query info about a given node.
*/
fc::vector<host> find_nodes_near( const id_type& target, uint32_t n,
Expand All @@ -100,6 +103,18 @@ namespace tn {
*/
id_type connect_to( const endpoint& ep );

/**
* Given the desired endpoint (ep), attempt to open a connection, but ask nat_ep to
* forward a request to poke a hole in the firewall. Presumably, nat_ep is already
* allowed to talk to ep so his message can get through.
*
* Once the nat punch-through is achieved, then this method behaves the same as
* connect_to(ep);
*
* If the punch-through is not successful, then an exception will be thrown.
*/
id_type connect_to( const endpoint& ep, const endpoint& nat_into_ep );

/**
* This method will attempt to connect to node_id and then create a new channel to node_port with
* the coresponding local_port for return messages.
Expand Down
2 changes: 2 additions & 0 deletions net/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ namespace tornet {
*/
id_type connect_to( const endpoint& ep );



/**
* This method will attempt to connect to node_id and then create a new channel to node_port with
* the coresponding local_port for return messages.
Expand Down
53 changes: 46 additions & 7 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,44 @@ void connection::send_close() {
send( &resp, sizeof(resp), close_msg );
}


/**
* First, local host should send a punch_through to ep.
* Then, ask this connection to forward a 'reverse connect' request
* to ep.
*
* The above two steps should repeat every .25 seconds up to 3 times
* before concluding that a connection could not be established.
*
*
*
* Put the connection in 'waiting for punch through state' so that
*
*
* Send request_nat_punch_through_msg to remote ep.
* The remote ep will then send a nat_punch_through_msg to ep
* The remote ep will then send a request_nat_punch_through_ack_msg back to us.
*
* This call will block for up to 1 second or until it receives the ack msg.
* The request will be sent every .25 seconds until 1 second has elapsed in case
* a packet was dropped.
*/
void connection::request_reverse_connect( const fc::ip::endpoint& ep ) {
char rnpt[6];
fc::datastream<char*> ds(rnpt,sizeof(rnpt));
ds << uint32_t(ep.get_address()) << ep.port();
send( rnpt, sizeof(rnpt), req_reverse_connect_msg );
}

bool connection::handle_request_reverse_connect_msg( const tn::buffer& b ) {
fc::datastream<const char*> ds(b.data(), b.size() );
uint32_t ip; uint16_t port;
ds >> ip >> port;

my->_node.get_thread().async( [=]() { my->_node.connect_to( fc::ip::endpoint( ip, port ) ); } );
return true;
}

/**
* Return the received rank
*/
Expand Down Expand Up @@ -315,13 +353,14 @@ bool connection::decode_packet( const tn::buffer& b ) {
// slog( "%1% bytes type %2% pad %3%", b.size(), int(msg_type), int(pad) );

switch( msg_type ) {
case data_msg: return handle_data_msg( b.subbuf(4,b.size()-4-pad) );
case auth_msg: return handle_auth_msg( b.subbuf(4,b.size()-4-pad) );
case auth_resp_msg: return handle_auth_resp_msg( b.subbuf(4,b.size()-4-pad) );
case route_lookup_msg: return handle_lookup_msg( b.subbuf(4,b.size()-4-pad) );
case route_msg: return handle_route_msg( b.subbuf(4,b.size()-4-pad) );
case close_msg: return handle_close_msg( b.subbuf(4,b.size()-4-pad) );
case update_rank: return handle_update_rank_msg( b.subbuf(4,b.size()-4-pad) );
case data_msg: return handle_data_msg( b.subbuf(4,b.size()-4-pad) );
case auth_msg: return handle_auth_msg( b.subbuf(4,b.size()-4-pad) );
case auth_resp_msg: return handle_auth_resp_msg( b.subbuf(4,b.size()-4-pad) );
case route_lookup_msg: return handle_lookup_msg( b.subbuf(4,b.size()-4-pad) );
case route_msg: return handle_route_msg( b.subbuf(4,b.size()-4-pad) );
case close_msg: return handle_close_msg( b.subbuf(4,b.size()-4-pad) );
case update_rank: return handle_update_rank_msg( b.subbuf(4,b.size()-4-pad) );
case req_reverse_connect_msg: return handle_request_reverse_connect_msg( b.subbuf(4,b.size()-4-pad) );
default:
wlog( "Unknown message type" );
}
Expand Down
20 changes: 14 additions & 6 deletions src/kad.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ namespace tn {
auto i = nn.begin();
auto e = nn.end();
while( i != e ) {
m_search_queue[i->id] = i->ep;
// note i->id is the distance from this node's id
m_search_queue[i->id] = *i;
++i;
}

Expand Down Expand Up @@ -65,18 +66,25 @@ namespace tn {
void kad_search::search_thread() {
slog( "search thread.... queue size %d", m_search_queue.size() );
while( m_search_queue.size() && m_cur_status == kad_search::searching ) {
fc::ip::endpoint ep = m_search_queue.begin()->second;
auto cur_item = m_search_queue.begin()->second;
fc::ip::endpoint ep = m_search_queue.begin()->second.ep;
fc::sha1 nid = m_search_queue.begin()->first ^ m_target;
m_search_queue.erase(m_search_queue.begin());

try {
// TODO: determine if we must perform nat traversal
if( cur_item.nat_hosts.size() ) {
elog( "This node requies NAT traversal to reach!! %s",
fc::string(cur_item.nat_hosts.front()).c_str() );
}

fc::sha1 rtn = m_node->connect_to(ep);
slog( "node %s found at %s", fc::string(rtn).c_str(), fc::string(ep).c_str() );

// This filter may involve RPC calls....
filter( rtn );
slog( " adding node %s to result list", fc::string(rtn).c_str() );
m_current_results[m_target^rtn] = rtn;
m_current_results[m_target^rtn] = host( rtn, ep );
if( m_current_results.size() > m_n ) {
m_current_results.erase( --m_current_results.end() );
}
Expand Down Expand Up @@ -118,12 +126,12 @@ namespace tn {
// if current results is not 'full' or the new result is less than the last
// current result
if( m_current_results.size() < m_n ) {
m_search_queue[rri->id] = rri->ep;
m_search_queue[rri->id] = *rri;
} else { // assume m_current_results.size() > 1 because m_n >= 1
std::map<fc::sha1,fc::sha1>::const_iterator ritr = m_current_results.end();
auto ritr = m_current_results.end();
--ritr;
if( ritr->first > rri->id ) { // only search the node if it is closer than current results
m_search_queue[rri->id] = rri->ep;
m_search_queue[rri->id] = *rri;
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ void start_services( int argc, char** argv ) {
ks->wait();

slog( "Results: \n" );
const std::map<fc::sha1,fc::sha1>& r = ks->current_results();
//const std::map<fc::sha1,tn::host>&
auto r = ks->current_results();
auto itr = r.begin();
while( itr != r.end() ) {
slog( " distance: %s node: %s",
fc::string(itr->first).c_str(), fc::string(itr->second).c_str() );
slog( " distance: %s node: %s endpoint: %s",
fc::string(itr->first).c_str(), fc::string(itr->second.id).c_str(), fc::string(itr->second.ep).c_str() );
++itr;
}
} catch ( ... ) {
Expand Down
20 changes: 20 additions & 0 deletions src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,26 @@ namespace tn {
my->listen(port);
}

fc::sha1 node::connect_to( const endpoint& ep, const endpoint& nat_ep ) {
if( !my->_thread.is_current() ) {
return my->_thread.async( [&,this](){ return connect_to( ep, nat_ep ); } ).wait();
}

ep_to_con_map::iterator ep_con = my->_ep_to_con.find(ep);
if( ep_con != my->_ep_to_con.end() ) { return ep_con->second->get_remote_id(); }

ep_to_con_map::iterator nat_con = my->_ep_to_con.find(nat_ep);
if( nat_con == my->_ep_to_con.end() || nat_con->second->get_state() != connection::connected ) {
FC_THROW( "No active connection to NAT endpoint %s", fc::string(nat_ep).c_str() );
}

nat_con->second->request_reverse_connect(ep);

return connect_to(ep);
}



node::id_type node::connect_to( const node::endpoint& ep ) {
if( !my->_thread.is_current() ) {
return my->_thread.async( [&,this](){ return connect_to( ep ); } ).wait();
Expand Down

0 comments on commit ac60bb2

Please sign in to comment.