[Feature] write while sync #17

Open · wants to merge 4 commits into base: dev.1.3
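For context before the file diffs: the PR threads a new write_wal_while_sync DB option into DBImpl and, for writes with sync == true, the WAL enabled, and a single write queue, routes them through the new WriteWhileSyncWriteImpl path in db_impl_write.cc. A minimal usage sketch follows, assuming the DBOptions/MutableDBOptions field write_wal_while_sync and its option-string name are wired up as the db_impl.cc hunks suggest (that plumbing is not part of this diff), and assuming standard RocksDB-style headers:

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Assumed new flag from this PR: defer the WAL fsync out of the leader's
  // critical section for synchronous writes.
  options.write_wal_while_sync = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/write_while_sync_demo", &db);
  assert(s.ok());

  // sync == true, WAL enabled, single write queue: this write takes the
  // WriteWhileSyncWriteImpl path added in db_impl_write.cc.
  rocksdb::WriteOptions wo;
  wo.sync = true;
  s = db->Put(wo, "key", "value");
  assert(s.ok());

  // The SetDBOptions hunk below suggests the flag is mutable at runtime;
  // the option-string name here is an assumption.
  s = db->SetDBOptions({{"write_wal_while_sync", "false"}});
  assert(s.ok());

  delete db;
  return 0;
}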
8 changes: 4 additions & 4 deletions db/db_impl.cc
@@ -100,8 +100,6 @@
#if !defined(_MSC_VER) && !defined(__APPLE__)
#include <sys/unistd.h>



#endif
#include "utilities/util/valvec.hpp"

@@ -112,16 +110,16 @@

#ifdef WITH_TERARK_ZIP
#include <table/terark_zip_table.h>
#include <terark/util/fiber_pool.hpp>

#include <terark/thread/fiber_yield.hpp>
#include <terark/util/fiber_pool.hpp>
#endif

#ifdef BOOSTLIB
#include <boost/fiber/all.hpp>
#endif
//#include <boost/context/pooled_fixedsize_stack.hpp>


#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif
@@ -275,6 +273,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
use_custom_gc_(seq_per_batch),
shutdown_initiated_(false),
own_sfm_(options.sst_file_manager == nullptr),
write_wal_while_sync_(options.write_wal_while_sync),
preserve_deletes_(options.preserve_deletes),
closed_(false),
error_handler_(this, immutable_db_options_, &mutex_),
@@ -962,6 +961,7 @@ Status DBImpl::SetDBOptions(
s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
&new_options);
if (s.ok()) {
write_wal_while_sync_ = new_options.write_wal_while_sync;
auto bg_job_limits = DBImpl::GetBGJobLimits(
immutable_db_options_.max_background_flushes,
new_options.max_background_compactions,
9 changes: 9 additions & 0 deletions db/db_impl.h
@@ -832,6 +832,13 @@ class DBImpl : public DB {
uint64_t* seq_used = nullptr, size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

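// New write path used when write_wal_while_sync_ is set: the leader appends
// the group's batches to the WAL without syncing inline and defers the WAL
// sync until after the batch group has exited.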
Status WriteWhileSyncWriteImpl(
const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr, uint64_t* log_used = nullptr,
uint64_t log_ref = 0, bool disable_memtable = false,
uint64_t* seq_used = nullptr, size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

// write cached_recoverable_state_ to memtable if it is not empty
// The writer must be the leader in write_thread_ and holding mutex_
Status WriteRecoverableState();
@@ -1667,6 +1674,8 @@ class DBImpl : public DB {
// DB::Open() or passed to us
bool own_sfm_;

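// Cached copy of the write_wal_while_sync DB option; refreshed from the
// mutable DB options in SetDBOptions().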
bool write_wal_while_sync_;

// Clients must periodically call SetPreserveDeletesSequenceNumber()
// to advance this seqnum. Default value is 0 which means ALL deletes are
// preserved. Note that this has no effect if DBOptions.preserve_deletes
311 changes: 311 additions & 0 deletions db/db_impl_write.cc
@@ -122,6 +122,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
log_ref, disable_memtable, seq_used);
}

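// Synchronous writes with the WAL enabled on the single write queue take the
// write-while-sync path, which appends to the WAL under the leader but defers
// the WAL fsync until after the batch group exits.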
if (write_options.sync && !write_options.disableWAL && !two_write_queues_ &&
write_wal_while_sync_) {
return WriteWhileSyncWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable, seq_used,
batch_cnt, pre_release_callback);
}

PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, batch_cnt, pre_release_callback);
@@ -682,6 +689,310 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
return status;
}

Status DBImpl::WriteWhileSyncWriteImpl(
const WriteOptions& write_options, WriteBatch* my_batch,
WriteCallback* callback, uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used, size_t batch_cnt,
PreReleaseCallback* pre_release_callback) {
Status status;

PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, batch_cnt, pre_release_callback);

RecordTick(stats_, WRITE_WITH_WAL);

StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

std::vector<log::Writer*> writers;
std::vector<WriteThread::Writer*> manual_wake_followers;
std::vector<std::function<Status()>> defer_sync_funcs;

write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group

if (w.ShouldWriteToMemtable()) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_memtable_time);

ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt);

PERF_TIMER_START(write_pre_and_post_process_time);
}

if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// we're responsible for exiting the batch group
for (auto* writer : *(w.write_group)) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(writer->sequence,
disable_memtable);
if (!ws.ok()) {
status = ws;
break;
}
}
}
// TODO(myabandeh): propagate status to write_group
auto last_sequence = w.write_group->last_sequence;
MemTableInsertStatusCheck(w.status);
// TODO(linyuanjin): may violate consistency
versions_->SetLastSequence(last_sequence);
write_thread_.ExitAsBatchGroupFollower(&w, &manual_wake_followers);
status = w.write_group->exit_callback();

for (auto* follower : manual_wake_followers) {
follower->status = status;
WriteThread::SetStateCompleted(follower);
}
}
assert(w.state == WriteThread::STATE_COMPLETED);
// STATE_COMPLETED conditional below handles exit

status = w.FinalStatus();
}
if (w.state == WriteThread::STATE_COMPLETED) {
if (log_used != nullptr) {
*log_used = w.log_used;
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
// write is complete and leader has updated sequence
return w.FinalStatus();
}
// else we are the leader of the write batch group
assert(w.state == WriteThread::STATE_GROUP_LEADER);

// Once it reaches this point, the current writer "w" will try to do its write
// job. It may also pick up some of the remaining writers in "writers_"
// when it finds them suitable, and finish them in the same write batch.
// This is how a write job can end up being done by another writer.
WriteContext write_context;
WriteThread::WriteGroup write_group;
bool in_parallel_group = false;
uint64_t last_sequence = kMaxSequenceNumber;
if (!two_write_queues_) {
last_sequence = versions_->LastSequence();
}

mutex_.Lock();

bool need_log_sync = false /* write_options.sync */;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;

if (!two_write_queues_ || !disable_memtable) {
// With concurrent writes we do preprocess only in the write thread that
// also does write to memtable to avoid sync issue on shared data structure
// with the other thread

// PreprocessWrite does its own perf timing.
PERF_TIMER_STOP(write_pre_and_post_process_time);

status = PreprocessWrite(write_options, &need_log_sync, &write_context);

PERF_TIMER_START(write_pre_and_post_process_time);
}
log::Writer* log_writer = logs_.back().writer;

for (auto& log : logs_) {
writers.emplace_back(log.writer);
}

mutex_.Unlock();

// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into memtables

TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters");
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

if (status.ok()) {
// Rules for when we can update the memtable concurrently
// 1. supported by memtable
// 2. Puts are not okay if inplace_update_support
// 3. Merges are not okay
//
// Rules 1..2 are enforced by checking the options
// during startup (CheckConcurrentWritesSupported), so if
// options.allow_concurrent_memtable_write is true then they can be
// assumed to be true. Rule 3 is checked for each batch. We could
// relax rule 2 if we could prevent write batches from referring
// more than once to a particular key.
bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
write_group.size > 1;
size_t total_count = 0;
size_t valid_batches = 0;
size_t total_byte_size = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
valid_batches += writer->batch_cnt;
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
}

total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
}
}
// Note about seq_per_batch_: either disableWAL is set for the entire write
// group or not. In either case we inc seq for each write batch with no
// failed callback. This means that there could be a batch with
// disable_memtable in between; although we do not write this batch to
// memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
// the seq per valid written key to mem.
size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;

const bool concurrent_update = two_write_queues_;
// Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully
// commit. That lets us release our leader status early.
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count,
concurrent_update);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size,
concurrent_update);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_SELF);
auto write_done_by_other = write_group.size - 1;
if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other,
concurrent_update);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
}
MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);

PERF_TIMER_STOP(write_pre_and_post_process_time);

if (status.ok() /* && !write_options.disableWAL */) {
PERF_TIMER_GUARD(write_wal_time);
status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, last_sequence + 1);
for (auto& w : writers) {
defer_sync_funcs.emplace_back(w->get_defer_sync_func());
}
w.write_group->exit_callback = [func_vec{std::move(defer_sync_funcs)}]() {
for (auto& func : func_vec) {
auto s = func();
if (!s.ok()) {
return s;
}
}
return Status::OK();
};
}
assert(last_sequence != kMaxSequenceNumber);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;

if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);

if (!parallel) {
// w.sequence will be set inside InsertInto
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
batch_per_txn_);
} else {
SequenceNumber next_sequence = current_sequence;
// Note: the logic for advancing seq here must be consistent with the
// logic in WriteBatchInternal::InsertInto(write_group...) as well as
// with WriteBatchInternal::InsertInto(write_batch...) that is called on
// the merged batch during recovery from the WAL.
for (auto* writer : write_group) {
if (writer->CallbackFailed()) {
continue;
}
writer->sequence = next_sequence;
if (seq_per_batch_) {
assert(writer->batch_cnt);
next_sequence += writer->batch_cnt;
} else if (writer->ShouldWriteToMemtable()) {
next_sequence += WriteBatchInternal::Count(writer->batch);
}
}
write_group.last_sequence = last_sequence;
write_thread_.LaunchParallelMemTableWriters(&write_group);
in_parallel_group = true;

// Each parallel follower is doing its own writes. The leader should
// also do its own.
if (w.ShouldWriteToMemtable()) {
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence);
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
w.batch_cnt, batch_per_txn_);
}
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
}
}
PERF_TIMER_START(write_pre_and_post_process_time);

if (!w.CallbackFailed()) {
WriteStatusCheck(status);
}

bool should_exit_batch_group = true;
if (in_parallel_group) {
// CompleteParallelMemTableWriter returns true if this thread should
// handle exit, false means somebody else did
should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
}
if (should_exit_batch_group) {
if (status.ok()) {
for (auto* writer : write_group) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(writer->sequence,
disable_memtable);
if (!ws.ok()) {
status = ws;
break;
}
}
}
}
MemTableInsertStatusCheck(w.status);
// TODO(linyuanjin): may violate consistency
versions_->SetLastSequence(last_sequence);
write_thread_.ExitAsBatchGroupLeader(write_group, status,
&manual_wake_followers);
status = write_group.exit_callback();

for (auto* follower : manual_wake_followers) {
follower->status = status;
WriteThread::SetStateCompleted(follower);
}
}

if (status.ok()) {
status = w.FinalStatus();
}
return status;
}

void DBImpl::WriteStatusCheck(const Status& status) {
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
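The core mechanism of WriteWhileSyncWriteImpl above: need_log_sync is forced to false, so WriteToWAL only appends; each live log writer then contributes a deferred sync closure via get_defer_sync_func(), and the composed closure becomes the write group's exit_callback. The group is dissolved first (ExitAsBatchGroupLeader / ExitAsBatchGroupFollower with manual_wake_followers), the sync runs afterwards, and the parked followers are completed manually with the post-sync status, so sync == true callers still return only after the WAL is durable. A minimal, self-contained sketch of that defer-sync-then-wake pattern, using illustrative names rather than the PR's actual WriteThread API:

#include <functional>
#include <iostream>
#include <vector>

// Toy status type standing in for rocksdb::Status.
struct Status {
  bool ok_;
  bool ok() const { return ok_; }
  static Status OK() { return Status{true}; }
};

// Stand-in for a writer parked in manual_wake_followers.
struct Follower {
  Status status = Status::OK();
  bool completed = false;
};

int main() {
  // 1. While writing the WAL, each log writer contributes a deferred sync
  //    closure instead of syncing inline (need_log_sync is forced to false).
  std::vector<std::function<Status()>> defer_sync_funcs;
  defer_sync_funcs.emplace_back([] {
    std::cout << "fsync(log_0)\n";  // the real closure would fsync a WAL file
    return Status::OK();
  });

  // 2. The closures are composed into the group's exit callback; the first
  //    failure short-circuits, mirroring the loop in the diff above.
  auto exit_callback = [funcs = std::move(defer_sync_funcs)]() {
    for (const auto& f : funcs) {
      Status s = f();
      if (!s.ok()) return s;
    }
    return Status::OK();
  };

  // 3. The batch group is dissolved first, the sync runs afterwards, and the
  //    parked followers are completed with the post-sync status (analogous to
  //    WriteThread::SetStateCompleted).
  std::vector<Follower> followers(2);
  Status final_status = exit_callback();
  for (auto& f : followers) {
    f.status = final_status;
    f.completed = true;
  }
  std::cout << "final status ok: " << final_status.ok() << "\n";
  return 0;
}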