Commit 469e903b authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge tag 'v0.8.2' into v0.8.2-dev

parents 389ebcf7 25f560a6
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from typing import Callable, Iterable, Optional from collections.abc import Iterable
from typing import Callable, Optional
import pytest import pytest
......
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import random import random
from typing import List
import pytest import pytest
import os import os
from tests.kernels.utils import override_backend_env_variable from tests.kernels.utils import override_backend_env_variable
from vllm import LLM, SamplingParams from vllm import LLM, SamplingParams
from vllm.platforms import current_platform
from .conftest import get_text_from_llm_generator from .conftest import get_text_from_llm_generator
from ....utils import models_path_prefix from ....utils import models_path_prefix
...@@ -44,6 +44,11 @@ def test_sliding_window_retrival(baseline_llm_generator, test_llm_generator, ...@@ -44,6 +44,11 @@ def test_sliding_window_retrival(baseline_llm_generator, test_llm_generator,
Additionally, we compare the results of the v1 and v2 managers. Additionally, we compare the results of the v1 and v2 managers.
""" """
if backend == "FLASHINFER" and current_platform.is_rocm():
pytest.skip("Flashinfer does not support ROCm/HIP.")
if backend == "XFORMERS" and current_platform.is_rocm():
pytest.skip("Xformers does not support ROCm/HIP.")
override_backend_env_variable(monkeypatch, backend) override_backend_env_variable(monkeypatch, backend)
sampling_params = SamplingParams( sampling_params = SamplingParams(
...@@ -103,6 +108,10 @@ def test_sliding_window_chunked_prefill(test_llm_generator, batch_size, seed, ...@@ -103,6 +108,10 @@ def test_sliding_window_chunked_prefill(test_llm_generator, batch_size, seed,
The results with and without chunked prefill are not the same due to The results with and without chunked prefill are not the same due to
numerical instabilities. numerical instabilities.
""" """
if backend == "FLASHINFER" and current_platform.is_rocm():
pytest.skip("Flashinfer does not support ROCm/HIP.")
if backend == "XFORMERS" and current_platform.is_rocm():
pytest.skip("Xformers does not support ROCm/HIP.")
override_backend_env_variable(monkeypatch, backend) override_backend_env_variable(monkeypatch, backend)
sampling_params = SamplingParams( sampling_params = SamplingParams(
...@@ -129,9 +138,9 @@ def prep_prompts(batch_size: int): ...@@ -129,9 +138,9 @@ def prep_prompts(batch_size: int):
The prompt is just under 10k tokens; sliding window is 4k The prompt is just under 10k tokens; sliding window is 4k
so the answer is outside sliding window, but should still be correct. so the answer is outside sliding window, but should still be correct.
""" """
prompts: List[str] = [] prompts: list[str] = []
answer: List[int] = [] answer: list[int] = []
indices: List[int] = [] indices: list[int] = []
random.seed(1) random.seed(1)
for _ in range(batch_size): for _ in range(batch_size):
idx = random.randint(30, 90) idx = random.randint(30, 90)
...@@ -150,7 +159,7 @@ def prep_prompts(batch_size: int): ...@@ -150,7 +159,7 @@ def prep_prompts(batch_size: int):
return prompts, answer, indices return prompts, answer, indices
def check_answers(indices: List[int], answer: List[int], outputs: List[str]): def check_answers(indices: list[int], answer: list[int], outputs: list[str]):
answer2 = [int(text[0:2].strip()) for text in outputs] answer2 = [int(text[0:2].strip()) for text in outputs]
print(list(zip(indices, zip(answer, answer2)))) print(list(zip(indices, zip(answer, answer2))))
numok = 0 numok = 0
...@@ -162,7 +171,7 @@ def check_answers(indices: List[int], answer: List[int], outputs: List[str]): ...@@ -162,7 +171,7 @@ def check_answers(indices: List[int], answer: List[int], outputs: List[str]):
assert frac_ok > 0.7 assert frac_ok > 0.7
def check_window(prompts: List[str]): def check_window(prompts: list[str]):
def inner(llm: LLM): def inner(llm: LLM):
sliding_window = llm.llm_engine.model_config.get_sliding_window() sliding_window = llm.llm_engine.model_config.get_sliding_window()
......
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from typing import List
import pytest import pytest
from vllm.core.block.block_table import BlockTable from vllm.core.block.block_table import BlockTable
...@@ -32,7 +30,7 @@ def test_allocate_naive(block_size: int, sequence_len: int): ...@@ -32,7 +30,7 @@ def test_allocate_naive(block_size: int, sequence_len: int):
token_ids = list(range(sequence_len)) token_ids = list(range(sequence_len))
num_blocks_per_alloc = len(list(chunk_list(token_ids, block_size))) num_blocks_per_alloc = len(list(chunk_list(token_ids, block_size)))
block_tables: List[BlockTable] = [] block_tables: list[BlockTable] = []
for i in range(5): for i in range(5):
assert allocator.get_num_free_blocks( assert allocator.get_num_free_blocks(
device=Device.GPU) == num_gpu_blocks - i * num_blocks_per_alloc device=Device.GPU) == num_gpu_blocks - i * num_blocks_per_alloc
...@@ -77,7 +75,7 @@ def test_allocate_prefix_caching(block_size: int, sequence_len: int): ...@@ -77,7 +75,7 @@ def test_allocate_prefix_caching(block_size: int, sequence_len: int):
num_immutable_blocks_per_alloc = len( num_immutable_blocks_per_alloc = len(
chunked_tokens) - num_mutable_blocks_per_alloc chunked_tokens) - num_mutable_blocks_per_alloc
block_tables: List[BlockTable] = [] block_tables: list[BlockTable] = []
for alloc_i in range(1, 6): for alloc_i in range(1, 6):
block_tables.append( block_tables.append(
...@@ -272,7 +270,7 @@ def test_append_token_ids_correct_content(block_size: int, sequence_len: int, ...@@ -272,7 +270,7 @@ def test_append_token_ids_correct_content(block_size: int, sequence_len: int,
) )
block_table.allocate(token_ids=token_ids, device=Device.GPU) block_table.allocate(token_ids=token_ids, device=Device.GPU)
appended_so_far: List[int] = [] appended_so_far: list[int] = []
for append in chunk_list(token_ids_to_append, append_size): for append in chunk_list(token_ids_to_append, append_size):
block_table.append_token_ids(append) block_table.append_token_ids(append)
appended_so_far.extend(append) appended_so_far.extend(append)
......
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from typing import List, Optional from typing import Optional
import pytest import pytest
...@@ -14,7 +14,7 @@ class TestNaiveBlockAllocator: ...@@ -14,7 +14,7 @@ class TestNaiveBlockAllocator:
def create_allocate_lambda(allocate_type: str, def create_allocate_lambda(allocate_type: str,
allocator: NaiveBlockAllocator, allocator: NaiveBlockAllocator,
prev_block: Optional[Block], prev_block: Optional[Block],
token_ids: List[int]): token_ids: list[int]):
if allocate_type == "immutable": if allocate_type == "immutable":
allocate_block = lambda: allocator.allocate_immutable_block( allocate_block = lambda: allocator.allocate_immutable_block(
prev_block=prev_block, token_ids=token_ids) prev_block=prev_block, token_ids=token_ids)
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
import math import math
import random import random
from typing import List, Optional from typing import Optional
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest import pytest
...@@ -123,11 +123,11 @@ class TestPrefixCachingBlock: ...@@ -123,11 +123,11 @@ class TestPrefixCachingBlock:
@staticmethod @staticmethod
def create_chain(block_size: int, def create_chain(block_size: int,
token_ids: List[int], token_ids: list[int],
num_empty_trailing_blocks=0) -> List[PrefixCachingBlock]: num_empty_trailing_blocks=0) -> list[PrefixCachingBlock]:
"""Helper method which creates a chain of blocks. """Helper method which creates a chain of blocks.
""" """
blocks: List[PrefixCachingBlock] = [] blocks: list[PrefixCachingBlock] = []
num_blocks = math.ceil( num_blocks = math.ceil(
len(token_ids) / block_size) + num_empty_trailing_blocks len(token_ids) / block_size) + num_empty_trailing_blocks
...@@ -161,7 +161,7 @@ class TestPrefixCachingBlockAllocator: ...@@ -161,7 +161,7 @@ class TestPrefixCachingBlockAllocator:
@staticmethod @staticmethod
def create_allocate_lambda(allocate_type: str, allocator: BlockAllocator, def create_allocate_lambda(allocate_type: str, allocator: BlockAllocator,
prev_block: Optional[Block], prev_block: Optional[Block],
token_ids: List[int]): token_ids: list[int]):
if allocate_type == "immutable": if allocate_type == "immutable":
allocate_block = lambda: allocator.allocate_immutable_block( allocate_block = lambda: allocator.allocate_immutable_block(
prev_block=prev_block, token_ids=token_ids) prev_block=prev_block, token_ids=token_ids)
...@@ -839,13 +839,13 @@ class TestPrefixCachingBlockAllocator: ...@@ -839,13 +839,13 @@ class TestPrefixCachingBlockAllocator:
@staticmethod @staticmethod
def create_immutable_chain( def create_immutable_chain(
block_size: int, block_size: int,
token_ids: List[int], token_ids: list[int],
allocator: PrefixCachingBlockAllocator, allocator: PrefixCachingBlockAllocator,
extra_hash: Optional[int] = None, extra_hash: Optional[int] = None,
) -> List[PrefixCachingBlock]: ) -> list[PrefixCachingBlock]:
"""Helper method which creates a chain of blocks. """Helper method which creates a chain of blocks.
""" """
blocks: List[Block] = [] blocks: list[Block] = []
num_blocks = math.ceil(len(token_ids) / block_size) num_blocks = math.ceil(len(token_ids) / block_size)
if num_blocks == 0: if num_blocks == 0:
......
# SPDX-License-Identifier: Apache-2.0
import pytest
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
    """Pin every test in this module to the V0 engine.

    The module is V0 only, so VLLM_USE_V1 is set to 0 for each test.
    """
    env_name, env_value = 'VLLM_USE_V1', '0'
    monkeypatch.setenv(env_name, env_value)
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from typing import List
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest # noqa import pytest # noqa
...@@ -46,7 +45,7 @@ def test_simple(): ...@@ -46,7 +45,7 @@ def test_simple():
cache_config.num_cpu_blocks = 8 cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8 cache_config.num_gpu_blocks = 8
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
for i in range(num_seq_group): for i in range(num_seq_group):
...@@ -93,7 +92,7 @@ def test_chunk(): ...@@ -93,7 +92,7 @@ def test_chunk():
cache_config.num_cpu_blocks = 32 cache_config.num_cpu_blocks = 32
cache_config.num_gpu_blocks = 32 cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
for i in range(2): for i in range(2):
...@@ -145,7 +144,7 @@ def test_concurrent_chunking(): ...@@ -145,7 +144,7 @@ def test_concurrent_chunking():
cache_config.num_cpu_blocks = 32 cache_config.num_cpu_blocks = 32
cache_config.num_gpu_blocks = 32 cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
for i in range(2): for i in range(2):
...@@ -226,8 +225,8 @@ def test_short_prompts_jump_long_prompts_in_queue(): ...@@ -226,8 +225,8 @@ def test_short_prompts_jump_long_prompts_in_queue():
cache_config.num_cpu_blocks = 3200 # large KV cache size for large requests cache_config.num_cpu_blocks = 3200 # large KV cache size for large requests
cache_config.num_gpu_blocks = 3200 cache_config.num_gpu_blocks = 3200
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
long_seqs: List[SequenceGroup] = [] long_seqs: list[SequenceGroup] = []
short_seqs: List[SequenceGroup] = [] short_seqs: list[SequenceGroup] = []
# Add 2 large seq groups to scheduler. # Add 2 large seq groups to scheduler.
for i in range(2): for i in range(2):
...@@ -368,7 +367,7 @@ def test_complex(): ...@@ -368,7 +367,7 @@ def test_complex():
cache_config.num_cpu_blocks = 64 cache_config.num_cpu_blocks = 64
cache_config.num_gpu_blocks = 64 cache_config.num_gpu_blocks = 64
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
for i in range(2): for i in range(2):
...@@ -439,7 +438,7 @@ def test_maximal_decoding(): ...@@ -439,7 +438,7 @@ def test_maximal_decoding():
cache_config.num_cpu_blocks = 8 cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8 cache_config.num_gpu_blocks = 8
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
for i in range(2): for i in range(2):
...@@ -533,7 +532,7 @@ def test_prompt_limit(): ...@@ -533,7 +532,7 @@ def test_prompt_limit():
cache_config.num_cpu_blocks = 16 cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16 cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
_, seq_group = create_dummy_prompt("1", _, seq_group = create_dummy_prompt("1",
prompt_length=48, prompt_length=48,
...@@ -565,7 +564,7 @@ def test_prompt_limit_exceed(): ...@@ -565,7 +564,7 @@ def test_prompt_limit_exceed():
cache_config.num_cpu_blocks = 16 cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16 cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
_, seq_group = create_dummy_prompt("2", _, seq_group = create_dummy_prompt("2",
prompt_length=48, prompt_length=48,
block_size=block_size) block_size=block_size)
...@@ -699,7 +698,7 @@ def test_chunked_prefill_max_seqs(): ...@@ -699,7 +698,7 @@ def test_chunked_prefill_max_seqs():
cache_config.num_cpu_blocks = 128 cache_config.num_cpu_blocks = 128
cache_config.num_gpu_blocks = 128 cache_config.num_gpu_blocks = 128
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
_, seq_group = create_dummy_prompt("1", _, seq_group = create_dummy_prompt("1",
prompt_length=65, prompt_length=65,
...@@ -758,7 +757,7 @@ def test_prefix_caching(): ...@@ -758,7 +757,7 @@ def test_prefix_caching():
cache_config.num_cpu_blocks = 0 cache_config.num_cpu_blocks = 0
cache_config.num_gpu_blocks = 32 cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
for i in range(2): for i in range(2):
...@@ -800,7 +799,7 @@ def test_prefix_caching_with_concurrent_partial_prefills(): ...@@ -800,7 +799,7 @@ def test_prefix_caching_with_concurrent_partial_prefills():
cache_config.num_cpu_blocks = 0 cache_config.num_cpu_blocks = 0
cache_config.num_gpu_blocks = 32 cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
for i in range(2): for i in range(2):
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
import time import time
from collections import deque from collections import deque
from typing import List, Set, Tuple
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest # noqa import pytest # noqa
...@@ -57,7 +56,7 @@ def test_scheduler_abort_seq_group(): ...@@ -57,7 +56,7 @@ def test_scheduler_abort_seq_group():
# Add multiple seq groups to scheduler. # Add multiple seq groups to scheduler.
num_seq_group = 4 num_seq_group = 4
request_ids: Set[str] = set() request_ids: set[str] = set()
for i in range(num_seq_group): for i in range(num_seq_group):
_, seq_group = create_dummy_prompt(str(i), block_size) _, seq_group = create_dummy_prompt(str(i), block_size)
scheduler.add_seq_group(seq_group) scheduler.add_seq_group(seq_group)
...@@ -83,7 +82,7 @@ def test_scheduler_schedule_simple(): ...@@ -83,7 +82,7 @@ def test_scheduler_schedule_simple():
cache_config.num_cpu_blocks = 8 cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8 cache_config.num_gpu_blocks = 8
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
for i in range(num_seq_group): for i in range(num_seq_group):
...@@ -221,7 +220,7 @@ def test_scheduler_max_seqs(): ...@@ -221,7 +220,7 @@ def test_scheduler_max_seqs():
cache_config.num_gpu_blocks = 8 cache_config.num_gpu_blocks = 8
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
all_seq_groups: List[SequenceGroup] = [] all_seq_groups: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
for i in range(num_seq_group): for i in range(num_seq_group):
_, seq_group = create_dummy_prompt(str(i), _, seq_group = create_dummy_prompt(str(i),
...@@ -480,7 +479,7 @@ def test_prefill_schedule_max_lora(): ...@@ -480,7 +479,7 @@ def test_prefill_schedule_max_lora():
num_cpu_blocks=64, num_cpu_blocks=64,
num_gpu_blocks=64) num_gpu_blocks=64)
budget = create_token_budget(token_budget=120) budget = create_token_budget(token_budget=120)
curr_loras: Set[int] = set() curr_loras: set[int] = set()
for i in range(2): for i in range(2):
_, seq_group = create_dummy_prompt(str(i), _, seq_group = create_dummy_prompt(str(i),
prompt_length=60, prompt_length=60,
...@@ -491,7 +490,7 @@ def test_prefill_schedule_max_lora(): ...@@ -491,7 +490,7 @@ def test_prefill_schedule_max_lora():
lora_path="abc")) lora_path="abc"))
scheduler.add_seq_group(seq_group) scheduler.add_seq_group(seq_group)
# Add two more requests to verify lora is prioritized. # Add two more requests to verify lora is prioritized.
# 0: Lora, 1: Lora, 2: regular, 3: regular # 0: LoRA, 1: LoRA, 2: regular, 3: regular
# In the first iteration, index 0, 2 is scheduled. # In the first iteration, index 0, 2 is scheduled.
# If a request is not scheduled because it hits max lora, it is # If a request is not scheduled because it hits max lora, it is
# prioritized. Verify that. # prioritized. Verify that.
...@@ -618,7 +617,6 @@ def test_schedule_decode_blocks_to_copy_update(): ...@@ -618,7 +617,6 @@ def test_schedule_decode_blocks_to_copy_update():
num_gpu_blocks=16) num_gpu_blocks=16)
_, seq_group = create_dummy_prompt("1", _, seq_group = create_dummy_prompt("1",
prompt_length=60, prompt_length=60,
best_of=2,
block_size=block_size) block_size=block_size)
curr_loras = None curr_loras = None
scheduler._allocate_and_set_running(seq_group) scheduler._allocate_and_set_running(seq_group)
...@@ -651,8 +649,8 @@ def test_schedule_swapped_max_loras(): ...@@ -651,8 +649,8 @@ def test_schedule_swapped_max_loras():
block_size=block_size, block_size=block_size,
num_cpu_blocks=32, num_cpu_blocks=32,
num_gpu_blocks=32) num_gpu_blocks=32)
curr_loras: Set[int] = set() curr_loras: set[int] = set()
blocks_to_swap_out: List[Tuple[int, int]] = [] blocks_to_swap_out: list[tuple[int, int]] = []
for i in range(2): for i in range(2):
_, seq_group = create_dummy_prompt(str(i), _, seq_group = create_dummy_prompt(str(i),
prompt_length=60, prompt_length=60,
...@@ -683,11 +681,10 @@ def test_schedule_swapped_cannot_swap_in(): ...@@ -683,11 +681,10 @@ def test_schedule_swapped_cannot_swap_in():
num_cpu_blocks=32, num_cpu_blocks=32,
num_gpu_blocks=32) num_gpu_blocks=32)
curr_loras = None curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = [] blocks_to_swap_out: list[tuple[int, int]] = []
for i in range(2): for i in range(2):
_, seq_group = create_dummy_prompt(str(i), _, seq_group = create_dummy_prompt(str(i),
prompt_length=60, prompt_length=60,
best_of=2,
block_size=block_size) block_size=block_size)
scheduler._allocate_and_set_running(seq_group) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
...@@ -714,11 +711,10 @@ def test_infeasible_swap(): ...@@ -714,11 +711,10 @@ def test_infeasible_swap():
num_cpu_blocks=32, num_cpu_blocks=32,
num_gpu_blocks=32) num_gpu_blocks=32)
curr_loras = None curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = [] blocks_to_swap_out: list[tuple[int, int]] = []
for i in range(2): for i in range(2):
_, seq_group = create_dummy_prompt(str(i), _, seq_group = create_dummy_prompt(str(i),
prompt_length=60, prompt_length=60,
best_of=2,
block_size=block_size) block_size=block_size)
scheduler._allocate_and_set_running(seq_group) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
...@@ -748,11 +744,10 @@ def test_schedule_swapped_blocks_to_copy(): ...@@ -748,11 +744,10 @@ def test_schedule_swapped_blocks_to_copy():
curr_loras = None curr_loras = None
_, seq_group = create_dummy_prompt("1", _, seq_group = create_dummy_prompt("1",
prompt_length=60, prompt_length=60,
best_of=2,
block_size=block_size) block_size=block_size)
scheduler._allocate_and_set_running(seq_group) scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1) append_new_token_seq_group(60, seq_group, 1)
blocks_to_swap_out: List[Tuple[int, int]] = [] blocks_to_swap_out: list[tuple[int, int]] = []
scheduler._swap_out(seq_group, blocks_to_swap_out) scheduler._swap_out(seq_group, blocks_to_swap_out)
scheduler._add_seq_group_to_swapped(seq_group) scheduler._add_seq_group_to_swapped(seq_group)
......
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
from typing import List
import pytest # noqa import pytest # noqa
from vllm.config import CacheConfig, SchedulerConfig from vllm.config import CacheConfig, SchedulerConfig
...@@ -48,7 +46,7 @@ def test_scheduler_schedule_simple_encoder_decoder(): ...@@ -48,7 +46,7 @@ def test_scheduler_schedule_simple_encoder_decoder():
cache_config.num_cpu_blocks = 16 # enc and dec prompts per seq_group cache_config.num_cpu_blocks = 16 # enc and dec prompts per seq_group
cache_config.num_gpu_blocks = 16 # enc and dec prompts per seq_group cache_config.num_gpu_blocks = 16 # enc and dec prompts per seq_group
scheduler = Scheduler(scheduler_config, cache_config, None) scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = [] running: list[SequenceGroup] = []
# Add seq groups to scheduler. # Add seq groups to scheduler.
req_id_list = [] req_id_list = []
......
...@@ -2,9 +2,8 @@ ...@@ -2,9 +2,8 @@
import time import time
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, List, Optional from collections.abc import Sequence as GenericSequence
from typing import Sequence as GenericSequence from typing import Any, Optional
from typing import Tuple
from vllm import SamplingParams from vllm import SamplingParams
from vllm.core.scheduler import Scheduler, SchedulerOutputs from vllm.core.scheduler import Scheduler, SchedulerOutputs
...@@ -19,11 +18,10 @@ def create_dummy_prompt( ...@@ -19,11 +18,10 @@ def create_dummy_prompt(
prompt_length: int = -1, prompt_length: int = -1,
block_size: Optional[int] = None, block_size: Optional[int] = None,
lora_request: Optional[LoRARequest] = None, lora_request: Optional[LoRARequest] = None,
best_of: int = 1, prompt_tokens: Optional[list[int]] = None,
prompt_tokens: Optional[List[int]] = None,
min_tokens: int = 0, min_tokens: int = 0,
max_tokens: int = 16, max_tokens: int = 16,
) -> Tuple[Sequence, SequenceGroup]: ) -> tuple[Sequence, SequenceGroup]:
if not block_size: if not block_size:
block_size = prompt_length block_size = prompt_length
...@@ -33,22 +31,24 @@ def create_dummy_prompt( ...@@ -33,22 +31,24 @@ def create_dummy_prompt(
prompt_tokens = list(range(prompt_length)) prompt_tokens = list(range(prompt_length))
prompt_str = " ".join([str(t) for t in prompt_tokens]) prompt_str = " ".join([str(t) for t in prompt_tokens])
prompt = Sequence(int(request_id), prompt = Sequence(
inputs=token_inputs(prompt_tokens, prompt=prompt_str), int(request_id),
block_size=block_size) inputs=token_inputs(prompt_tokens, prompt=prompt_str),
seq_group = SequenceGroup(request_id=request_id, block_size=block_size,
seqs=[prompt], )
arrival_time=time.time(), seq_group = SequenceGroup(
sampling_params=SamplingParams( request_id=request_id,
best_of=best_of, seqs=[prompt],
max_tokens=max_tokens, arrival_time=time.time(),
min_tokens=min_tokens), sampling_params=SamplingParams(max_tokens=max_tokens,
lora_request=lora_request) min_tokens=min_tokens),
lora_request=lora_request,
)
return prompt, seq_group return prompt, seq_group
def create_dummy_lora_sequence(request_id: int, token_ids: List[int], def create_dummy_lora_sequence(request_id: int, token_ids: list[int],
block_size: int, lora_int_id: int) -> Sequence: block_size: int, lora_int_id: int) -> Sequence:
return Sequence(seq_id=request_id, return Sequence(seq_id=request_id,
inputs=token_inputs(token_ids), inputs=token_inputs(token_ids),
...@@ -58,7 +58,7 @@ def create_dummy_lora_sequence(request_id: int, token_ids: List[int], ...@@ -58,7 +58,7 @@ def create_dummy_lora_sequence(request_id: int, token_ids: List[int],
lora_int_id=lora_int_id)) lora_int_id=lora_int_id))
def create_dummy_sequence(request_id: int, token_ids: List[int], def create_dummy_sequence(request_id: int, token_ids: list[int],
block_size: int) -> Sequence: block_size: int) -> Sequence:
return Sequence( return Sequence(
seq_id=request_id, seq_id=request_id,
...@@ -73,8 +73,7 @@ def create_dummy_prompt_encoder_decoder( ...@@ -73,8 +73,7 @@ def create_dummy_prompt_encoder_decoder(
encoder_prompt_length: int, encoder_prompt_length: int,
block_size: Optional[int] = None, block_size: Optional[int] = None,
lora_request: Optional[LoRARequest] = None, lora_request: Optional[LoRARequest] = None,
best_of: int = 1, ) -> tuple[Sequence, Sequence, SequenceGroup]:
) -> Tuple[Sequence, Sequence, SequenceGroup]:
if not block_size: if not block_size:
block_size = decoder_prompt_length block_size = decoder_prompt_length
...@@ -103,7 +102,6 @@ def create_dummy_prompt_encoder_decoder( ...@@ -103,7 +102,6 @@ def create_dummy_prompt_encoder_decoder(
seq_group = SequenceGroup(request_id=request_id, seq_group = SequenceGroup(request_id=request_id,
seqs=[decoder_prompt], seqs=[decoder_prompt],
sampling_params=SamplingParams(best_of=best_of),
arrival_time=time.time(), arrival_time=time.time(),
lora_request=lora_request, lora_request=lora_request,
encoder_seq=encoder_prompt) encoder_seq=encoder_prompt)
...@@ -125,7 +123,7 @@ def create_seq_group( ...@@ -125,7 +123,7 @@ def create_seq_group(
prompt_token_ids = [0] * seq_prompt_len prompt_token_ids = [0] * seq_prompt_len
seqs: List[Sequence] = [] seqs: list[Sequence] = []
for seq_id_offset, output_len in enumerate(seq_output_lens): for seq_id_offset, output_len in enumerate(seq_output_lens):
seq = Sequence( seq = Sequence(
seq_id=seq_id_start + seq_id_offset, seq_id=seq_id_start + seq_id_offset,
...@@ -241,7 +239,7 @@ class SchedulerProxy: ...@@ -241,7 +239,7 @@ class SchedulerProxy:
def __init__(self, scheduler: Scheduler): def __init__(self, scheduler: Scheduler):
self.scheduler_ = scheduler self.scheduler_ = scheduler
self.call_history: Dict[str, List[Any]] = defaultdict(list) self.call_history: dict[str, list[Any]] = defaultdict(list)
def __getattr__(self, name: str) -> Any: def __getattr__(self, name: str) -> Any:
...@@ -253,6 +251,6 @@ class SchedulerProxy: ...@@ -253,6 +251,6 @@ class SchedulerProxy:
return wrapper return wrapper
def last_schedule_ret( def last_schedule_ret(
self, ) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs, Any]: self, ) -> tuple[list[SequenceGroupMetadata], SchedulerOutputs, Any]:
_, _, ret = self.call_history["schedule"][-1] _, _, ret = self.call_history["schedule"][-1]
return ret return ret
# SPDX-License-Identifier: Apache-2.0
import pytest
@pytest.fixture(autouse=True)
def v1(run_with_both_engines):
    """Autouse wrapper that runs each test with both engines.

    Requesting ``run_with_both_engines`` is the only thing this fixture
    does. It can be promoted up to conftest.py to run for every test in a
    package.
    """
    return None
...@@ -2,15 +2,12 @@ ...@@ -2,15 +2,12 @@
import pytest import pytest
from vllm.config import LoadFormat
from vllm.entrypoints.llm import LLM from vllm.entrypoints.llm import LLM
from vllm.sampling_params import SamplingParams from vllm.sampling_params import SamplingParams
from ..conftest import MODEL_WEIGHTS_S3_BUCKET
@pytest.mark.skip_v1
@pytest.mark.parametrize("model", @pytest.mark.parametrize("model", ["distilbert/distilgpt2"])
[f"{MODEL_WEIGHTS_S3_BUCKET}/distilbert/distilgpt2"])
def test_computed_prefix_blocks(model: str): def test_computed_prefix_blocks(model: str):
# This test checks if the engine generates completions both with and # This test checks if the engine generates completions both with and
# without optional detokenization, that detokenization includes text # without optional detokenization, that detokenization includes text
...@@ -21,7 +18,7 @@ def test_computed_prefix_blocks(model: str): ...@@ -21,7 +18,7 @@ def test_computed_prefix_blocks(model: str):
"paper clips? Is there an easy to follow video tutorial available " "paper clips? Is there an easy to follow video tutorial available "
"online for free?") "online for free?")
llm = LLM(model=model, load_format=LoadFormat.RUNAI_STREAMER) llm = LLM(model=model)
sampling_params = SamplingParams(max_tokens=10, sampling_params = SamplingParams(max_tokens=10,
temperature=0.0, temperature=0.0,
detokenize=False) detokenize=False)
......
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Optional
import pytest
from vllm import LLM, SamplingParams, envs
MODEL = "meta-llama/llama-2-7b-hf"
MAX_TOKENS = 200
def _test_stopping(llm: LLM,
                   expected_output: str,
                   expected_reason: Any,
                   stop: Optional[list[str]] = None,
                   stop_token_ids: Optional[list[int]] = None,
                   include_in_output: bool = False) -> None:
    """Generate from a fixed prompt and check the text and stop reason.

    Args:
        llm: Engine used to generate the completion.
        expected_output: Exact text the first completion must produce.
        expected_reason: Expected ``stop_reason`` of the first completion.
        stop: Stop strings forwarded to ``SamplingParams``.
        stop_token_ids: Stop token ids forwarded to ``SamplingParams``.
        include_in_output: Forwarded as ``include_stop_str_in_output``.

    Raises:
        AssertionError: If the completion is missing or does not match.
    """
    # Greedy decoding (temperature 0) so the expected text is comparable.
    params = SamplingParams(
        temperature=0.0,
        max_tokens=MAX_TOKENS,
        stop=stop,
        stop_token_ids=stop_token_ids,
        include_stop_str_in_output=include_in_output,
    )
    request_outputs = llm.generate("A story about vLLM:\n", params)
    completion = request_outputs[0].outputs[0]

    assert completion is not None
    assert completion.text == expected_output
    assert completion.stop_reason == expected_reason
def _set_async_mode(llm, is_async):
llm.llm_engine.scheduler[0].use_async_output_proc = is_async
def _stop_basic(llm):
    """Check a one-character stop string, without and then with it in the output."""
    # (include_in_output, expected text) in the order the checks must run.
    cases = (
        (False, "VLLM is a 100% volunteer organization"),
        (True, "VLLM is a 100% volunteer organization."),
    )
    for include_stop, expected_text in cases:
        _test_stopping(llm,
                       expected_output=expected_text,
                       expected_reason=".",
                       stop=["."],
                       include_in_output=include_stop)
def _stop_multi_tokens(llm):
    """Check stop strings that span several tokens; the first listed one matches."""
    # (include_in_output, expected text) in the order the checks must run.
    cases = (
        (False, "VLLM is a 100% volunteer organization. We are a "),
        (True,
         "VLLM is a 100% volunteer organization. We are a group of peo"),
    )
    for include_stop, expected_text in cases:
        _test_stopping(llm,
                       expected_output=expected_text,
                       expected_reason="group of peo",
                       stop=["group of peo", "short"],
                       include_in_output=include_stop)
def _stop_partial_token(llm):
    """Check the stop string "gani", which ends inside the word "organization"."""
    # (include_in_output, expected_output)
    cases = (
        (False, "VLLM is a 100% volunteer or"),
        (True, "VLLM is a 100% volunteer organi"),
    )
    for keep_stop, text in cases:
        _test_stopping(llm,
                       stop=["gani"],
                       include_in_output=keep_stop,
                       expected_output=text,
                       expected_reason="gani")
def _stop_token_id(llm):
    """Check stopping on a token id, with the stop token excluded, then kept."""
    # token id 13013 => " organization"
    stop_id = 13013
    # (include_in_output, expected_output)
    cases = (
        (False, "VLLM is a 100% volunteer"),
        (True, "VLLM is a 100% volunteer organization"),
    )
    for keep_stop, text in cases:
        _test_stopping(llm,
                       stop_token_ids=[stop_id],
                       include_in_output=keep_stop,
                       expected_output=text,
                       expected_reason=stop_id)
@pytest.mark.skip_global_cleanup
def test_stop_strings():
    """Run every stop-condition check against one shared ``LLM`` instance.

    When ``envs.VLLM_USE_V1`` is set, each check runs once and the
    token-id check is skipped. Otherwise each check runs twice: first with
    async output processing enabled, then with it disabled.
    """
    # If V0, must set enforce_eager=False since we use
    # async output processing below.
    vllm_model = LLM(MODEL, enforce_eager=envs.VLLM_USE_V1)

    # (check, runs_on_v1)
    checks = (
        (_stop_basic, True),
        (_stop_multi_tokens, True),
        (_stop_partial_token, True),
        # FIXME: on V1 this does not respect include_in_output=False,
        # so the token-id check is only run on V0.
        (_stop_token_id, False),
    )

    for check, runs_on_v1 in checks:
        # NOTE(review): the flag is looked up after the LLM is built and on
        # every iteration, exactly as the unrolled version did; it is not
        # cached up front in case engine construction updates it - confirm.
        if envs.VLLM_USE_V1:
            if runs_on_v1:
                check(vllm_model)
            continue

        for is_async in (True, False):
            _set_async_mode(vllm_model, is_async)
            check(vllm_model)
...@@ -3,7 +3,10 @@ ...@@ -3,7 +3,10 @@
Run `pytest tests/distributed/test_comm_ops.py`. Run `pytest tests/distributed/test_comm_ops.py`.
""" """
import os
from __future__ import annotations
from typing import Any, Callable
import pytest import pytest
import ray import ray
...@@ -17,12 +20,18 @@ from ..utils import init_test_distributed_environment, multi_process_parallel ...@@ -17,12 +20,18 @@ from ..utils import init_test_distributed_environment, multi_process_parallel
@ray.remote(num_gpus=1, max_calls=1) @ray.remote(num_gpus=1, max_calls=1)
def all_reduce_test_worker(tp_size: int, pp_size: int, rank: int, def all_reduce_test_worker(
distributed_init_port: str): monkeypatch: pytest.MonkeyPatch,
tp_size: int,
pp_size: int,
rank: int,
distributed_init_port: str,
):
# it is important to delete the CUDA_VISIBLE_DEVICES environment variable # it is important to delete the CUDA_VISIBLE_DEVICES environment variable
# so that each worker can see all the GPUs # so that each worker can see all the GPUs
# they will be able to set the device to the correct GPU # they will be able to set the device to the correct GPU
os.environ.pop("CUDA_VISIBLE_DEVICES", None) monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
device = torch.device(f"cuda:{rank}") device = torch.device(f"cuda:{rank}")
torch.cuda.set_device(device) torch.cuda.set_device(device)
init_test_distributed_environment(tp_size, pp_size, rank, init_test_distributed_environment(tp_size, pp_size, rank,
...@@ -39,12 +48,17 @@ def all_reduce_test_worker(tp_size: int, pp_size: int, rank: int, ...@@ -39,12 +48,17 @@ def all_reduce_test_worker(tp_size: int, pp_size: int, rank: int,
@ray.remote(num_gpus=1, max_calls=1) @ray.remote(num_gpus=1, max_calls=1)
def all_gather_test_worker(tp_size: int, pp_size: int, rank: int, def all_gather_test_worker(
distributed_init_port: str): monkeypatch: pytest.MonkeyPatch,
tp_size: int,
pp_size: int,
rank: int,
distributed_init_port: str,
):
# it is important to delete the CUDA_VISIBLE_DEVICES environment variable # it is important to delete the CUDA_VISIBLE_DEVICES environment variable
# so that each worker can see all the GPUs # so that each worker can see all the GPUs
# they will be able to set the device to the correct GPU # they will be able to set the device to the correct GPU
os.environ.pop("CUDA_VISIBLE_DEVICES", None) monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
device = torch.device(f"cuda:{rank}") device = torch.device(f"cuda:{rank}")
torch.cuda.set_device(device) torch.cuda.set_device(device)
init_test_distributed_environment(tp_size, pp_size, rank, init_test_distributed_environment(tp_size, pp_size, rank,
...@@ -67,12 +81,17 @@ def all_gather_test_worker(tp_size: int, pp_size: int, rank: int, ...@@ -67,12 +81,17 @@ def all_gather_test_worker(tp_size: int, pp_size: int, rank: int,
@ray.remote(num_gpus=1, max_calls=1) @ray.remote(num_gpus=1, max_calls=1)
def broadcast_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int, def broadcast_tensor_dict_test_worker(
distributed_init_port: str): monkeypatch: pytest.MonkeyPatch,
tp_size: int,
pp_size: int,
rank: int,
distributed_init_port: str,
):
# it is important to delete the CUDA_VISIBLE_DEVICES environment variable # it is important to delete the CUDA_VISIBLE_DEVICES environment variable
# so that each worker can see all the GPUs # so that each worker can see all the GPUs
# they will be able to set the device to the correct GPU # they will be able to set the device to the correct GPU
os.environ.pop("CUDA_VISIBLE_DEVICES", None) monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
device = torch.device(f"cuda:{rank}") device = torch.device(f"cuda:{rank}")
torch.cuda.set_device(device) torch.cuda.set_device(device)
init_test_distributed_environment(tp_size, pp_size, rank, init_test_distributed_environment(tp_size, pp_size, rank,
...@@ -106,9 +125,14 @@ def broadcast_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int, ...@@ -106,9 +125,14 @@ def broadcast_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int,
@ray.remote(num_gpus=1, max_calls=1) @ray.remote(num_gpus=1, max_calls=1)
def send_recv_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int, def send_recv_tensor_dict_test_worker(
distributed_init_port: str): monkeypatch: pytest.MonkeyPatch,
os.environ.pop("CUDA_VISIBLE_DEVICES", None) tp_size: int,
pp_size: int,
rank: int,
distributed_init_port: str,
):
monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
device = torch.device(f"cuda:{rank}") device = torch.device(f"cuda:{rank}")
torch.cuda.set_device(device) torch.cuda.set_device(device)
init_test_distributed_environment(tp_size, pp_size, rank, init_test_distributed_environment(tp_size, pp_size, rank,
...@@ -146,9 +170,14 @@ def send_recv_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int, ...@@ -146,9 +170,14 @@ def send_recv_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int,
@ray.remote(num_gpus=1, max_calls=1) @ray.remote(num_gpus=1, max_calls=1)
def send_recv_test_worker(tp_size: int, pp_size: int, rank: int, def send_recv_test_worker(
distributed_init_port: str): monkeypatch: pytest.MonkeyPatch,
os.environ.pop("CUDA_VISIBLE_DEVICES", None) tp_size: int,
pp_size: int,
rank: int,
distributed_init_port: str,
):
monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False)
device = torch.device(f"cuda:{rank}") device = torch.device(f"cuda:{rank}")
torch.cuda.set_device(device) torch.cuda.set_device(device)
init_test_distributed_environment(tp_size, pp_size, rank, init_test_distributed_environment(tp_size, pp_size, rank,
...@@ -174,8 +203,12 @@ def send_recv_test_worker(tp_size: int, pp_size: int, rank: int, ...@@ -174,8 +203,12 @@ def send_recv_test_worker(tp_size: int, pp_size: int, rank: int,
all_reduce_test_worker, all_gather_test_worker, all_reduce_test_worker, all_gather_test_worker,
broadcast_tensor_dict_test_worker broadcast_tensor_dict_test_worker
]) ])
def test_multi_process_tensor_parallel(tp_size, test_target): def test_multi_process_tensor_parallel(
multi_process_parallel(tp_size, 1, test_target) monkeypatch: pytest.MonkeyPatch,
tp_size: int,
test_target: Callable[..., Any],
):
multi_process_parallel(monkeypatch, tp_size, 1, test_target)
@pytest.mark.skipif(torch.cuda.device_count() < 2, @pytest.mark.skipif(torch.cuda.device_count() < 2,
...@@ -183,8 +216,12 @@ def test_multi_process_tensor_parallel(tp_size, test_target): ...@@ -183,8 +216,12 @@ def test_multi_process_tensor_parallel(tp_size, test_target):
@pytest.mark.parametrize("pp_size", [2]) @pytest.mark.parametrize("pp_size", [2])
@pytest.mark.parametrize( @pytest.mark.parametrize(
"test_target", [send_recv_test_worker, send_recv_tensor_dict_test_worker]) "test_target", [send_recv_test_worker, send_recv_tensor_dict_test_worker])
def test_multi_process_pipeline_parallel(pp_size, test_target): def test_multi_process_pipeline_parallel(
multi_process_parallel(1, pp_size, test_target) monkeypatch: pytest.MonkeyPatch,
pp_size: int,
test_target: Callable[..., Any],
):
multi_process_parallel(monkeypatch, 1, pp_size, test_target)
@pytest.mark.skipif(torch.cuda.device_count() < 4, @pytest.mark.skipif(torch.cuda.device_count() < 4,
...@@ -197,5 +234,9 @@ def test_multi_process_pipeline_parallel(pp_size, test_target): ...@@ -197,5 +234,9 @@ def test_multi_process_pipeline_parallel(pp_size, test_target):
broadcast_tensor_dict_test_worker broadcast_tensor_dict_test_worker
]) ])
def test_multi_process_tensor_parallel_pipeline_parallel( def test_multi_process_tensor_parallel_pipeline_parallel(
tp_size, pp_size, test_target): tp_size: int,
multi_process_parallel(tp_size, pp_size, test_target) pp_size: int,
test_target: Callable[..., Any],
monkeypatch: pytest.MonkeyPatch,
):
multi_process_parallel(monkeypatch, tp_size, pp_size, test_target)
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
import os
import random import random
import pytest import pytest
...@@ -23,95 +22,116 @@ for i, v in enumerate(test_sizes): ...@@ -23,95 +22,116 @@ for i, v in enumerate(test_sizes):
@ray.remote(num_gpus=1, max_calls=1) @ray.remote(num_gpus=1, max_calls=1)
def graph_allreduce(tp_size, pp_size, rank, distributed_init_port): def graph_allreduce(
os.environ.pop("CUDA_VISIBLE_DEVICES", None) monkeypatch: pytest.MonkeyPatch,
device = torch.device(f"cuda:{rank}") tp_size,
torch.cuda.set_device(device) pp_size,
init_test_distributed_environment(tp_size, pp_size, rank, rank,
distributed_init_port) distributed_init_port,
ensure_model_parallel_initialized(tp_size, pp_size) ):
group = get_tensor_model_parallel_group().device_group with monkeypatch.context() as m:
m.delenv("CUDA_VISIBLE_DEVICES", raising=False)
# A small all_reduce for warmup. device = torch.device(f"cuda:{rank}")
# this is needed because device communicators might be created lazily torch.cuda.set_device(device)
# (e.g. NCCL). This will ensure that the communicator is initialized init_test_distributed_environment(tp_size, pp_size, rank,
# before any communication happens, so that this group can be used for distributed_init_port)
# graph capture immediately. ensure_model_parallel_initialized(tp_size, pp_size)
data = torch.zeros(1) group = get_tensor_model_parallel_group().device_group
data = data.to(device=device)
torch.distributed.all_reduce(data, group=group) # A small all_reduce for warmup.
torch.cuda.synchronize() # this is needed because device communicators might be created lazily
del data # (e.g. NCCL). This will ensure that the communicator is initialized
# before any communication happens, so that this group can be used for
# we use the first group to communicate once # graph capture immediately.
# and the second group to communicate twice data = torch.zeros(1)
# and so on data = data.to(device=device)
# this is used to demonstrate that each group can torch.distributed.all_reduce(data, group=group)
# communicate independently torch.cuda.synchronize()
num_communication = rank // tp_size + 1 del data
for sz in test_sizes: # we use the first group to communicate once
for dtype in [torch.float32, torch.float16, torch.bfloat16]: # and the second group to communicate twice
with graph_capture(device=device) as graph_capture_context: # and so on
# use integers so result matches NCCL exactly # this is used to demonstrate that each group can
inp1 = torch.randint(1, # communicate independently
16, (sz, ), num_communication = rank // tp_size + 1
dtype=dtype,
device=torch.cuda.current_device()) for sz in test_sizes:
inp2 = torch.randint(1, for dtype in [torch.float32, torch.float16, torch.bfloat16]:
16, (sz, ), with graph_capture(device=device) as graph_capture_context:
dtype=dtype, # use integers so result matches NCCL exactly
device=torch.cuda.current_device()) inp1 = torch.randint(1,
torch.cuda.synchronize() 16, (sz, ),
graph = torch.cuda.CUDAGraph() dtype=dtype,
with torch.cuda.graph(graph, device=torch.cuda.current_device())
stream=graph_capture_context.stream): inp2 = torch.randint(1,
for i in range(num_communication): 16, (sz, ),
out1 = tensor_model_parallel_all_reduce(inp1) dtype=dtype,
# the input buffer is immediately modified to test device=torch.cuda.current_device())
# synchronization torch.cuda.synchronize()
dist.all_reduce(inp1, group=group) graph = torch.cuda.CUDAGraph()
out2 = tensor_model_parallel_all_reduce(inp2) with torch.cuda.graph(graph,
dist.all_reduce(inp2, group=group) stream=graph_capture_context.stream):
graph.replay() for i in range(num_communication):
torch.testing.assert_close(out1, inp1) out1 = tensor_model_parallel_all_reduce(inp1)
torch.testing.assert_close(out2, inp2) # the input buffer is immediately modified to test
# synchronization
dist.all_reduce(inp1, group=group)
out2 = tensor_model_parallel_all_reduce(inp2)
dist.all_reduce(inp2, group=group)
graph.replay()
torch.testing.assert_close(out1, inp1)
torch.testing.assert_close(out2, inp2)
@ray.remote(num_gpus=1, max_calls=1) @ray.remote(num_gpus=1, max_calls=1)
def eager_allreduce(tp_size, pp_size, rank, distributed_init_port): def eager_allreduce(
os.environ.pop("CUDA_VISIBLE_DEVICES", None) monkeypatch: pytest.MonkeyPatch,
device = torch.device(f"cuda:{rank}") tp_size,
torch.cuda.set_device(device) pp_size,
init_test_distributed_environment(tp_size, pp_size, rank, rank,
distributed_init_port) distributed_init_port,
):
# we use the first group to communicate once with monkeypatch.context() as m:
# and the second group to communicate twice m.delenv("CUDA_VISIBLE_DEVICES", raising=False)
# and so on device = torch.device(f"cuda:{rank}")
# this is used to demonstrate that each group can torch.cuda.set_device(device)
# communicate independently init_test_distributed_environment(tp_size, pp_size, rank,
num_communication = rank // tp_size + 1 distributed_init_port)
sz = 1024
fa = get_tp_group().ca_comm # we use the first group to communicate once
inp = torch.ones(sz, dtype=torch.float32, device=device) # and the second group to communicate twice
out = inp # and so on
for _ in range(num_communication): # this is used to demonstrate that each group can
out = fa.all_reduce(out, registered=False) # communicate independently
torch.testing.assert_close(out, inp * (tp_size**num_communication)) num_communication = rank // tp_size + 1
sz = 1024
inp = torch.ones(sz * 4, dtype=torch.bfloat16, device=device) # fa = get_tp_group().ca_comm
out = inp fa = get_tp_group().device_communicator.ca_comm
for _ in range(num_communication): inp = torch.ones(sz, dtype=torch.float32, device=device)
out = fa.all_reduce(out, registered=False) out = inp
torch.testing.assert_close(out, inp * (tp_size**num_communication)) for _ in range(num_communication):
out = fa.all_reduce(out, registered=False)
torch.testing.assert_close(out, inp * (tp_size**num_communication))
inp = torch.ones(sz * 4, dtype=torch.bfloat16, device=device)
out = inp
for _ in range(num_communication):
out = fa.all_reduce(out, registered=False)
torch.testing.assert_close(out, inp * (tp_size**num_communication))
@pytest.mark.parametrize("tp_size", [2]) @pytest.mark.parametrize("tp_size", [2])
@pytest.mark.parametrize("pipeline_parallel_size", [1, 2]) @pytest.mark.parametrize("pipeline_parallel_size", [1, 2])
@pytest.mark.parametrize("test_target", [eager_allreduce, graph_allreduce]) @pytest.mark.parametrize("test_target", [eager_allreduce, graph_allreduce])
def test_custom_allreduce(tp_size, pipeline_parallel_size, test_target): def test_custom_allreduce(
monkeypatch: pytest.MonkeyPatch,
tp_size,
pipeline_parallel_size,
test_target,
):
world_size = tp_size * pipeline_parallel_size world_size = tp_size * pipeline_parallel_size
if world_size > torch.cuda.device_count(): if world_size > torch.cuda.device_count():
pytest.skip("Not enough GPUs to run the test.") pytest.skip("Not enough GPUs to run the test.")
multi_process_parallel(tp_size, pipeline_parallel_size, test_target) multi_process_parallel(monkeypatch, tp_size, pipeline_parallel_size,
test_target)
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass
from typing import Literal, NamedTuple, Optional
import pytest
from vllm.config import TaskOption
from vllm.logger import init_logger
from ..utils import compare_two_settings, create_new_process_for_each_test
logger = init_logger("test_expert_parallel")
class ParallelSetup(NamedTuple):
    """One parallel configuration exercised by the expert-parallel test."""

    # Value passed as --tensor-parallel-size.
    tp_size: int
    # When true, --enforce-eager is added to the command line.
    eager_mode: bool
    # When true, --enable-chunked-prefill is added to the command line.
    chunked_prefill: bool
class EPTestOptions(NamedTuple):
    """Optional command-line settings shared by both compared runs."""

    # When true, --trust-remote-code is added.
    trust_remote_code: bool
    # When set, passed as --tokenizer-mode.
    tokenizer_mode: Optional[str]
    # When set, passed as an additional --load-format.
    load_format: Optional[str] = None
    # When set, passed as --hf-overrides.
    hf_overrides: Optional[str] = None
@dataclass
class EPTestSettings:
    """Test matrix for one model: parallel setups x distributed backends."""

    parallel_setups: list[ParallelSetup]
    distributed_backends: list[str]
    task: TaskOption
    test_options: EPTestOptions

    @staticmethod
    def detailed(
        *,
        tp_base: int = 2,
        task: TaskOption = "auto",
        trust_remote_code: bool = False,
        tokenizer_mode: Optional[str] = None,
        load_format: Optional[str] = None,
        hf_overrides: Optional[str] = None,
    ):
        """Build the broad matrix: five setups on the "mp" and "ray" backends."""
        double_tp = 2 * tp_base
        # (tp_size, eager_mode, chunked_prefill)
        layout = [
            (tp_base, False, False),
            (tp_base, False, True),
            (tp_base, True, False),
            (double_tp, False, True),
            (double_tp, True, False),
        ]
        setups = [
            ParallelSetup(tp_size=tp, eager_mode=eager, chunked_prefill=chunked)
            for tp, eager, chunked in layout
        ]
        options = EPTestOptions(trust_remote_code=trust_remote_code,
                                tokenizer_mode=tokenizer_mode,
                                load_format=load_format,
                                hf_overrides=hf_overrides)
        return EPTestSettings(
            parallel_setups=setups,
            distributed_backends=["mp", "ray"],
            task=task,
            test_options=options,
        )

    @staticmethod
    def fast(
        *,
        tp_base: int = 2,
        task: TaskOption = "auto",
        trust_remote_code: bool = False,
        tokenizer_mode: Optional[str] = None,
        load_format: Optional[str] = None,
        hf_overrides: Optional[str] = None,
    ):
        """Build the minimal matrix: one eager setup on the "mp" backend."""
        only_setup = ParallelSetup(tp_size=tp_base,
                                   eager_mode=True,
                                   chunked_prefill=False)
        options = EPTestOptions(trust_remote_code=trust_remote_code,
                                tokenizer_mode=tokenizer_mode,
                                load_format=load_format,
                                hf_overrides=hf_overrides)
        return EPTestSettings(
            parallel_setups=[only_setup],
            distributed_backends=["mp"],
            task=task,
            test_options=options,
        )

    def iter_params(self, model_name: str):
        """Yield one parameter tuple per (parallel setup, backend) pair.

        Each tuple is ``(model_name, parallel_setup, distributed_backend,
        task, test_options)``; backends vary fastest.
        """
        opts = self.test_options
        yield from ((model_name, setup, backend, self.task, opts)
                    for setup in self.parallel_setups
                    for backend in self.distributed_backends)
# NOTE: You can adjust tp_base locally to fit the model in GPU
# The values displayed here are only a rough indicator of the size of the model
# yapf: disable
# Maps each model name to the EPTestSettings used to parametrize test_ep.
TEST_MODELS = {
    # Run with trust_remote_code=True.
    "deepseek-ai/DeepSeek-V2-Lite-Chat": EPTestSettings.fast(
        trust_remote_code=True),
    # Uses tp_base=4 instead of the default of 2.
    "mistralai/Mixtral-8x7B-Instruct-v0.1": EPTestSettings.fast(tp_base=4),
}
def _compare_tp(
    model_name: str,
    parallel_setup: ParallelSetup,
    distributed_backend: str,
    task: TaskOption,
    test_options: EPTestOptions,
    num_gpus_available: int,
    *,
    method: Literal["generate"],
):
    """Compare a run with expert parallelism enabled against one without.

    Both runs use the same tensor-parallel size and the same common
    arguments. They differ in the ``VLLM_TEST_ENABLE_EP`` environment
    variable ("1" vs "0") and in the executor backend: the
    expert-parallel run uses ``distributed_backend``, the baseline
    always uses "mp".

    Args:
        model_name: Model passed to ``compare_two_settings``.
        parallel_setup: Tensor-parallel size plus eager / chunked-prefill flags.
        distributed_backend: Executor backend for the expert-parallel run.
        task: Passed as ``--task`` unless it is "auto".
        test_options: Optional extra command-line settings.
        num_gpus_available: The test is skipped when this is below ``tp_size``.
        method: Forwarded to ``compare_two_settings``.
    """
    tp_size, eager_mode, chunked_prefill = parallel_setup
    (
        trust_remote_code,
        tokenizer_mode,
        load_format,
        hf_overrides,
    ) = test_options

    if num_gpus_available < tp_size:
        pytest.skip(f"Need at least {tp_size} GPUs")

    common_args = [
        # use half precision for speed and memory savings in CI environment
        "--dtype",
        "float16",
        "--max-model-len",
        "2048",
        "--max-num-seqs",
        "8",
        "--load-format",
        "auto",
    ]
    if chunked_prefill:
        common_args.append("--enable-chunked-prefill")
    if eager_mode:
        common_args.append("--enforce-eager")
    if task != "auto":
        common_args.extend(["--task", task])
    if trust_remote_code:
        common_args.append("--trust-remote-code")
    if tokenizer_mode:
        common_args.extend(["--tokenizer-mode", tokenizer_mode])
    if load_format:
        # Appended after the default "--load-format auto" above.
        common_args.extend(["--load-format", load_format])
    if hf_overrides:
        common_args.extend(["--hf-overrides", hf_overrides])

    ep_env = {
        "VLLM_TEST_ENABLE_EP": "1",
    }
    ep_args = [
        *common_args,
        "--tensor-parallel-size",
        str(tp_size),
        "--distributed-executor-backend",
        distributed_backend,
    ]

    # compare without expert parallelism
    tp_env = {
        "VLLM_TEST_ENABLE_EP": "0",
    }
    tp_args = [
        *common_args,
        "--tensor-parallel-size",
        str(tp_size),
        "--distributed-executor-backend",
        "mp",
    ]

    # No try/except wrapper: a handler that only re-raises adds nothing,
    # and every failure here is meant to fail the test.
    compare_two_settings(model_name,
                         ep_args,
                         tp_args,
                         ep_env,
                         tp_env,
                         method=method,
                         max_wait_seconds=360)
@pytest.mark.parametrize(
    ("model_name", "parallel_setup", "distributed_backend", "task",
     "test_options"),
    [
        params for model_name, settings in TEST_MODELS.items()
        for params in settings.iter_params(model_name)
    ],
)
@create_new_process_for_each_test()
def test_ep(
    model_name: str,
    parallel_setup: ParallelSetup,
    distributed_backend: str,
    task: TaskOption,
    test_options: EPTestOptions,
    num_gpus_available,
):
    """Run the expert-parallel comparison for one entry of the test matrix."""
    _compare_tp(
        model_name=model_name,
        parallel_setup=parallel_setup,
        distributed_backend=distributed_backend,
        task=task,
        test_options=test_options,
        num_gpus_available=num_gpus_available,
        method="generate",
    )
...@@ -9,7 +9,7 @@ WARNING: This test runs in both single-node (4 GPUs) and multi-node ...@@ -9,7 +9,7 @@ WARNING: This test runs in both single-node (4 GPUs) and multi-node
import json import json
import os import os
from dataclasses import dataclass from dataclasses import dataclass
from typing import List, Literal, NamedTuple, Optional from typing import Literal, NamedTuple, Optional
import pytest import pytest
...@@ -17,13 +17,25 @@ from vllm.config import TaskOption ...@@ -17,13 +17,25 @@ from vllm.config import TaskOption
from vllm.logger import init_logger from vllm.logger import init_logger
from ..models.registry import HF_EXAMPLE_MODELS from ..models.registry import HF_EXAMPLE_MODELS
from ..utils import compare_two_settings, fork_new_process_for_each_test, models_path_prefix from ..utils import compare_two_settings, create_new_process_for_each_test, models_path_prefix
logger = init_logger("test_pipeline_parallel") logger = init_logger("test_pipeline_parallel")
VLLM_MULTI_NODE = os.getenv("VLLM_MULTI_NODE", "0") == "1" VLLM_MULTI_NODE = os.getenv("VLLM_MULTI_NODE", "0") == "1"
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
    """Force ``VLLM_USE_V1=0`` for every test in this module.

    PP falls back to V0 by default, so without this the TP baseline
    would run with V1 while the PP engine runs with V0, which gives
    divergent results with dummy weights. This can be removed once V1
    is enabled by default for PP.
    """
    env_name, env_value = 'VLLM_USE_V1', '0'
    monkeypatch.setenv(env_name, env_value)
class ParallelSetup(NamedTuple): class ParallelSetup(NamedTuple):
tp_size: int tp_size: int
pp_size: int pp_size: int
...@@ -38,14 +50,14 @@ class PPTestOptions(NamedTuple): ...@@ -38,14 +50,14 @@ class PPTestOptions(NamedTuple):
@dataclass @dataclass
class PPTestSettings: class PPTestSettings:
parallel_setups: List[ParallelSetup] parallel_setups: list[ParallelSetup]
# NOTE: the length of distributed_backends and # NOTE: the length of distributed_backends and
# vllm_major_versions should be the same, and they # vllm_major_versions should be the same, and they
# are first zipped together to iterate over all # are first zipped together to iterate over all
# test settings. # test settings.
distributed_backends: List[str] distributed_backends: list[str]
# vllm major version: "0" for V0, "1" for V1 # vllm major version: "0" for V0, "1" for V1
vllm_major_versions: List[str] vllm_major_versions: list[str]
task: TaskOption task: TaskOption
test_options: PPTestOptions test_options: PPTestOptions
...@@ -163,6 +175,8 @@ TEXT_GENERATION_MODELS = { ...@@ -163,6 +175,8 @@ TEXT_GENERATION_MODELS = {
os.path.join(models_path_prefix, "inceptionai/jais-13b-chat"): PPTestSettings.fast(), os.path.join(models_path_prefix, "inceptionai/jais-13b-chat"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "ai21labs/Jamba-tiny-dev"): PPTestSettings.fast(), os.path.join(models_path_prefix, "ai21labs/Jamba-tiny-dev"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "meta-llama/Llama-3.2-1B-Instruct"): PPTestSettings.detailed(), os.path.join(models_path_prefix, "meta-llama/Llama-3.2-1B-Instruct"): PPTestSettings.detailed(),
# Tests TransformersModel
os.path.join(models_path_prefix, "ArthurZ/Ilama-3.2-1B"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "openbmb/MiniCPM-2B-sft-bf16"): PPTestSettings.fast(), os.path.join(models_path_prefix, "openbmb/MiniCPM-2B-sft-bf16"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "openbmb/MiniCPM3-4B"): PPTestSettings.fast(), os.path.join(models_path_prefix, "openbmb/MiniCPM3-4B"): PPTestSettings.fast(),
# Uses Llama # Uses Llama
...@@ -214,7 +228,7 @@ MULTIMODAL_MODELS = { ...@@ -214,7 +228,7 @@ MULTIMODAL_MODELS = {
os.path.join(models_path_prefix, "llava-hf/llava-onevision-qwen2-0.5b-ov-hf"): PPTestSettings.fast(), os.path.join(models_path_prefix, "llava-hf/llava-onevision-qwen2-0.5b-ov-hf"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "openbmb/MiniCPM-Llama3-V-2_5"): PPTestSettings.fast(), os.path.join(models_path_prefix, "openbmb/MiniCPM-Llama3-V-2_5"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "allenai/Molmo-7B-D-0924"): PPTestSettings.fast(), os.path.join(models_path_prefix, "allenai/Molmo-7B-D-0924"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "microsoft/Phi-3-vision-128k-instruct"): PPTestSettings.fast(), os.path.join(models_path_prefix, "microsoft/Phi-3.5-vision-instruct"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "mistralai/Pixtral-12B-2409"): PPTestSettings.fast(load_format="dummy"), os.path.join(models_path_prefix, "mistralai/Pixtral-12B-2409"): PPTestSettings.fast(load_format="dummy"),
os.path.join(models_path_prefix, "Qwen/Qwen-VL-Chat"): PPTestSettings.fast(), os.path.join(models_path_prefix, "Qwen/Qwen-VL-Chat"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "Qwen/Qwen2-Audio-7B-Instruct"): PPTestSettings.fast(), os.path.join(models_path_prefix, "Qwen/Qwen2-Audio-7B-Instruct"): PPTestSettings.fast(),
...@@ -231,13 +245,14 @@ TEST_MODELS = [ ...@@ -231,13 +245,14 @@ TEST_MODELS = [
# [LANGUAGE GENERATION] # [LANGUAGE GENERATION]
os.path.join(models_path_prefix, "microsoft/Phi-3.5-MoE-instruct"), os.path.join(models_path_prefix, "microsoft/Phi-3.5-MoE-instruct"),
os.path.join(models_path_prefix, "meta-llama/Llama-3.2-1B-Instruct"), os.path.join(models_path_prefix, "meta-llama/Llama-3.2-1B-Instruct"),
# "ArthurZ/Ilama-3.2-1B", NOTE: Uncomment after #13905
os.path.join(models_path_prefix, "ibm/PowerLM-3b"), os.path.join(models_path_prefix, "ibm/PowerLM-3b"),
# [LANGUAGE EMBEDDING] # [LANGUAGE EMBEDDING]
os.path.join(models_path_prefix, "intfloat/e5-mistral-7b-instruct"), os.path.join(models_path_prefix, "intfloat/e5-mistral-7b-instruct"),
os.path.join(models_path_prefix, "BAAI/bge-multilingual-gemma2"), os.path.join(models_path_prefix, "BAAI/bge-multilingual-gemma2"),
# [MULTIMODAL GENERATION] # [MULTIMODAL GENERATION]
os.path.join(models_path_prefix, "OpenGVLab/InternVL2-1B"), os.path.join(models_path_prefix, "OpenGVLab/InternVL2-1B"),
os.path.join(models_path_prefix, "microsoft/Phi-3-vision-128k-instruct"), os.path.join(models_path_prefix, "microsoft/Phi-3.5-vision-instruct"),
os.path.join(models_path_prefix, "fixie-ai/ultravox-v0_5-llama-3_2-1b"), os.path.join(models_path_prefix, "fixie-ai/ultravox-v0_5-llama-3_2-1b"),
# [LANGUAGE GENERATION - HYBRID ARCH] # [LANGUAGE GENERATION - HYBRID ARCH]
os.path.join(models_path_prefix, "ai21labs/Jamba-tiny-dev"), os.path.join(models_path_prefix, "ai21labs/Jamba-tiny-dev"),
...@@ -324,8 +339,8 @@ def _compare_tp( ...@@ -324,8 +339,8 @@ def _compare_tp(
specific_case = tp_size == 2 and pp_size == 2 and chunked_prefill specific_case = tp_size == 2 and pp_size == 2 and chunked_prefill
if distributed_backend == "ray" and (vllm_major_version == "1" if distributed_backend == "ray" and (vllm_major_version == "1"
or specific_case): or specific_case):
# For V1, test Ray ADAG for all the tests # For V1, test Ray Compiled Graph for all the tests
# For V0, test Ray ADAG for a subset of the tests # For V0, test Ray Compiled Graph for a subset of the tests
pp_env = { pp_env = {
"VLLM_USE_V1": vllm_major_version, "VLLM_USE_V1": vllm_major_version,
"VLLM_USE_RAY_COMPILED_DAG": "1", "VLLM_USE_RAY_COMPILED_DAG": "1",
...@@ -333,11 +348,15 @@ def _compare_tp( ...@@ -333,11 +348,15 @@ def _compare_tp(
"VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL": "1", "VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL": "1",
} }
# Temporary. Currently when zeromq + SPMD is used, it does not properly # Temporary. Currently when zeromq + SPMD is used, it does not properly
# terminate because of aDAG issue. # terminate because of a Ray Compiled Graph issue.
common_args.append("--disable-frontend-multiprocessing") common_args.append("--disable-frontend-multiprocessing")
else: else:
pp_env = None pp_env = None
tp_env = {
"VLLM_USE_V1": vllm_major_version,
}
pp_args = [ pp_args = [
*common_args, *common_args,
"--pipeline-parallel-size", "--pipeline-parallel-size",
...@@ -362,13 +381,20 @@ def _compare_tp( ...@@ -362,13 +381,20 @@ def _compare_tp(
] ]
try: try:
compare_two_settings(model_id, pp_args, tp_args, pp_env, method=method) compare_two_settings(model_id,
pp_args,
tp_args,
pp_env,
tp_env,
method=method)
except Exception: except Exception:
if pp_env is None: testing_ray_compiled_graph = pp_env is not None
raise if testing_ray_compiled_graph and vllm_major_version == "0":
# Ray Compiled Graph tests are flaky for V0,
# so we don't want to fail the test
logger.exception("Ray Compiled Graph tests failed")
else: else:
# Ray ADAG tests are flaky, so we don't want to fail the test raise
logger.exception("Ray ADAG tests failed")
@pytest.mark.parametrize( @pytest.mark.parametrize(
...@@ -379,7 +405,7 @@ def _compare_tp( ...@@ -379,7 +405,7 @@ def _compare_tp(
for params in settings.iter_params(model_id) if model_id in TEST_MODELS for params in settings.iter_params(model_id) if model_id in TEST_MODELS
], ],
) )
@fork_new_process_for_each_test @create_new_process_for_each_test()
def test_tp_language_generation( def test_tp_language_generation(
model_id: str, model_id: str,
parallel_setup: ParallelSetup, parallel_setup: ParallelSetup,
...@@ -408,7 +434,7 @@ def test_tp_language_generation( ...@@ -408,7 +434,7 @@ def test_tp_language_generation(
for params in settings.iter_params(model_id) if model_id in TEST_MODELS for params in settings.iter_params(model_id) if model_id in TEST_MODELS
], ],
) )
@fork_new_process_for_each_test @create_new_process_for_each_test()
def test_tp_language_embedding( def test_tp_language_embedding(
model_id: str, model_id: str,
parallel_setup: ParallelSetup, parallel_setup: ParallelSetup,
...@@ -437,7 +463,7 @@ def test_tp_language_embedding( ...@@ -437,7 +463,7 @@ def test_tp_language_embedding(
for params in settings.iter_params(model_id) if model_id in TEST_MODELS for params in settings.iter_params(model_id) if model_id in TEST_MODELS
], ],
) )
@fork_new_process_for_each_test @create_new_process_for_each_test()
def test_tp_multimodal_generation( def test_tp_multimodal_generation(
model_id: str, model_id: str,
parallel_setup: ParallelSetup, parallel_setup: ParallelSetup,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment