diff --git a/bridges/centralized-ethereum/src/actors/watch_dog.rs b/bridges/centralized-ethereum/src/actors/watch_dog.rs index 279ee0108..9f38ab409 100644 --- a/bridges/centralized-ethereum/src/actors/watch_dog.rs +++ b/bridges/centralized-ethereum/src/actors/watch_dog.rs @@ -7,11 +7,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use web3::{ - contract::Contract, - transports::Http, - types::H160, -}; +use web3::{contract::Contract, transports::Http, types::H160}; use witnet_net::client::tcp::{jsonrpc, JsonRpcClient}; use witnet_node::utils::stop_system_if_panicking; @@ -39,6 +35,8 @@ pub struct WatchDog { pub start_eth_balance: Option, /// Wit balance upon last refund pub start_wit_balance: Option, + /// Past data request cumulative counters: + pub drs_history: Option<(u64, u64)>, } impl Drop for WatchDog { @@ -57,7 +55,13 @@ impl Actor for WatchDog { fn started(&mut self, ctx: &mut Self::Context) { log::debug!("WatchDog actor has been started!"); - self.watch_global_status(None, None, ctx, Duration::from_millis(self.polling_rate_ms)); + self.watch_global_status( + None, + None, + None, + ctx, + Duration::from_millis(self.polling_rate_ms), + ); } } @@ -71,7 +75,7 @@ enum WatchDogStatus { WitDisconnect, WitSyncing, WitWaitingConsensus, - UpAndRunning + UpAndRunning, } impl WatchDogStatus { @@ -84,7 +88,7 @@ impl WatchDogStatus { WatchDogStatus::WitDisconnect => "wit-disconnect".to_string(), WatchDogStatus::WitErrors => format!("wit-errors"), WatchDogStatus::WitSyncing => "wit-syncing".to_string(), - WatchDogStatus::WitWaitingConsensus => "wit-waiting-consensus".to_string(), + WatchDogStatus::WitWaitingConsensus => "wit-waiting-consensus".to_string(), WatchDogStatus::UpAndRunning => "up-and-running".to_string(), } } @@ -98,7 +102,8 @@ impl WatchDog { /// Initialize from config pub fn from_config(config: &Config, eth_contract: Arc>) -> Self { Self { - wit_client: JsonRpcClient::start(config.witnet_jsonrpc_socket.to_string().as_str()).ok(), + wit_client: JsonRpcClient::start(config.witnet_jsonrpc_socket.to_string().as_str()) + .ok(), wit_jsonrpc_socket: config.witnet_jsonrpc_socket.to_string(), wit_utxo_min_value_threshold: config.witnet_utxo_min_value_threshold, eth_account: config.eth_from, @@ -108,6 +113,7 @@ impl WatchDog { start_ts: Some(Instant::now()), start_eth_balance: None, start_wit_balance: None, + drs_history: None, } } @@ -115,6 +121,7 @@ impl WatchDog { &mut self, eth_balance: Option, wit_balance: Option, + drs_history: Option<(u64, u64)>, ctx: &mut Context, period: Duration, ) { @@ -127,6 +134,9 @@ impl WatchDog { log::warn!("Wit account refunded to {} $WIT", wit_balance); } } + if self.drs_history.is_none() && drs_history.is_some() { + self.drs_history = drs_history; + } let start_eth_balance = self.start_eth_balance; let start_wit_balance = self.start_wit_balance; let wit_client = self.wit_client.clone(); @@ -136,38 +146,50 @@ impl WatchDog { let eth_account = self.eth_account; let eth_contract_address = self.eth_contract.clone().unwrap().address(); let running_secs = self.start_ts.unwrap().elapsed().as_secs(); + let mut drs_history = self.drs_history.unwrap_or_default(); let fut = async move { let mut status = WatchDogStatus::UpAndRunning; let dr_database = DrDatabase::from_registry(); - let (_, drs_pending, drs_finished, _) = + let (drs_new, drs_pending, drs_finished, drs_dismissed) = dr_database.send(CountDrsPerState).await.unwrap().unwrap(); let mut metrics: String = "{".to_string(); - metrics.push_str(&format!("\"drsFinished\": {drs_finished}, ")); - metrics.push_str(&format!("\"drsPending\": {drs_pending}, ")); + + metrics.push_str(&format!("\"drsCurrentlyPending\": {drs_pending}, ")); + + if drs_history != (0u64, 0u64) { + let last_reported = drs_finished - drs_history.0; + let last_dismissed = drs_dismissed - drs_history.1; + + metrics.push_str(&format!("\"drsLastReported\": {last_reported}, ")); + metrics.push_str(&format!("\"drsLastDismissed\": {last_dismissed}, ")); + + let total_queries = drs_new + drs_pending + drs_finished + drs_dismissed; + metrics.push_str(&format!("\"drsTotalQueries\": {total_queries}, ")); + } + drs_history = (drs_finished, drs_dismissed); + metrics.push_str(&format!("\"evmAccount\": \"{eth_account}\", ")); if let Some(wit_client) = wit_client { if let Err(err) = check_wit_connection_status(&wit_client).await { status = err; } - - let (wit_account, wit_balance, wit_utxos_above_threshold) = match fetch_wit_info( - &wit_client, - wit_utxo_min_value_threshold - ).await { - Ok((wit_account, wit_balance, wit_utxos_above_threshold)) => { - (wit_account, wit_balance, wit_utxos_above_threshold) - } - Err(err) => { - if status == WatchDogStatus::UpAndRunning { - status = err; + + let (wit_account, wit_balance, wit_utxos_above_threshold) = + match fetch_wit_info(&wit_client, wit_utxo_min_value_threshold).await { + Ok((wit_account, wit_balance, wit_utxos_above_threshold)) => { + (wit_account, wit_balance, wit_utxos_above_threshold) } - (None, None, None) - } - }; + Err(err) => { + if status == WatchDogStatus::UpAndRunning { + status = err; + } + (None, None, None) + } + }; if wit_account.is_some() { metrics.push_str(&format!("\"witAccount\": {:?}, ", wit_account.unwrap())); @@ -216,26 +238,26 @@ impl WatchDog { )); } } - + metrics.push_str(&format!("\"runningSecs\": {running_secs}, ")); metrics.push_str(&format!("\"status\": \"{}\"", status.to_string())); metrics.push_str("}"); + log::info!("{metrics}"); - - (eth_balance, wit_balance) + + (eth_balance, wit_balance, Some(drs_history)) }; - ctx.spawn( - fut.into_actor(self) - .then(move |(eth_balance, wit_balance), _act, ctx| { - // Schedule next iteration only when finished, - // as to avoid multiple tasks running in parallel - ctx.run_later(period, move |act, ctx| { - act.watch_global_status(eth_balance, wit_balance, ctx, period); - }); - actix::fut::ready(()) - }), - ); + ctx.spawn(fut.into_actor(self).then( + move |(eth_balance, wit_balance, drs_history), _act, ctx| { + // Schedule next iteration only when finished, + // as to avoid multiple tasks running in parallel + ctx.run_later(period, move |act, ctx| { + act.watch_global_status(eth_balance, wit_balance, drs_history, ctx, period); + }); + actix::fut::ready(()) + }, + )); } } @@ -263,13 +285,15 @@ async fn check_eth_account_balance( }, Err(e) => { log::debug!("check_eth_account_balance => {}", e); - + Err(WatchDogStatus::EvmErrors) } } } -async fn check_wit_connection_status(wit_client: &Addr) -> Result<(), WatchDogStatus> { +async fn check_wit_connection_status( + wit_client: &Addr, +) -> Result<(), WatchDogStatus> { let req = jsonrpc::Request::method("syncStatus").timeout(Duration::from_secs(5)); let res = wit_client.send(req).await; match res { @@ -279,7 +303,7 @@ async fn check_wit_connection_status(wit_client: &Addr) -> Result "Synced" => Ok(()), "AlmostSynced" => Err(WatchDogStatus::WitAlmostSynced), "WaitingConsensus" => Err(WatchDogStatus::WitWaitingConsensus), - _ => Err(WatchDogStatus::WitSyncing) + _ => Err(WatchDogStatus::WitSyncing), } } else { log::debug!("check_wit_connection_status => unknown node_state"); @@ -297,7 +321,7 @@ async fn check_wit_connection_status(wit_client: &Addr) -> Result } } -async fn fetch_wit_info ( +async fn fetch_wit_info( wit_client: &Addr, wit_utxos_min_threshold: u64, ) -> Result<(Option, Option, Option), WatchDogStatus> { @@ -328,7 +352,7 @@ async fn fetch_wit_info ( log::debug!("fetch_wit_info => {}", err); return Err(WatchDogStatus::WitErrors); } - }; + }; match res { Ok(value) => match value.get("total") { Some(value) => match value.as_f64() { @@ -385,7 +409,6 @@ async fn fetch_wit_info ( } None => None, }; - Ok((wit_account, wit_account_balance, wit_utxos_above_threshold)) }