Unverified Commit 5d5235bc authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat(sglang): aggregated support (#937)


Co-authored-by: default avatarishandhanani <ishandhananai@gmail.com>
parent bdf60ca0
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# LLM Deployment Examples using SGLang
This directory contains examples and reference implementations for deploying Large Language Models (LLMs) in various configurations using SGLang. SGLang internally uses ZMQ to communicate between the ingress and the engine processes. For Dynamo, we leverage the runtime to communicate directly with the engine processes and handle ingress and pre/post processing on our end.
## Deployment Architectures
See [deployment architectures](../llm/README.md#deployment-architectures) to learn about the general idea of the architecture. SGLang currently supports only aggregated serving, but routing and disaggregation support are coming very soon!
## Getting Started
1. Choose a deployment architecture based on your requirements
2. Configure the components as needed
3. Deploy using the provided scripts
### Prerequisites
Start required services (etcd and NATS) using [Docker Compose](../../deploy/docker-compose.yml)
```bash
docker compose -f deploy/docker-compose.yml up -d
```
### Build docker
```bash
# On an x86 machine - sglang does not support ARM yet
./container/build.sh
```
### Run container
```bash
./container/run.sh -it
```
### Example architectures
#### Aggregated
```bash
cd /workspace/examples/sglang
dynamo serve graphs.agg:Frontend -f ./configs/agg.yaml
```
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import subprocess
from pathlib import Path
from components.worker import SGLangWorker
from fastapi import FastAPI
from pydantic import BaseModel
import dynamo.sdk as sdk
from dynamo.sdk import depends, service
from dynamo.sdk.lib.config import ServiceConfig
from dynamo.sdk.lib.image import DYNAMO_IMAGE
logger = logging.getLogger(__name__)
def get_dynamo_run_binary():
    """Return the command used to launch ``dynamo-run``.

    Looks for ``cli/bin/dynamo-run`` next to the installed ``dynamo.sdk``
    module. When that file exists its path is returned as a string;
    otherwise the bare command name ``"dynamo-run"`` is returned.
    """
    bundled_binary = Path(sdk.__file__).parent / "cli/bin/dynamo-run"
    if bundled_binary.exists():
        return str(bundled_binary)
    # No binary found beside the SDK module: fall back to the bare command name.
    return "dynamo-run"
class FrontendConfig(BaseModel):
    """Configuration for the Frontend service including model and HTTP server settings."""

    # Name of the served model (required; not read by the Frontend code visible here).
    served_model_name: str
    # Endpoint path (required); Frontend prefixes it with "dyn://" and passes it
    # to dynamo-run as the ``out=`` target.
    endpoint: str
    # Port handed to dynamo-run via ``--http-port``; defaults to 8080.
    port: int = 8080
@service(
    dynamo={
        "enabled": True,
        "namespace": "dynamo",
    },
    workers=1,
    image=DYNAMO_IMAGE,
    app=FastAPI(title="LLM Example"),
)
class Frontend:
    """Service that launches ``dynamo-run`` as the HTTP ingress and processor.

    On construction it reads the ``Frontend`` section of the service
    configuration and spawns a ``dynamo-run`` child process that accepts HTTP
    traffic and forwards it to the configured ``dyn://`` endpoint.
    """

    worker = depends(SGLangWorker)

    def __init__(self):
        """Load the ``Frontend`` configuration and start the child process."""
        self.frontend_config = FrontendConfig(
            **ServiceConfig.get_instance().get("Frontend", {})
        )
        # Handle of the dynamo-run child; set by start_ingress_and_processor().
        self.process = None
        self.start_ingress_and_processor()

    def start_ingress_and_processor(self):
        """Spawn ``dynamo-run`` with HTTP input and the configured endpoint as output.

        The ``subprocess.Popen`` handle is stored on ``self.process``. The
        child inherits this process's stdout and stderr.
        """
        logger.info(
            f"Starting HTTP server and processor on port {self.frontend_config.port}"
        )

        binary = get_dynamo_run_binary()
        endpoint = f"dyn://{self.frontend_config.endpoint}"
        command = [
            binary,
            "in=http",
            f"out={endpoint}",
            "--http-port",
            str(self.frontend_config.port),
        ]

        self.process = subprocess.Popen(command, stdout=None, stderr=None)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SGLang disaggregated serving flow is
Processor -> PrefillWorker -> DecodeWorker
This is different from how we've implemented the vLLM disaggregated flow.
For now - the SGLangWorker will be responsible for aggregated and prefill and we will
have a separate DecodeWorker.
"""
import logging
import signal
import sglang as sgl
from utils.protocol import PreprocessedRequest
from utils.sglang import parse_sglang_args
from dynamo.llm import ModelType, register_llm
from dynamo.sdk import async_on_start, dynamo_context, dynamo_endpoint, service
logger = logging.getLogger(__name__)
@service(
    dynamo={
        "enabled": True,
        "namespace": "dynamo",
    },
    resources={"gpu": 1},
    workers=1,
)
class SGLangWorker:
    """Dynamo service wrapping an in-process SGLang engine.

    The worker builds an ``sgl.Engine`` from the service configuration,
    registers its ``generate`` endpoint for discovery on start-up, and streams
    generated token ids back to the caller.
    """

    def __init__(self):
        """Create the SGLang engine and install shutdown signal handlers."""
        class_name = self.__class__.__name__
        self.engine_args = parse_sglang_args(class_name, "")
        self.engine = sgl.Engine(server_args=self.engine_args)

        # Shut the engine down on SIGINT and SIGTERM.
        for sig in [signal.SIGINT, signal.SIGTERM]:
            signal.signal(sig, self.shutdown_sglang_engine)

        logger.info("SGLangWorker initialized")

    @async_on_start
    async def async_init(self):
        """Register this worker's ``generate`` endpoint as a backend model."""
        runtime = dynamo_context["runtime"]
        logger.info("Registering LLM for discovery")
        comp_ns, comp_name = SGLangWorker.dynamo_address()  # type: ignore
        endpoint = runtime.namespace(comp_ns).component(comp_name).endpoint("generate")
        await register_llm(
            ModelType.Backend,
            endpoint,
            self.engine_args.model_path,
            self.engine_args.served_model_name,
        )

    def shutdown_sglang_engine(self, signum, frame):
        """Signal handler: shut down the SGLang engine.

        Args:
            signum: Signal number supplied by the ``signal`` module (unused).
            frame: Current stack frame supplied by the ``signal`` module (unused).
        """
        self.engine.shutdown()
        logger.info("SGLang engine shutdown")

    def _build_sampling_params(self, request: PreprocessedRequest) -> dict:
        """Translate a ``PreprocessedRequest`` into an SGLang sampling-params dict.

        ``temperature`` is forwarded whenever it is set, including ``0``.
        ``top_p``, ``top_k`` and ``ignore_eos`` are forwarded only when truthy.
        ``max_new_tokens`` is always set from ``stop_conditions.max_tokens``
        and may therefore be ``None``.

        Args:
            request: The preprocessed request carrying sampling options and
                stop conditions.

        Returns:
            The keyword dictionary passed to ``engine.async_generate``.
        """
        # TODO: maintain a full mapping from PreprocessedRequest to SGLang's SamplingParams
        sampling_params = {}
        # Compare against None rather than truthiness: a temperature of 0 is a
        # caller-supplied value and must not be dropped.
        if request.sampling_options.temperature is not None:
            sampling_params["temperature"] = request.sampling_options.temperature
        if request.sampling_options.top_p:
            sampling_params["top_p"] = request.sampling_options.top_p
        if request.sampling_options.top_k:
            sampling_params["top_k"] = request.sampling_options.top_k
        sampling_params["max_new_tokens"] = request.stop_conditions.max_tokens
        if request.stop_conditions.ignore_eos:
            sampling_params["ignore_eos"] = request.stop_conditions.ignore_eos
        return sampling_params

    @dynamo_endpoint()
    async def generate(self, request: PreprocessedRequest):
        """Stream generated token ids for ``request``.

        Args:
            request: The preprocessed request; its ``token_ids`` are the prompt.

        Yields:
            Dicts with a ``"token_ids"`` list holding the tokens added since
            the previous chunk. The chunk that reports a finish reason has an
            empty ``"token_ids"`` list and a ``"finish_reason"`` entry.
        """
        # TODO: maintain a mapping from SGLang's Output struct to LLMEngineOutput
        sampling_params = self._build_sampling_params(request)

        g = await self.engine.async_generate(
            input_ids=request.token_ids,
            sampling_params=sampling_params,
            stream=True,
        )

        # This code treats res["output_ids"] as the cumulative output so far
        # and forwards only the part that has not been sent yet.
        num_output_tokens_so_far = 0
        async for res in g:
            finish_reason = res["meta_info"]["finish_reason"]
            if finish_reason:
                # Don't forward the stop token
                out = {"token_ids": [], "finish_reason": finish_reason["type"]}
            else:
                output_ids = res["output_ids"]
                out = {"token_ids": output_ids[num_output_tokens_so_far:]}
                # Advance the counter only when tokens were forwarded, so a
                # stream whose first chunk is already finished never reads an
                # unassigned variable.
                num_output_tokens_so_far = len(output_ids)
            yield out
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Frontend:
served_model_name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
endpoint: dynamo.SGLangWorker.generate
port: 8000
SGLangWorker:
model-path: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
served-model-name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
tp: 1
trust-remote-code: true
skip-tokenizer-init: true
ServiceArgs:
workers: 1
resources:
gpu: 1
\ No newline at end of file
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from components.frontend import Frontend
from components.worker import SGLangWorker
# Aggregated graph: link SGLangWorker to Frontend (Frontend also declares the
# worker through depends()).
Frontend.link(SGLangWorker)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional
from pydantic import BaseModel, Field
# Alias for a single token id; token ids are plain ints.
TokenIdType = int
# TODO: move these to common for all LLMs once we adopt dynamo-run
# derived from lib/llm/src/protocols/common/preprocessor.rs
class StopConditions(BaseModel):
    """Stop conditions attached to a ``PreprocessedRequest``.

    Every field is optional and defaults to ``None``.
    """

    max_tokens: Optional[int] = None
    stop: Optional[List[str]] = None
    stop_token_ids_hidden: Optional[List[TokenIdType]] = None
    min_tokens: Optional[int] = None
    ignore_eos: Optional[bool] = None
class SamplingOptions(BaseModel):
    """Sampling options attached to a ``PreprocessedRequest``.

    Every field is optional and defaults to ``None``.
    """

    n: Optional[int] = None
    best_of: Optional[int] = None
    presence_penalty: Optional[float] = None
    frequency_penalty: Optional[float] = None
    repetition_penalty: Optional[float] = None
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    top_k: Optional[int] = None
    min_p: Optional[float] = None
    use_beam_search: Optional[bool] = None
    length_penalty: Optional[float] = None
    seed: Optional[int] = None
class PreprocessedRequest(BaseModel):
    """Tokenized request together with its stop conditions and sampling options."""

    # Required fields.
    token_ids: List[TokenIdType]
    stop_conditions: StopConditions
    sampling_options: SamplingOptions
    # Optional fields; the list-valued ones default to a fresh empty list.
    eos_token_ids: List[TokenIdType] = Field(default_factory=list)
    mdc_sum: Optional[str] = None
    annotations: List[str] = Field(default_factory=list)
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from sglang.srt.server_args import ServerArgs
from dynamo.sdk.lib.config import ServiceConfig
def parse_sglang_args(service_name, prefix) -> ServerArgs:
    """Build SGLang ``ServerArgs`` from the Dynamo service configuration.

    The configuration of ``service_name`` is rendered as a CLI-style argument
    list via ``ServiceConfig.as_args`` and parsed with an ``argparse`` parser
    populated by ``ServerArgs.add_cli_args``.

    Args:
        service_name: Name of the service whose configuration is read.
        prefix: Prefix forwarded to ``ServiceConfig.as_args``.

    Returns:
        The ``ServerArgs`` built from the parsed arguments.
    """
    cli_args = ServiceConfig.get_instance().as_args(service_name, prefix=prefix)

    arg_parser = argparse.ArgumentParser()
    # add future dynamo arguments here
    ServerArgs.add_cli_args(arg_parser)

    namespace = arg_parser.parse_args(cli_args)
    return ServerArgs.from_cli_args(namespace)
......@@ -60,16 +60,12 @@ class RequestHandler:
async def generate(self, request):
sampling_params = {}
for key, value in request["sampling_options"].items():
if value:
# TODO: Do these always match? Maybe allow-list the fields that do match
sampling_params[key] = value
# sglang defaults this to 128
max_new_tokens = request["stop_conditions"]["max_tokens"]
if max_new_tokens:
sampling_params["max_new_tokens"] = max_new_tokens
if request["sampling_options"]["temperature"] is not None:
sampling_params["temperature"] = request["sampling_options"]["temperature"]
sampling_params = {
# sglang defaults this to 128
"max_new_tokens": request["stop_conditions"]["max_tokens"],
}
num_output_tokens_so_far = 0
gen = await self.engine_client.async_generate(
input_ids=request["token_ids"], sampling_params=sampling_params, stream=True
......
......@@ -92,7 +92,9 @@ def configure_dynamo_logging(
dyn_var = os.environ.get("DYN_LOG", "info")
dyn_level = log_level_mapping(dyn_var)
# configure inference engine loggers
configure_vllm_logging(dyn_level)
configure_sglang_logging(dyn_level)
# loggers that should be configured to ERROR
error_loggers = ["bentoml", "tag"]
......@@ -125,6 +127,38 @@ def log_level_mapping(level: str) -> int:
return logging.INFO
def configure_sglang_logging(dyn_level: int):
    """
    Write a logging config file for SGLang and export its path.

    The ``sglang`` logger is routed to ``dynamo.runtime.logging.LogHandler``
    at the level named by ``dyn_level``, without propagation. The config is
    dumped as JSON to a temporary ``.json`` file that is kept after closing
    (``delete=False``), and the file's path is stored in the
    ``SGLANG_LOGGING_CONFIG_PATH`` environment variable.

    Args:
        dyn_level: Numeric ``logging`` level to apply to the handler and logger.
    """
    level_name = logging.getLevelName(dyn_level)

    dynamo_handler = {
        "class": "dynamo.runtime.logging.LogHandler",
        "formatter": "simple",
        "level": level_name,
    }
    sglang_logger = {
        "handlers": ["dynamo"],
        "level": level_name,
        "propagate": False,
    }
    logging_config = {
        "formatters": {"simple": {"format": "%(message)s"}},
        "handlers": {"dynamo": dynamo_handler},
        "loggers": {"sglang": sglang_logger},
        "version": 1,
        "disable_existing_loggers": False,
    }

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False
    ) as config_file:
        json.dump(logging_config, config_file)
        os.environ["SGLANG_LOGGING_CONFIG_PATH"] = config_file.name
def configure_vllm_logging(dyn_level: int):
"""
vLLM requires a logging config file to be set in the environment.
......
......@@ -34,6 +34,7 @@ dependencies = [
"distro",
"typer",
"circus>=0.17.0",
"sglang[all]==0.4.6.post2"
]
classifiers = [
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment