From ac60bb202fd3c6d86226edc8c5ba927b3f941861 Mon Sep 17 00:00:00 2001
From: Daniel Larimer
Date: Mon, 10 Sep 2012 01:05:19 -0400
Subject: [PATCH] nat traversal testing

---
 include/tornet/connection.hpp | 21 +++++++++-----
 include/tornet/kad.hpp        |  6 ++--
 include/tornet/node.hpp       | 15 ++++++++++
 net/node.hpp                  |  2 ++
 src/connection.cpp            | 53 ++++++++++++++++++++++++++++++-----
 src/kad.cpp                   | 20 +++++++++----
 src/main.cpp                  |  7 +++--
 src/node.cpp                  | 20 +++++++++++++
 8 files changed, 118 insertions(+), 26 deletions(-)

diff --git a/include/tornet/connection.hpp b/include/tornet/connection.hpp
index e04ef6e..abf7b98 100644
--- a/include/tornet/connection.hpp
+++ b/include/tornet/connection.hpp
@@ -42,13 +42,14 @@ namespace tn {
       };

       enum proto_message_type {
-        data_msg         = 0,
-        auth_msg         = 1,
-        auth_resp_msg    = 2,
-        route_lookup_msg = 3,
-        route_msg        = 4,
-        close_msg        = 5,
-        update_rank      = 6
+        data_msg                = 0,
+        auth_msg                = 1,
+        auth_resp_msg           = 2,
+        route_lookup_msg        = 3,
+        route_msg               = 4,
+        close_msg               = 5,
+        update_rank             = 6,
+        req_reverse_connect_msg = 7
       };

       typedef fc::shared_ptr<connection> ptr;
@@ -92,6 +93,10 @@
       bool handle_route_msg( const tn::buffer& b );
       bool handle_update_rank_msg( const tn::buffer& b );

+      // requests that the remote host attempt to connect to ep
+      void request_reverse_connect( const fc::ip::endpoint& ep );
+      bool handle_request_reverse_connect_msg( const tn::buffer& b );
+
       void generate_dh();
       bool process_dh( const tn::buffer& b );
       void send_dh();
@@ -99,10 +104,12 @@
       void send_auth_response(bool);
       void send_update_rank();

+      void close_channel( const channel& c );
       void send_close();

       void send( const channel& c, const tn::buffer& b );
+      void set_remote_id( const node_id& nid );

       void send( const char* c, uint32_t l, proto_message_type t );
diff --git a/include/tornet/kad.hpp b/include/tornet/kad.hpp
index 4ac0341..7a44dfe 100644
--- a/include/tornet/kad.hpp
+++ b/include/tornet/kad.hpp
@@ -49,7 +49,7 @@ namespace tn {
        *
        *  TODO: Document the thread this update occurs in...
        */
-      const std::map<fc::sha1,fc::sha1>& current_results()const {
+      const std::map<fc::sha1,host>&     current_results()const {
        return m_current_results;
       }
@@ -82,8 +82,8 @@ namespace tn {
       status                              m_cur_status;

       /// stores endpoints that are on deck ordered by distance.
-      std::map<fc::sha1,fc::ip::endpoint> m_search_queue;
-      std::map<fc::sha1,fc::sha1>         m_current_results;
+      std::map<fc::sha1,host>             m_search_queue;
+      std::map<fc::sha1,host>             m_current_results;

   };
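
Both maps above are keyed by XOR distance from the search target rather than by raw node id; the @note added to include/tornet/node.hpp below makes the same point for find_nodes_near(). A minimal sketch of the relationship, assuming only that fc::sha1's operator^ is bitwise XOR (as its use in src/kad.cpp implies); the include path is a guess:

    #include <fc/sha1.hpp>   // assumed include path for fc::sha1

    // The search maps key entries by distance: dist = id ^ reference.
    // XOR is its own inverse, so the real id is recovered the same way.
    fc::sha1 recover_id( const fc::sha1& dist, const fc::sha1& reference ) {
       return dist ^ reference;   // (id ^ reference) ^ reference == id
    }
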
diff --git a/include/tornet/node.hpp b/include/tornet/node.hpp
index bcf42df..ddb2f17 100644
--- a/include/tornet/node.hpp
+++ b/include/tornet/node.hpp
@@ -82,6 +82,9 @@ namespace tn {
        *  @param limit - the maximum distance to consider, or unlimited distance if limit is 0
        *  @param n     - the number of nodes to return
        *
+       *  @note The returned host.ids are distances from 'this' node; to get the real id
+       *        you must perform host.id ^ this->node.id
+       *
        *  TODO: Add a method to query info about a given node.
        */
       fc::vector<host> find_nodes_near( const id_type& target, uint32_t n,
@@ -100,6 +103,18 @@ namespace tn {
        */
       id_type connect_to( const endpoint& ep );

+      /**
+       *  Given the desired endpoint (ep), attempt to open a connection, but ask nat_ep to
+       *  forward a request to poke a hole in the firewall. Presumably, nat_ep is already
+       *  allowed to talk to ep, so its message can get through.
+       *
+       *  Once the NAT punch-through is achieved, this method behaves the same as
+       *  connect_to(ep).
+       *
+       *  If the punch-through is not successful, an exception will be thrown.
+       */
+      id_type connect_to( const endpoint& ep, const endpoint& nat_ep );
+
       /**
        *  This method will attempt to connect to node_id and then create a new channel to node_port with
        *  the corresponding local_port for return messages.
diff --git a/net/node.hpp b/net/node.hpp
index 644cf85..b58f7e7 100644
--- a/net/node.hpp
+++ b/net/node.hpp
@@ -82,6 +82,8 @@ namespace tornet {
        */
       id_type connect_to( const endpoint& ep );

+
+
       /**
        *  This method will attempt to connect to node_id and then create a new channel to node_port with
        *  the corresponding local_port for return messages.
diff --git a/src/connection.cpp b/src/connection.cpp
index 4518f5c..2d382e5 100644
--- a/src/connection.cpp
+++ b/src/connection.cpp
@@ -282,6 +282,44 @@ void connection::send_close() {
   send( &resp, sizeof(resp), close_msg );
 }

+
+/**
+ *  First, the local host should send a punch_through to ep.
+ *  Then, ask this connection to forward a 'reverse connect' request to ep.
+ *
+ *  The above two steps should repeat every .25 seconds, up to 3 times,
+ *  before concluding that a connection could not be established.
+ *
+ *  Put the connection in a 'waiting for punch-through' state.
+ *
+ *  Send request_nat_punch_through_msg to the remote ep.
+ *  The remote ep will then send a nat_punch_through_msg to ep.
+ *  The remote ep will then send a request_nat_punch_through_ack_msg back to us.
+ *
+ *  This call will block for up to 1 second or until it receives the ack msg.
+ *  The request will be sent every .25 seconds until 1 second has elapsed in case
+ *  a packet was dropped.
+ */
+void connection::request_reverse_connect( const fc::ip::endpoint& ep ) {
+  char rnpt[6];
+  fc::datastream<char*> ds(rnpt,sizeof(rnpt));
+  ds << uint32_t(ep.get_address()) << ep.port();
+  send( rnpt, sizeof(rnpt), req_reverse_connect_msg );
+}
+
+bool connection::handle_request_reverse_connect_msg( const tn::buffer& b ) {
+  fc::datastream<const char*> ds(b.data(), b.size() );
+  uint32_t ip; uint16_t port;
+  ds >> ip >> port;
+
+  my->_node.get_thread().async( [=]() { my->_node.connect_to( fc::ip::endpoint( ip, port ) ); } );
+  return true;
+}
+
 /**
  *  Return the received rank
  */
@@ -315,13 +353,14 @@ bool connection::decode_packet( const tn::buffer& b ) {
   //  slog( "%1% bytes type %2%  pad %3%", b.size(), int(msg_type), int(pad) );

   switch( msg_type ) {
-    case data_msg:         return handle_data_msg( b.subbuf(4,b.size()-4-pad) );
-    case auth_msg:         return handle_auth_msg( b.subbuf(4,b.size()-4-pad) );
-    case auth_resp_msg:    return handle_auth_resp_msg( b.subbuf(4,b.size()-4-pad) );
-    case route_lookup_msg: return handle_lookup_msg( b.subbuf(4,b.size()-4-pad) );
-    case route_msg:        return handle_route_msg( b.subbuf(4,b.size()-4-pad) );
-    case close_msg:        return handle_close_msg( b.subbuf(4,b.size()-4-pad) );
-    case update_rank:      return handle_update_rank_msg( b.subbuf(4,b.size()-4-pad) );
+    case data_msg:                return handle_data_msg( b.subbuf(4,b.size()-4-pad) );
+    case auth_msg:                return handle_auth_msg( b.subbuf(4,b.size()-4-pad) );
+    case auth_resp_msg:           return handle_auth_resp_msg( b.subbuf(4,b.size()-4-pad) );
+    case route_lookup_msg:        return handle_lookup_msg( b.subbuf(4,b.size()-4-pad) );
+    case route_msg:               return handle_route_msg( b.subbuf(4,b.size()-4-pad) );
+    case close_msg:               return handle_close_msg( b.subbuf(4,b.size()-4-pad) );
+    case update_rank:             return handle_update_rank_msg( b.subbuf(4,b.size()-4-pad) );
+    case req_reverse_connect_msg: return handle_request_reverse_connect_msg( b.subbuf(4,b.size()-4-pad) );
     default:
       wlog( "Unknown message type" );
   }
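
For clarity, the req_reverse_connect_msg payload introduced above is simply the IPv4 address and port of the endpoint the remote host is being asked to dial, packed into 6 bytes with fc::datastream. A small standalone sketch of the same encoding and decoding; the include paths and datastream template arguments are assumptions, since the patch text does not show them:

    #include <fc/ip.hpp>          // assumed include path for fc::ip::endpoint
    #include <fc/datastream.hpp>  // assumed include path for fc::datastream

    // Pack an endpoint the way request_reverse_connect() does:
    // 4 bytes of IPv4 address followed by 2 bytes of port.
    void pack_endpoint( char (&buf)[6], const fc::ip::endpoint& ep ) {
       fc::datastream<char*> ds( buf, sizeof(buf) );
       ds << uint32_t(ep.get_address()) << ep.port();
    }

    // Unpack it the way handle_request_reverse_connect_msg() does.
    fc::ip::endpoint unpack_endpoint( const char* buf, size_t len ) {
       fc::datastream<const char*> ds( buf, len );
       uint32_t ip; uint16_t port;
       ds >> ip >> port;
       return fc::ip::endpoint( ip, port );
    }
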
diff --git a/src/kad.cpp b/src/kad.cpp
index b55b785..23e1836 100644
--- a/src/kad.cpp
+++ b/src/kad.cpp
@@ -27,7 +27,8 @@ namespace tn {
     auto i = nn.begin();
     auto e = nn.end();
     while( i != e ) {
-      m_search_queue[i->id] = i->ep;
+      // note i->id is the distance from this node's id
+      m_search_queue[i->id] = *i;
       ++i;
     }

@@ -65,18 +66,25 @@ void kad_search::search_thread() {
     slog( "search thread.... queue size %d", m_search_queue.size() );
     while( m_search_queue.size() && m_cur_status == kad_search::searching ) {
-      fc::ip::endpoint ep       = m_search_queue.begin()->second;
+      auto             cur_item = m_search_queue.begin()->second;
+      fc::ip::endpoint ep       = m_search_queue.begin()->second.ep;
       fc::sha1         nid      = m_search_queue.begin()->first ^ m_target;
       m_search_queue.erase(m_search_queue.begin());

       try {
+        // TODO: determine if we must perform nat traversal
+        if( cur_item.nat_hosts.size() ) {
+          elog( "This node requires NAT traversal to reach!!  %s",
+                fc::string(cur_item.nat_hosts.front()).c_str() );
+        }
+
        fc::sha1 rtn = m_node->connect_to(ep);
        slog( "node %s found at %s", fc::string(rtn).c_str(), fc::string(ep).c_str() );

        // This filter may involve RPC calls....
        filter( rtn );

        slog( " adding node %s to result list", fc::string(rtn).c_str() );
-       m_current_results[m_target^rtn] = rtn;
+       m_current_results[m_target^rtn] = host( rtn, ep );
        if( m_current_results.size() > m_n ) {
          m_current_results.erase( --m_current_results.end() );
        }
@@ -118,12 +126,12 @@
       // if current results is not 'full' or the new result is less than the last
       // current result
       if( m_current_results.size() < m_n ) {
-        m_search_queue[rri->id] = rri->ep;
+        m_search_queue[rri->id] = *rri;
       } else { // assume m_current_results.size() > 1 because m_n >= 1
-        std::map<fc::sha1,fc::sha1>::const_iterator ritr = m_current_results.end();
+        auto ritr = m_current_results.end();
         --ritr;
         if( ritr->first > rri->id ) { // only search the node if it is closer than current results
-          m_search_queue[rri->id] = *rri;
+          m_search_queue[rri->id] = *rri;
         }
       }
     }
diff --git a/src/main.cpp b/src/main.cpp
index cf71079..4f23fa7 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -58,11 +58,12 @@ void start_services( int argc, char** argv ) {
     ks->wait();

     slog( "Results: \n" );
-    const std::map<fc::sha1,fc::sha1>& r = ks->current_results();
+    //const std::map<fc::sha1,tn::host>&
+    auto r = ks->current_results();
     auto itr = r.begin();
     while( itr != r.end() ) {
-      slog( "   distance: %s   node: %s",
-            fc::string(itr->first).c_str(), fc::string(itr->second).c_str() );
+      slog( "   distance: %s   node: %s   endpoint: %s",
+            fc::string(itr->first).c_str(), fc::string(itr->second.id).c_str(), fc::string(itr->second.ep).c_str() );
       ++itr;
     }
   } catch ( ... ) {
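
The kad.cpp and main.cpp changes above store a host record where the old code kept a bare endpoint or node id, but the definition of host is not part of this patch. Judging only from the members used here (a two-argument constructor plus id, ep and nat_hosts), it presumably looks roughly like the sketch below; the exact field types, include paths and layout are assumptions:

    #include <fc/sha1.hpp>     // assumed include paths
    #include <fc/ip.hpp>
    #include <fc/vector.hpp>

    // Sketch of the host record implied by this patch; the real definition lives
    // elsewhere in the tree and may differ.
    struct host {
       host(){}
       host( const fc::sha1& i, const fc::ip::endpoint& e ) : id(i), ep(e) {}

       fc::sha1                      id;        // node id (stored as an XOR distance in some maps)
       fc::ip::endpoint              ep;        // last known public endpoint
       fc::vector<fc::ip::endpoint>  nat_hosts; // peers that could relay a reverse-connect request
    };
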
diff --git a/src/node.cpp b/src/node.cpp
index 3e8e97d..d29db00 100644
--- a/src/node.cpp
+++ b/src/node.cpp
@@ -84,6 +84,26 @@ namespace tn {
     my->listen(port);
   }

+  fc::sha1 node::connect_to( const endpoint& ep, const endpoint& nat_ep ) {
+    if( !my->_thread.is_current() ) {
+      return my->_thread.async( [&,this](){ return connect_to( ep, nat_ep ); } ).wait();
+    }
+
+    ep_to_con_map::iterator ep_con = my->_ep_to_con.find(ep);
+    if( ep_con != my->_ep_to_con.end() ) { return ep_con->second->get_remote_id(); }
+
+    ep_to_con_map::iterator nat_con = my->_ep_to_con.find(nat_ep);
+    if( nat_con == my->_ep_to_con.end() || nat_con->second->get_state() != connection::connected ) {
+      FC_THROW( "No active connection to NAT endpoint %s", fc::string(nat_ep).c_str() );
+    }
+
+    nat_con->second->request_reverse_connect(ep);
+
+    return connect_to(ep);
+  }
+
+
+
   node::id_type node::connect_to( const node::endpoint& ep ) {
     if( !my->_thread.is_current() ) {
       return my->_thread.async( [&,this](){ return connect_to( ep ); } ).wait();
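
Finally, a usage sketch for the new overload defined above: target_ep is the peer believed to be behind a NAT and relay_ep is a peer we already hold an open connection to. The helper name and the fall-back-to-direct-connect policy are illustrative only; in practice both endpoints would come out of the DHT results:

    #include <tornet/node.hpp>

    // Try the NAT-traversing connect first; per the node.hpp documentation it throws
    // if there is no active connection to the relay or the punch-through fails.
    fc::sha1 connect_via_relay( tn::node& n,
                                const fc::ip::endpoint& target_ep,
                                const fc::ip::endpoint& relay_ep ) {
       try {
          return n.connect_to( target_ep, relay_ep );
       } catch ( ... ) {
          // Fall back to a direct attempt; this can still succeed if the peer
          // turns out not to be behind a NAT after all.
          return n.connect_to( target_ep );
       }
    }
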