rework producer and consumer concepts #357

Open
wants to merge 1 commit into
base: main
4 changes: 2 additions & 2 deletions docs/cloud/quickstart.mdx
@@ -362,8 +362,8 @@ Now you're familiar with using InfinyOn Cloud with the [Cloud CLI], Check out ou
[SmartModule Jolt]: hub/smartmodules/jolt.mdx
[SmartModule Developer Kit (smdk)]: smartmodules/smdk.mdx
[about Topics in the Fluvio docs]: fluvio/concepts/topics.mdx
-[about Producers in the Fluvio docs]: fluvio/concepts/producers.mdx
-[about Consumers in the Fluvio docs]: fluvio/concepts/consumers.mdx
+[about Producers in the Fluvio docs]: fluvio/concepts/producer.mdx
+[about Consumers in the Fluvio docs]: fluvio/concepts/consumer.mdx
[a growing number of connectors]: hub/overview.mdx#connectors
[a random quote]: https://demo-data.infinyon.com/api/quote
[about Connectors in the Fluvio docs]: connectors/overview.mdx
225 changes: 225 additions & 0 deletions docs/fluvio/concepts/consumer.mdx
@@ -0,0 +1,225 @@
---
sidebar_position: 6
title: "Consumer"
---

# Consumer

The **Fluvio Consumer** is the component that reads and processes records from topics within the Fluvio streaming platform. It lets applications ingest data in real time or in batch mode, with control over partition selection, output formatting, and offset management.

---

## Capabilities

- **Record Consumption**:
The Consumer retrieves records from a specified topic and partition. By default, it reads from partition `0` but can be directed to any partition or even all partitions simultaneously.

- **Flexible Reading Modes**:
  - **From the Beginning or Latest**: Consumers can start reading at the beginning of the log (using flags such as `--from-beginning` or `-B`) or from the latest offset, in which case only records produced afterward are delivered.
  - **Batch vs. Continuous Consumption**: The `--disable-continuous` (`-d`) flag makes the consumer exit after reading all available records, which suits batch processing.

- **SmartModule Integration**:
Similar to producers, consumers can integrate [SmartModules](https://github.com/infinyon/fluvio/tree/master/smartmodule/examples/filter_json) (WASM modules) to perform inline transformations or filtering on records before they are delivered to the application.

- **Custom Output Formatting**:
With the `--format` flag, consumers can tailor the output display by embedding placeholders (e.g., `{{key}}`, `{{value}}`, `{{partition}}`, `{{offset}}`, and `{{time}}`) in a format string to suit their logging or processing needs.

- **Offset Management**:
Consumers maintain a persistent pointer for each topic partition. This offset ensures that upon restart or recovery, the consumer can resume reading from the correct position in the log.

---

## Partition Consumption Strategies

### Single Partition Consumption

By default, when you run a command like:

```bash
$ fluvio consume my-topic -B -d
```

the consumer reads from partition `0`. This is suitable for topics where records are funneled into a single partition or when targeted consumption is required.

### Specifying a Partition

To target a specific partition, use the `--partition` (`-p`) flag:

```bash
$ fluvio consume my-topic -B --partition 1
```

This command directs the consumer to read from partition `1`.

### Consuming from All Partitions

For cases where you want to aggregate records from every partition, the `-A` flag allows the consumer to consume records across all partitions:

```bash
$ fluvio consume my-topic -B -A
```

> **Note:** When consuming from multiple partitions, there is no guarantee of record ordering between partitions.
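
To illustrate the note above, here is a minimal sketch in plain Rust that does not use the Fluvio client. The `Rec` struct and both helper functions are hypothetical names introduced for this example. It assumes that records within one partition arrive in offset order, and shows that an interleaved stream can still be grouped back into per-partition sequences even though nothing orders records across partitions.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a consumed record; not a Fluvio type.
#[derive(Debug, Clone, PartialEq)]
struct Rec {
    partition: u32,
    offset: i64,
    value: &'static str,
}

/// Groups an interleaved stream by partition, preserving arrival order
/// within each partition.
fn group_by_partition(stream: &[Rec]) -> BTreeMap<u32, Vec<Rec>> {
    let mut groups: BTreeMap<u32, Vec<Rec>> = BTreeMap::new();
    for rec in stream {
        groups.entry(rec.partition).or_default().push(rec.clone());
    }
    groups
}

/// Returns true if offsets strictly increase within every partition.
fn ordered_within_partitions(stream: &[Rec]) -> bool {
    group_by_partition(stream)
        .values()
        .all(|recs| recs.windows(2).all(|w| w[0].offset < w[1].offset))
}

fn main() {
    // One possible interleaving when reading all partitions at once.
    let stream = vec![
        Rec { partition: 1, offset: 0, value: "b0" },
        Rec { partition: 0, offset: 0, value: "a0" },
        Rec { partition: 0, offset: 1, value: "a1" },
        Rec { partition: 1, offset: 1, value: "b1" },
    ];
    assert!(ordered_within_partitions(&stream));
    for (partition, recs) in group_by_partition(&stream) {
        let values: Vec<&str> = recs.iter().map(|r| r.value).collect();
        println!("partition {partition}: {values:?}");
    }
}
```

If an application needs a total order across partitions, it has to impose one itself, for example by sorting on a timestamp carried in the record.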

---

## SmartModule Integration for Consumers

Fluvio SmartModules provide an additional layer of processing by allowing you to filter or transform records in-flight. For example, you might use a SmartModule to filter out records that do not meet certain criteria:

```bash
$ fluvio consume my-topic -B --smartmodule-path="fluvio_wasm_filter.wasm"
```

Alternatively, after registering a SmartModule with the cluster:

```bash
$ fluvio smartmodule create --wasm-file="fluvio_wasm_filter.wasm" my_filter
```

you can apply it by name:

```bash
$ fluvio consume my-topic -B --smartmodule="my_filter"
```

This integration allows for powerful, on-the-fly processing without altering the core consumer logic.
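
As a rough illustration of the kind of logic a filter SmartModule encodes, the sketch below applies a predicate to each record value in plain Rust. The function name `keep_record` and the substring rule are hypothetical, and the code does not use the SmartModule SDK. A real SmartModule is compiled to WASM and loaded or registered as shown above.

```rust
/// Hypothetical predicate: keep only records whose UTF-8 value contains "error".
/// Values that are not valid UTF-8 are dropped.
fn keep_record(value: &[u8]) -> bool {
    match std::str::from_utf8(value) {
        Ok(text) => text.contains("error"),
        Err(_) => false,
    }
}

fn main() {
    let records: Vec<Vec<u8>> = vec![
        b"disk error on node 3".to_vec(),
        b"heartbeat ok".to_vec(),
        b"\xff\xfe".to_vec(), // not valid UTF-8
    ];
    // Only records that pass the predicate would reach the consumer.
    let delivered: Vec<&Vec<u8>> = records.iter().filter(|v| keep_record(v)).collect();
    for value in &delivered {
        println!("{}", String::from_utf8_lossy(value));
    }
}
```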

---

## Custom Output Formatting

The consumer’s default output prints only the record values. For richer debugging or logging, the `--key-value` flag displays both keys and values:

```bash
$ fluvio consume my-topic -B --key-value
[null] This is my first record ever
[alice] Alice In Wonderland
```

Moreover, you can define a custom format using the `--format` option. For instance, to print records as CSV rows:

```bash
$ fluvio consume my-topic -B --format="{{time}},{{partition}},{{offset}},{{key}},{{value}}"
2022-05-04T15:35:49.244Z,0,0,null,This is my first record ever
2022-05-04T15:35:49.244Z,0,1,null,This is my second record ever
2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland
```

This flexibility in output formatting empowers users to integrate Fluvio into various monitoring, debugging, and processing workflows.
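
The placeholder substitution can be pictured with a small sketch in plain Rust. This is not the CLI's implementation: the `Rec` struct and the `render` function are hypothetical, and the substitution is deliberately naive (it does not guard against placeholder text appearing inside a key).

```rust
/// Hypothetical record view used only for this sketch.
struct Rec<'a> {
    time: &'a str,
    partition: u32,
    offset: i64,
    key: Option<&'a str>,
    value: &'a str,
}

/// Replaces each `{{name}}` placeholder with the matching record field.
/// A missing key is rendered as `null`, mirroring the sample output above.
fn render(template: &str, rec: &Rec) -> String {
    template
        .replace("{{time}}", rec.time)
        .replace("{{partition}}", &rec.partition.to_string())
        .replace("{{offset}}", &rec.offset.to_string())
        .replace("{{key}}", rec.key.unwrap_or("null"))
        .replace("{{value}}", rec.value)
}

fn main() {
    let rec = Rec {
        time: "2022-05-04T15:52:19.963Z",
        partition: 0,
        offset: 2,
        key: Some("alice"),
        value: "Alice In Wonderland",
    };
    let line = render("{{time}},{{partition}},{{offset}},{{key}},{{value}}", &rec);
    // prints: 2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland
    println!("{line}");
}
```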

---

## Consumer Offsets and Commit Strategies

A key aspect of any streaming system is the management of **consumer offsets**. In Fluvio, these offsets serve as durable bookmarks that track which records have been processed:

- **Persistence and Durability**:
Offsets are stored persistently per topic partition. They survive cluster restarts and upgrades, ensuring that consumers can reliably resume consumption.

- **Commit Strategies**:
Fluvio supports two primary offset management strategies:

1. **Manual Offset Management**:
The application explicitly commits offsets after processing records. This provides full control over when an offset is considered “committed.”

_Example in Rust:_
```rust
use fluvio::{
    // `ConsumerStream` brings `offset_commit` and `offset_flush` into scope.
    consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy},
    Fluvio, Offset,
};
use futures_util::StreamExt;

async fn manual_consume(fluvio: &Fluvio) -> anyhow::Result<()> {
    // Named consumer with manual offset management, starting at the beginning of the log.
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                .offset_consumer("my-consumer".to_string())
                .offset_start(Offset::beginning())
                .offset_strategy(OffsetManagementStrategy::Manual)
                .build()?,
        )
        .await?;

    // The loop ends when the stream closes or yields an error.
    while let Some(Ok(record)) = stream.next().await {
        println!("{}", String::from_utf8_lossy(record.as_ref()));
        // Mark the record as processed, then flush the commit.
        stream.offset_commit()?;
        stream.offset_flush().await?;
    }
    Ok(())
}
```

2. **Auto Offset Management**:
Offsets are committed automatically as records are consumed, reducing the burden on the developer.

_Example in Rust:_
```rust
use fluvio::{
    consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
    Fluvio, Offset,
};
use futures_util::StreamExt;

async fn auto_consume(fluvio: &Fluvio) -> anyhow::Result<()> {
    // Named consumer with automatic offset management, starting at the beginning of the log.
    let mut stream = fluvio
        .consumer_with_config(
            ConsumerConfigExtBuilder::default()
                .topic("my-topic".to_string())
                .offset_consumer("my-consumer".to_string())
                .offset_start(Offset::beginning())
                .offset_strategy(OffsetManagementStrategy::Auto)
                .build()?,
        )
        .await?;

    // No explicit commit calls: offsets are committed as records are consumed.
    while let Some(Ok(record)) = stream.next().await {
        println!("{}", String::from_utf8_lossy(record.as_ref()));
    }
    Ok(())
}
```

- **Consumer Identity and Listing**:
Each consumer is identified by a unique name (specified via the CLI with `-c` or in code). You can list and manage consumers using the Fluvio CLI:

```bash
$ fluvio consumer list
CONSUMER  TOPIC        PARTITION  OFFSET  LAST SEEN
c1        hello-topic  0          1       4m 14s
```

Consumers can also be deleted as needed:

```bash
$ fluvio consumer delete c1
```

Because offsets are stored durably per consumer and partition, an application that restarts after a failure can resume from its last committed offset rather than rereading the log from the start or skipping records.
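
The bookmark behavior described in this section can be sketched with a toy in-memory model. Nothing here is Fluvio API: `OffsetStore`, `commit`, `resume_from`, and `run` are hypothetical names, and the store is assumed to hold the next offset to read. The sketch commits after each processed record, stops early to simulate a crash, and then resumes from the committed position.

```rust
use std::collections::HashMap;

/// Toy in-memory stand-in for durable offset storage, keyed by
/// (consumer name, topic, partition). Not a Fluvio type.
#[derive(Default)]
struct OffsetStore {
    committed: HashMap<(String, String, u32), i64>,
}

impl OffsetStore {
    /// Records the next offset the consumer should read.
    fn commit(&mut self, consumer: &str, topic: &str, partition: u32, next: i64) {
        self.committed
            .insert((consumer.to_string(), topic.to_string(), partition), next);
    }

    /// Returns where to resume, or 0 if nothing was committed yet.
    fn resume_from(&self, consumer: &str, topic: &str, partition: u32) -> i64 {
        self.committed
            .get(&(consumer.to_string(), topic.to_string(), partition))
            .copied()
            .unwrap_or(0)
    }
}

/// Processes records starting at the committed position, committing after
/// each one, and stops after `limit` records to simulate a crash.
fn run(store: &mut OffsetStore, log: &[&str], limit: usize) -> Vec<String> {
    let start = store.resume_from("c1", "hello-topic", 0) as usize;
    let mut processed = Vec::new();
    for (offset, value) in log.iter().enumerate().skip(start).take(limit) {
        processed.push(value.to_string());
        store.commit("c1", "hello-topic", 0, offset as i64 + 1);
    }
    processed
}

fn main() {
    let log = ["r0", "r1", "r2", "r3"];
    let mut store = OffsetStore::default();
    let first = run(&mut store, &log, 2); // "crashes" after two records
    let second = run(&mut store, &log, 10); // restart resumes at offset 2
    println!("{first:?} then {second:?}");
}
```

In this model a crash between processing a record and committing its offset would cause that record to be processed again on restart, which is why the point at which an application commits matters.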

---

## Summary

The **Fluvio Consumer** abstracts the complexities of reading and processing streaming data with features that include:

- **Flexible Partition Consumption**:
Read from single or multiple partitions with configurable starting offsets.

- **Dynamic Data Transformation**:
Integrate SmartModules for real-time filtering or transformation of records.

- **Customizable Output**:
Format output to suit application needs using the `--key-value` and `--format` options.

- **Robust Offset Management**:
Choose between manual and automatic strategies to maintain accurate, persistent consumption state.

By leveraging these capabilities, the Fluvio Consumer provides a powerful, scalable way to build reactive applications that respond to real-time data streams.

For further details and practical examples, please [check out the consumer in the CLI reference].


[check out the consumer in the CLI reference]: fluvio/cli/fluvio/consume.mdx