Skip to content

Commit

Permalink
fix: picking dangling pending messages before reading new
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Aug 22, 2024
1 parent bab4678 commit 0c4f872
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 62 deletions.
2 changes: 2 additions & 0 deletions backfill/src/worker/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl TryFrom<PubkeyString> for Pubkey {
}
}

#[derive(Debug)]
pub struct FetchedEncodedTransactionWithStatusMeta(pub EncodedConfirmedTransactionWithStatusMeta);

impl TryFrom<FetchedEncodedTransactionWithStatusMeta> for TransactionInfo {
Expand All @@ -41,6 +42,7 @@ impl TryFrom<FetchedEncodedTransactionWithStatusMeta> for TransactionInfo {
fn try_from(
fetched_transaction: FetchedEncodedTransactionWithStatusMeta,
) -> Result<Self, Self::Error> {
tracing::info!("fetched transaction: {:?}", fetched_transaction);
let mut account_keys = Vec::new();
let encoded_transaction_with_status_meta = fetched_transaction.0;

Expand Down
5 changes: 3 additions & 2 deletions grpc-ingest/src/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
postgres::{create_pool as pg_create_pool, metrics_pgpool, report_pgpool},
prom::{
download_metadata_inserted_total_inc, program_transformer_task_status_inc,
program_transformer_tasks_total_set, ProgramTransformerTaskStatusKind,
program_transformer_tasks_total_set, redis_xack_inc, ProgramTransformerTaskStatusKind,
},
redis::{
metrics_xlen, IngestStream, ProgramTransformerInfo, RedisStream, RedisStreamMessage,
Expand Down Expand Up @@ -58,7 +58,6 @@ fn download_metadata_notifier_v2(
let mut connection = connection.clone();
let stream = stream.clone();
Box::pin(async move {
download_metadata_inserted_total_inc();

let info_bytes = serde_json::to_vec(&info)?;

Expand All @@ -73,6 +72,8 @@ fn download_metadata_notifier_v2(
.query_async(&mut connection)
.await?;

redis_xack_inc(&stream, 1);

Ok(())
})
},
Expand Down
137 changes: 77 additions & 60 deletions grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use {
redis::{
aio::MultiplexedConnection,
streams::{
StreamClaimReply, StreamId, StreamKey, StreamMaxlen, StreamPendingCountReply,
StreamReadOptions, StreamReadReply,
StreamClaimOptions, StreamClaimReply, StreamId, StreamKey, StreamMaxlen,
StreamPendingCountReply, StreamReadOptions, StreamReadReply,
},
AsyncCommands, ErrorKind as RedisErrorKind, RedisResult, Value as RedisValue,
},
Expand Down Expand Up @@ -298,7 +298,10 @@ impl<'a>
.await
{
Ok(response) => {
info!("Acknowledged and deleted message: response={:?}", response);
debug!(
"Acknowledged and deleted message: stream={:?} response={:?} expected={:?}",
&config.name, response, count
);

redis_xack_inc(&config.name, count);
}
Expand Down Expand Up @@ -371,10 +374,10 @@ impl IngestStream {
)
.await
{
error!("Failed to create group: {:?}", e);
error!("redis=xgroup_create stream={} err={:?}", config.name, e);
} else {
debug!(
"Group created successfully: name={}, group={}, consumer={}",
"redis=xgroup_create stream={} group={} consumer={}",
config.name, config.group, config.consumer
);
}
Expand Down Expand Up @@ -448,7 +451,7 @@ impl IngestStream {
let streams = streams_report.clone();

if let Err(e) = report_xlen(connection, streams).await {
error!("Failed to report xlen: {:?}", e);
error!("redis=report_xlen err={:?}", e);
}

sleep(Duration::from_millis(100)).await;
Expand All @@ -461,63 +464,71 @@ impl IngestStream {
let (read_shutdown_tx, read_shutdown_rx) = tokio::sync::oneshot::channel();

tokio::spawn(async move {
debug!("Starting read stream task name={}", config_read.name);
debug!(
"redis=read_stream stream={} Starting read stream task",
config_read.name
);

let mut shutdown_rx = read_shutdown_rx;

// loop {
// if shutdown_rx.try_recv().is_ok() {
// debug!(
// "Shutdown signal received, exiting prefetch loop name={}",
// config_read.name
// );
// break;
// }

// if let Ok(pending) = redis::cmd("XPENDING")
// .arg(&config_read.name)
// .arg(&config_read.group)
// .arg("-")
// .arg("+")
// .arg(config_read.batch_size)
// .arg(&config_read.consumer)
// .query_async::<_, StreamPendingCountReply>(&mut connection_read)
// .await
// {
// if pending.ids.is_empty() {
// debug!(
// "No pending messages stream={} consumer={} group={}",
// config_read.name, config_read.consumer, config_read.group
// );
// break;
// }

// let ids: Vec<&str> = pending.ids.iter().map(|info| info.id.as_str()).collect();
// let claim_opts = StreamClaimOptions::default();

// let claimed: RedisResult<StreamClaimReply> = connection_read
// .xclaim_options(
// &config_read.name,
// &config_read.group,
// &config_read.consumer,
// 20,
// &ids,
// claim_opts,
// )
// .await;

// if let Ok(claimed) = claimed {
// for StreamId { id, map } in claimed.ids {
// executor.push(IngestStreamJob::Process((id, map)));
// }
// }
// }
// }
let mut start = "-".to_owned();
loop {
if shutdown_rx.try_recv().is_ok() {
debug!(
"redis=read_stream stream={} Shutdown signal received, exiting prefetch loop",
config_read.name
);
break;
}

if let Ok(pending) = redis::cmd("XPENDING")
.arg(&config_read.name)
.arg(&config_read.group)
.arg(&start)
.arg("+")
.arg(config_read.batch_size)
.arg(&config_read.consumer)
.query_async::<_, StreamPendingCountReply>(&mut connection_read)
.await
{
if pending.ids.is_empty() {
debug!(
"redis=XPENDING stream={} consumer={} group={} No pending messages",
config_read.name, config_read.consumer, config_read.group
);
break;
}

let ids: Vec<&str> = pending.ids.iter().map(|info| info.id.as_str()).collect();
let claim_opts = StreamClaimOptions::default();

let claimed: RedisResult<StreamClaimReply> = connection_read
.xclaim_options(
&config_read.name,
&config_read.group,
&config_read.consumer,
20,
&ids,
claim_opts,
)
.await;

if let Ok(claimed) = claimed {
for StreamId { id, map } in claimed.ids {
executor.push(IngestStreamJob::Process((id, map)));
}
}

if let Some(last_id) = pending.ids.last() {
start = last_id.id.clone();
}
}
}

loop {
if shutdown_rx.try_recv().is_ok() {
debug!(
"Shutdown signal received, exiting read loop name={}",
"redis=read_stream stream={} Shutdown signal received, exiting read loop",
config_read.name
);
break;
Expand All @@ -535,7 +546,10 @@ impl IngestStream {
match result {
Ok(reply) => {
let count = reply.keys.len();
info!("Reading and processing: count={:?}", count);
debug!(
"redis=xread stream={:?} count={:?}",
&config_read.name, count
);

for StreamKey { key: _, ids } in reply.keys {
for StreamId { id, map } in ids {
Expand All @@ -544,19 +558,22 @@ impl IngestStream {
}
}
Err(err) => {
error!("Error reading from stream: {:?}", err);
error!("redis=xread stream={:?} err={:?}", &config_read.name, err);
}
}
}
});

let control = tokio::spawn(async move {
let mut shutdown_rx: tokio::sync::oneshot::Receiver<()> = shutdown_rx;
debug!("Starting ingest stream name={}", config.name);
debug!(
"redis=ingest_stream stream={} Starting ingest stream",
config.name
);

tokio::select! {
_ = &mut shutdown_rx => {
info!("Shut down ingest stream name={}", config.name);
info!("redis=ingest_stream stream={} Shut down ingest stream", config.name);

let _ = read_shutdown_tx.send(());
let _ = ack_shutdown_tx.send(());
Expand Down

0 comments on commit 0c4f872

Please sign in to comment.