diff --git a/src/infer_request.cc b/src/infer_request.cc
index 8a95b524..e5733662 100644
--- a/src/infer_request.cc
+++ b/src/infer_request.cc
@@ -484,7 +484,7 @@ InferRequest::Exec(const bool is_decoupled)
     {
       bi::scoped_lock<bi::interprocess_mutex> lock{
           *(ipc_message->ResponseMutex())};
-      stub->SendIPCMessage(ipc_message);
+      stub->SendIPCUtilsMessage(ipc_message);
       ipc_message->ResponseCondition()->wait(lock);
     }
diff --git a/src/ipc_message.cc b/src/ipc_message.cc
index ea1dc5b0..1b813214 100644
--- a/src/ipc_message.cc
+++ b/src/ipc_message.cc
@@ -56,6 +56,19 @@ IPCMessage::Create(
       new IPCMessage(ipc_message_shm, response_mutex_shm, response_cond_shm));
 }
 
+std::unique_ptr<IPCMessage>
+IPCMessage::Create(
+    IPCMessageShm* ipc_message_shm,
+    bi::managed_external_buffer::handle_t& message_handle)
+{
+  return std::unique_ptr<IPCMessage>(
+      new IPCMessage(ipc_message_shm, message_handle));
+}
+
+AllocatedSharedMemory<IPCMessageShm>&
+IPCMessage::GetAllocatedSharedMemory()
+{
+  return ipc_message_shm_;
+}
+
 std::unique_ptr<IPCMessage>
 IPCMessage::LoadFromSharedMemory(
     std::unique_ptr<SharedMemoryManager>& shm_pool,
@@ -133,4 +146,10 @@ IPCMessage::IPCMessage(
   ipc_message_handle_ = ipc_message_shm_.handle_;
 }
 
+IPCMessage::IPCMessage(
+    IPCMessageShm* ipc_message_shm,
+    bi::managed_external_buffer::handle_t& handle)
+{
+  ipc_message_handle_ = handle;
+  ipc_message_shm_ptr_ = ipc_message_shm;
+}
+
 }}};  // namespace triton::backend::python
diff --git a/src/ipc_message.h b/src/ipc_message.h
index 8e762b8f..c7d0ae9d 100644
--- a/src/ipc_message.h
+++ b/src/ipc_message.h
@@ -97,6 +97,10 @@ class IPCMessage {
   static std::unique_ptr<IPCMessage> Create(
       const std::unique_ptr<SharedMemoryManager>& shm_pool,
       bool inline_response);
+
+  static std::unique_ptr<IPCMessage> Create(
+      IPCMessageShm* ipc_message_shm,
+      bi::managed_external_buffer::handle_t& message_handle);
   static std::unique_ptr<IPCMessage> LoadFromSharedMemory(
       std::unique_ptr<SharedMemoryManager>& shm_pool,
       bi::managed_external_buffer::handle_t message_handle);
@@ -108,6 +112,7 @@ class IPCMessage {
   bi::interprocess_mutex* ResponseMutex();
   bi::managed_external_buffer::handle_t& Args();
   bi::managed_external_buffer::handle_t ShmHandle();
+  AllocatedSharedMemory<IPCMessageShm>& GetAllocatedSharedMemory();
 
  private:
   AllocatedSharedMemory<IPCMessageShm> ipc_message_shm_;
@@ -129,6 +134,8 @@ class IPCMessage {
       AllocatedSharedMemory<IPCMessageShm>& ipc_message_shm,
       AllocatedSharedMemory<bi::interprocess_mutex>& response_mutex_shm,
       AllocatedSharedMemory<bi::interprocess_condition>& response_cond_shm);
+
+  IPCMessage(
+      IPCMessageShm* ipc_message_shm,
+      bi::managed_external_buffer::handle_t& handle);
 };
 
 }}};  // namespace triton::backend::python
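The new `IPCMessage::Create` overload above adopts a header that already lives inside a caller-owned shared-memory block instead of allocating a fresh one. A minimal sketch of that ownership pattern follows; `Header`, `Message`, and `handle_t` are illustrative stand-ins, not the backend's real types:

    #include <cstdint>
    #include <memory>

    // Stand-ins for IPCMessageShm and the shared-memory handle type.
    struct Header {
      uint32_t command;
      uint64_t args;
    };
    using handle_t = uint64_t;

    class Message {
     public:
      // Wrap a header that lives inside a buffer someone else allocated;
      // no new allocation happens here, unlike the pool-based factory.
      static std::unique_ptr<Message> Create(Header* header, handle_t handle)
      {
        return std::unique_ptr<Message>(new Message(header, handle));
      }
      Header* Shm() { return header_; }
      handle_t Handle() const { return handle_; }

     private:
      Message(Header* header, handle_t handle)
          : header_(header), handle_(handle)
      {
      }
      Header* header_;   // not owned; points into the larger allocation
      handle_t handle_;  // handle of that allocation, reused for the reply
    };

The point of the design is that the message header can be prepended to a buffer the caller was going to allocate anyway, so one handle describes both the message and its payload.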
diff --git a/src/pb_stub.cc b/src/pb_stub.cc
index a4e051e9..e6c93214 100644
--- a/src/pb_stub.cc
+++ b/src/pb_stub.cc
@@ -653,9 +653,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
 {
   py::list py_request_list =
       LoadRequestsFromSharedMemory(request_batch_shm_ptr);
-  std::unique_ptr<IPCMessage> execute_response =
-      IPCMessage::Create(shm_pool_, false /* Inline response */);
-  execute_response->Command() = PYTHONSTUB_ExecuteResponse;
+  std::unique_ptr<IPCMessage> execute_response;
+  // IPCMessage::Create(shm_pool_, false /* Inline response */);
   std::optional<AllocatedSharedMemory<char>> response_batch;
   bool has_exception = false;
@@ -713,9 +712,9 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
         error_string;
     LOG_ERROR << err_message.c_str();
     if (!response_batch) {
-      response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch));
+      response_batch = shm_pool_->Construct<char>(
+          sizeof(ResponseBatch) + sizeof(IPCMessageShm));
     }
-    ResponseBatch* response_batch_shm_ptr =
-        reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
+    ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
+        response_batch.value().data_.get() + sizeof(IPCMessageShm));
     response_batch_shm_ptr->has_error = true;
@@ -733,14 +732,17 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
   }
 
   if (!response_batch) {
-    response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch));
-    ResponseBatch* response_batch_shm_ptr =
-        reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
+    response_batch = shm_pool_->Construct<char>(
+        sizeof(ResponseBatch) + sizeof(IPCMessageShm));
+    ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
+        response_batch.value().data_.get() + sizeof(IPCMessageShm));
     response_batch_shm_ptr->batch_size = 0;
   }
-  ResponseBatch* response_batch_shm_ptr =
-      reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
+  ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
+      response_batch.value().data_.get() + sizeof(IPCMessageShm));
   response_batch_shm_ptr->has_error = false;
   response_batch_shm_ptr->is_error_set = false;
+  execute_response = IPCMessage::Create(
+      reinterpret_cast<IPCMessageShm*>(response_batch.value().data_.get()),
+      response_batch.value().handle_);
   execute_response->Args() = response_batch.value().handle_;
+  execute_response->InlineResponse() = false;
+  execute_response->Command() = PYTHONSTUB_ExecuteResponse;
   _.Complete();
   execute_finalize.Complete();
 }
@@ -813,15 +815,15 @@ Stub::ProcessReturnedResponses(
       request->GetResponseSender()->UpdateStateAndCounters(
           response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
     }
   }
-  response_batch = std::move(shm_pool_->Construct<char>(
+  response_batch = std::move(shm_pool_->Construct<char>(
+      sizeof(IPCMessageShm) +
       requests_size * sizeof(bi::managed_external_buffer::handle_t) +
       sizeof(ResponseBatch)));
   ResponseBatch* response_batch_shm_ptr =
-      reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
+      reinterpret_cast<ResponseBatch*>(
+          response_batch.value().data_.get() + sizeof(IPCMessageShm));
   bi::managed_external_buffer::handle_t* responses_shm_handle =
       reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-          response_batch.value().data_.get() + sizeof(ResponseBatch));
+          response_batch.value().data_.get() + sizeof(ResponseBatch) +
+          sizeof(IPCMessageShm));
 
   for (size_t i = 0; i < responses_size; i++) {
     // Check the return type of execute function.
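The pb_stub.cc changes pack three regions into a single allocation — the `IPCMessageShm` header first, then the `ResponseBatch`, then the per-request response handles — so the stub sends one shared-memory handle instead of constructing a separate response message. A minimal sketch of the same offset arithmetic over an ordinary heap buffer; the struct layouts are simplified stand-ins (the real structs carry more fields):

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <vector>

    struct IPCMessageShm { uint32_t command; uint64_t args; };      // simplified
    struct ResponseBatch { uint32_t batch_size; bool has_error; };  // simplified
    using handle_t = uint64_t;

    int main()
    {
      const size_t num_requests = 4;
      // One buffer, three regions laid out back to back:
      // [IPCMessageShm][ResponseBatch][handle_t x num_requests]
      std::vector<char> buf(
          sizeof(IPCMessageShm) + sizeof(ResponseBatch) +
          num_requests * sizeof(handle_t));
      char* base = buf.data();

      auto* header = reinterpret_cast<IPCMessageShm*>(base);
      auto* batch =
          reinterpret_cast<ResponseBatch*>(base + sizeof(IPCMessageShm));
      auto* handles = reinterpret_cast<handle_t*>(
          base + sizeof(IPCMessageShm) + sizeof(ResponseBatch));

      batch->batch_size = num_requests;
      handles[0] = 42;  // one shm handle per returned response
      std::printf(
          "header=%p batch=%p handles=%p\n", (void*)header, (void*)batch,
          (void*)handles);
    }

Note that this layout only works because `sizeof` includes trailing padding, so each region starts suitably aligned for 8-byte fields; any struct with stricter alignment would need explicit padding between regions.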
diff --git a/src/python_be.cc b/src/python_be.cc
index 78b306b9..361e1401 100644
--- a/src/python_be.cc
+++ b/src/python_be.cc
@@ -290,8 +290,8 @@ ModelInstanceState::SaveRequestsToSharedMemory(
         request, &request_timeout));
 
     std::unique_ptr<InferRequest> infer_request;
-    TRITONBACKEND_ResponseFactory* factory_ptr;
-    RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
+    TRITONBACKEND_ResponseFactory* factory_ptr = nullptr;
+    // RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
 
     infer_request = std::make_unique<InferRequest>(
         id, correlation_id, pb_input_tensors, requested_output_names,
@@ -322,8 +322,6 @@ ModelInstanceState::LaunchStubProcess()
   thread_pool_ = std::make_unique<boost::asio::thread_pool>(
       model_state->StateForBackend()->thread_pool_size);
 
-  queue_monitor_thread_ = true;
-  queue_monitor_ = std::thread(&ModelInstanceState::MessageQueueMonitor, this);
   request_executor_ = std::make_unique<RequestExecutor>(
       Stub()->ShmPool(), model_state->TritonServer());
@@ -685,44 +683,6 @@ ModelInstanceState::ExecuteBLSRequest(
   }
 }
 
-void
-ModelInstanceState::MessageQueueMonitor()
-{
-  while (queue_monitor_thread_) {
-    bi::managed_external_buffer::handle_t handle =
-        Stub()->ParentMessageQueue()->Pop();
-    if (handle == DUMMY_MESSAGE) {
-      break;
-    }
-    std::unique_ptr<IPCMessage> message =
-        IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), handle);
-
-    // Need to notify the model instance thread that the execute response has
-    // been received.
-    if (message->Command() == PYTHONSTUB_ExecuteResponse) {
-      std::lock_guard<std::mutex> guard{mu_};
-      received_message_ = std::move(message);
-      cv_.notify_one();
-    } else if (message->Command() == PYTHONSTUB_ResponseSend) {
-      std::shared_ptr<IPCMessage> response_send_message = std::move(message);
-      std::packaged_task<void()> task([this, response_send_message] {
-        ResponseSendDecoupled(response_send_message);
-      });
-      boost::asio::post(*thread_pool_, std::move(task));
-    } else if (
-        message->Command() == PYTHONSTUB_InferExecRequest ||
-        message->Command() == PYTHONSTUB_InferStreamExecRequest) {
-      std::shared_ptr<IPCMessage> bls_execute = std::move(message);
-      std::packaged_task<void()> task([this, bls_execute] {
-        ExecuteBLSRequest(
-            bls_execute,
-            (bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest));
-      });
-      boost::asio::post(*thread_pool_, std::move(task));
-    }
-  }
-}
-
 void
 ModelInstanceState::StubToParentMQMonitor()
 {
@@ -769,6 +729,25 @@ ModelInstanceState::StubToParentMQMonitor()
         ProcessModelControlRequest(message);
         break;
       }
+      case PYTHONSTUB_ResponseSend: {
+        std::shared_ptr<IPCMessage> response_send_message = std::move(message);
+        std::packaged_task<void()> task([this, response_send_message] {
+          ResponseSendDecoupled(response_send_message);
+        });
+        boost::asio::post(*thread_pool_, std::move(task));
+        break;
+      }
+      case PYTHONSTUB_InferExecRequest:
+      case PYTHONSTUB_InferStreamExecRequest: {
+        std::shared_ptr<IPCMessage> bls_execute = std::move(message);
+        std::packaged_task<void()> task([this, bls_execute] {
+          ExecuteBLSRequest(
+              bls_execute,
+              (bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest));
+        });
+        boost::asio::post(*thread_pool_, std::move(task));
+        break;
+      }
       default: {
         LOG_MESSAGE(
             TRITONSERVER_LOG_ERROR, "Unexpected message type received.");
@@ -1228,26 +1207,23 @@ ModelInstanceState::ProcessRequests(
       IPCMessage::Create(Stub()->ShmPool(), false /*inline_response*/));
   ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest;
   ipc_message->Args() = request_batch.handle_;
-  received_message_ = nullptr;
+
   ScopedDefer execute_finalize([this] {
     // Push a dummy message to signal the thread to terminate.
    Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE);
   });
+  std::unique_ptr<IPCMessage> response;
   {
-    std::unique_lock<std::mutex> guard{mu_};
     Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle());
-    cv_.wait(guard, [this] { return received_message_ != nullptr; });
+    bi::managed_external_buffer::handle_t response_message;
+    Stub()->ReceiveMessageFromStub(response_message);
+    response =
+        IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message);
   }
-
-  AllocatedSharedMemory<char> response_batch =
-      Stub()->ShmPool()->Load<char>(received_message_->Args());
-
+  char* ipc_message_shm =
+      reinterpret_cast<char*>(response->GetAllocatedSharedMemory().data_.get());
   ResponseBatch* response_batch_shm_ptr =
-      reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
+      reinterpret_cast<ResponseBatch*>(ipc_message_shm + sizeof(IPCMessageShm));
 
-  received_message_.reset();
-
   uint64_t compute_end_ns = 0;
   SET_TIMESTAMP(compute_end_ns);
   reporter.SetComputeEndNs(compute_end_ns);
@@ -1282,7 +1258,7 @@ ModelInstanceState::ProcessRequests(
     }
     bi::managed_external_buffer::handle_t* response_shm_handle =
         reinterpret_cast<bi::managed_external_buffer::handle_t*>(
-            response_batch.data_.get() + sizeof(ResponseBatch));
+            ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm));
 
     // If the output provided by the model is in GPU, we will pass the list of
     // buffers provided by Triton to the stub process.
@@ -1390,8 +1366,6 @@ ModelInstanceState::ProcessRequests(
       }
     }
 
-    // Finalize the execute.
-    execute_finalize.Complete();
   }
 
   // If the output tensor is in GPU, there will be a second round trip
@@ -1610,7 +1584,6 @@ ModelInstanceState::~ModelInstanceState()
   Stub()->TerminateStub();
   TerminateMonitor();
   Stub()->ClearQueues();
-  received_message_.reset();
   Stub().reset();
 }
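With `MessageQueueMonitor` removed, `ProcessRequests` above waits for the stub's reply inline: it pushes the request handle and then blocks in `ReceiveMessageFromStub`, instead of sleeping on a condition variable that a monitor thread fed through `received_message_`. The shape of that round trip, sketched against a toy blocking queue (the queue type and function names are stand-ins, not the backend's MessageQueue API):

    #include <condition_variable>
    #include <cstdint>
    #include <mutex>
    #include <queue>

    // Toy blocking queue standing in for the shared-memory message queues.
    class BlockingQueue {
     public:
      void Push(uint64_t handle)
      {
        {
          std::lock_guard<std::mutex> guard(mu_);
          q_.push(handle);
        }
        cv_.notify_one();
      }

      uint64_t Pop()  // blocks until a message arrives
      {
        std::unique_lock<std::mutex> guard(mu_);
        cv_.wait(guard, [this] { return !q_.empty(); });
        uint64_t handle = q_.front();
        q_.pop();
        return handle;
      }

     private:
      std::mutex mu_;
      std::condition_variable cv_;
      std::queue<uint64_t> q_;
    };

    // Inline round trip: no monitor thread, no shared received_message_ slot,
    // and no cross-thread handoff for the execute response.
    uint64_t Execute(
        BlockingQueue& to_stub, BlockingQueue& from_stub,
        uint64_t request_handle)
    {
      to_stub.Push(request_handle);
      return from_stub.Pop();  // reply handle, consumed on this same thread
    }

The design win is that the execute response never crosses threads, while the remaining `StubToParentMQMonitor` keeps handling the asynchronous traffic (response sends and BLS requests) that genuinely needs a separate thread.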
diff --git a/src/response_sender.cc b/src/response_sender.cc
index 0a88fb6b..cef4e3a7 100644
--- a/src/response_sender.cc
+++ b/src/response_sender.cc
@@ -69,7 +69,7 @@ ResponseSender::ResponseSender(
 
 ResponseSender::~ResponseSender()
 {
-  DeleteResponseFactory();
+  // DeleteResponseFactory();
 }
 
 void
@@ -172,7 +172,7 @@ ResponseSender::Send(
   {
     bi::scoped_lock<bi::interprocess_mutex> guard{send_message_payload->mu};
-    stub->SendIPCMessage(ipc_message);
+    stub->SendIPCUtilsMessage(ipc_message);
     while (!send_message_payload->is_stub_turn) {
       send_message_payload->cv.wait(guard);
     }
@@ -248,7 +248,7 @@ ResponseSender::Send(
   }
 
   if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
-    DeleteResponseFactory();
+    // DeleteResponseFactory();
   }
 }
@@ -270,10 +270,10 @@ ResponseSender::DeleteResponseFactory()
 {
   bool already_deleted = response_factory_deleted_.exchange(true);
   if (!already_deleted) {
-    std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
-    stub->EnqueueCleanupId(
-        reinterpret_cast<void*>(response_factory_address_),
-        PYTHONSTUB_DecoupledResponseFactoryCleanup);
+    // std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
+    // stub->EnqueueCleanupId(
+    //     reinterpret_cast<void*>(response_factory_address_),
+    //     PYTHONSTUB_DecoupledResponseFactoryCleanup);
   }
 }
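Although this diff disables `DeleteResponseFactory` in both the destructor and the COMPLETE_FINAL path, the guard it leaves behind is worth noting: the atomic `exchange` makes the cleanup idempotent no matter which path reaches it first. A standalone sketch of that one-shot pattern (class and function names are illustrative):

    #include <atomic>
    #include <cstdio>

    // exchange(true) returns the previous value, so exactly one caller sees
    // 'false' and performs the cleanup; every later call is a no-op.
    class OneShotCleanup {
     public:
      void Run()
      {
        bool already_done = done_.exchange(true);
        if (!already_done) {
          std::puts("cleanup runs once");
        }
      }

     private:
      std::atomic<bool> done_{false};
    };

    int main()
    {
      OneShotCleanup c;
      c.Run();  // performs the cleanup
      c.Run();  // no-op: the flag was already set
    }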