Commit e0bb5bd3 authored by Tanmay Verma's avatar Tanmay Verma Committed by GitHub
Browse files

feat: LLMAPI PoC with dynamo-run launcher (#114)

parent 76b79149
......@@ -109,8 +109,8 @@ Note: The following commands are tested on machines with H100x8 GPUs
```bash
# Launch worker
cd /workspace/examples/python_rs/llm/tensorrt_llm
mpirun --allow-run-as-root -n 1 --oversubscribe python3 -m monolith.worker --engine_args llm_api_config.yaml 1>agg_worker.log 2>&1 &
cd /workspace/examples/python_rs/llm/trtllm
mpirun --allow-run-as-root -n 1 --oversubscribe python3 -m monolith.launch --engine_args llm_api_config.yaml 1>agg_worker.log 2>&1 &
```
Upon successful launch, the output should look similar to:
......@@ -240,7 +240,7 @@ WORLD_SIZE is the total number of workers covering all the servers described in
For example, 2 TP2 generation servers are 2 servers but 4 workers/mpi executor.
```bash
cd /workspace/examples/python_rs/llm/tensorrt_llm/
cd /workspace/examples/python_rs/llm/trtllm/
mpirun --allow-run-as-root --oversubscribe -n WORLD_SIZE python3 -m disaggregated.worker --engine_args llm_api_config.yaml -c disaggregated/llmapi_disaggregated_configs/single_node_config.yaml 1>disagg_workers.log 2>&1 &
```
If using the provided [single_node_config.yaml](disaggregated/llmapi_disaggregated_configs/single_node_config.yaml), WORLD_SIZE should be 2, as it has 1 context server (TP=1) and 1 generation server (TP=1).
......@@ -248,7 +248,7 @@ If using the provided [single_node_config.yaml](disaggregated/llmapi_disaggregat
2. **Launch the router**
```bash
cd /workspace/examples/python_rs/llm/tensorrt_llm/
cd /workspace/examples/python_rs/llm/trtllm/
python3 -m disaggregated.router 1>router.log 2>&1 &
```
......@@ -309,7 +309,7 @@ export ETCD_ENDPOINTS="http://node1:2379,http://node2:2379"
3. Launch the workers from node1 or login node. WORLD_SIZE is similar to single node deployment.
```bash
srun --mpi pmix -N NUM_NODES --ntasks WORLD_SIZE --ntasks-per-node=WORLD_SIZE --no-container-mount-home --overlap --container-image IMAGE --output batch_%x_%j.log --err batch_%x_%j.err --container-mounts PATH_TO_DYNAMO:/workspace --container-env=NATS_SERVER,ETCD_ENDPOINTS bash -c 'cd /workspace/examples/python_rs/llm/tensorrt_llm && python3 -m disaggregated.worker --engine_args llm_api_config.yaml -c disaggregated/llmapi_disaggregated_configs/multi_node_config.yaml' &
srun --mpi pmix -N NUM_NODES --ntasks WORLD_SIZE --ntasks-per-node=WORLD_SIZE --no-container-mount-home --overlap --container-image IMAGE --output batch_%x_%j.log --err batch_%x_%j.err --container-mounts PATH_TO_DYNAMO:/workspace --container-env=NATS_SERVER,ETCD_ENDPOINTS bash -c 'cd /workspace/examples/python_rs/llm/trtllm && python3 -m disaggregated.worker --engine_args llm_api_config.yaml -c disaggregated/llmapi_disaggregated_configs/multi_node_config.yaml' &
```
Once the workers are launched, you should see the output similar to the following in the worker logs.
......@@ -326,7 +326,7 @@ Once the workers are launched, you should see the output similar to the followin
4. Launch the router from node1 or login node.
```bash
srun --mpi pmix -N 1 --ntasks 1 --ntasks-per-node=1 --overlap --container-image IMAGE --output batch_router_%x_%j.log --err batch_router_%x_%j.err --container-mounts PATH_TO_DYNAMO:/workspace --container-env=NATS_SERVER,ETCD_ENDPOINTS bash -c 'cd /workspace/examples/python_rs/llm/tensorrt_llm && python3 -m disaggregated.router' &
srun --mpi pmix -N 1 --ntasks 1 --ntasks-per-node=1 --overlap --container-image IMAGE --output batch_router_%x_%j.log --err batch_router_%x_%j.err --container-mounts PATH_TO_DYNAMO:/workspace --container-env=NATS_SERVER,ETCD_ENDPOINTS bash -c 'cd /workspace/examples/python_rs/llm/trtllm && python3 -m disaggregated.router' &
```
5. Send requests to the router.
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import signal
import uuid
from common.base_engine import BaseTensorrtLLMEngine
from common.processor import merge_promises, parse_chat_message_content
from tensorrt_llm.executor import CppExecutorError
from tensorrt_llm.logger import logger
# Module-level side effect: enables verbose TensorRT-LLM engine logging.
# NOTE(review): "debug" is noisy for production; confirm this is intentional.
logger.set_level("debug")
async def chat_generator(engine: BaseTensorrtLLMEngine, request):
    """Stream chat-completion responses for ``request`` via ``engine``.

    Args:
        engine: TensorRT-LLM engine wrapper; ``engine._llm_engine`` must
            already be initialized.
        request: OpenAI-style chat request carrying ``messages`` plus
            optional ``tools``, ``documents``, ``chat_template`` and
            sampling fields.

    Yields:
        Response chunks produced by ``engine.chat_processor.stream_response``.

    Raises:
        RuntimeError: If the engine is not initialized, or generation fails
            for any reason other than an internal executor error.
    """
    if engine._llm_engine is None:
        raise RuntimeError("Engine not initialized")
    logger.debug(f"Received chat request: {request}")
    request_id = str(uuid.uuid4())

    engine._ongoing_request_count += 1
    try:
        # Flatten the chat messages into the tokenizer's conversation format.
        conversation = []
        for message in request.messages:
            conversation.extend(parse_chat_message_content(message))
        tool_dicts = (
            None
            if request.tools is None
            else [tool.model_dump() for tool in request.tools]
        )
        prompt: str = engine._tokenizer.apply_chat_template(
            conversation=conversation,
            tokenize=False,
            add_generation_prompt=request.add_generation_prompt,
            tools=tool_dicts,
            documents=request.documents,
            chat_template=request.chat_template,
            **(request.chat_template_kwargs or {}),
        )
        sampling_params = request.to_sampling_params()

        promise = engine._llm_engine.generate_async(
            prompt,
            sampling_params,
            streaming=request.stream,
        )
        # NOTE: somehow stream and non-stream is working with the same path
        response_generator = engine.chat_processor.stream_response(
            request, request_id, conversation, promise
        )
        async for response in response_generator:
            yield response
    except CppExecutorError:
        # If internal executor error is raised, shutdown the server
        signal.raise_signal(signal.SIGINT)
    except Exception as e:
        # Chain the cause so the original traceback is preserved for callers.
        raise RuntimeError("Failed to generate: " + str(e)) from e
    finally:
        # BUGFIX: decrement on every exit path (success, error, generator
        # close); previously an exception leaked the in-flight counter.
        engine._ongoing_request_count -= 1
async def completion_generator(engine: BaseTensorrtLLMEngine, request):
    """Stream completion responses for ``request`` via ``engine``.

    Args:
        engine: TensorRT-LLM engine wrapper; ``engine._llm_engine`` must
            already be initialized.
        request: OpenAI-style completion request; ``request.prompt`` may be
            a single string, a single token-id list, or a batch of prompts.

    Yields:
        Parsed (``json.loads``) response chunks from the completions
        processor.

    Raises:
        RuntimeError: If the engine is not initialized, or generation fails
            for any reason other than an internal executor error.
    """
    if engine._llm_engine is None:
        raise RuntimeError("Engine not initialized")
    logger.debug(f"Received completion request: {request}")

    # Normalize: a bare string or a single token-id list is one prompt;
    # otherwise treat request.prompt as an already-batched list of prompts.
    if isinstance(request.prompt, str) or (
        isinstance(request.prompt, list) and isinstance(request.prompt[0], int)
    ):
        prompts = [request.prompt]
    else:
        prompts = request.prompt

    promises = []
    sampling_params = request.to_sampling_params()

    engine._ongoing_request_count += 1
    try:
        for prompt in prompts:
            promise = engine._llm_engine.generate_async(
                prompt,
                sampling_params,
                streaming=request.stream,
            )
            promises.append(promise)

        generator = merge_promises(promises)
        num_choices = len(prompts) if request.n is None else len(prompts) * request.n

        # NOTE: always send `stream: true` to the worker, and decide whether to aggregate or not before sending the response back to client.
        response_generator = engine.completions_processor.create_completion_generator(
            request, generator, num_choices
        )
        async for response in response_generator:
            yield json.loads(response)
    except CppExecutorError:
        # If internal executor error is raised, shutdown the server
        signal.raise_signal(signal.SIGINT)
    except Exception as e:
        # Chain the cause so the original traceback is preserved for callers.
        raise RuntimeError("Failed to generate: " + str(e)) from e
    finally:
        # BUGFIX: decrement on every exit path (success, error, generator
        # close); previously an exception leaked the in-flight counter.
        engine._ongoing_request_count -= 1
......@@ -134,5 +134,36 @@ def parse_tensorrt_llm_args() -> Tuple[Any, Tuple[Dict[str, Any], Dict[str, Any]
help="KV block size for TensorRT-LLM. Currently, only supported for context worker in Disaggregated mode.",
default=64,
)
args = parser.parse_args()
return (args, _init_engine_args(args.engine_args))
def parse_dynamo_run_args() -> Tuple[Any, Tuple[Dict[str, Any], Dict[str, Any]]]:
    """Build and evaluate the CLI for the TensorRT-LLM dynamo-run launcher.

    Unknown arguments are tolerated (``parse_known_args``) so the dynamo-run
    wrapper can pass through options this parser does not own.

    Returns:
        A tuple of the parsed argument namespace and the engine-args
        structures produced by ``_init_engine_args``.
    """
    cli = argparse.ArgumentParser(
        description="A TensorRT-LLM Dynamo-run engine parser"
    )
    cli.add_argument(
        "--engine_args",
        type=str,
        required=True,
        help="Path to the engine args file",
    )
    # NOTE: Disaggregated mode (a "--llmapi-disaggregated-config"/"-c"
    # option) is not supported in the dynamo-run launcher yet.
    publish_flags = (
        (
            "--publish-kv-cache-events",
            "Publish KV cache events from TensorRT-LLM. Currently, only supported for context worker in Disaggregated mode.",
        ),
        (
            "--publish-stats",
            "Publish stats from TensorRT-LLM. Currently, only supported for context worker in Disaggregated mode.",
        ),
    )
    for flag, help_text in publish_flags:
        cli.add_argument(flag, action="store_true", help=help_text)

    parsed, _unknown = cli.parse_known_args()
    return (parsed, _init_engine_args(parsed.engine_args))
......@@ -23,6 +23,8 @@ from typing import Callable, Optional, Union
from tensorrt_llm.logger import logger
logger.set_level("info")
class ManagedThread(threading.Thread):
def __init__(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment