Unverified Commit 6ebe34d6 authored by Max Hu's avatar Max Hu Committed by GitHub
Browse files

[Feature] Add iteration level logging and enhance nvtx marker (#31193)


Signed-off-by: default avatarMax Hu <maxhu@nvidia.com>
Signed-off-by: default avatarMax Hu <hyoung2991@gmail.com>
Co-authored-by: default avatarMax Hu <maxhu@nvidia.com>
parent 11cec296
...@@ -75,6 +75,12 @@ class ObservabilityConfig: ...@@ -75,6 +75,12 @@ class ObservabilityConfig:
enable_mfu_metrics: bool = False enable_mfu_metrics: bool = False
"""Enable Model FLOPs Utilization (MFU) metrics.""" """Enable Model FLOPs Utilization (MFU) metrics."""
enable_logging_iteration_details: bool = False
"""Enable detailed logging of iteration details.
If set, vllm EngineCore will log iteration details
This includes number of context/generation requests and tokens
and the elapsed cpu time for the iteration."""
@cached_property @cached_property
def collect_model_forward_time(self) -> bool: def collect_model_forward_time(self) -> bool:
"""Whether to collect model forward time for the request.""" """Whether to collect model forward time for the request."""
......
...@@ -525,6 +525,9 @@ class EngineArgs: ...@@ -525,6 +525,9 @@ class EngineArgs:
ObservabilityConfig.enable_layerwise_nvtx_tracing ObservabilityConfig.enable_layerwise_nvtx_tracing
) )
enable_mfu_metrics: bool = ObservabilityConfig.enable_mfu_metrics enable_mfu_metrics: bool = ObservabilityConfig.enable_mfu_metrics
enable_logging_iteration_details: bool = (
ObservabilityConfig.enable_logging_iteration_details
)
enable_mm_processor_stats: bool = ObservabilityConfig.enable_mm_processor_stats enable_mm_processor_stats: bool = ObservabilityConfig.enable_mm_processor_stats
scheduling_policy: SchedulerPolicy = SchedulerConfig.policy scheduling_policy: SchedulerPolicy = SchedulerConfig.policy
scheduler_cls: str | type[object] | None = SchedulerConfig.scheduler_cls scheduler_cls: str | type[object] | None = SchedulerConfig.scheduler_cls
...@@ -1059,6 +1062,10 @@ class EngineArgs: ...@@ -1059,6 +1062,10 @@ class EngineArgs:
"--enable-mfu-metrics", "--enable-mfu-metrics",
**observability_kwargs["enable_mfu_metrics"], **observability_kwargs["enable_mfu_metrics"],
) )
observability_group.add_argument(
"--enable-logging-iteration-details",
**observability_kwargs["enable_logging_iteration_details"],
)
# Scheduler arguments # Scheduler arguments
scheduler_kwargs = get_kwargs(SchedulerConfig) scheduler_kwargs = get_kwargs(SchedulerConfig)
...@@ -1713,6 +1720,7 @@ class EngineArgs: ...@@ -1713,6 +1720,7 @@ class EngineArgs:
enable_layerwise_nvtx_tracing=self.enable_layerwise_nvtx_tracing, enable_layerwise_nvtx_tracing=self.enable_layerwise_nvtx_tracing,
enable_mfu_metrics=self.enable_mfu_metrics, enable_mfu_metrics=self.enable_mfu_metrics,
enable_mm_processor_stats=self.enable_mm_processor_stats, enable_mm_processor_stats=self.enable_mm_processor_stats,
enable_logging_iteration_details=self.enable_logging_iteration_details,
) )
# Compilation config overrides # Compilation config overrides
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from dataclasses import dataclass from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from vllm._bc_linter import bc_linter_include from vllm._bc_linter import bc_linter_include
...@@ -151,6 +152,20 @@ class CachedRequestData: ...@@ -151,6 +152,20 @@ class CachedRequestData:
def num_reqs(self) -> int: def num_reqs(self) -> int:
return len(self.req_ids) return len(self.req_ids)
@cached_property
def _req_id_to_num_output_tokens(self) -> dict[str, int]:
"""Cache mapping of req_id to num_output_tokens for O(1) lookup.
This cached property is safe because CachedRequestData instances
are created fresh each scheduling iteration and not mutated during
computation of iteration details.
"""
return dict(zip(self.req_ids, self.num_output_tokens))
def is_context_phase(self, req_id: str) -> bool:
num_output_tokens = self._req_id_to_num_output_tokens.get(req_id)
return num_output_tokens is not None and num_output_tokens == 0
@classmethod @classmethod
def make_empty(cls) -> "CachedRequestData": def make_empty(cls) -> "CachedRequestData":
return cls( return cls(
......
...@@ -65,6 +65,7 @@ from vllm.v1.outputs import ModelRunnerOutput ...@@ -65,6 +65,7 @@ from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus from vllm.v1.request import Request, RequestStatus
from vllm.v1.serial_utils import MsgpackDecoder, MsgpackEncoder from vllm.v1.serial_utils import MsgpackDecoder, MsgpackEncoder
from vllm.v1.structured_output import StructuredOutputManager from vllm.v1.structured_output import StructuredOutputManager
from vllm.v1.utils import compute_iteration_details
from vllm.version import __version__ as VLLM_VERSION from vllm.version import __version__ as VLLM_VERSION
logger = init_logger(__name__) logger = init_logger(__name__)
...@@ -208,7 +209,6 @@ class EngineCore: ...@@ -208,7 +209,6 @@ class EngineCore:
self.async_scheduling = vllm_config.scheduler_config.async_scheduling self.async_scheduling = vllm_config.scheduler_config.async_scheduling
self.aborts_queue = queue.Queue[list[str]]() self.aborts_queue = queue.Queue[list[str]]()
# Mark the startup heap as static so that it's ignored by GC. # Mark the startup heap as static so that it's ignored by GC.
# Reduces pause times of oldest generation collections. # Reduces pause times of oldest generation collections.
freeze_gc_heap() freeze_gc_heap()
...@@ -337,6 +337,36 @@ class EngineCore: ...@@ -337,6 +337,36 @@ class EngineCore:
) )
raise err raise err
@contextmanager
def log_iteration_details(self, scheduler_output: SchedulerOutput):
if not self.vllm_config.observability_config.enable_logging_iteration_details:
yield
return
self._iteration_index = getattr(self, "_iteration_index", 0)
iteration_details = compute_iteration_details(scheduler_output)
before = time.monotonic()
yield
logger.info(
"".join(
[
"Iteration(",
str(self._iteration_index),
"): ",
str(iteration_details.num_ctx_requests),
" context requests, ",
str(iteration_details.num_ctx_tokens),
" context tokens, ",
str(iteration_details.num_generation_requests),
" generation requests, ",
str(iteration_details.num_generation_tokens),
" generation tokens, iteration elapsed time: ",
format((time.monotonic() - before) * 1000, ".2f"),
" ms",
]
)
)
self._iteration_index += 1
def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]: def step(self) -> tuple[dict[int, EngineCoreOutputs], bool]:
"""Schedule, execute, and make output. """Schedule, execute, and make output.
...@@ -351,7 +381,10 @@ class EngineCore: ...@@ -351,7 +381,10 @@ class EngineCore:
scheduler_output = self.scheduler.schedule() scheduler_output = self.scheduler.schedule()
future = self.model_executor.execute_model(scheduler_output, non_block=True) future = self.model_executor.execute_model(scheduler_output, non_block=True)
grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output) grammar_output = self.scheduler.get_grammar_bitmask(scheduler_output)
with self.log_error_detail(scheduler_output): with (
self.log_error_detail(scheduler_output),
self.log_iteration_details(scheduler_output),
):
model_output = future.result() model_output = future.result()
if model_output is None: if model_output is None:
model_output = self.model_executor.sample_tokens(grammar_output) model_output = self.model_executor.sample_tokens(grammar_output)
...@@ -447,7 +480,10 @@ class EngineCore: ...@@ -447,7 +480,10 @@ class EngineCore:
# Block until the next result is available. # Block until the next result is available.
future, scheduler_output, exec_model_fut = batch_queue.pop() future, scheduler_output, exec_model_fut = batch_queue.pop()
with self.log_error_detail(scheduler_output): with (
self.log_error_detail(scheduler_output),
self.log_iteration_details(scheduler_output),
):
model_output = future.result() model_output = future.result()
if model_output is None: if model_output is None:
# None from sample_tokens() implies that the original execute_model() # None from sample_tokens() implies that the original execute_model()
......
...@@ -7,6 +7,7 @@ import time ...@@ -7,6 +7,7 @@ import time
import weakref import weakref
from collections.abc import Callable, Sequence from collections.abc import Callable, Sequence
from contextlib import AbstractContextManager from contextlib import AbstractContextManager
from dataclasses import dataclass
from multiprocessing import connection from multiprocessing import connection
from multiprocessing.process import BaseProcess from multiprocessing.process import BaseProcess
from typing import ( from typing import (
...@@ -27,6 +28,7 @@ from vllm.logger import init_logger ...@@ -27,6 +28,7 @@ from vllm.logger import init_logger
from vllm.usage.usage_lib import UsageContext, is_usage_stats_enabled, usage_message from vllm.usage.usage_lib import UsageContext, is_usage_stats_enabled, usage_message
from vllm.utils.network_utils import get_open_port, get_open_zmq_ipc_path, get_tcp_uri from vllm.utils.network_utils import get_open_port, get_open_zmq_ipc_path, get_tcp_uri
from vllm.utils.system_utils import kill_process_tree from vllm.utils.system_utils import kill_process_tree
from vllm.v1.core.sched.output import SchedulerOutput
if TYPE_CHECKING: if TYPE_CHECKING:
import numpy as np import numpy as np
...@@ -412,3 +414,53 @@ def tensor_data(tensor: torch.Tensor) -> memoryview: ...@@ -412,3 +414,53 @@ def tensor_data(tensor: torch.Tensor) -> memoryview:
A memoryview of the tensor data as uint8. A memoryview of the tensor data as uint8.
""" """
return tensor.flatten().contiguous().view(torch.uint8).numpy().data return tensor.flatten().contiguous().view(torch.uint8).numpy().data
@dataclass
class IterationDetails:
num_ctx_requests: int
num_ctx_tokens: int
num_generation_requests: int
num_generation_tokens: int
def __repr__(self) -> str:
return f"IterationDetails(num_ctx_requests={self.num_ctx_requests},\
num_ctx_tokens={self.num_ctx_tokens}, \
num_generation_requests={self.num_generation_requests}, \
num_generation_tokens={self.num_generation_tokens})"
def compute_iteration_details(scheduler_output: SchedulerOutput) -> IterationDetails:
"""
Compute the number of context/generation requests and tokens
for the current iteration's scheduler output. A requests is regarded
as a context request if its output tokens are still 0, an extended chunk
of chunked prefill falls into this category.
Args:
scheduler_output: The scheduler output for the current iteration.
Returns:
An IterationDetails object containing the number of
context/generation requests and tokens.
"""
num_context_requests = 0
num_context_tokens = 0
num_generation_requests = 0
num_generation_tokens = 0
new_req_ids = {new_req.req_id for new_req in scheduler_output.scheduled_new_reqs}
for req_id, num_tokens in scheduler_output.num_scheduled_tokens.items():
if scheduler_output.scheduled_cached_reqs.is_context_phase(req_id) or (
req_id in new_req_ids
):
num_context_requests += 1
num_context_tokens += num_tokens
else:
num_generation_requests += 1
num_generation_tokens += num_tokens
return IterationDetails(
num_context_requests,
num_context_tokens,
num_generation_requests,
num_generation_tokens,
)
...@@ -50,7 +50,7 @@ from vllm.v1.outputs import ( ...@@ -50,7 +50,7 @@ from vllm.v1.outputs import (
DraftTokenIds, DraftTokenIds,
ModelRunnerOutput, ModelRunnerOutput,
) )
from vllm.v1.utils import report_usage_stats from vllm.v1.utils import compute_iteration_details, report_usage_stats
from vllm.v1.worker.utils import is_residual_scattered_for_sp from vllm.v1.worker.utils import is_residual_scattered_for_sp
from vllm.v1.worker.worker_base import WorkerBase from vllm.v1.worker.worker_base import WorkerBase
from vllm.v1.worker.workspace import init_workspace_manager from vllm.v1.worker.workspace import init_workspace_manager
...@@ -547,18 +547,29 @@ class Worker(WorkerBase): ...@@ -547,18 +547,29 @@ class Worker(WorkerBase):
def annotate_profile(self, scheduler_output): def annotate_profile(self, scheduler_output):
# add trace annotation so that we can easily distinguish # add trace annotation so that we can easily distinguish
# new/cached request numbers in each iteration # context/generation request numbers in each iteration.
# A context request is a request that has not yet generated any tokens
if not self.profiler: if not self.profiler:
return nullcontext() return nullcontext()
self.profiler.step() self.profiler.step()
num_new = len(scheduler_output.scheduled_new_reqs) iteration_details = compute_iteration_details(scheduler_output)
num_cached = len(scheduler_output.scheduled_cached_reqs.req_ids)
annotation = "".join(
return self.profiler.annotate_context_manager( [
f"execute_new_{num_new}_cached_{num_cached}" "execute_context_",
str(iteration_details.num_ctx_requests),
"(",
str(iteration_details.num_ctx_tokens),
")_generation_",
str(iteration_details.num_generation_requests),
"(",
str(iteration_details.num_generation_tokens),
")",
]
) )
return self.profiler.annotate_context_manager(annotation)
@torch.inference_mode() @torch.inference_mode()
def sample_tokens( def sample_tokens(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment