Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V0.5.0 Beta to be tested! #20

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
346e27b
Remove FromRedisValueHandler from crate. Remove module test_types.rs
JMTamayo Sep 19, 2024
e7b710a
Remove integration tests. In the future commits, unitary tests are go…
JMTamayo Sep 20, 2024
207b3c3
Check format and linters in CI. Publish test coverage in codecov
JMTamayo Sep 20, 2024
82c83d8
Fixing CI.yaml: remove blank lines
JMTamayo Sep 20, 2024
f67949d
Move types.rs to core module
JMTamayo Sep 20, 2024
3c299d8
Move client.rs to core module. Implement CommunicationProtocol, Clien…
JMTamayo Sep 20, 2024
5237d6d
Implement connection.rs in core module. Implement function ping to ve…
JMTamayo Sep 20, 2024
5104937
Implement producer.rs in core module. Implement producer core methods…
JMTamayo Sep 20, 2024
ed48297
Rename RedsumerProducer to Producer. Implement Message struct to be u…
JMTamayo Sep 20, 2024
e42467c
Rename streams module to redsumer. Include tracing logs in producer m…
JMTamayo Sep 20, 2024
64d2fea
Rename streams module to redsumer
JMTamayo Sep 20, 2024
960927e
Include tracing logs in connection methods
JMTamayo Sep 21, 2024
de06d01
Rename module redis_streams to streams in core
JMTamayo Sep 21, 2024
e0bc264
Implement Makefile to improve dev experience
JMTamayo Sep 21, 2024
b8d1c54
Remove Message from redsumer producer. Implement directly produce_fro…
JMTamayo Sep 21, 2024
8bce385
Divide types module into result and streams::types modules
JMTamayo Sep 22, 2024
65e4b66
Implement new types and result modules in client, connection and prod…
JMTamayo Sep 22, 2024
5e91adf
Implement core::streams::consumer module and redsumer::consumer module
JMTamayo Sep 23, 2024
f2dc07f
Make cargo clippy happy
JMTamayo Sep 23, 2024
b7de4da
Make cargo clippy happy. Update doc tests
JMTamayo Sep 23, 2024
339ed6b
Update project dependencies versions. Update crate version
JMTamayo Sep 23, 2024
7ddc8f7
Include commands to create docs.rs documentation in Makefile
JMTamayo Sep 23, 2024
42990c2
Modify tests coverage target
JMTamayo Sep 23, 2024
a367194
Include log filter in CI
JMTamayo Sep 23, 2024
a8dc15d
Update README.md
JMTamayo Sep 26, 2024
e3a2999
Remove examples from README.md: Folder does not exist
JMTamayo Sep 26, 2024
01b60b7
Beta version to be tested
JMTamayo Oct 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,41 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Redis streams service
uses: supercharge/[email protected]
- uses: actions/checkout@v3

- name: Install Rust
uses: actions-rust-lang/[email protected]
with:
components: llvm-tools-preview

- name: Install cargo-llvm-cov
uses: taiki-e/[email protected]
with:
redis-port: 6379
tool: cargo-llvm-cov

- name: Git checkout
uses: actions/checkout@v3

- name: Build project
run: cargo build --verbose


- name: Verify project formatting
run: cargo fmt --all --check

- name: Check for possible errors and coding suggestions
run: cargo clippy --all-features

- name: Run project tests
env:
RUST_LOG: warn
run: cargo test --verbose

- name: Run tests with coverage for all features
env:
RUST_LOG: warn
run: cargo llvm-cov --workspace --all-features --codecov --output-path codecov.json

- name: Upload coverage to Codecov
uses: codecov/[email protected]
with:
files: codecov.json
fail_ci_if_error: true
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
23 changes: 23 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
fmt:
cargo fmt --all

fmt-check:
cargo fmt --all --check

clippy-check:
cargo clippy --all-features

install-llvm-cov:
cargo install cargo-llvm-cov

test-llvm-cov:
cargo llvm-cov --html --workspace --all-features

test:
cargo test --all-features

doc:
cargo doc --all-features

doc-open:
cargo doc --all-features --open
224 changes: 127 additions & 97 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ To use ***redsumer*** from GitHub repository with specific version, set the depe

```ini
[dependencies]
redsumer = { git = "https://github.com/enerBit/redsumer-rs.git", package = "redsumer", version = "0.4.2" }
redsumer = { git = "https://github.com/enerBit/redsumer-rs.git", package = "redsumer", version = "0.5.0-beta.1" }
```

You can depend on it via cargo by adding the following dependency to your `Cargo.toml` file:

```ini
[dependencies]
redsumer = { version = "0.4.2" }
redsumer = { version = "0.5.0-beta.1" }
```

## Basic Usage
Expand All @@ -34,38 +34,56 @@ use uuid::Uuid;

#[tokio::main]
async fn main() {
let credentials: Option<ClientCredentials> = None;
let host: &str = "localhost";
let port: &str = "6379";
let db: &str = "0";
let stream_name: &str = "my-stream";

let producer_result: RedsumerResult<RedsumerProducer> =
RedsumerProducer::new(
credentials,
host,
port,
db,
stream_name,
);

let producer: RedsumerProducer = producer_result.unwrap_or_else(|error| {
panic!("Error creating a new RedsumerProducer instance: {:?}", error);
});

let mut message: BTreeMap<&str, String> = BTreeMap::new();
message.insert("id", Uuid::default().to_string());
message.insert("started_at", OffsetDateTime::now_utc().to_string());

let id: Id = producer.produce(message).await.unwrap_or_else(|error| {
panic!("Error producing stream message from BTreeMap: {:?}", error.to_string());
});
let credentials: Option<ClientCredentials> = None;
let host: &str = "localhost";
let port: u16 = 6379;
let db: i64 = 0;
let stream_name: &str = "my-stream";

let args: ClientArgs = ClientArgs::new(
credentials,
host,
port,
db,
CommunicationProtocol::RESP2,
);

let config: ProducerConfig = ProducerConfig::new(stream_name);

let producer_result: RedsumerResult<Producer> =
Producer::new(
&args,
&config,
);

let producer: Producer = producer_result.unwrap_or_else(|error| {
panic!("Error creating a new RedsumerProducer instance: {:?}", error);
});

let mut message_1: BTreeMap<&str, String> = BTreeMap::new();
message_1.insert("id", Uuid::new_v4().to_string());
message_1.insert("started_at", OffsetDateTime::now_utc().to_string());

let mut message_2: Vec<(String, String)> = Vec::new();
message_2.push(("id".to_string(), Uuid::new_v4().to_string()));
message_2.push(("started_at".to_string(), OffsetDateTime::now_utc().to_string()));

let id_1: Id = producer.produce_from_map(message_1).await.unwrap_or_else(|error| {
panic!("Error producing stream message from BTreeMap: {:?}", error.to_string());
});

let id_2: Id = producer.produce_from_items(message_2).await.unwrap_or_else(|error| {
panic!("Error producing stream message from Vec: {:?}", error.to_string());
});

println!("Message 1 produced with id: {:?}", id_1);
println!("Message 2 produced with id: {:?}", id_2);
}
```

Similar to the previous example, you can produce a message from a [HashMap](std::collections::HashMap) or a [HashSet](std::collections::HashSet). Go to [examples](https://github.com/enerBit/redsumer-rs/tree/main/examples) directory to see more use cases like producing a stream message from an instance of a struct.
Similar to the previous example, you can produce a message from a [HashMap](std::collections::HashMap) or a [HashSet](std::collections::HashSet).

The [produce](RedsumerProducer::produce) method accepts a generic type that implements the [ToRedisArgs](redis::ToRedisArgs) trait. Take a look at the documentation for more information.
The [produce_from_map](Producer::produce_from_map) and [produce_from_items](Producer::produce_from_items) methods accepts generic types that implements the [ToRedisArgs](redis::ToRedisArgs) trait. Take a look at the documentation for more information.

#### Consume messages from a stream:

Expand All @@ -77,87 +95,99 @@ use redsumer::redis::StreamId;

#[tokio::main]
async fn main() {
let credentials: Option<ClientCredentials> = None;
let host: &str = "localhost";
let port: &str = "6379";
let db: &str = "0";
let stream_name: &str = "my-stream";
let group_name: &str = "group-name";
let consumer_name: &str = "consumer";
let since_id: &str = "0-0";
let min_idle_time_milliseconds: usize = 1000;
let new_messages_count: usize = 3;
let pending_messages_count: usize = 2;
let claimed_messages_count: usize = 1;
let block: u8 = 5;

let consumer_result: RedsumerResult<RedsumerConsumer> = RedsumerConsumer::new(
credentials,
host,
port,
db,
stream_name,
group_name,
consumer_name,
since_id,
min_idle_time_milliseconds,
new_messages_count,
pending_messages_count,
claimed_messages_count,
block,
);

let mut consumer: RedsumerConsumer = consumer_result.unwrap_or_else(|error| {
panic!("Error creating a new RedsumerConsumer instance: {:?}", error);
});

loop {
let messages: Vec<StreamId> = consumer.consume().await.unwrap_or_else(|error| {
panic!("Error consuming messages from stream: {:?}", error);
});

for message in messages {
if consumer.is_still_mine(&message.id).unwrap_or_else(|error| {
panic!(
"Error checking if message is still in consumer pending list: {:?}", error
);
}) {
// Process message ...
println!("Processing message: {:?}", message);
// ...

let ack: bool = consumer.ack(&message.id).await.unwrap_or_else(|error| {
panic!("Error acknowledging message: {:?}", error);
});

if ack {
println!("Message acknowledged: {:?}", message);
}
}
}
}
let credentials: Option<ClientCredentials> = None;
let host: &str = "localhost";
let port: u16 = 6379;
let db: i64 = 0;
let stream_name: &str = "my-stream";
let group_name: &str = "group-name";
let consumer_name: &str = "consumer";
let initial_stream_id: &str = "0-0";
let min_idle_time_milliseconds: usize = 1000;
let new_messages_count: usize = 3;
let pending_messages_count: usize = 2;
let claimed_messages_count: usize = 1;
let block: usize = 5;

let args: ClientArgs = ClientArgs::new(
credentials,
host,
port,
db,
CommunicationProtocol::RESP2,
);

let config: ConsumerConfig = ConsumerConfig::new(
stream_name,
group_name,
consumer_name,
ReadNewMessagesOptions::new(
new_messages_count,
block
),
ReadPendingMessagesOptions::new(
pending_messages_count
),
ClaimMessagesOptions::new(
claimed_messages_count,
min_idle_time_milliseconds
),
);

let consumer_result: RedsumerResult<Consumer> = Consumer::new(
args,
config,
Some(initial_stream_id.to_string()),
);

let mut consumer: Consumer = consumer_result.unwrap_or_else(|error| {
panic!("Error creating a new RedsumerConsumer instance: {:?}", error);
});

loop {
let messages: Vec<StreamId> = consumer.consume().await.unwrap_or_else(|error| {
panic!("Error consuming messages from stream: {:?}", error);
});

for message in messages {
if consumer.is_still_mine(&message.id).unwrap_or_else(|error| {
panic!(
"Error checking if message is still in consumer pending list: {:?}", error
);
}) {
// Process message ...
println!("Processing message: {:?}", message);
// ...

let ack: bool = consumer.ack(&message.id).await.unwrap_or_else(|error| {
panic!("Error acknowledging message: {:?}", error);
});

if ack {
println!("Message acknowledged: {:?}", message);
}
}
}
}
}
```

In this example, the [consume](RedsumerConsumer::consume) method is called in a loop to consume messages from the stream.
The [consume](RedsumerConsumer::consume) method returns a vector of [StreamId](redis::StreamId) instances. Each [StreamId](redis::StreamId) instance represents a message in the stream.
The [is_still_mine](RedsumerConsumer::is_still_mine) method is used to check if the message is still in the consumer pending list.
If it is, the message is processed and then acknowledged using the [ack](RedsumerConsumer::ack) method.
The [ack](RedsumerConsumer::ack) method returns a boolean indicating if the message was successfully acknowledged.
In this example, the [consume](Consumer::consume) method is called in a loop to consume messages from the stream.
The [consume](Consumer::consume) method returns a vector of [StreamId](redis::StreamId) instances. Each [StreamId](redis::StreamId) instance represents a message in the stream.
The [is_still_mine](Consumer::is_still_mine) method is used to check if the message is still in the consumer pending list.
If it is, the message is processed and then acknowledged using the [ack](Consumer::ack) method.
The [ack](Consumer::ack) method returns a boolean indicating if the message was successfully acknowledged.

The main objective of this message consumption strategy is to minimize the possibility that two or more consumers from the same consumer group operating simultaneously consume the same message at the same time.
Knowing that it is a complex problem with no definitive solution, including business logic in the message processing instance will always improve results.

Take a look at the [examples](https://github.com/enerBit/redsumer-rs/tree/main/examples) directory to see more use cases.

#### Utilities from [redis] crate:

The [redis] module provides utilities from the [redis](https://docs.rs/redis) crate. You can use these utilities to interact with Redis values and errors.

#### Unwrap [Value](redis::Value) to a specific type:

The [Value](redis::Value) enum represents a Redis value. It can be converted to a specific type using the [from_redis_value](redis::from_redis_value) function. This function can be imported from the [redis] module.
***redsumer*** includes the [FromRedisValueHandler] struct that implements the [FromRedisValue](redis::FromRedisValue) trait for a lot of types. It is useful to convert a [Value](redis::Value) to a specific type reducing boilerplate code and total lines of code.

## Contributing

Expand Down
11 changes: 11 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
coverage:
status:
project:
default:
threshold: 5%
target: 80%

patch:
default:
threshold: 5%
target: 80%
Loading
Loading