diff --git a/crates/rpc-client/src/call.rs b/crates/rpc-client/src/call.rs index 4efa940a7cb..538f74d0bc3 100644 --- a/crates/rpc-client/src/call.rs +++ b/crates/rpc-client/src/call.rs @@ -1,116 +1,15 @@ use alloy_json_rpc::{ - transform_response, try_deserialize_ok, Request, RequestPacket, ResponsePacket, RpcParam, - RpcResult, RpcReturn, + transform_response, try_deserialize_ok, Request, ResponsePacket, RpcParam, RpcReturn, }; use alloy_transport::{BoxTransport, IntoBoxTransport, RpcFut, TransportError, TransportResult}; use core::panic; -use futures::FutureExt; -use serde_json::value::RawValue; use std::{ fmt, - future::Future, + future::{Future, IntoFuture}, marker::PhantomData, - pin::Pin, - task::{self, ready, Poll::Ready}, }; use tower::Service; -/// The states of the [`RpcCall`] future. -#[must_use = "futures do nothing unless you `.await` or poll them"] -#[pin_project::pin_project(project = CallStateProj)] -enum CallState -where - Params: RpcParam, -{ - Prepared { - request: Option>, - connection: BoxTransport, - }, - AwaitingResponse { - #[pin] - fut: >::Future, - }, - Complete, -} - -impl Clone for CallState -where - Params: RpcParam, -{ - fn clone(&self) -> Self { - match self { - Self::Prepared { request, connection } => { - Self::Prepared { request: request.clone(), connection: connection.clone() } - } - _ => panic!("cloned after dispatch"), - } - } -} - -impl fmt::Debug for CallState -where - Params: RpcParam, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(match self { - Self::Prepared { .. } => "Prepared", - Self::AwaitingResponse { .. } => "AwaitingResponse", - Self::Complete => "Complete", - }) - } -} - -impl Future for CallState -where - Params: RpcParam, -{ - type Output = TransportResult>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { - loop { - match self.as_mut().project() { - CallStateProj::Prepared { connection, request } => { - if let Err(e) = - task::ready!(Service::::poll_ready(connection, cx)) - { - self.set(Self::Complete); - return Ready(RpcResult::Err(e)); - } - - let request = request.take().expect("no request"); - debug!(method=%request.meta.method, id=%request.meta.id, "sending request"); - trace!(params_ty=%std::any::type_name::(), ?request, "full request"); - let request = request.serialize(); - let fut = match request { - Ok(request) => { - trace!(request=%request.serialized(), "serialized request"); - connection.call(request.into()) - } - Err(err) => { - trace!(?err, "failed to serialize request"); - self.set(Self::Complete); - return Ready(RpcResult::Err(TransportError::ser_err(err))); - } - }; - self.set(Self::AwaitingResponse { fut }); - } - CallStateProj::AwaitingResponse { fut } => { - let res = match task::ready!(fut.poll(cx)) { - Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)), - Err(e) => Ready(RpcResult::Err(e)), - _ => panic!("received batch response from single request"), - }; - self.set(Self::Complete); - return res; - } - CallStateProj::Complete => { - panic!("Polled after completion"); - } - } - } - } -} - /// A prepared, but unsent, RPC call. /// /// This is a future that will send the request when polled. It contains a @@ -130,26 +29,21 @@ where /// batch request must immediately erase the `Param` type to allow batching of /// requests with different `Param` types, while the `RpcCall` may do so lazily. #[must_use = "futures do nothing unless you `.await` or poll them"] -#[pin_project::pin_project] #[derive(Clone)] -pub struct RpcCall Output> -where - Params: RpcParam, - Map: FnOnce(Resp) -> Output, -{ - #[pin] - state: CallState, - map: Option, +pub struct RpcCall Output> { + request: Request, + connection: BoxTransport, + map: Map, _pd: core::marker::PhantomData (Resp, Output)>, } -impl core::fmt::Debug for RpcCall +impl fmt::Debug for RpcCall where Params: RpcParam, Map: FnOnce(Resp) -> Output, { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("RpcCall").field("state", &self.state).finish() + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RpcCall").finish_non_exhaustive() } } @@ -158,13 +52,11 @@ where Params: RpcParam, { #[doc(hidden)] - pub fn new(req: Request, connection: impl IntoBoxTransport) -> Self { + pub fn new(request: Request, connection: impl IntoBoxTransport) -> Self { Self { - state: CallState::Prepared { - request: Some(req), - connection: connection.into_box_transport(), - }, - map: Some(std::convert::identity), + request, + connection: connection.into_box_transport(), + map: std::convert::identity, _pd: PhantomData, } } @@ -193,24 +85,16 @@ where where NewMap: FnOnce(Resp) -> NewOutput, { - RpcCall { state: self.state, map: Some(map), _pd: PhantomData } + RpcCall { request: self.request, connection: self.connection, map, _pd: PhantomData } } /// Returns `true` if the request is a subscription. - /// - /// # Panics - /// - /// Panics if called after the request has been sent. pub fn is_subscription(&self) -> bool { self.request().meta.is_subscription() } /// Set the request to be a non-standard subscription (i.e. not /// "eth_subscribe"). - /// - /// # Panics - /// - /// Panics if called after the request has been sent. pub fn set_is_subscription(&mut self) { self.request_mut().meta.set_is_subscription(); } @@ -224,49 +108,28 @@ where /// /// This is useful for modifying the params after the request has been /// prepared. - /// - /// # Panics - /// - /// Panics if called after the request has been sent. pub fn params(&mut self) -> &mut Params { &mut self.request_mut().params } /// Returns a reference to the request. - /// - /// # Panics - /// - /// Panics if called after the request has been sent. pub fn request(&self) -> &Request { - let CallState::Prepared { request, .. } = &self.state else { - panic!("Cannot get request after request has been sent"); - }; - request.as_ref().expect("no request in prepared") + &self.request } /// Returns a mutable reference to the request. - /// - /// # Panics - /// - /// Panics if called after the request has been sent. pub fn request_mut(&mut self) -> &mut Request { - let CallState::Prepared { request, .. } = &mut self.state else { - panic!("Cannot get request after request has been sent"); - }; - request.as_mut().expect("no request in prepared") + &mut self.request } /// Map the params of the request into a new type. pub fn map_params( self, - map: impl Fn(Params) -> NewParams, + map: impl FnOnce(Params) -> NewParams, ) -> RpcCall { - let CallState::Prepared { request, connection } = self.state else { - panic!("Cannot get request after request has been sent"); - }; - let request = request.expect("no request in prepared").map_params(map); RpcCall { - state: CallState::Prepared { request: Some(request), connection }, + request: self.request.map_params(map), + connection: self.connection, map: self.map, _pd: PhantomData, } @@ -285,13 +148,9 @@ where /// /// Panics if called after the request has been polled. pub fn into_owned_params(self) -> RpcCall { - let CallState::Prepared { request, connection } = self.state else { - panic!("Cannot get params after request has been sent"); - }; - let request = request.expect("no request in prepared").into_owned_params(); - RpcCall { - state: CallState::Prepared { request: Some(request), connection }, + request: self.request.into_owned_params(), + connection: self.connection, map: self.map, _pd: PhantomData, } @@ -302,30 +161,37 @@ impl<'a, Params, Resp, Output, Map> RpcCall where Params: RpcParam + 'a, Resp: RpcReturn, - Output: 'static, + Output: 'a, Map: FnOnce(Resp) -> Output + Send + 'a, { /// Convert this future into a boxed, pinned future, erasing its type. pub fn boxed(self) -> RpcFut<'a, Output> { - Box::pin(self) + self.into_future() + } + + async fn do_call(self) -> TransportResult { + let Self { request, mut connection, map, _pd: PhantomData } = self; + std::future::poll_fn(|cx| connection.poll_ready(cx)).await?; + let serialized_request = request.serialize().map_err(TransportError::ser_err)?; + let response_packet = connection.call(serialized_request.into()).await?; + let ResponsePacket::Single(response) = response_packet else { + panic!("received batch response from single request") + }; + try_deserialize_ok(transform_response(response)).map(map) } } -impl Future for RpcCall +impl<'a, Params, Resp, Output, Map> IntoFuture for RpcCall where - Params: RpcParam, + Params: RpcParam + 'a, Resp: RpcReturn, - Output: 'static, - Map: FnOnce(Resp) -> Output, + Output: 'a, + Map: FnOnce(Resp) -> Output + Send + 'a, { - type Output = TransportResult; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { - trace!(?self.state, "polling RpcCall"); - - let this = self.get_mut(); - let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx))); + type IntoFuture = RpcFut<'a, Output>; + type Output = ::Output; - Ready(resp.map(this.map.take().expect("polled after completion"))) + fn into_future(self) -> Self::IntoFuture { + Box::pin(self.do_call()) } }