Skip to content

Commit

Permalink
Move block_cost_limit tracking to BankingStage in preparation for SIM…
Browse files Browse the repository at this point in the history
  • Loading branch information
buffalu authored Jan 29, 2025
1 parent a1a3779 commit c74444a
Show file tree
Hide file tree
Showing 18 changed files with 238 additions and 425 deletions.
1 change: 1 addition & 0 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ fn main() {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// This is so that the signal_receiver does not go out of scope after the closure.
Expand Down
2 changes: 2 additions & 0 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&mut transaction_buffer,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
&|_| 0,
);
});

Expand Down Expand Up @@ -327,6 +328,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let chunk_len = verified.len() / CHUNKS;
Expand Down
1 change: 1 addition & 0 deletions core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz
&bank,
transaction_iter.next().unwrap(),
0,
&|_| 0,
);
assert!(summary
.execute_and_commit_transactions_output
Expand Down
1 change: 1 addition & 0 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ impl BankingSimulator {
false,
collections::HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let (&_slot, &raw_base_event_time) = freeze_time_by_slot
Expand Down
23 changes: 21 additions & 2 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use {
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
Expand Down Expand Up @@ -366,6 +366,8 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
// callback function for compute space reservation for BundleStage
block_cost_limit_block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
Self::new_num_threads(
block_production_method,
Expand All @@ -384,6 +386,7 @@ impl BankingStage {
enable_forwarding,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_block_cost_limit_reservation_cb,
)
}

Expand All @@ -405,6 +408,7 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
match block_production_method {
BlockProductionMethod::CentralScheduler => Self::new_central_scheduler(
Expand All @@ -423,6 +427,7 @@ impl BankingStage {
enable_forwarding,
blacklisted_accounts,
bundle_account_locker,
block_cost_limit_reservation_cb,
),
}
}
Expand All @@ -444,6 +449,7 @@ impl BankingStage {
enable_forwarding: bool,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
Expand Down Expand Up @@ -492,6 +498,7 @@ impl BankingStage {
),
blacklisted_accounts.clone(),
bundle_account_locker.clone(),
block_cost_limit_reservation_cb.clone(),
));
}

Expand Down Expand Up @@ -521,11 +528,12 @@ impl BankingStage {
);

worker_metrics.push(consume_worker.metrics_handle());
let cb = block_cost_limit_reservation_cb.clone();
bank_thread_hdls.push(
Builder::new()
.name(format!("solCoWorker{id:02}"))
.spawn(move || {
let _ = consume_worker.run();
let _ = consume_worker.run(cb);
})
.unwrap(),
)
Expand Down Expand Up @@ -589,6 +597,7 @@ impl BankingStage {
unprocessed_transaction_storage: UnprocessedTransactionStorage,
blacklisted_accounts: HashSet<Pubkey>,
bundle_account_locker: BundleAccountLocker,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64 + Clone + Send + 'static,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let consumer = Consumer::new(
Expand All @@ -610,6 +619,7 @@ impl BankingStage {
&consumer,
id,
unprocessed_transaction_storage,
block_cost_limit_reservation_cb,
)
})
.unwrap()
Expand All @@ -623,6 +633,7 @@ impl BankingStage {
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
block_cost_limit_reservation_cb: &impl Fn(&Bank) -> u64,
) {
if unprocessed_transaction_storage.should_not_process() {
return;
Expand All @@ -648,6 +659,7 @@ impl BankingStage {
unprocessed_transaction_storage,
banking_stage_stats,
slot_metrics_tracker,
block_cost_limit_reservation_cb
));
slot_metrics_tracker
.increment_consume_buffered_packets_us(consume_buffered_packets_us);
Expand Down Expand Up @@ -686,6 +698,7 @@ impl BankingStage {
consumer: &Consumer,
id: u32,
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
block_cost_limit_reservation_cb: impl Fn(&Bank) -> u64,
) {
let mut banking_stage_stats = BankingStageStats::new(id);

Expand All @@ -703,6 +716,7 @@ impl BankingStage {
&mut unprocessed_transaction_storage,
&banking_stage_stats,
&mut slot_metrics_tracker,
&block_cost_limit_reservation_cb
));
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_us);
Expand Down Expand Up @@ -840,6 +854,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);
drop(non_vote_sender);
drop(tpu_vote_sender);
Expand Down Expand Up @@ -902,6 +917,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);
trace!("sending bank");
drop(non_vote_sender);
Expand Down Expand Up @@ -993,6 +1009,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// fund another account so we can send 2 good transactions in a single batch.
Expand Down Expand Up @@ -1170,6 +1187,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

// wait for banking_stage to eat the packets
Expand Down Expand Up @@ -1377,6 +1395,7 @@ mod tests {
false,
HashSet::default(),
BundleAccountLocker::default(),
|_| 0,
);

let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();
Expand Down
24 changes: 15 additions & 9 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,18 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
self.metrics.clone()
}

pub fn run(self) -> Result<(), ConsumeWorkerError<Tx>> {
pub fn run(self, reservation_cb: impl Fn(&Bank) -> u64) -> Result<(), ConsumeWorkerError<Tx>> {
loop {
let work = self.consume_receiver.recv()?;
self.consume_loop(work)?;
self.consume_loop(work, &reservation_cb)?;
}
}

fn consume_loop(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
fn consume_loop(
&self,
work: ConsumeWork<Tx>,
reservation_cb: &impl Fn(&Bank) -> u64,
) -> Result<(), ConsumeWorkerError<Tx>> {
let (maybe_consume_bank, get_bank_us) = measure_us!(self.get_consume_bank());
let Some(mut bank) = maybe_consume_bank else {
self.metrics
Expand Down Expand Up @@ -97,7 +101,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
return self.retry_drain(work);
}
}
self.consume(&bank, work)?;
self.consume(&bank, work, reservation_cb)?;
}

Ok(())
Expand All @@ -108,11 +112,13 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
&self,
bank: &Arc<Bank>,
work: ConsumeWork<Tx>,
reservation_cb: &impl Fn(&Bank) -> u64,
) -> Result<(), ConsumeWorkerError<Tx>> {
let output = self.consumer.process_and_record_aged_transactions(
bank,
&work.transactions,
&work.max_ages,
reservation_cb,
);

self.metrics.update_for_consume(&output);
Expand Down Expand Up @@ -904,7 +910,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));

let pubkey1 = Pubkey::new_unique();

Expand Down Expand Up @@ -949,7 +955,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -998,7 +1004,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -1050,7 +1056,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down Expand Up @@ -1125,7 +1131,7 @@ mod tests {
consumed_receiver,
..
} = &test_frame;
let worker_thread = std::thread::spawn(move || worker.run());
let worker_thread = std::thread::spawn(move || worker.run(|_| 0));
poh_recorder
.write()
.unwrap()
Expand Down
Loading

0 comments on commit c74444a

Please sign in to comment.