diff --git a/redsumer-rs/CHANGELOG.md b/redsumer-rs/CHANGELOG.md index c6f9b94..e6be8aa 100644 --- a/redsumer-rs/CHANGELOG.md +++ b/redsumer-rs/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## ✨ v0.4.1 [2024-06-13] + +### Fixed: + +- 🛠 Fixing BUG reported in [issue #15](https://github.com/enerBit/redsumer-rs/issues/15) with arguments in function xclaim. + ## ✨ v0.4.0 [2024-04-23] ### Added: diff --git a/redsumer-rs/Cargo.toml b/redsumer-rs/Cargo.toml index f2fecf3..a9c37bf 100644 --- a/redsumer-rs/Cargo.toml +++ b/redsumer-rs/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "redsumer" description = "Lightweight implementation of Redis Streams for Rust" -version = "0.4.0" +version = "0.4.1" edition = "2021" license-file = "../LICENSE" readme = "../README.md" @@ -22,10 +22,10 @@ authors = [ [dependencies] redis = { version = "0.25.3", features = ["tokio-comp", "streams"] } -tokio = { version = "1.37.0", features = ["full"] } +tokio = { version = "1.38.0", features = ["full"] } uuid = { version = "1.8.0" } time = { version = "0.3.36", features = ["parsing"] } bytes = { version = "1.6.0" } -serde = { version = "1.0.198", features = ["derive"] } -serde_json = { version = "1.0.116" } +serde = { version = "1.0.203", features = ["derive"] } +serde_json = { version = "1.0.117" } log = { version = "0.4.21" } diff --git a/redsumer-rs/src/redsumer/consumer.rs b/redsumer-rs/src/redsumer/consumer.rs index b2f7002..2613859 100644 --- a/redsumer-rs/src/redsumer/consumer.rs +++ b/redsumer-rs/src/redsumer/consumer.rs @@ -226,6 +226,25 @@ impl<'c> RedsumerConsumer<'c> { /// Claim pending messages from *stream* from *since_id* to the newest one until to a maximum of *claimed_messages_count* using [`Commands::xpending_count`] ([`XPENDING`](https://redis.io/commands/xpending/)) and [`Commands::xclaim_options`] ([`XCLAIM`](https://redis.io/commands/xclaim/)). fn claim_pending_messages(&self) -> RedsumerResult> { + let ids_to_claim: Vec = self + .get_client() + .get_connection()? + .xpending_count::<_, _, _, _, _, StreamPendingCountReply>( + self.get_stream_name(), + self.get_group_name(), + self.get_since_id(), + "+", + self.get_claimed_messages_count(), + )? + .ids + .iter() + .map(|stream_pending_id| stream_pending_id.id.to_owned()) + .collect::>(); + + if ids_to_claim.is_empty() { + return Ok(Vec::new()); + } + Ok(self .get_client() .get_connection()? @@ -234,20 +253,7 @@ impl<'c> RedsumerConsumer<'c> { self.get_group_name(), self.get_consumer_name(), self.get_min_idle_time_milliseconds(), - &(self - .get_client() - .get_connection()? - .xpending_count::<_, _, _, _, _, StreamPendingCountReply>( - self.get_stream_name(), - self.get_group_name(), - self.get_since_id(), - "+", - self.get_claimed_messages_count(), - )? - .ids - .iter() - .map(|stream_pending_id| stream_pending_id.id.to_owned()) - .collect::>()), + &ids_to_claim, StreamClaimOptions::default(), )? .ids)