From 63a1acb0326134e17280d3578a5f94ffda81d3a4 Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Thu, 1 Feb 2024 17:15:12 -0800 Subject: [PATCH] fix: proper signer/sender handling at startup DB SenderAccount loading Signed-off-by: Alexis Asseman --- tap-agent/src/tap/sender_accounts_manager.rs | 73 +++++++++++++++++--- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/tap-agent/src/tap/sender_accounts_manager.rs b/tap-agent/src/tap/sender_accounts_manager.rs index 79a65ebd..290f7f23 100644 --- a/tap-agent/src/tap/sender_accounts_manager.rs +++ b/tap-agent/src/tap/sender_accounts_manager.rs @@ -93,12 +93,16 @@ impl SenderAccountsManager { .await .expect("Should get escrow accounts from Eventual"); - let mut sender_accounts_write_lock = inner.sender_accounts.write().await; + // Gather all outstanding receipts and unfinalized RAVs from the database. + // Used to create SenderAccount instances for all senders that have unfinalized allocations + // and try to finalize them if they have become ineligible. + + // First we accumulate all allocations for each sender. This is because we may have more + // than one signer per sender in DB. + let mut unfinalized_sender_allocations_map: HashMap> = + HashMap::new(); - // Create Sender and SenderAllocation instances for all outstanding receipts in the - // database because they may be linked to allocations that are not eligible anymore, but - // still need to get aggregated. - let unfinalized_allocations_in_db = sqlx::query!( + let receipts_signer_allocations_in_db = sqlx::query!( r#" SELECT DISTINCT signer_address, @@ -115,9 +119,9 @@ impl SenderAccountsManager { ) .fetch_all(&inner.pgpool) .await - .expect("should be able to fetch unfinalized allocations from the database"); + .expect("should be able to fetch pending receipts from the database"); - for row in unfinalized_allocations_in_db { + for row in receipts_signer_allocations_in_db { let allocation_ids = row .allocation_ids .expect("all receipts should have an allocation_id") @@ -133,6 +137,56 @@ impl SenderAccountsManager { .get_sender_for_signer(&signer_id) .expect("should be able to get sender from signer"); + // Accumulate allocations for the sender + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } + + let nonfinal_ravs_sender_allocations_in_db = sqlx::query!( + r#" + SELECT + sender_address, + ( + SELECT ARRAY + ( + SELECT DISTINCT allocation_id + FROM scalar_tap_ravs + WHERE sender_address = sender_address + ) + ) AS allocation_id + FROM scalar_tap_ravs + "# + ) + .fetch_all(&inner.pgpool) + .await + .expect("should be able to fetch unfinalized RAVs from the database"); + + for row in nonfinal_ravs_sender_allocations_in_db { + let allocation_ids = row + .allocation_id + .expect("all RAVs should have an allocation_id") + .iter() + .map(|allocation_id| { + Address::from_str(allocation_id) + .expect("allocation_id should be a valid address") + }) + .collect::>(); + let sender_id = Address::from_str(&row.sender_address) + .expect("sender_address should be a valid address"); + + // Accumulate allocations for the sender + unfinalized_sender_allocations_map + .entry(sender_id) + .or_default() + .extend(allocation_ids); + } + + // Create SenderAccount instances for all senders that have unfinalized allocations and add + // the allocations to the SenderAccount instances. + let mut sender_accounts_write_lock = inner.sender_accounts.write().await; + for (sender_id, allocation_ids) in unfinalized_sender_allocations_map { let sender = sender_accounts_write_lock .entry(sender_id) .or_insert(SenderAccount::new( @@ -150,14 +204,13 @@ impl SenderAccountsManager { .clone(), )); - // Update sender's allocations - sender.update_allocations(allocation_ids.clone()).await; + sender.update_allocations(allocation_ids).await; + sender .recompute_unaggregated_fees() .await .expect("should be able to recompute unaggregated fees"); } - drop(sender_accounts_write_lock); // Update senders and allocations based on the current state of the network.