Skip to content

Commit

Permalink
refactor: PohRecorderMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jstarry committed Feb 13, 2025
1 parent 0006e6e commit 5aaf49b
Showing 1 changed file with 71 additions and 63 deletions.
134 changes: 71 additions & 63 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,54 @@ pub enum PohLeaderStatus {
Reached { poh_slot: Slot, parent_slot: Slot },
}

/// Timing and event counters accumulated by `PohRecorder` between metric
/// reports. `report` emits them as a `poh_recorder` datapoint and zeroes them.
/// Fields with a `_us` suffix are sums of `measure_us!` results (presumably
/// microseconds, per the naming).
struct PohRecorderMetrics {
/// Time spent in `flush_cache(true)` calls.
flush_cache_tick_us: u64,
/// Time spent in `flush_cache(false)` calls.
flush_cache_no_tick_us: u64,
/// Time spent in `Poh::record` while holding the PoH lock.
record_us: u64,
/// Time spent acquiring the PoH mutex before recording a mixin.
record_lock_contention_us: u64,
/// Time spent inside the metrics `report` call itself.
report_metrics_us: u64,
/// Time spent sending a recorded entry over the working bank sender.
send_entry_us: u64,
/// Time attributed to PoH lock contention on the tick path.
/// NOTE(review): the measured region is only partly visible here; confirm
/// whether it also covers producing the tick entry.
tick_lock_contention_us: u64,
/// Number of ticks triggered from the record path because the mixin could
/// not be recorded before a tick was due.
ticks_from_record: u64,
/// Time spent waiting for the tick's target time.
total_sleep_us: u64,
/// When the counters were last reported (or when the struct was created).
last_metric: Instant,
}

impl PohRecorderMetrics {
    /// Emits the accumulated counters as a `poh_recorder` datapoint tagged
    /// with `bank_slot`, then zeroes every counter and restarts the report
    /// timer.
    ///
    /// Does nothing unless more than 1000 whole milliseconds have elapsed
    /// since the previous report.
    fn report(&mut self, bank_slot: Slot) {
        // Rate limit: at most one datapoint per reporting interval.
        if self.last_metric.elapsed().as_millis() <= 1000 {
            return;
        }

        datapoint_info!(
            "poh_recorder",
            ("slot", bank_slot, i64),
            ("flush_cache_tick_us", self.flush_cache_tick_us, i64),
            ("flush_cache_no_tick_us", self.flush_cache_no_tick_us, i64),
            ("record_us", self.record_us, i64),
            (
                "record_lock_contention_us",
                self.record_lock_contention_us,
                i64
            ),
            ("report_metrics_us", self.report_metrics_us, i64),
            ("send_entry_us", self.send_entry_us, i64),
            ("tick_lock_contention", self.tick_lock_contention_us, i64),
            ("ticks_from_record", self.ticks_from_record, i64),
            ("total_sleep_us", self.total_sleep_us, i64),
        );

        // Begin a fresh accumulation window: all counters back to zero and
        // the interval timer restarted from now.
        *self = Self {
            flush_cache_tick_us: 0,
            flush_cache_no_tick_us: 0,
            record_us: 0,
            record_lock_contention_us: 0,
            report_metrics_us: 0,
            send_entry_us: 0,
            tick_lock_contention_us: 0,
            ticks_from_record: 0,
            total_sleep_us: 0,
            last_metric: Instant::now(),
        };
    }
}

pub struct PohRecorder {
pub poh: Arc<Mutex<Poh>>,
tick_height: u64,
Expand All @@ -260,16 +308,7 @@ pub struct PohRecorder {
leader_schedule_cache: Arc<LeaderScheduleCache>,
ticks_per_slot: u64,
target_ns_per_tick: u64,
record_lock_contention_us: u64,
flush_cache_no_tick_us: u64,
flush_cache_tick_us: u64,
send_entry_us: u64,
tick_lock_contention_us: u64,
total_sleep_us: u64,
record_us: u64,
report_metrics_us: u64,
ticks_from_record: u64,
last_metric: Instant,
metrics: PohRecorderMetrics,
record_sender: Sender<Record>,
leader_bank_notifier: Arc<LeaderBankNotifier>,
delay_leader_block_for_pending_fork: bool,
Expand Down Expand Up @@ -359,16 +398,18 @@ impl PohRecorder {
leader_schedule_cache: leader_schedule_cache.clone(),
ticks_per_slot,
target_ns_per_tick,
record_lock_contention_us: 0,
flush_cache_tick_us: 0,
flush_cache_no_tick_us: 0,
send_entry_us: 0,
tick_lock_contention_us: 0,
record_us: 0,
report_metrics_us: 0,
total_sleep_us: 0,
ticks_from_record: 0,
last_metric: Instant::now(),
metrics: PohRecorderMetrics {
flush_cache_tick_us: 0,
flush_cache_no_tick_us: 0,
record_us: 0,
record_lock_contention_us: 0,
report_metrics_us: 0,
send_entry_us: 0,
tick_lock_contention_us: 0,
ticks_from_record: 0,
total_sleep_us: 0,
last_metric: Instant::now(),
},
record_sender,
leader_bank_notifier: Arc::default(),
delay_leader_block_for_pending_fork,
Expand Down Expand Up @@ -415,12 +456,12 @@ impl PohRecorder {
// cannot be generated by `record()`
assert!(!transactions.is_empty(), "No transactions provided");

let ((), report_metrics_us) = measure_us!(self.report_metrics(bank_slot));
self.report_metrics_us += report_metrics_us;
let ((), report_metrics_us) = measure_us!(self.metrics.report(bank_slot));
self.metrics.report_metrics_us += report_metrics_us;

loop {
let (flush_cache_res, flush_cache_us) = measure_us!(self.flush_cache(false));
self.flush_cache_no_tick_us += flush_cache_us;
self.metrics.flush_cache_no_tick_us += flush_cache_us;
flush_cache_res?;

let working_bank = self
Expand All @@ -432,10 +473,10 @@ impl PohRecorder {
}

let (mut poh_lock, poh_lock_us) = measure_us!(self.poh.lock().unwrap());
self.record_lock_contention_us += poh_lock_us;
self.metrics.record_lock_contention_us += poh_lock_us;

let (record_mixin_res, record_mixin_us) = measure_us!(poh_lock.record(mixin));
self.record_us += record_mixin_us;
self.metrics.record_us += record_mixin_us;

drop(poh_lock);

Expand All @@ -451,7 +492,7 @@ impl PohRecorder {
self.working_bank_sender
.send((bank_clone, (entry, self.tick_height)))
});
self.send_entry_us += send_entry_us;
self.metrics.send_entry_us += send_entry_us;
send_entry_res?;
let starting_transaction_index =
working_bank.transaction_index.inspect(|transaction_index| {
Expand All @@ -464,7 +505,7 @@ impl PohRecorder {

// record() might fail if the next PoH hash needs to be a tick. But that's ok, tick()
// and re-record()
self.ticks_from_record += 1;
self.metrics.ticks_from_record += 1;
self.tick();
}
}
Expand All @@ -480,7 +521,7 @@ impl PohRecorder {
};
(poh_entry, target_time)
});
self.tick_lock_contention_us += tick_lock_contention_us;
self.metrics.tick_lock_contention_us += tick_lock_contention_us;

if let Some(poh_entry) = poh_entry {
self.tick_height += 1;
Expand All @@ -501,7 +542,7 @@ impl PohRecorder {
));

let (_flush_res, flush_cache_and_tick_us) = measure_us!(self.flush_cache(true));
self.flush_cache_tick_us += flush_cache_and_tick_us;
self.metrics.flush_cache_tick_us += flush_cache_and_tick_us;

let (_, sleep_us) = measure_us!({
let target_time = target_time.unwrap();
Expand All @@ -512,7 +553,7 @@ impl PohRecorder {
std::hint::spin_loop();
}
});
self.total_sleep_us += sleep_us;
self.metrics.total_sleep_us += sleep_us;
}
}

Expand Down Expand Up @@ -1052,39 +1093,6 @@ impl PohRecorder {
self.report_poh_timing_point_by_tick()
}
}

/// Emits the accumulated PoH recorder counters as a `poh_recorder` datapoint
/// tagged with `bank_slot`, then zeroes them and restarts the report timer.
///
/// Does nothing unless more than 1000 whole milliseconds have elapsed since
/// the previous report.
fn report_metrics(&mut self, bank_slot: Slot) {
    // Rate limit: at most one datapoint per reporting interval.
    if self.last_metric.elapsed().as_millis() <= 1000 {
        return;
    }

    datapoint_info!(
        "poh_recorder",
        ("slot", bank_slot, i64),
        ("tick_lock_contention", self.tick_lock_contention_us, i64),
        ("record_us", self.record_us, i64),
        ("flush_cache_no_tick_us", self.flush_cache_no_tick_us, i64),
        ("flush_cache_tick_us", self.flush_cache_tick_us, i64),
        ("send_entry_us", self.send_entry_us, i64),
        ("ticks_from_record", self.ticks_from_record, i64),
        ("total_sleep_us", self.total_sleep_us, i64),
        (
            "record_lock_contention_us",
            self.record_lock_contention_us,
            i64
        ),
        ("report_metrics_us", self.report_metrics_us, i64),
    );

    // Reset every counter in the same order it was reported above.
    self.tick_lock_contention_us = 0;
    self.record_us = 0;
    self.flush_cache_no_tick_us = 0;
    self.flush_cache_tick_us = 0;
    self.send_entry_us = 0;
    self.ticks_from_record = 0;
    self.total_sleep_us = 0;
    self.record_lock_contention_us = 0;
    self.report_metrics_us = 0;

    // Restart the reporting interval.
    self.last_metric = Instant::now();
}
}

fn do_create_test_recorder(
Expand Down

0 comments on commit 5aaf49b

Please sign in to comment.