Unverified commit 6642e23e, authored by ishandhanani, committed by GitHub

feat: sglang to 0.5.9 + updated docs (#6518)


Co-authored-by: baihuitian <baihuitian.bht@gmail.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
parent 1df620b4
......@@ -379,7 +379,7 @@ For Slurm or other distributed deployments (and KV-aware routing):
To quickly set up both: `docker compose -f deploy/docker-compose.yml up -d`
See [SGLang on Slurm](examples/backends/sglang/slurm_jobs/README.md) and [TRT-LLM on Slurm](examples/basics/multinode/trtllm/README.md) for deployment examples.
See [TRT-LLM on Slurm](examples/basics/multinode/trtllm/README.md) for deployment examples.
## More News
......
CLAUDE.md
\ No newline at end of file
# SGLang Component
Dynamo's SGLang backend wraps SGLang's inference engine (`sgl.Engine`) and diffusion
generator (`DiffGenerator`) behind Dynamo's distributed runtime. It handles model
registration, request routing, metrics, and disaggregated serving.
## Entry Point
`__main__.py` -> `main.py:main()` -> `main.py:worker()`
`worker()` parses args, creates the distributed runtime, installs graceful shutdown,
then dispatches to one of 10 init functions based on CLI flags:
```
args.py:parse_args() -> Config(server_args, dynamo_args)

Worker dispatch (main.py:60-132):
  --image-diffusion-worker     -> init_diffusion.init_image_diffusion()
  --video-generation-worker    -> init_diffusion.init_video_diffusion()
  --embedding-worker           -> init_embedding.init_embedding()
  --multimodal-processor       -> init_multimodal.init_multimodal_processor()
  --multimodal-encode-worker   -> init_multimodal.init_multimodal_encode_worker()
  --multimodal-worker          -> init_multimodal.init_multimodal_worker() or _prefill_worker()
  --dllm-algorithm <algo>      -> init_diffusion.init_llm_diffusion()
  (default, prefill mode)      -> init_llm.init_prefill()
  (default, decode/agg mode)   -> init_llm.init_decode()
```
## Config / Args
`args.py:parse_args()` is the main parsing function. It returns `Config(server_args, dynamo_args)`.
**Two config paths:**
1. **LLM workers** (decode, prefill, embedding, multimodal-worker, dllm): Creates full
`sglang.srt.server_args.ServerArgs` via `ServerArgs.from_cli_args()`. This triggers
model config loading, tokenizer detection, etc.
2. **Diffusion workers** (image, video): Creates a minimal `types.SimpleNamespace` stub
(args.py:350-366) with only the fields needed for `DiffGenerator`. The stub does NOT
have `max_running_requests`, `dllm_algorithm_config`, or other LLM-specific fields.
Use `getattr()` when accessing fields that may not exist on the stub.
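
The defensive access pattern can be sketched as follows. This is a minimal illustration, assuming a stub shaped roughly like the one described above; the stub's field set and the helper name `read_max_running_requests` are hypothetical and not taken from `args.py`.

```python
from types import SimpleNamespace

# Hypothetical stand-in for the diffusion stub; the real field set lives in args.py.
diffusion_stub = SimpleNamespace(model_path="some/model", tp_size=1, dp_size=1)

# Hypothetical stand-in for a full ServerArgs object that does carry the field.
llm_args = SimpleNamespace(model_path="some/llm", max_running_requests=32)


def read_max_running_requests(server_args, default=None):
    """Return max_running_requests if the attribute exists, else the default."""
    # getattr with a default never raises AttributeError on the stub.
    return getattr(server_args, "max_running_requests", default)


stub_value = read_max_running_requests(diffusion_stub, default=8)
llm_value = read_max_running_requests(llm_args, default=8)
```

Direct attribute access (`diffusion_stub.max_running_requests`) would raise `AttributeError` on the stub, which is why shared code paths should prefer the `getattr` form.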
**DynamoConfig** combines `DynamoRuntimeConfig` (common flags like `--namespace`,
`--output-modalities`, `--media-output-fs-url`) with `DynamoSGLangConfig` (sglang-specific
flags like `--multimodal-processor`, `--embedding-worker`).
Key gotcha: `--output-modalities` defaults to `["text"]` globally. Image/video diffusion
workers override this in their init functions to `["image"]`/`["video"]` to ensure correct
registration with the Rust side.
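
The override rule can be sketched as a small pure function. This is an illustration only: `resolve_output_modalities` is a hypothetical helper, and it assumes that `None` and the global `["text"]` default both mean "the user did not choose a value".

```python
def resolve_output_modalities(configured, worker_default):
    """Pick the modalities to register for a worker (illustrative helper)."""
    # None or the global ["text"] default is treated as "not explicitly set".
    if configured is None or configured == ["text"]:
        return list(worker_default)
    # Anything else is an explicit user choice and is kept as-is.
    return configured


image_modalities = resolve_output_modalities(["text"], ["image"])
explicit_modalities = resolve_output_modalities(["image", "text"], ["image"])
```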
## Handler Hierarchy
```
BaseGenerativeHandler (handler_base.py)
    Abstract base. Has config, publisher, tracing. No engine.
    Subclasses: ImageDiffusionWorkerHandler, VideoGenerationWorkerHandler

    BaseWorkerHandler (handler_base.py)
        Adds sgl.Engine, tokenizer, priority support, engine routes,
        cancellation, bootstrap (disagg), weight update APIs.
        Constructor accepts engine=None for encode-only workers.

        DecodeWorkerHandler (llm/decode_handler.py)
            Aggregated + disaggregated decode. Token/text streaming.

        DiffusionWorkerHandler (llm/diffusion_handler.py)
            LLM diffusion (DLLM). Simplified decode without disagg.

        PrefillWorkerHandler (llm/prefill_handler.py)
            Disaggregated prefill. Yields bootstrap info first, then consumes.

        EmbeddingWorkerHandler (embedding/embedding_handler.py)
            Uses engine.async_encode() instead of async_generate().

        MultimodalWorkerHandler (multimodal/worker_handler.py)
            Multimodal inference. Aggregated or disaggregated paths.
            Has EmbeddingsProcessor for NIXL-transferred image embeddings.

        MultimodalPrefillWorkerHandler (multimodal/worker_handler.py)
            Multimodal prefill phase. Yields bootstrap info.

        MultimodalProcessorHandler (multimodal/processor_handler.py)
            Front-facing. No engine. Routes to encode worker.

        MultimodalEncodeWorkerHandler (multimodal/encode_worker_handler.py)
            No engine. Uses MMEncoder from SGLang. NIXL for embeddings transfer.
```
## Engine Types by Worker
| Worker | Engine | Notes |
|--------|--------|-------|
| decode, prefill, dllm, embedding | `sgl.Engine` | Full SGLang inference engine |
| multimodal-worker, multimodal-prefill | `sgl.Engine` | Plus EmbeddingsProcessor |
| multimodal-processor | None | Tokenizer only, routes to encoder |
| multimodal-encode-worker | None | `MMEncoder` from SGLang |
| image-diffusion-worker | `DiffGenerator` | From `sglang.multimodal_gen` |
| video-generation-worker | `DiffGenerator` | From `sglang.multimodal_gen` |
`DiffGenerator.generate()` returns `GenerationResult | list[GenerationResult] | None`
(dataclass, NOT a dict). Access `result.frames` for images/video frames,
`result.samples` for raw tensors.
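
Because the return type is a union, callers typically need to normalize it before use. The sketch below shows one way to do that. `FakeGenerationResult` is a hypothetical stand-in that models only the two attributes named above (it is not SGLang's class), and `collect_frames` is an illustrative helper.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeGenerationResult:
    """Hypothetical stand-in; only `frames` and `samples` are modelled."""
    frames: list = field(default_factory=list)
    samples: Any = None


def collect_frames(result):
    """Flatten a result, a list of results, or None into one list of frames."""
    if result is None:
        return []
    results = result if isinstance(result, list) else [result]
    frames = []
    for item in results:
        # Attribute access, not item["frames"]: the result is a dataclass.
        frames.extend(item.frames or [])
    return frames


single = collect_frames(FakeGenerationResult(frames=["img0"]))
batch = collect_frames(
    [FakeGenerationResult(frames=["a"]), FakeGenerationResult(frames=["b", "c"])]
)
```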
## Registration
`register.py` has three paths:
1. **LLM** (`register_model_with_readiness_gate`): Builds `ModelRuntimeConfig` with
bootstrap info, scheduler stats, parser configs. Calls Rust `register_model()` which
downloads `config.json` + tokenizer from HuggingFace.
2. **Image diffusion** (`register_image_diffusion_model`): Calls `register_model()` with
`ModelType.Images`. The Rust side skips HF downloads for Images/Videos/Tensor types
(lib/bindings/python/rust/lib.rs:314) and uses `ModelDeploymentCard::with_name_only()`.
3. **Video generation** (`register_video_generation_model`): Same fast path with
`ModelType.Videos`.
## Init Flow (typical LLM decode)
```
init_decode():
    engine = sgl.Engine(server_args)
    handler = DecodeWorkerHandler(engine, config, publisher, endpoint, shutdown_event)
    handler.register_engine_routes(runtime)      # profiling, weight updates, memory mgmt
    setup_sgl_metrics(engine, config, endpoint)  # Prometheus + KV events via ZMQ
    asyncio.gather(
        endpoint.serve_endpoint(handler.generate, ...),
        register_model_with_readiness_gate(engine, endpoint, ...),
    )
```
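
The concurrency shape of this flow (serve and register together, then cancel the metrics task on exit) can be sketched with stand-in coroutines. Everything here is illustrative: `fake_serve`, `fake_register`, and the sleeping metrics task are placeholders for the real Dynamo and SGLang calls, and the readiness gate is deliberately left out.

```python
import asyncio


async def fake_serve(log: list) -> None:
    """Stand-in for endpoint.serve_endpoint(); returns immediately in this sketch."""
    log.append("serving")
    await asyncio.sleep(0)


async def fake_register(log: list) -> None:
    """Stand-in for register_model_with_readiness_gate()."""
    log.append("registered")


async def init_worker() -> list:
    log: list = []
    # Stand-in for the long-running metrics loop returned by setup_sgl_metrics().
    metrics_task = asyncio.create_task(asyncio.sleep(3600))
    try:
        # Serving and registration run concurrently, as in the init functions.
        await asyncio.gather(fake_serve(log), fake_register(log))
    finally:
        # Always stop the metrics loop, even if serving raised.
        metrics_task.cancel()
        try:
            await metrics_task
        except asyncio.CancelledError:
            log.append("metrics cancelled")
    return log


log = asyncio.run(init_worker())
```

Putting the cancellation in `finally` means the metrics task is cleaned up whether the gathered coroutines finish normally or raise.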
## Disaggregated Serving
Prefill and decode workers coordinate via a bootstrap mechanism:
1. **Prefill handler** generates a `bootstrap_room` (random 63-bit ID)
2. Prefill yields bootstrap info (host, port, room) as its first response
3. **Decode handler** receives bootstrap info, passes it to `engine.async_generate()`
4. SGLang transfers KV cache via NIXL/RDMA between workers
Key functions: `BaseWorkerHandler._get_bootstrap_info()`,
`BaseWorkerHandler._generate_bootstrap_room()`.
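
As a rough sketch of steps 1 and 2, a 63-bit room ID fits in a signed 64-bit integer and can be produced with the standard library. The function names and the dictionary layout below are assumptions for illustration; only the `bootstrap_host`, `bootstrap_port`, and `bootstrap_room` key names mirror the keyword arguments passed to `engine.async_generate()` elsewhere in this change.

```python
import random


def generate_bootstrap_room() -> int:
    """Return a random 63-bit integer (illustrative, not the handler's code)."""
    return random.getrandbits(63)


def build_bootstrap_info(host: str, port: int) -> dict:
    """Bundle host, port, and a fresh room ID for the decode side (assumed layout)."""
    return {
        "bootstrap_host": host,
        "bootstrap_port": port,
        "bootstrap_room": generate_bootstrap_room(),
    }


info = build_bootstrap_info("10.0.0.1", 12345)
```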
## Metrics & Publishing
`publisher.py:DynamoSglangPublisher` manages:
- **Scheduler metrics**: Received via ZMQ from SGLang's scheduler, published to Prometheus
- **KV events**: ZMQ subscribers per DP rank, forwarded via `KvEventPublisher`
Only leader nodes (node_rank==0) run the metrics loop. Non-leader nodes just wait.
`setup_sgl_metrics()` returns `(publisher, metrics_task, metrics_labels)`.
## Graceful Shutdown
`shutdown.py:install_graceful_shutdown()` monkey-patches `loop.add_signal_handler()` to
capture SGLang's internal signal registrations and defer them. On SIGTERM/SIGINT:
1. Unregisters from discovery (stops new requests)
2. Waits grace period for in-flight requests
3. Runs deferred SGLang signal handlers
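
The capture-and-defer idea can be sketched on a plain asyncio loop. This is a simplified illustration, not the contents of `shutdown.py`: `install_deferral` is a hypothetical name, it patches the loop instance only, and it assumes the standard asyncio loop (which allows instance attribute assignment).

```python
import asyncio
import signal


def install_deferral(loop):
    """Replace loop.add_signal_handler so registrations are recorded, not installed."""
    deferred = []

    def capture(sig, callback, *args):
        # Record the registration instead of wiring it to the OS signal.
        deferred.append((sig, callback, args))

    loop.add_signal_handler = capture  # instance-level monkey-patch

    def run_deferred():
        # Invoke the captured handlers later, e.g. after the grace period.
        for _sig, callback, args in deferred:
            callback(*args)

    return deferred, run_deferred


loop = asyncio.new_event_loop()
deferred, run_deferred = install_deferral(loop)

calls = []
# A library registering its own handler now only gets captured.
loop.add_signal_handler(signal.SIGTERM, calls.append, "library-handler")
captured_before_run = list(calls)
run_deferred()
loop.close()
```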
## Request Flow
```
Frontend (Rust, lib/llm/)
  -> Preprocessor (tokenizes, builds PreprocessedRequest with token_ids + sampling + stop)
  -> Dynamo RPC to endpoint (dyn://{namespace}.{component}.{endpoint})
  -> Python handler.generate(request_dict, context)
       handler._build_sampling_params(request) -> SGLang-native params
       engine.async_generate(**params) -> async iterator of dicts
       handler yields {token_ids, text, finish_reason, ...} back to frontend
  -> Frontend postprocesses into OpenAI-compatible response
```
Two request formats depending on `--skip-tokenizer-init`:
- **Token-based** (skip_tokenizer_init=True): Frontend tokenizes. Request has `token_ids`,
`sampling_options`, `stop_conditions`. Handler maps to SGLang params.
- **Text-based** (skip_tokenizer_init=False): SGLang tokenizes. Request is an OpenAI
`ChatCompletionRequest`. Only `/v1/chat/completions` is available.
Image/video diffusion handlers receive the full OpenAI-format request dict directly
(not preprocessed), since the frontend passes through diffusion requests without
tokenization.
## Health Checks
Each worker type has a custom health check payload (`health_check.py`):
- **Decode/Aggregated**: `SglangHealthCheckPayload` -- sends BOS token, expects 1 token back
- **Prefill (disagg)**: `SglangPrefillHealthCheckPayload` -- wrapped `{request, sampling_params}`
- **Image diffusion**: `ImageDiffusionHealthCheckPayload` -- 512x512, 1 inference step, b64_json
- **Video generation**: `VideoGenerationHealthCheckPayload` -- 256x256, 8 frames, 1 step, b64_json
Health check payloads can be overridden via `DYNAMO_HEALTH_CHECK_PAYLOAD` env var (JSON).
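
A minimal sketch of reading such an override is shown below. Only the environment variable name comes from the text above; the helper name, the example payloads, and the choice to replace the default wholesale (rather than merge into it) are assumptions.

```python
import json
import os


def load_health_check_payload(default: dict) -> dict:
    """Return the JSON override from the environment, or the default payload."""
    raw = os.environ.get("DYNAMO_HEALTH_CHECK_PAYLOAD")
    if not raw:
        return default
    override = json.loads(raw)
    if not isinstance(override, dict):
        raise ValueError("DYNAMO_HEALTH_CHECK_PAYLOAD must be a JSON object")
    # Assumption: the override replaces the default entirely.
    return override


# Hypothetical payloads, for illustration only.
default_payload = {"token_ids": [1], "max_tokens": 1}
os.environ["DYNAMO_HEALTH_CHECK_PAYLOAD"] = '{"token_ids": [2], "max_tokens": 1}'
overridden = load_health_check_payload(default_payload)
del os.environ["DYNAMO_HEALTH_CHECK_PAYLOAD"]
fallback = load_health_check_payload(default_payload)
```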
## Launch Scripts
Examples in `examples/backends/sglang/launch/`. Each script starts a frontend + worker(s)
in one terminal. GPU requirements are documented in script headers.
```
agg.sh # 1 GPU - Single aggregated worker
agg_embed.sh # 1 GPU - Embedding model
agg_vision.sh # 1 GPU - Multimodal (vision + LLM)
agg_router.sh # 2 GPUs - Two workers behind KV-aware router
disagg.sh # 2 GPUs - Prefill + decode on separate GPUs
disagg_router.sh # 4 GPUs - 2 prefill + 2 decode with KV routing
disagg_same_gpu.sh # 1 GPU - Both workers on single GPU (16+ GB VRAM)
multimodal_epd.sh # 2 GPUs - Encoder + PD worker
multimodal_disagg.sh # 3 GPUs - Encoder + prefill + decode
diffusion_llada.sh # 1 GPU - Diffusion language model
image_diffusion.sh # 1 GPU - Text-to-image (~38 GB VRAM for FLUX.1-dev)
text-to-video-diffusion.sh # 1-2 GPUs - Text-to-video (Wan2.1)
```
## Common Pitfalls
- **SimpleNamespace vs ServerArgs**: Image/video diffusion workers use SimpleNamespace
stubs. Always use `getattr(server_args, field, default)` for fields that may not exist.
- **engine=None**: Multimodal processor and encode worker pass `engine=None` to
BaseWorkerHandler. Any code in the base class that touches engine must guard with
`if engine is not None`.
- **GenerationResult is a dataclass**: SGLang 0.5.9 changed `DiffGenerator.generate()`
to return `GenerationResult` (not a dict). Use `result.frames`, not `result["frames"]`.
- **output_modalities default**: Global default is `["text"]`. Image/video diffusion
workers must override to `["image"]`/`["video"]` or the Rust registration path tries
to load `config.json` (which doesn't exist for diffusers models).
- **Zombie GPU processes**: `sgl_diffusion::scheduler` spawns a child process that
survives parent kill. Always check `nvidia-smi` after teardown.
For troubleshooting (CuDNN, config.json errors, OOM, disagg connectivity), see
`docs/pages/backends/sglang/sglang-examples.md#troubleshooting`.
## Adding a New Worker Type
Checklist for adding a new worker (e.g., a new modality or serving mode):
1. **CLI flag**: Add to `backend_args.py` (DynamoSGLangConfig) and parse in `args.py`
2. **Init function**: Create `init_<type>.py` with `init_<type>(runtime, config, ...)` that:
   - Creates the engine (sgl.Engine, DiffGenerator, or None for encode-only)
   - Creates the handler
   - Sets up metrics (`setup_sgl_metrics` if applicable)
   - Calls `endpoint.serve_endpoint(handler.generate, ...)`
   - Registers the model
3. **Handler**: Subclass `BaseWorkerHandler` (if engine-backed) or `BaseGenerativeHandler`
   (if no engine). Implement `async generate(request, context) -> AsyncGenerator`
4. **Registration**: Add a function in `register.py`. Choose the right `ModelType`:
   - `Chat | Completions` for LLM (Rust downloads config.json + tokenizer)
   - `Images`, `Videos`, `Tensor` for non-LLM (Rust skips HF downloads)
5. **Health check**: Add a payload class in `health_check.py`
6. **Dispatch**: Add the flag check in `main.py:worker()` dispatch block
7. **output_modalities**: If not text, override in the init function (default is `["text"]`)
8. **Launch script**: Add to `examples/backends/sglang/launch/` with GPU count in header
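
The handler step of the checklist can be sketched as a standalone class. This is a shape-only illustration: `EchoWorkerHandler` is hypothetical, it does not subclass the real base classes, and the response dictionary keys are borrowed from the request flow section rather than from a confirmed schema.

```python
import asyncio
from typing import Any, AsyncGenerator


class EchoWorkerHandler:
    """Hypothetical handler; a real one would subclass a class from handler_base.py."""

    def __init__(self, engine: Any = None):
        # engine may be None for encode-only style workers.
        self.engine = engine

    async def generate(
        self, request: dict, context: Any = None
    ) -> AsyncGenerator[dict, None]:
        # Stream one chunk per input token, then a final chunk with a finish reason.
        for token_id in request.get("token_ids", []):
            yield {"token_ids": [token_id]}
        yield {"token_ids": [], "finish_reason": "stop"}

    def cleanup(self) -> None:
        # Guard every engine access, since engine can be None.
        if self.engine is not None:
            self.engine.shutdown()


async def collect(handler: EchoWorkerHandler, request: dict) -> list:
    return [chunk async for chunk in handler.generate(request, context=None)]


chunks = asyncio.run(collect(EchoWorkerHandler(), {"token_ids": [5, 6]}))
```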
## Tips for AI Assistants
- **Read before editing**: Always read handler_base.py and the relevant init_*.py before
modifying handler or registration code. The inheritance chain matters.
- **Test with launch scripts**: The fastest way to validate changes is to run the
corresponding launch script in `examples/backends/sglang/launch/`.
- **Kill zombies between tests**: `pkill -9 -f sglang; sleep 3` before relaunching.
Diffusion workers spawn child processes (`sgl_diffusion::scheduler`) that survive kills.
- **Check nvidia-smi**: If a launch OOMs, check for orphaned GPU processes from prior runs.
- **SimpleNamespace stubs**: When touching args.py or code that reads server_args, always
use `getattr(server_args, field, default)` -- image/video workers don't have full ServerArgs.
- **engine can be None**: Encode-only workers (multimodal-processor, multimodal-encode-worker)
pass engine=None. Guard any engine access in shared base class code.
- **Rebuild after Rust changes**: If changing registration (register.py interacts with Rust
bindings), rebuild: `cd lib/bindings/python && maturin develop --uv && cd <root> && uv pip install -e .`
- **Troubleshooting**: See `docs/pages/backends/sglang/sglang-examples.md#troubleshooting`
for CuDNN, config.json, OOM, and disagg connectivity issues.
## File Index
```
sglang/
  __main__.py                      # Entry point
  main.py                          # Worker dispatch
  args.py                          # Config parsing (ServerArgs vs SimpleNamespace)
  backend_args.py                  # Dynamo-specific SGLang CLI flags
  init_llm.py                      # init_decode(), init_prefill()
  init_diffusion.py                # init_llm_diffusion(), init_image_diffusion(), init_video_diffusion()
  init_multimodal.py               # init_multimodal_{processor,encode_worker,worker,prefill_worker}()
  init_embedding.py                # init_embedding()
  register.py                      # Model registration (LLM, image, video)
  publisher.py                     # Metrics + KV event publishing
  protocol.py                      # Request/response Pydantic models
  health_check.py                  # Health check payloads per worker type
  shutdown.py                      # Graceful shutdown with deferred signal handling
  request_handlers/
    handler_base.py                # BaseGenerativeHandler, BaseWorkerHandler
    llm/
      decode_handler.py            # DecodeWorkerHandler (agg + disagg)
      prefill_handler.py           # PrefillWorkerHandler (disagg prefill)
      diffusion_handler.py         # DiffusionWorkerHandler (DLLM)
    embedding/
      embedding_handler.py         # EmbeddingWorkerHandler
    image_diffusion/
      image_diffusion_handler.py   # ImageDiffusionWorkerHandler (DiffGenerator)
    video_generation/
      video_generation_handler.py  # VideoGenerationWorkerHandler (DiffGenerator)
    multimodal/
      processor_handler.py         # MultimodalProcessorHandler (no engine)
      encode_worker_handler.py     # MultimodalEncodeWorkerHandler (MMEncoder)
      worker_handler.py            # MultimodalWorkerHandler + MultimodalPrefillWorkerHandler
```
......@@ -218,29 +218,8 @@ async def parse_args(args: list[str]) -> Config:
unknown.append("--config")
unknown.append(temp_config_file)
# Handle SGLang --config file merge if present.
if "--config" in unknown:
# Merge config file arguments with CLI arguments.
# ConfigArgumentMerger API changed after SGLang v0.5.7:
# - New API (post-v0.5.7): accepts parser= for proper store_true detection
# - Old API (v0.5.7 and earlier): only accepts boolean_actions=
# We use inspect.signature to detect the API rather than version checking
# since unreleased builds may have the new API while still reporting v0.5.7.
# Related upstream issue: https://github.com/sgl-project/sglang/issues/16256
# Upstream fix PR: https://github.com/sgl-project/sglang/pull/16638
import inspect
sig = inspect.signature(ConfigArgumentMerger.__init__)
if "parser" in sig.parameters:
config_merger = ConfigArgumentMerger(parser=sglang_only_parser)
else:
# Legacy path: extract store_true actions manually
boolean_actions = [
action.dest
for action in sglang_only_parser._actions
if isinstance(action, argparse._StoreTrueAction)
]
config_merger = ConfigArgumentMerger(boolean_actions=boolean_actions)
config_merger = ConfigArgumentMerger(parser=sglang_only_parser)
unknown = config_merger.merge_config_with_args(unknown)
parsed_args = sglang_only_parser.parse_args(unknown)
......@@ -423,6 +402,19 @@ async def parse_args(args: list[str]) -> Config:
    # Auto-detect diffusion worker mode if dllm_algorithm
    diffusion_worker = server_args.dllm_algorithm is not None

    # SGLang's DLLM scheduler reads server_args.max_running_requests directly
    # but the field stays None until the normal scheduler init sets it from
    # tp_worker.get_worker_info(). Set a safe default so the DLLM mixin
    # doesn't crash on `None - int`.
    # Only applies to real DLLM workers (truthy algorithm string), not
    # video/image diffusion stubs where dllm_algorithm=False.
    if (
        server_args.dllm_algorithm
        and getattr(server_args, "max_running_requests", None) is None
    ):
        server_args.max_running_requests = 8
        logging.info("Defaulting max_running_requests to 8 for diffusion worker")

    dynamo_config.namespace = parsed_namespace
    dynamo_config.component = parsed_component_name
    dynamo_config.endpoint = parsed_endpoint_name
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
import os
from typing import Awaitable, Callable
import sglang as sgl
from dynamo.common.storage import get_fs
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.runtime import DistributedRuntime
from dynamo.sglang.args import Config
from dynamo.sglang.health_check import (
ImageDiffusionHealthCheckPayload,
SglangHealthCheckPayload,
VideoGenerationHealthCheckPayload,
)
from dynamo.sglang.publisher import handle_non_leader_node, setup_sgl_metrics
from dynamo.sglang.register import (
register_image_diffusion_model,
register_model_with_readiness_gate,
register_video_generation_model,
)
from dynamo.sglang.request_handlers import (
DiffusionWorkerHandler,
ImageDiffusionWorkerHandler,
VideoGenerationWorkerHandler,
)
async def init_llm_diffusion(
    runtime: DistributedRuntime,
    config: Config,
    shutdown_event: asyncio.Event,
    shutdown_endpoints: list,
    run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
    """Initialize diffusion language model worker component"""
    server_args, dynamo_args = config.server_args, config.dynamo_args

    logging.info(
        f"Initializing diffusion worker with algorithm: {server_args.dllm_algorithm}"
    )
    if server_args.dllm_algorithm_config:
        logging.info(
            f"Using diffusion algorithm config: {server_args.dllm_algorithm_config}"
        )

    if server_args.node_rank >= 1:
        os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "0"

    engine = sgl.Engine(server_args=server_args)

    generate_endpoint = runtime.endpoint(
        f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
    )
    shutdown_endpoints[:] = [generate_endpoint]

    publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
        engine, config, generate_endpoint
    )

    if server_args.node_rank >= 1:
        await handle_non_leader_node(engine, publisher, metrics_task)
        return

    ready_event = asyncio.Event()

    handler = DiffusionWorkerHandler(
        engine, config, publisher, generate_endpoint, shutdown_event
    )
    handler.register_engine_routes(runtime)

    health_check_payload = SglangHealthCheckPayload(
        engine, use_text_input=dynamo_args.use_sglang_tokenizer
    ).to_dict()

    logging.info(
        f"Registering diffusion model with endpoint types: {dynamo_args.endpoint_types}"
    )

    try:
        await asyncio.gather(
            generate_endpoint.serve_endpoint(
                handler.generate,
                graceful_shutdown=True,
                metrics_labels=metrics_labels,
                health_check_payload=health_check_payload,
            ),
            register_model_with_readiness_gate(
                engine,
                generate_endpoint,
                server_args,
                dynamo_args,
                output_type=parse_endpoint_types(dynamo_args.endpoint_types),
                readiness_gate=ready_event,
            ),
        )
    except Exception as e:
        logging.error(f"Failed to serve diffusion endpoints: {e}")
        raise
    finally:
        metrics_task.cancel()
        try:
            await metrics_task
        except asyncio.CancelledError:
            logging.info("Metrics task successfully cancelled")
            pass
        handler.cleanup()
        if run_deferred_handlers is not None:
            logging.info("Running deferred handlers")
            await run_deferred_handlers()
async def init_image_diffusion(
    runtime: DistributedRuntime,
    config: Config,
    shutdown_endpoints: list,
    run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
    """Initialize image diffusion worker component"""
    server_args, dynamo_args = config.server_args, config.dynamo_args

    from sglang.multimodal_gen import DiffGenerator

    if not server_args.model_path:
        raise ValueError("--model is required for diffusion workers")

    tp_size = getattr(server_args, "tp_size", 1)
    dp_size = getattr(server_args, "dp_size", 1)
    num_gpus = tp_size * dp_size
    dist_timeout = getattr(server_args, "dist_timeout", None)

    generator = DiffGenerator.from_pretrained(
        model_path=server_args.model_path,
        num_gpus=num_gpus,
        tp_size=tp_size,
        dp_size=dp_size,
        dist_timeout=dist_timeout,
    )

    fs_url = dynamo_args.media_output_fs_url

    generate_endpoint = runtime.endpoint(
        f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
    )
    shutdown_endpoints[:] = [generate_endpoint]

    handler = ImageDiffusionWorkerHandler(
        generator,
        config,
        publisher=None,
        fs=get_fs(fs_url),
    )

    health_check_payload = ImageDiffusionHealthCheckPayload(
        model_path=server_args.model_path
    ).to_dict()

    ready_event = asyncio.Event()

    # The global --output-modalities default is ["text"] which is wrong for
    # image diffusion workers -- it causes the Rust registration path to look
    # for config.json (LLM artefacts). Only override when the user hasn't
    # explicitly chosen a non-default value.
    output_modalities = dynamo_args.output_modalities
    if output_modalities is None or output_modalities == ["text"]:
        output_modalities = ["image"]
        logging.info(
            "Overriding output_modalities to ['image'] for image diffusion worker"
        )

    try:
        await asyncio.gather(
            generate_endpoint.serve_endpoint(
                handler.generate,
                graceful_shutdown=True,
                metrics_labels=[],
                health_check_payload=health_check_payload,
            ),
            register_image_diffusion_model(
                generator,
                generate_endpoint,
                server_args,
                output_modalities=output_modalities,
                readiness_gate=ready_event,
            ),
        )
    except Exception as e:
        logging.error(f"Failed to serve image diffusion endpoints: {e}")
        raise
    finally:
        handler.cleanup()
        if run_deferred_handlers is not None:
            logging.info("Running deferred handlers")
            await run_deferred_handlers()
async def init_video_diffusion(
    runtime: DistributedRuntime,
    config: Config,
    shutdown_endpoints: list,
    run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
    """Initialize video generation worker component"""
    server_args, dynamo_args = config.server_args, config.dynamo_args

    from sglang.multimodal_gen import DiffGenerator

    if not server_args.model_path:
        raise ValueError("--model is required for video generation workers")

    tp_size = getattr(server_args, "tp_size", 1)
    dp_size = getattr(server_args, "dp_size", 1)
    num_gpus = tp_size * dp_size
    dist_timeout = getattr(server_args, "dist_timeout", None)

    generator = DiffGenerator.from_pretrained(
        model_path=server_args.model_path,
        num_gpus=num_gpus,
        tp_size=tp_size,
        dp_size=dp_size,
        dist_timeout=dist_timeout,
    )

    fs_url = dynamo_args.media_output_fs_url

    generate_endpoint = runtime.endpoint(
        f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
    )
    shutdown_endpoints[:] = [generate_endpoint]

    handler = VideoGenerationWorkerHandler(
        generator,
        config,
        publisher=None,
        fs=get_fs(fs_url),
    )

    health_check_payload = VideoGenerationHealthCheckPayload(
        model_path=server_args.model_path
    ).to_dict()

    ready_event = asyncio.Event()

    try:
        await asyncio.gather(
            generate_endpoint.serve_endpoint(
                handler.generate,
                graceful_shutdown=True,
                metrics_labels=[],
                health_check_payload=health_check_payload,
            ),
            register_video_generation_model(
                generator,
                generate_endpoint,
                server_args,
                readiness_gate=ready_event,
            ),
        )
    except Exception as e:
        logging.error(f"Failed to serve video generation endpoints: {e}")
        raise
    finally:
        handler.cleanup()
        if run_deferred_handlers is not None:
            logging.info("Running deferred handlers")
            await run_deferred_handlers()
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
from typing import Awaitable, Callable
import sglang as sgl
from dynamo.llm import ModelInput, ModelType
from dynamo.runtime import DistributedRuntime
from dynamo.sglang.args import Config
from dynamo.sglang.health_check import SglangHealthCheckPayload
from dynamo.sglang.publisher import setup_sgl_metrics
from dynamo.sglang.register import register_model_with_readiness_gate
from dynamo.sglang.request_handlers import EmbeddingWorkerHandler
async def init_embedding(
    runtime: DistributedRuntime,
    config: Config,
    shutdown_event: asyncio.Event,
    shutdown_endpoints: list,
    run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
    """Initialize embedding worker component"""
    server_args, dynamo_args = config.server_args, config.dynamo_args

    engine = sgl.Engine(server_args=server_args)

    generate_endpoint = runtime.endpoint(
        f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
    )
    shutdown_endpoints[:] = [generate_endpoint]

    publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
        engine, config, generate_endpoint
    )

    ready_event = asyncio.Event()

    handler = EmbeddingWorkerHandler(engine, config, publisher, shutdown_event)
    health_check_payload = SglangHealthCheckPayload(
        engine, use_text_input=dynamo_args.use_sglang_tokenizer
    ).to_dict()

    try:
        await asyncio.gather(
            generate_endpoint.serve_endpoint(
                handler.generate,
                graceful_shutdown=True,
                metrics_labels=metrics_labels,
                health_check_payload=health_check_payload,
            ),
            register_model_with_readiness_gate(
                engine,
                generate_endpoint,
                server_args,
                dynamo_args,
                input_type=ModelInput.Text,
                output_type=ModelType.Embedding,
                readiness_gate=ready_event,
            ),
        )
    except Exception as e:
        logging.error(f"Failed to serve embedding endpoints: {e}")
        raise
    finally:
        metrics_task.cancel()
        try:
            await metrics_task
        except asyncio.CancelledError:
            logging.info("Metrics task successfully cancelled")
            pass
        handler.cleanup()
        if run_deferred_handlers is not None:
            logging.info("Running deferred handlers")
            await run_deferred_handlers()
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
import os
import time
from typing import Awaitable, Callable
import sglang as sgl
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.llm import ModelInput, ModelType
from dynamo.runtime import DistributedRuntime
from dynamo.sglang.args import Config
from dynamo.sglang.health_check import (
SglangHealthCheckPayload,
SglangPrefillHealthCheckPayload,
)
from dynamo.sglang.publisher import handle_non_leader_node, setup_sgl_metrics
from dynamo.sglang.register import register_model_with_readiness_gate
from dynamo.sglang.request_handlers import DecodeWorkerHandler, PrefillWorkerHandler
async def _warmup_prefill_engine(engine: sgl.Engine, server_args) -> None:
    """Perform warmup request for prefill engine to reduce initial TTFT."""
    logging.info("Start of prefill disaggregation warmup ...")
    try:
        from sglang.srt.disaggregation.utils import FAKE_BOOTSTRAP_HOST
        from sglang.srt.sampling.sampling_params import SamplingParams

        sampling_params = SamplingParams(
            temperature=0.0,
            max_new_tokens=8,
            ignore_eos=True,
        )

        async def _do_warmup():
            results = await engine.async_generate(
                input_ids=[0, 1, 2, 3],
                sampling_params=sampling_params,
                stream=True,
                bootstrap_host=FAKE_BOOTSTRAP_HOST,
                bootstrap_port=server_args.disaggregation_bootstrap_port,
                bootstrap_room=999999,
            )
            async for _ in results:
                pass

        await asyncio.wait_for(_do_warmup(), timeout=1800)
        logging.info("Prefill warmup completed")
    except asyncio.TimeoutError:
        logging.warning("Prefill warmup timed out after 1800s")
    except Exception as e:
        logging.warning(f"Prefill warmup failed: {e}")
async def init_decode(
    runtime: DistributedRuntime,
    config: Config,
    shutdown_event: asyncio.Event,
    shutdown_endpoints: list,
    run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
    server_args, dynamo_args = config.server_args, config.dynamo_args

    if server_args.node_rank >= 1:
        os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "0"

    start_time = time.time()
    engine = sgl.Engine(server_args=server_args)
    load_time = time.time() - start_time

    generate_endpoint = runtime.endpoint(
        f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
    )
    shutdown_endpoints[:] = [generate_endpoint]

    publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
        engine, config, generate_endpoint
    )
    publisher.component_gauges.set_model_load_time(load_time)
    logging.debug(f"SGLang model load time: {load_time:.2f}s")

    if server_args.node_rank >= 1:
        await handle_non_leader_node(engine, publisher, metrics_task)
        return

    ready_event = asyncio.Event()

    handler = DecodeWorkerHandler(
        engine, config, publisher, generate_endpoint, shutdown_event
    )
    handler.register_engine_routes(runtime)

    health_check_payload = SglangHealthCheckPayload(
        engine, use_text_input=dynamo_args.use_sglang_tokenizer
    ).to_dict()

    logging.info(f"Registering model with endpoint types: {dynamo_args.endpoint_types}")
    if dynamo_args.custom_jinja_template and "chat" not in dynamo_args.endpoint_types:
        logging.warning(
            "Custom Jinja template provided (--custom-jinja-template) but 'chat' not in --dyn-endpoint-types. "
            "The chat template will be loaded but the /v1/chat/completions endpoint will not be available."
        )

    try:
        await asyncio.gather(
            generate_endpoint.serve_endpoint(
                handler.generate,
                graceful_shutdown=True,
                metrics_labels=metrics_labels,
                health_check_payload=health_check_payload,
            ),
            register_model_with_readiness_gate(
                engine,
                generate_endpoint,
                server_args,
                dynamo_args,
                output_type=parse_endpoint_types(dynamo_args.endpoint_types),
                readiness_gate=ready_event,
            ),
        )
    except Exception as e:
        logging.error(f"Failed to serve endpoints: {e}")
        raise
    finally:
        metrics_task.cancel()
        try:
            await metrics_task
        except asyncio.CancelledError:
            logging.info("Metrics task successfully cancelled")
            pass
        handler.cleanup()
        if run_deferred_handlers is not None:
            logging.info("Running deferred handlers")
            await run_deferred_handlers()
async def init_prefill(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
server_args, dynamo_args = config.server_args, config.dynamo_args
if server_args.node_rank >= 1:
os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "0"
engine = sgl.Engine(server_args=server_args)
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
engine, config, generate_endpoint
)
if server_args.node_rank >= 1:
await handle_non_leader_node(engine, publisher, metrics_task)
return
await _warmup_prefill_engine(engine, server_args)
handler = PrefillWorkerHandler(
engine, config, publisher, generate_endpoint, shutdown_event
)
handler.register_engine_routes(runtime)
health_check_payload = SglangPrefillHealthCheckPayload(engine).to_dict()
ready_event = asyncio.Event()
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=metrics_labels,
health_check_payload=health_check_payload,
),
register_model_with_readiness_gate(
engine,
generate_endpoint,
server_args,
dynamo_args,
input_type=ModelInput.Tokens,
output_type=ModelType.Prefill,
readiness_gate=ready_event,
),
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
finally:
metrics_task.cancel()
try:
await metrics_task
except asyncio.CancelledError:
logging.info("Metrics task successfully cancelled")
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
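# The finally blocks above cancel the metrics task, await it so the
# cancellation is actually delivered, and only then run handler cleanup.
# A minimal standalone sketch of that pattern follows. The names
# serve_with_metrics, serve, metrics_loop, cleanup, and demo are hypothetical
# stand-ins, not part of dynamo or SGLang.

```python
import asyncio


async def serve_with_metrics(serve, metrics_loop, cleanup):
    """Run serve() next to a background metrics loop, then tear both down."""
    metrics_task = asyncio.create_task(metrics_loop())
    try:
        await serve()
    finally:
        # cancel() only requests cancellation; awaiting the task lets it
        # unwind before cleanup runs.
        metrics_task.cancel()
        try:
            await metrics_task
        except asyncio.CancelledError:
            pass
        cleanup()
    return metrics_task


async def demo():
    events = []

    async def metrics_loop():
        # Hypothetical stand-in for a publisher loop that never returns.
        while True:
            await asyncio.sleep(0.01)

    async def serve():
        # Hypothetical stand-in for serve_endpoint().
        await asyncio.sleep(0.03)
        events.append("served")

    task = await serve_with_metrics(
        serve, metrics_loop, lambda: events.append("cleanup")
    )
    return events, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

# Awaiting the cancelled task before cleanup keeps the loop from still running
# while the resources it reads are being released.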
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
from typing import Awaitable, Callable
import sglang as sgl
from dynamo import prometheus_names
from dynamo.common.constants import DisaggregationMode
from dynamo.llm import ModelInput
from dynamo.runtime import DistributedRuntime
from dynamo.sglang.args import Config
from dynamo.sglang.health_check import (
SglangHealthCheckPayload,
SglangPrefillHealthCheckPayload,
)
from dynamo.sglang.register import register_model_with_readiness_gate
from dynamo.sglang.request_handlers import (
MultimodalEncodeWorkerHandler,
MultimodalPrefillWorkerHandler,
MultimodalProcessorHandler,
MultimodalWorkerHandler,
)
async def init_multimodal_processor(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize multimodal processor component"""
server_args, dynamo_args = config.server_args, config.dynamo_args
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
encode_worker_client = await runtime.endpoint(
f"{dynamo_args.namespace}.encoder.generate"
).client()
ready_event = asyncio.Event()
handler = MultimodalProcessorHandler(config, encode_worker_client, shutdown_event)
logging.info("Waiting for Encoder Worker Instances ...")
await encode_worker_client.wait_for_instances()
try:
_ = await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[
(prometheus_names.labels.MODEL, server_args.served_model_name),
(prometheus_names.labels.MODEL_NAME, server_args.served_model_name),
],
),
register_model_with_readiness_gate(
None, # engine
generate_endpoint,
server_args,
dynamo_args,
input_type=ModelInput.Text,
readiness_gate=ready_event,
),
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
finally:
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_multimodal_encode_worker(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize multimodal encode worker component"""
server_args, dynamo_args = config.server_args, config.dynamo_args
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
pd_worker_client = await runtime.endpoint(
f"{dynamo_args.namespace}.backend.generate"
).client()
handler = MultimodalEncodeWorkerHandler(config, pd_worker_client, shutdown_event)
await handler.async_init(runtime)
await pd_worker_client.wait_for_instances()
try:
await generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[
(prometheus_names.labels.MODEL, server_args.served_model_name),
(prometheus_names.labels.MODEL_NAME, server_args.served_model_name),
],
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
finally:
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_multimodal_worker(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize multimodal worker component.
This worker is always an internal component that should not register with
the Frontend. Public registration is handled by the Processor component
(--multimodal-processor). For standalone serving, use init_decode() (default).
"""
server_args, dynamo_args = config.server_args, config.dynamo_args
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
engine = sgl.Engine(server_args=server_args)
if config.serving_mode == DisaggregationMode.DECODE:
logging.info("Initializing prefill client for multimodal decode worker")
prefill_client = await runtime.endpoint(
f"{dynamo_args.namespace}.prefill.generate"
).client()
handler = MultimodalWorkerHandler(
engine, config, prefill_client, shutdown_event
)
else:
handler = MultimodalWorkerHandler(engine, config, None, shutdown_event)
await handler.async_init()
health_check_payload = SglangHealthCheckPayload(engine).to_dict()
try:
await generate_endpoint.serve_endpoint(
handler.generate,
metrics_labels=[("model", server_args.served_model_name)],
graceful_shutdown=True,
health_check_payload=health_check_payload,
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
finally:
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_multimodal_prefill_worker(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize multimodal prefill worker component"""
server_args, dynamo_args = config.server_args, config.dynamo_args
engine = sgl.Engine(server_args=server_args)
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
handler = MultimodalPrefillWorkerHandler(engine, config, shutdown_event)
shutdown_endpoints[:] = [generate_endpoint]
await handler.async_init()
health_check_payload = SglangPrefillHealthCheckPayload(engine).to_dict()
try:
await generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[("model", server_args.served_model_name)],
health_check_payload=health_check_payload,
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
finally:
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
......@@ -2,190 +2,38 @@
# SPDX-License-Identifier: Apache-2.0
import asyncio
import inspect
import logging
import os
import signal
import sys
import time
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict
import sglang as sgl
import uvloop
from dynamo import prometheus_names
from dynamo.common.config_dump import dump_config
from dynamo.common.constants import DisaggregationMode
from dynamo.common.storage import get_fs
from dynamo.common.utils.endpoint_types import parse_endpoint_types
from dynamo.common.utils.graceful_shutdown import graceful_shutdown_with_discovery
from dynamo.common.utils.runtime import create_runtime
from dynamo.llm import ModelInput, ModelType
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sglang.args import Config, parse_args
from dynamo.sglang.health_check import (
ImageDiffusionHealthCheckPayload,
SglangHealthCheckPayload,
SglangPrefillHealthCheckPayload,
VideoGenerationHealthCheckPayload,
from dynamo.sglang.args import parse_args
from dynamo.sglang.init_diffusion import (
init_image_diffusion,
init_llm_diffusion,
init_video_diffusion,
)
from dynamo.sglang.publisher import DynamoSglangPublisher, setup_sgl_metrics
from dynamo.sglang.register import (
register_image_diffusion_model,
register_model_with_readiness_gate,
register_video_generation_model,
)
from dynamo.sglang.request_handlers import (
DecodeWorkerHandler,
DiffusionWorkerHandler,
EmbeddingWorkerHandler,
ImageDiffusionWorkerHandler,
MultimodalEncodeWorkerHandler,
MultimodalPrefillWorkerHandler,
MultimodalProcessorHandler,
MultimodalWorkerHandler,
PrefillWorkerHandler,
VideoGenerationWorkerHandler,
from dynamo.sglang.init_embedding import init_embedding
from dynamo.sglang.init_llm import init_decode, init_prefill
from dynamo.sglang.init_multimodal import (
init_multimodal_encode_worker,
init_multimodal_prefill_worker,
init_multimodal_processor,
init_multimodal_worker,
)
from dynamo.sglang.shutdown import install_graceful_shutdown
configure_dynamo_logging()
async def _handle_non_leader_node(
engine: sgl.Engine,
publisher: DynamoSglangPublisher,
metrics_task: asyncio.Task,
) -> None:
"""
Handle non-leader node (node_rank >= 1) in multi-node deployments.
Non-leader nodes run scheduler processes but don't handle requests directly.
They still need:
- KV event publishing (subscribe to local DP ranks, forward to NATS)
- Metrics collection from local schedulers
- Prometheus metrics exposure
Args:
engine: The SGLang engine instance.
publisher: The DynamoSglangPublisher for metrics and KV events.
metrics_task: The asyncio task running the metrics loop.
"""
logging.info(
f"Non-leader node detected (node_rank={engine.server_args.node_rank}). "
"Running with metrics and KV event publishing for local DP ranks."
)
try:
# Wait indefinitely - the process will be terminated via signal handlers
await asyncio.Event().wait()
finally:
metrics_task.cancel()
try:
await metrics_task
except asyncio.CancelledError:
pass
publisher.cleanup()
SignalCallback = Callable[..., Any]
def install_graceful_shutdown(
loop: asyncio.AbstractEventLoop,
runtime: Any,
endpoints: list,
shutdown_event: asyncio.Event,
*,
signals: tuple[int, ...] = (signal.SIGTERM, signal.SIGINT),
) -> Callable[[], Awaitable[None]]:
"""
Set up graceful shutdown with discovery unregister and grace period.
Owns OS-level SIGTERM/SIGINT via signal.signal() so SGLang's internal
loop.add_signal_handler registrations cannot replace our handler.
Monkey-patches loop.add_signal_handler to capture (defer) those
registrations. Returns run_deferred_handlers to be invoked in init
finally blocks (after the asyncio loop / serve_endpoint is done).
"""
# Deferred handlers registered via loop.add_signal_handler for these signals
deferred_handlers: DefaultDict[int, list[tuple[SignalCallback, tuple[Any, ...]]]] = defaultdict(list) # type: ignore[assignment]
shutdown_started = False
shutdown_signum: int | None = None
deferred_handlers_ran = False
async def run_deferred_handlers() -> None:
nonlocal deferred_handlers_ran
if not shutdown_started or deferred_handlers_ran:
return
deferred_handlers_ran = True
signums = (
[shutdown_signum]
if shutdown_signum is not None
else list(deferred_handlers.keys())
)
for sig in signums:
for cb, args in list(deferred_handlers.get(sig, [])):
try:
res = cb(*args)
if inspect.isawaitable(res):
await res
except Exception:
logging.exception("Deferred signal callback failed: %r", cb)
async def _shutdown_sequence(signum: int, frame: Any | None) -> None:
nonlocal shutdown_started, shutdown_signum
if shutdown_started:
return
shutdown_signum = signum
shutdown_started = True
logging.info("Received signal %s, starting graceful shutdown", signum)
await graceful_shutdown_with_discovery(
runtime,
endpoints,
shutdown_event=shutdown_event,
grace_period_s=None,
)
def _schedule_shutdown(signum: int, frame: Any | None) -> None:
def _kick() -> None:
asyncio.create_task(_shutdown_sequence(signum, frame))
loop.call_soon_threadsafe(_kick)
def _os_signal_handler(signum: int, frame: Any) -> None:
_schedule_shutdown(signum, frame)
for sig in signals:
signal.signal(sig, _os_signal_handler)
orig_add = loop.add_signal_handler
def watching_add_signal_handler(sig: int, callback: SignalCallback, *args: Any):
if sig in signals:
logging.debug(
"Captured underlying service trying to register for loop.add_signal_handler(%s, %r, ...).",
sig,
callback,
)
deferred_handlers[sig].append((callback, args))
return None
return orig_add(sig, callback, *args)
loop.add_signal_handler = watching_add_signal_handler # type: ignore[assignment]
return run_deferred_handlers
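# The docstring of install_graceful_shutdown describes capturing
# loop.add_signal_handler registrations and replaying them later. The sketch
# below isolates only that capture-and-replay step, on a stock asyncio loop.
# capture_signal_handlers, run_deferred, and demo are hypothetical names, and
# the sketch omits the signal.signal() ownership and discovery shutdown that
# the real function performs.

```python
import asyncio
import inspect
import signal
from collections import defaultdict


def capture_signal_handlers(loop, signals=(signal.SIGTERM, signal.SIGINT)):
    """Record add_signal_handler calls for `signals` instead of installing them."""
    deferred = defaultdict(list)
    orig_add = loop.add_signal_handler

    def watching_add(sig, callback, *args):
        if sig in signals:
            # Remember the registration; do not hand it to the loop.
            deferred[sig].append((callback, args))
            return None
        return orig_add(sig, callback, *args)

    loop.add_signal_handler = watching_add

    async def run_deferred(sig):
        # Replay captured callbacks, awaiting any that return awaitables.
        for cb, args in list(deferred.get(sig, [])):
            res = cb(*args)
            if inspect.isawaitable(res):
                await res

    return run_deferred


async def demo():
    loop = asyncio.get_running_loop()
    run_deferred = capture_signal_handlers(loop)
    calls = []
    # A library tries to register its own SIGTERM handler.
    loop.add_signal_handler(signal.SIGTERM, calls.append, "engine-cleanup")
    before = list(calls)  # nothing has run yet
    await run_deferred(signal.SIGTERM)
    return before, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

# The captured callback runs only when run_deferred is awaited, which is why the
# init functions call run_deferred_handlers() at the end of their finally blocks.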
async def worker():
config = await parse_args(sys.argv[1:])
dump_config(config.dynamo_args.dump_config_to, config)
# Setup GPU Memory Service if --load-format gms is used
if config.server_args.load_format == "gms":
from gpu_memory_service.integrations.sglang import setup_gms
......@@ -214,7 +62,7 @@ async def worker():
runtime, config, shutdown_endpoints, run_deferred_handlers
)
elif config.dynamo_args.video_generation_worker:
-await init_video_generation(
+await init_video_diffusion(
runtime, config, shutdown_endpoints, run_deferred_handlers
)
elif config.dynamo_args.embedding_worker:
......@@ -259,7 +107,7 @@ async def worker():
run_deferred_handlers,
)
elif config.dynamo_args.diffusion_worker:
-await init_diffusion(
+await init_llm_diffusion(
runtime,
config,
shutdown_event,
......@@ -267,7 +115,7 @@ async def worker():
run_deferred_handlers,
)
elif config.serving_mode != DisaggregationMode.PREFILL:
-await init(
+await init_decode(
runtime,
config,
shutdown_event,
......@@ -284,744 +132,6 @@ async def worker():
)
async def init(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
server_args, dynamo_args = config.server_args, config.dynamo_args
# Prevent SGLang from blocking on non-leader nodes
if server_args.node_rank >= 1:
os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "0"
# Time model loading
start_time = time.time()
engine = sgl.Engine(server_args=server_args)
load_time = time.time() - start_time
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
# Setup metrics and KV events for ALL nodes (including non-leader)
# Non-leader nodes need KV event publishing for their local DP ranks
publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
engine, config, generate_endpoint
)
# Record model load time immediately after publisher setup (which creates the gauges)
publisher.component_gauges.set_model_load_time(load_time)
logging.debug(f"SGLang model load time: {load_time:.2f}s")
# Handle non-leader nodes (multi-node parallelism)
# Non-leader nodes run schedulers and publish KV events, but don't serve requests
if server_args.node_rank >= 1:
await _handle_non_leader_node(engine, publisher, metrics_task)
return
# Readiness gate: requests wait until model is registered
ready_event = asyncio.Event()
handler = DecodeWorkerHandler(
engine, config, publisher, generate_endpoint, shutdown_event
)
handler.register_engine_routes(runtime)
health_check_payload = SglangHealthCheckPayload(
engine, use_text_input=dynamo_args.use_sglang_tokenizer
).to_dict()
logging.info(f"Registering model with endpoint types: {dynamo_args.endpoint_types}")
if dynamo_args.custom_jinja_template and "chat" not in dynamo_args.endpoint_types:
logging.warning(
"Custom Jinja template provided (--custom-jinja-template) but 'chat' not in --dyn-endpoint-types. "
"The chat template will be loaded but the /v1/chat/completions endpoint will not be available."
)
try:
# Start endpoint immediately and register model concurrently
# Requests queue until ready_event is set (TODO: Part of new PR)
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=metrics_labels,
health_check_payload=health_check_payload,
),
register_model_with_readiness_gate(
engine,
generate_endpoint,
server_args,
dynamo_args,
output_type=parse_endpoint_types(dynamo_args.endpoint_types),
readiness_gate=ready_event,
),
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
finally:
metrics_task.cancel()
try:
await metrics_task
except asyncio.CancelledError:
logging.info("Metrics task successfully cancelled")
pass
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
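# The comments in init() describe a readiness gate: the endpoint starts right
# away, registration runs concurrently, and requests wait until the model is
# registered. The source marks the queuing as a TODO, so the sketch below only
# illustrates the intended asyncio.Event pattern. serve, register, and demo are
# hypothetical and do not reflect how register_model_with_readiness_gate is
# implemented.

```python
import asyncio


async def serve(ready, requests, log):
    """Handle requests, but hold each one until the gate is open."""
    for req in requests:
        await ready.wait()
        log.append(f"handled:{req}")


async def register(ready, log):
    """Simulate a slow registration, then open the gate."""
    await asyncio.sleep(0.01)
    log.append("registered")
    ready.set()


async def demo():
    ready = asyncio.Event()
    log = []
    # Both coroutines start together; serve blocks on the gate first.
    await asyncio.gather(serve(ready, ["a", "b"], log), register(ready, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

# Under these assumptions, registration always lands in the log before any
# request is handled, even though serving started first.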
async def init_prefill(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
server_args, dynamo_args = config.server_args, config.dynamo_args
# Prevent SGLang from blocking on non-leader nodes
if server_args.node_rank >= 1:
os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "0"
engine = sgl.Engine(server_args=server_args)
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
# Setup metrics and KV events for ALL nodes (including non-leader)
# Non-leader nodes need KV event publishing for their local DP ranks
publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
engine, config, generate_endpoint
)
# Handle non-leader nodes (multi-node parallelism)
# Non-leader nodes run schedulers and publish KV events, but don't serve requests
if server_args.node_rank >= 1:
await _handle_non_leader_node(engine, publisher, metrics_task)
return
# Perform dummy warmup for prefill worker to avoid initial TTFT hit
# Only needed on leader node that handles requests
await _warmup_prefill_engine(engine, server_args)
handler = PrefillWorkerHandler(
engine, config, publisher, generate_endpoint, shutdown_event
)
handler.register_engine_routes(runtime)
health_check_payload = SglangPrefillHealthCheckPayload(engine).to_dict()
# Readiness gate: requests wait until model is registered
ready_event = asyncio.Event()
try:
# Start endpoint immediately and register model concurrently
# Registration publishes runtime_config with bootstrap endpoint for optimization
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=metrics_labels,
health_check_payload=health_check_payload,
),
register_model_with_readiness_gate(
engine,
generate_endpoint,
server_args,
dynamo_args,
input_type=ModelInput.Tokens,
output_type=ModelType.Prefill,
readiness_gate=ready_event,
),
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
finally:
metrics_task.cancel()
try:
await metrics_task
except asyncio.CancelledError:
logging.info("Metrics task successfully cancelled")
pass
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_diffusion(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize diffusion language model worker component"""
server_args, dynamo_args = config.server_args, config.dynamo_args
logging.info(
f"Initializing diffusion worker with algorithm: {server_args.dllm_algorithm}"
)
if server_args.dllm_algorithm_config:
logging.info(
f"Using diffusion algorithm config: {server_args.dllm_algorithm_config}"
)
# Prevent SGLang from blocking on non-leader nodes
if server_args.node_rank >= 1:
os.environ["SGLANG_BLOCK_NONZERO_RANK_CHILDREN"] = "0"
engine = sgl.Engine(server_args=server_args)
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
# Setup metrics and KV events for ALL nodes (including non-leader)
# Non-leader nodes need KV event publishing for their local DP ranks
publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
engine, config, generate_endpoint
)
# Handle non-leader nodes (multi-node parallelism)
# Non-leader nodes run schedulers and publish KV events, but don't serve requests
if server_args.node_rank >= 1:
await _handle_non_leader_node(engine, publisher, metrics_task)
return
# Readiness gate: requests wait until model is registered
ready_event = asyncio.Event()
handler = DiffusionWorkerHandler(
engine, config, publisher, generate_endpoint, shutdown_event
)
handler.register_engine_routes(runtime)
health_check_payload = SglangHealthCheckPayload(
engine, use_text_input=dynamo_args.use_sglang_tokenizer
).to_dict()
logging.info(
f"Registering diffusion model with endpoint types: {dynamo_args.endpoint_types}"
)
try:
# Start endpoint and register model
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=metrics_labels,
health_check_payload=health_check_payload,
),
register_model_with_readiness_gate(
engine,
generate_endpoint,
server_args,
dynamo_args,
output_type=parse_endpoint_types(dynamo_args.endpoint_types),
readiness_gate=ready_event,
),
)
except Exception as e:
logging.error(f"Failed to serve diffusion endpoints: {e}")
raise
finally:
metrics_task.cancel()
try:
await metrics_task
except asyncio.CancelledError:
logging.info("Metrics task successfully cancelled")
pass
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_embedding(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize embedding worker component"""
server_args, dynamo_args = config.server_args, config.dynamo_args
engine = sgl.Engine(server_args=server_args)
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
# publisher instantiates the metrics and kv event publishers
publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
engine, config, generate_endpoint
)
# Readiness gate: requests wait until model is registered
ready_event = asyncio.Event()
handler = EmbeddingWorkerHandler(engine, config, publisher, shutdown_event)
health_check_payload = SglangHealthCheckPayload(
engine, use_text_input=dynamo_args.use_sglang_tokenizer
).to_dict()
try:
# Start endpoint immediately and register model concurrently
# Requests queue until ready_event is set
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=metrics_labels,
health_check_payload=health_check_payload,
),
register_model_with_readiness_gate(
engine,
generate_endpoint,
server_args,
dynamo_args,
input_type=ModelInput.Text,
output_type=ModelType.Embedding,
readiness_gate=ready_event,
),
)
except Exception as e:
logging.error(f"Failed to serve embedding endpoints: {e}")
raise
finally:
metrics_task.cancel()
try:
await metrics_task
except asyncio.CancelledError:
logging.info("Metrics task successfully cancelled")
pass
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_image_diffusion(
runtime: DistributedRuntime,
config: Config,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize image diffusion worker component"""
server_args, dynamo_args = config.server_args, config.dynamo_args
# Initialize DiffGenerator (not sgl.Engine)
from sglang.multimodal_gen import DiffGenerator
if not server_args.model_path:
raise ValueError("--model is required for diffusion workers")
# Parallelism configuration
tp_size = getattr(server_args, "tp_size", 1)
dp_size = getattr(server_args, "dp_size", 1)
num_gpus = tp_size * dp_size
# Distributed configuration
dist_timeout = getattr(server_args, "dist_timeout", None)
generator = DiffGenerator.from_pretrained(
model_path=server_args.model_path,
# Parallelism configuration
num_gpus=num_gpus,
tp_size=tp_size,
dp_size=dp_size,
# Distributed configuration
dist_timeout=dist_timeout,
)
fs_url = dynamo_args.media_output_fs_url
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
# Image diffusion doesn't have metrics publisher like LLM
# Could add custom metrics for images/sec, steps/sec later
handler = ImageDiffusionWorkerHandler(
generator,
config,
publisher=None,
fs=get_fs(fs_url),
)
# Create proper health check payload that sends a minimal diffusion request
health_check_payload = ImageDiffusionHealthCheckPayload(
model_path=server_args.model_path
).to_dict()
ready_event = asyncio.Event()
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[], # No LLM metrics labels
health_check_payload=health_check_payload,
),
register_image_diffusion_model(
generator,
generate_endpoint,
server_args,
output_modalities=dynamo_args.output_modalities,
readiness_gate=ready_event,
),
)
except Exception as e:
logging.error(f"Failed to serve image diffusion endpoints: {e}")
raise
finally:
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_video_generation(
runtime: DistributedRuntime,
config: Config,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize video generation worker component"""
server_args, dynamo_args = config.server_args, config.dynamo_args
# Initialize DiffGenerator (not sgl.Engine) - same as image diffusion
from sglang.multimodal_gen import DiffGenerator
if not server_args.model_path:
raise ValueError("--model is required for video generation workers")
# Parallelism configuration
tp_size = getattr(server_args, "tp_size", 1)
dp_size = getattr(server_args, "dp_size", 1)
num_gpus = tp_size * dp_size
# Distributed configuration
dist_timeout = getattr(server_args, "dist_timeout", None)
generator = DiffGenerator.from_pretrained(
model_path=server_args.model_path,
# Parallelism configuration
num_gpus=num_gpus,
tp_size=tp_size,
dp_size=dp_size,
# Distributed configuration
dist_timeout=dist_timeout,
)
fs_url = dynamo_args.media_output_fs_url
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
handler = VideoGenerationWorkerHandler(
generator,
config,
publisher=None,
fs=get_fs(fs_url),
)
# Create proper health check payload that sends a minimal video request
health_check_payload = VideoGenerationHealthCheckPayload(
model_path=server_args.model_path
).to_dict()
ready_event = asyncio.Event()
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[], # No LLM metrics labels
health_check_payload=health_check_payload,
),
register_video_generation_model(
generator,
generate_endpoint,
server_args,
readiness_gate=ready_event,
),
)
except Exception as e:
logging.error(f"Failed to serve video generation endpoints: {e}")
raise
finally:
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_multimodal_processor(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize multimodal processor component"""
server_args, dynamo_args = config.server_args, config.dynamo_args
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
# For processor, we need to connect to the encode worker
encode_worker_client = await runtime.endpoint(
f"{dynamo_args.namespace}.encoder.generate"
).client()
ready_event = asyncio.Event()
handler = MultimodalProcessorHandler(config, encode_worker_client, shutdown_event)
logging.info("Waiting for Encoder Worker Instances ...")
await encode_worker_client.wait_for_instances()
try:
_ = await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[
(prometheus_names.labels.MODEL, server_args.served_model_name),
(prometheus_names.labels.MODEL_NAME, server_args.served_model_name),
],
),
register_model_with_readiness_gate(
None, # engine
generate_endpoint,
server_args,
dynamo_args,
input_type=ModelInput.Text,
readiness_gate=ready_event,
),
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
finally:
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_multimodal_encode_worker(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize multimodal encode worker component"""
server_args, dynamo_args = config.server_args, config.dynamo_args
generate_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
)
shutdown_endpoints[:] = [generate_endpoint]
# For encode worker, we need to connect to the downstream LLM worker
pd_worker_client = await runtime.endpoint(
f"{dynamo_args.namespace}.backend.generate"
).client()
handler = MultimodalEncodeWorkerHandler(config, pd_worker_client, shutdown_event)
await handler.async_init(runtime)
await pd_worker_client.wait_for_instances()
try:
# Encode Worker is an internal component, should not register with Frontend
# Only needs to provide internal service endpoint for Processor to call
await generate_endpoint.serve_endpoint(
handler.generate,
graceful_shutdown=True,
metrics_labels=[
(prometheus_names.labels.MODEL, server_args.served_model_name),
(prometheus_names.labels.MODEL_NAME, server_args.served_model_name),
],
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
raise
finally:
handler.cleanup()
if run_deferred_handlers is not None:
logging.info("Running deferred handlers")
await run_deferred_handlers()
async def init_multimodal_worker(
runtime: DistributedRuntime,
config: Config,
shutdown_event: asyncio.Event,
shutdown_endpoints: list,
run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
"""Initialize multimodal worker component.
This worker is always an internal component that should not register with
the Frontend. Public registration is handled by the Processor component
(--multimodal-processor). For standalone serving, use init() (default).
"""
    server_args, dynamo_args = config.server_args, config.dynamo_args

    generate_endpoint = runtime.endpoint(
        f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
    )
    shutdown_endpoints[:] = [generate_endpoint]

    engine = sgl.Engine(server_args=server_args)

    if config.serving_mode == DisaggregationMode.DECODE:
        logging.info("Initializing prefill client for multimodal decode worker")
        prefill_client = await runtime.endpoint(
            f"{dynamo_args.namespace}.prefill.generate"
        ).client()
        handler = MultimodalWorkerHandler(
            engine, config, prefill_client, shutdown_event
        )
    else:
        handler = MultimodalWorkerHandler(engine, config, None, shutdown_event)

    await handler.async_init()

    health_check_payload = SglangHealthCheckPayload(engine).to_dict()

    try:
        # Multimodal Worker is an internal component, should not register with Frontend.
        # Only needs to provide internal service endpoint for Processor to call.
        await generate_endpoint.serve_endpoint(
            handler.generate,
            metrics_labels=[("model", server_args.served_model_name)],
            graceful_shutdown=True,
            health_check_payload=health_check_payload,
        )
    except Exception as e:
        logging.error(f"Failed to serve endpoints: {e}")
        raise
    finally:
        handler.cleanup()
        if run_deferred_handlers is not None:
            logging.info("Running deferred handlers")
            await run_deferred_handlers()


async def init_multimodal_prefill_worker(
    runtime: DistributedRuntime,
    config: Config,
    shutdown_event: asyncio.Event,
    shutdown_endpoints: list,
    run_deferred_handlers: Callable[[], Awaitable[None]] | None = None,
):
    """Initialize multimodal prefill worker component"""
    server_args, dynamo_args = config.server_args, config.dynamo_args

    engine = sgl.Engine(server_args=server_args)

    generate_endpoint = runtime.endpoint(
        f"{dynamo_args.namespace}.{dynamo_args.component}.{dynamo_args.endpoint}"
    )

    handler = MultimodalPrefillWorkerHandler(engine, config, shutdown_event)
    shutdown_endpoints[:] = [generate_endpoint]

    await handler.async_init()

    health_check_payload = SglangPrefillHealthCheckPayload(engine).to_dict()

    try:
        # Prefill Worker is an internal component, should not register with Frontend
        # Only needs to provide internal service endpoint for Decode Worker to call
        await generate_endpoint.serve_endpoint(
            handler.generate,
            graceful_shutdown=True,
            metrics_labels=[("model", server_args.served_model_name)],
            health_check_payload=health_check_payload,
        )
    except Exception as e:
        logging.error(f"Failed to serve endpoints: {e}")
        raise
    finally:
        handler.cleanup()
        if run_deferred_handlers is not None:
            logging.info("Running deferred handlers")
            await run_deferred_handlers()


async def _warmup_prefill_engine(engine: sgl.Engine, server_args) -> None:
    """Perform warmup request for prefill engine to reduce initial TTFT."""
    logging.info("Start of prefill disaggregation warmup ...")
    try:
        from sglang.srt.disaggregation.utils import FAKE_BOOTSTRAP_HOST
        from sglang.srt.sampling.sampling_params import SamplingParams

        sampling_params = SamplingParams(
            temperature=0.0,
            max_new_tokens=8,
            ignore_eos=True,
        )

        # Timeout: 1800s (30 min) for deep gemm precache
        async def _do_warmup():
            results = await engine.async_generate(
                input_ids=[0, 1, 2, 3],
                sampling_params=sampling_params,
                stream=True,
                bootstrap_host=FAKE_BOOTSTRAP_HOST,
                bootstrap_port=server_args.disaggregation_bootstrap_port,
                bootstrap_room=999999,
            )
            # Consume the stream
            async for _ in results:
                pass

        await asyncio.wait_for(_do_warmup(), timeout=1800)
        logging.info("Prefill warmup completed")
    except asyncio.TimeoutError:
        logging.warning("Prefill warmup timed out after 1800s")
    except Exception as e:
        logging.warning(f"Prefill warmup failed: {e}")


def main():
    uvloop.run(worker())
......
......@@ -375,3 +375,33 @@ async def setup_sgl_metrics(
    task = asyncio.create_task(publisher.run())
    logging.info("SGLang metrics loop started")
    return publisher, task, metrics_labels


async def handle_non_leader_node(
    engine: sgl.Engine,
    publisher: DynamoSglangPublisher,
    metrics_task: asyncio.Task,
) -> None:
    """
    Handle non-leader node (node_rank >= 1) in multi-node deployments.

    Non-leader nodes run scheduler processes but don't handle requests directly.
    They still need:
    - KV event publishing (subscribe to local DP ranks, forward to NATS)
    - Metrics collection from local schedulers
    - Prometheus metrics exposure
    """
    logging.info(
        f"Non-leader node detected (node_rank={engine.server_args.node_rank}). "
        "Running with metrics and KV event publishing for local DP ranks."
    )

    try:
        await asyncio.Event().wait()
    finally:
        metrics_task.cancel()
        try:
            await metrics_task
        except asyncio.CancelledError:
            pass
        publisher.cleanup()
......@@ -129,15 +129,20 @@ class BaseWorkerHandler(BaseGenerativeHandler):
self.skip_tokenizer_init = config.server_args.skip_tokenizer_init
self.enable_trace = config.server_args.enable_trace
self.input_param_manager = InputParamManager(
self.engine.tokenizer_manager.tokenizer
if not self.skip_tokenizer_init
else None
)
self._engine_supports_priority = (
"priority" in inspect.signature(engine.async_generate).parameters
)
if engine is not None:
self.input_param_manager = InputParamManager(
self.engine.tokenizer_manager.tokenizer
if not self.skip_tokenizer_init
else None
)
self._engine_supports_priority = (
"priority" in inspect.signature(engine.async_generate).parameters
)
else:
# Encode-only workers (e.g. MultimodalEncodeWorkerHandler) don't
# have an sgl.Engine.
self.input_param_manager = InputParamManager(None)
self._engine_supports_priority = False
def _priority_kwargs(self, priority: Any) -> Dict[str, Any]:
if priority is not None and self._engine_supports_priority:
......
......@@ -160,10 +160,13 @@ class ImageDiffusionWorkerHandler(BaseGenerativeHandler):
sampling_params_kwargs=args,
)
# DiffGenerator.generate() returns GenerationResult | list[GenerationResult] | None
if result is None:
raise RuntimeError("No result from generator")
if isinstance(result, list):
result = result[0]
images = result["frames"] if "frames" in result else []
images = result.frames if result.frames else []
# Convert images to bytes (handle PIL Images, numpy arrays, or bytes)
image_bytes_list = []
......
......@@ -223,8 +223,12 @@ class VideoGenerationWorkerHandler(BaseGenerativeHandler):
sampling_params_kwargs=args,
)
# Result contains 'frames' with list of frames
frames = result.get("frames", [])
# DiffGenerator.generate() returns GenerationResult | list[GenerationResult] | None
if result is None:
raise RuntimeError("DiffGenerator returned None")
if isinstance(result, list):
result = result[0]
frames = result.frames
if not frames:
raise RuntimeError("DiffGenerator returned no frames")
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import asyncio
import inspect
import logging
import signal
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict

from dynamo.common.utils.graceful_shutdown import graceful_shutdown_with_discovery

SignalCallback = Callable[..., Any]


def install_graceful_shutdown(
    loop: asyncio.AbstractEventLoop,
    runtime: Any,
    endpoints: list,
    shutdown_event: asyncio.Event,
    *,
    signals: tuple[int, ...] = (signal.SIGTERM, signal.SIGINT),
) -> Callable[[], Awaitable[None]]:
    """
    Set up graceful shutdown with discovery unregister and grace period.

    Owns OS-level SIGTERM/SIGINT via signal.signal() so SGLang's internal
    loop.add_signal_handler registrations cannot replace our handler.
    Monkey-patches loop.add_signal_handler to capture (defer) those
    registrations. Returns run_deferred_handlers to be invoked in init
    finally blocks (after the asyncio loop / serve_endpoint is done).
    """
    deferred_handlers: DefaultDict[
        int, list[tuple[SignalCallback, tuple[Any, ...]]]
    ] = defaultdict(
        list
    )  # type: ignore[assignment]
    shutdown_started = False
    shutdown_signum: int | None = None
    deferred_handlers_ran = False

    async def run_deferred_handlers() -> None:
        nonlocal deferred_handlers_ran
        if not shutdown_started or deferred_handlers_ran:
            return
        deferred_handlers_ran = True
        signums = (
            [shutdown_signum]
            if shutdown_signum is not None
            else list(deferred_handlers.keys())
        )
        for sig in signums:
            for cb, args in list(deferred_handlers.get(sig, [])):
                try:
                    res = cb(*args)
                    if inspect.isawaitable(res):
                        await res
                except Exception:
                    logging.exception("Deferred signal callback failed: %r", cb)

    async def _shutdown_sequence(signum: int, frame: Any | None) -> None:
        nonlocal shutdown_started, shutdown_signum
        if shutdown_started:
            return
        shutdown_signum = signum
        shutdown_started = True
        logging.info("Received signal %s, starting graceful shutdown", signum)
        await graceful_shutdown_with_discovery(
            runtime,
            endpoints,
            shutdown_event=shutdown_event,
            grace_period_s=None,
        )

    def _schedule_shutdown(signum: int, frame: Any | None) -> None:
        def _kick() -> None:
            asyncio.create_task(_shutdown_sequence(signum, frame))

        loop.call_soon_threadsafe(_kick)

    def _os_signal_handler(signum: int, frame: Any) -> None:
        _schedule_shutdown(signum, frame)

    for sig in signals:
        signal.signal(sig, _os_signal_handler)

    orig_add = loop.add_signal_handler

    def watching_add_signal_handler(sig: int, callback: SignalCallback, *args: Any):
        if sig in signals:
            logging.debug(
                "Captured underlying service trying to register for loop.add_signal_handler(%s, %r, ...).",
                sig,
                callback,
            )
            deferred_handlers[sig].append((callback, args))
            return None
        return orig_add(sig, callback, *args)

    loop.add_signal_handler = watching_add_signal_handler  # type: ignore[assignment]
    return run_deferred_handlers
......@@ -5,6 +5,7 @@
import base64
import io
from types import SimpleNamespace
from unittest.mock import MagicMock, Mock, patch
import pytest
......@@ -134,7 +135,7 @@ class TestImageDiffusionWorkerHandler:
# Mock generator response
handler.generator.generate = Mock(
return_value={"frames": [test_image.convert("RGB")]}
return_value=SimpleNamespace(frames=[test_image.convert("RGB")])
)
request = {
......@@ -173,7 +174,7 @@ class TestImageDiffusionWorkerHandler:
# Mock generator response
handler.generator.generate = Mock(
return_value={"frames": [test_image.convert("RGB")]}
return_value=SimpleNamespace(frames=[test_image.convert("RGB")])
)
request = {
......@@ -213,7 +214,9 @@ class TestImageDiffusionWorkerHandler:
):
"""Test that num_inference_steps defaults to 50."""
test_image = Image.new("RGB", (256, 256), color="green")
handler.generator.generate = Mock(return_value={"frames": [test_image]})
handler.generator.generate = Mock(
return_value=SimpleNamespace(frames=[test_image])
)
request = {
"prompt": "A green square",
......@@ -281,7 +284,9 @@ class TestImageDiffusionWorkerHandler:
# Create a numpy array representing an image
np_image = np.random.randint(0, 255, (256, 256, 3), dtype=np.uint8)
handler.generator.generate = Mock(return_value={"frames": [np_image]})
handler.generator.generate = Mock(
return_value=SimpleNamespace(frames=[np_image])
)
images = await handler._generate_images(
prompt="test",
......@@ -300,7 +305,9 @@ class TestImageDiffusionWorkerHandler:
"""Test _generate_images handles PIL Images."""
pil_image = Image.new("RGB", (256, 256), color="red")
handler.generator.generate = Mock(return_value={"frames": [pil_image]})
handler.generator.generate = Mock(
return_value=SimpleNamespace(frames=[pil_image])
)
images = await handler._generate_images(
prompt="test",
......@@ -319,7 +326,9 @@ class TestImageDiffusionWorkerHandler:
"""Test _generate_images handles bytes directly."""
img_bytes = b"raw image bytes"
handler.generator.generate = Mock(return_value={"frames": [img_bytes]})
handler.generator.generate = Mock(
return_value=SimpleNamespace(frames=[img_bytes])
)
images = await handler._generate_images(
prompt="test",
......
......@@ -58,10 +58,10 @@ sglang:
runtime_image: lmsysorg/sglang
cuda12.9:
base_image_tag: 25.06-cuda12.9-devel-ubuntu24.04
runtime_image_tag: v0.5.7-runtime
runtime_image_tag: v0.5.9-runtime
cuda13.0:
base_image_tag: 25.11-cuda13.0-devel-ubuntu24.04
runtime_image_tag: v0.5.8-cu130-runtime
runtime_image_tag: v0.5.9-cu130-runtime
enable_media_ffmpeg: "true"
enable_gpu_memory_service: "true"
enable_kvbm: "false"
......
......@@ -8,7 +8,7 @@ title: SGLang
## Use the Latest Release
We recommend using the latest stable release of dynamo to avoid breaking changes:
We recommend using the latest stable release of Dynamo to avoid breaking changes:
[![GitHub Release](https://img.shields.io/github/v/release/ai-dynamo/dynamo)](https://github.com/ai-dynamo/dynamo/releases/latest)
......@@ -20,245 +20,115 @@ git checkout $(git describe --tags $(git rev-list --tags --max-count=1))
---
## Table of Contents
- [Feature Support Matrix](#feature-support-matrix)
- [Dynamo SGLang Integration](#dynamo-sglang-integration)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Single Node Examples](#run-single-node-examples)
- [Multi-Node and Advanced Examples](#advanced-examples)
- [Deploy on SLURM or Kubernetes](#deployment)
## Feature Support Matrix
### Core Dynamo Features
| Feature | SGLang | Notes |
|---------|--------|-------|
| [**Disaggregated Serving**](../../design-docs/disagg-serving.md) | ✅ | |
| [**Conditional Disaggregation**](../../design-docs/disagg-serving.md) | 🚧 | WIP [PR](https://github.com/sgl-project/sglang/pull/7730) |
| [**KV-Aware Routing**](../../components/router/README.md) | ✅ | |
| [**SLA-Based Planner**](../../components/planner/planner-guide.md) | ✅ | |
| [**Multimodal Support**](../../features/multimodal/multimodal-sglang.md) | ✅ | |
| [**KVBM**](../../components/kvbm/README.md) | ❌ | Planned |
## Dynamo SGLang Integration
Dynamo SGLang integrates SGLang engines into Dynamo's distributed runtime, enabling advanced features like disaggregated serving, KV-aware routing, and request migration while maintaining full compatibility with SGLang's engine arguments.
### Argument Handling
Dynamo SGLang uses SGLang's native argument parser, so **most SGLang engine arguments work identically**. You can pass any SGLang argument (like `--model-path`, `--tp`, `--trust-remote-code`) directly to `dynamo.sglang`.
#### Dynamo-Specific Arguments
| Argument | Description | Default | SGLang Equivalent |
|----------|-------------|---------|-------------------|
| `--endpoint` | Dynamo endpoint in `dyn://namespace.component.endpoint` format | Auto-generated based on mode | N/A |
| `--dyn-tool-call-parser` | Tool call parser for structured outputs (takes precedence over `--tool-call-parser`) | `None` | `--tool-call-parser` |
| `--dyn-reasoning-parser` | Reasoning parser for CoT models (takes precedence over `--reasoning-parser`) | `None` | `--reasoning-parser` |
| `--use-sglang-tokenizer` | Use SGLang's tokenizer instead of Dynamo's | `False` | N/A |
| `--custom-jinja-template` | Use custom chat template for that model (takes precedence over default chat template in model repo) | `None` | `--chat-template` |
#### Tokenizer Behavior
- **Default (`--use-sglang-tokenizer` not set)**: Dynamo handles tokenization/detokenization via our blazing fast frontend and passes `input_ids` to SGLang
- **With `--use-sglang-tokenizer`**: SGLang handles tokenization/detokenization, Dynamo passes raw prompts
> [!NOTE]
> When using `--use-sglang-tokenizer`, only `v1/chat/completions` is available through Dynamo's frontend.
### Request Cancellation
When a user cancels a request (e.g., by disconnecting from the frontend), the request is automatically cancelled across all workers, freeing compute resources for other requests.
#### Cancellation Support Matrix
| | Prefill | Decode |
|-|---------|--------|
| **Aggregated** | ✅ | ✅ |
| **Disaggregated** | ⚠️ | ✅ |
> [!WARNING]
> ⚠️ SGLang backend currently does not support cancellation during remote prefill phase in disaggregated mode.
For more details, see the [Request Cancellation Architecture](../../fault-tolerance/request-cancellation.md) documentation.
Dynamo SGLang integrates [SGLang](https://github.com/sgl-project/sglang) engines into Dynamo's distributed runtime, enabling disaggregated serving, KV-aware routing, and request cancellation while maintaining full compatibility with SGLang's native engine arguments. It supports LLM inference, embedding models, multimodal vision models, and diffusion-based generation (LLM, image, video).
## Installation
### Install latest release
We suggest using uv to install the latest release of ai-dynamo[sglang]. You can install it with `curl -LsSf https://astral.sh/uv/install.sh | sh`
### Install Latest Release
We recommend using [uv](https://github.com/astral-sh/uv) to install:
<Accordion title="Expand for instructions">
```bash
# create a virtual env
uv venv --python 3.12 --seed
# install the latest release (which comes bundled with a stable sglang version)
uv pip install "ai-dynamo[sglang]"
```
</Accordion>
### Install editable version for development
This installs Dynamo with the compatible SGLang version.
<Accordion title="Expand for instructions">
This requires having rust installed. We also recommend having a proper installation of the cuda toolkit as sglang requires `nvcc` to be available.
### Install for Development
<Accordion title="Development installation">
Requires Rust and the CUDA toolkit (`nvcc`).
```bash
# create a virtual env
# install dynamo
uv venv --python 3.12 --seed
# build dynamo runtime bindings
uv pip install maturin
uv pip install maturin nixl
cd $DYNAMO_HOME/lib/bindings/python
maturin develop --uv
cd $DYNAMO_HOME
# installs sglang supported version along with dynamo
# include the prerelease flag to install flashinfer rc versions
uv pip install -e .
# install any sglang version >= 0.5.3.post2
uv pip install "sglang[all]==0.5.3.post2"
# install sglang
git clone https://github.com/sgl-project/sglang.git
cd sglang && uv pip install -e "python"
```
</Accordion>
### Using docker containers
This setup also works well for coding agents: provide the paths to both repos and the virtual environment, and have the agent rerun these commands as it makes changes.
</Accordion>
<Accordion title="Expand for instructions">
We are in the process of shipping pre-built docker containers that contain installations of DeepEP, DeepGEMM, and NVSHMEM in order to support WideEP and P/D. For now, you can quickly build the container from source with the following command.
### Docker
<Accordion title="Build and run container">
```bash
cd $DYNAMO_ROOT
python container/render.py --framework sglang --output-short-filename
docker build -f container/rendered.Dockerfile -t dynamo:latest-sglang .
```
And then run it using
```bash
docker run \
--gpus all \
-it \
--rm \
--network host \
--shm-size=10G \
--ulimit memlock=-1 \
--ulimit stack=67108864 \
--gpus all -it --rm \
--network host --shm-size=10G \
--ulimit memlock=-1 --ulimit stack=67108864 \
--ulimit nofile=65536:65536 \
--cap-add CAP_SYS_PTRACE \
--ipc host \
--cap-add CAP_SYS_PTRACE --ipc host \
dynamo:latest-sglang
```
</Accordion>
## Quick Start
## Feature Support Matrix
Below we provide a guide that lets you run all of our common deployment patterns on a single node.
| Feature | Status | Notes |
|---------|--------|-------|
| [**Disaggregated Serving**](../../design-docs/disagg-serving.md) | ✅ | Prefill/decode separation with NIXL KV transfer |
| [**KV-Aware Routing**](../../components/router/README.md) | ✅ | |
| [**SLA-Based Planner**](../../components/planner/planner-guide.md) | ✅ | |
| [**Multimodal Support**](../../features/multimodal/multimodal-sglang.md) | ✅ | Image via EPD, E/PD, E/P/D patterns |
| [**Diffusion Models**](sglang-diffusion.md) | ✅ | LLM diffusion, image, and video generation |
| [**Request Cancellation**](../../fault-tolerance/request-cancellation.md) | ✅ | Aggregated full; disaggregated decode-only |
| [**Graceful Shutdown**](../../fault-tolerance/graceful-shutdown.md) | ✅ | Discovery unregister + grace period |
| [**Prometheus Metrics**](sglang-prometheus.md) | ✅ | SGLang + Dynamo metrics on `/metrics` |
| [**KVBM**](../../components/kvbm/README.md) | ❌ | Planned |
### Start Infrastructure Services (Local Development Only)
## Quick Start
For local/bare-metal development, start etcd and optionally NATS using [Docker Compose](https://github.com/ai-dynamo/dynamo/tree/main/deploy/docker-compose.yml):
### Python / CLI Deployment
Start infrastructure services for local development:
```bash
docker compose -f deploy/docker-compose.yml up -d
```
> [!NOTE]
> - **etcd** is optional but is the default local discovery backend. You can also use `--discovery-backend file` to use file system based discovery.
> - **NATS** is optional - only needed if using KV routing with events. Workers must be explicitly configured with `--kv-events-config` to publish events. Use `--no-router-kv-events` on the frontend for prediction-based routing without events
> - **On Kubernetes**, neither is required when using the Dynamo operator, which explicitly sets `DYN_DISCOVERY_BACKEND=kubernetes` to enable native K8s service discovery (DynamoWorkerMetadata CRD)
> [!TIP]
> Each example corresponds to a simple bash script that runs the OpenAI compatible server, processor, and optional router (written in Rust) and LLM engine (written in Python) in a single terminal. You can easily take each command and run them in separate terminals.
>
> Additionally - because we use sglang's argument parser, you can pass in any argument that sglang supports to the worker!
### Aggregated Serving
Launch an aggregated serving deployment:
```bash
cd $DYNAMO_HOME/examples/backends/sglang
./launch/agg.sh
```
### Aggregated Serving with KV Routing
```bash
cd $DYNAMO_HOME/examples/backends/sglang
./launch/agg_router.sh
```
### Aggregated Serving for Embedding Models
Here's an example that uses the [Qwen/Qwen3-Embedding-4B](https://huggingface.co/Qwen/Qwen3-Embedding-4B) model.
```bash
cd $DYNAMO_HOME/examples/backends/sglang
./launch/agg_embed.sh
```
<Accordion title="Send the following request to verify your deployment:">
```bash
curl localhost:8000/v1/embeddings \
-H "Content-Type: application/json" \
-d '{
"model": "Qwen/Qwen3-Embedding-4B",
"input": "Hello, world!"
}'
```
</Accordion>
### Disaggregated serving
See [SGLang Disaggregation](sglang-disaggregation.md) to learn more about how sglang and dynamo handle disaggregated serving.
```bash
cd $DYNAMO_HOME/examples/backends/sglang
./launch/disagg.sh
```
### Disaggregated Serving with KV Aware Prefill Routing
```bash
cd $DYNAMO_HOME/examples/backends/sglang
./launch/disagg_router.sh
```
### Disaggregated Serving with Mixture-of-Experts (MoE) models and DP attention
You can use this configuration to test out disaggregated serving with dp attention and expert parallelism on a single node before scaling to the full DeepSeek-R1 model across multiple nodes.
```bash
# note this will require 4 GPUs
cd $DYNAMO_HOME/examples/backends/sglang
./launch/disagg_dp_attn.sh
```
### Testing the Deployment
Send a test request to verify your deployment:
Verify the deployment:
```bash
curl localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "Qwen/Qwen3-0.6B",
"messages": [
{
"role": "user",
"content": "Explain why Roger Federer is considered one of the greatest tennis players of all time"
}
],
"messages": [{"role": "user", "content": "Hello!"}],
"stream": true,
"max_tokens": 30
}'
```
### Kubernetes Deployment
## Deployment
We currently provide deployment examples for Kubernetes and SLURM.
You can deploy SGLang with Dynamo on Kubernetes using a `DynamoGraphDeployment`. For more details, see the [SGLang Kubernetes Deployment Guide](https://github.com/ai-dynamo/dynamo/tree/main/examples/backends/sglang/deploy).
## Kubernetes
- **[Deploying Dynamo with SGLang on Kubernetes](https://github.com/ai-dynamo/dynamo/tree/main/examples/backends/sglang/deploy/README.md)**
## Next Steps
## SLURM
- **[Deploying Dynamo with SGLang on SLURM](https://github.com/ai-dynamo/dynamo/tree/main/examples/backends/sglang/slurm_jobs/README.md)**
- **[Reference Guide](sglang-reference-guide.md)**: Worker types, architecture, and configuration
- **[Examples](sglang-examples.md)**: All deployment patterns with launch scripts
- **[Disaggregation](sglang-disaggregation.md)**: P/D architecture and KV transfer details
- **[Diffusion](sglang-diffusion.md)**: LLM, image, and video diffusion models
- **[Prometheus Metrics](sglang-prometheus.md)**: Metrics integration and monitoring
- **[Deploying SGLang with Dynamo on Kubernetes](https://github.com/ai-dynamo/dynamo/tree/main/examples/backends/sglang/deploy)**: Kubernetes deployment guide
---
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
title: Diffusion LM
---
# Running Diffusion LMs with SGLang
Diffusion Language Models (Diffusion LMs) are a class of generative models that use diffusion processes for text generation. This guide shows how to deploy diffusion models like LLaDA2.0 using SGLang as the backend with Dynamo. Diffusion LMs work differently from autoregressive models - they iteratively refine generated text through a diffusion process.
## Launch the Deployment
### Using the Launch Script (Recommended)
The easiest way to start the diffusion LM service is using the provided launch script:
```bash
bash examples/backends/sglang/launch/diffusion_llada.sh
```
### Manual Launch Steps
If you prefer to launch components manually:
**Start frontend**
```bash
python -m dynamo.frontend --http-port 8001 &
```
**Run diffusion worker**
```bash
export CUDA_VISIBLE_DEVICES=0,1
python -m dynamo.sglang \
--model-path inclusionAI/LLaDA2.0-mini-preview \
--tp-size 2 \
--skip-tokenizer-init \
--trust-remote-code \
--endpoint dyn://dynamo.backend.generate \
--enable-metrics \
--disable-cuda-graph \
--disable-overlap-schedule \
--attention-backend triton \
--dllm-algorithm LowConfidence
```
## Diffusion Algorithms
The diffusion worker uses the **LowConfidence** algorithm for the iterative refinement process. This algorithm refines tokens with low confidence scores, progressively replacing masked tokens with the model's predictions until confidence thresholds are met.
For more details on diffusion algorithms and configuration options, refer to the [SGLang Diffusion Language Models documentation](https://github.com/sgl-project/sglang/blob/main/docs/supported_models/text_generation/diffusion_language_models.md).
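To make the refinement loop described above concrete, here is a toy sketch in Python. It is not SGLang's implementation of `LowConfidence`: the `predict` callback, the `threshold` parameter, and the rule of always committing the single most confident position when nothing clears the threshold are all assumptions made for illustration.

```python
MASK = None  # sentinel for a position that has not been committed yet


def low_confidence_decode(length, predict, threshold=0.9, max_steps=32):
    """Toy iterative unmasking loop (illustrative only).

    `predict(tokens)` is a hypothetical callback returning one
    (token, confidence) pair per position.
    """
    tokens = [MASK] * length
    for _ in range(max_steps):
        masked = [i for i, tok in enumerate(tokens) if tok is MASK]
        if not masked:
            break
        preds = predict(tokens)
        # Commit every masked position whose prediction clears the threshold.
        accepted = [i for i in masked if preds[i][1] >= threshold]
        if not accepted:
            # Guarantee progress: commit the most confident masked position.
            accepted = [max(masked, key=lambda i: preds[i][1])]
        for i in accepted:
            tokens[i] = preds[i][0]
    return tokens
```

With a confident model, many positions are committed per step; with a low-confidence model, the loop falls back to one position per step, which is why the number of refinement steps varies with the input.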
## Testing the Deployment
Once deployed, you can test the service using curl:
```bash
curl -X POST http://localhost:8001/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "inclusionAI/LLaDA2.0-mini-preview",
"messages": [
{
"role": "user",
"content": "Hello! How are you?"
}
],
"temperature": 0.7,
"max_tokens": 512
}'
```
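The same chat request can be sent from Python using only the standard library. This is a minimal sketch that assumes the frontend is listening on `localhost:8001` as in the manual launch steps; the helper names `build_chat_request` and `send` are illustrative, not part of Dynamo.

```python
import json
import urllib.request


def build_chat_request(
    prompt,
    model="inclusionAI/LLaDA2.0-mini-preview",
    base_url="http://localhost:8001",  # assumed frontend address
):
    """Build (but do not send) a chat completions request."""
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7,
        "max_tokens": 512,
    }
    return urllib.request.Request(
        f"{base_url}/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request, timeout=60):
    """Send the request and decode the JSON response body."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read())


# Usage, once the deployment is running:
#   reply = send(build_chat_request("Hello! How are you?"))
#   print(reply["choices"][0]["message"]["content"])  # assumes an OpenAI-style response
```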
Or use the completions endpoint:
```bash
curl -X POST http://localhost:8001/v1/completions \
-H "Content-Type: application/json" \
-d '{
"model": "inclusionAI/LLaDA2.0-mini-preview",
"prompt": "Once upon a time",
"max_tokens": 256
}'
```
\ No newline at end of file
---
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
title: Expert Distribution (EPLB)
---
# Expert Parallelism Load Balancer (EPLB) in SGLang
Mixture-of-Experts (MoE) models utilize a technique called Expert Parallelism (EP), where experts are distributed across multiple GPUs. While this allows for much larger and more powerful models, it can lead to an uneven workload distribution. Because the load on different experts may vary depending on the workload, some GPUs can become bottlenecks, forcing the entire system to wait. This imbalance leads to wasted compute cycles and increased memory usage.
To address this, SGLang implements an Expert Parallelism Load Balancer (EPLB) inspired by the work in the DeepSeek-V3 paper. EPLB analyzes expert usage patterns and dynamically re-arranges the experts across the available GPUs to ensure a more balanced workload.
## The EPLB Algorithm: Core Concepts
The load balancing algorithm revolves around a few key ideas to achieve an optimal distribution of work.
### Redundant Experts for Flexibility
The core strategy is to create **redundant experts**. Instead of being limited to the model's original number of experts, EPLB can create duplicates of heavily-loaded experts. For example, if a model has 256 experts, you can configure EPLB to create an additional 32 "redundant" experts, bringing the total to 288. This pool of replicated experts is then strategically packed onto the available GPUs. A popular expert might be duplicated multiple times, while a moderately used expert might be grouped with several rarely used ones on a single GPU.
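The replication idea can be sketched with a simple greedy rule: repeatedly give the next redundant slot to the expert whose load per replica is currently highest. This is an illustration under that assumption, not the algorithm SGLang or DeepSeek's EPLB actually uses; `replicate_experts` is a hypothetical helper.

```python
import heapq


def replicate_experts(loads, num_redundant):
    """Return the replica count per expert after handing out redundant slots.

    loads: observed load per logical expert (e.g. token counts).
    num_redundant: number of extra expert slots to distribute.
    """
    replicas = [1] * len(loads)
    # heapq is a min-heap, so store negated per-replica load.
    heap = [(-load, idx) for idx, load in enumerate(loads)]
    heapq.heapify(heap)
    for _ in range(num_redundant):
        _, idx = heapq.heappop(heap)
        replicas[idx] += 1
        heapq.heappush(heap, (-loads[idx] / replicas[idx], idx))
    return replicas


# One hot expert absorbs both extra slots before any other expert is duplicated.
print(replicate_experts([100, 10, 10, 40], num_redundant=2))  # [3, 1, 1, 1]
```

The total number of physical experts is always `len(loads) + num_redundant`, matching the 256 + 32 = 288 example above.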
### Group-Limited Routing for Efficiency
Modern MoE models like DeepSeek-V3 use **group-limited expert routing**. In this design, experts are organized into groups, and routing decisions are constrained within these groups. EPLB can take advantage of this structure to reduce inter-node data traffic by attempting to place all experts from the same group onto the same node whenever possible.
### Load Balancing Policies
The algorithm comes with two policies for different scenarios:
1. **Hierarchical Load Balancing**: This policy is used when the number of server nodes evenly divides the number of expert groups. It first harnesses the group-limited routing by packing expert groups onto nodes to balance the load between nodes. Then, within each node, it replicates and packs the experts onto individual GPUs to balance the load locally. This is often used during prefill where the expert-parallel size might be smaller.
2. **Global Load Balancing**: In all other cases, a global policy is used. It replicates experts globally without regard to their group affiliation and packs them onto individual GPUs. This policy is more general and can be adopted during the decoding stage with a larger expert-parallel size.
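The choice between the two policies reduces to a divisibility check. The sketch below encodes only that selection rule as stated above; `choose_policy` is a hypothetical helper, and the real implementation may take additional factors into account.

```python
def choose_policy(num_expert_groups: int, num_nodes: int) -> str:
    """Pick the balancing policy from the rule described above."""
    if num_nodes > 0 and num_expert_groups % num_nodes == 0:
        # Whole groups can be packed onto nodes, keeping group traffic local.
        return "hierarchical"
    # Otherwise ignore groups and balance across all GPUs globally.
    return "global"


print(choose_policy(num_expert_groups=8, num_nodes=4))  # hierarchical
print(choose_policy(num_expert_groups=8, num_nodes=3))  # global
```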
## How SGLang Implements EPLB
SGLang provides a robust implementation of EPLB, allowing for dynamic, online rebalancing of expert locations based on real-world traffic.
### Dynamic Rebalancing
You can enable dynamic rebalancing by setting the `--enable-eplb` flag. When enabled, the `EPLBManager` runs in the background. It periodically triggers a rebalance after a certain number of requests, configured with `--eplb-rebalance-num-iterations`. At each rebalance, it computes a new expert placement plan based on the latest usage statistics and updates the model's expert locations on the fly.
### Expert Usage Recording
To make intelligent balancing decisions, SGLang needs to collect data on expert usage. The `ExpertDistributionRecorder` is responsible for this, and its behavior is controlled by the `--expert-distribution-recorder-mode` flag. This flag determines the granularity of the collected data. When `enable_eplb` is on, this mode defaults to `stat` to gather statistics for rebalancing. The available modes are:
- **`per_token`**: This is the most detailed mode. It records the specific expert choices for every single token processed by the model. While it provides the richest data, it also has the highest performance overhead. The raw, unaggregated data for each forward pass is stored.
- **`per_pass`**: In this mode, SGLang records the aggregated expert usage counts for each individual forward pass. The data is not aggregated across different passes, giving you a snapshot of expert popularity for each batch of requests.
- **`stat`**: This mode also records the exact expert usage counts for each forward pass, but it then aggregates these counts across multiple passes (the number of passes is determined by `--expert-distribution-recorder-buffer-size`). This provides a moving average of expert usage statistics and is the default when EPLB is enabled.
- **`stat_approx`**: This mode is similar to `stat` but gathers _approximate_ statistics, usually from the DeepEP dispatcher. This method has lower overhead than `stat` but is less precise, especially for small batch sizes. It is a good choice when performance is critical.
The collected statistics are then fed into the rebalancing algorithm to generate a new expert placement plan.
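The difference between `per_pass` and `stat` can be sketched with a small recorder that keeps per-pass counts in a bounded buffer and sums them on demand. The class below is a simplified illustration, not SGLang's `ExpertDistributionRecorder`; the name `ToyStatRecorder` and its methods are assumptions.

```python
from collections import deque


class ToyStatRecorder:
    """Keeps the last `buffer_size` per-pass expert counts and aggregates them."""

    def __init__(self, num_experts, buffer_size):
        self.num_experts = num_experts
        # Old passes fall out automatically once the buffer is full.
        self.buffer = deque(maxlen=buffer_size)

    def record_pass(self, expert_ids):
        """Count how often each expert was chosen in one forward pass (per_pass view)."""
        counts = [0] * self.num_experts
        for expert in expert_ids:
            counts[expert] += 1
        self.buffer.append(counts)
        return counts

    def aggregate(self):
        """Sum counts over the buffered passes (stat view)."""
        totals = [0] * self.num_experts
        for counts in self.buffer:
            for idx, count in enumerate(counts):
                totals[idx] += count
        return totals
```

A `per_token` recorder would instead store the raw `expert_ids` for every token, which is why it carries the highest overhead.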
### Initializing with a Pre-computed Distribution
While SGLang can start with a simple default layout and learn a better one over time, you can also provide it with a pre-computed expert distribution to start with. The `--init-expert-location` flag allows you to specify a file path (`.pt` or `.json`) or a JSON string containing an expert layout. This is useful if you have already analyzed a representative workload offline and want the server to start immediately with a balanced configuration. If this flag is not set, it defaults to a `trivial` sequential layout.
### References and further reading
- [SGLang Large Scale P/D + WideEP Deployment](https://lmsys.org/blog/2025-05-05-large-scale-ep/#expert-parallelism-load-balancer)
- [DeepSeek's EPLB repository](https://github.com/deepseek-ai/EPLB)
---
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
title: GPT-OSS
---
# Running gpt-oss-120b Disaggregated with SGLang
The gpt-oss-120b guide for SGLang is largely identical to the [guide for vLLM](../vllm/gpt-oss.md).
Please use the vLLM guide as a reference, with the different deployment steps highlighted below:
# Launch the Deployment
Note that GPT-OSS is a reasoning model with tool calling support. To
ensure the response is processed correctly, launch the worker with the
appropriate `--dyn-reasoning-parser` and `--dyn-tool-call-parser` values.
**Start frontend**
```bash
python3 -m dynamo.frontend --http-port 8000 &
```
**Run decode worker**
```bash
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m dynamo.sglang \
--model-path openai/gpt-oss-120b \
--served-model-name openai/gpt-oss-120b \
--tp 4 \
--trust-remote-code \
--skip-tokenizer-init \
--disaggregation-mode decode \
--disaggregation-transfer-backend nixl \
--dyn-reasoning-parser gpt_oss \
--dyn-tool-call-parser harmony
```
**Run prefill workers**
```bash
CUDA_VISIBLE_DEVICES=4,5,6,7 python3 -m dynamo.sglang \
--model-path openai/gpt-oss-120b \
--served-model-name openai/gpt-oss-120b \
--tp 4 \
--trust-remote-code \
--skip-tokenizer-init \
--disaggregation-mode prefill \
--disaggregation-transfer-backend nixl \
--dyn-reasoning-parser gpt_oss \
--dyn-tool-call-parser harmony
```