Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add bufferPool for nimble parallel writer #103

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions dwio/nimble/common/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

namespace facebook::nimble {

// Internally manages memory in chunks. Releases memory only upon destruction.
// Internally manages memory in chunks. releases memory when destroyed
// Buffer is NOT threadsafe: external locking is required.
class Buffer {
using MemoryPool = facebook::velox::memory::MemoryPool;
Expand All @@ -52,7 +52,6 @@ class Buffer {
// to, and guarantees for the lifetime of *this that that region will remain
// valid. Does NOT guarantee that the region is initially 0'd.
char* reserve(uint64_t bytes) {
std::scoped_lock<std::mutex> l(mutex_);
if (reserveEnd_ + bytes <= chunkEnd_) {
pos_ = reserveEnd_;
reserveEnd_ += bytes;
Expand Down Expand Up @@ -98,11 +97,6 @@ class Buffer {
char* reserveEnd_;
std::vector<velox::BufferPtr> chunks_;
MemoryPool& memoryPool_;
// NOTE: this is temporary fix, to quickly enable parallel access to the
// buffer class. In the near future, we are going to templetize this class to
// produce a concurrent and a non-concurrent variants, and change the call
// sites to use each variant when needed.
std::mutex mutex_;
};

} // namespace facebook::nimble
101 changes: 101 additions & 0 deletions dwio/nimble/common/tests/BufferPoolTests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (c) Meta Platforms, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>
#include "dwio/nimble/common/Exceptions.h"
#include "dwio/nimble/velox/FieldWriter.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/memory/MemoryArbitrator.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/dwio/common/ExecutorBarrier.h"

namespace facebook::nimble::test {
using MemoryPool = velox::memory::MemoryPool;
using ExecutorBarrier = velox::dwio::common::ExecutorBarrier;

class BufferPoolTest : public ::testing::Test {
protected:
static void SetUpTestCase() {
velox::memory::SharedArbitrator::registerFactory();
velox::memory::MemoryManager::testingSetInstance(
{.arbitratorKind = "SHARED"});
}

void SetUp() override {
rootPool_ = velox::memory::memoryManager()->addRootPool("default_root");
leafPool_ = rootPool_->addLeafChild("default_leaf");
}

std::shared_ptr<velox::memory::MemoryPool> rootPool_;
std::shared_ptr<velox::memory::MemoryPool> leafPool_;
};

TEST_F(BufferPoolTest, CreateBufferPoolBadMaxPool) {
try {
auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ 0};
FAIL();
} catch (const NimbleUserError& e) {
EXPECT_EQ(e.errorMessage(), "max pool size must be > 0");
}
}

TEST_F(BufferPoolTest, ReserveBuffer) {
auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ 10};
EXPECT_EQ(bufferPool.size(), 10);
{
auto buffer = bufferPool.reserveBuffer();
EXPECT_EQ(bufferPool.size(), 9);
}
EXPECT_EQ(bufferPool.size(), 10);
}

TEST_F(BufferPoolTest, EmptyFillBufferPool) {
size_t iterations = 10;
auto bufferPool = BufferPool{*leafPool_, /* maxPoolSize */ iterations};

for (auto i = 0; i < iterations; ++i) {
{
auto buffer1 = bufferPool.reserveBuffer();
auto buffer2 = bufferPool.reserveBuffer();
auto buffer3 = bufferPool.reserveBuffer();

EXPECT_EQ(bufferPool.size(), iterations - 3);
}
EXPECT_EQ(bufferPool.size(), iterations);
}
}

TEST_F(BufferPoolTest, ParallelFillPool) {
auto parallelismFactor = std::thread::hardware_concurrency();
auto executor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
ExecutorBarrier barrier{executor};
auto bufferPool = BufferPool{*leafPool_};
EXPECT_EQ(bufferPool.size(), parallelismFactor);

for (auto i = 0; i < parallelismFactor; ++i) {
barrier.add([&]() {
for (auto j = 0; j < 100000; ++j) {
auto buffer = bufferPool.reserveBuffer();
}
});
}

barrier.waitAll();
EXPECT_LE(bufferPool.size(), parallelismFactor);
}
} // namespace facebook::nimble::test
65 changes: 63 additions & 2 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,8 @@ class SimpleFieldWriter : public FieldWriter {
const OrderedRanges& ranges,
folly::Executor*) override {
auto size = ranges.size();
auto& buffer = context_.stringBuffer();
auto& data = valuesStream_.mutableData();

auto bufferObject = context_.bufferPool().reserveBuffer();
if (auto flat = vector->asFlatVector<SourceType>()) {
valuesStream_.ensureNullsCapacity(flat->mayHaveNulls(), size);
bool rangeCopied = false;
Expand Down Expand Up @@ -331,6 +330,7 @@ class SimpleFieldWriter : public FieldWriter {
valuesStream_.mutableNonNulls(),
Flat<SourceType>{vector},
[&](SourceType value) {
auto& buffer = bufferObject.get();
data.push_back(
C::convert(value, buffer, valuesStream_.extraMemory()));
});
Expand All @@ -344,6 +344,7 @@ class SimpleFieldWriter : public FieldWriter {
valuesStream_.mutableNonNulls(),
Decoded<SourceType>{decoded},
[&](SourceType value) {
auto& buffer = bufferObject.get();
data.push_back(
C::convert(value, buffer, valuesStream_.extraMemory()));
});
Expand Down Expand Up @@ -1590,6 +1591,66 @@ size_t DecodingContextPool::size() const {
return pool_.size();
}

BufferPool::BufferObject::BufferObject(
BufferPool& pool,
std::unique_ptr<Buffer> buffer)
: pool_{pool}, buffer_{std::move(buffer)} {}

BufferPool::BufferObject::~BufferObject() {
pool_.addBuffer(std::move(buffer_));
}

Buffer& BufferPool::BufferObject::get() {
return *buffer_;
}

BufferPool::BufferPool(
facebook::velox::memory::MemoryPool& memoryPool,
size_t maxPoolSize,
uint64_t initialChunkSize)
: defaultInitialChunkSize_{initialChunkSize},
semaphore_{0},
memoryPool_{memoryPool} {
NIMBLE_CHECK(maxPoolSize > 0, "max pool size must be > 0")
pool_.reserve(maxPoolSize);
for (size_t i = 0; i < maxPoolSize; ++i) {
pool_.emplace_back(newBuffer());
semaphore_.release();
}
}

facebook::velox::memory::MemoryPool& BufferPool::getMemoryPool() {
return memoryPool_;
}

// buffer back to the pool.
void BufferPool::addBuffer(std::unique_ptr<Buffer> buffer) {
std::scoped_lock<std::mutex> lock(mutex_);
pool_.push_back(std::move(buffer));
semaphore_.release();
}

// Reserves a buffer from the pool. Adds a new buffer to the pool
// while there are buffers available
BufferPool::BufferObject BufferPool::reserveBuffer() {
semaphore_.acquire();

std::scoped_lock<std::mutex> lock(mutex_);
auto buffer = std::move(pool_.back());
pool_.pop_back();

return BufferPool::BufferObject{*this, std::move(buffer)};
}

// Returns estimated number of buffers in the pool
size_t BufferPool::size() {
return pool_.size();
}

std::unique_ptr<Buffer> BufferPool::newBuffer() {
return std::make_unique<Buffer>(memoryPool_, defaultInitialChunkSize_);
}

std::unique_ptr<FieldWriter> FieldWriter::create(
FieldWriterContext& context,
const std::shared_ptr<const velox::dwio::common::TypeWithId>& type,
Expand Down
58 changes: 46 additions & 12 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,59 @@ class DecodingContextPool {
std::unique_ptr<velox::SelectivityVector> selectivityVector);
};

// Manages a pool of buffers. Buffers are returned to the pool when released.
// maxPoolSize should be set to at least 90% of capacity for performance
class BufferPool {
public:
class BufferObject {
public:
explicit BufferObject(BufferPool& pool, std::unique_ptr<Buffer> buffer);

~BufferObject();
Buffer& get();

private:
BufferPool& pool_;
std::unique_ptr<Buffer> buffer_;
};

explicit BufferPool(
facebook::velox::memory::MemoryPool& memoryPool,
size_t maxPoolSize = std::thread::hardware_concurrency(),
uint64_t initialChunkSize = kMinChunkSize);

facebook::velox::memory::MemoryPool& getMemoryPool();
BufferObject reserveBuffer();
size_t size();

private:
static const uint64_t kMinChunkSize = 1LL << 20;
const uint64_t defaultInitialChunkSize_;

std::mutex mutex_;
std::counting_semaphore<> semaphore_;
std::vector<std::unique_ptr<Buffer>> pool_;
facebook::velox::memory::MemoryPool& memoryPool_;

void addBuffer(std::unique_ptr<Buffer> buffer);
std::unique_ptr<Buffer> newBuffer();
};

struct FieldWriterContext {
explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::function<void(void)> vectorDecoderVisitor = []() {})
std::function<void(void)> vectorDecoderVisitor = []() {},
size_t maxPoolSize = std::thread::hardware_concurrency())
: bufferMemoryPool{memoryPool.addLeafChild(
"field_writer_buffer",
true,
std::move(reclaimer))},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
decodingContextPool_{std::move(vectorDecoderVisitor)} {
resetStringBuffer();
}
bufferPool_{
std::make_unique<BufferPool>(*bufferMemoryPool, maxPoolSize)},
decodingContextPool_{std::move(vectorDecoderVisitor)} {}

std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
std::mutex flatMapSchemaMutex;
Expand All @@ -112,13 +151,8 @@ struct FieldWriterContext {
return decodingContextPool_.reserveContext();
}

Buffer& stringBuffer() {
return *buffer_;
}

// Reset writer context for use by next stripe.
void resetStringBuffer() {
buffer_ = std::make_unique<Buffer>(*bufferMemoryPool);
BufferPool& bufferPool() {
return *bufferPool_;
}

const std::vector<std::unique_ptr<StreamData>>& streams() {
Expand Down Expand Up @@ -148,7 +182,7 @@ struct FieldWriterContext {
}

private:
std::unique_ptr<Buffer> buffer_;
std::unique_ptr<BufferPool> bufferPool_;
DecodingContextPool decodingContextPool_;
std::vector<std::unique_ptr<StreamData>> streams_;
};
Expand Down
20 changes: 10 additions & 10 deletions dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ class WriterContext : public FieldWriterContext {
WriterContext(
velox::memory::MemoryPool& memoryPool,
VeloxWriterOptions options)
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
: FieldWriterContext{
memoryPool,
options.reclaimerFactory(),
options.vectorDecoderVisitor,
options.maxPoolSize
},
options{std::move(options)},
logger{this->options.metricsLogger} {
flushPolicy = this->options.flushPolicyFactory();
Expand Down Expand Up @@ -622,9 +627,6 @@ void VeloxWriter::writeChunk(bool lastChunk) {
LoggingScope scope{*context_->logger};
velox::CpuWallTimer veloxTimer{context_->stripeFlushTiming};

if (!encodingBuffer_) {
encodingBuffer_ = std::make_unique<Buffer>(*encodingMemoryPool_);
}
streams_.resize(context_->schemaBuilder.nodeCount());

// When writing null streams, we write the nulls as data, and the stream
Expand Down Expand Up @@ -668,9 +670,11 @@ void VeloxWriter::writeChunk(bool lastChunk) {

auto encode = [&](StreamData& streamData) {
const auto offset = streamData.descriptor().offset();
auto encoded = encodeStream(*context_, *encodingBuffer_, streamData);
auto bufferObject = context_->bufferPool().reserveBuffer();
auto& buffer = bufferObject.get();
auto encoded = encodeStream(*context_, buffer, streamData);
if (!encoded.empty()) {
ChunkedStreamWriter chunkWriter{*encodingBuffer_};
ChunkedStreamWriter chunkWriter{buffer};
NIMBLE_DASSERT(offset < streams_.size(), "Stream offset out of range.");
auto& stream = streams_[offset];
for (auto& buffer : chunkWriter.encode(encoded)) {
Expand Down Expand Up @@ -782,10 +786,6 @@ uint32_t VeloxWriter::writeStripe() {
uint64_t startSize = writer_.size();
writer_.writeStripe(context_->rowsInStripe, std::move(streams_));
stripeSize = writer_.size() - startSize;
encodingBuffer_.reset();
// TODO: once chunked string fields are supported, move string buffer
// reset to writeChunk()
context_->resetStringBuffer();
}

NIMBLE_ASSERT(
Expand Down
1 change: 0 additions & 1 deletion dwio/nimble/velox/VeloxWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class VeloxWriter {
TabletWriter writer_;
std::unique_ptr<FieldWriter> root_;

std::unique_ptr<Buffer> encodingBuffer_;
std::vector<Stream> streams_;
std::exception_ptr lastException_;
const velox::common::SpillConfig* const spillConfig_;
Expand Down
1 change: 1 addition & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ struct VeloxWriterOptions {

const velox::common::SpillConfig* spillConfig{nullptr};

size_t maxPoolSize = std::thread::hardware_concurrency();
// If provided, internal writing/encoding operations will happen in parallel
// using the specified executors.
//
Expand Down
14 changes: 9 additions & 5 deletions dwio/nimble/velox/tests/VeloxWriterTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,15 @@ TEST_F(VeloxWriterTests, MemoryReclaimPath) {
std::string file;
auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
std::atomic_bool reclaimEntered = false;
nimble::VeloxWriterOptions writerOptions{.reclaimerFactory = [&]() {
auto reclaimer = std::make_unique<MockReclaimer>();
reclaimer->setEnterArbitrationFunc([&]() { reclaimEntered = true; });
return reclaimer;
}};
nimble::VeloxWriterOptions writerOptions{
.reclaimerFactory =
[&]() {
auto reclaimer = std::make_unique<MockReclaimer>();
reclaimer->setEnterArbitrationFunc(
[&]() { reclaimEntered = true; });
return reclaimer;
},
.maxPoolSize = 2};
nimble::VeloxWriter writer(
*writerPool, type, std::move(writeFile), std::move(writerOptions));
auto batches = generateBatches(type, 100, 4000, 20221110, *leafPool_);
Expand Down