Skip to content

Commit

Permalink
add executorBarrier to nimble parallel writer (#102)
Browse files Browse the repository at this point in the history
Summary:

adding executor barrier for the fieldwriter for parallelism

Differential Revision: D64775045
  • Loading branch information
Scott Young authored and facebook-github-bot committed Nov 7, 2024
1 parent 979f56b commit 21b37d0
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 11 deletions.
56 changes: 49 additions & 7 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "dwio/nimble/velox/DeduplicationUtils.h"
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/SchemaTypes.h"
#include "folly/concurrency/ConcurrentHashMap.h"
#include "velox/common/base/CompareFlags.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
Expand Down Expand Up @@ -365,6 +366,11 @@ class RowFieldWriter : public FieldWriter {
: FieldWriter{context, context.schemaBuilder.createRowTypeBuilder(type->size())},
nullsStream_{context_.createNullsStreamData<bool>(
typeBuilder_->asRow().nullsDescriptor())} {
if (context.writeExecutor) {
barrier_ =
std::make_unique<ExecutorBarrier>(std::move(context_.writeExecutor));
}

auto rowType =
std::dynamic_pointer_cast<const velox::RowType>(type->type());

Expand Down Expand Up @@ -411,8 +417,27 @@ class RowFieldWriter : public FieldWriter {
[&](auto offset) { childRanges.add(offset, 1); });
context_.decodingPairPool().addPair(std::move(pair));
}
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);

if (barrier_) {
for (auto i = 0; i < fields_.size(); ++i) {
const auto& kind = fields_[i]->typeBuilder()->kind();
if (kind == Kind::Row || kind == Kind::FlatMap) {
// if row handle now to prevent deadlock
// if flatmap handle within due to fieldvaluewriter creation
fields_[i]->write(row->childAt(i), *childRangesPtr);
} else {
barrier_->add([&field = fields_[i],
&rowItem = row->childAt(i),
&childRanges = *childRangesPtr]() {
field->write(rowItem, childRanges);
});
}
}
barrier_->waitAll();
} else {
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);
}
}
}

Expand All @@ -431,6 +456,7 @@ class RowFieldWriter : public FieldWriter {
}

private:
std::unique_ptr<ExecutorBarrier> barrier_;
std::vector<std::unique_ptr<FieldWriter>> fields_;
NullsStreamData& nullsStream_;
};
Expand Down Expand Up @@ -826,7 +852,12 @@ class FlatMapFieldWriter : public FieldWriter {
NimbleTypeTraits<K>::scalarKind)),
nullsStream_{context_.createNullsStreamData<bool>(
typeBuilder_->asFlatMap().nullsDescriptor())},
valueType_{type->childAt(1)} {}
valueType_{type->childAt(1)} {
if (context.writeExecutor) {
barrier_ =
std::make_unique<ExecutorBarrier>(std::move(context.writeExecutor));
}
}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
Expand Down Expand Up @@ -985,8 +1016,16 @@ class FlatMapFieldWriter : public FieldWriter {
// Now actually ingest the map values
if (nonNullCount > 0) {
auto& values = map->mapValues();
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);

if (barrier_) {
for (auto& pair : currentValueFields_) {
barrier_->add([&]() { pair.second->write(values, nonNullCount); });
}
barrier_->waitAll();
} else {
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);
}
}
}
nonNullCount_ += nonNullCount;
Expand Down Expand Up @@ -1023,6 +1062,7 @@ class FlatMapFieldWriter : public FieldWriter {
}

private:
std::unique_ptr<ExecutorBarrier> barrier_;
FlatMapValueFieldWriter* getValueFieldWriter(KeyType key, uint32_t size) {
auto it = currentValueFields_.find(key);
if (it != currentValueFields_.end()) {
Expand All @@ -1035,6 +1075,7 @@ class FlatMapFieldWriter : public FieldWriter {

// check whether the typebuilder for this key is already present
auto flatFieldIt = allValueFields_.find(key);

if (flatFieldIt == allValueFields_.end()) {
auto valueFieldWriter = FieldWriter::create(context_, valueType_);
const auto& inMapDescriptor = typeBuilder_->asFlatMap().addChild(
Expand All @@ -1061,7 +1102,8 @@ class FlatMapFieldWriter : public FieldWriter {

NullsStreamData& nullsStream_;
// This map store the FlatMapValue fields used in current flush unit.
folly::F14FastMap<KeyType, FlatMapValueFieldWriter*> currentValueFields_;
folly::ConcurrentHashMap<KeyType, FlatMapValueFieldWriter*>
currentValueFields_;

// This map stores the FlatMapPassthrough fields.
folly::F14FastMap<
Expand All @@ -1072,7 +1114,7 @@ class FlatMapFieldWriter : public FieldWriter {
uint64_t nonNullCount_ = 0;
// This map store all FlatMapValue fields encountered by the VeloxWriter
// across the whole file.
folly::F14FastMap<KeyType, std::unique_ptr<FlatMapValueFieldWriter>>
folly::ConcurrentHashMap<KeyType, std::unique_ptr<FlatMapValueFieldWriter>>
allValueFields_;
};

Expand Down
5 changes: 5 additions & 0 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/StreamData.h"
#include "folly/concurrency/DynamicBoundedQueue.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/dwio/common/TypeWithId.h"
#include "velox/vector/DecodedVector.h"

Expand All @@ -35,6 +36,7 @@ struct InputBufferGrowthStats {
std::atomic<uint64_t> itemCount{0};
};

using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;
using DecodedVectorPtr = std::unique_ptr<velox::DecodedVector>;
using SelectivityVectorPtr = std::unique_ptr<velox::SelectivityVector>;
using DecodingPair = std::pair<DecodedVectorPtr, SelectivityVectorPtr>;
Expand Down Expand Up @@ -119,6 +121,7 @@ class DecodingPairPool {
struct FieldWriterContext {
explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::shared_ptr<folly::Executor> writeExecutor = nullptr,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::function<void(void)> vectorDecoderVisitor = []() {},
std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10),
Expand All @@ -127,6 +130,7 @@ struct FieldWriterContext {
"field_writer_buffer",
true,
std::move(reclaimer))},
writeExecutor{std::move(writeExecutor)},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
decodingPairPool_{std::make_unique<DecodingPairPool>(
Expand All @@ -137,6 +141,7 @@ struct FieldWriterContext {
}

std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
std::shared_ptr<folly::Executor> writeExecutor;
SchemaBuilder schemaBuilder;

folly::F14FastSet<uint32_t> flatMapNodeIds;
Expand Down
9 changes: 8 additions & 1 deletion dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@ class WriterContext : public FieldWriterContext {
WriterContext(
velox::memory::MemoryPool& memoryPool,
VeloxWriterOptions options)
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor, options.poolTimeout, options.maxPoolSize},
: FieldWriterContext{
memoryPool,
options.writeExecutor,
options.reclaimerFactory(),
options.vectorDecoderVisitor,
options.poolTimeout,
options.maxPoolSize
},
options{std::move(options)},
logger{this->options.metricsLogger} {
flushPolicy = this->options.flushPolicyFactory();
Expand Down
2 changes: 2 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ struct VeloxWriterOptions {
// If provided, internal encoding operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> encodingExecutor;
// If provided, internal write operations will happen in parallel
std::shared_ptr<folly::Executor> writeExecutor;

bool enableChunking = false;

Expand Down
11 changes: 8 additions & 3 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2433,6 +2433,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
Expand Down Expand Up @@ -2519,9 +2521,12 @@ TEST_F(VeloxReaderTests, FuzzComplex) {
for (auto parallelismFactor :
{0U, 1U, 2U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
writerOptions.encodingExecutor = parallelismFactor > 0
? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
: nullptr;
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
writeAndVerify(
Expand Down

0 comments on commit 21b37d0

Please sign in to comment.