Unverified Commit 7cd0d680 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: add dynamo-run example for vllm v0 (#1186)

parent 9d944c27
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# vLLM Deployment Examples
This directory contains examples for deploying vLLM (v0) models in both aggregated and disaggregated configurations.
> [!NOTE]
> Different than `/examples/llm`, this example uses `dynamo-run` to handle the (de)tokenization and routing. `dynamo-run` is a rust-based CLI designed for high-performance pre/post-processing and routing. Read more about `dynamo-run`: [dynamo_run.md](../docs/guides/dynamo_run.md).
## Prerequisites
Start required services (etcd and NATS) using [Docker Compose](../../deploy/metrics/docker-compose.yml)
```bash
docker compose -f deploy/metrics/docker-compose.yml up -d
```
### Build and run docker
```bash
# On an x86 machine
./container/build.sh --framework vllm
# On an ARM machine (ex: GB200)
./container/build.sh --framework vllm --platform linux/arm64
./container/run.sh -it --framework vllm
```
> [!WARNING]
> Starting the container not in `--privileged` mode might result in significant CPU bottlenecks. Please turn on `--privileged` if you experience any performance issues.
## Run Deployment
```bash
# aggregated
cd $DYNAMO_HOME/examples/vllm_v0
dynamo serve graphs.agg:Frontend -f ./configs/agg.yaml
# aggregated with kv
cd $DYNAMO_HOME/examples/vllm_v0
dynamo serve graphs.agg:Frontend -f ./configs/agg_kv.yaml
# disaggregated
cd $DYNAMO_HOME/examples/vllm_v0
dynamo serve graphs.disagg:Frontend -f ./configs/disagg.yaml
# disaggregated with kv
cd $DYNAMO_HOME/examples/vllm_v0
dynamo serve graphs.disagg:Frontend -f ./configs/disagg_kv.yaml
```
## Client
```bash
# this test request has around 200 tokens isl
curl localhost:8000/v1/chat/completions -H "Content-Type: application/json" -d '{
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"messages": [
{
"role": "user",
"content": "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden."
}
],
"stream":false,
"max_tokens": 30
}'
```
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from dynamo.runtime import EtcdKvCache
from dynamo.sdk import dynamo_context
logger = logging.getLogger(__name__)
class PyDisaggregatedRouter:
def __init__(
self,
runtime,
namespace,
max_local_prefill_length=1000,
max_prefill_queue_size=2,
):
self.runtime = runtime
self.namespace = namespace
self.max_local_prefill_length = max_local_prefill_length
self.max_prefill_queue_size = max_prefill_queue_size
async def async_init(self):
runtime = dynamo_context["runtime"]
self.etcd_kv_cache = await EtcdKvCache.create(
runtime.etcd_client(),
f"/{self.namespace}/disagg_router/",
{
"max_local_prefill_length": str(self.max_local_prefill_length),
"max_prefill_queue_size": str(self.max_prefill_queue_size),
},
)
async def prefill_remote(
self, prompt_length: int, prefix_hit_rate: float, queue_size: int
):
max_local_prefill_length = int(
await self.etcd_kv_cache.get("max_local_prefill_length")
)
max_prefill_queue_size = int(
await self.etcd_kv_cache.get("max_prefill_queue_size")
)
absolute_prefill_length = int(prompt_length * (1 - prefix_hit_rate))
# TODO: consider size of each request in the queue when making the decision
decision = (
absolute_prefill_length > max_local_prefill_length
and queue_size < max_prefill_queue_size
)
logger.info(
f"Remote prefill: {decision} (prefill length: {absolute_prefill_length}/{max_local_prefill_length}, prefill queue size: {queue_size}/{max_prefill_queue_size})"
)
return decision
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import subprocess
from pathlib import Path
from components.worker import VllmWorker
from fastapi import FastAPI
from pydantic import BaseModel
from dynamo import sdk
from dynamo.sdk import depends, service
from dynamo.sdk.lib.config import ServiceConfig
from dynamo.sdk.lib.image import DYNAMO_IMAGE
logger = logging.getLogger(__name__)
def get_dynamo_run_binary():
"""Find the dynamo-run binary path in SDK or fallback to 'dynamo-run' command."""
sdk_path = Path(sdk.__file__)
binary_path = sdk_path.parent / "cli/bin/dynamo-run"
if not binary_path.exists():
return "dynamo-run"
else:
return str(binary_path)
class FrontendConfig(BaseModel):
"""Configuration for the Frontend service including model and HTTP server settings."""
served_model_name: str
endpoint: str
port: int = 8080
router: str = "round-robin"
block_size: int = 64
# todo this should be called ApiServer
@service(
dynamo={
"namespace": "dynamo",
},
workers=1,
image=DYNAMO_IMAGE,
app=FastAPI(title="LLM Example"),
)
class Frontend:
worker = depends(VllmWorker)
def __init__(self):
"""Initialize Frontend service with HTTP server and model configuration."""
config = ServiceConfig.get_instance()
self.frontend_config = FrontendConfig(**config.get("Frontend", {}))
self.process = None
self.start_ingress_and_processor()
def start_ingress_and_processor(self):
"""Starting dynamo-run based ingress and processor"""
logger.info(
f"Starting HTTP server and processor on port {self.frontend_config.port}"
)
dynamo_run_binary = get_dynamo_run_binary()
endpoint = f"dyn://{self.frontend_config.endpoint}"
cmd = [
dynamo_run_binary,
"in=http",
f"out={endpoint}",
"--http-port",
str(self.frontend_config.port),
"--router-mode",
self.frontend_config.router,
]
self.process = subprocess.Popen(
cmd,
stdout=None,
stderr=None,
)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import os
import signal
import sys
from pydantic import BaseModel
from utils.nixl import NixlMetadataStore
from utils.prefill_queue import PrefillQueue
from utils.vllm import parse_vllm_args
from vllm.entrypoints.openai.api_server import (
build_async_engine_client_from_engine_args,
)
from vllm.inputs.data import TokensPrompt
from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from dynamo.sdk import async_on_start, dynamo_context, endpoint, service
logger = logging.getLogger(__name__)
class RequestType(BaseModel):
text: str
@service(
dynamo={
"namespace": "dynamo",
},
resources={"gpu": 1, "cpu": "10", "memory": "20Gi"},
workers=1,
)
class PrefillWorker:
def __init__(self):
class_name = self.__class__.__name__
self.engine_args = parse_vllm_args(class_name, "")
self._loaded_metadata = set()
self.initialized = False
if self.engine_args.enable_chunked_prefill is not False:
logger.info("Chunked prefill is not supported yet, setting to False")
self.engine_args.enable_chunked_prefill = False
if self.engine_args.pipeline_parallel_size != 1:
logger.info("Pipeline parallel size is not supported yet, setting to 1")
self.engine_args.pipeline_parallel_size = 1
if self.engine_args.disable_async_output_proc is not True:
logger.info("Async output processing is not supported yet, setting to True")
self.engine_args.disable_async_output_proc = True
if self.engine_args.enforce_eager is not True:
logger.info("Prefill must be done eagerly, setting to True")
self.engine_args.enforce_eager = True
if self.engine_args.enable_prefix_caching is not False:
logger.info(
"Prefix caching is not supported yet in prefill worker, setting to False"
)
self.engine_args.enable_prefix_caching = False
@async_on_start
async def async_init(self):
self._engine_context = build_async_engine_client_from_engine_args(
self.engine_args
)
if self._engine_context is not None:
self.engine_client = await self._engine_context.__aenter__()
else:
raise RuntimeError("Failed to initialize engine client")
runtime = dynamo_context["runtime"]
metadata = self.engine_client.nixl_metadata
self._metadata_store = NixlMetadataStore("dynamo", runtime)
await self._metadata_store.put(metadata.engine_id, metadata)
self.task = asyncio.create_task(self.prefill_queue_handler())
def prefill_queue_handler_cb(fut):
try:
fut.result()
logger.info("prefill queue handler exited successfully")
except Exception as e:
logger.error(f"[ERROR] prefill queue handler failed: {e!r}")
sys.exit(1)
self.task.add_done_callback(prefill_queue_handler_cb)
self.shutdown_requested = False
# Set up signal handler for graceful shutdown
# TODO: move to dynamo sdk
loop = asyncio.get_running_loop()
def signal_handler():
# Schedule the shutdown coroutine instead of calling it directly
asyncio.create_task(self.graceful_shutdown(runtime))
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, signal_handler)
logger.info("PrefillWorker initialized")
async def graceful_shutdown(self, runtime):
logger.info("Received shutdown signal, shutting down DistributedRuntime")
# first shutdown the vllm engine
self.shutdown_requested = True
await asyncio.wait_for(self.task, timeout=None)
# then shutdown the mock endpoint
runtime.shutdown()
logger.info("DistributedRuntime shutdown complete")
def shutdown_vllm_engine(self):
"""Shutdown the background loop"""
logger.info("Shutting down vllm engine")
loop = asyncio.get_event_loop()
try:
self.engine_client.close()
logger.info("PrefillWorker shutdown complete")
except Exception as e:
logger.error(f"Error during shutdown: {e}")
finally:
loop.stop()
async def prefill_queue_handler(self):
logger.info("Prefill queue handler entered")
prefill_queue_nats_server = os.getenv("NATS_SERVER", "nats://localhost:4222")
namespace, _ = PrefillWorker.dynamo_address() # type: ignore
prefill_queue_stream_name = f"{namespace}_prefill_queue"
logger.info(
f"Prefill queue: {prefill_queue_nats_server}:{prefill_queue_stream_name}"
)
self.initialized = True
# TODO: integrate prefill_queue to a dynamo endpoint
async with PrefillQueue.get_instance(
nats_server=prefill_queue_nats_server,
stream_name=prefill_queue_stream_name,
) as prefill_queue:
logger.info("prefill queue handler started")
while True:
# TODO: this might add a small overhead to pull prefill from nats
# need to test and check how much overhead it is
prefill_request = await prefill_queue.dequeue_prefill_request()
if prefill_request is not None:
logger.info(
f"Dequeued prefill request: {prefill_request.request_id}"
)
async for _ in self.generate(prefill_request):
pass
if self.shutdown_requested:
logger.info(
"Shutdown requested, checking if engine has any pending prefill sending requests"
)
while True:
if not await self.engine_client.has_unfinished_requests():
break
logger.info(
"Engine has pending prefill sending requests, rechecking in 1 second..."
)
await asyncio.sleep(1)
self.shutdown_vllm_engine()
break
async def generate(self, request: RemotePrefillRequest):
sampling_params = request.sampling_params
sampling_params.max_tokens = 1
sampling_params.min_tokens = 1
remote_prefill_params = RemotePrefillParams(
is_remote_decode=True,
decode_block_ids=request.block_ids,
decode_engine_id=request.engine_id,
decode_computed_block_ids=request.computed_block_ids,
)
# TODO check if metadata has changed
# and reload - currently only loading once
if request.engine_id not in self._loaded_metadata:
remote_metadata = await self._metadata_store.get(request.engine_id)
await self.engine_client.add_remote_nixl_metadata(remote_metadata)
logger.info(
f"Loaded nixl metadata from engine {request.engine_id} into "
f"engine {self.engine_client.nixl_metadata.engine_id}"
)
self._loaded_metadata.add(request.engine_id)
async for _ in self.engine_client.generate(
request_id=request.request_id,
prompt=TokensPrompt(prompt_token_ids=request.prompt_token_ids),
sampling_params=sampling_params,
remote_prefill_params=remote_prefill_params,
):
yield
@endpoint()
async def mock(self, req: RequestType):
yield f"mock_response: {req}"
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import os
import signal
import uuid
from components.disagg_router import PyDisaggregatedRouter
from components.prefill_worker import PrefillWorker
from utils.nixl import NixlMetadataStore
from utils.prefill_queue import PrefillQueue
from utils.protocol import PreprocessedRequest
from utils.vllm import RouterType, parse_vllm_args
from vllm import SamplingParams
from vllm.entrypoints.openai.api_server import (
build_async_engine_client_from_engine_args,
)
from vllm.inputs import TokensPrompt
from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from vllm.sampling_params import RequestOutputKind
from dynamo.llm import KvMetricsPublisher, ModelType, register_llm
from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service
logger = logging.getLogger(__name__)
@service(
dynamo={
"namespace": "dynamo",
},
resources={"gpu": 1, "cpu": "10", "memory": "20Gi"},
workers=1,
)
class VllmWorker:
prefill_worker = depends(PrefillWorker)
def __init__(self):
self.client = None
self.disaggregated_router: PyDisaggregatedRouter = None # type: ignore
class_name = self.__class__.__name__
self.engine_args = parse_vllm_args(class_name, "")
self.do_remote_prefill = self.engine_args.remote_prefill
self._prefill_queue_nats_server = os.getenv(
"NATS_SERVER", "nats://localhost:4222"
)
self.namespace, _ = VllmWorker.dynamo_address() # type: ignore
self._prefill_queue_stream_name = f"{self.namespace}_prefill_queue"
logger.info(
f"Prefill queue: {self._prefill_queue_nats_server}:{self._prefill_queue_stream_name}"
)
if self.engine_args.remote_prefill:
if self.engine_args.enable_chunked_prefill is not False:
logger.info("Chunked prefill is not supported yet, setting to False")
self.engine_args.enable_chunked_prefill = False
if self.engine_args.preemption_mode != "swap":
logger.info("Preemption mode is not supported yet, setting to swap")
self.engine_args.preemption_mode = "swap"
if self.engine_args.pipeline_parallel_size != 1:
logger.info("Pipeline parallel size is not supported yet, setting to 1")
self.engine_args.pipeline_parallel_size = 1
if self.engine_args.router == RouterType.KV:
if not self.engine_args.enable_prefix_caching:
logger.info(
"When using KV router, prefix caching must be enabled, setting to True"
)
self.engine_args.enable_prefix_caching = True
VLLM_WORKER_ID = dynamo_context["endpoints"][0].lease_id()
os.environ["VLLM_WORKER_ID"] = str(VLLM_WORKER_ID)
os.environ["VLLM_KV_NAMESPACE"] = "dynamo"
os.environ["VLLM_KV_COMPONENT"] = class_name
self.metrics_publisher = KvMetricsPublisher()
model_config = self.engine_args.create_model_config()
self.default_sampling_params = model_config.get_diff_sampling_param()
signal.signal(signal.SIGTERM, self.shutdown_vllm_engine)
signal.signal(signal.SIGINT, self.shutdown_vllm_engine)
@async_on_start
async def async_init(self):
runtime = dynamo_context["runtime"]
logger.info("Registering LLM for discovery")
comp_ns, comp_name = VllmWorker.dynamo_address() # type: ignore
endpoint = runtime.namespace(comp_ns).component(comp_name).endpoint("generate")
print(endpoint)
await register_llm(
ModelType.Backend,
endpoint,
self.engine_args.model,
self.engine_args.served_model_name,
kv_cache_block_size=self.engine_args.block_size,
)
self._engine_context = build_async_engine_client_from_engine_args(
self.engine_args
)
if self._engine_context is not None:
self.engine_client = await self._engine_context.__aenter__()
else:
raise RuntimeError("Failed to initialize engine client")
self.engine_client.set_metrics_publisher(self.metrics_publisher)
# Initially send dummy metrics to kick start,
# vLLM will not update stat until forward pass is triggered
self.metrics_publisher.publish(
0, # request_active_slots
1024, # request_total_slots
0, # kv_active_blocks
1024, # kv_total_blocks
0, # num_requests_waiting
0.0, # gpu_cache_usage_perc
0.0, # gpu_prefix_cache_hit_rate
)
task = asyncio.create_task(self.create_metrics_publisher_endpoint())
task.add_done_callback(
lambda _: logger.info("metrics publisher endpoint created")
)
runtime = dynamo_context["runtime"]
if self.engine_args.remote_prefill:
metadata = self.engine_client.nixl_metadata
metadata_store = NixlMetadataStore("dynamo", runtime)
await metadata_store.put(metadata.engine_id, metadata)
if self.engine_args.conditional_disagg:
self.disaggregated_router = PyDisaggregatedRouter(
runtime,
self.namespace,
max_local_prefill_length=self.engine_args.max_local_prefill_length,
max_prefill_queue_size=self.engine_args.max_prefill_queue_size,
)
await self.disaggregated_router.async_init()
else:
self.disaggregated_router = None
# Set up signal handler for graceful shutdown
# TODO: move to dynamo sdk
loop = asyncio.get_running_loop()
def signal_handler():
# Schedule the shutdown coroutine instead of calling it directly
asyncio.create_task(self.graceful_shutdown(runtime))
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, signal_handler)
logger.info("VllmWorker has been initialized")
async def graceful_shutdown(self, runtime):
logger.info("Received shutdown signal, shutting down DistributedRuntime")
runtime.shutdown()
logger.info("DistributedRuntime shutdown complete")
def shutdown_vllm_engine(self, signum, frame):
"""Shutdown the background loop"""
logger.info(f"Received signal {signum}, shutting down")
loop = asyncio.get_event_loop()
try:
self.engine_client.close()
logger.info("VllmWorker shutdown complete")
except Exception as e:
logger.error(f"Error during shutdown: {e}")
finally:
loop.stop()
async def create_metrics_publisher_endpoint(self):
component = dynamo_context["component"]
logger.info("Creating metrics publisher endpoint with primary lease")
await self.metrics_publisher.create_endpoint(component)
def get_remote_prefill_request_callback(self):
# TODO: integrate prefill_queue to dynamo endpoint
async def callback(request: RemotePrefillRequest):
async with PrefillQueue.get_instance(
nats_server=self._prefill_queue_nats_server,
stream_name=self._prefill_queue_stream_name,
) as prefill_queue:
await prefill_queue.enqueue_prefill_request(request)
return callback
@endpoint()
async def generate(self, request: PreprocessedRequest):
request_id = str(uuid.uuid4())
if self.disaggregated_router is not None:
async with PrefillQueue.get_instance(
nats_server=self._prefill_queue_nats_server,
stream_name=self._prefill_queue_stream_name,
) as prefill_queue:
prefill_queue_size = await prefill_queue.get_queue_size()
disagg_router_decision = await self.disaggregated_router.prefill_remote(
len(request.token_ids),
0, # TODO: return prefix hit rate from dynamo-run router
prefill_queue_size,
)
else:
# always prefill remotely if no disaggregated router is provided
disagg_router_decision = True
if self.do_remote_prefill and disagg_router_decision:
remote_prefill_params = RemotePrefillParams(
is_remote_prefill=True,
remote_prefill_request_callback=self.get_remote_prefill_request_callback(),
)
logger.info(
f"Prefilling remotely for request {request_id} with length {len(request.token_ids)}"
)
else:
remote_prefill_params = None
logger.info(
f"Prefilling locally for request {request_id} with length {len(request.token_ids)}"
)
sampling_params = SamplingParams(**self.default_sampling_params)
sampling_params.output_kind = RequestOutputKind.DELTA
if request.sampling_options.temperature:
sampling_params.temperature = request.sampling_options.temperature
if request.sampling_options.top_p:
sampling_params.top_p = request.sampling_options.top_p
if request.sampling_options.top_k:
sampling_params.top_k = request.sampling_options.top_k
sampling_params.max_tokens = request.stop_conditions.max_tokens
if request.stop_conditions.ignore_eos:
sampling_params.ignore_eos = request.stop_conditions.ignore_eos
async for response in self.engine_client.generate(
prompt=TokensPrompt(prompt_token_ids=request.token_ids),
sampling_params=sampling_params,
request_id=request_id,
remote_prefill_params=remote_prefill_params,
):
if response.finished:
yield {"finish_reason": "stop", "token_ids": []}
break
if not response.outputs:
yield {"finish_reason": "error", "token_ids": []}
break
output = response.outputs[0]
out = {"token_ids": output.token_ids}
if output.finish_reason:
out["finish_reason"] = output.finish_reason
if output.stop_reason:
out["stop_reason"] = output.stop_reason
yield out
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.VllmWorker.generate
port: 8000
router: round-robin
common-configs: [block-size]
VllmWorker:
enforce-eager: true
max-num-batched-tokens: 16384
enable-prefix-caching: true
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len]
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.VllmWorker.generate
port: 8000
router: kv
common-configs: [block-size]
VllmWorker:
enforce-eager: true
max-num-batched-tokens: 16384
enable-prefix-caching: true
ServiceArgs:
workers: 1
resources:
gpu: '1'
common-configs: [model, block-size, max-model-len]
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.VllmWorker.generate
port: 8000
router: round-robin
common-configs: [block-size]
VllmWorker:
remote-prefill: true
conditional-disagg: true
max-local-prefill-length: 10
max-prefill-queue-size: 2
enable-prefix-caching: true
ServiceArgs:
workers: 1
resources:
gpu: 1
common-configs: [model, block-size, max-model-len, kv-transfer-config]
PrefillWorker:
max-num-batched-tokens: 16384
ServiceArgs:
workers: 1
resources:
gpu: 1
common-configs: [model, block-size, max-model-len, kv-transfer-config]
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Common:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
block-size: 64
max-model-len: 16384
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.VllmWorker.generate
port: 8000
router: kv
common-configs: [block-size]
VllmWorker:
remote-prefill: true
conditional-disagg: true
max-local-prefill-length: 10
max-prefill-queue-size: 2
enable-prefix-caching: true
ServiceArgs:
workers: 1
resources:
gpu: 1
common-configs: [model, block-size, max-model-len, kv-transfer-config]
PrefillWorker:
max-num-batched-tokens: 16384
ServiceArgs:
workers: 1
resources:
gpu: 1
common-configs: [model, block-size, max-model-len, kv-transfer-config]
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from components.frontend import Frontend
from components.worker import VllmWorker
Frontend.link(VllmWorker)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from components.frontend import Frontend
from components.prefill_worker import PrefillWorker
from components.worker import VllmWorker
Frontend.link(VllmWorker).link(PrefillWorker)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from contextlib import asynccontextmanager
from typing import ClassVar, Optional
from dynamo._core import NatsQueue
class NATSQueue:
_instance: ClassVar[Optional["NATSQueue"]] = None
_lock: ClassVar[asyncio.Lock] = asyncio.Lock()
def __init__(
self,
stream_name: str = "default",
nats_server: str = "nats://localhost:4222",
dequeue_timeout: float = 1,
):
self.nats_q = NatsQueue(stream_name, nats_server, dequeue_timeout)
@classmethod
@asynccontextmanager
async def get_instance(
cls,
*,
stream_name: str = "default",
nats_server: str = "nats://localhost:4222",
dequeue_timeout: float = 1,
):
"""Get or create a singleton instance of NATSq"""
# TODO: check if this _lock is needed with GIL
async with cls._lock:
if cls._instance is None:
cls._instance = cls(
stream_name=stream_name,
nats_server=nats_server,
dequeue_timeout=dequeue_timeout,
)
await cls._instance.connect()
try:
yield cls._instance
except Exception:
if cls._instance:
await cls._instance.close()
cls._instance = None
raise
# TODO: check to see if this can be replaced by something like get_instance().close()
@classmethod
async def shutdown(cls):
"""Explicitly close the singleton instance if it exists"""
async with cls._lock:
if cls._instance:
await cls._instance.close()
cls._instance = None
async def connect(self):
await self.nats_q.connect()
async def ensure_connection(self):
await self.nats_q.ensure_connection()
async def close(self):
await self.nats_q.close()
# TODO: is enqueue/dequeue_object a better name for a general queue?
async def enqueue_task(self, task_data: bytes) -> None:
await self.nats_q.enqueue_task(task_data)
async def dequeue_task(self, timeout: Optional[float] = None) -> Optional[bytes]:
return await self.nats_q.dequeue_task(timeout)
async def get_queue_size(self) -> int:
return await self.nats_q.get_queue_size()
async def clear_queue(self) -> int:
try:
cleared_count = 0
# Continue until we can't dequeue any more messages
while True:
# use a small timeout
message = await self.dequeue_task(timeout=0.1)
if message is None:
break
cleared_count += 1
return cleared_count
except Exception as e:
raise RuntimeError(f"Failed to clear queue: {e}")
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from contextlib import contextmanager
import msgspec
from vllm.distributed.device_communicators.nixl import NixlMetadata
from dynamo.runtime import DistributedRuntime
METADATA_DIR = "/tmp/nixl"
logger = logging.getLogger(__name__)
@contextmanager
def temp_metadata_file(engine_id, metadata: NixlMetadata):
os.makedirs(METADATA_DIR, exist_ok=True)
path = f"{METADATA_DIR}/{engine_id}.nixl_meta"
with open(path, "wb") as f:
encoded = msgspec.msgpack.encode(metadata)
logger.info(f"Size of encoded metadata: {len(encoded)}")
f.write(encoded)
try:
yield path
finally:
if os.path.exists(path):
os.remove(path)
def find_remote_metadata(engine_id):
# find and load metadata from METADATA_DIR that do not match engine_id
remote_metadata = []
for file in os.listdir(METADATA_DIR):
if file.endswith(".nixl_meta"):
if file.split(".")[0] != engine_id:
with open(os.path.join(METADATA_DIR, file), "rb") as f:
remote_metadata.append(
msgspec.msgpack.decode(f.read(), type=NixlMetadata)
)
return remote_metadata
class NixlMetadataStore:
NIXL_METADATA_KEY = "nixl_metadata"
def __init__(self, namespace: str, runtime: DistributedRuntime) -> None:
self._namespace = namespace
# TODO Remove metadata from etcd on delete
self._stored: set[str] = set()
self._cached: dict[str, NixlMetadata] = {}
self._client = runtime.etcd_client()
if self._client is None:
raise Exception("Cannot be used with static workers")
self._key_prefix = f"{self._namespace}/{NixlMetadataStore.NIXL_METADATA_KEY}"
async def put(self, engine_id, metadata: NixlMetadata):
serialized_metadata = msgspec.msgpack.encode(metadata)
key = "/".join([self._key_prefix, engine_id])
# create with primary lease so that the kv entry will be deleted when the worker shutdowns
try:
# TODO: should we create a series of function in etcd client to use primary lease?
await self._client.kv_create_or_validate(
key, serialized_metadata, self._client.primary_lease_id()
)
except Exception as e:
logger.warning(f"A different metadata exists for engine {engine_id}: {e}")
self._stored.add(engine_id)
async def get(self, engine_id) -> NixlMetadata:
try:
if engine_id in self._cached:
return self._cached[engine_id]
key = "/".join([self._key_prefix, engine_id])
key_values = await self._client.kv_get_prefix(key)
deserialized_metadata = None
for item in key_values:
deserialized_metadata = msgspec.msgpack.decode(
item["value"], type=NixlMetadata
)
break
if deserialized_metadata is None:
raise Exception("metadata not found in etcd")
self._cached[engine_id] = deserialized_metadata
# TODO watch for changes and update cache
# self._client.add_watch_callback(
# key,
# self._watch_callback,
# )
except Exception as e:
raise Exception("Error retrieving metadata for engine {engine_id}") from e
return deserialized_metadata
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
import msgspec
from utils.nats_queue import NATSQueue
from vllm.remote_prefill import RemotePrefillRequest
class PrefillQueue(NATSQueue):
"""
A wrapper of NATSQueue for PrefillRequest.
The stream name is forced to be "prefill_queue".
"""
def __init__(
self,
stream_name="prefill_queue",
nats_server: str = "nats://localhost:4222",
dequeue_timeout: float = 1,
):
super().__init__(
stream_name=stream_name,
nats_server=nats_server,
dequeue_timeout=dequeue_timeout,
)
async def enqueue_prefill_request(
self, prefill_request: RemotePrefillRequest
) -> None:
encoded_request = msgspec.json.encode(prefill_request)
await self.enqueue_task(encoded_request)
async def dequeue_prefill_request(self) -> Optional[RemotePrefillRequest]:
encoded_request = await self.dequeue_task()
if encoded_request is not None:
prefill_request = msgspec.json.decode(
encoded_request, type=RemotePrefillRequest
)
return prefill_request
else:
return None
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional
from pydantic import BaseModel, Field
TokenIdType = int
# TODO: move these to common for all LLMs once we adopt dynamo-run
# derived from lib/llm/src/protocols/common/preprocessor.rs
class StopConditions(BaseModel):
max_tokens: Optional[int] = None
stop: Optional[List[str]] = None
stop_token_ids_hidden: Optional[List[TokenIdType]] = None
min_tokens: Optional[int] = None
ignore_eos: Optional[bool] = None
class SamplingOptions(BaseModel):
n: Optional[int] = None
best_of: Optional[int] = None
presence_penalty: Optional[float] = None
frequency_penalty: Optional[float] = None
repetition_penalty: Optional[float] = None
temperature: Optional[float] = None
top_p: Optional[float] = None
top_k: Optional[int] = None
min_p: Optional[float] = None
use_beam_search: Optional[bool] = None
length_penalty: Optional[float] = None
seed: Optional[int] = None
class PreprocessedRequest(BaseModel):
token_ids: List[TokenIdType]
stop_conditions: StopConditions
sampling_options: SamplingOptions
eos_token_ids: List[TokenIdType] = Field(default_factory=list)
mdc_sum: Optional[str] = None
annotations: List[str] = Field(default_factory=list)
class DisaggPreprocessedRequest(BaseModel):
request: PreprocessedRequest
sampling_params: dict
bootstrap_host: str
bootstrap_port: int
bootstrap_room: int
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: rename to avoid ambiguity with vllm package
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.utils import FlexibleArgumentParser
from dynamo.sdk.lib.config import ServiceConfig
class RouterType:
RANDOM = "random"
ROUND_ROBIN = "round-robin"
KV = "kv"
KV_LOAD = "kv-load"
def parse_vllm_args(service_name, prefix) -> AsyncEngineArgs:
config = ServiceConfig.get_instance()
vllm_args = config.as_args(service_name, prefix=prefix)
parser = FlexibleArgumentParser()
parser.add_argument(
"--router",
type=str,
choices=[
RouterType.RANDOM,
RouterType.ROUND_ROBIN,
RouterType.KV,
RouterType.KV_LOAD,
],
default=RouterType.RANDOM,
help="Router type to use for scheduling requests to workers",
)
parser.add_argument(
"--router-num-threads",
type=int,
default=4,
help="Number of threads to use for the router to process the requests",
)
parser.add_argument(
"--remote-prefill", action="store_true", help="Enable remote prefill"
)
parser.add_argument(
"--conditional-disagg",
action="store_true",
help="Use disaggregated router to decide whether to prefill locally or remotely",
)
parser.add_argument(
"--max-local-prefill-length",
type=int,
default=1000,
help="Maximum length for local prefill. If remote prefill is enabled and the prefill length is greater than this value the request will be sent for remote prefill, otherwise prefill phase will run locally.",
)
parser.add_argument(
"--max-prefill-queue-size",
type=int,
default=3,
help="Maximum queue size for remote prefill. If the prefill queue size is greater than this value, prefill phase of the incoming request will be executed locally.",
)
parser = AsyncEngineArgs.add_cli_args(parser)
args = parser.parse_args(vllm_args)
engine_args = AsyncEngineArgs.from_cli_args(args)
engine_args.router = args.router
engine_args.router_num_threads = args.router_num_threads
engine_args.remote_prefill = args.remote_prefill
engine_args.conditional_disagg = args.conditional_disagg
engine_args.max_local_prefill_length = args.max_local_prefill_length
engine_args.max_prefill_queue_size = args.max_prefill_queue_size
return engine_args
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment