Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make RpcCall an IntoFuture #1860

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 42 additions & 176 deletions crates/rpc-client/src/call.rs
Original file line number Diff line number Diff line change
@@ -1,116 +1,15 @@
use alloy_json_rpc::{
transform_response, try_deserialize_ok, Request, RequestPacket, ResponsePacket, RpcParam,
RpcResult, RpcReturn,
transform_response, try_deserialize_ok, Request, ResponsePacket, RpcParam, RpcReturn,
};
use alloy_transport::{BoxTransport, IntoBoxTransport, RpcFut, TransportError, TransportResult};
use core::panic;
use futures::FutureExt;
use serde_json::value::RawValue;
use std::{
fmt,
future::Future,
future::{Future, IntoFuture},
marker::PhantomData,
pin::Pin,
task::{self, ready, Poll::Ready},
};
use tower::Service;

/// The states of the [`RpcCall`] future.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project(project = CallStateProj)]
enum CallState<Params>
where
Params: RpcParam,
{
Prepared {
request: Option<Request<Params>>,
connection: BoxTransport,
},
AwaitingResponse {
#[pin]
fut: <BoxTransport as Service<RequestPacket>>::Future,
},
Complete,
}

impl<Params> Clone for CallState<Params>
where
Params: RpcParam,
{
fn clone(&self) -> Self {
match self {
Self::Prepared { request, connection } => {
Self::Prepared { request: request.clone(), connection: connection.clone() }
}
_ => panic!("cloned after dispatch"),
}
}
}

impl<Params> fmt::Debug for CallState<Params>
where
Params: RpcParam,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::Prepared { .. } => "Prepared",
Self::AwaitingResponse { .. } => "AwaitingResponse",
Self::Complete => "Complete",
})
}
}

impl<Params> Future for CallState<Params>
where
Params: RpcParam,
{
type Output = TransportResult<Box<RawValue>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
loop {
match self.as_mut().project() {
CallStateProj::Prepared { connection, request } => {
if let Err(e) =
task::ready!(Service::<RequestPacket>::poll_ready(connection, cx))
{
self.set(Self::Complete);
return Ready(RpcResult::Err(e));
}

let request = request.take().expect("no request");
debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
trace!(params_ty=%std::any::type_name::<Params>(), ?request, "full request");
let request = request.serialize();
let fut = match request {
Ok(request) => {
trace!(request=%request.serialized(), "serialized request");
connection.call(request.into())
}
Err(err) => {
trace!(?err, "failed to serialize request");
self.set(Self::Complete);
return Ready(RpcResult::Err(TransportError::ser_err(err)));
}
};
self.set(Self::AwaitingResponse { fut });
}
CallStateProj::AwaitingResponse { fut } => {
let res = match task::ready!(fut.poll(cx)) {
Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
Err(e) => Ready(RpcResult::Err(e)),
_ => panic!("received batch response from single request"),
};
self.set(Self::Complete);
return res;
}
CallStateProj::Complete => {
panic!("Polled after completion");
}
}
}
}
}

/// A prepared, but unsent, RPC call.
///
/// This is a future that will send the request when polled. It contains a
Expand All @@ -130,26 +29,21 @@ where
/// batch request must immediately erase the `Param` type to allow batching of
/// requests with different `Param` types, while the `RpcCall` may do so lazily.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
#[derive(Clone)]
pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output>
where
Params: RpcParam,
Map: FnOnce(Resp) -> Output,
{
#[pin]
state: CallState<Params>,
map: Option<Map>,
pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output> {
request: Request<Params>,
connection: BoxTransport,
map: Map,
_pd: core::marker::PhantomData<fn() -> (Resp, Output)>,
}

impl<Params, Resp, Output, Map> core::fmt::Debug for RpcCall<Params, Resp, Output, Map>
impl<Params, Resp, Output, Map> fmt::Debug for RpcCall<Params, Resp, Output, Map>
where
Params: RpcParam,
Map: FnOnce(Resp) -> Output,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("RpcCall").field("state", &self.state).finish()
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RpcCall").finish_non_exhaustive()
}
}

Expand All @@ -158,13 +52,11 @@ where
Params: RpcParam,
{
#[doc(hidden)]
pub fn new(req: Request<Params>, connection: impl IntoBoxTransport) -> Self {
pub fn new(request: Request<Params>, connection: impl IntoBoxTransport) -> Self {
Self {
state: CallState::Prepared {
request: Some(req),
connection: connection.into_box_transport(),
},
map: Some(std::convert::identity),
request,
connection: connection.into_box_transport(),
map: std::convert::identity,
_pd: PhantomData,
}
}
Expand Down Expand Up @@ -193,24 +85,16 @@ where
where
NewMap: FnOnce(Resp) -> NewOutput,
{
RpcCall { state: self.state, map: Some(map), _pd: PhantomData }
RpcCall { request: self.request, connection: self.connection, map, _pd: PhantomData }
}

/// Returns `true` if the request is a subscription.
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn is_subscription(&self) -> bool {
self.request().meta.is_subscription()
}

/// Set the request to be a non-standard subscription (i.e. not
/// "eth_subscribe").
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn set_is_subscription(&mut self) {
self.request_mut().meta.set_is_subscription();
}
Expand All @@ -224,49 +108,28 @@ where
///
/// This is useful for modifying the params after the request has been
/// prepared.
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn params(&mut self) -> &mut Params {
&mut self.request_mut().params
}

/// Returns a reference to the request.
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn request(&self) -> &Request<Params> {
let CallState::Prepared { request, .. } = &self.state else {
panic!("Cannot get request after request has been sent");
};
request.as_ref().expect("no request in prepared")
&self.request
}

/// Returns a mutable reference to the request.
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn request_mut(&mut self) -> &mut Request<Params> {
let CallState::Prepared { request, .. } = &mut self.state else {
panic!("Cannot get request after request has been sent");
};
request.as_mut().expect("no request in prepared")
&mut self.request
}

/// Map the params of the request into a new type.
pub fn map_params<NewParams: RpcParam>(
self,
map: impl Fn(Params) -> NewParams,
map: impl FnOnce(Params) -> NewParams,
) -> RpcCall<NewParams, Resp, Output, Map> {
let CallState::Prepared { request, connection } = self.state else {
panic!("Cannot get request after request has been sent");
};
let request = request.expect("no request in prepared").map_params(map);
RpcCall {
state: CallState::Prepared { request: Some(request), connection },
request: self.request.map_params(map),
connection: self.connection,
map: self.map,
_pd: PhantomData,
}
Expand All @@ -285,13 +148,9 @@ where
///
/// Panics if called after the request has been polled.
pub fn into_owned_params(self) -> RpcCall<Params::Owned, Resp, Output, Map> {
let CallState::Prepared { request, connection } = self.state else {
panic!("Cannot get params after request has been sent");
};
let request = request.expect("no request in prepared").into_owned_params();

RpcCall {
state: CallState::Prepared { request: Some(request), connection },
request: self.request.into_owned_params(),
connection: self.connection,
map: self.map,
_pd: PhantomData,
}
Expand All @@ -302,30 +161,37 @@ impl<'a, Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
where
Params: RpcParam + 'a,
Resp: RpcReturn,
Output: 'static,
Output: 'a,
Map: FnOnce(Resp) -> Output + Send + 'a,
{
/// Convert this future into a boxed, pinned future, erasing its type.
pub fn boxed(self) -> RpcFut<'a, Output> {
Box::pin(self)
self.into_future()
}

async fn do_call(self) -> TransportResult<Output> {
let Self { request, mut connection, map, _pd: PhantomData } = self;
std::future::poll_fn(|cx| connection.poll_ready(cx)).await?;
let serialized_request = request.serialize().map_err(TransportError::ser_err)?;
let response_packet = connection.call(serialized_request.into()).await?;
let ResponsePacket::Single(response) = response_packet else {
panic!("received batch response from single request")
};
try_deserialize_ok(transform_response(response)).map(map)
}
}

impl<Params, Resp, Output, Map> Future for RpcCall<Params, Resp, Output, Map>
impl<'a, Params, Resp, Output, Map> IntoFuture for RpcCall<Params, Resp, Output, Map>
where
Params: RpcParam,
Params: RpcParam + 'a,
Resp: RpcReturn,
Output: 'static,
Map: FnOnce(Resp) -> Output,
Output: 'a,
Map: FnOnce(Resp) -> Output + Send + 'a,
{
type Output = TransportResult<Output>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
trace!(?self.state, "polling RpcCall");

let this = self.get_mut();
let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx)));
type IntoFuture = RpcFut<'a, Output>;
type Output = <Self::IntoFuture as Future>::Output;

Ready(resp.map(this.map.take().expect("polled after completion")))
fn into_future(self) -> Self::IntoFuture {
Box::pin(self.do_call())
}
}
Loading