From 6b3fcde6d6e663fa06a62c4d051bd7495bb2a33a Mon Sep 17 00:00:00 2001 From: Bouncner Date: Mon, 31 May 2021 21:49:25 +0200 Subject: [PATCH] Improve performance of sort-merge join (#2355) Various small performance improvements in the sort-merge join. No added functionality. --- .../operators/join_hash/join_hash_steps.hpp | 2 +- src/lib/operators/join_sort_merge.cpp | 401 +++++++++--------- src/lib/operators/join_sort_merge.hpp | 11 +- .../join_sort_merge/column_materializer.hpp | 198 +++------ .../join_sort_merge/radix_cluster_sort.hpp | 286 ++++++------- 5 files changed, 389 insertions(+), 509 deletions(-) diff --git a/src/lib/operators/join_hash/join_hash_steps.hpp b/src/lib/operators/join_hash/join_hash_steps.hpp index f953aea954..a4e19991d9 100644 --- a/src/lib/operators/join_hash/join_hash_steps.hpp +++ b/src/lib/operators/join_hash/join_hash_steps.hpp @@ -287,7 +287,7 @@ RadixContainer materialize_input(const std::shared_ptr& in_table // Create histograms per chunk histograms.resize(chunk_count); - std::vector> jobs; + auto jobs = std::vector>{}; jobs.reserve(chunk_count); for (auto chunk_id = ChunkID{0}; chunk_id < chunk_count; ++chunk_id) { const auto chunk_in = in_table->get_chunk(chunk_id); diff --git a/src/lib/operators/join_sort_merge.cpp b/src/lib/operators/join_sort_merge.cpp index efb1eb05a1..dea3c5dd92 100644 --- a/src/lib/operators/join_sort_merge.cpp +++ b/src/lib/operators/join_sort_merge.cpp @@ -23,10 +23,9 @@ namespace opossum { -/** -* TODO(anyone): Outer not-equal join (outer !=) -* TODO(anyone): Choose an appropriate number of clusters. -**/ +// TODO(anyone) >> todos and nice-to-haves for the sort-merge join: +// - outer not-equal join (outer !=) +// - Bloom filters bool JoinSortMerge::supports(const JoinConfiguration config) { return (config.predicate_condition != PredicateCondition::NotEquals || config.join_mode == JoinMode::Inner) && @@ -34,15 +33,14 @@ bool JoinSortMerge::supports(const JoinConfiguration config) { config.join_mode != JoinMode::AntiNullAsTrue && config.join_mode != JoinMode::AntiNullAsFalse; } -/** -* The sort merge join performs a join on two input tables on specific join columns. For usage notes, see the -* join_sort_merge.hpp. This is how the join works: -* -> The input tables are materialized and clustered into a specified number of clusters. -* /utils/radix_cluster_sort.hpp for more info on the clustering phase. -* -> The join is performed per cluster. For the joining phase, runs of entries with the same value are identified -* and handled at once. If a join-match is identified, the corresponding row_ids are noted for the output. -* -> Using the join result, the output table is built using pos lists referencing the original tables. -**/ + +// The sort merge join performs a join on two input tables on specific join columns. For usage notes, see the +// join_sort_merge.hpp. This is how the join works: +// -> The input tables are materialized and clustered into a specified number of clusters. +// /utils/radix_cluster_sort.hpp for more info on the clustering phase. +// -> The join is performed per cluster. For the joining phase, runs of entries with the same value are identified +// and handled at once. If a join-match is identified, the corresponding row_ids are noted for the output. +// -> Using the join result, the output table is built using pos lists referencing the original tables. 
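To make the merge phase described above concrete, here is a minimal, self-contained sketch (not part of the patch; `MatValue`, `run_length`, and the printed output are simplified stand-ins for Hyrise's `MaterializedValue` and pos-list emission, and all values are made up):

#include <cstddef>
#include <iostream>
#include <vector>

struct MatValue {
  std::size_t row_id;
  int value;
};

// Length of the run of equal values starting at index `begin` (both inputs are sorted).
std::size_t run_length(const std::vector<MatValue>& values, std::size_t begin) {
  auto end = begin;
  while (end < values.size() && values[end].value == values[begin].value) {
    ++end;
  }
  return end - begin;
}

int main() {
  const std::vector<MatValue> left{{0, 1}, {1, 3}, {2, 3}};
  const std::vector<MatValue> right{{0, 3}, {1, 4}};

  std::size_t left_pos = 0;
  std::size_t right_pos = 0;
  while (left_pos < left.size() && right_pos < right.size()) {
    const auto left_run = run_length(left, left_pos);
    const auto right_run = run_length(right, right_pos);
    if (left[left_pos].value == right[right_pos].value) {
      // Equal runs: emit the cross product of both runs, then advance both sides.
      for (auto l = left_pos; l < left_pos + left_run; ++l) {
        for (auto r = right_pos; r < right_pos + right_run; ++r) {
          std::cout << "match: left row " << left[l].row_id << " / right row " << right[r].row_id << '\n';
        }
      }
      left_pos += left_run;
      right_pos += right_run;
    } else if (left[left_pos].value < right[right_pos].value) {
      left_pos += left_run;  // Left run has the smaller value: advance only the left side.
    } else {
      right_pos += right_run;  // Right run has the smaller value: advance only the right side.
    }
  }
  // Prints the two matches of value 3: left rows 1 and 2 against right row 0.
}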
JoinSortMerge::JoinSortMerge(const std::shared_ptr& left, const std::shared_ptr& right, const JoinMode mode, const OperatorJoinPredicate& primary_predicate, @@ -94,9 +92,6 @@ const std::string& JoinSortMerge::name() const { return name; } -/** -** Start of implementation. -**/ template class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { public: @@ -126,8 +121,8 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { OperatorPerformanceData& _performance; // Contains the materialized sorted input tables - std::unique_ptr> _sorted_left_table; - std::unique_ptr> _sorted_right_table; + MaterializedSegmentList _sorted_left_table; + MaterializedSegmentList _sorted_right_table; // Contains the null value row ids if a join column is an outer join column RowIDPosList _null_rows_left; @@ -141,7 +136,7 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { const std::vector& _secondary_join_predicates; - // the cluster count must be a power of two, i.e. 1, 2, 4, 8, 16, ... + // The cluster count must be a power of two, i.e. 1, 2, 4, 8, 16, ... size_t _cluster_count; // Contains the output row ids for each cluster @@ -166,9 +161,7 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { std::vector _left_row_ids_emitted_per_chunk; std::vector _right_row_ids_emitted_per_chunk; - /** - * The TablePosition is a utility struct that is used to define a specific position in a sorted input table. - **/ + // The TablePosition is a utility struct that is used to define a specific position in a sorted input table. struct TableRange; struct TablePosition { TablePosition() = default; @@ -183,10 +176,8 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { TablePosition _end_of_left_table; TablePosition _end_of_right_table; - /** - * The TableRange is a utility struct that is used to define ranges of rows in a sorted input table spanning from - * a start position to an end position. - **/ + // The TableRange is a utility struct that is used to define ranges of rows in a sorted input table spanning from + // a start position to an end position. struct TableRange { TableRange(TablePosition start_position, TablePosition end_position) : start(start_position), end(end_position) {} TableRange(size_t cluster, size_t start_index, size_t end_index) @@ -197,53 +188,46 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { // Executes the given action for every row id of the table in this range. template - void for_every_row_id(std::unique_ptr>& table, F action) { + void for_every_row_id(const MaterializedSegmentList& table, const F& action) { // False positive with gcc and tsan (https://gcc.gnu.org/bugzilla/show_bug.cgi?id=92194) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" - for (size_t cluster = start.cluster; cluster <= end.cluster; ++cluster) { - size_t start_index = (cluster == start.cluster) ? start.index : 0; - size_t end_index = (cluster == end.cluster) ? end.index : (*table)[cluster]->size(); - for (size_t index = start_index; index < end_index; ++index) { - action((*(*table)[cluster])[index].row_id); + for (auto cluster = start.cluster; cluster <= end.cluster; ++cluster) { + const auto start_index = (cluster == start.cluster) ? start.index : 0; + const auto end_index = (cluster == end.cluster) ? 
end.index : table[cluster].size();
+        for (auto index = start_index; index < end_index; ++index) {
+          action(table[cluster][index].row_id);
         }
       }
 #pragma GCC diagnostic pop
     }
   };

-  /**
-   * Determines the number of clusters to be used for the join.
-   * The number of clusters must be a power of two, i.e. 1, 2, 4, 8, 16...
-   * TODO(anyone): How should we determine the number of clusters?
-   **/
+  // Determines the number of clusters to be used for the join. The number of clusters must be a power of two.
   size_t _determine_number_of_clusters() {
-    // Get the next lower power of two of the bigger chunk number
-    // Note: this is only provisional. There should be a reasonable calculation here based on hardware stats.
-    size_t chunk_count_left = _sort_merge_join.left_input_table()->chunk_count();
-    size_t chunk_count_right = _sort_merge_join.right_input_table()->chunk_count();
+    // We aim for a partition size of roughly 256 KB to avoid out-of-L2-cache sorts. This value has been determined
+    // by a series of benchmarks. Ideally, it would incorporate hardware knowledge such as the L2 cache size.
+    const auto max_sort_items_count = 256'000 / sizeof(T);
+    const size_t cluster_count_left = _sort_merge_join.left_input_table()->row_count() / max_sort_items_count;
+    const size_t cluster_count_right = _sort_merge_join.right_input_table()->row_count() / max_sort_items_count;
+
+    // Round the larger of the two cluster counts down to the previous power of two. Example: for int32_t values,
+    // max_sort_items_count is 64'000; ten million rows on the larger side yield 156 clusters, rounded down to 128.
     return static_cast<size_t>(
-        std::pow(2, std::floor(std::log2(std::max({size_t{1}, chunk_count_left, chunk_count_right})))));
+        std::pow(2, std::floor(std::log2(std::max({size_t{1}, cluster_count_left, cluster_count_right})))));
   }

-  /**
-   * Gets the table position corresponding to the end of the table, i.e. the last entry of the last cluster.
-   **/
-  static TablePosition _end_of_table(std::unique_ptr<MaterializedSegmentList<T>>& table) {
-    DebugAssert(!table->empty(), "table has no chunks");
-    auto last_cluster = table->size() - 1;
-    return TablePosition(last_cluster, (*table)[last_cluster]->size());
+  // Gets the table position corresponding to the end of the table, i.e., the last entry of the last cluster.
+  static TablePosition _end_of_table(const MaterializedSegmentList<T>& table) {
+    DebugAssert(!table.empty(), "table has no chunks");
+    auto last_cluster = table.size() - 1;
+    return TablePosition(last_cluster, table[last_cluster].size());
   }

-  /**
-   * Represents the result of a value comparison.
-   **/
+  // Represents the result of a value comparison.
   enum class CompareResult { Less, Greater, Equal };

-  /**
-   * Performs the join for two runs of a specified cluster.
-   * A run is a series of rows in a cluster with the same value.
-   **/
+  // Performs the join for two runs of a specified cluster.
+  // A run is a series of rows in a cluster with the same value.
   void _join_runs(TableRange left_run, TableRange right_run, CompareResult compare_result,
                   std::optional<MultiPredicateJoinEvaluator>& multi_predicate_join_evaluator,
                   const size_t cluster_id) {
     switch (_primary_predicate_condition) {
@@ -309,18 +293,14 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl {
     }
   }

-  /**
-   * Emits a combination of a left row id and a right row id to the join output.
-   **/
+  // Emits a combination of a left row id and a right row id to the join output.
void _emit_combination(size_t output_cluster, RowID left_row_id, RowID right_row_id) { _output_pos_lists_left[output_cluster].push_back(left_row_id); _output_pos_lists_right[output_cluster].push_back(right_row_id); } - /** - * Emits all the combinations of row ids from the left table range and the right table range to the join output - * where also the secondary predicates are satisfied. - **/ + // Emits all the combinations of row ids from the left table range and the right table range to the join output + // where also the secondary predicates are satisfied. void _emit_qualified_combinations(size_t output_cluster, TableRange left_range, TableRange right_range, std::optional& multi_predicate_join_evaluator) { if (multi_predicate_join_evaluator) { @@ -347,11 +327,9 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { } } - /** - * Only for multi predicated inner joins. - * Emits all the combinations of row ids from the left table range and the right table range to the join output - * where the secondary predicates are satisfied. - **/ + // Only for multi predicated inner joins. + // Emits all the combinations of row ids from the left table range and the right table range to the join output + // where the secondary predicates are satisfied. void _emit_combinations_multi_predicated_inner(size_t output_cluster, TableRange left_range, TableRange right_range, MultiPredicateJoinEvaluator& multi_predicate_join_evaluator) { left_range.for_every_row_id(_sorted_left_table, [&](RowID left_row_id) { @@ -363,12 +341,10 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { }); } - /** - * Only for multi predicated left outer joins. - * Emits all the combinations of row ids from the left table range and the right table range to the join output - * where the secondary predicates are satisfied. - * For a left row id without a match, the combination [left row id|NULL row id] is emitted. - **/ + // Only for multi predicated left outer joins. + // Emits all the combinations of row ids from the left table range and the right table range to the join output + // where the secondary predicates are satisfied. + // For a left row id without a match, the combination [left row id|NULL row id] is emitted. void _emit_combinations_multi_predicated_left_outer(size_t output_cluster, TableRange left_range, TableRange right_range, MultiPredicateJoinEvaluator& multi_predicate_join_evaluator) { @@ -398,12 +374,10 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { } } - /** - * Only for multi predicated right outer joins. - * Emits all the combinations of row ids from the left table range and the right table range to the join output - * where the secondary predicates are satisfied. - * For a right row id without a match, the combination [NULL row id|right row id] is emitted. - **/ + // Only for multi predicated right outer joins. + // Emits all the combinations of row ids from the left table range and the right table range to the join output + // where the secondary predicates are satisfied. + // For a right row id without a match, the combination [NULL row id|right row id] is emitted. void _emit_combinations_multi_predicated_right_outer(size_t output_cluster, TableRange left_range, TableRange right_range, MultiPredicateJoinEvaluator& multi_predicate_join_evaluator) { @@ -433,13 +407,11 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { } } - /** - * Only for multi predicated full outer joins. 
-   * Emits all the combinations of row ids from the left table range and the right table range to the join output
-   * where the secondary predicates are satisfied.
-   * For a left row id without a match, the combination [right row id|NULL row id] is emitted.
-   * For a right row id without a match, the combination [NULL row id|right row id] is emitted.
-   **/
+  // Only for multi-predicate full outer joins.
+  // Emits all the combinations of row ids from the left table range and the right table range to the join output
+  // where the secondary predicates are satisfied.
+  // For a left row id without a match, the combination [left row id|NULL row id] is emitted.
+  // For a right row id without a match, the combination [NULL row id|right row id] is emitted.
   void _emit_combinations_multi_predicated_full_outer(size_t output_cluster, TableRange left_range,
                                                       TableRange right_range,
                                                       MultiPredicateJoinEvaluator& multi_predicate_join_evaluator) {
@@ -479,44 +451,61 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl {
     }
   }

-  /**
-   * Emits all combinations of row ids from the left table range and a NULL value on the right side
-   * (regarding the primary predicate) to the join output.
-   **/
+  // Emits all combinations of row ids from the left table range and a NULL value on the right side
+  // (regarding the primary predicate) to the join output.
   void _emit_right_primary_null_combinations(size_t output_cluster, TableRange left_range) {
     left_range.for_every_row_id(
         _sorted_left_table, [&](RowID left_row_id) { _emit_combination(output_cluster, left_row_id, NULL_ROW_ID); });
   }

-  /**
-   * Emits all combinations of row ids from the right table range and a NULL value on the left side
-   * (regarding the primary predicate) to the join output.
-   **/
+  // Emits all combinations of row ids from the right table range and a NULL value on the left side
+  // (regarding the primary predicate) to the join output.
   void _emit_left_primary_null_combinations(size_t output_cluster, TableRange right_range) {
     right_range.for_every_row_id(
         _sorted_right_table, [&](RowID right_row_id) { _emit_combination(output_cluster, NULL_ROW_ID, right_row_id); });
   }

-  /**
-   * Determines the length of the run starting at start_index in the values vector.
-   * A run is a series of the same value.
-   **/
-  size_t _run_length(size_t start_index, std::shared_ptr<MaterializedSegment<T>> values) {
-    if (start_index >= values->size()) {
+  // Determines the length of the run starting at start_index in the values vector. A run is a series of the same
+  // value. Even though the input vector is sorted, we start by linearly scanning for the end of the run. The reason
+  // is that the run's end is, with high probability, close to the beginning of the input vector (the run value is
+  // the first in the sequence and the sequence is sorted). If we do not find the run's end within the linearly
+  // scanned part, we use a binary search. An unsuccessful linear search comes at negligible cost, as we iterate over
+  // these values anyway.
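+  // Example (hypothetical numbers): for a run of 1'000 equal values, the linear scan inspects the first 128 items
+  // without finding a larger value; the binary search then locates the run's end among the remaining items in
+  // logarithmic time instead of scanning all of them.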
+  size_t _run_length(size_t start_index, const MaterializedSegment<T>& values) {
+    if (start_index >= values.size()) {
       return 0;
     }

-    auto start_position = values->begin() + start_index;
-    auto result = std::upper_bound(start_position, values->end(), *start_position,
-                                   [](const auto& a, const auto& b) { return a.value < b.value; });
+    const auto begin = values.begin() + start_index;
+    const auto run_value = begin->value;
+
+    constexpr auto LINEAR_SEARCH_ITEMS = size_t{128};
+    auto end = values.end();
+    if (start_index + LINEAR_SEARCH_ITEMS < values.size()) {
+      // Limit the linear search to LINEAR_SEARCH_ITEMS items; otherwise, scan up to the end of the input vector.
+      end = begin + LINEAR_SEARCH_ITEMS;
+    }
+
+    const auto linear_search_result = std::find_if(begin, end, [&](const auto& v) { return v.value > run_value; });
+    if (linear_search_result != end) {
+      // Match found within the linearly scanned part.
+      return std::distance(begin, linear_search_result);
+    }
-    return result - start_position;
+    if (linear_search_result == values.end()) {
+      // We did not find a larger value in the linearly scanned part and it spanned until the end of the input
+      // vector. That means all values up to the end are part of the run.
+      return std::distance(begin, end);
+    }
+
+    // Binary search in case the run did not end within the linearly scanned part. Note that we search for the first
+    // value larger than the run value itself.
+    const auto binary_search_result = std::upper_bound(
+        end, values.end(), *begin, [](const auto& lhs, const auto& rhs) { return lhs.value < rhs.value; });
+    return std::distance(begin, binary_search_result);
   }

-  /**
-   * Compares two values and creates a comparison result.
-   **/
-  CompareResult _compare(T left, T right) {
+  // Compares two values and creates a comparison result.
+  CompareResult _compare(const T& left, const T& right) {
     if (left < right) {
       return CompareResult::Less;
     } else if (left == right) {
@@ -526,55 +515,59 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl {
     }
   }

-  /**
-   * Performs the join on a single cluster. Runs of entries with the same value are identified and handled together.
-   * This constitutes the merge phase of the join. The output combinations of row ids are determined by _join_runs.
-   **/
+  // Performs the join on a single cluster. Runs of entries with the same value are identified and handled together.
+  // This constitutes the merge phase of the join. The output combinations of row ids are determined by _join_runs.
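+  // Example: for left cluster values [1, 3, 3] and right cluster values [3, 4], the value-1 run compares Less, so
+  // only the left side advances; the two value-3 runs compare Equal, their cross product is emitted, and both sides
+  // advance; the remaining value-4 run has no left partner.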
void _join_cluster(const size_t cluster_id, std::optional& multi_predicate_join_evaluator) { - auto& left_cluster = (*_sorted_left_table)[cluster_id]; - auto& right_cluster = (*_sorted_right_table)[cluster_id]; + const auto& left_cluster = _sorted_left_table[cluster_id]; + const auto& right_cluster = _sorted_right_table[cluster_id]; - size_t left_run_start = 0; - size_t right_run_start = 0; + auto left_run_start = size_t{0}; + auto right_run_start = size_t{0}; auto left_run_end = left_run_start + _run_length(left_run_start, left_cluster); auto right_run_end = right_run_start + _run_length(right_run_start, right_cluster); - const size_t left_size = left_cluster->size(); - const size_t right_size = right_cluster->size(); + const auto left_size = left_cluster.size(); + const auto right_size = right_cluster.size(); while (left_run_start < left_size && right_run_start < right_size) { - auto& left_value = (*left_cluster)[left_run_start].value; - auto& right_value = (*right_cluster)[right_run_start].value; + const auto& left_value = left_cluster[left_run_start].value; + const auto& right_value = right_cluster[right_run_start].value; - auto compare_result = _compare(left_value, right_value); + const auto compare_result = _compare(left_value, right_value); TableRange left_run(cluster_id, left_run_start, left_run_end); TableRange right_run(cluster_id, right_run_start, right_run_end); _join_runs(left_run, right_run, compare_result, multi_predicate_join_evaluator, cluster_id); // Advance to the next run on the smaller side or both if equal - if (compare_result == CompareResult::Equal) { - // Advance both runs - left_run_start = left_run_end; - right_run_start = right_run_end; - left_run_end = left_run_start + _run_length(left_run_start, left_cluster); - right_run_end = right_run_start + _run_length(right_run_start, right_cluster); - } else if (compare_result == CompareResult::Less) { - // Advance the left run - left_run_start = left_run_end; - left_run_end = left_run_start + _run_length(left_run_start, left_cluster); - } else { - // Advance the right run - right_run_start = right_run_end; - right_run_end = right_run_start + _run_length(right_run_start, right_cluster); + switch (compare_result) { + case CompareResult::Equal: + // Advance both runs + left_run_start = left_run_end; + right_run_start = right_run_end; + left_run_end = left_run_start + _run_length(left_run_start, left_cluster); + right_run_end = right_run_start + _run_length(right_run_start, right_cluster); + break; + case CompareResult::Less: + // Advance the left run + left_run_start = left_run_end; + left_run_end = left_run_start + _run_length(left_run_start, left_cluster); + break; + case CompareResult::Greater: + // Advance the right run + right_run_start = right_run_end; + right_run_end = right_run_start + _run_length(right_run_start, right_cluster); + break; + default: + throw std::logic_error("Unknown CompareResult"); } } // Join the rest of the unfinished side, which is relevant for outer joins and non-equi joins - auto right_rest = TableRange(cluster_id, right_run_start, right_size); - auto left_rest = TableRange(cluster_id, left_run_start, left_size); + const auto right_rest = TableRange(cluster_id, right_run_start, right_size); + const auto left_rest = TableRange(cluster_id, left_run_start, left_size); if (left_run_start < left_size) { _join_runs(left_rest, right_rest, CompareResult::Less, multi_predicate_join_evaluator, cluster_id); } else if (right_run_start < right_size) { @@ -582,52 +575,48 @@ class 
JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { } } - /** - * Determines the smallest value in a sorted materialized table. - **/ - T& _table_min_value(std::unique_ptr>& sorted_table) { + // Determines the smallest value in a sorted materialized table. + const T& _table_min_value(const MaterializedSegmentList& sorted_table) { DebugAssert(_primary_predicate_condition != PredicateCondition::Equals, "Complete table order is required for _table_min_value() which is only available in the non-equi case"); - DebugAssert(!sorted_table->empty(), "Sorted table has no partitions"); + DebugAssert(!sorted_table.empty(), "Sorted table has no partitions"); - for (const auto& partition : *sorted_table) { - if (!partition->empty()) { - return (*partition)[0].value; + for (const auto& partition : sorted_table) { + if (!partition.empty()) { + return partition[0].value; } } Fail("Every partition is empty"); } - /** - * Determines the largest value in a sorted materialized table. - **/ - T& _table_max_value(std::unique_ptr>& sorted_table) { + // Determines the largest value in a sorted materialized table. + const T& _table_max_value(const MaterializedSegmentList& sorted_table) { DebugAssert(_primary_predicate_condition != PredicateCondition::Equals, - "The table needs to be sorted for _table_max_value() which is only the case in the non-equi case"); - DebugAssert(!sorted_table->empty(), "Sorted table is empty"); + "The table needs to be sorted for _table_max_value() which is only the case for non-equi joins"); + DebugAssert(!sorted_table.empty(), "Sorted table is empty"); - for (size_t partition_id = sorted_table->size() - 1; partition_id < sorted_table->size(); --partition_id) { - if (!(*sorted_table)[partition_id]->empty()) { - return (*sorted_table)[partition_id]->back().value; + for (auto partition_iter = sorted_table.rbegin(); partition_iter != sorted_table.rend(); ++partition_iter) { + if (!partition_iter->empty()) { + return partition_iter->back().value; } } Fail("Every partition is empty"); } - /** - * Looks for the first value in a sorted materialized table that fulfills the specified condition. - * Returns the TablePosition of this element and whether a satisfying element has been found. - **/ + // Looks for the first value in a sorted materialized table that fulfills the specified condition. + // Returns the TablePosition of this element and whether a satisfying element has been found. 
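+  // Example (hypothetical values): for the clusters [1, 3] and [4, 7] and the condition `value > 3`, the returned
+  // position is cluster 1, index 0.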
template - std::optional _first_value_that_satisfies(std::unique_ptr>& sorted_table, - Function condition) { - for (size_t partition_id = 0; partition_id < sorted_table->size(); ++partition_id) { - auto partition = (*sorted_table)[partition_id]; - if (!partition->empty() && condition(partition->back().value)) { - for (size_t index = 0; index < partition->size(); ++index) { - if (condition((*partition)[index].value)) { + std::optional _first_value_that_satisfies(const MaterializedSegmentList& sorted_table, + const Function& condition) { + const auto sorted_table_size = sorted_table.size(); + for (auto partition_id = size_t{0}; partition_id < sorted_table_size; ++partition_id) { + const auto& partition = sorted_table[partition_id]; + if (!partition.empty() && condition(partition.back().value)) { + const auto partition_size = partition.size(); + for (auto index = size_t{0}; index < partition_size; ++index) { + if (condition(partition[index].value)) { return TablePosition(partition_id, index); } } @@ -637,18 +626,18 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { return {}; } - /** - * Looks for the first value in a sorted materialized table that fulfills the specified condition, but searches - * the table in reverse order. Returns the TablePosition of this element, and a satisfying element has been found. - **/ + // Looks for the first value in a sorted materialized table that fulfills the specified condition, but searches + // the table in reverse order. Returns the TablePosition of this element, and a satisfying element has been found. template - std::optional _first_value_that_satisfies_reverse( - std::unique_ptr>& sorted_table, Function condition) { - for (size_t partition_id = sorted_table->size() - 1; partition_id < sorted_table->size(); --partition_id) { - auto partition = (*sorted_table)[partition_id]; - if (!partition->empty() && condition((*partition)[0].value)) { - for (size_t index = partition->size() - 1; index < partition->size(); --index) { - if (condition((*partition)[index].value)) { + std::optional _first_value_that_satisfies_reverse(const MaterializedSegmentList& sorted_table, + const Function& condition) { + const auto sorted_table_size = sorted_table.size(); + for (auto partition_id = static_cast(sorted_table_size - 1); partition_id >= 0; --partition_id) { + const auto& partition = sorted_table[partition_id]; + if (!partition.empty() && condition(partition[0].value)) { + const auto partition_size = partition.size(); + for (auto index = static_cast(partition_size - 1); index >= 0; --index) { + if (condition(partition[index].value)) { return TablePosition(partition_id, index + 1); } } @@ -658,11 +647,9 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { return {}; } - /** - * Adds the rows without matches for right outer joins for non-equi operators (<, <=, >, >=). - * This method adds those rows from the right table to the output that do not find a join partner. - * The outer join for the equality operator is handled in _join_runs instead. - **/ + // Adds the rows without matches for right outer joins for non-equi operators (<, <=, >, >=). + // This method adds those rows from the right table to the output that do not find a join partner. + // The outer join for the equality operator is handled in _join_runs instead. 
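+  // Example: for the predicate `left < right`, a right-side value can only find a partner if it is greater than the
+  // left table's minimum; all right rows with values less than or equal to that minimum are emitted with a NULL
+  // partner.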
void _right_outer_non_equi_join() { auto end_of_right_table = _end_of_table(_sorted_right_table); @@ -671,8 +658,8 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { return; } - auto& left_min_value = _table_min_value(_sorted_left_table); - auto& left_max_value = _table_max_value(_sorted_left_table); + const auto& left_min_value = _table_min_value(_sorted_left_table); + const auto& left_max_value = _table_max_value(_sorted_left_table); auto unmatched_range = std::optional{}; @@ -717,8 +704,8 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { // Add null-combinations for right row ids where the primary predicate was satisfied but the // secondary predicates were not. if (!_secondary_join_predicates.empty()) { - for (const auto& cluster : *_sorted_right_table) { - for (const auto& row : *cluster) { + for (const auto& cluster : _sorted_right_table) { + for (const auto& row : cluster) { if (!_right_row_ids_emitted.contains(row.row_id)) { _emit_combination(0, NULL_ROW_ID, row.row_id); } @@ -727,11 +714,10 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { } } - /** - * Adds the rows without matches for left outer joins for non-equi operators (<, <=, >, >=). - * This method adds those rows from the left table to the output that do not find a join partner. - * The outer join for the equality operator is handled in _join_runs instead. - **/ + + // Adds the rows without matches for left outer joins for non-equi operators (<, <=, >, >=). + // This method adds those rows from the left table to the output that do not find a join partner. + // The outer join for the equality operator is handled in _join_runs instead. void _left_outer_non_equi_join() { auto end_of_left_table = _end_of_table(_sorted_left_table); @@ -740,8 +726,8 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { return; } - auto& right_min_value = _table_min_value(_sorted_right_table); - auto& right_max_value = _table_max_value(_sorted_right_table); + const auto& right_min_value = _table_min_value(_sorted_right_table); + const auto& right_max_value = _table_max_value(_sorted_right_table); auto unmatched_range = std::optional{}; @@ -786,8 +772,8 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { // Add null-combinations for left row ids where the primary predicate was satisfied but the // secondary predicates were not. if (!_secondary_join_predicates.empty()) { - for (const auto& cluster : *_sorted_left_table) { - for (const auto& row : *cluster) { + for (const auto& cluster : _sorted_left_table) { + for (const auto& row : cluster) { if (!_left_row_ids_emitted.contains(row.row_id)) { _emit_combination(0, row.row_id, NULL_ROW_ID); } @@ -796,11 +782,9 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { } } - /** - * Performs the join on all clusters in parallel. - **/ + // Performs the join on all clusters in parallel. 
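+  // Each cluster pair is merged independently. Small clusters (at most JOB_SPAWN_THRESHOLD rows to merge) are
+  // processed inline, larger ones are spawned as scheduler tasks, so the actual degree of parallelism depends on the
+  // input sizes.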
void _perform_join() { - std::vector> jobs; + auto jobs = std::vector>{}; _left_row_ids_emitted_per_chunk.resize(_cluster_count); _right_row_ids_emitted_per_chunk.resize(_cluster_count); @@ -812,7 +796,7 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { // Avoid empty jobs for inner equi joins if (_mode == JoinMode::Inner && _primary_predicate_condition == PredicateCondition::Equals) { - if ((*_sorted_left_table)[cluster_id]->empty() || (*_sorted_right_table)[cluster_id]->empty()) { + if (_sorted_left_table[cluster_id].empty() || _sorted_right_table[cluster_id].empty()) { continue; } } @@ -820,8 +804,7 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { _left_row_ids_emitted_per_chunk[cluster_id] = RowHashSet{}; _right_row_ids_emitted_per_chunk[cluster_id] = RowHashSet{}; - const auto merge_row_count = - (*_sorted_left_table)[cluster_id]->size() + (*_sorted_right_table)[cluster_id]->size(); + const auto merge_row_count = _sorted_left_table[cluster_id].size() + _sorted_right_table[cluster_id].size(); const auto join_cluster_task = [this, cluster_id] { // Accessors are not thread-safe, so we create one evaluator per job std::optional multi_predicate_join_evaluator; @@ -834,7 +817,7 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { this->_join_cluster(cluster_id, multi_predicate_join_evaluator); }; - if (merge_row_count > JOB_SPAWN_THRESHOLD * 2) { + if (merge_row_count > JOB_SPAWN_THRESHOLD) { jobs.push_back(std::make_shared(join_cluster_task)); } else { join_cluster_task(); @@ -863,12 +846,9 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { } public: - /** - * Executes the SortMergeJoin operator. - **/ std::shared_ptr _on_execute() override { - bool include_null_left = (_mode == JoinMode::Left || _mode == JoinMode::FullOuter); - bool include_null_right = (_mode == JoinMode::Right || _mode == JoinMode::FullOuter); + const auto include_null_left = (_mode == JoinMode::Left || _mode == JoinMode::FullOuter); + const auto include_null_right = (_mode == JoinMode::Right || _mode == JoinMode::FullOuter); auto radix_clusterer = RadixClusterSort( _sort_merge_join.left_input_table(), _sort_merge_join.right_input_table(), _sort_merge_join._primary_predicate.column_ids, _primary_predicate_condition == PredicateCondition::Equals, @@ -877,8 +857,8 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { auto sort_output = radix_clusterer.execute(); _sorted_left_table = std::move(sort_output.clusters_left); _sorted_right_table = std::move(sort_output.clusters_right); - _null_rows_left = std::move(*sort_output.null_rows_left); - _null_rows_right = std::move(*sort_output.null_rows_right); + _null_rows_left = std::move(sort_output.null_rows_left); + _null_rows_right = std::move(sort_output.null_rows_right); _end_of_left_table = _end_of_table(_sorted_left_table); _end_of_right_table = _end_of_table(_sorted_right_table); @@ -940,7 +920,6 @@ class JoinSortMerge::JoinSortMergeImpl : public AbstractReadOnlyOperatorImpl { SortColumnDefinition(right_join_column, SortMode::Ascending)}); } } - _performance.set_step_runtime(OperatorSteps::OutputWriting, timer.lap()); auto result_table = _sort_merge_join._build_output_table(std::move(output_chunks)); diff --git a/src/lib/operators/join_sort_merge.hpp b/src/lib/operators/join_sort_merge.hpp index 60e1da37ea..71d841bdb2 100644 --- a/src/lib/operators/join_sort_merge.hpp +++ b/src/lib/operators/join_sort_merge.hpp @@ -42,6 
+42,12 @@ class JoinSortMerge : public AbstractJoinOperator {
     OutputWriting
   };

+  // Tasks are added to the scheduler in case the number of rows to process is above JOB_SPAWN_THRESHOLD. If not,
+  // the task is executed directly. This threshold has been determined by executing a multi-threaded and shuffled
+  // TPC-H run (28 cores and 50 clients). With larger system changes (e.g., scheduling), the threshold needs to be
+  // re-evaluated.
+  static constexpr auto JOB_SPAWN_THRESHOLD = 500;
+
 protected:
   std::shared_ptr<const Table> _on_execute() override;
   void _on_cleanup() override;
@@ -57,11 +63,6 @@ class JoinSortMerge : public AbstractJoinOperator {
   friend class JoinSortMergeImpl;

   std::unique_ptr<AbstractReadOnlyOperatorImpl> _impl;
-
-  // Tasks are added to the scheduler in case the number of elements to process is above JOB_SPAWN_THRESHOLD. If not,
-  // the task is executed directly. This threshold needs to be re-evaluated over time to find the value which gives the
-  // best performance.
-  static constexpr auto JOB_SPAWN_THRESHOLD = 500;
 };

 }  // namespace opossum
diff --git a/src/lib/operators/join_sort_merge/column_materializer.hpp b/src/lib/operators/join_sort_merge/column_materializer.hpp
index 698d054f3f..6320383c91 100644
--- a/src/lib/operators/join_sort_merge/column_materializer.hpp
+++ b/src/lib/operators/join_sort_merge/column_materializer.hpp
@@ -6,6 +6,8 @@
 #include
 #include

+#include <boost/sort/pdqsort/pdqsort.hpp>
+
 #include "hyrise.hpp"
 #include "resolve_type.hpp"
 #include "scheduler/job_task.hpp"
@@ -30,102 +32,92 @@ template <typename T>
 using MaterializedSegment = std::vector<MaterializedValue<T>>;

 template <typename T>
-using MaterializedSegmentList = std::vector<std::shared_ptr<MaterializedSegment<T>>>;
-
-/**
- * This data structure is passed as a reference to the jobs which materialize
- * the chunks. Each job then adds `samples_to_collect` samples to its passed
- * SampleRequest. All SampleRequests are later merged to gather a global sample
- * list with which the split values for the radix partitioning are determined.
- */
+using MaterializedSegmentList = std::vector<MaterializedSegment<T>>;
+
+// Subsamples are passed into the materialization phase. They carry the number of requested samples and are filled by
+// the materialization tasks. All subsamples are later merged to gather a global sample list with which the split
+// values for the radix partitioning are determined.
 template <typename T>
 struct Subsample {
-  explicit Subsample(ChunkOffset sample_count) : samples_to_collect(sample_count), samples() {}
+  explicit Subsample(ChunkOffset sample_count) : samples_to_collect(sample_count), samples(sample_count) {}
   const ChunkOffset samples_to_collect;
   std::vector<T> samples;
 };

-/**
- * Materializes a table for a specific segment and sorts it if required. Result is a triple of
- * materialized values, positions of NULL values, and a list of samples.
- **/
+// Materializes a column and sorts it if requested.
 template <typename T>
 class ColumnMaterializer {
 public:
  explicit ColumnMaterializer(bool sort, bool materialize_null) : _sort{sort}, _materialize_null{materialize_null} {}

 public:
-  /**
-   * Materializes and sorts all the chunks of an input table in parallel
-   * by creating multiple jobs that materialize chunks.
-   * Returns the materialized segments and a list of null row ids if materialize_null is enabled.
-   **/
-  std::tuple<std::unique_ptr<MaterializedSegmentList<T>>, std::unique_ptr<RowIDPosList>, std::vector<T>> materialize(
-      const std::shared_ptr<const Table> input, const ColumnID column_id) {
-    const ChunkOffset samples_per_chunk = 10;  // rather arbitrarily chosen number
+  // For sufficiently large chunks (number of rows > JOB_SPAWN_THRESHOLD), the materialization is parallelized.
Returns + // the materialized segments and a list of null row ids if _materialize_null is true. + std::tuple, RowIDPosList, std::vector> materialize( + const std::shared_ptr& input, const ColumnID column_id) { + constexpr auto SAMPLES_PER_CHUNK = ChunkOffset{10}; const auto chunk_count = input->chunk_count(); - auto output = std::make_unique>(chunk_count); - auto null_rows = std::make_unique(); + auto output = MaterializedSegmentList(chunk_count); - std::vector> subsamples; + auto null_rows_per_chunk = std::vector(chunk_count); + auto subsamples = std::vector>{}; subsamples.reserve(chunk_count); - std::vector> jobs; - for (ChunkID chunk_id{0}; chunk_id < chunk_count; ++chunk_id) { - const auto chunk = input->get_chunk(chunk_id); + auto jobs = std::vector>{}; + for (auto chunk_id = ChunkID{0}; chunk_id < chunk_count; ++chunk_id) { + const auto& chunk = input->get_chunk(chunk_id); Assert(chunk, "Physically deleted chunk should not reach this point, see get_chunk / #1686."); + const auto chunk_size = chunk->size(); - const auto samples_to_write = std::min(samples_per_chunk, chunk->size()); + const auto samples_to_write = std::min(SAMPLES_PER_CHUNK, chunk_size); subsamples.push_back(Subsample(samples_to_write)); - jobs.push_back( - _create_chunk_materialization_job(output, null_rows, chunk_id, input, column_id, subsamples.back())); + auto materialize_job = [&, chunk_id] { + const auto& segment = input->get_chunk(chunk_id)->get_segment(column_id); + output[chunk_id] = _materialize_segment(segment, chunk_id, null_rows_per_chunk[chunk_id], subsamples[chunk_id]); + }; + + if (chunk_size > JoinSortMerge::JOB_SPAWN_THRESHOLD) { + jobs.push_back(std::make_shared(materialize_job)); + } else { + materialize_job(); + } } Hyrise::get().scheduler()->schedule_and_wait_for_tasks(jobs); auto gathered_samples = std::vector(); - gathered_samples.reserve(samples_per_chunk * chunk_count); + gathered_samples.reserve(SAMPLES_PER_CHUNK * chunk_count); + + auto null_row_count = size_t{0}; + for (const auto& null_rows : null_rows_per_chunk) { + null_row_count += null_rows.size(); + } + auto null_rows = RowIDPosList{}; + null_rows.reserve(null_row_count); - for (const auto& subsample : subsamples) { + for (auto chunk_id = ChunkID{0}; chunk_id < chunk_count; ++chunk_id) { + const auto& subsample = subsamples[chunk_id]; gathered_samples.insert(gathered_samples.end(), subsample.samples.begin(), subsample.samples.end()); + + const auto& chunk_null_rows = null_rows_per_chunk[chunk_id]; + null_rows.insert(null_rows.end(), chunk_null_rows.begin(), chunk_null_rows.end()); } + gathered_samples.shrink_to_fit(); return {std::move(output), std::move(null_rows), std::move(gathered_samples)}; } private: - /** - * Creates a job to materialize and sort a chunk. - **/ - std::shared_ptr _create_chunk_materialization_job(std::unique_ptr>& output, - std::unique_ptr& null_rows_output, - const ChunkID chunk_id, - std::shared_ptr input, - const ColumnID column_id, Subsample& subsample) { - return std::make_shared([this, &output, &null_rows_output, input, column_id, chunk_id, &subsample] { - auto segment = input->get_chunk(chunk_id)->get_segment(column_id); - - if (const auto dictionary_segment = std::dynamic_pointer_cast>(segment)) { - (*output)[chunk_id] = - _materialize_dictionary_segment(*dictionary_segment, chunk_id, null_rows_output, subsample); - } else { - (*output)[chunk_id] = _materialize_generic_segment(*segment, chunk_id, null_rows_output, subsample); - } - }); - } - - /** - * Samples values from a materialized segment. 
- * We collect samples locally and write once to the global sample collection to limit non-local writes. - */ + // Sample values from a materialized segment. We collect samples locally and write once to the global sample + // collection to limit non-local writes. void _gather_samples_from_segment(const MaterializedSegment& segment, Subsample& subsample) const { const auto samples_to_collect = subsample.samples_to_collect; - std::vector collected_samples; - collected_samples.reserve(samples_to_collect); if (segment.size() > 0 && samples_to_collect > 0) { + auto collected_samples = std::vector{}; + collected_samples.reserve(samples_to_collect); const auto step_width = segment.size() / std::max(1u, samples_to_collect); for (auto sample_count = size_t{0}; sample_count < samples_to_collect; ++sample_count) { @@ -139,21 +131,16 @@ class ColumnMaterializer { } } - /** - * Materialization works of all types of segments - */ - std::shared_ptr> _materialize_generic_segment(const AbstractSegment& segment, - const ChunkID chunk_id, - std::unique_ptr& null_rows_output, - Subsample& subsample) { + MaterializedSegment _materialize_segment(const std::shared_ptr& segment, const ChunkID chunk_id, + RowIDPosList& null_rows_output, Subsample& subsample) { auto output = MaterializedSegment{}; - output.reserve(segment.size()); + output.reserve(segment->size()); - segment_iterate(segment, [&](const auto& position) { + segment_iterate(*segment, [&](const auto& position) { const auto row_id = RowID{chunk_id, position.chunk_offset()}; if (position.is_null()) { if (_materialize_null) { - null_rows_output->emplace_back(row_id); + null_rows_output.emplace_back(row_id); } } else { output.emplace_back(row_id, position.value()); @@ -161,82 +148,13 @@ class ColumnMaterializer { }); if (_sort) { - std::sort(output.begin(), output.end(), - [](const auto& left, const auto& right) { return left.value < right.value; }); - } - - _gather_samples_from_segment(output, subsample); - - return std::make_shared>(std::move(output)); - } - - /** - * Specialization for dictionary segments - */ - std::shared_ptr> _materialize_dictionary_segment( - const DictionarySegment& segment, const ChunkID chunk_id, std::unique_ptr& null_rows_output, - Subsample& subsample) { - auto output = MaterializedSegment{}; - output.reserve(segment.size()); - - auto base_attribute_vector = segment.attribute_vector(); - auto dict = segment.dictionary(); - - if (_sort) { - // Works like Bucket Sort - // Collect for every value id, the set of rows that this value appeared in - // value_count is used as an inverted index - auto rows_with_value = std::vector>(dict->size()); - - // Reserve correct size of the vectors by assuming a uniform distribution - for (auto& row : rows_with_value) { - row.reserve(base_attribute_vector->size() / dict->size()); - } - - // Collect the rows for each value id - resolve_compressed_vector_type(*base_attribute_vector, [&](const auto& attribute_vector) { - auto chunk_offset = ChunkOffset{0u}; - auto value_id_it = attribute_vector.cbegin(); - auto null_value_id = segment.null_value_id(); - - for (; value_id_it != attribute_vector.cend(); ++value_id_it, ++chunk_offset) { - auto value_id = static_cast(*value_id_it); - - if (value_id != null_value_id) { - rows_with_value[value_id].push_back(RowID{chunk_id, chunk_offset}); - } else { - if (_materialize_null) { - null_rows_output->push_back(RowID{chunk_id, chunk_offset}); - } - } - } - }); - - // Now that we know the row ids for every value, we can output all the materialized values in a 
sorted manner. - ChunkOffset chunk_offset{0}; - for (ValueID value_id{0}; value_id < dict->size(); ++value_id) { - for (auto& row_id : rows_with_value[value_id]) { - output.emplace_back(row_id, (*dict)[value_id]); - ++chunk_offset; - } - } - } else { - auto iterable = create_iterable_from_segment(segment); - iterable.for_each([&](const auto& position) { - const auto row_id = RowID{chunk_id, position.chunk_offset()}; - if (position.is_null()) { - if (_materialize_null) { - null_rows_output->emplace_back(row_id); - } - } else { - output.emplace_back(row_id, position.value()); - } - }); + boost::sort::pdqsort(output.begin(), output.end(), + [](const auto& left, const auto& right) { return left.value < right.value; }); } _gather_samples_from_segment(output, subsample); - return std::make_shared>(std::move(output)); + return output; } private: diff --git a/src/lib/operators/join_sort_merge/radix_cluster_sort.hpp b/src/lib/operators/join_sort_merge/radix_cluster_sort.hpp index 8ab436b840..cb856b846b 100644 --- a/src/lib/operators/join_sort_merge/radix_cluster_sort.hpp +++ b/src/lib/operators/join_sort_merge/radix_cluster_sort.hpp @@ -8,6 +8,8 @@ #include #include +#include + #include "column_materializer.hpp" #include "hyrise.hpp" #include "resolve_type.hpp" @@ -15,39 +17,33 @@ namespace opossum { -/** -* The RadixClusterOutput holds the data structures that belong to the output of the clustering stage. -*/ +// The RadixClusterOutput holds the data structures that belong to the output of the clustering stage. template struct RadixClusterOutput { - std::unique_ptr> clusters_left; - std::unique_ptr> clusters_right; - std::unique_ptr null_rows_left; - std::unique_ptr null_rows_right; + MaterializedSegmentList clusters_left; + MaterializedSegmentList clusters_right; + RowIDPosList null_rows_left; + RowIDPosList null_rows_right; }; -/* -* -* Performs radix clustering for the sort merge join. The radix clustering algorithm clusters on the basis -* of the least significant bits of the values because the values there are much more evenly distributed than for the -* most significant bits. As a result, equal values always get moved to the same cluster and the clusters are -* sorted in themselves but not in between the clusters. This is okay for the equi join, because we are only interested -* in equality. In the case of a non-equi join however, complete sortedness is required, because join matches exist -* beyond cluster borders. Therefore, the clustering defaults to a range clustering algorithm for the non-equi-join. -* General clustering process: -* -> Input chunks are materialized and sorted. Every value is stored together with its row id. -* -> Then, either radix clustering or range clustering is performed. -* -> At last, the resulting clusters are sorted. -* -* Radix clustering example: -* cluster_count = 4 -* bits for 4 clusters: 2 -* -* 000001|01 -* 000000|11 -* ˆ right bits are used for clustering -* -**/ +// Performs radix clustering for the sort merge join. The radix clustering algorithm clusters on the basis of the least +// significant bits of the values because the values there are much more evenly distributed than for the most +// significant bits. As a result, equal values always get moved to the same cluster and the clusters are sorted in +// themselves but not in between the clusters. This approach works for equi-joins, because we are only interested in +// equality. 
In the case of non-equi joins, however, complete sortedness is required, because join matches exist beyond
+// cluster borders. Therefore, the clustering defaults to a range clustering algorithm for non-equi joins.
+// General clustering process:
+// -> Input chunks are materialized and sorted. Every value is stored together with its row id.
+// -> Then, either radix clustering or range clustering is performed.
+// -> At last, the resulting clusters are sorted.
+//
+// Radix clustering example:
+// cluster_count = 4
+// bits for 4 clusters: 2
+//
+// 000001|01
+// 000000|11
+//        ˆ right-most bits are used for clustering
 template <typename T>
 class RadixClusterSort {
  public:
@@ -84,10 +80,8 @@ class RadixClusterSort {
   }

  protected:
-  /**
-   * The ChunkInformation structure is used to gather statistics regarding a chunk's values in order to
-   * be able to appropriately reserve space for the clustering output.
-   **/
+  // The ChunkInformation structure is used to gather statistics regarding a chunk's values in order to be able to
+  // appropriately reserve space for the clustering output.
   OperatorPerformanceData<JoinSortMerge::OperatorSteps>& _performance;

   struct ChunkInformation {
@@ -106,18 +100,11 @@ class RadixClusterSort {
     std::vector<size_t> insert_position;
   };

-  /**
-   * The TableInformation structure is used to gather statistics regarding the value distribution of a table
-   * and its chunks in order to be able to appropriately reserve space for the clustering output.
-   **/
+  // The TableInformation structure is used to gather statistics regarding the value distribution of a table and its
+  // chunks in order to be able to appropriately reserve space for the clustering output.
   struct TableInformation {
-    TableInformation(size_t chunk_count, size_t cluster_count) {
-      cluster_histogram.resize(cluster_count);
-      chunk_information.reserve(chunk_count);
-      for (size_t i = 0; i < chunk_count; ++i) {
-        chunk_information.push_back(ChunkInformation(cluster_count));
-      }
-    }
+    TableInformation(size_t chunk_count, size_t cluster_count)
+        : cluster_histogram(cluster_count, 0), chunk_information(chunk_count, ChunkInformation(cluster_count)) {}
     // Used to count the number of entries for each cluster from the whole table
     std::vector<size_t> cluster_histogram;
     std::vector<ChunkInformation> chunk_information;
@@ -137,98 +124,101 @@ class RadixClusterSort {
   bool _materialize_null_left;
   bool _materialize_null_right;

-  /**
-   * Determines the total size of a materialized segment list.
-   **/
-  static size_t _materialized_table_size(std::unique_ptr<MaterializedSegmentList<T>>& table) {
-    size_t total_size = 0;
-    for (auto chunk : *table) {
-      total_size += chunk->size();
+  // Determines the total size of a materialized segment list.
+  static size_t _materialized_table_size(const MaterializedSegmentList<T>& table) {
+    auto total_size = size_t{0};
+    for (const auto& chunk : table) {
+      total_size += chunk.size();
     }

     return total_size;
   }

-  /**
-   * Concatenates multiple materialized segments to a single materialized segment.
-   **/
-  static std::unique_ptr<MaterializedSegmentList<T>> _concatenate_chunks(
-      std::unique_ptr<MaterializedSegmentList<T>>& input_chunks) {
-    auto output_table = std::make_unique<MaterializedSegmentList<T>>(1);
-    (*output_table)[0] = std::make_shared<MaterializedSegment<T>>();
+  // Concatenates multiple materialized segments to a single materialized segment.
+ static MaterializedSegmentList _concatenate_chunks(const MaterializedSegmentList& input_chunks) { + auto output_table = MaterializedSegmentList(1); // Reserve the required space and move the data to the output - auto output_chunk = (*output_table)[0]; - output_chunk->reserve(_materialized_table_size(input_chunks)); - for (auto& chunk : *input_chunks) { - output_chunk->insert(output_chunk->end(), chunk->begin(), chunk->end()); + auto& output_chunk = output_table[0]; + output_chunk.reserve(_materialized_table_size(input_chunks)); + for (const auto& chunk : input_chunks) { + output_chunk.insert(output_chunk.end(), chunk.begin(), chunk.end()); } return output_table; } - /** - * Performs the clustering on a materialized table using a clustering function that determines for each - * value the appropriate cluster id. This is how the clustering works: - * -> Count for each chunk how many of its values belong in each of the clusters using histograms. - * -> Aggregate the per-chunk histograms to a histogram for the whole table. For each chunk it is noted where - * it will be inserting values in each cluster. - * -> Reserve the appropriate space for each output cluster to avoid ongoing vector resizing. - * -> At last, each value of each chunk is moved to the appropriate cluster. - **/ - std::unique_ptr> _cluster(const std::unique_ptr>& input_chunks, - std::function clusterer) { - auto output_table = std::make_unique>(_cluster_count); - TableInformation table_information(input_chunks->size(), _cluster_count); + // Clusters a materialized table using a given clustering function that determines the appropriate cluster id for + // each value. + // -> For each chunk, count how many of its values belong in each of the clusters using histograms. + // -> Aggregate the per-chunk histograms to a histogram for the whole table. + // -> Reserve the appropriate space for each output cluster to avoid ongoing vector resizing. + // -> At last, each value of each chunk is moved to the appropriate cluster. The created histogram denotes where + // the concurrent tasks can write the data to without the need for synchronization. + MaterializedSegmentList _cluster(const MaterializedSegmentList& input_chunks, + const std::function& clusterer) { + auto output_table = MaterializedSegmentList(_cluster_count); + + const auto input_chunk_count = input_chunks.size(); + TableInformation table_information(input_chunk_count, _cluster_count); // Count for every chunk the number of entries for each cluster in parallel - std::vector> histogram_jobs; - for (size_t chunk_number = 0; chunk_number < input_chunks->size(); ++chunk_number) { + auto histogram_jobs = std::vector>{}; + for (auto chunk_number = size_t{0}; chunk_number < input_chunk_count; ++chunk_number) { auto& chunk_information = table_information.chunk_information[chunk_number]; - auto input_chunk = (*input_chunks)[chunk_number]; + const auto& input_chunk = input_chunks[chunk_number]; // Count the number of entries for each cluster to be able to reserve the appropriate output space later. 
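+      // Example (hypothetical counts): if chunk A's histogram is [2, 1] and chunk B's is [3, 2] for two clusters,
+      // cluster 0 is later sized to five entries, with chunk A scattering to offsets 0..1 and chunk B to offsets
+      // 2..4, so the concurrent tasks never write to the same slot.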
- auto job = std::make_shared([input_chunk, &clusterer, &chunk_information] { - for (auto& entry : *input_chunk) { - auto cluster_id = clusterer(entry.value); + auto histogram_job = [&] { + for (const auto& entry : input_chunk) { + const auto cluster_id = clusterer(entry.value); ++chunk_information.cluster_histogram[cluster_id]; } - }); + }; - histogram_jobs.push_back(job); + if (input_chunk.size() > JoinSortMerge::JOB_SPAWN_THRESHOLD) { + histogram_jobs.push_back(std::make_shared(histogram_job)); + } else { + histogram_job(); + } } Hyrise::get().scheduler()->schedule_and_wait_for_tasks(histogram_jobs); // Aggregate the chunks histograms to a table histogram and initialize the insert positions for each chunk for (auto& chunk_information : table_information.chunk_information) { - for (size_t cluster_id = 0; cluster_id < _cluster_count; ++cluster_id) { + for (auto cluster_id = size_t{0}; cluster_id < _cluster_count; ++cluster_id) { chunk_information.insert_position[cluster_id] = table_information.cluster_histogram[cluster_id]; table_information.cluster_histogram[cluster_id] += chunk_information.cluster_histogram[cluster_id]; } } // Reserve the appropriate output space for the clusters - for (size_t cluster_id = 0; cluster_id < _cluster_count; ++cluster_id) { - auto cluster_size = table_information.cluster_histogram[cluster_id]; - (*output_table)[cluster_id] = std::make_shared>(cluster_size); + for (auto cluster_id = size_t{0}; cluster_id < _cluster_count; ++cluster_id) { + const auto cluster_size = table_information.cluster_histogram[cluster_id]; + output_table[cluster_id] = MaterializedSegment(cluster_size); } // Move each entry into its appropriate cluster in parallel - std::vector> cluster_jobs; - for (size_t chunk_number = 0; chunk_number < input_chunks->size(); ++chunk_number) { - auto job = - std::make_shared([chunk_number, &output_table, &input_chunks, &table_information, &clusterer] { - auto& chunk_information = table_information.chunk_information[chunk_number]; - for (auto& entry : *(*input_chunks)[chunk_number]) { - auto cluster_id = clusterer(entry.value); - auto& output_cluster = *(*output_table)[cluster_id]; - auto& insert_position = chunk_information.insert_position[cluster_id]; - output_cluster[insert_position] = entry; - ++insert_position; - } - }); - cluster_jobs.push_back(job); + auto cluster_jobs = std::vector>{}; + for (auto chunk_number = size_t{0}; chunk_number < input_chunk_count; ++chunk_number) { + const auto& input_chunk = input_chunks[chunk_number]; + auto cluster_job = [&, chunk_number] { + auto& chunk_information = table_information.chunk_information[chunk_number]; + for (const auto& entry : input_chunk) { + const auto cluster_id = clusterer(entry.value); + auto& output_cluster = output_table[cluster_id]; + auto& insert_position = chunk_information.insert_position[cluster_id]; + output_cluster[insert_position] = entry; + ++insert_position; + } + }; + + if (input_chunk.size() > JoinSortMerge::JOB_SPAWN_THRESHOLD) { + cluster_jobs.push_back(std::make_shared(cluster_job)); + } else { + cluster_job(); + } } Hyrise::get().scheduler()->schedule_and_wait_for_tasks(cluster_jobs); @@ -236,43 +226,32 @@ class RadixClusterSort { return output_table; } - /** - * Performs least significant bit radix clustering which is used in the equi join case. - * Note: if we used the most significant bits, we could also use this for non-equi joins. - * Then, however we would have to deal with skewed clusters. Other ideas: - * - manually select the clustering bits based on statistics. 
-  /**
-   * Picks split values from the given sample values. Each split value denotes the inclusive
-   * upper bound of its corresponding cluster (i.e., split #0 is the upper bound of cluster #0).
-   * As the last cluster does not require an upper bound, the returned vector size is usually
-   * the cluster count minus one. However, it can be even shorter (e.g., attributes where
-   * #distinct values < #cluster count).
-   *
-   * Procedure: passed values are sorted and samples are picked from the whole sample
-   * value range in fixed widths. Repeated values are not removed before picking to handle
-   * skewed inputs. However, the final split values are unique. As a consequence, the split
-   * value vector might contain less values than `_cluster_count - 1`.
-   **/
-  const std::vector<T> _pick_split_values(std::vector<T> sample_values) const {
-    std::sort(sample_values.begin(), sample_values.end());
+  // Picks split values from the given sample values. Each split value denotes the inclusive upper bound of its
+  // corresponding cluster (i.e., split #0 is the upper bound of cluster #0). As the last cluster does not require an
+  // upper bound, the returned vector size is usually the cluster count minus one. However, it can be even shorter
+  // (e.g., attributes where #distinct values < #cluster count; in this case, empty clusters will be created).
+  // Procedure: the passed values are sorted and samples are picked from the whole sample value range at fixed
+  // intervals. Repeated values are not removed before picking; thus, frequent values have a higher chance of being
+  // picked, which handles skewed inputs. However, the final split values are unique. As a consequence, the split
+  // value vector might contain fewer values than `_cluster_count - 1`.
+  const std::vector<T> _pick_split_values(std::vector<T>&& sample_values) const {
+    boost::sort::pdqsort(sample_values.begin(), sample_values.end());
 
     if (sample_values.size() <= _cluster_count - 1) {
       const auto last = std::unique(sample_values.begin(), sample_values.end());
       sample_values.erase(last, sample_values.end());
-      return sample_values;
+      return std::move(sample_values);
     }
 
-    std::vector<T> split_values;
+    auto split_values = std::vector<T>{};
     split_values.reserve(_cluster_count - 1);
-    auto jump_width = sample_values.size() / _cluster_count;
+    const auto jump_width = sample_values.size() / _cluster_count;
     for (auto sample_offset = size_t{0}; sample_offset < _cluster_count - 1; ++sample_offset) {
       split_values.push_back(sample_values[static_cast<size_t>((sample_offset + 1) * jump_width)]);
     }
@@ -282,15 +261,13 @@ class RadixClusterSort {
     return split_values;
   }
 
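To make the sampling concrete, consider a hypothetical run with `_cluster_count = 4` and eight sorted samples {1, 1, 1, 1, 2, 5, 8, 9}: `jump_width` is 8 / 4 = 2, so the indices (sample_offset + 1) * 2 for offsets 0 through 2 select positions 2, 4, and 6, yielding split values {1, 2, 8}. Cluster #0 then holds values <= 1, cluster #1 values in (1, 2], cluster #2 values in (2, 8], and the last cluster everything larger. Because duplicates stay in the sample, the frequent value 1 claims a cluster of its own, which is exactly the intended skew handling.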
-  /**
-   * Performs the range cluster sort for the non-equi case (>, >=, <, <=, !=) which requires the complete table to
-   * be sorted and not only the clusters in themselves. Returns the clustered data from the left table and the
-   * right table in a pair.
-   **/
-  std::pair<std::unique_ptr<MaterializedSegmentList<T>>, std::unique_ptr<MaterializedSegmentList<T>>> _range_cluster(
-      const std::unique_ptr<MaterializedSegmentList<T>>& left_input,
-      const std::unique_ptr<MaterializedSegmentList<T>>& right_input, std::vector<T> sample_values) {
-    const std::vector<T> split_values = _pick_split_values(sample_values);
+  // Performs the range cluster sort for the non-equi case (>, >=, <, <=, !=), which requires the complete table to
+  // be sorted and not only the clusters themselves. Returns the clustered data from the left and the right table as
+  // a pair.
+  std::pair<MaterializedSegmentList<T>, MaterializedSegmentList<T>> _range_cluster(
+      const MaterializedSegmentList<T>& left_input, const MaterializedSegmentList<T>& right_input,
+      std::vector<T>&& sample_values) {
+    const std::vector<T> split_values = _pick_split_values(std::move(sample_values));
 
     // Implements range clustering
     auto clusterer = [&split_values](const T& value) {
@@ -308,25 +285,33 @@ class RadixClusterSort {
       return split_values.size();
     };
 
     auto output_left = _cluster(left_input, clusterer);
     auto output_right = _cluster(right_input, clusterer);
 
     return {std::move(output_left), std::move(output_right)};
   }
 
-  /**
-   * Sorts all clusters of a materialized table.
-   **/
-  void _sort_clusters(std::unique_ptr<MaterializedSegmentList<T>>& clusters) {
-    for (auto cluster : *clusters) {
-      std::sort(cluster->begin(), cluster->end(), [](auto& left, auto& right) { return left.value < right.value; });
+  // Sorts all clusters of a materialized table.
+  void _sort_clusters(MaterializedSegmentList<T>& clusters) {
+    auto sort_jobs = std::vector<std::shared_ptr<AbstractTask>>{};
+    for (auto cluster_id = size_t{0}; cluster_id < clusters.size(); ++cluster_id) {
+      const auto cluster_size = clusters[cluster_id].size();
+      auto sort_job = [&, cluster_id] {
+        auto& cluster = clusters[cluster_id];
+        boost::sort::pdqsort(cluster.begin(), cluster.end(),
+                             [](const auto& left, const auto& right) { return left.value < right.value; });
+      };
+
+      if (cluster_size > JoinSortMerge::JOB_SPAWN_THRESHOLD) {
+        sort_jobs.push_back(std::make_shared<JobTask>(sort_job));
+      } else {
+        sort_job();
+      }
     }
+    Hyrise::get().scheduler()->schedule_and_wait_for_tasks(sort_jobs);
   }
 
  public:
-  /**
-   * Executes the clustering and sorting.
-   **/
   RadixClusterOutput execute() {
     RadixClusterOutput output;
 
@@ -355,17 +340,14 @@ class RadixClusterSort {
     output.clusters_left = _radix_cluster(materialized_left_segments);
     output.clusters_right = _radix_cluster(materialized_right_segments);
     } else {
-      auto result = _range_cluster(materialized_left_segments, materialized_right_segments, samples_left);
+      auto result = _range_cluster(materialized_left_segments, materialized_right_segments, std::move(samples_left));
       output.clusters_left = std::move(result.first);
       output.clusters_right = std::move(result.second);
     }
     _performance.set_step_runtime(JoinSortMerge::OperatorSteps::Clustering, timer.lap());
 
-    // Sort each cluster (right now std::sort -> but maybe can be replaced with
-    // an more efficient algorithm if subparts are already sorted [InsertionSort?!])
     _sort_clusters(output.clusters_left);
     _sort_clusters(output.clusters_right);
-
     _performance.set_step_runtime(JoinSortMerge::OperatorSteps::Sorting, timer.lap());
 
     return output;
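The spawn-or-inline pattern recurs three times in this file (histogram building, scattering, cluster sorting). A minimal sketch of the pattern follows; `kJobSpawnThreshold`, `run_or_spawn`, and `process_all` are invented for illustration, and plain std::thread stands in for Hyrise's JobTask and scheduler:

    // Small work items run inline to avoid task-scheduling overhead; only
    // sufficiently large items are worth spawning as separate tasks.
    #include <cstddef>
    #include <functional>
    #include <thread>
    #include <vector>

    constexpr std::size_t kJobSpawnThreshold = 500;  // illustrative; Hyrise defines its own constant

    void run_or_spawn(std::vector<std::thread>& workers, std::size_t work_size, std::function<void()> job) {
      if (work_size > kJobSpawnThreshold) {
        workers.emplace_back(std::move(job));  // large item: worth the spawn overhead
      } else {
        job();  // small item: run inline on the calling thread
      }
    }

    void process_all(const std::vector<std::vector<int>>& chunks) {
      auto workers = std::vector<std::thread>{};
      for (const auto& chunk : chunks) {
        run_or_spawn(workers, chunk.size(), [&chunk] { /* e.g., build a histogram of chunk */ });
      }
      for (auto& worker : workers) {
        worker.join();  // cf. schedule_and_wait_for_tasks
      }
    }

The threshold trades scheduling overhead against parallelism: with many tiny clusters, spawning one task per cluster would cost more than the work itself, which is presumably why the PR gates every task creation on JoinSortMerge::JOB_SPAWN_THRESHOLD.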