diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000..30973e42
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,31 @@
+FROM asnpdsacr.azurecr.io/public/tritonserver:23.05-tf2-python-py3
+
+#RUN DEBIAN_FRONTEND="noninteractive" apt-get update && apt-get -y install tzdata
+
+RUN apt-get update \
+    && apt-get install -y build-essential \
+    gcc \
+    g++ \
+    gdb \
+    clang \
+    make \
+    ninja-build \
+    cmake \
+    autoconf \
+    automake \
+    libtool \
+    valgrind \
+    locales-all \
+    dos2unix \
+    rsync \
+    tar
+RUN apt-get install -y python3-pip python3.10-dev
+RUN apt-get install -y rapidjson-dev libarchive-dev zlib1g-dev
+RUN apt-get install -y git
+RUN pip3 install numpy
+RUN rm -r /opt/tritonserver/backends/python
+RUN git config --global --add safe.directory '*'
+RUN apt-get install -y ssh
+
+RUN useradd -m user && yes password | passwd user
+RUN apt-get install -y gdbserver
\ No newline at end of file
diff --git a/models/.DS_Store b/models/.DS_Store
new file mode 100644
index 00000000..de05d4be
Binary files /dev/null and b/models/.DS_Store differ
diff --git a/models/category_tensorflow_model/1/model.savedmodel/fingerprint.pb b/models/category_tensorflow_model/1/model.savedmodel/fingerprint.pb
new file mode 100644
index 00000000..9dd4a74c
Binary files /dev/null and b/models/category_tensorflow_model/1/model.savedmodel/fingerprint.pb differ
diff --git a/models/category_tensorflow_model/1/model.savedmodel/saved_model.pb b/models/category_tensorflow_model/1/model.savedmodel/saved_model.pb
new file mode 100644
index 00000000..b91baea3
Binary files /dev/null and b/models/category_tensorflow_model/1/model.savedmodel/saved_model.pb differ
diff --git a/models/category_tensorflow_model/1/model.savedmodel/variables/variables.data-00000-of-00001 b/models/category_tensorflow_model/1/model.savedmodel/variables/variables.data-00000-of-00001
new file mode 100644
index 00000000..94b922e8
Binary files /dev/null and b/models/category_tensorflow_model/1/model.savedmodel/variables/variables.data-00000-of-00001 differ
diff --git a/models/category_tensorflow_model/1/model.savedmodel/variables/variables.index b/models/category_tensorflow_model/1/model.savedmodel/variables/variables.index
new file mode 100644
index 00000000..1e8815e0
Binary files /dev/null and b/models/category_tensorflow_model/1/model.savedmodel/variables/variables.index differ
diff --git a/models/category_tensorflow_model/config.pbtxt b/models/category_tensorflow_model/config.pbtxt
new file mode 100755
index 00000000..cdd72390
--- /dev/null
+++ b/models/category_tensorflow_model/config.pbtxt
@@ -0,0 +1,39 @@
+name: "category_tensorflow_model"
+platform: "tensorflow_savedmodel"
+max_batch_size: 0
+
+parameters: {
+  key: "TF_SIGNATURE_DEF"
+  value: {
+    string_value: "call"
+  }
+}
+
+input [
+  {
+    name: "candidatesss"
+    data_type: TYPE_FP32
+    dims: [ -1, -1 ]
+  },
+  {
+    name: "user_history"
+    data_type: TYPE_FP32
+    dims: [ -1, -1 ]
+  }
+]
+output [
+  {
+    name: "scores"
+    data_type: TYPE_FP32
+    dims: [ -1 ]
+  }
+]
+
+instance_group [
+  {
+    count: 2
+    kind: KIND_CPU
+  }
+]
+
+dynamic_batching { }
\ No newline at end of file
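For reference, the SavedModel this config expects exposes a "call" signature (matching TF_SIGNATURE_DEF) that takes candidatesss and user_history and returns scores. A minimal export sketch; the scoring logic here is purely illustrative, since the real model ships as the binary saved_model.pb above:

    import tensorflow as tf

    class CategoryModel(tf.Module):
        def __init__(self):
            super().__init__()
            self.w = tf.Variable(tf.random.normal([200, 200]))

        @tf.function(input_signature=[
            tf.TensorSpec([None, 200], tf.float32, name="candidatesss"),
            tf.TensorSpec([None, 200], tf.float32, name="user_history"),
        ])
        def call(self, candidatesss, user_history):
            # Collapse the user history into one context vector, then score
            # every candidate against it (illustrative logic only).
            context = tf.reduce_mean(tf.matmul(user_history, self.w), axis=0)
            return {"scores": tf.linalg.matvec(candidatesss, context)}

    model = CategoryModel()
    tf.saved_model.save(
        model, "models/category_tensorflow_model/1/model.savedmodel",
        signatures={"call": model.call})
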
diff --git a/models/test_bls/1/model.py b/models/test_bls/1/model.py
new file mode 100644
index 00000000..76a63602
--- /dev/null
+++ b/models/test_bls/1/model.py
@@ -0,0 +1,66 @@
+"""
+BLS model that gathers its candidate tensor directly into a shared-memory
+backed buffer (via new_shm_tensor) before calling the TensorFlow model.
+"""
+from typing import cast
+
+import numpy as np
+
+try:
+    import triton_python_backend_utils as pb_utils
+except ImportError:
+    import tests.stub.triton_python_backend_utils
+
+    pb_utils: tests.stub.triton_python_backend_utils = cast(
+        tests.stub.triton_python_backend_utils, None
+    )
+
+
+def breakpoint():
+    # Debug helper: attaches to a PyCharm remote debugger on the host.
+    import pydevd_pycharm
+
+    pydevd_pycharm.settrace(
+        'host.docker.internal', port=5858, stdoutToServer=True, stderrToServer=True
+    )
+
+
+class TritonPythonModel:
+    def initialize(self, args):
+        import triton_python_backend_utils
+        self.shm = triton_python_backend_utils.shared_memory
+        self.candidates_cache = np.random.random((500000, 200)).astype(np.float32)
+
+    def execute_request(self, request):
+        n = int(pb_utils.get_input_tensor_by_name(request, "n").as_numpy()[0])
+        candidates = np.random.randint(100000, size=n)
+        # Allocate the tensor in shared memory and gather straight into it,
+        # skipping the intermediate copy in process memory.
+        candidate_tensor: pb_utils.Tensor = pb_utils.new_shm_tensor(
+            "candidatesss", self.shm, (n, 200), np.float32
+        )
+        np.take(
+            self.candidates_cache, candidates, axis=0,
+            out=candidate_tensor.as_numpy(), mode='clip',
+        )
+
+        context_array = np.random.random((10, 200)).astype(np.float32)
+        context_tensor = pb_utils.Tensor(
+            "user_history",
+            context_array,
+        )
+
+        inference_response = pb_utils.InferenceRequest(
+            model_name="category_tensorflow_model",
+            requested_output_names=["scores"],
+            inputs=[candidate_tensor, context_tensor],
+        ).exec()
+
+        if inference_response.has_error():
+            raise pb_utils.TritonModelException(inference_response.error().message())
+
+        scores = pb_utils.get_output_tensor_by_name(inference_response, "scores")
+        out_scores = pb_utils.Tensor("scores", scores.as_numpy()[:400])
+        return pb_utils.InferenceResponse(output_tensors=[out_scores])
+
+    def execute(self, requests):
+        return [self.execute_request(request) for request in requests]
diff --git a/models/test_bls/config.pbtxt b/models/test_bls/config.pbtxt
new file mode 100644
index 00000000..95b8e3ce
--- /dev/null
+++ b/models/test_bls/config.pbtxt
@@ -0,0 +1,20 @@
+name: "test_bls"
+backend: "python"
+
+input [
+  {
+    name: "n"
+    data_type: TYPE_INT32
+    dims: [ -1 ]
+  }
+]
+
+output [
+  {
+    name: "scores"
+    data_type: TYPE_FP32
+    dims: [ -1 ]
+  }
+]
+
+instance_group [{ kind: KIND_CPU }]
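This model can be exercised end to end with a plain HTTP client call. A smoke-test sketch, assuming a local Triton instance serving this repository on the default port 8000:

    import numpy as np
    import tritonclient.http as httpclient

    client = httpclient.InferenceServerClient("localhost:8000")
    n = np.array([1000], dtype=np.int32)
    inp = httpclient.InferInput("n", [1], "INT32")
    inp.set_data_from_numpy(n)
    result = client.infer("test_bls", inputs=[inp])
    # The model slices its output to the first 400 scores.
    print(result.as_numpy("scores").shape)  # expect (400,)
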
+ model_name="category_tensorflow_model", + requested_output_names=["scores"], + inputs=[candidate_tensor, context_tensor], + ).exec() + + if inference_response.has_error(): + raise pb_utils.TritonModelException(inference_response.error().message()) + else: + scores = pb_utils.get_output_tensor_by_name(inference_response, "scores") + + out_scores = pb_utils.Tensor("scores", scores.as_numpy()[:400]) + + response = pb_utils.InferenceResponse(output_tensors=[out_scores]) + + return response + + def execute(self, requests): + return [self.execute_request(request) for request in requests] diff --git a/models/test_bls_before/config.pbtxt b/models/test_bls_before/config.pbtxt new file mode 100644 index 00000000..22171c53 --- /dev/null +++ b/models/test_bls_before/config.pbtxt @@ -0,0 +1,22 @@ +name: "test_bls_before" +backend: "python" + +input [ + { + name: "n" + data_type: TYPE_INT32 + dims: [ -1] + + } +] + +output [ + { + name: "scores" + data_type: TYPE_FP32 + dims: [ -1 ] + } +] + + +instance_group [{ kind: KIND_CPU }] diff --git a/models/test_take/1/model.py b/models/test_take/1/model.py new file mode 100644 index 00000000..459b4bcd --- /dev/null +++ b/models/test_take/1/model.py @@ -0,0 +1,56 @@ +""" +Category model +""" +import time +from typing import cast +import timeit +import numpy as np + +try: + import triton_python_backend_utils as pb_utils +except ImportError: + import tests.stub.triton_python_backend_utils + + pb_utils: tests.stub.triton_python_backend_utils = cast( + tests.stub.triton_python_backend_utils, None + ) + + +def breakpoint(): + import pydevd_pycharm + + pydevd_pycharm.settrace( + 'host.docker.internal', port=5858, stdoutToServer=True, stderrToServer=True + ) + + +class TritonPythonModel: + def initialize(self, args): + import triton_python_backend_utils + shm = triton_python_backend_utils.shared_memory + n = 100000 + candidate_tensor = pb_utils.new_shm_tensor("candidatesss", shm, (n, 200), np.float32) # Offset is 68 + buffer = candidate_tensor.as_numpy() + + pb_utils.Logger.log_error(f"buffer - {buffer}, {buffer.dtype}, {buffer.shape}, {buffer.flags}, {buffer.base}") + candidates_cache = np.random.random((500000, 200)).astype(np.float32) + candidates = np.random.randint(100000, size=n) + np_out = np.empty((n, 200), dtype=np.float32) + + r1 = timeit.timeit("buffer[:] = np.take(candidates_cache, candidates, axis=0, mode='clip')", number=100, globals={"candidates_cache":candidates_cache, "candidates":candidates, "buffer": buffer, "np":np})*10 + r2 = timeit.timeit("np.take(candidates_cache, candidates, axis=0, mode='clip', out=buffer)", number=100, globals={"candidates_cache":candidates_cache, "candidates":candidates, "buffer": buffer, "np":np})*10 + r3 = timeit.timeit("r = np.take(candidates_cache, candidates, axis=0, mode='clip')", number=100, globals={"candidates_cache":candidates_cache, "candidates":candidates, "buffer": buffer, "np":np})*10 + r4 = timeit.timeit("np.take(candidates_cache, candidates, axis=0, mode='clip', out=np_out)", number=100, globals={"candidates_cache":candidates_cache, "candidates":candidates, "buffer": buffer, "np":np, "np_out":np_out})*10 + + pb_utils.Logger.log_error(f"Buffer - assignment - {r1}") + pb_utils.Logger.log_error(f"Buffer - output - {r2}") + pb_utils.Logger.log_error(f"Baseline - assignment - {r3}") + pb_utils.Logger.log_error(f"Baseline - np out - {r4}") + pb_utils.Logger.log_error(f"numpy version {np.__version__}") + + + def execute_request(self, request): + pass + + def execute(self, requests): + return 
diff --git a/models/test_take/1/model.py b/models/test_take/1/model.py
new file mode 100644
index 00000000..459b4bcd
--- /dev/null
+++ b/models/test_take/1/model.py
@@ -0,0 +1,61 @@
+"""
+Benchmark model: compares np.take into the shared-memory backed buffer
+against np.take into ordinary numpy arrays.
+"""
+import timeit
+from typing import cast
+
+import numpy as np
+
+try:
+    import triton_python_backend_utils as pb_utils
+except ImportError:
+    import tests.stub.triton_python_backend_utils
+
+    pb_utils: tests.stub.triton_python_backend_utils = cast(
+        tests.stub.triton_python_backend_utils, None
+    )
+
+
+def breakpoint():
+    # Debug helper: attaches to a PyCharm remote debugger on the host.
+    import pydevd_pycharm
+
+    pydevd_pycharm.settrace(
+        'host.docker.internal', port=5858, stdoutToServer=True, stderrToServer=True
+    )
+
+
+class TritonPythonModel:
+    def initialize(self, args):
+        import triton_python_backend_utils
+        shm = triton_python_backend_utils.shared_memory
+        n = 100000
+        candidate_tensor = pb_utils.new_shm_tensor("candidatesss", shm, (n, 200), np.float32)  # Offset is 68
+        buffer = candidate_tensor.as_numpy()
+
+        pb_utils.Logger.log_error(f"buffer - {buffer}, {buffer.dtype}, {buffer.shape}, {buffer.flags}, {buffer.base}")
+        candidates_cache = np.random.random((500000, 200)).astype(np.float32)
+        candidates = np.random.randint(100000, size=n)
+        np_out = np.empty((n, 200), dtype=np.float32)
+
+        g = {"candidates_cache": candidates_cache, "candidates": candidates,
+             "buffer": buffer, "np": np, "np_out": np_out}
+        # timeit returns total seconds for number=100 runs; multiplying by 10
+        # converts that to milliseconds per run.
+        r1 = timeit.timeit("buffer[:] = np.take(candidates_cache, candidates, axis=0, mode='clip')", number=100, globals=g) * 10
+        r2 = timeit.timeit("np.take(candidates_cache, candidates, axis=0, mode='clip', out=buffer)", number=100, globals=g) * 10
+        r3 = timeit.timeit("r = np.take(candidates_cache, candidates, axis=0, mode='clip')", number=100, globals=g) * 10
+        r4 = timeit.timeit("np.take(candidates_cache, candidates, axis=0, mode='clip', out=np_out)", number=100, globals=g) * 10
+
+        pb_utils.Logger.log_error(f"Buffer - assignment - {r1}")
+        pb_utils.Logger.log_error(f"Buffer - output - {r2}")
+        pb_utils.Logger.log_error(f"Baseline - assignment - {r3}")
+        pb_utils.Logger.log_error(f"Baseline - np out - {r4}")
+        pb_utils.Logger.log_error(f"numpy version {np.__version__}")
+
+    def execute_request(self, request):
+        pass
+
+    def execute(self, requests):
+        return [self.execute_request(request) for request in requests]
diff --git a/models/test_take/config.pbtxt b/models/test_take/config.pbtxt
new file mode 100644
index 00000000..5fa89919
--- /dev/null
+++ b/models/test_take/config.pbtxt
@@ -0,0 +1,20 @@
+name: "test_take"
+backend: "python"
+
+input [
+  {
+    name: "n"
+    data_type: TYPE_INT32
+    dims: [ -1 ]
+  }
+]
+
+output [
+  {
+    name: "scores"
+    data_type: TYPE_FP32
+    dims: [ -1 ]
+  }
+]
+
+instance_group [{ kind: KIND_CPU }]
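The same four-way comparison can be reproduced outside the stub; a standalone sketch, with an ordinary numpy buffer standing in for the shm-backed view, using the same seconds-to-milliseconds conversion as the model above:

    import timeit
    import numpy as np

    n = 100000
    cache = np.random.random((500000, 200)).astype(np.float32)
    idx = np.random.randint(100000, size=n)
    buffer = np.empty((n, 200), dtype=np.float32)

    g = {"np": np, "cache": cache, "idx": idx, "buffer": buffer}
    for label, stmt in [
        ("assignment", "buffer[:] = np.take(cache, idx, axis=0, mode='clip')"),
        ("out=", "np.take(cache, idx, axis=0, mode='clip', out=buffer)"),
        ("fresh array", "np.take(cache, idx, axis=0, mode='clip')"),
    ]:
        ms = timeit.timeit(stmt, number=100, globals=g) * 10  # ms per run
        print(f"{label}: {ms:.2f} ms")
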
diff --git a/src/pb_stub.cc b/src/pb_stub.cc
index 9539a250..6b3cb983 100644
--- a/src/pb_stub.cc
+++ b/src/pb_stub.cc
@@ -431,8 +431,12 @@ Stub::StubSetup()
   py::setattr(
       python_backend_utils, "MetricFamily",
       c_python_backend_utils.attr("MetricFamily"));
+  py::setattr(
+      python_backend_utils, "new_shm_tensor",
+      c_python_backend_utils.attr("new_shm_tensor"));
 
   c_python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());
+  python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());
 
   deserialize_bytes_ = python_backend_utils.attr("deserialize_bytes_tensor");
   serialize_bytes_ = python_backend_utils.attr("serialize_byte_tensor");
@@ -494,6 +498,7 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
       python_backend_utils, "InferenceResponse",
       c_python_backend_utils.attr("InferenceResponse"));
   c_python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());
+  python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());
 
   py::object TritonPythonModel = sys.attr("TritonPythonModel");
   deserialize_bytes_ = python_backend_utils.attr("deserialize_bytes_tensor");
@@ -1516,7 +1521,7 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
       .def("get_response_sender", &InferRequest::GetResponseSender);
 
   py::class_<PbTensor, std::shared_ptr<PbTensor>>(module, "Tensor")
-      .def(py::init(&PbTensor::FromNumpy))
+      .def(py::init(&PbTensor::FromNumpy), py::arg("name"), py::arg("numpy_array"))
       .def("name", &PbTensor::Name)
       // The reference_internal is added to make sure that the NumPy object has
       // the same lifetime as the tensor object. This means even when the NumPy
@@ -1603,6 +1608,8 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
 
   py::register_exception<PythonBackendException>(
       module, "TritonModelException");
+
+  module.def("new_shm_tensor", &PbTensor::CreateInSHM, "Creates a new Tensor directly in shared memory");
 }
 
 extern "C" {
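With the py::arg names added above, the Tensor constructor also accepts keyword arguments from model code running in the stub; a small sketch, assuming the keyword names land exactly as written in the binding (the positional form is unchanged):

    import numpy as np
    import triton_python_backend_utils as pb_utils

    arr = np.zeros((4, 2), dtype=np.float32)
    t1 = pb_utils.Tensor("scores", arr)                   # positional, as before
    t2 = pb_utils.Tensor(name="scores", numpy_array=arr)  # keyword form enabled by py::arg
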
diff --git a/src/pb_tensor.cc b/src/pb_tensor.cc
index 20d5302f..b405b661 100644
--- a/src/pb_tensor.cc
+++ b/src/pb_tensor.cc
@@ -35,6 +35,9 @@ namespace py = pybind11;
 #endif
 
 #include "pb_tensor.h"
+#ifndef TRITON_PB_STUB
+#include "triton/common/logging.h"
+#endif
 
 namespace triton { namespace backend { namespace python {
 
@@ -226,6 +229,70 @@ delete_unused_dltensor(PyObject* dlp)
   }
 }
 
+std::shared_ptr<PbTensor>
+PbTensor::CreateInSHM(
+    const std::string& name, SharedMemoryManager& shm_pool,
+    std::vector<int64_t> dims, py::object data_type)
+{
+  TRITONSERVER_DataType dtype = numpy_to_triton_type(data_type);
+
+  TRITONSERVER_MemoryType memory_type_ = TRITONSERVER_MEMORY_CPU;
+  uint64_t elements = 1;
+  for (size_t i = 0; i < dims.size(); i++) {
+    elements *= dims[i];
+  }
+  py::module np = py::module::import("numpy");
+  uint64_t item_size =
+      np.attr("dtype")(data_type).attr("itemsize").cast<uint64_t>();
+  uint64_t byte_size_ = elements * item_size;
+
+  // Offsets of the name string and the PbMemory block within the
+  // allocation. NOTE: the padding that would align the numpy data to
+  // item_size is computed for diagnostics but not yet applied.
+  std::size_t name_offset = sizeof(TensorShm) + sizeof(int64_t) * dims.size();
+  std::size_t pb_memory_offset = name_offset + PbString::ShmStructSize(name);
+  std::size_t padding = pb_memory_offset % item_size;
+  std::cout << "Required padding " << padding << "\n";
+  uint64_t byte_size = sizeof(TensorShm) + sizeof(int64_t) * dims.size() +
+                       PbString::ShmStructSize(name) +
+                       PbMemory::ShmStructSize(memory_type_, byte_size_);
+
+  // Do the allocation
+  AllocatedSharedMemory<char> tensor_shm = shm_pool.Construct<char>(byte_size);
+  auto shm_handle = tensor_shm.handle_;
+  auto shm_data = tensor_shm.data_.get();
+
+  // Wrap the raw memory in TensorShm
+  auto* tensor_shm_ptr = reinterpret_cast<TensorShm*>(shm_data);
+  tensor_shm_ptr->dtype = dtype;
+  tensor_shm_ptr->dims_count = dims.size();
+
+  // Write the dimensions data to shared memory.
+  auto* dims_shm_ptr_ = reinterpret_cast<int64_t*>(
+      reinterpret_cast<char*>(tensor_shm_ptr) + sizeof(TensorShm));
+  for (size_t i = 0; i < dims.size(); i++) {
+    dims_shm_ptr_[i] = dims[i];
+  }
+
+  // Write the name data to shared memory.
+  auto name_shm = PbString::Create(
+      name, reinterpret_cast<char*>(tensor_shm_ptr) + name_offset,
+      shm_handle + name_offset);
+
+  int64_t memory_type_id_ = 0;  // CPU memory, so the memory type id is 0.
+
+  auto pb_memory = PbMemory::Create(
+      memory_type_, memory_type_id_, byte_size_, nullptr,
+      reinterpret_cast<char*>(tensor_shm_ptr) + pb_memory_offset,
+      shm_handle + pb_memory_offset, false);
+  // A zero handle marks the memory as stored inline after the tensor metadata.
+  tensor_shm_ptr->memory = 0;
+  std::cout << "Offset is - " << pb_memory_offset << "\n";
+
+  return std::unique_ptr<PbTensor>(
+      new PbTensor(tensor_shm, name_shm, pb_memory));
+}
+
 std::shared_ptr<PbTensor>
 PbTensor::FromNumpy(const std::string& name, py::array& numpy_array)
 {
@@ -602,9 +667,8 @@ PbTensor::LoadFromSharedMemory(
     pb_memory = PbMemory::LoadFromSharedMemory(
         shm_pool, tensor_shm_ptr->memory, open_cuda_handle);
   }
-  return std::unique_ptr<PbTensor>(
-      new PbTensor(tensor_shm, name_shm, pb_memory));
+  return std::unique_ptr<PbTensor>(new PbTensor(tensor_shm, name_shm, pb_memory));
 }
 
 TRITONSERVER_DataType
@@ -631,11 +695,14 @@ PbTensor::PbTensor(
     : tensor_shm_(std::move(tensor_shm)), name_shm_(std::move(name_shm)),
       pb_memory_(std::move(pb_memory))
 {
+
+
   tensor_shm_ptr_ = reinterpret_cast<TensorShm*>(tensor_shm_.data_.get());
   dims_shm_ptr_ = reinterpret_cast<int64_t*>(
       reinterpret_cast<char*>(tensor_shm_ptr_) + sizeof(TensorShm));
 
   name_ = name_shm_->String();
+
   dims_ = std::vector<int64_t>(
       dims_shm_ptr_, dims_shm_ptr_ + tensor_shm_ptr_->dims_count);
   dtype_ = tensor_shm_ptr_->dtype;
@@ -646,17 +713,20 @@ PbTensor::PbTensor(
     memory_type_id_ = pb_memory_->MemoryTypeId();
   shm_handle_ = tensor_shm_.handle_;
 
+
 #ifdef TRITON_PB_STUB
   if (memory_type_ == TRITONSERVER_MEMORY_CPU ||
       memory_type_ == TRITONSERVER_MEMORY_CPU_PINNED) {
     if (dtype_ != TRITONSERVER_TYPE_BYTES) {
+      // Passing py::none() as the base object makes pybind11 wrap the
+      // existing buffer instead of copying it into a new array.
       py::object numpy_array =
-          py::array(triton_to_pybind_dtype(dtype_), dims_, (void*)memory_ptr_);
-      numpy_array_ = numpy_array.attr("view")(triton_to_numpy_type(dtype_));
+          py::array(triton_to_pybind_dtype(dtype_), dims_, (void*)memory_ptr_, py::none());
+      numpy_array_ = numpy_array.attr("view")(triton_to_numpy_type(dtype_));
     } else {
       py::object numpy_array = py::array(
           triton_to_pybind_dtype(TRITONSERVER_TYPE_UINT8), {byte_size_},
-          (void*)memory_ptr_);
+          (void*)memory_ptr_, py::none());
       py::module triton_pb_utils =
           py::module::import("triton_python_backend_utils");
       numpy_array_ =
diff --git a/src/pb_tensor.h b/src/pb_tensor.h
index 79adf500..570469da 100644
--- a/src/pb_tensor.h
+++ b/src/pb_tensor.h
@@ -130,6 +130,10 @@ class PbTensor {
   static std::shared_ptr<PbTensor> FromNumpy(
       const std::string& name, py::array& numpy_array);
 
+  static std::shared_ptr<PbTensor> CreateInSHM(
+      const std::string& name, SharedMemoryManager& shm_pool,
+      std::vector<int64_t> dims, py::object data_type);
+
   /// Get device type in DLPack format.
   DLDeviceType DeviceType();
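The layout CreateInSHM writes is: the TensorShm header, the dims as int64 values, the PbString name block, then the PbMemory block holding the data. The offset arithmetic, redone in Python with hypothetical stand-ins for the struct sizes (only the arithmetic mirrors the C++ above; the real values come from sizeof(TensorShm), PbString::ShmStructSize, and PbMemory::ShmStructSize):

    SIZEOF_TENSOR_SHM = 24           # hypothetical sizeof(TensorShm)

    def shm_string_size(s):
        return 16 + len(s.encode())  # hypothetical PbString::ShmStructSize

    dims = [100000, 200]
    item_size = 4                    # np.float32 itemsize
    name_offset = SIZEOF_TENSOR_SHM + 8 * len(dims)  # one int64 per dim
    pb_memory_offset = name_offset + shm_string_size("candidatesss")
    padding = pb_memory_offset % item_size  # computed but, as noted, not applied
    print(name_offset, pb_memory_offset, padding)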