From 0c4f872c56c504e845d4c779b6cffe83d9eaea88 Mon Sep 17 00:00:00 2001
From: Kyle Espinola
Date: Thu, 22 Aug 2024 21:17:33 +0200
Subject: [PATCH] fix: picking dangling pending messages before reading new

---
 backfill/src/worker/transaction.rs |   2 +
 grpc-ingest/src/ingester.rs        |   5 +-
 grpc-ingest/src/redis.rs           | 137 ++++++++++++++++-------------
 3 files changed, 82 insertions(+), 62 deletions(-)

diff --git a/backfill/src/worker/transaction.rs b/backfill/src/worker/transaction.rs
index c047cffe4..3bb34b4b0 100644
--- a/backfill/src/worker/transaction.rs
+++ b/backfill/src/worker/transaction.rs
@@ -33,6 +33,7 @@ impl TryFrom for Pubkey {
     }
 }
 
+#[derive(Debug)]
 pub struct FetchedEncodedTransactionWithStatusMeta(pub EncodedConfirmedTransactionWithStatusMeta);
 
 impl TryFrom<FetchedEncodedTransactionWithStatusMeta> for TransactionInfo {
@@ -41,6 +42,7 @@
     fn try_from(
         fetched_transaction: FetchedEncodedTransactionWithStatusMeta,
     ) -> Result<Self, Self::Error> {
+        tracing::info!("fetched transaction: {:?}", fetched_transaction);
         let mut account_keys = Vec::new();
         let encoded_transaction_with_status_meta = fetched_transaction.0;

diff --git a/grpc-ingest/src/ingester.rs b/grpc-ingest/src/ingester.rs
index 54391efc9..7af7bd44a 100644
--- a/grpc-ingest/src/ingester.rs
+++ b/grpc-ingest/src/ingester.rs
@@ -5,7 +5,7 @@ use {
     postgres::{create_pool as pg_create_pool, metrics_pgpool, report_pgpool},
     prom::{
         download_metadata_inserted_total_inc, program_transformer_task_status_inc,
-        program_transformer_tasks_total_set, ProgramTransformerTaskStatusKind,
+        program_transformer_tasks_total_set, redis_xack_inc, ProgramTransformerTaskStatusKind,
     },
     redis::{
         metrics_xlen, IngestStream, ProgramTransformerInfo, RedisStream, RedisStreamMessage,
@@ -58,7 +58,6 @@ fn download_metadata_notifier_v2(
             let mut connection = connection.clone();
             let stream = stream.clone();
             Box::pin(async move {
-                download_metadata_inserted_total_inc();
 
                 let info_bytes = serde_json::to_vec(&info)?;
@@ -73,6 +72,8 @@ fn download_metadata_notifier_v2(
                     .query_async(&mut connection)
                     .await?;
 
+                redis_xack_inc(&stream, 1);
+
                 Ok(())
             })
         },
diff --git a/grpc-ingest/src/redis.rs b/grpc-ingest/src/redis.rs
index 471480fcc..268af0113 100644
--- a/grpc-ingest/src/redis.rs
+++ b/grpc-ingest/src/redis.rs
@@ -15,8 +15,8 @@ use {
     redis::{
         aio::MultiplexedConnection,
         streams::{
-            StreamClaimReply, StreamId, StreamKey, StreamMaxlen, StreamPendingCountReply,
-            StreamReadOptions, StreamReadReply,
+            StreamClaimOptions, StreamClaimReply, StreamId, StreamKey, StreamMaxlen,
+            StreamPendingCountReply, StreamReadOptions, StreamReadReply,
         },
         AsyncCommands, ErrorKind as RedisErrorKind, RedisResult, Value as RedisValue,
     },
@@ -298,7 +298,10 @@ impl<'a>
                 .await
             {
                 Ok(response) => {
-                    info!("Acknowledged and deleted message: response={:?}", response);
+                    debug!(
+                        "Acknowledged and deleted message: stream={:?} response={:?} expected={:?}",
+                        &config.name, response, count
+                    );
 
                     redis_xack_inc(&config.name, count);
                 }
@@ -371,10 +374,10 @@ impl IngestStream {
                 )
                 .await
             {
-                error!("Failed to create group: {:?}", e);
+                error!("redis=xgroup_create stream={} err={:?}", config.name, e);
             } else {
                 debug!(
-                    "Group created successfully: name={}, group={}, consumer={}",
+                    "redis=xgroup_create stream={} group={} consumer={}",
                     config.name, config.group, config.consumer
                 );
             }
@@ -448,7 +451,7 @@ impl IngestStream {
                 let streams = streams_report.clone();
 
                 if let Err(e) = report_xlen(connection, streams).await {
-                    error!("Failed to report xlen: {:?}", e);
+                    error!("redis=report_xlen err={:?}", e);
                 }
                 sleep(Duration::from_millis(100)).await;
@@ -461,63 +464,71 @@ impl IngestStream {
         let (read_shutdown_tx, read_shutdown_rx) = tokio::sync::oneshot::channel();
 
         tokio::spawn(async move {
-            debug!("Starting read stream task name={}", config_read.name);
+            debug!(
+                "redis=read_stream stream={} Starting read stream task",
+                config_read.name
+            );
 
             let mut shutdown_rx = read_shutdown_rx;
 
-            // loop {
-            //     if shutdown_rx.try_recv().is_ok() {
-            //         debug!(
-            //             "Shutdown signal received, exiting prefetch loop name={}",
-            //             config_read.name
-            //         );
-            //         break;
-            //     }
-
-            //     if let Ok(pending) = redis::cmd("XPENDING")
-            //         .arg(&config_read.name)
-            //         .arg(&config_read.group)
-            //         .arg("-")
-            //         .arg("+")
-            //         .arg(config_read.batch_size)
-            //         .arg(&config_read.consumer)
-            //         .query_async::<_, StreamPendingCountReply>(&mut connection_read)
-            //         .await
-            //     {
-            //         if pending.ids.is_empty() {
-            //             debug!(
-            //                 "No pending messages stream={} consumer={} group={}",
-            //                 config_read.name, config_read.consumer, config_read.group
-            //             );
-            //             break;
-            //         }
-
-            //         let ids: Vec<&str> = pending.ids.iter().map(|info| info.id.as_str()).collect();
-            //         let claim_opts = StreamClaimOptions::default();
-
-            //         let claimed: RedisResult<StreamClaimReply> = connection_read
-            //             .xclaim_options(
-            //                 &config_read.name,
-            //                 &config_read.group,
-            //                 &config_read.consumer,
-            //                 20,
-            //                 &ids,
-            //                 claim_opts,
-            //             )
-            //             .await;
-
-            //         if let Ok(claimed) = claimed {
-            //             for StreamId { id, map } in claimed.ids {
-            //                 executor.push(IngestStreamJob::Process((id, map)));
-            //             }
-            //         }
-            //     }
-            // }
+            let mut start = "-".to_owned();
+            loop {
+                if shutdown_rx.try_recv().is_ok() {
+                    debug!(
+                        "redis=read_stream stream={} Shutdown signal received, exiting prefetch loop",
+                        config_read.name
+                    );
+                    break;
+                }
+
+                if let Ok(pending) = redis::cmd("XPENDING")
+                    .arg(&config_read.name)
+                    .arg(&config_read.group)
+                    .arg(&start)
+                    .arg("+")
+                    .arg(config_read.batch_size)
+                    .arg(&config_read.consumer)
+                    .query_async::<_, StreamPendingCountReply>(&mut connection_read)
+                    .await
+                {
+                    if pending.ids.is_empty() {
+                        debug!(
+                            "redis=XPENDING stream={} consumer={} group={} No pending messages",
+                            config_read.name, config_read.consumer, config_read.group
+                        );
+                        break;
+                    }
+
+                    let ids: Vec<&str> = pending.ids.iter().map(|info| info.id.as_str()).collect();
+                    let claim_opts = StreamClaimOptions::default();
+
+                    let claimed: RedisResult<StreamClaimReply> = connection_read
+                        .xclaim_options(
+                            &config_read.name,
+                            &config_read.group,
+                            &config_read.consumer,
+                            20,
+                            &ids,
+                            claim_opts,
+                        )
+                        .await;
+
+                    if let Ok(claimed) = claimed {
+                        for StreamId { id, map } in claimed.ids {
+                            executor.push(IngestStreamJob::Process((id, map)));
+                        }
+                    }
+
+                    if let Some(last_id) = pending.ids.last() {
+                        start = last_id.id.clone();
+                    }
+                }
+            }
 
             loop {
                 if shutdown_rx.try_recv().is_ok() {
                     debug!(
-                        "Shutdown signal received, exiting read loop name={}",
+                        "redis=read_stream stream={} Shutdown signal received, exiting read loop",
                         config_read.name
                     );
                     break;
                 }
@@ -535,7 +546,10 @@ impl IngestStream {
                 match result {
                     Ok(reply) => {
                         let count = reply.keys.len();
-                        info!("Reading and processing: count={:?}", count);
+                        debug!(
+                            "redis=xread stream={:?} count={:?}",
+                            &config_read.name, count
+                        );
 
                         for StreamKey { key: _, ids } in reply.keys {
                             for StreamId { id, map } in ids {
@@ -544,7 +558,7 @@ impl IngestStream {
                         }
                     }
                     Err(err) => {
-                        error!("Error reading from stream: {:?}", err);
+                        error!("redis=xread stream={:?} err={:?}", &config_read.name, err);
                    }
                }
            }
@@ -552,11 +566,14 @@ impl IngestStream {
         let control = tokio::spawn(async move {
             let mut shutdown_rx: tokio::sync::oneshot::Receiver<()> = shutdown_rx;
 
-            debug!("Starting ingest stream name={}", config.name);
+            debug!(
+                "redis=ingest_stream stream={} Starting ingest stream",
+                config.name
+            );
 
             tokio::select! {
                 _ = &mut shutdown_rx => {
-                    info!("Shut down ingest stream name={}", config.name);
+                    info!("redis=ingest_stream stream={} Shut down ingest stream", config.name);
 
                     let _ = read_shutdown_tx.send(());
                     let _ = ack_shutdown_tx.send(());
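
Reviewer note (not part of the patch): a minimal, self-contained sketch of the claim-before-read pass that the new prefetch loop performs, using the same redis-rs calls the patch relies on (XPENDING via redis::cmd, then xclaim_options). The drain_pending helper, its parameters, and the handle callback are illustrative names, not code from this repository.

// Sketch, assuming a redis-rs MultiplexedConnection with the "streams" feature.
// Before a consumer starts reading new entries, it pages through its own pending
// entries (XPENDING) and re-claims them (XCLAIM) so messages left dangling by a
// previous run are processed instead of being lost.
use redis::{
    aio::MultiplexedConnection,
    streams::{StreamClaimOptions, StreamClaimReply, StreamId, StreamPendingCountReply},
    AsyncCommands, RedisResult,
};

async fn drain_pending(
    conn: &mut MultiplexedConnection,
    stream: &str,
    group: &str,
    consumer: &str,
    batch_size: usize,
    mut handle: impl FnMut(StreamId),
) -> RedisResult<()> {
    // Start from the lowest possible ID and advance past each page we have seen.
    let mut start = "-".to_owned();

    loop {
        // XPENDING <stream> <group> <start> + <count> <consumer>
        let pending: StreamPendingCountReply = redis::cmd("XPENDING")
            .arg(stream)
            .arg(group)
            .arg(&start)
            .arg("+")
            .arg(batch_size)
            .arg(consumer)
            .query_async(conn)
            .await?;

        if pending.ids.is_empty() {
            // Nothing dangling for this consumer; fall through to normal reads.
            break;
        }

        let ids: Vec<&str> = pending.ids.iter().map(|info| info.id.as_str()).collect();

        // Re-claim the entries (20 ms min-idle-time, mirroring the patch) so they
        // are redelivered to this consumer and can be processed again.
        let claimed: StreamClaimReply = conn
            .xclaim_options(stream, group, consumer, 20, &ids, StreamClaimOptions::default())
            .await?;

        for entry in claimed.ids {
            handle(entry);
        }

        // Page forward from the last pending ID returned, as the patch does.
        if let Some(last) = pending.ids.last() {
            start = last.id.clone();
        }
    }

    Ok(())
}

In the patch this logic runs inside the spawned read task, before the normal read loop, so claimed entries are pushed to the same executor as newly read ones rather than being left pending.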