Skip to content

Commit

Permalink
RPC-456 fixing several recommendations from PR
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitriy-helius committed Aug 26, 2024
1 parent 5114449 commit 8479ef4
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 73 deletions.
97 changes: 51 additions & 46 deletions src/priority_fee.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::ops::Sub;

use cadence_macros::statsd_count;
use cadence_macros::statsd_gauge;
use dashmap::DashMap;
Expand All @@ -11,7 +10,6 @@ use solana_program_runtime::compute_budget::ComputeBudget;
use solana_sdk::instruction::CompiledInstruction;
use solana_sdk::{pubkey::Pubkey, slot_history::Slot};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::timeout;
use tracing::{error, warn};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
Expand Down Expand Up @@ -209,28 +207,31 @@ impl PriorityFeeTracker {
}

fn poll_fees(&self, mut sampling_rxn: Receiver<(Vec<Pubkey>, bool, Option<u32>)>) {
let priority_fee_tracker = self.clone();
tokio::spawn(async move {
let mut start = Instant::now();
loop {
let elapsed = start.elapsed().as_millis();
let remaining_time = 1_000u128.saturating_sub(elapsed);
match timeout(Duration::from_millis(remaining_time as u64), sampling_rxn.recv()).await
{
Ok(res) => {
match res {
Some((accounts, include_vote, lookback_period)) =>
priority_fee_tracker.record_specific_fees(accounts, include_vote, lookback_period),
_ => {},
}
{
let priority_fee_tracker = self.clone();
// task to run global fee comparison every 1 second
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(1_000)).await;
priority_fee_tracker.record_general_fees();
}
});
}

{
let priority_fee_tracker = self.clone();
// task to poll the queue and run comparison to see what is the diff between algos
tokio::spawn(async move {
loop {
match sampling_rxn.recv().await
{
Some((accounts, include_vote, lookback_period)) =>
priority_fee_tracker.record_specific_fees(accounts, include_vote, lookback_period),
_ => {},
}
Err(_) => {
priority_fee_tracker.record_general_fees();
start = Instant::now();
},
};
}
});
});
}
}

fn record_general_fees(&self) {
Expand Down Expand Up @@ -270,41 +271,41 @@ impl PriorityFeeTracker {

fn record_specific_fees(&self, accounts: Vec<Pubkey>, include_vote: bool, lookback_period: Option<u32>)
{
let global_fees1 = self.calculation1(&accounts, include_vote, &lookback_period);
let global_fees2 = self.calculation2(&accounts, include_vote, &lookback_period);
let old_fee = self.calculation1(&accounts, include_vote, &lookback_period);
let new_fee = self.calculation2(&accounts, include_vote, &lookback_period);

statsd_gauge!(
"min_priority_fee_diff",
global_fees1.min.sub(global_fees2.min),
new_fee.min.sub(old_fee.min),
"account" => "none"
);
statsd_gauge!("low_priority_fee_diff",
global_fees1.low.sub(global_fees2.low) as u64,
new_fee.low.sub(old_fee.low) as u64,
"account" => "none"
);
statsd_gauge!(
"medium_priority_fee_diff",
global_fees1.medium.sub(global_fees2.medium) as u64,
new_fee.medium.sub(old_fee.medium) as u64,
"account" => "none"
);
statsd_gauge!(
"high_priority_fee_diff",
global_fees1.high.sub(global_fees2.high) as u64,
new_fee.high.sub(old_fee.high) as u64,
"account" => "none"
);
statsd_gauge!(
"very_high_priority_fee_diff",
global_fees1.very_high.sub(global_fees2.very_high) as u64,
new_fee.very_high.sub(old_fee.very_high) as u64,
"account" => "none"
);
statsd_gauge!(
"unsafe_max_priority_fee_diff",
global_fees1.unsafe_max.sub(global_fees2.unsafe_max) as u64,
new_fee.unsafe_max.sub(old_fee.unsafe_max) as u64,
"account" => "none"
);
statsd_gauge!(
"recommended_priority_fee_diff",
get_recommended_fee(global_fees1).sub(get_recommended_fee(global_fees2)) as u64,
get_recommended_fee(new_fee).sub(get_recommended_fee(old_fee)) as u64,
"account" => "none"
);
}
Expand Down Expand Up @@ -439,7 +440,8 @@ impl PriorityFeeTracker {
fees.extend_from_slice(&slot_priority_fees.fees.non_vote_fees);
}
}
estimate_max_values(&mut fees, &mut micro_lamport_priority_fee_estimates);
micro_lamport_priority_fee_estimates = estimate_max_values(&mut fees,
micro_lamport_priority_fee_estimates);

for account in accounts {
fees.clear();
Expand All @@ -457,28 +459,31 @@ impl PriorityFeeTracker {
}
}
}
estimate_max_values(&mut fees, &mut micro_lamport_priority_fee_estimates);
micro_lamport_priority_fee_estimates = estimate_max_values(&mut fees,
micro_lamport_priority_fee_estimates);
}
micro_lamport_priority_fee_estimates
}
}

fn estimate_max_values(
mut fees: &mut Vec<f64>,
micro_lamport_priority_fee_estimates: &mut MicroLamportPriorityFeeEstimates,
) {
micro_lamport_priority_fee_estimates.min =
micro_lamport_priority_fee_estimates: MicroLamportPriorityFeeEstimates,
) -> MicroLamportPriorityFeeEstimates {
let mut estimate = MicroLamportPriorityFeeEstimates::default();
estimate.min =
percentile(&mut fees, 0).max(micro_lamport_priority_fee_estimates.min);
micro_lamport_priority_fee_estimates.low =
estimate.low =
percentile(&mut fees, 25).max(micro_lamport_priority_fee_estimates.low);
micro_lamport_priority_fee_estimates.medium =
estimate.medium =
percentile(&mut fees, 50).max(micro_lamport_priority_fee_estimates.medium);
micro_lamport_priority_fee_estimates.high =
estimate.high =
percentile(&mut fees, 75).max(micro_lamport_priority_fee_estimates.high);
micro_lamport_priority_fee_estimates.very_high =
estimate.very_high =
percentile(&mut fees, 95).max(micro_lamport_priority_fee_estimates.very_high);
micro_lamport_priority_fee_estimates.unsafe_max =
estimate.unsafe_max =
percentile(&mut fees, 100).max(micro_lamport_priority_fee_estimates.unsafe_max);
estimate
}

fn max(a: f64, b: f64) -> f64 {
Expand Down Expand Up @@ -572,7 +577,7 @@ mod tests {
set_global_default(client)
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test]
async fn test_specific_fee_estimates() {
init_metrics();
let tracker = PriorityFeeTracker::new(10);
Expand Down Expand Up @@ -629,7 +634,7 @@ mod tests {
assert_eq!(estimates.unsafe_max, expected_max_fee);
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test]
async fn test_with_many_slots() {
init_metrics();
let tracker = PriorityFeeTracker::new(101);
Expand Down Expand Up @@ -686,7 +691,7 @@ mod tests {
assert_eq!(estimates.unsafe_max, expected_max_fee);
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test]
async fn test_with_many_slots_broken() {
// same test as above but with an extra slot to throw off the value
init_metrics();
Expand Down Expand Up @@ -736,7 +741,7 @@ mod tests {
assert_ne!(estimates.very_high, expected_very_high_fee);
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test]
async fn test_with_transactions_for_different_accounts() {
// same test as above but with an extra slot to throw off the value
init_metrics();
Expand Down Expand Up @@ -814,7 +819,7 @@ mod tests {
assert_eq!(estimates.unsafe_max, expected_max_fee);
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test]
async fn test_with_multiple_transactions_for_different_slots() {
// same test as above but with an extra slot to throw off the value
init_metrics();
Expand Down
52 changes: 25 additions & 27 deletions src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,36 +253,36 @@ impl AtlasPriorityFeeEstimatorRpcServer for AtlasPriorityFeeEstimator {
&self,
get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest,
) -> RpcResult<GetPriorityFeeEstimateResponse> {
let algo = |accounts: Vec<Pubkey>,
include_vote: bool,
lookback_period: Option<u32>|
-> MicroLamportPriorityFeeEstimates {
self.priority_fee_tracker.get_priority_fee_estimates(
accounts,
include_vote,
lookback_period,
true
)
};
self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo)
let algo_run_fn = |accounts: Vec<Pubkey>,
include_vote: bool,
lookback_period: Option<u32>|
-> MicroLamportPriorityFeeEstimates {
self.priority_fee_tracker.get_priority_fee_estimates(
accounts,
include_vote,
lookback_period,
true,
)
};
self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo_run_fn)
}

fn get_test_priority_fee_estimate(
&self,
get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest,
) -> RpcResult<GetPriorityFeeEstimateResponse> {
let algo = |accounts: Vec<Pubkey>,
include_vote: bool,
lookback_period: Option<u32>|
-> MicroLamportPriorityFeeEstimates {
let algo_run_fn = |accounts: Vec<Pubkey>,
include_vote: bool,
lookback_period: Option<u32>|
-> MicroLamportPriorityFeeEstimates {
self.priority_fee_tracker.get_priority_fee_estimates(
accounts,
include_vote,
lookback_period,
false
false,
)
};
self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo)
self.execute_priority_fee_estimate_coordinator(get_priority_fee_estimate_request, algo_run_fn)
}
}

Expand All @@ -300,13 +300,11 @@ impl AtlasPriorityFeeEstimator {
server
}

fn execute_priority_fee_estimate_coordinator<F>(
fn execute_priority_fee_estimate_coordinator(
&self,
get_priority_fee_estimate_request: GetPriorityFeeEstimateRequest,
algo_fn: F,
priority_fee_calc_fn: impl FnOnce(Vec<Pubkey>, bool, Option<u32>) -> MicroLamportPriorityFeeEstimates,
) -> RpcResult<GetPriorityFeeEstimateResponse>
where
F: Fn(Vec<Pubkey>, bool, Option<u32>) -> MicroLamportPriorityFeeEstimates,
{
let options = get_priority_fee_estimate_request.options.clone();
let reason = validate_get_priority_fee_estimate_request(&get_priority_fee_estimate_request);
Expand All @@ -329,7 +327,7 @@ impl AtlasPriorityFeeEstimator {
}
}
let include_vote = should_include_vote(&options);
let priority_fee_levels = algo_fn(accounts, include_vote, lookback_slots);
let priority_fee_levels = priority_fee_calc_fn(accounts, include_vote, lookback_slots);
if let Some(options) = options.clone() {
if options.include_all_priority_fee_levels == Some(true) {
return Ok(GetPriorityFeeEstimateResponse {
Expand Down Expand Up @@ -407,7 +405,7 @@ mod tests {
use solana_sdk::pubkey::Pubkey;
use std::sync::Arc;

#[tokio::test(flavor = "current_thread")]
#[tokio::test]
async fn test_calculating_fees_with_all_options_none() {
prep_statsd();

Expand All @@ -432,7 +430,7 @@ mod tests {
assert!(resp.priority_fee_levels.is_none());
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test]
async fn test_calculating_fees_with_no_options() {
prep_statsd();

Expand All @@ -456,7 +454,7 @@ mod tests {
assert!(resp.priority_fee_levels.is_none());
}

#[tokio::test(flavor = "current_thread")]
#[tokio::test]
async fn test_calculating_all_fees() {
prep_statsd();

Expand Down Expand Up @@ -490,7 +488,7 @@ mod tests {
assert_eq!(levels.unsafe_max, 200.0);
assert!(resp.priority_fee_estimate.is_none());
}
#[tokio::test(flavor = "current_thread")]
#[tokio::test]
async fn test_calculating_recommended_given_very_low_calculated_fee() {
prep_statsd();

Expand Down

0 comments on commit 8479ef4

Please sign in to comment.