Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NEBDUTY-965: fix thread sanitizer errors in ut #883

Merged
merged 4 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions cloud/blockstore/libs/rdma/impl/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ class TClientEndpoint final
// config might be adjusted during initial handshake
TClientConfigPtr OriginalConfig;
TClientConfig Config;
const EWaitMode WaitMode;
bool ResetConfig = false;

TCompletionPoller* Poller = nullptr;
Expand Down Expand Up @@ -524,6 +525,7 @@ TClientEndpoint::TClientEndpoint(
, Reconnect(config->MaxReconnectDelay)
, OriginalConfig(std::move(config))
, Config(*OriginalConfig)
, WaitMode(Config.WaitMode)
{
// user data attached to connection events
Connection->context = this;
Expand Down Expand Up @@ -769,14 +771,14 @@ void TClientEndpoint::SendRequest(
Counters->RequestEnqueued();
InputRequests.Enqueue(std::move(req));

if (Config.WaitMode == EWaitMode::Poll) {
if (WaitMode == EWaitMode::Poll) {
RequestEvent.Set();
}
}

bool TClientEndpoint::HandleInputRequests()
{
if (Config.WaitMode == EWaitMode::Poll) {
if (WaitMode == EWaitMode::Poll) {
RequestEvent.Clear();
}

Expand Down Expand Up @@ -811,7 +813,7 @@ bool TClientEndpoint::AbortRequests() noexcept
{
bool ret = false;

if (Config.WaitMode == EWaitMode::Poll) {
if (WaitMode == EWaitMode::Poll) {
DisconnectEvent.Clear();
}

Expand Down Expand Up @@ -856,7 +858,7 @@ bool TClientEndpoint::HandleCompletionEvents()
{
ibv_cq* cq = CompletionQueue.get();

if (Config.WaitMode == EWaitMode::Poll) {
if (WaitMode == EWaitMode::Poll) {
Verbs->GetCompletionEvent(cq);
Verbs->AckCompletionEvents(cq, 1);
Verbs->RequestCompletionEvent(cq, 0);
Expand Down Expand Up @@ -1113,7 +1115,7 @@ void TClientEndpoint::SetError() noexcept
RDMA_ERROR("flush error: " << e.what());
}

if (Config.WaitMode == EWaitMode::Poll) {
if (WaitMode == EWaitMode::Poll) {
DisconnectEvent.Set();
}
}
Expand Down
59 changes: 5 additions & 54 deletions cloud/blockstore/vhost-server/backend_aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <cloud/storage/core/libs/common/thread.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>

#include <util/system/sanitizers.h>

#include <libaio.h>

#include <thread>
Expand All @@ -18,55 +20,6 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

#ifndef NDEBUG

TString ToString(std::span<iocb*> batch)
{
TStringStream ss;

const char* op[]{
"pread",
"pwrite",
"",
"",
"",
"",
"",
"preadv",
"pwritev",
};

ss << "[ ";
for (iocb* cb: batch) {
ss << "{ " << op[cb->aio_lio_opcode] << ":" << cb->aio_fildes << " ";
switch (cb->aio_lio_opcode) {
case IO_CMD_PREAD:
case IO_CMD_PWRITE:
ss << cb->u.c.buf << " " << cb->u.c.nbytes << ":"
<< cb->u.c.offset;
break;
case IO_CMD_PREADV:
case IO_CMD_PWRITEV: {
iovec* iov = static_cast<iovec*>(cb->u.c.buf);
for (unsigned i = 0; i != cb->u.c.nbytes; ++i) {
ss << "(" << iov[i].iov_base << " " << iov[i].iov_len
<< ") ";
}
break;
}
}
ss << "} ";
}

ss << "]";

return ss.Str();
}

#endif // NDEBUG

////////////////////////////////////////////////////////////////////////////////

void CompleteRequest(
TAioRequest* req,
vhd_bdev_io_result status,
Expand Down Expand Up @@ -345,11 +298,6 @@ void TAioBackend::ProcessQueue(

queueStats.Submitted += ret;

#ifndef NDEBUG
STORAGE_DEBUG(
"submitted " << ret << ": "
<< ToString(std::span(batch.data(), ret)));
#endif
// remove submitted items from the batch
batch.erase(batch.begin(), batch.begin() + ret);
}
Expand Down Expand Up @@ -416,7 +364,9 @@ void TAioBackend::CompletionThreadFunc()
for (int i = 0; i != ret; ++i) {
if (events[i].data) {
auto* req = static_cast<TAioCompoundRequest*>(events[i].data);
NSan::Acquire(req);
iocb* sub = events[i].obj;
NSan::Acquire(sub);

vhd_bdev_io_result result = VHD_BDEV_SUCCESS;

Expand All @@ -440,6 +390,7 @@ void TAioBackend::CompletionThreadFunc()
}

auto* req = static_cast<TAioRequest*>(events[i].obj);
NSan::Acquire(req);

vhd_bdev_io_result result = VHD_BDEV_SUCCESS;
auto* bio = vhd_get_bdev_io(req->Io);
Expand Down
4 changes: 4 additions & 0 deletions cloud/blockstore/vhost-server/request_aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cloud/storage/core/libs/diagnostics/logging.h>

#include <util/generic/strbuf.h>
#include <util/system/sanitizers.h>

#include <algorithm>

Expand Down Expand Up @@ -81,11 +82,13 @@ void PrepareCompoundIO(
sreq->data = req;

batch.push_back(sreq);
NSan::Release(sreq);

ptr += count;
totalBytes -= count;
deviceOffset = 0;
}
NSan::Release(req);
}

} // namespace
Expand Down Expand Up @@ -198,6 +201,7 @@ void PrepareIO(
STORAGE_DEBUG("Prepared IO request with addr: %p", req);

batch.push_back(req);
NSan::Release(req);
}

} // namespace NCloud::NBlockStore::NVHostServer
12 changes: 5 additions & 7 deletions cloud/blockstore/vhost-server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,12 @@ void TServer::Stop()
{
STORAGE_INFO("Stopping the server");

{
auto promise = NewPromise();
vhd_unregister_blockdev(Handler, [] (void* opaque) {
static_cast<TPromise<void>*>(opaque)->SetValue();
}, &promise);
auto promise = NewPromise();
vhd_unregister_blockdev(Handler, [] (void* opaque) {
static_cast<TPromise<void>*>(opaque)->SetValue();
}, &promise);

promise.GetFuture().Wait();
}
promise.GetFuture().Wait();

// 2. Stop request queues. For each do:
// 2.1 Stop a request queue
Expand Down