Unverified Commit 07cfc3a1 authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

feat: kvbm + connector (#2258)


Signed-off-by: default avatarRyan Olson <rolson@nvidia.com>
Co-authored-by: default avatarOlga Andreeva <oandreeva@nvidia.com>
Co-authored-by: default avatarZiqi Fan <ziqif@nvidia.com>
Co-authored-by: default avatarJohn Thompson <jothomson@nvidia.com>
Co-authored-by: default avatarRichard Huo <rihuo@nvidia.com>
Co-authored-by: default avatarZicheng Ma <zichengm@nvidia.com>
parent bf5862a1
This diff is collapsed.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
ARG BASE_IMAGE="nvcr.io/nvidia/cuda-dl-base"
# FIXME: NCCL will hang with 25.03, so use 25.01 for now
# Please check https://github.com/ai-dynamo/dynamo/pull/1065
# for details and reproducer to manually test if the image
# can be updated to later versions.
ARG BASE_IMAGE_TAG="25.01-cuda12.8-devel-ubuntu24.04"
ARG RELEASE_BUILD
ARG RUNTIME_IMAGE="nvcr.io/nvidia/cuda"
ARG RUNTIME_IMAGE_TAG="12.8.1-runtime-ubuntu24.04"
# Define general architecture ARGs for supporting both x86 and aarch64 builds.
# ARCH: Used for package suffixes (e.g., amd64, arm64)
# ARCH_ALT: Used for Rust targets, manylinux suffix (e.g., x86_64, aarch64)
#
# Default values are for x86/amd64:
# --build-arg ARCH=amd64 --build-arg ARCH_ALT=x86_64
#
# For arm64/aarch64, build with:
# --build-arg ARCH=arm64 --build-arg ARCH_ALT=aarch64
#
# NOTE: There isn't an easy way to define one of these values based on the other value
# without adding if statements everywhere, so just define both as ARGs for now.
ARG ARCH=amd64
ARG ARCH_ALT=x86_64
FROM ${BASE_IMAGE}:${BASE_IMAGE_TAG} AS nixl_base
# Redeclare ARCH and ARCH_ALT so they're available in this stage
ARG ARCH
ARG ARCH_ALT
WORKDIR /opt/nixl
# Add a cache hint that only changes when the nixl commit changes
ARG NIXL_COMMIT
# This line acts as a cache key - it only changes when NIXL_COMMIT changes
RUN echo "NIXL commit: ${NIXL_COMMIT}" > /opt/nixl/commit.txt
# Copy the nixl source
COPY --from=nixl . .
##################################
########## Base Image ############
##################################
FROM ${BASE_IMAGE}:${BASE_IMAGE_TAG} AS base
# Redeclare ARCH and ARCH_ALT so they're available in this stage
ARG ARCH
ARG ARCH_ALT
USER root
ARG PYTHON_VERSION=3.12
RUN apt-get update -y && \
apt-get install -y \
# NIXL build dependencies
cmake \
meson \
ninja-build \
pybind11-dev \
# Rust build dependencies
clang \
libclang-dev \
git \
# Install utilities
nvtop \
tmux \
vim \
autoconf \
libtool
# These headers are missing with the hpcx installer, required
# by UCX to find RDMA devices
RUN apt-get update -y && \
apt-get install -y --no-install-recommends \
--reinstall libibverbs-dev rdma-core ibverbs-utils libibumad-dev \
libnuma-dev librdmacm-dev ibverbs-providers
ARG NIXL_UCX_REF=v1.19.x
WORKDIR /workspace
### UCX EFA Setup ###
RUN rm -rf /opt/hpcx/ucx
RUN rm -rf /usr/local/ucx
RUN echo "Building UCX with reference $NIXL_UCX_REF"
RUN cd /usr/local/src && \
git clone https://github.com/openucx/ucx.git && \
cd ucx && \
git checkout $NIXL_UCX_REF && \
./autogen.sh && ./configure \
--prefix=/usr/local/ucx \
--enable-shared \
--disable-static \
--disable-doxygen-doc \
--enable-optimizations \
--enable-cma \
--enable-devel-headers \
--with-cuda=/usr/local/cuda \
--with-verbs \
--with-efa \
--with-dm \
--with-gdrcopy=/usr/local \
--enable-mt && \
make -j && \
make -j install-strip && \
ldconfig
ENV LD_LIBRARY_PATH=/usr/lib:/usr/local/ucx/lib:$LD_LIBRARY_PATH
ENV CPATH=/usr/include
ENV PATH=/usr/bin:$PATH
ENV PKG_CONFIG_PATH=/usr/lib/pkgconfig
SHELL ["/bin/bash", "-c"]
WORKDIR /workspace
### NIXL SETUP ###
# Copy nixl source, and use commit hash as cache hint
COPY --from=nixl_base /opt/nixl /opt/nixl
COPY --from=nixl_base /opt/nixl/commit.txt /opt/nixl/commit.txt
RUN cd /opt/nixl && \
mkdir build && \
meson setup build/ --buildtype=release --prefix=/usr/local/nixl && \
cd build/ && \
ninja && \
ninja install
### NATS & ETCD SETUP ###
# nats
RUN wget --tries=3 --waitretry=5 https://github.com/nats-io/nats-server/releases/download/v2.10.24/nats-server-v2.10.24-${ARCH}.deb && \
dpkg -i nats-server-v2.10.24-${ARCH}.deb && rm nats-server-v2.10.24-${ARCH}.deb
# etcd
ENV ETCD_VERSION="v3.5.18"
RUN wget --tries=3 --waitretry=5 https://github.com/etcd-io/etcd/releases/download/$ETCD_VERSION/etcd-$ETCD_VERSION-linux-${ARCH}.tar.gz -O /tmp/etcd.tar.gz && \
mkdir -p /usr/local/bin/etcd && \
tar -xvf /tmp/etcd.tar.gz -C /usr/local/bin/etcd --strip-components=1 && \
rm /tmp/etcd.tar.gz
ENV PATH=/usr/local/bin/etcd/:$PATH
### VIRTUAL ENVIRONMENT SETUP ###
# Install uv and create virtualenv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
RUN mkdir /opt/dynamo && \
uv venv /opt/dynamo/venv --python 3.12
# Activate virtual environment
ENV VIRTUAL_ENV=/opt/dynamo/venv
ENV PATH="${VIRTUAL_ENV}/bin:${PATH}"
# Install NIXL Python module
RUN cd /opt/nixl && uv build . --out-dir /workspace/wheels/nixl
# Install the wheel
# TODO: Move NIXL wheel install to the wheel_builder stage
RUN uv pip install /workspace/wheels/nixl/*.whl
# Install forked vLLM with KVBM integration
ARG VLLM_REF="dynamo/stage-1"
ENV CUDA_HOME=/usr/local/cuda
RUN --mount=type=bind,source=./container/deps/,target=/tmp/deps \
--mount=type=cache,target=/root/.cache/uv \
uv pip install pip cuda-python && \
mkdir /opt/vllm && \
cd /opt/vllm && \
git clone https://github.com/ryanolson/vllm.git && \
cd vllm && \
git checkout $VLLM_REF && \
VLLM_USE_PRECOMPILED=1 uv pip install -e .
# Common dependencies
RUN --mount=type=bind,source=./container/deps/requirements.txt,target=/tmp/requirements.txt \
uv pip install --requirement /tmp/requirements.txt
# Install test dependencies
RUN --mount=type=bind,source=./container/deps/requirements.test.txt,target=/tmp/requirements.txt \
uv pip install --requirement /tmp/requirements.txt
# ### MISC UTILITY SETUP ###
# Finish pyright install
RUN pyright --help > /dev/null 2>&1
# Enable Git operations in the /workspace directory
RUN printf "[safe]\n directory=/workspace\n" > /root/.gitconfig
RUN ln -sf /bin/bash /bin/sh
# Install prometheus
ARG PROM_VERSION=3.4.1
RUN apt-get update && apt-get install -y --no-install-recommends \
curl tar ca-certificates && \
rm -rf /var/lib/apt/lists/*
RUN ARCH=$(dpkg --print-architecture) && \
case "$ARCH" in \
amd64) PLATFORM=linux-amd64 ;; \
arm64) PLATFORM=linux-arm64 ;; \
*) echo "Unsupported architecture: $ARCH" && exit 1 ;; \
esac && \
curl -fsSL https://github.com/prometheus/prometheus/releases/download/v${PROM_VERSION}/prometheus-${PROM_VERSION}.${PLATFORM}.tar.gz \
| tar -xz -C /tmp && \
mv /tmp/prometheus-${PROM_VERSION}.${PLATFORM}/prometheus /usr/local/bin/ && \
chmod +x /usr/local/bin/prometheus && \
rm -rf /tmp/prometheus-${PROM_VERSION}.${PLATFORM}
### BUILDS ###
# Rust build/dev dependencies
RUN apt update -y && \
apt install --no-install-recommends -y \
build-essential \
protobuf-compiler \
cmake \
libssl-dev \
pkg-config
ENV RUSTUP_HOME=/usr/local/rustup \
CARGO_HOME=/usr/local/cargo \
PATH=/usr/local/cargo/bin:$PATH \
RUST_VERSION=1.87.0
# Define Rust target based on ARCH_ALT ARG
ARG RUSTARCH=${ARCH_ALT}-unknown-linux-gnu
# Install Rust using RUSTARCH derived from ARCH_ALT
RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1.28.1/${RUSTARCH}/rustup-init" && \
# TODO: Add SHA check back based on RUSTARCH
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile default --default-toolchain $RUST_VERSION --default-host ${RUSTARCH} && \
rm rustup-init && \
chmod -R a+w $RUSTUP_HOME $CARGO_HOME
ARG CARGO_BUILD_JOBS
# Set CARGO_BUILD_JOBS to 16 if not provided
# This is to prevent cargo from building $(nproc) jobs in parallel,
# which might exceed the number of opened files limit.
ENV CARGO_BUILD_JOBS=${CARGO_BUILD_JOBS:-16}
#######################################
########## Local Development ##########
#######################################
FROM base AS local-dev
# https://code.visualstudio.com/remote/advancedcontainers/add-nonroot-user
# Will use the default ubuntu user, but give sudo access
# Needed so files permissions aren't set to root ownership when writing from inside container
# Don't want ubuntu to be editable, just change uid and gid. User ubuntu is hardcoded in .devcontainer
ENV USERNAME=ubuntu
ARG USER_UID=1000
ARG USER_GID=1000
RUN apt-get update && apt-get install -y sudo gnupg2 gnupg1 \
&& echo "$USERNAME ALL=(root) NOPASSWD:ALL" > /etc/sudoers.d/$USERNAME \
&& chmod 0440 /etc/sudoers.d/$USERNAME \
&& mkdir -p /home/$USERNAME \
&& chown -R $USERNAME:$USERNAME /home/$USERNAME \
&& rm -rf /var/lib/apt/lists/* \
&& chsh -s /bin/bash $USERNAME
# This is a slow operation (~40s on my cpu)
# Much better than chown -R $USERNAME:$USERNAME /opt/dynamo/venv (~10min on my cpu)
COPY --from=base --chown=$USER_UID:$USER_GID /opt/dynamo/venv/ /opt/dynamo/venv/
RUN chown $USERNAME:$USERNAME /opt/dynamo/venv
COPY --from=base --chown=$USERNAME:$USERNAME /usr/local/bin /usr/local/bin
# so we can use maturin develop
RUN uv pip install maturin[patchelf]
USER $USERNAME
ENV HOME=/home/$USERNAME
ENV PYTHONPATH=$HOME/dynamo/deploy/sdk/src:$PYTHONPATH:$HOME/dynamo/components/planner/src:$PYTHONPATH
ENV CARGO_TARGET_DIR=$HOME/dynamo/.build/target
WORKDIR $HOME
# https://code.visualstudio.com/remote/advancedcontainers/persist-bash-history
RUN SNIPPET="export PROMPT_COMMAND='history -a' && export HISTFILE=$HOME/.commandhistory/.bash_history" \
&& mkdir -p $HOME/.commandhistory \
&& touch $HOME/.commandhistory/.bash_history \
&& echo "$SNIPPET" >> "$HOME/.bashrc"
RUN mkdir -p /home/$USERNAME/.cache/
ENV VLLM_KV_CAPI_PATH=$HOME/dynamo/.build/target/debug/libdynamo_llm_capi.so
ENTRYPOINT ["/opt/nvidia/nvidia_entrypoint.sh"]
##################################
##### Wheel Build Image ##########
##################################
# Redeclare ARCH_ALT ARG so it's available for interpolation in the FROM instruction
ARG ARCH_ALT
FROM quay.io/pypa/manylinux_2_28_${ARCH_ALT} AS wheel_builder
ARG CARGO_BUILD_JOBS
# Set CARGO_BUILD_JOBS to 16 if not provided
# This is to prevent cargo from building $(nproc) jobs in parallel,
# which might exceed the number of opened files limit.
ENV CARGO_BUILD_JOBS=${CARGO_BUILD_JOBS:-16}
# Use build arg RELEASE_BUILD = true to generate wheels for Python 3.10, 3.11 and 3.12.
ARG RELEASE_BUILD
WORKDIR /workspace
RUN yum update -y \
&& yum install -y llvm-toolset \
&& yum install -y python3.12-devel \
&& yum install -y protobuf-compiler \
&& yum clean all \
&& rm -rf /var/cache/yum
ENV RUSTUP_HOME=/usr/local/rustup \
CARGO_HOME=/usr/local/cargo \
CARGO_TARGET_DIR=/workspace/target \
VIRTUAL_ENV=/opt/dynamo/venv
COPY --from=base $RUSTUP_HOME $RUSTUP_HOME
COPY --from=base $CARGO_HOME $CARGO_HOME
COPY --from=base /usr/local/nixl /opt/nvidia/nvda_nixl
COPY --from=base /workspace /workspace
COPY --from=base $VIRTUAL_ENV $VIRTUAL_ENV
ENV PATH=$CARGO_HOME/bin:$VIRTUAL_ENV/bin:$PATH
# Copy configuration files
COPY pyproject.toml /workspace/
COPY README.md /workspace/
COPY LICENSE /workspace/
COPY Cargo.toml /workspace/
COPY Cargo.lock /workspace/
COPY rust-toolchain.toml /workspace/
COPY hatch_build.py /workspace/
# Copy source code
COPY lib/ /workspace/lib/
COPY components /workspace/components
COPY launch /workspace/launch
COPY deploy/sdk /workspace/deploy/sdk
RUN cargo build \
--release \
--locked \
--features dynamo-llm/block-manager \
--workspace
# Build dynamo wheel
RUN uv build --wheel --out-dir /workspace/dist && \
cd /workspace/lib/bindings/python && \
uv pip install maturin[patchelf] && \
maturin build --release --features block-manager --out /workspace/dist && \
if [ "$RELEASE_BUILD" = "true" ]; then \
# do not enable KVBM feature, ensure compatibility with lower glibc
uv run --python 3.11 maturin build --release --out /workspace/dist && \
uv run --python 3.10 maturin build --release --out /workspace/dist; \
fi
#######################################
########## CI Minimum Image ###########
#######################################
FROM base AS ci_minimum
ENV DYNAMO_HOME=/workspace
ENV CARGO_TARGET_DIR=/workspace/target
WORKDIR /workspace
COPY --from=wheel_builder /workspace /workspace
COPY --from=wheel_builder /opt/nvidia/nvda_nixl /opt/nvidia/nvda_nixl
# Copy Cargo cache to avoid re-downloading dependencies
COPY --from=wheel_builder $CARGO_HOME $CARGO_HOME
# Copy rest of the code
COPY . /workspace
# Build C bindings, creates lib/bindings/c/include
#
# TODO: In theory the 'cargo build' in earlier stage covers this, we "just" need to copy the
# `lib/bindings/c/include` folder that build.rs generated across.
# I couldn't get that to work, hence TODO.
RUN cd /workspace/lib/bindings/c && cargo build --release --locked
# Package the bindings
RUN mkdir -p /opt/dynamo/bindings/wheels && \
mkdir /opt/dynamo/bindings/lib && \
cp dist/ai_dynamo*cp312*.whl /opt/dynamo/bindings/wheels/. && \
cp target/release/libdynamo_llm_capi.so /opt/dynamo/bindings/lib/. && \
cp -r lib/bindings/c/include /opt/dynamo/bindings/. && \
cp target/release/dynamo-run /usr/local/bin && \
cp target/release/http /usr/local/bin && \
cp target/release/llmctl /usr/local/bin && \
cp target/release/metrics /usr/local/bin && \
cp target/release/mock_worker /usr/local/bin
RUN uv pip install /workspace/dist/ai_dynamo_runtime*cp312*.whl && \
uv pip install /workspace/dist/ai_dynamo*any.whl
RUN uv pip install /workspace/benchmarks
# Copy launch banner
RUN --mount=type=bind,source=./container/launch_message.txt,target=/workspace/launch_message.txt \
sed '/^#\s/d' /workspace/launch_message.txt > ~/.launch_screen && \
echo "cat ~/.launch_screen" >> ~/.bashrc
# Tell vllm to use the Dynamo LLM C API for KV Cache Routing
ENV VLLM_KV_CAPI_PATH=/opt/dynamo/bindings/lib/libdynamo_llm_capi.so
ARG ARCH_ALT
ENV NIXL_PLUGIN_DIR=/usr/local/nixl/lib/${ARCH_ALT}-linux-gnu/plugins
ENV LD_LIBRARY_PATH=/usr/local/nixl/lib/${ARCH_ALT}-linux-gnu:/usr/local/nixl/lib/${ARCH_ALT}-linux-gnu/plugins:/usr/local/ucx/lib:$LD_LIBRARY_PATH
########################################
########## Development Image ###########
########################################
FROM ci_minimum AS dev
ENTRYPOINT ["/opt/nvidia/nvidia_entrypoint.sh"]
CMD []
####################################
########## Runtime Image ###########
####################################
FROM ${RUNTIME_IMAGE}:${RUNTIME_IMAGE_TAG} AS runtime
WORKDIR /workspace
ENV DYNAMO_HOME=/workspace
ENV VIRTUAL_ENV=/opt/dynamo/venv
ENV PATH="${VIRTUAL_ENV}/bin:${PATH}"
# Install build-essential and python3-dev as apt dependencies
RUN apt-get update && \
DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
build-essential \
python3-dev && \
rm -rf /var/lib/apt/lists/*
### COPY BINDINGS ###
# Copy all bindings (wheels, lib, include) from ci_minimum
COPY --from=ci_minimum /opt/dynamo/bindings /opt/dynamo/bindings
### COPY NATS & ETCD ###
# Copy nats and etcd from base image
COPY --from=base /usr/bin/nats-server /usr/bin/nats-server
COPY --from=base /usr/local/bin/etcd/ /usr/local/bin/etcd/
ENV PATH=/usr/local/bin/etcd/:$PATH
# Copy UCX from base image as plugin for NIXL
# Copy NIXL source from base image (required for NIXL plugins)
COPY --from=base /usr/local/ucx /usr/local/ucx
COPY --from=base /usr/local/nixl /usr/local/nixl
ARG ARCH_ALT
ENV NIXL_PLUGIN_DIR=/usr/local/nixl/lib/${ARCH_ALT}-linux-gnu/plugins
ENV LD_LIBRARY_PATH=/usr/local/nixl/lib/${ARCH_ALT}-linux-gnu:/usr/local/nixl/lib/${ARCH_ALT}-linux-gnu/plugins:/usr/local/ucx/lib:$LD_LIBRARY_PATH
# Setup the python environment
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
RUN uv venv $VIRTUAL_ENV --python 3.12 && \
echo "source $VIRTUAL_ENV/bin/activate" >> ~/.bashrc
# Common dependencies
RUN --mount=type=bind,source=./container/deps/requirements.txt,target=/tmp/requirements.txt \
uv pip install --requirement /tmp/requirements.txt
# Install test dependencies
#TODO: Remove this once we have a functional ci_minimum image built on top of the runtime image
RUN --mount=type=bind,source=./container/deps/requirements.test.txt,target=/tmp/requirements.txt \
uv pip install --requirement /tmp/requirements.txt
#TODO: Remove this once we have a functional ci_minimum image built on top of the runtime image
COPY . /workspace
RUN uv pip install /workspace/benchmarks
# Install the wheels and symlink executables to /usr/local/bin so dynamo components can use them
# Dynamo components currently do not have the VIRTUAL_ENV in their PATH, so we need to symlink the executables
#Copy NIXL and Dynamo wheels into wheelhouse
COPY --from=base /workspace/wheels/nixl/*.whl wheelhouse/
COPY --from=wheel_builder /workspace/dist/*.whl wheelhouse/
RUN uv pip install ai-dynamo[vllm] --find-links wheelhouse && \
uv pip install nixl --find-links wheelhouse && \
ln -sf $VIRTUAL_ENV/bin/* /usr/local/bin/
# Tell vllm to use the Dynamo LLM C API for KV Cache Routing
ENV VLLM_KV_CAPI_PATH="/opt/dynamo/bindings/lib/libdynamo_llm_capi.so"
# Copy launch banner
RUN --mount=type=bind,source=./container/launch_message.txt,target=/workspace/launch_message.txt \
sed '/^#\s/d' /workspace/launch_message.txt > ~/.launch_screen && \
echo "cat ~/.launch_screen" >> ~/.bashrc
ENTRYPOINT ["/opt/nvidia/nvidia_entrypoint.sh"]
CMD []
......@@ -49,7 +49,8 @@ PYTHON_PACKAGE_VERSION=${current_tag:-$latest_tag.dev+$commit_id}
# dependencies are specified in the /container/deps folder and
# installed within framework specific sections of the Dockerfile.
declare -A FRAMEWORKS=(["VLLM"]=1 ["TRTLLM"]=2 ["NONE"]=3 ["SGLANG"]=4)
declare -A FRAMEWORKS=(["VLLM"]=1 ["TRTLLM"]=2 ["NONE"]=3 ["SGLANG"]=4 ["KVBM"]=5)
DEFAULT_FRAMEWORK=VLLM
SOURCE_DIR=$(dirname "$(readlink -f "$0")")
......@@ -414,6 +415,8 @@ elif [[ $FRAMEWORK == "NONE" ]]; then
DOCKERFILE=${SOURCE_DIR}/Dockerfile
elif [[ $FRAMEWORK == "SGLANG" ]]; then
DOCKERFILE=${SOURCE_DIR}/Dockerfile.sglang
elif [[ $FRAMEWORK == "KVBM" ]]; then
DOCKERFILE=${SOURCE_DIR}/Dockerfile.kvbm
fi
# Add NIXL_REF as a build argument
......
......@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# For IFEval dataset loading in kvbm tests
datasets
psutil>=5.0.0
pyright
pytest
......
......@@ -24,7 +24,8 @@ RUN_PREFIX=
# dependencies are specified in the /container/deps folder and
# installed within framework specific sections of the Dockerfile.
declare -A FRAMEWORKS=(["VLLM"]=1 ["TRTLLM"]=2 ["NONE"]=3 ["SGLANG"]=4)
declare -A FRAMEWORKS=(["VLLM"]=1 ["TRTLLM"]=2 ["NONE"]=3 ["SGLANG"]=4 ["KVBM"]=5)
DEFAULT_FRAMEWORK=VLLM
SOURCE_DIR=$(dirname "$(readlink -f "$0")")
......@@ -276,6 +277,14 @@ get_options() {
if [ -n "$USE_NIXL_GDS" ]; then
VOLUME_MOUNTS+=" -v /run/udev:/run/udev:ro "
NIXL_GDS_CAPS="--cap-add=IPC_LOCK"
# NOTE(jthomson04): In the KVBM disk pools, we currently allocate our files in /tmp.
# For some arcane reason, GDS requires that /tmp be mounted.
# This is already handled for us if we set --mount-workspace
# If we aren't mounting our workspace but need GDS, we need to mount /tmp.
if [ -z "$MOUNT_WORKSPACE" ]; then
VOLUME_MOUNTS+=" -v /tmp:/tmp "
fi
else
NIXL_GDS_CAPS=""
fi
......
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Running KVBM in vLLM
This guide explains how to leverage KVBM (KV Block Manager) to mange KV cache and do KV offloading in vLLM.
To learn what KVBM is, please check [here](https://docs.nvidia.com/dynamo/latest/architecture/kvbm_intro.html)
## Quick Start
To use KVBM in vLLM, you can follow the steps below:
```bash
# start up etcd for KVBM leader/worker registration and discovery
docker compose -f deploy/metrics/docker-compose.yml up -d
# build a container containing vllm and kvbm
./container/build.sh --framework kvbm
# launch the container
./container/run.sh --framework kvbm -it --mount-workspace --use-nixl-gds
# enable using kvbm instead of vllm's own kv cache manager
export DYN_KVBM_MANAGER=kvbm
# enable kv offloading to CPU memory
# 4 means 4GB of CPU memory would be used
export DYN_KVBM_CPU_CACHE_GB=4
# enable kv offloading to disk
# 8 means 8GB of disk would be used
export DYN_KVBM_DISK_CACHE_GB=8
# serve an example LLM model
vllm serve deepseek-ai/DeepSeek-R1-Distill-Llama-8B
# make a call to LLM
curl localhost:8000/v1/chat/completions -H "Content-Type: application/json" -d '{
"model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"messages": [
{
"role": "user",
"content": "In the heart of Eldoria, an ancient land of boundless magic and mysterious creatures, lies the long-forgotten city of Aeloria. Once a beacon of knowledge and power, Aeloria was buried beneath the shifting sands of time, lost to the world for centuries. You are an intrepid explorer, known for your unparalleled curiosity and courage, who has stumbled upon an ancient map hinting at ests that Aeloria holds a secret so profound that it has the potential to reshape the very fabric of reality. Your journey will take you through treacherous deserts, enchanted forests, and across perilous mountain ranges. Your Task: Character Background: Develop a detailed background for your character. Describe their motivations for seeking out Aeloria, their skills and weaknesses, and any personal connections to the ancient city or its legends. Are they driven by a quest for knowledge, a search for lost familt clue is hidden."
}
],
"stream":false,
"max_tokens": 30
}'
```
This diff is collapsed.
......@@ -35,7 +35,7 @@ crate-type = ["cdylib", "rlib"]
[features]
default = []
block-manager = ["dynamo-llm/block-manager", "dep:dlpark"]
block-manager = ["dynamo-llm/block-manager", "dep:dlpark", "dep:cudarc"]
[dependencies]
dynamo-llm = { path = "../../llm" }
......@@ -45,6 +45,7 @@ anyhow = { version = "1" }
async-openai = { version = "0.29.0" }
async-stream = { version = "0.3" }
async-trait = { version = "0.1" }
derive-getters = "0.5"
either = { version = "1.13", features = ["serde"] }
futures = { version = "0.3" }
once_cell = { version = "1.20.3" }
......@@ -53,9 +54,10 @@ serde_json = { version = "1.0.138" }
thiserror = { version = "2.0" }
tokio = { version = "1.46.0", features = ["full"] }
tokio-stream = { version = "0" }
tokio-util = { version = "0.7" }
tracing = { version = "0" }
tokio-util = { version = "0.7", features = ["rt"] }
tracing = { version = "0" }
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
uuid = { version = "1.17", features = ["v4", "serde"] }
# "extension-module" tells pyo3 we want to build an extension module (skips linking against libpython.so)
# "abi3-py39" tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.9
......@@ -77,4 +79,8 @@ pyo3-async-runtimes = { version = "0.23.0", default-features = false, features =
pythonize = "0.23"
dlpark = { version = "0.5", features = ["pyo3", "half"], optional = true }
cudarc = { version = "0.16.2", features = ["cuda-12020"], optional = true }
[dev-dependencies]
rstest = "0.25"
......@@ -199,11 +199,18 @@ struct EtcdKvCache {
#[pyclass]
#[derive(Clone)]
struct DistributedRuntime {
pub struct DistributedRuntime {
inner: rs::DistributedRuntime,
event_loop: PyObject,
}
impl DistributedRuntime {
#[allow(dead_code)]
fn inner(&self) -> &rs::DistributedRuntime {
&self.inner
}
}
#[pyclass]
#[derive(Clone)]
struct EtcdClient {
......@@ -283,6 +290,21 @@ impl DistributedRuntime {
Ok(DistributedRuntime { inner, event_loop })
}
#[staticmethod]
fn detached(py: Python) -> PyResult<Self> {
let rt = rs::Worker::runtime_from_existing().map_err(to_pyerr)?;
let handle = rt.primary();
let inner = handle
.block_on(rs::DistributedRuntime::from_settings(rt))
.map_err(to_pyerr)?;
Ok(DistributedRuntime {
inner,
event_loop: py.None(),
})
}
fn namespace(&self, name: String) -> PyResult<Namespace> {
Ok(Namespace {
inner: self.inner.namespace(name).map_err(to_pyerr)?,
......
......@@ -27,7 +27,6 @@
use super::*;
pub mod backend;
pub mod block_manager;
pub mod disagg_router;
pub mod entrypoint;
pub mod kv;
......@@ -35,3 +34,6 @@ pub mod local_model;
pub mod model_card;
pub mod nats;
pub mod preprocessor;
#[cfg(feature = "block-manager")]
pub mod block_manager;
......@@ -13,216 +13,210 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![cfg(feature = "block-manager")]
use super::*;
use dynamo_llm::block_manager::block::{
data::logical::distributed_leader_worker::DistributedLeaderWorkerResources, locality::Logical,
};
use dynamo_llm::block_manager::{BasicMetadata, BlockParallelismStrategy};
use pyo3::PyResult;
use tokio_util::sync::CancellationToken;
mod controller;
mod distributed;
mod block;
mod block_list;
mod dlpack;
mod layer;
pub mod vllm;
/// Add bingings from this crate to the provided module
pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<layer::Layer>()?;
m.add_class::<block::Block>()?;
m.add_class::<block_list::BlockList>()?;
m.add_class::<BlockManager>()?;
m.add_class::<distributed::KvbmWorker>()?;
m.add_class::<distributed::KvbmLeader>()?;
m.add_class::<controller::BlockManagerClient>()?;
m.add_class::<controller::BlockPoolStatus>()?;
m.add_class::<controller::ResetBlocksResponse>()?;
vllm::add_to_module(m)?;
Ok(())
}
type VllmBlockManager = dynamo_llm::block_manager::KvBlockManager<
Logical<DistributedLeaderWorkerResources>,
BasicMetadata,
>;
type VllmController = Arc<
dynamo_llm::block_manager::controller::Controller<
Logical<DistributedLeaderWorkerResources>,
BasicMetadata,
>,
>;
#[pyclass]
#[derive(Clone)]
pub struct BlockManager {
inner: Arc<dynamo_llm::block_manager::ReferenceBlockManager>,
// TODO: Metadata should be stored in the block manager?
dtype: dynamo_llm::common::dtype::DType,
device_id: usize,
inner: VllmBlockManager,
drt: DistributedRuntime,
_controller: Option<VllmController>,
}
// TODO: This is in desperate need of a massive refactor. We bind and instantiate this in Python, but we never actually use it.
#[pymethods]
#[allow(unused_variables)]
impl BlockManager {
#[new]
#[pyo3(signature = (worker_id, num_layer, outer_dim, page_size, inner_dim, dtype=None, host_num_blocks=None, device_num_blocks=None, device_id=0))]
#[pyo3(signature = (worker_id, leader = None, page_size = 32, num_device_blocks = None, disable_device_pool = false))]
fn new(
worker_id: u64,
num_layer: usize,
outer_dim: usize,
leader: Option<distributed::KvbmLeader>,
page_size: usize,
inner_dim: usize,
dtype: Option<String>,
host_num_blocks: Option<usize>,
device_num_blocks: Option<usize>,
device_id: usize,
num_device_blocks: Option<usize>,
disable_device_pool: bool,
) -> PyResult<Self> {
let cancel_token = CancellationToken::new();
let mut config = dynamo_llm::block_manager::KvBlockManagerConfig::builder().runtime(
dynamo_llm::block_manager::KvManagerRuntimeConfig::builder()
.worker_id(worker_id)
.cancellation_token(cancel_token.clone())
.build()
.map_err(to_pyerr)?,
);
let mut model_config = dynamo_llm::block_manager::KvManagerModelConfig::builder()
.num_layers(num_layer)
.outer_dim(outer_dim)
let model_config = dynamo_llm::block_manager::KvManagerModelConfig::builder()
.num_layers(1)
.outer_dim(1)
.page_size(page_size)
.inner_dim(inner_dim);
let mut dtype_ = dynamo_llm::common::dtype::DType::FP16; // Default in block_manager config
if let Some(dtype_str) = dtype {
dtype_ = match dtype_str.as_str() {
"fp8" | "FP8" => dynamo_llm::common::dtype::DType::FP8,
"fp16" | "FP16" => dynamo_llm::common::dtype::DType::FP16,
"bf16" | "BF16" => dynamo_llm::common::dtype::DType::BF16,
"fp32" | "FP32" => dynamo_llm::common::dtype::DType::FP32,
"u8" | "U8" => dynamo_llm::common::dtype::DType::U8,
"u16" | "U16" => dynamo_llm::common::dtype::DType::U16,
"u32" | "U32" => dynamo_llm::common::dtype::DType::U32,
"u64" | "U64" => dynamo_llm::common::dtype::DType::U64,
"i8" | "I8" => dynamo_llm::common::dtype::DType::I8,
"i16" | "I16" => dynamo_llm::common::dtype::DType::I16,
"i32" | "I32" => dynamo_llm::common::dtype::DType::I32,
"i64" | "I64" => dynamo_llm::common::dtype::DType::I64,
_ => {
return Err(pyo3::exceptions::PyValueError::new_err(format!(
"Unsupported dtype: {}",
dtype_str
)))
}
};
}
model_config = model_config.dtype(dtype_.clone());
.inner_dim(1);
config = config.model(model_config.build().map_err(to_pyerr)?);
if let Some(host_num_blocks) = host_num_blocks {
config = config.host_layout(
dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
.num_blocks(host_num_blocks)
.allocator(
dynamo_llm::block_manager::storage::PinnedAllocator::new()
.map_err(to_pyerr)?,
)
.build()
.map_err(to_pyerr)?,
);
}
if let Some(device_num_blocks) = device_num_blocks {
config = config.device_layout(
dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
.num_blocks(device_num_blocks)
.allocator(
dynamo_llm::block_manager::storage::DeviceAllocator::new(device_id)
.map_err(to_pyerr)?,
)
.build()
.map_err(to_pyerr)?,
);
}
let (leader, drt) = if let Some(leader) = leader {
let (leader, rt) = leader.dissolve();
if !disable_device_pool {
config = config.device_layout(
dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
.num_blocks(leader.num_device_blocks())
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded))
.build()
.map_err(to_pyerr)?,
);
}
if leader.num_host_blocks() > 0 {
tracing::info!("Using {} host blocks", leader.num_host_blocks());
config = config.host_layout(
dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
.num_blocks(leader.num_host_blocks())
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded))
.build()
.map_err(to_pyerr)?,
);
}
if leader.num_disk_blocks() > 0 {
tracing::info!("Using {} disk blocks", leader.num_disk_blocks());
config = config.disk_layout(
dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
.num_blocks(leader.num_disk_blocks())
.logical(Some(BlockParallelismStrategy::LeaderWorkerSharded))
.build()
.map_err(to_pyerr)?,
);
}
(Some(leader), rt)
} else {
tracing::info!("Leader not provided. Block transfer functionality will be disabled.");
// let num_device_blocks = num_device_blocks
// .expect("num_device_blocks must be provided if leader is not provided");
// config = config.device_layout(
// dynamo_llm::block_manager::KvManagerLayoutConfig::builder()
// .num_blocks(num_device_blocks)
// .logical(Some(BlockParallelismStrategy::LeaderWorkerSharded))
// .build()
// .map_err(to_pyerr)?,
// );
unimplemented!("Leader not provided");
// (
// None,
// Arc::new(
// tokio::runtime::Builder::new_multi_thread()
// .enable_all()
// .build()
// .map_err(to_pyerr)?,
// ),
// )
};
let rt = drt.inner().runtime().primary();
let config = config.build().map_err(to_pyerr)?;
let tokio_runtime = pyo3_async_runtimes::tokio::get_runtime();
Ok(BlockManager {
inner: Arc::from(
tokio_runtime
.block_on(async {
dynamo_llm::block_manager::ReferenceBlockManager::new(config)
})
.map_err(to_pyerr)?,
),
dtype: dtype_,
device_id: device_id,
inner: rt
.block_on(async {
let resources =
DistributedLeaderWorkerResources::new(leader, cancel_token.child_token())?;
dynamo_llm::block_manager::KvBlockManager::<
Logical<DistributedLeaderWorkerResources>,
BasicMetadata,
>::new(config, resources)
.await
})
.map_err(to_pyerr)?,
drt,
_controller: None,
})
}
fn allocate_host_blocks_blocking(&self, count: usize) -> PyResult<block_list::BlockList> {
let blocks = self
.inner
.host()
.ok_or_else(|| {
pyo3::exceptions::PyRuntimeError::new_err("Host allocator not available")
})?
.allocate_blocks_blocking(count)
.map_err(to_pyerr)?;
// Wrap each block in an enum accounting for Pinned & Device block
let blocks = blocks
.into_iter()
.map(|b| block::BlockType::Pinned(b))
.collect();
Ok(block_list::BlockList::from_rust(
blocks,
self.dtype.clone(),
self.device_id,
))
fn block_size(&self) -> usize {
self.inner.block_size()
}
#[pyo3(signature = (count))]
fn allocate_host_blocks<'py>(
&self,
py: Python<'py>,
count: usize,
) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
let dtype = self.dtype.clone();
let device_id = self.device_id;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let blocks = inner
.host()
.ok_or_else(|| {
pyo3::exceptions::PyRuntimeError::new_err("Host allocator not available")
})?
.allocate_blocks(count)
.await
.map_err(to_pyerr)?;
// Wrap each block in an enum accounting for Pinned & Device block
let blocks = blocks
.into_iter()
.map(|b| block::BlockType::Pinned(b))
.collect();
Ok(block_list::BlockList::from_rust(blocks, dtype, device_id))
})
}
fn init_controller(&mut self, component: Component) -> PyResult<()> {
if self._controller.is_some() {
tracing::warn!("Controller already initialized. Ignoring init_controller call.");
return Ok(());
}
fn allocate_device_blocks_blocking(&self, count: usize) -> PyResult<block_list::BlockList> {
let blocks = self
.inner
.device()
.ok_or_else(|| {
pyo3::exceptions::PyRuntimeError::new_err("Device allocator not available")
})?
.allocate_blocks_blocking(count)
let block_manager = self.inner.clone();
let controller = self
.drt
.inner()
.runtime()
.primary()
.block_on(controller::Controller::new(
block_manager,
component.inner.clone(),
))
.map_err(to_pyerr)?;
// Wrap each block in an enum accounting for Pinned & Device block
let blocks = blocks
.into_iter()
.map(|b| block::BlockType::Device(b))
.collect();
Ok(block_list::BlockList::from_rust(
blocks,
self.dtype.clone(),
self.device_id,
))
self._controller = Some(Arc::new(controller));
let instance_id = component
.inner
.drt()
.primary_lease()
.map(|lease| lease.id())
.ok_or_else(|| to_pyerr(anyhow::anyhow!("no instance id")))?;
tracing::info!(
"Dynamo KVBM Controller: {}.{}:{}",
component.inner.namespace().name(),
component.inner.name(),
instance_id
);
Ok(())
}
}
#[pyo3(signature = (count))]
fn allocate_device_blocks<'py>(
&self,
py: Python<'py>,
count: usize,
) -> PyResult<Bound<'py, PyAny>> {
let inner = self.inner.clone();
let dtype = self.dtype.clone();
let device_id = self.device_id;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let blocks = inner
.device()
.ok_or_else(|| {
pyo3::exceptions::PyRuntimeError::new_err("Device allocator not available")
})?
.allocate_blocks(count)
.await
.map_err(to_pyerr)?;
// Wrap each block in an enum accounting for Pinned & Device block
let blocks = blocks
.into_iter()
.map(|b| block::BlockType::Device(b))
.collect();
Ok(block_list::BlockList::from_rust(blocks, dtype, device_id))
})
impl BlockManager {
#[inline(always)]
pub fn get_block_manager(&self) -> &VllmBlockManager {
&self.inner
}
}
......@@ -17,6 +17,7 @@
use super::*;
use dynamo_llm::block_manager::block::BlockDataExt;
use dynamo_llm::block_manager::block::BlockDataProviderMut;
use pyo3::{
types::{PyList, PyTuple},
PyObject, PyResult, Python,
......@@ -27,12 +28,14 @@ pub enum BlockType {
Pinned(
dynamo_llm::block_manager::block::MutableBlock<
dynamo_llm::block_manager::storage::PinnedStorage,
dynamo_llm::block_manager::block::locality::Local,
dynamo_llm::block_manager::block::BasicMetadata,
>,
),
Device(
dynamo_llm::block_manager::block::MutableBlock<
dynamo_llm::block_manager::storage::DeviceStorage,
dynamo_llm::block_manager::block::locality::Local,
dynamo_llm::block_manager::block::BasicMetadata,
>,
),
......@@ -56,8 +59,8 @@ impl Block {
) -> Self {
Self {
inner: block,
dtype: dtype,
device_id: device_id,
dtype,
device_id,
py_itr_idx: 0,
}
}
......@@ -77,12 +80,7 @@ impl Block {
fn to_list<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
let layers: Vec<layer::Layer> = (0..self.num_layers())
.map(|layer_idx| {
layer::Layer::from_rust(
self.inner.clone(),
layer_idx,
self.dtype.clone(),
self.device_id,
)
layer::Layer::from_rust(self.inner.clone(), layer_idx, self.dtype, self.device_id)
})
.collect();
PyList::new(py, layers)
......@@ -100,12 +98,7 @@ impl Block {
index, num_layers
)));
}
let layer = layer::Layer::from_rust(
self.inner.clone(),
index,
self.dtype.clone(),
self.device_id,
);
let layer = layer::Layer::from_rust(self.inner.clone(), index, self.dtype, self.device_id);
Ok(layer)
}
......@@ -125,7 +118,7 @@ impl Block {
let layer = layer::Layer::from_rust(
self.inner.clone(),
self.py_itr_idx,
self.dtype.clone(),
self.dtype,
self.device_id,
);
self.py_itr_idx += 1;
......@@ -174,11 +167,15 @@ impl Block {
let mut mutable_block = self.inner.lock().unwrap();
ptr = match &mut *mutable_block {
BlockType::Pinned(block) => {
let mut block_view_mut = block.block_view_mut().map_err(to_pyerr)?;
use dynamo_llm::block_manager::block::private::PrivateToken;
let block_data = block.block_data_mut(PrivateToken);
let mut block_view_mut = block_data.block_view_mut().map_err(to_pyerr)?;
(unsafe { block_view_mut.as_mut_ptr() }) as *mut std::ffi::c_void
}
BlockType::Device(block) => {
let mut block_view_mut = block.block_view_mut().map_err(to_pyerr)?;
use dynamo_llm::block_manager::block::private::PrivateToken;
let block_data = block.block_data_mut(PrivateToken);
let mut block_view_mut = block_data.block_view_mut().map_err(to_pyerr)?;
(unsafe { block_view_mut.as_mut_ptr() }) as *mut std::ffi::c_void
}
};
......@@ -206,7 +203,7 @@ impl Block {
self.inner.clone(),
ptr,
vec![num_blocks, num_layers, num_outer_dims, page_size, inner_dim],
self.dtype.clone(),
self.dtype,
self.device_id,
)
}
......
......@@ -40,8 +40,8 @@ impl BlockList {
.into_iter()
.map(|b| Arc::new(Mutex::new(b)))
.collect(),
dtype: dtype,
device_id: device_id,
dtype,
device_id,
py_itr_idx: 0,
}
}
......@@ -54,7 +54,7 @@ impl BlockList {
let blocks: Vec<block::Block> = self
.inner
.iter()
.map(|b| block::Block::from_rust(b.clone(), self.dtype.clone(), self.device_id))
.map(|b| block::Block::from_rust(b.clone(), self.dtype, self.device_id))
.collect();
PyList::new(py, blocks)
}
......@@ -71,11 +71,7 @@ impl BlockList {
self.inner.len()
)));
}
let block = block::Block::from_rust(
self.inner[index].clone(),
self.dtype.clone(),
self.device_id,
);
let block = block::Block::from_rust(self.inner[index].clone(), self.dtype, self.device_id);
Ok(block)
}
......@@ -94,7 +90,7 @@ impl BlockList {
}
let block = block::Block::from_rust(
self.inner[self.py_itr_idx].clone(),
self.dtype.clone(),
self.dtype,
self.device_id,
);
self.py_itr_idx += 1;
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use super::*;
pub use dynamo_llm::block_manager::controller::client::ControlClient;
pub use dynamo_llm::block_manager::controller::{CacheLevel, Controller};
#[pyclass]
pub struct BlockManagerClient {
inner: ControlClient,
}
#[pymethods]
impl BlockManagerClient {
#[new]
fn new(component: Component, instance_id: i64) -> PyResult<Self> {
let client = pyo3_async_runtimes::tokio::get_runtime()
.block_on(ControlClient::new(component.inner, instance_id))
.map_err(to_pyerr)?;
Ok(BlockManagerClient { inner: client })
}
fn reset_pool(&self, cache_level: String) -> PyResult<()> {
let cache_level = Self::cache_level_from_str(&cache_level).map_err(to_pyerr)?;
pyo3_async_runtimes::tokio::get_runtime()
.block_on(self.inner.reset_pool(cache_level))
.map_err(to_pyerr)
}
fn reset_blocks(&self, cache_level: String, blocks: Vec<u64>) -> PyResult<ResetBlocksResponse> {
let cache_level = Self::cache_level_from_str(&cache_level).map_err(to_pyerr)?;
let response = pyo3_async_runtimes::tokio::get_runtime()
.block_on(self.inner.reset_blocks(cache_level, blocks))
.map_err(to_pyerr)?;
Ok(ResetBlocksResponse { inner: response })
}
fn status(&self, cache_level: String) -> PyResult<BlockPoolStatus> {
let cache_level = Self::cache_level_from_str(&cache_level).map_err(to_pyerr)?;
let status = pyo3_async_runtimes::tokio::get_runtime()
.block_on(self.inner.status(cache_level))
.map_err(to_pyerr)?;
Ok(BlockPoolStatus { inner: status })
}
fn reset_all_pools(&self) -> PyResult<()> {
pyo3_async_runtimes::tokio::get_runtime()
.block_on(self.inner.reset_all_pools())
.map_err(to_pyerr)
}
}
impl BlockManagerClient {
// convert string to cache level
fn cache_level_from_str(cache_level: &str) -> anyhow::Result<CacheLevel> {
match cache_level.to_uppercase().as_str() {
"G1" => Ok(CacheLevel::G1),
"G2" => Ok(CacheLevel::G2),
"G3" => Ok(CacheLevel::G3),
_ => anyhow::bail!("Invalid cache level: allowed values are G1, G2, G3"),
}
}
}
#[pyclass]
#[derive(Clone)]
pub struct BlockPoolStatus {
inner: dynamo_llm::block_manager::pool::BlockPoolStatus,
}
#[pymethods]
impl BlockPoolStatus {
fn active_blocks(&self) -> usize {
self.inner.active_blocks
}
fn inactive_blocks(&self) -> usize {
self.inner.inactive_blocks
}
fn empty_blocks(&self) -> usize {
self.inner.empty_blocks
}
}
#[pyclass]
pub struct ResetBlocksResponse {
inner: dynamo_llm::block_manager::pool::ResetBlocksResponse,
}
#[pymethods]
impl ResetBlocksResponse {
fn reset_blocks(&self) -> Vec<u64> {
self.inner.reset_blocks.clone()
}
fn not_found_blocks(&self) -> Vec<u64> {
self.inner.not_found.clone()
}
fn not_reset_blocks(&self) -> Vec<u64> {
self.inner.not_reset.clone()
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use super::*;
mod leader;
mod utils;
mod worker;
pub use leader::KvbmLeader;
pub use utils::get_barrier_id;
pub use worker::{KvbmWorker, VllmTensor};
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use super::*;
use utils::get_barrier_id;
use derive_getters::Dissolve;
use llm_rs::block_manager::distributed::{KvbmLeader as KvbmLeaderImpl, KvbmLeaderConfig};
const CPU_CACHE: &str = "DYN_KVBM_CPU_CACHE_GB";
const CPU_CACHE_OVERRIDE: &str = "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS";
const DISK_CACHE: &str = "DYN_KVBM_DISK_CACHE_GB";
const DISK_CACHE_OVERRIDE: &str = "DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS";
const LEADER_WORKER_INIT_TIMEOUT_SECS: &str = "DYN_KVBM_LEADER_WORKER_INIT_TIMEOUT_SECS";
const DEFAULT_INIT_TIMEOUT_SECS: u64 = 120;
fn compute_num_blocks(cache_size_key: &str, override_key: &str, bytes_per_block: usize) -> usize {
if let Ok(override_num_blocks) = std::env::var(override_key) {
override_num_blocks.parse::<usize>().unwrap_or(0)
} else {
let cache_size_gb = std::env::var(cache_size_key)
.unwrap_or_default()
.parse::<f64>()
.unwrap_or(0.0);
((cache_size_gb * 1_000_000_000.0) / bytes_per_block as f64) as usize
}
}
fn get_leader_init_timeout_secs(override_key: &str) -> u64 {
std::env::var(override_key)
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(DEFAULT_INIT_TIMEOUT_SECS)
}
#[pyclass]
#[derive(Clone, Dissolve)]
pub struct KvbmLeader {
leader: Arc<KvbmLeaderImpl>,
drt: DistributedRuntime,
}
impl KvbmLeader {
pub fn get_inner(&self) -> Arc<KvbmLeaderImpl> {
self.leader.clone()
}
}
#[pymethods]
impl KvbmLeader {
#[new]
#[pyo3(signature = (bytes_per_block, world_size, drt))]
fn new(bytes_per_block: usize, world_size: usize, drt: DistributedRuntime) -> PyResult<Self> {
let num_host_blocks = compute_num_blocks(CPU_CACHE, CPU_CACHE_OVERRIDE, bytes_per_block);
let num_disk_blocks = compute_num_blocks(DISK_CACHE, DISK_CACHE_OVERRIDE, bytes_per_block);
let barrier_id = get_barrier_id();
let leader_init_timeout_sec: u64 =
get_leader_init_timeout_secs(LEADER_WORKER_INIT_TIMEOUT_SECS);
let config = KvbmLeaderConfig::builder()
.barrier_id(barrier_id)
.num_host_blocks(num_host_blocks)
.num_disk_blocks(num_disk_blocks)
.world_size(world_size)
.leader_init_timeout_secs(leader_init_timeout_sec)
.drt(drt.inner().clone())
.build()
.map_err(to_pyerr)?;
let rt = drt.inner().runtime().primary();
let leader =
rt.block_on(async move { KvbmLeaderImpl::new(config).await.map_err(to_pyerr) })?;
Ok(Self {
leader: Arc::new(leader),
drt,
})
}
}
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
pub fn get_barrier_id() -> String {
std::env::var("DYN_KVBM_BARRIER_ID").unwrap_or("kvbm".to_string())
}
......@@ -96,11 +96,11 @@ pub fn dlpack<'py>(
device_id: usize,
) -> PyResult<PyObject> {
let manager_ctx = ManagerCtx::new(DlPackTensor {
block: block,
ptr: ptr,
shape: shape,
dtype: dtype,
device_id: device_id,
block,
ptr,
shape,
dtype,
device_id,
});
let py_capsule = manager_ctx.into_py(py);
Ok(py_capsule)
......
......@@ -17,6 +17,7 @@
use super::*;
use dynamo_llm::block_manager::block::BlockDataExt;
use dynamo_llm::block_manager::block::BlockDataProviderMut;
use pyo3::{types::PyTuple, PyObject, PyResult, Python};
use std::sync::{Arc, Mutex};
......@@ -87,13 +88,17 @@ impl Layer {
let mut mutable_block = self.inner.lock().unwrap();
ptr = match &mut *mutable_block {
block::BlockType::Pinned(block) => {
use dynamo_llm::block_manager::block::private::PrivateToken;
let block_data = block.block_data_mut(PrivateToken);
let mut layer_view_mut =
block.layer_view_mut(self.layer_idx, 0).map_err(to_pyerr)?;
block_data.layer_view_mut(self.layer_idx, 0).map_err(to_pyerr)?;
(unsafe { layer_view_mut.as_mut_ptr() }) as *mut std::ffi::c_void
}
block::BlockType::Device(block) => {
use dynamo_llm::block_manager::block::private::PrivateToken;
let block_data = block.block_data_mut(PrivateToken);
let mut layer_view_mut =
block.layer_view_mut(self.layer_idx, 0).map_err(to_pyerr)?;
block_data.layer_view_mut(self.layer_idx, 0).map_err(to_pyerr)?;
(unsafe { layer_view_mut.as_mut_ptr() }) as *mut std::ffi::c_void
}
};
......@@ -117,7 +122,7 @@ impl Layer {
self.inner.clone(),
ptr,
vec![1, 1, num_outer_dims, page_size, inner_dim],
self.dtype.clone(),
self.dtype,
self.device_id,
)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment