diff --git a/README.md b/README.md index 22f2fdd..5a4a3fa 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,6 @@ See the License for the specific language governing permissions and limitations under the License. --> - # Triton Distributed

A Datacenter Scale Distributed Inference Serving Framework

@@ -48,9 +47,9 @@ We provide 3 types of builds:

 2. `TENSORRTLLM` which includes our TRT-LLM backend
 3. `VLLM` which includes our VLLM backend

-For example, if you want to build a container for the `VLLM` backend you can run
+For example, if you want to build a container for the `STANDARD` backends you can run

-`./container/build.sh --framework VLLM`
+`./container/build.sh`

 Please see the instructions in the corresponding example for specific
 build instructions.
@@ -85,13 +84,22 @@ HF_TOKEN```) and mounts common directories such as ```/tmp:/tmp```,

 Please see the instructions in the corresponding example for specific
 deployment instructions.

+## Hello World
-
+2. Testing has primarily been on single node systems with processes
+   launched within a single container.
diff --git a/container/Dockerfile b/container/Dockerfile
index 59971e1..0948054 100644
--- a/container/Dockerfile
+++ b/container/Dockerfile
@@ -133,7 +133,7 @@ COPY . /workspace
 RUN /workspace/icp/protos/gen_python.sh

 # Sets pythonpath for python modules
-ENV PYTHONPATH="${PYTHONPATH}:/workspace/icp/src/python:/workspace/worker/src/python"
+ENV PYTHONPATH="${PYTHONPATH}:/workspace/icp/src/python:/workspace/worker/src/python:/workspace/examples"

 # Command and Entrypoint
 CMD []
diff --git a/examples/hello_world/README.md b/examples/hello_world/README.md
new file mode 100644
index 0000000..e03adc7
--- /dev/null
+++ b/examples/hello_world/README.md
@@ -0,0 +1,274 @@
+
+# Hello World
+
+A basic example demonstrating the new interfaces and concepts of
+triton distributed. In the hello world example, you can deploy a set
+of simple workers to load balance requests from a local work queue.
+
+The example demonstrates:
+
+1. How to incorporate an existing Triton Core Model into a triton distributed worker.
+2. How to incorporate a standalone python class into a triton distributed worker.
+3. How to deploy a set of workers.
+4. How to send requests to the triton distributed deployment.
+5. How requests are sent over the Request Plane and data is moved over
+   the Data Plane.
+
+## Building the Hello World Environment
+
+The hello world example is designed to be deployed in a containerized
+environment and to work both with and without GPU support.
+
+To get started, build the "STANDARD" triton distributed development
+environment.
+
+Note: "STANDARD" is the default framework.
+
+```
+./container/build.sh
+```
+
+## Starting the Deployment
+
+```
+./container/run.sh -it -- python3 -m hello_world.deploy --initialize-request-plane
+```
+
+#### Expected Output
+
+```
+Starting Workers
+17:17:09 deployment.py:115[triton_distributed.worker.deployment] INFO:
+
+Starting Worker:
+
+    Config:
+    WorkerConfig(request_plane=<class 'triton_distributed.icp.nats_request_plane.NatsRequestPlane'>,
+                 data_plane=<class 'triton_distributed.icp.ucp_data_plane.UcpDataPlane'>,
+                 request_plane_args=([], {}),
+                 data_plane_args=([], {}),
+                 log_level=1,
+                 operators=[OperatorConfig(name='encoder',
+                                           implementation=<class 'triton_distributed.worker.triton_core_operator.TritonCoreOperator'>,
+                                           repository='/workspace/examples/hello_world/operators/triton_core_models',
+                                           version=1,
+                                           max_inflight_requests=1,
+                                           parameters={'config': {'instance_group': [{'count': 1,
+                                                                                      'kind': 'KIND_CPU'}],
+                                                                  'parameters': {'delay': {'string_value': '0'},
+                                                                                 'input_copies': {'string_value': '1'}}}},
+                                           log_level=None)],
+                 triton_log_path=None,
+                 name='encoder.0',
+                 log_dir='/workspace/examples/hello_world/logs',
+                 metrics_port=50000)
+
+
+17:17:09 deployment.py:115[triton_distributed.worker.deployment] INFO:
+
+Starting Worker:
+
+    Config:
+    WorkerConfig(request_plane=<class 'triton_distributed.icp.nats_request_plane.NatsRequestPlane'>,
+                 data_plane=<class 'triton_distributed.icp.ucp_data_plane.UcpDataPlane'>,
+                 request_plane_args=([], {}),
+                 data_plane_args=([], {}),
+                 log_level=1,
+                 operators=[OperatorConfig(name='decoder',
+                                           implementation=<class 'triton_distributed.worker.triton_core_operator.TritonCoreOperator'>,
+                                           repository='/workspace/examples/hello_world/operators/triton_core_models',
+                                           version=1,
+                                           max_inflight_requests=1,
+                                           parameters={'config': {'instance_group': [{'count': 1,
+                                                                                      'kind': 'KIND_CPU'}],
+                                                                  'parameters': {'delay': {'string_value': '0'},
+                                                                                 'input_copies': {'string_value': '1'}}}},
+                                           log_level=None)],
+                 triton_log_path=None,
+                 name='decoder.0',
+                 log_dir='/workspace/examples/hello_world/logs',
+                 metrics_port=50001)
+
+
+17:17:09 deployment.py:115[triton_distributed.worker.deployment] INFO:
+
+Starting Worker:
+
+    Config:
+    WorkerConfig(request_plane=<class 'triton_distributed.icp.nats_request_plane.NatsRequestPlane'>,
+                 data_plane=<class 'triton_distributed.icp.ucp_data_plane.UcpDataPlane'>,
+                 request_plane_args=([], {}),
+                 data_plane_args=([], {}),
+                 log_level=1,
+                 operators=[OperatorConfig(name='encoder_decoder',
+                                           implementation='EncodeDecodeOperator',
+                                           repository='/workspace/examples/hello_world/operators',
+                                           version=1,
+                                           max_inflight_requests=1,
+                                           parameters={},
+                                           log_level=None)],
+                 triton_log_path=None,
+                 name='encoder_decoder.0',
+                 log_dir='/workspace/examples/hello_world/logs',
+                 metrics_port=50002)
+
+
+Workers started ... press Ctrl-C to Exit
+```
+
+## Sending Requests
+
+From a separate terminal run the sample client.
+
+```
+./container/run.sh -it -- python3 -m hello_world.client
+```
+
+#### Expected Output
+
+```
+Client: 0 Received Response: 42 From: 39491f06-d4f7-11ef-be96-047bcba9020e Error: None: 43%|███████▋  | 43/100 [00:04<00:05,  9.83request/s]
+
+Throughput: 9.10294484748811 Total Time: 10.985455989837646
+Clients Stopped Exit Code 0
+```
+
+## Behind the Scenes
+
+The hello world example is designed to let you experiment with
+different mixtures of compute and memory load and with different
+numbers of workers for each part of the hello world workflow.
+
+### Hello World Workflow
+
+The hello world workflow is a simple two-stage pipeline with an
+encoding stage and a decoding stage, plus an encoder-decoder stage to
+orchestrate the overall workflow.
+
+```
+client <-> encoder_decoder <-> encoder
+                |
+                -----<-> decoder
+```
+
+#### Encoder
+
+The encoder follows a simple procedure:
+
+1. copy the input x times (x is configurable via parameter)
+2. invert the input
+3. sleep for `delay * size of output` seconds to simulate compute load
+
+#### Decoder
+
+The decoder follows a simple procedure:
+
+1. remove the extra copies
+2. invert the input
+3. sleep for `delay * size of output` seconds to simulate compute load
+
+#### Encoder - Decoder
+
+The encoder-decoder operator controls the overall workflow.
+
+It first sends a request to the encoder. Once it receives the
+response, it sends the encoder's output as an input to the decoder.
+Note that in this step memory is transferred directly between the
+encoder and decoder workers; it does not pass through the
+encoder-decoder.
+
+### Operators
+
+Operators are responsible for actually doing the work and responding
+to requests. Operators come in two main flavors and are hosted by a
+common Worker class.
+
+#### Triton Core Operator
+
+The triton core operator makes a triton model (following the
+[standard model
+repo](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/model_repository.md)
+and backend structure of the tritonserver) available on the request
+plane. Both the encoder and decoder are implemented as triton python
+backend models.
+
+#### Generic Operator
+
+The encoder-decoder operator is a python class that implements the
+Operator interface. Internally it makes remote requests to other
+workers. Generally an operator can make use of other operators for
+its work but isn't required to.
+
+### Workers
+
+Workers host one or more operators; they pull requests from the
+request plane and forward them to a local operator.
+
+### Request Plane
+
+The current triton distributed framework leverages a distributed work
+queue for its request plane implementation. The request plane ensures
+that each request for an operator is forwarded to and serviced by a
+single worker.
+
+### Data Plane
+
+The triton distributed framework leverages point-to-point data
+transfers using the UCX library to provide optimized primitives for
+device-to-device transfers.
+
+Data sent over the data plane is only pulled by the worker that needs
+to perform work on it. Requests themselves contain data descriptors
+and can be referenced and shared with other workers.
+
+Note: there is also a provision for sending data in the request
+contents when the message size is small enough that a UCX transfer is
+not needed.
+
+### Components
+
+Any process which communicates with one or more of the request or
+data planes is considered a "component". While this example only uses
+"Workers", future examples will also include API servers, routers,
+and other types of components.
+
+### Deployment
+
+The final piece is a deployment: a set of components deployed across
+a cluster. Components may be added to and removed from deployments.
+
+## Limitations and Caveats
+
+The example is a rapidly evolving prototype and shouldn't be used in
+production. Only limited testing has been done; the example is meant
+to help flesh out the triton distributed concepts, architecture, and
+interfaces.
+
+1. No multi-node testing / support has been done
+
+2. No performance tuning / measurement has been done
+
diff --git a/examples/hello_world/__init__.py b/examples/hello_world/__init__.py
new file mode 100644
index 0000000..e9d1d88
--- /dev/null
+++ b/examples/hello_world/__init__.py
@@ -0,0 +1,14 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/examples/hello_world/client/__main__.py b/examples/hello_world/client/__main__.py new file mode 100644 index 0000000..9f7744f --- /dev/null +++ b/examples/hello_world/client/__main__.py @@ -0,0 +1,78 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing +import signal +import sys +import time +from typing import Optional + +from .client import _start_client +from .parser import parse_args + +processes: Optional[list[multiprocessing.context.SpawnProcess]] = None + + +def handler(signum, frame): + exit_code = 0 + if processes: + print("Stopping Clients") + for process in processes: + process.terminate() + process.kill() + process.join() + if process.exitcode is not None: + exit_code += process.exitcode + print(f"Clients Stopped Exit Code {exit_code}") + sys.exit(exit_code) + + +signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) +for sig in signals: + try: + signal.signal(sig, handler) + except Exception: + pass + + +def main(args): + global processes + process_context = multiprocessing.get_context("spawn") + args.lock = process_context.Lock() + processes = [] + start_time = time.time() + for index in range(args.clients): + processes.append( + process_context.Process(target=_start_client, args=(index, args)) + ) + processes[-1].start() + + for process in processes: + process.join() + end_time = time.time() + print( + f"Throughput: {(args.requests_per_client*args.clients)/(end_time-start_time)} Total Time: {end_time-start_time}" + ) + exit_code = 0 + for process in processes: + if process.exitcode is not None: + exit_code += process.exitcode + print(f"Clients Stopped Exit Code {exit_code}") + return exit_code + + +if __name__ == "__main__": + args = parse_args() + sys.exit(main(args)) diff --git a/examples/hello_world/client/client.py b/examples/hello_world/client/client.py new file mode 100644 index 0000000..e43c84d --- /dev/null +++ b/examples/hello_world/client/client.py @@ -0,0 +1,104 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import sys + +import cupy +import numpy +from tqdm import tqdm +from triton_distributed.icp import NatsRequestPlane, UcpDataPlane +from triton_distributed.worker import RemoteOperator +from tritonserver import MemoryType + + +def _get_input_sizes(args): + return numpy.maximum( + 0, + numpy.round( + numpy.random.normal( + loc=args.input_size_mean, + scale=args.input_size_stdev, + size=args.requests_per_client, + ) + ), + ).astype(int) + + +def _start_client(client_index, args): + sys.exit(asyncio.run(client(client_index, args))) + + +async def client(client_index, args): + request_count = args.requests_per_client + try: + request_plane = NatsRequestPlane(args.request_plane_uri) + data_plane = UcpDataPlane() + await request_plane.connect() + data_plane.connect() + + remote_operator: RemoteOperator = RemoteOperator( + args.operator, request_plane, data_plane + ) + input_sizes = _get_input_sizes(args) + + inputs = [ + numpy.array(numpy.random.randint(0, 100, input_sizes[index])) + for index in range(request_count) + ] + tqdm.set_lock(args.lock) + + with tqdm( + total=args.requests_per_client, + desc=f"Client: {client_index}", + unit="request", + position=client_index, + leave=False, + ) as pbar: + requests = [ + await remote_operator.async_infer( + inputs={"input": inputs[index]}, request_id=str(index) + ) + for index in range(request_count) + ] + + for request in requests: + async for response in request: + for output_name, output_value in response.outputs.items(): + if output_value.memory_type == MemoryType.CPU: + output = numpy.from_dlpack(output_value) + numpy.testing.assert_array_equal( + output, inputs[int(response.request_id)] + ) + else: + output = cupy.from_dlpack(output_value) + cupy.testing.assert_array_equal( + output, inputs[int(response.request_id)] + ) + del output_value + + pbar.set_description( + f"Client: {client_index} Received Response: {response.request_id} From: {response.component_id} Error: {response.error}" + ) + pbar.update(1) + del response + + await request_plane.close() + data_plane.close() + except Exception as e: + print(f"Exception: {e}") + return 1 + else: + return 0 diff --git a/examples/hello_world/client/parser.py b/examples/hello_world/client/parser.py new file mode 100644 index 0000000..6ba1660 --- /dev/null +++ b/examples/hello_world/client/parser.py @@ -0,0 +1,43 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
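+
+# Command line options for the load generating client. Each of the
+# --clients processes sends --requests-per-client requests, with input
+# sizes drawn from a normal distribution parameterized by
+# --input-size-mean and --input-size-stdev (see _get_input_sizes in
+# client.py).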
+ +import argparse + + +def parse_args(args=None): + parser = argparse.ArgumentParser(description="Hello World Client") + + parser.add_argument( + "--request-plane-uri", type=str, default="nats://localhost:4223" + ) + + parser.add_argument("--requests-per-client", type=int, default=100) + + parser.add_argument( + "--operator", + type=str, + choices=["encoder_decoder", "encoder", "decoder"], + default="encoder_decoder", + ) + + parser.add_argument("--input-size-mean", type=int, default=1000) + + parser.add_argument("--input-size-stdev", type=float, default=0) + + parser.add_argument("--clients", type=int, default=1) + + args = parser.parse_args(args) + + return args diff --git a/examples/hello_world/deploy/__main__.py b/examples/hello_world/deploy/__main__.py new file mode 100644 index 0000000..fee985f --- /dev/null +++ b/examples/hello_world/deploy/__main__.py @@ -0,0 +1,163 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import shutil +import signal +import sys +import time +from pathlib import Path + +from triton_distributed.worker import ( + Deployment, + OperatorConfig, + TritonCoreOperator, + WorkerConfig, +) + +from .parser import parse_args + +deployment = None + + +def handler(signum, frame): + exit_code = 0 + if deployment: + print("Stopping Workers") + exit_code = deployment.stop() + print(f"Workers Stopped Exit Code {exit_code}") + sys.exit(exit_code) + + +signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) +for sig in signals: + try: + signal.signal(sig, handler) + except Exception: + pass + + +def _create_encoder_decoder_op(name, max_inflight_requests, args): + return OperatorConfig( + name=name, + implementation="EncodeDecodeOperator", + max_inflight_requests=int(max_inflight_requests), + repository=args.operator_repository, + ) + + +def _create_triton_core_operator( + name, + max_inflight_requests, + instances_per_worker, + kind, + delay_per_token, + input_copies, + args, +): + return OperatorConfig( + name=name, + repository=args.triton_core_models, + implementation=TritonCoreOperator, + max_inflight_requests=int(max_inflight_requests), + parameters={ + "config": { + "instance_group": [ + {"count": int(instances_per_worker), "kind": f"KIND_{kind}"} + ], + "parameters": { + "delay": {"string_value": f"{delay_per_token}"}, + "input_copies": {"string_value": f"{input_copies}"}, + }, + } + }, + ) + + +async def main(args): + global deployment + log_dir = Path(args.log_dir) + + if args.clear_logs: + shutil.rmtree(log_dir) + + log_dir.mkdir(exist_ok=True) + + encoder_op = _create_triton_core_operator( + name="encoder", + max_inflight_requests=args.encoders[1], + instances_per_worker=args.encoders[2], + kind=args.encoders[3], + delay_per_token=args.encoder_delay_per_token, + input_copies=args.encoder_input_copies, + args=args, + ) + + encoder = WorkerConfig( + operators=[encoder_op], + name="encoder", + ) + + 
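+    # Note: the decoder reuses --encoder-input-copies so that both stages
+    # agree on how many copies the encoder created and the decoder strips.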
decoder_op = _create_triton_core_operator( + name="decoder", + max_inflight_requests=args.decoders[1], + instances_per_worker=args.decoders[2], + kind=args.decoders[3], + delay_per_token=args.decoder_delay_per_token, + input_copies=args.encoder_input_copies, + args=args, + ) + + decoder = WorkerConfig( + operators=[decoder_op], + name="decoder", + ) + + encoder_decoder_op = _create_encoder_decoder_op( + name="encoder_decoder", + max_inflight_requests=args.encoder_decoders[1], + args=args, + ) + + encoder_decoder = WorkerConfig( + operators=[encoder_decoder_op], + name="encoder_decoder", + ) + + print("Starting Workers") + + deployment = Deployment( + [ + (encoder, int(args.encoders[0])), + (decoder, int(args.decoders[0])), + (encoder_decoder, int(args.encoder_decoders[0])), + ], + initialize_request_plane=args.initialize_request_plane, + log_dir=args.log_dir, + log_level=args.log_level, + starting_metrics_port=args.starting_metrics_port, + ) + + deployment.start() + + print("Workers started ... press Ctrl-C to Exit") + + while True: + time.sleep(10) + + +if __name__ == "__main__": + args = parse_args() + asyncio.run(main(args)) diff --git a/examples/hello_world/deploy/parser.py b/examples/hello_world/deploy/parser.py new file mode 100644 index 0000000..4e2885d --- /dev/null +++ b/examples/hello_world/deploy/parser.py @@ -0,0 +1,114 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +from pathlib import Path + + +def parse_args(args=None): + example_dir = Path(__file__).parent.absolute().parent.absolute() + + default_log_dir = example_dir.joinpath("logs") + + default_operator_repository = example_dir.joinpath("operators") + + default_triton_core_models = default_operator_repository.joinpath( + "triton_core_models" + ) + + parser = argparse.ArgumentParser(description="Hello World Deployment") + + parser.add_argument( + "--initialize-request-plane", + default=False, + action="store_true", + help="Initialize the request plane, should only be done once per deployment", + ) + + parser.add_argument( + "--log-dir", + type=str, + default=str(default_log_dir), + help="log dir folder", + ) + + parser.add_argument( + "--clear-logs", default=False, action="store_true", help="clear log dir" + ) + + parser.add_argument("--log-level", type=int, default=1) + + parser.add_argument( + "--request-plane-uri", type=str, default="nats://localhost:4223" + ) + + parser.add_argument("--starting-metrics-port", type=int, default=50000) + + parser.add_argument( + "--operator-repository", type=str, default=str(default_operator_repository) + ) + + parser.add_argument( + "--triton-core-models", type=str, default=str(default_triton_core_models) + ) + + parser.add_argument( + "--encoder-delay-per-token", + type=float, + default=0, + help="Delay per input token. 
In this toy example it can be used to vary the simulated compute load for the encoding stage.",
+    )
+
+    parser.add_argument(
+        "--encoder-input-copies",
+        type=int,
+        default=1,
+        help="Number of copies of the input to create during encoding. In this toy example it can be used to vary the memory transferred between the encoding and decoding stages.",
+    )
+
+    parser.add_argument(
+        "--encoders",
+        type=str,
+        nargs=4,
+        default=["1", "1", "1", "CPU"],
+        help="Number of encoding workers to deploy. Specified as #Workers, #MaxInflightRequests, #ModelInstancesPerWorker, CPU || GPU",
+    )
+
+    parser.add_argument(
+        "--decoders",
+        type=str,
+        nargs=4,
+        default=["1", "1", "1", "CPU"],
+        help="Number of decoding workers to deploy. Specified as #Workers, #MaxInflightRequests, #ModelInstancesPerWorker, CPU || GPU",
+    )
+
+    parser.add_argument(
+        "--decoder-delay-per-token",
+        type=float,
+        default=0,
+        help="Delay per input token. In this toy example it can be used to vary the simulated compute load for the decoding stage.",
+    )
+
+    parser.add_argument(
+        "--encoder-decoders",
+        type=str,
+        nargs=2,
+        default=["1", "1"],
+        help="Number of encode-decode workers to deploy. Specified as #Workers, #MaxInflightRequests",
+    )
+
+    args = parser.parse_args(args)
+
+    return args
diff --git a/examples/hello_world/operators/__init__.py b/examples/hello_world/operators/__init__.py
new file mode 100644
index 0000000..9b3bfb4
--- /dev/null
+++ b/examples/hello_world/operators/__init__.py
@@ -0,0 +1,18 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
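+
+# Re-export the operator class at package level so a worker can resolve an
+# OperatorConfig whose implementation is given as the string
+# "EncodeDecodeOperator" with this directory as its repository.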
+ +from hello_world.operators.encoder_decoder import ( + EncodeDecodeOperator as EncodeDecodeOperator, +) diff --git a/examples/hello_world/operators/encoder_decoder.py b/examples/hello_world/operators/encoder_decoder.py new file mode 100644 index 0000000..f6b013a --- /dev/null +++ b/examples/hello_world/operators/encoder_decoder.py @@ -0,0 +1,42 @@ +import numpy +from triton_distributed.worker import Operator, RemoteInferenceRequest, RemoteOperator + + +class EncodeDecodeOperator(Operator): + def __init__( + self, + name, + version, + triton_core, + request_plane, + data_plane, + parameters, + repository, + logger, + ): + self._encoder = RemoteOperator("encoder", request_plane, data_plane) + self._decoder = RemoteOperator("decoder", request_plane, data_plane) + self._logger = logger + + async def execute(self, requests: list[RemoteInferenceRequest]): + self._logger.info("got request!") + for request in requests: + encoded_responses = await self._encoder.async_infer( + inputs={"input": request.inputs["input"]} + ) + + async for encoded_response in encoded_responses: + input_copies = int( + numpy.from_dlpack(encoded_response.outputs["input_copies"]) + ) + decoded_responses = await self._decoder.async_infer( + inputs={"input": encoded_response.outputs["output"]}, + parameters={"input_copies": input_copies}, + ) + + async for decoded_response in decoded_responses: + await request.response_sender().send( + final=True, + outputs={"output": decoded_response.outputs["output"]}, + ) + del decoded_response diff --git a/examples/hello_world/operators/triton_core_models/decoder/1/model.py b/examples/hello_world/operators/triton_core_models/decoder/1/model.py new file mode 100644 index 0000000..8187835 --- /dev/null +++ b/examples/hello_world/operators/triton_core_models/decoder/1/model.py @@ -0,0 +1,105 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
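+
+# Decoder model: undoes the encoder's transformation. For example, with
+# input_copies=2 the encoder maps [1, 2, 3] to invert(repeat([1, 2, 3], 2)) =
+# invert([1, 1, 2, 2, 3, 3]); the decoder inverts again and keeps every
+# input_copies-th element, recovering [1, 2, 3]. The `delay` parameter adds
+# sleep time proportional to the output size to simulate compute load.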
+import json
+import time
+
+import numpy
+import triton_python_backend_utils as pb_utils
+
+# cupy is optional: it is only needed when the model instance runs on a GPU
+try:
+    import cupy
+except Exception:
+    cupy = None
+
+
+class TritonPythonModel:
+    @staticmethod
+    def auto_complete_config(auto_complete_model_config):
+        """Auto Completes Model Config
+
+        Model has one input and one output,
+        both of type int64.
+
+        Parameters
+        ----------
+        auto_complete_model_config : config
+            Enables reading and updating config.pbtxt
+
+        """
+
+        input_config = {
+            "name": "input",
+            "data_type": "TYPE_INT64",
+            "dims": [-1],
+            "optional": False,
+        }
+
+        output_config = {
+            "name": "output",
+            "data_type": "TYPE_INT64",
+            "dims": [-1],
+        }
+
+        auto_complete_model_config.add_input(input_config)
+        auto_complete_model_config.add_output(output_config)
+        auto_complete_model_config.set_max_batch_size(0)
+        auto_complete_model_config.set_model_transaction_policy({"decoupled": False})
+
+        return auto_complete_model_config
+
+    def initialize(self, args):
+        self._model_config = json.loads(args["model_config"])
+        self._model_instance_kind = args["model_instance_kind"]
+        self._model_instance_device_id = int(args["model_instance_device_id"])
+        self._config_parameters = self._model_config.get("parameters", {})
+        self._input_copies = int(
+            self._config_parameters.get("input_copies", {"string_value": "5"})[
+                "string_value"
+            ]
+        )
+        self._delay = float(
+            self._config_parameters.get("delay", {"string_value": "0"})["string_value"]
+        )
+
+    def execute(self, requests):
+        responses = []
+        input_copies = self._input_copies
+        delay = self._delay
+        for request in requests:
+            output_tensors = []
+            # request-level parameters override the config defaults
+            parameters = json.loads(request.parameters())
+            if parameters:
+                input_copies = int(parameters.get("input_copies", self._input_copies))
+                delay = float(parameters.get("delay", self._delay))
+            for input_tensor in request.inputs():
+                input_value = input_tensor.as_numpy()
+                output_value = []
+                if self._model_instance_kind == "GPU":
+                    with cupy.cuda.Device(self._model_instance_device_id):
+                        input_value = cupy.array(input_value)
+                        output_value = cupy.invert(input_value)
+                        # the encoder repeated every element input_copies
+                        # times, so keeping every input_copies-th element
+                        # restores the original
+                        output_value = output_value[::input_copies]
+                        output_tensor = pb_utils.Tensor.from_dlpack(
+                            "output", output_value
+                        )
+                else:
+                    output_value = numpy.invert(input_value)
+                    output_value = output_value[::input_copies]
+                    output_tensor = pb_utils.Tensor("output", output_value)
+                output_tensors.append(output_tensor)
+            # simulate compute load proportional to the output size
+            time.sleep(len(output_value) * delay)
+            responses.append(pb_utils.InferenceResponse(output_tensors=output_tensors))
+        return responses
diff --git a/examples/hello_world/operators/triton_core_models/decoder/config.pbtxt b/examples/hello_world/operators/triton_core_models/decoder/config.pbtxt
new file mode 100644
index 0000000..5581461
--- /dev/null
+++ b/examples/hello_world/operators/triton_core_models/decoder/config.pbtxt
@@ -0,0 +1,20 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## Model Instance and Kind are filled in by configuration when launched
+## All other values are filled in by auto_complete in model.py
+
+backend: "python"
+
diff --git a/examples/hello_world/operators/triton_core_models/encoder/1/model.py b/examples/hello_world/operators/triton_core_models/encoder/1/model.py
new file mode 100644
index 0000000..f7b458a
--- /dev/null
+++ b/examples/hello_world/operators/triton_core_models/encoder/1/model.py
@@ -0,0 +1,119 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import time
+
+import numpy
+import triton_python_backend_utils as pb_utils
+
+# cupy is optional: it is only needed when the model instance runs on a GPU
+try:
+    import cupy
+except Exception:
+    cupy = None
+
+
+class TritonPythonModel:
+    @staticmethod
+    def auto_complete_config(auto_complete_model_config):
+        """Auto Completes Model Config
+
+        Model has one input ("input") and two outputs ("output" and
+        "input_copies"), all of type int64.
+
+        Parameters
+        ----------
+        auto_complete_model_config : config
+            Enables reading and updating config.pbtxt
+
+        """
+
+        input_config = {
+            "name": "input",
+            "data_type": "TYPE_INT64",
+            "dims": [-1],
+            "optional": False,
+        }
+
+        output_config = {
+            "name": "output",
+            "data_type": "TYPE_INT64",
+            "dims": [-1],
+        }
+
+        copies_config = {
+            "name": "input_copies",
+            "data_type": "TYPE_INT64",
+            "dims": [1],
+        }
+
+        auto_complete_model_config.add_input(input_config)
+        auto_complete_model_config.add_output(output_config)
+        auto_complete_model_config.add_output(copies_config)
+        auto_complete_model_config.set_max_batch_size(0)
+        auto_complete_model_config.set_model_transaction_policy({"decoupled": False})
+
+        return auto_complete_model_config
+
+    def initialize(self, args):
+        self._model_config = json.loads(args["model_config"])
+        self._model_instance_kind = args["model_instance_kind"]
+        self._model_instance_device_id = int(args["model_instance_device_id"])
+        self._config_parameters = self._model_config.get("parameters", {})
+        self._input_copies = int(
+            self._config_parameters.get("input_copies", {"string_value": "5"})[
+                "string_value"
+            ]
+        )
+        self._delay = float(
+            self._config_parameters.get("delay", {"string_value": "0"})["string_value"]
+        )
+
+    def execute(self, requests):
+        responses = []
+        input_copies = self._input_copies
+        delay = self._delay
+        for request in requests:
+            output_tensors = []
+            # request-level parameters override the config defaults
+            parameters = json.loads(request.parameters())
+            if parameters:
+                input_copies = int(parameters.get("input_copies", self._input_copies))
+                delay = float(parameters.get("delay", self._delay))
+            for input_tensor in request.inputs():
+                input_value = input_tensor.as_numpy()
+                output_value = []
+                if self._model_instance_kind == "GPU":
+                    with cupy.cuda.Device(self._model_instance_device_id):
+                        input_value = cupy.array(input_value)
+                        # repeat each element input_copies times so the
+                        # decoder can strip the copies with a stride slice
+                        # (tile would interleave elements and break the
+                        # round trip)
+                        output_value = cupy.repeat(input_value, input_copies)
+                        output_value = cupy.invert(output_value)
+                        output_tensor = pb_utils.Tensor.from_dlpack(
+                            "output", output_value
+                        )
+                else:
+                    output_value = numpy.repeat(input_value, input_copies)
+                    output_value = numpy.invert(output_value)
+                    output_tensor = pb_utils.Tensor("output", output_value)
+                output_tensors.append(output_tensor)
+            # report the number of copies so downstream stages can undo them
+            output_tensors.append(
+                pb_utils.Tensor(
+                    "input_copies", numpy.array(input_copies).astype("int64")
+                )
+            )
+            # simulate compute load proportional to the output size
+            time.sleep(len(output_value) * delay)
+
+            responses.append(pb_utils.InferenceResponse(output_tensors=output_tensors))
+        return responses
diff --git a/examples/hello_world/operators/triton_core_models/encoder/config.pbtxt b/examples/hello_world/operators/triton_core_models/encoder/config.pbtxt
new file mode 100644
index 0000000..5581461
--- /dev/null
+++ b/examples/hello_world/operators/triton_core_models/encoder/config.pbtxt
@@ -0,0 +1,20 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## Model Instance and Kind are filled in by configuration when launched
+## All other values are filled in by auto_complete in model.py
+
+backend: "python"
+
diff --git a/examples/hello_world/single_file.py b/examples/hello_world/single_file.py
new file mode 100644
index 0000000..1788856
--- /dev/null
+++ b/examples/hello_world/single_file.py
@@ -0,0 +1,206 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
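+
+# Self-contained variant of the hello world example: it defines the
+# EncodeDecodeOperator inline, starts one worker per stage in this process,
+# sends 100 requests, verifies that each response echoes its input, and then
+# shuts the deployment down. The decoder below uses KIND_GPU, so this
+# variant requires a GPU (and cupy).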
+ +import asyncio +import shutil +from pathlib import Path + +import cupy +import numpy +from tqdm import tqdm +from triton_distributed.icp.nats_request_plane import NatsRequestPlane +from triton_distributed.icp.ucp_data_plane import UcpDataPlane +from triton_distributed.worker import ( + Deployment, + Operator, + OperatorConfig, + RemoteInferenceRequest, + RemoteOperator, + TritonCoreOperator, + WorkerConfig, +) +from tritonserver import MemoryType + + +class EncodeDecodeOperator(Operator): + def __init__( + self, + name, + version, + triton_core, + request_plane, + data_plane, + parameters, + repository, + logger, + ): + self._encoder = RemoteOperator("encoder", request_plane, data_plane) + self._decoder = RemoteOperator("decoder", request_plane, data_plane) + self._logger = logger + + async def execute(self, requests: list[RemoteInferenceRequest]): + for request in requests: + self._logger.info("got request!") + encoded_responses = await self._encoder.async_infer( + inputs={"input": request.inputs["input"]} + ) + + async for encoded_response in encoded_responses: + input_copies = int( + numpy.from_dlpack(encoded_response.outputs["input_copies"]) + ) + decoded_responses = await self._decoder.async_infer( + inputs={"input": encoded_response.outputs["output"]}, + parameters={"input_copies": input_copies}, + ) + + async for decoded_response in decoded_responses: + await request.response_sender().send( + final=True, + outputs={"output": decoded_response.outputs["output"]}, + ) + del decoded_response + + +async def send_requests(nats_server_url, request_count=100): + request_plane = NatsRequestPlane(nats_server_url) + data_plane = UcpDataPlane() + await request_plane.connect() + data_plane.connect() + + remote_operator: RemoteOperator = RemoteOperator( + "encoder_decoder", request_plane, data_plane + ) + + inputs = [ + numpy.array(numpy.random.randint(0, 100, 10000)).astype("int64") + for _ in range(request_count) + ] + + with tqdm(total=request_count, desc="Sending Requests", unit="request") as pbar: + requests = [ + await remote_operator.async_infer( + inputs={"input": inputs[index]}, request_id=str(index) + ) + for index in range(request_count) + ] + + for request in requests: + async for response in request: + for output_name, output_value in response.outputs.items(): + if output_value.memory_type == MemoryType.CPU: + output = numpy.from_dlpack(output_value) + numpy.testing.assert_array_equal( + output, inputs[int(response.request_id)] + ) + else: + output = cupy.from_dlpack(output_value) + cupy.testing.assert_array_equal( + output, inputs[int(response.request_id)] + ) + del output_value + + pbar.set_description( + f"Finished Request: {response.request_id} Response From: {response.component_id} Error: {response.error}" + ) + pbar.update(1) + del response + + await request_plane.close() + data_plane.close() + + +async def main(): + module_dir = Path(__file__).parent.absolute() + + log_dir = module_dir.joinpath("logs") + + if log_dir.is_dir(): + shutil.rmtree(log_dir) + + log_dir.mkdir(exist_ok=True) + + triton_core_models_dir = module_dir.joinpath("operators", "triton_core_models") + + encoder_op = OperatorConfig( + name="encoder", + repository=str(triton_core_models_dir), + implementation=TritonCoreOperator, + max_inflight_requests=1, + parameters={ + "config": { + "instance_group": [{"count": 1, "kind": "KIND_CPU"}], + "parameters": {"delay": {"string_value": "0"}}, + } + }, + ) + + decoder_op = OperatorConfig( + name="decoder", + repository=str(triton_core_models_dir), + 
implementation=TritonCoreOperator, + max_inflight_requests=1, + parameters={ + "config": { + "instance_group": [{"count": 1, "kind": "KIND_GPU"}], + "parameters": {"delay": {"string_value": "0"}}, + } + }, + ) + + encoder_decoder_op = OperatorConfig( + name="encoder_decoder", + implementation=EncodeDecodeOperator, + max_inflight_requests=100, + ) + + encoder = WorkerConfig( + operators=[encoder_op], + name="encoder", + ) + + decoder = WorkerConfig( + operators=[decoder_op], + name="decoder", + ) + + encoder_decoder = WorkerConfig( + operators=[encoder_decoder_op], + name="encoder_decoder", + ) + + print("Starting Workers") + + deployment = Deployment( + [(encoder, 1), (decoder, 1), (encoder_decoder, 1)], + initialize_request_plane=True, + log_dir=str(log_dir), + log_level=1, + starting_metrics_port=50000, + ) + + deployment.start() + + print("Sending Requests") + + await send_requests(deployment.request_plane_server.url) + + print("Stopping Workers") + + deployment.stop() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/hello_world/tests/test_sanity.py b/examples/hello_world/tests/test_sanity.py new file mode 100644 index 0000000..e90e6dc --- /dev/null +++ b/examples/hello_world/tests/test_sanity.py @@ -0,0 +1,64 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess + +import pytest + +# TODO +# Decide if this should be +# pre merge, nightly, or weekly +pytestmark = pytest.mark.pre_merge + + +def test_sanity(): + deployment_command = [ + "python3", + "-m", + "hello_world.deploy", + "--initialize-request-plane", + ] + + deployment_process = subprocess.Popen( + deployment_command, + stdin=subprocess.DEVNULL, + ) + + client_command = [ + "python3", + "-m", + "hello_world.client", + "--requests-per-client", + "10", + ] + + client_process = subprocess.Popen( + client_command, + stdin=subprocess.DEVNULL, + ) + try: + client_process.wait(timeout=30) + except subprocess.TimeoutExpired: + print("Client timed out!") + client_process.terminate() + client_process.wait() + + client_process.terminate() + client_process.kill() + client_process.wait() + deployment_process.terminate() + deployment_process.wait() + assert client_process.returncode == 0, "Error in clients!" + assert deployment_process.returncode == 0, "Error starting deployment!" diff --git a/icp/src/python/triton_distributed/icp/__init__.py b/icp/src/python/triton_distributed/icp/__init__.py index e9d1d88..a01b1f9 100644 --- a/icp/src/python/triton_distributed/icp/__init__.py +++ b/icp/src/python/triton_distributed/icp/__init__.py @@ -12,3 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
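+
+# Public API of the ICP package. The `X as X` form marks each import as an
+# intentional re-export for type checkers and linters.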
+
+from triton_distributed.icp.data_plane import DataPlane as DataPlane
+from triton_distributed.icp.nats_request_plane import (
+    NatsRequestPlane as NatsRequestPlane,
+)
+from triton_distributed.icp.nats_request_plane import NatsServer as NatsServer
+from triton_distributed.icp.request_plane import RequestPlane as RequestPlane
+from triton_distributed.icp.ucp_data_plane import UcpDataPlane as UcpDataPlane
diff --git a/worker/src/python/triton_distributed/worker/__init__.py b/worker/src/python/triton_distributed/worker/__init__.py
index 365c827..e681f75 100644
--- a/worker/src/python/triton_distributed/worker/__init__.py
+++ b/worker/src/python/triton_distributed/worker/__init__.py
@@ -22,5 +22,8 @@ from triton_distributed.worker.remote_response import (
     RemoteInferenceResponse as RemoteInferenceResponse,
 )
+from triton_distributed.worker.triton_core_operator import (
+    TritonCoreOperator as TritonCoreOperator,
+)
 from triton_distributed.worker.worker import Worker as Worker
 from triton_distributed.worker.worker import WorkerConfig as WorkerConfig
diff --git a/worker/src/python/triton_distributed/worker/deployment.py b/worker/src/python/triton_distributed/worker/deployment.py
index 9f3bf14..016fc5c 100644
--- a/worker/src/python/triton_distributed/worker/deployment.py
+++ b/worker/src/python/triton_distributed/worker/deployment.py
@@ -13,33 +13,119 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import multiprocessing
+from pprint import pformat
+from typing import Optional, Type
 
+from triton_distributed.icp import (
+    DataPlane,
+    NatsRequestPlane,
+    NatsServer,
+    RequestPlane,
+    UcpDataPlane,
+)
+from triton_distributed.worker.log_formatter import setup_logger
 from triton_distributed.worker.worker import Worker, WorkerConfig
+from tritonserver import InvalidArgumentError
+
+LOGGER_NAME = __name__
 
 
 class Deployment:
-    def __init__(self, worker_configs: list[WorkerConfig]):
+    def __init__(
+        self,
+        worker_configs: list[WorkerConfig | tuple[WorkerConfig, int]],
+        log_level=3,
+        initialize_request_plane=False,
+        initialize_data_plane=False,
+        request_plane_args: Optional[tuple[list, dict]] = None,
+        request_plane: Optional[Type[RequestPlane]] = NatsRequestPlane,
+        data_plane: Optional[Type[DataPlane]] = UcpDataPlane,
+        data_plane_args: Optional[tuple[list, dict]] = None,
+        log_dir="logs",
+        starting_metrics_port=50000,
+    ):
         self._process_context = multiprocessing.get_context("spawn")
         self._worker_configs = worker_configs
         self._workers: list[multiprocessing.context.SpawnProcess] = []
+        self._logger = setup_logger(log_level, LOGGER_NAME)
+        self._default_request_plane = request_plane
+        self._default_request_plane_args = request_plane_args
+        self._default_data_plane = data_plane
+        self._default_data_plane_args = data_plane_args
+        self._initialize_request_plane = initialize_request_plane
+        self._initialize_data_plane = initialize_data_plane
+        self.request_plane_server: Optional[NatsServer] = None
+        self._default_log_dir = log_dir
+        self._default_log_level = log_level
+        self._starting_metrics_port = starting_metrics_port
 
     @staticmethod
     def _start_worker(worker_config):
         Worker(worker_config).start()
 
     def start(self):
+        if self._initialize_request_plane:
+            if self._default_request_plane == NatsRequestPlane:
+                self.request_plane_server = NatsServer(log_dir=self._default_log_dir)
+            else:
+                raise InvalidArgumentError(
+                    f"Unknown Request Plane Type, cannot initialize {self._default_request_plane}"
+                )
+
         for worker_config in self._worker_configs:
-
self._workers.append( - self._process_context.Process( - target=Deployment._start_worker, - name=worker_config.name, - args=[worker_config], + worker_instances = 1 + if isinstance(worker_config, tuple): + worker_instances = worker_config[1] + worker_config = worker_config[0] + + base_name = worker_config.name + base_port = worker_config.metrics_port + + if not base_port and self._starting_metrics_port: + base_port = self._starting_metrics_port + self._starting_metrics_port += worker_instances + + request_plane_args, request_plane_kwargs = worker_config.request_plane_args + + if not request_plane_args and not request_plane_kwargs: + if self._default_request_plane_args: + worker_config.request_plane_args = self._default_request_plane_args + elif self.request_plane_server: + worker_config.request_plane_args = ( + [self.request_plane_server.url], + {}, + ) + + if not worker_config.log_dir: + worker_config.log_dir = self._default_log_dir + + if not worker_config.log_level: + worker_config.log_level = self._default_log_level + + for index in range(worker_instances): + worker_config.name = f"{base_name}.{index}" + worker_config.metrics_port = base_port + index + self._workers.append( + self._process_context.Process( + target=Deployment._start_worker, + name=worker_config.name, + args=[worker_config], + ) ) - ) - self._workers[-1].start() + self._logger.info( + "\n\nStarting Worker:\n\n\tConfig:\n\t%s\n\t%s\n", + pformat(worker_config), + self._workers[-1], + ) + self._workers[-1].start() + + def stop(self): + return self.shutdown() def shutdown(self, join=True, timeout=10): + exit_code = 0 for worker in self._workers: + self._logger.info("\n\nStopping Worker:\n\n\n\t%s\n", worker) worker.terminate() if join: for worker in self._workers: @@ -47,4 +133,8 @@ def shutdown(self, join=True, timeout=10): for worker in self._workers: if worker.is_alive(): worker.kill() - worker.join(timeout) + worker.join(timeout) + self._logger.info("\n\nWorker Stopped:\n\n\n\t%s\n", worker) + if worker.exitcode is not None: + exit_code += worker.exitcode + return exit_code diff --git a/worker/src/python/triton_distributed/worker/remote_operator.py b/worker/src/python/triton_distributed/worker/remote_operator.py index 9ea78e0..6c58702 100644 --- a/worker/src/python/triton_distributed/worker/remote_operator.py +++ b/worker/src/python/triton_distributed/worker/remote_operator.py @@ -29,14 +29,17 @@ class RemoteOperator: def __init__( self, - name: str, - version: int, + operator: str | tuple[str, int], request_plane: RequestPlane, data_plane: DataPlane, component_id: Optional[uuid.UUID] = None, ): - self.name = name - self.version = version + if isinstance(operator, str): + self.name = operator + self.version = 1 + else: + self.name = operator[0] + self.version = operator[1] self._request_plane = request_plane self._data_plane = data_plane self.component_id = component_id diff --git a/worker/src/python/triton_distributed/worker/worker.py b/worker/src/python/triton_distributed/worker/worker.py index 9f3646f..8952ef7 100644 --- a/worker/src/python/triton_distributed/worker/worker.py +++ b/worker/src/python/triton_distributed/worker/worker.py @@ -16,7 +16,6 @@ import asyncio import importlib import logging -import multiprocessing import os import pathlib import signal @@ -50,7 +49,7 @@ class WorkerConfig: data_plane: Type[DataPlane] = UcpDataPlane request_plane_args: tuple[list, dict] = field(default_factory=lambda: ([], {})) data_plane_args: tuple[list, dict] = field(default_factory=lambda: ([], {})) - log_level: int = 0 + 
log_level: Optional[int] = None operators: list[OperatorConfig] = field(default_factory=list) triton_log_path: Optional[str] = None name: str = str(uuid.uuid1()) @@ -75,6 +74,8 @@ def __init__( self._triton_log_path = config.triton_log_path self._name = config.name self._log_level = config.log_level + if self._log_level is None: + self._log_level = 0 self._operator_configs = config.operators self._log_dir = config.log_dir @@ -87,6 +88,7 @@ def __init__( self._operators: dict[tuple[str, int], Operator] = {} self._metrics_port = config.metrics_port self._metrics_server: Optional[uvicorn.Server] = None + self._component_id = self._request_plane.component_id def _import_operators(self): for operator_config in self._operator_configs: @@ -225,6 +227,7 @@ async def _initialize_request_handlers(self): await asyncio.gather(*handlers) async def serve(self): + error = None self._triton_core = tritonserver.Server( model_repository=".", log_error=True, @@ -258,6 +261,7 @@ async def serve(self): except Exception as e: logger.exception("Encountered an error in worker: %s", e) self._stop_requested = True + error = e logger.info("worker store: %s", list(self._data_plane._tensor_store.keys())) logger.info("Worker stopped...") logger.info( @@ -272,6 +276,7 @@ async def serve(self): if self._metrics_server: self._metrics_server.should_exit = True await self._metrics_server.shutdown() + return error async def shutdown(self, signal): logger.info("Received exit signal %s...", signal.name) @@ -326,13 +331,20 @@ async def _wait_for_tasks(self, loop): loop.stop() def start(self): + exit_condition = None + if self._log_dir: + pid = os.getpid() os.makedirs(self._log_dir, exist_ok=True) - stdout_path = os.path.join(self._log_dir, f"{self._name}.stdout.log") - stderr_path = os.path.join(self._log_dir, f"{self._name}.stderr.log") + stdout_path = os.path.join( + self._log_dir, f"{self._name}.{self._component_id}.{pid}.stdout.log" + ) + stderr_path = os.path.join( + self._log_dir, f"{self._name}.{self._component_id}.{pid}.stderr.log" + ) if not self._triton_log_path: self._triton_log_path = os.path.join( - self._log_dir, f"{self._name}.triton.log" + self._log_dir, f"{self._name}.{self._component_id}.{pid}.triton.log" ) sys.stdout = open(stdout_path, "w", buffering=1) sys.stderr = open(stderr_path, "w", buffering=1) @@ -349,55 +361,34 @@ def start(self): loop.add_signal_handler( sig, lambda s=sig: asyncio.create_task(self.shutdown(s)) # type: ignore ) + serve_result = None try: if self._metrics_port: - loop.create_task(self.serve()) + serve_result = loop.create_task(self.serve()) self._metrics_server = self._setup_metrics_server() assert self._metrics_server, "Unable to start metrics server" loop.run_until_complete(self._metrics_server.serve()) else: - loop.run_until_complete(self.serve()) + serve_result = loop.run_until_complete(self.serve()) except asyncio.CancelledError: - pass logger.info("Worker cancelled!") finally: loop.run_until_complete(self._wait_for_tasks(loop)) loop.close() logger.info("Successfully shutdown worker.") + if isinstance(serve_result, asyncio.Task): + exit_condition = serve_result.result() + else: + exit_condition = serve_result + sys.stdout.flush() sys.stderr.flush() + if self._log_dir: sys.stdout.close() sys.stderr.close() - -class Deployment: - def __init__(self, worker_configs: list[WorkerConfig]): - self._process_context = multiprocessing.get_context("spawn") - self._worker_configs = worker_configs - self._workers: list[multiprocessing.context.SpawnProcess] = [] - - @staticmethod - def 
_start_worker(worker_config): - Worker(worker_config).start() - - def start(self): - for worker_config in self._worker_configs: - self._workers.append( - self._process_context.Process( - target=Deployment._start_worker, - name=worker_config.name, - args=[worker_config], - ) - ) - - def shutdown(self, join=True, timeout=10): - for worker in self._workers: - worker.terminate() - if join: - for worker in self._workers: - worker.join(timeout) - for worker in self._workers: - if worker.is_alive(): - worker.kill() - worker.join(timeout) + if exit_condition is not None: + sys.exit(1) + else: + sys.exit(0) diff --git a/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/1/model.py b/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/1/model.py index c80fdf1..f6c6a95 100644 --- a/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/1/model.py +++ b/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/1/model.py @@ -1,28 +1,17 @@ -# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# * Neither the name of NVIDIA CORPORATION nor the names of its -# contributors may be used to endorse or promote products derived -# from this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY -# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY -# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
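+
+# This mock model forwards requests to a remote worker over the request
+# plane. RemoteOperator is constructed as (name, request_plane, data_plane);
+# the operator version defaults to 1 (see remote_operator.py).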
diff --git a/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/1/model.py b/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/1/model.py
index c80fdf1..f6c6a95 100644
--- a/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/1/model.py
+++ b/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/1/model.py
@@ -1,28 +1,17 @@
-# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#  * Redistributions of source code must retain the above copyright
-#    notice, this list of conditions and the following disclaimer.
-#  * Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions and the following disclaimer in the
-#    documentation and/or other materials provided with the distribution.
-#  * Neither the name of NVIDIA CORPORATION nor the names of its
-#    contributors may be used to endorse or promote products derived
-#    from this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
-# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
-# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

 import asyncio
 import gc
@@ -162,7 +151,7 @@ def initialize(self, args):
             "string_value"
         ]
         self._remote_operator = RemoteOperator(
-            self._remote_worker_name, 1, self._request_plane, self._data_plane
+            self._remote_worker_name, self._request_plane, self._data_plane
         )

     # Starting the response thread. It allows API Server to keep making progress while
diff --git a/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/config.pbtxt b/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/config.pbtxt
index 1fc2916..5c68b52 100644
--- a/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/config.pbtxt
+++ b/worker/tests/python/integration/api_server/models/mock_disaggregated_serving/config.pbtxt
@@ -1,28 +1,17 @@
-# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#  * Redistributions of source code must retain the above copyright
-#    notice, this list of conditions and the following disclaimer.
-#  * Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions and the following disclaimer in the
-#    documentation and/or other materials provided with the distribution.
-#  * Neither the name of NVIDIA CORPORATION nor the names of its
-#    contributors may be used to endorse or promote products derived
-#    from this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
-# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
-# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

 name: "mock_disaggregated_serving"
 backend: "python"
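Throughout this patch the `RemoteOperator` constructor loses the fixed version argument that previously sat between the operator name and the planes; call sites now pass `(name, request_plane, data_plane)`. The sketch below shows the updated client-side call shape. The import paths and the port value are assumptions for illustration; the call shapes themselves are taken from the tests in this patch, and the data plane is assumed to be constructed and connected elsewhere:

```
# Assumed import paths, for illustration only.
from triton_distributed.icp.nats_request_plane import NatsRequestPlane
from triton_distributed.worker.remote_operator import RemoteOperator

NATS_PORT = 4222  # assumed value; the tests take this from their fixtures


async def make_identity_operator(data_plane):
    # Connect to the request plane as the tests do, then wrap the remote
    # model. Before this patch the equivalent call was
    # RemoteOperator("identity", 1, request_plane, data_plane).
    request_plane = NatsRequestPlane(f"nats://localhost:{NATS_PORT}")
    await request_plane.connect()
    return RemoteOperator("identity", request_plane, data_plane)
```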
name: "mock_disaggregated_serving" backend: "python" diff --git a/worker/tests/python/integration/operators/add_multiply_divide.py b/worker/tests/python/integration/operators/add_multiply_divide.py index b1c982e..631f259 100644 --- a/worker/tests/python/integration/operators/add_multiply_divide.py +++ b/worker/tests/python/integration/operators/add_multiply_divide.py @@ -35,14 +35,12 @@ def __init__( self._request_plane = request_plane self._data_plane = data_plane self._parameters = parameters - self._add_model = RemoteOperator( - "add", 1, self._request_plane, self._data_plane - ) + self._add_model = RemoteOperator("add", self._request_plane, self._data_plane) self._multiply_model = RemoteOperator( - "multiply", 1, self._request_plane, self._data_plane + "multiply", self._request_plane, self._data_plane ) self._divide_model = RemoteOperator( - "divide", 1, self._request_plane, self._data_plane + "divide", self._request_plane, self._data_plane ) async def execute(self, requests: list[RemoteInferenceRequest]): diff --git a/worker/tests/python/integration/operators/mock_disaggregated_serving.py b/worker/tests/python/integration/operators/mock_disaggregated_serving.py index 5539565..d228883 100644 --- a/worker/tests/python/integration/operators/mock_disaggregated_serving.py +++ b/worker/tests/python/integration/operators/mock_disaggregated_serving.py @@ -37,16 +37,16 @@ def __init__( self._data_plane = data_plane self._params = params self._preprocessing_model = RemoteOperator( - "preprocessing", 1, self._request_plane, self._data_plane + "preprocessing", self._request_plane, self._data_plane ) self._context_model = RemoteOperator( - "context", 1, self._request_plane, self._data_plane + "context", self._request_plane, self._data_plane ) self._generate_model = RemoteOperator( - "generation", 1, self._request_plane, self._data_plane + "generation", self._request_plane, self._data_plane ) self._postprocessing_model = RemoteOperator( - "postprocessing", 1, self._request_plane, self._data_plane + "postprocessing", self._request_plane, self._data_plane ) self._logger = logger diff --git a/worker/tests/python/integration/test_add_multiply_divide.py b/worker/tests/python/integration/test_add_multiply_divide.py index 669210b..18f7fdc 100644 --- a/worker/tests/python/integration/test_add_multiply_divide.py +++ b/worker/tests/python/integration/test_add_multiply_divide.py @@ -160,7 +160,7 @@ async def post_requests(num_requests, store_inputs_in_request): await request_plane.connect() add_multiply_divide_operator = RemoteOperator( - "add_multiply_divide", 1, request_plane, data_plane + "add_multiply_divide", request_plane, data_plane ) results = [] diff --git a/worker/tests/python/integration/test_direct.py b/worker/tests/python/integration/test_direct.py index 6be761f..c031ba7 100644 --- a/worker/tests/python/integration/test_direct.py +++ b/worker/tests/python/integration/test_direct.py @@ -115,7 +115,7 @@ async def post_requests(num_requests, num_targets): request_plane = NatsRequestPlane(f"nats://localhost:{NATS_PORT}") await request_plane.connect() - identity_operator = RemoteOperator("identity", 1, request_plane, data_plane) + identity_operator = RemoteOperator("identity", request_plane, data_plane) target_components = set() target_component_list: list[uuid.UUID] = [] diff --git a/worker/tests/python/integration/test_mock_disaggregated_serving.py b/worker/tests/python/integration/test_mock_disaggregated_serving.py index c56389c..c23e1df 100644 --- 
diff --git a/worker/tests/python/integration/test_mock_disaggregated_serving.py b/worker/tests/python/integration/test_mock_disaggregated_serving.py
index c56389c..c23e1df 100644
--- a/worker/tests/python/integration/test_mock_disaggregated_serving.py
+++ b/worker/tests/python/integration/test_mock_disaggregated_serving.py
@@ -156,7 +156,7 @@ async def post_requests(num_requests):
     await request_plane.connect()

     mock_disaggregated_serving_operator = RemoteOperator(
-        "mock_disaggregated_serving", 1, request_plane, data_plane
+        "mock_disaggregated_serving", request_plane, data_plane
     )

     expected_results = {}
diff --git a/worker/tests/python/integration/test_perf_benchmark.py b/worker/tests/python/integration/test_perf_benchmark.py
index 7d1c722..d54c376 100644
--- a/worker/tests/python/integration/test_perf_benchmark.py
+++ b/worker/tests/python/integration/test_perf_benchmark.py
@@ -133,7 +133,7 @@ def run(
     asyncio.get_event_loop().run_until_complete(request_plane.connect())

     identity_operator = RemoteOperator(
-        "identity", 1, request_plane, data_plane_tracker._data_plane
+        "identity", request_plane, data_plane_tracker._data_plane
    )

     inputs, outputs = _create_inputs(1, tensor_size_in_kb)