Skip to content

Commit

Permalink
api tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xlc committed Feb 8, 2023
1 parent e0e00e8 commit 29a7186
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 99 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ log = "0.4.17"
lru = "0.9.0"
serde = "1.0.152"
serde_yaml = "0.9.17"
serde_json = "1.0.92"
tokio = { version = "1.24.2", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
tower-http = { version = "0.3.5", features = ["full"] }
Expand Down
3 changes: 3 additions & 0 deletions src/api.rs → src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use tokio::sync::{watch, RwLock};

use crate::client::Client;

#[cfg(test)]
mod tests;

pub struct ValueHandle<T> {
inner: RwLock<watch::Receiver<Option<T>>>,
}
Expand Down
128 changes: 128 additions & 0 deletions src/api/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use jsonrpsee::{server::ServerHandle, SubscriptionSink};
use serde_json::json;
use tokio::sync::{mpsc, oneshot};

use super::*;

use crate::client::mock::TestServerBuilder;

async fn create_client() -> (
Api,
ServerHandle,
mpsc::Receiver<(JsonValue, SubscriptionSink)>,
mpsc::Receiver<(JsonValue, SubscriptionSink)>,
mpsc::Receiver<(JsonValue, oneshot::Sender<JsonValue>)>,
) {
let mut builder = TestServerBuilder::new();

let head_rx = builder.register_subscription(
"chain_subscribeNewHeads",
"chain_newHead",
"chain_unsubscribeNewHeads",
);

let finalized_head_rx = builder.register_subscription(
"chain_subscribeFinalizedHeads",
"chain_finalizedHead",
"chain_unsubscribeFinalizedHeads",
);

let block_hash_rx = builder.register_method("chain_getBlockHash");

let (addr, server) = builder.build().await;

let client = Client::new(&[format!("ws://{addr}")]).await.unwrap();
let api = Api::new(Arc::new(client));

(api, server, head_rx, finalized_head_rx, block_hash_rx)
}

#[tokio::test]
async fn get_head_finalized_head() {
let (api, server, mut head_rx, mut finalized_head_rx, mut block_rx) = create_client().await;

let head = api.get_head();
let finalized_head = api.get_finalized_head();

// access value before subscription is established

let h1 = tokio::spawn(async move {
assert_eq!(head.read().await, (json!("0xabcd"), 0x1234));
// should be able to read it multiple times
assert_eq!(head.read().await, (json!("0xabcd"), 0x1234));
});

let (_, mut head_sink) = head_rx.recv().await.unwrap();
head_sink.send(&json!({ "number": "0x1234" })).unwrap();

{
let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x1234]));
tx.send(json!("0xabcd")).unwrap();
}

let (_, mut finalized_head_sink) = finalized_head_rx.recv().await.unwrap();
finalized_head_sink
.send(&json!({ "number": "0x4321" }))
.unwrap();

{
let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x4321]));
tx.send(json!("0xdcba")).unwrap();
}

// read after subscription is established

let h2 = tokio::spawn(async move {
let val = finalized_head.read().await;
assert_eq!(val, (json!("0xdcba"), 0x4321));
});

// new head

head_sink.send(&json!({ "number": "0x1122" })).unwrap();

{
let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x1122]));
tx.send(json!("0xaabb")).unwrap();
}

let finalized_head = api.get_finalized_head();
// still old value
assert_eq!(finalized_head.read().await, (json!("0xdcba"), 0x4321));

// wait a bit for the value to be updated
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

let head = api.get_head();
assert_eq!(head.read().await, (json!("0xaabb"), 0x1122));

// new finalized head
finalized_head_sink
.send(&json!({ "number": "0x2233" }))
.unwrap();
finalized_head_sink
.send(&json!({ "number": "0x3344" }))
.unwrap();

{
let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x2233]));
tx.send(json!("0xbbcc")).unwrap();

let (params, tx) = block_rx.recv().await.unwrap();
assert_eq!(params, json!([0x3344]));
tx.send(json!("0xccdd")).unwrap();
}

// wait a bit for the value to be updated
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

assert_eq!(finalized_head.read().await, (json!("0xccdd"), 0x3344));

h1.await.unwrap();
h2.await.unwrap();
server.stop().unwrap();
}
138 changes: 42 additions & 96 deletions src/client/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,96 +10,71 @@ use jsonrpsee::{
server::{RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle},
SubscriptionSink,
};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::sync::{mpsc, oneshot};

pub struct TestServerBuilder {
methods: Vec<(
&'static str,
Box<dyn FnOnce(mpsc::Receiver<(JsonValue, oneshot::Sender<JsonValue>)>)>,
)>,
subscriptions: Vec<(
&'static str,
&'static str,
&'static str,
Box<dyn FnOnce(mpsc::Receiver<(JsonValue, SubscriptionSink)>)>,
)>,
module: RpcModule<()>,
}

impl TestServerBuilder {
pub fn new() -> Self {
Self {
methods: vec![],
subscriptions: vec![],
module: RpcModule::new(()),
}
}

pub fn register_method(
mut self,
&mut self,
name: &'static str,
f: impl Fn(mpsc::Receiver<(JsonValue, oneshot::Sender<JsonValue>)>) + 'static,
) -> Self {
self.methods.push((name, Box::new(f)));
self
) -> mpsc::Receiver<(JsonValue, oneshot::Sender<JsonValue>)> {
let (tx, rx) = mpsc::channel::<(JsonValue, oneshot::Sender<JsonValue>)>(100);
self.module
.register_async_method(name, move |params, _| {
let tx = tx.clone();
async move {
let (resp_tx, resp_rx) = oneshot::channel();
tx.send((params.parse::<JsonValue>().unwrap(), resp_tx))
.await
.unwrap();
let res = resp_rx.await;
res.map_err(|e| -> Error { CallError::Failed(e.into()).into() })
}
})
.unwrap();
rx
}

pub fn register_subscription(
mut self,
name: &'static str,
&mut self,
sub_name: &'static str,
method_name: &'static str,
unsub_name: &'static str,
f: impl Fn(mpsc::Receiver<(JsonValue, SubscriptionSink)>) + 'static,
) -> Self {
self.subscriptions
.push((name, sub_name, unsub_name, Box::new(f)));
self
) -> mpsc::Receiver<(JsonValue, SubscriptionSink)> {
let (tx, rx) = mpsc::channel::<(JsonValue, SubscriptionSink)>(100);
self.module
.register_subscription(sub_name, method_name, unsub_name, move |params, sink, _| {
let tx = tx.clone();
let params = params.parse::<JsonValue>().unwrap();
tokio::spawn(async move {
tx.send((params, sink)).await.unwrap();
});
Ok(())
})
.unwrap();
rx
}

pub async fn build(self) -> (SocketAddr, ServerHandle) {
enable_logger();

let mut module = RpcModule::new(());

for (name, f) in self.methods {
let (tx, rx) = mpsc::channel::<(JsonValue, oneshot::Sender<JsonValue>)>(100);
f(rx);
module
.register_async_method(name, move |params, _| {
let tx = tx.clone();
async move {
let (resp_tx, resp_rx) = oneshot::channel();
tx.send((params.parse::<JsonValue>().unwrap(), resp_tx))
.await
.unwrap();
let res = resp_rx.await;
res.map_err(|e| -> Error { CallError::Failed(e.into()).into() })
}
})
.unwrap();
}

for (sub_name, method_name, unsub_name, f) in self.subscriptions {
let (tx, rx) = mpsc::channel::<(JsonValue, SubscriptionSink)>(100);
f(rx);
module
.register_subscription(sub_name, method_name, unsub_name, move |params, sink, _| {
let tx = tx.clone();
let params = params.parse::<JsonValue>().unwrap();
tokio::spawn(async move {
tx.send((params, sink)).await.unwrap();
});
Ok(())
})
.unwrap();
}

let server = ServerBuilder::default()
.set_id_provider(RandomStringIdProvider::new(16))
.build("0.0.0.0:0")
.await
.unwrap();

let addr = server.local_addr().unwrap();
let handle = server.start(module).unwrap();
let handle = server.start(self.module).unwrap();

(addr, handle)
}
Expand All @@ -113,41 +88,12 @@ pub async fn dummy_server() -> (
) {
enable_logger();

let rx = Arc::new(Mutex::<
Option<mpsc::Receiver<(JsonValue, oneshot::Sender<JsonValue>)>>,
>::new(None));
let sub_rx =
Arc::new(Mutex::<Option<mpsc::Receiver<(JsonValue, SubscriptionSink)>>>::new(None));

let rx_clone = rx.clone();
let sub_rx_clone = sub_rx.clone();

let (addr, handle) = TestServerBuilder::new()
.register_method("mock_rpc", move |rx2| {
let rx_clone = rx_clone.clone();
tokio::spawn(async move {
(*rx_clone.lock().await) = Some(rx2);
});
})
.register_subscription("mock_sub", "mock", "mock_unsub", move |rx| {
let sub_rx_clone = sub_rx_clone.clone();
tokio::spawn(async move {
(*sub_rx_clone.lock().await) = Some(rx);
});
})
.build()
.await;

// TODO: actually wait for the data to be received
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

let mut guard_rx = rx.lock().await;
let rx = guard_rx.take().unwrap();
drop(guard_rx);

let mut guard_sub_rx = sub_rx.lock().await;
let sub_rx = guard_sub_rx.take().unwrap();
drop(guard_sub_rx);
let mut builder = TestServerBuilder::new();

let rx = builder.register_method("mock_rpc");
let sub_rx = builder.register_subscription("mock_sub", "mock", "mock_unsub");

let (addr, handle) = builder.build().await;

(addr, handle, rx, sub_rx)
}
2 changes: 1 addition & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use jsonrpsee::{
use crate::config::Config;

#[cfg(test)]
mod mock;
pub mod mock;
#[cfg(test)]
mod tests;

Expand Down

0 comments on commit 29a7186

Please sign in to comment.