Skip to content

Commit

Permalink
[forwardport] Update local client to support middleware (Kudos Seun) (
Browse files Browse the repository at this point in the history
#589) (#600)

* Update `local` client to support middleware (Kudos Seun) (#589)

* v15.0.1

* v15.0.1 => v15.1.0

* v15.0.1 => v15.1.0

* cargo fmt

* fix tests

* adds *_with_middleware methods

* remove Unpin bound, add documentation, cargo fmt

* 15.0.1

* bump to 15.1.0

Co-authored-by: Seun Lanlege <[email protected]>
Co-authored-by: Niklas <[email protected]>

* cargo fmt --all

Co-authored-by: Seun Lanlege <[email protected]>
Co-authored-by: Niklas <[email protected]>
  • Loading branch information
3 people authored Dec 9, 2020
1 parent 40eec20 commit 61ea7c6
Showing 1 changed file with 60 additions and 17 deletions.
77 changes: 60 additions & 17 deletions core-client/transports/src/transports/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::{
task::{Context, Poll},
Future, Sink, SinkExt, Stream, StreamExt,
};
use jsonrpc_core::{BoxFuture, MetaIoHandler, Metadata};
use jsonrpc_core::{BoxFuture, MetaIoHandler, Metadata, Middleware};
use jsonrpc_pubsub::Session;
use std::ops::Deref;
use std::pin::Pin;
Expand All @@ -26,10 +26,11 @@ enum Buffered {
None,
}

impl<TMetadata, THandler> LocalRpc<THandler, TMetadata>
impl<TMetadata, THandler, TMiddleware> LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata,
THandler: Deref<Target = MetaIoHandler<TMetadata>>,
TMiddleware: Middleware<TMetadata>,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>>,
{
/// Creates a new `LocalRpc` with default metadata.
pub fn new(handler: THandler) -> Self
Expand All @@ -50,10 +51,11 @@ where
}
}

impl<TMetadata, THandler> Stream for LocalRpc<THandler, TMetadata>
impl<TMetadata, THandler, TMiddleware> Stream for LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
{
type Item = String;

Expand All @@ -62,10 +64,11 @@ where
}
}

impl<TMetadata, THandler> LocalRpc<THandler, TMetadata>
impl<TMetadata, THandler, TMiddleware> LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
{
fn poll_buffered(&mut self, cx: &mut Context) -> Poll<Result<(), RpcError>> {
let response = match self.buffered {
Expand All @@ -87,10 +90,11 @@ where
}
}

impl<TMetadata, THandler> Sink<String> for LocalRpc<THandler, TMetadata>
impl<TMetadata, THandler, TMiddleware> Sink<String> for LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
{
type Error = RpcError;

Expand Down Expand Up @@ -121,14 +125,15 @@ where
}
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata>`.
pub fn connect_with_metadata<TClient, THandler, TMetadata>(
/// Connects to a `Deref<Target = MetaIoHandler<Metadata>` specifying a custom middleware implementation.
pub fn connect_with_metadata_and_middleware<TClient, THandler, TMetadata, TMiddleware>(
handler: THandler,
meta: TMetadata,
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
TMetadata: Metadata + Unpin,
{
let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
Expand All @@ -137,24 +142,53 @@ where
(client, rpc_client)
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata>`.
pub fn connect_with_metadata<TClient, THandler, TMetadata>(
handler: THandler,
meta: TMetadata,
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
TMetadata: Metadata + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
{
connect_with_metadata_and_middleware(handler, meta)
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata + Default>` specifying a custom middleware implementation.
pub fn connect_with_middleware<TClient, THandler, TMetadata, TMiddleware>(
handler: THandler,
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
TMetadata: Metadata + Default + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
{
connect_with_metadata_and_middleware(handler, Default::default())
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata + Default>`.
pub fn connect<TClient, THandler, TMetadata>(handler: THandler) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
TMetadata: Metadata + Default + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
{
connect_with_metadata(handler, Default::default())
connect_with_middleware(handler)
}

/// Metadata for LocalRpc.
pub type LocalMeta = Arc<Session>;

/// Connects with pubsub.
pub fn connect_with_pubsub<TClient, THandler>(handler: THandler) -> (TClient, impl Future<Output = RpcResult<()>>)
/// Connects with pubsub specifying a custom middleware implementation.
pub fn connect_with_pubsub_and_middleware<TClient, THandler, TMiddleware>(
handler: THandler,
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<LocalMeta>> + Unpin,
TMiddleware: Middleware<LocalMeta> + Unpin,
THandler: Deref<Target = MetaIoHandler<LocalMeta, TMiddleware>> + Unpin,
{
let (tx, rx) = mpsc::unbounded();
let meta = Arc::new(Session::new(tx));
Expand All @@ -164,3 +198,12 @@ where
let client = TClient::from(sender);
(client, rpc_client)
}

/// Connects with pubsub.
pub fn connect_with_pubsub<TClient, THandler>(handler: THandler) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<LocalMeta>> + Unpin,
{
connect_with_pubsub_and_middleware(handler)
}

0 comments on commit 61ea7c6

Please sign in to comment.