Commit 4d3a2c28 authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge tag 'v0.6.5' into v0.6.5-dev

parents 92ec5d8e 2d1b9baa
......@@ -4,7 +4,6 @@ from unittest.mock import MagicMock
import pytest # noqa
from vllm.config import CacheConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler
from vllm.sequence import Logprob, SequenceGroup
......@@ -27,19 +26,17 @@ def schedule_and_update_computed_tokens(scheduler):
return metas, out
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_simple(use_v2_block_manager: bool):
def test_simple():
"""Verify basic scheduling works."""
block_size = 4
num_seq_group = 4
max_model_len = 16
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
num_seq_group,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
scheduler_config = SchedulerConfig("generate",
max_num_batched_tokens,
num_seq_group,
max_model_len,
enable_chunked_prefill=True)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
......@@ -74,19 +71,19 @@ def test_simple(use_v2_block_manager: bool):
assert len(seq_group_meta) == num_seq_group
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_chunk(use_v2_block_manager: bool):
def test_chunk():
"""Verify prefills are chunked properly."""
block_size = 4
max_seqs = 60
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 32
cache_config.num_gpu_blocks = 32
......@@ -124,18 +121,18 @@ def test_chunk(use_v2_block_manager: bool):
assert out.num_batched_tokens == 57
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_complex(use_v2_block_manager: bool):
def test_complex():
block_size = 4
max_seqs = 60
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 64
cache_config.num_gpu_blocks = 64
......@@ -194,19 +191,19 @@ def test_complex(use_v2_block_manager: bool):
assert running[2].is_prefill()
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_maximal_decoding(use_v2_block_manager: bool):
def test_maximal_decoding():
"""Verify decoding requests are prioritized."""
block_size = 4
max_seqs = 2
max_model_len = 8
max_num_batched_tokens = 2
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
......@@ -288,19 +285,19 @@ def test_maximal_decoding(use_v2_block_manager: bool):
assert out.num_batched_tokens == 2
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prompt_limit(use_v2_block_manager: bool):
def test_prompt_limit():
"""Verify max_num_batched_tokens < max_model_len is possible."""
block_size = 4
max_seqs = 32
max_model_len = 64
max_num_batched_tokens = 32
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
......@@ -323,13 +320,13 @@ def test_prompt_limit(use_v2_block_manager: bool):
assert out.num_batched_tokens == 32
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prompt_limit_exceed(use_v2_block_manager: bool):
def test_prompt_limit_exceed():
block_size = 4
max_seqs = 64
max_model_len = 32
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(max_num_batched_tokens,
scheduler_config = SchedulerConfig("generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
......@@ -349,19 +346,19 @@ def test_prompt_limit_exceed(use_v2_block_manager: bool):
assert out.ignored_seq_groups[0] == seq_group
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_swap(use_v2_block_manager: bool):
"""Verify swapping works with chunked prefill requests"""
def test_chunked_prefill_preempt():
"""Verify preempt works with chunked prefill requests"""
block_size = 4
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
......@@ -369,7 +366,6 @@ def test_swap(use_v2_block_manager: bool):
_, seq_group = create_dummy_prompt("1",
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
......@@ -380,147 +376,69 @@ def test_swap(use_v2_block_manager: bool):
assert seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens
# The last request should be swapped out.
# The request should be preempted.
scheduler.block_manager.can_append_slots = MagicMock()
def cannot_append_second_group(seq_group, num_lookahead_slots):
def cannot_append_second_group1(seq_group, num_lookahead_slots):
return seq_group.request_id != "1"
scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group)
cannot_append_second_group1)
# The running prefill is now swapped.
# The running prefill is now preempted.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 0
assert out.num_batched_tokens == 0
assert out.blocks_to_swap_out != []
assert out.blocks_to_swap_in == []
# Add 1 more task. Swap should be prioritized over new prefill.
_, seq_group = create_dummy_prompt("2", prompt_length=60)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
# 3 decodes. It is swapped in.
assert out.num_batched_tokens == 30
assert out.blocks_to_swap_in != []
assert out.blocks_to_swap_out == []
assert out.blocks_to_swap_in == []
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_running_prefill_prioritized_over_swap(use_v2_block_manager: bool):
block_size = 4
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 32
cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None)
_, seq_group = create_dummy_prompt("1",
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
# Make sure we can reschedule preempted request.
_, out = schedule_and_update_computed_tokens(scheduler)
# The request is chunked.
# prefill scheduled now.
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens
assert seq_group.get_num_uncomputed_tokens() == 30
# The request should be swapped out.
scheduler.block_manager.can_append_slots = MagicMock()
def cannot_append_second_group(seq_group, num_lookahead_slots):
return seq_group.request_id != "1"
# We should be able to run prefill twice as it is chunked.
def cannot_append_second_group2(seq_group, num_lookahead_slots):
return True
scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group)
# The running prefill is now swapped.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 0
assert out.num_batched_tokens == 0
assert out.blocks_to_swap_out != []
assert out.blocks_to_swap_in == []
# Add 1 more task. Swap is not possible, so prefill is running.
scheduler.block_manager.can_swap_in = MagicMock()
scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER
_, seq_group2 = create_dummy_prompt("2",
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group2)
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
# 3 decodes. It is swapped in.
assert out.num_batched_tokens == 30
assert out.blocks_to_swap_in == []
assert out.blocks_to_swap_out == []
assert out.scheduled_seq_groups[0].seq_group == seq_group2
# Now although swap is possible, running prefill is prioritized.
scheduler.block_manager.can_swap_in.return_value = AllocStatus.OK
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
# 3 decodes. It is swapped in.
assert out.num_batched_tokens == 30
assert out.blocks_to_swap_in == []
assert out.blocks_to_swap_out == []
assert not seq_group2.is_prefill()
assert out.scheduled_seq_groups[0].seq_group == seq_group2
append_new_token(seq_group2, 1)
# Decoding is prioritized.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
# 3 decodes. It is swapped in.
assert out.num_batched_tokens == 1
assert out.blocks_to_swap_in == []
assert out.blocks_to_swap_out == []
assert not seq_group2.is_prefill()
assert out.scheduled_seq_groups[0].seq_group == seq_group2
append_new_token(seq_group2, 1)
# Since we abort the sequence group, we can finally swap.
scheduler.abort_seq_group(seq_group2.request_id)
cannot_append_second_group2)
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
assert out.num_batched_tokens == 30
assert out.blocks_to_swap_in != []
assert out.blocks_to_swap_out == []
assert out.num_prefill_groups == 1
assert not seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_chunked_prefill_preempt(use_v2_block_manager: bool):
"""Verify preempt works with chunked prefill requests"""
@pytest.mark.parametrize("num_scheduler_steps", [1, 5])
def test_chunked_prefill_spec_prefill(num_scheduler_steps):
"""Verify that the num_lookahead_slots is set appropriately for an all"""
"""prefill batch depending on whether multi-step scheduling is enabled"""
"""or not"""
block_size = 4
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
num_lookahead_slots = 4
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
num_lookahead_slots=num_lookahead_slots,
num_scheduler_steps=num_scheduler_steps,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
_, seq_group = create_dummy_prompt("1",
prompt_length=60,
prompt_length=30,
block_size=block_size)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
......@@ -528,58 +446,24 @@ def test_chunked_prefill_preempt(use_v2_block_manager: bool):
# prefill scheduled now.
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens
# The request should be preempted.
scheduler.block_manager.can_append_slots = MagicMock()
def cannot_append_second_group1(seq_group, num_lookahead_slots):
return seq_group.request_id != "1"
scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group1)
# The running prefill is now preempted.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 0
assert out.num_batched_tokens == 0
assert out.blocks_to_swap_out == []
assert out.blocks_to_swap_in == []
# Make sure we can reschedule preempted request.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens
assert seq_group.get_num_uncomputed_tokens() == 30
# We should be able to run prefill twice as it is chunked.
def cannot_append_second_group2(seq_group, num_lookahead_slots):
return True
scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group2)
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert not seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens
print(out.num_lookahead_slots)
assert out.num_lookahead_slots == (0 if (num_scheduler_steps == 1) else
num_lookahead_slots)
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_chunked_prefill_max_seqs(use_v2_block_manager: bool):
def test_chunked_prefill_max_seqs():
block_size = 4
max_seqs = 2
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 128
cache_config.num_gpu_blocks = 128
......@@ -622,19 +506,19 @@ def test_chunked_prefill_max_seqs(use_v2_block_manager: bool):
assert not running[1].is_prefill()
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_perfix_caching(use_v2_block_manager: bool):
def test_perfix_caching():
"""Verify allocating full blocks when prefix caching is enabled."""
block_size = 4
max_seqs = 10
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
use_v2_block_manager=use_v2_block_manager)
)
cache_config = CacheConfig(block_size,
1.0,
1,
......
import os
import pytest
from tests.conftest import VllmRunner
from tests.core.utils import create_dummy_prompt
from vllm.engine.llm_engine import LLMEngine
from vllm.platforms import current_platform
from vllm.sequence import SequenceGroup
from ..utils import models_path_prefix
MODEL = os.path.join(models_path_prefix, "JackFram/llama-160m")
def add_seq_group_to_engine(engine: LLMEngine, seq_group: SequenceGroup):
scheduler = engine.scheduler[0]
scheduler.add_seq_group(seq_group)
@pytest.mark.parametrize("num_scheduler_steps", [1, 8])
@pytest.mark.parametrize("enable_chunked_prefill", [False, True])
@pytest.mark.parametrize("enforce_eager", [False, True])
def test_num_computed_tokens_update(num_scheduler_steps: int,
enable_chunked_prefill: bool,
enforce_eager: bool):
is_multi_step = num_scheduler_steps > 1
is_multi_step_chunked_prefill = is_multi_step and enable_chunked_prefill
if is_multi_step_chunked_prefill and current_platform.is_rocm():
pytest.skip("Multi-step with Chunked-Prefill does not support "
"rocm_flash_attn backend")
# Make a vllm engine
runner = VllmRunner(model_name=MODEL,
gpu_memory_utilization=0.7,
num_scheduler_steps=num_scheduler_steps,
enable_chunked_prefill=enable_chunked_prefill,
enforce_eager=enforce_eager)
engine: LLMEngine = runner.model.llm_engine
# In multi-step + chunked-prefill there is no separate single prompt step.
# What is scheduled will run for num_scheduler_steps always.
num_prompt_steps = num_scheduler_steps \
if is_multi_step_chunked_prefill else 1
num_output_tokens_list = [4, 8, 12, 15, 16, 17]
# Create sequence and add to engine
prompt_len = 10
for req_idx, num_output_tokens in enumerate(num_output_tokens_list):
seq, seq_group = create_dummy_prompt(request_id=str(req_idx),
prompt_length=prompt_len,
min_tokens=num_output_tokens,
max_tokens=num_output_tokens)
add_seq_group_to_engine(engine, seq_group)
assert seq.data.get_num_computed_tokens() == 0
for _ in range(num_prompt_steps):
# prompt steps
engine.step()
if not seq.is_finished():
prompt_num_computed_tokens = seq.data.get_num_computed_tokens()
# Test correctness of num_computed_tokens after the prompt steps
assert prompt_num_computed_tokens == \
prompt_len + num_prompt_steps - 1
decode_step_counter = 0
while not seq.is_finished():
# Test correctness of num_computed_tokens after the decode steps
assert seq.data.get_num_computed_tokens(
) == prompt_num_computed_tokens + decode_step_counter
for _ in range(num_scheduler_steps):
# decode step
engine.step()
decode_step_counter += 1
# Test correctness of num_computed_tokens after the sequence finish.
assert seq.data.get_num_computed_tokens(
) == prompt_len + num_output_tokens - 1
......@@ -3,25 +3,28 @@ from collections import deque
from typing import List, Set, Tuple
from unittest.mock import MagicMock
import pytest
import pytest # noqa
from torch import Use # noqa
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
from vllm.sequence import SequenceGroup, SequenceStatus
from vllm.sequence import SequenceGroup
from .utils import (append_new_token, append_new_token_seq_group,
create_dummy_prompt, get_sequence_groups,
schedule_and_update_computed_tokens)
from .utils import (append_new_token, append_new_token_seq,
append_new_token_seq_group, create_dummy_prompt,
get_sequence_groups, schedule_and_update_computed_tokens)
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_add_seq_group(use_v2_block_manager: bool):
def test_scheduler_add_seq_group():
block_size = 4
scheduler_config = SchedulerConfig(
100, 64, 1, use_v2_block_manager=use_v2_block_manager)
"generate",
max_num_batched_tokens=100,
max_num_seqs=64,
max_model_len=1,
)
cache_config = CacheConfig(block_size, 1.0, 1, cache_dtype="auto")
cache_config.num_cpu_blocks = 4
cache_config.num_gpu_blocks = 4
......@@ -37,11 +40,14 @@ def test_scheduler_add_seq_group(use_v2_block_manager: bool):
assert scheduler.get_num_unfinished_seq_groups() == i + 1
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_abort_seq_group(use_v2_block_manager: bool):
def test_scheduler_abort_seq_group():
block_size = 4
scheduler_config = SchedulerConfig(
100, 64, 1, use_v2_block_manager=use_v2_block_manager)
"generate",
max_num_batched_tokens=100,
max_num_seqs=64,
max_model_len=1,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 4
cache_config.num_gpu_blocks = 4
......@@ -61,16 +67,16 @@ def test_scheduler_abort_seq_group(use_v2_block_manager: bool):
assert scheduler.get_num_unfinished_seq_groups() == 0
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_simple(use_v2_block_manager: bool):
def test_scheduler_schedule_simple():
block_size = 4
num_seq_group = 4
max_model_len = 16
scheduler_config = SchedulerConfig(
64,
num_seq_group,
max_model_len,
use_v2_block_manager=use_v2_block_manager)
"generate",
max_num_batched_tokens=64,
max_num_seqs=num_seq_group,
max_model_len=max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
......@@ -105,17 +111,17 @@ def test_scheduler_schedule_simple(use_v2_block_manager: bool):
append_new_token(out, 1)
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_prefill_prioritized(use_v2_block_manager: bool):
def test_scheduler_prefill_prioritized():
"""Verify running batched tokens are not applied to prefill requests."""
block_size = 4
max_model_len = 30
max_batched_num_tokens = 30
scheduler_config = SchedulerConfig(
max_batched_num_tokens,
2,
max_model_len,
use_v2_block_manager=use_v2_block_manager)
"generate",
max_num_batched_tokens=max_batched_num_tokens,
max_num_seqs=2,
max_model_len=max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
......@@ -139,12 +145,15 @@ def test_scheduler_prefill_prioritized(use_v2_block_manager: bool):
assert get_sequence_groups(out) == [seq_group_b]
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_preempt_abort(use_v2_block_manager: bool):
def test_scheduler_schedule_preempt_abort():
block_size = 4
max_model_len = 16
scheduler_config = SchedulerConfig(
64, 2, max_model_len, use_v2_block_manager=use_v2_block_manager)
"generate",
max_num_batched_tokens=64,
max_num_seqs=2,
max_model_len=max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 2
cache_config.num_gpu_blocks = 2
......@@ -194,17 +203,17 @@ def test_scheduler_schedule_preempt_abort(use_v2_block_manager: bool):
assert scheduler.get_num_unfinished_seq_groups() == 1
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_max_seqs(use_v2_block_manager: bool):
def test_scheduler_max_seqs():
block_size = 4
num_seq_group = 4
max_seq_group = 2
max_model_len = 16
scheduler_config = SchedulerConfig(
64,
max_seq_group,
max_model_len,
use_v2_block_manager=use_v2_block_manager)
"generate",
max_num_batched_tokens=64,
max_num_seqs=max_seq_group,
max_model_len=max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
......@@ -242,15 +251,15 @@ def test_scheduler_max_seqs(use_v2_block_manager: bool):
assert set(get_sequence_groups(out)) == set([all_seq_groups[1]])
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_delay_factor(use_v2_block_manager: bool):
def test_scheduler_delay_factor():
block_size = 4
scheduler_config = SchedulerConfig(
100,
64,
16,
"generate",
max_num_batched_tokens=100,
max_num_seqs=64,
max_model_len=16,
delay_factor=0.5,
use_v2_block_manager=use_v2_block_manager)
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
......@@ -287,75 +296,33 @@ def test_scheduler_delay_factor(use_v2_block_manager: bool):
append_new_token(out, 1)
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_swapped_out_prioritized(use_v2_block_manager: bool):
block_size = 4
scheduler = initialize_scheduler(max_num_seqs=6,
block_size=block_size,
use_v2_block_manager=use_v2_block_manager,
num_cpu_blocks=64,
num_gpu_blocks=64)
# best_of=2 * 3 == 6 sequences.
for i in range(3):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
# prefill scheduled now.
assert len(out.scheduled_seq_groups) == 3
append_new_token(out, 1)
# The last request should be swapped out.
scheduler.block_manager.can_append_slots = MagicMock()
def cannot_append_second_group(seq_group, num_lookahead_slots):
return seq_group.request_id != "2"
scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 2
assert out.num_batched_tokens == 2
assert out.blocks_to_swap_out != []
assert out.blocks_to_swap_in == []
append_new_token(out, 1)
# Add 1 more task. Swap should be prioritized over prefill.
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
append_new_token(out, 1)
assert len(out.scheduled_seq_groups) == 3
# 3 decodes. It is swapped in.
assert out.num_batched_tokens == 3
assert out.blocks_to_swap_in != []
assert out.blocks_to_swap_out == []
def initialize_scheduler(
*,
max_num_seqs=1000,
max_token_budget=1000,
max_model_len=1000,
lora_config=None,
use_v2_block_manager=False,
block_size=4,
num_cpu_blocks=8,
num_gpu_blocks=8,
enable_prefix_caching=False,
enable_chunked_prefill=False,
):
block_size = block_size
scheduler_config = SchedulerConfig(
max_token_budget,
max_num_seqs,
max_model_len,
use_v2_block_manager=use_v2_block_manager)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
"generate",
max_num_batched_tokens=max_token_budget,
max_num_seqs=max_num_seqs,
max_model_len=max_model_len,
enable_chunked_prefill=enable_chunked_prefill,
)
cache_config = CacheConfig(
block_size,
1.0,
1,
"auto",
enable_prefix_caching=enable_prefix_caching,
)
cache_config.num_cpu_blocks = num_cpu_blocks
cache_config.num_gpu_blocks = num_gpu_blocks
scheduler = Scheduler(scheduler_config, cache_config, lora_config)
......@@ -379,15 +346,12 @@ def add_token_budget(budget: SchedulingBudget,
budget.add_num_seqs(mock_seq_group.request_id, num_curr_seqs)
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_prompt_len(use_v2_block_manager: bool):
def test_prefill_schedule_max_prompt_len():
"""
Test prompt longer than max_prompt_len is aborted.
"""
block_size = 4
scheduler = initialize_scheduler(max_model_len=30,
use_v2_block_manager=use_v2_block_manager,
block_size=block_size)
scheduler = initialize_scheduler(max_model_len=30, block_size=block_size)
_, seq_group = create_dummy_prompt("0",
prompt_length=60,
block_size=block_size)
......@@ -402,14 +366,12 @@ def test_prefill_schedule_max_prompt_len(use_v2_block_manager: bool):
assert len(remaining_waiting) == 0
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_token_budget(use_v2_block_manager: bool):
def test_prefill_schedule_token_budget():
"""
Test token budget respected.
"""
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
budget = create_token_budget(token_budget=0)
......@@ -439,8 +401,7 @@ def test_prefill_schedule_token_budget(use_v2_block_manager: bool):
assert len(remaining_waiting) == 1
# Test when current_batched_tokens respected.
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=16,
num_gpu_blocks=16)
budget = create_token_budget(token_budget=60)
......@@ -467,14 +428,12 @@ def test_prefill_schedule_token_budget(use_v2_block_manager: bool):
assert len(remaining_waiting) == 0
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_seqs(use_v2_block_manager: bool):
def test_prefill_schedule_max_seqs():
"""
Test max seq respected.
"""
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
budget = create_token_budget(max_num_seqs=2)
......@@ -508,15 +467,13 @@ def test_prefill_schedule_max_seqs(use_v2_block_manager: bool):
assert len(remaining_waiting) == 1
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_lora(use_v2_block_manager: bool):
def test_prefill_schedule_max_lora():
"""
Test max lora is respected and prioritized.
"""
block_size = 4
lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
scheduler = initialize_scheduler(lora_config=lora_config,
use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
......@@ -563,14 +520,12 @@ def test_prefill_schedule_max_lora(use_v2_block_manager: bool):
assert budget.num_batched_tokens == 60
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_no_block_manager_capacity(use_v2_block_manager):
def test_prefill_schedule_no_block_manager_capacity():
"""
Test sequence cannot be scheduled due to block manager has no capacity.
"""
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
scheduler = initialize_scheduler(block_size=block_size,
num_gpu_blocks=128,
num_cpu_blocks=128)
budget = create_token_budget()
......@@ -607,14 +562,12 @@ def test_prefill_schedule_no_block_manager_capacity(use_v2_block_manager):
assert len(remaining_waiting) == 0
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_schedule_preempted(use_v2_block_manager: bool):
def test_decode_schedule_preempted():
"""
Test decodes cannot be scheduled and preempted.
"""
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
curr_loras = None
......@@ -653,70 +606,12 @@ def test_decode_schedule_preempted(use_v2_block_manager: bool):
assert output.blocks_to_copy == []
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_swap_beam_search(use_v2_block_manager: bool):
"""
Test best_of > 1 swap out blocks
"""
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
num_gpu_blocks=64,
num_cpu_blocks=64)
curr_loras = None
budget = create_token_budget()
for i in range(3):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
scheduler._add_seq_group_to_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
budget.add_num_seqs(seq_group.request_id,
seq_group.get_max_num_running_seqs())
budget.add_num_batched_tokens(
seq_group.request_id, seq_group.num_seqs(SequenceStatus.RUNNING))
# The last request should be swapped out.
scheduler.block_manager.can_append_slots = MagicMock()
def cannot_append_second_group(seq_group, num_lookahead_slots):
return seq_group.request_id != "2"
scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group)
scheduler.block_manager.swap_out = MagicMock()
expected_swap_mapping = [("5", "7")]
scheduler.block_manager.swap_out.return_value = expected_swap_mapping
output = scheduler._schedule_running(budget, curr_loras)
remainig_running = scheduler.running
assert len(remainig_running) == 0
assert len(output.decode_seq_groups) == 2
assert len(output.prefill_seq_groups) == 0
assert output.decode_seq_groups[0].seq_group.request_id == "0"
assert output.decode_seq_groups[1].seq_group.request_id == "1"
assert len(output.preempted) == 0
assert len(output.swapped_out) == 1
# Budget should refledct preempted requests.
assert budget.num_batched_tokens == 2
# since there are 2 sequences, 2 should be subtracted.
assert budget.num_curr_seqs == 4
# Both should be preempted, not swapped.
assert output.blocks_to_swap_out == expected_swap_mapping
# Nothing is copied.
assert output.blocks_to_copy == []
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_decode_blocks_to_copy_update(use_v2_block_manager: bool):
def test_schedule_decode_blocks_to_copy_update():
"""
Verify blocks_to_copy is updated.
"""
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=4,
scheduler = initialize_scheduler(block_size=4,
num_cpu_blocks=16,
num_gpu_blocks=16)
_, seq_group = create_dummy_prompt("1",
......@@ -747,117 +642,10 @@ def test_schedule_decode_blocks_to_copy_update(use_v2_block_manager: bool):
assert output.blocks_to_copy == [(2, 3)]
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_simple(use_v2_block_manager: bool):
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
_, seq_group = create_dummy_prompt("1",
prompt_length=4,
best_of=2,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(4, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
scheduler._add_seq_group_to_swapped(seq_group)
budget = create_token_budget()
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 0
assert budget.num_batched_tokens == 1
assert budget.num_curr_seqs == 2
assert len(output.decode_seq_groups) == 1
assert len(output.prefill_seq_groups) == 0
# swap in is the reverse of swap out
blocks_to_swap_in_reverse = []
for swapin, swapout in output.blocks_to_swap_in:
blocks_to_swap_in_reverse.append((swapout, swapin))
assert blocks_to_swap_out == blocks_to_swap_in_reverse
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_token_budget(use_v2_block_manager: bool):
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
for i in range(2):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
scheduler._add_seq_group_to_swapped(seq_group)
budget = create_token_budget(token_budget=1)
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 1
assert budget.num_batched_tokens == 1
assert budget.num_curr_seqs == 2
assert len(output.decode_seq_groups) == 1
assert len(output.prefill_seq_groups) == 0
# Verify num_batched_tokens are respected.
budget = create_token_budget(token_budget=1)
add_token_budget(budget, 1, 0)
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 1
assert budget.num_batched_tokens == 1
assert budget.num_curr_seqs == 0
assert len(output.decode_seq_groups) == 0
assert len(output.prefill_seq_groups) == 0
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_seqs(use_v2_block_manager: bool):
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
for i in range(4):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=4)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
scheduler._add_seq_group_to_swapped(seq_group)
budget = create_token_budget(max_num_seqs=2)
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 2
assert budget.num_batched_tokens == 2
assert budget.num_curr_seqs == 2
assert len(output.decode_seq_groups) == 2
assert len(output.prefill_seq_groups) == 0
# Verify num_curr_seqs are respected.
output = scheduler._schedule_swapped(budget, curr_loras)
remaining_swapped = scheduler.swapped
assert len(remaining_swapped) == 2
assert budget.num_batched_tokens == 2
assert budget.num_curr_seqs == 2
assert len(output.decode_seq_groups) == 0
assert len(output.prefill_seq_groups) == 0
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_loras(use_v2_block_manager: bool):
def test_schedule_swapped_max_loras():
block_size = 4
lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
scheduler = initialize_scheduler(lora_config=lora_config,
use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
......@@ -887,11 +675,9 @@ def test_schedule_swapped_max_loras(use_v2_block_manager: bool):
assert len(curr_loras) == 1
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_cannot_swap_in(use_v2_block_manager: bool):
def test_schedule_swapped_cannot_swap_in():
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras = None
......@@ -920,11 +706,9 @@ def test_schedule_swapped_cannot_swap_in(use_v2_block_manager: bool):
assert len(output.prefill_seq_groups) == 0
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_infeasible_swap(use_v2_block_manager: bool):
def test_infeasible_swap():
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras = None
......@@ -954,11 +738,9 @@ def test_infeasible_swap(use_v2_block_manager: bool):
assert len(output.prefill_seq_groups) == 0
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_blocks_to_copy(use_v2_block_manager: bool):
def test_schedule_swapped_blocks_to_copy():
block_size = 4
scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
block_size=block_size,
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras = None
......@@ -1027,3 +809,165 @@ def test_scheduling_budget():
assert budget.num_curr_seqs == 0
budget.subtract_num_seqs(seq_group.request_id, 2)
assert budget.num_curr_seqs == 0
@pytest.mark.parametrize("enable_prefix_caching", [True, False])
def test_prefix_caching_aware_prefills(enable_prefix_caching):
"""
Test the below scenario:
For 3 sequences, seqA, seqB, seqC, share the first block as prefix.
The test verifies the below scenarios:
1. SeqA is first scheduled.
2. SeqB and SeqC can be prefilled together in a single schedule round
even though there are not enough token budgets to prefill both without
considering prefix caching.
"""
block_size = 4
max_num_batched_tokens = 12
max_seq_group = 3
scheduler = initialize_scheduler(
block_size=block_size,
num_cpu_blocks=16,
num_gpu_blocks=16,
max_token_budget=max_num_batched_tokens,
max_num_seqs=max_seq_group,
max_model_len=max_num_batched_tokens,
enable_prefix_caching=enable_prefix_caching,
)
seqA_tokens = list(range(8))
num_shared_tokens = 4
seqB_tokens = seqA_tokens[:num_shared_tokens] + list(range(
12, 16)) # Shared prefix first 4.
seqC_tokens = seqA_tokens[:num_shared_tokens] + list(range(
16, 20)) # Shared prefix first 4.
seqA, seqA_group = create_dummy_prompt("0",
prompt_tokens=seqA_tokens,
block_size=block_size)
seqB, seqB_group = create_dummy_prompt("1",
prompt_tokens=seqB_tokens,
block_size=block_size)
seqC, seqC_group = create_dummy_prompt("2",
prompt_tokens=seqC_tokens,
block_size=block_size)
# Schedule seqA prefill.
scheduler.add_seq_group(seqA_group)
metas, out, _ = scheduler.schedule()
assert (len(out.scheduled_seq_groups) == 1
and out.scheduled_seq_groups[0].seq_group == seqA_group)
assert out.scheduled_seq_groups[0].token_chunk_size == len(seqA_tokens)
# Schedule seqA decode.
append_new_token_seq_group(len(seqA_tokens), seqA_group, 999)
metas, out, _ = scheduler.schedule()
assert len(out.scheduled_seq_groups) == 1
assert out.scheduled_seq_groups[0].seq_group == seqA_group
assert out.scheduled_seq_groups[0].token_chunk_size == 1
# Schedule seqB and seqC prefills should work with prefix caching.
scheduler.add_seq_group(seqB_group)
scheduler.add_seq_group(seqC_group)
metas, out, _ = scheduler.schedule()
if enable_prefix_caching:
assert len(out.scheduled_seq_groups) == 2
assert set([
out.scheduled_seq_groups[0].seq_group,
out.scheduled_seq_groups[1].seq_group,
]) == set([seqB_group, seqC_group])
assert len(metas) == 2
for meta in metas:
assert meta.token_chunk_size == 8
assert (len(meta.computed_block_nums) == num_shared_tokens //
block_size) # 1 Block for the 8 tokens.
else:
assert len(out.scheduled_seq_groups) == 1
assert len(metas) == 1
assert metas[0].token_chunk_size == 8
assert len(metas[0].computed_block_nums) == 0 # No blocks computed.
def test_no_multiple_partial_prefills_with_chunked_prefill_and_prefix_caching(
):
"""
This test verifies that we don't schedule new prefills if there's already
a continuous prefill in progress even though the new prefills with shared
prefix can fit in the token budget:
- SeqA is being chunked prefill.
- SeqB with the same prompt shouldn't be scheduled for prefill even though
there's enough token budget to prefill the cached tokens.
- Neither should seqC be scheduled.
- When seqA is in decoding phase, seqB and seqC can be scheduled.
- Entire seqB should be prefilled since it's a full prefix cache hit.
- SeqC would be partially prefilled with the prefix shared, and the
remaining unique tokens would be prefilled (rounded down to be
block-size aligned).
"""
block_size = 2
max_num_batched_tokens = 4
max_seq_group = 3
scheduler = initialize_scheduler(
block_size=block_size,
num_cpu_blocks=16,
num_gpu_blocks=16,
max_token_budget=max_num_batched_tokens,
max_num_seqs=max_seq_group,
max_model_len=100,
enable_prefix_caching=True,
enable_chunked_prefill=True,
)
seqA_tokens = list(range(8))
seqB_tokens = seqA_tokens
seqC_shared_prefix_len = 4
seqC_tokens = seqA_tokens[:seqC_shared_prefix_len] + list(range(12, 20))
seqA, seqA_group = create_dummy_prompt("0",
prompt_tokens=seqA_tokens,
block_size=block_size)
seqB, seqB_group = create_dummy_prompt("1",
prompt_tokens=seqB_tokens,
block_size=block_size)
# Chunked prefill seqA.
scheduler.add_seq_group(seqA_group)
metas, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
assert out.scheduled_seq_groups[0].seq_group == seqA_group
assert out.scheduled_seq_groups[0].token_chunk_size == 4
# seqB should not be scheduled with ongoing prefills.
scheduler.add_seq_group(seqB_group)
metas, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
assert out.scheduled_seq_groups[0].seq_group == seqA_group
assert out.scheduled_seq_groups[0].token_chunk_size == 4
# both seqB and seqC can now be scheduled with seqA is over.
# seqA is in decoding phase.
append_new_token_seq(seqA, 999)
seqC, seqC_group = create_dummy_prompt("2",
prompt_tokens=seqC_tokens,
block_size=block_size)
scheduler.add_seq_group(seqC_group)
metas, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 3
metas = {meta.request_id: meta for meta in metas}
assert metas[seqA_group.request_id].token_chunk_size == 1 # Decode
assert (metas[seqB_group.request_id].token_chunk_size == 8
) # Fully cached prefill
assert (
metas[seqC_group.request_id].token_chunk_size == 6
), "A partial prefix of C (4 tokens) should be prefilled, with the "
"remaining tokens fit into 3 token budget (4-1 from the seqA). It will "
"then be rounded down to 2 tokens on block size, thus 6 tokens in total."
......@@ -36,7 +36,12 @@ def test_scheduler_schedule_simple_encoder_decoder():
block_size = 4
num_seq_group = 4
max_model_len = 16
scheduler_config = SchedulerConfig(64, num_seq_group, max_model_len)
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens=64,
max_num_seqs=num_seq_group,
max_model_len=max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16 # enc and dec prompts per seq_group
cache_config.num_gpu_blocks = 16 # enc and dec prompts per seq_group
......
import time
from typing import List, Optional
from collections import defaultdict
from typing import Any, Dict, List, Optional
from typing import Sequence as GenericSequence
from typing import Tuple
from vllm import SamplingParams
from vllm.core.scheduler import Scheduler, SchedulerOutputs
from vllm.inputs import EncoderDecoderInputs, token_inputs
from vllm.lora.request import LoRARequest
from vllm.sequence import Logprob, Sequence, SequenceGroup
from vllm.sequence import (Logprob, Sequence, SequenceGroup,
SequenceGroupMetadata)
def create_dummy_prompt(
request_id: str,
prompt_length: int,
prompt_length: int = -1,
block_size: Optional[int] = None,
lora_request: Optional[LoRARequest] = None,
use_beam_search: bool = False,
best_of: int = 1,
prompt_tokens: Optional[List[int]] = None,
min_tokens: int = 0,
max_tokens: int = 16,
) -> Tuple[Sequence, SequenceGroup]:
if not block_size:
block_size = prompt_length
......@@ -24,31 +29,48 @@ def create_dummy_prompt(
# Create dummy prompt sequence with tokens 0...block_size-1
# and prompt "0 ... block_size".
prompt_tokens = list(range(prompt_length))
prompt_str = " ".join([str(t) for t in prompt_tokens])
prompt = Sequence(int(request_id),
inputs={
"prompt": prompt_str,
"prompt_token_ids": prompt_tokens,
},
inputs=token_inputs(prompt_tokens, prompt=prompt_str),
block_size=block_size)
seq_group = SequenceGroup(request_id=request_id,
seqs=[prompt],
arrival_time=time.time(),
sampling_params=SamplingParams(
use_beam_search=use_beam_search,
best_of=best_of),
best_of=best_of,
max_tokens=max_tokens,
min_tokens=min_tokens),
lora_request=lora_request)
return prompt, seq_group
def create_dummy_lora_sequence(request_id: int, token_ids: List[int],
block_size: int, lora_int_id: int) -> Sequence:
return Sequence(seq_id=request_id,
inputs=token_inputs(token_ids),
block_size=block_size,
lora_request=LoRARequest(lora_name="dummy",
lora_path="/dummy",
lora_int_id=lora_int_id))
def create_dummy_sequence(request_id: int, token_ids: List[int],
block_size: int) -> Sequence:
return Sequence(
seq_id=request_id,
inputs=token_inputs(token_ids),
block_size=block_size,
)
def create_dummy_prompt_encoder_decoder(
request_id: str,
decoder_prompt_length: int,
encoder_prompt_length: int,
block_size: Optional[int] = None,
lora_request: Optional[LoRARequest] = None,
use_beam_search: bool = False,
best_of: int = 1,
) -> Tuple[Sequence, Sequence, SequenceGroup]:
if not block_size:
......@@ -62,28 +84,24 @@ def create_dummy_prompt_encoder_decoder(
encoder_prompt_tokens = list(reversed(list(range(encoder_prompt_length))))
encoder_prompt_str = " ".join([str(t) for t in encoder_prompt_tokens])
inputs = {
"prompt": decoder_prompt_str,
"prompt_token_ids": decoder_prompt_tokens,
"encoder_prompt": encoder_prompt_str,
"encoder_prompt_token_ids": encoder_prompt_tokens,
"multi_modal_data": None,
inputs: EncoderDecoderInputs = {
"decoder": token_inputs(decoder_prompt_tokens,
prompt=decoder_prompt_str),
"encoder": token_inputs(encoder_prompt_tokens,
prompt=encoder_prompt_str),
}
decoder_prompt = Sequence(int(request_id),
inputs=inputs,
block_size=block_size,
from_decoder_prompt=True)
inputs=inputs["decoder"],
block_size=block_size)
encoder_prompt = Sequence(int(request_id),
inputs=inputs,
block_size=block_size,
from_decoder_prompt=False)
inputs=inputs["encoder"],
block_size=block_size)
seq_group = SequenceGroup(request_id=request_id,
seqs=[decoder_prompt],
sampling_params=SamplingParams(
use_beam_search=use_beam_search,
best_of=best_of),
sampling_params=SamplingParams(best_of=best_of),
arrival_time=time.time(),
lora_request=lora_request,
encoder_seq=encoder_prompt)
......@@ -109,7 +127,7 @@ def create_seq_group(
for seq_id_offset, output_len in enumerate(seq_output_lens):
seq = Sequence(
seq_id=seq_id_start + seq_id_offset,
inputs={"prompt_token_ids": prompt_token_ids},
inputs=token_inputs(prompt_token_ids),
block_size=16,
)
......@@ -144,21 +162,19 @@ def create_seq_group_encoder_decoder(
prompt_token_ids = [0] * seq_prompt_len
inputs = {
"prompt": "",
"prompt_token_ids": prompt_token_ids,
"encoder_prompt": "",
"encoder_prompt_token_ids": prompt_token_ids,
"multi_modal_data": None,
inputs: EncoderDecoderInputs = {
"decoder": token_inputs(prompt_token_ids),
"encoder": token_inputs(prompt_token_ids),
}
seqs = []
for seq_id_offset, output_len in enumerate(seq_output_lens):
# Construct decoder input sequences
seq = Sequence(seq_id=seq_id_start + seq_id_offset,
inputs=inputs,
block_size=16,
from_decoder_prompt=True)
seq = Sequence(
seq_id=seq_id_start + seq_id_offset,
inputs=inputs["decoder"],
block_size=16,
)
for i in range(output_len):
seq.append_token_id(
......@@ -168,10 +184,11 @@ def create_seq_group_encoder_decoder(
seqs.append(seq)
# Encoder input sequence
encoder_seq = Sequence(seq_id=seq_id_start + len(seq_output_lens),
inputs=inputs,
block_size=16,
from_decoder_prompt=False)
encoder_seq = Sequence(
seq_id=seq_id_start + len(seq_output_lens),
inputs=inputs["encoder"],
block_size=16,
)
return SequenceGroup(request_id=request_id,
seqs=seqs,
......@@ -200,12 +217,40 @@ def append_new_token(out, token_id: int):
def schedule_and_update_computed_tokens(scheduler):
metas, out, _ = scheduler.schedule()
for s, meta in zip(out.scheduled_seq_groups, metas):
s.seq_group.update_num_computed_tokens(meta.token_chunk_size)
for s in out.scheduled_seq_groups:
s.seq_group.update_num_computed_tokens(s.token_chunk_size)
return metas, out
def append_new_token_seq(seq: Sequence, token_id: int):
seq.append_token_id(token_id, {token_id: Logprob(token_id)})
def append_new_token_seq_group(token_chunk_size, seq_group, token_id: int):
seq_group.update_num_computed_tokens(token_chunk_size)
for seq in seq_group.get_seqs():
seq.append_token_id(token_id, {token_id: Logprob(token_id)})
class SchedulerProxy:
"""
A proxy class to forward calls to the scheduler.
"""
def __init__(self, scheduler: Scheduler):
self.scheduler_ = scheduler
self.call_history: Dict[str, List[Any]] = defaultdict(list)
def __getattr__(self, name: str) -> Any:
def wrapper(*args, **kwargs):
result = getattr(self.scheduler_, name)(*args, **kwargs)
self.call_history[name].append((args, kwargs, result))
return result
return wrapper
def last_schedule_ret(
self, ) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs, Any]:
_, _, ret = self.call_history["schedule"][-1]
return ret
port: 12312
served_model_name: mymodel
tensor_parallel_size: 2
trust_remote_code: true
multi_step_stream_outputs: false
# can only run on machines with p2p access across GPUs
# can only run with torchrun:
# torchrun --nproc_per_node=2 tests/distributed/test_ca_buffer_sharing.py
import ctypes
import torch
import torch.distributed as dist
from vllm.distributed.device_communicators.cuda_wrapper import CudaRTLibrary
from vllm.distributed.device_communicators.custom_all_reduce import ( # noqa
CustomAllreduce)
# create a cpu process group for communicating metadata (ipc handle)
dist.init_process_group(backend="gloo")
rank = local_rank = dist.get_rank()
world_size = dist.get_world_size()
# every process sets its own device (differently)
lib = CudaRTLibrary()
lib.cudaSetDevice(rank)
buffer_size_in_bytes = 1024
byte_value = 2 # the value we write to the buffer for verification
pointers = CustomAllreduce.create_shared_buffer(buffer_size_in_bytes)
print(f"Rank {rank} has pointers {pointers}")
dist.barrier()
torch.cuda.synchronize()
if rank == 0:
# the first rank tries to write to all buffers
for p in pointers:
pointer = ctypes.c_void_p(p)
lib.cudaMemset(pointer, byte_value, buffer_size_in_bytes)
dist.barrier()
torch.cuda.synchronize()
host_data = (ctypes.c_char * buffer_size_in_bytes)()
# all ranks read from all buffers, and check if the data is correct
for p in pointers:
pointer = ctypes.c_void_p(p)
lib.cudaMemcpy(host_data, pointer, buffer_size_in_bytes)
for i in range(buffer_size_in_bytes):
assert ord(host_data[i]) == byte_value, (
f"Rank {rank} failed"
f" to verify buffer {p}. Expected {byte_value}, "
f"got {ord(host_data[i])}")
print(f"Rank {rank} verified all buffers")
dist.barrier()
torch.cuda.synchronize()
CustomAllreduce.free_shared_buffer(pointers)
......@@ -95,13 +95,13 @@ def eager_allreduce(tp_size, pp_size, rank, distributed_init_port):
inp = torch.ones(sz, dtype=torch.float32, device=device)
out = inp
for _ in range(num_communication):
out = fa.all_reduce_unreg(out)
out = fa.all_reduce(out, registered=False)
torch.testing.assert_close(out, inp * (tp_size**num_communication))
inp = torch.ones(sz * 4, dtype=torch.bfloat16, device=device)
out = inp
for _ in range(num_communication):
out = fa.all_reduce_unreg(out)
out = fa.all_reduce(out, registered=False)
torch.testing.assert_close(out, inp * (tp_size**num_communication))
......
......@@ -6,11 +6,12 @@ WARNING: This test runs in both single-node (4 GPUs) and multi-node
to fail.
"""
import os
from dataclasses import dataclass
from typing import List, Literal, NamedTuple, Optional
import pytest
from packaging import version
from transformers import __version__ as transformers_version
from vllm.config import TaskOption
from vllm.logger import init_logger
from ..utils import compare_two_settings, fork_new_process_for_each_test, models_path_prefix
......@@ -20,52 +21,302 @@ logger = init_logger("test_pipeline_parallel")
VLLM_MULTI_NODE = os.getenv("VLLM_MULTI_NODE", "0") == "1"
@pytest.mark.parametrize(
("TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, TRUST_REMOTE_CODE, "
"MODEL_NAME, DIST_BACKEND"),
[
(2, 2, 0, 1, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "mp"),
(2, 2, 1, 0, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "mp"),
(1, 3, 0, 0, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "mp"),
(1, 4, 0, 1, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "mp"),
(1, 4, 1, 0, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "mp"),
(1, 3, 0, 0, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "ray"),
(1, 4, 0, 1, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "ray"),
(1, 4, 1, 0, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "ray"),
(2, 2, 1, 0, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "ray"),
(2, 2, 0, 1, 0, os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"), "ray"),
# NOTE: InternVL2 multi-node tests are flaky,
# use mp backend to skip the multi-node tests
(1, 2, 1, 1, 1, os.path.join(models_path_prefix, "OpenGVLab/InternVL2-1B"), "mp"),
(1, 2, 1, 1, 1, os.path.join(models_path_prefix, "OpenGVLab/InternVL2-2B"), "mp"),
(1, 2, 1, 0, 1, os.path.join(models_path_prefix, "OpenGVLab/InternVL2-4B"), "mp"),
(1, 2, 0, 1, 0, os.path.join(models_path_prefix, "Qwen/Qwen2-VL-2B-Instruct"), "mp")
],
)
@fork_new_process_for_each_test
def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL,
TRUST_REMOTE_CODE, MODEL_NAME, DIST_BACKEND):
if VLLM_MULTI_NODE and DIST_BACKEND == "mp":
class ParallelSetup(NamedTuple):
tp_size: int
pp_size: int
eager_mode: bool
chunked_prefill: bool
class PPTestOptions(NamedTuple):
multi_node_only: bool
trust_remote_code: bool
tokenizer_mode: Optional[str]
load_format: Optional[str] = None
hf_overrides: Optional[str] = None
@dataclass
class PPTestSettings:
parallel_setups: List[ParallelSetup]
distributed_backends: List[str]
task: TaskOption
test_options: PPTestOptions
@staticmethod
def detailed(
*,
tp_base: int = 1,
pp_base: int = 2,
multi_node_only: bool = False,
task: TaskOption = "auto",
trust_remote_code: bool = False,
tokenizer_mode: Optional[str] = None,
load_format: Optional[str] = None,
hf_overrides: Optional[str] = None,
):
return PPTestSettings(
parallel_setups=[
ParallelSetup(tp_size=tp_base,
pp_size=pp_base,
eager_mode=False,
chunked_prefill=False),
ParallelSetup(tp_size=tp_base,
pp_size=2 * pp_base,
eager_mode=False,
chunked_prefill=True),
ParallelSetup(tp_size=tp_base,
pp_size=2 * pp_base,
eager_mode=True,
chunked_prefill=False),
ParallelSetup(tp_size=2 * tp_base,
pp_size=pp_base,
eager_mode=False,
chunked_prefill=True),
ParallelSetup(tp_size=2 * tp_base,
pp_size=pp_base,
eager_mode=True,
chunked_prefill=False),
],
distributed_backends=["mp", "ray"],
task=task,
test_options=PPTestOptions(multi_node_only=multi_node_only,
trust_remote_code=trust_remote_code,
tokenizer_mode=tokenizer_mode,
load_format=load_format,
hf_overrides=hf_overrides),
)
@staticmethod
def fast(
*,
tp_base: int = 1,
pp_base: int = 2,
task: TaskOption = "auto",
multi_node_only: bool = False,
trust_remote_code: bool = False,
tokenizer_mode: Optional[str] = None,
load_format: Optional[str] = None,
hf_overrides: Optional[str] = None,
):
return PPTestSettings(
parallel_setups=[
ParallelSetup(tp_size=tp_base,
pp_size=pp_base,
eager_mode=True,
chunked_prefill=False),
],
distributed_backends=["mp"],
task=task,
test_options=PPTestOptions(multi_node_only=multi_node_only,
trust_remote_code=trust_remote_code,
tokenizer_mode=tokenizer_mode,
load_format=load_format,
hf_overrides=hf_overrides),
)
def iter_params(self, model_name: str):
opts = self.test_options
for parallel_setup in self.parallel_setups:
for distributed_backend in self.distributed_backends:
yield (model_name, parallel_setup, distributed_backend,
self.task, opts)
# NOTE: You can adjust tp_base and/or pp_base locally to fit the model in GPU
# The values displayed here are only a rough indicator of the size of the model
# yapf: disable
TEXT_GENERATION_MODELS = {
# [Decoder-only]
# Uses Llama
# "BAAI/AquilaChat-7B": PPTestSettings.fast(),
os.path.join(models_path_prefix, "Snowflake/snowflake-arctic-instruct"): PPTestSettings.fast(tp_base=8, trust_remote_code=True), # noqa: E501
os.path.join(models_path_prefix, "baichuan-inc/Baichuan-7B"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "baichuan-inc/Baichuan2-13B-Chat"): PPTestSettings.fast(trust_remote_code=True), # noqa: E501
os.path.join(models_path_prefix, "bigscience/bloomz-1b1"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "THUDM/chatglm3-6b"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "CohereForAI/c4ai-command-r-v01"): PPTestSettings.fast(tp_base=2, trust_remote_code=True), # noqa: E501
os.path.join(models_path_prefix, "databricks/dbrx-instruct"): PPTestSettings.fast(tp_base=8),
os.path.join(models_path_prefix, "Deci/DeciLM-7B-instruct"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "deepseek-ai/deepseek-llm-7b-chat"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "deepseek-ai/DeepSeek-V2-Lite-Chat"): PPTestSettings.fast(trust_remote_code=True), # noqa: E501
os.path.join(models_path_prefix, "LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "tiiuae/falcon-7b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "google/gemma-2b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "google/gemma-2-9b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "gpt2"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "bigcode/starcoder"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "EleutherAI/gpt-j-6b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "EleutherAI/pythia-12b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "ibm/PowerLM-3b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "ibm/PowerMoE-3b"): PPTestSettings.fast(),
# Uses Llama
# "internlm/internlm-chat-7b": PPTestSettings.fast(),
os.path.join(models_path_prefix, "internlm/internlm2-chat-7b"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "inceptionai/jais-13b-chat"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "ai21labs/Jamba-tiny-dev"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"): PPTestSettings.detailed(),
os.path.join(models_path_prefix, "openbmb/MiniCPM-2B-sft-bf16"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "openbmb/MiniCPM3-4B"): PPTestSettings.fast(trust_remote_code=True),
# Uses Llama
# "mistralai/Mistral-7B-Instruct-v0.1": PPTestSettings.fast(),
os.path.join(models_path_prefix, "state-spaces/mamba-130m-hf"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "mistralai/Mixtral-8x7B-Instruct-v0.1"): PPTestSettings.fast(tp_base=4),
os.path.join(models_path_prefix, "mosaicml/mpt-7b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "nvidia/Minitron-8B-Base"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "allenai/OLMo-1B-hf"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "shanearora/OLMo-7B-1124-hf"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "allenai/OLMoE-1B-7B-0924-Instruct"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "facebook/opt-iml-max-1.3b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "OrionStarAI/Orion-14B-Chat"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "adept/persimmon-8b-chat"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "microsoft/phi-2"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "microsoft/Phi-3-small-8k-instruct"): PPTestSettings.fast(trust_remote_code=True), # noqa: E501
os.path.join(models_path_prefix, "microsoft/Phi-3.5-MoE-instruct"): PPTestSettings.detailed(trust_remote_code=True, multi_node_only=True, load_format="dummy", hf_overrides='{"num_hidden_layers": 4, "hidden_size": 512, "intermediate_size": 800, "num_attention_heads": 4, "num_key_value_heads": 1}'), # noqa: E501
os.path.join(models_path_prefix, "Qwen/Qwen-7B-Chat"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "Qwen/Qwen2-7B-Instruct"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "Qwen/Qwen1.5-MoE-A2.7B-Chat"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "stabilityai/stablelm-3b-4e1t"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "bigcode/starcoder2-3b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "upstage/solar-pro-preview-instruct"): PPTestSettings.fast(tp_base=2),
# FIXME: Cannot load tokenizer in latest transformers version.
# Need to use tokenizer from `meta-llama/Llama-2-7b-chat-hf`
# "xverse/XVERSE-7B-Chat": PPTestSettings.fast(trust_remote_code=True),
# [Encoder-only]
# TODO: Implement PP
# "facebook/bart-base": PPTestSettings.fast(),
}
EMBEDDING_MODELS = { # type: ignore[var-annotated]
# [Text-only]
"intfloat/e5-mistral-7b-instruct": PPTestSettings.fast(),
"BAAI/bge-multilingual-gemma2": PPTestSettings.fast(),
"Qwen/Qwen2.5-Math-RM-72B": PPTestSettings.fast(tp_base=4, trust_remote_code=True), # noqa: E501
}
MULTIMODAL_MODELS = {
# [Decoder-only]
os.path.join(models_path_prefix, "Salesforce/blip2-opt-2.7b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "facebook/chameleon-7b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "adept/fuyu-8b"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "THUDM/glm-4v-9b"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "OpenGVLab/InternVL2-1B"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "llava-hf/llava-1.5-7b-hf"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "llava-hf/llava-v1.6-mistral-7b-hf"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "llava-hf/LLaVA-NeXT-Video-7B-hf"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "llava-hf/llava-onevision-qwen2-0.5b-ov-hf"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "openbmb/MiniCPM-Llama3-V-2_5"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "allenai/Molmo-7B-D-0924"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "microsoft/Phi-3-vision-128k-instruct"): PPTestSettings.fast(trust_remote_code=True), # noqa: E501
os.path.join(models_path_prefix, "mistralai/Pixtral-12B-2409"): PPTestSettings.fast(tp_base=2, tokenizer_mode="mistral"), # noqa: E501
os.path.join(models_path_prefix, "Qwen/Qwen-VL-Chat"): PPTestSettings.fast(trust_remote_code=True),
os.path.join(models_path_prefix, "Qwen/Qwen2-Audio-7B-Instruct"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "Qwen/Qwen2-VL-2B-Instruct"): PPTestSettings.fast(),
os.path.join(models_path_prefix, "fixie-ai/ultravox-v0_3"): PPTestSettings.fast(trust_remote_code=True),
# [Encoder-decoder]
# TODO: Implement PP
# "meta-llama/Llama-3.2-11B-Vision-Instruct": PPTestSettings.fast(),
}
# yapf: enable
# NOTE: You can update this on your local machine to run specific tests
TEST_MODELS = [
# [LANGUAGE GENERATION]
os.path.join(models_path_prefix, "microsoft/Phi-3.5-MoE-instruct"),
os.path.join(models_path_prefix, "meta-llama/Meta-Llama-3-8B"),
os.path.join(models_path_prefix, "ibm/PowerLM-3b"),
# [LANGUAGE EMBEDDING]
os.path.join(models_path_prefix, "intfloat/e5-mistral-7b-instruct"),
os.path.join(models_path_prefix, "BAAI/bge-multilingual-gemma2"),
# [MULTIMODAL GENERATION]
os.path.join(models_path_prefix, "OpenGVLab/InternVL2-1B"),
os.path.join(models_path_prefix, "microsoft/Phi-3-vision-128k-instruct"),
os.path.join(models_path_prefix, "fixie-ai/ultravox-v0_3"),
# [LANGUAGE GENERATION - HYBRID ARCH]
os.path.join(models_path_prefix, "ai21labs/Jamba-tiny-dev"),
]
def _compare_tp(
model_name: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
task: TaskOption,
test_options: PPTestOptions,
num_gpus_available: int,
*,
method: Literal["generate", "encode"],
):
(
tp_size,
pp_size,
eager_mode,
chunked_prefill,
) = parallel_setup
(
multi_node_only,
trust_remote_code,
tokenizer_mode,
load_format,
hf_overrides,
) = test_options
if num_gpus_available < tp_size * pp_size:
pytest.skip(f"Need at least {tp_size} x {pp_size} GPUs")
if VLLM_MULTI_NODE and distributed_backend == "mp":
pytest.skip("Skipping multi-node pipeline parallel test for "
"multiprocessing distributed backend")
if multi_node_only and not VLLM_MULTI_NODE:
pytest.skip("Not in multi-node setting")
# Skip tests that require transformers>=4.45.0
if "Qwen2-VL" in MODEL_NAME and version.parse(
transformers_version) < version.parse("4.45.0.dev0"):
pytest.skip("This test requires transformers>=4.45.0")
pp_args = [
common_args = [
# use half precision for speed and memory savings in CI environment
"--dtype",
"float16",
"--max-model-len",
"8192",
"2048",
"--max-num-seqs",
"8",
]
if chunked_prefill:
common_args.append("--enable-chunked-prefill")
if eager_mode:
common_args.append("--enforce-eager")
if task != "auto":
common_args.extend(["--task", task])
if trust_remote_code:
common_args.append("--trust-remote-code")
if tokenizer_mode:
common_args.extend(["--tokenizer-mode", tokenizer_mode])
if load_format:
common_args.extend(["--load-format", load_format])
if hf_overrides:
common_args.extend(["--hf-overrides", hf_overrides])
if (distributed_backend == "ray" and tp_size == 2 and pp_size == 2
and chunked_prefill):
# Test Ray ADAG for a subset of the tests
pp_env = {
"VLLM_USE_RAY_COMPILED_DAG": "1",
"VLLM_USE_RAY_SPMD_WORKER": "1",
"VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL": "1",
}
# Temporary. Currently when zeromq + SPMD is used, it does not properly
# terminate because of aDAG issue.
common_args.append("--disable-frontend-multiprocessing")
else:
pp_env = None
pp_args = [
*common_args,
"--pipeline-parallel-size",
str(PP_SIZE),
str(pp_size),
"--tensor-parallel-size",
str(TP_SIZE),
str(tp_size),
"--distributed-executor-backend",
DIST_BACKEND,
distributed_backend,
]
# compare without pipeline parallelism
......@@ -74,44 +325,103 @@ def test_compare_tp(TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL,
# schedule all workers in a node other than the head node,
# which can cause the test to fail.
tp_args = [
# use half precision for speed and memory savings in CI environment
"--dtype",
"float16",
"--max-model-len",
"8192",
*common_args,
"--tensor-parallel-size",
str(max(TP_SIZE, 2)), # We only use 2 GPUs in the CI.
str(tp_size),
"--distributed-executor-backend",
"mp",
]
if CHUNKED_PREFILL:
pp_args.append("--enable-chunked-prefill")
tp_args.append("--enable-chunked-prefill")
if EAGER_MODE:
pp_args.append("--enforce-eager")
tp_args.append("--enforce-eager")
if TRUST_REMOTE_CODE:
pp_args.append("--trust-remote-code")
tp_args.append("--trust-remote-code")
pp_env = None
if (DIST_BACKEND == "ray" and TP_SIZE == 2 and PP_SIZE == 2
and CHUNKED_PREFILL):
# Test Ray ADAG for a subset of the tests
pp_env = {
"VLLM_USE_RAY_COMPILED_DAG": "1",
"VLLM_USE_RAY_SPMD_WORKER": "1",
"VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL": "1",
}
# Temporary. Currently when zeromq + SPMD is used, it does not properly
# terminate because of aDAG issue.
pp_args.append("--disable-frontend-multiprocessing")
tp_args.append("--disable-frontend-multiprocessing")
try:
compare_two_settings(MODEL_NAME, pp_args, tp_args, pp_env)
compare_two_settings(model_name,
pp_args,
tp_args,
pp_env,
method=method)
except Exception:
if pp_env is None:
raise
else:
# Ray ADAG tests are flaky, so we don't want to fail the test
logger.exception("Ray ADAG tests failed")
@pytest.mark.parametrize(
("model_name", "parallel_setup", "distributed_backend", "task",
"test_options"),
[
params for model_name, settings in TEXT_GENERATION_MODELS.items()
for params in settings.iter_params(model_name)
if model_name in TEST_MODELS
],
)
@fork_new_process_for_each_test
def test_tp_language_generation(
model_name: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
task: TaskOption,
test_options: PPTestOptions,
num_gpus_available,
):
_compare_tp(model_name,
parallel_setup,
distributed_backend,
task,
test_options,
num_gpus_available,
method="generate")
@pytest.mark.parametrize(
("model_name", "parallel_setup", "distributed_backend", "task",
"test_options"),
[
params for model_name, settings in EMBEDDING_MODELS.items()
for params in settings.iter_params(model_name)
if model_name in TEST_MODELS
],
)
@fork_new_process_for_each_test
def test_tp_language_embedding(
model_name: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
task: TaskOption,
test_options: PPTestOptions,
num_gpus_available,
):
_compare_tp(model_name,
parallel_setup,
distributed_backend,
task,
test_options,
num_gpus_available,
method="encode")
@pytest.mark.parametrize(
("model_name", "parallel_setup", "distributed_backend", "task",
"test_options"),
[
params for model_name, settings in MULTIMODAL_MODELS.items()
for params in settings.iter_params(model_name)
if model_name in TEST_MODELS
],
)
@fork_new_process_for_each_test
def test_tp_multimodal_generation(
model_name: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
task: TaskOption,
test_options: PPTestOptions,
num_gpus_available,
):
_compare_tp(model_name,
parallel_setup,
distributed_backend,
task,
test_options,
num_gpus_available,
method="generate")
......@@ -60,9 +60,9 @@ def worker_fn():
tensor = torch.ones(16, 1024, 1024,
dtype=torch.float32).cuda(pynccl_comm.rank)
with pynccl_comm.change_state(enable=True):
pynccl_comm.all_reduce(tensor)
result = tensor.mean().cpu().item()
assert result == pynccl_comm.world_size
tensor = pynccl_comm.all_reduce(tensor)
torch.cuda.synchronize()
assert torch.all(tensor == pynccl_comm.world_size).cpu().item()
@pytest.mark.skipif(torch.cuda.device_count() < 2,
......@@ -84,14 +84,14 @@ def multiple_allreduce_worker_fn():
with pynccl_comm.change_state(enable=True):
# two groups can communicate independently
if torch.distributed.get_rank() in [0, 1]:
pynccl_comm.all_reduce(tensor)
pynccl_comm.all_reduce(tensor)
result = tensor.mean().cpu().item()
assert result == 4
tensor = pynccl_comm.all_reduce(tensor)
tensor = pynccl_comm.all_reduce(tensor)
torch.cuda.synchronize()
assert torch.all(tensor == 4).cpu().item()
else:
pynccl_comm.all_reduce(tensor)
result = tensor.mean().cpu().item()
assert result == 2
tensor = pynccl_comm.all_reduce(tensor)
torch.cuda.synchronize()
assert torch.all(tensor == 2).cpu().item()
@pytest.mark.skipif(torch.cuda.device_count() < 4,
......@@ -112,12 +112,12 @@ def multiple_allreduce_with_vllm_worker_fn():
if torch.distributed.get_rank() in [0, 1]:
tensor = tensor_model_parallel_all_reduce(tensor)
tensor = tensor_model_parallel_all_reduce(tensor)
result = tensor.mean().cpu().item()
assert result == 4
torch.cuda.synchronize()
assert torch.all(tensor == 4).cpu().item()
else:
tensor = tensor_model_parallel_all_reduce(tensor)
result = tensor.mean().cpu().item()
assert result == 2
torch.cuda.synchronize()
assert torch.all(tensor == 2).cpu().item()
@pytest.mark.skipif(torch.cuda.device_count() < 4,
......@@ -140,14 +140,82 @@ def worker_fn_with_cudagraph():
with torch.cuda.graph(
graph, stream=pynccl_comm.stream), pynccl_comm.change_state(
enable=True):
# operation during the graph capture is recorded but not executed
# see https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#creating-a-graph-using-stream-capture # noqa
pynccl_comm.all_reduce(a)
pynccl_comm.stream.synchronize()
assert a.mean().cpu().item() == pynccl_comm.world_size**0
a_out = pynccl_comm.all_reduce(a)
torch.cuda.synchronize()
graph.replay()
pynccl_comm.stream.synchronize()
assert a.mean().cpu().item() == pynccl_comm.world_size**1
torch.cuda.synchronize()
assert torch.all(a_out == pynccl_comm.world_size).cpu().item()
@worker_fn_wrapper
def all_gather_worker_fn():
pynccl_comm = PyNcclCommunicator(get_world_group().cpu_group,
device=get_world_group().device)
rank = pynccl_comm.rank
world_size = pynccl_comm.world_size
device = f'cuda:{pynccl_comm.rank}'
num_elems = 1000
tensor = torch.arange(num_elems, dtype=torch.float32,
device=device) + rank * num_elems
result = torch.zeros(num_elems * world_size,
dtype=torch.float32,
device=device)
expected = torch.cat([
torch.arange(num_elems, dtype=torch.float32) + r * num_elems
for r in range(world_size)
]).to(device)
with pynccl_comm.change_state(enable=True):
pynccl_comm.all_gather(result, tensor)
torch.cuda.synchronize()
torch.testing.assert_close(result, expected, rtol=1e-5, atol=1e-8)
@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason="Need at least 2 GPUs to run the test.")
def test_pynccl_all_gather():
distributed_run(all_gather_worker_fn, 2)
@worker_fn_wrapper
def reduce_scatter_worker_fn():
pynccl_comm = PyNcclCommunicator(get_world_group().cpu_group,
device=get_world_group().device)
rank = pynccl_comm.rank
world_size = pynccl_comm.world_size
device = f'cuda:{pynccl_comm.rank}'
num_elems = 1000
tensor = torch.arange(num_elems, dtype=torch.float32,
device=device) + rank * num_elems
assert (num_elems % world_size == 0)
result = torch.zeros(num_elems // world_size,
dtype=torch.float32,
device=device)
# Calculate expected result for this rank's chunk
scattered_size = num_elems // world_size
all_tensors = [
torch.arange(num_elems, dtype=torch.float32) + r * num_elems
for r in range(world_size)
]
expected = sum(tensor[rank * scattered_size:(rank + 1) * scattered_size]
for tensor in all_tensors).to(device)
with pynccl_comm.change_state(enable=True):
pynccl_comm.reduce_scatter(result, tensor)
torch.cuda.synchronize()
torch.testing.assert_close(result, expected, rtol=1e-5, atol=1e-8)
@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason="Need at least 2 GPUs to run the test.")
def test_pynccl_reduce_scatter():
distributed_run(reduce_scatter_worker_fn, 2)
@pytest.mark.skipif(torch.cuda.device_count() < 2,
......@@ -175,8 +243,8 @@ def send_recv_worker_fn():
pynccl_comm.recv(tensor,
src=(pynccl_comm.rank - 1) %
pynccl_comm.world_size)
result = tensor.mean().cpu().item()
assert result == 1
torch.cuda.synchronize()
assert torch.all(tensor == 1).cpu().item()
@pytest.mark.skipif(torch.cuda.device_count() < 2,
......@@ -214,11 +282,11 @@ def multiple_send_recv_worker_fn():
pynccl_comm.recv(tensor,
src=(pynccl_comm.rank - 1) %
pynccl_comm.world_size)
result = tensor.mean().cpu().item()
torch.cuda.synchronize()
if torch.distributed.get_rank() in [0, 2]:
assert result == 1
assert torch.all(tensor == 1).cpu().item()
else:
assert result == 2
assert torch.all(tensor == 2).cpu().item()
@pytest.mark.skipif(torch.cuda.device_count() < 4,
......@@ -227,6 +295,38 @@ def test_pynccl_multiple_send_recv():
distributed_run(multiple_send_recv_worker_fn, 4)
@pytest.mark.skipif(torch.cuda.device_count() < 4,
reason="Need at least 4 GPUs to run the test.")
def test_pynccl_broadcast():
distributed_run(broadcast_worker_fn, 4)
@worker_fn_wrapper
def broadcast_worker_fn():
# Test broadcast for every root rank.
# Essentially this is an all-gather operation.
pynccl_comm = PyNcclCommunicator(get_world_group().cpu_group,
device=get_world_group().device)
recv_tensors = [
torch.empty(16,
1024,
1024,
dtype=torch.float32,
device=pynccl_comm.device)
for i in range(pynccl_comm.world_size)
]
recv_tensors[pynccl_comm.rank] = torch.ones(
16, 1024, 1024, dtype=torch.float32,
device=pynccl_comm.device) * pynccl_comm.rank
for i in range(pynccl_comm.world_size):
pynccl_comm.broadcast(recv_tensors[i], src=i)
# the broadcast op might be launched in a different stream
# need to synchronize to make sure the tensor is ready
torch.cuda.synchronize()
assert torch.all(recv_tensors[i] == i).cpu().item()
def test_ncclGetUniqueId():
lib = NCCLLibrary()
unique_id = lib.ncclGetUniqueId()
......
......@@ -3,11 +3,32 @@ import os
import torch.distributed as dist
from vllm.distributed.parallel_state import in_the_same_node_as
from vllm.distributed.utils import StatelessProcessGroup
from vllm.utils import get_ip, get_open_port
if __name__ == "__main__":
dist.init_process_group(backend="gloo")
test_result = all(in_the_same_node_as(dist.group.WORLD, source_rank=0))
expected = os.environ.get("VLLM_TEST_SAME_HOST", "1") == "1"
assert test_result == expected, f"Expected {expected}, got {test_result}"
print("Same node test passed!")
rank = dist.get_rank()
if rank == 0:
port = get_open_port()
ip = get_ip()
dist.broadcast_object_list([ip, port], src=0)
else:
recv = [None, None]
dist.broadcast_object_list(recv, src=0)
ip, port = recv
stateless_pg = StatelessProcessGroup.create(ip, port, rank,
dist.get_world_size())
for pg in [dist.group.WORLD, stateless_pg]:
test_result = all(in_the_same_node_as(pg, source_rank=0))
expected = os.environ.get("VLLM_TEST_SAME_HOST", "1") == "1"
assert test_result == expected, \
f"Expected {expected}, got {test_result}"
if pg == dist.group.WORLD:
print("Same node test passed! when using torch distributed!")
else:
print("Same node test passed! when using StatelessProcessGroup!")
......@@ -7,7 +7,8 @@ import numpy as np
import torch.distributed as dist
from vllm.distributed.device_communicators.shm_broadcast import MessageQueue
from vllm.utils import update_environment_variables
from vllm.distributed.utils import StatelessProcessGroup
from vllm.utils import get_ip, get_open_port, update_environment_variables
def get_arrays(n: int, seed: int = 0) -> List[np.ndarray]:
......@@ -54,34 +55,61 @@ def worker_fn_wrapper(fn):
@worker_fn_wrapper
def worker_fn():
writer_rank = 2
broadcaster = MessageQueue.create_from_process_group(
dist.group.WORLD, 40 * 1024, 2, writer_rank)
if dist.get_rank() == writer_rank:
seed = random.randint(0, 1000)
dist.broadcast_object_list([seed], writer_rank)
else:
recv = [None]
dist.broadcast_object_list(recv, writer_rank)
seed = recv[0] # type: ignore
dist.barrier()
# in case we find a race condition
# print the seed so that we can reproduce the error
print(f"Rank {dist.get_rank()} got seed {seed}")
# test broadcasting with about 400MB of data
N = 10_000
if dist.get_rank() == writer_rank:
arrs = get_arrays(N, seed)
for x in arrs:
broadcaster.broadcast_object(x)
time.sleep(random.random() / 1000)
rank = dist.get_rank()
if rank == 0:
port = get_open_port()
ip = get_ip()
dist.broadcast_object_list([ip, port], src=0)
else:
arrs = get_arrays(N, seed)
for x in arrs:
y = broadcaster.broadcast_object(None)
assert np.array_equal(x, y)
time.sleep(random.random() / 1000)
dist.barrier()
recv = [None, None]
dist.broadcast_object_list(recv, src=0)
ip, port = recv
stateless_pg = StatelessProcessGroup.create(ip, port, rank,
dist.get_world_size())
for pg in [dist.group.WORLD, stateless_pg]:
writer_rank = 2
broadcaster = MessageQueue.create_from_process_group(
pg, 40 * 1024, 2, writer_rank)
if rank == writer_rank:
seed = random.randint(0, 1000)
dist.broadcast_object_list([seed], writer_rank)
else:
recv = [None]
dist.broadcast_object_list(recv, writer_rank)
seed = recv[0] # type: ignore
if pg == dist.group.WORLD:
dist.barrier()
else:
pg.barrier()
# in case we find a race condition
# print the seed so that we can reproduce the error
print(f"Rank {rank} got seed {seed}")
# test broadcasting with about 400MB of data
N = 10_000
if rank == writer_rank:
arrs = get_arrays(N, seed)
for x in arrs:
broadcaster.broadcast_object(x)
time.sleep(random.random() / 1000)
else:
arrs = get_arrays(N, seed)
for x in arrs:
y = broadcaster.broadcast_object(None)
assert np.array_equal(x, y)
time.sleep(random.random() / 1000)
if pg == dist.group.WORLD:
dist.barrier()
print("torch distributed passed the test!")
else:
pg.barrier()
print("StatelessProcessGroup passed the test!")
def test_shm_broadcast():
......
import socket
import pytest
import ray
import torch
import vllm.envs as envs
from vllm.utils import (cuda_device_count_stateless,
from vllm.distributed.device_communicators.pynccl import PyNcclCommunicator
from vllm.distributed.utils import StatelessProcessGroup
from vllm.utils import (cuda_device_count_stateless, get_open_port,
update_environment_variables)
from ..utils import multi_gpu_test
@ray.remote
class _CUDADeviceCountStatelessTestActor:
......@@ -24,10 +32,110 @@ def test_cuda_device_count_stateless():
CUDA_VISIBLE_DEVICES is changed."""
actor = _CUDADeviceCountStatelessTestActor.options( # type: ignore
num_gpus=2).remote()
assert sorted(ray.get(
actor.get_cuda_visible_devices.remote()).split(",")) == ["0", "1"]
assert len(
sorted(ray.get(
actor.get_cuda_visible_devices.remote()).split(","))) == 2
assert ray.get(actor.get_count.remote()) == 2
ray.get(actor.set_cuda_visible_devices.remote("0"))
assert ray.get(actor.get_count.remote()) == 1
ray.get(actor.set_cuda_visible_devices.remote(""))
assert ray.get(actor.get_count.remote()) == 0
def cpu_worker(rank, WORLD_SIZE, port1, port2):
pg1 = StatelessProcessGroup.create(host="127.0.0.1",
port=port1,
rank=rank,
world_size=WORLD_SIZE)
if rank <= 2:
pg2 = StatelessProcessGroup.create(host="127.0.0.1",
port=port2,
rank=rank,
world_size=3)
data = torch.tensor([rank])
data = pg1.broadcast_obj(data, src=2)
assert data.item() == 2
if rank <= 2:
data = torch.tensor([rank + 1])
data = pg2.broadcast_obj(data, src=2)
assert data.item() == 3
pg2.barrier()
pg1.barrier()
def gpu_worker(rank, WORLD_SIZE, port1, port2):
torch.cuda.set_device(rank)
pg1 = StatelessProcessGroup.create(host="127.0.0.1",
port=port1,
rank=rank,
world_size=WORLD_SIZE)
pynccl1 = PyNcclCommunicator(pg1, device=rank)
if rank <= 2:
pg2 = StatelessProcessGroup.create(host="127.0.0.1",
port=port2,
rank=rank,
world_size=3)
pynccl2 = PyNcclCommunicator(pg2, device=rank)
data = torch.tensor([rank]).cuda()
pynccl1.all_reduce(data)
pg1.barrier()
torch.cuda.synchronize()
if rank <= 2:
pynccl2.all_reduce(data)
pg2.barrier()
torch.cuda.synchronize()
item = data[0].item()
print(f"rank: {rank}, item: {item}")
if rank == 3:
assert item == 6
else:
assert item == 18
def broadcast_worker(rank, WORLD_SIZE, port1, port2):
pg1 = StatelessProcessGroup.create(host="127.0.0.1",
port=port1,
rank=rank,
world_size=WORLD_SIZE)
if rank == 2:
pg1.broadcast_obj("secret", src=2)
else:
obj = pg1.broadcast_obj(None, src=2)
assert obj == "secret"
pg1.barrier()
def allgather_worker(rank, WORLD_SIZE, port1, port2):
pg1 = StatelessProcessGroup.create(host="127.0.0.1",
port=port1,
rank=rank,
world_size=WORLD_SIZE)
data = pg1.all_gather_obj(rank)
assert data == list(range(WORLD_SIZE))
pg1.barrier()
@pytest.mark.skip(reason="This test is flaky and prone to hang.")
@multi_gpu_test(num_gpus=4)
@pytest.mark.parametrize(
"worker", [cpu_worker, gpu_worker, broadcast_worker, allgather_worker])
def test_stateless_process_group(worker):
port1 = get_open_port()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", port1))
port2 = get_open_port()
WORLD_SIZE = 4
from multiprocessing import get_context
ctx = get_context("fork")
processes = []
for i in range(WORLD_SIZE):
rank = i
processes.append(
ctx.Process(target=worker, args=(rank, WORLD_SIZE, port1, port2)))
for p in processes:
p.start()
for p in processes:
p.join()
for p in processes:
assert not p.exitcode
print("All processes finished.")
......@@ -8,8 +8,10 @@ import pytest
import os
from transformers import AutoModelForSeq2SeqLM
from vllm.attention.selector import (_Backend, _cached_get_attn_backend,
global_force_attn_backend_context_manager)
from vllm.platforms import current_platform
from vllm.sequence import SampleLogprobs
from vllm.utils import is_cpu
from ..conftest import DecoderPromptType
from ..models.utils import check_logprobs_close
......@@ -17,6 +19,10 @@ from ..utils import models_path_prefix
from vllm.utils import is_hip
from vllm.attention.backends.utils import STR_NOT_IMPL_ENC_DEC_ROCM_HIP
LIST_ENC_DEC_SUPPORTED_BACKENDS = [
_Backend.XFORMERS, _Backend.FLASH_ATTN, None
]
def vllm_to_hf_output(
vllm_output: Tuple[List[int], str, Optional[SampleLogprobs]],
......@@ -32,16 +38,24 @@ def vllm_to_hf_output(
return output_ids, hf_output_str, out_logprobs
@pytest.mark.skipif(is_hip(),
@pytest.mark.skipif(current_platform.is_rocm(),
reason=STR_NOT_IMPL_ENC_DEC_ROCM_HIP)
@pytest.fixture(autouse=True)
def clear_cache():
"""Fixture to clear backend cache before each test."""
_cached_get_attn_backend.cache_clear() # Clear the cache
yield # This allows the test to run
@pytest.mark.parametrize("model", [os.path.join(models_path_prefix, "facebook/bart-large-cnn")])
@pytest.mark.parametrize("dtype", ["bfloat16"])
@pytest.mark.parametrize("dtype", ["float"])
@pytest.mark.parametrize("attn_backend", LIST_ENC_DEC_SUPPORTED_BACKENDS)
@pytest.mark.parametrize("max_tokens", [128])
@pytest.mark.parametrize("num_logprobs", [5])
@pytest.mark.parametrize("decoder_prompt_type", list(DecoderPromptType))
@pytest.mark.parametrize("enforce_eager", [True, False])
@pytest.mark.skipif(
is_cpu(),
current_platform.is_cpu(),
reason="CPU backend is not currently supported with encoder/decoder models"
)
def test_encoder_decoder_e2e(
......@@ -54,51 +68,58 @@ def test_encoder_decoder_e2e(
num_logprobs: int,
decoder_prompt_type: DecoderPromptType,
enforce_eager: bool,
attn_backend: _Backend,
) -> None:
'''
End-to-End (E2E) test for the encoder-decoder framework.
End-to-End (E2E) test for the encoder-decoder framework.
This test evaluates the encoder-decoder functionality using the BART
model. We compare the outputs of the Hugging Face and vLLM
implementations to ensure that both implementations produce consistent
and correct results.
'''
test_case_prompts = example_encoder_decoder_prompts[decoder_prompt_type]
# Configuration settings for HF baseline
hf_kwargs = {
"top_k": None,
"num_beams": 1,
"repetition_penalty": 1.0,
"top_p": 1.0,
"length_penalty": 1.0,
"early_stopping": False,
"no_repeat_ngram_size": None,
"min_length": 0
}
with hf_runner(model, dtype=dtype,
auto_cls=AutoModelForSeq2SeqLM) as hf_model:
hf_outputs = (hf_model.generate_encoder_decoder_greedy_logprobs_limit(
test_case_prompts,
max_tokens,
num_logprobs,
**hf_kwargs,
))
with vllm_runner(model, dtype=dtype,
enforce_eager=enforce_eager) as vllm_model:
vllm_outputs = vllm_model.generate_encoder_decoder_greedy_logprobs(
test_case_prompts, max_tokens, num_logprobs)
hf_skip_tokens = (1
if decoder_prompt_type == DecoderPromptType.NONE else 0)
check_logprobs_close(
outputs_0_lst=hf_outputs,
outputs_1_lst=[
vllm_to_hf_output(vllm_output, decoder_prompt_type)
for vllm_output in vllm_outputs
],
name_0="hf",
name_1="vllm",
num_outputs_0_skip_tokens=hf_skip_tokens,
)
with global_force_attn_backend_context_manager(attn_backend):
if attn_backend == _Backend.FLASH_ATTN:
# Flash Attention works only with bfloat16 data-type
dtype = 'bfloat16'
test_case_prompts = example_encoder_decoder_prompts[
decoder_prompt_type]
# Configuration settings for HF baseline
hf_kwargs = {
"top_k": None,
"num_beams": 1,
"repetition_penalty": 1.0,
"top_p": 1.0,
"length_penalty": 1.0,
"early_stopping": False,
"no_repeat_ngram_size": None,
"min_length": 0
}
with hf_runner(model, dtype=dtype,
auto_cls=AutoModelForSeq2SeqLM) as hf_model:
hf_outputs = (
hf_model.generate_encoder_decoder_greedy_logprobs_limit(
test_case_prompts,
max_tokens,
num_logprobs,
**hf_kwargs,
))
with vllm_runner(model, dtype=dtype,
enforce_eager=enforce_eager) as vllm_model:
vllm_outputs = vllm_model.generate_encoder_decoder_greedy_logprobs(
test_case_prompts, max_tokens, num_logprobs)
hf_skip_tokens = (1 if decoder_prompt_type == DecoderPromptType.NONE
else 0)
check_logprobs_close(
outputs_0_lst=hf_outputs,
outputs_1_lst=[
vllm_to_hf_output(vllm_output, decoder_prompt_type)
for vllm_output in vllm_outputs
],
name_0="hf",
name_1="vllm",
num_outputs_0_skip_tokens=hf_skip_tokens,
)
......@@ -4,6 +4,7 @@ import pytest
from transformers import PreTrainedTokenizer
from vllm.engine.output_processor.stop_checker import StopChecker
from vllm.inputs import token_inputs
from vllm.sampling_params import SamplingParams
from vllm.sequence import Logprob, Sequence, SequenceStatus
......@@ -15,7 +16,7 @@ def sequence_with_eos(text: str, eos_token: str,
"""
seq = Sequence(
seq_id=0,
inputs={"prompt_token_ids": []},
inputs=token_inputs([]),
block_size=16,
eos_token_id=eos_token_id,
)
......
......@@ -2,6 +2,7 @@ from argparse import ArgumentTypeError
import pytest
from vllm.config import PoolerConfig
from vllm.engine.arg_utils import EngineArgs, nullable_kvs
from vllm.utils import FlexibleArgumentParser
......@@ -30,6 +31,64 @@ def test_limit_mm_per_prompt_parser(arg, expected):
assert args.limit_mm_per_prompt == expected
def test_compilation_config():
parser = EngineArgs.add_cli_args(FlexibleArgumentParser())
# default value
args = parser.parse_args([])
assert args.compilation_config is None
# set to O3
args = parser.parse_args(["-O3"])
assert args.compilation_config.level == 3
# set to O 3 (space)
args = parser.parse_args(["-O", "3"])
assert args.compilation_config.level == 3
# set to O 3 (equals)
args = parser.parse_args(["-O=3"])
assert args.compilation_config.level == 3
# set to string form of a dict
args = parser.parse_args(["--compilation-config", "{'level': 3}"])
assert args.compilation_config.level == 3
# set to string form of a dict
args = parser.parse_args(["--compilation-config={'level': 3}"])
assert args.compilation_config.level == 3
def test_prefix_cache_default():
parser = EngineArgs.add_cli_args(FlexibleArgumentParser())
args = parser.parse_args([])
engine_args = EngineArgs.from_cli_args(args=args)
assert (not engine_args.enable_prefix_caching
), "prefix caching defaults to off."
# with flag to turn it on.
args = parser.parse_args(["--enable-prefix-caching"])
engine_args = EngineArgs.from_cli_args(args=args)
assert engine_args.enable_prefix_caching
# with disable flag to turn it off.
args = parser.parse_args(["--no-enable-prefix-caching"])
engine_args = EngineArgs.from_cli_args(args=args)
assert not engine_args.enable_prefix_caching
def test_valid_pooling_config():
parser = EngineArgs.add_cli_args(FlexibleArgumentParser())
args = parser.parse_args([
'--override-pooler-config',
'{"pooling_type": "MEAN"}',
])
engine_args = EngineArgs.from_cli_args(args=args)
assert engine_args.override_pooler_config == PoolerConfig(
pooling_type="MEAN", )
@pytest.mark.parametrize(
("arg"),
[
......@@ -42,22 +101,42 @@ def test_bad_nullable_kvs(arg):
nullable_kvs(arg)
@pytest.mark.parametrize(("arg", "expected"), [
(None, None),
("{}", {}),
('{"num_crops": 4}', {
"num_crops": 4
}),
('{"foo": {"bar": "baz"}}', {
"foo": {
"bar": "baz"
}
}),
# yapf: disable
@pytest.mark.parametrize(("arg", "expected", "option"), [
(None, None, "mm-processor-kwargs"),
("{}", {}, "mm-processor-kwargs"),
(
'{"num_crops": 4}',
{
"num_crops": 4
},
"mm-processor-kwargs"
),
(
'{"foo": {"bar": "baz"}}',
{
"foo":
{
"bar": "baz"
}
},
"mm-processor-kwargs"
),
(
'{"cast_logits_dtype":"bfloat16","sequence_parallel_norm":true,"sequence_parallel_norm_threshold":2048}',
{
"cast_logits_dtype": "bfloat16",
"sequence_parallel_norm": True,
"sequence_parallel_norm_threshold": 2048,
},
"override-neuron-config"
),
])
def test_mm_processor_kwargs_prompt_parser(arg, expected):
# yapf: enable
def test_composite_arg_parser(arg, expected, option):
parser = EngineArgs.add_cli_args(FlexibleArgumentParser())
if arg is None:
args = parser.parse_args([])
else:
args = parser.parse_args(["--mm-processor-kwargs", arg])
assert args.mm_processor_kwargs == expected
args = parser.parse_args([f"--{option}", arg])
assert getattr(args, option.replace("-", "_")) == expected
......@@ -50,9 +50,9 @@ def test_custom_executor_type_checking(model):
@pytest.mark.parametrize("model", [os.path.join(models_path_prefix, "facebook/opt-125m")])
def test_custom_executor(model, tmpdir):
def test_custom_executor(model, tmp_path):
cwd = os.path.abspath(".")
os.chdir(tmpdir)
os.chdir(tmp_path)
try:
assert not os.path.exists(".marker")
......@@ -70,9 +70,9 @@ def test_custom_executor(model, tmpdir):
@pytest.mark.parametrize("model", [os.path.join(models_path_prefix, "facebook/opt-125m")])
def test_custom_executor_async(model, tmpdir):
def test_custom_executor_async(model, tmp_path):
cwd = os.path.abspath(".")
os.chdir(tmpdir)
os.chdir(tmp_path)
try:
assert not os.path.exists(".marker")
......
import pytest
from ..conftest import IMAGE_ASSETS
from ..utils import models_path_prefix
HF_IMAGE_PROMPTS = IMAGE_ASSETS.prompts({
"stop_sign":
"USER: <image>\nWhat's the content of the image?\nASSISTANT:",
"cherry_blossom":
"USER: <image>\nWhat is the season?\nASSISTANT:",
})
models = [os.path.join(models_path_prefix, "llava-hf/llava-1.5-7b-hf")]
@pytest.mark.parametrize("model", models)
def test_context_length_too_short(vllm_runner, image_assets, model):
images = [asset.pil_image for asset in image_assets]
with pytest.raises(ValueError, match="too long to fit into the model"):
vllm_model = vllm_runner(
model,
max_model_len=128, # LLaVA has a feature size of 576
enforce_eager=True,
)
with vllm_model:
vllm_model.generate_greedy([HF_IMAGE_PROMPTS[0]],
max_tokens=1,
images=[images[0]])
......@@ -69,6 +69,76 @@ def sample_json_schema():
}
@pytest.fixture
def sample_complex_json_schema():
return {
"type": "object",
"properties": {
"score": {
"type": "integer",
"minimum": 0,
"maximum": 100 # Numeric range
},
"grade": {
"type": "string",
"pattern": "^[A-D]$" # Regex pattern
},
"email": {
"type": "string",
"pattern": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
},
"tags": {
"type": "array",
"items": {
"type": "string",
"pattern":
"^[a-z]{1,10}$" # Combining length and pattern restrictions
}
}
},
"required": ["score", "grade", "email", "tags"]
}
@pytest.fixture
def sample_definition_json_schema():
return {
'$defs': {
'Step': {
'properties': {
'explanation': {
'title': 'Explanation',
'type': 'string'
},
'output': {
'title': 'Output',
'type': 'string'
}
},
'required': ['explanation', 'output'],
'title': 'Step',
'type': 'object'
}
},
'properties': {
'steps': {
'items': {
'$ref': '#/$defs/Step'
},
'title': 'Steps',
'type': 'array'
},
'final_answer': {
'title': 'Final Answer',
'type': 'string'
}
},
'required': ['steps', 'final_answer'],
'title': 'MathReasoning',
'type': 'object'
}
@pytest.fixture
def sample_guided_choice():
return [
......
"""
This file test accuracy of the vLLM server via LMEval.
It uses local-completions, which interacts with vLLM
through the OAI API with N concurrent connections.
This simulates real work usage of the API and makes
sure that the zmq frontend mp RPC message passing and
AsyncLLMEngine are working correctly.
"""
import os
import lm_eval
import pytest
from vllm.platforms import current_platform
from ...utils import models_path_prefix
MODEL_NAME = os.path.join(models_path_prefix, "Qwen/Qwen2-1.5B-Instruct")
NUM_CONCURRENT = 500
TASK = "gsm8k"
FILTER = "exact_match,strict-match"
RTOL = 0.03
EXPECTED_VALUE = 0.58
def run_test():
"""Run the end to end accuracy test."""
model_args = f"pretrained={MODEL_NAME},max_model_len=2048"
results = lm_eval.simple_evaluate(
model="vllm",
model_args=model_args,
tasks="gsm8k",
batch_size="auto",
)
measured_value = results["results"][TASK][FILTER]
assert (measured_value - RTOL < EXPECTED_VALUE
and measured_value + RTOL > EXPECTED_VALUE
), f"Expected: {EXPECTED_VALUE} | Measured: {measured_value}"
@pytest.mark.skipif(not current_platform.is_cuda(),
reason="V1 is currently only supported on CUDA.")
def test_lm_eval_accuracy_v1_engine(monkeypatch):
"""Run with the V1 Engine."""
with monkeypatch.context() as m:
m.setenv("VLLM_USE_V1", "1")
run_test()
def test_lm_eval_accuracy_v0_engine(monkeypatch):
"""Run with the V0 Engine."""
with monkeypatch.context() as m:
m.setenv("VLLM_USE_V1", "0")
run_test()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment