Skip to content

Commit

Permalink
Persist the new MANIFEST after successfully syncing the new WAL durin…
Browse files Browse the repository at this point in the history
…g recovery (#9922)

Summary:
In case of non-TransactionDB and avoid_flush_during_recovery = true, RocksDB won't
flush the data from WAL to L0 for all column families if possible. As a
result, not all column families can increase their log_numbers, and
min_log_number_to_keep won't change.
For transaction DB (.allow_2pc), even with the flush, there may be old WAL files that it must not delete because they can contain data of uncommitted transactions and min_log_number_to_keep won't change.
If we persist a new MANIFEST with
advanced log_numbers for some column families, then during a second
crash after persisting the MANIFEST, RocksDB will see some column
families' log_numbers larger than the corrupted wal, and the "column family inconsistency" error will be hit, causing recovery to fail.

As a solution, RocksDB will persist the new MANIFEST after successfully syncing the new WAL.
If a future recovery starts from the new MANIFEST, then it means the new WAL is successfully synced. Due to the sentinel empty write batch at the beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
If future recovery starts from the old MANIFEST, it means the writing the new MANIFEST failed. We won't have the "SST ahead of WAL" error.
Currently, RocksDB DB::Open() may creates and writes to two new MANIFEST files even before recovery succeeds. This PR buffers the edits in a structure and writes to a new MANIFEST after recovery is successful

Pull Request resolved: facebook/rocksdb#9922

Test Plan:
1. Update unit tests to fail without this change
2. make crast_test -j

Branch with unit test and no fix  facebook/rocksdb#9942 to keep track of unit test (without fix)

Reviewed By: riversand963

Differential Revision: D36043701

Pulled By: akankshamahajan15

fbshipit-source-id: 5760970db0a0920fb73d3c054a4155733500acd9
  • Loading branch information
akankshamahajan15 authored and riversand963 committed Jun 8, 2022
1 parent 8244f13 commit 405a35f
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 113 deletions.
77 changes: 67 additions & 10 deletions db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ INSTANTIATE_TEST_CASE_P(CorruptionTest, CrashDuringRecoveryWithCorruptionTest,
// The combination of corrupting a WAL and injecting an error during subsequent
// re-open exposes the bug of prematurely persisting a new MANIFEST with
// advanced ColumnFamilyData::log_number.
TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) {
TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) {
CloseDb();
Options options;
options.track_and_verify_wals_in_manifest =
Expand Down Expand Up @@ -1107,7 +1107,8 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) {
// number. TEST_SwitchMemtable makes sure WALs are not synced and test can
// corrupt un-sync WAL.
for (int i = 0; i < 2; ++i) {
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i), "value"));
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
"value" + std::to_string(i)));
ASSERT_OK(dbimpl->TEST_SwitchMemtable());
}

Expand Down Expand Up @@ -1188,6 +1189,23 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) {
{
options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));

// Verify that data is not lost.
{
std::string v;
ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
ASSERT_EQ("dontcare", v);

v.clear();
ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(0), &v));
ASSERT_EQ("value" + std::to_string(0), v);

// Since it's corrupting second last wal, below key is not found.
v.clear();
ASSERT_EQ(db_->Get(ReadOptions(), "key" + std::to_string(1), &v),
Status::NotFound());
}

for (auto* h : handles) {
delete h;
}
Expand Down Expand Up @@ -1219,8 +1237,7 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) {
// The combination of corrupting a WAL and injecting an error during subsequent
// re-open exposes the bug of prematurely persisting a new MANIFEST with
// advanced ColumnFamilyData::log_number.
TEST_P(CrashDuringRecoveryWithCorruptionTest,
DISABLED_TxnDbCrashDuringRecovery) {
TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) {
CloseDb();
Options options;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
Expand Down Expand Up @@ -1271,13 +1288,14 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,

// Put and flush cf0
for (int i = 0; i < 2; ++i) {
ASSERT_OK(txn_db->Put(WriteOptions(), "dontcare", "value"));
ASSERT_OK(txn_db->Put(WriteOptions(), "key" + std::to_string(i),
"value" + std::to_string(i)));
ASSERT_OK(dbimpl->TEST_SwitchMemtable());
}

// Put cf1
txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
ASSERT_OK(txn->Put(handles[1], "foo1", "value"));
ASSERT_OK(txn->Put(handles[1], "foo1", "value1"));
ASSERT_OK(txn->Commit());

delete txn;
Expand Down Expand Up @@ -1337,7 +1355,6 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
std::vector<uint64_t> file_nums;
GetSortedWalFiles(file_nums);
size_t size = file_nums.size();
assert(size >= 2);
uint64_t log_num = file_nums[size - 1];
CorruptFileWithTruncation(FileType::kWalFile, log_num);
}
Expand All @@ -1354,6 +1371,27 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
{
ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
&handles, &txn_db));

// Verify that data is not lost.
{
std::string v;
// Key not visible since it's not committed.
ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo", &v),
Status::NotFound());

v.clear();
ASSERT_OK(txn_db->Get(ReadOptions(), "key" + std::to_string(0), &v));
ASSERT_EQ("value" + std::to_string(0), v);

// Last WAL is corrupted which contains two keys below.
v.clear();
ASSERT_EQ(txn_db->Get(ReadOptions(), "key" + std::to_string(1), &v),
Status::NotFound());
v.clear();
ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo1", &v),
Status::NotFound());
}

for (auto* h : handles) {
delete h;
}
Expand Down Expand Up @@ -1396,8 +1434,7 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
// The combination of corrupting a WAL and injecting an error during subsequent
// re-open exposes the bug of prematurely persisting a new MANIFEST with
// advanced ColumnFamilyData::log_number.
TEST_P(CrashDuringRecoveryWithCorruptionTest,
DISABLED_CrashDuringRecoveryWithFlush) {
TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecoveryWithFlush) {
CloseDb();
Options options;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
Expand Down Expand Up @@ -1430,7 +1467,8 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
// Write to default_cf and flush this cf several times to advance wal
// number.
for (int i = 0; i < 2; ++i) {
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i), "value"));
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
"value" + std::to_string(i)));
ASSERT_OK(db_->Flush(FlushOptions()));
}

Expand Down Expand Up @@ -1483,6 +1521,25 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
{
options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));

// Verify that data is not lost.
{
std::string v;
ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
ASSERT_EQ("dontcare", v);

for (int i = 0; i < 2; ++i) {
v.clear();
ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(i), &v));
ASSERT_EQ("value" + std::to_string(i), v);
}

// Since it's corrupting last wal after Flush, below key is not found.
v.clear();
ASSERT_EQ(db_->Get(ReadOptions(), handles[1], "dontcare", &v),
Status::NotFound());
}

for (auto* h : handles) {
delete h;
}
Expand Down
60 changes: 54 additions & 6 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,39 @@ class DBImpl : public DB {

std::atomic<bool> shutting_down_;

// RecoveryContext struct stores the context about version edits along
// with corresponding column_family_data and column_family_options.
class RecoveryContext {
public:
~RecoveryContext() {
for (auto& edit_list : edit_lists_) {
for (auto* edit : edit_list) {
delete edit;
}
}
}

void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) {
assert(cfd != nullptr);
if (map_.find(cfd->GetID()) == map_.end()) {
uint32_t size = static_cast<uint32_t>(map_.size());
map_.emplace(cfd->GetID(), size);
cfds_.emplace_back(cfd);
mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions());
edit_lists_.emplace_back(autovector<VersionEdit*>());
}
uint32_t i = map_[cfd->GetID()];
edit_lists_[i].emplace_back(new VersionEdit(edit));
}

std::unordered_map<uint32_t, uint32_t> map_; // cf_id to index;
autovector<ColumnFamilyData*> cfds_;
autovector<const MutableCFOptions*> mutable_cf_opts_;
autovector<autovector<VersionEdit*>> edit_lists_;
// files_to_delete_ contains sst files
std::unordered_set<std::string> files_to_delete_;
};

// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
Expand Down Expand Up @@ -1356,16 +1389,19 @@ class DBImpl : public DB {
// be made to the descriptor are added to *edit.
// recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
// skipped.
// recovery_ctx stores the context about version edits and all those
// edits are persisted to new Manifest after successfully syncing the new WAL.
virtual Status Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_wal_file_exists = false,
bool error_if_data_exists_in_wals = false,
uint64_t* recovered_seq = nullptr);
uint64_t* recovered_seq = nullptr,
RecoveryContext* recovery_ctx = nullptr);

virtual bool OwnTablesAndLogs() const { return true; }

// Set DB identity file, and write DB ID to manifest if necessary.
Status SetDBId(bool read_only);
Status SetDBId(bool read_only, RecoveryContext* recovery_ctx);

// REQUIRES: db mutex held when calling this function, but the db mutex can
// be released and re-acquired. Db mutex will be held when the function
Expand All @@ -1374,12 +1410,15 @@ class DBImpl : public DB {
// not referenced in the MANIFEST (e.g.
// 1. It's best effort recovery;
// 2. The VersionEdits referencing the SST files are appended to
// MANIFEST, DB crashes when syncing the MANIFEST, the VersionEdits are
// RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are
// still not synced to MANIFEST during recovery.)
// We delete these SST files. In the
// It stores the SST files to be deleted in RecoveryContext. In the
// meantime, we find out the largest file number present in the paths, and
// bump up the version set's next_file_number_ to be 1 + largest_file_number.
Status DeleteUnreferencedSstFiles();
// recovery_ctx stores the context about version edits and files to be
// deleted. All those edits are persisted to new Manifest after successfully
// syncing the new WAL.
Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx);

// SetDbSessionId() should be called in the constuctor DBImpl()
// to ensure that db_session_id_ gets updated every time the DB is opened
Expand All @@ -1389,6 +1428,14 @@ class DBImpl : public DB {
Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family,
const Slice& ts) const;

// recovery_ctx stores the context about version edits and
// LogAndApplyForRecovery persist all those edits to new Manifest after
// successfully syncing new WAL.
// LogAndApplyForRecovery should be called only once during recovery and it
// should be called when RocksDB writes to a first new MANIFEST since this
// recovery.
Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);

private:
friend class DB;
friend class ErrorHandler;
Expand Down Expand Up @@ -1645,7 +1692,8 @@ class DBImpl : public DB {
// corrupted_log_found is set to true if we recover from a corrupted log file.
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_log_found);
bool* corrupted_log_found,
RecoveryContext* recovery_ctx);

// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the
Expand Down
36 changes: 11 additions & 25 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
return min_log_number_to_keep;
}

Status DBImpl::SetDBId(bool read_only) {
Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
Status s;
// Happens when immutable_db_options_.write_dbid_to_manifest is set to true
// the very first time.
Expand All @@ -890,22 +890,22 @@ Status DBImpl::SetDBId(bool read_only) {
}
s = GetDbIdentityFromIdentityFile(&db_id_);
if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
assert(!read_only);
assert(recovery_ctx != nullptr);
assert(versions_->GetColumnFamilySet() != nullptr);
VersionEdit edit;
edit.SetDBId(db_id_);
Options options;
MutableCFOptions mutable_cf_options(options);
versions_->db_id_ = db_id_;
s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
mutable_cf_options, &edit, &mutex_, nullptr,
/* new_descriptor_log */ false);
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), edit);
}
} else if (!read_only) {
s = SetIdentityFile(env_, dbname_, db_id_);
}
return s;
}

Status DBImpl::DeleteUnreferencedSstFiles() {
Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();
std::vector<std::string> paths;
paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
Expand All @@ -925,7 +925,6 @@ Status DBImpl::DeleteUnreferencedSstFiles() {

uint64_t next_file_number = versions_->current_next_file_number();
uint64_t largest_file_number = next_file_number;
std::set<std::string> files_to_delete;
Status s;
for (const auto& path : paths) {
std::vector<std::string> files;
Expand All @@ -943,8 +942,9 @@ Status DBImpl::DeleteUnreferencedSstFiles() {
const std::string normalized_fpath = path + fname;
largest_file_number = std::max(largest_file_number, number);
if (type == kTableFile && number >= next_file_number &&
files_to_delete.find(normalized_fpath) == files_to_delete.end()) {
files_to_delete.insert(normalized_fpath);
recovery_ctx->files_to_delete_.find(normalized_fpath) ==
recovery_ctx->files_to_delete_.end()) {
recovery_ctx->files_to_delete_.emplace(normalized_fpath);
}
}
}
Expand All @@ -961,21 +961,7 @@ Status DBImpl::DeleteUnreferencedSstFiles() {
assert(versions_->GetColumnFamilySet());
ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault();
assert(default_cfd);
s = versions_->LogAndApply(
default_cfd, *default_cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
directories_.GetDbDir(), /*new_descriptor_log*/ false);
if (!s.ok()) {
return s;
}

mutex_.Unlock();
for (const auto& fname : files_to_delete) {
s = env_->DeleteFile(fname);
if (!s.ok()) {
break;
}
}
mutex_.Lock();
recovery_ctx->UpdateVersionEdits(default_cfd, edit);
return s;
}

Expand Down
Loading

0 comments on commit 405a35f

Please sign in to comment.