Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reconfigure nbd device on disconnect from the kernel #2791 #2792

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 196 additions & 62 deletions cloud/blockstore/libs/endpoint_proxy/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <cloud/blockstore/libs/nbd/client.h>
#include <cloud/blockstore/libs/nbd/client_handler.h>
#include <cloud/blockstore/libs/nbd/device.h>
#include <cloud/blockstore/libs/nbd/error_handler.h>
#include <cloud/blockstore/libs/nbd/netlink_device.h>
#include <cloud/blockstore/libs/nbd/server.h>
#include <cloud/blockstore/libs/nbd/server_handler.h>
Expand All @@ -19,6 +20,7 @@
#include <cloud/blockstore/public/api/grpc/endpoint_proxy.grpc.pb.h>
#include <cloud/blockstore/public/api/protos/endpoints.pb.h>

#include <cloud/storage/core/libs/common/backoff_delay_provider.h>
#include <cloud/storage/core/libs/common/error.h>
#include <cloud/storage/core/libs/common/proto_helpers.h>
#include <cloud/storage/core/libs/common/scheduler.h>
Expand All @@ -30,6 +32,7 @@

#include <library/cpp/logger/log.h>

#include <contrib/libs/grpc/include/grpcpp/alarm.h>
#include <contrib/libs/grpc/include/grpcpp/impl/codegen/completion_queue.h>
#include <contrib/libs/grpc/include/grpcpp/impl/codegen/status.h>
#include <contrib/libs/grpc/include/grpcpp/security/server_credentials.h>
Expand All @@ -55,6 +58,15 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

constexpr auto NBD_CONNECTION_TIMEOUT = TDuration::Days(1);
constexpr auto NBD_RECONFIGURE_CONNECTED = true;
constexpr auto NBD_DELETE_DEVICE = false;

constexpr auto MIN_RECONNECT_DELAY = TDuration::MilliSeconds(100);
constexpr auto MAX_RECONNECT_DELAY = TDuration::Minutes(10);

////////////////////////////////////////////////////////////////////////////////

TString ReadFile(const TString& name)
{
return TFileInput(name).ReadAll();
Expand Down Expand Up @@ -160,6 +172,34 @@ struct TResizeRequestContext: TRequestContextBase
}
};

// Completion-queue tag that schedules a restart attempt for the proxy endpoint
// identified by Socket. The alarm is armed once on construction; SetAlarm()
// may be called again to arm it with the next delay taken from Backoff.
struct TRestartAlarmContext: TRequestContextBase
{
    TString Socket;
    grpc::ServerCompletionQueue& CQ;
    TBackoffDelayProvider Backoff;
    grpc::Alarm Alarm;

    TRestartAlarmContext(
            TString socket,
            grpc::ServerCompletionQueue& cq)
        : Socket(std::move(socket))
        , CQ(cq)
        , Backoff{MIN_RECONNECT_DELAY, MAX_RECONNECT_DELAY}
    {
        SetAlarm();
    }

    // Arms Alarm on CQ with `this` as the tag. The deadline is the delay
    // returned by Backoff.GetDelayAndIncrease(), expressed as a GPR_TIMESPAN
    // value.
    void SetAlarm()
    {
        const auto delay = Backoff.GetDelayAndIncrease();
        const auto deadline = gpr_time_from_nanos(
            delay.NanoSeconds(),
            gpr_clock_type::GPR_TIMESPAN);

        Alarm.Set(&CQ, deadline, this);
    }
};

////////////////////////////////////////////////////////////////////////////////

struct TClientStorage: NStorage::NServer::IClientStorage
Expand Down Expand Up @@ -226,6 +266,24 @@ struct TServer: IEndpointProxyServer
};
THashMap<TString, std::shared_ptr<TEndpoint>> Socket2Endpoint;

// NBD error handler that answers every reported exception by queueing a
// restart alarm for the endpoint identified by Socket.
struct TErrorHandler: NBD::IErrorHandler
{
    TString Socket;
    grpc::ServerCompletionQueue& CQ;

    TErrorHandler(TString socket, grpc::ServerCompletionQueue& cq)
        : Socket(std::move(socket))
        , CQ(cq)
    {}

    // The exception itself is not inspected.
    void ProcessException(std::exception_ptr /*unused*/) override
    {
        // Intentionally not stored: the context registers itself as an alarm
        // tag on CQ in its constructor, and the completion-queue loop deletes
        // it once no further retry is scheduled.
        new TRestartAlarmContext(Socket, CQ);
    }
};

NBD::IClientPtr NbdClient;

TServer(
Expand Down Expand Up @@ -460,6 +518,15 @@ struct TServer: IEndpointProxyServer
ProcessRequest(resizeRequestContext);
continue;
}

auto* restartAlarmContext =
dynamic_cast<TRestartAlarmContext*>(requestContext);
if (restartAlarmContext) {
if (!ProcessAlarm(restartAlarmContext)) {
delete restartAlarmContext;
}
continue;
}
}

STORAGE_INFO("Exiting loop");
Expand All @@ -472,7 +539,7 @@ struct TServer: IEndpointProxyServer
// Logs, at INFO level, that `request` has been received. The line consists of
// the request's quoted short debug string followed by " - Received".
template <typename TRequest>
void RequestReceived(const TRequest& request)
{
    STORAGE_INFO(request.ShortDebugString().Quote() << " - Received");
}

template <typename TResponse>
Expand Down Expand Up @@ -546,12 +613,13 @@ struct TServer: IEndpointProxyServer
TEndpoint& ep,
NProto::TStartProxyEndpointResponse& response)
{
const auto tag = TStringBuilder()
<< request.ShortDebugString().Quote() << " - ";

if (!ValidateRequest(request, response)) {
return;
}

STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Validated request");
STORAGE_INFO(tag << "Validated request");

TNetworkAddress connectAddress(
MakeUnixSocketAddress(request.GetUnixSocketPath()));
Expand All @@ -560,8 +628,7 @@ struct TServer: IEndpointProxyServer
NBD::CreateClientHandler(Logging),
CreateBlockStoreStub());
ep.Client->Start();
STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started NBD client endpoint");
STORAGE_INFO(tag << "Started NBD client endpoint");

auto retryPolicy = CreateRetryPolicy(ClientConfig);
ep.RequestStats = CreateProxyRequestStats();
Expand All @@ -575,8 +642,7 @@ struct TServer: IEndpointProxyServer
Scheduler,
ep.RequestStats,
volumeStats);
STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started DurableClient");
STORAGE_INFO(tag << "Started DurableClient");

// these options can actually be obtained from ClientHandler after the
// first request is processed (they will be available after connection
Expand All @@ -588,74 +654,26 @@ struct TServer: IEndpointProxyServer
ep.NbdOptions.BlockSize = request.GetBlockSize();
ep.NbdOptions.BlocksCount = request.GetBlocksCount();

auto handlerFactory = CreateServerHandlerFactory(
CreateDefaultDeviceHandlerFactory(),
Logging,
CreateProxyStorage(
ep.Client,
ep.RequestStats,
ep.NbdOptions.BlockSize),
CreateServerStatsStub(),
ep.NbdOptions);

ep.InternalUnixSocketPath = request.GetUnixSocketPath() + ".p";
ep.ListenAddress = std::make_unique<TNetworkAddress>(
MakeUnixSocketAddress(ep.InternalUnixSocketPath));

// TODO fix StartEndpoint signature - it's actually synchronous
auto startResult = NbdServer->StartEndpoint(
*ep.ListenAddress,
std::move(handlerFactory)).GetValueSync();

if (HasError(startResult)) {
*response.MutableError() = std::move(startResult);
auto status = StartServerEndpoint(ep, tag);
if (HasError(status)) {
*response.MutableError() = std::move(status);
return;
}
response.SetInternalUnixSocketPath(ep.InternalUnixSocketPath);

STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started NBD server endpoint");

ep.NbdDevicePath = request.GetNbdDevice();
if (!ep.NbdDevicePath) {
STORAGE_WARN(request.ShortDebugString().Quote()
<< " - NbdDevice missing - no nbd connection with the"
<< " kernel will be established");
return;
}

if (Config.Netlink) {
ep.NbdDevice = NBD::CreateNetlinkDevice(
Logging,
*ep.ListenAddress,
request.GetNbdDevice(),
Config.NbdRequestTimeout,
TDuration::Days(1), // connection timeout
true); // reconfigure device if exists
} else {
// The only case we want kernel to retry requests is when the socket
// is dead due to nbd server restart. And since we can't configure
// ioctl device to use a new socket, request timeout effectively
// becomes connection timeout
ep.NbdDevice = NBD::CreateDevice(
Logging,
*ep.ListenAddress,
request.GetNbdDevice(),
TDuration::Days(1)); // timeout
}

auto future = ep.NbdDevice->Start();
const auto& status = future.GetValue();
status = StartDevice(ep, tag);
if (HasError(status)) {
STORAGE_ERROR(request.ShortDebugString().Quote()
<< " - Unable to start nbd device: "
STORAGE_ERROR(tag << "Unable to start nbd device: "
<< status.GetMessage());
*response.MutableError() = std::move(status);
return;
}

STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started NBD device connection");
}

void ProcessRequest(TStartRequestContext* requestContext)
Expand Down Expand Up @@ -718,7 +736,7 @@ struct TServer: IEndpointProxyServer
if (ep.NbdDevice) {
ep.NbdDevice->Stop(true);
STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Stopped NBD device connection");
<< " - Stopped NBD device");
}

if (ep.ListenAddress) {
Expand Down Expand Up @@ -890,6 +908,122 @@ struct TServer: IEndpointProxyServer
ResponseSent(request);
}

// Builds the NBD server handler factory for `ep` (proxy storage on top of
// ep.Client, an error handler bound to ep.UnixSocketPath and the completion
// queue) and starts the NBD server endpoint on ep.ListenAddress.
//
// ep.ListenAddress is dereferenced unconditionally and therefore must be set
// by the caller. `tag` is prepended to every log line.
//
// Returns the error reported by NbdServer->StartEndpoint, or an empty TError
// on success.
NProto::TError StartServerEndpoint(TEndpoint& ep, const TString& tag)
{
    auto handlerFactory = CreateServerHandlerFactory(
        CreateDefaultDeviceHandlerFactory(),
        Logging,
        CreateProxyStorage(
            ep.Client,
            ep.RequestStats,
            ep.NbdOptions.BlockSize),
        CreateServerStatsStub(),
        std::make_shared<TErrorHandler>(ep.UnixSocketPath, *CQ),
        ep.NbdOptions);

    // TODO fix StartEndpoint signature - it's actually synchronous
    auto status = NbdServer->StartEndpoint(
        *ep.ListenAddress,
        std::move(handlerFactory)).GetValueSync();
    if (HasError(status)) {
        // A failed start is an error condition; log it at the same level as
        // the device start failure in StartDevice.
        STORAGE_ERROR(tag << "Unable to start NBD server endpoint: "
            << status.GetMessage());
        return status;
    }

    STORAGE_INFO(tag << "Started NBD server endpoint");
    return {};
}

// Creates the NBD device for `ep` (netlink or ioctl flavour, depending on
// Config.Netlink), stores it in ep.NbdDevice and starts it.
//
// When ep.NbdDevicePath is empty nothing is created and success is returned.
// `tag` is prepended to every log line.
//
// Returns the error reported by the device's Start(), or an empty TError.
NProto::TError StartDevice(TEndpoint& ep, const TString& tag)
{
    if (!ep.NbdDevicePath) {
        STORAGE_WARN(tag << "NbdDevice missing - nbd connection "
            << "with the kernel won't be established");
        return {};
    }

    if (!Config.Netlink) {
        // An ioctl device cannot be switched to a new socket, and the only
        // situation in which the kernel should retry requests is a dead socket
        // after an nbd server restart. The request timeout therefore acts as
        // the connection timeout here.
        ep.NbdDevice = NBD::CreateDevice(
            Logging,
            *ep.ListenAddress,
            ep.NbdDevicePath,
            NBD_CONNECTION_TIMEOUT);
    } else {
        ep.NbdDevice = NBD::CreateNetlinkDevice(
            Logging,
            *ep.ListenAddress,
            ep.NbdDevicePath,
            Config.NbdRequestTimeout,
            NBD_CONNECTION_TIMEOUT,
            NBD_RECONFIGURE_CONNECTED);
    }

    auto startResult = ep.NbdDevice->Start().GetValueSync();
    if (!HasError(startResult)) {
        STORAGE_INFO(tag << "Started NBD device");
        return {};
    }

    STORAGE_ERROR(tag << "Unable to start NBD device: "
        << startResult.GetMessage());
    return startResult;
}

// Handles a fired restart alarm for the endpoint registered under
// context->Socket.
//
// Returns true when the restart attempt failed and the alarm has been re-armed
// on `context` - the context is still in use and must be kept alive.
// Returns false when the context is no longer needed: either the restart
// succeeded or the endpoint is no longer registered.
bool ProcessAlarm(TRestartAlarmContext* context)
{
    const auto tag = TStringBuilder()
        << "UnixSocketPath: " << context->Socket.Quote() << " - ";

    // Use find() rather than operator[]: the latter would insert an empty
    // entry for a socket whose endpoint has already been removed.
    const auto it = Socket2Endpoint.find(context->Socket);
    if (it == Socket2Endpoint.end() || !it->second) {
        STORAGE_WARN(tag << "Unable to restart proxy endpoint: "
            << "original endpoint stopped");
        return false;
    }

    STORAGE_INFO(tag << "Restarting proxy endpoint");

    if (DoProcessAlarm(*it->second, tag)) {
        // GetDelay() is read before SetAlarm() so that the logged value is
        // the delay the next alarm is armed with.
        STORAGE_ERROR(tag
            << "Unable to restart proxy endpoint, retry in "
            << context->Backoff.GetDelay());
        context->SetAlarm();
        return true;
    }

    return false;
}

// Restarts `ep`: stops the NBD device and the NBD server endpoint if they are
// present, then starts the server endpoint and the device again, in that
// order. `tag` is prepended to every log line.
//
// Returns true if either start step reported an error, false otherwise.
bool DoProcessAlarm(TEndpoint& ep, const TString& tag)
{
    if (ep.NbdDevice) {
        ep.NbdDevice->Stop(NBD_DELETE_DEVICE).GetValueSync();
        STORAGE_INFO(tag << "Stopped NBD device");
    }

    if (ep.ListenAddress) {
        NbdServer->StopEndpoint(*ep.ListenAddress).GetValueSync();
        STORAGE_INFO(tag << "Stopped NBD server endpoint");
    }

    // The device is started only after the server endpoint came up.
    const bool serverFailed = HasError(StartServerEndpoint(ep, tag));
    if (serverFailed) {
        return true;
    }

    return HasError(StartDevice(ep, tag));
}

void Start() override
{
PreStart();
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cloud/blockstore/libs/client/session.h>
#include <cloud/blockstore/libs/diagnostics/server_stats.h>
#include <cloud/blockstore/libs/endpoints/endpoint_listener.h>
#include <cloud/blockstore/libs/nbd/error_handler.h>
#include <cloud/blockstore/libs/nbd/server.h>
#include <cloud/blockstore/libs/nbd/server_handler.h>
#include <cloud/blockstore/libs/service/device_handler.h>
Expand Down Expand Up @@ -53,6 +54,7 @@ class TNbdEndpointListener final
Logging,
std::move(session),
ServerStats,
NBD::CreateErrorHandlerStub(),
options);

auto address = TNetworkAddress(
Expand Down
Loading
Loading