From abf31c09912aecbb85ce94247e33db926ab912b8 Mon Sep 17 00:00:00 2001 From: Kavya G Date: Thu, 28 Mar 2024 20:50:36 +0530 Subject: [PATCH] Add distributed open telemetry tracing (#6) We would like to add distributed open-telemetry tracing to the fmaas-router. This requires 3 changes on the fmaas-router: 1. Enabling open-telemetry tracing in the fmaas-router (this is done by the `init_logging` function in the newly created `tracing.rs` file. 2. Propagating trace context information in the GRPC metadata to the TGIS router. This is done with helper functions `inject_context_span` etc. defined in the same file and used in the `server.rs` file for the `/generate`, `/generateStream` and `/tokenize` endpoints. Span tags have been added following the [Semantic Convention for RPC calls](https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/) 3. Receives trace context information from wx-inference-proxy. This is done with helper function `extract_context_spa Co-authored-by: Kavya --- Cargo.lock | 220 +++++++++++++++++++++++++++++ fmaas-router/Cargo.toml | 4 + fmaas-router/src/lib.rs | 3 +- fmaas-router/src/main.rs | 27 +--- fmaas-router/src/rpc/generation.rs | 34 +++-- fmaas-router/src/tracing_utils.rs | 129 +++++++++++++++++ 6 files changed, 388 insertions(+), 29 deletions(-) create mode 100644 fmaas-router/src/tracing_utils.rs diff --git a/Cargo.lock b/Cargo.lock index d65bb77..f8b6349 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,6 +201,12 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +[[package]] +name = "bumpalo" +version = "3.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" + [[package]] name = "bytes" version = "1.5.0" @@ -265,6 +271,21 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "crossbeam-channel" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "data-encoding" version = "2.5.0" @@ -327,6 +348,9 @@ dependencies = [ "futures", "ginepro", "mio", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "prost", "prost-types", "rustls-webpki", @@ -336,6 +360,7 @@ dependencies = [ "tonic", "tonic-build", "tracing", + "tracing-opentelemetry", "tracing-subscriber", ] @@ -477,6 +502,12 @@ dependencies = [ "trust-dns-resolver", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.25" @@ -689,6 +720,15 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "js-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -808,6 +848,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-traits" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -833,6 +882,89 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "opentelemetry" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a016b8d9495c639af2145ac22387dcb88e44118e45320d9238fbf4e7889abcb" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" + +[[package]] +name = "opentelemetry_sdk" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "ordered-float", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "ordered-float" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" +dependencies = [ + "num-traits", +] + [[package]] name = "overload" version = "0.1.1" @@ -1586,6 +1718,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9be14ba1bbe4ab79e9229f7f89fab8d120b865859f10527f31c033e599d2284" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -1713,6 +1863,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8parse" version = "0.2.1" @@ -1740,6 +1896,70 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "which" version = "4.4.2" diff --git a/fmaas-router/Cargo.toml b/fmaas-router/Cargo.toml index 20a3a00..1aca6a4 100644 --- a/fmaas-router/Cargo.toml +++ b/fmaas-router/Cargo.toml @@ -27,6 +27,10 @@ prost = "^0.12.3" prost-types = "^0.12.3" serde_yaml = "^0.9.33" serde = { version = "^1.0.197", features = ["derive"] } +opentelemetry = { version = "0.22", features = ["trace"] } +opentelemetry_sdk = {version = "0.22", features = ["rt-tokio"]} +opentelemetry-otlp = "0.15.0" +tracing-opentelemetry = "0.23.0" mio = "^0.8.11" # Override to address CVE-2024-27308 rustls-webpki = "^0.102.2" # Override to address WS-2023-0305, CVE-2018-16875 diff --git a/fmaas-router/src/lib.rs b/fmaas-router/src/lib.rs index e109058..db97981 100644 --- a/fmaas-router/src/lib.rs +++ b/fmaas-router/src/lib.rs @@ -11,6 +11,7 @@ use tracing::info; mod pb; pub mod rpc; pub mod server; +pub mod tracing_utils; #[derive(Debug, Clone, Deserialize)] pub struct ServiceAddr { @@ -118,4 +119,4 @@ async fn create_clients( .expect("Error creating upstream service clients") .into_iter() .collect() -} \ No newline at end of file +} diff --git a/fmaas-router/src/main.rs b/fmaas-router/src/main.rs index d978237..2450e1b 100644 --- a/fmaas-router/src/main.rs +++ b/fmaas-router/src/main.rs @@ -1,8 +1,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use clap::Parser; -use fmaas_router::{server, ModelMap}; -use tracing_subscriber::EnvFilter; +use fmaas_router::{server, tracing_utils::init_logging, ModelMap}; /// App Configuration #[derive(Parser, Debug)] @@ -28,30 +27,16 @@ struct Args { upstream_tls: bool, #[clap(long, env)] upstream_tls_ca_cert_path: Option, + #[clap(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")] + otlp_endpoint: Option, + #[clap(long, env = "OTEL_SERVICE_NAME", default_value = "fmaas-router")] + otlp_service_name: String, } fn main() -> Result<(), std::io::Error> { //Get args let args = Args::parse(); - // Configure log level; use info by default - let filter_layer = EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new("info")) - .unwrap(); - - if args.json_output { - tracing_subscriber::fmt() - .json() - .with_env_filter(filter_layer) - .with_current_span(false) - .init(); - } else { - tracing_subscriber::fmt() - .compact() - .with_env_filter(filter_layer) - .init(); - } - if args.tls_key_path.is_some() != args.tls_cert_path.is_some() { panic!("tls: must provide both cert and key") } @@ -71,6 +56,8 @@ fn main() -> Result<(), std::io::Error> { let grpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), args.grpc_port); let http_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), args.probe_port); + init_logging(args.otlp_service_name, args.json_output, args.otlp_endpoint); + server::run( grpc_addr, http_addr, diff --git a/fmaas-router/src/rpc/generation.rs b/fmaas-router/src/rpc/generation.rs index abd4ac0..86de443 100644 --- a/fmaas-router/src/rpc/generation.rs +++ b/fmaas-router/src/rpc/generation.rs @@ -9,7 +9,7 @@ use crate::{pb::fmaas::{ generation_service_server::GenerationService, BatchedGenerationRequest, BatchedGenerationResponse, BatchedTokenizeRequest, BatchedTokenizeResponse, GenerationResponse, ModelInfoRequest, ModelInfoResponse, SingleGenerationRequest, -}, create_clients, ServiceAddr}; +}, create_clients, ServiceAddr, tracing_utils::{ExtractTelemetryContext, InjectTelemetryContext}}; #[derive(Debug, Default)] pub struct GenerationServicer { @@ -42,7 +42,6 @@ impl GenerationServicer { #[tonic::async_trait] impl GenerationService for GenerationServicer { - #[instrument(skip_all)] async fn generate( &self, request: Request, @@ -54,12 +53,23 @@ impl GenerationService for GenerationServicer { })); } debug!("Routing generation request for Model ID {}", &br.model_id); - self.client(&br.model_id).await?.generate(request).await + let mut client = self.client(&br.model_id).await?; + let mut span = tracing::info_span!( + "fmaas.GenerationService/Generate", + rpc.system = "grpc", + rpc.method = "Generate", + rpc.service = "GenerationService", + model_id = br.model_id + ); + // Extract span info from the request metadata and set to current span + let request = request + .extract_context_span(&mut span) + .inject_context_span(&span); // Inject span info into request metadata + client.generate(request).await } type GenerateStreamStream = Streaming; - #[instrument(skip_all)] async fn generate_stream( &self, request: Request, @@ -72,10 +82,18 @@ impl GenerationService for GenerationServicer { "Routing streaming generation request for Model ID {}", &sr.model_id ); - self.client(&sr.model_id) - .await? - .generate_stream(request) - .await + let mut client = self.client(&sr.model_id).await?; + let mut span = tracing::info_span!( + "fmaas.GenerationService/GenerateStream", + rpc.system = "grpc", + rpc.method = "GenerateStream", + rpc.service = "GenerationService", + model_id = sr.model_id + ); + let request = request + .extract_context_span(&mut span) + .inject_context_span(&span); + client.generate_stream(request).await } #[instrument(skip_all)] diff --git a/fmaas-router/src/tracing_utils.rs b/fmaas-router/src/tracing_utils.rs new file mode 100644 index 0000000..7a90052 --- /dev/null +++ b/fmaas-router/src/tracing_utils.rs @@ -0,0 +1,129 @@ +//! Inspired by: https://github.com/open-telemetry/opentelemetry-rust gRPC examples +use opentelemetry::{ + global, + propagation::{Extractor, Injector}, + KeyValue, +}; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::{propagation::TraceContextPropagator, trace, trace::Sampler, Resource}; +use tonic::Request; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; + +struct MetadataExtractor<'a>(&'a tonic::metadata::MetadataMap); + +impl<'a> Extractor for MetadataExtractor<'a> { + /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + /// Collect all the keys from the MetadataMap. + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|key| match key { + tonic::metadata::KeyRef::Ascii(v) => v.as_str(), + tonic::metadata::KeyRef::Binary(v) => v.as_str(), + }) + .collect::>() + } +} + +/// Inject context in the metadata of a gRPC request. +struct MetadataInjector<'a>(pub &'a mut tonic::metadata::MetadataMap); + +impl<'a> Injector for MetadataInjector<'a> { + /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs + fn set(&mut self, key: &str, value: String) { + if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { + if let Ok(val) = value.parse() { + self.0.insert(key, val); + } + } + } +} + +/// Get context from a span and inject into GRPC requests's metadata +fn inject_span(metadata: &mut tonic::metadata::MetadataMap, span: &Span) { + let ctx = span.context(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&ctx, &mut MetadataInjector(metadata)) + }) +} + +pub trait InjectTelemetryContext { + fn inject_context_span(self, span: &Span) -> Self; +} + +impl InjectTelemetryContext for Request { + fn inject_context_span(mut self, span: &Span) -> Self { + inject_span(self.metadata_mut(), span); + self + } +} + +/// Extract context from metadata and set as passed span's context +fn extract_span(metadata: &tonic::metadata::MetadataMap, span: &mut Span) { + let parent_cx = + global::get_text_map_propagator(|prop| prop.extract(&MetadataExtractor(metadata))); + span.set_parent(parent_cx); +} + +pub trait ExtractTelemetryContext { + fn extract_context_span(self, span: &mut Span) -> Self; +} + +impl ExtractTelemetryContext for Request { + fn extract_context_span(self, span: &mut Span) -> Self { + extract_span(self.metadata(), span); + self + } +} + +pub fn init_logging(service_name: String, json_output: bool, otlp_endpoint: Option) { + let mut layers = Vec::new(); + // Configure log level; use info by default + let filter_layer = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info")) + .unwrap(); + + let fmt_layer = match json_output { + true => tracing_subscriber::fmt::layer() + .json() + .flatten_event(true) + .boxed(), + false => tracing_subscriber::fmt::layer().boxed(), + }; + layers.push(fmt_layer); + + if let Some(tracing_otlp_endpoint) = otlp_endpoint { + global::set_text_map_propagator(TraceContextPropagator::new()); + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(tracing_otlp_endpoint), + ) + .with_trace_config( + trace::config() + .with_resource(Resource::new(vec![KeyValue::new( + "service.name", + service_name, + )])) + .with_sampler(Sampler::AlwaysOn), + ) + .install_batch(opentelemetry_sdk::runtime::Tokio); + + if let Ok(tracer) = tracer { + layers.push(tracing_opentelemetry::layer().with_tracer(tracer).boxed()); + }; + } + + tracing_subscriber::registry() + .with(filter_layer) + .with(layers) + .init(); +}