Unverified Commit b4189c68 authored by Hongkuan Zhou's avatar Hongkuan Zhou Committed by GitHub
Browse files

feat: support SGLang in pre-deployment sweeping (#2360)


Signed-off-by: default avatarHongkuan Zhou <tedzhouhk@gmail.com>
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
parent 89ede845
...@@ -34,6 +34,8 @@ spec: ...@@ -34,6 +34,8 @@ spec:
- /workspace/profiling_results - /workspace/profiling_results
- --namespace - --namespace
- ${NAMESPACE} - ${NAMESPACE}
- --backend
- vllm
- --min-num-gpus-per-engine - --min-num-gpus-per-engine
- "1" - "1"
- --max-num-gpus-per-engine - --max-num-gpus-per-engine
......
...@@ -21,7 +21,7 @@ import os ...@@ -21,7 +21,7 @@ import os
import numpy as np import numpy as np
import yaml import yaml
from utils.config import CONFIG_MODIFIERS from utils.config import CONFIG_MODIFIERS, WORKER_COMPONENT_NAMES
from utils.defaults import DECODE_NUM_REQUESTS_RANGE from utils.defaults import DECODE_NUM_REQUESTS_RANGE
from utils.dynamo_deployment import ( from utils.dynamo_deployment import (
DynamoDeploymentClient, DynamoDeploymentClient,
...@@ -140,6 +140,7 @@ async def run_profile(args): ...@@ -140,6 +140,7 @@ async def run_profile(args):
model_name=model_name, model_name=model_name,
service_name=args.service_name, service_name=args.service_name,
frontend_port=frontend_port, frontend_port=frontend_port,
deployment_name=prefill_config["metadata"]["name"],
) )
logger.info(f"Created client with service_name: {client.service_name}") logger.info(f"Created client with service_name: {client.service_name}")
deployment_clients.append(client) # Track for cleanup deployment_clients.append(client) # Track for cleanup
...@@ -247,6 +248,7 @@ async def run_profile(args): ...@@ -247,6 +248,7 @@ async def run_profile(args):
model_name=model_name, model_name=model_name,
service_name=args.service_name, service_name=args.service_name,
frontend_port=frontend_port, frontend_port=frontend_port,
deployment_name=decode_config["metadata"]["name"],
) )
deployment_clients.append(client) # Track for cleanup deployment_clients.append(client) # Track for cleanup
await client.create_deployment(decode_config_fn) await client.create_deployment(decode_config_fn)
...@@ -261,7 +263,7 @@ async def run_profile(args): ...@@ -261,7 +263,7 @@ async def run_profile(args):
) )
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log( max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
f"{work_dir}/vllm-v1-agg/vllmdecodeworker/0.log" f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log"
) )
max_concurrency = max_kv_tokens // (args.isl + args.osl) max_concurrency = max_kv_tokens // (args.isl + args.osl)
sweep_num_request = [ sweep_num_request = [
...@@ -393,6 +395,7 @@ async def run_profile(args): ...@@ -393,6 +395,7 @@ async def run_profile(args):
model_name=model_name, model_name=model_name,
service_name=args.service_name, service_name=args.service_name,
frontend_port=frontend_port, frontend_port=frontend_port,
deployment_name=prefill_config["metadata"]["name"],
) )
deployment_clients.append(client) # Track for cleanup deployment_clients.append(client) # Track for cleanup
await client.create_deployment(prefill_config_fn) await client.create_deployment(prefill_config_fn)
...@@ -448,8 +451,10 @@ async def run_profile(args): ...@@ -448,8 +451,10 @@ async def run_profile(args):
client = DynamoDeploymentClient( client = DynamoDeploymentClient(
namespace=args.namespace, namespace=args.namespace,
base_log_dir=work_dir, base_log_dir=work_dir,
model_name=model_name,
service_name=args.service_name, service_name=args.service_name,
frontend_port=frontend_port, frontend_port=frontend_port,
deployment_name=decode_config["metadata"]["name"],
) )
deployment_clients.append(client) # Track for cleanup deployment_clients.append(client) # Track for cleanup
await client.create_deployment(decode_config_fn) await client.create_deployment(decode_config_fn)
...@@ -464,7 +469,7 @@ async def run_profile(args): ...@@ -464,7 +469,7 @@ async def run_profile(args):
) )
max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log( max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
f"{work_dir}/vllm-v1-agg/vllmdecodeworker/0.log" f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log"
) )
base_url = client.get_service_url() base_url = client.get_service_url()
...@@ -508,8 +513,8 @@ if __name__ == "__main__": ...@@ -508,8 +513,8 @@ if __name__ == "__main__":
"--backend", "--backend",
type=str, type=str,
default="vllm", default="vllm",
choices=["vllm"], choices=["vllm", "sglang"],
help="backend type, currently support [vllm]", help="backend type, currently support [vllm, sglang]",
) )
parser.add_argument( parser.add_argument(
"--config", "--config",
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
import re
from copy import deepcopy from copy import deepcopy
from typing import Literal from typing import Literal
...@@ -35,13 +36,23 @@ logger.addHandler(console_handler) ...@@ -35,13 +36,23 @@ logger.addHandler(console_handler)
def break_arguments(args: list[str]) -> list[str]:
    """Split container arguments into individual tokens.

    Both spaces and ``=`` act as separators, so ``--flag=value`` and
    ``--flag value`` produce the same two tokens.

    Args:
        args: Either a single command string or a list of argument strings
            (each of which may itself contain several space-separated
            tokens).

    Returns:
        A new flat list of tokens. Consecutive separators yield empty-string
        tokens, exactly as ``re.split`` produces them.
    """
    if isinstance(args, str):
        return re.split(r"[ =]", args)

    ans = []
    for arg in args:
        # Use the same separators as the string form so that list-style args
        # containing ``--flag=value`` are tokenized consistently.
        ans.extend(re.split(r"[ =]", arg))
    return ans
def remove_valued_arguments(args: list[str], key: str) -> list[str]:
    """Remove every occurrence of a valued argument from ``args``.

    Two spellings are recognised:

    * ``key value`` - two consecutive tokens; both are dropped.
    * ``key=value`` - a single token; it is dropped.

    A trailing ``key`` with no following value is left untouched.

    Args:
        args: Token list to clean. It is modified in place.
        key: The flag to remove, e.g. ``"--disaggregation-mode"``.

    Returns:
        The same list object that was passed in, after removal.
    """
    inline_prefix = f"{key}="
    kept = []
    pos = 0
    total = len(args)
    while pos < total:
        token = args[pos]
        if token.startswith(inline_prefix):
            # single-token form: --key=value
            pos += 1
        elif token == key and pos + 1 < total:
            # two-token form: --key value
            pos += 2
        else:
            kept.append(token)
            pos += 1
    # Slice-assign so callers holding a reference to the list see the change.
    args[:] = kept
    return args
def join_arguments(args: list[str]) -> list[str]: def join_arguments(args: list[str]) -> list[str]:
return [" ".join(args)] return [" ".join(args)]
...@@ -237,6 +248,167 @@ class VllmV1ConfigModifier: ...@@ -237,6 +248,167 @@ class VllmV1ConfigModifier:
return 0 return 0
class SGLangConfigModifier:
    """Helpers that rewrite and inspect an SGLang deployment config dict.

    Every method that modifies a config works on a deep copy and returns it;
    the caller's dict is never mutated.
    """

    @classmethod
    def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> dict:
        """Turn a config with prefill and decode workers into a single-worker one.

        The result is named ``sglang-agg``, has no ``Planner`` service, and
        keeps exactly one replica of the decode-worker service.

        Args:
            config: Deployment config with ``metadata`` and ``spec.services``.
            target: ``"prefill"`` moves the prefill worker's service entry
                under the decode worker's name and adds
                ``--disable-radix-cache``; ``"decode"`` drops the prefill
                worker, switches the module to ``dynamo.sglang.worker`` and
                removes ``--disable-radix-cache``.

        Returns:
            A modified deep copy of ``config``.

        Raises:
            KeyError: If a required service or field is missing.
        """
        config = deepcopy(config)
        names = WORKER_COMPONENT_NAMES["sglang"]
        prefill_name = names.prefill_worker_k8s_name
        decode_name = names.decode_worker_k8s_name
        services = config["spec"]["services"]

        # set metadata name
        config["metadata"]["name"] = "sglang-agg"

        # disable planner
        services.pop("Planner", None)

        if target == "prefill":
            # convert prefill worker into decode worker
            services[decode_name] = services.pop(prefill_name)

            main_container = services[decode_name]["extraPodSpec"]["mainContainer"]
            args = break_arguments(main_container["args"])

            # remove `--disaggregation-mode` and `--disaggregation-transfer-backend`
            args = remove_valued_arguments(args, "--disaggregation-mode")
            args = remove_valued_arguments(args, "--disaggregation-transfer-backend")

            # disable prefix caching
            if "--disable-radix-cache" not in args:
                args = append_argument(args, "--disable-radix-cache")

            main_container["args"] = join_arguments(args)

        elif target == "decode":
            # delete prefill worker (tolerate a config that has none)
            services.pop(prefill_name, None)

            main_container = services[decode_name]["extraPodSpec"]["mainContainer"]
            args = break_arguments(main_container["args"])

            # call `dynamo.sglang.worker` instead of `dynamo.sglang.decode_worker`
            try:
                idx = args.index("dynamo.sglang.decode_worker")
            except ValueError:
                logger.warning(
                    "`dynamo.sglang.decode_worker` not found in decode worker args, "
                    "leaving the worker module unchanged"
                )
            else:
                args[idx] = "dynamo.sglang.worker"

            # remove `--disaggregation-mode` and `--disaggregation-transfer-backend`
            args = remove_valued_arguments(args, "--disaggregation-mode")
            args = remove_valued_arguments(args, "--disaggregation-transfer-backend")

            # enable prefix caching (drop every occurrence of the disabling flag)
            args = [arg for arg in args if arg != "--disable-radix-cache"]

            main_container["args"] = join_arguments(args)

        # set num workers to 1
        services[decode_name]["replicas"] = 1

        return config

    @classmethod
    def set_config_tp_size(cls, config: dict, tp_size: int):
        """Set the decode worker's GPU count and ``--tp`` value to ``tp_size``.

        Args:
            config: Deployment config containing the decode-worker service.
            tp_size: Value written (as a string) to ``resources.requests.gpu``,
                to ``resources.limits.gpu`` when ``limits`` exists, and to the
                token following ``--tp``.

        Returns:
            A modified deep copy of ``config``.
        """
        config = deepcopy(config)
        decode_name = WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
        worker = config["spec"]["services"][decode_name]

        resources = worker["resources"]
        resources["requests"]["gpu"] = str(tp_size)
        if "limits" in resources:
            resources["limits"]["gpu"] = str(tp_size)

        main_container = worker["extraPodSpec"]["mainContainer"]
        args = break_arguments(main_container["args"])

        try:
            idx = args.index("--tp")
        except ValueError:
            args = append_argument(args, ["--tp", str(tp_size)])
        else:
            if idx + 1 < len(args):
                args[idx + 1] = str(tp_size)
            else:
                # `--tp` is the last token and has no value yet; supply one
                # instead of indexing past the end of the list.
                args.append(str(tp_size))

        main_container["args"] = join_arguments(args)

        return config

    @classmethod
    def get_model_name(cls, config: dict) -> str:
        """Return the token following ``--served-model-name`` in the decode worker args.

        Falls back to ``DEFAULT_MODEL_NAME`` (with a warning) when the flag is
        absent or has no value.
        """
        worker_name = WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
        args = config["spec"]["services"][worker_name]["extraPodSpec"]["mainContainer"][
            "args"
        ]
        args = break_arguments(args)

        # Pair each token with its successor so a trailing flag is ignored.
        for flag, value in zip(args, args[1:]):
            if flag == "--served-model-name":
                return value

        logger.warning(
            f"Model name not found in configuration args, using default model name: {DEFAULT_MODEL_NAME}"
        )
        return DEFAULT_MODEL_NAME

    @classmethod
    def get_port(cls, config: dict) -> int:
        """Return the integer following ``--http-port`` in the ``Frontend`` service args.

        Falls back to ``DYNAMO_RUN_DEFAULT_PORT`` (with a warning) when the
        flag is missing, has no value, or its value is not an integer.
        """
        args = config["spec"]["services"]["Frontend"]["extraPodSpec"]["mainContainer"][
            "args"
        ]
        args = break_arguments(args)
        try:
            idx = args.index("--http-port")
            return int(args[idx + 1])
        except (ValueError, IndexError):
            # ValueError: flag absent or value not an int.
            # IndexError: flag is the last token.
            logger.warning(
                f"Port not found in configuration args, using default port: {DYNAMO_RUN_DEFAULT_PORT}"
            )
            return DYNAMO_RUN_DEFAULT_PORT

    @classmethod
    def get_kv_cache_size_from_dynamo_log(cls, dynamo_log_fn: str) -> int:
        """Read the KV cache size, in tokens, from a worker log file.

        Scans for the first line containing both ``KV Cache is allocated`` and
        ``#tokens:`` and returns the integer after ``#tokens:``.

        Args:
            dynamo_log_fn: Path of the log file to scan.

        Returns:
            The parsed token count, or ``0`` when the file cannot be read or
            no matching line is found.
        """
        # TODO(review): the original carried a bare TODO here with no
        # description - confirm what work remained.
        try:
            with open(dynamo_log_fn, "r") as f:
                for line in f:
                    if "KV Cache is allocated" in line and "#tokens:" in line:
                        # Extract the number after "#tokens:"
                        match = re.search(r"#tokens:\s*(\d+)", line)
                        if match:
                            return int(match.group(1))
        except (OSError, ValueError) as e:
            # OSError: missing/unreadable file; ValueError: undecodable bytes.
            logger.warning(f"Failed to parse KV cache size from log file. Error: {e}")
            return 0

        logger.warning(
            f"KV cache size not found in log file {dynamo_log_fn}, returning 0"
        )
        return 0
CONFIG_MODIFIERS = { CONFIG_MODIFIERS = {
"vllm": VllmV1ConfigModifier, "vllm": VllmV1ConfigModifier,
"sglang": SGLangConfigModifier,
} }
...@@ -116,6 +116,13 @@ class DynamoDeploymentClient: ...@@ -116,6 +116,13 @@ class DynamoDeploymentClient:
self.deployment_spec["metadata"]["name"] = self.deployment_name self.deployment_spec["metadata"]["name"] = self.deployment_name
self.deployment_spec["metadata"]["namespace"] = self.namespace self.deployment_spec["metadata"]["namespace"] = self.namespace
# Disable grove as it will cause the deployment to not report ready
if "annotations" not in self.deployment_spec["metadata"]:
self.deployment_spec["metadata"]["annotations"] = {}
self.deployment_spec["metadata"]["annotations"][
"nvidia.com/enable-grove"
] = "false"
try: try:
await self.custom_api.create_namespaced_custom_object( await self.custom_api.create_namespaced_custom_object(
group="nvidia.com", group="nvidia.com",
......
...@@ -89,6 +89,9 @@ spec: ...@@ -89,6 +89,9 @@ spec:
failureThreshold: 60 failureThreshold: 60
image: my-registry/sglang-runtime:my-tag image: my-registry/sglang-runtime:my-tag
workingDir: /workspace/components/backends/sglang workingDir: /workspace/components/backends/sglang
command:
- /bin/sh
- -c
args: args:
- "python3" - "python3"
- "-m" - "-m"
......
...@@ -89,6 +89,9 @@ spec: ...@@ -89,6 +89,9 @@ spec:
failureThreshold: 60 failureThreshold: 60
image: my-registry/sglang-runtime:my-tag image: my-registry/sglang-runtime:my-tag
workingDir: /workspace/components/backends/sglang workingDir: /workspace/components/backends/sglang
command:
- /bin/sh
- -c
args: args:
- "python3" - "python3"
- "-m" - "-m"
......
...@@ -38,7 +38,7 @@ spec: ...@@ -38,7 +38,7 @@ spec:
memory: "40Gi" memory: "40Gi"
extraPodSpec: extraPodSpec:
mainContainer: mainContainer:
image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0804 image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0808-07
workingDir: /workspace/components/backends/sglang workingDir: /workspace/components/backends/sglang
command: ["sh", "-c"] command: ["sh", "-c"]
args: args:
...@@ -87,8 +87,11 @@ spec: ...@@ -87,8 +87,11 @@ spec:
port: 9090 port: 9090
periodSeconds: 10 periodSeconds: 10
failureThreshold: 60 failureThreshold: 60
image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0804 image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0808-07
workingDir: /workspace/components/backends/sglang workingDir: /workspace/components/backends/sglang
command:
- /bin/sh
- -c
args: args:
- "python3" - "python3"
- "-m" - "-m"
...@@ -151,8 +154,11 @@ spec: ...@@ -151,8 +154,11 @@ spec:
port: 9090 port: 9090
periodSeconds: 10 periodSeconds: 10
failureThreshold: 60 failureThreshold: 60
image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0804 image: nvcr.io/nvidian/nim-llm-dev/sglang-runtime:hzhou-0808-07
workingDir: /workspace/components/backends/sglang workingDir: /workspace/components/backends/sglang
command:
- /bin/sh
- -c
args: args:
- "python3" - "python3"
- "-m" - "-m"
......
...@@ -82,6 +82,16 @@ class VllmComponentName: ...@@ -82,6 +82,16 @@ class VllmComponentName:
decode_worker_endpoint = "generate" decode_worker_endpoint = "generate"
class SGLangComponentName:
    """Name constants for the SGLang prefill and decode workers.

    The ``*_k8s_name`` values are the keys used for these workers under
    ``spec.services`` in the deployment config.
    """

    # Service keys in the deployment config.
    prefill_worker_k8s_name = "SGLangPrefillWorker"
    decode_worker_k8s_name = "SGLangDecodeWorker"

    # NOTE(review): presumably the runtime component names the workers
    # register under - confirm against the callers.
    prefill_worker_component_name = "worker"
    decode_worker_component_name = "decode"

    # NOTE(review): presumably the endpoint each component exposes - confirm.
    prefill_worker_endpoint = "generate"
    decode_worker_endpoint = "generate"
WORKER_COMPONENT_NAMES = { WORKER_COMPONENT_NAMES = {
"vllm": VllmComponentName, "vllm": VllmComponentName,
"sglang": SGLangComponentName,
} }
...@@ -4,6 +4,16 @@ ...@@ -4,6 +4,16 @@
To ensure Dynamo deployments comply with the SLA, we provide a pre-deployment script to profile the model performance with different parallelization mappings and recommend the parallelization mapping for prefill and decode workers and planner configurations. To use this script, the user needs to provide the target ISL, OSL, TTFT SLA, and ITL SLA. To ensure Dynamo deployments comply with the SLA, we provide a pre-deployment script to profile the model performance with different parallelization mappings and recommend the parallelization mapping for prefill and decode workers and planner configurations. To use this script, the user needs to provide the target ISL, OSL, TTFT SLA, and ITL SLA.
Support matrix:
| Backends | Model Types | Supported |
| --- | --- | --- |
| vLLM | Dense | ✅ |
| vLLM | MoE | 🚧 |
| SGLang | Dense | ✅ |
| SGLang | MoE | 🚧 |
| TensorRT-LLM | Dense | 🚧 |
| TensorRT-LLM | MoE | 🚧 |
> [!NOTE] > [!NOTE]
> The script considers a fixed ISL/OSL without KV cache reuse. If the real ISL/OSL has a large variance or a significant amount of KV cache can be reused, the result might be inaccurate. > The script considers a fixed ISL/OSL without KV cache reuse. If the real ISL/OSL has a large variance or a significant amount of KV cache can be reused, the result might be inaccurate.
...@@ -120,7 +130,7 @@ This approach allows you to: ...@@ -120,7 +130,7 @@ This approach allows you to:
Only needed if you require custom code modifications beyond configuration changes: Only needed if you require custom code modifications beyond configuration changes:
```bash ```bash
# in the project's root folder # in the project's root folder
./container/build.sh --framework VLLM ./container/build.sh --framework <VLLM/sglang>
# Tag and push to your container registry # Tag and push to your container registry
export DOCKER_IMAGE=<your docker tag> export DOCKER_IMAGE=<your docker tag>
export DGD_CONFIG_FILE=<disagg config path> # path to your disagg.yaml file within the DOCKER_IMAGE export DGD_CONFIG_FILE=<disagg config path> # path to your disagg.yaml file within the DOCKER_IMAGE
...@@ -128,7 +138,7 @@ export DGD_CONFIG_FILE=<disagg config path> # path to your disagg.yaml file with ...@@ -128,7 +138,7 @@ export DGD_CONFIG_FILE=<disagg config path> # path to your disagg.yaml file with
**Step 2: Set SLA target** **Step 2: Set SLA target**
Edit `$DYNAMO_HOME/benchmarks/profiler/deploy/profile_sla_job.yaml` to set the target ISL, OSL, TTFT, and ITL. Edit `$DYNAMO_HOME/benchmarks/profiler/deploy/profile_sla_job.yaml` to set the target ISL, OSL, TTFT, and ITL. Also, set the backend type to `vllm` or `sglang`. The backend type must match the dynamo deployment in the `DGD_CONFIG_FILE`.
```yaml ```yaml
spec: spec:
...@@ -145,6 +155,8 @@ spec: ...@@ -145,6 +155,8 @@ spec:
- "200" # target TTFT is 200ms - "200" # target TTFT is 200ms
- --itl - --itl
- "20" # target ITL is 20ms - "20" # target ITL is 20ms
- --backend
- <vllm/sglang>
``` ```
**Step 3: Run profiling (required)** **Step 3: Run profiling (required)**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment