diff --git a/amqp/Cargo.toml b/amqp/Cargo.toml index 53ac1ab..7b38421 100644 --- a/amqp/Cargo.toml +++ b/amqp/Cargo.toml @@ -13,13 +13,13 @@ traces = { path = "../traces" } lapin = { version = "2.1.1" } opentelemetry = { version = "0.18.0" } -uuid = { version = "1.2.2", features = ["v4"] } -async-trait = { version = "0.1.63" } +uuid = { version = "1.3.0", features = ["v4"] } +async-trait = { version = "0.1.64" } tracing = { version = "0.1.37" } serde_json = { version = "1.0.91" } serde = { version = "1.0.152", features = ["derive"] } -tokio = { version = "1.24.2", features = ["default"] } -futures-util = { version = "0.3.25"} +tokio = { version = "1.25.0", features = ["default"] } +futures-util = { version = "0.3.26"} # Used only with feature mock mockall = { version = "0.11.3", optional = true } diff --git a/env/src/configs.rs b/env/src/configs.rs index bfdcac4..012dfe6 100644 --- a/env/src/configs.rs +++ b/env/src/configs.rs @@ -13,13 +13,16 @@ pub struct Configs { pub health_readiness: HealthReadinessConfig, pub dynamic: T, + + ///Default: 15000 + pub multiple_message_timer: i32, } pub trait DynamicConfig: Default { fn load(&self); } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, Copy)] pub struct Empty; impl DynamicConfig for Empty { fn load(&self) {} diff --git a/env/src/configs_builder.rs b/env/src/configs_builder.rs index c94c959..4703db9 100644 --- a/env/src/configs_builder.rs +++ b/env/src/configs_builder.rs @@ -2,9 +2,9 @@ use crate::{ configs::{AppConfig, Configs, DynamicConfig}, def::{ AMQP_HOST_ENV_KEY, AMQP_PASSWORD_ENV_KEY, AMQP_PORT_ENV_KEY, AMQP_USER_ENV_KEY, - AMQP_VHOST_ENV_KEY, APP_NAME_ENV_KEY, APP_PORT_ENV_KEY, AWS_ACCESS_KEY_ID_ENV_KEY, - AWS_DEFAULT_REGION, AWS_REGION_ENV_KEY_ENV_KEY, AWS_SECRET_ACCESS_KEY, - DYNAMO_ENDPOINT_ENV_KEY, DYNAMO_TABLE_ENV_KEY, ENABLE_HEALTH_READINESS_ENV_KEY, + AMQP_VHOST_ENV_KEY, APP_NAME_ENV_KEY, APP_PORT_ENV_KEY, AWS_DEFAULT_REGION, + CUSTOM_AWS_ACCESS_KEY_ID_ENV_KEY, CUSTOM_AWS_SECRET_ACCESS_KEY, DYNAMO_ENDPOINT_ENV_KEY, + DYNAMO_REGION_ENV_KEY, DYNAMO_TABLE_ENV_KEY, ENABLE_HEALTH_READINESS_ENV_KEY, ENABLE_METRICS_ENV_KEY, ENABLE_TRACES_ENV_KEY, HEALTH_READINESS_PORT_ENV_KEY, HOST_NAME_ENV_KEY, LOG_LEVEL_ENV_KEY, MQTT_HOST_ENV_KEY, MQTT_PASSWORD_ENV_KEY, MQTT_PORT_ENV_KEY, MQTT_USER_ENV_KEY, OTLP_ACCESS_KEY_ENV_KEY, OTLP_EXPORT_TIMEOUT_ENV_KEY, @@ -117,7 +117,7 @@ impl ConfigBuilder { let name = env::var(APP_NAME_ENV_KEY).unwrap_or_default(); let secret_key = env::var(SECRET_KEY_ENV_KEY).unwrap_or_default(); - let host = env::var(APP_PORT_ENV_KEY).unwrap_or_default(); + let host = env::var(HOST_NAME_ENV_KEY).unwrap_or_default(); let port = env::var(APP_PORT_ENV_KEY) .unwrap_or("3000".to_owned()) .parse() @@ -181,6 +181,7 @@ impl ConfigBuilder { T: DynamicConfig, { let mut cfg = Configs::default(); + cfg.app = self.app_cfg.clone(); for (key, value) in env::vars() { match key.as_str() { @@ -271,15 +272,14 @@ impl ConfigBuilder { DYNAMO_TABLE_ENV_KEY if self.dynamo => { cfg.dynamo.table = self.get_string_from_secret(value, "table".to_owned()); } - AWS_REGION_ENV_KEY_ENV_KEY => { + DYNAMO_REGION_ENV_KEY if self.dynamo => { let region = self.get_string_from_secret(value, AWS_DEFAULT_REGION.to_owned()); cfg.dynamo.region = region.clone(); - cfg.aws.region = region; } - AWS_ACCESS_KEY_ID_ENV_KEY if self.aws => { + CUSTOM_AWS_ACCESS_KEY_ID_ENV_KEY if self.aws => { cfg.aws.access_key_id = self.get_string_from_secret(value, "key".to_owned()); } - AWS_SECRET_ACCESS_KEY if self.aws => { + CUSTOM_AWS_SECRET_ACCESS_KEY if self.aws => { cfg.aws.secret_access_key = self.get_string_from_secret(value, "secret".to_owned()); } diff --git a/env/src/def.rs b/env/src/def.rs index 4651f15..4a23611 100644 --- a/env/src/def.rs +++ b/env/src/def.rs @@ -30,9 +30,9 @@ pub const POSTGRES_PASSWORD_ENV_KEY: &str = "POSTGRES_PASSWORD"; pub const POSTGRES_DB_ENV_KEY: &str = "POSTGRES_DB"; pub const DYNAMO_ENDPOINT_ENV_KEY: &str = "DYNAMO_ENDPOINT"; pub const DYNAMO_TABLE_ENV_KEY: &str = "DYNAMO_TABLE"; -pub const AWS_ACCESS_KEY_ID_ENV_KEY: &str = "AWS_ACCESS_KEY_ID"; -pub const AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY"; -pub const AWS_REGION_ENV_KEY_ENV_KEY: &str = "AWS_REGION"; +pub const DYNAMO_REGION_ENV_KEY: &str = "DYNAMO_REGION"; +pub const CUSTOM_AWS_ACCESS_KEY_ID_ENV_KEY: &str = "CUSTOM_AWS_ACCESS_KEY_ID"; +pub const CUSTOM_AWS_SECRET_ACCESS_KEY: &str = "CUSTOM_AWS_SECRET_ACCESS_KEY"; pub const HEALTH_READINESS_PORT_ENV_KEY: &str = "HEALTH_READINESS_PORT"; pub const ENABLE_HEALTH_READINESS_ENV_KEY: &str = "ENABLE_HEALTH_READINESS"; pub const SQLITE_FILE_NAME_ENV_KEY: &str = "SQLITE_FILE_NAME"; diff --git a/env/src/lib.rs b/env/src/lib.rs index 8fb32ad..134a19c 100644 --- a/env/src/lib.rs +++ b/env/src/lib.rs @@ -8,4 +8,4 @@ pub use configs::{ MQTTConfig, OTLPConfig, PostgresConfig, SqliteConfig, }; pub use configs_builder::ConfigBuilder; -pub use environment::Environment; +pub use environment::Environment; \ No newline at end of file diff --git a/errors/Cargo.toml b/errors/Cargo.toml index 2a7a78a..715e7fd 100644 --- a/errors/Cargo.toml +++ b/errors/Cargo.toml @@ -4,4 +4,4 @@ version = "0.1.0" edition = "2021" [dependencies] -thiserror = { version = "1.0.37" } \ No newline at end of file +thiserror = { version = "1.0.38" } \ No newline at end of file diff --git a/health_readiness/Cargo.toml b/health_readiness/Cargo.toml index d7a1fa2..d776f59 100644 --- a/health_readiness/Cargo.toml +++ b/health_readiness/Cargo.toml @@ -8,10 +8,10 @@ errors = { path = "../errors" } httpw = { path = "../httpw" } env = { path = "../env" } -async-trait = { version = "0.1.63" } +async-trait = { version = "0.1.64" } tracing = { version = "0.1.37" } -deadpool-postgres = { version = "0.10.4" } +deadpool-postgres = { version = "0.10.5" } lapin = { version = "2.1.1" } -paho-mqtt = { version = "0.12" } +paho-mqtt = { version = "0.12.0" } actix-web = { version = "4.3.0" } diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 85a3ff5..a5828c7 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -10,4 +10,4 @@ opentelemetry = { version = "0.18.0", features = ["rt-tokio", "metrics"] } opentelemetry-otlp = { version = "0.11.0", features = ["tonic", "metrics", "grpc-tonic" , "tls", "tls-roots"] } tracing = { version = "0.1.37" } tonic = { version = "0.8.3", features = ["tls"] } -tokio = { version = "1.24.2", features = ["default"] } \ No newline at end of file +tokio = { version = "1.25.0", features = ["default"] } \ No newline at end of file diff --git a/migrator/Cargo.toml b/migrator/Cargo.toml index c295ad9..8b59fe0 100644 --- a/migrator/Cargo.toml +++ b/migrator/Cargo.toml @@ -12,7 +12,7 @@ env = { path = "../env" } logging = { path = "../logging" } errors = { path = "../errors" } -async-trait = { version = "0.1.63" } +async-trait = { version = "0.1.64" } tracing = { version = "0.1.37" } deadpool-postgres = { version = "0.10.2", optional = true } diff --git a/mqtt/Cargo.toml b/mqtt/Cargo.toml index 30ae5c5..a3965d7 100644 --- a/mqtt/Cargo.toml +++ b/mqtt/Cargo.toml @@ -12,9 +12,9 @@ env = { path = "../env" } traces = { path = "../traces" } opentelemetry = { version = "0.18.0" } tracing = { version = "0.1.37" } -async-trait = { version = "0.1.63" } +async-trait = { version = "0.1.64" } bytes = { version = "1.2.1", features = ["serde"] } -paho-mqtt = { version = "0.12" } +paho-mqtt = { version = "0.12.0" } serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.91" } futures-util = { version = "0.3.25" } diff --git a/secrets_manager/Cargo.toml b/secrets_manager/Cargo.toml index 35aa9be..72b668c 100644 --- a/secrets_manager/Cargo.toml +++ b/secrets_manager/Cargo.toml @@ -9,7 +9,7 @@ mocks = ["dep:mockall"] [dependencies] errors = { path = "../errors" } -async-trait = { version = "0.1.63" } +async-trait = { version = "0.1.64" } tracing = { version = "0.1.37" } aws-config = { version = "0.51.0" } aws-sdk-secretsmanager = { version = "0.21.0" } @@ -20,4 +20,4 @@ mockall = { version = "0.11.3", optional = true } [dev-dependencies] mockall = { version = "0.11.3" } -tokio = { version = "1.24.2", features = ["macros"] } +tokio = { version = "1.25.0", features = ["macros"] } diff --git a/sql_pool/Cargo.toml b/sql_pool/Cargo.toml index adf0d55..060f295 100644 --- a/sql_pool/Cargo.toml +++ b/sql_pool/Cargo.toml @@ -14,6 +14,7 @@ errors = { path = "../errors" } ## deadpool-postgres = { version = "0.10.2", optional = true } tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-uuid-0_8"], optional = true } +## deadpool-sqlite = { version = "0.5.0", optional = true } tracing = { version = "0.1.37" } \ No newline at end of file diff --git a/traces/Cargo.toml b/traces/Cargo.toml index 33c7a7e..0046443 100644 --- a/traces/Cargo.toml +++ b/traces/Cargo.toml @@ -10,4 +10,4 @@ opentelemetry-otlp = { version = "0.11.0", features = ["tonic", "tls", "tls-root tracing = { version = "0.1.37" } serde = { version = "1.0.152", features = ["derive"] } tonic = { version = "0.8.1", features = ["tls"] } -tokio = { version = "1.24.2", features = ["default"] } \ No newline at end of file +tokio = { version = "1.25.0", features = ["default"] } \ No newline at end of file diff --git a/traces/src/amqp.rs b/traces/src/amqp.rs deleted file mode 100644 index 65b1383..0000000 --- a/traces/src/amqp.rs +++ /dev/null @@ -1,93 +0,0 @@ -use opentelemetry::{ - global, - global::{BoxedSpan, BoxedTracer}, - trace::{ - Span, SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId, TraceState, - Tracer, - }, - Context, -}; -use std::borrow::Cow; - -const TRACE_VERSION: u8 = 0; - -pub struct Traceparent { - pub trace_id: TraceId, - pub span_id: SpanId, - pub trace_flags: TraceFlags, -} - -///traceparent is compos from {trace-version}-{trace-id}-{parent-id}-{trace-flags} -impl Traceparent { - pub fn from_string(traceparent: &str) -> Traceparent { - if traceparent.is_empty() { - return Traceparent::new_empty(); - } - - let splitted: Vec<&str> = traceparent.split("-").collect(); - - if splitted.len() <= 3 { - return Traceparent::new_empty(); - } - - let trace_id = TraceId::from_hex(&splitted[1].to_string()).unwrap(); - let span_id = SpanId::from_hex(&splitted[2].to_string()).unwrap(); - let trace_flags = TraceFlags::new(splitted[3].as_bytes()[0]); - - Traceparent { - trace_id, - span_id, - trace_flags, - } - } - - pub fn string_from_ctx(ctx: &Context) -> String { - let trace_id = ctx.get::().unwrap(); - let parent_id = ctx.get::().unwrap(); - let trace_flags = ctx.get::().unwrap(); - - format!( - "{:02x}-{:032x}-{:016x}-{:02x}", - TRACE_VERSION, trace_id, parent_id, trace_flags - ) - } - - fn new_empty() -> Traceparent { - let tracer = global::tracer("empty"); - - let span = tracer - .span_builder("empty") - .with_kind(SpanKind::Consumer) - .start(&tracer); - - let span_ctx = span.span_context(); - - Traceparent { - trace_id: span_ctx.clone().trace_id(), - span_id: span_ctx.clone().span_id(), - trace_flags: span_ctx.clone().trace_flags(), - } - } -} - -pub fn get_span(tracer: &BoxedTracer, traceparent: &str, span_name: &str) -> (Context, BoxedSpan) { - let traceparent = Traceparent::from_string(traceparent); - - let ctx = Context::new().with_remote_span_context(SpanContext::new( - traceparent.trace_id, - traceparent.span_id, - traceparent.trace_flags, - true, - TraceState::default(), - )); - - let span = tracer - .span_builder(Cow::from(span_name.to_owned())) - .with_kind(SpanKind::Consumer) - .start_with_context(tracer, &ctx); - - let ctx = ctx.with_value(traceparent.trace_id); - let ctx = ctx.with_value(traceparent.span_id); - - (ctx.with_value(traceparent.trace_flags), span) -} diff --git a/traces/src/grpc.rs b/traces/src/grpc.rs new file mode 100644 index 0000000..47bd26c --- /dev/null +++ b/traces/src/grpc.rs @@ -0,0 +1,53 @@ +use opentelemetry::{ + global::{self, BoxedSpan, BoxedTracer}, + propagation::{Extractor, Injector}, + trace::Tracer, + Context, +}; + +pub struct ExMetadataMap<'a>(&'a tonic::metadata::MetadataMap); + +impl<'a> Extractor for ExMetadataMap<'a> { + /// Get a value for a key from the MetadataMap. If the value can't be converted to &str, returns None + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + /// Collect all the keys from the MetadataMap. + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|key| match key { + tonic::metadata::KeyRef::Ascii(v) => v.as_str(), + tonic::metadata::KeyRef::Binary(v) => v.as_str(), + }) + .collect::>() + } +} + +pub struct InjMetadataMap<'a>(&'a mut tonic::metadata::MetadataMap); + +impl<'a> Injector for InjMetadataMap<'a> { + /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs + fn set(&mut self, key: &str, value: String) { + if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { + if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { + self.0.insert(key, val); + } + } + } +} + +pub fn span(meta: &tonic::metadata::MetadataMap, tracer: &BoxedTracer) -> (Context, BoxedSpan) { + let ctx = global::get_text_map_propagator(|prop| prop.extract(&ExMetadataMap(meta))); + + let span = tracer.start_with_context("Processing reply", &ctx); + + (ctx, span) +} + +pub fn inject(ctx: &Context, meta: &mut tonic::metadata::MetadataMap) { + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&ctx, &mut InjMetadataMap(meta)) + }); +} diff --git a/traces/src/lib.rs b/traces/src/lib.rs index aef5994..1d480bb 100644 --- a/traces/src/lib.rs +++ b/traces/src/lib.rs @@ -1,6 +1,4 @@ -///deprecated -pub mod amqp; - +pub mod grpc; pub mod jaeger; pub mod otlp;