Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial work on OpenTelemetry #2892

Merged
merged 8 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ impl MultiplexedConnection {
.pipeline
.send_single(cmd.get_packed_command(), self.response_timeout)
.await;

eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
if e.is_connection_dropped() {
Expand Down
21 changes: 21 additions & 0 deletions glide-core/redis-rs/redis/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{fmt, io};
use crate::connection::ConnectionLike;
use crate::pipeline::Pipeline;
use crate::types::{from_owned_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
use telemetrylib::GlideSpan;

/// An argument to a redis command
#[derive(Clone)]
Expand All @@ -30,6 +31,8 @@ pub struct Cmd {
cursor: Option<u64>,
// If it's true command's response won't be read from socket. Useful for Pub/Sub.
no_response: bool,
/// The span associated with this command
span: Option<GlideSpan>,
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
}

/// Represents a redis iterator.
Expand Down Expand Up @@ -321,6 +324,7 @@ impl Cmd {
args: vec![],
cursor: None,
no_response: false,
span: None,
}
}

Expand All @@ -331,6 +335,7 @@ impl Cmd {
args: Vec::with_capacity(arg_count),
cursor: None,
no_response: false,
span: None,
}
}

Expand Down Expand Up @@ -360,6 +365,16 @@ impl Cmd {
self
}

/// Associate a trackable span to the command. This allow tracking the lifetime
/// of the command.
///
/// A span is used by an OpenTelemetry backend to track the lifetime of the command
#[inline]
pub fn with_span(&mut self, name: &str) -> &mut Cmd {
self.span = Some(telemetrylib::GlideOpenTelemetry::new_span(name));
self
}

/// Works similar to `arg` but adds a cursor argument. This is always
/// an integer and also flips the command implementation to support a
/// different mode for the iterators where the iterator will ask for
Expand Down Expand Up @@ -582,6 +597,12 @@ impl Cmd {
pub fn is_no_response(&self) -> bool {
self.no_response
}

/// Return this command span
#[inline]
pub fn span(&self) -> Option<GlideSpan> {
self.span.clone()
}
}

impl fmt::Debug for Cmd {
Expand Down
11 changes: 9 additions & 2 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,17 @@ async fn send_command(
mut client: Client,
routing: Option<RoutingInfo>,
) -> ClientUsageResult<Value> {
client
if let Some(span) = cmd.span() {
span.add_event("RequestSent");
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
}
let res = client
.send_command(&cmd, routing)
.await
.map_err(|err| err.into())
.map_err(|err| err.into());
if let Some(span) = cmd.span() {
span.add_event("ResponseArrived");
}
res
}

// Parse the cluster scan command parameters from protobuf and send the command to redis-rs.
Expand Down
6 changes: 6 additions & 0 deletions glide-core/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ authors = ["Valkey GLIDE Maintainers"]
lazy_static = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0"
futures-util = "0"
tokio = { version = "1", features = ["macros", "time"] }

opentelemetry = "0"
opentelemetry_sdk = { version = "0", features = ["rt-tokio"] }
5 changes: 5 additions & 0 deletions glide-core/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use lazy_static::lazy_static;
use serde::Serialize;
use std::sync::RwLock as StdRwLock;
mod open_telemetry;
mod open_telemetry_exporter_file;

pub use open_telemetry::{GlideOpenTelemetry, GlideSpan};
pub use open_telemetry_exporter_file::SpanExporterFile;

#[derive(Default, Serialize)]
#[allow(dead_code)]
Expand Down
240 changes: 240 additions & 0 deletions glide-core/telemetry/src/open_telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
use opentelemetry::global::ObjectSafeSpan;
use opentelemetry::trace::SpanKind;
use opentelemetry::{global, trace::Tracer};
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::TracerProvider;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

const SPAN_WRITE_LOCK_ERR: &str = "Failed to get span write lock";
const TRACE_SCOPE: &str = "valkey_glide";
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved

pub enum GlideSpanStatus {
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
Ok,
Error(String),
}

#[allow(dead_code)]
#[derive(Clone, Debug)]
/// Defines the tracing exporter between GLIDE -> collector
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
pub enum GlideOpenTelemetryTraceExporter {
/// Collector is listening on grpc
Grpc(String),
/// Collector is listening on http
Http(String),
/// No collector. Instead, write the traces collected to a file. The contained value "PathBuf"
/// points to the folder where the collected data should be placed.
File(PathBuf),
}

#[derive(Clone, Debug)]
struct GlideSpanInner {
span: Arc<RwLock<opentelemetry::global::BoxedSpan>>,
}

impl GlideSpanInner {
pub fn new(name: &str) -> Self {
let tracer = global::tracer(TRACE_SCOPE);
let span = Arc::new(RwLock::new(
tracer
.span_builder(name.to_string())
.with_kind(SpanKind::Client)
.start(&tracer),
));
GlideSpanInner { span }
}

/// Attach event with name and list of attributes to this span.
pub fn add_event(&self, name: &str, attributes: Option<&Vec<(&str, &str)>>) {
let attributes: Vec<opentelemetry::KeyValue> = if let Some(attributes) = attributes {
attributes
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string()))
.collect()
} else {
Vec::<opentelemetry::KeyValue>::default()
};
self.span
.write()
.expect(SPAN_WRITE_LOCK_ERR)
.add_event_with_timestamp(
name.to_string().into(),
std::time::SystemTime::now(),
attributes,
);
}

pub fn set_status(&self, status: GlideSpanStatus) {
match status {
GlideSpanStatus::Ok => self
.span
.write()
.expect(SPAN_WRITE_LOCK_ERR)
.set_status(opentelemetry::trace::Status::Ok),
GlideSpanStatus::Error(what) => {
self.span.write().expect(SPAN_WRITE_LOCK_ERR).set_status(
opentelemetry::trace::Status::Error {
description: what.into(),
},
)
}
}
}
}

#[derive(Clone, Debug)]
pub struct GlideSpan {
inner: GlideSpanInner,
}

impl GlideSpan {
pub fn new(name: &str) -> Self {
GlideSpan {
inner: GlideSpanInner::new(name),
}
}

/// Attach event with name to this span.
pub fn add_event(&self, name: &str) {
self.inner.add_event(name, None)
}

/// Attach event with name and attributes to this span.
pub fn add_event_with_attributes(&self, name: &str, attributes: &Vec<(&str, &str)>) {
self.inner.add_event(name, Some(attributes))
}

pub fn set_status(&self, status: GlideSpanStatus) {
self.inner.set_status(status)
}
}

/// OpenTelemetry configuration object. Use `GlideOpenTelemetryConfigBuilder` to construct it:
///
/// ```rust,no_compile
/// let config = GlideOpenTelemetryConfigBuilder::default()
/// .with_flush_interval(std::time::Duration::from_millis(100))
/// .build();
/// GlideOpenTelemetry::initialise(config);
/// ```
pub struct GlideOpenTelemetryConfig {
/// Default delay interval between two consecutive exports.
span_flush_interval: std::time::Duration,
/// Determines the protocol between the collector and GLIDE
trace_exporter: GlideOpenTelemetryTraceExporter,
}

#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct GlideOpenTelemetryConfigBuilder {
span_flush_interval: std::time::Duration,
trace_exporter: GlideOpenTelemetryTraceExporter,
}

impl Default for GlideOpenTelemetryConfigBuilder {
fn default() -> Self {
GlideOpenTelemetryConfigBuilder {
span_flush_interval: std::time::Duration::from_millis(5_000),
trace_exporter: GlideOpenTelemetryTraceExporter::File(std::env::temp_dir()),
}
}
}

#[allow(dead_code)]
impl GlideOpenTelemetryConfigBuilder {
pub fn with_flush_interval(mut self, duration: std::time::Duration) -> Self {
self.span_flush_interval = duration;
self
}

pub fn with_trace_exporter(mut self, protocol: GlideOpenTelemetryTraceExporter) -> Self {
self.trace_exporter = protocol;
self
}

pub fn build(self) -> GlideOpenTelemetryConfig {
GlideOpenTelemetryConfig {
span_flush_interval: self.span_flush_interval,
trace_exporter: self.trace_exporter,
}
}
}

pub struct GlideOpenTelemetry {}

/// Our interface to OpenTelemetry
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
impl GlideOpenTelemetry {
/// Initialise the open telemetry library with a file system exporter
///
/// This method should be called once for the given **process**
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
pub fn initialise(config: GlideOpenTelemetryConfig) {
let trace_exporter = match config.trace_exporter {
GlideOpenTelemetryTraceExporter::File(p) => {
let exporter = crate::SpanExporterFile::new(p);
let batch_config = opentelemetry_sdk::trace::BatchConfigBuilder::default()
.with_scheduled_delay(config.span_flush_interval)
.build();
opentelemetry_sdk::trace::BatchSpanProcessor::builder(
exporter,
opentelemetry_sdk::runtime::Tokio,
)
.with_batch_config(batch_config)
.build()
}
GlideOpenTelemetryTraceExporter::Http(_url) => {
todo!("HTTP protocol is not implemented yet!")
}
GlideOpenTelemetryTraceExporter::Grpc(_url) => {
todo!("GRPC protocol is not implemented yet!")
}
};

global::set_text_map_propagator(TraceContextPropagator::new());
let provider = TracerProvider::builder()
.with_span_processor(trace_exporter)
.build();
global::set_tracer_provider(provider);
}

/// Create new span
pub fn new_span(name: &str) -> GlideSpan {
GlideSpan::new(name)
}

/// Trigger a shutdown procedure flushing all remaining traces
pub fn shutdown() {
global::shutdown_tracer_provider();
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_span_json_exporter() {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
let config = GlideOpenTelemetryConfigBuilder::default()
.with_flush_interval(std::time::Duration::from_millis(100))
.with_trace_exporter(GlideOpenTelemetryTraceExporter::File(PathBuf::from("/tmp")))
.build();
GlideOpenTelemetry::initialise(config);
let span = GlideOpenTelemetry::new_span("span_with_1_event");
span.add_event("Event1");
span.set_status(GlideSpanStatus::Ok);

drop(span); // writes the span
let span = GlideOpenTelemetry::new_span("span_with_2_events");
span.add_event("Event1");
span.add_event("Event2");
span.set_status(GlideSpanStatus::Ok);
drop(span); // writes the span

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
});
}
}
Loading
Loading