diff --git a/cloud/blockstore/config/storage.proto b/cloud/blockstore/config/storage.proto index f899633f193..0143188c39c 100644 --- a/cloud/blockstore/config/storage.proto +++ b/cloud/blockstore/config/storage.proto @@ -926,4 +926,26 @@ message TStorageServiceConfig // caches. Disk agent config has higher priority than storage config. optional string CachedDiskAgentConfigPath = 350; optional string CachedDiskAgentSessionsPath = 351; + + // Max bandwidth used for shadow disk fill in MiB/s (actual bandwidth is x2 + // due to the need to read and write). + optional uint32 MaxShadowDiskFillBandwidth = 352; + // Minimum delay between shadow disk acquire attempts when writes to the + // source disk are blocked (in ms). + optional uint32 MinAcquireShadowDiskRetryDelayWhenBlocked = 353; + // Maximum delay between shadow disk acquire attempts when writes to the + // source disk are blocked (in ms). + optional uint32 MaxAcquireShadowDiskRetryDelayWhenBlocked = 354; + // Minimum delay between shadow disk acquire attempts when writes to the + // source disk are not blocked (in ms). + optional uint32 MinAcquireShadowDiskRetryDelayWhenNonBlocked = 355; + // Maximum delay between shadow disk acquire attempts when writes to the + // source disk are not blocked (in ms). + optional uint32 MaxAcquireShadowDiskRetryDelayWhenNonBlocked = 356; + // Timeout for attempts to acquire the shadow disk when writes to the source + // disk are blocked (in ms). + optional uint32 MaxAcquireShadowDiskTotalTimeoutWhenBlocked = 357; + // Timeout for attempts to acquire the shadow disk when writes to the source + // disk are not blocked (in ms). + optional uint32 MaxAcquireShadowDiskTotalTimeoutWhenNonBlocked = 358; } diff --git a/cloud/blockstore/libs/storage/core/config.cpp b/cloud/blockstore/libs/storage/core/config.cpp index 0e5e1f3e0ce..0b6eb7e1ec7 100644 --- a/cloud/blockstore/libs/storage/core/config.cpp +++ b/cloud/blockstore/libs/storage/core/config.cpp @@ -468,6 +468,15 @@ TDuration MSeconds(ui32 value) xxx(CachedAcquireRequestLifetime, TDuration, Seconds(40) )\ \ xxx(UnconfirmedBlobCountHardLimit, ui32, 1000 )\ + \ + xxx(MaxShadowDiskFillBandwidth, ui32, 500 )\ + xxx(MinAcquireShadowDiskRetryDelayWhenBlocked, TDuration, MSeconds(250) )\ + xxx(MaxAcquireShadowDiskRetryDelayWhenBlocked, TDuration, Seconds(1) )\ + xxx(MinAcquireShadowDiskRetryDelayWhenNonBlocked, TDuration, Seconds(1) )\ + xxx(MaxAcquireShadowDiskRetryDelayWhenNonBlocked, TDuration, Seconds(10) )\ + xxx(MaxAcquireShadowDiskTotalTimeoutWhenBlocked, TDuration, Seconds(5) )\ + xxx(MaxAcquireShadowDiskTotalTimeoutWhenNonBlocked, TDuration, Seconds(600) )\ + // BLOCKSTORE_STORAGE_CONFIG_RW #define BLOCKSTORE_STORAGE_CONFIG(xxx) \ diff --git a/cloud/blockstore/libs/storage/core/config.h b/cloud/blockstore/libs/storage/core/config.h index 2439670a953..1c3ca0af9b6 100644 --- a/cloud/blockstore/libs/storage/core/config.h +++ b/cloud/blockstore/libs/storage/core/config.h @@ -553,6 +553,14 @@ class TStorageConfig TString GetCachedDiskAgentConfigPath() const; TString GetCachedDiskAgentSessionsPath() const; + + ui32 GetMaxShadowDiskFillBandwidth() const; + TDuration GetMinAcquireShadowDiskRetryDelayWhenBlocked() const; + TDuration GetMaxAcquireShadowDiskRetryDelayWhenBlocked() const; + TDuration GetMinAcquireShadowDiskRetryDelayWhenNonBlocked() const; + TDuration GetMaxAcquireShadowDiskRetryDelayWhenNonBlocked() const; + TDuration GetMaxAcquireShadowDiskTotalTimeoutWhenBlocked() const; + TDuration GetMaxAcquireShadowDiskTotalTimeoutWhenNonBlocked() const; }; ui64 GetAllocationUnit( diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator.cpp index d3ca11eb6cd..2eb4cdaf8e2 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator.cpp @@ -8,20 +8,19 @@ namespace NCloud::NBlockStore::NStorage { /////////////////////////////////////////////////////////////////////////////// TMigrationTimeoutCalculator::TMigrationTimeoutCalculator( - TStorageConfigPtr config, + ui32 maxMigrationBandwidthMiBs, + ui32 expectedDiskAgentSize, TNonreplicatedPartitionConfigPtr partitionConfig) - : Config(std::move(config)) + : MaxMigrationBandwidthMiBs(maxMigrationBandwidthMiBs) + , ExpectedDiskAgentSize(expectedDiskAgentSize) , PartitionConfig(std::move(partitionConfig)) {} TDuration TMigrationTimeoutCalculator::CalculateTimeout( TBlockRange64 nextProcessingRange) const { - const ui32 maxMigrationBandwidthMiBs = Config->GetMaxMigrationBandwidth(); - const ui32 expectedDiskAgentSize = Config->GetExpectedDiskAgentSize(); - // migration range is 4_MB - const auto migrationFactorPerAgent = maxMigrationBandwidthMiBs / 4; + const auto migrationFactorPerAgent = MaxMigrationBandwidthMiBs / 4.0; if (PartitionConfig->GetUseSimpleMigrationBandwidthLimiter()) { return TDuration::Seconds(1) / migrationFactorPerAgent; @@ -40,8 +39,8 @@ TDuration TMigrationTimeoutCalculator::CalculateTimeout( } const auto factor = - Max(migrationFactorPerAgent * agentDeviceCount / expectedDiskAgentSize, - 1U); + Max(migrationFactorPerAgent * agentDeviceCount / ExpectedDiskAgentSize, + 1.0); return TDuration::Seconds(1) / factor; } diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator.h b/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator.h index 2376de99c75..1b5ab82618a 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator.h @@ -12,16 +12,18 @@ namespace NCloud::NBlockStore::NStorage { class TMigrationTimeoutCalculator { private: - const TStorageConfigPtr Config; + const ui32 MaxMigrationBandwidthMiBs = 0; + const ui32 ExpectedDiskAgentSize = 0; TNonreplicatedPartitionConfigPtr PartitionConfig; public: TMigrationTimeoutCalculator( - TStorageConfigPtr config, + ui32 maxMigrationBandwidthMiBs, + ui32 expectedDiskAgentSize, TNonreplicatedPartitionConfigPtr partitionConfig); - [[nodiscard]] TDuration CalculateTimeout( - TBlockRange64 nextProcessingRange) const; + [[nodiscard]] TDuration + CalculateTimeout(TBlockRange64 nextProcessingRange) const; }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator_ut.cpp index 6a507f0e1bd..28745ee1afc 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator_ut.cpp @@ -42,15 +42,6 @@ TDevices MakeDevices() return result; } -TStorageConfigPtr MakeStorageConfig(ui32 expectedDiskAgentSize) -{ - NProto::TStorageServiceConfig storageConfig; - storageConfig.SetMaxMigrationBandwidth(16); - storageConfig.SetExpectedDiskAgentSize(expectedDiskAgentSize); - - return std::make_shared(std::move(storageConfig), nullptr); -} - TNonreplicatedPartitionConfigPtr MakePartitionConfig( TDevices devices, bool useSimpleMigrationBandwidthLimiter) @@ -80,7 +71,8 @@ Y_UNIT_TEST_SUITE(TMigrationCalculatorTest) Y_UNIT_TEST(ShouldCalculateMigrationTimeout) { TMigrationTimeoutCalculator timeoutCalculator( - MakeStorageConfig(4), + 16, + 4, MakePartitionConfig(MakeDevices(), false)); // Devices #1, #2, #4 belong to Agent#1, device #3 belong to Agent#2. @@ -112,7 +104,8 @@ Y_UNIT_TEST_SUITE(TMigrationCalculatorTest) Y_UNIT_TEST(ShouldCalculateMigrationTimeoutWithSimpleLimiter) { TMigrationTimeoutCalculator timeoutCalculator( - MakeStorageConfig(100500), + 16, + 100500, MakePartitionConfig(MakeDevices(), true)); // When UseSimpleMigrationBandwidthLimiter enabled we expect the same diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.cpp index 7b71b9c9ebc..4f727ec37d8 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_actor.cpp @@ -40,7 +40,10 @@ TNonreplicatedPartitionMigrationActor::TNonreplicatedPartitionMigrationActor( , SrcConfig(std::move(srcConfig)) , Migrations(std::move(migrations)) , RdmaClient(std::move(rdmaClient)) - , TimeoutCalculator(Config, SrcConfig) + , TimeoutCalculator( + Config->GetMaxMigrationBandwidth(), + Config->GetExpectedDiskAgentSize(), + SrcConfig) {} void TNonreplicatedPartitionMigrationActor::OnBootstrap( diff --git a/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.cpp b/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.cpp new file mode 100644 index 00000000000..49b1898349f --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.cpp @@ -0,0 +1,1087 @@ +#include "shadow_disk_actor.h" + +#include +#include +#include +#include +#include +#include +#include + +using namespace NActors; + +namespace NCloud::NBlockStore::NStorage { + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +template +void ForwardMessageToActor( + TEvent& ev, + const NActors::TActorContext& ctx, + TActorId destActor) +{ + NActors::TActorId nondeliveryActor = ev->GetForwardOnNondeliveryRecipient(); + auto message = std::make_unique( + destActor, + ev->Sender, + ev->ReleaseBase().Release(), + ev->Flags, + ev->Cookie, + ev->Flags & NActors::IEventHandle::FlagForwardOnNondelivery + ? &nondeliveryActor + : nullptr); + ctx.Send(std::move(message)); +} + +TString GetDeviceUUIDs(const TDevices& devices) +{ + TString result; + for (const auto& device: devices) { + if (result.empty()) { + result += device.GetDeviceUUID(); + } else { + result += ", " + device.GetDeviceUUID(); + } + } + return result; +} + +bool CheckDeviceUUIDsIdentical( + const TDevices& described, + const TDevices& acquired) +{ + if (described.size() != acquired.size()) { + return false; + } + + for (int i = 0; i < described.size(); ++i) { + if (described[i].GetDeviceUUID() != acquired[i].GetDeviceUUID()) { + return false; + } + } + return true; +} + +/////////////////////////////////////////////////////////////////////////////// + +class TAcquireShadowDiskActor + : public NActors::TActorBootstrapped +{ +private: + const TString ShadowDiskId; + const TShadowDiskActor::EAcquireReason AcquireReason; + const bool ReadOnlyMount; + const TDuration TotalTimeout; + const TActorId ParentActor; + const ui64 MountSeqNumber = 0; + const ui32 Generation = 0; + const TString ShadowDiskClientId = "shadow-disk-client-id"; + + TInstant AcquireStartedAt = {}; + // The list of devices received via the describe request. + // This is necessary to check that all disk devices have been acquired. + TDevices ShadowDiskDevices; + TDevices AcquiredShadowDiskDevices; + + // Delay provider when retrying describe and acquire requests to disk + // registry. + TBackoffDelayProvider RetryDelayProvider; + +public: + TAcquireShadowDiskActor( + const TStorageConfigPtr config, + TString shadowDiskId, + const TDevices& shadowDiskDevices, + TShadowDiskActor::EAcquireReason acquireReason, + bool readOlyMount, + bool areWritesToSourceBlocked, + ui64 mountSeqNumber, + ui32 generation, + TActorId parentActor); + + void Bootstrap(const TActorContext& ctx); + +private: + void DescribeShadowDisk(const NActors::TActorContext& ctx); + void AcquireShadowDisk(const NActors::TActorContext& ctx); + + std::unique_ptr + MakeDescribeDiskRequest() const; + std::unique_ptr + MakeAcquireDiskRequest() const; + + void HandleDiskRegistryError( + const NActors::TActorContext& ctx, + const NProto::TError& error, + std::unique_ptr retryEvent, + const TString& actionName); + + void MaybeReady(const TActorContext& ctx); + void ReplyAndDie(const TActorContext& ctx, const NProto::TError& error); + +private: + STFUNC(Work); + + void HandleDescribeDiskResponse( + const TEvDiskRegistry::TEvDescribeDiskResponse::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleAcquireDiskResponse( + const TEvDiskRegistry::TEvAcquireDiskResponse::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleDescribeDiskRequestUndelivery( + const TEvDiskRegistry::TEvDescribeDiskRequest::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleAcquireDiskRequestUndelivery( + const TEvDiskRegistry::TEvAcquireDiskRequest::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleWakeup( + const NActors::TEvents::TEvWakeup::TPtr& ev, + const NActors::TActorContext& ctx); +}; + +/////////////////////////////////////////////////////////////////////////////// + +TAcquireShadowDiskActor::TAcquireShadowDiskActor( + const TStorageConfigPtr config, + TString shadowDiskId, + const TDevices& shadowDiskDevices, + TShadowDiskActor::EAcquireReason acquireReason, + bool readOnlyMount, + bool areWritesToSourceBlocked, + ui64 mountSeqNumber, + ui32 generation, + TActorId parentActor) + : ShadowDiskId(std::move(shadowDiskId)) + , AcquireReason(acquireReason) + , ReadOnlyMount(readOnlyMount) + , TotalTimeout( + areWritesToSourceBlocked + ? config->GetMaxAcquireShadowDiskTotalTimeoutWhenBlocked() + : config->GetMaxAcquireShadowDiskTotalTimeoutWhenNonBlocked()) + , ParentActor(parentActor) + , MountSeqNumber(mountSeqNumber) + , Generation(generation) + , ShadowDiskDevices(shadowDiskDevices) + , RetryDelayProvider( + areWritesToSourceBlocked + ? config->GetMinAcquireShadowDiskRetryDelayWhenBlocked() + : config->GetMinAcquireShadowDiskRetryDelayWhenNonBlocked(), + areWritesToSourceBlocked + ? config->GetMaxAcquireShadowDiskRetryDelayWhenBlocked() + : config->GetMaxAcquireShadowDiskRetryDelayWhenNonBlocked()) +{ + if (AcquireReason != TShadowDiskActor::EAcquireReason::FirstAcquire) { + STORAGE_CHECK_PRECONDITION(!ShadowDiskDevices.empty()); + } +} + +void TAcquireShadowDiskActor::Bootstrap(const TActorContext& ctx) +{ + Become(&TThis::Work); + + DescribeShadowDisk(ctx); + AcquireShadowDisk(ctx); + + AcquireStartedAt = ctx.Now(); + ctx.Schedule(TotalTimeout, new TEvents::TEvWakeup()); +} + +void TAcquireShadowDiskActor::DescribeShadowDisk( + const NActors::TActorContext& ctx) +{ + if (!ShadowDiskDevices.empty()) { + // We will not describe devices if this is not the first acquire and we + // already know them. + return; + } + LOG_INFO_S( + ctx, + TBlockStoreComponents::VOLUME, + "Describing shadow disk " << ShadowDiskId.Quote()); + + NCloud::SendWithUndeliveryTracking( + ctx, + MakeDiskRegistryProxyServiceId(), + MakeDescribeDiskRequest()); +} + +void TAcquireShadowDiskActor::AcquireShadowDisk( + const NActors::TActorContext& ctx) +{ + if (AcquireReason != TShadowDiskActor::EAcquireReason::PeriodicalReAcquire) + { + LOG_INFO_S( + ctx, + TBlockStoreComponents::VOLUME, + "Acquiring shadow disk " << ShadowDiskId.Quote() << " with timeout " + << TotalTimeout.ToString()); + } + + NCloud::SendWithUndeliveryTracking( + ctx, + MakeDiskRegistryProxyServiceId(), + MakeAcquireDiskRequest()); +} + +auto TAcquireShadowDiskActor::MakeDescribeDiskRequest() const + -> std::unique_ptr +{ + auto request = std::make_unique(); + request->Record.SetDiskId(ShadowDiskId); + return request; +} + +auto TAcquireShadowDiskActor::MakeAcquireDiskRequest() const + -> std::unique_ptr +{ + auto request = std::make_unique(); + request->Record.SetDiskId(ShadowDiskId); + request->Record.MutableHeaders()->SetClientId(ShadowDiskClientId); + request->Record.SetAccessMode( + ReadOnlyMount ? NProto::EVolumeAccessMode::VOLUME_ACCESS_READ_ONLY + : NProto::EVolumeAccessMode::VOLUME_ACCESS_READ_WRITE); + request->Record.SetMountSeqNumber(MountSeqNumber); + request->Record.SetVolumeGeneration(Generation); + return request; +} + +void TAcquireShadowDiskActor::HandleDiskRegistryError( + const NActors::TActorContext& ctx, + const NProto::TError& error, + std::unique_ptr retryEvent, + const TString& actionName) +{ + LOG_DEBUG_S( + ctx, + TBlockStoreComponents::VOLUME, + "Can't " << actionName << " shadow disk " << ShadowDiskId.Quote() + << " Error: " << FormatError(error)); + + const TInstant timeoutElapsedAt = AcquireStartedAt + TotalTimeout; + + const bool canRetry = GetErrorKind(error) == EErrorKind::ErrorRetriable && + timeoutElapsedAt > ctx.Now(); + + if (canRetry) { + LOG_WARN_S( + ctx, + TBlockStoreComponents::VOLUME, + "Will soon retry " << actionName << " shadow disk " + << ShadowDiskId.Quote() + << " Error: " << FormatError(error) + << " with delay " << RetryDelayProvider.GetDelay().ToString()); + + TActivationContext::Schedule( + RetryDelayProvider.GetDelayAndIncrease(), + std::move(retryEvent), + nullptr); + } else { + LOG_ERROR_S( + ctx, + TBlockStoreComponents::VOLUME, + "Will not retry " << actionName << " shadow disk " + << ShadowDiskId.Quote() + << " Error: " << FormatError(error)); + ReplyAndDie(ctx, error); + } +} + +void TAcquireShadowDiskActor::ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& error) +{ + auto response = + std::make_unique(error); + if (!HasError(response->GetError())) { + response->Record.MutableDevices()->Swap(&AcquiredShadowDiskDevices); + } + + NCloud::Send( + ctx, + ParentActor, + std::move(response), + static_cast(AcquireReason)); + + Die(ctx); +} + +STFUNC(TAcquireShadowDiskActor::Work) +{ + switch (ev->GetTypeRewrite()) { + HFunc( + TEvDiskRegistry::TEvDescribeDiskResponse, + HandleDescribeDiskResponse); + HFunc( + TEvDiskRegistry::TEvAcquireDiskResponse, + HandleAcquireDiskResponse); + HFunc( + TEvDiskRegistry::TEvDescribeDiskRequest, + HandleDescribeDiskRequestUndelivery); + HFunc( + TEvDiskRegistry::TEvAcquireDiskRequest, + HandleAcquireDiskRequestUndelivery); + HFunc(NActors::TEvents::TEvWakeup, HandleWakeup); + + default: + HandleUnexpectedEvent(ev, TBlockStoreComponents::SERVICE); + break; + } +} + +void TAcquireShadowDiskActor::HandleDescribeDiskResponse( + const TEvDiskRegistry::TEvDescribeDiskResponse::TPtr& ev, + const NActors::TActorContext& ctx) +{ + auto* msg = ev->Get(); + auto& record = msg->Record; + if (HasError(record.GetError())) { + HandleDiskRegistryError( + ctx, + record.GetError(), + std::make_unique( + MakeDiskRegistryProxyServiceId(), + ctx.SelfID, + MakeDescribeDiskRequest().release()), + "describe"); + return; + } + + ShadowDiskDevices.Swap(record.MutableDevices()); + MaybeReady(ctx); +} + +void TAcquireShadowDiskActor::HandleAcquireDiskResponse( + const TEvDiskRegistry::TEvAcquireDiskResponse::TPtr& ev, + const NActors::TActorContext& ctx) +{ + auto* msg = ev->Get(); + auto& record = msg->Record; + + if (HasError(record.GetError())) { + HandleDiskRegistryError( + ctx, + record.GetError(), + std::make_unique( + MakeDiskRegistryProxyServiceId(), + ctx.SelfID, + MakeAcquireDiskRequest().release()), + "acquire"); + return; + } + + AcquiredShadowDiskDevices.Swap(record.MutableDevices()); + MaybeReady(ctx); +} + +void TAcquireShadowDiskActor::HandleDescribeDiskRequestUndelivery( + const TEvDiskRegistry::TEvDescribeDiskRequest::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_UNUSED(ev); + DescribeShadowDisk(ctx); +} + +void TAcquireShadowDiskActor::HandleAcquireDiskRequestUndelivery( + const TEvDiskRegistry::TEvAcquireDiskRequest::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_UNUSED(ev); + AcquireShadowDisk(ctx); +} + +void TAcquireShadowDiskActor::HandleWakeup( + const NActors::TEvents::TEvWakeup::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_UNUSED(ev); + + LOG_ERROR_S( + ctx, + TBlockStoreComponents::VOLUME, + "Acquire timeout. Shadow disk " << ShadowDiskId.Quote()); + + ReplyAndDie(ctx, MakeError(E_TIMEOUT)); +} + +void TAcquireShadowDiskActor::MaybeReady(const NActors::TActorContext& ctx) +{ + bool gotDescribeAndAcquireResponses = + !ShadowDiskDevices.empty() && !AcquiredShadowDiskDevices.empty(); + if (!gotDescribeAndAcquireResponses) { + return; + } + + // Check all shadow disk devices have been acquired. + if (!CheckDeviceUUIDsIdentical( + ShadowDiskDevices, + AcquiredShadowDiskDevices)) + { + HandleDiskRegistryError( + ctx, + MakeError( + E_REJECTED, + TStringBuilder() + << "The acquired devices are not identical to described. " + "Described [" + << GetDeviceUUIDs(ShadowDiskDevices) << "] acquired [" + << GetDeviceUUIDs(AcquiredShadowDiskDevices) << "]"), + std::make_unique( + MakeDiskRegistryProxyServiceId(), + ctx.SelfID, + MakeAcquireDiskRequest().release(), + 0, // flags + static_cast( + TShadowDiskActor::EAcquireReason::FirstAcquire)), + "acquire"); + AcquiredShadowDiskDevices.Clear(); + return; + }; + + if (AcquireReason != TShadowDiskActor::EAcquireReason::PeriodicalReAcquire) + { + LOG_INFO_S( + ctx, + TBlockStoreComponents::VOLUME, + "Acquired shadow disk " << ShadowDiskId.Quote()); + } + + ReplyAndDie(ctx, MakeError(S_OK)); +} + +} // namespace + +/////////////////////////////////////////////////////////////////////////////// + +TShadowDiskActor::TShadowDiskActor( + TStorageConfigPtr config, + NRdma::IClientPtr rdmaClient, + IProfileLogPtr profileLog, + IBlockDigestGeneratorPtr digestGenerator, + TString rwClientId, + ui64 mountSeqNumber, + ui32 generation, + TNonreplicatedPartitionConfigPtr srcConfig, + TActorId volumeActorId, + TActorId srcActorId, + const TActiveCheckpointInfo& checkpointInfo) + : TNonreplicatedPartitionMigrationCommonActor( + static_cast(this), + config, + srcConfig->GetName(), + srcConfig->GetBlockCount(), + srcConfig->GetBlockSize(), + std::move(profileLog), + std::move(digestGenerator), + checkpointInfo.ProcessedBlockCount, + std::move(rwClientId), + volumeActorId) + , Config(std::move(config)) + , RdmaClient(std::move(rdmaClient)) + , SrcConfig(std::move(srcConfig)) + , CheckpointId(checkpointInfo.CheckpointId) + , ShadowDiskId(checkpointInfo.ShadowDiskId) + , MountSeqNumber(mountSeqNumber) + , Generation(generation) + , VolumeActorId(volumeActorId) + , SrcActorId(srcActorId) + , ProcessedBlockCount(checkpointInfo.ProcessedBlockCount) +{ + STORAGE_CHECK_PRECONDITION( + checkpointInfo.Data == ECheckpointData::DataPresent); + + switch (checkpointInfo.ShadowDiskState) { + case EShadowDiskState::None: + STORAGE_CHECK_PRECONDITION( + checkpointInfo.ShadowDiskState != EShadowDiskState::None); + break; + case EShadowDiskState::New: + State = EActorState::WaitAcquireForPrepareStart; + break; + case EShadowDiskState::Preparing: + State = checkpointInfo.ProcessedBlockCount == 0 + ? EActorState::WaitAcquireForPrepareStart + : EActorState::WaitAcquireForPrepareContinue; + break; + case EShadowDiskState::Ready: + State = EActorState::WaitAcquireForRead; + break; + case EShadowDiskState::Error: + State = EActorState::Error; + break; + } +} + +TShadowDiskActor::~TShadowDiskActor() = default; + +void TShadowDiskActor::OnBootstrap(const NActors::TActorContext& ctx) +{ + PoisonPillHelper.TakeOwnership(ctx, SrcActorId); + AcquireShadowDisk(ctx, EAcquireReason::FirstAcquire); +} + +bool TShadowDiskActor::OnMessage( + const TActorContext& ctx, + TAutoPtr& ev) +{ + switch (ev->GetTypeRewrite()) { + HFunc( + TEvDiskRegistry::TEvAcquireDiskResponse, + HandleAcquireDiskResponse); + HFunc( + TEvNonreplPartitionPrivate::TEvUpdateCounters, + HandleUpdateCounters); + HFunc( + TEvVolume::TEvDiskRegistryBasedPartitionCounters, + HandleShadowDiskCounters); + HFunc(NActors::TEvents::TEvWakeup, HandleWakeup); + HFunc(TEvVolume::TEvReacquireDisk, HandleReacquireDisk); + HFunc(TEvVolume::TEvRdmaUnavailable, HandleRdmaUnavailable); + HFunc( + TEvVolumePrivate::TEvUpdateShadowDiskStateResponse, + HandleUpdateShadowDiskStateResponse); + + // Read request. + HFunc( + TEvService::TEvReadBlocksRequest, + HandleReadBlocks); + HFunc( + TEvService::TEvReadBlocksLocalRequest, + HandleReadBlocks); + + // Write/zero request. + case TEvService::TEvWriteBlocksRequest::EventType: { + return HandleWriteZeroBlocks( + *reinterpret_cast( + &ev), + ctx); + } + case TEvService::TEvWriteBlocksLocalRequest::EventType: { + return HandleWriteZeroBlocks( + *reinterpret_cast< + TEvService::TEvWriteBlocksLocalRequest::TPtr*>(&ev), + ctx); + } + case TEvService::TEvZeroBlocksRequest::EventType: { + return HandleWriteZeroBlocks( + *reinterpret_cast(&ev), + ctx); + } + + default: + // Message processing by the base class is required. + return false; + } + + // We get here if we have processed an incoming message. And its processing + // by the base class is not required. + return true; +} + +TDuration TShadowDiskActor::CalculateMigrationTimeout() +{ + STORAGE_CHECK_PRECONDITION(TimeoutCalculator); + + if (TimeoutCalculator) { + return TimeoutCalculator->CalculateTimeout(GetNextProcessingRange()); + } + return TDuration::Seconds(1); +} + +void TShadowDiskActor::OnMigrationProgress( + const NActors::TActorContext& ctx, + ui64 migrationIndex) +{ + using EReason = TEvVolumePrivate::TUpdateShadowDiskStateRequest::EReason; + + ProcessedBlockCount = migrationIndex; + + auto request = + std::make_unique( + CheckpointId, + EReason::FillProgressUpdate, + migrationIndex, + SrcConfig->GetBlockCount()); + + NCloud::Send(ctx, VolumeActorId, std::move(request)); +} + +void TShadowDiskActor::OnMigrationFinished(const NActors::TActorContext& ctx) +{ + using EReason = TEvVolumePrivate::TUpdateShadowDiskStateRequest::EReason; + + ProcessedBlockCount = SrcConfig->GetBlockCount(); + + auto request = + std::make_unique( + CheckpointId, + EReason::FillCompleted, + ProcessedBlockCount, + SrcConfig->GetBlockCount()); + + NCloud::Send(ctx, VolumeActorId, std::move(request)); +} + +void TShadowDiskActor::OnMigrationError(const NActors::TActorContext& ctx) +{ + using EReason = TEvVolumePrivate::TUpdateShadowDiskStateRequest::EReason; + + auto request = + std::make_unique( + CheckpointId, + EReason::FillError, + ProcessedBlockCount, + SrcConfig->GetBlockCount()); + + NCloud::Send(ctx, VolumeActorId, std::move(request)); +} + +void TShadowDiskActor::AcquireShadowDisk( + const NActors::TActorContext& ctx, + EAcquireReason acquireReason) +{ + switch (acquireReason) { + case EAcquireReason::FirstAcquire: { + STORAGE_CHECK_PRECONDITION(WaitingForAcquire()); + STORAGE_CHECK_PRECONDITION(DstActorId == TActorId()); + STORAGE_CHECK_PRECONDITION(AcquireActorId == TActorId()); + } break; + case EAcquireReason::PeriodicalReAcquire: { + STORAGE_CHECK_PRECONDITION(!WaitingForAcquire()); + STORAGE_CHECK_PRECONDITION(DstActorId != TActorId()); + + if (AcquireActorId != TActorId()) { + return; + } + } break; + case EAcquireReason::ForcedReAcquire: { + STORAGE_CHECK_PRECONDITION(!WaitingForAcquire()); + STORAGE_CHECK_PRECONDITION(DstActorId != TActorId()); + + if (ForcedReAcquireInProgress) { + return; + } + ForcedReAcquireInProgress = true; + } break; + } + + AcquireActorId = NCloud::Register( + ctx, + std::make_unique( + Config, + ShadowDiskId, + ShadowDiskDevices, + acquireReason, + ReadOnlyMount(), + AreWritesToSrcDiskImpossible(), + MountSeqNumber, + Generation, + SelfId())); + + PoisonPillHelper.TakeOwnership(ctx, AcquireActorId); +} + +void TShadowDiskActor::HandleAcquireDiskResponse( + const TEvDiskRegistry::TEvAcquireDiskResponse::TPtr& ev, + const NActors::TActorContext& ctx) +{ + PoisonPillHelper.ReleaseOwnership(ctx, ev->Sender); + if (AcquireActorId == ev->Sender) { + AcquireActorId = {}; + } + + auto* msg = ev->Get(); + auto& record = msg->Record; + auto acquireReason = static_cast(ev->Cookie); + + if (acquireReason == EAcquireReason::ForcedReAcquire) { + ForcedReAcquireInProgress = false; + } + + if (HasError(record.GetError())) { + if (acquireReason != EAcquireReason::PeriodicalReAcquire) { + SetErrorState(ctx); + } + return; + } + + if (acquireReason == EAcquireReason::FirstAcquire) { + CreateShadowDiskPartitionActor(ctx, record.GetDevices()); + } +} + +void TShadowDiskActor::CreateShadowDiskConfig() +{ + STORAGE_CHECK_PRECONDITION(!ShadowDiskDevices.empty()); + + TNonreplicatedPartitionConfig::TVolumeInfo volumeInfo{ + TInstant(), + GetCheckpointShadowDiskType(SrcConfig->GetVolumeInfo().MediaKind)}; + + DstConfig = std::make_shared( + ShadowDiskDevices, + ReadOnlyMount() ? NProto::VOLUME_IO_ERROR_READ_ONLY + : NProto::VOLUME_IO_OK, + ShadowDiskId, + SrcConfig->GetBlockSize(), + volumeInfo, + SelfId(), // need to handle TEvRdmaUnavailable, TEvReacquireDisk + true, // muteIOErrors + false, // markBlocksUsed + THashSet(), // freshDeviceIds + TDuration(), // maxTimedOutDeviceStateDuration + false, // maxTimedOutDeviceStateDurationOverridden + true // useSimpleMigrationBandwidthLimiter + ); + + TimeoutCalculator.emplace( + Config->GetMaxShadowDiskFillBandwidth(), + Config->GetExpectedDiskAgentSize(), + DstConfig); +} + +void TShadowDiskActor::CreateShadowDiskPartitionActor( + const NActors::TActorContext& ctx, + const TDevices& acquiredShadowDiskDevices) +{ + using EReason = TEvVolumePrivate::TUpdateShadowDiskStateRequest::EReason; + + STORAGE_CHECK_PRECONDITION(WaitingForAcquire()); + STORAGE_CHECK_PRECONDITION(DstActorId == TActorId()); + + ShadowDiskDevices = acquiredShadowDiskDevices; + + CreateShadowDiskConfig(); + + DstActorId = NCloud::Register( + ctx, + CreateNonreplicatedPartition( + Config, + DstConfig, + SelfId(), + RdmaClient)); + PoisonPillHelper.TakeOwnership(ctx, DstActorId); + + if (State == EActorState::WaitAcquireForRead) { + // Ready to serve checkpoint reads. + State = EActorState::CheckpointReady; + } else { + STORAGE_CHECK_PRECONDITION( + State == EActorState::WaitAcquireForPrepareStart || + State == EActorState::WaitAcquireForPrepareContinue); + + // Ready to fill shadow disk with data. + State = EActorState::Preparing; + + TNonreplicatedPartitionMigrationCommonActor::InitWork( + ctx, + SrcActorId, + DstActorId); + TNonreplicatedPartitionMigrationCommonActor::StartWork(ctx); + + // Persist state. + NCloud::Send( + ctx, + VolumeActorId, + std::make_unique( + CheckpointId, + EReason::FillProgressUpdate, + GetNextProcessingRange().Start, + SrcConfig->GetBlockCount())); + } + + STORAGE_CHECK_PRECONDITION(Acquired()); + SchedulePeriodicalReAcquire(ctx); +} + +void TShadowDiskActor::SchedulePeriodicalReAcquire(const TActorContext& ctx) +{ + ctx.Schedule(Config->GetClientRemountPeriod(), new TEvents::TEvWakeup()); +} + +void TShadowDiskActor::SetErrorState(const NActors::TActorContext& ctx) +{ + using EReason = TEvVolumePrivate::TUpdateShadowDiskStateRequest::EReason; + + State = EActorState::Error; + + // Persist state. + NCloud::Send( + ctx, + VolumeActorId, + std::make_unique( + CheckpointId, + EReason::FillError, + ProcessedBlockCount, + SrcConfig->GetBlockCount())); +} + +bool TShadowDiskActor::CanJustForwardWritesToSrcDisk() const +{ + return State == EActorState::CheckpointReady || + State == EActorState::Error || + State == EActorState::WaitAcquireForPrepareStart; +} + +bool TShadowDiskActor::AreWritesToSrcDiskForbidden() const +{ + return State == EActorState::WaitAcquireForPrepareContinue; +} + +bool TShadowDiskActor::AreWritesToSrcDiskImpossible() const +{ + return AreWritesToSrcDiskForbidden() || ForcedReAcquireInProgress; +} + +bool TShadowDiskActor::WaitingForAcquire() const +{ + return State == EActorState::WaitAcquireForPrepareStart || + State == EActorState::WaitAcquireForPrepareContinue || + State == EActorState::WaitAcquireForRead; +} + +bool TShadowDiskActor::Acquired() const +{ + return State == EActorState::Preparing || + State == EActorState::CheckpointReady; +} + +bool TShadowDiskActor::ReadOnlyMount() const +{ + STORAGE_CHECK_PRECONDITION(State != EActorState::Error); + + return State == EActorState::WaitAcquireForRead || + State == EActorState::CheckpointReady; +} + +template +void TShadowDiskActor::HandleReadBlocks( + const typename TMethod::TRequest::TPtr& ev, + const NActors::TActorContext& ctx) +{ + auto* msg = ev->Get(); + auto& record = msg->Record; + const auto& checkpointId = record.GetCheckpointId(); + + if (checkpointId.empty() || checkpointId != CheckpointId) { + // Forward read request to Source partition. + ForwardRequestToSrcPartition(ev, ctx); + return; + } + + if (State != EActorState::CheckpointReady) { + NCloud::Reply( + ctx, + *ev, + std::make_unique(MakeError( + E_REJECTED, + TStringBuilder() + << "Can't read from checkpoint " << CheckpointId.Quote() + << " while the data is being filled in."))); + return; + } + + // Remove checkpointId and read checkpoint data from shadow disk. + record.SetCheckpointId(TString()); + ForwardRequestToShadowPartition(ev, ctx); +} + +template +bool TShadowDiskActor::HandleWriteZeroBlocks( + const typename TMethod::TRequest::TPtr& ev, + const NActors::TActorContext& ctx) +{ + if (CanJustForwardWritesToSrcDisk()) { + ForwardRequestToSrcPartition(ev, ctx); + return true; + } + + if (AreWritesToSrcDiskForbidden()) { + NCloud::Reply( + ctx, + *ev, + std::make_unique(MakeError( + E_REJECTED, + TStringBuilder() + << "Can't write to source disk while shadow disk " + << ShadowDiskId.Quote() << " not ready yet."))); + return true; + } + + // Migration is currently in progress. It is necessary to forward write/zero + // requests to the source and shadow disk with the base class + // TNonreplicatedPartitionMigrationCommonActor + return false; +} + +template +void TShadowDiskActor::ForwardRequestToSrcPartition( + const typename TMethod::TRequest::TPtr& ev, + const NActors::TActorContext& ctx) +{ + ForwardMessageToActor(ev, ctx, SrcActorId); +} + +template +void TShadowDiskActor::ForwardRequestToShadowPartition( + const typename TMethod::TRequest::TPtr& ev, + const NActors::TActorContext& ctx) +{ + if (DstActorId == NActors::TActorId()) { + NCloud::Reply( + ctx, + *ev, + std::make_unique( + MakeError(E_REJECTED, "shadow disk partition not ready yet"))); + return; + } + + if (State != EActorState::CheckpointReady) { + NCloud::Reply( + ctx, + *ev, + std::make_unique( + MakeError(E_REJECTED, "shadow disk fill in progress"))); + return; + } + + ForwardMessageToActor(ev, ctx, DstActorId); +} + +void TShadowDiskActor::HandleUpdateShadowDiskStateResponse( + const TEvVolumePrivate::TEvUpdateShadowDiskStateResponse::TPtr& ev, + const NActors::TActorContext& ctx) +{ + auto* msg = ev->Get(); + + switch (msg->NewState) { + case EShadowDiskState::None: + case EShadowDiskState::New: { + STORAGE_CHECK_PRECONDITION( + msg->NewState != EShadowDiskState::New && + msg->NewState != EShadowDiskState::None); + LOG_ERROR_S( + ctx, + TBlockStoreComponents::VOLUME, + "State of shadow disk " << ShadowDiskId.Quote() + << " unexpectedly changed to " + << ToString(msg->NewState)); + State = EActorState::Error; + } break; + case EShadowDiskState::Preparing: { + LOG_INFO_S( + ctx, + TBlockStoreComponents::VOLUME, + "State of shadow disk " + << ShadowDiskId.Quote() << " changed to " + << ToString(msg->NewState) + << ", processed block count: " << msg->ProcessedBlockCount); + State = EActorState::Preparing; + STORAGE_CHECK_PRECONDITION(Acquired()); + } break; + case EShadowDiskState::Ready: { + LOG_INFO_S( + ctx, + TBlockStoreComponents::VOLUME, + "State of shadow disk " << ShadowDiskId.Quote() + << " changed to " + << ToString(msg->NewState)); + State = EActorState::CheckpointReady; + STORAGE_CHECK_PRECONDITION(Acquired()); + } break; + case EShadowDiskState::Error: { + LOG_WARN_S( + ctx, + TBlockStoreComponents::VOLUME, + "State of shadow disk " << ShadowDiskId.Quote() + << " changed to " + << ToString(msg->NewState)); + State = EActorState::Error; + } break; + } +} + +void TShadowDiskActor::HandleUpdateCounters( + const TEvNonreplPartitionPrivate::TEvUpdateCounters::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + Y_UNUSED(ctx); + + // Block sending statistics counters from the base class by processing the + // TEvUpdateCounters message ourselves. +} + +void TShadowDiskActor::HandleShadowDiskCounters( + const TEvVolume::TEvDiskRegistryBasedPartitionCounters::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_UNUSED(ev); + Y_UNUSED(ctx); + + // TODO. Do we need to count the statistics of the shadow disk in the source + // disk? +} + +void TShadowDiskActor::HandleWakeup( + const NActors::TEvents::TEvWakeup::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_UNUSED(ev); + + AcquireShadowDisk(ctx, EAcquireReason::PeriodicalReAcquire); + SchedulePeriodicalReAcquire(ctx); +} + +void TShadowDiskActor::HandleRdmaUnavailable( + const TEvVolume::TEvRdmaUnavailable::TPtr& ev, + const TActorContext& ctx) +{ + ForwardMessageToActor(ev, ctx, VolumeActorId); +} + +void TShadowDiskActor::HandleReacquireDisk( + const TEvVolume::TEvReacquireDisk::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_UNUSED(ev); + + // If an E_BS_INVALID_SESSION error occurred while working with the shadow + // disk, the shadow disk partition sends a TEvReacquireDisk message. In this + // case, we try to acquire shadow disk again. + + // If we are in prepare state, this means that writing errors to the + // shadow disk will lead to blocking of the source disk. In that case, we + // are trying to reacquire during short period MaxBlockingTotalTimeout. + + // If we are in ready state, this means that writing errors to the + // shadow disk will not lead to blocking of the source disk. In that case, + // we are trying to reacquire during long period MaxNonBlockingTotalTimeout. + + LOG_WARN_S( + ctx, + TBlockStoreComponents::VOLUME, + "Got TEvReacquireDisk for shadow disk " << ShadowDiskId.Quote()); + + switch (State) { + case EActorState::WaitAcquireForPrepareStart: + case EActorState::WaitAcquireForPrepareContinue: + case EActorState::WaitAcquireForRead: + case EActorState::Error: { + // If we are waiting for disk acquire, or in error state, then we + // should not receive TEvReacquireDisk message. + STORAGE_CHECK_PRECONDITION(false); + } break; + case EActorState::Preparing: + case EActorState::CheckpointReady: { + AcquireShadowDisk(ctx, EAcquireReason::ForcedReAcquire); + } break; + } +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.h b/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.h new file mode 100644 index 00000000000..2030300c006 --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.h @@ -0,0 +1,217 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace NCloud::NBlockStore::NStorage { + +/////////////////////////////////////////////////////////////////////////////// + +// The actor for shadow disk. Can migrate data from the source disk into the +// shadow disk and serves read requests from the checkpoint. +class TShadowDiskActor final + : public TNonreplicatedPartitionMigrationCommonActor + , public IMigrationOwner +{ +public: + enum class EAcquireReason + { + FirstAcquire, + PeriodicalReAcquire, + ForcedReAcquire, + }; + +private: + + // We always start from the WaitAcquireFor* or Error states. + // + // WaitAcquireForPrepareStart -> Preparing -> CheckpointReady + // | | + // v v + // Error Error + // + // WaitAcquireForPrepareContinue -> Preparing -> CheckpointReady + // | | + // v v + // Error Error + // + // WaitAcquireForRead -> CheckpointReady + // | + // v + // Error + + enum class EActorState + { + // We are waiting for the acquire of the shadow disk devices. The shadow + // disk has not filled up yet and we are starting from the beginning of + // the disk. + // We do not block writes to the source disk. + WaitAcquireForPrepareStart, + + // Waiting for the acquire of the shadow disk devices. The + // shadow disk has already been partially filled, and we are not + // starting from the beginning of the disk. + // We block writes to the source disk. + WaitAcquireForPrepareContinue, + + // Waiting for the acquire of the shadow disk devices. The disk is + // already completely filled and we will only read checkpoint data from it. + WaitAcquireForRead, + + // The devices of the shadow disk have been successfully acquired and we + // are currently filling it. + Preparing, + + // The devices of the shadow disk have been successfully acquired and we + // are ready to read the checkpoint data. + CheckpointReady, + + // Something went wrong and we stopped filling the shadow disk. + // At the same time, we do not interfere with the operation of the + // source disk. + Error, + }; + + const TStorageConfigPtr Config; + const NRdma::IClientPtr RdmaClient; + const TNonreplicatedPartitionConfigPtr SrcConfig; + const TString CheckpointId; + const TString SourceDiskId; + const TString ShadowDiskId; + const ui64 MountSeqNumber = 0; + const ui32 Generation = 0; + const NActors::TActorId VolumeActorId; + const NActors::TActorId SrcActorId; + + TNonreplicatedPartitionConfigPtr DstConfig; + NActors::TActorId DstActorId; + ui64 ProcessedBlockCount = 0; + + EActorState State = EActorState::Error; + std::optional TimeoutCalculator; + + NActors::TActorId AcquireActorId; + // The list of devices received on first acquire. + TDevices ShadowDiskDevices; + + bool ForcedReAcquireInProgress = false; + +public: + TShadowDiskActor( + TStorageConfigPtr config, + NRdma::IClientPtr rdmaClient, + IProfileLogPtr profileLog, + IBlockDigestGeneratorPtr digestGenerator, + TString rwClientId, + ui64 mountSeqNumber, + ui32 generation, + TNonreplicatedPartitionConfigPtr srcConfig, + NActors::TActorId volumeActorId, + NActors::TActorId srcActorId, + const TActiveCheckpointInfo& checkpointInfo); + + ~TShadowDiskActor() override; + + // IMigrationOwner implementation + void OnBootstrap(const NActors::TActorContext& ctx) override; + bool OnMessage( + const NActors::TActorContext& ctx, + TAutoPtr& ev) override; + TDuration CalculateMigrationTimeout() override; + void OnMigrationProgress( + const NActors::TActorContext& ctx, + ui64 migrationIndex) override; + void OnMigrationFinished(const NActors::TActorContext& ctx) override; + void OnMigrationError(const NActors::TActorContext& ctx) override; + +private: + void AcquireShadowDisk( + const NActors::TActorContext& ctx, + EAcquireReason acquireReason); + void HandleAcquireDiskResponse( + const TEvDiskRegistry::TEvAcquireDiskResponse::TPtr& ev, + const NActors::TActorContext& ctx); + + void CreateShadowDiskConfig(); + void CreateShadowDiskPartitionActor( + const NActors::TActorContext& ctx, + const TDevices& acquiredShadowDiskDevices); + void SetErrorState(const NActors::TActorContext& ctx); + void SchedulePeriodicalReAcquire(const NActors::TActorContext& ctx); + + // If we haven't started migrating to the shadow disk yet, we can send + // write and zero requests directly to the source disk. + bool CanJustForwardWritesToSrcDisk() const; + + // If the shadow disk is only partially filled, and it is not ready to + // write (because it is not acquired), we reject writes to the source disk. + bool AreWritesToSrcDiskForbidden() const; + + // If the shadow disk is not acquired, or has lost acquiring, then user + // writes to the source disk will not be considered completed, the client + // will repeat write attempts. Since we don't want to slow down the + // client's writing for a long time, we need to keep track of the time + // during which the writing did not occur in order to stop attempts to + // fill a broken shadow disk. + bool AreWritesToSrcDiskImpossible() const; + + bool WaitingForAcquire() const; + bool Acquired() const; + bool ReadOnlyMount() const; + + template + void ForwardRequestToSrcPartition( + const typename TMethod::TRequest::TPtr& ev, + const NActors::TActorContext& ctx); + + template + void ForwardRequestToShadowPartition( + const typename TMethod::TRequest::TPtr& ev, + const NActors::TActorContext& ctx); + + template + void HandleReadBlocks( + const typename TMethod::TRequest::TPtr& ev, + const NActors::TActorContext& ctx); + + template + bool HandleWriteZeroBlocks( + const typename TMethod::TRequest::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleUpdateShadowDiskStateResponse( + const TEvVolumePrivate::TEvUpdateShadowDiskStateResponse::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleUpdateCounters( + const TEvNonreplPartitionPrivate::TEvUpdateCounters::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleShadowDiskCounters( + const TEvVolume::TEvDiskRegistryBasedPartitionCounters::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleWakeup( + const NActors::TEvents::TEvWakeup::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleRdmaUnavailable( + const TEvVolume::TEvRdmaUnavailable::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleReacquireDisk( + const TEvVolume::TEvReacquireDisk::TPtr& ev, + const NActors::TActorContext& ctx); +}; + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/actors/ya.make b/cloud/blockstore/libs/storage/volume/actors/ya.make index dcdbfad1f92..37aeac95772 100644 --- a/cloud/blockstore/libs/storage/volume/actors/ya.make +++ b/cloud/blockstore/libs/storage/volume/actors/ya.make @@ -4,6 +4,7 @@ SRCS( forward_read_marked.cpp forward_write_and_mark_used.cpp read_disk_registry_based_overlay.cpp + shadow_disk_actor.cpp ) PEERDIR( diff --git a/cloud/blockstore/libs/storage/volume/model/checkpoint.cpp b/cloud/blockstore/libs/storage/volume/model/checkpoint.cpp index 9c6813b3551..b2d65981d45 100644 --- a/cloud/blockstore/libs/storage/volume/model/checkpoint.cpp +++ b/cloud/blockstore/libs/storage/volume/model/checkpoint.cpp @@ -91,9 +91,20 @@ void TCheckpointStore::SetShadowDiskState( ui64 totalBlockCount) { if (auto* checkpointData = ActiveCheckpoints.FindPtr(checkpointId)) { + if (shadowDiskState == EShadowDiskState::Ready) { + checkpointData->Data = ECheckpointData::DataPresent; + } checkpointData->ShadowDiskState = shadowDiskState; checkpointData->ProcessedBlockCount = processedBlockCount; checkpointData->TotalBlockCount = totalBlockCount; + + if (auto* checkpointRequest = + CheckpointRequests.FindPtr(checkpointData->RequestId)) + { + checkpointRequest->ShadowDiskState = shadowDiskState; + checkpointRequest->ShadowDiskProcessedBlockCount = + processedBlockCount; + } } } diff --git a/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp b/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp index c2a563fb7c4..dee2b9806d2 100644 --- a/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp +++ b/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp @@ -669,6 +669,7 @@ std::unique_ptr PrepareTestActorRuntime( 0 ) ); + runtime->EnableScheduleForActor(MakeDiskRegistryProxyServiceId()); runtime->AddLocalService( MakeDiskAgentServiceId(runtime->GetNodeId()), diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.h b/cloud/blockstore/libs/storage/volume/volume_actor.h index eb373039e63..6bb0ddaa3a7 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.h +++ b/cloud/blockstore/libs/storage/volume/volume_actor.h @@ -642,6 +642,10 @@ class TVolumeActor final void ProcessNextCheckpointRequest(const NActors::TActorContext& ctx); + void HandleUpdateShadowDiskStateRequest( + const TEvVolumePrivate::TEvUpdateShadowDiskStateRequest::TPtr& ev, + const NActors::TActorContext& ctx); + void HandleTabletStatus( const TEvBootstrapper::TEvStatus::TPtr& ev, const NActors::TActorContext& ctx); @@ -916,6 +920,12 @@ class TVolumeActor final const TEvPartitionCommonPrivate::TEvLongRunningOperation::TPtr& ev, const NActors::TActorContext& ctx); + NActors::TActorId WrapNonreplActorIfNeeded( + const NActors::TActorContext& ctx, + NActors::TActorId nonreplicatedActorId, + std::shared_ptr srcConfig); + + void RestartDiskRegistryBasedPartition(const NActors::TActorContext& ctx); void StartPartitionsImpl(const NActors::TActorContext& ctx); BLOCKSTORE_VOLUME_REQUESTS(BLOCKSTORE_IMPLEMENT_REQUEST, TEvVolume) diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_checkpoint.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_checkpoint.cpp index 04d9e870a1c..40ef5e2bdcc 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_checkpoint.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_checkpoint.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -1331,6 +1332,11 @@ void TVolumeActor::CompleteUpdateCheckpointRequest( args.Completed ? "completed" : "rejected", args.ShadowDiskId.Quote().c_str()); + const bool needToDestroyShadowActor = + (request.ReqType == ECheckpointRequestType::Delete || + request.ReqType == ECheckpointRequestType::DeleteData) && + State->GetCheckpointStore().HasShadowActor(request.CheckpointId); + State->SetCheckpointRequestFinished( request, args.Completed, @@ -1338,6 +1344,15 @@ void TVolumeActor::CompleteUpdateCheckpointRequest( args.ShadowDiskState); CheckpointRequests.erase(request.RequestId); + const bool needToCreateShadowActor = + request.ReqType == ECheckpointRequestType::Create && + !request.ShadowDiskId.Empty() && + !State->GetCheckpointStore().HasShadowActor(request.CheckpointId); + + if (needToDestroyShadowActor || needToCreateShadowActor) { + RestartDiskRegistryBasedPartition(ctx); + } + if (request.Type == ECheckpointType::Light) { const bool needReply = args.RequestInfo && SelfId() != args.RequestInfo->Sender; diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_resync.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_resync.cpp index 96c98ccec03..3e40f958ae1 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_resync.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_resync.cpp @@ -167,9 +167,7 @@ void TVolumeActor::CompleteToggleResync( Y_UNUSED(args); if (args.ResyncWasNeeded != State->IsMirrorResyncNeeded()) { - StopPartitions(ctx); - StartPartitionsForUse(ctx); - ResetServicePipes(ctx); + RestartDiskRegistryBasedPartition(ctx); } else { State->SetReadWriteError({}); } diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_startstop.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_startstop.cpp index 580b74ac1bf..883a31b8a44 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_startstop.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_startstop.cpp @@ -11,14 +11,14 @@ #include #include #include - +#include #include +#include + #include #include -#include - namespace NCloud::NBlockStore::NStorage { using namespace NActors; @@ -252,8 +252,53 @@ void TVolumeActor::SetupDiskRegistryBasedPartitions(const TActorContext& ctx) } State->SetDiskRegistryBasedPartitionActor( - nonreplicatedActorId, - std::move(nonreplicatedConfig)); + WrapNonreplActorIfNeeded(ctx, nonreplicatedActorId, nonreplicatedConfig), + nonreplicatedConfig); +} + +NActors::TActorId TVolumeActor::WrapNonreplActorIfNeeded( + const TActorContext& ctx, + NActors::TActorId nonreplicatedActorId, + std::shared_ptr srcConfig) +{ + for (const auto& [checkpointId, checkpointInfo]: + State->GetCheckpointStore().GetActiveCheckpoints()) + { + if (checkpointInfo.Data == ECheckpointData::DataDeleted || + checkpointInfo.ShadowDiskId.Empty() || + State->GetCheckpointStore().HasShadowActor(checkpointId)) + { + continue; + } + + nonreplicatedActorId = NCloud::Register( + ctx, + Config, + GetRdmaClient(), + ProfileLog, + BlockDigestGenerator, + State->GetReadWriteAccessClientId(), + State->GetMountSeqNumber(), + Executor()->Generation(), + srcConfig, + SelfId(), + nonreplicatedActorId, + checkpointInfo); + + State->GetCheckpointStore().ShadowActorCreated(checkpointId); + } + return nonreplicatedActorId; +} + +void TVolumeActor::RestartDiskRegistryBasedPartition(const TActorContext& ctx) +{ + if (!IsDiskRegistryMediaKind(State->GetConfig().GetStorageMediaKind())) { + return; + } + + StopPartitions(ctx); + StartPartitionsForUse(ctx); + ResetServicePipes(ctx); } void TVolumeActor::StartPartitionsImpl(const TActorContext& ctx) @@ -296,6 +341,12 @@ void TVolumeActor::StopPartitions(const TActorContext& ctx) return; } + for (const auto& [checkpointId, _]: + State->GetCheckpointStore().GetActiveCheckpoints()) + { + State->GetCheckpointStore().ShadowActorDestroyed(checkpointId); + } + for (auto& part : State->GetPartitions()) { // Reset previous boot attempts part.RetryCookie.Detach(); diff --git a/cloud/blockstore/libs/storage/volume/volume_ut_checkpoint.cpp b/cloud/blockstore/libs/storage/volume/volume_ut_checkpoint.cpp index 71fc7f205e5..4a46c87a428 100644 --- a/cloud/blockstore/libs/storage/volume/volume_ut_checkpoint.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_ut_checkpoint.cpp @@ -3138,7 +3138,8 @@ Y_UNIT_TEST_SUITE(TVolumeCheckpointTest) GetBlockContent(42)); } - Y_UNIT_TEST(ShouldCreateCheckpointWithShadowDisk) + void DoShouldCreateCheckpointWithShadowDisk( + NProto::EStorageMediaKind mediaKind) { NProto::TStorageServiceConfig config; config.SetUseShadowDisksForNonreplDiskCheckpoints(true); @@ -3163,8 +3164,7 @@ Y_UNIT_TEST_SUITE(TVolumeCheckpointTest) }; runtime->SetObserverFunc(countAllocateDeallocateDiskRequest); - const auto expectedBlockCount = - DefaultDeviceBlockSize * DefaultDeviceBlockCount / DefaultBlockSize; + const ui64 expectedBlockCount = 32768; TVolumeClient volume(*runtime); volume.UpdateVolumeConfig( @@ -3174,7 +3174,7 @@ Y_UNIT_TEST_SUITE(TVolumeCheckpointTest) 0, false, 1, - NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED, + mediaKind, expectedBlockCount); volume.WaitReady(); @@ -3185,9 +3185,9 @@ Y_UNIT_TEST_SUITE(TVolumeCheckpointTest) 0); volume.AddClient(clientInfo); - // Write some data. + // Write all '1' to block 1. volume.WriteBlocks( - TBlockRange64::MakeOneBlock(0), + TBlockRange64::MakeOneBlock(1), clientInfo.GetClientId(), GetBlockContent(1)); @@ -3199,11 +3199,8 @@ Y_UNIT_TEST_SUITE(TVolumeCheckpointTest) UNIT_ASSERT_VALUES_EQUAL(1, allocateRequestCount); UNIT_ASSERT_VALUES_EQUAL(0, deallocateRequestCount); - // Writes to the disk are not blocked. - volume.WriteBlocks( - TBlockRange64::MakeOneBlock(0), - clientInfo.GetClientId(), - GetBlockContent(2)); + // Reconnect pipe since partition has restarted. + volume.ReconnectPipe(); { // Validate checkpoint state (not ready). auto status = volume.GetCheckpointStatus("c1"); @@ -3212,53 +3209,133 @@ Y_UNIT_TEST_SUITE(TVolumeCheckpointTest) status->Record.GetCheckpointStatus()); } - using EReason = - TEvVolumePrivate::TUpdateShadowDiskStateRequest::EReason; - // Set Checkpoint fill in progress. - volume.UpdateShadowDiskState( - "c1", - EReason::FillProgressUpdate, - 10, - expectedBlockCount); + auto tryWriteBlock = [&](ui64 blockIndx, ui8 content) -> bool + { + auto request = volume.CreateWriteBlocksRequest( + TBlockRange64::MakeOneBlock(blockIndx), + clientInfo.GetClientId(), + GetBlockContent(content)); + volume.SendToPipe(std::move(request)); + auto response = + volume.RecvResponse(); + if (response->GetStatus() != S_OK) { + UNIT_ASSERT_VALUES_EQUAL(E_REJECTED, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL( + "Request WriteBlocks intersects with currently migrated " + "range", + response->GetError().GetMessage()); + return false; + } + return true; + }; - { // Validate checkpoint state (not ready). - auto status = volume.GetCheckpointStatus("c1"); - UNIT_ASSERT_EQUAL( - NProto::ECheckpointStatus::NOT_READY, - status->Record.GetCheckpointStatus()); - } + auto tryReadBlock = + [&](ui64 blockIndx, ui8 content, const TString& checkpoint) -> bool + { + auto request = volume.CreateReadBlocksRequest( + TBlockRange64::MakeOneBlock(blockIndx), + clientInfo.GetClientId(), + checkpoint); + volume.SendToPipe(std::move(request)); + auto response = + volume.RecvResponse(); + if (response->GetStatus() != S_OK) { + UNIT_ASSERT_VALUES_EQUAL(E_REJECTED, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL( + "Can't read from checkpoint \"c1\" while the data is being " + "filled in.", + response->GetError().GetMessage()); + return false; + } + const auto& bufs = response->Record.GetBlocks().GetBuffers(); + UNIT_ASSERT_VALUES_EQUAL(1, bufs.size()); + UNIT_ASSERT_VALUES_EQUAL(GetBlockContent(content), bufs[0]); + return true; + }; - volume.UpdateShadowDiskState( - "c1", - EReason::FillCompleted, - expectedBlockCount, - expectedBlockCount); + // The index of the block that is recorded during the filling of the + // shadow disk. We use it later to check the contents of the disk and + // the checkpoint. + ui64 blockIndxToVerify = 0; + + // Writes to the disk are not blocked. But may overlap with migrating + // blocks. + for (ui64 i = 0;; ++i) { + ui64 blockIndx = i % expectedBlockCount; + ui8 content = i % 256; + + // We are trying to write the block to the disk. It may fail if it + // intersects with the current migrated range. + if (tryWriteBlock(blockIndx, content)) { + // We save the index of the successfully written block with + // non-zero data to check the contents of the disk and the + // checkpoint later. + if (content != 0 && blockIndxToVerify == 0) { + blockIndxToVerify = blockIndx; + } - { // Validate checkpoint state (ready). - auto status = volume.GetCheckpointStatus("c1"); - UNIT_ASSERT_EQUAL( - NProto::ECheckpointStatus::READY, - status->Record.GetCheckpointStatus()); + // Reading from disk should always be successful. + UNIT_ASSERT(tryReadBlock(blockIndx, content, "")); + + // Reading from the checkpoint will be successful when the + // checkpoint preparation is completed. + if (tryReadBlock(0, 0, "c1")) { + // Validate checkpoint state (ready). + auto status = volume.GetCheckpointStatus("c1"); + UNIT_ASSERT_EQUAL( + NProto::ECheckpointStatus::READY, + status->Record.GetCheckpointStatus()); + break; + } + + // Validate checkpoint state (not ready). + auto status = volume.GetCheckpointStatus("c1"); + UNIT_ASSERT_EQUAL( + NProto::ECheckpointStatus::NOT_READY, + status->Record.GetCheckpointStatus()); + } + + // Advance migration. + runtime->DispatchEvents({}, TDuration::MilliSeconds(250)); } - // Writes to the disk are not blocked. - volume.WriteBlocks( - TBlockRange64::MakeOneBlock(0), - clientInfo.GetClientId(), - GetBlockContent(2)); + // Check that the recording to the disk has happened. + UNIT_ASSERT_UNEQUAL(0, blockIndxToVerify); + + // Read block blockIndxToVerify from disk. It should contain valid data. + UNIT_ASSERT( + tryReadBlock(blockIndxToVerify, blockIndxToVerify % 256, "")); + + // Write all '0' to block blockIndxToVerify. + UNIT_ASSERT(tryWriteBlock(blockIndxToVerify, 0)); + + // Read block blockIndxToVerify from the disk. It should contain all + // '0'. + UNIT_ASSERT(tryReadBlock(blockIndxToVerify, 0, "")); - // TODO(drbasic) read from checkpoint (success). + // Read block blockIndxToVerify from checkpoint. It should contain valid + // data. + UNIT_ASSERT( + tryReadBlock(blockIndxToVerify, blockIndxToVerify % 256, "c1")); // Delete checkpoint data. volume.DeleteCheckpointData("c1"); UNIT_ASSERT_VALUES_EQUAL(1, allocateRequestCount); UNIT_ASSERT_VALUES_EQUAL(1, deallocateRequestCount); - { // Validate checkpoint state (error). - auto status = volume.GetCheckpointStatus("c1"); - UNIT_ASSERT_EQUAL( - NProto::ECheckpointStatus::ERROR, - status->Record.GetCheckpointStatus()); + // Reconnect pipe since partition has restarted. + volume.ReconnectPipe(); + + // Read from checkpoint without data should fail. + { + auto request = volume.CreateReadBlocksRequest( + TBlockRange64::MakeOneBlock(0), + clientInfo.GetClientId(), + "c1"); + volume.SendToPipe(std::move(request)); + auto response = + volume.RecvResponse(); + UNIT_ASSERT_VALUES_EQUAL(E_NOT_FOUND, response->GetStatus()); } // Delete checkpoint. @@ -3266,12 +3343,458 @@ Y_UNIT_TEST_SUITE(TVolumeCheckpointTest) UNIT_ASSERT_VALUES_EQUAL(1, allocateRequestCount); UNIT_ASSERT_VALUES_EQUAL(2, deallocateRequestCount); - { // Checkpoint state not found for deleted checkpoint. - volume.SendGetCheckpointStatusRequest("c1"); - auto response = volume.RecvGetCheckpointStatusResponse(); - UNIT_ASSERT_VALUES_EQUAL(E_NOT_FOUND, response->GetStatus()); + // Reconnect pipe since partition has restarted. + volume.ReconnectPipe(); + + // Write OK. + UNIT_ASSERT(tryWriteBlock(0, 4)); + + // Read block 0 from the disk. It should contain all '4'. + UNIT_ASSERT(tryReadBlock(0, 4, "")); + } + + Y_UNIT_TEST(ShouldCreateCheckpointWithShadowDiskNonrepl) + { + DoShouldCreateCheckpointWithShadowDisk( + NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED); + } + + Y_UNIT_TEST(ShouldCreateCheckpointWithShadowDiskMirror2) + { + DoShouldCreateCheckpointWithShadowDisk( + NCloud::NProto::STORAGE_MEDIA_SSD_MIRROR2); + } + + Y_UNIT_TEST(ShouldCreateCheckpointWithShadowDiskMirror3) + { + DoShouldCreateCheckpointWithShadowDisk( + NCloud::NProto::STORAGE_MEDIA_SSD_MIRROR3); + } + + Y_UNIT_TEST(ShouldCreateCheckpointWithShadowDiskHddNonrepl) + { + DoShouldCreateCheckpointWithShadowDisk( + NCloud::NProto::STORAGE_MEDIA_HDD_NONREPLICATED); + } + + void ShouldRetryWhenAcquiringShadowDisk(ui32 requestMessage, ui32 responseMessage) + { + NProto::TStorageServiceConfig config; + config.SetUseShadowDisksForNonreplDiskCheckpoints(true); + auto runtime = PrepareTestActorRuntime(config); + + bool simulateNondelivery = true; + bool simulateErrorResponse = true; + auto describeDiskRequestsFilter = [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == requestMessage && + simulateNondelivery) + { // Simulate non-delivery describe message to DiskRegistry. + auto sendTo = event->Sender; + runtime->Send( + new IEventHandle( + sendTo, + sendTo, + event->ReleaseBase().Release(), + 0, + event->Cookie, + nullptr), + 0); + simulateNondelivery = false; + return TTestActorRuntime::EEventAction::DROP; + } + + if (event->GetTypeRewrite() == responseMessage && + simulateErrorResponse) + { // Simulate response with error from DiskRegistry. + if (event->GetTypeRewrite() == + TEvDiskRegistry::EvDescribeDiskResponse) + { + auto* msg = + event->Get(); + msg->Record.MutableError()->SetCode(E_REJECTED); + } + if (event->GetTypeRewrite() == + TEvDiskRegistry::EvAcquireDiskResponse) + { + auto* msg = + event->Get(); + msg->Record.MutableError()->SetCode(E_REJECTED); + } + simulateErrorResponse = false; + } + return TTestActorRuntime::DefaultObserverFunc(event); + }; + runtime->SetObserverFunc(describeDiskRequestsFilter); + + // Create volume. + TVolumeClient volume(*runtime); + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, + NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED, + 32768); + + volume.WaitReady(); + + auto clientInfo = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + volume.AddClient(clientInfo); + + // Create checkpoint. + volume.CreateCheckpoint("c1"); + + // Reconnect pipe since partition has restarted. + volume.ReconnectPipe(); + + // Wait for shadow disk become ready. + for (;;) { + // Validate checkpoint state (ready). + auto status = + volume.GetCheckpointStatus("c1")->Record.GetCheckpointStatus(); + + if (status == NProto::ECheckpointStatus::READY) { + break; + } + + if (status == NProto::ECheckpointStatus::ERROR) { + UNIT_ASSERT_C(false, "Got error status for checkpoint"); + } + + // Advance shadow disk fill. + runtime->DispatchEvents({}, TDuration::MilliSeconds(250)); + } + + // Check that interceptions of messages have occurred. + UNIT_ASSERT(!simulateNondelivery); + UNIT_ASSERT(!simulateErrorResponse); + } + + Y_UNIT_TEST(ShouldRetryDescribeShadowDisk) + { + ShouldRetryWhenAcquiringShadowDisk( + TEvDiskRegistry::EvDescribeDiskRequest, + TEvDiskRegistry::EvDescribeDiskResponse); + } + + Y_UNIT_TEST(ShouldRetryAcquireShadowDisk) + { + ShouldRetryWhenAcquiringShadowDisk( + TEvDiskRegistry::EvAcquireDiskRequest, + TEvDiskRegistry::EvAcquireDiskResponse); + } + + Y_UNIT_TEST(ShouldStopAcquiringAfterTimeout) + { + NProto::TStorageServiceConfig config; + config.SetUseShadowDisksForNonreplDiskCheckpoints(true); + config.SetMaxAcquireShadowDiskTotalTimeoutWhenNonBlocked(2000); + + auto runtime = PrepareTestActorRuntime(config); + + auto describeDiskRequestsFilter = [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskRegistry::EvDescribeDiskResponse) + { // Simulate response with error from DiskRegistry. + auto* msg = + event->Get(); + msg->Record.MutableError()->SetCode(E_REJECTED); + } + return TTestActorRuntime::DefaultObserverFunc(event); + }; + runtime->SetObserverFunc(describeDiskRequestsFilter); + + // Create volume. + TVolumeClient volume(*runtime); + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, + NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED, + 32768); + + volume.WaitReady(); + + auto clientInfo = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + volume.AddClient(clientInfo); + + // Create checkpoint. + volume.CreateCheckpoint("c1"); + + // Reconnect pipe since partition has restarted. + volume.ReconnectPipe(); + + // Wait for shadow disk enter to the error state. + for (;;) { + auto status = + volume.GetCheckpointStatus("c1")->Record.GetCheckpointStatus(); + + if (status == NProto::ECheckpointStatus::READY) { + UNIT_ASSERT_C(false, "Got ready status for checkpoint"); + } + + if (status == NProto::ECheckpointStatus::ERROR) { + break; + } + + // Advance time. + runtime->DispatchEvents({}, TDuration::MilliSeconds(250)); } } + + Y_UNIT_TEST(ShouldBlockWritesWhenReAcquire) + { + NProto::TStorageServiceConfig config; + config.SetUseShadowDisksForNonreplDiskCheckpoints(true); + config.SetMaxShadowDiskFillBandwidth(1); + auto runtime = PrepareTestActorRuntime(config); + + const ui64 expectedBlockCount = 32768; + + TVolumeClient volume(*runtime); + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, + NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED, + expectedBlockCount); + + volume.WaitReady(); + + auto clientInfo = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + volume.AddClient(clientInfo); + + auto tryWriteBlock = [&](ui64 blockIndx, ui8 content) -> bool + { + auto request = volume.CreateWriteBlocksRequest( + TBlockRange64::MakeOneBlock(blockIndx), + clientInfo.GetClientId(), + GetBlockContent(content)); + volume.SendToPipe(std::move(request)); + auto response = + volume.RecvResponse(); + if (response->GetStatus() != S_OK) { + UNIT_ASSERT_VALUES_EQUAL(E_REJECTED, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL( + "Can't write to source disk while shadow disk \"vol0c1\" " + "not ready yet.", + response->GetError().GetMessage()); + return false; + } + return true; + }; + + auto tryReadBlock = [&](ui64 blockIndx) -> bool + { + auto request = volume.CreateReadBlocksRequest( + TBlockRange64::MakeOneBlock(blockIndx), + clientInfo.GetClientId()); + volume.SendToPipe(std::move(request)); + auto response = + volume.RecvResponse(); + if (response->GetStatus() != S_OK) { + return false; + } + return true; + }; + + // Create checkpoint. + volume.CreateCheckpoint("c1"); + + // Reconnect pipe since partition has restarted. + volume.ReconnectPipe(); + + // We check that attempts to write to the disk are not blocked. + UNIT_ASSERT(tryWriteBlock(expectedBlockCount - 1, 0)); + + { // Emulate partial disk fill + using EReason = + TEvVolumePrivate::TUpdateShadowDiskStateRequest::EReason; + + auto request = std::make_unique< + TEvVolumePrivate::TEvUpdateShadowDiskStateRequest>( + "c1", + EReason::FillProgressUpdate, + 128, + expectedBlockCount); + + volume.SendToPipe(std::move(request)); + runtime->DispatchEvents({}, TDuration::MilliSeconds(100)); + } + + // Steal the acquire response. + // We will return it later to complete the shadow disk acquiring. + std::unique_ptr stolenAcquireResponse; + auto stealAcquireResponse = [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskRegistry::EvAcquireDiskResponse) + { + stolenAcquireResponse.reset(event.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::DefaultObserverFunc(event); + }; + auto oldFilter = runtime->SetObserverFunc(stealAcquireResponse); + + // Reboot volume tablet. + NKikimr::RebootTablet(*runtime, TestTabletId, volume.GetSender()); + volume.ReconnectPipe(); + + // Check that the acquire response was stolen. + UNIT_ASSERT(stolenAcquireResponse); + runtime->SetObserverFunc(oldFilter); + + // We check that attempts to write to the disk are blocked. Since the + // shadow disk is partially filled, and the shadow disk acquiring has + // not completed yet. + UNIT_ASSERT(!tryWriteBlock(expectedBlockCount - 1, 0)); + UNIT_ASSERT(tryReadBlock(expectedBlockCount - 1)); + + // Return stolen acquire response. The acquiring should be completed. + runtime->Send(stolenAcquireResponse.release()); + runtime->DispatchEvents({}, TDuration::MilliSeconds(250)); + + // We check that attempts to write to the disk are not blocked. + UNIT_ASSERT(tryWriteBlock(expectedBlockCount - 1, 0)); + UNIT_ASSERT(tryReadBlock(expectedBlockCount - 1)); + } + + Y_UNIT_TEST(ShouldBlockWritesWhenReAcquireForShortTime) + { + NProto::TStorageServiceConfig config; + config.SetUseShadowDisksForNonreplDiskCheckpoints(true); + config.SetMaxShadowDiskFillBandwidth(1); + config.SetMaxAcquireShadowDiskTotalTimeoutWhenBlocked(5000); + auto runtime = PrepareTestActorRuntime(config); + + const ui64 expectedBlockCount = 32768; + + TVolumeClient volume(*runtime); + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, + NCloud::NProto::STORAGE_MEDIA_SSD_NONREPLICATED, + expectedBlockCount); + + volume.WaitReady(); + + auto clientInfo = CreateVolumeClientInfo( + NProto::VOLUME_ACCESS_READ_WRITE, + NProto::VOLUME_MOUNT_LOCAL, + 0); + volume.AddClient(clientInfo); + + auto tryWriteBlock = [&](ui64 blockIndx, ui8 content) -> bool + { + auto request = volume.CreateWriteBlocksRequest( + TBlockRange64::MakeOneBlock(blockIndx), + clientInfo.GetClientId(), + GetBlockContent(content)); + volume.SendToPipe(std::move(request)); + auto response = + volume.RecvResponse(); + if (response->GetStatus() != S_OK) { + UNIT_ASSERT_VALUES_EQUAL(E_REJECTED, response->GetStatus()); + UNIT_ASSERT_VALUES_EQUAL( + "Can't write to source disk while shadow disk \"vol0c1\" " + "not ready yet.", + response->GetError().GetMessage()); + return false; + } + return true; + }; + + // Create checkpoint. + volume.CreateCheckpoint("c1"); + + // Reconnect pipe since partition has restarted. + volume.ReconnectPipe(); + + // We check that attempts to write to the disk are not blocked. + UNIT_ASSERT(tryWriteBlock(expectedBlockCount - 1, 0)); + + { // Emulate partial disk fill + using EReason = + TEvVolumePrivate::TUpdateShadowDiskStateRequest::EReason; + + auto request = std::make_unique< + TEvVolumePrivate::TEvUpdateShadowDiskStateRequest>( + "c1", + EReason::FillProgressUpdate, + 128, + expectedBlockCount); + + volume.SendToPipe(std::move(request)); + runtime->DispatchEvents({}, TDuration::MilliSeconds(100)); + } + + // Steal the acquire response. + std::unique_ptr stolenAcquireResponse; + auto stealAcquireResponse = [&](TAutoPtr& event) + { + if (event->GetTypeRewrite() == + TEvDiskRegistry::EvAcquireDiskResponse) + { + stolenAcquireResponse.reset(event.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::DefaultObserverFunc(event); + }; + auto oldFilter = runtime->SetObserverFunc(stealAcquireResponse); + + // Reboot volume tablet. + NKikimr::RebootTablet(*runtime, TestTabletId, volume.GetSender()); + volume.ReconnectPipe(); + + // Check that the acquire response was stolen. + UNIT_ASSERT(stolenAcquireResponse); + runtime->SetObserverFunc(oldFilter); + + // We check that attempts to write to the disk are blocked. Since the + // shadow disk is partially filled, and the shadow disk acquiring has + // not completed yet. + UNIT_ASSERT(!tryWriteBlock(expectedBlockCount - 1, 0)); + + // Wait MaxAcquireShadowDiskTotalTimeoutWhenBlocked timeout. + // After that shadow disk enter to the error state and writes to the + // source disk will be allowed. + runtime->DispatchEvents( + {}, + TDuration::MilliSeconds( + config.GetMaxAcquireShadowDiskTotalTimeoutWhenBlocked())); + + UNIT_ASSERT_EQUAL( + NProto::ECheckpointStatus::ERROR, + volume.GetCheckpointStatus("c1")->Record.GetCheckpointStatus()); + + // We check that attempts to write to the disk are not blocked. + UNIT_ASSERT(tryWriteBlock(expectedBlockCount - 1, 0)); + } + } } // namespace NCloud::NBlockStore::NStorage diff --git a/example/nbs/nbs-storage.txt b/example/nbs/nbs-storage.txt index e5f51ee0421..7e85bed4f39 100644 --- a/example/nbs/nbs-storage.txt +++ b/example/nbs/nbs-storage.txt @@ -6,3 +6,4 @@ NonReplicatedMigrationStartAllowed: true AllocationUnitMirror2SSD: 1 AllocationUnitMirror3SSD: 1 NonReplicatedDontSuspendDevices: true +UseShadowDisksForNonreplDiskCheckpoints: true