From 1744e9efd74d83aeb15b384a8174949dbe753172 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 9 Apr 2024 18:12:26 -0500 Subject: [PATCH] BankingStage Forwarding Filter (#685) * add PacketFlags::FROM_STAKED_NODE * Only forward packets from staked node * fix local-cluster test forwarding * review comment * tpu_votes get marked as from_staked_node --- bench-streamer/src/main.rs | 1 + core/src/banking_stage/forwarder.rs | 1 + core/src/fetch_stage.rs | 3 ++ core/src/repair/ancestor_hashes_service.rs | 2 ++ core/src/repair/serve_repair_service.rs | 1 + core/src/shred_fetch_stage.rs | 1 + gossip/src/gossip_service.rs | 1 + local-cluster/tests/local_cluster.rs | 42 ++++++++++++++++------ sdk/src/packet.rs | 12 +++++++ streamer/src/nonblocking/quic.rs | 1 + streamer/src/streamer.rs | 9 ++++- 11 files changed, 63 insertions(+), 11 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index de300345eb..2d6998f298 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -117,6 +117,7 @@ fn main() -> Result<()> { Duration::from_millis(1), // coalesce true, None, + false, )); } diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index e1c2bdc304..0af6c5f851 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -161,6 +161,7 @@ impl Forwarder { self.update_data_budget(); let packet_vec: Vec<_> = forwardable_packets .filter(|p| !p.meta().forwarded()) + .filter(|p| p.meta().is_from_staked_node()) .filter(|p| self.data_budget.take(p.meta().size)) .filter_map(|p| p.data(..).map(|data| data.to_vec())) .collect(); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 5e972e6261..1f668a6ec0 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -171,6 +171,7 @@ impl FetchStage { coalesce, true, in_vote_only_mode.clone(), + false, // unstaked connections ) }) .collect() @@ -194,6 +195,7 @@ impl FetchStage { coalesce, true, 
in_vote_only_mode.clone(), + false, // unstaked connections ) }) .collect() @@ -216,6 +218,7 @@ impl FetchStage { coalesce, true, None, + true, // only staked connections should be voting ) }) .collect(); diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index 8f455cbd6a..0b7846f811 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -171,6 +171,7 @@ impl AncestorHashesService { Duration::from_millis(1), // coalesce false, // use_pinned_memory None, // in_vote_only_mode + false, // is_staked_service ); let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded(); @@ -1304,6 +1305,7 @@ mod test { Duration::from_millis(1), // coalesce false, None, + false, ); let (remote_request_sender, remote_request_receiver) = unbounded(); let t_packet_adapter = Builder::new() diff --git a/core/src/repair/serve_repair_service.rs b/core/src/repair/serve_repair_service.rs index 3fe424d076..2be1a67120 100644 --- a/core/src/repair/serve_repair_service.rs +++ b/core/src/repair/serve_repair_service.rs @@ -47,6 +47,7 @@ impl ServeRepairService { Duration::from_millis(1), // coalesce false, // use_pinned_memory None, // in_vote_only_mode + false, // is_staked_service ); let t_packet_adapter = Builder::new() .name(String::from("solServRAdapt")) diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 84f1520e64..a0a8810269 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -175,6 +175,7 @@ impl ShredFetchStage { PACKET_COALESCE_DURATION, true, // use_pinned_memory None, // in_vote_only_mode + false, ) }) .collect(); diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 76ab14f27a..006c633e60 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -63,6 +63,7 @@ impl GossipService { Duration::from_millis(1), // coalesce false, None, + false, ); let 
(consume_sender, listen_receiver) = unbounded(); let t_socket_consume = cluster_info.clone().start_socket_consume_thread( diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 44032aeeb4..ed95bf85d6 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -9,7 +9,7 @@ use { solana_accounts_db::{ hardened_unpack::open_genesis_config, utils::create_accounts_run_and_snapshot_dirs, }, - solana_client::thin_client::ThinClient, + solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, solana_core::{ consensus::{ tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH, @@ -56,12 +56,9 @@ use { response::RpcSignatureResult, }, solana_runtime::{ - commitment::VOTE_THRESHOLD_SIZE, - snapshot_archive_info::SnapshotArchiveInfoGetter, - snapshot_bank_utils, - snapshot_config::SnapshotConfig, - snapshot_package::SnapshotKind, - snapshot_utils::{self}, + commitment::VOTE_THRESHOLD_SIZE, snapshot_archive_info::SnapshotArchiveInfoGetter, + snapshot_bank_utils, snapshot_config::SnapshotConfig, snapshot_package::SnapshotKind, + snapshot_utils, }, solana_sdk::{ account::AccountSharedData, @@ -78,7 +75,7 @@ use { system_program, system_transaction, vote::state::VoteStateUpdate, }, - solana_streamer::socket::SocketAddrSpace, + solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, solana_turbine::broadcast_stage::{ broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition}, BroadcastStageType, @@ -90,11 +87,12 @@ use { fs, io::Read, iter, + net::{IpAddr, Ipv4Addr}, num::NonZeroUsize, path::Path, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, + Arc, Mutex, RwLock, }, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -363,6 +361,13 @@ fn test_forwarding() { ), ..ClusterConfig::default() }; + + let client_keypair = Keypair::new(); + let mut overrides = HashMap::new(); + let stake = DEFAULT_NODE_STAKE 
* 10; + let total_stake = stake + config.node_stakes.iter().sum::<u64>(); + overrides.insert(client_keypair.pubkey(), stake); + config.validator_configs[1].staked_nodes_overrides = Arc::new(RwLock::new(overrides)); let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); let cluster_nodes = discover_cluster( @@ -380,11 +385,28 @@ fn test_forwarding() { .find(|c| c.pubkey() != &leader_pubkey) .unwrap(); + let stakes = HashMap::from([ + (client_keypair.pubkey(), stake), + (Pubkey::new_unique(), total_stake - stake), + ]); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::new( + Arc::new(stakes), + HashMap::<Pubkey, u64>::default(), // overrides + ))); + + let client_connection_cache = Arc::new(ConnectionCache::new_with_client_options( + "client-connection-cache", + 1, + None, + Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))), + Some((&staked_nodes, &client_keypair.pubkey())), + )); + // Confirm that transactions were forwarded to and processed by the leader. cluster_tests::send_many_transactions( validator_info, &cluster.funding_keypair, - &cluster.connection_cache, + &client_connection_cache, 10, 20, ); diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 8300b57218..661e8acee6 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -35,6 +35,8 @@ bitflags! 
{ const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000; /// For tracking performance const PERF_TRACK_PACKET = 0b0100_0000; + /// For marking packets from staked nodes + const FROM_STAKED_NODE = 0b1000_0000; } } @@ -215,6 +217,11 @@ impl Meta { self.port = socket_addr.port(); } + pub fn set_from_staked_node(&mut self, from_staked_node: bool) { + self.flags + .set(PacketFlags::FROM_STAKED_NODE, from_staked_node); + } + #[inline] pub fn discard(&self) -> bool { self.flags.contains(PacketFlags::DISCARD) @@ -278,6 +285,11 @@ impl Meta { pub fn round_compute_unit_price(&self) -> bool { self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE) } + + #[inline] + pub fn is_from_staked_node(&self) -> bool { + self.flags.contains(PacketFlags::FROM_STAKED_NODE) + } } impl Default for Meta { diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 63a657d911..ddf4005c43 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -924,6 +924,7 @@ async fn handle_chunk( if packet_accum.is_none() { let mut meta = Meta::default(); meta.set_socket_addr(remote_addr); + meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked(_))); *packet_accum = Some(PacketAccumulator { meta, chunks: Vec::new(), diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 7b68619082..a79445c3b8 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -110,6 +110,7 @@ fn recv_loop( coalesce: Duration, use_pinned_memory: bool, in_vote_only_mode: Option<Arc<AtomicBool>>, + is_staked_service: bool, ) -> Result<()> { loop { let mut packet_batch = if use_pinned_memory { @@ -147,7 +148,9 @@ fn recv_loop( if len == PACKETS_PER_BATCH { full_packet_batches_count.fetch_add(1, Ordering::Relaxed); } - + packet_batch + .iter_mut() + .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service)); packet_batch_sender.send(packet_batch)?; } break; @@ -156,6 +159,7 @@ fn recv_loop( } } +#[allow(clippy::too_many_arguments)] pub fn 
receiver( thread_name: String, socket: Arc<UdpSocket>, @@ -166,6 +170,7 @@ pub fn receiver( coalesce: Duration, use_pinned_memory: bool, in_vote_only_mode: Option<Arc<AtomicBool>>, + is_staked_service: bool, ) -> JoinHandle<()> { let res = socket.set_read_timeout(Some(Duration::new(1, 0))); assert!(res.is_ok(), "streamer::receiver set_read_timeout error"); @@ -181,6 +186,7 @@ pub fn receiver( coalesce, use_pinned_memory, in_vote_only_mode, + is_staked_service, ); }) .unwrap() @@ -490,6 +496,7 @@ mod test { Duration::from_millis(1), // coalesce true, None, + false, ); const NUM_PACKETS: usize = 5; let t_responder = {