"lib/bindings/vscode:/vscode.git/clone" did not exist on "3036e60b1e630d9e9905d4896642fd2ebdf3b0bc"
Unverified Commit 0b47f897 authored by Faradawn Yang's avatar Faradawn Yang Committed by GitHub
Browse files

feat: add kv router to sglang (#1605)

parent ae47638b
...@@ -71,6 +71,20 @@ cd /workspace/examples/sglang ...@@ -71,6 +71,20 @@ cd /workspace/examples/sglang
dynamo serve graphs.agg:Frontend -f ./configs/agg.yaml dynamo serve graphs.agg:Frontend -f ./configs/agg.yaml
``` ```
#### Aggregated with router
> [!NOTE]
> The current implementation of `examples/sglang/components/worker.py` publishes _placeholder_ engine metrics (not real measurements) solely so that the Dynamo KV router has metrics to consume. Real-time metrics will be surfaced directly from the SGLang engine once the following pull requests are merged:
> - Upstream: [sgl-project/sglang #6721](https://github.com/sgl-project/sglang/pull/6721) – _Expose runtime KV-cache & request metrics_.
> - Dynamo: [ai-dynamo/dynamo #1465](https://github.com/ai-dynamo/dynamo/pull/1465) – _feat: receive kvmetrics from sglang scheduler_.
>
> Once both pull requests are merged, the TODOs in `worker.py` will be resolved and the placeholder logic removed.
```bash
cd /workspace/examples/sglang
dynamo serve graphs.agg:Frontend -f ./configs/agg.yaml --Frontend.router=kv
```
#### Disaggregated #### Disaggregated
<details> <details>
......
...@@ -45,6 +45,7 @@ class FrontendConfig(BaseModel): ...@@ -45,6 +45,7 @@ class FrontendConfig(BaseModel):
served_model_name: str served_model_name: str
endpoint: str endpoint: str
port: int = 8080 port: int = 8080
router: str = "round-robin"
@service( @service(
...@@ -81,6 +82,8 @@ class Frontend: ...@@ -81,6 +82,8 @@ class Frontend:
f"out={endpoint}", f"out={endpoint}",
"--http-port", "--http-port",
str(self.frontend_config.port), str(self.frontend_config.port),
"--router-mode",
str(self.frontend_config.router),
], ],
stdout=None, stdout=None,
stderr=None, stderr=None,
......
...@@ -36,7 +36,13 @@ from sglang.srt.utils import get_ip ...@@ -36,7 +36,13 @@ from sglang.srt.utils import get_ip
from utils.protocol import DisaggPreprocessedRequest, PreprocessedRequest from utils.protocol import DisaggPreprocessedRequest, PreprocessedRequest
from utils.sgl_utils import parse_sglang_args from utils.sgl_utils import parse_sglang_args
from dynamo.llm import ModelType, register_llm from dynamo.llm import (
ModelType,
WorkerMetricsPublisher,
ZmqKvEventPublisher,
ZmqKvEventPublisherConfig,
register_llm,
)
from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service from dynamo.sdk import async_on_start, depends, dynamo_context, endpoint, service
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -57,20 +63,62 @@ class SGLangWorker: ...@@ -57,20 +63,62 @@ class SGLangWorker:
self.engine_args = parse_sglang_args(class_name, "") self.engine_args = parse_sglang_args(class_name, "")
self.engine = sgl.Engine(server_args=self.engine_args) self.engine = sgl.Engine(server_args=self.engine_args)
logger.info("SGLangWorker initialized") # Initialize metrics publisher
self.metrics_publisher = WorkerMetricsPublisher()
def _update_metrics(self):
    """Publish a set of placeholder metrics via the worker's metrics publisher.

    The published values are hard-coded or randomly generated in this method;
    none of them are read from the engine. A warning is logged on every call
    to make that explicit.
    """
    # TODO: drop this placeholder path once both upstream changes are merged:
    #   - ai-dynamo/dynamo#1465: "feat: receive kvmetrics from sglang scheduler"
    #   - sgl-project/sglang#6721: "Expose runtime KV-cache & request metrics"
    logger.warning(
        "Publishing placeholder metrics in SGLangWorker; these are NOT real engine metrics yet and will be replaced once upstream support lands."
    )

    # Dict entries are evaluated top to bottom, so the random draws happen in
    # the same order in which the keyword arguments are listed.
    placeholder_metrics = {
        "request_active_slots": 1,
        "request_total_slots": 100,
        "kv_active_blocks": random.randint(0, 500),
        "kv_total_blocks": 1000,
        "num_requests_waiting": 0,
        "gpu_cache_usage_perc": random.uniform(0.1, 0.8),
        "gpu_prefix_cache_hit_rate": random.uniform(0.0, 0.5),
    }
    self.metrics_publisher.publish(**placeholder_metrics)
async def create_metrics_publisher_endpoint(self):
    """Create the metrics publisher's endpoint on this worker's component.

    The component is looked up in ``dynamo_context`` under the ``"component"``
    key and handed to the publisher's ``create_endpoint`` coroutine, which is
    awaited to completion.
    """
    owning_component = dynamo_context["component"]
    publisher = self.metrics_publisher
    await publisher.create_endpoint(owning_component)
@async_on_start @async_on_start
async def async_init(self): async def async_init(self):
runtime = dynamo_context["runtime"] runtime = dynamo_context["runtime"]
logger.info("Registering LLM for discovery")
comp_ns, comp_name = SGLangWorker.dynamo_address() # type: ignore comp_ns, comp_name = SGLangWorker.dynamo_address() # type: ignore
endpoint = runtime.namespace(comp_ns).component(comp_name).endpoint("generate") endpoint = runtime.namespace(comp_ns).component(comp_name).endpoint("generate")
component = runtime.namespace(comp_ns).component(comp_name)
logger.info(
f"Registering LLM for discovery with kv block size {self.engine_args.page_size}, endpoint={endpoint}, model_path={self.engine_args.model_path}, served_model_name={self.engine_args.served_model_name}"
)
await register_llm( await register_llm(
ModelType.Backend, ModelType.Backend,
endpoint, endpoint,
self.engine_args.model_path, self.engine_args.model_path,
self.engine_args.served_model_name, self.engine_args.served_model_name,
kv_cache_block_size=self.engine_args.page_size,
) )
self.metrics_publisher.publish(
request_active_slots=0,
request_total_slots=1024,
kv_active_blocks=0,
kv_total_blocks=1024,
num_requests_waiting=0,
gpu_cache_usage_perc=0.0,
gpu_prefix_cache_hit_rate=0.0,
)
# Create metrics publisher endpoint for KV router discovery
asyncio.create_task(self.create_metrics_publisher_endpoint())
if self.engine_args.disaggregation_mode: if self.engine_args.disaggregation_mode:
self.bootstrap_host, self.bootstrap_port = self._get_bootstrap_info() self.bootstrap_host, self.bootstrap_port = self._get_bootstrap_info()
comp_ns, comp_name = SGLangDecodeWorker.dynamo_address() # type: ignore comp_ns, comp_name = SGLangDecodeWorker.dynamo_address() # type: ignore
...@@ -81,6 +129,18 @@ class SGLangWorker: ...@@ -81,6 +129,18 @@ class SGLangWorker:
.client() .client()
) )
# Configure ZMQ KV Event Publisher to relay KV events from SGLang to NATS
zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(),
kv_block_size=self.engine_args.page_size, # Keep in sync with register_llm above
)
# Keep a reference on the instance to avoid the publisher being garbage-collected.
self._kv_event_publisher = ZmqKvEventPublisher(
component=component,
config=zmq_config,
)
def _get_bootstrap_info(self): def _get_bootstrap_info(self):
""" """
Bootstrap info is stored in the worker's tokenizer manager. We use it to Bootstrap info is stored in the worker's tokenizer manager. We use it to
...@@ -100,7 +160,6 @@ class SGLangWorker: ...@@ -100,7 +160,6 @@ class SGLangWorker:
return bootstrap_host, bootstrap_port return bootstrap_host, bootstrap_port
def _build_sampling_params(self, request: PreprocessedRequest) -> dict: def _build_sampling_params(self, request: PreprocessedRequest) -> dict:
# TODO: maintain a full mapping from PreprocessedRequest to SGLang's SamplingParams
sampling_params = {} sampling_params = {}
if request.sampling_options.temperature: if request.sampling_options.temperature:
sampling_params["temperature"] = request.sampling_options.temperature sampling_params["temperature"] = request.sampling_options.temperature
......
...@@ -20,6 +20,7 @@ Frontend: ...@@ -20,6 +20,7 @@ Frontend:
SGLangWorker: SGLangWorker:
model-path: deepseek-ai/DeepSeek-R1-Distill-Llama-8B model-path: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
served-model-name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B served-model-name: deepseek-ai/DeepSeek-R1-Distill-Llama-8B
page-size: 16
tp: 1 tp: 1
trust-remote-code: true trust-remote-code: true
skip-tokenizer-init: true skip-tokenizer-init: true
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment