Skip to content

Commit

Permalink
Merge pull request #28 from OpenMatchmaking/feature-reuse-channels
Browse files Browse the repository at this point in the history
Reusing channels for publishing and consuming messages
  • Loading branch information
Relrin authored Jan 23, 2019
2 parents 0f46266 + a21230f commit 13b725a
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 179 deletions.
2 changes: 1 addition & 1 deletion pathfinder/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pathfinder"
version = "0.1.0"
version = "1.0.0"
authors = ["Valeryi Savich <[email protected]>"]
repository = "https://github.com/OpenMatchmaking/pathfinder"
readme = "../README.md"
Expand Down
15 changes: 7 additions & 8 deletions pathfinder/src/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use uuid::Uuid;
use crate::cli::CliOptions;
use crate::config::get_config;
use crate::error::{Result, PathfinderError};
use crate::rabbitmq::RabbitMQClient;
use crate::rabbitmq::RabbitMQContext;
use super::middleware::{
CustomUserHeaders, EmptyMiddleware, JwtTokenMiddleware, Middleware,
MiddlewareFuture
Expand Down Expand Up @@ -62,7 +62,7 @@ impl Engine {
&self,
message: Message,
transmitter: MessageSender,
rabbitmq_client: Arc<RabbitMQClient>
rabbitmq_context: Arc<RabbitMQContext>
) -> Box<Future<Item=(), Error=PathfinderError> + Send + Sync + 'static> {
// 1. Deserialize message into JSON
let json_message = match deserialize_message(&message) {
Expand All @@ -80,14 +80,14 @@ impl Engine {
// 3. Instantiate futures that will be processing client credentials and a request
let default_headers = self.generate_default_headers(&json_message.clone(), endpoint.clone());
let transmitter_inner = transmitter.clone();
let rabbitmq_client_inner = rabbitmq_client.clone();
let rabbitmq_context_inner = rabbitmq_context.clone();
let rpc_options = Arc::new(RpcOptions::default()
.with_endpoint(endpoint.clone())
.with_message(json_message.clone())
.with_queue_name(Arc::new(format!("{}", Uuid::new_v4()))
));

let middleware_future = self.get_middleware_future(json_message.clone(), endpoint.clone(), rabbitmq_client.clone());
let middleware_future = self.get_middleware_future(json_message.clone(), endpoint.clone(), rabbitmq_context.clone());
Box::new(
middleware_future.and_then(move |custom_headers: CustomUserHeaders| {
let mut request_headers = default_headers.clone();
Expand All @@ -98,7 +98,7 @@ impl Engine {
}
rpc_request_future(
transmitter_inner.clone(),
rabbitmq_client_inner.clone(),
rabbitmq_context_inner.clone(),
rpc_options.clone(),
request_headers.clone()
)
Expand All @@ -117,11 +117,10 @@ impl Engine {
&self,
json_message: JsonMessage,
endpoint: ReadOnlyEndpoint,
rabbitmq_client: Arc<RabbitMQClient>
rabbitmq_context: Arc<RabbitMQContext>
) -> MiddlewareFuture {
let middleware = self.get_middleware_by_endpoint(endpoint);
let rabbitmq_client_local = rabbitmq_client.clone();
middleware.process_request(json_message, rabbitmq_client_local)
middleware.process_request(json_message, rabbitmq_context.clone())
}

/// Returns a middleware that matches to the passed endpoint
Expand Down
92 changes: 43 additions & 49 deletions pathfinder/src/engine/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use lapin_futures_rustls::lapin::types::{AMQPValue, FieldTable};
use log::error;

use crate::error::PathfinderError;
use crate::rabbitmq::RabbitMQClient;
use crate::rabbitmq::{RabbitMQContext};
use crate::engine::MessageSender;
use crate::engine::options::RpcOptions;
use crate::engine::serializer::Serializer;
Expand All @@ -27,48 +27,46 @@ use crate::engine::serializer::Serializer;
/// response to the caller via transmitter.
pub fn rpc_request_future(
transmitter: MessageSender,
rabbitmq_client: Arc<RabbitMQClient>,
rabbitmq_context: Arc<RabbitMQContext>,
options: Arc<RpcOptions>,
headers: HashMap<String, String>
) -> Box<Future<Item=(), Error=PathfinderError> + Send + Sync + 'static> {
let rabbitmq_client_local = rabbitmq_client.clone();
let rabbitmq_context_local = rabbitmq_context.clone();
let publish_channel = rabbitmq_context_local.get_publish_channel();
let consume_channel = rabbitmq_context_local.get_consume_channel();

Box::new(
// 1. Create a channel
rabbitmq_client_local.get_channel()
// 2. Declare a response queue
.and_then(move |channel| {
let queue_name = options.get_queue_name().unwrap().clone();
let queue_declare_options = QueueDeclareOptions {
passive: false,
durable: true,
exclusive: true,
auto_delete: false,
..Default::default()
};
let queue_name = options.get_queue_name().unwrap().clone();
let queue_declare_options = QueueDeclareOptions {
passive: false,
durable: true,
exclusive: true,
auto_delete: false,
..Default::default()
};

channel
.queue_declare(&queue_name, queue_declare_options, FieldTable::new())
.map(move |queue| (channel, queue, options))
})
// 3. Link the response queue the exchange
.and_then(move |(channel, queue, options)| {
Box::new(
// 1. Declare a response queue
consume_channel
.queue_declare(&queue_name, queue_declare_options, FieldTable::new())
.map(move |queue| (publish_channel, consume_channel, queue, options))
// 2. Link the response queue the exchange
.and_then(move |(publish_channel, consume_channel, queue, options)| {
let queue_name = options.get_queue_name().unwrap().clone();
let endpoint = options.get_endpoint().unwrap().clone();
let routing_key = options.get_queue_name().unwrap().clone();

channel
consume_channel
.queue_bind(
&queue_name,
&endpoint.get_response_exchange(),
&routing_key,
QueueBindOptions::default(),
FieldTable::new()
)
.map(move |_| (channel, queue, options))
.map(move |_| (publish_channel, consume_channel, queue, options))
})
// 4. Publish message into the microservice queue and make ensure that it's delivered
.and_then(move |(channel, queue, options)| {
// 3. Publish message into the microservice queue and make ensure that it's delivered
.and_then(move |(publish_channel, consume_channel, queue, options)| {
let publish_message_options = BasicPublishOptions {
mandatory: true,
immediate: false,
Expand All @@ -93,19 +91,19 @@ pub fn rpc_request_future(
.with_reply_to(queue_name_response.to_string()) // Response queue
.with_correlation_id(event_name.clone().to_string()); // Event name

channel
publish_channel
.basic_publish(
&endpoint.get_request_exchange(),
&endpoint.get_routing_key(),
message["content"].dump().as_bytes().to_vec(),
publish_message_options,
basic_properties
)
.map(move |_confirmation| (channel, queue, options))
.map(move |_confirmation| (publish_channel, consume_channel, queue, options))
})
// 5. Consume a response message from the queue, that was declared on the 2nd step
.and_then(move |(channel, queue, options)| {
channel
// 4. Consume a response message from the queue, that was declared on the 1st step
.and_then(move |(publish_channel, consume_channel, queue, options)| {
consume_channel
.basic_consume(
&queue,
"response_consumer",
Expand All @@ -117,56 +115,52 @@ pub fn rpc_request_future(
.take(1)
.into_future()
.map_err(|(err, _)| err)
.map(move |(message, _)| (channel, queue, message.unwrap(), options))
.map(move |(message, _)| (publish_channel, consume_channel, queue, message.unwrap(), options))
})
})
// 6. Prepare a response for a client, serialize and sent via WebSocket transmitter
.and_then(move |(channel, queue, message, options)| {
// 5. Prepare a response for a client, serialize and sent via WebSocket transmitter
.and_then(move |(_publish_channel, consume_channel, queue, message, options)| {
let raw_data = from_utf8(&message.data).unwrap();
let json = Arc::new(Box::new(json_parse(raw_data).unwrap()));
let serializer = Serializer::new();
let response = serializer.serialize(json.dump()).unwrap();
let transmitter_local = transmitter.clone();
transmitter_local.unbounded_send(response).unwrap_or(());

channel
consume_channel
.basic_ack(message.delivery_tag, false)
.map(move |_confirmation| (channel, queue, options))
.map(move |_confirmation| (_publish_channel, consume_channel, queue, options))
})
// 7. Unbind the response queue from the exchange point
.and_then(move |(channel, _queue, options)| {
// 6. Unbind the response queue from the exchange point
.and_then(move |(publish_channel, consume_channel, _queue, options)| {
let queue_name = options.get_queue_name().unwrap().clone();
let routing_key = options.get_queue_name().unwrap().clone();
let endpoint = options.get_endpoint().unwrap().clone();

channel
consume_channel
.queue_unbind(
&queue_name,
&endpoint.get_response_exchange(),
&routing_key,
QueueUnbindOptions::default(),
FieldTable::new(),
)
.map(move |_| (channel, options))
.map(move |_| (publish_channel, consume_channel, options))
})
// 8. Delete the response queue
.and_then(move |(channel, options)| {
// 7. Delete the response queue
.and_then(move |(_publish_channel, consume_channel, options)| {
let queue_delete_options = QueueDeleteOptions {
if_unused: false,
if_empty: false,
..Default::default()
};
let queue_name = options.get_queue_name().unwrap().clone();

channel
consume_channel
.queue_delete(&queue_name, queue_delete_options)
.map(move |_| channel)
})
// 9. Close the channel
.and_then(move |channel| {
channel.close(200, "Close the channel.")
.map(move |_| ())
})
// 10. Returns the result to the caller as future
// 8. Returns the result to the caller as future
.then(move |result| match result {
Ok(_) => Ok(()),
Err(err) => {
Expand Down
4 changes: 2 additions & 2 deletions pathfinder/src/engine/middleware/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::Future;

use crate::engine::serializer::JsonMessage;
use crate::error::PathfinderError;
use crate::rabbitmq::RabbitMQClient;
use crate::rabbitmq::RabbitMQContext;

/// Type alias for dictionary with custom user headers
pub type CustomUserHeaders = HashMap<String, String>;
Expand All @@ -21,5 +21,5 @@ pub type MiddlewareFuture = Box<Future<Item=CustomUserHeaders, Error=PathfinderE
pub trait Middleware: Send + Sync {
/// Applied transforms and checks to an incoming request. If it failed,
/// then should return a `PathfinderError` instance.
fn process_request(&self, message: JsonMessage, rabbitmq_client: Arc<RabbitMQClient>) -> MiddlewareFuture;
fn process_request(&self, message: JsonMessage, rabbitmq_context: Arc<RabbitMQContext>) -> MiddlewareFuture;
}
4 changes: 2 additions & 2 deletions pathfinder/src/engine/middleware/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::future::lazy;

use crate::engine::middleware::base::{Middleware, MiddlewareFuture};
use crate::engine::serializer::JsonMessage;
use crate::rabbitmq::RabbitMQClient;
use crate::rabbitmq::RabbitMQContext;

/// A middleware that used for reverse proxy for cases when
/// not necessary to do validating tokens or permissions.
Expand All @@ -23,7 +23,7 @@ impl EmptyMiddleware {

impl Middleware for EmptyMiddleware {
/// Returns an empty future which is doesn't doing anything.
fn process_request(&self, _message: JsonMessage, _rabbitmq_client: Arc<RabbitMQClient>) -> MiddlewareFuture {
fn process_request(&self, _message: JsonMessage, _rabbitmq_context: Arc<RabbitMQContext>) -> MiddlewareFuture {
Box::new(lazy(move || Ok(HashMap::new())))
}
}
Loading

0 comments on commit 13b725a

Please sign in to comment.