Skip to content

Commit

Permalink
refactor: use generic manager
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Jan 24, 2025
1 parent 5decbd1 commit 94754d1
Show file tree
Hide file tree
Showing 24 changed files with 251 additions and 248 deletions.
179 changes: 89 additions & 90 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ axum = { version = "0.7.9", default-features = false, features = [
"http1",
"http2",
] }
tokio = "1.40"
tokio = "1"
prometheus = "0.13.3"
anyhow = { version = "1.0.72" }
thiserror = "1.0.49"
Expand Down Expand Up @@ -52,22 +52,22 @@ uuid = { version = "1.11.0", features = ["v7"] }
tracing = { version = "0.1.40", default-features = false }
bigdecimal = "0.4.3"
build-info = "0.0.39"
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "6af1add", default-features = false }
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "6af1add", default-features = false }
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1fc51a3", default-features = false }
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1fc51a3", default-features = false }
tracing-subscriber = { version = "0.3", features = [
"json",
"env-filter",
"ansi",
], default-features = false }
thegraph-core = { version = "0.10.0", features = [
thegraph-core = { git = "https://github.com/edgeandnode/toolshed", rev = "a1d0509", features = [
"attestation",
"alloy-eip712",
"alloy-sol-types",
"alloy-rlp",
"alloy-signers",
"alloy-signer-local",
"alloy-signer-mnemonic",
"serde"
"serde",
] }
thegraph-graphql-http = { version = "0.3.2", features = ["reqwest"] }
graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] }
Expand Down
16 changes: 8 additions & 8 deletions crates/dips/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ pub mod server;
pub mod store;

use store::AgreementStore;
use uuid::Uuid;

use thiserror::Error;
use uuid::Uuid;

sol! {
// EIP712 encoded bytes, ABI - ethers
Expand Down Expand Up @@ -264,9 +263,10 @@ pub async fn validate_and_cancel_agreement(

#[cfg(test)]
mod test {
use std::sync::Arc;

use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use thegraph_core::{
alloy::{
Expand All @@ -276,12 +276,12 @@ mod test {
},
attestation::eip712_domain,
};

use crate::{CancellationRequest, DipsError};
use crate::{IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata};
use uuid::Uuid;

pub use crate::store::{AgreementStore, InMemoryAgreementStore};
use crate::{
CancellationRequest, DipsError, IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata,
};

#[tokio::test]
async fn test_validate_and_create_agreement() -> anyhow::Result<()> {
Expand Down
9 changes: 5 additions & 4 deletions crates/dips/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@

use std::{str::FromStr, sync::Arc, time::Duration};

use anyhow::anyhow;
use async_trait::async_trait;
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
use uuid::Uuid;

use crate::{
proto::graphprotocol::indexer::dips::*, store::AgreementStore, validate_and_cancel_agreement,
validate_and_create_agreement, DipsError,
};
use anyhow::anyhow;
use async_trait::async_trait;
use thegraph_core::alloy::{dyn_abi::Eip712Domain, primitives::Address};
use uuid::Uuid;

#[derive(Debug)]
pub struct DipsServer {
Expand Down
2 changes: 0 additions & 2 deletions crates/service/src/database/dips.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
use anyhow::bail;
use axum::async_trait;
use build_info::chrono::Utc;

use indexer_dips::{
store::AgreementStore, SignedCancellationRequest, SignedIndexingAgreementVoucher,
};

use sqlx::PgPool;
use thegraph_core::alloy::rlp::Decodable;
use uuid::Uuid;
Expand Down
11 changes: 6 additions & 5 deletions crates/service/src/middleware/auth/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use axum::{
};
use tap_core::{
manager::{adapters::ReceiptStore, Manager},
rav::ReceiptAggregateVoucher,
receipt::{Context, SignedReceipt},
};
use tower_http::auth::AsyncAuthorizeRequest;
Expand All @@ -30,7 +31,7 @@ use crate::{error::IndexerServiceError, middleware::prometheus_metrics::MetricLa
///
/// Requires SignedReceipt, MetricLabels and Arc<Context> extensions
pub fn tap_receipt_authorize<T, B>(
tap_manager: Arc<Manager<T>>,
tap_manager: Arc<Manager<T, SignedReceipt, ReceiptAggregateVoucher>>,
failed_receipt_metric: &'static prometheus::CounterVec,
) -> impl AsyncAuthorizeRequest<
B,
Expand All @@ -40,7 +41,7 @@ pub fn tap_receipt_authorize<T, B>(
> + Clone
+ Send
where
T: ReceiptStore + Sync + Send + 'static,
T: ReceiptStore<SignedReceipt> + Sync + Send + 'static,
B: Send,
{
move |request: Request<B>| {
Expand Down Expand Up @@ -91,7 +92,7 @@ mod tests {
receipt::{
checks::{Check, CheckError, CheckList, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
},
};
use test_assets::{
Expand Down Expand Up @@ -133,11 +134,11 @@ mod tests {

struct MyCheck;
#[async_trait::async_trait]
impl Check for MyCheck {
impl Check<SignedReceipt> for MyCheck {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
if receipt.signed_receipt().message.nonce == FAILED_NONCE {
Err(CheckError::Failed(anyhow::anyhow!("Failed")))
Expand Down
4 changes: 2 additions & 2 deletions crates/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};

use anyhow::anyhow;
use axum::{extract::Request, serve, ServiceExt};
use clap::Parser;
use indexer_config::{Config, DipsConfig, GraphNodeConfig, SubgraphConfig};
use indexer_dips::{
proto::graphprotocol::indexer::dips::agreement_service_server::{
Expand All @@ -19,14 +20,13 @@ use reqwest::Url;
use tap_core::tap_eip712_domain;
use tokio::{net::TcpListener, signal};
use tower_http::normalize_path::NormalizePath;
use tracing::info;

use crate::{
cli::Cli,
database::{self, dips::PsqlAgreementStore},
metrics::serve_metrics,
};
use clap::Parser;
use tracing::info;

mod release;
mod router;
Expand Down
4 changes: 2 additions & 2 deletions crates/service/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use indexer_allocation::Allocation;
use indexer_monitor::EscrowAccounts;
use receipt_store::{DatabaseReceipt, InnerContext};
use sqlx::PgPool;
use tap_core::receipt::checks::ReceiptCheck;
use tap_core::receipt::{checks::ReceiptCheck, SignedReceipt};
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
use tokio::sync::{
mpsc::{self, Sender},
Expand Down Expand Up @@ -48,7 +48,7 @@ impl IndexerTapContext {
escrow_accounts: Receiver<EscrowAccounts>,
timestamp_error_tolerance: Duration,
receipt_max_value: u128,
) -> Vec<ReceiptCheck> {
) -> Vec<ReceiptCheck<SignedReceipt>> {
vec![
Arc::new(AllocationEligible::new(indexer_allocations)),
Arc::new(SenderBalanceCheck::new(escrow_accounts)),
Expand Down
6 changes: 3 additions & 3 deletions crates/service/src/tap/checks/allocation_eligible.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use indexer_allocation::Allocation;
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};
use thegraph_core::alloy::primitives::Address;
use tokio::sync::watch::Receiver;
Expand All @@ -25,11 +25,11 @@ impl AllocationEligible {
}
}
#[async_trait::async_trait]
impl Check for AllocationEligible {
impl Check<SignedReceipt> for AllocationEligible {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let allocation_id = receipt.signed_receipt().message.allocation_id;
if !self
Expand Down
6 changes: 3 additions & 3 deletions crates/service/src/tap/checks/deny_list_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use sqlx::{postgres::PgListener, PgPool};
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};
use thegraph_core::alloy::primitives::Address;

Expand Down Expand Up @@ -153,11 +153,11 @@ impl DenyListCheck {
}

#[async_trait::async_trait]
impl Check for DenyListCheck {
impl Check<SignedReceipt> for DenyListCheck {
async fn check(
&self,
ctx: &tap_core::receipt::Context,
_: &ReceiptWithState<Checking>,
_: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let Sender(receipt_sender) = ctx
.get::<Sender>()
Expand Down
12 changes: 7 additions & 5 deletions crates/service/src/tap/checks/receipt_max_val_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct ReceiptMaxValueCheck {
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};

impl ReceiptMaxValueCheck {
Expand All @@ -19,11 +19,11 @@ impl ReceiptMaxValueCheck {
}

#[async_trait::async_trait]
impl Check for ReceiptMaxValueCheck {
impl Check<SignedReceipt> for ReceiptMaxValueCheck {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let receipt_value = receipt.signed_receipt().message.value;

Expand Down Expand Up @@ -54,7 +54,9 @@ mod tests {
use super::*;
use crate::tap::Eip712Domain;

fn create_signed_receipt_with_custom_value(value: u128) -> ReceiptWithState<Checking> {
fn create_signed_receipt_with_custom_value(
value: u128,
) -> ReceiptWithState<Checking, SignedReceipt> {
let index: u32 = 0;
let wallet: PrivateKeySigner = MnemonicBuilder::<English>::default()
.phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about")
Expand Down Expand Up @@ -86,7 +88,7 @@ mod tests {
&wallet,
)
.unwrap();
ReceiptWithState::<Checking>::new(receipt)
ReceiptWithState::new(receipt)
}

const RECEIPT_LIMIT: u128 = 10;
Expand Down
6 changes: 3 additions & 3 deletions crates/service/src/tap/checks/sender_balance_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use indexer_monitor::EscrowAccounts;
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};
use thegraph_core::alloy::primitives::U256;
use tokio::sync::watch::Receiver;
Expand All @@ -24,11 +24,11 @@ impl SenderBalanceCheck {
}

#[async_trait::async_trait]
impl Check for SenderBalanceCheck {
impl Check<SignedReceipt> for SenderBalanceCheck {
async fn check(
&self,
ctx: &tap_core::receipt::Context,
_: &ReceiptWithState<Checking>,
_: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let escrow_accounts_snapshot = self.escrow_accounts.borrow();

Expand Down
14 changes: 8 additions & 6 deletions crates/service/src/tap/checks/timestamp_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct TimestampCheck {
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
ReceiptWithState, SignedReceipt,
};

impl TimestampCheck {
Expand All @@ -23,11 +23,11 @@ impl TimestampCheck {
}

#[async_trait::async_trait]
impl Check for TimestampCheck {
impl Check<SignedReceipt> for TimestampCheck {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let timestamp_now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand All @@ -52,7 +52,9 @@ mod tests {
use std::time::{Duration, SystemTime};

use tap_core::{
receipt::{checks::Check, state::Checking, Context, Receipt, ReceiptWithState},
receipt::{
checks::Check, state::Checking, Context, Receipt, ReceiptWithState, SignedReceipt,
},
signed_message::EIP712SignedMessage,
tap_eip712_domain,
};
Expand All @@ -66,7 +68,7 @@ mod tests {

fn create_signed_receipt_with_custom_timestamp(
timestamp_ns: u64,
) -> ReceiptWithState<Checking> {
) -> ReceiptWithState<Checking, SignedReceipt> {
let index: u32 = 0;
let wallet: PrivateKeySigner = MnemonicBuilder::<English>::default()
.phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about")
Expand All @@ -89,7 +91,7 @@ mod tests {
&wallet,
)
.unwrap();
ReceiptWithState::<Checking>::new(receipt)
ReceiptWithState::new(receipt)
}

#[tokio::test]
Expand Down
10 changes: 7 additions & 3 deletions crates/service/src/tap/checks/value_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use sqlx::{
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
Context, ReceiptWithState,
Context, ReceiptWithState, SignedReceipt,
};
use thegraph_core::DeploymentId;

Expand Down Expand Up @@ -303,8 +303,12 @@ impl MinimumValue {
}

#[async_trait::async_trait]
impl Check for MinimumValue {
async fn check(&self, ctx: &Context, receipt: &ReceiptWithState<Checking>) -> CheckResult {
impl Check<SignedReceipt> for MinimumValue {
async fn check(
&self,
ctx: &Context,
receipt: &ReceiptWithState<Checking, SignedReceipt>,
) -> CheckResult {
let agora_query = ctx
.get()
.ok_or(CheckError::Failed(anyhow!("Could not find agora query")))?;
Expand Down
Loading

0 comments on commit 94754d1

Please sign in to comment.