Commit 99324e25 authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge tag 'v0.9.2' into v0.9.2-ori

parents cc7f22a8 a5dd03c1
...@@ -2,15 +2,16 @@ ...@@ -2,15 +2,16 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import contextlib import contextlib
import os import os
import weakref
from contextlib import ExitStack
import pytest import pytest
from tests.utils import wait_for_gpu_memory_to_clear
from vllm import LLM, SamplingParams from vllm import LLM, SamplingParams
from vllm.config import CompilationConfig from vllm.config import CompilationConfig
from vllm.platforms import current_platform from vllm.platforms import current_platform
MODEL = "Qwen/Qwen2-1.5B-Instruct"
@contextlib.contextmanager @contextlib.contextmanager
def temporary_environ(env_vars): def temporary_environ(env_vars):
...@@ -31,71 +32,127 @@ def temporary_environ(env_vars): ...@@ -31,71 +32,127 @@ def temporary_environ(env_vars):
os.environ[k] = v os.environ[k] = v
@pytest.fixture(scope="module") @pytest.fixture(scope="class")
def full_cudagraph_llm(): def llm_pair(request):
with temporary_environ({ model = request.param
"VLLM_USE_V1": "1",
"VLLM_FLASH_ATTN_VERSION": "3"
}):
return LLM(model=MODEL,
gpu_memory_utilization=0.3,
compilation_config=CompilationConfig(full_cuda_graph=True))
@pytest.fixture(scope="module")
def piecewise_llm():
with temporary_environ({ with temporary_environ({
"VLLM_USE_V1": "1", "VLLM_USE_V1": "1",
"VLLM_FLASH_ATTN_VERSION": "3" "VLLM_FLASH_ATTN_VERSION": "3"
}): }):
return LLM(model=MODEL, full = LLM(
gpu_memory_utilization=0.6, model=model,
compilation_config=CompilationConfig()) gpu_memory_utilization=0.45,
trust_remote_code=True,
max_model_len=1024,
def generate_text(llm: LLM, batch_size: int, max_tokens: int): compilation_config=CompilationConfig(full_cuda_graph=True),
prompts = ["Hi my name is"] * batch_size )
sampling_params = SamplingParams(temperature=0.0, piecewise = LLM(
max_tokens=max_tokens, model=model,
top_p=0.95) gpu_memory_utilization=0.45,
trust_remote_code=True,
return llm.generate(prompts, sampling_params) max_model_len=1024,
compilation_config=CompilationConfig(),
)
# PyTest caches the fixture values so we use weakref.proxy to enable GC
yield weakref.proxy(full), weakref.proxy(piecewise)
del full
del piecewise
wait_for_gpu_memory_to_clear(
devices=[0],
threshold_ratio=0.1,
)
@pytest.mark.parametrize(
"llm_pair",
[
# Model names for the llm_pair fixture
"deepseek-ai/DeepSeek-V2-Lite",
"Qwen/Qwen2-1.5B-Instruct"
],
indirect=True)
@pytest.mark.skipif(current_platform.get_device_capability() != (9, 0), @pytest.mark.skipif(current_platform.get_device_capability() != (9, 0),
reason="Only Hopper GPUs support FlashAttention 3") reason="Only Hopper GPUs support FA3 and FlashMLA")
@pytest.mark.parametrize(("batch_size", "max_tokens"), [(1, 10), (7, 10), class TestFullCUDAGraph:
(16, 10), (25, 10),
(32, 10), (45, 10),
(64, 10), (8, 5),
(8, 20), (8, 200)])
def test_full_cudagraph(batch_size, max_tokens, full_cudagraph_llm,
piecewise_llm):
""" """
Load full cudagraph model and piecewise model once, and at the same time to Use a class such that an llm pair is constructed once for all
reuse them across various test cases. batch_size/max_tokens combinations and released immediately after.
Test various batch sizes and max_tokens to ensure that the full cudagraph Module-scope fixtures would stick around the whole time,
compilation works for padded cases too. meaning there would be multiple LLM instances hogging memory simultaneously.
""" """
piecewise_responses = generate_text(piecewise_llm,
batch_size=batch_size,
max_tokens=max_tokens)
full_cudagraph_responses = generate_text(full_cudagraph_llm,
batch_size=batch_size,
max_tokens=max_tokens)
# Check that all responses are the same @pytest.mark.parametrize(("batch_size", "max_tokens"), [
for i in range(len(piecewise_responses)): (1, 10),
assert piecewise_responses[i].outputs[ (7, 10),
0].text == full_cudagraph_responses[i].outputs[0].text (16, 10),
(25, 10),
(32, 10),
(45, 10),
(64, 10),
(123, 10),
(8, 5),
(8, 30),
])
def test_full_cudagraph(self, batch_size, max_tokens,
                        llm_pair: tuple[LLM, LLM]):
    """
    Test various batch sizes and max_tokens to ensure that the
    full cudagraph compilation works for padded cases too.

    Args:
        batch_size: number of identical prompts sent in one generate call.
        max_tokens: generation length passed to SamplingParams.
        llm_pair: the (full cudagraph LLM, piecewise LLM) pair produced by
            the llm_pair fixture.
    """
    # The llm_pair fixture yields (full, piecewise) in that order, so the
    # unpacking must follow the same order for the names to be truthful.
    full_cudagraph_llm, piecewise_llm = llm_pair

    prompts = ["Hello, my name is"] * batch_size
    sampling_params = SamplingParams(temperature=0.0,
                                     max_tokens=max_tokens,
                                     top_p=0.95)

    piecewise_responses = piecewise_llm.generate(prompts, sampling_params)
    full_responses = full_cudagraph_llm.generate(prompts, sampling_params)

    # zip() stops at the shorter sequence, so make sure no response is
    # silently left out of the comparison below.
    assert len(piecewise_responses) == len(full_responses)

    # Check that all responses are the same
    for piecewise_res, full_res in zip(piecewise_responses,
                                       full_responses):
        assert piecewise_res.outputs[0].text == full_res.outputs[0].text
@pytest.mark.parametrize(
    "model, supported",
    [
        ("Qwen/Qwen2-1.5B-Instruct", True),
        # MLA does not support capturing CUDA Graphs with size > max_num_seqs
        ("deepseek-ai/DeepSeek-V2-Lite", False),
    ])
@pytest.mark.skipif(current_platform.get_device_capability() != (9, 0),
                    reason="Only Hopper GPUs support FA3 and FlashMLA")
def test_lower_max_num_seqs(model, supported):
    """
    Build a full-cudagraph LLM with max_num_seqs=256 and capture sizes
    [64, 256, 512] (one size is larger than max_num_seqs), then generate.

    When `supported` is False, a RuntimeError is expected somewhere
    during construction or generation.
    """
    env_overrides = {"VLLM_USE_V1": "1", "VLLM_FLASH_ATTN_VERSION": "3"}

    with temporary_environ(env_overrides), ExitStack() as expectations:
        # Only unsupported models are expected to fail; the ExitStack lets
        # the pytest.raises context be entered conditionally.
        if not supported:
            expectations.enter_context(pytest.raises(RuntimeError))

        graph_config = CompilationConfig(
            full_cuda_graph=True,
            cudagraph_capture_sizes=[64, 256, 512],
        )
        engine = LLM(
            model=model,
            max_num_seqs=256,
            trust_remote_code=True,
            max_model_len=1024,
            compilation_config=graph_config,
        )
        engine.generate(["Hello, my name is"] * 10)
@pytest.mark.skipif(not current_platform.is_cuda(), reason="Skip if not cuda")
def test_full_cudagraph_with_invalid_backend(): def test_full_cudagraph_with_invalid_backend():
with temporary_environ({ with temporary_environ({
"VLLM_USE_V1": "1", "VLLM_USE_V1": "1",
"VLLM_FLASH_ATTN_VERSION": "VLLM_FLASH_ATTN_VERSION":
"2" #FA2 not supported with full_cuda_graph "2" #FA2 not supported with full_cuda_graph
}), pytest.raises(RuntimeError): }), pytest.raises(RuntimeError):
LLM(model=MODEL, LLM(model="Qwen/Qwen2-1.5B-Instruct",
compilation_config=CompilationConfig(full_cuda_graph=True)) compilation_config=CompilationConfig(full_cuda_graph=True))
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
Test the piecewise compilation with a simple model so that we Test the piecewise compilation with a simple model so that we
can exactly calculate the expected output and side effects. can exactly calculate the expected output and side effects.
""" """
import pytest
import torch import torch
from torch import nn from torch import nn
from torch.library import Library from torch.library import Library
...@@ -14,6 +14,7 @@ from vllm.compilation.decorators import support_torch_compile ...@@ -14,6 +14,7 @@ from vllm.compilation.decorators import support_torch_compile
from vllm.config import (CompilationConfig, CompilationLevel, VllmConfig, from vllm.config import (CompilationConfig, CompilationLevel, VllmConfig,
set_current_vllm_config) set_current_vllm_config)
from vllm.envs import VLLM_USE_V1 from vllm.envs import VLLM_USE_V1
from vllm.forward_context import set_forward_context
from vllm.utils import direct_register_custom_op from vllm.utils import direct_register_custom_op
global_counter = 0 global_counter = 0
...@@ -76,7 +77,8 @@ class SillyModel(nn.Module): ...@@ -76,7 +77,8 @@ class SillyModel(nn.Module):
return x return x
def _test_simple_piecewise_compile(*, use_inductor): @pytest.mark.parametrize("use_inductor", [True, False])
def test_simple_piecewise_compile(use_inductor):
assert VLLM_USE_V1 assert VLLM_USE_V1
vllm_config = VllmConfig(compilation_config=CompilationConfig( vllm_config = VllmConfig(compilation_config=CompilationConfig(
...@@ -99,7 +101,7 @@ def _test_simple_piecewise_compile(*, use_inductor): ...@@ -99,7 +101,7 @@ def _test_simple_piecewise_compile(*, use_inductor):
num_backend_compilations=3, # num_piecewise_capturable_graphs_seen num_backend_compilations=3, # num_piecewise_capturable_graphs_seen
num_cudagraph_captured= num_cudagraph_captured=
6, # num_cudagraph_sizes * num_piecewise_capturable_graphs_seen 6, # num_cudagraph_sizes * num_piecewise_capturable_graphs_seen
): ), set_forward_context({}, vllm_config=vllm_config):
model(inputs) model(inputs)
...@@ -112,11 +114,3 @@ def _test_simple_piecewise_compile(*, use_inductor): ...@@ -112,11 +114,3 @@ def _test_simple_piecewise_compile(*, use_inductor):
output = model(input) output = model(input)
assert global_counter == 2 assert global_counter == 2
assert torch.allclose(output.cpu(), torch.tensor([3., 1.])) assert torch.allclose(output.cpu(), torch.tensor([3., 1.]))
def test_simple_piecewise_compile_inductor():
_test_simple_piecewise_compile(use_inductor=True)
def test_simple_piecewise_compile_no_inductor():
_test_simple_piecewise_compile(use_inductor=False)
...@@ -11,6 +11,7 @@ initialized randomly with a fixed seed. ...@@ -11,6 +11,7 @@ initialized randomly with a fixed seed.
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Optional from typing import Any, Optional
import pytest
import torch import torch
from torch import nn from torch import nn
from torch.library import Library from torch.library import Library
...@@ -19,6 +20,7 @@ from vllm.compilation.counter import compilation_counter ...@@ -19,6 +20,7 @@ from vllm.compilation.counter import compilation_counter
from vllm.compilation.decorators import support_torch_compile from vllm.compilation.decorators import support_torch_compile
from vllm.config import (CompilationConfig, CompilationLevel, VllmConfig, from vllm.config import (CompilationConfig, CompilationLevel, VllmConfig,
set_current_vllm_config) set_current_vllm_config)
from vllm.forward_context import set_forward_context
from vllm.utils import direct_register_custom_op from vllm.utils import direct_register_custom_op
# create a library to hold the custom op # create a library to hold the custom op
...@@ -285,29 +287,32 @@ def run_model(llama_config, ...@@ -285,29 +287,32 @@ def run_model(llama_config,
vllm_config=vllm_config, vllm_config=vllm_config,
prefix="").eval().cuda() prefix="").eval().cuda()
B = 16 # max batch size with set_forward_context({}, vllm_config=vllm_config):
input_ids = torch.randint(0, llama_config.vocab_size, (B, )).cuda() B = 16 # max batch size
positions = torch.arange(B).cuda() input_ids = torch.randint(0, llama_config.vocab_size, (B, )).cuda()
positions = torch.arange(B).cuda()
model(input_ids, positions) model(input_ids, positions)
model(input_ids[:2], positions[:2]) model(input_ids[:2], positions[:2])
model(input_ids[:1], positions[:1]) model(input_ids[:1], positions[:1])
input_ids[:2].zero_() input_ids[:2].zero_()
output = model(input_ids[:2], positions[:2]) output = model(input_ids[:2], positions[:2])
output = output.cpu() output = output.cpu()
if llama_config.tractable_init: if llama_config.tractable_init:
expected_output = tractable_computation(input_ids[:2], positions[:2], expected_output = tractable_computation(input_ids[:2],
llama_config).cpu() positions[:2],
llama_config).cpu()
assert torch.allclose(output, expected_output) assert torch.allclose(output, expected_output)
else: else:
return output.cpu() return output.cpu()
def _test_toy_llama(*, use_inductor): @pytest.mark.parametrize("use_inductor", [True, False])
def test_toy_llama(use_inductor: bool):
# compare output with and without piecewise compilation # compare output with and without piecewise compilation
llama_config = LlamaConfig(hidden_size=128, llama_config = LlamaConfig(hidden_size=128,
...@@ -379,14 +384,6 @@ def _test_toy_llama(*, use_inductor): ...@@ -379,14 +384,6 @@ def _test_toy_llama(*, use_inductor):
assert torch.allclose(outputs[0], outputs[i]) assert torch.allclose(outputs[0], outputs[i])
def test_toy_llama_inductor():
_test_toy_llama(use_inductor=True)
def test_toy_no_inductor():
_test_toy_llama(use_inductor=False)
@torch.inference_mode @torch.inference_mode
def benchmark(): def benchmark():
from triton.testing import do_bench from triton.testing import do_bench
......
...@@ -169,8 +169,7 @@ def async_tp_pass_on_test_model(local_rank: int, world_size: int, ...@@ -169,8 +169,7 @@ def async_tp_pass_on_test_model(local_rank: int, world_size: int,
# In pre-nodes, all gather or reduce scatter should exist, # In pre-nodes, all gather or reduce scatter should exist,
# fused_matmul_reduce_scatter or fused_all_gather_matmul should not # fused_matmul_reduce_scatter or fused_all_gather_matmul should not
backend.check_before_ops(model.ops_in_model_before(), backend.check_before_ops(model.ops_in_model_before(), fully_replaced=False)
ops_fully_replaced=False)
# In post-nodes, fused_matmul_reduce_scatter or \ # In post-nodes, fused_matmul_reduce_scatter or \
# fused_all_gather_matmul should exist # fused_all_gather_matmul should exist
...@@ -223,7 +222,7 @@ def test_async_tp_pass_correctness( ...@@ -223,7 +222,7 @@ def test_async_tp_pass_correctness(
"VLLM_USE_V1": "1", "VLLM_USE_V1": "1",
} }
aysnc_tp_args = [ async_tp_args = [
*common_args, *common_args,
"--tensor-parallel-size", "--tensor-parallel-size",
str(tp_size), str(tp_size),
...@@ -242,7 +241,7 @@ def test_async_tp_pass_correctness( ...@@ -242,7 +241,7 @@ def test_async_tp_pass_correctness(
] ]
compare_two_settings(model_id, compare_two_settings(model_id,
aysnc_tp_args, async_tp_args,
tp_args, tp_args,
async_tp_env, async_tp_env,
tp_env, tp_env,
......
...@@ -31,7 +31,7 @@ class TestSetting: ...@@ -31,7 +31,7 @@ class TestSetting:
# basic llama model # basic llama model
TestSetting( TestSetting(
model="meta-llama/Llama-3.2-1B-Instruct", model="meta-llama/Llama-3.2-1B-Instruct",
model_args=[], model_args=["--max-model-len", "2048"],
pp_size=2, pp_size=2,
tp_size=2, tp_size=2,
attn_backend="FLASHINFER", attn_backend="FLASHINFER",
...@@ -41,7 +41,7 @@ class TestSetting: ...@@ -41,7 +41,7 @@ class TestSetting:
# llama model with quantization # llama model with quantization
TestSetting( TestSetting(
model="TheBloke/TinyLlama-1.1B-Chat-v0.3-GPTQ", model="TheBloke/TinyLlama-1.1B-Chat-v0.3-GPTQ",
model_args=["--quantization", "gptq"], model_args=["--quantization", "gptq", "--max-model-len", "2048"],
pp_size=1, pp_size=1,
tp_size=1, tp_size=1,
attn_backend="FLASH_ATTN", attn_backend="FLASH_ATTN",
...@@ -51,7 +51,7 @@ class TestSetting: ...@@ -51,7 +51,7 @@ class TestSetting:
# MoE model # MoE model
TestSetting( TestSetting(
model="ibm/PowerMoE-3b", model="ibm/PowerMoE-3b",
model_args=[], model_args=["--max-model-len", "2048"],
pp_size=1, pp_size=1,
tp_size=2, tp_size=2,
attn_backend="FLASH_ATTN", attn_backend="FLASH_ATTN",
...@@ -61,23 +61,27 @@ class TestSetting: ...@@ -61,23 +61,27 @@ class TestSetting:
# embedding model # embedding model
TestSetting( TestSetting(
model="BAAI/bge-multilingual-gemma2", model="BAAI/bge-multilingual-gemma2",
model_args=["--task", "embed", "--dtype", "bfloat16"], model_args=[
"--task", "embed", "--dtype", "bfloat16", "--max-model-len",
"2048"
],
pp_size=1, pp_size=1,
tp_size=1, tp_size=1,
attn_backend="FLASH_ATTN", attn_backend="FLASH_ATTN",
method="encode", method="encode",
fullgraph=True, fullgraph=True,
), ),
# encoder-based embedding model (BERT) # TODO: bert models are not supported in V1 yet
TestSetting( # # encoder-based embedding model (BERT)
model="BAAI/bge-base-en-v1.5", # TestSetting(
model_args=["--task", "embed"], # model="BAAI/bge-base-en-v1.5",
pp_size=1, # model_args=["--task", "embed"],
tp_size=1, # pp_size=1,
attn_backend="XFORMERS", # tp_size=1,
method="encode", # attn_backend="XFORMERS",
fullgraph=True, # method="encode",
), # fullgraph=True,
# ),
# vision language model # vision language model
TestSetting( TestSetting(
model="microsoft/Phi-3.5-vision-instruct", model="microsoft/Phi-3.5-vision-instruct",
......
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest import pytest
import torch
import vllm import vllm
from vllm.compilation.counter import compilation_counter from vllm.compilation.counter import compilation_counter
from vllm.config import (CompilationConfig, CompilationLevel, VllmConfig, from vllm.config import VllmConfig
set_current_vllm_config) from vllm.utils import _is_torch_equal_or_newer
from .piecewise.test_simple import SillyModel
def test_version():
    """Check `_is_torch_equal_or_newer` against the '2.8.0.dev' target."""
    target = '2.8.0.dev'

    # Nightly, source-build, release and patch-release version strings must
    # all compare as equal-or-newer than the dev target.
    at_least_target = (
        '2.8.0.dev20250624+cu128',
        '2.8.0a0+gitc82a174',
        '2.8.0',
        '2.8.1',
    )
    for torch_version in at_least_target:
        assert _is_torch_equal_or_newer(torch_version, target)

    # An older release must compare as not newer.
    assert not _is_torch_equal_or_newer('2.7.1', target)
def test_use_cudagraphs_dynamic(monkeypatch):
    """
    A default VllmConfig enables use_cudagraph while VLLM_USE_V1 is set,
    and disables it once VLLM_USE_V1 is overridden to '0'.
    """
    assert vllm.envs.VLLM_USE_V1
    v1_compilation = VllmConfig().compilation_config
    assert v1_compilation.use_cudagraph

    # Flip the env var and build a fresh config to observe the new default.
    monkeypatch.setenv('VLLM_USE_V1', '0')
    v0_compilation = VllmConfig().compilation_config
    assert not v0_compilation.use_cudagraph
@pytest.mark.parametrize("enabled", [True, False]) @pytest.mark.parametrize("enabled", [True, False])
def test_use_cudagraphs(enabled): def test_use_cudagraphs(vllm_runner, monkeypatch, enabled):
assert vllm.envs.VLLM_USE_V1 assert vllm.envs.VLLM_USE_V1
vllm_config = VllmConfig(compilation_config=CompilationConfig(
level=CompilationLevel.PIECEWISE, # Disable multiprocessing so that the counter is in the same process
use_cudagraph=enabled, monkeypatch.setenv('VLLM_ENABLE_V1_MULTIPROCESSING', '0')
cudagraph_capture_sizes=[100],
)) compilation_config = {
with set_current_vllm_config(vllm_config): "cudagraph_capture_sizes": [100],
model = SillyModel(vllm_config=vllm_config, prefix='') "use_cudagraph": enabled,
}
inputs = torch.randn(100, device="cuda") with (
compilation_counter.expect(
with compilation_counter.expect( num_graphs_seen=1,
num_graphs_seen=1, # one graph for the model num_gpu_runner_capture_triggers=1 if enabled else 0,
num_cudagraph_captured=1 if enabled else 0, num_cudagraph_captured=13 if enabled else 0,
): ),
# first run is warmup # loading the model causes compilation (if enabled) to happen
model(inputs) vllm_runner('facebook/opt-125m',
# second run does CUDAGraphs recording (if enabled) compilation_config=compilation_config,
model(inputs) gpu_memory_utilization=0.4) as _):
pass
...@@ -7,8 +7,7 @@ import torch ...@@ -7,8 +7,7 @@ import torch
import vllm.envs as envs import vllm.envs as envs
import vllm.plugins import vllm.plugins
from vllm.compilation.fusion import (FUSED_OPS, QUANT_OPS, FusedRMSQuantKey, from vllm.compilation.fusion import (FUSED_OPS, QUANT_OPS, FusedRMSQuantKey,
FusionPass, QuantKey) FusionPass, GroupShape, QuantKey)
from vllm.compilation.fx_utils import find_auto_fn, find_auto_fn_maybe
from vllm.compilation.noop_elimination import NoOpEliminationPass from vllm.compilation.noop_elimination import NoOpEliminationPass
from vllm.config import (CompilationConfig, CompilationLevel, PassConfig, from vllm.config import (CompilationConfig, CompilationLevel, PassConfig,
VllmConfig) VllmConfig)
...@@ -30,9 +29,10 @@ class TestModel(torch.nn.Module): ...@@ -30,9 +29,10 @@ class TestModel(torch.nn.Module):
self.cutlass_fp8_enabled = cutlass_fp8_enabled self.cutlass_fp8_enabled = cutlass_fp8_enabled
self.norm = [RMSNorm(hidden_size, eps) for _ in range(3)] self.norm = [RMSNorm(hidden_size, eps) for _ in range(3)]
self.wscale = [torch.rand(1, dtype=torch.float32) for _ in range(2)] self.wscale = [torch.rand(1, dtype=torch.float32) for _ in range(2)]
group_shape = GroupShape.PER_TENSOR if static else GroupShape.PER_TOKEN
self.key = QuantKey(dtype=FP8_DTYPE, self.key = QuantKey(dtype=FP8_DTYPE,
static=static, static=static,
per_tensor=static, group_shape=group_shape,
symmetric=True) symmetric=True)
if static: if static:
self.scale = [torch.rand(1, dtype=torch.float32) for _ in range(2)] self.scale = [torch.rand(1, dtype=torch.float32) for _ in range(2)]
...@@ -122,9 +122,7 @@ def test_fusion_rmsnorm_quant(dtype, hidden_size, num_tokens, eps, static, ...@@ -122,9 +122,7 @@ def test_fusion_rmsnorm_quant(dtype, hidden_size, num_tokens, eps, static,
torch.testing.assert_close(result, result2, atol=ATOL, rtol=RTOL) torch.testing.assert_close(result, result2, atol=ATOL, rtol=RTOL)
# In pre-nodes, fp8 quant should be there and fused kernels should not # In pre-nodes, fp8 quant should be there and fused kernels should not
backend.check_before_ops(model.ops_in_model_before(), find_auto_fn, backend.check_before_ops(model.ops_in_model_before())
find_auto_fn_maybe)
# In post-nodes, fused kernels should be there and fp8 quant should not # In post-nodes, fused kernels should be there and fp8 quant should not
backend.check_after_ops(model.ops_in_model_after(), find_auto_fn, backend.check_after_ops(model.ops_in_model_after())
find_auto_fn_maybe)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from typing import Optional
import pytest
import torch._dynamo
from tests.compile.backend import TestBackend
from tests.models.utils import check_outputs_equal
from vllm import LLM, SamplingParams
from vllm.compilation.fusion import QUANT_OPS, QuantKey, kFp8StaticTensorSym
from vllm.compilation.fusion_attn import ATTN_OP, AttnFusionPass
from vllm.compilation.fx_utils import find_op_nodes
from vllm.compilation.noop_elimination import NoOpEliminationPass
from vllm.config import CompilationConfig, CompilationLevel, VllmConfig
from vllm.platforms import current_platform
# globals needed for string-import custom Dynamo backend field
# (test_attention_fusion passes the dotted paths
# "tests.compile.test_fusion_attn.backend_unfused" and
# "tests.compile.test_fusion_attn.backend" as the CompilationConfig backend,
# assigns a TestBackend to the matching name before building each LLM, and
# resets it to None afterwards)
backend: Optional[TestBackend] = None
backend_unfused: Optional[TestBackend] = None
@pytest.mark.parametrize(
    "model, quant_key",
    [("amd/Llama-3.1-8B-Instruct-FP8-KV", kFp8StaticTensorSym)])
@pytest.mark.parametrize(
    "use_triton_fa", [True, False] if current_platform.is_rocm() else [False])
@pytest.mark.skipif(not current_platform.supports_fp8(), reason="Need FP8")
@pytest.mark.skipif(not current_platform.is_cuda_alike(),
                    reason="Only test CUDA and ROCm")
def test_attention_fusion(example_prompts, monkeypatch, model: str,
                          quant_key: QuantKey, use_triton_fa: bool):
    """
    Compare an LLM compiled without the attention fusion pass against one
    compiled with it.

    The test runs the same prompts through both engines, checks that each
    attention node gained an `output_scale` exactly when its layer reports
    fused output quant support, and checks that the generated outputs match.

    Args:
        example_prompts: prompt list fixture; the prompt at index 4 is
            dropped.
        monkeypatch: pytest fixture used to set the env vars for this test.
        model: model name passed to LLM.
        quant_key: quantization key whose op is looked up in QUANT_OPS.
        use_triton_fa: value written to VLLM_USE_TRITON_FLASH_ATTN.
    """
    # Clean Dynamo cache to avoid reusing other test cases
    # (for some reason the reset at the end is not enough)
    torch._dynamo.reset()

    # Use global backends
    global backend, backend_unfused

    use_v1 = False  # can be made a param once V1 support added
    monkeypatch.setenv("VLLM_USE_V1", str(int(use_v1)))
    monkeypatch.setenv("VLLM_USE_TRITON_FLASH_ATTN", str(int(use_triton_fa)))

    # Prompt 4 seems too open-ended, differs between fused and unfused
    # (both outputs look reasonable though)
    prompts = example_prompts[:4] + example_prompts[5:]

    compile_config = CompilationConfig(
        # DYNAMO_AS_IS triggers custom backend & does full Dynamo compilation
        # DYNAMO_ONCE does not properly propagate shapes.
        level=CompilationLevel.DYNAMO_AS_IS,
        backend="tests.compile.test_fusion_attn.backend_unfused",
    )
    vllm_config = VllmConfig(compilation_config=compile_config)
    backend_unfused = TestBackend(NoOpEliminationPass(vllm_config))

    llm = LLM(model,
              enforce_eager=True,
              compilation_config=compile_config,
              gpu_memory_utilization=0.9,
              max_model_len=2048)

    sampling_params = SamplingParams(temperature=0.0,
                                     max_tokens=10,
                                     top_p=0.95)

    unfused_output = llm.generate(prompts, sampling_params)
    backend_unfused = None  # Reset backend to make sure llm gets released
    del llm

    compile_config = CompilationConfig(
        # DYNAMO_AS_IS triggers custom backend & does full Dynamo compilation
        # DYNAMO_ONCE does not properly propagate shapes.
        level=CompilationLevel.DYNAMO_AS_IS,
        backend="tests.compile.test_fusion_attn.backend",
    )
    vllm_config = VllmConfig(compilation_config=compile_config)

    # AttnFusionPass needs attention layers to be registered in config upon init
    # so we initialize it during compilation.
    attn_pass = lambda *args, **kw: AttnFusionPass(vllm_config)(*args, **kw)
    backend = TestBackend(NoOpEliminationPass(vllm_config), attn_pass)
    llm2 = LLM(model,
               enforce_eager=True,
               compilation_config=compile_config,
               gpu_memory_utilization=0.9,
               max_model_len=2048)

    # check support (only the layer objects are needed, not their names)
    attn_fusion_supported = [
        layer.impl.fused_output_quant_supported(quant_key.dtype,
                                                quant_key.static,
                                                quant_key.group_shape)
        for layer in compile_config.static_forward_context.values()
    ]

    print(f"{attn_fusion_supported=}")
    if any(attn_fusion_supported):
        # Check quant ops
        backend.check_before_ops([QUANT_OPS[quant_key]], fully_replaced=False)

    # attention ops present in both, just output_scale param changes
    attn_nodes_pre = list(find_op_nodes(ATTN_OP, backend.graph_pre_pass))
    attn_nodes_post = list(find_op_nodes(ATTN_OP, backend.graph_post_pass))
    assert len(attn_nodes_pre) == len(attn_nodes_post)

    for i, (node_pre, node_post) in enumerate(
            zip(attn_nodes_pre, attn_nodes_post)):
        assert node_pre.kwargs["output_scale"] is None
        fused = node_post.kwargs["output_scale"] is not None
        expected = attn_fusion_supported[i]
        # The message states the expectation (not the observed state), so a
        # failure reads correctly in both directions.
        assert fused == expected, (
            f"Node {i} {'' if expected else 'not '}expected "
            f"to have fused output quant")

    # check outputs
    fused_output = llm2.generate(prompts, sampling_params)

    # transform outputs to format expected by check_outputs_equal
    sample_outs = lambda s: (list(s.token_ids), s.text)
    outs_lst = lambda ros: [sample_outs(ro.outputs[0]) for ro in ros]

    check_outputs_equal(
        outputs_0_lst=outs_lst(unfused_output),
        outputs_1_lst=outs_lst(fused_output),
        name_0="unfused",
        name_1="fused",
    )

    # Clean Dynamo cache to avoid polluting other case(s)
    torch._dynamo.reset()

    # Reset backend to make sure llm2 gets released
    backend = None
...@@ -6,7 +6,9 @@ import torch ...@@ -6,7 +6,9 @@ import torch
import vllm.envs as envs import vllm.envs as envs
from vllm.compilation.fix_functionalization import FixFunctionalizationPass from vllm.compilation.fix_functionalization import FixFunctionalizationPass
from vllm.compilation.fusion import FusionPass
from vllm.compilation.fx_utils import find_auto_fn, find_auto_fn_maybe, is_func from vllm.compilation.fx_utils import find_auto_fn, find_auto_fn_maybe, is_func
from vllm.compilation.noop_elimination import NoOpEliminationPass
from vllm.compilation.sequence_parallelism import SequenceParallelismPass from vllm.compilation.sequence_parallelism import SequenceParallelismPass
from vllm.config import (CompilationConfig, DeviceConfig, ModelConfig, from vllm.config import (CompilationConfig, DeviceConfig, ModelConfig,
PassConfig, VllmConfig) PassConfig, VllmConfig)
...@@ -14,12 +16,15 @@ from vllm.distributed import tensor_model_parallel_all_reduce ...@@ -14,12 +16,15 @@ from vllm.distributed import tensor_model_parallel_all_reduce
from vllm.distributed.parallel_state import (init_distributed_environment, from vllm.distributed.parallel_state import (init_distributed_environment,
initialize_model_parallel) initialize_model_parallel)
from vllm.model_executor.layers.layernorm import RMSNorm from vllm.model_executor.layers.layernorm import RMSNorm
from vllm.model_executor.layers.quantization.utils.w8a8_utils import (
Fp8LinearOp)
from vllm.platforms import current_platform from vllm.platforms import current_platform
from vllm.utils import update_environment_variables from vllm.utils import update_environment_variables
from ..utils import multi_gpu_test from ..utils import multi_gpu_test
from .backend import TestBackend from .backend import TestBackend
FP8_DTYPE = current_platform.fp8_dtype()
prompts = [ prompts = [
"Hello, my name is", "Hello, my name is",
"The president of the United States is", "The president of the United States is",
...@@ -30,13 +35,16 @@ prompts = [ ...@@ -30,13 +35,16 @@ prompts = [
class TestModel(torch.nn.Module): class TestModel(torch.nn.Module):
def __init__(self, hidden_size=16, intermediate_size=32): def __init__(self,
hidden_size=16,
intermediate_size=32,
vllm_config: VllmConfig = None):
super().__init__() super().__init__()
self.hidden_size = hidden_size self.hidden_size = hidden_size
self.intermediate_size = intermediate_size self.intermediate_size = intermediate_size
self.gate_proj = torch.nn.Parameter( self.gate_proj = torch.nn.Parameter(
torch.empty((intermediate_size, hidden_size))) torch.empty((intermediate_size, hidden_size)))
self.norm = RMSNorm(hidden_size, 1e-05) self.norm = RMSNorm(intermediate_size, 1e-05)
# Initialize weights # Initialize weights
torch.nn.init.normal_(self.gate_proj, std=0.02) torch.nn.init.normal_(self.gate_proj, std=0.02)
...@@ -79,32 +87,138 @@ class TestModel(torch.nn.Module): ...@@ -79,32 +87,138 @@ class TestModel(torch.nn.Module):
return [torch.ops._C.fused_add_rms_norm.default] return [torch.ops._C.fused_add_rms_norm.default]
class TestQuantModel(torch.nn.Module):
    """
    Test module chaining a matmul, a tensor-parallel all-reduce, an RMSNorm
    with residual, and a statically-scaled FP8 linear.

    The ops_in_model* methods list the ops the compilation tests look for;
    the lists depend on whether `pass_config.enable_fusion` is set on the
    supplied vllm_config.
    """

    def __init__(self,
                 hidden_size=16,
                 intermediate_size=32,
                 vllm_config: VllmConfig = None):
        super().__init__()
        self.hidden_size = hidden_size
        self.intermediate_size = intermediate_size
        self.vllm_config = vllm_config

        projection = torch.empty((intermediate_size, hidden_size))
        self.gate_proj = torch.nn.Parameter(projection, requires_grad=False)
        self.norm = RMSNorm(intermediate_size, 1e-05)
        # Initialize weights
        torch.nn.init.normal_(self.gate_proj, std=0.02)

        self.fp8_linear = Fp8LinearOp(cutlass_fp8_supported=True,
                                      use_per_token_if_dynamic=False)

        # NOTE: the random draws below are kept in this exact order so a
        # fixed seed produces the same tensors.
        self.scale = torch.rand(1, dtype=torch.float32)
        # Create a weight that is compatible with torch._scaled_mm,
        # which expects a column-major layout.
        row_major_weight = torch.rand(hidden_size, intermediate_size)
        self.w = row_major_weight.to(dtype=FP8_DTYPE).t()
        self.wscale = torch.rand(1, dtype=torch.float32)

    def _fusion_enabled(self):
        """Whether a vllm_config was given and it enables the fusion pass."""
        config = self.vllm_config
        return bool(config
                    and config.compilation_config.pass_config.enable_fusion)

    def forward(self, hidden_states, residual):
        """
        Forward pass implementing the operations in the FX graph

        Args:
            hidden_states: Input tensor
            residual: Residual tensor from previous layer

        Returns:
            Tuple of (FP8 linear output, residual output of the norm)
        """
        # Reshape input
        flat_hidden = hidden_states.reshape(-1, self.hidden_size)

        # matrix multiplication
        weight_t = self.gate_proj.permute(1, 0)
        projected = torch.mm(flat_hidden, weight_t)

        # Tensor parallel all-reduce
        reduced = tensor_model_parallel_all_reduce(projected)

        # layer normalization
        normed, residual_out = self.norm(reduced, residual)

        # for static input quantization
        # self.fp8_linear is initialized with use_per_token_if_dynamic=False
        input_scale = self.scale.to(normed.device)
        quant_out = self.fp8_linear.apply(normed,
                                          self.w,
                                          self.wscale,
                                          input_scale=input_scale)

        return quant_out, residual_out

    def ops_in_model_before(self):
        """Ops expected to be removed from the graph by the passes."""
        # Always removed by SP
        removed = [torch.ops.vllm.all_reduce.default]
        # The following are only removed if fusion happens
        if self._fusion_enabled():
            removed += [
                torch.ops._C.fused_add_rms_norm.default,
                torch.ops._C.static_scaled_fp8_quant.default,
            ]
        return removed

    def ops_in_model_after(self):
        """Ops expected to be present in the graph after the passes."""
        added = [
            torch.ops.vllm.reduce_scatter.default,
            torch.ops.vllm.all_gather.default,
        ]
        # The following is only added if fusion happens
        if self._fusion_enabled():
            added.append(
                torch.ops._C.fused_add_rms_norm_static_fp8_quant.default)
        return added

    def ops_in_model(self):
        """Ops checked for (de)functionalization."""
        if self._fusion_enabled():
            # If fusion happens, the fused op is the one
            # we check for (de)functionalization
            return [torch.ops._C.fused_add_rms_norm_static_fp8_quant.default]

        # If no fusion, the original ops are checked
        return [
            torch.ops._C.fused_add_rms_norm.default,
            # TODO functionalization pass does not handle this yet
            # torch.ops._C.static_scaled_fp8_quant.default,
        ]
@multi_gpu_test(num_gpus=2) @multi_gpu_test(num_gpus=2)
@pytest.mark.parametrize("test_model_cls", [TestModel, TestQuantModel])
@pytest.mark.parametrize("batch_size", [8]) @pytest.mark.parametrize("batch_size", [8])
@pytest.mark.parametrize("seq_len", [16]) @pytest.mark.parametrize("seq_len", [16])
@pytest.mark.parametrize("hidden_size", [16]) @pytest.mark.parametrize("hidden_size", [16])
@pytest.mark.parametrize("dtype", [torch.float16, torch.bfloat16]) @pytest.mark.parametrize("dtype", [torch.float16, torch.bfloat16])
@pytest.mark.parametrize("enable_fusion", [True, False])
@pytest.mark.skipif(envs.VLLM_TARGET_DEVICE not in ["cuda"], @pytest.mark.skipif(envs.VLLM_TARGET_DEVICE not in ["cuda"],
reason="Only test on CUDA") reason="Only test on CUDA")
def test_sequence_parallelism_pass(batch_size: int, seq_len: int, def test_sequence_parallelism_pass(test_model_cls: type[torch.nn.Module],
hidden_size: int, dtype: torch.dtype): batch_size: int, seq_len: int,
hidden_size: int, dtype: torch.dtype,
enable_fusion: bool):
num_processes = 2 num_processes = 2
def run_torch_spawn(fn, nprocs): def run_torch_spawn(fn, nprocs):
# need to use torch.mp.spawn otherwise will have problems with # need to use torch.mp.spawn otherwise will have problems with
# torch.distributed and cuda # torch.distributed and cuda
torch.multiprocessing.spawn(fn, torch.multiprocessing.spawn(fn,
args=(num_processes, batch_size, seq_len, args=(num_processes, test_model_cls,
hidden_size, dtype), batch_size, seq_len, hidden_size,
dtype, enable_fusion),
nprocs=nprocs) nprocs=nprocs)
run_torch_spawn(sequence_parallelism_pass_on_test_model, num_processes) run_torch_spawn(sequence_parallelism_pass_on_test_model, num_processes)
def sequence_parallelism_pass_on_test_model(local_rank: int, world_size: int, def sequence_parallelism_pass_on_test_model(
batch_size: int, seq_len: int, local_rank: int, world_size: int,
hidden_size: int, test_model_cls: type[torch.nn.Module], batch_size: int, seq_len: int,
dtype: torch.dtype): hidden_size: int, dtype: torch.dtype, enable_fusion: bool):
current_platform.seed_everything(0) current_platform.seed_everything(0)
device = torch.device(f"cuda:{local_rank}") device = torch.device(f"cuda:{local_rank}")
...@@ -127,26 +241,39 @@ def sequence_parallelism_pass_on_test_model(local_rank: int, world_size: int, ...@@ -127,26 +241,39 @@ def sequence_parallelism_pass_on_test_model(local_rank: int, world_size: int,
# configure vllm config for SequenceParallelismPass # configure vllm config for SequenceParallelismPass
vllm_config = VllmConfig() vllm_config = VllmConfig()
vllm_config.compilation_config = CompilationConfig(pass_config=PassConfig( vllm_config.compilation_config = CompilationConfig(pass_config=PassConfig(
enable_sequence_parallelism=True)) enable_sequence_parallelism=True,
enable_fusion=enable_fusion,
enable_noop=True)) # NoOp needed for fusion
vllm_config.device_config = DeviceConfig(device=torch.device("cuda")) vllm_config.device_config = DeviceConfig(device=torch.device("cuda"))
# this is a fake model name to construct the model config # this is a fake model name to construct the model config
# in the vllm_config, it's not really used. # in the vllm_config, it's not really used.
model = "nm-testing/TinyLlama-1.1B-Chat-v1.0-FP8-e2e" model_name = "nm-testing/TinyLlama-1.1B-Chat-v1.0-FP8-e2e"
vllm_config.model_config = ModelConfig(model=model, vllm_config.model_config = ModelConfig(model=model_name,
task="auto", task="auto",
tokenizer=model, tokenizer=model_name,
tokenizer_mode="auto", tokenizer_mode="auto",
trust_remote_code=True, trust_remote_code=True,
dtype=dtype, dtype=dtype,
seed=42) seed=42)
sequence_parallelism_pass = SequenceParallelismPass(vllm_config) sequence_parallelism_pass = SequenceParallelismPass(vllm_config)
backend_no_func = TestBackend(sequence_parallelism_pass) noop_pass = NoOpEliminationPass(vllm_config)
func_pass = FixFunctionalizationPass(vllm_config) func_pass = FixFunctionalizationPass(vllm_config)
backend_func = TestBackend(sequence_parallelism_pass, func_pass)
model = TestModel(hidden_size, hidden_size * 2) passes_for_backend = [noop_pass, sequence_parallelism_pass]
if enable_fusion:
fusion_pass = FusionPass.instance(vllm_config)
passes_for_backend.append(fusion_pass)
backend_no_func = TestBackend(*passes_for_backend)
backend_func = TestBackend(*passes_for_backend, func_pass)
model = test_model_cls(hidden_size,
hidden_size * 2,
vllm_config=vllm_config)
hidden_states = torch.randn((batch_size * seq_len, hidden_size), hidden_states = torch.randn((batch_size * seq_len, hidden_size),
dtype=dtype) dtype=dtype)
residual = torch.randn((batch_size * seq_len, hidden_size), dtype=dtype) residual = torch.randn((batch_size * seq_len, hidden_size), dtype=dtype)
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
from vllm.engine.arg_utils import EngineArgs
from vllm.model_executor.layers.quantization.quark.utils import deep_compare
def test_cuda_empty_vs_unset_configs(monkeypatch: pytest.MonkeyPatch):
    """Engine configs must match for untouched vs. empty CUDA_VISIBLE_DEVICES.

    One config is built with the environment as-is and one with
    ``CUDA_VISIBLE_DEVICES=""``; apart from ``instance_id`` the two are
    expected to compare equal, so disabling GPU visibility through an empty
    string behaves the same as leaving the variable alone.
    """

    def build_engine_config():
        args = EngineArgs(model="deepseek-ai/DeepSeek-V2-Lite",
                          trust_remote_code=True)
        return args.create_engine_config()

    # Config built with CUDA_VISIBLE_DEVICES left in its normal state
    untouched_config = build_engine_config()

    # Config built with CUDA_VISIBLE_DEVICES=""
    with monkeypatch.context() as patched_env:
        patched_env.setenv("CUDA_VISIBLE_DEVICES", "")
        empty_env_config = build_engine_config()

    untouched_fields = vars(untouched_config)
    empty_env_fields = vars(empty_env_config)

    # instance_id is expected to differ between the two, so drop it
    for fields in (untouched_fields, empty_env_fields):
        fields.pop("instance_id", None)

    assert deep_compare(untouched_fields, empty_env_fields), (
        "Configs with normal CUDA_VISIBLE_DEVICES and CUDA_VISIBLE_DEVICES=\"\""
        " should be equivalent")
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import sys
from unittest.mock import patch
from vllm.config import VllmConfig
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.v1.engine.async_llm import AsyncLLM
def test_mp_reducer(monkeypatch):
    """
    Test that _reduce_config reducer is registered when AsyncLLM is instantiated
    without transformers_modules. This is a regression test for
    https://github.com/vllm-project/vllm/pull/18640.

    The engine is always shut down, even when an assertion fails, and the
    ``transformers_modules`` entry removed from ``sys.modules`` is restored
    by ``monkeypatch`` when the test finishes.
    """
    # Use V1 AsyncLLM which calls maybe_register_config_serialize_by_value
    monkeypatch.setenv('VLLM_USE_V1', '1')

    # Ensure transformers_modules is not in sys.modules. Going through
    # monkeypatch (instead of a bare `del`) puts the module back afterwards
    # so other tests in the same process are unaffected.
    monkeypatch.delitem(sys.modules, 'transformers_modules', raising=False)

    with patch('multiprocessing.reducer.register') as mock_register:
        engine_args = AsyncEngineArgs(
            model="facebook/opt-125m",
            max_model_len=32,
            gpu_memory_utilization=0.1,
            disable_log_stats=True,
            disable_log_requests=True,
        )

        async_llm = AsyncLLM.from_engine_args(
            engine_args,
            start_engine_loop=False,
        )

    # The mock keeps its recorded calls after the patch is undone.
    try:
        assert mock_register.called, (
            "multiprocessing.reducer.register should have been called")

        # Collect the reducers registered for VllmConfig (positional args
        # are (type, reducer_func)).
        vllm_config_reducers = [
            call_args[0][1] for call_args in mock_register.call_args_list
            if len(call_args[0]) >= 2 and call_args[0][0] == VllmConfig
        ]

        assert vllm_config_reducers, (
            "VllmConfig should have been registered to multiprocessing.reducer"
        )
        assert callable(
            vllm_config_reducers[0]), "Reducer function should be callable"
    finally:
        # Release the engine even if one of the checks above failed.
        async_llm.shutdown()
...@@ -33,7 +33,7 @@ from vllm.inputs import (ExplicitEncoderDecoderPrompt, TextPrompt, ...@@ -33,7 +33,7 @@ from vllm.inputs import (ExplicitEncoderDecoderPrompt, TextPrompt,
from vllm.logger import init_logger from vllm.logger import init_logger
from vllm.outputs import RequestOutput from vllm.outputs import RequestOutput
from vllm.sampling_params import BeamSearchParams from vllm.sampling_params import BeamSearchParams
from vllm.utils import cuda_device_count_stateless from vllm.transformers_utils.utils import maybe_model_redirect
logger = init_logger(__name__) logger = init_logger(__name__)
...@@ -145,6 +145,7 @@ def run_with_both_engines(request, monkeypatch): ...@@ -145,6 +145,7 @@ def run_with_both_engines(request, monkeypatch):
# Automatically runs tests twice, once with V1 and once without # Automatically runs tests twice, once with V1 and once without
use_v1 = request.param use_v1 = request.param
# Tests decorated with `@skip_v1` are only run without v1 # Tests decorated with `@skip_v1` are only run without v1
skip_v0 = request.node.get_closest_marker("skip_v0")
skip_v1 = request.node.get_closest_marker("skip_v1") skip_v1 = request.node.get_closest_marker("skip_v1")
if use_v1: if use_v1:
...@@ -152,6 +153,8 @@ def run_with_both_engines(request, monkeypatch): ...@@ -152,6 +153,8 @@ def run_with_both_engines(request, monkeypatch):
pytest.skip("Skipping test on vllm V1") pytest.skip("Skipping test on vllm V1")
monkeypatch.setenv('VLLM_USE_V1', '1') monkeypatch.setenv('VLLM_USE_V1', '1')
else: else:
if skip_v0:
pytest.skip("Skipping test on vllm V0")
monkeypatch.setenv('VLLM_USE_V1', '0') monkeypatch.setenv('VLLM_USE_V1', '0')
yield yield
...@@ -318,6 +321,7 @@ class HfRunner: ...@@ -318,6 +321,7 @@ class HfRunner:
skip_tokenizer_init: bool = False, skip_tokenizer_init: bool = False,
auto_cls: type[_BaseAutoModelClass] = AutoModelForCausalLM, auto_cls: type[_BaseAutoModelClass] = AutoModelForCausalLM,
) -> None: ) -> None:
model_name = maybe_model_redirect(model_name)
self.model_name = model_name self.model_name = model_name
self.config = AutoConfig.from_pretrained( self.config = AutoConfig.from_pretrained(
...@@ -727,8 +731,12 @@ class HfRunner: ...@@ -727,8 +731,12 @@ class HfRunner:
**kwargs) -> list[list[torch.Tensor]]: **kwargs) -> list[list[torch.Tensor]]:
return self.model.encode(prompts, *args, **kwargs) return self.model.encode(prompts, *args, **kwargs)
def predict(self, prompts: list[list[str]]) -> torch.Tensor: def predict(self, prompts: list[list[str]], *args,
return self.model.predict(prompts, convert_to_tensor=True) **kwargs) -> torch.Tensor:
return self.model.predict(prompts,
*args,
convert_to_tensor=True,
**kwargs)
def __enter__(self): def __enter__(self):
return self return self
...@@ -1018,13 +1026,13 @@ class VllmRunner: ...@@ -1018,13 +1026,13 @@ class VllmRunner:
req_outputs = self.model.classify(prompts) req_outputs = self.model.classify(prompts)
return [req_output.outputs.probs for req_output in req_outputs] return [req_output.outputs.probs for req_output in req_outputs]
def encode(self, def embed(self,
prompts: list[str], prompts: list[str],
images: Optional[PromptImageInput] = None, images: Optional[PromptImageInput] = None,
videos: Optional[PromptVideoInput] = None, videos: Optional[PromptVideoInput] = None,
audios: Optional[PromptAudioInput] = None, audios: Optional[PromptAudioInput] = None,
*args, *args,
**kwargs) -> list[list[float]]: **kwargs) -> list[list[float]]:
inputs = self.get_inputs(prompts, inputs = self.get_inputs(prompts,
images=images, images=images,
videos=videos, videos=videos,
...@@ -1033,12 +1041,18 @@ class VllmRunner: ...@@ -1033,12 +1041,18 @@ class VllmRunner:
req_outputs = self.model.embed(inputs, *args, **kwargs) req_outputs = self.model.embed(inputs, *args, **kwargs)
return [req_output.outputs.embedding for req_output in req_outputs] return [req_output.outputs.embedding for req_output in req_outputs]
def encode(self, prompts: list[str]) -> list[list[float]]:
req_outputs = self.model.encode(prompts)
return [req_output.outputs.data for req_output in req_outputs]
def score( def score(
self, self,
text_1: Union[str, list[str]], text_1: Union[str, list[str]],
text_2: Union[str, list[str]], text_2: Union[str, list[str]],
*args,
**kwargs,
) -> list[float]: ) -> list[float]:
req_outputs = self.model.score(text_1, text_2) req_outputs = self.model.score(text_1, text_2, *args, **kwargs)
return [req_output.outputs.score for req_output in req_outputs] return [req_output.outputs.score for req_output in req_outputs]
def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]: def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
...@@ -1079,7 +1093,8 @@ def num_gpus_available(): ...@@ -1079,7 +1093,8 @@ def num_gpus_available():
"""Get number of GPUs without initializing the CUDA context """Get number of GPUs without initializing the CUDA context
in current process.""" in current process."""
return cuda_device_count_stateless() from vllm.platforms import current_platform
return current_platform.device_count()
temp_dir = tempfile.gettempdir() temp_dir = tempfile.gettempdir()
......
...@@ -437,8 +437,8 @@ def test_auto_prefix_caching_with_preemption(baseline_llm_generator, ...@@ -437,8 +437,8 @@ def test_auto_prefix_caching_with_preemption(baseline_llm_generator,
"enable_prefix_caching": True, "enable_prefix_caching": True,
}]) }])
@pytest.mark.parametrize("seed", [1]) @pytest.mark.parametrize("seed", [1])
def test_auto_prefix_caching_after_evition_start(baseline_llm_generator, def test_auto_prefix_caching_after_eviction_start(baseline_llm_generator,
test_llm_generator): test_llm_generator):
"""Verify block manager v2 with auto prefix caching could works normal """Verify block manager v2 with auto prefix caching could works normal
even when eviction started. even when eviction started.
With APC enabled, all blocks are held by native block at the beginning. With APC enabled, all blocks are held by native block at the beginning.
......
...@@ -33,8 +33,8 @@ BLOCK_SIZE = 16 ...@@ -33,8 +33,8 @@ BLOCK_SIZE = 16
@pytest.mark.parametrize("batch_size", [5]) @pytest.mark.parametrize("batch_size", [5])
@pytest.mark.parametrize("seed", [1]) @pytest.mark.parametrize("seed", [1])
@pytest.mark.parametrize("backend", ["FLASH_ATTN", "FLASHINFER", "XFORMERS"]) @pytest.mark.parametrize("backend", ["FLASH_ATTN", "FLASHINFER", "XFORMERS"])
def test_sliding_window_retrival(baseline_llm_generator, test_llm_generator, def test_sliding_window_retrieval(baseline_llm_generator, test_llm_generator,
batch_size, seed, backend, monkeypatch): batch_size, seed, backend, monkeypatch):
""" """
The test does a bunch of assignments "x1 = 10\nx2 = 33\n..." and then The test does a bunch of assignments "x1 = 10\nx2 = 33\n..." and then
asks for value of one of them (which is outside the sliding window). asks for value of one of them (which is outside the sliding window).
...@@ -100,7 +100,7 @@ def test_sliding_window_retrival(baseline_llm_generator, test_llm_generator, ...@@ -100,7 +100,7 @@ def test_sliding_window_retrival(baseline_llm_generator, test_llm_generator,
def test_sliding_window_chunked_prefill(test_llm_generator, batch_size, seed, def test_sliding_window_chunked_prefill(test_llm_generator, batch_size, seed,
backend, monkeypatch): backend, monkeypatch):
""" """
This is similar to test_sliding_window_retrival, however, it doesn't This is similar to test_sliding_window_retrieval, however, it doesn't
compare against the v1 block manager since v1 doesn't support compare against the v1 block manager since v1 doesn't support
chunked prefill with sliding window. chunked prefill with sliding window.
......
...@@ -594,8 +594,8 @@ def test_decode_schedule_preempted(): ...@@ -594,8 +594,8 @@ def test_decode_schedule_preempted():
# should be preempted. 1 will also be preempted. # should be preempted. 1 will also be preempted.
budget = create_token_budget() budget = create_token_budget()
output = scheduler._schedule_running(budget, curr_loras) output = scheduler._schedule_running(budget, curr_loras)
remainig_running = scheduler.running remaining_running = scheduler.running
assert len(remainig_running) == 0 assert len(remaining_running) == 0
assert len(output.decode_seq_groups) == 1 assert len(output.decode_seq_groups) == 1
assert len(output.prefill_seq_groups) == 0 assert len(output.prefill_seq_groups) == 0
assert output.decode_seq_groups[0].seq_group.request_id == "0" assert output.decode_seq_groups[0].seq_group.request_id == "0"
...@@ -1041,3 +1041,297 @@ def test_no_batches_mixed_with_prompt_tokens_and_prompt_embeds(): ...@@ -1041,3 +1041,297 @@ def test_no_batches_mixed_with_prompt_tokens_and_prompt_embeds():
for seq in scheduled_seq_group.seq_group.seqs: for seq in scheduled_seq_group.seq_group.seqs:
seq.status = SequenceStatus.FINISHED_STOPPED seq.status = SequenceStatus.FINISHED_STOPPED
scheduler.free_finished_seq_groups() scheduler.free_finished_seq_groups()
def test_remove_seq_from_computed_blocks_tracker():
    """
    Test that computed_blocks_tracker correctly removes stale sequences
    during scheduling.

    The test covers 9 scheduling branches where stale seqs are removed:
    - 1 in _schedule_swapped
    - 1 in _schedule_priority_preemption
    - 7 in _schedule_prefill

    Each branch is tested to ensure proper cleanup of
    _seq_id_to_num_tokens_computed.
    """

    def tracked_token_count(sched, seq_id):
        # Entry kept by the computed-blocks tracker for seq_id, or None.
        tracker = sched.block_manager._computed_blocks_tracker
        return tracker._seq_id_to_num_tokens_computed.get(seq_id)

    def enqueue_and_schedule(sched, prompts):
        # Queue every seq group, then run one default scheduling step.
        for _, group in prompts:
            sched.add_seq_group(group)
        sched._schedule_default()

    # Budget can not schedule in swapped
    block_size = 2
    max_seq_group = 3
    blocks_to_swap_out: list[tuple[int, int]] = []
    curr_loras: set[int] = set()

    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=64,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        enable_prefix_caching=True,
    )
    budget = create_token_budget(token_budget=15)

    seq_length = 16
    num_seqs = 3
    seq_tokens_with_swapped = [[i] * seq_length for i in range(num_seqs)]
    swapped_prompts = [
        create_dummy_prompt(f"{i}", prompt_tokens=tokens, block_size=block_size)
        for i, tokens in enumerate(seq_tokens_with_swapped)
    ]

    for _, seq_group in swapped_prompts:
        scheduler._allocate_and_set_running(seq_group)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    scheduler._schedule_swapped(budget, curr_loras)
    assert tracked_token_count(scheduler, 1) is None

    # Prefill schedule don't have a space for another LoRA, so
    # we ignore this request for now.
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64,
                                     enable_prefix_caching=True)
    budget = create_token_budget(token_budget=120)
    num_seqs = 2
    # seq_length deliberately carries over from the previous scenario.
    for i in range(num_seqs):
        lora_request = LoRARequest(lora_name=str(i),
                                   lora_int_id=i + 1,
                                   lora_path="abc")
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=seq_length,
                                           block_size=block_size,
                                           lora_request=lora_request)
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_prefills(budget, curr_loras)
    assert tracked_token_count(scheduler, 1) is None

    # Priority preemption schedule
    scheduler._schedule_priority_preemption(budget)
    assert tracked_token_count(scheduler, 1) is None

    # Prefill scheduler does not schedule batches with prompt tokens and
    # prompt embeddings co-mingled.
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=100,
        enable_prefix_caching=True,
    )
    seq_length = 7
    embedding_size = 5
    seq_tokens_with_embedding: list[list[int]] = [
        list(range(seq_length)),
        [0] * seq_length,
    ]
    seq_embeds: list[Optional[torch.Tensor]] = [
        None,
        torch.rand(embedding_size),
    ]
    mixed_prompts = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=tokens,
                            prompt_embeds=embeds,
                            block_size=block_size)
        for i, (tokens,
                embeds) in enumerate(zip(seq_tokens_with_embedding,
                                         seq_embeds))
    ]
    enqueue_and_schedule(scheduler, mixed_prompts)
    assert tracked_token_count(scheduler, 1) is None

    # Prefill scheduler budget num_batched_tokens
    # >= scheduler_config max_num_batched_tokens
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        max_token_budget=8,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=5,
        enable_prefix_caching=True,
    )
    seq_length = 4
    num_seqs = 3
    seq_tokens_prefill_budget = [[i] * seq_length for i in range(num_seqs)]
    prefill_budget_prompts = [
        create_dummy_prompt(f"{i}", prompt_tokens=tokens, block_size=block_size)
        for i, tokens in enumerate(seq_tokens_prefill_budget)
    ]
    enqueue_and_schedule(scheduler, prefill_budget_prompts)
    assert tracked_token_count(scheduler, 2) is None

    # Budget can not schedule in waiting
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        max_token_budget=30,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=30,
        enable_prefix_caching=True,
    )
    seq_length = 16
    num_seqs = 3
    seq_tokens_prefill_budget_waiting = [
        list(range(seq_length)) for _ in range(num_seqs)
    ]
    waiting_prompts = [
        create_dummy_prompt(f"{i}", prompt_tokens=tokens, block_size=block_size)
        for i, tokens in enumerate(seq_tokens_prefill_budget_waiting)
    ]
    enqueue_and_schedule(scheduler, waiting_prompts)
    assert tracked_token_count(scheduler, 1) is None

    # Sequence num_new_tokens > prompt_limit marked FINISHED_IGNORED
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=30,
        enable_prefix_caching=True,
    )
    seq_length = 31
    seq_tokens_prompt_limit = [list(range(seq_length))]
    over_limit_prompts = [
        create_dummy_prompt("0",
                            prompt_tokens=seq_tokens_prompt_limit[0],
                            block_size=block_size)
    ]
    enqueue_and_schedule(scheduler, over_limit_prompts)
    assert tracked_token_count(scheduler, 0) is None

    # Budget can not allocate, AllocStatus is NEVER marked FINISHED_IGNORED
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=160,
        num_gpu_blocks=160,
        max_num_seqs=max_seq_group,
        max_model_len=320,
        enable_prefix_caching=True,
    )
    seq_length = 320
    num_seqs = 1
    seq_tokens_never = [list(range(seq_length)) for _ in range(num_seqs)]
    never_prompts = [
        create_dummy_prompt(f"{i}", prompt_tokens=tokens, block_size=block_size)
        for i, tokens in enumerate(seq_tokens_never)
    ]
    enqueue_and_schedule(scheduler, never_prompts)
    assert tracked_token_count(scheduler, 0) is None

    # Budget can not allocate, AllocStatus is LATER
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=160,
        num_gpu_blocks=160,
        max_num_seqs=max_seq_group,
        max_model_len=320,
        enable_prefix_caching=True,
    )
    seq_length = 160
    num_seqs = 2
    seq_tokens_later = [list(range(seq_length)) for _ in range(num_seqs)]
    later_prompts = [
        create_dummy_prompt(f"{i}", prompt_tokens=tokens, block_size=block_size)
        for i, tokens in enumerate(seq_tokens_later)
    ]
    enqueue_and_schedule(scheduler, later_prompts)
    assert tracked_token_count(scheduler, 1) is None
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import ctypes
from concurrent.futures import ThreadPoolExecutor
import pytest
import torch
from vllm.platforms import current_platform
def check_cuda_context():
    """Ask the CUDA driver which device the current context belongs to.

    Returns:
        ``(True, device_ordinal)`` when ``cuCtxGetDevice`` returns status 0,
        otherwise ``(False, None)``. Any exception (for example the driver
        library failing to load) is also reported as ``(False, None)``.
    """
    try:
        driver = ctypes.CDLL('libcuda.so')
        ordinal = ctypes.c_int()
        status = driver.cuCtxGetDevice(ctypes.byref(ordinal))
    except Exception:
        return False, None

    if status != 0:
        return False, None
    return True, ordinal.value
def run_cuda_test_in_thread(device_input, expected_device_id):
    """Exercise ``current_platform.set_device`` and report the outcome.

    Intended to be submitted to a fresh worker thread. Returns a
    ``(success, message)`` tuple instead of raising, so the caller can
    assert on it from the main thread.
    """
    try:
        # New thread should have no CUDA context initially
        had_context, initial_device = check_cuda_context()
        if had_context:
            message = ("CUDA context should not exist in new thread, "
                       f"got device {initial_device}")
            return False, message

        # Test setting CUDA context
        current_platform.set_device(device_input)

        # Verify context is created correctly
        has_context, actual_device = check_cuda_context()
        if not has_context:
            return False, "CUDA context should be valid after set_cuda_context"

        if actual_device != expected_device_id:
            message = f"Expected device {expected_device_id}, got {actual_device}"
            return False, message

        return True, "Success"
    except Exception as e:
        return False, f"Exception in thread: {str(e)}"
class TestSetCudaContext:
    """Tests for setting the CUDA context via ``current_platform.set_device``."""

    @pytest.mark.skipif(not current_platform.is_cuda(),
                        reason="CUDA not available")
    @pytest.mark.parametrize(
        argnames="device_input,expected_device_id",
        argvalues=[
            (0, 0),
            (torch.device('cuda:0'), 0),
            ('cuda:0', 0),
        ],
        ids=["int", "torch_device", "string"],
    )
    def test_set_cuda_context_parametrized(self, device_input,
                                           expected_device_id):
        """Set the device from a fresh worker thread for each input form."""
        # A single-worker pool gives the check its own thread.
        with ThreadPoolExecutor(max_workers=1) as pool:
            pending = pool.submit(run_cuda_test_in_thread, device_input,
                                  expected_device_id)
            outcome = pending.result(timeout=30)
            success, message = outcome
            assert success, message

    @pytest.mark.skipif(not current_platform.is_cuda(),
                        reason="CUDA not available")
    def test_set_cuda_context_invalid_device_type(self):
        """A CPU device must be rejected with a ValueError."""
        cpu_device = torch.device('cpu')
        with pytest.raises(ValueError, match="Expected a cuda device"):
            current_platform.set_device(cpu_device)
# When executed directly, run this file's tests through pytest in verbose mode.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
import torch
from vllm.distributed.eplb.rebalance_algo import rebalance_experts
def test_basic_rebalance():
    """Check shapes, value ranges and exact outputs on the reference example."""
    # Example from https://github.com/deepseek-ai/eplb
    weight = torch.tensor([
        [90, 132, 40, 61, 104, 165, 39, 4, 73, 56, 183, 86],
        [20, 107, 104, 64, 19, 197, 187, 157, 172, 86, 16, 27],
    ])

    num_layers = weight.shape[0]
    num_replicas, num_groups, num_nodes, num_gpus = 16, 4, 2, 8

    phy2log, log2phy, logcnt = rebalance_experts(weight, num_replicas,
                                                 num_groups, num_nodes,
                                                 num_gpus)

    # Verify output shapes
    assert phy2log.shape == (
        2, 16), f"Expected `phy2log` shape (2, 16), got {phy2log.shape}"
    log2phy_dim0 = log2phy.shape[0]
    assert log2phy_dim0 == 2, (
        f"Expected `log2phy` first dimension 2, got {log2phy_dim0}")
    log2phy_dim1 = log2phy.shape[1]
    assert log2phy_dim1 == 12, (
        f"Expected `log2phy` second dimension 12, got {log2phy_dim1}")
    assert logcnt.shape == (
        2, 12), f"Expected `logcnt` shape (2, 12), got {logcnt.shape}"

    # Verify physical to logical expert mapping range is correct
    not_negative = torch.all(phy2log >= 0)
    assert not_negative and torch.all(
        phy2log < 12), "Physical to logical mapping should be in range [0, 12)"

    # Verify expert count reasonableness
    assert torch.all(
        logcnt >= 1), "Each logical expert should have at least 1 replica"
    total_replicas = torch.sum(logcnt, dim=1).sum()
    expected_total = num_replicas * num_layers
    assert total_replicas == expected_total, (
        f"Total replicas should be {expected_total}")

    # Verify expected output
    expected_phy2log = torch.tensor([
        [5, 6, 5, 7, 8, 4, 3, 4, 10, 9, 10, 2, 0, 1, 11, 1],
        [7, 10, 6, 8, 6, 11, 8, 9, 2, 4, 5, 1, 5, 0, 3, 1],
    ])
    assert torch.all(phy2log == expected_phy2log)

    expected_logcnt = torch.tensor([
        [1, 2, 1, 1, 2, 2, 1, 1, 1, 1, 2, 1],
        [1, 2, 1, 1, 1, 2, 2, 1, 2, 1, 1, 1],
    ])
    assert torch.all(logcnt == expected_logcnt)
def test_single_gpu_case():
    """One layer, four experts, everything placed on a single GPU."""
    weight = torch.tensor([[10, 20, 30, 40]])
    num_replicas, num_groups, num_nodes, num_gpus = 4, 1, 1, 1

    phy2log, log2phy, logcnt = rebalance_experts(weight, num_replicas,
                                                 num_groups, num_nodes,
                                                 num_gpus)

    # Verify shapes
    assert phy2log.shape == (1, 4)
    assert log2phy.shape[0] == 1
    assert log2phy.shape[1] == 4
    assert logcnt.shape == (1, 4)

    # Verify all logical experts are mapped
    mapped_experts = set(phy2log[0].tolist())
    assert mapped_experts == {0, 1, 2, 3}
def test_equal_weights():
    """Eight equally loaded experts with eight replicas: one replica each."""
    weight = torch.tensor([[50] * 8])
    num_replicas, num_groups, num_nodes, num_gpus = 8, 2, 2, 4

    phy2log, log2phy, logcnt = rebalance_experts(weight, num_replicas,
                                                 num_groups, num_nodes,
                                                 num_gpus)

    # Verify shapes
    assert phy2log.shape == (1, 8)
    assert logcnt.shape == (1, 8)

    # With equal weights, each expert should have exactly one replica
    every_expert_single = torch.all(logcnt == 1)
    assert every_expert_single, (
        "With equal weights and no replication, "
        "each expert should have exactly 1 replica")
def test_extreme_weight_imbalance():
    """A single dominant expert must receive more replicas than a light one."""
    weight = torch.tensor([[1000, 1, 1, 1, 1, 1, 1, 1]])
    num_replicas, num_groups, num_nodes, num_gpus = 12, 2, 2, 4

    phy2log, log2phy, logcnt = rebalance_experts(weight, num_replicas,
                                                 num_groups, num_nodes,
                                                 num_gpus)

    # Verify shapes
    assert phy2log.shape == (1, 12)
    assert logcnt.shape == (1, 8)

    # Expert with highest weight (index 0) should have more replicas
    heavy_replicas = logcnt[0, 0]
    light_replicas = logcnt[0, 1]
    assert heavy_replicas > light_replicas, (
        "Expert with highest weight should have more replicas")
def test_multiple_layers():
    """Test multiple layers case.

    Each layer has a different load pattern; every layer must map its
    physical experts into the valid logical range and use exactly
    ``num_replicas`` replicas in total.
    """
    weight = torch.tensor([
        [10, 20, 30, 40, 50, 60],  # First layer
        [60, 50, 40, 30, 20, 10],  # Second layer (opposite weight pattern)
        [25, 25, 25, 25, 25, 25],  # Third layer (equal weights)
    ])
    # Derive the expected sizes from the input instead of repeating literals.
    num_layers, num_logical_experts = weight.shape
    num_replicas = 8
    num_groups = 2
    num_nodes = 2
    num_gpus = 4

    phy2log, log2phy, logcnt = rebalance_experts(weight, num_replicas,
                                                 num_groups, num_nodes,
                                                 num_gpus)

    # Verify shapes
    assert phy2log.shape == (num_layers, num_replicas)
    assert logcnt.shape == (num_layers, num_logical_experts)

    # Verify expert allocation is reasonable for each layer
    for layer in range(num_layers):
        # Note the space after "mapping": the two literals are concatenated.
        assert torch.all(phy2log[layer] >= 0) and torch.all(
            phy2log[layer] < num_logical_experts
        ), (f"Layer {layer} physical to logical mapping "
            f"should be in range [0, {num_logical_experts})")
        assert (torch.sum(logcnt[layer]) == num_replicas
                ), f"Layer {layer} total replicas should be {num_replicas}"
def test_parameter_validation():
    """Check which parameter combinations are accepted and which assert."""
    weight = torch.tensor([[10, 20, 30, 40]])

    # Test non-divisible case - this should handle normally without throwing
    # errors because the function will fall back to global load balancing
    # strategy
    result = rebalance_experts(weight, 8, 3, 2, 4)
    phy2log, log2phy, logcnt = result
    assert phy2log.shape == (1, 8)
    assert logcnt.shape == (1, 4)

    # Test cases that will actually cause errors:
    # num_physical_experts not divisible by num_gpus
    indivisible_replicas = 7  # 7 not divisible by 4
    with pytest.raises(AssertionError):
        rebalance_experts(weight, indivisible_replicas, 2, 2, 4)
def test_small_scale_hierarchical():
    """Hierarchical balancing on 8 experts, 4 groups, 2 nodes and 4 GPUs."""
    weight = torch.tensor([
        [100, 50, 200, 75, 150, 25, 300, 80],  # 8 experts
    ])
    num_replicas = 12
    # 4 groups (2 experts each), 2 nodes, 4 GPUs
    num_groups, num_nodes, num_gpus = 4, 2, 4

    phy2log, log2phy, logcnt = rebalance_experts(weight, num_replicas,
                                                 num_groups, num_nodes,
                                                 num_gpus)

    # Verify basic constraints
    assert phy2log.shape == (1, 12)
    assert logcnt.shape == (1, 8)
    assert torch.sum(logcnt) == num_replicas
    assert torch.all(logcnt >= 1)

    # Expert with highest weight should have more replicas
    heaviest_expert = torch.argmax(weight[0])
    heaviest_replicas = logcnt[0, heaviest_expert]
    assert heaviest_replicas >= 2, (
        "Highest weight expert should have multiple replicas")
def test_global_load_balance_fallback():
    """Run `rebalance_experts` with groups that do not split across nodes.

    With 3 groups and 2 nodes, `num_groups % num_nodes != 0`. Per the original
    author's note, the function then uses its global load balancing strategy;
    this test only checks that a well-formed result comes back.
    """
    weight = torch.tensor([[10, 20, 30, 40, 50, 60]])
    num_replicas = 8
    # 3 groups cannot be divided evenly between the 2 nodes.
    num_groups, num_nodes, num_gpus = 3, 2, 4

    phy2log, _, logcnt = rebalance_experts(weight, num_replicas, num_groups,
                                           num_nodes, num_gpus)

    assert phy2log.shape == (1, num_replicas)
    assert logcnt.shape == (1, weight.shape[1])
    assert logcnt.sum() == num_replicas
@pytest.mark.parametrize("device", ["cpu", "cuda"])
def test_device_compatibility(device):
    """Feed a weight tensor that lives on `device` to `rebalance_experts`.

    The CUDA variant is skipped when no CUDA device is available.
    """
    cuda_requested = device == "cuda"
    if cuda_requested and not torch.cuda.is_available():
        pytest.skip("CUDA not available")

    weight = torch.tensor([[10, 20, 30, 40]], device=device)
    num_replicas = 6
    num_groups, num_nodes, num_gpus = 2, 1, 2

    phy2log, _, logcnt = rebalance_experts(weight, num_replicas, num_groups,
                                           num_nodes, num_gpus)

    # Per the original author's note the function converts its input to CPU
    # internally (not visible from this test); only the output shapes are
    # checked here.
    assert phy2log.shape == (1, num_replicas)
    assert logcnt.shape == (1, weight.shape[1])
def test_additional_cases():
    """Cover a larger topology and a multi-layer, skewed-load input."""
    # Case 1: 16 experts replicated to 24 slots across 8 groups, 4 nodes and
    # 8 GPUs.
    weight1 = torch.tensor(
        [[50, 100, 75, 120, 90, 60, 80, 110, 40, 70, 95, 85, 65, 55, 45, 35]])
    phy2log1, _, logcnt1 = rebalance_experts(weight1, 24, 8, 4, 8)

    assert phy2log1.shape == (1, 24)
    assert logcnt1.shape == (1, 16)
    assert logcnt1.sum() == 24

    # Case 2: two layers whose loads run in opposite directions.
    weight2 = torch.tensor([
        [200, 150, 100, 50, 25, 12],  # decreasing load
        [12, 25, 50, 100, 150, 200],  # increasing load
    ])
    phy2log2, _, logcnt2 = rebalance_experts(weight2, 10, 3, 1, 2)

    assert phy2log2.shape == (2, 10)
    assert logcnt2.shape == (2, 6)

    # In every layer the most loaded expert must have been replicated.
    for layer_load, layer_counts in zip(weight2, logcnt2):
        assert layer_counts[layer_load.argmax()] >= 2
if __name__ == "__main__":
    # Manual smoke run: rebalance a two-layer, 12-expert load table and print
    # the resulting physical-to-logical mapping, then run the basic test.
    weight = torch.tensor([
        [90, 132, 40, 61, 104, 165, 39, 4, 73, 56, 183, 86],
        [20, 107, 104, 64, 19, 197, 187, 157, 172, 86, 16, 27],
    ])

    phy2log, log2phy, logcnt = rebalance_experts(
        weight,
        16,  # num_replicas
        4,  # num_groups
        2,  # num_nodes
        8,  # num_gpus
    )
    print(phy2log)

    test_basic_rebalance()
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import multiprocessing
import os
import random
import pytest
import torch
import torch.distributed
from vllm.distributed.eplb.rebalance_execute import (
rearrange_expert_weights_inplace)
from vllm.distributed.parallel_state import (ensure_model_parallel_initialized,
get_tp_group,
init_distributed_environment)
from vllm.utils import update_environment_variables
def distributed_run(fn, world_size):
    """Run `fn(env)` in `world_size` separate processes and require success.

    Each process receives a dict of rendezvous variables (rank, world size,
    master address/port) as its single argument. All processes are started
    first and joined afterwards; the call asserts that every one of them
    exited with code 0.

    Args:
        fn: callable taking the environment dict as its only argument.
        world_size: number of processes to launch.
    """
    # Values that are identical for every rank. The rendezvous endpoint is
    # fixed to localhost:12345.
    common_env = {
        'WORLD_SIZE': str(world_size),
        'LOCAL_WORLD_SIZE': str(world_size),
        'MASTER_ADDR': 'localhost',
        'MASTER_PORT': '12345',
    }

    workers: list[multiprocessing.Process] = []
    for rank in range(world_size):
        env: dict[str, str] = {
            'RANK': str(rank),
            'LOCAL_RANK': str(rank),
            **common_env,
        }
        worker = multiprocessing.Process(target=fn, args=(env, ))
        workers.append(worker)
        worker.start()

    # Wait for every worker before inspecting any exit code.
    for worker in workers:
        worker.join()

    for worker in workers:
        assert worker.exitcode == 0
def worker_fn_wrapper(fn):
    """Wrap a zero-argument test body so it can be a process entry point.

    `multiprocessing.Process` has no way to hand environment variables to the
    child directly, so the returned function takes them as a dict argument,
    installs them, selects the CUDA device named by `LOCAL_RANK`, initializes
    the distributed environment, seeds the RNGs and finally calls `fn`.

    Args:
        fn: the test body to execute inside the worker process.

    Returns:
        A function accepting the environment dict.
    """

    def wrapped_fn(env):
        update_environment_variables(env)

        # Bind this process to the GPU matching its local rank.
        device = torch.device(f"cuda:{os.environ['LOCAL_RANK']}")
        torch.cuda.set_device(device)

        init_distributed_environment()

        # Identical seeds in every worker, so all ranks draw the same random
        # values.
        random.seed(42)
        torch.manual_seed(42)

        fn()

    return wrapped_fn
def create_expert_indices_with_redundancy(
    num_layers: int,
    num_logical_experts: int,
    total_physical_experts: int,
    redundancy_config: list[int],  # redundancy for each logical expert
) -> torch.Tensor:
    """
    Create a randomly shuffled physical-to-logical expert mapping.

    Logical expert `i` occupies `redundancy_config[i]` physical slots. Every
    layer gets the same multiset of logical IDs, shuffled independently with
    `torch.randperm` (one call per layer, in layer order).

    Args:
        num_layers: number of layers
        num_logical_experts: number of logical experts
        total_physical_experts: total number of physical experts
        redundancy_config: redundancy for each logical expert

    Returns:
        indices: `torch.long` tensor of shape
            (num_layers, total_physical_experts)

    Raises:
        AssertionError: if `redundancy_config` does not have one entry per
            logical expert or does not sum to `total_physical_experts`.
    """
    assert sum(redundancy_config) == total_physical_experts
    assert len(redundancy_config) == num_logical_experts

    # The unshuffled layout is the same for every layer, so build it once
    # instead of writing it element by element for each layer.
    unshuffled = torch.tensor(
        [
            logical_expert_id
            for logical_expert_id, redundancy in enumerate(redundancy_config)
            for _ in range(redundancy)
        ],
        dtype=torch.long,
    )

    indices = torch.zeros(num_layers, total_physical_experts, dtype=torch.long)

    # Shuffle each layer independently along dim 1.
    for layer in range(num_layers):
        indices[layer] = unshuffled[torch.randperm(total_physical_experts)]

    return indices
def create_expert_weights(
    num_layers: int,
    num_local_experts: int,
    hidden_sizes: list[int],
    rank: int,
    device: torch.device,
    physical_to_logical_mapping: torch.Tensor,
) -> list[list[torch.Tensor]]:
    """
    Create fake expert weights for testing.

    Uses `arange` to generate predictable values that depend only on the
    logical expert ID, the layer and the weight index, so all replicas of the
    same logical expert carry the same weights.

    Args:
        num_layers: number of layers
        num_local_experts: number of physical experts held by this rank
        hidden_sizes: row length of each weight matrix
        rank: this rank's index; it owns the global physical positions
            [rank * num_local_experts, (rank + 1) * num_local_experts)
        device: device on which the weights are created
        physical_to_logical_mapping: indexed as
            mapping[layer, global_physical_pos] = logical_expert_id, so it
            must cover every global physical position of this rank

    Returns:
        `weights[layer][weight_idx]` is a float32 tensor of shape
        (num_local_experts, hidden_sizes[weight_idx]).
    """
    first_global_pos = rank * num_local_experts

    expert_weights = []
    for layer in range(num_layers):
        # The logical ID of each local slot does not depend on the weight
        # matrix, so look it up once per layer.
        logical_expert_ids = [
            physical_to_logical_mapping[layer,
                                        first_global_pos + local_expert].item()
            for local_expert in range(num_local_experts)
        ]

        layer_weights = []
        for weight_idx, hidden_size in enumerate(hidden_sizes):
            weight_tensor = torch.zeros(num_local_experts,
                                        hidden_size,
                                        device=device,
                                        dtype=torch.float32)

            for local_expert, logical_expert_id in enumerate(
                    logical_expert_ids):
                # Values are a function of the logical expert ID (plus layer
                # and weight index), never of the physical position.
                base_value = (logical_expert_id * 1000 + layer * 100 +
                              weight_idx * 10)
                weight_tensor[local_expert] = torch.arange(
                    base_value,
                    base_value + hidden_size,
                    device=device,
                    dtype=torch.float32)

            layer_weights.append(weight_tensor)
        expert_weights.append(layer_weights)

    return expert_weights
def create_redundancy_config(
    num_logical_experts: int,
    num_physical_experts: int,
) -> list[int]:
    """Create a random redundancy configuration.

    Every logical expert gets one physical slot; the remaining
    `num_physical_experts - num_logical_experts` slots are handed out one at
    a time to logical experts drawn uniformly with the `random` module.

    Args:
        num_logical_experts: number of logical experts
        num_physical_experts: number of physical slots to distribute

    Returns:
        A list with one replica count per logical expert, summing to
        `num_physical_experts`.

    Raises:
        ValueError: if there are fewer physical than logical experts, or if
            extra slots exist but there is no logical expert to receive them.
    """
    remaining = num_physical_experts - num_logical_experts
    if remaining < 0:
        # Without this check the result would silently sum to more than
        # num_physical_experts.
        raise ValueError(
            f"num_physical_experts ({num_physical_experts}) must be >= "
            f"num_logical_experts ({num_logical_experts})")
    if remaining > 0 and num_logical_experts <= 0:
        raise ValueError(
            "cannot distribute redundant experts without logical experts")

    redundancy_config = [1] * num_logical_experts

    # Randomly assign the remaining physical experts to the logical experts
    for _ in range(remaining):
        redundancy_config[random.randrange(num_logical_experts)] += 1

    return redundancy_config
def verify_expert_weights_after_shuffle(
    expert_weights: list[list[torch.Tensor]],
    new_indices: torch.Tensor,
    hidden_sizes: list[int],
    ep_rank: int,
    num_local_experts: int,
):
    """Verify the weights after shuffling are correct.

    For every local slot, the weights must equal the `arange` pattern
    `logical_id * 1000 + layer * 100 + weight_idx * 10 + [0, hidden_size)`
    of the logical expert that `new_indices` assigns to that slot.

    Args:
        expert_weights: `expert_weights[layer][weight_idx]` is this rank's
            weight tensor, indexed by local expert along dim 0
        new_indices: indexed as
            new_indices[layer, global_physical_pos] = logical_expert_id
        hidden_sizes: row length of each weight matrix
        ep_rank: this rank's index; local slot `i` is global position
            `ep_rank * num_local_experts + i`
        num_local_experts: number of physical experts held by this rank

    Raises:
        AssertionError: on the first slot whose weights do not match.
    """
    first_global_pos = ep_rank * num_local_experts

    for layer, layer_weights in enumerate(expert_weights):
        for weight_idx, hidden_size in enumerate(hidden_sizes):
            weight_tensor = layer_weights[weight_idx]

            for local_expert in range(num_local_experts):
                # Logical expert that should now live in this local slot.
                global_pos = first_global_pos + local_expert
                expected_logical_expert = new_indices[layer, global_pos].item()

                actual_weights = weight_tensor[local_expert]
                expected_base = (expected_logical_expert * 1000 + layer * 100 +
                                 weight_idx * 10)
                expected_weights = torch.arange(expected_base,
                                                expected_base + hidden_size,
                                                device=actual_weights.device,
                                                dtype=actual_weights.dtype)

                torch.testing.assert_close(
                    actual_weights,
                    expected_weights,
                    msg=f"Layer {layer}, weight {weight_idx}, "
                    f"local expert {local_expert}: "
                    f"weights do not match. "
                    f"Expected logical expert {expected_logical_expert}")
def verify_redundant_experts_have_same_weights(
    expert_weights: list[list[torch.Tensor]],
    indices: torch.Tensor,
    hidden_sizes: list[int],
    world_size: int,
    num_local_experts: int,
):
    """
    Verify that all replicas of the same logical expert have the same weights.

    For each layer and weight matrix, the local weights of every rank are
    collected with `torch.distributed.all_gather` (default process group), and
    every physical expert is compared against the first physical expert seen
    with the same logical ID.

    Args:
        expert_weights: `expert_weights[layer][weight_idx]` is this rank's
            weight tensor of shape [num_local_experts, hidden_size]
        indices: indexed as
            indices[layer, global_physical_pos] = logical_expert_id
        hidden_sizes: row length of each weight matrix
        world_size: number of ranks taking part in the gather
        num_local_experts: number of physical experts per rank

    Raises:
        AssertionError: if two replicas of one logical expert differ.
    """
    total_physical_experts = world_size * num_local_experts

    for layer, layer_weights in enumerate(expert_weights):
        # Collect weights for all physical experts for each weight matrix
        all_weights: list[torch.Tensor] = []

        for weight_idx, hidden_size in enumerate(hidden_sizes):
            # Shape: [num_local_experts, hidden_size]
            local_weights = layer_weights[weight_idx]

            # Receive buffer, shape: [total_physical_experts, hidden_size]
            gathered_weights = torch.zeros(total_physical_experts,
                                           hidden_size,
                                           device=local_weights.device,
                                           dtype=local_weights.dtype)

            # The chunks are views into `gathered_weights`, one per rank, so
            # all_gather fills the full buffer in place.
            torch.distributed.all_gather(
                list(torch.chunk(gathered_weights, world_size, dim=0)),
                local_weights,
            )

            all_weights.append(gathered_weights)

        # Verify that all replicas of the same logical expert have the same
        # weights
        logical_expert_weights: dict[int, dict[int, torch.Tensor]] = {}

        for physical_pos in range(total_physical_experts):
            logical_expert_id = int(indices[layer, physical_pos].item())

            if logical_expert_id not in logical_expert_weights:
                # First replica of this logical expert: keep it as reference
                logical_expert_weights[logical_expert_id] = {
                    weight_idx: gathered[physical_pos]
                    for weight_idx, gathered in enumerate(all_weights)
                }
                continue

            # Later replicas must match the stored reference
            reference = logical_expert_weights[logical_expert_id]
            for weight_idx, gathered in enumerate(all_weights):
                torch.testing.assert_close(
                    gathered[physical_pos],
                    reference[weight_idx],
                    msg=f"Layer {layer}, weight {weight_idx}, "
                    f"logical expert {logical_expert_id}: "
                    f"Physical expert {physical_pos} has different weights "
                    f"than expected")
@pytest.mark.parametrize(
    "world_size,num_layers,num_local_experts,num_logical_experts",
    [
        # 2 GPUs x 2 experts = 4 physical, 3 logical -> 1 redundant
        (2, 1, 2, 3),
        # 2 GPUs x 3 experts = 6 physical, 4 logical -> 2 redundant
        (2, 2, 3, 4),
        # 2 GPUs x 8 experts = 16 physical, 16 logical -> 0 redundant
        (2, 4, 8, 16),
        # 4 GPUs x 2 experts = 8 physical, 6 logical -> 2 redundant
        (4, 1, 2, 6),
        # 4 GPUs x 2 experts = 8 physical, 5 logical -> 3 redundant
        (4, 2, 2, 5),
        # 4 GPUs x 8 experts = 32 physical, 16 logical -> 16 redundant
        (4, 8, 8, 16),
    ])
def test_rearrange_expert_weights_with_redundancy(world_size, num_layers,
                                                  num_local_experts,
                                                  num_logical_experts):
    """Rearrange expert weights between two random redundant layouts.

    Each worker builds the same old and new layouts (identical seeds), fills
    its local weights according to the old layout, runs the in-place
    rearrangement and checks the result against the new layout.
    """
    if world_size > torch.cuda.device_count():
        pytest.skip(f"Need at least {world_size} GPUs to run the test")

    @worker_fn_wrapper
    def worker_fn():
        # Tensor parallel serves as the entrypoint to expert parallel here.
        ensure_model_parallel_initialized(
            tensor_model_parallel_size=world_size,
            pipeline_model_parallel_size=1)

        ep_group = get_tp_group().cpu_group
        ep_rank = torch.distributed.get_rank()
        device = torch.device(f"cuda:{ep_rank}")

        total_physical_experts = world_size * num_local_experts
        hidden_sizes = [32, 64]  # two weight matrices of different width

        def random_layout():
            # Draw a redundancy config, then a shuffled mapping built from it.
            config = create_redundancy_config(num_logical_experts,
                                              total_physical_experts)
            return create_expert_indices_with_redundancy(
                num_layers,
                num_logical_experts,
                total_physical_experts,
                config,
            )

        # Layout before and after the rearrangement (drawn in this order).
        old_indices = random_layout()
        new_indices = random_layout()

        # Local weights laid out according to the old mapping.
        expert_weights = create_expert_weights(num_layers, num_local_experts,
                                               hidden_sizes, ep_rank, device,
                                               old_indices)

        rearrange_expert_weights_inplace(
            old_indices,
            new_indices,
            expert_weights,
            ep_group,
            is_profile=False,
        )

        # Every local slot must now hold the weights of its new logical
        # expert ...
        verify_expert_weights_after_shuffle(
            expert_weights,
            new_indices,
            hidden_sizes,
            ep_rank,
            num_local_experts,
        )

        # ... and replicas of one logical expert must agree across ranks.
        verify_redundant_experts_have_same_weights(
            expert_weights,
            new_indices,
            hidden_sizes,
            world_size,
            num_local_experts,
        )

    distributed_run(worker_fn, world_size)
@pytest.mark.parametrize("world_size", [2, 4])
def test_rearrange_expert_weights_no_change(world_size):
    """
    Rearranging from a layout to the very same layout must leave all weights
    untouched.
    """
    if world_size > torch.cuda.device_count():
        pytest.skip(f"Need at least {world_size} GPUs to run the test")

    @worker_fn_wrapper
    def worker_fn():
        ensure_model_parallel_initialized(
            tensor_model_parallel_size=world_size,
            pipeline_model_parallel_size=1)

        ep_group = get_tp_group().cpu_group
        ep_rank = torch.distributed.get_rank()
        device = torch.device(f"cuda:{ep_rank}")

        num_layers, num_local_experts = 2, 2
        hidden_sizes = [32, 64]
        total_physical_experts = world_size * num_local_experts
        # Two physical slots per logical expert, i.e. some redundancy.
        num_logical_experts = total_physical_experts // 2

        # A single layout, used as both the old and the new mapping.
        indices = create_expert_indices_with_redundancy(
            num_layers, num_logical_experts, total_physical_experts,
            [2] * num_logical_experts)

        expert_weights = create_expert_weights(num_layers, num_local_experts,
                                               hidden_sizes, ep_rank, device,
                                               indices)

        # Snapshot of the weights before the call.
        original_weights = [[weight.clone() for weight in layer_weights]
                            for layer_weights in expert_weights]

        rearrange_expert_weights_inplace(
            indices,
            indices,  # identical mapping on both sides
            expert_weights,
            ep_group,
            is_profile=False)

        # Nothing may have moved.
        for layer, (current_layer, saved_layer) in enumerate(
                zip(expert_weights, original_weights)):
            for weight_idx, (current, saved) in enumerate(
                    zip(current_layer, saved_layer)):
                torch.testing.assert_close(
                    current,
                    saved,
                    msg=f"Layer {layer}, weight {weight_idx} should remain "
                    f"unchanged")

    distributed_run(worker_fn, world_size)
@pytest.mark.parametrize("world_size", [2, 4])
def test_rearrange_expert_weights_profile_mode(world_size):
    """With `is_profile=True` the rearrangement must not modify any weights."""
    if world_size > torch.cuda.device_count():
        pytest.skip(f"Need at least {world_size} GPUs to run the test")

    @worker_fn_wrapper
    def worker_fn():
        ensure_model_parallel_initialized(
            tensor_model_parallel_size=world_size,
            pipeline_model_parallel_size=1)

        ep_group = get_tp_group().cpu_group
        ep_rank = torch.distributed.get_rank()
        device = torch.device(f"cuda:{ep_rank}")

        num_layers, num_local_experts = 1, 2
        hidden_sizes = [32]
        total_physical_experts = world_size * num_local_experts
        num_logical_experts = total_physical_experts // 2

        # Two independently drawn redundancy configs (old first, then new).
        old_redundancy = create_redundancy_config(num_logical_experts,
                                                  total_physical_experts)
        new_redundancy = create_redundancy_config(num_logical_experts,
                                                  total_physical_experts)

        # Shuffled layouts built from them, again old before new.
        old_indices = create_expert_indices_with_redundancy(
            num_layers, num_logical_experts, total_physical_experts,
            old_redundancy)
        new_indices = create_expert_indices_with_redundancy(
            num_layers, num_logical_experts, total_physical_experts,
            new_redundancy)

        expert_weights = create_expert_weights(num_layers, num_local_experts,
                                               hidden_sizes, ep_rank, device,
                                               old_indices)

        # Snapshot of the weights before the call.
        original_weights = [[weight.clone() for weight in layer_weights]
                            for layer_weights in expert_weights]

        rearrange_expert_weights_inplace(
            old_indices,
            new_indices,
            expert_weights,
            ep_group,
            is_profile=True,  # profile mode
        )

        # Even though the layouts may differ, the weights must be unchanged.
        for current_layer, saved_layer in zip(expert_weights,
                                              original_weights):
            for current, saved in zip(current_layer, saved_layer):
                torch.testing.assert_close(
                    current,
                    saved,
                    msg="In profile mode, the weights should remain unchanged")

    distributed_run(worker_fn, world_size)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import os
import torch.distributed as dist
from vllm.distributed.parallel_state import _node_count
from vllm.distributed.utils import StatelessProcessGroup
from vllm.utils import get_ip, get_open_port
if __name__ == "__main__":
    # Script entry point: check `_node_count` for both a torch.distributed
    # group and a StatelessProcessGroup. The expected count comes from the
    # NUM_NODES environment variable (default 1).
    dist.init_process_group(backend="gloo")

    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Rank 0 chooses the endpoint for the stateless group and broadcasts it;
    # every other rank receives it into the placeholder list.
    endpoint = [None, None]
    if rank == 0:
        port = get_open_port()
        ip = get_ip()
        endpoint = [ip, port]
    dist.broadcast_object_list(endpoint, src=0)
    ip, port = endpoint

    stateless_pg = StatelessProcessGroup.create(ip, port, rank, world_size)

    for pg in [dist.group.WORLD, stateless_pg]:
        test_result = _node_count(pg)

        # Expected node count, taken from the environment.
        expected = int(os.environ.get("NUM_NODES", "1"))

        assert test_result == expected, \
            f"Expected {expected} nodes, got {test_result}"

        if pg == dist.group.WORLD:
            group_kind = "torch distributed"
        else:
            group_kind = "StatelessProcessGroup"
        print(f"Node count test passed! Got {test_result} nodes "
              f"when using {group_kind}!")
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import random
import pytest
import ray
import torch
import torch.distributed as dist
from vllm.distributed.communication_op import ( # noqa
tensor_model_parallel_all_reduce)
from vllm.distributed.parallel_state import (get_tensor_model_parallel_group,
get_tp_group, graph_capture)
from vllm.platforms import current_platform
from ..utils import (ensure_model_parallel_initialized,
init_test_distributed_environment, multi_process_parallel)
# Fix both RNG seeds at import time so every process draws the same values.
torch.manual_seed(42)
random.seed(44)

# Size over 8MB is sufficient for custom quick allreduce.
# Eight sizes are drawn from [8 MiB, 10 MiB] and each is rounded down to a
# multiple of 8.
test_sizes = [
    size - size % 8 for size in (random.randint(8 * 1024 * 1024,
                                                10 * 1024 * 1024)
                                 for _ in range(8))
]
@ray.remote(num_gpus=1, max_calls=1)
def graph_quickreduce(
    monkeypatch: pytest.MonkeyPatch,
    tp_size,
    pp_size,
    rank,
    distributed_init_port,
):
    """Compare graph-captured tensor-parallel all-reduce with `dist.all_reduce`.

    Runs as a ray remote task on one GPU. For every size in `test_sizes` and
    for float16/bfloat16, a CUDA graph is captured in which
    `tensor_model_parallel_all_reduce` and `dist.all_reduce` are applied to
    the same inputs; after replaying the graph the two results must agree
    within `atol=2.5, rtol=0.1`.
    """
    with monkeypatch.context() as m:
        m.delenv("CUDA_VISIBLE_DEVICES", raising=False)
        device = torch.device(f"cuda:{rank}")
        torch.cuda.set_device(device)
        init_test_distributed_environment(tp_size, pp_size, rank,
                                          distributed_init_port)
        ensure_model_parallel_initialized(tp_size, pp_size)
        group = get_tensor_model_parallel_group().device_group

        # Warm up with a tiny all_reduce. Device communicators (e.g. NCCL)
        # might be created lazily; doing one communication first ensures the
        # communicator is initialized, so the group can be used for graph
        # capture right away.
        warmup = torch.zeros(1).to(device=device)
        torch.distributed.all_reduce(warmup, group=group)
        torch.cuda.synchronize()
        del warmup

        # The first group communicates once, the second twice, and so on,
        # to demonstrate that each group can communicate independently.
        num_communication = rank // tp_size + 1

        for sz in test_sizes:
            for dtype in (torch.float16, torch.bfloat16):
                with graph_capture(device=device) as graph_capture_context:
                    inp1 = torch.randint(1,
                                         23, (sz, ),
                                         dtype=dtype,
                                         device=torch.cuda.current_device())
                    inp2 = torch.randint(-23,
                                         1, (sz, ),
                                         dtype=dtype,
                                         device=torch.cuda.current_device())
                    torch.cuda.synchronize()

                    graph = torch.cuda.CUDAGraph()
                    capture_stream = graph_capture_context.stream
                    with torch.cuda.graph(graph, stream=capture_stream):
                        for _ in range(num_communication):
                            out1 = tensor_model_parallel_all_reduce(inp1)
                            dist.all_reduce(inp1, group=group)
                            out2 = tensor_model_parallel_all_reduce(inp2)
                            dist.all_reduce(inp2, group=group)

                graph.replay()
                torch.testing.assert_close(out1, inp1, atol=2.5, rtol=0.1)
                torch.testing.assert_close(out2, inp2, atol=2.5, rtol=0.1)
@ray.remote(num_gpus=1, max_calls=1)
def eager_quickreduce(
    monkeypatch: pytest.MonkeyPatch,
    tp_size,
    pp_size,
    rank,
    distributed_init_port,
):
    """Check `quick_all_reduce` in eager mode for float16 and bfloat16.

    Runs as a ray remote task on one GPU. Every rank feeds the same input, so
    the reduced result is expected to be `inp * tp_size` within
    `atol=2.5, rtol=0.1`.
    """
    with monkeypatch.context() as m:
        m.delenv("CUDA_VISIBLE_DEVICES", raising=False)
        device = torch.device(f"cuda:{rank}")
        torch.cuda.set_device(device)

        init_test_distributed_environment(tp_size, pp_size, rank,
                                          distributed_init_port)

        # Size over 8MB is sufficient for custom quick allreduce.
        sz = 16 * 1024 * 1024
        fa = get_tp_group().device_communicator.qr_comm

        # The same repeating 0..22 pattern is used for both dtypes.
        pattern = [1.0 * (i % 23) for i in range(sz)]
        for dtype in (torch.float16, torch.bfloat16):
            inp = torch.tensor(pattern, dtype=dtype, device=device)
            out = fa.quick_all_reduce(inp)
            torch.testing.assert_close(out,
                                       inp * tp_size,
                                       atol=2.5,
                                       rtol=0.1)
@pytest.mark.skipif(not current_platform.is_rocm(),
                    reason="only test quick allreduce for rocm")
@pytest.mark.parametrize("quant_mode", ["FP", "INT8", "INT6", "INT4"])
@pytest.mark.parametrize("tp_size", [2])
@pytest.mark.parametrize("pipeline_parallel_size", [1, 2])
@pytest.mark.parametrize("test_target", [graph_quickreduce, eager_quickreduce])
def test_custom_quick_allreduce(monkeypatch: pytest.MonkeyPatch, tp_size,
                                pipeline_parallel_size, test_target,
                                quant_mode):
    """Run a quick-allreduce worker under each quantization mode.

    ROCm only. The test is skipped when fewer than
    `tp_size * pipeline_parallel_size` GPUs are present. The quantization
    mode is passed through `VLLM_ROCM_QUICK_REDUCE_QUANTIZATION`.
    """
    required_gpus = tp_size * pipeline_parallel_size
    if torch.cuda.device_count() < required_gpus:
        pytest.skip("Not enough GPUs to run the test.")

    monkeypatch.setenv("VLLM_ROCM_QUICK_REDUCE_QUANTIZATION", quant_mode)

    multi_process_parallel(monkeypatch, tp_size, pipeline_parallel_size,
                           test_target)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment