From 521874e4e6849b4df53d67c205666cbd8d6f498f Mon Sep 17 00:00:00 2001
From: krishung5
Date: Wed, 6 Mar 2024 01:49:42 -0800
Subject: [PATCH] Initial commit

---
 CMakeLists.txt          |   2 +
 src/correlation_id.cc   | 120 ++++++++++++++++++++++++++++++++++++++++
 src/correlation_id.h    |  99 +++++++++++++++++++++++++++++++++
 src/infer_request.cc    |  86 +++++++++++++---------------
 src/infer_request.h     |  13 +++--
 src/pb_stub.cc          |  19 ++++++-
 src/python_be.cc        |  21 ++++++-
 src/request_executor.cc |  12 +++-
 8 files changed, 312 insertions(+), 60 deletions(-)
 create mode 100644 src/correlation_id.cc
 create mode 100644 src/correlation_id.h

diff --git a/CMakeLists.txt b/CMakeLists.txt
index bc5387ef..cea5a50b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -148,6 +148,8 @@ configure_file(src/libtriton_python.ldscript libtriton_python.ldscript COPYONLY)
 set(
   COMMON_SRCS
+  src/correlation_id.cc
+  src/correlation_id.h
   src/infer_response.cc
   src/infer_response.h
   src/infer_request.cc
diff --git a/src/correlation_id.cc b/src/correlation_id.cc
new file mode 100644
index 00000000..2c324c2f
--- /dev/null
+++ b/src/correlation_id.cc
@@ -0,0 +1,120 @@
+// Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//  * Redistributions of source code must retain the above copyright
+//    notice, this list of conditions and the following disclaimer.
+//  * Redistributions in binary form must reproduce the above copyright
+//    notice, this list of conditions and the following disclaimer in the
+//    documentation and/or other materials provided with the distribution.
+//  * Neither the name of NVIDIA CORPORATION nor the names of its
+//    contributors may be used to endorse or promote products derived
+//    from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#include "correlation_id.h"
+
+namespace triton { namespace backend { namespace python {
+
+SequenceId::SequenceId()
+    : sequence_label_(""), sequence_index_(0), id_type_(CorrIdDataType::UINT64)
+{
+}
+
+SequenceId::SequenceId(const std::string& sequence_label)
+    : sequence_label_(sequence_label), sequence_index_(0),
+      id_type_(CorrIdDataType::STRING)
+{
+}
+
+SequenceId::SequenceId(uint64_t sequence_index)
+    : sequence_label_(""), sequence_index_(sequence_index),
+      id_type_(CorrIdDataType::UINT64)
+{
+}
+
+SequenceId::SequenceId(const SequenceId& rhs)
+{
+  sequence_index_ = rhs.sequence_index_;
+  id_type_ = rhs.id_type_;
+  sequence_label_ = rhs.sequence_label_;
+}
+
+// SequenceId::SequenceId(const std::unique_ptr<SequenceId>& rhs_ptr)
+// {
+//   sequence_index_ = rhs_ptr->sequence_index_;
+//   id_type_ = rhs_ptr->id_type_;
+//   sequence_label_ = rhs_ptr->sequence_label_;
+// }
+
+SequenceId&
+SequenceId::operator=(const SequenceId& rhs)
+{
+  sequence_index_ = rhs.sequence_index_;
+  id_type_ = rhs.id_type_;
+  sequence_label_ = rhs.sequence_label_;
+  return *this;
+}
+
+void
+SequenceId::SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool)
+{
+  AllocatedSharedMemory<SequenceIdShm> sequence_id_shm =
+      shm_pool->Construct<SequenceIdShm>();
+  sequence_id_shm_ptr_ = sequence_id_shm.data_.get();
+
+  std::unique_ptr<PbString> sequence_label_shm =
+      PbString::Create(shm_pool, sequence_label_);
+
+  sequence_id_shm_ptr_->sequence_index = sequence_index_;
+  sequence_id_shm_ptr_->sequence_label_shm_handle =
+      sequence_label_shm->ShmHandle();
+  sequence_id_shm_ptr_->id_type = id_type_;
+
+  // Save the references to shared memory.
+  sequence_id_shm_ = std::move(sequence_id_shm);
+  sequence_label_shm_ = std::move(sequence_label_shm);
+  shm_handle_ = sequence_id_shm_.handle_;
+}
+
+std::unique_ptr<SequenceId>
+SequenceId::LoadFromSharedMemory(
+    std::unique_ptr<SharedMemoryManager>& shm_pool,
+    bi::managed_external_buffer::handle_t handle)
+{
+  AllocatedSharedMemory<SequenceIdShm> sequence_id_shm =
+      shm_pool->Load<SequenceIdShm>(handle);
+  SequenceIdShm* sequence_id_shm_ptr = sequence_id_shm.data_.get();
+
+  std::unique_ptr<PbString> sequence_label_shm = PbString::LoadFromSharedMemory(
+      shm_pool, sequence_id_shm_ptr->sequence_label_shm_handle);
+
+  return std::unique_ptr<SequenceId>(
+      new SequenceId(sequence_id_shm, sequence_label_shm));
+}
+
+SequenceId::SequenceId(
+    AllocatedSharedMemory<SequenceIdShm>& sequence_id_shm,
+    std::unique_ptr<PbString>& sequence_label_shm)
+    : sequence_id_shm_(std::move(sequence_id_shm)),
+      sequence_label_shm_(std::move(sequence_label_shm))
+{
+  sequence_id_shm_ptr_ = sequence_id_shm_.data_.get();
+  sequence_label_ = sequence_label_shm_->String();
+  sequence_index_ = sequence_id_shm_ptr_->sequence_index;
+  id_type_ = sequence_id_shm_ptr_->id_type;
+}
+
+}}};  // namespace triton::backend::python
diff --git a/src/correlation_id.h b/src/correlation_id.h
new file mode 100644
index 00000000..c3ff796d
--- /dev/null
+++ b/src/correlation_id.h
@@ -0,0 +1,99 @@
+// Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions
+// are met:
+//  * Redistributions of source code must retain the above copyright
+//    notice, this list of conditions and the following disclaimer.
+//  * Redistributions in binary form must reproduce the above copyright
+//    notice, this list of conditions and the following disclaimer in the
+//    documentation and/or other materials provided with the distribution.
+//  * Neither the name of NVIDIA CORPORATION nor the names of its
+//    contributors may be used to endorse or promote products derived
+//    from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#pragma once
+
+#include <string>
+
+#include "pb_string.h"
+#include "pb_utils.h"
+
+namespace triton { namespace backend { namespace python {
+
+enum class CorrIdDataType { UINT64, STRING };
+
+struct SequenceIdShm {
+  bi::managed_external_buffer::handle_t sequence_label_shm_handle;
+  uint64_t sequence_index;
+  CorrIdDataType id_type;
+};
+
+class SequenceId {
+ public:
+  SequenceId();
+  SequenceId(const std::string& sequence_label);
+  SequenceId(uint64_t sequence_index);
+  SequenceId(const SequenceId& rhs);
+  // SequenceId(const std::unique_ptr<SequenceId>& rhs_ptr);
+  SequenceId& operator=(const SequenceId& rhs);
+
+  // ~SequenceId(){};
+
+  /// Save SequenceId object to shared memory.
+  /// \param shm_pool Shared memory pool to save the SequenceId object.
+  void SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool);
+
+  /// Create a SequenceId object from shared memory.
+  /// \param shm_pool Shared memory pool
+  /// \param handle Shared memory handle of the SequenceId.
+  /// \return Returns the SequenceId in the specified request_handle
+  /// location.
+  static std::unique_ptr<SequenceId> LoadFromSharedMemory(
+      std::unique_ptr<SharedMemoryManager>& shm_pool,
+      bi::managed_external_buffer::handle_t handle);
+
+  // Functions that help determine exact type of sequence Id
+  CorrIdDataType Type() const { return id_type_; }
+  bool InSequence() const
+  {
+    return ((sequence_label_ != "") || (sequence_index_ != 0));
+  }
+
+  // Get the value of the SequenceId based on the type
+  const std::string& StringValue() const { return sequence_label_; }
+  uint64_t UnsignedIntValue() const { return sequence_index_; }
+
+  bi::managed_external_buffer::handle_t ShmHandle() { return shm_handle_; }
+
+ private:
+  // The private constructor for creating a SequenceId object from shared
+  // memory.
+  SequenceId(
+      AllocatedSharedMemory<SequenceIdShm>& sequence_id_shm,
+      std::unique_ptr<PbString>& sequence_label_shm);
+
+  std::string sequence_label_;
+  uint64_t sequence_index_;
+  CorrIdDataType id_type_;
+
+  // Shared Memory Data Structures
+  AllocatedSharedMemory<SequenceIdShm> sequence_id_shm_;
+  SequenceIdShm* sequence_id_shm_ptr_;
+  bi::managed_external_buffer::handle_t shm_handle_;
+  std::unique_ptr<PbString> sequence_label_shm_;
+};
+
+}}};  // namespace triton::backend::python
diff --git a/src/infer_request.cc b/src/infer_request.cc
index f18900d0..ff131de1 100644
--- a/src/infer_request.cc
+++ b/src/infer_request.cc
@@ -38,7 +38,7 @@ namespace triton { namespace backend { namespace python {
 
 InferRequest::InferRequest(
-    const std::string& request_id, uint64_t correlation_id,
+    const std::string& request_id, const SequenceId& correlation_id,
     const std::vector<std::shared_ptr<PbTensor>>& inputs,
     const std::set<std::string>& requested_output_names,
     const std::string& model_name, const int64_t model_version,
@@ -97,7 +97,7 @@ InferRequest::RequestId()
 {
   return request_id_;
 }
 
-uint64_t
+SequenceId&
 InferRequest::CorrelationId()
 {
   return correlation_id_;
@@ -196,14 +196,13 @@ InferRequest::SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool)
       sizeof(InferRequestShm) +
       (RequestedOutputNames().size() *
        sizeof(bi::managed_external_buffer::handle_t)) +
-      (Inputs().size() * sizeof(bi::managed_external_buffer::handle_t)) +
-      PbString::ShmStructSize(ModelName()) +
-      PbString::ShmStructSize(RequestId()) +
-      PbString::ShmStructSize(Parameters()));
+      (Inputs().size() * sizeof(bi::managed_external_buffer::handle_t)));
 
   infer_request_shm_ptr_ =
       reinterpret_cast<InferRequestShm*>(infer_request_shm.data_.get());
-  infer_request_shm_ptr_->correlation_id = CorrelationId();
+  correlation_id_.SaveToSharedMemory(shm_pool);
+  infer_request_shm_ptr_->correlation_id_shm_handle =
+      correlation_id_.ShmHandle();
   infer_request_shm_ptr_->input_count = Inputs().size();
   infer_request_shm_ptr_->model_version = model_version_;
   infer_request_shm_ptr_->requested_output_count =
@@ -246,30 +245,17 @@ InferRequest::SaveToSharedMemory(std::unique_ptr<SharedMemoryManager>& shm_pool)
     i++;
   }
 
-  size_t model_name_offset =
-      sizeof(InferRequestShm) +
-      (RequestedOutputNames().size() *
-       sizeof(bi::managed_external_buffer::handle_t)) +
-      (Inputs().size() * sizeof(bi::managed_external_buffer::handle_t));
-
-  std::unique_ptr<PbString> model_name_shm = PbString::Create(
-      ModelName(),
-      reinterpret_cast<char*>(infer_request_shm_ptr_) + model_name_offset,
-      infer_request_shm.handle_ + model_name_offset);
-
-  size_t request_id_offset =
-      model_name_offset + PbString::ShmStructSize(ModelName());
-  std::unique_ptr<PbString> request_id_shm = PbString::Create(
-      RequestId(),
-      reinterpret_cast<char*>(infer_request_shm_ptr_) + request_id_offset,
-      infer_request_shm.handle_ + request_id_offset);
-
-  size_t parameters_offset =
-      request_id_offset + PbString::ShmStructSize(RequestId());
-  std::unique_ptr<PbString> parameters_shm = PbString::Create(
-      Parameters(),
-      reinterpret_cast<char*>(infer_request_shm_ptr_) + parameters_offset,
-      infer_request_shm.handle_ + parameters_offset);
+  std::unique_ptr<PbString> model_name_shm =
+      PbString::Create(shm_pool, ModelName());
+
+  std::unique_ptr<PbString> request_id_shm =
+      PbString::Create(shm_pool, RequestId());
+
+  std::unique_ptr<PbString> parameters_shm =
+      PbString::Create(shm_pool, Parameters());
+
+  infer_request_shm_ptr_->model_name_shm_handle = model_name_shm->ShmHandle();
+  infer_request_shm_ptr_->request_id_shm_handle = request_id_shm->ShmHandle();
+  infer_request_shm_ptr_->parameters_shm_handle = parameters_shm->ShmHandle();
 
   // Save the references to shared memory.
   infer_request_shm_ = std::move(infer_request_shm);
@@ -321,34 +307,27 @@ InferRequest::LoadFromSharedMemory(
     input_tensors.emplace_back(std::move(input_tensor));
   }
 
-  size_t model_name_offset =
-      sizeof(InferRequestShm) +
-      (requested_output_count * sizeof(bi::managed_external_buffer::handle_t)) +
-      (infer_request_shm_ptr->input_count *
-       sizeof(bi::managed_external_buffer::handle_t));
+  std::unique_ptr<SequenceId> correlation_id_shm =
+      SequenceId::LoadFromSharedMemory(
+          shm_pool, infer_request_shm_ptr->correlation_id_shm_handle);
 
   std::unique_ptr<PbString> model_name_shm = PbString::LoadFromSharedMemory(
-      request_handle + model_name_offset,
-      reinterpret_cast<char*>(infer_request_shm_ptr) + model_name_offset);
-
-  size_t request_id_offset = model_name_offset + model_name_shm->Size();
+      shm_pool, infer_request_shm_ptr->model_name_shm_handle);
 
   std::unique_ptr<PbString> request_id_shm = PbString::LoadFromSharedMemory(
-      request_handle + request_id_offset,
-      reinterpret_cast<char*>(infer_request_shm_ptr) + request_id_offset);
-
-  size_t parameters_offset = request_id_offset + request_id_shm->Size();
+      shm_pool, infer_request_shm_ptr->request_id_shm_handle);
 
   std::unique_ptr<PbString> parameters_shm = PbString::LoadFromSharedMemory(
-      request_handle + request_id_offset,
-      reinterpret_cast<char*>(infer_request_shm_ptr) + parameters_offset);
+      shm_pool, infer_request_shm_ptr->parameters_shm_handle);
 
   return std::unique_ptr<InferRequest>(new InferRequest(
-      infer_request_shm, request_id_shm, requested_output_names_shm,
-      model_name_shm, input_tensors, parameters_shm));
+      infer_request_shm, request_id_shm, correlation_id_shm,
+      requested_output_names_shm, model_name_shm, input_tensors,
+      parameters_shm));
 }
 
 InferRequest::InferRequest(
     AllocatedSharedMemory<char>& infer_request_shm,
     std::unique_ptr<PbString>& request_id_shm,
+    std::unique_ptr<SequenceId>& correlation_id_shm,
     std::vector<std::unique_ptr<PbString>>& requested_output_names_shm,
     std::unique_ptr<PbString>& model_name_shm,
    std::vector<std::shared_ptr<PbTensor>>& input_tensors,
     std::unique_ptr<PbString>& parameters_shm)
@@ -387,7 +366,6 @@ InferRequest::InferRequest(
   model_name_ = model_name_shm_->String();
   flags_ = infer_request_shm_ptr_->flags;
   model_version_ = infer_request_shm_ptr_->model_version;
-  correlation_id_ = infer_request_shm_ptr_->correlation_id;
   request_address_ = infer_request_shm_ptr_->address;
   response_factory_address_ = infer_request_shm_ptr_->response_factory_address;
   is_decoupled_ = infer_request_shm_ptr_->is_decoupled;
@@ -396,6 +374,16 @@ InferRequest::InferRequest(
   trace_ = infer_request_shm_ptr_->trace;
   request_release_flags_ = infer_request_shm_ptr_->request_release_flags;
 
+  if (correlation_id_shm->Type() == CorrIdDataType::STRING) {
+    correlation_id_ = SequenceId(correlation_id_shm->StringValue());
+    std::cerr << "=== InferRequest::InferRequest: correlation_id_ = "
+              << correlation_id_.StringValue() << std::endl;
+  } else {
+    correlation_id_ = SequenceId(correlation_id_shm->UnsignedIntValue());
+    std::cerr << "=== InferRequest::InferRequest: correlation_id_ = "
+              << correlation_id_.UnsignedIntValue() << std::endl;
+  }
+
 #ifdef TRITON_PB_STUB
   pb_cancel_ =
       std::make_shared<PbCancel>(response_factory_address_, request_address_);
diff --git a/src/infer_request.h b/src/infer_request.h
index ba586535..1b3ad989 100644
--- a/src/infer_request.h
+++ b/src/infer_request.h
@@ -29,6 +29,7 @@
 #include
 #include
 
+#include "correlation_id.h"
 #include "infer_response.h"
 #include "pb_preferred_memory.h"
 #include "pb_tensor.h"
@@ -62,7 +63,7 @@ struct InferenceTrace {
 // Inference Request
 //
 struct InferRequestShm {
-  uint64_t correlation_id;
+  bi::managed_external_buffer::handle_t correlation_id_shm_handle;
   uint32_t input_count;
   uint32_t requested_output_count;
   int64_t model_version;
@@ -74,12 +75,15 @@ struct InferRequestShm {
   PreferredMemory preferred_memory;
   InferenceTrace trace;
   uint32_t request_release_flags;
+  bi::managed_external_buffer::handle_t model_name_shm_handle;
+  bi::managed_external_buffer::handle_t request_id_shm_handle;
+  bi::managed_external_buffer::handle_t parameters_shm_handle;
 };
 
 class InferRequest {
  public:
   InferRequest(
-      const std::string& request_id, uint64_t correlation_id,
+      const std::string& request_id, const SequenceId& correlation_id,
       const std::vector<std::shared_ptr<PbTensor>>& inputs,
       const std::set<std::string>& requested_output_names,
       const std::string& model_name, const int64_t model_version,
@@ -93,7 +97,7 @@ class InferRequest {
   const std::vector<std::shared_ptr<PbTensor>>& Inputs();
   const std::string& RequestId();
   const std::string& Parameters();
-  uint64_t CorrelationId();
+  SequenceId& CorrelationId();
   const std::string& ModelName();
   int64_t ModelVersion();
   uint32_t Flags();
@@ -141,13 +145,14 @@ class InferRequest {
   InferRequest(
       AllocatedSharedMemory<char>& infer_request_shm,
       std::unique_ptr<PbString>& request_id_shm,
+      std::unique_ptr<SequenceId>& correlation_id,
       std::vector<std::unique_ptr<PbString>>& requested_output_names_shm,
       std::unique_ptr<PbString>& model_name_shm,
       std::vector<std::shared_ptr<PbTensor>>& input_tensors,
       std::unique_ptr<PbString>& parameters_shm);
 
   std::string request_id_;
-  uint64_t correlation_id_;
+  SequenceId correlation_id_;
   std::vector<std::shared_ptr<PbTensor>> inputs_;
   std::set<std::string> requested_output_names_;
   std::string model_name_;
diff --git a/src/pb_stub.cc b/src/pb_stub.cc
index 26003f71..80a3586e 100644
--- a/src/pb_stub.cc
+++ b/src/pb_stub.cc
@@ -42,6 +42,7 @@
 #include
 #include
 
+#include "correlation_id.h"
 #include "model_loader.h"
 #include "pb_error.h"
 #include "pb_map.h"
@@ -1615,7 +1616,8 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
   py::class_<InferRequest, std::shared_ptr<InferRequest>>(
       module, "InferenceRequest")
       .def(
-          py::init([](const std::string& request_id, uint64_t correlation_id,
+          py::init([](const std::string& request_id,
+                      const py::object& correlation_id,
                       const std::vector<std::shared_ptr<PbTensor>>& inputs,
                       const std::vector<std::string>& requested_output_names,
                       const std::string& model_name,
@@ -1648,8 +1650,21 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
             py::module_ py_json = py::module_::import("json");
             std::string parameters_str =
                 py::str(py_json.attr("dumps")(parameters));
+
+            SequenceId correlation_id_obj;
+            if (py::isinstance<py::int_>(correlation_id)) {
+              correlation_id_obj =
+                  SequenceId(py::cast<uint64_t>(correlation_id));
+            } else if (py::isinstance<py::str>(correlation_id)) {
+              correlation_id_obj =
+                  SequenceId(py::cast<std::string>(correlation_id));
+            } else {
+              throw PythonBackendException(
+                  "Correlation ID must be integer or string");
+            }
+
             return std::make_shared<InferRequest>(
-                request_id, correlation_id, inputs, requested_outputs,
+                request_id, correlation_id_obj, inputs, requested_outputs,
                 model_name, model_version, parameters_str, flags, timeout,
                 0 /*response_factory_address*/, 0 /*request_address*/,
                 preferred_memory, trace);
diff --git a/src/python_be.cc b/src/python_be.cc
index 0fa318ff..7f7ebb46 100644
--- a/src/python_be.cc
+++ b/src/python_be.cc
@@ -27,6 +27,7 @@
 #include
 
+#include "correlation_id.h"
 #include "gpu_buffers.h"
 #include "infer_payload.h"
 #include "model_loader.h"
@@ -362,9 +363,23 @@ ModelInstanceState::SaveRequestsToSharedMemory(
     const char* id;
     RETURN_IF_ERROR(TRITONBACKEND_RequestId(request, &id));
 
-    uint64_t correlation_id;
-    RETURN_IF_ERROR(
-        TRITONBACKEND_RequestCorrelationId(request, &correlation_id));
+    // uint64_t correlation_id;
+    // RETURN_IF_ERROR(
+    //     TRITONBACKEND_RequestCorrelationId(request, &correlation_id));
+
+    uint64_t correlation_id_uint = 0;
+    SequenceId correlation_id;
+
+    auto error =
+        TRITONBACKEND_RequestCorrelationId(request, &correlation_id_uint);
+    if (error != nullptr) {
+      const char* correlation_id_string = "";
+      RETURN_IF_ERROR(TRITONBACKEND_RequestCorrelationIdString(
+          request, &correlation_id_string));
+      correlation_id = SequenceId(std::string(correlation_id_string));
+    } else {
+      correlation_id = SequenceId(correlation_id_uint);
+    }
 
     uint32_t flags;
     RETURN_IF_ERROR(TRITONBACKEND_RequestFlags(request, &flags));
diff --git a/src/request_executor.cc b/src/request_executor.cc
index d78972a5..224fcac0 100644
--- a/src/request_executor.cc
+++ b/src/request_executor.cc
@@ -28,6 +28,7 @@
 #include
 
+#include "correlation_id.h"
 #include "pb_utils.h"
 #include "scoped_defer.h"
 #include "triton/backend/backend_common.h"
@@ -354,8 +355,15 @@ RequestExecutor::Infer(
     THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetId(
         irequest, infer_request->RequestId().c_str()));
 
-    THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetCorrelationId(
-        irequest, infer_request->CorrelationId()));
+    // THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetCorrelationId(
+    //     irequest, infer_request->CorrelationId()));
+    if (infer_request->CorrelationId().Type() == CorrIdDataType::UINT64) {
+      THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetCorrelationId(
+          irequest, infer_request->CorrelationId().UnsignedIntValue()));
+    } else {
+      THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetCorrelationIdString(
+          irequest, infer_request->CorrelationId().StringValue().c_str()));
+    }
 
     THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetFlags(
         irequest, infer_request->Flags()));
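
Note (not part of the patch): the change above replaces the plain uint64_t correlation ID with a small tagged value, SequenceId, that holds either a uint64_t or a string, and callers such as RequestExecutor::Infer branch on Type() to choose the numeric or string setter. The sketch below is a minimal, self-contained illustration of that dispatch pattern only; the simplified SequenceId stand-in and the ApplyCorrelationId helper are assumptions for illustration and do not use the real Triton headers or TRITONSERVER API.

// Illustrative sketch: a stand-in for the SequenceId tagged type and the
// UINT64-vs-STRING dispatch introduced by this patch.
#include <cstdint>
#include <iostream>
#include <string>

enum class CorrIdDataType { UINT64, STRING };

// Simplified stand-in for the class added in src/correlation_id.h.
class SequenceId {
 public:
  SequenceId() : sequence_index_(0), id_type_(CorrIdDataType::UINT64) {}
  SequenceId(uint64_t sequence_index)
      : sequence_index_(sequence_index), id_type_(CorrIdDataType::UINT64) {}
  SequenceId(const std::string& sequence_label)
      : sequence_label_(sequence_label), sequence_index_(0),
        id_type_(CorrIdDataType::STRING) {}

  CorrIdDataType Type() const { return id_type_; }
  uint64_t UnsignedIntValue() const { return sequence_index_; }
  const std::string& StringValue() const { return sequence_label_; }

 private:
  std::string sequence_label_;
  uint64_t sequence_index_;
  CorrIdDataType id_type_;
};

// Hypothetical helper mirroring the branch in RequestExecutor::Infer:
// numeric IDs go to the uint64 setter, string IDs to the string setter.
void ApplyCorrelationId(const SequenceId& correlation_id)
{
  if (correlation_id.Type() == CorrIdDataType::UINT64) {
    std::cout << "SetCorrelationId(" << correlation_id.UnsignedIntValue()
              << ")\n";
  } else {
    std::cout << "SetCorrelationIdString(\"" << correlation_id.StringValue()
              << "\")\n";
  }
}

int main()
{
  ApplyCorrelationId(SequenceId(uint64_t{42}));           // numeric sequence ID
  ApplyCorrelationId(SequenceId(std::string("sess-42"))); // string sequence ID
  return 0;
}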