Skip to content

Commit

Permalink
Fix receiver delivery queue handling
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Apr 16, 2024
1 parent 7f968a5 commit 00bb769
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.1.5] - 2024-04-17

* Fix receiver's delivery queue handling

## [2.1.4] - 2024-04-13

* Fix large transfers handling
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-amqp"
version = "2.1.4"
version = "2.1.5"
authors = ["ntex contributors <[email protected]>"]
description = "AMQP 1.0 Client/Server framework"
documentation = "https://docs.rs/ntex-amqp"
Expand Down
16 changes: 12 additions & 4 deletions src/rcvlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,22 @@ impl ReceiverLink {

/// Check deliveries
pub fn has_deliveries(&self) -> bool {
!self.inner.get_mut().queue.is_empty()
let inner = self.inner.get_ref();
if inner.partial_body.is_none() {
!inner.queue.is_empty()

Check warning on line 110 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L108-L110

Added lines #L108 - L110 were not covered by tests
} else {
inner.queue.len() > 1

Check warning on line 112 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L112

Added line #L112 was not covered by tests
}
}

/// Get delivery
pub fn get_delivery(&self) -> Option<(Delivery, Transfer)> {
self.inner.get_mut().queue.pop_front()
let inner = self.inner.get_mut();
if inner.partial_body.is_none() || inner.queue.len() > 1 {
inner.queue.pop_front()
} else {
None

Check warning on line 122 in src/rcvlink.rs

View check run for this annotation

Codecov / codecov/patch

src/rcvlink.rs#L122

Added line #L122 was not covered by tests
}
}

/// Send disposition frame
Expand Down Expand Up @@ -295,8 +305,6 @@ impl ReceiverLinkInner {
self.credit -= 1;
}

println!("============= {:#?}\n{:?}", transfer, self.partial_body);

// handle batched transfer
if let Some(ref mut body) = self.partial_body {
if transfer.0.delivery_id.is_some() {
Expand Down

0 comments on commit 00bb769

Please sign in to comment.