From d3a0829147917d82dc3cb61dc62dbf8b8a814c78 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Mon, 30 Dec 2024 15:37:35 +0100 Subject: [PATCH] fix: add metric and unbounded sender for receipt store Signed-off-by: Gustavo Inacio --- crates/service/src/tap.rs | 9 +++++---- crates/service/src/tap/receipt_store.rs | 20 ++++++++++++++++---- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/crates/service/src/tap.rs b/crates/service/src/tap.rs index e6ca2b48..ed7ef101 100644 --- a/crates/service/src/tap.rs +++ b/crates/service/src/tap.rs @@ -10,7 +10,7 @@ use sqlx::PgPool; use tap_core::receipt::checks::ReceiptCheck; use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain}; use tokio::sync::{ - mpsc::{self, Sender}, + mpsc::{self, UnboundedSender}, watch::Receiver, }; use tokio_util::sync::CancellationToken; @@ -31,7 +31,7 @@ const GRACE_PERIOD: u64 = 60; #[derive(Clone)] pub struct IndexerTapContext { domain_separator: Arc, - receipt_producer: Sender, + receipt_producer: UnboundedSender, cancelation_token: CancellationToken, } @@ -60,8 +60,9 @@ impl IndexerTapContext { } pub async fn new(pgpool: PgPool, domain_separator: Eip712Domain) -> Self { - const MAX_RECEIPT_QUEUE_SIZE: usize = 1000; - let (tx, rx) = mpsc::channel(MAX_RECEIPT_QUEUE_SIZE); + // const MAX_RECEIPT_QUEUE_SIZE: usize = 1000; + // let (tx, rx) = mpsc::channel(MAX_RECEIPT_QUEUE_SIZE); + let (tx, rx) = mpsc::unbounded_channel(); let cancelation_token = CancellationToken::new(); let inner = InnerContext { pgpool }; Self::spawn_store_receipt_task(inner, rx, cancelation_token.clone()); diff --git a/crates/service/src/tap/receipt_store.rs b/crates/service/src/tap/receipt_store.rs index 2d284197..7e47cd0e 100644 --- a/crates/service/src/tap/receipt_store.rs +++ b/crates/service/src/tap/receipt_store.rs @@ -3,17 +3,27 @@ use anyhow::anyhow; use bigdecimal::num_bigint::BigInt; +use lazy_static::lazy_static; +use prometheus::{register_gauge, Gauge}; use sqlx::{types::BigDecimal, PgPool}; use tap_core::{ manager::adapters::ReceiptStore, receipt::{state::Checking, ReceiptWithState}, }; use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain}; -use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle}; use tokio_util::sync::CancellationToken; use super::{AdapterError, IndexerTapContext}; +lazy_static! { + pub static ref RECEIPTS_IN_QUEUE: Gauge = register_gauge!( + "indexer_receipts_in_queue_total", + "Total receipts waiting to be stored", + ) + .unwrap(); +} + #[derive(Clone)] pub struct InnerContext { pub pgpool: PgPool, @@ -74,7 +84,7 @@ impl InnerContext { impl IndexerTapContext { pub fn spawn_store_receipt_task( inner_context: InnerContext, - mut receiver: Receiver, + mut receiver: UnboundedReceiver, cancelation_token: CancellationToken, ) -> JoinHandle<()> { const BUFFER_SIZE: usize = 100; @@ -83,7 +93,8 @@ impl IndexerTapContext { let mut buffer = Vec::with_capacity(BUFFER_SIZE); tokio::select! { biased; - _ = receiver.recv_many(&mut buffer, BUFFER_SIZE) => { + size = receiver.recv_many(&mut buffer, BUFFER_SIZE) => { + RECEIPTS_IN_QUEUE.sub(size as f64); if let Err(e) = inner_context.store_receipts(buffer).await { tracing::error!("Failed to store receipts: {}", e); } @@ -104,10 +115,11 @@ impl ReceiptStore for IndexerTapContext { receipt: ReceiptWithState, ) -> Result { let db_receipt = DatabaseReceipt::from_receipt(receipt, &self.domain_separator)?; - self.receipt_producer.send(db_receipt).await.map_err(|e| { + self.receipt_producer.send(db_receipt).map_err(|e| { tracing::error!("Failed to queue receipt for storage: {}", e); anyhow!(e) })?; + RECEIPTS_IN_QUEUE.inc(); // We don't need receipt_ids Ok(0)