Skip to content
This repository has been archived by the owner on Jan 7, 2022. It is now read-only.

Commit

Permalink
Don't start worker threads until Processor initialization is complete
Browse files Browse the repository at this point in the history
Summary:
When creating various components in Processor::init() (watchdog thread, health monitor, failure detector, etc), at first create all of them in a dormant state, then start all of them. In particular, pause all worker threads until later in the startup sequence. This prevents race conditions when the parts initialized early may try to access parts of the Processor that are not initialized yet.

In particular, fixes this race when assigning Processor::failure_detector_ in a unit test (it's a test-only code path, but afaict the same thing can happen in server):

  ==================
  WARNING: ThreadSanitizer: data race (pid=886149)
    Write of size 8 at 0x7b88001dfe10 by main thread:
      #0 _ZSt4swapIPN8facebook9logdevice15FailureDetectorEENSt9enable_ifIXsr6__and_ISt6__not_ISt15__is_tuple_likeIT_EESt21is_move_constructibleIS7_ESt18is_move_assignableIS7_EEE5valueEvE4typeERS7_SG_ at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/move.h:199
      #1 std::unique_ptr<facebook::logdevice::FailureDetector, std::default_delete<facebook::logdevice::FailureDetector> >::reset(facebook::logdevice::FailureDetector*) at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/unique_ptr.h:374
      #2 _ZNSt10unique_ptrIN8facebook9logdevice15FailureDetectorESt14default_deleteIS2_EEaSINS1_19MockFailureDetectorES3_IS7_EEENSt9enable_ifIXsr6__and_ISt6__and_IJSt14is_convertibleINS_IT_T0_E7pointerEPS2_ESt6__not_ISt8is_arrayISC_EESt5__or_IJSA_IJSt12is_referenceIS4_ESt7is_sameIS4_SD_EEESA_IJSI_ISO_ESB_ISD_S4_EEEEEEESt13is_assignableIRS4_OSD_EEE5valueERS5_E4typeEOSE_ at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/unique_ptr.h:303
      #3 (anonymous namespace)::make_processor_with_detector(short, unsigned long, facebook::logdevice::GossipSettings const&) at ./logdevice/server/test/FailureDetectorTest.cpp:185
      #4 (anonymous namespace)::create_processors_and_detectors(unsigned long, facebook::logdevice::GossipSettings const&) at ./logdevice/server/test/FailureDetectorTest.cpp:266
      #5 (anonymous namespace)::simulate(unsigned long, facebook::logdevice::GossipSettings const&) at ./logdevice/server/test/FailureDetectorTest.cpp:296
      #6 FailureDetectorTest_RandomGossip_Test::TestBody() at ./logdevice/server/test/FailureDetectorTest.cpp:313
      #7 void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) at /home/engshare/third-party2/googletest/master/src/googletest/googletest/src/gtest.cc:2417
      #8 main at ./common/gtest/LightMain.cpp:19
      #9 ?? ??:0

    Previous read of size 8 at 0x7b88001dfe10 by thread T79:
      #0 std::__uniq_ptr_impl<facebook::logdevice::FailureDetector, std::default_delete<facebook::logdevice::FailureDetector> >::_M_ptr() const at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/unique_ptr.h:147
      #1 std::unique_ptr<facebook::logdevice::FailureDetector, std::default_delete<facebook::logdevice::FailureDetector> >::get() const at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/unique_ptr.h:337
      #2 facebook::logdevice::ServerWorker::onServerConfigUpdated() at ./logdevice/server/ServerWorker.cpp:190
      #3 facebook::logdevice::Worker::onNodesConfigurationUpdated() at ./logdevice/common/Worker.cpp:393
      #4 facebook::logdevice::Worker::initializeSubscriptions() at ./logdevice/common/Worker.cpp:541
      #5 facebook::logdevice::Worker::setupWorker() at ./logdevice/common/Worker.cpp:582
      #6 facebook::logdevice::ServerWorker::setupWorker() at ./logdevice/server/ServerWorker.cpp:201
      #7 facebook::logdevice::ServerProcessor::createWorker(folly::Executor::KeepAlive<folly::Executor>, facebook::logdevice::worker_id_t, facebook::logdevice::WorkerType)::$_0::operator()() const at ./logdevice/server/ServerProcessor.cpp:26
      #8 void folly::detail::function::FunctionTraits<void ()>::callSmall<facebook::logdevice::ServerProcessor::createWorker(folly::Executor::KeepAlive<folly::Executor>, facebook::logdevice::worker_id_t, facebook::logdevice::WorkerType)::$_0>(folly::detail::function::Data&) at ./folly/Function.h:376
      #9 folly::detail::function::FunctionTraits<void ()>::operator()() at ./folly/Function.h:392
      #10 facebook::logdevice::Worker::addWithPriority(folly::Function<void ()>, signed char)::$_9::operator()() at ./logdevice/common/Worker.cpp:1413
      #11 void folly::detail::function::FunctionTraits<void ()>::callBig<facebook::logdevice::Worker::addWithPriority(folly::Function<void ()>, signed char)::$_9>(folly::detail::function::Data&) at ./folly/Function.h:383
      #12 folly::detail::function::FunctionTraits<void ()>::operator()() at ./folly/Function.h:392
      #13 facebook::logdevice::EventLoopTaskQueue::executeTasks(unsigned int) at ./logdevice/common/EventLoopTaskQueue.cpp:154
      #14 facebook::logdevice::EventLoopTaskQueue::haveTasksEventHandler()::$_1::operator()(unsigned int) const at ./logdevice/common/EventLoopTaskQueue.cpp:101
      #15 void facebook::logdevice::LifoEventSemImpl<std::atomic>::AsyncWaiter::processBatch<facebook::logdevice::EventLoopTaskQueue::haveTasksEventHandler()::$_1&>(facebook::logdevice::EventLoopTaskQueue::haveTasksEventHandler()::$_1&, unsigned int) at ./logdevice/common/LifoEventSem.h:368
      #16 facebook::logdevice::EventLoopTaskQueue::haveTasksEventHandler() at ./logdevice/common/EventLoopTaskQueue.cpp:106
      #17 facebook::logdevice::EventLoopTaskQueue::EventLoopTaskQueue(facebook::logdevice::EvBase&, unsigned long, std::array<unsigned int, 3ul> const&)::$_0::operator()() const at ./logdevice/common/EventLoopTaskQueue.cpp:36
      #18 void folly::detail::function::FunctionTraits<void ()>::callSmall<facebook::logdevice::EventLoopTaskQueue::EventLoopTaskQueue(facebook::logdevice::EvBase&, unsigned long, std::array<unsigned int, 3ul> const&)::$_0>(folly::detail::function::Data&) at ./folly/Function.h:376
      #19 folly::detail::function::FunctionTraits<void ()>::operator()() at ./folly/Function.h:392
      #20 facebook::logdevice::EventLegacy::evCallback(int, short, void*) at ./logdevice/common/libevent/EventLegacy.cpp:41
      #21 event_persist_closure at ./logdevice/external/libevent-2.1.3-alpha/event.c:1452
      #22 event_process_active_single_queue at ./logdevice/external/libevent-2.1.3-alpha/event.c:1508
      #23 event_process_active at ./logdevice/external/libevent-2.1.3-alpha/event.c:1596
      #24 ld_event_base_loop at ./logdevice/external/libevent-2.1.3-alpha/event.c:1819
      #25 facebook::logdevice::EvBaseLegacy::loop() at ./logdevice/common/libevent/EvBaseLegacy.cpp:58
      #26 facebook::logdevice::EvBase::loop() at ./logdevice/common/libevent/LibEventCompatibility.cpp:52
  W1101 14:22:40.569391  907396 [logdevice:WG0] Worker.cpp:1319] processRequest() Request queued for 433 msec: NODE_STATE_UPDATED (id: 4294979886), p :LO_PRI
      #27 facebook::logdevice::EventLoop::run() at ./logdevice/common/EventLoop.cpp:140
  W1101 14:22:40.584447  907396 [logdevice:WG0] Worker.cpp:1319] processRequest() Request queued for 446 msec: NODE_STATE_UPDATED (id: 4294979887), p :LO_PRI
      #28 facebook::logdevice::EventLoop::EventLoop(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, facebook::logdevice::ThreadID::Type, unsigned long, bool, std::array<unsigned int, 3ul> const&, facebook::logdevice::EvBase::EvBaseType)::$_0::operator()() const at ./logdevice/common/EventLoop.cpp:81
      #29 std::_Function_handler<void (), facebook::logdevice::EventLoop::EventLoop(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, facebook::logdevice::ThreadID::Type, unsigned long, bool, std::array<unsigned int, 3ul> const&, facebook::logdevice::EvBase::EvBaseType)::$_0>::_M_invoke(std::_Any_data const&) at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/std_function.h:316
  W1101 14:22:40.603377  907396 [logdevice:WG0] Worker.cpp:1319] processRequest() Request queued for 468 msec: NODE_STATE_UPDATED (id: 4294979888), p :LO_PRI
      #30 std::function<void ()>::operator()() const at ./third-party-buck/platform007/build/libgcc/include/c++/trunk/bits/std_function.h:706
  W1101 14:22:40.621593  907396 [logdevice:WG0] Worker.cpp:1319] processRequest() Request queued for 491 msec: NODE_STATE_UPDATED (id: 4294979892), p :LO_PRI
      #31 facebook::logdevice::thread_func(void*) at ./logdevice/common/PThread.cpp:18
  W1101 14:22:40.645430  907396 [logdevice:WG0] Worker.cpp:1319] processRequest() Request queued for 505 msec: NODE_STATE_UPDATED (id: 4294979893), p :LO_PRI
      #32 __tsan_thread_start_func at tsan.c:?

Reviewed By: mcrnic

Differential Revision: D18286463

fbshipit-source-id: 1ea365415f1e9dc8e907eabcbdd8a8249652a16e
  • Loading branch information
al13n321 authored and facebook-github-bot committed Nov 7, 2019
1 parent 1cc1045 commit 016132b
Show file tree
Hide file tree
Showing 19 changed files with 276 additions and 230 deletions.
22 changes: 19 additions & 3 deletions logdevice/common/EventLoop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ EventLoop::EventLoop(
bool enable_priority_queues,
const std::array<uint32_t, EventLoopTaskQueue::kNumberOfPriorities>&
requests_per_iteration,
EvBase::EvBaseType base_type)
EvBase::EvBaseType base_type,
bool start_running)
: thread_type_(thread_type),
thread_name_(thread_name),
priority_queues_enabled_(enable_priority_queues) {
Expand All @@ -77,16 +78,22 @@ EventLoop::EventLoop(
auto res = init_result =
init(base_type, request_pump_capacity, requests_per_iteration);
initialized.post();
if (res == Status::OK) {
run();
if (res != Status::OK) {
return;
}

start_running_.wait();
run();
});
initialized.wait();
if (init_result != Status::OK) {
err = init_result;
thread_.join();
throw ConstructorFailed();
}
if (start_running) {
startRunning();
}
}

EventLoop::~EventLoop() {
Expand All @@ -99,10 +106,19 @@ EventLoop::~EventLoop() {
// the eventloop instance.
// Tell EventLoop on the other end to destroy itself and terminate the
// thread
if (!started_running_) {
start_running_.post();
}
task_queue_->shutdown();
thread_.join();
}

void EventLoop::startRunning() {
ld_check(!started_running_);
start_running_.post();
started_running_ = true;
}

void EventLoop::add(folly::Function<void()> func) {
addWithPriority(std::move(func), folly::Executor::LO_PRI);
}
Expand Down
13 changes: 12 additions & 1 deletion logdevice/common/EventLoop.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class EventLoop : public folly::Executor {
bool enable_priority_queues = true,
const std::array<uint32_t, EventLoopTaskQueue::kNumberOfPriorities>&
requests_per_iteration = {13, 3, 1},
EvBase::EvBaseType base_type = EvBase::LEGACY_EVENTBASE);
EvBase::EvBaseType base_type = EvBase::LEGACY_EVENTBASE,
bool start_running = true);

// destructor has to be virtual because it is invoked by EventLoop::run()
// as "delete this"
Expand All @@ -77,6 +78,10 @@ class EventLoop : public folly::Executor {
return EventLoopTaskQueue::kNumberOfPriorities;
}

// Tells the event loop thread to start processing requests.
// Must be called if `start_running = false` was passed to constructor.
void startRunning();

/// Enqueue a function to executed by this executor. This and all
/// variants must be threadsafe.
void add(folly::Function<void()>) override;
Expand Down Expand Up @@ -176,6 +181,12 @@ class EventLoop : public folly::Executor {
// Main task queue; (shutting down this TaskQueue stops the event loop)
std::unique_ptr<EventLoopTaskQueue> task_queue_;

// The thread will block on this semaphore before it starts processing
// requests.
Semaphore start_running_;
// True if we posted to start_running_.
bool started_running_ = false;

Status
init(EvBase::EvBaseType base_type,
size_t request_pump_capacity,
Expand Down
9 changes: 8 additions & 1 deletion logdevice/common/Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ void Processor::init() {
initialized_.store(true, std::memory_order_relaxed);
}

void Processor::startRunning() {
for (std::unique_ptr<EventLoop>& loop : impl_->ev_loops_) {
loop->startRunning();
}
}

ReadStreamDebugInfoSamplingConfig& Processor::getDebugClientConfig() {
return impl_->read_stream_debug_info_sampling_config_;
}
Expand All @@ -272,7 +278,8 @@ workers_t Processor::createWorkerPool(WorkerType type, size_t count) {
local_settings->mid_requests_per_iteration,
local_settings->lo_requests_per_iteration),
local_settings->use_legacy_eventbase ? EvBase::LEGACY_EVENTBASE
: EvBase::FOLLY_EVENTBASE));
: EvBase::FOLLY_EVENTBASE,
/* start_running */ false));
auto executor = folly::getKeepAliveToken(loops.back().get());
worker.reset(createWorker(std::move(executor), worker_id_t(i), type));
} catch (ConstructorFailed&) {
Expand Down
11 changes: 10 additions & 1 deletion logdevice/common/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class Processor : public folly::enable_shared_from_this<Processor> {
protected:
/**
* Second step of construction, decoupled from constructor to allow
* subclasses to override Worker construction. Starts all worker threads.
* subclasses to override Worker construction. Creates workers but doesn't
* start their threads.
*
* @throws ConstructorFailed if one or more EventLoops failed to start. err
* is set to NOMEM, SYSLIMIT or INTERNAL as defined for EventLoop
Expand All @@ -122,6 +123,13 @@ class Processor : public folly::enable_shared_from_this<Processor> {
*/
virtual void init();

/**
* Start workers' event loop threads, and any background work.
* This is separate from init() to make it easy to ensure that worker threads
* don't try to access half-initialized Processor.
*/
virtual void startRunning();

/**
* Creates a worker pool of the supplied type and returns the vector
*/
Expand Down Expand Up @@ -167,6 +175,7 @@ class Processor : public folly::enable_shared_from_this<Processor> {
static std::shared_ptr<Processor> create(Args&&... args) {
auto p = std::make_shared<Processor>(std::forward<Args>(args)...);
p->init();
p->startRunning();
return p;
}

Expand Down
42 changes: 12 additions & 30 deletions logdevice/common/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -983,9 +983,11 @@ void Worker::onStoppedRunning(RunContext prev_context) {
prev_context.describe().c_str());
WORKER_STAT_INCR(worker_slow_requests);
if (worker_type_ == WorkerType::GENERAL) {
callLongExecutionCallback(
idx_.val_,
std::chrono::duration_cast<std::chrono::milliseconds>(duration));
if (long_execution_cb_) {
long_execution_cb_(
idx_.val_,
std::chrono::duration_cast<std::chrono::milliseconds>(duration));
}
}
}

Expand Down Expand Up @@ -1295,7 +1297,9 @@ void Worker::processRequest(std::unique_ptr<Request> rq) {
priority_to_str(priority));
if (worker_type_ == WorkerType::GENERAL &&
priority == folly::Executor::HI_PRI) {
callLongQueuedCallback(idx_.val_, queue_time);
if (long_queued_cb_) {
long_queued_cb_(idx_.val_, queue_time);
}
WORKER_STAT_INCR(worker_hi_pri_long_queued_requests);
}
}
Expand Down Expand Up @@ -1420,35 +1424,13 @@ void Worker::generateErrorInjection(double error_chance,
}
}
void Worker::setLongExecutionCallback(SlowRequestCallback cb) {
std::atomic_store_explicit(
&long_execution_cb_,
std::make_shared<SlowRequestCallback>(std::move(cb)),
std::memory_order_relaxed);
ld_check(!long_execution_cb_);
long_execution_cb_ = cb;
}

void Worker::setLongQueuedCallback(SlowRequestCallback cb) {
std::atomic_store_explicit(
&long_queued_cb_,
std::make_shared<SlowRequestCallback>(std::move(cb)),
std::memory_order_relaxed);
}

void Worker::callLongExecutionCallback(int idx,
std::chrono::milliseconds duration) {
auto cb =
std::atomic_load_explicit(&long_execution_cb_, std::memory_order_relaxed);
if (cb && *cb) {
(*cb)(idx, duration);
}
}

void Worker::callLongQueuedCallback(int idx,
std::chrono::milliseconds duration) {
auto cb =
std::atomic_load_explicit(&long_queued_cb_, std::memory_order_relaxed);
if (cb && *cb) {
(*cb)(idx, duration);
}
ld_check(!long_queued_cb_);
long_queued_cb_ = cb;
}

}} // namespace facebook::logdevice
10 changes: 3 additions & 7 deletions logdevice/common/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,7 @@ class Worker : public WorkContext {
// Callback functions that register worker id and duration of slow/delayed
// action.
using SlowRequestCallback =
folly::Function<void(int idx, std::chrono::milliseconds duration)>;
std::function<void(int idx, std::chrono::milliseconds duration)>;

void setLongExecutionCallback(SlowRequestCallback cb);
void setLongQueuedCallback(SlowRequestCallback cb);
Expand Down Expand Up @@ -835,10 +835,6 @@ class Worker : public WorkContext {
// subclasses to clear state machines that were waiting for that
virtual void noteShuttingDownNoPendingRequests() {}

void callLongExecutionCallback(int idx, std::chrono::milliseconds duration);

void callLongQueuedCallback(int idx, std::chrono::milliseconds duration);

std::shared_ptr<UpdateableConfig> config_; // cluster config to use for
// all ops on this thread

Expand Down Expand Up @@ -933,8 +929,8 @@ class Worker : public WorkContext {
// Set to true once stop has been called
bool event_log_stopped_{false};

std::shared_ptr<SlowRequestCallback> long_execution_cb_{nullptr};
std::shared_ptr<SlowRequestCallback> long_queued_cb_{nullptr};
SlowRequestCallback long_execution_cb_{nullptr};
SlowRequestCallback long_queued_cb_{nullptr};

// Error Injection settings:
double worker_stall_error_injection_chance_;
Expand Down
38 changes: 21 additions & 17 deletions logdevice/common/buffered_writer/BufferedWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,39 @@ int ProcessorProxy::postWithRetrying(std::unique_ptr<Request>& rq) {

namespace {

// All requests used by a single BufferedWriterImpl must have the same priority
// to avoid reordering.
static constexpr int8_t kBufferedWriterRequestsPriority =
folly::Executor::HI_PRI;

// Instructs the Worker to create a BufferedWriterShard instance with the
// specified ID.
class CreateShardRequest : public Request {
public:
CreateShardRequest(worker_id_t worker,
std::function<BufferedWriterShard*()> factory,
Semaphore* sem)
std::function<BufferedWriterShard*()> factory)
: Request(RequestType::BUFFERED_WRITER_CREATE_SHARD),
worker_(worker),
factory_(std::move(factory)),
sem_(sem) {}
factory_(std::move(factory)) {}

int getThreadAffinity(int /*nthreads*/) override {
return worker_.val_;
}

int8_t getExecutorPriority() const override {
return kBufferedWriterRequestsPriority;
}

Execution execute() override {
BufferedWriterShard* instance = factory_();
Worker::onThisThread()->active_buffered_writers_.insert(
std::make_pair(instance->id_, instance));
sem_->post();
return Execution::COMPLETE;
}

private:
worker_id_t worker_;
std::function<BufferedWriterShard*()> factory_;
Semaphore* sem_;
};
} // namespace

Expand Down Expand Up @@ -129,28 +134,23 @@ BufferedWriterImpl::BufferedWriterImpl(ProcessorProxy* processor_proxy,
}

const int nworkers = processor()->getWorkerCount(WorkerType::GENERAL);
Semaphore sem;
for (worker_id_t widx(0); widx.val_ < nworkers; ++widx.val_) {
// We generate the ID here but let the Worker actually create the
// BufferedWriterShard instance so that it's collocated with other data
// belonging to that Worker.
buffered_writer_id_t id = processor()->issueBufferedWriterID();
auto factory = [id, &get_log_options, this]() {
auto factory = [id, get_log_options, this]() {
return new BufferedWriterShard(id, get_log_options, this);
};
std::unique_ptr<Request> req =
std::make_unique<CreateShardRequest>(widx, factory, &sem);
std::make_unique<CreateShardRequest>(widx, factory);
int rv = processor()->postWithRetrying(req);
ld_check(rv == 0);
shards_.push_back(id);
}

// Wait until all CreateShardRequest's have been processed by Workers.
// After this it is surely safe to process append() calls and the
// constructor can return.
for (int i = 0; i < nworkers; ++i) {
sem.wait();
}
// No need to wait for the requests to complete because they can't fail and
// can't be reordered with future append requests.
}

namespace {
Expand All @@ -167,6 +167,10 @@ class PerShardRequest : public Request {
return worker_.val_;
}

int8_t getExecutorPriority() const override {
return kBufferedWriterRequestsPriority;
}

Execution execute() override {
auto& map = Worker::onThisThread()->active_buffered_writers_;
auto it = map.find(id_);
Expand Down Expand Up @@ -301,7 +305,7 @@ class BufferedAppendRequest : public Request {
}

int8_t getExecutorPriority() const override {
return folly::Executor::HI_PRI;
return kBufferedWriterRequestsPriority;
}

Execution execute() override {
Expand Down Expand Up @@ -623,7 +627,7 @@ class FlushShardRequest : public Request {
}

int8_t getExecutorPriority() const override {
return folly::Executor::HI_PRI;
return kBufferedWriterRequestsPriority;
}

Execution execute() override {
Expand Down
1 change: 1 addition & 0 deletions logdevice/lib/ClientProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ClientProcessor : public Processor {
static std::shared_ptr<ClientProcessor> create(Args&&... args) {
auto p = std::make_shared<ClientProcessor>(std::forward<Args>(args)...);
p->init();
p->startRunning();
return p;
}

Expand Down
12 changes: 1 addition & 11 deletions logdevice/server/FailureDetector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ class FailureDetector::InitRequest : public Request {

FailureDetector::FailureDetector(UpdateableSettings<GossipSettings> settings,
ServerProcessor* processor,
StatsHolder* stats,
bool attach)
StatsHolder* stats)
: settings_(std::move(settings)), stats_(stats), processor_(processor) {
size_t max_nodes = processor->settings()->max_nodes;

Expand Down Expand Up @@ -166,17 +165,8 @@ FailureDetector::FailureDetector(UpdateableSettings<GossipSettings> settings,
for (const auto& it : nodes_) {
cs->setNodeState(it.first, ClusterState::NodeState::DEAD);
}

if (attach) {
start();
}
}

FailureDetector::FailureDetector(UpdateableSettings<GossipSettings> settings,
ServerProcessor* processor,
bool attach)
: FailureDetector(std::move(settings), processor, nullptr, attach) {}

void FailureDetector::start() {
std::unique_ptr<Request> rq = std::make_unique<InitRequest>(this);
int rv = processor_->postImportant(rq);
Expand Down
Loading

0 comments on commit 016132b

Please sign in to comment.