diff --git a/bridges/centralized-ethereum/Cargo.toml b/bridges/centralized-ethereum/Cargo.toml index f42e852dc..e271fef85 100644 --- a/bridges/centralized-ethereum/Cargo.toml +++ b/bridges/centralized-ethereum/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] actix = { version = "0.13.0", default-features = false } async-jsonrpc-client = { git = "https://github.com/witnet/async-jsonrpc-client", features = ["tcp"], branch = "fix-tcp-leak" } +chrono = "0.4.38" ctrlc = "3.1.3" env_logger = "0.9.0" envy = "0.4" diff --git a/bridges/centralized-ethereum/src/actors/dr_database.rs b/bridges/centralized-ethereum/src/actors/dr_database.rs index 67046d5eb..fe2ebfb5d 100644 --- a/bridges/centralized-ethereum/src/actors/dr_database.rs +++ b/bridges/centralized-ethereum/src/actors/dr_database.rs @@ -179,6 +179,13 @@ impl Message for SetDrState { type Result = Result<(), ()>; } +/// Count number of data requests in given state +pub struct CountDrsPerState; + +impl Message for CountDrsPerState { + type Result = Result<(u64, u64, u64, u64), ()>; +} + impl Handler for DrDatabase { type Result = (); @@ -274,6 +281,26 @@ impl Handler for DrDatabase { } } +impl Handler for DrDatabase { + type Result = Result<(u64, u64, u64, u64), ()>; + + fn handle(&mut self, _msg: CountDrsPerState, _ctx: &mut Self::Context) -> Self::Result { + Ok(self.dr.iter().fold( + (0u64, 0u64, 0u64, 0u64), + |(mut drs_new, mut drs_pending, mut drs_finished, mut drs_dismissed), + (_dr_id, dr_info)| { + match dr_info.dr_state { + DrState::New => drs_new += 1, + DrState::Pending => drs_pending += 1, + DrState::Finished => drs_finished += 1, + DrState::Dismissed => drs_dismissed += 1, + }; + (drs_new, drs_pending, drs_finished, drs_dismissed) + }, + )) + } +} + /// Required trait for being able to retrieve DrDatabase address from system registry impl actix::Supervised for DrDatabase {} diff --git a/bridges/centralized-ethereum/src/actors/dr_sender/tests.rs b/bridges/centralized-ethereum/src/actors/dr_sender/tests.rs index 58ae02e43..cc43e1957 100644 --- a/bridges/centralized-ethereum/src/actors/dr_sender/tests.rs +++ b/bridges/centralized-ethereum/src/actors/dr_sender/tests.rs @@ -4,14 +4,14 @@ use witnet_data_structures::chain::{RADAggregate, RADRequest, RADRetrieve, RADTa #[test] fn deserialize_empty_dr() { // An empty data request is invalid with error 0xE0: BridgeMalformedRequest - let err = deserialize_and_validate_dr_bytes(&[], 1).unwrap_err(); + let err = deserialize_and_validate_dr_bytes(&[], 1, 1).unwrap_err(); assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 224]); } #[test] fn deserialize_dr_not_protobuf() { // A malformed data request is invalid with error 0xE0: BridgeMalformedRequest - let err = deserialize_and_validate_dr_bytes(&[1, 2, 3, 4], 1).unwrap_err(); + let err = deserialize_and_validate_dr_bytes(&[1, 2, 3, 4], 1, 1).unwrap_err(); assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 224]); } @@ -55,7 +55,7 @@ fn deserialize_dr_high_value() { let dro_bytes = dro.to_pb_bytes().unwrap(); // Setting the maximum allowed value to 1 nanowit below that will result in an error 0xE1: // BridgePoorIncentives - let err = deserialize_and_validate_dr_bytes(&dro_bytes, total_value - 1).unwrap_err(); + let err = deserialize_and_validate_dr_bytes(&dro_bytes, total_value - 1, 1).unwrap_err(); assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 225]); } @@ -78,7 +78,7 @@ fn deserialize_dr_collateral_one_nanowit() { assert_eq!(total_value, 20_000_000); let dro_bytes = dro.to_pb_bytes().unwrap(); - let err = deserialize_and_validate_dr_bytes(&dro_bytes, total_value).unwrap_err(); + let err = deserialize_and_validate_dr_bytes(&dro_bytes, total_value, 1).unwrap_err(); assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 224]); } @@ -95,7 +95,7 @@ fn deserialize_dr_value_overflow() { }; let dro_bytes = dro.to_pb_bytes().unwrap(); - let err = deserialize_and_validate_dr_bytes(&dro_bytes, 1).unwrap_err(); + let err = deserialize_and_validate_dr_bytes(&dro_bytes, 1, 1).unwrap_err(); assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 224]); } @@ -115,6 +115,6 @@ fn deserialize_and_validate_dr_bytes_wip_0022() { let dro_bytes = dro.to_pb_bytes().unwrap(); let witnet_dr_max_value_nanowits = 100_000_000_000; let err = - deserialize_and_validate_dr_bytes(&dro_bytes, witnet_dr_max_value_nanowits).unwrap_err(); + deserialize_and_validate_dr_bytes(&dro_bytes, witnet_dr_max_value_nanowits, 1).unwrap_err(); assert_eq!(err.encode_cbor(), vec![216, 39, 129, 24, 224]); } diff --git a/bridges/centralized-ethereum/src/actors/eth_poller.rs b/bridges/centralized-ethereum/src/actors/eth_poller.rs index 8849f4232..810d7308e 100644 --- a/bridges/centralized-ethereum/src/actors/eth_poller.rs +++ b/bridges/centralized-ethereum/src/actors/eth_poller.rs @@ -113,7 +113,7 @@ impl EthPoller { ); last_dr_id = skip_first; } - while last_dr_id < next_dr_id { + while last_dr_id + 1 < next_dr_id { let init_index = usize::try_from(last_dr_id + 1).unwrap(); let last_index = match next_dr_id.cmp(&(last_dr_id + max_batch_size)) { std::cmp::Ordering::Greater => { diff --git a/bridges/centralized-ethereum/src/actors/mod.rs b/bridges/centralized-ethereum/src/actors/mod.rs index 94a2b84d2..0f80bbc0c 100644 --- a/bridges/centralized-ethereum/src/actors/mod.rs +++ b/bridges/centralized-ethereum/src/actors/mod.rs @@ -12,3 +12,6 @@ pub mod eth_poller; /// wit_poller actor module pub mod wit_poller; + +/// watch_dog actor module +pub mod watch_dog; diff --git a/bridges/centralized-ethereum/src/actors/watch_dog.rs b/bridges/centralized-ethereum/src/actors/watch_dog.rs new file mode 100644 index 000000000..3668698ed --- /dev/null +++ b/bridges/centralized-ethereum/src/actors/watch_dog.rs @@ -0,0 +1,523 @@ +use crate::{ + actors::dr_database::{CountDrsPerState, DrDatabase}, + config::Config, +}; +use actix::prelude::*; +use chrono::{NaiveTime, Timelike, Utc}; +use core::fmt; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; +use web3::{ + contract::{self, Contract}, + transports::Http, + types::H160, +}; +use witnet_net::client::tcp::{jsonrpc, JsonRpcClient}; +use witnet_node::utils::stop_system_if_panicking; + +/// EthPoller actor reads periodically new requests from the WRB Contract and includes them +/// in the DrDatabase +#[derive(Default)] +pub struct WatchDog { + /// JSON WIT/RPC client connection to Wit/node + pub wit_client: Option>, + /// JSON WIT/RPC socket address + pub wit_jsonrpc_socket: String, + /// Bridge UTXO min value threshold + pub wit_utxo_min_value_threshold: u64, + /// Web3 object + pub eth_jsonrpc_url: String, + /// Web3 signer address + pub eth_account: H160, + /// WitOracle bridge contract + pub eth_contract: Option>>, + /// Polling period for global status + pub polling_rate_minutes: u64, + /// Instant at which the actor is created + pub start_ts: Option, + /// Eth balance upon first metric report: + pub start_eth_balance: Option, + /// Wit balance upon last refund + pub start_wit_balance: Option, + /// Past data request cumulative counters: + pub drs_history: Option<(u64, u64, u64)>, +} + +impl Drop for WatchDog { + fn drop(&mut self) { + log::trace!("Dropping WatchDog"); + stop_system_if_panicking("WatchDog"); + } +} + +/// Make actor from EthPoller +impl Actor for WatchDog { + /// Every actor has to provide execution Context in which it can run. + type Context = Context; + + /// Method to be executed when the actor is started + fn started(&mut self, ctx: &mut Self::Context) { + log::debug!("WatchDog actor has been started!"); + + self.watch_global_status(None, None, None, ctx); + } +} + +#[derive(Debug, PartialEq)] +enum WatchDogStatus { + EvmBalanceLeak, + EvmDisconnect, + EvmErrors, + EvmSyncing, + WitAlmostSynced, + WitBalanceLow, + WitErrors, + WitDisconnect, + WitSyncing, + WitUtxosLow, + WitWaitingConsensus, + UpAndRestarted, + UpAndRunning, +} + +impl fmt::Display for WatchDogStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WatchDogStatus::EvmBalanceLeak => write!(f, "evm-balance-leak"), + WatchDogStatus::EvmDisconnect => write!(f, "evm-disconnect"), + WatchDogStatus::EvmErrors => write!(f, "evm-errors"), + WatchDogStatus::EvmSyncing => write!(f, "evm-syncing"), + WatchDogStatus::WitAlmostSynced => write!(f, "wit-almost-synced"), + WatchDogStatus::WitBalanceLow => write!(f, "wit-balance-low"), + WatchDogStatus::WitDisconnect => write!(f, "wit-disconnect"), + WatchDogStatus::WitErrors => write!(f, "wit-errors"), + WatchDogStatus::WitSyncing => write!(f, "wit-syncing"), + WatchDogStatus::WitUtxosLow => write!(f, "wit-utxos-low"), + WatchDogStatus::WitWaitingConsensus => write!(f, "wit-waiting-consensus"), + WatchDogStatus::UpAndRestarted => write!(f, "up-and-restarted"), + WatchDogStatus::UpAndRunning => write!(f, "up-and-running"), + } + } +} + +/// Required trait for being able to retrieve WatchDog address from system registry +impl actix::Supervised for WatchDog {} +impl SystemService for WatchDog {} + +impl WatchDog { + /// Initialize from config + pub fn from_config(config: &Config, eth_contract: Arc>) -> Self { + Self { + wit_client: JsonRpcClient::start(config.witnet_jsonrpc_socket.to_string().as_str()) + .ok(), + wit_jsonrpc_socket: config.witnet_jsonrpc_socket.to_string(), + wit_utxo_min_value_threshold: config.witnet_utxo_min_value_threshold, + eth_account: config.eth_from, + eth_contract: Some(eth_contract), + eth_jsonrpc_url: config.eth_jsonrpc_url.clone(), + polling_rate_minutes: config.watch_dog_polling_rate_minutes, + start_ts: Some(Instant::now()), + start_eth_balance: None, + start_wit_balance: None, + drs_history: None, + } + } + + fn watch_global_status( + &mut self, + eth_balance: Option, + wit_balance: Option, + drs_history: Option<(u64, u64, u64)>, + ctx: &mut Context, + ) { + if self.start_eth_balance.is_none() && eth_balance.is_some() { + self.start_eth_balance = eth_balance; + } + if let Some(wit_balance) = wit_balance { + if wit_balance > self.start_wit_balance.unwrap_or_default() { + self.start_wit_balance = Some(wit_balance); + log::warn!("Wit account refunded to {} $WIT", wit_balance); + } + } + if self.drs_history.is_none() && drs_history.is_some() { + self.drs_history = drs_history.clone(); + } + let start_eth_balance = self.start_eth_balance; + let start_wit_balance = self.start_wit_balance; + let wit_client = self.wit_client.clone(); + let wit_jsonrpc_socket = self.wit_jsonrpc_socket.clone(); + let mut wit_next_balance = wit_balance; + let wit_utxo_min_value_threshold = self.wit_utxo_min_value_threshold; + let eth_jsonrpc_url = self.eth_jsonrpc_url.clone(); + let eth_account = self.eth_account; + let eth_contract = self.eth_contract.clone().unwrap(); + let eth_contract_address = eth_contract.address(); + let running_secs = self.start_ts.unwrap().elapsed().as_secs(); + let mut drs_history = drs_history.unwrap_or_default(); + + let fut = async move { + let mut status = WatchDogStatus::UpAndRunning; + + let dr_database = DrDatabase::from_registry(); + let (drs_new, drs_pending, drs_finished, drs_dismissed) = + dr_database.send(CountDrsPerState).await.unwrap().unwrap(); + let total_queries = drs_new + drs_pending + drs_finished + drs_dismissed; + + let mut metrics: String = "{".to_string(); + + metrics.push_str(&format!( + "\"drsCurrentlyPending\": {}, ", + drs_new + drs_pending + )); + + drs_history = if drs_history != (0u64, 0u64, 0u64) { + let daily_queries = + ((total_queries - drs_history.2) as f64 / running_secs as f64) * 86400_f64; + metrics.push_str(&format!("\"drsDailyQueries\": {:.1}, ", daily_queries)); + + let last_dismissed = drs_dismissed - drs_history.1; + metrics.push_str(&format!("\"drsLastDismissed\": {last_dismissed}, ")); + + let last_reported = drs_finished - drs_history.0; + metrics.push_str(&format!("\"drsLastReported\": {last_reported}, ")); + + // preserve the number of total queries counted upon bridge launch, + // so average queries per day can be averaged: + (drs_finished, drs_dismissed, drs_history.2) + } else { + status = WatchDogStatus::UpAndRestarted; + (drs_finished, drs_dismissed, total_queries) + }; + metrics.push_str(&format!("\"drsTotalQueries\": {total_queries}, ")); + + let eth_balance = match ( + eth_balance, + check_eth_account_balance(ð_jsonrpc_url, eth_account).await, + ) { + (Some(eth_balance), Ok(Some(new_balance))) => { + if status == WatchDogStatus::UpAndRunning && new_balance < eth_balance { + status = WatchDogStatus::EvmBalanceLeak + } + Some(new_balance) + } + (_, Ok(new_balance)) => new_balance, + (_, Err(err)) => { + if status == WatchDogStatus::UpAndRunning { + status = err; + } + None + } + }; + + let eth_contract_class: Option = match eth_contract + .query("class", (), None, contract::Options::default(), None) + .await + { + Ok(version) => Some(version), + Err(err) => { + log::error!( + "Fail to read class() from contract at {:?}: {:?}", + eth_contract_address, + err.to_string() + ); + if status == WatchDogStatus::UpAndRunning { + status = WatchDogStatus::EvmErrors; + } + None + } + }; + + let eth_contract_version: Option = match eth_contract + .query("version", (), None, contract::Options::default(), None) + .await + { + Ok(version) => Some(version), + Err(web3::contract::Error::InterfaceUnsupported) => None, + Err(err) => { + log::error!( + "Fail to read version() from contract at {:?}: {:?}", + eth_contract_address, + err.to_string() + ); + if status == WatchDogStatus::UpAndRunning { + status = WatchDogStatus::EvmErrors; + } + None + } + }; + + metrics.push_str(&format!("\"evmAccount\": \"{eth_account}\", ")); + if eth_balance.is_some() { + let eth_balance = eth_balance.unwrap(); + metrics.push_str(&format!("\"evmBalance\": {:.5}, ", eth_balance)); + metrics.push_str(&format!("\"evmContract\": \"{eth_contract_address}\", ")); + if let Some(eth_contract_class) = eth_contract_class { + if let Some(eth_contract_version) = eth_contract_version { + metrics.push_str(&format!( + "\"evmContractVersion\": \"{}:{}\", ", + eth_contract_class, eth_contract_version + )); + } else { + metrics.push_str(&format!( + "\"evmContractVersion\": {:?}, ", + eth_contract_class + )); + } + } + if let Some(start_eth_balance) = start_eth_balance { + let eth_hourly_earnings = + ((eth_balance - start_eth_balance) / running_secs as f64) * 3600_f64; + metrics.push_str(&format!( + "\"evmHourlyEarnings\": {:.5}, ", + eth_hourly_earnings + )); + } + } + + if let Some(wit_client) = wit_client { + if let Err(err) = check_wit_connection_status(&wit_client).await { + status = err; + } + + let (wit_account, wit_balance, wit_utxos_above_threshold) = + match fetch_wit_info(&wit_client, wit_utxo_min_value_threshold).await { + Ok((wit_account, wit_balance, wit_utxos_above_threshold)) => { + (wit_account, wit_balance, wit_utxos_above_threshold) + } + Err(err) => { + if status == WatchDogStatus::UpAndRunning { + status = err; + } + (None, None, None) + } + }; + + if wit_account.is_some() { + metrics.push_str(&format!("\"witAccount\": {:?}, ", wit_account.unwrap())); + } + if wit_balance.is_some() { + let wit_balance = wit_balance.unwrap(); + metrics.push_str(&format!("\"witBalance\": {:.5}, ", wit_balance)); + if let Some(start_wit_balance) = start_wit_balance { + let wit_hourly_expenditure = + ((start_wit_balance - wit_balance) / running_secs as f64) * 3600_f64; + metrics.push_str(&format!( + "\"witHourlyExpenditure\": {:.1}, ", + wit_hourly_expenditure + )); + if wit_hourly_expenditure > 0.0 + && wit_balance / wit_hourly_expenditure < 72.0 + { + if status == WatchDogStatus::UpAndRunning { + status = WatchDogStatus::WitBalanceLow; + } + } + } + } + metrics.push_str(&format!("\"witNodeSocket\": \"{wit_jsonrpc_socket}\", ")); + if let Some(wit_utxos_above_threshold) = wit_utxos_above_threshold { + metrics.push_str(&format!( + "\"witUtxosAboveThreshold\": {}, ", + wit_utxos_above_threshold + )); + if wit_utxos_above_threshold < 10 { + if status == WatchDogStatus::UpAndRunning { + status = WatchDogStatus::WitUtxosLow; + } + } + } + + wit_next_balance = wit_balance; + } + + metrics.push_str(&format!("\"runningSecs\": {running_secs}, ")); + metrics.push_str(&format!("\"status\": \"{}\"", status)); + metrics.push('}'); + + log::info!("{metrics}"); + + (eth_balance, wit_next_balance, Some(drs_history)) + }; + + ctx.spawn(fut.into_actor(self).then( + move |(eth_balance, wit_balance, drs_history), act, ctx| { + let time_now = Utc::now().time(); + let period_minutes = act.polling_rate_minutes as u32; + let time_next_minute = + period_minutes * (time_now.minute().div_euclid(period_minutes) + 1); + let time_next = if time_next_minute >= 60 { + NaiveTime::from_hms_opt(time_now.hour() + 1, time_next_minute - 60, 0) + } else { + NaiveTime::from_hms_opt(time_now.hour(), time_next_minute, 0) + }; + let dur = if let Some(time_next) = time_next { + let num_nanosecs = (time_next - time_now).num_nanoseconds(); + if let Some(num_nanosecs) = num_nanosecs { + Duration::from_nanos(num_nanosecs.abs() as u64) + } else { + Duration::from_secs((period_minutes * 60) as u64) + } + } else { + Duration::from_secs((period_minutes * 60) as u64) + }; + // Schedule next iteration only when finished, + // as to avoid multiple tasks running in parallel + ctx.run_later(dur, move |act, ctx| { + act.watch_global_status(eth_balance, wit_balance, drs_history, ctx); + }); + actix::fut::ready(()) + }, + )); + } +} + +async fn check_eth_account_balance( + eth_jsonrpc_url: &str, + eth_account: H160, +) -> Result, WatchDogStatus> { + let web3_http = web3::transports::Http::new(eth_jsonrpc_url) + .map_err(|_e| WatchDogStatus::EvmDisconnect) + .unwrap(); + + let web3 = web3::Web3::new(web3_http); + match web3.eth().syncing().await { + Ok(syncing) => match syncing { + web3::types::SyncState::NotSyncing => { + match web3.eth().balance(eth_account, None).await { + Ok(eth_balance) => { + let eth_balance: f64 = eth_balance.to_string().parse().unwrap_or_default(); + Ok(Some(eth_balance / 1000000000000000000.0)) + } + _ => Ok(None), + } + } + web3::types::SyncState::Syncing(_) => Err(WatchDogStatus::EvmSyncing), + }, + Err(e) => { + log::debug!("check_eth_account_balance => {}", e); + + Err(WatchDogStatus::EvmErrors) + } + } +} + +async fn check_wit_connection_status( + wit_client: &Addr, +) -> Result<(), WatchDogStatus> { + let req = jsonrpc::Request::method("syncStatus").timeout(Duration::from_secs(5)); + let res = wit_client.send(req).await; + match res { + Ok(Ok(result)) => { + if let Some(node_state) = result["node_state"].as_str() { + match node_state { + "Synced" => Ok(()), + "AlmostSynced" => Err(WatchDogStatus::WitAlmostSynced), + "WaitingConsensus" => Err(WatchDogStatus::WitWaitingConsensus), + _ => Err(WatchDogStatus::WitSyncing), + } + } else { + log::debug!("check_wit_connection_status => unknown node_state"); + Err(WatchDogStatus::WitErrors) + } + } + Ok(Err(err)) => { + log::debug!("check_wit_connection_status => {}", err); + Err(WatchDogStatus::WitDisconnect) + } + Err(err) => { + log::debug!("check_wit_connection_status => {}", err); + Err(WatchDogStatus::WitDisconnect) + } + } +} + +async fn fetch_wit_info( + wit_client: &Addr, + wit_utxos_min_threshold: u64, +) -> Result<(Option, Option, Option), WatchDogStatus> { + let req = jsonrpc::Request::method("getPkh").timeout(Duration::from_secs(5)); + let res = wit_client.send(req).await; + let wit_account = match res { + Ok(Ok(res)) => match serde_json::from_value::(res) { + Ok(pkh) => Some(pkh), + Err(_) => None, + }, + Ok(Err(_)) => None, + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + }; + + let wit_account_balance = match wit_account.clone() { + Some(wit_account) => { + let req = jsonrpc::Request::method("getBalance") + .timeout(Duration::from_secs(5)) + .params(wit_account) + .expect("getBalance wrong params"); + let res = wit_client.send(req).await; + let res = match res { + Ok(res) => res, + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + }; + match res { + Ok(value) => match value.get("total") { + Some(value) => value.as_f64().map(|value| value / 1000000000.0), + None => None, + }, + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + } + } + None => None, + }; + + let wit_utxos_above_threshold = match wit_account.clone() { + Some(wit_account) => { + let req = jsonrpc::Request::method("getUtxoInfo") + .timeout(Duration::from_secs(5)) + .params(wit_account) + .expect("getUtxoInfo wrong params"); + let res = wit_client.send(req).await; + let res = match res { + Ok(res) => res, + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + }; + match res { + Ok(utxo_info) => { + if let Some(utxos) = utxo_info["utxos"].as_array() { + let mut counter: u64 = u64::default(); + for utxo in utxos { + if let Some(value) = utxo["value"].as_u64() { + if value >= wit_utxos_min_threshold { + counter += 1; + } + } + } + + Some(counter) + } else { + None + } + } + Err(err) => { + log::debug!("fetch_wit_info => {}", err); + return Err(WatchDogStatus::WitErrors); + } + } + } + None => None, + }; + + Ok((wit_account, wit_account_balance, wit_utxos_above_threshold)) +} diff --git a/bridges/centralized-ethereum/src/config.rs b/bridges/centralized-ethereum/src/config.rs index eb79db3cb..694a62772 100644 --- a/bridges/centralized-ethereum/src/config.rs +++ b/bridges/centralized-ethereum/src/config.rs @@ -37,6 +37,12 @@ pub struct Config { /// Address of the WitnetRequestsBoard contract pub eth_witnet_oracle: H160, + /// Let the dog out? + pub watch_dog_enabled: bool, + /// Watch dog polling rate + #[serde(default = "default_watch_dog_polling_rate_minutes")] + pub watch_dog_polling_rate_minutes: u64, + /// Minimum collateral required on data requests read from the WitnetOracle contract pub witnet_dr_min_collateral_nanowits: u64, /// Maximium data request transaction fee assumed by the bridge @@ -53,6 +59,8 @@ pub struct Config { pub witnet_jsonrpc_socket: SocketAddr, /// Running in the witnet testnet? pub witnet_testnet: bool, + /// Bridge UTXO min value threshold + pub witnet_utxo_min_value_threshold: u64, /// Storage #[serde(deserialize_with = "nested_toml_if_using_envy")] @@ -69,6 +77,10 @@ fn default_max_batch_size() -> u16 { 256 } +fn default_watch_dog_polling_rate_minutes() -> u64 { + 15 +} + /// Gas limits for some methods. If missing, let the client estimate #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] diff --git a/bridges/centralized-ethereum/src/main.rs b/bridges/centralized-ethereum/src/main.rs index c6f20d8f7..c8e6138d4 100644 --- a/bridges/centralized-ethereum/src/main.rs +++ b/bridges/centralized-ethereum/src/main.rs @@ -7,7 +7,7 @@ use structopt::StructOpt; use witnet_centralized_ethereum_bridge::{ actors::{ dr_database::DrDatabase, dr_reporter::DrReporter, dr_sender::DrSender, - eth_poller::EthPoller, wit_poller::WitPoller, + eth_poller::EthPoller, watch_dog::WatchDog, wit_poller::WitPoller, }, check_ethereum_node_running, check_witnet_node_running, config, create_wrb_contract, }; @@ -83,6 +83,7 @@ fn run(callback: fn()) -> Result<(), String> { check_ethereum_node_running(&config.eth_jsonrpc_url) .await .expect("ethereum node not running"); + check_witnet_node_running(&config.witnet_jsonrpc_socket.to_string()) .await .expect("witnet node not running"); @@ -94,6 +95,7 @@ fn run(callback: fn()) -> Result<(), String> { // Web3 contract using HTTP transport with an Ethereum client let (web3, wrb_contract) = create_wrb_contract(&config.eth_jsonrpc_url, config.eth_witnet_oracle); + let wrb_contract = Arc::new(wrb_contract); // Start EthPoller actor @@ -102,25 +104,32 @@ fn run(callback: fn()) -> Result<(), String> { SystemRegistry::set(eth_poller_addr); // Start DrReporter actor - let dr_reporter_addr = DrReporter::from_config(&config, web3, wrb_contract).start(); + let dr_reporter_addr = + DrReporter::from_config(&config, web3.clone(), wrb_contract.clone()).start(); SystemRegistry::set(dr_reporter_addr); // Start Json-RPC actor connected to Witnet node let node_client = JsonRpcClient::start(&config.witnet_jsonrpc_socket.to_string()) - .expect("Json-RPC Client actor failed to started"); + .expect("JSON WIT/RPC node client failed to start"); // Start WitPoller actor let wit_poller_addr = WitPoller::from_config(&config, node_client.clone()).start(); SystemRegistry::set(wit_poller_addr); // Start DrSender actor - let dr_sender_addr = DrSender::from_config(&config, node_client).start(); + let dr_sender_addr = DrSender::from_config(&config, node_client.clone()).start(); SystemRegistry::set(dr_sender_addr); // Initialize Storage Manager let mut node_config = NodeConfig::default(); node_config.storage.db_path = config.storage.db_path.clone(); storage_mngr::start_from_config(node_config); + + // Start WatchDog actor + if config.watch_dog_enabled { + let watch_dog_addr = WatchDog::from_config(&config, wrb_contract.clone()).start(); + SystemRegistry::set(watch_dog_addr); + } }); // Run system diff --git a/bridges/wrb_abi.json b/bridges/wrb_abi.json index 6301e2155..543139cf4 100644 --- a/bridges/wrb_abi.json +++ b/bridges/wrb_abi.json @@ -308,5 +308,18 @@ ], "stateMutability": "view", "type": "function" + }, + { + "inputs": [], + "name": "version", + "outputs": [ + { + "internalType": "string", + "name": "", + "type": "string" + } + ], + "stateMutability": "view", + "type": "function" } ] \ No newline at end of file diff --git a/data_structures/src/staking/errors.rs b/data_structures/src/staking/errors.rs new file mode 100644 index 000000000..03ac013a8 --- /dev/null +++ b/data_structures/src/staking/errors.rs @@ -0,0 +1,105 @@ +use crate::staking::helpers::StakeKey; +use failure::Fail; +use std::{ + convert::From, + fmt::{Debug, Display}, + sync::PoisonError, +}; + +/// All errors related to the staking functionality. +#[derive(Debug, Eq, PartialEq, Fail)] +pub enum StakesError +where + Address: Debug + Display + Sync + Send + 'static, + Coins: Debug + Display + Sync + Send + 'static, + Epoch: Debug + Display + Sync + Send + 'static, +{ + /// The amount of coins being staked or the amount that remains after unstaking is below the + /// minimum stakeable amount. + #[fail( + display = "The amount of coins being staked ({}) or the amount that remains after unstaking is below the minimum stakeable amount ({})", + amount, minimum + )] + AmountIsBelowMinimum { + /// The number of coins being staked or remaining after staking. + amount: Coins, + /// The minimum stakeable amount. + minimum: Coins, + }, + /// Tried to query `Stakes` for information that belongs to the past. + #[fail( + display = "Tried to query `Stakes` for information that belongs to the past. Query Epoch: {} Latest Epoch: {}", + epoch, latest + )] + EpochInThePast { + /// The Epoch being referred. + epoch: Epoch, + /// The latest Epoch. + latest: Epoch, + }, + /// An operation thrown an Epoch value that overflows. + #[fail( + display = "An operation thrown an Epoch value that overflows. Computed Epoch: {} Maximum Epoch: {}", + computed, maximum + )] + EpochOverflow { + /// The computed Epoch value. + computed: u64, + /// The maximum Epoch. + maximum: Epoch, + }, + /// Tried to query for a stake entry that is not registered in `Stakes`. + #[fail( + display = "Tried to query for a stake entry that is not registered in Stakes {}", + key + )] + EntryNotFound { + /// A validator and withdrawer address pair. + key: StakeKey
, + }, + /// Tried to obtain a lock on a write-locked piece of data that is already locked. + #[fail( + display = "The authentication signature contained within a stake transaction is not valid for the given validator and withdrawer addresses" + )] + PoisonedLock, + /// The authentication signature contained within a stake transaction is not valid for the given validator and + /// withdrawer addresses. + #[fail( + display = "The authentication signature contained within a stake transaction is not valid for the given validator and withdrawer addresses" + )] + InvalidAuthentication, + /// Tried to query for a stake entry by validator that is not registered in `Stakes`. + #[fail( + display = "Tried to query for a stake entry by validator ({}) that is not registered in Stakes", + validator + )] + ValidatorNotFound { + /// A validator address. + validator: Address, + }, + /// Tried to query for a stake entry by withdrawer that is not registered in `Stakes`. + #[fail( + display = "Tried to query for a stake entry by withdrawer ({}) that is not registered in Stakes", + withdrawer + )] + WithdrawerNotFound { + /// A withdrawer address. + withdrawer: Address, + }, + /// Tried to query for a stake entry without providing a validator or a withdrawer address. + #[fail( + display = "Tried to query a stake entry without providing a validator or a withdrawer address" + )] + EmptyQuery, +} + +impl From> for StakesError +where + Address: Debug + Display + Sync + Send + 'static, + Coins: Debug + Display + Sync + Send + 'static, + Epoch: Debug + Display + Sync + Send + 'static, +{ + fn from(_value: PoisonError) -> Self { + StakesError::PoisonedLock + } +} diff --git a/data_structures/src/staking/helpers.rs b/data_structures/src/staking/helpers.rs new file mode 100644 index 000000000..c32ea04cf --- /dev/null +++ b/data_structures/src/staking/helpers.rs @@ -0,0 +1,182 @@ +use std::fmt::{Debug, Display, Formatter}; +use std::{rc::Rc, str::FromStr, sync::RwLock}; + +use failure::Error; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use crate::{chain::PublicKeyHash, proto::ProtobufConvert}; + +use crate::staking::prelude::*; + +/// Just a type alias for consistency of using the same data type to represent power. +pub type Power = u64; + +/// The resulting type for all the fallible functions in this module. +pub type StakesResult = Result>; + +/// Newtype for a reference-counted and read-write-locked instance of `Stake`. +/// +/// This newtype is needed for implementing `PartialEq` manually on the locked data, which cannot be done directly +/// because those are externally owned types. +#[derive(Clone, Debug, Default)] +pub struct SyncStake +where + Address: Default, + Epoch: Default, +{ + /// The lock itself. + pub value: Rc>>, +} + +impl From> + for SyncStake +where + Address: Default, + Epoch: Default, +{ + #[inline] + fn from(value: Stake) -> Self { + SyncStake { + value: Rc::new(RwLock::new(value)), + } + } +} + +impl PartialEq for SyncStake +where + Address: Default, + Epoch: Default + PartialEq, + Coins: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + let self_stake = self.value.read().unwrap(); + let other_stake = other.value.read().unwrap(); + + self_stake.coins.eq(&other_stake.coins) && other_stake.epochs.eq(&other_stake.epochs) + } +} + +impl<'de, Address, Coins, Epoch, Power> Deserialize<'de> for SyncStake +where + Address: Default, + Epoch: Default, + Stake: Deserialize<'de>, +{ + #[inline] + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + >::deserialize(deserializer).map(SyncStake::from) + } +} + +impl Serialize for SyncStake +where + Address: Default, + Epoch: Default, + Stake: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + self.value.read().unwrap().serialize(serializer) + } +} + +/// Couples a validator address with a withdrawer address together. This is meant to be used in `Stakes` as the index +/// for the `by_key` index. +#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)] +pub struct StakeKey
{ + /// A validator address. + pub validator: Address, + /// A withdrawer address. + pub withdrawer: Address, +} + +impl ProtobufConvert for StakeKey { + type ProtoStruct = crate::proto::schema::witnet::StakeKey; + + fn to_pb(&self) -> Self::ProtoStruct { + let mut proto = Self::ProtoStruct::new(); + proto.set_validator(self.validator.to_pb()); + proto.set_withdrawer(self.withdrawer.to_pb()); + + proto + } + + fn from_pb(mut pb: Self::ProtoStruct) -> Result { + let validator = PublicKeyHash::from_pb(pb.take_validator())?; + let withdrawer = PublicKeyHash::from_pb(pb.take_withdrawer())?; + + Ok(Self { + validator, + withdrawer, + }) + } +} + +impl From<(T, T)> for StakeKey
+where + T: Into
, +{ + fn from(val: (T, T)) -> Self { + StakeKey { + validator: val.0.into(), + withdrawer: val.1.into(), + } + } +} + +impl
From<&str> for StakeKey
+where + Address: FromStr, +
::Err: std::fmt::Debug, +{ + fn from(val: &str) -> Self { + StakeKey { + validator: Address::from_str(val).unwrap(), + withdrawer: Address::from_str(val).unwrap(), + } + } +} + +impl
Display for StakeKey
+where + Address: Display, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "validator: {} withdrawer: {}", + self.validator, self.withdrawer + ) + } +} + +/// Couples an amount of coins, a validator address and a withdrawer address together. This is meant to be used in +/// `Stakes` as the index of the `by_coins` index. +#[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] +pub struct CoinsAndAddresses { + /// An amount of coins. + pub coins: Coins, + /// A validator and withdrawer addresses pair. + pub addresses: StakeKey
, +} + +/// Allows telling the `census` method in `Stakes` to source addresses from its internal `by_coins` +/// following different strategies. +#[repr(u8)] +#[derive(Clone, Copy, Debug)] +pub enum CensusStrategy { + /// Retrieve all addresses, ordered by decreasing power. + All = 0, + /// Retrieve every Nth address, ordered by decreasing power. + StepBy(usize) = 1, + /// Retrieve the most powerful N addresses, ordered by decreasing power. + Take(usize) = 2, + /// Retrieve a total of N addresses, evenly distributed from the index, ordered by decreasing + /// power. + Evenly(usize) = 3, +} diff --git a/data_structures/src/staking/mod.rs b/data_structures/src/staking/mod.rs new file mode 100644 index 000000000..d678c71e3 --- /dev/null +++ b/data_structures/src/staking/mod.rs @@ -0,0 +1,107 @@ +#![deny(missing_docs)] + +/// Auxiliary convenience types and data structures. +pub mod helpers; +/// Constants related to the staking functionality. +pub mod constants; +/// Errors related to the staking functionality. +pub mod errors; +/// The data structure and related logic for stake entries. +pub mod stake; +/// The data structure and related logic for keeping track of multiple stake entries. +pub mod stakes; + +/// Module re-exporting virtually every submodule on a single level to ease importing of everything +/// staking-related. +pub mod prelude { + pub use crate::capabilities::*; + + pub use super::helpers::*; + pub use super::constants::*; + pub use super::errors::*; + pub use super::stake::*; + pub use super::stakes::*; +} + +#[cfg(test)] +pub mod test { + use super::prelude::*; + + #[test] + fn test_e2e() { + let mut stakes = Stakes::::with_minimum(1); + + // Alpha stakes 2 @ epoch 0 + stakes.add_stake("Alpha", 2, 0).unwrap(); + + // Nobody holds any power just yet + let rank = stakes.rank(Capability::Mining, 0).collect::>(); + assert_eq!(rank, vec![("Alpha".into(), 0)]); + + // One epoch later, Alpha starts to hold power + let rank = stakes.rank(Capability::Mining, 1).collect::>(); + assert_eq!(rank, vec![("Alpha".into(), 2)]); + + // Beta stakes 5 @ epoch 10 + stakes.add_stake("Beta", 5, 10).unwrap(); + + // Alpha is still leading, but Beta has scheduled its takeover + let rank = stakes.rank(Capability::Mining, 10).collect::>(); + assert_eq!(rank, vec![("Alpha".into(), 20), ("Beta".into(), 0)]); + + // Beta eventually takes over after epoch 16 + let rank = stakes.rank(Capability::Mining, 16).collect::>(); + assert_eq!(rank, vec![("Alpha".into(), 32), ("Beta".into(), 30)]); + let rank = stakes.rank(Capability::Mining, 17).collect::>(); + assert_eq!(rank, vec![("Beta".into(), 35), ("Alpha".into(), 34)]); + + // Gamma should never take over, even in a million epochs, because it has only 1 coin + stakes.add_stake("Gamma", 1, 30).unwrap(); + let rank = stakes + .rank(Capability::Mining, 1_000_000) + .collect::>(); + assert_eq!( + rank, + vec![ + ("Beta".into(), 4_999_950), + ("Alpha".into(), 2_000_000), + ("Gamma".into(), 999_970) + ] + ); + + // But Delta is here to change it all + stakes.add_stake("Delta", 1_000, 50).unwrap(); + let rank = stakes.rank(Capability::Mining, 50).collect::>(); + assert_eq!( + rank, + vec![ + ("Beta".into(), 200), + ("Alpha".into(), 100), + ("Gamma".into(), 20), + ("Delta".into(), 0) + ] + ); + let rank = stakes.rank(Capability::Mining, 51).collect::>(); + assert_eq!( + rank, + vec![ + ("Delta".into(), 1_000), + ("Beta".into(), 205), + ("Alpha".into(), 102), + ("Gamma".into(), 21) + ] + ); + + // If Alpha removes all of its stake, it should immediately disappear + stakes.remove_stake("Alpha", 2).unwrap(); + let rank = stakes.rank(Capability::Mining, 51).collect::>(); + assert_eq!( + rank, + vec![ + ("Delta".into(), 1_000), + ("Beta".into(), 205), + ("Gamma".into(), 21), + ] + ); + } +} diff --git a/witnet_centralized_ethereum_bridge.toml b/witnet_centralized_ethereum_bridge.toml index 3a47efa82..bce7b8976 100644 --- a/witnet_centralized_ethereum_bridge.toml +++ b/witnet_centralized_ethereum_bridge.toml @@ -25,18 +25,23 @@ eth_txs_timeout_ms = 900000 # Address of the WitnetRequestsBoard deployed contract eth_witnet_oracle = "0x77703aE126B971c9946d562F41Dd47071dA00777" +# Let the dog out? +watch_dog_enabled = true + +# Polling period for checking and tracing global status +watch_dog_polling_rate_minutes = 1 # Minimum collateral required on data requests read from the WitnetOracle contract -witnet_dr_min_collateral_nanowits = 20000000000 +witnet_dr_min_collateral_nanowits = 20_000_000_000 # Maximium data request transaction fee assumed by the bridge -witnet_dr_max_fee_nanowits = 100000 +witnet_dr_max_fee_nanowits = 100_000 # Maximum data request result size (in bytes) will accept to report witnet_dr_max_result_size = 64 # Maximum data request value that the bridge will accept to relay -witnet_dr_max_value_nanowits = 100000000000 +witnet_dr_max_value_nanowits = 100_000_000_000 # Polling period for checking resolution of data requests in the Witnet blockchain witnet_dr_txs_polling_rate_ms = 45000 @@ -50,6 +55,8 @@ witnet_jsonrpc_socket = "127.0.0.1:21338" # Running in the witnet testnet? witnet_testnet = false +# Bridge UTXO min value threshold +witnet_utxo_min_value_threshold = 2_000_000_000 [eth_gas_limits] # Gas limits for some methods. @@ -59,3 +66,4 @@ witnet_testnet = false [storage] # Path of the folder where RocksDB storage files will be written to. db_path = ".witnet_bridge/storage" +