"lib/bindings/python/vscode:/vscode.git/clone" did not exist on "72ec5f5c7b9e3d482faa3485fa81994e99d33b22"
Unverified Commit 17abc9de authored by Hongkuan Zhou, committed by GitHub
Browse files

chore: expose inc id and add version to forwardpassmetric (#7501)


Signed-off-by: hongkuanz <hongkuanz@nvidia.com>
parent da2c5e76
......@@ -36,8 +36,14 @@ TODO: planner consuming these metrics instead of frontend/router metrics
from __future__ import annotations
import logging
import msgspec
logger = logging.getLogger(__name__)
FPM_VERSION: int = 1
class WelfordAccumulator:
"""Welford's online algorithm for count / sum / population-variance.
......@@ -156,12 +162,21 @@ class ForwardPassMetrics(
engine transitions from active to idle.
"""
# Schema version. Consumers must check this before interpreting
# the remaining fields. Bump when the schema changes incompatibly.
version: int = FPM_VERSION
# Unique worker identifier (Dynamo runtime connection_id).
worker_id: str = ""
# Data parallel rank. Each DP rank has its own scheduler and ZMQ port.
dp_rank: int = 0
# Monotonically increasing sequence number per (worker_id, dp_rank).
# Set by _FpmPublisherThread before encoding; 0 for messages that
# have not been stamped (e.g. unit-test fixtures).
counter_id: int = 0
# Wall-clock time of this iteration: from schedule() to update_from_output().
# Covers scheduling + model forward pass + output processing.
# 0.0 for idle heartbeat messages.
......@@ -182,5 +197,27 @@ def encode(metrics: ForwardPassMetrics) -> bytes:
return _encoder.encode(metrics)
def decode(data: bytes) -> ForwardPassMetrics:
    """Deserialize *data* with the module-level ``_decoder``.

    Any exception raised by the decoder propagates to the caller unchanged.
    """
    decoded = _decoder.decode(data)
    return decoded
class UnsupportedFpmVersionError(Exception):
    """Signal that a ForwardPassMetrics message carries an unrecognised version.

    The class adds nothing to ``Exception``; it exists so callers can catch
    this condition by type.
    """

    # NOTE(review): no raise site is visible in this excerpt -- confirm
    # where (or whether) this exception is actually raised.
def decode(data: bytes) -> ForwardPassMetrics | None:
    """Decode a ForwardPassMetrics message, returning None when it is unusable.

    Args:
        data: Serialized message bytes, handed to the module-level
            ``_decoder``.

    Returns:
        The decoded metrics object, or ``None`` when decoding raised or the
        decoded ``version`` field differs from ``FPM_VERSION``. A warning is
        logged in both cases so callers can simply skip the message instead
        of crashing.
    """
    try:
        metrics = _decoder.decode(data)
    except Exception:
        # Deliberately broad: any decode failure means "skip this message".
        # Attach the traceback so the underlying cause is not lost.
        logger.warning(
            "Failed to decode ForwardPassMetrics message, skipping",
            exc_info=True,
        )
        return None

    # Only messages stamped with the schema version this code was built
    # against are passed on to the caller.
    if metrics.version != FPM_VERSION:
        logger.warning(
            "Unsupported ForwardPassMetrics version %d (expected %d), skipping",
            metrics.version,
            FPM_VERSION,
        )
        return None

    return metrics
......@@ -16,13 +16,18 @@ Usage:
import argparse
import asyncio
import json
import logging
import os
import sys
import msgspec
from dynamo.common.forward_pass_metrics import decode
from dynamo.llm import FpmEventSubscriber
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
configure_dynamo_logging()
logger = logging.getLogger(__name__)
def main() -> None:
......@@ -54,8 +59,6 @@ def main() -> None:
async def run(args: argparse.Namespace) -> None:
from dynamo.llm import FpmEventSubscriber
loop = asyncio.get_running_loop()
event_plane = os.environ.get("DYN_EVENT_PLANE", "nats")
enable_nats = args.request_plane == "nats" or event_plane == "nats"
......@@ -67,26 +70,32 @@ async def run(args: argparse.Namespace) -> None:
subscriber = FpmEventSubscriber(endpoint)
json_encoder = msgspec.json.Encoder()
print(
f"Subscribed to forward-pass-metrics via event plane "
f"(namespace={args.namespace}, component={args.component}) "
f"Ctrl+C to stop",
file=sys.stderr,
logger.info(
"Subscribed to forward-pass-metrics via event plane "
"(namespace=%s, component=%s) Ctrl+C to stop",
args.namespace,
args.component,
)
seq = 0
try:
while True:
data = await asyncio.to_thread(subscriber.recv)
if data is None:
print("Stream closed.", file=sys.stderr)
logger.info("Stream closed.")
break
metrics = decode(data)
if metrics is None:
continue
pretty = json.loads(json_encoder.encode(metrics))
print(f"[seq={seq}] {json.dumps(pretty, indent=2)}", flush=True)
seq += 1
logger.info(
"[worker=%s dp=%d counter=%d] %s",
metrics.worker_id,
metrics.dp_rank,
metrics.counter_id,
json.dumps(pretty, indent=2),
)
except KeyboardInterrupt:
print("\nStopped.", file=sys.stderr)
logger.info("Stopped.")
finally:
subscriber.shutdown()
......
......@@ -25,6 +25,7 @@ import time
from itertools import count
from typing import TYPE_CHECKING
import msgspec.structs
import zmq
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.core.sched.scheduler import Scheduler
......@@ -37,6 +38,7 @@ from dynamo.common.forward_pass_metrics import (
WelfordAccumulator,
encode,
)
from dynamo.runtime.logging import configure_dynamo_logging
if TYPE_CHECKING:
from vllm.config import VllmConfig
......@@ -45,6 +47,7 @@ if TYPE_CHECKING:
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.structured_output import StructuredOutputManager
configure_dynamo_logging()
logger = logging.getLogger(__name__)
DEFAULT_FPM_PORT = 20380
......@@ -128,8 +131,10 @@ class _FpmPublisherThread:
continue
try:
seq = next(self._seq)
metrics = msgspec.structs.replace(metrics, counter_id=seq)
payload = encode(metrics)
seq_bytes = next(self._seq).to_bytes(8, "big")
seq_bytes = seq.to_bytes(8, "big")
self._pub.send_multipart((topic, seq_bytes, payload), flags=zmq.NOBLOCK)
last_publish = time.monotonic()
except zmq.Again:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment