diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 3f2407b..e603d75 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -14,16 +14,41 @@ jobs: runs-on: ubuntu-latest steps: - - name: Redis streams service - uses: supercharge/redis-github-action@1.7.0 + - uses: actions/checkout@v3 + + - name: Install Rust + uses: actions-rust-lang/setup-rust-toolchain@v1.8.0 + with: + components: llvm-tools-preview + + - name: Install cargo-llvm-cov + uses: taiki-e/install-action@v2.33.22 with: - redis-port: 6379 + tool: cargo-llvm-cov - - name: Git checkout - uses: actions/checkout@v3 - - name: Build project run: cargo build --verbose - + + - name: Verify project formatting + run: cargo fmt --all --check + + - name: Check for possible errors and coding suggestions + run: cargo clippy --all-features + - name: Run project tests + env: + RUST_LOG: warn run: cargo test --verbose + + - name: Run tests with coverage for all features + env: + RUST_LOG: warn + run: cargo llvm-cov --workspace --all-features --codecov --output-path codecov.json + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4.3.1 + with: + files: codecov.json + fail_ci_if_error: true + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0ff5343 --- /dev/null +++ b/Makefile @@ -0,0 +1,23 @@ +fmt: + cargo fmt --all + +fmt-check: + cargo fmt --all --check + +clippy-check: + cargo clippy --all-features + +install-llvm-cov: + cargo install cargo-llvm-cov + +test-llvm-cov: + cargo llvm-cov --html --workspace --all-features + +test: + cargo test --all-features + +doc: + cargo doc --all-features + +doc-open: + cargo doc --all-features --open \ No newline at end of file diff --git a/README.md b/README.md index a07884f..c7aaab9 100644 --- a/README.md +++ b/README.md @@ -9,14 +9,14 @@ To use ***redsumer*** from GitHub repository with specific version, set the depe ```ini [dependencies] -redsumer = { git = "https://github.com/enerBit/redsumer-rs.git", package = "redsumer", version = "0.4.2" } +redsumer = { git = "https://github.com/enerBit/redsumer-rs.git", package = "redsumer", version = "0.5.0-beta.1" } ``` You can depend on it via cargo by adding the following dependency to your `Cargo.toml` file: ```ini [dependencies] -redsumer = { version = "0.4.2" } +redsumer = { version = "0.5.0-beta.1" } ``` ## Basic Usage @@ -34,38 +34,56 @@ use uuid::Uuid; #[tokio::main] async fn main() { - let credentials: Option = None; - let host: &str = "localhost"; - let port: &str = "6379"; - let db: &str = "0"; - let stream_name: &str = "my-stream"; - - let producer_result: RedsumerResult = - RedsumerProducer::new( - credentials, - host, - port, - db, - stream_name, - ); - - let producer: RedsumerProducer = producer_result.unwrap_or_else(|error| { - panic!("Error creating a new RedsumerProducer instance: {:?}", error); - }); - - let mut message: BTreeMap<&str, String> = BTreeMap::new(); - message.insert("id", Uuid::default().to_string()); - message.insert("started_at", OffsetDateTime::now_utc().to_string()); - - let id: Id = producer.produce(message).await.unwrap_or_else(|error| { - panic!("Error producing stream message from BTreeMap: {:?}", error.to_string()); - }); + let credentials: Option = None; + let host: &str = "localhost"; + let port: u16 = 6379; + let db: i64 = 0; + let stream_name: &str = "my-stream"; + + let args: ClientArgs = ClientArgs::new( + credentials, + host, + port, + db, + CommunicationProtocol::RESP2, + ); + + let config: ProducerConfig = ProducerConfig::new(stream_name); + + let producer_result: RedsumerResult = + Producer::new( + &args, + &config, + ); + + let producer: Producer = producer_result.unwrap_or_else(|error| { + panic!("Error creating a new RedsumerProducer instance: {:?}", error); + }); + + let mut message_1: BTreeMap<&str, String> = BTreeMap::new(); + message_1.insert("id", Uuid::new_v4().to_string()); + message_1.insert("started_at", OffsetDateTime::now_utc().to_string()); + + let mut message_2: Vec<(String, String)> = Vec::new(); + message_2.push(("id".to_string(), Uuid::new_v4().to_string())); + message_2.push(("started_at".to_string(), OffsetDateTime::now_utc().to_string())); + + let id_1: Id = producer.produce_from_map(message_1).await.unwrap_or_else(|error| { + panic!("Error producing stream message from BTreeMap: {:?}", error.to_string()); + }); + + let id_2: Id = producer.produce_from_items(message_2).await.unwrap_or_else(|error| { + panic!("Error producing stream message from Vec: {:?}", error.to_string()); + }); + + println!("Message 1 produced with id: {:?}", id_1); + println!("Message 2 produced with id: {:?}", id_2); } ``` -Similar to the previous example, you can produce a message from a [HashMap](std::collections::HashMap) or a [HashSet](std::collections::HashSet). Go to [examples](https://github.com/enerBit/redsumer-rs/tree/main/examples) directory to see more use cases like producing a stream message from an instance of a struct. +Similar to the previous example, you can produce a message from a [HashMap](std::collections::HashMap) or a [HashSet](std::collections::HashSet). -The [produce](RedsumerProducer::produce) method accepts a generic type that implements the [ToRedisArgs](redis::ToRedisArgs) trait. Take a look at the documentation for more information. +The [produce_from_map](Producer::produce_from_map) and [produce_from_items](Producer::produce_from_items) methods accepts generic types that implements the [ToRedisArgs](redis::ToRedisArgs) trait. Take a look at the documentation for more information. #### Consume messages from a stream: @@ -77,79 +95,92 @@ use redsumer::redis::StreamId; #[tokio::main] async fn main() { - let credentials: Option = None; - let host: &str = "localhost"; - let port: &str = "6379"; - let db: &str = "0"; - let stream_name: &str = "my-stream"; - let group_name: &str = "group-name"; - let consumer_name: &str = "consumer"; - let since_id: &str = "0-0"; - let min_idle_time_milliseconds: usize = 1000; - let new_messages_count: usize = 3; - let pending_messages_count: usize = 2; - let claimed_messages_count: usize = 1; - let block: u8 = 5; - - let consumer_result: RedsumerResult = RedsumerConsumer::new( - credentials, - host, - port, - db, - stream_name, - group_name, - consumer_name, - since_id, - min_idle_time_milliseconds, - new_messages_count, - pending_messages_count, - claimed_messages_count, - block, - ); - - let mut consumer: RedsumerConsumer = consumer_result.unwrap_or_else(|error| { - panic!("Error creating a new RedsumerConsumer instance: {:?}", error); - }); - - loop { - let messages: Vec = consumer.consume().await.unwrap_or_else(|error| { - panic!("Error consuming messages from stream: {:?}", error); - }); - - for message in messages { - if consumer.is_still_mine(&message.id).unwrap_or_else(|error| { - panic!( - "Error checking if message is still in consumer pending list: {:?}", error - ); - }) { - // Process message ... - println!("Processing message: {:?}", message); - // ... - - let ack: bool = consumer.ack(&message.id).await.unwrap_or_else(|error| { - panic!("Error acknowledging message: {:?}", error); - }); - - if ack { - println!("Message acknowledged: {:?}", message); - } - } - } - } + let credentials: Option = None; + let host: &str = "localhost"; + let port: u16 = 6379; + let db: i64 = 0; + let stream_name: &str = "my-stream"; + let group_name: &str = "group-name"; + let consumer_name: &str = "consumer"; + let initial_stream_id: &str = "0-0"; + let min_idle_time_milliseconds: usize = 1000; + let new_messages_count: usize = 3; + let pending_messages_count: usize = 2; + let claimed_messages_count: usize = 1; + let block: usize = 5; + + let args: ClientArgs = ClientArgs::new( + credentials, + host, + port, + db, + CommunicationProtocol::RESP2, + ); + + let config: ConsumerConfig = ConsumerConfig::new( + stream_name, + group_name, + consumer_name, + ReadNewMessagesOptions::new( + new_messages_count, + block + ), + ReadPendingMessagesOptions::new( + pending_messages_count + ), + ClaimMessagesOptions::new( + claimed_messages_count, + min_idle_time_milliseconds + ), + ); + + let consumer_result: RedsumerResult = Consumer::new( + args, + config, + Some(initial_stream_id.to_string()), + ); + + let mut consumer: Consumer = consumer_result.unwrap_or_else(|error| { + panic!("Error creating a new RedsumerConsumer instance: {:?}", error); + }); + + loop { + let messages: Vec = consumer.consume().await.unwrap_or_else(|error| { + panic!("Error consuming messages from stream: {:?}", error); + }); + + for message in messages { + if consumer.is_still_mine(&message.id).unwrap_or_else(|error| { + panic!( + "Error checking if message is still in consumer pending list: {:?}", error + ); + }) { + // Process message ... + println!("Processing message: {:?}", message); + // ... + + let ack: bool = consumer.ack(&message.id).await.unwrap_or_else(|error| { + panic!("Error acknowledging message: {:?}", error); + }); + + if ack { + println!("Message acknowledged: {:?}", message); + } + } + } + } } ``` -In this example, the [consume](RedsumerConsumer::consume) method is called in a loop to consume messages from the stream. -The [consume](RedsumerConsumer::consume) method returns a vector of [StreamId](redis::StreamId) instances. Each [StreamId](redis::StreamId) instance represents a message in the stream. -The [is_still_mine](RedsumerConsumer::is_still_mine) method is used to check if the message is still in the consumer pending list. -If it is, the message is processed and then acknowledged using the [ack](RedsumerConsumer::ack) method. -The [ack](RedsumerConsumer::ack) method returns a boolean indicating if the message was successfully acknowledged. +In this example, the [consume](Consumer::consume) method is called in a loop to consume messages from the stream. +The [consume](Consumer::consume) method returns a vector of [StreamId](redis::StreamId) instances. Each [StreamId](redis::StreamId) instance represents a message in the stream. +The [is_still_mine](Consumer::is_still_mine) method is used to check if the message is still in the consumer pending list. +If it is, the message is processed and then acknowledged using the [ack](Consumer::ack) method. +The [ack](Consumer::ack) method returns a boolean indicating if the message was successfully acknowledged. The main objective of this message consumption strategy is to minimize the possibility that two or more consumers from the same consumer group operating simultaneously consume the same message at the same time. Knowing that it is a complex problem with no definitive solution, including business logic in the message processing instance will always improve results. -Take a look at the [examples](https://github.com/enerBit/redsumer-rs/tree/main/examples) directory to see more use cases. - #### Utilities from [redis] crate: The [redis] module provides utilities from the [redis](https://docs.rs/redis) crate. You can use these utilities to interact with Redis values and errors. @@ -157,7 +188,6 @@ The [redis] module provides utilities from the [redis](https://docs.rs/redis) cr #### Unwrap [Value](redis::Value) to a specific type: The [Value](redis::Value) enum represents a Redis value. It can be converted to a specific type using the [from_redis_value](redis::from_redis_value) function. This function can be imported from the [redis] module. -***redsumer*** includes the [FromRedisValueHandler] struct that implements the [FromRedisValue](redis::FromRedisValue) trait for a lot of types. It is useful to convert a [Value](redis::Value) to a specific type reducing boilerplate code and total lines of code. ## Contributing diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..98865fe --- /dev/null +++ b/codecov.yml @@ -0,0 +1,11 @@ +coverage: + status: + project: + default: + threshold: 5% + target: 80% + + patch: + default: + threshold: 5% + target: 80% \ No newline at end of file diff --git a/redsumer-rs/CHANGELOG.md b/redsumer-rs/CHANGELOG.md index e6be8aa..63d035f 100644 --- a/redsumer-rs/CHANGELOG.md +++ b/redsumer-rs/CHANGELOG.md @@ -5,6 +5,27 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## ✨ v0.5.0-beta.1 [2024-09-30] + +### Added: + +- ⚡ Implement `Debug` for `ClientCredentials`. +- ⚡ Implement `CommunicationProtocol` type to define Redis Protocol version. +- ⚡ Implement `ClientArgs` and `RedisClientBuilder` to build Redis Client. +- ⚡ Implement `VerifyConnection` trait and `ping()` function to verify connection to Redis Server. +- ⚡ Implement `produce_from_map()`, `produce_from_items()` and `ProducerCommands` in producer core module. +- ⚡ Implement `ProducerConfig` to manage the configuration parameters for `Producer`. Implement `ClientArgs` in `Producer`. **[BreakingChange]** +- ⚡ Implement `ConsumerConfig` to manage the configuration parameters for `Consumer`. Implement `ClientArgs` in `Consumer`. Implement `ReadNewMessagesOptions` , `ReadPendingMessagesOptions` and `ClaimMessagesOptions` in `ConsumerConfig` **[BreakingChange]** + +### Changed: + +- 🚀 Rename `RedsumerProducer` to `Producer`. **[BreakingChange]** + +### Removed: + +- ❌ Remove `FromRedisValueHandler` from crate. **[BreakingChange]** +- ❌ Remove internal function `get_redis_client()` from client module. + ## ✨ v0.4.1 [2024-06-13] ### Fixed: @@ -15,9 +36,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added: -- ⚡ Implementation of new types: RedsumerResult, RedsumerError and Id (Breaking change). -- ⚡ Debug and Clone implementation in RedsumerProducer and RedsumerConsumer. -- ⚡ The consumer configuration parameters were implemented directly in RedsumerConsumer (Breaking change). +- ⚡ Implementation of new types: `RedsumerResult`, `RedsumerError` and `Id`. **[BreakingChange]** +- ⚡ `Debug` and `Clone` implementation in `RedsumerProducer` and `RedsumerConsumer`. +- ⚡ The consumer configuration parameters were implemented directly in `RedsumerConsumer`. **[BreakingChange]** ### Fixed: @@ -27,15 +48,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - 🚀 New project structure as workspace. - 🚀 Update dependencies and documentation. -- 🚀 Library modules reorganization (Breaking change). -- 🚀 FromRedisValueImplHandler was changed to FromRedisValueHandler (Breaking change). -- 🚀 The produce_from_map method was replaced by the produce method in RedsumerProducer (Breaking change). -- 🚀 The validate_pending_message_ownership method was replaced by is_still_mine in RedsumerConsumer (Breaking change). -- 🚀 The acknowledge method was replaced by ack in RedsumerConsumer (Breaking change). -- 🚀 The consume method was refactored in RedsumerConsumer in order to implement a new consumption methodology that allows scalability in distributed systems. To understand this new implementation in detail, take a look at the project https://github.com/elpablete/refactored-computing-machine. +- 🚀 Library modules reorganization. **[BreakingChange]** +- 🚀 `FromRedisValueImplHandler` was changed to `FromRedisValueHandler`. **[BreakingChange]** +- 🚀 The `produce_from_map()` method was replaced by the `produce()` method in `RedsumerProducer`. **[BreakingChange]** +- 🚀 The `validate_pending_message_ownership()` method was replaced by `is_still_mine()` in `RedsumerConsumer`. **[BreakingChange]** +- 🚀 The acknowledge method was replaced by ack in `RedsumerConsumer`. **[BreakingChange]** +- 🚀 The consume method was refactored in `RedsumerConsumer` in order to implement a new consumption methodology that allows scalability in distributed systems. To understand this new implementation in detail, take a look at the project https://github.com/elpablete/refactored-computing-machine. ### Removed: -- ❌ The stream_information.rs module was removed from the project: StreamInfo and StreamConsumersInfo implementations were removed (Breaking change). -- ❌ RedsumerConsumerOptions was removed (Breaking change). -- ❌ The produce_from_items method was removed from RedsumerProducer (Breaking change). \ No newline at end of file +- ❌ The *stream_information.rs* module was removed from the project: `StreamInfo` and `StreamConsumersInfo` implementations were removed. **[BreakingChange]** +- ❌ `RedsumerConsumerOptions` was removed. **[BreakingChange]** +- ❌ The `produce_from_items()` method was removed from `RedsumerProducer`. **[BreakingChange]** \ No newline at end of file diff --git a/redsumer-rs/Cargo.toml b/redsumer-rs/Cargo.toml index 2393648..deb61ff 100644 --- a/redsumer-rs/Cargo.toml +++ b/redsumer-rs/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "redsumer" description = "Lightweight implementation of Redis Streams for Rust" -version = "0.4.2" +version = "0.5.0-beta.1" edition = "2021" license-file = "../LICENSE" readme = "../README.md" @@ -16,16 +16,15 @@ categories = [ "database-implementations", ] authors = [ - "enerBit", "Juan Manuel Tamayo ", ] [dependencies] -redis = { version = "0.25.4", features = ["tokio-comp", "streams"] } -tokio = { version = "1.38.0", features = ["full"] } -uuid = { version = "1.9.1" } -time = { version = "0.3.36", features = ["parsing"] } -bytes = { version = "1.6.0" } -serde = { version = "1.0.203", features = ["derive"] } -serde_json = { version = "1.0.120" } -log = { version = "0.4.22" } +redis = { version = "0.27.2", features = ["tokio-comp", "streams"] } +tracing = { version = "0.1.40" } + +[dev-dependencies] +redis-test = { version = "0.6.0" } +tokio = { version = "1.40.0", features = ["full"] } +time ={ version = "0.3.36" } +uuid ={ version = "1.10.0", features = ["v4"] } diff --git a/redsumer-rs/src/core/client.rs b/redsumer-rs/src/core/client.rs new file mode 100644 index 0000000..c4b4e8b --- /dev/null +++ b/redsumer-rs/src/core/client.rs @@ -0,0 +1,385 @@ +use std::fmt::Debug; + +use redis::{Client, ConnectionAddr, ConnectionInfo, ProtocolVersion, RedisConnectionInfo}; + +#[allow(unused_imports)] +use super::result::{RedsumerError, RedsumerResult}; + +/// Communication protocol to be used by the client. It is an alias for [`ProtocolVersion`]. +pub type CommunicationProtocol = ProtocolVersion; + +/// To hold credentials to authenticate in Redis. +/// +/// This credentials are used to authenticate in Redis when server requires it. If server does not require it, you set it to `None`. +#[derive(Clone)] +pub struct ClientCredentials { + /// User to authenticate in Redis service. + user: String, + + /// Password to authenticate in Redis service. + password: String, +} + +impl ClientCredentials { + /// Get *user* + fn get_user(&self) -> &str { + &self.user + } + + /// Get *password* + fn get_password(&self) -> &str { + &self.password + } + + /// Build a new instance of [`ClientCredentials`]. + /// + /// # Arguments: + /// - **user**: The username to authenticate in Redis service. + /// - **password**: The password to authenticate in Redis service. + /// + /// # Returns: + /// A new instance of [`ClientCredentials`]. + /// + /// ```rust,no_run + /// use redsumer::ClientCredentials; + /// let credentials = ClientCredentials::new("user", "password"); + /// ``` + pub fn new(user: &str, password: &str) -> ClientCredentials { + ClientCredentials { + user: user.to_owned(), + password: password.to_owned(), + } + } +} + +impl Debug for ClientCredentials { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ClientCredentials") + .field("user", &self.user) + .field("password", &"****") + .finish() + } +} + +/// Define the configuration parameters to create a [`Client`] instance. +/// +/// Take a look at the following supported connection URL format to infer the client arguments: +/// +/// `redis://[][:@]:/` +/// +/// *user* and *password* are optional. If you don't need to authenticate in Redis, you can ignore them. *port* and *db* are mandatory for the connection. Another connection URL formats are not implemented yet. +#[derive(Debug, Clone)] +pub struct ClientArgs { + /// Credentials to authenticate in Redis. + credentials: Option, + + /// Host to connect to Redis. + host: String, + + /// Redis server port. + port: u16, + + /// Redis database number. + db: i64, + + /// Redis protocol version to communicate with the server. + protocol: CommunicationProtocol, +} + +impl ClientArgs { + /// Get *credentials*. + pub fn get_credentials(&self) -> &Option { + &self.credentials + } + + /// Get *host*. + pub fn get_host(&self) -> &str { + &self.host + } + + /// Get *port*. + pub fn get_port(&self) -> u16 { + self.port + } + + /// Get *db*. + pub fn get_db(&self) -> i64 { + self.db + } + + /// Get *protocol*. + pub fn get_protocol(&self) -> CommunicationProtocol { + self.protocol + } + + /// Create a new instance of [`ClientArgs`]. + /// + /// # Arguments: + /// - **credentials**: Credentials to authenticate in Redis. + /// - **host**: Host to connect to Redis. + /// - **port**: Redis server port. + /// - **db**: Redis database + /// - **protocol**: Redis protocol version to communicate with the server. + /// + /// # Returns: + /// A new instance of [`ClientArgs`]. + pub fn new( + credentials: Option, + host: &str, + port: u16, + db: i64, + protocol: CommunicationProtocol, + ) -> ClientArgs { + ClientArgs { + credentials, + host: host.to_owned(), + port, + db, + protocol, + } + } +} + +/// To build a new instance of [`Client`]. +pub trait RedisClientBuilder { + /// Build a new instance of [`Client`]. + /// + /// # Arguments: + /// - No arguments. + /// + /// # Returns: + /// A [`RedsumerResult`] with a new instance of [`Client`]. Otherwise, a [`RedsumerError`] is returned. + fn build(&self) -> RedsumerResult; +} + +impl RedisClientBuilder for ClientArgs { + fn build(&self) -> RedsumerResult { + let addr: ConnectionAddr = + ConnectionAddr::Tcp(String::from(self.get_host()), self.get_port()); + + let username: Option = self + .get_credentials() + .to_owned() + .map(|c| c.get_user().to_string()); + + let password: Option = self + .get_credentials() + .to_owned() + .map(|c| c.get_password().to_string()); + + let redis: RedisConnectionInfo = RedisConnectionInfo { + db: self.get_db(), + username, + password, + protocol: self.get_protocol(), + }; + + Client::open(ConnectionInfo { addr, redis }) + } +} + +#[cfg(test)] +mod test_client_credentials { + use super::*; + + #[test] + fn test_client_credentials_builder_ok() { + // Define the user and password to authenticate in Redis: + let user: &str = "user"; + let password: &str = "password"; + + // Create a new instance of ClientCredentials: + let credentials: ClientCredentials = ClientCredentials::new(user, password); + + // Verify if the user and password are correct: + assert_eq!(credentials.get_user(), user); + assert_eq!(credentials.get_password(), password); + } + + #[test] + fn test_client_credentials_debug() { + // Define the user and password to authenticate in Redis: + let user: &str = "user"; + let password: &str = "password"; + + // Create a new instance of ClientCredentials: + let credentials: ClientCredentials = ClientCredentials::new(user, password); + + // Verify if the debug is correct: + assert_eq!( + format!("{:?}", credentials), + "ClientCredentials { user: \"user\", password: \"****\" }" + ); + } + + #[test] + fn test_client_credentials_clone() { + // Define the user and password to authenticate in Redis: + let user: &str = "user"; + let password: &str = "password"; + + // Create a new instance of ClientCredentials: + let credentials: ClientCredentials = ClientCredentials::new(user, password); + + // Clone the credentials: + let cloned_credentials: ClientCredentials = credentials.clone(); + + // Verify if the credentials are correct: + assert_eq!(credentials.get_user(), cloned_credentials.get_user()); + assert_eq!( + credentials.get_password(), + cloned_credentials.get_password() + ); + } +} + +#[cfg(test)] +mod test_client_args { + use super::*; + + #[test] + fn test_client_args_builder_ok() { + // Define the user and password to authenticate in Redis: + let user: &str = "user"; + let password: &str = "password"; + + // Create a new instance of ClientCredentials: + let credentials: ClientCredentials = ClientCredentials::new(user, password); + + // Define the host to connect to Redis: + let host: &str = "localhost"; + + // Define the port to connect to Redis: + let port: u16 = 6379; + + // Define the database to connect to Redis: + let db: i64 = 1; + + // Define the redis protocol version: + let protocol_version: CommunicationProtocol = CommunicationProtocol::RESP2; + + // Create a new instance of ClientArgs with default port and db: + let args: ClientArgs = ClientArgs::new(Some(credentials), host, port, db, protocol_version); + + // Verify if the args are correct: + assert!(args.get_credentials().is_some()); + assert_eq!(args.get_credentials().to_owned().unwrap().get_user(), user); + assert_eq!( + args.get_credentials().to_owned().unwrap().get_password(), + password + ); + assert_eq!(args.get_host(), host); + assert_eq!(args.get_port(), port); + assert_eq!(args.get_db(), db); + } + + #[test] + fn test_client_args_debug() { + // Define the user and password to authenticate in Redis: + let user: &str = "user"; + let password: &str = "password"; + + // Create a new instance of ClientCredentials: + let credentials: ClientCredentials = ClientCredentials::new(user, password); + + // Define the host to connect to Redis: + let host: &str = "localhost"; + + // Define the port to connect to Redis: + let port: u16 = 6379; + + // Define the database to connect to Redis: + let db: i64 = 1; + + // Define the redis protocol version: + let protocol_version: CommunicationProtocol = CommunicationProtocol::RESP2; + + // Create a new instance of ClientArgs with default port and db: + let args: ClientArgs = ClientArgs::new(Some(credentials), host, port, db, protocol_version); + + // Verify if the debug is correct: + assert_eq!(format!("{:?}", args), "ClientArgs { credentials: Some(ClientCredentials { user: \"user\", password: \"****\" }), host: \"localhost\", port: 6379, db: 1, protocol: RESP2 }"); + } + + #[test] + fn test_client_args_clone() { + // Define the user and password to authenticate in Redis: + let user: &str = "user"; + let password: &str = "password"; + + // Create a new instance of ClientCredentials: + let credentials: ClientCredentials = ClientCredentials::new(user, password); + + // Define the host to connect to Redis: + let host: &str = "localhost"; + + // Define the port to connect to Redis: + let port: u16 = 6379; + + // Define the database to connect to Redis: + let db: i64 = 1; + + // Define the redis protocol version: + let protocol_version: CommunicationProtocol = CommunicationProtocol::RESP2; + + // Create a new instance of ClientArgs with default port and db: + let args: ClientArgs = ClientArgs::new(Some(credentials), host, port, db, protocol_version); + + // Clone the args: + let cloned_args: ClientArgs = args.clone(); + + // Verify if the args are correct: + assert_eq!( + args.get_credentials().to_owned().unwrap().get_user(), + cloned_args.get_credentials().to_owned().unwrap().get_user() + ); + assert_eq!( + args.get_credentials().to_owned().unwrap().get_password(), + cloned_args + .get_credentials() + .to_owned() + .unwrap() + .get_password() + ); + assert_eq!(args.get_host(), cloned_args.get_host()); + assert_eq!(args.get_port(), cloned_args.get_port()); + assert_eq!(args.get_db(), cloned_args.get_db()); + assert_eq!(args.get_protocol(), cloned_args.get_protocol()); + } +} + +#[cfg(test)] +mod test_redis_client_builder { + use super::*; + + #[test] + fn test_redis_client_builder_ok_with_null_credentials() { + // Create a new instance of ClientArgs with default port and db: + let args: ClientArgs = + ClientArgs::new(None, "mylocalhost", 6377, 16, CommunicationProtocol::RESP2); + + // Build a new instance of Client: + let client_result: RedsumerResult = args.build(); + + // Verify if the client is correct: + assert!(client_result.is_ok()); + } + + #[test] + fn test_redis_client_builder_ok_with_credentials() { + // Create a new instance of ClientArgs with default port and db: + let args: ClientArgs = ClientArgs::new( + Some(ClientCredentials::new("user", "password")), + "mylocalhost", + 6377, + 16, + CommunicationProtocol::RESP2, + ); + + // Build a new instance of Client: + let client_result: RedsumerResult = args.build(); + + // Verify if the client is correct: + assert!(client_result.is_ok()); + } +} diff --git a/redsumer-rs/src/core/connection.rs b/redsumer-rs/src/core/connection.rs new file mode 100644 index 0000000..0801568 --- /dev/null +++ b/redsumer-rs/src/core/connection.rs @@ -0,0 +1,77 @@ +use redis::{Commands, ErrorKind, RedisError, RedisResult}; +use tracing::{debug, error}; + +#[allow(unused_imports)] +use crate::core::result::{RedsumerError, RedsumerResult}; + +fn ping(c: &mut C) -> RedisResult +where + C: Commands, +{ + match c.check_connection() { + true => { + debug!("The connection to the Redis server was verified"); + Ok("PONG".into()) + } + false => { + let e: &str = "The connection to the Redis server could not be verified. Please verify the client configuration or server availability"; + error!(e); + Err(RedisError::from((ErrorKind::ClientError, e))) + } + } +} + +/// A trait to verify the connection to the Redis server. +pub trait VerifyConnection { + /// Verify the connection to the Redis server. + /// + /// # Arguments: + /// - No arguments. + /// + /// # Returns: + /// A [`RedsumerResult`] with a [`String`] equal to `PONG` if the connection was verified successfully. Otherwise, a [`RedsumerError`] is returned. + fn ping(&mut self) -> RedsumerResult; +} + +impl VerifyConnection for C +where + C: Commands, +{ + fn ping(&mut self) -> RedsumerResult { + ping(self) + } +} + +#[cfg(test)] +mod test_connection { + use redis::Client; + use redis_test::MockRedisConnection; + + use super::*; + + #[test] + fn test_ping_ok() { + // Create a mock connection: + let mut conn: MockRedisConnection = MockRedisConnection::new(vec![]); + + // Ping the server: + let ping_result: RedsumerResult = conn.ping(); + + // Verify the connection to the server: + assert!(ping_result.is_ok()); + assert_eq!(ping_result.unwrap(), "PONG".to_string()); + } + + #[test] + fn test_ping_error() { + // Create a client from a fake host: + let mut client: Client = Client::open("redis://fakehost/0").unwrap(); + + // Ping the server: + let ping_result: RedsumerResult = client.ping(); + + // Verify the connection to the server: + assert!(ping_result.is_err()); + assert_eq!(ping_result.unwrap_err().to_string(), "The connection to the Redis server could not be verified. Please verify the client configuration or server availability- ClientError"); + } +} diff --git a/redsumer-rs/src/core/mod.rs b/redsumer-rs/src/core/mod.rs new file mode 100644 index 0000000..7558281 --- /dev/null +++ b/redsumer-rs/src/core/mod.rs @@ -0,0 +1,4 @@ +pub mod client; +pub mod connection; +pub mod result; +pub mod streams; diff --git a/redsumer-rs/src/core/result.rs b/redsumer-rs/src/core/result.rs new file mode 100644 index 0000000..77a731c --- /dev/null +++ b/redsumer-rs/src/core/result.rs @@ -0,0 +1,7 @@ +use redis::RedisError; + +/// Error type for *redsumer* operations, it is an alias for [`RedisError`]. +pub type RedsumerError = RedisError; + +/// Result type for *redsumer* operations. +pub type RedsumerResult = Result; diff --git a/redsumer-rs/src/core/streams/consumer.rs b/redsumer-rs/src/core/streams/consumer.rs new file mode 100644 index 0000000..017a98f --- /dev/null +++ b/redsumer-rs/src/core/streams/consumer.rs @@ -0,0 +1,1319 @@ +use redis::{ + streams::{ + StreamAutoClaimOptions, StreamAutoClaimReply, StreamId, StreamPendingCountReply, + StreamReadOptions, StreamReadReply, + }, + Commands, ErrorKind, RedisError, RedisResult, ToRedisArgs, +}; +use tracing::{debug, error, warn}; + +#[allow(unused_imports)] +use crate::core::{ + result::{RedsumerError, RedsumerResult}, + streams::types::{LatestPendingMessageId, NextIdToClaim}, +}; + +pub const BEGINNING_OF_TIME_ID: &str = "0-0"; + +/// Get StreamIds from a StreamReadReply by key. +trait UnwrapStreamReadReply { + /// Unwrap StreamReadReply by key into a Vec. + /// + /// # Arguments: + /// - **key**: A key to filter the StreamReadReply. + /// + /// # Returns: + /// A Vec with the StreamIds found. + fn unwrap_by_key(&self, key: &K) -> Vec + where + K: ToString; +} + +impl UnwrapStreamReadReply for StreamReadReply +where + K: ToString, +{ + fn unwrap_by_key(&self, key: &K) -> Vec { + let mut ids: Vec = Vec::new(); + + for stream in self.keys.iter() { + match stream.key.eq(&key.to_string()) { + true => ids.extend(stream.ids.to_owned()), + false => warn!( + "An unexpected stream name found while extracting the key {}: {}. ", + &key.to_string(), + stream.key, + ), + }; + } + + ids + } +} + +/// Verify if a stream exists in Redis Stream service. +fn verify_if_stream_exists(conn: &mut C, key: K) -> RedsumerResult<()> +where + C: Commands, + K: ToRedisArgs, +{ + match conn.exists::<_, bool>(key) { + Ok(true) => { + debug!("The stream already exists"); + Ok(()) + } + Ok(false) => { + error!("The stream does not exist"); + Err(RedisError::from(( + ErrorKind::ClientError, + "Stream does not exist", + ))) + } + Err(e) => { + error!("Error verifying if stream exists: {:?}", e); + Err(RedisError::from(( + ErrorKind::ClientError, + "Error verifying if stream exists", + ))) + } + } +} + +/// Create a consumer group in a stream. +fn create_consumer_group( + conn: &mut C, + key: K, + group: G, + since_id: ID, +) -> RedisResult +where + C: Commands, + K: ToRedisArgs, + G: ToRedisArgs, + ID: ToRedisArgs, +{ + match conn.xgroup_create::<_, _, _, String>(key, group, since_id) { + Ok(_) => { + debug!("The consumers group was successfully created"); + Ok(true) + } + Err(e) => { + if e.to_string().contains("BUSYGROUP") { + debug!("The consumer group already exists"); + Ok(false) + } else { + error!("Error creating consumer group: {:?}", e); + Err(e) + } + } + } +} + +/// Read new messages from a stream. +fn read_new_messages( + conn: &mut C, + key: &K, + group: &G, + consumer: &N, + count: usize, + block: usize, +) -> RedisResult> +where + C: Commands, + K: ToRedisArgs + ToString, + G: ToRedisArgs, + N: ToRedisArgs, +{ + Ok(match count.gt(&0) { + true => conn + .xread_options::<_, _, StreamReadReply>( + &[key], + &[">"], + &StreamReadOptions::default() + .group(group, consumer) + .count(count) + .block(block), + )? + .unwrap_by_key(key), + false => Vec::new(), + }) +} + +/// Read pending messages from a stream. +fn read_pending_messages( + conn: &mut C, + key: &K, + group: &G, + consumer: &N, + latest_pending_message_id: ID, + count: usize, +) -> RedisResult<(Vec, LatestPendingMessageId)> +where + C: Commands, + K: ToRedisArgs + ToString, + G: ToRedisArgs, + N: ToRedisArgs, + ID: ToRedisArgs, +{ + match count.gt(&0) { + true => { + let pending_messages: Vec = conn + .xread_options::<_, _, StreamReadReply>( + &[key], + &[latest_pending_message_id], + &StreamReadOptions::default() + .group(group, consumer) + .count(count), + )? + .unwrap_by_key(key); + + let latest_pending_message_id: String = match pending_messages.last() { + Some(s) => s.id.to_owned(), + None => BEGINNING_OF_TIME_ID.to_owned(), + }; + + Ok((pending_messages, latest_pending_message_id)) + } + false => Ok((Vec::new(), BEGINNING_OF_TIME_ID.to_owned())), + } +} + +/// Claim pending messages from a stream. +fn claim_pending_messages( + conn: &mut C, + key: &K, + group: &G, + consumer: &N, + min_idle_time: usize, + next_id_to_claim: ID, + count: usize, +) -> RedisResult<(Vec, NextIdToClaim)> +where + C: Commands, + K: ToRedisArgs, + G: ToRedisArgs, + N: ToRedisArgs, + ID: ToRedisArgs, +{ + match count.gt(&0) { + true => { + let reply: StreamAutoClaimReply = conn + .xautoclaim_options::<_, _, _, _, _, StreamAutoClaimReply>( + key, + group, + consumer, + min_idle_time, + next_id_to_claim, + StreamAutoClaimOptions::default().count(count), + )?; + + Ok((reply.claimed.to_owned(), reply.next_stream_id.to_owned())) + } + false => Ok((Vec::new(), BEGINNING_OF_TIME_ID.to_owned())), + } +} + +/// Verify if a message is still in the consumer pending list. +fn is_still_mine( + conn: &mut C, + key: K, + group: G, + consumer: CN, + id: ID, +) -> RedsumerResult +where + C: Commands, + K: ToRedisArgs, + G: ToRedisArgs, + CN: ToRedisArgs, + ID: ToRedisArgs, +{ + match conn.xpending_consumer_count::<_, _, _, _, _, _, StreamPendingCountReply>( + key, group, &id, &id, 1, consumer, + ) { + Ok(r) => match r.ids.len().gt(&0) { + true => { + debug!("The message is still in the consumer pending list"); + Ok(true) + } + false => { + debug!("The message is not in the consumer pending list"); + Ok(false) + } + }, + Err(e) => { + error!( + "Error verifying if message is still in consumer pending list: {:?}", + e + ); + Err(e) + } + } +} + +/// Ack a message in a consumer group. +fn ack(conn: &mut C, key: K, group: G, id: ID) -> RedsumerResult +where + C: Commands, + K: ToRedisArgs, + G: ToRedisArgs, + ID: ToRedisArgs, +{ + match conn.xack::<_, _, _, bool>(key, group, &[id]) { + Ok(true) => { + debug!("The message was successfully acknowledged"); + Ok(true) + } + Ok(false) => { + debug!("The message was not acknowledged"); + Ok(false) + } + Err(e) => { + error!("Error acknowledging message: {:?}", e); + Err(e) + } + } +} + +/// A trait that bundles methods for consuming messages from a Redis stream +pub trait ConsumerCommands +where + K: ToRedisArgs, +{ + /// Verify if a stream exists in Redis Stream service. + /// + /// # Arguments: + /// - **key**: A stream key, which must implement the `ToRedisArgs` trait. + /// + /// # Returns: + /// A [`RedsumerResult`] with the result of the operation. + /// If the stream exists, the function will return a success result. + /// If the stream does not exist, the function will return an error result. + /// If an error occurs, the function will return an error result. + fn verify_if_stream_exists(&mut self, key: K) -> RedsumerResult<()>; + + /// Create a consumer group in a Redis stream. + /// + /// # Arguments: + /// - **key**: A stream key, which must implement the `ToRedisArgs` trait. + /// - **group**: A consumers group, which must implement the `ToRedisArgs` trait. + /// - **since_id**: The ID of the message to start consuming, which must implement the `ToRedisArgs` trait. + /// + /// # Returns: + /// A [`RedsumerResult`] with the result of the operation. + /// If the consumer group already exists, the function will return a success result with a `false` value. + /// If the consumer group does not exist, the function will create it and return a success result with a `true` value. + /// If an error occurs, the function will return an error result. + fn create_consumer_group( + &mut self, + key: K, + group: G, + since_id: ID, + ) -> RedsumerResult + where + G: ToRedisArgs, + ID: ToRedisArgs; + + /// Read new messages from a stream. + /// + /// # Arguments: + /// - **key**: A stream key, which must implement the `ToRedisArgs` trait. + /// - **group**: A consumers group, which must implement the `ToRedisArgs` trait. + /// - **consumer**: A consumer name, which must implement the `ToRedisArgs` trait. + /// - **count**: The number of messages to read. + /// - **block**: The time to block waiting for new messages. + /// + /// # Returns: + /// A [`RedisResult`] with a vector of [`StreamId`]s. + /// If the operation is successful, the function will return a vector of [`StreamId`]s. + /// If an error occurs, the function will return an error result. + fn read_new_messages( + &mut self, + key: &K, + group: &G, + consumer: &N, + count: usize, + block: usize, + ) -> RedisResult> + where + G: ToRedisArgs, + N: ToRedisArgs; + + /// Read pending messages from a stream. + /// + /// # Arguments: + /// - **key**: A stream key, which must implement the `ToRedisArgs` trait. + /// - **group**: A consumers group, which must implement the `ToRedisArgs` trait. + /// - **consumer**: A consumer name, which must implement the `ToRedisArgs` trait. + /// - **latest_pending_message_id**: The ID of the latest pending message, which must implement the `ToRedisArgs` trait. + /// - **count**: The number of messages to read. + /// + /// # Returns: + /// A [`RedisResult`] with a tuple of a vector of [`StreamId`]s and the latest pending message ID. + /// If the operation is successful, the function will return a tuple with a vector of [`StreamId`]s and the latest pending message ID. + /// If an error occurs, the function will return an error result. + fn read_pending_messages( + &mut self, + key: &K, + group: &G, + consumer: &N, + latest_pending_message_id: ID, + count: usize, + ) -> RedisResult<(Vec, LatestPendingMessageId)> + where + G: ToRedisArgs, + N: ToRedisArgs, + ID: ToRedisArgs; + + /// Claim pending messages from a stream. + /// + /// # Arguments: + /// - **key**: A stream key, which must implement the `ToRedisArgs` trait. + /// - **group**: A consumers group, which must implement the `ToRedisArgs` trait. + /// - **consumer**: A consumer name, which must implement the `ToRedisArgs` trait. + /// - **min_idle_time**: The minimum idle time in milliseconds. + /// - **next_id_to_claim**: The next ID to claim, which must implement the `ToRedisArgs` trait. + /// - **count**: The number of messages to claim. + /// + /// # Returns: + /// A [`RedisResult`] with a tuple of a vector of [`StreamId`]s and the next ID to claim. + /// If the operation is successful, the function will return a tuple with a vector of [`StreamId`]s and the next ID to claim. + /// If an error occurs, the function will return an error result. + fn claim_pending_messages( + &mut self, + key: &K, + group: &G, + consumer: &N, + min_idle_time: usize, + next_id_to_claim: ID, + count: usize, + ) -> RedisResult<(Vec, NextIdToClaim)> + where + G: ToRedisArgs, + N: ToRedisArgs, + ID: ToRedisArgs; + + /// Verify if a message is still in the consumer pending list. + /// + /// # Arguments: + /// - **key**: A stream key, which must implement the `ToRedisArgs` trait. + /// - **group**: A consumers group, which must implement the `ToRedisArgs` trait. + /// - **consumer**: A consumer name, which must implement the `ToRedisArgs` trait. + /// - **id**: The ID of the message to verify, which must implement the `ToRedisArgs` trait. + /// + /// # Returns: + /// A [`RedsumerResult`] with a boolean value. If the message is still in the consumer pending list, the function will return `true`. If the message is not in the consumer pending list, the function will return `false`. If an error occurs, the function will return an error result. + fn is_still_mine( + &mut self, + key: K, + group: G, + consumer: CN, + id: ID, + ) -> RedsumerResult + where + G: ToRedisArgs, + CN: ToRedisArgs, + ID: ToRedisArgs; + + /// Acknowledge a message in a consumer group. + /// + /// # Arguments: + /// - **key**: A stream key, which must implement the `ToRedisArgs` trait. + /// - **group**: A consumers group, which must implement the `ToRedisArgs` trait. + /// - **id**: The ID of the message to acknowledge, which must implement the `ToRedisArgs` trait. + /// + /// # Returns: + /// A [`RedsumerResult`] with a boolean value. If the message was successfully acknowledged, the function will return `true`. If the message was not acknowledged, the function will return `false`. If an error occurs, the function will return an error result. + fn ack(&mut self, key: K, group: G, id: ID) -> RedsumerResult + where + G: ToRedisArgs, + ID: ToRedisArgs; +} + +impl ConsumerCommands for C +where + C: Commands, + K: ToRedisArgs + ToString, +{ + fn verify_if_stream_exists(&mut self, key: K) -> RedsumerResult<()> + where + K: ToRedisArgs, + { + verify_if_stream_exists(self, key) + } + + fn create_consumer_group( + &mut self, + key: K, + group: G, + since_id: ID, + ) -> RedsumerResult + where + G: ToRedisArgs, + ID: ToRedisArgs, + { + create_consumer_group(self, key, group, since_id) + } + + fn read_new_messages( + &mut self, + key: &K, + group: &G, + consumer: &N, + count: usize, + block: usize, + ) -> RedisResult> + where + G: ToRedisArgs, + N: ToRedisArgs, + { + read_new_messages(self, key, group, consumer, count, block) + } + + fn read_pending_messages( + &mut self, + key: &K, + group: &G, + consumer: &N, + latest_pending_message_id: ID, + count: usize, + ) -> RedisResult<(Vec, LatestPendingMessageId)> + where + G: ToRedisArgs, + N: ToRedisArgs, + ID: ToRedisArgs, + { + read_pending_messages(self, key, group, consumer, latest_pending_message_id, count) + } + + fn claim_pending_messages( + &mut self, + key: &K, + group: &G, + consumer: &N, + min_idle_time: usize, + next_id_to_claim: ID, + count: usize, + ) -> RedisResult<(Vec, NextIdToClaim)> + where + G: ToRedisArgs, + N: ToRedisArgs, + ID: ToRedisArgs, + { + claim_pending_messages( + self, + key, + group, + consumer, + min_idle_time, + next_id_to_claim, + count, + ) + } + + fn is_still_mine( + &mut self, + key: K, + group: G, + consumer: CN, + id: ID, + ) -> RedsumerResult + where + G: ToRedisArgs, + CN: ToRedisArgs, + ID: ToRedisArgs, + { + is_still_mine(self, key, group, consumer, id) + } + + fn ack(&mut self, key: K, group: G, id: ID) -> RedsumerResult + where + G: ToRedisArgs, + ID: ToRedisArgs, + { + ack(self, key, group, id) + } +} + +#[cfg(test)] +mod test_create_consumer_group { + use redis::{cmd, ErrorKind, RedisError}; + use redis_test::{MockCmd, MockRedisConnection}; + + use super::*; + + #[test] + fn test_create_non_existent_consumer_group() { + // Define the key, group, and since_id: + let key: &str = "my-key"; + let group: &str = "my-group"; + let since_id: &str = "0"; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, &str>( + cmd("XGROUP") + .arg("CREATE") + .arg(key) + .arg(group) + .arg(since_id), + Ok("Ok"), + )]); + + // Create the consumer group: + let result: RedsumerResult = conn.create_consumer_group(key, group, since_id); + + // Verify the result: + assert!(result.is_ok()); + assert!(result.unwrap()) + } + + #[test] + fn test_create_existent_consumer_group() { + // Define the key, group, and since_id: + let key: &str = "my-key"; + let group: &str = "my-group"; + let since_id: &str = "0"; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, &str>( + cmd("XGROUP") + .arg("CREATE") + .arg(key) + .arg(group) + .arg(since_id), + Err(RedisError::from(( + ErrorKind::ResponseError, + "BUSYGROUP Consumer Group name already exists", + ))), + )]); + + // Create the consumer group: + let result: RedsumerResult = conn.create_consumer_group(key, group, since_id); + + // Verify the result: + assert!(result.is_ok()); + assert!(!result.unwrap()) + } + + #[test] + fn test_create_consumer_group_error() { + // Define the key, group, and since_id: + let key: &str = "my-key"; + let group: &str = "my-group"; + let since_id: &str = "0"; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, &str>( + cmd("XGROUP") + .arg("CREATE") + .arg(key) + .arg(group) + .arg(since_id), + Err(RedisError::from((ErrorKind::ResponseError, "XGROUP Error"))), + )]); + + // Create the consumer group: + let result: RedsumerResult = conn.create_consumer_group(key, group, since_id); + + // Verify the result: + assert!(result.is_err()); + } +} + +#[cfg(test)] +mod test_verify_if_stream_exists { + use redis::{cmd, ErrorKind, RedisError}; + use redis_test::{MockCmd, MockRedisConnection}; + + use super::*; + + #[test] + fn test_verify_if_stream_exists() { + // Define the key: + let key: &str = "my-key"; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, i64>(cmd("EXISTS").arg(key), Ok(1))]); + + // Verify if the stream exists: + let result: RedsumerResult<()> = conn.verify_if_stream_exists(key); + + // Verify the result: + assert!(result.is_ok()); + } + + #[test] + fn test_verify_if_stream_does_not_exist() { + // Define the key: + let key: &str = "my-key"; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, i64>(cmd("EXISTS").arg(key), Ok(0))]); + + // Verify if the stream exists: + let result: RedsumerResult<()> = conn.verify_if_stream_exists(key); + + // Verify the result: + assert!(result.is_err()); + } + + #[test] + fn test_verify_if_stream_exists_error() { + // Define the key: + let key: &str = "my-key"; + + // Create a mock connection: + let mut conn: MockRedisConnection = MockRedisConnection::new(vec![MockCmd::new::<_, i64>( + cmd("EXISTS").arg(key), + Err(RedisError::from((ErrorKind::ResponseError, "EXISTS Error"))), + )]); + + // Verify if the stream exists: + let result: RedsumerResult<()> = conn.verify_if_stream_exists(key); + + // Verify the result: + assert!(result.is_err()); + } +} + +#[cfg(test)] +mod test_read_new_messages { + use redis::{cmd, Value}; + use redis_test::{MockCmd, MockRedisConnection}; + + use super::*; + + #[test] + fn test_read_new_messages_with_zero_count() { + // Define the key, group, consumer, count, and block: + let key: &str = "my-key"; + let group: &str = "my-group"; + let consumer: &str = "my-consumer"; + let count: usize = 0; + let block: usize = 1; + + // Create a mock connection: + let mut conn: MockRedisConnection = MockRedisConnection::new(vec![]); + + // Read new messages: + let result: RedisResult> = + conn.read_new_messages(&key, &group, &consumer, count, block); + + // Verify the result: + assert!(result.is_ok()); + assert!(result.unwrap().is_empty()); + } + + #[test] + fn test_read_new_messages_ok() { + // Define the key, group, and consumer: + let key: &str = "my-key"; + let group: &str = "my-group"; + let consumer: &str = "my-consumer"; + let count: usize = 2; + let block: usize = 1; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XREADGROUP") + .arg( + &StreamReadOptions::default() + .group(group, consumer) + .count(count) + .block(block), + ) + .arg("STREAMS") + .arg(&[key]) + .arg(&[">"]), + Ok(Value::Array(vec![Value::Map(vec![ + ( + Value::SimpleString("my-key".to_string()), + Value::Array(vec![Value::Map(vec![( + Value::SimpleString("1-0".to_string()), + Value::Array(vec![ + Value::SimpleString("code".to_string()), + Value::Int(1), + ]), + )])]), + ), + ( + Value::SimpleString("fake-key".to_string()), + Value::Array(vec![Value::Map(vec![( + Value::SimpleString("666-0".to_string()), + Value::Array(vec![ + Value::SimpleString("code".to_string()), + Value::Int(666), + ]), + )])]), + ), + ])])), + )]); + + // Consume messages: + let result: RedsumerResult> = + conn.read_new_messages(&key, &group, &consumer, count, block); + + // Verify the result: + assert!(result.is_ok()); + + // Verify the messages: + let messages: Vec = result.unwrap(); + assert!(messages.len().eq(&1)); + + assert!(messages[0].id.eq("1-0")); + assert!(messages[0].map.get("code").unwrap().eq(&Value::Int(1))); + } + + #[test] + fn test_read_new_messages_error() { + // Define the key, group, and consumer: + let key: &str = "my-key"; + let group: &str = "my-group"; + let consumer: &str = "my-consumer"; + let count: usize = 2; + let block: usize = 1; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XREADGROUP") + .arg( + &StreamReadOptions::default() + .group(group, consumer) + .count(count) + .block(block), + ) + .arg("STREAMS") + .arg(&[key]) + .arg(&[">"]), + Err(RedisError::from(( + ErrorKind::ResponseError, + "XREADGROUP Error", + ))), + )]); + + // Consume messages: + let result: RedsumerResult> = + conn.read_new_messages(&key, &group, &consumer, count, block); + + // Verify the result: + assert!(result.is_err()); + } +} + +#[cfg(test)] +mod test_read_pending_messages { + use redis::{cmd, Value}; + use redis_test::{MockCmd, MockRedisConnection}; + + use super::*; + + #[test] + fn test_read_pending_messages_with_zero_count() { + // Define the key, group, consumer, latest_pending_message_id, and count: + let key: &str = "my-key"; + let group: &str = "my-group"; + let consumer: &str = "my-consumer"; + let latest_pending_message_id: &str = "0-0"; + let count: usize = 0; + + // Create a mock connection: + let mut conn: MockRedisConnection = MockRedisConnection::new(vec![]); + + // Read pending messages: + let result: RedsumerResult<(Vec, LatestPendingMessageId)> = + conn.read_pending_messages(&key, &group, &consumer, latest_pending_message_id, count); + + // Verify the result: + assert!(result.is_ok()); + + let (messages, next_id_to_claim): (Vec, LatestPendingMessageId) = result.unwrap(); + assert!(messages.is_empty()); + assert!(next_id_to_claim.eq(BEGINNING_OF_TIME_ID)); + } + + #[test] + fn test_read_pending_messages_empty() { + // Define the key, group, consumer, latest_pending_message_id, and count: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + let latest_pending_message_id = "0-0"; + let count = 2; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XREADGROUP") + .arg( + &StreamReadOptions::default() + .group(group, consumer) + .count(count), + ) + .arg("STREAMS") + .arg(&[key]) + .arg(&[latest_pending_message_id]), + Ok(Value::Array(vec![Value::Map(vec![])])), + )]); + + // Read pending messages: + let result: RedsumerResult<(Vec, LatestPendingMessageId)> = + conn.read_pending_messages(&key, &group, &consumer, latest_pending_message_id, count); + + // Verify the result: + assert!(result.is_ok()); + + let (messages, next_id_to_claim): (Vec, LatestPendingMessageId) = result.unwrap(); + assert!(messages.len().eq(&0)); + assert!(next_id_to_claim.eq(BEGINNING_OF_TIME_ID)); + } + + #[test] + fn test_read_pending_messages_ok() { + // Define the key, group, consumer, latest_pending_message_id, and count: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + let latest_pending_message_id = "0-0"; + let count = 2; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XREADGROUP") + .arg( + &StreamReadOptions::default() + .group(group, consumer) + .count(count), + ) + .arg("STREAMS") + .arg(&[key]) + .arg(&[latest_pending_message_id]), + Ok(Value::Array(vec![Value::Map(vec![ + ( + Value::SimpleString("my-key".to_string()), + Value::Array(vec![Value::Map(vec![( + Value::SimpleString("1-0".to_string()), + Value::Array(vec![ + Value::SimpleString("code".to_string()), + Value::Int(1), + ]), + )])]), + ), + ( + Value::SimpleString("fake-key".to_string()), + Value::Array(vec![Value::Map(vec![( + Value::SimpleString("666-0".to_string()), + Value::Array(vec![ + Value::SimpleString("code".to_string()), + Value::Int(666), + ]), + )])]), + ), + ])])), + )]); + + // Read pending messages: + let result: RedsumerResult<(Vec, LatestPendingMessageId)> = + conn.read_pending_messages(&key, &group, &consumer, latest_pending_message_id, count); + + // Verify the result: + assert!(result.is_ok()); + + let (messages, next_id_to_claim): (Vec, LatestPendingMessageId) = result.unwrap(); + assert!(messages.len().eq(&1)); + + assert!(messages[0].id.eq("1-0")); + assert!(messages[0].map.get("code").unwrap().eq(&Value::Int(1))); + + assert!(next_id_to_claim.eq("1-0")); + } + + #[test] + fn test_read_pending_messages_error() { + // Define the key, group, consumer, latest_pending_message_id, and count: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + let latest_pending_message_id = "0-0"; + let count = 2; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XREADGROUP") + .arg( + &StreamReadOptions::default() + .group(group, consumer) + .count(count), + ) + .arg("STREAMS") + .arg(&[key]) + .arg(&[latest_pending_message_id]), + Err(RedisError::from(( + ErrorKind::ResponseError, + "XREADGROUP Error", + ))), + )]); + + // Read pending messages: + let result: RedsumerResult<(Vec, LatestPendingMessageId)> = + conn.read_pending_messages(&key, &group, &consumer, latest_pending_message_id, count); + + // Verify the result: + assert!(result.is_err()); + } +} + +#[cfg(test)] +mod test_claim_pending_messages { + use redis::{cmd, Value}; + use redis_test::{MockCmd, MockRedisConnection}; + + use super::*; + + #[test] + fn test_claim_pending_messages_with_zero_count() { + // Define the key, group, consumer, min_idle_time, next_id_to_claim, and count: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + let min_idle_time = 1000; + let next_id_to_claim = "0-0"; + let count = 0; + + // Create a mock connection: + let mut conn: MockRedisConnection = MockRedisConnection::new(vec![]); + + // Claim pending messages: + let result: RedisResult<(Vec, NextIdToClaim)> = conn.claim_pending_messages( + &key, + &group, + &consumer, + min_idle_time, + next_id_to_claim, + count, + ); + + // Verify the result: + assert!(result.is_ok()); + + let (messages, next_id_to_claim): (Vec, NextIdToClaim) = result.unwrap(); + assert!(messages.is_empty()); + assert!(next_id_to_claim.eq(BEGINNING_OF_TIME_ID)); + } + + #[test] + fn test_claim_pending_messages_empty() { + // Define the key, group, consumer, min_idle_time, next_id_to_claim, and count: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + let min_idle_time = 1000; + let next_id_to_claim = "0-0"; + let count = 2; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XAUTOCLAIM") + .arg(key) + .arg(group) + .arg(consumer) + .arg(min_idle_time) + .arg(next_id_to_claim) + .arg(&StreamAutoClaimOptions::default().count(count)), + Ok(Value::Array(vec![ + Value::SimpleString("0-0".to_string()), + Value::Array(vec![]), + Value::Array(vec![]), + ])), + )]); + + // Claim pending messages: + let result: RedisResult<(Vec, NextIdToClaim)> = conn.claim_pending_messages( + &key, + &group, + &consumer, + min_idle_time, + next_id_to_claim, + count, + ); + + // Verify the result: + assert!(result.is_ok()); + + let (messages, next_id_to_claim): (Vec, NextIdToClaim) = result.unwrap(); + assert!(messages.len().eq(&0)); + assert!(next_id_to_claim.eq(BEGINNING_OF_TIME_ID)); + } + + #[test] + fn test_claim_pending_messages_ok() { + // Define the key, group, consumer, min_idle_time, next_id_to_claim, and count: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + let min_idle_time = 1000; + let next_id_to_claim = "0-0"; + let count = 2; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XAUTOCLAIM") + .arg(key) + .arg(group) + .arg(consumer) + .arg(min_idle_time) + .arg(next_id_to_claim) + .arg(&StreamAutoClaimOptions::default().count(count)), + Ok(Value::Array(vec![ + Value::SimpleString("1-0".to_string()), + Value::Array(vec![Value::Array(vec![ + Value::SimpleString("1-0".to_string()), + Value::Array(vec![Value::SimpleString("code".to_string()), Value::Int(1)]), + ])]), + Value::Array(vec![]), + ])), + )]); + + // Claim pending messages: + let result: RedisResult<(Vec, NextIdToClaim)> = conn.claim_pending_messages( + &key, + &group, + &consumer, + min_idle_time, + next_id_to_claim, + count, + ); + + // Verify the result: + assert!(result.is_ok()); + + let (messages, next_id_to_claim): (Vec, NextIdToClaim) = result.unwrap(); + assert!(messages.len().eq(&1)); + + assert!(messages[0].id.eq("1-0")); + assert!(messages[0].map.get("code").unwrap().eq(&Value::Int(1))); + + assert!(next_id_to_claim.eq("1-0")); + } + + #[test] + fn test_claim_pending_messages_error() { + // Define the key, group, consumer, min_idle_time, next_id_to_claim, and count: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + let min_idle_time = 1000; + let next_id_to_claim = "0-0"; + let count = 2; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XAUTOCLAIM") + .arg(key) + .arg(group) + .arg(consumer) + .arg(min_idle_time) + .arg(next_id_to_claim) + .arg(&StreamAutoClaimOptions::default().count(count)), + Err(RedisError::from(( + ErrorKind::ResponseError, + "XAUTOCLAIM Error", + ))), + )]); + + // Claim pending messages: + let result: RedisResult<(Vec, NextIdToClaim)> = conn.claim_pending_messages( + &key, + &group, + &consumer, + min_idle_time, + next_id_to_claim, + count, + ); + + // Verify the result: + assert!(result.is_err()); + } +} + +#[cfg(test)] +mod test_if_is_still_mine { + use redis::{cmd, Value}; + use redis_test::{MockCmd, MockRedisConnection}; + + use super::*; + + #[test] + fn test_is_still_mine_true() { + // Define the key, group, consumer, and id: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + let id = "1-0"; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XPENDING") + .arg(key) + .arg(group) + .arg(&[id]) + .arg(&[id]) + .arg(1) + .arg(consumer), + Ok(Value::Array(vec![Value::Array(vec![ + Value::BulkString(b"1526984818136-0".to_vec()), + Value::BulkString(b"consumer-123".to_vec()), + Value::Int(196415), + Value::Int(1), + ])])), + )]); + + // Verify if the message is still in the consumer pending list: + let result: RedsumerResult = conn.is_still_mine(key, group, consumer, id); + + // Verify the result: + assert!(result.is_ok()); + assert!(result.unwrap()); + } + + #[test] + fn test_is_still_mine_false() { + // Define the key, group, consumer, and id: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + let id = "1-0"; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XPENDING") + .arg(key) + .arg(group) + .arg(&[id]) + .arg(&[id]) + .arg(1) + .arg(consumer), + Ok(Value::Array(vec![])), + )]); + + // Verify if the message is still in the consumer pending list: + let result: RedsumerResult = conn.is_still_mine(key, group, consumer, id); + + // Verify the result: + assert!(result.is_ok()); + assert!(!result.unwrap()); + } + + #[test] + fn test_is_still_mine_error() { + // Define the key, group, consumer, and id: + let key = "my-key"; + let group = "my-group"; + let consumer = "my-consumer"; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XPENDING") + .arg(key) + .arg(group) + .arg(&["1-0"]) + .arg(&["1-0"]) + .arg(1) + .arg(consumer), + Err(RedisError::from(( + ErrorKind::ResponseError, + "XPENDING Error", + ))), + )]); + + // Verify if the message is still in the consumer pending list: + let result: RedsumerResult = conn.is_still_mine(key, group, consumer, "1-0"); + + // Verify the result: + assert!(result.is_err()); + } +} + +#[cfg(test)] +mod test_ack { + use redis::cmd; + use redis_test::{MockCmd, MockRedisConnection}; + + use super::*; + + #[test] + fn test_ack_ok_true() { + // Define the key, group, and id: + let key = "my-key"; + let group = "my-group"; + let id: &str = "1-0"; + + // Create a mock connection: + let mut conn: MockRedisConnection = MockRedisConnection::new(vec![MockCmd::new::<_, i64>( + cmd("XACK").arg(key).arg(group).arg(&[id]), + Ok(1), + )]); + + // Acknowledge the message: + let result: RedsumerResult = conn.ack(key, group, id); + + // Verify the result: + assert!(result.is_ok()); + assert!(result.unwrap()); + } + + #[test] + fn test_ack_ok_false() { + // Define the key, group, and id: + let key = "my-key"; + let group = "my-group"; + let id = "1-0"; + + // Create a mock connection: + let mut conn: MockRedisConnection = MockRedisConnection::new(vec![MockCmd::new::<_, i64>( + cmd("XACK").arg(key).arg(group).arg(&[id]), + Ok(0), + )]); + + // Acknowledge the message: + let result: RedsumerResult = conn.ack(key, group, id); + + // Verify the result: + assert!(result.is_ok()); + assert!(!result.unwrap()); + } + + #[test] + fn test_ack_error() { + // Define the key, group, and id: + let key = "my-key"; + let group = "my-group"; + let id = "1-0"; + + // Create a mock connection: + let mut conn: MockRedisConnection = MockRedisConnection::new(vec![MockCmd::new::<_, i64>( + cmd("XACK").arg(key).arg(group).arg(&[id]), + Err(RedisError::from((ErrorKind::ResponseError, "XACK Error"))), + )]); + + // Acknowledge the message: + let result: RedsumerResult = conn.ack(key, group, id); + + // Verify the result: + assert!(result.is_err()); + } +} diff --git a/redsumer-rs/src/core/streams/mod.rs b/redsumer-rs/src/core/streams/mod.rs new file mode 100644 index 0000000..4543392 --- /dev/null +++ b/redsumer-rs/src/core/streams/mod.rs @@ -0,0 +1,3 @@ +pub mod consumer; +pub mod producer; +pub mod types; diff --git a/redsumer-rs/src/core/streams/producer.rs b/redsumer-rs/src/core/streams/producer.rs new file mode 100644 index 0000000..84f1b37 --- /dev/null +++ b/redsumer-rs/src/core/streams/producer.rs @@ -0,0 +1,214 @@ +use redis::{Commands, FromRedisValue, RedisResult, ToRedisArgs}; +use tracing::{debug, error}; + +#[allow(unused_imports)] +use crate::core::result::{RedsumerError, RedsumerResult}; + +/// Produce a message to a Redis stream from a map. To set the ID of the message, this method use the value "*" to indicate that Redis should generate a new ID with the current timestamp. +fn produce_from_map(c: &mut C, key: K, map: M) -> RedisResult +where + C: Commands, + K: ToRedisArgs, + M: ToRedisArgs, + ID: FromRedisValue, +{ + match c.xadd_map(key, "*", map) { + Ok(id) => { + debug!("Message produced successfully"); + Ok(id) + } + Err(e) => { + error!("Error producing message: {:?}", e); + Err(e) + } + } +} + +/// Produce a message to a Redis stream from a list of items. To set the ID of the message, this method use the value "*" to indicate that Redis should generate a new ID with the current timestamp. +fn produce_from_items(c: &mut C, key: K, items: &[(F, V)]) -> RedisResult +where + C: Commands, + K: ToRedisArgs, + F: ToRedisArgs, + V: ToRedisArgs, + ID: FromRedisValue, +{ + match c.xadd(key, "*", items) { + Ok(id) => { + debug!("Message produced successfully"); + Ok(id) + } + Err(e) => { + error!("Error producing message: {:?}", e); + Err(e) + } + } +} + +/// A trait that bundles methods for producing messages in a Redis stream +pub trait ProducerCommands { + /// Produce a message to a Redis stream from a map. + /// + /// # Arguments: + /// - **key**: The key of the Redis stream, which must implement the `ToRedisArgs` trait. + /// - **map**: A map with the message fields and values, which must implement the `ToRedisArgs` trait. + /// + /// # Returns: + /// A [`RedsumerResult`] with the message ID if the message was produced successfully. Otherwise, a [`RedsumerError`] is returned. + fn produce_from_map(&mut self, key: K, map: M) -> RedsumerResult + where + K: ToRedisArgs, + M: ToRedisArgs; + + /// Produce a message to a Redis stream from a list of items. + /// + /// # Arguments: + /// - **key**: The key of the Redis stream, which must implement the `ToRedisArgs` trait. + /// - **items**: A list of tuples with the message fields and values, which must implement the `ToRedisArgs` trait. + /// + /// # Returns: + /// A [`RedsumerResult`] with the message ID if the message was produced successfully. Otherwise, a [`RedsumerError`] is returned. + fn produce_from_items(&mut self, key: K, items: &[(F, V)]) -> RedsumerResult + where + K: ToRedisArgs, + F: ToRedisArgs, + V: ToRedisArgs; +} + +impl ProducerCommands for C +where + C: Commands, +{ + fn produce_from_map(&mut self, key: K, map: M) -> RedsumerResult + where + K: ToRedisArgs, + M: ToRedisArgs, + { + produce_from_map(self, key, map) + } + + fn produce_from_items(&mut self, key: K, items: &[(F, V)]) -> RedsumerResult + where + K: ToRedisArgs, + F: ToRedisArgs, + V: ToRedisArgs, + { + produce_from_items(self, key, items) + } +} + +#[cfg(test)] +mod test_produce_from_map { + use std::collections::BTreeMap; + + use redis::{cmd, ErrorKind, Value}; + use redis_test::{MockCmd, MockRedisConnection}; + + use super::*; + + #[test] + fn test_produce_from_map_ok() { + // Define the key: + let key: &str = "my-key"; + + // Define the map: + let mut map: BTreeMap<&str, &str> = BTreeMap::new(); + map.insert("field", "value"); + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XADD").arg(key).arg("*").arg(map.to_owned()), + Ok(Value::SimpleString("1-0".to_string())), + )]); + + // Produce the message: + let result: RedsumerResult = conn.produce_from_map(key, map); + + // Verify the result: + assert!(result.is_ok()); + } + + #[test] + fn test_produce_from_map_error() { + // Define the key: + let key: &str = "my-key"; + + // Define the map: + let mut map: BTreeMap<&str, &str> = BTreeMap::new(); + map.insert("field", "value"); + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XADD").arg(key).arg("*").arg(map.to_owned()), + Err(RedsumerError::from(( + ErrorKind::ResponseError, + "XADD Error", + "XADD command failed".to_string(), + ))), + )]); + + // Produce the message: + let result: RedsumerResult = conn.produce_from_map(key, map); + + // Verify the result: + assert!(result.is_err()); + } +} + +#[cfg(test)] +mod test_produce_from_items { + use redis::{cmd, ErrorKind, Value}; + use redis_test::{MockCmd, MockRedisConnection}; + + use super::*; + + #[test] + fn test_produce_from_items_ok() { + // Define the key: + let key: &str = "my-key"; + + // Define the items: + let items: Vec<(&str, u8)> = vec![("number", 3), ("double", 6)]; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XADD").arg(key).arg("*").arg(&items), + Ok(Value::SimpleString("1-0".to_string())), + )]); + + // Produce the message: + let result: RedsumerResult = conn.produce_from_items(key, &items); + + // Verify the result: + assert!(result.is_ok()); + } + + #[test] + fn test_produce_from_items_error() { + // Define the key: + let key: &str = "my-key"; + + // Define the items: + let items: Vec<(&str, &str)> = vec![("field", "value")]; + + // Create a mock connection: + let mut conn: MockRedisConnection = + MockRedisConnection::new(vec![MockCmd::new::<_, Value>( + cmd("XADD").arg(key).arg("*").arg(&items), + Err(RedsumerError::from(( + ErrorKind::ResponseError, + "XADD Error", + "XADD command failed".to_string(), + ))), + )]); + + // Produce the message: + let result: RedsumerResult = conn.produce_from_items(key, &items); + + // Verify the result: + assert!(result.is_err()); + } +} diff --git a/redsumer-rs/src/core/streams/types.rs b/redsumer-rs/src/core/streams/types.rs new file mode 100644 index 0000000..52313a9 --- /dev/null +++ b/redsumer-rs/src/core/streams/types.rs @@ -0,0 +1,8 @@ +/// Stream message identifier. It is used to identify any message in a stream. +pub type Id = String; + +/// Represents the latest message ID that is pending to be processed. It is used to the read pending messages operation. +pub type LatestPendingMessageId = Id; + +/// Represents the next message ID to claim. It is used to the claim messages operation. +pub type NextIdToClaim = Id; diff --git a/redsumer-rs/src/lib.rs b/redsumer-rs/src/lib.rs index 7c22233..e7e945d 100644 --- a/redsumer-rs/src/lib.rs +++ b/redsumer-rs/src/lib.rs @@ -7,14 +7,14 @@ //! //! ```ini //! [dependencies] -//! redsumer = { git = "https://github.com/enerBit/redsumer-rs.git", package = "redsumer", version = "0.4.2" } +//! redsumer = { git = "https://github.com/enerBit/redsumer-rs.git", package = "redsumer", version = "0.5.0-beta.1" } //! ``` //! //! You can depend on it via cargo by adding the following dependency to your `Cargo.toml` file: //! //! ```ini //! [dependencies] -//! redsumer = { version = "0.4.2" } +//! redsumer = { version = "0.5.0-beta.1" } //! ``` //! //! ## Basic Usage @@ -32,38 +32,56 @@ //! //! #[tokio::main] //! async fn main() { -//! let credentials: Option = None; -//! let host: &str = "localhost"; -//! let port: &str = "6379"; -//! let db: &str = "0"; -//! let stream_name: &str = "my-stream"; -//! -//! let producer_result: RedsumerResult = -//! RedsumerProducer::new( -//! credentials, -//! host, -//! port, -//! db, -//! stream_name, -//! ); -//! -//! let producer: RedsumerProducer = producer_result.unwrap_or_else(|error| { -//! panic!("Error creating a new RedsumerProducer instance: {:?}", error); -//! }); -//! -//! let mut message: BTreeMap<&str, String> = BTreeMap::new(); -//! message.insert("id", Uuid::default().to_string()); -//! message.insert("started_at", OffsetDateTime::now_utc().to_string()); -//! -//! let id: Id = producer.produce(message).await.unwrap_or_else(|error| { -//! panic!("Error producing stream message from BTreeMap: {:?}", error.to_string()); -//! }); +//! let credentials: Option = None; +//! let host: &str = "localhost"; +//! let port: u16 = 6379; +//! let db: i64 = 0; +//! let stream_name: &str = "my-stream"; +//! +//! let args: ClientArgs = ClientArgs::new( +//! credentials, +//! host, +//! port, +//! db, +//! CommunicationProtocol::RESP2, +//! ); +//! +//! let config: ProducerConfig = ProducerConfig::new(stream_name); +//! +//! let producer_result: RedsumerResult = +//! Producer::new( +//! &args, +//! &config, +//! ); +//! +//! let producer: Producer = producer_result.unwrap_or_else(|error| { +//! panic!("Error creating a new RedsumerProducer instance: {:?}", error); +//! }); +//! +//! let mut message_1: BTreeMap<&str, String> = BTreeMap::new(); +//! message_1.insert("id", Uuid::new_v4().to_string()); +//! message_1.insert("started_at", OffsetDateTime::now_utc().to_string()); +//! +//! let mut message_2: Vec<(String, String)> = Vec::new(); +//! message_2.push(("id".to_string(), Uuid::new_v4().to_string())); +//! message_2.push(("started_at".to_string(), OffsetDateTime::now_utc().to_string())); +//! +//! let id_1: Id = producer.produce_from_map(message_1).await.unwrap_or_else(|error| { +//! panic!("Error producing stream message from BTreeMap: {:?}", error.to_string()); +//! }); +//! +//! let id_2: Id = producer.produce_from_items(message_2).await.unwrap_or_else(|error| { +//! panic!("Error producing stream message from Vec: {:?}", error.to_string()); +//! }); +//! +//! println!("Message 1 produced with id: {:?}", id_1); +//! println!("Message 2 produced with id: {:?}", id_2); //! } //! ``` //! //! Similar to the previous example, you can produce a message from a [HashMap](std::collections::HashMap) or a [HashSet](std::collections::HashSet). Go to [examples](https://github.com/enerBit/redsumer-rs/tree/main/examples) directory to see more use cases like producing a stream message from an instance of a struct. //! -//! The [produce](RedsumerProducer::produce) method accepts a generic type that implements the [ToRedisArgs](redis::ToRedisArgs) trait. Take a look at the documentation for more information. +//! The [produce_from_map](Producer::produce_from_map) and [produce_from_items](Producer::produce_from_items) methods accepts generic types that implements the [ToRedisArgs](redis::ToRedisArgs) trait. Take a look at the documentation for more information. //! //! #### Consume messages from a stream: //! @@ -75,73 +93,88 @@ //! //! #[tokio::main] //! async fn main() { -//! let credentials: Option = None; -//! let host: &str = "localhost"; -//! let port: &str = "6379"; -//! let db: &str = "0"; -//! let stream_name: &str = "my-stream"; -//! let group_name: &str = "group-name"; -//! let consumer_name: &str = "consumer"; -//! let since_id: &str = "0-0"; -//! let min_idle_time_milliseconds: usize = 1000; -//! let new_messages_count: usize = 3; -//! let pending_messages_count: usize = 2; -//! let claimed_messages_count: usize = 1; -//! let block: u8 = 5; -//! -//! let consumer_result: RedsumerResult = RedsumerConsumer::new( -//! credentials, -//! host, -//! port, -//! db, -//! stream_name, -//! group_name, -//! consumer_name, -//! since_id, -//! min_idle_time_milliseconds, -//! new_messages_count, -//! pending_messages_count, -//! claimed_messages_count, -//! block, -//! ); -//! -//! let mut consumer: RedsumerConsumer = consumer_result.unwrap_or_else(|error| { -//! panic!("Error creating a new RedsumerConsumer instance: {:?}", error); -//! }); -//! -//! loop { -//! let messages: Vec = consumer.consume().await.unwrap_or_else(|error| { -//! panic!("Error consuming messages from stream: {:?}", error); -//! }); -//! -//! for message in messages { -//! if consumer.is_still_mine(&message.id).unwrap_or_else(|error| { -//! panic!( -//! "Error checking if message is still in consumer pending list: {:?}", error -//! ); -//! }) { -//! // Process message ... -//! println!("Processing message: {:?}", message); -//! // ... -//! -//! let ack: bool = consumer.ack(&message.id).await.unwrap_or_else(|error| { -//! panic!("Error acknowledging message: {:?}", error); -//! }); -//! -//! if ack { -//! println!("Message acknowledged: {:?}", message); -//! } -//! } -//! } -//! } +//! let credentials: Option = None; +//! let host: &str = "localhost"; +//! let port: u16 = 6379; +//! let db: i64 = 0; +//! let stream_name: &str = "my-stream"; +//! let group_name: &str = "group-name"; +//! let consumer_name: &str = "consumer"; +//! let initial_stream_id: &str = "0-0"; +//! let min_idle_time_milliseconds: usize = 1000; +//! let new_messages_count: usize = 3; +//! let pending_messages_count: usize = 2; +//! let claimed_messages_count: usize = 1; +//! let block: usize = 5; +//! +//! let args: ClientArgs = ClientArgs::new( +//! credentials, +//! host, +//! port, +//! db, +//! CommunicationProtocol::RESP2, +//! ); +//! +//! let config: ConsumerConfig = ConsumerConfig::new( +//! stream_name, +//! group_name, +//! consumer_name, +//! ReadNewMessagesOptions::new( +//! new_messages_count, +//! block +//! ), +//! ReadPendingMessagesOptions::new( +//! pending_messages_count +//! ), +//! ClaimMessagesOptions::new( +//! claimed_messages_count, +//! min_idle_time_milliseconds +//! ), +//! ); +//! +//! let consumer_result: RedsumerResult = Consumer::new( +//! args, +//! config, +//! Some(initial_stream_id.to_string()), +//! ); +//! +//! let mut consumer: Consumer = consumer_result.unwrap_or_else(|error| { +//! panic!("Error creating a new RedsumerConsumer instance: {:?}", error); +//! }); +//! +//! loop { +//! let messages: Vec = consumer.consume().await.unwrap_or_else(|error| { +//! panic!("Error consuming messages from stream: {:?}", error); +//! }); +//! +//! for message in messages { +//! if consumer.is_still_mine(&message.id).unwrap_or_else(|error| { +//! panic!( +//! "Error checking if message is still in consumer pending list: {:?}", error +//! ); +//! }) { +//! // Process message ... +//! println!("Processing message: {:?}", message); +//! // ... +//! +//! let ack: bool = consumer.ack(&message.id).await.unwrap_or_else(|error| { +//! panic!("Error acknowledging message: {:?}", error); +//! }); +//! +//! if ack { +//! println!("Message acknowledged: {:?}", message); +//! } +//! } +//! } +//! } //! } //! ``` //! -//! In this example, the [consume](RedsumerConsumer::consume) method is called in a loop to consume messages from the stream. -//! The [consume](RedsumerConsumer::consume) method returns a vector of [StreamId](redis::StreamId) instances. Each [StreamId](redis::StreamId) instance represents a message in the stream. -//! The [is_still_mine](RedsumerConsumer::is_still_mine) method is used to check if the message is still in the consumer pending list. -//! If it is, the message is processed and then acknowledged using the [ack](RedsumerConsumer::ack) method. -//! The [ack](RedsumerConsumer::ack) method returns a boolean indicating if the message was successfully acknowledged. +//! In this example, the [consume](Consumer::consume) method is called in a loop to consume messages from the stream. +//! The [consume](Consumer::consume) method returns a vector of [StreamId](redis::StreamId) instances. Each [StreamId](redis::StreamId) instance represents a message in the stream. +//! The [is_still_mine](Consumer::is_still_mine) method is used to check if the message is still in the consumer pending list. +//! If it is, the message is processed and then acknowledged using the [ack](Consumer::ack) method. +//! The [ack](Consumer::ack) method returns a boolean indicating if the message was successfully acknowledged. //! //! The main objective of this message consumption strategy is to minimize the possibility that two or more consumers from the same consumer group operating simultaneously consume the same message at the same time. //! Knowing that it is a complex problem with no definitive solution, including business logic in the message processing instance will always improve results. @@ -155,7 +188,6 @@ //! #### Unwrap [Value](redis::Value) to a specific type: //! //! The [Value](redis::Value) enum represents a Redis value. It can be converted to a specific type using the [from_redis_value](redis::from_redis_value) function. This function can be imported from the [redis] module. -//! ***redsumer*** includes the [FromRedisValueHandler] struct that implements the [FromRedisValue](redis::FromRedisValue) trait for a lot of types. It is useful to convert a [Value](redis::Value) to a specific type reducing boilerplate code and total lines of code. //! //! ## Contributing //! @@ -166,12 +198,19 @@ //! - **Pull Requests**: If you've fixed a bug or implemented a new feature, we'd love to see your work! Please submit a pull request. Make sure your code follows the existing style and all tests pass. //! //! Thank you for your interest in improving `redsumer-rs`! +mod core; mod redsumer; -pub use redsumer::client::ClientCredentials; -pub use redsumer::consumer::*; -pub use redsumer::producer::*; -pub use redsumer::types::*; +pub use core::{ + client::{ClientArgs, ClientCredentials, CommunicationProtocol}, + result::{RedsumerError, RedsumerResult}, + streams::types::Id, +}; +pub use redsumer::consumer::{ + ClaimMessagesOptions, Consumer, ConsumerConfig, ReadNewMessagesOptions, + ReadPendingMessagesOptions, +}; +pub use redsumer::producer::{Producer, ProducerConfig}; pub mod redis { //! Utilities from [redis] crate. diff --git a/redsumer-rs/src/redsumer/client.rs b/redsumer-rs/src/redsumer/client.rs deleted file mode 100644 index b616550..0000000 --- a/redsumer-rs/src/redsumer/client.rs +++ /dev/null @@ -1,76 +0,0 @@ -use redis::Client; - -use super::types::RedsumerResult; - -/// To hold credentials to authenticate in *Redis*. -/// -/// This credentials are used to authenticate in *Redis* when server requires it. If server does not require it, you set it to `None`. -/// -/// This crate uses a connection URL in format: `redis://[][:@]:/`. Other formats are not supported yet. -pub struct ClientCredentials<'k> { - user: &'k str, - password: &'k str, -} - -impl<'k> ClientCredentials<'k> { - /// Get *user* - fn get_user(&self) -> &str { - self.user - } - - /// Get *password* - fn get_password(&self) -> &str { - self.password - } - - /// Build a new instance of [`ClientCredentials`]. - /// - /// # Arguments: - /// - **user**: Redis user. - /// - **password**: Redis password. - /// - /// # Returns: - /// A new instance of [`ClientCredentials`]. - /// - /// ```rust,no_run - /// use redsumer::ClientCredentials; - /// let credentials = ClientCredentials::new("user", "password"); - /// ``` - pub fn new(user: &'k str, password: &'k str) -> ClientCredentials<'k> { - ClientCredentials { user, password } - } -} - -/// Get a new [`Client`] instance to connect to *Redis* using a connection URL in format: -/// `redis://[][:@]:/` -/// -/// # Arguments: -/// - **credentials**: Option to authenticate in *Redis*. -/// - **host**: Redis host. -/// - **port**: Redis port. -/// - **db**: Redis database. -/// -/// # Returns: -/// - A [`RedsumerResult`] with a new instance of [`Client`] to connect to *Redis*. Otherwise, a [`RedsumerError`] is returned. -pub fn get_redis_client( - credentials: Option, - host: &str, - port: &str, - db: &str, -) -> RedsumerResult { - let url: String = match credentials { - Some(credentials) => { - format!( - "redis://{}:{}@{}:{}/{}", - credentials.get_user(), - credentials.get_password(), - host, - port, - db, - ) - } - None => format!("redis://{}:{}/{}", host, port, db,), - }; - - Ok(Client::open(url)?) -} diff --git a/redsumer-rs/src/redsumer/consumer.rs b/redsumer-rs/src/redsumer/consumer.rs index d29c2de..0cf2186 100644 --- a/redsumer-rs/src/redsumer/consumer.rs +++ b/redsumer-rs/src/redsumer/consumer.rs @@ -1,309 +1,292 @@ -use log::{debug, warn}; -use std::fmt::Debug; +use redis::{streams::StreamId, Client}; +use tracing::{debug, info}; -use redis::{ +use crate::core::streams::types::{LatestPendingMessageId, NextIdToClaim}; +#[allow(unused_imports)] +use crate::core::{ + client::{ClientArgs, RedisClientBuilder}, + connection::VerifyConnection, + result::{RedsumerError, RedsumerResult}, streams::{ - StreamClaimOptions, StreamClaimReply, StreamId, StreamPendingCountReply, StreamReadOptions, - StreamReadReply, + consumer::{ConsumerCommands, BEGINNING_OF_TIME_ID}, + types::Id, }, - Client, Commands, ConnectionLike, ErrorKind, RedisError, }; -use super::client::{get_redis_client, ClientCredentials}; - -#[allow(unused_imports)] -use super::types::{Id, RedsumerError, RedsumerResult}; - -/// A consumer implementation of Redis Streams. -/// -/// The consumer is responsible for consuming messages from a stream. It can read new messages, pending messages or claim messages from other consumers according to their min idle time. -/// -/// The consumer can be created using the [new](`RedsumerConsumer::new`) method. After creating a new consumer, it is possible to consume messages using the [consume](`RedsumerConsumer::consume`) method. -/// -/// Also it is possible to verify if a specific message is still in consumer pending list using the [is_still_mine](`RedsumerConsumer::is_still_mine`) method. -/// -/// The consumer can also ack a message using the [ack](`RedsumerConsumer::ack`) method. If the message is acked, it is removed from the consumer pending list. -/// -/// Take a look at the [new](`RedsumerConsumer::new`) to know more about the consumer creation process and its parameters. +/// Options used to configure the consume operation when reading new messages from a Redis stream. #[derive(Debug, Clone)] -pub struct RedsumerConsumer<'c> { - client: Client, - stream_name: &'c str, - group_name: &'c str, - consumer_name: &'c str, - since_id: &'c str, - min_idle_time_milliseconds: usize, - new_messages_count: usize, - pending_messages_count: usize, - claimed_messages_count: usize, - block: u8, +pub struct ReadNewMessagesOptions { + /// The number of new messages to read from the stream. + count: usize, + + /// The block time [seconds] to wait for new messages to arrive in the stream. + block: usize, } -impl<'c> RedsumerConsumer<'c> { - /// Get [`Client`]. - fn get_client(&self) -> &Client { - &self.client +impl ReadNewMessagesOptions { + /// Get the number of new messages to read from the stream. + pub fn get_count(&self) -> usize { + self.count } - /// Get *stream_name*. - pub fn get_stream_name(&self) -> &'c str { - self.stream_name + /// Get the block time to wait for new messages to arrive in the stream. + pub fn get_block(&self) -> usize { + self.block } - /// Get *group_name*. - pub fn get_group_name(&self) -> &str { - self.group_name + /// Create a new instance of [`ReadNewMessagesOptions`]. + /// + /// # Arguments: + /// - **count**: The number of new messages to read from the stream. + /// - **block**: The block time in seconds to wait for new messages to arrive in the stream. + /// + /// # Returns: + /// A new instance of [`ReadNewMessagesOptions`] with the given count and block time. + pub fn new(count: usize, block: usize) -> Self { + ReadNewMessagesOptions { count, block } } +} - /// Get *since_id*. - pub fn get_since_id(&self) -> &str { - self.since_id - } +/// Options used to configure the consume operation when reading pending messages from a Redis stream. +#[derive(Debug, Clone)] +pub struct ReadPendingMessagesOptions { + /// The number of pending messages to read from the stream. + count: usize, - /// Get *consumer_name*. - pub fn get_consumer_name(&self) -> &str { - self.consumer_name + /// The latest pending message ID to start reading from. + latest_pending_message_id: String, +} + +impl ReadPendingMessagesOptions { + /// Get the number of pending messages to read from the stream. + pub fn get_count(&self) -> usize { + self.count } - /// Get *min_idle_time_milliseconds*. - pub fn get_min_idle_time_milliseconds(&self) -> usize { - self.min_idle_time_milliseconds + /// Get the latest pending message ID to start reading from. + fn get_latest_pending_message_id(&self) -> &str { + &self.latest_pending_message_id } - /// Get *new_messages_count*. - pub fn get_new_messages_count(&self) -> usize { - self.new_messages_count + /// Create a new instance of [`ReadPendingMessagesOptions`]. + /// + /// # Arguments: + /// - **count**: The number of pending messages to read from the stream. + /// - **latest_pending_message_id**: The latest pending message ID to start reading from. + /// + /// # Returns: + /// A new instance of [`ReadPendingMessagesOptions`] with the given count and latest pending message ID. + pub fn new(count: usize) -> Self { + ReadPendingMessagesOptions { + count, + latest_pending_message_id: BEGINNING_OF_TIME_ID.to_string(), + } } +} + +/// Options used to configure the consume operation when claiming messages from a Redis stream. +#[derive(Debug, Clone)] +pub struct ClaimMessagesOptions { + /// The number of messages to claim from the stream. + count: usize, - /// Get *pending_messages_count*. - pub fn get_pending_messages_count(&self) -> usize { - self.pending_messages_count + /// The min idle time [milliseconds] to claim the messages. + min_idle_time: usize, + + /// The latest ID to start claiming from. + next_id_to_claim: String, +} + +impl ClaimMessagesOptions { + /// Get the number of messages to claim from the stream. + pub fn get_count(&self) -> usize { + self.count } - /// Get *claimed_messages_count*. - pub fn get_claimed_messages_count(&self) -> usize { - self.claimed_messages_count + /// Get the min idle time to claim the messages. + pub fn get_min_idle_time(&self) -> usize { + self.min_idle_time } - /// Get *block*. - pub fn get_block(&self) -> u8 { - self.block + /// Get the latest ID to start claiming from. + fn get_next_id_to_claim(&self) -> &str { + &self.next_id_to_claim } - /// Build a new [`RedsumerConsumer`] instance. + /// Create a new instance of [`ClaimMessagesOptions`]. /// - /// Before creating a new consumer, the following validations are performed: + /// # Arguments: + /// - **count**: The number of messages to claim from the stream. + /// - **min_idle_time**: The min idle time in milliseconds to claim the messages. /// - /// - If the *new_messages_count*, *pending_messages_count* and *claimed_messages_count* are all zero, a [`RedsumerError`] is returned. - /// - If connection string is invalid, a [`RedsumerError`] is returned. - /// - If connection to Redis server can not be established, a [`RedsumerError`] is returned. - /// - If the stream does not exist, a [`RedsumerError`] is returned: The stream must exist before creating a new consumer. - /// - If the consumers group does not exist, it is created based on the *stream_name*, *group_name* and *since_id*. If the consumers group already exists, a warning is logged. If an error occurs during the creation process, a [`RedsumerError`] is returned. + /// # Returns: + /// A new instance of [`ClaimMessagesOptions`] with the given count, min idle time and latest pending message ID. + pub fn new(count: usize, min_idle_time: usize) -> Self { + ClaimMessagesOptions { + count, + min_idle_time, + next_id_to_claim: BEGINNING_OF_TIME_ID.to_string(), + } + } +} + +/// Define the configuration parameters to create a consumer instance. +#[derive(Debug, Clone)] +pub struct ConsumerConfig { + /// Stream name where messages will be consumed. + stream_name: String, + + /// Group name where the consumer is registered. + group_name: String, + + /// Consumer name within the specified consumers group. + consumer_name: String, + + /// Options to configure the read new messages operation. + read_new_messages_options: ReadNewMessagesOptions, + + /// Options to configure the read pending messages operation. + read_pending_messages_options: ReadPendingMessagesOptions, + + /// Options to configure the claim messages operation. + claim_messages_options: ClaimMessagesOptions, +} + +impl ConsumerConfig { + /// Get **stream name**. + pub fn get_stream_name(&self) -> &str { + &self.stream_name + } + + /// Get **group name**. + pub fn get_group_name(&self) -> &str { + &self.group_name + } + + /// Get **consumer name**. + pub fn get_consumer_name(&self) -> &str { + &self.consumer_name + } + + /// Get **read new messages options**. + pub fn get_read_new_messages_options(&self) -> &ReadNewMessagesOptions { + &self.read_new_messages_options + } + + /// Get **read pending messages options**. + pub fn get_read_pending_messages_options(&self) -> &ReadPendingMessagesOptions { + &self.read_pending_messages_options + } + + /// Get **claim messages options**. + pub fn get_claim_messages_options(&self) -> &ClaimMessagesOptions { + &self.claim_messages_options + } + + /// Create a new [`ConsumerConfig`] instance. /// /// # Arguments: - /// - **credentials**: Optional [`ClientCredentials`] to authenticate in Redis. - /// - **host**: Redis host. - /// - **port**: Redis port. - /// - **db**: Redis database. - /// - **stream_name**: Stream name to consume messages. + /// - **stream_name**: The name of the stream where messages will be produced. /// - **group_name**: Consumers group name. /// - **consumer_name**: Represents the consumer name within the specified consumers group, which must be ensured to be unique. In a microservices architecture, for example, it is recommended to use the pod name. - /// - **since_id**: It is used to read and to claim pending messages from stream greater than the specified value. If consumers group does not exist, it is created based on this value. - /// - **min_idle_time_milliseconds**: It is the minimum idle time to claim pending messages, given in milliseconds. Only pending messages that have been idle for at least this long will be claimed. - /// - **new_messages_count**: Maximum number of new messages to read. - /// - **pending_messages_count**: Maximum number of pending messages to read. - /// - **claimed_messages_count**: Maximum number of claimed messages to read. - /// - **block**: Max time to wait for new messages, given in milliseconds. - /// - /// # Returns: - /// - A [`RedsumerResult`] containing a [`RedsumerConsumer`] instance. Otherwise, a [`RedsumerError`] is returned. + /// - **since_id**: Latest ID to start reading from. + /// - **read_new_messages_options**: Options to configure the read new messages operation. + /// - **read_pending_messages_options**: Options to configure the read pending messages operation. + /// - **claim_messages_options**: Options to configure the claim messages operation. /// - /// # Example: - /// Create a new [`RedsumerConsumer`] instance. - /// ```rust,no_run - /// use redsumer::{ClientCredentials, RedsumerConsumer}; - /// - /// let client_credentials = Some(ClientCredentials::new("user", "password")); - /// let host = "localhost"; - /// let port = "6379"; - /// let db = "0"; - /// let stream_name = "my_stream"; - /// let group_name = "my_consumer_group"; - /// let consumer_name = "my_consumer"; - /// let since_id = "0-0"; - /// let min_idle_time_milliseconds = 360000; - /// let new_messages_count = 10; - /// let pending_messages_count = 10; - /// let claimed_messages_count = 10; - /// let block = 5; - /// - /// let consumer: RedsumerConsumer = RedsumerConsumer::new( - /// client_credentials, - /// host, - /// port, - /// db, - /// stream_name, - /// group_name, - /// consumer_name, - /// since_id, - /// min_idle_time_milliseconds, - /// new_messages_count, - /// pending_messages_count, - /// claimed_messages_count, - /// block, - /// ).unwrap_or_else(|error| { - /// panic!("Error creating new RedsumerConsumer: {}", error); - /// }); - /// ``` + /// # Returns: + /// A new [`ConsumerConfig`] instance. pub fn new( - credentials: Option>, - host: &'c str, - port: &'c str, - db: &'c str, - stream_name: &'c str, - group_name: &'c str, - consumer_name: &'c str, - since_id: &'c str, - min_idle_time_milliseconds: usize, - new_messages_count: usize, - pending_messages_count: usize, - claimed_messages_count: usize, - block: u8, - ) -> RedsumerResult { - let total_messages_to_read: usize = - new_messages_count + pending_messages_count + claimed_messages_count; - if total_messages_to_read.eq(&0) { - return Err(RedisError::from(( - ErrorKind::TryAgain, - "Total messages to read must be grater than zero", - ))); + stream_name: &str, + group_name: &str, + consumer_name: &str, + read_new_messages_options: ReadNewMessagesOptions, + read_pending_messages_options: ReadPendingMessagesOptions, + claim_messages_options: ClaimMessagesOptions, + ) -> Self { + ConsumerConfig { + stream_name: stream_name.to_owned(), + group_name: group_name.to_owned(), + consumer_name: consumer_name.to_owned(), + read_new_messages_options, + read_pending_messages_options, + claim_messages_options, } - - let mut client: Client = get_redis_client(credentials, host, port, db)?; - - if !client.check_connection() { - return Err(RedisError::from(( - ErrorKind::TryAgain, - "Error getting connection to Redis server", - ))); - }; - - if !client.get_connection()?.exists::<_, bool>(stream_name)? { - return Err(RedisError::from(( - ErrorKind::TryAgain, - "Stream does not exist", - ))); - }; - - match client.get_connection()?.xgroup_create::<_, _, _, bool>( - stream_name, - group_name, - since_id, - ) { - Ok(_) => { - debug!("Consumers group {} was created successfully", group_name); - } - Err(error) => { - if error.to_string().contains("BUSYGROUP") { - debug!("Consumers group {} already exists", group_name); - } else { - return Err(error); - } - } - }; - - Ok(Self { - client, - stream_name, - group_name, - since_id, - consumer_name, - min_idle_time_milliseconds, - new_messages_count, - pending_messages_count, - claimed_messages_count, - block, - }) } +} - /// Read new messages from *stream* using [`Commands::xread_options`] ([`XREADGROUP`](https://redis.io/commands/xreadgroup/)). - fn read_new_messages(&self) -> RedsumerResult> { - let xreadgroup_response: StreamReadReply = - self.get_client().get_connection()?.xread_options( - &[self.get_stream_name()], - &[">"], - &StreamReadOptions::default() - .group(self.get_group_name(), self.get_consumer_name()) - .count(self.get_new_messages_count()) - .block(self.get_block().into()), - )?; +/// A consumer implementation of Redis Streams. The consumer is responsible for consuming messages from a stream. It can read new messages, pending messages or claim messages from other consumers according to their min idle time. +#[derive(Debug, Clone)] +pub struct Consumer { + /// Redis client to interact with Redis server. + client: Client, - let mut new_messages: Vec = Vec::new(); - for stream in xreadgroup_response.keys.iter() { - match stream.key.eq(self.get_stream_name()) { - true => new_messages.extend(stream.ids.to_owned()), - false => warn!("Unexpected stream name found: {}. ", stream.key), - }; - } + /// Consumer configuration parameters. + config: ConsumerConfig, +} - Ok(new_messages) +impl Consumer { + /// Get [`Client`]. + fn get_client(&self) -> &Client { + &self.client } - /// Read pending messages from *stream* until to a maximum of *pending_messages_count* using [`Commands::xread_options`] ([`XREADGROUP`](https://redis.io/commands/xreadgroup/)). - fn read_pending_messages(&self) -> RedsumerResult> { - let xreadgroup_response: StreamReadReply = - self.get_client().get_connection()?.xread_options( - &[self.get_stream_name()], - &[self.get_since_id()], - &StreamReadOptions::default() - .group(self.get_group_name(), self.get_consumer_name()) - .count(self.get_pending_messages_count()), - )?; + /// Get *config*. + pub fn get_config(&self) -> &ConsumerConfig { + &self.config + } - let mut pending_messages: Vec = Vec::new(); - for stream in xreadgroup_response.keys.iter() { - match stream.key.eq(self.get_stream_name()) { - true => pending_messages.extend(stream.ids.to_owned()), - false => warn!("Unexpected stream name found: {}. ", stream.key), - }; - } + /// Update the latest pending message ID to start reading from. + fn update_latest_pending_message_id(&mut self, id: &str) { + self.config + .read_pending_messages_options + .latest_pending_message_id = id.to_owned(); + } - Ok(pending_messages) + /// Update the next ID to claim. + fn update_next_id_to_claim(&mut self, id: &str) { + self.config.claim_messages_options.next_id_to_claim = id.to_owned(); } - /// Claim pending messages from *stream* from *since_id* to the newest one until to a maximum of *claimed_messages_count* using [`Commands::xpending_count`] ([`XPENDING`](https://redis.io/commands/xpending/)) and [`Commands::xclaim_options`] ([`XCLAIM`](https://redis.io/commands/xclaim/)). - fn claim_pending_messages(&self) -> RedsumerResult> { - let ids_to_claim: Vec = self - .get_client() - .get_connection()? - .xpending_count::<_, _, _, _, _, StreamPendingCountReply>( - self.get_stream_name(), - self.get_group_name(), - self.get_since_id(), - "+", - self.get_claimed_messages_count(), - )? - .ids - .iter() - .map(|stream_pending_id| stream_pending_id.id.to_owned()) - .collect::>(); - - if ids_to_claim.is_empty() { - return Ok(Vec::new()); - } + /// Build a new [`Consumer`] instance. + /// + /// Before creating a new consumer, the following validations are performed: + /// + /// - If connection string is invalid, a [`RedsumerError`] is returned. + /// - If connection to Redis server can not be established, a [`RedsumerError`] is returned. + /// - If the stream does not exist, a [`RedsumerError`] is returned: The stream must exist before creating a new consumer. + /// - If the consumers group does not exist, it is created based on the *stream_name*, *group_name* and the given *initial_stream_id*. If an error occurs during the creation process, a [`RedsumerError`] is returned. + /// + /// # Arguments: + /// - **args**: Client arguments to build a new [`Client`] instance. + /// - **config**: Consumer configuration parameters. + /// - **initial_stream_id**: The ID of the message to start consuming. + /// + /// # Returns: + /// - A [`RedsumerResult`] containing a [`Consumer`] instance. Otherwise, a [`RedsumerError`] is returned. + pub fn new( + args: ClientArgs, + config: ConsumerConfig, + initial_stream_id: Option, + ) -> RedsumerResult { + debug!( + "Creating a new consumer instance by: {:?} and {:?}", + args, config + ); - Ok(self - .get_client() - .get_connection()? - .xclaim_options::<_, _, _, _, _, StreamClaimReply>( - self.get_stream_name(), - self.get_group_name(), - self.get_consumer_name(), - self.get_min_idle_time_milliseconds(), - &ids_to_claim, - StreamClaimOptions::default(), - )? - .ids) + let mut client: Client = args.build()?; + client.ping()?; + + client.verify_if_stream_exists(config.get_stream_name())?; + client.create_consumer_group( + config.get_stream_name(), + config.get_group_name(), + initial_stream_id.unwrap_or(BEGINNING_OF_TIME_ID.to_string()), + )?; + + info!("Consumer was created successfully and it is ready to be used"); + + Ok(Self { client, config }) } /// Consume messages from stream according to the following steps: @@ -319,33 +302,75 @@ impl<'c> RedsumerConsumer<'c> { /// # Returns: /// - A [`RedsumerResult`] containing a list of [`StreamId`] if new, pending or claimed messages are found, otherwise an empty list is returned. If an error occurs, a [`RedsumerError`] is returned. pub async fn consume(&mut self) -> RedsumerResult> { - debug!("Consuming messages from stream {}", self.get_stream_name()); - - debug!("Processing new messages"); - let new_messages: Vec = match self.get_new_messages_count().gt(&0) { - true => self.read_new_messages()?, - false => Vec::new(), - }; + debug!( + "Consuming messages from stream {}", + self.get_config().get_stream_name() + ); + + debug!( + "Processing new messages by: {:?}", + self.get_config().get_read_new_messages_options() + ); + + let new_messages: Vec = self.get_client().to_owned().read_new_messages( + &self.get_config().get_stream_name(), + &self.get_config().get_group_name(), + &self.get_config().get_consumer_name(), + self.get_config() + .get_read_new_messages_options() + .get_count(), + self.get_config() + .get_read_new_messages_options() + .get_block(), + )?; if new_messages.len().gt(&0) { debug!("Total new messages found: {}", new_messages.len()); return Ok(new_messages); } debug!("Processing pending messages"); - let pending_messages: Vec = match self.get_pending_messages_count().gt(&0) { - true => self.read_pending_messages()?, - false => Vec::new(), - }; + + let (pending_messages, latest_pending_message_id): (Vec, LatestPendingMessageId) = + self.get_client().to_owned().read_pending_messages( + &self.get_config().get_stream_name(), + &self.get_config().get_group_name(), + &self.get_config().get_consumer_name(), + self.get_config() + .get_read_pending_messages_options() + .get_latest_pending_message_id(), + self.get_config() + .get_read_pending_messages_options() + .get_count(), + )?; + + debug!("Updating latest pending message ID to: {latest_pending_message_id}",); + + self.update_latest_pending_message_id(&latest_pending_message_id); if pending_messages.len().gt(&0) { debug!("Total pending messages found: {}", pending_messages.len()); + return Ok(pending_messages); } debug!("Processing claimed messages"); - let claimed_messages: Vec = match self.get_claimed_messages_count().gt(&0) { - true => self.claim_pending_messages()?, - false => Vec::new(), - }; + + let (claimed_messages, next_id_to_claim): (Vec, NextIdToClaim) = + self.get_client().to_owned().claim_pending_messages( + &self.get_config().get_stream_name(), + &self.get_config().get_group_name(), + &self.get_config().get_consumer_name(), + self.get_config() + .get_claim_messages_options() + .get_min_idle_time(), + self.get_config() + .get_claim_messages_options() + .get_next_id_to_claim(), + self.get_config().get_claim_messages_options().get_count(), + )?; + + debug!("Updating next ID to claim to: {next_id_to_claim}",); + + self.update_next_id_to_claim(&next_id_to_claim); if claimed_messages.len().gt(&0) { debug!("Total claimed messages found: {}", claimed_messages.len()); return Ok(claimed_messages); @@ -366,20 +391,12 @@ impl<'c> RedsumerConsumer<'c> { /// # Returns: /// - A [`RedsumerResult`] containing a boolean value. If the message is still in consumer pending list, `true` is returned. Otherwise, `false` is returned. If an error occurs, a [`RedsumerError`] is returned. pub fn is_still_mine(&self, id: &Id) -> RedsumerResult { - Ok(self - .get_client() - .get_connection()? - .xpending_consumer_count::<_, _, _, _, _, _, StreamPendingCountReply>( - self.get_stream_name(), - self.get_group_name(), - id, - id, - 1, - self.get_consumer_name(), - )? - .ids - .len() - .gt(&0)) + self.get_client().to_owned().is_still_mine( + self.get_config().get_stream_name(), + self.get_config().get_group_name(), + self.get_config().get_consumer_name(), + id, + ) } /// Ack a message by *id*. @@ -392,10 +409,10 @@ impl<'c> RedsumerConsumer<'c> { /// # Returns: /// - A [`RedsumerResult`] containing a boolean value. If the message is acked, `true` is returned. Otherwise, `false` is returned. If an error occurs, a [`RedsumerError`] is returned. pub async fn ack(&self, id: &Id) -> RedsumerResult { - Ok(self.get_client().get_connection()?.xack::<_, _, _, bool>( - self.stream_name, - self.group_name, + self.get_client().to_owned().ack( + self.get_config().get_stream_name(), + self.get_config().get_group_name(), &[id], - )?) + ) } } diff --git a/redsumer-rs/src/redsumer/mod.rs b/redsumer-rs/src/redsumer/mod.rs index 39acdcb..934046a 100644 --- a/redsumer-rs/src/redsumer/mod.rs +++ b/redsumer-rs/src/redsumer/mod.rs @@ -1,7 +1,2 @@ -pub mod client; - pub mod consumer; - pub mod producer; - -pub mod types; diff --git a/redsumer-rs/src/redsumer/producer.rs b/redsumer-rs/src/redsumer/producer.rs index 034066a..b6268e8 100644 --- a/redsumer-rs/src/redsumer/producer.rs +++ b/redsumer-rs/src/redsumer/producer.rs @@ -1,31 +1,63 @@ -use redis::{Client, Commands, ConnectionLike, ErrorKind, RedisError, ToRedisArgs}; - -use super::client::{get_redis_client, ClientCredentials}; +use redis::{Client, ToRedisArgs}; +use tracing::{debug, info}; #[allow(unused_imports)] -use super::types::{Id, RedsumerError, RedsumerResult}; +use crate::core::{ + client::{ClientArgs, ClientCredentials, RedisClientBuilder}, + connection::VerifyConnection, + result::{RedsumerError, RedsumerResult}, + streams::{producer::ProducerCommands, types::Id}, +}; + +/// Define the configuration parameters to create a producer instance. +#[derive(Debug, Clone)] +pub struct ProducerConfig { + // Stream name where messages will be produced. + stream_name: String, +} + +impl ProducerConfig { + /// Get **stream name**. + pub fn get_stream_name(&self) -> &str { + &self.stream_name + } -/// A producer implementation of *Redis Streams*. -/// -/// This struct is responsible for producing messages in a stream. + /// Create a new [`ProducerConfig`] instance. + /// + /// # Arguments: + /// - **stream_name**: The name of the stream where messages will be produced. + /// + /// # Returns: + /// A new [`ProducerConfig`] instance. + pub fn new(stream_name: &str) -> Self { + ProducerConfig { + stream_name: stream_name.to_owned(), + } + } +} + +/// A producer implementation of Redis Streams. This struct is responsible for producing messages in a stream. #[derive(Debug, Clone)] -pub struct RedsumerProducer<'p> { +pub struct Producer { + /// Redis client to interact with Redis server. client: Client, - stream_name: &'p str, + + /// Producer configuration parameters. + config: ProducerConfig, } -impl<'p> RedsumerProducer<'p> { +impl Producer { /// Get [`Client`]. fn get_client(&self) -> &Client { &self.client } /// Get *stream name*. - pub fn get_stream_name(&self) -> &str { - self.stream_name + pub fn get_config(&self) -> &ProducerConfig { + &self.config } - /// Build a new [`RedsumerProducer`] instance. + /// Build a new [`Producer`] instance. /// /// Before creating a new producer, the following validations are performed: /// @@ -40,64 +72,58 @@ impl<'p> RedsumerProducer<'p> { /// - **stream_name**: Stream name to produce messages. /// /// # Returns: - /// - A [`RedsumerResult`] with the new [`RedsumerProducer`] instance. Otherwise, a [`RedsumerError`] is returned. - /// - /// # Example: - /// Create a new [`RedsumerProducer`] instance. - /// ```rust,no_run - /// use redsumer::RedsumerProducer; - /// - /// let producer: RedsumerProducer = RedsumerProducer::new( - /// None, - /// "localhost", - /// "6379", - /// "0", - /// "my_stream", - /// ).unwrap_or_else(|err| { - /// panic!("Error creating producer: {:?}", err); - /// }); - /// ``` - pub fn new( - credentials: Option>, - host: &'p str, - port: &'p str, - db: &'p str, - stream_name: &'p str, - ) -> RedsumerResult> { - let mut client: Client = get_redis_client(credentials, host, port, db)?; + /// - A [`RedsumerResult`] with the new [`Producer`] instance. Otherwise, a [`RedsumerError`] is returned. + pub fn new(args: &ClientArgs, config: &ProducerConfig) -> RedsumerResult { + debug!( + "Creating a new producer instance by: {:?} and {:?}", + args, config + ); + + let mut client: Client = args.build()?; + client.ping()?; - if !client.check_connection() { - return Err(RedisError::from(( - ErrorKind::TryAgain, - "Error getting connection to Redis server", - ))); - }; + info!("Producer instance created successfully and it is ready to be used"); - Ok(RedsumerProducer { + Ok(Producer { client, - stream_name, + config: config.to_owned(), }) } - /// Produce a new message in stream. + /// Produce a new message in the stream from a map. /// - /// This method produces a new message in the stream setting the *ID* as "*", which means that Redis will generate a new *ID* for the message automatically with the current timestamp. - /// - /// If stream does not exist, it will be created. + /// This method produces a new message in the stream setting the *ID* as "*", which means that Redis will generate a new *ID* for the message automatically with the current timestamp. If stream does not exist, it will be created. /// /// # Arguments: - /// - **message**: Message to produce in stream. It must implement [`ToRedisArgs`]. + /// - **map**: A map with the message to be produced. It must implement the [`ToRedisArgs`] trait. /// /// # Returns: /// - A [`RedsumerResult`] with the *ID* of the produced message. Otherwise, a [`RedsumerError`] is returned. - pub async fn produce(&self, message: M) -> RedsumerResult + pub async fn produce_from_map(&self, map: M) -> RedsumerResult where M: ToRedisArgs, { - self.get_client().get_connection()?.xadd_map::<_, _, _, Id>( - self.get_stream_name(), - "*", - message, - ) + self.get_client() + .to_owned() + .produce_from_map(self.get_config().get_stream_name(), map) + } + + /// Produce a new message in the stream from a list of items. + /// + /// This method produces a new message in the stream setting the *ID* as "*", which means that Redis will generate a new *ID* for the message automatically with the current timestamp. If stream does not exist, it will be created. + /// + /// # Arguments: + /// - **items**: A list of items with the message to be produced. Each item is a tuple with the field and the value. Both must implement the [`ToRedisArgs`] trait. + /// + /// # Returns: + /// - A [`RedsumerResult`] with the *ID* of the produced message. Otherwise, a [`RedsumerError`] is returned. + pub async fn produce_from_items(&self, items: Vec<(F, V)>) -> RedsumerResult + where + F: ToRedisArgs, + V: ToRedisArgs, + { + self.get_client() + .to_owned() + .produce_from_items(self.get_config().get_stream_name(), items.as_slice()) } } diff --git a/redsumer-rs/src/redsumer/types.rs b/redsumer-rs/src/redsumer/types.rs deleted file mode 100644 index 6cc35be..0000000 --- a/redsumer-rs/src/redsumer/types.rs +++ /dev/null @@ -1,437 +0,0 @@ -use bytes::Bytes; -use redis::{from_redis_value, ErrorKind, RedisError, Value}; -use serde::de::DeserializeOwned; -use serde_json::from_str as json_from_str; -use std::fmt::Debug; -use std::str::FromStr; -use time::format_description::well_known::{Iso8601, Rfc2822, Rfc3339}; -use time::parsing::Parsable; -use time::{Date, OffsetDateTime}; -use uuid::Uuid; - -/// Error type for *redsumer* operations, it's an alias for [`RedisError`]. -pub type RedsumerError = RedisError; - -/// Result type for *redsumer* operations. -pub type RedsumerResult = Result; - -/// Stream message identifier. -pub type Id = String; - -/// Handler to unwrap [`Value`] as specific types. -pub struct FromRedisValueHandler; - -impl FromRedisValueHandler { - /// Unwrap [`Value`] as a [`i8`]. - pub fn to_i8(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`i8`]. - pub fn to_optional_i8(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_i8(v)?)) - } - } - - /// Unwrap [`Value`] as a [`i16`]. - pub fn to_i16(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`i16`]. - pub fn to_optional_i16(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_i16(v)?)) - } - } - - /// Unwrap [`Value`] as a [`i32`]. - pub fn to_i32(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`i32`]. - pub fn to_optional_i32(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_i32(v)?)) - } - } - - /// Unwrap [`Value`] as a [`i64`]. - pub fn to_i64(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`i64`]. - pub fn to_optional_i64(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_i64(v)?)) - } - } - - /// Unwrap [`Value`] as a [`i128`]. - pub fn to_i128(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`i128`]. - pub fn to_optional_i128(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_i128(v)?)) - } - } - - /// Unwrap [`Value`] as a [`u8`]. - pub fn to_u8(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`u8`]. - pub fn to_optional_u8(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_u8(v)?)) - } - } - - /// Unwrap [`Value`] as a [`u16`]. - pub fn to_u16(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`u16`]. - pub fn to_optional_u16(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_u16(v)?)) - } - } - - /// Unwrap [`Value`] as a [`u32`]. - pub fn to_u32(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`u32`]. - pub fn to_optional_u32(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_u32(v)?)) - } - } - - /// Unwrap [`Value`] as a [`u64`]. - pub fn to_u64(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`u64`]. - pub fn to_optional_u64(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_u64(v)?)) - } - } - - /// Unwrap [`Value`] as a [`u128`]. - pub fn to_u128(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`u128`]. - pub fn to_optional_u128(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_u128(v)?)) - } - } - - /// Unwrap [`Value`] as a [`usize`]. - pub fn to_usize(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`usize`]. - pub fn to_optional_usize(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_usize(v)?)) - } - } - - /// Unwrap [`Value`] as a [`isize`]. - pub fn to_isize(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`isize`]. - pub fn to_optional_isize(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_isize(v)?)) - } - } - - /// Unwrap [`Value`] as a [`f32`]. - pub fn to_f32(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`f32`]. - pub fn to_optional_f32(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_f32(v)?)) - } - } - - /// Unwrap [`Value`] as a [`f64`]. - pub fn to_f64(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`f64`]. - pub fn to_optional_f64(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_f64(v)?)) - } - } - - /// Unwrap [`Value`] as a [`String`]. - pub fn to_string(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`String`]. - pub fn to_optional_string(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_string(v)?)) - } - } - - /// Unwrap [`Value`] as a [`bool`]. - pub fn to_bool(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`bool`]. - pub fn to_optional_bool(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_bool(v)?)) - } - } - - /// Unwrap [`Value`] as a [`Uuid`]. - pub fn to_uuid(&self, v: &Value) -> RedsumerResult { - match Uuid::from_str(&self.to_string(v)?) { - Ok(uuid) => Ok(uuid), - Err(error) => Err(RedisError::from(( - ErrorKind::TypeError, - "Response was of incompatible type", - format!( - "Value {:?} is not parsable as Uuid: {:?}", - v, - &error.to_string(), - ), - ))), - } - } - - /// Unwrap [`Value`] as an optional [`Uuid`]. - pub fn to_optional_uuid(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_uuid(v)?)) - } - } - - /// Unwrap [`Value`] as [`OffsetDateTime`] with format `F`. - pub fn to_offsetdatetime( - &self, - v: &Value, - format: &F, - ) -> RedsumerResult { - match OffsetDateTime::parse(&from_redis_value::(v)?, format) { - Ok(offsetdatetime) => Ok(offsetdatetime), - Err(error) => Err(RedisError::from(( - ErrorKind::TypeError, - "Response was of incompatible type", - format!( - "Value {:?} is not parsable as OffsetDatetime with specified format {:?}: {:?}", - v, - format, - &error.to_string(), - ), - ))), - } - } - - /// Unwrap [`Value`] as an optional [`OffsetDateTime`] with format `F`. - pub fn to_optional_offsetdatetime( - &self, - v: &Value, - format: &F, - ) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_offsetdatetime(v, format)?)) - } - } - - /// Unwrap [`Value`] as [`OffsetDateTime`] with specific format [`Iso8601`]. - pub fn to_offsetdatetime_from_iso8601(&self, v: &Value) -> RedsumerResult { - self.to_offsetdatetime(v, &Iso8601::DEFAULT) - } - - /// Unwrap [`Value`] as an optional [`OffsetDateTime`] with specific format [`Iso8601`]. - pub fn to_optional_offsetdatetime_from_iso8601( - &self, - v: &Value, - ) -> RedsumerResult> { - self.to_optional_offsetdatetime(v, &Iso8601::DEFAULT) - } - - /// Unwrap [`Value`] as [`OffsetDateTime`] with specific format [`Rfc2822`]. - pub fn to_offsetdatetime_from_rfc2822(&self, v: &Value) -> RedsumerResult { - self.to_offsetdatetime(v, &Rfc2822) - } - - /// Unwrap [`Value`] as an optional [`OffsetDateTime`] with specific format [`Rfc2822`]. - pub fn to_optional_offsetdatetime_from_rfc2822( - &self, - v: &Value, - ) -> RedsumerResult> { - self.to_optional_offsetdatetime(v, &Rfc2822) - } - - /// Unwrap [`Value`] as [`OffsetDateTime`] with specific format [`Rfc3339`]. - pub fn to_offsetdatetime_from_rfc3339(&self, v: &Value) -> RedsumerResult { - self.to_offsetdatetime(v, &Rfc3339) - } - - /// Unwrap [`Value`] as an optional [`OffsetDateTime`] with specific format [`Rfc3339`]. - pub fn to_optional_offsetdatetime_from_rfc3339( - &self, - v: &Value, - ) -> RedsumerResult> { - self.to_optional_offsetdatetime(v, &Rfc3339) - } - - /// Unwrap [`Value`] as [`Date`] with format `F`. - pub fn to_date( - &self, - v: &Value, - format: &F, - ) -> RedsumerResult { - match Date::parse(&from_redis_value::(v)?, format) { - Ok(date) => Ok(date), - Err(error) => Err(RedisError::from(( - ErrorKind::TypeError, - "Response was of incompatible type", - format!( - "Value {:?} is not parsable as Date with specified format {:?}: {:?}", - v, - format, - &error.to_string(), - ), - ))), - } - } - - /// Unwrap [`Value`] as an optional [`Date`] with format `F`. - pub fn to_optional_date( - &self, - v: &Value, - format: &F, - ) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_date(v, format)?)) - } - } - - /// Unwrap [`Value`] as [`Date`] with specific format [`Iso8601`]. - pub fn to_date_from_iso8601(&self, v: &Value) -> RedsumerResult { - self.to_date(v, &Iso8601::DEFAULT) - } - - /// Unwrap [`Value`] as an optional [`Date`] with specific format [`Iso8601`]. - pub fn to_optional_date_from_iso8601(&self, v: &Value) -> RedsumerResult> { - self.to_optional_date(v, &Iso8601::DEFAULT) - } - - /// Unwrap [`Value`] as [`Bytes`]. - pub fn to_bytes(&self, v: &Value) -> RedsumerResult { - from_redis_value::(v) - } - - /// Unwrap [`Value`] as an optional [`Bytes`]. - pub fn to_optional_bytes(&self, v: &Value) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_bytes(v)?)) - } - } - - /// Unwrap [`Value`] as an instance of generic struct `S`. - pub fn to_struct_instance(&self, v: &Value) -> RedsumerResult { - match json_from_str::(&from_redis_value::(v)?) { - Ok(obj) => Ok(obj), - Err(error) => Err(RedisError::from(( - ErrorKind::TypeError, - "Response was of incompatible type", - format!( - "Value {:?} is not parsable as instance of given struct S: {:?}", - v, - &error.to_string(), - ), - ))), - } - } - - /// Unwrap [`Value`] as an optional instance of generic struct `S`. - pub fn to_optional_struct_instance( - &self, - v: &Value, - ) -> RedsumerResult> { - if *v == Value::Nil { - Ok(None) - } else { - Ok(Some(self.to_struct_instance(v)?)) - } - } - - /// Build a new [`FromRedisValueHandler`] instance. - pub fn new() -> FromRedisValueHandler { - FromRedisValueHandler {} - } -} diff --git a/redsumer-rs/tests/test_redsumer.rs b/redsumer-rs/tests/test_redsumer.rs deleted file mode 100644 index 845035f..0000000 --- a/redsumer-rs/tests/test_redsumer.rs +++ /dev/null @@ -1,314 +0,0 @@ -#[cfg(test)] -pub mod test_redsumer { - use redsumer::redis::*; - use redsumer::*; - use std::collections::HashMap; - use std::time::Duration; - use tokio::time::sleep; - - #[tokio::test] - async fn test_client_with_server_credentials() { - let producer_res: RedsumerResult = RedsumerProducer::new( - Some(ClientCredentials::new("user", "password")), - "localhost", - "6379", - "0", - "test", - ); - - assert!(producer_res.is_err()); - - let error: RedsumerError = producer_res.unwrap_err(); - assert_eq!( - error.to_string(), - "Error getting connection to Redis server- TryAgain" - ); - } - - #[tokio::test] - async fn test_producer_debug_and_clone() { - let producer_res: RedsumerResult = RedsumerProducer::new( - None, - "localhost", - "6379", - "0", - "test-producer-debug-and-clone", - ); - - assert!(producer_res.is_ok()); - let producer: RedsumerProducer = producer_res.unwrap(); - - assert_eq!(format!("{:?}", producer), "RedsumerProducer { client: Client { connection_info: ConnectionInfo { addr: Tcp(\"localhost\", 6379), redis: RedisConnectionInfo { db: 0, username: None, password: None } } }, stream_name: \"test-producer-debug-and-clone\" }"); - assert_eq!( - producer.clone().get_stream_name(), - "test-producer-debug-and-clone" - ); - } - - #[tokio::test] - async fn test_consumer_debug_and_clone() { - let host: &str = "localhost"; - let port: &str = "6379"; - let db: &str = "0"; - let stream_name: &str = "test-consumer-debug-and-clone"; - - let producer_result: RedsumerResult = - RedsumerProducer::new(None, host, port, db, stream_name); - - assert!(producer_result.is_ok()); - let producer: RedsumerProducer = producer_result.unwrap(); - - let message: HashMap = [("key".to_string(), "value".to_string())] - .iter() - .cloned() - .collect(); - - let msg_result: RedsumerResult = producer.produce(message).await; - assert!(msg_result.is_ok()); - - let consumer_result: RedsumerResult = RedsumerConsumer::new( - None, - host, - port, - db, - stream_name, - "group-name", - "consumer", - "0-0", - 1000, - 3, - 2, - 1, - 5, - ); - - assert!(consumer_result.is_ok()); - let consumer: RedsumerConsumer = consumer_result.unwrap(); - - assert_eq!(format!("{:?}", consumer), "RedsumerConsumer { client: Client { connection_info: ConnectionInfo { addr: Tcp(\"localhost\", 6379), redis: RedisConnectionInfo { db: 0, username: None, password: None } } }, stream_name: \"test-consumer-debug-and-clone\", group_name: \"group-name\", consumer_name: \"consumer\", since_id: \"0-0\", min_idle_time_milliseconds: 1000, new_messages_count: 3, pending_messages_count: 2, claimed_messages_count: 1, block: 5 }"); - assert_eq!( - consumer.clone().get_stream_name(), - "test-consumer-debug-and-clone" - ); - } - - #[tokio::test] - async fn test_consumer_error_stream_not_found() { - let consumer_result: RedsumerResult = RedsumerConsumer::new( - None, - "localhost", - "6379", - "0", - "test-consumer-error-stream-not-found", - "group-name", - "test-constructor", - "0-0", - 1000, - 3, - 2, - 1, - 5, - ); - - assert!(consumer_result.is_err()); - assert_eq!( - consumer_result.unwrap_err().to_string(), - "Stream does not exist- TryAgain" - ); - } - - #[tokio::test] - async fn test_consumer_error_in_total_messages_to_read() { - let host: &str = "localhost"; - let port: &str = "6379"; - let db: &str = "0"; - let stream_name: &str = "test-consumer-error-in-total-messages-to-read"; - - let producer_result: RedsumerResult = - RedsumerProducer::new(None, host, port, db, stream_name); - - assert!(producer_result.is_ok()); - let producer: RedsumerProducer = producer_result.unwrap(); - - let message: HashMap = [("key".to_string(), "value".to_string())] - .iter() - .cloned() - .collect(); - - let msg_result: RedsumerResult = producer.produce(message).await; - assert!(msg_result.is_ok()); - - let consumer_result: RedsumerResult = RedsumerConsumer::new( - None, - host, - port, - db, - stream_name, - "group-name", - "consumer", - "0-0", - 1000, - 0, - 0, - 0, - 5, - ); - - assert!(consumer_result.is_err()); - assert_eq!( - consumer_result.unwrap_err().to_string(), - "Total messages to read must be grater than zero- TryAgain" - ); - } - - #[tokio::test] - async fn test_consumer_consume() { - let host: &str = "localhost"; - let port: &str = "6379"; - let db: &str = "0"; - let stream_name: &str = "test-consumer-consume"; - - let producer_result: RedsumerResult = - RedsumerProducer::new(None, host, port, db, stream_name); - - assert!(producer_result.is_ok()); - let producer: RedsumerProducer = producer_result.unwrap(); - - let mut produced_ids: Vec = Vec::new(); - for i in 0..15 { - let message: HashMap = [("key".to_string(), i.to_string())] - .iter() - .cloned() - .collect(); - - let msg_result: RedsumerResult = producer.produce(message).await; - assert!(msg_result.is_ok()); - - produced_ids.push(msg_result.unwrap()); - } - - // Consume new messages: - let consumer_result: RedsumerResult = RedsumerConsumer::new( - None, - host, - port, - db, - stream_name, - "group-name", - "consumer", - "0-0", - 1000, - 15, - 0, - 0, - 5, - ); - - assert!(consumer_result.is_ok()); - let mut consumer: RedsumerConsumer = consumer_result.unwrap(); - - let new_messages_result: RedsumerResult> = consumer.consume().await; - assert!(new_messages_result.is_ok()); - - let new_messages: Vec = new_messages_result.unwrap(); - assert_eq!(new_messages.len(), 15); - - for message in new_messages.iter() { - assert!(produced_ids.contains(&message.id)); - } - - let new_messages_result: RedsumerResult> = consumer.consume().await; - assert!(new_messages_result.is_ok()); - - let new_messages: Vec = new_messages_result.unwrap(); - assert_eq!(new_messages.len(), 0); - - // Consume pending messages: - let consumer_result: RedsumerResult = RedsumerConsumer::new( - None, - host, - port, - db, - stream_name, - "group-name", - "consumer", - "0-0", - 1000, - 0, - 15, - 0, - 5, - ); - - assert!(consumer_result.is_ok()); - let mut consumer: RedsumerConsumer = consumer_result.unwrap(); - - let pending_messages_result: RedsumerResult> = consumer.consume().await; - assert!(pending_messages_result.is_ok()); - - let pending_messages: Vec = pending_messages_result.unwrap(); - assert_eq!(pending_messages.len(), 15); - - for message in pending_messages.iter() { - assert!(produced_ids.contains(&message.id)); - } - - sleep(Duration::from_secs(2)).await; - - // Consume claimed messages: - let ghost_consumer_result: RedsumerResult = RedsumerConsumer::new( - None, - host, - port, - db, - stream_name, - "group-name", - "ghost-consumer", - "0-0", - 1000, - 0, - 0, - 15, - 5, - ); - - assert!(ghost_consumer_result.is_ok()); - let mut ghost_consumer: RedsumerConsumer = ghost_consumer_result.unwrap(); - - let claimed_messages_result: RedsumerResult> = ghost_consumer.consume().await; - assert!(claimed_messages_result.is_ok()); - - let claimed_messages: Vec = claimed_messages_result.unwrap(); - assert_eq!(claimed_messages.len(), 15); - - for message in claimed_messages.iter() { - assert!(produced_ids.contains(&message.id)); - } - - // Verify claimed messages are not available for the original consumer: - for message in claimed_messages.iter() { - let is_still_mine_result: RedsumerResult = consumer.is_still_mine(&message.id); - assert!(is_still_mine_result.is_ok()); - - let is_still_mine: bool = is_still_mine_result.unwrap(); - assert!(!is_still_mine); - } - - // Verify claimed messages are available for the ghost consumer: - for message in claimed_messages.iter() { - let is_still_mine_result: RedsumerResult = - ghost_consumer.is_still_mine(&message.id); - assert!(is_still_mine_result.is_ok()); - - let is_still_mine: bool = is_still_mine_result.unwrap(); - assert!(is_still_mine); - } - - // Ack messages: - for message in claimed_messages.iter() { - let ack_result: RedsumerResult = ghost_consumer.ack(&message.id).await; - assert!(ack_result.is_ok()); - assert!(ack_result.unwrap()); - } - } -} diff --git a/redsumer-rs/tests/test_types.rs b/redsumer-rs/tests/test_types.rs deleted file mode 100644 index 77903a4..0000000 --- a/redsumer-rs/tests/test_types.rs +++ /dev/null @@ -1,545 +0,0 @@ -#[cfg(test)] -pub mod test_from_redis_value_extended_methods { - use redsumer::redis::Value; - use redsumer::FromRedisValueHandler; - - use serde::{Deserialize, Serialize}; - - #[test] - fn test_numerics_from_redis_value() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - // Tests for: isize - assert!(from_redis_value_handler - .to_isize(&(Value::Data(String::from("-255").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_isize(&(Value::Data(String::from("a").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_isize(&(Value::Data(String::from("255").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_isize(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: i8 - assert!(from_redis_value_handler - .to_i8(&(Value::Data(String::from("127").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_i8(&(Value::Data(String::from("-130").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_i8(&(Value::Data(String::from("127").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_i8(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: i16 - assert!(from_redis_value_handler - .to_i16(&(Value::Data(String::from("127").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_i16(&(Value::Data(String::from("a").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_i16(&(Value::Data(String::from("127").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_i16(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: i32 - assert!(from_redis_value_handler - .to_i32(&(Value::Data(String::from("127").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_i32(&(Value::Data(String::from("a").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_i32(&(Value::Data(String::from("127").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_i32(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: i64 - assert!(from_redis_value_handler - .to_i64(&(Value::Data(String::from("127").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_i64(&(Value::Data(String::from("a").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_i64(&(Value::Data(String::from("127").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_i64(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: i128 - assert!(from_redis_value_handler - .to_i128(&(Value::Data(String::from("127").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_i128(&(Value::Data(String::from("a").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_i128(&(Value::Data(String::from("127").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_i128(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: usize - assert!(from_redis_value_handler - .to_usize(&(Value::Data(String::from("255").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_usize(&(Value::Data(String::from("-1").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_usize(&(Value::Data(String::from("255").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_usize(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: u8 - assert!(from_redis_value_handler - .to_u8(&(Value::Data(String::from("255").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_u8(&(Value::Data(String::from("-1").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_u8(&(Value::Data(String::from("255").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_u8(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for u16: - assert!(from_redis_value_handler - .to_u16(&(Value::Data(String::from("255").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_u16(&(Value::Data(String::from("-1").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_u16(&(Value::Data(String::from("65535").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_u16(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for u32: - assert!(from_redis_value_handler - .to_u32(&(Value::Data(String::from("255").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_u32(&(Value::Data(String::from("-1").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_u32(&(Value::Data(String::from("255").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_u32(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for u64: - assert!(from_redis_value_handler - .to_u64(&(Value::Data(String::from("255").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_u64(&(Value::Data(String::from("-1").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_u64(&(Value::Data(String::from("255").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_u64(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: u128 - assert!(from_redis_value_handler - .to_u128(&(Value::Data(String::from("127").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_u128(&(Value::Data(String::from("a").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_u128(&(Value::Data(String::from("127").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_u128(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: f32 - assert!(from_redis_value_handler - .to_f32(&(Value::Data(String::from("1345.5678").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_f32(&(Value::Data(String::from("a").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_f32(&(Value::Data(String::from("1345.5678").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_f32(&(Value::Nil)) - .unwrap() - .is_none()); - - // Tests for: f64 - assert!(from_redis_value_handler - .to_f64(&(Value::Data(String::from("1345.5678").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_f64(&(Value::Data(String::from("a").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_f64(&(Value::Data(String::from("1345.5678").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_f64(&(Value::Nil)) - .unwrap() - .is_none()); - } - - #[test] - fn test_get_string_from_redis_value() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - assert!(from_redis_value_handler - .to_optional_string(&(Value::Data(String::from("hello-rusty").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_string(&(Value::Nil)) - .unwrap() - .is_none()); - } - - #[test] - fn test_bool_from_redis_value() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - assert!(from_redis_value_handler - .to_bool(&(Value::Data(String::from("1").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_bool(&(Value::Data(String::from("0").into_bytes()))) - .is_ok()); - - assert!(from_redis_value_handler - .to_bool(&(Value::Data(String::from("-3").into_bytes()))) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_bool(&(Value::Data(String::from("1").into_bytes()))) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_bool(&(Value::Nil)) - .unwrap() - .is_none()); - } - - #[test] - fn test_uuid_from_redis_value() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - assert!(from_redis_value_handler - .to_uuid( - &(Value::Data(String::from("2983cfeb-e2e0-4f21-b33e-bf678cb67f79").into_bytes())) - ) - .is_ok()); - - assert!(from_redis_value_handler - .to_uuid( - &(Value::Data(String::from("983cfeb-e2e0-4f21-b33e-bf678cb67f79").into_bytes())) - ) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_uuid( - &(&(Value::Data( - String::from("2983cfeb-e2e0-4f21-b33e-bf678cb67f79").into_bytes() - ))) - ) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_uuid(&Value::Nil) - .unwrap() - .is_none()); - } - - #[test] - fn test_get_offsetdatetime_from_redis_value_in_format_iso8601() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - assert!(from_redis_value_handler - .to_offsetdatetime_from_iso8601( - &(Value::Data(String::from("2024-01-15T21:19:00+0800").into_bytes())) - ) - .is_ok()); - - assert!(from_redis_value_handler - .to_offsetdatetime_from_iso8601( - &(Value::Data(String::from("2024-01-15 21:19:00.000-05:00").into_bytes())) - ) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_offsetdatetime_from_iso8601( - &(Value::Data(String::from("2024-01-15T21:19:00+0800").into_bytes())) - ) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_offsetdatetime_from_iso8601(&Value::Nil) - .unwrap() - .is_none()); - } - - #[test] - fn test_get_offsetdatetime_from_redis_value_in_format_rfc2822() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - assert!(from_redis_value_handler - .to_offsetdatetime_from_rfc2822( - &(Value::Data(String::from("Fri, 15 Jan 2024 21:19:00 -0500").into_bytes())) - ) - .is_ok()); - - assert!(from_redis_value_handler - .to_offsetdatetime_from_rfc2822( - &(Value::Data(String::from("2024-01-15 21:19:00.000-05:00").into_bytes())) - ) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_offsetdatetime_from_rfc2822( - &(Value::Data(String::from("Fri, 15 Jan 2024 21:19:00 -0500").into_bytes())) - ) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_offsetdatetime_from_rfc2822(&Value::Nil) - .unwrap() - .is_none()); - } - - #[test] - fn test_get_offsetdatetime_from_redis_value_in_format_rfc3339() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - assert!(from_redis_value_handler - .to_offsetdatetime_from_rfc3339(&Value::Data( - String::from("2024-01-15T21:19:00.000-05:00").into_bytes() - )) - .is_ok()); - - assert!(from_redis_value_handler - .to_offsetdatetime_from_rfc3339(&Value::Data( - String::from("2024-01-15 21:19:00.000-05:00").into_bytes() - )) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_offsetdatetime_from_rfc3339(&Value::Data( - String::from("2024-01-15T21:19:00.000-05:00").into_bytes() - )) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_offsetdatetime_from_rfc3339(&Value::Nil) - .unwrap() - .is_none()); - } - - #[test] - fn test_get_date_from_redis_value_in_format_iso8601() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - assert!(from_redis_value_handler - .to_date_from_iso8601(&Value::Data(String::from("2024-01-16").into_bytes())) - .is_ok()); - - assert!(from_redis_value_handler - .to_date_from_iso8601(&Value::Data(String::from("16-01-2024").into_bytes())) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_date_from_iso8601(&Value::Data(String::from("2024-01-16").into_bytes())) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_date_from_iso8601(&Value::Nil) - .unwrap() - .is_none()); - } - - #[test] - fn test_get_bytes_from_redis_value() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - assert!(from_redis_value_handler - .to_bytes(&Value::Data( - String::from("name,last_name\r\nJuan,Manuel").into_bytes() - )) - .is_ok()); - - assert!(from_redis_value_handler - .to_optional_bytes(&Value::Data(String::from("true").into_bytes())) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_bytes(&Value::Nil) - .unwrap() - .is_none()); - } - - #[test] - fn test_get_struct_instance_from_redis_value() { - let from_redis_value_handler: FromRedisValueHandler = FromRedisValueHandler::new(); - - #[derive(Deserialize, Serialize)] - struct Person { - pub name: String, - pub last_name: String, - pub age: u8, - pub localization: Localization, - pub is_live: bool, - } - - #[derive(Deserialize, Serialize)] - struct Localization { - pub city: String, - pub state: String, - pub country: String, - } - - let json_as_value: Value = Value::Data( - String::from( - r#" - { - "name":"Juan", - "last_name":"Tamayo", - "age":30, - "is_live": false, - "localization": { - "country":"COL", - "state":"ANT", - "city":"MDE" - }, - "favorite_food":"frijolitos", - "best_friend":"Miken't" - }"#, - ) - .into_bytes(), - ); - - let incorrect_json_as_value: Value = Value::Data( - String::from(r#"{"name":"Juan","middle_name":"Manuel","age":30}"#).into_bytes(), - ); - - assert!(from_redis_value_handler - .to_struct_instance::(&json_as_value) - .is_ok()); - - assert!(from_redis_value_handler - .to_struct_instance::(&incorrect_json_as_value) - .is_err()); - - assert!(from_redis_value_handler - .to_optional_struct_instance::(&json_as_value) - .unwrap() - .is_some()); - - assert!(from_redis_value_handler - .to_optional_struct_instance::(&Value::Nil) - .unwrap() - .is_none()); - } -}