From 201813a49f7018794e34f21d41e7691ec567b041 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 9 Apr 2024 22:41:27 +0500 Subject: [PATCH 1/5] Handle settled attribute --- src/delivery.rs | 6 +++++- src/session.rs | 10 ++++++---- src/sndlink.rs | 3 ++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/delivery.rs b/src/delivery.rs index eb3d4bb..f9f7288 100644 --- a/src/delivery.rs +++ b/src/delivery.rs @@ -300,7 +300,11 @@ impl DeliveryBuilder { { Err(AmqpProtocolError::BodyTooLarge) } else { - let id = self.sender.get_mut().send(self.data, self.tag).await?; + let id = self + .sender + .get_mut() + .send(self.data, self.tag, self.settled) + .await?; Ok(Delivery { id, diff --git a/src/session.rs b/src/session.rs index fc40612..a06183c 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1253,14 +1253,15 @@ impl SessionInner { transfer.0.handle = link_handle; transfer.0.body = Some(TransferBody::Data(chunk)); transfer.0.more = true; - transfer.0.settled = Some(settled); transfer.0.state = tr_settled; transfer.0.batchable = true; transfer.0.delivery_id = Some(delivery_id); transfer.0.delivery_tag = Some(tag.clone()); transfer.0.message_format = message_format; - if !settled { + if settled { + transfer.0.settled = Some(true); + } else { self.unsettled_snd_deliveries .insert(delivery_id, DeliveryInner::new()); } @@ -1302,13 +1303,14 @@ impl SessionInner { let mut transfer = Transfer(Default::default()); transfer.0.handle = link_handle; transfer.0.body = Some(body); - transfer.0.settled = Some(settled); transfer.0.state = tr_settled; transfer.0.delivery_id = Some(delivery_id); transfer.0.delivery_tag = Some(tag); transfer.0.message_format = message_format; - if !settled { + if settled { + transfer.0.settled = Some(true); + } else { self.unsettled_snd_deliveries .insert(delivery_id, DeliveryInner::new()); } diff --git a/src/sndlink.rs b/src/sndlink.rs index baaf2a2..9de042e 100644 --- a/src/sndlink.rs +++ b/src/sndlink.rs @@ -320,6 +320,7 @@ impl SenderLinkInner { &mut self, body: T, tag: Option, + settled: bool, ) -> Result { if let Some(ref err) = self.error { Err(err.clone()) @@ -353,7 +354,7 @@ impl SenderLinkInner { self.session .inner .get_mut() - .send_transfer(self.id as u32, tag, body, false, self.max_message_size) + .send_transfer(self.id as u32, tag, body, settled, self.max_message_size) .await } } From 548627de0245c39750eaad328295b1c66d61110d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 10 Apr 2024 00:10:25 +0500 Subject: [PATCH 2/5] wip --- src/delivery.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/delivery.rs b/src/delivery.rs index f9f7288..f4dbcdb 100644 --- a/src/delivery.rs +++ b/src/delivery.rs @@ -134,6 +134,10 @@ impl Delivery { } pub async fn wait(&self) -> Result, AmqpProtocolError> { + if self.0.flags.get().contains(Flags::LOCAL_SETTLED) { + return Ok(None); + } + let rx = if let Some(inner) = self .session .inner From 508926c6072274b398ece8a67aab79da0edc03c0 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 10 Apr 2024 00:14:11 +0500 Subject: [PATCH 3/5] wip --- .github/workflows/linux.yml | 6 ++++-- CHANGES.md | 4 ++++ Cargo.toml | 4 ++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index d8da89b..3a9b248 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -62,9 +62,11 @@ jobs: - name: Upload to Codecov if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') continue-on-error: true - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v4 with: - file: cobertura.xml + token: ${{ secrets.CODECOV_TOKEN }} + files: cobertura.xml + fail_ci_if_error: true - name: Install cargo-cache continue-on-error: true diff --git a/CHANGES.md b/CHANGES.md index 8302468..0265302 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.1.3] - 2024-04-11 + +* Handle settled transfers + ## [2.1.2] - 2024-03-17 * Set transfer handle diff --git a/Cargo.toml b/Cargo.toml index 626e6fa..c033897 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ default = [] frame-trace = [] [dependencies] -ntex = "1.0" +ntex = "1" ntex-util = "1.0.1" ntex-amqp-codec = "0.9" @@ -37,7 +37,7 @@ uuid = { version = "1", features = ["v4"] } [dev-dependencies] env_logger = "0.11" -ntex = { version = "1.0", features = ["tokio"] } +ntex = { version = "1", features = ["tokio"] } [patch.crates-io] ntex-amqp = { path = "." } From 2473e6622cdd21ea447266e4bc728624629c714e Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 10 Apr 2024 00:15:50 +0500 Subject: [PATCH 4/5] wip --- src/delivery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/delivery.rs b/src/delivery.rs index f4dbcdb..e2668cb 100644 --- a/src/delivery.rs +++ b/src/delivery.rs @@ -134,7 +134,7 @@ impl Delivery { } pub async fn wait(&self) -> Result, AmqpProtocolError> { - if self.0.flags.get().contains(Flags::LOCAL_SETTLED) { + if self.flags.get().contains(Flags::LOCAL_SETTLED) { return Ok(None); } From 8892309c5ce4976b936054e942d8b6e42ba15426 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 10 Apr 2024 16:30:43 +0500 Subject: [PATCH 5/5] tests --- .gitignore | 1 + Cargo.toml | 2 +- tests/test_server.rs | 44 +++++++++++++++++++++++++++++++++----------- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 42d0755..11a3b5f 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ guide/build/ *.pid *.sock *~ +.DS_Store # These are backup files generated by rustfmt **/*.rs.bk diff --git a/Cargo.toml b/Cargo.toml index c033897..172a6ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ categories = ["network-programming"] keywords = ["AMQP", "IoT", "messaging"] license = "MIT OR Apache-2.0" exclude = [".gitignore", ".travis.yml", ".cargo/config"] -edition = "2018" +edition = "2021" [workspace] members = [ diff --git a/tests/test_server.rs b/tests/test_server.rs index bb4ba1f..67aec48 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -1,4 +1,5 @@ -use std::{convert::TryFrom, sync::Arc, sync::Mutex}; +use std::convert::TryFrom; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc, Mutex}; use ntex::server::test_server; use ntex::service::{boxed, boxed::BoxService, fn_factory_with_config, fn_service}; @@ -9,17 +10,30 @@ use ntex_amqp::{ }; async fn server( - link: types::Link<()>, + _link: types::Link<()>, ) -> Result, LinkError> { - println!("OPEN LINK: {:?}", link); Ok(boxed::service(fn_service(|_req| { Ready::Ok(types::Outcome::Accept) }))) } +async fn server_count( + count: Arc, +) -> Result, LinkError> { + Ok(boxed::service(fn_service(move |_req| { + let val = count.load(Ordering::Relaxed); + count.store(val + 1, Ordering::Release); + Ready::Ok(types::Outcome::Accept) + }))) +} + #[ntex::test] async fn test_simple() -> std::io::Result<()> { - let srv = test_server(|| { + let count = Arc::new(AtomicUsize::new(0)); + + let count2 = count.clone(); + let srv = test_server(move || { + let count = count2.clone(); server::Server::build(|con: server::Handshake| async move { match con { server::Handshake::Amqp(con) => { @@ -31,7 +45,10 @@ async fn test_simple() -> std::io::Result<()> { }) .finish( server::Router::<()>::new() - .service("test", fn_factory_with_config(server)) + .service( + "test", + fn_factory_with_config(move |_: types::Link<()>| server_count(count.clone())), + ) .finish(), ) }); @@ -60,6 +77,17 @@ async fn test_simple() -> std::io::Result<()> { let st = delivery.wait().await.unwrap().unwrap(); assert_eq!(st, protocol::DeliveryState::Accepted(protocol::Accepted {})); + let delivery = link + .delivery(Bytes::from(b"test".as_ref())) + .settled() + .send() + .await + .unwrap(); + let st = delivery.wait().await.unwrap(); + assert_eq!(st, None); + sleep(Millis(250)).await; + + assert_eq!(count.load(Ordering::Relaxed), 2); Ok(()) } @@ -194,8 +222,6 @@ async fn test_session_end() -> std::io::Result<()> { #[ntex::test] async fn test_link_detach() -> std::io::Result<()> { - let _ = env_logger::try_init(); - let srv = test_server(move || { server::Server::build(|con: server::Handshake| async move { match con { @@ -267,8 +293,6 @@ async fn test_link_detach() -> std::io::Result<()> { #[ntex::test] async fn test_link_detach_on_session_end() -> std::io::Result<()> { - let _ = env_logger::try_init(); - let srv = test_server(move || { server::Server::build(|con: server::Handshake| async move { match con { @@ -322,8 +346,6 @@ async fn test_link_detach_on_session_end() -> std::io::Result<()> { #[ntex::test] async fn test_link_detach_on_disconnect() -> std::io::Result<()> { - let _ = env_logger::try_init(); - let srv = test_server(move || { server::Server::build(|con: server::Handshake| async move { match con {