Update to Zenoh 1.0.0 #57

Closed
784 changes: 216 additions & 568 deletions components/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion components/Cargo.toml
@@ -50,7 +50,7 @@ prost = { version = "0.12" }
 prost-types = { version = "0.12" }
 # tokio does not enable features by default
 tokio = { version = "1.39" }
-zenoh = { version = "0.11.0", default-features = false }
+zenoh = { version = "1.0.0", default-features = false }

 [profile.release]
 lto = true # Link time optimization (dead code removal etc...)
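
The version bump above is a breaking change: Zenoh 1.0 drops the `zenoh::prelude::r#async::*` prelude and the explicit `.res()` resolution step of 0.11, so builders are awaited directly (the main.rs changes below follow from this). A minimal sketch of opening a session against the 1.0 API, assuming a default config and a tokio runtime:

use zenoh::Config;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // 0.11: zenoh::open(config).res().await
    // 1.0:  the builder is awaited directly (it implements IntoFuture)
    let session = zenoh::open(Config::default()).await?;
    session.close().await?;
    Ok(())
}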
3 changes: 3 additions & 0 deletions components/Dockerfile.fms-consumer
@@ -32,6 +32,9 @@ RUN apt-get update && apt-get install -y ca-certificates \
 # This will speed up fetching the crate.io index in the future, see
 # https://blog.rust-lang.org/2022/06/22/sparse-registry-testing.html
 ENV CARGO_UNSTABLE_SPARSE_REGISTRY=true
+# This is supposedly required for successfully building for arm64 using buildx with QEMU
+# see https://github.com/rust-lang/cargo/issues/10583
+ENV CARGO_NET_GIT_FETCH_WITH_CLI=true
 RUN cargo install cargo-about

 RUN echo "Building for $TARGETARCH"
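
For background on the new setting: CARGO_NET_GIT_FETCH_WITH_CLI=true is the environment form of cargo's net.git-fetch-with-cli option and makes cargo shell out to the system git binary when fetching git dependencies, instead of using its built-in libgit2-based fetcher; that is the workaround discussed in the cargo issue linked above for fetches misbehaving under QEMU emulation.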
265 changes: 70 additions & 195 deletions components/fms-consumer/src/main.rs
@@ -22,7 +22,6 @@ use std::fs::File;
 use std::io::{self, BufRead, BufReader};
 use std::process;
 use std::sync::Arc;
-use std::time::Duration;

 use clap::{Arg, ArgAction, ArgMatches, Command};
 use fms_proto::fms::VehicleStatus;
@@ -36,9 +35,7 @@ use rdkafka::consumer::Consumer;
 use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Headers};
 use rdkafka::{ClientConfig, Message};

-use futures::select;
-use zenoh::config::Config;
-use zenoh::prelude::r#async::*;
+use zenoh::Config;

 const CONTENT_TYPE_PROTOBUF: &str = "application/vnd.google.protobuf";

@@ -52,41 +49,12 @@ const SUBCOMMAND_ZENOH: &str = "zenoh";

 const KEY_EXPR: &str = "fms/vehicleStatus";

-fn parse_zenoh_args(args: &ArgMatches) -> Config {
-    let mut config: Config = if let Some(conf_file) = args.get_one::<String>("config") {
-        Config::from_file(conf_file).unwrap()
+fn parse_zenoh_args(args: &ArgMatches) -> Result<Config, Box<dyn std::error::Error + Send + Sync>> {
+    if let Some(conf_file) = args.get_one::<String>("config") {
+        Config::from_file(conf_file)
     } else {
-        Config::default()
-    };
-
-    if let Some(mode) = args.get_one::<WhatAmI>("mode") {
-        config.set_mode(Some(*mode)).unwrap();
-    }
-
-    if let Some(values) = args.get_many::<String>("connect") {
-        config
-            .connect
-            .endpoints
-            .extend(values.map(|v| v.parse().unwrap()))
-    }
-    if let Some(values) = args.get_many::<String>("listen") {
-        config
-            .listen
-            .endpoints
-            .extend(values.map(|v| v.parse().unwrap()))
-    }
-    if let Some(values) = args.get_one::<bool>("no-multicast-scouting") {
-        config
-            .scouting
-            .multicast
-            .set_enabled(Some(*values))
-            .unwrap();
+        Ok(Config::default())
     }
-    if let Some(values) = args.get_one::<Duration>("session-timeout") {
-        let millis = u64::try_from(values.as_millis()).unwrap_or(u64::MAX);
-        config.scouting.set_timeout(Some(millis)).unwrap();
-    }
-    config
 }

 fn add_property_bag_to_map(property_bag: String, headers: &mut HashMap<String, String>) {
@@ -221,122 +189,63 @@ async fn process_hono_message(m: &BorrowedMessage<'_>, influx_writer: Arc<Influx
     }
 }

-async fn run_async_processor_hono(args: &ArgMatches) {
-    let influx_writer = InfluxWriter::new(args).map_or_else(
-        |e| {
-            error!("failed to create InfluxDB writer: {e}");
-            process::exit(1);
-        },
-        Arc::new,
-    );
+async fn run_async_processor_hono(args: &ArgMatches) -> Result<(), Box<dyn std::error::Error>> {
+    let influx_writer = InfluxWriter::new(args).map(Arc::new)?;

     let hono_args = args.subcommand_matches(SUBCOMMAND_HONO).unwrap();
     let mut client_config = get_kafka_client_config(
         hono_args
             .get_one::<String>(PARAM_KAFKA_PROPERTIES_FILE)
             .unwrap(),
-    )
-    .unwrap_or_else(|e| {
-        error!("failed to create Kafka client: {e}");
-        process::exit(1);
-    });
+    )?;

     // Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
     let consumer: StreamConsumer = client_config
         .set_log_level(RDKafkaLogLevel::Debug)
-        .create()
-        .unwrap_or_else(|e| {
-            error!("failed to create Kafka client: {e}");
-            process::exit(1);
-        });
+        .create()?;

     let topic_name = hono_args.get_one::<String>(PARAM_KAFKA_TOPIC_NAME).unwrap();

-    match consumer.fetch_metadata(Some(topic_name), Duration::from_secs(10)) {
-        Err(e) => {
-            error!("could not retrieve meta data for topic [{topic_name}] from broker: {e}");
-            process::exit(1);
-        }
-        Ok(metadata) => match metadata
-            .topics()
-            .iter()
-            .find(|topic| topic.name() == topic_name)
-        {
-            Some(topic) => {
-                if topic.partitions().is_empty() {
-                    error!("topic [{topic_name}] does not exist (yet)");
-                    process::exit(1);
-                }
+    consumer.subscribe(&[topic_name.as_str()])?;
+    info!("successfully subscribed to topic {topic_name}");
+    info!("starting message consumer");
+    consumer
+        .stream()
+        .try_for_each(|borrowed_message| {
+            let cloned_writer = influx_writer.clone();
+            async move {
+                process_hono_message(&borrowed_message, cloned_writer).await;
+                Ok(())
             }
-            None => {
-                error!("broker did not return meta data for topic [{topic_name}]");
-                process::exit(1);
-            }
-        },
-    }
-
-    match consumer.subscribe(&[topic_name.as_str()]) {
-        Err(e) => {
-            error!("failed to subscribe to topic: {e}");
-            process::exit(1);
-        }
-        Ok(_) => {
-            info!("successfully subscribed to topic {topic_name}");
-            info!("starting message consumer");
-            consumer
-                .stream()
-                .try_for_each(|borrowed_message| {
-                    let cloned_writer = influx_writer.clone();
-                    async move {
-                        process_hono_message(&borrowed_message, cloned_writer).await;
-                        Ok(())
-                    }
-                })
-                .await
-                .unwrap_or_else(|e| {
-                    error!("could not start consumer for topic [{topic_name}]: {e}");
-                    process::exit(1);
-                });
-        }
-    }
+        })
+        .await?;
+    Ok(())
 }

-async fn run_async_processor_zenoh(args: &ArgMatches) {
-    let influx_writer = InfluxWriter::new(args).map_or_else(
-        |e| {
-            error!("failed to create InfluxDB writer: {e}");
-            process::exit(1);
-        },
-        Arc::new,
-    );
+async fn run_async_processor_zenoh(args: &ArgMatches) -> Result<(), Box<dyn std::error::Error>> {
+    let influx_writer = InfluxWriter::new(args).map(Arc::new)?;
     let zenoh_args = args.subcommand_matches(SUBCOMMAND_ZENOH).unwrap();
-    let config = parse_zenoh_args(zenoh_args);
+    let config = parse_zenoh_args(zenoh_args)
+        .map_err(|e| Box::<dyn std::error::Error>::from(e.to_string()))?;

     info!("Opening session...");
-    let session = zenoh::open(config).res().await.unwrap_or_else(|e| {
-        error!("failed to open Zenoh session: {e}");
-        process::exit(1);
-    });
+    let session = zenoh::open(config)
+        .await
+        .map_err(|e| Box::<dyn std::error::Error>::from(e.to_string()))?;

     info!("Declaring Subscriber on '{}'...", &KEY_EXPR);
     let subscriber = session
         .declare_subscriber(KEY_EXPR)
-        .res()
         .await
-        .unwrap_or_else(|e| {
-            error!("failed to create Zenoh subscriber: {e}");
-            process::exit(1);
-        });
-    loop {
-        select!(
-            sample = subscriber.recv_async() => {
-                let sample = sample.unwrap();
-                let cloned_writer = influx_writer.clone();
-                process_zenoh_message(&sample.value.payload.contiguous(), cloned_writer).await;
-            }
-        );
+        .map_err(|e| Box::<dyn std::error::Error>::from(e.to_string()))?;
+    while let Ok(sample) = subscriber.recv_async().await {
+        let cloned_writer = influx_writer.clone();
+        let payload = sample.payload().to_bytes();
+        process_zenoh_message(&payload, cloned_writer).await;
     }
+    Ok(())
 }

 #[tokio::main]
 pub async fn main() {
     env_logger::init();
@@ -357,89 +266,55 @@ pub async fn main() {
         Command::new(SUBCOMMAND_HONO)
             .about("Forwards VSS data to an Influx DB server from Hono's north bound Kafka API")
             .arg(
                 Arg::new(PARAM_KAFKA_PROPERTIES_FILE)
                     .value_parser(clap::builder::NonEmptyStringValueParser::new())
                     .long(PARAM_KAFKA_PROPERTIES_FILE)
                     .help("The path to a file containing Kafka client properties for connecting to the Kafka broker(s).")
                     .action(ArgAction::Set)
                     .value_name("PATH")
                     .env("KAFKA_PROPERTIES_FILE")
                     .required(true),
             )
             .arg(
                 Arg::new(PARAM_KAFKA_TOPIC_NAME)
                     .value_parser(clap::builder::NonEmptyStringValueParser::new())
                     .long(PARAM_KAFKA_TOPIC_NAME)
                     .alias("topic")
                     .help("The name of the Kafka topic to consume VSS data from.")
                     .value_name("TOPIC")
                     .required(true)
                     .env("KAFKA_TOPIC_NAME"),
             ),
         )
         .subcommand(
             Command::new(SUBCOMMAND_ZENOH)
                 .about("Forwards VSS data to an Influx DB server from Eclipse Zenoh")
                 .arg(
-                    Arg::new("mode")
-                        .value_parser(clap::value_parser!(WhatAmI))
-                        .long("mode")
-                        .short('m')
-                        .help("The Zenoh session mode (peer by default).")
-                        .required(false),
-                )
-                .arg(
-                    Arg::new("connect")
-                        .value_parser(clap::builder::NonEmptyStringValueParser::new())
-                        .long("connect")
-                        .short('e')
-                        .help("Endpoints to connect to.")
-                        .required(false),
-                )
-                .arg(
-                    Arg::new("listen")
-                        .value_parser(clap::builder::NonEmptyStringValueParser::new())
-                        .long("listen")
-                        .short('l')
-                        .help("Endpoints to listen on.")
-                        .required(false),
-                )
-                .arg(
-                    Arg::new("no-multicast-scouting")
-                        .long("no-multicast-scouting")
-                        .help("Disable the multicast-based scouting mechanism.")
-                        .action(clap::ArgAction::SetFalse)
-                        .required(false),
-                )
-                .arg(
-                    Arg::new("config")
-                        .value_parser(clap::builder::NonEmptyStringValueParser::new())
-                        .long("config")
-                        .short('c')
-                        .help("A configuration file.")
-                        .required(false),
-                )
-                .arg(
-                    Arg::new("session-timeout")
-                        .value_parser(|s: &str| duration_str::parse(s))
-                        .long("session-timeout")
-                        .help("The time to wait for establishment of a Zenoh session, e.g. 10s.")
-                        .value_name("DURATION_SPEC")
-                        .required(false)
-                        .default_value("20s")
-                ),
+                    Arg::new("config")
+                        .value_parser(clap::builder::NonEmptyStringValueParser::new())
+                        .long("config")
+                        .short('c')
+                        .help("A configuration file.")
+                        .required(false),
+                ),
         );

     let args = parser.get_matches();

     match args.subcommand_name() {
         Some(SUBCOMMAND_HONO) => {
             info!("starting FMS data consumer for Hono");
-            run_async_processor_hono(&args).await
+            if let Err(e) = run_async_processor_hono(&args).await {
+                error!("failed to start Hono processor: {e}");
+                process::exit(1);
+            }
         }
         Some(SUBCOMMAND_ZENOH) => {
             info!("starting FMS data consumer for Zenoh");
-            run_async_processor_zenoh(&args).await
+            if let Err(e) = run_async_processor_zenoh(&args).await {
+                error!("failed to start Zenoh processor: {e}");
+                process::exit(1);
+            }
         }
         Some(_) => {
             // cannot happen because subcommand is required
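
Taken together, the main.rs changes replace the futures::select! receive loop and the scattered process::exit calls with Result propagation and a plain while-let loop. A condensed sketch of the migrated Zenoh receive path (key expression as in the diff above; the process_zenoh_message handler is stubbed with a print here):

async fn consume_vehicle_status(config: zenoh::Config) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let session = zenoh::open(config).await?;
    let subscriber = session.declare_subscriber("fms/vehicleStatus").await?;
    // recv_async() yields Err once the subscriber is closed, which ends the loop
    while let Ok(sample) = subscriber.recv_async().await {
        // 0.11 read the payload via sample.value.payload.contiguous();
        // 1.0 exposes it as ZBytes: sample.payload().to_bytes() returns a Cow<[u8]>
        let payload = sample.payload().to_bytes();
        println!("received {} bytes", payload.len()); // stand-in for process_zenoh_message
    }
    Ok(())
}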
2 changes: 1 addition & 1 deletion components/fms-forwarder/Cargo.toml
@@ -49,7 +49,6 @@ log = { workspace = true }
 paho-mqtt = { version = "0.12", default-features = false, features = [
     "vendored-ssl",
 ] }
-zenoh = { workspace = true, features = ["transport_tcp"] }
 protobuf = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
@@ -60,6 +59,7 @@ tonic = { version = "0.11", default-features = false, features = [
     "tls",
     "prost",
 ] }
+zenoh = { workspace = true, features = ["transport_tcp"] }

 [build-dependencies]
 protoc-bin-vendored = { workspace = true }
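
The fms-forwarder manifest change only moves the zenoh dependency declaration further down the [dependencies] table; the declaration itself, including the transport_tcp feature, is unchanged.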