Skip to content

Commit

Permalink
Improve perf
Browse files Browse the repository at this point in the history
  • Loading branch information
Tabrizian committed Sep 16, 2024
1 parent f4c83f2 commit b36b55d
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 75 deletions.
2 changes: 1 addition & 1 deletion src/infer_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ InferRequest::Exec(const bool is_decoupled)
{
bi::scoped_lock<bi::interprocess_mutex> lock{
*(ipc_message->ResponseMutex())};
stub->SendIPCMessage(ipc_message);
stub->SendIPCUtilsMessage(ipc_message);
ipc_message->ResponseCondition()->wait(lock);
}

Expand Down
19 changes: 19 additions & 0 deletions src/ipc_message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ IPCMessage::Create(
new IPCMessage(ipc_message_shm, response_mutex_shm, response_cond_shm));
}

// Wraps an IPCMessageShm pointer and its shared-memory handle in a new
// IPCMessage by forwarding both to the matching IPCMessage constructor.
//
// ipc_message_shm: pointer to the message structure to wrap.
// message_handle:  handle recorded in the new IPCMessage.
//
// NOTE(review): the constructor invoked here only stores the pointer and the
// handle, so the pointed-to memory presumably has to outlive the returned
// object -- confirm against the callers.
std::unique_ptr<IPCMessage>
IPCMessage::Create(
    IPCMessageShm* ipc_message_shm,
    bi::managed_external_buffer::handle_t& message_handle)
{
  std::unique_ptr<IPCMessage> wrapped_message{
      new IPCMessage(ipc_message_shm, message_handle)};
  return wrapped_message;
}

// Returns a mutable reference to the AllocatedSharedMemory<IPCMessageShm>
// member held by this message.
//
// NOTE(review): the (IPCMessageShm*, handle) constructor does not assign
// ipc_message_shm_, so for messages built through that path this presumably
// refers to a default-constructed object -- confirm before dereferencing.
AllocatedSharedMemory<IPCMessageShm>&
IPCMessage::GetAllocatedSharedMemory()
{
  AllocatedSharedMemory<IPCMessageShm>& backing_shm = ipc_message_shm_;
  return backing_shm;
}

std::unique_ptr<IPCMessage>
IPCMessage::LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
Expand Down Expand Up @@ -133,4 +146,10 @@ IPCMessage::IPCMessage(
ipc_message_handle_ = ipc_message_shm_.handle_;
}

// Constructs an IPCMessage from a raw IPCMessageShm pointer and a handle.
// Only ipc_message_shm_ptr_ and ipc_message_handle_ are assigned here; no
// other member is touched by this constructor.
IPCMessage::IPCMessage(
    IPCMessageShm* ipc_message_shm,
    bi::managed_external_buffer::handle_t& handle)
{
  ipc_message_shm_ptr_ = ipc_message_shm;
  ipc_message_handle_ = handle;
}

}}}; // namespace triton::backend::python
7 changes: 7 additions & 0 deletions src/ipc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ class IPCMessage {
static std::unique_ptr<IPCMessage> Create(
const std::unique_ptr<SharedMemoryManager>& shm_pool,
bool inline_response);

static std::unique_ptr<IPCMessage>
Create(IPCMessageShm* ipc_message_shm,
bi::managed_external_buffer::handle_t& message_handle);
static std::unique_ptr<IPCMessage> LoadFromSharedMemory(
std::unique_ptr<SharedMemoryManager>& shm_pool,
bi::managed_external_buffer::handle_t message_handle);
Expand All @@ -108,6 +112,7 @@ class IPCMessage {
bi::interprocess_mutex* ResponseMutex();
bi::managed_external_buffer::handle_t& Args();
bi::managed_external_buffer::handle_t ShmHandle();
AllocatedSharedMemory<IPCMessageShm>& GetAllocatedSharedMemory();

private:
AllocatedSharedMemory<IPCMessageShm> ipc_message_shm_;
Expand All @@ -129,6 +134,8 @@ class IPCMessage {
AllocatedSharedMemory<IPCMessageShm>& ipc_message_shm,
AllocatedSharedMemory<bi::interprocess_mutex>& response_mutex_shm,
AllocatedSharedMemory<bi::interprocess_condition>& response_cond_shm);

IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle);
};

}}}; // namespace triton::backend::python
24 changes: 13 additions & 11 deletions src/pb_stub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -653,9 +653,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
{
py::list py_request_list =
LoadRequestsFromSharedMemory(request_batch_shm_ptr);
std::unique_ptr<IPCMessage> execute_response =
IPCMessage::Create(shm_pool_, false /* Inline response */);
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
std::unique_ptr<IPCMessage> execute_response;
// IPCMessage::Create(shm_pool_, false /* Inline response */);

std::optional<AllocatedSharedMemory<char>> response_batch;
bool has_exception = false;
Expand Down Expand Up @@ -713,9 +712,9 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
error_string;
LOG_ERROR << err_message.c_str();
if (!response_batch) {
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch));
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch) + sizeof(IPCMessageShm));
}
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));

response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
response_batch_shm_ptr->has_error = true;
Expand All @@ -733,14 +732,17 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
}

if (!response_batch) {
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch));
ResponseBatch* response_batch_shm_ptr =reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch) + sizeof(IPCMessageShm));
ResponseBatch* response_batch_shm_ptr =reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
response_batch_shm_ptr->batch_size = 0;
}
ResponseBatch* response_batch_shm_ptr =reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
response_batch_shm_ptr->has_error = false;
response_batch_shm_ptr->is_error_set = false;
execute_response = IPCMessage::Create(reinterpret_cast<IPCMessageShm*>(response_batch.value().data_.get()), response_batch.value().handle_);
execute_response->Args() = response_batch.value().handle_;
execute_response->InlineResponse() = false;
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
_.Complete();
execute_finalize.Complete();
}
Expand Down Expand Up @@ -813,15 +815,15 @@ Stub::ProcessReturnedResponses(
request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
}
}
response_batch = std::move(shm_pool_->Construct<char>(
response_batch = std::move(shm_pool_->Construct<char>(sizeof(IPCMessageShm) +
requests_size * sizeof(bi::managed_external_buffer::handle_t) +
sizeof(ResponseBatch)));
ResponseBatch* response_batch_shm_ptr =
reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));

bi::managed_external_buffer::handle_t* responses_shm_handle =
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
response_batch.value().data_.get() + sizeof(ResponseBatch));
response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm));

for (size_t i = 0; i < responses_size; i++) {
// Check the return type of execute function.
Expand Down
85 changes: 29 additions & 56 deletions src/python_be.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ ModelInstanceState::SaveRequestsToSharedMemory(
request, &request_timeout));

std::unique_ptr<InferRequest> infer_request;
TRITONBACKEND_ResponseFactory* factory_ptr;
RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
TRITONBACKEND_ResponseFactory* factory_ptr = nullptr;
// RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));

infer_request = std::make_unique<InferRequest>(
id, correlation_id, pb_input_tensors, requested_output_names,
Expand Down Expand Up @@ -322,8 +322,6 @@ ModelInstanceState::LaunchStubProcess()
thread_pool_ = std::make_unique<boost::asio::thread_pool>(
model_state->StateForBackend()->thread_pool_size);

queue_monitor_thread_ = true;
queue_monitor_ = std::thread(&ModelInstanceState::MessageQueueMonitor, this);
request_executor_ = std::make_unique<RequestExecutor>(
Stub()->ShmPool(), model_state->TritonServer());

Expand Down Expand Up @@ -685,44 +683,6 @@ ModelInstanceState::ExecuteBLSRequest(
}
}

void
ModelInstanceState::MessageQueueMonitor()
{
while (queue_monitor_thread_) {
bi::managed_external_buffer::handle_t handle =
Stub()->ParentMessageQueue()->Pop();
if (handle == DUMMY_MESSAGE) {
break;
}
std::unique_ptr<IPCMessage> message =
IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), handle);

// Need to notify the model instance thread that the execute response has
// been received.
if (message->Command() == PYTHONSTUB_ExecuteResponse) {
std::lock_guard<std::mutex> guard{mu_};
received_message_ = std::move(message);
cv_.notify_one();
} else if (message->Command() == PYTHONSTUB_ResponseSend) {
std::shared_ptr<IPCMessage> response_send_message = std::move(message);
std::packaged_task<void()> task([this, response_send_message] {
ResponseSendDecoupled(response_send_message);
});
boost::asio::post(*thread_pool_, std::move(task));
} else if (
message->Command() == PYTHONSTUB_InferExecRequest ||
message->Command() == PYTHONSTUB_InferStreamExecRequest) {
std::shared_ptr<IPCMessage> bls_execute = std::move(message);
std::packaged_task<void()> task([this, bls_execute] {
ExecuteBLSRequest(
bls_execute,
(bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest));
});
boost::asio::post(*thread_pool_, std::move(task));
}
}
}

void
ModelInstanceState::StubToParentMQMonitor()
{
Expand Down Expand Up @@ -769,6 +729,25 @@ ModelInstanceState::StubToParentMQMonitor()
ProcessModelControlRequest(message);
break;
}
case PYTHONSTUB_ResponseSend: {
std::shared_ptr<IPCMessage> response_send_message = std::move(message);
std::packaged_task<void()> task([this, response_send_message] {
ResponseSendDecoupled(response_send_message);
});
boost::asio::post(*thread_pool_, std::move(task));
break;
}
case PYTHONSTUB_InferExecRequest:
case PYTHONSTUB_InferStreamExecRequest: {
std::shared_ptr<IPCMessage> bls_execute = std::move(message);
std::packaged_task<void()> task([this, bls_execute] {
ExecuteBLSRequest(
bls_execute,
(bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest));
});
boost::asio::post(*thread_pool_, std::move(task));
break;
}
default: {
LOG_MESSAGE(
TRITONSERVER_LOG_ERROR, "Unexpected message type received.");
Expand Down Expand Up @@ -1228,26 +1207,23 @@ ModelInstanceState::ProcessRequests(
IPCMessage::Create(Stub()->ShmPool(), false /*inline_response*/));
ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest;
ipc_message->Args() = request_batch.handle_;
received_message_ = nullptr;

ScopedDefer execute_finalize([this] {
// Push a dummy message to signal the thread to terminate.
Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE);
});

std::unique_ptr<IPCMessage> response;
{
std::unique_lock<std::mutex> guard{mu_};
Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle());
cv_.wait(guard, [this] { return received_message_ != nullptr; });
bi::managed_external_buffer::handle_t response_message;
Stub()->ReceiveMessageFromStub(response_message);
response = IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message);
}


AllocatedSharedMemory<char> response_batch = Stub()->ShmPool()->Load<char>(received_message_->Args());

char* ipc_message_shm = reinterpret_cast<char*>(response->GetAllocatedSharedMemory().data_.get());;
ResponseBatch* response_batch_shm_ptr =
reinterpret_cast<ResponseBatch*>(response_batch.data_.get());
reinterpret_cast<ResponseBatch*>(ipc_message_shm + sizeof(IPCMessageShm));

received_message_.reset();

uint64_t compute_end_ns = 0;
SET_TIMESTAMP(compute_end_ns);
reporter.SetComputeEndNs(compute_end_ns);
Expand Down Expand Up @@ -1282,7 +1258,7 @@ ModelInstanceState::ProcessRequests(
}
bi::managed_external_buffer::handle_t* response_shm_handle =
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
response_batch.data_.get() + sizeof(ResponseBatch));
ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm));

// If the output provided by the model is in GPU, we will pass the list of
// buffers provided by Triton to the stub process.
Expand Down Expand Up @@ -1390,8 +1366,6 @@ ModelInstanceState::ProcessRequests(
}
}

// Finalize the execute.
execute_finalize.Complete();
}

// If the output tensor is in GPU, there will be a second round trip
Expand Down Expand Up @@ -1610,7 +1584,6 @@ ModelInstanceState::~ModelInstanceState()
Stub()->TerminateStub();
TerminateMonitor();
Stub()->ClearQueues();
received_message_.reset();
Stub().reset();
}

Expand Down
14 changes: 7 additions & 7 deletions src/response_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ ResponseSender::ResponseSender(

ResponseSender::~ResponseSender()
{
DeleteResponseFactory();
// DeleteResponseFactory();
}

void
Expand Down Expand Up @@ -172,7 +172,7 @@ ResponseSender::Send(

{
bi::scoped_lock<bi::interprocess_mutex> guard{send_message_payload->mu};
stub->SendIPCMessage(ipc_message);
stub->SendIPCUtilsMessage(ipc_message);
while (!send_message_payload->is_stub_turn) {
send_message_payload->cv.wait(guard);
}
Expand Down Expand Up @@ -248,7 +248,7 @@ ResponseSender::Send(
}

if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
DeleteResponseFactory();
// DeleteResponseFactory();
}
}

Expand All @@ -270,10 +270,10 @@ ResponseSender::DeleteResponseFactory()
{
bool already_deleted = response_factory_deleted_.exchange(true);
if (!already_deleted) {
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
stub->EnqueueCleanupId(
reinterpret_cast<void*>(response_factory_address_),
PYTHONSTUB_DecoupledResponseFactoryCleanup);
// std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
// stub->EnqueueCleanupId(
// reinterpret_cast<void*>(response_factory_address_),
// PYTHONSTUB_DecoupledResponseFactoryCleanup);
}
}

Expand Down

0 comments on commit b36b55d

Please sign in to comment.