diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index d0b42bb..3761991 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -19,6 +19,7 @@ #include "dwio/nimble/velox/DeduplicationUtils.h" #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/SchemaTypes.h" +#include "folly/concurrency/ConcurrentHashMap.h" #include "velox/common/base/CompareFlags.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/DictionaryVector.h" @@ -365,6 +366,11 @@ class RowFieldWriter : public FieldWriter { : FieldWriter{context, context.schemaBuilder.createRowTypeBuilder(type->size())}, nullsStream_{context_.createNullsStreamData( typeBuilder_->asRow().nullsDescriptor())} { + if (context.writeExecutor) { + barrier_ = + std::make_unique(std::move(context_.writeExecutor)); + } + auto rowType = std::dynamic_pointer_cast(type->type()); @@ -411,8 +417,27 @@ class RowFieldWriter : public FieldWriter { [&](auto offset) { childRanges.add(offset, 1); }); context_.decodingPairPool().addPair(std::move(pair)); } - for (auto i = 0; i < fields_.size(); ++i) { - fields_[i]->write(row->childAt(i), *childRangesPtr); + + if (barrier_) { + for (auto i = 0; i < fields_.size(); ++i) { + const auto& kind = fields_[i]->typeBuilder()->kind(); + if (kind == Kind::Row || kind == Kind::FlatMap) { + // if row handle now to prevent deadlock + // if flatmap handle within due to fieldvaluewriter creation + fields_[i]->write(row->childAt(i), *childRangesPtr); + } else { + barrier_->add([&field = fields_[i], + &rowItem = row->childAt(i), + &childRanges = *childRangesPtr]() { + field->write(rowItem, childRanges); + }); + } + } + barrier_->waitAll(); + } else { + for (auto i = 0; i < fields_.size(); ++i) { + fields_[i]->write(row->childAt(i), *childRangesPtr); + } } } @@ -431,6 +456,7 @@ class RowFieldWriter : public FieldWriter { } private: + std::unique_ptr barrier_; std::vector> fields_; NullsStreamData& nullsStream_; }; @@ -826,7 +852,12 @@ class FlatMapFieldWriter : public FieldWriter { NimbleTypeTraits::scalarKind)), nullsStream_{context_.createNullsStreamData( typeBuilder_->asFlatMap().nullsDescriptor())}, - valueType_{type->childAt(1)} {} + valueType_{type->childAt(1)} { + if (context.writeExecutor) { + barrier_ = + std::make_unique(std::move(context.writeExecutor)); + } + } void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) override { @@ -985,8 +1016,16 @@ class FlatMapFieldWriter : public FieldWriter { // Now actually ingest the map values if (nonNullCount > 0) { auto& values = map->mapValues(); - for (auto& pair : currentValueFields_) { - pair.second->write(values, nonNullCount); + + if (barrier_) { + for (auto& pair : currentValueFields_) { + barrier_->add([&]() { pair.second->write(values, nonNullCount); }); + } + barrier_->waitAll(); + } else { + for (auto& pair : currentValueFields_) { + pair.second->write(values, nonNullCount); + } } } nonNullCount_ += nonNullCount; @@ -1023,6 +1062,7 @@ class FlatMapFieldWriter : public FieldWriter { } private: + std::unique_ptr barrier_; FlatMapValueFieldWriter* getValueFieldWriter(KeyType key, uint32_t size) { auto it = currentValueFields_.find(key); if (it != currentValueFields_.end()) { @@ -1035,6 +1075,7 @@ class FlatMapFieldWriter : public FieldWriter { // check whether the typebuilder for this key is already present auto flatFieldIt = allValueFields_.find(key); + if (flatFieldIt == allValueFields_.end()) { auto valueFieldWriter = FieldWriter::create(context_, valueType_); const auto& inMapDescriptor = typeBuilder_->asFlatMap().addChild( @@ -1061,7 +1102,8 @@ class FlatMapFieldWriter : public FieldWriter { NullsStreamData& nullsStream_; // This map store the FlatMapValue fields used in current flush unit. - folly::F14FastMap currentValueFields_; + folly::ConcurrentHashMap + currentValueFields_; // This map stores the FlatMapPassthrough fields. folly::F14FastMap< @@ -1072,7 +1114,7 @@ class FlatMapFieldWriter : public FieldWriter { uint64_t nonNullCount_ = 0; // This map store all FlatMapValue fields encountered by the VeloxWriter // across the whole file. - folly::F14FastMap> + folly::ConcurrentHashMap> allValueFields_; }; diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index aad6b45..1cbbf97 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -23,6 +23,7 @@ #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/StreamData.h" #include "folly/concurrency/DynamicBoundedQueue.h" +#include "velox/dwio/common/ExecutorBarrier.h" #include "velox/dwio/common/TypeWithId.h" #include "velox/vector/DecodedVector.h" @@ -35,6 +36,7 @@ struct InputBufferGrowthStats { std::atomic itemCount{0}; }; +using ExecutorBarrier = velox::dwio::common::ExecutorBarrier; using DecodedVectorPtr = std::unique_ptr; using SelectivityVectorPtr = std::unique_ptr; using DecodingPair = std::pair; @@ -119,6 +121,7 @@ class DecodingPairPool { struct FieldWriterContext { explicit FieldWriterContext( velox::memory::MemoryPool& memoryPool, + std::shared_ptr writeExecutor = nullptr, std::unique_ptr reclaimer = nullptr, std::function vectorDecoderVisitor = []() {}, std::chrono::milliseconds timeout = std::chrono::milliseconds(1000 * 10), @@ -127,6 +130,7 @@ struct FieldWriterContext { "field_writer_buffer", true, std::move(reclaimer))}, + writeExecutor{std::move(writeExecutor)}, inputBufferGrowthPolicy{ DefaultInputBufferGrowthPolicy::withDefaultRanges()}, decodingPairPool_{std::make_unique( @@ -137,6 +141,7 @@ struct FieldWriterContext { } std::shared_ptr bufferMemoryPool; + std::shared_ptr writeExecutor; SchemaBuilder schemaBuilder; folly::F14FastSet flatMapNodeIds; diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 50fac4f..ff926da 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -64,7 +64,14 @@ class WriterContext : public FieldWriterContext { WriterContext( velox::memory::MemoryPool& memoryPool, VeloxWriterOptions options) - : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor, options.poolTimeout, options.maxPoolSize}, + : FieldWriterContext{ + memoryPool, + options.writeExecutor, + options.reclaimerFactory(), + options.vectorDecoderVisitor, + options.poolTimeout, + options.maxPoolSize + }, options{std::move(options)}, logger{this->options.metricsLogger} { flushPolicy = this->options.flushPolicyFactory(); diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 68021e7..d5bac31 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -132,6 +132,8 @@ struct VeloxWriterOptions { // If provided, internal encoding operations will happen in parallel using // this executor. std::shared_ptr encodingExecutor; + // If provided, internal write operations will happen in parallel + std::shared_ptr writeExecutor; bool enableChunking = false; diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index b192107..12657a5 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -2433,6 +2433,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) { if (parallelismFactor > 0) { writerOptions.encodingExecutor = std::make_shared(parallelismFactor); + writerOptions.writeExecutor = + std::make_shared(parallelismFactor); } for (auto i = 0; i < iterations; ++i) { @@ -2519,9 +2521,12 @@ TEST_F(VeloxReaderTests, FuzzComplex) { for (auto parallelismFactor : {0U, 1U, 2U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; - writerOptions.encodingExecutor = parallelismFactor > 0 - ? std::make_shared(parallelismFactor) - : nullptr; + if (parallelismFactor > 0) { + writerOptions.encodingExecutor = + std::make_shared(parallelismFactor); + writerOptions.writeExecutor = + std::make_shared(parallelismFactor); + } for (auto i = 0; i < iterations; ++i) { writeAndVerify(