diff --git a/pathfinder/Cargo.toml b/pathfinder/Cargo.toml index e041df3..d98918f 100644 --- a/pathfinder/Cargo.toml +++ b/pathfinder/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pathfinder" -version = "0.1.0" +version = "1.0.0" authors = ["Valeryi Savich "] repository = "https://github.com/OpenMatchmaking/pathfinder" readme = "../README.md" diff --git a/pathfinder/src/engine/engine.rs b/pathfinder/src/engine/engine.rs index d56aaed..bb27b75 100644 --- a/pathfinder/src/engine/engine.rs +++ b/pathfinder/src/engine/engine.rs @@ -15,7 +15,7 @@ use uuid::Uuid; use crate::cli::CliOptions; use crate::config::get_config; use crate::error::{Result, PathfinderError}; -use crate::rabbitmq::RabbitMQClient; +use crate::rabbitmq::RabbitMQContext; use super::middleware::{ CustomUserHeaders, EmptyMiddleware, JwtTokenMiddleware, Middleware, MiddlewareFuture @@ -62,7 +62,7 @@ impl Engine { &self, message: Message, transmitter: MessageSender, - rabbitmq_client: Arc + rabbitmq_context: Arc ) -> Box + Send + Sync + 'static> { // 1. Deserialize message into JSON let json_message = match deserialize_message(&message) { @@ -80,14 +80,14 @@ impl Engine { // 3. Instantiate futures that will be processing client credentials and a request let default_headers = self.generate_default_headers(&json_message.clone(), endpoint.clone()); let transmitter_inner = transmitter.clone(); - let rabbitmq_client_inner = rabbitmq_client.clone(); + let rabbitmq_context_inner = rabbitmq_context.clone(); let rpc_options = Arc::new(RpcOptions::default() .with_endpoint(endpoint.clone()) .with_message(json_message.clone()) .with_queue_name(Arc::new(format!("{}", Uuid::new_v4())) )); - let middleware_future = self.get_middleware_future(json_message.clone(), endpoint.clone(), rabbitmq_client.clone()); + let middleware_future = self.get_middleware_future(json_message.clone(), endpoint.clone(), rabbitmq_context.clone()); Box::new( middleware_future.and_then(move |custom_headers: CustomUserHeaders| { let mut request_headers = default_headers.clone(); @@ -98,7 +98,7 @@ impl Engine { } rpc_request_future( transmitter_inner.clone(), - rabbitmq_client_inner.clone(), + rabbitmq_context_inner.clone(), rpc_options.clone(), request_headers.clone() ) @@ -117,11 +117,10 @@ impl Engine { &self, json_message: JsonMessage, endpoint: ReadOnlyEndpoint, - rabbitmq_client: Arc + rabbitmq_context: Arc ) -> MiddlewareFuture { let middleware = self.get_middleware_by_endpoint(endpoint); - let rabbitmq_client_local = rabbitmq_client.clone(); - middleware.process_request(json_message, rabbitmq_client_local) + middleware.process_request(json_message, rabbitmq_context.clone()) } /// Returns a middleware that matches to the passed endpoint diff --git a/pathfinder/src/engine/futures.rs b/pathfinder/src/engine/futures.rs index 4edd956..f2e6e75 100644 --- a/pathfinder/src/engine/futures.rs +++ b/pathfinder/src/engine/futures.rs @@ -17,7 +17,7 @@ use lapin_futures_rustls::lapin::types::{AMQPValue, FieldTable}; use log::error; use crate::error::PathfinderError; -use crate::rabbitmq::RabbitMQClient; +use crate::rabbitmq::{RabbitMQContext}; use crate::engine::MessageSender; use crate::engine::options::RpcOptions; use crate::engine::serializer::Serializer; @@ -27,37 +27,35 @@ use crate::engine::serializer::Serializer; /// response to the caller via transmitter. pub fn rpc_request_future( transmitter: MessageSender, - rabbitmq_client: Arc, + rabbitmq_context: Arc, options: Arc, headers: HashMap ) -> Box + Send + Sync + 'static> { - let rabbitmq_client_local = rabbitmq_client.clone(); + let rabbitmq_context_local = rabbitmq_context.clone(); + let publish_channel = rabbitmq_context_local.get_publish_channel(); + let consume_channel = rabbitmq_context_local.get_consume_channel(); - Box::new( - // 1. Create a channel - rabbitmq_client_local.get_channel() - // 2. Declare a response queue - .and_then(move |channel| { - let queue_name = options.get_queue_name().unwrap().clone(); - let queue_declare_options = QueueDeclareOptions { - passive: false, - durable: true, - exclusive: true, - auto_delete: false, - ..Default::default() - }; + let queue_name = options.get_queue_name().unwrap().clone(); + let queue_declare_options = QueueDeclareOptions { + passive: false, + durable: true, + exclusive: true, + auto_delete: false, + ..Default::default() + }; - channel - .queue_declare(&queue_name, queue_declare_options, FieldTable::new()) - .map(move |queue| (channel, queue, options)) - }) - // 3. Link the response queue the exchange - .and_then(move |(channel, queue, options)| { + Box::new( + // 1. Declare a response queue + consume_channel + .queue_declare(&queue_name, queue_declare_options, FieldTable::new()) + .map(move |queue| (publish_channel, consume_channel, queue, options)) + // 2. Link the response queue the exchange + .and_then(move |(publish_channel, consume_channel, queue, options)| { let queue_name = options.get_queue_name().unwrap().clone(); let endpoint = options.get_endpoint().unwrap().clone(); let routing_key = options.get_queue_name().unwrap().clone(); - channel + consume_channel .queue_bind( &queue_name, &endpoint.get_response_exchange(), @@ -65,10 +63,10 @@ pub fn rpc_request_future( QueueBindOptions::default(), FieldTable::new() ) - .map(move |_| (channel, queue, options)) + .map(move |_| (publish_channel, consume_channel, queue, options)) }) - // 4. Publish message into the microservice queue and make ensure that it's delivered - .and_then(move |(channel, queue, options)| { + // 3. Publish message into the microservice queue and make ensure that it's delivered + .and_then(move |(publish_channel, consume_channel, queue, options)| { let publish_message_options = BasicPublishOptions { mandatory: true, immediate: false, @@ -93,7 +91,7 @@ pub fn rpc_request_future( .with_reply_to(queue_name_response.to_string()) // Response queue .with_correlation_id(event_name.clone().to_string()); // Event name - channel + publish_channel .basic_publish( &endpoint.get_request_exchange(), &endpoint.get_routing_key(), @@ -101,11 +99,11 @@ pub fn rpc_request_future( publish_message_options, basic_properties ) - .map(move |_confirmation| (channel, queue, options)) + .map(move |_confirmation| (publish_channel, consume_channel, queue, options)) }) - // 5. Consume a response message from the queue, that was declared on the 2nd step - .and_then(move |(channel, queue, options)| { - channel + // 4. Consume a response message from the queue, that was declared on the 1st step + .and_then(move |(publish_channel, consume_channel, queue, options)| { + consume_channel .basic_consume( &queue, "response_consumer", @@ -117,11 +115,11 @@ pub fn rpc_request_future( .take(1) .into_future() .map_err(|(err, _)| err) - .map(move |(message, _)| (channel, queue, message.unwrap(), options)) + .map(move |(message, _)| (publish_channel, consume_channel, queue, message.unwrap(), options)) }) }) - // 6. Prepare a response for a client, serialize and sent via WebSocket transmitter - .and_then(move |(channel, queue, message, options)| { + // 5. Prepare a response for a client, serialize and sent via WebSocket transmitter + .and_then(move |(_publish_channel, consume_channel, queue, message, options)| { let raw_data = from_utf8(&message.data).unwrap(); let json = Arc::new(Box::new(json_parse(raw_data).unwrap())); let serializer = Serializer::new(); @@ -129,17 +127,17 @@ pub fn rpc_request_future( let transmitter_local = transmitter.clone(); transmitter_local.unbounded_send(response).unwrap_or(()); - channel + consume_channel .basic_ack(message.delivery_tag, false) - .map(move |_confirmation| (channel, queue, options)) + .map(move |_confirmation| (_publish_channel, consume_channel, queue, options)) }) - // 7. Unbind the response queue from the exchange point - .and_then(move |(channel, _queue, options)| { + // 6. Unbind the response queue from the exchange point + .and_then(move |(publish_channel, consume_channel, _queue, options)| { let queue_name = options.get_queue_name().unwrap().clone(); let routing_key = options.get_queue_name().unwrap().clone(); let endpoint = options.get_endpoint().unwrap().clone(); - channel + consume_channel .queue_unbind( &queue_name, &endpoint.get_response_exchange(), @@ -147,10 +145,10 @@ pub fn rpc_request_future( QueueUnbindOptions::default(), FieldTable::new(), ) - .map(move |_| (channel, options)) + .map(move |_| (publish_channel, consume_channel, options)) }) - // 8. Delete the response queue - .and_then(move |(channel, options)| { + // 7. Delete the response queue + .and_then(move |(_publish_channel, consume_channel, options)| { let queue_delete_options = QueueDeleteOptions { if_unused: false, if_empty: false, @@ -158,15 +156,11 @@ pub fn rpc_request_future( }; let queue_name = options.get_queue_name().unwrap().clone(); - channel + consume_channel .queue_delete(&queue_name, queue_delete_options) - .map(move |_| channel) - }) - // 9. Close the channel - .and_then(move |channel| { - channel.close(200, "Close the channel.") + .map(move |_| ()) }) - // 10. Returns the result to the caller as future + // 8. Returns the result to the caller as future .then(move |result| match result { Ok(_) => Ok(()), Err(err) => { diff --git a/pathfinder/src/engine/middleware/base.rs b/pathfinder/src/engine/middleware/base.rs index c9175b6..0e88741 100644 --- a/pathfinder/src/engine/middleware/base.rs +++ b/pathfinder/src/engine/middleware/base.rs @@ -8,7 +8,7 @@ use futures::Future; use crate::engine::serializer::JsonMessage; use crate::error::PathfinderError; -use crate::rabbitmq::RabbitMQClient; +use crate::rabbitmq::RabbitMQContext; /// Type alias for dictionary with custom user headers pub type CustomUserHeaders = HashMap; @@ -21,5 +21,5 @@ pub type MiddlewareFuture = Box) -> MiddlewareFuture; + fn process_request(&self, message: JsonMessage, rabbitmq_context: Arc) -> MiddlewareFuture; } diff --git a/pathfinder/src/engine/middleware/empty.rs b/pathfinder/src/engine/middleware/empty.rs index ab57a68..1d0c1cc 100644 --- a/pathfinder/src/engine/middleware/empty.rs +++ b/pathfinder/src/engine/middleware/empty.rs @@ -9,7 +9,7 @@ use futures::future::lazy; use crate::engine::middleware::base::{Middleware, MiddlewareFuture}; use crate::engine::serializer::JsonMessage; -use crate::rabbitmq::RabbitMQClient; +use crate::rabbitmq::RabbitMQContext; /// A middleware that used for reverse proxy for cases when /// not necessary to do validating tokens or permissions. @@ -23,7 +23,7 @@ impl EmptyMiddleware { impl Middleware for EmptyMiddleware { /// Returns an empty future which is doesn't doing anything. - fn process_request(&self, _message: JsonMessage, _rabbitmq_client: Arc) -> MiddlewareFuture { + fn process_request(&self, _message: JsonMessage, _rabbitmq_context: Arc) -> MiddlewareFuture { Box::new(lazy(move || Ok(HashMap::new()))) } } diff --git a/pathfinder/src/engine/middleware/jwt.rs b/pathfinder/src/engine/middleware/jwt.rs index 525a60c..3b51473 100644 --- a/pathfinder/src/engine/middleware/jwt.rs +++ b/pathfinder/src/engine/middleware/jwt.rs @@ -29,7 +29,7 @@ use crate::engine::middleware::base::{Middleware, MiddlewareFuture, CustomUserHe use crate::engine::middleware::utils::get_permissions; use crate::engine::options::RpcOptions; use crate::engine::serializer::JsonMessage; -use crate::rabbitmq::RabbitMQClient; +use crate::rabbitmq::RabbitMQContext; /// A middleware class, that will check a JSON Web Token in WebSocket message. /// If token wasn't specified or it's invalid returns a `PathfinderError` object. @@ -43,39 +43,37 @@ impl JwtTokenMiddleware { /// Performs a request to Auth/Auth microservice with the taken token /// that must be verified before doing any actions later. - fn verify_token(&self, message: JsonMessage, token: String, rabbitmq_client: Arc) + fn verify_token(&self, message: JsonMessage, token: String, rabbitmq_context: Arc) -> impl Future + Sync + Send + 'static { let access_token = token.clone(); - let rabbitmq_client_local = rabbitmq_client.clone(); let options = Arc::new(RpcOptions::default() .with_message(message.clone()) .with_queue_name(Arc::new(format!("{}", Uuid::new_v4()))) ); + let rabbitmq_context_local = rabbitmq_context.clone(); + let publish_channel = rabbitmq_context_local.get_publish_channel(); + let consume_channel = rabbitmq_context_local.get_consume_channel(); - // 1. Create a channel - rabbitmq_client_local.get_channel() - // 2. Declare a response queue - .and_then(move |channel| { - let queue_name = options.get_queue_name().unwrap().clone(); - let queue_declare_options = QueueDeclareOptions { - passive: false, - durable: true, - exclusive: true, - auto_delete: false, - ..Default::default() - }; + let queue_name = options.get_queue_name().unwrap().clone(); + let queue_declare_options = QueueDeclareOptions { + passive: false, + durable: true, + exclusive: true, + auto_delete: false, + ..Default::default() + }; - channel - .queue_declare(&queue_name, queue_declare_options, FieldTable::new()) - .map(move |queue| (channel, queue, options)) - }) - // 3. Link the response queue the exchange - .and_then(move |(channel, queue, options)| { + // 1. Declare a response queue + consume_channel + .queue_declare(&queue_name, queue_declare_options, FieldTable::new()) + .map(move |queue| (publish_channel, consume_channel, queue, options)) + // 2. Link the response queue the exchange + .and_then(move |(publish_channel, consume_channel, queue, options)| { let queue_name = options.get_queue_name().unwrap().clone(); let routing_key = options.get_queue_name().unwrap().clone(); - channel + consume_channel .queue_bind( &queue_name, RESPONSE_EXCHANGE.clone(), @@ -83,10 +81,10 @@ impl JwtTokenMiddleware { QueueBindOptions::default(), FieldTable::new() ) - .map(move |_| (channel, queue, options)) + .map(move |_| (publish_channel, consume_channel, queue, options)) }) - // 4. Publish message into the microservice queue and make ensure that it's delivered - .and_then(move |(channel, queue, options)| { + // 3. Publish message into the microservice queue and make ensure that it's delivered + .and_then(move |(publish_channel, consume_channel, queue, options)| { let publish_message_options = BasicPublishOptions { mandatory: true, immediate: false, @@ -115,7 +113,7 @@ impl JwtTokenMiddleware { .with_reply_to(queue_name_response.to_string()) // Response queue .with_correlation_id(event_name.clone().to_string()); // Event name - channel + publish_channel .basic_publish( TOKEN_VERIFY_EXCHANGE.clone(), TOKEN_VERIFY_ROUTING_KEY.clone(), @@ -123,11 +121,11 @@ impl JwtTokenMiddleware { publish_message_options, basic_properties ) - .map(move |_confirmation| (channel, queue, options)) + .map(move |_confirmation| (publish_channel, consume_channel, queue, options)) }) - // 5. Consume a response message from the queue, that was declared on the 2nd step - .and_then(move |(channel, queue, options)| { - channel + // 4. Consume a response message from the queue, that was declared on the 2nd step + .and_then(move |(publish_channel, consume_channel, queue, options)| { + consume_channel .basic_consume( &queue, "response_consumer", @@ -139,24 +137,24 @@ impl JwtTokenMiddleware { .take(1) .into_future() .map_err(|(err, _)| err) - .map(move |(message, _)| (channel, queue, message.unwrap(), options)) + .map(move |(message, _)| (publish_channel, consume_channel, queue, message.unwrap(), options)) }) }) - // 6. Prepare a response for a client, serialize and pass to the next processing stage - .and_then(move |(channel, queue, message, options)| { + // 5. Prepare a response for a client, serialize and pass to the next processing stage + .and_then(move |(publish_channel, consume_channel, queue, message, options)| { let raw_data = from_utf8(&message.data).unwrap(); let json = parse_json(raw_data).unwrap(); - channel + consume_channel .basic_ack(message.delivery_tag, false) - .map(move |_confirmation| (channel, queue, options, json)) + .map(move |_confirmation| (publish_channel, consume_channel, queue, options, json)) }) - // 7. Unbind the response queue from the exchange point - .and_then(move |(channel, _queue, options, json)| { + // 6. Unbind the response queue from the exchange point + .and_then(move |(publish_channel, consume_channel, _queue, options, json)| { let queue_name = options.get_queue_name().unwrap().clone(); let routing_key = options.get_queue_name().unwrap().clone(); - channel + consume_channel .queue_unbind( &queue_name, RESPONSE_EXCHANGE.clone(), @@ -164,10 +162,10 @@ impl JwtTokenMiddleware { QueueUnbindOptions::default(), FieldTable::new(), ) - .map(move |_| (channel, options, json)) + .map(move |_| (publish_channel, consume_channel, options, json)) }) - // 8. Delete the response queue - .and_then(move |(channel, options, json)| { + // 7. Delete the response queue + .and_then(move |(_publish_channel, consume_channel, options, json)| { let queue_delete_options = QueueDeleteOptions { if_unused: false, if_empty: false, @@ -175,15 +173,11 @@ impl JwtTokenMiddleware { }; let queue_name = options.get_queue_name().unwrap().clone(); - channel + consume_channel .queue_delete(&queue_name, queue_delete_options) - .map(move |_| (channel, json)) - }) - // 9. Close the channel - .and_then(move |(channel, json)| { - channel.close(200, "Close the channel.").map(|_| json) + .map(move |_| json) }) - // 10. Prepare the response for the client. + // 8. Prepare the response for the client .then(move |result| match result { Ok(json) => { let has_errors = !json["error"].is_null(); @@ -212,39 +206,37 @@ impl JwtTokenMiddleware { /// Performs a request to Auth/Auth microservice with the taken token /// that will be used for getting a list of permissions to other resources. - fn get_headers(&self, message: JsonMessage, token: String, rabbitmq_client: Arc) + fn get_headers(&self, message: JsonMessage, token: String, rabbitmq_context: Arc) -> impl Future + Sync + Send + 'static { let access_token = token.clone(); - let rabbitmq_client_local = rabbitmq_client.clone(); let options = Arc::new(RpcOptions::default() .with_message(message.clone()) .with_queue_name(Arc::new(format!("{}", Uuid::new_v4()))) ); + let rabbitmq_context_local = rabbitmq_context.clone(); + let publish_channel = rabbitmq_context_local.get_publish_channel(); + let consume_channel = rabbitmq_context_local.get_consume_channel(); - // 1. Create a channel - rabbitmq_client_local.get_channel() - // 2. Declare a response queue - .and_then(move |channel| { - let queue_name = options.get_queue_name().unwrap().clone(); - let queue_declare_options = QueueDeclareOptions { - passive: false, - durable: true, - exclusive: true, - auto_delete: false, - ..Default::default() - }; + let queue_name = options.get_queue_name().unwrap().clone(); + let queue_declare_options = QueueDeclareOptions { + passive: false, + durable: true, + exclusive: true, + auto_delete: false, + ..Default::default() + }; - channel - .queue_declare(&queue_name, queue_declare_options, FieldTable::new()) - .map(move |queue| (channel, queue, options)) - }) - // 3. Link the response queue the exchange - .and_then(move |(channel, queue, options)| { + // 1. Declare a response queue + consume_channel + .queue_declare(&queue_name, queue_declare_options, FieldTable::new()) + .map(move |queue| (publish_channel, consume_channel, queue, options)) + // 2. Link the response queue the exchange + .and_then(move |(publish_channel, consume_channel, queue, options)| { let queue_name = options.get_queue_name().unwrap().clone(); let routing_key = options.get_queue_name().unwrap().clone(); - channel + consume_channel .queue_bind( &queue_name, RESPONSE_EXCHANGE.clone(), @@ -252,10 +244,10 @@ impl JwtTokenMiddleware { QueueBindOptions::default(), FieldTable::new() ) - .map(move |_| (channel, queue, options)) + .map(move |_| (publish_channel, consume_channel, queue, options)) }) - // 4. Publish message into the microservice queue and make ensure that it's delivered - .and_then(move |(channel, queue, options)| { + // 3. Publish message into the microservice queue and make ensure that it's delivered + .and_then(move |(publish_channel, consume_channel, queue, options)| { let publish_message_options = BasicPublishOptions { mandatory: true, immediate: false, @@ -284,7 +276,7 @@ impl JwtTokenMiddleware { .with_reply_to(queue_name_response.to_string()) // Response queue .with_correlation_id(event_name.clone().to_string()); // Event name - channel + publish_channel .basic_publish( TOKEN_USER_PROFILE_EXCHANGE.clone(), TOKEN_USER_PROFILE_ROUTING_KEY.clone(), @@ -292,11 +284,11 @@ impl JwtTokenMiddleware { publish_message_options, basic_properties ) - .map(move |_confirmation| (channel, queue, options)) + .map(move |_confirmation| (publish_channel, consume_channel, queue, options)) }) - // 5. Consume a response message from the queue, that was declared on the 2nd step - .and_then(move |(channel, queue, options)| { - channel + // 4. Consume a response message from the queue, that was declared on the 2nd step + .and_then(move |(publish_channel, consume_channel, queue, options)| { + consume_channel .basic_consume( &queue, "response_consumer", @@ -308,24 +300,24 @@ impl JwtTokenMiddleware { .take(1) .into_future() .map_err(|(err, _)| err) - .map(move |(message, _)| (channel, queue, message.unwrap(), options)) + .map(move |(message, _)| (publish_channel, consume_channel, queue, message.unwrap(), options)) }) }) - // 6. Prepare a response for a client, serialize and pass to the next processing stage - .and_then(move |(channel, queue, message, options)| { + // 5. Prepare a response for a client, serialize and pass to the next processing stage + .and_then(move |(publish_channel, consume_channel, queue, message, options)| { let raw_data = from_utf8(&message.data).unwrap(); let json = parse_json(raw_data).unwrap(); - channel + consume_channel .basic_ack(message.delivery_tag, false) - .map(move |_confirmation| (channel, queue, options, json)) + .map(move |_confirmation| (publish_channel, consume_channel, queue, options, json)) }) - // 7. Unbind the response queue from the exchange point - .and_then(move |(channel, _queue, options, json)| { + // 6. Unbind the response queue from the exchange point + .and_then(move |(publish_channel, consume_channel, _queue, options, json)| { let queue_name = options.get_queue_name().unwrap().clone(); let routing_key = options.get_queue_name().unwrap().clone(); - channel + consume_channel .queue_unbind( &queue_name, RESPONSE_EXCHANGE.clone(), @@ -333,10 +325,10 @@ impl JwtTokenMiddleware { QueueUnbindOptions::default(), FieldTable::new(), ) - .map(move |_| (channel, options, json)) + .map(move |_| (publish_channel, consume_channel, options, json)) }) - // 8. Delete the response queue - .and_then(move |(channel, options, json)| { + // 7. Delete the response queue + .and_then(move |(_publish_channel, consume_channel, options, json)| { let queue_delete_options = QueueDeleteOptions { if_unused: false, if_empty: false, @@ -344,15 +336,11 @@ impl JwtTokenMiddleware { }; let queue_name = options.get_queue_name().unwrap().clone(); - channel + consume_channel .queue_delete(&queue_name, queue_delete_options) - .map(move |_| (channel, json)) - }) - // 9. Close the channel - .and_then(move |(channel, json)| { - channel.close(200, "Close the channel.").map(|_| json) + .map(move |_| json) }) - // 10. Prepare the response for the client. + // 8. Prepare the response for the client .then(move |result| match result { Ok(json) => { let has_errors = !json["error"].is_null(); @@ -381,7 +369,7 @@ impl JwtTokenMiddleware { } impl Middleware for JwtTokenMiddleware { - fn process_request(&self, message: JsonMessage, rabbitmq_client: Arc) -> MiddlewareFuture { + fn process_request(&self, message: JsonMessage, rabbitmq_context: Arc) -> MiddlewareFuture { // Extract a token from a JSON object let token = match message["token"].as_str() { Some(token) => String::from(token), @@ -394,8 +382,8 @@ impl Middleware for JwtTokenMiddleware { }; // Verify the passed JSON Web Token and extract permissions - let verify_token_future = self.verify_token(message.clone(),token.clone(), rabbitmq_client.clone()); - let get_headers_future = self.get_headers(message.clone(),token.clone(), rabbitmq_client.clone()); + let verify_token_future = self.verify_token(message.clone(),token.clone(), rabbitmq_context.clone()); + let get_headers_future = self.get_headers(message.clone(),token.clone(), rabbitmq_context.clone()); Box::new(verify_token_future.and_then(move |_| get_headers_future)) } } diff --git a/pathfinder/src/error.rs b/pathfinder/src/error.rs index a44de4c..def151d 100644 --- a/pathfinder/src/error.rs +++ b/pathfinder/src/error.rs @@ -11,8 +11,9 @@ use std::io; use std::result; use config::ConfigError; -use failure::Error; +use failure::{Error as FailureError}; use json::JsonValue; +use lapin_futures::error::{Error as LapinError}; use strum_macros::AsStaticStr; /// Type alias for `Result` objects that return a Pathfinder error. @@ -24,7 +25,9 @@ pub enum PathfinderError { /// The error that occurred during work with I/O. Io(io::Error), /// Represents a Lapin client error. - LapinError(Error), + LapinError(FailureError), + /// Represents an error, occurred on initialization Lapin client channel + LapinChannelError(LapinError), /// Represents all possible errors that can occur when working with /// configuration (reading, watching for a changes, etc.). SettingsError(ConfigError), @@ -50,6 +53,7 @@ impl fmt::Display for PathfinderError { match *self { PathfinderError::Io(ref err) => write!(f, "IO error: {}", err), PathfinderError::LapinError(ref err) => write!(f, "Lapin error: {}", err), + PathfinderError::LapinChannelError(ref err) => write!(f, "Lapin channel error: {}", err), PathfinderError::SettingsError(ref err) => write!(f, "Settings error: {}", err), PathfinderError::InvalidEndpoint(ref msg) => write!(f, "Parse error: {}", msg), PathfinderError::EndpointNotFound(ref msg) => write!(f, "Endpoint \"{}\" was not found", msg), diff --git a/pathfinder/src/proxy.rs b/pathfinder/src/proxy.rs index f06f775..19c7f7a 100644 --- a/pathfinder/src/proxy.rs +++ b/pathfinder/src/proxy.rs @@ -10,6 +10,7 @@ //! use std::collections::HashMap; +use std::io::{Error, ErrorKind}; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; @@ -17,6 +18,7 @@ use amq_protocol::uri::AMQPUri; use futures::stream::Stream; use futures::sync::mpsc; use futures::{Future, Sink}; +use lapin_futures::error::{Error as LapinError}; use log::{debug, info, error}; use strum::AsStaticRef; use tokio::net::TcpListener; @@ -26,7 +28,7 @@ use tungstenite::protocol::Message; use crate::cli::CliOptions; use crate::engine::{Engine, MessageSender, serialize_message, wrap_a_string_error}; use crate::error::PathfinderError; -use crate::rabbitmq::client::RabbitMQClient; +use crate::rabbitmq::client::{RabbitMQContext, RabbitMQClient}; use crate::rabbitmq::utils::get_uri; /// A reverse proxy application. @@ -68,11 +70,27 @@ impl Proxy { let connections_local = connections.clone(); accept_async(stream) - // Process the messages + // Processing an unexpected error during creation a new connection + .map_err(|error| { + let io_error = Error::new(ErrorKind::Other, error); + PathfinderError::Io(io_error) + }) + // Prepare lapin client context for further communication with RabbitMQ. .and_then(move |ws_stream| { + let rabbitmq_inner = rabbimq_local.clone(); + rabbitmq_inner + .get_context() + .map(move |rabbitmq_context: Arc| (ws_stream, rabbitmq_context)) + .map_err(|error: LapinError| PathfinderError::LapinChannelError(error)) + }) + // Process the messages + .and_then(move |(ws_stream, rabbitmq_context)| { + let connections_inner = connections_local.clone(); let connection_for_insert = connections_local.clone(); let connection_for_remove = connections_local.clone(); - let connections_inner = connections_local.clone(); + + let rabbitmq_context_inner = rabbitmq_context.clone(); + let rabbitmq_context_for_clean = rabbitmq_context.clone(); // Create a channel for the stream, which other sockets will use to // send us messages. It could be used for broadcasting your data to @@ -91,10 +109,10 @@ impl Proxy { let connections_nested = connections_inner.clone(); let transmitter_nested = connections_nested.lock().unwrap()[&addr_nested].clone(); let transmitter_for_errors = connections_nested.lock().unwrap()[&addr_nested].clone(); - let rabbitmq_nested = rabbimq_local.clone(); + let rabbitmq_context_nested = rabbitmq_context_inner.clone(); let process_request_future = engine_local - .process_request(message, transmitter_nested, rabbitmq_nested) + .process_request(message, transmitter_nested, rabbitmq_context_nested) .map_err(move |error: PathfinderError| { let response = match error { PathfinderError::MicroserviceError(json) => { @@ -105,7 +123,7 @@ impl Proxy { let error_message = format!("{}", error); let error_type = error.as_static(); wrap_a_string_error(&error_type, error_message.as_str()) - }, + } }; transmitter_for_errors.unbounded_send(response).unwrap_or(()) @@ -127,16 +145,22 @@ impl Proxy { .map_err(|_| ()) .select(ws_writer.map(|_| ()).map_err(|_| ())); - // Close the connection after using - tokio::spawn(connection.then(move |_| { - connection_for_remove.lock().unwrap().remove(&addr); - debug!("Connection {} closed.", addr); - Ok(()) - })); - + // Then clean up RabbitMQ context and close the connection after the usage + let handler = connection + .then(move |_| { + debug!("Clean up RabbitMQ context."); + rabbitmq_context_for_clean.close_channels() + }) + .then(move |_| { + connection_for_remove.lock().unwrap().remove(&addr); + debug!("Connection {} closed.", addr); + Ok(()) + }); + + tokio::spawn(handler); Ok(()) }) - // An error occurred during the WebSocket handshake + // An unexpected error occurred during processing or the WebSocket handshake .or_else(|error| { debug!("{}", error); Ok(()) diff --git a/pathfinder/src/rabbitmq/client.rs b/pathfinder/src/rabbitmq/client.rs index 3ed0552..74648cd 100644 --- a/pathfinder/src/rabbitmq/client.rs +++ b/pathfinder/src/rabbitmq/client.rs @@ -21,6 +21,38 @@ pub type LapinClient = Client; /// Alias for the lapin channel. pub type LapinChannel = Channel; +/// Custom client context, stores data, channels and everything else +/// that can be used for communicating with AMQP. +pub struct RabbitMQContext { + publish_channel: LapinChannel, + consume_channel: LapinChannel +} + +impl RabbitMQContext { + pub fn new(publish_channel: LapinChannel, consume_channel: LapinChannel) -> RabbitMQContext { + RabbitMQContext { + publish_channel, + consume_channel + } + } + + pub fn get_publish_channel(&self) -> LapinChannel { + self.publish_channel.clone() + } + + pub fn get_consume_channel(&self) -> LapinChannel { + self.consume_channel.clone() + } + + pub fn close_channels(&self) -> impl Future + Sync + Send + 'static { + let publish_channel = self.publish_channel.clone(); + let consume_channel = self.consume_channel.clone(); + + publish_channel.close(200, "Close the publish channel.") + .and_then(move |_| consume_channel.close(200, "Close the consume channel.")) + } +} + /// A future-based asynchronous RabbitMQ client. pub struct RabbitMQClient { client: Arc @@ -46,9 +78,22 @@ impl RabbitMQClient { }) } - /// Returns a lapin channel as future, based on the lapin client instance. - pub fn get_channel(&self) -> impl Future + Sync + Send + 'static { + /// Returns client context as future, based on the lapin client instance. + pub fn get_context(&self) -> impl Future, Error=LapinError> + Sync + Send + 'static { let client = self.client.clone(); + + // Request channel for publishing messages client.create_confirm_channel(ConfirmSelectOptions::default()) + .map(|publish_channel| (client, publish_channel)) + .map(|(client, publish_channel)| + // Request channel for consuming messages + client.create_confirm_channel(ConfirmSelectOptions::default()) + .map(|consume_channel| (publish_channel, consume_channel)) + ) + .flatten() + // Initialize the client context + .map(|(publish_channel, consume_channel)| + Arc::new(RabbitMQContext::new(publish_channel, consume_channel)) + ) } } diff --git a/pathfinder/src/rabbitmq/mod.rs b/pathfinder/src/rabbitmq/mod.rs index 2c3c3c2..087a19d 100644 --- a/pathfinder/src/rabbitmq/mod.rs +++ b/pathfinder/src/rabbitmq/mod.rs @@ -4,5 +4,5 @@ pub mod client; pub mod utils; -pub use self::client::{LapinChannel, LapinClient, RabbitMQClient}; +pub use self::client::{LapinChannel, LapinClient, RabbitMQContext, RabbitMQClient}; pub use self::utils::{get_address_to_rabbitmq, get_uri};