diff --git a/Cargo.lock b/Cargo.lock index adb5d9e06cd4..afa58315c133 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4865,6 +4865,7 @@ dependencies = [ "diem-runtimes", "diem-schemadb", "diem-scratchpad", + "diem-sdk", "diem-storage-interface", "diem-storage-service-client", "diem-storage-service-types", @@ -4879,6 +4880,7 @@ dependencies = [ "ntest", "once_cell", "rand 0.7.3", + "reqwest", "serde 1.0.149", "thiserror", "tokio", diff --git a/state-sync/diem-data-client/src/client.rs b/state-sync/diem-data-client/src/client.rs index ae847edfb815..6a8cae376d91 100644 --- a/state-sync/diem-data-client/src/client.rs +++ b/state-sync/diem-data-client/src/client.rs @@ -281,6 +281,9 @@ impl DiemDataClient { Ok(connected_peers) } + + + /// Returns all priority and regular peers pub(crate) fn get_priority_and_regular_peers( &self, @@ -519,6 +522,10 @@ impl DiemDataClientInterface for DiemDataClient { self.global_summary_cache.read().clone() } + fn get_all_connected_peers(&self) -> crate::error::Result, Error> { + self.get_all_connected_peers() + } + async fn get_epoch_ending_ledger_infos( &self, start_epoch: Epoch, diff --git a/state-sync/diem-data-client/src/interface.rs b/state-sync/diem-data-client/src/interface.rs index d415e0a8bf3e..b1281fb78734 100644 --- a/state-sync/diem-data-client/src/interface.rs +++ b/state-sync/diem-data-client/src/interface.rs @@ -3,6 +3,7 @@ use crate::{error, error::Error, global_summary::GlobalDataSummary}; use diem_storage_service_types::{responses::TransactionOrOutputListWithProof, Epoch}; +use diem_config::network_id::PeerNetworkId; use diem_types::{ ledger_info::LedgerInfoWithSignatures, state_store::state_value::StateValueChunkWithProof, @@ -21,6 +22,10 @@ pub trait DiemDataClientInterface { /// cached view of this data client's available data. fn get_global_data_summary(&self) -> GlobalDataSummary; + + /// Fetches all the current connected peers. + fn get_all_connected_peers(&self) -> crate::error::Result, Error>; + /// Fetches the epoch ending ledger infos between start and end /// (inclusive). In some cases, fewer ledger infos may be returned (e.g., /// to tolerate network or chunk limits). If the data cannot be fetched, diff --git a/state-sync/diem-data-client/src/tests/mock.rs b/state-sync/diem-data-client/src/tests/mock.rs index 6b71c4f5d7d2..d3bab7d84c62 100644 --- a/state-sync/diem-data-client/src/tests/mock.rs +++ b/state-sync/diem-data-client/src/tests/mock.rs @@ -208,6 +208,10 @@ mock! { request_timeout_ms: u64, ) -> Result>>; + fn get_all_connected_peers(&self) -> crate::error::Result, crate::error::Error> { + crate::error::Error::NotImplemented("Not implemented".to_string()) + } + async fn get_new_transaction_outputs_with_proof( &self, known_version: Version, diff --git a/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs b/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs index 3610d4514395..f097d00607a4 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs @@ -209,7 +209,7 @@ impl DiemDataClientInterface for MockDiemDataClient { transaction_chunk_size: create_range_random_u64(20, 1000), transaction_output_chunk_size: create_range_random_u64(20, 1000), }; - + // Create a global data summary with a fixed set of data let advertised_data = AdvertisedData { states: vec![ @@ -238,6 +238,12 @@ impl DiemDataClientInterface for MockDiemDataClient { } } + + fn get_all_connected_peers(&self) -> crate::error::Result, Error> { + crate::error::Error::NotImplemented("Not implemented".to_string()) + } + + async fn get_state_values_with_proof( &self, version: Version, diff --git a/state-sync/state-sync-v2/state-sync-driver/Cargo.toml b/state-sync/state-sync-v2/state-sync-driver/Cargo.toml index 347f27411201..1b1fde889100 100644 --- a/state-sync/state-sync-v2/state-sync-driver/Cargo.toml +++ b/state-sync/state-sync-v2/state-sync-driver/Cargo.toml @@ -21,6 +21,7 @@ diem-data-client = { workspace = true } diem-data-streaming-service = { workspace = true } diem-event-notifications = { workspace = true } diem-executor-types = { workspace = true } +diem-sdk = { workspace = true } diem-infallible = { workspace = true } diem-logger = { workspace = true } diem-mempool-notifications = { workspace = true } @@ -39,6 +40,7 @@ serde = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } +reqwest = { workspace = true } [dev-dependencies] anyhow = { workspace = true } @@ -62,3 +64,4 @@ mockall = { workspace = true } move-core-types = { workspace = true } ntest = { workspace = true } rand = { workspace = true } + diff --git a/state-sync/state-sync-v2/state-sync-driver/src/driver.rs b/state-sync/state-sync-v2/state-sync-driver/src/driver.rs index 39e509d8ba28..441774354cb3 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/driver.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/driver.rs @@ -20,7 +20,8 @@ use crate::{ utils, utils::{OutputFallbackHandler, PENDING_DATA_LOG_FREQ_SECS}, }; -use diem_config::config::{RoleType, StateSyncDriverConfig}; +use diem_config::{config::{RoleType, StateSyncDriverConfig}, network_id::NetworkId}; + use diem_consensus_notifications::{ ConsensusCommitNotification, ConsensusNotification, ConsensusSyncNotification, }; @@ -28,6 +29,9 @@ use diem_data_client::interface::DiemDataClientInterface; use diem_data_streaming_service::streaming_client::{ DataStreamingClient, NotificationAndFeedback, NotificationFeedback, }; + + + use diem_event_notifications::EventSubscriptionService; use diem_infallible::Mutex; use diem_logger::prelude::*; @@ -43,6 +47,7 @@ use tokio::{ }; use tokio_stream::wrappers::IntervalStream; + // Useful constants for the driver const DRIVER_ERROR_LOG_FREQ_SECS: u64 = 3; @@ -508,6 +513,23 @@ impl< Ok(()) } + /// Returns true iff the validator is in validator set + fn is_in_validator_set(&self) -> bool { + let connected_peers = self.diem_data_client.get_all_connected_peers(); + match connected_peers{ + Ok(connected_peers) => { + // Check if there is a validator in the connected peers + return self.is_validator() && connected_peers.iter().any(|peer| { + peer.network_id() == NetworkId::Validator + }); + }, + Err(_) => { + return self.is_validator(); + } + } + } + + /// Returns true iff there's an active sync request from consensus fn active_sync_request(&self) -> bool { self.consensus_notification_handler.active_sync_request() @@ -520,9 +542,9 @@ impl< /// Returns true iff consensus is currently executing fn check_if_consensus_executing(&self) -> bool { - self.is_validator() && self.bootstrapper.is_bootstrapped() && !self.active_sync_request() + self.is_in_validator_set() && self.bootstrapper.is_bootstrapped() && !self.active_sync_request() } - + /// Checks if the connection deadline has passed. If so, validators with /// genesis waypoints will be automatically marked as bootstrapped. This /// helps in the case of single node deployments, where there are no peers @@ -588,6 +610,7 @@ impl< // Drive progress depending on if we're bootstrapping or continuously syncing if self.bootstrapper.is_bootstrapped() { + // Fetch any consensus sync requests let consensus_sync_request = self.consensus_notification_handler.get_sync_request(); @@ -610,6 +633,7 @@ impl< metrics::increment_counter(&metrics::CONTINUOUS_SYNCER_ERRORS, error.get_label()); } } else { + metrics::increment_counter( &metrics::EXECUTING_COMPONENT, ExecutingComponent::Bootstrapper.get_label(),