"docs/source/deployment/frameworks/helm.md" did not exist on "fe2e10c71b98a43ccde0e8aba0d4fe0d23369538"
Unverified Commit 6831650c authored by Ming Yang's avatar Ming Yang Committed by GitHub
Browse files

[offloader] v2: Hide weight onloading latency via prefetching (#29941)


Signed-off-by: default avatarMing Yang <minos.future@gmail.com>
Signed-off-by: default avatarMichael Goin <mgoin64@gmail.com>
Co-authored-by: default avatarMichael Goin <mgoin64@gmail.com>
parent ed42507f
#!/usr/bin/env bash
set -euxo pipefail
# Nightly e2e test for prefetch offloading with a MoE model.
# Runs DeepSeek-V2-Lite with prefetch offloading of MoE expert weights
# and validates that GSM8K accuracy is at least THRESHOLD (this script
# does not run a no-offloading baseline itself).
#
# args: [THRESHOLD] [NUM_QUESTIONS] [START_PORT]
THRESHOLD=${1:-0.25}
NUM_Q=${2:-1319}
PORT=${3:-8030}
OUT_DIR=${OUT_DIR:-/tmp/vllm-scheduled}
mkdir -p "${OUT_DIR}"
# Poll http://127.0.0.1:<port>/health once per second until curl succeeds.
# The whole polling loop runs under `timeout 600`.
#
# args: PORT
wait_for_server() {
  local port=$1
  local health_url="http://127.0.0.1:${port}/health"
  # The URL is handed to the inner shell as $1 instead of being spliced
  # into the script text.
  timeout 600 bash -c '
    until curl -sf "$1" > /dev/null; do
      sleep 1
    done' _ "${health_url}"
}
MODEL="deepseek-ai/DeepSeek-V2-Lite"
# Stop the background server recorded in SERVER_PID, if it is still alive.
# Sends the default `kill` signal, polls up to 20 times at 0.5s intervals
# for the process to disappear, then sends `kill -9` regardless.
# Always returns 0 so it is safe to call from the EXIT trap under `set -e`.
cleanup() {
  local pid="${SERVER_PID:-}"

  # Nothing to do when no server was started or it has already exited.
  [[ -n "${pid}" ]] || return 0
  kill -0 "${pid}" 2>/dev/null || return 0

  kill "${pid}" 2>/dev/null || true
  local attempt
  for attempt in {1..20}; do
    if ! kill -0 "${pid}" 2>/dev/null; then
      break
    fi
    sleep 0.5
  done
  kill -9 "${pid}" 2>/dev/null || true
}
trap cleanup EXIT
# Launch the server in the background with prefetch offloading restricted
# to the w13_weight / w2_weight parameters; the EXIT trap tears it down.
vllm serve "$MODEL" \
  --max-model-len 2048 \
  --offload-group-size 8 \
  --offload-num-in-group 2 \
  --offload-prefetch-step 1 \
  --offload-params w13_weight w2_weight \
  --port "$PORT" &
SERVER_PID=$!
wait_for_server "$PORT"

# Build a filename-friendly tag from the model name.
# The previous set '/: \\n' was parsed by tr as "/", ":", " ", a backslash
# and the letter "n", so every "n" in a model name was replaced by "_".
# Use a real "\n" escape, and printf so no trailing newline is fed to tr
# (it would otherwise become a trailing "_").
TAG=$(printf '%s' "$MODEL" | tr '/: \n' '____')
OUT="${OUT_DIR}/${TAG}_prefetch_offload.json"

python3 tests/evals/gsm8k/gsm8k_eval.py \
  --host http://127.0.0.1 \
  --port "$PORT" \
  --num-questions "${NUM_Q}" \
  --save-results "${OUT}"

# Fail the job when the saved accuracy is below THRESHOLD.
# The heredoc delimiter is unquoted on purpose: ${OUT}, ${MODEL} and
# ${THRESHOLD} are expanded by the shell before Python sees the script.
python3 - <<PY
import json; acc=json.load(open('${OUT}'))['accuracy']
print(f"${MODEL} prefetch_offload: accuracy {acc:.3f}")
assert acc >= ${THRESHOLD}, f"${MODEL} prefetch_offload accuracy {acc}"
PY

# Stop the server now and clear the PID so the EXIT trap has nothing to do.
cleanup
SERVER_PID=
......@@ -28,3 +28,12 @@ steps:
working_dir: "/vllm-workspace"
commands:
- bash .buildkite/scripts/scheduled_integration_test/qwen30b_a3b_fp8_block_ep_eplb.sh 0.8 200 8020 2 1
- label: DeepSeek V2-Lite Prefetch Offload Accuracy (H100)
timeout_in_minutes: 60
device: h100
optional: true
num_devices: 1
working_dir: "/vllm-workspace"
commands:
- bash .buildkite/scripts/scheduled_integration_test/deepseek_v2_lite_prefetch_offload.sh 0.25 200 8030
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Test prefetch offloading correctness with Llama model."""
from ..utils import compare_two_settings
def test_prefetch_offload_llama():
    """Compare Llama-3.2-1B-Instruct with and without prefetch offloading.

    Two argument sets are handed to ``compare_two_settings``:

    1. Prefetch offloading (group_size=8, num_in_group=2, prefetch_step=1),
       restricted to the MLP weights ``gate_up_proj`` and ``down_proj``.
    2. Baseline: no extra arguments, i.e. no offloading.

    This exercises prefetching-based offloading on a dense model.
    """
    model = "meta-llama/Llama-3.2-1B-Instruct"

    # Prefetch offloading configuration.
    prefetch_offload_args = [
        "--offload-group-size",
        "8",
        "--offload-num-in-group",
        "2",
        "--offload-prefetch-step",
        "1",
    ]
    # Selective offloading: only MLP weights.
    prefetch_offload_args += [
        "--offload-params",
        "gate_up_proj",
        "down_proj",
    ]

    # Baseline: no offloading.
    baseline_args = []

    compare_two_settings(model, prefetch_offload_args, baseline_args)
......@@ -17,6 +17,7 @@ from vllm.config import CUDAGraphMode, VllmConfig
from vllm.distributed.device_communicators.pynccl_allocator import set_graph_pool_id
from vllm.forward_context import BatchDescriptor, get_forward_context
from vllm.logger import init_logger
from vllm.model_executor.offloader.base import get_offloader
from vllm.platforms import current_platform
from vllm.utils.torch_utils import current_stream, weak_ref_tensors
......@@ -265,6 +266,11 @@ class CUDAGraphWrapper:
set_graph_pool_id(self.graph_pool)
else:
set_graph_pool_id(current_platform.graph_pool_handle())
# Sync offloader's copy stream before capture.
# Ensure any pre-capture prefetches from offloader are complete.
get_offloader().sync_prev_onload()
# mind-exploding: carefully manage the reference and memory.
with torch.cuda.graph(
cudagraph,
......@@ -273,6 +279,11 @@ class CUDAGraphWrapper:
):
# `output` is managed by pytorch's cudagraph pool
output = self.runnable(*args, **kwargs)
# Join offloader's copy stream after forward to avoid
# unjoined stream error. The last layer's start_prefetch
# forks copy_stream, but wait_prefetch only happens in
# the next forward pass.
get_offloader().join_after_forward()
if self.cudagraph_options.weak_ref_output:
# by converting it to weak ref,
# the original `output` will immediately be released
......@@ -305,5 +316,8 @@ class CUDAGraphWrapper:
f"got {new_input_addresses}"
)
# Sync offloader before replay - ensures any external dependencies
# from pre-capture prefetches are satisfied.
get_offloader().sync_prev_onload()
entry.cudagraph.replay()
return entry.output
......@@ -24,6 +24,12 @@ from vllm.config.model import (
)
from vllm.config.multimodal import MultiModalConfig
from vllm.config.observability import ObservabilityConfig
from vllm.config.offload import (
OffloadBackend,
OffloadConfig,
PrefetchOffloadConfig,
UVAOffloadConfig,
)
from vllm.config.parallel import EPLBConfig, ParallelConfig
from vllm.config.pooler import PoolerConfig
from vllm.config.profiler import ProfilerConfig
......@@ -85,6 +91,11 @@ __all__ = [
"MultiModalConfig",
# From vllm.config.observability
"ObservabilityConfig",
# From vllm.config.offload
"OffloadBackend",
"OffloadConfig",
"PrefetchOffloadConfig",
"UVAOffloadConfig",
# From vllm.config.parallel
"EPLBConfig",
"ParallelConfig",
......
......@@ -100,17 +100,15 @@ class CacheConfig:
load a 13B model with BF16 weight, which requires at least 26GB GPU memory.
Note that this requires fast CPU-GPU interconnect, as part of the model is
loaded from CPU memory to GPU memory on the fly in each model forward pass.
DEPRECATED: This field is deprecated and will be removed in v0.16.
Please use OffloadConfig.uva.cpu_offload_gb instead.
"""
cpu_offload_params: set[str] = Field(default_factory=set)
""" The set of parameter name segments to target for CPU offloading.
Unmatched parameters are not offloaded. If this set is empty, parameters
are offloaded non-selectively until the memory limit defined by
`cpu_offload_gb` is reached.
Examples:
- For parameter name "mlp.experts.w2_weight":
- "experts" or "experts.w2_weight" will match.
- "expert" or "w2" will NOT match (must be exact segments).
This allows distinguishing parameters like "w2_weight" and "w2_weight_scale".
"""The set of parameter name segments to target for CPU offloading.
DEPRECATED: This field is deprecated and will be removed in v0.16.
Please use OffloadConfig.uva.cpu_offload_params instead.
"""
calculate_kv_scales: bool = False
"""This enables dynamic calculation of `k_scale` and `v_scale` when
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Configuration for model weight offloading."""
import warnings
from typing import Literal
from pydantic import Field, model_validator
from vllm.config.utils import config
OffloadBackend = Literal["auto", "uva", "prefetch"]
@config
class UVAOffloadConfig:
    """Configuration for UVA (Unified Virtual Addressing) CPU offloading.

    Uses zero-copy access from CPU-pinned memory. Simple but requires
    fast CPU-GPU interconnect.
    """

    # `ge=0`: negative sizes are rejected by field validation; 0 disables.
    cpu_offload_gb: float = Field(default=0, ge=0)
    """The space in GiB to offload to CPU, per GPU. Default is 0, which means
    no offloading. Intuitively, this argument can be seen as a virtual way to
    increase the GPU memory size. For example, if you have one 24 GB GPU and
    set this to 10, virtually you can think of it as a 34 GB GPU. Then you can
    load a 13B model with BF16 weight, which requires at least 26GB GPU memory.
    Note that this requires fast CPU-GPU interconnect, as part of the model is
    loaded from CPU memory to GPU memory on the fly in each model forward pass.
    This uses UVA (Unified Virtual Addressing) for zero-copy access.
    """

    # Built via `default_factory` so each config gets its own empty set.
    cpu_offload_params: set[str] = Field(default_factory=set)
    """The set of parameter name segments to target for CPU offloading.
    Unmatched parameters are not offloaded. If this set is empty, parameters
    are offloaded non-selectively until the memory limit defined by
    `cpu_offload_gb` is reached.
    Examples:
    - For parameter name "mlp.experts.w2_weight":
    - "experts" or "experts.w2_weight" will match.
    - "expert" or "w2" will NOT match (must be exact segments).
    This allows distinguishing parameters like "w2_weight" and "w2_weight_scale".
    """
@config
class PrefetchOffloadConfig:
    """Configuration for prefetch-based CPU offloading.

    Groups layers and uses async H2D prefetch to hide transfer latency.
    """

    # 0 keeps prefetch offloading disabled; negative values are rejected.
    offload_group_size: int = Field(default=0, ge=0)
    """Group every N layers together. Offload last `offload_num_in_group`
    layers of each group. Default is 0 (disabled).
    Example: group_size=8, num_in_group=2 offloads layers 6,7,14,15,22,23,...
    Unlike cpu_offload_gb, this uses explicit async prefetching to hide transfer
    latency.
    """

    # The `<= offload_group_size` constraint is cross-field and is checked by
    # OffloadConfig.validate_offload_config, not by this Field.
    offload_num_in_group: int = Field(default=1, ge=1)
    """Number of layers to offload per group.
    Must be <= offload_group_size. Default is 1."""

    # NOTE: the Field itself accepts 0 (`ge=0`), but
    # OffloadConfig.validate_offload_config raises if this is < 1 whenever
    # prefetch offloading is enabled (backend "prefetch" or group_size > 0).
    offload_prefetch_step: int = Field(default=1, ge=0)
    """Number of layers to prefetch ahead.
    Higher values hide more latency but use more GPU memory. Default is 1."""

    # Built via `default_factory` so each config gets its own empty set.
    offload_params: set[str] = Field(default_factory=set)
    """The set of parameter name segments to target for prefetch offloading.
    Unmatched parameters are not offloaded. If this set is empty, ALL
    parameters of each offloaded layer are offloaded.
    Uses segment matching: "w13_weight" matches "mlp.experts.w13_weight"
    but not "mlp.experts.w13_weight_scale".
    """
@config
class OffloadConfig:
    """Configuration for model weight offloading to reduce GPU memory usage."""

    offload_backend: OffloadBackend = "auto"
    """The backend for weight offloading. Options:
    - "auto": Selects based on which sub-config has non-default values
    (prefetch if offload_group_size > 0, uva if cpu_offload_gb > 0).
    - "uva": UVA (Unified Virtual Addressing) zero-copy offloading.
    - "prefetch": Async prefetch with group-based layer offloading.
    """

    uva: UVAOffloadConfig = Field(default_factory=UVAOffloadConfig)
    """Parameters for UVA offloading backend."""

    prefetch: PrefetchOffloadConfig = Field(default_factory=PrefetchOffloadConfig)
    """Parameters for prefetch offloading backend."""

    @model_validator(mode="after")
    def validate_offload_config(self) -> "OffloadConfig":
        """Check cross-field constraints and warn about ignored settings.

        Raises:
            ValueError: If prefetch offloading is in play (backend is
                "prefetch" or ``offload_group_size > 0``) and either
                ``offload_num_in_group`` exceeds ``offload_group_size`` or
                ``offload_prefetch_step`` is below 1.

        Returns:
            The validated config itself.
        """
        backend = self.offload_backend
        group_size = self.prefetch.offload_group_size
        num_in_group = self.prefetch.offload_num_in_group
        prefetch_step = self.prefetch.offload_prefetch_step
        prefetch_active = group_size > 0

        if backend == "prefetch" or prefetch_active:
            if num_in_group > group_size:
                raise ValueError(
                    f"offload_num_in_group ({num_in_group})"
                    f" must be <= offload_group_size"
                    f" ({group_size})"
                )
            if prefetch_step < 1:
                raise ValueError(
                    f"offload_prefetch_step"
                    f" ({prefetch_step})"
                    f" must be >= 1 when prefetch offloading is enabled"
                    f" (offload_group_size > 0)"
                )

        # Warn when a sub-config carries non-default values that the chosen
        # backend will not use.
        uva_active = self.uva.cpu_offload_gb > 0
        conflict = None
        if backend == "uva":
            if prefetch_active:
                conflict = (
                    "Prefetch offload fields are set but offload_backend='uva'. "
                    "Prefetch settings will be ignored."
                )
        elif backend == "prefetch":
            if uva_active:
                conflict = (
                    "UVA offload fields are set but offload_backend='prefetch'. "
                    "UVA settings will be ignored."
                )
        elif backend == "auto" and uva_active and prefetch_active:
            conflict = (
                "Both UVA and prefetch offload fields are set with "
                "offload_backend='auto'. Prefetch backend will be selected. "
                "Set offload_backend explicitly to suppress this warning."
            )
        if conflict is not None:
            warnings.warn(conflict, stacklevel=2)
        return self

    def compute_hash(self) -> str:
        """
        Provide a hash that uniquely identifies all the offload configs.

        All fields are included because PrefetchOffloader patches module
        forwards and inserts custom ops (wait_prefetch, start_prefetch)
        into the computation graph. Changing any offload setting can
        alter which layers are hooked and how prefetch indices are
        computed, so the compilation cache must distinguish them.
        """
        from vllm.config.utils import get_hash_factors, hash_factors

        # No field is excluded from the hash.
        return hash_factors(get_hash_factors(self, ignored_factors=set()))
......@@ -37,6 +37,7 @@ from .load import LoadConfig
from .lora import LoRAConfig
from .model import ModelConfig
from .observability import ObservabilityConfig
from .offload import OffloadConfig
from .parallel import ParallelConfig
from .profiler import ProfilerConfig
from .scheduler import SchedulerConfig
......@@ -259,6 +260,8 @@ class VllmConfig:
"""Device configuration."""
load_config: LoadConfig = Field(default_factory=LoadConfig)
"""Load configuration."""
offload_config: OffloadConfig = Field(default_factory=OffloadConfig)
"""Model weight offloading configuration."""
attention_config: AttentionConfig = Field(default_factory=AttentionConfig)
"""Attention configuration."""
kernel_config: KernelConfig = Field(default_factory=KernelConfig)
......@@ -361,6 +364,10 @@ class VllmConfig:
vllm_factors.append(self.load_config.compute_hash())
else:
vllm_factors.append("None")
if self.offload_config:
vllm_factors.append(self.offload_config.compute_hash())
else:
vllm_factors.append("None")
if self.attention_config:
vllm_factors.append(self.attention_config.compute_hash())
else:
......
......@@ -48,12 +48,15 @@ from vllm.config import (
ModelConfig,
MultiModalConfig,
ObservabilityConfig,
OffloadConfig,
ParallelConfig,
PoolerConfig,
PrefetchOffloadConfig,
ProfilerConfig,
SchedulerConfig,
SpeculativeConfig,
StructuredOutputsConfig,
UVAOffloadConfig,
VllmConfig,
WeightTransferConfig,
get_attr_docs,
......@@ -439,8 +442,13 @@ class EngineArgs:
disable_sliding_window: bool = ModelConfig.disable_sliding_window
disable_cascade_attn: bool = ModelConfig.disable_cascade_attn
swap_space: float = CacheConfig.swap_space
cpu_offload_gb: float = CacheConfig.cpu_offload_gb
cpu_offload_params: set[str] = get_field(CacheConfig, "cpu_offload_params")
offload_backend: str = OffloadConfig.offload_backend
cpu_offload_gb: float = UVAOffloadConfig.cpu_offload_gb
cpu_offload_params: set[str] = get_field(UVAOffloadConfig, "cpu_offload_params")
offload_group_size: int = PrefetchOffloadConfig.offload_group_size
offload_num_in_group: int = PrefetchOffloadConfig.offload_num_in_group
offload_prefetch_step: int = PrefetchOffloadConfig.offload_prefetch_step
offload_params: set[str] = get_field(PrefetchOffloadConfig, "offload_params")
gpu_memory_utilization: float = CacheConfig.gpu_memory_utilization
kv_cache_memory_bytes: int | None = CacheConfig.kv_cache_memory_bytes
max_num_batched_tokens: int | None = None
......@@ -948,10 +956,6 @@ class EngineArgs:
cache_group.add_argument(
"--prefix-caching-hash-algo", **cache_kwargs["prefix_caching_hash_algo"]
)
cache_group.add_argument("--cpu-offload-gb", **cache_kwargs["cpu_offload_gb"])
cache_group.add_argument(
"--cpu-offload-params", **cache_kwargs["cpu_offload_params"]
)
cache_group.add_argument(
"--calculate-kv-scales", **cache_kwargs["calculate_kv_scales"]
)
......@@ -977,6 +981,37 @@ class EngineArgs:
"--kv-offloading-backend", **cache_kwargs["kv_offloading_backend"]
)
# Model weight offload related configs
offload_kwargs = get_kwargs(OffloadConfig)
uva_kwargs = get_kwargs(UVAOffloadConfig)
prefetch_kwargs = get_kwargs(PrefetchOffloadConfig)
offload_group = parser.add_argument_group(
title="OffloadConfig",
description=OffloadConfig.__doc__,
)
offload_group.add_argument(
"--offload-backend", **offload_kwargs["offload_backend"]
)
offload_group.add_argument("--cpu-offload-gb", **uva_kwargs["cpu_offload_gb"])
offload_group.add_argument(
"--cpu-offload-params", **uva_kwargs["cpu_offload_params"]
)
offload_group.add_argument(
"--offload-group-size",
**prefetch_kwargs["offload_group_size"],
)
offload_group.add_argument(
"--offload-num-in-group",
**prefetch_kwargs["offload_num_in_group"],
)
offload_group.add_argument(
"--offload-prefetch-step",
**prefetch_kwargs["offload_prefetch_step"],
)
offload_group.add_argument(
"--offload-params", **prefetch_kwargs["offload_params"]
)
# Multimodal related configs
multimodal_kwargs = get_kwargs(MultiModalConfig)
multimodal_group = parser.add_argument_group(
......@@ -1466,8 +1501,6 @@ class EngineArgs:
sliding_window=sliding_window,
enable_prefix_caching=self.enable_prefix_caching,
prefix_caching_hash_algo=self.prefix_caching_hash_algo,
cpu_offload_gb=self.cpu_offload_gb,
cpu_offload_params=self.cpu_offload_params,
calculate_kv_scales=self.calculate_kv_scales,
kv_sharing_fast_prefill=self.kv_sharing_fast_prefill,
mamba_cache_dtype=self.mamba_cache_dtype,
......@@ -1825,6 +1858,21 @@ class EngineArgs:
compilation_config.max_cudagraph_capture_size = (
self.max_cudagraph_capture_size
)
offload_config = OffloadConfig(
offload_backend=self.offload_backend,
uva=UVAOffloadConfig(
cpu_offload_gb=self.cpu_offload_gb,
cpu_offload_params=self.cpu_offload_params,
),
prefetch=PrefetchOffloadConfig(
offload_group_size=self.offload_group_size,
offload_num_in_group=self.offload_num_in_group,
offload_prefetch_step=self.offload_prefetch_step,
offload_params=self.offload_params,
),
)
config = VllmConfig(
model_config=model_config,
cache_config=cache_config,
......@@ -1832,6 +1880,7 @@ class EngineArgs:
scheduler_config=scheduler_config,
device_config=device_config,
load_config=load_config,
offload_config=offload_config,
attention_config=attention_config,
kernel_config=kernel_config,
lora_config=lora_config,
......
......@@ -170,6 +170,19 @@ class LLM:
the model weights. This virtually increases the GPU memory space
you can use to hold the model weights, at the cost of CPU-GPU data
transfer for every forward pass.
offload_group_size: Prefetch offloading: Group every N layers
together. Offload last `offload_num_in_group` layers of each group.
Default is 0 (disabled).
offload_num_in_group: Prefetch offloading: Number of layers to
offload per group. Default is 1.
offload_prefetch_step: Prefetch offloading: Number of layers to
prefetch ahead. Higher values hide more latency but use more GPU
memory. Default is 1.
offload_params: Prefetch offloading: Set of parameter name segments
to selectively offload. Only parameters whose names contain one of
these segments will be offloaded (e.g., {"gate_up_proj", "down_proj"}
for MLP weights, or {"w13_weight", "w2_weight"} for MoE expert
weights). If None or empty, all parameters are offloaded.
enforce_eager: Whether to enforce eager execution. If True, we will
disable CUDA graph and always execute the model in eager mode.
If False, we will use CUDA graph and eager execution in hybrid.
......@@ -224,6 +237,10 @@ class LLM:
gpu_memory_utilization: float = 0.9,
swap_space: float = 4,
cpu_offload_gb: float = 0,
offload_group_size: int = 0,
offload_num_in_group: int = 1,
offload_prefetch_step: int = 1,
offload_params: set[str] | None = None,
enforce_eager: bool = False,
enable_return_routed_experts: bool = False,
disable_custom_all_reduce: bool = False,
......@@ -333,6 +350,10 @@ class LLM:
kv_cache_memory_bytes=kv_cache_memory_bytes,
swap_space=swap_space,
cpu_offload_gb=cpu_offload_gb,
offload_group_size=offload_group_size,
offload_num_in_group=offload_num_in_group,
offload_prefetch_step=offload_prefetch_step,
offload_params=offload_params or set(),
enforce_eager=enforce_eager,
enable_return_routed_experts=enable_return_routed_experts,
disable_custom_all_reduce=disable_custom_all_reduce,
......
......@@ -9,11 +9,9 @@ from typing import Any, Literal, Protocol, overload
import torch
import torch.nn as nn
from torch.func import functional_call
from torch.nn.modules.module import register_module_module_registration_hook
from transformers import PretrainedConfig
import vllm.envs as envs
from vllm.config import VllmConfig
from vllm.distributed import (
get_tensor_model_parallel_rank,
......@@ -31,14 +29,11 @@ from vllm.model_executor.models.interfaces import supports_any_eagle
from vllm.multimodal import NestedTensors
from vllm.sequence import IntermediateTensors
from vllm.utils.math_utils import cdiv
from vllm.utils.mem_utils import format_gib
from vllm.utils.platform_utils import (
is_pin_memory_available,
is_uva_available,
)
from vllm.utils.torch_utils import (
direct_register_custom_op,
get_accelerator_view_from_cpu_tensor,
)
logger = init_logger(__name__)
......@@ -612,98 +607,6 @@ class PPMissingLayer(torch.nn.Identity):
return args[0] if args else next(iter(kwargs.values()))
_CPU_OFFLOAD_BYTES = 0
_CPU_OFFLOAD_MAX_BYTES = 0
_CPU_OFFLOAD_PARAMS = set()
def set_cpu_offload_max_bytes(max_bytes: int) -> None:
    """Set the CPU-offload byte budget and reset the running byte counter.

    Args:
        max_bytes: New value for the module-level ``_CPU_OFFLOAD_MAX_BYTES``.
            ``_CPU_OFFLOAD_BYTES`` is reset to 0 at the same time.
    """
    global _CPU_OFFLOAD_BYTES, _CPU_OFFLOAD_MAX_BYTES
    _CPU_OFFLOAD_MAX_BYTES, _CPU_OFFLOAD_BYTES = max_bytes, 0
def set_cpu_offload_params(params: set[str]) -> None:
    """Replace the module-level set of parameter-name segments to offload.

    Args:
        params: Name segments used by ``maybe_offload_to_cpu`` to decide which
            parameters are offloaded. The set is stored by reference, not
            copied.
    """
    global _CPU_OFFLOAD_PARAMS
    _CPU_OFFLOAD_PARAMS = params
def maybe_offload_to_cpu(module: torch.nn.Module) -> torch.nn.Module:
    """Offload parameters of ``module`` to CPU while the byte budget allows.

    The module is returned untouched when it has no parameters, when its
    first parameter already lives on the CPU, or when the global budget
    (``_CPU_OFFLOAD_MAX_BYTES``) is already used up. Otherwise parameters are
    moved one by one until the budget is reached, optionally restricted to
    names matching ``_CPU_OFFLOAD_PARAMS``.

    With UVA enabled, ``p.data`` becomes an accelerator view of the CPU
    tensor. Without UVA, ``p.data`` is the CPU tensor and ``module.forward``
    is replaced by a wrapper that copies the state dict to the original
    device for each call.

    Args:
        module: Module whose parameters may be offloaded (modified in place).

    Returns:
        The same ``module`` object.
    """
    if (params := next(module.parameters(), None)) is None:
        return module
    # The device of the first parameter is taken as the module's device.
    device = params.device
    if device == torch.device("cpu"):
        return module
    global _CPU_OFFLOAD_MAX_BYTES, _CPU_OFFLOAD_BYTES
    if _CPU_OFFLOAD_BYTES >= _CPU_OFFLOAD_MAX_BYTES:
        return module
    pin_memory = (
        is_pin_memory_available() and not envs.VLLM_WEIGHT_OFFLOADING_DISABLE_PIN_MEMORY
    )
    uva_offloading = is_uva_available() and not envs.VLLM_WEIGHT_OFFLOADING_DISABLE_UVA
    # offload parameters to CPU
    # use pin_memory if possible, which helps cudagraph capture speed
    offloaded_parameters = False
    for name, p in module.named_parameters():
        if _CPU_OFFLOAD_BYTES >= _CPU_OFFLOAD_MAX_BYTES:
            # we use per-parameter offloading
            # one module might have some parameters offloaded and some not
            break
        if _CPU_OFFLOAD_PARAMS:
            # Check if parameter belongs to the offloading set
            # Add dots here to ensure we match full segments only
            # e.g., "experts.w2_weight" matches "mlp.experts.w2_weight" but not
            # "mlp.experts.w2_weight_scale"
            should_offload = any(
                f".{param}." in f".{name}." for param in _CPU_OFFLOAD_PARAMS
            )
            if not should_offload:
                continue
        cpu_data = p.data.to(device="cpu")
        if pin_memory:
            cpu_data = cpu_data.pin_memory()
        if not uva_offloading:
            p.data = cpu_data
        else:
            p.data = get_accelerator_view_from_cpu_tensor(cpu_data)
            # Tag the parameter so other code can recognise UVA-backed data.
            p._vllm_is_uva_offloaded = True
        # Charge this parameter's size against the global budget.
        _CPU_OFFLOAD_BYTES += p.data.numel() * p.data.element_size()
        offloaded_parameters = True
    if offloaded_parameters and not uva_offloading:
        original_forward = module.forward

        def forward(*args, **kwargs):
            # Restore the real forward while running so that functional_call
            # does not re-enter this wrapper.
            module.forward = original_forward
            device_state = {
                # here we blindly call `to(device)`
                # if the parameter is already on the device, it will be a no-op
                k: v.to(device, non_blocking=True)
                for k, v in module.state_dict().items()
            }
            # set `tie_weights=False` as tied weights in original model
            # become untied when calling .to(device) individually
            output = functional_call(
                module, device_state, args=args, kwargs=kwargs, tie_weights=False
            )
            # Re-install the wrapper for the next call.
            module.forward = forward
            return output

        module.forward = forward
    return module
def make_layers(
num_hidden_layers: int,
layer_fn: LayerFn,
......@@ -711,25 +614,31 @@ def make_layers(
) -> tuple[int, int, torch.nn.ModuleList]:
"""Make a list of layers with the given layer function, taking
pipeline parallelism into account.
Args:
num_hidden_layers: Total number of hidden layers in the model.
layer_fn: Function to create a layer given its index.
prefix: Prefix for layer names.
Returns:
Tuple of (start_layer, end_layer, modules).
"""
from vllm.distributed.parallel_state import get_pp_group
from vllm.distributed.utils import get_pp_indices
from vllm.model_executor.offloader import get_offloader
start_layer, end_layer = get_pp_indices(
num_hidden_layers, get_pp_group().rank_in_group, get_pp_group().world_size
)
modules = torch.nn.ModuleList(
[PPMissingLayer() for _ in range(start_layer)]
+ [
maybe_offload_to_cpu(layer_fn(prefix=f"{prefix}.{idx}"))
for idx in range(start_layer, end_layer)
]
+ get_offloader().wrap_modules(
layer_fn(prefix=f"{prefix}.{idx}") for idx in range(start_layer, end_layer)
)
+ [PPMissingLayer() for _ in range(end_layer, num_hidden_layers)]
)
if _CPU_OFFLOAD_MAX_BYTES > 0:
logger.info(
"Total CPU offloaded parameters: %s GBs", format_gib(_CPU_OFFLOAD_BYTES)
)
return start_layer, end_layer, modules
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Model parameter offloading infrastructure."""
from vllm.model_executor.offloader.base import (
BaseOffloader,
NoopOffloader,
create_offloader,
get_offloader,
set_offloader,
)
from vllm.model_executor.offloader.prefetch import PrefetchOffloader
from vllm.model_executor.offloader.uva import UVAOffloader
__all__ = [
"BaseOffloader",
"NoopOffloader",
"UVAOffloader",
"PrefetchOffloader",
"create_offloader",
"get_offloader",
"set_offloader",
]
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# Adapted from
# https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/utils/offloader.py
"""Base classes for model parameter offloading."""
from abc import ABC, abstractmethod
from collections.abc import Generator
from typing import TYPE_CHECKING
import torch.nn as nn
from vllm.logger import init_logger
if TYPE_CHECKING:
from vllm.config import OffloadConfig
logger = init_logger(__name__)
"""
class relation:
BaseOffloader (ABC)
* implemented by: UVAOffloader
* implemented by: PrefetchOffloader
* uses: _ModuleOffloader
* uses: _BaseParamOffloader (ABC)
* implemented by: _CpuParamOffloader
"""
class BaseOffloader(ABC):
    """Abstract interface for model parameter offloading strategies.

    An offloader decides how model parameters are stored and loaded during
    inference; strategies trade memory for compute/transfer time. Only
    ``wrap_modules`` is abstract; every other hook defaults to a no-op.
    """

    @abstractmethod
    def wrap_modules(
        self,
        modules_generator: Generator[nn.Module, None, None],
    ) -> list[nn.Module]:
        """Wrap modules with offloading logic.

        Args:
            modules_generator: Generator yielding modules to potentially
                offload.

        Returns:
            List of modules, potentially with offloading hooks installed.
        """

    def post_init(self):
        """Hook called after model construction completes.

        Offloaders can use this to:
        - Finalize parameter storage
        - Start initial prefetching
        - Allocate shared resources

        The base implementation does nothing.
        """

    def sync_prev_onload(self) -> None:  # noqa: B027
        """Sync previous onload operations. No-op here; override in subclasses."""

    def join_after_forward(self) -> None:  # noqa: B027
        """Join streams after forward. No-op here; override in subclasses."""

    def _wait_for_layer(self, layer_idx: int) -> None:  # noqa: B027
        """Wait for layer prefetch. No-op here; override in subclasses."""

    def _start_prefetch(self, layer_idx: int) -> None:  # noqa: B027
        """Start layer prefetch. No-op here; override in subclasses."""
class NoopOffloader(BaseOffloader):
    """Offloader that performs no offloading at all."""

    def wrap_modules(
        self,
        modules_generator: Generator[nn.Module, None, None],
    ) -> list[nn.Module]:
        """Materialize the generator into a list, leaving modules untouched."""
        return [*modules_generator]
# Global singleton offloader instance (defaults to no-op).
_instance: BaseOffloader = NoopOffloader()
def get_offloader() -> BaseOffloader:
    """Get the global offloader instance.

    Returns:
        The module-level ``_instance``: a ``NoopOffloader`` until
        ``set_offloader`` replaces it.
    """
    return _instance
def set_offloader(instance: BaseOffloader) -> None:
    """Set the global offloader instance.

    Rebinds the module-level ``_instance`` returned by ``get_offloader()``
    and logs the class name of the new offloader at INFO level.

    Args:
        instance: Offloader to install as the global instance.
    """
    global _instance
    _instance = instance
    logger.info("Offloader set to %s", type(instance).__name__)
def create_offloader(offload_config: "OffloadConfig") -> BaseOffloader:
    """Build the offloader selected by ``offload_config``.

    The explicit ``offload_backend`` value decides. With ``"auto"`` the
    choice is: prefetch if ``offload_group_size > 0``, else UVA if
    ``cpu_offload_gb > 0``, else a no-op offloader.

    Args:
        offload_config: Offload configuration with ``uva`` and ``prefetch``
            sub-configs.

    Returns:
        A ``PrefetchOffloader``, ``UVAOffloader`` or ``NoopOffloader``.
    """
    from vllm.model_executor.offloader.prefetch import PrefetchOffloader
    from vllm.model_executor.offloader.uva import UVAOffloader

    prefetch_cfg = offload_config.prefetch
    uva_cfg = offload_config.uva
    selected = offload_config.offload_backend

    if selected == "auto":
        # Prefetch wins when both sub-configs are active; if neither is,
        # `selected` stays "auto" and falls through to the no-op offloader.
        if prefetch_cfg.offload_group_size > 0:
            selected = "prefetch"
        elif uva_cfg.cpu_offload_gb > 0:
            selected = "uva"

    if selected == "prefetch":
        return PrefetchOffloader(
            group_size=prefetch_cfg.offload_group_size,
            num_in_group=prefetch_cfg.offload_num_in_group,
            prefetch_step=prefetch_cfg.offload_prefetch_step,
            offload_params=prefetch_cfg.offload_params,
            mode="cpu",
        )

    if selected == "uva":
        # cpu_offload_gb is expressed in GiB; convert to whole bytes.
        max_bytes = int(uva_cfg.cpu_offload_gb * 1024**3)
        return UVAOffloader(
            cpu_offload_max_bytes=max_bytes,
            cpu_offload_params=uva_cfg.cpu_offload_params,
        )

    return NoopOffloader()
This diff is collapsed.
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Custom ops for prefetch offloader torch.compile + CUDA graph compatibility.
These ops use mutates_args to create data dependencies that prevent
the compiler from reordering prefetch/sync operations.
"""
from __future__ import annotations
import torch
from vllm.model_executor.offloader.base import get_offloader
from vllm.utils.torch_utils import direct_register_custom_op
# --- wait_prefetch op ---
def _wait_prefetch_impl(
    input_tensor: torch.Tensor,
    layer_idx: int,
) -> None:
    """Wait for prefetch of layer_idx to complete.

    Synchronizes the compute stream with the copy stream to ensure
    the prefetched weights are ready for use.

    Args:
        input_tensor: Input to the layer (e.g., hidden_states) - declared
            as mutated to create data dependency for torch.compile.
            It is not read or written by this function body.
        layer_idx: Index of the layer to wait for.
    """
    # Delegates to whichever offloader is currently installed globally.
    get_offloader()._wait_for_layer(layer_idx)
def _wait_prefetch_fake(
input_tensor: torch.Tensor,
layer_idx: int,
) -> None:
"""Fake implementation for torch.compile tracing."""
return
# --- start_prefetch op ---
def _start_prefetch_impl(
    output_tensor: torch.Tensor,
    layer_idx: int,
) -> None:
    """Start async prefetch of layer_idx weights.

    Initiates H2D copy on the copy stream for the specified layer.

    Args:
        output_tensor: Output from forward - declared as mutated to
            prevent torch.compile from reordering this op before the
            computation that produces output_tensor. It is not read or
            written by this function body.
        layer_idx: Index of the layer to prefetch.
    """
    # Delegates to whichever offloader is currently installed globally.
    get_offloader()._start_prefetch(layer_idx)
def _start_prefetch_fake(
output_tensor: torch.Tensor,
layer_idx: int,
) -> None:
"""Fake implementation for torch.compile tracing."""
return
def register_prefetch_offloader_ops() -> None:
    """Register the ``wait_prefetch`` and ``start_prefetch`` custom ops.

    Must be called before the ops are used. This is typically done
    at module import time.
    """
    # (op name, real implementation, argument declared as mutated, fake impl)
    op_specs = (
        (
            "wait_prefetch",
            _wait_prefetch_impl,
            "input_tensor",
            _wait_prefetch_fake,
        ),
        (
            "start_prefetch",
            _start_prefetch_impl,
            "output_tensor",
            _start_prefetch_fake,
        ),
    )
    for op_name, impl, mutated_arg, fake in op_specs:
        direct_register_custom_op(
            op_name=op_name,
            op_func=impl,
            mutates_args=[mutated_arg],
            fake_impl=fake,
        )
# Register ops at module import time
register_prefetch_offloader_ops()
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""UVA-based CPU offloading using Unified Virtual Addressing."""
from collections.abc import Generator
import torch
import torch.nn as nn
from torch.func import functional_call
import vllm.envs as envs
from vllm.logger import init_logger
from vllm.model_executor.offloader.base import BaseOffloader
from vllm.utils.mem_utils import format_gib
from vllm.utils.platform_utils import is_pin_memory_available, is_uva_available
from vllm.utils.torch_utils import get_accelerator_view_from_cpu_tensor
logger = init_logger(__name__)
class UVAOffloader(BaseOffloader):
    """Offloader using Unified Virtual Addressing (UVA) for zero-copy access.

    This offloader moves parameters to pinned CPU memory and creates CUDA views
    using UVA. The GPU can then directly access the CPU memory without explicit
    transfers, at the cost of PCIe bandwidth (slower than GPU memory).

    When UVA is disabled via env var, falls back to a functional_call-based
    approach that moves parameters on-demand.

    Args:
        cpu_offload_max_bytes: Maximum bytes to offload to CPU.
        cpu_offload_params: Set of parameter name segments to selectively
            offload. If empty, all parameters are eligible up to the byte limit.
    """

    def __init__(
        self,
        cpu_offload_max_bytes: int,
        cpu_offload_params: set[str] | None = None,
    ):
        self.cpu_offload_max_bytes = cpu_offload_max_bytes
        # Running total of offloaded bytes, compared against the budget above.
        self.cpu_offload_bytes = 0
        # ``None`` and an empty set both mean "no name filter".
        self.cpu_offload_params = cpu_offload_params or set()

        # Both features require platform support and can additionally be
        # switched off through their respective environment variables.
        self.pin_memory = (
            is_pin_memory_available()
            and not envs.VLLM_WEIGHT_OFFLOADING_DISABLE_PIN_MEMORY
        )
        self.uva_offloading = (
            is_uva_available() and not envs.VLLM_WEIGHT_OFFLOADING_DISABLE_UVA
        )

    def wrap_modules(
        self,
        modules_generator: Generator[nn.Module, None, None],
    ) -> list[nn.Module]:
        """Wrap modules with UVA offloading.

        Args:
            modules_generator: Modules to consider for offloading, in order.

        Returns:
            The same modules, in the same order, after each one has been
            passed through ``_maybe_offload_to_cpu``.
        """
        modules = [self._maybe_offload_to_cpu(module) for module in modules_generator]
        if self.cpu_offload_bytes > 0:
            logger.info(
                "Total CPU offloaded parameters: %s",
                format_gib(self.cpu_offload_bytes),
            )
        return modules

    def _maybe_offload_to_cpu(self, module: nn.Module) -> nn.Module:
        """Offload module parameters to CPU using UVA if budget allows.

        Parameters are offloaded one at a time until the byte budget is
        reached, so a module may end up only partially offloaded. When UVA is
        not in use and at least one parameter was offloaded, ``module.forward``
        is replaced by a wrapper that copies the module state to the original
        device for each call.

        Args:
            module: The module whose parameters may be offloaded.

        Returns:
            The same ``module`` object, modified in place where applicable.
        """
        if (params := next(module.parameters(), None)) is None:
            return module

        device = params.device

        if device == torch.device("cpu"):
            return module

        if self.cpu_offload_bytes >= self.cpu_offload_max_bytes:
            return module

        # offload parameters to CPU
        # use pin_memory if possible, which helps cudagraph capture speed
        offloaded_parameters = False
        for name, p in module.named_parameters():
            if self.cpu_offload_bytes >= self.cpu_offload_max_bytes:
                # we use per-parameter offloading
                # one module might have some parameters offloaded and some not
                break

            if self.cpu_offload_params:
                # Check if parameter belongs to the offloading set
                # Add dots here to ensure we match full segments only
                # e.g., "experts.w2_weight" matches "mlp.experts.w2_weight"
                # but not "mlp.experts.w2_weight_scale"
                should_offload = any(
                    f".{param}." in f".{name}." for param in self.cpu_offload_params
                )
                if not should_offload:
                    continue

            cpu_data = p.data.to(device="cpu")
            if self.pin_memory:
                cpu_data = cpu_data.pin_memory()

            if not self.uva_offloading:
                p.data = cpu_data
            else:
                p.data = get_accelerator_view_from_cpu_tensor(cpu_data)
                p._vllm_is_uva_offloaded = True

            self.cpu_offload_bytes += p.data.numel() * p.data.element_size()
            offloaded_parameters = True

        if offloaded_parameters and not self.uva_offloading:
            original_forward = module.forward

            def forward(*args, **kwargs):
                # Temporarily restore the real forward so that functional_call
                # runs the module's own logic instead of re-entering this
                # wrapper.
                module.forward = original_forward
                try:
                    device_state = {
                        # here we blindly call `to(device)`
                        # if the parameter is already on the device,
                        # it will be a no-op
                        k: v.to(device, non_blocking=True)
                        for k, v in module.state_dict().items()
                    }

                    # set `tie_weights=False` as tied weights in original model
                    # become untied when calling .to(device) individually
                    return functional_call(
                        module,
                        device_state,
                        args=args,
                        kwargs=kwargs,
                        tie_weights=False,
                    )
                finally:
                    # Re-install the wrapper even when the forward pass raises;
                    # otherwise later calls would bypass the on-demand copy and
                    # run against the CPU-resident parameters.
                    module.forward = forward

            module.forward = forward

        return module
......@@ -12,6 +12,7 @@ from vllm.config import VllmConfig
from vllm.config.compilation import CUDAGraphMode
from vllm.distributed.parallel_state import graph_capture, is_global_first_rank
from vllm.forward_context import BatchDescriptor, set_forward_context
from vllm.model_executor.offloader.base import get_offloader
from vllm.utils.math_utils import cdiv
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.worker.gpu.attn_utils import (
......@@ -189,6 +190,11 @@ class CudaGraphManager:
# Capture the graph.
assert num_tokens not in self.graphs
graph = torch.cuda.CUDAGraph()
# Sync offloader's copy stream before capture.
# Ensure any pre-capture prefetches from offloader are complete.
get_offloader().sync_prev_onload()
with (
set_forward_context(
attn_metadata=attn_metadata,
......@@ -205,6 +211,11 @@ class CudaGraphManager:
positions=positions,
inputs_embeds=inputs_embeds,
)
# Join offloader's copy stream after forward to avoid unjoined
# stream error. The last layer's start_prefetch forks copy_stream,
# but wait_prefetch only happens in the next forward pass.
get_offloader().join_after_forward()
if self.use_aux_hidden_state_outputs:
hidden_states, aux_hidden_states = model_output
else:
......@@ -329,6 +340,13 @@ class CudaGraphManager:
self, num_tokens: int
) -> torch.Tensor | tuple[torch.Tensor, list[torch.Tensor]]:
assert num_tokens in self.graphs, f"No cudagraph for {num_tokens} tokens"
# Sync offloader before replay - needed when transitioning from
# eager/piecewise to full cudagraph (e.g., prefill → decode).
# The previous eager iteration's start_prefetch may have queued
# H2D copies on copy_stream that the graph's captured events
# cannot see. Without this, replay could overwrite static buffers
# while those copies are still in flight.
get_offloader().sync_prev_onload()
self.graphs[num_tokens].replay()
assert self.hidden_states is not None
hidden_states = self.hidden_states[:num_tokens]
......
......@@ -7,6 +7,7 @@ import torch
from vllm.config import VllmConfig
from vllm.config.compilation import CUDAGraphMode
from vllm.model_executor.offloader.base import get_offloader
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.worker.gpu.block_table import BlockTables
from vllm.v1.worker.gpu.cudagraph_utils import (
......@@ -115,6 +116,11 @@ class EagleCudaGraphManager:
) -> None:
assert num_tokens not in self.graphs
graph = torch.cuda.CUDAGraph()
# Sync offloader's copy stream before capture.
# Ensure any pre-capture prefetches from offloader are complete.
get_offloader().sync_prev_onload()
with torch.cuda.graph(graph, self.pool):
generate_fn(
num_reqs,
......@@ -124,6 +130,10 @@ class EagleCudaGraphManager:
num_tokens_across_dp,
CUDAGraphMode.NONE,
)
# Join offloader's copy stream after forward to avoid unjoined
# stream error. The last layer's start_prefetch forks copy_stream,
# but wait_prefetch only happens in the next forward pass.
get_offloader().join_after_forward()
self.graphs[num_tokens] = graph
def _capture_piecewise_graph(
......@@ -171,4 +181,11 @@ class EagleCudaGraphManager:
def run_fullgraph(self, num_tokens: int) -> None:
    """Replay the full CUDA graph stored for ``num_tokens`` tokens.

    Args:
        num_tokens: Key of the graph to replay; a graph must already be
            stored under it in ``self.graphs``.
    """
    assert num_tokens in self.graphs

    # The offloader has to be synced ahead of the replay. This matters when
    # switching from eager/piecewise execution to a full cudagraph (e.g.,
    # prefill → decode): the preceding eager iteration's start_prefetch may
    # have queued H2D copies on copy_stream, and the events captured in the
    # graph cannot see them. Skipping the sync could let the replay
    # overwrite static buffers while those copies are still in flight.
    get_offloader().sync_prev_onload()

    stored_graph = self.graphs[num_tokens]
    stored_graph.replay()
......@@ -81,6 +81,11 @@ from vllm.model_executor.models.interfaces_base import (
is_pooling_model,
is_text_generation_model,
)
from vllm.model_executor.offloader import (
create_offloader,
get_offloader,
set_offloader,
)
from vllm.multimodal import MULTIMODAL_REGISTRY
from vllm.multimodal.encoder_budget import MultiModalBudget
from vllm.multimodal.inputs import (
......@@ -378,6 +383,7 @@ class GPUModelRunner(
self.vllm_config = vllm_config
self.model_config = vllm_config.model_config
self.cache_config = vllm_config.cache_config
self.offload_config = vllm_config.offload_config
self.compilation_config = vllm_config.compilation_config
self.lora_config = vllm_config.lora_config
self.load_config = vllm_config.load_config
......@@ -386,14 +392,6 @@ class GPUModelRunner(
self.speculative_config = vllm_config.speculative_config
self.observability_config = vllm_config.observability_config
from vllm.model_executor.models.utils import (
set_cpu_offload_max_bytes,
set_cpu_offload_params,
)
set_cpu_offload_max_bytes(int(self.cache_config.cpu_offload_gb * 1024**3))
set_cpu_offload_params(self.cache_config.cpu_offload_params)
model_config = self.model_config
cache_config = self.cache_config
scheduler_config = self.scheduler_config
......@@ -749,6 +747,10 @@ class GPUModelRunner(
pin_memory=self.pin_memory,
)
# Model weight offloader
# Make sure this is called before any get_offloader call
set_offloader(create_offloader(self.offload_config))
# Ephemeral state transferred between execute_model() and sample_tokens().
self.execute_model_state: ExecuteModelState | None = None
self.kv_connector_output: KVConnectorOutput | None = None
......@@ -4342,6 +4344,8 @@ class GPUModelRunner(
self.model, self.vllm_config, CUDAGraphMode.NONE, self.device
)
get_offloader().post_init()
def _get_eagle3_aux_layers_from_config(self) -> tuple[int, ...] | None:
"""Extract Eagle3 auxiliary layer indices from speculative config.
......@@ -5780,7 +5784,7 @@ class GPUModelRunner(
if block_sizes != [self.cache_config.block_size] or kernel_block_sizes != [
self.cache_config.block_size
]:
assert self.cache_config.cpu_offload_gb == 0, (
assert self.offload_config.uva.cpu_offload_gb == 0, (
"Cannot re-initialize the input batch when CPU weight "
"offloading is enabled. See https://github.com/vllm-project/vllm/pull/18298 " # noqa: E501
"for more details."
......
......@@ -20,6 +20,7 @@ from vllm.forward_context import (
override_forward_context,
)
from vllm.logger import init_logger
from vllm.model_executor.offloader.base import get_offloader
from vllm.platforms import current_platform
from vllm.sequence import IntermediateTensors
from vllm.utils.import_utils import has_deep_gemm
......@@ -239,6 +240,11 @@ class UBatchWrapper:
set_graph_pool_id(self.graph_pool)
else:
set_graph_pool_id(current_platform.graph_pool_handle())
# Sync offloader's copy stream before capture.
# Ensure any pre-capture prefetches from offloader are complete.
get_offloader().sync_prev_onload()
with torch.cuda.graph(
cudagraph_metadata.cudagraph,
stream=compute_stream,
......@@ -250,6 +256,10 @@ class UBatchWrapper:
sorted_results = [value for position, value in sorted(results)]
result = torch.cat(sorted_results, dim=0)
cudagraph_metadata.outputs = result
# Join offloader's copy stream after forward to avoid unjoined
# stream error. The last layer's start_prefetch forks copy_stream,
# but wait_prefetch only happens in the next forward pass.
get_offloader().join_after_forward()
self.cudagraphs[num_tokens] = cudagraph_metadata
return cudagraph_metadata.outputs
......@@ -461,6 +471,9 @@ class UBatchWrapper:
and cudagraph_runtime_mode is CUDAGraphMode.FULL
):
cudagraph_metadata = self.cudagraphs[num_tokens]
# Sync offloader before replay - ensures any external dependencies
# from pre-capture prefetches are satisfied.
get_offloader().sync_prev_onload()
cudagraph_metadata.cudagraph.replay()
return cudagraph_metadata.outputs
else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment