Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #4078 from wanderingbort/feature/expirimental-sock…
Browse files Browse the repository at this point in the history
…et-read-optimization

support for SO_RCVLOWAT
  • Loading branch information
arhag authored Jun 15, 2018
2 parents 369c7e3 + 38c6a0b commit 474f888
Showing 1 changed file with 41 additions and 5 deletions.
46 changes: 41 additions & 5 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ namespace eosio {

shared_ptr<tcp::resolver> resolver;

bool use_socket_read_watermark = false;

channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;

void connect( connection_ptr c );
Expand Down Expand Up @@ -489,6 +491,7 @@ namespace eosio {
socket_ptr socket;

fc::message_buffer<1024*1024> pending_message_buffer;
fc::optional<std::size_t> outstanding_read_bytes;
vector<char> blk_buffer;

struct queued_write {
Expand Down Expand Up @@ -2106,14 +2109,34 @@ namespace eosio {
return;
}
connection_wptr weak_conn = conn;
conn->socket->async_read_some
(conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(),
[this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) {

std::size_t minimum_read = conn->outstanding_read_bytes ? *conn->outstanding_read_bytes : message_header_size;

if (use_socket_read_watermark) {
const size_t max_socket_read_watermark = 4096;
std::size_t socket_read_watermark = std::min<std::size_t>(minimum_read, max_socket_read_watermark);
boost::asio::socket_base::receive_low_watermark read_watermark_opt(socket_read_watermark);
conn->socket->set_option(read_watermark_opt);
}

auto completion_handler = [minimum_read](boost::system::error_code ec, std::size_t bytes_transferred) -> std::size_t {
if (ec || bytes_transferred >= minimum_read ) {
return 0;
} else {
return minimum_read - bytes_transferred;
}
};

boost::asio::async_read(*conn->socket,
conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), completion_handler,
[this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) {
auto conn = weak_conn.lock();
if (!conn) {
return;
}

conn->outstanding_read_bytes.reset();

try {
if( !ec ) {
if (bytes_transferred > conn->pending_message_buffer.bytes_to_write()) {
Expand All @@ -2126,6 +2149,7 @@ namespace eosio {
uint32_t bytes_in_buffer = conn->pending_message_buffer.bytes_to_read();

if (bytes_in_buffer < message_header_size) {
conn->outstanding_read_bytes.emplace(message_header_size - bytes_in_buffer);
break;
} else {
uint32_t message_length;
Expand All @@ -2136,13 +2160,22 @@ namespace eosio {
close(conn);
return;
}
if (bytes_in_buffer >= message_length + message_header_size) {

auto total_message_bytes = message_length + message_header_size;

if (bytes_in_buffer >= total_message_bytes) {
conn->pending_message_buffer.advance_read_ptr(message_header_size);
if (!conn->process_next_message(*this, message_length)) {
return;
}
} else {
conn->pending_message_buffer.add_space(message_length + message_header_size - bytes_in_buffer);
auto outstanding_message_bytes = total_message_bytes - bytes_in_buffer;
auto available_buffer_bytes = conn->pending_message_buffer.bytes_to_write();
if (outstanding_message_bytes > available_buffer_bytes) {
conn->pending_message_buffer.add_space( outstanding_message_bytes - available_buffer_bytes );
}

conn->outstanding_read_bytes.emplace(outstanding_message_bytes);
break;
}
}
Expand Down Expand Up @@ -2891,6 +2924,7 @@ namespace eosio {
"True to require exact match of peer network version.")
( "sync-fetch-span", bpo::value<uint32_t>()->default_value(def_sync_fetch_span), "number of blocks to retrieve in a chunk from any individual peer during synchronization")
( "max-implicit-request", bpo::value<uint32_t>()->default_value(def_max_just_send), "maximum sizes of transaction or block messages that are sent without first sending a notice")
( "use-socket-read-watermark", bpo::value<bool>()->default_value(false), "Enable expirimental socket read watermark optimization")
( "peer-log-format", bpo::value<string>()->default_value( "[\"${_name}\" ${_ip}:${_port}]" ),
"The string used to format peers when logging messages about them. Variables are escaped with ${<variable name>}.\n"
"Available Variables:\n"
Expand Down Expand Up @@ -2927,6 +2961,8 @@ namespace eosio {
my->num_clients = 0;
my->started_sessions = 0;

my->use_socket_read_watermark = options.at("use-socket-read-watermark").as<bool>();

my->resolver = std::make_shared<tcp::resolver>( std::ref( app().get_io_service() ) );
if(options.count("p2p-listen-endpoint")) {
my->p2p_address = options.at("p2p-listen-endpoint").as< string >();
Expand Down

0 comments on commit 474f888

Please sign in to comment.