Unverified Commit 47a1f11b authored by Aaron Hao's avatar Aaron Hao Committed by GitHub
Browse files

[docs] Add docs for new RL flows (#36188)


Signed-off-by: default avatarahao-anyscale <ahao@anyscale.com>
Signed-off-by: default avatarHarry Mellor <19981378+hmellor@users.noreply.github.com>
Co-authored-by: default avatarHarry Mellor <19981378+hmellor@users.noreply.github.com>
parent fad09e8a
...@@ -1573,7 +1573,7 @@ steps: ...@@ -1573,7 +1573,7 @@ steps:
- tests/compile/fullgraph/test_basic_correctness.py - tests/compile/fullgraph/test_basic_correctness.py
- examples/offline_inference/rlhf.py - examples/offline_inference/rlhf.py
- examples/offline_inference/rlhf_colocate.py - examples/offline_inference/rlhf_colocate.py
- examples/offline_inference/new_weight_syncing/ - examples/rl/
- tests/examples/offline_inference/data_parallel.py - tests/examples/offline_inference/data_parallel.py
- tests/v1/distributed - tests/v1/distributed
- tests/v1/engine/test_engine_core_client.py - tests/v1/engine/test_engine_core_client.py
...@@ -1615,7 +1615,7 @@ steps: ...@@ -1615,7 +1615,7 @@ steps:
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 RAY_DEDUP_LOGS=0 python3 rlhf_colocate.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 RAY_DEDUP_LOGS=0 python3 rlhf_colocate.py
- popd - popd
# NEW rlhf examples # NEW rlhf examples
- pushd ../examples/offline_inference/new_weight_syncing - pushd ../examples/rl
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_nccl.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_nccl.py
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_ipc.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_ipc.py
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_async_new_apis.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_async_new_apis.py
...@@ -2660,7 +2660,7 @@ steps: ...@@ -2660,7 +2660,7 @@ steps:
- tests/v1/entrypoints/openai/test_multi_api_servers.py - tests/v1/entrypoints/openai/test_multi_api_servers.py
- tests/v1/shutdown - tests/v1/shutdown
- tests/v1/worker/test_worker_memory_snapshot.py - tests/v1/worker/test_worker_memory_snapshot.py
- examples/offline_inference/new_weight_syncing/ - examples/rl/
commands: commands:
# Work around HIP bug tracked here: https://github.com/ROCm/hip/issues/3876 # Work around HIP bug tracked here: https://github.com/ROCm/hip/issues/3876
# TODO: Remove when the bug is fixed in a future ROCm release # TODO: Remove when the bug is fixed in a future ROCm release
...@@ -3325,7 +3325,7 @@ steps: ...@@ -3325,7 +3325,7 @@ steps:
- tests/compile/fullgraph/test_basic_correctness.py - tests/compile/fullgraph/test_basic_correctness.py
- examples/offline_inference/rlhf.py - examples/offline_inference/rlhf.py
- examples/offline_inference/rlhf_colocate.py - examples/offline_inference/rlhf_colocate.py
- examples/offline_inference/new_weight_syncing/ - examples/rl/
- tests/examples/offline_inference/data_parallel.py - tests/examples/offline_inference/data_parallel.py
- tests/v1/distributed - tests/v1/distributed
- tests/v1/engine/test_engine_core_client.py - tests/v1/engine/test_engine_core_client.py
...@@ -3367,7 +3367,7 @@ steps: ...@@ -3367,7 +3367,7 @@ steps:
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 RAY_DEDUP_LOGS=0 python3 rlhf_colocate.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 RAY_DEDUP_LOGS=0 python3 rlhf_colocate.py
- popd - popd
# NEW rlhf examples # NEW rlhf examples
- pushd ../examples/offline_inference/new_weight_syncing - pushd ../examples/rl
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_nccl.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_nccl.py
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_ipc.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_ipc.py
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_async_new_apis.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_async_new_apis.py
......
...@@ -82,7 +82,7 @@ steps: ...@@ -82,7 +82,7 @@ steps:
- label: Distributed Torchrun + Examples (4 GPUs) - label: Distributed Torchrun + Examples (4 GPUs)
timeout_in_minutes: 30 timeout_in_minutes: 30
working_dir: "/vllm-workspace/tests" working_dir: "/vllm-workspace"
num_devices: 4 num_devices: 4
source_file_dependencies: source_file_dependencies:
- vllm/distributed/ - vllm/distributed/
...@@ -90,33 +90,28 @@ steps: ...@@ -90,33 +90,28 @@ steps:
- tests/distributed/test_torchrun_example_moe.py - tests/distributed/test_torchrun_example_moe.py
- examples/offline_inference/rlhf.py - examples/offline_inference/rlhf.py
- examples/offline_inference/rlhf_colocate.py - examples/offline_inference/rlhf_colocate.py
- examples/offline_inference/new_weight_syncing/ - examples/rl/
- tests/examples/offline_inference/data_parallel.py - tests/examples/offline_inference/data_parallel.py
commands: commands:
# https://github.com/NVIDIA/nccl/issues/1838 # https://github.com/NVIDIA/nccl/issues/1838
- export NCCL_CUMEM_HOST_ENABLE=0 - export NCCL_CUMEM_HOST_ENABLE=0
# test with torchrun tp=2 and external_dp=2 # test with torchrun tp=2 and external_dp=2
- torchrun --nproc-per-node=4 distributed/test_torchrun_example.py - torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example.py
# test with torchrun tp=2 and pp=2 # test with torchrun tp=2 and pp=2
- PP_SIZE=2 torchrun --nproc-per-node=4 distributed/test_torchrun_example.py - PP_SIZE=2 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example.py
# test with torchrun tp=4 and dp=1 # test with torchrun tp=4 and dp=1
- TP_SIZE=4 torchrun --nproc-per-node=4 distributed/test_torchrun_example_moe.py - TP_SIZE=4 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example_moe.py
# test with torchrun tp=2, pp=2 and dp=1 # test with torchrun tp=2, pp=2 and dp=1
- PP_SIZE=2 TP_SIZE=2 torchrun --nproc-per-node=4 distributed/test_torchrun_example_moe.py - PP_SIZE=2 TP_SIZE=2 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example_moe.py
# test with torchrun tp=1 and dp=4 with ep # test with torchrun tp=1 and dp=4 with ep
- DP_SIZE=4 ENABLE_EP=1 torchrun --nproc-per-node=4 distributed/test_torchrun_example_moe.py - DP_SIZE=4 ENABLE_EP=1 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example_moe.py
# test with torchrun tp=2 and dp=2 with ep # test with torchrun tp=2 and dp=2 with ep
- TP_SIZE=2 DP_SIZE=2 ENABLE_EP=1 torchrun --nproc-per-node=4 distributed/test_torchrun_example_moe.py - TP_SIZE=2 DP_SIZE=2 ENABLE_EP=1 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example_moe.py
# test with internal dp # test with internal dp
- python3 ../examples/offline_inference/data_parallel.py --enforce-eager - python3 examples/offline_inference/data_parallel.py --enforce-eager
# OLD rlhf examples # rlhf examples
- cd ../examples/offline_inference - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 examples/rl/rlhf_nccl.py
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 examples/rl/rlhf_ipc.py
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 RAY_DEDUP_LOGS=0 python3 rlhf_colocate.py
# NEW rlhf examples
- cd new_weight_syncing
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_nccl.py
- VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_ipc.py
- label: Distributed DP Tests (4 GPUs) - label: Distributed DP Tests (4 GPUs)
timeout_in_minutes: 30 timeout_in_minutes: 30
......
...@@ -23,15 +23,18 @@ def title(text: str) -> str: ...@@ -23,15 +23,18 @@ def title(text: str) -> str:
# Custom substitutions # Custom substitutions
subs = { subs = {
"io": "IO", "io": "IO",
"api": "API", "rl": "RL",
"api(s?)": r"API\1",
"cli": "CLI", "cli": "CLI",
"cpu": "CPU", "cpu": "CPU",
"ipc": "IPC",
"llm": "LLM", "llm": "LLM",
"mae": "MAE", "mae": "MAE",
"ner": "NER", "ner": "NER",
"tpu": "TPU", "tpu": "TPU",
"gguf": "GGUF", "gguf": "GGUF",
"lora": "LoRA", "lora": "LoRA",
"nccl": "NCCL",
"rlhf": "RLHF", "rlhf": "RLHF",
"vllm": "vLLM", "vllm": "vLLM",
"openai": "OpenAI", "openai": "OpenAI",
...@@ -196,6 +199,11 @@ class Example: ...@@ -196,6 +199,11 @@ class Example:
def on_startup(command: Literal["build", "gh-deploy", "serve"], dirty: bool): def on_startup(command: Literal["build", "gh-deploy", "serve"], dirty: bool):
# Monkey-patch dirname_to_title in awesome-nav so that sub-directory names are
# title-cased (e.g. "Offline Inference" instead of "Offline inference").
import mkdocs_awesome_nav.nav.directory as _nav_dir
_nav_dir.dirname_to_title = title
logger.info("Generating example documentation") logger.info("Generating example documentation")
logger.debug("Root directory: %s", ROOT_DIR.resolve()) logger.debug("Root directory: %s", ROOT_DIR.resolve())
logger.debug("Example directory: %s", EXAMPLE_DIR.resolve()) logger.debug("Example directory: %s", EXAMPLE_DIR.resolve())
......
# Async Reinforcement Learning
## Overview
In a standard RL training loop, generation and training happen sequentially: the policy generates rollouts, then training runs on those rollouts, and the cycle repeats. During generation the training accelerators sit idle, and vice versa.
The **one-off pipelining** approach separates the generation and training phases into two parallel coroutines, allowing the model to generate new samples while simultaneously training on previously generated data. This can lead to better GPU utilization and greater training throughput.
However, this overlap introduces a complication: weights must be updated in the inference engine mid-flight, while requests may still be in progress.
## The Pause and Resume API
To safely update weights while the inference engine is running, vLLM provides `pause_generation` and `resume_generation` methods. These let the trainer coordinate a clean window for weight synchronization without losing in-flight work.
### pause_generation
```python
await engine.pause_generation(mode="keep", clear_cache=True)
```
The `mode` parameter controls how in-flight requests are handled:
| Mode | Behavior |
| ---- | -------- |
| `"abort"` | Abort all in-flight requests immediately and return partial results (default) |
| `"wait"` | Wait for all in-flight requests to finish before pausing |
| `"keep"` | Freeze requests in the queue; they resume when `resume_generation` is called |
The `clear_cache` parameter controls whether to clear the KV cache and prefix cache after pausing.
### resume_generation
```python
await engine.resume_generation()
```
Resumes the scheduler after a pause. Any requests frozen with `mode="keep"` will continue generating.
### HTTP Endpoints
When using the vLLM HTTP server, the same functionality is available via:
- `POST /pause?mode=keep` - Pause generation
- `POST /resume` - Resume generation
!!! note "Data Parallelism"
When using data parallelism with vLLM's **internal load balancer** (i.e. `data_parallel_backend="ray"`), pause and resume are handled automatically across all DP ranks -- a single call is sufficient. When using an **external load balancer** (i.e. multiple independent vLLM instances behind a proxy), you must send pause and resume requests to **every** engine instance individually before and after the weight update.
## Typical Async RL Flow
A typical async RL loop with weight syncing looks like this:
1. Start generating rollouts from the current policy
2. Once trainer has new weights to update to, pause generation with `mode="keep"`
3. Sync the updated weights from the trainer to the inference engine (see [Weight Transfer](weight_transfer/README.md))
4. Resume generation -- in-flight requests continue with the new weights
5. Repeat
The key insight is that requests paused with `mode="keep"` will produce tokens from the **old** weights before the pause and tokens from the **new** weights after resume. The `clear_cache` parameter controls whether the KV cache is invalidated during the pause. When `clear_cache=True`, previously cached key-value entries are discarded, so all tokens generated after resume will be computed entirely with the new weights. When `clear_cache=False`, existing KV cache entries are retained, meaning some tokens in context may still reflect the old weights (stale KV cache).
## Example
The [async RLHF example](../examples/rl/rlhf_async_new_apis.md) demonstrates this pattern with `vllm.AsyncLLMEngine`, NCCL weight transfer, and mid-flight pause/resume with validation.
...@@ -16,11 +16,9 @@ The following open-source RL libraries use vLLM for fast rollouts (sorted alphab ...@@ -16,11 +16,9 @@ The following open-source RL libraries use vLLM for fast rollouts (sorted alphab
- [Unsloth](https://github.com/unslothai/unsloth) - [Unsloth](https://github.com/unslothai/unsloth)
- [verl](https://github.com/volcengine/verl) - [verl](https://github.com/volcengine/verl)
See the following basic examples to get started if you don't want to use an existing library: For weight synchronization between training and inference, see the [Weight Transfer](weight_transfer/README.md) documentation, which covers the pluggable backend system with [NCCL](weight_transfer/nccl.md) (multi-GPU) and [IPC](weight_transfer/ipc.md) (same-GPU) engines.
- [Training and inference processes are located on separate GPUs (inspired by OpenRLHF)](../examples/offline_inference/rlhf.md) For pipelining generation and training to improve GPU utilization and throughput, see the [Async Reinforcement Learning](async_rl.md) guide, which covers the pause/resume API for safely updating weights mid-flight.
- [Training and inference processes are colocated on the same GPUs using Ray](../examples/offline_inference/rlhf_colocate.md)
- [Utilities for performing RLHF with vLLM](../examples/offline_inference/rlhf_utils.md)
See the following notebooks showing how to use vLLM for GRPO: See the following notebooks showing how to use vLLM for GRPO:
......
# Weight Transfer
vLLM provides a pluggable weight transfer system for synchronizing model weights from a training process to the inference engine during reinforcement learning (RL) workflows. This is essential for RLHF, GRPO, and other online RL methods where the policy model is iteratively updated during training and the updated weights must be reflected in the inference engine for rollout generation.
## Architecture
The weight transfer system follows a **two-phase protocol** with a pluggable backend design:
1. **Initialization** (`init_weight_transfer_engine`): Establishes the communication channel between the trainer and inference workers. Called once before the training loop begins.
2. **Weight Update** (`update_weights`): Transfers updated weights from the trainer to the inference engine. Called after each training step (or batch of steps).
## Available Backends
| Backend | Transport | Use Case |
| ------- | --------- | -------- |
| [NCCL](nccl.md) | NCCL broadcast | Separate GPUs for training and inference |
| [IPC](ipc.md) | CUDA IPC handles | Colocated training and inference on same GPU |
## Configuration
Specify the weight transfer backend through `WeightTransferConfig`. The backend determines which engine handles the weight synchronization.
### Programmatic (Offline Inference)
```python
from vllm import LLM
from vllm.config import WeightTransferConfig
llm = LLM(
model="my-model",
weight_transfer_config=WeightTransferConfig(backend="nccl"), # or "ipc"
)
```
### CLI (Online Serving)
```bash
vllm serve my-model \
--weight-transfer-config '{"backend": "nccl"}'
```
The `backend` field accepts `"nccl"` (default) or `"ipc"`.
## API Endpoints
When running vLLM as an HTTP server, the following endpoints are available for weight transfer:
| Endpoint | Method | Description |
| -------- | ------ | ----------- |
| `/init_weight_transfer_engine` | POST | Initialize the weight transfer engine with backend-specific info |
| `/update_weights` | POST | Trigger a weight update with backend-specific metadata |
| `/pause` | POST | Pause generation before weight sync to handle inflight requests |
| `/resume` | POST | Resume generation after weight sync |
| `/get_world_size` | GET | Get the number of inference workers (useful for NCCL world size calculation) |
!!! note
The HTTP weight transfer endpoints require `VLLM_SERVER_DEV_MODE=1` to be set.
## Trainer-Side API
Both backends provide static methods that the trainer calls to send weights. The general pattern is:
```python
# 1. Initialize the transfer engine (backend-specific)
EngineClass.trainer_init(init_info)
# 2. Send weights to inference workers
EngineClass.trainer_send_weights(
iterator=model.named_parameters(),
trainer_args=backend_specific_args,
)
```
See the [NCCL](nccl.md) and [IPC](ipc.md) pages for backend-specific trainer APIs and full examples.
## Extending the System
The weight transfer system is designed to be extensible. You can implement custom backends by subclassing `WeightTransferEngine` and registering them with the factory. See the [Base Class](base.md) page for details.
# Base Class and Custom Engines
The weight transfer system is built on an abstract base class that defines the contract between vLLM's worker infrastructure and the transport backend. You can implement custom backends by subclassing `WeightTransferEngine` and registering them with the `WeightTransferEngineFactory`.
## WeightTransferEngine
The `WeightTransferEngine` is a generic abstract class parameterized by two dataclass types:
- **`TInitInfo`** (extends `WeightTransferInitInfo`): Backend-specific initialization parameters.
- **`TUpdateInfo`** (extends `WeightTransferUpdateInfo`): Backend-specific weight update metadata.
### Abstract Methods
Subclasses must implement these four methods:
| Method | Side | Description |
| ------ | ---- | ----------- |
| `init_transfer_engine(init_info)` | Inference | Initialize the communication channel on each inference worker |
| `receive_weights(update_info, load_weights)` | Inference | Receive weights and call `load_weights` incrementally |
| `shutdown()` | Inference | Clean up resources |
| `trainer_send_weights(iterator, trainer_args)` | Trainer | Static method to send weights from the trainer process |
### Request Classes
The API-level request classes provide backend-agnostic serialization using plain dictionaries. The engine's `parse_init_info` and `parse_update_info` methods convert these dictionaries into typed dataclasses.
```python
from vllm.distributed.weight_transfer.base import (
WeightTransferInitRequest,
WeightTransferUpdateRequest,
)
# Init request (dict is converted to backend-specific TInitInfo)
init_request = WeightTransferInitRequest(
init_info={"master_address": "10.0.0.1", "master_port": 29500, ...}
)
# Update request (dict is converted to backend-specific TUpdateInfo)
update_request = WeightTransferUpdateRequest(
update_info={"names": [...], "dtype_names": [...], "shapes": [...]}
)
```
### WeightTransferUpdateInfo
The base `WeightTransferUpdateInfo` includes an `is_checkpoint_format` flag:
```python
@dataclass
class WeightTransferUpdateInfo(ABC):
is_checkpoint_format: bool = True
```
When `is_checkpoint_format=True` (the default), vLLM applies layerwise weight processing (repacking, renaming, etc.) on the received weights before loading them. Set to `False` if the trainer has already converted weights to the kernel format expected by the model.
## Implementing a Custom Engine
To create a custom weight transfer backend:
### 1. Define Info Dataclasses
```python
from dataclasses import dataclass
from vllm.distributed.weight_transfer.base import (
WeightTransferEngine,
WeightTransferInitInfo,
WeightTransferUpdateInfo,
)
@dataclass
class MyInitInfo(WeightTransferInitInfo):
endpoint: str
token: str
@dataclass
class MyUpdateInfo(WeightTransferUpdateInfo):
names: list[str]
dtype_names: list[str]
shapes: list[list[int]]
# Add custom fields as needed
```
### 2. Implement the Engine
```python
from collections.abc import Callable, Iterator
from typing import Any
import torch
class MyWeightTransferEngine(WeightTransferEngine[MyInitInfo, MyUpdateInfo]):
init_info_cls = MyInitInfo
update_info_cls = MyUpdateInfo
def init_transfer_engine(self, init_info: MyInitInfo) -> None:
# Set up connection to trainer using init_info.endpoint, etc.
...
def receive_weights(
self,
update_info: MyUpdateInfo,
load_weights: Callable[[list[tuple[str, torch.Tensor]]], None],
) -> None:
# Receive each weight and call load_weights incrementally
for name, dtype_name, shape in zip(
update_info.names, update_info.dtype_names, update_info.shapes
):
dtype = getattr(torch, dtype_name)
weight = self._fetch_weight(name, shape, dtype)
load_weights([(name, weight)])
def shutdown(self) -> None:
# Clean up resources
...
@staticmethod
def trainer_send_weights(
iterator: Iterator[tuple[str, torch.Tensor]],
trainer_args: dict[str, Any],
) -> None:
# Send weights from the trainer process
for name, tensor in iterator:
# Send tensor via custom transport
...
```
!!! important
The `load_weights` callable passed to `receive_weights` should be called **incrementally** (one or a few weights at a time) rather than accumulating all weights first. This avoids GPU out-of-memory errors with large models.
### 3. Register with the Factory
```python
from vllm.distributed.weight_transfer.factory import WeightTransferEngineFactory
# Option 1: Lazy loading (recommended for built-in engines)
WeightTransferEngineFactory.register_engine(
"my_backend",
"my_package.my_module",
"MyWeightTransferEngine",
)
# Option 2: Direct class registration
WeightTransferEngineFactory.register_engine(
"my_backend",
MyWeightTransferEngine,
)
```
Once registered, users can select your backend via `WeightTransferConfig(backend="my_backend")`.
## WeightTransferEngineFactory
The factory uses a registry pattern with lazy loading. Built-in engines (`nccl` and `ipc`) are registered at import time but their modules are only loaded when the backend is actually requested. This avoids importing heavy dependencies (like NCCL communicators) when they aren't needed.
```python
from vllm.distributed.weight_transfer.factory import WeightTransferEngineFactory
# Create an engine from config
engine = WeightTransferEngineFactory.create_engine(
config=weight_transfer_config,
parallel_config=parallel_config,
)
```
# IPC Engine
The IPC weight transfer engine uses **CUDA IPC** (Inter-Process Communication) handles to share GPU memory directly between the trainer and inference workers on the **same node and same GPU**. This avoids any data copying, making it a efficient option when colocating training and inference.
## When to Use IPC
- Training and inference on the **same GPU** (colocated)
- You want to minimize memory overhead by sharing tensors in-place
## How It Works
1. The trainer creates CUDA tensors for each weight and generates IPC handles using `torch.multiprocessing.reductions.reduce_tensor`.
2. IPC handles are sent to the inference engine via **Ray.remote()** or **HTTP POST**.
3. The inference worker reconstructs the tensors from the handles, reading directly from the trainer's GPU memory.
!!! warning
IPC handles involve sending serialized Python objects. When using HTTP transport, you must set `VLLM_ALLOW_INSECURE_SERIALIZATION=1` on both the server and client. This is because IPC handles are pickled and base64-encoded for HTTP transmission.
## Initialization
The IPC backend requires no initialization on either side. The `init_transfer_engine` call is a no-op for IPC.
## Sending Weights
IPC supports two transport modes for delivering the handles:
### Ray Mode
Used when vLLM is running as a Ray actor:
```python
from vllm.distributed.weight_transfer.ipc_engine import (
IPCTrainerSendWeightsArgs,
IPCWeightTransferEngine,
)
trainer_args = IPCTrainerSendWeightsArgs(
mode="ray",
llm_handle=llm_actor_handle,
)
IPCWeightTransferEngine.trainer_send_weights(
iterator=model.named_parameters(),
trainer_args=trainer_args,
)
```
In Ray mode, the engine calls `llm_handle.update_weights.remote(...)` directly, passing the IPC handles via Ray's serialization.
### HTTP Mode
Used when vLLM is running as an HTTP server:
```python
trainer_args = IPCTrainerSendWeightsArgs(
mode="http",
url="http://localhost:8000",
)
IPCWeightTransferEngine.trainer_send_weights(
iterator=model.named_parameters(),
trainer_args=trainer_args,
)
```
In HTTP mode, IPC handles are pickled, base64-encoded, and sent as JSON to the `/update_weights` endpoint.
See [`IPCTrainerSendWeightsArgs`](https://github.com/vllm-project/vllm/blob/main/vllm/distributed/weight_transfer/ipc_engine.py) for the full list of configurable fields.
## Examples
- [RLHF with IPC weight syncing (offline, Ray)](../../examples/rl/rlhf_ipc.md) - Colocated training and inference on a single GPU using Ray placement groups and CUDA IPC handles
- [RLHF with IPC weight syncing (online serving, HTTP)](../../examples/rl/rlhf_http_ipc.md) - Weight transfer with a vLLM HTTP server where both server and trainer share the same GPU
# NCCL Engine
The NCCL weight transfer engine uses [NCCL](https://developer.nvidia.com/nccl) broadcast operations to transfer weights from the trainer to inference workers. It supports **multi-node** and **multi-GPU** setups where the trainer and inference engine run on separate GPUs.
## When to Use NCCL
- Training and inference on **separate GPUs** (possibly across nodes)
- **Tensor-parallel** inference with multiple workers that all need the updated weights
- You need high-bandwidth, low-latency weight transfer over NVLink or InfiniBand
## How It Works
1. The trainer and all inference workers join a shared NCCL process group using `StatelessProcessGroup` (vLLM's torch.distributed-independent group abstraction).
2. The trainer broadcasts weights to all workers simultaneously. Each worker receives and loads weights incrementally.
3. Optionally, **packed tensor broadcasting** batches multiple small tensors into larger buffers with double/triple buffering and CUDA stream overlap for higher throughput. This implementation is based on [NeMo-RL's packed tensor](https://github.com/NVIDIA-NeMo/RL/blob/main/nemo_rl/utils/packed_tensor.py).
## Initialization
NCCL requires explicit process group setup. The trainer and inference workers must agree on a master address, port, and world size.
### Inference Side
```python
from vllm.distributed.weight_transfer.base import WeightTransferInitRequest
# rank_offset accounts for the trainer occupying rank 0
llm.init_weight_transfer_engine(
WeightTransferInitRequest(
init_info=dict(
master_address=master_address,
master_port=master_port,
rank_offset=1,
world_size=world_size, # trainer + all inference workers
)
)
)
```
### Trainer Side
```python
from vllm.distributed.weight_transfer.nccl_engine import (
NCCLWeightTransferEngine,
)
group = NCCLWeightTransferEngine.trainer_init(
dict(
master_address=master_address,
master_port=master_port,
world_size=world_size,
)
)
```
!!! note
`trainer_init` always assigns the trainer to rank 0. Inference workers start at `rank_offset` (typically 1).
## Sending Weights
```python
from vllm.distributed.weight_transfer.nccl_engine import (
NCCLTrainerSendWeightsArgs,
NCCLWeightTransferEngine,
)
trainer_args = NCCLTrainerSendWeightsArgs(
group=group,
packed=True, # use packed broadcasting for efficiency
)
NCCLWeightTransferEngine.trainer_send_weights(
iterator=model.named_parameters(),
trainer_args=trainer_args,
)
```
See [`NCCLTrainerSendWeightsArgs`](https://github.com/vllm-project/vllm/blob/main/vllm/distributed/weight_transfer/nccl_engine.py) for the full list of configurable fields.
### Packed Tensor Broadcasting
When `packed=True`, multiple weight tensors are packed into large contiguous buffers before broadcasting. This reduces the number of NCCL operations and uses double/triple buffering with dedicated CUDA streams for overlap between packing, broadcasting, and unpacking.
Both the trainer (`NCCLTrainerSendWeightsArgs`) and inference side (`NCCLWeightTransferUpdateInfo`) must use matching `packed_buffer_size_bytes` and `packed_num_buffers` values.
## Receiving Weights (Inference Side)
The inference side triggers weight reception by calling `update_weights`:
```python
from vllm.distributed.weight_transfer.base import WeightTransferUpdateRequest
llm.update_weights(
WeightTransferUpdateRequest(
update_info=dict(
names=names,
dtype_names=dtype_names,
shapes=shapes,
packed=True,
)
)
)
```
The `names`, `dtype_names`, and `shapes` lists describe each parameter. These must match the order in which the trainer iterates over its parameters.
## Examples
- [RLHF with NCCL weight syncing (offline, Ray)](../../examples/rl/rlhf_nccl.md) - Trainer on one GPU, 2x tensor-parallel vLLM engine on two others, with packed NCCL weight broadcast
- [RLHF with async weight syncing (offline, Ray)](../../examples/rl/rlhf_async_new_apis.md) - Async generation with mid-flight pause, weight sync, resume, and validation against a fresh model
- [RLHF with NCCL weight syncing (online serving, HTTP)](../../examples/rl/rlhf_http_nccl.md) - Weight transfer with a running vLLM HTTP server using HTTP control plane and NCCL data plane
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Demonstrates reinforcement learning from human feedback (RLHF) using vLLM and Ray.
The script separates training and inference workloads onto distinct GPUs
so that Ray can manage process placement and inter-process communication.
A Hugging Face Transformer model occupies GPU 0 for training, whereas a
tensor-parallel vLLM inference engine occupies GPU 1–2.
The example performs the following steps:
* Load the training model on GPU 0.
* Split the inference model across GPUs 1–2 using vLLM's tensor parallelism
and Ray placement groups.
* Generate text from a list of prompts using the inference engine.
* Update the weights of the training model and broadcast the updated weights
to the inference engine by using a Ray collective RPC group. Note that
for demonstration purposes we simply zero out the weights.
For a production-ready implementation that supports multiple training and
inference replicas, see the OpenRLHF framework:
https://github.com/OpenRLHF/OpenRLHF
This example assumes a single-node cluster with three GPUs, but Ray
supports multi-node clusters. vLLM expects the GPUs are only used for vLLM
workloads. Residual GPU activity interferes with vLLM memory profiling and
causes unexpected behavior.
"""
import os
import ray
import torch
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from rlhf_utils import stateless_init_process_group
from transformers import AutoModelForCausalLM
from vllm import LLM, SamplingParams
from vllm.utils.network_utils import get_ip, get_open_port
class MyLLM(LLM):
"""Configure the vLLM worker for Ray placement group execution."""
def __init__(self, *args, **kwargs):
# Remove the top-level CUDA_VISIBLE_DEVICES variable set by Ray
# so that vLLM can manage its own device placement within the worker.
os.environ.pop("CUDA_VISIBLE_DEVICES", None)
super().__init__(*args, **kwargs)
# Load the OPT-125M model onto GPU 0 for the training workload.
train_model = AutoModelForCausalLM.from_pretrained("facebook/opt-125m")
train_model.to("cuda:0")
# Initialize Ray and set the visible devices. The vLLM engine will
# be placed on GPUs 1 and 2.
os.environ["CUDA_VISIBLE_DEVICES"] = "1,2"
ray.init()
# Create a placement group that reserves GPU 1–2 for the vLLM inference engine.
# Learn more about Ray placement groups:
# https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html
pg_inference = placement_group([{"GPU": 1, "CPU": 0}] * 2)
ray.get(pg_inference.ready())
scheduling_inference = PlacementGroupSchedulingStrategy(
placement_group=pg_inference,
placement_group_capture_child_tasks=True,
placement_group_bundle_index=0,
)
# Launch the vLLM inference engine. The `enforce_eager` flag reduces
# start-up latency.
llm = ray.remote(
num_cpus=0,
num_gpus=0,
scheduling_strategy=scheduling_inference,
)(MyLLM).remote(
model="facebook/opt-125m",
enforce_eager=True,
worker_extension_cls="rlhf_utils.WorkerExtension",
tensor_parallel_size=2,
distributed_executor_backend="ray",
)
# Generate text from the prompts.
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
sampling_params = SamplingParams(temperature=0)
outputs = ray.get(llm.generate.remote(prompts, sampling_params))
print("-" * 50)
for output in outputs:
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}")
print("-" * 50)
# Set up the communication channel between the training process and the
# inference engine.
master_address = get_ip()
master_port = get_open_port()
handle = llm.collective_rpc.remote(
"init_weight_update_group", args=(master_address, master_port, 1, 3)
)
model_update_group = stateless_init_process_group(
master_address, master_port, 0, 3, torch.device("cuda:0")
)
ray.get(handle)
# Simulate a training step by zeroing out all model weights.
# In a real RLHF training loop the weights would be updated using the gradient
# from an RL objective such as PPO on a reward model.
for name, p in train_model.named_parameters():
p.data.zero_()
# Synchronize the updated weights to the inference engine.
for name, p in train_model.named_parameters():
dtype_name = str(p.dtype).split(".")[-1]
handle = llm.collective_rpc.remote(
"update_weight", args=(name, dtype_name, p.shape)
)
model_update_group.broadcast(p, src=0, stream=torch.cuda.current_stream())
ray.get(handle)
# Verify that the inference weights have been updated.
assert all(ray.get(llm.collective_rpc.remote("check_weights_changed")))
# Generate text with the updated model. The output is expected to be nonsense
# because the weights are zero.
outputs_updated = ray.get(llm.generate.remote(prompts, sampling_params))
print("-" * 50)
for output in outputs_updated:
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}")
print("-" * 50)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Demonstrates how to co-locate a vLLM inference worker and training
actors on the same set of GPUs for reinforcement learning from human feedback
(RLHF) workloads.
Ray serves as the distributed execution framework in this example. Ray
placement groups allocate both training actors and vLLM workers to the
same GPU bundles, enabling fast, in-GPU communication between the two
components.
The script shows how to do the following:
* Configure environment variables (`VLLM_RAY_PER_WORKER_GPUS` and
`VLLM_RAY_BUNDLE_INDICES`) so that vLLM workers land on the desired
devices.
* Exchange tensors between processes by means of CUDA inter-process
communication (IPC). CUDA IPC sidesteps NCCL limitations that occur
when multiple processes share a single GPU.
Note that this example assumes a single-node cluster with four GPUs, but Ray
supports multi-node clusters. vLLM expects exclusive use of the GPUs during
its initialization for memory profiling. Residual GPU activity interferes
with vLLM memory profiling and causes unexpected behavior.
Learn more about Ray placement groups:
https://docs.ray.io/en/latest/placement-groups.html
"""
import gc
import os
import sys
import ray
import torch
import zmq
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from torch.multiprocessing.reductions import reduce_tensor
from vllm import LLM
if torch.version.hip is not None:
print("Skipping test for ROCm. Ray is unsupported on vLLM ROCm.")
sys.exit(0)
class MyLLM(LLM):
"""Configure the vLLM worker for Ray placement group execution.
The constructor sets environment variables that allow multiple vLLM
workers to share a single physical GPU and that encode the bundle
indices assigned by the placement group.
Args:
*args: Positional arguments forwarded to `vllm.LLM`.
bundle_indices (list[int]): Placement-group bundle indices
assigned to this worker.
**kwargs: Keyword arguments forwarded to `vllm.LLM`.
"""
def __init__(self, *args, bundle_indices: list[int], **kwargs):
# Prevent Ray from manipulating the top-level CUDA_VISIBLE_DEVICES variable
# so that vLLM can its own device placement inside the worker.
os.environ.pop("CUDA_VISIBLE_DEVICES", None)
# Each worker uses 0.4 GPU so that two instances fit on the same GPUs.
os.environ["VLLM_RAY_PER_WORKER_GPUS"] = "0.4"
os.environ["VLLM_RAY_BUNDLE_INDICES"] = ",".join(map(str, bundle_indices))
print(f"creating LLM with bundle_indices={bundle_indices}")
super().__init__(*args, **kwargs)
class RayTrainingActor:
"""Training actor that hosts a Facebook OPT-125M model from Hugging Face.
The model is loaded onto the first GPU assigned to this actor, and expose
the CUDA IPC handles so that colocated vLLM workers can map tensors
directly.
"""
def __init__(self):
# Ray sets CUDA_VISIBLE_DEVICES to the GPUs assigned to this actor.
from transformers import AutoModelForCausalLM
self.model = AutoModelForCausalLM.from_pretrained("facebook/opt-125m")
self.model.to("cuda:0")
# Zero out all the parameters.
for name, p in self.model.named_parameters():
p.data.zero_()
torch.accelerator.synchronize()
# The argument for `get_device_uuid` is the index of the GPU in the
# list of visible devices.
from vllm.platforms import current_platform
self.device_uuid = current_platform.get_device_uuid(0)
self.zmq_context = zmq.Context()
self.zmq_address_counter = 0
self.zmq_handle = None
def report_device_id(self) -> str:
return self.device_uuid
def get_zmq_handles(self) -> dict[str, str]:
suffix = f"{self.device_uuid}-{self.zmq_address_counter}"
self.zmq_handle = f"ipc:///tmp/rl-colocate-zmq-{suffix}.sock"
self.zmq_address_counter += 1
return {self.device_uuid: self.zmq_handle}
def update_weights(self):
# align size to avoid misaligned address
align_size = 256
def get_size(p: torch.Tensor) -> int:
return (p.nbytes + align_size - 1) // align_size * align_size
named_parameters: dict[str, torch.nn.Parameter] = dict(
self.model.named_parameters()
)
max_tensor_size = max(get_size(p) for p in named_parameters.values())
# use max_tensor_size * 2 as buffer size
buffer = torch.empty(max_tensor_size * 2, dtype=torch.uint8, device="cuda:0")
s = self.zmq_context.socket(zmq.REQ)
s.bind(self.zmq_handle)
handle = reduce_tensor(buffer)
offset = 0
buckets: list[tuple[list[dict], list[torch.Tensor]]] = []
named_tensors: list[dict] = []
real_tensors: list[torch.Tensor] = []
for name, p in named_parameters.items():
size = get_size(p)
if offset + size > buffer.numel():
buckets.append((named_tensors, real_tensors))
named_tensors, real_tensors = [], []
offset = 0
# assume tensors are contiguous
named_tensors.append(
{"name": name, "dtype": p.dtype, "shape": p.shape, "offset": offset}
)
real_tensors.append(p)
offset += size
if named_tensors:
buckets.append((named_tensors, real_tensors))
s.send_pyobj(handle)
s.recv()
for named_tensors, real_tensors in buckets:
offset = 0
for p in real_tensors:
buffer[offset : offset + p.nbytes].data.copy_(
p.data.view(-1).view(dtype=torch.uint8), non_blocking=True
)
offset += get_size(p)
torch.accelerator.synchronize()
s.send_pyobj(named_tensors)
s.recv()
s.send_pyobj(None)
s.recv()
s.close()
del buffer
gc.collect()
torch.accelerator.empty_cache()
# Ray manages four GPUs.
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
ray.init()
# Co-locate vLLM instances and training actors on the same set of GPUs:
# * GPU 0 and 1: training actor 0, training actor 1, and vLLM instance 0
# (tensor parallelism = 2).
# * GPU 2 and 3: training actor 2, training actor 3, and vLLM instance 1
# (tensor parallelism = 2).
pg = placement_group([{"GPU": 1, "CPU": 0}] * 4)
ray.get(pg.ready())
print(f"placement group has bundles {pg.bundle_specs=}")
training_actors = []
training_actor_device_ids = []
inference_engines = []
inference_engine_device_ids = []
for bundle_index in [0, 1, 2, 3]:
training_actor = ray.remote(
num_cpus=0,
num_gpus=0.4,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_capture_child_tasks=True,
placement_group_bundle_index=bundle_index,
),
)(RayTrainingActor).remote()
training_actors.append(training_actor)
for bundle_index, training_actor in enumerate(training_actors):
device_id = ray.get(training_actor.report_device_id.remote())
print(f"training actor {bundle_index} is on {device_id}")
training_actor_device_ids.append(device_id)
for i, bundle_indices in enumerate([[0, 1], [2, 3]]):
# Use the following syntax instead of the @ray.remote decorator so that
# the placement group is customized for each bundle.
llm = ray.remote(
num_cpus=0,
num_gpus=0,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_capture_child_tasks=True,
),
)(MyLLM).remote(
model="facebook/opt-125m",
enforce_eager=True,
worker_extension_cls="rlhf_utils.ColocateWorkerExtension",
tensor_parallel_size=2,
distributed_executor_backend="ray",
gpu_memory_utilization=0.4,
bundle_indices=bundle_indices,
)
inference_engines.append(llm)
# Do not call any method on the inference engine at this point; the call
# blocks until the vLLM instance finishes initialization.
for i, llm in enumerate(inference_engines):
inference_engine_device_ids.append(
ray.get(llm.collective_rpc.remote("report_device_id", args=tuple()))
)
print(f"inference engine {i} is on {inference_engine_device_ids[-1]}")
# Verify placement: the first two training actors share the same GPUs as
# the first inference engine.
assert training_actor_device_ids[:2] == inference_engine_device_ids[0]
# Verify placement: the last two training actors share the same GPUs as
# the second inference engine.
assert training_actor_device_ids[2:] == inference_engine_device_ids[1]
print("Gather all the ZMQ handles from the training actors.")
zmq_handles = {}
for actor in training_actors:
zmq_handles.update(ray.get(actor.get_zmq_handles.remote()))
print(f"ZMQ handles: {zmq_handles}")
print("Update the weights of the inference engines.")
ray.get(
[actor.update_weights.remote() for actor in training_actors]
+ [
llm.collective_rpc.remote("update_weights_from_ipc", args=(zmq_handles,))
for llm in inference_engines
]
)
print("Check if the weights are updated.")
for llm in inference_engines:
assert ray.get(llm.collective_rpc.remote("check_weights_changed", args=tuple()))
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
Demonstrates reinforcement learning from human feedback (RLHF) using vLLM and Ray.
The script separates training and inference workloads onto distinct GPUs
so that Ray can manage process placement and inter-process communication.
A Hugging Face Transformer model occupies GPU 0 for training, whereas a
tensor-parallel vLLM inference engine occupies GPU 1–2.
The example performs the following steps:
* Load the training model on GPU 0.
* Split the inference model across GPUs 1–2 using vLLM's tensor parallelism
and Ray placement groups.
* Generate text from a list of prompts using the inference engine.
* Update the weights of the training model and broadcast the updated weights
to the inference engine by using a Ray collective RPC group. Note that
for demonstration purposes we simply zero out the weights.
For a production-ready implementation that supports multiple training and
inference replicas, see the OpenRLHF framework:
https://github.com/OpenRLHF/OpenRLHF
This example assumes a single-node cluster with three GPUs, but Ray
supports multi-node clusters. vLLM expects the GPUs are only used for vLLM
workloads. Residual GPU activity interferes with vLLM memory profiling and
causes unexpected behavior.
"""
import json
import os
import ray
import torch
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from rlhf_utils import stateless_init_process_group
from torchao.core.config import config_to_dict
from torchao.quantization import (
Float8DynamicActivationFloat8WeightConfig,
PerRow,
)
from transformers import AutoModelForCausalLM
from vllm import LLM, SamplingParams
from vllm.utils.network_utils import get_ip, get_open_port
class MyLLM(LLM):
"""Configure the vLLM worker for Ray placement group execution."""
def __init__(self, *args, **kwargs):
# Remove the top-level CUDA_VISIBLE_DEVICES variable set by Ray
# so that vLLM can manage its own device placement within the worker.
os.environ.pop("CUDA_VISIBLE_DEVICES", None)
super().__init__(*args, **kwargs)
# Load the OPT-125M model onto GPU 0 for the training workload.
train_model = AutoModelForCausalLM.from_pretrained("facebook/opt-125m")
train_model.to("cuda:0")
# Initialize Ray and set the visible devices. The vLLM engine will
# be placed on GPUs 1 and 2.
os.environ["CUDA_VISIBLE_DEVICES"] = "1,2"
ray.init()
# Create a placement group that reserves GPU 1–2 for the vLLM inference engine.
# Learn more about Ray placement groups:
# https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html
pg_inference = placement_group([{"GPU": 1, "CPU": 0}] * 2)
ray.get(pg_inference.ready())
scheduling_inference = PlacementGroupSchedulingStrategy(
placement_group=pg_inference,
placement_group_capture_child_tasks=True,
placement_group_bundle_index=0,
)
# Launch the vLLM inference engine. The `enforce_eager` flag reduces
# start-up latency.
# generate torchao quantization config for RL rollout
# see https://github.com/vllm-project/vllm/pull/23014 for instructions to
# use serialized config files instead of passing around json string
config = Float8DynamicActivationFloat8WeightConfig(granularity=PerRow())
json_str = json.dumps(config_to_dict(config))
llm = ray.remote(
num_cpus=0,
num_gpus=0,
scheduling_strategy=scheduling_inference,
)(MyLLM).remote(
model="facebook/opt-125m",
hf_overrides={"quantization_config_dict_json": json_str},
enforce_eager=True,
worker_extension_cls="rlhf_utils.WorkerExtension",
tensor_parallel_size=2,
distributed_executor_backend="ray",
)
# Generate text from the prompts.
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
sampling_params = SamplingParams(temperature=0)
outputs = ray.get(llm.generate.remote(prompts, sampling_params))
print("-" * 50)
for output in outputs:
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}")
print("-" * 50)
# Set up the communication channel between the training process and the
# inference engine.
master_address = get_ip()
master_port = get_open_port()
handle = llm.collective_rpc.remote(
"init_weight_update_group", args=(master_address, master_port, 1, 3)
)
model_update_group = stateless_init_process_group(
master_address, master_port, 0, 3, torch.device("cuda:0")
)
ray.get(handle)
# Simulate a training step by zeroing out all model weights.
# In a real RLHF training loop the weights would be updated using the gradient
# from an RL objective such as PPO on a reward model.
for name, p in train_model.named_parameters():
p.data.zero_()
# Synchronize the updated weights to the inference engine.
for name, p in train_model.named_parameters():
dtype_name = str(p.dtype).split(".")[-1]
handle = llm.collective_rpc.remote(
"update_weight", args=(name, dtype_name, p.shape)
)
model_update_group.broadcast(p, src=0, stream=torch.cuda.current_stream())
ray.get(handle)
# Verify that the inference weights have been updated.
assert all(ray.get(llm.collective_rpc.remote("check_weights_changed")))
# Generate text with the updated model. The output is expected to be nonsense
# because the weights are zero.
outputs_updated = ray.get(llm.generate.remote(prompts, sampling_params))
print("-" * 50)
for output in outputs_updated:
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}")
print("-" * 50)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import gc
from collections.abc import Callable
from typing import TypedDict
import torch
import zmq
def stateless_init_process_group(master_address, master_port, rank, world_size, device):
"""
vLLM provides `StatelessProcessGroup` to create a process group
without considering the global process group in torch.distributed.
It is recommended to create `StatelessProcessGroup`, and then initialize
the data-plane communication (NCCL) between external (train processes)
and vLLM workers.
"""
from vllm.distributed.device_communicators.pynccl import PyNcclCommunicator
from vllm.distributed.utils import StatelessProcessGroup
pg = StatelessProcessGroup.create(
host=master_address, port=master_port, rank=rank, world_size=world_size
)
pynccl = PyNcclCommunicator(pg, device=device)
return pynccl
class WorkerExtension:
"""
The class for vLLM's worker to inherit from.
By defining an extension class, the code can work no matter what is
the underlying worker class.
NOTE: we define this class in a separate module, and the main module
should pass the full qualified name as `worker_extension_cls` argument.
"""
def init_weight_update_group(
self, master_address, master_port, rank_offset, world_size
):
from vllm.distributed.parallel_state import get_world_group
rank = get_world_group().rank + rank_offset
self.model_update_group = stateless_init_process_group(
master_address,
master_port,
rank,
world_size,
self.device,
)
def update_weight(self, name, dtype_name, shape):
dtype = getattr(torch, dtype_name)
weight = torch.empty(shape, dtype=dtype, device="cuda")
self.model_update_group.broadcast(
weight, src=0, stream=torch.cuda.current_stream()
)
self.model_runner.model.load_weights(weights=[(name, weight)])
del weight
def check_weights_changed(self):
"""
Check if the weights are updated to 0.
"""
weights_updated = True
for name, p in self.model_runner.model.named_parameters():
weights_updated = weights_updated and torch.allclose(p, torch.zeros_like(p))
return weights_updated
def rebuild_ipc(
handle: tuple[Callable, tuple], device_id: int | None = None
) -> torch.Tensor:
func, args = handle
list_args = list(args)
if device_id is not None:
# the key is to change device id to the current device id
# in case two processes have different CUDA_VISIBLE_DEVICES
list_args[6] = device_id
buffer = func(*list_args)
return buffer
class FlattenedTensorMetadata(TypedDict):
name: str
shape: torch.Size
dtype: torch.dtype
# specify the start offset of this tensor in shared ipc_buffer tensor
offset: int
class ColocateWorkerExtension:
"""
The class for vLLM's worker to inherit from, in the colocate setting.
By defining an extension class, the code can work no matter what is
the underlying worker class.
NOTE: we define this class in a separate module, and the main module
should pass the full qualified name as `worker_extension_cls` argument.
"""
def update_weights_from_ipc(self, zmq_handles: dict[str, str]):
from vllm.model_executor.model_loader.utils import process_weights_after_loading
assert self.device is not None
if not hasattr(self, "_zmq_ctx") or self._zmq_ctx is None:
self._zmq_ctx = zmq.Context()
socket = self._zmq_ctx.socket(zmq.REP)
socket.connect(zmq_handles[self.report_device_id()])
buffer: torch.Tensor | None = None
while True:
payload: tuple[Callable, tuple] | list[FlattenedTensorMetadata] | None = (
socket.recv_pyobj()
)
if payload is None:
# means the update is done
process_weights_after_loading(
self.model_runner.model, self.model_config, self.device
)
torch.accelerator.synchronize()
socket.send(b"")
break
if isinstance(payload, tuple):
# an ipc handle that vLLM can use `func, args = handle`
# and `func(*args)` to rebuild GPU tensor.
buffer = rebuild_ipc(payload, self.device.index)
assert buffer.dtype == torch.uint8
socket.send(b"")
continue
assert isinstance(payload, list)
assert buffer is not None
weights = []
for item in payload:
shape = item["shape"]
if isinstance(shape, (list, tuple)):
shape = torch.Size(shape)
assert isinstance(shape, torch.Size)
dtype, offset = item["dtype"], item["offset"]
size = dtype.itemsize * shape.numel()
tensor = buffer[offset : offset + size].view(dtype=dtype).view(shape)
weights.append((item["name"], tensor))
self.model_runner.model.load_weights(weights=weights)
del weights
torch.accelerator.synchronize()
socket.send(b"")
socket.close()
del buffer
gc.collect()
torch.accelerator.empty_cache()
def report_device_id(self) -> str:
from vllm.platforms import current_platform
self.device_uuid = current_platform.get_device_uuid(self.device.index)
return self.device_uuid
def check_weights_changed(self):
"""
Check if the weights are updated to 0.
"""
weights_updated = True
for name, p in self.model_runner.model.named_parameters():
weights_updated = weights_updated and torch.allclose(p, torch.zeros_like(p))
return weights_updated
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment