From 8479ef4c2f9ca7be1dcccdd247c5de72dde8b78a Mon Sep 17 00:00:00 2001 From: Dmitriy Khomitskiy Date: Mon, 26 Aug 2024 08:59:40 -0500 Subject: [PATCH] RPC-456 fixing several recommendations from PR --- src/priority_fee.rs | 97 ++++++++++++++++++++++++--------------------- src/rpc_server.rs | 52 ++++++++++++------------ 2 files changed, 76 insertions(+), 73 deletions(-) diff --git a/src/priority_fee.rs b/src/priority_fee.rs index c94b88e..4ec6558 100644 --- a/src/priority_fee.rs +++ b/src/priority_fee.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::ops::Sub; - use cadence_macros::statsd_count; use cadence_macros::statsd_gauge; use dashmap::DashMap; @@ -11,7 +10,6 @@ use solana_program_runtime::compute_budget::ComputeBudget; use solana_sdk::instruction::CompiledInstruction; use solana_sdk::{pubkey::Pubkey, slot_history::Slot}; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::time::timeout; use tracing::{error, warn}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; @@ -209,28 +207,31 @@ impl PriorityFeeTracker { } fn poll_fees(&self, mut sampling_rxn: Receiver<(Vec, bool, Option)>) { - let priority_fee_tracker = self.clone(); - tokio::spawn(async move { - let mut start = Instant::now(); - loop { - let elapsed = start.elapsed().as_millis(); - let remaining_time = 1_000u128.saturating_sub(elapsed); - match timeout(Duration::from_millis(remaining_time as u64), sampling_rxn.recv()).await - { - Ok(res) => { - match res { - Some((accounts, include_vote, lookback_period)) => - priority_fee_tracker.record_specific_fees(accounts, include_vote, lookback_period), - _ => {}, - } + { + let priority_fee_tracker = self.clone(); + // task to run global fee comparison every 1 second + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(1_000)).await; + priority_fee_tracker.record_general_fees(); + } + }); + } + + { + let priority_fee_tracker = self.clone(); + // task to poll the queue and run comparison to see what is the diff between algos + tokio::spawn(async move { + loop { + match sampling_rxn.recv().await + { + Some((accounts, include_vote, lookback_period)) => + priority_fee_tracker.record_specific_fees(accounts, include_vote, lookback_period), + _ => {}, } - Err(_) => { - priority_fee_tracker.record_general_fees(); - start = Instant::now(); - }, }; - } - }); + }); + } } fn record_general_fees(&self) { @@ -270,41 +271,41 @@ impl PriorityFeeTracker { fn record_specific_fees(&self, accounts: Vec, include_vote: bool, lookback_period: Option) { - let global_fees1 = self.calculation1(&accounts, include_vote, &lookback_period); - let global_fees2 = self.calculation2(&accounts, include_vote, &lookback_period); + let old_fee = self.calculation1(&accounts, include_vote, &lookback_period); + let new_fee = self.calculation2(&accounts, include_vote, &lookback_period); statsd_gauge!( "min_priority_fee_diff", - global_fees1.min.sub(global_fees2.min), + new_fee.min.sub(old_fee.min), "account" => "none" ); statsd_gauge!("low_priority_fee_diff", - global_fees1.low.sub(global_fees2.low) as u64, + new_fee.low.sub(old_fee.low) as u64, "account" => "none" ); statsd_gauge!( "medium_priority_fee_diff", - global_fees1.medium.sub(global_fees2.medium) as u64, + new_fee.medium.sub(old_fee.medium) as u64, "account" => "none" ); statsd_gauge!( "high_priority_fee_diff", - global_fees1.high.sub(global_fees2.high) as u64, + new_fee.high.sub(old_fee.high) as u64, "account" => "none" ); statsd_gauge!( "very_high_priority_fee_diff", - global_fees1.very_high.sub(global_fees2.very_high) as u64, + new_fee.very_high.sub(old_fee.very_high) as u64, "account" => "none" ); statsd_gauge!( "unsafe_max_priority_fee_diff", - global_fees1.unsafe_max.sub(global_fees2.unsafe_max) as u64, + new_fee.unsafe_max.sub(old_fee.unsafe_max) as u64, "account" => "none" ); statsd_gauge!( "recommended_priority_fee_diff", - get_recommended_fee(global_fees1).sub(get_recommended_fee(global_fees2)) as u64, + get_recommended_fee(new_fee).sub(get_recommended_fee(old_fee)) as u64, "account" => "none" ); } @@ -439,7 +440,8 @@ impl PriorityFeeTracker { fees.extend_from_slice(&slot_priority_fees.fees.non_vote_fees); } } - estimate_max_values(&mut fees, &mut micro_lamport_priority_fee_estimates); + micro_lamport_priority_fee_estimates = estimate_max_values(&mut fees, + micro_lamport_priority_fee_estimates); for account in accounts { fees.clear(); @@ -457,7 +459,8 @@ impl PriorityFeeTracker { } } } - estimate_max_values(&mut fees, &mut micro_lamport_priority_fee_estimates); + micro_lamport_priority_fee_estimates = estimate_max_values(&mut fees, + micro_lamport_priority_fee_estimates); } micro_lamport_priority_fee_estimates } @@ -465,20 +468,22 @@ impl PriorityFeeTracker { fn estimate_max_values( mut fees: &mut Vec, - micro_lamport_priority_fee_estimates: &mut MicroLamportPriorityFeeEstimates, -) { - micro_lamport_priority_fee_estimates.min = + micro_lamport_priority_fee_estimates: MicroLamportPriorityFeeEstimates, +) -> MicroLamportPriorityFeeEstimates { + let mut estimate = MicroLamportPriorityFeeEstimates::default(); + estimate.min = percentile(&mut fees, 0).max(micro_lamport_priority_fee_estimates.min); - micro_lamport_priority_fee_estimates.low = + estimate.low = percentile(&mut fees, 25).max(micro_lamport_priority_fee_estimates.low); - micro_lamport_priority_fee_estimates.medium = + estimate.medium = percentile(&mut fees, 50).max(micro_lamport_priority_fee_estimates.medium); - micro_lamport_priority_fee_estimates.high = + estimate.high = percentile(&mut fees, 75).max(micro_lamport_priority_fee_estimates.high); - micro_lamport_priority_fee_estimates.very_high = + estimate.very_high = percentile(&mut fees, 95).max(micro_lamport_priority_fee_estimates.very_high); - micro_lamport_priority_fee_estimates.unsafe_max = + estimate.unsafe_max = percentile(&mut fees, 100).max(micro_lamport_priority_fee_estimates.unsafe_max); + estimate } fn max(a: f64, b: f64) -> f64 { @@ -572,7 +577,7 @@ mod tests { set_global_default(client) } - #[tokio::test(flavor = "current_thread")] + #[tokio::test] async fn test_specific_fee_estimates() { init_metrics(); let tracker = PriorityFeeTracker::new(10); @@ -629,7 +634,7 @@ mod tests { assert_eq!(estimates.unsafe_max, expected_max_fee); } - #[tokio::test(flavor = "current_thread")] + #[tokio::test] async fn test_with_many_slots() { init_metrics(); let tracker = PriorityFeeTracker::new(101); @@ -686,7 +691,7 @@ mod tests { assert_eq!(estimates.unsafe_max, expected_max_fee); } - #[tokio::test(flavor = "current_thread")] + #[tokio::test] async fn test_with_many_slots_broken() { // same test as above but with an extra slot to throw off the value init_metrics(); @@ -736,7 +741,7 @@ mod tests { assert_ne!(estimates.very_high, expected_very_high_fee); } - #[tokio::test(flavor = "current_thread")] + #[tokio::test] async fn test_with_transactions_for_different_accounts() { // same test as above but with an extra slot to throw off the value init_metrics(); @@ -814,7 +819,7 @@ mod tests { assert_eq!(estimates.unsafe_max, expected_max_fee); } - #[tokio::test(flavor = "current_thread")] + #[tokio::test] async fn test_with_multiple_transactions_for_different_slots() { // same test as above but with an extra slot to throw off the value init_metrics(); diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 79c783a..db8a523 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -253,36 +253,36 @@ impl AtlasPriorityFeeEstimatorRpcServer for AtlasPriorityFeeEstimator { &self, get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest, ) -> RpcResult { - let algo = |accounts: Vec, - include_vote: bool, - lookback_period: Option| - -> MicroLamportPriorityFeeEstimates { - self.priority_fee_tracker.get_priority_fee_estimates( - accounts, - include_vote, - lookback_period, - true - ) - }; - self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo) + let algo_run_fn = |accounts: Vec, + include_vote: bool, + lookback_period: Option| + -> MicroLamportPriorityFeeEstimates { + self.priority_fee_tracker.get_priority_fee_estimates( + accounts, + include_vote, + lookback_period, + true, + ) + }; + self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo_run_fn) } fn get_test_priority_fee_estimate( &self, get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest, ) -> RpcResult { - let algo = |accounts: Vec, - include_vote: bool, - lookback_period: Option| - -> MicroLamportPriorityFeeEstimates { + let algo_run_fn = |accounts: Vec, + include_vote: bool, + lookback_period: Option| + -> MicroLamportPriorityFeeEstimates { self.priority_fee_tracker.get_priority_fee_estimates( accounts, include_vote, lookback_period, - false + false, ) }; - self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo) + self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo_run_fn) } } @@ -300,13 +300,11 @@ impl AtlasPriorityFeeEstimator { server } - fn execute_priority_fee_estimate_coordinator( + fn execute_priority_fee_estimate_coordinator( &self, get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest, - algo_fn: F, + priority_fee_calc_fn: impl FnOnce(Vec, bool, Option) -> MicroLamportPriorityFeeEstimates, ) -> RpcResult - where - F: Fn(Vec, bool, Option) -> MicroLamportPriorityFeeEstimates, { let options = get_priority_fee_estimate_request.options.clone(); let reason = validate_get_priority_fee_estimate_request(&get_priority_fee_estimate_request); @@ -329,7 +327,7 @@ impl AtlasPriorityFeeEstimator { } } let include_vote = should_include_vote(&options); - let priority_fee_levels = algo_fn(accounts, include_vote, lookback_slots); + let priority_fee_levels = priority_fee_calc_fn(accounts, include_vote, lookback_slots); if let Some(options) = options.clone() { if options.include_all_priority_fee_levels == Some(true) { return Ok(GetPriorityFeeEstimateResponse { @@ -407,7 +405,7 @@ mod tests { use solana_sdk::pubkey::Pubkey; use std::sync::Arc; - #[tokio::test(flavor = "current_thread")] + #[tokio::test] async fn test_calculating_fees_with_all_options_none() { prep_statsd(); @@ -432,7 +430,7 @@ mod tests { assert!(resp.priority_fee_levels.is_none()); } - #[tokio::test(flavor = "current_thread")] + #[tokio::test] async fn test_calculating_fees_with_no_options() { prep_statsd(); @@ -456,7 +454,7 @@ mod tests { assert!(resp.priority_fee_levels.is_none()); } - #[tokio::test(flavor = "current_thread")] + #[tokio::test] async fn test_calculating_all_fees() { prep_statsd(); @@ -490,7 +488,7 @@ mod tests { assert_eq!(levels.unsafe_max, 200.0); assert!(resp.priority_fee_estimate.is_none()); } - #[tokio::test(flavor = "current_thread")] + #[tokio::test] async fn test_calculating_recommended_given_very_low_calculated_fee() { prep_statsd();