Skip to content

Commit

Permalink
[sync] out of the set validators fall back to fullnode network (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrotari19 authored and 0o-de-lally committed Feb 28, 2024
1 parent fc6a08f commit 718314c
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 4 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions state-sync/diem-data-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ impl DiemDataClient {
Ok(connected_peers)
}




/// Returns all priority and regular peers
pub(crate) fn get_priority_and_regular_peers(
&self,
Expand Down Expand Up @@ -519,6 +522,10 @@ impl DiemDataClientInterface for DiemDataClient {
self.global_summary_cache.read().clone()
}

fn get_all_connected_peers(&self) -> crate::error::Result<Vec<PeerNetworkId>, Error> {
self.get_all_connected_peers()
}

async fn get_epoch_ending_ledger_infos(
&self,
start_epoch: Epoch,
Expand Down
5 changes: 5 additions & 0 deletions state-sync/diem-data-client/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::{error, error::Error, global_summary::GlobalDataSummary};
use diem_storage_service_types::{responses::TransactionOrOutputListWithProof, Epoch};
use diem_config::network_id::PeerNetworkId;
use diem_types::{
ledger_info::LedgerInfoWithSignatures,
state_store::state_value::StateValueChunkWithProof,
Expand All @@ -21,6 +22,10 @@ pub trait DiemDataClientInterface {
/// cached view of this data client's available data.
fn get_global_data_summary(&self) -> GlobalDataSummary;


/// Fetches all the current connected peers.
fn get_all_connected_peers(&self) -> crate::error::Result<Vec<PeerNetworkId>, Error>;

/// Fetches the epoch ending ledger infos between start and end
/// (inclusive). In some cases, fewer ledger infos may be returned (e.g.,
/// to tolerate network or chunk limits). If the data cannot be fetched,
Expand Down
4 changes: 4 additions & 0 deletions state-sync/diem-data-client/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ mock! {
request_timeout_ms: u64,
) -> Result<Response<Vec<LedgerInfoWithSignatures>>>;

fn get_all_connected_peers(&self) -> crate::error::Result<Vec<PeerNetworkId>, crate::error::Error> {
crate::error::Error::NotImplemented("Not implemented".to_string())
}

async fn get_new_transaction_outputs_with_proof(
&self,
known_version: Version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl DiemDataClientInterface for MockDiemDataClient {
transaction_chunk_size: create_range_random_u64(20, 1000),
transaction_output_chunk_size: create_range_random_u64(20, 1000),
};

// Create a global data summary with a fixed set of data
let advertised_data = AdvertisedData {
states: vec![
Expand Down Expand Up @@ -238,6 +238,12 @@ impl DiemDataClientInterface for MockDiemDataClient {
}
}


fn get_all_connected_peers(&self) -> crate::error::Result<Vec<PeerNetworkId>, Error> {
crate::error::Error::NotImplemented("Not implemented".to_string())
}


async fn get_state_values_with_proof(
&self,
version: Version,
Expand Down
3 changes: 3 additions & 0 deletions state-sync/state-sync-v2/state-sync-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ diem-data-client = { workspace = true }
diem-data-streaming-service = { workspace = true }
diem-event-notifications = { workspace = true }
diem-executor-types = { workspace = true }
diem-sdk = { workspace = true }
diem-infallible = { workspace = true }
diem-logger = { workspace = true }
diem-mempool-notifications = { workspace = true }
Expand All @@ -39,6 +40,7 @@ serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
reqwest = { workspace = true }

[dev-dependencies]
anyhow = { workspace = true }
Expand All @@ -62,3 +64,4 @@ mockall = { workspace = true }
move-core-types = { workspace = true }
ntest = { workspace = true }
rand = { workspace = true }

30 changes: 27 additions & 3 deletions state-sync/state-sync-v2/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ use crate::{
utils,
utils::{OutputFallbackHandler, PENDING_DATA_LOG_FREQ_SECS},
};
use diem_config::config::{RoleType, StateSyncDriverConfig};
use diem_config::{config::{RoleType, StateSyncDriverConfig}, network_id::NetworkId};

use diem_consensus_notifications::{
ConsensusCommitNotification, ConsensusNotification, ConsensusSyncNotification,
};
use diem_data_client::interface::DiemDataClientInterface;
use diem_data_streaming_service::streaming_client::{
DataStreamingClient, NotificationAndFeedback, NotificationFeedback,
};



use diem_event_notifications::EventSubscriptionService;
use diem_infallible::Mutex;
use diem_logger::prelude::*;
Expand All @@ -43,6 +47,7 @@ use tokio::{
};
use tokio_stream::wrappers::IntervalStream;


// Useful constants for the driver
const DRIVER_ERROR_LOG_FREQ_SECS: u64 = 3;

Expand Down Expand Up @@ -508,6 +513,23 @@ impl<
Ok(())
}

/// Returns true iff the validator is in validator set
fn is_in_validator_set(&self) -> bool {
let connected_peers = self.diem_data_client.get_all_connected_peers();
match connected_peers{
Ok(connected_peers) => {
// Check if there is a validator in the connected peers
return self.is_validator() && connected_peers.iter().any(|peer| {
peer.network_id() == NetworkId::Validator
});
},
Err(_) => {
return self.is_validator();
}
}
}


/// Returns true iff there's an active sync request from consensus
fn active_sync_request(&self) -> bool {
self.consensus_notification_handler.active_sync_request()
Expand All @@ -520,9 +542,9 @@ impl<

/// Returns true iff consensus is currently executing
fn check_if_consensus_executing(&self) -> bool {
self.is_validator() && self.bootstrapper.is_bootstrapped() && !self.active_sync_request()
self.is_in_validator_set() && self.bootstrapper.is_bootstrapped() && !self.active_sync_request()
}

/// Checks if the connection deadline has passed. If so, validators with
/// genesis waypoints will be automatically marked as bootstrapped. This
/// helps in the case of single node deployments, where there are no peers
Expand Down Expand Up @@ -588,6 +610,7 @@ impl<

// Drive progress depending on if we're bootstrapping or continuously syncing
if self.bootstrapper.is_bootstrapped() {

// Fetch any consensus sync requests
let consensus_sync_request = self.consensus_notification_handler.get_sync_request();

Expand All @@ -610,6 +633,7 @@ impl<
metrics::increment_counter(&metrics::CONTINUOUS_SYNCER_ERRORS, error.get_label());
}
} else {

metrics::increment_counter(
&metrics::EXECUTING_COMPONENT,
ExecutingComponent::Bootstrapper.get_label(),
Expand Down

0 comments on commit 718314c

Please sign in to comment.