Unverified commit be0c855e, authored by omerpaz95 and committed by GitHub
Browse files

[KV Offload] Unified memory layout for offloading workers (#37206)


Signed-off-by: omerpaz95 <omerpaz95@gmail.com>
Co-authored-by: Or Ozeri <oro@il.ibm.com>
parent e64b39ea
......@@ -2,12 +2,14 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import random
import time
import uuid
import pytest
import torch
from vllm.platforms import current_platform
from vllm.utils.torch_utils import set_random_seed
from vllm.v1.kv_offload.cpu.shared_offload_region import SharedOffloadRegion
from vllm.v1.kv_offload.mediums import CPULoadStoreSpec, GPULoadStoreSpec
from vllm.v1.kv_offload.spec import (
CanonicalKVCacheRef,
......@@ -36,6 +38,7 @@ NUM_MAPPINGS = [3]
@pytest.mark.parametrize("num_tensors", NUM_TENSORS)
@pytest.mark.parametrize("seed", SEEDS)
@pytest.mark.parametrize("device", DEVICES)
@pytest.mark.parametrize("use_shared_memory", [False, True])
@torch.inference_mode()
def test_transfer(
default_vllm_config,
......@@ -48,6 +51,7 @@ def test_transfer(
num_tensors: int,
seed: int,
device: str,
use_shared_memory: bool,
) -> None:
set_random_seed(seed)
......@@ -83,10 +87,24 @@ def test_transfer(
tensors=kv_cache_tensors,
group_data_refs=kv_cache_groups_data_refs,
)
mmap_region: SharedOffloadRegion | None = None
if use_shared_memory:
cpu_page_size = gpu_page_size_bytes * num_tensors * block_size_factor
mmap_region = SharedOffloadRegion(
instance_id=str(uuid.uuid4()),
total_size_bytes=num_cpu_blocks * cpu_page_size,
num_blocks=num_cpu_blocks,
rank=0,
num_workers=1,
cpu_page_size=cpu_page_size,
)
handlers = CpuGpuOffloadingHandlers(
kv_caches=kv_caches,
block_size_factor=block_size_factor,
num_cpu_blocks=num_cpu_blocks,
mmap_region=mmap_region,
)
# select block mappings
......@@ -137,10 +155,8 @@ def test_transfer(
if finished:
assert finished[0].job_id == 1
assert finished[0].success
assert (
finished[0].transfer_type == ("GPU", "CPU")
if gpu_to_cpu
else ("CPU", "GPU")
assert finished[0].transfer_type == (
("GPU", "CPU") if gpu_to_cpu else ("CPU", "GPU")
)
assert finished[0].transfer_size == (
len(gpu_blocks) * handler.group_block_size_in_bytes[0]
......@@ -161,9 +177,9 @@ def test_transfer(
orig_dst_tensors,
):
# view both GPU and CPU tensors as (n, gpu_page_size_bytes) for comparison.
src_view = src_tensor.view(-1, gpu_page_size_bytes)
dst_view = dst_tensor.view(-1, gpu_page_size_bytes)
orig_dst_view = orig_dst_tensor.view(-1, gpu_page_size_bytes)
src_view = src_tensor.reshape(-1, gpu_page_size_bytes)
dst_view = dst_tensor.reshape(-1, gpu_page_size_bytes)
orig_dst_view = orig_dst_tensor.reshape(-1, gpu_page_size_bytes)
for dst_sub_block in range(num_dst_sub_blocks):
src_sub_block = dst_to_src.get(dst_sub_block)
if src_sub_block is not None:
......@@ -171,3 +187,12 @@ def test_transfer(
else:
expected = orig_dst_view[dst_sub_block]
torch.testing.assert_close(dst_view[dst_sub_block].cpu(), expected.cpu())
# Drop loop-variable refs so mmap_obj has no exported buffers at cleanup.
del orig_tensor, tensor, src_tensor, dst_tensor, orig_dst_tensor
del src_view, dst_view, orig_dst_view, expected
handlers.cpu_to_gpu_handler.shutdown()
handlers.gpu_to_cpu_handler.shutdown()
if mmap_region:
mmap_region.cleanup()
This diff is collapsed.
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import mmap
import os
import time
import torch
from vllm.logger import init_logger
logger = init_logger(__name__)
def _wait_for_file_size(
    fd: int,
    expected_size: int,
    timeout: float = 30.0,
    poll_interval: float = 0.005,
) -> None:
    """Block until the file behind *fd* is at least *expected_size* bytes.

    Used by non-creator workers: the creator grows the file with
    ``ftruncate`` and everybody else polls ``fstat`` until that has happened.

    Args:
        fd: Open file descriptor of the shared file.
        expected_size: Minimum size in bytes to wait for.
        timeout: Maximum number of seconds to wait.
        poll_interval: Seconds to sleep between two size checks.

    Raises:
        TimeoutError: If the file is still too small once *timeout* elapsed.
    """
    deadline = time.monotonic() + timeout
    # The size is always checked before the deadline, so a file that is
    # already large enough succeeds even with a zero or negative timeout.
    while os.fstat(fd).st_size < expected_size:
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"Timed out waiting for mmap file to reach {expected_size} bytes"
            )
        time.sleep(poll_interval)
class SharedOffloadRegion:
"""
Single mmap-backed memory region shared across all workers for a
vLLM instance. Workers coordinate via the filesystem: the first worker
to open the file with O_EXCL becomes the creator and calls ftruncate;
the rest open the existing file and wait until it reaches the expected
size. Each worker then mmap()s the full file.
File path: /dev/shm/vllm_offload_{instance_id}.mmap
"""
def __init__(
self,
instance_id: str,
total_size_bytes: int,
num_blocks: int,
rank: int | None,
num_workers: int,
cpu_page_size: int,
) -> None:
self.page_size = mmap.PAGESIZE
self.total_size_bytes = total_size_bytes
self.mmap_path = f"/dev/shm/vllm_offload_{instance_id}.mmap"
self._creator = False # set True only if this worker creates the file
self.num_blocks = num_blocks
self.rank = rank
# interleaved-layout stride: one row = all workers' data for one block
self._row_stride = cpu_page_size * num_workers
if rank is not None:
# byte offset to this worker's first slot within each block row
self._worker_offset = rank * cpu_page_size
# exclusive upper bound for this worker's area within each row
self._worker_area_end = (rank + 1) * cpu_page_size
try:
# Exclusive create — only one worker succeeds
self.fd: int | None = os.open(
self.mmap_path, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0o600
)
os.ftruncate(self.fd, self.total_size_bytes)
self._creator = True
logger.info(
"Created mmap file %s (%.2f GB)",
self.mmap_path,
self.total_size_bytes / 1e9,
)
except FileExistsError:
self.fd = os.open(self.mmap_path, os.O_RDWR)
_wait_for_file_size(self.fd, self.total_size_bytes)
logger.info("Opened existing mmap file %s", self.mmap_path)
self.mmap_obj: mmap.mmap | None = mmap.mmap(
self.fd,
self.total_size_bytes,
flags=mmap.MAP_SHARED,
prot=mmap.PROT_READ | mmap.PROT_WRITE,
)
# MADV_POPULATE_WRITE was added in Linux 5.14 (value 23).
_MADV_POPULATE_WRITE = getattr(mmap, "MADV_POPULATE_WRITE", 23)
if rank is not None:
# Populate only this worker's pages (one slot per block row).
worker_offset = rank * cpu_page_size
_t0 = time.perf_counter()
page_size = self.page_size
for block in range(num_blocks):
raw_offset = block * self._row_stride + worker_offset
aligned_offset = (raw_offset // page_size) * page_size
end = raw_offset + cpu_page_size
aligned_length = end - aligned_offset
self.mmap_obj.madvise(
_MADV_POPULATE_WRITE, aligned_offset, aligned_length
)
logger.debug(
"MADV_POPULATE_WRITE loop: %d blocks in %.3f s",
num_blocks,
time.perf_counter() - _t0,
)
else:
# No rank — populate the entire shared region in one call.
_t0 = time.perf_counter()
self.mmap_obj.madvise(_MADV_POPULATE_WRITE, 0, self.total_size_bytes)
logger.debug(
"MADV_POPULATE_WRITE entire region: %.3f s", time.perf_counter() - _t0
)
self._base = torch.frombuffer(memoryview(self.mmap_obj), dtype=torch.int8)
self._views: list[torch.Tensor] = []
self.is_pinned: bool = False
def create_next_view(self, tensor_page_size: int) -> torch.Tensor:
"""Allocate a strided int8 view for this worker, one canonical tensor.
Must be called once per canonical tensor. The full mmap layout is:
worker0_block0 | worker1_block0 | ... | worker{M-1}_block0
worker0_block1 | worker1_block1 | ... | worker{M-1}_block1
...
Each worker_block cell is cpu_page_size bytes and holds all canonical
tensors for that worker and block concatenated:
[ tensor0_data | tensor1_data | ... | tensor{L-1}_data ]
Consecutive rows are separated by row_stride = cpu_page_size * M.
Returns an int8 tensor of shape (num_blocks, tensor_page_size) with stride
(row_stride, 1). Using int8 keeps stride == bytes, so swap_blocks
address arithmetic works without any dtype conversion.
Args:
tensor_page_size: Bytes per block for this tensor.
"""
assert self.rank is not None
new_offset = self._worker_offset + tensor_page_size
assert new_offset <= self._worker_area_end, (
f"Worker offset {new_offset} exceeds worker area end "
f"{self._worker_area_end} (overflowed by "
f"{new_offset - self._worker_area_end} bytes)"
)
worker_layer_view = torch.as_strided(
self._base,
size=(self.num_blocks, tensor_page_size),
stride=(self._row_stride, 1),
storage_offset=self._worker_offset,
)
self._worker_offset = new_offset
self._views.append(worker_layer_view)
return worker_layer_view
def cleanup(self) -> None:
if self.is_pinned and self._base is not None:
base_ptr = self._base.data_ptr()
result = torch.cuda.cudart().cudaHostUnregister(base_ptr)
if result.value != 0:
logger.warning(
"cudaHostUnregister failed for rank=%d (code=%d)", self.rank, result
)
self.is_pinned = False
# Release views before _base: each view holds a _base reference and a
# direct StorageImpl reference. Freeing views first lets both refcounts
# drop so the storage (which holds the mmap_obj buffer export) is freed
# before mmap_obj.close() is called below.
if self._views is not None:
self._views.clear()
self._base = None
if self.mmap_obj:
try:
self.mmap_obj.close()
except Exception:
logger.warning("Failed to close mmap_obj", exc_info=True)
self.mmap_obj = None
if self.fd is not None:
try:
os.close(self.fd)
except Exception:
logger.warning("Failed to close fd %s", self.fd, exc_info=True)
self.fd = None
if self._creator and getattr(self, "mmap_path", None):
try:
os.unlink(self.mmap_path)
logger.info("Removed mmap file %s", self.mmap_path)
except Exception:
logger.warning(
"Failed to unlink path %s", self.mmap_path, exc_info=True
)
self._creator = False
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import time
from collections import deque
from dataclasses import dataclass
......@@ -9,6 +10,7 @@ import torch
from vllm import _custom_ops as ops
from vllm.logger import init_logger
from vllm.utils.platform_utils import is_pin_memory_available
from vllm.v1.kv_offload.cpu.shared_offload_region import SharedOffloadRegion
from vllm.v1.kv_offload.mediums import BlockIDsLoadStoreSpec
from vllm.v1.kv_offload.spec import CanonicalKVCacheRef, CanonicalKVCaches
from vllm.v1.kv_offload.worker.worker import (
......@@ -29,37 +31,76 @@ class Transfer:
num_bytes: int
def compute_sub_block_ptrs(
    block_ids: np.ndarray,
    block_size_factor: int,
    output: np.ndarray,
    tensor: torch.Tensor,
    skip_count: int = 0,
):
    """
    Compute byte pointers for sub-blocks of the given block IDs.

    Each block in block_ids contains block_size_factor sub-blocks.
    The pointer for sub-block j of block b is:
        base_ptr + b * row_stride + j * sub_block_size
    where sub_block_size = tensor.shape[1] // block_size_factor (gpu page size).

    This handles tensors where row_stride != block_size_factor * sub_block_size
    (e.g. non-contiguous CPU tensors).

    Note: tensor.stride(0) and tensor.shape[1] are used directly as byte
    counts, so the result is only a valid byte address for tensors with
    1-byte elements (e.g. int8) whose last dimension is contiguous.

    Args:
        block_ids: array of block IDs at the tensor's native granularity.
        block_size_factor: number of sub-blocks per block.
        output: pre-allocated int64 array to write pointers into; its length
            determines how many pointers are produced.
        tensor: the source or destination tensor.
        skip_count: sub-blocks to skip in the first block.
    """
    assert skip_count < block_size_factor

    num_sub_blocks = len(output)
    base_ptr = tensor.data_ptr()
    row_stride = tensor.stride(0)

    # Always do the address arithmetic in int64: pointers do not fit into
    # narrower ID dtypes, and both code paths must agree on the result type.
    row_ptrs = base_ptr + np.asarray(block_ids, dtype=np.int64) * row_stride

    if block_size_factor == 1:
        # Fast path: 1:1 mapping, no sub-block expansion needed.
        output[:] = row_ptrs[:num_sub_blocks]
        return

    # Vectorized expansion for block_size_factor > 1.
    assert tensor.shape[1] % block_size_factor == 0
    sub_block_size = tensor.shape[1] // block_size_factor
    sub_offsets = np.arange(block_size_factor, dtype=np.int64) * sub_block_size

    # (num_blocks, 1) + (1, block_size_factor) -> (num_blocks, block_size_factor)
    all_ptrs = row_ptrs[:, np.newaxis] + sub_offsets[np.newaxis, :]

    # Flatten and apply skip_count / truncation
    output[:] = all_ptrs.ravel()[skip_count : skip_count + num_sub_blocks]
def pin_mmap_region(region: SharedOffloadRegion) -> None:
    """Register the entire mmap as CUDA pinned memory via cudaHostRegister.

    On success ``region.is_pinned`` is set to True. A failed registration is
    only logged and ``region.is_pinned`` is left unchanged.

    Args:
        region: Shared region whose full mapping (``total_size_bytes`` bytes
            starting at the base tensor's address) is registered.
    """
    base_ptr = region._base.data_ptr()
    # Third argument is the flags word; 0 requests default registration.
    result = torch.cuda.cudart().cudaHostRegister(base_ptr, region.total_size_bytes, 0)

    if result.value != 0:
        # rank may be None, so format it with %s; log the numeric error code
        # rather than the enum object.
        logger.warning(
            "cudaHostRegister failed for rank=%s (code=%d) — "
            "transfers will still work but may be slower (unpinned DMA)",
            region.rank,
            result.value,
        )
        return

    logger.debug(
        "cudaHostRegister rank=%s %.2f GB",
        region.rank,
        region.total_size_bytes / 1e9,
    )
    region.is_pinned = True
class SingleDirectionOffloadingHandler(OffloadingHandler):
......@@ -149,13 +190,7 @@ class SingleDirectionOffloadingHandler(OffloadingHandler):
# list of CUDA events available for re-use
self._event_pool: list[torch.Event] = []
# Pre-compute base pointers and block sizes for batch copies.
self._src_base_ptrs = np.array(
[t.data_ptr() for t in self.src_tensors], dtype=np.int64
)
self._dst_base_ptrs = np.array(
[t.data_ptr() for t in self.dst_tensors], dtype=np.int64
)
# Pre-compute block sizes for batch copies.
self._block_size_in_bytes_arr = np.array(
self.tensor_block_size_in_bytes, dtype=np.int64
)
......@@ -176,17 +211,6 @@ class SingleDirectionOffloadingHandler(OffloadingHandler):
assert dst_sub_block_count == src_sub_block_count - src_sub_blocks_to_skip
src_block_ids = np.empty(dst_sub_block_count, dtype=np.int64)
dst_block_ids = np.empty(dst_sub_block_count, dtype=np.int64)
expand_block_ids(
src_blocks,
self.src_block_size_factor,
src_block_ids,
skip_count=src_sub_blocks_to_skip,
)
expand_block_ids(dst_blocks, self.dst_block_size_factor, dst_block_ids)
# Build flat pointer arrays for all tensors × all block pairs.
num_pairs = dst_sub_block_count
num_tensors = len(self.src_tensors)
total = num_pairs * num_tensors
......@@ -198,8 +222,19 @@ class SingleDirectionOffloadingHandler(OffloadingHandler):
for t_idx, bsz in enumerate(self._block_size_in_bytes_arr):
start = t_idx * num_pairs
end = start + num_pairs
all_src[start:end] = self._src_base_ptrs[t_idx] + src_block_ids * bsz
all_dst[start:end] = self._dst_base_ptrs[t_idx] + dst_block_ids * bsz
compute_sub_block_ptrs(
block_ids=src_blocks,
block_size_factor=self.src_block_size_factor,
output=all_src[start:end],
tensor=self.src_tensors[t_idx],
skip_count=src_sub_blocks_to_skip,
)
compute_sub_block_ptrs(
block_ids=dst_blocks,
block_size_factor=self.dst_block_size_factor,
output=all_dst[start:end],
tensor=self.dst_tensors[t_idx],
)
all_sizes[start:end] = bsz
batch_src = torch.from_numpy(all_src)
......@@ -281,6 +316,8 @@ class SingleDirectionOffloadingHandler(OffloadingHandler):
self._transfer_events.clear()
self._stream_pool.clear()
self._event_pool.clear()
self.src_tensors.clear()
self.dst_tensors.clear()
class CpuGpuOffloadingHandlers:
......@@ -289,9 +326,14 @@ class CpuGpuOffloadingHandlers:
kv_caches: CanonicalKVCaches,
block_size_factor: int,
num_cpu_blocks: int,
mmap_region: SharedOffloadRegion | None = None,
):
pin_memory = is_pin_memory_available()
logger.info("Allocating %d CPU tensors...", len(kv_caches.tensors))
self._mmap_region = mmap_region
if mmap_region is not None and pin_memory:
pin_mmap_region(mmap_region)
gpu_tensors: list[torch.Tensor] = []
cpu_tensors: list[torch.Tensor] = []
for kv_cache_tensor in kv_caches.tensors:
......@@ -300,12 +342,24 @@ class CpuGpuOffloadingHandlers:
(-1, gpu_page_size_bytes)
)
cpu_page_size_bytes = gpu_page_size_bytes * block_size_factor
if mmap_region is not None:
cpu_tensor = mmap_region.create_next_view(cpu_page_size_bytes)
else:
t0 = time.monotonic()
cpu_tensor = torch.zeros(
(num_cpu_blocks, cpu_page_size_bytes),
dtype=torch.int8,
device="cpu",
pin_memory=pin_memory,
)
logger.debug(
"torch.zeros pinned tensor %d×%d (%.2f GB): %.3f s",
num_cpu_blocks,
cpu_page_size_bytes,
num_cpu_blocks * cpu_page_size_bytes / 1e9,
time.monotonic() - t0,
)
gpu_tensors.append(gpu_tensor)
cpu_tensors.append(cpu_tensor)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment