diff --git a/README.md b/README.md index c0ef5f1..a07884f 100644 --- a/README.md +++ b/README.md @@ -1,79 +1,164 @@ # redsumer-rs -A **lightweight implementation** of **Redis Streams** for Rust, that allows you managing streaming messages in an easy way. -With `redsumer-rs` you can: +A lightweight implementation of Redis Streams for Rust, allowing you to manage streaming messages in a simplified way. With redsumer you can: + - **Produce** new messages in a specific *stream*. -- **Consume** messages from specific *stream*, setting config parameters that allow you a flexible implementation. +- **Consume** messages from specific *stream*, setting config parameters that allow you a flexible implementation. It also provides an option to minimize the possibility of two or more consumers from the same consumer group consuming the same message simultaneously. + +To use ***redsumer*** from GitHub repository with specific version, set the dependency in Cargo.toml file as follows: + +```ini +[dependencies] +redsumer = { git = "https://github.com/enerBit/redsumer-rs.git", package = "redsumer", version = "0.4.2" } +``` -For more dependency options from git, check section [3.0 Cargo Reference](https://doc.rust-lang.org/cargo/reference/index.html) from `The Cargo Book`. +You can depend on it via cargo by adding the following dependency to your `Cargo.toml` file: + +```ini +[dependencies] +redsumer = { version = "0.4.2" } +``` ## Basic Usage -### Producer: +#### Produce a new stream message: -Create a new producer instance: +Create a new producer instance and produce a new stream message from a [BTreeMap](`std::collections::BTreeMap`): ```rust,no_run - use redsumer::*; - - let credentials: Option = None; - let host: &str = "localhost"; - let port: &str = "6379"; - let db: &str = "0"; - let stream_name: &str = "my-stream"; - - let producer_result: RedsumerResult = - RedsumerProducer::new( - credentials, - host, - port, - db, - stream_name, - ); - - let producer: RedsumerProducer = producer_result.unwrap(); +use std::collections::BTreeMap; + +use redsumer::*; +use time::OffsetDateTime; +use uuid::Uuid; + +#[tokio::main] +async fn main() { + let credentials: Option = None; + let host: &str = "localhost"; + let port: &str = "6379"; + let db: &str = "0"; + let stream_name: &str = "my-stream"; + + let producer_result: RedsumerResult = + RedsumerProducer::new( + credentials, + host, + port, + db, + stream_name, + ); + + let producer: RedsumerProducer = producer_result.unwrap_or_else(|error| { + panic!("Error creating a new RedsumerProducer instance: {:?}", error); + }); + + let mut message: BTreeMap<&str, String> = BTreeMap::new(); + message.insert("id", Uuid::default().to_string()); + message.insert("started_at", OffsetDateTime::now_utc().to_string()); + + let id: Id = producer.produce(message).await.unwrap_or_else(|error| { + panic!("Error producing stream message from BTreeMap: {:?}", error.to_string()); + }); +} ``` -### Consumer: +Similar to the previous example, you can produce a message from a [HashMap](std::collections::HashMap) or a [HashSet](std::collections::HashSet). Go to [examples](https://github.com/enerBit/redsumer-rs/tree/main/examples) directory to see more use cases like producing a stream message from an instance of a struct. + +The [produce](RedsumerProducer::produce) method accepts a generic type that implements the [ToRedisArgs](redis::ToRedisArgs) trait. Take a look at the documentation for more information. + +#### Consume messages from a stream: -Create a new consumer instance: +Create a new consumer instance and consume messages from stream: ```rust,no_run - use redsumer::*; - - let credentials: Option = None; - let host: &str = "localhost"; - let port: &str = "6379"; - let db: &str = "0"; - let stream_name: &str = "my-stream"; - let group_name: &str = "group-name"; - let consumer_name: &str = "consumer"; - let since_id: &str = "0-0"; - let min_idle_time_milliseconds: usize = 1000; - let new_messages_count: usize = 3; - let pending_messages_count: usize = 2; - let claimed_messages_count: usize = 1; - let block: u8 = 5; - - let consumer_result: RedsumerResult = RedsumerConsumer::new( - credentials, - host, - port, - db, - stream_name, - group_name, - consumer_name, - since_id, - min_idle_time_milliseconds, - new_messages_count, - pending_messages_count, - claimed_messages_count, - block, - ); - - let mut consumer: RedsumerConsumer = consumer_result.unwrap(); +use redsumer::*; +use redsumer::redis::StreamId; + +#[tokio::main] +async fn main() { + let credentials: Option = None; + let host: &str = "localhost"; + let port: &str = "6379"; + let db: &str = "0"; + let stream_name: &str = "my-stream"; + let group_name: &str = "group-name"; + let consumer_name: &str = "consumer"; + let since_id: &str = "0-0"; + let min_idle_time_milliseconds: usize = 1000; + let new_messages_count: usize = 3; + let pending_messages_count: usize = 2; + let claimed_messages_count: usize = 1; + let block: u8 = 5; + + let consumer_result: RedsumerResult = RedsumerConsumer::new( + credentials, + host, + port, + db, + stream_name, + group_name, + consumer_name, + since_id, + min_idle_time_milliseconds, + new_messages_count, + pending_messages_count, + claimed_messages_count, + block, + ); + + let mut consumer: RedsumerConsumer = consumer_result.unwrap_or_else(|error| { + panic!("Error creating a new RedsumerConsumer instance: {:?}", error); + }); + + loop { + let messages: Vec = consumer.consume().await.unwrap_or_else(|error| { + panic!("Error consuming messages from stream: {:?}", error); + }); + + for message in messages { + if consumer.is_still_mine(&message.id).unwrap_or_else(|error| { + panic!( + "Error checking if message is still in consumer pending list: {:?}", error + ); + }) { + // Process message ... + println!("Processing message: {:?}", message); + // ... + + let ack: bool = consumer.ack(&message.id).await.unwrap_or_else(|error| { + panic!("Error acknowledging message: {:?}", error); + }); + + if ack { + println!("Message acknowledged: {:?}", message); + } + } + } + } +} ``` +In this example, the [consume](RedsumerConsumer::consume) method is called in a loop to consume messages from the stream. +The [consume](RedsumerConsumer::consume) method returns a vector of [StreamId](redis::StreamId) instances. Each [StreamId](redis::StreamId) instance represents a message in the stream. +The [is_still_mine](RedsumerConsumer::is_still_mine) method is used to check if the message is still in the consumer pending list. +If it is, the message is processed and then acknowledged using the [ack](RedsumerConsumer::ack) method. +The [ack](RedsumerConsumer::ack) method returns a boolean indicating if the message was successfully acknowledged. + +The main objective of this message consumption strategy is to minimize the possibility that two or more consumers from the same consumer group operating simultaneously consume the same message at the same time. +Knowing that it is a complex problem with no definitive solution, including business logic in the message processing instance will always improve results. + +Take a look at the [examples](https://github.com/enerBit/redsumer-rs/tree/main/examples) directory to see more use cases. + +#### Utilities from [redis] crate: + +The [redis] module provides utilities from the [redis](https://docs.rs/redis) crate. You can use these utilities to interact with Redis values and errors. + +#### Unwrap [Value](redis::Value) to a specific type: + +The [Value](redis::Value) enum represents a Redis value. It can be converted to a specific type using the [from_redis_value](redis::from_redis_value) function. This function can be imported from the [redis] module. +***redsumer*** includes the [FromRedisValueHandler] struct that implements the [FromRedisValue](redis::FromRedisValue) trait for a lot of types. It is useful to convert a [Value](redis::Value) to a specific type reducing boilerplate code and total lines of code. + ## Contributing We welcome contributions to `redsumer-rs`. Here are some ways you can contribute: diff --git a/redsumer-rs/Cargo.toml b/redsumer-rs/Cargo.toml index a9c37bf..2393648 100644 --- a/redsumer-rs/Cargo.toml +++ b/redsumer-rs/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "redsumer" description = "Lightweight implementation of Redis Streams for Rust" -version = "0.4.1" +version = "0.4.2" edition = "2021" license-file = "../LICENSE" readme = "../README.md" @@ -21,11 +21,11 @@ authors = [ ] [dependencies] -redis = { version = "0.25.3", features = ["tokio-comp", "streams"] } +redis = { version = "0.25.4", features = ["tokio-comp", "streams"] } tokio = { version = "1.38.0", features = ["full"] } -uuid = { version = "1.8.0" } +uuid = { version = "1.9.1" } time = { version = "0.3.36", features = ["parsing"] } bytes = { version = "1.6.0" } serde = { version = "1.0.203", features = ["derive"] } -serde_json = { version = "1.0.117" } -log = { version = "0.4.21" } +serde_json = { version = "1.0.120" } +log = { version = "0.4.22" } diff --git a/redsumer-rs/src/lib.rs b/redsumer-rs/src/lib.rs index 8323dca..7c22233 100644 --- a/redsumer-rs/src/lib.rs +++ b/redsumer-rs/src/lib.rs @@ -1,73 +1,162 @@ -//! A **lightweight implementation** of **Redis Streams** for Rust, that allows you managing streaming messages in an easy way. -//! With `redsumer-rs` you can: +//! A lightweight implementation of Redis Streams for Rust, allowing you to manage streaming messages in a simplified way. With redsumer you can: +//! //! - **Produce** new messages in a specific *stream*. -//! - **Consume** messages from specific *stream*, setting config parameters that allow you a flexible implementation. +//! - **Consume** messages from specific *stream*, setting config parameters that allow you a flexible implementation. It also provides an option to minimize the possibility of two or more consumers from the same consumer group consuming the same message simultaneously. +//! +//! To use ***redsumer*** from GitHub repository with specific version, set the dependency in Cargo.toml file as follows: +//! +//! ```ini +//! [dependencies] +//! redsumer = { git = "https://github.com/enerBit/redsumer-rs.git", package = "redsumer", version = "0.4.2" } +//! ``` //! -//! For more dependency options from git, check section [3.0 Cargo Reference](https://doc.rust-lang.org/cargo/reference/index.html) from `The Cargo Book`. +//! You can depend on it via cargo by adding the following dependency to your `Cargo.toml` file: +//! +//! ```ini +//! [dependencies] +//! redsumer = { version = "0.4.2" } +//! ``` //! //! ## Basic Usage //! -//! Create a new producer instance: +//! #### Produce a new stream message: +//! +//! Create a new producer instance and produce a new stream message from a [BTreeMap](`std::collections::BTreeMap`): //! //! ```rust,no_run +//! use std::collections::BTreeMap; +//! //! use redsumer::*; -//! -//! let credentials: Option = None; -//! let host: &str = "localhost"; -//! let port: &str = "6379"; -//! let db: &str = "0"; -//! let stream_name: &str = "my-stream"; -//! -//! let producer_result: RedsumerResult = -//! RedsumerProducer::new( -//! credentials, -//! host, -//! port, -//! db, -//! stream_name, -//! ); -//! -//! let producer: RedsumerProducer = producer_result.unwrap(); +//! use time::OffsetDateTime; +//! use uuid::Uuid; +//! +//! #[tokio::main] +//! async fn main() { +//! let credentials: Option = None; +//! let host: &str = "localhost"; +//! let port: &str = "6379"; +//! let db: &str = "0"; +//! let stream_name: &str = "my-stream"; +//! +//! let producer_result: RedsumerResult = +//! RedsumerProducer::new( +//! credentials, +//! host, +//! port, +//! db, +//! stream_name, +//! ); +//! +//! let producer: RedsumerProducer = producer_result.unwrap_or_else(|error| { +//! panic!("Error creating a new RedsumerProducer instance: {:?}", error); +//! }); +//! +//! let mut message: BTreeMap<&str, String> = BTreeMap::new(); +//! message.insert("id", Uuid::default().to_string()); +//! message.insert("started_at", OffsetDateTime::now_utc().to_string()); +//! +//! let id: Id = producer.produce(message).await.unwrap_or_else(|error| { +//! panic!("Error producing stream message from BTreeMap: {:?}", error.to_string()); +//! }); +//! } //! ``` //! -//! Create a new consumer instance: +//! Similar to the previous example, you can produce a message from a [HashMap](std::collections::HashMap) or a [HashSet](std::collections::HashSet). Go to [examples](https://github.com/enerBit/redsumer-rs/tree/main/examples) directory to see more use cases like producing a stream message from an instance of a struct. +//! +//! The [produce](RedsumerProducer::produce) method accepts a generic type that implements the [ToRedisArgs](redis::ToRedisArgs) trait. Take a look at the documentation for more information. +//! +//! #### Consume messages from a stream: +//! +//! Create a new consumer instance and consume messages from stream: //! //! ```rust,no_run //! use redsumer::*; +//! use redsumer::redis::StreamId; +//! +//! #[tokio::main] +//! async fn main() { +//! let credentials: Option = None; +//! let host: &str = "localhost"; +//! let port: &str = "6379"; +//! let db: &str = "0"; +//! let stream_name: &str = "my-stream"; +//! let group_name: &str = "group-name"; +//! let consumer_name: &str = "consumer"; +//! let since_id: &str = "0-0"; +//! let min_idle_time_milliseconds: usize = 1000; +//! let new_messages_count: usize = 3; +//! let pending_messages_count: usize = 2; +//! let claimed_messages_count: usize = 1; +//! let block: u8 = 5; +//! +//! let consumer_result: RedsumerResult = RedsumerConsumer::new( +//! credentials, +//! host, +//! port, +//! db, +//! stream_name, +//! group_name, +//! consumer_name, +//! since_id, +//! min_idle_time_milliseconds, +//! new_messages_count, +//! pending_messages_count, +//! claimed_messages_count, +//! block, +//! ); +//! +//! let mut consumer: RedsumerConsumer = consumer_result.unwrap_or_else(|error| { +//! panic!("Error creating a new RedsumerConsumer instance: {:?}", error); +//! }); +//! +//! loop { +//! let messages: Vec = consumer.consume().await.unwrap_or_else(|error| { +//! panic!("Error consuming messages from stream: {:?}", error); +//! }); +//! +//! for message in messages { +//! if consumer.is_still_mine(&message.id).unwrap_or_else(|error| { +//! panic!( +//! "Error checking if message is still in consumer pending list: {:?}", error +//! ); +//! }) { +//! // Process message ... +//! println!("Processing message: {:?}", message); +//! // ... +//! +//! let ack: bool = consumer.ack(&message.id).await.unwrap_or_else(|error| { +//! panic!("Error acknowledging message: {:?}", error); +//! }); //! -//! let credentials: Option = None; -//! let host: &str = "localhost"; -//! let port: &str = "6379"; -//! let db: &str = "0"; -//! let stream_name: &str = "my-stream"; -//! let group_name: &str = "group-name"; -//! let consumer_name: &str = "consumer"; -//! let since_id: &str = "0-0"; -//! let min_idle_time_milliseconds: usize = 1000; -//! let new_messages_count: usize = 3; -//! let pending_messages_count: usize = 2; -//! let claimed_messages_count: usize = 1; -//! let block: u8 = 5; -//! -//! let consumer_result: RedsumerResult = RedsumerConsumer::new( -//! credentials, -//! host, -//! port, -//! db, -//! stream_name, -//! group_name, -//! consumer_name, -//! since_id, -//! min_idle_time_milliseconds, -//! new_messages_count, -//! pending_messages_count, -//! claimed_messages_count, -//! block, -//! ); -//! -//! let mut consumer: RedsumerConsumer = consumer_result.unwrap(); +//! if ack { +//! println!("Message acknowledged: {:?}", message); +//! } +//! } +//! } +//! } +//! } //! ``` //! +//! In this example, the [consume](RedsumerConsumer::consume) method is called in a loop to consume messages from the stream. +//! The [consume](RedsumerConsumer::consume) method returns a vector of [StreamId](redis::StreamId) instances. Each [StreamId](redis::StreamId) instance represents a message in the stream. +//! The [is_still_mine](RedsumerConsumer::is_still_mine) method is used to check if the message is still in the consumer pending list. +//! If it is, the message is processed and then acknowledged using the [ack](RedsumerConsumer::ack) method. +//! The [ack](RedsumerConsumer::ack) method returns a boolean indicating if the message was successfully acknowledged. +//! +//! The main objective of this message consumption strategy is to minimize the possibility that two or more consumers from the same consumer group operating simultaneously consume the same message at the same time. +//! Knowing that it is a complex problem with no definitive solution, including business logic in the message processing instance will always improve results. +//! +//! Take a look at the [examples](https://github.com/enerBit/redsumer-rs/tree/main/examples) directory to see more use cases. +//! +//! #### Utilities from [redis] crate: +//! +//! The [redis] module provides utilities from the [redis](https://docs.rs/redis) crate. You can use these utilities to interact with Redis values and errors. +//! +//! #### Unwrap [Value](redis::Value) to a specific type: +//! +//! The [Value](redis::Value) enum represents a Redis value. It can be converted to a specific type using the [from_redis_value](redis::from_redis_value) function. This function can be imported from the [redis] module. +//! ***redsumer*** includes the [FromRedisValueHandler] struct that implements the [FromRedisValue](redis::FromRedisValue) trait for a lot of types. It is useful to convert a [Value](redis::Value) to a specific type reducing boilerplate code and total lines of code. +//! //! ## Contributing //! //! We welcome contributions to `redsumer-rs`. Here are some ways you can contribute: diff --git a/redsumer-rs/src/redsumer/client.rs b/redsumer-rs/src/redsumer/client.rs index f0c8a54..b616550 100644 --- a/redsumer-rs/src/redsumer/client.rs +++ b/redsumer-rs/src/redsumer/client.rs @@ -3,6 +3,10 @@ use redis::Client; use super::types::RedsumerResult; /// To hold credentials to authenticate in *Redis*. +/// +/// This credentials are used to authenticate in *Redis* when server requires it. If server does not require it, you set it to `None`. +/// +/// This crate uses a connection URL in format: `redis://[][:@]:/`. Other formats are not supported yet. pub struct ClientCredentials<'k> { user: &'k str, password: &'k str, @@ -20,10 +24,14 @@ impl<'k> ClientCredentials<'k> { } /// Build a new instance of [`ClientCredentials`]. + /// /// # Arguments: /// - **user**: Redis user. /// - **password**: Redis password. /// + /// # Returns: + /// A new instance of [`ClientCredentials`]. + /// /// ```rust,no_run /// use redsumer::ClientCredentials; /// let credentials = ClientCredentials::new("user", "password"); @@ -41,6 +49,9 @@ impl<'k> ClientCredentials<'k> { /// - **host**: Redis host. /// - **port**: Redis port. /// - **db**: Redis database. +/// +/// # Returns: +/// - A [`RedsumerResult`] with a new instance of [`Client`] to connect to *Redis*. Otherwise, a [`RedsumerError`] is returned. pub fn get_redis_client( credentials: Option, host: &str, diff --git a/redsumer-rs/src/redsumer/consumer.rs b/redsumer-rs/src/redsumer/consumer.rs index 2613859..d29c2de 100644 --- a/redsumer-rs/src/redsumer/consumer.rs +++ b/redsumer-rs/src/redsumer/consumer.rs @@ -6,13 +6,25 @@ use redis::{ StreamClaimOptions, StreamClaimReply, StreamId, StreamPendingCountReply, StreamReadOptions, StreamReadReply, }, - Client, Commands, ErrorKind, RedisError, + Client, Commands, ConnectionLike, ErrorKind, RedisError, }; use super::client::{get_redis_client, ClientCredentials}; -use super::types::{Id, RedsumerResult}; -/// A consumer implementation of *Redis Streams*. +#[allow(unused_imports)] +use super::types::{Id, RedsumerError, RedsumerResult}; + +/// A consumer implementation of Redis Streams. +/// +/// The consumer is responsible for consuming messages from a stream. It can read new messages, pending messages or claim messages from other consumers according to their min idle time. +/// +/// The consumer can be created using the [new](`RedsumerConsumer::new`) method. After creating a new consumer, it is possible to consume messages using the [consume](`RedsumerConsumer::consume`) method. +/// +/// Also it is possible to verify if a specific message is still in consumer pending list using the [is_still_mine](`RedsumerConsumer::is_still_mine`) method. +/// +/// The consumer can also ack a message using the [ack](`RedsumerConsumer::ack`) method. If the message is acked, it is removed from the consumer pending list. +/// +/// Take a look at the [new](`RedsumerConsumer::new`) to know more about the consumer creation process and its parameters. #[derive(Debug, Clone)] pub struct RedsumerConsumer<'c> { client: Client, @@ -79,6 +91,15 @@ impl<'c> RedsumerConsumer<'c> { } /// Build a new [`RedsumerConsumer`] instance. + /// + /// Before creating a new consumer, the following validations are performed: + /// + /// - If the *new_messages_count*, *pending_messages_count* and *claimed_messages_count* are all zero, a [`RedsumerError`] is returned. + /// - If connection string is invalid, a [`RedsumerError`] is returned. + /// - If connection to Redis server can not be established, a [`RedsumerError`] is returned. + /// - If the stream does not exist, a [`RedsumerError`] is returned: The stream must exist before creating a new consumer. + /// - If the consumers group does not exist, it is created based on the *stream_name*, *group_name* and *since_id*. If the consumers group already exists, a warning is logged. If an error occurs during the creation process, a [`RedsumerError`] is returned. + /// /// # Arguments: /// - **credentials**: Optional [`ClientCredentials`] to authenticate in Redis. /// - **host**: Redis host. @@ -94,26 +115,45 @@ impl<'c> RedsumerConsumer<'c> { /// - **claimed_messages_count**: Maximum number of claimed messages to read. /// - **block**: Max time to wait for new messages, given in milliseconds. /// - /// When a new instance of [`RedsumerConsumer`] is created, it is checked if the stream exists and if the consumers group exists. If the consumers group does not exist, it is created based on the *stream_name*, *group_name* and *since_id*. + /// # Returns: + /// - A [`RedsumerResult`] containing a [`RedsumerConsumer`] instance. Otherwise, a [`RedsumerError`] is returned. /// + /// # Example: + /// Create a new [`RedsumerConsumer`] instance. /// ```rust,no_run - /// use redsumer::{RedsumerConsumer, ClientCredentials}; + /// use redsumer::{ClientCredentials, RedsumerConsumer}; /// - /// let consumer = RedsumerConsumer::new( - /// Some(ClientCredentials::new("user", "password")), - /// "localhost", - /// "6379", - /// "0", - /// "my_stream", - /// "my_consumer_group", - /// "0-0", - /// "my_consumer", - /// 360000, - /// 10, - /// 10, - /// 10, - /// 5, - /// ).unwrap(); + /// let client_credentials = Some(ClientCredentials::new("user", "password")); + /// let host = "localhost"; + /// let port = "6379"; + /// let db = "0"; + /// let stream_name = "my_stream"; + /// let group_name = "my_consumer_group"; + /// let consumer_name = "my_consumer"; + /// let since_id = "0-0"; + /// let min_idle_time_milliseconds = 360000; + /// let new_messages_count = 10; + /// let pending_messages_count = 10; + /// let claimed_messages_count = 10; + /// let block = 5; + /// + /// let consumer: RedsumerConsumer = RedsumerConsumer::new( + /// client_credentials, + /// host, + /// port, + /// db, + /// stream_name, + /// group_name, + /// consumer_name, + /// since_id, + /// min_idle_time_milliseconds, + /// new_messages_count, + /// pending_messages_count, + /// claimed_messages_count, + /// block, + /// ).unwrap_or_else(|error| { + /// panic!("Error creating new RedsumerConsumer: {}", error); + /// }); /// ``` pub fn new( credentials: Option>, @@ -130,7 +170,23 @@ impl<'c> RedsumerConsumer<'c> { claimed_messages_count: usize, block: u8, ) -> RedsumerResult { - let client: Client = get_redis_client(credentials, host, port, db)?; + let total_messages_to_read: usize = + new_messages_count + pending_messages_count + claimed_messages_count; + if total_messages_to_read.eq(&0) { + return Err(RedisError::from(( + ErrorKind::TryAgain, + "Total messages to read must be grater than zero", + ))); + } + + let mut client: Client = get_redis_client(credentials, host, port, db)?; + + if !client.check_connection() { + return Err(RedisError::from(( + ErrorKind::TryAgain, + "Error getting connection to Redis server", + ))); + }; if !client.get_connection()?.exists::<_, bool>(stream_name)? { return Err(RedisError::from(( @@ -156,15 +212,6 @@ impl<'c> RedsumerConsumer<'c> { } }; - let total_messages_to_read: usize = - new_messages_count + pending_messages_count + claimed_messages_count; - if total_messages_to_read.eq(&0) { - return Err(RedisError::from(( - ErrorKind::TryAgain, - "Total messages to read must be grater than zero", - ))); - } - Ok(Self { client, stream_name, @@ -259,11 +306,18 @@ impl<'c> RedsumerConsumer<'c> { .ids) } - /// Consume messages from *stream* according to the following steps: + /// Consume messages from stream according to the following steps: + /// /// 1. Consumer tries to get new messages. If new messages are found, they are returned as a result. /// 2. If new messages are not found, consumer tries to get pending messages. If pending messages are found, they are returned as a result. /// 3. If pending messages are not found, consumer tries to claim messages from other consumers according to *min_idle_time_milliseconds*. If claimed messages are found, they are returned as a result. /// 4. If new, pending or claimed messages are not found, an empty list is returned as a result. + /// + /// # Arguments: + /// *No arguments* + /// + /// # Returns: + /// - A [`RedsumerResult`] containing a list of [`StreamId`] if new, pending or claimed messages are found, otherwise an empty list is returned. If an error occurs, a [`RedsumerError`] is returned. pub async fn consume(&mut self) -> RedsumerResult> { debug!("Consuming messages from stream {}", self.get_stream_name()); @@ -303,8 +357,14 @@ impl<'c> RedsumerConsumer<'c> { } /// Verify if a specific message by *id* is still in consumer pending list. + /// + /// If the message is not still in consumer pending list, it is recommended to verify if another consumer has claimed the message before trying to process it again. + /// /// # Arguments: /// - **id**: Stream message id. + /// + /// # Returns: + /// - A [`RedsumerResult`] containing a boolean value. If the message is still in consumer pending list, `true` is returned. Otherwise, `false` is returned. If an error occurs, a [`RedsumerError`] is returned. pub fn is_still_mine(&self, id: &Id) -> RedsumerResult { Ok(self .get_client() @@ -323,8 +383,14 @@ impl<'c> RedsumerConsumer<'c> { } /// Ack a message by *id*. + /// + /// If the message is acked, it is removed from the consumer pending list. Otherwise, it is recommended to verify if another consumer has claimed the message before trying to process it again. + /// /// # Arguments: /// - **id**: Stream message id. + /// + /// # Returns: + /// - A [`RedsumerResult`] containing a boolean value. If the message is acked, `true` is returned. Otherwise, `false` is returned. If an error occurs, a [`RedsumerError`] is returned. pub async fn ack(&self, id: &Id) -> RedsumerResult { Ok(self.get_client().get_connection()?.xack::<_, _, _, bool>( self.stream_name, diff --git a/redsumer-rs/src/redsumer/producer.rs b/redsumer-rs/src/redsumer/producer.rs index 182a666..034066a 100644 --- a/redsumer-rs/src/redsumer/producer.rs +++ b/redsumer-rs/src/redsumer/producer.rs @@ -1,9 +1,13 @@ -use redis::{Client, Commands, ToRedisArgs}; +use redis::{Client, Commands, ConnectionLike, ErrorKind, RedisError, ToRedisArgs}; use super::client::{get_redis_client, ClientCredentials}; -use super::types::{Id, RedsumerResult}; + +#[allow(unused_imports)] +use super::types::{Id, RedsumerError, RedsumerResult}; /// A producer implementation of *Redis Streams*. +/// +/// This struct is responsible for producing messages in a stream. #[derive(Debug, Clone)] pub struct RedsumerProducer<'p> { client: Client, @@ -22,6 +26,12 @@ impl<'p> RedsumerProducer<'p> { } /// Build a new [`RedsumerProducer`] instance. + /// + /// Before creating a new producer, the following validations are performed: + /// + /// - If connection string is invalid, a [`RedsumerError`] is returned. + /// - If connection to Redis server can not be established, a [`RedsumerError`] is returned. + /// /// # Arguments: /// - **credentials**: Optional [`ClientCredentials`] to authenticate in Redis. /// - **host**: Redis host. @@ -29,16 +39,23 @@ impl<'p> RedsumerProducer<'p> { /// - **db**: Redis database. /// - **stream_name**: Stream name to produce messages. /// + /// # Returns: + /// - A [`RedsumerResult`] with the new [`RedsumerProducer`] instance. Otherwise, a [`RedsumerError`] is returned. + /// + /// # Example: + /// Create a new [`RedsumerProducer`] instance. /// ```rust,no_run /// use redsumer::RedsumerProducer; /// - /// let producer = RedsumerProducer::new( + /// let producer: RedsumerProducer = RedsumerProducer::new( /// None, /// "localhost", /// "6379", /// "0", /// "my_stream", - /// ).unwrap(); + /// ).unwrap_or_else(|err| { + /// panic!("Error creating producer: {:?}", err); + /// }); /// ``` pub fn new( credentials: Option>, @@ -47,7 +64,14 @@ impl<'p> RedsumerProducer<'p> { db: &'p str, stream_name: &'p str, ) -> RedsumerResult> { - let client: Client = get_redis_client(credentials, host, port, db)?; + let mut client: Client = get_redis_client(credentials, host, port, db)?; + + if !client.check_connection() { + return Err(RedisError::from(( + ErrorKind::TryAgain, + "Error getting connection to Redis server", + ))); + }; Ok(RedsumerProducer { client, @@ -55,9 +79,17 @@ impl<'p> RedsumerProducer<'p> { }) } - /// Produce a message in the stream, where message implements [`ToRedisArgs`]. + /// Produce a new message in stream. + /// + /// This method produces a new message in the stream setting the *ID* as "*", which means that Redis will generate a new *ID* for the message automatically with the current timestamp. + /// + /// If stream does not exist, it will be created. + /// /// # Arguments: - /// - **message**: Message to produce in stream. + /// - **message**: Message to produce in stream. It must implement [`ToRedisArgs`]. + /// + /// # Returns: + /// - A [`RedsumerResult`] with the *ID* of the produced message. Otherwise, a [`RedsumerError`] is returned. pub async fn produce(&self, message: M) -> RedsumerResult where M: ToRedisArgs, diff --git a/redsumer-rs/tests/test_redsumer.rs b/redsumer-rs/tests/test_redsumer.rs index 1ee5d1b..845035f 100644 --- a/redsumer-rs/tests/test_redsumer.rs +++ b/redsumer-rs/tests/test_redsumer.rs @@ -16,7 +16,13 @@ pub mod test_redsumer { "test", ); - assert!(producer_res.is_ok()); + assert!(producer_res.is_err()); + + let error: RedsumerError = producer_res.unwrap_err(); + assert_eq!( + error.to_string(), + "Error getting connection to Redis server- TryAgain" + ); } #[tokio::test]