Skip to content

Commit

Permalink
Introduced api for the asynchonous WAL
Browse files Browse the repository at this point in the history
  • Loading branch information
toktarev committed Feb 4, 2020
1 parent ec76e93 commit 2403f24
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
9 changes: 8 additions & 1 deletion db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;

// Function that Get and KeyMayExist call with no_io true or false
using DB::WriteWal;
virtual Status WriteWal(WriteBatch* updates,uint64_t last_sequence) override;

using DB::UnOrderedWrite;
virtual Status UnOrderedWrite(const WriteOptions& options, WriteBatch* updates,
uint64_t* last_sequence) override;

// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
Expand Down
24 changes: 24 additions & 0 deletions db/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,30 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
return WriteImpl(write_options, my_batch, nullptr, nullptr);
}

Status DBImpl::WriteWal(WriteBatch* updates, uint64_t last_sequence) {
const size_t sub_batch_cnt = WriteBatchInternal::Count(updates);
uint64_t seq = last_sequence;
WriteOptions aWriteOptions{};

// Use a write thread to i) optimize for WAL write, ii) publish last
// sequence in in increasing order, iii) call pre_release_callback serially
return WriteImplWALOnly(&write_thread_, aWriteOptions, updates, nullptr,
nullptr, 0, &seq, sub_batch_cnt,
nullptr, kDoAssignOrder,
kDoPublishLastSeq, true);
}

Status DBImpl::UnOrderedWrite(const WriteOptions& options, WriteBatch* updates,
uint64_t* last_sequence) {
const size_t sub_batch_cnt = WriteBatchInternal::Count(updates);

*(last_sequence) = versions_->FetchAddLastAllocatedSequence(sub_batch_cnt) + 1;
return UnorderedWriteMemtable(options, updates, nullptr,
0,
*(last_sequence),
sub_batch_cnt);
}

#ifndef ROCKSDB_LITE
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch,
Expand Down
9 changes: 9 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,15 @@ class DB {
// Note: consider setting options.sync = true.
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

virtual Status WriteWal(WriteBatch* updates,uint64_t last_sequence) {
return Status::NotSupported();
}

virtual Status UnOrderedWrite(const WriteOptions& options, WriteBatch* updates,
uint64_t* last_sequence) {
return Status::NotSupported();
}

// If the database contains an entry for "key" store the
// corresponding value in *value and return OK.
//
Expand Down

0 comments on commit 2403f24

Please sign in to comment.