Skip to content

Commit

Permalink
Add a new MultiGetEntity API (facebook#11222)
Browse files Browse the repository at this point in the history
Summary:
The new `MultiGetEntity` API can be used to get a consistent view of
a batch of keys, with the results presented as wide-column entities.
Similarly to `GetEntity` and the iterator's `columns` API, if the entry
corresponding to the key is a wide-column entity to start with, it is
returned as-is, and if it is a plain key-value, it is wrapped into an entity
with a single default column.

Implementation-wise, the new API shares the logic of the batched `MultiGet`
API (via the `MultiGetCommon` methods). Both single-CF and multi-CF
`MultiGetEntity` APIs are provided, and blobs are also supported.

Pull Request resolved: facebook#11222

Test Plan: `make check`

Reviewed By: akankshamahajan15

Differential Revision: D43256950

Pulled By: ltamasi

fbshipit-source-id: 47fb2cb7e2d0470e3580f43fdb2fe9e51f0e7005
  • Loading branch information
ltamasi authored and facebook-github-bot committed Feb 15, 2023
1 parent 6d5e860 commit 9794acb
Show file tree
Hide file tree
Showing 13 changed files with 438 additions and 87 deletions.
43 changes: 38 additions & 5 deletions db/blob/db_blob_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1782,16 +1782,49 @@ TEST_F(DBBlobBasicTest, GetEntityBlob) {
constexpr char key[] = "key";
constexpr char blob_value[] = "blob_value";

constexpr char other_key[] = "other_key";
constexpr char other_blob_value[] = "other_blob_value";

ASSERT_OK(Put(key, blob_value));
ASSERT_OK(Put(other_key, other_blob_value));

ASSERT_OK(Flush());

PinnableWideColumns result;
ASSERT_OK(
db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), key, &result));

WideColumns expected_columns{{kDefaultWideColumnName, blob_value}};
ASSERT_EQ(result.columns(), expected_columns);
WideColumns other_expected_columns{
{kDefaultWideColumnName, other_blob_value}};

{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), key,
&result));
ASSERT_EQ(result.columns(), expected_columns);
}

{
PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(),
other_key, &result));

ASSERT_EQ(result.columns(), other_expected_columns);
}

{
constexpr size_t num_keys = 2;

std::array<Slice, num_keys> keys{{key, other_key}};
std::array<PinnableWideColumns, num_keys> results;
std::array<Status, num_keys> statuses;

db_->MultiGetEntity(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
&keys[0], &results[0], &statuses[0]);

ASSERT_OK(statuses[0]);
ASSERT_EQ(results[0].columns(), expected_columns);

ASSERT_OK(statuses[1]);
ASSERT_EQ(results[1].columns(), other_expected_columns);
}
}

class DBBlobWithTimestampTest : public DBBasicTestWithTimestampBase {
Expand Down
88 changes: 79 additions & 9 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2605,14 +2605,25 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input) {
return MultiGet(read_options, num_keys, column_families, keys, values,
/*timestamps=*/nullptr, statuses, sorted_input);
MultiGet(read_options, num_keys, column_families, keys, values,
/* timestamps */ nullptr, statuses, sorted_input);
}

void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool sorted_input) {
MultiGetCommon(read_options, num_keys, column_families, keys, values,
/* columns */ nullptr, timestamps, statuses, sorted_input);
}

void DBImpl::MultiGetCommon(const ReadOptions& read_options,
const size_t num_keys,
ColumnFamilyHandle** column_families,
const Slice* keys, PinnableSlice* values,
PinnableWideColumns* columns,
std::string* timestamps, Status* statuses,
const bool sorted_input) {
if (num_keys == 0) {
return;
}
Expand Down Expand Up @@ -2658,8 +2669,20 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
values[i].Reset();
key_context.emplace_back(column_families[i], keys[i], &values[i],
PinnableSlice* val = nullptr;
PinnableWideColumns* col = nullptr;

if (values) {
val = &values[i];
val->Reset();
} else {
assert(columns);

col = &columns[i];
col->Reset();
}

key_context.emplace_back(column_families[i], keys[i], val, col,
timestamps ? &timestamps[i] : nullptr,
&statuses[i]);
}
Expand Down Expand Up @@ -2783,15 +2806,25 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values,
Status* statuses, const bool sorted_input) {
return MultiGet(read_options, column_family, num_keys, keys, values,
/*timestamp=*/nullptr, statuses, sorted_input);
MultiGet(read_options, column_family, num_keys, keys, values,
/* timestamps */ nullptr, statuses, sorted_input);
}

void DBImpl::MultiGet(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values,
std::string* timestamps, Status* statuses,
const bool sorted_input) {
MultiGetCommon(read_options, column_family, num_keys, keys, values,
/* columns */ nullptr, timestamps, statuses, sorted_input);
}

void DBImpl::MultiGetCommon(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, PinnableWideColumns* columns,
std::string* timestamps, Status* statuses,
bool sorted_input) {
if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
// tracing is enabled.
Expand All @@ -2805,8 +2838,20 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
values[i].Reset();
key_context.emplace_back(column_family, keys[i], &values[i],
PinnableSlice* val = nullptr;
PinnableWideColumns* col = nullptr;

if (values) {
val = &values[i];
val->Reset();
} else {
assert(columns);

col = &columns[i];
col->Reset();
}

key_context.emplace_back(column_family, keys[i], val, col,
timestamps ? &timestamps[i] : nullptr,
&statuses[i]);
}
Expand Down Expand Up @@ -2968,8 +3013,17 @@ Status DBImpl::MultiGetImpl(
uint64_t bytes_read = 0;
for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
KeyContext* key = (*sorted_keys)[i];
assert(key);
assert(key->s);

if (key->s->ok()) {
bytes_read += key->value->size();
if (key->value) {
bytes_read += key->value->size();
} else {
assert(key->columns);
bytes_read += key->columns->serialized_size();
}

num_found++;
}
}
Expand All @@ -2993,6 +3047,22 @@ Status DBImpl::MultiGetImpl(
return s;
}

void DBImpl::MultiGetEntity(const ReadOptions& options, size_t num_keys,
ColumnFamilyHandle** column_families,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) {
MultiGetCommon(options, num_keys, column_families, keys, /* values */ nullptr,
results, /* timestamps */ nullptr, statuses, sorted_input);
}

void DBImpl::MultiGetEntity(const ReadOptions& options,
ColumnFamilyHandle* column_family, size_t num_keys,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) {
MultiGetCommon(options, column_family, num_keys, keys, /* values */ nullptr,
results, /* timestamps */ nullptr, statuses, sorted_input);
}

Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
ColumnFamilyHandle** handle) {
Expand Down
65 changes: 42 additions & 23 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,33 +277,40 @@ class DBImpl : public DB {
// The values and statuses parameters are arrays with number of elements
// equal to keys.size(). This allows the storage for those to be alloacted
// by the caller on the stack for small batches
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
virtual void MultiGet(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses,
const bool sorted_input = false) override;

virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
virtual void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses,
const bool sorted_input = false) override;

virtual void MultiGetWithCallback(
void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
Status* statuses, const bool sorted_input = false) override;
void MultiGet(const ReadOptions& options, ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys, PinnableSlice* values,
std::string* timestamps, Status* statuses,
const bool sorted_input = false) override;

void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input = false) override;
void MultiGet(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, std::string* timestamps,
Status* statuses, const bool sorted_input = false) override;

void MultiGetWithCallback(
const ReadOptions& options, ColumnFamilyHandle* column_family,
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);

using DB::MultiGetEntity;

void MultiGetEntity(const ReadOptions& options,
ColumnFamilyHandle* column_family, size_t num_keys,
const Slice* keys, PinnableWideColumns* results,
Status* statuses, bool sorted_input) override;

void MultiGetEntity(const ReadOptions& options, size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableWideColumns* results, Status* statuses,
bool sorted_input) override;

virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
ColumnFamilyHandle** handle) override;
Expand Down Expand Up @@ -2191,6 +2198,18 @@ class DBImpl : public DB {
const size_t num_keys, bool sorted,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* key_ptrs);

void MultiGetCommon(const ReadOptions& options,
ColumnFamilyHandle* column_family, const size_t num_keys,
const Slice* keys, PinnableSlice* values,
PinnableWideColumns* columns, std::string* timestamps,
Status* statuses, bool sorted_input);

void MultiGetCommon(const ReadOptions& options, const size_t num_keys,
ColumnFamilyHandle** column_families, const Slice* keys,
PinnableSlice* values, PinnableWideColumns* columns,
std::string* timestamps, Status* statuses,
bool sorted_input);

// A structure to hold the information required to process MultiGet of keys
// belonging to one column family. For a multi column family MultiGet, there
// will be a container of these objects.
Expand Down
18 changes: 12 additions & 6 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1452,18 +1452,24 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
}
SequenceNumber dummy_seq;
GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
callback, &iter->is_blob_index, iter->value->GetSelf(),
/*columns=*/nullptr, iter->timestamp, iter->s,
&(iter->merge_context), &dummy_seq, &found_final_value,
&merge_in_progress);
callback, &iter->is_blob_index,
iter->value ? iter->value->GetSelf() : nullptr, iter->columns,
iter->timestamp, iter->s, &(iter->merge_context), &dummy_seq,
&found_final_value, &merge_in_progress);

if (!found_final_value && merge_in_progress) {
*(iter->s) = Status::MergeInProgress();
}

if (found_final_value) {
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
if (iter->value) {
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
} else {
assert(iter->columns);
range->AddValueSize(iter->columns->serialized_size());
}

range->MarkKeyDone(iter);
RecordTick(moptions_.statistics, MEMTABLE_HIT);
if (range->GetValueSize() > read_options.value_size_soft_limit) {
Expand Down
Loading

0 comments on commit 9794acb

Please sign in to comment.