Unverified Commit 00730fc6 authored by ptarasiewiczNV's avatar ptarasiewiczNV Committed by GitHub
Browse files

feat: bump vLLM version to v0.8.4 (#690)

parent 48733546
......@@ -181,16 +181,12 @@ RUN mkdir /opt/dynamo && \
ENV VIRTUAL_ENV=/opt/dynamo/venv
ENV PATH="${VIRTUAL_ENV}/bin:${PATH}"
# Common dependencies
RUN --mount=type=bind,source=./container/deps/requirements.txt,target=/tmp/requirements.txt \
uv pip install --requirement /tmp/requirements.txt
# Install patched vllm - keep this early in Dockerfile to avoid
# rebuilds from unrelated source code changes
ARG VLLM_REF="0.7.2"
ARG VLLM_REF="0.8.4"
ARG VLLM_PATCH="vllm_v${VLLM_REF}-dynamo-kv-disagg-patch.patch"
ARG VLLM_PATCHED_PACKAGE_NAME="ai_dynamo_vllm"
ARG VLLM_PATCHED_PACKAGE_VERSION="0.7.2.post1"
ARG VLLM_PATCHED_PACKAGE_VERSION="0.8.4"
RUN --mount=type=bind,source=./container/deps/,target=/tmp/deps \
mkdir /tmp/vllm && \
uv pip install pip wheel && \
......@@ -211,6 +207,10 @@ RUN --mount=type=bind,source=./container/deps/,target=/tmp/deps \
wheel pack . --dest-dir /workspace/dist && \
uv pip install /workspace/dist/${VLLM_PATCHED_PACKAGE_NAME}-*.whl
# Common dependencies
RUN --mount=type=bind,source=./container/deps/requirements.txt,target=/tmp/requirements.txt \
uv pip install --requirement /tmp/requirements.txt
# Install test dependencies
RUN --mount=type=bind,source=./container/deps/requirements.test.txt,target=/tmp/requirements.txt \
uv pip install --requirement /tmp/requirements.txt
......
......@@ -27,4 +27,4 @@ pytestmark = pytest.mark.pre_merge
@pytest.mark.skipif(vllm is None, reason="Skipping vllm tests, vllm not installed")
def test_version():
# Verify that the image has the patched version of vllm
assert vllm.__version__.endswith("0.7.2") # type: ignore
assert vllm.__version__.endswith("0.8.4") # type: ignore
diff --git a/vllm/config.py b/vllm/config.py
index 9ba49757..a4df0019 100644
index 9ba497576..db2dc002f 100644
--- a/vllm/config.py
+++ b/vllm/config.py
@@ -2620,6 +2620,9 @@ class KVTransferConfig(BaseModel):
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import ast
import copy
@@ -2620,6 +2633,9 @@ class KVTransferConfig(BaseModel):
# The KV connector for vLLM to transmit KV caches between vLLM instances.
kv_connector: Optional[str] = None
......@@ -12,7 +30,7 @@ index 9ba49757..a4df0019 100644
# The device used by kv connector to buffer the KV cache.
# Currently only support 'cuda'.
kv_buffer_device: Optional[str] = "cuda"
@@ -2629,7 +2632,7 @@ class KVTransferConfig(BaseModel):
@@ -2629,7 +2645,7 @@ class KVTransferConfig(BaseModel):
kv_buffer_size: float = 1e9
# Whether this vLLM instance produces, consumes KV cache, or both. Choices
......@@ -21,7 +39,7 @@ index 9ba49757..a4df0019 100644
kv_role: Optional[str] = None
# The rank of this vLLM instance in the KV cache transfer. Typical value:
@@ -2647,6 +2650,14 @@ class KVTransferConfig(BaseModel):
@@ -2647,6 +2663,14 @@ class KVTransferConfig(BaseModel):
# The KV connector port, used to build distributed connection
kv_port: int = 14579
......@@ -36,7 +54,7 @@ index 9ba49757..a4df0019 100644
def compute_hash(self) -> str:
"""
WARNING: Whenever a new field is added to this config,
@@ -2680,11 +2691,16 @@ class KVTransferConfig(BaseModel):
@@ -2680,11 +2704,16 @@ class KVTransferConfig(BaseModel):
f"Supported roles are `kv_producer`, `kv_consumer`, "
f"and `kv_both`")
......@@ -54,7 +72,7 @@ index 9ba49757..a4df0019 100644
@property
def is_kv_transfer_instance(self) -> bool:
return self.kv_connector is not None and \
@@ -2694,6 +2710,8 @@ class KVTransferConfig(BaseModel):
@@ -2694,6 +2723,8 @@ class KVTransferConfig(BaseModel):
def need_kv_parallel_group(self) -> bool:
# for those database-based connector, vLLM does not need to create
# parallel group, and in that case the kv parallel size will be 1.
......@@ -63,7 +81,7 @@ index 9ba49757..a4df0019 100644
return self.kv_connector is not None and self.kv_parallel_size > 1
@property
@@ -2706,6 +2724,18 @@ class KVTransferConfig(BaseModel):
@@ -2706,6 +2737,18 @@ class KVTransferConfig(BaseModel):
return self.kv_connector is not None and \
self.kv_role in ["kv_consumer", "kv_both"]
......@@ -83,10 +101,28 @@ index 9ba49757..a4df0019 100644
class CompilationLevel:
# constants for the levels of the compilation process
diff --git a/vllm/core/block/cpu_gpu_block_allocator.py b/vllm/core/block/cpu_gpu_block_allocator.py
index 359b5b26..d52ee050 100644
index 359b5b263..7bac45ff0 100644
--- a/vllm/core/block/cpu_gpu_block_allocator.py
+++ b/vllm/core/block/cpu_gpu_block_allocator.py
@@ -6,6 +6,7 @@ from vllm.core.block.interfaces import (Block, BlockAllocator, BlockId,
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from typing import Dict, FrozenSet, List, Optional, Tuple
@@ -6,6 +19,7 @@ from vllm.core.block.interfaces import (Block, BlockAllocator, BlockId,
DeviceAwareBlockAllocator)
from vllm.core.block.naive_block import NaiveBlock, NaiveBlockAllocator
from vllm.core.block.prefix_caching_block import PrefixCachingBlockAllocator
......@@ -94,7 +130,7 @@ index 359b5b26..d52ee050 100644
from vllm.platforms import current_platform
from vllm.utils import Device
@@ -28,6 +29,7 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
@@ -28,6 +42,7 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
num_gpu_blocks: int,
num_cpu_blocks: int,
block_size: int,
......@@ -102,7 +138,7 @@ index 359b5b26..d52ee050 100644
) -> DeviceAwareBlockAllocator:
"""Creates a CpuGpuBlockAllocator instance with the specified
configuration.
@@ -64,6 +66,7 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
@@ -64,6 +79,7 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
cpu_block_ids = block_ids[num_gpu_blocks:]
if allocator_type == "naive":
......@@ -110,7 +146,7 @@ index 359b5b26..d52ee050 100644
gpu_allocator: BlockAllocator = NaiveBlockAllocator(
create_block=NaiveBlock, # type: ignore
num_blocks=num_gpu_blocks,
@@ -82,12 +85,14 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
@@ -82,12 +98,14 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
num_blocks=num_gpu_blocks,
block_size=block_size,
block_ids=gpu_block_ids,
......@@ -125,7 +161,7 @@ index 359b5b26..d52ee050 100644
)
else:
raise ValueError(f"Unknown allocator type {allocator_type=}")
@@ -95,10 +100,12 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
@@ -95,10 +113,12 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
return CpuGpuBlockAllocator(
cpu_block_allocator=cpu_allocator,
gpu_block_allocator=gpu_allocator,
......@@ -139,7 +175,7 @@ index 359b5b26..d52ee050 100644
assert not (
cpu_block_allocator.all_block_ids
& gpu_block_allocator.all_block_ids
@@ -108,6 +115,7 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
@@ -108,6 +128,7 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
Device.CPU: cpu_block_allocator,
Device.GPU: gpu_block_allocator,
}
......@@ -148,10 +184,24 @@ index 359b5b26..d52ee050 100644
self._swap_mapping: Dict[int, int] = {}
self._null_block: Optional[Block] = None
diff --git a/vllm/core/block/naive_block.py b/vllm/core/block/naive_block.py
index c388366b..31ed7aa4 100644
index c388366b8..3c223b519 100644
--- a/vllm/core/block/naive_block.py
+++ b/vllm/core/block/naive_block.py
@@ -2,7 +2,7 @@
@@ -1,8 +1,21 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from collections import deque
from typing import Deque, FrozenSet, Iterable, List, Optional, Tuple, Union
......@@ -160,7 +210,7 @@ index c388366b..31ed7aa4 100644
from vllm.core.block.common import (BlockPool, CopyOnWriteTracker, RefCounter,
get_all_blocks_recursively)
from vllm.core.block.interfaces import Block, BlockAllocator, BlockId, Device
@@ -38,7 +38,7 @@ class NaiveBlockAllocator(BlockAllocator):
@@ -38,7 +51,7 @@ class NaiveBlockAllocator(BlockAllocator):
if block_ids is None:
block_ids = range(num_blocks)
......@@ -169,7 +219,7 @@ index c388366b..31ed7aa4 100644
self._all_block_indices = frozenset(block_ids)
assert len(self._all_block_indices) == num_blocks
@@ -134,7 +134,8 @@ class NaiveBlockAllocator(BlockAllocator):
@@ -134,7 +147,8 @@ class NaiveBlockAllocator(BlockAllocator):
if not self._free_block_indices:
raise BlockAllocator.NoFreeBlocksError()
......@@ -179,7 +229,7 @@ index c388366b..31ed7aa4 100644
self._refcounter.incr(block_id)
return block_id
@@ -148,7 +149,7 @@ class NaiveBlockAllocator(BlockAllocator):
@@ -148,7 +162,7 @@ class NaiveBlockAllocator(BlockAllocator):
refcount = self._refcounter.decr(block_id)
if refcount == 0:
......@@ -189,10 +239,26 @@ index c388366b..31ed7aa4 100644
def free(self, block: Block, keep_block_object: bool = False) -> None:
# Release the physical block id
diff --git a/vllm/core/block/prefix_caching_block.py b/vllm/core/block/prefix_caching_block.py
index 1ca9e49d..cd780f69 100644
index 1ca9e49da..26fabb243 100644
--- a/vllm/core/block/prefix_caching_block.py
+++ b/vllm/core/block/prefix_caching_block.py
@@ -4,7 +4,7 @@ import sys
@@ -1,10 +1,23 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""Token blocks."""
import sys
from bisect import bisect_left
from os.path import commonprefix
from typing import (Callable, Dict, FrozenSet, Iterable, List, Optional, Set,
......@@ -201,7 +267,7 @@ index 1ca9e49d..cd780f69 100644
from vllm.core.block.common import (CacheMetricData, CopyOnWriteTracker,
get_all_blocks_recursively)
@@ -23,6 +23,9 @@ PrefixHash = int
@@ -23,6 +36,9 @@ PrefixHash = int
# then we know this block hasn't been accessed yet.
_DEFAULT_LAST_ACCESSED_TIME = -1
......@@ -211,7 +277,7 @@ index 1ca9e49d..cd780f69 100644
logger = init_logger(__name__)
@@ -80,6 +83,7 @@ class PrefixCachingBlockAllocator(BlockAllocator):
@@ -80,6 +96,7 @@ class PrefixCachingBlockAllocator(BlockAllocator):
block_size: int,
block_ids: Optional[Iterable[int]] = None,
eviction_policy: EvictionPolicy = EvictionPolicy.LRU,
......@@ -219,7 +285,7 @@ index 1ca9e49d..cd780f69 100644
):
if block_ids is None:
block_ids = range(num_blocks)
@@ -131,6 +135,9 @@ class PrefixCachingBlockAllocator(BlockAllocator):
@@ -131,6 +148,9 @@ class PrefixCachingBlockAllocator(BlockAllocator):
self.metric_data = CacheMetricData()
......@@ -229,7 +295,7 @@ index 1ca9e49d..cd780f69 100644
def _create_block(
self,
prev_block: Optional[Block],
@@ -337,6 +344,9 @@ class PrefixCachingBlockAllocator(BlockAllocator):
@@ -337,6 +357,9 @@ class PrefixCachingBlockAllocator(BlockAllocator):
assert self._refcounter.get(_block_id) == 0
assert _block_id == block_id
......@@ -239,7 +305,7 @@ index 1ca9e49d..cd780f69 100644
self._cached_blocks.pop(content_hash_to_evict)
self._refcounter.incr(block_id)
@@ -513,6 +523,10 @@ class PrefixCachingBlockAllocator(BlockAllocator):
@@ -513,6 +536,10 @@ class PrefixCachingBlockAllocator(BlockAllocator):
# Mark this block as touched so that it can be marked as
# computed after the entire batch of sequences are scheduled.
self._touched_blocks.add(block.block_id)
......@@ -250,7 +316,7 @@ index 1ca9e49d..cd780f69 100644
return block.block_id
# Reuse the cached content hash
@@ -579,9 +593,11 @@ class PrefixCachingBlockAllocator(BlockAllocator):
@@ -579,9 +606,11 @@ class PrefixCachingBlockAllocator(BlockAllocator):
def mark_blocks_as_computed(self, block_ids: List[int]) -> None:
# Mark all touched blocks as computed.
......@@ -266,10 +332,28 @@ index 1ca9e49d..cd780f69 100644
def _track_block_id(self, block_id: Optional[BlockId],
computed: bool) -> None:
diff --git a/vllm/core/block_manager.py b/vllm/core/block_manager.py
index c5b3b04f..21fe0fc8 100644
index c5b3b04f3..d3a4b77f8 100644
--- a/vllm/core/block_manager.py
+++ b/vllm/core/block_manager.py
@@ -10,7 +10,10 @@ from vllm.core.block.interfaces import Block
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""A block manager that manages token blocks."""
from typing import Dict, List, Optional
from typing import Sequence as GenericSequence
@@ -10,7 +23,10 @@ from vllm.core.block.interfaces import Block
from vllm.core.block.prefix_caching_block import (ComputedBlocksTracker,
LastAccessBlocksTracker)
from vllm.core.block.utils import check_no_caching_or_swa_for_blockmgr_encdec
......@@ -280,7 +364,7 @@ index c5b3b04f..21fe0fc8 100644
from vllm.sequence import Sequence, SequenceGroup, SequenceStatus
from vllm.utils import Device
@@ -60,6 +63,7 @@ class SelfAttnBlockSpaceManager(BlockSpaceManager):
@@ -60,6 +76,7 @@ class SelfAttnBlockSpaceManager(BlockSpaceManager):
def __init__(
self,
......@@ -288,7 +372,7 @@ index c5b3b04f..21fe0fc8 100644
block_size: int,
num_gpu_blocks: int,
num_cpu_blocks: int,
@@ -91,11 +95,29 @@ class SelfAttnBlockSpaceManager(BlockSpaceManager):
@@ -91,11 +108,29 @@ class SelfAttnBlockSpaceManager(BlockSpaceManager):
self.watermark_blocks = int(watermark * num_gpu_blocks)
......@@ -318,7 +402,7 @@ index c5b3b04f..21fe0fc8 100644
)
self.block_tables: Dict[SeqId, BlockTable] = {}
@@ -108,7 +130,8 @@ class SelfAttnBlockSpaceManager(BlockSpaceManager):
@@ -108,7 +143,8 @@ class SelfAttnBlockSpaceManager(BlockSpaceManager):
def can_allocate(self,
seq_group: SequenceGroup,
......@@ -328,7 +412,7 @@ index c5b3b04f..21fe0fc8 100644
# FIXME(woosuk): Here we assume that all sequences in the group share
# the same prompt. This may not be true for preempted sequences.
@@ -121,6 +144,10 @@ class SelfAttnBlockSpaceManager(BlockSpaceManager):
@@ -121,6 +157,10 @@ class SelfAttnBlockSpaceManager(BlockSpaceManager):
num_lookahead_slots=num_lookahead_slots,
)
......@@ -341,11 +425,24 @@ index c5b3b04f..21fe0fc8 100644
assert encoder_seq is not None
diff --git a/vllm/core/event_manager.py b/vllm/core/event_manager.py
new file mode 100644
index 00000000..a27af580
index 000000000..79eb8db67
--- /dev/null
+++ b/vllm/core/event_manager.py
@@ -0,0 +1,108 @@
@@ -0,0 +1,121 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import ctypes
+import logging
+import uuid
......@@ -454,10 +551,26 @@ index 00000000..a27af580
+
+ self.event_id_counter += 1
diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py
index f507847a..170a359f 100644
index f507847ad..3f3cba766 100644
--- a/vllm/core/scheduler.py
+++ b/vllm/core/scheduler.py
@@ -4,22 +4,22 @@ import enum
@@ -1,25 +1,38 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import enum
import os
import random
import time
......@@ -484,7 +597,7 @@ index f507847a..170a359f 100644
logger = init_logger(__name__)
# Test-only. If configured, decode is preempted with
@@ -285,6 +285,7 @@ class SchedulerPrefillOutputs:
@@ -285,6 +298,7 @@ class SchedulerPrefillOutputs:
# Ignored sequence groups.
ignored_seq_groups: List[SequenceGroup]
num_lookahead_slots: int
......@@ -492,7 +605,7 @@ index f507847a..170a359f 100644
@classmethod
def create_empty(cls) -> "SchedulerPrefillOutputs":
@@ -292,6 +293,7 @@ class SchedulerPrefillOutputs:
@@ -292,6 +306,7 @@ class SchedulerPrefillOutputs:
seq_groups=[],
ignored_seq_groups=[],
num_lookahead_slots=0,
......@@ -500,7 +613,7 @@ index f507847a..170a359f 100644
)
@@ -325,12 +327,14 @@ class Scheduler:
@@ -325,12 +340,14 @@ class Scheduler:
def __init__(
self,
......@@ -515,7 +628,7 @@ index f507847a..170a359f 100644
self.scheduler_config = scheduler_config
self.cache_config = cache_config
# Note for LoRA scheduling: the current policy is extremely
@@ -356,6 +360,7 @@ class Scheduler:
@@ -356,6 +373,7 @@ class Scheduler:
# Create the block space manager.
self.block_manager = BlockSpaceManagerImpl(
......@@ -523,7 +636,7 @@ index f507847a..170a359f 100644
block_size=self.cache_config.block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=num_cpu_blocks,
@@ -371,6 +376,16 @@ class Scheduler:
@@ -371,6 +389,16 @@ class Scheduler:
# Sequence groups in the SWAPPED state.
# Contain decode requests that are swapped out.
self.swapped: Deque[SequenceGroup] = deque()
......@@ -540,7 +653,7 @@ index f507847a..170a359f 100644
# Sequence groups finished requests ids since last step iteration.
# It lets the model know that any state associated with these requests
# can and must be released after the current step.
@@ -501,7 +516,7 @@ class Scheduler:
@@ -501,7 +529,7 @@ class Scheduler:
def has_unfinished_seqs(self) -> bool:
return len(self.waiting) != 0 or len(self.running) != 0 or len(
......@@ -549,7 +662,7 @@ index f507847a..170a359f 100644
def get_prefix_cache_hit_rate(self, device: Device) -> float:
return self.block_manager.get_prefix_cache_hit_rate(device)
@@ -523,6 +538,8 @@ class Scheduler:
@@ -523,6 +551,8 @@ class Scheduler:
budget: SchedulingBudget,
curr_loras: Optional[Set[int]],
enable_chunking: bool = False,
......@@ -558,7 +671,7 @@ index f507847a..170a359f 100644
) -> SchedulerRunningOutputs:
"""Schedule sequence groups that are running.
@@ -537,6 +554,8 @@ class Scheduler:
@@ -537,6 +567,8 @@ class Scheduler:
chunked number of tokens are scheduled if
`budget.num_batched_tokens` has not enough capacity to schedule
all tokens.
......@@ -567,7 +680,7 @@ index f507847a..170a359f 100644
Returns:
SchedulerRunningOutputs.
@@ -566,6 +585,38 @@ class Scheduler:
@@ -566,6 +598,38 @@ class Scheduler:
preempted: List[SequenceGroup] = ret.preempted
swapped_out: List[SequenceGroup] = ret.swapped_out
......@@ -606,7 +719,7 @@ index f507847a..170a359f 100644
running_queue = self.running
assert len(self._async_stopped) == 0
while running_queue:
@@ -925,6 +976,7 @@ class Scheduler:
@@ -925,6 +989,7 @@ class Scheduler:
seq_groups: List[ScheduledSequenceGroup] = []
waiting_queue = self.waiting
......@@ -614,7 +727,7 @@ index f507847a..170a359f 100644
leftover_waiting_sequences: Deque[SequenceGroup] = deque()
while self._passed_delay(time.time()) and waiting_queue:
@@ -961,8 +1013,10 @@ class Scheduler:
@@ -961,8 +1026,10 @@ class Scheduler:
True, enable_chunking)
# If the sequence group cannot be allocated, stop.
......@@ -626,7 +739,7 @@ index f507847a..170a359f 100644
if can_allocate == AllocStatus.LATER:
break
elif can_allocate == AllocStatus.NEVER:
@@ -1008,7 +1062,18 @@ class Scheduler:
@@ -1008,7 +1075,18 @@ class Scheduler:
if curr_loras is not None and lora_int_id > 0:
curr_loras.add(lora_int_id)
waiting_queue.popleft()
......@@ -646,7 +759,7 @@ index f507847a..170a359f 100644
if enable_chunking and self.scheduler_config.is_multi_step:
blocks_to_copy: List[Tuple[int, int]] = []
@@ -1046,9 +1111,11 @@ class Scheduler:
@@ -1046,9 +1124,11 @@ class Scheduler:
seq_groups=seq_groups,
ignored_seq_groups=ignored_seq_groups,
num_lookahead_slots=self._get_num_lookahead_slots(
......@@ -660,7 +773,7 @@ index f507847a..170a359f 100644
"""Schedule queued requests.
The current policy is designed to optimize the throughput. First,
@@ -1066,9 +1133,13 @@ class Scheduler:
@@ -1066,9 +1146,13 @@ class Scheduler:
for seq_group in self.running:
budget.add_num_seqs(seq_group.request_id,
seq_group.get_max_num_running_seqs())
......@@ -676,7 +789,7 @@ index f507847a..170a359f 100644
prefills = SchedulerPrefillOutputs.create_empty()
running_scheduled = SchedulerRunningOutputs.create_empty()
@@ -1090,7 +1161,9 @@ class Scheduler:
@@ -1090,7 +1174,9 @@ class Scheduler:
if len(prefills.seq_groups) == 0:
running_scheduled = self._schedule_running(budget,
curr_loras,
......@@ -687,7 +800,7 @@ index f507847a..170a359f 100644
# If any sequence group is preempted, do not swap in any sequence
# group. because it means there's no slot for new running requests.
@@ -1106,7 +1179,12 @@ class Scheduler:
@@ -1106,7 +1192,12 @@ class Scheduler:
self.waiting.extendleft(running_scheduled.preempted)
# Update new running requests.
if len(prefills.seq_groups) > 0:
......@@ -701,7 +814,7 @@ index f507847a..170a359f 100644
self.running.extend(running_scheduled.decode_seq_groups_list)
@@ -1248,12 +1326,14 @@ class Scheduler:
@@ -1248,12 +1339,14 @@ class Scheduler:
len(running_scheduled.swapped_out)),
)
......@@ -718,7 +831,7 @@ index f507847a..170a359f 100644
def _can_append_slots(self, seq_group: SequenceGroup,
enable_chunking: bool) -> bool:
@@ -1287,14 +1367,16 @@ class Scheduler:
@@ -1287,14 +1380,16 @@ class Scheduler:
return no_single_seq
def schedule(
......@@ -738,7 +851,7 @@ index f507847a..170a359f 100644
now = time.time()
if not self.cache_config.enable_prefix_caching:
@@ -1333,7 +1415,8 @@ class Scheduler:
@@ -1333,7 +1428,8 @@ class Scheduler:
encoder_seq_data = None
cross_block_table = None
......@@ -748,7 +861,7 @@ index f507847a..170a359f 100644
seq_id = seq.seq_id
seq_data[seq_id] = seq.data
block_tables[seq_id] = self.block_manager.get_block_table(seq)
@@ -1342,7 +1425,9 @@ class Scheduler:
@@ -1342,7 +1438,9 @@ class Scheduler:
if self.cache_config.enable_prefix_caching:
common_computed_block_nums = (
self.block_manager.get_common_computed_block_ids(
......@@ -759,7 +872,7 @@ index f507847a..170a359f 100644
do_sample = True
is_prompt = seq_group.is_prefill()
@@ -1364,9 +1449,30 @@ class Scheduler:
@@ -1364,9 +1462,30 @@ class Scheduler:
< seqs[0].data.get_len()):
do_sample = False
......@@ -790,7 +903,7 @@ index f507847a..170a359f 100644
seq_group_metadata = SequenceGroupMetadata(
request_id=seq_group.request_id,
is_prompt=is_prompt,
@@ -1392,6 +1498,7 @@ class Scheduler:
@@ -1392,6 +1511,7 @@ class Scheduler:
if scheduler_outputs.num_prefill_groups > 0 else None,
mm_processor_kwargs=seq_group.mm_processor_kwargs,
prompt_adapter_request=seq_group.prompt_adapter_request,
......@@ -798,7 +911,7 @@ index f507847a..170a359f 100644
)
else:
# When SPMD mode is enabled, we only send delta data except for
@@ -1490,11 +1597,17 @@ class Scheduler:
@@ -1490,11 +1610,17 @@ class Scheduler:
self._async_stopped.clear()
......@@ -821,10 +934,25 @@ index f507847a..170a359f 100644
blocks_to_copy: List[Tuple[int, int]],
diff --git a/vllm/distributed/device_communicators/kv_rearrange.py b/vllm/distributed/device_communicators/kv_rearrange.py
new file mode 100644
index 00000000..b9485bd5
index 000000000..a2f9ce99e
--- /dev/null
+++ b/vllm/distributed/device_communicators/kv_rearrange.py
@@ -0,0 +1,110 @@
@@ -0,0 +1,125 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import torch
+import triton
+import triton.language as tl
......@@ -938,10 +1066,25 @@ index 00000000..b9485bd5
\ No newline at end of file
diff --git a/vllm/distributed/device_communicators/nixl.py b/vllm/distributed/device_communicators/nixl.py
new file mode 100644
index 00000000..a8bd202f
index 000000000..136a0bd37
--- /dev/null
+++ b/vllm/distributed/device_communicators/nixl.py
@@ -0,0 +1,379 @@
@@ -0,0 +1,394 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import torch
+from typing import List, Tuple
+from vllm.config import VllmConfig
......@@ -1323,11 +1466,24 @@ index 00000000..a8bd202f
+ return done_req_ids
diff --git a/vllm/distributed/kv_transfer/kv_connector/dynamo_connector.py b/vllm/distributed/kv_transfer/kv_connector/dynamo_connector.py
new file mode 100644
index 00000000..7b3344f8
index 000000000..418fc7154
--- /dev/null
+++ b/vllm/distributed/kv_transfer/kv_connector/dynamo_connector.py
@@ -0,0 +1,350 @@
@@ -0,0 +1,363 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Simple KV Cache Connector for Distributed Machine Learning Inference
+
......@@ -1678,10 +1834,28 @@ index 00000000..7b3344f8
+ self.config.kv_consumers_pipeline_parallel_size = kv_config_enhanced["kv_consumers_pipeline_parallel_size"]
+ self.config.kv_producers_parallel_size = kv_config_enhanced["kv_producers_parallel_size"]
diff --git a/vllm/distributed/kv_transfer/kv_connector/factory.py b/vllm/distributed/kv_transfer/kv_connector/factory.py
index fe480533..c82fda80 100644
index fe4805334..0e16f0b31 100644
--- a/vllm/distributed/kv_transfer/kv_connector/factory.py
+++ b/vllm/distributed/kv_transfer/kv_connector/factory.py
@@ -27,13 +27,13 @@ class KVConnectorFactory:
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import importlib
from typing import TYPE_CHECKING, Callable, Dict, Type
@@ -27,13 +40,13 @@ class KVConnectorFactory:
@classmethod
def create_connector(cls, rank: int, local_rank: int,
......@@ -1697,7 +1871,7 @@ index fe480533..c82fda80 100644
# Register various connectors here.
@@ -48,3 +48,8 @@ KVConnectorFactory.register_connector(
@@ -48,3 +61,8 @@ KVConnectorFactory.register_connector(
"MooncakeConnector",
"vllm.distributed.kv_transfer.kv_connector.simple_connector",
"SimpleConnector")
......@@ -1707,10 +1881,28 @@ index fe480533..c82fda80 100644
+ "vllm.distributed.kv_transfer.kv_connector.dynamo_connector",
+ "DynamoConnector")
diff --git a/vllm/distributed/kv_transfer/kv_connector/simple_connector.py b/vllm/distributed/kv_transfer/kv_connector/simple_connector.py
index 2033e976..ddebb68e 100644
index 2033e9762..983bc69a3 100644
--- a/vllm/distributed/kv_transfer/kv_connector/simple_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/simple_connector.py
@@ -8,13 +8,15 @@ MooncakePipe.
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""
Simple KV Cache Connector for Distributed Machine Learning Inference
@@ -8,13 +21,15 @@ MooncakePipe.
But the logic can be extended to support other pipe and lookup buffer.
"""
......@@ -1727,7 +1919,7 @@ index 2033e976..ddebb68e 100644
from vllm.distributed.kv_transfer.kv_lookup_buffer.simple_buffer import (
SimpleBuffer)
from vllm.logger import init_logger
@@ -33,6 +35,7 @@ class SimpleConnector(KVConnectorBase):
@@ -33,6 +48,7 @@ class SimpleConnector(KVConnectorBase):
rank: int,
local_rank: int,
config: VllmConfig,
......@@ -1735,7 +1927,7 @@ index 2033e976..ddebb68e 100644
):
self.config = config.kv_transfer_config
@@ -71,20 +74,31 @@ class SimpleConnector(KVConnectorBase):
@@ -71,20 +87,31 @@ class SimpleConnector(KVConnectorBase):
self.producer_signal_pipe: Union[PyNcclPipe, MooncakePipe]
self.consumer_signal_pipe: Union[PyNcclPipe, MooncakePipe]
......@@ -1768,7 +1960,7 @@ index 2033e976..ddebb68e 100644
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base + 1,
@@ -108,11 +122,13 @@ class SimpleConnector(KVConnectorBase):
@@ -108,11 +135,13 @@ class SimpleConnector(KVConnectorBase):
# its recv pipe to the send pipe of KV producder
if self.config.kv_connector == "PyNcclConnector":
self.consumer_data_pipe = PyNcclPipe(
......@@ -1782,7 +1974,7 @@ index 2033e976..ddebb68e 100644
local_rank=local_rank,
config=self.config,
port_offset=port_offset_base + 1,
@@ -131,21 +147,25 @@ class SimpleConnector(KVConnectorBase):
@@ -131,21 +160,25 @@ class SimpleConnector(KVConnectorBase):
self.config.kv_buffer_size,
)
......@@ -1812,7 +2004,7 @@ index 2033e976..ddebb68e 100644
def send_kv_caches_and_hidden_states(
self,
@@ -161,12 +181,20 @@ class SimpleConnector(KVConnectorBase):
@@ -161,12 +194,20 @@ class SimpleConnector(KVConnectorBase):
slot_mapping_flat = model_input.attn_metadata.slot_mapping.flatten()
start_layer = model_executable.model.start_layer
end_layer = model_executable.model.end_layer
......@@ -1837,7 +2029,7 @@ index 2033e976..ddebb68e 100644
# query_lens contains new KV caches that are added to vLLM.
# so we will send them to decode instance
@@ -175,27 +203,40 @@ class SimpleConnector(KVConnectorBase):
@@ -175,27 +216,40 @@ class SimpleConnector(KVConnectorBase):
start_pos = sum(seq_lens[:idx])
end_pos = start_pos + slen
current_tokens = input_tokens_tensor[start_pos:end_pos]
......@@ -1892,7 +2084,7 @@ index 2033e976..ddebb68e 100644
logger.debug("[rank%d]: KV send DONE.", torch.distributed.get_rank())
@@ -215,6 +256,7 @@ class SimpleConnector(KVConnectorBase):
@@ -215,6 +269,7 @@ class SimpleConnector(KVConnectorBase):
input_tokens_tensor = model_input.input_tokens
seq_lens = model_input.attn_metadata.seq_lens
slot_mapping = model_input.attn_metadata.slot_mapping.flatten()
......@@ -1900,7 +2092,7 @@ index 2033e976..ddebb68e 100644
hidden_or_intermediate_states_for_one_req = []
@@ -222,6 +264,9 @@ class SimpleConnector(KVConnectorBase):
@@ -222,6 +277,9 @@ class SimpleConnector(KVConnectorBase):
num_computed_tokens_list = []
start_pos_list = []
......@@ -1910,7 +2102,7 @@ index 2033e976..ddebb68e 100644
# enumerate different requests
# FIXME(Kuntai): This impl assumes that all requests are prefill.
for idx, slen in enumerate(seq_lens):
@@ -229,13 +274,15 @@ class SimpleConnector(KVConnectorBase):
@@ -229,13 +287,15 @@ class SimpleConnector(KVConnectorBase):
start_pos = sum(seq_lens[:idx])
end_pos = start_pos + slen
current_tokens = input_tokens_tensor[start_pos:end_pos]
......@@ -1927,7 +2119,7 @@ index 2033e976..ddebb68e 100644
torch.ones_like(current_tokens, dtype=bool))
if ret[0] is None:
# didn't find any match.
@@ -267,19 +314,25 @@ class SimpleConnector(KVConnectorBase):
@@ -267,19 +327,25 @@ class SimpleConnector(KVConnectorBase):
kv_cache = kv_caches[i - model_executable.model.start_layer]
layer = model_executable.model.layers[i]
......@@ -1966,7 +2158,7 @@ index 2033e976..ddebb68e 100644
hidden_or_intermediate_states_for_one_req.append(hidden)
@@ -312,3 +365,77 @@ class SimpleConnector(KVConnectorBase):
@@ -312,3 +378,77 @@ class SimpleConnector(KVConnectorBase):
# MooncakePipe reuses data_pipe for signal_pipe, so we only have to
# close the data_pipe.
pass
......@@ -2045,10 +2237,28 @@ index 2033e976..ddebb68e 100644
+ self.config.kv_consumers_pipeline_parallel_size = kv_config_enhanced["kv_consumers_pipeline_parallel_size"]
+ self.config.kv_producers_parallel_size = kv_config_enhanced["kv_producers_parallel_size"]
diff --git a/vllm/distributed/kv_transfer/kv_lookup_buffer/simple_buffer.py b/vllm/distributed/kv_transfer/kv_lookup_buffer/simple_buffer.py
index 5e1b6235..b4506877 100644
index 5e1b62352..7b4cb406e 100644
--- a/vllm/distributed/kv_transfer/kv_lookup_buffer/simple_buffer.py
+++ b/vllm/distributed/kv_transfer/kv_lookup_buffer/simple_buffer.py
@@ -12,7 +12,8 @@
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""
Implements a distributed key-value (KV) cache transfer mechanism.
@@ -12,7 +25,8 @@
import threading
import time
from collections import deque
......@@ -2058,7 +2268,7 @@ index 5e1b6235..b4506877 100644
import torch
@@ -46,7 +47,7 @@ class SimpleBuffer(KVLookupBufferBase):
@@ -46,7 +60,7 @@ class SimpleBuffer(KVLookupBufferBase):
self.buffer_lock = threading.Lock()
self.signal_pipe = signal_pipe
self.data_pipe = data_pipe
......@@ -2067,7 +2277,7 @@ index 5e1b6235..b4506877 100644
self.normal_signal = torch.tensor([0], device="cpu")
self.end_signal = None
@@ -57,10 +58,16 @@ class SimpleBuffer(KVLookupBufferBase):
@@ -57,10 +71,16 @@ class SimpleBuffer(KVLookupBufferBase):
# tokens_roi_sender: tokens and roi of the producer (in the buffer)
# tokens_roi_recver: tokens and roi of the consumer (query)
......@@ -2088,7 +2298,7 @@ index 5e1b6235..b4506877 100644
if tokens_recver is None:
# consumer sends an empty request
@@ -80,14 +87,14 @@ class SimpleBuffer(KVLookupBufferBase):
@@ -80,14 +100,14 @@ class SimpleBuffer(KVLookupBufferBase):
return 0
......@@ -2106,7 +2316,7 @@ index 5e1b6235..b4506877 100644
def _get_element_size(self, data: Optional[Union[List, torch.Tensor]]):
@@ -100,7 +107,7 @@ class SimpleBuffer(KVLookupBufferBase):
@@ -100,7 +120,7 @@ class SimpleBuffer(KVLookupBufferBase):
raise AssertionError(f"Unknown data type {type(data)}")
......@@ -2115,7 +2325,7 @@ index 5e1b6235..b4506877 100644
key: torch.Tensor, value: torch.Tensor,
hidden: torch.Tensor):
@@ -115,7 +122,7 @@ class SimpleBuffer(KVLookupBufferBase):
@@ -115,7 +135,7 @@ class SimpleBuffer(KVLookupBufferBase):
if isinstance(hidden, torch.Tensor):
hidden = hidden.clone()
......@@ -2124,7 +2334,7 @@ index 5e1b6235..b4506877 100644
with self.buffer_lock:
for data in buffer_item:
@@ -125,53 +132,54 @@ class SimpleBuffer(KVLookupBufferBase):
@@ -125,53 +145,54 @@ class SimpleBuffer(KVLookupBufferBase):
def _is_end_signal(self, signal):
return signal is None
......@@ -2223,7 +2433,7 @@ index 5e1b6235..b4506877 100644
except RuntimeError as e:
if 'Connection closed by peer' not in str(e):
@@ -180,10 +188,10 @@ class SimpleBuffer(KVLookupBufferBase):
@@ -180,10 +201,10 @@ class SimpleBuffer(KVLookupBufferBase):
logger.debug("Closing drop_select_handler")
def drop_select(
......@@ -2236,7 +2446,7 @@ index 5e1b6235..b4506877 100644
"drop_select should be called by the KV cache consumer "\
"(e.g. the decode vLLM instance)"
@@ -192,26 +200,28 @@ class SimpleBuffer(KVLookupBufferBase):
@@ -192,26 +213,28 @@ class SimpleBuffer(KVLookupBufferBase):
if isinstance(roi, torch.Tensor):
roi = roi.clone().float()
......@@ -2274,7 +2484,7 @@ index 5e1b6235..b4506877 100644
key: torch.Tensor, value: torch.Tensor,
hidden: torch.Tensor) -> None:
@@ -222,20 +232,19 @@ class SimpleBuffer(KVLookupBufferBase):
@@ -222,20 +245,19 @@ class SimpleBuffer(KVLookupBufferBase):
while self.buffer_size > self.buffer_size_threshold:
self.full_handler()
......@@ -2302,10 +2512,28 @@ index 5e1b6235..b4506877 100644
else:
# TODO: have a explicit close signal and have a explicit way to
diff --git a/vllm/distributed/kv_transfer/kv_pipe/base.py b/vllm/distributed/kv_transfer/kv_pipe/base.py
index 40589fb3..da2829cf 100644
index 40589fb3e..a3991c39d 100644
--- a/vllm/distributed/kv_transfer/kv_pipe/base.py
+++ b/vllm/distributed/kv_transfer/kv_pipe/base.py
@@ -23,7 +23,7 @@ class KVPipeBase(ABC):
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""
This file defines an interface `KVPipeBase`
that provides an abstraction for sending and receiving tensors, or None, via
@@ -23,7 +36,7 @@ class KVPipeBase(ABC):
"""
@abstractmethod
......@@ -2314,7 +2542,7 @@ index 40589fb3..da2829cf 100644
"""Send a tensor, or None, via the pipe.
Need to support sending None -- important for error handling.
@@ -41,7 +41,7 @@ class KVPipeBase(ABC):
@@ -41,7 +54,7 @@ class KVPipeBase(ABC):
raise NotImplementedError
@abstractmethod
......@@ -2325,10 +2553,25 @@ index 40589fb3..da2829cf 100644
Returns:
diff --git a/vllm/distributed/kv_transfer/kv_pipe/dynamo_nccl_pipe.py b/vllm/distributed/kv_transfer/kv_pipe/dynamo_nccl_pipe.py
new file mode 100644
index 00000000..3ee0fa78
index 000000000..ca5345359
--- /dev/null
+++ b/vllm/distributed/kv_transfer/kv_pipe/dynamo_nccl_pipe.py
@@ -0,0 +1,124 @@
@@ -0,0 +1,139 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import threading
+import typing
......@@ -2454,10 +2697,28 @@ index 00000000..3ee0fa78
+ # dtype = getattr(torch, dtype)
+ self._receive_tensor(tensor_id, rank)
diff --git a/vllm/distributed/kv_transfer/kv_pipe/pynccl_pipe.py b/vllm/distributed/kv_transfer/kv_pipe/pynccl_pipe.py
index 7aa53d07..f5dd50b7 100644
index 7aa53d07a..8fb256aff 100644
--- a/vllm/distributed/kv_transfer/kv_pipe/pynccl_pipe.py
+++ b/vllm/distributed/kv_transfer/kv_pipe/pynccl_pipe.py
@@ -45,33 +45,33 @@ class PyNcclPipe(KVPipeBase):
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""
This module implements a PyNccl pipe for sending and receiving
Optional[torch.Tensor] between distributed ranks with advanced
@@ -45,33 +58,33 @@ class PyNcclPipe(KVPipeBase):
METADATA_DTYPE = torch.int64
def __init__(self,
......@@ -2497,7 +2758,7 @@ index 7aa53d07..f5dd50b7 100644
# transportation-related variables
self.transport_thread: Optional[ThreadPoolExecutor] = None
@@ -145,16 +145,16 @@ class PyNcclPipe(KVPipeBase):
@@ -145,16 +158,16 @@ class PyNcclPipe(KVPipeBase):
dtype=metadata["dtype"],
device=self.device)
......@@ -2517,7 +2778,7 @@ index 7aa53d07..f5dd50b7 100644
"""
Receive the metadata dictionary from the target rank.
@@ -162,9 +162,9 @@ class PyNcclPipe(KVPipeBase):
@@ -162,9 +175,9 @@ class PyNcclPipe(KVPipeBase):
- metadata: A dictionary with keys "dtype" and "shape" describing
the tensor.
"""
......@@ -2529,7 +2790,7 @@ index 7aa53d07..f5dd50b7 100644
"""
The actual implementation of sending the tensor and its metadata to the
target rank.
@@ -174,12 +174,12 @@ class PyNcclPipe(KVPipeBase):
@@ -174,12 +187,12 @@ class PyNcclPipe(KVPipeBase):
being sent.
"""
metadata = self._make_metadata(tensor)
......@@ -2545,7 +2806,7 @@ index 7aa53d07..f5dd50b7 100644
"""
The actual implementation of receiving a tensor and its metadata from
the target rank.
@@ -187,21 +187,22 @@ class PyNcclPipe(KVPipeBase):
@@ -187,21 +200,22 @@ class PyNcclPipe(KVPipeBase):
Returns:
- buffer: The received tensor, or None if no tensor is received.
"""
......@@ -2572,7 +2833,7 @@ index 7aa53d07..f5dd50b7 100644
with self.buffer_size_lock:
self.buffer_size -= tensor_size
@@ -220,7 +221,7 @@ class PyNcclPipe(KVPipeBase):
@@ -220,7 +234,7 @@ class PyNcclPipe(KVPipeBase):
logger.debug("KV cache transfer pipe is full. Waiting...")
time.sleep(0.05)
......@@ -2581,7 +2842,7 @@ index 7aa53d07..f5dd50b7 100644
"""
Sends a tensor and its metadata to the destination rank in a
non-blocking way.
@@ -228,6 +229,7 @@ class PyNcclPipe(KVPipeBase):
@@ -228,6 +242,7 @@ class PyNcclPipe(KVPipeBase):
Parameters:
- tensor: The tensor to send, or None if no tensor is being sent.
"""
......@@ -2589,7 +2850,7 @@ index 7aa53d07..f5dd50b7 100644
if self.transport_thread is None:
self.transport_thread = ThreadPoolExecutor(max_workers=1)
@@ -241,32 +243,39 @@ class PyNcclPipe(KVPipeBase):
@@ -241,32 +256,39 @@ class PyNcclPipe(KVPipeBase):
with self.buffer_size_lock:
self.buffer_size += tensor_size
......@@ -2644,10 +2905,28 @@ index 7aa53d07..f5dd50b7 100644
def close(self):
"""
diff --git a/vllm/distributed/kv_transfer/kv_transfer_agent.py b/vllm/distributed/kv_transfer/kv_transfer_agent.py
index 1e80e0bd..cd90206f 100644
index 1e80e0bd7..f06c7a5f6 100644
--- a/vllm/distributed/kv_transfer/kv_transfer_agent.py
+++ b/vllm/distributed/kv_transfer/kv_transfer_agent.py
@@ -35,6 +35,7 @@ class KVTransferAgent:
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""A centralized entrypoint to perform distributed KV cache transfer.
This implementation is a shim wrapper on two APIs exposed by `kv_connector`:
@@ -35,6 +48,7 @@ class KVTransferAgent:
rank: int,
local_rank: int,
config: "VllmConfig",
......@@ -2655,7 +2934,7 @@ index 1e80e0bd..cd90206f 100644
):
self.config = config
@@ -47,7 +48,7 @@ class KVTransferAgent:
@@ -47,7 +61,7 @@ class KVTransferAgent:
"TransferAgent should only be used when kv_connector is set."
self.connector = KVConnectorFactory.create_connector(
......@@ -2665,10 +2944,28 @@ index 1e80e0bd..cd90206f 100644
def send_kv_caches_and_hidden_states(
self,
diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py
index 321902d1..b8937ef8 100644
index 321902d11..03409899e 100644
--- a/vllm/distributed/parallel_state.py
+++ b/vllm/distributed/parallel_state.py
@@ -1085,7 +1085,8 @@ def ensure_kv_transfer_initialized(vllm_config: "VllmConfig") -> None:
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
# Copyright 2023 The vLLM team.
# Adapted from
@@ -1085,7 +1098,8 @@ def ensure_kv_transfer_initialized(vllm_config: "VllmConfig") -> None:
_KV_TRANSFER = kv_transfer.KVTransferAgent(
rank=get_world_group().rank,
local_rank=get_world_group().local_rank,
......@@ -2679,10 +2976,24 @@ index 321902d1..b8937ef8 100644
def ensure_model_parallel_initialized(
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index d82d9ad9..03896aa6 100644
index d82d9ad9d..61c1e429d 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -2,13 +2,17 @@
@@ -1,14 +1,31 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import copy
import time
......@@ -2701,7 +3012,7 @@ index d82d9ad9..03896aa6 100644
from typing import Sequence as GenericSequence
from typing import Set, Type, Union, cast, overload
@@ -60,6 +64,9 @@ from vllm.usage.usage_lib import (UsageContext, is_usage_stats_enabled,
@@ -60,6 +77,9 @@ from vllm.usage.usage_lib import (UsageContext, is_usage_stats_enabled,
usage_message)
from vllm.utils import Counter, Device, deprecate_kwargs, weak_bind
from vllm.version import __version__ as VLLM_VERSION
......@@ -2711,7 +3022,7 @@ index d82d9ad9..03896aa6 100644
logger = init_logger(__name__)
_LOCAL_LOGGING_INTERVAL_SEC = 5
@@ -90,7 +97,7 @@ class OutputData(NamedTuple):
@@ -90,7 +110,7 @@ class OutputData(NamedTuple):
# outputs from multiple steps.
is_first_step_output: Optional[bool]
skip: List[int]
......@@ -2720,7 +3031,7 @@ index d82d9ad9..03896aa6 100644
class SchedulerContext:
@@ -104,11 +111,14 @@ class SchedulerContext:
@@ -104,11 +124,14 @@ class SchedulerContext:
self.multi_step_stream_outputs: bool = multi_step_stream_outputs
......@@ -2736,7 +3047,7 @@ index d82d9ad9..03896aa6 100644
self.output_queue.append(
OutputData(outputs=outputs,
seq_group_metadata_list=seq_group_metadata_list,
@@ -116,7 +126,9 @@ class SchedulerContext:
@@ -116,7 +139,9 @@ class SchedulerContext:
is_async=is_async,
is_last_step=is_last_step,
is_first_step_output=is_first_step_output,
......@@ -2747,7 +3058,7 @@ index d82d9ad9..03896aa6 100644
class LLMEngine:
@@ -348,7 +360,7 @@ class LLMEngine:
@@ -348,7 +373,7 @@ class LLMEngine:
# GPU and CPU blocks, which are profiled in the distributed executor.
self.scheduler = [
Scheduler(
......@@ -2756,7 +3067,7 @@ index d82d9ad9..03896aa6 100644
self.parallel_config.pipeline_parallel_size,
self.async_callbacks[v_id]
if self.model_config.use_async_output_proc else None)
@@ -405,6 +417,40 @@ class LLMEngine:
@@ -405,6 +430,40 @@ class LLMEngine:
self.seq_id_to_seq_group: Dict[str, SequenceGroupBase] = {}
......@@ -2797,7 +3108,7 @@ index d82d9ad9..03896aa6 100644
def _initialize_kv_caches(self) -> None:
"""Initialize the KV cache in the worker(s).
@@ -500,6 +546,8 @@ class LLMEngine:
@@ -500,6 +559,8 @@ class LLMEngine:
# Shutdown model executor when engine is garbage collected
# Use getattr since __init__ can fail before the field is set
if model_executor := getattr(self, "model_executor", None):
......@@ -2806,7 +3117,7 @@ index d82d9ad9..03896aa6 100644
model_executor.shutdown()
def get_tokenizer_group(
@@ -552,11 +600,14 @@ class LLMEngine:
@@ -552,11 +613,14 @@ class LLMEngine:
prompt_adapter_request: Optional[PromptAdapterRequest],
trace_headers: Optional[Mapping[str, str]] = None,
priority: int = 0,
......@@ -2821,7 +3132,7 @@ index d82d9ad9..03896aa6 100644
ParallelSampleSequenceGroup.add_request(
request_id,
self,
@@ -574,6 +625,8 @@ class LLMEngine:
@@ -574,6 +638,8 @@ class LLMEngine:
# Create the sequences.
block_size = self.cache_config.block_size
seq_id = next(self.seq_counter)
......@@ -2830,7 +3141,7 @@ index d82d9ad9..03896aa6 100644
eos_token_id = self.input_preprocessor.get_eos_token_id(lora_request)
if is_encoder_decoder_inputs(processed_inputs):
@@ -584,7 +637,7 @@ class LLMEngine:
@@ -584,7 +650,7 @@ class LLMEngine:
encoder_inputs = None
seq = Sequence(seq_id, decoder_inputs, block_size, eos_token_id,
......@@ -2839,7 +3150,7 @@ index d82d9ad9..03896aa6 100644
encoder_seq = (None if encoder_inputs is None else Sequence(
seq_id, encoder_inputs, block_size, eos_token_id, lora_request,
@@ -601,8 +654,12 @@ class LLMEngine:
@@ -601,8 +667,12 @@ class LLMEngine:
trace_headers=trace_headers,
prompt_adapter_request=prompt_adapter_request,
encoder_seq=encoder_seq,
......@@ -2853,7 +3164,7 @@ index d82d9ad9..03896aa6 100644
seq_group = self._create_sequence_group_with_pooling(
request_id,
seq,
@@ -673,6 +730,7 @@ class LLMEngine:
@@ -673,6 +743,7 @@ class LLMEngine:
trace_headers: Optional[Mapping[str, str]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
priority: int = 0,
......@@ -2861,7 +3172,7 @@ index d82d9ad9..03896aa6 100644
*,
inputs: Optional[PromptType] = None, # DEPRECATED
) -> None:
@@ -765,6 +823,7 @@ class LLMEngine:
@@ -765,6 +836,7 @@ class LLMEngine:
prompt_adapter_request=prompt_adapter_request,
trace_headers=trace_headers,
priority=priority,
......@@ -2869,7 +3180,7 @@ index d82d9ad9..03896aa6 100644
)
def _validate_token_prompt(self, prompt: PromptType,
@@ -799,6 +858,7 @@ class LLMEngine:
@@ -799,6 +871,7 @@ class LLMEngine:
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
encoder_seq: Optional[Sequence] = None,
priority: int = 0,
......@@ -2877,7 +3188,7 @@ index d82d9ad9..03896aa6 100644
) -> SequenceGroup:
"""Creates a SequenceGroup with SamplingParams."""
max_logprobs = self.get_model_config().max_logprobs
@@ -829,7 +889,9 @@ class LLMEngine:
@@ -829,7 +902,9 @@ class LLMEngine:
trace_headers=trace_headers,
prompt_adapter_request=prompt_adapter_request,
encoder_seq=encoder_seq,
......@@ -2888,7 +3199,7 @@ index d82d9ad9..03896aa6 100644
return seq_group
@@ -995,11 +1057,11 @@ class LLMEngine:
@@ -995,11 +1070,11 @@ class LLMEngine:
# When we process only one request, no pop is required
# (since later we will process all of the rest)
(outputs, seq_group_metadata_list, scheduler_outputs, is_async,
......@@ -2902,7 +3213,7 @@ index d82d9ad9..03896aa6 100644
# Sanity check
assert len(seq_group_metadata_list) == len(
@@ -1325,15 +1387,55 @@ class LLMEngine:
@@ -1325,15 +1400,55 @@ class LLMEngine:
# Clear outputs for each new scheduler iteration
ctx.request_outputs.clear()
......@@ -2960,7 +3271,7 @@ index d82d9ad9..03896aa6 100644
ctx.seq_group_metadata_list = seq_group_metadata_list
ctx.scheduler_outputs = scheduler_outputs
@@ -1383,9 +1485,46 @@ class LLMEngine:
@@ -1383,9 +1498,46 @@ class LLMEngine:
execute_model_req.async_callback = self.async_callbacks[
virtual_engine]
......@@ -3009,7 +3320,7 @@ index d82d9ad9..03896aa6 100644
# We need to do this here so that last step's sampled_token_ids can
# be passed to the next iteration for PP.
if self.scheduler_config.is_multi_step:
@@ -1396,7 +1535,26 @@ class LLMEngine:
@@ -1396,7 +1548,26 @@ class LLMEngine:
if len(ctx.output_queue) > 0:
self._process_model_outputs(ctx=ctx)
# No outputs in this case
......@@ -3037,7 +3348,7 @@ index d82d9ad9..03896aa6 100644
# Finish the current step for all the sequence groups.
if self.scheduler_config.is_multi_step:
@@ -1456,7 +1614,7 @@ class LLMEngine:
@@ -1456,7 +1627,7 @@ class LLMEngine:
# queued control plane messages, such as add/remove lora adapters.
logger.debug("Stopping remote worker execution loop.")
self.model_executor.stop_remote_worker_execution_loop()
......@@ -3047,10 +3358,28 @@ index d82d9ad9..03896aa6 100644
def _has_remaining_steps(
diff --git a/vllm/engine/multiprocessing/__init__.py b/vllm/engine/multiprocessing/__init__.py
index 3cf1850e..ae006579 100644
index 3cf1850ee..d20a5f20b 100644
--- a/vllm/engine/multiprocessing/__init__.py
+++ b/vllm/engine/multiprocessing/__init__.py
@@ -14,13 +14,17 @@ from vllm.outputs import RequestOutput
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import uuid
from dataclasses import dataclass, field
@@ -14,13 +27,17 @@ from vllm.outputs import RequestOutput
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sampling_params import SamplingParams
from vllm.utils import deprecate_kwargs
......@@ -3069,7 +3398,7 @@ index 3cf1850e..ae006579 100644
class MQEngineDeadError(RuntimeError):
@@ -36,6 +40,7 @@ class RPCProcessRequest:
@@ -36,6 +53,7 @@ class RPCProcessRequest:
trace_headers: Optional[Mapping[str, str]] = None
prompt_adapter_request: Optional[PromptAdapterRequest] = None
priority: int = 0
......@@ -3077,7 +3406,7 @@ index 3cf1850e..ae006579 100644
@overload
def __init__(
@@ -78,6 +83,7 @@ class RPCProcessRequest:
@@ -78,6 +96,7 @@ class RPCProcessRequest:
trace_headers: Optional[Mapping[str, str]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
priority: int = 0,
......@@ -3085,7 +3414,7 @@ index 3cf1850e..ae006579 100644
*,
inputs: Optional[PromptType] = None, # DEPRECATED
) -> None:
@@ -95,7 +101,7 @@ class RPCProcessRequest:
@@ -95,7 +114,7 @@ class RPCProcessRequest:
self.trace_headers = trace_headers
self.prompt_adapter_request = prompt_adapter_request
self.priority = priority
......@@ -3094,7 +3423,7 @@ index 3cf1850e..ae006579 100644
@dataclass
class RPCError:
@@ -116,7 +122,7 @@ class RPCStartupRequest(Enum):
@@ -116,7 +135,7 @@ class RPCStartupRequest(Enum):
@dataclass
class RPCStartupResponse:
tracing_enabled: bool
......@@ -3103,7 +3432,7 @@ index 3cf1850e..ae006579 100644
class RPCUProfileRequest(Enum):
START_PROFILE = 1
@@ -157,3 +163,13 @@ def ENGINE_DEAD_ERROR(
@@ -157,3 +176,13 @@ def ENGINE_DEAD_ERROR(
return MQEngineDeadError(
"Engine loop is not running. Inspect the stacktrace to "
f"find the original error: {repr(error)}.")
......@@ -3118,10 +3447,28 @@ index 3cf1850e..ae006579 100644
+ gpu_cache_usage_perc: float
+ gpu_prefix_cache_hit_rate: float
diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py
index 85b5f31e..05030292 100644
index 85b5f31e3..c53b9eced 100644
--- a/vllm/engine/multiprocessing/client.py
+++ b/vllm/engine/multiprocessing/client.py
@@ -8,6 +8,7 @@ from typing import (Any, AsyncGenerator, Dict, Iterator, List, Mapping,
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import asyncio
import copy
@@ -8,6 +21,7 @@ from typing import (Any, AsyncGenerator, Dict, Iterator, List, Mapping,
Optional, Union, cast, overload)
import cloudpickle
......@@ -3129,7 +3476,7 @@ index 85b5f31e..05030292 100644
import psutil
import zmq
import zmq.asyncio
@@ -19,20 +20,23 @@ from vllm import PoolingParams
@@ -19,20 +33,23 @@ from vllm import PoolingParams
from vllm.config import DecodingConfig, ModelConfig, VllmConfig
from vllm.core.scheduler import SchedulerOutputs
from vllm.engine.arg_utils import AsyncEngineArgs
......@@ -3156,7 +3503,7 @@ index 85b5f31e..05030292 100644
from vllm.engine.protocol import EngineClient
# yapf: enable
from vllm.envs import VLLM_RPC_TIMEOUT
@@ -46,6 +50,8 @@ from vllm.prompt_adapter.request import PromptAdapterRequest
@@ -46,6 +63,8 @@ from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sampling_params import SamplingParams
from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
from vllm.utils import deprecate_kwargs
......@@ -3165,7 +3512,7 @@ index 85b5f31e..05030292 100644
logger = init_logger(__name__)
@@ -91,6 +97,7 @@ class MQLLMEngineClient(EngineClient):
@@ -91,6 +110,7 @@ class MQLLMEngineClient(EngineClient):
self._errored_with: Optional[BaseException] = None
# Get the configs.
......@@ -3173,7 +3520,7 @@ index 85b5f31e..05030292 100644
self.model_config = engine_config.model_config
self.decoding_config = engine_config.decoding_config
@@ -115,6 +122,10 @@ class MQLLMEngineClient(EngineClient):
@@ -115,6 +135,10 @@ class MQLLMEngineClient(EngineClient):
self.heartbeat_socket: Socket = self.context.socket(zmq.constants.PULL)
self.heartbeat_socket.connect(f"{ipc_path}{IPC_HEALTH_EXT}")
......@@ -3184,7 +3531,7 @@ index 85b5f31e..05030292 100644
# IPC path for the data socket.
self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}"
@@ -129,8 +140,27 @@ class MQLLMEngineClient(EngineClient):
@@ -129,8 +153,27 @@ class MQLLMEngineClient(EngineClient):
# Loop to check health of the LLMEngine periodically.
# Started after the MQLLMEngine is ready.
self.health_loop: Optional[asyncio.Task] = None
......@@ -3212,7 +3559,7 @@ index 85b5f31e..05030292 100644
@staticmethod
def is_unsupported_config(engine_args: AsyncEngineArgs):
# Pipeline parallel not yet supported
@@ -180,6 +210,61 @@ class MQLLMEngineClient(EngineClient):
@@ -180,6 +223,61 @@ class MQLLMEngineClient(EngineClient):
except Exception as e:
self._set_errored(e)
......@@ -3274,7 +3621,7 @@ index 85b5f31e..05030292 100644
async def run_output_handler_loop(self):
"""Get RequestOutputs from Engine and stream to Request Queues"""
@@ -278,12 +363,26 @@ class MQLLMEngineClient(EngineClient):
@@ -278,12 +376,26 @@ class MQLLMEngineClient(EngineClient):
# Wait until server is ready.
response = await self._wait_for_server_rpc(socket)
......@@ -3301,7 +3648,7 @@ index 85b5f31e..05030292 100644
def close(self):
"""Destroy the ZeroMQ Context."""
@@ -293,6 +392,8 @@ class MQLLMEngineClient(EngineClient):
@@ -293,6 +405,8 @@ class MQLLMEngineClient(EngineClient):
# Cancel background tasks.
if self.health_loop is not None:
self.health_loop.cancel()
......@@ -3310,7 +3657,7 @@ index 85b5f31e..05030292 100644
if self.output_loop is not None:
self.output_loop.cancel()
@@ -415,6 +516,9 @@ class MQLLMEngineClient(EngineClient):
@@ -415,6 +529,9 @@ class MQLLMEngineClient(EngineClient):
"""
if self._errored_with is not None:
raise self._errored_with
......@@ -3320,7 +3667,7 @@ index 85b5f31e..05030292 100644
@property
def is_running(self) -> bool:
@@ -473,6 +577,7 @@ class MQLLMEngineClient(EngineClient):
@@ -473,6 +590,7 @@ class MQLLMEngineClient(EngineClient):
trace_headers: Optional[Mapping[str, str]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
priority: int = 0,
......@@ -3328,7 +3675,7 @@ index 85b5f31e..05030292 100644
*,
inputs: Optional[PromptType] = None # DEPRECATED
) -> AsyncGenerator[RequestOutput, None]:
@@ -502,7 +607,8 @@ class MQLLMEngineClient(EngineClient):
@@ -502,7 +620,8 @@ class MQLLMEngineClient(EngineClient):
return self._process_request(prompt, sampling_params, request_id,
lora_request, trace_headers,
......@@ -3338,7 +3685,7 @@ index 85b5f31e..05030292 100644
@overload
def encode(
@@ -586,6 +692,7 @@ class MQLLMEngineClient(EngineClient):
@@ -586,6 +705,7 @@ class MQLLMEngineClient(EngineClient):
trace_headers: Optional[Mapping[str, str]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
priority: int = 0,
......@@ -3346,7 +3693,7 @@ index 85b5f31e..05030292 100644
) -> Union[AsyncGenerator[RequestOutput, None], AsyncGenerator[
PoolingRequestOutput, None]]:
"""Send an RPCGenerateRequest to the RPCServer and stream responses."""
@@ -630,6 +737,12 @@ class MQLLMEngineClient(EngineClient):
@@ -630,6 +750,12 @@ class MQLLMEngineClient(EngineClient):
else:
lp_bytes = None
......@@ -3359,7 +3706,7 @@ index 85b5f31e..05030292 100644
request_bytes = pickle.dumps(
RPCProcessRequest(
prompt=prompt,
@@ -639,11 +752,11 @@ class MQLLMEngineClient(EngineClient):
@@ -639,11 +765,11 @@ class MQLLMEngineClient(EngineClient):
trace_headers=trace_headers,
prompt_adapter_request=prompt_adapter_request,
priority=priority,
......@@ -3373,7 +3720,7 @@ index 85b5f31e..05030292 100644
await self.input_socket.send_multipart(parts, copy=False)
# 4) Stream the RequestOutputs from the output queue. Note
@@ -705,3 +818,6 @@ class MQLLMEngineClient(EngineClient):
@@ -705,3 +831,6 @@ class MQLLMEngineClient(EngineClient):
# Raise on error, otherwise happily return None
if isinstance(request_output, BaseException):
raise request_output
......@@ -3381,10 +3728,25 @@ index 85b5f31e..05030292 100644
+ def set_metrics_publisher(self, metrics_publisher):
+ self.metrics_publisher = metrics_publisher
diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py
index a0dd7958..c82bc15b 100644
index a0dd79586..ea0d2cd68 100644
--- a/vllm/engine/multiprocessing/engine.py
+++ b/vllm/engine/multiprocessing/engine.py
@@ -3,35 +3,115 @@
@@ -1,37 +1,130 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import pickle
import signal
from contextlib import contextmanager
......@@ -3505,7 +3867,7 @@ index a0dd7958..c82bc15b 100644
class MQLLMEngine:
"""A multiprocessing wrapper for :class:`LLMEngine`.
@@ -94,12 +174,37 @@ class MQLLMEngine:
@@ -94,12 +187,37 @@ class MQLLMEngine:
self.heartbeat_socket = self.ctx.socket(zmq.constants.PUSH)
self.heartbeat_socket.bind(f"{ipc_path}{IPC_HEALTH_EXT}")
......@@ -3543,7 +3905,7 @@ index a0dd7958..c82bc15b 100644
@property
def dead_error(self) -> BaseException:
if self._errored_with is not None:
@@ -171,8 +276,17 @@ class MQLLMEngine:
@@ -171,8 +289,17 @@ class MQLLMEngine:
# Handle the query from the Client.
if request == RPCStartupRequest.IS_SERVER_READY:
tracing_enabled = self.engine.is_tracing_enabled()
......@@ -3563,7 +3925,7 @@ index a0dd7958..c82bc15b 100644
except Exception as e:
response = e
@@ -185,6 +299,7 @@ class MQLLMEngine:
@@ -185,6 +312,7 @@ class MQLLMEngine:
while True:
if not self.engine.has_unfinished_requests():
......@@ -3571,7 +3933,7 @@ index a0dd7958..c82bc15b 100644
# Poll until there is work to do.
while self.input_socket.poll(timeout=POLLING_TIMEOUT_MS) == 0:
# When there's no work, check on engine health and send
@@ -220,6 +335,13 @@ class MQLLMEngine:
@@ -220,6 +348,13 @@ class MQLLMEngine:
def handle_new_input(self):
"""Handle new input from the socket"""
try:
......@@ -3585,7 +3947,7 @@ index a0dd7958..c82bc15b 100644
while self.input_socket.poll(timeout=0) != 0:
frames = self.input_socket.recv_multipart(copy=False)
request = pickle.loads(frames[0].buffer)
@@ -262,6 +384,11 @@ class MQLLMEngine:
@@ -262,6 +397,11 @@ class MQLLMEngine:
self._send_outputs(rpc_err)
try:
......@@ -3597,7 +3959,7 @@ index a0dd7958..c82bc15b 100644
self.engine.add_request(
request_id=request_id,
prompt=request.prompt,
@@ -269,7 +396,9 @@ class MQLLMEngine:
@@ -269,7 +409,9 @@ class MQLLMEngine:
lora_request=request.lora_request,
trace_headers=request.trace_headers,
prompt_adapter_request=request.prompt_adapter_request,
......@@ -3609,10 +3971,28 @@ index a0dd7958..c82bc15b 100644
if self.log_requests:
logger.info("Added request %s.", request.request_id)
diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index 107220d5..c716f75f 100644
index 107220d54..e0e0590b6 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -34,6 +34,7 @@ from vllm.sampling_params import BeamSearchParams, SamplingParams
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import asyncio
import json
@@ -34,6 +47,7 @@ from vllm.sampling_params import BeamSearchParams, SamplingParams
from vllm.sequence import Logprob
from vllm.transformers_utils.tokenizer import AnyTokenizer, MistralTokenizer
from vllm.transformers_utils.tokenizers import maybe_serialize_tool_calls
......@@ -3620,7 +4000,7 @@ index 107220d5..c716f75f 100644
logger = init_logger(__name__)
@@ -112,6 +113,7 @@ class OpenAIServingChat(OpenAIServing):
@@ -112,6 +126,7 @@ class OpenAIServingChat(OpenAIServing):
self,
request: ChatCompletionRequest,
raw_request: Optional[Request] = None,
......@@ -3628,7 +4008,7 @@ index 107220d5..c716f75f 100644
) -> Union[AsyncGenerator[str, None], ChatCompletionResponse,
ErrorResponse]:
"""
@@ -243,6 +245,7 @@ class OpenAIServingChat(OpenAIServing):
@@ -243,6 +258,7 @@ class OpenAIServingChat(OpenAIServing):
trace_headers=trace_headers,
prompt_adapter_request=prompt_adapter_request,
priority=request.priority,
......@@ -3637,10 +4017,28 @@ index 107220d5..c716f75f 100644
generators.append(generator)
diff --git a/vllm/envs.py b/vllm/envs.py
index 745b068b..0ae63d9b 100644
index 745b068b7..0f1a022fb 100644
--- a/vllm/envs.py
+++ b/vllm/envs.py
@@ -87,6 +87,10 @@ if TYPE_CHECKING:
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import os
import tempfile
@@ -87,6 +100,10 @@ if TYPE_CHECKING:
VLLM_ENABLE_MOE_ALIGN_BLOCK_SIZE_TRITON: bool = False
VLLM_RAY_PER_WORKER_GPUS: float = 1.0
VLLM_RAY_BUNDLE_INDICES: str = ""
......@@ -3651,7 +4049,7 @@ index 745b068b..0ae63d9b 100644
def get_default_cache_root():
@@ -572,6 +576,21 @@ environment_variables: Dict[str, Callable[[], Any]] = {
@@ -572,6 +589,21 @@ environment_variables: Dict[str, Callable[[], Any]] = {
# models the alignment is already naturally aligned to 256 bytes.
"VLLM_CUDA_MEM_ALIGN_KV_CACHE":
lambda: bool(int(os.getenv("VLLM_CUDA_MEM_ALIGN_KV_CACHE", "1"))),
......@@ -3674,10 +4072,28 @@ index 745b068b..0ae63d9b 100644
# end-env-vars-definition
diff --git a/vllm/model_executor/models/deepseek_v2.py b/vllm/model_executor/models/deepseek_v2.py
index 773f5abe..3eefd266 100644
index 773f5abe7..365685e13 100644
--- a/vllm/model_executor/models/deepseek_v2.py
+++ b/vllm/model_executor/models/deepseek_v2.py
@@ -585,6 +585,8 @@ class DeepseekV2Model(nn.Module):
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
# Adapted from
# https://github.com/huggingface/transformers/blob/v4.28.0/src/transformers/models/llama/modeling_llama.py
@@ -585,6 +598,8 @@ class DeepseekV2Model(nn.Module):
cache_config = vllm_config.cache_config
quant_config = vllm_config.quant_config
......@@ -3687,10 +4103,28 @@ index 773f5abe..3eefd266 100644
self.vocab_size = config.vocab_size
diff --git a/vllm/outputs.py b/vllm/outputs.py
index 786380c3..56a7cf89 100644
index 786380c37..e9c3a5e16 100644
--- a/vllm/outputs.py
+++ b/vllm/outputs.py
@@ -6,16 +6,16 @@ from typing import Dict, Generic, List, MutableSequence, Optional
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import time
from dataclasses import dataclass
@@ -6,16 +19,16 @@ from typing import Dict, Generic, List, MutableSequence, Optional
from typing import Sequence as GenericSequence
from typing import Union
......@@ -3711,10 +4145,25 @@ index 786380c3..56a7cf89 100644
"""The output data of one completion output of a request.
diff --git a/vllm/remote_prefill.py b/vllm/remote_prefill.py
new file mode 100644
index 00000000..3f9711ef
index 000000000..83f6cd575
--- /dev/null
+++ b/vllm/remote_prefill.py
@@ -0,0 +1,67 @@
@@ -0,0 +1,82 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass
+from typing import Callable, Optional, List
+from enum import Enum
......@@ -3784,10 +4233,29 @@ index 00000000..3f9711ef
+ remote_prefill_request_callback: Optional[RemotePrefillRequestCallback] = None
\ No newline at end of file
diff --git a/vllm/sampling_params.py b/vllm/sampling_params.py
index 97f9e212..1bb97b00 100644
index 97f9e2129..5849befba 100644
--- a/vllm/sampling_params.py
+++ b/vllm/sampling_params.py
@@ -83,7 +83,7 @@ class RequestOutputKind(Enum):
@@ -1,4 +1,18 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
"""Sampling parameters for text generation."""
import copy
from dataclasses import dataclass
@@ -83,7 +97,7 @@ class RequestOutputKind(Enum):
DELTA = 1
# Do not return intermediate RequestOuputs
FINAL_ONLY = 2
......@@ -3797,10 +4265,29 @@ index 97f9e212..1bb97b00 100644
class SamplingParams(
msgspec.Struct,
diff --git a/vllm/sequence.py b/vllm/sequence.py
index 534b9e60..18675d2f 100644
index 534b9e606..c33bbde1c 100644
--- a/vllm/sequence.py
+++ b/vllm/sequence.py
@@ -20,6 +20,7 @@ from vllm.multimodal import MultiModalDataDict, MultiModalPlaceholderDict
@@ -1,4 +1,18 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
"""Sequence and its related classes."""
import copy
import enum
@@ -20,6 +34,7 @@ from vllm.multimodal import MultiModalDataDict, MultiModalPlaceholderDict
from vllm.pooling_params import PoolingParams
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sampling_params import RequestOutputKind, SamplingParams
......@@ -3808,7 +4295,7 @@ index 534b9e60..18675d2f 100644
VLLM_TOKEN_ID_ARRAY_TYPE = "l"
@@ -59,13 +60,14 @@ class SequenceStatus(enum.IntEnum):
@@ -59,13 +74,14 @@ class SequenceStatus(enum.IntEnum):
"""Status of a sequence."""
WAITING = 0
RUNNING = 1
......@@ -3829,7 +4316,7 @@ index 534b9e60..18675d2f 100644
@staticmethod
def is_finished(status: "SequenceStatus") -> bool:
@@ -409,6 +411,7 @@ class Sequence:
@@ -409,6 +425,7 @@ class Sequence:
eos_token_id: Optional[int] = None,
lora_request: Optional[LoRARequest] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
......@@ -3837,7 +4324,7 @@ index 534b9e60..18675d2f 100644
) -> None:
self.seq_id = seq_id
self.inputs = SingletonInputsAdapter(inputs)
@@ -416,7 +419,7 @@ class Sequence:
@@ -416,7 +433,7 @@ class Sequence:
self.eos_token_id = eos_token_id
self.lora_request = lora_request
self.prompt_adapter_request = prompt_adapter_request
......@@ -3846,7 +4333,7 @@ index 534b9e60..18675d2f 100644
self.data = SequenceData.from_seqs(self.prompt_token_ids)
self.output_logprobs: SampleLogprobs = []
self.output_text = ""
@@ -639,6 +642,7 @@ class SequenceGroup:
@@ -639,6 +656,7 @@ class SequenceGroup:
trace_headers: OpenTelemetry trace headers.
prompt_adapter_request: Prompt Adapter request.
priority: User-defined priority of the request.
......@@ -3854,7 +4341,7 @@ index 534b9e60..18675d2f 100644
"""
def __init__(
@@ -654,6 +658,7 @@ class SequenceGroup:
@@ -654,6 +672,7 @@ class SequenceGroup:
trace_headers: Optional[Mapping[str, str]] = None,
prompt_adapter_request: Optional[PromptAdapterRequest] = None,
priority: int = 0,
......@@ -3862,7 +4349,7 @@ index 534b9e60..18675d2f 100644
) -> None:
self.request_id = request_id
self.seqs = seqs
@@ -678,7 +683,7 @@ class SequenceGroup:
@@ -678,7 +697,7 @@ class SequenceGroup:
self.encoder_seq = encoder_seq
self.trace_headers = trace_headers
self.priority = priority
......@@ -3871,7 +4358,7 @@ index 534b9e60..18675d2f 100644
self.cached_request_output = None
@property
@@ -927,6 +932,9 @@ class SequenceGroupMetadata(
@@ -927,6 +946,9 @@ class SequenceGroupMetadata(
query tokens for prefill, we don't need sampling.
token_chunk_size: The number of tokens to be processed (per sequence).
None if chunking is not required.
......@@ -3881,7 +4368,7 @@ index 534b9e60..18675d2f 100644
lora_request: LoRA request.
computed_block_nums: The block numbers that are already computed,
used in prefix caching.
@@ -966,6 +974,9 @@ class SequenceGroupMetadata(
@@ -966,6 +988,9 @@ class SequenceGroupMetadata(
cross_block_table: Optional[List[int]] = None
prompt_adapter_request: Optional[PromptAdapterRequest] = None
token_chunk_size: Optional[int] = None
......@@ -3891,7 +4378,7 @@ index 534b9e60..18675d2f 100644
### Stateful fields that are lazily defined. ###
# The number of speculative tokens adopted in this request.
@@ -1310,6 +1321,8 @@ class ExecuteModelRequest(
@@ -1310,6 +1335,8 @@ class ExecuteModelRequest(
last_sampled_token_ids: Optional[torch.Tensor] = None
# Async callback
async_callback: Optional[Callable] = None
......@@ -3901,10 +4388,28 @@ index 534b9e60..18675d2f 100644
@property
def is_first_multi_step(self) -> bool:
diff --git a/vllm/worker/model_runner.py b/vllm/worker/model_runner.py
index 12baecde..a3f2c464 100644
index 12baecde6..11034b391 100644
--- a/vllm/worker/model_runner.py
+++ b/vllm/worker/model_runner.py
@@ -1824,6 +1824,9 @@ class ModelRunner(GPUModelRunnerBase[ModelInputForGPUWithSamplingMetadata]):
@@ -1,4 +1,17 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import dataclasses
import gc
@@ -1824,6 +1837,9 @@ class ModelRunner(GPUModelRunnerBase[ModelInputForGPUWithSamplingMetadata]):
if self.vllm_config.kv_transfer_config is None:
return False
......@@ -3914,7 +4419,7 @@ index 12baecde..a3f2c464 100644
prefill_meta = model_input.attn_metadata.prefill_metadata
@@ -1849,6 +1852,9 @@ class ModelRunner(GPUModelRunnerBase[ModelInputForGPUWithSamplingMetadata]):
@@ -1849,6 +1865,9 @@ class ModelRunner(GPUModelRunnerBase[ModelInputForGPUWithSamplingMetadata]):
if self.vllm_config.kv_transfer_config is None:
return False
......@@ -3925,10 +4430,25 @@ index 12baecde..a3f2c464 100644
prefill_meta = model_input.attn_metadata.prefill_metadata
diff --git a/vllm/worker/worker.py b/vllm/worker/worker.py
index 582aa460..876329d6 100644
index 582aa460e..0be784a40 100644
--- a/vllm/worker/worker.py
+++ b/vllm/worker/worker.py
@@ -2,7 +2,7 @@
@@ -1,8 +1,22 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
"""A GPU worker class."""
import gc
import os
......@@ -3937,7 +4457,7 @@ index 582aa460..876329d6 100644
import torch
import torch.distributed
@@ -31,6 +31,9 @@ from vllm.worker.model_runner import GPUModelRunnerBase, ModelRunner
@@ -31,6 +45,9 @@ from vllm.worker.model_runner import GPUModelRunnerBase, ModelRunner
from vllm.worker.pooling_model_runner import PoolingModelRunner
from vllm.worker.worker_base import (LocalOrDistributedWorkerBase, WorkerBase,
WorkerInput)
......@@ -3947,7 +4467,7 @@ index 582aa460..876329d6 100644
logger = init_logger(__name__)
@@ -306,6 +309,46 @@ class Worker(LocalOrDistributedWorkerBase):
@@ -306,6 +323,46 @@ class Worker(LocalOrDistributedWorkerBase):
self._init_cache_engine()
self._warm_up_model()
......@@ -3994,7 +4514,7 @@ index 582aa460..876329d6 100644
def _init_cache_engine(self):
assert self.cache_config.num_gpu_blocks is not None
self.cache_engine = [
@@ -367,6 +410,8 @@ class Worker(LocalOrDistributedWorkerBase):
@@ -367,6 +424,8 @@ class Worker(LocalOrDistributedWorkerBase):
blocks_to_copy = torch.tensor(execute_model_req.blocks_to_copy,
device=self.device,
dtype=torch.int64).view(-1, 2)
......@@ -4003,7 +4523,7 @@ index 582aa460..876329d6 100644
return WorkerInput(
num_seq_groups=num_seq_groups,
@@ -375,6 +420,12 @@ class Worker(LocalOrDistributedWorkerBase):
@@ -375,6 +434,12 @@ class Worker(LocalOrDistributedWorkerBase):
blocks_to_copy=blocks_to_copy,
virtual_engine=virtual_engine,
num_steps=num_steps,
......@@ -4017,10 +4537,29 @@ index 582aa460..876329d6 100644
@torch.inference_mode()
diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py
index 819b81fb..2891854b 100644
index 819b81fbf..7d1b1836d 100644
--- a/vllm/worker/worker_base.py
+++ b/vllm/worker/worker_base.py
@@ -9,6 +9,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
@@ -1,4 +1,18 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
import dataclasses
import os
@@ -9,6 +23,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
import cloudpickle
import torch
import torch.nn as nn
......@@ -4028,7 +4567,7 @@ index 819b81fb..2891854b 100644
from vllm.config import (ObservabilityConfig, VllmConfig,
set_current_vllm_config)
@@ -23,6 +24,9 @@ from vllm.utils import (enable_trace_function_call_for_thread,
@@ -23,6 +38,9 @@ from vllm.utils import (enable_trace_function_call_for_thread,
from vllm.worker.model_runner_base import (BroadcastableModelInput,
ModelRunnerBase,
ModelRunnerInputBase)
......@@ -4038,7 +4577,7 @@ index 819b81fb..2891854b 100644
logger = init_logger(__name__)
@@ -53,6 +57,8 @@ class WorkerBase(ABC):
@@ -53,6 +71,8 @@ class WorkerBase(ABC):
from vllm.platforms import current_platform
self.current_platform = current_platform
......@@ -4047,7 +4586,7 @@ index 819b81fb..2891854b 100644
@abstractmethod
def init_device(self) -> None:
"""Initialize device state, such as loading the model or other on-device
@@ -216,6 +222,13 @@ class WorkerInput:
@@ -216,6 +236,13 @@ class WorkerInput:
virtual_engine: int = 0
num_steps: int = 1
......@@ -4061,7 +4600,7 @@ index 819b81fb..2891854b 100644
@classmethod
def from_broadcasted_tensor_dict(
cls: Type["WorkerInput"],
@@ -232,6 +245,12 @@ class WorkerInput:
@@ -232,6 +259,12 @@ class WorkerInput:
blocks_to_copy=tensor_dict.pop("blocks_to_copy"),
virtual_engine=tensor_dict["virtual_engine"],
num_steps=tensor_dict.pop("num_steps"),
......@@ -4074,7 +4613,7 @@ index 819b81fb..2891854b 100644
)
def as_broadcastable_tensor_dict(
@@ -246,6 +265,12 @@ class WorkerInput:
@@ -246,6 +279,12 @@ class WorkerInput:
"blocks_to_copy": self.blocks_to_copy,
"virtual_engine": self.virtual_engine,
"num_steps": self.num_steps,
......@@ -4087,7 +4626,7 @@ index 819b81fb..2891854b 100644
}
return tensor_dict
@@ -316,13 +341,16 @@ class LocalOrDistributedWorkerBase(WorkerBase):
@@ -316,13 +355,16 @@ class LocalOrDistributedWorkerBase(WorkerBase):
return None
worker_input = WorkerInput.from_broadcasted_tensor_dict(broadcast_data)
......@@ -4109,7 +4648,7 @@ index 819b81fb..2891854b 100644
def _get_driver_input_and_broadcast(
self, execute_model_req: ExecuteModelRequest
@@ -396,49 +424,88 @@ class LocalOrDistributedWorkerBase(WorkerBase):
@@ -396,49 +438,88 @@ class LocalOrDistributedWorkerBase(WorkerBase):
self.execute_worker(worker_input)
# If there is no input, we don't need to execute the model.
......@@ -4161,7 +4700,12 @@ index 819b81fb..2891854b 100644
- and self.observability_config.collect_model_execute_time):
- orig_model_execute_time = intermediate_tensors.tensors.get(
- "model_execute_time", torch.tensor(0)).item()
-
+ and self.observability_config.collect_model_execute_time
+ and output is not None):
+ for o in output:
+ o.model_execute_time = (orig_model_execute_time +
+ model_execute_time)
- output = self.model_runner.execute_model(
- model_input=model_input,
- kv_caches=self.kv_cache[worker_input.virtual_engine]
......@@ -4170,12 +4714,7 @@ index 819b81fb..2891854b 100644
- num_steps=num_steps,
- **kwargs,
- )
+ and self.observability_config.collect_model_execute_time
+ and output is not None):
+ for o in output:
+ o.model_execute_time = (orig_model_execute_time +
+ model_execute_time)
-
- model_execute_time = time.perf_counter() - start_time
- if not get_pp_group().is_last_rank:
- # output is IntermediateTensors
......
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment