Skip to content

Commit

Permalink
feat: add metrics for finalized/unfinalized block rpc requests (#41)
Browse files Browse the repository at this point in the history
* feat: add metrics for finalized/unfinalized block rpc requests

* fix

* update description
  • Loading branch information
koushiro authored Jul 12, 2024
1 parent b12b2c8 commit a56e483
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
41 changes: 41 additions & 0 deletions src/extensions/prometheus/rpc_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ impl RpcMetrics {
}
}

pub fn finalized_query(&self, method: &str) {
if let Self::Prometheus(inner) = self {
inner.finalized_query(method);
}
}
pub fn finalized_miss(&self, method: &str) {
if let Self::Prometheus(inner) = self {
inner.finalized_miss(method);
}
}

pub fn call_metrics(&self) -> Option<(HistogramVec, CounterVec<U64>, CounterVec<U64>)> {
if let Self::Prometheus(inner) = self {
return Some((
Expand All @@ -57,6 +68,8 @@ pub struct InnerMetrics {
closed_session_count: Counter<U64>,
cache_query_counter: CounterVec<U64>,
cache_miss_counter: CounterVec<U64>,
finalized_query_counter: CounterVec<U64>,
finalized_miss_counter: CounterVec<U64>,
call_times: HistogramVec,
calls_started: CounterVec<U64>,
calls_finished: CounterVec<U64>,
Expand All @@ -76,6 +89,22 @@ impl InnerMetrics {
&["method"],
)
.unwrap();
let finalized_miss_counter = CounterVec::new(
Opts::new(
"finalized_miss_counter",
"Total number of finalized block misses of RPC requests with block tag",
),
&["method"],
)
.unwrap();
let finalized_query_counter = CounterVec::new(
Opts::new(
"finalized_query_counter",
"Total number of finalized block queries of RPC requests with block tag",
),
&["method"],
)
.unwrap();
let call_times = HistogramVec::new(
HistogramOpts::new(
"rpc_calls_time",
Expand All @@ -99,6 +128,8 @@ impl InnerMetrics {
let closed_session_count = register(closed_counter, registry).unwrap();
let cache_query_counter = register(cache_query_counter, registry).unwrap();
let cache_miss_counter = register(cache_miss_counter, registry).unwrap();
let finalized_query_counter = register(finalized_query_counter, registry).unwrap();
let finalized_miss_counter = register(finalized_miss_counter, registry).unwrap();

let call_times = register(call_times, registry).unwrap();
let calls_started = register(calls_started_counter, registry).unwrap();
Expand All @@ -107,6 +138,8 @@ impl InnerMetrics {
Self {
cache_miss_counter,
cache_query_counter,
finalized_miss_counter,
finalized_query_counter,
open_session_count,
closed_session_count,
calls_started,
Expand All @@ -129,4 +162,12 @@ impl InnerMetrics {
fn cache_miss(&self, method: &str) {
self.cache_miss_counter.with_label_values(&[method]).inc();
}

fn finalized_query(&self, method: &str) {
self.finalized_query_counter.with_label_values(&[method]).inc();
}

fn finalized_miss(&self, method: &str) {
self.finalized_miss_counter.with_label_values(&[method]).inc();
}
}
23 changes: 19 additions & 4 deletions src/middlewares/methods/block_tag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use opentelemetry_semantic_conventions as semconv;

use crate::{
extensions::api::EthApi,
extensions::prometheus::{get_rpc_metrics, RpcMetrics},
middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER},
utils::{TypeRegistry, TypeRegistryRef},
};
Expand All @@ -15,6 +16,7 @@ use super::cache::BypassCache;
pub struct BlockTagMiddleware {
api: Arc<EthApi>,
index: usize,
metrics: RpcMetrics,
}

#[async_trait]
Expand All @@ -24,20 +26,21 @@ impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for BlockTagMiddlewar
extensions: &TypeRegistryRef,
) -> Option<Box<dyn Middleware<CallRequest, CallResult>>> {
let index = method.params.iter().position(|p| p.ty == "BlockTag" && p.inject)?;
let metrics = get_rpc_metrics(extensions).await;

let eth_api = extensions
.read()
.await
.get::<EthApi>()
.expect("EthApi extension not found");

Some(Box::new(BlockTagMiddleware::new(eth_api, index)))
Some(Box::new(BlockTagMiddleware::new(eth_api, index, metrics)))
}
}

impl BlockTagMiddleware {
pub fn new(api: Arc<EthApi>, index: usize) -> Self {
Self { api, index }
pub fn new(api: Arc<EthApi>, index: usize, metrics: RpcMetrics) -> Self {
Self { api, index, metrics }
}

async fn replace(&self, mut request: CallRequest, mut context: TypeRegistry) -> (CallRequest, TypeRegistry) {
Expand All @@ -47,25 +50,29 @@ impl BlockTagMiddleware {
// nothing to do here
return (request, context);
}
self.metrics.finalized_query(&request.method);
match param.as_str().unwrap_or_default() {
"finalized" => {
let finalized_head = self.api.current_finalized_head();
if let Some((_, finalized_number)) = finalized_head {
Some(format!("0x{:x}", finalized_number).into())
} else {
self.metrics.finalized_miss(&request.method);
// cannot determine finalized
context.insert(BypassCache(true));
None
}
}
"latest" => {
// bypass cache for latest block to avoid caching forks
self.metrics.finalized_miss(&request.method);
context.insert(BypassCache(true));
let (_, number) = self.api.get_head().read().await;
Some(format!("0x{:x}", number).into())
}
"earliest" => None, // no need to replace earliest because it's always going to be genesis
"pending" | "safe" => {
self.metrics.finalized_miss(&request.method);
context.insert(BypassCache(true));
None
}
Expand All @@ -77,9 +84,13 @@ impl BlockTagMiddleware {
if let Ok(number) = u64::from_str_radix(hex_number, 16) {
if number <= finalized_number {
bypass_cache = false;
} else {
self.metrics.finalized_miss(&request.method);
}
}
}
} else {
self.metrics.finalized_miss(&request.method);
}
if bypass_cache {
context.insert(BypassCache(true));
Expand Down Expand Up @@ -191,7 +202,11 @@ mod tests {
let (context, api) = create_client().await;

(
BlockTagMiddleware::new(Arc::new(api), params.iter().position(|p| p.ty == "BlockTag").unwrap()),
BlockTagMiddleware::new(
Arc::new(api),
params.iter().position(|p| p.ty == "BlockTag").unwrap(),
RpcMetrics::noop(),
),
context,
)
}
Expand Down

0 comments on commit a56e483

Please sign in to comment.