Skip to content

Commit

Permalink
export metrics tracking download_metadata_json failure with status
Browse files Browse the repository at this point in the history
fix clippy

cleanup
  • Loading branch information
Nagaprasadvr committed Jan 15, 2025
1 parent c235125 commit 53a7898
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
21 changes: 20 additions & 1 deletion grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ lazy_static::lazy_static! {
"download_metadata_inserted_count", "Total number of inserted tasks for download metadata"
).unwrap();

static ref DOWNLOAD_METADATA_FAILED_COUNT: IntGaugeVec = IntGaugeVec::new(
Opts::new("download_metadata_failed_count", "Status of failed tasks for download metadata"),
&["status"]
).unwrap();

static ref INGEST_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("ingest_tasks", "Number of tasks spawned for ingest"),
&["stream", "consumer"]
Expand Down Expand Up @@ -124,6 +129,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(PROGRAM_TRANSFORMER_TASK_STATUS_COUNT);
register!(INGEST_JOB_TIME);
register!(DOWNLOAD_METADATA_INSERTED_COUNT);
register!(DOWNLOAD_METADATA_FAILED_COUNT);
register!(INGEST_TASKS);
register!(ACK_TASKS);
register!(GRPC_TASKS);
Expand Down Expand Up @@ -175,7 +181,10 @@ fn metrics_handler() -> Response<Body> {
error!("could not encode custom metrics: {}", error);
String::new()
});
Response::builder().body(Body::from(metrics)).unwrap()
Response::builder()
.header("content-type", "text/plain")
.body(Body::from(metrics))
.unwrap()
}

fn not_found_handler() -> Response<Body> {
Expand Down Expand Up @@ -258,6 +267,16 @@ pub fn grpc_tasks_total_dec(label: &str, stream: &str) {
GRPC_TASKS.with_label_values(&[label, stream]).dec()
}

pub fn inc_download_metadata_inserted_count() {
DOWNLOAD_METADATA_INSERTED_COUNT.inc();
}

pub fn download_metadata_json_failed_task_inc(status: u16) {
DOWNLOAD_METADATA_FAILED_COUNT
.with_label_values(&[&status.to_string()])
.inc();
}

#[derive(Debug, Clone, Copy)]
pub enum ProgramTransformerTaskStatusKind {
Success,
Expand Down
23 changes: 17 additions & 6 deletions grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ use {
crate::{
config::{ConfigIngestStream, REDIS_STREAM_DATA_KEY},
prom::{
ack_tasks_total_dec, ack_tasks_total_inc, ingest_job_time_set, ingest_tasks_total_dec,
ack_tasks_total_dec, ack_tasks_total_inc, download_metadata_json_failed_task_inc,
inc_download_metadata_inserted_count, ingest_job_time_set, ingest_tasks_total_dec,
ingest_tasks_total_inc, program_transformer_task_status_inc, redis_xack_inc,
redis_xlen_set, redis_xread_inc, ProgramTransformerTaskStatusKind,
},
},
das_core::{DownloadMetadata, DownloadMetadataInfo},
das_core::{
DownloadMetadata, DownloadMetadataInfo, FetchMetadataJsonError, MetadataJsonTaskError,
StatusCode,
},
futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt},
program_transformers::{AccountInfo, ProgramTransformer, TransactionInfo},
redis::{
Expand Down Expand Up @@ -195,14 +199,21 @@ impl MessageHandler for DownloadMetadataJsonHandle {
&self,
input: HashMap<String, RedisValue>,
) -> BoxFuture<'static, Result<(), IngestMessageError>> {
inc_download_metadata_inserted_count();
let download_metadata = Arc::clone(&self.0);

Box::pin(async move {
let info = DownloadMetadataInfo::try_parse_msg(input)?;
download_metadata
.handle_download(&info)
.await
.map_err(Into::into)
download_metadata.handle_download(&info).await.map_err(|e| {
if let MetadataJsonTaskError::Fetch(FetchMetadataJsonError::Response {
status: StatusCode::Code(code),
..
}) = &e
{
download_metadata_json_failed_task_inc(code.as_u16());
}
IngestMessageError::DownloadMetadataJson(e)
})
})
}
}
Expand Down

0 comments on commit 53a7898

Please sign in to comment.