From 2a12b85e3b962a5e1e74461d419e9fa820aaec7a Mon Sep 17 00:00:00 2001
From: Kyle Espinola
Date: Tue, 12 Dec 2023 10:33:48 -0500
Subject: [PATCH] [WIP] Tree Transaction Backfiller (#114)

* refactor(backfiller): tree backfilling using getSignaturesForAddress. fetch
  all trees, then fetch their associated transactions.

* feat(backfiller): generate table and model for querying the last transaction
  record, enabling fast-forwarding of tree transaction crawling.

* feat(backfiller): push transaction payloads to redis through the plerkle
  messenger. mark tree transactions with processed_at so we know they
  completed the index loop.

* fix(backfiller): revert formatter-only changes from git history; include
  only the changes needed by the backfiller.

* fix(backfiller): support the sea-orm mock feature by switching to a pg pool
  and the sea_orm adapter.
---
 Cargo.lock                                    |  57 +++++
 Cargo.toml                                    |   1 +
 backfiller.yaml                               |   0
 .../src/dao/generated/cl_audits.rs            |   2 +-
 digital_asset_types/src/dao/generated/mod.rs  |   1 +
 .../src/dao/generated/prelude.rs              |   1 +
 .../src/dao/generated/tree_transactions.rs    |  67 ++++++
 digital_asset_types/src/dao/scopes/asset.rs   |  12 +-
 .../src/dapi/signatures_for_asset.rs          |   6 +-
 migration/src/lib.rs                          |   5 +
 ...8_103949_create_tree_transactions_table.rs |  61 ++++++
 nft_ingester/src/error/mod.rs                 |   4 +
 .../src/program_transformers/bubblegum/db.rs  |  97 ++++++---
 .../src/program_transformers/bubblegum/mod.rs |  14 +-
 tree_backfiller/Cargo.toml                    |  88 ++++++++
 tree_backfiller/src/backfiller.rs             | 175 +++++++++++++++
 tree_backfiller/src/db.rs                     |  35 +++
 tree_backfiller/src/main.rs                   |  33 +++
 tree_backfiller/src/queue.rs                  |  63 ++++++
 tree_backfiller/src/tree.rs                   | 204 ++++++++++++++++++
 20 files changed, 887 insertions(+), 39 deletions(-)
 create mode 100644 backfiller.yaml
 create mode 100644 digital_asset_types/src/dao/generated/tree_transactions.rs
 create mode 100644 migration/src/m20231208_103949_create_tree_transactions_table.rs
 create mode 100644 tree_backfiller/Cargo.toml
 create mode 100644 tree_backfiller/src/backfiller.rs
 create mode 100644 tree_backfiller/src/db.rs
 create mode 100644 tree_backfiller/src/main.rs
 create mode 100644 tree_backfiller/src/queue.rs
 create mode 100644 tree_backfiller/src/tree.rs

diff --git a/Cargo.lock b/Cargo.lock
index 181526a4b..9479ec71d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1702,6 +1702,63 @@ dependencies = [
  "syn 2.0.38",
 ]
 
+[[package]]
+name = "das-tree-backfiller"
+version = "0.8.0"
+dependencies = [
+ "anchor-client",
+ "anchor-lang",
+ "anyhow",
+ "async-trait",
+ "base64 0.21.4",
+ "blockbuster",
+ "borsh 0.10.3",
+ "bs58 0.4.0",
+ "cadence",
+ "cadence-macros",
+ "chrono",
+ "clap 4.4.8",
+ "digital_asset_types",
+ "env_logger 0.10.0",
+ "figment",
+ "flatbuffers",
+ "futures",
+ "futures-util",
+ "lazy_static",
+ "log",
+ "mpl-bubblegum",
+ "num-traits",
+ "plerkle_messenger",
+ "plerkle_serialization",
+ "rand 0.8.5",
+ "redis",
+ "regex",
+ "reqwest",
+ "rust-crypto",
+ "sea-orm",
+ "sea-query 0.28.5",
+ "serde",
+ "serde_json",
+ "solana-account-decoder",
+ "solana-client",
+ "solana-geyser-plugin-interface",
+ "solana-sdk",
+ "solana-sdk-macro",
+ "solana-transaction-status",
+ "spl-account-compression",
+ "spl-concurrent-merkle-tree",
+ "spl-token 3.5.0",
+ "sqlx",
+ "stretto",
+ "thiserror",
+ "tokio",
+ "tokio-postgres",
+ "tokio-stream",
+ "tracing-subscriber",
+ "url",
+ "uuid",
+]
+
 [[package]]
 name = "das_api"
 version = "0.7.2"
diff --git a/Cargo.toml b/Cargo.toml
index 75f64a41f..74c0b8fe1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ members = [
     "metaplex-rpc-proxy",
     "migration",
"nft_ingester", + "tree_backfiller", "tools/acc_forwarder", "tools/bgtask_creator", "tools/fetch_trees", diff --git a/backfiller.yaml b/backfiller.yaml new file mode 100644 index 000000000..e69de29bb diff --git a/digital_asset_types/src/dao/generated/cl_audits.rs b/digital_asset_types/src/dao/generated/cl_audits.rs index ada15f7ad..2b08a9b2a 100644 --- a/digital_asset_types/src/dao/generated/cl_audits.rs +++ b/digital_asset_types/src/dao/generated/cl_audits.rs @@ -92,6 +92,6 @@ impl From for ActiveModel { seq: item.seq, leaf_idx: item.leaf_idx, ..Default::default() - } + }; } } diff --git a/digital_asset_types/src/dao/generated/mod.rs b/digital_asset_types/src/dao/generated/mod.rs index 5db9a8690..64fef9216 100644 --- a/digital_asset_types/src/dao/generated/mod.rs +++ b/digital_asset_types/src/dao/generated/mod.rs @@ -16,3 +16,4 @@ pub mod sea_orm_active_enums; pub mod tasks; pub mod token_accounts; pub mod tokens; +pub mod tree_transactions; diff --git a/digital_asset_types/src/dao/generated/prelude.rs b/digital_asset_types/src/dao/generated/prelude.rs index fd5b03e50..50777a005 100644 --- a/digital_asset_types/src/dao/generated/prelude.rs +++ b/digital_asset_types/src/dao/generated/prelude.rs @@ -14,3 +14,4 @@ pub use super::raw_txn::Entity as RawTxn; pub use super::tasks::Entity as Tasks; pub use super::token_accounts::Entity as TokenAccounts; pub use super::tokens::Entity as Tokens; +pub use super::tree_transactions::Entity as TreeTransactions; diff --git a/digital_asset_types/src/dao/generated/tree_transactions.rs b/digital_asset_types/src/dao/generated/tree_transactions.rs new file mode 100644 index 000000000..3fdddf058 --- /dev/null +++ b/digital_asset_types/src/dao/generated/tree_transactions.rs @@ -0,0 +1,67 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.5 + +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Copy, Clone, Default, Debug, DeriveEntity)] +pub struct Entity; + +impl EntityName for Entity { + fn table_name(&self) -> &str { + "tree_transactions" + } +} + +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)] +pub struct Model { + pub signature: String, + pub tree: Vec, + pub slot: i64, + pub created_at: Option, + pub processed_at: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] +pub enum Column { + Signature, + Tree, + Slot, + CreatedAt, + ProcessedAt, +} + +#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] +pub enum PrimaryKey { + Signature, +} + +impl PrimaryKeyTrait for PrimaryKey { + type ValueType = String; + fn auto_increment() -> bool { + false + } +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl ColumnTrait for Column { + type EntityName = Entity; + fn def(&self) -> ColumnDef { + match self { + Self::Signature => ColumnType::Char(Some(84u32)).def(), + Self::Tree => ColumnType::Binary.def(), + Self::Slot => ColumnType::BigInteger.def(), + Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(), + Self::ProcessedAt => ColumnType::TimestampWithTimeZone.def().null(), + } + } +} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs index 35e964624..f89064b04 100644 --- a/digital_asset_types/src/dao/scopes/asset.rs +++ b/digital_asset_types/src/dao/scopes/asset.rs @@ -469,12 +469,14 @@ pub async fn get_signatures_for_asset( 
diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs
index 35e964624..f89064b04 100644
--- a/digital_asset_types/src/dao/scopes/asset.rs
+++ b/digital_asset_types/src/dao/scopes/asset.rs
@@ -469,12 +469,14 @@ pub async fn get_signatures_for_asset(
     asset_id: Option<Vec<u8>>,
     tree_id: Option<Vec<u8>>,
     leaf_idx: Option<i64>,
+    sort_direction: Order,
     pagination: &Pagination,
     limit: u64,
 ) -> Result<Vec<(String, Option<String>)>, DbErr> {
     // if tree_id and leaf_idx are provided, use them directly to fetch transactions
     if let (Some(tree_id), Some(leaf_idx)) = (tree_id, leaf_idx) {
-        let transactions = fetch_transactions(conn, tree_id, leaf_idx, pagination, limit).await?;
+        let transactions =
+            fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?;
         return Ok(transactions);
     }
 
@@ -502,7 +504,8 @@ pub async fn get_signatures_for_asset(
         let leaf_id = asset
             .nonce
             .ok_or(DbErr::RecordNotFound("Leaf ID does not exist".to_string()))?;
-        let transactions = fetch_transactions(conn, tree, leaf_id, pagination, limit).await?;
+        let transactions =
+            fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?;
         Ok(transactions)
     } else {
         Ok(Vec::new())
@@ -513,6 +516,7 @@ pub async fn fetch_transactions(
     conn: &impl ConnectionTrait,
     tree: Vec<u8>,
     leaf_id: i64,
+    sort_direction: Order,
     pagination: &Pagination,
     limit: u64,
 ) -> Result<Vec<(String, Option<String>)>, DbErr> {
@@ -524,8 +528,8 @@ pub async fn fetch_transactions(
         pagination,
         limit,
         stmt,
-        Order::Asc,
-        asset::Column::Id
+        sort_direction,
+        cl_audits::Column::Id,
     );
     let transactions = stmt.all(conn).await?;
     let transaction_list: Vec<(String, Option<String>)> = transactions
diff --git a/digital_asset_types/src/dapi/signatures_for_asset.rs b/digital_asset_types/src/dapi/signatures_for_asset.rs
index 1ca015653..c34bcbf8d 100644
--- a/digital_asset_types/src/dapi/signatures_for_asset.rs
+++ b/digital_asset_types/src/dapi/signatures_for_asset.rs
@@ -1,10 +1,11 @@
 use crate::dao::scopes;
 use crate::dao::PageOptions;
+use crate::rpc::filter::AssetSorting;
 use crate::rpc::response::TransactionSignatureList;
 use sea_orm::DatabaseConnection;
 use sea_orm::DbErr;
 use super::common::build_transaction_signatures_response;
-use super::common::create_pagination;
+use super::common::{create_pagination, create_sorting};
 
 
 pub async fn get_signatures_for_asset(
@@ -12,14 +13,17 @@ pub async fn get_signatures_for_asset(
     asset_id: Option<Vec<u8>>,
     tree: Option<Vec<u8>>,
     leaf_idx: Option<i64>,
+    sorting: AssetSorting,
     page_options: &PageOptions,
 ) -> Result<TransactionSignatureList, DbErr> {
     let pagination = create_pagination(&page_options)?;
+    let (sort_direction, sort_column) = create_sorting(sorting);
     let transactions = scopes::asset::get_signatures_for_asset(
         db,
         asset_id,
         tree,
         leaf_idx,
+        sort_direction,
         &pagination,
         page_options.limit
     )
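Note (editorial): `create_sorting` lives in `dapi::common` and is not shown in this diff. For orientation only, the direction half of the mapping it performs is presumably along these lines (hypothetical sketch; the real `AssetSorting` and `AssetSortDirection` definitions live in `rpc::filter` and may differ):

```rust
use sea_orm::Order;

// Hypothetical stand-in for the direction enum carried by `AssetSorting`.
enum AssetSortDirection {
    Asc,
    Desc,
}

// Only the direction is consumed by `get_signatures_for_asset` above; the
// sort column returned by `create_sorting` is unused on this code path.
fn to_order(direction: AssetSortDirection) -> Order {
    match direction {
        AssetSortDirection::Asc => Order::Asc,
        AssetSortDirection::Desc => Order::Desc,
    }
}
```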
diff --git a/migration/src/lib.rs b/migration/src/lib.rs
index 42563786e..d7de48413 100644
--- a/migration/src/lib.rs
+++ b/migration/src/lib.rs
@@ -34,6 +34,7 @@ mod m20231101_120101_add_instruction_into_cl_audit;
 mod m20231101_120101_cl_audit_table_index;
 mod m20231019_120101_add_seq_numbers_bgum_update_metadata;
 mod m20231206_120101_remove_was_decompressed;
+mod m20231208_103949_create_tree_transactions_table;
 
 pub struct Migrator;
 
@@ -77,5 +78,6 @@ impl MigratorTrait for Migrator {
         Box::new(m20231101_120101_add_instruction_into_cl_audit::Migration),
         Box::new(m20231101_120101_cl_audit_table_index::Migration),
+        Box::new(m20231208_103949_create_tree_transactions_table::Migration),
         ]
     }
 }
diff --git a/migration/src/m20231208_103949_create_tree_transactions_table.rs b/migration/src/m20231208_103949_create_tree_transactions_table.rs
new file mode 100644
index 000000000..c30061516
--- /dev/null
+++ b/migration/src/m20231208_103949_create_tree_transactions_table.rs
@@ -0,0 +1,61 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .create_table(
+                Table::create()
+                    .table(TreeTransactions::Table)
+                    .if_not_exists()
+                    .col(
+                        ColumnDef::new(TreeTransactions::Signature)
+                            .char_len(88)
+                            .not_null()
+                            .primary_key(),
+                    )
+                    .col(ColumnDef::new(TreeTransactions::Tree).binary().not_null())
+                    .col(ColumnDef::new(TreeTransactions::Slot).big_integer().not_null())
+                    .col(ColumnDef::new(TreeTransactions::CreatedAt).timestamp_with_time_zone().default("now()"))
+                    .col(ColumnDef::new(TreeTransactions::ProcessedAt).timestamp_with_time_zone())
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_index(
+                Index::create()
+                    .name("tree_slot_index")
+                    .table(TreeTransactions::Table)
+                    .col(TreeTransactions::Tree)
+                    .col(TreeTransactions::Slot)
+                    .unique()
+                    .to_owned(),
+            )
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned())
+            .await?;
+
+        manager
+            .drop_table(Table::drop().table(TreeTransactions::Table).to_owned())
+            .await
+    }
+}
+
+/// Learn more at https://docs.rs/sea-query#iden
+#[derive(Iden)]
+enum TreeTransactions {
+    Table,
+    Signature,
+    Tree,
+    CreatedAt,
+    ProcessedAt,
+    Slot,
+}
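Note (editorial): for reference, the schema this migration produces is roughly the following Postgres DDL (illustrative; the exact statements are generated by sea-query). The unique `(tree, slot)` index both deduplicates repeated crawls and keeps the per-tree max-slot fast-forward lookup cheap:

```sql
CREATE TABLE IF NOT EXISTS tree_transactions (
    signature    CHAR(88) NOT NULL PRIMARY KEY,
    tree         BYTEA NOT NULL,
    slot         BIGINT NOT NULL,
    created_at   TIMESTAMPTZ DEFAULT now(),
    processed_at TIMESTAMPTZ
);

CREATE UNIQUE INDEX tree_slot_index ON tree_transactions (tree, slot);
```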
diff --git a/nft_ingester/src/error/mod.rs b/nft_ingester/src/error/mod.rs
index 37ed5f24b..46bd678f4 100644
--- a/nft_ingester/src/error/mod.rs
+++ b/nft_ingester/src/error/mod.rs
@@ -52,6 +52,10 @@ pub enum IngesterError {
     HttpError { status_code: String },
     #[error("AssetIndex Error {0}")]
     AssetIndexError(String),
+    #[error("TryFromInt Error {0}")]
+    TryFromInt(#[from] std::num::TryFromIntError),
+    #[error("Chrono FixedOffset Error")]
+    ChronoFixedOffset,
 }
 
 impl From for IngesterError {
diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs
index cfb9d25a4..c301cbf03 100644
--- a/nft_ingester/src/program_transformers/bubblegum/db.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/db.rs
@@ -1,7 +1,7 @@
 use crate::error::IngesterError;
 use digital_asset_types::dao::{
     asset, asset_authority, asset_creators, asset_data, asset_grouping, backfill_items, cl_audits,
-    cl_items,
+    cl_items, tree_transactions,
     sea_orm_active_enums::{
         ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass,
         SpecificationVersions,
@@ -10,11 +10,73 @@
 use log::{debug, info, error};
 use mpl_bubblegum::types::{Collection, Creator};
 use sea_orm::{
-    query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait,
+    query::*, sea_query::OnConflict, ActiveModelTrait, ActiveValue::Set, ColumnTrait, DbBackend,
+    EntityTrait,
 };
 use spl_account_compression::events::ChangeLogEventV1;
 use std::collections::HashSet;
 
+/// Mark a tree transaction as processed. If the transaction already exists, update its
+/// `processed_at` field; otherwise insert a new row.
+///
+/// It first checks whether a tree transaction with the given transaction ID already exists.
+/// If it does, it updates the `processed_at` field of the existing row with the current time.
+/// If it does not, it creates a new tree transaction with the provided parameters and saves it.
+///
+/// # Arguments
+///
+/// * `tree_id` - A vector of bytes representing the ID of the tree.
+/// * `slot` - A 64-bit unsigned integer representing the slot.
+/// * `txn_id` - A string slice representing the transaction ID (signature).
+/// * `txn` - A reference to the database connection or transaction the queries run on.
+///
+/// # Returns
+///
+/// This function returns a `Result` containing an empty tuple on success, or an
+/// `IngesterError` if the operation fails.
+pub async fn save_tree_transaction<'c, T>(
+    tree_id: Vec<u8>,
+    slot: u64,
+    txn_id: &str,
+    txn: &T,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    let now = chrono::Utc::now()
+        .with_timezone(&chrono::FixedOffset::east_opt(0).ok_or(IngesterError::ChronoFixedOffset)?);
+
+    let tree_transaction = tree_transactions::Entity::find()
+        .filter(tree_transactions::Column::Signature.eq(txn_id))
+        .one(txn)
+        .await?;
+
+    if let Some(tree_transaction) = tree_transaction {
+        let mut tree_transaction: tree_transactions::ActiveModel = tree_transaction.into();
+
+        tree_transaction.processed_at = Set(Some(now));
+
+        tree_transaction.save(txn).await?;
+    } else {
+        let tree_transaction = tree_transactions::ActiveModel {
+            signature: Set(txn_id.to_string()),
+            slot: Set(i64::try_from(slot)?),
+            tree: Set(tree_id),
+            processed_at: Set(Some(now)),
+            ..Default::default()
+        };
+
+        tree_transactions::Entity::insert(tree_transaction)
+            .on_conflict(
+                OnConflict::column(tree_transactions::Column::Signature)
+                    .do_nothing()
+                    .to_owned(),
+            )
+            .exec(txn)
+            .await?;
+    }
+    Ok(())
+}
+
 pub async fn save_changelog_event<'c, T>(
     change_log_event: &ChangeLogEventV1,
     slot: u64,
@@ -114,36 +176,7 @@ where
         }
     }
 
-    // If and only if the entire path of nodes was inserted into the `cl_items` table, then insert
-    // a single row into the `backfill_items` table. This way if an incomplete path was inserted
-    // into `cl_items` due to an error, a gap will be created for the tree and the backfiller will
-    // fix it.
-    if i - 1 == depth as i64 {
-        // See if the tree already exists in the `backfill_items` table.
-        let rows = backfill_items::Entity::find()
-            .filter(backfill_items::Column::Tree.eq(tree_id))
-            .limit(1)
-            .all(txn)
-            .await?;
-
-        // If the tree does not exist in `backfill_items` and the sequence number is greater than 1,
-        // then we know we will need to backfill the tree from sequence number 1 up to the current
-        // sequence number. So in this case we set at flag to force checking the tree.
-        let force_chk = rows.is_empty() && change_log_event.seq > 1;
-
-        info!("Adding to backfill_items table at level {}", i - 1);
-        let item = backfill_items::ActiveModel {
-            tree: Set(tree_id.to_vec()),
-            seq: Set(change_log_event.seq as i64),
-            slot: Set(slot as i64),
-            force_chk: Set(force_chk),
-            backfilled: Set(false),
-            failed: Set(false),
-            ..Default::default()
-        };
-
-        backfill_items::Entity::insert(item).exec(txn).await?;
-    }
+    // TODO: drop the `backfill_items` table if it is no longer needed for backfilling
 
     Ok(())
     //TODO -> set maximum size of path and break into multiple statements
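Note (editorial): with sea-orm's `mock` feature enabled for the crate under test (the backfiller's Cargo.toml below turns it on), logic like `save_tree_transaction` can be exercised without a live Postgres. A rough test sketch; the mocked result shapes assume SeaORM issues a SELECT followed by an UPDATE with RETURNING on Postgres, so treat it as illustrative:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use sea_orm::{DatabaseBackend, MockDatabase, MockExecResult};

    #[tokio::test]
    async fn marks_known_signature_processed() {
        let existing = tree_transactions::Model {
            signature: "sig".to_string(),
            tree: vec![0u8; 32],
            slot: 42,
            created_at: None,
            processed_at: None,
        };

        let db = MockDatabase::new(DatabaseBackend::Postgres)
            // First result feeds the `find` by signature; the second covers
            // the RETURNING row of the UPDATE issued by `save` on Postgres.
            .append_query_results(vec![vec![existing.clone()], vec![existing]])
            .append_exec_results(vec![MockExecResult {
                last_insert_id: 0,
                rows_affected: 1,
            }])
            .into_connection();

        save_tree_transaction(vec![0u8; 32], 42, "sig", &db)
            .await
            .unwrap();
    }
}
```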
diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs
index 4ee402f17..1c846d68c 100644
--- a/nft_ingester/src/program_transformers/bubblegum/mod.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs
@@ -91,7 +91,8 @@ where
         InstructionName::VerifyCollection
         | InstructionName::UnverifyCollection
         | InstructionName::SetAndVerifyCollection => {
-            collection_verification::process(parsing_result, bundle, txn, cl_audits, ix_str).await?;
+            collection_verification::process(parsing_result, bundle, txn, cl_audits, ix_str)
+                .await?;
         }
         InstructionName::SetDecompressibleState => (), // Nothing to index.
         InstructionName::UpdateMetadata => {
@@ -104,6 +105,17 @@ where
         }
         _ => debug!("Bubblegum: Not Implemented Instruction"),
     }
+
+    // TODO: confirm that `tree_update` is present on all Bubblegum transactions
+    // before relying on it here.
+    if let Some(tree_update) = &parsing_result.tree_update {
+        save_tree_transaction(
+            tree_update.id.to_bytes().to_vec(),
+            bundle.slot,
+            bundle.txn_id,
+            txn,
+        )
+        .await?;
+    }
+
     Ok(())
 }
diff --git a/tree_backfiller/Cargo.toml b/tree_backfiller/Cargo.toml
new file mode 100644
index 000000000..4d4b1285d
--- /dev/null
+++ b/tree_backfiller/Cargo.toml
@@ -0,0 +1,88 @@
+[package]
+name = "das-tree-backfiller"
+version = "0.8.0"
+edition = "2021"
+publish = false
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+log = "0.4.17"
+env_logger = "0.10.0"
+anyhow = "1.0.75"
+redis = { version = "0.22.3", features = [
+    "aio",
+    "tokio-comp",
+    "streams",
+    "tokio-native-tls-comp",
+] }
+futures = { version = "0.3.25" }
+futures-util = "0.3.27"
+base64 = "0.21.0"
+thiserror = "1.0.31"
+serde_json = "1.0.81"
+anchor-client = "0.28.0"
+tokio = { version = "1.26.0", features = ["full", "tracing"] }
+sqlx = { version = "0.6.2", features = [
+    "macros",
+    "runtime-tokio-rustls",
+    "postgres",
+    "uuid",
+    "offline",
+    "json",
+] }
+sea-orm = { version = "0.10.6", features = [
+    "macros",
+    "runtime-tokio-rustls",
+    "sqlx-postgres",
+    "with-chrono",
+    "mock",
+] }
+sea-query = { version = "0.28.1", features = ["postgres-array"] }
+chrono = "0.4.19"
+tokio-postgres = "0.7.7"
+serde = "1.0.136"
+bs58 = "0.4.0"
+reqwest = "0.11.11"
+plerkle_messenger = { version = "1.6.0", features = ['redis'] }
+plerkle_serialization = { version = "1.6.0" }
+flatbuffers = "23.1.21"
+lazy_static = "1.4.0"
+regex = "1.5.5"
+digital_asset_types = { path = "../digital_asset_types", features = [
+    "json_types",
+    "sql_types",
+] }
+mpl-bubblegum = "1.0.1-beta.3"
+spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] }
+spl-concurrent-merkle-tree = "0.2.0"
+uuid = "1.0.0"
+async-trait = "0.1.53"
+num-traits = "0.2.15"
+blockbuster = "0.9.0-beta.1"
+figment = { version = "0.10.6", features = ["env", "toml", "yaml"] }
+cadence = "0.29.0"
+cadence-macros = "0.29.0"
+solana-sdk = "~1.16.16"
+solana-client = "~1.16.16"
+spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] }
+solana-transaction-status = "~1.16.16"
+solana-account-decoder = "~1.16.16"
+solana-geyser-plugin-interface = "~1.16.16"
+solana-sdk-macro = "~1.16.16"
+rand = "0.8.5"
+rust-crypto = "0.2.36"
+url = "2.3.1"
+anchor-lang = "0.28.0"
+borsh = "~0.10.3"
+stretto = { version = "0.7", features = ["async"] }
+tokio-stream = "0.1.12"
+tracing-subscriber = { version = "0.3.16", features = [
+    "json",
+    "env-filter",
+    "ansi",
+] }
+clap = { version = "4.2.2", features = ["derive", "cargo", "env"] }
+
+[lints]
+workspace = true
diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs
new file mode 100644
index 000000000..ad4d34cff
--- /dev/null
+++ b/tree_backfiller/src/backfiller.rs
@@ -0,0 +1,175 @@
+use crate::db;
+use crate::{queue, tree};
+use anyhow::Result;
+use clap::Parser;
+use log::{debug, error};
+use sea_orm::SqlxPostgresConnector;
+use solana_client::nonblocking::rpc_client::RpcClient;
+use solana_sdk::signature::Signature;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use tokio::sync::{mpsc, Semaphore};
+use tokio::time::Duration;
+
+#[derive(Debug, Parser, Clone)]
+pub struct Args {
+    /// Solana RPC URL
+    #[arg(long, env)]
+    pub solana_rpc_url: String,
+
+    /// Number of tree crawler workers
+    #[arg(long, env, default_value = "1")]
+    pub tree_crawler_count: usize,
+
+    /// The size of the signature channel. This is the number of signatures that can be queued up.
+    /// If the channel is full, the crawler blocks until there is space.
+    #[arg(long, env, default_value = "1")]
+    pub signature_channel_size: usize,
+
+    #[arg(long, env, default_value = "1")]
+    pub queue_channel_size: usize,
+
+    #[arg(long, env, default_value = "3000")]
+    pub transaction_check_timeout: u64,
+
+    /// Database configuration
+    #[clap(flatten)]
+    pub database: db::PoolArgs,
+
+    /// Redis configuration
+    #[clap(flatten)]
+    pub queue: queue::QueueArgs,
+}
+
+/// A thread-safe counter.
+pub struct Counter(Arc<AtomicUsize>);
+
+impl Counter {
+    /// Creates a new counter initialized to zero.
+    pub fn new() -> Self {
+        Self(Arc::new(AtomicUsize::new(0)))
+    }
+
+    /// Increments the counter by one.
+    pub fn increment(&self) {
+        self.0.fetch_add(1, Ordering::SeqCst);
+    }
+
+    /// Decrements the counter by one.
+    pub fn decrement(&self) {
+        self.0.fetch_sub(1, Ordering::SeqCst);
+    }
+
+    /// Returns the current value of the counter.
+    pub fn get(&self) -> usize {
+        self.0.load(Ordering::SeqCst)
+    }
+
+    /// Returns a future that resolves when the counter reaches zero.
+    /// The future periodically checks the counter value and sleeps for a short duration.
+    pub fn zero(&self) -> impl std::future::Future<Output = ()> {
+        let counter = self.clone();
+        async move {
+            while counter.get() > 0 {
+                debug!("Counter value: {}", counter.get());
+                tokio::time::sleep(Duration::from_millis(100)).await;
+            }
+        }
+    }
+}
+
+impl Clone for Counter {
+    /// Returns a clone of the counter.
+    /// The returned counter shares the same underlying atomic integer.
+    fn clone(&self) -> Self {
+        Self(Arc::clone(&self.0))
+    }
+}
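Note (editorial): `Counter` behaves like a minimal wait-group: tasks increment before they start, decrement when they finish, and `zero()` polls until everything has drained. A usage sketch (illustrative; assumes the `Counter` type above is in scope):

```rust
// Illustrative: Counter as a lightweight wait-group for spawned tasks.
#[tokio::main]
async fn main() {
    let active = Counter::new();

    for _ in 0..4 {
        let guard = active.clone();
        // Increment before spawning so `zero()` cannot race past a task
        // that has been created but not yet started.
        guard.increment();
        tokio::spawn(async move {
            // ... fetch and enqueue a transaction ...
            guard.decrement();
        });
    }

    // Resolves once every spawned task has called `decrement`.
    active.zero().await;
}
```

Unlike collecting `JoinHandle`s, this works for a dynamically growing set of tasks whose handles are never retained, which is exactly how `run` below spawns its per-signature workers.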
+/// Runs the tree backfiller.
+///
+/// This function takes an `Args` as input and returns a `Result<()>`.
+/// It creates an `RpcClient` and retrieves all trees.
+/// It then spawns a crawl task per tree (bounded by a semaphore) plus separate tasks
+/// that fetch transactions and push them onto the queue.
+/// The function waits for all tasks to finish before returning.
+pub async fn run(config: Args) -> Result<()> {
+    let solana_rpc = Arc::new(RpcClient::new(config.solana_rpc_url));
+    let sig_solana_rpc = Arc::clone(&solana_rpc);
+
+    let pool = db::connect(config.database).await?;
+
+    let (queue_sender, mut queue_receiver) = mpsc::channel::<Vec<u8>>(config.queue_channel_size);
+    let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);
+
+    let transaction_worker_count = Counter::new();
+    let transaction_worker_count_check = transaction_worker_count.clone();
+
+    tokio::spawn(async move {
+        loop {
+            tokio::select! {
+                Some(signature) = sig_receiver.recv() => {
+                    let solana_rpc = Arc::clone(&sig_solana_rpc);
+                    let transaction_worker_count_sig = transaction_worker_count.clone();
+                    let queue_sender = queue_sender.clone();
+
+                    transaction_worker_count_sig.increment();
+
+                    let transaction_task = async move {
+                        if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await {
+                            error!("retrieving transaction: {:?}", e);
+                        }
+
+                        transaction_worker_count_sig.decrement();
+                    };
+
+                    tokio::spawn(transaction_task);
+                },
+                else => break,
+            }
+        }
+
+        Ok::<(), anyhow::Error>(())
+    });
+
+    let queue_handler = tokio::spawn(async move {
+        let mut queue = queue::Queue::setup(config.queue).await?;
+
+        while let Some(data) = queue_receiver.recv().await {
+            if let Err(e) = queue.push(&data).await {
+                error!("pushing to queue: {:?}", e);
+            }
+        }
+
+        Ok::<(), anyhow::Error>(())
+    });
+
+    let trees = tree::all(&solana_rpc).await?;
+
+    let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count));
+    let mut crawl_handlers = Vec::with_capacity(trees.len());
+
+    for tree in trees {
+        let solana_rpc = Arc::clone(&solana_rpc);
+        let semaphore = Arc::clone(&semaphore);
+        let sig_sender = sig_sender.clone();
+        let pool = pool.clone();
+        let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
+
+        let crawl_handler = tokio::spawn(async move {
+            let _permit = semaphore.acquire().await?;
+
+            if let Err(e) = tree.crawl(solana_rpc, sig_sender, conn).await {
+                error!("crawling tree: {:?}", e);
+            }
+
+            Ok::<(), anyhow::Error>(())
+        });
+
+        crawl_handlers.push(crawl_handler);
+    }
+
+    futures::future::try_join_all(crawl_handlers).await?;
+
+    // Drop the original signature sender so the worker loop above can observe
+    // channel closure (and, in turn, drop its queue sender) once all crawlers
+    // have finished; otherwise the queue handler would wait forever.
+    drop(sig_sender);
+
+    transaction_worker_count_check.zero().await;
+    queue_handler.await??;
+
+    Ok(())
+}
diff --git a/tree_backfiller/src/db.rs b/tree_backfiller/src/db.rs
new file mode 100644
index 000000000..fd718113c
--- /dev/null
+++ b/tree_backfiller/src/db.rs
@@ -0,0 +1,35 @@
+use anyhow::Result;
+use clap::Parser;
+use sqlx::{
+    postgres::{PgConnectOptions, PgPoolOptions},
+    PgPool,
+};
+
+#[derive(Debug, Parser, Clone)]
+pub struct PoolArgs {
+    #[arg(long, env)]
+    pub database_url: String,
+    #[arg(long, env, default_value = "125")]
+    pub database_max_connections: u32,
+    #[arg(long, env, default_value = "5")]
+    pub database_min_connections: u32,
+}
+
+/// Establishes a connection pool to the database using the provided configuration.
+///
+/// # Arguments
+///
+/// * `config` - A `PoolArgs` struct containing the database URL and the minimum and
+///   maximum number of connections.
+///
+/// # Returns
+///
+/// * `Result<PgPool>` - On success, returns a Postgres connection pool. On failure,
+///   returns an error.
+pub async fn connect(config: PoolArgs) -> Result<PgPool> {
+    let options: PgConnectOptions = config.database_url.parse()?;
+
+    Ok(PgPoolOptions::new()
+        .min_connections(config.database_min_connections)
+        .max_connections(config.database_max_connections)
+        .connect_with(options)
+        .await?)
+}
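Note (editorial): every `Args` field doubles as a flag and an environment variable (`SOLANA_RPC_URL`, `DATABASE_URL`, `MESSENGER_REDIS_URL`, and so on). Assuming clap's kebab-case defaults, an invocation of the resulting binary could look like this (all values are placeholders):

```
das-tree-backfiller run \
  --solana-rpc-url https://rpc.example.com \
  --database-url postgres://user:pass@localhost:5432/das \
  --messenger-redis-url redis://localhost:6379 \
  --tree-crawler-count 20 \
  --signature-channel-size 10000 \
  --queue-channel-size 1000
```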
diff --git a/tree_backfiller/src/main.rs b/tree_backfiller/src/main.rs
new file mode 100644
index 000000000..2b5da240b
--- /dev/null
+++ b/tree_backfiller/src/main.rs
@@ -0,0 +1,33 @@
+mod backfiller;
+mod db;
+mod queue;
+mod tree;
+
+use anyhow::Result;
+use clap::{Parser, Subcommand};
+
+#[derive(Debug, Parser)]
+#[clap(author, version)]
+struct Args {
+    #[command(subcommand)]
+    command: Command,
+}
+
+#[derive(Debug, Clone, Subcommand)]
+enum Command {
+    /// The 'run' command is used to cross-reference the index against on-chain accounts.
+    /// It crawls through trees and backfills any missed tree transactions.
+    /// This is particularly useful for ensuring data consistency and completeness.
+    #[command(name = "run")]
+    Run(backfiller::Args),
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let args = Args::parse();
+
+    env_logger::init();
+
+    match args.command {
+        Command::Run(config) => backfiller::run(config).await,
+    }
+}
diff --git a/tree_backfiller/src/queue.rs b/tree_backfiller/src/queue.rs
new file mode 100644
index 000000000..215115974
--- /dev/null
+++ b/tree_backfiller/src/queue.rs
@@ -0,0 +1,63 @@
+use anyhow::Result;
+use clap::Parser;
+use figment::value::{Dict, Value};
+use plerkle_messenger::{
+    redis_messenger::RedisMessenger, Messenger, MessengerConfig, MessengerError, MessengerType,
+};
+
+const TRANSACTION_BACKFILL_STREAM: &str = "TXNFILL";
+
+#[derive(Clone, Debug, Parser)]
+pub struct QueueArgs {
+    #[arg(long, env)]
+    pub messenger_redis_url: String,
+    #[arg(long, env, default_value = "100")]
+    pub messenger_redis_batch_size: String,
+    #[arg(long, env, default_value = "10000000")]
+    pub messenger_stream_max_buffer_size: usize,
+}
+
+impl From<QueueArgs> for MessengerConfig {
+    fn from(args: QueueArgs) -> Self {
+        let mut connection_config = Dict::new();
+
+        connection_config.insert(
+            "redis_connection_str".to_string(),
+            Value::from(args.messenger_redis_url),
+        );
+
+        connection_config.insert(
+            "batch_size".to_string(),
+            Value::from(args.messenger_redis_batch_size),
+        );
+
+        Self {
+            messenger_type: MessengerType::Redis,
+            connection_config,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct Queue(RedisMessenger);
+
+impl Queue {
+    pub async fn setup(config: QueueArgs) -> Result<Self> {
+        let mut messenger = RedisMessenger::new(config.clone().into()).await?;
+
+        messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await?;
+
+        messenger
+            .set_buffer_size(
+                TRANSACTION_BACKFILL_STREAM,
+                config.messenger_stream_max_buffer_size,
+            )
+            .await;
+
+        Ok(Self(messenger))
+    }
+
+    pub async fn push(&mut self, message: &[u8]) -> Result<(), MessengerError> {
+        self.0.send(TRANSACTION_BACKFILL_STREAM, message).await
+    }
+}
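Note (editorial): the crawl loop in `tree.rs` below leans on the paging semantics of `getSignaturesForAddress`: results come back newest-first, `before` pages backwards in time, and `until` stops the walk at the newest signature already indexed, which is the fast-forward point read from `tree_transactions`. Illustrative configuration (values hypothetical):

```rust
use solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
use solana_sdk::signature::Signature;

fn paging_config(last_indexed: Option<Signature>) -> GetConfirmedSignaturesForAddress2Config {
    GetConfirmedSignaturesForAddress2Config {
        // `before`: only return signatures strictly older than this one; the
        // crawler advances it as it walks each page backwards from the tip.
        before: None,
        // `until`: stop once the newest already-indexed signature is reached.
        until: last_indexed,
        ..GetConfirmedSignaturesForAddress2Config::default()
    }
}
```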
diff --git a/tree_backfiller/src/tree.rs b/tree_backfiller/src/tree.rs
new file mode 100644
index 000000000..f3214a105
--- /dev/null
+++ b/tree_backfiller/src/tree.rs
@@ -0,0 +1,204 @@
+use anyhow::Result;
+use borsh::BorshDeserialize;
+use clap::Args;
+use digital_asset_types::dao::tree_transactions;
+use flatbuffers::FlatBufferBuilder;
+use plerkle_serialization::serializer::seralize_encoded_transaction_with_status;
+use sea_orm::{
+    sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait,
+    QueryFilter, QueryOrder,
+};
+use solana_account_decoder::UiAccountEncoding;
+use solana_client::{
+    nonblocking::rpc_client::RpcClient,
+    rpc_client::GetConfirmedSignaturesForAddress2Config,
+    rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionConfig},
+    rpc_filter::{Memcmp, RpcFilterType},
+};
+use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature};
+use solana_transaction_status::UiTransactionEncoding;
+use spl_account_compression::id;
+use spl_account_compression::state::{
+    merkle_tree_get_size, ConcurrentMerkleTreeHeader, CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1,
+};
+use std::str::FromStr;
+use std::sync::Arc;
+use thiserror::Error as ThisError;
+use tokio::sync::mpsc::Sender;
+
+const GET_SIGNATURES_FOR_ADDRESS_LIMIT: usize = 1000;
+
+#[derive(Debug, Clone, Default, Args)]
+pub struct ConfigBackfiller {
+    /// Solana RPC URL
+    #[arg(long, env)]
+    pub solana_rpc_url: String,
+}
+
+#[derive(ThisError, Debug)]
+pub enum TreeErrorKind {
+    #[error("solana rpc")]
+    Rpc(#[from] solana_client::client_error::ClientError),
+    #[error("anchor")]
+    Anchor(#[from] anchor_client::anchor_lang::error::Error),
+    #[error("plerkle serialize")]
+    PerkleSerialize(#[from] plerkle_serialization::error::PlerkleSerializationError),
+    #[error("plerkle messenger")]
+    PlerkleMessenger(#[from] plerkle_messenger::MessengerError),
+    #[error("queue send")]
+    QueueSend(#[from] tokio::sync::mpsc::error::SendError<Vec<u8>>),
+}
+
+#[derive(Debug, Clone)]
+pub struct TreeHeaderResponse {
+    pub max_depth: u32,
+    pub max_buffer_size: u32,
+    pub creation_slot: u64,
+    pub size: usize,
+}
+
+impl TryFrom<ConcurrentMerkleTreeHeader> for TreeHeaderResponse {
+    type Error = TreeErrorKind;
+
+    fn try_from(payload: ConcurrentMerkleTreeHeader) -> Result<Self, Self::Error> {
+        let size = merkle_tree_get_size(&payload)?;
+        Ok(Self {
+            max_depth: payload.get_max_depth(),
+            max_buffer_size: payload.get_max_buffer_size(),
+            creation_slot: payload.get_creation_slot(),
+            size,
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct TreeResponse {
+    pub pubkey: Pubkey,
+    pub tree_header: TreeHeaderResponse,
+}
+
+impl TreeResponse {
+    pub fn try_from_rpc(pubkey: Pubkey, account: Account) -> Result<Self> {
+        let (header_bytes, _rest) = account.data.split_at(CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1);
+        let header: ConcurrentMerkleTreeHeader =
+            ConcurrentMerkleTreeHeader::try_from_slice(header_bytes)?;
+
+        let (auth, _) = Pubkey::find_program_address(&[pubkey.as_ref()], &mpl_bubblegum::ID);
+
+        header.assert_valid_authority(&auth)?;
+
+        let tree_header = header.try_into()?;
+
+        Ok(Self {
+            pubkey,
+            tree_header,
+        })
+    }
+}
+
+pub async fn all(client: &Arc<RpcClient>) -> Result<Vec<TreeResponse>, TreeErrorKind> {
+    let config = RpcProgramAccountsConfig {
+        filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
+            0,
+            vec![1u8],
+        ))]),
+        account_config: RpcAccountInfoConfig {
+            encoding: Some(UiAccountEncoding::Base64),
+            ..RpcAccountInfoConfig::default()
+        },
+        ..RpcProgramAccountsConfig::default()
+    };
+
+    Ok(client
+        .get_program_accounts_with_config(&id(), config)
+        .await?
+        .into_iter()
+        .filter_map(|(pubkey, account)| TreeResponse::try_from_rpc(pubkey, account).ok())
+        .collect())
+}
+
+impl TreeResponse {
+    pub async fn crawl(
+        &self,
+        client: Arc<RpcClient>,
+        sender: Sender<Signature>,
+        conn: DatabaseConnection,
+    ) -> Result<()> {
+        let mut before = None;
+
+        let until = tree_transactions::Entity::find()
+            .filter(tree_transactions::Column::Tree.eq(self.pubkey.as_ref()))
+            .order_by_desc(tree_transactions::Column::Slot)
+            .one(&conn)
+            .await?
+            .and_then(|t| Signature::from_str(&t.signature).ok());
+
+        loop {
+            let sigs = client
+                .get_signatures_for_address_with_config(
+                    &self.pubkey,
+                    GetConfirmedSignaturesForAddress2Config {
+                        before,
+                        until,
+                        ..GetConfirmedSignaturesForAddress2Config::default()
+                    },
+                )
+                .await?;
+
+            for sig in sigs.iter() {
+                let slot = i64::try_from(sig.slot)?;
+                let sig = Signature::from_str(&sig.signature)?;
+
+                let tree_transaction = tree_transactions::ActiveModel {
+                    signature: Set(sig.to_string()),
+                    tree: Set(self.pubkey.as_ref().to_vec()),
+                    slot: Set(slot),
+                    ..Default::default()
+                };
+
+                tree_transactions::Entity::insert(tree_transaction)
+                    .on_conflict(
+                        OnConflict::column(tree_transactions::Column::Signature)
+                            .do_nothing()
+                            .to_owned(),
+                    )
+                    .exec(&conn)
+                    .await?;
+
+                sender.send(sig).await?;
+
+                before = Some(sig);
+            }
+
+            if sigs.len() < GET_SIGNATURES_FOR_ADDRESS_LIMIT {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+pub async fn transaction(
+    client: Arc<RpcClient>,
+    sender: Sender<Vec<u8>>,
+    signature: Signature,
+) -> Result<(), TreeErrorKind> {
+    let transaction = client
+        .get_transaction_with_config(
+            &signature,
+            RpcTransactionConfig {
+                encoding: Some(UiTransactionEncoding::Base58),
+                max_supported_transaction_version: Some(0),
+                ..RpcTransactionConfig::default()
+            },
+        )
+        .await?;
+
+    let message = seralize_encoded_transaction_with_status(FlatBufferBuilder::new(), transaction)?;
+
+    sender.send(message.finished_data().to_vec()).await?;
+
+    Ok(())
+}