From e15b255d2362c5772b6a4dc1087b0be79069dc97 Mon Sep 17 00:00:00 2001
From: tgerdes
Date: Fri, 24 May 2024 10:10:27 -0500
Subject: [PATCH 1/3] don't flush all requests at end of PA

---
 .../client_backend/mock_client_backend.h      |  8 ++++++++
 src/c++/perf_analyzer/concurrency_worker.cc   |  8 ++++----
 src/c++/perf_analyzer/infer_context.cc        |  6 +++++-
 src/c++/perf_analyzer/infer_context.h         |  4 ++++
 src/c++/perf_analyzer/iworker.h               |  1 +
 src/c++/perf_analyzer/load_manager.cc         | 12 +++++++++---
 src/c++/perf_analyzer/load_worker.cc          | 19 ++++++++++++++++---
 src/c++/perf_analyzer/load_worker.h           |  4 ++++
 src/c++/perf_analyzer/request_rate_worker.cc  |  7 +++++--
 .../perf_analyzer/test_concurrency_manager.cc |  4 ++--
 .../perf_analyzer/test_load_manager_base.h    |  6 ------
 .../test_request_rate_manager.cc              |  8 +++-----
 12 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/src/c++/perf_analyzer/client_backend/mock_client_backend.h b/src/c++/perf_analyzer/client_backend/mock_client_backend.h
index 483af914d..04f0304e9 100644
--- a/src/c++/perf_analyzer/client_backend/mock_client_backend.h
+++ b/src/c++/perf_analyzer/client_backend/mock_client_backend.h
@@ -489,6 +489,14 @@ class NaggyMockClientBackend : public ClientBackend {
     });
   }
 
+  ~NaggyMockClientBackend()
+  {
+    // Make sure no requests carry over to the next test
+    while (stats_->num_active_infer_calls) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+  }
+
   MOCK_METHOD(
       Error, ModelConfig,
       (rapidjson::Document*, const std::string&, const std::string&),
diff --git a/src/c++/perf_analyzer/concurrency_worker.cc b/src/c++/perf_analyzer/concurrency_worker.cc
index 37a562f76..26b286b94 100644
--- a/src/c++/perf_analyzer/concurrency_worker.cc
+++ b/src/c++/perf_analyzer/concurrency_worker.cc
@@ -111,7 +111,7 @@ ConcurrencyWorker::HandleExecuteOff()
     // Wait if no request should be sent and it is not exiting
     thread_config_->is_paused_ = true;
     std::unique_lock<std::mutex> lock(wake_mutex_);
-    wake_signal_.wait(lock, [this]() { return early_exit || execute_; });
+    wake_signal_.wait(lock, [this]() { return exiting_ || execute_; });
 
     // TODO REFACTOR TMA-1043 - memory manager should be handling this instead
     // of here
@@ -131,10 +131,10 @@ ConcurrencyWorker::HandleNoConcurrency()
     // Wait if no request should be sent and it is not exiting
     std::unique_lock<std::mutex> lock(wake_mutex_);
     wake_signal_.wait(lock, [this]() {
-      return early_exit || (thread_config_->concurrency_ > 0);
+      return exiting_ || (thread_config_->concurrency_ > 0);
     });
     // Stop executing if concurrency is 0 and early exit is requested
-    if (early_exit && thread_config_->concurrency_ == 0) {
+    if (exiting_ && thread_config_->concurrency_ == 0) {
       return true;
     }
   }
@@ -181,7 +181,7 @@ ConcurrencyWorker::WaitForResponses()
     std::unique_lock<std::mutex> lk(cb_mtx_);
     thread_stat_->idle_timer.Start();
     cb_cv_.wait(lk, [this] {
-      if (notified_) {
+      if (notified_ || exiting_) {
         notified_ = false;
         return true;
       }
diff --git a/src/c++/perf_analyzer/infer_context.cc b/src/c++/perf_analyzer/infer_context.cc
index 8929e6c99..d313d545b 100644
--- a/src/c++/perf_analyzer/infer_context.cc
+++ b/src/c++/perf_analyzer/infer_context.cc
@@ -63,7 +63,7 @@ InferContext::SendSequenceInferRequest(uint32_t seq_stat_index, bool delayed)
   // This also helps in reporting the realistic latencies.
   std::lock_guard<std::mutex> guard(
       sequence_manager_->GetMutex(seq_stat_index));
-  if (!early_exit && execute_) {
+  if (!exiting_ && execute_) {
     sequence_manager_->SetInferSequenceOptions(
         seq_stat_index, infer_data_.options_);
 
@@ -298,6 +298,10 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
     // Add the request record to thread request records vector with
     // proper locking
     std::lock_guard<std::mutex> lock(thread_stat_->mu_);
+    if (exiting_) {
+      return;
+    }
+
     thread_stat_->cb_status_ = result_ptr->RequestStatus();
     if (thread_stat_->cb_status_.IsOk()) {
       std::string request_id;
diff --git a/src/c++/perf_analyzer/infer_context.h b/src/c++/perf_analyzer/infer_context.h
index 7bacb16d5..20916fdde 100644
--- a/src/c++/perf_analyzer/infer_context.h
+++ b/src/c++/perf_analyzer/infer_context.h
@@ -104,6 +104,9 @@ class InferContext {
   // Initialize the context. Must be done before any inferences are sent
   void Init();
 
+  // Signal to the context to stop working and exit
+  void Exit() { exiting_ = true; }
+
   // Send a single inference request to the server
   void SendInferRequest(bool delayed = false);
 
@@ -192,6 +195,7 @@ class InferContext {
 
   const uint32_t id_{0};
   const size_t thread_id_{0};
+  bool exiting_{false};
 
   size_t GetNumActiveThreads() { return num_active_threads_; }
 
diff --git a/src/c++/perf_analyzer/iworker.h b/src/c++/perf_analyzer/iworker.h
index 3a72f4c10..2ee6e135e 100644
--- a/src/c++/perf_analyzer/iworker.h
+++ b/src/c++/perf_analyzer/iworker.h
@@ -33,6 +33,7 @@ namespace triton { namespace perfanalyzer {
 class IWorker {
  public:
   virtual void Infer() = 0;
+  virtual void Exit() = 0;
 };
 
 }}  // namespace triton::perfanalyzer
diff --git a/src/c++/perf_analyzer/load_manager.cc b/src/c++/perf_analyzer/load_manager.cc
index ac9150a9d..e95cbaf55 100644
--- a/src/c++/perf_analyzer/load_manager.cc
+++ b/src/c++/perf_analyzer/load_manager.cc
@@ -248,9 +248,15 @@ LoadManager::InitManagerInputs(
 void
 LoadManager::StopWorkerThreads()
 {
-  early_exit = true;
-  // wake up all threads
-  wake_signal_.notify_all();
+  // FIXME do I need to acquire the lock first?
+  for (auto& worker : workers_) {
+    worker->Exit();
+  }
+
+  {
+    std::unique_lock<std::mutex> lock(wake_mutex_);
+    wake_signal_.notify_all();
+  }
 
   size_t cnt = 0;
   for (auto& thread : threads_) {
diff --git a/src/c++/perf_analyzer/load_worker.cc b/src/c++/perf_analyzer/load_worker.cc
index a32976c6a..0707f9304 100644
--- a/src/c++/perf_analyzer/load_worker.cc
+++ b/src/c++/perf_analyzer/load_worker.cc
@@ -34,6 +34,21 @@
 
 namespace triton { namespace perfanalyzer {
 
+void
+LoadWorker::Exit()
+{
+  for (auto ctx : ctxs_) {
+    ctx->Exit();
+  }
+
+  exiting_ = true;
+
+  {
+    std::lock_guard<std::mutex> lk(cb_mtx_);
+    cb_cv_.notify_all();
+  }
+}
+
 bool
 LoadWorker::ShouldExit()
 {
@@ -44,7 +59,7 @@ LoadWorker::ShouldExit()
       thread_config_->num_requests_ != 0 &&
       thread_stat_->num_sent_requests_ >= thread_config_->num_requests_;
 
-  return early_exit || bad_status || done_with_request_count;
+  return exiting_ || bad_status || done_with_request_count;
 }
 
 bool
@@ -52,8 +67,6 @@ LoadWorker::HandleExitConditions()
 {
   if (ShouldExit()) {
     CompleteOngoingSequences();
-    thread_stat_->idle_timer.Start();
-    WaitForOngoingRequests();
     return true;
   }
   return false;
diff --git a/src/c++/perf_analyzer/load_worker.h b/src/c++/perf_analyzer/load_worker.h
index dd7e0297f..933007e83 100644
--- a/src/c++/perf_analyzer/load_worker.h
+++ b/src/c++/perf_analyzer/load_worker.h
@@ -69,6 +69,8 @@ class LoadWorker : public IWorker {
 
   virtual ~LoadWorker() = default;
 
+  virtual void Exit() override;
+
  protected:
   // Return the total number of async requests that have started and not
   // finished
@@ -117,6 +119,8 @@ class LoadWorker : public IWorker {
 
   void AsyncCallbackFinalize(uint32_t ctx_id);
 
+  bool exiting_ = false;
+
   uint32_t id_;
 
   std::vector<std::shared_ptr<InferContext>> ctxs_;
diff --git a/src/c++/perf_analyzer/request_rate_worker.cc b/src/c++/perf_analyzer/request_rate_worker.cc
index 48ccb361b..e49bcaa13 100644
--- a/src/c++/perf_analyzer/request_rate_worker.cc
+++ b/src/c++/perf_analyzer/request_rate_worker.cc
@@ -46,6 +46,9 @@ RequestRateWorker::Infer()
     HandleExecuteOff();
 
     bool is_delayed = SleepIfNecessary();
+    if (HandleExitConditions()) {
+      return;
+    }
     uint32_t ctx_id = GetCtxId();
     SendInferRequest(ctx_id, is_delayed);
     RestoreFreeCtxId(ctx_id);
@@ -119,7 +122,7 @@ RequestRateWorker::HandleExecuteOff()
     // Wait if no request should be sent and it is not exiting
     thread_config_->is_paused_ = true;
     std::unique_lock<std::mutex> lock(wake_mutex_);
-    wake_signal_.wait(lock, [this]() { return early_exit || execute_; });
+    wake_signal_.wait(lock, [this]() { return exiting_ || execute_; });
   }
   thread_config_->is_paused_ = false;
 
@@ -155,7 +158,7 @@ RequestRateWorker::WaitForFreeCtx()
     std::unique_lock<std::mutex> lk(cb_mtx_);
     thread_stat_->idle_timer.Start();
     cb_cv_.wait(lk, [this] {
-      if (notified_) {
+      if (notified_ || exiting_) {
         notified_ = false;
         return true;
       }
diff --git a/src/c++/perf_analyzer/test_concurrency_manager.cc b/src/c++/perf_analyzer/test_concurrency_manager.cc
index 1941a018e..7918b1062 100644
--- a/src/c++/perf_analyzer/test_concurrency_manager.cc
+++ b/src/c++/perf_analyzer/test_concurrency_manager.cc
@@ -474,7 +474,7 @@ TEST_CASE("concurrency_free_ctx_ids")
 
   std::this_thread::sleep_for(std::chrono::milliseconds(15));
 
-  early_exit = true;
+  worker->Exit();
   infer_future.get();
 
   // The first sequence should only be called two times, once at the very start,
@@ -590,7 +590,7 @@ TEST_CASE("Concurrency - shared memory infer input calls")
 
   std::this_thread::sleep_for(std::chrono::milliseconds(18));
 
-  early_exit = true;
+  worker->Exit();
   infer_future.get();
 
   const auto& actual_append_raw_calls{tcm.stats_->num_append_raw_calls};
diff --git a/src/c++/perf_analyzer/test_load_manager_base.h b/src/c++/perf_analyzer/test_load_manager_base.h
index 6bbdf6d23..c48bdead1 100644
--- a/src/c++/perf_analyzer/test_load_manager_base.h
+++ b/src/c++/perf_analyzer/test_load_manager_base.h
@@ -66,12 +66,6 @@ class TestLoadManagerBase {
         is_sequence_model, is_decoupled_model);
   }
 
-  ~TestLoadManagerBase()
-  {
-    // Reset early_exit in case any test sets it to true during execution.
-    early_exit = false;
-  }
-
   // Helper function to process custom json data in testing
   // Creates a model tensor to pass to a mock parser which is consumed by the
   // mock data loader
diff --git a/src/c++/perf_analyzer/test_request_rate_manager.cc b/src/c++/perf_analyzer/test_request_rate_manager.cc
index e4870b95b..5acf4e08e 100644
--- a/src/c++/perf_analyzer/test_request_rate_manager.cc
+++ b/src/c++/perf_analyzer/test_request_rate_manager.cc
@@ -149,7 +149,6 @@ class TestRequestRateManager : public TestLoadManagerBase,
         REQUIRE(timestamp.count() == expected_current_timestamp.count());
       }
     }
-    early_exit = true;
   }
 
   void TestCreateSchedule(
@@ -168,7 +167,6 @@ class TestRequestRateManager : public TestLoadManagerBase,
       total_num_seqs += w->thread_config_->num_sequences_;
       worker_schedule_sizes.push_back(w->schedule_->intervals.size());
     }
-    early_exit = true;
 
     CHECK(num_of_sequences_ == total_num_seqs);
     for (int i = 0; i < worker_schedule_sizes.size() - 1; i++) {
@@ -977,7 +975,7 @@ TEST_CASE("request_rate_streaming: test that streaming-specific logic works")
   std::dynamic_pointer_cast<RequestRateWorker>(worker)->SetSchedule(schedule);
   std::future<void> infer_future{std::async(&IWorker::Infer, worker)};
 
-  early_exit = true;
+  worker->Exit();
   infer_future.get();
 
   CHECK(
@@ -1827,7 +1825,7 @@ TEST_CASE("Request rate - Shared memory infer input calls")
 
   std::this_thread::sleep_for(milliseconds(18));
 
-  early_exit = true;
+  worker->Exit();
   infer_future.get();
 
   const auto& actual_append_raw_calls{trrm.stats_->num_append_raw_calls};
@@ -2184,7 +2182,7 @@ TEST_CASE("request rate create schedule")
   params.max_trials = 10;
   bool is_sequence_model = false;
   bool is_decoupled = false;
-  bool use_mock_infer = false;
+  bool use_mock_infer = true;
 
   double rate = 10;
   std::vector expected_worker_ratio;

From 2ca6e64c70d0f46d772f3c96e97b525925e6e5b1 Mon Sep 17 00:00:00 2001
From: tgerdes
Date: Thu, 30 May 2024 12:42:39 -0500
Subject: [PATCH 2/3] Only fast exit for non-shm cases

---
 src/c++/perf_analyzer/infer_context.cc              |  2 +-
 src/c++/perf_analyzer/infer_context.h               |  7 ++++++-
 src/c++/perf_analyzer/iworker.h                     |  2 +-
 src/c++/perf_analyzer/load_manager.cc               |  8 +++++---
 src/c++/perf_analyzer/load_manager.h                |  1 +
 src/c++/perf_analyzer/load_worker.cc                | 10 +++++++---
 src/c++/perf_analyzer/load_worker.h                 |  3 ++-
 src/c++/perf_analyzer/test_concurrency_manager.cc   |  4 ++--
 src/c++/perf_analyzer/test_request_rate_manager.cc  |  4 ++--
 9 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/src/c++/perf_analyzer/infer_context.cc b/src/c++/perf_analyzer/infer_context.cc
index d313d545b..5e559cba5 100644
--- a/src/c++/perf_analyzer/infer_context.cc
+++ b/src/c++/perf_analyzer/infer_context.cc
@@ -298,7 +298,7 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
     // Add the request record to thread request records vector with
     // proper locking
     std::lock_guard<std::mutex> lock(thread_stat_->mu_);
-    if (exiting_) {
+    if (exiting_ && fast_exit_) {
       return;
     }
 
diff --git a/src/c++/perf_analyzer/infer_context.h b/src/c++/perf_analyzer/infer_context.h
index 20916fdde..852d88784 100644
--- a/src/c++/perf_analyzer/infer_context.h
+++ b/src/c++/perf_analyzer/infer_context.h
@@ -105,7 +105,11 @@ class InferContext {
   void Init();
 
   // Signal to the context to stop working and exit
-  void Exit() { exiting_ = true; }
+  void Exit(bool fast_exit)
+  {
+    exiting_ = true;
+    fast_exit_ = fast_exit;
+  }
 
   // Send a single inference request to the server
   void SendInferRequest(bool delayed = false);
@@ -196,6 +200,7 @@ class InferContext {
   const uint32_t id_{0};
   const size_t thread_id_{0};
   bool exiting_{false};
+  bool fast_exit_{false};
 
   size_t GetNumActiveThreads() { return num_active_threads_; }
 
diff --git a/src/c++/perf_analyzer/iworker.h b/src/c++/perf_analyzer/iworker.h
index 2ee6e135e..bc6d1b9b7 100644
--- a/src/c++/perf_analyzer/iworker.h
+++ b/src/c++/perf_analyzer/iworker.h
@@ -33,7 +33,7 @@ namespace triton { namespace perfanalyzer {
 class IWorker {
  public:
   virtual void Infer() = 0;
-  virtual void Exit() = 0;
+  virtual void Exit(bool fast_exit) = 0;
 };
 
 }}  // namespace triton::perfanalyzer
diff --git a/src/c++/perf_analyzer/load_manager.cc b/src/c++/perf_analyzer/load_manager.cc
index e95cbaf55..f708d14b6 100644
--- a/src/c++/perf_analyzer/load_manager.cc
+++ b/src/c++/perf_analyzer/load_manager.cc
@@ -164,8 +164,8 @@ LoadManager::LoadManager(
     const std::unordered_map<std::string, cb::RequestParameter>&
         request_parameters)
     : async_(async), streaming_(streaming), batch_size_(batch_size),
-      max_threads_(max_threads), parser_(parser), factory_(factory),
-      using_json_data_(false)
+      max_threads_(max_threads), shared_memory_type_{shared_memory_type},
+      parser_(parser), factory_(factory), using_json_data_(false)
 {
   on_sequence_model_ =
       ((parser_->SchedulerType() == ModelParser::SEQUENCE) ||
@@ -248,9 +248,11 @@ LoadManager::InitManagerInputs(
 void
 LoadManager::StopWorkerThreads()
 {
+  bool fast_exit = shared_memory_type_ == SharedMemoryType::NO_SHARED_MEMORY;
+
   // FIXME do I need to acquire the lock first?
   for (auto& worker : workers_) {
-    worker->Exit();
+    worker->Exit(fast_exit);
   }
 
   {
diff --git a/src/c++/perf_analyzer/load_manager.h b/src/c++/perf_analyzer/load_manager.h
index 799bfa75f..fbaf5234a 100644
--- a/src/c++/perf_analyzer/load_manager.h
+++ b/src/c++/perf_analyzer/load_manager.h
@@ -140,6 +140,7 @@ class LoadManager {
   size_t batch_size_;
   size_t max_threads_;
   bool on_sequence_model_;
+  SharedMemoryType shared_memory_type_;
 
   std::shared_ptr<ModelParser> parser_;
   std::shared_ptr<cb::ClientBackendFactory> factory_;
diff --git a/src/c++/perf_analyzer/load_worker.cc b/src/c++/perf_analyzer/load_worker.cc
index 0707f9304..69e683dfa 100644
--- a/src/c++/perf_analyzer/load_worker.cc
+++ b/src/c++/perf_analyzer/load_worker.cc
@@ -35,13 +35,14 @@
 namespace triton { namespace perfanalyzer {
 
 void
-LoadWorker::Exit()
+LoadWorker::Exit(bool fast_exit)
 {
   for (auto ctx : ctxs_) {
-    ctx->Exit();
+    ctx->Exit(fast_exit);
   }
 
   exiting_ = true;
+  fast_exit_ = fast_exit;
 
   {
     std::lock_guard<std::mutex> lk(cb_mtx_);
@@ -67,6 +68,9 @@ LoadWorker::HandleExitConditions()
 {
   if (ShouldExit()) {
     CompleteOngoingSequences();
+    if (!fast_exit_) {
+      WaitForOngoingRequests();
+    }
     return true;
   }
   return false;
@@ -86,7 +90,7 @@ LoadWorker::CompleteOngoingSequences()
 void
 LoadWorker::WaitForOngoingRequests()
 {
-  while (GetNumOngoingRequests() != 0) {
+  while (GetNumOngoingRequests() != 0 && !fast_exit_) {
     std::this_thread::sleep_for(std::chrono::milliseconds(50));
   }
 }
diff --git a/src/c++/perf_analyzer/load_worker.h b/src/c++/perf_analyzer/load_worker.h
index 933007e83..8d5c8d921 100644
--- a/src/c++/perf_analyzer/load_worker.h
+++ b/src/c++/perf_analyzer/load_worker.h
@@ -69,7 +69,7 @@ class LoadWorker : public IWorker {
 
   virtual ~LoadWorker() = default;
 
-  virtual void Exit() override;
+  virtual void Exit(bool fast_exit) override;
 
  protected:
   // Return the total number of async requests that have started and not
@@ -120,6 +120,7 @@ class LoadWorker : public IWorker {
   void AsyncCallbackFinalize(uint32_t ctx_id);
 
   bool exiting_ = false;
+  bool fast_exit_ = false;
 
   uint32_t id_;
 
diff --git a/src/c++/perf_analyzer/test_concurrency_manager.cc b/src/c++/perf_analyzer/test_concurrency_manager.cc
index 7918b1062..07d803ba1 100644
--- a/src/c++/perf_analyzer/test_concurrency_manager.cc
+++ b/src/c++/perf_analyzer/test_concurrency_manager.cc
@@ -474,7 +474,7 @@ TEST_CASE("concurrency_free_ctx_ids")
 
   std::this_thread::sleep_for(std::chrono::milliseconds(15));
 
-  worker->Exit();
+  worker->Exit(false);
   infer_future.get();
 
   // The first sequence should only be called two times, once at the very start,
@@ -590,7 +590,7 @@ TEST_CASE("Concurrency - shared memory infer input calls")
 
   std::this_thread::sleep_for(std::chrono::milliseconds(18));
 
-  worker->Exit();
+  worker->Exit(false);
   infer_future.get();
 
   const auto& actual_append_raw_calls{tcm.stats_->num_append_raw_calls};
diff --git a/src/c++/perf_analyzer/test_request_rate_manager.cc b/src/c++/perf_analyzer/test_request_rate_manager.cc
index 5acf4e08e..81229623d 100644
--- a/src/c++/perf_analyzer/test_request_rate_manager.cc
+++ b/src/c++/perf_analyzer/test_request_rate_manager.cc
@@ -975,7 +975,7 @@ TEST_CASE("request_rate_streaming: test that streaming-specific logic works")
   std::dynamic_pointer_cast<RequestRateWorker>(worker)->SetSchedule(schedule);
   std::future<void> infer_future{std::async(&IWorker::Infer, worker)};
 
-  worker->Exit();
+  worker->Exit(false);
   infer_future.get();
 
   CHECK(
@@ -1825,7 +1825,7 @@ TEST_CASE("Request rate - Shared memory infer input calls")
 
   std::this_thread::sleep_for(milliseconds(18));
 
-  worker->Exit();
+  worker->Exit(false);
   infer_future.get();
 
   const auto& actual_append_raw_calls{trrm.stats_->num_append_raw_calls};

From bee15ad48b21f911d8d0eb966c416e604e107b7e Mon Sep 17 00:00:00 2001
From: tgerdes
Date: Fri, 31 May 2024 09:37:48 -0500
Subject: [PATCH 3/3] comments

---
 src/c++/perf_analyzer/concurrency_worker.cc | 4 ++--
 src/c++/perf_analyzer/infer_context.cc      | 3 +++
 src/c++/perf_analyzer/infer_context.h       | 3 +++
 src/c++/perf_analyzer/load_manager.cc       | 1 -
 src/c++/perf_analyzer/load_worker.cc        | 9 ++++-----
 src/c++/perf_analyzer/load_worker.h         | 4 ++++
 6 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/src/c++/perf_analyzer/concurrency_worker.cc b/src/c++/perf_analyzer/concurrency_worker.cc
index 26b286b94..ec22c9a38 100644
--- a/src/c++/perf_analyzer/concurrency_worker.cc
+++ b/src/c++/perf_analyzer/concurrency_worker.cc
@@ -133,7 +133,7 @@ ConcurrencyWorker::HandleNoConcurrency()
     wake_signal_.wait(lock, [this]() {
       return exiting_ || (thread_config_->concurrency_ > 0);
     });
-    // Stop executing if concurrency is 0 and early exit is requested
+    // Stop executing if concurrency is 0 and we are exiting
     if (exiting_ && thread_config_->concurrency_ == 0) {
       return true;
     }
@@ -181,7 +181,7 @@ ConcurrencyWorker::WaitForResponses()
     std::unique_lock<std::mutex> lk(cb_mtx_);
     thread_stat_->idle_timer.Start();
     cb_cv_.wait(lk, [this] {
-      if (notified_ || exiting_) {
+      if (notified_ || (exiting_ && fast_exit_)) {
         notified_ = false;
         return true;
       }
diff --git a/src/c++/perf_analyzer/infer_context.cc b/src/c++/perf_analyzer/infer_context.cc
index 5e559cba5..67fdb5af9 100644
--- a/src/c++/perf_analyzer/infer_context.cc
+++ b/src/c++/perf_analyzer/infer_context.cc
@@ -298,6 +298,9 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
     // Add the request record to thread request records vector with
     // proper locking
     std::lock_guard<std::mutex> lock(thread_stat_->mu_);
+
+    // If we are fast exiting, do not handle the request and instead exit
+    // immediately
     if (exiting_ && fast_exit_) {
       return;
     }
diff --git a/src/c++/perf_analyzer/infer_context.h b/src/c++/perf_analyzer/infer_context.h
index 852d88784..35715a24d 100644
--- a/src/c++/perf_analyzer/infer_context.h
+++ b/src/c++/perf_analyzer/infer_context.h
@@ -105,6 +105,9 @@ class InferContext {
   void Init();
 
   // Signal to the context to stop working and exit
+  // If fast exit is true, everything should be immediately dropped
+  // If fast exit is false, the context should still handle outstanding requests
+  // before exiting
   void Exit(bool fast_exit)
   {
     exiting_ = true;
diff --git a/src/c++/perf_analyzer/load_manager.cc b/src/c++/perf_analyzer/load_manager.cc
index f708d14b6..148060864 100644
--- a/src/c++/perf_analyzer/load_manager.cc
+++ b/src/c++/perf_analyzer/load_manager.cc
@@ -250,7 +250,6 @@ LoadManager::StopWorkerThreads()
 {
   bool fast_exit = shared_memory_type_ == SharedMemoryType::NO_SHARED_MEMORY;
 
-  // FIXME do I need to acquire the lock first?
   for (auto& worker : workers_) {
     worker->Exit(fast_exit);
   }
diff --git a/src/c++/perf_analyzer/load_worker.cc b/src/c++/perf_analyzer/load_worker.cc
index 69e683dfa..aceab12e7 100644
--- a/src/c++/perf_analyzer/load_worker.cc
+++ b/src/c++/perf_analyzer/load_worker.cc
@@ -41,8 +41,8 @@ LoadWorker::Exit(bool fast_exit)
     ctx->Exit(fast_exit);
   }
 
-  exiting_ = true;
   fast_exit_ = fast_exit;
+  exiting_ = true;
 
   {
     std::lock_guard<std::mutex> lk(cb_mtx_);
@@ -68,9 +68,8 @@ LoadWorker::HandleExitConditions()
 {
   if (ShouldExit()) {
     CompleteOngoingSequences();
-    if (!fast_exit_) {
-      WaitForOngoingRequests();
-    }
+    thread_stat_->idle_timer.Start();
+    WaitForOngoingRequests();
     return true;
   }
   return false;
@@ -90,7 +89,7 @@ LoadWorker::CompleteOngoingSequences()
 void
 LoadWorker::WaitForOngoingRequests()
 {
-  while (GetNumOngoingRequests() != 0) {
+  while (GetNumOngoingRequests() != 0 && !(exiting_ && fast_exit_)) {
     std::this_thread::sleep_for(std::chrono::milliseconds(50));
   }
 }
diff --git a/src/c++/perf_analyzer/load_worker.h b/src/c++/perf_analyzer/load_worker.h
index 8d5c8d921..c544222b3 100644
--- a/src/c++/perf_analyzer/load_worker.h
+++ b/src/c++/perf_analyzer/load_worker.h
@@ -69,6 +69,10 @@ class LoadWorker : public IWorker {
 
   virtual ~LoadWorker() = default;
 
+  // Tell the worker thread to stop issuing new requests and to exit
+  // If fast_exit is true, the worker thread should exit as fast as possible. If
+  // it is false, it should still wait for all outstanding requests before
+  // exiting
   virtual void Exit(bool fast_exit) override;
 
  protected:
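
Editor's note (not part of the patch series above): a minimal standalone sketch of the shutdown
behavior the three patches converge on. Exit(fast_exit) sets the flags and wakes the worker;
only the slow path (fast_exit == false, the shared-memory case in StopWorkerThreads) drains
outstanding work before returning. All names here (ToyWorker, ongoing_, callbacks_) are invented
stand-ins for illustration, not the actual perf_analyzer classes.

// toy_exit_sketch.cc - illustrative only, assumes nothing beyond the C++ standard library
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

class ToyWorker {
 public:
  void Infer()
  {
    while (!exiting_) {
      // Simulate an async request whose "callback" completes ~20 ms later
      ongoing_++;
      callbacks_.emplace_back([this] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        ongoing_--;
      });
      // Sleep between requests, but wake up immediately if Exit() was called
      std::unique_lock<std::mutex> lk(mtx_);
      cv_.wait_for(
          lk, std::chrono::milliseconds(5), [this] { return exiting_.load(); });
    }
    if (!fast_exit_) {
      // Slow exit: drain outstanding requests, like WaitForOngoingRequests()
      while (ongoing_ != 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
      }
    }
  }

  void Exit(bool fast_exit)
  {
    fast_exit_ = fast_exit;  // set before exiting_, mirroring patch 3's ordering
    exiting_ = true;
    std::lock_guard<std::mutex> lk(mtx_);
    cv_.notify_all();
  }

  ~ToyWorker()
  {
    // Join the simulated callbacks so nothing outlives the object
    for (auto& t : callbacks_) {
      t.join();
    }
  }

 private:
  std::atomic<bool> exiting_{false};
  std::atomic<bool> fast_exit_{false};
  std::atomic<int> ongoing_{0};
  std::mutex mtx_;
  std::condition_variable cv_;
  std::vector<std::thread> callbacks_;
};

int main()
{
  ToyWorker worker;
  std::thread infer_thread(&ToyWorker::Infer, &worker);
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  worker.Exit(true);  // fast exit, analogous to the NO_SHARED_MEMORY case
  infer_thread.join();
  std::cout << "worker exited\n";
  return 0;
}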