From c311b1124d55d127ee9154418163238d9192ccf5 Mon Sep 17 00:00:00 2001
From: Kyle Espinola
Date: Mon, 11 Dec 2023 13:20:23 +0100
Subject: [PATCH] feat(backfiller): push transaction payloads to redis through
 the plerkle messenger. Mark tree transactions with processed_at so we know
 they completed the index loop.

---
 .../src/dao/generated/cl_audits.rs            |  4 +-
 .../src/dao/generated/tree_transactions.rs    |  2 +-
 digital_asset_types/src/dao/scopes/asset.rs   | 14 ++-
 ...8_103949_create_tree_transactions_table.rs |  2 +-
 nft_ingester/src/account_updates.rs           |  6 +-
 nft_ingester/src/database.rs                  | 16 +--
 nft_ingester/src/error/mod.rs                 |  4 +
 nft_ingester/src/main.rs                      |  4 +-
 nft_ingester/src/metrics.rs                   |  9 +-
 .../src/program_transformers/bubblegum/db.rs  | 99 ++++++++++++-------
 .../src/program_transformers/bubblegum/mod.rs | 12 +++
 nft_ingester/src/stream.rs                    |  6 +-
 nft_ingester/src/transaction_notifications.rs | 10 +-
 tools/acc_forwarder/src/main.rs               |  6 +-
 tree_backfiller/src/backfiller.rs             | 65 ++++++------
 tree_backfiller/src/db.rs                     |  9 ++
 tree_backfiller/src/main.rs                   |  2 +-
 tree_backfiller/src/queue.rs                  | 63 ++++++++++++
 tree_backfiller/src/tree.rs                   | 54 +++++++---
 19 files changed, 273 insertions(+), 114 deletions(-)
 create mode 100644 tree_backfiller/src/queue.rs

diff --git a/digital_asset_types/src/dao/generated/cl_audits.rs b/digital_asset_types/src/dao/generated/cl_audits.rs
index ac22cc902..f013a7138 100644
--- a/digital_asset_types/src/dao/generated/cl_audits.rs
+++ b/digital_asset_types/src/dao/generated/cl_audits.rs
@@ -92,6 +92,6 @@ impl From<cl_items::ActiveModel> for ActiveModel {
         seq: item.seq,
         leaf_idx: item.leaf_idx,
         ..Default::default()
-        }
+        };
     }
-}
\ No newline at end of file
+}
diff --git a/digital_asset_types/src/dao/generated/tree_transactions.rs b/digital_asset_types/src/dao/generated/tree_transactions.rs
index d1eae60f8..3fdddf058 100644
--- a/digital_asset_types/src/dao/generated/tree_transactions.rs
+++ b/digital_asset_types/src/dao/generated/tree_transactions.rs
@@ -49,7 +49,7 @@ impl ColumnTrait for Column {
     type EntityName = Entity;
     fn def(&self) -> ColumnDef {
         match self {
-            Self::Signature => ColumnType::Char(Some(64u32)).def(),
+            Self::Signature => ColumnType::Char(Some(88u32)).def(),
             Self::Tree => ColumnType::Binary.def(),
             Self::Slot => ColumnType::BigInteger.def(),
             Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(),
diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs
index ebe406e25..da58b5d23 100644
--- a/digital_asset_types/src/dao/scopes/asset.rs
+++ b/digital_asset_types/src/dao/scopes/asset.rs
@@ -414,7 +414,8 @@ pub async fn get_signatures_for_asset(
 ) -> Result<Vec<(String, Option<String>)>, DbErr> {
     // if tree_id and leaf_idx are provided, use them directly to fetch transactions
     if let (Some(tree_id), Some(leaf_idx)) = (tree_id, leaf_idx) {
-        let transactions = fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?;
+        let transactions =
+            fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?;
         return Ok(transactions);
     }
@@ -442,7 +443,8 @@ pub async fn get_signatures_for_asset(
         let leaf_id = asset
             .nonce
             .ok_or(DbErr::RecordNotFound("Leaf ID does not exist".to_string()))?;
-        let transactions = fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?;
+        let transactions =
+            fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?;
         Ok(transactions)
     } else {
         Ok(Vec::new())
@@ -461,7 +463,13 @@ pub async fn fetch_transactions(
     stmt = stmt.filter(cl_audits::Column::LeafIdx.eq(leaf_id));
     stmt = stmt.order_by(cl_audits::Column::CreatedAt, sea_orm::Order::Desc);
-    stmt = paginate(pagination, limit, stmt, sort_direction, cl_audits::Column::Id);
+    stmt = paginate(
+        pagination,
+        limit,
+        stmt,
+        sort_direction,
+        cl_audits::Column::Id,
+    );
     let transactions = stmt.all(conn).await?;
     let transaction_list: Vec<(String, Option<String>)> = transactions
         .into_iter()
diff --git a/migration/src/m20231208_103949_create_tree_transactions_table.rs b/migration/src/m20231208_103949_create_tree_transactions_table.rs
index e592d0c7a..c30061516 100644
--- a/migration/src/m20231208_103949_create_tree_transactions_table.rs
+++ b/migration/src/m20231208_103949_create_tree_transactions_table.rs
@@ -13,7 +13,7 @@ impl MigrationTrait for Migration {
                 .if_not_exists()
                 .col(
                     ColumnDef::new(TreeTransactions::Signature)
-                        .char_len(64)
+                        .char_len(88)
                         .not_null()
                         .primary_key(),
                 )
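
Note on the width change above: a Solana signature is 64 raw bytes, and its base58
string form needs up to 88 characters, so the old CHAR(64) column could truncate
stored signatures (the entity definition is changed to Char(88) to match). A quick
self-contained check, assuming the `bs58` crate:

```rust
fn main() {
    // The largest 64-byte value base58-encodes to 88 characters.
    assert_eq!(bs58::encode([0xffu8; 64]).into_string().len(), 88);

    // An all-zero signature encodes to 64 '1' characters: the shortest case,
    // which is why CHAR(64) appeared to work in testing.
    assert_eq!(bs58::encode([0u8; 64]).into_string().len(), 64);
}
```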
diff --git a/nft_ingester/src/account_updates.rs b/nft_ingester/src/account_updates.rs
index 9791b7500..644b37756 100644
--- a/nft_ingester/src/account_updates.rs
+++ b/nft_ingester/src/account_updates.rs
@@ -65,7 +65,11 @@ pub fn account_worker(
     })
 }
 
-async fn handle_account(manager: Arc<ProgramTransformer>, item: RecvData, stream_key: &'static str) -> Option<String> {
+async fn handle_account(
+    manager: Arc<ProgramTransformer>,
+    item: RecvData,
+    stream_key: &'static str,
+) -> Option<String> {
     let id = item.id;
     let mut ret_id = None;
     let data = item.data;
diff --git a/nft_ingester/src/database.rs b/nft_ingester/src/database.rs
index 578f58cbc..77f8dd2c5 100644
--- a/nft_ingester/src/database.rs
+++ b/nft_ingester/src/database.rs
@@ -1,8 +1,9 @@
-use sqlx::{postgres::{PgPoolOptions, PgConnectOptions}, PgPool, ConnectOptions};
-
-use crate::{
-    config::{IngesterConfig, IngesterRole},
+use sqlx::{
+    postgres::{PgConnectOptions, PgPoolOptions},
+    ConnectOptions, PgPool,
 };
+
+use crate::config::{IngesterConfig, IngesterRole};
 const BARE_MINIMUM_CONNECTIONS: u32 = 5;
 const DEFAULT_MAX: u32 = 125;
 pub async fn setup_database(config: IngesterConfig) -> PgPool {
@@ -19,8 +20,11 @@ pub async fn setup_database(config: IngesterConfig) -> PgPool {
     let mut options: PgConnectOptions = url.parse().unwrap();
     options.log_statements(log::LevelFilter::Trace);
-    options.log_slow_statements(log::LevelFilter::Debug, std::time::Duration::from_millis(500));
-
+    options.log_slow_statements(
+        log::LevelFilter::Debug,
+        std::time::Duration::from_millis(500),
+    );
+
     let pool = PgPoolOptions::new()
         .min_connections(BARE_MINIMUM_CONNECTIONS)
         .max_connections(max)
diff --git a/nft_ingester/src/error/mod.rs b/nft_ingester/src/error/mod.rs
index 37ed5f24b..46bd678f4 100644
--- a/nft_ingester/src/error/mod.rs
+++ b/nft_ingester/src/error/mod.rs
@@ -52,6 +52,10 @@ pub enum IngesterError {
     HttpError { status_code: String },
     #[error("AssetIndex Error {0}")]
     AssetIndexError(String),
+    #[error("TryFromInt Error {0}")]
+    TryFromInt(#[from] std::num::TryFromIntError),
+    #[error("Chrono FixedOffset Error")]
+    ChronoFixedOffset,
 }
 
 impl From for IngesterError {
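
The two added variants let `?` convert the new failure modes in
`save_tree_transaction` below. A minimal illustration of the `TryFromInt`
path (the helper is hypothetical, not part of the patch):

```rust
// With #[from], TryFromIntError converts into IngesterError automatically,
// so a u64 slot from the RPC becomes the i64 that Postgres BIGINT expects.
fn slot_to_db(slot: u64) -> Result<i64, IngesterError> {
    Ok(i64::try_from(slot)?)
}
```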
diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs
index 3d08cafb1..6ae64a15e 100644
--- a/nft_ingester/src/main.rs
+++ b/nft_ingester/src/main.rs
@@ -27,7 +27,8 @@ use chrono::Duration;
 use clap::{arg, command, value_parser};
 use log::{error, info};
 use plerkle_messenger::{
-    redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM
+    redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_BACKFILL_STREAM, ACCOUNT_STREAM,
+    TRANSACTION_BACKFILL_STREAM, TRANSACTION_STREAM,
 };
 use std::{path::PathBuf, time};
 use tokio::{signal, task::JoinSet};
@@ -118,7 +119,6 @@ pub async fn main() -> Result<(), IngesterError> {
         TRANSACTION_BACKFILL_STREAM,
     )?;
 
-
     if let Some(t) = timer_acc.start::<RedisMessenger>().await {
         tasks.spawn(t);
     }
diff --git a/nft_ingester/src/metrics.rs b/nft_ingester/src/metrics.rs
index 0e44d69c7..a8b5006ae 100644
--- a/nft_ingester/src/metrics.rs
+++ b/nft_ingester/src/metrics.rs
@@ -5,10 +5,7 @@ use cadence_macros::{is_global_default_set, set_global_default, statsd_count, st
 use log::{error, warn};
 use tokio::time::Instant;
 
-use crate::{
-    config::IngesterConfig,
-    error::IngesterError,
-};
+use crate::{config::IngesterConfig, error::IngesterError};
 
 #[macro_export]
 macro_rules! metric {
@@ -32,9 +29,7 @@ pub fn setup_metrics(config: &IngesterConfig) {
         let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap();
         let queuing_sink = QueuingMetricSink::from(udp_sink);
         let builder = StatsdClient::builder("das_ingester", queuing_sink);
-        let client = builder
-            .with_tag("env", env)
-            .build();
+        let client = builder.with_tag("env", env).build();
         set_global_default(client);
     }
 }
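
For context, the `metric!` macro guards statsd calls on the global client that
`setup_metrics` installs. Illustrative usage, matching the worker pattern seen
later in this patch (stream name shown for flavor):

```rust
// The block only executes when setup_metrics installed a global statsd client.
metric! {
    statsd_count!("ingester.seen", 1, "stream" => TRANSACTION_BACKFILL_STREAM);
}
```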
diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs
index 0d6be6dfa..10ff0548f 100644
--- a/nft_ingester/src/program_transformers/bubblegum/db.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/db.rs
@@ -1,15 +1,76 @@
 use crate::error::IngesterError;
 use digital_asset_types::dao::{
-    asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items,
+    asset, asset_creators, asset_grouping, cl_audits, cl_items, tree_transactions,
 };
-use log::{debug, info, error};
+use log::{debug, error, info};
 use mpl_bubblegum::types::Collection;
 use sea_orm::{
     query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait,
 };
 use spl_account_compression::events::ChangeLogEventV1;
-use std::convert::From;
+use sea_orm::ActiveModelTrait;
+
+/// Marks a tree transaction as processed. If the transaction already exists, updates its
+/// `processed_at` field.
+///
+/// This function takes in a tree ID, slot, transaction ID, and a transaction object.
+/// It first checks whether a tree transaction with the given transaction ID already exists.
+/// If it does, it updates the `processed_at` field of the existing tree transaction with the current time.
+/// If it doesn't, it creates a new tree transaction with the provided parameters and saves it.
+///
+/// # Arguments
+///
+/// * `tree_id` - A vector of bytes representing the ID of the tree.
+/// * `slot` - A 64-bit unsigned integer representing the slot.
+/// * `txn_id` - A string slice representing the transaction ID.
+/// * `txn` - A reference to a transaction object.
+///
+/// # Returns
+///
+/// This function returns a `Result` that contains an empty tuple, or an `IngesterError` if the operation fails.
+pub async fn save_tree_transaction<'c, T>(
+    tree_id: Vec<u8>,
+    slot: u64,
+    txn_id: &str,
+    txn: &T,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    let now = chrono::Utc::now()
+        .with_timezone(&chrono::FixedOffset::east_opt(0).ok_or(IngesterError::ChronoFixedOffset)?);
+
+    let tree_transaction = tree_transactions::Entity::find()
+        .filter(tree_transactions::Column::Signature.eq(txn_id))
+        .one(txn)
+        .await?;
+
+    if let Some(tree_transaction) = tree_transaction {
+        let mut tree_transaction: tree_transactions::ActiveModel = tree_transaction.into();
+
+        tree_transaction.processed_at = Set(Some(now));
+
+        tree_transaction.save(txn).await?;
+    } else {
+        let tree_transaction = tree_transactions::ActiveModel {
+            signature: Set(txn_id.to_string()),
+            slot: Set(i64::try_from(slot)?),
+            tree: Set(tree_id.to_vec()),
+            processed_at: Set(Some(now)),
+            ..Default::default()
+        };
+
+        tree_transactions::Entity::insert(tree_transaction)
+            .on_conflict(
+                OnConflict::column(tree_transactions::Column::Signature)
+                    .do_nothing()
+                    .to_owned(),
+            )
+            .exec(txn)
+            .await?;
+    }
+    Ok(())
+}
 
 pub async fn save_changelog_event<'c, T>(
     change_log_event: &ChangeLogEventV1,
@@ -44,6 +105,7 @@ where
     let mut i: i64 = 0;
     let depth = change_log_event.path.len() - 1;
     let tree_id = change_log_event.id.as_ref();
+
     for p in change_log_event.path.iter() {
         let node_idx = p.index as i64;
         info!(
@@ -103,37 +165,6 @@ where
         }
     }
 
-    // If and only if the entire path of nodes was inserted into the `cl_items` table, then insert
-    // a single row into the `backfill_items` table. This way if an incomplete path was inserted
-    // into `cl_items` due to an error, a gap will be created for the tree and the backfiller will
-    // fix it.
-    if i - 1 == depth as i64 {
-        // See if the tree already exists in the `backfill_items` table.
-        let rows = backfill_items::Entity::find()
-            .filter(backfill_items::Column::Tree.eq(tree_id))
-            .limit(1)
-            .all(txn)
-            .await?;
-
-        // If the tree does not exist in `backfill_items` and the sequence number is greater than 1,
-        // then we know we will need to backfill the tree from sequence number 1 up to the current
-        // sequence number. So in this case we set at flag to force checking the tree.
-        let force_chk = rows.is_empty() && change_log_event.seq > 1;
-
-        info!("Adding to backfill_items table at level {}", i - 1);
-        let item = backfill_items::ActiveModel {
-            tree: Set(tree_id.to_vec()),
-            seq: Set(change_log_event.seq as i64),
-            slot: Set(slot as i64),
-            force_chk: Set(force_chk),
-            backfilled: Set(false),
-            failed: Set(false),
-            ..Default::default()
-        };
-
-        backfill_items::Entity::insert(item).exec(txn).await?;
-    }
-
     Ok(())
     //TODO -> set maximum size of path and break into multiple statements
 }
diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs
index 4e735576a..25df7455d 100644
--- a/nft_ingester/src/program_transformers/bubblegum/mod.rs
+++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs
@@ -96,6 +96,18 @@ where
         }
         _ => debug!("Bubblegum: Not Implemented Instruction"),
     }
+
+    // TODO: assuming `tree_update` is available on all transactions, but need to confirm.
+    if let Some(tree_update) = &parsing_result.tree_update {
+        save_tree_transaction(
+            tree_update.id.to_bytes().to_vec(),
+            bundle.slot,
+            bundle.txn_id,
+            txn,
+        )
+        .await?;
+    }
+
     Ok(())
 }
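
Reviewer note: since `processed_at` is the only field that changes when the row
already exists, the find-then-save in `save_tree_transaction` could be collapsed
into a single statement by letting the conflict clause perform the update. A
sketch under the same schema assumptions (not part of this patch):

```rust
// Single-statement variant: insert, or update processed_at on conflict.
let model = tree_transactions::ActiveModel {
    signature: Set(txn_id.to_string()),
    slot: Set(i64::try_from(slot)?),
    tree: Set(tree_id),
    processed_at: Set(Some(now)),
    ..Default::default()
};

tree_transactions::Entity::insert(model)
    .on_conflict(
        OnConflict::column(tree_transactions::Column::Signature)
            .update_columns([tree_transactions::Column::ProcessedAt])
            .to_owned(),
    )
    .exec(txn)
    .await?;
```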
diff --git a/nft_ingester/src/stream.rs b/nft_ingester/src/stream.rs
index 3b36397b3..e5db78d33 100644
--- a/nft_ingester/src/stream.rs
+++ b/nft_ingester/src/stream.rs
@@ -1,15 +1,13 @@
-
 use crate::{error::IngesterError, metric};
 use cadence_macros::{is_global_default_set, statsd_count, statsd_gauge};
-use log::{error};
+use log::error;
 use plerkle_messenger::{Messenger, MessengerConfig};
 use tokio::{
-    task::{JoinHandle},
+    task::JoinHandle,
     time::{self, Duration},
 };
 
-
 pub struct StreamSizeTimer {
     interval: tokio::time::Duration,
     messenger_config: MessengerConfig,
diff --git a/nft_ingester/src/transaction_notifications.rs b/nft_ingester/src/transaction_notifications.rs
index 6ca7cc5f4..c00cbec25 100644
--- a/nft_ingester/src/transaction_notifications.rs
+++ b/nft_ingester/src/transaction_notifications.rs
@@ -6,9 +6,7 @@ use crate::{
 use cadence_macros::{is_global_default_set, statsd_count, statsd_time};
 use chrono::Utc;
 use log::{debug, error};
-use plerkle_messenger::{
-    ConsumptionType, Messenger, MessengerConfig, RecvData,
-};
+use plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData};
 use plerkle_serialization::root_as_transaction_info;
 use sqlx::{Pool, Postgres};
 
@@ -69,7 +67,11 @@ pub fn transaction_worker(
     })
 }
 
-async fn handle_transaction(manager: Arc<ProgramTransformer>, item: RecvData, stream_key: &'static str) -> Option<String> {
+async fn handle_transaction(
+    manager: Arc<ProgramTransformer>,
+    item: RecvData,
+    stream_key: &'static str,
+) -> Option<String> {
     let mut ret_id = None;
     if item.tries > 0 {
         metric! {
diff --git a/tools/acc_forwarder/src/main.rs b/tools/acc_forwarder/src/main.rs
index 61349a8fb..cffb4e1ae 100644
--- a/tools/acc_forwarder/src/main.rs
+++ b/tools/acc_forwarder/src/main.rs
@@ -416,7 +416,11 @@ async fn send_account(
     let fbb = serialize_account(fbb, &account_info, slot, is_startup);
     let bytes = fbb.finished_data();
 
-    messenger.lock().await.send(ACCOUNT_BACKFILL_STREAM, bytes).await?;
+    messenger
+        .lock()
+        .await
+        .send(ACCOUNT_BACKFILL_STREAM, bytes)
+        .await?;
     info!("sent account {} to stream", pubkey);
 
     ACC_FORWARDER_SENT.inc();
diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs
index 4e477424a..c7fb37582 100644
--- a/tree_backfiller/src/backfiller.rs
+++ b/tree_backfiller/src/backfiller.rs
@@ -1,5 +1,5 @@
 use crate::db;
-use crate::tree;
+use crate::{queue, tree};
 use anyhow::Result;
 use clap::Parser;
 use digital_asset_types::dao::tree_transactions;
@@ -27,12 +27,19 @@ pub struct Args {
     #[arg(long, env, default_value = "1")]
     pub signature_channel_size: usize,
 
+    #[arg(long, env, default_value = "1")]
+    pub queue_channel_size: usize,
+
     #[arg(long, env, default_value = "3000")]
     pub transaction_check_timeout: u64,
 
     /// Database configuration
     #[clap(flatten)]
     pub database: db::PoolArgs,
+
+    /// Redis configuration
+    #[clap(flatten)]
+    pub queue: queue::QueueArgs,
 }
 
 /// A thread-safe counter.
@@ -88,16 +95,12 @@ impl Clone for Counter {
 /// The function waits for all threads to finish before returning.
 pub async fn run(config: Args) -> Result<()> {
     let solana_rpc = Arc::new(RpcClient::new(config.solana_rpc_url));
+    let sig_solana_rpc = Arc::clone(&solana_rpc);
 
     let conn = db::connect(config.database).await?;
 
-    let trees = tree::all(&solana_rpc).await?;
-
-    let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count));
-    let mut crawl_handlers = Vec::with_capacity(trees.len());
-
+    let (queue_sender, mut queue_receiver) = mpsc::channel::<Vec<u8>>(config.queue_channel_size);
     let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);
 
-    let sig_solana_rpc = Arc::clone(&solana_rpc);
     let transaction_worker_count = Counter::new();
     let transaction_worker_count_check = transaction_worker_count.clone();
@@ -108,37 +111,19 @@ pub async fn run(config: Args) -> Result<()> {
             Some(signature) = sig_receiver.recv() => {
                 let solana_rpc = Arc::clone(&sig_solana_rpc);
                 let transaction_worker_count_sig = transaction_worker_count.clone();
-                let transaction_worker_count_guard = transaction_worker_count.clone();
+                let queue_sender = queue_sender.clone();
 
                 transaction_worker_count_sig.increment();
 
                 let transaction_task = async move {
-                    match tree::transaction(solana_rpc, signature).await {
-                        Ok(builder) => {}
-                        Err(e) => println!("error retrieving transaction: {:?}", e),
+                    if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await {
+                        println!("error retrieving transaction: {:?}", e);
                     }
 
-                    transaction_worker_count_sig.decrement()
+                    transaction_worker_count_sig.decrement();
                 };
 
-                let guarded_task = AssertUnwindSafe(transaction_task).catch_unwind();
-
-                let timed_task = tokio::spawn(async move {
-                    timeout(Duration::from_millis(config.transaction_check_timeout), guarded_task).await
-                });
-
-                let _ = tokio::spawn(async move {
-                    match timed_task.await {
-                        Ok(Ok(_)) => {}
-                        Ok(Err(_)) => {
-                            println!("Task panicked");
-                            transaction_worker_count_guard.decrement()
-                        },
-                        Err(_) => {
-                            println!("Task timed out");
-                            transaction_worker_count_guard.decrement()
-                        }
-                    }});
-
+                tokio::spawn(transaction_task);
             },
             else => break,
         }
     }
@@ -147,6 +132,23 @@ pub async fn run(config: Args) -> Result<()> {
     Ok::<(), anyhow::Error>(())
     });
 
+    let queue_handler = tokio::spawn(async move {
+        let mut queue = queue::Queue::setup(config.queue).await?;
+
+        while let Some(data) = queue_receiver.recv().await {
+            if let Err(e) = queue.push(&data).await {
+                println!("Error pushing to queue: {:?}", e);
+            }
+        }
+
+        Ok::<(), anyhow::Error>(())
+    });
+
+    let trees = tree::all(&solana_rpc).await?;
+
+    let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count));
+    let mut crawl_handlers = Vec::with_capacity(trees.len());
+
     for tree in trees {
         let solana_rpc = Arc::clone(&solana_rpc);
         let semaphore = Arc::clone(&semaphore);
@@ -156,7 +158,7 @@ pub async fn run(config: Args) -> Result<()> {
         let crawl_handler = tokio::spawn(async move {
             let _permit = semaphore.acquire().await?;
 
-            if let Err(e) = tree::crawl(solana_rpc, sig_sender, &conn, tree).await {
+            if let Err(e) = tree::crawl(solana_rpc, sig_sender, conn, tree).await {
                 println!("error crawling tree: {:?}", e);
             }
 
@@ -168,6 +170,7 @@ pub async fn run(config: Args) -> Result<()> {
     futures::future::try_join_all(crawl_handlers).await?;
 
     transaction_worker_count_check.zero().await;
+    let _ = queue_handler.await?;
 
     Ok(())
 }
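
The `Counter` used by `run` is mostly elided by the hunks above; its presumed
shape (labeled presumed: the actual body is not shown in this patch) is a shared
atomic that workers increment/decrement and that `zero()` drains before exit:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

// Presumed reconstruction of the elided Counter, matching the manual Clone
// impl and the increment/decrement/zero calls visible in run().
pub struct Counter(Arc<AtomicUsize>);

impl Counter {
    pub fn new() -> Self {
        Self(Arc::new(AtomicUsize::new(0)))
    }
    pub fn increment(&self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
    pub fn decrement(&self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
    /// Resolves once every in-flight transaction worker has decremented.
    pub async fn zero(&self) {
        while self.0.load(Ordering::SeqCst) > 0 {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
}

impl Clone for Counter {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}
```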
diff --git a/tree_backfiller/src/db.rs b/tree_backfiller/src/db.rs
index 9490cfed6..33af2047a 100644
--- a/tree_backfiller/src/db.rs
+++ b/tree_backfiller/src/db.rs
@@ -12,6 +12,15 @@ pub struct PoolArgs {
     pub database_min_connections: u32,
 }
 
+/// Establishes a connection to the database using the provided configuration.
+///
+/// # Arguments
+///
+/// * `config` - A `PoolArgs` struct containing the database URL and the minimum and maximum number of connections.
+///
+/// # Returns
+///
+/// * `Result<DatabaseConnection, DbErr>` - On success, returns a `DatabaseConnection`. On failure, returns a `DbErr`.
 pub async fn connect(config: PoolArgs) -> Result<DatabaseConnection, DbErr> {
     let mut options = ConnectOptions::new(config.database_url);
diff --git a/tree_backfiller/src/main.rs b/tree_backfiller/src/main.rs
index 7e8cf520f..207b1e5c4 100644
--- a/tree_backfiller/src/main.rs
+++ b/tree_backfiller/src/main.rs
@@ -1,10 +1,10 @@
 mod backfiller;
 mod db;
+mod queue;
 mod tree;
 
 use anyhow::Result;
 use clap::{Parser, Subcommand};
-use log::{debug, error, info};
 
 #[derive(Debug, Parser)]
 #[clap(author, version)]
diff --git a/tree_backfiller/src/queue.rs b/tree_backfiller/src/queue.rs
new file mode 100644
index 000000000..215115974
--- /dev/null
+++ b/tree_backfiller/src/queue.rs
@@ -0,0 +1,63 @@
+use anyhow::Result;
+use clap::Parser;
+use figment::value::{Dict, Value};
+use plerkle_messenger::{
+    redis_messenger::RedisMessenger, Messenger, MessengerConfig, MessengerError, MessengerType,
+};
+
+const TRANSACTION_BACKFILL_STREAM: &'static str = "TXNFILL";
+
+#[derive(Clone, Debug, Parser)]
+pub struct QueueArgs {
+    #[arg(long, env)]
+    pub messenger_redis_url: String,
+    #[arg(long, env, default_value = "100")]
+    pub messenger_redis_batch_size: String,
+    #[arg(long, env, default_value = "10000000")]
+    pub messenger_stream_max_buffer_size: usize,
+}
+
+impl From<QueueArgs> for MessengerConfig {
+    fn from(args: QueueArgs) -> Self {
+        let mut connection_config = Dict::new();
+
+        connection_config.insert(
+            "redis_connection_str".to_string(),
+            Value::from(args.messenger_redis_url),
+        );
+
+        connection_config.insert(
+            "batch_size".to_string(),
+            Value::from(args.messenger_redis_batch_size),
+        );
+
+        Self {
+            messenger_type: MessengerType::Redis,
+            connection_config,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct Queue(RedisMessenger);
+
+impl Queue {
+    pub async fn setup(config: QueueArgs) -> Result<Self> {
+        let mut messenger = RedisMessenger::new(config.clone().into()).await?;
+
+        messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await?;
+
+        messenger
+            .set_buffer_size(
+                TRANSACTION_BACKFILL_STREAM,
+                config.messenger_stream_max_buffer_size,
+            )
+            .await;
+
+        Ok(Self(messenger))
+    }
+
+    pub async fn push(&mut self, message: &[u8]) -> Result<(), MessengerError> {
+        self.0.send(TRANSACTION_BACKFILL_STREAM, message).await
+    }
+}
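
Usage sketch for the new module (values are illustrative; the API is exactly
what queue.rs defines above): `QueueArgs` carries the Redis settings and
`Queue::push` writes one serialized transaction to the TXNFILL stream.

```rust
// Inside an async context, with crate paths as in tree_backfiller:
let args = QueueArgs {
    messenger_redis_url: "redis://localhost:6379".to_string(),
    messenger_redis_batch_size: "100".to_string(),
    messenger_stream_max_buffer_size: 10_000_000,
};

let mut queue = Queue::setup(args).await?;
queue.push(b"serialized transaction flatbuffer").await?;
```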
diff --git a/tree_backfiller/src/tree.rs b/tree_backfiller/src/tree.rs
index cf2e0ed37..d0bf32e7c 100644
--- a/tree_backfiller/src/tree.rs
+++ b/tree_backfiller/src/tree.rs
@@ -1,12 +1,14 @@
+use crate::queue::Queue;
 use anyhow::Result;
 use borsh::BorshDeserialize;
 use clap::Args;
 use digital_asset_types::dao::tree_transactions;
 use flatbuffers::FlatBufferBuilder;
-use log::debug;
-use plerkle_messenger::{Messenger, TRANSACTION_BACKFILL_STREAM};
 use plerkle_serialization::serializer::seralize_encoded_transaction_with_status;
-use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder};
+use sea_orm::{
+    sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait,
+    QueryFilter, QueryOrder,
+};
 use solana_account_decoder::UiAccountEncoding;
 use solana_client::{
     nonblocking::rpc_client::RpcClient,
@@ -18,10 +20,8 @@ use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
 use spl_account_compression::id;
 use spl_account_compression::state::{
-    merkle_tree_get_size, ConcurrentMerkleTreeHeader, ConcurrentMerkleTreeHeaderDataV1,
-    CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1,
+    merkle_tree_get_size, ConcurrentMerkleTreeHeader, CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1,
 };
-use sqlx::{Pool, Postgres};
 use std::str::FromStr;
 use std::sync::Arc;
 use thiserror::Error as ThisError;
@@ -44,6 +44,10 @@ pub enum TreeErrorKind {
     Achor(#[from] anchor_client::anchor_lang::error::Error),
     #[error("perkle serialize")]
     PerkleSerialize(#[from] plerkle_serialization::error::PlerkleSerializationError),
+    #[error("perkle messenger")]
+    PlerkleMessenger(#[from] plerkle_messenger::MessengerError),
+    #[error("queue send")]
+    QueueSend(#[from] tokio::sync::mpsc::error::SendError<Vec<u8>>),
 }
 
 #[derive(Debug, Clone)]
 pub struct TreeHeaderResponse {
@@ -115,8 +119,8 @@ pub async fn all(client: &Arc<RpcClient>) -> Result<Vec<TreeResponse>, TreeErrorKind>
 pub async fn crawl(
     client: Arc<RpcClient>,
-    sig_sender: Sender<Signature>,
-    conn: &DatabaseConnection,
+    sender: Sender<Signature>,
+    conn: DatabaseConnection,
     tree: TreeResponse,
 ) -> Result<()> {
     let mut before = None;
@@ -124,7 +128,7 @@ pub async fn crawl(
     let until = tree_transactions::Entity::find()
         .filter(tree_transactions::Column::Tree.eq(tree.pubkey.as_ref()))
         .order_by_desc(tree_transactions::Column::Slot)
-        .one(conn)
+        .one(&conn)
        .await?
        .map(|t| Signature::from_str(&t.signature).ok())
        .flatten();
@@ -142,10 +146,26 @@ pub async fn crawl(
             .await?;
 
         for sig in sigs.iter() {
+            let slot = i64::try_from(sig.slot)?;
             let sig = Signature::from_str(&sig.signature)?;
 
-            println!("sig: {}", sig);
-
-            sig_sender.send(sig.clone()).await?;
+            let tree_transaction = tree_transactions::ActiveModel {
+                signature: Set(sig.to_string()),
+                tree: Set(tree.pubkey.as_ref().to_vec()),
+                slot: Set(slot),
+                ..Default::default()
+            };
+
+            tree_transactions::Entity::insert(tree_transaction)
+                .on_conflict(
+                    OnConflict::column(tree_transactions::Column::Signature)
+                        .do_nothing()
+                        .to_owned(),
+                )
+                .exec(&conn)
+                .await?;
+
+            sender.send(sig.clone()).await?;
 
             before = Some(sig);
         }
@@ -160,8 +180,9 @@ pub async fn crawl(
 pub async fn transaction<'a>(
     client: Arc<RpcClient>,
+    sender: Sender<Vec<u8>>,
     signature: Signature,
-) -> Result<FlatBufferBuilder<'a>, TreeErrorKind> {
+) -> Result<(), TreeErrorKind> {
     let transaction = client
         .get_transaction_with_config(
             &signature,
@@ -173,8 +194,9 @@ pub async fn transaction<'a>(
         )
         .await?;
 
-    Ok(seralize_encoded_transaction_with_status(
-        FlatBufferBuilder::new(),
-        transaction,
-    )?)
+    let message = seralize_encoded_transaction_with_status(FlatBufferBuilder::new(), transaction)?;
+
+    sender.send(message.finished_data().to_vec()).await?;
+
+    Ok(())
 }
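
On the consuming side, the ingester's transaction worker drains the same TXNFILL
stream via `TRANSACTION_BACKFILL_STREAM` (wired up in nft_ingester/src/main.rs
above). A hedged sketch of that loop, assuming the `plerkle_messenger` API as
used elsewhere in this patch:

```rust
use plerkle_messenger::{
    redis_messenger::RedisMessenger, ConsumptionType, Messenger, MessengerConfig,
    TRANSACTION_BACKFILL_STREAM,
};

async fn drain_backfill_stream(config: MessengerConfig) -> anyhow::Result<()> {
    let mut messenger = RedisMessenger::new(config).await?;
    messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await?;

    loop {
        // Each item's data is one serialized transaction flatbuffer pushed by
        // Queue::push; the ingester feeds it to the bubblegum transformer,
        // which ultimately calls save_tree_transaction to stamp processed_at.
        for item in messenger
            .recv(TRANSACTION_BACKFILL_STREAM, ConsumptionType::All)
            .await?
        {
            let _payload: &[u8] = &item.data;
        }
    }
}
```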