Skip to content

Commit

Permalink
Merge pull request #1597 from ZettaScaleLabs/feat/fragment_marker
Browse files Browse the repository at this point in the history
feat: add start/stop extension markers for fragment chain
Mallets authored Dec 5, 2024
2 parents b3ccf82 + 13cf0de commit 64e8caa
Showing 21 changed files with 899 additions and 23 deletions.
38 changes: 36 additions & 2 deletions commons/zenoh-codec/src/transport/fragment.rs
Original file line number Diff line number Diff line change
@@ -39,6 +39,8 @@ where
more,
sn,
ext_qos,
ext_first,
ext_drop,
} = x;

// Header
@@ -49,7 +51,10 @@ where
if *more {
header |= flag::M;
}
if ext_qos != &ext::QoSType::DEFAULT {
let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8
+ ext_first.is_some() as u8
+ ext_drop.is_some() as u8;
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;
@@ -59,7 +64,16 @@ where

// Extensions
if ext_qos != &ext::QoSType::DEFAULT {
self.write(&mut *writer, (*ext_qos, false))?;
n_exts -= 1;
self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
}
if let Some(first) = ext_first {
n_exts -= 1;
self.write(&mut *writer, (first, n_exts != 0))?
}
if let Some(drop) = ext_drop {
n_exts -= 1;
self.write(&mut *writer, (drop, n_exts != 0))?
}

Ok(())
@@ -99,6 +113,8 @@ where

// Extensions
let mut ext_qos = ext::QoSType::DEFAULT;
let mut ext_first = None;
let mut ext_drop = None;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
@@ -110,6 +126,16 @@ where
ext_qos = q;
has_ext = ext;
}
ext::First::ID => {
let (first, ext): (ext::First, bool) = eodec.read(&mut *reader)?;
ext_first = Some(first);
has_ext = ext;
}
ext::Drop::ID => {
let (drop, ext): (ext::Drop, bool) = eodec.read(&mut *reader)?;
ext_drop = Some(drop);
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "Fragment", ext)?;
}
@@ -121,6 +147,8 @@ where
more,
sn,
ext_qos,
ext_first,
ext_drop,
})
}
}
@@ -139,6 +167,8 @@ where
sn,
payload,
ext_qos,
ext_first,
ext_drop,
} = x;

// Header
@@ -147,6 +177,8 @@ where
more: *more,
sn: *sn,
ext_qos: *ext_qos,
ext_first: *ext_first,
ext_drop: *ext_drop,
};
self.write(&mut *writer, &header)?;

@@ -185,6 +217,8 @@ where
more: header.more,
sn: header.sn,
ext_qos: header.ext_qos,
ext_first: header.ext_first,
ext_drop: header.ext_drop,
payload,
})
}
30 changes: 28 additions & 2 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
@@ -64,7 +65,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;

#[cfg(feature = "shared-memory")]
{
@@ -125,6 +127,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
@@ -186,6 +192,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
@@ -228,6 +235,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitSyn", ext)?;
}
@@ -248,6 +260,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
@@ -275,6 +288,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
} = x;

// Header
@@ -287,7 +301,8 @@ where
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
+ (ext_compression.is_some() as u8);
+ (ext_compression.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;

#[cfg(feature = "shared-memory")]
{
@@ -351,6 +366,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (compression, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
@@ -415,6 +434,7 @@ where
let mut ext_mlink = None;
let mut ext_lowlatency = None;
let mut ext_compression = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
@@ -457,6 +477,11 @@ where
ext_compression = Some(q);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "InitAck", ext)?;
}
@@ -478,6 +503,7 @@ where
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
})
}
}
16 changes: 15 additions & 1 deletion commons/zenoh-codec/src/transport/join.rs
Original file line number Diff line number Diff line change
@@ -150,6 +150,7 @@ where
next_sn,
ext_qos,
ext_shm,
ext_patch,
} = x;

// Header
@@ -160,7 +161,9 @@ where
if resolution != &Resolution::default() || batch_size != &batch_size::MULTICAST {
header |= flag::S;
}
let mut n_exts = (ext_qos.is_some() as u8) + (ext_shm.is_some() as u8);
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_shm.is_some() as u8)
+ (*ext_patch != ext::PatchType::NONE) as u8;
if n_exts != 0 {
header |= flag::Z;
}
@@ -201,6 +204,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (shm, n_exts != 0))?;
}
if *ext_patch != ext::PatchType::NONE {
n_exts -= 1;
self.write(&mut *writer, (*ext_patch, n_exts != 0))?;
}

Ok(())
}
@@ -264,6 +271,7 @@ where
// Extensions
let mut ext_qos = None;
let mut ext_shm = None;
let mut ext_patch = ext::PatchType::NONE;

let mut has_ext = imsg::has_flag(self.header, flag::Z);
while has_ext {
@@ -280,6 +288,11 @@ where
ext_shm = Some(s);
has_ext = ext;
}
ext::Patch::ID => {
let (p, ext): (ext::PatchType, bool) = eodec.read(&mut *reader)?;
ext_patch = p;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "Join", ext)?;
}
@@ -296,6 +309,7 @@ where
next_sn,
ext_qos,
ext_shm,
ext_patch,
})
}
}
40 changes: 40 additions & 0 deletions commons/zenoh-codec/src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -176,3 +176,43 @@ where
Ok((ext.into(), more))
}
}

// Extensions: Patch
impl<W, const ID: u8> WCodec<(ext::PatchType<{ ID }>, bool), &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (ext::PatchType<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ext: ZExtZ64<{ ID }> = x.into();

self.write(&mut *writer, (&ext, more))
}
}

impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);
codec.read(reader)
}
}

impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
Ok((ext.into(), more))
}
}
24 changes: 23 additions & 1 deletion commons/zenoh-protocol/src/transport/fragment.rs
Original file line number Diff line number Diff line change
@@ -75,14 +75,26 @@ pub struct Fragment {
pub sn: TransportSn,
pub payload: ZSlice,
pub ext_qos: ext::QoSType,
pub ext_first: Option<ext::First>,
pub ext_drop: Option<ext::Drop>,
}

// Extensions
pub mod ext {
use crate::{common::ZExtZ64, zextz64};
use crate::{
common::{ZExtUnit, ZExtZ64},
zextunit, zextz64,
};

pub type QoS = zextz64!(0x1, true);
pub type QoSType = crate::transport::ext::QoSType<{ QoS::ID }>;

/// # Start extension
/// Mark the first fragment of a fragmented message
pub type First = zextunit!(0x2, false);
/// # Stop extension
/// Indicate that the remaining fragments has been dropped
pub type Drop = zextunit!(0x3, false);
}

impl Fragment {
@@ -97,13 +109,17 @@ impl Fragment {
let sn: TransportSn = rng.gen();
let payload = ZSlice::rand(rng.gen_range(8..128));
let ext_qos = ext::QoSType::rand();
let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);

Fragment {
reliability,
sn,
more,
payload,
ext_qos,
ext_first,
ext_drop,
}
}
}
@@ -115,6 +131,8 @@ pub struct FragmentHeader {
pub more: bool,
pub sn: TransportSn,
pub ext_qos: ext::QoSType,
pub ext_first: Option<ext::First>,
pub ext_drop: Option<ext::Drop>,
}

impl FragmentHeader {
@@ -128,12 +146,16 @@ impl FragmentHeader {
let more = rng.gen_bool(0.5);
let sn: TransportSn = rng.gen();
let ext_qos = ext::QoSType::rand();
let ext_first = rng.gen_bool(0.5).then(ext::First::rand);
let ext_drop = rng.gen_bool(0.5).then(ext::Drop::rand);

FragmentHeader {
reliability,
more,
sn,
ext_qos,
ext_first,
ext_drop,
}
}
}
13 changes: 13 additions & 0 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
@@ -131,6 +131,7 @@ pub struct InitSyn {
pub ext_mlink: Option<ext::MultiLink>,
pub ext_lowlatency: Option<ext::LowLatency>,
pub ext_compression: Option<ext::Compression>,
pub ext_patch: ext::PatchType,
}

// Extensions
@@ -165,6 +166,13 @@ pub mod ext {
/// # Compression extension
/// Used to negotiate the use of compression on the link
pub type Compression = zextunit!(0x6, false);

/// # Patch extension
/// Used to negotiate the patch version of the protocol
/// if not present (or 0), then protocol as released with 1.0.0
/// if >= 1, then fragmentation first/drop markers
pub type Patch = zextz64!(0x7, false);
pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>;
}

impl InitSyn {
@@ -189,6 +197,7 @@ impl InitSyn {
let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_patch = ext::PatchType::rand();

Self {
version,
@@ -204,6 +213,7 @@ impl InitSyn {
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
}
}
}
@@ -234,6 +244,7 @@ pub struct InitAck {
pub ext_mlink: Option<ext::MultiLink>,
pub ext_lowlatency: Option<ext::LowLatency>,
pub ext_compression: Option<ext::Compression>,
pub ext_patch: ext::PatchType,
}

impl InitAck {
@@ -263,6 +274,7 @@ impl InitAck {
let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_compression = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_patch = ext::PatchType::rand();

Self {
version,
@@ -279,6 +291,7 @@ impl InitAck {
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
}
}
}
15 changes: 14 additions & 1 deletion commons/zenoh-protocol/src/transport/join.rs
Original file line number Diff line number Diff line change
@@ -102,6 +102,7 @@ pub struct Join {
pub next_sn: PrioritySn,
pub ext_qos: Option<ext::QoSType>,
pub ext_shm: Option<ext::Shm>,
pub ext_patch: ext::PatchType,
}

pub mod flag {
@@ -115,7 +116,10 @@ pub mod ext {
use alloc::boxed::Box;

use super::{Priority, PrioritySn};
use crate::{common::ZExtZBuf, zextzbuf};
use crate::{
common::{ZExtZ64, ZExtZBuf},
zextz64, zextzbuf,
};

/// # QoS extension
/// Used to announce next sn when QoS is enabled
@@ -125,6 +129,13 @@ pub mod ext {
/// # Shm extension
/// Used to advertise shared memory capabilities
pub type Shm = zextzbuf!(0x2, true);

/// # Patch extension
/// Used to negotiate the patch version of the protocol
/// if not present (or 0), then protocol as released with 1.0.0
/// if >= 1, then fragmentation first/drop markers
pub type Patch = zextz64!(0x7, false); // use the same id as Init
pub type PatchType = crate::transport::ext::PatchType<{ Patch::ID }>;
}

impl Join {
@@ -151,6 +162,7 @@ impl Join {
.gen_bool(0.5)
.then_some(Box::new([PrioritySn::rand(); Priority::NUM]));
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_patch = ext::PatchType::rand();

Self {
version,
@@ -162,6 +174,7 @@ impl Join {
next_sn,
ext_qos,
ext_shm,
ext_patch,
}
}
}
38 changes: 38 additions & 0 deletions commons/zenoh-protocol/src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -311,4 +311,42 @@ pub mod ext {
ZExtZ64::new(ext.inner as u64)
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct PatchType<const ID: u8>(u8);

impl<const ID: u8> PatchType<ID> {
pub const NONE: Self = Self(0);
pub const CURRENT: Self = Self(1);

pub fn new(int: u8) -> Self {
Self(int)
}

pub fn raw(self) -> u8 {
self.0
}

pub fn has_fragmentation_markers(&self) -> bool {
self.0 >= 1
}

#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
Self(rand::thread_rng().gen())
}
}

impl<const ID: u8> From<ZExtZ64<ID>> for PatchType<ID> {
fn from(ext: ZExtZ64<ID>) -> Self {
Self(ext.value as u8)
}
}

impl<const ID: u8> From<PatchType<ID>> for ZExtZ64<ID> {
fn from(ext: PatchType<ID>) -> Self {
ZExtZ64::new(ext.0 as u64)
}
}
}
15 changes: 15 additions & 0 deletions io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
@@ -201,6 +201,9 @@ pub struct WBatch {
// Statistics related to this batch
#[cfg(feature = "stats")]
pub stats: WBatchStats,
// an ephemeral batch will not be recycled in the pipeline
// it can be used to push a stop fragment when no batch are available
pub ephemeral: bool,
}

impl WBatch {
@@ -209,6 +212,7 @@ impl WBatch {
buffer: BBuf::with_capacity(config.mtu as usize),
codec: Zenoh080Batch::new(),
config,
ephemeral: false,
#[cfg(feature = "stats")]
stats: WBatchStats::default(),
};
@@ -219,6 +223,17 @@ impl WBatch {
batch
}

pub fn new_ephemeral(config: BatchConfig) -> Self {
Self {
ephemeral: true,
..Self::new(config)
}
}

pub fn is_ephemeral(&self) -> bool {
self.ephemeral
}

/// Verify that the [`WBatch`] has no serialized bytes.
#[inline(always)]
pub fn is_empty(&self) -> bool {
29 changes: 24 additions & 5 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ use zenoh_protocol::{
core::Priority,
network::NetworkMessage,
transport::{
fragment,
fragment::FragmentHeader,
frame::{self, FrameHeader},
AtomicBatchSize, BatchSize, TransportMessage,
@@ -232,6 +233,8 @@ struct StageIn {
mutex: StageInMutex,
fragbuf: ZBuf,
batching: bool,
// used for stop fragment
batch_config: BatchConfig,
}

impl StageIn {
@@ -352,19 +355,32 @@ impl StageIn {
more: true,
sn,
ext_qos: frame.ext_qos,
ext_first: Some(fragment::ext::First::new()),
ext_drop: None,
};
let mut reader = self.fragbuf.reader();
while reader.can_read() {
// Get the current serialization batch
// If deadline is reached, sequence number is incremented with `SeqNumGenerator::get`
// in order to break the fragment chain already sent.
batch = zgetbatch_rets!(let _ = tch.sn.get());
batch = zgetbatch_rets!({
// If no fragment has been sent, the sequence number is just reset
if fragment.ext_first.is_some() {
tch.sn.set(sn).unwrap()
// Otherwise, an ephemeral batch is created to send the stop fragment
} else {
let mut batch = WBatch::new_ephemeral(self.batch_config);
self.fragbuf.clear();
fragment.ext_drop = Some(fragment::ext::Drop::new());
let _ = batch.encode((&mut self.fragbuf.reader(), &mut fragment));
self.s_out.move_batch(batch);
}
});

// Serialize the message fragment
match batch.encode((&mut reader, &mut fragment)) {
Ok(_) => {
// Update the SN
fragment.sn = tch.sn.get();
fragment.ext_first = None;
// Move the serialization batch into the OUT pipeline
self.s_out.move_batch(batch);
}
@@ -675,6 +691,7 @@ impl TransmissionPipeline {
},
fragbuf: ZBuf::empty(),
batching: config.batching_enabled,
batch_config: config.batch,
}));

// The stage out for this priority
@@ -863,8 +880,10 @@ impl TransmissionPipelineConsumer {
}

pub(crate) fn refill(&mut self, batch: WBatch, priority: Priority) {
self.stage_out[priority as usize].refill(batch);
self.status.set_congested(priority, false);
if !batch.is_ephemeral() {
self.stage_out[priority as usize].refill(batch);
self.status.set_congested(priority, false);
}
}

pub(crate) fn drain(&mut self) -> Vec<(WBatch, usize)> {
5 changes: 4 additions & 1 deletion io/zenoh-transport/src/multicast/link.rs
Original file line number Diff line number Diff line change
@@ -24,7 +24,9 @@ use zenoh_core::{zcondfeat, zlock};
use zenoh_link::{LinkMulticast, Locator};
use zenoh_protocol::{
core::{Bits, Priority, Resolution, WhatAmI, ZenohIdProto},
transport::{BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn},
transport::{
join::ext::PatchType, BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn,
},
};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::{RecyclingObject, RecyclingObjectPool, Signal};
@@ -495,6 +497,7 @@ async fn tx_task(
next_sn,
ext_qos,
ext_shm: None,
ext_patch: PatchType::CURRENT
}
.into();

25 changes: 22 additions & 3 deletions io/zenoh-transport/src/multicast/rx.rs
Original file line number Diff line number Diff line change
@@ -166,7 +166,7 @@ impl TransportMulticastInner {
Reliability::BestEffort => zlock!(c.best_effort),
};

if !self.verify_sn(sn, &mut guard)? {
if !self.verify_sn("Frame", sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
@@ -183,6 +183,8 @@ impl TransportMulticastInner {
more,
sn,
ext_qos,
ext_first,
ext_drop,
payload,
} = fragment;

@@ -205,10 +207,25 @@ impl TransportMulticastInner {
Reliability::BestEffort => zlock!(c.best_effort),
};

if !self.verify_sn(sn, &mut guard)? {
if !self.verify_sn("Fragment", sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
if peer.patch.has_fragmentation_markers() {
if ext_first.is_some() {
guard.defrag.clear();
} else if guard.defrag.is_empty() {
tracing::trace!(
"Transport: {}. First fragment received without start marker.",
self.manager.config.zid,
);
return Ok(());
}
if ext_drop.is_some() {
guard.defrag.clear();
return Ok(());
}
}
if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
@@ -236,14 +253,16 @@ impl TransportMulticastInner {

fn verify_sn(
&self,
message_type: &str,
sn: TransportSn,
guard: &mut MutexGuard<'_, TransportChannelRx>,
) -> ZResult<bool> {
let precedes = guard.sn.precedes(sn)?;
if !precedes {
tracing::debug!(
"Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.",
"Transport: {}. {} with invalid SN dropped: {}. Expected: {}.",
self.manager.config.zid,
message_type,
sn,
guard.sn.next()
);
5 changes: 4 additions & 1 deletion io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{
cmp::min,
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
@@ -25,7 +26,7 @@ use zenoh_core::{zcondfeat, zread, zwrite};
use zenoh_link::{Link, Locator};
use zenoh_protocol::{
core::{Bits, Field, Priority, Resolution, WhatAmI, ZenohIdProto},
transport::{batch_size, close, Close, Join, TransportMessage},
transport::{batch_size, close, join::ext::PatchType, Close, Join, TransportMessage},
};
use zenoh_result::{bail, ZResult};
use zenoh_task::TaskController;
@@ -61,6 +62,7 @@ pub(super) struct TransportMulticastPeer {
token: CancellationToken,
pub(super) priority_rx: Box<[TransportPriorityRx]>,
pub(super) handler: Arc<dyn TransportPeerEventHandler>,
pub(super) patch: PatchType,
}

impl TransportMulticastPeer {
@@ -415,6 +417,7 @@ impl TransportMulticastInner {
token,
priority_rx,
handler,
patch: min(PatchType::CURRENT, join.ext_patch),
};
zwrite!(self.peers).insert(locator.clone(), peer);

23 changes: 22 additions & 1 deletion io/zenoh-transport/src/unicast/establishment/accept.rs
Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@ struct StateTransport {
#[cfg(feature = "shared-memory")]
ext_shm: ext::shm::StateAccept,
ext_lowlatency: ext::lowlatency::StateAccept,
ext_patch: ext::patch::StateAccept,
}

#[cfg(any(feature = "transport_auth", feature = "transport_compression"))]
@@ -143,6 +144,7 @@ struct AcceptLink<'a> {
ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>,
#[cfg(feature = "transport_compression")]
ext_compression: ext::compression::CompressionFsm<'a>,
ext_patch: ext::patch::PatchFsm<'a>,
}

#[async_trait]
@@ -268,6 +270,12 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
.await
.map_err(|e| (e, Some(close::reason::GENERIC)))?;

// Extension Patch
self.ext_patch
.recv_init_syn((&mut state.transport.ext_patch, init_syn.ext_patch))
.await
.map_err(|e| (e, Some(close::reason::GENERIC)))?;

let output = RecvInitSynOut {
other_zid: init_syn.zid,
other_whatami: init_syn.whatami,
@@ -330,7 +338,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
.await
.map_err(|e| (e, Some(close::reason::GENERIC)))?;

// Extension MultiLink
// Extension Compression
let ext_compression = zcondfeat!(
"transport_compression",
self.ext_compression
@@ -340,6 +348,13 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
None
);

// Extension Patch
let ext_patch = self
.ext_patch
.send_init_ack(&state.transport.ext_patch)
.await
.map_err(|e| (e, Some(close::reason::GENERIC)))?;

// Create the cookie
let cookie_nonce: u64 = zasynclock!(self.prng).gen();
let cookie = Cookie {
@@ -358,6 +373,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
ext_lowlatency: state.transport.ext_lowlatency,
#[cfg(feature = "transport_compression")]
ext_compression: state.link.ext_compression,
ext_patch: state.transport.ext_patch,
};

let mut encrypted = vec![];
@@ -391,6 +407,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
}
.into();

@@ -491,6 +508,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> {
#[cfg(feature = "shared-memory")]
ext_shm: cookie.ext_shm,
ext_lowlatency: cookie.ext_lowlatency,
ext_patch: cookie.ext_patch,
},
#[cfg(any(feature = "transport_auth", feature = "transport_compression"))]
link: StateLink {
@@ -681,6 +699,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -
ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(),
#[cfg(feature = "transport_compression")]
ext_compression: ext::compression::CompressionFsm::new(),
ext_patch: ext::patch::PatchFsm::new(),
};

// Init handshake
@@ -719,6 +738,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -
ext_lowlatency: ext::lowlatency::StateAccept::new(
manager.config.unicast.is_lowlatency,
),
ext_patch: ext::patch::StateAccept::new(),
},
#[cfg(any(feature = "transport_auth", feature = "transport_compression"))]
link: StateLink {
@@ -786,6 +806,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -
is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(),
#[cfg(feature = "auth_usrpwd")]
auth_id: osyn_out.other_auth_id,
patch: state.transport.ext_patch.get(),
};

let a_config = TransportLinkUnicastConfig {
5 changes: 5 additions & 0 deletions io/zenoh-transport/src/unicast/establishment/cookie.rs
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ pub(crate) struct Cookie {
pub(crate) ext_lowlatency: ext::lowlatency::StateAccept,
#[cfg(feature = "transport_compression")]
pub(crate) ext_compression: ext::compression::StateAccept,
pub(crate) ext_patch: ext::patch::StateAccept,
}

impl<W> WCodec<&Cookie, &mut W> for Zenoh080
@@ -70,6 +71,7 @@ where
self.write(&mut *writer, &x.ext_lowlatency)?;
#[cfg(feature = "transport_compression")]
self.write(&mut *writer, &x.ext_compression)?;
self.write(&mut *writer, &x.ext_patch)?;

Ok(())
}
@@ -100,6 +102,7 @@ where
let ext_lowlatency: ext::lowlatency::StateAccept = self.read(&mut *reader)?;
#[cfg(feature = "transport_compression")]
let ext_compression: ext::compression::StateAccept = self.read(&mut *reader)?;
let ext_patch: ext::patch::StateAccept = self.read(&mut *reader)?;

let cookie = Cookie {
zid,
@@ -117,6 +120,7 @@ where
ext_lowlatency,
#[cfg(feature = "transport_compression")]
ext_compression,
ext_patch,
};

Ok(cookie)
@@ -188,6 +192,7 @@ impl Cookie {
ext_lowlatency: ext::lowlatency::StateAccept::rand(),
#[cfg(feature = "transport_compression")]
ext_compression: ext::compression::StateAccept::rand(),
ext_patch: ext::patch::StateAccept::rand(),
}
}
}
1 change: 1 addition & 0 deletions io/zenoh-transport/src/unicast/establishment/ext/mod.rs
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ pub(crate) mod compression;
pub(crate) mod lowlatency;
#[cfg(feature = "transport_multilink")]
pub(crate) mod multilink;
pub(crate) mod patch;
pub(crate) mod qos;
#[cfg(feature = "shared-memory")]
pub(crate) mod shm;
203 changes: 203 additions & 0 deletions io/zenoh-transport/src/unicast/establishment/ext/patch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{cmp::min, marker::PhantomData};

use async_trait::async_trait;
use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
};
use zenoh_codec::{RCodec, WCodec, Zenoh080};
use zenoh_protocol::transport::init::ext::PatchType;
use zenoh_result::{bail, Error as ZError};

use crate::unicast::establishment::{AcceptFsm, OpenFsm};

// Extension Fsm
pub(crate) struct PatchFsm<'a> {
_a: PhantomData<&'a ()>,
}

impl PatchFsm<'_> {
pub(crate) const fn new() -> Self {
Self { _a: PhantomData }
}
}

/*************************************/
/* OPEN */
/*************************************/
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct StateOpen {
patch: PatchType,
}

impl StateOpen {
pub(crate) const fn new() -> Self {
Self {
patch: PatchType::NONE,
}
}

pub(crate) const fn get(&self) -> PatchType {
self.patch
}
}

#[async_trait]
impl<'a> OpenFsm for &'a PatchFsm<'a> {
type Error = ZError;

type SendInitSynIn = &'a StateOpen;
type SendInitSynOut = PatchType;
async fn send_init_syn(
self,
_state: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
Ok(PatchType::CURRENT)
}

type RecvInitAckIn = (&'a mut StateOpen, PatchType);
type RecvInitAckOut = ();
async fn recv_init_ack(
self,
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
let (state, other_ext) = input;
if other_ext > PatchType::CURRENT {
bail!(
"Acceptor patch should be lesser or equal to {current:?}, found {other:?}",
current = PatchType::CURRENT.raw(),
other = other_ext.raw(),
);
}
state.patch = other_ext;
Ok(())
}

type SendOpenSynIn = &'a StateOpen;
type SendOpenSynOut = ();
async fn send_open_syn(
self,
_state: Self::SendOpenSynIn,
) -> Result<Self::SendOpenSynOut, Self::Error> {
unimplemented!("There is no patch extension in OPEN")
}

type RecvOpenAckIn = (&'a mut StateOpen, ());
type RecvOpenAckOut = ();
async fn recv_open_ack(
self,
_state: Self::RecvOpenAckIn,
) -> Result<Self::RecvOpenAckOut, Self::Error> {
unimplemented!("There is no patch extension in OPEN")
}
}

/*************************************/
/* ACCEPT */
/*************************************/
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct StateAccept {
patch: PatchType,
}

impl StateAccept {
pub(crate) const fn new() -> Self {
Self {
patch: PatchType::NONE,
}
}

pub(crate) const fn get(&self) -> PatchType {
self.patch
}

#[cfg(test)]
pub(crate) fn rand() -> Self {
Self {
patch: PatchType::rand(),
}
}
}

// Codec
impl<W> WCodec<&StateAccept, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &StateAccept) -> Self::Output {
let raw = x.patch.raw();
self.write(&mut *writer, raw)?;
Ok(())
}
}

impl<R> RCodec<StateAccept, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<StateAccept, Self::Error> {
let raw: u8 = self.read(&mut *reader)?;
let patch = PatchType::new(raw);
Ok(StateAccept { patch })
}
}

#[async_trait]
impl<'a> AcceptFsm for &'a PatchFsm<'a> {
type Error = ZError;

type RecvInitSynIn = (&'a mut StateAccept, PatchType);
type RecvInitSynOut = ();
async fn recv_init_syn(
self,
input: Self::RecvInitSynIn,
) -> Result<Self::RecvInitSynOut, Self::Error> {
let (state, other_ext) = input;
state.patch = other_ext;
Ok(())
}

type SendInitAckIn = &'a StateAccept;
type SendInitAckOut = PatchType;
async fn send_init_ack(
self,
state: Self::SendInitAckIn,
) -> Result<Self::SendInitAckOut, Self::Error> {
Ok(min(PatchType::CURRENT, state.patch))
}

type RecvOpenSynIn = (&'a mut StateAccept, ());
type RecvOpenSynOut = ();
async fn recv_open_syn(
self,
_state: Self::RecvOpenSynIn,
) -> Result<Self::RecvOpenSynOut, Self::Error> {
unimplemented!("There is no patch extension in OPEN")
}

type SendOpenAckIn = &'a StateAccept;
type SendOpenAckOut = ();
async fn send_open_ack(
self,
_state: Self::SendOpenAckIn,
) -> Result<Self::SendOpenAckOut, Self::Error> {
unimplemented!("There is no patch extension in OPEN")
}
}
20 changes: 19 additions & 1 deletion io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@ struct StateTransport {
#[cfg(feature = "shared-memory")]
ext_shm: ext::shm::StateOpen,
ext_lowlatency: ext::lowlatency::StateOpen,
ext_patch: ext::patch::StateOpen,
}

#[cfg(any(feature = "transport_auth", feature = "transport_compression"))]
@@ -124,6 +125,7 @@ struct OpenLink<'a> {
ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>,
#[cfg(feature = "transport_compression")]
ext_compression: ext::compression::CompressionFsm<'a>,
ext_patch: ext::patch::PatchFsm<'a>,
}

#[async_trait]
@@ -192,6 +194,13 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> {
None
);

// Extension Patch
let ext_patch = self
.ext_patch
.send_init_syn(&state.transport.ext_patch)
.await
.map_err(|e| (e, Some(close::reason::GENERIC)))?;

let msg: TransportMessage = InitSyn {
version: input.mine_version,
whatami: input.mine_whatami,
@@ -206,6 +215,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> {
ext_mlink,
ext_lowlatency,
ext_compression,
ext_patch,
}
.into();

@@ -347,6 +357,12 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> {
.await
.map_err(|e| (e, Some(close::reason::GENERIC)))?;

// Extension Patch
self.ext_patch
.recv_init_ack((&mut state.transport.ext_patch, init_ack.ext_patch))
.await
.map_err(|e| (e, Some(close::reason::GENERIC)))?;

let output = RecvInitAckOut {
other_zid: init_ack.zid,
other_whatami: init_ack.whatami,
@@ -575,6 +591,7 @@ pub(crate) async fn open_link(
ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(),
#[cfg(feature = "transport_compression")]
ext_compression: ext::compression::CompressionFsm::new(),
ext_patch: ext::patch::PatchFsm::new(),
};

// Clippy raises a warning because `batch_size::UNICAST` is currently equal to `BatchSize::MAX`.
@@ -599,8 +616,8 @@ pub(crate) async fn open_link(
.open(manager.config.unicast.max_links > 1),
#[cfg(feature = "shared-memory")]
ext_shm: ext::shm::StateOpen::new(),

ext_lowlatency: ext::lowlatency::StateOpen::new(manager.config.unicast.is_lowlatency),
ext_patch: ext::patch::StateOpen::new(),
},
#[cfg(any(feature = "transport_auth", feature = "transport_compression"))]
link: StateLink {
@@ -669,6 +686,7 @@ pub(crate) async fn open_link(
is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(),
#[cfg(feature = "auth_usrpwd")]
auth_id: UsrPwdId(None),
patch: state.transport.ext_patch.get(),
};

let o_config = TransportLinkUnicastConfig {
3 changes: 2 additions & 1 deletion io/zenoh-transport/src/unicast/mod.rs
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ use zenoh_link::Link;
use zenoh_protocol::{
core::{Bits, WhatAmI, ZenohIdProto},
network::NetworkMessage,
transport::{close, TransportSn},
transport::{close, init::ext::PatchType, TransportSn},
};
use zenoh_result::{zerror, ZResult};

@@ -63,6 +63,7 @@ pub(crate) struct TransportConfigUnicast {
pub(crate) is_lowlatency: bool,
#[cfg(feature = "auth_usrpwd")]
pub(crate) auth_id: UsrPwdId,
pub(crate) patch: PatchType,
}

/// [`TransportUnicast`] is the transport handler returned
25 changes: 22 additions & 3 deletions io/zenoh-transport/src/unicast/universal/rx.rs
Original file line number Diff line number Diff line change
@@ -97,7 +97,7 @@ impl TransportUnicastUniversal {
Reliability::BestEffort => zlock!(c.best_effort),
};

if !self.verify_sn(sn, &mut guard)? {
if !self.verify_sn("Frame", sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
@@ -123,6 +123,8 @@ impl TransportUnicastUniversal {
more,
sn,
ext_qos: qos,
ext_first,
ext_drop,
payload,
} = fragment;

@@ -143,10 +145,25 @@ impl TransportUnicastUniversal {
Reliability::BestEffort => zlock!(c.best_effort),
};

if !self.verify_sn(sn, &mut guard)? {
if !self.verify_sn("Fragment", sn, &mut guard)? {
// Drop invalid message and continue
return Ok(());
}
if self.config.patch.has_fragmentation_markers() {
if ext_first.is_some() {
guard.defrag.clear();
} else if guard.defrag.is_empty() {
tracing::trace!(
"Transport: {}. First fragment received without start marker.",
self.manager.config.zid,
);
return Ok(());
}
if ext_drop.is_some() {
guard.defrag.clear();
return Ok(());
}
}
if guard.defrag.is_empty() {
let _ = guard.defrag.sync(sn);
}
@@ -178,14 +195,16 @@ impl TransportUnicastUniversal {

fn verify_sn(
&self,
message_type: &str,
sn: TransportSn,
guard: &mut MutexGuard<'_, TransportChannelRx>,
) -> ZResult<bool> {
let precedes = guard.sn.roll(sn)?;
if !precedes {
tracing::trace!(
"Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.",
"Transport: {}. {} with invalid SN dropped: {}. Expected: {}.",
self.config.zid,
message_type,
sn,
guard.sn.next()
);
349 changes: 349 additions & 0 deletions io/zenoh-transport/tests/unicast_fragmentation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{
any::Any,
convert::TryFrom,
fmt::Write as _,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use lazy_static::lazy_static;
use zenoh_core::ztimeout;
use zenoh_link::Link;
use zenoh_protocol::{
core::{CongestionControl, Encoding, EndPoint, Priority, WhatAmI, ZenohIdProto},
network::{
push::ext::{NodeIdType, QoSType},
NetworkMessage, Push,
},
zenoh::Put,
};
use zenoh_result::ZResult;
use zenoh_transport::{
multicast::TransportMulticast,
unicast::{test_helpers::make_transport_manager_builder, TransportUnicast},
TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer,
TransportPeerEventHandler,
};

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const SLEEP_SEND: Duration = Duration::from_millis(1);

const MSG_COUNT: usize = 100;
lazy_static! {
#[derive(Debug)]
static ref MSG: NetworkMessage = Push {
wire_expr: "test".into(),
// Set CongestionControl::Drop to test
ext_qos: QoSType::new(Priority::DEFAULT, CongestionControl::Drop, false),
ext_tstamp: None,
ext_nodeid: NodeIdType::DEFAULT,
payload: Put {
// 10 MB payload to stress fragmentation
payload: (0..10_000_000).map(|b| b as u8).collect::<Vec<u8>>().into(),
timestamp: None,
encoding: Encoding::empty(),
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
}
.into(),
}
.into();
}

// Transport Handler for the router
struct SHRouter {
count: Arc<AtomicUsize>,
}

impl Default for SHRouter {
fn default() -> Self {
Self {
count: Arc::new(AtomicUsize::new(0)),
}
}
}

impl SHRouter {
fn get_count(&self) -> usize {
self.count.load(Ordering::SeqCst)
}
}

impl TransportEventHandler for SHRouter {
fn new_unicast(
&self,
_peer: TransportPeer,
_transport: TransportUnicast,
) -> ZResult<Arc<dyn TransportPeerEventHandler>> {
let arc = Arc::new(SCRouter::new(self.count.clone()));
Ok(arc)
}

fn new_multicast(
&self,
_transport: TransportMulticast,
) -> ZResult<Arc<dyn TransportMulticastEventHandler>> {
panic!();
}
}

// Transport Callback for the router
pub struct SCRouter {
count: Arc<AtomicUsize>,
}

impl SCRouter {
pub fn new(count: Arc<AtomicUsize>) -> Self {
Self { count }
}
}

impl TransportPeerEventHandler for SCRouter {
fn handle_message(&self, message: NetworkMessage) -> ZResult<()> {
assert_eq!(message, *MSG);
self.count.fetch_add(1, Ordering::SeqCst);
std::thread::sleep(2 * SLEEP_SEND);
Ok(())
}

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
self
}
}

// Transport Handler for the client
#[derive(Default)]
struct SHClient;

impl TransportEventHandler for SHClient {
fn new_unicast(
&self,
_peer: TransportPeer,
_transport: TransportUnicast,
) -> ZResult<Arc<dyn TransportPeerEventHandler>> {
Ok(Arc::new(SCClient))
}

fn new_multicast(
&self,
_transport: TransportMulticast,
) -> ZResult<Arc<dyn TransportMulticastEventHandler>> {
panic!();
}
}

// Transport Callback for the client
#[derive(Default)]
pub struct SCClient;

impl TransportPeerEventHandler for SCClient {
fn handle_message(&self, _message: NetworkMessage) -> ZResult<()> {
Ok(())
}

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
self
}
}

async fn open_transport_unicast(
client_endpoints: &[EndPoint],
server_endpoints: &[EndPoint],
) -> (
TransportManager,
Arc<SHRouter>,
TransportManager,
TransportUnicast,
) {
// Define client and router IDs
let client_id = ZenohIdProto::try_from([1]).unwrap();
let router_id = ZenohIdProto::try_from([2]).unwrap();

// Create the router transport manager
let router_handler = Arc::new(SHRouter::default());
let unicast = make_transport_manager_builder(
#[cfg(feature = "transport_multilink")]
server_endpoints.len(),
#[cfg(feature = "shared-memory")]
false,
false,
);
let router_manager = TransportManager::builder()
.zid(router_id)
.whatami(WhatAmI::Router)
.unicast(unicast)
.build(router_handler.clone())
.unwrap();

// Create the listener on the router
for e in server_endpoints.iter() {
println!("Add endpoint: {}", e);
let _ = ztimeout!(router_manager.add_listener(e.clone())).unwrap();
}

// Create the client transport manager
let unicast = make_transport_manager_builder(
#[cfg(feature = "transport_multilink")]
client_endpoints.len(),
#[cfg(feature = "shared-memory")]
false,
false,
);
let client_manager = TransportManager::builder()
.whatami(WhatAmI::Client)
.zid(client_id)
.unicast(unicast)
.build(Arc::new(SHClient))
.unwrap();

// Create an empty transport with the client
// Open transport -> This should be accepted
for e in client_endpoints.iter() {
println!("Opening transport with {}", e);
let _ = ztimeout!(client_manager.open_transport_unicast(e.clone())).unwrap();
}

let client_transport = ztimeout!(client_manager.get_transport_unicast(&router_id)).unwrap();

// Return the handlers
(
router_manager,
router_handler,
client_manager,
client_transport,
)
}

async fn close_transport(
router_manager: TransportManager,
client_manager: TransportManager,
client_transport: TransportUnicast,
endpoints: &[EndPoint],
) {
// Close the client transport
let mut ee = String::new();
for e in endpoints.iter() {
let _ = write!(ee, "{e} ");
}
println!("Closing transport with {}", ee);
ztimeout!(client_transport.close()).unwrap();

ztimeout!(async {
while !router_manager.get_transports_unicast().await.is_empty() {
tokio::time::sleep(SLEEP).await;
}
});

// Stop the locators on the manager
for e in endpoints.iter() {
println!("Del locator: {}", e);
ztimeout!(router_manager.del_listener(e)).unwrap();
}

ztimeout!(async {
while !router_manager.get_listeners().await.is_empty() {
tokio::time::sleep(SLEEP).await;
}
});

// Wait a little bit
tokio::time::sleep(SLEEP).await;

ztimeout!(router_manager.close());
ztimeout!(client_manager.close());

// Wait a little bit
tokio::time::sleep(SLEEP).await;
}

async fn test_transport(router_handler: Arc<SHRouter>, client_transport: TransportUnicast) {
println!("Sending {} messages...", MSG_COUNT);

ztimeout!(async {
let mut sent = 0;
while router_handler.get_count() < MSG_COUNT {
if client_transport.schedule(MSG.clone()).is_ok() {
sent += 1;
println!(
"Sent: {sent}. Received: {}/{MSG_COUNT}",
router_handler.get_count()
);
}
}
});

// Wait a little bit
tokio::time::sleep(SLEEP).await;
}

async fn run_single(client_endpoints: &[EndPoint], server_endpoints: &[EndPoint]) {
println!(
"\n>>> Running test for: {:?}, {:?}",
client_endpoints, server_endpoints,
);

#[allow(unused_variables)] // Used when stats feature is enabled
let (router_manager, router_handler, client_manager, client_transport) =
open_transport_unicast(client_endpoints, server_endpoints).await;

test_transport(router_handler.clone(), client_transport.clone()).await;

#[cfg(feature = "stats")]
{
let c_stats = client_transport.get_stats().unwrap().report();
println!("\tClient: {:?}", c_stats);
let r_stats = ztimeout!(router_manager.get_transport_unicast(&client_manager.config.zid))
.unwrap()
.get_stats()
.map(|s| s.report())
.unwrap();
println!("\tRouter: {:?}", r_stats);
}

close_transport(
router_manager,
client_manager,
client_transport,
client_endpoints,
)
.await;
}

#[cfg(feature = "transport_tcp")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn fragmentation_unicast_tcp_only() {
zenoh_util::init_log_from_env_or("error");

// Define the locators
let endpoints: Vec<EndPoint> = vec![format!("tcp/127.0.0.1:{}", 16800).parse().unwrap()];
// Run
run_single(&endpoints, &endpoints).await;
}

0 comments on commit 64e8caa

Please sign in to comment.