diff --git a/.validateignore b/.validateignore new file mode 100644 index 0000000..b145a9d --- /dev/null +++ b/.validateignore @@ -0,0 +1,5 @@ +# skip validation for methods listed below +system_health +system_name +system_version +author_pendingExtrinsics \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 46ddcaa..1583f10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,9 @@ jsonrpc-pubsub = { version = "18.0.0" } name = "bench" harness = false +[features] +validate = [] + [target.'cfg(tokio_unstable)'.dependencies] console-subscriber = "0.2.0" diff --git a/README.md b/README.md index 3de9e1e..7e0e5c3 100644 --- a/README.md +++ b/README.md @@ -75,4 +75,18 @@ It's also possible to run individual benchmarks by: ``` cargo bench --bench bench ws_round_trip -``` \ No newline at end of file +``` + +## Validate Middleware + +This middleware will intercept all method request/responses and compare the result directly with healthy endpoint responses. +This is useful for debugging to make sure the returned values are as expected. +You can use this by compiling with `validate` feature and enable validate middleware on your config file. +```yml +middlewares: + methods: + - validate +``` +NOTE: Keep in mind that if you place `validate` middleware before `inject_params` you may get false positive errors because the request will not be the same. + +Use `.validateingore` file to list all the methods you want to ignore. diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index e016467..505eee7 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -30,6 +30,7 @@ mod tests; const TRACER: utils::telemetry::Tracer = utils::telemetry::Tracer::new("client"); pub struct Client { + endpoints: Vec>, sender: tokio::sync::mpsc::Sender, rotation_notify: Arc, retries: u32, @@ -186,6 +187,7 @@ impl Client { let rotation_notify = Arc::new(Notify::new()); let rotation_notify_bg = rotation_notify.clone(); + let endpoints_ = endpoints.clone(); let background_task = tokio::spawn(async move { let request_backoff_counter = Arc::new(AtomicU32::new(0)); @@ -395,6 +397,7 @@ impl Client { }); Ok(Self { + endpoints: endpoints_, sender: message_tx, rotation_notify, retries: retries.unwrap_or(3), @@ -406,6 +409,10 @@ impl Client { Self::new(endpoints, None, None, None, None) } + pub fn endpoints(&self) -> &Vec> { + self.endpoints.as_ref() + } + pub async fn request(&self, method: &str, params: Vec) -> CallResult { async move { let (tx, rx) = tokio::sync::oneshot::channel(); diff --git a/src/middlewares/factory.rs b/src/middlewares/factory.rs index d92da85..cb7e975 100644 --- a/src/middlewares/factory.rs +++ b/src/middlewares/factory.rs @@ -30,6 +30,8 @@ pub async fn create_method_middleware( "block_tag" => block_tag::BlockTagMiddleware::build(method, extensions).await, "inject_params" => inject_params::InjectParamsMiddleware::build(method, extensions).await, "delay" => delay::DelayMiddleware::build(method, extensions).await, + #[cfg(feature = "validate")] + "validate" => validate::ValidateMiddleware::build(method, extensions).await, #[cfg(test)] "crazy" => testing::CrazyMiddleware::build(method, extensions).await, _ => panic!("Unknown method middleware: {}", name), diff --git a/src/middlewares/methods/mod.rs b/src/middlewares/methods/mod.rs index 20be5e1..71af3fa 100644 --- a/src/middlewares/methods/mod.rs +++ b/src/middlewares/methods/mod.rs @@ -5,5 +5,8 @@ pub mod inject_params; pub mod response; pub mod upstream; +#[cfg(feature = "validate")] +pub mod validate; + #[cfg(test)] pub mod testing; diff --git a/src/middlewares/methods/validate.rs b/src/middlewares/methods/validate.rs new file mode 100644 index 0000000..0b38d25 --- /dev/null +++ b/src/middlewares/methods/validate.rs @@ -0,0 +1,100 @@ +use async_trait::async_trait; +use std::sync::Arc; +use std::{fs::File, io::Read}; + +use crate::utils::errors; +use crate::{ + extensions::client::Client, + middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod}, + utils::{TypeRegistry, TypeRegistryRef}, +}; + +pub struct ValidateMiddleware { + client: Arc, + ignore_methods: Vec, +} + +impl ValidateMiddleware { + pub fn new(client: Arc, ignore_methods: Vec) -> Self { + Self { client, ignore_methods } + } +} + +#[async_trait] +impl MiddlewareBuilder for ValidateMiddleware { + async fn build( + _method: &RpcMethod, + extensions: &TypeRegistryRef, + ) -> Option>> { + // read ignored methods from file + let mut ignore_methods = vec![]; + if let Ok(mut file) = File::open(".validateignore") { + if let Err(err) = file.read_to_end(&mut ignore_methods) { + tracing::error!("Read .validateignore failed: {err:?}"); + } + } + let ignore_methods = String::from_utf8(ignore_methods) + .unwrap_or_default() + .split('\n') + .map(|x| x.trim().to_string()) + .filter(|x| !x.starts_with('#') && !x.starts_with("//")) // filter comments + .collect(); + + let client = extensions + .read() + .await + .get::() + .expect("Client extension not found"); + Some(Box::new(ValidateMiddleware::new(client, ignore_methods))) + } +} + +#[async_trait] +impl Middleware for ValidateMiddleware { + async fn call( + &self, + request: CallRequest, + context: TypeRegistry, + next: NextFn, + ) -> CallResult { + let client = self.client.clone(); + let result = next(request.clone(), context).await; + let actual = result.clone(); + + if self.ignore_methods.contains(&request.method) { + return result; + } + + if let Err(err) = tokio::spawn(async move { + let healthy_endpoints = client.endpoints().iter().filter(|x| x.health().score() > 0); + futures::future::join_all(healthy_endpoints.map(|endpoint| async { + let expected = endpoint + .request( + &request.method, + request.params.clone(), + std::time::Duration::from_secs(30), + ) + .await + .map_err(errors::map_error); + + if actual != expected { + let request = serde_json::to_string_pretty(&request).unwrap_or_default(); + let actual = match &actual { + Ok(value) => serde_json::to_string_pretty(&value).unwrap_or_default(), + Err(e) => e.to_string() + }; + let expected = match &expected { + Ok(value) => serde_json::to_string_pretty(&value).unwrap_or_default(), + Err(e) => e.to_string() + }; + let endpoint_url = endpoint.url(); + tracing::error!("Response mismatch for request:\n{request}\nSubway response:\n{actual}\nEndpoint {endpoint_url} response:\n{expected}"); + } + })).await; + }).await { + tracing::error!("Validate task failed: {err:?}"); + } + + result + } +} diff --git a/src/middlewares/mod.rs b/src/middlewares/mod.rs index 03871b9..27656b8 100644 --- a/src/middlewares/mod.rs +++ b/src/middlewares/mod.rs @@ -6,6 +6,7 @@ use jsonrpsee::{ PendingSubscriptionSink, }; use opentelemetry::trace::FutureExt as _; +use serde::Serialize; use std::{ fmt::{Debug, Formatter}, sync::Arc, @@ -20,7 +21,7 @@ pub mod factory; pub mod methods; pub mod subscriptions; -#[derive(Debug)] +#[derive(Clone, Debug, Serialize)] /// Represents a RPC request made to a middleware function. pub struct CallRequest { pub method: String,