Skip to content

Commit

Permalink
Cleanup Temporary Tables (hyrise#937)
Browse files Browse the repository at this point in the history
* Cleanup Temporary Tables

* Some more cleanyclean in operators

* Address Review

* address review part 2

* review

* d'oh

* Update abstract_task.cpp

* this could have been prevented if I had a compiler on my phone...
  • Loading branch information
mrks authored Jun 29, 2018
1 parent 16d69e9 commit 1b99106
Show file tree
Hide file tree
Showing 31 changed files with 213 additions and 75 deletions.
4 changes: 3 additions & 1 deletion src/benchmarklib/benchmark_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ void BenchmarkRunner::_execute_query(const NamedQuery& named_query) {
const auto& name = named_query.first;
const auto& sql = named_query.second;

auto pipeline = SQLPipelineBuilder{sql}.with_mvcc(_config.use_mvcc).create_pipeline();
auto pipeline_builder = SQLPipelineBuilder{sql}.with_mvcc(_config.use_mvcc);
if (_config.enable_visualization) pipeline_builder.dont_cleanup_temporaries();
auto pipeline = pipeline_builder.create_pipeline();
// Execute the query, we don't care about the results
pipeline.get_result_table();

Expand Down
4 changes: 3 additions & 1 deletion src/bin/console/console.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ int Console::_eval_command(const CommandFunction& func, const std::string& comma
bool Console::_initialize_pipeline(const std::string& sql) {
try {
if (_explicitly_created_transaction_context != nullptr) {
// We want to keep the temporary tables for debugging and visualization
_sql_pipeline =
std::make_unique<SQLPipeline>(SQLPipelineBuilder{sql}
.dont_cleanup_temporaries()
.with_prepared_statement_cache(_prepared_statements)
.with_transaction_context(_explicitly_created_transaction_context)
.create_pipeline());
Expand Down Expand Up @@ -554,7 +556,7 @@ int Console::visualize(const std::string& input) {

} else {
// Visualize the Physical Query Plan
SQLQueryPlan query_plan;
SQLQueryPlan query_plan{CleanupTemporaries::No};

try {
if (!no_execute) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/operators/abstract_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ std::shared_ptr<const Table> AbstractOperator::get_output() const {
return _output;
}

void AbstractOperator::clear_output() { _output = nullptr; }

const std::string AbstractOperator::description(DescriptionMode description_mode) const { return name(); }

std::shared_ptr<AbstractOperator> AbstractOperator::recreate(const std::vector<AllParameterVariant>& args) const {
Expand Down
7 changes: 5 additions & 2 deletions src/lib/operators/abstract_operator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,13 @@ class AbstractOperator : public std::enable_shared_from_this<AbstractOperator>,
virtual void execute();

// returns the result of the operator
// When using OperatorTasks, they automatically clear this once all successors are done. This reduces the number of
// temporary tables.
std::shared_ptr<const Table> get_output() const;

// clears the output of this operator to free up space
void clear_output();

virtual const std::string name() const = 0;
virtual const std::string description(DescriptionMode description_mode = DescriptionMode::SingleLine) const;

Expand Down Expand Up @@ -152,8 +157,6 @@ class AbstractOperator : public std::enable_shared_from_this<AbstractOperator>,
std::optional<std::weak_ptr<TransactionContext>> _transaction_context;

BaseOperatorPerformanceData _base_performance_data;

std::weak_ptr<OperatorTask> _operator_task;
};

} // namespace opossum
11 changes: 8 additions & 3 deletions src/lib/operators/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ std::shared_ptr<AbstractOperator> Aggregate::_on_recreate(
return std::make_shared<Aggregate>(recreated_input_left, _aggregates, _groupby_column_ids);
}

void Aggregate::_on_cleanup() {
_contexts_per_column.clear();
_keys_per_chunk.clear();
}

/*
Visitor context for the partitioning/grouping visitor
*/
Expand Down Expand Up @@ -584,10 +589,10 @@ std::shared_ptr<const Table> Aggregate::_on_execute() {
}

// Write the output
_output = std::make_shared<Table>(_output_column_definitions, TableType::Data);
_output->append_chunk(_output_columns);
auto output = std::make_shared<Table>(_output_column_definitions, TableType::Data);
output->append_chunk(_output_columns);

return _output;
return output;
}

/*
Expand Down
3 changes: 2 additions & 1 deletion src/lib/operators/aggregate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class Aggregate : public AbstractReadOnlyOperator {
const std::vector<AllParameterVariant>& args, const std::shared_ptr<AbstractOperator>& recreated_input_left,
const std::shared_ptr<AbstractOperator>& recreated_input_right) const override;

void _on_cleanup() override;

template <typename ColumnType>
static void _create_aggregate_context(boost::hana::basic_type<ColumnType> type,
std::shared_ptr<ColumnVisitableContext>& aggregate_context,
Expand Down Expand Up @@ -125,7 +127,6 @@ class Aggregate : public AbstractReadOnlyOperator {
const std::vector<AggregateColumnDefinition> _aggregates;
const std::vector<ColumnID> _groupby_column_ids;

std::shared_ptr<Table> _output;
TableColumnDefinitions _output_column_definitions;
ChunkColumns _output_columns;

Expand Down
6 changes: 3 additions & 3 deletions src/lib/operators/delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ std::shared_ptr<const Table> Delete::_on_execute(std::shared_ptr<TransactionCont
}
}

_num_rows_deleted = input_table_left()->row_count();

return nullptr;
}

Expand All @@ -66,12 +68,10 @@ void Delete::_on_commit_records(const CommitID cid) {
}

void Delete::_finish_commit() {
const auto num_rows_deleted = input_table_left()->row_count();

const auto table_statistics = _table->table_statistics();
if (table_statistics) {
_table->set_table_statistics(std::make_shared<TableStatistics>(table_statistics->table_type(),
table_statistics->row_count() - num_rows_deleted,
table_statistics->row_count() - _num_rows_deleted,
table_statistics->column_statistics()));
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib/operators/delete.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ class Delete : public AbstractReadWriteOperator {
std::shared_ptr<Table> _table;
TransactionID _transaction_id;
std::vector<std::shared_ptr<const PosList>> _pos_lists;
uint64_t _num_rows_deleted;
};
} // namespace opossum
38 changes: 19 additions & 19 deletions src/lib/operators/join_hash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,20 +532,20 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl {
*/
TableColumnDefinitions output_column_definitions;

auto _right_in_table = _right->get_output();
auto _left_in_table = _left->get_output();
auto right_in_table = _right->get_output();
auto left_in_table = _left->get_output();

if (_inputs_swapped) {
// Semi/Anti joins are always swapped but do not need the outer relation
if (_mode == JoinMode::Semi || _mode == JoinMode::Anti) {
output_column_definitions = _right_in_table->column_definitions();
output_column_definitions = right_in_table->column_definitions();
} else {
output_column_definitions =
concatenated(_right_in_table->column_definitions(), _left_in_table->column_definitions());
concatenated(right_in_table->column_definitions(), left_in_table->column_definitions());
}
} else {
output_column_definitions =
concatenated(_left_in_table->column_definitions(), _right_in_table->column_definitions());
concatenated(left_in_table->column_definitions(), right_in_table->column_definitions());
}

_output_table = std::make_shared<Table>(output_column_definitions, TableType::References);
Expand All @@ -559,8 +559,8 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl {

// Pre-partitioning
// Save chunk offsets into the input relation
size_t left_chunk_count = _left_in_table->chunk_count();
size_t right_chunk_count = _right_in_table->chunk_count();
size_t left_chunk_count = left_in_table->chunk_count();
size_t right_chunk_count = right_in_table->chunk_count();

auto left_chunk_offsets = std::make_shared<std::vector<size_t>>();
auto right_chunk_offsets = std::make_shared<std::vector<size_t>>();
Expand All @@ -571,13 +571,13 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl {
size_t offset_left = 0;
for (ChunkID i{0}; i < left_chunk_count; ++i) {
left_chunk_offsets->operator[](i) = offset_left;
offset_left += _left_in_table->get_chunk(i)->size();
offset_left += left_in_table->get_chunk(i)->size();
}

size_t offset_right = 0;
for (ChunkID i{0}; i < right_chunk_count; ++i) {
right_chunk_offsets->operator[](i) = offset_right;
offset_right += _right_in_table->get_chunk(i)->size();
offset_right += right_in_table->get_chunk(i)->size();
}

Timer performance_timer;
Expand All @@ -592,10 +592,10 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl {
This helps choosing a scheduler node for the radix phase (see below).
*/
// Scheduler note: parallelize this at some point. Currently, the amount of jobs would be too high
auto materialized_left = _materialize_input<LeftType>(_left_in_table, _column_ids.first, histograms_left);
auto materialized_left = _materialize_input<LeftType>(left_in_table, _column_ids.first, histograms_left);
// 'keep_nulls' makes sure that the relation on the right materializes NULL values when executing an OUTER join.
auto materialized_right =
_materialize_input<RightType>(_right_in_table, _column_ids.second, histograms_right, keep_nulls);
_materialize_input<RightType>(right_in_table, _column_ids.second, histograms_right, keep_nulls);

// Radix Partitioning phase
/*
Expand Down Expand Up @@ -656,13 +656,13 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl {
PosListsByColumn right_pos_lists_by_column;

// left_pos_lists_by_column will only be needed if left is a reference table and being output
if (_left_in_table->type() == TableType::References && !only_output_right_input) {
left_pos_lists_by_column = setup_pos_lists_by_column(_left_in_table);
if (left_in_table->type() == TableType::References && !only_output_right_input) {
left_pos_lists_by_column = setup_pos_lists_by_column(left_in_table);
}

// right_pos_lists_by_column will only be needed if right is a reference table
if (_right_in_table->type() == TableType::References) {
right_pos_lists_by_column = setup_pos_lists_by_column(_right_in_table);
if (right_in_table->type() == TableType::References) {
right_pos_lists_by_column = setup_pos_lists_by_column(right_in_table);
}

for (size_t partition_id = 0; partition_id < left_pos_lists.size(); ++partition_id) {
Expand All @@ -679,15 +679,15 @@ class JoinHash::JoinHashImpl : public AbstractJoinOperatorImpl {

// we need to swap back the inputs, so that the order of the output columns is not harmed
if (_inputs_swapped) {
write_output_columns(output_columns, _right_in_table, right_pos_lists_by_column, right);
write_output_columns(output_columns, right_in_table, right_pos_lists_by_column, right);

// Semi/Anti joins are always swapped but do not need the outer relation
if (!only_output_right_input) {
write_output_columns(output_columns, _left_in_table, left_pos_lists_by_column, left);
write_output_columns(output_columns, left_in_table, left_pos_lists_by_column, left);
}
} else {
write_output_columns(output_columns, _left_in_table, left_pos_lists_by_column, left);
write_output_columns(output_columns, _right_in_table, right_pos_lists_by_column, right);
write_output_columns(output_columns, left_in_table, left_pos_lists_by_column, left);
write_output_columns(output_columns, right_in_table, right_pos_lists_by_column, right);
}

_output_table->append_chunk(output_columns);
Expand Down
10 changes: 10 additions & 0 deletions src/lib/operators/join_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,14 @@ void JoinIndex::_write_output_columns(ChunkColumns& output_columns, const std::s
}
}

void JoinIndex::_on_cleanup() {
_output_table.reset();
_left_in_table.reset();
_right_in_table.reset();
_pos_list_left.reset();
_pos_list_right.reset();
_left_matches.clear();
_right_matches.clear();
}

} // namespace opossum
2 changes: 2 additions & 0 deletions src/lib/operators/join_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class JoinIndex : public AbstractJoinOperator {
void _write_output_columns(ChunkColumns& output_columns, const std::shared_ptr<const Table> input_table,
std::shared_ptr<PosList> pos_list);

void _on_cleanup() override;

std::shared_ptr<Table> _output_table;
std::shared_ptr<const Table> _left_in_table;
std::shared_ptr<const Table> _right_in_table;
Expand Down
9 changes: 9 additions & 0 deletions src/lib/operators/join_nested_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,4 +262,13 @@ void JoinNestedLoop::_write_output_chunks(ChunkColumns& columns, const std::shar
}
}

void JoinNestedLoop::_on_cleanup() {
_output_table.reset();
_left_in_table.reset();
_right_in_table.reset();
_pos_list_left.reset();
_pos_list_right.reset();
_right_matches.clear();
}

} // namespace opossum
2 changes: 2 additions & 0 deletions src/lib/operators/join_nested_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class JoinNestedLoop : public AbstractJoinOperator {
void _write_output_chunks(ChunkColumns& columns, const std::shared_ptr<const Table> input_table,
std::shared_ptr<PosList> pos_list);

void _on_cleanup() override;

std::shared_ptr<Table> _output_table;
std::shared_ptr<const Table> _left_in_table;
std::shared_ptr<const Table> _right_in_table;
Expand Down
4 changes: 3 additions & 1 deletion src/lib/operators/projection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ std::shared_ptr<const Table> Projection::_on_execute() {

// For subselects, we need to execute the subquery in order to use the result table later
if (column_expression->is_subselect() && !column_expression->has_subselect_table()) {
SQLQueryPlan query_plan;
// Because subqueries are still WIP, we do not clean up their temporary tables. Otherwise, we would have to
// somehow pass in cleanup_temporaries from the pipeline.
SQLQueryPlan query_plan{CleanupTemporaries::No};
query_plan.add_tree_by_root(column_expression->subselect_operator());

auto transaction_context = this->transaction_context();
Expand Down
11 changes: 6 additions & 5 deletions src/lib/scheduler/abstract_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ TaskID AbstractTask::id() const { return _id; }

NodeID AbstractTask::node_id() const { return _node_id; }

bool AbstractTask::is_ready() const { return _predecessor_counter == 0; }
bool AbstractTask::is_ready() const { return _pending_predecessors == 0; }

bool AbstractTask::is_done() const { return _done; }

Expand All @@ -33,10 +33,13 @@ void AbstractTask::set_id(TaskID id) { _id = id; }
void AbstractTask::set_as_predecessor_of(std::shared_ptr<AbstractTask> successor) {
DebugAssert((!_is_scheduled), "Possible race: Don't set dependencies after the Task was scheduled");

successor->_on_predecessor_added();
successor->_pending_predecessors++;
_successors.emplace_back(successor);
successor->_predecessors.emplace_back(shared_from_this());
}

const std::vector<std::weak_ptr<AbstractTask>>& AbstractTask::predecessors() const { return _predecessors; }

const std::vector<std::shared_ptr<AbstractTask>>& AbstractTask::successors() const { return _successors; }

void AbstractTask::set_node_id(NodeID node_id) { _node_id = node_id; }
Expand Down Expand Up @@ -108,10 +111,8 @@ void AbstractTask::_mark_as_scheduled() {
DebugAssert((!already_scheduled), "Task was already scheduled!");
}

void AbstractTask::_on_predecessor_added() { _predecessor_counter++; }

void AbstractTask::_on_predecessor_done() {
auto new_predecessor_count = --_predecessor_counter; // atomically decrement
auto new_predecessor_count = --_pending_predecessors; // atomically decrement
if (new_predecessor_count == 0) {
if (CurrentScheduler::is_set()) {
auto worker = Worker::get_this_thread_worker();
Expand Down
13 changes: 7 additions & 6 deletions src/lib/scheduler/abstract_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class AbstractTask : public std::enable_shared_from_this<AbstractTask> {
*/
void set_as_predecessor_of(std::shared_ptr<AbstractTask> successor);

/**
* @return the predecessors of this Task
*/
const std::vector<std::weak_ptr<AbstractTask>>& predecessors() const;

/**
* @return the successors of this Task
*/
Expand Down Expand Up @@ -117,11 +122,6 @@ class AbstractTask : public std::enable_shared_from_this<AbstractTask> {
*/
void _join_without_replacement_worker();

/**
* Called when a dependency is initialized (by set_as_predecessor_of)
*/
void _on_predecessor_added();

/**
* Called by a dependency when it finished execution
*/
Expand All @@ -133,7 +133,8 @@ class AbstractTask : public std::enable_shared_from_this<AbstractTask> {
std::function<void()> _done_callback;

// For dependencies
std::atomic_uint _predecessor_counter{0};
std::atomic_uint _pending_predecessors{0};
std::vector<std::weak_ptr<AbstractTask>> _predecessors;
std::vector<std::shared_ptr<AbstractTask>> _successors;

// For making sure a task gets only scheduled and enqueued once, respectively
Expand Down
Loading

0 comments on commit 1b99106

Please sign in to comment.