diff --git a/.sqlx/query-08726aaece34702a956885e927166df6b1f8ad0a0589464b9cbd9f09ca3f38db.json b/.sqlx/query-08726aaece34702a956885e927166df6b1f8ad0a0589464b9cbd9f09ca3f38db.json new file mode 100644 index 00000000..fdb4b97c --- /dev/null +++ b/.sqlx/query-08726aaece34702a956885e927166df6b1f8ad0a0589464b9cbd9f09ca3f38db.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT \n signer_address,\n (\n SELECT ARRAY \n (\n SELECT DISTINCT allocation_id\n FROM scalar_tap_receipts\n WHERE signer_address = top.signer_address\n )\n ) AS allocation_ids\n FROM scalar_tap_receipts AS top\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "signer_address", + "type_info": "Bpchar" + }, + { + "ordinal": 1, + "name": "allocation_ids", + "type_info": "BpcharArray" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + null + ] + }, + "hash": "08726aaece34702a956885e927166df6b1f8ad0a0589464b9cbd9f09ca3f38db" +} diff --git a/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json b/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json deleted file mode 100644 index 8d1a90b1..00000000 --- a/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT DISTINCT allocation_id, signer_address\n FROM scalar_tap_receipts\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "allocation_id", - "type_info": "Bpchar" - }, - { - "ordinal": 1, - "name": "signer_address", - "type_info": "Bpchar" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false - ] - }, - "hash": "778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb" -} diff --git a/.sqlx/query-9fb8ca8ec1553951f20f81acdcfe52ad56d1bbac73fbcf5a93808bd4eaafdd4a.json b/.sqlx/query-9fb8ca8ec1553951f20f81acdcfe52ad56d1bbac73fbcf5a93808bd4eaafdd4a.json new file mode 100644 index 
00000000..e65597e7 --- /dev/null +++ b/.sqlx/query-9fb8ca8ec1553951f20f81acdcfe52ad56d1bbac73fbcf5a93808bd4eaafdd4a.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT\n sender_address,\n (\n SELECT ARRAY \n (\n SELECT DISTINCT allocation_id\n FROM scalar_tap_ravs\n WHERE sender_address = top.sender_address\n )\n ) AS allocation_id\n FROM scalar_tap_ravs AS top\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sender_address", + "type_info": "Bpchar" + }, + { + "ordinal": 1, + "name": "allocation_id", + "type_info": "BpcharArray" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + null + ] + }, + "hash": "9fb8ca8ec1553951f20f81acdcfe52ad56d1bbac73fbcf5a93808bd4eaafdd4a" +} diff --git a/Cargo.lock b/Cargo.lock index 2fd56b30..b8a6220a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1822,6 +1822,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "enum-as-inner" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ffccbb6966c05b32ef8fbac435df276c4ae4d3dc55a8cd0eb9745e6c12f546a" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "env_logger" version = "0.9.3" @@ -3135,6 +3147,7 @@ dependencies = [ "clap", "confy", "dotenvy", + "enum-as-inner", "ethereum-types", "ethers-signers", "eventuals", diff --git a/tap-agent/Cargo.toml b/tap-agent/Cargo.toml index d5fb845c..e17b4606 100644 --- a/tap-agent/Cargo.toml +++ b/tap-agent/Cargo.toml @@ -46,6 +46,7 @@ tracing-subscriber = { version = "0.3", features = [ "std", "json", ] } +enum-as-inner = "0.6.0" [dev-dependencies] ethers-signers = "2.0.8" diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index 8789138e..345e5696 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -9,11 +9,10 @@ use indexer_common::prelude::{ }; use crate::{ - aggregator_endpoints, config, database, - 
tap::sender_allocation_relationships_manager::SenderAllocationRelationshipsManager, + aggregator_endpoints, config, database, tap::sender_accounts_manager::SenderAccountsManager, }; -pub async fn start_agent(config: &'static config::Cli) -> SenderAllocationRelationshipsManager { +pub async fn start_agent(config: &'static config::Cli) -> SenderAccountsManager { let pgpool = database::connect(&config.postgres).await; let http_client = reqwest::Client::new(); @@ -80,7 +79,7 @@ pub async fn start_agent(config: &'static config::Cli) -> SenderAllocationRelati verifying_contract: config.receipts.receipts_verifier_address, }; - SenderAllocationRelationshipsManager::new( + SenderAccountsManager::new( config, pgpool, indexer_allocations, diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index ab4e9998..7afd0d03 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -11,8 +11,10 @@ mod escrow_adapter; mod rav_storage_adapter; mod receipt_checks_adapter; mod receipt_storage_adapter; -mod sender_allocation_relationship; -pub mod sender_allocation_relationships_manager; +mod sender_account; +pub mod sender_accounts_manager; +mod sender_allocation; +mod unaggregated_receipts; #[cfg(test)] pub mod test_utils; diff --git a/tap-agent/src/tap/rav_storage_adapter.rs b/tap-agent/src/tap/rav_storage_adapter.rs index 7bdbd898..3a464f19 100644 --- a/tap-agent/src/tap/rav_storage_adapter.rs +++ b/tap-agent/src/tap/rav_storage_adapter.rs @@ -91,18 +91,18 @@ impl RAVStorageAdapter { #[cfg(test)] mod test { use super::*; - use crate::tap::test_utils::{create_rav, ALLOCATION_ID, SENDER, SIGNER}; + use crate::tap::test_utils::{create_rav, ALLOCATION_ID_0, SENDER, SIGNER}; use tap_core::adapters::rav_storage_adapter::RAVStorageAdapter as RAVStorageAdapterTrait; #[sqlx::test(migrations = "../migrations")] async fn update_and_retrieve_rav(pool: PgPool) { let timestamp_ns = u64::MAX - 10; let value_aggregate = u128::MAX; - let rav_storage_adapter = 
RAVStorageAdapter::new(pool.clone(), *ALLOCATION_ID, SENDER.1); + let rav_storage_adapter = RAVStorageAdapter::new(pool.clone(), *ALLOCATION_ID_0, SENDER.1); // Insert a rav let mut new_rav = create_rav( - *ALLOCATION_ID, + *ALLOCATION_ID_0, SIGNER.0.clone(), timestamp_ns, value_aggregate, @@ -121,7 +121,7 @@ mod test { // Update the RAV 3 times in quick succession for i in 0..3 { new_rav = create_rav( - *ALLOCATION_ID, + *ALLOCATION_ID_0, SIGNER.0.clone(), timestamp_ns + i, value_aggregate - (i as u128), diff --git a/tap-agent/src/tap/receipt_storage_adapter.rs b/tap-agent/src/tap/receipt_storage_adapter.rs index ffff9572..64cb0392 100644 --- a/tap-agent/src/tap/receipt_storage_adapter.rs +++ b/tap-agent/src/tap/receipt_storage_adapter.rs @@ -194,7 +194,7 @@ mod test { use super::*; use crate::tap::test_utils::{ - create_received_receipt, store_receipt, ALLOCATION_ID, ALLOCATION_ID_IRRELEVANT, SENDER, + create_received_receipt, store_receipt, ALLOCATION_ID_0, ALLOCATION_ID_IRRELEVANT, SENDER, SENDER_IRRELEVANT, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; use anyhow::Result; @@ -381,7 +381,7 @@ mod test { let storage_adapter = ReceiptStorageAdapter::new( pgpool.clone(), - *ALLOCATION_ID, + *ALLOCATION_ID_0, SENDER.1, get_full_list_of_checks(), escrow_accounts.clone(), @@ -392,7 +392,7 @@ mod test { for i in 0..10 { received_receipt_vec.push( create_received_receipt( - &ALLOCATION_ID, + &ALLOCATION_ID_0, &SIGNER.0, i + 684, i + 42, @@ -416,7 +416,7 @@ mod test { ); received_receipt_vec.push( create_received_receipt( - &ALLOCATION_ID, + &ALLOCATION_ID_0, &SENDER_IRRELEVANT.0, i + 684, i + 42, @@ -525,7 +525,7 @@ mod test { let storage_adapter = ReceiptStorageAdapter::new( pgpool, - *ALLOCATION_ID, + *ALLOCATION_ID_0, SENDER.1, get_full_list_of_checks(), escrow_accounts.clone(), @@ -536,7 +536,7 @@ mod test { for i in 0..10 { received_receipt_vec.push( create_received_receipt( - &ALLOCATION_ID, + &ALLOCATION_ID_0, &SIGNER.0, i + 684, i + 42, @@ -560,7 +560,7 @@ mod test { 
); received_receipt_vec.push( create_received_receipt( - &ALLOCATION_ID, + &ALLOCATION_ID_0, &SENDER_IRRELEVANT.0, i + 684, i + 42, diff --git a/tap-agent/src/tap/sender_account.rs b/tap-agent/src/tap/sender_account.rs new file mode 100644 index 00000000..1fd8150b --- /dev/null +++ b/tap-agent/src/tap/sender_account.rs @@ -0,0 +1,951 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Mutex as StdMutex; +use std::{ + cmp::max, + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; + +use alloy_sol_types::Eip712Domain; +use anyhow::{anyhow, Result}; +use enum_as_inner::EnumAsInner; +use eventuals::Eventual; +use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; +use sqlx::PgPool; +use thegraph::types::Address; +use tokio::sync::Mutex as TokioMutex; +use tokio::{select, sync::Notify, time}; +use tracing::{error, warn}; + +use crate::config::{self}; +use crate::tap::{ + escrow_adapter::EscrowAdapter, sender_accounts_manager::NewReceiptNotification, + sender_allocation::SenderAllocation, unaggregated_receipts::UnaggregatedReceipts, +}; + +#[derive(Clone, EnumAsInner)] +enum AllocationState { + Active(Arc), + Ineligible(Arc), +} + +/// The inner state of a SenderAccount. This is used to store an Arc state for spawning async tasks. +pub struct Inner { + config: &'static config::Cli, + pgpool: PgPool, + allocations: Arc>>, + sender: Address, + sender_aggregator_endpoint: String, + unaggregated_fees: Arc>, + unaggregated_receipts_guard: Arc>, +} + +impl Inner { + async fn rav_requester(&self, notif_value_trigger: Arc) { + loop { + notif_value_trigger.notified().await; + + self.rav_requester_single().await; + + // Check if we already need to send another RAV request. 
+ let unaggregated_fees = self.unaggregated_fees.lock().unwrap().clone(); + if unaggregated_fees.value >= self.config.tap.rav_request_trigger_value.into() { + // If so, "self-notify" to trigger another RAV request. + notif_value_trigger.notify_one(); + + warn!( + "Sender {} has {} unaggregated fees immediately after a RAV request, which is + over the trigger value. Triggering another RAV request.", + self.sender, unaggregated_fees.value, + ); + } + } + } + + async fn rav_requester_finalize(&self, notif_finalize_allocations: Arc) { + loop { + // Wait for either 5 minutes or a notification that we need to try to finalize + // allocation receipts. + select! { + _ = time::sleep(Duration::from_secs(300)) => (), + _ = notif_finalize_allocations.notified() => () + } + + // Get a quick snapshot of the current finalizing allocations. They are + // Arcs, so it should be cheap. + let allocations_finalizing = self + .allocations + .lock() + .unwrap() + .values() + .filter(|a| matches!(a, AllocationState::Ineligible(_))) + .map(|a| a.as_ineligible().unwrap()) + .cloned() + .collect::>(); + + for allocation in allocations_finalizing { + if let Err(e) = allocation.rav_requester_single().await { + error!( + "Error while requesting RAV for sender {} and allocation {}: {}", + self.sender, + allocation.get_allocation_id(), + e + ); + continue; + } + + if let Err(e) = allocation.mark_rav_final().await { + error!( + "Error while marking allocation {} as final for sender {}: {}", + allocation.get_allocation_id(), + self.sender, + e + ); + continue; + } + + // Remove the allocation from the finalizing map. 
+ self.allocations + .lock() + .unwrap() + .remove(&allocation.get_allocation_id()); + } + } + } + + /// Does a single RAV request for the sender's allocation with the highest unaggregated fees + async fn rav_requester_single(&self) { + let heaviest_allocation = match self.get_heaviest_allocation() { + Ok(a) => a, + Err(e) => { + error!( + "Error while getting allocation with most unaggregated fees: {}", + e + ); + return; + } + }; + + if let Err(e) = heaviest_allocation.rav_requester_single().await { + error!( + "Error while requesting RAV for sender {} and allocation {}: {}", + self.sender, + heaviest_allocation.get_allocation_id(), + e + ); + return; + }; + + if let Err(e) = self.recompute_unaggregated_fees().await { + error!( + "Error while recomputing unaggregated fees for sender {}: {}", + self.sender, e + ); + } + } + + /// Returns the allocation with the highest unaggregated fees value. + fn get_heaviest_allocation(&self) -> Result> { + // Get a quick snapshot of all allocations. They are Arcs, so it should be cheap, + // and we don't want to hold the lock for too long. + let allocations: Vec<_> = self.allocations.lock().unwrap().values().cloned().collect(); + + let mut heaviest_allocation = (None, 0u128); + for allocation in allocations { + let allocation: Arc = match allocation { + AllocationState::Active(a) => a, + AllocationState::Ineligible(a) => a, + }; + let fees = allocation.get_unaggregated_fees().value; + if fees > heaviest_allocation.1 { + heaviest_allocation = (Some(allocation), fees); + } + } + + heaviest_allocation + .0 + .ok_or(anyhow!("Heaviest allocation is None")) + } + + /// Recompute the sender's total unaggregated fees value and last receipt ID. + async fn recompute_unaggregated_fees(&self) -> Result<()> { + // Make sure to pause the handling of receipt notifications while we update the unaggregated + // fees. + let _guard = self.unaggregated_receipts_guard.lock().await; + + // Similar pattern to get_heaviest_allocation(). 
+ let allocations: Vec<_> = self.allocations.lock().unwrap().values().cloned().collect(); + + // Gather the unaggregated fees from all allocations and sum them up. + let mut unaggregated_fees = self.unaggregated_fees.lock().unwrap(); + *unaggregated_fees = UnaggregatedReceipts::default(); // Reset to 0. + for allocation in allocations { + let allocation: Arc = match allocation { + AllocationState::Active(a) => a, + AllocationState::Ineligible(a) => a, + }; + + let uf = allocation.get_unaggregated_fees(); + *unaggregated_fees = UnaggregatedReceipts { + value: self.fees_add(unaggregated_fees.value, uf.value), + last_id: max(unaggregated_fees.last_id, uf.last_id), + }; + } + + Ok(()) + } + + /// Safe add the fees to the unaggregated fees value, log an error if there is an overflow and + /// set the unaggregated fees value to u128::MAX. + fn fees_add(&self, total_unaggregated_fees: u128, value_increment: u128) -> u128 { + total_unaggregated_fees + .checked_add(value_increment) + .unwrap_or_else(|| { + // This should never happen, but if it does, we want to know about it. + error!( + "Overflow when adding receipt value {} to total unaggregated fees {} for \ + sender {}. Setting total unaggregated fees to u128::MAX.", + value_increment, total_unaggregated_fees, self.sender + ); + u128::MAX + }) + } +} + +/// A SenderAccount manages the receipts accounting between the indexer and the sender across +/// multiple allocations. +/// +/// Manages the lifecycle of Scalar TAP for the SenderAccount, including: +/// - Monitoring new receipts and keeping track of the cumulative unaggregated fees across +/// allocations. +/// - Requesting RAVs from the sender's TAP aggregator once the cumulative unaggregated fees reach a +/// certain threshold. +/// - Requesting the last RAV from the sender's TAP aggregator for all EOL allocations. 
+pub struct SenderAccount { + inner: Arc, + escrow_accounts: Eventual, + escrow_subgraph: &'static SubgraphClient, + escrow_adapter: EscrowAdapter, + tap_eip712_domain_separator: Eip712Domain, + rav_requester_task: tokio::task::JoinHandle<()>, + rav_requester_notify: Arc, + rav_requester_finalize_task: tokio::task::JoinHandle<()>, + rav_requester_finalize_notify: Arc, + unaggregated_receipts_guard: Arc>, +} + +impl SenderAccount { + #[allow(clippy::too_many_arguments)] + pub fn new( + config: &'static config::Cli, + pgpool: PgPool, + sender_id: Address, + escrow_accounts: Eventual, + escrow_subgraph: &'static SubgraphClient, + escrow_adapter: EscrowAdapter, + tap_eip712_domain_separator: Eip712Domain, + sender_aggregator_endpoint: String, + ) -> Self { + let unaggregated_receipts_guard = Arc::new(TokioMutex::new(())); + + let inner = Arc::new(Inner { + config, + pgpool, + allocations: Arc::new(StdMutex::new(HashMap::new())), + sender: sender_id, + sender_aggregator_endpoint, + unaggregated_fees: Arc::new(StdMutex::new(UnaggregatedReceipts::default())), + unaggregated_receipts_guard: unaggregated_receipts_guard.clone(), + }); + + let rav_requester_notify = Arc::new(Notify::new()); + let rav_requester_task = tokio::spawn({ + let inner = inner.clone(); + let rav_requester_notify = rav_requester_notify.clone(); + async move { + inner.rav_requester(rav_requester_notify).await; + } + }); + + let rav_requester_finalize_notify = Arc::new(Notify::new()); + let rav_requester_finalize_task = tokio::spawn({ + let inner = inner.clone(); + let rav_requester_finalize_notify = rav_requester_finalize_notify.clone(); + async move { + inner + .rav_requester_finalize(rav_requester_finalize_notify) + .await; + } + }); + + Self { + inner: inner.clone(), + escrow_accounts, + escrow_subgraph, + escrow_adapter, + tap_eip712_domain_separator, + rav_requester_task, + rav_requester_notify, + rav_requester_finalize_task, + rav_requester_finalize_notify, + unaggregated_receipts_guard, + } + } + 
+ /// Update the sender's allocations to match the target allocations. + pub async fn update_allocations(&self, target_allocations: HashSet
) { + { + let mut allocations = self.inner.allocations.lock().unwrap(); + let mut allocations_to_finalize = false; + + // Make allocations that are no longer to be active `AllocationState::Ineligible`. + for (allocation_id, allocation_state) in allocations.iter_mut() { + if !target_allocations.contains(allocation_id) { + match allocation_state { + AllocationState::Active(allocation) => { + *allocation_state = AllocationState::Ineligible(allocation.clone()); + allocations_to_finalize = true; + } + AllocationState::Ineligible(_) => { + // Allocation is already ineligible, do nothing. + } + } + } + } + + if allocations_to_finalize { + self.rav_requester_finalize_notify.notify_waiters(); + } + } + + // Add new allocations. + for allocation_id in target_allocations { + let sender_allocation = AllocationState::Active(Arc::new( + SenderAllocation::new( + self.inner.config, + self.inner.pgpool.clone(), + allocation_id, + self.inner.sender, + self.escrow_accounts.clone(), + self.escrow_subgraph, + self.escrow_adapter.clone(), + self.tap_eip712_domain_separator.clone(), + self.inner.sender_aggregator_endpoint.clone(), + ) + .await, + )); + if let std::collections::hash_map::Entry::Vacant(e) = + self.inner.allocations.lock().unwrap().entry(allocation_id) + { + e.insert(sender_allocation); + } + } + } + + pub async fn handle_new_receipt_notification( + &self, + new_receipt_notification: NewReceiptNotification, + ) { + // Make sure to pause the handling of receipt notifications while we update the unaggregated + // fees. + let _guard = self.unaggregated_receipts_guard.lock().await; + + let allocation_state = self + .inner + .allocations + .lock() + .unwrap() + .get(&new_receipt_notification.allocation_id) + .cloned(); + + if let Some(AllocationState::Active(allocation)) = allocation_state { + // Try to add the receipt value to the allocation's unaggregated fees value. 
+ // If the fees were not added, it means the receipt was already processed, so we + // don't need to do anything. + if allocation + .fees_add(new_receipt_notification.value, new_receipt_notification.id) + .await + { + // Add the receipt value to the allocation's unaggregated fees value. + allocation + .fees_add(new_receipt_notification.value, new_receipt_notification.id) + .await; + // Add the receipt value to the sender's unaggregated fees value. + let mut unaggregated_fees = self.inner.unaggregated_fees.lock().unwrap(); + *unaggregated_fees = UnaggregatedReceipts { + value: self + .inner + .fees_add(unaggregated_fees.value, new_receipt_notification.value), + last_id: new_receipt_notification.id, + }; + + // Check if we need to trigger a RAV request. + if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into() + { + self.rav_requester_notify.notify_waiters(); + } + } + } else { + error!( + "Received a new receipt notification for allocation {} that doesn't exist \ + or is ineligible for sender {}.", + new_receipt_notification.allocation_id, self.inner.sender + ); + } + } + + pub async fn recompute_unaggregated_fees(&self) -> Result<()> { + self.inner.recompute_unaggregated_fees().await + } +} + +// Abort tasks on Drop +impl Drop for SenderAccount { + fn drop(&mut self) { + self.rav_requester_task.abort(); + self.rav_requester_finalize_task.abort(); + } +} + +#[cfg(test)] +mod tests { + + use alloy_primitives::hex::ToHex; + use indexer_common::subgraph_client::DeploymentDetails; + use serde_json::json; + use tap_aggregator::server::run_server; + use tap_core::tap_manager::SignedRAV; + use wiremock::{ + matchers::{body_string_contains, method}, + Mock, MockServer, ResponseTemplate, + }; + + use crate::tap::test_utils::{ + create_received_receipt, store_receipt, ALLOCATION_ID_0, ALLOCATION_ID_1, ALLOCATION_ID_2, + INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, + }; + + use super::*; + + const DUMMY_URL: &str = 
"http://localhost:1234"; + + // To help with testing from other modules. + impl SenderAccount { + pub fn _tests_get_allocations_active(&self) -> HashMap> { + self.inner + .allocations + .lock() + .unwrap() + .iter() + .filter_map(|(k, v)| { + if let AllocationState::Active(a) = v { + Some((*k, a.clone())) + } else { + None + } + }) + .collect() + } + + pub fn _tests_get_allocations_ineligible(&self) -> HashMap> { + self.inner + .allocations + .lock() + .unwrap() + .iter() + .filter_map(|(k, v)| { + if let AllocationState::Ineligible(a) = v { + Some((*k, a.clone())) + } else { + None + } + }) + .collect() + } + } + + async fn create_sender_with_allocations( + pgpool: PgPool, + sender_aggregator_endpoint: String, + escrow_subgraph_endpoint: &str, + ) -> SenderAccount { + let config = Box::leak(Box::new(config::Cli { + config: None, + ethereum: config::Ethereum { + indexer_address: INDEXER.1, + }, + tap: config::Tap { + rav_request_trigger_value: 100, + rav_request_timestamp_buffer_ms: 1, + rav_request_timeout_secs: 5, + ..Default::default() + }, + ..Default::default() + })); + + let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + reqwest::Client::new(), + None, + DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), + ))); + + let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + + let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); + + let sender = SenderAccount::new( + config, + pgpool, + SENDER.1, + escrow_accounts_eventual, + escrow_subgraph, + escrow_adapter, + TAP_EIP712_DOMAIN_SEPARATOR.clone(), + sender_aggregator_endpoint, + ); + + sender + .update_allocations(HashSet::from([ + *ALLOCATION_ID_0, + *ALLOCATION_ID_1, + *ALLOCATION_ID_2, + ])) + .await; + sender.recompute_unaggregated_fees().await.unwrap(); + + sender + } + + /// Test that the sender_account correctly ignores new receipt 
notifications with + /// an ID lower than the last receipt ID processed (be it from the DB or from a prior receipt + /// notification). + #[sqlx::test(migrations = "../migrations")] + async fn test_handle_new_receipt_notification(pgpool: PgPool) { + // Add receipts to the database. Before creating the sender and allocation so that it loads + // the receipts from the DB. + let mut expected_unaggregated_fees = 0u128; + for i in 10..20 { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + expected_unaggregated_fees += u128::from(i); + } + + let sender = + create_sender_with_allocations(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + + // Check that the allocation's unaggregated fees are correct. + assert_eq!( + sender + .inner + .allocations + .lock() + .unwrap() + .get(&*ALLOCATION_ID_0) + .unwrap() + .as_active() + .unwrap() + .get_unaggregated_fees() + .value, + expected_unaggregated_fees + ); + + // Check that the sender's unaggregated fees are correct. + assert_eq!( + sender.inner.unaggregated_fees.lock().unwrap().value, + expected_unaggregated_fees + ); + + // Send a new receipt notification that has a lower ID than the last loaded from the DB. + // The last ID in the DB should be 10, since we added 10 receipts to the empty receipts + // table + let new_receipt_notification = NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: 10, + timestamp_ns: 19, + value: 19, + }; + sender + .handle_new_receipt_notification(new_receipt_notification) + .await; + + // Check that the allocation's unaggregated fees have *not* increased. + assert_eq!( + sender + .inner + .allocations + .lock() + .unwrap() + .get(&*ALLOCATION_ID_0) + .unwrap() + .as_active() + .unwrap() + .get_unaggregated_fees() + .value, + expected_unaggregated_fees + ); + + // Check that the unaggregated fees have *not* increased. 
+ assert_eq!( + sender.inner.unaggregated_fees.lock().unwrap().value, + expected_unaggregated_fees + ); + + // Send a new receipt notification. + let new_receipt_notification = NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: 30, + timestamp_ns: 20, + value: 20, + }; + sender + .handle_new_receipt_notification(new_receipt_notification) + .await; + expected_unaggregated_fees += 20; + + // Check that the allocation's unaggregated fees are correct. + assert_eq!( + sender + .inner + .allocations + .lock() + .unwrap() + .get(&*ALLOCATION_ID_0) + .unwrap() + .as_active() + .unwrap() + .get_unaggregated_fees() + .value, + expected_unaggregated_fees + ); + + // Check that the unaggregated fees are correct. + assert_eq!( + sender.inner.unaggregated_fees.lock().unwrap().value, + expected_unaggregated_fees + ); + + // Send a new receipt notification that has a lower ID than the previous one. + let new_receipt_notification = NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: 25, + timestamp_ns: 19, + value: 19, + }; + sender + .handle_new_receipt_notification(new_receipt_notification) + .await; + + // Check that the allocation's unaggregated fees have *not* increased. + assert_eq!( + sender + .inner + .allocations + .lock() + .unwrap() + .get(&*ALLOCATION_ID_0) + .unwrap() + .as_active() + .unwrap() + .get_unaggregated_fees() + .value, + expected_unaggregated_fees + ); + + // Check that the unaggregated fees have *not* increased. + assert_eq!( + sender.inner.unaggregated_fees.lock().unwrap().value, + expected_unaggregated_fees + ); + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_rav_requester_auto(pgpool: PgPool) { + // Start a TAP aggregator server. 
+ let (handle, aggregator_endpoint) = run_server( + 0, + SIGNER.0.clone(), + TAP_EIP712_DOMAIN_SEPARATOR.clone(), + 100 * 1024, + 100 * 1024, + 1, + ) + .await + .unwrap(); + + // Start a mock graphql server using wiremock + let mock_server = MockServer::start().await; + + // Mock result for TAP redeem txs for (allocation, sender) pair. + mock_server + .register( + Mock::given(method("POST")) + .and(body_string_contains("transactions")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(json!({ "data": { "transactions": []}})), + ), + ) + .await; + + // Create a sender_account. + let sender_account = create_sender_with_allocations( + pgpool.clone(), + "http://".to_owned() + &aggregator_endpoint.to_string(), + &mock_server.uri(), + ) + .await; + + // Add receipts to the database and call the `handle_new_receipt_notification` method + // correspondingly. + let mut total_value = 0; + let mut trigger_value = 0; + for i in 0..10 { + // These values should be enough to trigger a RAV request at i == 7 since we set the + // `rav_request_trigger_value` to 100. + let value = (i + 10) as u128; + + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i + 1, value, i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + sender_account + .handle_new_receipt_notification(NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: i, + timestamp_ns: i + 1, + value, + }) + .await; + + total_value += value; + if total_value >= 100 && trigger_value == 0 { + trigger_value = total_value; + } + } + + // Wait for the RAV requester to finish. + for _ in 0..100 { + if sender_account.inner.unaggregated_fees.lock().unwrap().value < trigger_value { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + // Get the latest RAV from the database. 
+ let latest_rav = sqlx::query!( + r#" + SELECT rav + FROM scalar_tap_ravs + WHERE allocation_id = $1 AND sender_address = $2 + "#, + ALLOCATION_ID_0.encode_hex::(), + SENDER.1.encode_hex::() + ) + .fetch_optional(&pgpool) + .await + .map(|r| r.map(|r| r.rav)) + .unwrap(); + + let latest_rav = latest_rav + .map(|r| serde_json::from_value::(r).unwrap()) + .unwrap(); + + // Check that the latest RAV value is correct. + assert!(latest_rav.message.value_aggregate >= trigger_value); + + // Check that the allocation's unaggregated fees value is reduced. + assert!( + sender_account + .inner + .allocations + .lock() + .unwrap() + .get(&*ALLOCATION_ID_0) + .unwrap() + .as_active() + .unwrap() + .get_unaggregated_fees() + .value + <= trigger_value + ); + + // Check that the sender's unaggregated fees value is reduced. + assert!(sender_account.inner.unaggregated_fees.lock().unwrap().value <= trigger_value); + + // Reset the total value and trigger value. + total_value = sender_account.inner.unaggregated_fees.lock().unwrap().value; + trigger_value = 0; + + // Add more receipts + for i in 10..20 { + let value = (i + 10) as u128; + + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i + 1, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + + sender_account + .handle_new_receipt_notification(NewReceiptNotification { + allocation_id: *ALLOCATION_ID_0, + signer_address: SIGNER.1, + id: i, + timestamp_ns: i + 1, + value, + }) + .await; + + total_value += value; + if total_value >= 100 && trigger_value == 0 { + trigger_value = total_value; + } + } + + // Wait for the RAV requester to finish. + for _ in 0..100 { + if sender_account.inner.unaggregated_fees.lock().unwrap().value < trigger_value { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + // Get the latest RAV from the database. 
+ let latest_rav = sqlx::query!( + r#" + SELECT rav + FROM scalar_tap_ravs + WHERE allocation_id = $1 AND sender_address = $2 + "#, + ALLOCATION_ID_0.encode_hex::(), + SENDER.1.encode_hex::() + ) + .fetch_optional(&pgpool) + .await + .map(|r| r.map(|r| r.rav)) + .unwrap(); + + let latest_rav = latest_rav + .map(|r| serde_json::from_value::(r).unwrap()) + .unwrap(); + + // Check that the latest RAV value is correct. + + assert!(latest_rav.message.value_aggregate >= trigger_value); + + // Check that the allocation's unaggregated fees value is reduced. + assert!( + sender_account + .inner + .allocations + .lock() + .unwrap() + .get(&*ALLOCATION_ID_0) + .unwrap() + .as_active() + .unwrap() + .get_unaggregated_fees() + .value + <= trigger_value + ); + + // Check that the unaggregated fees value is reduced. + assert!(sender_account.inner.unaggregated_fees.lock().unwrap().value <= trigger_value); + + // Stop the TAP aggregator server. + handle.stop().unwrap(); + handle.stopped().await; + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_sender_unaggregated_fees(pgpool: PgPool) { + // Create a sender_account. + let sender_account = Arc::new( + create_sender_with_allocations(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await, + ); + + // Closure that adds a number of receipts to an allocation. 
+ let add_receipts = |allocation_id: Address, iterations: u64| { + let sender_account = sender_account.clone(); + + async move { + let mut total_value = 0; + for i in 0..iterations { + let value = (i + 10) as u128; + + let id = sender_account + .inner + .unaggregated_fees + .lock() + .unwrap() + .last_id + + 1; + + sender_account + .handle_new_receipt_notification(NewReceiptNotification { + allocation_id, + signer_address: SIGNER.1, + id, + timestamp_ns: i + 1, + value, + }) + .await; + + total_value += value; + } + + assert_eq!( + sender_account + .inner + .allocations + .lock() + .unwrap() + .get(&allocation_id) + .unwrap() + .as_active() + .unwrap() + .get_unaggregated_fees() + .value, + total_value + ); + + total_value + } + }; + + // Add receipts to the database for allocation_0 + let total_value_0 = add_receipts(*ALLOCATION_ID_0, 9).await; + + // Add receipts to the database for allocation_1 + let total_value_1 = add_receipts(*ALLOCATION_ID_1, 10).await; + + // Add receipts to the database for allocation_2 + let total_value_2 = add_receipts(*ALLOCATION_ID_2, 8).await; + + // Get the heaviest allocation. + let heaviest_allocation = sender_account.inner.get_heaviest_allocation().unwrap(); + + // Check that the heaviest allocation is correct. + assert_eq!(heaviest_allocation.get_allocation_id(), *ALLOCATION_ID_1); + + // Check that the sender's unaggregated fees value is correct. 
+ assert_eq!( + sender_account.inner.unaggregated_fees.lock().unwrap().value, + total_value_0 + total_value_1 + total_value_2 + ); + } +} diff --git a/tap-agent/src/tap/sender_allocation_relationships_manager.rs b/tap-agent/src/tap/sender_accounts_manager.rs similarity index 52% rename from tap-agent/src/tap/sender_allocation_relationships_manager.rs rename to tap-agent/src/tap/sender_accounts_manager.rs index 2a1441e6..ad471645 100644 --- a/tap-agent/src/tap/sender_allocation_relationships_manager.rs +++ b/tap-agent/src/tap/sender_accounts_manager.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::collections::HashSet; +use std::sync::Mutex as StdMutex; use std::{collections::HashMap, str::FromStr, sync::Arc}; use alloy_sol_types::Eip712Domain; @@ -13,12 +14,11 @@ use indexer_common::prelude::{Allocation, SubgraphClient}; use serde::Deserialize; use sqlx::{postgres::PgListener, PgPool}; use thegraph::types::Address; -use tokio::sync::RwLock; use tracing::{error, warn}; -use super::escrow_adapter::EscrowAdapter; -use super::sender_allocation_relationship::SenderAllocationRelationship; use crate::config; +use crate::tap::escrow_adapter::EscrowAdapter; +use crate::tap::sender_account::SenderAccount; #[derive(Deserialize, Debug)] pub struct NewReceiptNotification { @@ -29,19 +29,19 @@ pub struct NewReceiptNotification { pub value: u128, } -pub struct SenderAllocationRelationshipsManager { +pub struct SenderAccountsManager { _inner: Arc, new_receipts_watcher_handle: tokio::task::JoinHandle<()>, _eligible_allocations_senders_pipe: PipeHandle, } -#[derive(Clone)] +/// Inner struct for SenderAccountsManager. This is used to store an Arc state for spawning async +/// tasks. struct Inner { config: &'static config::Cli, pgpool: PgPool, - /// Map of (allocation_id, sender_address) to SenderAllocationRelationship. - sender_allocation_relationships: - Arc>>, + /// Map of sender_address to SenderAllocation. 
+ sender_accounts: Arc>>>, indexer_allocations: Eventual>, escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, @@ -50,7 +50,64 @@ struct Inner { sender_aggregator_endpoints: HashMap, } -impl SenderAllocationRelationshipsManager { +impl Inner { + async fn update_sender_accounts( + &self, + indexer_allocations: HashMap, + target_senders: HashSet
, + ) -> Result<()> { + let eligible_allocations: HashSet
= indexer_allocations.keys().copied().collect(); + let mut sender_accounts_copy = self.sender_accounts.lock().unwrap().clone(); + + // For all Senders that are not in the target_senders HashSet, set all their allocations to + // ineligible. That will trigger a finalization of all their receipts. + for (sender_id, sender_account) in sender_accounts_copy.iter() { + if !target_senders.contains(sender_id) { + sender_account.update_allocations(HashSet::new()).await; + } + } + + // Get or create SenderAccount instances for all currently eligible + // senders. + for sender_id in &target_senders { + let sender = + sender_accounts_copy + .entry(*sender_id) + .or_insert(Arc::new(SenderAccount::new( + self.config, + self.pgpool.clone(), + *sender_id, + self.escrow_accounts.clone(), + self.escrow_subgraph, + self.escrow_adapter.clone(), + self.tap_eip712_domain_separator.clone(), + self.sender_aggregator_endpoints + .get(sender_id) + .ok_or_else(|| { + anyhow!( + "No sender_aggregator_endpoint found for sender {}", + sender_id + ) + })? + .clone(), + ))); + + // Update sender's allocations + sender + .update_allocations(eligible_allocations.clone()) + .await; + } + + // Replace the sender_accounts with the updated sender_accounts_copy + *self.sender_accounts.lock().unwrap() = sender_accounts_copy; + + // TODO: remove Sender instances that are finished. Ideally done in another async task? 
+ + Ok(()) + } +} + +impl SenderAccountsManager { pub async fn new( config: &'static config::Cli, pgpool: PgPool, @@ -65,7 +122,7 @@ impl SenderAllocationRelationshipsManager { let inner = Arc::new(Inner { config, pgpool, - sender_allocation_relationships: Arc::new(RwLock::new(HashMap::new())), + sender_accounts: Arc::new(StdMutex::new(HashMap::new())), indexer_allocations, escrow_accounts, escrow_subgraph, @@ -74,27 +131,9 @@ impl SenderAllocationRelationshipsManager { sender_aggregator_endpoints, }); - let escrow_accounts_snapshot = inner - .escrow_accounts - .value() - .await - .expect("Should get escrow accounts from Eventual"); - - Self::update_sender_allocation_relationships( - &inner, - inner - .indexer_allocations - .value() - .await - .expect("Should get indexer allocations from Eventual"), - escrow_accounts_snapshot.get_senders(), - ) - .await - .expect("Should be able to update sender_allocation_relationships"); - // Listen to pg_notify events. We start it before updating the unaggregated_fees for all - // SenderAllocationRelationship instances, so that we don't miss any receipts. PG will - // buffer the notifications until we start consuming them with `new_receipts_watcher`. + // SenderAccount instances, so that we don't miss any receipts. PG will buffer the\ + // notifications until we start consuming them with `new_receipts_watcher`. let mut pglistener = PgListener::connect_with(&inner.pgpool.clone()) .await .unwrap(); @@ -106,73 +145,158 @@ impl SenderAllocationRelationshipsManager { 'scalar_tap_receipt_notification'", ); - let mut sender_allocation_relationships_write_lock = - inner.sender_allocation_relationships.write().await; + let escrow_accounts_snapshot = inner + .escrow_accounts + .value() + .await + .expect("Should get escrow accounts from Eventual"); + + // Gather all outstanding receipts and unfinalized RAVs from the database. 
+ // Used to create SenderAccount instances for all senders that have unfinalized allocations + // and try to finalize them if they have become ineligible. + + // First we accumulate all allocations for each sender. This is because we may have more + // than one signer per sender in DB. + let mut unfinalized_sender_allocations_map: HashMap> = + HashMap::new(); - // Create SenderAllocationRelationship instances for all outstanding receipts in the - // database, because they may be linked to allocations that are not eligible anymore, but - // still need to get aggregated. - sqlx::query!( + let receipts_signer_allocations_in_db = sqlx::query!( r#" - SELECT DISTINCT allocation_id, signer_address - FROM scalar_tap_receipts + SELECT DISTINCT + signer_address, + ( + SELECT ARRAY + ( + SELECT DISTINCT allocation_id + FROM scalar_tap_receipts + WHERE signer_address = top.signer_address + ) + ) AS allocation_ids + FROM scalar_tap_receipts AS top "# ) .fetch_all(&inner.pgpool) .await - .unwrap() - .into_iter() - .for_each(|row| { - let allocation_id = Address::from_str(&row.allocation_id) - .expect("allocation_id should be a valid address"); - let signer = Address::from_str(&row.signer_address) + .expect("should be able to fetch pending receipts from the database"); + + for row in receipts_signer_allocations_in_db { + let allocation_ids = row + .allocation_ids + .expect("all receipts should have an allocation_id") + .iter() + .map(|allocation_id| { + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address") + }) + .collect::>(); + let signer_id = Address::from_str(&row.signer_address) .expect("signer_address should be a valid address"); - let sender = escrow_accounts_snapshot - .get_sender_for_signer(&signer) + let sender_id = escrow_accounts_snapshot + .get_sender_for_signer(&signer_id) .expect("should be able to get sender from signer"); - // Only create a SenderAllocationRelationship if it doesn't exist yet. 
- if let std::collections::hash_map::Entry::Vacant(e) = - sender_allocation_relationships_write_lock.entry((allocation_id, sender)) - { - e.insert(SenderAllocationRelationship::new( + // Accumulate allocations for the sender + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } + + let nonfinal_ravs_sender_allocations_in_db = sqlx::query!( + r#" + SELECT DISTINCT + sender_address, + ( + SELECT ARRAY + ( + SELECT DISTINCT allocation_id + FROM scalar_tap_ravs + WHERE sender_address = top.sender_address + ) + ) AS allocation_id + FROM scalar_tap_ravs AS top + "# + ) + .fetch_all(&inner.pgpool) + .await + .expect("should be able to fetch unfinalized RAVs from the database"); + + for row in nonfinal_ravs_sender_allocations_in_db { + let allocation_ids = row + .allocation_id + .expect("all RAVs should have an allocation_id") + .iter() + .map(|allocation_id| { + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address") + }) + .collect::>(); + let sender_id = Address::from_str(&row.sender_address) + .expect("sender_address should be a valid address"); + + // Accumulate allocations for the sender + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } + + // Create SenderAccount instances for all senders that have unfinalized allocations and add + // the allocations to the SenderAccount instances. 
+ let mut sender_accounts = HashMap::new(); + for (sender_id, allocation_ids) in unfinalized_sender_allocations_map { + let sender = sender_accounts + .entry(sender_id) + .or_insert(Arc::new(SenderAccount::new( config, inner.pgpool.clone(), - allocation_id, - sender, + sender_id, inner.escrow_accounts.clone(), inner.escrow_subgraph, inner.escrow_adapter.clone(), inner.tap_eip712_domain_separator.clone(), inner .sender_aggregator_endpoints - .get(&sender) - .unwrap() + .get(&sender_id) + .expect("should be able to get sender_aggregator_endpoint for sender") .clone(), - )); - } - }); + ))); + + sender.update_allocations(allocation_ids).await; - // Update the unaggregated_fees for all SenderAllocationRelationship instances by pulling - // the receipts from the database. - for sender_allocation_relationship in sender_allocation_relationships_write_lock.values() { - sender_allocation_relationship - .update_unaggregated_fees() + sender + .recompute_unaggregated_fees() .await - .expect("should be able to update unaggregated_fees"); + .expect("should be able to recompute unaggregated fees"); } - - drop(sender_allocation_relationships_write_lock); + // replace the sender_accounts with the updated sender_accounts + *inner.sender_accounts.lock().unwrap() = sender_accounts; + + // Update senders and allocations based on the current state of the network. + // It is important to do this after creating the Sender and SenderAllocation instances based + // on the receipts in the database, because now all ineligible allocation and/or sender that + // we created above will be set for receipt finalization. 
+ inner + .update_sender_accounts( + inner + .indexer_allocations + .value() + .await + .expect("Should get indexer allocations from Eventual"), + escrow_accounts_snapshot.get_senders(), + ) + .await + .expect("Should be able to update_sender_accounts"); // Start the new_receipts_watcher task that will consume from the `pglistener` let new_receipts_watcher_handle = tokio::spawn(Self::new_receipts_watcher( pglistener, - inner.sender_allocation_relationships.clone(), + inner.sender_accounts.clone(), inner.escrow_accounts.clone(), )); // Start the eligible_allocations_senders_pipe that watches for changes in eligible senders - // and allocations and updates the SenderAllocationRelationship instances accordingly. + // and allocations and updates the SenderAccount instances accordingly. let inner_clone = inner.clone(); let eligible_allocations_senders_pipe = eventuals::join(( inner.indexer_allocations.clone(), @@ -181,18 +305,12 @@ impl SenderAllocationRelationshipsManager { .pipe_async(move |(indexer_allocations, escrow_accounts)| { let inner = inner_clone.clone(); async move { - Self::update_sender_allocation_relationships( - &inner, - indexer_allocations, - escrow_accounts.get_senders(), - ) - .await - .unwrap_or_else(|e| { - error!( - "Error while updating sender_allocation_relationships: {:?}", - e - ); - }); + inner + .update_sender_accounts(indexer_allocations, escrow_accounts.get_senders()) + .await + .unwrap_or_else(|e| { + error!("Error while updating sender_accounts: {:?}", e); + }); } }); @@ -204,12 +322,10 @@ impl SenderAllocationRelationshipsManager { } /// Continuously listens for new receipt notifications from Postgres and forwards them to the - /// corresponding SenderAllocationRelationship. + /// corresponding SenderAccount. 
async fn new_receipts_watcher( mut pglistener: PgListener, - sender_allocation_relationships: Arc< - RwLock>, - >, + sender_accounts: Arc>>>, escrow_accounts: Eventual, ) { loop { @@ -243,81 +359,28 @@ impl SenderAllocationRelationshipsManager { } }; - if let Some(sender_allocation_relationship) = sender_allocation_relationships - .read() - .await - .get(&(new_receipt_notification.allocation_id, sender_address)) - { - sender_allocation_relationship + let sender_account = sender_accounts + .lock() + .unwrap() + .get(&sender_address) + .cloned(); + + if let Some(sender_account) = sender_account { + sender_account .handle_new_receipt_notification(new_receipt_notification) .await; } else { warn!( - "No sender_allocation_relationship found for allocation_id {} and \ - sender_address {} to process new receipt notification. This should not \ - happen.", - new_receipt_notification.allocation_id, sender_address + "No sender_allocation_manager found for sender_address {} to process new \ + receipt notification. This should not happen.", + sender_address ); } } } - - async fn update_sender_allocation_relationships( - inner: &Inner, - indexer_allocations: HashMap, - senders: HashSet
, - ) -> Result<()> { - let eligible_allocations: Vec
= indexer_allocations.keys().copied().collect(); - let mut sender_allocation_relationships_write = - inner.sender_allocation_relationships.write().await; - - // Create SenderAllocationRelationship instances for all currently eligible - // (allocation, sender) - for allocation_id in &eligible_allocations { - for sender in &senders { - // Only create a SenderAllocationRelationship if it doesn't exist yet. - if let std::collections::hash_map::Entry::Vacant(e) = - sender_allocation_relationships_write.entry((*allocation_id, *sender)) - { - e.insert(SenderAllocationRelationship::new( - inner.config, - inner.pgpool.clone(), - *allocation_id, - *sender, - inner.escrow_accounts.clone(), - inner.escrow_subgraph, - inner.escrow_adapter.clone(), - inner.tap_eip712_domain_separator.clone(), - inner - .sender_aggregator_endpoints - .get(sender) - .ok_or_else(|| { - anyhow!("No sender_aggregator_endpoint found for sender {}", sender) - })? - .clone(), - )); - } - } - } - - // Trigger a last rav request for all SenderAllocationRelationship instances that correspond - // to ineligible (allocations, sender). - for ((allocation_id, sender), sender_allocation_relatioship) in - sender_allocation_relationships_write.iter() - { - if !eligible_allocations.contains(allocation_id) || !senders.contains(sender) { - sender_allocation_relatioship.start_last_rav_request().await - } - } - - // TODO: remove SenderAllocationRelationship instances that are finished. Ideally done in - // another async task? - - Ok(()) - } } -impl Drop for SenderAllocationRelationshipsManager { +impl Drop for SenderAccountsManager { fn drop(&mut self) { // Abort the notification watcher on drop. Otherwise it may panic because the PgPool could // get dropped before. 
(Observed in tests) @@ -342,15 +405,12 @@ mod tests { Mock, MockServer, ResponseTemplate, }; - use crate::tap::{ - sender_allocation_relationship::State, - test_utils::{INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR}, - }; + use crate::tap::test_utils::{INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR}; use super::*; #[sqlx::test(migrations = "../migrations")] - async fn test_sender_allocation_relatioship_creation_and_eol(pgpool: PgPool) { + async fn test_sender_account_creation_and_eol(pgpool: PgPool) { let config = Box::leak(Box::new(config::Cli { config: None, ethereum: config::Ethereum { @@ -390,7 +450,7 @@ mod tests { DeploymentDetails::for_query_url(&mock_server.uri()).unwrap(), ))); - let sender_allocation_relatioships = SenderAllocationRelationshipsManager::new( + let sender_account = SenderAccountsManager::new( config, pgpool.clone(), indexer_allocations_eventual, @@ -436,16 +496,16 @@ mod tests { HashMap::from([(SENDER.1, vec![SIGNER.1])]), )); - // Wait for the SenderAllocationRelationship to be created. + // Wait for the SenderAccount to be created. tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // Check that the SenderAllocationRelationship was created. - assert!(sender_allocation_relatioships + // Check that the SenderAccount was created. + assert!(sender_account ._inner - .sender_allocation_relationships - .write() - .await - .contains_key(&(allocation_id, SENDER.1))); + .sender_accounts + .lock() + .unwrap() + .contains_key(&SENDER.1)); // Remove the escrow account from the escrow_accounts Eventual. 
escrow_accounts_writer.write(EscrowAccounts::default()); @@ -453,18 +513,24 @@ mod tests { // Wait a bit tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // Check that the SenderAllocationRelationship state is last_rav_pending - assert_eq!( - sender_allocation_relatioships - ._inner - .sender_allocation_relationships - .read() - .await - .get(&(allocation_id, SENDER.1)) - .unwrap() - .state() - .await, - State::LastRavPending - ); + // Check that the Sender's allocation moved from active to ineligible. + assert!(sender_account + ._inner + .sender_accounts + .lock() + .unwrap() + .get(&SENDER.1) + .unwrap() + ._tests_get_allocations_active() + .is_empty()); + assert!(sender_account + ._inner + .sender_accounts + .lock() + .unwrap() + .get(&SENDER.1) + .unwrap() + ._tests_get_allocations_ineligible() + .contains_key(&allocation_id)); } } diff --git a/tap-agent/src/tap/sender_allocation.rs b/tap-agent/src/tap/sender_allocation.rs new file mode 100644 index 00000000..24078481 --- /dev/null +++ b/tap-agent/src/tap/sender_allocation.rs @@ -0,0 +1,601 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. 
+// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Mutex as StdMutex; +use std::{str::FromStr, sync::Arc, time::Duration}; + +use alloy_primitives::hex::ToHex; +use alloy_sol_types::Eip712Domain; +use anyhow::{anyhow, ensure, Result}; +use eventuals::Eventual; +use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; +use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; +use sqlx::{types::BigDecimal, PgPool}; +use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; +use tap_core::{ + eip_712_signed_message::EIP712SignedMessage, + receipt_aggregate_voucher::ReceiptAggregateVoucher, + tap_manager::RAVRequest, + tap_receipt::{ReceiptCheck, ReceivedReceipt}, +}; +use thegraph::types::Address; +use tokio::sync::Mutex as TokioMutex; +use tracing::{error, warn}; + +use crate::{ + config::{self}, + tap::{ + escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, + receipt_checks_adapter::ReceiptChecksAdapter, + receipt_storage_adapter::ReceiptStorageAdapter, signers_trimmed, + unaggregated_receipts::UnaggregatedReceipts, + }, +}; + +type TapManager = tap_core::tap_manager::Manager< + EscrowAdapter, + ReceiptChecksAdapter, + ReceiptStorageAdapter, + RAVStorageAdapter, +>; + +/// Manages unaggregated fees and the TAP lifecycle for a specific (allocation, sender) pair.
+pub struct SenderAllocation { + pgpool: PgPool, + tap_manager: TapManager, + allocation_id: Address, + sender: Address, + sender_aggregator_endpoint: String, + unaggregated_fees: Arc>, + config: &'static config::Cli, + escrow_accounts: Eventual, + rav_request_guard: TokioMutex<()>, + unaggregated_receipts_guard: TokioMutex<()>, +} + +impl SenderAllocation { + #[allow(clippy::too_many_arguments)] + pub async fn new( + config: &'static config::Cli, + pgpool: PgPool, + allocation_id: Address, + sender: Address, + escrow_accounts: Eventual, + escrow_subgraph: &'static SubgraphClient, + escrow_adapter: EscrowAdapter, + tap_eip712_domain_separator: Eip712Domain, + sender_aggregator_endpoint: String, + ) -> Self { + let required_checks = vec![ + ReceiptCheck::CheckUnique, + ReceiptCheck::CheckAllocationId, + ReceiptCheck::CheckTimestamp, + // ReceiptCheck::CheckValue, + ReceiptCheck::CheckSignature, + ReceiptCheck::CheckAndReserveEscrow, + ]; + + let receipt_checks_adapter = ReceiptChecksAdapter::new( + config, + pgpool.clone(), + // TODO: Implement query appraisals. 
+ None, + allocation_id, + escrow_accounts.clone(), + escrow_subgraph, + sender, + ); + let receipt_storage_adapter = ReceiptStorageAdapter::new( + pgpool.clone(), + allocation_id, + sender, + required_checks.clone(), + escrow_accounts.clone(), + ); + let rav_storage_adapter = RAVStorageAdapter::new(pgpool.clone(), allocation_id, sender); + let tap_manager = TapManager::new( + tap_eip712_domain_separator.clone(), + escrow_adapter, + receipt_checks_adapter, + rav_storage_adapter, + receipt_storage_adapter, + required_checks, + 0, + ); + + let sender_allocation = Self { + pgpool, + tap_manager, + allocation_id, + sender, + sender_aggregator_endpoint, + unaggregated_fees: Arc::new(StdMutex::new(UnaggregatedReceipts::default())), + config, + escrow_accounts, + rav_request_guard: TokioMutex::new(()), + unaggregated_receipts_guard: TokioMutex::new(()), + }; + + sender_allocation + .update_unaggregated_fees() + .await + .map_err(|e| { + error!( + "Error while updating unaggregated fees for allocation {}: {}", + allocation_id, e + ) + }) + .ok(); + + sender_allocation + } + + /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager + /// with the latest unaggregated fees from the database. + async fn update_unaggregated_fees(&self) -> Result<()> { + // Make sure to pause the handling of receipt notifications while we update the unaggregated + // fees. + let _guard = self.unaggregated_receipts_guard.lock().await; + + self.tap_manager.remove_obsolete_receipts().await?; + + let signers = signers_trimmed(&self.escrow_accounts, self.sender).await?; + + // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? 
+ let res = sqlx::query!( + r#" + WITH rav AS ( + SELECT + rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns + FROM + scalar_tap_ravs + WHERE + allocation_id = $1 + AND sender_address = $2 + ) + SELECT + MAX(id), + SUM(value) + FROM + scalar_tap_receipts + WHERE + allocation_id = $1 + AND signer_address IN (SELECT unnest($3::text[])) + AND CASE WHEN ( + SELECT + timestamp_ns :: NUMERIC + FROM + rav + ) IS NOT NULL THEN timestamp_ns > ( + SELECT + timestamp_ns :: NUMERIC + FROM + rav + ) ELSE TRUE END + "#, + self.allocation_id.encode_hex::(), + self.sender.encode_hex::(), + &signers + ) + .fetch_one(&self.pgpool) + .await?; + + ensure!( + res.sum.is_none() == res.max.is_none(), + "Exactly one of SUM(value) and MAX(id) is null. This should not happen." + ); + + *self.unaggregated_fees.lock().unwrap() = UnaggregatedReceipts { + last_id: res.max.unwrap_or(0).try_into()?, + value: res + .sum + .unwrap_or(BigDecimal::from(0)) + .to_string() + .parse::()?, + }; + + // TODO: check if we need to run a RAV request here. + + Ok(()) + } + + /// Request a RAV from the sender's TAP aggregator. Only one RAV request will be running at a + /// time through the use of an internal guard. + pub async fn rav_requester_single(&self) -> Result<()> { + // Making extra sure that only one RAV request is running at a time. + let _guard = self.rav_request_guard.lock().await; + + let RAVRequest { + valid_receipts, + previous_rav, + invalid_receipts, + expected_rav, + } = self + .tap_manager + .create_rav_request( + self.config.tap.rav_request_timestamp_buffer_ms * 1_000_000, + // TODO: limit the number of receipts to aggregate per request. + None, + ) + .await?; + if !invalid_receipts.is_empty() { + warn!( + "Found {} invalid receipts for allocation {} and sender {}.", + invalid_receipts.len(), + self.allocation_id, + self.sender + ); + + // Save invalid receipts to the database for logs. + // TODO: consider doing that in a spawned task? 
+ Self::store_invalid_receipts(self, &invalid_receipts).await?; + } + let client = HttpClientBuilder::default() + .request_timeout(Duration::from_secs( + self.config.tap.rav_request_timeout_secs, + )) + .build(&self.sender_aggregator_endpoint)?; + let response: JsonRpcResponse> = client + .request( + "aggregate_receipts", + rpc_params!( + "0.0", // TODO: Set the version in a smarter place. + valid_receipts, + previous_rav + ), + ) + .await?; + if let Some(warnings) = response.warnings { + warn!("Warnings from sender's TAP aggregator: {:?}", warnings); + } + match self + .tap_manager + .verify_and_store_rav(expected_rav.clone(), response.data.clone()) + .await + { + Ok(_) => {} + + // Adapter errors are local software errors. Shouldn't be a problem with the sender. + Err(tap_core::Error::AdapterError { source_error: e }) => { + anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e) + } + + // The 3 errors below signal an invalid RAV, which should be about problems with the + // sender. The sender could be malicious. + Err( + e @ tap_core::Error::InvalidReceivedRAV { + expected_rav: _, + received_rav: _, + } + | e @ tap_core::Error::SignatureError(_) + | e @ tap_core::Error::InvalidRecoveredSigner { address: _ }, + ) => { + Self::store_failed_rav(self, &expected_rav, &response.data, &e.to_string()).await?; + anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e); + } + + // All relevant errors should be handled above. If we get here, we forgot to handle + // an error case. 
+ Err(e) => { + anyhow::bail!("Error while verifying and storing RAV: {:?}", e); + } + } + Self::update_unaggregated_fees(self).await?; + Ok(()) + } + + pub async fn mark_rav_final(&self) -> Result<()> { + let updated_rows = sqlx::query!( + r#" + UPDATE scalar_tap_ravs + SET final = true + WHERE allocation_id = $1 AND sender_address = $2 + RETURNING * + "#, + self.allocation_id.encode_hex::(), + self.sender.encode_hex::(), + ) + .fetch_all(&self.pgpool) + .await?; + if updated_rows.len() != 1 { + anyhow::bail!( + "Expected exactly one row to be updated in the latest RAVs table, \ + but {} were updated.", + updated_rows.len() + ); + }; + Ok(()) + } + + async fn store_invalid_receipts(&self, receipts: &[ReceivedReceipt]) -> Result<()> { + for received_receipt in receipts.iter() { + sqlx::query!( + r#" + INSERT INTO scalar_tap_receipts_invalid ( + allocation_id, + signer_address, + timestamp_ns, + value, + received_receipt + ) + VALUES ($1, $2, $3, $4, $5) + "#, + self.allocation_id.encode_hex::(), + self.sender.encode_hex::(), + BigDecimal::from(received_receipt.signed_receipt().message.timestamp_ns), + BigDecimal::from_str(&received_receipt.signed_receipt().message.value.to_string())?, + serde_json::to_value(received_receipt)? 
+ ) + .execute(&self.pgpool) + .await + .map_err(|e| anyhow!("Failed to store failed receipt: {:?}", e))?; + } + + Ok(()) + } + + async fn store_failed_rav( + &self, + expected_rav: &ReceiptAggregateVoucher, + rav: &EIP712SignedMessage, + reason: &str, + ) -> Result<()> { + sqlx::query!( + r#" + INSERT INTO scalar_tap_rav_requests_failed ( + allocation_id, + sender_address, + expected_rav, + rav_response, + reason + ) + VALUES ($1, $2, $3, $4, $5) + "#, + self.allocation_id.encode_hex::(), + self.sender.encode_hex::(), + serde_json::to_value(expected_rav)?, + serde_json::to_value(rav)?, + reason + ) + .execute(&self.pgpool) + .await + .map_err(|e| anyhow!("Failed to store failed RAV: {:?}", e))?; + + Ok(()) + } + + /// Safe add the fees to the unaggregated fees value if the receipt_id is greater than the + /// last_id. If the addition would overflow u128, log an error and set the unaggregated fees + /// value to u128::MAX. + /// + /// Returns true if the fees were added, false otherwise. + pub async fn fees_add(&self, fees: u128, receipt_id: u64) -> bool { + // Make sure to pause the handling of receipt notifications while we update the unaggregated + // fees. + let _guard = self.unaggregated_receipts_guard.lock().await; + + let mut fees_added = false; + let mut unaggregated_fees = self.unaggregated_fees.lock().unwrap(); + + if receipt_id > unaggregated_fees.last_id { + *unaggregated_fees = UnaggregatedReceipts { + last_id: receipt_id, + value: unaggregated_fees + .value + .checked_add(fees) + .unwrap_or_else(|| { + // This should never happen, but if it does, we want to know about it. + error!( + "Overflow when adding receipt value {} to total unaggregated fees {} \ + for allocation {} and sender {}. 
Setting total unaggregated fees to \ + u128::MAX.", + fees, unaggregated_fees.value, self.allocation_id, self.sender + ); + u128::MAX + }), + }; + fees_added = true; + } + + fees_added + } + + pub fn get_unaggregated_fees(&self) -> UnaggregatedReceipts { + self.unaggregated_fees.lock().unwrap().clone() + } + + pub fn get_allocation_id(&self) -> Address { + self.allocation_id + } +} + +#[cfg(test)] +mod tests { + + use std::collections::HashMap; + + use indexer_common::subgraph_client::DeploymentDetails; + use serde_json::json; + use tap_aggregator::server::run_server; + + use wiremock::{ + matchers::{body_string_contains, method}, + Mock, MockServer, ResponseTemplate, + }; + + use super::*; + use crate::tap::test_utils::{ + create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID_0, INDEXER, + SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, + }; + + const DUMMY_URL: &str = "http://localhost:1234"; + + async fn create_sender_allocation( + pgpool: PgPool, + sender_aggregator_endpoint: String, + escrow_subgraph_endpoint: &str, + ) -> SenderAllocation { + let config = Box::leak(Box::new(config::Cli { + config: None, + ethereum: config::Ethereum { + indexer_address: INDEXER.1, + }, + tap: config::Tap { + rav_request_trigger_value: 100, + rav_request_timestamp_buffer_ms: 1, + rav_request_timeout_secs: 5, + ..Default::default() + }, + ..Default::default() + })); + + let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( + reqwest::Client::new(), + None, + DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), + ))); + + let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + + let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); + + SenderAllocation::new( + config, + pgpool.clone(), + *ALLOCATION_ID_0, + SENDER.1, + escrow_accounts_eventual, + escrow_subgraph, + escrow_adapter, + 
TAP_EIP712_DOMAIN_SEPARATOR.clone(), + sender_aggregator_endpoint, + ) + .await + } + + /// Test that the sender_allocation correctly updates the unaggregated fees from the + /// database when there is no RAV in the database. + /// + /// The sender_allocation should consider all receipts found for the allocation and + /// sender. + #[sqlx::test(migrations = "../migrations")] + async fn test_update_unaggregated_fees_no_rav(pgpool: PgPool) { + let sender_allocation = + create_sender_allocation(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + + // Add receipts to the database. + for i in 1..10 { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + // Let the sender_allocation update the unaggregated fees from the database. + sender_allocation.update_unaggregated_fees().await.unwrap(); + + // Check that the unaggregated fees are correct. + assert_eq!( + sender_allocation.unaggregated_fees.lock().unwrap().value, + 45u128 + ); + } + + /// Test that the sender_allocation correctly updates the unaggregated fees from the + /// database when there is a RAV in the database as well as receipts which timestamp are lesser + /// and greater than the RAV's timestamp. + /// + /// The sender_allocation should only consider receipts with a timestamp greater + /// than the RAV's timestamp. + #[sqlx::test(migrations = "../migrations")] + async fn test_update_unaggregated_fees_with_rav(pgpool: PgPool) { + let sender_allocation = + create_sender_allocation(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await; + + // Add the RAV to the database. + // This RAV has timestamp 4. The sender_allocation should only consider receipts + // with a timestamp greater than 4. + let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10).await; + store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); + + // Add receipts to the database. 
+ for i in 1..10 { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + // Let the sender_allocation update the unaggregated fees from the database. + sender_allocation.update_unaggregated_fees().await.unwrap(); + + // Check that the unaggregated fees are correct. + assert_eq!( + sender_allocation.unaggregated_fees.lock().unwrap().value, + 35u128 + ); + } + + #[sqlx::test(migrations = "../migrations")] + async fn test_rav_requester_manual(pgpool: PgPool) { + // Start a TAP aggregator server. + let (handle, aggregator_endpoint) = run_server( + 0, + SIGNER.0.clone(), + TAP_EIP712_DOMAIN_SEPARATOR.clone(), + 100 * 1024, + 100 * 1024, + 1, + ) + .await + .unwrap(); + + // Start a mock graphql server using wiremock + let mock_server = MockServer::start().await; + + // Mock result for TAP redeem txs for (allocation, sender) pair. + mock_server + .register( + Mock::given(method("POST")) + .and(body_string_contains("transactions")) + .respond_with( + ResponseTemplate::new(200) + .set_body_json(json!({ "data": { "transactions": []}})), + ), + ) + .await; + + // Create a sender_allocation. + let sender_allocation = create_sender_allocation( + pgpool.clone(), + "http://".to_owned() + &aggregator_endpoint.to_string(), + &mock_server.uri(), + ) + .await; + + // Add receipts to the database. + for i in 0..10 { + let receipt = + create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i + 1, i.into(), i).await; + store_receipt(&pgpool, receipt.signed_receipt()) + .await + .unwrap(); + } + + // Let the sender_allocation update the unaggregated fees from the database. + sender_allocation.update_unaggregated_fees().await.unwrap(); + + // Trigger a RAV request manually. + sender_allocation.rav_requester_single().await.unwrap(); + + // Stop the TAP aggregator server. 
+ handle.stop().unwrap(); + handle.stopped().await; + } +} diff --git a/tap-agent/src/tap/sender_allocation_relationship.rs b/tap-agent/src/tap/sender_allocation_relationship.rs deleted file mode 100644 index 198dc6f5..00000000 --- a/tap-agent/src/tap/sender_allocation_relationship.rs +++ /dev/null @@ -1,1024 +0,0 @@ -// Copyright 2023-, GraphOps and Semiotic Labs. -// SPDX-License-Identifier: Apache-2.0 - -use std::{str::FromStr, sync::Arc, time::Duration}; - -use alloy_primitives::hex::ToHex; -use alloy_sol_types::Eip712Domain; -use anyhow::{anyhow, ensure, Result}; -use thegraph::types::Address; - -use eventuals::Eventual; -use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; -use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; -use sqlx::{types::BigDecimal, PgPool}; -use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; -use tap_core::{ - eip_712_signed_message::EIP712SignedMessage, - receipt_aggregate_voucher::ReceiptAggregateVoucher, - tap_manager::RAVRequest, - tap_receipt::{ReceiptCheck, ReceivedReceipt}, -}; -use tokio::sync::Mutex; -use tracing::{error, warn}; - -use super::sender_allocation_relationships_manager::NewReceiptNotification; -use crate::{ - config::{self}, - tap::{ - escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, - receipt_checks_adapter::ReceiptChecksAdapter, - receipt_storage_adapter::ReceiptStorageAdapter, signers_trimmed, - }, -}; - -type TapManager = tap_core::tap_manager::Manager< - EscrowAdapter, - ReceiptChecksAdapter, - ReceiptStorageAdapter, - RAVStorageAdapter, ->; - -#[derive(Default, Debug)] -struct UnaggregatedFees { - pub value: u128, - /// The ID of the last receipt value added to the unaggregated fees value. - /// This is used to make sure we don't process the same receipt twice. Relies on the fact that - /// the receipts IDs are SERIAL in the database. 
- pub last_id: u64, -} - -#[derive(Debug, PartialEq, Clone, Copy)] -pub enum State { - Running, - LastRavPending, - Finished, -} - -struct Inner { - pgpool: PgPool, - tap_manager: TapManager, - allocation_id: Address, - sender: Address, - sender_aggregator_endpoint: String, - unaggregated_fees: Arc>, - state: Arc>, - config: &'static config::Cli, - escrow_accounts: Eventual, -} - -/// A SenderAllocationRelationship is the relationship between the indexer and the sender in the -/// context of a single allocation. -/// -/// Manages the lifecycle of Scalar TAP for the SenderAllocationRelationship, including: -/// - Monitoring new receipts and keeping track of the unaggregated fees. -/// - Requesting RAVs from the sender's TAP aggregator once the unaggregated fees reach a certain -/// threshold. -/// - Requesting the last RAV from the sender's TAP aggregator (on SenderAllocationRelationship EOL) -pub struct SenderAllocationRelationship { - inner: Arc, - rav_requester_task: tokio::task::JoinHandle<()>, - rav_requester_notify: Arc, -} - -impl SenderAllocationRelationship { - #[allow(clippy::too_many_arguments)] - pub fn new( - config: &'static config::Cli, - pgpool: PgPool, - allocation_id: Address, - sender: Address, - escrow_accounts: Eventual, - escrow_subgraph: &'static SubgraphClient, - escrow_adapter: EscrowAdapter, - tap_eip712_domain_separator: Eip712Domain, - sender_aggregator_endpoint: String, - ) -> Self { - let required_checks = vec![ - ReceiptCheck::CheckUnique, - ReceiptCheck::CheckAllocationId, - ReceiptCheck::CheckTimestamp, - // ReceiptCheck::CheckValue, - ReceiptCheck::CheckSignature, - ReceiptCheck::CheckAndReserveEscrow, - ]; - - let receipt_checks_adapter = ReceiptChecksAdapter::new( - config, - pgpool.clone(), - // TODO: Implement query appraisals. 
- None, - allocation_id, - escrow_accounts.clone(), - escrow_subgraph, - sender, - ); - let receipt_storage_adapter = ReceiptStorageAdapter::new( - pgpool.clone(), - allocation_id, - sender, - required_checks.clone(), - escrow_accounts.clone(), - ); - let rav_storage_adapter = RAVStorageAdapter::new(pgpool.clone(), allocation_id, sender); - let tap_manager = TapManager::new( - tap_eip712_domain_separator.clone(), - escrow_adapter, - receipt_checks_adapter, - rav_storage_adapter, - receipt_storage_adapter, - required_checks, - 0, - ); - - let inner = Arc::new(Inner { - pgpool, - tap_manager, - allocation_id, - sender, - sender_aggregator_endpoint, - unaggregated_fees: Arc::new(Mutex::new(UnaggregatedFees::default())), - state: Arc::new(Mutex::new(State::Running)), - config, - escrow_accounts, - }); - - let rav_requester_notify = Arc::new(tokio::sync::Notify::new()); - let rav_requester_task = tokio::spawn(Self::rav_requester( - inner.clone(), - rav_requester_notify.clone(), - )); - - Self { - inner, - rav_requester_task, - rav_requester_notify, - } - } - - pub async fn handle_new_receipt_notification( - &self, - new_receipt_notification: NewReceiptNotification, - ) { - // If we're in the last rav pending state or finished, we don't want to process any new - // receipts. - if self.state().await != State::Running { - error!( - "Received a new receipt notification for now ineligible allocation {} and \ - sender {}.", - self.inner.allocation_id, self.inner.sender - ); - return; - } - - let mut unaggregated_fees = self.inner.unaggregated_fees.lock().await; - - // Else we already processed that receipt, most likely from pulling the receipts - // from the database. - if new_receipt_notification.id > unaggregated_fees.last_id { - unaggregated_fees.value = unaggregated_fees - .value - .checked_add(new_receipt_notification.value) - .unwrap_or_else(|| { - // This should never happen, but if it does, we want to know about it. 
- error!( - "Overflow when adding receipt value {} to total unaggregated fees {} for \ - allocation {} and sender {}. Setting total unaggregated fees to u128::MAX.", - new_receipt_notification.value, - unaggregated_fees.value, - new_receipt_notification.allocation_id, - self.inner.sender - ); - u128::MAX - }); - unaggregated_fees.last_id = new_receipt_notification.id; - - // TODO: consider making the trigger per sender, instead of per (sender, allocation). - if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into() { - self.rav_requester_notify.notify_waiters(); - } - } - } - - pub async fn start_last_rav_request(&self) { - *(self.inner.state.lock().await) = State::LastRavPending; - self.rav_requester_notify.notify_one(); - } - - /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager - /// with the latest unaggregated fees from the database. - pub async fn update_unaggregated_fees(&self) -> Result<()> { - Self::update_unaggregated_fees_static(&self.inner).await - } - - /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager - /// with the latest unaggregated fees from the database. - async fn update_unaggregated_fees_static(inner: &Inner) -> Result<()> { - inner.tap_manager.remove_obsolete_receipts().await?; - - let signers = signers_trimmed(&inner.escrow_accounts, inner.sender).await?; - - // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? 
- let res = sqlx::query!( - r#" - WITH rav AS ( - SELECT - rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns - FROM - scalar_tap_ravs - WHERE - allocation_id = $1 - AND sender_address = $2 - ) - SELECT - MAX(id), - SUM(value) - FROM - scalar_tap_receipts - WHERE - allocation_id = $1 - AND signer_address IN (SELECT unnest($3::text[])) - AND CASE WHEN ( - SELECT - timestamp_ns :: NUMERIC - FROM - rav - ) IS NOT NULL THEN timestamp_ns > ( - SELECT - timestamp_ns :: NUMERIC - FROM - rav - ) ELSE TRUE END - "#, - inner.allocation_id.encode_hex::(), - inner.sender.encode_hex::(), - &signers - ) - .fetch_one(&inner.pgpool) - .await?; - - let mut unaggregated_fees = inner.unaggregated_fees.lock().await; - - ensure!( - res.sum.is_none() == res.max.is_none(), - "Exactly one of SUM(value) and MAX(id) is null. This should not happen." - ); - - unaggregated_fees.last_id = res.max.unwrap_or(0).try_into()?; - unaggregated_fees.value = res - .sum - .unwrap_or(BigDecimal::from(0)) - .to_string() - .parse::()?; - - // TODO: check if we need to run a RAV request here. - - Ok(()) - } - - /// Request a RAV from the sender's TAP aggregator. - /// Will remove the aggregated receipts from the database if successful. - async fn rav_requester(inner: Arc, notifications: Arc) { - loop { - // Wait for a RAV request notification. - notifications.notified().await; - - Self::rav_requester_try(&inner).await.unwrap_or_else(|e| { - error!( - "Error while requesting a RAV for allocation {} and sender {}: {:?}", - inner.allocation_id, inner.sender, e - ); - }); - } - } - - async fn rav_requester_try(inner: &Arc) -> Result<()> { - loop { - Self::rav_requester_single(inner).await?; - - // Check if we need to request another RAV immediately. - let unaggregated_fees = inner.unaggregated_fees.lock().await; - if unaggregated_fees.value < inner.config.tap.rav_request_trigger_value.into() { - break; - } else { - // Go right back to requesting a RAV and warn the user. 
- warn!( - "Unaggregated fees for allocation {} and sender {} are {} right \ - after the RAV request. This is a sign that the TAP agent can't keep \ - up with the rate of new receipts. Consider increasing the \ - `rav_request_trigger_value` in the TAP agent config. It could also be \ - a sign that the sender's TAP aggregator is too slow.", - inner.allocation_id, inner.sender, unaggregated_fees.value - ); - } - } - - let mut state = inner.state.lock().await; - if *state == State::LastRavPending { - // Mark the last RAV as final in the DB as a cue for the indexer-agent. - Self::mark_rav_final(inner).await?; - - *state = State::Finished; - }; - anyhow::Ok(()) - } - - async fn rav_requester_single(inner: &Arc) -> Result<()> { - let RAVRequest { - valid_receipts, - previous_rav, - invalid_receipts, - expected_rav, - } = inner - .tap_manager - .create_rav_request( - inner.config.tap.rav_request_timestamp_buffer_ms * 1_000_000, - // TODO: limit the number of receipts to aggregate per request. - None, - ) - .await?; - if !invalid_receipts.is_empty() { - warn!( - "Found {} invalid receipts for allocation {} and sender {}.", - invalid_receipts.len(), - inner.allocation_id, - inner.sender - ); - - // Save invalid receipts to the database for logs. - // TODO: consider doing that in a spawned task? - Self::store_invalid_receipts(inner, &invalid_receipts).await?; - } - let client = HttpClientBuilder::default() - .request_timeout(Duration::from_secs( - inner.config.tap.rav_request_timeout_secs, - )) - .build(&inner.sender_aggregator_endpoint)?; - let response: JsonRpcResponse> = client - .request( - "aggregate_receipts", - rpc_params!( - "0.0", // TODO: Set the version in a smarter place. 
- valid_receipts, - previous_rav - ), - ) - .await?; - if let Some(warnings) = response.warnings { - warn!("Warnings from sender's TAP aggregator: {:?}", warnings); - } - match inner - .tap_manager - .verify_and_store_rav(expected_rav.clone(), response.data.clone()) - .await - { - Ok(_) => {} - - // Adapter errors are local software errors. Shouldn't be a problem with the sender. - Err(tap_core::Error::AdapterError { source_error: e }) => { - anyhow::bail!("TAP Adapter error while storing RAV: {:?}", e) - } - - // The 3 errors below signal an invalid RAV, which should be about problems with the - // sender. The sender could be malicious. - Err( - e @ tap_core::Error::InvalidReceivedRAV { - expected_rav: _, - received_rav: _, - } - | e @ tap_core::Error::SignatureError(_) - | e @ tap_core::Error::InvalidRecoveredSigner { address: _ }, - ) => { - Self::store_failed_rav(inner, &expected_rav, &response.data, &e.to_string()) - .await?; - anyhow::bail!("Invalid RAV, sender could be malicious: {:?}.", e); - } - - // All relevant errors should be handled above. If we get here, we forgot to handle - // an error case. 
- Err(e) => { - anyhow::bail!("Error while verifying and storing RAV: {:?}", e); - } - } - Self::update_unaggregated_fees_static(inner).await?; - Ok(()) - } - - async fn mark_rav_final(inner: &Arc) -> Result<()> { - let updated_rows = sqlx::query!( - r#" - UPDATE scalar_tap_ravs - SET final = true - WHERE allocation_id = $1 AND sender_address = $2 - RETURNING * - "#, - inner.allocation_id.encode_hex::(), - inner.sender.encode_hex::(), - ) - .fetch_all(&inner.pgpool) - .await?; - if updated_rows.len() != 1 { - anyhow::bail!( - "Expected exactly one row to be updated in the latest RAVs table, \ - but {} were updated.", - updated_rows.len() - ); - }; - Ok(()) - } - - pub async fn state(&self) -> State { - *self.inner.state.lock().await - } - - async fn store_invalid_receipts(inner: &Inner, receipts: &[ReceivedReceipt]) -> Result<()> { - for received_receipt in receipts.iter() { - sqlx::query!( - r#" - INSERT INTO scalar_tap_receipts_invalid ( - allocation_id, - signer_address, - timestamp_ns, - value, - received_receipt - ) - VALUES ($1, $2, $3, $4, $5) - "#, - inner.allocation_id.encode_hex::(), - inner.sender.encode_hex::(), - BigDecimal::from(received_receipt.signed_receipt().message.timestamp_ns), - BigDecimal::from_str(&received_receipt.signed_receipt().message.value.to_string())?, - serde_json::to_value(received_receipt)? 
- ) - .execute(&inner.pgpool) - .await - .map_err(|e| anyhow!("Failed to store failed receipt: {:?}", e))?; - } - - Ok(()) - } - - async fn store_failed_rav( - inner: &Inner, - expected_rav: &ReceiptAggregateVoucher, - rav: &EIP712SignedMessage, - reason: &str, - ) -> Result<()> { - sqlx::query!( - r#" - INSERT INTO scalar_tap_rav_requests_failed ( - allocation_id, - sender_address, - expected_rav, - rav_response, - reason - ) - VALUES ($1, $2, $3, $4, $5) - "#, - inner.allocation_id.encode_hex::(), - inner.sender.encode_hex::(), - serde_json::to_value(expected_rav)?, - serde_json::to_value(rav)?, - reason - ) - .execute(&inner.pgpool) - .await - .map_err(|e| anyhow!("Failed to store failed RAV: {:?}", e))?; - - Ok(()) - } -} - -impl Drop for SenderAllocationRelationship { - /// Trying to make sure the RAV requester task is dropped when the SenderAllocationRelationship - /// is dropped. - fn drop(&mut self) { - self.rav_requester_task.abort(); - } -} - -#[cfg(test)] -mod tests { - - use std::collections::HashMap; - - use indexer_common::subgraph_client::DeploymentDetails; - use serde_json::json; - use tap_aggregator::server::run_server; - use tap_core::tap_manager::SignedRAV; - use wiremock::{ - matchers::{body_string_contains, method}, - Mock, MockServer, ResponseTemplate, - }; - - use super::*; - use crate::tap::test_utils::{ - create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID, INDEXER, - SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, - }; - - const DUMMY_URL: &str = "http://localhost:1234"; - - async fn create_sender_allocation_relationship( - pgpool: PgPool, - sender_aggregator_endpoint: String, - escrow_subgraph_endpoint: &str, - ) -> SenderAllocationRelationship { - let config = Box::leak(Box::new(config::Cli { - config: None, - ethereum: config::Ethereum { - indexer_address: INDEXER.1, - }, - tap: config::Tap { - rav_request_trigger_value: 100, - rav_request_timestamp_buffer_ms: 1, - rav_request_timeout_secs: 5, - 
..Default::default() - }, - ..Default::default() - })); - - let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new( - reqwest::Client::new(), - None, - DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), - ))); - - let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( - HashMap::from([(SENDER.1, 1000.into())]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); - - let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); - - SenderAllocationRelationship::new( - config, - pgpool.clone(), - *ALLOCATION_ID, - SENDER.1, - escrow_accounts_eventual, - escrow_subgraph, - escrow_adapter, - TAP_EIP712_DOMAIN_SEPARATOR.clone(), - sender_aggregator_endpoint, - ) - } - - /// Test that the sender_allocation_relatioship correctly updates the unaggregated fees from the - /// database when there is no RAV in the database. - /// - /// The sender_allocation_relatioship should consider all receipts found for the allocation and - /// sender. - #[sqlx::test(migrations = "../migrations")] - async fn test_update_unaggregated_fees_no_rav(pgpool: PgPool) { - let sender_allocation_relatioship = - create_sender_allocation_relationship(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL) - .await; - - // Add receipts to the database. - for i in 1..10 { - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - } - - // Let the sender_allocation_relatioship update the unaggregated fees from the database. - sender_allocation_relatioship - .update_unaggregated_fees() - .await - .unwrap(); - - // Check that the unaggregated fees are correct. 
- assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - 45u128 - ); - } - - /// Test that the sender_allocation_relatioship correctly updates the unaggregated fees from the - /// database when there is a RAV in the database as well as receipts which timestamp are lesser - /// and greater than the RAV's timestamp. - /// - /// The sender_allocation_relatioship should only consider receipts with a timestamp greater - /// than the RAV's timestamp. - #[sqlx::test(migrations = "../migrations")] - async fn test_update_unaggregated_fees_with_rav(pgpool: PgPool) { - let sender_allocation_relatioship = - create_sender_allocation_relationship(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL) - .await; - - // Add the RAV to the database. - // This RAV has timestamp 4. The sender_allocation_relatioship should only consider receipts - // with a timestamp greater than 4. - let signed_rav = create_rav(*ALLOCATION_ID, SIGNER.0.clone(), 4, 10).await; - store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); - - // Add receipts to the database. - for i in 1..10 { - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - } - - // Let the sender_allocation_relatioship update the unaggregated fees from the database. - sender_allocation_relatioship - .update_unaggregated_fees() - .await - .unwrap(); - - // Check that the unaggregated fees are correct. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - 35u128 - ); - } - - /// Test that the sender_allocation_relatioship correctly ignores new receipt notifications with - /// an ID lower than the last receipt ID processed (be it from the DB or from a prior receipt - /// notification). 
- #[sqlx::test(migrations = "../migrations")] - async fn test_handle_new_receipt_notification(pgpool: PgPool) { - let sender_allocation_relatioship = - create_sender_allocation_relationship(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL) - .await; - - // Add receipts to the database. - let mut expected_unaggregated_fees = 0u128; - for i in 10..20 { - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - expected_unaggregated_fees += u128::from(i); - } - - sender_allocation_relatioship - .update_unaggregated_fees() - .await - .unwrap(); - - // Check that the unaggregated fees are correct. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - expected_unaggregated_fees - ); - - // Send a new receipt notification that has a lower ID than the last loaded from the DB. - // The last ID in the DB should be 10, since we added 10 receipts to the empty receipts - // table - let new_receipt_notification = NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: 10, - timestamp_ns: 19, - value: 19, - }; - sender_allocation_relatioship - .handle_new_receipt_notification(new_receipt_notification) - .await; - - // Check that the unaggregated fees have *not* increased. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - expected_unaggregated_fees - ); - - // Send a new receipt notification. - let new_receipt_notification = NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: 30, - timestamp_ns: 20, - value: 20, - }; - sender_allocation_relatioship - .handle_new_receipt_notification(new_receipt_notification) - .await; - expected_unaggregated_fees += 20; - - // Check that the unaggregated fees are correct. 
- assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - expected_unaggregated_fees - ); - - // Send a new receipt notification that has a lower ID than the previous one. - let new_receipt_notification = NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: 25, - timestamp_ns: 19, - value: 19, - }; - sender_allocation_relatioship - .handle_new_receipt_notification(new_receipt_notification) - .await; - - // Check that the unaggregated fees have *not* increased. - assert_eq!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value, - expected_unaggregated_fees - ); - } - - #[sqlx::test(migrations = "../migrations")] - async fn test_rav_requester_manual(pgpool: PgPool) { - // Start a TAP aggregator server. - let (handle, aggregator_endpoint) = run_server( - 0, - SIGNER.0.clone(), - TAP_EIP712_DOMAIN_SEPARATOR.clone(), - 100 * 1024, - 100 * 1024, - 1, - ) - .await - .unwrap(); - - // Start a mock graphql server using wiremock - let mock_server = MockServer::start().await; - - // Mock result for TAP redeem txs for (allocation, sender) pair. - mock_server - .register( - Mock::given(method("POST")) - .and(body_string_contains("transactions")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(json!({ "data": { "transactions": []}})), - ), - ) - .await; - - // Create a sender_allocation_relatioship. - let sender_allocation_relatioship = create_sender_allocation_relationship( - pgpool.clone(), - "http://".to_owned() + &aggregator_endpoint.to_string(), - &mock_server.uri(), - ) - .await; - - // Add receipts to the database. - for i in 0..10 { - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - } - - // Let the sender_allocation_relatioship update the unaggregated fees from the database. 
- sender_allocation_relatioship - .update_unaggregated_fees() - .await - .unwrap(); - - // Trigger a RAV request manually. - SenderAllocationRelationship::rav_requester_try(&sender_allocation_relatioship.inner) - .await - .unwrap(); - - // Stop the TAP aggregator server. - handle.stop().unwrap(); - handle.stopped().await; - } - - #[sqlx::test(migrations = "../migrations")] - async fn test_rav_requester_auto(pgpool: PgPool) { - // Start a TAP aggregator server. - let (handle, aggregator_endpoint) = run_server( - 0, - SIGNER.0.clone(), - TAP_EIP712_DOMAIN_SEPARATOR.clone(), - 100 * 1024, - 100 * 1024, - 1, - ) - .await - .unwrap(); - - // Start a mock graphql server using wiremock - let mock_server = MockServer::start().await; - - // Mock result for TAP redeem txs for (allocation, sender) pair. - mock_server - .register( - Mock::given(method("POST")) - .and(body_string_contains("transactions")) - .respond_with( - ResponseTemplate::new(200) - .set_body_json(json!({ "data": { "transactions": []}})), - ), - ) - .await; - - // Create a sender_allocation_relatioship. - let sender_allocation_relatioship = create_sender_allocation_relationship( - pgpool.clone(), - "http://".to_owned() + &aggregator_endpoint.to_string(), - &mock_server.uri(), - ) - .await; - - // Add receipts to the database and call the `handle_new_receipt_notification` method - // correspondingly. - let mut total_value = 0; - let mut trigger_value = 0; - for i in 0..10 { - // These values should be enough to trigger a RAV request at i == 7 since we set the - // `rav_request_trigger_value` to 100. 
- let value = (i + 10) as u128; - - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, value, i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - sender_allocation_relatioship - .handle_new_receipt_notification(NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: i, - timestamp_ns: i + 1, - value, - }) - .await; - - total_value += value; - if total_value >= 100 && trigger_value == 0 { - trigger_value = total_value; - } - } - - // Wait for the RAV requester to finish. - for _ in 0..100 { - if sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value - < trigger_value - { - break; - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - // Get the latest RAV from the database. - let latest_rav = sqlx::query!( - r#" - SELECT rav - FROM scalar_tap_ravs - WHERE allocation_id = $1 AND sender_address = $2 - "#, - ALLOCATION_ID.encode_hex::(), - SENDER.1.encode_hex::() - ) - .fetch_optional(&pgpool) - .await - .map(|r| r.map(|r| r.rav)) - .unwrap(); - - let latest_rav = latest_rav - .map(|r| serde_json::from_value::(r).unwrap()) - .unwrap(); - - // Check that the latest RAV value is correct. - assert!(latest_rav.message.value_aggregate >= trigger_value); - - // Check that the unaggregated fees value is reduced. - assert!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value - <= trigger_value - ); - - // Reset the total value and trigger value. 
- total_value = sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value; - trigger_value = 0; - - // Add more receipts - for i in 10..20 { - let value = (i + 10) as u128; - - let receipt = - create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; - store_receipt(&pgpool, receipt.signed_receipt()) - .await - .unwrap(); - - sender_allocation_relatioship - .handle_new_receipt_notification(NewReceiptNotification { - allocation_id: *ALLOCATION_ID, - signer_address: SIGNER.1, - id: i, - timestamp_ns: i + 1, - value, - }) - .await; - - total_value += value; - if total_value >= 100 && trigger_value == 0 { - trigger_value = total_value; - } - } - - // Wait for the RAV requester to finish. - for _ in 0..100 { - if sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value - < trigger_value - { - break; - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - // Get the latest RAV from the database. - let latest_rav = sqlx::query!( - r#" - SELECT rav - FROM scalar_tap_ravs - WHERE allocation_id = $1 AND sender_address = $2 - "#, - ALLOCATION_ID.encode_hex::(), - SENDER.1.encode_hex::() - ) - .fetch_optional(&pgpool) - .await - .map(|r| r.map(|r| r.rav)) - .unwrap(); - - let latest_rav = latest_rav - .map(|r| serde_json::from_value::(r).unwrap()) - .unwrap(); - - // Check that the latest RAV value is correct. - - assert!(latest_rav.message.value_aggregate >= trigger_value); - - // Check that the unaggregated fees value is reduced. - assert!( - sender_allocation_relatioship - .inner - .unaggregated_fees - .lock() - .await - .value - <= trigger_value - ); - - // Stop the TAP aggregator server. 
- handle.stop().unwrap(); - handle.stopped().await; - } -} diff --git a/tap-agent/src/tap/test_utils.rs b/tap-agent/src/tap/test_utils.rs index 0edf3b74..d0cf8b7d 100644 --- a/tap-agent/src/tap/test_utils.rs +++ b/tap-agent/src/tap/test_utils.rs @@ -17,8 +17,12 @@ use tap_core::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt use thegraph::types::Address; lazy_static! { - pub static ref ALLOCATION_ID: Address = + pub static ref ALLOCATION_ID_0: Address = Address::from_str("0xabababababababababababababababababababab").unwrap(); + pub static ref ALLOCATION_ID_1: Address = + Address::from_str("0xbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc").unwrap(); + pub static ref ALLOCATION_ID_2: Address = + Address::from_str("0xcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd").unwrap(); pub static ref ALLOCATION_ID_IRRELEVANT: Address = Address::from_str("0xbcdebcdebcdebcdebcdebcdebcdebcdebcdebcde").unwrap(); pub static ref SENDER: (LocalWallet, Address) = wallet(0); diff --git a/tap-agent/src/tap/unaggregated_receipts.rs b/tap-agent/src/tap/unaggregated_receipts.rs new file mode 100644 index 00000000..0511152c --- /dev/null +++ b/tap-agent/src/tap/unaggregated_receipts.rs @@ -0,0 +1,11 @@ +// Copyright 2023-, GraphOps and Semiotic Labs. +// SPDX-License-Identifier: Apache-2.0 + +#[derive(Default, Debug, Clone)] +pub struct UnaggregatedReceipts { + pub value: u128, + /// The ID of the last receipt value added to the unaggregated fees value. + /// This is used to make sure we don't process the same receipt twice. Relies on the fact that + /// the receipts IDs are SERIAL in the database. + pub last_id: u64, +}