diff --git a/src/c++/perf_analyzer/client_backend/mock_client_backend.h b/src/c++/perf_analyzer/client_backend/mock_client_backend.h index 483af914d..04f0304e9 100644 --- a/src/c++/perf_analyzer/client_backend/mock_client_backend.h +++ b/src/c++/perf_analyzer/client_backend/mock_client_backend.h @@ -489,6 +489,14 @@ class NaggyMockClientBackend : public ClientBackend { }); } + ~NaggyMockClientBackend() + { + // Make sure no requests carry over to the next test + while (stats_->num_active_infer_calls) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + MOCK_METHOD( Error, ModelConfig, (rapidjson::Document*, const std::string&, const std::string&), diff --git a/src/c++/perf_analyzer/concurrency_worker.cc b/src/c++/perf_analyzer/concurrency_worker.cc index 37a562f76..ec22c9a38 100644 --- a/src/c++/perf_analyzer/concurrency_worker.cc +++ b/src/c++/perf_analyzer/concurrency_worker.cc @@ -111,7 +111,7 @@ ConcurrencyWorker::HandleExecuteOff() // Wait if no request should be sent and it is not exiting thread_config_->is_paused_ = true; std::unique_lock lock(wake_mutex_); - wake_signal_.wait(lock, [this]() { return early_exit || execute_; }); + wake_signal_.wait(lock, [this]() { return exiting_ || execute_; }); // TODO REFACTOR TMA-1043 - memory manager should be handling this instead // of here @@ -131,10 +131,10 @@ ConcurrencyWorker::HandleNoConcurrency() // Wait if no request should be sent and it is not exiting std::unique_lock lock(wake_mutex_); wake_signal_.wait(lock, [this]() { - return early_exit || (thread_config_->concurrency_ > 0); + return exiting_ || (thread_config_->concurrency_ > 0); }); - // Stop executing if concurrency is 0 and early exit is requested - if (early_exit && thread_config_->concurrency_ == 0) { + // Stop executing if concurrency is 0 and we are exiting + if (exiting_ && thread_config_->concurrency_ == 0) { return true; } } @@ -181,7 +181,7 @@ ConcurrencyWorker::WaitForResponses() std::unique_lock lk(cb_mtx_); thread_stat_->idle_timer.Start(); cb_cv_.wait(lk, [this] { - if (notified_) { + if (notified_ || (exiting_ && fast_exit_)) { notified_ = false; return true; } diff --git a/src/c++/perf_analyzer/infer_context.cc b/src/c++/perf_analyzer/infer_context.cc index 8929e6c99..67fdb5af9 100644 --- a/src/c++/perf_analyzer/infer_context.cc +++ b/src/c++/perf_analyzer/infer_context.cc @@ -63,7 +63,7 @@ InferContext::SendSequenceInferRequest(uint32_t seq_stat_index, bool delayed) // This also helps in reporting the realistic latencies. std::lock_guard guard( sequence_manager_->GetMutex(seq_stat_index)); - if (!early_exit && execute_) { + if (!exiting_ && execute_) { sequence_manager_->SetInferSequenceOptions( seq_stat_index, infer_data_.options_); @@ -298,6 +298,13 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result) // Add the request record to thread request records vector with // proper locking std::lock_guard lock(thread_stat_->mu_); + + // If we are fast exiting, do not handle the request and instead exit + // immediately + if (exiting_ && fast_exit_) { + return; + } + thread_stat_->cb_status_ = result_ptr->RequestStatus(); if (thread_stat_->cb_status_.IsOk()) { std::string request_id; diff --git a/src/c++/perf_analyzer/infer_context.h b/src/c++/perf_analyzer/infer_context.h index 7bacb16d5..35715a24d 100644 --- a/src/c++/perf_analyzer/infer_context.h +++ b/src/c++/perf_analyzer/infer_context.h @@ -104,6 +104,16 @@ class InferContext { // Initialize the context. Must be done before any inferences are sent void Init(); + // Signal to the context to stop working and exit + // If fast exit is true, everything should be immediately dropped + // If fast exit is false, the context should still handle outstanding requests + // before exiting + void Exit(bool fast_exit) + { + exiting_ = true; + fast_exit_ = fast_exit; + } + // Send a single inference request to the server void SendInferRequest(bool delayed = false); @@ -192,6 +202,8 @@ class InferContext { const uint32_t id_{0}; const size_t thread_id_{0}; + bool exiting_{false}; + bool fast_exit_{false}; size_t GetNumActiveThreads() { return num_active_threads_; } diff --git a/src/c++/perf_analyzer/iworker.h b/src/c++/perf_analyzer/iworker.h index 3a72f4c10..bc6d1b9b7 100644 --- a/src/c++/perf_analyzer/iworker.h +++ b/src/c++/perf_analyzer/iworker.h @@ -33,6 +33,7 @@ namespace triton { namespace perfanalyzer { class IWorker { public: virtual void Infer() = 0; + virtual void Exit(bool fast_exit) = 0; }; }} // namespace triton::perfanalyzer diff --git a/src/c++/perf_analyzer/load_manager.cc b/src/c++/perf_analyzer/load_manager.cc index ac9150a9d..148060864 100644 --- a/src/c++/perf_analyzer/load_manager.cc +++ b/src/c++/perf_analyzer/load_manager.cc @@ -164,8 +164,8 @@ LoadManager::LoadManager( const std::unordered_map& request_parameters) : async_(async), streaming_(streaming), batch_size_(batch_size), - max_threads_(max_threads), parser_(parser), factory_(factory), - using_json_data_(false) + max_threads_(max_threads), shared_memory_type_{shared_memory_type}, + parser_(parser), factory_(factory), using_json_data_(false) { on_sequence_model_ = ((parser_->SchedulerType() == ModelParser::SEQUENCE) || @@ -248,9 +248,16 @@ LoadManager::InitManagerInputs( void LoadManager::StopWorkerThreads() { - early_exit = true; - // wake up all threads - wake_signal_.notify_all(); + bool fast_exit = shared_memory_type_ == SharedMemoryType::NO_SHARED_MEMORY; + + for (auto& worker : workers_) { + worker->Exit(fast_exit); + } + + { + std::unique_lock lock(wake_mutex_); + wake_signal_.notify_all(); + } size_t cnt = 0; for (auto& thread : threads_) { diff --git a/src/c++/perf_analyzer/load_manager.h b/src/c++/perf_analyzer/load_manager.h index 799bfa75f..fbaf5234a 100644 --- a/src/c++/perf_analyzer/load_manager.h +++ b/src/c++/perf_analyzer/load_manager.h @@ -140,6 +140,7 @@ class LoadManager { size_t batch_size_; size_t max_threads_; bool on_sequence_model_; + SharedMemoryType shared_memory_type_; std::shared_ptr parser_; std::shared_ptr factory_; diff --git a/src/c++/perf_analyzer/load_worker.cc b/src/c++/perf_analyzer/load_worker.cc index a32976c6a..aceab12e7 100644 --- a/src/c++/perf_analyzer/load_worker.cc +++ b/src/c++/perf_analyzer/load_worker.cc @@ -34,6 +34,22 @@ namespace triton { namespace perfanalyzer { +void +LoadWorker::Exit(bool fast_exit) +{ + for (auto ctx : ctxs_) { + ctx->Exit(fast_exit); + } + + fast_exit_ = fast_exit; + exiting_ = true; + + { + std::lock_guard lk(cb_mtx_); + cb_cv_.notify_all(); + } +} + bool LoadWorker::ShouldExit() { @@ -44,7 +60,7 @@ LoadWorker::ShouldExit() thread_config_->num_requests_ != 0 && thread_stat_->num_sent_requests_ >= thread_config_->num_requests_; - return early_exit || bad_status || done_with_request_count; + return exiting_ || bad_status || done_with_request_count; } bool @@ -73,7 +89,7 @@ LoadWorker::CompleteOngoingSequences() void LoadWorker::WaitForOngoingRequests() { - while (GetNumOngoingRequests() != 0) { + while (GetNumOngoingRequests() != 0 && !(exiting_ && fast_exit_)) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); } } diff --git a/src/c++/perf_analyzer/load_worker.h b/src/c++/perf_analyzer/load_worker.h index dd7e0297f..c544222b3 100644 --- a/src/c++/perf_analyzer/load_worker.h +++ b/src/c++/perf_analyzer/load_worker.h @@ -69,6 +69,12 @@ class LoadWorker : public IWorker { virtual ~LoadWorker() = default; + // Tell the worker thread to stop issuing new requests and to exit + // If fast_exit is true, the worker thread should exit as fast as possible. If + // it is false, it should still wait for all outstanding requests before + // exiting + virtual void Exit(bool fast_exit) override; + protected: // Return the total number of async requests that have started and not // finished @@ -117,6 +123,9 @@ class LoadWorker : public IWorker { void AsyncCallbackFinalize(uint32_t ctx_id); + bool exiting_ = false; + bool fast_exit_ = false; + uint32_t id_; std::vector> ctxs_; diff --git a/src/c++/perf_analyzer/request_rate_worker.cc b/src/c++/perf_analyzer/request_rate_worker.cc index 48ccb361b..e49bcaa13 100644 --- a/src/c++/perf_analyzer/request_rate_worker.cc +++ b/src/c++/perf_analyzer/request_rate_worker.cc @@ -46,6 +46,9 @@ RequestRateWorker::Infer() HandleExecuteOff(); bool is_delayed = SleepIfNecessary(); + if (HandleExitConditions()) { + return; + } uint32_t ctx_id = GetCtxId(); SendInferRequest(ctx_id, is_delayed); RestoreFreeCtxId(ctx_id); @@ -119,7 +122,7 @@ RequestRateWorker::HandleExecuteOff() // Wait if no request should be sent and it is not exiting thread_config_->is_paused_ = true; std::unique_lock lock(wake_mutex_); - wake_signal_.wait(lock, [this]() { return early_exit || execute_; }); + wake_signal_.wait(lock, [this]() { return exiting_ || execute_; }); } thread_config_->is_paused_ = false; @@ -155,7 +158,7 @@ RequestRateWorker::WaitForFreeCtx() std::unique_lock lk(cb_mtx_); thread_stat_->idle_timer.Start(); cb_cv_.wait(lk, [this] { - if (notified_) { + if (notified_ || exiting_) { notified_ = false; return true; } diff --git a/src/c++/perf_analyzer/test_concurrency_manager.cc b/src/c++/perf_analyzer/test_concurrency_manager.cc index 1941a018e..07d803ba1 100644 --- a/src/c++/perf_analyzer/test_concurrency_manager.cc +++ b/src/c++/perf_analyzer/test_concurrency_manager.cc @@ -474,7 +474,7 @@ TEST_CASE("concurrency_free_ctx_ids") std::this_thread::sleep_for(std::chrono::milliseconds(15)); - early_exit = true; + worker->Exit(false); infer_future.get(); // The first sequence should only be called two times, once at the very start, @@ -590,7 +590,7 @@ TEST_CASE("Concurrency - shared memory infer input calls") std::this_thread::sleep_for(std::chrono::milliseconds(18)); - early_exit = true; + worker->Exit(false); infer_future.get(); const auto& actual_append_raw_calls{tcm.stats_->num_append_raw_calls}; diff --git a/src/c++/perf_analyzer/test_load_manager_base.h b/src/c++/perf_analyzer/test_load_manager_base.h index 6bbdf6d23..c48bdead1 100644 --- a/src/c++/perf_analyzer/test_load_manager_base.h +++ b/src/c++/perf_analyzer/test_load_manager_base.h @@ -66,12 +66,6 @@ class TestLoadManagerBase { is_sequence_model, is_decoupled_model); } - ~TestLoadManagerBase() - { - // Reset early_exit in case any test sets it to true during execution. - early_exit = false; - } - // Helper function to process custom json data in testing // Creates a model tensor to pass to a mock parser which is consumed by the // mock data loader diff --git a/src/c++/perf_analyzer/test_request_rate_manager.cc b/src/c++/perf_analyzer/test_request_rate_manager.cc index e4870b95b..81229623d 100644 --- a/src/c++/perf_analyzer/test_request_rate_manager.cc +++ b/src/c++/perf_analyzer/test_request_rate_manager.cc @@ -149,7 +149,6 @@ class TestRequestRateManager : public TestLoadManagerBase, REQUIRE(timestamp.count() == expected_current_timestamp.count()); } } - early_exit = true; } void TestCreateSchedule( @@ -168,7 +167,6 @@ class TestRequestRateManager : public TestLoadManagerBase, total_num_seqs += w->thread_config_->num_sequences_; worker_schedule_sizes.push_back(w->schedule_->intervals.size()); } - early_exit = true; CHECK(num_of_sequences_ == total_num_seqs); for (int i = 0; i < worker_schedule_sizes.size() - 1; i++) { @@ -977,7 +975,7 @@ TEST_CASE("request_rate_streaming: test that streaming-specific logic works") std::dynamic_pointer_cast(worker)->SetSchedule(schedule); std::future infer_future{std::async(&IWorker::Infer, worker)}; - early_exit = true; + worker->Exit(false); infer_future.get(); CHECK( @@ -1827,7 +1825,7 @@ TEST_CASE("Request rate - Shared memory infer input calls") std::this_thread::sleep_for(milliseconds(18)); - early_exit = true; + worker->Exit(false); infer_future.get(); const auto& actual_append_raw_calls{trrm.stats_->num_append_raw_calls}; @@ -2184,7 +2182,7 @@ TEST_CASE("request rate create schedule") params.max_trials = 10; bool is_sequence_model = false; bool is_decoupled = false; - bool use_mock_infer = false; + bool use_mock_infer = true; double rate = 10; std::vector expected_worker_ratio;