Skip to content

Commit

Permalink
got it working
Browse files Browse the repository at this point in the history
  • Loading branch information
xlc committed Jan 27, 2023
1 parent 3416d49 commit 1b342f3
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 32 deletions.
45 changes: 45 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Debug executable 'subway'",
"cargo": {
"args": [
"build",
"--bin=subway",
"--package=subway"
],
"filter": {
"name": "subway",
"kind": "bin"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in executable 'subway'",
"cargo": {
"args": [
"test",
"--no-run",
"--bin=subway",
"--package=subway"
],
"filter": {
"name": "subway",
"kind": "bin"
}
},
"args": [],
"cwd": "${workspaceFolder}"
}
]
}
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod client;
mod config;
mod server;
mod middleware;
mod server;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down
81 changes: 56 additions & 25 deletions src/middleware/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, atomic::AtomicU32};
use std::sync::{atomic::AtomicU32, Arc};

use async_trait::async_trait;
use futures::{future::BoxFuture, FutureExt};
use jsonrpsee::{
core::{Error, JsonValue},
types::Params,
Expand All @@ -13,40 +14,69 @@ pub struct Request {
pub params: Params<'static>,
}

type NextFn<Result> = Box<dyn FnOnce(Request) -> BoxFuture<'static, Result> + Send + Sync>;

#[async_trait]
pub trait Middleware {
type Response;
type Error;
pub trait Middleware<Result: 'static>: Send + Sync {
async fn call(&self, request: Request, next: NextFn<Result>) -> Result;
}

async fn call(&self, request: Request) -> Result<Self::Response, Self::Error>;
pub struct Middlewares<Result> {
middlewares: Vec<Arc<dyn Middleware<Result>>>,
fallback: Arc<dyn Fn(Request) -> BoxFuture<'static, Result> + Send + Sync>,
}

// TODO: figure out how to setup middleware for subscriptions
impl<Result: 'static> Middlewares<Result> {
pub fn new(
middlewares: Vec<Arc<dyn Middleware<Result>>>,
fallback: Arc<dyn Fn(Request) -> BoxFuture<'static, Result> + Send + Sync>,
) -> Self {
Self {
middlewares,
fallback,
}
}

pub async fn call(&self, request: Request) -> Result {
let mut iter = self.middlewares.iter().rev();
let fallback = self.fallback.clone();
let mut next: Box<dyn FnOnce(Request) -> BoxFuture<'static, Result> + Send + Sync> =
Box::new(move |request| (fallback)(request));

while let Some(middleware) = iter.next() {
let middleware = middleware.clone();
let next2 = next;
let f = move |request| async move { middleware.call(request, next2).await }.boxed();
next = Box::new(f);
}

pub struct LogMiddleware<Next> {
(next)(request).await
}
}

pub struct LogMiddleware {
count: AtomicU32,
next: Next,
}

impl<Next> LogMiddleware<Next> {
pub fn new(next: Next) -> Self {
Self { count: Default::default(), next }
impl LogMiddleware {
pub fn new() -> Self {
Self {
count: Default::default(),
}
}
}

#[async_trait]
impl<Next> Middleware for LogMiddleware<Next>
where
Next: Middleware<Response = JsonValue, Error = Error> + Send + Sync,
{
type Response = JsonValue;
type Error = Error;

async fn call(&self, request: Request) -> Result<Self::Response, Self::Error> {
impl Middleware<Result<JsonValue, Error>> for LogMiddleware {
async fn call(
&self,
request: Request,
next: NextFn<Result<JsonValue, Error>>,
) -> Result<JsonValue, Error> {
self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let val = self.count.load(std::sync::atomic::Ordering::SeqCst);
log::info!("> {}: {}", val, request.method);
let res = self.next.call(request).await;
let res = next(request).await;
log::info!("< {}: {:?}", val, res);
res
}
Expand All @@ -63,11 +93,12 @@ impl UpstreamMiddleware {
}

#[async_trait]
impl Middleware for UpstreamMiddleware {
type Response = JsonValue;
type Error = Error;

async fn call(&self, request: Request) -> Result<Self::Response, Self::Error> {
impl Middleware<Result<JsonValue, Error>> for UpstreamMiddleware {
async fn call(
&self,
request: Request,
_next: NextFn<Result<JsonValue, Error>>,
) -> Result<JsonValue, Error> {
self.client.request(&request.method, request.params).await
}
}
32 changes: 26 additions & 6 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{net::SocketAddr, sync::Arc};

use futures::FutureExt;
use jsonrpsee::server::{RpcModule, ServerBuilder};
use tokio::task::JoinHandle;

use crate::{
client::Client,
config::Config,
middleware::{LogMiddleware, UpstreamMiddleware, Middleware, Request},
middleware::{LogMiddleware, Middlewares, Request, UpstreamMiddleware},
};

// TODO: https://github.com/paritytech/jsonrpsee/issues/985
Expand All @@ -30,15 +31,34 @@ pub async fn start_server(
let client = Arc::new(client);

for method in &config.rpcs.methods {
let middlewares = UpstreamMiddleware::new(client.clone());
let middlewares = LogMiddleware::new(middlewares);
let middlewares = Arc::new(middlewares);
let middlewares = Arc::new(Middlewares::new(
vec![
Arc::new(LogMiddleware::new()),
Arc::new(UpstreamMiddleware::new(client.clone())),
],
Arc::new(|_| {
async {
Err(
jsonrpsee::types::error::CallError::Failed(anyhow::Error::msg(
"Bad configuration",
))
.into(),
)
}
.boxed()
}),
));

let method_name = string_to_static_str(method.method.clone());
module.register_async_method(method_name, move |params, _| {
let middlewares = middlewares.clone();
async move {
middlewares.call(Request { method: method_name.into(), params: params.into_owned() }).await
middlewares
.call(Request {
method: method_name.into(),
params: params.into_owned(),
})
.await
}
})?;
}
Expand Down Expand Up @@ -80,7 +100,7 @@ pub async fn start_server(
let subscribe_name = string_to_static_str(subscription.subscribe.clone());
let unsubscribe_name = string_to_static_str(subscription.unsubscribe.clone());
let name = string_to_static_str(subscription.name.clone());

let client = client.clone();

module.register_subscription(subscribe_name, name, unsubscribe_name, move |params, mut sink, _| {
Expand Down

0 comments on commit 1b342f3

Please sign in to comment.