Skip to content

Commit

Permalink
Support shred orchestration in BSC (#13881)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Jan 28, 2025
1 parent 1ee80e5 commit 0d933fd
Show file tree
Hide file tree
Showing 19 changed files with 892 additions and 52 deletions.
96 changes: 96 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ STATEFN(TNodeWarden::StateOnline) {
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);

hFunc(NPDisk::TEvSlayResult, Handle);
hFunc(NPDisk::TEvShredPDiskResult, Handle);
hFunc(NPDisk::TEvShredPDisk, Handle);

hFunc(TEvRegisterPDiskLoadActor, Handle);

Expand Down Expand Up @@ -575,6 +577,50 @@ void TNodeWarden::Handle(NPDisk::TEvSlayResult::TPtr ev) {
};
}

// Handles the PDisk reply to a shred request. An OK status is passed on as success (no error text);
// any other status is converted into an error message that names the received status.
void TNodeWarden::Handle(NPDisk::TEvShredPDiskResult::TPtr ev) {
    const auto *msg = ev->Get();

    std::optional<TString> error;
    if (msg->Status != NKikimrProto::OK) {
        error.emplace(TStringBuilder() << "failed to shred PDisk Status# "
            << NKikimrProto::EReplyStatus_Name(msg->Status));
    }

    ProcessShredStatus(ev->Cookie, msg->ShredGeneration, std::move(error));
}

// Handles a shred request that came back to NodeWarden instead of being processed by the PDisk.
void TNodeWarden::Handle(NPDisk::TEvShredPDisk::TPtr ev) {
    // The request was returned to its sender, meaning the PDisk was terminated before it could process it.
    // Normally this must never happen, because NodeWarden issues PoisonPill synchronously with removing
    // the PDisk from the LocalPDisks set; the status is still recorded as an error before the debug abort.
    const ui64 cookie = ev->Cookie;
    const ui64 generation = ev->Get()->ShredGeneration;
    ProcessShredStatus(cookie, generation, "PDisk has been terminated before it got shredded");
    Y_DEBUG_ABORT("unexpected case");
}

// Records the outcome of a single shred request.
//   cookie     -- cookie of the request; used to find (and forget) the matching ShredInFlight entry
//   generation -- shred generation carried by the message being processed
//   error      -- empty on success, otherwise the text describing the failure
// The PDisk state is updated and reported to the controller only when the PDisk is still known and the
// generation matches the one most recently issued to it.
void TNodeWarden::ProcessShredStatus(ui64 cookie, ui64 generation, std::optional<TString> error) {
    // resolve the cookie into a PDisk key, dropping the in-flight entry along the way
    std::optional<TPDiskKey> key;
    if (const auto inFlightIt = ShredInFlight.find(cookie); inFlightIt != ShredInFlight.end()) {
        key.emplace(inFlightIt->second);
        ShredInFlight.erase(inFlightIt);
    }

    // find the PDisk record, if the request was known and the PDisk still exists
    TPDiskRecord *pdisk = nullptr;
    if (key) {
        if (const auto pdiskIt = LocalPDisks.find(*key); pdiskIt != LocalPDisks.end()) {
            pdisk = &pdiskIt->second;
            // the cookie must have been registered in the PDisk record when the request was issued
            const size_t numErased = pdisk->ShredCookies.erase(cookie);
            Y_ABORT_UNLESS(numErased);
        }
    }

    STLOG(PRI_DEBUG, BS_SHRED, BSSN00, "processing shred result from PDisk",
        (Cookie, cookie),
        (PDiskId, key),
        (ShredGeneration, generation),
        (ErrorReason, error),
        (ShredGenerationIssued, pdisk ? pdisk->ShredGenerationIssued : std::nullopt));

    // results for unknown PDisks or for a generation other than the issued one are ignored
    if (!pdisk || generation != pdisk->ShredGenerationIssued) {
        return;
    }

    if (!error) {
        pdisk->ShredState.emplace<ui64>(generation);
    } else {
        pdisk->ShredState.emplace<TString>(std::move(*error));
    }
    SendPDiskReport(key->PDiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_SHRED, pdisk->ShredState);
}

// Answers a PDisk load actor registration with the value returned by NextLocalPDiskInitOwnerRound().
void TNodeWarden::Handle(TEvRegisterPDiskLoadActor::TPtr ev) {
    auto reply = std::make_unique<TEvRegisterPDiskLoadActorResult>(NextLocalPDiskInitOwnerRound());
    Send(ev->Sender, reply.release());
}
Expand Down Expand Up @@ -609,6 +655,56 @@ void TNodeWarden::Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr
ApplyGroupInfo(groupId, generation, nullptr, false, false);
}
}

// Process the shred request carried by the update, if any. For every listed PDisk one of the following happens:
//  - the PDisk is unknown on this node: report "PDisk not found";
//  - the requested generation is older than the one already issued: report "obsolete generation";
//  - the requested generation is newer than the issued one (or nothing was issued yet): issue a shred request;
//  - the requested generation equals the issued one: do nothing while it is in progress, report completion if it
//    has finished, and restart it if the previous attempt was aborted.
if (record.HasShredRequest()) {
    const auto& request = record.GetShredRequest();
    const ui64 generation = request.GetShredGeneration();
    for (ui32 pdiskId : request.GetPDiskIds()) {
        const TPDiskKey key(LocalNodeId, pdiskId);
        if (const auto it = LocalPDisks.find(key); it != LocalPDisks.end()) {
            TPDiskRecord& pdisk = it->second;

            // sends TEvShredPDisk to the PDisk actor and remembers the request under a fresh cookie
            auto issueShredRequestToPDisk = [&] {
                const ui64 cookie = ++LastShredCookie;
                ShredInFlight.emplace(cookie, key);
                pdisk.ShredCookies.insert(cookie);

                // FlagForwardOnNondelivery with ourselves as the forward target: an undelivered request comes
                // back to NodeWarden as TEvShredPDisk
                const TActorId actorId = SelfId();
                auto ev = std::make_unique<NPDisk::TEvShredPDisk>(generation);
                TActivationContext::Send(new IEventHandle(MakeBlobStoragePDiskID(LocalNodeId, pdiskId), SelfId(),
                    ev.release(), IEventHandle::FlagForwardOnNondelivery, cookie, &actorId));
                pdisk.ShredGenerationIssued.emplace(generation);

                // a new attempt is now in flight: drop the outcome of the previous attempt, otherwise a stale
                // "aborted" state makes every repeated request restart shredding again while it is still running,
                // and a stale "finished" state of an older generation gets reported for the new one
                pdisk.ShredState.emplace<std::monostate>();

                STLOG(PRI_DEBUG, BS_SHRED, BSSN01, "sending shred query to PDisk",
                    (Cookie, cookie),
                    (PDiskId, key),
                    (ShredGeneration, generation));
            };

            if (pdisk.ShredGenerationIssued == generation) {
                // the state is inspected without std::visit, because restarting modifies ShredState itself
                if (const ui64 *finished = std::get_if<ui64>(&pdisk.ShredState)) {
                    // shredding has already completed for this generation, report it
                    SendPDiskReport(pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_SHRED, *finished);
                } else if (std::holds_alternative<TString>(pdisk.ShredState)) {
                    // shredding finished with error last time, restart
                    issueShredRequestToPDisk();
                }
                // otherwise (std::monostate) shredding is in progress, do nothing
            } else if (pdisk.ShredGenerationIssued < generation) {
                issueShredRequestToPDisk();
            } else {
                SendPDiskReport(pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_SHRED, "obsolete generation");
            }
        } else {
            SendPDiskReport(pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_SHRED, "PDisk not found");
        }
    }
}
}

void TNodeWarden::SendDropDonorQuery(ui32 nodeId, ui32 pdiskId, ui32 vslotId, const TVDiskID& vdiskId, TDuration backoff) {
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ namespace NKikimr::NStorage {
friend bool operator ==(const TPDiskKey& x, const TPDiskKey& y) {
return x.NodeId == y.NodeId && x.PDiskId == y.PDiskId;
}

// Formats the key as "[NodeId:PDiskId]".
TString ToString() const {
    TStringBuilder out;
    out << '[' << NodeId << ':' << PDiskId << ']';
    return out;
}
};

struct TUnreportedMetricTag {};
Expand All @@ -78,6 +82,10 @@ namespace NKikimr::NStorage {
ui32 RefCount = 0;
bool Temporary = false;

std::optional<ui64> ShredGenerationIssued;
std::variant<std::monostate, ui64, TString> ShredState; // not issued, finished with generation, aborted
THashSet<ui64> ShredCookies;

TPDiskRecord(NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk record)
: Record(std::move(record))
{}
Expand Down Expand Up @@ -113,6 +121,8 @@ namespace NKikimr::NStorage {
std::map<TPDiskKey, TPDiskRecord> LocalPDisks;
TIntrusiveList<TPDiskRecord, TUnreportedMetricTag> PDisksWithUnreportedMetrics;
std::map<ui64, ui32> PDiskRestartRequests;
ui64 LastShredCookie = 0;
THashMap<ui64, TPDiskKey> ShredInFlight;

struct TPDiskByPathInfo {
TPDiskKey RunningPDiskId; // currently running PDiskId
Expand Down Expand Up @@ -535,6 +545,9 @@ namespace NKikimr::NStorage {
void Handle(TEvInterconnect::TEvNodeInfo::TPtr ev);
void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev);
void Handle(NPDisk::TEvSlayResult::TPtr ev);
void Handle(NPDisk::TEvShredPDiskResult::TPtr ev);
void Handle(NPDisk::TEvShredPDisk::TPtr ev);
void ProcessShredStatus(ui64 cookie, ui64 generation, std::optional<TString> error);
void Handle(TEvRegisterPDiskLoadActor::TPtr ev);
void Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr ev);

Expand All @@ -543,7 +556,8 @@ namespace NKikimr::NStorage {
void SendVDiskReport(TVSlotId vslotId, const TVDiskID& vdiskId,
NKikimrBlobStorage::TEvControllerNodeReport::EVDiskPhase phase, TDuration backoff = {});

void SendPDiskReport(ui32 pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::EPDiskPhase phase);
void SendPDiskReport(ui32 pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::EPDiskPhase phase,
std::variant<std::monostate, ui64, TString> shredState = {});

void Handle(TEvBlobStorage::TEvControllerUpdateDiskStatus::TPtr ev);
void Handle(TEvBlobStorage::TEvControllerGroupMetricsExchange::TPtr ev);
Expand Down
17 changes: 16 additions & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ namespace NKikimr::NStorage {
Y_ABORT_UNLESS(jt != PDiskByPath.end() && jt->second.RunningPDiskId == it->first);
pending = std::move(jt->second.Pending);
PDiskByPath.erase(jt);
// forget the in-flight shred requests registered for this PDisk, as its record is erased from LocalPDisks right below
for (ui64 cookie : it->second.ShredCookies) {
ShredInFlight.erase(cookie);
}
LocalPDisks.erase(it);
PDiskRestartInFlight.erase(pdiskId);

Expand All @@ -250,14 +253,26 @@ namespace NKikimr::NStorage {
}
}

void TNodeWarden::SendPDiskReport(ui32 pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::EPDiskPhase phase) {
// Sends a single-PDisk report to the controller.
//   pdiskId    -- id of the PDisk on the local node
//   phase      -- phase being reported
//   shredState -- optional shred outcome: a ui64 is reported as the finished shred generation, a TString as the
//                 abort reason, std::monostate adds nothing to the report
// The PDisk guid is attached when the PDisk is present in LocalPDisks and its record has one.
void TNodeWarden::SendPDiskReport(ui32 pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::EPDiskPhase phase,
        std::variant<std::monostate, ui64, TString> shredState) {
    STLOG(PRI_DEBUG, BS_NODE, NW41, "SendPDiskReport", (PDiskId, pdiskId), (Phase, phase));

    auto report = std::make_unique<TEvBlobStorage::TEvControllerNodeReport>(LocalNodeId);
    auto& item = *report->Record.AddPDiskReports();
    item.SetPDiskId(pdiskId);
    item.SetPhase(phase);

    const auto it = LocalPDisks.find(TPDiskKey(LocalNodeId, pdiskId));
    if (it != LocalPDisks.end()) {
        const auto& record = it->second.Record;
        if (record.HasPDiskGuid()) {
            item.SetPDiskGuid(record.GetPDiskGuid());
        }
    }

    if (ui64 *generation = std::get_if<ui64>(&shredState)) {
        item.SetShredGenerationFinished(*generation);
    } else if (TString *aborted = std::get_if<TString>(&shredState)) {
        item.SetShredAborted(*aborted);
    }

    SendToController(std::move(report));
}

Expand Down
15 changes: 15 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ void TNodeWarden::SendRegisterNode() {
FillInVDiskStatus(ev->Record.MutableVDiskStatus(), true);
ev->Record.SetDeclarativePDiskManagement(true);

// report the shred status of every local PDisk that has had a shred generation issued
for (const auto& [key, pdisk] : LocalPDisks) {
    if (!pdisk.ShredGenerationIssued) {
        continue;
    }

    auto *item = ev->Record.AddShredStatus();
    item->SetPDiskId(key.PDiskId);
    if (pdisk.Record.HasPDiskGuid()) {
        item->SetPDiskGuid(pdisk.Record.GetPDiskGuid());
    }

    // finished generation, abort reason, or (no outcome stored) still in progress
    if (const ui64 *generation = std::get_if<ui64>(&pdisk.ShredState)) {
        item->SetShredGenerationFinished(*generation);
    } else if (const TString *aborted = std::get_if<TString>(&pdisk.ShredState)) {
        item->SetShredAborted(*aborted);
    } else {
        item->SetShredInProgress(true);
    }
}

SendToController(std::move(ev));
}

Expand Down
57 changes: 56 additions & 1 deletion ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ namespace NKikimr {

LOG_DEBUG_S(ctx, BS_VDISK_BLOCK, VCtx->VDiskLogPrefix
<< "TEvVBlock: tabletId# " << tabletId << " gen# " << gen
<< " Marker# BSVS14");
<< " Marker# BSVS00");

TLsnSeg seg;
ui32 actGen = 0;
Expand Down Expand Up @@ -1824,6 +1824,7 @@ namespace NKikimr {
}
UpdateReplState();
RunBalancing(ctx);
ProcessShredQ();
}

void SkeletonErrorState(const TActorContext &ctx,
Expand Down Expand Up @@ -2637,6 +2638,52 @@ namespace NKikimr {
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Shredding support

std::deque<std::unique_ptr<IEventHandle>> ShredQ;

// Postpones a shred-related event: logs its type and stores the event handle at the back of ShredQ.
template<typename TEvent>
void HandleShredEnqueue(TAutoPtr<TEventHandle<TEvent>> ev) {
    STLOG(PRI_DEBUG, BS_SHRED, BSSV00, VCtx->VDiskLogPrefix << "enqueued shred event", (Type, ev->GetTypeRewrite()));
    std::unique_ptr<IEventHandle> handle(ev.Release());
    ShredQ.push_back(std::move(handle));
}

void ProcessShredQ() {
for (auto& ev : std::exchange(ShredQ, {})) {
TAutoPtr<IEventHandle> temp(ev.release());
Receive(temp);
}
}

// Replies OK to a pre-shred compaction request, echoing the requested shred generation and the cookie.
void HandleShred(NPDisk::TEvPreShredCompactVDisk::TPtr ev) {
    const auto shredGeneration = ev->Get()->ShredGeneration;
    STLOG(PRI_DEBUG, BS_SHRED, BSSV01, VCtx->VDiskLogPrefix << "processing TEvPreShredCompactVDisk",
        (ShredGeneration, shredGeneration));

    const auto& disk = *PDiskCtx->Dsk;
    Send(ev->Sender,
        new NPDisk::TEvPreShredCompactVDiskResult(disk.Owner, disk.OwnerRound, shredGeneration, NKikimrProto::OK,
            TString()),
        0, ev->Cookie);
}

// Replies OK to a VDisk shred request, echoing the requested shred generation and the cookie.
void HandleShred(NPDisk::TEvShredVDisk::TPtr ev) {
    const auto shredGeneration = ev->Get()->ShredGeneration;
    STLOG(PRI_DEBUG, BS_SHRED, BSSV02, VCtx->VDiskLogPrefix << "processing TEvShredVDisk",
        (ShredGeneration, shredGeneration));

    const auto& disk = *PDiskCtx->Dsk;
    Send(ev->Sender,
        new NPDisk::TEvShredVDiskResult(disk.Owner, disk.OwnerRound, shredGeneration, NKikimrProto::OK, TString()),
        0, ev->Cookie);
}

// Replies ERROR ("VDisk is in error state") to a pre-shred compaction request, echoing generation and cookie.
void HandleShredError(NPDisk::TEvPreShredCompactVDisk::TPtr ev) {
    const auto shredGeneration = ev->Get()->ShredGeneration;
    STLOG(PRI_DEBUG, BS_SHRED, BSSV03, VCtx->VDiskLogPrefix << "processing TEvPreShredCompactVDisk in error state",
        (ShredGeneration, shredGeneration));

    const auto& disk = *PDiskCtx->Dsk;
    Send(ev->Sender,
        new NPDisk::TEvPreShredCompactVDiskResult(disk.Owner, disk.OwnerRound, shredGeneration, NKikimrProto::ERROR,
            "VDisk is in error state"),
        0, ev->Cookie);
}

// Replies ERROR ("VDisk is in error state") to a VDisk shred request, echoing generation and cookie.
void HandleShredError(NPDisk::TEvShredVDisk::TPtr ev) {
    const auto shredGeneration = ev->Get()->ShredGeneration;
    STLOG(PRI_DEBUG, BS_SHRED, BSSV04, VCtx->VDiskLogPrefix << "processing TEvShredVDisk in error state",
        (ShredGeneration, shredGeneration));

    const auto& disk = *PDiskCtx->Dsk;
    Send(ev->Sender,
        new NPDisk::TEvShredVDiskResult(disk.Owner, disk.OwnerRound, shredGeneration, NKikimrProto::ERROR,
            "VDisk is in error state"),
        0, ev->Cookie);
}

// NOTES: we have 4 state functions, one of which is an error state (StateDatabaseError) and
// others are good: StateLocalRecovery, StateSyncGuidRecovery, StateNormal
// We switch between states in the following manner:
Expand Down Expand Up @@ -2696,6 +2743,8 @@ namespace NKikimr {
hFunc(NPDisk::TEvChunkForgetResult, Handle)
FFunc(TEvPrivate::EvCheckSnapshotExpiration, CheckSnapshotExpiration)
hFunc(TEvReplInvoke, HandleReplNotInProgress)
hFunc(NPDisk::TEvPreShredCompactVDisk, HandleShredEnqueue)
hFunc(NPDisk::TEvShredVDisk, HandleShredEnqueue)
)

COUNTED_STRICT_STFUNC(StateSyncGuidRecovery,
Expand Down Expand Up @@ -2749,6 +2798,8 @@ namespace NKikimr {
hFunc(NPDisk::TEvChunkForgetResult, Handle)
FFunc(TEvPrivate::EvCheckSnapshotExpiration, CheckSnapshotExpiration)
hFunc(TEvReplInvoke, HandleReplNotInProgress)
hFunc(NPDisk::TEvPreShredCompactVDisk, HandleShredEnqueue)
hFunc(NPDisk::TEvShredVDisk, HandleShredEnqueue)
)

COUNTED_STRICT_STFUNC(StateNormal,
Expand Down Expand Up @@ -2819,6 +2870,8 @@ namespace NKikimr {
FFunc(TEvPrivate::EvCheckSnapshotExpiration, CheckSnapshotExpiration)
hFunc(TEvReplInvoke, Handle)
CFunc(TEvStartBalancing::EventType, RunBalancing)
hFunc(NPDisk::TEvPreShredCompactVDisk, HandleShred)
hFunc(NPDisk::TEvShredVDisk, HandleShred)
)

COUNTED_STRICT_STFUNC(StateDatabaseError,
Expand Down Expand Up @@ -2846,6 +2899,8 @@ namespace NKikimr {
hFunc(NPDisk::TEvChunkForgetResult, Handle)
FFunc(TEvPrivate::EvCheckSnapshotExpiration, CheckSnapshotExpiration)
hFunc(TEvReplInvoke, HandleReplNotInProgress)
hFunc(NPDisk::TEvPreShredCompactVDisk, HandleShredError)
hFunc(NPDisk::TEvShredVDisk, HandleShredError)
)

PDISK_TERMINATE_STATE_FUNC_DEF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ namespace NKikimr {
<< " OriginalBlobId# " << OriginalId
<< " PatchedBlobId# " << PatchedId
<< " ErrorReason# " << ErrorReason
<< " Marker# BSVSP01");
<< " Marker# BSVSP00");
SendVDiskResponse(ctx, Event->Sender, vMovedPatchResult.release(), Event->Cookie, VCtx);
PassAway();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/mind/bscontroller/bsc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ STFUNC(TBlobStorageController::StateWork) {
hFunc(TEvTabletPipe::TEvClientDestroyed, ConsoleInteraction->Handle);
hFunc(TEvBlobStorage::TEvGetBlockResult, ConsoleInteraction->Handle);
fFunc(TEvBlobStorage::EvControllerShredRequest, EnqueueIncomingEvent);
cFunc(TEvPrivate::EvUpdateShredState, ShredState.HandleUpdateShredState);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
STLOG(PRI_ERROR, BS_CONTROLLER, BSC06, "StateWork unexpected event", (Type, type),
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/mind/bscontroller/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ namespace NKikimr::NBsController {
const ui32 nodeId = fullPDiskId.NodeId;
const ui32 pdiskId = fullPDiskId.PDiskId;

NKikimrBlobStorage::TNodeWardenServiceSet &service = *Services[nodeId].MutableServiceSet();
auto& query = Services[nodeId];
NKikimrBlobStorage::TNodeWardenServiceSet &service = *query.MutableServiceSet();
NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk *pdisk = service.AddPDisks();
pdisk->SetNodeID(nodeId);
pdisk->SetPDiskID(pdiskId);
Expand Down Expand Up @@ -125,6 +126,14 @@ namespace NKikimr::NBsController {
break;
}

// when the shred state says this PDisk should be shredded, add it to the shred request of its node's query
// together with the current shred generation (which is required to be set at this point)
if (auto& shred = Self->ShredState; shred.ShouldShred(fullPDiskId, pdiskInfo)) {
const auto& generation = shred.GetCurrentGeneration();
Y_ABORT_UNLESS(generation);
auto *m = query.MutableShredRequest();
m->SetShredGeneration(*generation);
m->AddPDiskIds(fullPDiskId.PDiskId);
}

return pdisk;
}

Expand Down Expand Up @@ -505,6 +514,7 @@ namespace NKikimr::NBsController {
CommitStoragePoolStatUpdates(state);
CommitSysViewUpdates(state);
CommitVirtualGroupUpdates(state);
CommitShredUpdates(state);

// add updated and remove deleted vslots from VSlotReadyTimestampQ
const TMonotonic now = TActivationContext::Monotonic();
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/mind/bscontroller/config_fit_pdisks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ namespace NKikimr {
/* nextVslotId */ 1000, disk.PDiskConfig, disk.BoxId, DefaultMaxSlots,
NKikimrBlobStorage::EDriveStatus::ACTIVE, /* statusTimestamp */ TInstant::Zero(),
NKikimrBlobStorage::EDecommitStatus::DECOMMIT_NONE, NBsController::TPDiskMood::Normal,
disk.Serial, disk.LastSeenSerial, disk.LastSeenPath, staticSlotUsage);
disk.Serial, disk.LastSeenSerial, disk.LastSeenPath, staticSlotUsage,
true /* assume shred completed for this disk */);

// Set PDiskId and Guid in DrivesSerials
if (auto info = state.DrivesSerials.FindForUpdate(disk.Serial)) {
Expand Down
Loading

0 comments on commit 0d933fd

Please sign in to comment.