Skip to content

Commit

Permalink
release fluvio 0.15.2 (#358)
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev authored Feb 7, 2025
1 parent 0864eb8 commit 0e782cf
Show file tree
Hide file tree
Showing 216 changed files with 18,120 additions and 3 deletions.
6 changes: 3 additions & 3 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ lint: build


resync-ver:
rm -rf versioned_docs/version-0.15.1
rsync -aurv docs/ versioned_docs/version-0.15.1
rm -rf versioned_docs/version-0.15.2
rsync -aurv docs/ versioned_docs/version-0.15.2

sync-ver:
rsync -aurv docs/ versioned_docs/version-0.15.1
rsync -aurv docs/ versioned_docs/version-0.15.2
82 changes: 82 additions & 0 deletions news/this-week-in-fluvio-0071.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
---
title: "This Week in Fluvio #71"
date: 2025-02-06
weight: 20
---
Fluvio is a distributed, programmable streaming platform written in Rust.

---
We released **Fluvio 0.15.2** last week.

## New release
Fluvio **v0.15.2** is now available!

To update you can run `fvm update`

```bash
$ fvm update

info: Updating fluvio stable to version 0.15.2. Current version is 0.15.1.
info: Downloading (1/5): [email protected]
info: Downloading (2/5): [email protected]
info: Downloading (3/5): [email protected]
info: Downloading (4/5): [email protected]
info: Downloading (5/5): [email protected]
done: Installed fluvio version 0.15.2
done: Now using fluvio version 0.15.2

```

If you don't have Fluvio in your machine run:

```
curl -fsS https://hub.infinyon.cloud/install/install.sh | bash
```

If you are enjoying Fluvio please share with your friends!

:::info
Also check out the Stateful Data Flow (SDF) streaming analytics in beta [SDF Examples](https://github.com/infinyon/stateful-dataflows-examples)
:::

## New features
Notable changes in this new version:


- Fixed the error when a record is larger than `max_request_size` parameter.
- Produce config builder is mutable now, allowing a better handling when have advanced conditionals and making it a lot easier to wrapper it.

See the [CHANGELOG] for details

## Good First Issues
We love our open source community contributors. Here are some issues that you could contribute to. All the best.

- [Improve fluvio topic describe with additional information]
- [Different default SPU port]
- [Delete the consumer when delete topic]
- [Remove localhost from fluvio in favor of 127.0.0.1]
- [When a topic is deleted, connected clients should have their connection closed]


---

Get in touch with us on [GitHub Discussions] or join [our Discord channel] and come say hello!

See some of the interesting community projects, examples, and utilities in the [Fluvio Community] GitHub org.


For the full list of changes this week, be sure to check out [our CHANGELOG].

[Fluvio open source]: https://github.com/infinyon/fluvio
[our CHANGELOG]: https://github.com/infinyon/fluvio/blob/master/CHANGELOG.md
[our Discord channel]: https://discordapp.com/invite/bBG2dTz
[GitHub Discussions]: https://github.com/infinyon/fluvio/discussions

[this form]: https://infinyon.com/request/ss-early-access/
[CHANGELOG]: https://github.com/infinyon/fluvio/blob/v0.15.2/CHANGELOG.md
[When a topic is deleted, connected clients should have their connection closed]: https://github.com/infinyon/fluvio/issues/3836
[Delete the consumer when delete topic]: https://github.com/infinyon/fluvio/issues/4308
[Remove localhost from fluvio in favor of 127.0.0.1]: https://github.com/infinyon/fluvio/issues/3866
[Improve fluvio topic describe with additional information]: https://github.com/infinyon/fluvio/issues/3968
[Different default SPU port]: https://github.com/infinyon/fluvio/issues/3739
[Fluvio Community]: https://github.com/fluvio-community
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
edition = "2021"
name = "fluvio-rust-example"
publish = false
version = "0.0.0"

[dependencies]
async-std = {version = "1", features = ["attributes"]}
chrono = "0.4"
flate2 = "1.0.35"
fluvio = "0.24.4"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fluvio = "0.24.4"
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use async_std::stream::StreamExt;

use fluvio::{
consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-auto";

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Auto)
.build()
.expect("Failed to build consumer config");


// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await
.expect("Failed to create consumer");
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use async_std::stream::StreamExt;

use fluvio::{
consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy},
Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-manual";

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Manual)
.build()
.expect("Failed to build consumer config");


// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
stream.offset_commit().expect("offset commit failed");
stream.offset_flush().await.expect("offset flush failed");
}
}
28 changes: 28 additions & 0 deletions versioned_docs/version-0.15.2/_embeds/apis/rust/consumer-simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use async_std::stream::StreamExt;

use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::from_end(1))
.build()
.expect("Failed to build consumer config");

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await
.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::io::Read;
use std::collections::BTreeMap;
use async_std::stream::StreamExt;
use flate2::{bufread::GzEncoder, Compression};

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_file(
SmartModuleKind::Map,
"regex_text.wasm",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.expect("Failed to build consumer config");

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}

// Create a smartmodule invocation from a wasm file
fn build_smartmodule_from_file(
kind: SmartModuleKind,
file_path: &str,
spec: &str
) -> SmartModuleInvocation {
// Read smartmodule wasm file
let raw_buffer = std::fs::read(file_path).expect("wasm file is missing");
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
encoder.read_to_end(&mut buffer).expect("failed to read encoded wasm file");

// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);

// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::AdHoc(buffer),
kind: kind,
params: params,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::collections::BTreeMap;
use async_std::stream::StreamExt;

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_name(
SmartModuleKind::Map,
"fluvio/[email protected]",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);

// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.expect("Failed to build consumer config");

// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}

// Create a smartmodule invocation using smartmodule name
fn build_smartmodule_from_name(
kind: SmartModuleKind,
smartmodule_name: &str,
spec: &str
) -> SmartModuleInvocation {
// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);

// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::Predefined(smartmodule_name.to_string()),
kind: kind,
params: params,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use fluvio::metadata::topic::TopicSpec;
use fluvio::Fluvio;

const TOPIC_NAME: &str = "hello-rust";
const PARTITIONS: u32 = 1;
const REPLICAS: u32 = 1;

#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

// Create a topic
let admin = fluvio.admin().await;
let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
let _topic_create = admin
.create(TOPIC_NAME.to_string(), false, topic_spec)
.await;

// List topics
let topics = admin.all::<TopicSpec>().await.expect("Failed to list topics");
let topic_names = topics.iter().map(|topic| topic.name.clone()).collect::<Vec<String>>();

println!("Topics:\n - {}", topic_names.join("\n - "));
}
15 changes: 15 additions & 0 deletions versioned_docs/version-0.15.2/_embeds/apis/rust/producer-kv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
// Create key and value
let key = "Hello";
let value = "Fluvio";

// create producer & send key/value
let producer = fluvio::producer(TOPIC_NAME).await.expect("Failed to create producer");
producer.send(key, value).await.expect("Failed to send record");
producer.flush().await.expect("Failed to flush");

println!("Sent [{}] {}", key, value);
}
Loading

0 comments on commit 0e782cf

Please sign in to comment.