Unverified Commit 51dfd760 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: add SGLang chat processor for frontend pre/post processing (#6834)


Co-authored-by: default avatarClaude Opus 4.6 <noreply@anthropic.com>
parent 63d7c01c
...@@ -71,6 +71,7 @@ class FrontendConfig(KvRouterConfigBase): ...@@ -71,6 +71,7 @@ class FrontendConfig(KvRouterConfigBase):
event_plane: str event_plane: str
chat_processor: str chat_processor: str
enable_anthropic_api: bool enable_anthropic_api: bool
debug_perf: bool
preprocess_workers: int preprocess_workers: int
def validate(self) -> None: def validate(self) -> None:
...@@ -350,10 +351,25 @@ class FrontendArgGroup(ArgGroup): ...@@ -350,10 +351,25 @@ class FrontendArgGroup(ArgGroup):
default="dynamo", default="dynamo",
dest="chat_processor", dest="chat_processor",
help=( help=(
"[EXPERIMENTAL] When set to 'vllm', use local vllm for the pre and post " "[EXPERIMENTAL] Chat pre/post processor backend. 'dynamo' uses the Rust "
"processor." "preprocessor. 'vllm' uses local vLLM for pre and post processing. "
"'sglang' uses SGLang APIs for chat template rendering, tool call "
"parsing, and reasoning parsing."
),
choices=["dynamo", "vllm", "sglang"],
)
add_negatable_bool_argument(
g,
flag_name="--dyn-debug-perf",
env_var="DYN_DEBUG_PERF",
default=False,
dest="debug_perf",
help=(
"[EXPERIMENTAL] Enable performance instrumentation for diagnosing preprocessing bottlenecks. "
"Logs per-function timing, request concurrency, and hot-path section durations. "
"Supported with '--dyn-chat-processor vllm' and '--dyn-chat-processor sglang'."
), ),
choices=["dynamo", "vllm"],
) )
add_argument( add_argument(
...@@ -366,7 +382,8 @@ class FrontendArgGroup(ArgGroup): ...@@ -366,7 +382,8 @@ class FrontendArgGroup(ArgGroup):
"[EXPERIMENTAL] Number of worker processes for preprocessing and output processing. " "[EXPERIMENTAL] Number of worker processes for preprocessing and output processing. "
"When > 0, offloads CPU-bound work (tokenization, template rendering, " "When > 0, offloads CPU-bound work (tokenization, template rendering, "
"detokenization) to a ProcessPoolExecutor with N workers, each with its " "detokenization) to a ProcessPoolExecutor with N workers, each with its "
"own GIL. 0 (default) keeps all processing on the main event loop. '--dyn-chat-processor vllm' only." "own GIL. 0 (default) keeps all processing on the main event loop. "
"Supported with '--dyn-chat-processor vllm' and '--dyn-chat-processor sglang'."
), ),
arg_type=int, arg_type=int,
) )
...@@ -63,11 +63,36 @@ def setup_engine_factory( ...@@ -63,11 +63,36 @@ def setup_engine_factory(
return EngineFactory(runtime, router_config, config, vllm_flags) return EngineFactory(runtime, router_config, config, vllm_flags)
def parse_args() -> tuple[FrontendConfig, Optional[Namespace]]: def setup_sglang_engine_factory(
runtime: DistributedRuntime,
router_config: RouterConfig,
config: FrontendConfig,
sglang_flags: Optional[Namespace] = None,
):
"""
When using sglang pre and post processor, create the SglangEngineFactory
that creates the engines that run requests.
"""
from .sglang_processor import SglangEngineFactory
tool_call_parser = getattr(sglang_flags, "tool_call_parser", None)
reasoning_parser = getattr(sglang_flags, "reasoning_parser", None)
return SglangEngineFactory(
runtime,
router_config,
config,
debug_perf=config.debug_perf,
tool_call_parser_name=tool_call_parser,
reasoning_parser_name=reasoning_parser,
)
def parse_args() -> tuple[FrontendConfig, Optional[Namespace], Optional[Namespace]]:
"""Parse command-line arguments for the Dynamo frontend. """Parse command-line arguments for the Dynamo frontend.
Returns: Returns:
FrontendConfig: Parsed configuration object. Tuple of (FrontendConfig, vllm_flags, sglang_flags).
""" """
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
...@@ -83,6 +108,7 @@ def parse_args() -> tuple[FrontendConfig, Optional[Namespace]]: ...@@ -83,6 +108,7 @@ def parse_args() -> tuple[FrontendConfig, Optional[Namespace]]:
config.validate() config.validate()
vllm_flags = None vllm_flags = None
sglang_flags = None
# parse extra vllm flags using vllm native parser. # parse extra vllm flags using vllm native parser.
if config.chat_processor == "vllm": if config.chat_processor == "vllm":
...@@ -108,11 +134,19 @@ def parse_args() -> tuple[FrontendConfig, Optional[Namespace]]: ...@@ -108,11 +134,19 @@ def parse_args() -> tuple[FrontendConfig, Optional[Namespace]]:
vllm_parser = AsyncEngineArgs.add_cli_args(vllm_parser) vllm_parser = AsyncEngineArgs.add_cli_args(vllm_parser)
# the result is returned as Namespace object rather than AsyncEngineArgs object to avoid import error for non-vllm users. # the result is returned as Namespace object rather than AsyncEngineArgs object to avoid import error for non-vllm users.
vllm_flags = vllm_parser.parse_args(unknown) vllm_flags = vllm_parser.parse_args(unknown)
elif config.chat_processor == "sglang":
sglang_parser = argparse.ArgumentParser(add_help=False)
sglang_parser.add_argument("--tool-call-parser", default=None)
sglang_parser.add_argument("--reasoning-parser", default=None)
sglang_flags, remaining = sglang_parser.parse_known_args(unknown)
if remaining:
logger.error(f"Unknown arguments specified: {remaining}")
sys.exit(1)
else: else:
if unknown: if unknown:
logger.error(f"Unknown arguments specified: {unknown}") logger.error(f"Unknown arguments specified: {unknown}")
sys.exit(1) sys.exit(1)
return config, vllm_flags return config, vllm_flags, sglang_flags
async def async_main(): async def async_main():
...@@ -128,7 +162,7 @@ async def async_main(): ...@@ -128,7 +162,7 @@ async def async_main():
# bind that port before the worker, causing port conflicts and/or scraping the # bind that port before the worker, causing port conflicts and/or scraping the
# wrong metrics endpoint. # wrong metrics endpoint.
os.environ.pop("DYN_SYSTEM_PORT", None) os.environ.pop("DYN_SYSTEM_PORT", None)
config, vllm_flags = parse_args() config, vllm_flags, sglang_flags = parse_args()
dump_config(config.dump_config_to, config) dump_config(config.dump_config_to, config)
os.environ["DYN_EVENT_PLANE"] = config.event_plane os.environ["DYN_EVENT_PLANE"] = config.event_plane
logger.info( logger.info(
...@@ -233,6 +267,11 @@ async def async_main(): ...@@ -233,6 +267,11 @@ async def async_main():
runtime, router_config, config, vllm_flags runtime, router_config, config, vllm_flags
).chat_engine_factory ).chat_engine_factory
kwargs["chat_engine_factory"] = chat_engine_factory kwargs["chat_engine_factory"] = chat_engine_factory
elif config.chat_processor == "sglang":
chat_engine_factory = setup_sglang_engine_factory(
runtime, router_config, config, sglang_flags
).chat_engine_factory
kwargs["chat_engine_factory"] = chat_engine_factory
e = EntrypointArgs(EngineType.Dynamic, **kwargs) e = EntrypointArgs(EngineType.Dynamic, **kwargs)
engine = await make_engine(runtime, e) engine = await make_engine(runtime, e)
......
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from sglang.srt.entrypoints.openai.protocol import Function as SglangFunction
from sglang.srt.entrypoints.openai.protocol import Tool as SglangTool
from sglang.srt.function_call.function_call_parser import FunctionCallParser
from sglang.srt.parser.reasoning_parser import ReasoningParser
from .utils import random_call_id
@dataclass
class SglangPreprocessResult:
"""Result of SGLang preprocessing."""
prompt_token_ids: list[int]
tool_call_parser: FunctionCallParser | None
reasoning_parser: ReasoningParser | None
request: dict[str, Any]
def convert_tools(tools: list[dict[str, Any]] | None) -> list[SglangTool] | None:
"""Convert OpenAI tool dicts to SGLang Tool objects."""
if not tools:
return None
sglang_tools = []
for tool in tools:
func = tool.get("function", {})
sglang_tools.append(
SglangTool(
type=tool.get("type", "function"),
function=SglangFunction(
name=func.get("name", ""),
description=func.get("description"),
parameters=func.get("parameters"),
strict=func.get("strict", False),
),
)
)
return sglang_tools
def _materialize_messages(messages: list[Any]) -> list[dict[str, Any]]:
"""Convert message objects to plain dicts for apply_chat_template."""
normalized = []
for msg in messages:
if hasattr(msg, "model_dump"):
normalized.append(msg.model_dump(exclude_none=False))
elif isinstance(msg, dict):
normalized.append(msg)
else:
normalized.append(dict(msg))
return normalized
def create_parsers(
request: dict[str, Any],
*,
tool_call_parser_name: str | None,
reasoning_parser_name: str | None,
sglang_tools: list[SglangTool] | None = None,
) -> tuple[FunctionCallParser | None, ReasoningParser | None]:
"""Create tool call and reasoning parsers for a request.
Shared by both the single-process preprocessing path and the pool path
(which must recreate non-picklable parsers in the main process).
If ``sglang_tools`` is provided, reuses them; otherwise converts from
the request's ``tools`` field.
"""
if sglang_tools is None:
sglang_tools = convert_tools(request.get("tools"))
tool_choice = request.get("tool_choice", "auto")
tool_call_parser = None
if tool_call_parser_name and sglang_tools and tool_choice != "none":
tool_call_parser = FunctionCallParser(
tools=sglang_tools,
tool_call_parser=tool_call_parser_name,
)
reasoning_parser = None
if reasoning_parser_name:
reasoning_parser = ReasoningParser(
model_type=reasoning_parser_name,
stream_reasoning=True,
)
return tool_call_parser, reasoning_parser
def preprocess_chat_request(
request: dict[str, Any],
*,
tokenizer,
tool_call_parser_name: str | None,
reasoning_parser_name: str | None,
) -> SglangPreprocessResult:
"""Preprocess a chat request using SGLang tokenizer and parser APIs.
Synchronous -- suitable for both main-process and worker-process execution.
"""
messages = _materialize_messages(request.get("messages", []))
# Convert tools to SGLang format (done once, shared with parser creation)
sglang_tools = convert_tools(request.get("tools"))
# Build template kwargs -- single call for rendering + tokenization
template_kwargs: dict[str, Any] = {
"add_generation_prompt": True,
"tokenize": True,
}
if sglang_tools:
template_kwargs["tools"] = [t.model_dump() for t in sglang_tools]
prompt_token_ids = tokenizer.apply_chat_template(messages, **template_kwargs)
if not isinstance(prompt_token_ids, list):
prompt_token_ids = list(prompt_token_ids)
tool_call_parser, reasoning_parser = create_parsers(
request,
tool_call_parser_name=tool_call_parser_name,
reasoning_parser_name=reasoning_parser_name,
sglang_tools=sglang_tools,
)
return SglangPreprocessResult(
prompt_token_ids=prompt_token_ids,
tool_call_parser=tool_call_parser,
reasoning_parser=reasoning_parser,
request=request,
)
def _random_call_id() -> str:
return random_call_id()
class SglangStreamingPostProcessor:
"""Streaming post-processor using SGLang parsers and HF tokenizer detokenization.
Handles:
- Incremental detokenization via sliding-window decode (6-token lookback)
- Reasoning content extraction via SGLang ReasoningParser
- Tool call parsing via SGLang FunctionCallParser (parameter deltas)
"""
# Lookback window size for incremental detokenization. UTF-8 characters
# can span up to 4 bytes, each potentially its own token. A lookback of
# 6 covers the worst case (4-token char) plus margin for BPE merges that
# cross the old/new boundary.
LOOKBACK = 6
def __init__(
self,
*,
tokenizer,
tool_call_parser: FunctionCallParser | None,
reasoning_parser: ReasoningParser | None,
) -> None:
self.tokenizer = tokenizer
self.tool_call_parser = tool_call_parser
self.reasoning_parser = reasoning_parser
self._fast_plain_text = tool_call_parser is None and reasoning_parser is None
self._all_token_ids: list[int] = []
# Tool call accumulation. SGLang's streaming parser returns
# deltas (name in one chunk, argument fragments across subsequent
# chunks). However, when the complete tool-call JSON arrives in a
# single chunk the parser emits the name but never streams
# arguments (a chunking-sensitivity issue in the base detector).
# We accumulate names + arg fragments from streaming deltas and,
# on finish, fall back to parse_non_stream on the detector buffer
# for any tool call whose arguments are still missing.
self._tool_call_ids: dict[int, str] = {} # tool_index -> call_id
self._tool_call_names: dict[int, str] = {} # tool_index -> name
self._tool_call_args: dict[int, list[str]] = {} # tool_index -> arg chunks
def _incremental_decode(self, new_token_ids: list[int]) -> str:
"""Decode new tokens with lookback window for multi-byte char boundaries.
Re-decodes a small window of previous tokens alongside new tokens so that
multi-byte characters spanning token boundaries are correctly resolved.
Only retains the last LOOKBACK tokens to bound memory usage.
"""
prev_count = len(self._all_token_ids)
self._all_token_ids.extend(new_token_ids)
start = max(0, prev_count - self.LOOKBACK)
# Trim to avoid unbounded growth -- only the tail matters for decoding
if len(self._all_token_ids) > self.LOOKBACK * 16:
self._all_token_ids = self._all_token_ids[
-(self.LOOKBACK + len(new_token_ids)) :
]
prev_count = len(self._all_token_ids) - len(new_token_ids)
start = max(0, prev_count - self.LOOKBACK)
# Decode lookback-only prefix (before new tokens)
prefix_tokens = self._all_token_ids[start:prev_count]
prefix_text = (
self.tokenizer.decode(prefix_tokens, skip_special_tokens=True)
if prefix_tokens
else ""
)
# Decode lookback + new tokens together
window_tokens = self._all_token_ids[start:]
window_text = self.tokenizer.decode(window_tokens, skip_special_tokens=True)
return window_text[len(prefix_text) :]
def process_output(self, engine_response: dict[str, Any]) -> dict[str, Any] | None:
"""Process a single engine response chunk into an OpenAI SSE choice dict.
Args:
engine_response: Dict with ``token_ids`` and optional ``finish_reason``.
Returns:
OpenAI choice dict or ``None`` if nothing to emit yet.
"""
raw_ids = engine_response.get("token_ids")
token_ids = raw_ids if isinstance(raw_ids, list) else list(raw_ids or [])
finish_reason = engine_response.get("finish_reason")
delta_text = self._incremental_decode(token_ids) if token_ids else ""
if self._fast_plain_text:
if delta_text:
return {
"index": 0,
"delta": {"role": "assistant", "content": delta_text},
"finish_reason": finish_reason,
"logprobs": None,
}
elif finish_reason:
return {
"index": 0,
"delta": {},
"finish_reason": finish_reason,
"logprobs": None,
}
return None
# -- Reasoning parsing --
reasoning_text = None
normal_text = delta_text
if self.reasoning_parser and delta_text:
r_text, n_text = self.reasoning_parser.parse_stream_chunk(delta_text)
reasoning_text = r_text or None
normal_text = n_text or ""
# -- Tool call parsing (accumulate deltas) --
content_text = normal_text
if self.tool_call_parser and normal_text:
parsed_text, tool_calls = self.tool_call_parser.parse_stream_chunk(
normal_text
)
content_text = parsed_text
for tc in tool_calls:
idx = tc.tool_index
if idx not in self._tool_call_ids:
self._tool_call_ids[idx] = _random_call_id()
if tc.name:
self._tool_call_names[idx] = tc.name
if tc.parameters:
self._tool_call_args.setdefault(idx, []).append(tc.parameters)
# -- Assemble delta --
delta: dict[str, Any] = {"role": "assistant"}
has_content = False
if content_text:
delta["content"] = content_text
has_content = True
if reasoning_text:
delta["reasoning_content"] = reasoning_text
has_content = True
# Emit complete tool calls on finish. For any tool call whose
# arguments are still empty (chunking-sensitivity issue), fall
# back to parse_non_stream on the detector's buffer.
if finish_reason and self._tool_call_names:
missing_args = any(
idx not in self._tool_call_args for idx in self._tool_call_names
)
if missing_args:
buffer = getattr(self.tool_call_parser.detector, "_buffer", "")
if buffer:
_, final_calls = self.tool_call_parser.parse_non_stream(buffer)
for tc in final_calls:
idx = tc.tool_index
if idx not in self._tool_call_ids:
self._tool_call_ids[idx] = _random_call_id()
if tc.name:
self._tool_call_names[idx] = tc.name
if tc.parameters:
self._tool_call_args[idx] = [tc.parameters]
tool_calls_out: list[dict[str, Any]] = []
for idx in sorted(self._tool_call_names):
tool_calls_out.append(
{
"index": idx,
"id": self._tool_call_ids[idx],
"type": "function",
"function": {
"name": self._tool_call_names[idx],
"arguments": "".join(self._tool_call_args.get(idx, [])),
},
}
)
delta["tool_calls"] = tool_calls_out
has_content = True
if has_content or finish_reason:
return {
"index": 0,
"delta": delta if has_content else {},
"finish_reason": finish_reason,
"logprobs": None,
}
return None
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Use SGLang for input and output processing
#
import asyncio
import logging
import os
import time
from collections.abc import AsyncGenerator
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait as _futures_wait
from dataclasses import dataclass
from typing import Any
from sglang.srt.utils.hf_transformers_utils import get_tokenizer
from dynamo._internal import ModelDeploymentCard
from dynamo.frontend.frontend_args import FrontendConfig
from dynamo.llm import (
KvRouter,
ModelCardInstanceId,
PythonAsyncEngine,
RouterConfig,
RouterMode,
fetch_model,
)
from dynamo.runtime import DistributedRuntime
from .sglang_prepost import (
SglangStreamingPostProcessor,
create_parsers,
preprocess_chat_request,
)
from .utils import PreprocessError, random_uuid, worker_warmup
logger = logging.getLogger(__name__)
def _unsupported_n_error(n: int) -> dict[str, Any]:
return {
"error": {
"message": (
f"Unsupported value: 'n={n}'. "
"This endpoint currently supports only n=1."
),
"type": "invalid_request_error",
"param": "n",
"code": "unsupported_value",
}
}
_FINISH_REASON_MAP: dict[str, str] = {
"eos": "stop",
"stop": "stop",
"length": "length",
"error": "error",
"abort": "stop",
"cancelled": "stop",
"content_filter": "stop",
}
def _map_finish_reason(raw: str | None) -> str | None:
"""Map Dynamo router finish reasons to OpenAI finish reasons.
Exact matches use the dict. Prefixed variants (``error:timeout``,
``abort:cancelled``) are handled by ``startswith`` fallbacks.
"""
if raw is None:
return None
mapped = _FINISH_REASON_MAP.get(raw)
if mapped is not None:
return mapped
if raw.startswith("error"):
return "error"
if raw.startswith("abort"):
return "stop"
return raw
# ---------------------------------------------------------------------------
# Worker process globals (initialized once per process by _init_worker)
# ---------------------------------------------------------------------------
_w_tokenizer: Any = None
_w_tool_call_parser_name: str | None = None
_w_reasoning_parser_name: str | None = None
@dataclass
class SglangPreprocessWorkerResult:
"""Picklable return value from the SGLang preprocess worker."""
prompt_token_ids: list[int]
dynamo_preproc: dict[str, Any]
request: dict[str, Any]
def _init_worker(
model_path: str,
tool_call_parser_name: str | None,
reasoning_parser_name: str | None,
) -> None:
"""Initialize a worker process with its own tokenizer."""
global _w_tokenizer, _w_tool_call_parser_name, _w_reasoning_parser_name
_w_tokenizer = get_tokenizer(model_path)
_w_tool_call_parser_name = tool_call_parser_name
_w_reasoning_parser_name = reasoning_parser_name
def _preprocess_worker(
request: dict[str, Any],
model_name: str,
eos_token_id: int | None,
) -> SglangPreprocessWorkerResult:
"""Preprocess a request in a worker process and return a picklable result."""
pre = preprocess_chat_request(
request,
tokenizer=_w_tokenizer,
tool_call_parser_name=_w_tool_call_parser_name,
reasoning_parser_name=_w_reasoning_parser_name,
)
n = request.get("n", 1)
if n != 1:
raise PreprocessError(_unsupported_n_error(n))
dynamo_preproc = _build_dynamo_preproc(
request, pre.prompt_token_ids, model_name, eos_token_id
)
return SglangPreprocessWorkerResult(
prompt_token_ids=pre.prompt_token_ids,
dynamo_preproc=dynamo_preproc,
request=request,
)
def _build_dynamo_preproc(
request: dict[str, Any],
prompt_token_ids: list[int],
model_name: str,
eos_token_id: int | None,
) -> dict[str, Any]:
"""Build the Dynamo preprocessed request dict from request fields."""
max_tokens = request.get("max_completion_tokens") or request.get("max_tokens")
stop = request.get("stop")
if isinstance(stop, str):
stop = [stop]
elif stop is None:
stop = []
stop_token_ids = request.get("stop_token_ids", [])
# Handle logprobs
logprobs_val = None
logprobs = request.get("logprobs")
top_logprobs = request.get("top_logprobs")
if logprobs is True:
logprobs_val = top_logprobs or 1
elif isinstance(logprobs, int) and not isinstance(logprobs, bool):
logprobs_val = logprobs
elif top_logprobs not in (None, 0):
logprobs_val = top_logprobs
return {
"model": model_name,
"token_ids": prompt_token_ids,
"stop_conditions": {
"max_tokens": max_tokens,
"stop": stop,
"stop_token_ids": stop_token_ids,
"min_tokens": request.get("min_tokens", 0),
"ignore_eos": request.get("ignore_eos", False),
},
"sampling_options": {
"n": request.get("n", 1),
"presence_penalty": request.get("presence_penalty", 0.0),
"frequency_penalty": request.get("frequency_penalty", 0.0),
"repetition_penalty": request.get("repetition_penalty", 1.0),
"temperature": request.get("temperature", 1.0),
"top_p": request.get("top_p", 1.0),
# SGLang uses -1 for "disabled", OpenAI/vLLM use 0
"top_k": request.get("top_k", 0) or -1,
"min_p": request.get("min_p", 0.0),
"seed": request.get("seed"),
},
"output_options": {
"logprobs": logprobs_val,
"prompt_logprobs": None,
"skip_special_tokens": True,
},
"eos_token_ids": [eos_token_id] if eos_token_id is not None else [],
"annotations": [],
}
class SglangProcessor:
def __init__(
self,
tokenizer,
router, # Client or KvRouter
tool_call_parser_name: str | None,
reasoning_parser_name: str | None,
eos_token_id: int | None,
debug_perf: bool = False,
preprocess_pool: ProcessPoolExecutor | None = None,
preprocess_workers: int = 0,
stream_interval: int = 1,
):
self.tokenizer = tokenizer
self.router = router
self.is_kv_router = isinstance(router, KvRouter)
self.tool_call_parser_name = tool_call_parser_name
self.reasoning_parser_name = reasoning_parser_name
self.eos_token_id = eos_token_id
self.debug_perf = debug_perf
self.stream_interval = stream_interval
self.preprocess_pool = preprocess_pool
if preprocess_pool is not None:
self._worker_semaphore: asyncio.Semaphore | None = asyncio.Semaphore(
preprocess_workers + 2
)
else:
self._worker_semaphore = None
async def generator(
self, request: dict[str, Any]
) -> AsyncGenerator[dict[str, Any], None]:
"""Main entry point: preprocess, route, post-process a chat request."""
if self.debug_perf:
from .perf_instrumentation import enter_generator, exit_generator
active = enter_generator()
t_start = time.monotonic()
logger.info("[perf] sglang generator enter: active_requests=%d", active)
try:
if self.preprocess_pool is None:
async for item in self._generator_inner(request):
yield item
else:
async for item in self._generator_inner_pool(request):
yield item
finally:
if self.debug_perf:
active = exit_generator()
elapsed_ms = (time.monotonic() - t_start) * 1000.0
logger.info(
"[perf] sglang generator exit: total=%.2fms active_requests=%d",
elapsed_ms,
active,
)
async def _generator_inner(
self, request: dict[str, Any]
) -> AsyncGenerator[dict[str, Any], None]:
"""Single-process path: preprocess, dispatch, stream post-process."""
request_id = random_uuid()
try:
if self.debug_perf:
t0 = time.monotonic()
pre = preprocess_chat_request(
request,
tokenizer=self.tokenizer,
tool_call_parser_name=self.tool_call_parser_name,
reasoning_parser_name=self.reasoning_parser_name,
)
if self.debug_perf:
t1 = time.monotonic()
logger.info(
"[perf] sglang preprocess: %.2fms (request=%s)",
(t1 - t0) * 1000.0,
request_id,
)
tokens = pre.prompt_token_ids
n = request.get("n", 1)
if n != 1:
logger.error("Unsupported n=%d, only n=1 is supported", n)
yield _unsupported_n_error(n)
return
dynamo_preproc = _build_dynamo_preproc(
request, tokens, request["model"], self.eos_token_id
)
except Exception as exc:
logger.exception("SGLang preprocessing failed for request %s", request_id)
yield {
"error": {
"message": f"Preprocessing error: {exc}",
"type": "internal_error",
}
}
return
post = SglangStreamingPostProcessor(
tokenizer=self.tokenizer,
tool_call_parser=pre.tool_call_parser,
reasoning_parser=pre.reasoning_parser,
)
async for item in self._generate_and_stream(
request_id, request, dynamo_preproc, tokens, post
):
yield item
async def _generator_inner_pool(
self, request: dict[str, Any]
) -> AsyncGenerator[dict[str, Any], None]:
"""Pool path: preprocess in worker, stream in main process."""
request_id = random_uuid()
# --- Phase 1: Preprocess (semaphore held) ---
try:
async with self._worker_semaphore:
future = self.preprocess_pool.submit(
_preprocess_worker,
request,
request["model"],
self.eos_token_id,
)
preproc_result: SglangPreprocessWorkerResult = (
await asyncio.wrap_future(future)
)
except PreprocessError as exc:
yield exc.error_dict
return
except Exception as exc:
logger.exception(
"SGLang worker preprocessing failed for request %s", request_id
)
yield {
"error": {
"message": f"Worker error: {exc}",
"type": "internal_error",
}
}
return
# --- Phase 2: Recreate parsers in main process (not picklable) ---
tool_call_parser, reasoning_parser = create_parsers(
request,
tool_call_parser_name=self.tool_call_parser_name,
reasoning_parser_name=self.reasoning_parser_name,
)
post = SglangStreamingPostProcessor(
tokenizer=self.tokenizer,
tool_call_parser=tool_call_parser,
reasoning_parser=reasoning_parser,
)
async for item in self._generate_and_stream(
request_id,
request,
preproc_result.dynamo_preproc,
preproc_result.prompt_token_ids,
post,
):
yield item
async def _generate_and_stream(
self,
request_id: str,
request: dict[str, Any],
dynamo_preproc: dict[str, Any],
tokens: list[int],
post: SglangStreamingPostProcessor,
) -> AsyncGenerator[dict[str, Any], None]:
"""Shared streaming logic for both single-process and pool paths."""
token_count = 0
post_proc_total_ms = 0.0
created_ts = int(time.time())
stream_interval = self.stream_interval
try:
if self.is_kv_router:
dynamo_stream = await self.router.generate(
token_ids=tokens,
model=dynamo_preproc["model"],
stop_conditions=dynamo_preproc["stop_conditions"],
sampling_options=dynamo_preproc["sampling_options"],
output_options=dynamo_preproc["output_options"],
)
else:
dynamo_stream = await self.router.generate(
dynamo_preproc, annotated=False
)
# Accumulate tokens for batched detokenization when
# stream_interval > 1. Flush every N tokens or on
# finish_reason. Use si=1 for the first chunk to minimize
# TTFT, then switch to the configured interval.
pending_token_ids: list[int] = []
pending_usage: dict[str, Any] | None = None
first_chunk = True
async for dynamo_response in dynamo_stream:
if self.is_kv_router:
engine_response = dynamo_response
elif hasattr(dynamo_response, "data"):
engine_response = dynamo_response.data()
else:
engine_response = dynamo_response
if engine_response is None or "token_ids" not in engine_response:
logger.error("No outputs from engine for request %s", request_id)
yield {
"error": {
"message": (
f"Invalid engine response for request {request_id}"
),
"type": "internal_error",
}
}
break
new_ids = engine_response["token_ids"]
raw_finish = engine_response.get("finish_reason")
finish_reason = _map_finish_reason(raw_finish)
if usage := engine_response.get("completion_usage"):
pending_usage = usage
pending_token_ids.extend(new_ids)
# Flush on finish or when we've accumulated enough tokens.
# First chunk flushes immediately (si=1) to minimize TTFT.
flush_threshold = 1 if first_chunk else stream_interval
if finish_reason or len(pending_token_ids) >= flush_threshold:
mapped_response = {
"token_ids": pending_token_ids,
"finish_reason": finish_reason,
}
if self.debug_perf:
t_pp0 = time.monotonic()
choice = post.process_output(mapped_response)
if self.debug_perf:
t_pp1 = time.monotonic()
post_proc_total_ms += (t_pp1 - t_pp0) * 1000.0
token_count += len(pending_token_ids)
if choice:
dynamo_out: dict[str, Any] = {
"id": request_id,
"choices": [choice],
"created": created_ts,
"model": request["model"],
"object": "chat.completion.chunk",
}
if pending_usage:
dynamo_out["usage"] = pending_usage
yield dynamo_out
pending_token_ids = []
pending_usage = None
first_chunk = False
finally:
if self.debug_perf and token_count > 0:
logger.info(
"[perf] sglang stream done: request=%s tokens=%d "
"post_processor_total=%.2fms (%.3fms/tok)",
request_id,
token_count,
post_proc_total_ms,
post_proc_total_ms / token_count,
)
class SglangEngineFactory:
def __init__(
self,
runtime: DistributedRuntime,
router_config: RouterConfig,
config: FrontendConfig,
debug_perf: bool = False,
tool_call_parser_name: str | None = None,
reasoning_parser_name: str | None = None,
):
self.runtime = runtime
self.router_config = router_config
self.config = config
self.debug_perf = debug_perf
self.tool_call_parser_name = tool_call_parser_name
self.reasoning_parser_name = reasoning_parser_name
self.stream_interval = 20
raw_stream_interval = os.getenv("DYN_SGLANG_STREAM_INTERVAL")
if raw_stream_interval:
try:
self.stream_interval = max(1, int(raw_stream_interval))
except ValueError:
logger.warning(
"Invalid DYN_SGLANG_STREAM_INTERVAL=%r, using default=%d",
raw_stream_interval,
self.stream_interval,
)
async def chat_engine_factory(
self,
instance_id: ModelCardInstanceId,
mdc: ModelDeploymentCard,
) -> PythonAsyncEngine:
"""Called by Rust when a model is discovered."""
model_type = mdc.model_type()
if not model_type.supports_chat():
raise RuntimeError(
f"model type {model_type} is not supported by this factory"
)
loop = asyncio.get_running_loop()
source_path = mdc.source_path()
if not os.path.exists(source_path):
await fetch_model(source_path, ignore_weights=True)
logger.info("Loading SGLang tokenizer from %s", source_path)
tokenizer = get_tokenizer(source_path)
eos_token_id = getattr(tokenizer, "eos_token_id", None)
tool_call_parser_name = self.tool_call_parser_name
reasoning_parser_name = self.reasoning_parser_name
if tool_call_parser_name:
logger.info("SGLang tool call parser: %s", tool_call_parser_name)
if reasoning_parser_name:
logger.info("SGLang reasoning parser: %s", reasoning_parser_name)
(namespace_name, component_name, endpoint_name) = instance_id.triple()
generate_endpoint = self.runtime.endpoint(
f"{namespace_name}.{component_name}.{endpoint_name}"
)
if self.router_config.router_mode == RouterMode.KV:
router = KvRouter(
endpoint=generate_endpoint,
block_size=self.config.kv_cache_block_size or 16,
kv_router_config=self.router_config.kv_router_config,
)
else:
router = await generate_endpoint.client(
router_mode=self.router_config.router_mode
)
preprocess_pool = None
preprocess_workers = self.config.preprocess_workers
if preprocess_workers > 0:
logger.info(
"Creating SGLang preprocess worker pool with %d workers for %s",
preprocess_workers,
source_path,
)
preprocess_pool = ProcessPoolExecutor(
max_workers=preprocess_workers,
initializer=_init_worker,
initargs=(
source_path,
tool_call_parser_name,
reasoning_parser_name,
),
)
futures = [
preprocess_pool.submit(worker_warmup) for _ in range(preprocess_workers)
]
done, not_done = _futures_wait(futures, timeout=120)
if not_done:
for f in not_done:
f.cancel()
preprocess_pool.shutdown(wait=False, cancel_futures=True)
raise RuntimeError(
"Timed out waiting for SGLang preprocess worker pool warmup"
)
try:
for f in done:
f.result()
except Exception:
preprocess_pool.shutdown(wait=False, cancel_futures=True)
raise
logger.info(
"SGLang preprocess worker pool ready (%d workers)", preprocess_workers
)
logger.info("SGLang processor stream_interval=%d", self.stream_interval)
gen = SglangProcessor(
tokenizer,
router,
tool_call_parser_name,
reasoning_parser_name,
eos_token_id,
debug_perf=self.debug_perf,
preprocess_pool=preprocess_pool,
preprocess_workers=preprocess_workers,
stream_interval=self.stream_interval,
)
return PythonAsyncEngine(gen.generator, loop)
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Conformance tests for the SGLang API surface used by the sglang processor.
These tests lock down the SGLang interfaces we depend on so that SGLang
upgrades that break our integration surface are caught immediately.
"""
import inspect
import pickle
# ---------------------------------------------------------------------------
# Import tests -- verify all required modules and symbols exist
# ---------------------------------------------------------------------------
def test_get_tokenizer_importable():
from sglang.srt.utils.hf_transformers_utils import get_tokenizer
assert callable(get_tokenizer)
def test_function_call_parser_importable():
from sglang.srt.function_call.function_call_parser import FunctionCallParser
assert callable(FunctionCallParser)
def test_tool_call_item_importable():
from sglang.srt.function_call.core_types import ToolCallItem
assert callable(ToolCallItem)
def test_reasoning_parser_importable():
from sglang.srt.parser.reasoning_parser import ReasoningParser
assert callable(ReasoningParser)
def test_sglang_tool_importable():
from sglang.srt.entrypoints.openai.protocol import Function, Tool
assert callable(Tool)
assert callable(Function)
# ---------------------------------------------------------------------------
# get_tokenizer signature
# ---------------------------------------------------------------------------
def test_get_tokenizer_accepts_tokenizer_mode():
from sglang.srt.utils.hf_transformers_utils import get_tokenizer
sig = inspect.signature(get_tokenizer)
params = sig.parameters
assert "tokenizer_name" in params or list(params.keys())[0] != ""
assert "tokenizer_mode" in params
# ---------------------------------------------------------------------------
# FunctionCallParser
# ---------------------------------------------------------------------------
def test_function_call_parser_init():
"""Verify FunctionCallParser constructor accepts tools and tool_call_parser."""
from sglang.srt.entrypoints.openai.protocol import Function, Tool
from sglang.srt.function_call.function_call_parser import FunctionCallParser
tools = [
Tool(
type="function",
function=Function(
name="get_weather",
description="Get weather for a city",
parameters={
"type": "object",
"properties": {"city": {"type": "string"}},
},
),
)
]
parser = FunctionCallParser(tools=tools, tool_call_parser="hermes")
assert parser is not None
def test_function_call_parser_enum_keys():
"""Verify commonly-used parser names are accepted."""
from sglang.srt.entrypoints.openai.protocol import Function, Tool
from sglang.srt.function_call.function_call_parser import FunctionCallParser
tools = [
Tool(
type="function",
function=Function(
name="f",
description="d",
parameters={"type": "object", "properties": {}},
),
)
]
# These parser names must remain available
for name in ("hermes", "llama3", "qwen25"):
parser = FunctionCallParser(tools=tools, tool_call_parser=name)
assert parser is not None
def test_parse_stream_chunk_signature():
"""Verify parse_stream_chunk returns (str, list[ToolCallItem])."""
from sglang.srt.entrypoints.openai.protocol import Function, Tool
from sglang.srt.function_call.function_call_parser import FunctionCallParser
tools = [
Tool(
type="function",
function=Function(
name="f",
description="d",
parameters={"type": "object", "properties": {}},
),
)
]
parser = FunctionCallParser(tools=tools, tool_call_parser="hermes")
result = parser.parse_stream_chunk("Hello world")
assert isinstance(result, tuple)
assert len(result) == 2
normal_text, calls = result
assert isinstance(normal_text, str)
assert isinstance(calls, list)
def test_tool_call_item_fields():
"""Verify ToolCallItem has expected fields."""
from sglang.srt.function_call.core_types import ToolCallItem
item = ToolCallItem(tool_index=0, name="test", parameters='{"x": 1}')
assert item.tool_index == 0
assert item.name == "test"
assert item.parameters == '{"x": 1}'
# ---------------------------------------------------------------------------
# ReasoningParser
# ---------------------------------------------------------------------------
def test_reasoning_parser_init():
"""Verify ReasoningParser constructor accepts model_type."""
from sglang.srt.parser.reasoning_parser import ReasoningParser
parser = ReasoningParser(model_type="deepseek-r1", stream_reasoning=True)
assert parser is not None
def test_reasoning_parser_detector_map():
"""Verify commonly-used detector names are accepted."""
from sglang.srt.parser.reasoning_parser import ReasoningParser
for name in ("deepseek-r1", "qwen3"):
parser = ReasoningParser(model_type=name, stream_reasoning=True)
assert parser is not None
def test_reasoning_parser_parse_stream_chunk():
"""Verify parse_stream_chunk returns (reasoning_text, normal_text)."""
from sglang.srt.parser.reasoning_parser import ReasoningParser
parser = ReasoningParser(model_type="deepseek-r1", stream_reasoning=True)
result = parser.parse_stream_chunk("Hello")
assert isinstance(result, tuple)
assert len(result) == 2
# ---------------------------------------------------------------------------
# StreamingParseResult (function call variant)
# ---------------------------------------------------------------------------
def test_streaming_parse_result_fields():
"""Verify function-call StreamingParseResult has expected fields."""
from sglang.srt.function_call.core_types import StreamingParseResult
r = StreamingParseResult(normal_text="hello", calls=[])
assert r.normal_text == "hello"
assert r.calls == []
# ---------------------------------------------------------------------------
# Tool / Function protocol models
# ---------------------------------------------------------------------------
def test_sglang_tool_model_dump():
"""Verify Tool.model_dump() produces a dict suitable for chat templates."""
from sglang.srt.entrypoints.openai.protocol import Function, Tool
tool = Tool(
type="function",
function=Function(
name="search",
description="Search the web",
parameters={"type": "object", "properties": {"q": {"type": "string"}}},
),
)
d = tool.model_dump()
assert d["type"] == "function"
assert d["function"]["name"] == "search"
assert "properties" in d["function"]["parameters"]
# ---------------------------------------------------------------------------
# Picklability (required for ProcessPoolExecutor worker results)
# ---------------------------------------------------------------------------
def test_preprocess_result_picklability():
"""Verify SglangPreprocessWorkerResult survives pickle round-trip."""
from dynamo.frontend.sglang_processor import SglangPreprocessWorkerResult
result = SglangPreprocessWorkerResult(
prompt_token_ids=[1, 2, 3],
dynamo_preproc={
"model": "test",
"token_ids": [1, 2, 3],
"stop_conditions": {},
"sampling_options": {},
"output_options": {},
"eos_token_ids": [],
"annotations": [],
},
request={"model": "test", "messages": []},
)
restored = pickle.loads(pickle.dumps(result))
assert restored.prompt_token_ids == result.prompt_token_ids
assert restored.dynamo_preproc == result.dynamo_preproc
assert restored.request == result.request
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Unit tests for SGLang processor components.
Tests for preprocessing, sampling parameter projection, finish reason mapping,
incremental detokenization, error handling, and deprecation warnings.
Parallels test_vllm_unit.py for the vLLM backend.
"""
import pytest
from sglang.srt.utils.hf_transformers_utils import get_tokenizer
from dynamo.frontend.sglang_prepost import (
SglangPreprocessResult,
SglangStreamingPostProcessor,
convert_tools,
create_parsers,
preprocess_chat_request,
)
from dynamo.frontend.sglang_processor import (
SglangPreprocessWorkerResult,
_build_dynamo_preproc,
_map_finish_reason,
)
from dynamo.frontend.utils import PreprocessError, random_call_id, random_uuid
MODEL = "Qwen/Qwen3-0.6B"
@pytest.fixture(scope="module")
def tokenizer():
return get_tokenizer(MODEL)
# ---------------------------------------------------------------------------
# _build_dynamo_preproc: sampling parameter projection
# ---------------------------------------------------------------------------
class TestBuildDynamoPreproc:
"""Test sampling parameter projection from request to Dynamo format."""
def test_defaults(self):
"""Default sampling options when request has minimal fields."""
result = _build_dynamo_preproc(
{"model": "test", "messages": []},
prompt_token_ids=[1, 2, 3],
model_name="test",
eos_token_id=2,
)
sampling = result["sampling_options"]
assert sampling["n"] == 1
assert sampling["temperature"] == 1.0
assert sampling["top_p"] == 1.0
assert sampling["top_k"] == -1 # 0 -> -1 for SGLang
assert sampling["min_p"] == 0.0
assert sampling["presence_penalty"] == 0.0
assert sampling["frequency_penalty"] == 0.0
assert sampling["repetition_penalty"] == 1.0
assert sampling["seed"] is None
def test_top_k_zero_maps_to_negative_one(self):
"""SGLang uses -1 for disabled top_k, OpenAI uses 0."""
result = _build_dynamo_preproc(
{"model": "test", "top_k": 0},
prompt_token_ids=[1],
model_name="test",
eos_token_id=None,
)
assert result["sampling_options"]["top_k"] == -1
def test_top_k_positive_preserved(self):
"""Positive top_k values pass through unchanged."""
result = _build_dynamo_preproc(
{"model": "test", "top_k": 50},
prompt_token_ids=[1],
model_name="test",
eos_token_id=None,
)
assert result["sampling_options"]["top_k"] == 50
def test_sampling_options_from_request(self):
"""All sampling fields are projected from request."""
request = {
"model": "test",
"temperature": 0.7,
"top_p": 0.9,
"top_k": 40,
"min_p": 0.05,
"presence_penalty": 0.1,
"frequency_penalty": 0.2,
"repetition_penalty": 1.1,
"seed": 42,
"n": 1,
}
result = _build_dynamo_preproc(request, [1], "test", None)
sampling = result["sampling_options"]
assert sampling["temperature"] == 0.7
assert sampling["top_p"] == 0.9
assert sampling["top_k"] == 40
assert sampling["min_p"] == 0.05
assert sampling["presence_penalty"] == 0.1
assert sampling["frequency_penalty"] == 0.2
assert sampling["repetition_penalty"] == 1.1
assert sampling["seed"] == 42
def test_stop_conditions_string(self):
"""Single stop string is wrapped in a list."""
result = _build_dynamo_preproc(
{"model": "test", "stop": "END"},
[1],
"test",
None,
)
assert result["stop_conditions"]["stop"] == ["END"]
def test_stop_conditions_list(self):
"""Stop list passes through."""
result = _build_dynamo_preproc(
{"model": "test", "stop": ["END", "STOP"]},
[1],
"test",
None,
)
assert result["stop_conditions"]["stop"] == ["END", "STOP"]
def test_stop_conditions_none(self):
"""None stop becomes empty list."""
result = _build_dynamo_preproc(
{"model": "test"},
[1],
"test",
None,
)
assert result["stop_conditions"]["stop"] == []
def test_max_tokens_from_max_completion_tokens(self):
"""max_completion_tokens takes precedence over max_tokens."""
result = _build_dynamo_preproc(
{"model": "test", "max_completion_tokens": 200, "max_tokens": 100},
[1],
"test",
None,
)
assert result["stop_conditions"]["max_tokens"] == 200
def test_max_tokens_fallback(self):
"""max_tokens used when max_completion_tokens not set."""
result = _build_dynamo_preproc(
{"model": "test", "max_tokens": 100},
[1],
"test",
None,
)
assert result["stop_conditions"]["max_tokens"] == 100
def test_eos_token_id_present(self):
"""eos_token_id is wrapped in a list."""
result = _build_dynamo_preproc({"model": "test"}, [1], "test", 151643)
assert result["eos_token_ids"] == [151643]
def test_eos_token_id_none(self):
"""None eos_token_id becomes empty list."""
result = _build_dynamo_preproc({"model": "test"}, [1], "test", None)
assert result["eos_token_ids"] == []
def test_logprobs_true_with_top_logprobs(self):
"""logprobs=True with top_logprobs=5 yields 5."""
result = _build_dynamo_preproc(
{"model": "test", "logprobs": True, "top_logprobs": 5},
[1],
"test",
None,
)
assert result["output_options"]["logprobs"] == 5
def test_logprobs_true_without_top_logprobs(self):
"""logprobs=True without top_logprobs yields 1."""
result = _build_dynamo_preproc(
{"model": "test", "logprobs": True},
[1],
"test",
None,
)
assert result["output_options"]["logprobs"] == 1
def test_logprobs_integer(self):
"""Integer logprobs pass through."""
result = _build_dynamo_preproc(
{"model": "test", "logprobs": 3},
[1],
"test",
None,
)
assert result["output_options"]["logprobs"] == 3
def test_logprobs_disabled(self):
"""No logprobs yields None."""
result = _build_dynamo_preproc(
{"model": "test"},
[1],
"test",
None,
)
assert result["output_options"]["logprobs"] is None
def test_model_name_and_token_ids(self):
"""Model name and token_ids are set correctly."""
result = _build_dynamo_preproc(
{"model": "test"},
[10, 20, 30],
"my-model",
None,
)
assert result["model"] == "my-model"
assert result["token_ids"] == [10, 20, 30]
# ---------------------------------------------------------------------------
# _map_finish_reason
# ---------------------------------------------------------------------------
class TestMapFinishReason:
"""Test Dynamo-to-OpenAI finish reason mapping."""
def test_none_passthrough(self):
assert _map_finish_reason(None) is None
def test_eos_to_stop(self):
assert _map_finish_reason("eos") == "stop"
def test_stop_to_stop(self):
assert _map_finish_reason("stop") == "stop"
def test_length(self):
assert _map_finish_reason("length") == "length"
def test_error(self):
assert _map_finish_reason("error") == "error"
def test_error_prefix(self):
"""error:* strings all map to 'error'."""
assert _map_finish_reason("error:timeout") == "error"
def test_abort_exact(self):
assert _map_finish_reason("abort") == "stop"
def test_abort_prefix(self):
"""abort:* strings all map to 'stop'."""
assert _map_finish_reason("abort:cancelled") == "stop"
def test_cancelled(self):
assert _map_finish_reason("cancelled") == "stop"
def test_content_filter(self):
assert _map_finish_reason("content_filter") == "stop"
def test_unknown_passthrough(self):
"""Unknown reasons pass through unchanged."""
assert _map_finish_reason("tool_calls") == "tool_calls"
# ---------------------------------------------------------------------------
# convert_tools
# ---------------------------------------------------------------------------
class TestConvertTools:
"""Test OpenAI tool dict to SGLang Tool conversion."""
def test_none_returns_none(self):
assert convert_tools(None) is None
def test_empty_list_returns_none(self):
assert convert_tools([]) is None
def test_single_tool(self):
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
},
},
}
]
result = convert_tools(tools)
assert len(result) == 1
assert result[0].function.name == "get_weather"
assert result[0].type == "function"
def test_multiple_tools(self):
tools = [
{
"type": "function",
"function": {"name": "f1", "description": "d1", "parameters": {}},
},
{
"type": "function",
"function": {"name": "f2", "description": "d2", "parameters": {}},
},
]
result = convert_tools(tools)
assert len(result) == 2
assert result[0].function.name == "f1"
assert result[1].function.name == "f2"
def test_model_dump_roundtrip(self):
"""Converted tools can be model_dump()'d for chat templates."""
tools = [
{
"type": "function",
"function": {
"name": "search",
"description": "Search",
"parameters": {
"type": "object",
"properties": {"q": {"type": "string"}},
},
},
}
]
result = convert_tools(tools)
dumped = result[0].model_dump()
assert dumped["function"]["name"] == "search"
assert "properties" in dumped["function"]["parameters"]
# ---------------------------------------------------------------------------
# create_parsers
# ---------------------------------------------------------------------------
class TestCreateParsers:
"""Test parser creation logic."""
def test_no_parsers(self):
tcp, rp = create_parsers(
{}, tool_call_parser_name=None, reasoning_parser_name=None
)
assert tcp is None
assert rp is None
def test_reasoning_only(self):
tcp, rp = create_parsers(
{}, tool_call_parser_name=None, reasoning_parser_name="qwen3"
)
assert tcp is None
assert rp is not None
def test_tool_parser_requires_tools(self):
"""Tool parser is not created if no tools in request."""
tcp, rp = create_parsers(
{}, tool_call_parser_name="hermes", reasoning_parser_name=None
)
assert tcp is None
def test_tool_parser_with_tools(self):
"""Tool parser is created when tools are present."""
request = {
"tools": [
{
"type": "function",
"function": {
"name": "f",
"description": "d",
"parameters": {},
},
}
]
}
tcp, rp = create_parsers(
request, tool_call_parser_name="hermes", reasoning_parser_name=None
)
assert tcp is not None
assert rp is None
def test_tool_choice_none_skips_parser(self):
"""tool_choice='none' should skip tool parser creation."""
request = {
"tools": [
{
"type": "function",
"function": {
"name": "f",
"description": "d",
"parameters": {},
},
}
],
"tool_choice": "none",
}
tcp, rp = create_parsers(
request, tool_call_parser_name="hermes", reasoning_parser_name=None
)
assert tcp is None
def test_both_parsers(self):
"""Both parsers created when tools and reasoning requested."""
request = {
"tools": [
{
"type": "function",
"function": {
"name": "f",
"description": "d",
"parameters": {},
},
}
]
}
tcp, rp = create_parsers(
request,
tool_call_parser_name="hermes",
reasoning_parser_name="qwen3",
)
assert tcp is not None
assert rp is not None
# ---------------------------------------------------------------------------
# preprocess_chat_request
# ---------------------------------------------------------------------------
class TestPreprocessChatRequest:
"""Test end-to-end preprocessing with a real tokenizer."""
def test_basic_chat(self, tokenizer):
"""Simple user message preprocesses to non-empty token IDs."""
request = {
"model": MODEL,
"messages": [{"role": "user", "content": "Hello"}],
}
result = preprocess_chat_request(
request,
tokenizer=tokenizer,
tool_call_parser_name=None,
reasoning_parser_name=None,
)
assert isinstance(result, SglangPreprocessResult)
assert len(result.prompt_token_ids) > 0
assert result.tool_call_parser is None
assert result.reasoning_parser is None
def test_multi_turn(self, tokenizer):
"""Multi-turn conversation produces more tokens than single turn."""
single = preprocess_chat_request(
{
"model": MODEL,
"messages": [{"role": "user", "content": "Hello"}],
},
tokenizer=tokenizer,
tool_call_parser_name=None,
reasoning_parser_name=None,
)
multi = preprocess_chat_request(
{
"model": MODEL,
"messages": [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
{"role": "user", "content": "How are you?"},
],
},
tokenizer=tokenizer,
tool_call_parser_name=None,
reasoning_parser_name=None,
)
assert len(multi.prompt_token_ids) > len(single.prompt_token_ids)
def test_with_tools(self, tokenizer):
"""Tools are passed through to chat template, producing more tokens."""
without_tools = preprocess_chat_request(
{
"model": MODEL,
"messages": [{"role": "user", "content": "Hello"}],
},
tokenizer=tokenizer,
tool_call_parser_name=None,
reasoning_parser_name=None,
)
with_tools = preprocess_chat_request(
{
"model": MODEL,
"messages": [{"role": "user", "content": "Hello"}],
"tools": [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather for a city",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
},
},
}
],
},
tokenizer=tokenizer,
tool_call_parser_name="hermes",
reasoning_parser_name=None,
)
assert len(with_tools.prompt_token_ids) > len(without_tools.prompt_token_ids)
assert with_tools.tool_call_parser is not None
def test_with_reasoning_parser(self, tokenizer):
"""Reasoning parser is attached to result."""
result = preprocess_chat_request(
{
"model": MODEL,
"messages": [{"role": "user", "content": "Hello"}],
},
tokenizer=tokenizer,
tool_call_parser_name=None,
reasoning_parser_name="qwen3",
)
assert result.reasoning_parser is not None
def test_system_message(self, tokenizer):
"""System message is included in tokenization."""
without_system = preprocess_chat_request(
{
"model": MODEL,
"messages": [{"role": "user", "content": "Hello"}],
},
tokenizer=tokenizer,
tool_call_parser_name=None,
reasoning_parser_name=None,
)
with_system = preprocess_chat_request(
{
"model": MODEL,
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello"},
],
},
tokenizer=tokenizer,
tool_call_parser_name=None,
reasoning_parser_name=None,
)
assert len(with_system.prompt_token_ids) > len(without_system.prompt_token_ids)
# ---------------------------------------------------------------------------
# SglangStreamingPostProcessor: incremental detokenization
# ---------------------------------------------------------------------------
class TestIncrementalDetokenization:
"""Test the sliding-window incremental detokenizer."""
def test_basic_decode(self, tokenizer):
"""Tokens decode to expected text."""
post = SglangStreamingPostProcessor(
tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=None
)
token_ids = tokenizer.encode("Hello world")
choice = post.process_output({"token_ids": token_ids, "finish_reason": "stop"})
assert choice is not None
assert "Hello world" in choice["delta"]["content"]
def test_incremental_batches(self, tokenizer):
"""Batched tokens produce the full text when concatenated."""
post = SglangStreamingPostProcessor(
tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=None
)
text = "The quick brown fox jumps over the lazy dog."
token_ids = tokenizer.encode(text)
content = ""
batch_size = 3
for i in range(0, len(token_ids), batch_size):
batch = token_ids[i : i + batch_size]
is_last = i + batch_size >= len(token_ids)
choice = post.process_output(
{"token_ids": batch, "finish_reason": "stop" if is_last else None}
)
if choice and "content" in choice.get("delta", {}):
content += choice["delta"]["content"]
assert text in content
def test_empty_token_ids(self, tokenizer):
"""Empty token_ids with no finish_reason returns None."""
post = SglangStreamingPostProcessor(
tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=None
)
result = post.process_output({"token_ids": [], "finish_reason": None})
assert result is None
def test_finish_reason_only(self, tokenizer):
"""finish_reason without new tokens emits a finish chunk."""
post = SglangStreamingPostProcessor(
tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=None
)
# First send some tokens
token_ids = tokenizer.encode("Hello")
post.process_output({"token_ids": token_ids, "finish_reason": None})
# Then send finish with no new tokens
choice = post.process_output({"token_ids": [], "finish_reason": "stop"})
assert choice is not None
assert choice["finish_reason"] == "stop"
def test_lookback_trimming(self, tokenizer):
"""Verify _all_token_ids doesn't grow unbounded."""
post = SglangStreamingPostProcessor(
tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=None
)
# Send enough tokens to trigger trimming (LOOKBACK * 16 = 96)
for _ in range(200):
post.process_output({"token_ids": [1], "finish_reason": None})
# Should be trimmed, not 200 tokens
assert len(post._all_token_ids) < 200
# ---------------------------------------------------------------------------
# SglangStreamingPostProcessor: fast plain text path
# ---------------------------------------------------------------------------
class TestFastPlainTextPath:
"""Test the fast path when no parsers are active."""
def test_fast_path_active(self, tokenizer):
"""No parsers -> fast plain text path."""
post = SglangStreamingPostProcessor(
tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=None
)
assert post._fast_plain_text is True
def test_fast_path_inactive_with_reasoning(self, tokenizer):
"""Reasoning parser disables fast path."""
from sglang.srt.parser.reasoning_parser import ReasoningParser
rp = ReasoningParser(model_type="qwen3", stream_reasoning=True)
post = SglangStreamingPostProcessor(
tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=rp
)
assert post._fast_plain_text is False
def test_fast_path_content_output(self, tokenizer):
"""Fast path produces role and content in delta."""
post = SglangStreamingPostProcessor(
tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=None
)
token_ids = tokenizer.encode("Hello")
choice = post.process_output({"token_ids": token_ids, "finish_reason": None})
assert choice is not None
assert choice["delta"]["role"] == "assistant"
assert "content" in choice["delta"]
assert choice["index"] == 0
assert choice["logprobs"] is None
# ---------------------------------------------------------------------------
# SglangStreamingPostProcessor: reasoning parsing
# ---------------------------------------------------------------------------
class TestReasoningParsing:
"""Test reasoning content extraction via post-processor."""
def test_reasoning_separated(self, tokenizer):
"""<think>...</think> content goes to reasoning_content field."""
from sglang.srt.parser.reasoning_parser import ReasoningParser
rp = ReasoningParser(model_type="qwen3", stream_reasoning=True)
post = SglangStreamingPostProcessor(
tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=rp
)
text = "<think>\nLet me think about this.\n</think>\n\nThe answer is 42."
token_ids = tokenizer.encode(text)
reasoning = ""
content = ""
for i in range(0, len(token_ids), 5):
batch = token_ids[i : i + 5]
is_last = i + 5 >= len(token_ids)
choice = post.process_output(
{"token_ids": batch, "finish_reason": "stop" if is_last else None}
)
if choice:
delta = choice.get("delta", {})
reasoning += delta.get("reasoning_content", "")
content += delta.get("content", "")
assert "think about this" in reasoning
assert "42" in content
# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------
class TestUtilities:
"""Test shared utility functions."""
def test_random_uuid_format(self):
"""random_uuid produces 16-char hex string."""
uid = random_uuid()
assert len(uid) == 16
int(uid, 16) # Should not raise
def test_random_uuid_unique(self):
"""Two calls produce different UUIDs."""
assert random_uuid() != random_uuid()
def test_random_call_id_format(self):
"""random_call_id produces call_<16hex> format."""
cid = random_call_id()
assert cid.startswith("call_")
assert len(cid) == 21 # "call_" + 16 hex chars
int(cid[5:], 16) # Should not raise
def test_preprocess_error(self):
"""PreprocessError stores error_dict and stringifies."""
err = PreprocessError({"error": {"message": "n=2 unsupported"}})
assert err.error_dict == {"error": {"message": "n=2 unsupported"}}
assert "n=2" in str(err)
# ---------------------------------------------------------------------------
# SglangPreprocessWorkerResult picklability
# ---------------------------------------------------------------------------
class TestWorkerResultPicklability:
"""Test that worker results survive ProcessPoolExecutor round-trip."""
def test_full_result(self):
"""Full SglangPreprocessWorkerResult survives pickle round-trip."""
import pickle
result = SglangPreprocessWorkerResult(
prompt_token_ids=[1, 2, 3],
dynamo_preproc={
"model": "test-model",
"token_ids": [1, 2, 3],
"stop_conditions": {
"max_tokens": 100,
"stop": [],
"stop_token_ids": [2],
"min_tokens": 0,
"ignore_eos": False,
},
"sampling_options": {
"n": 1,
"presence_penalty": 0.0,
"frequency_penalty": 0.0,
"repetition_penalty": 1.0,
"temperature": 1.0,
"top_p": 1.0,
"top_k": -1,
"min_p": 0.0,
"seed": None,
},
"output_options": {
"logprobs": None,
"prompt_logprobs": None,
"skip_special_tokens": True,
},
"eos_token_ids": [2],
"annotations": [],
},
request={"model": "test-model", "messages": [], "tools": None},
)
data = pickle.dumps(result)
restored = pickle.loads(data)
assert restored.prompt_token_ids == result.prompt_token_ids
assert restored.dynamo_preproc == result.dynamo_preproc
assert restored.request == result.request
# ---------------------------------------------------------------------------
# Deprecation warning for --use-sglang-tokenizer
# ---------------------------------------------------------------------------
class TestDeprecationWarning:
"""Test that --use-sglang-tokenizer deprecation warning is in place."""
def test_deprecation_warning_in_source(self):
"""Verify parse_args contains FutureWarning for use_sglang_tokenizer.
The warning is embedded in parse_args() which requires full ServerArgs
initialization -- too heavy for a unit test. Instead, verify the warning
text exists in the source code so it isn't accidentally removed.
"""
import inspect
from dynamo.sglang import args as sglang_args
source = inspect.getsource(sglang_args)
assert "use_sglang_tokenizer" in source
assert "FutureWarning" in source
assert "--dyn-chat-processor sglang" in source
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests for tool call parsing in SglangStreamingPostProcessor.
Covers the interaction between SGLang's FunctionCallParser, ReasoningParser,
and our post-processor's accumulate-and-emit-on-finish logic, including the
parse_non_stream fallback for the chunking-sensitivity issue in
BaseFormatDetector.parse_streaming_increment.
"""
import json
import pytest
from sglang.srt.entrypoints.openai.protocol import Function as SglangFunction
from sglang.srt.entrypoints.openai.protocol import Tool as SglangTool
from sglang.srt.function_call.function_call_parser import FunctionCallParser
from sglang.srt.parser.reasoning_parser import ReasoningParser
from sglang.srt.utils.hf_transformers_utils import get_tokenizer
from dynamo.frontend.sglang_prepost import SglangStreamingPostProcessor
MODEL = "Qwen/Qwen3-0.6B"
@pytest.fixture(scope="module")
def tokenizer():
return get_tokenizer(MODEL)
TOOLS = [
SglangTool(
type="function",
function=SglangFunction(
name="search_gutenberg_books",
description="Search for books in the Project Gutenberg library",
parameters={
"type": "object",
"properties": {
"search_terms": {
"type": "array",
"items": {"type": "string"},
"description": "List of search terms to find books",
}
},
"required": ["search_terms"],
},
),
),
SglangTool(
type="function",
function=SglangFunction(
name="get_weather",
description="Get weather for a city",
parameters={
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
},
),
),
]
def _run_postprocessor(tokenizer, full_text, batch_size, *, use_reasoning=True):
"""Tokenize text, feed through post-processor in batches, return all choices."""
tcp = FunctionCallParser(tools=TOOLS, tool_call_parser="hermes")
rp = (
ReasoningParser(model_type="qwen3", stream_reasoning=True)
if use_reasoning
else None
)
post = SglangStreamingPostProcessor(
tokenizer=tokenizer,
tool_call_parser=tcp,
reasoning_parser=rp,
)
token_ids = tokenizer.encode(full_text)
results = []
for i in range(0, len(token_ids), batch_size):
batch = token_ids[i : i + batch_size]
is_last = i + batch_size >= len(token_ids)
choice = post.process_output(
{"token_ids": batch, "finish_reason": "stop" if is_last else None}
)
if choice:
results.append(choice)
return results
def _extract_tool_calls(results):
"""Extract tool_calls from the list of choices."""
for r in results:
tc = r.get("delta", {}).get("tool_calls")
if tc:
return tc
return []
# ---------------------------------------------------------------------------
# Single tool call
# ---------------------------------------------------------------------------
class TestSingleToolCall:
"""Single tool call with reasoning, various batch sizes."""
TEXT = (
"<think>\nLet me search for books.\n</think>\n\n"
'<tool_call>\n{"name": "search_gutenberg_books", '
'"arguments": {"search_terms": ["James Joyce"]}}\n</tool_call>'
)
def test_large_batches(self, tokenizer):
"""stream_interval=20 scenario -- complete JSON in one chunk."""
tc = _extract_tool_calls(_run_postprocessor(tokenizer, self.TEXT, 20))
assert len(tc) == 1
assert tc[0]["function"]["name"] == "search_gutenberg_books"
args = json.loads(tc[0]["function"]["arguments"])
assert args == {"search_terms": ["James Joyce"]}
def test_small_batches(self, tokenizer):
"""Token-by-token-ish scenario -- streaming deltas work directly."""
tc = _extract_tool_calls(_run_postprocessor(tokenizer, self.TEXT, 3))
assert len(tc) == 1
assert tc[0]["function"]["name"] == "search_gutenberg_books"
args = json.loads(tc[0]["function"]["arguments"])
assert args == {"search_terms": ["James Joyce"]}
def test_medium_batches(self, tokenizer):
"""Intermediate batch size."""
tc = _extract_tool_calls(_run_postprocessor(tokenizer, self.TEXT, 10))
assert len(tc) == 1
assert tc[0]["function"]["name"] == "search_gutenberg_books"
args = json.loads(tc[0]["function"]["arguments"])
assert args == {"search_terms": ["James Joyce"]}
def test_tool_call_has_id_and_type(self, tokenizer):
"""Each tool call must have id and type fields."""
tc = _extract_tool_calls(_run_postprocessor(tokenizer, self.TEXT, 20))
assert tc[0]["id"].startswith("call_")
assert tc[0]["type"] == "function"
assert tc[0]["index"] == 0
# ---------------------------------------------------------------------------
# No reasoning parser
# ---------------------------------------------------------------------------
class TestNoReasoningParser:
"""Tool calls without reasoning parser active."""
TEXT = (
'<tool_call>\n{"name": "get_weather", '
'"arguments": {"city": "Paris"}}\n</tool_call>'
)
def test_large_batches(self, tokenizer):
tc = _extract_tool_calls(
_run_postprocessor(tokenizer, self.TEXT, 15, use_reasoning=False)
)
assert len(tc) == 1
assert tc[0]["function"]["name"] == "get_weather"
args = json.loads(tc[0]["function"]["arguments"])
assert args == {"city": "Paris"}
def test_small_batches(self, tokenizer):
tc = _extract_tool_calls(
_run_postprocessor(tokenizer, self.TEXT, 3, use_reasoning=False)
)
assert len(tc) == 1
assert tc[0]["function"]["name"] == "get_weather"
args = json.loads(tc[0]["function"]["arguments"])
assert args == {"city": "Paris"}
# ---------------------------------------------------------------------------
# Multiple tool calls
# ---------------------------------------------------------------------------
class TestMultipleToolCalls:
"""Two tool calls in a single response."""
TEXT = (
"<think>\nI'll search and check weather.\n</think>\n\n"
'<tool_call>\n{"name": "search_gutenberg_books", '
'"arguments": {"search_terms": ["Joyce"]}}\n</tool_call>\n'
'<tool_call>\n{"name": "get_weather", '
'"arguments": {"city": "London"}}\n</tool_call>'
)
def test_both_tools_present(self, tokenizer):
tc = _extract_tool_calls(_run_postprocessor(tokenizer, self.TEXT, 10))
assert len(tc) == 2
names = {t["function"]["name"] for t in tc}
assert names == {"search_gutenberg_books", "get_weather"}
def test_arguments_correct(self, tokenizer):
tc = _extract_tool_calls(_run_postprocessor(tokenizer, self.TEXT, 10))
by_name = {t["function"]["name"]: t for t in tc}
assert json.loads(
by_name["search_gutenberg_books"]["function"]["arguments"]
) == {"search_terms": ["Joyce"]}
assert json.loads(by_name["get_weather"]["function"]["arguments"]) == {
"city": "London"
}
def test_distinct_ids(self, tokenizer):
tc = _extract_tool_calls(_run_postprocessor(tokenizer, self.TEXT, 10))
ids = [t["id"] for t in tc]
assert len(set(ids)) == len(ids), "Tool call IDs must be unique"
# ---------------------------------------------------------------------------
# Content alongside tool calls
# ---------------------------------------------------------------------------
class TestContentWithToolCalls:
"""Reasoning content and regular content are preserved alongside tool calls."""
TEXT = (
"<think>\nThinking about it.\n</think>\n\n"
'<tool_call>\n{"name": "get_weather", '
'"arguments": {"city": "NYC"}}\n</tool_call>'
)
def test_reasoning_content_present(self, tokenizer):
results = _run_postprocessor(tokenizer, self.TEXT, 20)
reasoning = ""
for r in results:
rc = r.get("delta", {}).get("reasoning_content", "")
reasoning += rc
assert "Thinking about it" in reasoning
def test_content_is_whitespace_only(self, tokenizer):
"""Content between </think> and <tool_call> should be whitespace only."""
results = _run_postprocessor(tokenizer, self.TEXT, 20)
content = ""
for r in results:
c = r.get("delta", {}).get("content", "")
content += c
assert content.strip() == ""
# ---------------------------------------------------------------------------
# No tool calls (plain text)
# ---------------------------------------------------------------------------
class TestNoToolCalls:
"""When no tool call markup is present, no tool_calls should appear."""
TEXT = "<think>\nJust thinking.\n</think>\n\nHello, world!"
def test_no_tool_calls_emitted(self, tokenizer):
tc = _extract_tool_calls(_run_postprocessor(tokenizer, self.TEXT, 10))
assert tc == []
def test_content_preserved(self, tokenizer):
results = _run_postprocessor(tokenizer, self.TEXT, 10)
content = ""
for r in results:
c = r.get("delta", {}).get("content", "")
content += c
assert "Hello, world!" in content
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Shared utilities for frontend chat processors (vLLM, SGLang)."""
import uuid
from typing import Any
_MASK_64_BITS = (1 << 64) - 1
def random_uuid() -> str:
"""Generate a random 16-character hex UUID."""
return f"{uuid.uuid4().int & _MASK_64_BITS:016x}"
def random_call_id() -> str:
"""Generate a random tool call ID in OpenAI format."""
return f"call_{uuid.uuid4().int & _MASK_64_BITS:016x}"
def worker_warmup() -> bool:
"""Dummy task to ensure a ProcessPoolExecutor worker is fully initialized."""
return True
class PreprocessError(Exception):
"""Raised by preprocess workers for user-facing errors (e.g., n!=1)."""
def __init__(self, error_dict: dict[str, Any]):
self.error_dict = error_dict
super().__init__(str(error_dict))
...@@ -9,9 +9,11 @@ import asyncio ...@@ -9,9 +9,11 @@ import asyncio
import logging import logging
import os import os
import time import time
import uuid
from argparse import Namespace from argparse import Namespace
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait as _futures_wait
from dataclasses import dataclass
from typing import Any from typing import Any
from vllm.config import CacheConfig, LoadConfig, ModelConfig, VllmConfig from vllm.config import CacheConfig, LoadConfig, ModelConfig, VllmConfig
...@@ -36,12 +38,16 @@ from dynamo.llm import ( ...@@ -36,12 +38,16 @@ from dynamo.llm import (
) )
from dynamo.runtime import Client, DistributedRuntime from dynamo.runtime import Client, DistributedRuntime
from .prepost import StreamingPostProcessor, preprocess_chat_request from .prepost import (
StreamingPostProcessor,
preprocess_chat_request,
preprocess_chat_request_sync,
)
from .utils import PreprocessError, random_uuid, worker_warmup
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_MASK_64_BITS = (1 << 64) - 1
_FINISH_REASON_MAP: dict[str, FinishReason] = { _FINISH_REASON_MAP: dict[str, FinishReason] = {
"eos": FinishReason.STOP, "eos": FinishReason.STOP,
"stop": FinishReason.STOP, "stop": FinishReason.STOP,
...@@ -52,10 +58,6 @@ _FINISH_REASON_MAP: dict[str, FinishReason] = { ...@@ -52,10 +58,6 @@ _FINISH_REASON_MAP: dict[str, FinishReason] = {
} }
def random_uuid() -> str:
return f"{uuid.uuid4().int & _MASK_64_BITS:016x}" # 16 hex chars
def map_finish_reason(raw_reason: str | None) -> FinishReason | None: def map_finish_reason(raw_reason: str | None) -> FinishReason | None:
if raw_reason is None: if raw_reason is None:
return None return None
...@@ -72,6 +74,181 @@ def map_finish_reason(raw_reason: str | None) -> FinishReason | None: ...@@ -72,6 +74,181 @@ def map_finish_reason(raw_reason: str | None) -> FinishReason | None:
return mapped return mapped
# --- Worker process globals (initialized once per process by _init_worker) ---
_w_input_processor: InputProcessor | None = None
_w_tokenizer: Any = None
_w_tool_parser_class: type[ToolParser] | None = None
@dataclass
class PreprocessWorkerResult:
"""Picklable return value from the preprocess worker."""
dynamo_preproc: dict[str, Any]
tokens: list[int]
vllm_preproc: EngineCoreRequest
sampling_params: SamplingParams
request_for_sampling: Any # ChatCompletionRequest (Pydantic model, picklable)
chat_template_kwargs: dict[str, Any]
def _init_worker(
model_path: str,
tokenizer_mode: str,
config_format: str,
load_format: str,
tool_parser_name: str | None,
) -> None:
"""Initialize a worker process with its own VllmConfig and InputProcessor."""
global _w_input_processor, _w_tokenizer, _w_tool_parser_class
global _w_reasoning_parser_class
model_config = ModelConfig(
model=model_path,
tokenizer_mode=tokenizer_mode,
config_format=config_format,
)
vllm_config = VllmConfig(
model_config=model_config,
load_config=LoadConfig(load_format=load_format),
cache_config=CacheConfig(),
)
_w_input_processor = InputProcessor(vllm_config)
_w_tokenizer = _w_input_processor.get_tokenizer()
if tool_parser_name:
_w_tool_parser_class = ToolParserManager.get_tool_parser(tool_parser_name)
else:
_w_tool_parser_class = None
def _preprocess_worker(
request: dict[str, Any],
request_id: str,
model_name: str,
) -> PreprocessWorkerResult:
"""Preprocess a request in a worker process and return a picklable result."""
assert _w_input_processor is not None
pre = preprocess_chat_request_sync(
request,
tokenizer=_w_tokenizer,
renderer=_w_input_processor.renderer,
tool_parser_class=_w_tool_parser_class,
)
request_for_sampling = pre.request_for_sampling
engine_prompt = pre.engine_prompt
tokens = pre.prompt_token_ids
if request_for_sampling.max_completion_tokens is not None:
max_tokens = request_for_sampling.max_completion_tokens
elif request_for_sampling.max_tokens is not None:
max_tokens = request_for_sampling.max_tokens
else:
max_tokens = None
sampling_params = SamplingParams(
output_kind=RequestOutputKind.DELTA,
max_tokens=max_tokens,
)
for k, v in _w_input_processor.generation_config_fields.items():
if hasattr(sampling_params, k):
setattr(sampling_params, k, v)
sampling_fields = (
set(getattr(SamplingParams, "__annotations__", ()))
& set(type(request_for_sampling).model_fields)
) - {"max_tokens", "logprobs", "output_kind"}
for k in sorted(sampling_fields):
v = getattr(request_for_sampling, k, None)
if v is not None:
setattr(sampling_params, k, v)
logprobs = request_for_sampling.logprobs
top_logprobs = request_for_sampling.top_logprobs
if logprobs is True:
sampling_params.logprobs = top_logprobs or 1
elif isinstance(logprobs, int) and not isinstance(logprobs, bool):
sampling_params.logprobs = logprobs
elif top_logprobs not in (None, 0):
sampling_params.logprobs = top_logprobs
prompt_inputs = TokensPrompt(prompt_token_ids=tokens)
if "multi_modal_data" in engine_prompt:
prompt_inputs["multi_modal_data"] = engine_prompt["multi_modal_data"]
if "multi_modal_uuids" in engine_prompt:
prompt_inputs["multi_modal_uuids"] = engine_prompt["multi_modal_uuids"]
if request_for_sampling.cache_salt is not None:
prompt_inputs["cache_salt"] = request_for_sampling.cache_salt
if request_for_sampling.mm_processor_kwargs is not None:
prompt_inputs["mm_processor_kwargs"] = request_for_sampling.mm_processor_kwargs
vllm_preproc: EngineCoreRequest = _w_input_processor.process_inputs(
request_id,
prompt_inputs,
sampling_params,
)
InputProcessor.assign_request_id(vllm_preproc)
sp = vllm_preproc.sampling_params
if sp.n != 1:
raise PreprocessError(
{
"error": {
"message": (
f"Unsupported value: 'n={sp.n}'. "
"This endpoint currently supports only n=1."
),
"type": "invalid_request_error",
"param": "n",
"code": "unsupported_value",
}
}
)
dynamo_preproc = {
"model": model_name,
"token_ids": tokens,
"stop_conditions": {
"max_tokens": sp.max_tokens,
"stop": sp.stop,
"stop_token_ids": sp.stop_token_ids,
"min_tokens": sp.min_tokens,
"ignore_eos": sp.ignore_eos,
},
"sampling_options": {
"n": sp.n,
"presence_penalty": sp.presence_penalty,
"frequency_penalty": sp.frequency_penalty,
"repetition_penalty": sp.repetition_penalty,
"temperature": sp.temperature,
"top_p": sp.top_p,
"top_k": sp.top_k,
"min_p": sp.min_p,
"seed": sp.seed,
},
"output_options": {
"logprobs": sp.logprobs,
"prompt_logprobs": sp.prompt_logprobs,
"skip_special_tokens": sp.skip_special_tokens,
},
"eos_token_ids": (
[vllm_preproc.eos_token_id] if vllm_preproc.eos_token_id is not None else []
),
"annotations": [],
}
return PreprocessWorkerResult(
dynamo_preproc=dynamo_preproc,
tokens=tokens,
vllm_preproc=vllm_preproc,
sampling_params=sampling_params,
request_for_sampling=request_for_sampling,
chat_template_kwargs=pre.chat_template_kwargs,
)
class VllmProcessor: class VllmProcessor:
def __init__( def __init__(
self, self,
...@@ -234,9 +411,11 @@ class VllmProcessor: ...@@ -234,9 +411,11 @@ class VllmProcessor:
"prompt_logprobs": sp.prompt_logprobs, "prompt_logprobs": sp.prompt_logprobs,
"skip_special_tokens": sp.skip_special_tokens, "skip_special_tokens": sp.skip_special_tokens,
}, },
"eos_token_ids": [vllm_preproc.eos_token_id] "eos_token_ids": (
[vllm_preproc.eos_token_id]
if vllm_preproc.eos_token_id is not None if vllm_preproc.eos_token_id is not None
else [], else []
),
"annotations": [], "annotations": [],
} }
...@@ -347,6 +526,77 @@ class VllmProcessor: ...@@ -347,6 +526,77 @@ class VllmProcessor:
[vllm_preproc.request_id], internal=True [vllm_preproc.request_id], internal=True
) )
async def _generator_inner_pool(
self, request: dict[str, Any]
) -> AsyncGenerator[dict[str, Any], None]:
"""Process a request using the worker pool.
Phase 1: Preprocess in a worker process (semaphore held).
Phase 2: Remote inference via router (no worker held).
Phase 3: Post-process tokens in the main process.
"""
request_id = random_uuid()
# --- Phase 1: Preprocess (semaphore held) ---
try:
assert self._worker_semaphore is not None
async with self._worker_semaphore:
assert self.preprocess_pool is not None
future = self.preprocess_pool.submit(
_preprocess_worker, request, request_id, request["model"]
)
preproc_result: PreprocessWorkerResult = await asyncio.wrap_future(
future
)
# Semaphore + worker released here
except PreprocessError as exc:
yield exc.error_dict
return
except Exception as exc:
logger.exception("Worker preprocessing failed for request %s", request_id)
yield {
"error": {
"message": f"Worker error: {exc}",
"type": "internal_error",
}
}
return
# --- Between phases: reconstruct main-process objects ---
dynamo_preproc = preproc_result.dynamo_preproc
tokens = preproc_result.tokens
vllm_preproc = preproc_result.vllm_preproc
sampling_params = preproc_result.sampling_params
request_for_sampling = preproc_result.request_for_sampling
tool_parser = None
if (
self.tool_parser_class
and request_for_sampling.tools
and request_for_sampling.tool_choice != "none"
):
tool_parser = self.tool_parser_class(self.tokenizer)
post = StreamingPostProcessor(
tokenizer=self.tokenizer,
request_for_sampling=request_for_sampling,
sampling_params=sampling_params,
prompt_token_ids=tokens,
tool_parser=tool_parser,
reasoning_parser_class=self.reasoning_parser_class,
chat_template_kwargs=preproc_result.chat_template_kwargs,
)
async for item in self._generate_and_stream(
request_id,
request,
dynamo_preproc,
tokens,
vllm_preproc,
post,
):
yield item
class EngineFactory: class EngineFactory:
def __init__( def __init__(
...@@ -439,7 +689,7 @@ class EngineFactory: ...@@ -439,7 +689,7 @@ class EngineFactory:
else: else:
reasoning_parser_class = None reasoning_parser_class = None
(namespace_name, component_name, endpoint_name) = instance_id.triple() namespace_name, component_name, endpoint_name = instance_id.triple()
generate_endpoint = self.runtime.endpoint( generate_endpoint = self.runtime.endpoint(
f"{namespace_name}.{component_name}.{endpoint_name}" f"{namespace_name}.{component_name}.{endpoint_name}"
) )
...@@ -455,6 +705,45 @@ class EngineFactory: ...@@ -455,6 +705,45 @@ class EngineFactory:
router_mode=self.router_config.router_mode router_mode=self.router_config.router_mode
) )
preprocess_pool = None
preprocess_workers = self.config.preprocess_workers
if preprocess_workers > 0:
logger.info(
"Creating preprocess worker pool with %d workers for model %s",
preprocess_workers,
source_path,
)
preprocess_pool = ProcessPoolExecutor(
max_workers=preprocess_workers,
initializer=_init_worker,
initargs=(
source_path,
tokenizer_mode,
config_format,
load_format,
tool_parser_name,
),
)
# Warm up all workers to ensure initialization completes
futures = [
preprocess_pool.submit(worker_warmup) for _ in range(preprocess_workers)
]
done, not_done = _futures_wait(futures, timeout=120)
if not_done:
for f in not_done:
f.cancel()
preprocess_pool.shutdown(wait=False, cancel_futures=True)
raise RuntimeError(
"Timed out waiting for preprocess worker pool warmup"
)
try:
for f in done:
f.result() # Raises if initializer failed
except Exception:
preprocess_pool.shutdown(wait=False, cancel_futures=True)
raise
logger.info("Preprocess worker pool ready (%d workers)", preprocess_workers)
gen = VllmProcessor( gen = VllmProcessor(
tokenizer, tokenizer,
input_processor, input_processor,
......
...@@ -8,6 +8,7 @@ import os ...@@ -8,6 +8,7 @@ import os
import socket import socket
import sys import sys
import tempfile import tempfile
import warnings
from argparse import Namespace from argparse import Namespace
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Generator, Optional from typing import Any, Dict, Generator, Optional
...@@ -374,6 +375,14 @@ async def parse_args(args: list[str]) -> Config: ...@@ -374,6 +375,14 @@ async def parse_args(args: list[str]) -> Config:
server_args.stream_output = True server_args.stream_output = True
if dynamo_config.use_sglang_tokenizer: if dynamo_config.use_sglang_tokenizer:
warnings.warn(
"--use-sglang-tokenizer is deprecated and will be removed in a future "
"release. Use '--dyn-chat-processor sglang' on the frontend instead, "
"which provides the same SGLang-native pre/post processing with KV "
"router support.",
FutureWarning,
stacklevel=2,
)
logging.info( logging.info(
"Using SGLang's built in tokenizer. Setting skip_tokenizer_init to False" "Using SGLang's built in tokenizer. Setting skip_tokenizer_init to False"
) )
......
...@@ -34,7 +34,10 @@ class DynamoSGLangArgGroup(ArgGroup): ...@@ -34,7 +34,10 @@ class DynamoSGLangArgGroup(ArgGroup):
flag_name="--use-sglang-tokenizer", flag_name="--use-sglang-tokenizer",
env_var="DYN_SGL_USE_TOKENIZER", env_var="DYN_SGL_USE_TOKENIZER",
default=False, default=False,
help="Use SGLang's tokenizer for pre and post processing. This bypasses Dynamo's preprocessor and only v1/chat/completions will be available through the Dynamo frontend. Cannot be used with --custom-jinja-template.", help="[Deprecated] Use SGLang's tokenizer for pre and post processing. "
"This option will be removed in a future release. Use "
"'--dyn-chat-processor sglang' on the frontend instead, which provides "
"the same SGLang-native pre/post processing with KV router support.",
) )
add_negatable_bool_argument( add_negatable_bool_argument(
......
---
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
title: SGLang Chat Processor
subtitle: SGLang-native preprocessing and postprocessing for chat completions
---
The SGLang chat processor enables SGLang-native preprocessing and postprocessing in the Dynamo frontend. It uses SGLang's tokenizer, chat templates, tool call parser, and reasoning parser directly -- bypassing the default Rust preprocessor for `v1/chat/completions` requests.
## When to Use
Use `--dyn-chat-processor sglang` when Dynamo's built-in Rust preprocessor does not yet support a tool call parser or reasoning parser you need. The SGLang processor delegates to SGLang's Python implementations, so any parser SGLang supports works immediately.
Common cases:
- A **tool call format** not yet in the Rust `tool_calling` library
- A **reasoning parser** not yet supported natively
- A **chat template** that the Rust preprocessor doesn't handle correctly
If the parser you need is missing from the Rust preprocessor, consider [opening an issue or PR](https://github.com/ai-dynamo/dynamo/issues) to add native support -- native parsers avoid the Python GIL overhead entirely.
## Quick Start
```bash
# Frontend with SGLang processor, tool calling, and reasoning
python -m dynamo.frontend \
--router-mode kv \
--dyn-chat-processor sglang \
--tool-call-parser hermes \
--reasoning-parser qwen3
# Workers (unchanged)
CUDA_VISIBLE_DEVICES=0 python -m dynamo.sglang \
--model-path Qwen/Qwen3-14B-FP8 \
--served-model-name Qwen/Qwen3-14B-FP8 \
--tp 1 --trust-remote-code \
--kv-events-config '{"publisher":"zmq","topic":"kv-events","endpoint":"tcp://*:5557"}'
```
## Frontend Arguments
These arguments are passed to the **frontend** (not the worker) when using `--dyn-chat-processor sglang`:
| Argument | Default | Description |
|----------|---------|-------------|
| `--dyn-chat-processor sglang` | (none) | Enable the SGLang chat processor |
| `--tool-call-parser` | `None` | Tool call parser name (any SGLang-supported parser) |
| `--reasoning-parser` | `None` | Reasoning parser name (any SGLang-supported parser) |
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `DYN_SGLANG_STREAM_INTERVAL` | `20` | Number of tokens to accumulate before detokenizing. Higher values improve throughput. The first chunk always emits immediately (interval=1) to minimize time-to-first-token. |
## Tool Calling
The processor supports all SGLang tool call formats. Pass `--tool-call-parser` on the frontend:
```bash
python -m dynamo.frontend \
--dyn-chat-processor sglang \
--tool-call-parser hermes
```
Any parser supported by SGLang can be used. See the [SGLang documentation](https://docs.sglang.ai/) for the full list of available tool call parsers.
### Example: Tool Call Request
```bash
curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "Qwen/Qwen3-14B-FP8",
"messages": [{"role": "user", "content": "What is the weather in Paris?"}],
"tools": [{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather for a city",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"]
}
}
}],
"tool_choice": "auto"
}'
```
Response:
```json
{
"choices": [{
"message": {
"role": "assistant",
"tool_calls": [{
"id": "call_8cd24396f3671048",
"type": "function",
"function": {
"name": "get_weather",
"arguments": "{\"city\": \"Paris\"}"
}
}],
"reasoning_content": "The user wants weather info for Paris..."
},
"finish_reason": "tool_calls"
}]
}
```
## Reasoning Parsing
For models that produce chain-of-thought reasoning (e.g., Qwen3, DeepSeek-R1), pass `--reasoning-parser`:
```bash
python -m dynamo.frontend \
--dyn-chat-processor sglang \
--reasoning-parser qwen3
```
The parser separates think tag content into the `reasoning_content` field and regular content into the `content` field.
## Migration from `--use-sglang-tokenizer`
`--use-sglang-tokenizer` on the **worker** is deprecated. Replace with `--dyn-chat-processor sglang` on the **frontend**:
```diff
# Before (deprecated)
- python -m dynamo.sglang --model-path <model> --use-sglang-tokenizer
- python -m dynamo.frontend
# After
python -m dynamo.sglang --model-path <model>
+ python -m dynamo.frontend --dyn-chat-processor sglang
```
Key differences:
| | `--use-sglang-tokenizer` | `--dyn-chat-processor sglang` |
|---|---|---|
| Location | Worker flag | Frontend flag |
| KV router | Not supported | Supported |
| Tool calling | Not supported | Supported |
| Reasoning | Not supported | Supported |
| Endpoints | `v1/chat/completions` only | `v1/chat/completions` only |
## See Also
- **[Tool Calling](../../agents/tool-calling.md)**: General tool calling guide
- **[Reference Guide](sglang-reference-guide.md)**: Full SGLang backend reference
- **[Agentic Workloads](agents.md)**: Priority scheduling and cache pinning for agents
...@@ -35,7 +35,7 @@ These arguments are added by Dynamo on top of SGLang's native arguments. ...@@ -35,7 +35,7 @@ These arguments are added by Dynamo on top of SGLang's native arguments.
| Argument | Env Var | Default | Description | | Argument | Env Var | Default | Description |
|----------|---------|---------|-------------| |----------|---------|---------|-------------|
| `--endpoint` | `DYN_ENDPOINT` | Auto-generated | Dynamo endpoint in `dyn://namespace.component.endpoint` format | | `--endpoint` | `DYN_ENDPOINT` | Auto-generated | Dynamo endpoint in `dyn://namespace.component.endpoint` format |
| `--use-sglang-tokenizer` | `DYN_SGL_USE_TOKENIZER` | `false` | Use SGLang's tokenizer instead of Dynamo's | | `--use-sglang-tokenizer` | `DYN_SGL_USE_TOKENIZER` | `false` | **[Deprecated]** Use `--dyn-chat-processor sglang` on the frontend instead. See [SGLang Chat Processor](sglang-chat-processor.md). |
| `--dyn-tool-call-parser` | `DYN_TOOL_CALL_PARSER` | `None` | [Tool call](../../agents/tool-calling.md) parser (overrides SGLang's `--tool-call-parser`) | | `--dyn-tool-call-parser` | `DYN_TOOL_CALL_PARSER` | `None` | [Tool call](../../agents/tool-calling.md) parser (overrides SGLang's `--tool-call-parser`) |
| `--dyn-reasoning-parser` | `DYN_REASONING_PARSER` | `None` | Reasoning parser for chain-of-thought models | | `--dyn-reasoning-parser` | `DYN_REASONING_PARSER` | `None` | Reasoning parser for chain-of-thought models |
| `--custom-jinja-template` | `DYN_CUSTOM_JINJA_TEMPLATE` | `None` | Custom chat template path (incompatible with `--use-sglang-tokenizer`) | | `--custom-jinja-template` | `DYN_CUSTOM_JINJA_TEMPLATE` | `None` | Custom chat template path (incompatible with `--use-sglang-tokenizer`) |
...@@ -56,10 +56,10 @@ These arguments are added by Dynamo on top of SGLang's native arguments. ...@@ -56,10 +56,10 @@ These arguments are added by Dynamo on top of SGLang's native arguments.
By default, Dynamo handles tokenization and detokenization through its Rust-based frontend, passing `input_ids` to SGLang. This enables all frontend endpoints (`v1/chat/completions`, `v1/completions`, `v1/embeddings`). By default, Dynamo handles tokenization and detokenization through its Rust-based frontend, passing `input_ids` to SGLang. This enables all frontend endpoints (`v1/chat/completions`, `v1/completions`, `v1/embeddings`).
With `--use-sglang-tokenizer`, SGLang handles tokenization internally and Dynamo passes raw prompts. This restricts the frontend to `v1/chat/completions` only. For SGLang-native preprocessing (tool calling, reasoning parsing, chat templates), use `--dyn-chat-processor sglang` on the frontend. See [SGLang Chat Processor](sglang-chat-processor.md) for architecture and usage.
<Warning> <Warning>
`--custom-jinja-template` and `--use-sglang-tokenizer` are mutually exclusive. Custom templates require Dynamo's preprocessor. `--use-sglang-tokenizer` is deprecated. Use `--dyn-chat-processor sglang` on the frontend instead, which provides the same SGLang-native processing with KV router support and the completions endpoint.
</Warning> </Warning>
## Request Cancellation ## Request Cancellation
......
...@@ -149,6 +149,8 @@ navigation: ...@@ -149,6 +149,8 @@ navigation:
contents: contents:
- page: Reference Guide - page: Reference Guide
path: backends/sglang/sglang-reference-guide.md path: backends/sglang/sglang-reference-guide.md
- page: Chat Processor
path: backends/sglang/sglang-chat-processor.md
- page: Examples - page: Examples
path: backends/sglang/sglang-examples.md path: backends/sglang/sglang-examples.md
- page: Disaggregation - page: Disaggregation
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment