diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 344b6ad748c..ce5cd00cad1 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -194,6 +194,8 @@ namespace eosio { shared_ptr resolver; + bool use_socket_read_watermark = false; + channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription; void connect( connection_ptr c ); @@ -489,6 +491,7 @@ namespace eosio { socket_ptr socket; fc::message_buffer<1024*1024> pending_message_buffer; + fc::optional outstanding_read_bytes; vector blk_buffer; struct queued_write { @@ -2106,14 +2109,34 @@ namespace eosio { return; } connection_wptr weak_conn = conn; - conn->socket->async_read_some - (conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), - [this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) { + + std::size_t minimum_read = conn->outstanding_read_bytes ? *conn->outstanding_read_bytes : message_header_size; + + if (use_socket_read_watermark) { + const size_t max_socket_read_watermark = 4096; + std::size_t socket_read_watermark = std::min(minimum_read, max_socket_read_watermark); + boost::asio::socket_base::receive_low_watermark read_watermark_opt(socket_read_watermark); + conn->socket->set_option(read_watermark_opt); + } + + auto completion_handler = [minimum_read](boost::system::error_code ec, std::size_t bytes_transferred) -> std::size_t { + if (ec || bytes_transferred >= minimum_read ) { + return 0; + } else { + return minimum_read - bytes_transferred; + } + }; + + boost::asio::async_read(*conn->socket, + conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), completion_handler, + [this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) { auto conn = weak_conn.lock(); if (!conn) { return; } + conn->outstanding_read_bytes.reset(); + try { if( !ec ) { if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) { @@ -2126,6 +2149,7 @@ namespace eosio { uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read(); if (bytes_in_buffer < message_header_size) { + conn->outstanding_read_bytes.emplace(message_header_size - bytes_in_buffer); break; } else { uint32_t message_length; @@ -2136,13 +2160,22 @@ namespace eosio { close(conn); return; } - if (bytes_in_buffer >= message_length + message_header_size) { + + auto total_message_bytes = message_length + message_header_size; + + if (bytes_in_buffer >= total_message_bytes) { conn->pending_message_buffer.advance_read_ptr(message_header_size); if (!conn->process_next_message(*this, message_length)) { return; } } else { - conn->pending_message_buffer.add_space(message_length + message_header_size - bytes_in_buffer); + auto outstanding_message_bytes = total_message_bytes - bytes_in_buffer; + auto available_buffer_bytes = conn->pending_message_buffer.bytes_to_write(); + if (outstanding_message_bytes > available_buffer_bytes) { + conn->pending_message_buffer.add_space( outstanding_message_bytes - available_buffer_bytes ); + } + + conn->outstanding_read_bytes.emplace(outstanding_message_bytes); break; } } @@ -2891,6 +2924,7 @@ namespace eosio { "True to require exact match of peer network version.") ( "sync-fetch-span", bpo::value()->default_value(def_sync_fetch_span), "number of blocks to retrieve in a chunk from any individual peer during synchronization") ( "max-implicit-request", bpo::value()->default_value(def_max_just_send), "maximum sizes of transaction or block messages that are sent without first sending a notice") + ( "use-socket-read-watermark", bpo::value()->default_value(false), "Enable expirimental socket read watermark optimization") ( "peer-log-format", bpo::value()->default_value( "[\"${_name}\" ${_ip}:${_port}]" ), "The string used to format peers when logging messages about them. Variables are escaped with ${}.\n" "Available Variables:\n" @@ -2927,6 +2961,8 @@ namespace eosio { my->num_clients = 0; my->started_sessions = 0; + my->use_socket_read_watermark = options.at("use-socket-read-watermark").as(); + my->resolver = std::make_shared( std::ref( app().get_io_service() ) ); if(options.count("p2p-listen-endpoint")) { my->p2p_address = options.at("p2p-listen-endpoint").as< string >();