Commit ad385667 authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge branch 'v0.6.3.post1-dev'

parents be0967c1 903593d3
......@@ -10,23 +10,23 @@ async def test_request_tracker():
stream_1 = tracker.add_request("1")
assert tracker.new_requests_event.is_set()
await tracker.wait_for_new_requests()
new, finished = tracker.get_new_and_finished_requests()
new, aborted = tracker.get_new_and_aborted_requests()
assert not tracker.new_requests_event.is_set()
assert len(new) == 1
assert new[0]["request_id"] == "1"
assert not finished
assert not aborted
assert not stream_1.finished
stream_2 = tracker.add_request("2")
stream_3 = tracker.add_request("3")
assert tracker.new_requests_event.is_set()
await tracker.wait_for_new_requests()
new, finished = tracker.get_new_and_finished_requests()
new, aborted = tracker.get_new_and_aborted_requests()
assert not tracker.new_requests_event.is_set()
assert len(new) == 2
assert new[0]["request_id"] == "2"
assert new[1]["request_id"] == "3"
assert not finished
assert not aborted
assert not stream_2.finished
assert not stream_3.finished
......@@ -36,9 +36,9 @@ async def test_request_tracker():
assert not tracker.new_requests_event.is_set()
tracker.abort_request("1")
new, finished = tracker.get_new_and_finished_requests()
assert len(finished) == 1
assert "1" in finished
new, aborted = tracker.get_new_and_aborted_requests()
assert len(aborted) == 1
assert "1" in aborted
assert not new
assert stream_1.finished
......@@ -46,9 +46,11 @@ async def test_request_tracker():
tracker.abort_request("4")
assert tracker.new_requests_event.is_set()
await tracker.wait_for_new_requests()
new, finished = tracker.get_new_and_finished_requests()
assert len(finished) == 1
assert "4" in finished
new, aborted = tracker.get_new_and_aborted_requests()
# aborted new requests will cancel each other out -
# there's no need for them to propagate into the
# engine
assert not aborted
assert not new
assert stream_4.finished
......@@ -57,10 +59,9 @@ async def test_request_tracker():
tracker.process_request_output(
RequestOutput("2", "output", [], [], [], finished=True))
await tracker.wait_for_new_requests()
new, finished = tracker.get_new_and_finished_requests()
new, aborted = tracker.get_new_and_aborted_requests()
assert not tracker.new_requests_event.is_set()
assert len(finished) == 1
assert "2" in finished
assert not aborted
assert len(new) == 1
assert new[0]["request_id"] == "5"
assert stream_2.finished
......
......@@ -3,20 +3,27 @@
Run `pytest tests/basic_correctness/test_basic_correctness.py`.
"""
import os
import pickle
import re
import weakref
from unittest.mock import patch
import pytest
from vllm import LLM
from vllm.utils import is_hip
from vllm.worker.model_runner import ModelInputForGPUWithSamplingMetadata
from ..models.utils import check_outputs_equal
from ..utils import multi_gpu_test
MODELS = [
"facebook/opt-125m",
"meta-llama/Llama-2-7b-hf",
]
TARGET_TEST_SUITE = os.environ.get("TARGET_TEST_SUITE", "L4")
def test_vllm_gc_ed():
"""Verify vllm instance is GC'ed when it is deleted"""
......@@ -64,3 +71,88 @@ def test_models(
name_0="hf",
name_1="vllm",
)
@multi_gpu_test(num_gpus=2)
@pytest.mark.parametrize(
"model, distributed_executor_backend, attention_backend, "
"test_suite", [
("facebook/opt-125m", "ray", "", "L4"),
("facebook/opt-125m", "mp", "", "L4"),
("meta-llama/Llama-2-7b-hf", "ray", "", "L4"),
("meta-llama/Llama-2-7b-hf", "mp", "", "L4"),
("facebook/opt-125m", "ray", "", "A100"),
("facebook/opt-125m", "mp", "", "A100"),
("facebook/opt-125m", "mp", "FLASHINFER", "A100"),
("meta-llama/Meta-Llama-3-8B", "ray", "FLASHINFER", "A100"),
])
def test_models_distributed(
hf_runner,
vllm_runner,
example_prompts,
model: str,
distributed_executor_backend: str,
attention_backend: str,
test_suite: str,
) -> None:
if test_suite != TARGET_TEST_SUITE:
pytest.skip(f"Skip test for {test_suite}")
if model == "meta-llama/Llama-2-7b-hf" and distributed_executor_backend == "ray" and attention_backend == "" and test_suite == "L4": # noqa
# test ray adag
os.environ['VLLM_USE_RAY_SPMD_WORKER'] = "1"
os.environ['VLLM_USE_RAY_COMPILED_DAG'] = "1"
if attention_backend:
os.environ["VLLM_ATTENTION_BACKEND"] = attention_backend
dtype = "half"
max_tokens = 5
# NOTE: take care of the order. run vLLM first, and then run HF.
# vLLM needs a fresh new process without cuda initialization.
# if we run HF first, the cuda initialization will be done and it
# will hurt multiprocessing backend with fork method (the default method).
with vllm_runner(model,
dtype=dtype,
tensor_parallel_size=2,
distributed_executor_backend=distributed_executor_backend
) as vllm_model:
vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens)
with hf_runner(model, dtype=dtype) as hf_model:
hf_outputs = hf_model.generate_greedy(example_prompts, max_tokens)
check_outputs_equal(
outputs_0_lst=hf_outputs,
outputs_1_lst=vllm_outputs,
name_0="hf",
name_1="vllm",
)
def test_model_with_failure(vllm_runner) -> None:
try:
with patch("vllm.model_executor.models.opt.OPTForCausalLM.forward",
side_effect=ValueError()):
with pytest.raises(ValueError) as exc_info:
vllm_runner("facebook/opt-125m",
dtype="half",
enforce_eager=False,
gpu_memory_utilization=0.7)
matches = re.search(r"input dumped to (.+).pkl",
str(exc_info.value))
assert matches is not None
filename = f"{matches.group(1)}.pkl"
with open(filename, "rb") as filep:
inputs = pickle.load(filep)
if any(key not in inputs for key in ("arg_1", "arg_2", "arg_3")):
raise AssertionError("Missing keys in dumped inputs. Dumped keys: "
f"{list(inputs.keys())}")
assert isinstance(inputs["arg_1"],
ModelInputForGPUWithSamplingMetadata)
finally:
os.remove(filename)
......@@ -6,9 +6,13 @@ prefill requests are chunked.
Run `pytest tests/models/test_chunked_prefill.py`.
"""
import os
from contextlib import nullcontext
import pytest
from ..models.utils import check_outputs_equal
from ..models.utils import check_logprobs_close, check_outputs_equal
from ..utils import multi_gpu_test
MODELS = [
"facebook/opt-125m",
......@@ -35,12 +39,12 @@ def test_models(
enforce_eager: bool,
tensor_parallel_size: int,
) -> None:
max_num_seqs = min(chunked_prefill_token_size, 256)
enable_chunked_prefill = False
max_num_batched_tokens = None
if chunked_prefill_token_size != -1:
enable_chunked_prefill = True
max_num_batched_tokens = chunked_prefill_token_size
"""
Checks exact match decode between huggingface model and vllm runner with
chunked prefill.
"""
max_num_seqs = chunked_prefill_token_size
max_num_batched_tokens = chunked_prefill_token_size
with hf_runner(model, dtype=dtype) as hf_model:
hf_outputs = hf_model.generate_greedy(example_prompts, max_tokens)
......@@ -49,7 +53,7 @@ def test_models(
model,
dtype=dtype,
max_num_batched_tokens=max_num_batched_tokens,
enable_chunked_prefill=enable_chunked_prefill,
enable_chunked_prefill=True,
tensor_parallel_size=tensor_parallel_size,
enforce_eager=enforce_eager,
max_num_seqs=max_num_seqs,
......@@ -62,3 +66,185 @@ def test_models(
name_0="hf",
name_1="vllm",
)
@multi_gpu_test(num_gpus=2)
@pytest.mark.parametrize("distributed_executor_backend", ["ray", "mp"])
@pytest.mark.parametrize("model", MODELS)
def test_models_distributed(
hf_runner,
vllm_runner,
example_prompts,
model: str,
distributed_executor_backend: str,
) -> None:
if (model == "meta-llama/Llama-2-7b-hf"
and distributed_executor_backend == "ray"):
# test ray adag
os.environ['VLLM_USE_RAY_SPMD_WORKER'] = "1"
os.environ['VLLM_USE_RAY_COMPILED_DAG'] = "1"
dtype = "half"
max_tokens = 5
chunked_prefill_token_size = 16
# Add a chunked prefill config.
max_num_seqs = min(chunked_prefill_token_size, 256)
assert chunked_prefill_token_size != -1
enable_chunked_prefill = True
max_num_batched_tokens = chunked_prefill_token_size
# NOTE: take care of the order. run vLLM first, and then run HF.
# vLLM needs a fresh new process without cuda initialization.
# if we run HF first, the cuda initialization will be done and it
# will hurt multiprocessing backend with fork method (the default method).
with vllm_runner(
model,
dtype=dtype,
tensor_parallel_size=2,
max_num_seqs=max_num_seqs,
enable_chunked_prefill=enable_chunked_prefill,
max_num_batched_tokens=max_num_batched_tokens,
distributed_executor_backend=distributed_executor_backend,
) as vllm_model:
vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens)
with hf_runner(model, dtype=dtype) as hf_model:
hf_outputs = hf_model.generate_greedy(example_prompts, max_tokens)
check_outputs_equal(
outputs_0_lst=hf_outputs,
outputs_1_lst=vllm_outputs,
name_0="hf",
name_1="vllm",
)
@pytest.mark.parametrize(
"kv_cache_dtype,model",
[("fp8_e4m3",
"nm-testing/TinyLlama-1.1B-compressed-tensors-kv-cache-scheme")])
# Due to low-precision numerical divergence, we only test logprob of 4 tokens
@pytest.mark.parametrize("max_tokens", [4])
@pytest.mark.parametrize("chunked_prefill_token_size", [4, 16])
@pytest.mark.parametrize("enforce_eager", [False, True])
# NOTE: Increasing this in this suite will fail CI because we currently cannot
# reset distributed env properly. Use a value > 1 just when you test.
@pytest.mark.parametrize("tensor_parallel_size", [1])
# Due to low-precision numerical divergence, this test is too sensitive to
# the async postprocessor
@pytest.mark.parametrize("disable_async_output_proc", [True])
def test_models_with_fp8_kv_cache(
vllm_runner,
example_prompts,
kv_cache_dtype: str,
model: str,
max_tokens: int,
chunked_prefill_token_size: int,
enforce_eager: bool,
tensor_parallel_size: int,
disable_async_output_proc: bool,
) -> None:
"""
Check output logprobs match between no_chunked_prefill and chunked_prefill
with fp8 kv cache. General fp8 kv-cache tests are covered in test_fp8.py,
so here we only check chunked prefill.
"""
NUM_LOG_PROBS = 8
max_num_seqs = chunked_prefill_token_size
max_num_batched_tokens = chunked_prefill_token_size
with vllm_runner(
model,
tensor_parallel_size=tensor_parallel_size,
enforce_eager=enforce_eager,
max_num_seqs=max_num_seqs,
kv_cache_dtype=kv_cache_dtype,
disable_async_output_proc=disable_async_output_proc,
) as vllm_model:
no_chunked_prefill_outputs = vllm_model.generate_greedy_logprobs(
example_prompts, max_tokens, NUM_LOG_PROBS)
with vllm_runner(
model,
max_num_batched_tokens=max_num_batched_tokens,
enable_chunked_prefill=True,
tensor_parallel_size=tensor_parallel_size,
enforce_eager=enforce_eager,
max_num_seqs=max_num_seqs,
kv_cache_dtype=kv_cache_dtype,
disable_async_output_proc=disable_async_output_proc,
) as vllm_model:
chunked_prefill_outputs = vllm_model.generate_greedy_logprobs(
example_prompts, max_tokens, NUM_LOG_PROBS)
check_logprobs_close(
outputs_0_lst=no_chunked_prefill_outputs,
outputs_1_lst=chunked_prefill_outputs,
name_0="no_chunked_prefill",
name_1="chunked_prefill",
)
@pytest.mark.parametrize("max_tokens", [16])
@pytest.mark.parametrize("enforce_eager", [False])
@pytest.mark.parametrize("chunk_size", [30, 32])
# NOTE: Increasing this in this suite will fail CI because we currently cannot
# reset distributed env properly. Use a value > 1 just when you test.
@pytest.mark.parametrize("tensor_parallel_size", [1])
def test_with_prefix_caching(
vllm_runner,
max_tokens: int,
enforce_eager: bool,
chunk_size: int,
tensor_parallel_size: int,
) -> None:
"""
Checks exact match decode with and without prefix caching
with chunked prefill enabled.
"""
model = "meta-llama/Llama-2-7b-chat-hf"
# The common prompt has 142 tokens with Llama-2 tokenizer.
common_prompt = "You are a helpful AI assistant " * 20
unique_prompts = [
"Question", # Warmup
"Question", # Fully cached
"Another question", # Partial cached
]
full_prompts = [f"{common_prompt}\n{p}" for p in unique_prompts]
max_num_batched_tokens = max_num_seqs = chunk_size
outputs = {} # type: ignore
check_result = True
for enable in (True, False):
with vllm_runner(
model,
dtype="half",
max_num_batched_tokens=max_num_batched_tokens,
enable_chunked_prefill=True,
enable_prefix_caching=enable,
tensor_parallel_size=tensor_parallel_size,
enforce_eager=enforce_eager,
max_num_seqs=max_num_seqs,
) as vllm_model:
# It should fail when prefix caching is enable and chunk
# size is not a multiple of block size (16).
should_fail = chunk_size % 16 != 0 and enable
check_result &= not should_fail
outputs[enable] = []
# Send the request one-by-one to ensure the cache is populated.
with pytest.raises(ValueError) if should_fail else nullcontext():
for prompt in full_prompts:
outputs[enable] += vllm_model.generate_greedy([prompt],
max_tokens)
# Check results only if we did not expect a failure.
if check_result:
check_outputs_equal(
outputs_0_lst=outputs[False],
outputs_1_lst=outputs[True],
name_0="w/o prefix caching",
name_1="with prefix caching",
)
import pytest
from tests.quantization.utils import is_quant_method_supported
from ..utils import compare_two_settings
def test_cpu_offload():
compare_two_settings("meta-llama/Llama-2-7b-hf", [],
["--cpu-offload-gb", "4"])
@pytest.mark.skipif(not is_quant_method_supported("fp8"),
reason="fp8 is not supported on this GPU type.")
def test_cpu_offload_fp8():
# Test quantization of an unquantized checkpoint
compare_two_settings("meta-llama/Meta-Llama-3-8B-Instruct",
["--quantization", "fp8"],
["--quantization", "fp8", "--cpu-offload-gb", "2"])
# Test loading a quantized checkpoint
compare_two_settings("neuralmagic/Meta-Llama-3-8B-Instruct-FP8", [],
["--cpu-offload-gb", "2"])
@pytest.mark.skipif(not is_quant_method_supported("awq"),
reason="awq is not supported on this GPU type.")
def test_cpu_offload_awq():
compare_two_settings("casperhansen/llama-3-8b-instruct-awq", [],
["--cpu-offload-gb", "2"])
@pytest.mark.skipif(not is_quant_method_supported("gptq_marlin"),
reason="gptq_marlin is not supported on this GPU type.")
def test_cpu_offload_compressed_tensors():
# Test wNa16
compare_two_settings("nm-testing/tinyllama-oneshot-w4a16-channel-v2", [],
["--cpu-offload-gb", "1"])
# Test w4a16_marlin24
compare_two_settings("nm-testing/llama7b-one-shot-2_4-w4a16-marlin24-t",
[], ["--cpu-offload-gb", "1"])
# Test w8a8
compare_two_settings(
"nm-testing/tinyllama-oneshot-w8w8-test-static-shape-change", [],
["--cpu-offload-gb", "1"])
......@@ -8,6 +8,7 @@ pytest tests/basic_correctness/test_preemption.py`.
import pytest
from prometheus_client import REGISTRY
import vllm.envs as envs
from vllm import SamplingParams
from vllm.core.scheduler import (ARTIFICIAL_PREEMPTION_MAX_CNT,
ENABLE_ARTIFICIAL_PREEMPT)
......@@ -18,10 +19,20 @@ MODELS = [
"facebook/opt-125m",
]
assert ENABLE_ARTIFICIAL_PREEMPT is True, (
"Use an env var VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1. "
"`VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1 pytest "
"tests/basic_correctness/test_preemption.py`")
@pytest.fixture(scope="module", autouse=True)
def check_settings():
assert ENABLE_ARTIFICIAL_PREEMPT is True, (
"Use an env var VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1."
"`VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1 "
"pytest tests/basic_correctness/test_preemption.py`")
@pytest.fixture
def worker_use_ray() -> bool:
# When SPMD worker is used, use ray_use_worker=True
# to test delta input optimization works with preemption.
return envs.VLLM_USE_RAY_SPMD_WORKER
@pytest.mark.parametrize("model", MODELS)
......@@ -36,6 +47,7 @@ def test_chunked_prefill_recompute(
dtype: str,
max_tokens: int,
chunked_prefill_token_size: int,
worker_use_ray: bool,
) -> None:
"""Ensure that chunked prefill works with preemption."""
max_num_seqs = min(chunked_prefill_token_size, 256)
......@@ -54,6 +66,8 @@ def test_chunked_prefill_recompute(
max_num_batched_tokens=max_num_batched_tokens,
enable_chunked_prefill=enable_chunked_prefill,
max_num_seqs=max_num_seqs,
worker_use_ray=worker_use_ray,
disable_log_stats=False,
) as vllm_model:
vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens)
assert (vllm_model.model.llm_engine.scheduler[0].artificial_preempt_cnt
......@@ -69,8 +83,7 @@ def test_chunked_prefill_recompute(
@pytest.mark.parametrize("model", MODELS)
# @pytest.mark.parametrize("dtype", ["float"])
@pytest.mark.parametrize("dtype", ["half"])
@pytest.mark.parametrize("dtype", ["float"])
@pytest.mark.parametrize("max_tokens", [96])
def test_preemption(
caplog_vllm,
......@@ -80,6 +93,7 @@ def test_preemption(
model: str,
dtype: str,
max_tokens: int,
worker_use_ray: bool,
) -> None:
"""By default, recompute preemption is enabled"""
......@@ -90,6 +104,7 @@ def test_preemption(
model,
dtype=dtype,
disable_log_stats=False,
worker_use_ray=worker_use_ray,
) as vllm_model:
vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens)
assert (vllm_model.model.llm_engine.scheduler[0].artificial_preempt_cnt
......@@ -121,112 +136,7 @@ def test_preemption(
@pytest.mark.parametrize("model", MODELS)
# @pytest.mark.parametrize("dtype", ["float"])
@pytest.mark.parametrize("dtype", ["half"])
@pytest.mark.parametrize("max_tokens", [96])
@pytest.mark.parametrize("beam_width", [4])
def test_swap(
caplog_vllm,
hf_runner,
vllm_runner,
example_prompts,
model: str,
dtype: str,
max_tokens: int,
beam_width: int,
) -> None:
"""Use beam search enables swapping."""
example_prompts = example_prompts[:1]
with hf_runner(model, dtype=dtype) as hf_model:
hf_outputs = hf_model.generate_beam_search(example_prompts, beam_width,
max_tokens)
with vllm_runner(
model,
dtype=dtype,
swap_space=10,
disable_log_stats=False,
) as vllm_model:
vllm_outputs = vllm_model.generate_beam_search(example_prompts,
beam_width, max_tokens)
assert (vllm_model.model.llm_engine.scheduler[0].artificial_preempt_cnt
< ARTIFICIAL_PREEMPTION_MAX_CNT)
total_preemption = (
vllm_model.model.llm_engine.scheduler[0].num_cumulative_preemption)
for i in range(len(example_prompts)):
hf_output_ids, _ = hf_outputs[i]
vllm_output_ids, _ = vllm_outputs[i]
assert len(hf_output_ids) == len(vllm_output_ids)
for j in range(len(hf_output_ids)):
assert hf_output_ids[j] == vllm_output_ids[j], (
f"Test{i} output{j}:\nHF: {hf_output_ids}\n"
f"vLLM: {vllm_output_ids}")
assert ("is preempted by PreemptionMode.SWAP mode because there "
"is not enough KV cache space." in caplog_vllm.text)
# Ensure the count bucket of request-level histogram metrics matches
# the number of requests as a simple sanity check to ensure metrics are
# generated
preemption_metrics = None
for m in REGISTRY.collect():
if m.name == "vllm:num_preemptions":
preemption_metrics = m
assert preemption_metrics is not None
total_recorded_preemption = 0
for sample in preemption_metrics.samples:
total_recorded_preemption += sample.value
assert total_preemption == total_recorded_preemption
@pytest.mark.parametrize("model", MODELS)
# @pytest.mark.parametrize("dtype", ["float"])
@pytest.mark.parametrize("dtype", ["half"])
@pytest.mark.parametrize("max_tokens", [96])
@pytest.mark.parametrize("beam_width", [4])
def test_swap_infeasible(
vllm_runner,
example_prompts,
model: str,
dtype: str,
max_tokens: int,
beam_width: int,
) -> None:
"""Verify infeasible swap request will be ignored."""
BLOCK_SIZE = 16
prefill_blocks = 2
decode_blocks = max_tokens // BLOCK_SIZE
example_prompts = example_prompts[:1]
with vllm_runner(
model,
dtype=dtype,
swap_space=10,
block_size=BLOCK_SIZE,
# Since beam search have more than 1 sequence, prefill +
# decode blocks are not enough to finish.
num_gpu_blocks_override=prefill_blocks + decode_blocks,
max_model_len=(prefill_blocks + decode_blocks) * BLOCK_SIZE,
) as vllm_model:
sampling_params = SamplingParams(n=beam_width,
use_beam_search=True,
temperature=0.0,
max_tokens=max_tokens,
ignore_eos=True)
req_outputs = vllm_model.model.generate(
example_prompts,
sampling_params=sampling_params,
)
assert (vllm_model.model.llm_engine.scheduler[0].artificial_preempt_cnt
< ARTIFICIAL_PREEMPTION_MAX_CNT)
# Verify the request is ignored and not hang.
assert req_outputs[0].outputs[0].finish_reason == "length"
@pytest.mark.parametrize("model", MODELS)
# @pytest.mark.parametrize("dtype", ["float"])
@pytest.mark.parametrize("dtype", ["half"])
@pytest.mark.parametrize("dtype", ["float"])
@pytest.mark.parametrize("max_tokens", [96])
def test_preemption_infeasible(
vllm_runner,
......@@ -234,6 +144,7 @@ def test_preemption_infeasible(
model: str,
dtype: str,
max_tokens: int,
worker_use_ray: bool,
) -> None:
"""Verify infeasible preemption request will be ignored."""
BLOCK_SIZE = 16
......@@ -248,6 +159,7 @@ def test_preemption_infeasible(
# ignored instead of hanging forever.
num_gpu_blocks_override=prefill_blocks + decode_blocks // 2,
max_model_len=((prefill_blocks + decode_blocks // 2) * BLOCK_SIZE),
worker_use_ray=worker_use_ray,
) as vllm_model:
sampling_params = SamplingParams(max_tokens=max_tokens,
ignore_eos=True)
......@@ -263,4 +175,4 @@ def test_preemption_infeasible(
for req_output in req_outputs:
outputs = req_output.outputs
assert len(outputs) == 1
assert outputs[0].finish_reason == "length"
assert outputs[0].finish_reason == "length"
\ No newline at end of file
from typing import Dict, List, Optional
import pytest
from vllm.compilation.levels import CompilationLevel
from vllm.utils import cuda_device_count_stateless
from ..utils import compare_all_settings
# we cannot afford testing the full Catesian product
# of all models and all levels
@pytest.mark.parametrize(
"model, model_args, pp_size, tp_size, attn_backend, method, fullgraph",
[
("meta-llama/Meta-Llama-3-8B", [], 2, 2, "FLASH_ATTN", "generate",
True),
("nm-testing/Meta-Llama-3-8B-Instruct-W8A8-Dyn-Per-Token-2048-Samples",
["--quantization", "compressed-tensors"
], 1, 1, "FLASH_ATTN", "generate", True),
("google/gemma-2-2b-it", [], 1, 2, "FLASHINFER", "generate", True),
# TODO: add multi-modality test for llava
("llava-hf/llava-1.5-7b-hf", [], 2, 1, "FLASHINFER", "generate", False)
])
def test_compile_correctness(model, model_args, pp_size, tp_size, attn_backend,
method, fullgraph):
# this test is run under multiple suits, with different GPUs.
# make sure we only run the test with correct CUDA devices.
# don't use "<", as it will duplicate the tests.
if cuda_device_count_stateless() != pp_size * tp_size:
pytest.skip("Not correct CUDA devices for the test.")
import os
os.environ["VLLM_ATTENTION_BACKEND"] = attn_backend
if not fullgraph:
os.environ["VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE"] = "0"
all_args = [["--enforce-eager"] + model_args + ["--max_model_len", "1024"]
+ ["-pp", str(pp_size)] + ["-tp", str(tp_size)]] * 3
# don't test VLLM_TORCH_COMPILE_LEVEL == 3 case
# inductor will change the output, so we cannot compare them.
all_envs: List[Optional[Dict[str, str]]] = [{
"VLLM_TORCH_COMPILE_LEVEL":
str(level)
} for level in [
CompilationLevel.NO_COMPILATION,
CompilationLevel.DYNAMO_AS_IS,
CompilationLevel.DYNAMO_ONCE,
]]
compare_all_settings(model, all_args, all_envs, method=method)
import pytest
from vllm.compilation.levels import CompilationLevel
from ..utils import fork_new_process_for_each_test
from .utils import TEST_MODELS, check_full_graph_support
@pytest.mark.parametrize("model_info", TEST_MODELS)
@pytest.mark.parametrize(
"optimization_level",
[CompilationLevel.DYNAMO_ONCE, CompilationLevel.INDUCTOR])
@fork_new_process_for_each_test
def test_full_graph(model_info, optimization_level):
model = model_info[0]
model_kwargs = model_info[1]
check_full_graph_support(model,
model_kwargs,
optimization_level,
tp_size=1)
from typing import Optional
import torch
from vllm.compilation.wrapper import TorchCompileWrapperWithCustomDispatcher
class MyMod(torch.nn.Module):
def forward(self, x: torch.Tensor, cache: Optional[torch.Tensor] = None):
if cache is not None:
return x + cache
return x * 2
class MyWrapper(TorchCompileWrapperWithCustomDispatcher):
def __init__(self, model):
self.model = model
compiled_callable = torch.compile(self.forward, backend="eager")
super().__init__(compiled_callable)
def forward(self, x: torch.Tensor, cache: Optional[torch.Tensor] = None):
# this is the function to be compiled
return self.model(x, cache)
def __call__(self, x: torch.Tensor, cache: Optional[torch.Tensor] = None):
# let torch.compile compile twice
if len(self.compiled_codes) == 2:
dispatch_id = 0 if cache is None else 1
with self.dispatch_to_code(dispatch_id):
return self.forward(x, cache)
else:
return self.compiled_callable(x, cache)
def test_torch_compile_wrapper():
mod = MyMod()
wrappers = []
for i in range(3):
torch._dynamo.reset()
wrapper = MyWrapper(mod)
wrappers.append(wrapper)
x = torch.tensor([1])
wrapper(x, None) # profile run, compile
# create a cache tensor
cache = torch.tensor([2])
wrapper(x, cache) # warm up with cache, recompile
# for new input, dispatch to the compiled code directly
new_x = torch.tensor([3])
assert wrapper(new_x,
None).item() == 6 # dispatch to the first compiled code
assert wrapper(
new_x, cache).item() == 5 # dispatch to the second compiled code
for wrapper in wrappers:
# make sure they have independent compiled codes
assert len(wrapper.compiled_codes) == 2
import os
import torch
from tests.quantization.utils import is_quant_method_supported
from vllm import LLM, SamplingParams
from vllm.compilation.levels import CompilationLevel
from vllm.utils import is_hip
TEST_MODELS = [
("facebook/opt-125m", {}),
("nm-testing/tinyllama-oneshot-w8w8-test-static-shape-change", {
"dtype": torch.float16,
"quantization": "compressed-tensors"
}),
("neuralmagic/Meta-Llama-3-8B-Instruct-FP8", {
"dtype": torch.float16,
"quantization": "fp8"
}),
("nm-testing/Meta-Llama-3-8B-Instruct-W8A8-Dyn-Per-Token-2048-Samples", {
"quantization": "compressed-tensors"
}),
("meta-llama/Meta-Llama-3-8B", {}),
]
# TODO: enable in pytorch 2.5
if False and is_quant_method_supported("aqlm"): # noqa: SIM223
TEST_MODELS.append(("ISTA-DASLab/Llama-2-7b-AQLM-2Bit-1x16-hf", {
"quantization": "aqlm"
}))
# TODO: enable in pytorch 2.5
if False and is_quant_method_supported("gguf"): # noqa: SIM223
TEST_MODELS.append(("TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF", {
"quantization": "gguf"
}))
if is_quant_method_supported("gptq"):
TEST_MODELS.append(("TheBloke/TinyLlama-1.1B-Chat-v0.3-GPTQ", {
"quantization": "gptq"
}))
if is_quant_method_supported("gptq_marlin"):
TEST_MODELS.append(("TheBloke/TinyLlama-1.1B-Chat-v1.0-GPTQ", {
"quantization": "gptq_marlin"
}))
if is_quant_method_supported("gptq_marlin_24"):
TEST_MODELS.append(("alexm-nm/tinyllama-24-marlin24-4bit-g128", {
"quantization": "gptq_marlin_24"
}))
if is_quant_method_supported("marlin"):
TEST_MODELS.append(("robertgshaw2/TinyLlama-1.1B-Chat-v1.0-g128-marlin", {
"quantization": "marlin"
}))
if not is_hip() and is_quant_method_supported("awq"):
TEST_MODELS.append(("TheBloke/TinyLlama-1.1B-Chat-v0.3-AWQ", {
"quantization": "AWQ"
}))
def check_full_graph_support(model,
model_kwargs,
optimization_level,
tp_size=1):
# make sure these models can be captured in full graph mode
os.environ["VLLM_TORCH_COMPILE_LEVEL"] = str(optimization_level)
os.environ["VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE"] = "1"
# Inductor doesn't support fp8/gptq_marlin_24 yet.
quantization = model_kwargs.get("quantization")
if (quantization == "fp8" or quantization == "gptq_marlin"
or quantization == "gptq_marlin_24"
) and optimization_level >= CompilationLevel.INDUCTOR:
return
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
sampling_params = SamplingParams(temperature=0)
llm = LLM(model=model,
enforce_eager=True,
tensor_parallel_size=tp_size,
disable_custom_all_reduce=True,
**model_kwargs)
outputs = llm.generate(prompts, sampling_params)
# Print the outputs.
for output in outputs:
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}")
import contextlib
import gc
import json
import os
import sys
import tempfile
from collections import UserList
from typing import Any, Dict, List, Optional, Tuple, TypedDict, TypeVar, Union
from enum import Enum
from typing import (Any, Callable, Dict, List, Optional, Tuple, Type,
TypedDict, TypeVar, Union)
import numpy as np
import pytest
import torch
import torch.nn as nn
import torch.nn.functional as F
from huggingface_hub import snapshot_download
from PIL import Image
from transformers import (AutoModelForCausalLM, AutoModelForVision2Seq,
AutoTokenizer, BatchEncoding, BatchFeature)
from transformers import (AutoModelForCausalLM, AutoTokenizer, BatchEncoding,
BatchFeature)
from transformers.models.auto.auto_factory import _BaseAutoModelClass
from tests.models.utils import (TokensTextLogprobs,
TokensTextLogprobsPromptLogprobs)
from vllm import LLM, SamplingParams
from vllm.assets.image import ImageAsset
from vllm.assets.video import VideoAsset
from vllm.config import TokenizerPoolConfig
from vllm.connections import global_http_connection
from vllm.distributed import (destroy_distributed_environment,
destroy_model_parallel)
from vllm.inputs import TextPrompt
destroy_model_parallel,
init_distributed_environment,
initialize_model_parallel)
from vllm.inputs import (ExplicitEncoderDecoderPrompt, TextPrompt,
to_enc_dec_tuple_list, zip_enc_dec_prompts)
from vllm.logger import init_logger
from vllm.sequence import SampleLogprobs
from vllm.outputs import RequestOutput
from vllm.sampling_params import BeamSearchParams
from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, cuda_device_count_stateless,
is_cpu)
identity, is_cpu)
logger = init_logger(__name__)
......@@ -31,6 +45,11 @@ _TEST_DIR = os.path.dirname(__file__)
_TEST_PROMPTS = [os.path.join(_TEST_DIR, "prompts", "example.txt")]
_LONG_PROMPTS = [os.path.join(_TEST_DIR, "prompts", "summary.txt")]
PromptImageInput = Union[List[Image.Image], List[List[Image.Image]]]
PromptAudioInput = Union[List[Tuple[np.ndarray, int]],
List[List[Tuple[np.ndarray, int]]]]
PromptVideoInput = Union[List[np.ndarray], List[List[np.ndarray]]]
def _read_prompts(filename: str) -> List[str]:
with open(filename, "r") as f:
......@@ -71,8 +90,35 @@ class _ImageAssets(_ImageAssetsBase):
return [prompts["stop_sign"], prompts["cherry_blossom"]]
class _VideoAssetPrompts(TypedDict):
sample_demo_1: str
if sys.version_info < (3, 9):
# UserList cannot be subscripted
class _VideoAssetsBase(UserList):
pass
else:
class _VideoAssetsBase(UserList[VideoAsset]):
pass
class _VideoAssets(_VideoAssetsBase):
def __init__(self) -> None:
super().__init__([
VideoAsset("sample_demo_1.mp4"),
])
def prompts(self, prompts: _VideoAssetPrompts) -> List[str]:
return [prompts["sample_demo_1"]]
IMAGE_ASSETS = _ImageAssets()
"""Singleton instance of :class:`_ImageAssets`."""
VIDEO_ASSETS = _VideoAssets()
"""Singleton instance of :class:`_VideoAssets`."""
@pytest.fixture(autouse=True)
......@@ -82,6 +128,21 @@ def init_test_http_connection():
global_http_connection.reuse_client = False
@pytest.fixture
def dist_init():
temp_file = tempfile.mkstemp()[1]
init_distributed_environment(
world_size=1,
rank=0,
distributed_init_method=f"file://{temp_file}",
local_rank=0,
backend="nccl",
)
initialize_model_parallel(1, 1)
yield
cleanup()
def cleanup():
destroy_model_parallel()
destroy_distributed_environment()
......@@ -99,10 +160,7 @@ def should_do_global_cleanup_after_test(request) -> bool:
to initialize torch.
"""
if request.node.get_closest_marker("skip_global_cleanup"):
return False
return True
return not request.node.get_closest_marker("skip_global_cleanup")
@pytest.fixture(autouse=True)
......@@ -112,6 +170,12 @@ def cleanup_fixture(should_do_global_cleanup_after_test: bool):
cleanup()
@pytest.fixture(autouse=True)
def dynamo_reset():
yield
torch._dynamo.reset()
@pytest.fixture
def example_prompts() -> List[str]:
prompts = []
......@@ -120,6 +184,46 @@ def example_prompts() -> List[str]:
return prompts
class DecoderPromptType(Enum):
"""For encoder/decoder models only."""
CUSTOM = 1
NONE = 2
EMPTY_STR = 3
@pytest.fixture
def example_encoder_decoder_prompts(
) -> Dict[DecoderPromptType, List[ExplicitEncoderDecoderPrompt]]:
'''
Returns an encoder prompt list and a decoder prompt list, wherein each pair
of same-index entries in both lists corresponds to an (encoder prompt,
decoder prompt) tuple.
Returns:
* Encoder prompt list
* Decoder prompt list (reverse of encoder prompt list)
'''
encoder_prompts = []
for filename in _TEST_PROMPTS:
encoder_prompts += _read_prompts(filename)
custom_decoder_prompts = encoder_prompts[::-1]
empty_str_decoder_prompts = [""] * len(encoder_prompts)
none_decoder_prompts = [None] * len(encoder_prompts)
# NONE decoder prompt type
return {
DecoderPromptType.NONE:
zip_enc_dec_prompts(encoder_prompts, none_decoder_prompts),
DecoderPromptType.EMPTY_STR:
zip_enc_dec_prompts(encoder_prompts, empty_str_decoder_prompts),
DecoderPromptType.CUSTOM:
zip_enc_dec_prompts(encoder_prompts, custom_decoder_prompts),
}
@pytest.fixture
def example_long_prompts() -> List[str]:
prompts = []
......@@ -133,16 +237,24 @@ def image_assets() -> _ImageAssets:
return IMAGE_ASSETS
@pytest.fixture(scope="session")
def video_assets() -> _VideoAssets:
return VIDEO_ASSETS
_T = TypeVar("_T", nn.Module, torch.Tensor, BatchEncoding, BatchFeature)
class HfRunner:
def wrap_device(self, input: _T) -> _T:
if not is_cpu():
return input.to("cuda")
else:
return input.to("cpu")
def wrap_device(self, input: _T, device: Optional[str] = None) -> _T:
if device is None:
return self.wrap_device(input, "cpu" if is_cpu() else "cuda")
if hasattr(input, "device") and input.device.type == device:
return input
return input.to(device)
def __init__(
self,
......@@ -150,27 +262,25 @@ class HfRunner:
dtype: str = "half",
*,
model_kwargs: Optional[Dict[str, Any]] = None,
is_embedding_model: bool = False,
is_vision_model: bool = False,
is_sentence_transformer: bool = False,
auto_cls: Type[_BaseAutoModelClass] = AutoModelForCausalLM,
postprocess_inputs: Callable[[BatchEncoding],
BatchEncoding] = identity,
) -> None:
torch_dtype = STR_DTYPE_TO_TORCH_DTYPE[dtype]
self.model_name = model_name
if is_embedding_model:
if is_sentence_transformer:
# Lazy init required for AMD CI
from sentence_transformers import SentenceTransformer
self.model = self.wrap_device(
SentenceTransformer(
model_name,
device="cpu",
trust_remote_code=True,
).to(dtype=torch_dtype))
else:
if is_vision_model:
auto_cls = AutoModelForVision2Seq
else:
auto_cls = AutoModelForCausalLM
model_kwargs = model_kwargs if model_kwargs is not None else {}
self.model = self.wrap_device(
auto_cls.from_pretrained(
......@@ -186,31 +296,34 @@ class HfRunner:
trust_remote_code=True,
)
try:
# don't put this import at the top level
# it will call torch.cuda.device_count()
from transformers import AutoProcessor # noqa: F401
self.processor = AutoProcessor.from_pretrained(
model_name,
torch_dtype=torch_dtype,
trust_remote_code=True,
)
except Exception:
logger.warning(
"Unable to auto-load processor from HuggingFace for "
"model %s. Using tokenizer instead.", model_name)
self.processor = self.tokenizer
# don't put this import at the top level
# it will call torch.cuda.device_count()
from transformers import AutoProcessor # noqa: F401
self.processor = AutoProcessor.from_pretrained(
model_name,
torch_dtype=torch_dtype,
trust_remote_code=True,
)
def generate(
self.postprocess_inputs = postprocess_inputs
def get_inputs(
self,
prompts: List[str],
images: Optional[List[Image.Image]] = None,
**kwargs: Any,
) -> List[Tuple[List[List[int]], List[str]]]:
if images:
images: Optional[PromptImageInput] = None,
videos: Optional[PromptVideoInput] = None,
audios: Optional[PromptAudioInput] = None,
) -> List[BatchEncoding]:
if images is not None:
assert len(prompts) == len(images)
outputs: List[Tuple[List[List[int]], List[str]]] = []
if videos is not None:
assert len(prompts) == len(videos)
if audios is not None:
assert len(prompts) == len(audios)
all_inputs: List[BatchEncoding] = []
for i, prompt in enumerate(prompts):
processor_kwargs: Dict[str, Any] = {
"text": prompt,
......@@ -218,11 +331,37 @@ class HfRunner:
}
if images is not None and images[i] is not None:
processor_kwargs["images"] = images[i]
if videos is not None and videos[i] is not None:
processor_kwargs["videos"] = videos[i]
if audios is not None and audios[i] is not None:
audio, sr = audios[i]
processor_kwargs["audio"] = audio
processor_kwargs["sampling_rate"] = sr
inputs = self.processor(**processor_kwargs)
inputs = self.postprocess_inputs(inputs)
all_inputs.append(inputs)
return all_inputs
def generate(
self,
prompts: List[str],
images: Optional[PromptImageInput] = None,
videos: Optional[List[np.ndarray]] = None,
audios: Optional[PromptAudioInput] = None,
**kwargs: Any,
) -> List[Tuple[List[List[int]], List[str]]]:
all_inputs = self.get_inputs(prompts,
images=images,
videos=videos,
audios=audios)
outputs: List[Tuple[List[List[int]], List[str]]] = []
for inputs in all_inputs:
output_ids = self.model.generate(
**self.wrap_device(inputs),
**self.wrap_device(inputs, device=self.model.device.type),
use_cache=True,
**kwargs,
)
......@@ -239,13 +378,17 @@ class HfRunner:
self,
prompts: List[str],
max_tokens: int,
images: Optional[List[Image.Image]] = None,
images: Optional[PromptImageInput] = None,
videos: Optional[List[np.ndarray]] = None,
audios: Optional[PromptAudioInput] = None,
**kwargs: Any,
) -> List[Tuple[List[int], str]]:
outputs = self.generate(prompts,
do_sample=False,
max_new_tokens=max_tokens,
images=images,
videos=videos,
audios=audios,
**kwargs)
return [(output_ids[0], output_str[0])
......@@ -276,22 +419,20 @@ class HfRunner:
self,
prompts: List[str],
max_tokens: int,
images: Optional[List[Image.Image]] = None,
images: Optional[PromptImageInput] = None,
videos: Optional[List[np.ndarray]] = None,
audios: Optional[PromptAudioInput] = None,
**kwargs: Any,
) -> List[List[torch.Tensor]]:
all_logprobs: List[List[torch.Tensor]] = []
for i, prompt in enumerate(prompts):
processor_kwargs: Dict[str, Any] = {
"text": prompt,
"return_tensors": "pt",
}
if images is not None and images[i] is not None:
processor_kwargs["images"] = images[i]
inputs = self.processor(**processor_kwargs)
all_inputs = self.get_inputs(prompts,
images=images,
videos=videos,
audios=audios)
all_logprobs: List[List[torch.Tensor]] = []
for inputs in all_inputs:
output = self.model.generate(
**self.wrap_device(inputs),
**self.wrap_device(inputs, device=self.model.device.type),
use_cache=True,
do_sample=False,
max_new_tokens=max_tokens,
......@@ -299,45 +440,140 @@ class HfRunner:
return_dict_in_generate=True,
**kwargs,
)
seq_logprobs: List[torch.Tensor] = []
for hidden_states in output.hidden_states:
last_hidden_states = hidden_states[-1][0]
logits = torch.matmul(
last_hidden_states,
self.model.get_output_embeddings().weight.t(),
)
if self.model.get_output_embeddings().bias is not None:
logits += self.model.get_output_embeddings(
).bias.unsqueeze(0)
logprobs = F.log_softmax(logits, dim=-1, dtype=torch.float32)
seq_logprobs.append(logprobs)
seq_logprobs = self._hidden_states_to_seq_logprobs(
output.hidden_states)
all_logprobs.append(seq_logprobs)
return all_logprobs
def _hidden_states_to_seq_logprobs(
self,
hidden_states: Tuple[Tuple[torch.Tensor, ...], ...],
) -> List[torch.Tensor]:
output_embeddings = self.model.get_output_embeddings()
seq_logprobs: List[torch.Tensor] = []
for _, hidden_state in enumerate(hidden_states):
last_hidden_states = hidden_state[-1][0]
logits = torch.matmul(
last_hidden_states.to(output_embeddings.weight.device),
output_embeddings.weight.t(),
)
if getattr(output_embeddings, "bias", None) is not None:
logits += output_embeddings.bias.unsqueeze(0)
logprobs = F.log_softmax(logits, dim=-1, dtype=torch.float32)
seq_logprobs.append(logprobs)
return seq_logprobs
def _hidden_states_to_logprobs(
self,
hidden_states: Tuple[Tuple[torch.Tensor, ...], ...],
num_logprobs: int,
) -> Tuple[List[Dict[int, float]], int]:
seq_logprobs = self._hidden_states_to_seq_logprobs(hidden_states)
output_len = len(hidden_states)
# convert to dict
seq_logprobs_lst: List[Dict[int, float]] = []
for tok_idx, tok_logprobs in enumerate(seq_logprobs):
# drop prompt logprobs
if tok_idx == 0:
tok_logprobs = tok_logprobs[-1, :].reshape(1, -1)
topk = tok_logprobs.topk(num_logprobs)
tok_logprobs_dct = {}
for token_id, logprob in zip(topk.indices[0], topk.values[0]):
tok_logprobs_dct[token_id.item()] = logprob.item()
seq_logprobs_lst.append(tok_logprobs_dct)
return (
seq_logprobs_lst,
output_len,
)
def generate_greedy_logprobs_limit(
self,
prompts: List[str],
max_tokens: int,
num_logprobs: int,
images: Optional[List[Image.Image]] = None,
images: Optional[PromptImageInput] = None,
audios: Optional[PromptAudioInput] = None,
videos: Optional[List[np.ndarray]] = None,
**kwargs: Any,
) -> List[Tuple[List[int], str, List[Dict[int, float]]]]:
) -> List[TokensTextLogprobs]:
all_inputs = self.get_inputs(prompts,
images=images,
videos=videos,
audios=audios)
all_logprobs: List[List[Dict[int, float]]] = []
all_output_ids: List[List[int]] = []
all_output_strs: List[str] = []
for i, prompt in enumerate(prompts):
processor_kwargs: Dict[str, Any] = {
"text": prompt,
"return_tensors": "pt",
}
if images is not None and images[i] is not None:
processor_kwargs["images"] = images[i]
for inputs in all_inputs:
output = self.model.generate(
**self.wrap_device(inputs, device=self.model.device.type),
use_cache=True,
do_sample=False,
max_new_tokens=max_tokens,
output_hidden_states=True,
return_dict_in_generate=True,
**kwargs,
)
inputs = self.processor(**processor_kwargs)
(
seq_logprobs_lst,
output_len,
) = self._hidden_states_to_logprobs(output.hidden_states,
num_logprobs)
all_logprobs.append(seq_logprobs_lst)
seq_ids = output.sequences[0]
output_len = len(seq_logprobs_lst)
output_ids = seq_ids[-output_len:]
all_output_ids.append(output_ids.tolist())
all_output_strs.append(self.tokenizer.decode(output_ids))
outputs = zip(all_output_ids, all_output_strs, all_logprobs)
return [(output_ids, output_str, output_logprobs)
for output_ids, output_str, output_logprobs in outputs]
def generate_encoder_decoder_greedy_logprobs_limit(
self,
encoder_decoder_prompts: List[ExplicitEncoderDecoderPrompt[str, str]],
max_tokens: int,
num_logprobs: int,
**kwargs: Any,
) -> List[TokensTextLogprobs]:
'''
Greedy logprobs generation for vLLM encoder/decoder models
'''
all_logprobs: List[List[Dict[int, float]]] = []
all_output_ids: List[List[int]] = []
all_output_strs: List[str] = []
for (encoder_prompt,
decoder_prompt) in to_enc_dec_tuple_list(encoder_decoder_prompts):
encoder_input_ids = self.wrap_device(
self.tokenizer(encoder_prompt, return_tensors="pt").input_ids,
device=self.model.device.type,
)
if decoder_prompt is None:
decoder_input_ids = None
else:
decoder_input_ids = self.wrap_device(
self.tokenizer(decoder_prompt,
return_tensors="pt").input_ids,
device=self.model.device.type,
)
output = self.model.generate(
**self.wrap_device(inputs),
encoder_input_ids,
decoder_input_ids=decoder_input_ids,
use_cache=True,
do_sample=False,
max_new_tokens=max_tokens,
......@@ -346,37 +582,14 @@ class HfRunner:
**kwargs,
)
seq_logprobs: List[torch.Tensor] = []
for _, hidden_states in enumerate(output.hidden_states):
last_hidden_states = hidden_states[-1][0]
logits = torch.matmul(
last_hidden_states,
self.model.get_output_embeddings().weight.t(),
)
if getattr(self.model.get_output_embeddings(), "bias",
None) is not None:
logits += self.model.get_output_embeddings(
).bias.unsqueeze(0)
logprobs = F.log_softmax(logits, dim=-1, dtype=torch.float32)
seq_logprobs.append(logprobs)
# convert to dict
seq_logprobs_lst: List[Dict[int, float]] = []
for tok_idx, tok_logprobs in enumerate(seq_logprobs):
# drop prompt logprobs
if tok_idx == 0:
tok_logprobs = tok_logprobs[-1, :].reshape(1, -1)
topk = tok_logprobs.topk(num_logprobs)
tok_logprobs_dct = {}
for token_id, logprob in zip(topk.indices[0], topk.values[0]):
tok_logprobs_dct[token_id.item()] = logprob.item()
seq_logprobs_lst.append(tok_logprobs_dct)
(
seq_logprobs_lst,
output_len,
) = self._hidden_states_to_logprobs(output.decoder_hidden_states,
num_logprobs)
all_logprobs.append(seq_logprobs_lst)
seq_ids = output.sequences[0]
output_len = len(seq_logprobs_lst)
output_ids = seq_ids[-output_len:]
all_output_ids.append(output_ids.tolist())
all_output_strs.append(self.tokenizer.decode(output_ids))
......@@ -416,7 +629,7 @@ class VllmRunner:
block_size: int = 16,
enable_chunked_prefill: bool = False,
swap_space: int = 4,
enforce_eager: bool = False,
enforce_eager: Optional[bool] = False,
**kwargs,
) -> None:
self.model = LLM(
......@@ -434,20 +647,50 @@ class VllmRunner:
**kwargs,
)
def generate(
def get_inputs(
self,
prompts: List[str],
sampling_params: SamplingParams,
images: Optional[List[Image.Image]] = None,
) -> List[Tuple[List[List[int]], List[str]]]:
images: Optional[PromptImageInput] = None,
videos: Optional[PromptVideoInput] = None,
audios: Optional[PromptAudioInput] = None,
) -> List[TextPrompt]:
if images is not None:
assert len(prompts) == len(images)
if videos is not None:
assert len(prompts) == len(videos)
if audios is not None:
assert len(prompts) == len(audios)
inputs = [TextPrompt(prompt=prompt) for prompt in prompts]
if images is not None:
for i, image in enumerate(images):
inputs[i]["multi_modal_data"] = {"image": image}
if videos is not None:
for i, video in enumerate(videos):
inputs[i]["multi_modal_data"] = {"video": video}
if audios is not None:
for i, audio in enumerate(audios):
inputs[i]["multi_modal_data"] = {"audio": audio}
return inputs
def generate(
self,
prompts: List[str],
sampling_params: SamplingParams,
images: Optional[PromptImageInput] = None,
videos: Optional[PromptVideoInput] = None,
audios: Optional[PromptAudioInput] = None,
) -> List[Tuple[List[List[int]], List[str]]]:
inputs = self.get_inputs(prompts,
images=images,
videos=videos,
audios=audios)
req_outputs = self.model.generate(inputs,
sampling_params=sampling_params)
......@@ -465,41 +708,79 @@ class VllmRunner:
outputs.append((req_sample_output_ids, req_sample_output_strs))
return outputs
@staticmethod
def _final_steps_generate_w_logprobs(
req_outputs: List[RequestOutput],
) -> List[TokensTextLogprobsPromptLogprobs]:
outputs: List[TokensTextLogprobsPromptLogprobs] = []
for req_output in req_outputs:
assert len(req_output.outputs) > 0
for sample in req_output.outputs:
output_str = sample.text
output_ids = list(sample.token_ids)
output_logprobs = sample.logprobs
outputs.append((output_ids, output_str, output_logprobs,
req_output.prompt_logprobs))
return outputs
def generate_w_logprobs(
self,
prompts: List[str],
sampling_params: SamplingParams,
images: Optional[List[Image.Image]] = None,
) -> List[Tuple[List[int], str, Optional[SampleLogprobs]]]:
assert sampling_params.logprobs is not None
images: Optional[PromptImageInput] = None,
audios: Optional[PromptAudioInput] = None,
videos: Optional[PromptVideoInput] = None,
) -> Union[List[TokensTextLogprobs],
List[TokensTextLogprobsPromptLogprobs]]:
inputs = self.get_inputs(prompts,
images=images,
videos=videos,
audios=audios)
if images is not None:
assert len(prompts) == len(images)
req_outputs = self.model.generate(inputs,
sampling_params=sampling_params)
inputs = [TextPrompt(prompt=prompt) for prompt in prompts]
if images is not None:
for i, image in enumerate(images):
inputs[i]["multi_modal_data"] = {"image": image}
toks_str_logsprobs_prompt_logprobs = (
self._final_steps_generate_w_logprobs(req_outputs))
# Omit prompt logprobs if not required by sampling params
return ([x[0:-1] for x in toks_str_logsprobs_prompt_logprobs]
if sampling_params.prompt_logprobs is None else
toks_str_logsprobs_prompt_logprobs)
req_outputs = self.model.generate(inputs,
def generate_encoder_decoder_w_logprobs(
self,
encoder_decoder_prompts: List[ExplicitEncoderDecoderPrompt[str, str]],
sampling_params: SamplingParams,
) -> Union[List[TokensTextLogprobs],
List[TokensTextLogprobsPromptLogprobs]]:
'''
Logprobs generation for vLLM encoder/decoder models
'''
assert sampling_params.logprobs is not None
req_outputs = self.model.generate(encoder_decoder_prompts,
sampling_params=sampling_params)
outputs: List[Tuple[List[int], str, Optional[SampleLogprobs]]] = []
for req_output in req_outputs:
for sample in req_output.outputs:
output_str = sample.text
output_ids = sample.token_ids
output_logprobs = sample.logprobs
outputs.append((output_ids, output_str, output_logprobs))
return outputs
toks_str_logsprobs_prompt_logprobs = (
self._final_steps_generate_w_logprobs(req_outputs))
# Omit prompt logprobs if not required by sampling params
return ([x[0:-1] for x in toks_str_logsprobs_prompt_logprobs]
if sampling_params.prompt_logprobs is None else
toks_str_logsprobs_prompt_logprobs)
def generate_greedy(
self,
prompts: List[str],
max_tokens: int,
images: Optional[List[Image.Image]] = None,
images: Optional[PromptImageInput] = None,
videos: Optional[PromptVideoInput] = None,
audios: Optional[PromptAudioInput] = None,
) -> List[Tuple[List[int], str]]:
greedy_params = SamplingParams(temperature=0.0, max_tokens=max_tokens)
outputs = self.generate(prompts, greedy_params, images=images)
outputs = self.generate(prompts,
greedy_params,
images=images,
videos=videos,
audios=audios)
return [(output_ids[0], output_str[0])
for output_ids, output_str in outputs]
......@@ -508,33 +789,62 @@ class VllmRunner:
prompts: List[str],
max_tokens: int,
num_logprobs: int,
images: Optional[Union[List[Image.Image],
List[List[Image.Image]]]] = None,
num_prompt_logprobs: Optional[int] = None,
images: Optional[PromptImageInput] = None,
audios: Optional[PromptAudioInput] = None,
videos: Optional[PromptVideoInput] = None,
stop_token_ids: Optional[List[int]] = None,
) -> List[Tuple[List[int], str, Optional[SampleLogprobs]]]:
greedy_logprobs_params = SamplingParams(temperature=0.0,
max_tokens=max_tokens,
logprobs=num_logprobs,
stop_token_ids=stop_token_ids)
outputs = self.generate_w_logprobs(prompts,
greedy_logprobs_params,
images=images)
) -> Union[List[TokensTextLogprobs],
List[TokensTextLogprobsPromptLogprobs]]:
greedy_logprobs_params = SamplingParams(
temperature=0.0,
max_tokens=max_tokens,
logprobs=num_logprobs,
prompt_logprobs=num_prompt_logprobs,
stop_token_ids=stop_token_ids)
return self.generate_w_logprobs(prompts,
greedy_logprobs_params,
images=images,
audios=audios,
videos=videos)
def generate_encoder_decoder_greedy_logprobs(
self,
encoder_decoder_prompts: List[ExplicitEncoderDecoderPrompt[str, str]],
max_tokens: int,
num_logprobs: int,
num_prompt_logprobs: Optional[int] = None,
) -> Union[List[TokensTextLogprobs],
List[TokensTextLogprobsPromptLogprobs]]:
greedy_logprobs_params = SamplingParams(
temperature=0.0,
max_tokens=max_tokens,
logprobs=num_logprobs,
prompt_logprobs=(num_prompt_logprobs),
)
'''
Greedy logprobs generation for vLLM encoder/decoder models
'''
return [(output_ids, output_str, output_logprobs)
for output_ids, output_str, output_logprobs in outputs]
return self.generate_encoder_decoder_w_logprobs(
encoder_decoder_prompts, greedy_logprobs_params)
def generate_beam_search(
self,
prompts: List[str],
prompts: Union[List[str], List[List[int]]],
beam_width: int,
max_tokens: int,
) -> List[Tuple[List[List[int]], List[str]]]:
beam_search_params = SamplingParams(n=beam_width,
use_beam_search=True,
temperature=0.0,
max_tokens=max_tokens)
outputs = self.generate(prompts, beam_search_params)
return outputs
outputs = self.model.beam_search(
prompts,
BeamSearchParams(beam_width=beam_width, max_tokens=max_tokens))
returned_outputs = []
for output in outputs:
token_ids = [x.tokens for x in output.sequences]
texts = [x.text for x in output.sequences]
returned_outputs.append((token_ids, texts))
return returned_outputs
def encode(self, prompts: List[str]) -> List[List[float]]:
req_outputs = self.model.encode(prompts)
......@@ -593,3 +903,66 @@ def num_gpus_available():
in current process."""
return cuda_device_count_stateless()
temp_dir = tempfile.gettempdir()
_dummy_opt_path = os.path.join(temp_dir, "dummy_opt")
_dummy_llava_path = os.path.join(temp_dir, "dummy_llava")
_dummy_gemma2_embedding_path = os.path.join(temp_dir, "dummy_gemma2_embedding")
@pytest.fixture
def dummy_opt_path():
json_path = os.path.join(_dummy_opt_path, "config.json")
if not os.path.exists(_dummy_opt_path):
snapshot_download(repo_id="facebook/opt-125m",
local_dir=_dummy_opt_path,
ignore_patterns=[
"*.bin", "*.bin.index.json", "*.pt", "*.h5",
"*.msgpack"
])
assert os.path.exists(json_path)
with open(json_path, "r") as f:
config = json.load(f)
config["architectures"] = ["MyOPTForCausalLM"]
with open(json_path, "w") as f:
json.dump(config, f)
return _dummy_opt_path
@pytest.fixture
def dummy_llava_path():
json_path = os.path.join(_dummy_llava_path, "config.json")
if not os.path.exists(_dummy_llava_path):
snapshot_download(repo_id="llava-hf/llava-1.5-7b-hf",
local_dir=_dummy_llava_path,
ignore_patterns=[
"*.bin", "*.bin.index.json", "*.pt", "*.h5",
"*.msgpack"
])
assert os.path.exists(json_path)
with open(json_path, "r") as f:
config = json.load(f)
config["architectures"] = ["MyLlava"]
with open(json_path, "w") as f:
json.dump(config, f)
return _dummy_llava_path
@pytest.fixture
def dummy_gemma2_embedding_path():
json_path = os.path.join(_dummy_gemma2_embedding_path, "config.json")
if not os.path.exists(_dummy_gemma2_embedding_path):
snapshot_download(repo_id="BAAI/bge-multilingual-gemma2",
local_dir=_dummy_gemma2_embedding_path,
ignore_patterns=[
"*.bin", "*.bin.index.json", "*.pt", "*.h5",
"*.msgpack"
])
assert os.path.exists(json_path)
with open(json_path, "r") as f:
config = json.load(f)
config["architectures"] = ["MyGemma2Embedding"]
with open(json_path, "w") as f:
json.dump(config, f)
return _dummy_gemma2_embedding_path
......@@ -21,32 +21,32 @@ from .conftest import get_token_ids_from_llm_generator
"num_gpu_blocks_override": 5 * (64 + 1),
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{
"use_v2_block_manager": False
}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{}])
@pytest.mark.parametrize("test_llm_kwargs", [{
"use_v2_block_manager": True,
"preemption_mode": "swap"
}, {
"use_v2_block_manager": True,
"preemption_mode": "recompute"
}])
@pytest.mark.parametrize("batch_size", [10])
@pytest.mark.parametrize("seed", [1])
def test_v1_v2_greedy_equality_with_preemption(baseline_llm_generator,
test_llm_generator, batch_size):
"""Verify block manager v2 produces same outputs as block manager v1, even
when there is preemption.
def test_block_manager_with_preemption(baseline_llm_generator,
test_llm_generator, batch_size):
"""Verify block manager produces same outputs even when there is preemption.
This constructs two LLM, each with limited number of GPU blocks. The limit
is decided such that as the sequences in the batch grow, sequences must be
preempted and removed from cache.
If the output token ids are equivalent, then we have confidence that the KV
cache is not corrupted in the v2 block manager.
cache is not corrupted.
NOTE: We want a significant number of generated tokens so that any incorrect
KV mapping has time to build up error.
NOTE(Kuntai): Though we have removed block manager v1, this test is still
useful as it asserts the behavior of block manager v2 (now it is called
SelfAttnBlockSpaceManager) is the same when swapping / preemption, so we
keep this test.
"""
output_len = 1024
temperature = 0.0
......@@ -70,78 +70,9 @@ def test_v1_v2_greedy_equality_with_preemption(baseline_llm_generator,
temperature=temperature,
)
print('Getting token ids from block manager v1')
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
print('Getting token ids from block manager v2')
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
for expected_token_ids, actual_token_ids in zip(baseline_token_ids,
test_token_ids):
assert expected_token_ids == actual_token_ids
assert baseline_token_ids == test_token_ids
@pytest.mark.parametrize(
"common_llm_kwargs",
[{
# Use a small model for a fast test.
"model": "facebook/opt-125m",
# skip cuda graph creation for fast test.
"enforce_eager": True,
# Use a large block size to trigger more copy-on-writes.
"block_size": 32,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{
"use_v2_block_manager": False
}])
@pytest.mark.parametrize("test_llm_kwargs", [{
"use_v2_block_manager": True,
"preemption_mode": "swap"
}, {
"use_v2_block_manager": True,
"preemption_mode": "recompute"
}])
@pytest.mark.parametrize("batch_size", [10])
@pytest.mark.parametrize("seed", [1])
def test_v1_v2_greedy_equality_with_cow(baseline_llm_generator,
test_llm_generator, batch_size):
"""Verify beam search equality with block manager v1 and v2.
This requires copy-on-writes; if the v1 and v2 output is the same, then
we have some confidence cow is working.
"""
output_len = 128
temperature = 0.0
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
prompts = [prompt for prompt, _ in zip(cycle(prompts), range(batch_size))]
sampling_params = SamplingParams(
max_tokens=output_len,
ignore_eos=True,
temperature=temperature,
use_beam_search=True,
best_of=2,
)
print('Getting token ids from block manager v1')
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
print('Getting token ids from block manager v2')
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
......@@ -164,9 +95,6 @@ def test_v1_v2_greedy_equality_with_cow(baseline_llm_generator,
# skip cuda graph creation for fast test.
"enforce_eager": True,
# Lookahead scheduling only supported in v2 block manager.
"use_v2_block_manager": True,
}])
@pytest.mark.parametrize(
"per_test_common_llm_kwargs",
......@@ -261,32 +189,39 @@ def test_lookahead_greedy_equality_with_preemption(baseline_llm_generator,
# skip cuda graph creation for fast test.
"enforce_eager": True,
"enable_chunked_prefill": True,
"max_num_batched_tokens": 2,
"max_num_seqs": 2,
},
])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("per_test_common_llm_kwargs",
[{
"block_size": 8,
"max_num_batched_tokens": 2,
"max_num_seqs": 2,
}, {
"block_size": 8,
"max_num_batched_tokens": 3,
"max_num_seqs": 2,
}, {
"block_size": 8,
"max_num_batched_tokens": 256,
"max_num_seqs": 10,
}])
@pytest.mark.parametrize("baseline_llm_kwargs", [
{
"use_v2_block_manager": False,
},
{},
])
@pytest.mark.parametrize("test_llm_kwargs", [
{
"use_v2_block_manager": True,
"num_lookahead_slots": 0,
},
{
"use_v2_block_manager": True,
"num_lookahead_slots": 5,
},
])
@pytest.mark.parametrize("batch_size", [4])
@pytest.mark.parametrize("seed", [1])
def test_chunked_prefill_block_manager_v2(baseline_llm_generator,
test_llm_generator, batch_size):
"""Verify that chunked prefill works with BlockManagerV2, with and without
lookahead scheduling.
def test_chunked_prefill_block_manager(baseline_llm_generator,
test_llm_generator, batch_size):
"""Verify that chunked prefill works with SelfAttnBlockSpaceManager,
with and without lookahead scheduling.
"""
output_len = 32
temperature = 0.0
......@@ -294,6 +229,7 @@ def test_chunked_prefill_block_manager_v2(baseline_llm_generator,
prompts = [
"Hello, my name is",
"The president of the United States is",
("1 + " * 50) + " 1 = ", # Longer prompt.
"The capital of France is",
"The future of AI is",
]
......@@ -306,11 +242,11 @@ def test_chunked_prefill_block_manager_v2(baseline_llm_generator,
temperature=temperature,
)
print('Getting token ids with BlockManagerV1')
print('Getting token ids with BlockManager')
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
print('Getting token ids with BlockManagerV2')
print('Getting token ids with BlockManager, with lookahead slots.')
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
......@@ -338,32 +274,32 @@ def test_chunked_prefill_block_manager_v2(baseline_llm_generator,
"enable_prefix_caching": True,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{
"use_v2_block_manager": False
}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{}])
@pytest.mark.parametrize("test_llm_kwargs", [{
"use_v2_block_manager": True,
"preemption_mode": "swap"
}, {
"use_v2_block_manager": True,
"preemption_mode": "recompute"
}])
@pytest.mark.parametrize("batch_size", [10])
@pytest.mark.parametrize("seed", [1])
def test_v1_v2_greedy_equality_prefix_caching_enabled_with_preemption(
def test_block_manager_prefix_caching_enabled_with_preemption(
baseline_llm_generator, test_llm_generator, batch_size):
"""Verify block manager v2 produces same outputs as block manager v1, even
when there is preemption.
"""Verify block manager produces same outputs even when there is preemption.
This constructs two LLM, each with limited number of GPU blocks. The limit
is decided such that as the sequences in the batch grow, sequences must be
preempted and removed from cache.
If the output token ids are equivalent, then we have confidence that the KV
cache is not corrupted in the v2 block manager.
cache is not corrupted.
NOTE: We want a significant number of generated tokens so that any incorrect
KV mapping has time to build up error.
NOTE(Kuntai): Though we have removed block manager v1, this test is still
useful as it asserts the behavior of block manager v2 (now it is called
SelfAttnBlockSpaceManager) is the same when swapping / preemption, so we
keep this test.
"""
output_len = 1024
temperature = 0.0
......@@ -387,11 +323,11 @@ def test_v1_v2_greedy_equality_prefix_caching_enabled_with_preemption(
temperature=temperature,
)
print('Getting token ids from block manager v1')
print('Getting token ids from block manager')
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
print('Getting token ids from block manager v2')
print('Getting token ids from block manager, with preemption')
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
......@@ -414,9 +350,6 @@ def test_v1_v2_greedy_equality_prefix_caching_enabled_with_preemption(
# Allow only 5 sequences of ~1024 tokens in worst case.
"block_size": 16,
"num_gpu_blocks_override": 5 * (64 + 1),
# Test APC in v2 block
"use_v2_block_manager": True,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{
......@@ -492,9 +425,6 @@ def test_auto_prefix_caching_with_preemption(baseline_llm_generator,
"max_model_len": 48,
"block_size": 16,
"num_gpu_blocks_override": 3,
# Test APC in v2 block
"use_v2_block_manager": True,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{
......
......@@ -24,10 +24,8 @@ BLOCK_SIZE = 16
"num_gpu_blocks_override": 100000 // BLOCK_SIZE,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{
"use_v2_block_manager": False
}])
@pytest.mark.parametrize("test_llm_kwargs", [{"use_v2_block_manager": True}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{}])
@pytest.mark.parametrize("test_llm_kwargs", [{}])
@pytest.mark.parametrize("batch_size", [5])
@pytest.mark.parametrize("seed", [1])
def test_sliding_window_retrival(baseline_llm_generator, test_llm_generator,
......@@ -48,7 +46,6 @@ def test_sliding_window_retrival(baseline_llm_generator, test_llm_generator,
prompts, answer, indices = prep_prompts(batch_size)
print('Getting token ids from block manager v1')
baseline_texts = get_text_from_llm_generator(baseline_llm_generator,
prompts,
sampling_params,
......@@ -84,10 +81,7 @@ def test_sliding_window_retrival(baseline_llm_generator, test_llm_generator,
"num_gpu_blocks_override": 100000 // BLOCK_SIZE,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("test_llm_kwargs", [{
"use_v2_block_manager": True,
"enable_chunked_prefill": True
}])
@pytest.mark.parametrize("test_llm_kwargs", [{"enable_chunked_prefill": True}])
@pytest.mark.parametrize("batch_size", [5])
@pytest.mark.parametrize("seed", [1])
def test_sliding_window_chunked_prefill(test_llm_generator, batch_size, seed):
......
......@@ -2,7 +2,7 @@ import pytest
from vllm.core.block.utils import (STR_NOT_IMPL_ENC_DEC_PREFIX_CACHE,
STR_NOT_IMPL_ENC_DEC_SWA)
from vllm.core.block_manager_v2 import BlockSpaceManagerV2
from vllm.core.block_manager import SelfAttnBlockSpaceManager
from vllm.core.interfaces import AllocStatus
from vllm.sequence import Logprob, SequenceStatus
from vllm.utils import chunk_list
......@@ -17,7 +17,7 @@ from ..utils import (create_dummy_prompt, create_seq_group,
@pytest.mark.parametrize("watermark", [0.0, 0.5])
def test_can_allocate_seq_group(block_size: int, num_seqs_per_group: int,
num_gpu_blocks: int, watermark: float):
block_manager = BlockSpaceManagerV2(
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
......@@ -63,7 +63,7 @@ def test_can_allocate_seq_group_encoder_decoder(block_size: int,
num_seqs_per_group: int,
num_gpu_blocks: int,
watermark: float):
block_manager = BlockSpaceManagerV2(
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
......@@ -117,16 +117,16 @@ def test_can_allocate_encoder_decoder_fails_with_swa(block_size: int,
'''
SWA short for Sliding Window Attention.
At time of writing block manager v2 does not support SWA.
At time of writing block manager does not support SWA.
However even when SWA is implemented for block manager v2,
However even when SWA is implemented for block manager,
there will still most likely be a separate workstream required
to enable SWA for encoder/decoder models.
Therefore this test enforces that one of the following cases
hold true:
1. Block manager v2 does not support SWA at all (true at time of writing)
2. Block manager v2 fails with NotImplementError when SWA is enabled
1. Block manager does not support SWA at all (true at time of writing)
2. Block manager fails with NotImplementError when SWA is enabled
AND a SequenceGroup with an encoder sequence (i.e. in support of an
encoder/decoder model) is passed into can_allocate() as an argument
......@@ -135,7 +135,7 @@ def test_can_allocate_encoder_decoder_fails_with_swa(block_size: int,
'''
with pytest.raises((NotImplementedError, AssertionError)) as exc_info:
block_manager = BlockSpaceManagerV2(
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
......@@ -158,7 +158,7 @@ def test_can_allocate_encoder_decoder_fails_with_swa(block_size: int,
block_manager.can_allocate(seq_group)
# Assert that either
# 1. Block manager v2 constructor fails with assertion that sliding window
# 1. Block manager constructor fails with assertion that sliding window
# is not yet supported (most likely near-term outcome at time of
# writing), or
# 2. can_allocate() fails with NotImplementedError due to combination of
......@@ -177,7 +177,7 @@ def test_can_allocate_encoder_decoder_fails_with_prefix_cache(
block_size: int, num_seqs_per_group: int, num_gpu_blocks: int,
watermark: float):
block_manager = BlockSpaceManagerV2(
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
......@@ -217,7 +217,7 @@ def test_append_slots(block_size, prompt_len, num_slots_to_append,
num_gpu_blocks = 1024
watermark = 0.1
block_manager = BlockSpaceManagerV2(
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
......@@ -269,14 +269,15 @@ def test_swap(block_size, num_cpu_blocks, num_gpu_blocks, num_lookahead_slots,
"""Verify blocks number on src/desc device is correct after swapping in/out
sequence group (not missing or extra blocks).
"""
block_manager = BlockSpaceManagerV2(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0,
enable_caching=enable_caching)
block_manager = SelfAttnBlockSpaceManager(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0,
enable_caching=enable_caching)
prompt, seq_group = create_dummy_prompt("1", prompt_length=block_size - 1)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
# Emulate a forward pass by appending a single token.
# The block manager then knows how many unprocessed
# tokens will be written in the next forward pass.
......@@ -311,6 +312,114 @@ def test_swap(block_size, num_cpu_blocks, num_gpu_blocks, num_lookahead_slots,
assert before_gpu_blocks == after_gpu_blocks + len(cpu_blocks)
@pytest.mark.parametrize("block_size", [8])
@pytest.mark.parametrize("num_gpu_blocks", [4])
@pytest.mark.parametrize("num_lookahead_slots", [3, 8, 10])
@pytest.mark.parametrize("enable_caching", [True, False])
def test_can_swap(block_size, num_gpu_blocks, num_lookahead_slots,
enable_caching):
""" Verify the block manager can correctly determine if a sequence group
can be swapped in/out.
"""
num_cpu_blocks = num_gpu_blocks
block_manager = SelfAttnBlockSpaceManager(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0,
enable_caching=enable_caching)
prompt, seq_group = create_dummy_prompt(
"1", prompt_length=(num_gpu_blocks - 1) * block_size - 1)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
prompt.status = SequenceStatus.RUNNING
# Swap seq group from GPU -> CPU.
gpu_blocks = block_manager.get_block_table(prompt)
assert block_manager.can_swap_out(seq_group)
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_out(seq_group)
mapping_keys = [key for key, _ in mapping]
assert mapping_keys == gpu_blocks
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks)
assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks
prompt.status = SequenceStatus.SWAPPED
# At this moment, we still have enough free blocks to swap in the seq group.
if num_lookahead_slots <= block_size:
assert block_manager.can_swap_in(seq_group,
num_lookahead_slots) == AllocStatus.OK
else:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.NEVER
# During Swapped out, 2 cached blocks were evicted from the GPU,
# so the prompt1 can't be swapped in
prompt2_len = 2 * block_size - 1
prompt2, seq_group2 = create_dummy_prompt(
"2",
prompt_length=prompt2_len,
prompt_tokens=[10000 + i for i in range(prompt2_len)])
prompt2.status = SequenceStatus.WAITING
block_manager.allocate(seq_group2)
# Swap seq group from CPU -> GPU.
if num_lookahead_slots <= block_size:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.LATER
else:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.NEVER
@pytest.mark.parametrize("num_lookahead_slots", [0, 2, 10])
@pytest.mark.parametrize("enable_caching", [False, True])
def test_swap_in_infeasible(num_lookahead_slots, enable_caching):
"""Verifies that swapping fails if there is not enough free blocks
to account for unseen tokens and lookahead_slots.
"""
block_size = 8
num_cpu_blocks = 1
num_gpu_blocks = 1
block_manager = SelfAttnBlockSpaceManager(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0,
enable_caching=enable_caching)
prompt_length = block_size - 3
assert prompt_length > 0
prompt, seq_group = create_dummy_prompt("1", prompt_length=prompt_length)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
# Emulate a forward pass by appending a single token.
# The block manager then knows how many unprocessed
# tokens will be written in the next forward pass.
token_id = 0
prompt.status = SequenceStatus.RUNNING
prompt.append_token_id(token_id, {token_id: Logprob(0.0)})
# Swap seq group from GPU -> CPU.
assert block_manager.can_swap_out(seq_group)
block_manager.swap_out(seq_group)
prompt.status = SequenceStatus.SWAPPED
# Swap seq group from CPU -> GPU.
# The number of unseen tokens is 1. If the number of existing
# tokens plus the unseen ones and number of lookahead slots exceeds
# the total number of available GPU blocks then the swap
# should fail.
num_unseen_tokens = 1
if (num_lookahead_slots + num_unseen_tokens +
prompt_length) <= (block_size * num_gpu_blocks):
assert block_manager.can_swap_in(seq_group,
num_lookahead_slots) == AllocStatus.OK
else:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.NEVER
# TODO(cade/kaiyang): add comprehensive tests for swapping at allocator level.
......@@ -326,7 +435,7 @@ def test_sliding_window(block_size, prompt_len, num_slots_to_append,
num_gpu_blocks = 1024
watermark = 0.1
block_manager = BlockSpaceManagerV2(
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
......@@ -338,7 +447,6 @@ def test_sliding_window(block_size, prompt_len, num_slots_to_append,
if max_n is None:
max_n = min_n
used = num_gpu_blocks - block_manager.get_num_free_gpu_blocks()
#print("check", min_n, used, max_n)
assert min_n <= used
assert used <= max_n
......@@ -367,7 +475,7 @@ def test_sliding_window(block_size, prompt_len, num_slots_to_append,
seq.data.update_num_computed_tokens(prompt_len)
check_used(num_blocks(prompt_len))
# this is how we compute it in BlockSpaceManagerV2.__init__
# this is how we compute it in SelfAttnBlockSpaceManager.__init__
sliding_blocks = (sliding_window // block_size) + 2
# plus one block for null block
sliding_blocks += 1
......
......@@ -100,3 +100,46 @@ class TestNaiveBlockAllocator:
for i, block in enumerate(blocks):
assert allocator.get_num_free_blocks() == i
allocator.free(block)
@staticmethod
@pytest.mark.parametrize("num_blocks", [4])
@pytest.mark.parametrize("block_size", [8])
def test_naive_block_get_num_full_blocks_touched(num_blocks, block_size):
""" Verify the allocator can correctly return the number of
full blocks touched.
"""
allocator_src = NaiveBlockAllocator(create_block=NaiveBlock,
num_blocks=num_blocks,
block_size=block_size)
allocator_dst = NaiveBlockAllocator(create_block=NaiveBlock,
num_blocks=num_blocks,
block_size=block_size)
# Create a chain of cacheable blocks in the dst
allocate_block = TestNaiveBlockAllocator.create_allocate_lambda(
"immutable",
allocator_src,
prev_block=None,
token_ids=list(range(block_size)))
src_blocks = [allocate_block() for _ in range(num_blocks - 1)]
# All blocks are cached
assert allocator_dst.get_num_full_blocks_touched(
src_blocks) == num_blocks - 1
# Insert one non-full block in the src
allocate_non_full_block = \
TestNaiveBlockAllocator.create_allocate_lambda(
"mutable", allocator_src,
prev_block=src_blocks[-1],token_ids=[]
)
src_blocks.append(allocate_non_full_block())
src_blocks[-1].append_token_ids([0])
assert allocator_dst.get_num_full_blocks_touched(
src_blocks) == num_blocks - 1
# Fill up the last source block and then invoke
# get_num_blocks_touched
src_blocks[-1].append_token_ids([0] * (block_size - 1))
assert allocator_dst.get_num_full_blocks_touched(
src_blocks) == num_blocks
......@@ -315,6 +315,61 @@ class TestPrefixCachingBlockAllocator:
i)
allocator.free(block)
@staticmethod
@pytest.mark.parametrize("num_blocks", [4])
@pytest.mark.parametrize("block_size", [8])
def test_prefix_caching_block_get_num_full_blocks_touched(
num_blocks, block_size):
""" Verify the allocator can correctly return the number of
blocks touched, when there are cached prefixes.
"""
allocator_src = PrefixCachingBlockAllocator(num_blocks=num_blocks,
block_size=block_size)
allocator_dst = PrefixCachingBlockAllocator(num_blocks=num_blocks,
block_size=block_size)
# Create token ids that will exhaust all blocks except the last
token_ids = list(range((num_blocks - 1) * block_size))
# Create a chain of cacheable blocks in the dst
cached_blocks = TestPrefixCachingBlockAllocator.create_immutable_chain(
block_size=block_size,
token_ids=token_ids,
allocator=allocator_dst,
)
# Create a chain of the same blocks in the src
blocks_to_swap_in = \
TestPrefixCachingBlockAllocator.create_immutable_chain(
block_size=block_size,
token_ids=token_ids,
allocator=allocator_src,
)
# All blocks are cached
assert allocator_dst.get_num_full_blocks_touched(
blocks_to_swap_in) == 0
# Free the first block in the dst
allocator_dst.free(cached_blocks[0])
# Now the first block becomes dangling, the swapped blocks need
# to reclaim the first block in the dst
assert allocator_dst.get_num_full_blocks_touched(
blocks_to_swap_in) == 1
# Insert one non-full block in the src
non_full_block = allocator_src.allocate_mutable_block(
blocks_to_swap_in[-1])
non_full_block.append_token_ids([0])
blocks_to_swap_in.append(non_full_block)
assert allocator_dst.get_num_full_blocks_touched(
blocks_to_swap_in) == 1
# Fill up the last mutable block and invoke get_num_blocks_touched.
# Note: The last block is not cached so it will be touched.
non_full_block.append_token_ids([0] * (block_size - 1))
assert allocator_dst.get_num_full_blocks_touched(
blocks_to_swap_in) == 2
@staticmethod
@pytest.mark.parametrize("num_blocks", [1024])
@pytest.mark.parametrize("block_size", [16])
......@@ -628,6 +683,63 @@ class TestPrefixCachingBlockAllocator:
assert new_block[0].block_id == last_block_id
# Test case for cache mertics
@staticmethod
def test_metric():
block_size = 16
allocator = PrefixCachingBlockAllocator(num_blocks=4,
block_size=block_size)
# Test when no query (0/0)
assert allocator.get_prefix_cache_hit_rate() == 0.0
token_ids = list(range(block_size))
allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids)
# Test 0/1 hit rate
assert allocator.get_prefix_cache_hit_rate() == 0.0
allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids)
# Test 1/2 hit rate
assert allocator.get_prefix_cache_hit_rate() == 0.5
# Test more than one block
for _ in range(2, 1005):
allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids)
assert allocator.get_prefix_cache_hit_rate() > 0.99
# Test case for marking cache hit blocks as computed right after
# a batch of prefill sequences are scheduled.
@staticmethod
def test_touch_block():
block_size = 16
common_blocks = 4
allocator = PrefixCachingBlockAllocator(num_blocks=8,
block_size=block_size)
common_token_ids = list(range(block_size * common_blocks))
# Mimic the behavior of allocating the same block chain
# (i.e., common prefix) for a batch of 3 different prefill sequences.
for _ in range(3):
blocks = TestPrefixCachingBlockAllocator.create_immutable_chain(
block_size=block_size,
token_ids=common_token_ids,
allocator=allocator,
)
block_ids = [block.block_id for block in blocks]
# The allocated blocks should be marked as touched
# but not computed.
computed_block_ids = allocator.get_computed_block_ids(
[], block_ids, skip_last_block_id=False)
assert len(computed_block_ids) == 0
allocator.mark_blocks_as_computed([])
computed_block_ids = allocator.get_computed_block_ids(
[], block_ids, skip_last_block_id=False)
assert len(computed_block_ids) == common_blocks
@staticmethod
def create_immutable_chain(
block_size: int,
......
import time
from collections import defaultdict
from typing import List
import pytest
from vllm import SamplingParams
from vllm.block import PhysicalTokenBlock
from vllm.core.block.utils import (STR_NOT_IMPL_ENC_DEC_PREFIX_CACHE,
STR_NOT_IMPL_ENC_DEC_SWA)
from vllm.core.block_manager_v1 import (BlockSpaceManagerV1,
UncachedBlockAllocator)
from vllm.core.interfaces import AllocStatus
from vllm.sequence import Logprob, Sequence, SequenceGroup, SequenceStatus
from vllm.utils import Device
from .utils import create_dummy_prompt, create_dummy_prompt_encoder_decoder
def test_block_allocator_allocate():
block_size = 4
num_cpu_blocks = 4
cpu_allocator = UncachedBlockAllocator(Device.CPU, block_size,
num_cpu_blocks)
# Allocate all available cpu blocks.
num_free = num_cpu_blocks
assert cpu_allocator.get_num_free_blocks() == num_free
for _ in range(num_cpu_blocks):
block = cpu_allocator.allocate()
num_free -= 1
assert block not in cpu_allocator.free_blocks
assert cpu_allocator.get_num_free_blocks() == num_free
with pytest.raises(ValueError):
cpu_allocator.allocate()
def test_block_allocator_free():
block_size = 4
num_cpu_blocks = 4
cpu_allocator = UncachedBlockAllocator(Device.CPU, block_size,
num_cpu_blocks)
# Allocate all available cpu blocks.
blocks: List[PhysicalTokenBlock] = []
for _ in range(num_cpu_blocks):
block = cpu_allocator.allocate()
blocks.append(block)
assert block not in cpu_allocator.free_blocks
# Free all allocated cpu blocks.
num_free = 0
assert cpu_allocator.get_num_free_blocks() == num_free
for block in blocks:
cpu_allocator.free(block)
num_free += 1
assert block in cpu_allocator.free_blocks
assert cpu_allocator.get_num_free_blocks() == num_free
with pytest.raises(ValueError):
cpu_allocator.free(block)
def test_allocate():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
# Allocate same sequence group to all available gpu blocks.
for i in range(num_gpu_blocks):
_, seq_group = create_dummy_prompt(str(i), block_size)
assert block_manager.can_allocate(seq_group) == AllocStatus.OK
block_manager.allocate(seq_group)
assert block_manager.can_allocate(seq_group) != AllocStatus.OK
# Allocate same sequence group to all available gpu blocks.
# Use watermark to reserve one gpu block.
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=1 / num_gpu_blocks)
for i in range(num_gpu_blocks - 1):
_, seq_group = create_dummy_prompt(str(i), block_size)
assert block_manager.can_allocate(seq_group) == AllocStatus.OK
block_manager.allocate(seq_group)
assert block_manager.can_allocate(seq_group) != AllocStatus.OK
def test_allocate_encoder_decoder():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_req_per_seq_group = 2
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
# Allocate same sequence group to all available gpu blocks.
for i in range(num_gpu_blocks // block_req_per_seq_group):
_, _, seq_group = create_dummy_prompt_encoder_decoder(
str(i),
decoder_prompt_length=block_size,
encoder_prompt_length=block_size)
assert block_manager.can_allocate(seq_group) == AllocStatus.OK
block_manager.allocate(seq_group)
assert block_manager.can_allocate(seq_group) != AllocStatus.OK
# Allocate same sequence group to all available gpu blocks.
# Use watermark to reserve one gpu block.
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=1 / num_gpu_blocks)
for i in range((num_gpu_blocks - 1) // block_req_per_seq_group):
_, _, seq_group = create_dummy_prompt_encoder_decoder(
str(i),
decoder_prompt_length=block_size,
encoder_prompt_length=block_size)
assert block_manager.can_allocate(seq_group) == AllocStatus.OK
block_manager.allocate(seq_group)
assert block_manager.can_allocate(seq_group) != AllocStatus.OK
def test_allocate_encoder_decoder_fails_with_swa():
# SWA short for sliding window attention
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0,
sliding_window=5) # swa
# Allocate same sequence group to all available gpu blocks.
_, _, seq_group = create_dummy_prompt_encoder_decoder(
"0",
decoder_prompt_length=block_size,
encoder_prompt_length=block_size)
# Assert that can_allocate() fails due to SWA
with pytest.raises(NotImplementedError) as exc_info:
block_manager.can_allocate(seq_group)
assert str(exc_info.value) == STR_NOT_IMPL_ENC_DEC_SWA
# Assert that allocate() fails due to SWA
with pytest.raises(NotImplementedError) as exc_info:
block_manager.allocate(seq_group)
assert str(exc_info.value) == STR_NOT_IMPL_ENC_DEC_SWA
def test_allocate_encoder_decoder_fails_with_prefix_caching():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0,
enable_caching=True) # Prefix cache
# Allocate same sequence group to all available gpu blocks.
_, _, seq_group = create_dummy_prompt_encoder_decoder(
"0",
decoder_prompt_length=block_size,
encoder_prompt_length=block_size)
# Assert that can_allocate() fails due to prefix caching
with pytest.raises(NotImplementedError) as exc_info:
block_manager.can_allocate(seq_group)
assert str(exc_info.value) == STR_NOT_IMPL_ENC_DEC_PREFIX_CACHE
# Assert that allocate() fails due to prefix caching
with pytest.raises(NotImplementedError) as exc_info:
block_manager.allocate(seq_group)
assert str(exc_info.value) == STR_NOT_IMPL_ENC_DEC_PREFIX_CACHE
def test_append_slot_single_seq():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
# Allocate single seq to gpu block.
prompt, seq_group = create_dummy_prompt("1", block_size)
block_manager.allocate(seq_group)
# Nothing to append. Sequence has no new logical blocks.
assert block_manager.can_append_slots(seq_group)
before_blocks = block_manager.get_num_free_gpu_blocks()
assert not block_manager.append_slots(prompt)
after_blocks = block_manager.get_num_free_gpu_blocks()
assert before_blocks == after_blocks
# Add block_size number of new tokens and append slot.
for i in range(block_size):
token_id = i + 5
prompt.append_token_id(token_id, {token_id: Logprob(0.0)})
assert block_manager.can_append_slots(seq_group)
before_blocks = block_manager.get_num_free_gpu_blocks()
assert not block_manager.append_slots(prompt)
after_blocks = block_manager.get_num_free_gpu_blocks()
assert before_blocks - after_blocks == 1
def test_append_slot_cow():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size=block_size,
num_cpu_blocks=num_cpu_blocks,
num_gpu_blocks=num_gpu_blocks,
watermark=0)
# Allocate prompt to gpu block. There is one slot left in the block.
prompt = Sequence(seq_id=1,
inputs={
"prompt": "one two three",
"prompt_token_ids": [1, 2, 3],
},
block_size=block_size)
# Fork the sequence, such that a COW will be required when we append a new
# token id.
child = prompt.fork(new_seq_id=2)
# Allocate space for the sequence group.
seq_group = SequenceGroup(request_id="1",
seqs=[prompt, child],
arrival_time=time.time(),
sampling_params=SamplingParams())
block_manager.allocate(seq_group)
# Fork and append a new token id. We expect a COW to be scheduled.
token_id = 4
child.append_token_id(token_id, {token_id: Logprob(0.0)})
block_manager.fork(prompt, child)
assert block_manager.can_append_slots(seq_group)
before_blocks = block_manager.get_num_free_gpu_blocks()
cows = block_manager.append_slots(child)
assert cows
dict_cows = defaultdict(list)
for src_block, dst_block in cows:
dict_cows[src_block].append(dst_block)
for src_block, dst_blocks in dict_cows.items():
assert src_block not in dst_blocks
after_blocks = block_manager.get_num_free_gpu_blocks()
assert before_blocks - after_blocks == 1
def test_fork():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
prompt, seq_group = create_dummy_prompt("1",
block_size - 1,
block_size=block_size)
block_manager.allocate(seq_group)
# Fork prompt and copy block tables.
child = prompt.fork(2)
block_manager.fork(prompt, child)
assert block_manager.get_block_table(
prompt) == block_manager.get_block_table(child)
token_id = 4
# Append token to child. Block is shared so copy on write occurs.
child.append_token_id(token_id, {token_id: Logprob(0.0)})
block_manager.append_slots(child)
assert block_manager.get_block_table(
prompt) != block_manager.get_block_table(child)
def test_swap():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
prompt, seq_group = create_dummy_prompt("1", prompt_length=block_size - 1)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
# Emulate a forward pass by appending a single token.
# The block manager then knows how many unprocessed
# tokens will be written in the next forward pass.
token_id = 0
prompt.status = SequenceStatus.RUNNING
prompt.append_token_id(token_id, {token_id: Logprob(0.0)})
# Swap seq group from GPU -> CPU.
gpu_blocks = block_manager.get_block_table(prompt)
assert block_manager.can_swap_out(seq_group)
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_out(seq_group)
assert [x[0] for x in mapping] == gpu_blocks
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks)
assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks
prompt.status = SequenceStatus.SWAPPED
# Swap seq group from CPU -> GPU.
cpu_blocks = block_manager.get_block_table(prompt)
assert block_manager.can_swap_in(seq_group) == AllocStatus.OK
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_in(seq_group)
assert [x[0] for x in mapping] == cpu_blocks
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_cpu_blocks + len(cpu_blocks) == after_cpu_blocks
assert before_gpu_blocks == after_gpu_blocks + len(cpu_blocks)
def test_swap_encoder_decoder():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
decoder_prompt, encoder_prompt, seq_group = \
create_dummy_prompt_encoder_decoder(
"1",
decoder_prompt_length=block_size,
encoder_prompt_length=block_size)
decoder_prompt.status = SequenceStatus.WAITING
encoder_prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
# Emulate a forward pass by appending a single token.
# The block manager then knows how many unprocessed
# tokens will be written in the next forward pass.
token_id = 0
decoder_prompt.status = SequenceStatus.RUNNING
decoder_prompt.append_token_id(token_id, {token_id: Logprob(0.0)})
# Swap encoder/decoder seq group from GPU -> CPU.
decoder_gpu_blocks = block_manager.get_block_table(decoder_prompt)
cross_gpu_blocks = block_manager.get_cross_block_table(seq_group)
gpu_blocks = decoder_gpu_blocks + cross_gpu_blocks
assert block_manager.can_swap_out(seq_group)
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_out(seq_group)
assert [x[0] for x in mapping] == gpu_blocks
#assert list(mapping.keys()) == gpu_blocks
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks)
assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks
decoder_prompt.status = SequenceStatus.SWAPPED
# Swap encoder/decoder seq group from CPU -> GPU.
decoder_cpu_blocks = block_manager.get_block_table(decoder_prompt)
cross_cpu_blocks = block_manager.get_cross_block_table(seq_group)
cpu_blocks = decoder_cpu_blocks + cross_cpu_blocks
assert block_manager.can_swap_in(seq_group) == AllocStatus.OK
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_in(seq_group)
assert [x[0] for x in mapping] == cpu_blocks
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_cpu_blocks + len(cpu_blocks) == after_cpu_blocks
assert before_gpu_blocks == after_gpu_blocks + len(cpu_blocks)
def test_free():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
prompt, seq_group = create_dummy_prompt("1", block_size)
block_manager.allocate(seq_group)
# Free allocated seq.
prompt_blocks = len(block_manager.get_block_table(prompt))
before_blocks = block_manager.get_num_free_gpu_blocks()
block_manager.free(prompt)
after_blocks = block_manager.get_num_free_gpu_blocks()
assert after_blocks == before_blocks + prompt_blocks
# Block table for freed seq is deleted.
with pytest.raises(KeyError):
block_manager.get_block_table(prompt)
def test_free_encoder_decoder():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
decoder_prompt, encoder_prompt, seq_group = \
create_dummy_prompt_encoder_decoder(
"1",
decoder_prompt_length=block_size,
encoder_prompt_length=block_size)
block_manager.allocate(seq_group)
# Free allocated seq.
decoder_prompt_blocks = len(block_manager.get_block_table(decoder_prompt))
encoder_prompt_blocks = len(block_manager.get_cross_block_table(seq_group))
prompt_blocks = decoder_prompt_blocks + encoder_prompt_blocks
before_blocks = block_manager.get_num_free_gpu_blocks()
block_manager.free(decoder_prompt)
block_manager.free_cross(seq_group)
after_blocks = block_manager.get_num_free_gpu_blocks()
assert after_blocks == before_blocks + prompt_blocks
# Block table for freed encoder & decoder seq's are deleted.
with pytest.raises(KeyError):
block_manager.get_block_table(decoder_prompt)
# Block table for freed encoder & decoder seq's are deleted.
with pytest.raises(KeyError):
block_manager.get_block_table(encoder_prompt)
def test_reset():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
# Allocate same seq group on all available gpu blocks.
original_blocks = block_manager.get_num_free_gpu_blocks()
for i in range(num_gpu_blocks):
_, seq_group = create_dummy_prompt(str(i), block_size)
block_manager.allocate(seq_group)
assert block_manager.get_num_free_gpu_blocks() == 0
# Resetting block manager frees all allocated blocks.
block_manager.reset()
assert block_manager.get_num_free_gpu_blocks() == original_blocks
def test_reset_encoder_decoder():
block_size = 4
num_cpu_blocks = 4
num_gpu_blocks = 4
block_req_per_seq_group = 2
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0)
# Allocate same seq group on all available gpu blocks.
original_blocks = block_manager.get_num_free_gpu_blocks()
for i in range(num_gpu_blocks // block_req_per_seq_group):
_, _, seq_group = create_dummy_prompt_encoder_decoder(
f"{i}",
decoder_prompt_length=block_size,
encoder_prompt_length=block_size)
block_manager.allocate(seq_group)
assert block_manager.get_num_free_gpu_blocks() == 0
# Resetting block manager frees all allocated blocks.
block_manager.reset()
assert block_manager.get_num_free_gpu_blocks() == original_blocks
def test_sliding_window_multi_seq():
"""
Tests that memory allocation and deallocation is handled
correctly with multiple sequences that exceed the sliding
window's capacity.
"""
block_size = 1
num_cpu_blocks = 8
num_gpu_blocks = 8
sliding_window = 2
block_manager = BlockSpaceManagerV1(block_size,
num_cpu_blocks,
num_gpu_blocks,
sliding_window=sliding_window,
watermark=0)
assert block_manager.get_num_free_gpu_blocks() == num_gpu_blocks
parent = Sequence(seq_id=1,
inputs={
"prompt": "one two three",
"prompt_token_ids": [0, 1, 2],
},
block_size=block_size)
seq_group = SequenceGroup(request_id="1",
seqs=[parent],
arrival_time=time.time(),
sampling_params=SamplingParams(),
lora_request=None)
block_manager.allocate(seq_group)
# assert the number of blocks allocated is correct
# the parent seq has len 3, but since sliding_window is 2,
# we will use at most 2 blocks
assert block_manager.get_num_free_gpu_blocks(
) == num_gpu_blocks - sliding_window
# Fork prompt and copy block tables.
child = parent.fork(2)
block_manager.fork(parent, child)
# assert the number of blocks allocated is correct
# forking does not increase memory consumption
assert block_manager.get_num_free_gpu_blocks(
) == num_gpu_blocks - sliding_window
# assert both parent and child share all blocks
assert block_manager.get_block_table(
parent) == block_manager.get_block_table(child)
token_id = 4
# Append token to child. Block is shared so copy on write occurs.
child.append_token_id(token_id, {token_id: Logprob(0.0)})
block_manager.append_slots(child)
# assert the number of blocks allocated is correct
# we will use now one block more. Each seq will use 2 blocks,
# but only one can be shared
assert block_manager.get_num_free_gpu_blocks(
) == num_gpu_blocks - sliding_window - 1
token_id = 5
parent.append_token_id(token_id, {token_id: Logprob(0.0)})
block_manager.append_slots(parent)
# assert the number of blocks allocated is correct
# no change, because both sequences are still just sharing one block
assert block_manager.get_num_free_gpu_blocks(
) == num_gpu_blocks - sliding_window - 1
block_table_parent = block_manager.get_block_table(parent)
block_table_child = block_manager.get_block_table(child)
assert block_table_parent != block_table_child
# assert both blocks are sharing the second-last block
assert block_table_parent[-2] == block_table_child[-2]
# now let's clean up...
block_manager.free(parent)
# assert the number of blocks allocated is correct
# We have freed one seq, reducing the ref count of two blocks by one.
# One of the two was only used by the parent seq, so this is now free.
# The child seq still consumes sliding_window blocks
assert block_manager.get_num_free_gpu_blocks(
) == num_gpu_blocks - sliding_window
# free all blocks
block_manager.free(child)
# assert all blocks are free now
assert block_manager.get_num_free_gpu_blocks() == num_gpu_blocks
......@@ -21,7 +21,7 @@ def append_new_token(seq_group, token_id: int):
def schedule_and_update_computed_tokens(scheduler):
metas, out = scheduler.schedule()
metas, out, _ = scheduler.schedule()
for s, meta in zip(out.scheduled_seq_groups, metas):
s.seq_group.update_num_computed_tokens(meta.token_chunk_size)
return metas, out
......@@ -45,7 +45,9 @@ def test_simple():
# Add seq groups to scheduler.
for i in range(num_seq_group):
_, seq_group = create_dummy_prompt(str(i), prompt_length=block_size)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=block_size,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
......@@ -75,24 +77,29 @@ def test_chunk():
max_seqs = 60
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
cache_config.num_cpu_blocks = 32
cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
# Verify the second request is chunked.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
print()
assert set(get_sequence_groups(out)) == set(running)
assert seq_group_meta[0].token_chunk_size == 60
# Verify it is chunked.
......@@ -118,19 +125,23 @@ def test_complex():
max_seqs = 60
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
cache_config.num_cpu_blocks = 64
cache_config.num_gpu_blocks = 64
scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
......@@ -151,7 +162,9 @@ def test_complex():
# Add 2 more requests.
for i in range(2, 4):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
......@@ -180,12 +193,14 @@ def test_maximal_decoding():
"""Verify decoding requests are prioritized."""
block_size = 4
max_seqs = 2
max_model_len = 2
max_model_len = 8
max_num_batched_tokens = 2
scheduler_config = SchedulerConfig(max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
......@@ -194,7 +209,9 @@ def test_maximal_decoding():
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i), prompt_length=2)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
......@@ -211,7 +228,9 @@ def test_maximal_decoding():
append_new_token(running[0], 1)
# Create one more seq_group.
_, seq_group = create_dummy_prompt("3", prompt_length=2)
_, seq_group = create_dummy_prompt("3",
prompt_length=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
......@@ -269,17 +288,21 @@ def test_prompt_limit():
max_seqs = 32
max_model_len = 64
max_num_batched_tokens = 32
scheduler_config = SchedulerConfig(max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = []
_, seq_group = create_dummy_prompt("1", prompt_length=48)
_, seq_group = create_dummy_prompt("1",
prompt_length=48,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
......@@ -303,12 +326,13 @@ def test_prompt_limit_exceed():
max_model_len,
enable_chunked_prefill=True)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = []
_, seq_group = create_dummy_prompt("2", prompt_length=48)
_, seq_group = create_dummy_prompt("2",
prompt_length=48,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
......@@ -323,16 +347,21 @@ def test_swap():
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
scheduler_config = SchedulerConfig(max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
_, seq_group = create_dummy_prompt("1",
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
# The request is chunked.
......@@ -374,16 +403,21 @@ def test_running_prefill_prioritized_over_swap():
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
scheduler_config = SchedulerConfig(max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
cache_config.num_cpu_blocks = 32
cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None)
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
_, seq_group = create_dummy_prompt("1",
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
# The request is chunked.
......@@ -413,7 +447,9 @@ def test_running_prefill_prioritized_over_swap():
scheduler.block_manager.can_swap_in = MagicMock()
scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER
_, seq_group2 = create_dummy_prompt("2", prompt_length=60)
_, seq_group2 = create_dummy_prompt("2",
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group2)
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
......@@ -461,16 +497,20 @@ def test_chunked_prefill_preempt():
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
scheduler_config = SchedulerConfig(max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
_, seq_group = create_dummy_prompt("1", prompt_length=60)
_, seq_group = create_dummy_prompt("1",
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
# The request is chunked.
......@@ -522,17 +562,21 @@ def test_chunked_prefill_max_seqs():
max_seqs = 2
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
cache_config.num_cpu_blocks = 128
cache_config.num_gpu_blocks = 128
scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = []
_, seq_group = create_dummy_prompt("1", prompt_length=65)
_, seq_group = create_dummy_prompt("1",
prompt_length=65,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
# The first prefill is chunked.
......@@ -542,7 +586,9 @@ def test_chunked_prefill_max_seqs():
# Add new requests.
for i in range(4):
_, seq_group = create_dummy_prompt(str(i), prompt_length=65)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=65,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
......@@ -562,3 +608,44 @@ def test_chunked_prefill_max_seqs():
assert len(get_sequence_groups(out)) == max_seqs
assert not running[0].is_prefill()
assert not running[1].is_prefill()
def test_perfix_caching():
"""Verify allocating full blocks when prefix caching is enabled."""
block_size = 4
max_seqs = 10
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size,
1.0,
1,
"auto",
enable_prefix_caching=True)
cache_config.num_cpu_blocks = 0
cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None)
running: List[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
block_size=block_size,
prompt_length=50)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
assert seq_group_meta[0].token_chunk_size == 50
# Verify it is chunked. Note that although the budget is 64-50=14,
# we only allocate full blocks for prefix caching, so only 4*(14//4)=12
# tokens are allocated.
assert seq_group_meta[1].token_chunk_size == 12
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 62
import pytest
from tests.conftest import VllmRunner
from tests.core.utils import create_dummy_prompt
from vllm.engine.llm_engine import LLMEngine
from vllm.platforms import current_platform
from vllm.sequence import SequenceGroup
MODEL = "JackFram/llama-160m"
def add_seq_group_to_engine(engine: LLMEngine, seq_group: SequenceGroup):
scheduler = engine.scheduler[0]
scheduler.add_seq_group(seq_group)
@pytest.mark.parametrize("num_scheduler_steps", [1, 8])
@pytest.mark.parametrize("enable_chunked_prefill", [False, True])
@pytest.mark.parametrize("enforce_eager", [False, True])
def test_num_computed_tokens_update(num_scheduler_steps: int,
enable_chunked_prefill: bool,
enforce_eager: bool):
is_multi_step = num_scheduler_steps > 1
is_multi_step_chunked_prefill = is_multi_step and enable_chunked_prefill
if is_multi_step_chunked_prefill and current_platform.is_rocm():
pytest.skip("Multi-step with Chunked-Prefill does not support "
"rocm_flash_attn backend")
# Make a vllm engine
runner = VllmRunner(model_name=MODEL,
gpu_memory_utilization=0.7,
num_scheduler_steps=num_scheduler_steps,
enable_chunked_prefill=enable_chunked_prefill,
enforce_eager=enforce_eager)
engine: LLMEngine = runner.model.llm_engine
# In multi-step + chunked-prefill there is no separate single prompt step.
# What is scheduled will run for num_scheduler_steps always.
num_prompt_steps = num_scheduler_steps \
if is_multi_step_chunked_prefill else 1
num_output_tokens_list = [4, 8, 12, 15, 16, 17]
# Create sequence and add to engine
prompt_len = 10
for req_idx, num_output_tokens in enumerate(num_output_tokens_list):
seq, seq_group = create_dummy_prompt(request_id=str(req_idx),
prompt_length=prompt_len,
min_tokens=num_output_tokens,
max_tokens=num_output_tokens)
add_seq_group_to_engine(engine, seq_group)
assert seq.data.get_num_computed_tokens() == 0
for _ in range(num_prompt_steps):
# prompt steps
engine.step()
if not seq.is_finished():
prompt_num_computed_tokens = seq.data.get_num_computed_tokens()
# Test correctness of num_computed_tokens after the prompt steps
assert prompt_num_computed_tokens == \
prompt_len + num_prompt_steps - 1
decode_step_counter = 0
while not seq.is_finished():
# Test correctness of num_computed_tokens after the decode steps
assert seq.data.get_num_computed_tokens(
) == prompt_num_computed_tokens + decode_step_counter
for _ in range(num_scheduler_steps):
# decode step
engine.step()
decode_step_counter += 1
# Test correctness of num_computed_tokens after the sequence finish.
assert seq.data.get_num_computed_tokens(
) == prompt_len + num_output_tokens - 1
......@@ -4,43 +4,26 @@ from typing import List, Set, Tuple
from unittest.mock import MagicMock
import pytest # noqa
from torch import Use # noqa
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
from vllm.sequence import Logprob, SequenceGroup, SequenceStatus
from vllm.sequence import SequenceGroup, SequenceStatus
from .utils import create_dummy_prompt
def get_sequence_groups(scheduler_output):
return [s.seq_group for s in scheduler_output.scheduled_seq_groups]
def append_new_token(out, token_id: int):
seq_groups = get_sequence_groups(out)
for seq_group in seq_groups:
for seq in seq_group.get_seqs():
seq.append_token_id(token_id, {token_id: Logprob(token_id)})
def schedule_and_update_computed_tokens(scheduler):
metas, out = scheduler.schedule()
for s, meta in zip(out.scheduled_seq_groups, metas):
s.seq_group.update_num_computed_tokens(meta.token_chunk_size)
return metas, out
def append_new_token_seq_group(token_chunk_size, seq_group, token_id: int):
seq_group.update_num_computed_tokens(token_chunk_size)
for seq in seq_group.get_seqs():
seq.append_token_id(token_id, {token_id: Logprob(token_id)})
from .utils import (append_new_token, append_new_token_seq_group,
create_dummy_prompt, get_sequence_groups,
schedule_and_update_computed_tokens)
def test_scheduler_add_seq_group():
block_size = 4
scheduler_config = SchedulerConfig(100, 64, 1)
scheduler_config = SchedulerConfig(
100,
64,
1,
)
cache_config = CacheConfig(block_size, 1.0, 1, cache_dtype="auto")
cache_config.num_cpu_blocks = 4
cache_config.num_gpu_blocks = 4
......@@ -49,14 +32,20 @@ def test_scheduler_add_seq_group():
# Add seq group to scheduler.
num_seq_group = 4
for i in range(num_seq_group):
_, seq_group = create_dummy_prompt(str(i), block_size)
_, seq_group = create_dummy_prompt(str(i),
block_size,
block_size=block_size)
scheduler.add_seq_group(seq_group)
assert scheduler.get_num_unfinished_seq_groups() == i + 1
def test_scheduler_abort_seq_group():
block_size = 4
scheduler_config = SchedulerConfig(100, 64, 1)
scheduler_config = SchedulerConfig(
100,
64,
1,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 4
cache_config.num_gpu_blocks = 4
......@@ -80,7 +69,11 @@ def test_scheduler_schedule_simple():
block_size = 4
num_seq_group = 4
max_model_len = 16
scheduler_config = SchedulerConfig(64, num_seq_group, max_model_len)
scheduler_config = SchedulerConfig(
64,
num_seq_group,
max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
......@@ -89,7 +82,9 @@ def test_scheduler_schedule_simple():
# Add seq groups to scheduler.
for i in range(num_seq_group):
_, seq_group = create_dummy_prompt(str(i), prompt_length=block_size)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=block_size,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
......@@ -118,15 +113,18 @@ def test_scheduler_prefill_prioritized():
block_size = 4
max_model_len = 30
max_batched_num_tokens = 30
scheduler_config = SchedulerConfig(max_batched_num_tokens, 2,
max_model_len)
scheduler_config = SchedulerConfig(
max_batched_num_tokens,
2,
max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 2
cache_config.num_gpu_blocks = 2
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
# Add seq groups to scheduler.
_, seq_group_a = create_dummy_prompt("1", 1)
_, seq_group_a = create_dummy_prompt("1", 1, block_size=block_size)
scheduler.add_seq_group(seq_group_a)
# Schedule seq groups prompts.
......@@ -134,7 +132,7 @@ def test_scheduler_prefill_prioritized():
assert get_sequence_groups(out) == [seq_group_a]
# Add a new prefill request B.
_, seq_group_b = create_dummy_prompt("2", 30)
_, seq_group_b = create_dummy_prompt("2", 30, block_size=block_size)
scheduler.add_seq_group(seq_group_b)
# Verify prefill requests are prioritized. Since max_batched_num_tokens
......@@ -146,15 +144,23 @@ def test_scheduler_prefill_prioritized():
def test_scheduler_schedule_preempt_abort():
block_size = 4
max_model_len = 16
scheduler_config = SchedulerConfig(64, 2, max_model_len)
scheduler_config = SchedulerConfig(
64,
2,
max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 2
cache_config.num_gpu_blocks = 2
scheduler = Scheduler(scheduler_config, cache_config, None)
# Add seq groups to scheduler.
seq_a, seq_group_a = create_dummy_prompt("1", block_size)
seq_b, seq_group_b = create_dummy_prompt("2", block_size)
seq_a, seq_group_a = create_dummy_prompt("1",
block_size,
block_size=block_size)
seq_b, seq_group_b = create_dummy_prompt("2",
block_size,
block_size=block_size)
scheduler.add_seq_group(seq_group_a)
scheduler.add_seq_group(seq_group_b)
......@@ -197,7 +203,11 @@ def test_scheduler_max_seqs():
num_seq_group = 4
max_seq_group = 2
max_model_len = 16
scheduler_config = SchedulerConfig(64, max_seq_group, max_model_len)
scheduler_config = SchedulerConfig(
64,
max_seq_group,
max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
......@@ -206,7 +216,9 @@ def test_scheduler_max_seqs():
all_seq_groups: List[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(num_seq_group):
_, seq_group = create_dummy_prompt(str(i), prompt_length=block_size)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=block_size,
block_size=block_size)
all_seq_groups.append(seq_group)
# Append 1 seq group
......@@ -235,7 +247,12 @@ def test_scheduler_max_seqs():
def test_scheduler_delay_factor():
block_size = 4
scheduler_config = SchedulerConfig(100, 64, 16, delay_factor=0.5)
scheduler_config = SchedulerConfig(
100,
64,
16,
delay_factor=0.5,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
......@@ -243,7 +260,8 @@ def test_scheduler_delay_factor():
# schedule first prompt
seq_group_meta, seq_group = create_dummy_prompt("0",
prompt_length=block_size)
prompt_length=block_size,
block_size=block_size)
scheduler.add_seq_group(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert out.num_prefill_groups > 0
......@@ -253,7 +271,8 @@ def test_scheduler_delay_factor():
# wait for a second before scheduling next prompt
time.sleep(1)
seq_group_meta, seq_group = create_dummy_prompt("1",
prompt_length=block_size)
prompt_length=block_size,
block_size=block_size)
scheduler.add_seq_group(seq_group)
# second prompt should *not* be scheduled
......@@ -271,10 +290,17 @@ def test_scheduler_delay_factor():
def test_swapped_out_prioritized():
scheduler = initialize_scheduler(max_num_seqs=6)
block_size = 4
scheduler = initialize_scheduler(max_num_seqs=6,
block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
# best_of=2 * 3 == 6 sequences.
for i in range(3):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
# prefill scheduled now.
......@@ -298,7 +324,10 @@ def test_swapped_out_prioritized():
append_new_token(out, 1)
# Add 1 more task. Swap should be prioritized over prefill.
_, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
append_new_token(out, 1)
......@@ -309,17 +338,25 @@ def test_swapped_out_prioritized():
assert out.blocks_to_swap_out == []
def initialize_scheduler(*,
max_num_seqs=1000,
max_token_budget=1000,
max_model_len=1000,
lora_config=None):
block_size = 4
scheduler_config = SchedulerConfig(max_token_budget, max_num_seqs,
max_model_len)
def initialize_scheduler(
*,
max_num_seqs=1000,
max_token_budget=1000,
max_model_len=1000,
lora_config=None,
block_size=4,
num_cpu_blocks=8,
num_gpu_blocks=8,
):
block_size = block_size
scheduler_config = SchedulerConfig(
max_token_budget,
max_num_seqs,
max_model_len,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
cache_config.num_cpu_blocks = num_cpu_blocks
cache_config.num_gpu_blocks = num_gpu_blocks
scheduler = Scheduler(scheduler_config, cache_config, lora_config)
return scheduler
......@@ -345,8 +382,11 @@ def test_prefill_schedule_max_prompt_len():
"""
Test prompt longer than max_prompt_len is aborted.
"""
scheduler = initialize_scheduler(max_model_len=30)
_, seq_group = create_dummy_prompt("0", prompt_length=60)
block_size = 4
scheduler = initialize_scheduler(max_model_len=30, block_size=block_size)
_, seq_group = create_dummy_prompt("0",
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
budget = create_token_budget()
output = scheduler._schedule_prefills(budget, None)
......@@ -362,10 +402,15 @@ def test_prefill_schedule_token_budget():
"""
Test token budget respected.
"""
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
budget = create_token_budget(token_budget=0)
for i in range(2):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
# 0 token budget == nothing is scheduled.
......@@ -388,10 +433,14 @@ def test_prefill_schedule_token_budget():
assert len(remaining_waiting) == 1
# Test when current_batched_tokens respected.
scheduler = initialize_scheduler()
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=16,
num_gpu_blocks=16)
budget = create_token_budget(token_budget=60)
add_token_budget(budget, 30, 0)
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
# Cannot schedule a prompt that doesn't fit the budget.
scheduler.add_seq_group(seq_group)
output = scheduler._schedule_prefills(budget, None)
......@@ -415,10 +464,15 @@ def test_prefill_schedule_max_seqs():
"""
Test max seq respected.
"""
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
budget = create_token_budget(max_num_seqs=2)
for i in range(3):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
output = scheduler._schedule_prefills(budget, None)
remaining_waiting = scheduler.waiting
......@@ -432,7 +486,9 @@ def test_prefill_schedule_max_seqs():
scheduler.waiting = deque()
budget = create_token_budget(max_num_seqs=2)
add_token_budget(budget, 0, 2)
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
output = scheduler._schedule_prefills(budget, None)
remaining_waiting = scheduler.waiting
......@@ -447,13 +503,18 @@ def test_prefill_schedule_max_lora():
"""
Test max lora is respected and prioritized.
"""
block_size = 4
lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
scheduler = initialize_scheduler(lora_config=lora_config)
scheduler = initialize_scheduler(lora_config=lora_config,
block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
budget = create_token_budget(token_budget=120)
curr_loras: Set[int] = set()
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size,
lora_request=LoRARequest(
lora_name=str(i),
lora_int_id=i + 1,
......@@ -465,7 +526,9 @@ def test_prefill_schedule_max_lora():
# If a request is not scheduled because it hits max lora, it is
# prioritized. Verify that.
for i in range(2, 4):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
# Schedule 2 requests (0 and 2)
output = scheduler._schedule_prefills(budget, curr_loras)
......@@ -493,10 +556,15 @@ def test_prefill_schedule_no_block_manager_capacity():
"""
Test sequence cannot be scheduled due to block manager has no capacity.
"""
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_gpu_blocks=128,
num_cpu_blocks=128)
budget = create_token_budget()
for i in range(3):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
scheduler.block_manager.can_allocate = MagicMock()
scheduler.block_manager.can_allocate.return_value = AllocStatus.LATER
......@@ -511,7 +579,9 @@ def test_prefill_schedule_no_block_manager_capacity():
scheduler = initialize_scheduler()
budget = create_token_budget()
for i in range(3):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
scheduler.block_manager.can_allocate = MagicMock()
scheduler.block_manager.can_allocate.return_value = AllocStatus.NEVER
......@@ -528,10 +598,15 @@ def test_decode_schedule_preempted():
"""
Test decodes cannot be scheduled and preempted.
"""
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
curr_loras = None
for i in range(3):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
scheduler._add_seq_group_to_running(seq_group)
......@@ -567,11 +642,17 @@ def test_decode_swap_beam_search():
"""
Test best_of > 1 swap out blocks
"""
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_gpu_blocks=64,
num_cpu_blocks=64)
curr_loras = None
budget = create_token_budget()
for i in range(3):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
scheduler._add_seq_group_to_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
......@@ -615,8 +696,14 @@ def test_schedule_decode_blocks_to_copy_update():
"""
Verify blocks_to_copy is updated.
"""
scheduler = initialize_scheduler()
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
block_size = 4
scheduler = initialize_scheduler(block_size=4,
num_cpu_blocks=16,
num_gpu_blocks=16)
_, seq_group = create_dummy_prompt("1",
prompt_length=60,
best_of=2,
block_size=block_size)
curr_loras = None
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
......@@ -642,12 +729,16 @@ def test_schedule_decode_blocks_to_copy_update():
def test_schedule_swapped_simple():
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
_, seq_group = create_dummy_prompt("1",
prompt_length=4,
best_of=2,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
append_new_token_seq_group(4, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
scheduler._add_seq_group_to_swapped(seq_group)
......@@ -667,11 +758,14 @@ def test_schedule_swapped_simple():
def test_schedule_swapped_max_token_budget():
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
for _ in range(2):
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
for i in range(2):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
......@@ -699,11 +793,16 @@ def test_schedule_swapped_max_token_budget():
def test_schedule_swapped_max_seqs():
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=64,
num_gpu_blocks=64)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
for i in range(4):
_, seq_group = create_dummy_prompt(str(i), prompt_length=60)
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=4)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
......@@ -729,13 +828,18 @@ def test_schedule_swapped_max_seqs():
def test_schedule_swapped_max_loras():
block_size = 4
lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
scheduler = initialize_scheduler(lora_config=lora_config)
scheduler = initialize_scheduler(lora_config=lora_config,
block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras: Set[int] = set()
blocks_to_swap_out: List[Tuple[int, int]] = []
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size,
lora_request=LoRARequest(
lora_name=str(i),
lora_int_id=i + 1,
......@@ -757,11 +861,17 @@ def test_schedule_swapped_max_loras():
def test_schedule_swapped_cannot_swap_in():
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
for _ in range(2):
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
......@@ -782,11 +892,17 @@ def test_schedule_swapped_cannot_swap_in():
def test_infeasible_swap():
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras = None
blocks_to_swap_out: List[Tuple[int, int]] = []
for _ in range(2):
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
scheduler._swap_out(seq_group, blocks_to_swap_out)
......@@ -808,9 +924,15 @@ def test_infeasible_swap():
def test_schedule_swapped_blocks_to_copy():
scheduler = initialize_scheduler()
block_size = 4
scheduler = initialize_scheduler(block_size=block_size,
num_cpu_blocks=32,
num_gpu_blocks=32)
curr_loras = None
_, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
_, seq_group = create_dummy_prompt("1",
prompt_length=60,
best_of=2,
block_size=block_size)
scheduler._allocate_and_set_running(seq_group)
append_new_token_seq_group(60, seq_group, 1)
blocks_to_swap_out: List[Tuple[int, int]] = []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment