Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RAV value threshold per sender #113

Merged
merged 12 commits into from
Feb 16, 2024
Prev Previous commit
Next Next commit
fix: proper signer/sender handling at startup DB SenderAccount loading
Signed-off-by: Alexis Asseman <alexis@semiotic.ai>
aasseman committed Feb 16, 2024
commit bb3e07d94c903ee2dfb059bf20369e37dda7786a

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 63 additions & 10 deletions tap-agent/src/tap/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
@@ -93,12 +93,16 @@ impl SenderAccountsManager {
.await
.expect("Should get escrow accounts from Eventual");

let mut sender_accounts_write_lock = inner.sender_accounts.write().await;
// Gather all outstanding receipts and unfinalized RAVs from the database.
// Used to create SenderAccount instances for all senders that have unfinalized allocations
// and try to finalize them if they have become ineligible.

// First we accumulate all allocations for each sender. This is because we may have more
// than one signer per sender in DB.
let mut unfinalized_sender_allocations_map: HashMap<Address, HashSet<Address>> =
HashMap::new();

// Create Sender and SenderAllocation instances for all outstanding receipts in the
// database because they may be linked to allocations that are not eligible anymore, but
// still need to get aggregated.
let unfinalized_allocations_in_db = sqlx::query!(
let receipts_signer_allocations_in_db = sqlx::query!(
r#"
SELECT DISTINCT
signer_address,
@@ -115,9 +119,9 @@ impl SenderAccountsManager {
)
.fetch_all(&inner.pgpool)
.await
.expect("should be able to fetch unfinalized allocations from the database");
.expect("should be able to fetch pending receipts from the database");

for row in unfinalized_allocations_in_db {
for row in receipts_signer_allocations_in_db {
let allocation_ids = row
.allocation_ids
.expect("all receipts should have an allocation_id")
@@ -133,6 +137,56 @@ impl SenderAccountsManager {
.get_sender_for_signer(&signer_id)
.expect("should be able to get sender from signer");

// Accumulate allocations for the sender
unfinalized_sender_allocations_map
.entry(sender_id)
.or_default()
.extend(allocation_ids);
}

let nonfinal_ravs_sender_allocations_in_db = sqlx::query!(
r#"
SELECT
sender_address,
(
SELECT ARRAY
(
SELECT DISTINCT allocation_id
FROM scalar_tap_ravs
WHERE sender_address = sender_address
)
) AS allocation_id
FROM scalar_tap_ravs
"#
)
.fetch_all(&inner.pgpool)
.await
.expect("should be able to fetch unfinalized RAVs from the database");

for row in nonfinal_ravs_sender_allocations_in_db {
let allocation_ids = row
.allocation_id
.expect("all RAVs should have an allocation_id")
.iter()
.map(|allocation_id| {
Address::from_str(allocation_id)
.expect("allocation_id should be a valid address")
})
.collect::<HashSet<Address>>();
let sender_id = Address::from_str(&row.sender_address)
.expect("sender_address should be a valid address");

// Accumulate allocations for the sender
unfinalized_sender_allocations_map
.entry(sender_id)
.or_default()
.extend(allocation_ids);
}

// Create SenderAccount instances for all senders that have unfinalized allocations and add
// the allocations to the SenderAccount instances.
let mut sender_accounts_write_lock = inner.sender_accounts.write().await;
for (sender_id, allocation_ids) in unfinalized_sender_allocations_map {
let sender = sender_accounts_write_lock
.entry(sender_id)
.or_insert(SenderAccount::new(
@@ -150,14 +204,13 @@ impl SenderAccountsManager {
.clone(),
));

// Update sender's allocations
sender.update_allocations(allocation_ids.clone()).await;
sender.update_allocations(allocation_ids).await;

sender
.recompute_unaggregated_fees()
.await
.expect("should be able to recompute unaggregated fees");
}

drop(sender_accounts_write_lock);

// Update senders and allocations based on the current state of the network.