Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewvon committed Oct 1, 2024
2 parents 44e3db0 + 389e66b commit 5b0834a
Show file tree
Hide file tree
Showing 89 changed files with 2,070 additions and 897 deletions.
20 changes: 20 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
# Rocksdb Change Log
> NOTE: Entries for next release do not go here. Follow instructions in `unreleased_history/README.txt`
## 9.7.0 (09/20/2024)
### New Features
* Make Cache a customizable class that can be instantiated by the object registry.
* Add new option `prefix_seek_opt_in_only` that makes iterators generally safer when you might set a `prefix_extractor`. When `prefix_seek_opt_in_only=true`, which is expected to be the future default, prefix seek is only used when `prefix_same_as_start` or `auto_prefix_mode` are set. Also, `prefix_same_as_start` and `auto_prefix_mode` now allow prefix filtering even with `total_order_seek=true`.
* Add a new table property "rocksdb.key.largest.seqno" which records the largest sequence number of all keys in file. It is verified to be zero during SST file ingestion.

### Behavior Changes
* Changed the semantics of the BlobDB configuration option `blob_garbage_collection_force_threshold` to define a threshold for the overall garbage ratio of all blob files currently eligible for garbage collection (according to `blob_garbage_collection_age_cutoff`). This can provide better control over space amplification at the cost of slightly higher write amplification.
* Set `write_dbid_to_manifest=true` by default. This means DB ID will now be preserved through backups, checkpoints, etc. by default. Also add `write_identity_file` option which can be set to false for anticipated future behavior.
* In FIFO compaction, compactions for changing file temperature (configured by option `file_temperature_age_thresholds`) will compact one file at a time, instead of merging multiple eligible file together (#13018).
* Support ingesting db generated files using hard link, i.e. IngestExternalFileOptions::move_files/link_files and IngestExternalFileOptions::allow_db_generated_files.
* Add a new file ingestion option `IngestExternalFileOptions::link_files` to hard link input files and preserve original files links after ingestion.
* DB::Close now untracks files in SstFileManager, making avaialble any space used
by them. Prior to this change they would be orphaned until the DB is re-opened.

### Bug Fixes
* Fix a bug in CompactRange() where result files may not be compacted in any future compaction. This can only happen when users configure CompactRangeOptions::change_level to true and the change level step of manual compaction fails (#13009).
* Fix handling of dynamic change of `prefix_extractor` with memtable prefix filter. Previously, prefix seek could mix different prefix interpretations between memtable and SST files. Now the latest `prefix_extractor` at the time of iterator creation or refresh is respected.
* Fix a bug with manual_wal_flush and auto error recovery from WAL failure that may cause CFs to be inconsistent (#12995). The fix will set potential WAL write failure as fatal error when manual_wal_flush is true, and disables auto error recovery from these errors.

## 9.6.0 (08/19/2024)
### New Features
* *Best efforts recovery supports recovering to incomplete Version with a clean seqno cut that presents a valid point in time view from the user's perspective, if versioning history doesn't include atomic flush.
Expand Down
26 changes: 15 additions & 11 deletions cache/cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,23 @@ Status Cache::CreateFromString(const ConfigOptions& config_options,
std::shared_ptr<Cache>* result) {
Status status;
std::shared_ptr<Cache> cache;
if (value.find('=') == std::string::npos) {
cache = NewLRUCache(ParseSizeT(value));
} else {
LRUCacheOptions cache_opts;
status = OptionTypeInfo::ParseStruct(config_options, "",
&lru_cache_options_type_info, "",
value, &cache_opts);
if (value.find("://") == std::string::npos) {
if (value.find('=') == std::string::npos) {
cache = NewLRUCache(ParseSizeT(value));
} else {
LRUCacheOptions cache_opts;
status = OptionTypeInfo::ParseStruct(config_options, "",
&lru_cache_options_type_info, "",
value, &cache_opts);
if (status.ok()) {
cache = NewLRUCache(cache_opts);
}
}
if (status.ok()) {
cache = NewLRUCache(cache_opts);
result->swap(cache);
}
}
if (status.ok()) {
result->swap(cache);
} else {
status = LoadSharedObject<Cache>(config_options, value, result);
}
return status;
}
Expand Down
46 changes: 38 additions & 8 deletions cache/compressed_secondary_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1,
static_cast<uint32_t*>(&source_32));
source = static_cast<CacheTier>(source_32);
handle_value_charge -= (data_ptr - ptr->get());
uint64_t data_size = 0;
data_ptr = GetVarint64Ptr(data_ptr, ptr->get() + handle_value_charge,
static_cast<uint64_t*>(&data_size));
assert(handle_value_charge > data_size);
handle_value_charge = data_size;
}
MemoryAllocator* allocator = cache_options_.memory_allocator.get();

Expand Down Expand Up @@ -169,13 +173,15 @@ Status CompressedSecondaryCache::InsertInternal(
}

auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
char header[10];
char header[20];
char* payload = header;
payload = EncodeVarint32(payload, static_cast<uint32_t>(type));
payload = EncodeVarint32(payload, static_cast<uint32_t>(source));
size_t data_size = (*helper->size_cb)(value);
char* data_size_ptr = payload;
payload = EncodeVarint64(payload, data_size);

size_t header_size = payload - header;
size_t data_size = (*helper->size_cb)(value);
size_t total_size = data_size + header_size;
CacheAllocationPtr ptr =
AllocateBlock(total_size, cache_options_.memory_allocator.get());
Expand Down Expand Up @@ -210,6 +216,8 @@ Status CompressedSecondaryCache::InsertInternal(

val = Slice(compressed_val);
data_size = compressed_val.size();
payload = EncodeVarint64(data_size_ptr, data_size);
header_size = payload - header;
total_size = header_size + data_size;
PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, data_size);

Expand All @@ -222,14 +230,21 @@ Status CompressedSecondaryCache::InsertInternal(

PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1);
if (cache_options_.enable_custom_split_merge) {
size_t charge{0};
CacheValueChunk* value_chunks_head =
SplitValueIntoChunks(val, cache_options_.compression_type, charge);
return cache_->Insert(key, value_chunks_head, internal_helper, charge);
size_t split_charge{0};
CacheValueChunk* value_chunks_head = SplitValueIntoChunks(
val, cache_options_.compression_type, split_charge);
return cache_->Insert(key, value_chunks_head, internal_helper,
split_charge);
} else {
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
size_t charge = malloc_usable_size(ptr.get());
#else
size_t charge = total_size;
#endif
std::memcpy(ptr.get(), header, header_size);
CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
return cache_->Insert(key, buf, internal_helper, total_size);
charge += sizeof(CacheAllocationPtr);
return cache_->Insert(key, buf, internal_helper, charge);
}
}

Expand Down Expand Up @@ -398,6 +413,21 @@ const Cache::CacheItemHelper* CompressedSecondaryCache::GetHelper(
}
}

size_t CompressedSecondaryCache::TEST_GetCharge(const Slice& key) {
Cache::Handle* lru_handle = cache_->Lookup(key);
if (lru_handle == nullptr) {
return 0;
}

size_t charge = cache_->GetCharge(lru_handle);
if (cache_->Value(lru_handle) != nullptr &&
!cache_options_.enable_custom_split_merge) {
charge -= 10;
}
cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
return charge;
}

std::shared_ptr<SecondaryCache>
CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const {
return std::make_shared<CompressedSecondaryCache>(*this);
Expand Down
2 changes: 2 additions & 0 deletions cache/compressed_secondary_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class CompressedSecondaryCache : public SecondaryCache {
const Cache::CacheItemHelper* helper,
CompressionType type, CacheTier source);

size_t TEST_GetCharge(const Slice& key);

// TODO: clean up to use cleaner interfaces in typed_cache.h
const Cache::CacheItemHelper* GetHelper(bool enable_custom_split_merge) const;
std::shared_ptr<Cache> cache_;
Expand Down
4 changes: 4 additions & 0 deletions cache/compressed_secondary_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class CompressedSecondaryCacheTestBase : public testing::Test,
protected:
void BasicTestHelper(std::shared_ptr<SecondaryCache> sec_cache,
bool sec_cache_is_compressed) {
CompressedSecondaryCache* comp_sec_cache =
static_cast<CompressedSecondaryCache*>(sec_cache.get());
get_perf_context()->Reset();
bool kept_in_sec_cache{true};
// Lookup an non-existent key.
Expand Down Expand Up @@ -66,6 +68,8 @@ class CompressedSecondaryCacheTestBase : public testing::Test,
ASSERT_OK(sec_cache->Insert(key1, &item1, GetHelper(), false));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1);

ASSERT_GT(comp_sec_cache->TEST_GetCharge(key1), 1000);

std::unique_ptr<SecondaryCacheResultHandle> handle1_2 =
sec_cache->Lookup(key1, GetHelper(), this, true, /*advise_erase=*/true,
/*stats=*/nullptr, kept_in_sec_cache);
Expand Down
3 changes: 2 additions & 1 deletion cache/secondary_cache_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ Status CacheWithSecondaryAdapter::Insert(const Slice& key, ObjectPtr value,
// Warm up the secondary cache with the compressed block. The secondary
// cache may choose to ignore it based on the admission policy.
if (value != nullptr && !compressed_value.empty() &&
adm_policy_ == TieredAdmissionPolicy::kAdmPolicyThreeQueue) {
adm_policy_ == TieredAdmissionPolicy::kAdmPolicyThreeQueue &&
helper->IsSecondaryCacheCompatible()) {
Status status = secondary_cache_->InsertSaved(key, compressed_value, type);
assert(status.ok() || status.IsNotSupported());
}
Expand Down
48 changes: 48 additions & 0 deletions cache/tiered_secondary_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,54 @@ TEST_F(DBTieredSecondaryCacheTest, IterateTest) {
Destroy(options);
}

TEST_F(DBTieredSecondaryCacheTest, VolatileTierTest) {
if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
return;
}

BlockBasedTableOptions table_options;
// We want a block cache of size 5KB, and a compressed secondary cache of
// size 5KB. However, we specify a block cache size of 256KB here in order
// to take into account the cache reservation in the block cache on
// behalf of the compressed cache. The unit of cache reservation is 256KB.
// The effective block cache capacity will be calculated as 256 + 5 = 261KB,
// and 256KB will be reserved for the compressed cache, leaving 5KB for
// the primary block cache. We only have to worry about this here because
// the cache size is so small.
table_options.block_cache = NewCache(256 * 1024, 5 * 1024, 256 * 1024);
table_options.block_size = 4 * 1024;
table_options.cache_index_and_filter_blocks = false;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.compression = kLZ4Compression;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));

// Disable paranoid_file_checks so that flush will not read back the newly
// written file
options.paranoid_file_checks = false;
options.lowest_used_cache_tier = CacheTier::kVolatileTier;
DestroyAndReopen(options);
Random rnd(301);
const int N = 256;
for (int i = 0; i < N; i++) {
std::string p_v;
test::CompressibleString(&rnd, 0.5, 1007, &p_v);
ASSERT_OK(Put(Key(i), p_v));
}

ASSERT_OK(Flush());

// Since lowest_used_cache_tier is the volatile tier, nothing should be
// inserted in the secondary cache.
std::string v = Get(Key(0));
ASSERT_EQ(1007, v.size());
ASSERT_EQ(nvm_sec_cache()->num_insert_saved(), 0u);
ASSERT_EQ(nvm_sec_cache()->num_misses(), 0u);

Destroy(options);
}

class DBTieredAdmPolicyTest
: public DBTieredSecondaryCacheTest,
public testing::WithParamInterface<TieredAdmissionPolicy> {};
Expand Down
21 changes: 12 additions & 9 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,23 @@ void ArenaWrappedDBIter::Init(
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, ReadCallback* read_callback,
ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem) DBIter(
env, read_options, ioptions, mutable_cf_options, ioptions.user_comparator,
/* iter */ nullptr, version, sequence, true,
max_sequential_skip_in_iteration, read_callback, cfh, expose_blob_index);
sv_number_ = version_number;
read_options_ = read_options;
allow_refresh_ = allow_refresh;
memtable_range_tombstone_iter_ = nullptr;

if (!CheckFSFeatureSupport(env->GetFileSystem().get(),
FSSupportedOps::kAsyncIO)) {
read_options_.async_io = false;
}
read_options_.total_order_seek |= ioptions.prefix_seek_opt_in_only;

auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem) DBIter(env, read_options_, ioptions, mutable_cf_options,
ioptions.user_comparator,
/* iter */ nullptr, version, sequence, true,
max_sequential_skip_in_iteration, read_callback,
cfh, expose_blob_index);

sv_number_ = version_number;
allow_refresh_ = allow_refresh;
memtable_range_tombstone_iter_ = nullptr;
}

Status ArenaWrappedDBIter::Refresh() { return Refresh(nullptr); }
Expand Down
9 changes: 9 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4075,6 +4075,15 @@ void rocksdb_options_set_write_dbid_to_manifest(
opt->rep.write_dbid_to_manifest = write_dbid_to_manifest;
}

unsigned char rocksdb_options_get_write_identity_file(rocksdb_options_t* opt) {
return opt->rep.write_identity_file;
}

void rocksdb_options_set_write_identity_file(
rocksdb_options_t* opt, unsigned char write_identity_file) {
opt->rep.write_identity_file = write_identity_file;
}

unsigned char rocksdb_options_get_track_and_verify_wals_in_manifest(
rocksdb_options_t* opt) {
return opt->rep.track_and_verify_wals_in_manifest;
Expand Down
13 changes: 12 additions & 1 deletion db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,8 @@ int main(int argc, char** argv) {
rocksdb_options_set_write_buffer_size(options, 100000);
rocksdb_options_set_paranoid_checks(options, 1);
rocksdb_options_set_max_open_files(options, 10);
/* Compatibility with how test was written */
rocksdb_options_set_write_dbid_to_manifest(options, 0);

table_options = rocksdb_block_based_options_create();
rocksdb_block_based_options_set_block_cache(table_options, cache);
Expand Down Expand Up @@ -962,15 +964,24 @@ int main(int argc, char** argv) {
rocksdb_options_t* options_dbid_in_manifest = rocksdb_options_create();
rocksdb_options_set_create_if_missing(options_dbid_in_manifest, 1);

rocksdb_options_set_write_dbid_to_manifest(options_dbid_in_manifest, false);
unsigned char write_to_manifest =
rocksdb_options_get_write_dbid_to_manifest(options_dbid_in_manifest);
CheckCondition(!write_to_manifest);
rocksdb_options_set_write_dbid_to_manifest(options_dbid_in_manifest, true);
CheckCondition(!write_to_manifest);
write_to_manifest =
rocksdb_options_get_write_dbid_to_manifest(options_dbid_in_manifest);
CheckCondition(write_to_manifest);

rocksdb_options_set_write_identity_file(options_dbid_in_manifest, true);
unsigned char write_identity_file =
rocksdb_options_get_write_identity_file(options_dbid_in_manifest);
CheckCondition(write_identity_file);
rocksdb_options_set_write_identity_file(options_dbid_in_manifest, false);
write_identity_file =
rocksdb_options_get_write_identity_file(options_dbid_in_manifest);
CheckCondition(!write_identity_file);

db = rocksdb_open(options_dbid_in_manifest, dbbackupname, &err);
CheckNoError(err);

Expand Down
4 changes: 3 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1201,8 +1201,10 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
read_opts.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
merge_iter_builder.AddIterator(super_version->mem->NewIterator(
read_opts, /*seqno_to_time_mapping=*/nullptr, &arena));
read_opts, /*seqno_to_time_mapping=*/nullptr, &arena,
/*prefix_extractor=*/nullptr));
super_version->imm->AddIterators(read_opts, /*seqno_to_time_mapping=*/nullptr,
/*prefix_extractor=*/nullptr,
&merge_iter_builder,
false /* add_range_tombstone_iter */);
ScopedArenaPtr<InternalIterator> memtable_iter(merge_iter_builder.Finish());
Expand Down
7 changes: 1 addition & 6 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,7 @@ class CompactionJob {
// doesn't contain the LSM tree information, which is passed though MANIFEST
// file.
struct CompactionServiceInput {
ColumnFamilyDescriptor column_family;

DBOptions db_options;
std::string cf_name;

std::vector<SequenceNumber> snapshots;

Expand All @@ -402,9 +400,6 @@ struct CompactionServiceInput {
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
Status Write(std::string* output);

// Initialize a dummy ColumnFamilyDescriptor
CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}

#ifndef NDEBUG
bool TEST_Equals(CompactionServiceInput* other);
bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
Expand Down
Loading

0 comments on commit 5b0834a

Please sign in to comment.