From a3bff425c93367b8c3d829e03c485f9aa58e51d6 Mon Sep 17 00:00:00 2001 From: vovkman Date: Sun, 3 Dec 2023 19:22:10 -0500 Subject: [PATCH] send request with subscribe --- examples/rust/src/bin/client.rs | 6 +----- yellowstone-grpc-client/src/lib.rs | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index ba1f4eb9..39cac8fb 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -524,11 +524,7 @@ async fn geyser_subscribe( request: SubscribeRequest, resub: usize, ) -> anyhow::Result<()> { - let (mut subscribe_tx, mut stream) = client.subscribe().await?; - subscribe_tx - .send(request) - .await - .map_err(GeyserGrpcClientError::SubscribeSendError)?; + let (mut subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?; info!("stream opened"); let mut counter = 0; diff --git a/yellowstone-grpc-client/src/lib.rs b/yellowstone-grpc-client/src/lib.rs index 99e71f1c..11e964ae 100644 --- a/yellowstone-grpc-client/src/lib.rs +++ b/yellowstone-grpc-client/src/lib.rs @@ -1,3 +1,5 @@ +use http::request; + use { bytes::Bytes, futures::{ @@ -173,7 +175,23 @@ impl GeyserGrpcClient { impl Sink, impl Stream>, )> { - let (subscribe_tx, subscribe_rx) = mpsc::unbounded(); + self.subscribe_with_request(None).await + } + + pub async fn subscribe_with_request( + &mut self, + request: Option, + ) -> GeyserGrpcClientResult<( + impl Sink, + impl Stream>, + )> { + let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded(); + if let Some(request) = request { + subscribe_tx + .send(request) + .await + .map_err(GeyserGrpcClientError::SubscribeSendError)?; + } let response: Response> = self.geyser.subscribe(subscribe_rx).await?; Ok((subscribe_tx, response.into_inner()))