Commit b4aff959 authored by Biswa Panda, committed by GitHub

feat(deploy): Add examples for dynamo serve (#173)


Co-authored-by: Ryan McCormick <rmccormick@nvidia.com>
Co-authored-by: Neelay Shah <neelays@nvidia.com>
parent 177fb356
......@@ -16,9 +16,10 @@
from typing import Any
from bentoml import api # type: ignore
from bentoml import on_shutdown as async_on_shutdown
from bentoml._internal.context import server_context # type: ignore
from dynamo.sdk.lib.decorators import async_onstart, dynamo_api, dynamo_endpoint
from dynamo.sdk.lib.decorators import async_on_start, dynamo_api, dynamo_endpoint
from dynamo.sdk.lib.dependency import depends
from dynamo.sdk.lib.image import DYNAMO_IMAGE
from dynamo.sdk.lib.service import service
......@@ -26,13 +27,14 @@ from dynamo.sdk.lib.service import service
dynamo_context: dict[str, Any] = {}
__all__ = [
"DYNAMO_IMAGE",
"api",
"server_context",
"async_onstart",
"async_on_shutdown",
"async_on_start",
"depends",
"dynamo_api",
"dynamo_context",
"dynamo_endpoint",
"depends",
"DYNAMO_IMAGE",
"server_context",
"service",
"dynamo_context",
]
......@@ -169,7 +169,7 @@ def main(
logger.info(f"[{run_id}] Running startup hook: {name}")
result = getattr(class_instance, name)()
if inspect.isawaitable(result):
# await on startup hook async_onstart
# await on startup hook async_on_start
await result
logger.info(
f"[{run_id}] Completed async startup hook: {name}"
......
......@@ -94,7 +94,7 @@ def dynamo_api(func: t.Callable) -> t.Callable:
return bentoml.api(func)
def async_onstart(func: t.Callable) -> t.Callable:
def async_on_start(func: t.Callable) -> t.Callable:
"""Decorator for async onstart functions."""
# Mark the function as a startup hook
setattr(func, "__bentoml_startup_hook__", True)
......
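For context on the rename, here is a minimal, self-contained sketch of how the `__bentoml_startup_hook__` marker set by `async_on_start` is discovered and awaited at startup, mirroring the decorator and the hook loop in the hunks above (the `ExampleService` class and `run_startup_hooks` helper are illustrative, not part of the SDK):

```python
import asyncio
import inspect

def async_on_start(func):
    # Same idea as the decorator above: mark the function as a startup hook.
    setattr(func, "__bentoml_startup_hook__", True)
    return func

class ExampleService:  # illustrative stand-in for a Dynamo service class
    @async_on_start
    async def async_init(self):
        print("startup hook ran")

async def run_startup_hooks(instance):
    # Illustrative version of the hook loop in main(): find marked methods,
    # call them, and await the result if it is awaitable.
    for _name, member in inspect.getmembers(instance, callable):
        if getattr(member, "__bentoml_startup_hook__", False):
            result = member()
            if inspect.isawaitable(result):
                await result

asyncio.run(run_startup_hooks(ExampleService()))
```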
......@@ -43,9 +43,8 @@ Users/Clients (HTTP)
1. Launch all three services using a single command -
```bash
cd /workspace/deploy/examples
dynamo hello_world.hello_world:Frontend
cd /workspace/deploy/examples/hello_world
dynamo serve hello_world:Frontend
```
2. Send a request to the frontend using curl -
......
......@@ -15,34 +15,49 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
## Prerequisites
# LLM Deployment Examples
Start required services (etcd and NATS):
This directory contains examples and reference implementations for deploying Large Language Models (LLMs) in various configurations.
Option A: Using [Docker Compose](/deploy/docker-compose.yml) (Recommended)
```bash
docker compose -f deploy/docker-compose.yml up -d
```
## Components
Option B: Manual Setup
- workers: Prefill and decode workers that handle the actual LLM inference
- router: Handles API requests and routes them to the appropriate workers based on the specified strategy
- frontend: OpenAI-compatible HTTP server that handles incoming requests
- [NATS.io](https://docs.nats.io/running-a-nats-service/introduction/installation) server with [Jetstream](https://docs.nats.io/nats-concepts/jetstream)
- example: `nats-server -js --trace`
- [etcd](https://etcd.io) server
- follow instructions in [etcd installation](https://etcd.io/docs/v3.5/install/) to start an `etcd-server` locally
## Deployment Architectures
## Build docker
### Monolith
Single-instance deployment where both prefill and decode are done by the same worker.
### Disaggregated
Distributed deployment where prefill and decode are done by separate workers that can scale independently.
## Getting Started
1. Choose a deployment architecture based on your requirements
2. Configure the components as needed
3. Deploy using the provided scripts
### Prerequisites
Start required services (etcd and NATS) using [Docker Compose](/deploy/docker-compose.yml)
```bash
docker compose -f deploy/docker-compose.yml up -d
```
### Build docker
```
./container/build.sh
```
## Run container
### Run container
```
./container/run.sh -it
```
## Run deployment
## Run Deployment
This figure shows an overview of the major components to deploy:
......@@ -66,64 +81,39 @@ This figure shows an overview of the major components to deploy:
```
### Disaggregated vLLM deployment
Serve the following components:
- processor: Routes requests to the (decode) workers. Supported scheduling strategies include random and kv.
- kv router: Aggregates KV events from all the workers and maintains a prefix tree of the cached tokens. It decides which worker to route a request to based on the length of the prefix match and the load on the workers (see the sketch after this list).
- decode worker: runs on gpu = 0
- prefill worker: runs on gpu = 1
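A minimal sketch of the routing decision described above, assuming per-worker prefix-overlap scores and a simple load penalty (the function, weights, and data shapes are illustrative assumptions, not the actual KV Router implementation):

```python
from typing import Dict

def choose_worker(overlap_scores: Dict[str, int], active_blocks: Dict[str, int],
                  load_weight: float = 1.0) -> str:
    """Pick the worker with the longest cached-prefix overlap, discounted by its load.

    overlap_scores: worker id -> prefix blocks already cached on that worker
    active_blocks:  worker id -> KV-cache blocks currently in use (a proxy for load)
    """
    def score(worker_id: str) -> float:
        return overlap_scores.get(worker_id, 0) - load_weight * active_blocks.get(worker_id, 0)

    return max(active_blocks, key=score)

# Example: worker-1 has a longer prefix match but is heavily loaded, so worker-0 wins.
print(choose_worker({"worker-0": 2, "worker-1": 6}, {"worker-0": 1, "worker-1": 8}))
```

In practice the router is balancing cache reuse (a longer prefix match) against queueing on a busy worker, which is what the load penalty stands in for here.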
### Example architectures
#### Router based worker
```bash
cd /workspace/deploy/examples/vllm
dynamo serve disaggregated.processor:Processor \
--Processor.model=deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--Processor.tokenizer=deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--Processor.block-size=64 \
--Processor.max-model-len=16384 \
--Processor.router=kv \
--Router.min-workers=1 \
--Router.block-size=64 \
--Router.model-name=deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--VllmWorker.remote-prefill=true \
--VllmWorker.model=deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--VllmWorker.enforce-eager=true \
--VllmWorker.tensor-parallel-size=1 \
--VllmWorker.kv-transfer-config='{"kv_connector": "DynamoNixlConnector"}' \
--VllmWorker.block-size=64 \
--VllmWorker.max-num-batched-tokens=16384 \
--VllmWorker.max-model-len=16384 \
--VllmWorker.router=kv \
--VllmWorker.enable-prefix-caching=true \
--PrefillWorker.model=deepseek-ai/DeepSeek-R1-Distill-Llama-8B \
--PrefillWorker.enforce-eager=true \
--PrefillWorker.block-size=64 \
--PrefillWorker.max-model-len=16384 \
--PrefillWorker.max-num-batched-tokens=16384 \
--PrefillWorker.kv-transfer-config='{"kv_connector": "DynamoNixlConnector"}' \
--PrefillWorker.cuda-visible-device-offset=1
cd /workspace/deploy/examples/llm
dynamo serve monolith.router_based_deployment:Frontend -f ./configs/monolith/router_based_deployment.yaml
```
#### Routerless monolith
```bash
cd /workspace/deploy/examples/llm
dynamo serve monolith.routerless_deployment:Frontend -f ./configs/monolith/routerless_deployment.yaml
```
Add the model to Dynamo and start the HTTP server.
#### Router based disaggregated serving
```bash
cd /workspace/deploy/examples/llm
dynamo serve disaggregated.router_based_deployment:Frontend -f ./configs/disaggregated/router_based_deployment.yaml
```
llmctl http add chat-models deepseek-ai/DeepSeek-R1-Distill-Llama-8B dynamo-init.Processor.chat_completions
TRT_LOG=DEBUG http --port 8181
#### Routerless disaggregated serving
```bash
cd /workspace/deploy/examples/llm
dynamo serve disaggregated.routerless_deployment:Frontend -f ./configs/disaggregated/routerless_deployment.yaml
```
### Client
In another terminal:
```bash
# this test request has an input sequence length (ISL) of around 200 tokens
curl localhost:8181/v1/chat/completions -H "Content-Type: application/json" -d '{
curl localhost:8000/v1/chat/completions -H "Content-Type: application/json" -d '{
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"messages": [
{
......@@ -134,12 +124,24 @@ curl localhost:8181/v1/chat/completions -H "Content-Type: application/json"
"stream":false,
"max_tokens": 30
}'
```
### Close deployment
Kill all python processes and clean up metadata files:
Kill all dynamo processes managed by circusd.
```
pkill -9 -f python
function kill_tree() {
local parent=$1
local children=$(ps -o pid= --ppid $parent)
for child in $children; do
kill_tree $child
done
echo "Killing process $parent"
kill -9 $parent
}
# kill process-tree of circusd
kill_tree $(pgrep circusd)
```
\ No newline at end of file
......@@ -16,8 +16,9 @@
import os
import subprocess
from disaggregated.processor import Processor
from disaggregated.worker import VllmWorker
from components.processor import Processor
from components.routerless.worker import VllmWorkerRouterLess
from components.worker import VllmWorker
from pydantic import BaseModel
from dynamo.sdk import depends, service
......@@ -38,6 +39,7 @@ class FrontendConfig(BaseModel):
# todo this should be called ApiServer
class Frontend:
worker = depends(VllmWorker)
worker_routerless = depends(VllmWorkerRouterLess)
processor = depends(Processor)
def __init__(self):
......
......@@ -20,12 +20,12 @@ import random
from argparse import Namespace
from typing import AsyncIterator
from disaggregated.worker import VllmWorker
from components.worker import VllmWorker
from utils.protocol import Tokens
from vllm.logger import logger as vllm_logger
from dynamo.llm import AggregatedMetrics, KvIndexer, KvMetricsAggregator, OverlapScores
from dynamo.sdk import async_onstart, depends, dynamo_context, dynamo_endpoint, service
from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service
from dynamo.sdk.lib.config import ServiceConfig
WorkerId = str
......@@ -83,7 +83,7 @@ class Router:
vllm_logger.info("Initializing Custom Router")
self.args = parse_args(self.__class__.__name__, "")
@async_onstart
@async_on_start
async def async_init(self):
self.runtime = dynamo_context["runtime"]
self.workers_client = (
......
......@@ -29,7 +29,7 @@ from vllm.logger import logger as vllm_logger
from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from dynamo.sdk import (
async_onstart,
async_on_start,
dynamo_context,
dynamo_endpoint,
server_context,
......@@ -41,9 +41,6 @@ class RequestType(BaseModel):
text: str
os.environ["VLLM_LOG_LEVEL"] = "DEBUG"
@service(
dynamo={
"enabled": True,
......@@ -81,7 +78,7 @@ class PrefillWorker:
self.engine_args.enforce_eager = True
print("PrefillWorker initialized")
@async_onstart
@async_on_start
async def async_init(self):
self._engine_context = build_async_engine_client_from_engine_args(
self.engine_args
......
......@@ -13,13 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import uuid
from enum import Enum
from typing import AsyncIterator, Tuple, Union
from disaggregated.kv_router import Router
from disaggregated.worker import VllmWorker
from components.kv_router import Router
from components.worker import VllmWorker
from transformers import AutoTokenizer
from utils.chat_processor import ChatProcessor, CompletionsProcessor, ProcessMixIn
from utils.protocol import MyRequestOutput, Tokens, vLLMGenerateRequest
......@@ -32,8 +31,6 @@ from vllm.transformers_utils.tokenizer import AnyTokenizer
from dynamo.sdk import depends, dynamo_context, dynamo_endpoint, service
os.environ["VLLM_LOG_LEVEL"] = "DEBUG"
class RequestType(Enum):
CHAT = "chat"
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import msgspec
from utils.nixl import NixlMetadataStore
from utils.vllm import parse_vllm_args
from vllm.entrypoints.openai.api_server import (
build_async_engine_client_from_engine_args,
)
from vllm.inputs.data import TokensPrompt
from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from dynamo.sdk import (
async_on_start,
dynamo_context,
dynamo_endpoint,
server_context,
service,
)
@service(
dynamo={
"enabled": True,
"namespace": "dynamo-init",
},
resources={"cpu": "10", "memory": "20Gi"},
workers=1,
)
class PrefillWorkerRouterLess:
def __init__(self):
class_name = self.__class__.__name__
self.engine_args = parse_vllm_args(class_name, "")
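# Pin this worker to its own GPU: combine the configured device offset with the
# worker index (which appears to be 1-based, hence the -1 below).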
gpu_idx = (
self.engine_args.cuda_visible_device_offset
+ server_context.worker_index
- 1
)
os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_idx}"
self._loaded_metadata = set()
self.initialized = False
if self.engine_args.enable_chunked_prefill is not False:
print("Chunked prefill is not supported yet, setting to False")
self.engine_args.enable_chunked_prefill = False
if self.engine_args.pipeline_parallel_size != 1:
print("Pipeline parallel size is not supported yet, setting to 1")
self.engine_args.pipeline_parallel_size = 1
if self.engine_args.disable_async_output_proc is not True:
print("Async output processing is not supported yet, setting to True")
self.engine_args.disable_async_output_proc = True
if self.engine_args.enforce_eager is not True:
print("Prefill must be done eagerly, setting to True")
self.engine_args.enforce_eager = True
print("PrefillWorkerRouterLess initialized")
@async_on_start
async def async_init(self):
self._engine_context = build_async_engine_client_from_engine_args(
self.engine_args
)
if self._engine_context is not None:
self.engine_client = await self._engine_context.__aenter__()
else:
raise RuntimeError("Failed to initialize engine client")
runtime = dynamo_context["runtime"]
metadata = self.engine_client.nixl_metadata
self._metadata_store = NixlMetadataStore("dynamo-init", runtime)
await self._metadata_store.put(metadata.engine_id, metadata)
@dynamo_endpoint()
async def generate(self, raw_request: str):
request: RemotePrefillRequest = msgspec.json.decode(
raw_request.encode("utf-8"), type=RemotePrefillRequest
)
sampling_params = request.sampling_params
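# Prefill runs for exactly one token; generation continues on the remote
# decode worker identified by decode_engine_id below.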
sampling_params.max_tokens = 1
sampling_params.min_tokens = 1
remote_prefill_params = RemotePrefillParams(
is_remote_decode=True,
decode_block_ids=request.block_ids,
decode_engine_id=request.engine_id,
)
# TODO check if metadata has changed
# and reload - currently only loading once
if request.engine_id not in self._loaded_metadata:
remote_metadata = await self._metadata_store.get(request.engine_id)
await self.engine_client.add_remote_nixl_metadata(remote_metadata)
print(
f"Loaded nixl metadata from engine {request.engine_id} into engine {self.engine_client.nixl_metadata.engine_id}"
)
self._loaded_metadata.add(request.engine_id)
async for _ in self.engine_client.generate(
request_id=request.request_id,
prompt=TokensPrompt(prompt_token_ids=request.prompt_token_ids),
sampling_params=sampling_params,
remote_prefill_params=remote_prefill_params,
):
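# The stream carries no payload: prefill results reach the decode worker
# through the KV-cache transfer (NIXL) path, so we only yield to drive it.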
yield
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import msgspec
from components.routerless.prefill_worker import PrefillWorkerRouterLess
from utils.nixl import NixlMetadataStore
from utils.vllm import parse_vllm_args
from vllm.entrypoints.openai.api_server import (
build_async_engine_client_from_engine_args,
)
from vllm.entrypoints.openai.protocol import ChatCompletionRequest
from vllm.entrypoints.openai.serving_chat import OpenAIServingChat
from vllm.entrypoints.openai.serving_models import BaseModelPath, OpenAIServingModels
from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from dynamo.sdk import (
async_on_shutdown,
async_on_start,
depends,
dynamo_context,
dynamo_endpoint,
service,
)
@service(
dynamo={
"enabled": True,
"namespace": "dynamo-init",
},
resources={"gpu": 1, "cpu": "10", "memory": "20Gi"},
workers=1,
)
class VllmWorkerRouterLess:
prefill_client = depends(PrefillWorkerRouterLess)
def __init__(self):
class_name = self.__class__.__name__
self.engine_args = parse_vllm_args(class_name, "")
self.do_remote_prefill = self.engine_args.remote_prefill
self.client = None
self.model_name = (
self.engine_args.served_model_name
if self.engine_args.served_model_name is not None
else "vllm"
)
if self.engine_args.remote_prefill:
if self.engine_args.enable_chunked_prefill is not False:
print("Chunked prefill is not supported yet, setting to False")
self.engine_args.enable_chunked_prefill = False
if self.engine_args.preemption_mode != "swap":
print("Preemption mode is not supported yet, setting to swap")
self.engine_args.preemption_mode = "swap"
if self.engine_args.pipeline_parallel_size != 1:
print("Pipeline parallel size is not supported yet, setting to 1")
self.engine_args.pipeline_parallel_size = 1
self.openai_serving_chat = None
self.initialized = False
print("VllmWorkerRouterLess initialized")
@async_on_start
async def async_init(self):
self._engine_context = build_async_engine_client_from_engine_args(
self.engine_args
)
if self._engine_context is not None:
self.engine_client = await self._engine_context.__aenter__()
else:
raise RuntimeError("Failed to initialize engine client")
runtime = dynamo_context["runtime"]
if self.engine_args.remote_prefill:
metadata = self.engine_client.nixl_metadata
metadata_store = NixlMetadataStore("dynamo-init", runtime)
await metadata_store.put(metadata.engine_id, metadata)
models = OpenAIServingModels(
engine_client=self.engine_client,
model_config=await self.engine_client.get_model_config(),
base_model_paths=[
BaseModelPath(
name=self.model_name,
model_path=self.model_name,
)
],
)
self.openai_serving_chat = OpenAIServingChat(
engine_client=self.engine_client,
model_config=await self.engine_client.get_model_config(),
models=models,
request_logger=None,
response_role="assistant",
chat_template=None,
chat_template_content_format="auto",
)
self.initialized = True
@async_on_shutdown
async def async_shutdown(self):
if self._engine_context is not None:
await self._engine_context.__aexit__(None, None, None)
print("VllmWorkerRouterLess shutting down")
def get_remote_prefill_request_callback(self):
async def callback(request: RemotePrefillRequest):
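# Forward the prefill request to the remote prefill worker and drain its
# (empty) stream; the prefill output itself arrives via the KV transfer.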
json_request = msgspec.json.encode(request).decode("utf-8")
async for _ in self.prefill_client.generate(json_request):
pass
return callback
@dynamo_endpoint()
async def generate(self, request: ChatCompletionRequest):
assert self.openai_serving_chat is not None
request.model = "vllm"
if self.do_remote_prefill:
remote_prefill_params = RemotePrefillParams(
is_remote_prefill=True,
remote_prefill_request_callback=self.get_remote_prefill_request_callback(),
)
else:
remote_prefill_params = None
async for raw_response in await self.openai_serving_chat.create_chat_completion(
request,
remote_prefill_params=remote_prefill_params,
):
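# Responses arrive as SSE frames: stop at the [DONE] sentinel and strip
# the "data: " framing before parsing the JSON payload.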
if raw_response.startswith("data: [DONE]"):
break
response = json.loads(raw_response.lstrip("data: "))
yield response
......@@ -17,8 +17,8 @@
import asyncio
import os
from disaggregated.disagg_router import PyDisaggregatedRouter
from disaggregated.prefill_worker import PrefillWorker
from components.disagg_router import PyDisaggregatedRouter
from components.prefill_worker import PrefillWorker
from utils.nixl import NixlMetadataStore
from utils.prefill_queue import PrefillQueue
from utils.protocol import MyRequestOutput, vLLMGenerateRequest
......@@ -32,7 +32,7 @@ from vllm.sampling_params import RequestOutputKind
from dynamo.llm import KvMetricsPublisher
from dynamo.sdk import (
async_onstart,
async_on_start,
depends,
dynamo_context,
dynamo_endpoint,
......@@ -40,8 +40,6 @@ from dynamo.sdk import (
service,
)
os.environ["VLLM_LOG_LEVEL"] = "DEBUG"
@service(
dynamo={
......@@ -101,7 +99,7 @@ class VllmWorker:
os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_idx}"
self.metrics_publisher = KvMetricsPublisher()
@async_onstart
@async_on_start
async def async_init(self):
self._engine_context = build_async_engine_client_from_engine_args(
self.engine_args
......
......@@ -13,22 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
Frontend:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo-init.VllmWorker.generate
endpoint: dynamo-init.VllmWorkerRouterLess.generate
port: 8000
VllmWorker:
VllmWorkerRouterLess:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
enforce-eager: true
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
block-size: 64
max-model-len: 16384
max-num-batched-tokens: 16384
conditional-disagg: true
remote-prefill: true
ServiceArgs:
workers: 1
PrefillWorker:
PrefillWorkerRouterLess:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
enforce-eager: true
kv-transfer-config: '{"kv_connector":"DynamoNixlConnector"}'
......@@ -36,3 +39,5 @@ PrefillWorker:
max-model-len: 16384
max-num-batched-tokens: 16384
cuda-visible-device-offset: 1
ServiceArgs:
workers: 1
......@@ -37,7 +37,6 @@ VllmWorker:
max-num-batched-tokens: 16384
enable-prefix-caching: true
router: kv
remote-prefill: true
tensor-parallel-size: 1
ServiceArgs:
workers: 2
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Frontend:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo-init.VllmWorkerRouterLess.generate
port: 8000
VllmWorkerRouterLess:
model: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
enforce-eager: true
\ No newline at end of file