From 081d2e8cce8bb5671c453d79e2d92280d5a661c0 Mon Sep 17 00:00:00 2001 From: Petko Pavlovski Date: Mon, 19 Feb 2024 14:58:46 +0000 Subject: [PATCH] feat: Add high level message stats query (#32) --- src/db/resolver.rs | 260 ++++++++++++++++++++++++++++++++++++++-- src/server/model/mod.rs | 18 ++- 2 files changed, 265 insertions(+), 13 deletions(-) diff --git a/src/db/resolver.rs b/src/db/resolver.rs index 7407a1f..7d50964 100644 --- a/src/db/resolver.rs +++ b/src/db/resolver.rs @@ -1,7 +1,7 @@ -use async_graphql::OutputType; +use async_graphql::{OutputType, SimpleObject}; use chrono::Utc; use serde::{de::DeserializeOwned, Serialize}; -use sqlx::{types::Json, PgPool, Row as SqliteRow}; +use sqlx::{types::Json, FromRow, PgPool, Row as SqliteRow}; use std::ops::Deref; use tracing::trace; @@ -20,6 +20,14 @@ pub struct MessageID { id: i64, } +#[allow(dead_code)] +#[derive(FromRow, SimpleObject, Serialize, Debug, Clone)] +pub struct IndexerStats { + graph_account: String, + message_count: i64, + subgraphs_count: i64, +} + // Define graphql type for the Row in Messages impl Row { pub fn get_graphql_row(&self) -> GraphQLRow { @@ -254,16 +262,63 @@ pub async fn list_active_indexers( Ok(rows) } +pub async fn get_indexer_stats( + pool: &PgPool, + indexers: Option>, + from_timestamp: i64, +) -> Result, anyhow::Error> { + let base_query = " + SELECT + message->>'graph_account' as graph_account, + COUNT(*) as message_count, + COUNT(DISTINCT message->>'identifier') as subgraphs_count -- Updated field name + FROM messages + WHERE (CAST(message->>'nonce' AS BIGINT)) > $1"; + + let mut query = String::from(base_query); + + if let Some(ref idxs) = indexers { + let placeholders = idxs + .iter() + .enumerate() + .map(|(i, _)| format!("${}", i + 2)) + .collect::>() + .join(","); + query.push_str(&format!( + " AND (message->>'graph_account') IN ({})", + placeholders + )); + } + + query.push_str(" GROUP BY graph_account"); + + let mut dynamic_query = sqlx::query_as::<_, IndexerStats>(&query).bind(from_timestamp); + + if let Some(indexers) = indexers { + for account in indexers { + dynamic_query = dynamic_query.bind(account); + } + } + + let stats = dynamic_query + .fetch_all(pool) + .await + .map_err(anyhow::Error::new)?; + + Ok(stats) +} + #[cfg(test)] mod tests { - use super::*; use crate::message_types::PublicPoiMessage; + + use super::*; use sqlx::PgPool; - async fn insert_test_data(pool: &PgPool, entries: Vec<(i64, &str)>) { - for (nonce, graph_account) in entries { + async fn insert_test_data(pool: &PgPool, entries: Vec<(i64, &str, &str)>) { + for (nonce, graph_account, identifier) in entries { let message = PublicPoiMessage { - identifier: "QmTamam".to_string(), + identifier: identifier.to_string(), content: "0xText".to_string(), nonce: nonce.try_into().unwrap(), network: "testnet".to_string(), @@ -282,7 +337,11 @@ mod tests { async fn test_list_active_indexers_without_indexers(pool: PgPool) { insert_test_data( &pool, - vec![(1707328517, "0xb4b4570df6f7fe320f10fdfb702dba7e35244550")], + vec![( + 1707328517, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550", + "QmTamam", + )], ) .await; @@ -302,7 +361,11 @@ mod tests { async fn test_list_active_indexers_with_specific_indexers(pool: PgPool) { insert_test_data( &pool, - vec![(1707328517, "0xb4b4570df6f7fe320f10fdfb702dba7e35244550")], + vec![( + 1707328517, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550", + "QmTamam", + )], ) .await; @@ -345,7 +408,11 @@ mod tests { let specific_nonce = Utc::now().timestamp(); insert_test_data( &pool, - vec![(specific_nonce, "0xb4b4570df6f7fe320f10fdfb702dba7e35244550")], + vec![( + specific_nonce, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550", + "QmTamam", + )], ) .await; @@ -366,8 +433,12 @@ mod tests { insert_test_data( &pool, vec![ - (1707328517, "0xb4b4570df6f7fe320f10fdfb702dba7e35244550"), - (1707328518, "some_other_account"), + ( + 1707328517, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550", + "QmTamam", + ), + (1707328518, "some_other_account", "QmTamam"), ], ) .await; @@ -408,4 +479,171 @@ mod tests { "Result should be empty for non-existent indexers" ); } + + #[sqlx::test(migrations = "./migrations")] + async fn test_get_indexer_stats_without_parameters(pool: PgPool) { + insert_test_data( + &pool, + vec![ + ( + 1707328517, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550", + "QmTamam", + ), + ( + 1707328518, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244551", + "QmTamam", + ), + ], + ) + .await; + + let from_timestamp = 1707328516; + let indexers = None; + let result = get_indexer_stats(&pool, indexers, from_timestamp) + .await + .expect("Function should complete successfully"); + + // Expected: At least the inserted indexers are returned with their message counts + assert_eq!(result.len(), 2, "Should return stats for all indexers"); + } + + #[sqlx::test(migrations = "./migrations")] + async fn test_get_indexer_stats_with_specific_indexer(pool: PgPool) { + insert_test_data( + &pool, + vec![( + 1707328517, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550", + "QmTamam", + )], + ) + .await; + + let from_timestamp = 1707328516; + let indexers = Some(vec![ + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550".to_string() + ]); + let result = get_indexer_stats(&pool, indexers, from_timestamp) + .await + .expect("Function should complete successfully"); + + // Expected: Only the specified indexer is returned with its message count + assert_eq!( + result.len(), + 1, + "Should return stats for the specified indexer" + ); + assert!( + result.iter().any(|stat| stat.graph_account + == "0xb4b4570df6f7fe320f10fdfb702dba7e35244550" + && stat.message_count > 0), + "Result should contain the expected graph_account with correct message count" + ); + } + + #[sqlx::test(migrations = "./migrations")] + async fn test_get_indexer_stats_with_multiple_indexers(pool: PgPool) { + insert_test_data( + &pool, + vec![ + ( + 1707328517, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550", + "QmTamam", + ), + ( + 1707328518, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244551", + "QmTamam", + ), + ], + ) + .await; + + let from_timestamp = 1707328516; + let indexers = Some(vec![ + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550".to_string(), + "0xb4b4570df6f7fe320f10fdfb702dba7e35244551".to_string(), + ]); + let result = get_indexer_stats(&pool, indexers, from_timestamp) + .await + .expect("Function should complete successfully"); + + // Expected: Stats for both specified indexers are returned + assert_eq!( + result.len(), + 2, + "Should return stats for the specified indexers" + ); + } + + #[sqlx::test(migrations = "./migrations")] + async fn test_get_indexer_stats_no_matching_records(pool: PgPool) { + // Assuming a very high timestamp to ensure no records match + let from_timestamp = Utc::now().timestamp() + 10000; + let indexers = None; + let result = get_indexer_stats(&pool, indexers, from_timestamp) + .await + .expect("Function should complete successfully"); + + // Expected: No stats are returned since no records match the given timestamp + assert!( + result.is_empty(), + "Result should be empty when no records match criteria" + ); + } + + #[sqlx::test(migrations = "./migrations")] + async fn test_get_indexer_stats_with_specific_counts(pool: PgPool) { + // Insert test data with known outcomes + insert_test_data( + &pool, + vec![ + // Inserting 2 messages for the same graph_account with the same identifier (counts as 1 unique subgraph) + ( + 1707328517, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550", + "QmUnique1", + ), + ( + 1707328518, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550", + "QmUnique1", + ), + // Inserting 1 message for another graph_account with a different identifier + ( + 1707328519, + "0xb4b4570df6f7fe320f10fdfb702dba7e35244551", + "QmUnique2", + ), + ], + ) + .await; + + let from_timestamp = 1707328516; // Ensure all inserted records are considered + let indexers = Some(vec![ + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550".to_string(), + "0xb4b4570df6f7fe320f10fdfb702dba7e35244551".to_string(), + ]); + let result = get_indexer_stats(&pool, indexers, from_timestamp) + .await + .expect("Function should complete successfully"); + + // Asserting on the expected message_count and subgraphs_count + for stat in result { + match stat.graph_account.as_str() { + "0xb4b4570df6f7fe320f10fdfb702dba7e35244550" => { + assert_eq!(stat.message_count, 2, "The message count should be 2 for graph_account 0xb4b4570df6f7fe320f10fdfb702dba7e35244550"); + assert_eq!(stat.subgraphs_count, 1, "The subgraphs count should be 1 for graph_account 0xb4b4570df6f7fe320f10fdfb702dba7e35244550 because both messages share the same identifier"); + } + "0xb4b4570df6f7fe320f10fdfb702dba7e35244551" => { + assert_eq!(stat.message_count, 1, "The message count should be 1 for graph_account 0xb4b4570df6f7fe320f10fdfb702dba7e35244551"); + assert_eq!(stat.subgraphs_count, 1, "The subgraphs count should also be 1 for graph_account 0xb4b4570df6f7fe320f10fdfb702dba7e35244551 as there is only one message with a unique identifier"); + } + _ => panic!("Unexpected graph_account found in the result"), + } + } + } } diff --git a/src/server/model/mod.rs b/src/server/model/mod.rs index 7708566..1feeb2b 100644 --- a/src/server/model/mod.rs +++ b/src/server/model/mod.rs @@ -9,8 +9,8 @@ use thiserror::Error; use crate::{ config::Config, db::resolver::{ - delete_message_all, delete_message_by_id, list_active_indexers, list_messages, list_rows, - message_by_id, + delete_message_all, delete_message_by_id, get_indexer_stats, list_active_indexers, + list_messages, list_rows, message_by_id, IndexerStats, }, operator::radio_types::RadioPayloadMessage, }; @@ -73,6 +73,20 @@ impl QueryRoot { Ok(active_indexers) } + async fn query_indexer_stats( + &self, + ctx: &Context<'_>, + indexers: Option>, + minutes_ago: Option, + ) -> Result, HttpServiceError> { + let pool = ctx.data_unchecked::>(); + let minutes_ago = minutes_ago.unwrap_or(1440); + let from_timestamp = (Utc::now() - Duration::minutes(minutes_ago)).timestamp(); + + let stats = get_indexer_stats(pool, indexers, from_timestamp).await?; + Ok(stats) + } + /// Grab a row from db by db entry id async fn row( &self,