Skip to content

Commit

Permalink
Add TCP buffer sizes configuration (#1629)
Browse files Browse the repository at this point in the history
* Move socket to stream/listener functions to zenoh-link-commons

* Use TcpSocketUtils to create listener/stream in TLS link

* Change TcpSocketUtils to TcpSocketConfig

* Add TCP buffer size endpoint parameters and parsing

* Add TCP buffer sizes to config file

* Rename endpoint parameters to match config file

* Correct typo

* Fix more typos

* Remove unused cargo dependency

* Add TCP buffer sizes config tests

* Allow zero-buffer tests to panic only on mac

* Remove test headers from function
  • Loading branch information
oteffahi authored Dec 6, 2024
1 parent 64e8caa commit b7d2051
Show file tree
Hide file tree
Showing 13 changed files with 412 additions and 112 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,12 @@
// note that mTLS (client authentication) is required for a listener to disconnect a client on expiration
close_link_on_expiration: false,
},
/// Optional configuration for TCP system buffers sizes. Applies to TCP and TLS links.
///
/// Configure TCP read buffer size (bytes)
// tcp_rx_buffer: 123456,
/// Configure TCP write buffer size (bytes)
// tcp_tx_buffer: 123456,
},
/// Shared memory configuration.
/// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise
Expand Down
4 changes: 4 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,10 @@ validated_struct::validator! {
UnixPipeConf {
file_access_mask: Option<u32>
},
/// Configure TCP read buffer size
pub tcp_rx_buffer: Option<u32>,
/// Configure TCP write buffer size
pub tcp_tx_buffer: Option<u32>,
},
pub shared_memory:
ShmConf {
Expand Down
3 changes: 3 additions & 0 deletions io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ extern crate alloc;

mod listener;
mod multicast;
pub mod tcp;
#[cfg(feature = "tls")]
pub mod tls;
mod unicast;
Expand All @@ -44,6 +45,8 @@ use zenoh_result::ZResult;
/*************************************/

pub const BIND_INTERFACE: &str = "iface";
pub const TCP_TX_BUFFER_SIZE: &str = "tcp_tx_buffer";
pub const TCP_RX_BUFFER_SIZE: &str = "tcp_rx_buffer";

#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)]
pub struct Link {
Expand Down
100 changes: 100 additions & 0 deletions io/zenoh-link-commons/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::net::SocketAddr;

use tokio::net::{TcpListener, TcpSocket, TcpStream};
use zenoh_result::{zerror, ZResult};

pub struct TcpSocketConfig<'a> {
tx_buffer_size: Option<u32>,
rx_buffer_size: Option<u32>,
iface: Option<&'a str>,
}

impl<'a> TcpSocketConfig<'a> {
pub fn new(
tx_buffer_size: Option<u32>,
rx_buffer_size: Option<u32>,
iface: Option<&'a str>,
) -> Self {
Self {
tx_buffer_size,
rx_buffer_size,
iface,
}
}

/// Build a new TCPListener bound to `addr` with the given configuration parameters
pub fn new_listener(&self, addr: &SocketAddr) -> ZResult<(TcpListener, SocketAddr)> {
let socket = self.socket_with_config(addr)?;
// Build a TcpListener from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
socket.set_reuseaddr(true)?;
socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?;
// backlog (the maximum number of pending connections are queued): 1024
let listener = socket
.listen(1024)
.map_err(|e| zerror!("{}: {}", addr, e))?;

let local_addr = listener
.local_addr()
.map_err(|e| zerror!("{}: {}", addr, e))?;

Ok((listener, local_addr))
}

/// Connect to a TCP socket address at `dst_addr` with the given configuration parameters
pub async fn new_link(
&self,
dst_addr: &SocketAddr,
) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> {
let socket = self.socket_with_config(dst_addr)?;
// Build a TcpStream from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
let stream = socket
.connect(*dst_addr)
.await
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

let src_addr = stream
.local_addr()
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

let dst_addr = stream
.peer_addr()
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

Ok((stream, src_addr, dst_addr))
}

/// Creates a TcpSocket with the provided config
fn socket_with_config(&self, addr: &SocketAddr) -> ZResult<TcpSocket> {
let socket = match addr {
SocketAddr::V4(_) => TcpSocket::new_v4(),
SocketAddr::V6(_) => TcpSocket::new_v6(),
}?;

if let Some(iface) = self.iface {
zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?;
}
if let Some(size) = self.tx_buffer_size {
socket.set_send_buffer_size(size)?;
}
if let Some(size) = self.rx_buffer_size {
socket.set_recv_buffer_size(size)?;
}

Ok(socket)
}
}
13 changes: 12 additions & 1 deletion io/zenoh-link/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use zenoh_link_serial::{LinkManagerUnicastSerial, SerialLocatorInspector, SERIAL
#[cfg(feature = "transport_tcp")]
pub use zenoh_link_tcp as tcp;
#[cfg(feature = "transport_tcp")]
use zenoh_link_tcp::{LinkManagerUnicastTcp, TcpLocatorInspector, TCP_LOCATOR_PREFIX};
use zenoh_link_tcp::{
LinkManagerUnicastTcp, TcpConfigurator, TcpLocatorInspector, TCP_LOCATOR_PREFIX,
};
#[cfg(feature = "transport_tls")]
pub use zenoh_link_tls as tls;
#[cfg(feature = "transport_tls")]
Expand Down Expand Up @@ -172,6 +174,8 @@ impl LocatorInspector {
}
#[derive(Default)]
pub struct LinkConfigurator {
#[cfg(feature = "transport_tcp")]
tcp_inspector: TcpConfigurator,
#[cfg(feature = "transport_quic")]
quic_inspector: QuicConfigurator,
#[cfg(feature = "transport_tls")]
Expand Down Expand Up @@ -199,6 +203,13 @@ impl LinkConfigurator {
errors.insert(proto, e);
}
};
#[cfg(feature = "transport_tcp")]
{
insert_config(
TCP_LOCATOR_PREFIX.into(),
self.tcp_inspector.inspect_config(config),
);
}
#[cfg(feature = "transport_quic")]
{
insert_config(
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-links/zenoh-link-tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ socket2 = { workspace = true }
tokio = { workspace = true, features = ["net", "io-util", "rt", "time"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = {workspace = true}
zenoh-config = { workspace = true }
zenoh-core = { workspace = true }
zenoh-link-commons = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
2 changes: 2 additions & 0 deletions io/zenoh-links/zenoh-link-tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use zenoh_protocol::{
use zenoh_result::{zerror, ZResult};

mod unicast;
mod utils;
pub use unicast::*;
pub use utils::TcpConfigurator;

// Default MTU (TCP PDU) in bytes.
// NOTE: Since TCP is a byte-stream oriented transport, theoretically it has
Expand Down
92 changes: 18 additions & 74 deletions io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ use std::{cell::UnsafeCell, convert::TryInto, fmt, net::SocketAddr, sync::Arc, t
use async_trait::async_trait;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpSocket, TcpStream},
net::{TcpListener, TcpStream},
};
use tokio_util::sync::CancellationToken;
use zenoh_link_commons::{
get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE,
get_ip_interface_names, tcp::TcpSocketConfig, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender,
};
use zenoh_protocol::{
core::{EndPoint, Locator},
transport::BatchSize,
};
use zenoh_result::{bail, zerror, Error as ZError, ZResult};

use super::{
get_tcp_addrs, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU, TCP_LINGER_TIMEOUT,
TCP_LOCATOR_PREFIX,
use crate::{
get_tcp_addrs, utils::TcpLinkConfig, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU,
TCP_LINGER_TIMEOUT, TCP_LOCATOR_PREFIX,
};

pub struct LinkUnicastTcp {
Expand Down Expand Up @@ -241,80 +241,22 @@ impl LinkManagerUnicastTcp {
}
}

impl LinkManagerUnicastTcp {
async fn new_link_inner(
&self,
dst_addr: &SocketAddr,
iface: Option<&str>,
) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> {
let socket = match dst_addr {
SocketAddr::V4(_) => TcpSocket::new_v4(),
SocketAddr::V6(_) => TcpSocket::new_v6(),
}?;

if let Some(iface) = iface {
zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?;
}

// Build a TcpStream from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
let stream = socket
.connect(*dst_addr)
.await
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

let src_addr = stream
.local_addr()
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

let dst_addr = stream
.peer_addr()
.map_err(|e| zerror!("{}: {}", dst_addr, e))?;

Ok((stream, src_addr, dst_addr))
}

async fn new_listener_inner(
&self,
addr: &SocketAddr,
iface: Option<&str>,
) -> ZResult<(TcpListener, SocketAddr)> {
let socket = match addr {
SocketAddr::V4(_) => TcpSocket::new_v4(),
SocketAddr::V6(_) => TcpSocket::new_v6(),
}?;

if let Some(iface) = iface {
zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?;
}

// Build a TcpListener from TcpSocket
// https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html
socket.set_reuseaddr(true)?;
socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?;
// backlog (the maximum number of pending connections are queued): 1024
let listener = socket
.listen(1024)
.map_err(|e| zerror!("{}: {}", addr, e))?;

let local_addr = listener
.local_addr()
.map_err(|e| zerror!("{}: {}", addr, e))?;

Ok((listener, local_addr))
}
}

#[async_trait]
impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
async fn new_link(&self, endpoint: EndPoint) -> ZResult<LinkUnicast> {
let dst_addrs = get_tcp_addrs(endpoint.address()).await?;
let config = endpoint.config();
let iface = config.get(BIND_INTERFACE);

let link_config = TcpLinkConfig::new(&config)?;
let socket_config = TcpSocketConfig::new(
link_config.tx_buffer_size,
link_config.rx_buffer_size,
link_config.bind_iface,
);

let mut errs: Vec<ZError> = vec![];
for da in dst_addrs {
match self.new_link_inner(&da, iface).await {
match socket_config.new_link(&da).await {
Ok((stream, src_addr, dst_addr)) => {
let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr));
return Ok(LinkUnicast(link));
Expand All @@ -339,11 +281,13 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult<Locator> {
let addrs = get_tcp_addrs(endpoint.address()).await?;
let config = endpoint.config();
let iface = config.get(BIND_INTERFACE);

let link_config = TcpLinkConfig::new(&config)?;
let socket_config: TcpSocketConfig<'_> = link_config.into();

let mut errs: Vec<ZError> = vec![];
for da in addrs {
match self.new_listener_inner(&da, iface).await {
match socket_config.new_listener(&da) {
Ok((socket, local_addr)) => {
// Update the endpoint locator address
endpoint = EndPoint::new(
Expand Down
Loading

0 comments on commit b7d2051

Please sign in to comment.