diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d09be56..5511f07 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,7 +44,7 @@ jobs: sudo apt update \ && sudo apt install libssl-dev build-essential cmake pkg-config llvm-dev libclang-dev clang libmosquitto-dev libsqlite3-dev -y \ && cargo install --locked cargo-audit || true \ - && cargo install cargo-tarpaulin + && cargo install cargo-tarpaulin --force - name: 🔐 Run audit run: | diff --git a/Cargo.lock b/Cargo.lock index 95871cc..a8afbf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -570,6 +570,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "thiserror", "tokio", "tracing", ] @@ -2422,6 +2423,7 @@ dependencies = [ "messaging", "opentelemetry", "rdkafka", + "thiserror", "tokio", "tracing", ] diff --git a/auth/Cargo.toml b/auth/Cargo.toml index a208a53..290200c 100644 --- a/auth/Cargo.toml +++ b/auth/Cargo.toml @@ -11,6 +11,7 @@ serde_json = { workspace = true } tracing = { workspace = true } async-trait = { workspace = true } opentelemetry = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true, features = ["sync"] } reqwest = { version = "0.12.3", features = ["json"] } moka = { version = "0.12.7", features = ["future"] } diff --git a/auth/src/auth0.rs b/auth/src/auth0.rs index b16f161..8e4798a 100644 --- a/auth/src/auth0.rs +++ b/auth/src/auth0.rs @@ -1,4 +1,4 @@ -use super::{manager::JwtManager, types::TokenClaims}; +use super::{errors::AuthError, manager::JwtManager, types::TokenClaims}; use async_trait::async_trait; use configs::IdentityServerConfigs; use jsonwebtoken::jwk::JwkSet; @@ -37,7 +37,7 @@ impl Auth0JwtManager { #[async_trait] impl JwtManager for Auth0JwtManager { - async fn verify(&self, ctx: &Context, token: &str) -> Result { + async fn verify(&self, ctx: &Context, token: &str) -> Result { let mut span = self.tracer.start_with_context("authenticate", ctx); if let Some(cached_claim) = self.jwt_cache.get(token).await { @@ -75,7 +75,7 @@ impl JwtManager for Auth0JwtManager { } impl Auth0JwtManager { - async fn get_jwks(&self, span: &mut BoxedSpan) -> Result { + async fn get_jwks(&self, span: &mut BoxedSpan) -> Result { let res = match reqwest::get(&format!( "https://{}/{}", self.cfg.realm, ".well-known/jwks.json" @@ -88,7 +88,7 @@ impl Auth0JwtManager { span.set_status(Status::Error { description: Cow::from("error to get jwks from auth0 api"), }); - Err(()) + Err(AuthError::CouldNotRetrieveJWKS) } Ok(r) => Ok(r), }?; @@ -100,12 +100,12 @@ impl Auth0JwtManager { span.set_status(Status::Error { description: Cow::from("error deserializing the jwks"), }); - Err(()) + Err(AuthError::FailedToDeserializeToken) } Ok(v) => Ok(v), }?; - return Ok(val); + Ok(val) } } diff --git a/auth/src/errors.rs b/auth/src/errors.rs new file mode 100644 index 0000000..9d49a9d --- /dev/null +++ b/auth/src/errors.rs @@ -0,0 +1,25 @@ +use thiserror::Error; + +#[derive(Error, Debug, PartialEq, Eq)] +pub enum AuthError { + #[error("internal error")] + InternalError, + + #[error("could not retrieve JWKS")] + CouldNotRetrieveJWKS, + + #[error("error to load secrets from secret manager - `{0}`")] + InvalidToken(String), + + #[error("failed to deserialize token")] + FailedToDeserializeToken, + + #[error("failed to retrieve claim")] + FailedToRetrieveClaim, + + #[error("failed to retrieve user custom data claim")] + FailedToRetrieveUserCustomDataClaim, + + #[error("failed to retrieve scope claim")] + FailedToRetrieveScopeClaim, +} diff --git a/auth/src/keycloak.rs b/auth/src/keycloak.rs index f0fcf8a..00e050c 100644 --- a/auth/src/keycloak.rs +++ b/auth/src/keycloak.rs @@ -1,4 +1,4 @@ -use super::{manager::JwtManager, types::TokenClaims}; +use super::{errors::AuthError, manager::JwtManager, types::TokenClaims}; use async_trait::async_trait; use configs::IdentityServerConfigs; use jsonwebtoken::jwk::JwkSet; @@ -37,7 +37,7 @@ impl KeycloakJwtManager { #[async_trait] impl JwtManager for KeycloakJwtManager { - async fn verify(&self, ctx: &Context, token: &str) -> Result { + async fn verify(&self, ctx: &Context, token: &str) -> Result { let mut span = self.tracer.start_with_context("authenticate", ctx); if let Some(cached_claim) = self.jwt_cache.get(token).await { @@ -75,7 +75,7 @@ impl JwtManager for KeycloakJwtManager { } impl KeycloakJwtManager { - async fn get_jwks(&self, span: &mut BoxedSpan) -> Result { + async fn get_jwks(&self, span: &mut BoxedSpan) -> Result { // http://BASE_URL/realms/proteu/protocol/openid-connect/certs let endpoint = format!( "{}/realms/{}/protocol/openid-connect/certs", @@ -89,7 +89,7 @@ impl KeycloakJwtManager { span.set_status(Status::Error { description: Cow::from("error to get jwks from auth0 api"), }); - Err(()) + Err(AuthError::CouldNotRetrieveJWKS) } Ok(r) => Ok(r), }?; @@ -101,11 +101,11 @@ impl KeycloakJwtManager { span.set_status(Status::Error { description: Cow::from("error deserializing the jwks"), }); - Err(()) + Err(AuthError::CouldNotRetrieveJWKS) } Ok(v) => Ok(v), }?; - return Ok(val); + Ok(val) } } diff --git a/auth/src/lib.rs b/auth/src/lib.rs index 4ef948f..d9ad4d5 100644 --- a/auth/src/lib.rs +++ b/auth/src/lib.rs @@ -1,4 +1,5 @@ pub mod auth0; +pub mod errors; pub mod keycloak; pub mod manager; pub mod rbac; diff --git a/auth/src/manager.rs b/auth/src/manager.rs index 5529a13..a51c067 100644 --- a/auth/src/manager.rs +++ b/auth/src/manager.rs @@ -1,4 +1,4 @@ -use crate::types::TokenClaims; +use crate::{errors::AuthError, types::TokenClaims}; use async_trait::async_trait; use jsonwebtoken::{ decode, decode_header, @@ -12,7 +12,7 @@ use tracing::error; #[async_trait] pub trait JwtManager: Send + Sync { - async fn verify(&self, ctx: &Context, token: &str) -> Result; + async fn verify(&self, ctx: &Context, token: &str) -> Result; fn decode_token( &self, @@ -20,40 +20,50 @@ pub trait JwtManager: Send + Sync { jwks: &JwkSet, aud: &str, iss: &str, - ) -> Result>, ()> { + ) -> Result>, AuthError> { let Ok(header) = decode_header(token) else { error!("failed to decoded token header"); - return Err(()); + return Err(AuthError::InvalidToken( + "failed to decoded token header".into(), + )); }; let Some(kid) = header.kid else { error!("token header without kid"); - return Err(()); + return Err(AuthError::InvalidToken("token header without kid".into())); }; let Some(jwk) = jwks.find(&kid) else { error!("wasn't possible to find the same token kid into jwks"); - return Err(()); + return Err(AuthError::InvalidToken( + "wasn't possible to find the same token kid into jwks".into(), + )); }; let AlgorithmParameters::RSA(rsa) = &jwk.algorithm else { error!("token hashed using other algorithm than RSA"); - return Err(()); + return Err(AuthError::InvalidToken( + "token hashed using other algorithm than RSA".into(), + )); }; let Ok(decoding_key) = DecodingKey::from_rsa_components(&rsa.n, &rsa.e) else { error!("failed to decode rsa components"); - return Err(()); + return Err(AuthError::InvalidToken( + "failed to decode rsa components".into(), + )); }; let Some(key_alg) = jwk.common.key_algorithm else { error!("jwk with no key algorithm"); - return Err(()); + return Err(AuthError::InvalidToken("jwk with no key algorithm".into())); }; let Ok(alg) = Algorithm::from_str(key_alg.to_string().as_str()) else { error!("algorithm provided by the JWK is not sported!"); - return Err(()); + return Err(AuthError::InvalidToken( + "algorithm provided by the JWK is not sported!".into(), + )); }; let mut validation = Validation::new(alg); @@ -67,7 +77,7 @@ pub trait JwtManager: Send + Sync { Ok(d) => Ok(d), Err(err) => { error!(error = err.to_string(), "token validation error"); - Err(()) + Err(AuthError::InvalidToken("token validation error".into())) } } } diff --git a/auth/src/types/custom_data.rs b/auth/src/types/custom_data.rs index 43b6499..d9aea3e 100644 --- a/auth/src/types/custom_data.rs +++ b/auth/src/types/custom_data.rs @@ -1,3 +1,4 @@ +use crate::errors::AuthError; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; @@ -5,73 +6,42 @@ use tracing::error; #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct CustomData { - #[serde(rename(serialize = "x1", deserialize = "x1"))] - pub user_id: i64, - - #[serde(rename(serialize = "x2", deserialize = "x2"))] - pub user_key: String, - - #[serde(rename(serialize = "y1", deserialize = "y1"))] - pub company_id: i64, - - #[serde(rename(serialize = "y2", deserialize = "y2"))] - pub company_key: String, + pub user_data: Value, + pub user_metadata: Value, } impl CustomData { - pub fn from_auth0(claims: &HashMap) -> Result { + pub fn from_auth0(claims: &HashMap) -> Result { let Some(user_data) = claims.get("user_data") else { error!(claim = "user_data", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveUserCustomDataClaim); }; let Some(user_metadata) = user_data.get("user_metadata") else { error!(claim = "user_metadata", "invalid jwt claim"); - return Err(()); - }; - - let Some(x1) = user_metadata.get("x1") else { - error!(claim = "x1", "invalid jwt claim"); - return Err(()); - }; - - let Some(x2) = user_metadata.get("x2") else { - error!(claim = "x2", "invalid jwt claim"); - return Err(()); - }; - - let Some(y1) = user_metadata.get("y1") else { - error!(claim = "y1", "invalid jwt claim"); - return Err(()); - }; - - let Some(y2) = user_metadata.get("y2") else { - error!(claim = "y2", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveUserCustomDataClaim); }; Ok(Self { - user_id: x1.as_i64().unwrap(), - user_key: x2.as_str().unwrap().into(), - company_id: y1.as_i64().unwrap(), - company_key: y2.as_str().unwrap().into(), + user_data: user_data.to_owned(), + user_metadata: user_metadata.to_owned(), }) } - pub fn from_keycloak(claims: &HashMap) -> Result { + pub fn from_keycloak(claims: &HashMap) -> Result { let Some(user_data) = claims.get("user_data") else { error!(claim = "user_data", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveUserCustomDataClaim); }; let Some(json) = user_data.as_str() else { error!(claim = "user_data", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveUserCustomDataClaim); }; let Ok(custom) = serde_json::from_str::(json) else { error!(claim = "user_data", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveUserCustomDataClaim); }; Ok(custom) diff --git a/auth/src/types/scopes.rs b/auth/src/types/scopes.rs index eb0e67d..878d1e4 100644 --- a/auth/src/types/scopes.rs +++ b/auth/src/types/scopes.rs @@ -1,3 +1,4 @@ +use crate::errors::AuthError; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::{HashMap, HashSet}; @@ -7,15 +8,15 @@ use tracing::error; pub struct Scopes(pub HashSet); impl Scopes { - pub fn from_auth0(claims: &HashMap) -> Result { + pub fn from_auth0(claims: &HashMap) -> Result { let Some(v) = claims.get("scope") else { error!(claim = "scope", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveScopeClaim); }; let Some(scopes) = v.as_str() else { error!(claim = "scope", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveScopeClaim); }; let mut set = HashSet::new(); @@ -24,33 +25,33 @@ impl Scopes { set.insert(scope.into()); } - Ok(Self { 0: set }) + Ok(Self(set)) } - pub fn from_keycloak(claims: &HashMap) -> Result { + pub fn from_keycloak(claims: &HashMap) -> Result { let Some(access) = claims.get("resource_access") else { error!(claim = "resource_access", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveScopeClaim); }; let Some(azp_v) = claims.get("azp") else { error!(claim = "azp", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveScopeClaim); }; let Some(azp) = azp_v.as_str() else { error!(claim = "azp", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveScopeClaim); }; let Some(resources) = access.get(azp) else { error!(claim = azp, "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveScopeClaim); }; let Some(roles) = resources.get("roles") else { error!(claim = "roles", "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveScopeClaim); }; let mut set = HashSet::new(); @@ -59,6 +60,6 @@ impl Scopes { set.insert(role.as_str().unwrap().into()); } - Ok(Self { 0: set }) + Ok(Self(set)) } } diff --git a/auth/src/types/token_claims.rs b/auth/src/types/token_claims.rs index 731beb1..1ceb534 100644 --- a/auth/src/types/token_claims.rs +++ b/auth/src/types/token_claims.rs @@ -1,4 +1,5 @@ use super::{CustomData, Scopes}; +use crate::errors::AuthError; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; @@ -20,7 +21,7 @@ impl TokenClaims { claims: &HashMap, scopes: Scopes, custom_data: CustomData, - ) -> Result { + ) -> Result { Ok(Self { iss: get_claim_as_string("iss", claims)?, sub: get_claim_as_string("sub", claims)?, @@ -32,56 +33,56 @@ impl TokenClaims { }) } - pub fn from_auth0(claims: &HashMap) -> Result { + pub fn from_auth0(claims: &HashMap) -> Result { let scopes = Scopes::from_auth0(claims)?; let custom_data = CustomData::from_auth0(claims)?; TokenClaims::new(claims, scopes, custom_data) } - pub fn from_keycloak(claims: &HashMap) -> Result { + pub fn from_keycloak(claims: &HashMap) -> Result { let scopes = Scopes::from_keycloak(claims)?; let custom_data = CustomData::from_keycloak(claims)?; TokenClaims::new(claims, scopes, custom_data) } } -fn get_claim_as_string(key: &str, claims: &HashMap) -> Result { +fn get_claim_as_string(key: &str, claims: &HashMap) -> Result { let Some(fv) = claims.get(key) else { error!(claim = key, "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveClaim); }; let Some(value) = fv.as_str() else { error!(claim = key, "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveClaim); }; Ok(value.into()) } -fn get_claim_as_u64(key: &str, claims: &HashMap) -> Result { +fn get_claim_as_u64(key: &str, claims: &HashMap) -> Result { let Some(fv) = claims.get(key) else { error!(claim = key, "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveClaim); }; let Some(value) = fv.as_u64() else { error!(claim = key, "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveClaim); }; Ok(value) } -fn get_claim_as_vec(key: &str, claims: &HashMap) -> Result, ()> { +fn get_claim_as_vec(key: &str, claims: &HashMap) -> Result, AuthError> { let Some(fv) = claims.get(key) else { error!(claim = key, "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveClaim); }; let Some(value) = fv.as_array() else { error!(claim = key, "invalid jwt claim"); - return Err(()); + return Err(AuthError::FailedToRetrieveClaim); }; Ok(value diff --git a/configs/src/mqtt.rs b/configs/src/mqtt.rs index a18730d..95ca459 100644 --- a/configs/src/mqtt.rs +++ b/configs/src/mqtt.rs @@ -10,7 +10,7 @@ pub enum MQTTBrokerKind { impl From<&str> for MQTTBrokerKind { fn from(value: &str) -> Self { match value.to_uppercase().as_str() { - "AWSIoTCore" => MQTTBrokerKind::AWSIoTCore, + "AWSIOTCORE" => MQTTBrokerKind::AWSIoTCore, _ => MQTTBrokerKind::Default, } } @@ -19,7 +19,7 @@ impl From<&str> for MQTTBrokerKind { impl From<&String> for MQTTBrokerKind { fn from(value: &String) -> Self { match value.to_uppercase().as_str() { - "AWSIoTCore" => MQTTBrokerKind::AWSIoTCore, + "AWSIOTCORE" => MQTTBrokerKind::AWSIoTCore, _ => MQTTBrokerKind::Default, } } diff --git a/configs/src/sqlite.rs b/configs/src/sqlite.rs index a2667b6..13cef08 100644 --- a/configs/src/sqlite.rs +++ b/configs/src/sqlite.rs @@ -1,4 +1,4 @@ -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct SqliteConfigs { ///Default: local.db pub file: String, @@ -7,13 +7,3 @@ pub struct SqliteConfigs { /// Default: postgres pub password: String, } - -impl Default for SqliteConfigs { - fn default() -> Self { - Self { - file: Default::default(), - user: Default::default(), - password: Default::default(), - } - } -} diff --git a/configs_builder/src/configs_builder.rs b/configs_builder/src/configs_builder.rs index 161aa23..b498340 100644 --- a/configs_builder/src/configs_builder.rs +++ b/configs_builder/src/configs_builder.rs @@ -189,9 +189,7 @@ impl ConfigBuilder { app_cfg: &AppConfigs, ) -> Result>, ConfigsError> { match app_cfg.secret_manager { - SecretsManagerKind::None => { - return Ok(Some(Arc::new(FakeSecretClient::new()))); - } + SecretsManagerKind::None => Ok(Some(Arc::new(FakeSecretClient::new()))), SecretsManagerKind::AWSSecretManager => { let secret_key = env::var(SECRET_KEY_ENV_KEY).unwrap_or_default(); @@ -649,7 +647,7 @@ impl ConfigBuilder { v.parse().unwrap_or_else(|_| { error!(key = key, value = v, "parse went wrong"); - return default; + default }) } diff --git a/health_http_server/src/server/mod.rs b/health_http_server/src/server/mod.rs index b32e162..7a7832e 100644 --- a/health_http_server/src/server/mod.rs +++ b/health_http_server/src/server/mod.rs @@ -1,3 +1,3 @@ -mod server; +mod tiny_server; -pub use server::TinyHTTPServer; +pub use tiny_server::TinyHTTPServer; diff --git a/health_http_server/src/server/server.rs b/health_http_server/src/server/tiny_server.rs similarity index 100% rename from health_http_server/src/server/server.rs rename to health_http_server/src/server/tiny_server.rs diff --git a/health_readiness/src/service.rs b/health_readiness/src/service.rs index 07f83e0..0bce261 100644 --- a/health_readiness/src/service.rs +++ b/health_readiness/src/service.rs @@ -34,11 +34,11 @@ pub struct HealthReadinessServiceImpl { impl HealthReadinessServiceImpl { pub fn empty() -> Arc { - return Arc::new(HealthReadinessServiceImpl { checkers: vec![] }); + Arc::new(HealthReadinessServiceImpl { checkers: vec![] }) } pub fn new(checkers: Vec>) -> Arc { - return Arc::new(HealthReadinessServiceImpl { checkers }); + Arc::new(HealthReadinessServiceImpl { checkers }) } #[cfg(feature = "mqtt")] diff --git a/http_components/src/middlewares/deserializer.rs b/http_components/src/middlewares/deserializer.rs index 1389cf5..a41deb2 100644 --- a/http_components/src/middlewares/deserializer.rs +++ b/http_components/src/middlewares/deserializer.rs @@ -11,7 +11,7 @@ pub fn handler() -> JsonConfig { format!("JSON error: {:?}", err), HttpResponse::BadRequest().json(HTTPError::bad_request( "unformatted body", - &format!("{}", err), + format!("{}", err), )), ) .into() diff --git a/http_components/src/middlewares/otel/http_metrics.rs b/http_components/src/middlewares/otel/http_metrics.rs index f6f35c5..0df884a 100644 --- a/http_components/src/middlewares/otel/http_metrics.rs +++ b/http_components/src/middlewares/otel/http_metrics.rs @@ -60,6 +60,12 @@ impl HTTPOtelMetrics { } } +impl Default for HTTPOtelMetrics { + fn default() -> Self { + Self::new() + } +} + impl dev::Transform for HTTPOtelMetrics where S: dev::Service< diff --git a/http_server/src/server/server.rs b/http_server/src/server/http_server.rs similarity index 100% rename from http_server/src/server/server.rs rename to http_server/src/server/http_server.rs diff --git a/http_server/src/server/mod.rs b/http_server/src/server/mod.rs index 6f8d236..53577e2 100644 --- a/http_server/src/server/mod.rs +++ b/http_server/src/server/mod.rs @@ -1,3 +1,3 @@ -mod server; +mod http_server; -pub use server::HTTPServer; +pub use http_server::HTTPServer; diff --git a/kafka/Cargo.toml b/kafka/Cargo.toml index 6630656..3f732c7 100644 --- a/kafka/Cargo.toml +++ b/kafka/Cargo.toml @@ -12,3 +12,4 @@ async-trait = { workspace = true } opentelemetry = { workspace = true } tracing = { workspace = true } tokio = { workspace = true, features = ["default"] } +thiserror = { workspace = true } diff --git a/kafka/src/errors.rs b/kafka/src/errors.rs new file mode 100644 index 0000000..ecb240b --- /dev/null +++ b/kafka/src/errors.rs @@ -0,0 +1,7 @@ +use thiserror::Error; + +#[derive(Error, Debug, PartialEq, Eq)] +pub enum KafkaError { + #[error("internal error")] + InternalError, +} diff --git a/kafka/src/lib.rs b/kafka/src/lib.rs index 0bc4e49..306cdfb 100644 --- a/kafka/src/lib.rs +++ b/kafka/src/lib.rs @@ -1,4 +1,5 @@ pub mod connection; pub mod dispatcher; +pub mod errors; pub mod otel; pub mod publisher; diff --git a/kafka/src/otel.rs b/kafka/src/otel.rs index 1cc2b3e..5d404c5 100644 --- a/kafka/src/otel.rs +++ b/kafka/src/otel.rs @@ -1,5 +1,4 @@ -use std::str::FromStr; - +use super::errors::KafkaError; use opentelemetry::{ global::BoxedTracer, trace::{ @@ -9,6 +8,7 @@ use opentelemetry::{ Context, }; use rdkafka::message::{BorrowedHeaders, Header, Headers, OwnedHeaders}; +use std::str::FromStr; use tracing::error; const SUPPORTED_VERSION: u8 = 0; @@ -63,45 +63,53 @@ pub fn inject_context( }) } -pub fn extract_context(kafka_headers: &BorrowedHeaders) -> Result { +pub fn extract_context(kafka_headers: &BorrowedHeaders) -> Result { let Some((header_value, stats)) = extract_trace_from_header(kafka_headers) else { - return Err(()); + return Err(KafkaError::InternalError); }; let parts = header_value.split_terminator('-').collect::>(); // Ensure parts are not out of range. if parts.len() < 4 { - return Err(()); + return Err(KafkaError::InternalError); } // Ensure version is within range, for version 0 there must be 4 parts. - let version = u8::from_str_radix(parts[0], 16).map_err(|_| ())?; + let Ok(version) = u8::from_str_radix(parts[0], 16) else { + return Err(KafkaError::InternalError); + }; if version > MAX_VERSION || version == 0 && parts.len() != 4 { - return Err(()); + return Err(KafkaError::InternalError); } // Ensure trace id is lowercase if parts[1].chars().any(|c| c.is_ascii_uppercase()) { - return Err(()); + return Err(KafkaError::InternalError); } // Parse trace id section - let trace_id = TraceId::from_hex(parts[1]).map_err(|_| ())?; + let Ok(trace_id) = TraceId::from_hex(parts[1]) else { + return Err(KafkaError::InternalError); + }; // Ensure span id is lowercase if parts[2].chars().any(|c| c.is_ascii_uppercase()) { - return Err(()); + return Err(KafkaError::InternalError); } // Parse span id section - let span_id = SpanId::from_hex(parts[2]).map_err(|_| ())?; + let Ok(span_id) = SpanId::from_hex(parts[2]) else { + return Err(KafkaError::InternalError); + }; // Parse trace flags section - let opts = u8::from_str_radix(parts[3], 16).map_err(|_| ())?; + let Ok(opts) = u8::from_str_radix(parts[3], 16) else { + return Err(KafkaError::InternalError); + }; // Ensure opts are valid for version 0 if version == 0 && opts > 2 { - return Err(()); + return Err(KafkaError::InternalError); } // Build trace flags clearing all flags other than the trace-context @@ -109,7 +117,7 @@ pub fn extract_context(kafka_headers: &BorrowedHeaders) -> Result { let trace_flags = TraceFlags::new(opts) & TraceFlags::SAMPLED; let trace_state: TraceState = - TraceState::from_str(&stats).unwrap_or_else(|_| TraceState::default()); + TraceState::from_str(stats).unwrap_or_else(|_| TraceState::default()); // create context let span_context = SpanContext::new(trace_id, span_id, trace_flags, true, trace_state); @@ -117,7 +125,7 @@ pub fn extract_context(kafka_headers: &BorrowedHeaders) -> Result { Ok(Context::new().with_remote_span_context(span_context)) } -fn extract_trace_from_header<'e>(kafka_headers: &'e BorrowedHeaders) -> Option<(&'e str, &'e str)> { +fn extract_trace_from_header(kafka_headers: &BorrowedHeaders) -> Option<(&str, &str)> { let mut trace_parent = ""; let mut trace_state = ""; let mut founded = 0; diff --git a/kafka/src/publisher.rs b/kafka/src/publisher.rs index 4ae7c7c..33c8ad8 100644 --- a/kafka/src/publisher.rs +++ b/kafka/src/publisher.rs @@ -122,7 +122,7 @@ impl KafkaPublisher { let partition = match headers.get(PARTITION_HEADER_KEY) { Some(v) => { if let HeaderValues::LongInt(p) = v { - Some(p.clone()) + Some(p.to_owned()) } else { None } @@ -133,7 +133,7 @@ impl KafkaPublisher { let timestamp = match headers.get(TIMESTAMP_HEADER_KEY) { Some(v) => { if let HeaderValues::LongLongInt(t) = v { - t.clone() + t.to_owned() } else { now() } @@ -144,7 +144,7 @@ impl KafkaPublisher { let queue_timeout = match headers.get(QUEUE_TIMEOUT_KEY) { Some(v) => { if let HeaderValues::LongLongUint(t) = v { - Duration::from_millis(t.clone()) + Duration::from_millis(t.to_owned()) } else { Duration::from_secs(0) } diff --git a/messaging/src/publisher.rs b/messaging/src/publisher.rs index 294e4e5..d1dd8da 100644 --- a/messaging/src/publisher.rs +++ b/messaging/src/publisher.rs @@ -18,17 +18,17 @@ pub enum HeaderValues { LongLongUint(u64), } -impl Into for HeaderValues { - fn into(self) -> String { - match self { - Self::ShortString(v) => v, - Self::LongString(v) => v, - Self::Int(v) => v.to_string(), - Self::LongInt(v) => v.to_string(), - Self::LongLongInt(v) => v.to_string(), - Self::Uint(v) => v.to_string(), - Self::LongUint(v) => v.to_string(), - Self::LongLongUint(v) => v.to_string(), +impl From for String { + fn from(val: HeaderValues) -> Self { + match val { + HeaderValues::ShortString(v) => v, + HeaderValues::LongString(v) => v, + HeaderValues::Int(v) => v.to_string(), + HeaderValues::LongInt(v) => v.to_string(), + HeaderValues::LongLongInt(v) => v.to_string(), + HeaderValues::Uint(v) => v.to_string(), + HeaderValues::LongUint(v) => v.to_string(), + HeaderValues::LongLongUint(v) => v.to_string(), } } } diff --git a/mqtt/src/client.rs b/mqtt/src/client.rs index 3657681..62cc9f0 100644 --- a/mqtt/src/client.rs +++ b/mqtt/src/client.rs @@ -32,10 +32,10 @@ impl MQTTClient { MQTTBrokerKind::Default => password_connection_opts(&cfgs.mqtt), }; - return MQTTClient { + MQTTClient { crate_opts, connection_opts, - }; + } } pub async fn connect( @@ -66,7 +66,7 @@ where T: DynamicConfigs, { CreateOptionsBuilder::new() - .server_uri(&format!( + .server_uri(format!( "{}://{}:{}", cfgs.mqtt.transport, cfgs.mqtt.host, cfgs.mqtt.port )) diff --git a/mqtt/src/dispatcher.rs b/mqtt/src/dispatcher.rs index 2697453..766d371 100644 --- a/mqtt/src/dispatcher.rs +++ b/mqtt/src/dispatcher.rs @@ -60,12 +60,10 @@ impl Dispatcher for MQTTDispatcher { let mut cloned_stream = self.stream.clone(); while let Some(delivery) = cloned_stream.next().await { - match delivery { - Some(msg) => match self.consume(&Context::new(), &msg).await { - Err(e) => error!(error = e.to_string(), "failure to consume msg"), - _ => {} - }, - _ => {} + if let Some(msg) = delivery { + if let Err(err) = self.consume(&Context::new(), &msg).await { + error!(error = err.to_string(), "failure to consume msg"); + } } } @@ -91,7 +89,7 @@ impl MQTTDispatcher { let msg = ConsumerMessage::new(msg.topic(), "", msg.payload(), None); - return match handler.exec(&ctx, &msg).await { + match handler.exec(&ctx, &msg).await { Ok(_) => { debug!( trace.id = traces::trace_id(&ctx), @@ -113,7 +111,7 @@ impl MQTTDispatcher { }); Err(e) } - }; + } } fn get_handler_index( @@ -136,8 +134,8 @@ impl MQTTDispatcher { Err(err) => { error!( error = err.to_string(), - trace.id = traces::trace_id(&ctx), - span.id = traces::span_id(&ctx), + trace.id = traces::trace_id(ctx), + span.id = traces::span_id(ctx), topic = received_topic, "bad topic" ); @@ -148,8 +146,8 @@ impl MQTTDispatcher { if p == usize::MAX { warn!( - trace.id = traces::trace_id(&ctx), - span.id = traces::span_id(&ctx), + trace.id = traces::trace_id(ctx), + span.id = traces::span_id(ctx), topic = received_topic, "cant find dispatch for this topic" ); diff --git a/rabbitmq/src/consumer.rs b/rabbitmq/src/consumer.rs index 54cb11d..09e980b 100644 --- a/rabbitmq/src/consumer.rs +++ b/rabbitmq/src/consumer.rs @@ -25,7 +25,7 @@ pub(crate) async fn consume<'c>( ) -> Result<(), AmqpError> { let (msg_type, count) = extract_header_properties(&delivery.properties); - let (ctx, mut span) = otel::new_span(&delivery.properties, &tracer, &msg_type); + let (ctx, mut span) = otel::new_span(&delivery.properties, tracer, &msg_type); debug!( trace.id = traces::trace_id(&ctx), @@ -41,22 +41,22 @@ pub(crate) async fn consume<'c>( span.set_status(Status::Error { description: Cow::from(msg), }); + debug!( trace.id = traces::trace_id(&ctx), span.id = traces::span_id(&ctx), "{}", msg ); - match delivery.ack(BasicAckOptions { multiple: false }).await { - Err(e) => { - error!("error whiling ack msg"); - span.record_error(&e); - span.set_status(Status::Error { - description: Cow::from("error to ack msg"), - }); - } - _ => {} + + if let Err(e) = delivery.ack(BasicAckOptions { multiple: false }).await { + error!("error whiling ack msg"); + span.record_error(&e); + span.set_status(Status::Error { + description: Cow::from("error to ack msg"), + }); }; + return Err(AmqpError::InternalError {}); }; @@ -172,25 +172,25 @@ pub(crate) async fn consume<'c>( span.set_status(Status::Error { description: Cow::from("msg was sent to dlq"), }); - return Err(AmqpError::PublishingToDQLError {}); - } - _ => { - match delivery.ack(BasicAckOptions { multiple: false }).await { - Err(e) => { - error!( - trace.id = traces::trace_id(&ctx), - span.id = traces::span_id(&ctx), - "error whiling ack msg to default queue" - ); - span.record_error(&e); - span.set_status(Status::Error { - description: Cow::from("msg was sent to dlq"), - }); - return Err(AmqpError::AckMessageError {}); - } - _ => return Ok(()), - }; + + Err(AmqpError::PublishingToDQLError {}) } + _ => match delivery.ack(BasicAckOptions { multiple: false }).await { + Err(e) => { + error!( + trace.id = traces::trace_id(&ctx), + span.id = traces::span_id(&ctx), + "error whiling ack msg to default queue" + ); + span.record_error(&e); + span.set_status(Status::Error { + description: Cow::from("msg was sent to dlq"), + }); + + Err(AmqpError::AckMessageError {}) + } + _ => Ok(()), + }, } } @@ -202,7 +202,7 @@ fn extract_header_properties(props: &AMQPProperties) -> (String, i64) { let count = match headers.inner().get(AMQP_HEADERS_X_DEATH) { Some(value) => match value.as_array() { - Some(arr) => match arr.as_slice().get(0) { + Some(arr) => match arr.as_slice().first() { Some(value) => match value.as_field_table() { Some(table) => match table.inner().get(AMQP_HEADERS_COUNT) { Some(value) => match value.as_long_long_int() { diff --git a/rabbitmq/src/dispatcher.rs b/rabbitmq/src/dispatcher.rs index 560e569..734c809 100644 --- a/rabbitmq/src/dispatcher.rs +++ b/rabbitmq/src/dispatcher.rs @@ -91,7 +91,7 @@ impl RabbitMQDispatcher { while let Some(result) = consumer.next().await { match result { Ok(delivery) => { - match consume( + if let Err(err) = consume( &global::tracer("amqp consumer"), &delivery, &defs, @@ -99,10 +99,7 @@ impl RabbitMQDispatcher { ) .await { - Err(err) => { - error!(error = err.to_string(), "error consume msg") - } - _ => {} + error!(error = err.to_string(), "error consume msg") } } @@ -117,18 +114,18 @@ impl RabbitMQDispatcher { return Err(MessagingError::ConsumerError("some error occur".to_owned())); } - return Ok(()); + Ok(()) } pub async fn consume_blocking_multi(&self) -> Result<(), MessagingError> { let mut spawns = vec![]; - for (msg_type, def) in &self.dispatchers_def { + for (consumer_tag, def) in &self.dispatchers_def { let mut consumer = match self .channel .basic_consume( &def.queue_def.name, - &msg_type, + consumer_tag, BasicConsumeOptions { no_local: false, no_ack: false, @@ -154,7 +151,7 @@ impl RabbitMQDispatcher { while let Some(result) = consumer.next().await { match result { Ok(delivery) => { - match consume( + if let Err(err) = consume( &global::tracer("amqp consumer"), &delivery, &defs, @@ -162,10 +159,7 @@ impl RabbitMQDispatcher { ) .await { - Err(err) => { - error!(error = err.to_string(), "error consume msg") - } - _ => {} + error!(error = err.to_string(), "error consume msg"); } } diff --git a/rabbitmq/src/exchange.rs b/rabbitmq/src/exchange.rs index 84a75fc..81379d2 100644 --- a/rabbitmq/src/exchange.rs +++ b/rabbitmq/src/exchange.rs @@ -110,7 +110,7 @@ impl<'ex> ExchangeDefinition<'ex> { } pub fn passive(mut self) -> Self { - self.passive = self.passive; + self.passive = true; self } diff --git a/secrets_manager/src/aws_client.rs b/secrets_manager/src/aws_client.rs index e25bb90..3928284 100644 --- a/secrets_manager/src/aws_client.rs +++ b/secrets_manager/src/aws_client.rs @@ -15,7 +15,7 @@ pub struct AWSSecretClient { #[cfg_attr(mock, automock)] impl SecretClient for AWSSecretClient { fn get_by_key(&self, key: &str) -> Result { - let key = key.strip_prefix("!").unwrap_or_default(); + let key = key.strip_prefix('!').unwrap_or_default(); let value = self.secrets[key].clone(); let Value::String(secret) = value else { diff --git a/traces/src/injectors/grpc.rs b/traces/src/injectors/grpc.rs index 466440a..9578f99 100644 --- a/traces/src/injectors/grpc.rs +++ b/traces/src/injectors/grpc.rs @@ -25,6 +25,6 @@ impl<'a> Injector for GRPCInjector<'a> { pub fn inject(ctx: &Context, meta: &mut tonic::metadata::MetadataMap) { global::get_text_map_propagator(|propagator| { - propagator.inject_context(&ctx, &mut GRPCInjector(meta)) + propagator.inject_context(ctx, &mut GRPCInjector(meta)) }); } diff --git a/traces/src/lib.rs b/traces/src/lib.rs index 70c5f68..578d77d 100644 --- a/traces/src/lib.rs +++ b/traces/src/lib.rs @@ -23,7 +23,7 @@ where } let sampler = Sampler::TraceIdRatioBased(cfg.trace.export_rate_base); - return Sampler::ParentBased(Box::new(sampler)); + Sampler::ParentBased(Box::new(sampler)) } pub fn span_ctx(tracer: &BoxedTracer, kind: SpanKind, name: &str) -> Context {