diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 6e11fb539..4f281fcc1 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -27,15 +27,25 @@ pub struct IngesterConfig { pub backfiller_trees: Option>, pub role: Option, pub max_postgres_connections: Option, - pub account_stream_worker_count: Option, - pub account_backfill_stream_worker_count: Option, - pub transaction_stream_worker_count: Option, - pub transaction_backfill_stream_worker_count: Option, + pub worker_config: Option>, pub code_version: Option<&'static str>, pub background_task_runner_config: Option, pub cl_audits: Option, // save transaction logs for compressed nfts } +#[derive(Deserialize, PartialEq, Debug, Clone)] +pub struct WorkerConfig { + pub stream_name: String, + pub worker_type: WorkerType, + pub worker_count: u32, +} + +#[derive(Deserialize, PartialEq, Debug, Clone)] +pub enum WorkerType { + Account, + Transaction, +} + impl IngesterConfig { /// Get the db url out of the dict, this is built a a dict so that future extra db parameters can be easily shoved in. /// this panics if the key is not present @@ -66,20 +76,31 @@ impl IngesterConfig { mc } - pub fn get_account_stream_worker_count(&self) -> u32 { - self.account_stream_worker_count.unwrap_or(2) + pub fn get_worker_config(&self) -> Vec { + return if let Some(wc) = &self.worker_config { + wc.to_vec() + } else { + vec![ + WorkerConfig { + stream_name: "ACC".to_string(), + worker_count: 2, + worker_type: WorkerType::Account, + }, + WorkerConfig { + stream_name: "TXN".to_string(), + worker_count: 2, + worker_type: WorkerType::Transaction, + }, + ] + }; } - pub fn get_account_backfill_stream_worker_count(&self) -> u32 { - self.account_backfill_stream_worker_count.unwrap_or(0) - } - - pub fn get_transaction_stream_worker_count(&self) -> u32 { - self.transaction_stream_worker_count.unwrap_or(2) - } - - pub fn get_transaction_backfill_stream_worker_count(&self) -> u32 { - self.transaction_backfill_stream_worker_count.unwrap_or(0) + pub fn get_worker_count(&self) -> u32 { + let mut count = 0; + for wc in self.get_worker_config() { + count += wc.worker_count; + } + count } } diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index 3d08cafb1..f68b31d72 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -14,7 +14,7 @@ use crate::{ account_updates::account_worker, ack::ack_worker, backfiller::setup_backfiller, - config::{init_logger, rand_string, setup_config, IngesterRole}, + config::{init_logger, rand_string, setup_config, IngesterRole, WorkerType}, database::setup_database, error::IngesterError, metrics::setup_metrics, @@ -26,9 +26,7 @@ use cadence_macros::{is_global_default_set, statsd_count}; use chrono::Duration; use clap::{arg, command, value_parser}; use log::{error, info}; -use plerkle_messenger::{ - redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM -}; +use plerkle_messenger::{redis_messenger::RedisMessenger, ConsumptionType}; use std::{path::PathBuf, time}; use tokio::{signal, task::JoinSet}; @@ -97,102 +95,58 @@ pub async fn main() -> Result<(), IngesterError> { if role != IngesterRole::BackgroundTaskRunner { tasks.spawn(bg_task_listener); } - let mut timer_acc = StreamSizeTimer::new( - stream_metrics_timer, - config.messenger_config.clone(), - ACCOUNT_STREAM, - )?; - let mut timer_backfiller_acc = StreamSizeTimer::new( - stream_metrics_timer, - config.messenger_config.clone(), - ACCOUNT_BACKFILL_STREAM, - )?; - let mut timer_txn = StreamSizeTimer::new( - stream_metrics_timer, - config.messenger_config.clone(), - TRANSACTION_STREAM, - )?; - let mut timer_backfiller_txn = StreamSizeTimer::new( - stream_metrics_timer, - config.messenger_config.clone(), - TRANSACTION_BACKFILL_STREAM, - )?; - - - if let Some(t) = timer_acc.start::().await { - tasks.spawn(t); - } - if let Some(t) = timer_backfiller_acc.start::().await { - tasks.spawn(t); - } - if let Some(t) = timer_txn.start::().await { - tasks.spawn(t); - } - if let Some(t) = timer_backfiller_txn.start::().await { - tasks.spawn(t); - } // Stream Consumers Setup ------------------------------------- if role == IngesterRole::Ingester || role == IngesterRole::All { + let workers = config.get_worker_config().clone(); + let (_ack_task, ack_sender) = ack_worker::(config.get_messneger_client_config()); - for i in 0..config.get_account_stream_worker_count() { - let _account = account_worker::( - database_pool.clone(), - config.get_messneger_client_config(), - bg_task_sender.clone(), - ack_sender.clone(), - if i == 0 { - ConsumptionType::Redeliver - } else { - ConsumptionType::New - }, - ACCOUNT_STREAM, - ); - } - for i in 0..config.get_account_backfill_stream_worker_count() { - let _account_backfill = account_worker::( - database_pool.clone(), - config.get_messneger_client_config(), - bg_task_sender.clone(), - ack_sender.clone(), - if i == 0 { - ConsumptionType::Redeliver - } else { - ConsumptionType::New - }, - ACCOUNT_BACKFILL_STREAM, - ); - } - for i in 0..config.get_transaction_stream_worker_count() { - let _txn = transaction_worker::( - database_pool.clone(), - config.get_messneger_client_config(), - bg_task_sender.clone(), - ack_sender.clone(), - if i == 0 { - ConsumptionType::Redeliver - } else { - ConsumptionType::New - }, - config.cl_audits.unwrap_or(false), - TRANSACTION_STREAM, - ); - } - for i in 0..config.get_transaction_backfill_stream_worker_count() { - let _txn_backfill = transaction_worker::( - database_pool.clone(), - config.get_messneger_client_config(), - bg_task_sender.clone(), - ack_sender.clone(), - if i == 0 { - ConsumptionType::Redeliver - } else { - ConsumptionType::New - }, - config.cl_audits.unwrap_or(false), - TRANSACTION_BACKFILL_STREAM, - ); + + // iterate all the workers + for worker in workers { + let stream_name = worker.stream_name.to_owned().as_str(); + + let mut timer_worker = StreamSizeTimer::new( + stream_metrics_timer, + config.messenger_config.clone(), + stream_name.clone(), + )?; + + if let Some(t) = timer_worker.start::().await { + tasks.spawn(t); + } + + for i in 0..worker.worker_count { + if worker.worker_type == WorkerType::Account { + let _account = account_worker::( + database_pool.clone(), + config.get_messneger_client_config(), + bg_task_sender.clone(), + ack_sender.clone(), + if i == 0 { + ConsumptionType::Redeliver + } else { + ConsumptionType::New + }, + stream_name, + ); + } else if worker.worker_type == WorkerType::Transaction { + let _txn = transaction_worker::( + database_pool.clone(), + config.get_messneger_client_config(), + bg_task_sender.clone(), + ack_sender.clone(), + if i == 0 { + ConsumptionType::Redeliver + } else { + ConsumptionType::New + }, + config.cl_audits.unwrap_or(false), + stream_name, + ); + } + } } } // Stream Size Timers ----------------------------------------