diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index d8da89b..3a9b248 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -62,9 +62,11 @@ jobs: - name: Upload to Codecov if: matrix.version == '1.75.0' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') continue-on-error: true - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v4 with: - file: cobertura.xml + token: ${{ secrets.CODECOV_TOKEN }} + files: cobertura.xml + fail_ci_if_error: true - name: Install cargo-cache continue-on-error: true diff --git a/.gitignore b/.gitignore index 42d0755..11a3b5f 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ guide/build/ *.pid *.sock *~ +.DS_Store # These are backup files generated by rustfmt **/*.rs.bk diff --git a/CHANGES.md b/CHANGES.md index 8302468..0265302 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.1.3] - 2024-04-11 + +* Handle settled transfers + ## [2.1.2] - 2024-03-17 * Set transfer handle diff --git a/Cargo.toml b/Cargo.toml index 626e6fa..172a6ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ categories = ["network-programming"] keywords = ["AMQP", "IoT", "messaging"] license = "MIT OR Apache-2.0" exclude = [".gitignore", ".travis.yml", ".cargo/config"] -edition = "2018" +edition = "2021" [workspace] members = [ @@ -24,7 +24,7 @@ default = [] frame-trace = [] [dependencies] -ntex = "1.0" +ntex = "1" ntex-util = "1.0.1" ntex-amqp-codec = "0.9" @@ -37,7 +37,7 @@ uuid = { version = "1", features = ["v4"] } [dev-dependencies] env_logger = "0.11" -ntex = { version = "1.0", features = ["tokio"] } +ntex = { version = "1", features = ["tokio"] } [patch.crates-io] ntex-amqp = { path = "." } diff --git a/src/delivery.rs b/src/delivery.rs index eb3d4bb..e2668cb 100644 --- a/src/delivery.rs +++ b/src/delivery.rs @@ -134,6 +134,10 @@ impl Delivery { } pub async fn wait(&self) -> Result, AmqpProtocolError> { + if self.flags.get().contains(Flags::LOCAL_SETTLED) { + return Ok(None); + } + let rx = if let Some(inner) = self .session .inner @@ -300,7 +304,11 @@ impl DeliveryBuilder { { Err(AmqpProtocolError::BodyTooLarge) } else { - let id = self.sender.get_mut().send(self.data, self.tag).await?; + let id = self + .sender + .get_mut() + .send(self.data, self.tag, self.settled) + .await?; Ok(Delivery { id, diff --git a/src/session.rs b/src/session.rs index fc40612..a06183c 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1253,14 +1253,15 @@ impl SessionInner { transfer.0.handle = link_handle; transfer.0.body = Some(TransferBody::Data(chunk)); transfer.0.more = true; - transfer.0.settled = Some(settled); transfer.0.state = tr_settled; transfer.0.batchable = true; transfer.0.delivery_id = Some(delivery_id); transfer.0.delivery_tag = Some(tag.clone()); transfer.0.message_format = message_format; - if !settled { + if settled { + transfer.0.settled = Some(true); + } else { self.unsettled_snd_deliveries .insert(delivery_id, DeliveryInner::new()); } @@ -1302,13 +1303,14 @@ impl SessionInner { let mut transfer = Transfer(Default::default()); transfer.0.handle = link_handle; transfer.0.body = Some(body); - transfer.0.settled = Some(settled); transfer.0.state = tr_settled; transfer.0.delivery_id = Some(delivery_id); transfer.0.delivery_tag = Some(tag); transfer.0.message_format = message_format; - if !settled { + if settled { + transfer.0.settled = Some(true); + } else { self.unsettled_snd_deliveries .insert(delivery_id, DeliveryInner::new()); } diff --git a/src/sndlink.rs b/src/sndlink.rs index baaf2a2..9de042e 100644 --- a/src/sndlink.rs +++ b/src/sndlink.rs @@ -320,6 +320,7 @@ impl SenderLinkInner { &mut self, body: T, tag: Option, + settled: bool, ) -> Result { if let Some(ref err) = self.error { Err(err.clone()) @@ -353,7 +354,7 @@ impl SenderLinkInner { self.session .inner .get_mut() - .send_transfer(self.id as u32, tag, body, false, self.max_message_size) + .send_transfer(self.id as u32, tag, body, settled, self.max_message_size) .await } } diff --git a/tests/test_server.rs b/tests/test_server.rs index bb4ba1f..67aec48 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -1,4 +1,5 @@ -use std::{convert::TryFrom, sync::Arc, sync::Mutex}; +use std::convert::TryFrom; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc, Mutex}; use ntex::server::test_server; use ntex::service::{boxed, boxed::BoxService, fn_factory_with_config, fn_service}; @@ -9,17 +10,30 @@ use ntex_amqp::{ }; async fn server( - link: types::Link<()>, + _link: types::Link<()>, ) -> Result, LinkError> { - println!("OPEN LINK: {:?}", link); Ok(boxed::service(fn_service(|_req| { Ready::Ok(types::Outcome::Accept) }))) } +async fn server_count( + count: Arc, +) -> Result, LinkError> { + Ok(boxed::service(fn_service(move |_req| { + let val = count.load(Ordering::Relaxed); + count.store(val + 1, Ordering::Release); + Ready::Ok(types::Outcome::Accept) + }))) +} + #[ntex::test] async fn test_simple() -> std::io::Result<()> { - let srv = test_server(|| { + let count = Arc::new(AtomicUsize::new(0)); + + let count2 = count.clone(); + let srv = test_server(move || { + let count = count2.clone(); server::Server::build(|con: server::Handshake| async move { match con { server::Handshake::Amqp(con) => { @@ -31,7 +45,10 @@ async fn test_simple() -> std::io::Result<()> { }) .finish( server::Router::<()>::new() - .service("test", fn_factory_with_config(server)) + .service( + "test", + fn_factory_with_config(move |_: types::Link<()>| server_count(count.clone())), + ) .finish(), ) }); @@ -60,6 +77,17 @@ async fn test_simple() -> std::io::Result<()> { let st = delivery.wait().await.unwrap().unwrap(); assert_eq!(st, protocol::DeliveryState::Accepted(protocol::Accepted {})); + let delivery = link + .delivery(Bytes::from(b"test".as_ref())) + .settled() + .send() + .await + .unwrap(); + let st = delivery.wait().await.unwrap(); + assert_eq!(st, None); + sleep(Millis(250)).await; + + assert_eq!(count.load(Ordering::Relaxed), 2); Ok(()) } @@ -194,8 +222,6 @@ async fn test_session_end() -> std::io::Result<()> { #[ntex::test] async fn test_link_detach() -> std::io::Result<()> { - let _ = env_logger::try_init(); - let srv = test_server(move || { server::Server::build(|con: server::Handshake| async move { match con { @@ -267,8 +293,6 @@ async fn test_link_detach() -> std::io::Result<()> { #[ntex::test] async fn test_link_detach_on_session_end() -> std::io::Result<()> { - let _ = env_logger::try_init(); - let srv = test_server(move || { server::Server::build(|con: server::Handshake| async move { match con { @@ -322,8 +346,6 @@ async fn test_link_detach_on_session_end() -> std::io::Result<()> { #[ntex::test] async fn test_link_detach_on_disconnect() -> std::io::Result<()> { - let _ = env_logger::try_init(); - let srv = test_server(move || { server::Server::build(|con: server::Handshake| async move { match con {