Skip to content

Commit

Permalink
[consensus] send to distant peers first on broadcast (aptos-labs#13568)
Browse files Browse the repository at this point in the history
* [consensus] send to distant peers first on broadcast
  • Loading branch information
ibalajiarun authored Jun 14, 2024
1 parent 42ef25e commit 2823f2e
Show file tree
Hide file tree
Showing 22 changed files with 200 additions and 165 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ impl RBNetworkSender<DAGMessage, DAGRpcResult> for MockNetworkSender {
) -> anyhow::Result<HashMap<Author, Bytes>> {
Ok(peers.into_iter().map(|peer| (peer, Bytes::new())).collect())
}

fn sort_peers_by_latency(&self, _: &mut [Author]) {}
}

#[async_trait]
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/dag/tests/dag_network_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ impl RBNetworkSender<DAGMessage, DAGRpcResult> for MockDAGNetworkSender {
) -> anyhow::Result<HashMap<Author, Bytes>> {
unimplemented!()
}

fn sort_peers_by_latency(&self, _: &mut [Author]) {}
}

#[async_trait]
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/dag/tests/dag_state_sync_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ impl RBNetworkSender<DAGMessage, DAGRpcResult> for MockDAGNetworkSender {
) -> anyhow::Result<HashMap<Author, Bytes>> {
unimplemented!()
}

fn sort_peers_by_latency(&self, _: &mut [Author]) {}
}

#[async_trait]
Expand Down
33 changes: 12 additions & 21 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,42 +317,25 @@ impl NetworkSender {
error!("Error broadcasting to self: {:?}", err);
}

// Get the list of validators excluding our own account address. Note the
// ordering is not important in this case.
let self_author = self.author;
let other_validators: Vec<_> = self
.validators
.get_ordered_account_addresses_iter()
.filter(|author| author != &self_author)
.collect();

counters::CONSENSUS_SENT_MSGS
.with_label_values(&[msg.name()])
.inc_by(other_validators.len() as u64);
// Broadcast message over direct-send to all other validators.
if let Err(err) = self
.consensus_network_client
.send_to_many(other_validators.into_iter(), msg)
{
warn!(error = ?err, "Error broadcasting message");
}
self.broadcast_without_self(msg);
}

pub fn broadcast_without_self(&self, msg: ConsensusMsg) {
let self_author = self.author;
let other_validators: Vec<_> = self
let mut other_validators: Vec<_> = self
.validators
.get_ordered_account_addresses_iter()
.filter(|author| author != &self_author)
.collect();
self.sort_peers_by_latency(&mut other_validators);

counters::CONSENSUS_SENT_MSGS
.with_label_values(&[msg.name()])
.inc_by(other_validators.len() as u64);
// Broadcast message over direct-send to all other validators.
if let Err(err) = self
.consensus_network_client
.send_to_many(other_validators.into_iter(), msg)
.send_to_many(other_validators, msg)
{
warn!(error = ?err, "Error broadcasting message");
}
Expand Down Expand Up @@ -483,6 +466,10 @@ impl NetworkSender {
pub fn author(&self) -> Author {
self.author
}

pub fn sort_peers_by_latency(&self, peers: &mut [Author]) {
self.consensus_network_client.sort_peers_by_latency(peers);
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -631,6 +618,10 @@ impl<Req: TConsensusMsg + RBMessage + 'static, Res: TConsensusMsg + RBMessage +
self.consensus_network_client
.to_bytes_by_protocol(peers, consensus_msg)
}

fn sort_peers_by_latency(&self, peers: &mut [Author]) {
self.sort_peers_by_latency(peers);
}
}

#[async_trait]
Expand Down
12 changes: 7 additions & 5 deletions consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,9 @@ impl<NetworkClient: NetworkClientInterface<ConsensusMsg>> ConsensusNetworkClient
}

/// Send a single message to the destination peers
pub fn send_to_many(
&self,
peers: impl Iterator<Item = PeerId>,
message: ConsensusMsg,
) -> Result<(), Error> {
pub fn send_to_many(&self, peers: Vec<PeerId>, message: ConsensusMsg) -> Result<(), Error> {
let peer_network_ids: Vec<PeerNetworkId> = peers
.into_iter()
.map(|peer| self.get_peer_network_id_for_peer(peer))
.collect();
self.network_client.send_to_peers(message, peer_network_ids)
Expand Down Expand Up @@ -209,4 +206,9 @@ impl<NetworkClient: NetworkClientInterface<ConsensusMsg>> ConsensusNetworkClient
fn get_peer_network_id_for_peer(&self, peer: PeerId) -> PeerNetworkId {
PeerNetworkId::new(NetworkId::Validator, peer)
}

pub fn sort_peers_by_latency(&self, peers: &mut [PeerId]) {
self.network_client
.sort_peers_by_latency(NetworkId::Validator, peers);
}
}
6 changes: 5 additions & 1 deletion consensus/src/pipeline/commit_reliable_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use aptos_consensus_types::{
};
use aptos_infallible::Mutex;
use aptos_reliable_broadcast::{BroadcastStatus, RBMessage, RBNetworkSender};
use aptos_types::validator_verifier::ValidatorVerifier;
use aptos_types::{validator_verifier::ValidatorVerifier, PeerId};
use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -149,4 +149,8 @@ impl RBNetworkSender<CommitMessage> for NetworkSender {
self.consensus_network_client
.to_bytes_by_protocol(peers, msg)
}

fn sort_peers_by_latency(&self, peers: &mut [PeerId]) {
self.sort_peers_by_latency(peers);
}
}
4 changes: 4 additions & 0 deletions crates/aptos-jwk-consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ impl RBNetworkSender<JWKConsensusMsg> for NetworkSender {
) -> Result<HashMap<Author, bytes::Bytes>, anyhow::Error> {
self.jwk_network_client.to_bytes_by_protocol(peers, message)
}

fn sort_peers_by_latency(&self, peers: &mut [AccountAddress]) {
self.jwk_network_client.sort_peers_by_latency(peers)
}
}

pub trait RpcResponseSender: Send + Sync {
Expand Down
5 changes: 5 additions & 0 deletions crates/aptos-jwk-consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,9 @@ impl<NetworkClient: NetworkClientInterface<JWKConsensusMsg>>
fn get_peer_network_id_for_peer(&self, peer: PeerId) -> PeerNetworkId {
PeerNetworkId::new(NetworkId::Validator, peer)
}

pub fn sort_peers_by_latency(&self, peers: &mut [PeerId]) {
self.network_client
.sort_peers_by_latency(NetworkId::Validator, peers)
}
}
6 changes: 6 additions & 0 deletions crates/reliable-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub trait RBNetworkSender<Req: RBMessage, Res: RBMessage = Req>: Send + Sync {
peers: Vec<Author>,
message: Req,
) -> anyhow::Result<HashMap<Author, Bytes>>;

fn sort_peers_by_latency(&self, peers: &mut [Author]);
}

pub trait BroadcastStatus<Req: RBMessage, Res: RBMessage = Req>: Send + Sync + Clone {
Expand Down Expand Up @@ -155,6 +157,10 @@ where

let mut rpc_futures = FuturesUnordered::new();
let mut aggregate_futures = FuturesUnordered::new();

let mut receivers = receivers;
network_sender.sort_peers_by_latency(&mut receivers);

for receiver in receivers {
rpc_futures.push(send_message(receiver, None));
}
Expand Down
2 changes: 2 additions & 0 deletions crates/reliable-broadcast/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ where
.map(|peer| (peer, raw_message.clone()))
.collect())
}

fn sort_peers_by_latency(&self, _: &mut [Author]) {}
}

#[tokio::test]
Expand Down
4 changes: 4 additions & 0 deletions dkg/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ impl RBNetworkSender<DKGMessage> for NetworkSender {
) -> anyhow::Result<HashMap<AccountAddress, Bytes>> {
self.dkg_network_client.to_bytes_by_protocol(peers, message)
}

fn sort_peers_by_latency(&self, peers: &mut [AccountAddress]) {
self.dkg_network_client.sort_peers_by_latency(peers)
}
}

pub struct NetworkReceivers {
Expand Down
5 changes: 5 additions & 0 deletions dkg/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,9 @@ impl<NetworkClient: NetworkClientInterface<DKGMessage>> DKGNetworkClient<Network
fn get_peer_network_id_for_peer(&self, peer: PeerId) -> PeerNetworkId {
PeerNetworkId::new(NetworkId::Validator, peer)
}

pub fn sort_peers_by_latency(&self, peers: &mut [PeerId]) {
self.network_client
.sort_peers_by_latency(NetworkId::Validator, peers)
}
}
Loading

0 comments on commit 2823f2e

Please sign in to comment.