Skip to content

Commit

Permalink
fix(query): support vacuum temp files for create and refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Feb 7, 2025
1 parent 3c1c34d commit 78ccaf9
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 9 deletions.
14 changes: 14 additions & 0 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_catalog::lock::LockTableOption;
use databend_common_catalog::table::CompactionLimits;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_pipeline_core::always_callback;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::executor::physical_plans::MutationKind;
Expand All @@ -32,6 +33,9 @@ use log::info;

use crate::interpreters::common::metrics_inc_compact_hook_compact_time_ms;
use crate::interpreters::common::metrics_inc_compact_hook_main_operation_time_ms;
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
use crate::interpreters::Interpreter;
use crate::interpreters::OptimizeCompactBlockInterpreter;
use crate::interpreters::ReclusterTableInterpreter;
Expand Down Expand Up @@ -176,6 +180,16 @@ async fn compact_table(
let mut build_res = compact_interpreter.execute2().await?;
// execute the compact pipeline
if build_res.main_pipeline.is_complete_pipeline()? {
let query_ctx = ctx.clone();
build_res.main_pipeline.set_on_finished(always_callback(
move |_info: &ExecutionInfo| {
hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;
Ok(())
},
));

build_res.set_max_threads(settings.get_max_threads()? as usize);
let executor_settings = ExecutorSettings::try_create(ctx.clone())?;

Expand Down
34 changes: 34 additions & 0 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_meta_app::schema::IndexMeta;
use databend_common_meta_app::schema::ListIndexesByIdReq;
use databend_common_meta_app::schema::ListVirtualColumnsReq;
use databend_common_meta_types::MetaId;
use databend_common_pipeline_core::always_callback;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::plans::Plan;
Expand All @@ -37,6 +38,9 @@ use databend_storages_common_table_meta::meta::Location;
use log::info;
use parking_lot::RwLock;

use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
use crate::interpreters::Interpreter;
use crate::interpreters::RefreshIndexInterpreter;
use crate::interpreters::RefreshTableIndexInterpreter;
Expand Down Expand Up @@ -130,6 +134,16 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
let settings = ExecutorSettings::try_create(ctx_cloned.clone())?;

if build_res.main_pipeline.is_complete_pipeline()? {
let query_ctx = ctx_cloned.clone();
build_res.main_pipeline.set_on_finished(always_callback(
move |_: &ExecutionInfo| {
hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;
Ok(())
},
));

let mut pipelines = build_res.sources_pipelines;
pipelines.push(build_res.main_pipeline);

Expand Down Expand Up @@ -157,6 +171,16 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
let settings = ExecutorSettings::try_create(ctx_cloned.clone())?;

if build_res.main_pipeline.is_complete_pipeline()? {
let query_ctx = ctx_cloned.clone();
build_res.main_pipeline.set_on_finished(always_callback(
move |_info: &ExecutionInfo| {
hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;
Ok(())
},
));

let mut pipelines = build_res.sources_pipelines;
pipelines.push(build_res.main_pipeline);

Expand Down Expand Up @@ -184,6 +208,16 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
let settings = ExecutorSettings::try_create(ctx_cloned.clone())?;

if build_res.main_pipeline.is_complete_pipeline()? {
let query_ctx = ctx_cloned.clone();
build_res.main_pipeline.set_on_finished(always_callback(
move |_: &ExecutionInfo| {
hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;
Ok(())
},
));

let mut pipelines = build_res.sources_pipelines;
pipelines.push(build_res.main_pipeline);

Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/interpreters/hook/vacuum_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,11 @@ pub fn hook_disk_temp_dir(query_ctx: &Arc<QueryContext>) -> Result<()> {

Ok(())
}

// Best-effort cleanup hook: drops the temporary tables that were created for
// materialized CTEs during the query, intended to run when a pipeline finishes.
//
// The async drop is executed synchronously by blocking on the global I/O
// runtime. The `block_on` result is deliberately discarded (`let _ =`), so a
// failure while dropping the temp tables never propagates to the caller —
// this hook always returns `Ok(())`.
// NOTE(review): assumes the calling thread is not itself a worker of the
// global I/O runtime (blocking there could deadlock) — TODO confirm.
pub fn hook_clear_m_cte_temp_table(query_ctx: &Arc<QueryContext>) -> Result<()> {
let _ = GlobalIORuntime::instance().block_on(async move {
// `?` converts the drop error into the async block's `Result`; that
// error is then swallowed by the `let _ =` above.
query_ctx.drop_m_cte_temp_table().await?;
Ok(())
});
Ok(())
}
10 changes: 1 addition & 9 deletions src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use databend_common_base::base::short_sql;
use databend_common_base::runtime::profile::get_statistics_desc;
use databend_common_base::runtime::profile::ProfileDesc;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
Expand All @@ -50,6 +49,7 @@ use log::info;
use md5::Digest;
use md5::Md5;

use super::hook::vacuum_hook::hook_clear_m_cte_temp_table;
use super::hook::vacuum_hook::hook_disk_temp_dir;
use super::hook::vacuum_hook::hook_vacuum_temp_files;
use super::InterpreterMetrics;
Expand Down Expand Up @@ -360,11 +360,3 @@ fn need_acquire_lock(ctx: Arc<QueryContext>, stmt: &Statement) -> bool {
_ => false,
}
}

// Best-effort cleanup: drops the temporary tables created for materialized
// CTEs by blocking on the global I/O runtime.
//
// Any error from the async drop is intentionally discarded (`let _ =`), so
// this helper always returns `Ok(())` — temp-table cleanup must never fail
// the enclosing query teardown.
fn hook_clear_m_cte_temp_table(query_ctx: &Arc<QueryContext>) -> Result<()> {
let _ = GlobalIORuntime::instance().block_on(async move {
// `?` maps the drop error into the async block's `Result`; the error
// is then ignored by the `let _ =` binding above.
query_ctx.drop_m_cte_temp_table().await?;
Ok(())
});
Ok(())
}
13 changes: 13 additions & 0 deletions src/query/service/src/interpreters/interpreter_table_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::schema::TableNameIdent;
use databend_common_meta_app::schema::TableStatistics;
use databend_common_meta_types::MatchSeq;
use databend_common_pipeline_core::always_callback;
use databend_common_pipeline_core::ExecutionInfo;
use databend_common_sql::field_default_value;
use databend_common_sql::plans::CreateTablePlan;
Expand All @@ -63,6 +64,9 @@ use crate::interpreters::common::table_option_validation::is_valid_create_opt;
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
use crate::interpreters::hook::vacuum_hook::hook_disk_temp_dir;
use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
use crate::interpreters::InsertInterpreter;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -242,6 +246,15 @@ impl CreateTableInterpreter {
let db_name = self.plan.database.clone();
let table_name = self.plan.table.clone();

let query_ctx = self.ctx.clone();
pipeline
.main_pipeline
.set_on_finished(always_callback(move |_: &ExecutionInfo| {
hook_clear_m_cte_temp_table(&query_ctx)?;
hook_vacuum_temp_files(&query_ctx)?;
hook_disk_temp_dir(&query_ctx)?;
Ok(())
}));
// Add a callback to restore table visibility upon successful insert pipeline completion.
// As there might be previous on_finish callbacks(e.g. refresh/compact/re-cluster hooks) which
// depend on the table being visible, this callback is added at the beginning of the on_finish
Expand Down

0 comments on commit 78ccaf9

Please sign in to comment.