From bc38ed0f2f8ba2c4690e0d0e251aeb2acce308ca Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 19 Jan 2025 14:42:01 +0000 Subject: [PATCH] fix/avoid-suppress-manual-compaction: ### Add Support for Manual Compaction Requests - **Compaction Logic Enhancements**: - Updated `CompactionScheduler` in `compaction.rs` to handle manual compaction requests using `Options::StrictWindow`. - Introduced `PendingCompaction` struct to manage pending manual compaction requests. - Added logic to reschedule manual compaction requests once the current compaction task is completed. - **Testing**: - Added `test_manual_compaction_when_compaction_in_progress` to verify the handling of manual compaction requests during ongoing compaction processes. These changes enhance the compaction scheduling mechanism by allowing manual compaction requests to be queued and processed efficiently. --- src/mito2/src/compaction.rs | 182 ++++++++++++++++++++++++++++++++++-- 1 file changed, 176 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 24c6dbe2b81c..06d8ce2da84b 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use std::time::Instant; use api::v1::region::compact_request; +use api::v1::region::compact_request::Options; use common_base::Plugins; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{debug, error, info, warn}; @@ -140,8 +141,20 @@ impl CompactionScheduler { schema_metadata_manager: SchemaMetadataManagerRef, ) -> Result<()> { if let Some(status) = self.region_status.get_mut(®ion_id) { - // Region is compacting. Add the waiter to pending list. - status.merge_waiter(waiter); + match compact_options { + Options::Regular(_) => { + // Region is compacting. Add the waiter to pending list. + status.merge_waiter(waiter); + } + options @ Options::StrictWindow(_) => { + // Incoming compaction request is manually triggered. 
+ status.set_pending_request(PendingCompaction { options, waiter }); + info!( + "Region {} is compacting, manual compaction will be re-scheduled.", + region_id + ); + } + } return Ok(()); } @@ -177,6 +190,30 @@ impl CompactionScheduler { return; }; + if let Some(pending_request) = std::mem::take(&mut status.pending_request) { + let PendingCompaction { options, waiter } = pending_request; + + let request = status.new_compaction_request( + self.request_sender.clone(), + waiter, + self.engine_config.clone(), + self.cache_manager.clone(), + manifest_ctx, + self.listener.clone(), + schema_metadata_manager, + ); + + if let Err(e) = self.schedule_compaction_request(request, options).await { + error!(e; "Failed to continue pending manual compaction for region id: {}", region_id); + } else { + debug!( + "Successfully scheduled manual compaction for region id: {}", + region_id + ); + } + return; + } + // We should always try to compact the region until picker returns None. let request = status.new_compaction_request( self.request_sender.clone(), @@ -445,6 +482,8 @@ struct CompactionStatus { access_layer: AccessLayerRef, /// Pending waiters for compaction. waiters: Vec, + /// Pending compactions that are supposed to run as soon as the current compaction task finishes. + pending_request: Option, } impl CompactionStatus { @@ -459,6 +498,7 @@ impl CompactionStatus { version_control, access_layer, waiters: Vec::new(), + pending_request: None, } } @@ -469,9 +509,21 @@ impl CompactionStatus { } } + /// Set pending compaction request or replace the current value if one already exists. + fn set_pending_request(&mut self, pending: PendingCompaction) { + if let Some(prev) = self.pending_request.replace(pending) { + debug!( + "Replace pending compaction options with new request {:?} for region: {}", + prev.options, self.region_id + ); + } + } + fn on_failure(mut self, err: Arc) { for waiter in self.waiters.drain(..) 
{ - waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id:self.region_id })); + waiter.send(Err(err.clone()).context(CompactRegionSnafu { + region_id: self.region_id, + })); } } @@ -647,9 +699,20 @@ fn get_expired_ssts( .collect() } +/// Pending compaction request that is supposed to run after current task is finished, +/// typically used for manual compactions. +struct PendingCompaction { + /// Compaction options. Currently, it can only be [StrictWindow]. + pub(crate) options: compact_request::Options, + /// Waiters of pending requests. + pub(crate) waiter: OptionOutputTx, +} + #[cfg(test)] mod tests { + use api::v1::region::StrictWindow; use tokio::sync::oneshot; + use super::*; use crate::test_util::mock_schema_metadata_manager; use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler}; @@ -802,7 +865,8 @@ mod tests { .region_status .get(&builder.region_id()) .unwrap() - .waiters.is_empty()); + .waiters + .is_empty()); // On compaction finished and schedule next compaction. scheduler @@ -811,7 +875,6 @@ mod tests { assert_eq!(1, scheduler.region_status.len()); assert_eq!(2, job_scheduler.num_jobs()); - // 5 files for next compaction. 
apply_edit( &version_control, @@ -837,6 +900,113 @@ mod tests { assert!(!scheduler .region_status .get(&builder.region_id()) - .unwrap().waiters.is_empty()); + .unwrap() + .waiters + .is_empty()); + } + + #[tokio::test] + async fn test_manual_compaction_when_compaction_in_progress() { + common_telemetry::init_default_ut_logging(); + let job_scheduler = Arc::new(VecScheduler::default()); + let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone()); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let mut builder = VersionControlBuilder::new(); + let purger = builder.file_purger(); + let region_id = builder.region_id(); + + let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager(); + schema_metadata_manager + .register_region_table_info( + builder.region_id().table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + kv_backend, + ) + .await; + + // 5 files to compact. + let end = 1000 * 1000; + let version_control = Arc::new( + builder + .push_l0_file(0, end) + .push_l0_file(10, end) + .push_l0_file(50, end) + .push_l0_file(80, end) + .push_l0_file(90, end) + .build(), + ); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + + let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0] + .files + .values() + .map(|file| file.meta_ref().clone()) + .collect(); + + // 5 files for next compaction and removes old files. + apply_edit( + &version_control, + &[(0, end), (20, end), (40, end), (60, end), (80, end)], + &file_metas, + purger.clone(), + ); + + scheduler + .schedule_compaction( + region_id, + compact_request::Options::Regular(Default::default()), + &version_control, + &env.access_layer, + OptionOutputTx::none(), + &manifest_ctx, + schema_metadata_manager.clone(), + ) + .await + .unwrap(); + // Should schedule 1 compaction. 
+ assert_eq!(1, scheduler.region_status.len()); + assert_eq!(1, job_scheduler.num_jobs()); + assert!(scheduler + .region_status + .get(®ion_id) + .unwrap() + .pending_request + .is_none()); + + // Schedule another manual compaction. + let (tx, _rx) = oneshot::channel(); + scheduler + .schedule_compaction( + region_id, + compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }), + &version_control, + &env.access_layer, + OptionOutputTx::new(Some(OutputTx::new(tx))), + &manifest_ctx, + schema_metadata_manager.clone(), + ) + .await + .unwrap(); + assert_eq!(1, scheduler.region_status.len()); + // Current job num should be 1 since compaction is in progress. + assert_eq!(1, job_scheduler.num_jobs()); + let status = scheduler.region_status.get(&builder.region_id()).unwrap(); + assert!(status.pending_request.is_some()); + + // On compaction finished and schedule next compaction. + scheduler + .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone()) + .await; + assert_eq!(1, scheduler.region_status.len()); + assert_eq!(2, job_scheduler.num_jobs()); + + let status = scheduler.region_status.get(&builder.region_id()).unwrap(); + assert!(status.pending_request.is_none()); } }