Unverified Commit f242b455, authored by Alec, committed by GitHub
parent 3b3d0f2e
......@@ -174,9 +174,9 @@ RUN if [ "$ARCH" = "arm64" ]; then \
# TODO: Move NIXL wheel install to the wheel_builder stage
RUN uv pip install /workspace/wheels/nixl/*.whl
# Install patched vllm - keep this early in Dockerfile to avoid
# Install vllm - keep this early in Dockerfile to avoid
# rebuilds from unrelated source code changes
ARG VLLM_REF="3c545c0c3b98ee642373a308197d750d0e449403"
ARG VLLM_REF="059d4cd"
ENV CUDA_HOME=/usr/local/cuda
RUN --mount=type=bind,source=./container/deps/,target=/tmp/deps \
--mount=type=cache,target=/root/.cache/uv \
......@@ -308,8 +308,6 @@ RUN SNIPPET="export PROMPT_COMMAND='history -a' && export HISTFILE=$HOME/.comman
RUN mkdir -p /home/$USERNAME/.cache/
ENV VLLM_KV_CAPI_PATH=$HOME/dynamo/.build/target/debug/libdynamo_llm_capi.so
ENTRYPOINT ["/opt/nvidia/nvidia_entrypoint.sh"]
##################################
......@@ -429,8 +427,6 @@ RUN --mount=type=bind,source=./container/launch_message.txt,target=/workspace/la
sed '/^#\s/d' /workspace/launch_message.txt > ~/.launch_screen && \
echo "cat ~/.launch_screen" >> ~/.bashrc
# Tell vllm to use the Dynamo LLM C API for KV Cache Routing
ENV VLLM_KV_CAPI_PATH=/opt/dynamo/bindings/lib/libdynamo_llm_capi.so
ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/nvidia/nvda_nixl/lib/x86_64-linux-gnu/
########################################
......@@ -495,9 +491,6 @@ RUN uv pip install ai-dynamo --find-links wheelhouse && \
ln -sf $VIRTUAL_ENV/bin/* /usr/local/bin/ && \
rm -r wheelhouse
# Tell vllm to use the Dynamo LLM C API for KV Cache Routing
ENV VLLM_KV_CAPI_PATH="/opt/dynamo/bindings/lib/libdynamo_llm_capi.so"
# Copy launch banner
RUN --mount=type=bind,source=./container/launch_message.txt,target=/workspace/launch_message.txt \
sed '/^#\s/d' /workspace/launch_message.txt > ~/.launch_screen && \
......
......@@ -15,174 +15,136 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
# vLLM Deployment Examples
# LLM Deployment Examples using vLLM
This directory contains examples for deploying vLLM models in both aggregated and disaggregated configurations.
This directory contains examples and reference implementations for deploying Large Language Models (LLMs) in various configurations using vLLM. For Dynamo integration, we leverage vLLM's native KV cache events, NIXL based transfer mechanisms, and metric reporting to enable KV-aware routing and P/D disaggregation.
## Use the Latest Release
## Deployment Architectures
We recommend using the latest stable release of dynamo to avoid breaking changes:
See [deployment architectures](../llm/README.md#deployment-architectures) to learn about the general idea of the architecture. vLLM supports aggregated, disaggregated, and KV-routed serving patterns.
[![GitHub Release](https://img.shields.io/github/v/release/ai-dynamo/dynamo)](https://github.com/ai-dynamo/dynamo/releases/latest)
## Getting Started
You can find the latest release [here](https://github.com/ai-dynamo/dynamo/releases/latest) and check out the corresponding branch with:
### Prerequisites
```bash
git checkout $(git describe --tags $(git rev-list --tags --max-count=1))
```
## Prerequisites
1. Run Dynamo vLLM V1 docker:
```bash
./container/build.sh --framework VLLM_V1 --target dev
./container/run.sh --framework VLLM_V1 --target dev -it
```
Or install vLLM manually:
```bash
export VLLM_REF=3c545c0c3b98ee642373a308197d750d0e449403
git clone https://github.com/vllm-project/vllm.git
cd vllm
git checkout $VLLM_REF
VLLM_USE_PRECOMPILED=1 uv pip install -e .
```
If you are in the default vLLM container, remember to uninstall the old vLLM with `uv pip uninstall ai-dynamo-vllm`.
Start required services (etcd and NATS) using [Docker Compose](../../deploy/metrics/docker-compose.yml):
2. Start required services:
```bash
docker compose -f deploy/metrics/docker-compose.yml up -d
```
## Running the Server
### Build and Run docker
### Aggregated Deployment
```bash
cd examples/vllm_v1
dynamo serve graphs.agg:Frontend -f configs/agg.yaml
./container/build.sh --framework VLLM_V1
```
### Disaggregated Deployment
```bash
cd examples/vllm_v1
dynamo serve graphs.disagg:Frontend -f configs/disagg.yaml
./container/run.sh -it --framework VLLM_V1
```
## Testing the API
This includes the specific commit [vllm-project/vllm#19790](https://github.com/vllm-project/vllm/pull/19790) which enables support for external control of the DP ranks.
Send a test request using curl:
```bash
curl localhost:8000/v1/completions \
-H "Content-Type: application/json" \
-d '{
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"prompt": "In the heart of Eldoria...",
"stream": false,
"max_tokens": 30
}'
```
## Run Deployment
For more detailed explanations, refer to the main [LLM examples README](../llm/README.md).
This figure shows an overview of the major components to deploy:
```
+------+      +-----------+      +------------------+             +---------------+
| HTTP |----->| dynamo    |----->|   vLLM Worker    |------------>|  vLLM Prefill |
|      |<-----| ingress   |<-----|                  |<------------|    Worker     |
+------+      +-----------+      +------------------+             +---------------+
                  |    ^                  |
       query best |    | return           | publish kv events
           worker |    | worker_id        v
                  |    |         +------------------+
                  |    +---------|     kv-router    |
                  +------------->|                  |
                                 +------------------+
```
Note: The above architecture illustrates all the components. The final components that get spawned depend upon the chosen deployment pattern.
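To make the "query best worker" and "publish kv events" arrows concrete, here is a toy sketch of KV-aware routing. It is not Dynamo's router: the `ToyKvRouter` class, its method names, the block size of 16, and the scoring rule (number of leading prompt blocks a worker already has cached) are all assumptions made for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Set

BLOCK_SIZE = 16  # assumed number of tokens per KV block


def block_hashes(token_ids: Sequence[int], block_size: int = BLOCK_SIZE) -> List[int]:
    """Hash every full block chained with its prefix, so equal hashes imply equal prefixes."""
    hashes: List[int] = []
    running = 0
    for start in range(0, len(token_ids) - block_size + 1, block_size):
        block = tuple(token_ids[start:start + block_size])
        running = hash((running, block))
        hashes.append(running)
    return hashes


class ToyKvRouter:
    """Hypothetical router that tracks which prefix blocks each worker has cached."""

    def __init__(self) -> None:
        self._cached: Dict[str, Set[int]] = defaultdict(set)

    def on_blocks_stored(self, worker_id: str, token_ids: Sequence[int]) -> None:
        # Stand-in for a "publish kv events" message sent by a worker.
        self._cached[worker_id].update(block_hashes(token_ids))

    def best_worker(self, token_ids: Sequence[int], workers: Sequence[str]) -> str:
        # Stand-in for "query best worker": pick the longest cached prefix.
        hashes = block_hashes(token_ids)

        def overlap(worker_id: str) -> int:
            count = 0
            for block_hash in hashes:
                if block_hash not in self._cached[worker_id]:
                    break
                count += 1
            return count

        # max() keeps the first worker on ties, so an empty cache falls back to workers[0].
        return max(workers, key=overlap)


router = ToyKvRouter()
router.on_blocks_stored("worker-b", list(range(32)))
print(router.best_worker(list(range(48)), ["worker-a", "worker-b"]))
```

A real router would also weigh worker load; this sketch only shows why published KV events let the ingress prefer a worker that can reuse cached prefix blocks.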
## Deepseek R1
### Example Architectures
To run the DSR1 model, first follow the Ray setup from the [multinode documentation](../../docs/examples/multinode.md).
> [!IMPORTANT]
> Below we provide simple shell scripts that run the components for each configuration. Each shell script runs `dynamo run` to start the ingress and uses `python3 main.py` to start the vLLM workers. You can run each command in separate terminals for better log visibility.
### Aggregated Deployment
#### Aggregated Serving
```bash
# requires one gpu
cd examples/vllm_v1
dynamo serve graphs.agg:Frontend -f configs/deepseek_r1/agg.yaml
bash launch/agg.sh
```
#### Aggregated Serving with KV Routing
### Disaggregated Deployment
To create frontend with a single decode worker:
```bash
# requires two gpus
cd examples/vllm_v1
dynamo serve graphs.agg:Frontend -f configs/deepseek_r1/disagg.yaml
bash launch/agg_router.sh
```
To create a single decode worker:
```bash
cd examples/vllm_v1
dynamo serve components.worker:VllmDecodeWorker -f configs/deepseek_r1/disagg.yaml
```
#### Disaggregated Serving
To create a single prefill worker:
```bash
# requires two gpus
cd examples/vllm_v1
dynamo serve components.worker:VllmPrefillWorker -f configs/deepseek_r1/disagg.yaml
bash launch/disagg.sh
```
### Data Parallelism Deployment
Enabling DP for the DSR1 model requires additional configuration,
because the DP groups typically have to be set up across nodes.
Use `configs/deepseek_r1/agg_dp.yaml` and `configs/deepseek_r1/disagg_dp.yaml`
in place of the aggregated and disaggregated deployment configs, respectively.
The demonstration below deploys a single worker as an example;
apply the same steps to any `dynamo serve` command that creates
a worker.
#### Disaggregated Serving with KV Routing
To create a single decode worker, note the IP address of the node (referred to as `<head-ip>` below) and run:
```bash
# requires three gpus
cd examples/vllm_v1
dynamo serve components.worker:VllmDecodeWorker -f configs/deepseek_r1/disagg_dp.yaml --VllmDecodeWorker.data_parallel_address=<head-ip>
bash launch/disagg_router.sh
```
The above command creates 1 of the 2 DP groups, and the worker is considered
the head of the DP groups. Next, create a `VllmDpWorker` for each of the remaining DP groups.
#### Single Node Data Parallel Attention / Expert Parallelism
This example is not meant to be performant but showcases dynamo routing to data parallel workers
```bash
# requires four gpus
cd examples/vllm_v1
# 'data_parallel_start_rank' == `dp_group_index * data_parallel_size_local`
dynamo serve components.worker:VllmDpWorker -f configs/deepseek_r1/disagg_dp.yaml --VllmDpWorker.data_parallel_address=<head-ip> --VllmDpWorker.data_parallel_start_rank=8
# repeat above until all DP groups are created
bash launch/dep.sh
```
### Wide EP
If running outside of the Dynamo vLLM V1 container, please follow the [vLLM guide](https://github.com/vllm-project/vllm/tree/main/tools/ep_kernels) to install EP kernels and install [DeepGEMM](https://github.com/deepseek-ai/DeepGEMM).
To run DSR1 with DEP16 (EP16 MoE and DP16 for other layers) with [DeepEP kernels](https://github.com/deepseek-ai/DeepEP), run on the head node:
```
export VLLM_ALL2ALL_BACKEND="deepep_low_latency" # or "deepep_high_throughput"
export VLLM_USE_DEEP_GEMM=1
export GLOO_SOCKET_IFNAME=eth3 # or another non IB interface that you can find with `ifconfig -a`
cd examples/vllm_v1
dynamo serve components.worker:VllmDecodeWorker -f configs/deepseek_r1/disagg_dp.yaml --VllmDecodeWorker.data_parallel_address=<head-ip> --VllmDecodeWorker.enable_expert_parallel=true
```
On the second node:
> [!TIP]
> Run a disaggregated example and try adding another prefill worker once the setup is running! The system will automatically discover and utilize the new worker.
```
export VLLM_ALL2ALL_BACKEND="deepep_low_latency" # or "deepep_high_throughput"
export VLLM_USE_DEEP_GEMM=1
export GLOO_SOCKET_IFNAME=eth3 # or another non IB interface that you can find with `ifconfig -a`
cd examples/vllm_v1
# 'data_parallel_start_rank' == `dp_group_index * data_parallel_size_local`
dynamo serve components.worker:VllmDpWorker -f configs/deepseek_r1/disagg_dp.yaml --VllmDpWorker.data_parallel_address=<head-ip> --VllmDpWorker.data_parallel_start_rank=8 --VllmDpWorker.enable_expert_parallel=true
```
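The `data_parallel_start_rank` rule quoted in the comment above can be worked through as a short sketch. The helper name and the layout (DP16 split evenly over two nodes, so 8 local ranks per node) are assumptions for illustration; the layout is chosen to match the value `8` used for the second node.

```python
def data_parallel_start_rank(dp_group_index: int, data_parallel_size_local: int) -> int:
    """Return dp_group_index * data_parallel_size_local, the rule from the comment."""
    if dp_group_index < 0 or data_parallel_size_local <= 0:
        raise ValueError("group index must be >= 0 and local size must be > 0")
    return dp_group_index * data_parallel_size_local


# Assumed layout: 16 DP ranks over 2 nodes, 8 ranks hosted on each node.
LOCAL_SIZE = 8
for node_index in range(2):
    start = data_parallel_start_rank(node_index, LOCAL_SIZE)
    print(f"node {node_index}: ranks {start}..{start + LOCAL_SIZE - 1}")
```

Under this assumed layout the head node hosts ranks 0 through 7 and the second node starts at rank 8.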
### Testing the Deployment
## Testing
Send a test request to verify your deployment:
Send a test request using curl:
```bash
curl localhost:8000/v1/completions \
curl localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "deepseek-ai/DeepSeek-R1",
"prompt": "In the heart of Eldoria...",
"model": "Qwen/Qwen3-0.6B",
"messages": [
{
"role": "user",
"content": "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden."
}
],
"stream": false,
"max_tokens": 30
}'
```
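For scripted checks, the same chat completion request can be built in Python with only the standard library. This is a minimal sketch: the helper names `build_chat_request` and `send` are hypothetical, and the base URL assumes the frontend listens on `localhost:8080` as in the curl example.

```python
import json
import urllib.request

# Assumption: the frontend is reachable here, as in the curl example.
BASE_URL = "http://localhost:8080"


def build_chat_request(
    prompt: str, model: str = "Qwen/Qwen3-0.6B", max_tokens: int = 30
) -> urllib.request.Request:
    """Build (but do not send) a POST request for /v1/chat/completions."""
    body = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "max_tokens": max_tokens,
    }
    return urllib.request.Request(
        f"{BASE_URL}/v1/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 60.0) -> dict:
    """Send the request and decode the JSON reply; needs a running deployment."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


request = build_chat_request("In the heart of Eldoria...")
print(request.get_method(), request.full_url)
# With a deployment running: reply = send(request)
```

Building and sending are kept separate so the payload can be inspected without a live server.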
## Configuration
vLLM workers are configured through command-line arguments. Key parameters include:
- `--endpoint`: Dynamo endpoint in format `dyn://namespace.component.endpoint`
- `--model`: Model to serve (e.g., `Qwen/Qwen3-0.6B`)
- `--is-prefill-worker`: Enable prefill-only mode for disaggregated serving
- `--kv-events-port`: Port on which vLLM publishes KV cache events for Dynamo
See `args.py` for the full list of configuration options and their defaults.
The [documentation](https://docs.vllm.ai/en/v0.9.2/configuration/serve_args.html?h=serve+arg) for the vLLM CLI args points to running 'vllm serve --help' to see what CLI args can be added. We use the same argument parser as vLLM.
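As a minimal sketch of how the `--endpoint` value breaks down, the helper below splits a `dyn://namespace.component.endpoint` string into its three parts. The names `EndpointParts` and `split_endpoint` are hypothetical; the logic follows the parsing in `args.py`, except that it raises `ValueError` instead of exiting the process.

```python
from typing import NamedTuple


class EndpointParts(NamedTuple):
    namespace: str
    component: str
    endpoint: str


def split_endpoint(value: str) -> EndpointParts:
    """Split 'dyn://namespace.component.endpoint' into its three parts."""
    # The 'dyn://' prefix is optional; only the first occurrence is removed.
    stripped = value.replace("dyn://", "", 1)
    parts = stripped.split(".")
    if len(parts) != 3:
        raise ValueError(
            f"Invalid endpoint format: {value!r}. "
            "Expected 'dyn://namespace.component.endpoint'."
        )
    return EndpointParts(*parts)


print(split_endpoint("dyn://dynamo.backend.generate"))
```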
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import socket
import sys
from typing import Optional
from vllm.config import KVTransferConfig
from vllm.distributed.kv_events import KVEventsConfig
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.utils import FlexibleArgumentParser
logger = logging.getLogger(__name__)
# Only used if you run it manually from the command line
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
def find_free_port() -> int:
    """Find a free port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        port = s.getsockname()[1]
    return port
class Config:
    """Command line parameters or defaults"""

    # dynamo specific
    namespace: str
    component: str
    endpoint: str
    kv_events_port: int
    is_prefill_worker: bool

    # mirror vLLM
    model: str
    served_model_name: Optional[str]

    # rest vLLM args
    engine_args: AsyncEngineArgs
def overwrite_args(config):
    defaults = {
        "task": "generate",
        "skip_tokenizer_init": True,
        "disable_log_requests": True,
        "enable_prefix_caching": True,
        # KV routing relies on logging KV metrics
        "disable_log_stats": False,
        # Always set up KV Events for routing
        "kv_events_config": KVEventsConfig(
            enable_kv_cache_events=True,
            publisher="zmq",
            endpoint=f"tcp://*:{config.kv_events_port}",
        ),
        # Always setting up kv transfer for disagg
        "kv_transfer_config": KVTransferConfig(
            kv_connector="NixlConnector", kv_role="kv_both"
        ),
    }

    # Made decision to always overwrite.
    # Respecting users original cmd line args at all costs requires a bunch of arg parse work
    logger.debug("Setting Dynamo defaults for vLLM")
    for key, value in defaults.items():
        if hasattr(config.engine_args, key):
            setattr(config.engine_args, key, value)
            logger.debug(f" engine_args.{key} = {value}")
        else:
            raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.")
def parse_args() -> Config:
    parser = FlexibleArgumentParser(
        description="vLLM server integrated with Dynamo LLM."
    )
    parser.add_argument(
        "--endpoint",
        type=str,
        default=DEFAULT_ENDPOINT,
        help=f"Dynamo endpoint string in 'dyn://namespace.component.endpoint' format. Default: {DEFAULT_ENDPOINT}",
    )
    parser.add_argument(
        "--is-prefill-worker",
        action="store_true",
        help="Enable prefill functionality for this worker. Currently overwrites the --endpoint to be a specially chosen dyn://dynamo.prefill.generate",
    )
    parser.add_argument(
        "--kv-events-port",
        type=int,
        default=find_free_port(),
        help="Port where vLLM publishes KV cache events for Dynamo. For DP, we handle the port iteration.",
    )

    parser = AsyncEngineArgs.add_cli_args(parser)
    args = parser.parse_args()
    engine_args = AsyncEngineArgs.from_cli_args(args)

    config = Config()
    config.model = args.model
    if args.served_model_name:
        assert (
            len(args.served_model_name) <= 1
        ), "We do not support multiple model names."
        config.served_model_name = args.served_model_name[0]
    else:
        # This becomes an `Option` on the Rust side
        config.served_model_name = None

    if args.is_prefill_worker:
        args.endpoint = "dyn://dynamo.prefill.generate"

    endpoint_str = args.endpoint.replace("dyn://", "", 1)
    endpoint_parts = endpoint_str.split(".")
    if len(endpoint_parts) != 3:
        logger.error(
            f"Invalid endpoint format: '{args.endpoint}'. Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'."
        )
        sys.exit(1)

    parsed_namespace, parsed_component_name, parsed_endpoint_name = endpoint_parts

    config.namespace = parsed_namespace
    config.component = parsed_component_name
    config.endpoint = parsed_endpoint_name
    config.engine_args = engine_args
    config.is_prefill_worker = args.is_prefill_worker
    config.kv_events_port = args.kv_events_port

    if config.engine_args.block_size is None:
        config.engine_args.block_size = 16
        logger.debug(
            f"Setting reasonable default of {config.engine_args.block_size} for block_size"
        )

    overwrite_args(config)
    return config
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import subprocess
from pathlib import Path
from components.simple_load_balancer import SimpleLoadBalancer
from fastapi import FastAPI
from pydantic import BaseModel
import dynamo.sdk as sdk
from dynamo.planner.planner_sla import Planner
from dynamo.planner.prometheus import Prometheus
from dynamo.sdk import depends, service
from dynamo.sdk.lib.config import ServiceConfig
from dynamo.sdk.lib.image import DYNAMO_IMAGE
logger = logging.getLogger(__name__)
def get_dynamo_run_binary():
    """Find the dynamo-run binary path in SDK or fallback to 'dynamo-run' command."""
    sdk_path = Path(sdk.__file__)
    binary_path = sdk_path.parent / "cli/bin/dynamo-run"
    if not binary_path.exists():
        return "dynamo-run"
    else:
        return str(binary_path)
class FrontendConfig(BaseModel):
    """Configuration for the Frontend service including model and HTTP server settings."""

    served_model_name: str
    endpoint: str
    port: int = 8080
# TODO: move these to common for all LLMs once we adopt dynamo-run
@service(
dynamo={
"enabled": True,
"namespace": "dynamo",
},
workers=1,
image=DYNAMO_IMAGE,
app=FastAPI(title="LLM Example"),
)
class Frontend:
    worker = depends(SimpleLoadBalancer)
    planner = depends(Planner)
    prometheus = depends(Prometheus)

    def __init__(self):
        """Initialize Frontend service with HTTP server and model configuration."""
        frontend_config = FrontendConfig(**ServiceConfig.get_parsed_config("Frontend"))
        self.frontend_config = frontend_config
        self.process = None

        self.start_ingress_and_processor()

    def start_ingress_and_processor(self):
        """Starting dynamo-run based ingress and processor"""
        logger.info(
            f"Starting HTTP server and processor on port {self.frontend_config.port}"
        )
        dynamo_run_binary = get_dynamo_run_binary()

        endpoint = f"dyn://{self.frontend_config.endpoint}"
        logger.info(f"Endpoint: {endpoint}")

        self.process = subprocess.Popen(
            [
                dynamo_run_binary,
                "in=http",
                f"out={endpoint}",
                "--http-port",
                str(self.frontend_config.port),
            ],
            stdout=None,
            stderr=None,
        )
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import uuid
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import AsyncGenerator
import msgspec
from protocol import MyRequestOutput
from vllm.inputs import TokensPrompt
from vllm.sampling_params import SamplingParams
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
class BaseWorkerHandler(ABC):
    """
    Request handler for the generate and clear_kv_blocks endpoints.
    """

    def __init__(self, component, engine, default_sampling_params):
        self.component = component
        self.engine_client = engine
        self.default_sampling_params = default_sampling_params
        self.kv_publisher = None

    @abstractmethod
    async def generate(self, request) -> AsyncGenerator[dict, None]:
        raise NotImplementedError

    async def clear_kv_blocks(self, request=None):
        try:
            await self.engine_client.reset_prefix_cache()
            yield {"status": "success", "message": "KV cache cleared"}
        except Exception as e:
            yield {"status": "error", "message": str(e)}

    def cleanup(self):
        """Override in subclasses if cleanup is needed."""
        pass

    async def generate_tokens(self, prompt, sampling_params, request_id):
        gen = self.engine_client.generate(prompt, sampling_params, request_id)

        num_output_tokens_so_far = 0
        async for res in gen:
            # res is vllm's RequestOutput

            # This is the expected way for a request to end.
            # The new token ID will be eos, don't forward it.
            if res.finished:
                yield {"finish_reason": "stop", "token_ids": []}
                break

            if not res.outputs:
                yield {"finish_reason": "error", "token_ids": []}
                break

            output = res.outputs[0]
            next_total_toks = len(output.token_ids)
            out = {"token_ids": output.token_ids[num_output_tokens_so_far:]}
            if output.finish_reason:
                out["finish_reason"] = output.finish_reason
            if output.stop_reason:
                out["stop_reason"] = output.stop_reason
            yield out
            num_output_tokens_so_far = next_total_toks
class DecodeWorkerHandler(BaseWorkerHandler):
    def __init__(
        self, component, engine, default_sampling_params, prefill_worker_client=None
    ):
        super().__init__(component, engine, default_sampling_params)
        self.prefill_worker_client = prefill_worker_client
        self.can_prefill = 0
        self._prefill_check_task = None

        if self.prefill_worker_client is not None:
            self._prefill_check_task = asyncio.create_task(self._prefill_check_loop())

    async def _prefill_check_loop(self):
        """Background task that checks prefill worker availability every 5 seconds."""
        while True:
            try:
                if self.prefill_worker_client is not None:
                    self.can_prefill = len(self.prefill_worker_client.instance_ids())
                    logger.debug(f"Current Prefill Workers: {self.can_prefill}")
                await asyncio.sleep(5)
            except Exception as e:
                logger.error(f"Error in prefill check loop: {e}")
                await asyncio.sleep(5)  # Still sleep on error to avoid tight loop

    def cleanup(self):
        """Cancel background tasks."""
        if self._prefill_check_task is not None:
            self._prefill_check_task.cancel()
        super().cleanup()

    async def generate(self, request):
        request_id = str(uuid.uuid4().hex)

        prompt = TokensPrompt(prompt_token_ids=request["token_ids"])

        sampling_params = SamplingParams(**self.default_sampling_params)
        for key, value in request["sampling_options"].items():
            if not value:
                continue
            if hasattr(sampling_params, key):
                setattr(sampling_params, key, value)

        max_tokens = request["stop_conditions"]["max_tokens"]
        if max_tokens:
            sampling_params.max_tokens = max_tokens

        if self.can_prefill:
            # Create a copy for prefill with specific modifications
            prefill_sampling_params = deepcopy(sampling_params)

            if prefill_sampling_params.extra_args is None:
                prefill_sampling_params.extra_args = {}
            prefill_sampling_params.extra_args["kv_transfer_params"] = {
                "do_remote_decode": True,
            }
            prefill_sampling_params.max_tokens = 1
            prefill_sampling_params.min_tokens = 1

            prefill_request = {
                "token_ids": request["token_ids"],
                "sampling_params": msgspec.to_builtins(prefill_sampling_params),
                "request_id": request_id,
            }

            # TODO Change to prefill queue
            if self.prefill_worker_client is not None:
                prefill_response = await anext(
                    await self.prefill_worker_client.round_robin(prefill_request)
                )

            prefill_response = MyRequestOutput.model_validate_json(
                prefill_response.data()
            )

            # Modify original sampling_params for decode
            if sampling_params.extra_args is None:
                sampling_params.extra_args = {}
            sampling_params.extra_args[
                "kv_transfer_params"
            ] = prefill_response.kv_transfer_params

        async for tok in self.generate_tokens(prompt, sampling_params, request_id):
            yield tok
class PrefillWorkerHandler(BaseWorkerHandler):
    def __init__(self, component, engine, default_sampling_params):
        super().__init__(component, engine, default_sampling_params)

    async def generate(self, request):
        request_id = request["request_id"]

        prompt = TokensPrompt(prompt_token_ids=request["token_ids"])
        sampling_params = msgspec.convert(request["sampling_params"], SamplingParams)

        gen = self.engine_client.generate(prompt, sampling_params, request_id)

        # Generate only 1 token in prefill
        async for res in gen:
            logger.debug(f"kv transfer params: {res.kv_transfer_params}")
            yield MyRequestOutput(
                request_id=res.request_id,
                prompt=res.prompt,
                prompt_token_ids=res.prompt_token_ids,
                prompt_logprobs=res.prompt_logprobs,
                outputs=res.outputs,
                finished=res.finished,
                metrics=res.metrics,
                kv_transfer_params=res.kv_transfer_params,
            ).model_dump_json()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import os
import signal
import socket
from typing import Optional
import uvloop
from args import Config, find_free_port, parse_args
from handlers import DecodeWorkerHandler, PrefillWorkerHandler
from publisher import StatLoggerFactory
from vllm.distributed.kv_events import ZmqEventPublisher
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.async_llm import AsyncLLM
from dynamo.llm import (
ModelType,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
register_llm,
)
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
async def graceful_shutdown(runtime):
    """
    By calling `runtime.shutdown()`, the endpoints will immediately be unavailable.
    However, in-flight requests will still be processed until they are finished.
    After all in-flight requests are finished, the `serve_endpoint` functions will return
    and the engine will be shutdown by Python's garbage collector.
    """
    logging.info("Received shutdown signal, shutting down DistributedRuntime")
    runtime.shutdown()
    logging.info("DistributedRuntime shutdown complete")
@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
    config = parse_args()

    # Set up signal handler for graceful shutdown
    loop = asyncio.get_running_loop()

    def signal_handler():
        asyncio.create_task(graceful_shutdown(runtime))

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    logging.info("Signal handlers set up for graceful shutdown")

    if config.is_prefill_worker:
        await init_prefill(runtime, config)
    else:
        await init(runtime, config)
def setup_vllm_engine(config, stat_logger=None):
    os.environ["VLLM_NO_USAGE_STATS"] = "1"  # Avoid internal HTTP requests
    os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"

    set_side_channel_host_and_port()

    engine_args = config.engine_args
    # Load default sampling params from `generation_config.json`
    default_sampling_params = (
        engine_args.create_model_config().get_diff_sampling_param()
    )

    # Taken from build_async_engine_client_from_engine_args()
    usage_context = UsageContext.OPENAI_API_SERVER
    vllm_config = engine_args.create_engine_config(usage_context=usage_context)

    factory = []
    if stat_logger:
        factory.append(stat_logger)

    engine_client = AsyncLLM.from_vllm_config(
        vllm_config=vllm_config,
        usage_context=usage_context,
        stat_loggers=factory,
        disable_log_requests=engine_args.disable_log_requests,
        disable_log_stats=engine_args.disable_log_stats,
    )

    logger.info(f"VllmWorker for {config.model} has been initialized")
    return engine_client, vllm_config, default_sampling_params
def set_side_channel_host_and_port(
    hostname: Optional[str] = None, port: Optional[int] = None
):
    """vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors.
    This sets the port number for the side channel.
    """
    if hostname is None:
        hostname = socket.gethostname()
    if port is None:
        port = find_free_port()
    logger.debug("Setting VLLM_NIXL_SIDE_CHANNEL_HOST to %s", hostname)
    os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = hostname
    logger.debug("Setting VLLM_NIXL_SIDE_CHANNEL_PORT to %s", port)
    os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(port)
async def init_prefill(runtime: DistributedRuntime, config: Config):
    """
    Instantiate and serve
    """
    component = runtime.namespace(config.namespace).component(config.component)
    await component.create_service()

    generate_endpoint = component.endpoint(config.endpoint)
    clear_endpoint = component.endpoint("clear_kv_blocks")

    engine_client, _, default_sampling_params = setup_vllm_engine(config)

    # TODO register_prefill in similar vein to register_llm

    handler = PrefillWorkerHandler(component, engine_client, default_sampling_params)

    try:
        await asyncio.gather(
            generate_endpoint.serve_endpoint(handler.generate),
            clear_endpoint.serve_endpoint(handler.clear_kv_blocks),
        )
    except Exception as e:
        logger.error(f"Failed to serve endpoints: {e}")
        raise
    finally:
        handler.cleanup()
async def init(runtime: DistributedRuntime, config: Config):
    """
    Instantiate and serve
    """
    component = runtime.namespace(config.namespace).component(config.component)
    await component.create_service()

    generate_endpoint = component.endpoint(config.endpoint)
    clear_endpoint = component.endpoint("clear_kv_blocks")

    prefill_worker_client = (
        await runtime.namespace("dynamo")
        .component("prefill")  # TODO don't hardcode
        .endpoint("generate")
        .client()
    )

    if not config.engine_args.data_parallel_rank:  # if rank is 0 or None then register
        await register_llm(
            ModelType.Backend,
            generate_endpoint,
            config.model,
            config.served_model_name,
            kv_cache_block_size=config.engine_args.block_size,
        )

    factory = StatLoggerFactory(component, config.engine_args.data_parallel_rank or 0)
    engine_client, vllm_config, default_sampling_params = setup_vllm_engine(
        config, factory
    )

    # TODO Hack to get data, move this to registering in ETCD
    factory.set_num_gpu_blocks_all(vllm_config.cache_config.num_gpu_blocks)
    factory.set_request_total_slots_all(vllm_config.scheduler_config.max_num_seqs)
    factory.init_publish()

    logger.info(f"VllmWorker for {config.model} has been initialized")

    # TODO: We start off with a valid endpoint, then we increment it by dp_rank
    # May no longer be valid. Lets remove the increment behavior from vLLM and here
    zmq_endpoint = ZmqEventPublisher.offset_endpoint_port(
        config.engine_args.kv_events_config.endpoint,
        data_parallel_rank=config.engine_args.data_parallel_rank or 0,
    ).replace("*", "127.0.0.1")

    zmq_config = ZmqKvEventPublisherConfig(
        worker_id=generate_endpoint.lease_id(),
        kv_block_size=vllm_config.cache_config.block_size,
        zmq_endpoint=zmq_endpoint,
    )
    kv_publisher = ZmqKvEventPublisher(component=component, config=zmq_config)

    logger.info(f"Reading Events from {zmq_endpoint}")

    handler = DecodeWorkerHandler(
        component, engine_client, default_sampling_params, prefill_worker_client
    )
    handler.kv_publisher = kv_publisher

    try:
        await asyncio.gather(
            generate_endpoint.serve_endpoint(handler.generate),
            clear_endpoint.serve_endpoint(handler.clear_kv_blocks),
        )
    except Exception as e:
        logger.error(f"Failed to serve endpoints: {e}")
        raise
    finally:
        # Cleanup background tasks
        handler.cleanup()
if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())
......@@ -13,102 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import Any, List, Optional
import msgspec
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic_core import core_schema
from typing_extensions import NotRequired
from vllm.inputs import TokensPrompt
from vllm.lora.request import LoRARequest
from pydantic import BaseModel, ConfigDict
from vllm.outputs import CompletionOutput
from vllm.sampling_params import SamplingParams
from vllm.sequence import PromptLogprobs, RequestMetrics
TokenIdType = int
# TODO: move these to common for all LLMs once we adopt dynamo-run
# derived from lib/llm/src/protocols/common/preprocessor.rs
class StopConditions(BaseModel):
max_tokens: Optional[int] = None
stop: Optional[List[str]] = None
stop_token_ids_hidden: Optional[List[TokenIdType]] = None
min_tokens: Optional[int] = None
ignore_eos: Optional[bool] = None
class SamplingOptions(BaseModel):
n: Optional[int] = None
best_of: Optional[int] = None
presence_penalty: Optional[float] = None
frequency_penalty: Optional[float] = None
repetition_penalty: Optional[float] = None
temperature: Optional[float] = None
top_p: Optional[float] = None
top_k: Optional[int] = None
min_p: Optional[float] = None
use_beam_search: Optional[bool] = None
length_penalty: Optional[float] = None
seed: Optional[int] = None
class PreprocessedRequest(BaseModel):
token_ids: List[TokenIdType]
stop_conditions: StopConditions
sampling_options: SamplingOptions
eos_token_ids: List[TokenIdType] = Field(default_factory=list)
mdc_sum: Optional[str] = None
annotations: List[str] = Field(default_factory=list)
# Hack to override the type of multi_modal_data in TokensPrompt
# as pydantic doesn't understand generic types
# TokensPrompt is defined here: https://github.com/vllm-project/vllm/blob/a4c402a756fa3213caf9d2cde0e4ceb2d57727f2/vllm/inputs/data.py#L38
# multi_modal_data is defined here: https://github.com/vllm-project/vllm/blob/main/vllm/multimodal/inputs.py#L103
# ModalityData is defined here: https://github.com/vllm-project/vllm/blob/main/vllm/multimodal/inputs.py#L80
class PatchedTokensPrompt(TokensPrompt):
multi_modal_data: NotRequired[Optional[Any]] # type: ignore
# Monkey-patch the SamplingParams and LoRARequest types to add a dummy core schema so pydantic can validate them
# SamplingParams is a msgspec struct
# SamplingParams is defined here: https://github.com/vllm-project/vllm/blob/a4c402a756fa3213caf9d2cde0e4ceb2d57727f2/vllm/sampling_params.py#L88
SamplingParams.__get_pydantic_core_schema__ = classmethod(
lambda cls, source, handler: core_schema.any_schema()
)
LoRARequest.__get_pydantic_core_schema__ = classmethod(
lambda cls, source, handler: core_schema.any_schema()
)
class vLLMGenerateRequest(BaseModel):
"""
Serializable class of all the fields vLLM engine requires for inference
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
prompt: PatchedTokensPrompt
sampling_params: SamplingParams
request_id: str
@field_validator("sampling_params", mode="before")
@classmethod
def parse_sampling_params(cls, v: Any) -> SamplingParams:
if isinstance(v, str):
v = json.loads(v)
if isinstance(v, dict):
return SamplingParams(**v)
return v
model_config = ConfigDict(
json_encoders={SamplingParams: lambda v: msgspec.json.encode(v)}
)
class MyRequestOutput(BaseModel):
"""
......@@ -128,9 +38,9 @@ class MyRequestOutput(BaseModel):
outputs: List[CompletionOutput]
finished: bool
metrics: Optional[RequestMetrics] = None
kv_transfer_params: Optional[dict[str, Any]] = None
# lora_request: Optional[LoRARequest] = None
# encoder_prompt: Optional[str] = None
# encoder_prompt_token_ids: Optional[List[int]] = None
# num_cached_tokens: Optional[int] = None
# multi_modal_placeholders: Optional[MultiModalPlaceholderDict] = None
kv_transfer_params: Optional[dict[str, Any]] = None
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from vllm.config import VllmConfig
from vllm.v1.metrics.loggers import StatLoggerBase
from vllm.v1.metrics.stats import IterationStats, SchedulerStats
from dynamo.llm import (
ForwardPassMetrics,
KvStats,
SpecDecodeStats,
WorkerMetricsPublisher,
WorkerStats,
)
from dynamo.runtime import Component
class NullStatLogger(StatLoggerBase):
def __init__(self):
pass
def record(
self,
scheduler_stats: Optional[SchedulerStats],
iteration_stats: Optional[IterationStats],
):
pass
def log_engine_initialized(self):
pass
class DynamoStatLoggerPublisher(StatLoggerBase):
"""Stat logger publisher. Wrapper for the WorkerMetricsPublisher to match the StatLoggerBase interface."""
def __init__(self, component: Component, dp_rank: int) -> None:
self.inner = WorkerMetricsPublisher()
self.inner.create_endpoint(component)
self.dp_rank = dp_rank
self.num_gpu_block = 1
self.request_total_slots = 1
# TODO: Remove this and pass as metadata through etcd
def set_num_gpu_block(self, num_blocks):
self.num_gpu_block = num_blocks
# TODO: Remove this and pass as metadata through etcd
def set_num_request_total_slots(self, request_total_slots):
self.request_total_slots = request_total_slots
def record(
self, scheduler_stats: SchedulerStats, iteration_stats: Optional[IterationStats]
):
# request_total_slots and kv_total_blocks are properties of the model and GPU.
# We should publish them only once, not on every metric update.
# They should probably be part of runtime metadata tied to the MDC, or stored in etcd.
hit_rate = 0
if scheduler_stats.prefix_cache_stats.queries > 0:
hit_rate = (
scheduler_stats.prefix_cache_stats.hits
/ scheduler_stats.prefix_cache_stats.queries
)
worker_stats = WorkerStats(
request_active_slots=scheduler_stats.num_running_reqs,
request_total_slots=self.request_total_slots,
num_requests_waiting=scheduler_stats.num_waiting_reqs,
data_parallel_rank=self.dp_rank,
)
kv_stats = KvStats(
kv_active_blocks=int(self.num_gpu_block * scheduler_stats.kv_cache_usage),
kv_total_blocks=self.num_gpu_block,
gpu_cache_usage_perc=scheduler_stats.kv_cache_usage,
gpu_prefix_cache_hit_rate=hit_rate, # TODO: This is a point in time update, not cumulative. Will be problematic on router side if we try to use it.
)
spec_dec_stats = scheduler_stats.spec_decoding_stats
if spec_dec_stats:
spec_dec_stats = SpecDecodeStats(
num_spec_tokens=spec_dec_stats.num_spec_tokens,
num_drafts=spec_dec_stats.num_drafts,
num_draft_tokens=spec_dec_stats.num_draft_tokens,
num_accepted_tokens=spec_dec_stats.num_accepted_tokens,
num_accepted_tokens_per_pos=spec_dec_stats.num_accepted_tokens_per_pos,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=spec_dec_stats,
)
self.inner.publish(metrics)
def init_publish(self):
worker_stats = WorkerStats(
request_active_slots=0,
request_total_slots=self.request_total_slots,
num_requests_waiting=0,
data_parallel_rank=self.dp_rank,
)
kv_stats = KvStats(
kv_active_blocks=0,
kv_total_blocks=self.num_gpu_block,
gpu_cache_usage_perc=0,
gpu_prefix_cache_hit_rate=0,
)
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=None,
)
self.inner.publish(metrics)
def log_engine_initialized(self) -> None:
pass
class StatLoggerFactory:
"""Factory for creating stat logger publishers. Required by vLLM."""
def __init__(self, component: Component, dp_rank: int = 0) -> None:
self.component = component
self.created_logger: Optional[DynamoStatLoggerPublisher] = None
self.dp_rank = dp_rank
def create_stat_logger(self, dp_rank: int) -> StatLoggerBase:
if self.dp_rank != dp_rank:
return NullStatLogger()
logger = DynamoStatLoggerPublisher(self.component, dp_rank)
self.created_logger = logger
return logger
def __call__(self, vllm_config: VllmConfig, dp_rank: int) -> StatLoggerBase:
return self.create_stat_logger(dp_rank=dp_rank)
# TODO Remove once we publish metadata to etcd
def set_num_gpu_blocks_all(self, num_blocks):
if self.created_logger:
self.created_logger.set_num_gpu_block(num_blocks)
def set_request_total_slots_all(self, request_total_slots):
if self.created_logger:
self.created_logger.set_num_request_total_slots(request_total_slots)
def init_publish(self):
if self.created_logger:
self.created_logger.init_publish()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import logging
import uuid
from typing import AsyncGenerator, Optional
from components.worker import VllmDecodeWorker, VllmPrefillWorker
from utils.args import parse_vllm_args
from utils.protocol import MyRequestOutput, PreprocessedRequest, vLLMGenerateRequest
from vllm.inputs import TokensPrompt
from vllm.sampling_params import SamplingParams
from dynamo.llm import ModelType, register_llm
from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service
logger = logging.getLogger(__name__)
@service(
dynamo={
"enabled": True,
"namespace": "dynamo",
},
resources={"cpu": "10", "memory": "20Gi"},
workers=1,
)
class SimpleLoadBalancer:
prefill_worker = depends(VllmPrefillWorker)
decode_worker = depends(VllmDecodeWorker)
def __init__(self):
class_name = self.__class__.__name__
self.engine_args = parse_vllm_args(class_name, "")
model_config = self.engine_args.create_model_config()
self.default_sampling_params = model_config.get_diff_sampling_param()
self.enable_disagg = self.engine_args.enable_disagg
@async_on_start
async def async_init(self):
runtime = dynamo_context["runtime"]
logger.info("Registering LLM for discovery")
comp_ns, comp_name = SimpleLoadBalancer.dynamo_address() # type: ignore
endpoint_name = "generate"
for served_model_name in self.engine_args.served_model_name:
logger.info(
f"Registering endpoint {endpoint_name} with model {self.engine_args.model} and served_model_name {served_model_name}"
)
endpoint = (
runtime.namespace(comp_ns).component(comp_name).endpoint(endpoint_name)
)
await register_llm(
ModelType.Backend,
endpoint,
self.engine_args.model,
served_model_name,
)
comp_ns, comp_name = VllmDecodeWorker.dynamo_address() # type: ignore
self.decode_worker_client = (
await runtime.namespace(comp_ns)
.component(comp_name)
.endpoint("generate")
.client()
)
comp_ns, comp_name = VllmPrefillWorker.dynamo_address() # type: ignore
self.prefill_worker_client = (
await runtime.namespace(comp_ns)
.component(comp_name)
.endpoint("generate")
.client()
)
logger.info("SimpleLoadBalancer has been initialized")
async def send_request_to_prefill(
self, request: vLLMGenerateRequest
) -> MyRequestOutput:
logger.debug("Sending request to prefill")
prefill_request = copy.deepcopy(request)
extra_args = prefill_request.sampling_params.extra_args or {}
extra_args["kv_transfer_params"] = {
"do_remote_decode": True,
}
prefill_request.sampling_params.extra_args = extra_args
prefill_request.sampling_params.max_tokens = 1
prefill_request.sampling_params.min_tokens = 1
logger.debug("Prefill request: %s", prefill_request.model_dump_json())
async for prefill_response in await self.prefill_worker_client.round_robin(
prefill_request.model_dump_json()
):
return MyRequestOutput.model_validate_json(prefill_response.data())
async def send_request_to_decode(
self,
request: vLLMGenerateRequest,
prefill_response: Optional[MyRequestOutput] = None,
) -> AsyncGenerator[MyRequestOutput, None]:
logger.debug("Sending request to decode")
decode_request = copy.deepcopy(request)
if prefill_response:
extra_args = decode_request.sampling_params.extra_args or {}
extra_args["kv_transfer_params"] = prefill_response.kv_transfer_params
decode_request.sampling_params.extra_args = extra_args
logger.debug("Decode request: %s", decode_request.model_dump_json())
async for decode_response in await self.decode_worker_client.round_robin(
decode_request.model_dump_json()
):
yield MyRequestOutput.model_validate_json(decode_response.data())
@endpoint()
async def generate(self, request: PreprocessedRequest):
logger.debug(
"Processor received completion request: %s", request.model_dump_json()
)
vllm_request = self._create_vllm_request(request)
logger.debug("VLLM request: %s", vllm_request.model_dump_json())
if self.enable_disagg:
prefill_response = await self.send_request_to_prefill(vllm_request)
logger.debug("Prefill response: %s", prefill_response.model_dump_json())
else:
prefill_response = None
gen = self.send_request_to_decode(vllm_request, prefill_response)
async for res in self._stream_response(gen):
yield res
def _create_vllm_request(self, request: PreprocessedRequest) -> vLLMGenerateRequest:
request_id = str(uuid.uuid4().hex)
prompt = TokensPrompt(prompt_token_ids=request.token_ids)
sampling_params = SamplingParams(**self.default_sampling_params)
for key, value in request.sampling_options.model_dump().items():
# Skip only unset options; falsy values such as temperature=0.0 are valid overrides
if value is None:
continue
if hasattr(sampling_params, key):
setattr(sampling_params, key, value)
max_tokens = request.stop_conditions.max_tokens
if max_tokens:
sampling_params.max_tokens = max_tokens
return vLLMGenerateRequest(
prompt=prompt,
sampling_params=sampling_params,
request_id=request_id,
)
async def _stream_response(self, gen: AsyncGenerator[MyRequestOutput, None]):
num_output_tokens_so_far = 0
async for res in gen:
logger.debug("Decode response: %s", res.model_dump_json())
# res is our MyRequestOutput
# This is the expected way for a request to end.
# The new token ID will be eos, don't forward it.
if res.finished:
yield {"finish_reason": "stop", "token_ids": []}
break
if not res.outputs:
yield {"finish_reason": "error", "token_ids": []}
break
output = res.outputs[0]
next_total_toks = len(output.token_ids)
out = {"token_ids": output.token_ids[num_output_tokens_so_far:]}
if output.finish_reason:
out["finish_reason"] = output.finish_reason
if output.stop_reason:
out["stop_reason"] = output.stop_reason
yield out
num_output_tokens_so_far = next_total_toks
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import os
import signal
import socket
from typing import Optional
from utils.args import parse_vllm_args
from utils.protocol import MyRequestOutput, vLLMGenerateRequest
from vllm.entrypoints.openai.api_server import (
build_async_engine_client_from_engine_args,
)
# Additional vLLM imports for DP worker
from vllm.usage.usage_lib import UsageContext
from vllm.utils import get_tcp_uri
from vllm.v1.engine.core import EngineCoreProc
from vllm.v1.engine.core_client import CoreEngineProcManager
from vllm.v1.executor.abstract import Executor
from dynamo.sdk import async_on_start, dynamo_context, endpoint, service
logger = logging.getLogger(__name__)
class VllmBaseWorker:
def __init__(self):
class_name = self.__class__.__name__
self.engine_args = parse_vllm_args(class_name, "")
signal.signal(signal.SIGTERM, self.graceful_shutdown)
signal.signal(signal.SIGINT, self.graceful_shutdown)
self.set_side_channel_host_and_port()
async def async_init(self):
self._engine_context = build_async_engine_client_from_engine_args(
self.engine_args
)
if self._engine_context is not None:
self.engine_client = await self._engine_context.__aenter__()
else:
raise RuntimeError("Failed to initialize engine client")
logger.info("VllmWorker has been initialized")
def graceful_shutdown(self, signum, frame):
"""
Gracefully shut down the worker by shutting down the dynamo runtime.
This will
1. disable the generate endpoint so no new requests are accepted.
2. wait until all in-flight requests are completed.
3. stop awaiting the endpoint service.
4. rely on Python's garbage collection to release GPU resources.
"""
logger.info("Shutting down dynamo runtime...")
dynamo_context["runtime"].shutdown()
logger.info("Dynamo runtime shutdown complete.")
def shutdown_vllm_worker(self, signum, frame):
"""Shutdown the worker immediately by killing the background loop"""
loop = asyncio.get_event_loop()
try:
self.engine_client.close()
logger.info("VllmWorker shutdown complete")
except Exception as e:
logger.error(f"Error during shutdown: {e}")
finally:
loop.stop()
@endpoint()
async def generate(self, request: vLLMGenerateRequest):
gen = self.engine_client.generate(
prompt=request.prompt,
sampling_params=request.sampling_params,
request_id=request.request_id,
)
async for response in gen:
logger.debug(f"Response kv_transfer_params: {response.kv_transfer_params}")
yield MyRequestOutput(
request_id=response.request_id,
prompt=response.prompt,
prompt_token_ids=response.prompt_token_ids,
prompt_logprobs=response.prompt_logprobs,
outputs=response.outputs,
finished=response.finished,
metrics=response.metrics,
kv_transfer_params=response.kv_transfer_params,
).model_dump_json()
def set_side_channel_host_and_port(
self, hostname: Optional[str] = None, port: Optional[int] = None
):
"""vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors.
This sets the port number for the side channel.
"""
if hostname is None:
hostname = socket.gethostname()
if port is None:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0)) # Bind to a free port provided by the host.
port = s.getsockname()[1] # Get the port number assigned.
logger.debug("Setting VLLM_NIXL_SIDE_CHANNEL_HOST to %s", hostname)
os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = hostname
logger.debug("Setting VLLM_NIXL_SIDE_CHANNEL_PORT to %s", port)
os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(port)
@service(
dynamo={
"enabled": True,
"namespace": "dynamo",
},
resources={"gpu": 1, "cpu": "10", "memory": "20Gi"},
workers=1,
)
class VllmPrefillWorker(VllmBaseWorker):
@async_on_start
async def async_init(self):
await super().async_init()
logger.info("VllmPrefillWorker has been initialized")
@service(
dynamo={
"enabled": True,
"namespace": "dynamo",
},
resources={"gpu": 1, "cpu": "10", "memory": "20Gi"},
workers=1,
)
class VllmDecodeWorker(VllmBaseWorker):
@async_on_start
async def async_init(self):
await super().async_init()
logger.info("VllmDecodeWorker has been initialized")
@service(
dynamo={
"enabled": True,
"namespace": "dynamo",
},
resources={"gpu": 1, "cpu": "10", "memory": "20Gi"},
workers=1,
)
class VllmDpWorker(VllmBaseWorker):
@async_on_start
async def async_init(self):
vllm_config = self.engine_args.create_engine_config(
usage_context=UsageContext.OPENAI_API_SERVER
)
parallel_config = vllm_config.parallel_config
local_engine_count = parallel_config.data_parallel_size_local
host = parallel_config.data_parallel_master_ip
port = self.engine_args.data_parallel_rpc_port # add to config too
handshake_address = get_tcp_uri(host, port)
self.engine_manager = CoreEngineProcManager(
target_fn=EngineCoreProc.run_engine_core,
local_engine_count=local_engine_count,
start_index=self.engine_args.data_parallel_start_rank,
local_start_index=0,
vllm_config=vllm_config,
on_head_node=False,
handshake_address=handshake_address,
executor_class=Executor.get_class(vllm_config),
log_stats=not self.engine_args.disable_log_stats,
)
def shutdown_vllm_engine(self, signum, frame):
"""Shutdown the engine manager"""
try:
self.engine_manager.join_first()
finally:
logger.info("Shutting down.")
self.engine_manager.close()
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
Frontend:
endpoint: dynamo.SimpleLoadBalancer.generate_agg
port: 8000
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
SimpleLoadBalancer:
enable_disagg: false
common-configs: [model, served_model_name]
VllmDecodeWorker:
enforce-eager: true
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, served_model_name]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1
served_model_name: deepseek-ai/DeepSeek-R1
Frontend:
endpoint: dynamo.SimpleLoadBalancer.generate
port: 8000
common-configs: [served_model_name]
SimpleLoadBalancer:
enable_disagg: false
common-configs: [model, served_model_name]
VllmDecodeWorker:
tensor_parallel_size: 16
ServiceArgs:
workers: 1
resources:
gpu: '16'
common-configs: [model, served_model_name]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1
served_model_name: deepseek-ai/DeepSeek-R1
# Data parallel configuration. These values may need to be overridden
# depending on the deployment.
data_parallel_size: 16
data_parallel_size_local: 8
data_parallel_address: "localhost"
data_parallel_rpc_port: 13345
max-model-len: 10240
trust-remote-code: true
enable_expert_parallel: false
Frontend:
endpoint: dynamo.SimpleLoadBalancer.generate
port: 8000
common-configs: [served_model_name]
SimpleLoadBalancer:
enable_disagg: false
common-configs: [model, served_model_name]
VllmDecodeWorker:
ServiceArgs:
workers: 1
resources:
gpu: '8'
common-configs: [model, served_model_name, data_parallel_size, data_parallel_size_local, data_parallel_address, data_parallel_rpc_port, max-model-len, trust-remote-code, enable_expert_parallel]
VllmDpWorker:
# [NOTE] 'data_parallel_address' and 'data_parallel_start_rank' will be set differently
# depending on the DP worker counts and where the DP worker is located.
# See README.md for more details.
data_parallel_start_rank: 8
ServiceArgs:
workers: 1
resources:
gpu: '8'
common-configs: [model, served_model_name, data_parallel_size, data_parallel_size_local, data_parallel_address, data_parallel_rpc_port, max-model-len, trust-remote-code, enable_expert_parallel]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1
kv_transfer_config:
kv_connector: NixlConnector
kv_role: kv_both
served_model_name: deepseek-ai/DeepSeek-R1
Frontend:
endpoint: dynamo.SimpleLoadBalancer.generate
port: 8000
common-configs: [served_model_name]
SimpleLoadBalancer:
enable_disagg: true
common-configs: [model, kv_transfer_config, served_model_name]
VllmPrefillWorker:
tensor_parallel_size: 16
ServiceArgs:
workers: 1
resources:
gpu: '16'
common-configs: [model, kv_transfer_config, served_model_name]
VllmDecodeWorker:
tensor_parallel_size: 16
ServiceArgs:
workers: 1
resources:
gpu: '16'
common-configs: [model, kv_transfer_config, served_model_name]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1
kv_transfer_config:
kv_connector: NixlConnector
kv_role: kv_both
served_model_name: deepseek-ai/DeepSeek-R1
enable_expert_parallel: false
# Data parallel configuration. These values may need to be overridden
# depending on the deployment.
data_parallel_size: 16
data_parallel_size_local: 8
data_parallel_address: 0.0.0.0
data_parallel_rpc_port: 13345
max-model-len: 10240
trust-remote-code: true
Frontend:
endpoint: dynamo.SimpleLoadBalancer.generate
port: 8000
common-configs: [served_model_name]
SimpleLoadBalancer:
enable_disagg: true
common-configs: [model, kv_transfer_config, served_model_name]
VllmPrefillWorker:
ServiceArgs:
workers: 1
resources:
gpu: '8'
common-configs: [model, kv_transfer_config, served_model_name, data_parallel_size, data_parallel_size_local, data_parallel_address, data_parallel_rpc_port, max-model-len, trust-remote-code, enable_expert_parallel]
VllmDecodeWorker:
ServiceArgs:
workers: 1
resources:
gpu: '8'
common-configs: [model, kv_transfer_config, served_model_name, data_parallel_size, data_parallel_size_local, data_parallel_address, data_parallel_rpc_port, max-model-len, trust-remote-code, enable_expert_parallel]
# VllmDpWorker is a special worker that is not part of the graph, and should be deployed separately
# depending on the DP configuration of the VllmPrefillWorker / VllmDecodeWorker
VllmDpWorker:
# [NOTE] 'data_parallel_address' and 'data_parallel_start_rank' will be set differently
# depending on the DP worker counts and where the DP worker is located.
# See README.md for more details.
data_parallel_start_rank: 8
ServiceArgs:
workers: 1
resources:
gpu: '8'
common-configs: [model, kv_transfer_config, served_model_name, data_parallel_size, data_parallel_size_local, data_parallel_address, data_parallel_rpc_port, max-model-len, trust-remote-code, enable_expert_parallel]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
kv-transfer-config: '{"kv_connector":"NixlConnector","kv_role":"kv_both"}'
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
Frontend:
endpoint: dynamo.SimpleLoadBalancer.generate_disagg
port: 8000
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
SimpleLoadBalancer:
enable_disagg: true
common-configs: [model, kv-transfer-config, served_model_name]
VllmPrefillWorker:
enforce-eager: true
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, kv-transfer-config, served_model_name]
VllmDecodeWorker:
enforce-eager: true
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, kv-transfer-config, served_model_name]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
kv-transfer-config: '{"kv_connector":"NixlConnector","kv_role":"kv_both"}'
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
Frontend:
endpoint: dynamo.SimpleLoadBalancer.generate_disagg
port: 8000
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
SimpleLoadBalancer:
enable_disagg: true
common-configs: [model, kv-transfer-config, served_model_name]
VllmPrefillWorker:
enforce-eager: true
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, kv-transfer-config, served_model_name]
VllmDecodeWorker:
enforce-eager: true
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, kv-transfer-config, served_model_name]
Prometheus:
global:
scrape_interval: 5s
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
- job_name: 'frontend'
static_configs:
- targets: ['localhost:8000']
Planner:
backend: "vllm_v1"
adjustment-interval: 180
profile-results-dir: "/workspace/examples/profiling_results"
isl: 3000
osl: 150
ttft: 0.5
itl: 0.05
load-predictor: "arima"
\ No newline at end of file
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Running Deepseek R1 with Wide EP
Dynamo supports running Deepseek R1 with data parallel attention and wide expert parallelism. Each data parallel attention rank is a separate Dynamo component that emits its own KV events and metrics. vLLM controls expert parallelism through the `--enable-expert-parallel` flag.
# Instructions
The following script can be adapted to run Deepseek R1 in a variety of configurations. The current configuration uses 2 nodes, 16 GPUs, and a data parallel size (dp) of 16. Follow the Getting Started section of the [README](README.md) on each node, then run the matching command below on each node.
node 0
```bash
./launch/dsr1_dep.sh --num-nodes 2 --node-rank 0 --gpus-per-node 8 --master-addr <node 0 addr>
```
node 1
```bash
./launch/dsr1_dep.sh --num-nodes 2 --node-rank 1 --gpus-per-node 8 --master-addr <node 0 addr>
```
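The relationship between the launch flags and the data parallel layout can be sketched in a few lines of Python. This is an illustration only: it assumes one data parallel rank per GPU, numbered contiguously node by node, and the helper name `dp_ranks_for_node` is hypothetical rather than something provided by `dsr1_dep.sh`.

```python
def dp_ranks_for_node(num_nodes: int, node_rank: int, gpus_per_node: int) -> list[int]:
    """Return the data parallel ranks hosted on one node.

    Assumption (not taken from the launch script): one rank per GPU,
    with ranks numbered contiguously node by node.
    """
    if not 0 <= node_rank < num_nodes:
        raise ValueError("node_rank must be in [0, num_nodes)")
    start = node_rank * gpus_per_node
    return list(range(start, start + gpus_per_node))


if __name__ == "__main__":
    # Mirrors the two commands above: 2 nodes with 8 GPUs each.
    for node_rank in range(2):
        print(node_rank, dp_ranks_for_node(2, node_rank, 8))
```

Under these assumptions, the two nodes together cover 16 ranks, which matches the dp of 16 stated for the current configuration.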
### Testing the Deployment
On node 0 (where the frontend was started), send a test request to verify your deployment:
```bash
curl localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "deepseek-ai/DeepSeek-R1",
"messages": [
{
"role": "user",
"content": "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden."
}
],
"stream": false,
"max_tokens": 30
}'
```
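The same request can also be sent from Python, which may be convenient for scripted checks. The sketch below uses only the standard library and reuses the URL, model name, and request fields from the `curl` command above; the function names `build_chat_request` and `send_chat_request` are hypothetical helpers, and the 60 second timeout is an arbitrary choice.

```python
import json
import urllib.request


def build_chat_request(prompt: str,
                       model: str = "deepseek-ai/DeepSeek-R1",
                       max_tokens: int = 30) -> dict:
    """Build the JSON body with the same fields as the curl example."""
    return {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "max_tokens": max_tokens,
    }


def send_chat_request(payload: dict,
                      url: str = "http://localhost:8080/v1/chat/completions") -> dict:
    """POST the payload to the frontend and return the parsed JSON reply."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # The timeout is an assumption; adjust it for long-running requests.
    with urllib.request.urlopen(request, timeout=60) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the deployment running, calling `send_chat_request(build_chat_request("Hello"))` on node 0 should return the parsed response body as a dictionary.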
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from components.frontend import Frontend
from components.simple_load_balancer import SimpleLoadBalancer
from components.worker import VllmDecodeWorker
load_balancer = Frontend.link(SimpleLoadBalancer)
load_balancer.link(VllmDecodeWorker)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from components.frontend import Frontend
from components.simple_load_balancer import SimpleLoadBalancer
from components.worker import VllmDecodeWorker, VllmPrefillWorker
load_balancer = Frontend.link(SimpleLoadBalancer)
load_balancer.link(VllmPrefillWorker)
load_balancer.link(VllmDecodeWorker)