Unverified Commit e1078789 authored by Alec's avatar Alec Committed by GitHub
Browse files

fix: add periodic vLLM engine stats logging and fix log routing (#6566)


Signed-off-by: alec-flowers <aflowers@nvidia.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
parent 5a7ead2b
...@@ -44,6 +44,7 @@ class VllmEngineMonitor: ...@@ -44,6 +44,7 @@ class VllmEngineMonitor:
self.engine_client = engine_client self.engine_client = engine_client
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self._monitor_task = asyncio.create_task(self._check_engine_health()) self._monitor_task = asyncio.create_task(self._check_engine_health())
self._stats_task = asyncio.create_task(self._periodic_log_stats())
logger.info( logger.info(
f"{self.__class__.__name__} initialized and health check task started." f"{self.__class__.__name__} initialized and health check task started."
...@@ -51,6 +52,7 @@ class VllmEngineMonitor: ...@@ -51,6 +52,7 @@ class VllmEngineMonitor:
def __del__(self): def __del__(self):
self._monitor_task.cancel() self._monitor_task.cancel()
self._stats_task.cancel()
def _shutdown_engine(self): def _shutdown_engine(self):
""" """
...@@ -117,3 +119,40 @@ class VllmEngineMonitor: ...@@ -117,3 +119,40 @@ class VllmEngineMonitor:
except asyncio.CancelledError: except asyncio.CancelledError:
logger.debug(f"{self.__class__.__name__}: Health check task cancelled.") logger.debug(f"{self.__class__.__name__}: Health check task cancelled.")
break break
async def _periodic_log_stats(self):
    """Periodically flush vLLM engine stats (throughput, cache usage, etc.).

    The period, in seconds, is read once from the ``VLLM_LOG_STATS_INTERVAL``
    environment variable (default ``10.0``):

    * an unparsable or NaN value logs a warning and falls back to ``10.0``;
    * a value ``<= 0`` disables the loop (the coroutine returns immediately).

    The coroutine also returns immediately when ``self.engine_client.log_stats``
    is falsy; a client without that attribute is treated as enabled.

    Each cycle waits one interval and then awaits
    ``self.engine_client.do_log_stats()``. When ``self.shutdown_event`` is
    provided, the wait is interrupted as soon as the event is set and the loop
    ends. ``asyncio.CancelledError`` is absorbed, so a cancelled task finishes
    normally instead of propagating the cancellation. Any other exception is
    logged at DEBUG level and the loop keeps running.
    """
    import math  # local import: only needed for the NaN check below

    raw_interval = os.environ.get("VLLM_LOG_STATS_INTERVAL", "10.0")
    try:
        interval = float(raw_interval)
    except ValueError:
        interval = math.nan
    # float("nan") parses successfully and compares False against 0, so it
    # would slip past the "disabled" check below and be handed to asyncio as
    # a timeout. Treat it exactly like an unparsable value.
    if math.isnan(interval):
        logger.warning(
            "Invalid VLLM_LOG_STATS_INTERVAL value: %r, using default 10.0",
            raw_interval,
        )
        interval = 10.0

    if interval <= 0:
        return
    if not getattr(self.engine_client, "log_stats", True):
        return

    while True:
        try:
            if self.shutdown_event:
                if self.shutdown_event.is_set():
                    break
                try:
                    # Returning without a timeout means shutdown was
                    # requested during the wait.
                    await asyncio.wait_for(
                        self.shutdown_event.wait(), timeout=interval
                    )
                    break
                except asyncio.TimeoutError:
                    # A full interval elapsed without shutdown: time to log.
                    pass
            else:
                await asyncio.sleep(interval)
            await self.engine_client.do_log_stats()
        except asyncio.CancelledError:
            break
        except Exception:
            # Best effort: a failed flush must not stop future flushes.
            logger.debug("Error in periodic stats logging", exc_info=True)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests for VllmEngineMonitor._periodic_log_stats."""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from dynamo.vllm.engine_monitor import VllmEngineMonitor
pytestmark = [
pytest.mark.unit,
pytest.mark.vllm,
pytest.mark.pre_merge,
]
@pytest.fixture
def mock_engine():
    """Provide an AsyncMock engine client that reports stats logging as enabled."""
    client = AsyncMock()
    client.check_health = AsyncMock()
    client.do_log_stats = AsyncMock()
    client.log_stats = True
    return client
def _make_monitor(engine, shutdown_event=None):
    """Build a VllmEngineMonitor without running its ``__init__``.

    The instance is allocated with ``object.__new__`` and its attributes are
    assigned by hand. Both task slots are filled with bare futures taken from
    the current event loop instead of real background tasks.
    """
    instance = object.__new__(VllmEngineMonitor)
    loop = asyncio.get_event_loop()
    instance.runtime = MagicMock()
    instance.engine_client = engine
    instance.shutdown_event = shutdown_event
    instance._monitor_task = loop.create_future()
    instance._stats_task = loop.create_future()
    return instance
@pytest.mark.asyncio
async def test_periodic_log_stats_calls_do_log_stats(mock_engine):
    """With a 50 ms interval, at least one flush happens within 150 ms."""
    stop_signal = asyncio.Event()
    monitor = _make_monitor(mock_engine, stop_signal)
    env_override = {"VLLM_LOG_STATS_INTERVAL": "0.05"}
    with patch.dict("os.environ", env_override):
        stats_task = asyncio.create_task(monitor._periodic_log_stats())
        await asyncio.sleep(0.15)
        stop_signal.set()
        await stats_task
    assert mock_engine.do_log_stats.call_count >= 1
@pytest.mark.asyncio
async def test_periodic_log_stats_skips_when_disabled(mock_engine):
    """The coroutine returns without flushing when ``log_stats`` is False."""
    mock_engine.log_stats = False
    monitor = _make_monitor(mock_engine)
    stats_task = asyncio.create_task(monitor._periodic_log_stats())
    await stats_task
    mock_engine.do_log_stats.assert_not_called()
@pytest.mark.asyncio
async def test_periodic_log_stats_skips_when_interval_zero(mock_engine):
    """An interval of 0 disables the loop: no flush is ever issued."""
    monitor = _make_monitor(mock_engine)
    env_override = {"VLLM_LOG_STATS_INTERVAL": "0"}
    with patch.dict("os.environ", env_override):
        stats_task = asyncio.create_task(monitor._periodic_log_stats())
        await stats_task
    mock_engine.do_log_stats.assert_not_called()
@pytest.mark.asyncio
async def test_periodic_log_stats_skips_when_interval_negative(mock_engine):
    """A negative interval disables the loop: no flush is ever issued."""
    monitor = _make_monitor(mock_engine)
    env_override = {"VLLM_LOG_STATS_INTERVAL": "-1"}
    with patch.dict("os.environ", env_override):
        stats_task = asyncio.create_task(monitor._periodic_log_stats())
        await stats_task
    mock_engine.do_log_stats.assert_not_called()
@pytest.mark.asyncio
async def test_periodic_log_stats_respects_shutdown_event(mock_engine):
    """Setting the shutdown event lets the stats task finish within a second."""
    stop_signal = asyncio.Event()
    monitor = _make_monitor(mock_engine, stop_signal)
    env_override = {"VLLM_LOG_STATS_INTERVAL": "0.05"}
    with patch.dict("os.environ", env_override):
        stats_task = asyncio.create_task(monitor._periodic_log_stats())
        await asyncio.sleep(0.02)
        stop_signal.set()
        # wait_for raises if the task fails to stop in time.
        await asyncio.wait_for(stats_task, timeout=1.0)
@pytest.mark.asyncio
async def test_periodic_log_stats_handles_exception(mock_engine):
    """A failing ``do_log_stats`` call does not stop later calls."""
    stop_signal = asyncio.Event()
    monitor = _make_monitor(mock_engine, stop_signal)
    attempts = 0

    async def fail_first_call():
        # Raise on the first invocation only; succeed afterwards.
        nonlocal attempts
        attempts += 1
        if attempts == 1:
            raise RuntimeError("transient error")

    mock_engine.do_log_stats = fail_first_call
    env_override = {"VLLM_LOG_STATS_INTERVAL": "0.05"}
    with patch.dict("os.environ", env_override):
        stats_task = asyncio.create_task(monitor._periodic_log_stats())
        await asyncio.sleep(0.2)
        stop_signal.set()
        await stats_task
    assert attempts >= 2
@pytest.mark.asyncio
async def test_periodic_log_stats_cancellation(mock_engine):
    """Cancelling the stats task makes it finish cleanly, with no exception."""
    monitor = _make_monitor(mock_engine)
    env_override = {"VLLM_LOG_STATS_INTERVAL": "0.05"}
    with patch.dict("os.environ", env_override):
        stats_task = asyncio.create_task(monitor._periodic_log_stats())
        await asyncio.sleep(0.02)
        stats_task.cancel()
        # The coroutine absorbs CancelledError and leaves its loop, so
        # awaiting it returns normally.
        await asyncio.wait_for(stats_task, timeout=1.0)
    assert stats_task.done()
    assert stats_task.exception() is None
@pytest.mark.asyncio
async def test_periodic_log_stats_no_shutdown_event(mock_engine):
    """Without a shutdown event the loop still flushes (plain sleep path)."""
    monitor = _make_monitor(mock_engine, shutdown_event=None)
    env_override = {"VLLM_LOG_STATS_INTERVAL": "0.05"}
    with patch.dict("os.environ", env_override):
        stats_task = asyncio.create_task(monitor._periodic_log_stats())
        await asyncio.sleep(0.12)
        stats_task.cancel()
        try:
            await stats_task
        except asyncio.CancelledError:
            pass
    assert mock_engine.do_log_stats.call_count >= 1
@pytest.mark.asyncio
async def test_periodic_log_stats_malformed_interval(mock_engine):
    """Stats task falls back to default 10s when VLLM_LOG_STATS_INTERVAL is invalid.

    Shutdown is requested 50 ms after start, long before the 10 s fallback
    interval could elapse, so the task must exit cleanly without having
    flushed stats at all. A flush here would mean the fallback interval was
    not applied.
    """
    shutdown_event = asyncio.Event()
    monitor = _make_monitor(mock_engine, shutdown_event)
    with patch.dict("os.environ", {"VLLM_LOG_STATS_INTERVAL": "not_a_number"}):
        # Should not crash — falls back to 10.0s default
        task = asyncio.create_task(monitor._periodic_log_stats())
        # Give it a moment to start (it will wait 10s, so stop it quickly)
        await asyncio.sleep(0.05)
        shutdown_event.set()
        await asyncio.wait_for(task, timeout=2.0)
    # Task ran without error and never reached a flush within the 10s fallback.
    assert task.exception() is None
    mock_engine.do_log_stats.assert_not_called()
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Regression tests for vLLM logging integration.
vLLM configures its own logger at import time (before dynamo can act).
These tests verify that dynamo reclaims the vllm logger in the main process,
that VLLM_LOGGING_LEVEL controls the vllm logger level (not DYN_LOG),
and that no config file is written (subprocesses use vLLM's built-in default).
"""
import logging
import os
import pytest
from dynamo.runtime.logging import (
VllmColorFormatter,
configure_dynamo_logging,
configure_vllm_logging,
)
pytestmark = [
pytest.mark.unit,
pytest.mark.vllm,
pytest.mark.pre_merge,
]
@pytest.fixture(autouse=True)
def _clean_env(monkeypatch):
    """Unset every logging-related environment variable ahead of each test."""
    logging_env_vars = (
        "DYN_LOG",
        "VLLM_LOGGING_LEVEL",
        "VLLM_CONFIGURE_LOGGING",
        "VLLM_LOGGING_CONFIG_PATH",
        "DYN_SKIP_SGLANG_LOG_FORMATTING",
        "DYN_SKIP_TRTLLM_LOG_FORMATTING",
        "SGLANG_LOGGING_CONFIG_PATH",
        "TLLM_LOG_LEVEL",
    )
    for env_name in logging_env_vars:
        # raising=False: a variable that is already absent is not an error.
        monkeypatch.delenv(env_name, raising=False)
@pytest.fixture(autouse=True)
def _clean_loggers():
    """Return the root and ``vllm`` loggers to a known state around each test."""

    def _restore_defaults():
        root_logger = logging.getLogger(None)
        root_logger.handlers.clear()
        root_logger.setLevel(logging.WARNING)

        vllm_logger = logging.getLogger("vllm")
        vllm_logger.handlers.clear()
        vllm_logger.setLevel(logging.NOTSET)
        vllm_logger.propagate = True

        # Drop any filters a previous test left on the AsyncLLM logger.
        logging.getLogger("vllm.v1.engine.async_llm").filters.clear()

    _restore_defaults()
    yield
    _restore_defaults()
def _simulate_vllm_import_time_logger():
"""Reproduce what vLLM's _configure_vllm_root_logger() does at import time."""
vllm_logger = logging.getLogger("vllm")
vllm_logger.addHandler(logging.StreamHandler())
vllm_logger.propagate = False
return vllm_logger
def test_dictconfig_replaces_vllm_stream_handler():
    """
    Regression: the StreamHandler vLLM installs at import time must be swapped
    by configure_vllm_logging() for a single StreamHandler that formats with
    VllmColorFormatter.
    """
    vllm_logger = _simulate_vllm_import_time_logger()

    configure_vllm_logging(logging.INFO)

    assert len(vllm_logger.handlers) == 1
    sole_handler = vllm_logger.handlers[0]
    assert isinstance(sole_handler, logging.StreamHandler)
    assert isinstance(sole_handler.formatter, VllmColorFormatter)
def test_vllm_logging_level_controls_vllm_logger(monkeypatch):
    """
    With VLLM_LOGGING_LEVEL=DEBUG the vllm logger ends up at DEBUG even though
    configure_vllm_logging() is called with INFO.
    """
    monkeypatch.setenv("VLLM_LOGGING_LEVEL", "DEBUG")

    configure_vllm_logging(logging.INFO)

    vllm_logger = logging.getLogger("vllm")
    first_handler = vllm_logger.handlers[0]
    assert vllm_logger.level == logging.DEBUG
    assert isinstance(first_handler, logging.StreamHandler)
    assert isinstance(first_handler.formatter, VllmColorFormatter)
def test_dyn_log_does_not_affect_vllm_level(monkeypatch):
    """
    Setting only DYN_LOG=debug leaves the vllm logger at its default INFO
    level after configure_dynamo_logging() runs.
    """
    monkeypatch.setenv("DYN_LOG", "debug")

    configure_dynamo_logging()

    vllm_logger = logging.getLogger("vllm")
    first_handler = vllm_logger.handlers[0]
    assert vllm_logger.level == logging.INFO
    assert isinstance(first_handler, logging.StreamHandler)
    assert isinstance(first_handler.formatter, VllmColorFormatter)
def test_no_config_file_written():
    """
    configure_vllm_logging() sets VLLM_CONFIGURE_LOGGING=1 but must leave
    VLLM_LOGGING_CONFIG_PATH unset, so no logging config file is handed to
    subprocesses.
    """
    configure_vllm_logging(logging.INFO)

    env = os.environ
    assert "VLLM_LOGGING_CONFIG_PATH" not in env
    assert env.get("VLLM_CONFIGURE_LOGGING") == "1"
def test_health_check_filter_applied():
    """
    configure_vllm_logging() attaches exactly one filter to the
    vllm.v1.engine.async_llm logger; that filter rejects DEBUG records coming
    from check_health and accepts the same record at WARNING.
    """
    configure_vllm_logging(logging.INFO)

    target_logger = logging.getLogger("vllm.v1.engine.async_llm")
    assert len(target_logger.filters) == 1

    # Build a DEBUG record that looks like it came from check_health.
    health_record = logging.LogRecord(
        name="vllm.v1.engine.async_llm",
        level=logging.DEBUG,
        pathname="",
        lineno=0,
        msg="Called check_health.",
        args=(),
        exc_info=None,
    )
    health_record.funcName = "check_health"

    installed_filter = target_logger.filters[0]
    assert isinstance(installed_filter, logging.Filter)
    assert not installed_filter.filter(health_record)

    # Raising the severity on the same record must let it through.
    health_record.levelno = logging.WARNING
    assert installed_filter.filter(health_record)
def test_health_check_filter_not_duplicated():
    """
    Three consecutive configure_vllm_logging() calls still leave a single
    filter on the vllm.v1.engine.async_llm logger.
    """
    for _ in range(3):
        configure_vllm_logging(logging.INFO)

    target_logger = logging.getLogger("vllm.v1.engine.async_llm")
    assert len(target_logger.filters) == 1
...@@ -15,8 +15,10 @@ ...@@ -15,8 +15,10 @@
import json import json
import logging import logging
import logging.config
import os import os
import tempfile import tempfile
from datetime import datetime, timezone
from dynamo._core import log_message from dynamo._core import log_message
...@@ -44,6 +46,57 @@ class LogHandler(logging.Handler): ...@@ -44,6 +46,57 @@ class LogHandler(logging.Handler):
) )
class _HealthCheckFilter(logging.Filter):
"""Suppress DEBUG-level check_health messages from vLLM's AsyncLLM.
Dynamo's VllmEngineMonitor calls check_health every 2s which floods
the logs at DEBUG level. vLLM's own server doesn't have this issue
because it doesn't run a periodic health check loop.
"""
def filter(self, record):
return not (
record.funcName == "check_health" and record.levelno <= logging.DEBUG
)
class VllmColorFormatter(logging.Formatter):
    """Formatter that matches Rust tracing's compact colored output style.

    Used for vLLM logs routed through a StreamHandler (bypassing the Rust
    bridge) so that VLLM_LOGGING_LEVEL is respected independently of DYN_LOG
    while still producing visually consistent colored output.

    Each record is rendered as ``<utc timestamp> <level> <module[.func]>: <message>``.
    Exception and stack information attached to the record is appended on the
    following lines, as the base ``logging.Formatter`` does.
    """

    # ANSI color codes matching Rust tracing's defaults
    _COLORS = {
        "DEBUG": "\033[2m",  # dim
        "INFO": "\033[32m",  # green
        "WARNING": "\033[33m",  # yellow
        "ERROR": "\033[31m",  # red
    }
    _DIM = "\033[2m"
    _RESET = "\033[0m"

    def format(self, record):
        """Return the colored single-line rendering of *record*, plus any traceback."""
        ts = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%S.%fZ"
        )
        level = record.levelname
        # Levels without an entry (e.g. CRITICAL) are emitted uncolored.
        color = self._COLORS.get(level, "")
        if record.funcName and record.funcName != "<module>":
            target = f"{record.module}.{record.funcName}"
        else:
            target = record.module
        msg = record.getMessage()
        line = (
            f"{self._DIM}{ts}{self._RESET} "
            f"{color}{level:>5}{self._RESET} "
            f"{self._DIM}{target}{self._RESET}{self._DIM}:{self._RESET} "
            f"{msg}"
        )

        # Overriding format() bypasses the base class's traceback handling,
        # so it is reproduced here; otherwise logger.exception() output would
        # lose its traceback.
        if record.exc_info and not record.exc_text:
            # Cache on the record, as logging.Formatter.format does.
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            if not line.endswith("\n"):
                line += "\n"
            line += record.exc_text
        if record.stack_info:
            if not line.endswith("\n"):
                line += "\n"
            line += self.formatStack(record.stack_info)
        return line
# Configure the Python logger to use the NimLogHandler # Configure the Python logger to use the NimLogHandler
def configure_logger(service_name: str | None, worker_id: int | None): def configure_logger(service_name: str | None, worker_id: int | None):
""" """
...@@ -93,8 +146,7 @@ def configure_dynamo_logging( ...@@ -93,8 +146,7 @@ def configure_dynamo_logging(
dyn_level = log_level_mapping(dyn_var) dyn_level = log_level_mapping(dyn_var)
# configure inference engine loggers # configure inference engine loggers
if not get_bool_env_var("DYN_SKIP_VLLM_LOG_FORMATTING"): configure_vllm_logging(dyn_level)
configure_vllm_logging(dyn_level)
if not get_bool_env_var("DYN_SKIP_SGLANG_LOG_FORMATTING"): if not get_bool_env_var("DYN_SKIP_SGLANG_LOG_FORMATTING"):
configure_sglang_logging(dyn_level) configure_sglang_logging(dyn_level)
if not get_bool_env_var("DYN_SKIP_TRTLLM_LOG_FORMATTING"): if not get_bool_env_var("DYN_SKIP_TRTLLM_LOG_FORMATTING"):
...@@ -165,34 +217,59 @@ def configure_sglang_logging(dyn_level: int): ...@@ -165,34 +217,59 @@ def configure_sglang_logging(dyn_level: int):
def configure_vllm_logging(dyn_level: int): def configure_vllm_logging(dyn_level: int):
""" """
vLLM requires a logging config file to be set in the environment. Configure vLLM logging for the main process and subprocesses.
This function creates a temporary file with the VLLM logging config and sets the
VLLM_LOGGING_CONFIG_PATH environment variable to the path of the file. Main process: replaces vLLM's StreamHandler with a new StreamHandler that
""" uses VllmColorFormatter and writes directly to stderr. This bypasses the
Rust LogHandler bridge so that VLLM_LOGGING_LEVEL is respected independently
of DYN_LOG (the Rust bridge filters based on DYN_LOG).
Subprocesses (EngineCore, workers): use vLLM's DEFAULT_LOGGING_CONFIG
(StreamHandler to stderr) since the Rust runtime is not initialized there.
Setting VLLM_CONFIGURE_LOGGING=1 without VLLM_LOGGING_CONFIG_PATH causes
vLLM to use its built-in default config in spawned subprocesses.
The dyn_level param is kept for signature compatibility but does not control
the vLLM logger level. Use VLLM_LOGGING_LEVEL env var instead.
"""
os.environ["VLLM_CONFIGURE_LOGGING"] = "1" os.environ["VLLM_CONFIGURE_LOGGING"] = "1"
vllm_level = logging.getLevelName(dyn_level)
# Create a temporary config file for VLLM # vLLM level is controlled exclusively by VLLM_LOGGING_LEVEL.
vllm_config = { # DYN_LOG controls dynamo logging only — it does not affect vLLM.
"formatters": {"simple": {"format": "%(message)s"}}, vllm_level = os.environ.get("VLLM_LOGGING_LEVEL", "INFO").upper()
# Use a StreamHandler to stderr with VllmColorFormatter (colored output
# matching Rust tracing style). This bypasses the Rust env_filter so
# VLLM_LOGGING_LEVEL is fully independent of DYN_LOG.
main_config = {
"formatters": {
"vllm": {
"()": "dynamo.runtime.logging.VllmColorFormatter",
}
},
"handlers": { "handlers": {
"dynamo": { "vllm_stderr": {
"class": "dynamo.runtime.logging.LogHandler", "class": "logging.StreamHandler",
"formatter": "simple", "formatter": "vllm",
"level": vllm_level, "stream": "ext://sys.stderr",
} }
}, },
"loggers": { "loggers": {
"vllm": {"handlers": ["dynamo"], "level": vllm_level, "propagate": False} "vllm": {
"handlers": ["vllm_stderr"],
"level": vllm_level,
"propagate": False,
}
}, },
"version": 1, "version": 1,
"disable_existing_loggers": False, "disable_existing_loggers": False,
} }
logging.config.dictConfig(main_config)
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: # Add health-check filter (idempotent — skips if already present).
json.dump(vllm_config, f) async_llm_logger = logging.getLogger("vllm.v1.engine.async_llm")
os.environ["VLLM_LOGGING_CONFIG_PATH"] = f.name if not any(isinstance(f, _HealthCheckFilter) for f in async_llm_logger.filters):
async_llm_logger.addFilter(_HealthCheckFilter())
def map_dyn_log_to_tllm_level(dyn_log_value: str) -> str: def map_dyn_log_to_tllm_level(dyn_log_value: str) -> str:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment