feat(backfiller): push transaction payloads to redis through the plerkle messenger. mark tree transactions as processed_at so we know it completed the index loop.
kespinola committed Dec 12, 2023
1 parent 7725d5b commit c311b11
Showing 19 changed files with 273 additions and 114 deletions.
4 changes: 2 additions & 2 deletions digital_asset_types/src/dao/generated/cl_audits.rs
@@ -92,6 +92,6 @@ impl From<crate::dao::cl_items::ActiveModel> for ActiveModel {
seq: item.seq,
leaf_idx: item.leaf_idx,
..Default::default()
-        }
+        };
}
}
}
2 changes: 1 addition & 1 deletion digital_asset_types/src/dao/generated/tree_transactions.rs
@@ -49,7 +49,7 @@ impl ColumnTrait for Column
type EntityName = Entity;
fn def(&self) -> ColumnDef {
match self {
-            Self::Signature => ColumnType::Char(Some(64u32)).def(),
+            Self::Signature => ColumnType::Char(Some(88u32)).def(),
Self::Tree => ColumnType::Binary.def(),
Self::Slot => ColumnType::BigInteger.def(),
Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(),
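For context on the wider column: a Solana transaction signature is 64 raw bytes, and base58-encoding 64 bytes yields at most 88 characters (the migration below uses the same width). A quick sketch of that bound, assuming the `bs58` crate; illustration only, not part of this commit:

```rust
// Sketch: maximum base58 length of a 64-byte Solana signature.
// Assumes the `bs58` crate; not part of this diff.
fn main() {
    let max_sig = [0xffu8; 64]; // largest possible 64-byte value
    let encoded = bs58::encode(max_sig).into_string();
    assert_eq!(encoded.len(), 88); // motivates CHAR(88) for the signature column
}
```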
14 changes: 11 additions & 3 deletions digital_asset_types/src/dao/scopes/asset.rs
@@ -414,7 +414,8 @@ pub async fn get_signatures_for_asset(
) -> Result<Vec<(String, Option<String>)>, DbErr> {
// if tree_id and leaf_idx are provided, use them directly to fetch transactions
if let (Some(tree_id), Some(leaf_idx)) = (tree_id, leaf_idx) {
-        let transactions = fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?;
+        let transactions =
+            fetch_transactions(conn, tree_id, leaf_idx, sort_direction, pagination, limit).await?;
return Ok(transactions);
}

@@ -442,7 +443,8 @@ pub async fn get_signatures_for_asset(
let leaf_id = asset
.nonce
.ok_or(DbErr::RecordNotFound("Leaf ID does not exist".to_string()))?;
-        let transactions = fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?;
+        let transactions =
+            fetch_transactions(conn, tree, leaf_id, sort_direction, pagination, limit).await?;
Ok(transactions)
} else {
Ok(Vec::new())
@@ -461,7 +463,13 @@ pub async fn fetch_transactions(
stmt = stmt.filter(cl_audits::Column::LeafIdx.eq(leaf_id));
stmt = stmt.order_by(cl_audits::Column::CreatedAt, sea_orm::Order::Desc);

-    stmt = paginate(pagination, limit, stmt, sort_direction, cl_audits::Column::Id);
+    stmt = paginate(
+        pagination,
+        limit,
+        stmt,
+        sort_direction,
+        cl_audits::Column::Id,
+    );
let transactions = stmt.all(conn).await?;
let transaction_list: Vec<(String, Option<String>)> = transactions
.into_iter()
@@ -13,7 +13,7 @@ impl MigrationTrait for Migration {
.if_not_exists()
.col(
ColumnDef::new(TreeTransactions::Signature)
-                        .char_len(64)
+                        .char_len(88)
.not_null()
.primary_key(),
)
6 changes: 5 additions & 1 deletion nft_ingester/src/account_updates.rs
@@ -65,7 +65,11 @@ pub fn account_worker<T: Messenger>(
})
}

-async fn handle_account(manager: Arc<ProgramTransformer>, item: RecvData, stream_key: &'static str) -> Option<String> {
+async fn handle_account(
+    manager: Arc<ProgramTransformer>,
+    item: RecvData,
+    stream_key: &'static str,
+) -> Option<String> {
let id = item.id;
let mut ret_id = None;
let data = item.data;
16 changes: 10 additions & 6 deletions nft_ingester/src/database.rs
@@ -1,8 +1,9 @@
-use sqlx::{postgres::{PgPoolOptions, PgConnectOptions}, PgPool, ConnectOptions};
-
-use crate::{
-    config::{IngesterConfig, IngesterRole},
+use sqlx::{
+    postgres::{PgConnectOptions, PgPoolOptions},
+    ConnectOptions, PgPool,
+};
+
+use crate::config::{IngesterConfig, IngesterRole};
const BARE_MINIMUM_CONNECTIONS: u32 = 5;
const DEFAULT_MAX: u32 = 125;
pub async fn setup_database(config: IngesterConfig) -> PgPool {
@@ -19,8 +20,11 @@ pub async fn setup_database(config: IngesterConfig) -> PgPool {
let mut options: PgConnectOptions = url.parse().unwrap();
options.log_statements(log::LevelFilter::Trace);

-    options.log_slow_statements(log::LevelFilter::Debug, std::time::Duration::from_millis(500));
+    options.log_slow_statements(
+        log::LevelFilter::Debug,
+        std::time::Duration::from_millis(500),
+    );

let pool = PgPoolOptions::new()
.min_connections(BARE_MINIMUM_CONNECTIONS)
.max_connections(max)
4 changes: 4 additions & 0 deletions nft_ingester/src/error/mod.rs
@@ -52,6 +52,10 @@ pub enum IngesterError {
HttpError { status_code: String },
#[error("AssetIndex Error {0}")]
AssetIndexError(String),
#[error("TryFromInt Error {0}")]
TryFromInt(#[from] std::num::TryFromIntError),
#[error("Chrono FixedOffset Error")]
ChronoFixedOffset,
}

impl From<reqwest::Error> for IngesterError {
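The `#[from]` attribute (via `thiserror`) is what lets code below write `i64::try_from(slot)?` inside a function returning `IngesterError`. A minimal sketch with a hypothetical helper, not part of this diff:

```rust
// Sketch: `?` converts std::num::TryFromIntError into IngesterError::TryFromInt
// via the #[from] attribute added above. Hypothetical helper function.
fn slot_as_signed(slot: u64) -> Result<i64, IngesterError> {
    Ok(i64::try_from(slot)?)
}
```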
4 changes: 2 additions & 2 deletions nft_ingester/src/main.rs
@@ -27,7 +27,8 @@ use chrono::Duration;
use clap::{arg, command, value_parser};
use log::{error, info};
use plerkle_messenger::{
-    redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM
+    redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_BACKFILL_STREAM, ACCOUNT_STREAM,
+    TRANSACTION_BACKFILL_STREAM, TRANSACTION_STREAM,
};
use std::{path::PathBuf, time};
use tokio::{signal, task::JoinSet};
@@ -118,7 +119,6 @@ pub async fn main() -> Result<(), IngesterError> {
TRANSACTION_BACKFILL_STREAM,
)?;

-
if let Some(t) = timer_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
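For reference, pushing a serialized payload onto the new backfill stream through the plerkle messenger looks roughly like the sketch below; the setup and error handling are assumptions, and the ingester itself wires this through its timer and worker tasks rather than calling it directly:

```rust
// Sketch: sending a serialized transaction payload to the backfill stream.
// Setup details are assumptions; not part of this diff.
use plerkle_messenger::{
    redis_messenger::RedisMessenger, Messenger, MessengerConfig, TRANSACTION_BACKFILL_STREAM,
};

async fn push_backfill(
    bytes: &[u8],
    config: MessengerConfig,
) -> Result<(), plerkle_messenger::MessengerError> {
    let mut messenger = RedisMessenger::new(config).await?;
    messenger.send(TRANSACTION_BACKFILL_STREAM, bytes).await?;
    Ok(())
}
```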
9 changes: 2 additions & 7 deletions nft_ingester/src/metrics.rs
@@ -5,10 +5,7 @@ use cadence_macros::{is_global_default_set, set_global_default, statsd_count, st
use log::{error, warn};
use tokio::time::Instant;

-use crate::{
-    config::IngesterConfig,
-    error::IngesterError,
-};
+use crate::{config::IngesterConfig, error::IngesterError};

#[macro_export]
macro_rules! metric {
@@ -32,9 +29,7 @@ pub fn setup_metrics(config: &IngesterConfig) {
let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap();
let queuing_sink = QueuingMetricSink::from(udp_sink);
let builder = StatsdClient::builder("das_ingester", queuing_sink);
-        let client = builder
-            .with_tag("env", env)
-            .build();
+        let client = builder.with_tag("env", env).build();
set_global_default(client);
}
}
99 changes: 65 additions & 34 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
@@ -1,15 +1,76 @@
use crate::error::IngesterError;
use digital_asset_types::dao::{
-    asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items,
+    asset, asset_creators, asset_grouping, cl_audits, cl_items, tree_transactions,
};
-use log::{debug, info, error};
+use log::{debug, error, info};
use mpl_bubblegum::types::Collection;
use sea_orm::{
query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait,
};
use spl_account_compression::events::ChangeLogEventV1;

use std::convert::From;
+use sea_orm::ActiveModelTrait;

+/// Mark tree transaction as processed. If the transaction already exists, update the `processed_at` field.
+///
+/// This function takes in a tree ID, slot, transaction ID, and a transaction object.
+/// It first checks if a tree transaction with the given transaction ID already exists.
+/// If it does, it updates the `processed_at` field of the existing tree transaction with the current time.
+/// If it doesn't, it creates a new tree transaction with the provided parameters and saves it.
+///
+/// # Arguments
+///
+/// * `tree_id` - A vector of bytes representing the ID of the tree.
+/// * `slot` - A 64-bit unsigned integer representing the slot.
+/// * `txn_id` - A string slice representing the transaction ID.
+/// * `txn` - A reference to a transaction object.
+///
+/// # Returns
+///
+/// This function returns a `Result` that contains an empty tuple, or an `IngesterError` if the operation fails.
+pub async fn save_tree_transaction<'c, T>(
+    tree_id: Vec<u8>,
+    slot: u64,
+    txn_id: &str,
+    txn: &T,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    let now = chrono::Utc::now()
+        .with_timezone(&chrono::FixedOffset::east_opt(0).ok_or(IngesterError::ChronoFixedOffset)?);
+
+    let tree_transaction = tree_transactions::Entity::find()
+        .filter(tree_transactions::Column::Signature.eq(txn_id))
+        .one(txn)
+        .await?;
+
+    if let Some(tree_transaction) = tree_transaction {
+        let mut tree_transaction: tree_transactions::ActiveModel = tree_transaction.into();
+
+        tree_transaction.processed_at = Set(Some(now));
+
+        tree_transaction.save(txn).await?;
+    } else {
+        let tree_transaction = tree_transactions::ActiveModel {
+            signature: Set(txn_id.to_string()),
+            slot: Set(i64::try_from(slot)?),
+            tree: Set(tree_id.to_vec()),
+            processed_at: Set(Some(now)),
+            ..Default::default()
+        };
+
+        tree_transactions::Entity::insert(tree_transaction)
+            .on_conflict(
+                OnConflict::column(tree_transactions::Column::Signature)
+                    .do_nothing()
+                    .to_owned(),
+            )
+            .exec(txn)
+            .await?;
+    }
+    Ok(())
+}
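A minimal usage sketch for callers; the connection and literal values are placeholders, not part of this diff:

```rust
// Sketch: calling save_tree_transaction inside a sea_orm transaction.
// `db` and the literal values below are placeholders.
use sea_orm::{DatabaseConnection, TransactionTrait};

async fn example(db: &DatabaseConnection) -> Result<(), IngesterError> {
    let tree_id = vec![0u8; 32]; // placeholder tree pubkey bytes
    let txn = db.begin().await?;
    save_tree_transaction(tree_id, 231_000_000, "placeholder-signature", &txn).await?;
    txn.commit().await?;
    Ok(())
}
```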

pub async fn save_changelog_event<'c, T>(
change_log_event: &ChangeLogEventV1,
@@ -44,6 +105,7 @@ where
let mut i: i64 = 0;
let depth = change_log_event.path.len() - 1;
let tree_id = change_log_event.id.as_ref();
+
for p in change_log_event.path.iter() {
let node_idx = p.index as i64;
info!(
@@ -103,37 +165,6 @@ where
}
}

-    // If and only if the entire path of nodes was inserted into the `cl_items` table, then insert
-    // a single row into the `backfill_items` table. This way if an incomplete path was inserted
-    // into `cl_items` due to an error, a gap will be created for the tree and the backfiller will
-    // fix it.
-    if i - 1 == depth as i64 {
-        // See if the tree already exists in the `backfill_items` table.
-        let rows = backfill_items::Entity::find()
-            .filter(backfill_items::Column::Tree.eq(tree_id))
-            .limit(1)
-            .all(txn)
-            .await?;
-
-        // If the tree does not exist in `backfill_items` and the sequence number is greater than 1,
-        // then we know we will need to backfill the tree from sequence number 1 up to the current
-        // sequence number. So in this case we set at flag to force checking the tree.
-        let force_chk = rows.is_empty() && change_log_event.seq > 1;
-
-        info!("Adding to backfill_items table at level {}", i - 1);
-        let item = backfill_items::ActiveModel {
-            tree: Set(tree_id.to_vec()),
-            seq: Set(change_log_event.seq as i64),
-            slot: Set(slot as i64),
-            force_chk: Set(force_chk),
-            backfilled: Set(false),
-            failed: Set(false),
-            ..Default::default()
-        };
-
-        backfill_items::Entity::insert(item).exec(txn).await?;
-    }

Ok(())
//TODO -> set maximum size of path and break into multiple statements
}
12 changes: 12 additions & 0 deletions nft_ingester/src/program_transformers/bubblegum/mod.rs
@@ -96,6 +96,18 @@ where
}
_ => debug!("Bubblegum: Not Implemented Instruction"),
}
+
+    // TODO: assuming tree update available on all transactions but need to confirm.
+    if let Some(tree_update) = &parsing_result.tree_update {
+        save_tree_transaction(
+            tree_update.id.to_bytes().to_vec(),
+            bundle.slot,
+            bundle.txn_id,
+            txn,
+        )
+        .await?;
+    }
+
Ok(())
}

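Because `processed_at` marks a transaction as having completed the index loop, a backfiller can treat rows where it is still NULL as unfinished work. A sketch of such a query; the shape is assumed, not part of this commit:

```rust
// Sketch: find tree transactions that never completed the index loop.
// Entity and column names are taken from the diff above; query shape assumed.
use digital_asset_types::dao::tree_transactions;
use sea_orm::{ColumnTrait, DatabaseConnection, DbErr, EntityTrait, QueryFilter};

async fn find_unprocessed(
    db: &DatabaseConnection,
) -> Result<Vec<tree_transactions::Model>, DbErr> {
    tree_transactions::Entity::find()
        .filter(tree_transactions::Column::ProcessedAt.is_null())
        .all(db)
        .await
}
```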
6 changes: 2 additions & 4 deletions nft_ingester/src/stream.rs
@@ -1,15 +1,13 @@
-
use crate::{error::IngesterError, metric};
use cadence_macros::{is_global_default_set, statsd_count, statsd_gauge};

-use log::{error};
+use log::error;
use plerkle_messenger::{Messenger, MessengerConfig};
use tokio::{
-    task::{JoinHandle},
+    task::JoinHandle,
time::{self, Duration},
};

-
pub struct StreamSizeTimer {
interval: tokio::time::Duration,
messenger_config: MessengerConfig,
Expand Down
10 changes: 6 additions & 4 deletions nft_ingester/src/transaction_notifications.rs
@@ -6,9 +6,7 @@ use crate::{
use cadence_macros::{is_global_default_set, statsd_count, statsd_time};
use chrono::Utc;
use log::{debug, error};
-use plerkle_messenger::{
-    ConsumptionType, Messenger, MessengerConfig, RecvData,
-};
+use plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData};
use plerkle_serialization::root_as_transaction_info;

use sqlx::{Pool, Postgres};
@@ -69,7 +67,11 @@ pub fn transaction_worker<T: Messenger>(
})
}

-async fn handle_transaction(manager: Arc<ProgramTransformer>, item: RecvData, stream_key: &'static str) -> Option<String> {
+async fn handle_transaction(
+    manager: Arc<ProgramTransformer>,
+    item: RecvData,
+    stream_key: &'static str,
+) -> Option<String> {
let mut ret_id = None;
if item.tries > 0 {
metric! {
6 changes: 5 additions & 1 deletion tools/acc_forwarder/src/main.rs
@@ -416,7 +416,11 @@ async fn send_account(
let fbb = serialize_account(fbb, &account_info, slot, is_startup);
let bytes = fbb.finished_data();

-    messenger.lock().await.send(ACCOUNT_BACKFILL_STREAM, bytes).await?;
+    messenger
+        .lock()
+        .await
+        .send(ACCOUNT_BACKFILL_STREAM, bytes)
+        .await?;
info!("sent account {} to stream", pubkey);
ACC_FORWARDER_SENT.inc();
