Unverified Commit 5bb74904 authored by Alec's avatar Alec Committed by GitHub
Browse files

chore: bump vllm version to 0.10.2 (#3180)


Signed-off-by: Alec <aflowers@nvidia.com>
Signed-off-by: Alec <35311602+alec-flowers@users.noreply.github.com>
Signed-off-by: krishung5 <krish@nvidia.com>
Co-authored-by: Kris Hung <krish@nvidia.com>
parent f2e2e935
...@@ -12,23 +12,22 @@ ARG RELEASE_BUILD ...@@ -12,23 +12,22 @@ ARG RELEASE_BUILD
ARG ENABLE_KVBM=false ARG ENABLE_KVBM=false
ARG RUNTIME_IMAGE="nvcr.io/nvidia/cuda" ARG RUNTIME_IMAGE="nvcr.io/nvidia/cuda"
ARG RUNTIME_IMAGE_TAG="12.8.1-runtime-ubuntu24.04" ARG RUNTIME_IMAGE_TAG="12.8.1-runtime-ubuntu24.04"
ARG CUDA_VERSION="12.8"
# Make sure to update the dependency version in pyproject.toml when updating this # Make sure to update the dependency version in pyproject.toml when updating this
ARG VLLM_REF="1da94e673c257373280026f75ceb4effac80e892" # from v0.10.1.1 ARG VLLM_REF="v0.10.2"
# FlashInfer only respected when building vLLM from source, ie when VLLM_REF does not start with 'v' or for arm64 builds
ARG FLASHINF_REF="v0.3.0"
ARG TORCH_BACKEND="cu128" ARG TORCH_BACKEND="cu128"
# If left blank, then we will fallback to vLLM defaults
ARG DEEPGEMM_REF=""
# sccache configuration - inherit from base build # sccache configuration - inherit from base build
ARG USE_SCCACHE ARG USE_SCCACHE
ARG SCCACHE_BUCKET="" ARG SCCACHE_BUCKET=""
ARG SCCACHE_REGION="" ARG SCCACHE_REGION=""
# Match 0.10.1.1 vLLM release
# https://github.com/vllm-project/vllm/releases/tag/v0.10.1.1
# Pinned to commit before https://github.com/deepseek-ai/DeepGEMM/pull/112 for DeepGEMM which seems to break on H100:
# "RuntimeError: Failed: CUDA runtime error csrc/jit/kernel_runtime.hpp:108 '98'"
ARG DEEPGEMM_REF="f85ec64"
ARG FLASHINF_REF="v0.2.11"
# Define general architecture ARGs for supporting both x86 and aarch64 builds. # Define general architecture ARGs for supporting both x86 and aarch64 builds.
# ARCH: Used for package suffixes (e.g., amd64, arm64) # ARCH: Used for package suffixes (e.g., amd64, arm64)
# ARCH_ALT: Used for Rust targets, manylinux suffix (e.g., x86_64, aarch64) # ARCH_ALT: Used for Rust targets, manylinux suffix (e.g., x86_64, aarch64)
...@@ -108,6 +107,7 @@ ARG VLLM_GIT_URL ...@@ -108,6 +107,7 @@ ARG VLLM_GIT_URL
ARG DEEPGEMM_REF ARG DEEPGEMM_REF
ARG FLASHINF_REF ARG FLASHINF_REF
ARG TORCH_BACKEND ARG TORCH_BACKEND
ARG CUDA_VERSION
ARG MAX_JOBS=16 ARG MAX_JOBS=16
ENV MAX_JOBS=$MAX_JOBS ENV MAX_JOBS=$MAX_JOBS
...@@ -138,18 +138,15 @@ RUN --mount=type=bind,source=./container/deps/,target=/tmp/deps \ ...@@ -138,18 +138,15 @@ RUN --mount=type=bind,source=./container/deps/,target=/tmp/deps \
--mount=type=cache,target=/root/.cache/uv \ --mount=type=cache,target=/root/.cache/uv \
--mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \ --mount=type=secret,id=aws-key-id,env=AWS_ACCESS_KEY_ID \
--mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \ --mount=type=secret,id=aws-secret-id,env=AWS_SECRET_ACCESS_KEY \
# TODO - split vllm, DeepEP, DeepGeMM, PPLX installs
# Should be able to select how you want your build to go
cp /tmp/deps/vllm/install_vllm.sh /tmp/install_vllm.sh && \ cp /tmp/deps/vllm/install_vllm.sh /tmp/install_vllm.sh && \
chmod +x /tmp/install_vllm.sh && \ chmod +x /tmp/install_vllm.sh && \
/tmp/install_vllm.sh --editable --vllm-ref $VLLM_REF --max-jobs $MAX_JOBS --arch $ARCH --installation-dir /opt --deepgemm-ref $DEEPGEMM_REF --flashinf-ref $FLASHINF_REF --torch-backend $TORCH_BACKEND && \ /tmp/install_vllm.sh --editable --vllm-ref $VLLM_REF --max-jobs $MAX_JOBS --arch $ARCH --installation-dir /opt ${DEEPGEMM_REF:+--deepgemm-ref "$DEEPGEMM_REF"} ${FLASHINF_REF:+--flashinf-ref "$FLASHINF_REF"} --torch-backend $TORCH_BACKEND --cuda-version $CUDA_VERSION && \
/tmp/use-sccache.sh show-stats "vLLM"; /tmp/use-sccache.sh show-stats "vLLM";
ENV LD_LIBRARY_PATH=\ ENV LD_LIBRARY_PATH=\
/opt/vllm/tools/ep_kernels/ep_kernels_workspace/nvshmem_install/lib:\ /opt/vllm/tools/ep_kernels/ep_kernels_workspace/nvshmem_install/lib:\
$LD_LIBRARY_PATH $LD_LIBRARY_PATH
################################################## ##################################################
########## Runtime Image ######################## ########## Runtime Image ########################
################################################## ##################################################
......
#!/usr/bin/env bash #!/usr/bin/env bash
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License"); # This script is used to install vLLM and its dependencies
# you may not use this file except in compliance with the License. # If installing vLLM from a release tag, we will use pip to manage the install
# You may obtain a copy of the License at # Otherwise, we will use git to checkout the vLLM source code and build it from source.
# # The dependencies are installed in the following order:
# http://www.apache.org/licenses/LICENSE-2.0 # 1. vLLM
# # 2. LMCache
# Unless required by applicable law or agreed to in writing, software # 3. DeepGEMM
# distributed under the License is distributed on an "AS IS" BASIS, # 4. EP kernels
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Install vllm and wideEP kernels from a specific git reference
set -euo pipefail set -euo pipefail
# Parse arguments VLLM_REF="v0.10.2"
EDITABLE=true
# REMOVE nvshmem cherry-pick when moving to next version of vllm # Basic Configurations
VLLM_REF="1da94e673c257373280026f75ceb4effac80e892" # from v0.10.1.1 ARCH=$(uname -m)
# When updating above VLLM_REF make sure precompiled wheel file URL is correct. Run this command:
# aws s3 ls s3://vllm-wheels/${VLLM_REF}/ --region us-west-2 --no-sign-request
VLLM_PRECOMPILED_WHEEL_LOCATION="https://vllm-wheels.s3.us-west-2.amazonaws.com/${VLLM_REF}/vllm-0.10.1.1-cp38-abi3-manylinux1_x86_64.whl"
VLLM_GIT_URL="https://github.com/vllm-project/vllm.git"
MAX_JOBS=16 MAX_JOBS=16
INSTALLATION_DIR=/tmp INSTALLATION_DIR=/tmp
ARCH=$(uname -m)
DEEPGEMM_REF="f85ec64" # VLLM and Dependency Configurations
FLASHINF_REF="v0.2.11"
TORCH_BACKEND="cu128" TORCH_BACKEND="cu128"
TORCH_CUDA_ARCH_LIST="9.0;10.0" # For EP Kernels
DEEPGEMM_REF=""
CUDA_VERSION="12.8" # For DEEPGEMM
# Convert x86_64 to amd64 for consistency with Docker ARG # These flags are applicable when installing vLLM from source code
if [ "$ARCH" = "x86_64" ]; then EDITABLE=true
ARCH="amd64" VLLM_GIT_URL="https://github.com/vllm-project/vllm.git"
elif [ "$ARCH" = "aarch64" ]; then FLASHINF_REF="v0.3.0"
ARCH="arm64"
fi
while [[ $# -gt 0 ]]; do while [[ $# -gt 0 ]]; do
case $1 in case $1 in
...@@ -82,8 +73,16 @@ while [[ $# -gt 0 ]]; do ...@@ -82,8 +73,16 @@ while [[ $# -gt 0 ]]; do
TORCH_BACKEND="$2" TORCH_BACKEND="$2"
shift 2 shift 2
;; ;;
--torch-cuda-arch-list)
TORCH_CUDA_ARCH_LIST="$2"
shift 2
;;
--cuda-version)
CUDA_VERSION="$2"
shift 2
;;
-h|--help) -h|--help)
echo "Usage: $0 [--editable|--no-editable] [--vllm-ref REF] [--max-jobs NUM] [--arch ARCH] [--deepgemm-ref REF] [--flashinf-ref REF] [--torch-backend BACKEND]" echo "Usage: $0 [--editable|--no-editable] [--vllm-ref REF] [--max-jobs NUM] [--arch ARCH] [--deepgemm-ref REF] [--flashinf-ref REF] [--torch-backend BACKEND] [--torch-cuda-arch-list LIST] [--cuda-version VERSION]"
echo "Options:" echo "Options:"
echo " --editable Install vllm in editable mode (default)" echo " --editable Install vllm in editable mode (default)"
echo " --no-editable Install vllm in non-editable mode" echo " --no-editable Install vllm in non-editable mode"
...@@ -94,6 +93,8 @@ while [[ $# -gt 0 ]]; do ...@@ -94,6 +93,8 @@ while [[ $# -gt 0 ]]; do
echo " --deepgemm-ref REF Git reference for DeepGEMM (default: ${DEEPGEMM_REF})" echo " --deepgemm-ref REF Git reference for DeepGEMM (default: ${DEEPGEMM_REF})"
echo " --flashinf-ref REF Git reference for Flash Infer (default: ${FLASHINF_REF})" echo " --flashinf-ref REF Git reference for Flash Infer (default: ${FLASHINF_REF})"
echo " --torch-backend BACKEND Torch backend to use (default: ${TORCH_BACKEND})" echo " --torch-backend BACKEND Torch backend to use (default: ${TORCH_BACKEND})"
echo " --torch-cuda-arch-list LIST CUDA architectures to compile for (default: ${TORCH_CUDA_ARCH_LIST})"
echo " --cuda-version VERSION CUDA version to use (default: ${CUDA_VERSION})"
exit 0 exit 0
;; ;;
*) *)
...@@ -103,105 +104,143 @@ while [[ $# -gt 0 ]]; do ...@@ -103,105 +104,143 @@ while [[ $# -gt 0 ]]; do
esac esac
done done
# Convert x86_64 to amd64 for consistency with Docker ARG
if [ "$ARCH" = "x86_64" ]; then
ARCH="amd64"
elif [ "$ARCH" = "aarch64" ]; then
ARCH="arm64"
fi
export MAX_JOBS=$MAX_JOBS export MAX_JOBS=$MAX_JOBS
export CUDA_HOME=/usr/local/cuda export CUDA_HOME=/usr/local/cuda
echo "Installing vllm with the following configuration:" echo "=== Installing prerequisites ==="
echo " EDITABLE: $EDITABLE"
echo " VLLM_REF: $VLLM_REF"
echo " MAX_JOBS: $MAX_JOBS"
echo " ARCH: $ARCH"
echo " TORCH_BACKEND: $TORCH_BACKEND"
# Install common dependencies
uv pip install pip cuda-python uv pip install pip cuda-python
if [ "$ARCH" = "amd64" ]; then echo "\n=== Configuration Summary ==="
# LMCache installation currently fails on arm64 due to CUDA dependency issues: echo " VLLM_REF=$VLLM_REF | EDITABLE=$EDITABLE | ARCH=$ARCH"
# OSError: CUDA_HOME environment variable is not set. Please set it to your CUDA install root. echo " MAX_JOBS=$MAX_JOBS | TORCH_BACKEND=$TORCH_BACKEND | CUDA_VERSION=$CUDA_VERSION"
# TODO: Re-enable for arm64 after verifying lmcache compatibility and resolving the build issue. echo " TORCH_CUDA_ARCH_LIST=$TORCH_CUDA_ARCH_LIST"
uv pip install lmcache==0.3.3 echo " DEEPGEMM_REF=$DEEPGEMM_REF | FLASHINF_REF=$FLASHINF_REF"
fi echo " INSTALLATION_DIR=$INSTALLATION_DIR | VLLM_GIT_URL=$VLLM_GIT_URL"
# Create vllm directory and clone echo "\n=== Cloning vLLM repository ==="
mkdir -p $INSTALLATION_DIR # We need to clone to install dependencies
cd $INSTALLATION_DIR cd $INSTALLATION_DIR
git clone $VLLM_GIT_URL vllm git clone $VLLM_GIT_URL vllm
cd vllm cd vllm
git checkout $VLLM_REF git checkout $VLLM_REF
# nvshmem fix - cherry-pick commit pinning pplx version
# https://github.com/ai-dynamo/dynamo/actions/runs/17907241473/job/50910654042?pr=2969#step:8:280
# remove when moving to next version of vllm
# Configure git user for cherry-pick operation
GIT_COMMITTER_NAME="Container Build" GIT_COMMITTER_EMAIL="container@buildkitsandbox.local" git cherry-pick 906e461ed6ddccd3cc7b68fa72048d2d3fcbd72c
if [ "$ARCH" = "arm64" ]; then # TODO remove in future vLLM release, re-instate ignore torch script
echo "Installing vllm for ARM64 architecture" # https://github.com/vllm-project/vllm/pull/24729
GIT_COMMITTER_NAME="Container Build" GIT_COMMITTER_EMAIL="container@buildkitsandbox.local" git cherry-pick 740f064
echo "\n=== Installing vLLM & FlashInfer ==="
if [[ $VLLM_REF =~ ^v ]] && [ "$ARCH" = "amd64" ]; then
# VLLM_REF starts with 'v' and amd64 - use pip install with version tag
echo "Installing vLLM $VLLM_REF from PyPI..."
# Try to install specific PyTorch version first, fallback to latest nightly uv pip install vllm[flashinfer]==$VLLM_REF --torch-backend=$TORCH_BACKEND
else
# VLLM_REF does not start with 'v', or the target is not amd64 - use the git checkout path
if [ "$ARCH" = "arm64" ]; then
# torch 2.8.0 doesn't have an aarch64 wheel for cu128; vLLM compiles its aarch64 wheel against torch 2.8.0 nightly builds.
# Nightly builds can be unstable, so we do not use them here.
# For now we use torch 2.7.1+cu128, which requires recompiling vLLM from source.
echo "Building vLLM from source for ARM64 architecture..."
# Try to install specific PyTorch version first
echo "Attempting to install pinned PyTorch nightly versions..." echo "Attempting to install pinned PyTorch nightly versions..."
if ! uv pip install torch==2.7.1+cu128 torchaudio==2.7.1 torchvision==0.22.1 --index-url https://download.pytorch.org/whl; then if ! uv pip install torch==2.7.1+cu128 torchaudio==2.7.1 torchvision==0.22.1 --index-url https://download.pytorch.org/whl/cu128; then
echo "Pinned versions failed" echo "Pinned versions failed"
exit 1 exit 1
# uv pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu128
fi fi
# Create constraints file to pin all PyTorch-related versions
echo "Creating constraints file to preserve PyTorch ecosystem versions..."
TORCH_VERSION=$(python -c "import torch; print(torch.__version__)")
TORCHAUDIO_VERSION=$(python -c "import torchaudio; print(torchaudio.__version__)")
TORCHVISION_VERSION=$(python -c "import torchvision; print(torchvision.__version__)")
rm -rf /tmp/torch_constraints.txt
echo "torch==$TORCH_VERSION" > /tmp/torch_constraints.txt
echo "torchaudio==$TORCHAUDIO_VERSION" >> /tmp/torch_constraints.txt
echo "torchvision==$TORCHVISION_VERSION" >> /tmp/torch_constraints.txt
echo "Pinned versions:"
echo " - torch==$TORCH_VERSION"
echo " - torchaudio==$TORCHAUDIO_VERSION"
echo " - torchvision==$TORCHVISION_VERSION"
python use_existing_torch.py python use_existing_torch.py
uv pip install -r requirements/build.txt uv pip install -c /tmp/torch_constraints.txt -r requirements/build.txt
if [ "$EDITABLE" = "true" ]; then if [ "$EDITABLE" = "true" ]; then
MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation -e . -v MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation -c /tmp/torch_constraints.txt -e . -v
else else
MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation . -v MAX_JOBS=${MAX_JOBS} uv pip install --no-build-isolation -c /tmp/torch_constraints.txt . -v
fi fi
else
echo "Installing vllm for AMD64 architecture"
echo "Attempting to install pinned OpenAI version..." echo "\n=== Installing FlashInfer from source ==="
if ! uv pip install openai==1.99.9; then cd $INSTALLATION_DIR
echo "Pinned versions failed" git clone https://github.com/flashinfer-ai/flashinfer.git --recursive
exit 1 cd flashinfer
fi git checkout $FLASHINF_REF
export VLLM_PRECOMPILED_WHEEL_LOCATION="${VLLM_PRECOMPILED_WHEEL_LOCATION}" # Install with constraints to prevent PyTorch upgrade
uv pip install -v --no-build-isolation -c /tmp/torch_constraints.txt .
else
echo "Building vLLM from source for AMD64 architecture..."
# When updating above VLLM_REF make sure precompiled wheel file URL is correct. Run this command:
# aws s3 ls s3://vllm-wheels/${VLLM_REF}/ --region us-west-2 --no-sign-request
export VLLM_PRECOMPILED_WHEEL_LOCATION="https://vllm-wheels.s3.us-west-2.amazonaws.com/${VLLM_REF}/vllm-0.10.2-cp38-abi3-manylinux1_x86_64.whl"
if [ "$EDITABLE" = "true" ]; then if [ "$EDITABLE" = "true" ]; then
uv pip install -e . --torch-backend=$TORCH_BACKEND uv pip install -e . --torch-backend=$TORCH_BACKEND
else else
uv pip install . --torch-backend=$TORCH_BACKEND uv pip install . --torch-backend=$TORCH_BACKEND
fi fi
fi
# Install ep_kernels and DeepGEMM echo "\n=== Installing FlashInfer from PyPI ==="
echo "Installing ep_kernels and DeepGEMM" uv pip install flashinfer-python==$FLASHINF_REF
cd tools/ep_kernels
TORCH_CUDA_ARCH_LIST="9.0;10.0" bash install_python_libraries.sh # These libraries aren't pinned.
cd ep_kernels_workspace
git clone https://github.com/deepseek-ai/DeepGEMM.git
cd DeepGEMM
git checkout $DEEPGEMM_REF # Pin Version
sed -i 's|git@github.com:|https://github.com/|g' .gitmodules fi
git submodule sync --recursive fi
git submodule update --init --recursive
# command for 03d0be3 echo "✓ vLLM installation completed"
python setup.py install
# new install command for post 03d0be3 echo "\n=== Installing LMCache ==="
# cat install.sh if [ "$ARCH" = "amd64" ]; then
# ./install.sh # LMCache installation currently fails on arm64 due to CUDA dependency issues:
# OSError: CUDA_HOME environment variable is not set. Please set it to your CUDA install root.
# TODO: Re-enable for arm64 after verifying lmcache compatibility and resolving the build issue.
# Alec: Likely lmcache was compiled with a different version of torch and needs to be installed from source for arm64
uv pip install lmcache==0.3.3
echo "✓ LMCache installed"
else
echo "⚠ Skipping LMCache on ARM64 (compatibility issues)"
fi
# Install Flash Infer echo "\n=== Installing DeepGEMM ==="
if [ "$ARCH" = "arm64" ]; then cd $INSTALLATION_DIR/vllm/tools
uv pip install flashinfer-python
if [ -n "$DEEPGEMM_REF" ]; then
bash install_deepgemm.sh --cuda-version "${CUDA_VERSION}" --ref "$DEEPGEMM_REF"
else else
cd $INSTALLATION_DIR bash install_deepgemm.sh --cuda-version "${CUDA_VERSION}"
git clone https://github.com/flashinfer-ai/flashinfer.git --recursive
cd flashinfer
git checkout $FLASHINF_REF
uv pip install -v --no-build-isolation .
fi fi
echo "✓ DeepGEMM installation completed"
# Announce the EP kernels stage. printf is used instead of echo because bash's
# builtin echo prints "\n" literally unless -e is given, so the intended blank
# separator line would otherwise show up as the two characters '\' and 'n'.
printf '\n=== Installing EP Kernels (PPLX and DeepEP) ===\n'
# ep_kernels/ is resolved relative to the current directory, which the DeepGEMM
# step above set to $INSTALLATION_DIR/vllm/tools.
cd ep_kernels/
# Pass the selected CUDA architectures to the kernel build for this invocation only.
TORCH_CUDA_ARCH_LIST="$TORCH_CUDA_ARCH_LIST" bash install_python_libraries.sh
echo "vllm installation completed successfully" echo "\n✅ All installations completed successfully!"
\ No newline at end of file
...@@ -162,7 +162,6 @@ class ChatProcessor: ...@@ -162,7 +162,6 @@ class ChatProcessor:
documents=request.documents, documents=request.documents,
chat_template_kwargs=request.chat_template_kwargs, chat_template_kwargs=request.chat_template_kwargs,
tool_parser=self.openai_serving.tool_parser, tool_parser=self.openai_serving.tool_parser,
truncate_prompt_tokens=request.truncate_prompt_tokens,
add_special_tokens=request.add_special_tokens, add_special_tokens=request.add_special_tokens,
) )
...@@ -288,7 +287,6 @@ class CompletionsProcessor: ...@@ -288,7 +287,6 @@ class CompletionsProcessor:
request, request,
self.tokenizer, self.tokenizer,
input_or_inputs=request.prompt, input_or_inputs=request.prompt,
truncate_prompt_tokens=request.truncate_prompt_tokens,
add_special_tokens=request.add_special_tokens, add_special_tokens=request.add_special_tokens,
) )
......
...@@ -22,6 +22,7 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator ...@@ -22,6 +22,7 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic_core import core_schema from pydantic_core import core_schema
from typing_extensions import NotRequired from typing_extensions import NotRequired
from vllm.inputs.data import TokensPrompt from vllm.inputs.data import TokensPrompt
from vllm.multimodal.inputs import MultiModalUUIDDict # noqa: F401
from vllm.outputs import CompletionOutput from vllm.outputs import CompletionOutput
from vllm.sampling_params import SamplingParams from vllm.sampling_params import SamplingParams
from vllm.sequence import PromptLogprobs, RequestMetrics from vllm.sequence import PromptLogprobs, RequestMetrics
......
...@@ -272,6 +272,7 @@ impl KvEventPublisher { ...@@ -272,6 +272,7 @@ impl KvEventPublisher {
lora_id: u64, lora_id: u64,
parent_hash: Option<i64>, parent_hash: Option<i64>,
) -> PyResult<()> { ) -> PyResult<()> {
let block_hashes_u64: Vec<u64> = block_hashes.iter().map(|&h| h as u64).collect();
let event = KvCacheEvent { let event = KvCacheEvent {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
...@@ -280,7 +281,7 @@ impl KvEventPublisher { ...@@ -280,7 +281,7 @@ impl KvEventPublisher {
self.kv_block_size as u32, self.kv_block_size as u32,
&token_ids, &token_ids,
&num_block_tokens, &num_block_tokens,
&block_hashes, &block_hashes_u64,
lora_id, lora_id,
&self.warning_count, &self.warning_count,
), ),
...@@ -292,8 +293,8 @@ impl KvEventPublisher { ...@@ -292,8 +293,8 @@ impl KvEventPublisher {
fn publish_removed(&self, _py: Python, event_id: u64, block_hashes: Vec<i64>) -> PyResult<()> { fn publish_removed(&self, _py: Python, event_id: u64, block_hashes: Vec<i64>) -> PyResult<()> {
let block_hashes: Vec<ExternalSequenceBlockHash> = block_hashes let block_hashes: Vec<ExternalSequenceBlockHash> = block_hashes
.iter() .into_iter()
.map(|&h| ExternalSequenceBlockHash::from(h)) .map(ExternalSequenceBlockHash::from)
.collect(); .collect();
let event = KvCacheEvent { let event = KvCacheEvent {
event_id, event_id,
......
...@@ -277,10 +277,16 @@ impl RadixTree { ...@@ -277,10 +277,16 @@ impl RadixTree {
let mut scores = OverlapScores::new(); let mut scores = OverlapScores::new();
let mut current = self.root.clone(); let mut current = self.root.clone();
let now = Instant::now(); let now = Instant::now();
for block_hash in sequence {
tracing::trace!(
"RadixTree::find_matches: looking for sequence={:?}",
sequence.iter().map(|h| h.0).collect::<Vec<_>>()
);
for (idx, block_hash) in sequence.iter().enumerate() {
let next_block = { let next_block = {
let current_borrow = current.borrow(); let current_borrow = current.borrow();
current_borrow.children.get(&block_hash).cloned() current_borrow.children.get(block_hash).cloned()
}; };
if let Some(block) = next_block { if let Some(block) = next_block {
scores.update_scores(&block.borrow().workers); scores.update_scores(&block.borrow().workers);
...@@ -305,10 +311,17 @@ impl RadixTree { ...@@ -305,10 +311,17 @@ impl RadixTree {
current = block; current = block;
} else { } else {
tracing::trace!(
"RadixTree::find_matches: block not found at index {} for hash {}",
idx,
block_hash.0
);
break; break;
} }
} }
tracing::trace!("RadixTree::find_matches: final scores={:?}", scores.scores);
scores scores
} }
...@@ -320,7 +333,7 @@ impl RadixTree { ...@@ -320,7 +333,7 @@ impl RadixTree {
pub fn apply_event(&mut self, event: RouterEvent) -> Result<(), KvCacheEventError> { pub fn apply_event(&mut self, event: RouterEvent) -> Result<(), KvCacheEventError> {
let (worker_id, event) = (event.worker_id, event.event); let (worker_id, event) = (event.worker_id, event.event);
let (id, op) = (event.event_id, event.data); let (id, op) = (event.event_id, event.data);
tracing::trace!(id, "Store operation: {:?}", op); tracing::trace!(id, "RadixTree::apply_event: Store operation: {:?}", op);
let worker_lookup = self.lookup.entry(worker_id).or_default(); let worker_lookup = self.lookup.entry(worker_id).or_default();
......
...@@ -28,6 +28,8 @@ use tokio_util::sync::CancellationToken; ...@@ -28,6 +28,8 @@ use tokio_util::sync::CancellationToken;
use rmp_serde as rmps; use rmp_serde as rmps;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use serde::de::{self, Deserializer, IgnoredAny, MapAccess, SeqAccess, Visitor};
use std::fmt;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration; use std::time::Duration;
use zeromq::{Socket, SocketRecv, SubSocket}; use zeromq::{Socket, SocketRecv, SubSocket};
...@@ -363,26 +365,34 @@ fn convert_event( ...@@ -363,26 +365,34 @@ fn convert_event(
token_ids, token_ids,
block_size, block_size,
lora_id, lora_id,
..
} => { } => {
let num_block_tokens = vec![block_size as u64; block_hashes.len()]; let num_block_tokens = vec![block_size as u64; block_hashes.len()];
let block_hashes_u64: Vec<u64> = block_hashes
.into_iter()
.map(BlockHashValue::into_u64)
.collect();
KvCacheEvent { KvCacheEvent {
event_id, event_id,
data: KvCacheEventData::Stored(KvCacheStoreData { data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: parent_block_hash.map(ExternalSequenceBlockHash::from), parent_hash: parent_block_hash
.map(BlockHashValue::into_u64)
.map(ExternalSequenceBlockHash::from),
blocks: create_stored_blocks( blocks: create_stored_blocks(
kv_block_size, kv_block_size,
&token_ids, &token_ids,
&num_block_tokens, &num_block_tokens,
&block_hashes, &block_hashes_u64,
lora_id.unwrap_or(0), lora_id.unwrap_or(0),
warning_count, warning_count,
), ),
}), }),
} }
} }
RawKvEvent::BlockRemoved { block_hashes } => { RawKvEvent::BlockRemoved { block_hashes, .. } => {
let hashes = block_hashes let hashes = block_hashes
.into_iter() .into_iter()
.map(BlockHashValue::into_u64)
.map(ExternalSequenceBlockHash::from) .map(ExternalSequenceBlockHash::from)
.collect(); .collect();
KvCacheEvent { KvCacheEvent {
...@@ -401,11 +411,18 @@ fn convert_event( ...@@ -401,11 +411,18 @@ fn convert_event(
pub fn create_stored_block_from_parts( pub fn create_stored_block_from_parts(
kv_block_size: u32, kv_block_size: u32,
block_hash: i64, block_hash: u64,
token_ids: &[u32], token_ids: &[u32],
_lora_id: u64, _lora_id: u64,
) -> KvCacheStoredBlockData { ) -> KvCacheStoredBlockData {
let tokens_hash = compute_block_hash_for_seq(token_ids, kv_block_size)[0]; let tokens_hash = compute_block_hash_for_seq(token_ids, kv_block_size)[0];
tracing::trace!(
"Creating stored block: external_block_hash={}, tokens_hash={}, token_ids={:?}, kv_block_size={}",
block_hash,
tokens_hash.0,
token_ids,
kv_block_size
);
KvCacheStoredBlockData { KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash::from(block_hash), block_hash: ExternalSequenceBlockHash::from(block_hash),
tokens_hash, tokens_hash,
...@@ -416,7 +433,7 @@ pub fn create_stored_blocks( ...@@ -416,7 +433,7 @@ pub fn create_stored_blocks(
kv_block_size: u32, kv_block_size: u32,
token_ids: &[u32], token_ids: &[u32],
num_block_tokens: &[u64], num_block_tokens: &[u64],
block_hashes: &[i64], block_hashes: &[u64],
lora_id: u64, lora_id: u64,
warning_count: &Arc<AtomicU32>, warning_count: &Arc<AtomicU32>,
) -> Vec<KvCacheStoredBlockData> { ) -> Vec<KvCacheStoredBlockData> {
...@@ -460,22 +477,206 @@ struct KvEventBatch { ...@@ -460,22 +477,206 @@ struct KvEventBatch {
data_parallel_rank: u32, // we are ignoring this for now data_parallel_rank: u32, // we are ignoring this for now
} }
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[serde(untagged)]
/// A block hash as it appears on the wire: either a signed or an unsigned
/// 64-bit integer. Use [`BlockHashValue::into_u64`] to obtain the normalized form.
enum BlockHashValue {
    Signed(i64),
    Unsigned(u64),
}

impl BlockHashValue {
    /// Consumes the value and returns it as a `u64`.
    ///
    /// Unsigned values pass through unchanged. Signed values are reinterpreted
    /// bit-for-bit, so `-1` becomes `u64::MAX`.
    fn into_u64(self) -> u64 {
        match self {
            Self::Unsigned(raw) => raw,
            // Deliberate two's-complement reinterpretation, not a range-checked conversion.
            Self::Signed(raw) => raw as u64,
        }
    }
}
#[derive(Debug, Serialize)]
#[serde(tag = "type")] // msgspec encodes variant tag as a string when `tag=True` #[serde(tag = "type")] // msgspec encodes variant tag as a string when `tag=True`
enum RawKvEvent { enum RawKvEvent {
BlockStored { BlockStored {
block_hashes: Vec<i64>, /// Block hashes may be emitted as either signed or unsigned 64-bit values.
parent_block_hash: Option<i64>, /// We normalize them to `u64` while deserializing to support both producers.
block_hashes: Vec<BlockHashValue>,
parent_block_hash: Option<BlockHashValue>,
token_ids: Vec<u32>, token_ids: Vec<u32>,
block_size: usize, block_size: usize,
lora_id: Option<u64>, lora_id: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
medium: Option<String>,
}, },
BlockRemoved { BlockRemoved {
block_hashes: Vec<i64>, block_hashes: Vec<BlockHashValue>,
#[serde(skip_serializing_if = "Option::is_none")]
medium: Option<String>,
}, },
AllBlocksCleared, AllBlocksCleared,
} }
/// Our producers use msgspec with `tag=True` and `array_like=True`, which
/// encodes each event as either a tagged map or a tagged tuple. To be tolerant of
/// additional fields that may be appended in the future, we implement a custom
/// deserializer that ignores unknown keys and any extra positional elements.
///
/// This keeps us compatible with older payloads while safely
/// accepting newer ones that include extra metadata.
impl<'de> Deserialize<'de> for RawKvEvent {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(RawKvEventVisitor)
}
}
struct RawKvEventVisitor;
impl<'de> Visitor<'de> for RawKvEventVisitor {
type Value = RawKvEvent;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a kv event encoded as a tagged map or sequence")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut event_type: Option<String> = None;
let mut block_hashes: Option<Vec<BlockHashValue>> = None;
let mut parent_block_hash: Option<Option<BlockHashValue>> = None;
let mut token_ids: Option<Vec<u32>> = None;
let mut block_size: Option<usize> = None;
let mut lora_id: Option<Option<u64>> = None;
let mut medium: Option<Option<String>> = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"type" => {
event_type = Some(map.next_value()?);
}
"block_hashes" => {
block_hashes = Some(map.next_value()?);
}
"parent_block_hash" => {
parent_block_hash = Some(map.next_value()?);
}
"token_ids" => {
token_ids = Some(map.next_value()?);
}
"block_size" => {
block_size = Some(map.next_value()?);
}
"lora_id" => {
lora_id = Some(map.next_value()?);
}
"medium" => {
medium = Some(map.next_value()?);
}
_ => {
map.next_value::<IgnoredAny>()?;
}
}
}
match event_type.as_deref() {
Some("BlockStored") => {
let block_hashes =
block_hashes.ok_or_else(|| de::Error::missing_field("block_hashes"))?;
let token_ids = token_ids.ok_or_else(|| de::Error::missing_field("token_ids"))?;
let block_size =
block_size.ok_or_else(|| de::Error::missing_field("block_size"))?;
Ok(RawKvEvent::BlockStored {
block_hashes,
parent_block_hash: parent_block_hash.unwrap_or(None),
token_ids,
block_size,
lora_id: lora_id.unwrap_or(None),
medium: medium.unwrap_or(None),
})
}
Some("BlockRemoved") => {
let block_hashes =
block_hashes.ok_or_else(|| de::Error::missing_field("block_hashes"))?;
Ok(RawKvEvent::BlockRemoved {
block_hashes,
medium: medium.unwrap_or(None),
})
}
Some("AllBlocksCleared") => Ok(RawKvEvent::AllBlocksCleared),
Some(other) => Err(de::Error::unknown_variant(
other,
&["BlockStored", "BlockRemoved", "AllBlocksCleared"],
)),
None => Err(de::Error::missing_field("type")),
}
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let tag: Option<String> = seq.next_element()?;
let Some(tag) = tag else {
return Err(de::Error::invalid_length(
0,
&"sequence must start with event tag",
));
};
match tag.as_str() {
"BlockStored" => {
let block_hashes: Vec<BlockHashValue> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(1, &"missing block_hashes"))?;
let parent_block_hash: Option<BlockHashValue> = seq.next_element()?.unwrap_or(None);
let token_ids: Vec<u32> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(3, &"missing token_ids"))?;
let block_size: usize = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(4, &"missing block_size"))?;
let lora_id: Option<u64> = seq.next_element()?.unwrap_or(None);
let medium: Option<String> = seq.next_element()?.unwrap_or(None);
while seq.next_element::<IgnoredAny>()?.is_some() {}
Ok(RawKvEvent::BlockStored {
block_hashes,
parent_block_hash,
token_ids,
block_size,
lora_id,
medium,
})
}
"BlockRemoved" => {
let block_hashes: Vec<BlockHashValue> = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(1, &"missing block_hashes"))?;
let medium: Option<String> = seq.next_element()?.unwrap_or(None);
while seq.next_element::<IgnoredAny>()?.is_some() {}
Ok(RawKvEvent::BlockRemoved {
block_hashes,
medium,
})
}
"AllBlocksCleared" => {
while seq.next_element::<IgnoredAny>()?.is_some() {}
Ok(RawKvEvent::AllBlocksCleared)
}
other => Err(de::Error::unknown_variant(
other,
&["BlockStored", "BlockRemoved", "AllBlocksCleared"],
)),
}
}
}
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Metrics Publishers ------------------------------------------------------ // Metrics Publishers ------------------------------------------------------
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
...@@ -745,7 +946,7 @@ mod test_event_processing { ...@@ -745,7 +946,7 @@ mod test_event_processing {
let stored = create_stored_block_from_parts(kv_block_size, blk_hash, &token_ids, 0); let stored = create_stored_block_from_parts(kv_block_size, blk_hash, &token_ids, 0);
assert_eq!(stored.block_hash.0, blk_hash as u64); assert_eq!(stored.block_hash.0, blk_hash);
let expected_hash = compute_block_hash_for_seq(&token_ids, 4)[0]; let expected_hash = compute_block_hash_for_seq(&token_ids, 4)[0];
assert_eq!(stored.tokens_hash, expected_hash); assert_eq!(stored.tokens_hash, expected_hash);
} }
...@@ -759,7 +960,7 @@ mod test_event_processing { ...@@ -759,7 +960,7 @@ mod test_event_processing {
// two blocks, each of size 4 // two blocks, each of size 4
let token_ids = vec![1, 2, 3, 4, 5, 6, 7, 8]; let token_ids = vec![1, 2, 3, 4, 5, 6, 7, 8];
let num_block_tokens = vec![4_u64, 4_u64]; let num_block_tokens = vec![4_u64, 4_u64];
let block_hashes = vec![111_i64, 222_i64]; let block_hashes = vec![111_u64, 222_u64];
let blocks = create_stored_blocks( let blocks = create_stored_blocks(
kv_block_size, kv_block_size,
...@@ -781,7 +982,7 @@ mod test_event_processing { ...@@ -781,7 +982,7 @@ mod test_event_processing {
// second block is the wrong size // second block is the wrong size
let token_ids = vec![1, 2, 3, 4, 5, 6, 7]; let token_ids = vec![1, 2, 3, 4, 5, 6, 7];
let num_block_tokens = vec![4_u64, 3_u64]; let num_block_tokens = vec![4_u64, 3_u64];
let block_hashes = vec![111_i64, 222_i64]; let block_hashes = vec![111_u64, 222_u64];
let warning_count = Arc::new(AtomicU32::new(0)); let warning_count = Arc::new(AtomicU32::new(0));
let blocks = create_stored_blocks( let blocks = create_stored_blocks(
...@@ -805,11 +1006,12 @@ mod test_event_processing { ...@@ -805,11 +1006,12 @@ mod test_event_processing {
fn test_convert_event_block_stored() { fn test_convert_event_block_stored() {
let kv_block_size = 4; let kv_block_size = 4;
let raw_evt = RawKvEvent::BlockStored { let raw_evt = RawKvEvent::BlockStored {
block_hashes: vec![10, 11], block_hashes: vec![BlockHashValue::Unsigned(10), BlockHashValue::Unsigned(11)],
parent_block_hash: Some(99), parent_block_hash: Some(BlockHashValue::Unsigned(99)),
token_ids: vec![1, 2, 3, 4, 5, 6, 7, 8], token_ids: vec![1, 2, 3, 4, 5, 6, 7, 8],
block_size: 4, block_size: 4,
lora_id: Some(0), lora_id: Some(0),
medium: None,
}; };
let out = convert_event(raw_evt, 42, kv_block_size, &Arc::new(AtomicU32::new(0))); let out = convert_event(raw_evt, 42, kv_block_size, &Arc::new(AtomicU32::new(0)));
...@@ -820,7 +1022,8 @@ mod test_event_processing { ...@@ -820,7 +1022,8 @@ mod test_event_processing {
fn test_convert_event_block_removed() { fn test_convert_event_block_removed() {
let kv_block_size = 4; let kv_block_size = 4;
let raw_evt = RawKvEvent::BlockRemoved { let raw_evt = RawKvEvent::BlockRemoved {
block_hashes: vec![123, 456], block_hashes: vec![BlockHashValue::Unsigned(123), BlockHashValue::Signed(456)],
medium: None,
}; };
let out = convert_event(raw_evt, 7, kv_block_size, &Arc::new(AtomicU32::new(0))); let out = convert_event(raw_evt, 7, kv_block_size, &Arc::new(AtomicU32::new(0)));
...@@ -965,11 +1168,12 @@ mod tests_startup_helpers { ...@@ -965,11 +1168,12 @@ mod tests_startup_helpers {
let seq: u64 = 77; let seq: u64 = 77;
let events = vec![RawKvEvent::BlockStored { let events = vec![RawKvEvent::BlockStored {
block_hashes: vec![42], block_hashes: vec![BlockHashValue::Unsigned(42)],
parent_block_hash: None, parent_block_hash: None,
token_ids: vec![0, 1, 2, 3], token_ids: vec![0, 1, 2, 3],
block_size: 4, block_size: 4,
lora_id: None, lora_id: None,
medium: None,
}]; }];
let batch = KvEventBatch { let batch = KvEventBatch {
......
...@@ -54,7 +54,7 @@ trtllm =[ ...@@ -54,7 +54,7 @@ trtllm =[
vllm = [ vllm = [
"uvloop", "uvloop",
"nixl<=0.4.1", "nixl<=0.4.1",
"vllm[flashinfer]==0.10.1.1", "vllm[flashinfer]==0.10.2",
] ]
sglang = [ sglang = [
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment