Commit 4698c0f4 authored by Neelay Shah's avatar Neelay Shah Committed by GitHub
Browse files

feat: hello world


Co-authored-by: default avatarPiotr Marcinkiewicz <piotrm@nvidia.com>
Co-authored-by: default avatarTanmay Verma <tanmay2592@gmail.com>
parent e6c12674
......@@ -15,7 +15,6 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
# Triton Distributed
<h4> A Datacenter Scale Distributed Inference Serving Framework </h4>
......@@ -84,14 +83,3 @@ HF_TOKEN```) and mounts common directories such as ```/tmp:/tmp```,
Please see the corresponding example for its specific deployment
instructions.
<!--
## Goals
## Concepts
## Examples
-->
......@@ -133,7 +133,7 @@ COPY . /workspace
RUN /workspace/icp/protos/gen_python.sh
# Sets pythonpath for python modules
ENV PYTHONPATH="${PYTHONPATH}:/workspace/icp/src/python:/workspace/worker/src/python"
ENV PYTHONPATH="${PYTHONPATH}:/workspace/icp/src/python:/workspace/worker/src/python:/workspace/examples"
# Command and Entrypoint
CMD []
......
<!--
SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import signal
import sys
import time
from typing import Optional
from .client import _start_client
from .parser import parse_args
# Handle to the spawned client processes; populated by main() so the
# signal handler below can tear them down on SIGHUP/SIGTERM/SIGINT.
processes: Optional[list[multiprocessing.context.SpawnProcess]] = None


def handler(signum, frame):
    """Signal handler: stop all client processes and exit.

    Exits with the sum of the child exit codes so any non-zero child
    failure is propagated to this process's exit status.
    """
    exit_code = 0
    if processes:
        print("Stopping Clients")
        for process in processes:
            # terminate() sends SIGTERM; kill() follows up with SIGKILL so
            # the child is guaranteed dead before we join it.
            process.terminate()
            process.kill()
            process.join()
            if process.exitcode is not None:
                exit_code += process.exitcode
    print(f"Clients Stopped Exit Code {exit_code}")
    sys.exit(exit_code)


# Install the handler for the common termination signals. Some signals may
# not be available/settable on every platform (e.g. SIGHUP on Windows), so
# registration failures are deliberately ignored.
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for sig in signals:
    try:
        signal.signal(sig, handler)
    except Exception:
        pass
def main(args):
    """Launch ``args.clients`` client processes and wait for them to finish.

    Each child runs ``_start_client``; a spawn context is used so the
    children get a clean interpreter. Returns the sum of the child exit
    codes (0 means every client succeeded).
    """
    global processes
    spawn_ctx = multiprocessing.get_context("spawn")
    # Shared lock used by the children to serialize tqdm progress output.
    args.lock = spawn_ctx.Lock()
    processes = []
    start_time = time.time()
    for client_index in range(args.clients):
        worker = spawn_ctx.Process(target=_start_client, args=(client_index, args))
        processes.append(worker)
        worker.start()
    for worker in processes:
        worker.join()
    end_time = time.time()
    print(
        f"Throughput: {(args.requests_per_client*args.clients)/(end_time-start_time)} Total Time: {end_time-start_time}"
    )
    # Aggregate child statuses; any non-zero child makes the total non-zero.
    exit_code = sum(
        worker.exitcode for worker in processes if worker.exitcode is not None
    )
    print(f"Clients Stopped Exit Code {exit_code}")
    return exit_code


if __name__ == "__main__":
    args = parse_args()
    sys.exit(main(args))
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import sys
import cupy
import numpy
from tqdm import tqdm
from triton_distributed.icp import NatsRequestPlane, UcpDataPlane
from triton_distributed.worker import RemoteOperator
from tritonserver import MemoryType
def _get_input_sizes(args):
return numpy.maximum(
0,
numpy.round(
numpy.random.normal(
loc=args.input_size_mean,
scale=args.input_size_stdev,
size=args.requests_per_client,
)
),
).astype(int)
def _start_client(client_index, args):
    """Process entry point: run one async client and exit with its status."""
    sys.exit(asyncio.run(client(client_index, args)))
async def client(client_index, args):
    """Send ``args.requests_per_client`` requests and validate the responses.

    Connects to the request plane (NATS) and data plane (UCX), fires all
    requests at ``args.operator`` up front, then drains the responses and
    asserts each output equals the corresponding input (the example
    operators echo the input back). Returns 0 on success, 1 on any error.
    """
    request_count = args.requests_per_client
    try:
        request_plane = NatsRequestPlane(args.request_plane_uri)
        data_plane = UcpDataPlane()
        await request_plane.connect()
        data_plane.connect()

        remote_operator: RemoteOperator = RemoteOperator(
            args.operator, request_plane, data_plane
        )

        # Pre-generate one random int array per request.
        input_sizes = _get_input_sizes(args)
        inputs = [
            numpy.array(numpy.random.randint(0, 100, input_sizes[index]))
            for index in range(request_count)
        ]

        # Shared lock keeps the per-client progress bars from interleaving
        # when several client processes render at once.
        tqdm.set_lock(args.lock)
        with tqdm(
            total=args.requests_per_client,
            desc=f"Client: {client_index}",
            unit="request",
            position=client_index,
            leave=False,
        ) as pbar:
            # Issue every request before consuming any responses.
            requests = [
                await remote_operator.async_infer(
                    inputs={"input": inputs[index]}, request_id=str(index)
                )
                for index in range(request_count)
            ]
            for request in requests:
                async for response in request:
                    for output_name, output_value in response.outputs.items():
                        # Outputs may land in CPU or GPU memory; validate
                        # with the matching array library.
                        if output_value.memory_type == MemoryType.CPU:
                            output = numpy.from_dlpack(output_value)
                            numpy.testing.assert_array_equal(
                                output, inputs[int(response.request_id)]
                            )
                        else:
                            output = cupy.from_dlpack(output_value)
                            cupy.testing.assert_array_equal(
                                output, inputs[int(response.request_id)]
                            )
                        # Drop the remote tensor reference promptly.
                        del output_value
                    pbar.set_description(
                        f"Client: {client_index} Received Response: {response.request_id} From: {response.component_id} Error: {response.error}"
                    )
                    pbar.update(1)
                    del response
        await request_plane.close()
        data_plane.close()
    except Exception as e:
        print(f"Exception: {e}")
        return 1
    else:
        return 0
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
def parse_args(args=None):
    """Build and parse the hello-world client command line.

    Pass ``args`` as a list to parse programmatically; ``None`` reads
    ``sys.argv``.
    """
    p = argparse.ArgumentParser(description="Hello World Client")
    p.add_argument(
        "--request-plane-uri",
        type=str,
        default="nats://localhost:4223",
        help="URI of request plane",
    )
    p.add_argument(
        "--requests-per-client",
        type=int,
        default=100,
        help="number of requests to send per client",
    )
    p.add_argument(
        "--operator",
        type=str,
        choices=["encoder_decoder", "encoder", "decoder"],
        default="encoder_decoder",
        help="operator to send requests to. In this example all operators have the same input and output names.",
    )
    p.add_argument(
        "--input-size-mean",
        type=int,
        default=1000,
        help="average input size for requests",
    )
    p.add_argument(
        "--input-size-stdev",
        type=float,
        default=0,
        help="standard deviation for input size",
    )
    p.add_argument(
        "--clients", type=int, default=1, help="number of concurrent clients to launch."
    )
    return p.parse_args(args)
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import shutil
import signal
import sys
import time
from pathlib import Path
from triton_distributed.worker import (
Deployment,
OperatorConfig,
TritonCoreOperator,
WorkerConfig,
)
from .parser import parse_args
# Populated by main(); lets the signal handler below stop the deployment.
deployment = None


def handler(signum, frame):
    """Signal handler: stop all workers and exit with their status."""
    exit_code = 0
    if deployment:
        print("Stopping Workers")
        exit_code = deployment.stop()
    print(f"Workers Stopped Exit Code {exit_code}")
    sys.exit(exit_code)


# Install the handler for common termination signals; signals a platform
# does not support are deliberately skipped.
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for sig in signals:
    try:
        signal.signal(sig, handler)
    except Exception:
        pass
def _create_encoder_decoder_op(name, max_inflight_requests, args):
    """Build the OperatorConfig for the Python EncodeDecodeOperator.

    The implementation is looked up by class name inside
    ``args.operator_repository``.
    """
    op_config = OperatorConfig(
        name=name,
        implementation="EncodeDecodeOperator",
        max_inflight_requests=int(max_inflight_requests),
        repository=args.operator_repository,
    )
    return op_config
def _create_triton_core_op(
    name,
    max_inflight_requests,
    instances_per_worker,
    kind,
    delay_per_token,
    input_copies,
    args,
):
    """Build an OperatorConfig for a Triton-core-backed operator.

    ``kind`` is "CPU" or "GPU" and becomes the instance-group KIND_*;
    ``delay_per_token`` and ``input_copies`` are passed to the model as
    string-valued config parameters.
    """
    instance_group = [
        {"count": int(instances_per_worker), "kind": f"KIND_{kind}"}
    ]
    model_parameters = {
        "delay": {"string_value": f"{delay_per_token}"},
        "input_copies": {"string_value": f"{input_copies}"},
    }
    model_config = {
        "instance_group": instance_group,
        "parameters": model_parameters,
    }
    return OperatorConfig(
        name=name,
        repository=args.triton_core_models,
        implementation=TritonCoreOperator,
        max_inflight_requests=int(max_inflight_requests),
        parameters={"config": model_config},
    )
async def main(args):
    """Build and start the hello-world deployment, then block until killed.

    Worker counts and limits come from the parsed command line
    (``--encoders``, ``--decoders``, ``--encoder-decoders``). The running
    deployment is stored in the module-level ``deployment`` so the SIGTERM/
    SIGINT handler can stop it; this coroutine never returns on its own.
    """
    global deployment
    log_dir = Path(args.log_dir)
    if args.clear_logs:
        shutil.rmtree(log_dir)
    log_dir.mkdir(exist_ok=True)

    (
        encoder_worker_instances,
        encoder_max_inflight_requests,
        encoder_instances_per_worker,
        encoder_device_kind,
    ) = args.encoders
    (
        decoder_worker_instances,
        decoder_max_inflight_requests,
        decoder_instances_per_worker,
        decoder_device_kind,
    ) = args.decoders
    (
        encoder_decoder_worker_instances,
        encoder_decoder_max_inflight_requests,
    ) = args.encoder_decoders

    encoder_op = _create_triton_core_op(
        name="encoder",
        max_inflight_requests=encoder_max_inflight_requests,
        instances_per_worker=encoder_instances_per_worker,
        kind=encoder_device_kind,
        delay_per_token=args.encoder_delay_per_token,
        input_copies=args.encoder_input_copies,
        args=args,
    )
    encoder = WorkerConfig(
        operators=[encoder_op],
        name="encoder",
    )

    decoder_op = _create_triton_core_op(
        name="decoder",
        max_inflight_requests=decoder_max_inflight_requests,
        instances_per_worker=decoder_instances_per_worker,
        kind=decoder_device_kind,
        delay_per_token=args.decoder_delay_per_token,
        # NOTE: only --encoder-input-copies exists on the command line; the
        # decoder reuses it so both stages agree on the copy count.
        input_copies=args.encoder_input_copies,
        args=args,
    )
    decoder = WorkerConfig(
        operators=[decoder_op],
        name="decoder",
    )

    encoder_decoder_op = _create_encoder_decoder_op(
        name="encoder_decoder",
        max_inflight_requests=encoder_decoder_max_inflight_requests,
        args=args,
    )
    encoder_decoder = WorkerConfig(
        operators=[encoder_decoder_op],
        name="encoder_decoder",
    )

    print("Starting Workers")
    deployment = Deployment(
        [
            # (worker_config, repeat_count)
            # Bug fix: the encoder repeat count previously used
            # encoder_decoder_worker_instances, silently ignoring the worker
            # count given via --encoders.
            (encoder, int(encoder_worker_instances)),
            (decoder, int(decoder_worker_instances)),
            (encoder_decoder, int(encoder_decoder_worker_instances)),
        ],
        initialize_request_plane=args.initialize_request_plane,
        log_dir=args.log_dir,
        log_level=args.log_level,
        starting_metrics_port=args.starting_metrics_port,
    )
    deployment.start()
    print("Workers started ... press Ctrl-C to Exit")
    # Idle forever; shutdown is driven entirely by the signal handler.
    while True:
        time.sleep(10)


if __name__ == "__main__":
    args = parse_args()
    asyncio.run(main(args))
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from pathlib import Path
def parse_args(args=None):
    """Build and parse the hello-world deployment command line.

    Default repository/log paths are resolved relative to the package
    directory that contains this module.
    """
    example_root = Path(__file__).parent.absolute().parent.absolute()
    default_log_dir = example_root.joinpath("logs")
    default_operator_repository = example_root.joinpath("operators")
    default_triton_core_models = default_operator_repository.joinpath(
        "triton_core_models"
    )

    p = argparse.ArgumentParser(description="Hello World Deployment")
    p.add_argument(
        "--initialize-request-plane",
        default=False,
        action="store_true",
        help="Initialize the request plane, should only be done once per deployment",
    )
    p.add_argument(
        "--log-dir",
        type=str,
        default=str(default_log_dir),
        help="log dir folder",
    )
    p.add_argument(
        "--clear-logs", default=False, action="store_true", help="clear log directory"
    )
    p.add_argument(
        "--log-level", type=int, default=1, help="log level applied to all workers"
    )
    p.add_argument(
        "--request-plane-uri",
        type=str,
        default="nats://localhost:4223",
        help="URI of request plane",
    )
    p.add_argument(
        "--starting-metrics-port",
        type=int,
        default=50000,
        help="Metrics port for first worker. Each worker will expose metrics on subsequent ports, ex. worker 1: 50000, worker 2: 50001, worker 3: 50002",
    )
    p.add_argument(
        "--operator-repository",
        type=str,
        default=str(default_operator_repository),
        help="operator repository",
    )
    p.add_argument(
        "--triton-core-models",
        type=str,
        default=str(default_triton_core_models),
        help="model repository for triton core models.",
    )
    p.add_argument(
        "--encoder-delay-per-token",
        type=float,
        default=0,
        help="Delay per input token. In this toy example can be used to vary the simulated compute load for encoding stage.",
    )
    p.add_argument(
        "--encoder-input-copies",
        type=int,
        default=1,
        help="Number of copies of input to create during encoding. In this toy example can be used to vary the memory transferred between encoding and decoding stages.",
    )
    p.add_argument(
        "--encoders",
        type=str,
        nargs=4,
        default=["1", "1", "1", "CPU"],
        help="Number of encoding workers to deploy. Specified as #Workers, #MaxInflightRequests, #ModelInstancesPerWorker, CPU || GPU",
    )
    p.add_argument(
        "--decoders",
        type=str,
        nargs=4,
        default=["1", "1", "1", "CPU"],
        help="Number of decoding workers to deploy. Specified as #Workers, #MaxInflightRequests,#ModelInstancesPerWorker, CPU || GPU",
    )
    p.add_argument(
        "--decoder-delay-per-token",
        type=float,
        default=0,
        help="Delay per input token. In this toy example can be used to vary the simulated compute load for decoding stage.",
    )
    p.add_argument(
        "--encoder-decoders",
        type=str,
        nargs=2,
        default=["1", "1"],
        help="Number of encode-decode workers to deploy. Specified as #Worker, #MaxInflightRequests",
    )
    return p.parse_args(args)
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from hello_world.operators.encoder_decoder import (
EncodeDecodeOperator as EncodeDecodeOperator,
)
import numpy
from triton_distributed.worker import Operator, RemoteInferenceRequest, RemoteOperator
class EncodeDecodeOperator(Operator):
    """Operator that chains the remote "encoder" and "decoder" operators.

    Each request's input is sent to the encoder; every encoded result is
    forwarded to the decoder together with the number of input copies the
    encoder produced, and the decoded output is returned to the caller.
    """

    def __init__(
        self,
        name,
        version,
        triton_core,
        request_plane,
        data_plane,
        parameters,
        repository,
        logger,
    ):
        # Remote handles to the sibling operators on the same planes.
        self._encoder = RemoteOperator("encoder", request_plane, data_plane)
        self._decoder = RemoteOperator("decoder", request_plane, data_plane)
        self._logger = logger

    async def execute(self, requests: list[RemoteInferenceRequest]):
        """Encode then decode each request, sending one final response each."""
        self._logger.info("got request!")
        for request in requests:
            encoded_responses = await self._encoder.async_infer(
                inputs={"input": request.inputs["input"]}
            )
            async for encoded_response in encoded_responses:
                # The encoder reports how many tiled copies it produced so
                # the decoder can undo the expansion.
                input_copies = int(
                    numpy.from_dlpack(encoded_response.outputs["input_copies"])
                )
                decoded_responses = await self._decoder.async_infer(
                    inputs={"input": encoded_response.outputs["output"]},
                    parameters={"input_copies": input_copies},
                )
                async for decoded_response in decoded_responses:
                    await request.response_sender().send(
                        final=True,
                        outputs={"output": decoded_response.outputs["output"]},
                    )
                    del decoded_response
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import time
import numpy
import triton_python_backend_utils as pb_utils
try:
import cupy
except Exception:
cupy = None
class TritonPythonModel:
    """Toy "decoder" model: bitwise-inverts the input and strips copies."""

    @staticmethod
    def auto_complete_config(auto_complete_model_config):
        """Auto-completes the model config.

        Model has one input and one output
        both of type int64.

        Parameters
        ----------
        auto_complete_model_config : config
            Enables reading and updating config.pbtxt
        """
        input_config = {
            "name": "input",
            "data_type": "TYPE_INT64",
            "dims": [-1],
            "optional": False,
        }
        output_config = {
            "name": "output",
            "data_type": "TYPE_INT64",
            "dims": [-1],
        }
        auto_complete_model_config.add_input(input_config)
        auto_complete_model_config.add_output(output_config)
        auto_complete_model_config.set_max_batch_size(0)
        auto_complete_model_config.set_model_transaction_policy({"decoupled": False})
        return auto_complete_model_config

    def initialize(self, args):
        """Read instance kind/device and config parameters (delay, input_copies)."""
        self._model_config = json.loads(args["model_config"])
        self._model_instance_kind = args["model_instance_kind"]
        self._model_instance_device_id = int(args["model_instance_device_id"])
        self._config_parameters = self._model_config.get("parameters", {})
        # Config parameters arrive as {"string_value": "..."}; fall back to
        # "5" copies and zero delay when not configured.
        self._input_copies = int(
            self._config_parameters.get("input_copies", {"string_value": "5"})[
                "string_value"
            ]
        )
        self._delay = float(
            self._config_parameters.get("delay", {"string_value": "0"})["string_value"]
        )
        if self._model_instance_kind == "GPU" and cupy is None:
            raise RuntimeError("GPU Device set but cupy not installed")

    def execute(self, requests):
        """Invert each input and down-sample by input_copies.

        Per-request parameters may override the configured input_copies and
        delay. NOTE(review): the [::input_copies] stride assumes it undoes
        the encoder's tiling; that round-trips exactly only for
        input_copies == 1 — confirm for larger values.
        """
        responses = []
        input_copies = self._input_copies
        delay = self._delay
        for request in requests:
            output_tensors = []
            parameters = json.loads(request.parameters())
            if parameters:
                input_copies = int(parameters.get("input_copies", self._input_copies))
                delay = float(parameters.get("delay", self._delay))
            for input_tensor in request.inputs():
                input_value = input_tensor.as_numpy()
                output_value = []
                if self._model_instance_kind == "GPU" and cupy is not None:
                    # Compute on the assigned GPU; export via DLPack to avoid
                    # a device-to-host copy.
                    with cupy.cuda.Device(self._model_instance_device_id):
                        input_value = cupy.array(input_value)
                        output_value = cupy.invert(input_value)
                        output_value = output_value[::input_copies]
                        output_tensor = pb_utils.Tensor.from_dlpack(
                            "output", output_value
                        )
                else:
                    output_value = numpy.invert(input_value)
                    output_value = output_value[::input_copies]
                    output_tensor = pb_utils.Tensor("output", output_value)
                output_tensors.append(output_tensor)
                # Simulated compute load proportional to output length.
                time.sleep(len(output_value) * delay)
            responses.append(pb_utils.InferenceResponse(output_tensors=output_tensors))
        return responses
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## Model Instance and Kind are filled in by configuration when launched
## All other values are filled in by auto_complete in model.py
backend: "python"
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import time
import numpy
import triton_python_backend_utils as pb_utils
try:
import cupy
except Exception:
cupy = None
class TritonPythonModel:
    """Toy "encoder" model: tiles the input input_copies times and inverts it."""

    @staticmethod
    def auto_complete_config(auto_complete_model_config):
        """Auto-completes the model config.

        Model has one input ("input") and two outputs ("output" plus the
        scalar "input_copies" used by the decoder), all of type int64.

        Parameters
        ----------
        auto_complete_model_config : config
            Enables reading and updating config.pbtxt
        """
        input_config = {
            "name": "input",
            "data_type": "TYPE_INT64",
            "dims": [-1],
            "optional": False,
        }
        output_config = {
            "name": "output",
            "data_type": "TYPE_INT64",
            "dims": [-1],
        }
        copies_config = {
            "name": "input_copies",
            "data_type": "TYPE_INT64",
            "dims": [1],
        }
        auto_complete_model_config.add_input(input_config)
        auto_complete_model_config.add_output(output_config)
        auto_complete_model_config.add_output(copies_config)
        auto_complete_model_config.set_max_batch_size(0)
        auto_complete_model_config.set_model_transaction_policy({"decoupled": False})
        return auto_complete_model_config

    def initialize(self, args):
        """Read instance kind/device and config parameters (delay, input_copies)."""
        self._model_config = json.loads(args["model_config"])
        self._model_instance_kind = args["model_instance_kind"]
        self._model_instance_device_id = int(args["model_instance_device_id"])
        self._config_parameters = self._model_config.get("parameters", {})
        # Config parameters arrive as {"string_value": "..."}; fall back to
        # "5" copies and zero delay when not configured.
        self._input_copies = int(
            self._config_parameters.get("input_copies", {"string_value": "5"})[
                "string_value"
            ]
        )
        self._delay = float(
            self._config_parameters.get("delay", {"string_value": "0"})["string_value"]
        )
        if self._model_instance_kind == "GPU" and cupy is None:
            raise RuntimeError("GPU Device set but cupy not installed")

    def execute(self, requests):
        """Tile each input input_copies times, invert it, and report the copy count.

        Per-request parameters may override the configured input_copies and
        delay. The "input_copies" output lets the downstream decoder know
        how much the data was expanded.
        """
        responses = []
        input_copies = self._input_copies
        delay = self._delay
        for request in requests:
            output_tensors = []
            parameters = json.loads(request.parameters())
            if parameters:
                input_copies = int(parameters.get("input_copies", self._input_copies))
                delay = float(parameters.get("delay", self._delay))
            for input_tensor in request.inputs():
                input_value = input_tensor.as_numpy()
                output_value = []
                if self._model_instance_kind == "GPU" and cupy is not None:
                    # Compute on the assigned GPU; export via DLPack to avoid
                    # a device-to-host copy.
                    with cupy.cuda.Device(self._model_instance_device_id):
                        input_value = cupy.array(input_value)
                        output_value = cupy.tile(input_value, input_copies)
                        output_value = cupy.invert(output_value)
                        output_tensor = pb_utils.Tensor.from_dlpack(
                            "output", output_value
                        )
                else:
                    output_value = numpy.tile(input_value, input_copies)
                    output_value = numpy.invert(output_value)
                    output_tensor = pb_utils.Tensor("output", output_value)
                output_tensors.append(output_tensor)
                output_tensors.append(
                    pb_utils.Tensor(
                        "input_copies", numpy.array(input_copies).astype("int64")
                    )
                )
                # Simulated compute load proportional to output length.
                time.sleep(len(output_value) * delay)
            responses.append(pb_utils.InferenceResponse(output_tensors=output_tensors))
        return responses
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## Model Instance and Kind are filled in by configuration when launched
## All other values are filled in by auto_complete in model.py
backend: "python"
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import shutil
import sys
from pathlib import Path
import cupy
import numpy
from tqdm import tqdm
from triton_distributed.icp.nats_request_plane import NatsRequestPlane
from triton_distributed.icp.ucp_data_plane import UcpDataPlane
from triton_distributed.worker import (
Deployment,
Operator,
OperatorConfig,
RemoteInferenceRequest,
RemoteOperator,
TritonCoreOperator,
WorkerConfig,
)
from tritonserver import MemoryType
class EncodeDecodeOperator(Operator):
    """Single-file variant: chains the remote "encoder" and "decoder" operators.

    Each request's input is sent to the encoder; every encoded result is
    forwarded to the decoder together with the number of input copies the
    encoder produced, and the decoded output is returned to the caller.
    """

    def __init__(
        self,
        name,
        version,
        triton_core,
        request_plane,
        data_plane,
        parameters,
        repository,
        logger,
    ):
        # Remote handles to the sibling operators on the same planes.
        self._encoder = RemoteOperator("encoder", request_plane, data_plane)
        self._decoder = RemoteOperator("decoder", request_plane, data_plane)
        self._logger = logger

    async def execute(self, requests: list[RemoteInferenceRequest]):
        """Encode then decode each request, sending one final response each."""
        for request in requests:
            self._logger.info("got request!")
            encoded_responses = await self._encoder.async_infer(
                inputs={"input": request.inputs["input"]}
            )
            async for encoded_response in encoded_responses:
                # The encoder reports how many tiled copies it produced so
                # the decoder can undo the expansion.
                input_copies = int(
                    numpy.from_dlpack(encoded_response.outputs["input_copies"])
                )
                decoded_responses = await self._decoder.async_infer(
                    inputs={"input": encoded_response.outputs["output"]},
                    parameters={"input_copies": input_copies},
                )
                async for decoded_response in decoded_responses:
                    await request.response_sender().send(
                        final=True,
                        outputs={"output": decoded_response.outputs["output"]},
                    )
                    del decoded_response
async def send_requests(nats_server_url, request_count=10):
    """Send ``request_count`` echo requests and validate the responses.

    Connects to the request plane (NATS) and data plane (UCX), sends random
    int64 arrays to the "encoder_decoder" operator, and asserts every output
    equals its input.
    """
    request_plane = NatsRequestPlane(nats_server_url)
    data_plane = UcpDataPlane()
    await request_plane.connect()
    data_plane.connect()

    remote_operator: RemoteOperator = RemoteOperator(
        "encoder_decoder", request_plane, data_plane
    )

    # Pre-generate one random 10000-element int64 array per request.
    inputs = [
        numpy.array(numpy.random.randint(0, 100, 10000)).astype("int64")
        for _ in range(request_count)
    ]
    with tqdm(total=request_count, desc="Sending Requests", unit="request") as pbar:
        # Issue every request before consuming any responses.
        requests = [
            await remote_operator.async_infer(
                inputs={"input": inputs[index]}, request_id=str(index)
            )
            for index in range(request_count)
        ]
        for request in requests:
            async for response in request:
                for output_name, output_value in response.outputs.items():
                    # Outputs may land in CPU or GPU memory; validate with
                    # the matching array library.
                    if output_value.memory_type == MemoryType.CPU:
                        output = numpy.from_dlpack(output_value)
                        numpy.testing.assert_array_equal(
                            output, inputs[int(response.request_id)]
                        )
                    else:
                        output = cupy.from_dlpack(output_value)
                        cupy.testing.assert_array_equal(
                            output, inputs[int(response.request_id)]
                        )
                    # Drop the remote tensor reference promptly.
                    del output_value
                pbar.set_description(
                    f"Finished Request: {response.request_id} Response From: {response.component_id} Error: {response.error}"
                )
                pbar.update(1)
                del response
    await request_plane.close()
    data_plane.close()
async def main():
    """Deploy encoder/decoder/encoder_decoder workers, run the clients, stop.

    Self-contained demo: builds the three operator configs, starts one
    worker of each, sends validation requests, then exits with the
    deployment's aggregated stop status.
    """
    module_dir = Path(__file__).parent.absolute()
    log_dir = module_dir.joinpath("logs")
    # Start with a clean log directory on every run.
    if log_dir.is_dir():
        shutil.rmtree(log_dir)
    log_dir.mkdir(exist_ok=True)
    triton_core_models_dir = module_dir.joinpath("operators", "triton_core_models")

    encoder_op = OperatorConfig(
        name="encoder",
        repository=str(triton_core_models_dir),
        implementation=TritonCoreOperator,
        max_inflight_requests=1,
        parameters={
            "config": {
                "instance_group": [{"count": 1, "kind": "KIND_CPU"}],
                "parameters": {"delay": {"string_value": "0"}},
            }
        },
    )
    decoder_op = OperatorConfig(
        name="decoder",
        repository=str(triton_core_models_dir),
        implementation=TritonCoreOperator,
        max_inflight_requests=1,
        parameters={
            "config": {
                # The decoder runs on GPU in this demo.
                "instance_group": [{"count": 1, "kind": "KIND_GPU"}],
                "parameters": {"delay": {"string_value": "0"}},
            }
        },
    )
    encoder_decoder_op = OperatorConfig(
        name="encoder_decoder",
        implementation=EncodeDecodeOperator,
        max_inflight_requests=100,
    )

    encoder = WorkerConfig(
        operators=[encoder_op],
        name="encoder",
    )
    decoder = WorkerConfig(
        operators=[decoder_op],
        name="decoder",
    )
    encoder_decoder = WorkerConfig(
        operators=[encoder_decoder_op],
        name="encoder_decoder",
    )

    print("Starting Workers")
    # You can configure the number of instances of each
    # type of worker in a deployment
    num_instances = 1
    deployment = Deployment(
        [
            (encoder, num_instances),
            (decoder, num_instances),
            (encoder_decoder, num_instances),
        ],
        initialize_request_plane=True,
        log_dir=str(log_dir),
        log_level=1,
    )
    deployment.start()

    print("Sending Requests")
    await send_requests(deployment.request_plane_server.url)

    print("Stopping Workers")
    # Propagate the workers' aggregated exit status as the script's status.
    sys.exit(deployment.stop())


if __name__ == "__main__":
    asyncio.run(main())
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
import pytest
# TODO: decide whether this suite should run pre-merge, nightly, or weekly.
# Applies the `pre_merge` marker to every test in this module so the CI
# scheduler can select it as a suite.
pytestmark = pytest.mark.pre_merge
@pytest.mark.skip("interactions with sanity test")
def test_single_file():
    """Run the single-file hello world example as a subprocess.

    The example must exit cleanly (returncode 0) within 60 seconds.
    """
    command = [
        "python3",
        "examples/hello_world/single_file.py",
    ]
    process = subprocess.Popen(
        command,
        stdin=subprocess.DEVNULL,
    )
    try:
        process.wait(timeout=60)
    except subprocess.TimeoutExpired:
        print("single file timed out!")
        # Give the example a chance to exit gracefully on SIGTERM before
        # escalating to SIGKILL.
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            process.kill()
        # Reap the child: returncode stays None until wait() returns, which
        # previously made the assert below fail with a misleading message
        # and left a zombie process behind.
        process.wait()
    assert process.returncode == 0, "Error in single file!"
def test_sanity():
    """Start the hello world deployment and run a client against it.

    The deployment process is launched first (and owns the request plane);
    a client then sends 10 requests. Both processes must exit with code 0.
    """
    deployment_command = [
        "python3",
        "-m",
        "hello_world.deploy",
        "--initialize-request-plane",
    ]
    deployment_process = subprocess.Popen(
        deployment_command,
        stdin=subprocess.DEVNULL,
    )
    try:
        client_command = [
            "python3",
            "-m",
            "hello_world.client",
            "--requests-per-client",
            "10",
        ]
        client_process = subprocess.Popen(
            client_command,
            stdin=subprocess.DEVNULL,
        )
        try:
            client_process.wait(timeout=60)
        except subprocess.TimeoutExpired:
            print("Client timed out!")
            # SIGTERM first; escalate to SIGKILL if the client does not
            # exit promptly. Bounded waits so the test runner cannot hang.
            client_process.terminate()
            try:
                client_process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                client_process.kill()
            client_process.wait()
    finally:
        # Always tear the deployment down, even if launching or reaping the
        # client raised; previously an exception here leaked the deployment.
        deployment_process.terminate()
        try:
            deployment_process.wait(timeout=30)
        except subprocess.TimeoutExpired:
            deployment_process.kill()
            deployment_process.wait()
    assert client_process.returncode == 0, "Error in clients!"
    assert deployment_process.returncode == 0, "Error starting deployment!"
......@@ -12,3 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from triton_distributed.icp.data_plane import DataPlane as DataPlane
from triton_distributed.icp.nats_request_plane import (
NatsRequestPlane as NatsRequestPlane,
)
from triton_distributed.icp.nats_request_plane import NatsServer as NatsServer
from triton_distributed.icp.request_plane import RequestPlane as RequestPlane
from triton_distributed.icp.ucp_data_plane import UcpDataPlane as UcpDataPlane
......@@ -22,5 +22,8 @@ from triton_distributed.worker.remote_request import (
from triton_distributed.worker.remote_response import (
RemoteInferenceResponse as RemoteInferenceResponse,
)
from triton_distributed.worker.triton_core_operator import (
TritonCoreOperator as TritonCoreOperator,
)
from triton_distributed.worker.worker import Worker as Worker
from triton_distributed.worker.worker import WorkerConfig as WorkerConfig
......@@ -13,33 +13,119 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
from pprint import pformat
from typing import Optional, Type
from triton_distributed.icp import (
DataPlane,
NatsRequestPlane,
NatsServer,
RequestPlane,
UcpDataPlane,
)
from triton_distributed.worker.log_formatter import setup_logger
from triton_distributed.worker.worker import Worker, WorkerConfig
from tritonserver import InvalidArgumentError
LOGGER_NAME = __name__
class Deployment:
def __init__(self, worker_configs: list[WorkerConfig]):
def __init__(
self,
worker_configs: list[WorkerConfig | tuple[WorkerConfig, int]],
log_level=3,
initialize_request_plane=False,
initialize_data_plane=False,
request_plane_args: Optional[tuple[list, dict]] = None,
request_plane: Optional[Type[RequestPlane]] = NatsRequestPlane,
data_plane: Optional[Type[DataPlane]] = UcpDataPlane,
data_plane_args: Optional[tuple[list, dict]] = None,
log_dir="logs",
starting_metrics_port=0,
):
self._process_context = multiprocessing.get_context("spawn")
self._worker_configs = worker_configs
self._workers: list[multiprocessing.context.SpawnProcess] = []
self._logger = setup_logger(log_level, LOGGER_NAME)
self._default_request_plane = request_plane
self._default_request_plane_args = request_plane_args
self._default_data_plane = data_plane
self._default_data_plane_args = data_plane_args
self._initialize_request_plane = initialize_request_plane
self._initialize_data_plane = initialize_data_plane
self.request_plane_server: NatsServer = None
self._default_log_dir = log_dir
self._default_log_level = log_level
self._starting_metrics_port = starting_metrics_port
    @staticmethod
    def _start_worker(worker_config):
        # Child-process entry point (spawn context): build the Worker from
        # its pickled config and run it until it exits.
        Worker(worker_config).start()
def start(self):
if self._initialize_request_plane:
if self._default_request_plane == NatsRequestPlane:
self.request_plane_server = NatsServer(log_dir=self._default_log_dir)
else:
raise InvalidArgumentError(
f"Unknown Request Plane Type, can not initialize {self._default_request_plane}"
)
for worker_config in self._worker_configs:
self._workers.append(
self._process_context.Process(
target=Deployment._start_worker,
name=worker_config.name,
args=[worker_config],
worker_instances = 1
if isinstance(worker_config, tuple):
worker_instances = worker_config[1]
worker_config = worker_config[0]
base_name = worker_config.name
base_port = worker_config.metrics_port
if not base_port and self._starting_metrics_port:
base_port = self._starting_metrics_port
self._starting_metrics_port += worker_instances
request_plane_args, request_plane_kwargs = worker_config.request_plane_args
if not request_plane_args and not request_plane_kwargs:
if self._default_request_plane_args:
worker_config.request_plane_args = self._default_request_plane_args
elif self.request_plane_server:
worker_config.request_plane_args = (
[self.request_plane_server.url],
{},
)
if not worker_config.log_dir:
worker_config.log_dir = self._default_log_dir
if not worker_config.log_level:
worker_config.log_level = self._default_log_level
for index in range(worker_instances):
worker_config.name = f"{base_name}.{index}"
worker_config.metrics_port = base_port + index
self._workers.append(
self._process_context.Process(
target=Deployment._start_worker,
name=worker_config.name,
args=[worker_config],
)
)
self._logger.info(
"\n\nStarting Worker:\n\n\tConfig:\n\t%s\n\t%s\n",
pformat(worker_config),
self._workers[-1],
)
)
self._workers[-1].start()
self._workers[-1].start()
    def stop(self):
        """Alias for shutdown(); returns its accumulated worker exit code."""
        return self.shutdown()
def shutdown(self, join=True, timeout=10):
exit_code = 0
for worker in self._workers:
self._logger.info("\n\nStopping Worker:\n\n\n\t%s\n", worker)
worker.terminate()
if join:
for worker in self._workers:
......@@ -47,4 +133,14 @@ class Deployment:
for worker in self._workers:
if worker.is_alive():
worker.kill()
worker.join(timeout)
worker.join(timeout)
self._logger.info("\n\nWorker Stopped:\n\n\n\t%s\n", worker)
if worker.exitcode is not None:
# Note we accumulate exit codes
# assumption being no error is exit_code==0
# anything else represents an error
#
# this is to catch some obvious errors but not all
exit_code += worker.exitcode
return exit_code
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment