From 01ba273cf6d90661f9e14b7d6890117e95ec604b Mon Sep 17 00:00:00 2001 From: Jacky <18255193+kthui@users.noreply.github.com> Date: Fri, 31 May 2024 15:11:42 -0700 Subject: [PATCH 1/7] Add response sender to non-decoupled models and unify data pipelines (#360) * Add response sender to non-decoupled model and unify data pipelines * Rename variable and class name --- src/infer_request.cc | 7 - src/pb_stub.cc | 240 ++++++------------ src/pb_stub.h | 7 +- src/python_be.cc | 566 ++----------------------------------------- src/python_be.h | 39 +-- 5 files changed, 103 insertions(+), 756 deletions(-) diff --git a/src/infer_request.cc b/src/infer_request.cc index 31182281..fc1e4206 100644 --- a/src/infer_request.cc +++ b/src/infer_request.cc @@ -402,13 +402,6 @@ InferRequest::IsCancelled() std::shared_ptr InferRequest::GetResponseSender() { - std::unique_ptr& stub = Stub::GetOrCreateInstance(); - if (!stub->IsDecoupled()) { - throw PythonBackendException( - "'get_response_sender' function must be called only when the model is " - "using the decoupled transaction policy."); - } - return response_sender_; } diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 56d466f5..0e288e68 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -402,11 +402,7 @@ Stub::RunCommand() shm_pool_->Load(ipc_message->Args()); RequestBatch* request_batch_shm_ptr = reinterpret_cast(request_batch.data_.get()); - if (!ipc_control_->decoupled) { - ProcessRequests(request_batch_shm_ptr); - } else { - ProcessRequestsDecoupled(request_batch_shm_ptr); - } + ProcessRequests(request_batch_shm_ptr); } break; case PYTHONSTUB_CommandType::PYTHONSTUB_FinalizeRequest: @@ -597,18 +593,6 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle) initialized_ = true; } -void -Stub::ProcessResponse(InferResponse* response) -{ - response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */); - - for (auto& output_tensor : response->OutputTensors()) { - if (!output_tensor->IsCPU()) { - gpu_tensors_.push_back(output_tensor); - } - } -} - void Stub::LoadGPUBuffers(std::unique_ptr& ipc_message) { @@ -682,7 +666,7 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr) } void -Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr) +Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) { py::list py_request_list = LoadRequestsFromSharedMemory(request_batch_shm_ptr); @@ -718,18 +702,21 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr) py::object execute_return = model_instance_.attr("execute")(py_request_list); + bool is_coroutine = py::module::import("asyncio") .attr("iscoroutine")(execute_return) .cast(); if (is_coroutine) { - RunCoroutine(execute_return); - } else { - if (!py::isinstance(execute_return)) { - throw PythonBackendException( - "Python model '" + name_ + - "' is using the decoupled mode and the execute function must " - "return None."); + if (IsDecoupled()) { + // Do not wait for async decoupled execute to return. + RunCoroutine(execute_return, true /* in_background */); + } else { + py::object coroutine_return = + RunCoroutine(execute_return, false /* in_background */); + ProcessReturnedResponses(py_request_list, coroutine_return); } + } else { + ProcessReturnedResponses(py_request_list, execute_return); } } } @@ -757,151 +744,60 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr) } void -Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) +Stub::ProcessReturnedResponses( + py::list py_requests, py::object py_responses_obj) { - std::unique_ptr execute_response = - IPCMessage::Create(shm_pool_, false /* Inline response */); - execute_response->Command() = PYTHONSTUB_ExecuteResponse; - - AllocatedSharedMemory response_batch = shm_pool_->Construct( - request_batch_shm_ptr->batch_size * - sizeof(bi::managed_external_buffer::handle_t) + - sizeof(ResponseBatch)); - ResponseBatch* response_batch_shm_ptr = - reinterpret_cast(response_batch.data_.get()); - - std::unique_ptr error_string_shm; - py::list inference_responses; - - bi::managed_external_buffer::handle_t* responses_shm_handle = - reinterpret_cast( - response_batch.data_.get() + sizeof(ResponseBatch)); - - py::list responses; - - // Notifying the stub should be after responses. - ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); }); - ScopedDefer _( - [this, &execute_response] { SendIPCMessage(execute_response); }); - - execute_response->Args() = response_batch.handle_; - - bool has_exception = false; - std::string error_string; - try { - response_batch_shm_ptr->has_error = false; - response_batch_shm_ptr->is_error_set = false; - - uint32_t batch_size = request_batch_shm_ptr->batch_size; - - if (batch_size == 0) { - return; - } - - py::list py_request_list = - LoadRequestsFromSharedMemory(request_batch_shm_ptr); - - if (!py::hasattr(model_instance_, "execute")) { - std::string message = "Python model " + model_context_.PythonModelPath() + - " does not implement `execute` method."; - throw PythonBackendException(message); - } - - py::object request_list = py_request_list; - py::module asyncio = py::module::import("asyncio"); - - // Execute Response - py::object execute_return; - py::object responses_obj; - bool is_coroutine; - - { - NVTX_RANGE(nvtx_, "PyExecute " + name_); - execute_return = model_instance_.attr("execute")(request_list); - is_coroutine = asyncio.attr("iscoroutine")(execute_return).cast(); - } - - if (is_coroutine) { - responses_obj = asyncio.attr("run")(execute_return); - } else { - responses_obj = execute_return; - } - - // Check the return type of execute function. - if (!py::isinstance(responses_obj)) { - std::string str = py::str(execute_return.get_type()); - throw PythonBackendException( - std::string("Expected a list in the execute return, found type '") + - str + "'."); - } - - responses = responses_obj; - size_t response_size = py::len(responses); - - // If the number of request objects do not match the number of - // response objects throw an error. - if (response_size != batch_size) { - std::string err = - "Number of InferenceResponse objects do not match the number " - "of " - "InferenceRequest objects. InferenceRequest(s) size is:" + - std::to_string(batch_size) + ", and InferenceResponse(s) size is:" + - std::to_string(response_size) + "\n"; - throw PythonBackendException(err); - } - - for (size_t i = 0; i < response_size; i++) { - // Check the return type of execute function. - InferRequest* infer_request = py_request_list[i].cast(); - if (infer_request->ReleaseFlags() == - TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) { - if (!py::isinstance(responses[i])) { - // When the request is rescheduled in non-decoupled model, the - // response must be None. - std::string str = py::str(responses[i].get_type()); - throw PythonBackendException( - "Expected a None object in the execute function return list for " - "reschduled request, " - "found type '" + - str + "'."); - } - } else { - if (!py::isinstance(responses[i])) { - std::string str = py::str(responses[i].get_type()); - throw PythonBackendException( - std::string( - "Expected an 'InferenceResponse' object in the execute " - "function return list, found type '") + - str + "'."); - } - InferResponse* infer_response = responses[i].cast(); - infer_response->PruneOutputTensors( - infer_request->RequestedOutputNames()); - ProcessResponse(infer_response); - responses_shm_handle[i] = infer_response->ShmHandle(); - } - } - response_batch_shm_ptr->batch_size = response_size; + // Return if there is nothing to process. + if (py::isinstance(py_responses_obj)) { + return; } - catch (const PythonBackendException& pb_exception) { - has_exception = true; - error_string = pb_exception.what(); + // Only non-decoupled may return responses. + if (IsDecoupled()) { + throw PythonBackendException( + "Python model '" + name_ + + "' is using the decoupled mode and the execute function must return " + "None."); } - catch (const py::error_already_set& error) { - has_exception = true; - error_string = error.what(); + // Check responses is a list. + if (!py::isinstance(py_responses_obj)) { + throw PythonBackendException( + "Expected a list in the execute return, found type '" + + std::string(py::str(py_responses_obj.get_type())) + "'."); + } + py::list py_responses = py_responses_obj; + // Responses and requests length must match. + size_t requests_size = py::len(py_requests); + size_t responses_size = py::len(py_responses); + if (requests_size != responses_size) { + throw PythonBackendException( + "Number of InferenceResponse objects do not match the number of " + "InferenceRequest objects. InferenceRequest(s) size is:" + + std::to_string(requests_size) + ", and InferenceResponse(s) size is:" + + std::to_string(responses_size) + "\n"); } - if (has_exception) { - std::string err_message = - std::string( - "Failed to process the request(s) for model '" + name_ + - "', message: ") + - error_string; - error_string_shm = PbString::Create(shm_pool_, error_string); - response_batch_shm_ptr->has_error = true; - response_batch_shm_ptr->is_error_set = true; - response_batch_shm_ptr->error = error_string_shm->ShmHandle(); + for (size_t i = 0; i < responses_size; i++) { + if (!py::isinstance(py_responses[i])) { + InferRequest* request = py_requests[i].cast(); + // Response must be None if rescheduled. + if (request->ReleaseFlags() == TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) { + throw PythonBackendException( + "Expected a None object in the execute function return list for " + "reschduled request, found type '" + + std::string(py::str(py_responses[i].get_type())) + "'."); + } + // Send the response. + if (!py::isinstance(py_responses[i])) { + throw PythonBackendException( + "Expected an 'InferenceResponse' object in the execute function " + "return list, found type '" + + std::string(py::str(py_responses[i].get_type())) + "'."); + } + std::shared_ptr response = + py_responses[i].cast>(); + request->GetResponseSender()->Send( + response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); + } } } @@ -923,15 +819,19 @@ Stub::GetAsyncEventLoop() return async_event_loop_; } -void -Stub::RunCoroutine(py::object coroutine) +py::object +Stub::RunCoroutine(py::object coroutine, bool in_background) { py::object loop = GetAsyncEventLoop(); py::object py_future = py::module_::import("asyncio").attr( "run_coroutine_threadsafe")(coroutine, loop); - py_future.attr("add_done_callback")( - py::module_::import("c_python_backend_utils") - .attr("async_event_future_done_callback")); + if (in_background) { + py_future.attr("add_done_callback")( + py::module_::import("c_python_backend_utils") + .attr("async_event_future_done_callback")); + return py::none(); + } + return py_future.attr("result")(); } void diff --git a/src/pb_stub.h b/src/pb_stub.h index c9462fd0..10e7606a 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -253,11 +253,12 @@ class Stub { /// Execute a batch of requests. void ProcessRequests(RequestBatch* request_batch_shm_ptr); - void ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr); + void ProcessReturnedResponses( + py::list py_requests, py::object py_responses_obj); py::object GetAsyncEventLoop(); - void RunCoroutine(py::object coroutine); + py::object RunCoroutine(py::object coroutine, bool in_background); /// Get the memory manager message queue std::unique_ptr>& MemoryManagerQueue(); @@ -265,8 +266,6 @@ class Stub { /// Get the shared memory pool std::unique_ptr& ShmPool() { return shm_pool_; } - void ProcessResponse(InferResponse* response); - void ProcessBLSResponseDecoupled(std::unique_ptr& ipc_message); void LoadGPUBuffers(std::unique_ptr& ipc_message); diff --git a/src/python_be.cc b/src/python_be.cc index b95fb715..d5657b30 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -153,124 +153,6 @@ ModelInstanceState::SetErrorForResponseSendMessage( } } -void -ModelInstanceState::SendMessageAndReceiveResponse( - bi::managed_external_buffer::handle_t message, - bi::managed_external_buffer::handle_t& response, bool& restart, - std::shared_ptr>& responses, - TRITONBACKEND_Request** requests, const uint32_t request_count) -{ - auto error = SendMessageToStub(message); - if (error != nullptr) { - restart = true; - RespondErrorToAllRequests( - TRITONSERVER_ErrorMessage(error), responses, requests, request_count); - - return; - } - - bi::managed_external_buffer::handle_t response_message; - error = Stub()->ReceiveMessageFromStub(response_message); - if (error != nullptr) { - restart = true; - RespondErrorToAllRequests( - TRITONSERVER_ErrorMessage(error), responses, requests, request_count); - - return; - } - - response = response_message; -} - -TRITONSERVER_Error* -ModelInstanceState::SendMessageToStub( - bi::managed_external_buffer::handle_t message) -{ - bool success = false; - while (!success) { - uint64_t timeout_miliseconds = 1000; - { - boost::posix_time::ptime timeout = - boost::get_system_time() + - boost::posix_time::milliseconds(timeout_miliseconds); - - bi::scoped_lock lock( - *(Stub()->HealthMutex()), timeout); - - // Check if lock has been acquired. - if (lock) { - Stub()->IpcControl()->stub_health = false; - } else { - // If it failed to obtain the lock, it means that the stub has been - // stuck or exited while holding the health mutex lock. - return TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_INTERNAL, "Failed to obtain the health mutex."); - } - } - - Stub()->StubMessageQueue()->Push( - message, timeout_miliseconds /* duration ms */, success); - - if (!success && !IsStubProcessAlive()) { - return TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_INTERNAL, "Stub process is not healthy."); - } - } - - return nullptr; // success -} - -void -ModelInstanceState::RespondErrorToAllRequests( - const char* message, - std::shared_ptr>& responses, - TRITONBACKEND_Request** requests, const uint32_t request_count) -{ - for (uint32_t r = 0; r < request_count; ++r) { - if ((*responses)[r] == nullptr) - continue; - - std::string err_message = - std::string( - "Failed to process the request(s) for model instance '" + Name() + - "', message: ") + - message; - - TRITONSERVER_Error* err = - TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, err_message.c_str()); - LOG_IF_ERROR( - TRITONBACKEND_ResponseSend( - (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err), - "failed sending response"); - - (*responses)[r] = nullptr; - TRITONSERVER_ErrorDelete(err); - } -} - -void -ModelInstanceState::WaitForBLSRequestsToFinish() -{ - futures_.clear(); -} - -bool -ModelInstanceState::IsStubProcessAlive() -{ - boost::posix_time::ptime timeout = - boost::get_system_time() + boost::posix_time::seconds(1); - bi::scoped_lock lock(*Stub()->HealthMutex(), timeout); - - // Check if lock has been acquired. - if (lock) { - return Stub()->IpcControl()->stub_health; - } else { - // If It failed to obtain the lock, it means that the stub has been - // stuck or exited while holding the health mutex lock. - return false; - } -} - TRITONSERVER_Error* ModelInstanceState::SaveRequestsToSharedMemory( TRITONBACKEND_Request** requests, const uint32_t request_count, @@ -408,24 +290,15 @@ ModelInstanceState::SaveRequestsToSharedMemory( request, &request_timeout)); std::unique_ptr infer_request; - if (model_state->IsDecoupled()) { - TRITONBACKEND_ResponseFactory* factory_ptr; - RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); - - infer_request = std::make_unique( - id, correlation_id, pb_input_tensors, requested_output_names, - model_state->Name(), model_state->Version(), parameters_string, flags, - request_timeout, reinterpret_cast(factory_ptr), - reinterpret_cast(request), - PreferredMemory(PreferredMemory::kDefault, 0), trace); - } else { - infer_request = std::make_unique( - id, correlation_id, pb_input_tensors, requested_output_names, - model_state->Name(), model_state->Version(), parameters_string, flags, - request_timeout, 0 /* response_factory_address */, - reinterpret_cast(request), - PreferredMemory(PreferredMemory::kDefault, 0), trace); - } + TRITONBACKEND_ResponseFactory* factory_ptr; + RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); + + infer_request = std::make_unique( + id, correlation_id, pb_input_tensors, requested_output_names, + model_state->Name(), model_state->Version(), parameters_string, flags, + request_timeout, reinterpret_cast(factory_ptr), + reinterpret_cast(request), + PreferredMemory(PreferredMemory::kDefault, 0), trace); RETURN_IF_EXCEPTION(infer_request->SaveToSharedMemory(Stub()->ShmPool())); requests_shm[r] = infer_request->ShmHandle(); pb_infer_requests.emplace_back(std::move(infer_request)); @@ -449,11 +322,9 @@ ModelInstanceState::LaunchStubProcess() thread_pool_ = std::make_unique( model_state->StateForBackend()->thread_pool_size); - if (model_state->IsDecoupled()) { - decoupled_thread_ = true; - decoupled_monitor_ = - std::thread(&ModelInstanceState::DecoupledMessageQueueMonitor, this); - } + queue_monitor_thread_ = true; + decoupled_monitor_ = + std::thread(&ModelInstanceState::MessageQueueMonitor, this); request_executor_ = std::make_unique( Stub()->ShmPool(), model_state->TritonServer()); @@ -806,9 +677,9 @@ ModelInstanceState::ExecuteBLSRequest( } void -ModelInstanceState::DecoupledMessageQueueMonitor() +ModelInstanceState::MessageQueueMonitor() { - while (decoupled_thread_) { + while (queue_monitor_thread_) { bi::managed_external_buffer::handle_t handle = Stub()->ParentMessageQueue()->Pop(); if (handle == DUMMY_MESSAGE) { @@ -1306,7 +1177,7 @@ ModelInstanceState::ResponseSendDecoupled( } TRITONSERVER_Error* -ModelInstanceState::ProcessRequestsDecoupled( +ModelInstanceState::ProcessRequests( TRITONBACKEND_Request** requests, const uint32_t request_count, std::vector>& pb_infer_requests, PbMetricReporter& reporter) @@ -1382,364 +1253,6 @@ ModelInstanceState::ProcessRequestsDecoupled( return nullptr; // success } -void -ModelInstanceState::ProcessRequests( - TRITONBACKEND_Request** requests, const uint32_t request_count, - std::vector>& pb_infer_requests, - bool& restart) -{ - NVTX_RANGE(nvtx_, "ProcessRequests " + Name()); - ModelState* model_state = reinterpret_cast(Model()); - std::string name = model_state->Name(); - - LOG_MESSAGE( - TRITONSERVER_LOG_VERBOSE, - (std::string("model ") + model_state->Name() + ", instance " + Name() + - ", executing " + std::to_string(request_count) + " requests") - .c_str()); - - uint64_t exec_start_ns = 0; - SET_TIMESTAMP(exec_start_ns); - - // We take the responsibility of the responses. - std::shared_ptr> responses( - new std::vector()); - responses->reserve(request_count); - PbMetricReporter reporter( - TritonModelInstance(), requests, request_count, responses); - reporter.SetExecStartNs(exec_start_ns); - - for (size_t i = 0; i < request_count; i++) { - TRITONBACKEND_Response* response; - auto err = TRITONBACKEND_ResponseNew(&response, requests[i]); - if (err == nullptr) { - responses->emplace_back(response); - } else { - responses->emplace_back(nullptr); - LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response"); - TRITONSERVER_ErrorDelete(err); - } - } - - size_t total_batch_size = 0; - RESPOND_ALL_AND_RETURN_IF_ERROR( - responses, request_count, - CheckIncomingRequests(requests, request_count, total_batch_size)); - - // No request to process - if (total_batch_size == 0) { - return; - } - - // Wait for all the pending BLS requests to be completed. - ScopedDefer bls_defer([this] { WaitForBLSRequestsToFinish(); }); - AllocatedSharedMemory request_batch; - RESPOND_ALL_AND_RETURN_IF_ERROR( - responses, request_count, - SaveRequestsToSharedMemory( - requests, request_count, pb_infer_requests, request_batch, - responses)); - - std::shared_ptr ipc_message = - IPCMessage::Create(Stub()->ShmPool(), false /*inline_response*/); - ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest; - ipc_message->Args() = request_batch.handle_; - - uint64_t compute_start_ns = 0; - SET_TIMESTAMP(compute_start_ns); - reporter.SetComputeStartNs(compute_start_ns); - - // This means that the stub process has exited and Python - // backend failed to restart the stub process. - if (!Stub()->StubActive()) { - const char* error_message = "The stub process has exited unexpectedly."; - RespondErrorToAllRequests( - error_message, responses, requests, request_count); - return; - } - - bi::managed_external_buffer::handle_t response_message; - { - NVTX_RANGE(nvtx_, "StubProcessing " + Name()); - SendMessageAndReceiveResponse( - ipc_message->ShmHandle(), response_message, restart, responses, - requests, request_count); - } - - ScopedDefer execute_finalize([this, &restart] { - // Push a dummy message to the message queue so that - // the stub process is notified that it can release - // the object stored in shared memory. - NVTX_RANGE(nvtx_, "RequestExecuteFinalize " + Name()); - if (!restart) - // Push a dummy message to signal the thread to terminate. - Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE); - }); - if (restart) { - return; - } - - RESPOND_ALL_AND_RETURN_IF_EXCEPTION( - responses, request_count, - ipc_message = IPCMessage::LoadFromSharedMemory( - Stub()->ShmPool(), response_message)); - - // If the stub command is no longer PYTHONSTUB_InferExecRequest, it indicates - // that inference request execution has finished and there are no more BLS - // requests to execute. Otherwise, the Python backend will continuously - // execute BLS requests pushed to the message queue. - while (ipc_message->Command() == - PYTHONSTUB_CommandType::PYTHONSTUB_InferExecRequest || - ipc_message->Command() == - PYTHONSTUB_CommandType::PYTHONSTUB_InferStreamExecRequest) { - std::packaged_task task([this, ipc_message] { - ExecuteBLSRequest( - ipc_message, - (ipc_message->Command() == - PYTHONSTUB_CommandType::PYTHONSTUB_InferStreamExecRequest)); - }); - std::future future = - boost::asio::post(*thread_pool_, std::move(task)); - futures_.emplace_back(std::move(future)); - - auto error = Stub()->ReceiveMessageFromStub(response_message); - if (error != nullptr) { - restart = true; - RespondErrorToAllRequests( - TRITONSERVER_ErrorMessage(error), responses, requests, request_count); - return; - } - - RESPOND_ALL_AND_RETURN_IF_EXCEPTION( - responses, request_count, - ipc_message = IPCMessage::LoadFromSharedMemory( - Stub()->ShmPool(), response_message)); - } - - uint64_t compute_end_ns = 0; - SET_TIMESTAMP(compute_end_ns); - reporter.SetComputeEndNs(compute_end_ns); - - // Parsing the request response - AllocatedSharedMemory response_batch; - RESPOND_ALL_AND_RETURN_IF_EXCEPTION( - responses, request_count, - response_batch = Stub()->ShmPool()->Load(ipc_message->Args())); - - ResponseBatch* response_batch_shm_ptr = - reinterpret_cast(response_batch.data_.get()); - - // If inference fails, release all the requests and send an error response. - // If inference fails at this stage, it usually indicates a bug in the model - // code - if (response_batch_shm_ptr->has_error) { - if (response_batch_shm_ptr->is_error_set) { - std::unique_ptr error_message_shm; - RESPOND_ALL_AND_RETURN_IF_EXCEPTION( - responses, request_count, - error_message_shm = PbString::LoadFromSharedMemory( - Stub()->ShmPool(), response_batch_shm_ptr->error)); - RespondErrorToAllRequests( - error_message_shm->String().c_str(), responses, requests, - request_count); - } else { - const char* error_message = - "Failed to fetch the error in response batch."; - RespondErrorToAllRequests( - error_message, responses, requests, request_count); - } - - // Reset the release flags for all the requests. - for (auto& infer_request : pb_infer_requests) { - infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); - } - return; - } - - bi::managed_external_buffer::handle_t* response_shm_handle = - reinterpret_cast( - response_batch.data_.get() + sizeof(ResponseBatch)); - - // If the output provided by the model is in GPU, we will pass the list of - // buffers provided by Triton to the stub process. - bool has_gpu_output = false; - std::vector requires_deferred_callback; - - std::vector> shm_responses; - std::vector, void*>>> - gpu_output_buffers(request_count); - GPUBuffersHelper gpu_buffer_helper; - - for (uint32_t r = 0; r < request_count; ++r) { - NVTX_RANGE(nvtx_, "LoadingResponse " + Name()); - TRITONBACKEND_Response* response = (*responses)[r]; - TRITONBACKEND_Request* request = requests[r]; - uint32_t requested_output_count = 0; - requires_deferred_callback.push_back(false); - - shm_responses.emplace_back(nullptr); - std::unique_ptr& infer_response = shm_responses.back(); - try { - if (pb_infer_requests[r]->ReleaseFlags() == - TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) { - // For rescheduled requests, we do not need to send a response. - LOG_IF_ERROR( - TRITONBACKEND_ResponseDelete((*responses)[r]), - "failed to delete response"); - (*responses)[r] = nullptr; - continue; - } - infer_response = InferResponse::LoadFromSharedMemory( - Stub()->ShmPool(), response_shm_handle[r], - false /* open_cuda_handle */); - if (infer_response->HasError()) { - TRITONSERVER_Error* err = TRITONSERVER_ErrorNew( - infer_response->Error()->Code(), - infer_response->Error()->Message().c_str()); - - LOG_IF_ERROR( - TRITONBACKEND_ResponseSend( - (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err), - "failed sending response"); - TRITONSERVER_ErrorDelete(err); - (*responses)[r] = nullptr; - - // Reset the release flags for the request. - pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); - - // If has_error is true, we do not look at the response tensors. - continue; - } - } - catch (const PythonBackendException& pb_exception) { - TRITONSERVER_Error* err = TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_INTERNAL, pb_exception.what()); - LOG_IF_ERROR( - TRITONBACKEND_ResponseSend( - (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err), - "failed sending response"); - TRITONSERVER_ErrorDelete(err); - (*responses)[r] = nullptr; - - // Reset the release flags for the request. - pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); - - continue; - } - - GUARDED_RESPOND_IF_ERROR( - responses, r, - TRITONBACKEND_RequestOutputCount(request, &requested_output_count)); - - std::set requested_output_names; - for (size_t j = 0; j < requested_output_count; ++j) { - const char* output_name; - GUARDED_RESPOND_IF_ERROR( - responses, r, - TRITONBACKEND_RequestOutputName(request, j, &output_name)); - requested_output_names.insert(output_name); - } - - bool require_deferred_callback = false; - -#ifdef TRITON_ENABLE_GPU - for (auto& output_tensor : infer_response->OutputTensors()) { - if (output_tensor->MemoryType() == TRITONSERVER_MEMORY_GPU) { - // Attempt to use the cuda shared memory pool for GPU tensor. - ShareCUDAMemoryPool(output_tensor->MemoryTypeId()); - } - } -#endif // TRITON_ENABLE_GPU - - gpu_output_buffers[r] = - std::vector, void*>>{}; - infer_response->Send( - response, CudaStream(), require_deferred_callback, - TRITONSERVER_RESPONSE_COMPLETE_FINAL, Stub()->ShmPool(), - gpu_buffer_helper, gpu_output_buffers[r], requested_output_names); - - requires_deferred_callback[r] = require_deferred_callback; - - if (requires_deferred_callback[r]) { - has_gpu_output = true; - } - } - - // Finalize the execute. - execute_finalize.Complete(); - - // If the output tensor is in GPU, there will be a second round trip - // required for filling the GPU buffers provided by the main process. - if (has_gpu_output) { - ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers; - gpu_buffer_helper.Complete(Stub()->ShmPool()); - ipc_message->Args() = gpu_buffer_helper.ShmHandle(); - SendMessageAndReceiveResponse( - ipc_message->ShmHandle(), response_message, restart, responses, - requests, 0); - - bool cuda_copy = false; - - uint32_t response_index = 0; - for (auto& gpu_output_buffer : gpu_output_buffers) { - for (auto& buffer_memory_pair : gpu_output_buffer) { - auto& pb_memory = buffer_memory_pair.first; - void* pointer = buffer_memory_pair.second; - bool cuda_used = false; - - if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { - GUARDED_RESPOND_IF_ERROR( - responses, response_index, - CopyBuffer( - "Failed to copy the output tensor to buffer.", - TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, - pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, - CudaStream(), &cuda_used)); - cuda_copy |= cuda_used; - } else if ( - (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) && - pb_memory->UseCUDASharedPool() && - (pb_memory->DataPtr() != pointer)) { - // If the data pointer from pb_memory is not the same as the pointer, - // it means that the Triton-provided buffer is not used during tensor - // transfer. Instead, an intermediate buffer that uses CUDA shared - // memory pool is used. In this case, we need to copy the data - // from the intermediate buffer back to the Triton-provided buffer. - GUARDED_RESPOND_IF_ERROR( - responses, response_index, - CopyBuffer( - "Failed to copy the output tensor to buffer.", - TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), - TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), - pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, - CudaStream(), &cuda_used)); - cuda_copy |= cuda_used; - } - } - response_index++; -#ifdef TRITON_ENABLE_GPU - if (cuda_copy) { - cudaStreamSynchronize(stream_); - } -#endif // TRITON_ENABLE_GPU - } - } - - bls_defer.Complete(); - for (uint32_t r = 0; r < request_count; ++r) { - if (requires_deferred_callback[r]) { - shm_responses[r]->DeferredSendCallback(); - } - } - - uint64_t exec_end_ns = 0; - SET_TIMESTAMP(exec_end_ns); - reporter.SetExecEndNs(exec_end_ns); - reporter.SetBatchStatistics(total_batch_size); - - return; -} - void ModelInstanceState::PrepareResponseBatch( ResponseBatch** response_batch, @@ -1873,18 +1386,13 @@ ModelInstanceState::ShareCUDAMemoryPool(const int32_t device_id) ModelInstanceState::~ModelInstanceState() { - ModelState* model_state = reinterpret_cast(Model()); Stub()->UpdateHealth(); if (Stub()->IsHealthy()) { - if (model_state->IsDecoupled()) { - // Wait for all the pending tasks to finish. - thread_pool_->wait(); - // Push a dummy message to signal the thread to terminate. - Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE); - decoupled_monitor_.join(); - } else { - thread_pool_->wait(); - } + // Wait for all the pending tasks to finish. + thread_pool_->wait(); + // Push a dummy message to signal the thread to terminate. + Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE); + decoupled_monitor_.join(); } // Terminate stub first to allow any last messages to be received by the back // end before deallocating the queue memory @@ -2445,36 +1953,10 @@ TRITONBACKEND_ModelInstanceExecute( // If restart is equal to true, it indicates that the stub process is // unhealthy and needs a restart. - bool restart = false; - ModelState* model_state = - reinterpret_cast(instance_state->Model()); - std::vector> infer_requests; - if (!model_state->IsDecoupled()) { - instance_state->ProcessRequests( - requests, request_count, infer_requests, restart); + // TODO: Implement restart on decoupled - if (restart) { - LOG_MESSAGE( - TRITONSERVER_LOG_ERROR, - "Stub process is unhealthy and it will be restarted."); - instance_state->TerminateMonitor(); - instance_state->Stub()->KillStubProcess(); - TRITONSERVER_Error* err = instance_state->Stub()->Setup(); - if (err == nullptr) { - instance_state->StartMonitor(); - } - LOG_IF_ERROR(err, "Failed to restart the stub process."); - err = instance_state->Stub()->Launch(); - LOG_IF_ERROR( - err, - "Failed to restart the stub process: failed to launch " - "the stub process."); - // Reset the release flags for all the requests. - for (auto& infer_request : infer_requests) { - infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); - } - } - } else { + std::vector> infer_requests; + { uint64_t exec_start_ns = 0; SET_TIMESTAMP(exec_start_ns); @@ -2483,7 +1965,7 @@ TRITONBACKEND_ModelInstanceExecute( nullptr); reporter.SetExecStartNs(exec_start_ns); - error = instance_state->ProcessRequestsDecoupled( + error = instance_state->ProcessRequests( requests, request_count, infer_requests, reporter); uint64_t exec_end_ns = 0; diff --git a/src/python_be.h b/src/python_be.h index 9618204c..46c93012 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -289,7 +289,7 @@ class ModelInstanceState : public BackendModelInstance { bool stub_to_parent_thread_; // Decoupled monitor thread std::thread decoupled_monitor_; - bool decoupled_thread_; + bool queue_monitor_thread_; std::mutex mu_; std::condition_variable cv_; std::unique_ptr received_message_; @@ -309,30 +309,12 @@ class ModelInstanceState : public BackendModelInstance { // Launch stub process. TRITONSERVER_Error* LaunchStubProcess(); - TRITONSERVER_Error* SendMessageToStub( - bi::managed_external_buffer::handle_t message); void ResponseSendDecoupled(std::shared_ptr response_send_message); - // Checks whether the stub process is live - bool IsStubProcessAlive(); - - // Get a message from the stub process - void SendMessageAndReceiveResponse( - bi::managed_external_buffer::handle_t message, - bi::managed_external_buffer::handle_t& response, bool& restart, - std::shared_ptr>& responses, - TRITONBACKEND_Request** requests, const uint32_t request_count); - - // Responds to all the requests with an error message. - void RespondErrorToAllRequests( - const char* message, - std::shared_ptr>& responses, - TRITONBACKEND_Request** requests, const uint32_t request_count); - - // In the decoupled mode, the parent message queue is monitored only by this - // function during the execute phase. No other thread should pop any message - // from the message queue in the decoupled mode. - void DecoupledMessageQueueMonitor(); + // The parent message queue is monitored only by this function during the + // execute phase. No other thread should pop any message from the message + // queue. + void MessageQueueMonitor(); // This function is executed on a separate thread and monitors the queue for // message sent from stub to parent process. @@ -347,14 +329,8 @@ class ModelInstanceState : public BackendModelInstance { TRITONBACKEND_Request* request, std::shared_ptr>& responses); - // Process all the requests obtained from Triton. - void ProcessRequests( - TRITONBACKEND_Request** requests, const uint32_t request_count, - std::vector>& pb_infer_requests, - bool& restart); - // Process all the requests in the decoupled mode. - TRITONSERVER_Error* ProcessRequestsDecoupled( + TRITONSERVER_Error* ProcessRequests( TRITONBACKEND_Request** requests, const uint32_t request_count, std::vector>& pb_infer_requests, PbMetricReporter& pb_metric_reporter); @@ -368,9 +344,6 @@ class ModelInstanceState : public BackendModelInstance { // Cleanup BLS responses void CleanupBLSResponses(); - // Wait for BLS requests to complete - void WaitForBLSRequestsToFinish(); - // Check the incoming requests for errors TRITONSERVER_Error* CheckIncomingRequests( TRITONBACKEND_Request** requests, const uint32_t request_count, From 4551e0458b3c67cbd29a3a3c23996ee960a2ad7d Mon Sep 17 00:00:00 2001 From: Jacky <18255193+kthui@users.noreply.github.com> Date: Fri, 31 May 2024 15:22:24 -0700 Subject: [PATCH 2/7] Fix decoupled batch statistics to account for implicit batch size (#361) --- src/python_be.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python_be.cc b/src/python_be.cc index d5657b30..ddf1b022 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1236,7 +1236,7 @@ ModelInstanceState::ProcessRequests( uint64_t compute_end_ns = 0; SET_TIMESTAMP(compute_end_ns); reporter.SetComputeEndNs(compute_end_ns); - reporter.SetBatchStatistics(request_count); + reporter.SetBatchStatistics(total_batch_size); if (response_batch.data_->has_error) { if (response_batch.data_->is_error_set) { From 9f2865da7a590f6e411b0906898cbd8ecb9a335d Mon Sep 17 00:00:00 2001 From: Jacky <18255193+kthui@users.noreply.github.com> Date: Mon, 3 Jun 2024 14:16:22 -0700 Subject: [PATCH 3/7] Fix decoupled gpu output error handling (#362) * Fix decoupled gpu output error handling * Return full error string upon exception from model --- src/pb_stub.cc | 4 ++-- src/response_sender.cc | 39 ++++++++++++++++++++++----------------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 0e288e68..2f5c556c 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -735,9 +735,9 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) "Failed to process the request(s) for model '" + name_ + "', message: ") + error_string; - LOG_INFO << err_message.c_str(); + LOG_ERROR << err_message.c_str(); response_batch_shm_ptr->has_error = true; - error_string_shm = PbString::Create(shm_pool_, error_string); + error_string_shm = PbString::Create(shm_pool_, err_message); response_batch_shm_ptr->error = error_string_shm->ShmHandle(); response_batch_shm_ptr->is_error_set = true; } diff --git a/src/response_sender.cc b/src/response_sender.cc index 94e3f0c8..038279db 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -147,9 +147,26 @@ ResponseSender::Send( } if (has_gpu_output) { + ScopedDefer _([send_message_payload] { + bi::scoped_lock guard{send_message_payload->mu}; + send_message_payload->is_stub_turn = false; + send_message_payload->cv.notify_one(); + while (!send_message_payload->is_stub_turn) { + // Wait for the stub process to send the response and populate error + // message if any. + send_message_payload->cv.wait(guard); + } + }); + AllocatedSharedMemory gpu_buffers_handle = shm_pool_->Load( send_message_payload->gpu_buffers_handle); + if (!gpu_buffers_handle.data_->success) { + std::unique_ptr error = PbString::LoadFromSharedMemory( + shm_pool_, gpu_buffers_handle.data_->error); + throw PythonBackendException( + "Failed to load GPU buffers: " + error->String()); + } AllocatedSharedMemory gpu_buffers_handle_shm = @@ -157,12 +174,11 @@ ResponseSender::Send( gpu_buffers_handle.data_->buffers); uint64_t gpu_buffer_count = gpu_buffers_handle.data_->buffer_count; if (gpu_tensors.size() != gpu_buffer_count) { - LOG_ERROR - << (std::string( - "GPU buffers size does not match the provided buffers: ") + - std::to_string(gpu_tensors.size()) + - " != " + std::to_string(gpu_buffer_count)); - return; + throw PythonBackendException( + std::string( + "GPU buffers size does not match the provided buffers: ") + + std::to_string(gpu_tensors.size()) + + " != " + std::to_string(gpu_buffer_count)); } std::vector> dst_buffers; @@ -175,17 +191,6 @@ ResponseSender::Send( std::shared_ptr& src_buffer = gpu_tensors[i]; PbMemory::CopyBuffer(dst_buffers[i], src_buffer->Memory()); } - - { - bi::scoped_lock guard{send_message_payload->mu}; - send_message_payload->is_stub_turn = false; - send_message_payload->cv.notify_one(); - while (!send_message_payload->is_stub_turn) { - // Wait for the stub process to send the response and populate error - // message if any. - send_message_payload->cv.wait(guard); - } - } } if (send_message_payload->has_error) { From 4961e24a35f06892aa85f86a3c3c701ede9c25d9 Mon Sep 17 00:00:00 2001 From: Jacky <18255193+kthui@users.noreply.github.com> Date: Tue, 4 Jun 2024 11:59:30 -0700 Subject: [PATCH 4/7] Response sender to check for improper non-decoupled model usage (#363) * Response sender to check for improper non-decoupled model usage * Force close response sender on exception * Rename functions --- src/infer_request.cc | 12 +++--- src/infer_request.h | 5 ++- src/pb_stub.cc | 11 +++++- src/python_be.cc | 3 +- src/response_sender.cc | 88 ++++++++++++++++++++++++++++++++---------- src/response_sender.h | 16 +++++++- 6 files changed, 103 insertions(+), 32 deletions(-) diff --git a/src/infer_request.cc b/src/infer_request.cc index fc1e4206..57ea6cf1 100644 --- a/src/infer_request.cc +++ b/src/infer_request.cc @@ -74,7 +74,7 @@ InferRequest::InferRequest( pb_cancel_ = std::make_shared(response_factory_address_, request_address_); response_sender_ = std::make_shared( - request_address_, response_factory_address_, + request_address_, response_factory_address_, nullptr /* is_decoupled */, Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_); #endif } @@ -272,7 +272,8 @@ InferRequest::SaveToSharedMemory(std::unique_ptr& shm_pool) std::unique_ptr InferRequest::LoadFromSharedMemory( std::unique_ptr& shm_pool, - bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle) + bi::managed_external_buffer::handle_t request_handle, bool open_cuda_handle, + bool const* is_model_decoupled) { AllocatedSharedMemory infer_request_shm = shm_pool->Load(request_handle); @@ -328,7 +329,7 @@ InferRequest::LoadFromSharedMemory( return std::unique_ptr(new InferRequest( infer_request_shm, request_id_shm, correlation_id_shm, requested_output_names_shm, model_name_shm, input_tensors, parameters_shm, - infer_trace_shm)); + infer_trace_shm, is_model_decoupled)); } InferRequest::InferRequest( @@ -339,7 +340,8 @@ InferRequest::InferRequest( std::unique_ptr& model_name_shm, std::vector>& input_tensors, std::unique_ptr& parameters_shm, - std::unique_ptr& infer_trace_shm) + std::unique_ptr& infer_trace_shm, + bool const* is_model_decoupled) : infer_request_shm_(std::move(infer_request_shm)), request_id_shm_(std::move(request_id_shm)), requested_output_names_shm_(std::move(requested_output_names_shm)), @@ -387,7 +389,7 @@ InferRequest::InferRequest( pb_cancel_ = std::make_shared(response_factory_address_, request_address_); response_sender_ = std::make_shared( - request_address_, response_factory_address_, + request_address_, response_factory_address_, is_model_decoupled, Stub::GetOrCreateInstance()->SharedMemory(), pb_cancel_); #endif } diff --git a/src/infer_request.h b/src/infer_request.h index e0887624..c67e2fb0 100644 --- a/src/infer_request.h +++ b/src/infer_request.h @@ -118,7 +118,7 @@ class InferRequest { static std::unique_ptr LoadFromSharedMemory( std::unique_ptr& shm_pool, bi::managed_external_buffer::handle_t request_handle, - bool open_cuda_handle); + bool open_cuda_handle, bool const* is_model_decoupled); /// Disallow copying the inference request object. DISALLOW_COPY_AND_ASSIGN(InferRequest); @@ -135,7 +135,8 @@ class InferRequest { std::unique_ptr& model_name_shm, std::vector>& input_tensors, std::unique_ptr& parameters_shm, - std::unique_ptr& infer_trace_shm); + std::unique_ptr& infer_trace_shm, + bool const* is_model_decoupled); std::string request_id_; CorrelationId correlation_id_; diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 2f5c556c..87410a70 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -658,7 +658,8 @@ Stub::LoadRequestsFromSharedMemory(RequestBatch* request_batch_shm_ptr) for (size_t i = 0; i < batch_size; i++) { std::shared_ptr infer_request = InferRequest::LoadFromSharedMemory( - shm_pool_, request_shm_handle[i], true /* open_cuda_handle */); + shm_pool_, request_shm_handle[i], true /* open_cuda_handle */, + &ipc_control_->decoupled /* is_model_decoupled */); py_request_list.append(infer_request); } @@ -740,6 +741,14 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) error_string_shm = PbString::Create(shm_pool_, err_message); response_batch_shm_ptr->error = error_string_shm->ShmHandle(); response_batch_shm_ptr->is_error_set = true; + // Once the error is sent to the backend, the backend is supposed to close + // all response factories if not already closed, so closing all response + // senders if not already closed to prevent the model from sending more + // responses after the factories are closed. + for (py::handle py_request : py_request_list) { + InferRequest* request = py_request.cast(); + request->GetResponseSender()->Close(); + } } } diff --git a/src/python_be.cc b/src/python_be.cc index ddf1b022..4d32e4e5 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -571,7 +571,8 @@ ModelInstanceState::ExecuteBLSRequest( reinterpret_cast( request_batch.data_.get() + sizeof(RequestBatch)); infer_request = InferRequest::LoadFromSharedMemory( - Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */); + Stub()->ShmPool(), *request_handle, false /* open_cuda_handle */, + nullptr /* is_model_decoupled */); // If the BLS inputs are in GPU an additional round trip between the // stub process and the main process is required. The reason is that we diff --git a/src/response_sender.cc b/src/response_sender.cc index 038279db..d4122aa9 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -35,13 +35,31 @@ namespace triton { namespace backend { namespace python { +void +CheckResponseSenderArguments( + const std::shared_ptr& response, const uint32_t flags) +{ + // Check the correctness of the provided flags. + if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) { + throw PythonBackendException( + "Unable to send response. Unsupported flag provided."); + } + + if (flags == 0 && response == nullptr) { + throw PythonBackendException( + "Inference Response object must be provided when the response flags is " + "set to zero."); + } +} + ResponseSender::ResponseSender( intptr_t request_address, intptr_t response_factory_address, - std::unique_ptr& shm_pool, + bool const* is_decoupled, std::unique_ptr& shm_pool, const std::shared_ptr& pb_cancel) : request_address_(request_address), - response_factory_address_(response_factory_address), shm_pool_(shm_pool), - closed_(false), pb_cancel_(pb_cancel) + response_factory_address_(response_factory_address), + is_decoupled_(is_decoupled), shm_pool_(shm_pool), pb_cancel_(pb_cancel), + closed_(false), number_of_response_sent_(0) { } @@ -54,15 +72,32 @@ ResponseSender::~ResponseSender() } void -ResponseSender::Send( - std::shared_ptr infer_response, const uint32_t flags) +ResponseSender::UpdateStateAndCounters( + const std::shared_ptr& response, const uint32_t flags) { - // Release the GIL. This avoids a potential deadlock situation in the parent - // process, where every thread in the thread pool is indirectly waiting for a - // function in the stub process that acquires the GIL. Meanwhile, the current - // thread, which holds the GIL, is also waiting for the parent side to have - // the next available thread to pick up the job during resource contention. - py::gil_scoped_release release; + if (is_decoupled_ == nullptr) { + // TODO: Can a model access the response sender on a BLS infer request? + throw PythonBackendException( + "Unable to send response. Response sender has no reference to the " + "decoupled state of the model."); + } + bool is_decoupled = *is_decoupled_; + + std::lock_guard lk(mu_); + + if (!is_decoupled) { + if (response != nullptr && number_of_response_sent_ > 0) { + throw PythonBackendException( + "Unable to send response. Non-decoupled model cannot send more than " + "one response."); + } + if (response == nullptr && flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL && + number_of_response_sent_ == 0) { + throw PythonBackendException( + "Unable to send response. Non-decoupled model cannot send complete " + "final before sending a response."); + } + } if (closed_) { throw PythonBackendException( @@ -72,18 +107,22 @@ ResponseSender::Send( if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { closed_ = true; } + number_of_response_sent_++; +} - // Check the correctness of the provided flags. - if (flags != TRITONSERVER_RESPONSE_COMPLETE_FINAL && flags != 0) { - throw PythonBackendException( - "Unable to send response. Unsupported flag provided."); - } +void +ResponseSender::Send( + std::shared_ptr infer_response, const uint32_t flags) +{ + // Release the GIL. This avoids a potential deadlock situation in the parent + // process, where every thread in the thread pool is indirectly waiting for a + // function in the stub process that acquires the GIL. Meanwhile, the current + // thread, which holds the GIL, is also waiting for the parent side to have + // the next available thread to pick up the job during resource contention. + py::gil_scoped_release release; - if (flags == 0 && infer_response == nullptr) { - throw PythonBackendException( - "Inference Response object must be provided when the response flags is " - "set to zero."); - } + CheckResponseSenderArguments(infer_response, flags); + UpdateStateAndCounters(infer_response, flags); std::unique_ptr& stub = Stub::GetOrCreateInstance(); @@ -211,4 +250,11 @@ ResponseSender::IsCancelled() return pb_cancel_->IsCancelled(); } +void +ResponseSender::Close() +{ + std::lock_guard lk(mu_); + closed_ = true; +} + }}} // namespace triton::backend::python diff --git a/src/response_sender.h b/src/response_sender.h index d29a6ab6..be1d49ae 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -26,6 +26,8 @@ #pragma once +#include + #include "infer_response.h" #include "pb_cancel.h" #include "shm_manager.h" @@ -36,17 +38,27 @@ class ResponseSender { public: ResponseSender( intptr_t request_address, intptr_t response_factory_address, - std::unique_ptr& shm_pool, + bool const* is_decoupled, std::unique_ptr& shm_pool, const std::shared_ptr& pb_cancel); ~ResponseSender(); void Send(std::shared_ptr response, const uint32_t flags); bool IsCancelled(); + // Can be useful at stopping the model from sending any more responses. + void Close(); + private: + void UpdateStateAndCounters( + const std::shared_ptr& response, const uint32_t flags); + intptr_t request_address_; intptr_t response_factory_address_; + bool const* is_decoupled_; std::unique_ptr& shm_pool_; - bool closed_; std::shared_ptr pb_cancel_; + + std::mutex mu_; + bool closed_; + size_t number_of_response_sent_; }; }}} // namespace triton::backend::python From 61f46a780bcff76531f0083fda4ad4c5ce92fa5f Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Tue, 4 Jun 2024 12:37:58 -0700 Subject: [PATCH 5/7] Update copyright --- src/python_be.h | 2 +- src/response_sender.cc | 2 +- src/response_sender.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/python_be.h b/src/python_be.h index 46c93012..52aab759 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -1,4 +1,4 @@ -// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions diff --git a/src/response_sender.cc b/src/response_sender.cc index d4122aa9..74914ab4 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -1,4 +1,4 @@ -// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions diff --git a/src/response_sender.h b/src/response_sender.h index be1d49ae..1b57508e 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -1,4 +1,4 @@ -// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions From f5e6e3b77a0d5807f748830696ddec20b0ad49ca Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Tue, 4 Jun 2024 18:16:26 -0700 Subject: [PATCH 6/7] Rename decoupled monitor to queue monitor --- src/python_be.cc | 5 ++--- src/python_be.h | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/python_be.cc b/src/python_be.cc index 4d32e4e5..cd31e79e 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -323,8 +323,7 @@ ModelInstanceState::LaunchStubProcess() model_state->StateForBackend()->thread_pool_size); queue_monitor_thread_ = true; - decoupled_monitor_ = - std::thread(&ModelInstanceState::MessageQueueMonitor, this); + queue_monitor_ = std::thread(&ModelInstanceState::MessageQueueMonitor, this); request_executor_ = std::make_unique( Stub()->ShmPool(), model_state->TritonServer()); @@ -1393,7 +1392,7 @@ ModelInstanceState::~ModelInstanceState() thread_pool_->wait(); // Push a dummy message to signal the thread to terminate. Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE); - decoupled_monitor_.join(); + queue_monitor_.join(); } // Terminate stub first to allow any last messages to be received by the back // end before deallocating the queue memory diff --git a/src/python_be.h b/src/python_be.h index 52aab759..59660fc4 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -287,8 +287,8 @@ class ModelInstanceState : public BackendModelInstance { std::thread stub_to_parent_queue_monitor_; bool stub_to_parent_thread_; - // Decoupled monitor thread - std::thread decoupled_monitor_; + // Queue monitor thread + std::thread queue_monitor_; bool queue_monitor_thread_; std::mutex mu_; std::condition_variable cv_; From 4d732035f00f395c22e9d341ba164f287d08b55b Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Tue, 4 Jun 2024 18:26:08 -0700 Subject: [PATCH 7/7] Add documentation --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 89b9213e..30f2dd25 100644 --- a/README.md +++ b/README.md @@ -479,6 +479,12 @@ Upon return from the execute function all tensor data associated with the InferenceRequest objects passed to the function are deleted, and so InferenceRequest objects should not be retained by the Python model. +Starting from 24.06, models may choose to send the response using the +`InferenceResponseSender` as illustrated on [Decoupled mode](#decoupled-mode). +Since the model is in default mode, it must send exactly one response per +request. The `pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL` flag must be sent +either with the response or as a flag only response afterward. + #### Error Handling In case one of the requests has an error, you can use the `TritonError` object