Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Union that can keep sort order #1834

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <vector>

#include "engine/Sort.h"
#include "engine/Union.h"
#include "global/RuntimeParameters.h"

using std::string;
Expand Down Expand Up @@ -164,7 +165,14 @@ std::shared_ptr<QueryExecutionTree> QueryExecutionTree::createSortedTree(
return qet;
}

// Push down sort into Union.
QueryExecutionContext* qec = qet->getRootOperation()->getExecutionContext();
if (auto unionOperation =
std::dynamic_pointer_cast<Union>(qet->getRootOperation())) {
return std::make_shared<QueryExecutionTree>(
qec, unionOperation->createSortedVariant(sortColumns));
}

auto sort = std::make_shared<Sort>(qec, std::move(qet), sortColumns);
return std::make_shared<QueryExecutionTree>(qec, std::move(sort));
}
Expand Down
213 changes: 210 additions & 3 deletions src/engine/Union.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@

Union::Union(QueryExecutionContext* qec,
const std::shared_ptr<QueryExecutionTree>& t1,
const std::shared_ptr<QueryExecutionTree>& t2)
: Operation(qec) {
const std::shared_ptr<QueryExecutionTree>& t2,
std::vector<ColumnIndex> targetOrder)
: Operation(qec), targetOrder_{std::move(targetOrder)} {
AD_CONTRACT_CHECK(t1 && t2);
_subtrees[0] = t1;
_subtrees[1] = t2;
Expand Down Expand Up @@ -45,13 +46,42 @@
AD_CORRECTNESS_CHECK(ql::ranges::all_of(_columnOrigins, [](const auto& el) {
return el[0] != NO_COLUMN || el[1] != NO_COLUMN;
}));

if (!targetOrder_.empty()) {
auto computeSortOrder = [this](bool left) {
vector<ColumnIndex> specificSortOrder;
for (ColumnIndex index : targetOrder_) {
ColumnIndex realIndex = _columnOrigins.at(index).at(!left);
if (realIndex != NO_COLUMN) {
specificSortOrder.push_back(realIndex);
}
}
return specificSortOrder;
};

_subtrees[0] = QueryExecutionTree::createSortedTree(std::move(_subtrees[0]),
computeSortOrder(true));
_subtrees[1] = QueryExecutionTree::createSortedTree(
std::move(_subtrees[1]), computeSortOrder(false));

// Swap children to get cheaper computation
if (_columnOrigins.at(targetOrder_.at(0)).at(1) == NO_COLUMN) {
std::swap(_subtrees[0], _subtrees[1]);
ql::ranges::for_each(_columnOrigins,
[](auto& el) { std::swap(el[0], el[1]); });
}
}
}

string Union::getCacheKeyImpl() const {
std::ostringstream os;
os << _subtrees[0]->getCacheKey() << "\n";
os << "UNION\n";
os << _subtrees[1]->getCacheKey() << "\n";
os << "sort order: ";
for (size_t i : targetOrder_) {
os << i << " ";
}
return std::move(os).str();
}

Expand All @@ -64,7 +94,7 @@
return _columnOrigins.size();
}

vector<ColumnIndex> Union::resultSortedOn() const { return {}; }
vector<ColumnIndex> Union::resultSortedOn() const { return targetOrder_; }

// _____________________________________________________________________________
VariableToColumnMap Union::computeVariableToColumnMap() const {
Expand Down Expand Up @@ -165,6 +195,18 @@
std::shared_ptr<const Result> subRes2 =
_subtrees[1]->getResult(requestLaziness);

// If first sort column is not present in left child, we can fall back to the
// cheap computation because it orders the left child first.
if (!targetOrder_.empty() &&
_columnOrigins.at(targetOrder_.at(0)).at(0) != NO_COLUMN) {
auto generator = computeResultKeepOrder(requestLaziness, std::move(subRes1),
std::move(subRes2));
return requestLaziness
? ProtoResult{std::move(generator), resultSortedOn()}
: ProtoResult{cppcoro::getSingleElement(std::move(generator)),
resultSortedOn()};
}

if (requestLaziness) {
return {computeResultLazily(std::move(subRes1), std::move(subRes2)),
resultSortedOn()};
Expand Down Expand Up @@ -292,3 +334,168 @@
}
return copy;
}

// _____________________________________________________________________________
std::shared_ptr<Operation> Union::createSortedVariant(
const vector<ColumnIndex>& sortOrder) const {
return std::make_shared<Union>(_executionContext, _subtrees.at(0),
_subtrees.at(1), sortOrder);
}

namespace {
struct Wrapper {
const IdTable& idTable_;
const LocalVocab& localVocab_;
};
Result::IdTableVocabPair& moveOrCopy(Result::IdTableVocabPair& element) {
return element;
}
Result::IdTableVocabPair moveOrCopy(const Wrapper& element) {
return {element.idTable_.clone(), element.localVocab_.clone()};
}

Check warning on line 355 in src/engine/Union.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Union.cpp#L353-L355

Added lines #L353 - L355 were not covered by tests
} // namespace

// _____________________________________________________________________________
bool Union::isSmaller(const auto& row1, const auto& row2) const {
for (auto& col : targetOrder_) {
ColumnIndex index1 = _columnOrigins.at(col).at(0);
ColumnIndex index2 = _columnOrigins.at(col).at(1);
if (index1 == NO_COLUMN) {
return true;
}
if (index2 == NO_COLUMN) {
return false;
}
if (row1[index1] != row2[index2]) {
return row1[index1] < row2[index2];
}
}
return false;
}

// _____________________________________________________________________________
Result::Generator Union::processRemaining(std::vector<ColumnIndex> permutation,
auto& it, auto end,
bool requestLaziness, size_t index,
IdTable& resultTable,
LocalVocab& localVocab) const {
// append the remaining elements
while (it != end) {
if (requestLaziness) {
if (index != 0) {
resultTable.insertAtEnd(it->idTable_, index, std::nullopt, permutation,
Id::makeUndefined());
localVocab.mergeWith(std::span{&it->localVocab_, 1});
co_yield Result::IdTableVocabPair{std::move(resultTable),
std::move(localVocab)};
} else {
if (resultTable.size() != 0) {
co_yield Result::IdTableVocabPair{std::move(resultTable),
std::move(localVocab)};
}
auto&& pair = moveOrCopy(*it);
pair.idTable_ = transformToCorrectColumnFormat(std::move(pair.idTable_),
permutation);
co_yield pair;
}
} else {
resultTable.insertAtEnd(it->idTable_, index, std::nullopt, permutation,
Id::makeUndefined());
}
index = 0;
++it;
}
}

// _____________________________________________________________________________
Result::Generator Union::computeResultKeepOrderImpl(
bool requestLaziness, auto range1, auto range2,
std::pair<std::shared_ptr<const Result>, std::shared_ptr<const Result>>)
const {
IdTable resultTable{getResultWidth(), allocator()};
if (requestLaziness) {
resultTable.reserve(chunkSize);
}
LocalVocab localVocab;
auto it1 = range1.begin();
auto it2 = range2.begin();
size_t index1 = 0;
size_t index2 = 0;
auto pushRow = [this, &resultTable](bool left, const auto& row) {
resultTable.emplace_back();
for (size_t column = 0; column < resultTable.numColumns(); column++) {
ColumnIndex origin = _columnOrigins.at(column).at(!left);
resultTable.at(resultTable.size() - 1, column) =
origin == NO_COLUMN ? Id::makeUndefined() : row[origin];
}
};
while (it1 != range1.end() && it2 != range2.end()) {
localVocab.mergeWith(std::span{&it1->localVocab_, 1});
localVocab.mergeWith(std::span{&it2->localVocab_, 1});
while (index1 < it1->idTable_.size() && index2 < it2->idTable_.size()) {
if (isSmaller(it1->idTable_.at(index1), it2->idTable_.at(index2))) {
pushRow(true, it1->idTable_.at(index1));
index1++;
} else {
pushRow(false, it2->idTable_.at(index2));
index2++;
}
if (requestLaziness && resultTable.size() >= chunkSize) {
co_yield Result::IdTableVocabPair{std::move(resultTable),
std::move(localVocab)};
resultTable = IdTable{getResultWidth(), allocator()};
resultTable.reserve(chunkSize);
localVocab = LocalVocab{};
}
}
if (index1 == it1->idTable_.size()) {
++it1;
index1 = 0;
}
if (index2 == it2->idTable_.size()) {
++it2;
index2 = 0;
}
}

// append the remaining elements
for (auto& pair :
processRemaining(computePermutation<true>(), it1, range1.end(),
requestLaziness, index1, resultTable, localVocab)) {
AD_CORRECTNESS_CHECK(requestLaziness);
co_yield pair;
}
for (auto& pair :
processRemaining(computePermutation<false>(), it2, range2.end(),
requestLaziness, index2, resultTable, localVocab)) {
AD_CORRECTNESS_CHECK(requestLaziness);
co_yield pair;
}
if (!requestLaziness) {
co_yield Result::IdTableVocabPair{std::move(resultTable),
std::move(localVocab)};
}
}

// _____________________________________________________________________________
Result::Generator Union::computeResultKeepOrder(
bool requestLaziness, std::shared_ptr<const Result> result1,
std::shared_ptr<const Result> result2) const {
using Range = std::variant<Result::LazyResult, std::array<Wrapper, 1>>;
Range leftRange = result1->isFullyMaterialized()
? Range{std::array{Wrapper{result1->idTable(),
result1->localVocab()}}}
: Range{std::move(result1->idTables())};
Range rightRange = result2->isFullyMaterialized()
? Range{std::array{Wrapper{result2->idTable(),
result2->localVocab()}}}
: Range{std::move(result2->idTables())};
return std::visit(
[this, requestLaziness, &result1, &result2](auto leftRange,
auto rightRange) {
return computeResultKeepOrderImpl(
requestLaziness, std::move(leftRange), std::move(rightRange),
std::pair{std::move(result1), std::move(result2)});
},
std::move(leftRange), std::move(rightRange));
}
33 changes: 32 additions & 1 deletion src/engine/Union.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ class Union : public Operation {
*/
std::vector<std::array<size_t, 2>> _columnOrigins;
std::array<std::shared_ptr<QueryExecutionTree>, 2> _subtrees;
std::vector<ColumnIndex> targetOrder_;

public:
Union(QueryExecutionContext* qec,
const std::shared_ptr<QueryExecutionTree>& t1,
const std::shared_ptr<QueryExecutionTree>& t2);
const std::shared_ptr<QueryExecutionTree>& t2,
std::vector<ColumnIndex> targetOrder = {});

protected:
virtual string getCacheKeyImpl() const override;
Expand Down Expand Up @@ -61,6 +63,14 @@ class Union : public Operation {
return {_subtrees[0].get(), _subtrees[1].get()};
}

// Create a sorted variant of this operation. This can be more efficient than
// stacking a `Sort` operation on top of this one because Union can simply
// push the sort down to its children. If one of the children is already
// sorted properly then it is way cheaper to sort the other child and then
// merge the two sorted results.
std::shared_ptr<Operation> createSortedVariant(
const vector<ColumnIndex>& sortColumns) const;

private:
std::unique_ptr<Operation> cloneImpl() const override;

Expand All @@ -85,4 +95,25 @@ class Union : public Operation {
Result::Generator computeResultLazily(
std::shared_ptr<const Result> result1,
std::shared_ptr<const Result> result2) const;

// Compares two rows with respect to the columns that the result is sorted on.
bool isSmaller(const auto& row1, const auto& row2) const;

// Helper function for `computeResultKeepOrderImpl` that processes any
// remaining elements once one side is exhausted.
Result::Generator processRemaining(std::vector<ColumnIndex> permutation,
auto& it, auto end, bool requestLaziness,
size_t index, IdTable& resultTable,
LocalVocab& localVocab) const;

// Actual implementation of `computeResultKeepOrder`.
Result::Generator computeResultKeepOrderImpl(
bool requestLaziness, auto range1, auto range2,
std::pair<std::shared_ptr<const Result>, std::shared_ptr<const Result>>
lifetimeExtension) const;

// Similar to `computeResultLazily` but it keeps the order of the results.
Result::Generator computeResultKeepOrder(
bool requestLaziness, std::shared_ptr<const Result> result1,
std::shared_ptr<const Result> result2) const;
};
33 changes: 23 additions & 10 deletions src/engine/idTable/IdTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -736,22 +736,35 @@ class IdTable {
// The input must be some kind of `IdTable`.
// TODO<joka921> Can/should we constraint this functions by a concept?
template <typename Table>
void insertAtEnd(const Table& table,
std::optional<size_t> beginIdx = std::nullopt,
std::optional<size_t> endIdx = std::nullopt) {
AD_CORRECTNESS_CHECK(table.numColumns() == numColumns());
void insertAtEnd(
const Table& table, std::optional<size_t> beginIdx = std::nullopt,
std::optional<size_t> endIdx = std::nullopt,
std::optional<std::vector<ColumnIndex>> permutation = std::nullopt,
typename Table::single_value_type defaultValue = {}) {
AD_CORRECTNESS_CHECK(
table.numColumns() == numColumns() ||
(permutation.has_value() && numColumns() == permutation->size()));
auto begin = beginIdx.value_or(0);
auto end = endIdx.value_or(table.size());
AD_CORRECTNESS_CHECK(begin <= end && end <= table.size());
auto numInserted = end - begin;
auto oldSize = size();
resize(numRows() + numInserted);
ql::ranges::for_each(ad_utility::integerRange(numColumns()),
[this, &table, oldSize, begin, numInserted](size_t i) {
ql::ranges::copy(
table.getColumn(i).subspan(begin, numInserted),
getColumn(i).begin() + oldSize);
});
ql::ranges::for_each(
ad_utility::integerRange(numColumns()),
[this, &table, oldSize, begin, numInserted, &permutation,
&defaultValue](size_t i) {
size_t mappedIndex =
permutation.has_value() ? permutation.value()[i] : i;
// Map out of index column indices from the default value.
if (mappedIndex >= table.numColumns()) {
ql::ranges::fill(getColumn(i).subspan(oldSize), defaultValue);
return;
}
ql::ranges::copy(
table.getColumn(mappedIndex).subspan(begin, numInserted),
getColumn(i).begin() + oldSize);
});
}

// Check whether two `IdTables` have the same content. Mostly used for unit
Expand Down
Loading
Loading