diff --git a/crates/accountsdb-rabbitmq/src/lib.rs b/crates/accountsdb-rabbitmq/src/lib.rs
index 94ca892be..f24862009 100644
--- a/crates/accountsdb-rabbitmq/src/lib.rs
+++ b/crates/accountsdb-rabbitmq/src/lib.rs
@@ -22,6 +22,7 @@ pub(crate) mod prelude {
 }
 
 pub(crate) mod config;
+pub(crate) mod metrics;
 mod plugin;
 pub(crate) mod selectors;
 pub(crate) mod sender;
diff --git a/crates/accountsdb-rabbitmq/src/metrics.rs b/crates/accountsdb-rabbitmq/src/metrics.rs
new file mode 100644
index 000000000..16aa51f2e
--- /dev/null
+++ b/crates/accountsdb-rabbitmq/src/metrics.rs
@@ -0,0 +1,92 @@
+use std::{
+    sync::{
+        atomic::{AtomicI64, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
+
+use smol::{channel, Executor, Timer};
+
+#[derive(Debug)]
+pub struct Counter(AtomicI64);
+
+impl Counter {
+    #[inline]
+    fn new() -> Self {
+        Self(AtomicI64::new(0))
+    }
+
+    pub fn log(&self) {
+        self.0.fetch_add(1, Ordering::SeqCst);
+    }
+
+    fn get(&self) -> i64 {
+        self.0.swap(0, Ordering::SeqCst)
+    }
+}
+
+#[derive(Debug)]
+pub struct Metrics {
+    _executor: Arc<Executor<'static>>,
+    _stop: channel::Sender<()>,
+    pub sends: Counter,
+    pub fg_sends: Counter,
+    pub recvs: Counter,
+    pub errs: Counter,
+    pub reconnects: Counter,
+}
+
+impl Metrics {
+    pub fn new_rc() -> Arc<Self> {
+        let executor = Arc::new(Executor::new());
+        let (stop_tx, stop_rx) = channel::bounded(1);
+
+        std::thread::spawn({
+            let executor = executor.clone();
+
+            move || smol::block_on(executor.run(stop_rx.recv()))
+        });
+
+        let this = Arc::new(Self {
+            _executor: executor.clone(),
+            _stop: stop_tx,
+            sends: Counter::new(),
+            fg_sends: Counter::new(),
+            recvs: Counter::new(),
+            errs: Counter::new(),
+            reconnects: Counter::new(),
+        });
+
+        executor
+            .spawn({
+                let this = Arc::clone(&this);
+
+                async move {
+                    loop {
+                        Timer::after(Duration::from_secs(30)).await;
+
+                        this.submit();
+                    }
+                }
+            })
+            .detach();
+
+        this
+    }
+
+    fn submit(&self) {
+        solana_metrics::datapoint_info!(
+            "accountsdb_rabbitmq",
+            ("msgs_sent", self.sends.get(), i64),
+            ("blocking_sends", self.fg_sends.get(), i64),
+            ("evts_recvd", self.recvs.get(), i64),
+        );
+
+        solana_metrics::datapoint_error!(
+            "accountsdb_rabbitmq",
+            ("errors", self.errs.get(), i64),
+            ("reconnects", self.reconnects.get(), i64),
+        );
+    }
+}
diff --git a/crates/accountsdb-rabbitmq/src/plugin.rs b/crates/accountsdb-rabbitmq/src/plugin.rs
index b5c233bc8..b988ff0fd 100644
--- a/crates/accountsdb-rabbitmq/src/plugin.rs
+++ b/crates/accountsdb-rabbitmq/src/plugin.rs
@@ -1,15 +1,6 @@
-use std::{
-    collections::HashSet,
-    env,
-    sync::atomic::{AtomicUsize, Ordering},
-    time::{Duration, Instant},
-};
+use std::{collections::HashSet, env, sync::Arc};
 
-use indexer_rabbitmq::{
-    accountsdb::{AccountUpdate, Message, Producer, QueueType},
-    lapin::{Connection, ConnectionProperties},
-};
-use smol::lock::Mutex;
+use indexer_rabbitmq::accountsdb::{AccountUpdate, Message};
 use solana_program::{
     instruction::CompiledInstruction, message::SanitizedMessage, program_pack::Pack,
 };
@@ -29,68 +20,29 @@ use crate::{
         AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfo, ReplicaAccountInfoVersions,
         ReplicaTransactionInfoVersions, Result,
     },
+    metrics::{Counter, Metrics},
     prelude::*,
     selectors::{AccountSelector, InstructionSelector},
     sender::Sender,
 };
 
-fn custom_err(
-    e: impl Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
-) -> AccountsDbPluginError {
-    AccountsDbPluginError::Custom(e.into())
-}
-
-#[derive(Debug)]
-struct Metrics {
-    last_send: Mutex<Instant>,
-    last_recv: Mutex<Instant>,
-    send_count: AtomicUsize,
-    recv_count: AtomicUsize,
-}
-
-impl Metrics {
-    async fn try_submit(last: &Mutex<Instant>, f: impl FnOnce()) {
-        let now = Instant::now();
-        let mut time = last.lock().await;
-
-        if now - *time >= Duration::from_secs(30) {
-            *time = now;
-            f();
-        }
-    }
-
-    async fn log_send(&self) {
-        let n = self.send_count.fetch_add(1, Ordering::SeqCst) + 1;
-
-        Self::try_submit(&self.last_send, || {
-            solana_metrics::datapoint_info!(
-                "accountsdb_rabbitmq",
-                ("msgs_sent", i64::try_from(n).unwrap(), i64),
-            );
-        })
-        .await;
-    }
-
-    async fn log_recv(&self) {
-        let n = self.recv_count.fetch_add(1, Ordering::SeqCst) + 1;
-
-        Self::try_submit(&self.last_recv, || {
-            solana_metrics::datapoint_info!(
-                "accountsdb_rabbitmq",
-                ("msgs_sent", i64::try_from(n).unwrap(), i64),
-            );
-        })
-        .await;
+#[inline]
+fn custom_err<'a, E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>>(
+    counter: &'a Counter,
+) -> impl FnOnce(E) -> AccountsDbPluginError + 'a {
+    |e| {
+        counter.log();
+        AccountsDbPluginError::Custom(e.into())
     }
 }
 
 /// An instance of the plugin
 #[derive(Debug, Default)]
 pub struct AccountsDbPluginRabbitMq {
-    producer: Option<Sender<Producer>>,
+    producer: Option<Sender>,
     acct_sel: Option<AccountSelector>,
     ins_sel: Option<InstructionSelector>,
-    metrics: Option<Metrics>,
+    metrics: Option<Arc<Metrics>>,
 
     token_addresses: HashSet<Pubkey>,
 }
@@ -107,18 +59,15 @@ struct TokenList {
 impl AccountsDbPluginRabbitMq {
     const TOKEN_REG_URL: &'static str = "https://raw.githubusercontent.com/solana-labs/token-list/main/src/tokens/solana.tokenlist.json";
 
-    fn load_token_reg() -> Result<HashSet<Pubkey>> {
+    fn load_token_reg() -> StdResult<HashSet<Pubkey>, anyhow::Error> {
         // We use `smol` as an executor, and reqwest's async backend doesn't like that
-        let res: TokenList = reqwest::blocking::get(Self::TOKEN_REG_URL)
-            .map_err(custom_err)?
-            .json()
-            .map_err(custom_err)?;
+        let res: TokenList = reqwest::blocking::get(Self::TOKEN_REG_URL)?.json()?;
 
         res.tokens
             .into_iter()
             .map(|TokenItem { address }| address.parse())
             .collect::<StdResult<_, _>>()
-            .map_err(custom_err)
+            .map_err(Into::into)
     }
 }
 
@@ -129,14 +78,15 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
 
     fn on_load(&mut self, cfg: &str) -> Result<()> {
         solana_logger::setup_with_default("info");
+        let metrics = Metrics::new_rc();
 
-        let (amqp, jobs, metrics, acct, ins) = Config::read(cfg)
+        let (amqp, jobs, metrics_conf, acct, ins) = Config::read(cfg)
            .and_then(Config::into_parts)
-            .map_err(custom_err)?;
+            .map_err(custom_err(&metrics.errs))?;
 
         let startup_type = acct.startup();
 
-        if let Some(config) = metrics.config {
+        if let Some(config) = metrics_conf.config {
             const VAR: &str = "SOLANA_METRICS_CONFIG";
 
             if env::var_os(VAR).is_some() {
@@ -149,29 +99,16 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
         self.acct_sel = Some(acct);
         self.ins_sel = Some(ins);
 
-        self.token_addresses = Self::load_token_reg()?;
+        self.token_addresses = Self::load_token_reg().map_err(custom_err(&metrics.errs))?;
 
-        self.metrics = Some(Metrics {
-            last_send: Mutex::new(Instant::now()),
-            last_recv: Mutex::new(Instant::now()),
-            send_count: AtomicUsize::new(0),
-            recv_count: AtomicUsize::new(0),
-        });
+        self.metrics = Some(Arc::clone(&metrics));
 
         smol::block_on(async {
-            let conn = Connection::connect(
-                &amqp.address,
-                ConnectionProperties::default().with_executor(smol_executor_trait::Smol),
-            )
-            .await
-            .map_err(custom_err)?;
-
-            self.producer = Some(Sender::new(
-                Producer::new(&conn, QueueType::new(amqp.network, startup_type, None))
+            self.producer = Some(
+                Sender::new(amqp, &jobs, startup_type, Arc::clone(&metrics))
                     .await
-                    .map_err(custom_err)?,
-                jobs.limit,
-            ));
+                    .map_err(custom_err(&metrics.errs))?,
+            );
 
             Ok(())
         })
@@ -183,23 +120,30 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
         slot: u64,
         is_startup: bool,
     ) -> Result<()> {
-        fn uninit() -> AccountsDbPluginError {
-            AccountsDbPluginError::AccountsUpdateError {
-                msg: "RabbitMQ plugin not initialized yet!".into(),
+        #[inline]
+        fn uninit<'a>(
+            counter: impl Into<Option<&'a Counter>> + 'a,
+        ) -> impl FnOnce() -> AccountsDbPluginError + 'a {
+            || {
+                counter.into().map(Counter::log);
+
+                AccountsDbPluginError::AccountsUpdateError {
+                    msg: "RabbitMQ plugin not initialized yet!".into(),
+                }
             }
         }
 
         smol::block_on(async {
-            let metrics = self.metrics.as_ref().ok_or_else(uninit)?;
+            let metrics = self.metrics.as_ref().ok_or_else(uninit(None))?;
 
-            metrics.log_recv().await;
+            metrics.recvs.log();
 
             match account {
                 ReplicaAccountInfoVersions::V0_0_1(acct) => {
                     if !self
                         .acct_sel
                         .as_ref()
-                        .ok_or_else(uninit)?
+                        .ok_or_else(uninit(&metrics.errs))?
                         .is_selected(acct, is_startup)
                     {
                         return Ok(());
@@ -229,31 +173,31 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
                         }
                     }
 
-                    let key = Pubkey::new_from_array(pubkey.try_into().map_err(custom_err)?);
-                    let owner = Pubkey::new_from_array(owner.try_into().map_err(custom_err)?);
+                    let key = Pubkey::new_from_array(
+                        pubkey.try_into().map_err(custom_err(&metrics.errs))?,
+                    );
+                    let owner = Pubkey::new_from_array(
+                        owner.try_into().map_err(custom_err(&metrics.errs))?,
+                    );
                     let data = data.to_owned();
 
                     self.producer
                         .as_ref()
-                        .ok_or_else(uninit)?
-                        .run(move |prod| async move {
-                            prod.write(Message::AccountUpdate(AccountUpdate {
-                                key,
-                                lamports,
-                                owner,
-                                executable,
-                                rent_epoch,
-                                data,
-                                write_version,
-                                slot,
-                                is_startup,
-                            }))
-                            .await
-                            .map_err(Into::into)
-                        })
+                        .ok_or_else(uninit(&metrics.errs))?
+                        .send(Message::AccountUpdate(AccountUpdate {
+                            key,
+                            lamports,
+                            owner,
+                            executable,
+                            rent_epoch,
+                            data,
+                            write_version,
+                            slot,
+                            is_startup,
+                        }))
                         .await;
 
-                    metrics.log_send().await;
+                    metrics.sends.log();
                 },
             }
 
@@ -266,8 +210,17 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
         transaction: ReplicaTransactionInfoVersions,
         _slot: u64,
     ) -> Result<()> {
-        fn uninit() -> AccountsDbPluginError {
-            AccountsDbPluginError::Custom(anyhow!("RabbitMQ plugin not initialized yet!").into())
+        #[inline]
+        fn uninit<'a>(
+            counter: impl Into<Option<&'a Counter>> + 'a,
+        ) -> impl FnOnce() -> AccountsDbPluginError + 'a {
+            || {
+                counter.into().map(Counter::log);
+
+                AccountsDbPluginError::Custom(
+                    anyhow!("RabbitMQ plugin not initialized yet!").into(),
+                )
+            }
         }
 
         #[inline]
@@ -275,7 +228,7 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
             ins: &CompiledInstruction,
             sel: &InstructionSelector,
             msg: &SanitizedMessage,
-            prod: &Sender<Producer>,
+            prod: &Sender,
         ) -> StdResult<(), anyhow::Error> {
             // TODO: no clue if this is right.
             let program = *msg
@@ -300,30 +253,28 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
 
             let data = ins.data.clone();
 
-            prod.run(|prod| async move {
-                prod.write(Message::InstructionNotify {
-                    program,
-                    data,
-                    accounts,
-                })
-                .await
-                .map_err(Into::into)
+            prod.send(Message::InstructionNotify {
+                program,
+                data,
+                accounts,
             })
             .await;
 
             Ok(())
         }
 
+        let metrics = self.metrics.as_ref().ok_or_else(uninit(None))?;
+
         smol::block_on(async {
             match transaction {
                 ReplicaTransactionInfoVersions::V0_0_1(tx) => {
-                    let ins_sel = self.ins_sel.as_ref().ok_or_else(uninit)?;
+                    let ins_sel = self.ins_sel.as_ref().ok_or_else(uninit(&metrics.errs))?;
 
                     if matches!(tx.transaction_status_meta.status, Err(..)) {
                         return Ok(());
                     }
 
-                    let prod = self.producer.as_ref().ok_or_else(uninit)?;
+                    let prod = self.producer.as_ref().ok_or_else(uninit(&metrics.errs))?;
                     let msg = tx.transaction.message();
 
                     for ins in msg.instructions().iter().chain(
@@ -335,7 +286,10 @@ impl AccountsDbPlugin for AccountsDbPluginRabbitMq {
                     ) {
                         process_instruction(ins, ins_sel, msg, prod)
                             .await
-                            .map_err(|e| debug!("Error processing instruction: {:?}", e))
+                            .map_err(|e| {
+                                debug!("Error processing instruction: {:?}", e);
+                                metrics.errs.log();
+                            })
                             .ok();
                     }
                 },
diff --git a/crates/accountsdb-rabbitmq/src/sender.rs b/crates/accountsdb-rabbitmq/src/sender.rs
index 6ff6af328..738c0ffc9 100644
--- a/crates/accountsdb-rabbitmq/src/sender.rs
+++ b/crates/accountsdb-rabbitmq/src/sender.rs
@@ -1,26 +1,124 @@
-use std::{
-    future::Future,
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
-    },
+use std::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
 };
 
-use smol::{channel, Executor};
+use indexer_rabbitmq::{
+    accountsdb::{Message, Producer, QueueType, StartupType},
+    lapin::{Connection, ConnectionProperties},
+};
+use smol::{
+    channel,
+    lock::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
+    Executor,
+};
+
+use crate::{
+    config,
+    metrics::{Counter, Metrics},
+};
 
 #[derive(Debug)]
-pub struct Sender<T> {
-    payload: Arc<T>,
+struct Inner {
+    background_count: AtomicUsize,
+
+    amqp: config::Amqp,
+    startup_type: StartupType,
+    producer: RwLock<Producer>,
+    metrics: Arc<Metrics>,
+}
+
+impl Inner {
+    async fn new(
+        amqp: config::Amqp,
+        startup_type: StartupType,
+        metrics: Arc<Metrics>,
+    ) -> Result<Self, indexer_rabbitmq::Error> {
+        let producer = Self::create_producer(&amqp, startup_type).await?;
+
+        Ok(Self {
+            background_count: AtomicUsize::new(0),
+            amqp,
+            startup_type,
+            producer: RwLock::new(producer),
+            metrics,
+        })
+    }
+
+    async fn create_producer(
+        amqp: &config::Amqp,
+        startup_type: StartupType,
+    ) -> Result<Producer, indexer_rabbitmq::Error> {
+        let conn = Connection::connect(
+            &amqp.address,
+            ConnectionProperties::default().with_executor(smol_executor_trait::Smol),
+        )
+        .await?;
+
+        Producer::new(&conn, QueueType::new(amqp.network, startup_type, None)).await
+    }
+
+    async fn connect<'a>(
+        &self,
+        prod: RwLockUpgradableReadGuard<'a, Producer>,
+    ) -> Result<RwLockUpgradableReadGuard<'a, Producer>, indexer_rabbitmq::Error> {
+        let mut prod = RwLockUpgradableReadGuard::upgrade(prod).await;
+
+        *prod = Self::create_producer(&self.amqp, self.startup_type).await?;
+
+        Ok(RwLockWriteGuard::downgrade_to_upgradable(prod))
+    }
+
+    async fn send(self: Arc<Self>, msg: Message, backgrounded: bool) {
+        #[inline]
+        fn log_err<E: std::fmt::Debug>(counter: &'_ Counter) -> impl FnOnce(E) + '_ {
+            |err| {
+                counter.log();
+                log::error!("{:?}", err);
+            }
+        }
+
+        let metrics = self.metrics.as_ref();
+        let prod = self.producer.upgradable_read().await;
+
+        match prod.write(&msg).await.map_err(log_err(&metrics.errs)) {
+            Ok(()) => return,
+            Err(()) => (),
+        }
+
+        metrics.reconnects.log();
+        let prod = match self.connect(prod).await.map_err(log_err(&metrics.errs)) {
+            Ok(p) => p,
+            Err(()) => return,
+        };
+
+        match prod.write(&msg).await.map_err(log_err(&metrics.errs)) {
+            Ok(()) | Err(()) => (), // Type-level assertion that we consumed the error
+        }
+
+        if backgrounded {
+            assert!(self.background_count.fetch_sub(1, Ordering::SeqCst) > 0);
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct Sender {
+    inner: Arc<Inner>,
+
     executor: Arc<Executor<'static>>,
     _stop: channel::Sender<()>,
-    background_count: AtomicUsize,
     limit: usize,
 }
 
-impl<T: Send + Sync + 'static> Sender<T> {
-    pub fn new(payload: T, limit: usize) -> Self {
+impl Sender {
+    pub async fn new(
+        amqp: config::Amqp,
+        jobs: &config::Jobs,
+        startup_type: StartupType,
+        metrics: Arc<Metrics>,
+    ) -> Result<Self, indexer_rabbitmq::Error> {
         let executor = Arc::new(Executor::new());
-        let (stop, stop_rx) = channel::bounded(1);
+        let (stop_tx, stop_rx) = channel::bounded(1);
 
         std::thread::spawn({
             let executor = executor.clone();
@@ -28,40 +126,28 @@ impl<T: Send + Sync + 'static> Sender<T> {
             move || smol::block_on(executor.run(stop_rx.recv()))
         });
 
-        Self {
-            payload: Arc::new(payload),
+        Ok(Self {
+            inner: Arc::new(Inner::new(amqp, startup_type, metrics).await?),
             executor,
-            _stop: stop,
-            background_count: AtomicUsize::new(0),
-            limit,
-        }
-    }
-
-    async fn wrap_future(f: impl Future<Output = Result<(), indexer_rabbitmq::Error>> + Send + 'static) {
-        match f.await {
-            Ok(()) => (),
-            Err(e) => log::error!("{:?}", e),
-        }
+            _stop: stop_tx,
+            limit: jobs.limit,
+        })
     }
 
-    pub async fn run<F: Future<Output = Result<(), indexer_rabbitmq::Error>> + Send + 'static>(
-        &self,
-        f: impl FnOnce(Arc<T>) -> F,
-    ) {
+    pub async fn send(&self, msg: Message) {
+        let inner = Arc::clone(&self.inner);
         let new_count =
-            self.background_count
+            inner
+                .background_count
                 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |c| {
                     if c < self.limit { Some(c + 1) } else { None }
                 });
 
         if new_count.is_ok() {
-            self.executor
-                .spawn(Self::wrap_future(f(Arc::clone(&self.payload))))
-                .detach();
-
-            assert!(self.background_count.fetch_sub(1, Ordering::SeqCst) > 0);
+            self.executor.spawn(inner.send(msg, true)).detach();
         } else {
-            Self::wrap_future(f(Arc::clone(&self.payload))).await;
+            inner.metrics.fg_sends.log();
+            inner.send(msg, false).await;
         }
     }
 }