Skip to content

Commit

Permalink
Merge pull request #175 from rpcpool/espi/ops-bubblegum-backfill-patch
Browse files Browse the repository at this point in the history
[Patch] Filter out failed transactions while tree backfilling
  • Loading branch information
kespinola authored Feb 23, 2024
2 parents 9e3f11a + 0943057 commit 4632968
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 3 deletions.
91 changes: 91 additions & 0 deletions ops/src/bubblegum/audit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use super::rpc::{Rpc, SolanaRpcArgs};
use anyhow::Result;

use clap::Parser;
use das_core::{connect_db, MetricsArgs, PoolArgs};
use digital_asset_types::dao::cl_audits_v2;
use futures::future;

use sea_orm::{CursorTrait, EntityTrait, SqlxPostgresConnector};
use solana_sdk::signature::Signature;
use solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta;

use tokio::io::{stdout, AsyncWriteExt};

#[derive(Debug, Parser, Clone)]
pub struct Args {
/// Database configuration
#[clap(flatten)]
pub database: PoolArgs,

/// Metrics configuration
#[clap(flatten)]
pub metrics: MetricsArgs,

/// Solana configuration
#[clap(flatten)]
pub solana: SolanaRpcArgs,

#[arg(long, env, default_value = "10000")]
pub batch_size: u64,
}

pub async fn run(config: Args) -> Result<()> {
let pool = connect_db(config.database).await?;

let solana_rpc = Rpc::from_config(config.solana);

let mut output = stdout();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let mut after = None;

loop {
let mut query = cl_audits_v2::Entity::find().cursor_by(cl_audits_v2::Column::Id);
let mut query = query.first(config.batch_size);

if let Some(after) = after {
query = query.after(after);
}

let entries = query.all(&conn).await?;

let mut transactions = vec![];

for entry in entries.clone() {
transactions.push(fetch_transaction(entry, solana_rpc.clone()));
}

let transactions = future::join_all(transactions).await;

for (signature, transaction) in transactions.into_iter().flatten() {
if let Some(meta) = transaction.transaction.meta {
if meta.err.is_some() {
output
.write_all(format!("{}\n", signature).as_bytes())
.await?;

output.flush().await?;
}
}
}

after = entries.last().map(|cl_audit_v2| cl_audit_v2.id);

if entries.is_empty() {
break;
}
}

Ok(())
}

async fn fetch_transaction(
entry: cl_audits_v2::Model,
solana_rpc: Rpc,
) -> Result<(Signature, EncodedConfirmedTransactionWithStatusMeta)> {
let signature = Signature::try_from(entry.tx.as_ref())?;

let transaction = solana_rpc.get_transaction(&signature).await?;

Ok((signature, transaction))
}
8 changes: 7 additions & 1 deletion ops/src/bubblegum/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::backfiller;
use super::{audit, backfiller};
use anyhow::Result;
use clap::{Args, Subcommand};

Expand All @@ -8,6 +8,9 @@ pub enum Commands {
/// It crawls through trees and backfills any missed tree transactions.
#[clap(name = "backfill")]
Backfill(backfiller::Args),
/// The `audit` commands checks `cl_audits_v2` for any failed transactions and logs them to stdout.
#[clap(name = "audit")]
Audit(audit::Args),
}

#[derive(Debug, Clone, Args)]
Expand All @@ -21,6 +24,9 @@ pub async fn subcommand(subcommand: BubblegumCommand) -> Result<()> {
Commands::Backfill(args) => {
backfiller::run(args).await?;
}
Commands::Audit(args) => {
audit::run(args).await?;
}
}

Ok(())
Expand Down
1 change: 1 addition & 0 deletions ops/src/bubblegum/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod audit;
mod backfiller;
mod cmd;
mod queue;
Expand Down
11 changes: 9 additions & 2 deletions ops/src/bubblegum/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use clap::Args;
use log::error;
use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement, Value};
use solana_client::rpc_filter::{Memcmp, RpcFilterType};
use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature;
use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature};
use spl_account_compression::id;
use spl_account_compression::state::{
Expand Down Expand Up @@ -146,16 +147,22 @@ impl TreeGapFill {
let sigs = client
.get_signatures_for_address(&self.tree, before, self.until)
.await?;
let sig_count = sigs.len();

for sig in sigs.iter() {
let successful_transactions = sigs
.into_iter()
.filter(|transaction| transaction.err.is_none())
.collect::<Vec<RpcConfirmedTransactionStatusWithSignature>>();

for sig in successful_transactions.iter() {
let sig = Signature::from_str(&sig.signature)?;

sender.send(sig).await?;

before = Some(sig);
}

if sigs.len() < GET_SIGNATURES_FOR_ADDRESS_LIMIT {
if sig_count < GET_SIGNATURES_FOR_ADDRESS_LIMIT {
break;
}
}
Expand Down

0 comments on commit 4632968

Please sign in to comment.