diff --git a/cloud/blockstore/libs/daemon/common/bootstrap.cpp b/cloud/blockstore/libs/daemon/common/bootstrap.cpp index 18d00612872..eecd7089098 100644 --- a/cloud/blockstore/libs/daemon/common/bootstrap.cpp +++ b/cloud/blockstore/libs/daemon/common/bootstrap.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -496,6 +497,11 @@ void TBootstrapBase::Init() } STORAGE_INFO("EndpointStorage initialized"); + TEndpointManagerOptions endpointManagerOptions = { + .ClientConfig = Configs->EndpointConfig->GetClientConfig(), + .NbdSocketSuffix = Configs->ServerConfig->GetNbdSocketSuffix(), + }; + EndpointManager = CreateEndpointManager( Timer, Scheduler, @@ -508,8 +514,8 @@ void TBootstrapBase::Init() std::move(sessionManager), std::move(endpointStorage), std::move(endpointListeners), - Configs->EndpointConfig->GetClientConfig(), - Configs->ServerConfig->GetNbdSocketSuffix()); + NBD::CreateDeviceConnectionFactory(Logging, TDuration::Days(1)), + std::move(endpointManagerOptions)); STORAGE_INFO("EndpointManager initialized"); diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp index b3230263dab..3068a51b1d4 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.cpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace NCloud::NBlockStore::NServer { @@ -109,7 +110,7 @@ bool CompareRequests( const NProto::TStartEndpointRequest& left, const NProto::TStartEndpointRequest& right) { - Y_DEBUG_ABORT_UNLESS(25 == GetFieldCount()); + Y_DEBUG_ABORT_UNLESS(26 == GetFieldCount()); return left.GetUnixSocketPath() == right.GetUnixSocketPath() && left.GetDiskId() == right.GetDiskId() && left.GetInstanceId() == right.GetInstanceId() @@ -137,7 +138,8 @@ bool CompareRequests( right.GetClientCGroups().begin(), right.GetClientCGroups().end()) && left.GetPersistent() == right.GetPersistent() - && left.GetNbdDeviceFile() == right.GetNbdDeviceFile(); + && left.GetNbdDeviceFile() == right.GetNbdDeviceFile() + && left.GetUseFreeNbdDeviceFile() == right.GetUseFreeNbdDeviceFile(); } bool CompareRequests( @@ -158,6 +160,93 @@ bool CompareRequests( //////////////////////////////////////////////////////////////////////////////// +class TDeviceManager +{ +private: + const TString DevicePrefix; + TVector BusyDevices; + +public: + TDeviceManager(TString devicePrefix) + : DevicePrefix(std::move(devicePrefix)) + { + size_t num = 0; + while (NFs::Exists(GetDeviceName(num))) { + ++num; + } + BusyDevices.resize(num, false); + } + + NProto::TError AcquireDevice(const TString& device) + { + size_t num = 0; + if (!GetDeviceNum(device, num)) { + return MakeError(E_ARGUMENT, TStringBuilder() + << "Couldn't parse nbd device file: " << device); + } + + if (!NFs::Exists(device)) { + return MakeError(E_INVALID_STATE, TStringBuilder() + << "Nbd device file No file for nbd device: " << device); + } + + if (num < BusyDevices.size() && BusyDevices[num]) { + return MakeError(E_INVALID_STATE, TStringBuilder() + << "Nbd device file doesn't exist: " << device); + } + + if (num >= BusyDevices.size()) { + BusyDevices.resize(num + 1, false); + } + + BusyDevices[num] = true; + return {}; + } + + void ReleaseDevice(const TString& device) + { + if (device.empty()) { + return; + } + + size_t num = 0; + bool res = GetDeviceNum(device, num); + Y_ENSURE(res && num < BusyDevices.size() && BusyDevices[num]); + + BusyDevices[num] = false; + } + + TString GetFreeDevice() + { + size_t num = 0; + for (; num < BusyDevices.size(); ++num) { + if (!BusyDevices[num]) { + break; + } + } + + return GetDeviceName(num); + } + +private: + bool GetDeviceNum(const TString& device, size_t& num) + { + if (!device.StartsWith(DevicePrefix)) { + return false; + } + + auto numStr = device.substr(DevicePrefix.size()); + return TryFromString(numStr, num); + } + + TString GetDeviceName(size_t num) + { + return DevicePrefix + ToString(num); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + class TBlockStoreNotImplemented : public IBlockStore { @@ -300,6 +389,7 @@ struct TEndpoint { std::shared_ptr Request; NBD::IDeviceConnectionPtr Device; + NProto::TVolume Volume; }; //////////////////////////////////////////////////////////////////////////////// @@ -316,8 +406,10 @@ class TEndpointManager final const ISessionManagerPtr SessionManager; const IEndpointStoragePtr EndpointStorage; const THashMap EndpointListeners; + const NBD::IDeviceConnectionFactoryPtr NbdDeviceFactory; const TString NbdSocketSuffix; + TDeviceManager NbdDeviceManager; TLog Log; using TRequestStateVariant = std::variant< @@ -354,22 +446,24 @@ class TEndpointManager final ISessionManagerPtr sessionManager, IEndpointStoragePtr endpointStorage, THashMap listeners, - NProto::TClientConfig clientConfig, - TString nbdSocketSuffix) + NBD::IDeviceConnectionFactoryPtr nbdDeviceFactory, + TEndpointManagerOptions options) : Logging(std::move(logging)) , ServerStats(std::move(serverStats)) , Executor(std::move(executor)) , SessionManager(std::move(sessionManager)) , EndpointStorage(std::move(endpointStorage)) , EndpointListeners(std::move(listeners)) - , NbdSocketSuffix(std::move(nbdSocketSuffix)) + , NbdDeviceFactory(std::move(nbdDeviceFactory)) + , NbdSocketSuffix(options.NbdSocketSuffix) + , NbdDeviceManager(options.NbdDevicePrefix) { Log = Logging->CreateLog("BLOCKSTORE_SERVER"); IBlockStorePtr client = std::make_shared(*this); NProto::TClientAppConfig config; - *config.MutableClientConfig() = std::move(clientConfig); + *config.MutableClientConfig() = options.ClientConfig; auto appConfig = std::make_shared(std::move(config)); auto retryPolicy = CreateRetryPolicy(appConfig); @@ -477,10 +571,10 @@ class TEndpointManager final TCallContextPtr ctx, std::shared_ptr request); - NProto::TStartEndpointResponse AlterEndpoint( + NProto::TError AlterEndpoint( TCallContextPtr ctx, - const NProto::TStartEndpointRequest& newRequest, - const NProto::TStartEndpointRequest& oldRequest); + NProto::TStartEndpointRequest newReq, + NProto::TStartEndpointRequest oldReq); NProto::TError OpenAllEndpointSockets( const NProto::TStartEndpointRequest& request, @@ -497,7 +591,10 @@ class TEndpointManager final const NProto::TStartEndpointRequest& request); TResultOrError StartNbdDevice( - std::shared_ptr request); + std::shared_ptr request, + bool restoring); + + void ReleaseNbdDevice(const TString& device, bool restoring); template void RemoveSession(TCallContextPtr ctx, const T& request) @@ -642,8 +739,17 @@ NProto::TStartEndpointResponse TEndpointManager::StartEndpointImpl( auto it = Endpoints.find(socketPath); if (it != Endpoints.end()) { - const auto& startedEndpoint = *it->second.Request; - return AlterEndpoint(std::move(ctx), *request, startedEndpoint); + auto endpoint = it->second; + auto error = AlterEndpoint(std::move(ctx), *request, *endpoint.Request); + if (HasError(error)) { + return TErrorResponse(error); + } + + NProto::TStartEndpointResponse response; + response.MutableError()->CopyFrom(error); + response.MutableVolume()->CopyFrom(endpoint.Volume); + response.SetNbdDeviceFile(endpoint.Request->GetNbdDeviceFile()); + return response; } auto future = SessionManager->CreateSession(ctx, *request); @@ -658,18 +764,20 @@ NProto::TStartEndpointResponse TEndpointManager::StartEndpointImpl( return TErrorResponse(error); } - auto deviceOrError = StartNbdDevice(request); + auto deviceOrError = StartNbdDevice(request, restoring); error = deviceOrError.GetError(); if (HasError(error)) { CloseAllEndpointSockets(*request); RemoveSession(std::move(ctx), *request); return TErrorResponse(error); } + auto device = deviceOrError.ExtractResult(); if (!restoring) { error = AddEndpointToStorage(*request); if (HasError(error)) { - deviceOrError.ExtractResult()->Stop(); + device->Stop(); + ReleaseNbdDevice(request->GetNbdDeviceFile(), restoring); CloseAllEndpointSockets(*request); RemoveSession(std::move(ctx), *request); return TErrorResponse(error); @@ -678,7 +786,8 @@ NProto::TStartEndpointResponse TEndpointManager::StartEndpointImpl( TEndpoint endpoint = { .Request = request, - .Device = deviceOrError.ExtractResult(), + .Device = device, + .Volume = sessionInfo.Volume, }; if (auto c = ServerStats->GetEndpointCounter(request->GetIpcType())) { @@ -689,50 +798,49 @@ NProto::TStartEndpointResponse TEndpointManager::StartEndpointImpl( NProto::TStartEndpointResponse response; response.MutableVolume()->CopyFrom(sessionInfo.Volume); + response.SetNbdDeviceFile(request->GetNbdDeviceFile()); return response; } -NProto::TStartEndpointResponse TEndpointManager::AlterEndpoint( +NProto::TError TEndpointManager::AlterEndpoint( TCallContextPtr ctx, - const NProto::TStartEndpointRequest& newRequest, - const NProto::TStartEndpointRequest& oldRequest) + NProto::TStartEndpointRequest newReq, + NProto::TStartEndpointRequest oldReq) { - const auto& socketPath = newRequest.GetUnixSocketPath(); - - auto startedEndpoint = oldRequest; + const auto& socketPath = newReq.GetUnixSocketPath(); // NBS-3018 - if (!CompareRequests( - oldRequest.GetClientProfile(), - newRequest.GetClientProfile())) - { + if (!CompareRequests(oldReq.GetClientProfile(), newReq.GetClientProfile())) { STORAGE_WARN("Modified ClientProfile will be ignored for endpoint: " << socketPath.Quote()); - startedEndpoint.MutableClientProfile()->CopyFrom( - newRequest.GetClientProfile()); + oldReq.MutableClientProfile()->CopyFrom(newReq.GetClientProfile()); } // CLOUD-98154 - if (oldRequest.GetDeviceName() != newRequest.GetDeviceName()) { + if (oldReq.GetDeviceName() != newReq.GetDeviceName()) { STORAGE_WARN("Modified DeviceName will be ignored for endpoint: " << socketPath.Quote()); - startedEndpoint.SetDeviceName(newRequest.GetDeviceName()); + oldReq.SetDeviceName(newReq.GetDeviceName()); + } + + if (newReq.GetUseFreeNbdDeviceFile() && oldReq.GetNbdDeviceFile()) { + newReq.SetNbdDeviceFile(oldReq.GetNbdDeviceFile()); } - if (CompareRequests(newRequest, startedEndpoint)) { - return TErrorResponse(S_ALREADY, TStringBuilder() + if (CompareRequests(newReq, oldReq)) { + return MakeError(S_ALREADY, TStringBuilder() << "endpoint " << socketPath.Quote() << " has already been started"); } - startedEndpoint.SetVolumeAccessMode(newRequest.GetVolumeAccessMode()); - startedEndpoint.SetVolumeMountMode(newRequest.GetVolumeMountMode()); - startedEndpoint.SetMountSeqNumber(newRequest.GetMountSeqNumber()); + oldReq.SetVolumeAccessMode(newReq.GetVolumeAccessMode()); + oldReq.SetVolumeMountMode(newReq.GetVolumeMountMode()); + oldReq.SetMountSeqNumber(newReq.GetMountSeqNumber()); - if (!CompareRequests(newRequest, startedEndpoint)) { - return TErrorResponse(E_INVALID_STATE, TStringBuilder() + if (!CompareRequests(newReq, oldReq)) { + return MakeError(E_INVALID_STATE, TStringBuilder() << "endpoint " << socketPath.Quote() << " has already been started with other args"); } @@ -740,25 +848,25 @@ NProto::TStartEndpointResponse TEndpointManager::AlterEndpoint( auto future = SessionManager->AlterSession( ctx, socketPath, - newRequest.GetVolumeAccessMode(), - newRequest.GetVolumeMountMode(), - newRequest.GetMountSeqNumber(), - newRequest.GetHeaders()); + newReq.GetVolumeAccessMode(), + newReq.GetVolumeMountMode(), + newReq.GetMountSeqNumber(), + newReq.GetHeaders()); if (auto error = Executor->WaitFor(future); HasError(error)) { - return TErrorResponse(error); + return error; } auto [sessionInfo, error] = Executor->WaitFor(SessionManager->GetSession( ctx, socketPath, - newRequest.GetHeaders())); + newReq.GetHeaders())); if (HasError(error)) { - return TErrorResponse(error); + return error; } - auto listenerIt = EndpointListeners.find(startedEndpoint.GetIpcType()); + auto listenerIt = EndpointListeners.find(oldReq.GetIpcType()); STORAGE_VERIFY( listenerIt != EndpointListeners.end(), TWellKnownEntityTypes::ENDPOINT, @@ -767,11 +875,11 @@ NProto::TStartEndpointResponse TEndpointManager::AlterEndpoint( auto& listener = listenerIt->second; auto alterFuture = listener->AlterEndpoint( - startedEndpoint, + oldReq, sessionInfo.Volume, sessionInfo.Session); - return TErrorResponse(Executor->WaitFor(alterFuture)); + return Executor->WaitFor(alterFuture); } NProto::TStopEndpointResponse TEndpointManager::DoStopEndpoint( @@ -815,6 +923,7 @@ NProto::TStopEndpointResponse TEndpointManager::StopEndpointImpl( } endpoint.Device->Stop(); + ReleaseNbdDevice(endpoint.Request->GetNbdDeviceFile(), false); CloseAllEndpointSockets(*endpoint.Request); RemoveSession(std::move(ctx), *request); @@ -1020,13 +1129,13 @@ NProto::TError TEndpointManager::OpenEndpointSocket( auto ipcType = request.GetIpcType(); auto listenerIt = EndpointListeners.find(ipcType); if (listenerIt == EndpointListeners.end()) { - return TErrorResponse(E_ARGUMENT, TStringBuilder() + return MakeError(E_ARGUMENT, TStringBuilder() << "unsupported endpoint type: " << static_cast(ipcType)); } auto listener = listenerIt->second; if (request.GetUnixSocketPath().size() > UnixSocketPathLengthLimit) { - return TErrorResponse(E_ARGUMENT, TStringBuilder() + return MakeError(E_ARGUMENT, TStringBuilder() << "Length of socket path should not be more than " << UnixSocketPathLengthLimit); } @@ -1105,12 +1214,12 @@ NProto::TError TEndpointManager::DoSwitchEndpoint( }); if (it == Endpoints.end()) { - return TErrorResponse(S_OK); + return MakeError(S_OK); } auto socketPath = it->first; if (IsEndpointRestoring(socketPath)) { - return TErrorResponse(E_REJECTED, "endpoint is restoring now"); + return MakeError(E_REJECTED, "endpoint is restoring now"); } request->SetUnixSocketPath(socketPath); @@ -1135,7 +1244,7 @@ NProto::TError TEndpointManager::SwitchEndpointImpl( auto it = Endpoints.find(socketPath); if (it == Endpoints.end()) { - return TErrorResponse(S_FALSE, TStringBuilder() + return MakeError(S_FALSE, TStringBuilder() << "endpoint " << socketPath.Quote() << " not started"); } @@ -1174,27 +1283,54 @@ NProto::TError TEndpointManager::SwitchEndpointImpl( << ", " << error.GetMessage()); } - return TErrorResponse(error); + return error; } TResultOrError TEndpointManager::StartNbdDevice( - std::shared_ptr request) + std::shared_ptr request, + bool restoring) { - if (request->GetIpcType() != NProto::IPC_NBD || - request->GetNbdDeviceFile().empty()) + if (request->GetIpcType() != NProto::IPC_NBD) { + return NBD::CreateDeviceConnectionStub(); + } + + if (request->HasUseFreeNbdDeviceFile() && + request->GetUseFreeNbdDeviceFile()) { + if (restoring) { + return MakeError(E_ARGUMENT, + "Forbidden 'FreeNbdDeviceFile' flag in restoring endpoints"); + } + + auto nbdDevice = NbdDeviceManager.GetFreeDevice(); + request->SetUseFreeNbdDeviceFile(false); + request->SetNbdDeviceFile(nbdDevice); + } + + if (!request->HasNbdDeviceFile() || !request->GetNbdDeviceFile()) { return NBD::CreateDeviceConnectionStub(); } - auto device = NBD::CreateDeviceConnection( - Logging, - TNetworkAddress(TUnixSocketPath(request->GetUnixSocketPath())), - request->GetNbdDeviceFile(), - TDuration::Days(1)); + if (!restoring) { + if (!request->GetPersistent()) { + return MakeError(E_ARGUMENT, + "Only persistent endpoints can connect to nbd device"); + } + + const auto& nbdDevice = request->GetNbdDeviceFile(); + auto error = NbdDeviceManager.AcquireDevice(nbdDevice); + if (HasError(error)) { + return error; + } + } + NBD::IDeviceConnectionPtr device; try { + TNetworkAddress address(TUnixSocketPath(request->GetUnixSocketPath())); + device = NbdDeviceFactory->Create(address, request->GetNbdDeviceFile()); device->Start(); } catch (...) { + ReleaseNbdDevice(request->GetNbdDeviceFile(), restoring); return MakeError(E_FAIL, CurrentExceptionMessage()); } @@ -1246,6 +1382,19 @@ TFuture TEndpointManager::DoRestoreEndpoints() continue; } + if (request->HasNbdDeviceFile() && request->GetNbdDeviceFile()) { + const auto& nbdDevice = request->GetNbdDeviceFile(); + auto error = NbdDeviceManager.AcquireDevice(nbdDevice); + + if (HasError(error)) { + ReportEndpointRestoringError(); + STORAGE_ERROR("Failed to acquire nbd device" + << ", endpoint: " << request->GetUnixSocketPath().Quote() + << ", error: " << FormatError(error)); + continue; + } + } + auto& headers = *request->MutableHeaders(); if (!headers.GetClientId()) { @@ -1296,6 +1445,16 @@ void TEndpointManager::HandleRestoredEndpoint( }); } +void TEndpointManager::ReleaseNbdDevice(const TString& device, bool restoring) +{ + if (restoring) { + return; + } + + NbdDeviceManager.ReleaseDevice(device); +} + + //////////////////////////////////////////////////////////////////////////////// TFuture TRestoringClient::StartEndpoint( @@ -1323,8 +1482,8 @@ IEndpointManagerPtr CreateEndpointManager( ISessionManagerPtr sessionManager, IEndpointStoragePtr endpointStorage, THashMap listeners, - NProto::TClientConfig clientConfig, - TString nbdSocketSuffix) + NBD::IDeviceConnectionFactoryPtr nbdDeviceFactory, + TEndpointManagerOptions options) { auto manager = std::make_shared( std::move(timer), @@ -1337,8 +1496,8 @@ IEndpointManagerPtr CreateEndpointManager( std::move(sessionManager), std::move(endpointStorage), std::move(listeners), - std::move(clientConfig), - std::move(nbdSocketSuffix)); + std::move(nbdDeviceFactory), + std::move(options)); eventProxy->Register(manager); return manager; } diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager.h b/cloud/blockstore/libs/endpoints/endpoint_manager.h index 2225679ac92..4ed18abd7c9 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager.h +++ b/cloud/blockstore/libs/endpoints/endpoint_manager.h @@ -2,11 +2,13 @@ #include "public.h" +#include #include #include #include #include +#include #include #include #include @@ -39,6 +41,15 @@ struct IEndpointManager //////////////////////////////////////////////////////////////////////////////// +struct TEndpointManagerOptions +{ + NProto::TClientConfig ClientConfig; + TString NbdSocketSuffix; + TString NbdDevicePrefix = "/dev/nbd"; +}; + +//////////////////////////////////////////////////////////////////////////////// + IEndpointManagerPtr CreateEndpointManager( ITimerPtr timer, ISchedulerPtr scheduler, @@ -51,8 +62,8 @@ IEndpointManagerPtr CreateEndpointManager( ISessionManagerPtr sessionManager, IEndpointStoragePtr endpointStorage, THashMap listeners, - NProto::TClientConfig clientConfig, - TString nbdSocketSuffix); + NBD::IDeviceConnectionFactoryPtr nbdDeviceFactory, + TEndpointManagerOptions options); bool AreSameStartEndpointRequests( const NProto::TStartEndpointRequest& left, diff --git a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp index f4824107076..7ed95c19277 100644 --- a/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp +++ b/cloud/blockstore/libs/endpoints/endpoint_manager_ut.cpp @@ -9,24 +9,28 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include #include #include +#include #include #include #include #include #include #include +#include #include @@ -122,6 +126,23 @@ struct TTestSessionManager final //////////////////////////////////////////////////////////////////////////////// +struct TTestDeviceFactory + : public NBD::IDeviceConnectionFactory +{ + TVector Devices; + + NBD::IDeviceConnectionPtr Create( + TNetworkAddress connectAddress, + TString deviceName) override + { + Y_UNUSED(connectAddress); + Devices.push_back(deviceName); + return NBD::CreateDeviceConnectionStub(); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + struct TTestEndpoint { NProto::TStartEndpointRequest Request; @@ -142,6 +163,10 @@ class TTestEndpointListener final ui32 AlterEndpointCounter = 0; ui32 SwitchEndpointCounter = 0; + using TStartEndpointHandler = std::function( + const NProto::TStartEndpointRequest& request, + NClient::ISessionPtr session)>; + public: TTestEndpointListener( TFuture result = MakeFuture()) @@ -154,7 +179,13 @@ class TTestEndpointListener final NClient::ISessionPtr session) override { Y_UNUSED(volume); + return StartEndpointHandler(request, session); + } + TStartEndpointHandler StartEndpointHandler = [&] ( + const NProto::TStartEndpointRequest& request, + NClient::ISessionPtr session) + { UNIT_ASSERT(!Endpoints.contains(request.GetUnixSocketPath())); TTestEndpoint endpoint; @@ -168,7 +199,7 @@ class TTestEndpointListener final }); return Result; - } + }; TFuture AlterEndpoint( const NProto::TStartEndpointRequest& request, @@ -222,17 +253,33 @@ class TTestEndpointListener final struct TBootstrap { - const ILoggingServicePtr Logging = CreateLoggingService("console"); - const IBlockStorePtr Service; - const TExecutorPtr Executor = TExecutor::Create("TestService"); - - TBootstrap(IBlockStorePtr service) - : Service(std::move(service)) - {} + const TString DirPath = "./" + CreateGuidAsString(); + ILoggingServicePtr Logging = CreateLoggingService("console"); + ITimerPtr Timer = CreateWallClockTimer(); + std::shared_ptr Scheduler = std::make_shared(); + IBlockStorePtr Service = std::make_shared(); + TExecutorPtr Executor = TExecutor::Create("TestService"); + IRequestStatsPtr RequestStats = CreateRequestStatsStub(); + IVolumeStatsPtr VolumeStats = CreateVolumeStatsStub(); + IServerStatsPtr ServerStats = CreateServerStatsStub(); + ISessionManagerPtr SessionManager; + IEndpointStoragePtr EndpointStorage = CreateFileEndpointStorage(DirPath); + IMutableEndpointStoragePtr MutableStorage = CreateFileMutableEndpointStorage(DirPath); + THashMap EndpointListeners; + NBD::IDeviceConnectionFactoryPtr NbdDeviceFactory; + IEndpointEventProxyPtr EndpointEventHandler = CreateEndpointEventProxy(); + TEndpointManagerOptions Options; + + TBootstrap() + { + MutableStorage->Init(); + } ~TBootstrap() { Stop(); + + MutableStorage->Remove(); } void Start() @@ -367,57 +414,46 @@ std::shared_ptr CreateTestService( //////////////////////////////////////////////////////////////////////////////// -IEndpointManagerPtr CreateEndpointManager( - TBootstrap& bootstrap, - THashMap endpointListeners, - IServerStatsPtr serverStats = CreateServerStatsStub(), - TString nbdSocketSuffix = "", - IEndpointEventProxyPtr endpointEventHandler = CreateEndpointEventProxy()) +IEndpointManagerPtr CreateEndpointManager(TBootstrap& bootstrap) { - TSessionManagerOptions sessionManagerOptions; - sessionManagerOptions.DefaultClientConfig.SetRequestTimeout( - TestRequestTimeout.MilliSeconds()); - - auto encryptionClientFactory = CreateEncryptionClientFactory( - bootstrap.Logging, - CreateDefaultEncryptionKeyProvider()); + if (!bootstrap.SessionManager) { + TSessionManagerOptions sessionManagerOptions; + sessionManagerOptions.DefaultClientConfig.SetRequestTimeout( + TestRequestTimeout.MilliSeconds()); - auto timer = CreateWallClockTimer(); - auto scheduler = CreateSchedulerStub(); - auto requestStats = CreateRequestStatsStub(); - auto volumeStats = CreateVolumeStatsStub(); - - auto sessionManager = CreateSessionManager( - timer, - scheduler, - bootstrap.Logging, - CreateMonitoringServiceStub(), - requestStats, - volumeStats, - serverStats, - bootstrap.Service, - CreateDefaultStorageProvider(bootstrap.Service), - encryptionClientFactory, - bootstrap.Executor, - sessionManagerOptions); + auto encryptionClientFactory = CreateEncryptionClientFactory( + bootstrap.Logging, + CreateDefaultEncryptionKeyProvider()); - const TString dirPath = "./" + CreateGuidAsString(); - auto endpointStorage = CreateFileEndpointStorage(dirPath); + bootstrap.SessionManager = CreateSessionManager( + bootstrap.Timer, + bootstrap.Scheduler, + bootstrap.Logging, + CreateMonitoringServiceStub(), + bootstrap.RequestStats, + bootstrap.VolumeStats, + bootstrap.ServerStats, + bootstrap.Service, + CreateDefaultStorageProvider(bootstrap.Service), + std::move(encryptionClientFactory), + bootstrap.Executor, + std::move(sessionManagerOptions)); + } return NServer::CreateEndpointManager( - timer, - scheduler, + bootstrap.Timer, + bootstrap.Scheduler, bootstrap.Logging, - requestStats, - volumeStats, - serverStats, + bootstrap.RequestStats, + bootstrap.VolumeStats, + bootstrap.ServerStats, bootstrap.Executor, - std::move(endpointEventHandler), - std::move(sessionManager), - std::move(endpointStorage), - std::move(endpointListeners), - {}, // clientConfig - std::move(nbdSocketSuffix)); + bootstrap.EndpointEventHandler, + bootstrap.SessionManager, + bootstrap.EndpointStorage, + bootstrap.EndpointListeners, + bootstrap.NbdDeviceFactory, + bootstrap.Options); } //////////////////////////////////////////////////////////////////////////////// @@ -463,14 +499,14 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) TString diskId = "testDiskId"; auto ipcType = NProto::IPC_GRPC; + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto listener = std::make_shared(); - auto manager = CreateEndpointManager( - bootstrap, - {{ ipcType, listener }}); + bootstrap.EndpointListeners = {{ ipcType, listener }}; + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); { @@ -505,14 +541,14 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) auto ipcType = NProto::IPC_GRPC; TString diskId = "testDiskId"; + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto listener = std::make_shared(); - auto manager = CreateEndpointManager( - bootstrap, - {{ ipcType, listener }}); + bootstrap.EndpointListeners = {{ ipcType, listener }}; + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); UNIT_ASSERT_VALUES_EQUAL(0, listener->AlterEndpointCounter); @@ -584,16 +620,16 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) Y_UNIT_TEST(ShouldHandleListEndpoints) { + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); - auto manager = CreateEndpointManager( - bootstrap, - { - { NProto::IPC_GRPC, std::make_shared() }, - { NProto::IPC_NBD, std::make_shared() }, - }); + bootstrap.EndpointListeners = { + { NProto::IPC_GRPC, std::make_shared() }, + { NProto::IPC_NBD, std::make_shared() }, + }; + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); NProto::TStartEndpointRequest request1; @@ -648,14 +684,14 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) Y_UNIT_TEST(ShouldNotStartStopEndpointTwice) { + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto listener = std::make_shared(); - auto manager = CreateEndpointManager( - bootstrap, - {{ NProto::IPC_GRPC, listener }}); + bootstrap.EndpointListeners = {{ NProto::IPC_GRPC, listener }}; + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); auto socketPath = "testSocketPath"; @@ -701,19 +737,18 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) Y_UNIT_TEST(ShouldNotStartBusyEndpoint) { + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto grpcListener = std::make_shared(); auto nbdListener = std::make_shared(); + bootstrap.EndpointListeners = { + { NProto::IPC_GRPC, grpcListener }, + { NProto::IPC_NBD, nbdListener }, + }; - auto manager = CreateEndpointManager( - bootstrap, - { - { NProto::IPC_GRPC, grpcListener }, - { NProto::IPC_NBD, nbdListener }, - }); - + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); auto socketPath = "testSocketPath"; @@ -751,17 +786,16 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) Y_UNIT_TEST(ShouldNotMountDiskWhenStartEndpointFailed) { + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto error = TErrorResponse(E_FAIL, "Endpoint listener is broken"); auto listener = std::make_shared( MakeFuture(error)); + bootstrap.EndpointListeners = {{ NProto::IPC_GRPC, listener }}; - auto manager = CreateEndpointManager( - bootstrap, - {{ NProto::IPC_GRPC, listener }}); - + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); NProto::TStartEndpointRequest request; @@ -778,14 +812,14 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) Y_UNIT_TEST(ShouldHandleLocalRequests) { + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto listener = std::make_shared(); - auto manager = CreateEndpointManager( - bootstrap, - {{ NProto::IPC_GRPC, listener }}); + bootstrap.EndpointListeners = {{ NProto::IPC_GRPC, listener }}; + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); auto unixSocket = "testSocket"; @@ -890,15 +924,15 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) ++unmountCounter; }; + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto listener = std::make_shared(); - auto manager = CreateEndpointManager( - bootstrap, - {{ NProto::IPC_VHOST, listener }}, - serverStats); + bootstrap.EndpointListeners = {{ NProto::IPC_VHOST, listener }}; + bootstrap.ServerStats = serverStats; + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); auto unixSocket = "testSocket"; @@ -927,16 +961,15 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) Y_UNIT_TEST(ShouldNotStartEndpointWithSocketPathLongerThanLimit) { + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto grpcListener = CreateSocketEndpointListener(bootstrap.Logging, 16); grpcListener->SetClientStorageFactory(CreateClientStorageFactoryStub()); + bootstrap.EndpointListeners = {{ NProto::IPC_GRPC, grpcListener }}; - auto manager = CreateEndpointManager( - bootstrap, - {{ NProto::IPC_GRPC, grpcListener }}); - + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); TString maxSocketPath(UnixSocketPathLengthLimit, 'x'); @@ -979,21 +1012,19 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) TString diskId = "testDiskId"; TString nbdSocketSuffix = "_nbd"; + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto grpcListener = std::make_shared(); auto nbdListener = std::make_shared(); + bootstrap.EndpointListeners = { + { NProto::IPC_GRPC, grpcListener }, + { NProto::IPC_NBD, nbdListener }, + }; + bootstrap.Options.NbdSocketSuffix = nbdSocketSuffix; - auto manager = CreateEndpointManager( - bootstrap, - { - { NProto::IPC_GRPC, grpcListener }, - { NProto::IPC_NBD, nbdListener }, - }, - CreateServerStatsStub(), - nbdSocketSuffix); - + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); { @@ -1038,29 +1069,17 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) Y_UNIT_TEST(ShouldIgnoreInstanceIdWhenCompareStartEndpointRequests) { + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); - - const TString dirPath = "./" + CreateGuidAsString(); - auto endpointStorage = CreateFileEndpointStorage(dirPath); + bootstrap.Service = CreateTestService(mountedVolumes); auto sessionManager = std::make_shared(); - auto manager = NServer::CreateEndpointManager( - CreateWallClockTimer(), - CreateSchedulerStub(), - bootstrap.Logging, - CreateRequestStatsStub(), - CreateVolumeStatsStub(), - CreateServerStatsStub(), - bootstrap.Executor, - CreateEndpointEventProxy(), - sessionManager, - endpointStorage, - {{ NProto::IPC_GRPC, std::make_shared() }}, - {}, // clientConfig - "" // NbdSocketSuffix - ); + bootstrap.SessionManager = sessionManager; + + auto listener = std::make_shared(); + bootstrap.EndpointListeners = {{ NProto::IPC_GRPC, listener }}; + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); size_t requestId = 42; @@ -1170,14 +1189,14 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) // NBS-3018, CLOUD-98154 Y_UNIT_TEST(ShouldIgnoreSomeArgsWhenStartEndpointTwice) { + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); auto listener = std::make_shared(); - auto manager = CreateEndpointManager( - bootstrap, - {{ NProto::IPC_GRPC, listener }}); + bootstrap.EndpointListeners = {{ NProto::IPC_GRPC, listener }}; + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); auto socketPath = "testSocketPath"; @@ -1214,18 +1233,14 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) Y_UNIT_TEST(ShouldSwitchEndpointWhenEndpointStarted) { + TBootstrap bootstrap; TMap mountedVolumes; - TBootstrap bootstrap(CreateTestService(mountedVolumes)); + bootstrap.Service = CreateTestService(mountedVolumes); - auto endpointEventHandler = CreateEndpointEventProxy(); auto listener = std::make_shared(); - auto manager = CreateEndpointManager( - bootstrap, - {{ NProto::IPC_VHOST, listener }}, - CreateServerStatsStub(), - "", - endpointEventHandler); + bootstrap.EndpointListeners = {{ NProto::IPC_VHOST, listener }}; + auto manager = CreateEndpointManager(bootstrap); bootstrap.Start(); auto socketPath = "testSocketPath"; @@ -1233,7 +1248,7 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) { // without started endpoint SwitchEndpointIfNeeded is ignored - auto future = endpointEventHandler->SwitchEndpointIfNeeded( + auto future = bootstrap.EndpointEventHandler->SwitchEndpointIfNeeded( diskId, "test"); auto error = future.GetValue(TDuration::Seconds(5)); UNIT_ASSERT_VALUES_EQUAL_C( @@ -1262,7 +1277,7 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) { // with started endpoint SwitchEndpointIfNeeded leads to // SwitchEndpoint call - auto future = endpointEventHandler->SwitchEndpointIfNeeded( + auto future = bootstrap.EndpointEventHandler->SwitchEndpointIfNeeded( diskId, "test"); auto error = future.GetValue(TDuration::Seconds(5)); @@ -1273,6 +1288,416 @@ Y_UNIT_TEST_SUITE(TEndpointManagerTest) UNIT_ASSERT_VALUES_EQUAL(1, listener->SwitchEndpointCounter); } } + + Y_UNIT_TEST(ShouldStartEndpointWithNbdDevice) + { + TString nbdDevPrefix = CreateGuidAsString() + "_nbd"; + int deviceCount = 6; + for (int i = 0; i < deviceCount; ++i) { + TFsPath(nbdDevPrefix + ToString(i)).Touch(); + } + Y_DEFER { + for (int i = 0; i < deviceCount; ++i) { + TFsPath(nbdDevPrefix + ToString(i)).DeleteIfExists(); + } + }; + + TBootstrap bootstrap; + TMap mountedVolumes; + bootstrap.Service = CreateTestService(mountedVolumes); + + auto listener = std::make_shared(); + bootstrap.EndpointListeners = {{ NProto::IPC_NBD, listener }}; + + auto deviceFactory = std::make_shared(); + bootstrap.NbdDeviceFactory = deviceFactory; + + bootstrap.Options.NbdDevicePrefix = nbdDevPrefix; + + auto manager = CreateEndpointManager(bootstrap); + bootstrap.Start(); + + auto& storage = *bootstrap.EndpointStorage; + google::protobuf::util::MessageDifferencer comparator; + + TString unixSocket = "testSocket"; + TString diskId = "testDiskId"; + TString nbdDevFile = nbdDevPrefix + "0"; + + NProto::TStartEndpointRequest baseRequest; + SetDefaultHeaders(baseRequest); + baseRequest.SetUnixSocketPath(unixSocket); + baseRequest.SetDiskId(diskId); + baseRequest.SetClientId(TestClientId); + baseRequest.SetIpcType(NProto::IPC_NBD); + baseRequest.SetNbdDeviceFile(nbdDevFile); + baseRequest.SetPersistent(true); + + { + auto request = baseRequest; + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT_C(!HasError(response), response.GetError()); + + UNIT_ASSERT(mountedVolumes.contains(diskId)); + UNIT_ASSERT(listener->GetEndpoints().contains(unixSocket)); + + UNIT_ASSERT_VALUES_EQUAL(1, deviceFactory->Devices.size()); + UNIT_ASSERT_VALUES_EQUAL(nbdDevFile, deviceFactory->Devices[0]); + + auto [str, error] = storage.GetEndpoint(request.GetUnixSocketPath()); + UNIT_ASSERT(!HasError(error)); + auto req = DeserializeEndpoint(str); + UNIT_ASSERT(req); + UNIT_ASSERT(comparator.Equals(request, *req)); + } + + { + auto request = baseRequest; + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == S_ALREADY); + UNIT_ASSERT_VALUES_EQUAL(nbdDevFile, response.GetNbdDeviceFile()); + } + + { + auto request = baseRequest; + request.SetUseFreeNbdDeviceFile(true); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == S_ALREADY); + UNIT_ASSERT_VALUES_EQUAL(nbdDevFile, response.GetNbdDeviceFile()); + } + + { + auto request = baseRequest; + request.SetUnixSocketPath(unixSocket + "other"); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == E_INVALID_STATE); + } + + { + auto request = baseRequest; + request.SetNbdDeviceFile(nbdDevPrefix + "1"); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == E_INVALID_STATE); + } + + { + auto request = baseRequest; + request.SetNbdDeviceFile(""); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == E_INVALID_STATE); + } + + { + auto future = StopEndpoint(*manager, unixSocket); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(!HasError(response)); + + UNIT_ASSERT(mountedVolumes.empty()); + UNIT_ASSERT(listener->GetEndpoints().empty()); + } + + // + + { + auto request = baseRequest; + request.SetNbdDeviceFile(""); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT_C(!HasError(response), response.GetError()); + } + + { + auto request = baseRequest; + request.SetNbdDeviceFile(""); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == S_ALREADY); + UNIT_ASSERT_VALUES_EQUAL("", response.GetNbdDeviceFile()); + } + + { + auto request = baseRequest; + request.SetUseFreeNbdDeviceFile(true); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == E_INVALID_STATE); + } + + { + auto request = baseRequest; + request.SetNbdDeviceFile(nbdDevPrefix + "1"); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == E_INVALID_STATE); + } + + { + auto future = StopEndpoint(*manager, unixSocket); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(!HasError(response)); + + UNIT_ASSERT(mountedVolumes.empty()); + UNIT_ASSERT(listener->GetEndpoints().empty()); + } + + // + + { + auto request = baseRequest; + request.SetNbdDeviceFile("blabla3"); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == E_ARGUMENT); + } + + { + auto request = baseRequest; + request.SetPersistent(false); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == E_ARGUMENT); + } + + // + + int num = 0; + deviceFactory->Devices.clear(); + + for (int i: {0, 1, 3}) { + auto request = baseRequest; + request.SetUnixSocketPath(unixSocket + ToString(num++)); + request.SetNbdDeviceFile(nbdDevPrefix + ToString(i)); + + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT_C(!HasError(response), response.GetError()); + UNIT_ASSERT_VALUES_EQUAL( + nbdDevPrefix + ToString(i), + response.GetNbdDeviceFile()); + + UNIT_ASSERT_VALUES_EQUAL(num, deviceFactory->Devices.size()); + UNIT_ASSERT_VALUES_EQUAL( + nbdDevPrefix + ToString(i), + deviceFactory->Devices[num - 1]); + + auto [str, error] = storage.GetEndpoint(request.GetUnixSocketPath()); + UNIT_ASSERT(!HasError(error)); + auto req = DeserializeEndpoint(str); + UNIT_ASSERT(req); + UNIT_ASSERT(comparator.Equals(request, *req)); + } + + for (int i: {2, 4, 5}) { + auto request = baseRequest; + request.SetUnixSocketPath(unixSocket + ToString(num++)); + request.SetUseFreeNbdDeviceFile(true); + + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT_C(!HasError(response), response.GetError()); + UNIT_ASSERT_VALUES_EQUAL( + nbdDevPrefix + ToString(i), + response.GetNbdDeviceFile()); + + UNIT_ASSERT_VALUES_EQUAL(num, deviceFactory->Devices.size()); + UNIT_ASSERT_VALUES_EQUAL( + nbdDevPrefix + ToString(i), + deviceFactory->Devices[num - 1]); + + auto [str, error] = storage.GetEndpoint(request.GetUnixSocketPath()); + UNIT_ASSERT(!HasError(error)); + auto req = DeserializeEndpoint(str); + UNIT_ASSERT(req); + UNIT_ASSERT(!comparator.Equals(request, *req)); + UNIT_ASSERT_VALUES_EQUAL( + nbdDevPrefix + ToString(i), + req->GetNbdDeviceFile()); + } + + { + auto request = baseRequest; + request.SetUnixSocketPath(unixSocket + ToString(num++)); + request.SetUseFreeNbdDeviceFile(true); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == E_INVALID_STATE); + } + + for (int i = 0; i < deviceCount; ++i) { + auto future = StopEndpoint(*manager, unixSocket + ToString(i)); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(!HasError(response)); + } + + UNIT_ASSERT(mountedVolumes.empty()); + UNIT_ASSERT(listener->GetEndpoints().empty()); + } + + Y_UNIT_TEST(ShouldRestoreEndpointWithNbdDevice) + { + TString nbdDevPrefix = CreateGuidAsString() + "_nbd"; + int deviceCount = 10; + for (int i = 0; i < deviceCount; ++i) { + TFsPath(nbdDevPrefix + ToString(i)).Touch(); + } + Y_DEFER { + for (int i = 0; i < deviceCount; ++i) { + TFsPath(nbdDevPrefix + ToString(i)).DeleteIfExists(); + } + }; + + TBootstrap bootstrap; + TMap mountedVolumes; + bootstrap.Service = CreateTestService(mountedVolumes); + + auto listener = std::make_shared(); + bootstrap.EndpointListeners = {{ NProto::IPC_NBD, listener }}; + + auto deviceFactory = std::make_shared(); + bootstrap.NbdDeviceFactory = deviceFactory; + + bootstrap.Options.NbdDevicePrefix = nbdDevPrefix; + + auto manager = CreateEndpointManager(bootstrap); + bootstrap.Start(); + + TString unixSocket = "testSocket"; + TString diskId = "testDiskId"; + + NProto::TStartEndpointRequest request; + SetDefaultHeaders(request); + request.SetDiskId(diskId); + request.SetClientId(TestClientId); + request.SetIpcType(NProto::IPC_NBD); + + size_t correctCount = 5; + size_t wrongCount = 3; + + for (size_t i = 0; i < wrongCount + correctCount; ++i) { + request.SetUnixSocketPath(unixSocket + ToString(i)); + + if (i < wrongCount) { + request.SetUseFreeNbdDeviceFile(true); + } else { + request.SetNbdDeviceFile(nbdDevPrefix + ToString(i)); + } + + auto [str, error] = SerializeEndpoint(request); + UNIT_ASSERT_C(!HasError(error), error); + + auto keyOrError = bootstrap.MutableStorage->AddEndpoint( + request.GetUnixSocketPath(), + str); + UNIT_ASSERT_C(!HasError(keyOrError), keyOrError.GetError()); + } + + NMonitoring::TDynamicCountersPtr counters = new NMonitoring::TDynamicCounters(); + InitCriticalEventsCounter(counters); + auto configCounter = counters->GetCounter("AppCriticalEvents/EndpointRestoringError", true); + UNIT_ASSERT_VALUES_EQUAL(0, static_cast(*configCounter)); + + manager->RestoreEndpoints().Wait(); + + UNIT_ASSERT(wrongCount != correctCount); + UNIT_ASSERT_VALUES_EQUAL(wrongCount, static_cast(*configCounter)); + } + + Y_UNIT_TEST(ShouldNotUseRestoringNbdDevices) + { + TString nbdDevPrefix = CreateGuidAsString() + "_nbd"; + int deviceCount = 6; + for (int i = 0; i < deviceCount; ++i) { + TFsPath(nbdDevPrefix + ToString(i)).Touch(); + } + Y_DEFER { + for (int i = 0; i < deviceCount; ++i) { + TFsPath(nbdDevPrefix + ToString(i)).DeleteIfExists(); + } + }; + + TString unixSocket = "testSocket"; + TString diskId = "testDiskId"; + TString nbdDevFile = nbdDevPrefix + "0"; + + TBootstrap bootstrap; + TMap mountedVolumes; + bootstrap.Service = CreateTestService(mountedVolumes); + + auto rejected = MakeFuture(MakeError(E_REJECTED)); + auto listener = std::make_shared(rejected); + listener->StartEndpointHandler = [&] ( + const NProto::TStartEndpointRequest& request, + NClient::ISessionPtr session) + { + UNIT_ASSERT(session); + + if (unixSocket == request.GetUnixSocketPath()) { + return MakeFuture(MakeError(E_REJECTED)); + } + return MakeFuture(); + }; + + bootstrap.EndpointListeners = {{ NProto::IPC_NBD, listener }}; + + auto deviceFactory = std::make_shared(); + bootstrap.NbdDeviceFactory = deviceFactory; + + bootstrap.Options.NbdDevicePrefix = nbdDevPrefix; + + auto manager = CreateEndpointManager(bootstrap); + bootstrap.Start(); + + NProto::TStartEndpointRequest request; + SetDefaultHeaders(request); + request.SetUnixSocketPath(unixSocket); + request.SetDiskId(diskId); + request.SetClientId(TestClientId); + request.SetIpcType(NProto::IPC_NBD); + request.SetNbdDeviceFile(nbdDevFile); + request.SetPersistent(true); + + { + auto [str, error] = SerializeEndpoint(request); + UNIT_ASSERT_C(!HasError(error), error); + + auto keyOrError = bootstrap.MutableStorage->AddEndpoint( + request.GetUnixSocketPath(), + str); + UNIT_ASSERT_C(!HasError(keyOrError), keyOrError.GetError()); + } + + NMonitoring::TDynamicCountersPtr counters = new NMonitoring::TDynamicCounters(); + InitCriticalEventsCounter(counters); + auto configCounter = counters->GetCounter("AppCriticalEvents/EndpointRestoringError", true); + UNIT_ASSERT_VALUES_EQUAL(0, static_cast(*configCounter)); + + manager->RestoreEndpoints(); + bootstrap.Scheduler->RunAllScheduledTasks(); + + UNIT_ASSERT_VALUES_EQUAL(0, static_cast(*configCounter)); + + { + request.SetUnixSocketPath(unixSocket + "other"); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(response.GetError().GetCode() == E_INVALID_STATE); + } + + { + request.SetUnixSocketPath(unixSocket + "other"); + request.SetUseFreeNbdDeviceFile(true); + auto future = StartEndpoint(*manager, request); + auto response = future.GetValue(TDuration::Seconds(5)); + UNIT_ASSERT(!HasError(response)); + UNIT_ASSERT(response.GetNbdDeviceFile() != nbdDevFile); + } + } } } // namespace NCloud::NBlockStore::NServer diff --git a/cloud/blockstore/libs/endpoints/service_endpoint_ut.cpp b/cloud/blockstore/libs/endpoints/service_endpoint_ut.cpp index f0fe4659f89..8c5ac332b90 100644 --- a/cloud/blockstore/libs/endpoints/service_endpoint_ut.cpp +++ b/cloud/blockstore/libs/endpoints/service_endpoint_ut.cpp @@ -269,8 +269,8 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) std::make_shared(), endpointStorage, {{ ipcType, listener }}, - {}, // clientConfig - "" // nbdSocketSuffix + nullptr, // nbdDeviceFactory + {} // options ); NProto::TStartEndpointRequest request; @@ -361,8 +361,8 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) sessionManager, endpointStorage, {{ NProto::IPC_GRPC, std::make_shared() }}, - {}, // clientConfig - "" // nbdSocketSuffix + nullptr, // nbdDeviceFactory + {} // options ); auto endpointService = CreateMultipleEndpointService( @@ -566,8 +566,8 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) std::make_shared(), endpointStorage, {{ NProto::IPC_GRPC, listener }}, - {}, // clientConfig - "" // nbdSocketSuffix + nullptr, // nbdDeviceFactory + {} // options ); endpointManager->RestoreEndpoints().Wait(); @@ -606,9 +606,9 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) CreateEndpointEventProxy(), std::make_shared(), endpointStorage, - {}, // listeners - {}, // clientConfig - "" // nbdSocketSuffix + {}, // listeners + nullptr, // nbdDeviceFactory + {} // options ); endpointManager->RestoreEndpoints().Wait(); @@ -643,9 +643,9 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) CreateEndpointEventProxy(), std::make_shared(), endpointStorage, - {}, // listeners - {}, // clientConfig - "" // nbdSocketSuffix + {}, // listeners + nullptr, // nbdDeviceFactory + {} // options ); endpointManager->RestoreEndpoints().Wait(); @@ -709,8 +709,8 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) std::make_shared(), endpointStorage, {{ NProto::IPC_GRPC, listener }}, - {}, // clientConfig - "" // nbdSocketSuffix + nullptr, // nbdDeviceFactory + {} // options ); auto restoreFuture = endpointManager->RestoreEndpoints(); @@ -787,8 +787,8 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) std::make_shared(), endpointStorage, {{ NProto::IPC_GRPC, std::make_shared() }}, - {}, // clientConfig - "" // nbdSocketSuffix + nullptr, // nbdDeviceFactory + {} // options ); auto future = endpointManager->ListKeyrings( @@ -942,8 +942,8 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) std::make_shared(), endpointStorage, {{ NProto::IPC_GRPC, listener }}, - {}, // clientConfig - "" // nbdSocketSuffix + nullptr, // nbdDeviceFactory + {} // options ); auto unixSocket = "testSocket"; @@ -1059,8 +1059,8 @@ Y_UNIT_TEST_SUITE(TServiceEndpointTest) std::make_shared(), endpointStorage, {{ NProto::IPC_GRPC, listener }}, - {}, // clientConfig - "" // nbdSocketSuffix + nullptr, // nbdDeviceFactory + {} // options ); endpointManager->Start(); diff --git a/cloud/blockstore/libs/nbd/device.cpp b/cloud/blockstore/libs/nbd/device.cpp index f97e79c2786..2c215fbac84 100644 --- a/cloud/blockstore/libs/nbd/device.cpp +++ b/cloud/blockstore/libs/nbd/device.cpp @@ -219,6 +219,33 @@ class TDeviceConnectionStub final {} }; +//////////////////////////////////////////////////////////////////////////////// + +class TDeviceConnectionFactory final + : public IDeviceConnectionFactory +{ +private: + const ILoggingServicePtr Logging; + const TDuration Timeout; + +public: + TDeviceConnectionFactory(ILoggingServicePtr logging, TDuration timeout) + : Logging(std::move(logging)) + , Timeout(std::move(timeout)) + {} + + IDeviceConnectionPtr Create( + TNetworkAddress connectAddress, + TString deviceName) override + { + return CreateDeviceConnection( + Logging, + std::move(connectAddress), + std::move(deviceName), + Timeout); + } +}; + } // namespace //////////////////////////////////////////////////////////////////////////////// @@ -241,4 +268,13 @@ IDeviceConnectionPtr CreateDeviceConnectionStub() return std::make_shared(); } +IDeviceConnectionFactoryPtr CreateDeviceConnectionFactory( + ILoggingServicePtr logging, + TDuration timeout) +{ + return std::make_shared( + std::move(logging), + std::move(timeout)); +} + } // namespace NCloud::NBlockStore::NBD diff --git a/cloud/blockstore/libs/nbd/device.h b/cloud/blockstore/libs/nbd/device.h index bd891b39d7c..f54b6cac2a3 100644 --- a/cloud/blockstore/libs/nbd/device.h +++ b/cloud/blockstore/libs/nbd/device.h @@ -20,6 +20,17 @@ struct IDeviceConnection //////////////////////////////////////////////////////////////////////////////// +struct IDeviceConnectionFactory +{ + virtual ~IDeviceConnectionFactory() = default; + + virtual IDeviceConnectionPtr Create( + TNetworkAddress connectAddress, + TString deviceName) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + IDeviceConnectionPtr CreateDeviceConnection( ILoggingServicePtr logging, TNetworkAddress connectAddress, @@ -28,4 +39,8 @@ IDeviceConnectionPtr CreateDeviceConnection( IDeviceConnectionPtr CreateDeviceConnectionStub(); +IDeviceConnectionFactoryPtr CreateDeviceConnectionFactory( + ILoggingServicePtr logging, + TDuration timeout); + } // namespace NCloud::NBlockStore::NBD diff --git a/cloud/blockstore/libs/nbd/public.h b/cloud/blockstore/libs/nbd/public.h index d4ccdf4968e..1b4a4be5d54 100644 --- a/cloud/blockstore/libs/nbd/public.h +++ b/cloud/blockstore/libs/nbd/public.h @@ -29,6 +29,9 @@ using IClientHandlerPtr = std::shared_ptr; struct IDeviceConnection; using IDeviceConnectionPtr = std::shared_ptr; +struct IDeviceConnectionFactory; +using IDeviceConnectionFactoryPtr = std::shared_ptr; + struct ILimiter; using ILimiterPtr = std::shared_ptr; diff --git a/cloud/blockstore/public/api/protos/endpoints.proto b/cloud/blockstore/public/api/protos/endpoints.proto index c8b1d5e8262..b294fe8cbab 100644 --- a/cloud/blockstore/public/api/protos/endpoints.proto +++ b/cloud/blockstore/public/api/protos/endpoints.proto @@ -88,8 +88,13 @@ message TStartEndpointRequest // Restore endpoint after restart bool Persistent = 24; - // The device file (e.g. "/dev/nbd0") which nbd-client connected to. - string NbdDeviceFile = 25; + oneof NbdDevice { + // The device file (e.g. "/dev/nbd0") which nbd-client will connect to. + string NbdDeviceFile = 25; + + // Use any free nbd device file which nbd-client will connect to. + bool UseFreeNbdDeviceFile = 26; + } } message TStartEndpointResponse @@ -99,6 +104,9 @@ message TStartEndpointResponse // Volume information. TVolume Volume = 2; + + // The device file (e.g. "/dev/nbd0") which nbd-client connected to. + string NbdDeviceFile = 3; } ////////////////////////////////////////////////////////////////////////////////