Skip to content

Commit

Permalink
Merge pull request #29 from OpenMatchmaking/feature-ack-message
Browse files Browse the repository at this point in the history
Acking published messages
  • Loading branch information
Relrin authored Jan 24, 2019
2 parents 13b725a + 9628335 commit 90bc5da
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
15 changes: 11 additions & 4 deletions pathfinder/src/engine/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use lapin_futures_rustls::lapin::channel::{
QueueDeclareOptions, QueueDeleteOptions, QueueUnbindOptions,
};
use lapin_futures_rustls::lapin::types::{AMQPValue, FieldTable};
use log::error;
use log::{error, info, warn};

use crate::error::PathfinderError;
use crate::rabbitmq::{RabbitMQContext};
Expand Down Expand Up @@ -99,7 +99,14 @@ pub fn rpc_request_future(
publish_message_options,
basic_properties
)
.map(move |_confirmation| (publish_channel, consume_channel, queue, options))
.map(move |confirmation| {
match confirmation {
Some(_) => info!("Publish message got confirmation."),
None => warn!("Request wasn't delivered."),
};

(publish_channel, consume_channel, queue, options)
})
})
// 4. Consume a response message from the queue, that was declared on the 1st step
.and_then(move |(publish_channel, consume_channel, queue, options)| {
Expand All @@ -119,7 +126,7 @@ pub fn rpc_request_future(
})
})
// 5. Prepare a response for a client, serialize and sent via WebSocket transmitter
.and_then(move |(_publish_channel, consume_channel, queue, message, options)| {
.and_then(move |(publish_channel, consume_channel, queue, message, options)| {
let raw_data = from_utf8(&message.data).unwrap();
let json = Arc::new(Box::new(json_parse(raw_data).unwrap()));
let serializer = Serializer::new();
Expand All @@ -129,7 +136,7 @@ pub fn rpc_request_future(

consume_channel
.basic_ack(message.delivery_tag, false)
.map(move |_confirmation| (_publish_channel, consume_channel, queue, options))
.map(move |_confirmation| (publish_channel, consume_channel, queue, options))
})
// 6. Unbind the response queue from the exchange point
.and_then(move |(publish_channel, consume_channel, _queue, options)| {
Expand Down
20 changes: 17 additions & 3 deletions pathfinder/src/engine/middleware/jwt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use lapin_futures_rustls::lapin::channel::{
QueueDeclareOptions, QueueDeleteOptions, QueueUnbindOptions,
};
use lapin_futures_rustls::lapin::types::{AMQPValue, FieldTable};
use log::error;
use log::{error, info, warn};
use uuid::Uuid;

use crate::error::PathfinderError;
Expand Down Expand Up @@ -121,7 +121,14 @@ impl JwtTokenMiddleware {
publish_message_options,
basic_properties
)
.map(move |_confirmation| (publish_channel, consume_channel, queue, options))
.map(move |confirmation| {
match confirmation {
Some(_) => info!("Publish for verifying JWT got confirmation."),
None => warn!("Request for verifying JWT wasn't delivered."),
};

(publish_channel, consume_channel, queue, options)
})
})
// 4. Consume a response message from the queue, that was declared on the 2nd step
.and_then(move |(publish_channel, consume_channel, queue, options)| {
Expand Down Expand Up @@ -284,7 +291,14 @@ impl JwtTokenMiddleware {
publish_message_options,
basic_properties
)
.map(move |_confirmation| (publish_channel, consume_channel, queue, options))
.map(move |confirmation| {
match confirmation {
Some(_) => info!("Publish for getting headers got confirmation."),
None => warn!("Request for getting headers wasn't delivered."),
};

(publish_channel, consume_channel, queue, options)
})
})
// 4. Consume a response message from the queue, that was declared on the 2nd step
.and_then(move |(publish_channel, consume_channel, queue, options)| {
Expand Down
2 changes: 1 addition & 1 deletion pathfinder/src/rabbitmq/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn get_uri(cli: &CliOptions) -> AMQPUri {
false => "amqp",
};
format!(
"{}://{}:{}@{}:{}/{}",
"{}://{}:{}@{}:{}/{}?heartbeat=10",
schema.to_string(),
cli.rabbitmq_username.clone(),
cli.rabbitmq_password.clone(),
Expand Down

0 comments on commit 90bc5da

Please sign in to comment.