Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RAV value threshold per sender #113

Merged
merged 12 commits into from
Feb 16, 2024
Prev Previous commit
fix: use std mutex for sender_accounts
Signed-off-by: Alexis Asseman <alexis@semiotic.ai>
aasseman committed Feb 16, 2024
commit 12bc4aa364e72d788fc6a775b597b2da1bbfdd92
8 changes: 2 additions & 6 deletions tap-agent/src/tap/sender_account.rs
Original file line number Diff line number Diff line change
@@ -434,9 +434,7 @@ mod tests {

// To help with testing from other modules.
impl SenderAccount {
pub async fn _tests_get_allocations_active(
&self,
) -> HashMap<Address, Arc<SenderAllocation>> {
pub fn _tests_get_allocations_active(&self) -> HashMap<Address, Arc<SenderAllocation>> {
self.inner
.allocations
.lock()
@@ -452,9 +450,7 @@ mod tests {
.collect()
}

pub async fn _tests_get_allocations_ineligible(
&self,
) -> HashMap<Address, Arc<SenderAllocation>> {
pub fn _tests_get_allocations_ineligible(&self) -> HashMap<Address, Arc<SenderAllocation>> {
self.inner
.allocations
.lock()
89 changes: 49 additions & 40 deletions tap-agent/src/tap/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;
use std::sync::Mutex as StdMutex;
use std::{collections::HashMap, str::FromStr, sync::Arc};

use alloy_sol_types::Eip712Domain;
@@ -13,7 +14,6 @@ use indexer_common::prelude::{Allocation, SubgraphClient};
use serde::Deserialize;
use sqlx::{postgres::PgListener, PgPool};
use thegraph::types::Address;
use tokio::sync::RwLock;
use tracing::{error, warn};

use crate::config;
@@ -41,7 +41,7 @@ struct Inner {
config: &'static config::Cli,
pgpool: PgPool,
/// Map of sender_address to SenderAllocation.
sender_accounts: Arc<RwLock<HashMap<Address, SenderAccount>>>,
sender_accounts: Arc<StdMutex<HashMap<Address, Arc<SenderAccount>>>>,
indexer_allocations: Eventual<HashMap<Address, Allocation>>,
escrow_accounts: Eventual<EscrowAccounts>,
escrow_subgraph: &'static SubgraphClient,
@@ -57,11 +57,11 @@ impl Inner {
target_senders: HashSet<Address>,
) -> Result<()> {
let eligible_allocations: HashSet<Address> = indexer_allocations.keys().copied().collect();
let mut sender_accounts_write = self.sender_accounts.write().await;
let mut sender_accounts_copy = self.sender_accounts.lock().unwrap().clone();

// For all Senders that are not in the target_senders HashSet, set all their allocations to
// ineligible. That will trigger a finalization of all their receipts.
for (sender_id, sender_account) in sender_accounts_write.iter_mut() {
for (sender_id, sender_account) in sender_accounts_copy.iter() {
if !target_senders.contains(sender_id) {
sender_account.update_allocations(HashSet::new()).await;
}
@@ -70,33 +70,37 @@ impl Inner {
// Get or create SenderAccount instances for all currently eligible
// senders.
for sender_id in &target_senders {
let sender = sender_accounts_write
.entry(*sender_id)
.or_insert(SenderAccount::new(
self.config,
self.pgpool.clone(),
*sender_id,
self.escrow_accounts.clone(),
self.escrow_subgraph,
self.escrow_adapter.clone(),
self.tap_eip712_domain_separator.clone(),
self.sender_aggregator_endpoints
.get(sender_id)
.ok_or_else(|| {
anyhow!(
"No sender_aggregator_endpoint found for sender {}",
sender_id
)
})?
.clone(),
));
let sender =
sender_accounts_copy
.entry(*sender_id)
.or_insert(Arc::new(SenderAccount::new(
self.config,
self.pgpool.clone(),
*sender_id,
self.escrow_accounts.clone(),
self.escrow_subgraph,
self.escrow_adapter.clone(),
self.tap_eip712_domain_separator.clone(),
self.sender_aggregator_endpoints
.get(sender_id)
.ok_or_else(|| {
anyhow!(
"No sender_aggregator_endpoint found for sender {}",
sender_id
)
})?
.clone(),
)));

// Update sender's allocations
sender
.update_allocations(eligible_allocations.clone())
.await;
}

// Replace the sender_accounts with the updated sender_accounts_copy
*self.sender_accounts.lock().unwrap() = sender_accounts_copy;

// TODO: remove Sender instances that are finished. Ideally done in another async task?

Ok(())
@@ -118,7 +122,7 @@ impl SenderAccountsManager {
let inner = Arc::new(Inner {
config,
pgpool,
sender_accounts: Arc::new(RwLock::new(HashMap::new())),
sender_accounts: Arc::new(StdMutex::new(HashMap::new())),
indexer_allocations,
escrow_accounts,
escrow_subgraph,
@@ -239,11 +243,11 @@ impl SenderAccountsManager {

// Create SenderAccount instances for all senders that have unfinalized allocations and add
// the allocations to the SenderAccount instances.
let mut sender_accounts_write_lock = inner.sender_accounts.write().await;
let mut sender_accounts = HashMap::new();
for (sender_id, allocation_ids) in unfinalized_sender_allocations_map {
let sender = sender_accounts_write_lock
let sender = sender_accounts
.entry(sender_id)
.or_insert(SenderAccount::new(
.or_insert(Arc::new(SenderAccount::new(
config,
inner.pgpool.clone(),
sender_id,
@@ -256,7 +260,7 @@ impl SenderAccountsManager {
.get(&sender_id)
.expect("should be able to get sender_aggregator_endpoint for sender")
.clone(),
));
)));

sender.update_allocations(allocation_ids).await;

@@ -265,7 +269,8 @@ impl SenderAccountsManager {
.await
.expect("should be able to recompute unaggregated fees");
}
drop(sender_accounts_write_lock);
// replace the sender_accounts with the updated sender_accounts
*inner.sender_accounts.lock().unwrap() = sender_accounts;

// Update senders and allocations based on the current state of the network.
// It is important to do this after creating the Sender and SenderAllocation instances based
@@ -320,7 +325,7 @@ impl SenderAccountsManager {
/// corresponding SenderAccount.
async fn new_receipts_watcher(
mut pglistener: PgListener,
sender_accounts: Arc<RwLock<HashMap<Address, SenderAccount>>>,
sender_accounts: Arc<StdMutex<HashMap<Address, Arc<SenderAccount>>>>,
escrow_accounts: Eventual<EscrowAccounts>,
) {
loop {
@@ -354,7 +359,13 @@ impl SenderAccountsManager {
}
};

if let Some(sender_account) = sender_accounts.read().await.get(&sender_address) {
let sender_account = sender_accounts
.lock()
.unwrap()
.get(&sender_address)
.cloned();

if let Some(sender_account) = sender_account {
sender_account
.handle_new_receipt_notification(new_receipt_notification)
.await;
@@ -492,8 +503,8 @@ mod tests {
assert!(sender_account
._inner
.sender_accounts
.write()
.await
.lock()
.unwrap()
.contains_key(&SENDER.1));

// Remove the escrow account from the escrow_accounts Eventual.
@@ -506,22 +517,20 @@ mod tests {
assert!(sender_account
._inner
.sender_accounts
.read()
.await
.lock()
.unwrap()
.get(&SENDER.1)
.unwrap()
._tests_get_allocations_active()
.await
.is_empty());
assert!(sender_account
._inner
.sender_accounts
.read()
.await
.lock()
.unwrap()
.get(&SENDER.1)
.unwrap()
._tests_get_allocations_ineligible()
.await
.contains_key(&allocation_id));
}
}