diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index 304b734f8f5e..b9322ef7ed46 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -100,6 +100,8 @@ STATEFN(TNodeWarden::StateOnline) { hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); hFunc(NPDisk::TEvSlayResult, Handle); + hFunc(NPDisk::TEvShredPDiskResult, Handle); + hFunc(NPDisk::TEvShredPDisk, Handle); hFunc(TEvRegisterPDiskLoadActor, Handle); @@ -575,6 +577,50 @@ void TNodeWarden::Handle(NPDisk::TEvSlayResult::TPtr ev) { }; } +void TNodeWarden::Handle(NPDisk::TEvShredPDiskResult::TPtr ev) { + ProcessShredStatus(ev->Cookie, ev->Get()->ShredGeneration, ev->Get()->Status == NKikimrProto::OK ? std::nullopt : + std::make_optional(TStringBuilder() << "failed to shred PDisk Status# " << NKikimrProto::EReplyStatus_Name( + ev->Get()->Status))); +} + +void TNodeWarden::Handle(NPDisk::TEvShredPDisk::TPtr ev) { + // the message has returned to sender -- PDisk was terminated before processing it; normally it must never happen, + // because NodeWarden issues PoisonPill synchronously with removing PDisk from the LocalPDisks set + ProcessShredStatus(ev->Cookie, ev->Get()->ShredGeneration, "PDisk has been terminated before it got shredded"); + Y_DEBUG_ABORT("unexpected case"); +} + +void TNodeWarden::ProcessShredStatus(ui64 cookie, ui64 generation, std::optional error) { + const auto it = ShredInFlight.find(cookie); + const std::optional key = it != ShredInFlight.end() ? std::make_optional(it->second) : std::nullopt; + if (it != ShredInFlight.end()) { + ShredInFlight.erase(it); + } + + const auto pdiskIt = key ? LocalPDisks.find(*key) : LocalPDisks.end(); + TPDiskRecord *pdisk = pdiskIt != LocalPDisks.end() ? 
&pdiskIt->second : nullptr; + if (pdisk) { + const size_t numErased = pdisk->ShredCookies.erase(cookie); + Y_ABORT_UNLESS(numErased); + } + + STLOG(PRI_DEBUG, BS_SHRED, BSSN00, "processing shred result from PDisk", + (Cookie, cookie), + (PDiskId, key), + (ShredGeneration, generation), + (ErrorReason, error), + (ShredGenerationIssued, pdisk ? pdisk->ShredGenerationIssued : std::nullopt)); + + if (pdisk && generation == pdisk->ShredGenerationIssued) { + if (error) { + pdisk->ShredState.emplace(std::move(*error)); + } else { + pdisk->ShredState.emplace(generation); + } + SendPDiskReport(key->PDiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_SHRED, pdisk->ShredState); + } +} + void TNodeWarden::Handle(TEvRegisterPDiskLoadActor::TPtr ev) { Send(ev.Get()->Sender, new TEvRegisterPDiskLoadActorResult(NextLocalPDiskInitOwnerRound())); } @@ -609,6 +655,56 @@ void TNodeWarden::Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr ApplyGroupInfo(groupId, generation, nullptr, false, false); } } + + if (record.HasShredRequest()) { + const auto& request = record.GetShredRequest(); + const ui64 generation = request.GetShredGeneration(); + for (ui32 pdiskId : request.GetPDiskIds()) { + const TPDiskKey key(LocalNodeId, pdiskId); + if (const auto it = LocalPDisks.find(key); it != LocalPDisks.end()) { + TPDiskRecord& pdisk = it->second; + + auto issueShredRequestToPDisk = [&] { + const ui64 cookie = ++LastShredCookie; + ShredInFlight.emplace(cookie, key); + pdisk.ShredCookies.insert(cookie); + + const TActorId actorId = SelfId(); + auto ev = std::make_unique(generation); + TActivationContext::Send(new IEventHandle(MakeBlobStoragePDiskID(LocalNodeId, pdiskId), SelfId(), + ev.release(), IEventHandle::FlagForwardOnNondelivery, cookie, &actorId)); + pdisk.ShredGenerationIssued.emplace(generation); + + STLOG(PRI_DEBUG, BS_SHRED, BSSN01, "sending shred query to PDisk", + (Cookie, cookie), + (PDiskId, key), + (ShredGeneration, generation)); + }; + + if 
(pdisk.ShredGenerationIssued == generation) { + std::visit(TOverloaded{ + [&](std::monostate&) { + // shredding is in progress, do nothing + }, + [&](ui64& generation) { + // shredding has already completed for this generation, report it + SendPDiskReport(pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_SHRED, generation); + }, + [&](TString& /*aborted*/) { + // shredding finished with error last time, restart + issueShredRequestToPDisk(); + } + }, pdisk.ShredState); + } else if (pdisk.ShredGenerationIssued < generation) { + issueShredRequestToPDisk(); + } else { + SendPDiskReport(pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_SHRED, "obsolete generation"); + } + } else { + SendPDiskReport(pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::PD_SHRED, "PDisk not found"); + } + } + } } void TNodeWarden::SendDropDonorQuery(ui32 nodeId, ui32 pdiskId, ui32 vslotId, const TVDiskID& vdiskId, TDuration backoff) { diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index c37a25ce2bd3..753946503ed0 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -61,6 +61,10 @@ namespace NKikimr::NStorage { friend bool operator ==(const TPDiskKey& x, const TPDiskKey& y) { return x.NodeId == y.NodeId && x.PDiskId == y.PDiskId; } + + TString ToString() const { + return TStringBuilder() << '[' << NodeId << ':' << PDiskId << ']'; + } }; struct TUnreportedMetricTag {}; @@ -78,6 +82,10 @@ namespace NKikimr::NStorage { ui32 RefCount = 0; bool Temporary = false; + std::optional ShredGenerationIssued; + std::variant ShredState; // not issued, finished with generation, aborted + THashSet ShredCookies; + TPDiskRecord(NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk record) : Record(std::move(record)) {} @@ -113,6 +121,8 @@ namespace NKikimr::NStorage { std::map LocalPDisks; TIntrusiveList PDisksWithUnreportedMetrics; std::map 
PDiskRestartRequests; + ui64 LastShredCookie = 0; + THashMap ShredInFlight; struct TPDiskByPathInfo { TPDiskKey RunningPDiskId; // currently running PDiskId @@ -535,6 +545,9 @@ namespace NKikimr::NStorage { void Handle(TEvInterconnect::TEvNodeInfo::TPtr ev); void Handle(TEvInterconnect::TEvNodesInfo::TPtr ev); void Handle(NPDisk::TEvSlayResult::TPtr ev); + void Handle(NPDisk::TEvShredPDiskResult::TPtr ev); + void Handle(NPDisk::TEvShredPDisk::TPtr ev); + void ProcessShredStatus(ui64 cookie, ui64 generation, std::optional error); void Handle(TEvRegisterPDiskLoadActor::TPtr ev); void Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr ev); @@ -543,7 +556,8 @@ namespace NKikimr::NStorage { void SendVDiskReport(TVSlotId vslotId, const TVDiskID& vdiskId, NKikimrBlobStorage::TEvControllerNodeReport::EVDiskPhase phase, TDuration backoff = {}); - void SendPDiskReport(ui32 pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::EPDiskPhase phase); + void SendPDiskReport(ui32 pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::EPDiskPhase phase, + std::variant shredState = {}); void Handle(TEvBlobStorage::TEvControllerUpdateDiskStatus::TPtr ev); void Handle(TEvBlobStorage::TEvControllerGroupMetricsExchange::TPtr ev); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp index d29112084e1e..fa9d3f9ef000 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp @@ -224,6 +224,9 @@ namespace NKikimr::NStorage { Y_ABORT_UNLESS(jt != PDiskByPath.end() && jt->second.RunningPDiskId == it->first); pending = std::move(jt->second.Pending); PDiskByPath.erase(jt); + for (ui64 cookie : it->second.ShredCookies) { + ShredInFlight.erase(cookie); + } LocalPDisks.erase(it); PDiskRestartInFlight.erase(pdiskId); @@ -250,7 +253,8 @@ namespace NKikimr::NStorage { } } - void TNodeWarden::SendPDiskReport(ui32 pdiskId, 
NKikimrBlobStorage::TEvControllerNodeReport::EPDiskPhase phase) { + void TNodeWarden::SendPDiskReport(ui32 pdiskId, NKikimrBlobStorage::TEvControllerNodeReport::EPDiskPhase phase, + std::variant shredState) { STLOG(PRI_DEBUG, BS_NODE, NW41, "SendPDiskReport", (PDiskId, pdiskId), (Phase, phase)); auto report = std::make_unique(LocalNodeId); @@ -258,6 +262,17 @@ namespace NKikimr::NStorage { pReport->SetPDiskId(pdiskId); pReport->SetPhase(phase); + const TPDiskKey key(LocalNodeId, pdiskId); + if (const auto it = LocalPDisks.find(key); it != LocalPDisks.end() && it->second.Record.HasPDiskGuid()) { + pReport->SetPDiskGuid(it->second.Record.GetPDiskGuid()); + } + + std::visit(TOverloaded{ + [](std::monostate&) {}, + [pReport](ui64& generation) { pReport->SetShredGenerationFinished(generation); }, + [pReport](TString& aborted) { pReport->SetShredAborted(aborted); } + }, shredState); + SendToController(std::move(report)); } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp b/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp index 019246d0e3db..65cdc0267ed5 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp @@ -83,6 +83,21 @@ void TNodeWarden::SendRegisterNode() { FillInVDiskStatus(ev->Record.MutableVDiskStatus(), true); ev->Record.SetDeclarativePDiskManagement(true); + for (const auto& [key, pdisk] : LocalPDisks) { + if (pdisk.ShredGenerationIssued) { + auto *item = ev->Record.AddShredStatus(); + item->SetPDiskId(key.PDiskId); + if (pdisk.Record.HasPDiskGuid()) { + item->SetPDiskGuid(pdisk.Record.GetPDiskGuid()); + } + std::visit(TOverloaded{ + [item](const std::monostate&) { item->SetShredInProgress(true); }, + [item](const ui64& generation) { item->SetShredGenerationFinished(generation); }, + [item](const TString& aborted) { item->SetShredAborted(aborted); } + }, pdisk.ShredState); + } + } + SendToController(std::move(ev)); } diff --git 
a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 31f09470895b..fdad5c331e2f 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -1083,7 +1083,7 @@ namespace NKikimr { LOG_DEBUG_S(ctx, BS_VDISK_BLOCK, VCtx->VDiskLogPrefix << "TEvVBlock: tabletId# " << tabletId << " gen# " << gen - << " Marker# BSVS14"); + << " Marker# BSVS00"); TLsnSeg seg; ui32 actGen = 0; @@ -1824,6 +1824,7 @@ namespace NKikimr { } UpdateReplState(); RunBalancing(ctx); + ProcessShredQ(); } void SkeletonErrorState(const TActorContext &ctx, @@ -2637,6 +2638,52 @@ namespace NKikimr { } } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Shredding support + + std::deque> ShredQ; + + template + void HandleShredEnqueue(TAutoPtr> ev) { + STLOG(PRI_DEBUG, BS_SHRED, BSSV00, VCtx->VDiskLogPrefix << "enqueued shred event", (Type, ev->GetTypeRewrite())); + ShredQ.emplace_back(ev.Release()); + } + + void ProcessShredQ() { + for (auto& ev : std::exchange(ShredQ, {})) { + TAutoPtr temp(ev.release()); + Receive(temp); + } + } + + void HandleShred(NPDisk::TEvPreShredCompactVDisk::TPtr ev) { + STLOG(PRI_DEBUG, BS_SHRED, BSSV01, VCtx->VDiskLogPrefix << "processing TEvPreShredCompactVDisk", + (ShredGeneration, ev->Get()->ShredGeneration)); + Send(ev->Sender, new NPDisk::TEvPreShredCompactVDiskResult(PDiskCtx->Dsk->Owner, PDiskCtx->Dsk->OwnerRound, + ev->Get()->ShredGeneration, NKikimrProto::OK, TString()), 0, ev->Cookie); + } + + void HandleShred(NPDisk::TEvShredVDisk::TPtr ev) { + STLOG(PRI_DEBUG, BS_SHRED, BSSV02, VCtx->VDiskLogPrefix << "processing TEvShredVDisk", + (ShredGeneration, ev->Get()->ShredGeneration)); + Send(ev->Sender, new NPDisk::TEvShredVDiskResult(PDiskCtx->Dsk->Owner, PDiskCtx->Dsk->OwnerRound, + ev->Get()->ShredGeneration, NKikimrProto::OK, TString()), 0, 
ev->Cookie); + } + + void HandleShredError(NPDisk::TEvPreShredCompactVDisk::TPtr ev) { + STLOG(PRI_DEBUG, BS_SHRED, BSSV03, VCtx->VDiskLogPrefix << "processing TEvPreShredCompactVDisk in error state", + (ShredGeneration, ev->Get()->ShredGeneration)); + Send(ev->Sender, new NPDisk::TEvPreShredCompactVDiskResult(PDiskCtx->Dsk->Owner, PDiskCtx->Dsk->OwnerRound, + ev->Get()->ShredGeneration, NKikimrProto::ERROR, "VDisk is in error state"), 0, ev->Cookie); + } + + void HandleShredError(NPDisk::TEvShredVDisk::TPtr ev) { + STLOG(PRI_DEBUG, BS_SHRED, BSSV04, VCtx->VDiskLogPrefix << "processing TEvShredVDisk in error state", + (ShredGeneration, ev->Get()->ShredGeneration)); + Send(ev->Sender, new NPDisk::TEvShredVDiskResult(PDiskCtx->Dsk->Owner, PDiskCtx->Dsk->OwnerRound, + ev->Get()->ShredGeneration, NKikimrProto::ERROR, "VDisk is in error state"), 0, ev->Cookie); + } + // NOTES: we have 4 state functions, one of which is an error state (StateDatabaseError) and // others are good: StateLocalRecovery, StateSyncGuidRecovery, StateNormal // We switch between states in the following manner: @@ -2696,6 +2743,8 @@ namespace NKikimr { hFunc(NPDisk::TEvChunkForgetResult, Handle) FFunc(TEvPrivate::EvCheckSnapshotExpiration, CheckSnapshotExpiration) hFunc(TEvReplInvoke, HandleReplNotInProgress) + hFunc(NPDisk::TEvPreShredCompactVDisk, HandleShredEnqueue) + hFunc(NPDisk::TEvShredVDisk, HandleShredEnqueue) ) COUNTED_STRICT_STFUNC(StateSyncGuidRecovery, @@ -2749,6 +2798,8 @@ namespace NKikimr { hFunc(NPDisk::TEvChunkForgetResult, Handle) FFunc(TEvPrivate::EvCheckSnapshotExpiration, CheckSnapshotExpiration) hFunc(TEvReplInvoke, HandleReplNotInProgress) + hFunc(NPDisk::TEvPreShredCompactVDisk, HandleShredEnqueue) + hFunc(NPDisk::TEvShredVDisk, HandleShredEnqueue) ) COUNTED_STRICT_STFUNC(StateNormal, @@ -2819,6 +2870,8 @@ namespace NKikimr { FFunc(TEvPrivate::EvCheckSnapshotExpiration, CheckSnapshotExpiration) hFunc(TEvReplInvoke, Handle) CFunc(TEvStartBalancing::EventType, RunBalancing) 
+ hFunc(NPDisk::TEvPreShredCompactVDisk, HandleShred) + hFunc(NPDisk::TEvShredVDisk, HandleShred) ) COUNTED_STRICT_STFUNC(StateDatabaseError, @@ -2846,6 +2899,8 @@ namespace NKikimr { hFunc(NPDisk::TEvChunkForgetResult, Handle) FFunc(TEvPrivate::EvCheckSnapshotExpiration, CheckSnapshotExpiration) hFunc(TEvReplInvoke, HandleReplNotInProgress) + hFunc(NPDisk::TEvPreShredCompactVDisk, HandleShredError) + hFunc(NPDisk::TEvShredVDisk, HandleShredError) ) PDISK_TERMINATE_STATE_FUNC_DEF; diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp index a0b22acc05a0..072292476d1f 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp @@ -104,7 +104,7 @@ namespace NKikimr { << " OriginalBlobId# " << OriginalId << " PatchedBlobId# " << PatchedId << " ErrorReason# " << ErrorReason - << " Marker# BSVSP01"); + << " Marker# BSVSP00"); SendVDiskResponse(ctx, Event->Sender, vMovedPatchResult.release(), Event->Cookie, VCtx); PassAway(); } diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp index 3ec06132f627..6ad9b10ff166 100644 --- a/ydb/core/mind/bscontroller/bsc.cpp +++ b/ydb/core/mind/bscontroller/bsc.cpp @@ -438,6 +438,7 @@ STFUNC(TBlobStorageController::StateWork) { hFunc(TEvTabletPipe::TEvClientDestroyed, ConsoleInteraction->Handle); hFunc(TEvBlobStorage::TEvGetBlockResult, ConsoleInteraction->Handle); fFunc(TEvBlobStorage::EvControllerShredRequest, EnqueueIncomingEvent); + cFunc(TEvPrivate::EvUpdateShredState, ShredState.HandleUpdateShredState); default: if (!HandleDefaultEvents(ev, SelfId())) { STLOG(PRI_ERROR, BS_CONTROLLER, BSC06, "StateWork unexpected event", (Type, type), diff --git a/ydb/core/mind/bscontroller/config.cpp b/ydb/core/mind/bscontroller/config.cpp index faca35b91ded..02da306c8b7f 100644 --- a/ydb/core/mind/bscontroller/config.cpp +++ 
b/ydb/core/mind/bscontroller/config.cpp @@ -93,7 +93,8 @@ namespace NKikimr::NBsController { const ui32 nodeId = fullPDiskId.NodeId; const ui32 pdiskId = fullPDiskId.PDiskId; - NKikimrBlobStorage::TNodeWardenServiceSet &service = *Services[nodeId].MutableServiceSet(); + auto& query = Services[nodeId]; + NKikimrBlobStorage::TNodeWardenServiceSet &service = *query.MutableServiceSet(); NKikimrBlobStorage::TNodeWardenServiceSet::TPDisk *pdisk = service.AddPDisks(); pdisk->SetNodeID(nodeId); pdisk->SetPDiskID(pdiskId); @@ -125,6 +126,14 @@ namespace NKikimr::NBsController { break; } + if (auto& shred = Self->ShredState; shred.ShouldShred(fullPDiskId, pdiskInfo)) { + const auto& generation = shred.GetCurrentGeneration(); + Y_ABORT_UNLESS(generation); + auto *m = query.MutableShredRequest(); + m->SetShredGeneration(*generation); + m->AddPDiskIds(fullPDiskId.PDiskId); + } + return pdisk; } @@ -505,6 +514,7 @@ namespace NKikimr::NBsController { CommitStoragePoolStatUpdates(state); CommitSysViewUpdates(state); CommitVirtualGroupUpdates(state); + CommitShredUpdates(state); // add updated and remove deleted vslots from VSlotReadyTimestampQ const TMonotonic now = TActivationContext::Monotonic(); diff --git a/ydb/core/mind/bscontroller/config_fit_pdisks.cpp b/ydb/core/mind/bscontroller/config_fit_pdisks.cpp index 26cf98546d1f..13c07470d7d1 100644 --- a/ydb/core/mind/bscontroller/config_fit_pdisks.cpp +++ b/ydb/core/mind/bscontroller/config_fit_pdisks.cpp @@ -344,7 +344,8 @@ namespace NKikimr { /* nextVslotId */ 1000, disk.PDiskConfig, disk.BoxId, DefaultMaxSlots, NKikimrBlobStorage::EDriveStatus::ACTIVE, /* statusTimestamp */ TInstant::Zero(), NKikimrBlobStorage::EDecommitStatus::DECOMMIT_NONE, NBsController::TPDiskMood::Normal, - disk.Serial, disk.LastSeenSerial, disk.LastSeenPath, staticSlotUsage); + disk.Serial, disk.LastSeenSerial, disk.LastSeenPath, staticSlotUsage, + true /* assume shred completed for this disk */); // Set PDiskId and Guid in DrivesSerials if (auto info = 
state.DrivesSerials.FindForUpdate(disk.Serial)) { diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h index 0324cecae2d9..b488c38e0de5 100644 --- a/ydb/core/mind/bscontroller/impl.h +++ b/ydb/core/mind/bscontroller/impl.h @@ -329,6 +329,7 @@ class TBlobStorageController : public TActor, public TTa TMaybe ReadCentric; // null on old versions Table::NextVSlotId::Type NextVSlotId; // null on old versions Table::PDiskConfig::Type PDiskConfig; + bool ShredComplete; TBoxId BoxId; ui32 ExpectedSlotCount = 0; bool HasExpectedSlotCount = false; @@ -349,6 +350,8 @@ class TBlobStorageController : public TActor, public TTa TString LastSeenPath; const ui32 StaticSlotUsage = 0; + bool ShredInProgress = false; // set to true when shredding is started for this disk + template static void Apply(TBlobStorageController* /*controller*/, T&& callback) { static TTableAdapter, public TTa Table::ExpectedSerial, Table::LastSeenSerial, Table::LastSeenPath, - Table::DecommitStatus + Table::DecommitStatus, + Table::ShredComplete > adapter( &TPDiskInfo::Path, &TPDiskInfo::Kind, @@ -380,7 +384,8 @@ class TBlobStorageController : public TActor, public TTa &TPDiskInfo::ExpectedSerial, &TPDiskInfo::LastSeenSerial, &TPDiskInfo::LastSeenPath, - &TPDiskInfo::DecommitStatus + &TPDiskInfo::DecommitStatus, + &TPDiskInfo::ShredComplete ); callback(&adapter); } @@ -402,7 +407,8 @@ class TBlobStorageController : public TActor, public TTa const TString& expectedSerial, const TString& lastSeenSerial, const TString& lastSeenPath, - ui32 staticSlotUsage) + ui32 staticSlotUsage, + bool shredComplete) : HostId(hostId) , Path(path) , Kind(kind) @@ -411,6 +417,7 @@ class TBlobStorageController : public TActor, public TTa , ReadCentric(readCentric) , NextVSlotId(nextVSlotId) , PDiskConfig(std::move(pdiskConfig)) + , ShredComplete(shredComplete) , BoxId(boxId) , Status(status) , StatusTimestamp(statusTimestamp) @@ -1543,7 +1550,6 @@ class TBlobStorageController : public TActor, public 
TTa TString YamlConfig; ui32 ConfigVersion = 0; TBackoffTimer GetBlockBackoff{1, 1000}; - NKikimrBlobStorage::TShredState ShredState; THashMap> StaticPDiskMap; THashMap StaticPDiskSlotUsage; @@ -1555,6 +1561,9 @@ class TBlobStorageController : public TActor, public TTa bool Loaded = false; std::shared_ptr EnableSelfHealWithDegraded; + struct TLifetimeToken {}; + std::shared_ptr LifetimeToken = std::make_shared(); + std::set> GroupToNode; NKikimrBlobStorage::TSerialManagementStage::E SerialManagementStage @@ -1585,6 +1594,7 @@ class TBlobStorageController : public TActor, public TTa EvVSlotNotReadyHistogramUpdate, EvProcessIncomingEvent, EvUpdateHostRecords, + EvUpdateShredState, }; struct TEvUpdateSystemViews : public TEventLocal {}; @@ -1626,6 +1636,7 @@ class TBlobStorageController : public TActor, public TTa void CommitScrubUpdates(TConfigState& state, TTransactionContext& txc); void CommitStoragePoolStatUpdates(TConfigState& state); void CommitSysViewUpdates(TConfigState& state); + void CommitShredUpdates(TConfigState& state); void InitializeSelfHealState(); void FillInSelfHealGroups(TEvControllerUpdateSelfHealInfo& msg, TConfigState *state); @@ -1847,7 +1858,35 @@ class TBlobStorageController : public TActor, public TTa void ForwardToSystemViewsCollector(STATEFN_SIG); void Handle(TEvPrivate::TEvUpdateSystemViews::TPtr &ev); void Handle(TEvBlobStorage::TEvGetBlockResult::TPtr &ev); - void Handle(TEvBlobStorage::TEvControllerShredRequest::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Shred support + + class TShredState { + friend class TBlobStorageController; + friend class TTxUpdateShred; + class TImpl; + std::unique_ptr Impl; + + public: + TShredState(TBlobStorageController *self); + ~TShredState(); + void Handle(TEvBlobStorage::TEvControllerShredRequest::TPtr ev); + void OnLoad(const TString& buffer); + void Initialize(); + void HandleUpdateShredState(); + std::optional 
GetCurrentGeneration() const; + bool ShouldShred(TPDiskId pdiskId, const TPDiskInfo& pdiskInfo) const; + void OnShredFinished(TPDiskId pdiskId, TPDiskInfo& pdiskInfo, ui64 generation, TTransactionContext& txc); + void OnShredAborted(TPDiskId pdiskId, TPDiskInfo& pdiskInfo); + void OnNodeReportTxComplete(); + void OnRegisterNode(TPDiskId pdiskId, std::optional shredCompleteGeneration, bool shredInProgress, + TTransactionContext& txc); + void OnWardenConnected(TNodeId nodeId); + void Render(IOutputStream& str, const TCgiParameters& cgi); + }; + + TShredState ShredState{this}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Scrub handling @@ -2087,7 +2126,7 @@ class TBlobStorageController : public TActor, public TTa hFunc(TEvBlobStorage::TEvControllerGroupDecommittedNotify, Handle); cFunc(TEvPrivate::EvScrub, ScrubState.HandleTimer); cFunc(TEvPrivate::EvVSlotReadyUpdate, VSlotReadyUpdate); - hFunc(TEvBlobStorage::TEvControllerShredRequest, Handle); + hFunc(TEvBlobStorage::TEvControllerShredRequest, ShredState.Handle); } if (const TDuration time = TDuration::Seconds(timer.Passed()); time >= TDuration::MilliSeconds(100)) { @@ -2127,6 +2166,8 @@ class TBlobStorageController : public TActor, public TTa (Event, ev->ToString())); StateWork(ev); } + + ShredState.Initialize(); } void UpdatePDisksCounters() { diff --git a/ydb/core/mind/bscontroller/load_everything.cpp b/ydb/core/mind/bscontroller/load_everything.cpp index 8162be7fdfda..714cb70af18c 100644 --- a/ydb/core/mind/bscontroller/load_everything.cpp +++ b/ydb/core/mind/bscontroller/load_everything.cpp @@ -94,9 +94,7 @@ class TBlobStorageController::TTxLoadEverything : public TTransactionBaseYamlConfig = state.GetValue(); Self->ConfigVersion = state.GetValue(); if (state.HaveValue()) { - TString buffer = state.GetValue(); - const bool success = Self->ShredState.ParseFromString(buffer); - Y_ABORT_UNLESS(success); + 
Self->ShredState.OnLoad(state.GetValue()); } } } @@ -329,7 +327,8 @@ class TBlobStorageController::TTxLoadEverything : public TTransactionBase(), disks.GetValue(), boxId, Self->DefaultMaxSlots, disks.GetValue(), disks.GetValue(), disks.GetValue(), disks.GetValue(), disks.GetValue(), - disks.GetValue(), disks.GetValue(), staticSlotUsage); + disks.GetValue(), disks.GetValue(), staticSlotUsage, + disks.GetValueOrDefault()); if (!disks.Next()) return false; diff --git a/ydb/core/mind/bscontroller/monitoring.cpp b/ydb/core/mind/bscontroller/monitoring.cpp index 374955bc70f7..c566743ef28f 100644 --- a/ydb/core/mind/bscontroller/monitoring.cpp +++ b/ydb/core/mind/bscontroller/monitoring.cpp @@ -888,8 +888,7 @@ bool TBlobStorageController::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr e } else if (page == "OperationLogEntry") { tx.Reset(new TTxMonEvent_OperationLogEntry(ev->Sender, cgi, this)); } else if (page == "HealthEvents") { - Execute(new TTxMonEvent_HealthEvents(ev->Sender, cgi, this)); - return true; + tx.Reset(new TTxMonEvent_HealthEvents(ev->Sender, cgi, this)); } else if (page == "SelfHeal") { bool hiddenAction = cgi.Has("action") && cgi.Get("action") == "disableSelfHeal"; if (cgi.Has("disable") && cgi.Get("disable") == "1" && hiddenAction) { @@ -909,6 +908,56 @@ bool TBlobStorageController::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr e RenderGroupDetail(str, TGroupId::FromValue(groupId)); } else if (page == "Scrub") { ScrubState.Render(str); + } else if (page == "Shred") { + if (cgi.Has("startshred")) { + class TStartShredActor : public TActorBootstrapped { + TBlobStorageController* const Self; + std::weak_ptr LifetimeToken; + const ui64 Generation; + const TActorId Sender; + const TCgiParameters Cgi; + TStringStream Str; + + public: + TStartShredActor(TBlobStorageController *self, ui64 generation, TActorId sender, TCgiParameters cgi) + : Self(self) + , LifetimeToken(self->LifetimeToken) + , Generation(generation) + , Sender(sender) + , 
Cgi(std::move(cgi)) + {} + + void Bootstrap() { + if (LifetimeToken.expired()) { + Str << "BS_CONTROLLER has been terminated"; + return PassAway(); + } + Send(Self->SelfId(), new TEvBlobStorage::TEvControllerShredRequest(Generation)); + Become(&TThis::StateFunc); + } + + void Handle(TEvBlobStorage::TEvControllerShredResponse::TPtr /*ev*/) { + if (!LifetimeToken.expired()) { + Self->ShredState.Render(Str, Cgi); + } + PassAway(); + } + + void PassAway() override { + Send(Sender, new NMon::TEvRemoteHttpInfoRes(Str.Str())); + TActorBootstrapped::PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvBlobStorage::TEvControllerShredResponse, Handle); + ) + }; + RegisterWithSameMailbox(new TStartShredActor(this, FromStringWithDefault(cgi.Get("generation")), + ev->Sender, cgi)); + return true; + } else { + ShredState.Render(str, cgi); + } } else if (page == "InternalTables") { const TString table = cgi.Has("table") ? cgi.Get("table") : "pdisks"; RenderInternalTables(str, table); @@ -965,6 +1014,7 @@ void TBlobStorageController::RenderMonPage(IOutputStream& out) { (SelfHealEnable ? "enabled" : "disabled") << ")
"; out << "Health events
"; out << "Scrub state
"; + out << "Shred state
"; out << "Virtual groups
"; out << "Internal tables
"; diff --git a/ydb/core/mind/bscontroller/node_report.cpp b/ydb/core/mind/bscontroller/node_report.cpp index ed1b116f1ca9..259c58228929 100644 --- a/ydb/core/mind/bscontroller/node_report.cpp +++ b/ydb/core/mind/bscontroller/node_report.cpp @@ -83,11 +83,13 @@ class TBlobStorageController::TTxNodeReport continue; // ignore incorrect report } - TPDiskId pdiskId(record.GetNodeId(), report.GetPDiskId()); + const TPDiskId pdiskId(record.GetNodeId(), report.GetPDiskId()); TPDiskInfo *pdisk = State->PDisks.FindForUpdate(pdiskId); if (!pdisk) { continue; + } else if (report.HasPDiskGuid() && pdisk->Guid != report.GetPDiskGuid()) { + continue; // race with reused PDisk id } switch (report.GetPhase()) { @@ -99,6 +101,21 @@ class TBlobStorageController::TTxNodeReport pdisk->Mood = TPDiskMood::Normal; } break; + + case NKikimrBlobStorage::TEvControllerNodeReport::PD_SHRED: + switch (report.GetShredStateCase()) { + case NKikimrBlobStorage::TEvControllerNodeReport::TPDiskReport::kShredGenerationFinished: + Self->ShredState.OnShredFinished(pdiskId, *pdisk, report.GetShredGenerationFinished(), txc); + break; + + case NKikimrBlobStorage::TEvControllerNodeReport::TPDiskReport::kShredAborted: + case NKikimrBlobStorage::TEvControllerNodeReport::TPDiskReport::SHREDSTATE_NOT_SET: + STLOG(PRI_ERROR, BS_CONTROLLER, BSCTXNR00, "shred aborted due to error", (PDiskId, pdiskId), + (ErrorReason, report.GetShredAborted())); + Self->ShredState.OnShredAborted(pdiskId, *pdisk); + break; + } + break; } } @@ -117,6 +134,7 @@ class TBlobStorageController::TTxNodeReport State->ApplyConfigUpdates(); State.reset(); } + Self->ShredState.OnNodeReportTxComplete(); } }; diff --git a/ydb/core/mind/bscontroller/register_node.cpp b/ydb/core/mind/bscontroller/register_node.cpp index 6c6c46900ba4..c060a5c2dab7 100644 --- a/ydb/core/mind/bscontroller/register_node.cpp +++ b/ydb/core/mind/bscontroller/register_node.cpp @@ -308,6 +308,15 @@ class TBlobStorageController::TTxRegisterNode } 
Self->ReadPDisk(it->first, *pdisk, Response.get(), entityStatus); + + const TPDiskId pdiskId(it->first); + if (auto& shred = Self->ShredState; shred.ShouldShred(pdiskId, *pdisk)) { + const auto& generation = shred.GetCurrentGeneration(); + Y_ABORT_UNLESS(generation); + auto *m = Response->Record.MutableShredRequest(); + m->SetShredGeneration(*generation); + m->AddPDiskIds(pdiskId.PDiskId); + } } Response->Record.SetInstanceId(Self->InstanceId); @@ -324,6 +333,27 @@ class TBlobStorageController::TTxRegisterNode Self->GroupToNode.emplace(TGroupId::FromValue(groupId), nodeId); } + for (const auto& status : record.GetShredStatus()) { + const TPDiskId pdiskId(nodeId, status.GetPDiskId()); + + switch (status.GetShredStateCase()) { + case NKikimrBlobStorage::TEvControllerRegisterNode::TShredStatus::kShredGenerationFinished: + Self->ShredState.OnRegisterNode(pdiskId, status.GetShredGenerationFinished(), false, txc); + break; + + case NKikimrBlobStorage::TEvControllerRegisterNode::TShredStatus::kShredAborted: + case NKikimrBlobStorage::TEvControllerRegisterNode::TShredStatus::SHREDSTATE_NOT_SET: + STLOG(PRI_ERROR, BS_CONTROLLER, BSCTXRN08, "shred aborted due to error", (PDiskId, pdiskId), + (ErrorReason, status.GetShredAborted())); + Self->ShredState.OnRegisterNode(pdiskId, std::nullopt, false, txc); + break; + + case NKikimrBlobStorage::TEvControllerRegisterNode::TShredStatus::kShredInProgress: + Self->ShredState.OnRegisterNode(pdiskId, std::nullopt, true, txc); + break; + } + } + return true; } @@ -332,6 +362,7 @@ class TBlobStorageController::TTxRegisterNode Self->SendInReply(*Request, std::move(Response)); Self->Execute(new TTxUpdateNodeDrives(std::move(UpdateNodeDrivesRecord), Self)); } + Self->ShredState.OnNodeReportTxComplete(); } }; @@ -517,6 +548,8 @@ void TBlobStorageController::OnWardenConnected(TNodeId nodeId, TActorId serverId } node.LastConnectTimestamp = TInstant::Now(); + + ShredState.OnWardenConnected(nodeId); } void 
TBlobStorageController::OnWardenDisconnected(TNodeId nodeId, TActorId serverId) { diff --git a/ydb/core/mind/bscontroller/scheme.h b/ydb/core/mind/bscontroller/scheme.h index 5824b32b2b1f..162e5d961862 100644 --- a/ydb/core/mind/bscontroller/scheme.h +++ b/ydb/core/mind/bscontroller/scheme.h @@ -42,10 +42,12 @@ struct Schema : NIceDb::Schema { struct LastSeenPath : Column<16, NScheme::NTypeIds::String> {}; struct DecommitStatus : Column<17, NScheme::NTypeIds::Uint32> { using Type = NKikimrBlobStorage::EDecommitStatus; static constexpr Type Default = Type::DECOMMIT_NONE; }; struct Mood : Column<18, NScheme::NTypeIds::Uint8> { using Type = TPDiskMood::EValue; static constexpr Type Default = Type::Normal; }; + struct ShredComplete : Column<19, NScheme::NTypeIds::Bool> { static constexpr Type Default = true; }; using TKey = TableKey; // order is important using TColumns = TableColumns; + Status, Timestamp, PDiskConfig, ExpectedSerial, LastSeenSerial, LastSeenPath, DecommitStatus, Mood, + ShredComplete>; }; struct Group : Table<4> { diff --git a/ydb/core/mind/bscontroller/shred.cpp b/ydb/core/mind/bscontroller/shred.cpp index b1f8e403c6ef..243aadef31ce 100644 --- a/ydb/core/mind/bscontroller/shred.cpp +++ b/ydb/core/mind/bscontroller/shred.cpp @@ -1,63 +1,522 @@ #include "impl.h" +#include "config.h" namespace NKikimr::NBsController { class TBlobStorageController::TTxUpdateShred : public TTransactionBase { + TBlobStorageController::TShredState::TImpl *Impl; const NKikimrBlobStorage::TEvControllerShredRequest Request; const TActorId Sender; const ui64 Cookie; const TActorId InterconnectSession; + bool Action = false; public: - TTxUpdateShred(TBlobStorageController *controller, TEvBlobStorage::TEvControllerShredRequest::TPtr ev) - : TTransactionBase(controller) - , Request(std::move(ev->Get()->Record)) - , Sender(ev->Sender) - , Cookie(ev->Cookie) - , InterconnectSession(ev->InterconnectSession) + TTxUpdateShred(TShredState::TImpl *impl, 
TEvBlobStorage::TEvControllerShredRequest::TPtr ev); + TTxType GetTxType() const override { return NBlobStorageController::TXTYPE_UPDATE_SHRED; } + bool Execute(TTransactionContext& txc, const TActorContext&) override; + void Complete(const TActorContext&) override; + }; + + class TBlobStorageController::TShredState::TImpl { + friend class TTxUpdateShred; + TBlobStorageController* const Self; + NKikimrBlobStorage::TShredState ShredState; + THashMap GroupShredForbidden; + THashMap PDiskShredForbidden; + THashSet PDiskShredComplete; + bool Ready = false; + + public: + TImpl(TBlobStorageController *self) + : Self(self) {} - TTxType GetTxType() const override { return NBlobStorageController::TXTYPE_UPDATE_SHRED; } + void Handle(TEvBlobStorage::TEvControllerShredRequest::TPtr ev) { + STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "received TEvControllerShredRequest", (Record, ev->Get()->Record)); + Self->Execute(new TTxUpdateShred(this, ev)); + } + + void OnLoad(const TString& buffer) { + const bool success = ShredState.ParseFromString(buffer); + Y_ABORT_UNLESS(success); + } + + void Initialize() { + for (const auto& [pdiskId, pdiskInfo] : Self->PDisks) { + if (pdiskInfo->ShredComplete) { + PDiskShredComplete.insert(pdiskId); + } + } + TActivationContext::Schedule(TDuration::Seconds(20), new IEventHandle(TEvPrivate::EvUpdateShredState, 0, + Self->SelfId(), TActorId(), nullptr, 0)); + } + + void HandleUpdateShredState() { + Ready = true; + UpdateShredState(0); + } - bool Execute(TTransactionContext& txc, const TActorContext&) override { - auto& current = Self->ShredState; - if (Request.HasGeneration() && (!current.HasGeneration() || current.GetGeneration() < Request.GetGeneration())) { - // reset shred state to initial one with newly provided generation - current.SetGeneration(Request.GetGeneration()); - current.SetCompleted(false); - current.SetProgress10k(0); + void UpdateShredState(TNodeId nodeId) { + if (!ShredState.HasGeneration() || ShredState.GetCompleted() || !Ready) { + 
return; // nothing to do or everything is already done + } + + THashMap> outbox; + + auto& pdisks = Self->PDisks; + const TPDiskId from(nodeId, Min()); + const TPDiskId to(nodeId ? nodeId : Max(), Max()); + for (auto it = pdisks.lower_bound(from); it != pdisks.end() && !(to < it->first); ++it) { + const auto& [pdiskId, pdiskInfo] = *it; + + if (pdiskInfo->ShredInProgress || pdiskInfo->ShredComplete || PDiskShredForbidden.contains(pdiskId)) { + continue; + } + + if (TNodeInfo *node = Self->FindNode(pdiskId.NodeId); !node || !node->ConnectedServerId) { + continue; + } + + // disallow shredding for PDisks sharing same groups as this one + for (const auto& [id, vslot] : pdiskInfo->VSlotsOnPDisk) { + if (vslot->Group) { + ++GroupShredForbidden[vslot->Group->ID]; + for (const auto& groupSlot : vslot->Group->VDisksInGroup) { + ++PDiskShredForbidden[groupSlot->VSlotId.ComprisingPDiskId()]; + } + } + } + + pdiskInfo->ShredInProgress = true; + + auto& ptr = outbox[pdiskId.NodeId]; + if (!ptr) { + ptr.reset(new TEvBlobStorage::TEvControllerNodeServiceSetUpdate); + } + auto& record = ptr->Record; + record.SetNodeID(pdiskId.NodeId); + auto *shredRequest = record.MutableShredRequest(); + shredRequest->SetShredGeneration(ShredState.GetGeneration()); + shredRequest->AddPDiskIds(pdiskId.PDiskId); + } + + for (auto& [nodeId, ev] : outbox) { + STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "issuing TEvControllerNodeServiceSetUpdate", (NodeId, nodeId), + (Record, ev->Record)); + Self->Send(MakeBlobStorageNodeWardenID(nodeId), ev.release()); + } + } + + std::optional GetCurrentGeneration() const { + return ShredState.HasGeneration() ? 
std::make_optional(ShredState.GetGeneration()) : std::nullopt; + } - // serialize it to string and update database + bool ShouldShred(TPDiskId /*pdiskId*/, const TPDiskInfo& pdiskInfo) const { + if (!ShredState.HasGeneration() || pdiskInfo.ShredComplete) { + return false; // PDisk is already shredded + } else if (pdiskInfo.ShredInProgress) { + return true; + } else { + return false; + } + } + + void OnShredFinished(TPDiskId pdiskId, TPDiskInfo& pdiskInfo, ui64 generation, TTransactionContext& txc) { + STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "shred finished", (PDiskId, pdiskId), (Generation, generation), + (ShredInProgress, pdiskInfo.ShredInProgress), (ShredComplete, pdiskInfo.ShredComplete), + (CurrentGeneration, GetCurrentGeneration())); + + if (pdiskInfo.ShredInProgress && !pdiskInfo.ShredComplete && generation == GetCurrentGeneration()) { + pdiskInfo.ShredComplete = true; + + const auto [it, inserted] = PDiskShredComplete.insert(pdiskId); + Y_ABORT_UNLESS(inserted); + + // recalculate progress + const ui32 complete = PDiskShredComplete.size(); + const ui32 total = Self->PDisks.size(); + Y_ABORT_UNLESS(complete <= total); + + ShredState.SetProgress10k(complete * 10'000ULL / total); + ShredState.SetCompleted(complete == total); + NIceDb::TNiceDb db(txc.DB); TString buffer; - const bool success = current.SerializeToString(&buffer); + const bool success = ShredState.SerializeToString(&buffer); Y_ABORT_UNLESS(success); - NIceDb::TNiceDb db(txc.DB); db.Table().Key(true).Update(buffer); } - return true; + EndShredForPDisk(pdiskId, pdiskInfo); + } + + void OnShredAborted(TPDiskId pdiskId, TPDiskInfo& pdiskInfo) { + STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "shred aborted", (PDiskId, pdiskId), + (ShredInProgress, pdiskInfo.ShredInProgress), (ShredComplete, pdiskInfo.ShredComplete)); + + EndShredForPDisk(pdiskId, pdiskInfo); + } + + void OnNodeReportTxComplete() { + UpdateShredState(0); // update all nodes as this report may have unblocked some PDisks from shredding } - void 
Complete(const TActorContext&) override { - auto ev = std::make_unique(); - auto& r = ev->Record; - const auto& current = Self->ShredState; - if (current.HasGeneration()) { - r.SetCurrentGeneration(current.GetGeneration()); - r.SetCompleted(current.GetCompleted()); - r.SetProgress10k(current.GetProgress10k()); + void EndShredForPDisk(TPDiskId /*pdiskId*/, TPDiskInfo& pdiskInfo) { + if (pdiskInfo.ShredInProgress) { + pdiskInfo.ShredInProgress = false; + + for (const auto& [vdiskSlotId, vslot] : pdiskInfo.VSlotsOnPDisk) { + if (vslot->Group) { + const auto it = GroupShredForbidden.find(vslot->Group->ID); + Y_ABORT_UNLESS(it != GroupShredForbidden.end()); + if (!--it->second) { + GroupShredForbidden.erase(it); + } + + for (const auto& groupSlot : vslot->Group->VDisksInGroup) { + const auto pdiskIt = PDiskShredForbidden.find(groupSlot->VSlotId.ComprisingPDiskId()); + Y_ABORT_UNLESS(pdiskIt != PDiskShredForbidden.end()); + if (!--pdiskIt->second) { + PDiskShredForbidden.erase(pdiskIt); + } + } + } + } + } + } + + void OnRegisterNode(TPDiskId pdiskId, std::optional shredCompleteGeneration, bool shredInProgress, + TTransactionContext& txc) { + const auto it = Self->PDisks.find(pdiskId); + if (it == Self->PDisks.end()) { + return; // may be some kind of race or something + } + TPDiskInfo& pdiskInfo = *it->second; + + if (shredCompleteGeneration) { + OnShredFinished(pdiskId, pdiskInfo, *shredCompleteGeneration, txc); } - auto h = std::make_unique(Sender, Self->SelfId(), ev.release(), 0, Cookie); - if (InterconnectSession) { - h->Rewrite(TEvInterconnect::EvForward, InterconnectSession); + if (pdiskInfo.ShredComplete) { + return; + } + + if (!shredInProgress) { + OnShredAborted(pdiskId, pdiskInfo); + } else if (!pdiskInfo.ShredInProgress) { + for (const auto& [id, vslot] : pdiskInfo.VSlotsOnPDisk) { + if (vslot->Group) { + ++GroupShredForbidden[vslot->Group->ID]; + for (const auto& groupSlot : vslot->Group->VDisksInGroup) { + 
++PDiskShredForbidden[groupSlot->VSlotId.ComprisingPDiskId()]; + } + } + } + pdiskInfo.ShredInProgress = true; + } + } + + void OnWardenConnected(TNodeId nodeId) { + UpdateShredState(nodeId); // try to issue new shreds specifically for this node + } + + void Render(IOutputStream& str, const TCgiParameters& cgi) { + HTML(str) { + TAG(TH3) { + str << "Shred state"; + } + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Start new shredding iteration"; + } + DIV_CLASS("panel-body") { + FORM_CLASS("form-horizontal") { + DIV_CLASS("control-group") { + LABEL_CLASS_FOR("control-label", "generation") { str << "Shred generation"; } + DIV_CLASS("controls") { + str << ""; + for (const auto& [key, value] : cgi) { + constexpr char apos = '\''; + auto escape = [&](const TString& x) { + for (char ch : x) { + switch (ch) { + case apos: str << "'"; break; + case '&': str << "&"; break; + case '<': str << "<" ; break; + case '>': str << ">" ; break; + default: str << ch ; break; + } + } + }; + str << ""; + } + } + } + DIV_CLASS("control-group") { + DIV_CLASS("controls") { + str << ""; + } + } + } + + } + } + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Settings"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { + TABLER() { + TABLEH() { str << "Parameter"; } + TABLEH() { str << "Value"; } + } + } + TABLEBODY() { + TABLER() { + TABLED() { str << "Generation"; } + TABLED() { str << (ShredState.HasGeneration() ? ToString(ShredState.GetGeneration()) : "none"); } + } + TABLER() { + TABLED() { str << "Progress"; } + TABLED() { + if (ShredState.HasProgress10k()) { + const ui32 progress = ShredState.GetProgress10k(); + str << Sprintf("%d.%02d %%", progress / 100, progress % 100); + } else { + str << "none"; + } + } + } + TABLER() { + TABLED() { str << "Completed"; } + TABLED() { str << (!ShredState.HasCompleted() || ShredState.GetCompleted() ? 
"yes" : "no"); } + } + } + } + } + } + DIV_CLASS("panel panel-info") { + DIV_CLASS("panel-heading") { + str << "Groups"; + } + DIV_CLASS("panel-body") { + TABLE_CLASS("table") { + TABLEHEAD() { + TABLER() { + TABLEH() { str << "GroupId"; } + TABLEH() { str << "Shred in progress"; } + TABLEH() { str << "PDisks"; } + } + } + TABLEBODY() { + THashSet listedPDisks; + + auto renderPDisk = [&](TPDiskId pdiskId, const TPDiskInfo& pdiskInfo) { + str << "" << pdiskId << ""; + }; + + for (const auto& [groupId, groupInfo] : Self->GroupMap) { + TABLER() { + TABLED() { str << groupId; } + + bool shredInProgress = false; + for (const auto& vslot : groupInfo->VDisksInGroup) { + shredInProgress = shredInProgress || vslot->PDisk->ShredInProgress; + } + TABLED() { str << (shredInProgress ? "yes" : "no"); } + + TABLED() { + bool first = true; + for (const auto& vslot : groupInfo->VDisksInGroup) { + if (first) { + first = false; + } else { + str << ' '; + } + renderPDisk(vslot->VSlotId.ComprisingPDiskId(), *vslot->PDisk); + listedPDisks.insert(vslot->VSlotId.ComprisingPDiskId()); + } + } + } + } + + TABLER() { + TABLED() { str << "other ones"; } + TABLED() {} + TABLED() { + for (const auto& [pdiskId, pdiskInfo] : Self->PDisks) { + if (listedPDisks.contains(pdiskId)) { + continue; + } + renderPDisk(pdiskId, *pdiskInfo); + str << "
"; + } + } + } + } + } + } + } + } + } + + void CommitShredUpdates(TConfigState& state) { + for (const auto& [base, overlay] : state.VSlots.Diff()) { + auto hasGroupOfInterest = [&](const auto& item) { + if constexpr (requires { static_cast(item); }) { + if (!item) { + return false; + } + } + return item->second && item->second->Group && GroupShredForbidden.contains(item->second->Group->ID); + }; + if (int diff = hasGroupOfInterest(overlay) - hasGroupOfInterest(base)) { + const auto it = PDiskShredForbidden.find(overlay->first.ComprisingPDiskId()); + Y_ABORT_UNLESS(it != PDiskShredForbidden.end()); + it->second += diff; + if (!it->second) { + PDiskShredForbidden.erase(it); + } + } + } + + for (const auto& [base, overlay] : state.Groups.Diff()) { + if (base && !overlay->second) { // group was deleted + GroupShredForbidden.erase(base->first); + } + } + + for (const auto& [base, overlay] : state.PDisks.Diff()) { + if (base && !overlay->second) { + Y_ABORT_UNLESS(!PDiskShredForbidden.contains(base->first)); + } } - TActivationContext::Send(h.release()); } }; - void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerShredRequest::TPtr ev) { - Execute(new TTxUpdateShred(this, ev)); + TBlobStorageController::TTxUpdateShred::TTxUpdateShred(TShredState::TImpl *impl, TEvBlobStorage::TEvControllerShredRequest::TPtr ev) + : TTransactionBase(impl->Self) + , Impl(impl) + , Request(std::move(ev->Get()->Record)) + , Sender(ev->Sender) + , Cookie(ev->Cookie) + , InterconnectSession(ev->InterconnectSession) + {} + + bool TBlobStorageController::TTxUpdateShred::Execute(TTransactionContext& txc, const TActorContext&) { + auto& current = Impl->ShredState; + if (Request.HasGeneration() && (!current.HasGeneration() || current.GetGeneration() < Request.GetGeneration())) { + // reset shred state to initial one with newly provided generation + current.SetGeneration(Request.GetGeneration()); + current.SetCompleted(false); + current.SetProgress10k(0); + + // serialize it to string 
and update database + TString buffer; + const bool success = current.SerializeToString(&buffer); + Y_ABORT_UNLESS(success); + NIceDb::TNiceDb db(txc.DB); + db.Table().Key(true).Update(buffer); + + // reset shred completion flag for all shredded PDisks + for (auto& [pdiskId, pdiskInfo] : Self->PDisks) { + if (pdiskInfo->ShredComplete) { + db.Table().Key(pdiskId.GetKey()).Update(false); + pdiskInfo->ShredComplete = false; + Impl->PDiskShredComplete.erase(pdiskId); + Action = true; + } + } + } + + return true; + } + + void TBlobStorageController::TTxUpdateShred::Complete(const TActorContext&) { + auto ev = std::make_unique(); + auto& r = ev->Record; + const auto& current = Impl->ShredState; + if (current.HasGeneration()) { + r.SetCurrentGeneration(current.GetGeneration()); + r.SetCompleted(current.GetCompleted()); + r.SetProgress10k(current.GetProgress10k()); + } + + auto h = std::make_unique(Sender, Impl->Self->SelfId(), ev.release(), 0, Cookie); + if (InterconnectSession) { + h->Rewrite(TEvInterconnect::EvForward, InterconnectSession); + } + TActivationContext::Send(h.release()); + if (Action) { + Impl->UpdateShredState(0); + } + } + + TBlobStorageController::TShredState::TShredState(TBlobStorageController *self) + : Impl(std::make_unique(self)) + {} + + TBlobStorageController::TShredState::~TShredState() = default; + + void TBlobStorageController::TShredState::Handle(TEvBlobStorage::TEvControllerShredRequest::TPtr ev) { + Impl->Handle(ev); + } + + void TBlobStorageController::TShredState::OnLoad(const TString& buffer) { + Impl->OnLoad(buffer); + } + + void TBlobStorageController::TShredState::Initialize() { + Impl->Initialize(); + } + + void TBlobStorageController::TShredState::HandleUpdateShredState() { + Impl->HandleUpdateShredState(); + } + + std::optional TBlobStorageController::TShredState::GetCurrentGeneration() const { + return Impl->GetCurrentGeneration(); + } + + bool TBlobStorageController::TShredState::ShouldShred(TPDiskId pdiskId, const TPDiskInfo& 
pdiskInfo) const { + return Impl->ShouldShred(pdiskId, pdiskInfo); + } + + void TBlobStorageController::TShredState::OnShredFinished(TPDiskId pdiskId, TPDiskInfo& pdiskInfo, ui64 generation, + TTransactionContext& txc) { + Impl->OnShredFinished(pdiskId, pdiskInfo, generation, txc); + } + + void TBlobStorageController::TShredState::OnShredAborted(TPDiskId pdiskId, TPDiskInfo& pdiskInfo) { + Impl->OnShredAborted(pdiskId, pdiskInfo); + } + + void TBlobStorageController::TShredState::OnNodeReportTxComplete() { + Impl->OnNodeReportTxComplete(); + } + + void TBlobStorageController::TShredState::OnRegisterNode(TPDiskId pdiskId, std::optional shredCompleteGeneration, + bool shredInProgress, TTransactionContext& txc) { + Impl->OnRegisterNode(pdiskId, shredCompleteGeneration, shredInProgress, txc); + } + + void TBlobStorageController::TShredState::OnWardenConnected(TNodeId nodeId) { + Impl->OnWardenConnected(nodeId); + } + + void TBlobStorageController::TShredState::Render(IOutputStream& str, const TCgiParameters& cgi) { + Impl->Render(str, cgi); + } + + void TBlobStorageController::CommitShredUpdates(TConfigState& state) { + ShredState.Impl->CommitShredUpdates(state); } } // NKikimr::NBsController diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index 36cd686171aa..d8116cf2845b 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -1089,6 +1089,18 @@ message TEvControllerRegisterNode { repeated TVDiskStatus VDiskStatus = 6; // actual status for currently operating VDisks repeated TDriveData DrivesData = 7; optional bool DeclarativePDiskManagement = 8; + + message TShredStatus { + optional uint32 PDiskId = 1; + optional uint64 PDiskGuid = 2; + oneof ShredState { + uint64 ShredGenerationFinished = 3; + string ShredAborted = 4; + bool ShredInProgress = 5; + } + } + + repeated TShredStatus ShredStatus = 9; } message TEvControllerUpdateNodeDrives { @@ -1102,6 +1114,11 @@ message 
TEvControllerNodeServiceSetUpdate { optional uint32 CurrentGeneration = 2; // current state in the database } + message TShredRequest { + optional uint64 ShredGeneration = 1; + repeated uint32 PDiskIds = 2; // a set of PDisks to start shred at + } + optional NKikimrProto.EReplyStatus Status = 1; optional uint32 NodeID = 2; optional TNodeWardenServiceSet ServiceSet = 3; @@ -1109,6 +1126,7 @@ message TEvControllerNodeServiceSetUpdate { optional string InstanceId = 5; optional bool Comprehensive = 6; // if this flag is set to true, then there will be no any other entities than elisted in the ServiceSet optional uint64 AvailDomain = 7; // availability domain of the controller + optional TShredRequest ShredRequest = 8; } message TEvControllerConfigRequest { @@ -1321,10 +1339,16 @@ message TEvControllerNodeReport { enum EPDiskPhase { PD_UNKNOWN = 0; PD_RESTARTED = 1; + PD_SHRED = 2; } message TPDiskReport { optional uint32 PDiskId = 1; optional EPDiskPhase Phase = 2; + optional uint64 PDiskGuid = 3; + oneof ShredState { + uint64 ShredGenerationFinished = 4; // if set, then PDisk's shredding generation was upped to this one + string ShredAborted = 5; // if set, then PDisk's shredding has been aborted, ShredGeneration kept as it is + } } optional uint32 NodeId = 1; diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index f573cccd412a..4255a1cfc2c9 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -54,6 +54,7 @@ enum EServiceKikimr { BS_PROXY_ASSIMILATE = 347; BS_VDISK_BALANCING = 2600; BS_PROXY_GETBLOCK = 2601; + BS_SHRED = 2602; // DATASHARD section // TX_DATASHARD = 290; // diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema index 
6e767bcbbeea..f3fc33aba42f 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_bs_controller_/flat_bs_controller.schema @@ -260,6 +260,11 @@ "ColumnName": "NodeID", "ColumnType": "Uint32" }, + { + "ColumnId": 19, + "ColumnName": "ShredComplete", + "ColumnType": "Bool" + }, { "ColumnId": 2, "ColumnName": "PDiskID", @@ -333,6 +338,7 @@ 17, 18, 1, + 19, 2, 3, 4,