diff --git a/cloud/blockstore/libs/rdma/impl/client.cpp b/cloud/blockstore/libs/rdma/impl/client.cpp index c4484749714..0b4c9df7157 100644 --- a/cloud/blockstore/libs/rdma/impl/client.cpp +++ b/cloud/blockstore/libs/rdma/impl/client.cpp @@ -389,6 +389,7 @@ class TClientEndpoint final // config might be adjusted during initial handshake TClientConfigPtr OriginalConfig; TClientConfig Config; + const EWaitMode WaitMode; bool ResetConfig = false; TCompletionPoller* Poller = nullptr; @@ -524,6 +525,7 @@ TClientEndpoint::TClientEndpoint( , Reconnect(config->MaxReconnectDelay) , OriginalConfig(std::move(config)) , Config(*OriginalConfig) + , WaitMode(Config.WaitMode) { // user data attached to connection events Connection->context = this; @@ -769,14 +771,14 @@ void TClientEndpoint::SendRequest( Counters->RequestEnqueued(); InputRequests.Enqueue(std::move(req)); - if (Config.WaitMode == EWaitMode::Poll) { + if (WaitMode == EWaitMode::Poll) { RequestEvent.Set(); } } bool TClientEndpoint::HandleInputRequests() { - if (Config.WaitMode == EWaitMode::Poll) { + if (WaitMode == EWaitMode::Poll) { RequestEvent.Clear(); } @@ -811,7 +813,7 @@ bool TClientEndpoint::AbortRequests() noexcept { bool ret = false; - if (Config.WaitMode == EWaitMode::Poll) { + if (WaitMode == EWaitMode::Poll) { DisconnectEvent.Clear(); } @@ -856,7 +858,7 @@ bool TClientEndpoint::HandleCompletionEvents() { ibv_cq* cq = CompletionQueue.get(); - if (Config.WaitMode == EWaitMode::Poll) { + if (WaitMode == EWaitMode::Poll) { Verbs->GetCompletionEvent(cq); Verbs->AckCompletionEvents(cq, 1); Verbs->RequestCompletionEvent(cq, 0); @@ -1113,7 +1115,7 @@ void TClientEndpoint::SetError() noexcept RDMA_ERROR("flush error: " << e.what()); } - if (Config.WaitMode == EWaitMode::Poll) { + if (WaitMode == EWaitMode::Poll) { DisconnectEvent.Set(); } } diff --git a/cloud/blockstore/vhost-server/backend_aio.cpp b/cloud/blockstore/vhost-server/backend_aio.cpp index 07ff6f0317f..89f01e2048b 100644 --- a/cloud/blockstore/vhost-server/backend_aio.cpp +++ b/cloud/blockstore/vhost-server/backend_aio.cpp @@ -8,6 +8,8 @@ #include <cloud/storage/core/libs/common/thread.h> #include <cloud/storage/core/libs/diagnostics/logging.h> +#include <util/system/sanitizers.h> + #include <libaio.h> #include <thread> @@ -18,55 +20,6 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -#ifndef NDEBUG - -TString ToString(std::span<iocb*> batch) -{ - TStringStream ss; - - const char* op[]{ - "pread", - "pwrite", - "", - "", - "", - "", - "", - "preadv", - "pwritev", - }; - - ss << "[ "; - for (iocb* cb: batch) { - ss << "{ " << op[cb->aio_lio_opcode] << ":" << cb->aio_fildes << " "; - switch (cb->aio_lio_opcode) { - case IO_CMD_PREAD: - case IO_CMD_PWRITE: - ss << cb->u.c.buf << " " << cb->u.c.nbytes << ":" - << cb->u.c.offset; - break; - case IO_CMD_PREADV: - case IO_CMD_PWRITEV: { - iovec* iov = static_cast<iovec*>(cb->u.c.buf); - for (unsigned i = 0; i != cb->u.c.nbytes; ++i) { - ss << "(" << iov[i].iov_base << " " << iov[i].iov_len - << ") "; - } - break; - } - } - ss << "} "; - } - - ss << "]"; - - return ss.Str(); -} - -#endif // NDEBUG - -//////////////////////////////////////////////////////////////////////////////// - void CompleteRequest( TAioRequest* req, vhd_bdev_io_result status, @@ -345,11 +298,6 @@ void TAioBackend::ProcessQueue( queueStats.Submitted += ret; -#ifndef NDEBUG - STORAGE_DEBUG( - "submitted " << ret << ": " - << ToString(std::span(batch.data(), ret))); -#endif // remove submitted items from the batch batch.erase(batch.begin(), batch.begin() + ret); } @@ -416,7 +364,9 @@ void TAioBackend::CompletionThreadFunc() for (int i = 0; i != ret; ++i) { if (events[i].data) { auto* req = static_cast<TAioCompoundRequest*>(events[i].data); + NSan::Acquire(req); iocb* sub = events[i].obj; + NSan::Acquire(sub); vhd_bdev_io_result result = VHD_BDEV_SUCCESS; @@ -440,6 +390,7 @@ void TAioBackend::CompletionThreadFunc() } auto* req = static_cast<TAioRequest*>(events[i].obj); + NSan::Acquire(req); vhd_bdev_io_result result = VHD_BDEV_SUCCESS; auto* bio = vhd_get_bdev_io(req->Io); diff --git a/cloud/blockstore/vhost-server/request_aio.cpp b/cloud/blockstore/vhost-server/request_aio.cpp index 1635a6f4725..900dc93e749 100644 --- a/cloud/blockstore/vhost-server/request_aio.cpp +++ b/cloud/blockstore/vhost-server/request_aio.cpp @@ -3,6 +3,7 @@ #include <cloud/storage/core/libs/diagnostics/logging.h> #include <util/generic/strbuf.h> +#include <util/system/sanitizers.h> #include <algorithm> @@ -81,11 +82,13 @@ void PrepareCompoundIO( sreq->data = req; batch.push_back(sreq); + NSan::Release(sreq); ptr += count; totalBytes -= count; deviceOffset = 0; } + NSan::Release(req); } } // namespace @@ -198,6 +201,7 @@ void PrepareIO( STORAGE_DEBUG("Prepared IO request with addr: %p", req); batch.push_back(req); + NSan::Release(req); } } // namespace NCloud::NBlockStore::NVHostServer diff --git a/cloud/blockstore/vhost-server/server.cpp b/cloud/blockstore/vhost-server/server.cpp index 100eec40dc6..f1aebfab915 100644 --- a/cloud/blockstore/vhost-server/server.cpp +++ b/cloud/blockstore/vhost-server/server.cpp @@ -180,14 +180,12 @@ void TServer::Stop() { STORAGE_INFO("Stopping the server"); - { - auto promise = NewPromise(); - vhd_unregister_blockdev(Handler, [] (void* opaque) { - static_cast<TPromise<void>*>(opaque)->SetValue(); - }, &promise); + auto promise = NewPromise(); + vhd_unregister_blockdev(Handler, [] (void* opaque) { + static_cast<TPromise<void>*>(opaque)->SetValue(); + }, &promise); - promise.GetFuture().Wait(); - } + promise.GetFuture().Wait(); // 2. Stop request queues. For each do: // 2.1 Stop a request queue