From a56e483729ef75bf2f5a5545296cbcebc203a292 Mon Sep 17 00:00:00 2001 From: Qinxuan Chen Date: Fri, 12 Jul 2024 17:26:51 +0800 Subject: [PATCH] feat: add metrics for finalized/unfinalized block rpc requests (#41) * feat: add metrics for finalized/unfinalized block rpc requests * fix * update description --- src/extensions/prometheus/rpc_metrics.rs | 41 ++++++++++++++++++++++++ src/middlewares/methods/block_tag.rs | 23 ++++++++++--- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/extensions/prometheus/rpc_metrics.rs b/src/extensions/prometheus/rpc_metrics.rs index 09217b4..1787476 100644 --- a/src/extensions/prometheus/rpc_metrics.rs +++ b/src/extensions/prometheus/rpc_metrics.rs @@ -38,6 +38,17 @@ impl RpcMetrics { } } + pub fn finalized_query(&self, method: &str) { + if let Self::Prometheus(inner) = self { + inner.finalized_query(method); + } + } + pub fn finalized_miss(&self, method: &str) { + if let Self::Prometheus(inner) = self { + inner.finalized_miss(method); + } + } + pub fn call_metrics(&self) -> Option<(HistogramVec, CounterVec, CounterVec)> { if let Self::Prometheus(inner) = self { return Some(( @@ -57,6 +68,8 @@ pub struct InnerMetrics { closed_session_count: Counter, cache_query_counter: CounterVec, cache_miss_counter: CounterVec, + finalized_query_counter: CounterVec, + finalized_miss_counter: CounterVec, call_times: HistogramVec, calls_started: CounterVec, calls_finished: CounterVec, @@ -76,6 +89,22 @@ impl InnerMetrics { &["method"], ) .unwrap(); + let finalized_miss_counter = CounterVec::new( + Opts::new( + "finalized_miss_counter", + "Total number of finalized block misses of RPC requests with block tag", + ), + &["method"], + ) + .unwrap(); + let finalized_query_counter = CounterVec::new( + Opts::new( + "finalized_query_counter", + "Total number of finalized block queries of RPC requests with block tag", + ), + &["method"], + ) + .unwrap(); let call_times = HistogramVec::new( HistogramOpts::new( "rpc_calls_time", @@ -99,6 +128,8 @@ impl InnerMetrics { let closed_session_count = register(closed_counter, registry).unwrap(); let cache_query_counter = register(cache_query_counter, registry).unwrap(); let cache_miss_counter = register(cache_miss_counter, registry).unwrap(); + let finalized_query_counter = register(finalized_query_counter, registry).unwrap(); + let finalized_miss_counter = register(finalized_miss_counter, registry).unwrap(); let call_times = register(call_times, registry).unwrap(); let calls_started = register(calls_started_counter, registry).unwrap(); @@ -107,6 +138,8 @@ impl InnerMetrics { Self { cache_miss_counter, cache_query_counter, + finalized_miss_counter, + finalized_query_counter, open_session_count, closed_session_count, calls_started, @@ -129,4 +162,12 @@ impl InnerMetrics { fn cache_miss(&self, method: &str) { self.cache_miss_counter.with_label_values(&[method]).inc(); } + + fn finalized_query(&self, method: &str) { + self.finalized_query_counter.with_label_values(&[method]).inc(); + } + + fn finalized_miss(&self, method: &str) { + self.finalized_miss_counter.with_label_values(&[method]).inc(); + } } diff --git a/src/middlewares/methods/block_tag.rs b/src/middlewares/methods/block_tag.rs index 338b742..02e66e9 100644 --- a/src/middlewares/methods/block_tag.rs +++ b/src/middlewares/methods/block_tag.rs @@ -6,6 +6,7 @@ use opentelemetry_semantic_conventions as semconv; use crate::{ extensions::api::EthApi, + extensions::prometheus::{get_rpc_metrics, RpcMetrics}, middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER}, utils::{TypeRegistry, TypeRegistryRef}, }; @@ -15,6 +16,7 @@ use super::cache::BypassCache; pub struct BlockTagMiddleware { api: Arc, index: usize, + metrics: RpcMetrics, } #[async_trait] @@ -24,6 +26,7 @@ impl MiddlewareBuilder for BlockTagMiddlewar extensions: &TypeRegistryRef, ) -> Option>> { let index = method.params.iter().position(|p| p.ty == "BlockTag" && p.inject)?; + let metrics = get_rpc_metrics(extensions).await; let eth_api = extensions .read() @@ -31,13 +34,13 @@ impl MiddlewareBuilder for BlockTagMiddlewar .get::() .expect("EthApi extension not found"); - Some(Box::new(BlockTagMiddleware::new(eth_api, index))) + Some(Box::new(BlockTagMiddleware::new(eth_api, index, metrics))) } } impl BlockTagMiddleware { - pub fn new(api: Arc, index: usize) -> Self { - Self { api, index } + pub fn new(api: Arc, index: usize, metrics: RpcMetrics) -> Self { + Self { api, index, metrics } } async fn replace(&self, mut request: CallRequest, mut context: TypeRegistry) -> (CallRequest, TypeRegistry) { @@ -47,12 +50,14 @@ impl BlockTagMiddleware { // nothing to do here return (request, context); } + self.metrics.finalized_query(&request.method); match param.as_str().unwrap_or_default() { "finalized" => { let finalized_head = self.api.current_finalized_head(); if let Some((_, finalized_number)) = finalized_head { Some(format!("0x{:x}", finalized_number).into()) } else { + self.metrics.finalized_miss(&request.method); // cannot determine finalized context.insert(BypassCache(true)); None @@ -60,12 +65,14 @@ impl BlockTagMiddleware { } "latest" => { // bypass cache for latest block to avoid caching forks + self.metrics.finalized_miss(&request.method); context.insert(BypassCache(true)); let (_, number) = self.api.get_head().read().await; Some(format!("0x{:x}", number).into()) } "earliest" => None, // no need to replace earliest because it's always going to be genesis "pending" | "safe" => { + self.metrics.finalized_miss(&request.method); context.insert(BypassCache(true)); None } @@ -77,9 +84,13 @@ impl BlockTagMiddleware { if let Ok(number) = u64::from_str_radix(hex_number, 16) { if number <= finalized_number { bypass_cache = false; + } else { + self.metrics.finalized_miss(&request.method); } } } + } else { + self.metrics.finalized_miss(&request.method); } if bypass_cache { context.insert(BypassCache(true)); @@ -191,7 +202,11 @@ mod tests { let (context, api) = create_client().await; ( - BlockTagMiddleware::new(Arc::new(api), params.iter().position(|p| p.ty == "BlockTag").unwrap()), + BlockTagMiddleware::new( + Arc::new(api), + params.iter().position(|p| p.ty == "BlockTag").unwrap(), + RpcMetrics::noop(), + ), context, ) }