Skip to content

Commit

Permalink
Merge branch 'grpc-ingest' into triton-build
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Jan 20, 2025
2 parents b07b475 + 9f3d543 commit 7ff1c77
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use {
},
solana_sdk::{pubkey::Pubkey, signature::Signature},
std::{collections::HashMap, marker::PhantomData, sync::Arc},
tokio::time::{sleep, Duration},
tokio::{
task::JoinSet,
time::{sleep, Duration},
},
tracing::{debug, error, warn},
yellowstone_grpc_proto::{
convert_from::{
Expand Down Expand Up @@ -408,7 +411,7 @@ impl<H: MessageHandler> IngestStream<H> {
let config_messages = Arc::clone(&config);

let messages = tokio::spawn(async move {
let mut tasks = FuturesUnordered::new();
let mut tasks = JoinSet::new();
let config = Arc::clone(&config_messages);
let handler = handler.clone();

Expand All @@ -417,7 +420,7 @@ impl<H: MessageHandler> IngestStream<H> {
Some(ids) = msg_rx.recv() => {
for StreamId { id, map } in ids {
if tasks.len() >= config.max_concurrency {
tasks.next().await;
tasks.join_next().await;
}

let handler = handler.clone();
Expand All @@ -426,7 +429,7 @@ impl<H: MessageHandler> IngestStream<H> {

ingest_tasks_total_inc(&config.name, &config.consumer);

tasks.push(tokio::spawn(async move {
tasks.spawn(async move {
let start_time = tokio::time::Instant::now();
let result = handler.handle(map).await.map_err(IngestMessageError::into);
let elapsed_time = start_time.elapsed().as_secs_f64();
Expand Down Expand Up @@ -455,7 +458,7 @@ impl<H: MessageHandler> IngestStream<H> {
}

ingest_tasks_total_dec(&config.name, &config.consumer);
}));
});
}
}
_ = &mut msg_shutdown_rx => {
Expand All @@ -464,13 +467,13 @@ impl<H: MessageHandler> IngestStream<H> {
}
}

while (tasks.next().await).is_some() {}
while (tasks.join_next().await).is_some() {}
});

let ack = tokio::spawn({
let config = Arc::clone(&config);
let mut pending = Vec::new();
let mut tasks = FuturesUnordered::new();
let mut tasks = JoinSet::new();
let handler = Arc::new(Acknowledge::new(Arc::clone(&config), connection.clone()));

async move {
Expand All @@ -484,7 +487,7 @@ impl<H: MessageHandler> IngestStream<H> {

if pending.len() >= config.xack_batch_max_size {
if tasks.len() >= config.ack_concurrency {
tasks.next().await;
tasks.join_next().await;
}

let ids = std::mem::take(&mut pending);
Expand All @@ -493,25 +496,25 @@ impl<H: MessageHandler> IngestStream<H> {

ack_tasks_total_inc(&config.name, &config.consumer);

tasks.push(tokio::spawn(async move {
tasks.spawn(async move {
handler.handle(ids).await;
}));
});

deadline.as_mut().reset(tokio::time::Instant::now() + config.xack_batch_max_idle);
}
}
_ = &mut deadline, if !pending.is_empty() => {
if tasks.len() >= config.ack_concurrency {
tasks.next().await;
tasks.join_next().await;
}
let ids = std::mem::take(&mut pending);
let handler = Arc::clone(&handler);

ack_tasks_total_inc(&config.name, &config.consumer);

tasks.push(tokio::spawn(async move {
tasks.spawn(async move {
handler.handle(ids).await;
}));
});

deadline.as_mut().reset(tokio::time::Instant::now() + config.xack_batch_max_idle);
}
Expand All @@ -526,7 +529,7 @@ impl<H: MessageHandler> IngestStream<H> {
handler.handle(std::mem::take(&mut pending)).await;
}

while (tasks.next().await).is_some() {}
while (tasks.join_next().await).is_some() {}
}
});

Expand Down

0 comments on commit 7ff1c77

Please sign in to comment.