Skip to content

Commit

Permalink
validate middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
ermalkaleci committed Apr 10, 2024
1 parent cdbdd9b commit 88765fd
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .validateignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# skip validation for methods listed below
system_health
system_name
system_version
author_pendingExtrinsics
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ jsonrpc-pubsub = { version = "18.0.0" }
name = "bench"
harness = false

[features]
validate = []

[target.'cfg(tokio_unstable)'.dependencies]
console-subscriber = "0.2.0"

Expand Down
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,18 @@ It's also possible to run individual benchmarks by:

```
cargo bench --bench bench ws_round_trip
```
```

## Validate Middleware

This middleware will intercept all method request/responses and compare the result directly with healthy endpoint responses.
This is useful for debugging to make sure the returned values are as expected.
You can use this by compiling with `validate` feature and enable validate middleware on your config file.
```yml
middlewares:
methods:
- validate
```
NOTE: Keep in mind that if you place `validate` middleware before `inject_params` you may get false positive errors because the request will not be the same.

Use `.validateingore` file to list all the methods you want to ignore.
7 changes: 7 additions & 0 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod tests;
const TRACER: utils::telemetry::Tracer = utils::telemetry::Tracer::new("client");

pub struct Client {
endpoints: Vec<Arc<Endpoint>>,
sender: tokio::sync::mpsc::Sender<Message>,
rotation_notify: Arc<Notify>,
retries: u32,
Expand Down Expand Up @@ -186,6 +187,7 @@ impl Client {

let rotation_notify = Arc::new(Notify::new());
let rotation_notify_bg = rotation_notify.clone();
let endpoints_ = endpoints.clone();

let background_task = tokio::spawn(async move {
let request_backoff_counter = Arc::new(AtomicU32::new(0));
Expand Down Expand Up @@ -395,6 +397,7 @@ impl Client {
});

Ok(Self {
endpoints: endpoints_,
sender: message_tx,
rotation_notify,
retries: retries.unwrap_or(3),
Expand All @@ -406,6 +409,10 @@ impl Client {
Self::new(endpoints, None, None, None, None)
}

pub fn endpoints(&self) -> &Vec<Arc<Endpoint>> {
self.endpoints.as_ref()
}

pub async fn request(&self, method: &str, params: Vec<JsonValue>) -> CallResult {
async move {
let (tx, rx) = tokio::sync::oneshot::channel();
Expand Down
2 changes: 2 additions & 0 deletions src/middlewares/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub async fn create_method_middleware(
"block_tag" => block_tag::BlockTagMiddleware::build(method, extensions).await,
"inject_params" => inject_params::InjectParamsMiddleware::build(method, extensions).await,
"delay" => delay::DelayMiddleware::build(method, extensions).await,
#[cfg(feature = "validate")]
"validate" => validate::ValidateMiddleware::build(method, extensions).await,
#[cfg(test)]
"crazy" => testing::CrazyMiddleware::build(method, extensions).await,
_ => panic!("Unknown method middleware: {}", name),
Expand Down
3 changes: 3 additions & 0 deletions src/middlewares/methods/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ pub mod inject_params;
pub mod response;
pub mod upstream;

#[cfg(feature = "validate")]
pub mod validate;

#[cfg(test)]
pub mod testing;
100 changes: 100 additions & 0 deletions src/middlewares/methods/validate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use async_trait::async_trait;
use std::sync::Arc;
use std::{fs::File, io::Read};

use crate::utils::errors;
use crate::{
extensions::client::Client,
middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod},
utils::{TypeRegistry, TypeRegistryRef},
};

pub struct ValidateMiddleware {
client: Arc<Client>,
ignore_methods: Vec<String>,
}

impl ValidateMiddleware {
pub fn new(client: Arc<Client>, ignore_methods: Vec<String>) -> Self {
Self { client, ignore_methods }
}
}

#[async_trait]
impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for ValidateMiddleware {
async fn build(
_method: &RpcMethod,
extensions: &TypeRegistryRef,
) -> Option<Box<dyn Middleware<CallRequest, CallResult>>> {
// read ignored methods from file
let mut ignore_methods = vec![];
if let Ok(mut file) = File::open(".validateignore") {
if let Err(err) = file.read_to_end(&mut ignore_methods) {
tracing::error!("Read .validateignore failed: {err:?}");
}
}
let ignore_methods = String::from_utf8(ignore_methods)
.unwrap_or_default()
.split('\n')
.map(|x| x.trim().to_string())
.filter(|x| !x.starts_with('#') && !x.starts_with("//")) // filter comments
.collect();

let client = extensions
.read()
.await
.get::<Client>()
.expect("Client extension not found");
Some(Box::new(ValidateMiddleware::new(client, ignore_methods)))
}
}

#[async_trait]
impl Middleware<CallRequest, CallResult> for ValidateMiddleware {
async fn call(
&self,
request: CallRequest,
context: TypeRegistry,
next: NextFn<CallRequest, CallResult>,
) -> CallResult {
let client = self.client.clone();
let result = next(request.clone(), context).await;
let actual = result.clone();

if self.ignore_methods.contains(&request.method) {
return result;
}

if let Err(err) = tokio::spawn(async move {
let healthy_endpoints = client.endpoints().iter().filter(|x| x.health().score() > 0);
futures::future::join_all(healthy_endpoints.map(|endpoint| async {
let expected = endpoint
.request(
&request.method,
request.params.clone(),
std::time::Duration::from_secs(30),
)
.await
.map_err(errors::map_error);

if actual != expected {
let request = serde_json::to_string_pretty(&request).unwrap_or_default();
let actual = match &actual {
Ok(value) => serde_json::to_string_pretty(&value).unwrap_or_default(),
Err(e) => e.to_string()
};
let expected = match &expected {
Ok(value) => serde_json::to_string_pretty(&value).unwrap_or_default(),
Err(e) => e.to_string()
};
let endpoint_url = endpoint.url();
tracing::error!("Response mismatch for request:\n{request}\nSubway response:\n{actual}\nEndpoint {endpoint_url} response:\n{expected}");
}
})).await;
}).await {
tracing::error!("Validate task failed: {err:?}");
}

result
}
}
3 changes: 2 additions & 1 deletion src/middlewares/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use jsonrpsee::{
PendingSubscriptionSink,
};
use opentelemetry::trace::FutureExt as _;
use serde::Serialize;
use std::{
fmt::{Debug, Formatter},
sync::Arc,
Expand All @@ -20,7 +21,7 @@ pub mod factory;
pub mod methods;
pub mod subscriptions;

#[derive(Debug)]
#[derive(Clone, Debug, Serialize)]
/// Represents a RPC request made to a middleware function.
pub struct CallRequest {
pub method: String,
Expand Down

0 comments on commit 88765fd

Please sign in to comment.