Skip to content

Commit

Permalink
Core: Open telemetry client config (#3023)
Browse files Browse the repository at this point in the history
* add OpenTelemetry client configs

Signed-off-by: Adar Ovadia <[email protected]>

---------

Signed-off-by: Adar Ovadia <[email protected]>
Co-authored-by: Adar Ovadia <[email protected]>
  • Loading branch information
adarovadya and Adar Ovadia authored Feb 4, 2025
1 parent 7b5c4c9 commit cd94f51
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 7 deletions.
14 changes: 14 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rand::Rng;
#[cfg(feature = "cluster-async")]
use std::ops::Add;
use std::time::Duration;
use telemetrylib::GlideOpenTelemetryConfig;

#[cfg(feature = "tls-rustls")]
use crate::tls::TlsConnParams;
Expand Down Expand Up @@ -49,6 +50,7 @@ struct BuilderParams {
response_timeout: Option<Duration>,
protocol: ProtocolVersion,
pubsub_subscriptions: Option<PubSubSubscriptionInfo>,
open_telemetry_config: Option<GlideOpenTelemetryConfig>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -402,6 +404,18 @@ impl ClusterClientBuilder {
self
}

/// Attaches an OpenTelemetry configuration to the client being built.
///
/// Calling this again replaces the previously stored configuration.
///
/// # Parameters
/// - `open_telemetry_config`: The OpenTelemetry settings to store on the builder; use it to
///   specify the endpoint of the collector the measurements are exported to.
pub fn open_telemetry_config(
    mut self,
    open_telemetry_config: GlideOpenTelemetryConfig,
) -> ClusterClientBuilder {
    let params = &mut self.builder_params;
    params.open_telemetry_config = Some(open_telemetry_config);
    self
}

/// Enables periodic topology checks for this client.
///
/// If enabled, periodic topology checks will be executed at the configured intervals to examine whether there
Expand Down
76 changes: 76 additions & 0 deletions glide-core/redis-rs/redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ mod cluster_async {
use futures_time::{future::FutureExt, task::sleep};
use once_cell::sync::Lazy;
use std::ops::Add;
use std::str::FromStr;
use telemetrylib::*;

use redis::{
aio::{ConnectionLike, MultiplexedConnection},
Expand Down Expand Up @@ -206,6 +208,80 @@ mod cluster_async {
.unwrap();
}

/// Initialising OpenTelemetry with an `http://` trace exporter is expected to panic with a
/// "not yet implemented" message.
#[tokio::test]
async fn test_async_open_telemetry_config() {
    let exporter = GlideOpenTelemetryTraceExporter::from_str("http://valid-url.com").unwrap();
    let glide_ot_config = GlideOpenTelemetryConfigBuilder::default()
        .with_flush_interval(std::time::Duration::from_millis(400))
        .with_trace_exporter(exporter)
        .build();

    // Run the initialisation behind `catch_unwind` so the panic can be inspected.
    let outcome = std::panic::catch_unwind(|| {
        GlideOpenTelemetry::initialise(glide_ot_config.clone());
    });
    assert!(outcome.is_err(), "Expected a panic but no panic occurred");

    // Check the panic message
    if let Err(payload) = outcome {
        // A panic payload is either an owned `String` or a `&'static str`.
        let panic_msg = match payload.downcast_ref::<String>() {
            Some(owned) => owned.as_str(),
            None => payload
                .downcast_ref::<&str>()
                .copied()
                .unwrap_or("Unknown panic message"),
        };

        let expected = "not yet implemented: HTTP protocol is not implemented yet!";
        assert!(
            panic_msg.contains(expected),
            "Unexpected panic message: {}",
            panic_msg
        );
    }
}

/// An endpoint without a scheme must be rejected with `ErrorKind::InvalidInput`.
#[tokio::test]
async fn test_async_open_telemetry_invalid_config() {
    let parsed = GlideOpenTelemetryTraceExporter::from_str("invalid-protocol.com");
    assert!(parsed.is_err(), "Expected `from_str` to return an error");

    let error_kind = parsed.unwrap_err().kind();
    assert_eq!(
        error_kind,
        std::io::ErrorKind::InvalidInput,
        "Expected ErrorKind::InvalidInput"
    );
}

/// Checks the explicit and default span flush intervals, then builds a cluster client that
/// carries the OpenTelemetry configuration and opens a connection with it.
#[tokio::test]
async fn test_async_open_telemetry_interval_config() {
    let exporter = GlideOpenTelemetryTraceExporter::from_str("http://valid-url.com").unwrap();

    // An explicitly configured interval is reported back unchanged.
    let explicit_config = GlideOpenTelemetryConfigBuilder::default()
        .with_flush_interval(std::time::Duration::from_millis(400))
        .with_trace_exporter(exporter.clone())
        .build();
    let explicit_interval = GlideOpenTelemetry::get_span_interval(explicit_config);
    assert_eq!(explicit_interval, 400);

    // check the default interval
    let glide_ot_config = GlideOpenTelemetryConfigBuilder::default()
        .with_trace_exporter(exporter)
        .build();
    let default_interval = GlideOpenTelemetry::get_span_interval(glide_ot_config.clone());
    assert_eq!(default_interval, 5000);

    let cluster = TestClusterContext::new(3, 0);

    let mut cluster_addresses = Vec::new();
    for server in cluster.cluster.servers.iter() {
        cluster_addresses.push(server.connection_info());
    }

    // The builder must accept the configuration and still produce a connectable client.
    ClusterClient::builder(cluster_addresses)
        .open_telemetry_config(glide_ot_config)
        .build()
        .unwrap()
        .get_async_connection(None)
        .await
        .unwrap();
}

#[tokio::test]
async fn test_routing_by_slot_to_replica_with_az_affinity_strategy_to_half_replicas() {
// Skip test if version is less than Valkey 8.0
Expand Down
21 changes: 21 additions & 0 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use redis::{
};
pub use standalone_client::StandaloneClient;
use std::io;
use std::str::FromStr;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -28,6 +29,7 @@ mod reconnecting_connection;
mod standalone_client;
mod value_conversion;
use redis::InfoDict;
use telemetrylib::*;
use tokio::sync::mpsc;
use versions::Versioning;

Expand Down Expand Up @@ -661,13 +663,15 @@ pub enum ConnectionError {
Standalone(standalone_client::StandaloneClientConnectionError),
Cluster(redis::RedisError),
Timeout,
IoError(std::io::Error),
}

impl std::fmt::Debug for ConnectionError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Standalone(arg0) => f.debug_tuple("Standalone").field(arg0).finish(),
Self::Cluster(arg0) => f.debug_tuple("Cluster").field(arg0).finish(),
Self::IoError(arg0) => f.debug_tuple("IoError").field(arg0).finish(),
Self::Timeout => write!(f, "Timeout"),
}
}
Expand All @@ -678,6 +682,7 @@ impl std::fmt::Display for ConnectionError {
match self {
ConnectionError::Standalone(err) => write!(f, "{err:?}"),
ConnectionError::Cluster(err) => write!(f, "{err}"),
ConnectionError::IoError(err) => write!(f, "{err}"),
ConnectionError::Timeout => f.write_str("connection attempt timed out"),
}
}
Expand Down Expand Up @@ -800,6 +805,22 @@ impl Client {
let inflight_requests_allowed = Arc::new(AtomicIsize::new(
inflight_requests_limit.try_into().unwrap(),
));

if let Some(endpoint_str) = &request.otel_endpoint {
let trace_exporter = GlideOpenTelemetryTraceExporter::from_str(endpoint_str.as_str())
.map_err(ConnectionError::IoError)?;
let config = GlideOpenTelemetryConfigBuilder::default()
.with_flush_interval(std::time::Duration::from_millis(
request
.otel_span_flush_interval_ms
.unwrap_or(DEFAULT_FLUSH_SPAN_INTERVAL_MS),
))
.with_trace_exporter(trace_exporter)
.build();

GlideOpenTelemetry::initialise(config);
};

tokio::time::timeout(DEFAULT_CLIENT_CREATION_TIMEOUT, async move {
let internal_client = if request.cluster_mode_enabled {
let client = create_cluster_client(request, push_sender)
Expand Down
7 changes: 7 additions & 0 deletions glide-core/src/client/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct ConnectionRequest {
pub periodic_checks: Option<PeriodicCheck>,
pub pubsub_subscriptions: Option<redis::PubSubSubscriptionInfo>,
pub inflight_requests_limit: Option<u32>,
pub otel_endpoint: Option<String>,
pub otel_span_flush_interval_ms: Option<u64>,
}

pub struct AuthenticationInfo {
Expand Down Expand Up @@ -206,6 +208,9 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {

let inflight_requests_limit = none_if_zero(value.inflight_requests_limit);

let otel_endpoint = chars_to_string_option(&value.opentelemetry_config.collector_end_point);
let otel_span_flush_interval_ms = value.opentelemetry_config.span_flush_interval;

ConnectionRequest {
read_from,
client_name,
Expand All @@ -221,6 +226,8 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
periodic_checks,
pubsub_subscriptions,
inflight_requests_limit,
otel_endpoint,
otel_span_flush_interval_ms,
}
}
}
7 changes: 7 additions & 0 deletions glide-core/src/protobuf/connection_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ message PubSubSubscriptions
map<uint32, PubSubChannelsOrPatterns> channels_or_patterns_by_type = 1;
}

// OpenTelemetry settings carried by `ConnectionRequest.opentelemetry_config`.
message OpenTelemetryConfig
{
// Collector endpoint; the core parses it as a URL into a trace exporter
// (schemes handled there: http, https, grpc).
// NOTE(review): an empty string presumably means "telemetry disabled" - confirm
// against `chars_to_string_option`.
string collector_end_point = 1;
// Interval between span flushes, in milliseconds. When unset the core falls back to
// DEFAULT_FLUSH_SPAN_INTERVAL_MS (5000).
optional uint64 span_flush_interval= 2;
}

// IMPORTANT - if you add fields here, you probably need to add them also in client/mod.rs:`sanitized_request_string`.
message ConnectionRequest {
repeated NodeAddress addresses = 1;
Expand All @@ -72,6 +78,7 @@ message ConnectionRequest {
uint32 inflight_requests_limit = 14;
string client_az = 15;
uint32 connection_timeout = 16;
OpenTelemetryConfig opentelemetry_config = 17;
}

message ConnectionRetryStrategy {
Expand Down
1 change: 1 addition & 0 deletions glide-core/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ serde_json = "1"
chrono = "0"
futures-util = "0"
tokio = { version = "1", features = ["macros", "time"] }
url = "2"

opentelemetry = "0"
opentelemetry_sdk = { version = "0", features = ["rt-tokio"] }
2 changes: 1 addition & 1 deletion glide-core/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::RwLock as StdRwLock;
mod open_telemetry;
mod open_telemetry_exporter_file;

pub use open_telemetry::{GlideOpenTelemetry, GlideSpan};
pub use open_telemetry::*;
pub use open_telemetry_exporter_file::SpanExporterFile;

#[derive(Default, Serialize)]
Expand Down
53 changes: 48 additions & 5 deletions glide-core/telemetry/src/open_telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ use opentelemetry::trace::TraceContextExt;
use opentelemetry::{global, trace::Tracer};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::TracerProvider;
use std::io::{Error, ErrorKind};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use url::Url;

const SPAN_WRITE_LOCK_ERR: &str = "Failed to get span write lock";
const SPAN_READ_LOCK_ERR: &str = "Failed to get span read lock";
const TRACE_SCOPE: &str = "valkey_glide";

/// Default interval in milliseconds for flushing open telemetry data to the collector.
pub const DEFAULT_FLUSH_SPAN_INTERVAL_MS: u64 = 5000;

pub enum GlideSpanStatus {
Ok,
Error(String),
Expand All @@ -31,6 +36,38 @@ pub enum GlideOpenTelemetryTraceExporter {
File(PathBuf),
}

/// Allows building a trace exporter from a collector endpoint string.
impl std::str::FromStr for GlideOpenTelemetryTraceExporter {
    type Err = Error;

    /// Delegates to `parse_endpoint`; any error it reports is returned unchanged.
    fn from_str(endpoint: &str) -> Result<Self, Error> {
        parse_endpoint(endpoint)
    }
}

/// Converts a collector endpoint URL into the matching trace exporter variant.
///
/// The endpoint is validated with `Url::parse`. Its scheme selects the variant, and the
/// variant's payload is a `host:port` string (path and query are not carried over):
/// - `http`  -> `Http`, port defaults to 80
/// - `https` -> `Http`, port defaults to 443
/// - `grpc`  -> `Grpc`, port defaults to 80
///
/// When the URL has no host, `127.0.0.1` is used.
///
/// # Errors
/// Returns an `ErrorKind::InvalidInput` error when `endpoint` is not a valid URL or when its
/// scheme is not one of the schemes listed above. The message names the cause.
fn parse_endpoint(endpoint: &str) -> Result<GlideOpenTelemetryTraceExporter, Error> {
    // Parse the URL using the `url` crate to validate it; keep the parser's own
    // explanation in the message so callers can see why the endpoint was rejected.
    let url = Url::parse(endpoint).map_err(|err| {
        Error::new(
            ErrorKind::InvalidInput,
            format!("Parse error. {endpoint}: {err}"),
        )
    })?;

    // Shared `host:port` rendering; only the fallback port differs per scheme.
    let authority = |default_port: u16| {
        format!(
            "{}:{}",
            url.host_str().unwrap_or("127.0.0.1"),
            url.port().unwrap_or(default_port)
        )
    };

    match url.scheme() {
        "http" => Ok(GlideOpenTelemetryTraceExporter::Http(authority(80))), // HTTP endpoint
        "https" => Ok(GlideOpenTelemetryTraceExporter::Http(authority(443))), // HTTPS endpoint
        // NOTE(review): OTLP collectors conventionally listen on 4317 for gRPC; the
        // fallback of 80 is kept for backward compatibility - confirm it is intended.
        "grpc" => Ok(GlideOpenTelemetryTraceExporter::Grpc(authority(80))), // gRPC endpoint
        other => Err(Error::new(
            ErrorKind::InvalidInput,
            format!("Unsupported scheme `{other}` in endpoint: {endpoint}"),
        )),
    }
}

#[derive(Clone, Debug)]
struct GlideSpanInner {
span: Arc<RwLock<opentelemetry::global::BoxedSpan>>,
Expand Down Expand Up @@ -188,6 +225,7 @@ impl GlideSpan {
/// .build();
/// GlideOpenTelemetry::initialise(config);
/// ```
#[derive(Clone, Debug)]
pub struct GlideOpenTelemetryConfig {
/// Default delay interval between two consecutive exports.
span_flush_interval: std::time::Duration,
Expand All @@ -205,13 +243,12 @@ pub struct GlideOpenTelemetryConfigBuilder {
impl Default for GlideOpenTelemetryConfigBuilder {
    /// Creates a builder with the default flush interval
    /// (`DEFAULT_FLUSH_SPAN_INTERVAL_MS` milliseconds) and a file trace exporter pointing at
    /// the system temporary directory.
    fn default() -> Self {
        GlideOpenTelemetryConfigBuilder {
            // Single initializer: the interval comes from the shared constant so the
            // builder default and other users of the constant cannot drift apart.
            span_flush_interval: std::time::Duration::from_millis(DEFAULT_FLUSH_SPAN_INTERVAL_MS),
            trace_exporter: GlideOpenTelemetryTraceExporter::File(std::env::temp_dir()),
        }
    }
}

#[allow(dead_code)]
impl GlideOpenTelemetryConfigBuilder {
pub fn with_flush_interval(mut self, duration: std::time::Duration) -> Self {
self.span_flush_interval = duration;
Expand All @@ -231,6 +268,7 @@ impl GlideOpenTelemetryConfigBuilder {
}
}

/// Empty type whose associated functions (for example `initialise`, `get_span_interval`
/// and `new_span`) form the crate's OpenTelemetry interface.
#[derive(Clone)]
pub struct GlideOpenTelemetry {}

/// Our interface to OpenTelemetry
Expand All @@ -239,12 +277,13 @@ impl GlideOpenTelemetry {
///
/// This method should be called once for the given **process**
pub fn initialise(config: GlideOpenTelemetryConfig) {
let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
.with_scheduled_delay(config.span_flush_interval)
.build();

let trace_exporter = match config.trace_exporter {
GlideOpenTelemetryTraceExporter::File(p) => {
let exporter = crate::SpanExporterFile::new(p);
let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
.with_scheduled_delay(config.span_flush_interval)
.build();
opentelemetry_sdk::trace::BatchSpanProcessor::builder(
exporter,
opentelemetry_sdk::runtime::Tokio,
Expand All @@ -267,6 +306,10 @@ impl GlideOpenTelemetry {
global::set_tracer_provider(provider);
}

/// Returns the span flush interval of `config` in whole milliseconds.
///
/// `Duration::as_millis` yields a `u128`; values that do not fit in a `u64` are reported as
/// `u64::MAX` instead of being silently truncated.
pub fn get_span_interval(config: GlideOpenTelemetryConfig) -> u64 {
    let millis = config.span_flush_interval.as_millis();
    // After clamping to `u64::MAX` the cast below is lossless.
    millis.min(u128::from(u64::MAX)) as u64
}

/// Create new span
pub fn new_span(name: &str) -> GlideSpan {
GlideSpan::new(name)
Expand Down
Loading

0 comments on commit cd94f51

Please sign in to comment.