Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of follower-side snapshot transmission resumption #255

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.2.3"
version = "2.2.4"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down Expand Up @@ -49,7 +49,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.6]@oss/master")
self.requires("homestore/[^6.6.15]@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/3.3.1", override=True)
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ add_test(NAME HomestoreTestDynamic

# To test both baseline & incremental resync functionality, we use 13 to minimize the likelihood of it being a divisor of the total LSN (currently 30)
add_test(NAME HomestoreTestDynamicWithResync
COMMAND homestore_test_dynamic --gtest_filter="HomeObjectFixture.ReplaceMember" -csv error --executor immediate --config_path ./
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
--override_config homestore_config.consensus.snapshot_freq_distance:13
--override_config homestore_config.consensus.num_reserved_log_items=13)
7 changes: 7 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,13 @@ void HSHomeObject::on_replica_restart() {
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf); },
[this](bool success) { on_shard_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_shard_meta_name);

// recover snapshot transmission progress info
HomeStore::instance()->meta_service().register_handler(
_snp_rcvr_meta_name,
[this](meta_blk* mblk, sisl::byte_view buf, size_t size) { on_snp_rcvr_meta_blk_found(mblk, buf); },
[this](bool success) { on_snp_rcvr_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_snp_rcvr_meta_name);
});
}

Expand Down
41 changes: 34 additions & 7 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class HSHomeObject : public HomeObjectImpl {
inline static auto const _svc_meta_name = std::string("HomeObject");
inline static auto const _pg_meta_name = std::string("PGManager");
inline static auto const _shard_meta_name = std::string("ShardManager");
inline static auto const _snp_rcvr_meta_name = std::string("SnapshotReceiver");
static constexpr uint64_t HS_CHUNK_SIZE = 2 * Gi;
static constexpr uint32_t _data_block_size = 1024;
static uint64_t _hs_chunk_size;
Expand Down Expand Up @@ -161,8 +162,21 @@ class HSHomeObject : public HomeObjectImpl {
homestore::chunk_num_t p_chunk_id;
homestore::chunk_num_t v_chunk_id;
};
//TODO this blk is used to store snapshot metadata/status for recovery
struct snapshot_info_superblk {};

// Persistent checkpoint of follower-side snapshot (baseline resync) receive
// progress. Written as a HomeStore superblock (see name()) so that after a
// restart the follower can resume transmission from the recorded shard cursor
// instead of restarting the whole snapshot from the beginning.
// NOTE(review): uses the C-style flexible-array-member idiom (shard_list[1]);
// must only be allocated via a buffer of size() bytes, never on the stack.
struct snapshot_rcvr_info_superblk {
shard_id_t shard_cursor; // shard currently being received; the resume point after restart
int64_t snp_lsn; // LSN identifying the snapshot this progress belongs to
pg_id_t pg_id; // PG being resynced
uint64_t shard_cnt; // number of valid entries in shard_list
shard_id_t shard_list[1]; // variable-length array of shard ids (flexible-array idiom)

// Total on-disk size: fixed header minus the single placeholder element,
// plus shard_cnt real elements.
// NOTE(review): returns uint32_t from a uint64_t expression — presumably
// shard_cnt is small enough in practice; confirm no truncation is possible.
uint32_t size() const {
return sizeof(snapshot_rcvr_info_superblk) - sizeof(shard_id_t) + shard_cnt * sizeof(shard_id_t);
}
// Superblock registration key used by the meta service handler.
static auto name() -> string { return _snp_rcvr_meta_name; }

// Copies the trailing shard-id array into an owned vector for callers.
std::vector< shard_id_t > get_shard_list() const { return std::vector(shard_list, shard_list + shard_cnt); }
};
#pragma pack()

public:
Expand Down Expand Up @@ -227,12 +241,15 @@ class HSHomeObject : public HomeObjectImpl {
uint32_t blk_size;
};

public:
homestore::superblk< pg_info_superblk > pg_sb_;
shared< homestore::ReplDev > repl_dev_;
std::shared_ptr< BlobIndexTable > index_table_;
PGMetrics metrics_;

// Snapshot receiver progress info, used as a checkpoint for recovery
// Placed within HS_PG since HomeObject is unable to locate the ReplicationStateMachine
homestore::superblk< snapshot_rcvr_info_superblk > snp_rcvr_info_sb_;

HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table,
std::shared_ptr< const std::vector< homestore::chunk_num_t > > pg_chunk_ids);
HS_PG(homestore::superblk< pg_info_superblk >&& sb, shared< homestore::ReplDev > rdev);
Expand Down Expand Up @@ -369,7 +386,15 @@ class HSHomeObject : public HomeObjectImpl {
bool is_last_batch);

int64_t get_context_lsn() const;
pg_id_t get_context_pg_id() const;

// Try to load existing snapshot context info
bool load_prev_context();

// Reset the context for a new snapshot, should be called before each new snapshot transmission
void reset_context(int64_t lsn, pg_id_t pg_id);
void destroy_context();

shard_id_t get_shard_cursor() const;
shard_id_t get_next_shard() const;

Expand All @@ -391,9 +416,10 @@ class HSHomeObject : public HomeObjectImpl {

std::unique_ptr< SnapshotContext > ctx_;

// snapshot info, can be used as a checkpoint for recovery
snapshot_info_superblk snp_info_;
// other stats for snapshot transmission progress
// Update the snp_info superblock
void update_snp_info_sb(bool init = false);

HS_PG* get_hs_pg(pg_id_t pg_id);
};

private:
Expand All @@ -404,7 +430,6 @@ class HSHomeObject : public HomeObjectImpl {
static constexpr size_t max_zpad_bufs = _data_block_size / io_align;
std::array< sisl::io_blob_safe, max_zpad_bufs > zpad_bufs_; // Zero padded buffers for blob payload.

private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

// blob related
Expand Down Expand Up @@ -436,6 +461,8 @@ class HSHomeObject : public HomeObjectImpl {
void on_pg_meta_blk_recover_completed(bool success);
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf);
void on_shard_meta_blk_recover_completed(bool success);
void on_snp_rcvr_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf);
void on_snp_rcvr_meta_blk_recover_completed(bool success);

void persist_pg_sb();

Expand Down
6 changes: 3 additions & 3 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
pg_sb_{_pg_meta_name},
repl_dev_{std::move(rdev)},
index_table_{std::move(index_table)},
metrics_{*this} {
metrics_{*this},
snp_rcvr_info_sb_{_snp_rcvr_meta_name} {
RELEASE_ASSERT(pg_chunk_ids != nullptr, "PG chunks null");
const uint32_t num_chunks = pg_chunk_ids->size();
pg_sb_.create(sizeof(pg_info_superblk) - sizeof(char) + pg_info_.members.size() * sizeof(pg_members) +
Expand Down Expand Up @@ -516,8 +517,7 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share
pg_sb_.write();
}

HSHomeObject::HS_PG::HS_PG(homestore::superblk< HSHomeObject::pg_info_superblk >&& sb,
shared< homestore::ReplDev > rdev) :
HSHomeObject::HS_PG::HS_PG(superblk< pg_info_superblk >&& sb, shared< ReplDev > rdev) :
PG{pg_info_from_sb(sb)}, pg_sb_{std::move(sb)}, repl_dev_{std::move(rdev)}, metrics_{*this} {
durable_entities_.blob_sequence_num = pg_sb_->blob_sequence_num;
durable_entities_.active_blob_count = pg_sb_->active_blob_count;
Expand Down
17 changes: 17 additions & 0 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ bool HSHomeObject::PGBlobIterator::update_cursor(objId id) {
if (id.value == LAST_OBJ_ID) { return true; }
//resend batch
if (id.value == cur_obj_id_.value) { return true; }

// If cur_obj_id_ == 0|0 (PG meta), this may be a request for resuming from specific shard
if (cur_obj_id_.shard_seq_num == 0 && id.shard_seq_num != 0 && id.batch_id == 0) {
bool found = false;
for (size_t i = 0; i < shard_list_.size(); i++) {
if (get_sequence_num_from_shard_id(shard_list_[i].info.id) == id.shard_seq_num) {
found = true;
cur_shard_idx_ = i;
cur_start_blob_idx_ = 0;
cur_batch_blob_count_ = 0;
break;
}
}
if (found) { cur_obj_id_ = id; }
return found;
}

auto next_obj_id = expected_next_obj_id();
if (id.value != next_obj_id.value) {
LOGE("invalid objId, expected={}, actual={}", next_obj_id.to_string(), id.to_string());
Expand Down
40 changes: 25 additions & 15 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_sna
}

int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) {
std::shared_ptr< homestore::snapshot_obj > snp_obj) {
HSHomeObject::PGBlobIterator* pg_iter = nullptr;
auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();

Expand Down Expand Up @@ -278,15 +278,15 @@ int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snaps
log_str = fmt::format("{} shard_seq_num={} batch_num={}", log_str, obj_id.shard_seq_num, obj_id.batch_id);

LOGD("read current snp obj {}", log_str)
//invalid Id
// invalid Id
if (!pg_iter->update_cursor(obj_id)) {
LOGW("Invalid objId in snapshot read, {}, current shard_seq_num={}, current batch_num={}",
log_str, pg_iter->cur_obj_id_.shard_seq_num, pg_iter->cur_obj_id_.batch_id);
return -1;
}

//pg metadata message
//shardId starts from 1
// pg metadata message
// shardId starts from 1
if (obj_id.shard_seq_num == 0) {
if (!pg_iter->create_pg_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create pg snapshot data for snapshot read, {}", log_str);
Expand Down Expand Up @@ -323,16 +323,21 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
auto r_dev = repl_dev();
if (!m_snp_rcv_handler) {
m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
if (m_snp_rcv_handler->load_prev_context()) {
LOGI("Reloaded previous snapshot context, lsn:{} pg_id:{} shard:{}", context->get_lsn(),
m_snp_rcv_handler->get_context_pg_id(), m_snp_rcv_handler->get_next_shard());
}
}

auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
auto obj_id = objId(static_cast< snp_obj_id_t >(snp_obj->offset));
auto obj_id = objId(snp_obj->offset);
auto log_suffix = fmt::format("group={} term={} lsn={} shard={} batch_num={} size={}",
uuids::to_string(r_dev->group_id()), s->get_last_log_term(), s->get_last_log_idx(),
obj_id.shard_seq_num, obj_id.batch_id, snp_obj->blob.size());

if (snp_obj->is_last_obj) {
LOGD("Write snapshot reached is_last_obj true {}", log_suffix);
m_snp_rcv_handler->destroy_context();
return;
}

Expand Down Expand Up @@ -363,19 +368,25 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna

auto pg_data = GetSizePrefixedResyncPGMetaData(data_buf);

//Check if pg exists, if yes, clean the stale pg resources, may be due to previous snapshot failure. Let's resync on a pristine base
if (m_snp_rcv_handler->get_context_lsn() == context->get_lsn() && m_snp_rcv_handler->get_shard_cursor() != 0) {
// Request to resume from the beginning of shard
snp_obj->offset =
m_snp_rcv_handler->get_shard_cursor() == HSHomeObject::SnapshotReceiveHandler::shard_list_end_marker
? LAST_OBJ_ID
: objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_shard_cursor()), 0).value;
LOGI("Resume from previous context breakpoint, lsn:{} pg_id:{} shard:{}", context->get_lsn(),
pg_data->pg_id(), m_snp_rcv_handler->get_next_shard());
return;
}

// Init a new transmission
// If PG already exists, clean the stale pg resources. Let's resync on a pristine base
if (home_object_->pg_exists(pg_data->pg_id())) {
LOGI("pg already exists, clean pg resources before snapshot, pg_id:{} {}", pg_data->pg_id(), log_suffix);
home_object_->pg_destroy(pg_data->pg_id());
LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
}
// Check if the snapshot context is same as the current snapshot context.
// If not, drop the previous context and re-init a new one
if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) {
LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
}
LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());

auto ret = m_snp_rcv_handler->process_pg_snapshot_data(*pg_data);
if (ret) {
Expand Down Expand Up @@ -452,5 +463,4 @@ void ReplicationStateMachine::free_user_snp_ctx(void*& user_snp_ctx) {
delete pg_iter;
user_snp_ctx = nullptr;
}

} // namespace homeobject
17 changes: 8 additions & 9 deletions src/lib/homestore_backend/replication_state_machine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,15 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
/// @brief Called when the replica is being destroyed by nuraft;
void on_destroy(const homestore::group_id_t& group_id) override;

/// Not Implemented
/// @brief Called when the snapshot is being created by nuraft;
// Snapshot related functions
homestore::AsyncReplResult<> create_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
virtual bool apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
virtual std::shared_ptr< homestore::snapshot_context > last_snapshot() override;
virtual int read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
virtual void write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
virtual void free_user_snp_ctx(void*& user_snp_ctx) override;
bool apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
std::shared_ptr< homestore::snapshot_context > last_snapshot() override;
int read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
void write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
void free_user_snp_ctx(void*& user_snp_ctx) override;

private:
HSHomeObject* home_object_{nullptr};
Expand Down
Loading
Loading