Skip to content

Commit

Permalink
Update to Zenoh 1.0.0
Browse files Browse the repository at this point in the history
Zenoh 1.0.0 contained a few breaking API changes which required some
adaptations to be made. In particular, all (Zenoh related) configuration
is now read from a config file while the corresponding command line
arguments have been removed.
  • Loading branch information
sophokles73 authored and eriksven committed Oct 31, 2024
1 parent a3c2b6c commit 59b4b4a
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 835 deletions.
788 changes: 214 additions & 574 deletions components/Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ log = { version = "0.4" }
protobuf = { version = "3.5.1" }
protobuf-codegen = { version = "3.5.1" }
protoc-bin-vendored = { version = "3.0" }
# prost has no features
prost = { version = "0.12" }
# prost-types has no features
prost-types = { version = "0.12" }
# tokio does not enable features by default
tokio = { version = "1.39" }
zenoh = { version = "0.11.0", default-features = false }
zenoh = { version = "1.0.0", default-features = false }

[profile.release]
lto = true # Link time optimization (dead code removal etc...)
Expand Down
3 changes: 3 additions & 0 deletions components/Dockerfile.fms-consumer
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ RUN apt-get update && apt-get install -y ca-certificates \
# This will speed up fetching the crate.io index in the future, see
# https://blog.rust-lang.org/2022/06/22/sparse-registry-testing.html
ENV CARGO_UNSTABLE_SPARSE_REGISTRY=true
# This is supposedly required for successfully building for arm64 using buildx with QEMU
# see https://github.com/rust-lang/cargo/issues/10583
ENV CARGO_NET_GIT_FETCH_WITH_CLI=true
RUN cargo install cargo-about

RUN echo "Building for $TARGETARCH"
Expand Down
213 changes: 44 additions & 169 deletions components/fms-consumer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::fs::File;
use std::io::{self, BufRead, BufReader};
use std::process;
use std::sync::Arc;
use std::time::Duration;

use clap::{Arg, ArgAction, ArgMatches, Command};
use fms_proto::fms::VehicleStatus;
Expand All @@ -36,9 +35,7 @@ use rdkafka::consumer::Consumer;
use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Headers};
use rdkafka::{ClientConfig, Message};

use futures::select;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh::Config;

const CONTENT_TYPE_PROTOBUF: &str = "application/vnd.google.protobuf";

Expand All @@ -52,41 +49,12 @@ const SUBCOMMAND_ZENOH: &str = "zenoh";

const KEY_EXPR: &str = "fms/vehicleStatus";

fn parse_zenoh_args(args: &ArgMatches) -> Config {
let mut config: Config = if let Some(conf_file) = args.get_one::<String>("config") {
Config::from_file(conf_file).unwrap()
fn parse_zenoh_args(args: &ArgMatches) -> Result<Config, Box<dyn std::error::Error + Send + Sync>> {
if let Some(conf_file) = args.get_one::<String>("config") {
Config::from_file(conf_file)
} else {
Config::default()
};

if let Some(mode) = args.get_one::<WhatAmI>("mode") {
config.set_mode(Some(*mode)).unwrap();
}

if let Some(values) = args.get_many::<String>("connect") {
config
.connect
.endpoints
.extend(values.map(|v| v.parse().unwrap()))
}
if let Some(values) = args.get_many::<String>("listen") {
config
.listen
.endpoints
.extend(values.map(|v| v.parse().unwrap()))
}
if let Some(values) = args.get_one::<bool>("no-multicast-scouting") {
config
.scouting
.multicast
.set_enabled(Some(*values))
.unwrap();
}
if let Some(values) = args.get_one::<Duration>("session-timeout") {
let millis = u64::try_from(values.as_millis()).unwrap_or(u64::MAX);
config.scouting.set_timeout(Some(millis)).unwrap();
Ok(Config::default())
}
config
}

fn add_property_bag_to_map(property_bag: String, headers: &mut HashMap<String, String>) {
Expand Down Expand Up @@ -221,122 +189,63 @@ async fn process_hono_message(m: &BorrowedMessage<'_>, influx_writer: Arc<Influx
}
}

async fn run_async_processor_hono(args: &ArgMatches) {
let influx_writer = InfluxWriter::new(args).map_or_else(
|e| {
error!("failed to create InfluxDB writer: {e}");
process::exit(1);
},
Arc::new,
);
async fn run_async_processor_hono(args: &ArgMatches) -> Result<(), Box<dyn std::error::Error>> {
let influx_writer = InfluxWriter::new(args).map(Arc::new)?;

let hono_args = args.subcommand_matches(SUBCOMMAND_HONO).unwrap();
let mut client_config = get_kafka_client_config(
hono_args
.get_one::<String>(PARAM_KAFKA_PROPERTIES_FILE)
.unwrap(),
)
.unwrap_or_else(|e| {
error!("failed to create Kafka client: {e}");
process::exit(1);
});
)?;

// Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
let consumer: StreamConsumer = client_config
.set_log_level(RDKafkaLogLevel::Debug)
.create()
.unwrap_or_else(|e| {
error!("failed to create Kafka client: {e}");
process::exit(1);
});
.create()?;

let topic_name = hono_args.get_one::<String>(PARAM_KAFKA_TOPIC_NAME).unwrap();

match consumer.fetch_metadata(Some(topic_name), Duration::from_secs(10)) {
Err(e) => {
error!("could not retrieve meta data for topic [{topic_name}] from broker: {e}");
process::exit(1);
}
Ok(metadata) => match metadata
.topics()
.iter()
.find(|topic| topic.name() == topic_name)
{
Some(topic) => {
if topic.partitions().is_empty() {
error!("topic [{topic_name}] does not exist (yet)");
process::exit(1);
}
}
None => {
error!("broker did not return meta data for topic [{topic_name}]");
process::exit(1);
consumer.subscribe(&[topic_name.as_str()])?;
info!("successfully subscribed to topic {topic_name}");
info!("starting message consumer");
consumer
.stream()
.try_for_each(|borrowed_message| {
let cloned_writer = influx_writer.clone();
async move {
process_hono_message(&borrowed_message, cloned_writer).await;
Ok(())
}
},
}

match consumer.subscribe(&[topic_name.as_str()]) {
Err(e) => {
error!("failed to subscribe to topic: {e}");
process::exit(1);
}
Ok(_) => {
info!("successfully subscribed to topic {topic_name}");
info!("starting message consumer");
consumer
.stream()
.try_for_each(|borrowed_message| {
let cloned_writer = influx_writer.clone();
async move {
process_hono_message(&borrowed_message, cloned_writer).await;
Ok(())
}
})
.await
.unwrap_or_else(|e| {
error!("could not start consumer for topic [{topic_name}]: {e}");
process::exit(1);
});
}
}
})
.await?;
Ok(())
}

async fn run_async_processor_zenoh(args: &ArgMatches) {
let influx_writer = InfluxWriter::new(args).map_or_else(
|e| {
error!("failed to create InfluxDB writer: {e}");
process::exit(1);
},
Arc::new,
);
async fn run_async_processor_zenoh(args: &ArgMatches) -> Result<(), Box<dyn std::error::Error>> {
let influx_writer = InfluxWriter::new(args).map(Arc::new)?;
let zenoh_args = args.subcommand_matches(SUBCOMMAND_ZENOH).unwrap();
let config = parse_zenoh_args(zenoh_args);
let config = parse_zenoh_args(zenoh_args)
.map_err(|e| Box::<dyn std::error::Error>::from(e.to_string()))?;

info!("Opening session...");
let session = zenoh::open(config).res().await.unwrap_or_else(|e| {
error!("failed to open Zenoh session: {e}");
process::exit(1);
});
let session = zenoh::open(config)
.await
.map_err(|e| Box::<dyn std::error::Error>::from(e.to_string()))?;

info!("Declaring Subscriber on '{}'...", &KEY_EXPR);
let subscriber = session
.declare_subscriber(KEY_EXPR)
.res()
.await
.unwrap_or_else(|e| {
error!("failed to create Zenoh subscriber: {e}");
process::exit(1);
});
loop {
select!(
sample = subscriber.recv_async() => {
let sample = sample.unwrap();
let cloned_writer = influx_writer.clone();
process_zenoh_message(&sample.value.payload.contiguous(), cloned_writer).await;
}
);
.map_err(|e| Box::<dyn std::error::Error>::from(e.to_string()))?;
while let Ok(sample) = subscriber.recv_async().await {
let cloned_writer = influx_writer.clone();
let payload = sample.payload().to_bytes();
process_zenoh_message(&payload, cloned_writer).await;
}
Ok(())
}

#[tokio::main]
pub async fn main() {
env_logger::init();
Expand Down Expand Up @@ -381,52 +290,12 @@ pub async fn main() {
Command::new(SUBCOMMAND_ZENOH)
.about("Forwards VSS data to an Influx DB server from Eclipse Zenoh")
.arg(
Arg::new("mode")
.value_parser(clap::value_parser!(WhatAmI))
.long("mode")
.short('m')
.help("The Zenoh session mode (peer by default).")
.required(false),
)
.arg(
Arg::new("connect")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("connect")
.short('e')
.help("Endpoints to connect to.")
.required(false),
)
.arg(
Arg::new("listen")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("listen")
.short('l')
.help("Endpoints to listen on.")
.required(false),
)
.arg(
Arg::new("no-multicast-scouting")
.long("no-multicast-scouting")
.help("Disable the multicast-based scouting mechanism.")
.action(clap::ArgAction::SetFalse)
.required(false),
)
.arg(
Arg::new("config")
.value_parser(clap::builder::NonEmptyStringValueParser::new())
.long("config")
.short('c')
.help("A configuration file.")
.required(false),
)
.arg(
Arg::new("session-timeout")
.value_parser(|s: &str| duration_str::parse(s))
.long("session-timeout")
.help("The time to wait for establishment of a Zenoh session, e.g. 10s.")
.value_name("DURATION_SPEC")
.required(false)
.default_value("20s")
),
);

Expand All @@ -435,11 +304,17 @@ pub async fn main() {
match args.subcommand_name() {
Some(SUBCOMMAND_HONO) => {
info!("starting FMS data consumer for Hono");
run_async_processor_hono(&args).await
if let Err(e) = run_async_processor_hono(&args).await {
error!("failed to start Hono processor: {e}");
process::exit(1);
}
}
Some(SUBCOMMAND_ZENOH) => {
info!("starting FMS data consumer for Zenoh");
run_async_processor_zenoh(&args).await
if let Err(e) = run_async_processor_zenoh(&args).await {
error!("failed to start Zenoh processor: {e}");
process::exit(1);
}
}
Some(_) => {
// cannot happen because subcommand is required
Expand Down
2 changes: 1 addition & 1 deletion components/fms-forwarder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ log = { workspace = true }
paho-mqtt = { version = "0.12", default-features = false, features = [
"vendored-ssl",
] }
zenoh = { workspace = true, features = ["transport_tcp"] }
protobuf = { workspace = true }
prost = { version = "0.13" }
prost-types = { version = "0.13" }
Expand All @@ -60,6 +59,7 @@ tonic = { version = "0.12.3", default-features = false, features = [
"tls",
"prost",
] }
zenoh = { workspace = true, features = ["transport_tcp"] }

[build-dependencies]
protoc-bin-vendored = { workspace = true }
Expand Down
Loading

0 comments on commit 59b4b4a

Please sign in to comment.