diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 4c1b176a..656e0f33 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -3,8 +3,9 @@ use { config::{ConfigBlockFailAction, ConfigGrpc}, filters::{Filter, FilterAccountsDataSlice}, prom::{ - self, CONNECTIONS_TOTAL, GEYSER_LOOP_HISTOGRAM, MESSAGE_QUEUE_SIZE, - SNAPSHOT_MESSAGE_QUEUE_SIZE, + self, BLOCK_UPDATE_HISTOGRAM, CONNECTIONS_TOTAL, GEYSER_LOOP_HISTOGRAM, + MESSAGE_QUEUE_SIZE, REMOVE_OUTDATED_HISTOGRAM, SNAPSHOT_MESSAGE_QUEUE_SIZE, + UPDATE_RECONSTRUCTION_HISTOGRAM, }, version::GrpcVersionInfo, }, @@ -815,73 +816,93 @@ impl GrpcService { }); MESSAGE_QUEUE_SIZE.dec(); - // Update blocks info - if let Some(blocks_meta_tx) = &blocks_meta_tx { - if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) { - let _ = blocks_meta_tx.send(message.clone()); + { + let block_update_start = Instant::now(); + let _block_update_guard = scopeguard::guard((), |_| { + BLOCK_UPDATE_HISTOGRAM.observe(block_update_start.elapsed().as_millis() as f64); + }); + + // Update blocks info + if let Some(blocks_meta_tx) = &blocks_meta_tx { + if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) { + let _ = blocks_meta_tx.send(message.clone()); + } } } // Remove outdated block reconstruction info - match &message { - // On startup we can receive few Confirmed/Finalized slots without BlockMeta message - // With saved first Processed slot we can ignore errors caused by startup process - Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => { - processed_first_slot = Some(msg.slot); - } - Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => { - // keep extra 10 slots - if let Some(msg_slot) = msg.slot.checked_sub(10) { - loop { - match messages.keys().next().cloned() { - Some(slot) if slot < msg_slot => { - if let Some(slot_messages) = messages.remove(&slot) { - match processed_first_slot { - Some(processed_first) if slot <= processed_first => continue, - None => continue, - _ => {} - } - - if !slot_messages.sealed && slot_messages.finalized_at.is_some() { - let mut reasons = vec![]; - if let Some(block_meta) = slot_messages.block_meta { - let block_txn_count = block_meta.executed_transaction_count as usize; - let msg_txn_count = slot_messages.transactions.len(); - if block_txn_count != msg_txn_count { - reasons.push("InvalidTxnCount"); - error!("failed to reconstruct #{slot} -- tx count: {block_txn_count} vs {msg_txn_count}"); - } - let block_entries_count = block_meta.entries_count as usize; - let msg_entries_count = slot_messages.entries.len(); - if block_entries_count != msg_entries_count { - reasons.push("InvalidEntriesCount"); - error!("failed to reconstruct #{slot} -- entries count: {block_entries_count} vs {msg_entries_count}"); - } - } else { - reasons.push("NoBlockMeta"); + { + let remove_outdated_start = Instant::now(); + let _remove_outdated_guard = scopeguard::guard((), |_| { + REMOVE_OUTDATED_HISTOGRAM.observe(remove_outdated_start.elapsed().as_millis() as f64); + }); + + match &message { + // On startup we can receive few Confirmed/Finalized slots without BlockMeta message + // With saved first Processed slot we can ignore errors caused by startup process + Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => { + processed_first_slot = Some(msg.slot); + } + Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => { + // keep extra 10 slots + if let Some(msg_slot) = msg.slot.checked_sub(10) { + loop { + match messages.keys().next().cloned() { + Some(slot) if slot < msg_slot => { + if let Some(slot_messages) = messages.remove(&slot) { + match processed_first_slot { + Some(processed_first) if slot <= processed_first => continue, + None => continue, + _ => {} } - let reason = reasons.join(","); - prom::update_invalid_blocks(format!("failed reconstruct {reason}")); - match block_fail_action { - ConfigBlockFailAction::Log => { - error!("failed reconstruct #{slot} {reason}"); + if !slot_messages.sealed && slot_messages.finalized_at.is_some() { + let mut reasons = vec![]; + if let Some(block_meta) = slot_messages.block_meta { + let block_txn_count = block_meta.executed_transaction_count as usize; + let msg_txn_count = slot_messages.transactions.len(); + if block_txn_count != msg_txn_count { + reasons.push("InvalidTxnCount"); + error!("failed to reconstruct #{slot} -- tx count: {block_txn_count} vs {msg_txn_count}"); + } + let block_entries_count = block_meta.entries_count as usize; + let msg_entries_count = slot_messages.entries.len(); + if block_entries_count != msg_entries_count { + reasons.push("InvalidEntriesCount"); + error!("failed to reconstruct #{slot} -- entries count: {block_entries_count} vs {msg_entries_count}"); + } + } else { + reasons.push("NoBlockMeta"); } - ConfigBlockFailAction::Panic => { - panic!("failed reconstruct #{slot} {reason}"); + let reason = reasons.join(","); + + prom::update_invalid_blocks(format!("failed reconstruct {reason}")); + match block_fail_action { + ConfigBlockFailAction::Log => { + error!("failed reconstruct #{slot} {reason}"); + } + ConfigBlockFailAction::Panic => { + panic!("failed reconstruct #{slot} {reason}"); + } } } } } + _ => break, } - _ => break, } } } + _ => {} } - _ => {} } + { + let update_reconstruction_start = Instant::now(); + let _update_reconstruction_guard = scopeguard::guard((), |_| { + UPDATE_RECONSTRUCTION_HISTOGRAM.observe(update_reconstruction_start.elapsed().as_millis() as f64); + }); + // Update block reconstruction info let slot_messages = messages.entry(message.get_slot()).or_default(); if !matches!(message, Message::Slot(_)) { @@ -1043,6 +1064,7 @@ impl GrpcService { } } } + } () = &mut processed_sleep => { if !processed_messages.is_empty() { let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into())); diff --git a/yellowstone-grpc-geyser/src/prom.rs b/yellowstone-grpc-geyser/src/prom.rs index 7fca149f..f204e3eb 100644 --- a/yellowstone-grpc-geyser/src/prom.rs +++ b/yellowstone-grpc-geyser/src/prom.rs @@ -53,6 +53,16 @@ lazy_static::lazy_static! { pub static ref CONNECTIONS_TOTAL: IntGauge = IntGauge::new( "connections_total", "Total number of connections to GRPC service" ).unwrap(); + + pub static ref BLOCK_UPDATE_HISTOGRAM: Histogram = Histogram::with_opts( + HistogramOpts::new("block_update_histogram", "Processing loop time") + ).unwrap(); + pub static ref REMOVE_OUTDATED_HISTOGRAM: Histogram = Histogram::with_opts( + HistogramOpts::new("remove_outdated_histogram", "Processing loop time") + ).unwrap(); + pub static ref UPDATE_RECONSTRUCTION_HISTOGRAM: Histogram = Histogram::with_opts( + HistogramOpts::new("update_reconstruction_histogram", "Processing loop time") + ).unwrap(); } #[derive(Debug)] @@ -79,6 +89,9 @@ impl PrometheusService { register!(INCOMING_MESSAGES_COUNTER); register!(SNAPSHOT_MESSAGE_QUEUE_SIZE); register!(GEYSER_LOOP_HISTOGRAM); + register!(BLOCK_UPDATE_HISTOGRAM); + register!(REMOVE_OUTDATED_HISTOGRAM); + register!(UPDATE_RECONSTRUCTION_HISTOGRAM); VERSION .with_label_values(&[