From 1026a5110837f96564732922ada96c9800dd495d Mon Sep 17 00:00:00 2001 From: Bryan Chen Date: Tue, 31 Jan 2023 15:17:16 +1300 Subject: [PATCH] more tests --- src/client/tests.rs | 92 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 79 insertions(+), 13 deletions(-) diff --git a/src/client/tests.rs b/src/client/tests.rs index a86658f..d99027d 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -1,13 +1,50 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, str::FromStr}; use super::*; +use futures::StreamExt; use jsonrpsee::{ - core::server::rpc_module::Methods, server::{RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle}, + SubscriptionSink, }; +use tokio::sync::{mpsc, oneshot}; + +async fn dummy_server() -> ( + SocketAddr, + ServerHandle, + mpsc::Receiver<(JsonValue, oneshot::Sender)>, + mpsc::Receiver<(JsonValue, SubscriptionSink)>, +) { + let mut module = RpcModule::new(()); + + let (tx, rx) = mpsc::channel::<(JsonValue, oneshot::Sender)>(100); + let (sub_tx, sub_rx) = mpsc::channel::<(JsonValue, SubscriptionSink)>(100); + + module + .register_async_method("mock_rpc", move |params, _| { + let tx = tx.clone(); + async move { + let (resp_tx, resp_rx) = oneshot::channel(); + tx.send((params.parse::().unwrap(), resp_tx)) + .await + .unwrap(); + let res = resp_rx.await; + Ok::(res.unwrap()) + } + }) + .unwrap(); + + module + .register_subscription("mock_sub", "sub", "mock_unsub", move |params, sink, _| { + let params = params.parse::().unwrap(); + let sub_tx = sub_tx.clone(); + tokio::spawn(async move { + sub_tx.send((params, sink)).await.unwrap(); + }); + Ok(()) + }) + .unwrap(); -async fn dummy_server(module: impl Into) -> (SocketAddr, ServerHandle) { let server = ServerBuilder::default() .set_id_provider(RandomStringIdProvider::new(16)) .build("0.0.0.0:0") @@ -17,29 +54,58 @@ async fn dummy_server(module: impl Into) -> (SocketAddr, ServerHandle) let addr = server.local_addr().unwrap(); let handle = server.start(module).unwrap(); - (addr, handle) + (addr, handle, rx, sub_rx) } -fn echo_module() -> RpcModule<()> { - let mut module = RpcModule::new(()); - module - .register_method("echo", |params, _| Ok(params.parse::().unwrap())) +#[tokio::test] +async fn basic_request() { + let (addr, handle, mut rx, _) = dummy_server().await; + + let client = Client::new(&[format!("ws://{addr}")]).await.unwrap(); + + let handler = tokio::spawn(async move { + let (params, resp_tx) = rx.recv().await.unwrap(); + assert_eq!(params.to_string(), "[1]"); + resp_tx.send(JsonValue::from_str("[1]").unwrap()).unwrap(); + }); + + let result = client + .request("mock_rpc", Params::new(Some("[1]"))) + .await .unwrap(); - module + + assert_eq!(result.to_string(), "[1]"); + + handle.stop().unwrap(); + tokio::join!(handler).0.unwrap(); } #[tokio::test] -async fn test_basic() { - let (addr, handle) = dummy_server(echo_module()).await; +async fn basic_subscription() { + let (addr, handle, _, mut rx) = dummy_server().await; let client = Client::new(&[format!("ws://{addr}")]).await.unwrap(); + let handler = tokio::spawn(async move { + let (params, mut sink) = rx.recv().await.unwrap(); + assert_eq!(params.to_string(), "[123]"); + sink.send(&JsonValue::from_str("10").unwrap()).unwrap(); + sink.send(&JsonValue::from_str("11").unwrap()).unwrap(); + sink.send(&JsonValue::from_str("12").unwrap()).unwrap(); + }); + let result = client - .request("echo", Params::new(Some("[1]"))) + .subscribe("mock_sub", Params::new(Some("[123]")), "mock_unsub") .await .unwrap(); - assert_eq!(result.to_string(), "[1]"); + let result = result + .map(|v| v.unwrap().to_string()) + .collect::>() + .await; + + assert_eq!(result, ["10", "11", "12"]); handle.stop().unwrap(); + tokio::join!(handler).0.unwrap(); }