Skip to content

Commit

Permalink
send request with subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
vovkman committed Dec 4, 2023
1 parent 7bd3596 commit a3bff42
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
6 changes: 1 addition & 5 deletions examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,7 @@ async fn geyser_subscribe(
request: SubscribeRequest,
resub: usize,
) -> anyhow::Result<()> {
let (mut subscribe_tx, mut stream) = client.subscribe().await?;
subscribe_tx
.send(request)
.await
.map_err(GeyserGrpcClientError::SubscribeSendError)?;
let (mut subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?;

info!("stream opened");
let mut counter = 0;
Expand Down
20 changes: 19 additions & 1 deletion yellowstone-grpc-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use http::request;

use {
bytes::Bytes,
futures::{
Expand Down Expand Up @@ -173,7 +175,23 @@ impl<F: Interceptor> GeyserGrpcClient<F> {
impl Sink<SubscribeRequest, Error = mpsc::SendError>,
impl Stream<Item = Result<SubscribeUpdate, Status>>,
)> {
let (subscribe_tx, subscribe_rx) = mpsc::unbounded();
self.subscribe_with_request(None).await
}

pub async fn subscribe_with_request(
&mut self,
request: Option<SubscribeRequest>,
) -> GeyserGrpcClientResult<(
impl Sink<SubscribeRequest, Error = mpsc::SendError>,
impl Stream<Item = Result<SubscribeUpdate, Status>>,
)> {
let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded();
if let Some(request) = request {
subscribe_tx
.send(request)
.await
.map_err(GeyserGrpcClientError::SubscribeSendError)?;
}
let response: Response<Streaming<SubscribeUpdate>> =
self.geyser.subscribe(subscribe_rx).await?;
Ok((subscribe_tx, response.into_inner()))
Expand Down

0 comments on commit a3bff42

Please sign in to comment.