Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci committed Apr 11, 2024
1 parent 2a06f13 commit d630c26
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 78 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ NOTE: Keep in mind that if you place `validate` middleware before `inject_params
Ignored methods can be defined in extension config:
```yml
extensions:
validate:
validator:
ignore_methods:
- system_health
- system_name
Expand Down
4 changes: 2 additions & 2 deletions src/extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod merge_subscription;
pub mod rate_limit;
pub mod server;
pub mod telemetry;
pub mod validate;
pub mod validator;

#[async_trait]
pub trait Extension: Sized {
Expand Down Expand Up @@ -139,5 +139,5 @@ define_all_extensions! {
server: server::SubwayServerBuilder,
event_bus: event_bus::EventBus,
rate_limit: rate_limit::RateLimitBuilder,
validate: validate::Validate,
validator: validator::Validator,
}
33 changes: 0 additions & 33 deletions src/extensions/validate/mod.rs

This file was deleted.

67 changes: 67 additions & 0 deletions src/extensions/validator/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use crate::extensions::client::Client;
use crate::middlewares::{CallRequest, CallResult};
use crate::utils::errors;
use async_trait::async_trait;
use serde::Deserialize;
use std::sync::Arc;

use super::{Extension, ExtensionRegistry};

#[derive(Default)]
pub struct Validator {
pub config: ValidateConfig,
}

#[derive(Deserialize, Default, Debug, Clone)]
pub struct ValidateConfig {
pub ignore_methods: Vec<String>,
}

#[async_trait]
impl Extension for Validator {
type Config = ValidateConfig;

async fn from_config(config: &Self::Config, _registry: &ExtensionRegistry) -> Result<Self, anyhow::Error> {
Ok(Self::new(config.clone()))
}
}

impl Validator {
pub fn new(config: ValidateConfig) -> Self {
Self { config }
}

pub fn ignore(&self, method: &String) -> bool {
self.config.ignore_methods.contains(method)
}

pub fn validate(&self, client: Arc<Client>, request: CallRequest, response: CallResult) {
tokio::spawn(async move {
let healthy_endpoints = client.endpoints().iter().filter(|x| x.health().score() > 0);
futures::future::join_all(healthy_endpoints.map(|endpoint| async {
let expected = endpoint
.request(
&request.method,
request.params.clone(),
std::time::Duration::from_secs(30),
)
.await
.map_err(errors::map_error);

if response != expected {
let request = serde_json::to_string_pretty(&request).unwrap_or_default();
let actual = match &response {
Ok(value) => serde_json::to_string_pretty(&value).unwrap_or_default(),
Err(e) => e.to_string()
};
let expected = match &expected {
Ok(value) => serde_json::to_string_pretty(&value).unwrap_or_default(),
Err(e) => e.to_string()
};
let endpoint_url = endpoint.url();
tracing::error!("Response mismatch for request:\n{request}\nSubway response:\n{actual}\nEndpoint {endpoint_url} response:\n{expected}");
}
})).await;
});
}
}
49 changes: 7 additions & 42 deletions src/middlewares/methods/validate.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use async_trait::async_trait;
use std::sync::Arc;

use crate::utils::errors;
use crate::{
extensions::{client::Client, validate::Validate},
extensions::{client::Client, validator::Validator},
middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod},
utils::{TypeRegistry, TypeRegistryRef},
};

pub struct ValidateMiddleware {
validate: Arc<Validate>,
validator: Arc<Validator>,
client: Arc<Client>,
}

impl ValidateMiddleware {
pub fn new(validate: Arc<Validate>, client: Arc<Client>) -> Self {
Self { validate, client }
pub fn new(validator: Arc<Validator>, client: Arc<Client>) -> Self {
Self { validator, client }
}
}

Expand All @@ -25,7 +24,7 @@ impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for ValidateMiddlewar
_method: &RpcMethod,
extensions: &TypeRegistryRef,
) -> Option<Box<dyn Middleware<CallRequest, CallResult>>> {
let validate = extensions.read().await.get::<Validate>().unwrap_or_default();
let validate = extensions.read().await.get::<Validator>().unwrap_or_default();

let client = extensions
.read()
Expand All @@ -44,44 +43,10 @@ impl Middleware<CallRequest, CallResult> for ValidateMiddleware {
context: TypeRegistry,
next: NextFn<CallRequest, CallResult>,
) -> CallResult {
let client = self.client.clone();
let result = next(request.clone(), context).await;
let actual = result.clone();

if self.validate.ignore(&request.method) {
return result;
if !self.validator.ignore(&request.method) {
self.validator.validate(self.client.clone(), request, result.clone());
}

if let Err(err) = tokio::spawn(async move {
let healthy_endpoints = client.endpoints().iter().filter(|x| x.health().score() > 0);
futures::future::join_all(healthy_endpoints.map(|endpoint| async {
let expected = endpoint
.request(
&request.method,
request.params.clone(),
std::time::Duration::from_secs(30),
)
.await
.map_err(errors::map_error);

if actual != expected {
let request = serde_json::to_string_pretty(&request).unwrap_or_default();
let actual = match &actual {
Ok(value) => serde_json::to_string_pretty(&value).unwrap_or_default(),
Err(e) => e.to_string()
};
let expected = match &expected {
Ok(value) => serde_json::to_string_pretty(&value).unwrap_or_default(),
Err(e) => e.to_string()
};
let endpoint_url = endpoint.url();
tracing::error!("Response mismatch for request:\n{request}\nSubway response:\n{actual}\nEndpoint {endpoint_url} response:\n{expected}");
}
})).await;
}).await {
tracing::error!("Validate task failed: {err:?}");
}

result
}
}

0 comments on commit d630c26

Please sign in to comment.