diff --git a/Cargo.lock b/Cargo.lock index 0cb5efd249..e47d08dd0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5336,11 +5336,11 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "zenoh-config", "zenoh-core", "zenoh-link-commons", "zenoh-protocol", "zenoh-result", - "zenoh-util", ] [[package]] diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 6422759952..361b8de7fa 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -495,6 +495,12 @@ // note that mTLS (client authentication) is required for a listener to disconnect a client on expiration close_link_on_expiration: false, }, + /// Optional configuration for TCP system buffers sizes. Applies to TCP and TLS links. + /// + /// Configure TCP read buffer size (bytes) + // tcp_rx_buffer: 123456, + /// Configure TCP write buffer size (bytes) + // tcp_tx_buffer: 123456, }, /// Shared memory configuration. /// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 87c1b9b90d..f0ba50d0a6 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -515,6 +515,10 @@ validated_struct::validator! { UnixPipeConf { file_access_mask: Option }, + /// Configure TCP read buffer size + pub tcp_rx_buffer: Option, + /// Configure TCP write buffer size + pub tcp_tx_buffer: Option, }, pub shared_memory: ShmConf { diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index fae26cd02d..ed6f5e72f8 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -21,6 +21,7 @@ extern crate alloc; mod listener; mod multicast; +pub mod tcp; #[cfg(feature = "tls")] pub mod tls; mod unicast; @@ -44,6 +45,8 @@ use zenoh_result::ZResult; /*************************************/ pub const BIND_INTERFACE: &str = "iface"; +pub const TCP_TX_BUFFER_SIZE: &str = "tcp_tx_buffer"; +pub const TCP_RX_BUFFER_SIZE: &str = "tcp_rx_buffer"; #[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] pub struct Link { diff --git a/io/zenoh-link-commons/src/tcp.rs b/io/zenoh-link-commons/src/tcp.rs new file mode 100644 index 0000000000..db7da4d562 --- /dev/null +++ b/io/zenoh-link-commons/src/tcp.rs @@ -0,0 +1,100 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::net::SocketAddr; + +use tokio::net::{TcpListener, TcpSocket, TcpStream}; +use zenoh_result::{zerror, ZResult}; + +pub struct TcpSocketConfig<'a> { + tx_buffer_size: Option, + rx_buffer_size: Option, + iface: Option<&'a str>, +} + +impl<'a> TcpSocketConfig<'a> { + pub fn new( + tx_buffer_size: Option, + rx_buffer_size: Option, + iface: Option<&'a str>, + ) -> Self { + Self { + tx_buffer_size, + rx_buffer_size, + iface, + } + } + + /// Build a new TCPListener bound to `addr` with the given configuration parameters + pub fn new_listener(&self, addr: &SocketAddr) -> ZResult<(TcpListener, SocketAddr)> { + let socket = self.socket_with_config(addr)?; + // Build a TcpListener from TcpSocket + // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html + socket.set_reuseaddr(true)?; + socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?; + // backlog (the maximum number of pending connections are queued): 1024 + let listener = socket + .listen(1024) + .map_err(|e| zerror!("{}: {}", addr, e))?; + + let local_addr = listener + .local_addr() + .map_err(|e| zerror!("{}: {}", addr, e))?; + + Ok((listener, local_addr)) + } + + /// Connect to a TCP socket address at `dst_addr` with the given configuration parameters + pub async fn new_link( + &self, + dst_addr: &SocketAddr, + ) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> { + let socket = self.socket_with_config(dst_addr)?; + // Build a TcpStream from TcpSocket + // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html + let stream = socket + .connect(*dst_addr) + .await + .map_err(|e| zerror!("{}: {}", dst_addr, e))?; + + let src_addr = stream + .local_addr() + .map_err(|e| zerror!("{}: {}", dst_addr, e))?; + + let dst_addr = stream + .peer_addr() + .map_err(|e| zerror!("{}: {}", dst_addr, e))?; + + Ok((stream, src_addr, dst_addr)) + } + + /// Creates a TcpSocket with the provided config + fn socket_with_config(&self, addr: &SocketAddr) -> ZResult { + let socket = match addr { + SocketAddr::V4(_) => TcpSocket::new_v4(), + SocketAddr::V6(_) => TcpSocket::new_v6(), + }?; + + if let Some(iface) = self.iface { + zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; + } + if let Some(size) = self.tx_buffer_size { + socket.set_send_buffer_size(size)?; + } + if let Some(size) = self.rx_buffer_size { + socket.set_recv_buffer_size(size)?; + } + + Ok(socket) + } +} diff --git a/io/zenoh-link/src/lib.rs b/io/zenoh-link/src/lib.rs index b092aaf9d6..d621182b5b 100644 --- a/io/zenoh-link/src/lib.rs +++ b/io/zenoh-link/src/lib.rs @@ -34,7 +34,9 @@ use zenoh_link_serial::{LinkManagerUnicastSerial, SerialLocatorInspector, SERIAL #[cfg(feature = "transport_tcp")] pub use zenoh_link_tcp as tcp; #[cfg(feature = "transport_tcp")] -use zenoh_link_tcp::{LinkManagerUnicastTcp, TcpLocatorInspector, TCP_LOCATOR_PREFIX}; +use zenoh_link_tcp::{ + LinkManagerUnicastTcp, TcpConfigurator, TcpLocatorInspector, TCP_LOCATOR_PREFIX, +}; #[cfg(feature = "transport_tls")] pub use zenoh_link_tls as tls; #[cfg(feature = "transport_tls")] @@ -172,6 +174,8 @@ impl LocatorInspector { } #[derive(Default)] pub struct LinkConfigurator { + #[cfg(feature = "transport_tcp")] + tcp_inspector: TcpConfigurator, #[cfg(feature = "transport_quic")] quic_inspector: QuicConfigurator, #[cfg(feature = "transport_tls")] @@ -199,6 +203,13 @@ impl LinkConfigurator { errors.insert(proto, e); } }; + #[cfg(feature = "transport_tcp")] + { + insert_config( + TCP_LOCATOR_PREFIX.into(), + self.tcp_inspector.inspect_config(config), + ); + } #[cfg(feature = "transport_quic")] { insert_config( diff --git a/io/zenoh-links/zenoh-link-tcp/Cargo.toml b/io/zenoh-links/zenoh-link-tcp/Cargo.toml index 8a631bdfbc..c6bce930ea 100644 --- a/io/zenoh-links/zenoh-link-tcp/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tcp/Cargo.toml @@ -30,8 +30,8 @@ socket2 = { workspace = true } tokio = { workspace = true, features = ["net", "io-util", "rt", "time"] } tokio-util = { workspace = true, features = ["rt"] } tracing = {workspace = true} +zenoh-config = { workspace = true } zenoh-core = { workspace = true } zenoh-link-commons = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } -zenoh-util = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-tcp/src/lib.rs b/io/zenoh-links/zenoh-link-tcp/src/lib.rs index 0654943f4f..bd642ece83 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/lib.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/lib.rs @@ -29,7 +29,9 @@ use zenoh_protocol::{ use zenoh_result::{zerror, ZResult}; mod unicast; +mod utils; pub use unicast::*; +pub use utils::TcpConfigurator; // Default MTU (TCP PDU) in bytes. // NOTE: Since TCP is a byte-stream oriented transport, theoretically it has diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index e3eb9d9796..d61bf515dd 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -16,12 +16,12 @@ use std::{cell::UnsafeCell, convert::TryInto, fmt, net::SocketAddr, sync::Arc, t use async_trait::async_trait; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - net::{TcpListener, TcpSocket, TcpStream}, + net::{TcpListener, TcpStream}, }; use tokio_util::sync::CancellationToken; use zenoh_link_commons::{ - get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + get_ip_interface_names, tcp::TcpSocketConfig, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -29,9 +29,9 @@ use zenoh_protocol::{ }; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; -use super::{ - get_tcp_addrs, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU, TCP_LINGER_TIMEOUT, - TCP_LOCATOR_PREFIX, +use crate::{ + get_tcp_addrs, utils::TcpLinkConfig, TCP_ACCEPT_THROTTLE_TIME, TCP_DEFAULT_MTU, + TCP_LINGER_TIMEOUT, TCP_LOCATOR_PREFIX, }; pub struct LinkUnicastTcp { @@ -241,80 +241,22 @@ impl LinkManagerUnicastTcp { } } -impl LinkManagerUnicastTcp { - async fn new_link_inner( - &self, - dst_addr: &SocketAddr, - iface: Option<&str>, - ) -> ZResult<(TcpStream, SocketAddr, SocketAddr)> { - let socket = match dst_addr { - SocketAddr::V4(_) => TcpSocket::new_v4(), - SocketAddr::V6(_) => TcpSocket::new_v6(), - }?; - - if let Some(iface) = iface { - zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; - } - - // Build a TcpStream from TcpSocket - // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html - let stream = socket - .connect(*dst_addr) - .await - .map_err(|e| zerror!("{}: {}", dst_addr, e))?; - - let src_addr = stream - .local_addr() - .map_err(|e| zerror!("{}: {}", dst_addr, e))?; - - let dst_addr = stream - .peer_addr() - .map_err(|e| zerror!("{}: {}", dst_addr, e))?; - - Ok((stream, src_addr, dst_addr)) - } - - async fn new_listener_inner( - &self, - addr: &SocketAddr, - iface: Option<&str>, - ) -> ZResult<(TcpListener, SocketAddr)> { - let socket = match addr { - SocketAddr::V4(_) => TcpSocket::new_v4(), - SocketAddr::V6(_) => TcpSocket::new_v6(), - }?; - - if let Some(iface) = iface { - zenoh_util::net::set_bind_to_device_tcp_socket(&socket, iface)?; - } - - // Build a TcpListener from TcpSocket - // https://docs.rs/tokio/latest/tokio/net/struct.TcpSocket.html - socket.set_reuseaddr(true)?; - socket.bind(*addr).map_err(|e| zerror!("{}: {}", addr, e))?; - // backlog (the maximum number of pending connections are queued): 1024 - let listener = socket - .listen(1024) - .map_err(|e| zerror!("{}: {}", addr, e))?; - - let local_addr = listener - .local_addr() - .map_err(|e| zerror!("{}: {}", addr, e))?; - - Ok((listener, local_addr)) - } -} - #[async_trait] impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_link(&self, endpoint: EndPoint) -> ZResult { let dst_addrs = get_tcp_addrs(endpoint.address()).await?; let config = endpoint.config(); - let iface = config.get(BIND_INTERFACE); + + let link_config = TcpLinkConfig::new(&config)?; + let socket_config = TcpSocketConfig::new( + link_config.tx_buffer_size, + link_config.rx_buffer_size, + link_config.bind_iface, + ); let mut errs: Vec = vec![]; for da in dst_addrs { - match self.new_link_inner(&da, iface).await { + match socket_config.new_link(&da).await { Ok((stream, src_addr, dst_addr)) => { let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); return Ok(LinkUnicast(link)); @@ -339,11 +281,13 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { let addrs = get_tcp_addrs(endpoint.address()).await?; let config = endpoint.config(); - let iface = config.get(BIND_INTERFACE); + + let link_config = TcpLinkConfig::new(&config)?; + let socket_config: TcpSocketConfig<'_> = link_config.into(); let mut errs: Vec = vec![]; for da in addrs { - match self.new_listener_inner(&da, iface).await { + match socket_config.new_listener(&da) { Ok((socket, local_addr)) => { // Update the endpoint locator address endpoint = EndPoint::new( diff --git a/io/zenoh-links/zenoh-link-tcp/src/utils.rs b/io/zenoh-links/zenoh-link-tcp/src/utils.rs new file mode 100644 index 0000000000..4969693193 --- /dev/null +++ b/io/zenoh-links/zenoh-link-tcp/src/utils.rs @@ -0,0 +1,80 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use zenoh_config::Config as ZenohConfig; +use zenoh_link_commons::{ + tcp::TcpSocketConfig, ConfigurationInspector, BIND_INTERFACE, TCP_RX_BUFFER_SIZE, + TCP_TX_BUFFER_SIZE, +}; +use zenoh_protocol::core::{parameters, Config}; +use zenoh_result::{zerror, ZResult}; + +#[derive(Default, Clone, Copy, Debug)] +pub struct TcpConfigurator; + +impl ConfigurationInspector for TcpConfigurator { + fn inspect_config(&self, config: &ZenohConfig) -> ZResult { + let mut ps: Vec<(&str, &str)> = vec![]; + let c = config.transport().link(); + + let rx_buffer_size; + if let Some(size) = c.tcp_rx_buffer { + rx_buffer_size = size.to_string(); + ps.push((TCP_RX_BUFFER_SIZE, &rx_buffer_size)); + } + let tx_buffer_size; + if let Some(size) = c.tcp_tx_buffer { + tx_buffer_size = size.to_string(); + ps.push((TCP_TX_BUFFER_SIZE, &tx_buffer_size)); + } + + Ok(parameters::from_iter(ps.drain(..))) + } +} + +pub(crate) struct TcpLinkConfig<'a> { + pub(crate) rx_buffer_size: Option, + pub(crate) tx_buffer_size: Option, + pub(crate) bind_iface: Option<&'a str>, +} + +impl<'a> TcpLinkConfig<'a> { + pub(crate) fn new(config: &'a Config) -> ZResult { + let mut tcp_config = Self { + rx_buffer_size: None, + tx_buffer_size: None, + bind_iface: config.get(BIND_INTERFACE), + }; + + if let Some(size) = config.get(TCP_RX_BUFFER_SIZE) { + tcp_config.rx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP read buffer size argument: {}", size))?, + ); + }; + if let Some(size) = config.get(TCP_TX_BUFFER_SIZE) { + tcp_config.tx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP write buffer size argument: {}", size))?, + ); + }; + + Ok(tcp_config) + } +} + +impl<'a> From> for TcpSocketConfig<'a> { + fn from(value: TcpLinkConfig<'a>) -> Self { + Self::new(value.tx_buffer_size, value.rx_buffer_size, value.bind_iface) + } +} diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 046288800e..62250d354a 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -324,29 +324,17 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { let connector = TlsConnector::from(config); // Initialize the TcpStream - let tcp_stream = TcpStream::connect(addr).await.map_err(|e| { - zerror!( - "Can not create a new TLS link bound to {:?}: {}", - server_name, - e - ) - })?; - - let src_addr = tcp_stream.local_addr().map_err(|e| { - zerror!( - "Can not create a new TLS link bound to {:?}: {}", - server_name, - e - ) - })?; - - let dst_addr = tcp_stream.peer_addr().map_err(|e| { - zerror!( - "Can not create a new TLS link bound to {:?}: {}", - server_name, - e - ) - })?; + let (tcp_stream, src_addr, dst_addr) = client_config + .tcp_socket_config + .new_link(&addr) + .await + .map_err(|e| { + zerror!( + "Can not create a new TLS link bound to {:?}: {}", + server_name, + e + ) + })?; // Initialize the TlsStream let tls_stream = connector @@ -404,13 +392,11 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { .map_err(|e| zerror!("Cannot create a new TLS listener on {addr}. {e}"))?; // Initialize the TcpListener - let socket = TcpListener::bind(addr) - .await + let (socket, local_addr) = tls_server_config + .tcp_socket_config + .new_listener(&addr) .map_err(|e| zerror!("Can not create a new TLS listener on {}: {}", addr, e))?; - let local_addr = socket - .local_addr() - .map_err(|e| zerror!("Can not create a new TLS listener on {}: {}", addr, e))?; let local_port = local_addr.port(); // Initialize the TlsAcceptor diff --git a/io/zenoh-links/zenoh-link-tls/src/utils.rs b/io/zenoh-links/zenoh-link-tls/src/utils.rs index 74e7cc9e51..e85be4573f 100644 --- a/io/zenoh-links/zenoh-link-tls/src/utils.rs +++ b/io/zenoh-links/zenoh-link-tls/src/utils.rs @@ -31,7 +31,10 @@ use rustls_pki_types::ServerName; use secrecy::ExposeSecret; use webpki::anchor_from_trusted_cert; use zenoh_config::Config as ZenohConfig; -use zenoh_link_commons::{tls::WebPkiVerifierAnyServerName, ConfigurationInspector}; +use zenoh_link_commons::{ + tcp::TcpSocketConfig, tls::WebPkiVerifierAnyServerName, ConfigurationInspector, + TCP_RX_BUFFER_SIZE, TCP_TX_BUFFER_SIZE, +}; use zenoh_protocol::core::{ endpoint::{Address, Config}, parameters, @@ -150,18 +153,32 @@ impl ConfigurationInspector for TlsConfigurator { false => ps.push((TLS_CLOSE_LINK_ON_EXPIRATION, "false")), } + let link_c = config.transport().link(); + let rx_buffer_size; + if let Some(size) = link_c.tcp_rx_buffer { + rx_buffer_size = size.to_string(); + ps.push((TCP_RX_BUFFER_SIZE, &rx_buffer_size)); + } + + let tx_buffer_size; + if let Some(size) = link_c.tcp_tx_buffer { + tx_buffer_size = size.to_string(); + ps.push((TCP_TX_BUFFER_SIZE, &tx_buffer_size)); + } + Ok(parameters::from_iter(ps.drain(..))) } } -pub(crate) struct TlsServerConfig { +pub(crate) struct TlsServerConfig<'a> { pub(crate) server_config: ServerConfig, pub(crate) tls_handshake_timeout: Duration, pub(crate) tls_close_link_on_expiration: bool, + pub(crate) tcp_socket_config: TcpSocketConfig<'a>, } -impl TlsServerConfig { - pub async fn new(config: &Config<'_>) -> ZResult { +impl<'a> TlsServerConfig<'a> { + pub async fn new(config: &Config<'a>) -> ZResult { let tls_server_client_auth: bool = match config.get(TLS_ENABLE_MTLS) { Some(s) => s .parse() @@ -241,10 +258,27 @@ impl TlsServerConfig { .unwrap_or(config::TLS_HANDSHAKE_TIMEOUT_MS_DEFAULT), ); + let mut tcp_rx_buffer_size = None; + if let Some(size) = config.get(TCP_RX_BUFFER_SIZE) { + tcp_rx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP read buffer size argument: {}", size))?, + ); + }; + let mut tcp_tx_buffer_size = None; + if let Some(size) = config.get(TCP_TX_BUFFER_SIZE) { + tcp_tx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP write buffer size argument: {}", size))?, + ); + }; + Ok(TlsServerConfig { server_config: sc, tls_handshake_timeout, tls_close_link_on_expiration, + // TODO: add interface binding + tcp_socket_config: TcpSocketConfig::new(tcp_tx_buffer_size, tcp_rx_buffer_size, None), }) } @@ -269,13 +303,14 @@ impl TlsServerConfig { } } -pub(crate) struct TlsClientConfig { +pub(crate) struct TlsClientConfig<'a> { pub(crate) client_config: ClientConfig, pub(crate) tls_close_link_on_expiration: bool, + pub(crate) tcp_socket_config: TcpSocketConfig<'a>, } -impl TlsClientConfig { - pub async fn new(config: &Config<'_>) -> ZResult { +impl<'a> TlsClientConfig<'a> { + pub async fn new(config: &Config<'a>) -> ZResult { let tls_client_server_auth: bool = match config.get(TLS_ENABLE_MTLS) { Some(s) => s .parse() @@ -386,9 +421,27 @@ impl TlsClientConfig { .with_no_client_auth() } }; + + let mut tcp_rx_buffer_size = None; + if let Some(size) = config.get(TCP_RX_BUFFER_SIZE) { + tcp_rx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP read buffer size argument: {}", size))?, + ); + }; + let mut tcp_tx_buffer_size = None; + if let Some(size) = config.get(TCP_TX_BUFFER_SIZE) { + tcp_tx_buffer_size = Some( + size.parse() + .map_err(|_| zerror!("Unknown TCP write buffer size argument: {}", size))?, + ); + }; + Ok(TlsClientConfig { client_config: cc, tls_close_link_on_expiration, + // TODO: add interface binding + tcp_socket_config: TcpSocketConfig::new(tcp_tx_buffer_size, tcp_rx_buffer_size, None), }) } diff --git a/zenoh/tests/tcp_buffers.rs b/zenoh/tests/tcp_buffers.rs new file mode 100644 index 0000000000..8d75682507 --- /dev/null +++ b/zenoh/tests/tcp_buffers.rs @@ -0,0 +1,111 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::{Config, Wait}; + +#[test] +fn buffer_size_config() { + let mut config = Config::default(); + config + .insert_json5( + "transport/link", + r#" + { + tcp_tx_buffer: 65000, + tcp_rx_buffer: 65000, + } + "#, + ) + .unwrap(); + + config + .insert_json5("listen/endpoints", r#"["tcp/[::]:0"]"#) + .unwrap(); + + zenoh::open(config).wait().unwrap(); +} + +#[test] +fn buffer_size_endpoint() { + let mut config = Config::default(); + config + .insert_json5( + "listen/endpoints", + r#"["tcp/[::]:0#tcp_tx_buffer=65000;tcp_rx_buffer=65000"]"#, + ) + .unwrap(); + + zenoh::open(config).wait().unwrap(); +} + +#[cfg(target_os = "macos")] +#[test] +#[should_panic(expected = "Can not create a new TCP listener")] +fn buffer_size_override() { + buffer_size_config_override(); +} + +#[cfg(not(target_os = "macos"))] +#[test] +fn buffer_size_override() { + buffer_size_config_override(); +} + +fn buffer_size_config_override() { + let mut config = Config::default(); + config + .insert_json5( + "transport/link", + r#" + { + tcp_tx_buffer: 0, + tcp_rx_buffer: 0, + } + "#, + ) + .unwrap(); + + config + .insert_json5( + "listen/endpoints", + r#"["tcp/[::]:0#tcp_tx_buffer=65000;tcp_rx_buffer=65000"]"#, + ) + .unwrap(); + + zenoh::open(config).wait().unwrap(); +} + +#[cfg(target_os = "macos")] +#[test] +#[should_panic(expected = "Can not create a new TCP listener")] +fn buffer_size_zero() { + listen_zero_buffers(); +} + +#[cfg(not(target_os = "macos"))] +#[test] +fn buffer_size_zero() { + listen_zero_buffers(); +} + +fn listen_zero_buffers() { + let mut config = Config::default(); + config + .insert_json5( + "listen/endpoints", + r#"["tcp/[::]:0#tcp_tx_buffer=0;tcp_rx_buffer=0"]"#, + ) + .unwrap(); + zenoh::open(config).wait().unwrap(); +}