Skip to content

Commit

Permalink
Projection: Stop materializing forwarded columns (hyrise#2247)
Browse files Browse the repository at this point in the history
Up to now, whenever a Projection SELECT a, a+1 FROM t emitted a newly generate column (here: a+1), it also materialized all columns that it forwarded (here: a). The reason for that is that we do not allow ValueSegments and ReferenceSegments to be mixed within one table.

With this PR, we instead forward the ReferenceSegments and create new ReferenceSegments for the newly generated columns. These point to a new, temporary table. Because we use the EntireChunkPosList, this virtually causes no overhead.

Also does a minor improvement to segment_iterate which causes a segment that is iterated on using an EntireChunkPosLists to use the sequential iterator, not the random access one.
  • Loading branch information
mrks authored Oct 20, 2020
1 parent 59dc39f commit da83f75
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 187 deletions.
252 changes: 112 additions & 140 deletions src/lib/operators/projection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,45 @@ std::shared_ptr<const Table> Projection::_on_execute() {

const auto& input_table = *left_input_table();

/**
* For PQPColumnExpressions, it is possible to forward the input column if the input TableType (References or Data)
* matches the output column type (ReferenceSegment or not).
*/
const auto only_projects_columns = std::all_of(expressions.begin(), expressions.end(), [&](const auto& expression) {
// Determine the type of the output table: If no input columns are forwarded, i.e., all output columns are newly
// generated, the output type is always TableType::Data and all segments are ValueSegments. Otherwise, the output type
// depends on the input table's type. If the forwarded columns come from a data table, so are the newly generated
// columns. However, we cannot return a table that mixes data and reference segments, so if the forwarded columns are
// reference columns, the newly generated columns contain reference segments, too. These point to an internal dummy
// table, the projection_result_table. The life time of this table is managed by the shared_ptr in
// ReferenceSegment::_referenced_table. The ReferenceSegments created for this use an EntireChunkPosList, so
// segment_iterate on the ReferenceSegment decays to the underlying ValueSegment virtually without overhead.

// +-----+ +------+ Case 1
// |a |b | |a |a+1| * Input TableType::Data
// |DS|DS| --> |DS|VS | * A column (a) is forwarded
// +-----+ +------+ Result: No type change needed, output TableType::Data
//
// +-----+ +---+ Case 2
// |a |b | |a+1| * Input TableType::References
// |RS|RS| --> |VS | * No column is forwarded
// +-----+ +---+ Result: Output TableType::Data
//
// +------+ +-----+ +------+ Case 3
// |orig_a| |a |b | |a |a+1| * Input TableType::References
// |VS | |RS|RS| --> |RS|RS | * A column (a) is forwarded
// +------+ +-----+ +------+ Result: Type change needed, output TableType::References, RS a is forwarded, a+1
// ^ | | | is a new RS pointing to a new dummy table.
// +--------+----------+ v
// +---+ (VS: ValueSegment, DS: DictionarySegment, RS: ReferenceSegment)
// |a+1|
// |VS |
// +---+

const auto forwards_any_columns = std::any_of(expressions.begin(), expressions.end(), [&](const auto& expression) {
return expression->type == ExpressionType::PQPColumn;
});
const auto output_table_type = forwards_any_columns ? input_table.type() : TableType::Data;

const auto output_table_type = only_projects_columns ? input_table.type() : TableType::Data;
const auto forward_columns = input_table.type() == output_table_type;
// NULLability information is either forwarded or collected during the execution of the ExpressionEvaluator
auto column_is_nullable = std::vector<bool>(expressions.size(), false);

// Uncorrelated subqueries need to be evaluated exactly once, not once per chunk.
const auto uncorrelated_subquery_results =
ExpressionEvaluator::populate_uncorrelated_subquery_results_cache(expressions);

Expand All @@ -70,167 +98,79 @@ std::shared_ptr<const Table> Projection::_on_execute() {
step_performance_data.set_step_runtime(OperatorSteps::UncorrelatedSubqueries, timer.lap());
}

auto column_is_nullable = std::vector<bool>(expressions.size(), false);

/**
* Perform the projection
*/
auto output_chunk_segments = std::vector<Segments>(input_table.chunk_count());
// Perform the actual projection on a per-chunk level. `output_segments_by_chunk` will contain both forwarded and
// newly generated columns. In the upcoming loop, we do not yet deal with the projection_result_table indirection
// described above.
auto output_segments_by_chunk = std::vector<Segments>(input_table.chunk_count());

auto forwarding_cost = std::chrono::nanoseconds{};
auto expression_evaluator_cost = std::chrono::nanoseconds{};

const auto chunk_count_input_table = input_table.chunk_count();
for (auto chunk_id = ChunkID{0}; chunk_id < chunk_count_input_table; ++chunk_id) {
const auto chunk_count = input_table.chunk_count();
for (auto chunk_id = ChunkID{0}; chunk_id < chunk_count; ++chunk_id) {
const auto input_chunk = input_table.get_chunk(chunk_id);
Assert(input_chunk, "Physically deleted chunk should not reach this point, see get_chunk / #1686.");

auto output_segments = Segments{expressions.size()};

// The ExpressionEvaluator is created once per chunk so that evaluated sub-expressions can be reused across columns.
ExpressionEvaluator evaluator(left_input_table(), chunk_id, uncorrelated_subquery_results);

for (auto column_id = ColumnID{0}; column_id < expressions.size(); ++column_id) {
const auto& expression = expressions[column_id];

// Forward input column if possible
if (expression->type == ExpressionType::PQPColumn && forward_columns) {
const auto pqp_column_expression = std::static_pointer_cast<PQPColumnExpression>(expression);
output_segments[column_id] = input_chunk->get_segment(pqp_column_expression->column_id);
if (expression->type == ExpressionType::PQPColumn) {
// Forward input column if possible
const auto& pqp_column_expression = static_cast<const PQPColumnExpression&>(*expression);
output_segments[column_id] = input_chunk->get_segment(pqp_column_expression.column_id);
column_is_nullable[column_id] =
column_is_nullable[column_id] || input_table.column_is_nullable(pqp_column_expression->column_id);
forwarding_cost += timer.lap();
} else if (expression->type == ExpressionType::PQPColumn && !forward_columns) {
// The current column will be returned without any logical modifications. As other columns do get modified (and
// returned as a ValueSegment), all segments (including this one) need to become ValueSegments. This segment is
// not yet a ValueSegment (otherwise forward_columns would be true); thus we need to materialize it.

// TODO(jk): Once we have a smart pos list that knows that a single chunk is referenced in its entirety, we can
// simply forward that chunk here instead of materializing it.

const auto pqp_column_expression = std::static_pointer_cast<PQPColumnExpression>(expression);
const auto segment = input_chunk->get_segment(pqp_column_expression->column_id);

resolve_data_type(expression->data_type(), [&](const auto data_type) {
using ColumnDataType = typename decltype(data_type)::type;

const auto reference_segment = std::dynamic_pointer_cast<ReferenceSegment>(segment);
DebugAssert(reference_segment, "Expected ReferenceSegment");

// If the ReferenceSegment references a single (FixedString)DictionarySegment, do not materialize it as a
// ValueSegment, but re-use its dictionary and only copy the value ids.
auto referenced_dictionary_segment = std::shared_ptr<BaseDictionarySegment>{};

const auto& pos_list = reference_segment->pos_list();
if (pos_list->references_single_chunk()) {
const auto& referenced_table = reference_segment->referenced_table();
const auto& referenced_chunk = referenced_table->get_chunk(pos_list->common_chunk_id());
const auto& referenced_segment = referenced_chunk->get_segment(reference_segment->referenced_column_id());
referenced_dictionary_segment = std::dynamic_pointer_cast<BaseDictionarySegment>(referenced_segment);
}

if (referenced_dictionary_segment) {
// Resolving the BaseDictionarySegment so that we can handle both regular and fixed-string dictionaries
resolve_encoded_segment_type<ColumnDataType>(
*referenced_dictionary_segment, [&](const auto& typed_segment) {
using DictionarySegmentType = std::decay_t<decltype(typed_segment)>;

// Write new a attribute vector containing only positions given from the input_pos_list.
[[maybe_unused]] auto materialize_filtered_attribute_vector = [](const auto& dictionary_segment,
const auto& input_pos_list) {
auto filtered_attribute_vector = pmr_vector<ValueID::base_type>(input_pos_list->size());
auto iterable = create_iterable_from_attribute_vector(dictionary_segment);
auto chunk_offset = ChunkOffset{0};
iterable.with_iterators(input_pos_list, [&](auto it, auto end) {
while (it != end) {
filtered_attribute_vector[chunk_offset] = it->value();
++it;
++chunk_offset;
}
});
// DictionarySegments take BaseCompressedVectors, not an std::vector<ValueId> for the attribute
// vector. But the latter can be wrapped into a FixedSizeByteAligned<uint32_t> without copying.
return std::make_shared<FixedSizeByteAlignedVector<uint32_t>>(std::move(filtered_attribute_vector));
};

if constexpr (std::is_same_v<DictionarySegmentType, DictionarySegment<ColumnDataType>>) { // NOLINT
const auto compressed_attribute_vector =
materialize_filtered_attribute_vector(typed_segment, pos_list);
const auto& dictionary = typed_segment.dictionary();

output_segments[column_id] = std::make_shared<DictionarySegment<ColumnDataType>>(
dictionary, std::move(compressed_attribute_vector));
} else if constexpr (std::is_same_v<DictionarySegmentType, // NOLINT - lint.sh wants {} on same line
FixedStringDictionarySegment<ColumnDataType>>) {
const auto compressed_attribute_vector =
materialize_filtered_attribute_vector(typed_segment, pos_list);
const auto& dictionary = typed_segment.fixed_string_dictionary();

output_segments[column_id] = std::make_shared<FixedStringDictionarySegment<ColumnDataType>>(
dictionary, std::move(compressed_attribute_vector));
} else {
Fail("Referenced segment was dynamically casted to BaseDictionarySegment, but resolve failed");
}
// clang-format on
});
} else {
// End of dictionary segment shortcut - handle all other referenced segments and ReferenceSegments that
// reference more than a single chunk by materializing them into a ValueSegment
bool has_null = false;
auto values = pmr_vector<ColumnDataType>(segment->size());
auto null_values = pmr_vector<bool>(
input_table.column_is_nullable(pqp_column_expression->column_id) ? segment->size() : 0);

auto chunk_offset = ChunkOffset{0};
segment_iterate<ColumnDataType>(*segment, [&](const auto& position) {
if (position.is_null()) {
DebugAssert(!null_values.empty(), "Mismatching NULL information");
has_null = true;
null_values[chunk_offset] = true;
} else {
values[chunk_offset] = position.value();
}
++chunk_offset;
});

auto value_segment = std::shared_ptr<ValueSegment<ColumnDataType>>{};
if (has_null) {
value_segment = std::make_shared<ValueSegment<ColumnDataType>>(std::move(values), std::move(null_values));
} else {
value_segment = std::make_shared<ValueSegment<ColumnDataType>>(std::move(values));
}

output_segments[column_id] = std::move(value_segment);
column_is_nullable[column_id] = has_null;
}
});
column_is_nullable[column_id] || input_table.column_is_nullable(pqp_column_expression.column_id);
forwarding_cost += timer.lap();
} else {
// Newly generated column - the expression needs to be evaluated
auto output_segment = evaluator.evaluate_expression_to_segment(*expression);
column_is_nullable[column_id] = column_is_nullable[column_id] || output_segment->is_nullable();

// Storing the result in output_segments means that the vector may contain both ReferenceSegments and
// ValueSegments. We deal with this later.
output_segments[column_id] = std::move(output_segment);
expression_evaluator_cost += timer.lap();
}
}

output_chunk_segments[chunk_id] = std::move(output_segments);
output_segments_by_chunk[chunk_id] = std::move(output_segments);
}

step_performance_data.set_step_runtime(OperatorSteps::ForwardUnmodifiedColumns, forwarding_cost);
step_performance_data.set_step_runtime(OperatorSteps::EvaluateNewColumns, expression_evaluator_cost);

/**
* Determine the TableColumnDefinitions and build the output table
*/
TableColumnDefinitions column_definitions;
// Determine the TableColumnDefinitions. We can only do this now because column_is_nullable has been filled in the
// loop above. If necessary, projection_result_column_definitions holds those newly generated columns that the
// ReferenceSegments point to.
TableColumnDefinitions output_column_definitions;
TableColumnDefinitions projection_result_column_definitions;
for (auto column_id = ColumnID{0}; column_id < expressions.size(); ++column_id) {
column_definitions.emplace_back(expressions[column_id]->as_column_name(), expressions[column_id]->data_type(),
column_is_nullable[column_id]);
const auto definition = TableColumnDefinition{expressions[column_id]->as_column_name(),
expressions[column_id]->data_type(), column_is_nullable[column_id]};
output_column_definitions.emplace_back(definition);

if (expressions[column_id]->type != ExpressionType::PQPColumn && output_table_type == TableType::References) {
projection_result_column_definitions.emplace_back(definition);
}
}

// Create the projection_result_table if needed
auto projection_result_table = std::shared_ptr<Table>{};
if (!projection_result_column_definitions.empty()) {
projection_result_table = std::make_shared<Table>(projection_result_column_definitions, TableType::Data,
std::nullopt, input_table.uses_mvcc());
}

auto output_chunks = std::vector<std::shared_ptr<Chunk>>{chunk_count_input_table};
auto output_chunks = std::vector<std::shared_ptr<Chunk>>{chunk_count};
auto projection_result_chunks = std::vector<std::shared_ptr<Chunk>>{chunk_count};

// Maps input columns to output columns (which may be reordered). Only contains input column IDs that are forwarded
// to the output without modfications.
// Create a mapping from input columns to output columns for future use. This is necessary as the order may have been
// changed. The mapping only contains input column IDs that are forwarded to the output without modfications.
auto input_column_to_output_column = std::unordered_map<ColumnID, ColumnID>{};
for (auto expression_id = ColumnID{0}; expression_id < expressions.size(); ++expression_id) {
const auto& expression = expressions[expression_id];
Expand All @@ -240,15 +180,47 @@ std::shared_ptr<const Table> Projection::_on_execute() {
}
}

for (auto chunk_id = ChunkID{0}; chunk_id < chunk_count_input_table; ++chunk_id) {
// Create the actual chunks, and, if needed, fill the projection_result_table. Also set MVCC and
// individually_sorted_by information as needed.
for (auto chunk_id = ChunkID{0}; chunk_id < chunk_count; ++chunk_id) {
const auto input_chunk = input_table.get_chunk(chunk_id);
Assert(input_chunk, "Physically deleted chunk should not reach this point, see get_chunk / #1686.");

auto projection_result_segments = Segments{};
const auto entire_chunk_pos_list = std::make_shared<EntireChunkPosList>(chunk_id, input_chunk->size());
for (auto column_id = ColumnID{0}; column_id < expressions.size(); ++column_id) {
// Turn newly generated ValueSegments into ReferenceSegments, if needed
if (expressions[column_id]->type != ExpressionType::PQPColumn && output_table_type == TableType::References) {
projection_result_segments.emplace_back(output_segments_by_chunk[chunk_id][column_id]);

const auto projection_result_column_id =
ColumnID{static_cast<ColumnID::base_type>(projection_result_segments.size() - 1)};

output_segments_by_chunk[chunk_id][column_id] = std::make_shared<ReferenceSegment>(
projection_result_table, projection_result_column_id, entire_chunk_pos_list);
}
}

// The output chunk contains all rows that are in the stored chunk, including invalid rows. We forward this
// information so that following operators (currently, the Validate operator) can use it for optimizations.
const auto chunk = std::make_shared<Chunk>(std::move(output_chunk_segments[chunk_id]), input_chunk->mvcc_data());
chunk->increase_invalid_row_count(input_chunk->invalid_row_count());
chunk->finalize();
auto chunk = std::shared_ptr<Chunk>{};
if (output_table_type == TableType::Data) {
chunk = std::make_shared<Chunk>(std::move(output_segments_by_chunk[chunk_id]), input_chunk->mvcc_data());
chunk->increase_invalid_row_count(input_chunk->invalid_row_count());
chunk->finalize();

DebugAssert(projection_result_segments.empty(),
"For TableType::Data, projection_result_segments should be unused");
} else {
chunk = std::make_shared<Chunk>(std::move(output_segments_by_chunk[chunk_id]));
// No need to increase_invalid_row_count here, as it is ignored for reference chunks anyway
chunk->finalize();

if (projection_result_table) {
projection_result_table->append_chunk(projection_result_segments, input_chunk->mvcc_data());
projection_result_table->last_chunk()->increase_invalid_row_count(input_chunk->invalid_row_count());
}
}

// Forward sorted_by flags, mapping column ids
const auto& sorted_by = input_chunk->individually_sorted_by();
Expand All @@ -272,7 +244,7 @@ std::shared_ptr<const Table> Projection::_on_execute() {

step_performance_data.set_step_runtime(OperatorSteps::BuildOutput, timer.lap());

return std::make_shared<Table>(column_definitions, output_table_type, std::move(output_chunks),
return std::make_shared<Table>(output_column_definitions, output_table_type, std::move(output_chunks),
input_table.uses_mvcc());
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib/storage/chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void Chunk::set_individually_sorted_by(const SortColumnDefinition& sorted_by) {
}

void Chunk::set_individually_sorted_by(const std::vector<SortColumnDefinition>& sorted_by) {
Assert(!is_mutable(), "Cannot set sorted_by on mutable chunks.");
Assert(!is_mutable(), "Cannot set_individually_sorted_by on mutable chunks.");
// Currently, we assume that set_individually_sorted_by is called only once at most.
// As such, there should be no existing sorting and the new sorting should contain at least one column.
// Feel free to remove this assertion if necessary.
Expand Down
Loading

0 comments on commit da83f75

Please sign in to comment.