From 0367ff9fe8134d290fb3d03a5f261fc4af5b9a3a Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sat, 8 Feb 2025 22:29:43 +0900 Subject: [PATCH 1/6] Buffer pre-session tasks in BP unified scheduler --- Cargo.lock | 7 + Cargo.toml | 1 + ledger/src/blockstore_processor.rs | 2 +- programs/sbf/Cargo.lock | 7 + runtime/src/bank_forks.rs | 9 +- runtime/src/installed_scheduler_pool.rs | 39 +- svm/examples/Cargo.lock | 7 + unified-scheduler-logic/Cargo.toml | 1 + unified-scheduler-logic/src/lib.rs | 23 +- unified-scheduler-pool/src/lib.rs | 572 ++++++++++++++++++++++-- 10 files changed, 610 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9b2a4203f2297c..022cd4dfe16169 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10598,6 +10598,7 @@ dependencies = [ "solana-runtime-transaction", "solana-transaction", "static_assertions", + "unwrap_none", ] [[package]] @@ -12347,6 +12348,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unwrap_none" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "461d0c5956fcc728ecc03a3a961e4adc9a7975d86f6f8371389a289517c02ca9" + [[package]] name = "uriparse" version = "0.6.4" diff --git a/Cargo.toml b/Cargo.toml index 9a0103ac4790aa..8e519276ea54ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -596,6 +596,7 @@ tower = "0.5.2" trait-set = "0.3.0" trees = "0.4.2" tungstenite = "0.20.1" +unwrap_none = "0.1.2" uriparse = "0.6.4" url = "2.5.4" vec_extract_if_polyfill = "0.1.0" diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 0f13e76f99c7dc..d7720dd533e5d6 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -5032,7 +5032,7 @@ pub mod tests { .. } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let txs = create_test_transactions(&mint_keypair, &genesis_config.hash()); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 10c8f48eb8e83c..0c7f2b797acd61 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -8870,6 +8870,7 @@ dependencies = [ "solana-runtime-transaction", "solana-transaction", "static_assertions", + "unwrap_none", ] [[package]] @@ -10390,6 +10391,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unwrap_none" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "461d0c5956fcc728ecc03a3a961e4adc9a7975d86f6f8371389a289517c02ca9" + [[package]] name = "uriparse" version = "0.6.4" diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index c46c0216871f5b..cb4e175787fe9c 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -246,7 +246,14 @@ impl BankForks { let context = SchedulingContext::new_with_mode(mode, bank.clone()); let scheduler = scheduler_pool.take_scheduler(context); let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler)); - scheduler_pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener()); + if matches!(mode, SchedulingMode::BlockVerification) { + // Skip registering for block production. 
Both the replay stage and PohRecorder + // don't support _concurrent block production_ at all. It's strongly assumed that + // block is produced in singleton way and it's actually desired, while ignoring the + // opportunity cost of (hopefully rare!) fork switching... + scheduler_pool + .register_timeout_listener(bank_with_scheduler.create_timeout_listener()); + } bank_with_scheduler } else { BankWithScheduler::new_without_scheduler(bank) diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 8012f77af8d0a4..2e989406b083ec 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -238,28 +238,36 @@ pub type SchedulerId = u64; #[derive(Clone, Debug)] pub struct SchedulingContext { mode: SchedulingMode, - bank: Arc, + bank: Option>, } impl SchedulingContext { - pub fn new(bank: Arc) -> Self { - // mode will be configurable later + pub fn for_preallocation() -> Self { Self { - mode: SchedulingMode::BlockVerification, - bank, + mode: SchedulingMode::BlockProduction, + bank: None, } } pub fn new_with_mode(mode: SchedulingMode, bank: Arc) -> Self { - Self { mode, bank } + Self { + mode, + bank: Some(bank), + } + } + + #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] + fn for_verification(bank: Arc) -> Self { + Self::new_with_mode(SchedulingMode::BlockVerification, bank) } #[cfg(feature = "dev-context-only-utils")] pub fn for_production(bank: Arc) -> Self { - Self { - mode: SchedulingMode::BlockProduction, - bank, - } + Self::new_with_mode(SchedulingMode::BlockProduction, bank) + } + + pub fn is_preallocated(&self) -> bool { + self.bank.is_none() } pub fn mode(&self) -> SchedulingMode { @@ -267,11 +275,11 @@ impl SchedulingContext { } pub fn bank(&self) -> &Arc { - &self.bank + self.bank.as_ref().unwrap() } - pub fn slot(&self) -> Slot { - self.bank().slot() + pub fn slot(&self) -> Option { + self.bank.as_ref().map(|bank| bank.slot()) } } @@ -570,7 +578,8 @@ impl BankWithSchedulerInner { let pool = pool.clone(); drop(scheduler); - let context = SchedulingContext::new(self.bank.clone()); + // Schedulers can be stale only if its mode is block-verification. 
+ let context = SchedulingContext::for_verification(self.bank.clone()); let mut scheduler = self.scheduler.write().unwrap(); trace!("with_active_scheduler: {:?}", scheduler); scheduler.transition_from_stale_to_active(|pool, result_with_timings| { @@ -773,7 +782,7 @@ mod tests { mock.expect_context() .times(1) .in_sequence(&mut seq.lock().unwrap()) - .return_const(SchedulingContext::new(bank)); + .return_const(SchedulingContext::for_verification(bank)); for wait_reason in is_dropped_flags { let seq_cloned = seq.clone(); diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index fe5814cc04b2b8..5f3a5c34aebea4 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -8207,6 +8207,7 @@ dependencies = [ "solana-runtime-transaction", "solana-transaction", "static_assertions", + "unwrap_none", ] [[package]] @@ -9679,6 +9680,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unwrap_none" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "461d0c5956fcc728ecc03a3a961e4adc9a7975d86f6f8371389a289517c02ca9" + [[package]] name = "uriparse" version = "0.6.4" diff --git a/unified-scheduler-logic/Cargo.toml b/unified-scheduler-logic/Cargo.toml index 6a278e8701050f..b0acb2d03a97e9 100644 --- a/unified-scheduler-logic/Cargo.toml +++ b/unified-scheduler-logic/Cargo.toml @@ -15,6 +15,7 @@ solana-pubkey = { workspace = true } solana-runtime-transaction = { workspace = true } solana-transaction = { workspace = true } static_assertions = { workspace = true } +unwrap_none = { workspace = true } [dev-dependencies] solana-instruction = { workspace = true } diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index c7b5e7a1e8ca76..302b867c11f1cb 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -103,9 +103,10 @@ use { solana_transaction::sanitized::SanitizedTransaction, static_assertions::const_assert_eq, std::{collections::VecDeque, mem, sync::Arc}, + unwrap_none::UnwrapNone, }; -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub enum SchedulingMode { BlockVerification, BlockProduction, @@ -668,11 +669,29 @@ impl SchedulingStateMachine { /// indicating the scheduled task is blocked currently. /// /// Note that this function takes ownership of the task to allow for future optimizations. 
+ #[cfg(test)] #[must_use] pub fn schedule_task(&mut self, task: Task) -> Option { + self.schedule_or_buffer_task(task, false) + } + + pub fn buffer_task(&mut self, task: Task) { + self.schedule_or_buffer_task(task, true).unwrap_none(); + } + + #[must_use] + pub fn schedule_or_buffer_task(&mut self, task: Task, force_buffering: bool) -> Option { self.total_task_count.increment_self(); self.active_task_count.increment_self(); - self.try_lock_usage_queues(task) + self.try_lock_usage_queues(task).and_then(|task| { + if !force_buffering { + Some(task) + } else { + self.unblocked_task_count.increment_self(); + self.unblocked_task_queue.push_back(task); + None + } + }) } #[must_use] diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 61ce28afbfd724..bafc0b30d8b7d0 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -53,7 +53,7 @@ use { mem, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering::Relaxed}, - Arc, Mutex, OnceLock, Weak, + Arc, Mutex, MutexGuard, OnceLock, Weak, }, thread::{self, sleep, JoinHandle}, time::{Duration, Instant}, @@ -70,6 +70,7 @@ use crate::sleepless_testing::BuilderTracked; #[derive(Debug)] enum CheckPoint { NewTask(usize), + NewBufferedTask(usize), TaskHandled(usize), SchedulerThreadAborted, IdleSchedulerCleaned(usize), @@ -96,6 +97,7 @@ type AtomicSchedulerId = AtomicU64; #[derive(Debug)] pub struct SchedulerPool, TH: TaskHandler> { scheduler_inners: Mutex>, + block_production_scheduler_inner: Mutex>, trashed_scheduler_inners: Mutex>, timeout_listeners: Mutex>, handler_count: usize, @@ -117,6 +119,61 @@ pub struct SchedulerPool, TH: TaskHandler> { _phantom: PhantomData, } +/// A small tri-state [`Option`]-like private helper type to codify the existence of +/// block-production scheduler as a singleton. +/// +/// Block-production scheduler should carry its buffered (= not-yet-processed) tasks over sessions +/// (= banks) and `banking_packet_receiver` shouldn't be consumed by multiple schedulers at once. +/// So, it's managed differently from block-verification schedulers. +#[derive(Default, Debug)] +enum BlockProdutionSchedulerInner, TH: TaskHandler> { + #[default] + NotSpawned, + Pooled(S::Inner), + Taken(SchedulerId), +} + +impl, TH: TaskHandler> BlockProdutionSchedulerInner { + fn can_put(&self, returned: &S::Inner) -> bool { + match self { + Self::NotSpawned => false, + Self::Pooled(inner) => { + // `returned` must be a block-verification scheduler if there's already a block- + // production scheduler. So, return false with following assert_ne!(). 
+ assert_ne!(inner.id(), returned.id()); + false + } + Self::Taken(id) => *id == returned.id(), + } + } + + fn put_spawned(&mut self, inner: S::Inner) { + assert_matches!(mem::replace(self, Self::Pooled(inner)), Self::NotSpawned); + } + + fn trash_taken(&mut self) { + assert_matches!(mem::replace(self, Self::NotSpawned), Self::Taken(_)); + } + + fn put_returned(&mut self, inner: S::Inner) { + let new = inner.id(); + assert_matches!(mem::replace(self, Self::Pooled(inner)), Self::Taken(old) if old == new); + } + + fn take_pooled(&mut self) -> S::Inner { + let id = { + let Self::Pooled(inner) = &self else { + panic!("cannot take: {:?}", self) + }; + inner.id() + }; + let Self::Pooled(inner) = mem::replace(self, Self::Taken(id)) else { + unreachable!(); + }; + inner + } +} + #[derive(derive_more::Debug, Clone)] pub struct HandlerContext { log_messages_bytes_limit: Option, @@ -284,6 +341,7 @@ where let scheduler_pool = Arc::new_cyclic(|weak_self| Self { scheduler_inners: Mutex::default(), + block_production_scheduler_inner: Mutex::default(), trashed_scheduler_inners: Mutex::default(), timeout_listeners: Mutex::default(), handler_count, @@ -431,7 +489,21 @@ where // Refer to the comment in is_aborted() as to the exact definition of the concept of // _trashed_ and the interaction among different parts of unified scheduler. let should_trash = scheduler.is_trashed(); + let mut block_production_scheduler_inner = + self.block_production_scheduler_inner.lock().unwrap(); + if should_trash { + // Maintain the runtime invariant established in register_banking_stage() about + // the availability of pooled block production scheduler by re-spawning one. + if block_production_scheduler_inner.can_put(&scheduler) { + block_production_scheduler_inner.trash_taken(); + // To prevent block-production scheduler is taken from do_take_resumed_scheduler() + // by different thread at this very moment, `.trash_taken()` and `.put_spawned()` + // must be done atomically. That's why we pass around MutexGuard into + // spawn_block_production_scheduler(). + self.spawn_block_production_scheduler(&mut block_production_scheduler_inner); + } + // Delay drop()-ing this trashed returned scheduler inner by stashing it in // self.trashed_scheduler_inners, which is periodically drained by the `solScCleaner` // thread. 
Dropping it could take long time (in fact, @@ -440,12 +512,15 @@ where .lock() .expect("not poisoned") .push(scheduler); + } else if block_production_scheduler_inner.can_put(&scheduler) { + block_production_scheduler_inner.put_returned(scheduler); } else { self.scheduler_inners .lock() .expect("not poisoned") .push((scheduler, Instant::now())); } + drop(block_production_scheduler_inner); } #[cfg(test)] @@ -460,13 +535,35 @@ where ) -> S { assert_matches!(result_with_timings, (Ok(_), _)); - // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been - // returned recently - if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop() - { - S::from_inner(inner, context, result_with_timings) - } else { - S::spawn(self.self_arc(), context, result_with_timings) + match context.mode() { + BlockVerification => { + // pop is intentional for filo, expecting relatively warmed-up scheduler due to + // having been returned recently + if let Some((inner, _pooled_at)) = + self.scheduler_inners.lock().expect("not poisoned").pop() + { + S::from_inner(inner, context, result_with_timings) + } else { + S::spawn(self.self_arc(), context, result_with_timings) + } + } + BlockProduction => { + // There must be a pooled block-production scheduler at this point because prior + // register_banking_stage() invocation should have spawned such one. + assert!( + self.banking_stage_handler_context + .lock() + .expect("not poisoned") + .is_some(), + "register_banking_stage() isn't called yet", + ); + let inner = self + .block_production_scheduler_inner + .lock() + .expect("not poisoned") + .take_pooled(); + S::from_inner(inner, context, result_with_timings) + } } } @@ -486,6 +583,11 @@ where banking_packet_handler, transaction_recorder, }); + // Immediately start a block production scheduler, so that the scheduler can start + // buffering tasks, which are preprocessed as much as possible. + self.spawn_block_production_scheduler( + &mut self.block_production_scheduler_inner.lock().unwrap(), + ); } fn create_handler_context( @@ -528,6 +630,20 @@ where ) } + fn spawn_block_production_scheduler( + &self, + block_production_scheduler_inner: &mut MutexGuard<'_, BlockProdutionSchedulerInner>, + ) { + let scheduler = S::spawn( + self.self_arc(), + SchedulingContext::for_preallocation(), + initialized_result_with_timings(), + ); + let ((result, _timings), inner) = scheduler.into_inner(); + assert_matches!(result, Ok(_)); + block_production_scheduler_inner.put_spawned(inner); + } + pub fn default_handler_count() -> usize { Self::calculate_default_handler_count( thread::available_parallelism() @@ -1115,6 +1231,15 @@ impl, TH: TaskHandler> ThreadManager { mut result_with_timings: ResultWithTimings, handler_context: HandlerContext, ) { + let scheduling_mode = context.mode(); + let (mut is_finished, mut session_ending) = match scheduling_mode { + BlockVerification => (false, false), + BlockProduction => { + assert!(context.is_preallocated()); + (true, true) + } + }; + // Firstly, setup bi-directional messaging between the scheduler and handlers to pass // around tasks, by creating 2 channels (one for to-be-handled tasks from the scheduler to // the handlers and the other for finished tasks from the handlers to the scheduler). @@ -1222,8 +1347,6 @@ impl, TH: TaskHandler> ThreadManager { .take() .expect("no 2nd start_threads()"); - let mut session_ending = false; - // Now, this is the main loop for the scheduler thread, which is a special beast. 
// // That's because it could be the most notable bottleneck of throughput in the future @@ -1283,7 +1406,6 @@ impl, TH: TaskHandler> ThreadManager { // 2. Subsequent result_with_timings are propagated explicitly from // the new_task_receiver.recv() invocation located at the end of loop. 'nonaborted_main_loop: loop { - let mut is_finished = false; while !is_finished { // ALL recv selectors are eager-evaluated ALWAYS by current crossbeam impl, // which isn't great and is inconsistent with `if`s in the Rust's match @@ -1321,12 +1443,12 @@ impl, TH: TaskHandler> ThreadManager { runnable_task_sender.send_payload(task).unwrap(); }, recv(new_task_receiver) -> message => { - assert!(!session_ending); + assert!(scheduling_mode == BlockProduction || !session_ending); match message { Ok(NewTaskPayload::Payload(task)) => { sleepless_testing::at(CheckPoint::NewTask(task.task_index())); - if let Some(task) = state_machine.schedule_task(task) { + if let Some(task) = state_machine.schedule_or_buffer_task(task, session_ending) { runnable_task_sender.send_aux_payload(task).unwrap(); } } @@ -1356,29 +1478,42 @@ impl, TH: TaskHandler> ThreadManager { is_finished = session_ending && state_machine.has_no_active_task(); } + assert!(mem::replace(&mut is_finished, false)); // Finalize the current session after asserting it's explicitly requested so. - assert!(session_ending); // Send result first because this is blocking the replay code-path. session_result_sender .send(result_with_timings) .expect("always outlived receiver"); state_machine.reinitialize(); - session_ending = false; + assert!(mem::replace(&mut session_ending, false)); - { + loop { // Prepare for the new session. match new_task_receiver.recv() { + Ok(NewTaskPayload::Payload(task)) => { + sleepless_testing::at(CheckPoint::NewBufferedTask( + task.task_index(), + )); + assert_eq!(scheduling_mode, BlockProduction); + state_machine.buffer_task(task); + } Ok(NewTaskPayload::OpenSubchannel(context_and_result_with_timings)) => { let (new_context, new_result_with_timings) = *context_and_result_with_timings; // We just received subsequent (= not initial) session and about to // enter into the preceding `while(!is_finished) {...}` loop again. // Before that, propagate new SchedulingContext to handler threads + assert_eq!(scheduling_mode, new_context.mode()); + assert!(!new_context.is_preallocated()); runnable_task_sender .send_chained_channel(&new_context, handler_count) .unwrap(); result_with_timings = new_result_with_timings; + break; + } + Ok(NewTaskPayload::CloseSubchannel) => { + // This match arm can be hit if context.is_preallocated() } Err(_) => { // This unusual condition must be triggered by ThreadManager::drop(). 
@@ -1386,7 +1521,6 @@ impl, TH: TaskHandler> ThreadManager { result_with_timings = initialized_result_with_timings(); break 'nonaborted_main_loop; } - Ok(_) => unreachable!(), } } } @@ -1799,6 +1933,7 @@ mod tests { #[derive(Debug)] enum TestCheckPoint { BeforeNewTask, + AfterNewBufferedTask, AfterTaskHandled, AfterSchedulerThreadAborted, BeforeIdleSchedulerCleaned, @@ -1835,7 +1970,7 @@ mod tests { let pool = DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); let bank = Arc::new(Bank::default_for_tests()); - let context = SchedulingContext::new(bank); + let context = SchedulingContext::for_verification(bank); let scheduler = pool.take_scheduler(context); let debug = format!("{scheduler:#?}"); @@ -1870,7 +2005,7 @@ mod tests { ); let pool = pool_raw.clone(); let bank = Arc::new(Bank::default_for_tests()); - let context1 = SchedulingContext::new(bank); + let context1 = SchedulingContext::for_verification(bank); let context2 = context1.clone(); let old_scheduler = pool.do_take_scheduler(context1); @@ -1935,7 +2070,7 @@ mod tests { ); let pool = pool_raw.clone(); let bank = Arc::new(Bank::default_for_tests()); - let context1 = SchedulingContext::new(bank); + let context1 = SchedulingContext::for_verification(bank); let context2 = context1.clone(); let small_scheduler = pool.do_take_scheduler(context1); @@ -2010,7 +2145,7 @@ mod tests { ); let pool = pool_raw.clone(); let bank = Arc::new(Bank::default_for_tests()); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.take_scheduler(context); let bank = BankWithScheduler::new(bank, Some(scheduler)); pool.register_timeout_listener(bank.create_timeout_listener()); @@ -2080,7 +2215,7 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.take_scheduler(context); let bank = BankWithScheduler::new(bank, Some(scheduler)); @@ -2148,7 +2283,7 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.take_scheduler(context); let bank = BankWithScheduler::new(bank, Some(scheduler)); @@ -2200,7 +2335,7 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.take_scheduler(context); let bank = BankWithScheduler::new(bank, Some(scheduler)); @@ -2287,7 +2422,7 @@ mod tests { None, ignored_prioritization_fee_cache, ); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.do_take_scheduler(context); scheduler.schedule_execution(tx, 0).unwrap(); @@ -2379,7 +2514,7 @@ mod tests { None, ignored_prioritization_fee_cache, ); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.do_take_scheduler(context); // This test is racy. 
@@ -2415,7 +2550,7 @@ mod tests { let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); let bank = Arc::new(Bank::default_for_tests()); - let context = &SchedulingContext::new(bank); + let context = &SchedulingContext::for_verification(bank); let scheduler1 = pool.do_take_scheduler(context.clone()); let scheduler_id1 = scheduler1.id(); @@ -2444,7 +2579,7 @@ mod tests { let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); let bank = Arc::new(Bank::default_for_tests()); - let context = &SchedulingContext::new(bank); + let context = &SchedulingContext::for_verification(bank); let mut scheduler = pool.do_take_scheduler(context.clone()); // should never panic. @@ -2466,8 +2601,8 @@ mod tests { let new_bank = &Arc::new(Bank::default_for_tests()); assert!(!Arc::ptr_eq(old_bank, new_bank)); - let old_context = &SchedulingContext::new(old_bank.clone()); - let new_context = &SchedulingContext::new(new_bank.clone()); + let old_context = &SchedulingContext::for_verification(old_bank.clone()); + let new_context = &SchedulingContext::for_verification(new_bank.clone()); let scheduler = pool.do_take_scheduler(old_context.clone()); let scheduler_id = scheduler.id(); @@ -2552,7 +2687,7 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); assert_eq!(bank.transaction_count(), 0); let scheduler = pool.take_scheduler(context); @@ -2597,7 +2732,7 @@ mod tests { DEFAULT_TIMEOUT_DURATION, ); let pool = pool_raw.clone(); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.take_scheduler(context); let unfunded_keypair = Keypair::new(); @@ -2725,7 +2860,7 @@ mod tests { None, ignored_prioritization_fee_cache, ); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.take_scheduler(context); @@ -2800,7 +2935,7 @@ mod tests { None, ignored_prioritization_fee_cache, ); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let scheduler = pool.do_take_scheduler(context); for i in 0..10 { @@ -2884,7 +3019,7 @@ mod tests { None, ignored_prioritization_fee_cache, ); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); assert_eq!(bank.transaction_count(), 0); let scheduler = pool.take_scheduler(context); @@ -2959,8 +3094,8 @@ mod tests { 2, genesis_config.hash(), )); - let context0 = &SchedulingContext::new(bank0.clone()); - let context1 = &SchedulingContext::new(bank1.clone()); + let context0 = &SchedulingContext::for_verification(bank0.clone()); + let context1 = &SchedulingContext::for_verification(bank1.clone()); // Exercise the scheduler by busy-looping to expose the race condition for (context, index) in [(context0, 0), (context1, 1)] @@ -3152,7 +3287,7 @@ mod tests { ); } let (bank, _bank_forks) = setup_dummy_fork_graph(bank); - let context = SchedulingContext::new(bank.clone()); + let context = SchedulingContext::for_verification(bank.clone()); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = @@ -3238,7 +3373,7 @@ mod 
tests { let result = &mut Ok(()); let timings = &mut ExecuteTimings::default(); let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let scheduling_context = &SchedulingContext::new(bank.clone()); + let scheduling_context = &SchedulingContext::for_verification(bank.clone()); let handler_context = &HandlerContext { log_messages_bytes_limit: None, transaction_status_sender: None, @@ -3392,4 +3527,363 @@ mod tests { exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } + + #[test] + fn test_block_production_scheduler_schedule_execution_success() { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = setup_dummy_fork_graph(bank); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + + let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, _signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + pool.register_banking_stage( + banking_packet_receiver, + Box::new(|_, _| unreachable!()), + poh_recorder.read().unwrap().new_recorder(), + ); + + assert_eq!(bank.transaction_count(), 0); + let context = SchedulingContext::for_production(bank.clone()); + let scheduler = pool.take_scheduler(context); + let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + scheduler.schedule_execution(tx0, 0).unwrap(); + let bank = BankWithScheduler::new(bank, Some(scheduler)); + assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + assert_eq!(bank.transaction_count(), 1); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + + #[test] + fn test_block_production_scheduler_buffering_on_spawn() { + solana_logger::setup(); + + let _progress = sleepless_testing::setup(&[ + &CheckPoint::NewBufferedTask(17), + &TestCheckPoint::AfterNewBufferedTask, + ]); + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. 
+ } = solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = setup_dummy_fork_graph(bank); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, _signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + + // send fake packet batch to trigger banking_packet_handler + let (banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); + banking_packet_sender + .send(BankingPacketBatch::default()) + .unwrap(); + assert_eq!(banking_packet_sender.len(), 1); + + // A dummy handler which unconditionally sends tx0 back to the scheduler thread + let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + let fixed_banking_packet_handler = + Box::new(move |helper: &BankingStageHelper, _banking_packet| { + helper.send_new_task(helper.create_new_task(tx0.clone(), 17)) + }); + pool.register_banking_stage( + banking_packet_receiver, + fixed_banking_packet_handler, + poh_recorder.read().unwrap().new_recorder(), + ); + let context = SchedulingContext::for_production(bank.clone()); + + // Confirm the banking packet channel is cleared, even before taking scheduler + sleepless_testing::at(TestCheckPoint::AfterNewBufferedTask); + assert_eq!(banking_packet_sender.len(), 0); + + assert_eq!(bank.transaction_count(), 0); + let scheduler = pool.take_scheduler(context); + let bank = BankWithScheduler::new(bank, Some(scheduler)); + assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + assert_eq!(bank.transaction_count(), 1); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + + #[test] + fn test_block_production_scheduler_buffering_before_new_session() { + solana_logger::setup(); + + let _progress = sleepless_testing::setup(&[ + &CheckPoint::NewBufferedTask(17), + &TestCheckPoint::AfterNewBufferedTask, + ]); + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. 
+ } = solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = setup_dummy_fork_graph(bank); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, _signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + + // A dummy handler which unconditionally sends tx0 back to the scheduler thread + let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( + &mint_keypair, + &solana_pubkey::new_rand(), + 2, + genesis_config.hash(), + )); + let fixed_banking_packet_handler = + Box::new(move |helper: &BankingStageHelper, _banking_packet| { + helper.send_new_task(helper.create_new_task(tx0.clone(), 17)) + }); + + let (banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); + pool.register_banking_stage( + banking_packet_receiver, + fixed_banking_packet_handler, + poh_recorder.read().unwrap().new_recorder(), + ); + let context = SchedulingContext::for_production(bank.clone()); + + // Quickly take and return the scheduler so that this test can test the behavior while + // waiting for new session... + let scheduler = pool.take_scheduler(context.clone()); + let bank_tmp = BankWithScheduler::new(bank.clone(), Some(scheduler)); + assert_matches!(bank_tmp.wait_for_completed_scheduler(), Some((Ok(()), _))); + + // Send fake packet batch to trigger banking_packet_handler + assert_eq!(banking_packet_sender.len(), 0); + banking_packet_sender + .send(BankingPacketBatch::default()) + .unwrap(); + + // Confirm the banking packet channel is cleared, even before taking scheduler + sleepless_testing::at(TestCheckPoint::AfterNewBufferedTask); + assert_eq!(banking_packet_sender.len(), 0); + + assert_eq!(bank.transaction_count(), 0); + let scheduler = pool.take_scheduler(context); + let bank = BankWithScheduler::new(bank, Some(scheduler)); + assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + assert_eq!(bank.transaction_count(), 1); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + + #[test] + #[should_panic(expected = "register_banking_stage() isn't called yet")] + fn test_block_production_scheduler_take_without_registering() { + solana_logger::setup(); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let bank = Arc::new(Bank::default_for_tests()); + let context = &SchedulingContext::for_production(bank); + let scheduler = pool.do_take_scheduler(context.clone()); + Box::new(scheduler.into_inner().1).return_to_pool(); + } + + #[test] + #[should_panic(expected = "cannot take: Taken(0)")] + fn test_block_production_scheduler_double_take_without_returning() { + solana_logger::setup(); + + let GenesisConfigInfo { genesis_config, .. 
} = + solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = setup_dummy_fork_graph(bank); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, _signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); + pool.register_banking_stage( + banking_packet_receiver, + Box::new(|_, _| {}), + poh_recorder.read().unwrap().new_recorder(), + ); + let context = SchedulingContext::for_production(bank.clone()); + + let scheduler1 = pool.do_take_scheduler(context.clone()); + let scheduler2 = pool.do_take_scheduler(context); + + Box::new(scheduler1.into_inner().1).return_to_pool(); + Box::new(scheduler2.into_inner().1).return_to_pool(); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + + #[test] + fn test_block_production_scheduler_drop_overgrown() { + solana_logger::setup(); + + let GenesisConfigInfo { genesis_config, .. } = + solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = setup_dummy_fork_graph(bank); + + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + const REDUCED_MAX_USAGE_QUEUE_COUNT: usize = 0; + let pool = DefaultSchedulerPool::do_new( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + SHORTENED_POOL_CLEANER_INTERVAL, + DEFAULT_MAX_POOLING_DURATION, + REDUCED_MAX_USAGE_QUEUE_COUNT, + DEFAULT_TIMEOUT_DURATION, + ); + + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, _signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + + let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); + pool.register_banking_stage( + banking_packet_receiver, + Box::new(|_, _| unreachable!()), + poh_recorder.read().unwrap().new_recorder(), + ); + + let context = SchedulingContext::for_production(bank); + let scheduler = pool.do_take_scheduler(context.clone()); + let trashed_old_scheduler_id = scheduler.id(); + + // Make scheduler overgrown and trash it by returning + scheduler + .inner + .usage_queue_loader + .load(Pubkey::new_unique()); + Box::new(scheduler.into_inner().1).return_to_pool(); + + // Re-take a brand-new one + let scheduler = pool.do_take_scheduler(context); + let respawned_new_scheduler_id = scheduler.id(); + Box::new(scheduler.into_inner().1).return_to_pool(); + + assert_ne!(trashed_old_scheduler_id, respawned_new_scheduler_id); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + + #[test] + fn test_block_production_scheduler_return_block_verification_scheduler_while_pooled() { + 
solana_logger::setup(); + + let GenesisConfigInfo { genesis_config, .. } = + solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = setup_dummy_fork_graph(bank); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let pool = + DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + + let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, _signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + pool.register_banking_stage( + banking_packet_receiver, + Box::new(|_, _| {}), + poh_recorder.read().unwrap().new_recorder(), + ); + + // Make sure the assertion in BlockProdutionSchedulerInner::can_put() doesn't cause false + // positives... + let context = SchedulingContext::for_verification(bank.clone()); + let scheduler = pool.take_scheduler(context); + let bank_tmp = BankWithScheduler::new(bank, Some(scheduler)); + assert_matches!(bank_tmp.wait_for_completed_scheduler(), Some((Ok(()), _))); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } } From e1fd517c2b3d24a6bcd4231e0bd6586779ac0c8a Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 14 Feb 2025 22:11:08 +0900 Subject: [PATCH 2/6] Clean up comments and tests --- runtime/src/bank_forks.rs | 8 +++---- runtime/src/installed_scheduler_pool.rs | 3 ++- unified-scheduler-pool/src/lib.rs | 30 +++++++++++++++---------- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index cb4e175787fe9c..bc4c2d7ada7f76 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -246,11 +246,11 @@ impl BankForks { let context = SchedulingContext::new_with_mode(mode, bank.clone()); let scheduler = scheduler_pool.take_scheduler(context); let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler)); + // Skip registering for block production. Both the tvu main loop in the replay stage + // and PohRecorder don't support _concurrent block production_ at all. It's strongly + // assumed that block is produced in singleton way and it's actually desired, while + // ignoring the opportunity cost of (hopefully rare!) fork switching... if matches!(mode, SchedulingMode::BlockVerification) { - // Skip registering for block production. Both the replay stage and PohRecorder - // don't support _concurrent block production_ at all. It's strongly assumed that - // block is produced in singleton way and it's actually desired, while ignoring the - // opportunity cost of (hopefully rare!) fork switching... scheduler_pool .register_timeout_listener(bank_with_scheduler.create_timeout_listener()); } diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 2e989406b083ec..5359b9b9c0f634 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -578,7 +578,8 @@ impl BankWithSchedulerInner { let pool = pool.clone(); drop(scheduler); - // Schedulers can be stale only if its mode is block-verification. 
+ // Schedulers can be stale only if its mode is block-verification. So, + // unconditional context construction for verification is okay here. let context = SchedulingContext::for_verification(self.bank.clone()); let mut scheduler = self.scheduler.write().unwrap(); trace!("with_active_scheduler: {:?}", scheduler); diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index bafc0b30d8b7d0..90da6c0e202fd3 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -138,8 +138,9 @@ impl, TH: TaskHandler> BlockProdutionSchedulerInner false, Self::Pooled(inner) => { - // `returned` must be a block-verification scheduler if there's already a block- - // production scheduler. So, return false with following assert_ne!(). + // the given `returned` inner must be a block-verification scheduler if there's + // already a pooled block-production scheduler inner here. So, return `false` after + // sanity check to detect double `put` intention with following `assert_ne!()`. assert_ne!(inner.id(), returned.id()); false } @@ -497,9 +498,10 @@ where // the availability of pooled block production scheduler by re-spawning one. if block_production_scheduler_inner.can_put(&scheduler) { block_production_scheduler_inner.trash_taken(); - // To prevent block-production scheduler is taken from do_take_resumed_scheduler() - // by different thread at this very moment, `.trash_taken()` and `.put_spawned()` - // must be done atomically. That's why we pass around MutexGuard into + // To prevent block-production scheduler from being taken in + // do_take_resumed_scheduler() by different thread at this very moment, the + // preceding `.trash_taken()` and following `.put_spawned()` must be done + // atomically. That's why we pass around MutexGuard into // spawn_block_production_scheduler(). self.spawn_block_production_scheduler(&mut block_production_scheduler_inner); } @@ -3537,6 +3539,7 @@ mod tests { mint_keypair, .. } = solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); @@ -3557,6 +3560,7 @@ mod tests { ); pool.register_banking_stage( banking_packet_receiver, + // we don't use the banking packet channel in this test. so, pass panicking handler. 
Box::new(|_, _| unreachable!()), poh_recorder.read().unwrap().new_recorder(), ); @@ -3618,7 +3622,7 @@ mod tests { .unwrap(); assert_eq!(banking_packet_sender.len(), 1); - // A dummy handler which unconditionally sends tx0 back to the scheduler thread + // Create a dummy handler which unconditionally sends tx0 back to the scheduler thread let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_pubkey::new_rand(), @@ -3634,13 +3638,13 @@ mod tests { fixed_banking_packet_handler, poh_recorder.read().unwrap().new_recorder(), ); - let context = SchedulingContext::for_production(bank.clone()); // Confirm the banking packet channel is cleared, even before taking scheduler sleepless_testing::at(TestCheckPoint::AfterNewBufferedTask); assert_eq!(banking_packet_sender.len(), 0); assert_eq!(bank.transaction_count(), 0); + let context = SchedulingContext::for_production(bank.clone()); let scheduler = pool.take_scheduler(context); let bank = BankWithScheduler::new(bank, Some(scheduler)); assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); @@ -3655,7 +3659,7 @@ mod tests { solana_logger::setup(); let _progress = sleepless_testing::setup(&[ - &CheckPoint::NewBufferedTask(17), + &CheckPoint::NewBufferedTask(18), &TestCheckPoint::AfterNewBufferedTask, ]); @@ -3682,7 +3686,7 @@ mod tests { Some(leader_schedule_cache), ); - // A dummy handler which unconditionally sends tx0 back to the scheduler thread + // Create a dummy handler which unconditionally sends tx0 back to the scheduler thread let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_pubkey::new_rand(), @@ -3691,7 +3695,7 @@ mod tests { )); let fixed_banking_packet_handler = Box::new(move |helper: &BankingStageHelper, _banking_packet| { - helper.send_new_task(helper.create_new_task(tx0.clone(), 17)) + helper.send_new_task(helper.create_new_task(tx0.clone(), 18)) }); let (banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); @@ -3700,10 +3704,10 @@ mod tests { fixed_banking_packet_handler, poh_recorder.read().unwrap().new_recorder(), ); - let context = SchedulingContext::for_production(bank.clone()); // Quickly take and return the scheduler so that this test can test the behavior while // waiting for new session... 
+ let context = SchedulingContext::for_production(bank.clone()); let scheduler = pool.take_scheduler(context.clone()); let bank_tmp = BankWithScheduler::new(bank.clone(), Some(scheduler)); assert_matches!(bank_tmp.wait_for_completed_scheduler(), Some((Ok(()), _))); @@ -3772,8 +3776,8 @@ mod tests { Box::new(|_, _| {}), poh_recorder.read().unwrap().new_recorder(), ); - let context = SchedulingContext::for_production(bank.clone()); + let context = SchedulingContext::for_production(bank.clone()); let scheduler1 = pool.do_take_scheduler(context.clone()); let scheduler2 = pool.do_take_scheduler(context); @@ -3841,6 +3845,7 @@ mod tests { let respawned_new_scheduler_id = scheduler.id(); Box::new(scheduler.into_inner().1).return_to_pool(); + // id should be different assert_ne!(trashed_old_scheduler_id, respawned_new_scheduler_id); exit.store(true, Ordering::Relaxed); @@ -3855,6 +3860,7 @@ mod tests { solana_ledger::genesis_utils::create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); From 78f37391ac7317f7a56b66a314f2d27df78e5312 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 14 Feb 2025 22:15:27 +0900 Subject: [PATCH 3/6] Add doc comments to SchedulingStateMachine --- unified-scheduler-logic/src/lib.rs | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 302b867c11f1cb..ab986007ad258e 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -669,27 +669,47 @@ impl SchedulingStateMachine { /// indicating the scheduled task is blocked currently. /// /// Note that this function takes ownership of the task to allow for future optimizations. - #[cfg(test)] + #[cfg(any(test, doc))] #[must_use] pub fn schedule_task(&mut self, task: Task) -> Option { self.schedule_or_buffer_task(task, false) } + /// Adds given `task` to internal buffer, even if it's immediately schedulable otherwise. + /// + /// Put differently, buffering means to force the task to be blocked unconditionally after + /// normal scheduling processing. + /// + /// Thus, the task is internally retained inside this [`SchedulingStateMachine`], whether the + /// task is blocked or not. Eventually, the buffered task will be returned by one of later + /// invocations [`schedule_next_unblocked_task()`](Self::schedule_next_unblocked_task). + /// + /// Note that this function takes ownership of the task to allow for future optimizations. pub fn buffer_task(&mut self, task: Task) { self.schedule_or_buffer_task(task, true).unwrap_none(); } + /// Schedules or buffers given `task`, returning successful one unless buffering is forced. + /// + /// Refer to [`schedule_task()`](Self::schedule_task) and + /// [`buffer_task()`](Self::buffer_task) for the difference between _scheduling_ and + /// _buffering_ respectively. + /// + /// Note that this function takes ownership of the task to allow for future optimizations. #[must_use] pub fn schedule_or_buffer_task(&mut self, task: Task, force_buffering: bool) -> Option { self.total_task_count.increment_self(); self.active_task_count.increment_self(); self.try_lock_usage_queues(task).and_then(|task| { - if !force_buffering { - Some(task) - } else { + // locking succeeded, and then ... 
+ if force_buffering { + // ... push to unblocked_task_queue, if buffering is forced. self.unblocked_task_count.increment_self(); self.unblocked_task_queue.push_back(task); None + } else { + // ... return the task back as schedulable to the caller as-is otherwise. + Some(task) } }) } From 7636f40dd5e233e72f7b7234004a607e2903f448 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 14 Feb 2025 22:29:09 +0900 Subject: [PATCH 4/6] Make SchedulingContext::bank() return Option<_> --- runtime/src/installed_scheduler_pool.rs | 17 ++++++++++++++--- unified-scheduler-pool/src/lib.rs | 6 +++--- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 5359b9b9c0f634..4dec084c057e89 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -235,6 +235,9 @@ pub type SchedulerId = u64; /// expected to be used by a particular scheduler only for that duration of the time and to be /// disposed by the scheduler. Then, the scheduler may work on different banks with new /// `SchedulingContext`s. +/// +/// There's a special construction only used for scheduler preallocation, which has no bank. Panics +/// will be triggered when tried to be used normally across code-base. #[derive(Clone, Debug)] pub struct SchedulingContext { mode: SchedulingMode, @@ -274,8 +277,8 @@ impl SchedulingContext { self.mode } - pub fn bank(&self) -> &Arc { - self.bank.as_ref().unwrap() + pub fn bank(&self) -> Option<&Arc> { + self.bank.as_ref() } pub fn slot(&self) -> Option { @@ -432,11 +435,19 @@ pub struct BankWithSchedulerInner { pub type InstalledSchedulerRwLock = RwLock; impl BankWithScheduler { + /// Creates a new `BankWithScheduler` from bank and its associated scheduler. + /// + /// # Panics + /// + /// Panics if `scheduler`'s scheduling context is unmatched to given bank or for scheduler + /// preallocation. #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] pub(crate) fn new(bank: Arc, scheduler: Option) -> Self { + // Avoid the fatal situation in which bank is being associated with a scheduler associated + // to a different bank! if let Some(bank_in_context) = scheduler .as_ref() - .map(|scheduler| scheduler.context().bank()) + .map(|scheduler| scheduler.context().bank().unwrap()) { assert!(Arc::ptr_eq(&bank, bank_in_context)); } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 90da6c0e202fd3..f37b006396f561 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -722,7 +722,7 @@ impl TaskHandler for DefaultTaskHandler { ) { // scheduler must properly prevent conflicting tx executions. thus, task handler isn't // responsible for locking. - let bank = scheduling_context.bank(); + let bank = scheduling_context.bank().unwrap(); let transaction = task.transaction(); let index = task.task_index(); @@ -2612,7 +2612,7 @@ mod tests { let scheduler = pool.take_scheduler(new_context.clone()); assert_eq!(scheduler_id, scheduler.id()); - assert!(Arc::ptr_eq(scheduler.context().bank(), new_bank)); + assert!(Arc::ptr_eq(scheduler.context().bank().unwrap(), new_bank)); } #[test] @@ -3060,7 +3060,7 @@ mod tests { _handler_context: &HandlerContext, ) { // The task index must always be matched to the slot. 
-            assert_eq!(task.task_index() as Slot, context.bank().slot());
+            assert_eq!(task.task_index() as Slot, context.slot().unwrap());
         }
     }

From 47d09bf6a74494582cb36c0f297303361c9ecbac Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Mon, 17 Feb 2025 14:26:19 +0900
Subject: [PATCH 5/6] kick ci

From 8697c30b6b1ec65dcc48021435932c507034c477 Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Mon, 17 Feb 2025 16:01:25 +0900
Subject: [PATCH 6/6] kick ci