diff --git a/src/benchmarklib/benchmark_runner.cpp b/src/benchmarklib/benchmark_runner.cpp
index 0867114ad3..8e9dd1414b 100644
--- a/src/benchmarklib/benchmark_runner.cpp
+++ b/src/benchmarklib/benchmark_runner.cpp
@@ -139,7 +139,9 @@ void BenchmarkRunner::_execute_query(const NamedQuery& named_query) {
   const auto& name = named_query.first;
   const auto& sql = named_query.second;
 
-  auto pipeline = SQLPipelineBuilder{sql}.with_mvcc(_config.use_mvcc).create_pipeline();
+  auto pipeline_builder = SQLPipelineBuilder{sql}.with_mvcc(_config.use_mvcc);
+  if (_config.enable_visualization) pipeline_builder.dont_cleanup_temporaries();
+  auto pipeline = pipeline_builder.create_pipeline();
 
   // Execute the query, we don't care about the results
   pipeline.get_result_table();
diff --git a/src/bin/console/console.cpp b/src/bin/console/console.cpp
index 2cc902291a..3f8071d837 100644
--- a/src/bin/console/console.cpp
+++ b/src/bin/console/console.cpp
@@ -233,8 +233,10 @@ int Console::_eval_command(const CommandFunction& func, const std::string& comma
 
 bool Console::_initialize_pipeline(const std::string& sql) {
   try {
     if (_explicitly_created_transaction_context != nullptr) {
+      // We want to keep the temporary tables for debugging and visualization
       _sql_pipeline = std::make_unique<SQLPipeline>(SQLPipelineBuilder{sql}
+                                                        .dont_cleanup_temporaries()
                                                         .with_prepared_statement_cache(_prepared_statements)
                                                         .with_transaction_context(_explicitly_created_transaction_context)
                                                         .create_pipeline());
@@ -554,7 +556,7 @@ int Console::visualize(const std::string& input) {
   } else {
     // Visualize the Physical Query Plan
-    SQLQueryPlan query_plan;
+    SQLQueryPlan query_plan{CleanupTemporaries::No};
 
     try {
       if (!no_execute) {
diff --git a/src/lib/operators/abstract_operator.cpp b/src/lib/operators/abstract_operator.cpp
index 232bec99a2..ec3dd6f3a8 100644
--- a/src/lib/operators/abstract_operator.cpp
+++ b/src/lib/operators/abstract_operator.cpp
@@ -70,6 +70,8 @@ std::shared_ptr<const Table> AbstractOperator::get_output() const {
   return _output;
 }
 
+void AbstractOperator::clear_output() { _output = nullptr; }
+
 const std::string AbstractOperator::description(DescriptionMode description_mode) const { return name(); }
 
 std::shared_ptr<AbstractOperator> AbstractOperator::recreate(const std::vector<AllParameterVariant>& args) const {
diff --git a/src/lib/operators/abstract_operator.hpp b/src/lib/operators/abstract_operator.hpp
index d0c4ca5afb..6d9eb3a757 100644
--- a/src/lib/operators/abstract_operator.hpp
+++ b/src/lib/operators/abstract_operator.hpp
@@ -77,8 +77,13 @@ class AbstractOperator : public std::enable_shared_from_this<AbstractOperator>,
   virtual void execute();
 
   // returns the result of the operator
+  // When using OperatorTasks, they automatically clear this once all successors are done. This reduces the number of
+  // temporary tables.
   std::shared_ptr<const Table> get_output() const;
 
+  // clears the output of this operator to free up space
+  void clear_output();
+
   virtual const std::string name() const = 0;
   virtual const std::string description(DescriptionMode description_mode = DescriptionMode::SingleLine) const;
 
@@ -152,8 +157,6 @@ class AbstractOperator : public std::enable_shared_from_this<AbstractOperator>,
   std::optional<std::weak_ptr<TransactionContext>> _transaction_context;
 
   BaseOperatorPerformanceData _base_performance_data;
-
-  std::weak_ptr<OperatorTask> _operator_task;
 };
 
 }  // namespace opossum
diff --git a/src/lib/operators/aggregate.cpp b/src/lib/operators/aggregate.cpp
index 96ac8bf7a9..7a52c6f29a 100644
--- a/src/lib/operators/aggregate.cpp
+++ b/src/lib/operators/aggregate.cpp
@@ -74,6 +74,11 @@ std::shared_ptr<AbstractOperator> Aggregate::_on_recreate(
   return std::make_shared<Aggregate>(recreated_input_left, _aggregates, _groupby_column_ids);
 }
 
+void Aggregate::_on_cleanup() {
+  _contexts_per_column.clear();
+  _keys_per_chunk.clear();
+}
+
 /*
 Visitor context for the partitioning/grouping visitor
 */
@@ -584,10 +589,10 @@ std::shared_ptr<const Table> Aggregate::_on_execute() {
   }
 
   // Write the output
-  _output = std::make_shared<Table>(_output_column_definitions, TableType::Data);
-  _output->append_chunk(_output_columns);
+  auto output = std::make_shared<Table>
(_output_column_definitions, TableType::Data); + output->append_chunk(_output_columns); - return _output; + return output; } /* diff --git a/src/lib/operators/aggregate.hpp b/src/lib/operators/aggregate.hpp index 2623064f17..2e94c24cb2 100644 --- a/src/lib/operators/aggregate.hpp +++ b/src/lib/operators/aggregate.hpp @@ -98,6 +98,8 @@ class Aggregate : public AbstractReadOnlyOperator { const std::vector& args, const std::shared_ptr& recreated_input_left, const std::shared_ptr& recreated_input_right) const override; + void _on_cleanup() override; + template static void _create_aggregate_context(boost::hana::basic_type type, std::shared_ptr& aggregate_context, @@ -125,7 +127,6 @@ class Aggregate : public AbstractReadOnlyOperator { const std::vector _aggregates; const std::vector _groupby_column_ids; - std::shared_ptr
_output; TableColumnDefinitions _output_column_definitions; ChunkColumns _output_columns; diff --git a/src/lib/operators/delete.cpp b/src/lib/operators/delete.cpp index e49f6626e4..f4e518ba40 100644 --- a/src/lib/operators/delete.cpp +++ b/src/lib/operators/delete.cpp @@ -51,6 +51,8 @@ std::shared_ptr Delete::_on_execute(std::shared_ptrrow_count(); + return nullptr; } @@ -66,12 +68,10 @@ void Delete::_on_commit_records(const CommitID cid) { } void Delete::_finish_commit() { - const auto num_rows_deleted = input_table_left()->row_count(); - const auto table_statistics = _table->table_statistics(); if (table_statistics) { _table->set_table_statistics(std::make_shared(table_statistics->table_type(), - table_statistics->row_count() - num_rows_deleted, + table_statistics->row_count() - _num_rows_deleted, table_statistics->column_statistics())); } } diff --git a/src/lib/operators/delete.hpp b/src/lib/operators/delete.hpp index 80eb793d98..b28cb9ad98 100644 --- a/src/lib/operators/delete.hpp +++ b/src/lib/operators/delete.hpp @@ -42,5 +42,6 @@ class Delete : public AbstractReadWriteOperator { std::shared_ptr
_table; TransactionID _transaction_id; std::vector> _pos_lists; + uint64_t _num_rows_deleted; }; } // namespace opossum diff --git a/src/lib/operators/join_hash.cpp b/src/lib/operators/join_hash.cpp index c782fb8f7b..5de4b33e36 100644 --- a/src/lib/operators/join_hash.cpp +++ b/src/lib/operators/join_hash.cpp @@ -532,20 +532,20 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl { */ TableColumnDefinitions output_column_definitions; - auto _right_in_table = _right->get_output(); - auto _left_in_table = _left->get_output(); + auto right_in_table = _right->get_output(); + auto left_in_table = _left->get_output(); if (_inputs_swapped) { // Semi/Anti joins are always swapped but do not need the outer relation if (_mode == JoinMode::Semi || _mode == JoinMode::Anti) { - output_column_definitions = _right_in_table->column_definitions(); + output_column_definitions = right_in_table->column_definitions(); } else { output_column_definitions = - concatenated(_right_in_table->column_definitions(), _left_in_table->column_definitions()); + concatenated(right_in_table->column_definitions(), left_in_table->column_definitions()); } } else { output_column_definitions = - concatenated(_left_in_table->column_definitions(), _right_in_table->column_definitions()); + concatenated(left_in_table->column_definitions(), right_in_table->column_definitions()); } _output_table = std::make_shared
(output_column_definitions, TableType::References); @@ -559,8 +559,8 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl { // Pre-partitioning // Save chunk offsets into the input relation - size_t left_chunk_count = _left_in_table->chunk_count(); - size_t right_chunk_count = _right_in_table->chunk_count(); + size_t left_chunk_count = left_in_table->chunk_count(); + size_t right_chunk_count = right_in_table->chunk_count(); auto left_chunk_offsets = std::make_shared>(); auto right_chunk_offsets = std::make_shared>(); @@ -571,13 +571,13 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl { size_t offset_left = 0; for (ChunkID i{0}; i < left_chunk_count; ++i) { left_chunk_offsets->operator[](i) = offset_left; - offset_left += _left_in_table->get_chunk(i)->size(); + offset_left += left_in_table->get_chunk(i)->size(); } size_t offset_right = 0; for (ChunkID i{0}; i < right_chunk_count; ++i) { right_chunk_offsets->operator[](i) = offset_right; - offset_right += _right_in_table->get_chunk(i)->size(); + offset_right += right_in_table->get_chunk(i)->size(); } Timer performance_timer; @@ -592,10 +592,10 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl { This helps choosing a scheduler node for the radix phase (see below). */ // Scheduler note: parallelize this at some point. Currently, the amount of jobs would be too high - auto materialized_left = _materialize_input(_left_in_table, _column_ids.first, histograms_left); + auto materialized_left = _materialize_input(left_in_table, _column_ids.first, histograms_left); // 'keep_nulls' makes sure that the relation on the right materializes NULL values when executing an OUTER join. auto materialized_right = - _materialize_input(_right_in_table, _column_ids.second, histograms_right, keep_nulls); + _materialize_input(right_in_table, _column_ids.second, histograms_right, keep_nulls); // Radix Partitioning phase /* @@ -656,13 +656,13 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl { PosListsByColumn right_pos_lists_by_column; // left_pos_lists_by_column will only be needed if left is a reference table and being output - if (_left_in_table->type() == TableType::References && !only_output_right_input) { - left_pos_lists_by_column = setup_pos_lists_by_column(_left_in_table); + if (left_in_table->type() == TableType::References && !only_output_right_input) { + left_pos_lists_by_column = setup_pos_lists_by_column(left_in_table); } // right_pos_lists_by_column will only be needed if right is a reference table - if (_right_in_table->type() == TableType::References) { - right_pos_lists_by_column = setup_pos_lists_by_column(_right_in_table); + if (right_in_table->type() == TableType::References) { + right_pos_lists_by_column = setup_pos_lists_by_column(right_in_table); } for (size_t partition_id = 0; partition_id < left_pos_lists.size(); ++partition_id) { @@ -679,15 +679,15 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl { // we need to swap back the inputs, so that the order of the output columns is not harmed if (_inputs_swapped) { - write_output_columns(output_columns, _right_in_table, right_pos_lists_by_column, right); + write_output_columns(output_columns, right_in_table, right_pos_lists_by_column, right); // Semi/Anti joins are always swapped but do not need the outer relation if (!only_output_right_input) { - write_output_columns(output_columns, _left_in_table, left_pos_lists_by_column, left); + write_output_columns(output_columns, left_in_table, left_pos_lists_by_column, left); } 
} else { - write_output_columns(output_columns, _left_in_table, left_pos_lists_by_column, left); - write_output_columns(output_columns, _right_in_table, right_pos_lists_by_column, right); + write_output_columns(output_columns, left_in_table, left_pos_lists_by_column, left); + write_output_columns(output_columns, right_in_table, right_pos_lists_by_column, right); } _output_table->append_chunk(output_columns); diff --git a/src/lib/operators/join_index.cpp b/src/lib/operators/join_index.cpp index fcd5c4009f..50cd487d22 100644 --- a/src/lib/operators/join_index.cpp +++ b/src/lib/operators/join_index.cpp @@ -357,4 +357,14 @@ void JoinIndex::_write_output_columns(ChunkColumns& output_columns, const std::s } } +void JoinIndex::_on_cleanup() { + _output_table.reset(); + _left_in_table.reset(); + _right_in_table.reset(); + _pos_list_left.reset(); + _pos_list_right.reset(); + _left_matches.clear(); + _right_matches.clear(); +} + } // namespace opossum diff --git a/src/lib/operators/join_index.hpp b/src/lib/operators/join_index.hpp index 790a84b3b1..3b5ef7296b 100644 --- a/src/lib/operators/join_index.hpp +++ b/src/lib/operators/join_index.hpp @@ -52,6 +52,8 @@ class JoinIndex : public AbstractJoinOperator { void _write_output_columns(ChunkColumns& output_columns, const std::shared_ptr input_table, std::shared_ptr pos_list); + void _on_cleanup() override; + std::shared_ptr
_output_table; std::shared_ptr _left_in_table; std::shared_ptr _right_in_table; diff --git a/src/lib/operators/join_nested_loop.cpp b/src/lib/operators/join_nested_loop.cpp index 818b2a1b27..af2ba037b1 100644 --- a/src/lib/operators/join_nested_loop.cpp +++ b/src/lib/operators/join_nested_loop.cpp @@ -262,4 +262,13 @@ void JoinNestedLoop::_write_output_chunks(ChunkColumns& columns, const std::shar } } +void JoinNestedLoop::_on_cleanup() { + _output_table.reset(); + _left_in_table.reset(); + _right_in_table.reset(); + _pos_list_left.reset(); + _pos_list_right.reset(); + _right_matches.clear(); +} + } // namespace opossum diff --git a/src/lib/operators/join_nested_loop.hpp b/src/lib/operators/join_nested_loop.hpp index 0e54196aeb..a5ae9dd0d0 100644 --- a/src/lib/operators/join_nested_loop.hpp +++ b/src/lib/operators/join_nested_loop.hpp @@ -37,6 +37,8 @@ class JoinNestedLoop : public AbstractJoinOperator { void _write_output_chunks(ChunkColumns& columns, const std::shared_ptr input_table, std::shared_ptr pos_list); + void _on_cleanup() override; + std::shared_ptr
_output_table; std::shared_ptr _left_in_table; std::shared_ptr _right_in_table; diff --git a/src/lib/operators/projection.cpp b/src/lib/operators/projection.cpp index 1d1451fee5..8853f4a7d8 100644 --- a/src/lib/operators/projection.cpp +++ b/src/lib/operators/projection.cpp @@ -136,7 +136,9 @@ std::shared_ptr Projection::_on_execute() { // For subselects, we need to execute the subquery in order to use the result table later if (column_expression->is_subselect() && !column_expression->has_subselect_table()) { - SQLQueryPlan query_plan; + // Because subqueries are still WIP, we do not clean up their temporary tables. Otherwise, we would have to + // somehow pass in cleanup_temporaries from the pipeline. + SQLQueryPlan query_plan{CleanupTemporaries::No}; query_plan.add_tree_by_root(column_expression->subselect_operator()); auto transaction_context = this->transaction_context(); diff --git a/src/lib/scheduler/abstract_task.cpp b/src/lib/scheduler/abstract_task.cpp index fb40f80064..68a01c30d4 100644 --- a/src/lib/scheduler/abstract_task.cpp +++ b/src/lib/scheduler/abstract_task.cpp @@ -18,7 +18,7 @@ TaskID AbstractTask::id() const { return _id; } NodeID AbstractTask::node_id() const { return _node_id; } -bool AbstractTask::is_ready() const { return _predecessor_counter == 0; } +bool AbstractTask::is_ready() const { return _pending_predecessors == 0; } bool AbstractTask::is_done() const { return _done; } @@ -33,10 +33,13 @@ void AbstractTask::set_id(TaskID id) { _id = id; } void AbstractTask::set_as_predecessor_of(std::shared_ptr successor) { DebugAssert((!_is_scheduled), "Possible race: Don't set dependencies after the Task was scheduled"); - successor->_on_predecessor_added(); + successor->_pending_predecessors++; _successors.emplace_back(successor); + successor->_predecessors.emplace_back(shared_from_this()); } +const std::vector>& AbstractTask::predecessors() const { return _predecessors; } + const std::vector>& AbstractTask::successors() const { return _successors; } void AbstractTask::set_node_id(NodeID node_id) { _node_id = node_id; } @@ -108,10 +111,8 @@ void AbstractTask::_mark_as_scheduled() { DebugAssert((!already_scheduled), "Task was already scheduled!"); } -void AbstractTask::_on_predecessor_added() { _predecessor_counter++; } - void AbstractTask::_on_predecessor_done() { - auto new_predecessor_count = --_predecessor_counter; // atomically decrement + auto new_predecessor_count = --_pending_predecessors; // atomically decrement if (new_predecessor_count == 0) { if (CurrentScheduler::is_set()) { auto worker = Worker::get_this_thread_worker(); diff --git a/src/lib/scheduler/abstract_task.hpp b/src/lib/scheduler/abstract_task.hpp index 79c20a2e8f..4aff0718da 100644 --- a/src/lib/scheduler/abstract_task.hpp +++ b/src/lib/scheduler/abstract_task.hpp @@ -58,6 +58,11 @@ class AbstractTask : public std::enable_shared_from_this { */ void set_as_predecessor_of(std::shared_ptr successor); + /** + * @return the predecessors of this Task + */ + const std::vector>& predecessors() const; + /** * @return the successors of this Task */ @@ -117,11 +122,6 @@ class AbstractTask : public std::enable_shared_from_this { */ void _join_without_replacement_worker(); - /** - * Called when a dependency is initialized (by set_as_predecessor_of) - */ - void _on_predecessor_added(); - /** * Called by a dependency when it finished execution */ @@ -133,7 +133,8 @@ class AbstractTask : public std::enable_shared_from_this { std::function _done_callback; // For dependencies - std::atomic_uint 
_predecessor_counter{0};
+  std::atomic_uint _pending_predecessors{0};
+  std::vector<std::weak_ptr<AbstractTask>> _predecessors;
   std::vector<std::shared_ptr<AbstractTask>> _successors;
 
   // For making sure a task gets only scheduled and enqueued once, respectively
diff --git a/src/lib/scheduler/operator_task.cpp b/src/lib/scheduler/operator_task.cpp
index 74b1460f87..25af3f4ec3 100644
--- a/src/lib/scheduler/operator_task.cpp
+++ b/src/lib/scheduler/operator_task.cpp
@@ -14,36 +14,38 @@
 #include "scheduler/worker.hpp"
 
 namespace opossum {
-OperatorTask::OperatorTask(std::shared_ptr<AbstractOperator> op) : _op(std::move(op)) {}
+OperatorTask::OperatorTask(std::shared_ptr<AbstractOperator> op, CleanupTemporaries cleanup_temporaries)
+    : _op(std::move(op)), _cleanup_temporaries(cleanup_temporaries) {}
 
 std::string OperatorTask::description() const {
   return "OperatorTask with id: " + std::to_string(id()) + " for op: " + _op->description();
 }
 
 const std::vector<std::shared_ptr<OperatorTask>> OperatorTask::make_tasks_from_operator(
-    std::shared_ptr<AbstractOperator> op) {
+    std::shared_ptr<AbstractOperator> op, CleanupTemporaries cleanup_temporaries) {
   std::vector<std::shared_ptr<OperatorTask>> tasks;
   std::unordered_map<std::shared_ptr<AbstractOperator>, std::shared_ptr<OperatorTask>> task_by_op;
-  OperatorTask::_add_tasks_from_operator(op, tasks, task_by_op);
+  OperatorTask::_add_tasks_from_operator(op, tasks, task_by_op, cleanup_temporaries);
   return tasks;
 }
 
 std::shared_ptr<OperatorTask> OperatorTask::_add_tasks_from_operator(
     std::shared_ptr<AbstractOperator> op, std::vector<std::shared_ptr<OperatorTask>>& tasks,
-    std::unordered_map<std::shared_ptr<AbstractOperator>, std::shared_ptr<OperatorTask>>& task_by_op) {
+    std::unordered_map<std::shared_ptr<AbstractOperator>, std::shared_ptr<OperatorTask>>& task_by_op,
+    CleanupTemporaries cleanup_temporaries) {
   const auto task_by_op_it = task_by_op.find(op);
   if (task_by_op_it != task_by_op.end()) return task_by_op_it->second;
 
-  const auto task = std::make_shared<OperatorTask>(op);
+  const auto task = std::make_shared<OperatorTask>(op, cleanup_temporaries);
   task_by_op.emplace(op, task);
 
   if (auto left = op->mutable_input_left()) {
-    auto subtree_root = OperatorTask::_add_tasks_from_operator(left, tasks, task_by_op);
+    auto subtree_root = OperatorTask::_add_tasks_from_operator(left, tasks, task_by_op, cleanup_temporaries);
     subtree_root->set_as_predecessor_of(task);
   }
 
   if (auto right = op->mutable_input_right()) {
-    auto subtree_root = OperatorTask::_add_tasks_from_operator(right, tasks, task_by_op);
+    auto subtree_root = OperatorTask::_add_tasks_from_operator(right, tasks, task_by_op, cleanup_temporaries);
     subtree_root->set_as_predecessor_of(task);
   }
 
@@ -91,5 +93,25 @@ void OperatorTask::_on_execute() {
       context->rollback();
     }
   }
+
+  // Get rid of temporary tables that are not needed anymore
+  // Because `clear_output` is only called by the successor OperatorTasks, we can be sure that no one cleans up the
+  // root (i.e., the final result)
+  if (_cleanup_temporaries == CleanupTemporaries::Yes) {
+    for (const auto& weak_predecessor : predecessors()) {
+      const auto predecessor = std::dynamic_pointer_cast<OperatorTask>(weak_predecessor.lock());
+      DebugAssert(predecessor != nullptr, "predecessor of OperatorTask is not an OperatorTask itself");
+      auto previous_operator_still_needed = false;
+
+      for (const auto& successor : predecessor->successors()) {
+        if (successor.get() != this && !successor->is_done()) {
+          previous_operator_still_needed = true;
+        }
+      }
+      // If someone else still holds a shared_ptr to the table (e.g., a ReferenceColumn pointing to a materialized
+      // temporary table), it will not yet get deleted
+      if (!previous_operator_still_needed) predecessor->get_operator()->clear_output();
+    }
+  }
 }
 }  // namespace opossum
diff --git a/src/lib/scheduler/operator_task.hpp b/src/lib/scheduler/operator_task.hpp
index 83ef84072f..cb1137ad11 100644
--- a/src/lib/scheduler/operator_task.hpp
+++ 
b/src/lib/scheduler/operator_task.hpp @@ -15,13 +15,14 @@ class AbstractOperator; */ class OperatorTask : public AbstractTask { public: - explicit OperatorTask(std::shared_ptr op); + // We don't like abbreviations, but "operator" is a keyword + OperatorTask(std::shared_ptr op, CleanupTemporaries cleanup_temporaries); /** * Create tasks recursively from result operator and set task dependencies automatically. */ static const std::vector> make_tasks_from_operator( - std::shared_ptr op); + std::shared_ptr op, CleanupTemporaries cleanup_temporaries); const std::shared_ptr& get_operator() const; @@ -36,9 +37,11 @@ class OperatorTask : public AbstractTask { */ static std::shared_ptr _add_tasks_from_operator( std::shared_ptr op, std::vector>& tasks, - std::unordered_map, std::shared_ptr>& task_by_op); + std::unordered_map, std::shared_ptr>& task_by_op, + CleanupTemporaries cleanup_temporaries); private: std::shared_ptr _op; + CleanupTemporaries _cleanup_temporaries; }; } // namespace opossum diff --git a/src/lib/sql/sql_pipeline.cpp b/src/lib/sql/sql_pipeline.cpp index 3815264a6b..3f1405bb63 100644 --- a/src/lib/sql/sql_pipeline.cpp +++ b/src/lib/sql/sql_pipeline.cpp @@ -11,7 +11,8 @@ namespace opossum { SQLPipeline::SQLPipeline(const std::string& sql, std::shared_ptr transaction_context, const UseMvcc use_mvcc, const std::shared_ptr& lqp_translator, - const std::shared_ptr& optimizer, const PreparedStatementCache& prepared_statements) + const std::shared_ptr& optimizer, const PreparedStatementCache& prepared_statements, + const CleanupTemporaries cleanup_temporaries) : _transaction_context(transaction_context), _optimizer(optimizer) { DebugAssert(!_transaction_context || _transaction_context->phase() == TransactionPhase::Active, "The transaction context cannot have been committed already."); @@ -73,9 +74,9 @@ SQLPipeline::SQLPipeline(const std::string& sql, std::shared_ptr(statement_string, std::move(parsed_statement), use_mvcc, - transaction_context, lqp_translator, optimizer, prepared_statements); + auto pipeline_statement = std::make_shared( + statement_string, std::move(parsed_statement), use_mvcc, transaction_context, lqp_translator, optimizer, + prepared_statements, cleanup_temporaries); _sql_pipeline_statements.push_back(std::move(pipeline_statement)); } diff --git a/src/lib/sql/sql_pipeline.hpp b/src/lib/sql/sql_pipeline.hpp index c77af96e2b..19e98f309f 100644 --- a/src/lib/sql/sql_pipeline.hpp +++ b/src/lib/sql/sql_pipeline.hpp @@ -45,7 +45,7 @@ class SQLPipeline : public Noncopyable { // Prefer using the SQLPipelineBuilder interface for constructing SQLPipelines conveniently SQLPipeline(const std::string& sql, std::shared_ptr transaction_context, const UseMvcc use_mvcc, const std::shared_ptr& lqp_translator, const std::shared_ptr& optimizer, - const PreparedStatementCache& prepared_statements); + const PreparedStatementCache& prepared_statements, const CleanupTemporaries cleanup_temporaries); // Returns the SQL string for each statement. 
const std::vector& get_sql_strings(); diff --git a/src/lib/sql/sql_pipeline_builder.cpp b/src/lib/sql/sql_pipeline_builder.cpp index c84ba9d09a..866c1ca562 100644 --- a/src/lib/sql/sql_pipeline_builder.cpp +++ b/src/lib/sql/sql_pipeline_builder.cpp @@ -35,11 +35,16 @@ SQLPipelineBuilder& SQLPipelineBuilder::with_transaction_context( SQLPipelineBuilder& SQLPipelineBuilder::disable_mvcc() { return with_mvcc(UseMvcc::No); } +SQLPipelineBuilder& SQLPipelineBuilder::dont_cleanup_temporaries() { + _cleanup_temporaries = CleanupTemporaries::No; + return *this; +} + SQLPipeline SQLPipelineBuilder::create_pipeline() const { auto lqp_translator = _lqp_translator ? _lqp_translator : std::make_shared(); auto optimizer = _optimizer ? _optimizer : Optimizer::create_default_optimizer(); - return {_sql, _transaction_context, _use_mvcc, lqp_translator, optimizer, _prepared_statements}; + return {_sql, _transaction_context, _use_mvcc, lqp_translator, optimizer, _prepared_statements, _cleanup_temporaries}; } SQLPipelineStatement SQLPipelineBuilder::create_pipeline_statement( @@ -47,7 +52,8 @@ SQLPipelineStatement SQLPipelineBuilder::create_pipeline_statement( auto lqp_translator = _lqp_translator ? _lqp_translator : std::make_shared(); auto optimizer = _optimizer ? _optimizer : Optimizer::create_default_optimizer(); - return {_sql, parsed_sql, _use_mvcc, _transaction_context, lqp_translator, optimizer, _prepared_statements}; + return {_sql, parsed_sql, _use_mvcc, _transaction_context, lqp_translator, + optimizer, _prepared_statements, _cleanup_temporaries}; } } // namespace opossum diff --git a/src/lib/sql/sql_pipeline_builder.hpp b/src/lib/sql/sql_pipeline_builder.hpp index 821cd6e85b..f994d9adbe 100644 --- a/src/lib/sql/sql_pipeline_builder.hpp +++ b/src/lib/sql/sql_pipeline_builder.hpp @@ -48,6 +48,11 @@ class SQLPipelineBuilder final { */ SQLPipelineBuilder& disable_mvcc(); + /* + * Keep temporary tables in the middle of the query plan for visualization and debugging + */ + SQLPipelineBuilder& dont_cleanup_temporaries(); + SQLPipeline create_pipeline() const; /** @@ -64,6 +69,7 @@ class SQLPipelineBuilder final { std::shared_ptr _lqp_translator; std::shared_ptr _optimizer; PreparedStatementCache _prepared_statements; + CleanupTemporaries _cleanup_temporaries{true}; }; } // namespace opossum diff --git a/src/lib/sql/sql_pipeline_statement.cpp b/src/lib/sql/sql_pipeline_statement.cpp index c5d1cfcad0..bc4bf6960a 100644 --- a/src/lib/sql/sql_pipeline_statement.cpp +++ b/src/lib/sql/sql_pipeline_statement.cpp @@ -22,7 +22,8 @@ SQLPipelineStatement::SQLPipelineStatement(const std::string& sql, std::shared_p const std::shared_ptr& transaction_context, const std::shared_ptr& lqp_translator, const std::shared_ptr& optimizer, - const PreparedStatementCache& prepared_statements) + const PreparedStatementCache& prepared_statements, + const CleanupTemporaries cleanup_temporaries) : _sql_string(sql), _use_mvcc(use_mvcc), _auto_commit(_use_mvcc == UseMvcc::Yes && !transaction_context), @@ -31,7 +32,8 @@ SQLPipelineStatement::SQLPipelineStatement(const std::string& sql, std::shared_p _optimizer(optimizer), _parsed_sql_statement(std::move(parsed_sql)), _metrics(std::make_shared()), - _prepared_statements(prepared_statements) { + _prepared_statements(prepared_statements), + _cleanup_temporaries(cleanup_temporaries) { Assert(!_parsed_sql_statement || _parsed_sql_statement->size() == 1, "SQLPipelineStatement must hold exactly one SQL statement"); DebugAssert(!_sql_string.empty(), "An SQLPipelineStatement should always 
contain a SQL statement string for caching"); @@ -129,7 +131,7 @@ const std::shared_ptr& SQLPipelineStatement::get_query_plan() { _transaction_context = TransactionManager::get().new_transaction_context(); } - _query_plan = std::make_shared(); + _query_plan = std::make_shared(_cleanup_temporaries); // Stores when the actual compilation started/ended auto started = std::chrono::high_resolution_clock::now(); @@ -216,7 +218,7 @@ const std::vector>& SQLPipelineStatement::get_task "Physical query plan creation returned no or more than one plan for a single statement."); const auto& root = query_plan->tree_roots().front(); - _tasks = OperatorTask::make_tasks_from_operator(root); + _tasks = OperatorTask::make_tasks_from_operator(root, _cleanup_temporaries); return _tasks; } diff --git a/src/lib/sql/sql_pipeline_statement.hpp b/src/lib/sql/sql_pipeline_statement.hpp index 45ca48a96b..52007d2e2a 100644 --- a/src/lib/sql/sql_pipeline_statement.hpp +++ b/src/lib/sql/sql_pipeline_statement.hpp @@ -45,7 +45,8 @@ class SQLPipelineStatement : public Noncopyable { SQLPipelineStatement(const std::string& sql, std::shared_ptr parsed_sql, const UseMvcc use_mvcc, const std::shared_ptr& transaction_context, const std::shared_ptr& lqp_translator, - const std::shared_ptr& optimizer, const PreparedStatementCache& prepared_statements); + const std::shared_ptr& optimizer, const PreparedStatementCache& prepared_statements, + const CleanupTemporaries cleanup_temporaries); // Returns the raw SQL string. const std::string& get_sql_string(); @@ -107,6 +108,9 @@ class SQLPipelineStatement : public Noncopyable { PreparedStatementCache _prepared_statements; // Number of placeholders in prepared statement; default 0 because we assume no prepared statement uint16_t _num_parameters = 0; + + // Delete temporary tables + const CleanupTemporaries _cleanup_temporaries; }; } // namespace opossum diff --git a/src/lib/sql/sql_query_plan.cpp b/src/lib/sql/sql_query_plan.cpp index e291a382f0..910f7f2b3f 100644 --- a/src/lib/sql/sql_query_plan.cpp +++ b/src/lib/sql/sql_query_plan.cpp @@ -8,7 +8,8 @@ namespace opossum { -SQLQueryPlan::SQLQueryPlan() : _num_parameters(0) {} +SQLQueryPlan::SQLQueryPlan(CleanupTemporaries cleanup_temporaries) + : _cleanup_temporaries(cleanup_temporaries), _num_parameters(0) {} void SQLQueryPlan::add_tree_by_root(std::shared_ptr op) { _roots.push_back(op); } @@ -21,7 +22,7 @@ std::vector> SQLQueryPlan::create_tasks() const { for (const auto& root : _roots) { std::vector> sub_list; - sub_list = OperatorTask::make_tasks_from_operator(root); + sub_list = OperatorTask::make_tasks_from_operator(root, _cleanup_temporaries); tasks.insert(tasks.end(), sub_list.begin(), sub_list.end()); } @@ -31,7 +32,7 @@ std::vector> SQLQueryPlan::create_tasks() const { const std::vector>& SQLQueryPlan::tree_roots() const { return _roots; } SQLQueryPlan SQLQueryPlan::recreate(const std::vector& arguments) const { - SQLQueryPlan new_plan; + SQLQueryPlan new_plan{_cleanup_temporaries}; for (const auto& root : _roots) { DebugAssert(root.get() != nullptr, "Root operator in plan should not be null."); diff --git a/src/lib/sql/sql_query_plan.hpp b/src/lib/sql/sql_query_plan.hpp index 59b41d1ddc..3033fc543c 100644 --- a/src/lib/sql/sql_query_plan.hpp +++ b/src/lib/sql/sql_query_plan.hpp @@ -18,7 +18,7 @@ class TransactionContext; // When caching a query (through prepared statements or automatically) its SQLQueryPlan object is cached. 
class SQLQueryPlan { public: - SQLQueryPlan(); + explicit SQLQueryPlan(CleanupTemporaries cleanup_temporaries); // Add a new operator tree to the query plan by adding the root operator. void add_tree_by_root(std::shared_ptr op); @@ -46,6 +46,9 @@ class SQLQueryPlan { uint16_t num_parameters() const; protected: + // Should we delete temporary result tables once they are not needed anymore? + CleanupTemporaries _cleanup_temporaries; + // Root nodes of all operator trees that this plan contains. std::vector> _roots; diff --git a/src/lib/types.hpp b/src/lib/types.hpp index 26ff8a5c16..e03c7f87cc 100644 --- a/src/lib/types.hpp +++ b/src/lib/types.hpp @@ -255,6 +255,7 @@ enum class TableType { References, Data }; enum class DescriptionMode { SingleLine, MultiLine }; enum class UseMvcc : bool { Yes = true, No = false }; +enum class CleanupTemporaries : bool { Yes = true, No = false }; class Noncopyable { protected: diff --git a/src/test/operators/import_csv_test.cpp b/src/test/operators/import_csv_test.cpp index d521e14bd1..3ce5423558 100644 --- a/src/test/operators/import_csv_test.cpp +++ b/src/test/operators/import_csv_test.cpp @@ -103,7 +103,8 @@ TEST_F(OperatorsImportCsvTest, EmptyStrings) { TEST_F(OperatorsImportCsvTest, Parallel) { CurrentScheduler::set(std::make_shared(Topology::create_fake_numa_topology(8, 4))); - auto importer = std::make_shared(std::make_shared("src/test/csv/float_int_large.csv")); + auto importer = std::make_shared(std::make_shared("src/test/csv/float_int_large.csv"), + CleanupTemporaries::Yes); importer->schedule(); TableColumnDefinitions column_definitions{{"b", DataType::Float}, {"a", DataType::Int}}; diff --git a/src/test/scheduler/scheduler_test.cpp b/src/test/scheduler/scheduler_test.cpp index 0c22a43a43..0f077e772d 100644 --- a/src/test/scheduler/scheduler_test.cpp +++ b/src/test/scheduler/scheduler_test.cpp @@ -190,8 +190,8 @@ TEST_F(SchedulerTest, MultipleOperators) { auto gt = std::make_shared("table"); auto ts = std::make_shared(gt, ColumnID{0}, PredicateCondition::GreaterThanEquals, 1234); - auto gt_task = std::make_shared(gt); - auto ts_task = std::make_shared(ts); + auto gt_task = std::make_shared(gt, CleanupTemporaries::Yes); + auto ts_task = std::make_shared(ts, CleanupTemporaries::Yes); gt_task->set_as_predecessor_of(ts_task); gt_task->schedule(); diff --git a/src/test/sql/sql_pipeline_test.cpp b/src/test/sql/sql_pipeline_test.cpp index fb8d53f6f5..6f42d0e1aa 100644 --- a/src/test/sql/sql_pipeline_test.cpp +++ b/src/test/sql/sql_pipeline_test.cpp @@ -384,6 +384,30 @@ TEST_F(SQLPipelineTest, GetResultTableWithScheduler) { EXPECT_TABLE_EQ_UNORDERED(table, _join_result); } +TEST_F(SQLPipelineTest, CleanupWithScheduler) { + auto sql_pipeline = SQLPipelineBuilder{_join_query}.create_pipeline(); + + CurrentScheduler::set(std::make_shared(Topology::create_fake_numa_topology(8, 4))); + sql_pipeline.get_result_table(); + + for (auto task_it = sql_pipeline.get_tasks()[0].cbegin(); task_it != sql_pipeline.get_tasks()[0].cend() - 1; + ++task_it) { + EXPECT_EQ(std::dynamic_pointer_cast(*task_it)->get_operator()->get_output(), nullptr); + } +} + +TEST_F(SQLPipelineTest, DisabledCleanupWithScheduler) { + auto sql_pipeline = SQLPipelineBuilder{_join_query}.dont_cleanup_temporaries().create_pipeline(); + + CurrentScheduler::set(std::make_shared(Topology::create_fake_numa_topology(8, 4))); + sql_pipeline.get_result_table(); + + for (auto task_it = sql_pipeline.get_tasks()[0].cbegin(); task_it != sql_pipeline.get_tasks()[0].cend() - 1; + ++task_it) { + 
EXPECT_NE(std::dynamic_pointer_cast(*task_it)->get_operator()->get_output(), nullptr); + } +} + TEST_F(SQLPipelineTest, GetResultTableBadQuery) { auto sql = "SELECT a + b FROM table_a"; auto sql_pipeline = SQLPipelineBuilder{sql}.create_pipeline(); diff --git a/src/test/tasks/operator_task_test.cpp b/src/test/tasks/operator_task_test.cpp index 955dfc4360..8a0b9108be 100644 --- a/src/test/tasks/operator_task_test.cpp +++ b/src/test/tasks/operator_task_test.cpp @@ -30,7 +30,7 @@ class OperatorTaskTest : public BaseTest { TEST_F(OperatorTaskTest, BasicTasksFromOperatorTest) { auto gt = std::make_shared("table_a"); - auto tasks = OperatorTask::make_tasks_from_operator(gt); + auto tasks = OperatorTask::make_tasks_from_operator(gt, CleanupTemporaries::Yes); auto result_task = tasks.back(); result_task->schedule(); @@ -42,13 +42,17 @@ TEST_F(OperatorTaskTest, SingleDependencyTasksFromOperatorTest) { auto gt = std::make_shared("table_a"); auto ts = std::make_shared(gt, ColumnID{0}, PredicateCondition::Equals, 1234); - auto tasks = OperatorTask::make_tasks_from_operator(ts); + auto tasks = OperatorTask::make_tasks_from_operator(ts, CleanupTemporaries::Yes); for (auto& task : tasks) { task->schedule(); + // We don't have to wait here, because we are running the task tests without a scheduler } auto expected_result = load_table("src/test/tables/int_float_filtered.tbl", 2); EXPECT_TABLE_EQ_UNORDERED(expected_result, tasks.back()->get_operator()->get_output()); + + // Check that everything was properly cleaned up + EXPECT_EQ(gt->get_output(), nullptr); } TEST_F(OperatorTaskTest, DoubleDependencyTasksFromOperatorTest) { @@ -57,14 +61,20 @@ TEST_F(OperatorTaskTest, DoubleDependencyTasksFromOperatorTest) { auto join = std::make_shared(gt_a, gt_b, JoinMode::Inner, ColumnIDPair(ColumnID{0}, ColumnID{0}), PredicateCondition::Equals); - auto tasks = OperatorTask::make_tasks_from_operator(join); + auto tasks = OperatorTask::make_tasks_from_operator(join, CleanupTemporaries::Yes); for (auto& task : tasks) { task->schedule(); + // We don't have to wait here, because we are running the task tests without a scheduler } auto expected_result = load_table("src/test/tables/joinoperators/int_inner_join.tbl", 2); EXPECT_TABLE_EQ_UNORDERED(expected_result, tasks.back()->get_operator()->get_output()); + + // Check that everything was properly cleaned up + EXPECT_EQ(gt_a->get_output(), nullptr); + EXPECT_EQ(gt_b->get_output(), nullptr); } + TEST_F(OperatorTaskTest, MakeDiamondShape) { auto gt_a = std::make_shared("table_a"); auto scan_a = std::make_shared(gt_a, ColumnID{0}, PredicateCondition::GreaterThanEquals, 1234); @@ -72,7 +82,7 @@ TEST_F(OperatorTaskTest, MakeDiamondShape) { auto scan_c = std::make_shared(scan_a, ColumnID{1}, PredicateCondition::GreaterThan, 2000); auto union_positions = std::make_shared(scan_b, scan_c); - auto tasks = OperatorTask::make_tasks_from_operator(union_positions); + auto tasks = OperatorTask::make_tasks_from_operator(union_positions, CleanupTemporaries::Yes); ASSERT_EQ(tasks.size(), 5u); EXPECT_EQ(tasks[0]->get_operator(), gt_a); @@ -95,5 +105,16 @@ TEST_F(OperatorTaskTest, MakeDiamondShape) { std::vector> expected_successors_4{}; EXPECT_EQ(tasks[4]->successors(), expected_successors_4); + + for (auto& task : tasks) { + task->schedule(); + // We don't have to wait here, because we are running the task tests without a scheduler + } + + // Check that everything was properly cleaned up + EXPECT_EQ(gt_a->get_output(), nullptr); + EXPECT_EQ(scan_a->get_output(), nullptr); + 
EXPECT_EQ(scan_b->get_output(), nullptr);
+  EXPECT_EQ(scan_c->get_output(), nullptr);
 }
 
 }  // namespace opossum
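
A minimal usage sketch (not part of the patch) of the builder flag introduced above: by default, temporaries are cleaned up (CleanupTemporaries::Yes); callers that want to inspect intermediate results opt out explicitly. The query string and include path are illustrative placeholders.

#include "sql/sql_pipeline_builder.hpp"  // assumed include path

using namespace opossum;

void example() {
  // Default behavior: each operator's temporary result table is cleared as soon as
  // all successor operators have consumed it.
  auto pipeline = SQLPipelineBuilder{"SELECT * FROM table_a"}.create_pipeline();
  pipeline.get_result_table();

  // For visualization/debugging: keep every intermediate result alive.
  auto debug_pipeline =
      SQLPipelineBuilder{"SELECT * FROM table_a"}.dont_cleanup_temporaries().create_pipeline();
  debug_pipeline.get_result_table();
}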
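
The cleanup rule added to OperatorTask::_on_execute() boils down to: once a task finishes, each of its predecessors may drop its result table if every other successor of that predecessor is already done; the root task has no successor task, so the final result is never cleared. A simplified, self-contained model of that check (assumed names, not Hyrise code):

#include <memory>
#include <vector>

struct MiniOperator {
  std::shared_ptr<int> output;  // stands in for the temporary result table
  void clear_output() { output = nullptr; }
};

struct MiniTask {
  MiniOperator* op = nullptr;
  bool done = false;
  std::vector<MiniTask*> successors;
};

// Called by `current` right after it has finished executing, once per predecessor.
void maybe_clear_predecessor(MiniTask& predecessor, const MiniTask& current) {
  for (const auto* successor : predecessor.successors) {
    // Another consumer of the predecessor's table is still running: keep the table.
    if (successor != &current && !successor->done) return;
  }
  // No other task needs it; drop the temporary result. The actual table is only freed
  // once no one else holds a shared_ptr to it (e.g., via reference columns).
  predecessor.op->clear_output();
}

This is also why Delete now stores _num_rows_deleted during _on_execute(): by the time _finish_commit() runs, the input operator's output table may already have been cleared.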