From 2e9b2577ca58afd677054cf7d8334ce737efe3ed Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Wed, 1 Jun 2022 19:43:28 -0400 Subject: [PATCH] [mempool] Specify number of workers for mempool threads. Add prometheus metrics for networking. (#1113) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * function to revoke vouch * vouch revoking apis * add test * set rotation can only expand by a certain number of nodes on every epoch. So that no more than 25% of the next validator set has unproven nodes. This is based on the amount of nodes that we know to keep consensus (case 1 and 2). * updating tests and patch implementation * patch reconfig case_2 * make sure jailed nodes are dropping * increase threshold for voting * update writeset for rescue to include recovert mode * patch build * remove recovery mode * expand epoch set by 1/6th * make setting recovery mode optional from CLI * add debug prints * debug prints * debug prints and comments. * add debug prints and comments * notes * comment the mempool config params * find where we could create backpressure on mempool * prints for debugging * WIP experimental backpressure on shared mempool consensus requests. Does not build. * change node.yaml default params for state_sync and mempool * Fix build * adding prometheus counters * patch build * State sync debug (#1117) * Release v5.1.1 (#1114) * function to revoke vouch * vouch revoking apis * add test * set rotation can only expand by a certain number of nodes on every epoch. So that no more than 25% of the next validator set has unproven nodes. This is based on the amount of nodes that we know to keep consensus (case 1 and 2). * updating tests and patch implementation * patch reconfig case_2 * make sure jailed nodes are dropping * increase threshold for voting * update writeset for rescue to include recovert mode * patch build * remove recovery mode * expand epoch set by 1/6th * make setting recovery mode optional from CLI * impove mock case 1 helper * patch onboarding reconfig * patch mock tests * refactored tests that use mock_ * build stdlib for release * update 0L default configs for mempool and state sync * bump version * changelog * Update 5_1_1.md * adds some more metrics * adds more metrics * [move] [Fast Track Proposal] Turn down the heat on Cost To Exist (#1119) * defer for 90 days cost to inactives, and reduce the cost of burn by only implementing at steady state. * burn should be the default if user has not set send to community explicitly * exchanges some dbg! statements with debug! statements to be able to control log output Co-authored-by: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Co-authored-by: Gökhan Şimşek Co-authored-by: Sven Panko --- config/src/config/mempool_config.rs | 16 +++ config/src/config/state_sync_config.rs | 5 + execution/executor/src/lib.rs | 6 +- language/diem-framework/staged/stdlib.mv | Bin 120006 -> 120118 bytes .../diem-vm/src/diem_transaction_executor.rs | 14 +-- mempool/src/counters.rs | 73 ++++++++++++ mempool/src/shared_mempool/coordinator.rs | 38 ++++++- mempool/src/shared_mempool/peer_manager.rs | 18 ++- mempool/src/shared_mempool/tasks.rs | 32 ++++-- state-sync/src/coordinator.rs | 75 ++++++++++++- state-sync/src/counters.rs | 106 ++++++++++++++++++ state-sync/src/error.rs | 2 +- state-sync/src/request_manager.rs | 51 ++++++++- 13 files changed, 403 insertions(+), 33 deletions(-) diff --git a/config/src/config/mempool_config.rs b/config/src/config/mempool_config.rs index 821e6ad4f0..f414454910 100644 --- a/config/src/config/mempool_config.rs +++ b/config/src/config/mempool_config.rs @@ -6,18 +6,34 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] pub struct MempoolConfig { + /// What is the total size of the mempool queue, including invalid txs. pub capacity: usize, + /// How many txs can each user have in the mempool at a given time. pub capacity_per_user: usize, + // a threshold for fullnodes to determine which peers to broadcast to. + // peers which are go over this threshold, will receive broadcasts. // number of failovers to broadcast to when the primary network is alive pub default_failovers: usize, + // number of times a mempool broadcast gets re-sent to a peer if the previous was unacknowledged. pub max_broadcasts_per_peer: usize, + // how often to snapshot the mempool for analytics purposes. pub mempool_snapshot_interval_secs: u64, + // how long to wait for a peer after a broadcast was submitted, before we mark it as unacknowledged. pub shared_mempool_ack_timeout_ms: u64, + // if peer_manager is in backoff mode mempool/src/shared_mempool/peer_manager.rs + // this is the base interval for backing off. pub shared_mempool_backoff_interval_ms: u64, + + // size of batch from mempool timeline to broadcast to peers. pub shared_mempool_batch_size: usize, + // Number of workers to be spawned to receive inbound shared mempool broadcasts. pub shared_mempool_max_concurrent_inbound_syncs: usize, + // the default interval to execute shared mempool broadcasts to peers. + // this is overriden when peer is in backoff mode. pub shared_mempool_tick_interval_ms: u64, + /// when a transaction gets automatically garbage collected by system. Different than user tx expiry which has separate GC pub system_transaction_timeout_secs: u64, + /// tick interval for system GC. pub system_transaction_gc_interval_ms: u64, } diff --git a/config/src/config/state_sync_config.rs b/config/src/config/state_sync_config.rs index a6a693620e..e1147812b4 100644 --- a/config/src/config/state_sync_config.rs +++ b/config/src/config/state_sync_config.rs @@ -11,12 +11,16 @@ pub struct StateSyncConfig { // The timeout of the state sync client to process a commit notification (in milliseconds) pub client_commit_timeout_ms: u64, // default timeout used for long polling to remote peer + // this is only used by fullnodes pub long_poll_timeout_ms: u64, // valid maximum chunk limit for sanity check pub max_chunk_limit: u64, // valid maximum timeout limit for sanity check + // This timeout applies to the process_request_for_target_and_highest + // if the chunk cannot be applied now, then insert it in a subscription to appply. The subscription expires at max_timeout_ms pub max_timeout_ms: u64, // The timeout of the state sync coordinator to receive a commit ack from mempool (in milliseconds) + // Stops sending pub mempool_commit_timeout_ms: u64, // default timeout to make state sync progress by sending chunk requests to a certain number of networks // if no progress is made by sending chunk requests to a number of networks, @@ -26,6 +30,7 @@ pub struct StateSyncConfig { // commits when processing a sync request). pub sync_request_timeout_ms: u64, // interval used for checking state synchronization progress + // IMPORTANT: the mempool peer ack timeout is determined by 2X this number. pub tick_interval_ms: u64, } diff --git a/execution/executor/src/lib.rs b/execution/executor/src/lib.rs index f848895a8a..82a65bbc3b 100644 --- a/execution/executor/src/lib.rs +++ b/execution/executor/src/lib.rs @@ -618,7 +618,7 @@ impl ChunkExecutor for Executor { let latency = start_time.elapsed(); metrics_timer_vl.observe_duration(); - dbg!("verify_chunk latency", &latency); + debug!("verify_chunk latency: {:?}", &latency); // 3. Execute transactions. @@ -632,7 +632,7 @@ impl ChunkExecutor for Executor { let latency = start_time.elapsed(); metrics_timer_el.observe_duration(); - dbg!("execute_chunk latency", &latency); + debug!("execute_chunk latency: {:?}", &latency); // temp time the transaction execution. @@ -657,7 +657,7 @@ impl ChunkExecutor for Executor { let latency = start_time.elapsed(); metrics_timer_stxl.observe_duration(); - dbg!("save_transactions latency", &latency); + debug!("save_transactions latency: {:?}", &latency); // 5. Cache maintenance. let output_trees = output.executed_trees().clone(); diff --git a/language/diem-framework/staged/stdlib.mv b/language/diem-framework/staged/stdlib.mv index ce45e0a5afb50a69376ec422c4bf63e7860f39c6..d812a9838a67cbb7f6d26aa89155c6ebd363e2ef 100644 GIT binary patch delta 1318 zcmbVL-)me&6h3F>o|${+&ff0by}OB#Hcp#T7bkch;Sqwk~lITmOeFQL9E{uKTW<;^}RdygJb?#od0 zw(ga>;ByKfK7I(Wl_p$q|iq0`nURFe+spFE= zWPB^h#$_u*DUVR>3$^#l%+?YXscmKxYk0w9xvS;Clyv#r6%?mm;qThIc{!Nj$cf}U+ zGZfR`=5el{EMNn>O|^XTu8%tC{qhRF ztnp6zS{rYNheHOaSd-Q}cr=~wV6pNRtwuS%Auy&qVT-gD*kV|e>vSz$?O=8864GUi zd}iU-Y(gJmV3RO0;kevDx`M`w$8Cu|VYn2QIpl0sR|V9g?n770-@AkhRMG@Ae>IYJro z9Lz!9D;Lk6r}@S9Qrj%FmM*;dZo9K|A>F)+vpL&pzxnp!O6%NW=i*xW^Hn_k@m|su zvj$_~*9USAxmET1qj1pvl;Pj6%wW%7mQ`SNj)mHEWDl1WMyUMoFd@fi!a$^B#b_2% zwt~u`t27J)bub)ztQB^LXx&msaz;JQE*Crt_&2$O%s(KPmHhwYPW~6WObA1A{n<^t G^1@#xZl%%y delta 1182 zcma)*O^6&t6vyAISFgIdy1FwxpUJqHoz1W=n}EBC3WJbS*7fAYqljo^vj)fA=*$S> zVN^sA#3c--{YYE`n)m^_W}_pB0g;$XK(B&=CnI{%vmSgs>nuS$STwJz|F8P}{$2I@ zvk!uA-Ve4mKOVdgouB$GQUKr(imIro`Yh@*Sl|oz90nKFAIiSQ1@|5s{}G?(_*<7R zMo>VYfx$ok7V;|;6Gw$;c>ti@X5 z-bB)RpOAvOyk~ETQ&FdiB?R1Gse);cu%{)crm*Zxo^cQfG!|S6MW_v*-#W!a=y>?& z*1_T6Ro-6wWEzJ*bn3(POY>OVe*MyQTxg_aA~x15wrRyH@g8w(m4kU`QjyD{OjGuh zP#ZlCzq?$+>hPD#3m9zg{q)8wtqncd!2qE^h1>Np&G*!T2fs%^^okL46hQ^7Q=1K8O@wL#u<-Jq#3GkZ7c*$n&L*NOm}tDjNi=t$S0?^ zNpn0encgI$6Fts^O*+DNNTxr@aKde{BfE_HR(vyyJ2#J0^O0X}d+QfottI^0WQfridiTfW3kg}xM(=xP2i z`@Dx!M_xkSz{H9%+w^5VjiE0`Wfx7;8RR$6>2%sx_^im4s6v~_7c$ew+2s>b)#R1) zz8Q~M+D85YBN(fBE9Rq~&ia_Y6dC`u`fKE?qbK@!eXe!&=7aL&59_0r?_Lc08QPcS zpw6lnvxm;%KGKNUGcvhPTACAQq}fRmlO=7uw3N7!z9PrSXhNV|L>GbzB*@KpBRRql z^u6nkJjTZ!?yhusY2|o-HT&rtE*d`dzF^fxQQEg F{tdxHlb8Si diff --git a/language/diem-vm/src/diem_transaction_executor.rs b/language/diem-vm/src/diem_transaction_executor.rs index 49c1d7d4f0..0b2f4cbd3e 100644 --- a/language/diem-vm/src/diem_transaction_executor.rs +++ b/language/diem-vm/src/diem_transaction_executor.rs @@ -512,8 +512,8 @@ impl DiemVM { .map(|_return_vals| ()) .or_else(|e| { println!("error here\n"); - dbg!(&proposer); - dbg!(&previous_vote); + debug!("proposer: {:?}", &proposer); + debug!("previous vote: {:?}", &previous_vote); expect_only_successful_execution(e, BLOCK_PROLOGUE.as_str(), log_context) })?; @@ -777,16 +777,8 @@ impl DiemVM { let (vm_status, output, sender) = self.execute_single_transaction(&txn, data_cache, &log_context)?; - // match &txn { - // PreprocessedTransaction::UserTransaction(t) => { - // dbg!(&t.sequence_number()); - // }, - // _ => {}, - // }; - // dbg!("tx sender", &sender); - // let latency = start_time.elapsed(); metric_single_tx_lat.observe_duration(); - // dbg!("single tx latency", &latency); + if !output.status().is_discarded() { data_cache.push_write_set(output.write_set()); diff --git a/mempool/src/counters.rs b/mempool/src/counters.rs index 23e2cf6e62..1a3afb8e41 100644 --- a/mempool/src/counters.rs +++ b/mempool/src/counters.rs @@ -55,6 +55,10 @@ pub const STATE_SYNC_EVENT_LABEL: &str = "state_sync"; pub const RECONFIG_EVENT_LABEL: &str = "reconfig"; pub const PEER_BROADCAST_EVENT_LABEL: &str = "peer_broadcast"; +//////// 0L //////// +pub const CONSENSUS_REQUEST_LABEL: &str = "consensus_request"; + + // task spawn stage labels pub const SPAWN_LABEL: &str = "spawn"; pub const START_LABEL: &str = "start"; @@ -449,3 +453,72 @@ pub static MAIN_LOOP: Lazy = Lazy::new(|| { .unwrap(), ) }); + + + +//////// 0L //////// +/// Counter for my node +pub static SELF_REQUEST_BACKOFF: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_mempool_self_request_backoff", + "Number of times my node requested backoff" + ) + .unwrap() +}); + +pub static COORDINATOR_HANDLE_CLIENT_EVENT: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_mempool_coordinator_handle_client_event", + "Number of times a client event was handled in mempool" + ) + .unwrap() +}); + +pub static COORDINATOR_HANDLE_CONSENSUS_EVENT: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_mempool_coordinator_handle_consensus_event", + "Number of times a consensus event was handled in mempool" + ) + .unwrap() +}); + +pub static COORDINATOR_HANDLE_STATE_SYNC_EVENT: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_mempool_coordinator_handle_state_sync_event", + "Number of times a state-sync event was handled in mempool" + ) + .unwrap() +}); + +pub static COORDINATOR_HANDLE_MEMPOOL_RECONFIG_EVENT: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_mempool_coordinator_handle_mempool_reconfig_event", + "Number of times a mempool reconfiguration event was handled in mempool" + ) + .unwrap() +}); + +pub static TASKS_PROCESS_TX_BROADCAST_EVENT: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_mempool_tasks_process_tx_broadcast_event", + "Number of times a transaction broadcast event was handled in mempool" + ) + .unwrap() +}); + +pub static TASKS_PROCESS_CONSENSUS_REQUEST_EVENT: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_mempool_tasks_process_consensus_request_event", + "Number of times a consensus request was processed in mempool" + ) + .unwrap() +}); + +pub static PEER_MANAGER_PEER_REQUESTED_BACKOFF: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "diem_mempool_peer_requested_backoff", + "Number of backoff requests from peers", + &["network", "peer"] + ) + .unwrap() +}); diff --git a/mempool/src/shared_mempool/coordinator.rs b/mempool/src/shared_mempool/coordinator.rs index 01cf5b4bd3..5c8a1bfd39 100644 --- a/mempool/src/shared_mempool/coordinator.rs +++ b/mempool/src/shared_mempool/coordinator.rs @@ -73,21 +73,36 @@ pub(crate) async fn coordinator( let _timer = counters::MAIN_LOOP.start_timer(); ::futures::select! { (msg, callback) = client_events.select_next_some() => { + debug!("handle_client_event"); + counters::COORDINATOR_HANDLE_CLIENT_EVENT.inc(); handle_client_event(&mut smp, &bounded_executor, msg, callback).await; }, + // 0L TODO: execute mempool tasks in a bounded execution with capacity. msg = consensus_requests.select_next_some() => { - tasks::process_consensus_request(&smp.mempool, msg).await; + debug!("process_consensus_request"); + counters::COORDINATOR_HANDLE_CONSENSUS_EVENT.inc(); + //////// 0L //////// + // The goal here is to put consensus requests also in a Tokio Semaphore (diem BoundedExecutor) where we can control the amount of workers and put backpressure. + + handle_consensus_request(&mut smp, &bounded_executor, msg).await; + // tasks::process_consensus_request(&smp.mempool, msg).await; } msg = state_sync_requests.select_next_some() => { + debug!("state_sync_requests"); + counters::COORDINATOR_HANDLE_STATE_SYNC_EVENT.inc(); handle_state_sync_request(&mut smp, msg); } config_update = mempool_reconfig_events.select_next_some() => { + debug!("handle_mempool_reconfig_event"); + counters::COORDINATOR_HANDLE_MEMPOOL_RECONFIG_EVENT.inc(); handle_mempool_reconfig_event(&mut smp, &bounded_executor, config_update).await; }, (peer, backoff) = scheduled_broadcasts.select_next_some() => { tasks::execute_broadcast(peer, backoff, &mut smp, &mut scheduled_broadcasts, executor.clone()); }, (network_id, event) = events.select_next_some() => { + // dbg!("handle_event", &event.); + handle_event(&executor, &bounded_executor, &mut scheduled_broadcasts, &mut smp, network_id, event).await; }, complete => break, @@ -124,6 +139,27 @@ async fn handle_client_event( .await; } +//////// 0L //////// +async fn handle_consensus_request( + smp: &mut SharedMempool, + bounded_executor: &BoundedExecutor, + msg: ConsensusRequest, +) where + V: TransactionValidation, +{ + // This timer measures how long it took for the bounded executor to *schedule* the + // task. + let _timer = + counters::task_spawn_latency_timer(counters::CONSENSUS_REQUEST_LABEL, counters::SPAWN_LABEL); + // This timer measures how long it took for the task to go from scheduled to started. + let _task_start_timer = + counters::task_spawn_latency_timer(counters::CONSENSUS_REQUEST_LABEL, counters::START_LABEL); + + bounded_executor + .spawn(tasks::process_consensus_request(smp.clone(), msg)) + .await; +} + fn handle_state_sync_request(smp: &mut SharedMempool, msg: CommitNotification) where V: TransactionValidation, diff --git a/mempool/src/shared_mempool/peer_manager.rs b/mempool/src/shared_mempool/peer_manager.rs index 9d0a4b07c9..141eb9e6ed 100644 --- a/mempool/src/shared_mempool/peer_manager.rs +++ b/mempool/src/shared_mempool/peer_manager.rs @@ -35,7 +35,7 @@ pub(crate) type PeerSyncStates = HashMap; /// State of last sync with peer: /// `timeline_id` is position in log of ready transactions /// `is_alive` - is connection healthy -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct PeerSyncState { pub timeline_id: u64, pub is_alive: bool, @@ -81,7 +81,7 @@ impl Ord for BatchId { } /// Txn broadcast-related info for a given remote peer. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct BroadcastInfo { // Sent broadcasts that have not yet received an ack. pub sent_batches: BTreeMap, @@ -140,6 +140,7 @@ impl PeerManager { /// Disables a peer if it can be restarted, otherwise removes it pub fn disable_peer(&self, peer: PeerNetworkId) { + error!("shared mempool disable peer {:?}", &peer); // Remove all state on the peer, and start over self.peer_states.lock().remove(&peer); counters::active_upstream_peers(&peer.raw_network_id()).dec(); @@ -150,6 +151,7 @@ impl PeerManager { pub fn is_backoff_mode(&self, peer: &PeerNetworkId) -> bool { if let Some(state) = self.peer_states.lock().get(peer) { + warn!("shared mempool is in backoff mode for peer: {:?} ", &peer); state.broadcast_info.backoff_mode } else { // If we don't have sync state, we shouldn't backoff @@ -165,6 +167,10 @@ impl PeerManager { ) where V: TransactionValidation, { + + // dbg!("execute broadcast"); + // dbg!(&self.peer_states); + // Start timer for tracking broadcast latency. let start_time = Instant::now(); @@ -178,6 +184,7 @@ impl PeerManager { // Only broadcast to peers that are alive. if !state.is_alive { + error!("shared mempool peer is not alive: {:?}", &state.metadata); return; } @@ -242,6 +249,7 @@ impl PeerManager { // This helps rate-limit egress network bandwidth and not overload a remote peer or this // node's Diem network sender. if pending_broadcasts >= self.mempool_config.max_broadcasts_per_peer { + error!("will stop broadcasting shared mempool to peer: {:?}", &peer); return; } } @@ -370,6 +378,7 @@ impl PeerManager { let _ = std::mem::replace(&mut *prioritized_peers, peers); } + /// Node receives ack from peer. pub fn process_broadcast_ack( &self, peer: PeerNetworkId, @@ -433,6 +442,11 @@ impl PeerManager { // as a backoff broadcast. // This ensures backpressure request from remote peer is honored at least once. if backoff { + counters::PEER_MANAGER_PEER_REQUESTED_BACKOFF.with_label_values(&[ + &peer.raw_network_id().to_string(), + &peer.peer_id().to_string(), + ]).inc(); + error!("Peer requested backoff: {:?}", &peer); sync_state.broadcast_info.backoff_mode = true; } } diff --git a/mempool/src/shared_mempool/tasks.rs b/mempool/src/shared_mempool/tasks.rs index 65d17713dd..66c5184dc0 100644 --- a/mempool/src/shared_mempool/tasks.rs +++ b/mempool/src/shared_mempool/tasks.rs @@ -83,9 +83,9 @@ pub(crate) async fn process_client_transaction_submission( ) where V: TransactionValidation, { - dbg!("new transaction", &transaction); - dbg!("new transaction", &transaction.sender()); - dbg!("new transaction", &transaction.sequence_number()); + debug!("new transaction: {:?}", &transaction); + debug!("new transaction sender: {:?}", &transaction.sender()); + debug!("new transaction seq number: {:?}", &transaction.sequence_number()); timer.stop_and_record(); let _timer = @@ -116,6 +116,8 @@ pub(crate) async fn process_transaction_broadcast( ) where V: TransactionValidation, { + warn!("process_transaction_broadcast from other node {:?}", &peer); + counters::TASKS_PROCESS_TX_BROADCAST_EVENT.inc(); timer.stop_and_record(); let _timer = counters::process_txn_submit_latency_timer( peer.raw_network_id().as_str(), @@ -129,7 +131,9 @@ pub(crate) async fn process_transaction_broadcast( .network_senders .get_mut(&peer.network_id()) .expect("[shared mempool] missing network sender"); - if let Err(e) = network_sender.send_to(peer.peer_id(), ack_response) { + if let Err(e) = network_sender.send_to(peer.peer_id(), ack_response.clone()) { + error!("process_transaction_broadcast network error, {:?}, message: {:?}", &peer, &ack_response); + counters::network_send_fail_inc(counters::ACK_TXNS); error!( LogSchema::event_log(LogEntry::BroadcastACK, LogEvent::NetworkSendFail) @@ -150,7 +154,11 @@ fn gen_ack_response( let mut retry = false; for r in results.into_iter() { let submission_status = r.1; + + // 0L TODO: when is backoff submitted when mempool is full. if submission_status.0.code == MempoolStatusCode::MempoolIsFull { + debug!("mempool is full, responding to peer with backoff."); + counters::SELF_REQUEST_BACKOFF.inc(); backoff = true; } if is_txn_retryable(submission_status) { @@ -230,7 +238,7 @@ where if let Ok(sequence_number) = seq_numbers[idx] { if t.sequence_number() == sequence_number { return Some((t, sequence_number)); - } else if t.sequence_number() > sequence_number{ + } else if t.sequence_number() > sequence_number{ // discard transactions that are too new. statuses.push(( t, @@ -381,7 +389,13 @@ pub(crate) async fn process_state_sync_request( counters::mempool_service_latency(counters::COMMIT_STATE_SYNC_LABEL, result, latency); } -pub(crate) async fn process_consensus_request(mempool: &Mutex, req: ConsensusRequest) { + +pub(crate) async fn process_consensus_request( + smp: SharedMempool, + req: ConsensusRequest +) { + debug!("process_consensus_request"); + counters::TASKS_PROCESS_CONSENSUS_REQUEST_EVENT.inc(); // Start latency timer let start_time = Instant::now(); debug!(LogSchema::event_log(LogEntry::Consensus, LogEvent::Received).consensus_msg(&req)); @@ -394,7 +408,7 @@ pub(crate) async fn process_consensus_request(mempool: &Mutex, req: .collect(); let mut txns; { - let mut mempool = mempool.lock(); + let mut mempool = smp.mempool.lock(); // gc before pulling block as extra protection against txns that may expire in consensus // Note: this gc operation relies on the fact that consensus uses the system time to determine block timestamp let curr_time = diem_infallible::duration_since_epoch(); @@ -417,7 +431,7 @@ pub(crate) async fn process_consensus_request(mempool: &Mutex, req: counters::COMMIT_CONSENSUS_LABEL, transactions.len(), ); - reject_txns(mempool, transactions).await; + reject_txns(&smp.mempool, transactions).await; ( ConsensusResponse::CommitResponse(), callback, @@ -437,7 +451,7 @@ pub(crate) async fn process_consensus_request(mempool: &Mutex, req: }; let latency = start_time.elapsed(); - dbg!("mempool_service latency", &latency); + debug!("mempool_service latency: {:?}", &latency); counters::mempool_service_latency(counter_label, result, latency); diff --git a/state-sync/src/coordinator.rs b/state-sync/src/coordinator.rs index 52fca2a802..c9da18e46c 100644 --- a/state-sync/src/coordinator.rs +++ b/state-sync/src/coordinator.rs @@ -145,15 +145,20 @@ impl StateSyncCoordinator { network_handles: Vec<(NodeNetworkId, StateSyncSender, StateSyncEvents)>, ) { info!(LogSchema::new(LogEntry::RuntimeStart)); + + // load duration of state sync tick from configs. let mut interval = IntervalStream::new(interval(Duration::from_millis( self.config.tick_interval_ms, ))) .fuse(); + // iterate over all the state sync senders and collect the stream of network events. let events: Vec<_> = network_handles .into_iter() .map(|(network_id, _sender, events)| events.map(move |e| (network_id.clone(), e))) .collect(); + + // combine/fuse the streams of network events. let mut network_events = select_all(events).fuse(); loop { @@ -162,6 +167,7 @@ impl StateSyncCoordinator { msg = self.client_events.select_next_some() => { match msg { CoordinatorMessage::SyncRequest(request) => { + debug!("sync request"); let _timer = counters::PROCESS_COORDINATOR_MSG_LATENCY .with_label_values(&[counters::SYNC_MSG_LABEL]) .start_timer(); @@ -171,6 +177,8 @@ impl StateSyncCoordinator { } } CoordinatorMessage::CommitNotification(notification) => { + debug!("CommitNotification"); + let _timer = counters::PROCESS_COORDINATOR_MSG_LATENCY .with_label_values(&[counters::COMMIT_MSG_LABEL]) .start_timer(); @@ -180,9 +188,15 @@ impl StateSyncCoordinator { } } CoordinatorMessage::GetSyncState(callback) => { + debug!("GetSyncState"); + counters::GET_SYNC_STATE.inc(); + let _ = self.get_sync_state(callback); } CoordinatorMessage::WaitForInitialization(cb_sender) => { + debug!("WaitForInitialization"); + counters::WAIT_FOR_INIT.inc(); + if let Err(e) = self.wait_for_initialization(cb_sender) { error!(LogSchema::new(LogEntry::Waypoint).error(&e)); } @@ -197,6 +211,9 @@ impl StateSyncCoordinator { } } Event::LostPeer(metadata) => { + debug!("received event lost peer"); + + // prometheus metric is handled in process_lost_peer if let Err(e) = self.process_lost_peer(network_id, metadata.remote_peer_id) { error!(LogSchema::new(LogEntry::LostPeer).error(&e)); } @@ -215,6 +232,7 @@ impl StateSyncCoordinator { } }, _ = interval.select_next_some() => { + // dbg!("statesync tick"); if let Err(e) = self.check_progress() { error!(LogSchema::event_log(LogEntry::ProgressCheck, LogEvent::Fail).error(&e)); } @@ -238,7 +256,15 @@ impl StateSyncCoordinator { network_id: NodeNetworkId, peer_id: PeerId, ) -> Result<(), Error> { + error!("lost peer: {:?}", &peer_id); let peer = PeerNetworkId(network_id, peer_id); + + counters::STATE_SYNC_LOST_PEER.with_label_values(&[ + &peer.raw_network_id().to_string(), + &peer.peer_id().to_string() + ]) + .inc(); + self.request_manager.disable_peer(&peer) } @@ -332,6 +358,8 @@ impl StateSyncCoordinator { if self.is_initialized() { Self::send_initialization_callback(cb_sender)?; } else { + debug!("start statesync initialization listener"); + counters::STATE_SYNC_INIT_LISTENER_STARTED.inc(); self.initialization_listener = Some(cb_sender); } @@ -343,6 +371,9 @@ impl StateSyncCoordinator { /// Note: when processing a sync request, state sync assumes that it's the only one /// modifying storage, i.e., consensus is not trying to commit transactions concurrently. fn process_sync_request(&mut self, request: SyncRequest) -> Result<(), Error> { + debug!("processing state sync request"); + counters::STATE_SYNC_PROCESS_STATE_SYNC_REQUEST.inc(); + fail_point!("state_sync::process_sync_request_message", |_| { Err(crate::error::Error::UnexpectedError( "Injected error in process_sync_request_message".into(), @@ -574,6 +605,7 @@ impl StateSyncCoordinator { ) .await { + debug!("statesync commit to mempool failed with mempool_commit_timeout_ms expired"); counters::COMMIT_FLOW_FAIL .with_label_values(&[counters::FROM_MEMPOOL_LABEL]) .inc(); @@ -657,15 +689,32 @@ impl StateSyncCoordinator { // Verify the chunk request is valid before trying to process it. If it's invalid, // penalize the peer's score. + + // 0L todo: don't penalize validators. Validators should always try to connect to others. + if let Err(error) = self.verify_chunk_request_is_valid(&request) { + debug!("statesync penalizing peer {:?}", &peer); + + counters::STATE_SYNC_PENALIZE_PEER + .with_label_values(&[ + &peer.raw_network_id().to_string(), + &peer.peer_id().to_string() + ]) + .inc(); + self.request_manager.process_invalid_chunk_request(&peer); return Err(error); } match request.target.clone() { + // 0L: thsi is for validators + // uses the max_timeout_ms from config, to put the chunk on a subsripction queue if the local state is not ready to process. TargetType::TargetLedgerInfo(li) => { self.process_request_for_target_and_highest(peer, request, Some(li), None) } + + // 0L: This is for Fullnodes, requesting a highest available + // must include a specific timeout for the request. TargetType::HighestAvailable { target_li, timeout_ms, @@ -744,6 +793,9 @@ impl StateSyncCoordinator { // requestor) add the request to the subscriptions to be handled when this node catches up. let local_version = self.local_state.committed_version(); if local_version <= request.known_version { + debug!("this node local version is less than known_version. Adding req to subscription."); + counters::STATE_SYNC_LOCAL_VERSION_NOT_UP_TO_DATE.inc(); + let expiration_time = SystemTime::now().checked_add(Duration::from_millis(timeout)); if let Some(time) = expiration_time { let request_info = PendingRequestInfo { @@ -1459,16 +1511,26 @@ impl StateSyncCoordinator { /// therefore not write to storage. Reads are still permitted (e.g., to /// handle chunk requests). fn is_consensus_executing(&mut self) -> bool { - self.is_initialized() && self.role == RoleType::Validator && self.sync_request.is_none() + // TODO(0L) this is not actually checking if consensus is executing. + // !!!!!!!!!!!!!! + //////// 0L //////// + // EXPERIMENTAL + // !!!!!!!!!!!!! + // always return false + return false + + // self.is_initialized() && self.role == RoleType::Validator && self.sync_request.is_none() } /// Ensures that state sync is making progress: /// * Kick starts the initial sync process (e.g., syncing to a waypoint or target). /// * Issues a new request if too much time has passed since the last request was sent. fn check_progress(&mut self) -> Result<(), Error> { - if self.is_consensus_executing() { - return Ok(()); // No need to check progress or issue any requests (consensus is running). - } + // dbg!("check_progress"); + // if self.is_consensus_executing() { + // dbg!("is_consensus_executing"); + // return Ok(()); // No need to check progress or issue any requests (consensus is running). + // } // Check if the sync request has timed out (i.e., if we aren't committing fast enough) if let Some(sync_request) = self.sync_request.as_ref() { @@ -1541,6 +1603,7 @@ impl StateSyncCoordinator { chunk_target: TargetType, log_entry: LogEntry, ) -> Result<(), Error> { + // dbg!("sending chunk request"); if let Err(error) = self.send_chunk_request_with_target(known_version, known_epoch, chunk_target) { @@ -1563,7 +1626,11 @@ impl StateSyncCoordinator { known_epoch: u64, target: TargetType, ) -> Result<(), Error> { + debug!("sending chunk request with"); if self.request_manager.no_available_peers() { + debug!("no available peers"); + counters::STATE_SYNC_NO_AVAILABLE_PEERS.inc(); + warn!(LogSchema::event_log( LogEntry::SendChunkRequest, LogEvent::MissingPeers diff --git a/state-sync/src/counters.rs b/state-sync/src/counters.rs index 4e762b5f21..adfe2725f6 100644 --- a/state-sync/src/counters.rs +++ b/state-sync/src/counters.rs @@ -344,3 +344,109 @@ pub static MAIN_LOOP: Lazy = Lazy::new(|| { .unwrap(), ) }); + +//////// 0L //////// + +/// Number of disabled peer events +pub static DISABLE_PEER_EVENT: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "diem_state_sync_disable_peer_event", + "Number of times a disconnection event happened", + &["network"] + ) + .unwrap() +}); + +/// The score of the remote state sync peers. +pub static SYNC_PEER_SCORE: Lazy = Lazy::new(|| { + register_histogram_vec!( + "diem_state_sync_peer_score", + "Score of the peer", + &["network", "peer"] + ) + .unwrap() +}); + +pub static GET_SYNC_STATE: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_state_sync_get_sync_state", + "Number of times sync state was retrieved" + ) + .unwrap() +}); + +pub static WAIT_FOR_INIT: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_state_sync_wait_for_init", + "Number of times sync state was waiting for initialization" + ) + .unwrap() +}); + +pub static STATE_SYNC_INIT_LISTENER_STARTED: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_state_sync_init_listener_started", + "Number of times sync state started the initialization listener" + ) + .unwrap() +}); + +pub static STATE_SYNC_PROCESS_STATE_SYNC_REQUEST: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_state_sync_process_state_sync_request", + "Number of times sync state processed a state sync request" + ) + .unwrap() +}); + +pub static STATE_SYNC_PENALIZE_PEER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "diem_state_sync_penalize_peer", + "Number of times sync state penalized a peer", + &["network", "peer"] + ) + .unwrap() +}); + +pub static STATE_SYNC_LOCAL_VERSION_NOT_UP_TO_DATE: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_state_sync_local_version_not_up_to_date", + "Number of times sync state detected that the local version was not up-to-date" + ) + .unwrap() +}); + +pub static STATE_SYNC_NO_AVAILABLE_PEERS: Lazy = Lazy::new(|| { + register_int_counter!( + "diem_state_sync_no_available_peers", + "Number of times sync state detected no available peers to sync with" + ) + .unwrap() +}); + +pub static STATE_SYNC_VERSION_REQUEST_TIMEOUT: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "diem_state_sync_version_request_timeout", + "Number of times sync state received a timeout while requesting version information", + &["network", "peer"] + ) + .unwrap() +}); + +pub static STATE_SYNC_ADDING_VALIDATOR_TO_STATE_SYNC: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "diem_state_sync_adding_validator_to_state_sync", + "Number of times sync state added a validator to state sync", + &["network", "peer"] + ) + .unwrap() +}); + +pub static STATE_SYNC_LOST_PEER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "diem_state_sync_lost_peer", + "Number of times sync state lost a peer", + &["network", "peer"] + ) + .unwrap() +}); diff --git a/state-sync/src/error.rs b/state-sync/src/error.rs index 5eae7be751..fe15a12772 100644 --- a/state-sync/src/error.rs +++ b/state-sync/src/error.rs @@ -49,7 +49,7 @@ pub enum Error { ReceivedWrongChunkType(String), #[error("Received a oneshot::canceled event as the sender of a channel was dropped: {0}")] SenderDroppedError(String), - #[error("Synced beyond the target version. Synced version: {0}, target version: {1}")] + #[error("Synced beyond the target version. DB already at the version requested. Synced version: {0}, target version: {1}")] //////// 0L //////// SyncedBeyondTarget(Version, Version), #[error("State sync is uninitialized! Error: {0}")] UninitializedError(String), diff --git a/state-sync/src/request_manager.rs b/state-sync/src/request_manager.rs index 7d2ddd6bbb..2370a5ad16 100644 --- a/state-sync/src/request_manager.rs +++ b/state-sync/src/request_manager.rs @@ -143,13 +143,26 @@ impl RequestManager { } pub fn disable_peer(&mut self, peer: &PeerNetworkId) -> Result<(), Error> { + error!("disable_peer {:?}", &peer); + //////// 0L //////// + counters::DISABLE_PEER_EVENT + .with_label_values(&[&peer.raw_network_id().to_string()]) + .inc(); + + counters::SYNC_PEER_SCORE + .with_label_values(&[ + &peer.raw_network_id().to_string(), + &peer.peer_id().to_string(), + ]) + .observe(0.0); + info!(LogSchema::new(LogEntry::LostPeer).peer(&peer)); if self.peer_scores.contains_key(peer) { counters::ACTIVE_UPSTREAM_PEERS .with_label_values(&[&peer.raw_network_id().to_string()]) .dec(); - self.peer_scores.remove(peer); + self.peer_scores.remove(peer); } else { warn!(LogSchema::new(LogEntry::LostPeerNotKnown).peer(&peer)); } @@ -158,11 +171,16 @@ impl RequestManager { } pub fn no_available_peers(&self) -> bool { + if self.peer_scores.is_empty() { + error!("no available peers"); + }; self.peer_scores.is_empty() } fn update_score(&mut self, peer: &PeerNetworkId, update_type: PeerScoreUpdateType) { + if let Some(score) = self.peer_scores.get_mut(peer) { + let old_score = *score; let new_score = match update_type { PeerScoreUpdateType::Success => { @@ -182,6 +200,16 @@ impl RequestManager { } }; *score = new_score; + + counters::SYNC_PEER_SCORE + .with_label_values(&[ + &peer.raw_network_id().to_string(), + &peer.peer_id().to_string(), + ]) + .observe(*score); + + error!("update peer score: {:?} with update_type {:?}, old score: {:?}, new score: {:?}", &peer, &update_type, &old_score, &score); + } } @@ -251,7 +279,7 @@ impl RequestManager { if let Some(network_level) = new_multicast_network_level { self.update_multicast_network_level(network_level, None); } - + debug!("{:?}", &chosen_peers); chosen_peers } @@ -260,6 +288,8 @@ impl RequestManager { let peers = self.pick_peers(); if peers.is_empty() { + debug!("no statesync available peers"); + counters::STATE_SYNC_NO_AVAILABLE_PEERS.inc(); warn!(log.event(LogEvent::MissingPeers)); return Err(Error::NoAvailablePeers( "No peers to send chunk request to".into(), @@ -453,6 +483,7 @@ impl RequestManager { /// Checks whether the request sent with known_version = `version` has timed out /// Returns true if such a request timed out (or does not exist), else false. pub fn has_request_timed_out(&mut self, version: u64) -> Result { + let last_request_time = self.get_last_request_time(version).unwrap_or(UNIX_EPOCH); let timeout = is_timeout(last_request_time, self.request_timeout); @@ -467,7 +498,15 @@ impl RequestManager { return Ok(timeout); } }; + + error!("request timed out, length: {:?}, peers {:?}", &self.request_timeout, &peers_to_penalize); + for peer in peers_to_penalize.iter() { + counters::STATE_SYNC_VERSION_REQUEST_TIMEOUT.with_label_values(&[ + &peer.raw_network_id().to_string(), + &peer.peer_id().to_string() + ]) + .inc(); self.update_score(peer, PeerScoreUpdateType::TimeOut); } @@ -893,6 +932,14 @@ mod tests { validator: &PeerNetworkId, peer_role: PeerRole, ) { + debug!("adding validator to state sync: {:?}", &validator); + counters::STATE_SYNC_ADDING_VALIDATOR_TO_STATE_SYNC + .with_label_values(&[ + &validator.raw_network_id().to_string(), + &validator.peer_id().to_string() + ]) + .inc(); + let connection_metadata = ConnectionMetadata::mock_with_role_and_origin( validator.peer_id(), peer_role,