Skip to content

Commit

Permalink
feat: Add high level message stats query (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
pete-eiger authored Feb 19, 2024
1 parent 0668702 commit 081d2e8
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 13 deletions.
260 changes: 249 additions & 11 deletions src/db/resolver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_graphql::OutputType;
use async_graphql::{OutputType, SimpleObject};
use chrono::Utc;
use serde::{de::DeserializeOwned, Serialize};
use sqlx::{types::Json, PgPool, Row as SqliteRow};
use sqlx::{types::Json, FromRow, PgPool, Row as SqliteRow};
use std::ops::Deref;
use tracing::trace;

Expand All @@ -20,6 +20,14 @@ pub struct MessageID {
id: i64,
}

#[allow(dead_code)]
#[derive(FromRow, SimpleObject, Serialize, Debug, Clone)]
pub struct IndexerStats {
graph_account: String,
message_count: i64,
subgraphs_count: i64,
}

// Define graphql type for the Row in Messages
impl<T: Clone + Serialize + DeserializeOwned + OutputType> Row<T> {
pub fn get_graphql_row(&self) -> GraphQLRow<T> {
Expand Down Expand Up @@ -254,16 +262,63 @@ pub async fn list_active_indexers(
Ok(rows)
}

pub async fn get_indexer_stats(
pool: &PgPool,
indexers: Option<Vec<String>>,
from_timestamp: i64,
) -> Result<Vec<IndexerStats>, anyhow::Error> {
let base_query = "
SELECT
message->>'graph_account' as graph_account,
COUNT(*) as message_count,
COUNT(DISTINCT message->>'identifier') as subgraphs_count -- Updated field name
FROM messages
WHERE (CAST(message->>'nonce' AS BIGINT)) > $1";

let mut query = String::from(base_query);

if let Some(ref idxs) = indexers {
let placeholders = idxs
.iter()
.enumerate()
.map(|(i, _)| format!("${}", i + 2))
.collect::<Vec<_>>()
.join(",");
query.push_str(&format!(
" AND (message->>'graph_account') IN ({})",
placeholders
));
}

query.push_str(" GROUP BY graph_account");

let mut dynamic_query = sqlx::query_as::<_, IndexerStats>(&query).bind(from_timestamp);

if let Some(indexers) = indexers {
for account in indexers {
dynamic_query = dynamic_query.bind(account);
}
}

let stats = dynamic_query
.fetch_all(pool)
.await
.map_err(anyhow::Error::new)?;

Ok(stats)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::message_types::PublicPoiMessage;

use super::*;
use sqlx::PgPool;

async fn insert_test_data(pool: &PgPool, entries: Vec<(i64, &str)>) {
for (nonce, graph_account) in entries {
async fn insert_test_data(pool: &PgPool, entries: Vec<(i64, &str, &str)>) {
for (nonce, graph_account, identifier) in entries {
let message = PublicPoiMessage {
identifier: "QmTamam".to_string(),
identifier: identifier.to_string(),
content: "0xText".to_string(),
nonce: nonce.try_into().unwrap(),
network: "testnet".to_string(),
Expand All @@ -282,7 +337,11 @@ mod tests {
async fn test_list_active_indexers_without_indexers(pool: PgPool) {
insert_test_data(
&pool,
vec![(1707328517, "0xb4b4570df6f7fe320f10fdfb702dba7e35244550")],
vec![(
1707328517,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550",
"QmTamam",
)],
)
.await;

Expand All @@ -302,7 +361,11 @@ mod tests {
async fn test_list_active_indexers_with_specific_indexers(pool: PgPool) {
insert_test_data(
&pool,
vec![(1707328517, "0xb4b4570df6f7fe320f10fdfb702dba7e35244550")],
vec![(
1707328517,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550",
"QmTamam",
)],
)
.await;

Expand Down Expand Up @@ -345,7 +408,11 @@ mod tests {
let specific_nonce = Utc::now().timestamp();
insert_test_data(
&pool,
vec![(specific_nonce, "0xb4b4570df6f7fe320f10fdfb702dba7e35244550")],
vec![(
specific_nonce,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550",
"QmTamam",
)],
)
.await;

Expand All @@ -366,8 +433,12 @@ mod tests {
insert_test_data(
&pool,
vec![
(1707328517, "0xb4b4570df6f7fe320f10fdfb702dba7e35244550"),
(1707328518, "some_other_account"),
(
1707328517,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550",
"QmTamam",
),
(1707328518, "some_other_account", "QmTamam"),
],
)
.await;
Expand Down Expand Up @@ -408,4 +479,171 @@ mod tests {
"Result should be empty for non-existent indexers"
);
}

#[sqlx::test(migrations = "./migrations")]
async fn test_get_indexer_stats_without_parameters(pool: PgPool) {
insert_test_data(
&pool,
vec![
(
1707328517,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550",
"QmTamam",
),
(
1707328518,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244551",
"QmTamam",
),
],
)
.await;

let from_timestamp = 1707328516;
let indexers = None;
let result = get_indexer_stats(&pool, indexers, from_timestamp)
.await
.expect("Function should complete successfully");

// Expected: At least the inserted indexers are returned with their message counts
assert_eq!(result.len(), 2, "Should return stats for all indexers");
}

#[sqlx::test(migrations = "./migrations")]
async fn test_get_indexer_stats_with_specific_indexer(pool: PgPool) {
insert_test_data(
&pool,
vec![(
1707328517,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550",
"QmTamam",
)],
)
.await;

let from_timestamp = 1707328516;
let indexers = Some(vec![
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550".to_string()
]);
let result = get_indexer_stats(&pool, indexers, from_timestamp)
.await
.expect("Function should complete successfully");

// Expected: Only the specified indexer is returned with its message count
assert_eq!(
result.len(),
1,
"Should return stats for the specified indexer"
);
assert!(
result.iter().any(|stat| stat.graph_account
== "0xb4b4570df6f7fe320f10fdfb702dba7e35244550"
&& stat.message_count > 0),
"Result should contain the expected graph_account with correct message count"
);
}

#[sqlx::test(migrations = "./migrations")]
async fn test_get_indexer_stats_with_multiple_indexers(pool: PgPool) {
insert_test_data(
&pool,
vec![
(
1707328517,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550",
"QmTamam",
),
(
1707328518,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244551",
"QmTamam",
),
],
)
.await;

let from_timestamp = 1707328516;
let indexers = Some(vec![
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550".to_string(),
"0xb4b4570df6f7fe320f10fdfb702dba7e35244551".to_string(),
]);
let result = get_indexer_stats(&pool, indexers, from_timestamp)
.await
.expect("Function should complete successfully");

// Expected: Stats for both specified indexers are returned
assert_eq!(
result.len(),
2,
"Should return stats for the specified indexers"
);
}

#[sqlx::test(migrations = "./migrations")]
async fn test_get_indexer_stats_no_matching_records(pool: PgPool) {
// Assuming a very high timestamp to ensure no records match
let from_timestamp = Utc::now().timestamp() + 10000;
let indexers = None;
let result = get_indexer_stats(&pool, indexers, from_timestamp)
.await
.expect("Function should complete successfully");

// Expected: No stats are returned since no records match the given timestamp
assert!(
result.is_empty(),
"Result should be empty when no records match criteria"
);
}

#[sqlx::test(migrations = "./migrations")]
async fn test_get_indexer_stats_with_specific_counts(pool: PgPool) {
// Insert test data with known outcomes
insert_test_data(
&pool,
vec![
// Inserting 2 messages for the same graph_account with the same identifier (counts as 1 unique subgraph)
(
1707328517,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550",
"QmUnique1",
),
(
1707328518,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550",
"QmUnique1",
),
// Inserting 1 message for another graph_account with a different identifier
(
1707328519,
"0xb4b4570df6f7fe320f10fdfb702dba7e35244551",
"QmUnique2",
),
],
)
.await;

let from_timestamp = 1707328516; // Ensure all inserted records are considered
let indexers = Some(vec![
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550".to_string(),
"0xb4b4570df6f7fe320f10fdfb702dba7e35244551".to_string(),
]);
let result = get_indexer_stats(&pool, indexers, from_timestamp)
.await
.expect("Function should complete successfully");

// Asserting on the expected message_count and subgraphs_count
for stat in result {
match stat.graph_account.as_str() {
"0xb4b4570df6f7fe320f10fdfb702dba7e35244550" => {
assert_eq!(stat.message_count, 2, "The message count should be 2 for graph_account 0xb4b4570df6f7fe320f10fdfb702dba7e35244550");
assert_eq!(stat.subgraphs_count, 1, "The subgraphs count should be 1 for graph_account 0xb4b4570df6f7fe320f10fdfb702dba7e35244550 because both messages share the same identifier");
}
"0xb4b4570df6f7fe320f10fdfb702dba7e35244551" => {
assert_eq!(stat.message_count, 1, "The message count should be 1 for graph_account 0xb4b4570df6f7fe320f10fdfb702dba7e35244551");
assert_eq!(stat.subgraphs_count, 1, "The subgraphs count should also be 1 for graph_account 0xb4b4570df6f7fe320f10fdfb702dba7e35244551 as there is only one message with a unique identifier");
}
_ => panic!("Unexpected graph_account found in the result"),
}
}
}
}
18 changes: 16 additions & 2 deletions src/server/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use thiserror::Error;
use crate::{
config::Config,
db::resolver::{
delete_message_all, delete_message_by_id, list_active_indexers, list_messages, list_rows,
message_by_id,
delete_message_all, delete_message_by_id, get_indexer_stats, list_active_indexers,
list_messages, list_rows, message_by_id, IndexerStats,
},
operator::radio_types::RadioPayloadMessage,
};
Expand Down Expand Up @@ -73,6 +73,20 @@ impl QueryRoot {
Ok(active_indexers)
}

async fn query_indexer_stats(
&self,
ctx: &Context<'_>,
indexers: Option<Vec<String>>,
minutes_ago: Option<i64>,
) -> Result<Vec<IndexerStats>, HttpServiceError> {
let pool = ctx.data_unchecked::<Pool<Postgres>>();
let minutes_ago = minutes_ago.unwrap_or(1440);
let from_timestamp = (Utc::now() - Duration::minutes(minutes_ago)).timestamp();

let stats = get_indexer_stats(pool, indexers, from_timestamp).await?;
Ok(stats)
}

/// Grab a row from db by db entry id
async fn row(
&self,
Expand Down

0 comments on commit 081d2e8

Please sign in to comment.