Unverified Commit fc36bf5b authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

feat: receive kvmetrics from sglang scheduler (#1789)


Co-authored-by: default avatarzixuanzhang226 <zixuanzhang@bytedance.com>
parent df91fce2
......@@ -11,8 +11,9 @@ from typing import Any, Dict, Optional, Union
import sglang as sgl
import uvloop
import zmq
from sglang.srt.server_args import ServerArgs
from sglang.srt.utils import get_ip
from sglang.srt.utils import get_ip, get_zmq_socket
from utils.protocol import DisaggPreprocessedRequest
from utils.sgl_utils import parse_sglang_args_inc
......@@ -45,6 +46,9 @@ class RequestHandler:
self.component = component
self.metrics_publisher = WorkerMetricsPublisher()
self.zmq_context = zmq.asyncio.Context() # type: ignore
self.receive_metrics_from_scheduler = None
if server_args.disaggregation_mode != "null":
self.bootstrap_host, self.bootstrap_port = self._get_bootstrap_info()
if decode_client is None:
......@@ -59,19 +63,33 @@ class RequestHandler:
logging.info("Request handler initialized")
def setup_metrics(self):
"""Set up metrics publisher - call this after handler creation"""
"""Set up metrics publisher"""
self.receive_metrics_from_scheduler = get_zmq_socket(
self.zmq_context, zmq.PULL, self.engine.port_args.metrics_ipc_name, True
)
self.init_publish()
asyncio.create_task(self._receive_and_publish_metrics_loop())
task = asyncio.create_task(self.create_metrics_publisher_endpoint())
task.add_done_callback(
lambda _: logging.debug("metrics publisher endpoint created")
)
def init_publish(self):
"""Publish initial set of warmup metrics"""
worker_stats = WorkerStats(
request_active_slots=0,
request_total_slots=1024,
num_requests_waiting=0,
data_parallel_rank=None,
data_parallel_rank=0,
)
kv_stats = KvStats(
kv_active_blocks=0,
kv_total_blocks=1024,
gpu_cache_usage_perc=0.0,
gpu_prefix_cache_hit_rate=0.0,
gpu_cache_usage_perc=0,
gpu_prefix_cache_hit_rate=0,
)
metrics = ForwardPassMetrics(
......@@ -79,47 +97,40 @@ class RequestHandler:
kv_stats=kv_stats,
spec_decode_stats=None,
)
self.metrics_publisher.publish(metrics)
task = asyncio.create_task(self.create_metrics_publisher_endpoint())
task.add_done_callback(
lambda _: logging.debug("metrics publisher endpoint created")
)
async def create_metrics_publisher_endpoint(self):
logging.debug("Creating metrics publisher endpoint")
await self.metrics_publisher.create_endpoint(self.component)
def _update_metrics(self):
"""Update metrics with current engine state"""
# TODO: remove this once the following upstream changes are merged:
# • sgl-project/sglang#6721 – "Expose runtime KV-cache & request metrics"
logging.warning(
"Publishing placeholder metrics in SGLangWorker; these are NOT real engine metrics yet and will be replaced once upstream support lands."
)
async def _receive_and_publish_metrics_loop(self):
"""Receive metrics from SGL scheduler and publish them"""
while True:
try:
kv_metrics = await self.receive_metrics_from_scheduler.recv_pyobj() # type: ignore
worker_stats = WorkerStats(
request_active_slots=0,
request_total_slots=1024,
num_requests_waiting=0,
data_parallel_rank=None,
request_active_slots=kv_metrics.request_active_slots,
request_total_slots=kv_metrics.request_total_slots,
num_requests_waiting=kv_metrics.num_requests_waiting,
data_parallel_rank=kv_metrics.data_parallel_rank, # Note: 0 means it's either 0 or None from sglang
)
kv_stats = KvStats(
kv_active_blocks=random.randint(0, 500),
kv_total_blocks=1000,
gpu_cache_usage_perc=random.uniform(0.1, 0.8),
gpu_prefix_cache_hit_rate=random.uniform(0.0, 0.5),
kv_active_blocks=kv_metrics.kv_active_blocks,
kv_total_blocks=kv_metrics.kv_total_blocks,
gpu_cache_usage_perc=kv_metrics.gpu_cache_usage_perc,
gpu_prefix_cache_hit_rate=kv_metrics.gpu_prefix_cache_hit_rate,
)
# TODO: get spec_dec_stats from sglang once real engine metrics are available
spec_dec_stats = None
metrics = ForwardPassMetrics(
worker_stats=worker_stats,
kv_stats=kv_stats,
spec_decode_stats=spec_dec_stats,
)
self.metrics_publisher.publish(metrics)
except Exception:
logging.exception("Failed to recieve or publish metrics")
def _get_bootstrap_info(self):
"""Bootstrap info from tokenizer manager"""
......@@ -332,7 +343,7 @@ async def init(runtime: DistributedRuntime, server_args: ServerArgs):
else:
handler = RequestHandler(engine, server_args, component)
# Set up metrics in background
# Set up the engine metrics reciever
handler.setup_metrics()
# Set up ZMQ kv event publisher
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment