Skip to content

Commit

Permalink
Fix large transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Apr 13, 2024
1 parent b1c3d88 commit e665291
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 15 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
env_logger = "0.11"
rand = "0.8"
ntex = { version = "1", features = ["tokio"] }
ntex-amqp = { path = ".", features = ["frame-trace"] }

[patch.crates-io]
ntex-amqp = { path = "." }
Expand Down
6 changes: 0 additions & 6 deletions src/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,6 @@ impl DeliveryBuilder {

if let Some(ref err) = inner.error {
Err(err.clone())
} else if inner
.max_message_size
.map(|l| self.data.len() > l as usize)
.unwrap_or_default()
{
Err(AmqpProtocolError::BodyTooLarge)
} else {
let id = self
.sender
Expand Down
14 changes: 5 additions & 9 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1278,24 +1278,20 @@ impl SessionInner {
);

loop {
let chunk = body.split_to(std::cmp::min(max_frame_size, body.len()));

// last chunk
if body.is_empty() {
log::trace!("{}: Sending last tranfer for {:?}", self.tag(), tag);

let mut transfer = Transfer(Default::default());
transfer.0.more = false;
self.post_frame(Frame::Transfer(transfer));
log::trace!("{}: Last tranfer for {:?} is sent", self.tag(), tag);
break;
}

log::trace!("{}: Sending chunk tranfer for {:?}", self.tag(), tag);
let chunk = body.split_to(std::cmp::min(max_frame_size, body.len()));

log::trace!("{}: Sending chunk tranfer for {:?}", self.tag(), tag);
let mut transfer = Transfer(Default::default());
transfer.0.delivery_id = Some(delivery_id);
transfer.0.handle = link_handle;
transfer.0.body = Some(TransferBody::Data(chunk));
transfer.0.more = true;
transfer.0.more = !body.is_empty();
transfer.0.batchable = true;
self.post_frame(Frame::Transfer(transfer));
}
Expand Down
66 changes: 66 additions & 0 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use ntex::{http::Uri, rt, time::sleep, time::Millis};
use ntex_amqp::{
client, codec::protocol, error::LinkError, server, types, ControlFrame, ControlFrameKind,
};
use rand::{distributions::Alphanumeric, thread_rng, Rng};

async fn server(
_link: types::Link<()>,
Expand Down Expand Up @@ -91,6 +92,71 @@ async fn test_simple() -> std::io::Result<()> {
Ok(())
}

#[ntex::test]
async fn test_large_transfer() -> std::io::Result<()> {
let _ = env_logger::init();

let mut rng = thread_rng();
let data: String = (0..2048)
.map(|_| rng.sample(Alphanumeric) as char)
.collect();

let count = Arc::new(AtomicUsize::new(0));
let count2 = count.clone();
let srv = test_server(move || {
let count = count2.clone();
server::Server::build(|con: server::Handshake| async move {
match con {
server::Handshake::Amqp(con) => {
let con = con.open().await.unwrap();
Ok(con.ack(()))
}
server::Handshake::Sasl(_) => Err(()),
}
})
.control(|msg: ControlFrame| async move {
if let ControlFrameKind::AttachReceiver(_, rcv) = msg.kind() {
rcv.set_max_message_size(1024);
}
Ok::<_, ()>(())
})
.finish(
server::Router::<()>::new()
.service(
"test",
fn_factory_with_config(move |_: types::Link<()>| server_count(count.clone())),
)
.finish(),
)
});

let uri = Uri::try_from(format!("amqp://{}:{}", srv.addr().ip(), srv.addr().port())).unwrap();
let client = client::Connector::new().connect(uri).await.unwrap();
let sink = client.sink();
ntex::rt::spawn(async move {
let _ = client.start_default().await;
});

let session = sink.open_session().await.unwrap();
let link = session
.build_sender_link("test", "test")
.attach()
.await
.unwrap();

let delivery = link
.delivery(Bytes::from(data.clone()))
.send()
.await
.unwrap();
let st = delivery.wait().await.unwrap().unwrap();
assert_eq!(st, protocol::DeliveryState::Accepted(protocol::Accepted {}));
sleep(Millis(250)).await;

assert_eq!(count.load(Ordering::Relaxed), 1);
Ok(())
}

async fn sasl_auth(auth: server::Sasl) -> Result<server::HandshakeAck<()>, server::HandshakeError> {
let init = auth
.mechanism("PLAIN")
Expand Down

0 comments on commit e665291

Please sign in to comment.