From 1afee4559c6f481995b8d9ccef85e2ab7b27a0b7 Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Mon, 7 Oct 2024 20:52:33 +0200 Subject: [PATCH] Remove enforcing seq in bubblegum backfill --- backfill/src/worker/program_transformer.rs | 155 ++++----------------- 1 file changed, 30 insertions(+), 125 deletions(-) diff --git a/backfill/src/worker/program_transformer.rs b/backfill/src/worker/program_transformer.rs index 5890e67d3..6e7c20646 100644 --- a/backfill/src/worker/program_transformer.rs +++ b/backfill/src/worker/program_transformer.rs @@ -1,15 +1,11 @@ use anyhow::Result; -use blockbuster::{ - instruction::InstructionBundle, program_handler::ProgramParser, - programs::bubblegum::BubblegumParser, programs::ProgramParseResult, -}; use clap::Parser; use das_core::{create_download_metadata_notifier, DownloadMetadataInfo}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use log::error; -use program_transformers::{ - bubblegum::handle_bubblegum_instruction, ProgramTransformer, TransactionInfo, -}; -use sea_orm::SqlxPostgresConnector; +use program_transformers::{ProgramTransformer, TransactionInfo}; +use std::sync::Arc; use tokio::sync::mpsc::{channel, Sender, UnboundedSender}; use tokio::task::JoinHandle; @@ -19,6 +15,8 @@ use crate::BubblegumBackfillContext; pub struct ProgramTransformerWorkerArgs { #[arg(long, env, default_value = "100000")] pub program_transformer_channel_size: usize, + #[arg(long, env, default_value = "50")] + pub program_transformer_worker_count: usize, } impl ProgramTransformerWorkerArgs { @@ -32,132 +30,39 @@ impl ProgramTransformerWorkerArgs { let worker_forwarder = forwarder.clone(); let worker_pool = context.database_pool.clone(); + let worker_count = self.program_transformer_worker_count; let handle = tokio::spawn(async move { - let mut transactions = Vec::new(); - let download_metadata_notifier = create_download_metadata_notifier(worker_forwarder.clone()).await; - let program_transformer = - ProgramTransformer::new(worker_pool.clone(), download_metadata_notifier); - - while let Some(transaction) = receiver.recv().await { - transactions.push(transaction); - } - - let mut instructions = transactions - .iter() - .flat_map(|tx_info| { - let ordered_instructions = program_transformer.break_transaction(tx_info); - ordered_instructions.into_iter().map(|(ix_pair, inner_ix)| { - ( - tx_info.signature.to_string(), - ix_pair.0, - ix_pair.1, - inner_ix, - ix_pair - .1 - .accounts - .iter() - .map(|&i| tx_info.account_keys[i as usize]) - .collect::>(), - tx_info.slot, - ) - }) - }) - .collect::>(); - instructions.sort_by(|a, b| { - let a_tree_update_seq = if let Some(program_parser) = - program_transformer.match_program(&a.1) - { - if let Ok(result) = program_parser.handle_instruction(&InstructionBundle { - txn_id: &a.0, - program: a.1, - instruction: Some(a.2), - inner_ix: a.3.as_deref(), - keys: a.4.as_slice(), - slot: a.5, - }) { - if let ProgramParseResult::Bubblegum(parsing_result) = result.result_type() - { - parsing_result - .tree_update - .as_ref() - .map_or(u64::MAX, |event| event.seq) - } else { - u64::MAX - } - } else { - u64::MAX - } - } else { - u64::MAX - }; - - let b_tree_update_seq = if let Some(program_parser) = - program_transformer.match_program(&b.1) - { - if let Ok(result) = program_parser.handle_instruction(&InstructionBundle { - txn_id: &b.0, - program: b.1, - instruction: Some(b.2), - inner_ix: b.3.as_deref(), - keys: b.4.as_slice(), - slot: b.5, - }) { - if let ProgramParseResult::Bubblegum(parsing_result) = result.result_type() - { - parsing_result - .tree_update - .as_ref() - .map_or(u64::MAX, |event| event.seq) - } else { - u64::MAX - } - } else { - u64::MAX - } - } else { - u64::MAX - }; + let program_transformer = Arc::new(ProgramTransformer::new( + worker_pool.clone(), + download_metadata_notifier, + )); - a_tree_update_seq.cmp(&b_tree_update_seq) - }); + let mut handlers = FuturesUnordered::new(); - let parser = BubblegumParser {}; - - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(worker_pool); - - for i in instructions { - let bundle = &InstructionBundle { - txn_id: &i.0, - program: i.1, - instruction: Some(i.2), - inner_ix: i.3.as_deref(), - keys: i.4.as_slice(), - slot: i.5, - }; - if let Ok(result) = parser.handle_instruction(bundle) { - if let ProgramParseResult::Bubblegum(parsing_result) = result.result_type() { - let download_metadata_notifier = - create_download_metadata_notifier(worker_forwarder.clone()).await; + while let Some(transaction) = receiver.recv().await { + if handlers.len() >= worker_count { + handlers.next().await; + } - if let Err(err) = handle_bubblegum_instruction( - parsing_result, - bundle, - &conn, - &download_metadata_notifier, - ) + let program_transformer_clone = Arc::clone(&program_transformer); + let handle = tokio::spawn(async move { + if let Err(err) = program_transformer_clone + .handle_transaction(&transaction) .await - { - error!( - "Failed to handle bubblegum instruction for txn {:?}: {:?}", - bundle.txn_id, err - ); - break; - } + { + error!( + "Failed to handle bubblegum instruction for txn {:?}: {:?}", + transaction.signature, err + ); } - } + }); + + handlers.push(handle); } + + futures::future::join_all(handlers).await; }); Ok((handle, sender))