Skip to content

Commit

Permalink
Avoid rewriting unmodified B+tree nodes in distributed write path
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 517245970
Change-Id: I22d35be05783bb499281b47ad2920d656f63b1da
  • Loading branch information
jbms authored and copybara-github committed Mar 16, 2023
1 parent 558d9e8 commit 9648c69
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 51 deletions.
4 changes: 4 additions & 0 deletions tensorstore/kvstore/ocdbt/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,14 @@ tensorstore_cc_library(
":io_handle",
":ocdbt",
"//tensorstore/internal:intrusive_ptr",
"//tensorstore/kvstore",
"//tensorstore/kvstore:test_util",
"//tensorstore/kvstore/ocdbt/format",
"//tensorstore/util:future",
"//tensorstore/util:result",
"//tensorstore/util:status_testutil",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest",
],
)

Expand Down
36 changes: 27 additions & 9 deletions tensorstore/kvstore/ocdbt/distributed/btree_node_write_mutation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,28 @@ struct BtreeLeafNodeWriteMutationCodec {
[[nodiscard]] bool operator()(IO& io, Value&& value) {
static_assert(std::is_same_v<IO, riegeli::Writer> ||
std::is_same_v<IO, riegeli::Reader>);
uint8_t has_new_entry;
size_t mode;
if constexpr (std::is_same_v<IO, riegeli::Writer>) {
has_new_entry = value.new_entry.has_value();
mode = static_cast<size_t>(value.mode);
}
if (!KeyCodec{}(io, value.key) ||
!GenerationCodec{}(io, value.existing_generation.value) ||
!LittleEndianCodec<uint8_t>{}(io, has_new_entry)) {
!SizeCodec{}(io, mode)) {
return false;
}
if (!has_new_entry) return true;
if constexpr (std::is_same_v<IO, riegeli::Reader>) {
value.new_entry.emplace();
if (mode > BtreeNodeWriteMutation::kAddNew) {
  // Reject an out-of-range mode immediately.  Without the early return the
  // invalid value would be cast into `Mode` and decoding would continue on
  // an already-failed reader; the interior-node codec likewise fails and
  // returns as soon as it detects invalid input.
  io.Fail(absl::InvalidArgumentError(
      absl::StrFormat("Invalid mutation mode: %d", mode)));
  return false;
}
value.mode = static_cast<BtreeNodeWriteMutation::Mode>(mode);
}
if (mode <= BtreeNodeWriteMutation::kDeleteExisting) return true;
// Reuse the `LeafNodeValueReferenceArrayCodec` to write a single
// `LeafNodeValueReference`.
return LeafNodeValueReferenceArrayCodec{[](auto& e) -> decltype(auto) {
return (e.value_reference);
}}(io, span(&*value.new_entry, 1));
}}(io, span(&value.new_entry, 1));
}
};

Expand All @@ -118,13 +122,17 @@ struct BtreeInteriorNodeWriteMutationCodec {
[[nodiscard]] bool operator()(IO& io, Value&& value) {
static_assert(std::is_same_v<IO, riegeli::Writer> ||
std::is_same_v<IO, riegeli::Reader>);
size_t mode;
size_t num_entries;
if constexpr (std::is_same_v<IO, riegeli::Writer>) {
num_entries = value.new_entries.size();
// Encode the mutation mode and number of new entries together.
mode =
static_cast<size_t>(value.mode) + (num_entries ? num_entries - 1 : 0);
}
if (!KeyRangeCodec{}(io, value.existing_range) ||
!GenerationCodec{}(io, value.existing_generation.value) ||
!SizeCodec{}(io, num_entries)) {
!SizeCodec{}(io, mode)) {
return false;
}

Expand All @@ -133,6 +141,14 @@ struct BtreeInteriorNodeWriteMutationCodec {
io.Fail(absl::InvalidArgumentError("empty key range"));
return false;
}
if (mode < BtreeNodeWriteMutation::kAddNew) {
value.mode = static_cast<BtreeNodeWriteMutation::Mode>(mode);
num_entries = 0;
} else {
value.mode = BtreeNodeWriteMutation::kAddNew;
num_entries =
mode - static_cast<size_t>(BtreeNodeWriteMutation::kDeleteExisting);
}
}

constexpr size_t kMaxInitialSize = 1000;
Expand Down Expand Up @@ -198,8 +214,9 @@ absl::Status BtreeInteriorNodeWriteMutation::EncodeTo(

bool AddNewEntries(BtreeNodeEncoder<LeafNodeEntry>& encoder,
const BtreeLeafNodeWriteMutation& mutation) {
if (!mutation.new_entry) return false;
auto& new_entry = *mutation.new_entry;
assert(mutation.mode != BtreeNodeWriteMutation::kRetainExisting);
if (mutation.mode != BtreeNodeWriteMutation::kAddNew) return false;
auto& new_entry = mutation.new_entry;
LeafNodeEntry entry;
entry.key = mutation.key;
entry.value_reference = new_entry.value_reference;
Expand All @@ -209,6 +226,7 @@ bool AddNewEntries(BtreeNodeEncoder<LeafNodeEntry>& encoder,

bool AddNewEntries(BtreeNodeEncoder<InteriorNodeEntry>& encoder,
const BtreeInteriorNodeWriteMutation& mutation) {
assert(mutation.mode != BtreeNodeWriteMutation::kRetainExisting);
for (const auto& new_entry : mutation.new_entries) {
AddNewInteriorEntry(encoder, new_entry);
}
Expand Down
23 changes: 20 additions & 3 deletions tensorstore/kvstore/ocdbt/distributed/btree_node_write_mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ struct BtreeNodeWriteMutation
virtual ~BtreeNodeWriteMutation() = default;
using Ptr = internal::IntrusivePtr<const BtreeNodeWriteMutation>;
virtual absl::Status EncodeTo(riegeli::Writer&& writer) const = 0;

// Specifies what a mutation does to the existing entry it targets.
//
// NOTE: The numeric values are serialized: the leaf-node codec writes the
// value directly, and the interior-node codec encodes `kAddNew` together
// with the number of new entries as `mode + (num_entries - 1)`, decoding any
// value `>= kAddNew` as `kAddNew`.  `kAddNew` must therefore remain the
// largest enumerator, and existing values must not be renumbered.
enum Mode : uint8_t {
// Retain existing value. This mutation merely checks that the
// `existing_generation` matches.
kRetainExisting = 0,

// Delete existing value.
kDeleteExisting = 1,

// Replace existing value with new entry/entries.
kAddNew = 2,
};
// Mode of this mutation.  Has no default initializer; the code that
// constructs or decodes a mutation is expected to assign it explicitly.
Mode mode;
};

struct BtreeLeafNodeWriteMutation : public BtreeNodeWriteMutation {
Expand All @@ -45,11 +58,13 @@ struct BtreeLeafNodeWriteMutation : public BtreeNodeWriteMutation {
struct NewEntry {
LeafNodeValueReference value_reference;
};
std::optional<NewEntry> new_entry;

// `new_entry` is meaningful only if
// `BtreeNodeWriteMutation::mode == kAddNew`.
NewEntry new_entry;

std::string_view inclusive_min() const { return key; }
std::string_view key_or_range() const { return key; }
bool has_new_entries() const { return new_entry.has_value(); }

absl::Status DecodeFrom(riegeli::Reader& reader);
absl::Status EncodeTo(riegeli::Writer&& writer) const override;
Expand All @@ -58,13 +73,15 @@ struct BtreeLeafNodeWriteMutation : public BtreeNodeWriteMutation {
struct BtreeInteriorNodeWriteMutation : public BtreeNodeWriteMutation {
KeyRange existing_range;
StorageGeneration existing_generation;

// Invariant: `new_entries` is non-empty if and only if
// `BtreeNodeWriteMutation::mode == kAddNew`.
std::vector<InteriorNodeEntryData<std::string>> new_entries;

std::string_view inclusive_min() const {
return existing_range.inclusive_min;
}
const KeyRange& key_or_range() const { return existing_range; }
bool has_new_entries() const { return !new_entries.empty(); }

absl::Status DecodeFrom(riegeli::Reader& reader);
absl::Status EncodeTo(riegeli::Writer&& writer) const override;
Expand Down
24 changes: 16 additions & 8 deletions tensorstore/kvstore/ocdbt/distributed/btree_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ void WriterCommitOperation::StagePending() {
const auto max_inline_value_bytes = config.max_inline_value_bytes;
for (auto& write_request : staged_.write_requests) {
auto& mutation = *write_request.mutation;
if (mutation.new_entry) {
auto& value_ref = mutation.new_entry->value_reference;
if (mutation.mode == BtreeNodeWriteMutation::kAddNew) {
auto& value_ref = mutation.new_entry.value_reference;
if (auto* value = std::get_if<absl::Cord>(&value_ref); value) {
if (value->size() > max_inline_value_bytes) {
auto v = std::move(*value);
Expand Down Expand Up @@ -664,11 +664,17 @@ void WriterCommitOperation::SubmitRequests(
stamp.time = r->time;
if (conditions_matched[i]) {
auto& mutation = *request.mutation;
if (mutation.new_entry) {
stamp.generation = internal_ocdbt::ComputeStorageGeneration(
mutation.new_entry->value_reference);
} else {
stamp.generation = StorageGeneration::NoValue();
switch (mutation.mode) {
case BtreeNodeWriteMutation::kRetainExisting:
stamp.generation = mutation.existing_generation;
break;
case BtreeNodeWriteMutation::kDeleteExisting:
stamp.generation = StorageGeneration::NoValue();
break;
case BtreeNodeWriteMutation::kAddNew:
stamp.generation = internal_ocdbt::ComputeStorageGeneration(
mutation.new_entry.value_reference);
break;
}
}
request.promise.SetResult(std::move(stamp));
Expand Down Expand Up @@ -709,8 +715,10 @@ Future<TimestampedStorageGeneration> DistributedBtreeWriter::Write(
request.promise = std::move(promise);

bool needs_inline_value_pass = false;
request.mutation->mode = value ? BtreeNodeWriteMutation::kAddNew
: BtreeNodeWriteMutation::kDeleteExisting;
if (value) {
auto& new_entry = request.mutation->new_entry.emplace();
auto& new_entry = request.mutation->new_entry;
auto& value_ref = new_entry.value_reference;
if (auto* config = writer.io_handle_->config_state->GetExistingConfig();
!config || value->size() <= config->max_inline_value_bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,20 +190,29 @@ struct NodeCommitOperation
// node.
//
// Sends a mutation request to the parent.
//
// If `new_entries` is `std::nullopt`, merely ensures that the existing entry
// for the node corresponding to `commit_op` is unchanged.
static void UpdateParent(
NodeCommitOperation::Ptr commit_op,
std::vector<InteriorNodeEntryData<std::string>> new_entries);
std::optional<std::vector<InteriorNodeEntryData<std::string>>>
new_entries);

// Called if the existing node corresponding to `commit_op` is the root node.
//
// Calls `WriteNewManifest` to update the manifest directly.
//
// If `new_entries` is `std::nullopt`, merely ensures that the existing
// manifest is unchanged.
static void UpdateRoot(
NodeCommitOperation::Ptr commit_op,
std::vector<InteriorNodeEntryData<std::string>> new_entries);
std::optional<std::vector<InteriorNodeEntryData<std::string>>>
new_entries);

// Called asynchronously by `UpdateRoot` to update the manifest.
static void WriteNewManifest(NodeCommitOperation::Ptr commit_op,
const BtreeGenerationReference& new_generation);
static void WriteNewManifest(
NodeCommitOperation::Ptr commit_op,
std::optional<BtreeGenerationReference> new_generation);

// Restarts the commit, in the case that the manifest was concurrently
// modified.
Expand Down Expand Up @@ -574,17 +583,18 @@ void NodeCommitOperation::ApplyMutationsForEntry(
node_encoder.AddEntry(/*existing=*/true, Entry(*existing_it));
}

// FIXME: handle modified == false
static_cast<void>(modified);

std::optional<std::vector<InteriorNodeEntryData<std::string>>> new_entries;
bool may_be_root =
commit_op->mutation_requests->lease_node->node_identifier.range.full();
TENSORSTORE_ASSIGN_OR_RETURN(auto encoded_nodes,
node_encoder.Finalize(may_be_root),
commit_op->SetError(_));
auto new_entries = internal_ocdbt::WriteNodes(*commit_op->server->io_handle_,
commit_op->flush_promise,
std::move(encoded_nodes));

if (modified) {
TENSORSTORE_ASSIGN_OR_RETURN(auto encoded_nodes,
node_encoder.Finalize(may_be_root),
commit_op->SetError(_));
new_entries = internal_ocdbt::WriteNodes(*commit_op->server->io_handle_,
commit_op->flush_promise,
std::move(encoded_nodes));
}

if (may_be_root) {
UpdateRoot(std::move(commit_op), std::move(new_entries));
Expand Down Expand Up @@ -616,12 +626,17 @@ NodeCommitOperation::ResolveMutationsForKey(
mutation.existing_generation)) {
// `if_equal` condition was satisfied, write will be marked as having
// completed successfully.
if (mutation.has_new_entries()) {
effective_request = &*mutation_it;
existing_generation = StorageGeneration::Unknown();
} else {
effective_request = nullptr;
existing_generation = StorageGeneration::NoValue();
switch (mutation.mode) {
case BtreeNodeWriteMutation::kAddNew:
effective_request = &*mutation_it;
existing_generation = StorageGeneration::Unknown();
break;
case BtreeNodeWriteMutation::kDeleteExisting:
effective_request = nullptr;
existing_generation = StorageGeneration::NoValue();
break;
case BtreeNodeWriteMutation::kRetainExisting:
break;
}
bit_ref = true;
} else {
Expand All @@ -639,11 +654,19 @@ NodeCommitOperation::ResolveMutationsForKey(

void NodeCommitOperation::UpdateParent(
NodeCommitOperation::Ptr commit_op,
std::vector<InteriorNodeEntryData<std::string>> new_entries) {
std::optional<std::vector<InteriorNodeEntryData<std::string>>>
new_entries) {
auto mutation = internal::MakeIntrusivePtr<BtreeInteriorNodeWriteMutation>();
mutation->existing_range = commit_op->key_range;
mutation->existing_generation = commit_op->node_generation;
mutation->new_entries = std::move(new_entries);
if (new_entries.has_value()) {
mutation->mode = new_entries->empty()
? BtreeNodeWriteMutation::kDeleteExisting
: BtreeNodeWriteMutation::kAddNew;
mutation->new_entries = std::move(*new_entries);
} else {
mutation->mode = BtreeNodeWriteMutation::kRetainExisting;
}
MutationBatchRequest batch_request;
batch_request.root_generation =
commit_op->existing_manifest->latest_generation();
Expand Down Expand Up @@ -689,19 +712,23 @@ void NodeCommitOperation::UpdateParent(

void NodeCommitOperation::UpdateRoot(
NodeCommitOperation::Ptr commit_op,
std::vector<InteriorNodeEntryData<std::string>> new_entries) {
TENSORSTORE_ASSIGN_OR_RETURN(
auto new_generation,
internal_ocdbt::WriteRootNode(*commit_op->server->io_handle_,
commit_op->flush_promise, commit_op->height,
std::move(new_entries)),
commit_op->SetError(_));
WriteNewManifest(std::move(commit_op), new_generation);
std::optional<std::vector<InteriorNodeEntryData<std::string>>>
new_entries) {
std::optional<BtreeGenerationReference> new_generation;
if (new_entries) {
TENSORSTORE_ASSIGN_OR_RETURN(
new_generation,
internal_ocdbt::WriteRootNode(
*commit_op->server->io_handle_, commit_op->flush_promise,
commit_op->height, std::move(*new_entries)),
commit_op->SetError(_));
}
WriteNewManifest(std::move(commit_op), std::move(new_generation));
}

void NodeCommitOperation::WriteNewManifest(
NodeCommitOperation::Ptr commit_op,
const BtreeGenerationReference& new_generation) {
std::optional<BtreeGenerationReference> new_generation) {
ABSL_LOG_IF(INFO, TENSORSTORE_INTERNAL_OCDBT_DEBUG)
<< "[Port=" << commit_op->server->listening_port_
<< "] WriteNewManifest: Initiate";
Expand All @@ -722,11 +749,15 @@ void NodeCommitOperation::WriteNewManifest(
<< ", current=" << existing_manifest->latest_generation();
return absl::AbortedError("");
}
if (!new_generation) {
// Leave manifest unmodified.
return existing_manifest;
}
auto [promise, future] =
PromiseFuturePair<std::shared_ptr<const Manifest>>::Make();
auto new_manifest_future = internal_ocdbt::CreateNewManifest(
commit_op->server->io_handle_, commit_op->existing_manifest,
new_generation);
*new_generation);
LinkValue(
[commit_op](Promise<std::shared_ptr<const Manifest>> promise,
ReadyFuture<std::pair<std::shared_ptr<Manifest>,
Expand Down
6 changes: 6 additions & 0 deletions tensorstore/kvstore/ocdbt/distributed/driver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,10 @@ TEST_F(DistributedTest, TwoCooperatorsManifestDeleted) {
MatchesStatus(absl::StatusCode::kFailedPrecondition));
}

// Tests that if a batch of writes leaves a node unmodified, it is not
// rewritten.
//
// Delegates to the shared `TestUnmodifiedNode` helper, passing a `Context`
// built from `context_spec` (presumably the `DistributedTest` fixture's
// distributed configuration -- confirm against the fixture definition).
TEST_F(DistributedTest, UnmodifiedNode) {
tensorstore::internal_ocdbt::TestUnmodifiedNode(Context(context_spec));
}

} // namespace
6 changes: 6 additions & 0 deletions tensorstore/kvstore/ocdbt/driver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,10 @@ TENSORSTORE_GLOBAL_INITIALIZER {
}
}

// Tests that if a batch of writes leaves a node unmodified, it is not
// rewritten.
//
// Delegates to the shared `TestUnmodifiedNode` helper with no arguments, so
// the helper runs with whatever defaults it declares.
TEST(OcdbtTest, UnmodifiedNode) {
tensorstore::internal_ocdbt::TestUnmodifiedNode();
}

} // namespace
Loading

0 comments on commit 9648c69

Please sign in to comment.