Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
e7c1b7f3
Commit
e7c1b7f3
authored
Sep 06, 2024
by
zhuwenwen
Browse files
Merge branch 'v0.5.4-dtk24.04.1'
parents
7462218e
04c62b93
Changes
442
Hide whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
752 additions
and
280 deletions
+752
-280
tests/core/block/e2e/test_correctness.py
tests/core/block/e2e/test_correctness.py
+68
-1
tests/core/block/test_block_manager_v2.py
tests/core/block/test_block_manager_v2.py
+7
-4
tests/core/block/test_block_table.py
tests/core/block/test_block_table.py
+8
-5
tests/core/block/test_cpu_gpu_block_allocator.py
tests/core/block/test_cpu_gpu_block_allocator.py
+16
-16
tests/core/block/test_naive_block.py
tests/core/block/test_naive_block.py
+3
-3
tests/core/block/test_prefix_caching_block.py
tests/core/block/test_prefix_caching_block.py
+69
-41
tests/core/test_chunked_prefill_scheduler.py
tests/core/test_chunked_prefill_scheduler.py
+5
-5
tests/core/test_scheduler.py
tests/core/test_scheduler.py
+81
-108
tests/core/utils.py
tests/core/utils.py
+7
-5
tests/distributed/test_basic_distributed_correctness.py
tests/distributed/test_basic_distributed_correctness.py
+49
-32
tests/distributed/test_chunked_prefill_distributed.py
tests/distributed/test_chunked_prefill_distributed.py
+31
-31
tests/distributed/test_comm_ops.py
tests/distributed/test_comm_ops.py
+91
-7
tests/distributed/test_custom_all_reduce.py
tests/distributed/test_custom_all_reduce.py
+5
-5
tests/distributed/test_multimodal_broadcast.py
tests/distributed/test_multimodal_broadcast.py
+52
-0
tests/distributed/test_pipeline_parallel.py
tests/distributed/test_pipeline_parallel.py
+108
-0
tests/distributed/test_pipeline_partition.py
tests/distributed/test_pipeline_partition.py
+34
-0
tests/distributed/test_pynccl.py
tests/distributed/test_pynccl.py
+15
-6
tests/distributed/test_same_node.py
tests/distributed/test_same_node.py
+4
-2
tests/distributed/test_shm_broadcast.py
tests/distributed/test_shm_broadcast.py
+88
-0
tests/distributed/test_utils.py
tests/distributed/test_utils.py
+11
-9
No files found.
Too many changes to show.
To preserve performance only
442 of 442+
files are displayed.
Plain diff
Email patch
tests/core/block/e2e/test_correctness.py
View file @
e7c1b7f3
...
...
@@ -183,7 +183,7 @@ def test_v1_v2_greedy_equality_with_cow(baseline_llm_generator,
# Allow only 2 sequences of ~128 tokens in worst case.
# Note 16 = 128/block_size
"num_gpu_blocks_override"
:
2
*
(
16
+
1
),
"num_gpu_blocks_override"
:
2
*
(
16
+
2
),
}
])
@
pytest
.
mark
.
parametrize
(
"baseline_llm_kwargs"
,
[{
...
...
@@ -477,3 +477,70 @@ def test_auto_prefix_caching_with_preemption(baseline_llm_generator,
assert
expected_token_ids
==
actual_token_ids
assert
baseline_token_ids
==
test_token_ids
@
pytest
.
mark
.
parametrize
(
"common_llm_kwargs"
,
[{
# Use a small model for a fast test.
"model"
:
"facebook/opt-125m"
,
# skip cuda graph creation for fast test.
"enforce_eager"
:
True
,
# we keep the blocks small, so that hit eviction quickly
"max_model_len"
:
48
,
"block_size"
:
16
,
"num_gpu_blocks_override"
:
3
,
# Test APC in v2 block
"use_v2_block_manager"
:
True
,
}])
@
pytest
.
mark
.
parametrize
(
"per_test_common_llm_kwargs"
,
[{}])
@
pytest
.
mark
.
parametrize
(
"baseline_llm_kwargs"
,
[{
"enable_prefix_caching"
:
False
}])
@
pytest
.
mark
.
parametrize
(
"test_llm_kwargs"
,
[{
"enable_prefix_caching"
:
True
,
}])
@
pytest
.
mark
.
parametrize
(
"seed"
,
[
1
])
def
test_auto_prefix_caching_after_evition_start
(
baseline_llm_generator
,
test_llm_generator
):
"""Verify block manager v2 with auto prefix caching could works normal
even when eviction started.
With APC enabled, all blocks are held by native block at the beginning.
Then blocks are managed by evictor instead. If cache hit at the evitor's
block, then it could be reused, or we need to recompute its kv cache.
"""
output_len
=
10
temperature
=
0.0
prompts
=
[
"You are a helpful assistant. Please answer truthfully and write "
"out your thinking step by step to be sure you get the right answer. "
"If you make a mistake, attempt to correct it. who are you?"
,
"You are a helpful assistant. Please answer truthfully and write out "
"your thinking step by step to be sure you get the right answer. You "
"are helpful and harmless and you follow ethical guidelines. "
"who are you?"
]
sampling_params
=
SamplingParams
(
max_tokens
=
output_len
,
ignore_eos
=
True
,
temperature
=
temperature
,
)
print
(
'Getting token ids with APC disabled'
)
baseline_token_ids
=
get_token_ids_from_llm_generator
(
baseline_llm_generator
,
prompts
,
sampling_params
)
print
(
'Getting token ids with APC enabled'
)
test_token_ids
=
get_token_ids_from_llm_generator
(
test_llm_generator
,
prompts
,
sampling_params
)
for
expected_token_ids
,
actual_token_ids
in
zip
(
baseline_token_ids
,
test_token_ids
):
assert
expected_token_ids
==
actual_token_ids
assert
baseline_token_ids
==
test_token_ids
tests/core/block/test_block_manager_v2.py
View file @
e7c1b7f3
...
...
@@ -249,10 +249,13 @@ def test_append_slots(block_size, prompt_len, num_slots_to_append,
# Expect consumed blocks to be new blocks required to support the new slots.
expected_consumed_blocks
=
len
(
chunk_list
(
list
(
range
(
prompt_len
+
num_slots_to_append
+
num_lookahead_slots
)),
block_size
))
-
len
(
chunk_list
(
list
(
range
(
prompt_len
)),
block_size
))
list
(
chunk_list
(
list
(
range
(
prompt_len
+
num_slots_to_append
+
num_lookahead_slots
)),
block_size
)))
-
len
(
list
(
chunk_list
(
list
(
range
(
prompt_len
)),
block_size
)))
assert
num_consumed_blocks
==
expected_consumed_blocks
...
...
tests/core/block/test_block_table.py
View file @
e7c1b7f3
from
typing
import
List
import
pytest
from
vllm.core.block.block_table
import
BlockTable
...
...
@@ -28,7 +30,7 @@ def test_allocate_naive(block_size: int, sequence_len: int):
token_ids
=
list
(
range
(
sequence_len
))
num_blocks_per_alloc
=
len
(
list
(
chunk_list
(
token_ids
,
block_size
)))
block_tables
=
[]
block_tables
:
List
[
BlockTable
]
=
[]
for
i
in
range
(
5
):
assert
allocator
.
get_num_free_blocks
(
device
=
Device
.
GPU
)
==
num_gpu_blocks
-
i
*
num_blocks_per_alloc
...
...
@@ -73,7 +75,7 @@ def test_allocate_prefix_caching(block_size: int, sequence_len: int):
num_immutable_blocks_per_alloc
=
len
(
chunked_tokens
)
-
num_mutable_blocks_per_alloc
block_tables
=
[]
block_tables
:
List
[
BlockTable
]
=
[]
for
alloc_i
in
range
(
1
,
6
):
block_tables
.
append
(
...
...
@@ -268,7 +270,7 @@ def test_append_token_ids_correct_content(block_size: int, sequence_len: int,
)
block_table
.
allocate
(
token_ids
=
token_ids
,
device
=
Device
.
GPU
)
appended_so_far
=
[]
appended_so_far
:
List
[
int
]
=
[]
for
append
in
chunk_list
(
token_ids_to_append
,
append_size
):
block_table
.
append_token_ids
(
append
)
appended_so_far
.
extend
(
append
)
...
...
@@ -371,8 +373,9 @@ def test_cow(block_size: int, sequence_len: int, append_len: int,
block_size
)
-
(
sequence_len
//
block_size
)
original_block_table
.
allocate
(
token_ids
=
token_ids
,
device
=
Device
.
GPU
)
original_block_ids
=
original_block_table
.
physical_block_ids
original_block_ids
=
original_block_table
.
physical_block_ids
[:]
print
(
"original_block_ids = {}"
.
format
(
original_block_ids
))
forked_block_table
=
original_block_table
.
fork
()
# Expect no additional allocation (copy on _write_).
...
...
@@ -455,7 +458,7 @@ def test_cow_lookahead_simple(block_size: int, sequence_len: int,
# Allocate lookahead slots.
original_block_table
.
ensure_num_empty_slots
(
lookahead_slots
)
original_block_ids
=
original_block_table
.
physical_block_ids
original_block_ids
=
original_block_table
.
physical_block_ids
[:]
forked_block_table
=
original_block_table
.
fork
()
...
...
tests/core/block/test_cpu_gpu_block_allocator.py
View file @
e7c1b7f3
...
...
@@ -8,8 +8,8 @@ from vllm.utils import Device, chunk_list
@
pytest
.
mark
.
parametrize
(
"num_gpu_blocks"
,
[
1024
])
@
pytest
.
mark
.
parametrize
(
"block_size"
,
[
16
])
@
pytest
.
mark
.
parametrize
(
"allocator_type"
,
[
"naive"
,
"prefix_caching"
])
def
test_allocate_mutable
(
num_cpu_blocks
:
int
,
num_gpu_blocks
:
int
,
block_size
:
int
,
allocator_type
:
str
):
def
test_allocate_mutable
_block
(
num_cpu_blocks
:
int
,
num_gpu_blocks
:
int
,
block_size
:
int
,
allocator_type
:
str
):
allocator
=
CpuGpuBlockAllocator
.
create
(
allocator_type
=
allocator_type
,
num_gpu_blocks
=
num_gpu_blocks
,
...
...
@@ -21,14 +21,14 @@ def test_allocate_mutable(num_cpu_blocks: int, num_gpu_blocks: int,
assert
allocator
.
get_num_free_blocks
(
Device
.
GPU
)
==
num_gpu_blocks
cpu_blocks
=
[
allocator
.
allocate_mutable
(
prev_block
=
None
,
device
=
Device
.
CPU
)
allocator
.
allocate_mutable
_block
(
prev_block
=
None
,
device
=
Device
.
CPU
)
for
_
in
range
(
num_cpu_blocks
)
]
assert
allocator
.
get_num_free_blocks
(
Device
.
CPU
)
==
0
assert
allocator
.
get_num_free_blocks
(
Device
.
GPU
)
==
num_gpu_blocks
gpu_blocks
=
[
allocator
.
allocate_mutable
(
prev_block
=
None
,
device
=
Device
.
GPU
)
allocator
.
allocate_mutable
_block
(
prev_block
=
None
,
device
=
Device
.
GPU
)
for
_
in
range
(
num_gpu_blocks
)
]
assert
allocator
.
get_num_free_blocks
(
Device
.
CPU
)
==
0
...
...
@@ -47,8 +47,8 @@ def test_allocate_mutable(num_cpu_blocks: int, num_gpu_blocks: int,
@
pytest
.
mark
.
parametrize
(
"num_gpu_blocks"
,
[
1024
])
@
pytest
.
mark
.
parametrize
(
"block_size"
,
[
2
])
@
pytest
.
mark
.
parametrize
(
"allocator_type"
,
[
"naive"
,
"prefix_caching"
])
def
test_allocate_immutable
(
num_cpu_blocks
:
int
,
num_gpu_blocks
:
int
,
block_size
:
int
,
allocator_type
:
str
):
def
test_allocate_immutable
_block
(
num_cpu_blocks
:
int
,
num_gpu_blocks
:
int
,
block_size
:
int
,
allocator_type
:
str
):
allocator
=
CpuGpuBlockAllocator
.
create
(
allocator_type
=
allocator_type
,
num_gpu_blocks
=
num_gpu_blocks
,
...
...
@@ -58,27 +58,27 @@ def test_allocate_immutable(num_cpu_blocks: int, num_gpu_blocks: int,
unique_token_ids
=
list
(
range
((
num_cpu_blocks
+
num_gpu_blocks
)
*
block_size
))
gpu_token_ids
=
chunk_list
(
unique_token_ids
[:
num_gpu_blocks
*
block_size
],
block_size
)
cpu_token_ids
=
chunk_list
(
unique_token_ids
[
num_gpu_blocks
*
block_size
:],
block_size
)
gpu_token_ids
=
list
(
chunk_list
(
unique_token_ids
[:
num_gpu_blocks
*
block_size
],
block_size
)
)
cpu_token_ids
=
list
(
chunk_list
(
unique_token_ids
[
num_gpu_blocks
*
block_size
:],
block_size
)
)
assert
allocator
.
get_num_free_blocks
(
Device
.
CPU
)
==
num_cpu_blocks
assert
allocator
.
get_num_free_blocks
(
Device
.
GPU
)
==
num_gpu_blocks
cpu_blocks
=
[
allocator
.
allocate_immutable
(
prev_block
=
None
,
token_ids
=
token_ids
,
device
=
Device
.
CPU
)
allocator
.
allocate_immutable
_block
(
prev_block
=
None
,
token_ids
=
token_ids
,
device
=
Device
.
CPU
)
for
token_ids
in
cpu_token_ids
]
assert
allocator
.
get_num_free_blocks
(
Device
.
CPU
)
==
0
assert
allocator
.
get_num_free_blocks
(
Device
.
GPU
)
==
num_gpu_blocks
gpu_blocks
=
[
allocator
.
allocate_immutable
(
prev_block
=
None
,
token_ids
=
token_ids
,
device
=
Device
.
GPU
)
allocator
.
allocate_immutable
_block
(
prev_block
=
None
,
token_ids
=
token_ids
,
device
=
Device
.
GPU
)
for
token_ids
in
gpu_token_ids
]
assert
allocator
.
get_num_free_blocks
(
Device
.
CPU
)
==
0
...
...
tests/core/block/test_naive_block.py
View file @
e7c1b7f3
...
...
@@ -14,11 +14,11 @@ class TestNaiveBlockAllocator:
prev_block
:
Optional
[
Block
],
token_ids
:
List
[
int
]):
if
allocate_type
==
"immutable"
:
allocate_block
=
lambda
:
allocator
.
allocate_immutable
(
allocate_block
=
lambda
:
allocator
.
allocate_immutable
_block
(
prev_block
=
prev_block
,
token_ids
=
token_ids
)
elif
allocate_type
==
"mutable"
:
allocate_block
=
lambda
:
allocator
.
allocate_mutable
(
prev
_block
=
prev_block
)
allocate_block
=
lambda
:
allocator
.
allocate_mutable_block
(
prev_block
=
prev_block
)
else
:
raise
ValueError
()
...
...
tests/core/block/test_prefix_caching_block.py
View file @
e7c1b7f3
...
...
@@ -26,11 +26,10 @@ class TestPrefixCachingBlock:
token_ids
=
list
(
range
(
num_to_fill
))
mock_allocator
=
MagicMock
(
spec
=
PrefixCachingBlockAllocator
)
block_with_prev
=
PrefixCachingBlock
(
prev_block
=
None
,
token_ids
=
token_ids
,
block_size
=
block_size
,
prefix_caching_allocator
=
mock_allocator
)
block_with_prev
=
PrefixCachingBlock
(
prev_block
=
None
,
token_ids
=
token_ids
,
block_size
=
block_size
,
allocator
=
mock_allocator
)
if
is_curr_block_full
:
# Expect hash since block is full.
...
...
@@ -71,7 +70,7 @@ class TestPrefixCachingBlock:
prev_block
=
previous_block
,
token_ids
=
token_ids
,
block_size
=
block_size
,
prefix_caching_
allocator
=
mock_allocator
,
allocator
=
mock_allocator
,
)
if
is_curr_block_full
and
prev_block_has_hash
:
...
...
@@ -123,7 +122,7 @@ class TestPrefixCachingBlock:
num_empty_trailing_blocks
=
0
)
->
List
[
PrefixCachingBlock
]:
"""Helper method which creates a chain of blocks.
"""
blocks
=
[]
blocks
:
List
[
PrefixCachingBlock
]
=
[]
num_blocks
=
math
.
ceil
(
len
(
token_ids
)
/
block_size
)
+
num_empty_trailing_blocks
...
...
@@ -138,7 +137,7 @@ class TestPrefixCachingBlock:
prev_block
=
prev_block
,
token_ids
=
[],
block_size
=
block_size
,
prefix_caching_
allocator
=
allocator
,
allocator
=
allocator
,
)
tokens_to_append
=
token_ids
[
block_number
*
...
...
@@ -159,11 +158,11 @@ class TestPrefixCachingBlockAllocator:
prev_block
:
Optional
[
Block
],
token_ids
:
List
[
int
]):
if
allocate_type
==
"immutable"
:
allocate_block
=
lambda
:
allocator
.
allocate_immutable
(
allocate_block
=
lambda
:
allocator
.
allocate_immutable
_block
(
prev_block
=
prev_block
,
token_ids
=
token_ids
)
elif
allocate_type
==
"mutable"
:
allocate_block
=
lambda
:
allocator
.
allocate_mutable
(
prev
_block
=
prev_block
)
allocate_block
=
lambda
:
allocator
.
allocate_mutable_block
(
prev_block
=
prev_block
)
else
:
raise
ValueError
()
...
...
@@ -233,12 +232,13 @@ class TestPrefixCachingBlockAllocator:
# Expect allocation with unseen hash to fail.
with
pytest
.
raises
(
BlockAllocator
.
NoFreeBlocksError
):
allocator
.
allocate_immutable
(
prev_block
=
chain
[
-
1
],
token_ids
=
list
(
range
(
block_size
)))
allocator
.
allocate_immutable_block
(
prev_block
=
chain
[
-
1
],
token_ids
=
list
(
range
(
block_size
)))
# Expect mutable allocation to fail.
with
pytest
.
raises
(
BlockAllocator
.
NoFreeBlocksError
):
allocator
.
allocate_mutable
(
prev_block
=
chain
[
-
1
])
allocator
.
allocate_mutable
_block
(
prev_block
=
chain
[
-
1
])
# Expect allocation of exact same chain to pass.
second_chain
=
TestPrefixCachingBlockAllocator
.
create_immutable_chain
(
...
...
@@ -270,7 +270,7 @@ class TestPrefixCachingBlockAllocator:
# Expect mutable allocation to fail.
with
pytest
.
raises
(
BlockAllocator
.
NoFreeBlocksError
):
allocator
.
allocate_mutable
(
prev_block
=
None
)
allocator
.
allocate_mutable
_block
(
prev_block
=
None
)
block_to_free
=
chain
[
-
1
]
...
...
@@ -280,11 +280,11 @@ class TestPrefixCachingBlockAllocator:
allocator
.
free
(
block_to_free
)
assert
block_to_free
.
block_id
is
None
,
i
new_block
=
allocator
.
allocate_mutable
(
prev_block
=
None
)
new_block
=
allocator
.
allocate_mutable
_block
(
prev_block
=
None
)
assert
new_block
.
block_id
==
block_id
,
i
with
pytest
.
raises
(
BlockAllocator
.
NoFreeBlocksError
):
allocator
.
allocate_mutable
(
prev_block
=
None
)
allocator
.
allocate_mutable
_block
(
prev_block
=
None
)
block_to_free
=
new_block
...
...
@@ -376,7 +376,6 @@ class TestPrefixCachingBlockAllocator:
# Create token ids that will exhaust all blocks.
token_ids
=
list
(
range
(
num_blocks_to_consume
*
block_size
))
blocks
=
list
(
range
(
num_blocks_to_consume
))
first_chain
=
TestPrefixCachingBlockAllocator
.
create_immutable_chain
(
block_size
=
block_size
,
...
...
@@ -384,9 +383,6 @@ class TestPrefixCachingBlockAllocator:
allocator
=
allocator
,
)
# mark all blocks in first chain as computed
allocator
.
mark_blocks_as_computed
(
blocks
)
# After zero_point, second_chain's token_ids would be set -1, which
# make it different from here comparing with first_chain
zero_point
=
random
.
randint
(
1
,
len
(
token_ids
)
-
1
)
...
...
@@ -424,15 +420,16 @@ class TestPrefixCachingBlockAllocator:
block_size
=
block_size
)
token_ids
=
list
(
range
(
block_size
))
block
=
allocator
.
allocate_immutable
(
prev_block
=
None
,
token_ids
=
token_ids
)
block
=
allocator
.
allocate_immutable
_block
(
prev_block
=
None
,
token_ids
=
token_ids
)
assert
allocator
.
_refcounter
.
get
(
block
.
block_id
)
==
1
m
=
allocator
.
allocate_mutable
(
prev_block
=
None
)
m
=
allocator
.
allocate_mutable
_block
(
prev_block
=
None
)
block_id
=
m
.
block_id
for
i
in
range
(
block_size
):
m
.
append_token_ids
([
i
])
# After block get promoted to immutable from mutable, if there is
# already same content hash block, then it shall be released into
# hashless_allocator
...
...
@@ -452,48 +449,79 @@ class TestPrefixCachingBlockAllocator:
all_blocks_list
=
[
i
for
i
in
range
(
num_blocks
)]
zero_ref
=
{
i
:
0
for
i
in
range
(
num_blocks
)}
one_ref
=
{
i
:
1
for
i
in
range
(
num_blocks
)}
allocator
=
PrefixCachingBlockAllocator
(
num_blocks
=
num_blocks
,
block_size
=
block_size
)
token_ids
=
list
(
range
(
num_blocks
*
block_size
))
#
now we have num_blocks free blocks in hashless allocator
# with internal tracking list _blocks _cached_blocks and evictor
#
empty and
block
'
s re
f shall be 0
#
Verify initial/pre-alloc state
#
Ensure all
blocks
a
re
free inside hashless allocator
assert
list
(
allocator
.
_hashless_allocator
.
_free_block_indices
)
==
all_blocks_list
assert
len
(
allocator
.
_blocks
.
keys
())
==
0
# Ensure no tracked blocks
assert
len
(
allocator
.
_block_tracker
.
keys
())
==
num_blocks
for
block_id
in
range
(
num_blocks
):
assert
not
allocator
.
_block_tracker
[
block_id
].
active
# Ensure no cached blocks
assert
len
(
allocator
.
_cached_blocks
.
values
())
==
0
# Ensure no evicted blocks
assert
len
(
allocator
.
evictor
.
free_table
.
keys
())
==
0
# Ensure 0s ref counts for all blocks
assert
allocator
.
_refcounter
.
_refcounts
==
zero_ref
# Allocate immutable chains with only one block residuled in
new_block
=
[]
for
i
in
range
(
num_blocks
):
block
=
allocator
.
allocate_immutable
(
block
=
allocator
.
allocate_immutable
_block
(
prev_block
=
None
,
token_ids
=
token_ids
[
block_size
*
i
:
block_size
*
(
i
+
1
)])
new_block
.
append
(
block
)
# Verify post-alloc state
# Ensure no blocks are free inside hashless allocator
assert
(
len
(
allocator
.
_hashless_allocator
.
_free_block_indices
)
==
0
)
# Ensure all blocks are tracked
assert
len
(
allocator
.
_block_tracker
.
keys
())
==
num_blocks
for
block_id
in
range
(
num_blocks
):
assert
allocator
.
_block_tracker
[
block_id
].
active
# Ensure all blocks are cached (all promoted)
assert
len
(
allocator
.
_cached_blocks
.
values
())
==
num_blocks
# Ensure no evicted blocks
assert
len
(
allocator
.
evictor
.
free_table
.
keys
())
==
0
# Ensure 1s ref counts for all blocks
assert
allocator
.
_refcounter
.
_refcounts
==
one_ref
# Free all blocks, and now all blocks shall be in the evictor
# there shall be no tracking data left in _block
s
# there shall be no tracking data left in _block
_tracker
# all blocks shall be tracked in _cached_blocks
# all blocks' ref shall be zero
for
block
in
new_block
:
allocator
.
free
(
block
)
assert
len
(
allocator
.
_blocks
.
keys
())
==
0
# Verify post-free state
# Ensure no tracked blocks
assert
len
(
allocator
.
_block_tracker
.
keys
())
==
num_blocks
for
block_id
in
range
(
num_blocks
):
assert
not
allocator
.
_block_tracker
[
block_id
].
active
# Ensure no blocks in hashless allocator (all promoted)
assert
len
(
allocator
.
_hashless_allocator
.
_free_block_indices
)
==
0
# Ensure all blocks are cached
assert
list
(
allocator
.
_cached_blocks
.
values
())
==
all_blocks_list
# Ensure all blocks are inside the evictor
assert
list
(
allocator
.
evictor
.
free_table
.
keys
())
==
all_blocks_list
# Ensure 0s refcounts
assert
allocator
.
_refcounter
.
_refcounts
==
zero_ref
# Allocate a mutable block, and the first block shall be evicted
# and set its content hash into None, ref to 1
mutable
=
allocator
.
allocate_mutable
(
prev_block
=
None
)
mutable
=
allocator
.
allocate_mutable
_block
(
prev_block
=
None
)
assert
mutable
.
block_id
==
0
assert
mutable
.
content_hash
is
None
assert
0
in
allocator
.
_block
s
assert
allocator
.
_block
_tracker
[
0
].
active
assert
allocator
.
_refcounter
.
get
(
0
)
==
1
assert
0
not
in
allocator
.
_cached_blocks
assert
0
not
in
allocator
.
evictor
...
...
@@ -502,27 +530,27 @@ class TestPrefixCachingBlockAllocator:
# hashless allocator
allocator
.
free
(
mutable
)
assert
len
(
allocator
.
_block
s
.
keys
())
==
0
assert
not
allocator
.
_block
_tracker
[
0
].
active
assert
allocator
.
_refcounter
.
_refcounts
==
zero_ref
assert
0
not
in
allocator
.
_cached_blocks
assert
0
not
in
allocator
.
evictor
assert
0
in
allocator
.
_hashless_allocator
.
_free_block_indices
#
w
hen allocate immutable with first block_size tokens, we
#
W
hen allocate immutable with first block_size tokens, we
# shall get free block from hashless allocator, thus no block left
# in hashless
block
=
allocator
.
allocate_immutable
(
prev
_block
=
None
,
token_ids
=
token_ids
[:
block_size
])
block
=
allocator
.
allocate_immutable_block
(
prev_block
=
None
,
token_ids
=
token_ids
[:
block_size
])
assert
block
.
block_id
==
0
assert
len
(
allocator
.
_hashless_allocator
.
_free_block_indices
)
==
0
assert
0
in
allocator
.
_block
s
assert
allocator
.
_block
_tracker
[
0
].
active
assert
0
in
allocator
.
_cached_blocks
.
values
()
assert
allocator
.
_refcounter
.
get
(
0
)
==
1
assert
0
not
in
allocator
.
evictor
# allocate mutable block again, it shall be popped from evictor
mutable
=
allocator
.
allocate_mutable
(
prev_block
=
None
)
mutable
=
allocator
.
allocate_mutable
_block
(
prev_block
=
None
)
assert
len
(
allocator
.
_hashless_allocator
.
_free_block_indices
)
==
0
assert
mutable
.
block_id
not
in
allocator
.
evictor
.
free_table
assert
allocator
.
_refcounter
.
get
(
mutable
.
block_id
)
==
1
...
...
@@ -608,7 +636,7 @@ class TestPrefixCachingBlockAllocator:
)
->
List
[
PrefixCachingBlock
]:
"""Helper method which creates a chain of blocks.
"""
blocks
=
[]
blocks
:
List
[
Block
]
=
[]
num_blocks
=
math
.
ceil
(
len
(
token_ids
)
/
block_size
)
if
num_blocks
==
0
:
...
...
@@ -619,7 +647,7 @@ class TestPrefixCachingBlockAllocator:
block_token_ids
=
token_ids
[
block_number
*
block_size
:(
block_number
+
1
)
*
block_size
]
prev_block
=
allocator
.
allocate_immutable
(
prev_block
=
allocator
.
allocate_immutable
_block
(
prev_block
=
prev_block
,
token_ids
=
block_token_ids
)
blocks
.
append
(
prev_block
)
...
...
tests/core/test_chunked_prefill_scheduler.py
View file @
e7c1b7f3
...
...
@@ -483,11 +483,11 @@ def test_chunked_prefill_preempt():
# The request should be preempted.
scheduler
.
block_manager
.
can_append_slots
=
MagicMock
()
def
cannot_append_second_group
(
seq_group
,
num_lookahead_slots
):
def
cannot_append_second_group
1
(
seq_group
,
num_lookahead_slots
):
return
seq_group
.
request_id
!=
"1"
scheduler
.
block_manager
.
can_append_slots
.
side_effect
=
(
cannot_append_second_group
)
cannot_append_second_group
1
)
# The running prefill is now preempted.
_
,
out
=
schedule_and_update_computed_tokens
(
scheduler
)
...
...
@@ -505,11 +505,11 @@ def test_chunked_prefill_preempt():
assert
seq_group
.
get_num_uncomputed_tokens
()
==
30
# We should be able to run prefill twice as it is chunked.
def
cannot_append_second_group
(
seq_group
,
num_lookahead_slots
):
def
cannot_append_second_group
2
(
seq_group
,
num_lookahead_slots
):
return
True
scheduler
.
block_manager
.
can_append_slots
.
side_effect
=
(
cannot_append_second_group
)
cannot_append_second_group
2
)
_
,
out
=
schedule_and_update_computed_tokens
(
scheduler
)
assert
len
(
out
.
scheduled_seq_groups
)
==
1
assert
out
.
num_prefill_groups
==
1
...
...
@@ -530,7 +530,7 @@ def test_chunked_prefill_max_seqs():
cache_config
.
num_cpu_blocks
=
8
cache_config
.
num_gpu_blocks
=
8
scheduler
=
Scheduler
(
scheduler_config
,
cache_config
,
None
)
running
=
[]
running
:
List
[
SequenceGroup
]
=
[]
_
,
seq_group
=
create_dummy_prompt
(
"1"
,
prompt_length
=
65
)
scheduler
.
add_seq_group
(
seq_group
)
...
...
tests/core/test_scheduler.py
View file @
e7c1b7f3
import
time
from
collections
import
deque
from
typing
import
List
from
typing
import
List
,
Set
,
Tuple
from
unittest.mock
import
MagicMock
import
pytest
# noqa
from
vllm.config
import
CacheConfig
,
LoRAConfig
,
SchedulerConfig
from
vllm.core.interfaces
import
AllocStatus
from
vllm.core.policy
import
PolicyFactory
from
vllm.core.scheduler
import
Scheduler
,
SchedulingBudget
from
vllm.lora.request
import
LoRARequest
from
vllm.sequence
import
Logprob
,
SequenceGroup
,
SequenceStatus
...
...
@@ -65,7 +64,7 @@ def test_scheduler_abort_seq_group():
# Add multiple seq groups to scheduler.
num_seq_group
=
4
request_ids
=
set
()
request_ids
:
Set
[
str
]
=
set
()
for
i
in
range
(
num_seq_group
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
block_size
)
scheduler
.
add_seq_group
(
seq_group
)
...
...
@@ -347,11 +346,11 @@ def test_prefill_schedule_max_prompt_len():
Test prompt longer than max_prompt_len is aborted.
"""
scheduler
=
initialize_scheduler
(
max_model_len
=
30
)
_
,
seq_group
=
create_dummy_prompt
(
0
,
prompt_length
=
60
)
waiting
=
deque
([
seq_group
]
)
_
,
seq_group
=
create_dummy_prompt
(
"0"
,
prompt_length
=
60
)
scheduler
.
add_seq_group
(
seq_group
)
budget
=
create_token_budget
()
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
None
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
None
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
ignored_seq_groups
)
==
1
assert
len
(
output
.
seq_groups
)
==
0
assert
budget
.
num_batched_tokens
==
0
...
...
@@ -364,15 +363,14 @@ def test_prefill_schedule_token_budget():
Test token budget respected.
"""
scheduler
=
initialize_scheduler
()
waiting
=
deque
()
budget
=
create_token_budget
(
token_budget
=
0
)
for
i
in
range
(
2
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
)
waiting
.
append
(
seq_group
)
scheduler
.
add_seq_group
(
seq_group
)
# 0 token budget == nothing is scheduled.
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
None
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
None
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
ignored_seq_groups
)
==
0
assert
len
(
output
.
seq_groups
)
==
0
assert
budget
.
num_batched_tokens
==
0
...
...
@@ -381,8 +379,8 @@ def test_prefill_schedule_token_budget():
# 60 token budget == 1 request scheduled.
budget
=
create_token_budget
(
token_budget
=
60
)
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
None
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
None
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
ignored_seq_groups
)
==
0
assert
len
(
output
.
seq_groups
)
==
1
assert
budget
.
num_batched_tokens
==
60
...
...
@@ -391,14 +389,13 @@ def test_prefill_schedule_token_budget():
# Test when current_batched_tokens respected.
scheduler
=
initialize_scheduler
()
waiting
=
deque
()
budget
=
create_token_budget
(
token_budget
=
60
)
add_token_budget
(
budget
,
30
,
0
)
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
)
# Cannot schedule a prompt that doesn't fit the budget.
waiting
.
append
(
seq_group
)
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
None
)
scheduler
.
add_seq_group
(
seq_group
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
None
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
ignored_seq_groups
)
==
0
assert
len
(
output
.
seq_groups
)
==
0
assert
budget
.
num_batched_tokens
==
30
...
...
@@ -406,8 +403,8 @@ def test_prefill_schedule_token_budget():
assert
len
(
remaining_waiting
)
==
1
budget
=
create_token_budget
(
token_budget
=
90
)
add_token_budget
(
budget
,
30
,
0
)
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
None
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
None
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
seq_groups
)
==
1
assert
budget
.
num_batched_tokens
==
90
assert
budget
.
num_curr_seqs
==
1
...
...
@@ -419,13 +416,12 @@ def test_prefill_schedule_max_seqs():
Test max seq respected.
"""
scheduler
=
initialize_scheduler
()
waiting
=
deque
()
budget
=
create_token_budget
(
max_num_seqs
=
2
)
for
i
in
range
(
3
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
)
waiting
.
append
(
seq_group
)
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
None
)
scheduler
.
add_seq_group
(
seq_group
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
None
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
ignored_seq_groups
)
==
0
assert
len
(
output
.
seq_groups
)
==
2
assert
budget
.
num_batched_tokens
==
120
...
...
@@ -433,13 +429,13 @@ def test_prefill_schedule_max_seqs():
assert
len
(
remaining_waiting
)
==
1
# Verify curr_num_seqs respected.
waiting
=
deque
()
scheduler
.
waiting
=
deque
()
budget
=
create_token_budget
(
max_num_seqs
=
2
)
add_token_budget
(
budget
,
0
,
2
)
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
)
waiting
.
append
(
seq_group
)
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
None
)
scheduler
.
add_seq_group
(
seq_group
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
None
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
ignored_seq_groups
)
==
0
assert
len
(
output
.
seq_groups
)
==
0
assert
budget
.
num_batched_tokens
==
0
...
...
@@ -453,17 +449,16 @@ def test_prefill_schedule_max_lora():
"""
lora_config
=
LoRAConfig
(
max_lora_rank
=
8
,
max_loras
=
1
)
scheduler
=
initialize_scheduler
(
lora_config
=
lora_config
)
waiting
=
deque
()
budget
=
create_token_budget
(
token_budget
=
120
)
curr_loras
=
set
()
curr_loras
:
Set
[
int
]
=
set
()
for
i
in
range
(
2
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
,
lora_request
=
LoRARequest
(
lora_name
=
str
(
i
),
lora_int_id
=
i
+
1
,
lora_
local_
path
=
"abc"
))
waiting
.
append
(
seq_group
)
lora_path
=
"abc"
))
scheduler
.
add_seq_group
(
seq_group
)
# Add two more requests to verify lora is prioritized.
# 0: Lora, 1: Lora, 2: regular, 3: regular
# In the first iteration, index 0, 2 is scheduled.
...
...
@@ -471,10 +466,10 @@ def test_prefill_schedule_max_lora():
# prioritized. Verify that.
for
i
in
range
(
2
,
4
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
)
waiting
.
append
(
seq_group
)
scheduler
.
add_seq_group
(
seq_group
)
# Schedule 2 requests (0 and 2)
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
curr_loras
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
curr_loras
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
ignored_seq_groups
)
==
0
assert
len
(
output
.
seq_groups
)
==
2
assert
budget
.
num_batched_tokens
==
120
...
...
@@ -485,8 +480,8 @@ def test_prefill_schedule_max_lora():
# Reset curr_loras so that it can be scheduled.
curr_loras
=
set
()
budget
=
create_token_budget
(
token_budget
=
60
)
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
remaining_waiting
,
budget
,
curr_loras
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
curr_loras
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
seq_groups
)
==
1
assert
output
.
seq_groups
[
0
].
seq_group
.
request_id
==
"1"
assert
len
(
remaining_waiting
)
==
1
...
...
@@ -499,31 +494,29 @@ def test_prefill_schedule_no_block_manager_capacity():
Test sequence cannot be scheduled due to block manager has no capacity.
"""
scheduler
=
initialize_scheduler
()
waiting
=
deque
()
budget
=
create_token_budget
()
for
i
in
range
(
3
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
)
waiting
.
append
(
seq_group
)
scheduler
.
add_seq_group
(
seq_group
)
scheduler
.
block_manager
.
can_allocate
=
MagicMock
()
scheduler
.
block_manager
.
can_allocate
.
return_value
=
AllocStatus
.
LATER
remainig_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
None
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
None
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
ignored_seq_groups
)
==
0
assert
len
(
output
.
seq_groups
)
==
0
assert
budget
.
num_batched_tokens
==
0
assert
budget
.
num_curr_seqs
==
0
assert
len
(
remainig_waiting
)
==
3
assert
len
(
remaini
n
g_waiting
)
==
3
scheduler
=
initialize_scheduler
()
waiting
=
deque
()
budget
=
create_token_budget
()
for
i
in
range
(
3
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
)
waiting
.
append
(
seq_group
)
scheduler
.
add_seq_group
(
seq_group
)
scheduler
.
block_manager
.
can_allocate
=
MagicMock
()
scheduler
.
block_manager
.
can_allocate
.
return_value
=
AllocStatus
.
NEVER
remaining_waiting
,
output
=
scheduler
.
_schedule_prefills
(
waiting
,
budget
,
None
)
output
=
scheduler
.
_schedule_prefills
(
budget
,
None
)
remaining_waiting
=
scheduler
.
waiting
assert
len
(
output
.
ignored_seq_groups
)
==
3
assert
len
(
output
.
seq_groups
)
==
0
assert
budget
.
num_batched_tokens
==
0
...
...
@@ -536,14 +529,12 @@ def test_decode_schedule_preempted():
Test decodes cannot be scheduled and preempted.
"""
scheduler
=
initialize_scheduler
()
running
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
None
for
i
in
range
(
3
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
)
scheduler
.
_allocate_and_set_running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
running
.
append
(
seq_group
)
scheduler
.
_add_seq_group_to_
running
(
seq_group
)
scheduler
.
block_manager
.
can_append_slots
=
MagicMock
()
def
cannot_append_second_group
(
seq_group
,
num_lookahead_slots
):
...
...
@@ -555,8 +546,8 @@ def test_decode_schedule_preempted():
# 1 cannot be scheduled, and the lowest priority (request 2)
# should be preempted. 1 will also be preempted.
budget
=
create_token_budget
()
remainig_running
,
output
=
scheduler
.
_schedule_running
(
running
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_running
(
budget
,
curr_loras
)
remainig_running
=
scheduler
.
running
assert
len
(
remainig_running
)
==
0
assert
len
(
output
.
decode_seq_groups
)
==
1
assert
len
(
output
.
prefill_seq_groups
)
==
0
...
...
@@ -577,14 +568,12 @@ def test_decode_swap_beam_search():
Test best_of > 1 swap out blocks
"""
scheduler
=
initialize_scheduler
()
running
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
None
budget
=
create_token_budget
()
for
i
in
range
(
3
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
,
best_of
=
2
)
scheduler
.
_allocate_and_set_running
(
seq_group
)
running
.
append
(
seq_group
)
scheduler
.
_add_seq_group_to_
running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
budget
.
add_num_seqs
(
seq_group
.
request_id
,
seq_group
.
get_max_num_running_seqs
())
...
...
@@ -603,8 +592,8 @@ def test_decode_swap_beam_search():
expected_swap_mapping
=
[(
"5"
,
"7"
)]
scheduler
.
block_manager
.
swap_out
.
return_value
=
expected_swap_mapping
remainig_running
,
output
=
scheduler
.
_schedule_running
(
running
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_running
(
budget
,
curr_loras
)
remainig_running
=
scheduler
.
running
assert
len
(
remainig_running
)
==
0
assert
len
(
output
.
decode_seq_groups
)
==
2
assert
len
(
output
.
prefill_seq_groups
)
==
0
...
...
@@ -628,20 +617,18 @@ def test_schedule_decode_blocks_to_copy_update():
"""
scheduler
=
initialize_scheduler
()
_
,
seq_group
=
create_dummy_prompt
(
"1"
,
prompt_length
=
60
,
best_of
=
2
)
running
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
None
scheduler
.
_allocate_and_set_running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
running
.
append
(
seq_group
)
scheduler
.
_add_seq_group_to_
running
(
seq_group
)
# The last request should be swapped out.
scheduler
.
block_manager
.
append_slots
=
MagicMock
()
scheduler
.
block_manager
.
append_slots
.
return_value
=
[(
2
,
3
)]
budget
=
create_token_budget
()
remaining_running
,
output
=
scheduler
.
_schedule_running
(
running
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_running
(
budget
,
curr_loras
)
remaining_running
=
scheduler
.
running
assert
len
(
remaining_running
)
==
0
assert
len
(
output
.
decode_seq_groups
)
==
1
assert
len
(
output
.
prefill_seq_groups
)
==
0
...
...
@@ -656,19 +643,17 @@ def test_schedule_decode_blocks_to_copy_update():
def
test_schedule_swapped_simple
():
scheduler
=
initialize_scheduler
()
swapped
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
None
blocks_to_swap_out
=
[]
blocks_to_swap_out
:
List
[
Tuple
[
int
,
int
]]
=
[]
_
,
seq_group
=
create_dummy_prompt
(
"1"
,
prompt_length
=
60
,
best_of
=
2
)
scheduler
.
_allocate_and_set_running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
scheduler
.
_swap_out
(
seq_group
,
blocks_to_swap_out
)
s
wapped
.
appe
n
d
(
seq_group
)
s
cheduler
.
_add_seq_group_to_sw
apped
(
seq_group
)
budget
=
create_token_budget
()
remaining_swapped
,
output
=
scheduler
.
_schedule_swapped
(
swapped
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_swapped
(
budget
,
curr_loras
)
remaining_swapped
=
scheduler
.
swapped
assert
len
(
remaining_swapped
)
==
0
assert
budget
.
num_batched_tokens
==
1
assert
budget
.
num_curr_seqs
==
2
...
...
@@ -683,20 +668,18 @@ def test_schedule_swapped_simple():
def
test_schedule_swapped_max_token_budget
():
scheduler
=
initialize_scheduler
()
swapped
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
None
blocks_to_swap_out
=
[]
blocks_to_swap_out
:
List
[
Tuple
[
int
,
int
]]
=
[]
for
_
in
range
(
2
):
_
,
seq_group
=
create_dummy_prompt
(
"1"
,
prompt_length
=
60
,
best_of
=
2
)
scheduler
.
_allocate_and_set_running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
scheduler
.
_swap_out
(
seq_group
,
blocks_to_swap_out
)
s
wapped
.
appe
n
d
(
seq_group
)
s
cheduler
.
_add_seq_group_to_sw
apped
(
seq_group
)
budget
=
create_token_budget
(
token_budget
=
1
)
remaining_swapped
,
output
=
scheduler
.
_schedule_swapped
(
swapped
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_swapped
(
budget
,
curr_loras
)
remaining_swapped
=
scheduler
.
swapped
assert
len
(
remaining_swapped
)
==
1
assert
budget
.
num_batched_tokens
==
1
assert
budget
.
num_curr_seqs
==
2
...
...
@@ -706,8 +689,8 @@ def test_schedule_swapped_max_token_budget():
# Verify num_batched_tokens are respected.
budget
=
create_token_budget
(
token_budget
=
1
)
add_token_budget
(
budget
,
1
,
0
)
remaining_swapped
,
output
=
scheduler
.
_schedule_swapped
(
remaining_swapped
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_swapped
(
budget
,
curr_loras
)
remaining_swapped
=
scheduler
.
swapped
assert
len
(
remaining_swapped
)
==
1
assert
budget
.
num_batched_tokens
==
1
assert
budget
.
num_curr_seqs
==
0
...
...
@@ -717,20 +700,18 @@ def test_schedule_swapped_max_token_budget():
def
test_schedule_swapped_max_seqs
():
scheduler
=
initialize_scheduler
()
swapped
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
None
blocks_to_swap_out
=
[]
blocks_to_swap_out
:
List
[
Tuple
[
int
,
int
]]
=
[]
for
i
in
range
(
4
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
)
scheduler
.
_allocate_and_set_running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
scheduler
.
_swap_out
(
seq_group
,
blocks_to_swap_out
)
s
wapped
.
appe
n
d
(
seq_group
)
s
cheduler
.
_add_seq_group_to_sw
apped
(
seq_group
)
budget
=
create_token_budget
(
max_num_seqs
=
2
)
remaining_swapped
,
output
=
scheduler
.
_schedule_swapped
(
swapped
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_swapped
(
budget
,
curr_loras
)
remaining_swapped
=
scheduler
.
swapped
assert
len
(
remaining_swapped
)
==
2
assert
budget
.
num_batched_tokens
==
2
assert
budget
.
num_curr_seqs
==
2
...
...
@@ -738,8 +719,8 @@ def test_schedule_swapped_max_seqs():
assert
len
(
output
.
prefill_seq_groups
)
==
0
# Verify num_curr_seqs are respected.
remaining_swapped
,
output
=
scheduler
.
_schedule_swapped
(
remaining_swapped
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_swapped
(
budget
,
curr_loras
)
remaining_swapped
=
scheduler
.
swapped
assert
len
(
remaining_swapped
)
==
2
assert
budget
.
num_batched_tokens
==
2
assert
budget
.
num_curr_seqs
==
2
...
...
@@ -750,25 +731,23 @@ def test_schedule_swapped_max_seqs():
def
test_schedule_swapped_max_loras
():
lora_config
=
LoRAConfig
(
max_lora_rank
=
8
,
max_loras
=
1
)
scheduler
=
initialize_scheduler
(
lora_config
=
lora_config
)
swapped
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
set
()
blocks_to_swap_out
=
[]
curr_loras
:
Set
[
int
]
=
set
()
blocks_to_swap_out
:
List
[
Tuple
[
int
,
int
]]
=
[]
for
i
in
range
(
2
):
_
,
seq_group
=
create_dummy_prompt
(
str
(
i
),
prompt_length
=
60
,
lora_request
=
LoRARequest
(
lora_name
=
str
(
i
),
lora_int_id
=
i
+
1
,
lora_
local_
path
=
"abc"
))
lora_path
=
"abc"
))
scheduler
.
_allocate_and_set_running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
scheduler
.
_swap_out
(
seq_group
,
blocks_to_swap_out
)
s
wapped
.
appe
n
d
(
seq_group
)
s
cheduler
.
_add_seq_group_to_sw
apped
(
seq_group
)
budget
=
create_token_budget
()
remaining_swapped
,
output
=
scheduler
.
_schedule_swapped
(
swapped
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_swapped
(
budget
,
curr_loras
)
remaining_swapped
=
scheduler
.
swapped
assert
len
(
remaining_swapped
)
==
1
assert
budget
.
num_batched_tokens
==
1
assert
budget
.
num_curr_seqs
==
1
...
...
@@ -779,24 +758,22 @@ def test_schedule_swapped_max_loras():
def
test_schedule_swapped_cannot_swap_in
():
scheduler
=
initialize_scheduler
()
swapped
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
None
blocks_to_swap_out
=
[]
blocks_to_swap_out
:
List
[
Tuple
[
int
,
int
]]
=
[]
for
_
in
range
(
2
):
_
,
seq_group
=
create_dummy_prompt
(
"1"
,
prompt_length
=
60
,
best_of
=
2
)
scheduler
.
_allocate_and_set_running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
scheduler
.
_swap_out
(
seq_group
,
blocks_to_swap_out
)
s
wapped
.
appe
n
d
(
seq_group
)
s
cheduler
.
_add_seq_group_to_sw
apped
(
seq_group
)
# The last request should be swapped out.
scheduler
.
block_manager
.
can_swap_in
=
MagicMock
()
scheduler
.
block_manager
.
can_swap_in
.
return_value
=
AllocStatus
.
LATER
# Since we cannot swap in, none of the requests are swapped in.
budget
=
create_token_budget
()
remaining_swapped
,
output
=
scheduler
.
_schedule_swapped
(
swapped
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_swapped
(
budget
,
curr_loras
)
remaining_swapped
=
scheduler
.
swapped
assert
len
(
remaining_swapped
)
==
2
assert
budget
.
num_batched_tokens
==
0
assert
budget
.
num_curr_seqs
==
0
...
...
@@ -806,24 +783,22 @@ def test_schedule_swapped_cannot_swap_in():
def
test_infeasible_swap
():
scheduler
=
initialize_scheduler
()
swapped
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
None
blocks_to_swap_out
=
[]
blocks_to_swap_out
:
List
[
Tuple
[
int
,
int
]]
=
[]
for
_
in
range
(
2
):
_
,
seq_group
=
create_dummy_prompt
(
"1"
,
prompt_length
=
60
,
best_of
=
2
)
scheduler
.
_allocate_and_set_running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
scheduler
.
_swap_out
(
seq_group
,
blocks_to_swap_out
)
s
wapped
.
appe
n
d
(
seq_group
)
s
cheduler
.
_add_seq_group_to_sw
apped
(
seq_group
)
# The last request should be swapped out.
scheduler
.
block_manager
.
can_swap_in
=
MagicMock
()
scheduler
.
block_manager
.
can_swap_in
.
return_value
=
AllocStatus
.
NEVER
# Since we cannot swap in, none of the requests are swapped in.
budget
=
create_token_budget
()
remaining_swapped
,
output
=
scheduler
.
_schedule_swapped
(
swapped
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_swapped
(
budget
,
curr_loras
)
remaining_swapped
=
scheduler
.
swapped
assert
len
(
remaining_swapped
)
==
0
assert
len
(
output
.
infeasible_seq_groups
)
==
2
assert
budget
.
num_batched_tokens
==
0
...
...
@@ -834,23 +809,21 @@ def test_infeasible_swap():
def
test_schedule_swapped_blocks_to_copy
():
scheduler
=
initialize_scheduler
()
swapped
=
deque
()
policy
=
PolicyFactory
.
get_policy
(
policy_name
=
"fcfs"
)
curr_loras
=
None
_
,
seq_group
=
create_dummy_prompt
(
"1"
,
prompt_length
=
60
,
best_of
=
2
)
scheduler
.
_allocate_and_set_running
(
seq_group
)
append_new_token_seq_group
(
60
,
seq_group
,
1
)
blocks_to_swap_out
=
[]
blocks_to_swap_out
:
List
[
Tuple
[
int
,
int
]]
=
[]
scheduler
.
_swap_out
(
seq_group
,
blocks_to_swap_out
)
s
wapped
.
appe
n
d
(
seq_group
)
s
cheduler
.
_add_seq_group_to_sw
apped
(
seq_group
)
# The last request should be swapped out.
scheduler
.
block_manager
.
append_slots
=
MagicMock
()
scheduler
.
block_manager
.
append_slots
.
return_value
=
[(
2
,
3
)]
budget
=
create_token_budget
()
remaining_swapped
,
output
=
scheduler
.
_schedule_swapped
(
swapped
,
budget
,
curr_loras
,
policy
)
output
=
scheduler
.
_schedule_swapped
(
budget
,
curr_loras
)
remaining_swapped
=
scheduler
.
swapped
assert
len
(
remaining_swapped
)
==
0
assert
len
(
output
.
decode_seq_groups
)
==
1
assert
len
(
output
.
prefill_seq_groups
)
==
0
...
...
tests/core/utils.py
View file @
e7c1b7f3
import
time
from
typing
import
Iterable
,
Optional
,
Tuple
from
typing
import
List
,
Optional
from
typing
import
Sequence
as
GenericSequence
from
typing
import
Tuple
from
vllm
import
SamplingParams
from
vllm.lora.request
import
LoRARequest
...
...
@@ -46,7 +48,7 @@ def create_dummy_prompt_encoder_decoder(
lora_request
:
Optional
[
LoRARequest
]
=
None
,
use_beam_search
:
bool
=
False
,
best_of
:
int
=
1
,
)
->
Tuple
[
Sequence
,
SequenceGroup
]:
)
->
Tuple
[
Sequence
,
Sequence
,
SequenceGroup
]:
if
not
block_size
:
block_size
=
decoder_prompt_length
...
...
@@ -86,7 +88,7 @@ def create_dummy_prompt_encoder_decoder(
def
create_seq_group
(
seq_prompt_len
:
int
=
1024
,
seq_output_lens
:
Iterabl
e
[
int
]
=
(
128
,
),
seq_output_lens
:
GenericSequenc
e
[
int
]
=
(
128
,
),
request_id
:
str
=
'0'
,
seq_id_start
:
int
=
0
,
sampling_params
:
Optional
[
SamplingParams
]
=
None
)
->
SequenceGroup
:
...
...
@@ -98,7 +100,7 @@ def create_seq_group(
prompt_token_ids
=
[
0
]
*
seq_prompt_len
seqs
=
[]
seqs
:
List
[
Sequence
]
=
[]
for
seq_id_offset
,
output_len
in
enumerate
(
seq_output_lens
):
seq
=
Sequence
(
seq_id
=
seq_id_start
+
seq_id_offset
,
...
...
@@ -125,7 +127,7 @@ def create_seq_group(
def
create_seq_group_encoder_decoder
(
seq_prompt_len
:
int
=
1024
,
seq_output_lens
:
Iterabl
e
[
int
]
=
(
128
,
),
seq_output_lens
:
GenericSequenc
e
[
int
]
=
(
128
,
),
request_id
:
str
=
'0'
,
seq_id_start
:
int
=
0
,
sampling_params
:
Optional
[
SamplingParams
]
=
None
)
->
SequenceGroup
:
...
...
tests/distributed/test_basic_distributed_correctness.py
View file @
e7c1b7f3
"""Compare the outputs of HF and distributed vLLM when using greedy sampling.
vLLM will allocate all the available memory, so we need to run the tests one
by one. The solution is to pass arguments (model name) by environment
variables.
Run:
```sh
cd $VLLM_PATH/tests
TEST_DIST_MODEL=facebook/opt-125m pytest
\
distributed/test_basic_distributed_correctness.py
TEST_DIST_MODEL=meta-llama/Llama-2-7b-hf
\
distributed/test_basic_distributed_correctness.py
pytest distributed/test_basic_distributed_correctness.py
```
"""
import
os
import
pytest
import
torch
MODELS
=
[
os
.
environ
[
"TEST_DIST_MODEL"
],
]
DISTRIBUTED_EXECUTOR_BACKEND
=
"DISTRIBUTED_EXECUTOR_BACKEND"
VLLM_ATTENTION_BACKEND
=
"VLLM_ATTENTION_BACKEND"
from
vllm.utils
import
cuda_device_count_stateless
from
..models.utils
import
check_outputs_equal
from
..utils
import
fork_new_process_for_each_test
TARGET_TEST_SUITE
=
os
.
environ
.
get
(
"TARGET_TEST_SUITE"
,
"L4"
)
@
pytest
.
mark
.
skipif
(
torch
.
cuda
.
device_count
()
<
2
,
@
pytest
.
mark
.
skipif
(
cuda
_
device_count
_stateless
()
<
2
,
reason
=
"Need at least 2 GPUs to run the test."
)
@
pytest
.
mark
.
parametrize
(
"model"
,
MODELS
)
@
pytest
.
mark
.
parametrize
(
"dtype"
,
[
"half"
])
@
pytest
.
mark
.
parametrize
(
"max_tokens"
,
[
5
])
@
pytest
.
mark
.
parametrize
(
"model, distributed_executor_backend, attention_backend, test_suite"
,
[
(
"facebook/opt-125m"
,
"ray"
,
""
,
"L4"
),
(
"facebook/opt-125m"
,
"mp"
,
""
,
"L4"
),
(
"meta-llama/Llama-2-7b-hf"
,
"ray"
,
""
,
"L4"
),
(
"meta-llama/Llama-2-7b-hf"
,
"mp"
,
""
,
"L4"
),
(
"facebook/opt-125m"
,
"ray"
,
""
,
"A100"
),
(
"facebook/opt-125m"
,
"mp"
,
""
,
"A100"
),
(
"facebook/opt-125m"
,
"mp"
,
"FLASHINFER"
,
"A100"
),
(
"meta-llama/Meta-Llama-3-8B"
,
"ray"
,
"FLASHINFER"
,
"A100"
),
])
@
fork_new_process_for_each_test
def
test_models
(
hf_runner
,
vllm_runner
,
example_prompts
,
model
:
str
,
dtype
:
str
,
max_tokens
:
int
,
distributed_executor_backend
:
str
,
attention_backend
:
str
,
test_suite
:
str
,
)
->
None
:
distributed_executor_backend
=
os
.
getenv
(
DISTRIBUTED_EXECUTOR_BACKEND
)
backend_by_env_var
=
os
.
getenv
(
VLLM_ATTENTION_BACKEND
)
enforce_eager
=
backend_by_env_var
==
"FLASHINFER"
if
test_suite
!=
TARGET_TEST_SUITE
:
pytest
.
skip
(
f
"Skip test for
{
test_suite
}
"
)
with
hf_runner
(
model
,
dtype
=
dtype
)
as
hf_model
:
hf_outputs
=
hf_model
.
generate_greedy
(
example_prompts
,
max_tokens
)
if
model
==
"meta-llama/Llama-2-7b-hf"
and
distributed_executor_backend
==
"ray"
and
attention_backend
==
""
and
test_suite
==
"L4"
:
# noqa
# test ray adag
os
.
environ
[
'VLLM_USE_RAY_SPMD_WORKER'
]
=
"1"
os
.
environ
[
'VLLM_USE_RAY_COMPILED_DAG'
]
=
"1"
if
attention_backend
:
os
.
environ
[
"VLLM_ATTENTION_BACKEND"
]
=
attention_backend
dtype
=
"half"
max_tokens
=
5
# NOTE: take care of the order. run vLLM first, and then run HF.
# vLLM needs a fresh new process without cuda initialization.
# if we run HF first, the cuda initialization will be done and it
# will hurt multiprocessing backend with fork method (the default method).
with
vllm_runner
(
model
,
dtype
=
dtype
,
tensor_parallel_size
=
2
,
enforce_eager
=
enforce_eager
,
distributed_executor_backend
=
distributed_executor_backend
)
as
vllm_model
:
vllm_outputs
=
vllm_model
.
generate_greedy
(
example_prompts
,
max_tokens
)
for
i
in
range
(
len
(
example_prompts
)):
hf_output_ids
,
hf_output_str
=
hf_outputs
[
i
]
vllm_output_ids
,
vllm_output_str
=
vllm_outputs
[
i
]
assert
hf_output_str
==
vllm_output_str
,
(
f
"Test
{
i
}
:
\n
HF:
{
hf_output_str
!
r
}
\n
vLLM:
{
vllm_output_str
!
r
}
"
)
assert
hf_output_ids
==
vllm_output_ids
,
(
f
"Test
{
i
}
:
\n
HF:
{
hf_output_ids
}
\n
vLLM:
{
vllm_output_ids
}
"
)
with
hf_runner
(
model
,
dtype
=
dtype
)
as
hf_model
:
hf_outputs
=
hf_model
.
generate_greedy
(
example_prompts
,
max_tokens
)
check_outputs_equal
(
outputs_0_lst
=
hf_outputs
,
outputs_1_lst
=
vllm_outputs
,
name_0
=
"hf"
,
name_1
=
"vllm"
,
)
tests/distributed/test_chunked_prefill_distributed.py
View file @
e7c1b7f3
"""Compare the outputs of HF and distributed vLLM when using greedy sampling.
vLLM will allocate all the available memory, so we need to run the tests one
by one. The solution is to pass arguments (model name) by environment
variables.
Run:
```sh
TEST_DIST_MODEL=facebook/opt-125m pytest
\
test_chunked_prefill_distributed.py
TEST_DIST_MODEL=meta-llama/Llama-2-7b-hf
\
test_chunked_prefill_distributed.py
pytest test_chunked_prefill_distributed.py
```
"""
import
os
import
pytest
import
torch
MODELS
=
[
os
.
environ
[
"TEST_DIST_MODEL"
],
]
DISTRIBUTED_EXECUTOR_BACKEND
=
"DISTRIBUTED_EXECUTOR_BACKEND"
from
vllm.utils
import
cuda_device_count_stateless
from
..models.utils
import
check_outputs_equal
from
..utils
import
fork_new_process_for_each_test
@
pytest
.
mark
.
skipif
(
torch
.
cuda
.
device_count
()
<
2
,
@
pytest
.
mark
.
skipif
(
cuda_device_count_stateless
()
<
2
,
reason
=
"Need at least 2 GPUs to run the test."
)
@
pytest
.
mark
.
parametrize
(
"model"
,
MODELS
)
@
pytest
.
mark
.
parametrize
(
"dtype"
,
[
"half"
])
@
pytest
.
mark
.
parametrize
(
"max_tokens"
,
[
5
])
@
pytest
.
mark
.
parametrize
(
"chunked_prefill_token_size"
,
[
16
])
@
pytest
.
mark
.
parametrize
(
"model, distributed_executor_backend"
,
[
(
"facebook/opt-125m"
,
"ray"
),
(
"meta-llama/Llama-2-7b-hf"
,
"ray"
),
(
"facebook/opt-125m"
,
"mp"
),
(
"meta-llama/Llama-2-7b-hf"
,
"mp"
),
])
@
fork_new_process_for_each_test
def
test_models
(
hf_runner
,
vllm_runner
,
example_prompts
,
model
:
str
,
dtype
:
str
,
max_tokens
:
int
,
chunked_prefill_token_size
:
int
,
distributed_executor_backend
:
str
,
)
->
None
:
distributed_executor_backend
=
os
.
getenv
(
DISTRIBUTED_EXECUTOR_BACKEND
)
dtype
=
"half"
max_tokens
=
5
chunked_prefill_token_size
=
16
# Add a chunked prefill config.
max_num_seqs
=
min
(
chunked_prefill_token_size
,
256
)
...
...
@@ -45,8 +41,10 @@ def test_models(
enable_chunked_prefill
=
True
max_num_batched_tokens
=
chunked_prefill_token_size
with
hf_runner
(
model
,
dtype
=
dtype
)
as
hf_model
:
hf_outputs
=
hf_model
.
generate_greedy
(
example_prompts
,
max_tokens
)
# NOTE: take care of the order. run vLLM first, and then run HF.
# vLLM needs a fresh new process without cuda initialization.
# if we run HF first, the cuda initialization will be done and it
# will hurt multiprocessing backend with fork method (the default method).
with
vllm_runner
(
model
,
...
...
@@ -59,10 +57,12 @@ def test_models(
)
as
vllm_model
:
vllm_outputs
=
vllm_model
.
generate_greedy
(
example_prompts
,
max_tokens
)
for
i
in
range
(
len
(
example_prompts
)):
hf_output_ids
,
hf_output_str
=
hf_outputs
[
i
]
vllm_output_ids
,
vllm_output_str
=
vllm_outputs
[
i
]
assert
hf_output_str
==
vllm_output_str
,
(
f
"Test
{
i
}
:
\n
HF:
{
hf_output_str
!
r
}
\n
vLLM:
{
vllm_output_str
!
r
}
"
)
assert
hf_output_ids
==
vllm_output_ids
,
(
f
"Test
{
i
}
:
\n
HF:
{
hf_output_ids
}
\n
vLLM:
{
vllm_output_ids
}
"
)
with
hf_runner
(
model
,
dtype
=
dtype
)
as
hf_model
:
hf_outputs
=
hf_model
.
generate_greedy
(
example_prompts
,
max_tokens
)
check_outputs_equal
(
outputs_0_lst
=
hf_outputs
,
outputs_1_lst
=
vllm_outputs
,
name_0
=
"hf"
,
name_1
=
"vllm"
,
)
tests/distributed/test_comm_ops.py
View file @
e7c1b7f3
...
...
@@ -8,12 +8,11 @@ import pytest
import
ray
import
torch
from
vllm.distributed
import
(
broadcast_tensor_dict
,
from
vllm.distributed
import
(
broadcast_tensor_dict
,
get_pp_group
,
tensor_model_parallel_all_gather
,
tensor_model_parallel_all_reduce
)
from
..utils
import
(
init_test_distributed_environment
,
multi_process_tensor_parallel
)
from
..utils
import
init_test_distributed_environment
,
multi_process_parallel
@
ray
.
remote
(
num_gpus
=
1
,
max_calls
=
1
)
...
...
@@ -33,7 +32,7 @@ def all_reduce_test_worker(tp_size: int, pp_size: int, rank: int,
(
r
+
1
)
for
r
in
range
(
tp_size
)
]
expected
=
torch
.
sum
(
torch
.
stack
(
all_tensors
,
dim
=
0
),
dim
=
0
)
t
=
all_tensors
[
rank
]
t
=
all_tensors
[
rank
%
tp_size
]
t
=
tensor_model_parallel_all_reduce
(
t
)
assert
torch
.
allclose
(
t
,
expected
)
...
...
@@ -61,7 +60,7 @@ def all_gather_test_worker(tp_size: int, pp_size: int, rank: int,
for
r
in
range
(
tp_size
)
]
expected
=
torch
.
cat
(
all_tensors
,
dim
=
all_gather_dimension
)
t
=
all_tensors
[
rank
]
t
=
all_tensors
[
rank
%
tp_size
]
t
=
tensor_model_parallel_all_gather
(
t
,
all_gather_dimension
)
assert
torch
.
allclose
(
t
,
expected
)
...
...
@@ -92,7 +91,7 @@ def broadcast_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int,
"f"
:
torch
.
tensor
([],
dtype
=
torch
.
float32
,
device
=
"cuda"
),
}
if
rank
==
0
:
if
(
rank
%
tp_size
)
==
0
:
broadcast_tensor_dict
(
test_dict
,
src
=
0
)
else
:
recv_dict
=
broadcast_tensor_dict
(
src
=
0
)
...
...
@@ -105,6 +104,68 @@ def broadcast_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int,
assert
torch
.
allclose
(
recv_dict
[
"f"
],
test_dict
[
"f"
])
@
ray
.
remote
(
num_gpus
=
1
,
max_calls
=
1
)
def
send_recv_tensor_dict_test_worker
(
tp_size
:
int
,
pp_size
:
int
,
rank
:
int
,
distributed_init_port
:
str
):
del
os
.
environ
[
"CUDA_VISIBLE_DEVICES"
]
device
=
torch
.
device
(
f
"cuda:
{
rank
}
"
)
torch
.
cuda
.
set_device
(
device
)
init_test_distributed_environment
(
tp_size
,
pp_size
,
rank
,
distributed_init_port
)
test_dict
=
{
# device tensor
"a"
:
torch
.
arange
(
8
,
dtype
=
torch
.
float32
,
device
=
"cuda"
),
# CPU tensor
"b"
:
torch
.
arange
(
16
,
dtype
=
torch
.
int8
,
device
=
"cpu"
),
"c"
:
"test"
,
"d"
:
[
1
,
2
,
3
],
"e"
:
{
"a"
:
1
,
"b"
:
2
},
# empty tensor
"f"
:
torch
.
tensor
([],
dtype
=
torch
.
float32
,
device
=
"cuda"
),
}
if
not
get_pp_group
().
is_first_rank
:
recv_dict
=
get_pp_group
().
recv_tensor_dict
()
if
not
get_pp_group
().
is_last_rank
:
get_pp_group
().
send_tensor_dict
(
test_dict
)
if
not
get_pp_group
().
is_first_rank
:
assert
len
(
recv_dict
)
==
len
(
test_dict
)
assert
torch
.
allclose
(
recv_dict
[
"a"
],
test_dict
[
"a"
])
assert
torch
.
allclose
(
recv_dict
[
"b"
],
test_dict
[
"b"
])
assert
recv_dict
[
"c"
]
==
test_dict
[
"c"
]
assert
recv_dict
[
"d"
]
==
test_dict
[
"d"
]
assert
recv_dict
[
"e"
]
==
test_dict
[
"e"
]
assert
torch
.
allclose
(
recv_dict
[
"f"
],
test_dict
[
"f"
])
@
ray
.
remote
(
num_gpus
=
1
,
max_calls
=
1
)
def
send_recv_test_worker
(
tp_size
:
int
,
pp_size
:
int
,
rank
:
int
,
distributed_init_port
:
str
):
del
os
.
environ
[
"CUDA_VISIBLE_DEVICES"
]
device
=
torch
.
device
(
f
"cuda:
{
rank
}
"
)
torch
.
cuda
.
set_device
(
device
)
init_test_distributed_environment
(
tp_size
,
pp_size
,
rank
,
distributed_init_port
)
size
=
64
test_tensor
=
torch
.
arange
(
64
,
dtype
=
torch
.
float32
,
device
=
"cuda"
)
if
not
get_pp_group
().
is_first_rank
:
recv_tensor
=
get_pp_group
().
recv
(
size
,
dtype
=
torch
.
float32
)
if
not
get_pp_group
().
is_last_rank
:
get_pp_group
().
send
(
test_tensor
)
if
not
get_pp_group
().
is_first_rank
:
assert
torch
.
allclose
(
test_tensor
,
recv_tensor
)
@
pytest
.
mark
.
skipif
(
torch
.
cuda
.
device_count
()
<
2
,
reason
=
"Need at least 2 GPUs to run the test."
)
@
pytest
.
mark
.
parametrize
(
"tp_size"
,
[
2
])
...
...
@@ -113,4 +174,27 @@ def broadcast_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int,
broadcast_tensor_dict_test_worker
])
def
test_multi_process_tensor_parallel
(
tp_size
,
test_target
):
multi_process_tensor_parallel
(
tp_size
,
1
,
test_target
)
multi_process_parallel
(
tp_size
,
1
,
test_target
)
@
pytest
.
mark
.
skipif
(
torch
.
cuda
.
device_count
()
<
2
,
reason
=
"Need at least 2 GPUs to run the test."
)
@
pytest
.
mark
.
parametrize
(
"pp_size"
,
[
2
])
@
pytest
.
mark
.
parametrize
(
"test_target"
,
[
send_recv_test_worker
,
send_recv_tensor_dict_test_worker
])
def
test_multi_process_pipeline_parallel
(
pp_size
,
test_target
):
multi_process_parallel
(
1
,
pp_size
,
test_target
)
@
pytest
.
mark
.
skipif
(
torch
.
cuda
.
device_count
()
<
4
,
reason
=
"Need at least 4 GPUs to run the test."
)
@
pytest
.
mark
.
parametrize
(
"tp_size"
,
[
2
])
@
pytest
.
mark
.
parametrize
(
"pp_size"
,
[
2
])
@
pytest
.
mark
.
parametrize
(
"test_target"
,
[
send_recv_test_worker
,
send_recv_tensor_dict_test_worker
,
all_reduce_test_worker
,
all_gather_test_worker
,
broadcast_tensor_dict_test_worker
])
def
test_multi_process_tensor_parallel_pipeline_parallel
(
tp_size
,
pp_size
,
test_target
):
multi_process_parallel
(
tp_size
,
pp_size
,
test_target
)
tests/distributed/test_custom_all_reduce.py
View file @
e7c1b7f3
...
...
@@ -11,8 +11,8 @@ from vllm.distributed.communication_op import ( # noqa
from
vllm.distributed.parallel_state
import
(
get_tensor_model_parallel_group
,
get_tp_group
,
graph_capture
)
from
..utils
import
(
init_test_distributed_environment
,
multi_process_
tensor_
parallel
)
from
..utils
import
(
ensure_model_parallel_initialized
,
init_test_distributed_environment
,
multi_process_parallel
)
random
.
seed
(
42
)
test_sizes
=
[
random
.
randint
(
1024
,
2048
*
1024
)
for
_
in
range
(
8
)]
...
...
@@ -27,8 +27,8 @@ def graph_allreduce(tp_size, pp_size, rank, distributed_init_port):
torch
.
cuda
.
set_device
(
device
)
init_test_distributed_environment
(
tp_size
,
pp_size
,
rank
,
distributed_init_port
)
group
=
get_tensor_model_parallel_group
()
ensure_model_parallel_initialized
(
tp_size
,
pp_size
)
group
=
get_tensor_model_parallel_group
()
.
device_group
# A small all_reduce for warmup.
# this is needed because device communicators might be created lazily
...
...
@@ -112,4 +112,4 @@ def test_custom_allreduce(tp_size, pipeline_parallel_size, test_target):
world_size
=
tp_size
*
pipeline_parallel_size
if
world_size
>
torch
.
cuda
.
device_count
():
pytest
.
skip
(
"Not enough GPUs to run the test."
)
multi_process_
tensor_
parallel
(
tp_size
,
pipeline_parallel_size
,
test_target
)
multi_process_parallel
(
tp_size
,
pipeline_parallel_size
,
test_target
)
tests/distributed/test_multimodal_broadcast.py
0 → 100644
View file @
e7c1b7f3
"""Compare the outputs of HF and distributed vLLM when using greedy sampling.
Run:
```sh
pytest -s -v test_multimodal_broadcast.py
```
"""
import
pytest
from
vllm.utils
import
cuda_device_count_stateless
from
..utils
import
fork_new_process_for_each_test
@
pytest
.
mark
.
skipif
(
cuda_device_count_stateless
()
<
2
,
reason
=
"Need at least 2 GPUs to run the test."
)
@
pytest
.
mark
.
parametrize
(
"model, distributed_executor_backend"
,
[
(
"llava-hf/llava-1.5-7b-hf"
,
"ray"
),
(
"llava-hf/llava-v1.6-mistral-7b-hf"
,
"ray"
),
(
"llava-hf/llava-1.5-7b-hf"
,
"mp"
),
(
"llava-hf/llava-v1.6-mistral-7b-hf"
,
"mp"
),
])
@
fork_new_process_for_each_test
def
test_models
(
hf_runner
,
vllm_runner
,
image_assets
,
model
:
str
,
distributed_executor_backend
:
str
)
->
None
:
dtype
=
"half"
max_tokens
=
5
num_logprobs
=
5
tensor_parallel_size
=
2
if
model
.
startswith
(
"llava-hf/llava-1.5"
):
from
..models.test_llava
import
models
,
run_test
elif
model
.
startswith
(
"llava-hf/llava-v1.6"
):
from
..models.test_llava_next
import
models
,
run_test
else
:
raise
NotImplementedError
(
f
"Unsupported model:
{
model
}
"
)
run_test
(
hf_runner
,
vllm_runner
,
image_assets
,
model
=
models
[
0
],
# So that LLaVA-NeXT processor may return nested list
size_factors
=
[
0.25
,
0.5
,
1.0
],
dtype
=
dtype
,
max_tokens
=
max_tokens
,
num_logprobs
=
num_logprobs
,
tensor_parallel_size
=
tensor_parallel_size
,
distributed_executor_backend
=
distributed_executor_backend
,
)
tests/distributed/test_pipeline_parallel.py
0 → 100644
View file @
e7c1b7f3
"""
WARNING: This test runs in both single-node (4 GPUs) and multi-node
(2 node with 2 GPUs each) modes. If the test only uses 2 GPUs, it is
important to set the distributed backend to "mp" to avoid Ray scheduling
all workers in a node other than the head node, which can cause the test
to fail.
"""
import
os
import
pytest
from
..utils
import
compare_two_settings
,
fork_new_process_for_each_test
VLLM_MULTI_NODE
=
os
.
getenv
(
"VLLM_MULTI_NODE"
,
"0"
)
==
"1"
@
pytest
.
mark
.
parametrize
((
"TP_SIZE, PP_SIZE, EAGER_MODE, CHUNKED_PREFILL, "
"MODEL_NAME, DIST_BACKEND"
),
[
(
2
,
2
,
0
,
1
,
"meta-llama/Meta-Llama-3-8B"
,
"ray"
),
(
2
,
2
,
1
,
0
,
"meta-llama/Meta-Llama-3-8B"
,
"ray"
),
(
1
,
3
,
0
,
0
,
"meta-llama/Meta-Llama-3-8B"
,
"ray"
),
(
1
,
4
,
0
,
1
,
"meta-llama/Meta-Llama-3-8B"
,
"ray"
),
(
1
,
4
,
1
,
0
,
"meta-llama/Meta-Llama-3-8B"
,
"ray"
),
(
2
,
2
,
0
,
1
,
"meta-llama/Meta-Llama-3-8B"
,
"mp"
),
(
2
,
2
,
1
,
0
,
"meta-llama/Meta-Llama-3-8B"
,
"mp"
),
(
1
,
3
,
0
,
0
,
"meta-llama/Meta-Llama-3-8B"
,
"mp"
),
(
1
,
4
,
0
,
1
,
"meta-llama/Meta-Llama-3-8B"
,
"mp"
),
(
1
,
4
,
1
,
0
,
"meta-llama/Meta-Llama-3-8B"
,
"mp"
),
])
def
test_compare_tp
(
TP_SIZE
,
PP_SIZE
,
EAGER_MODE
,
CHUNKED_PREFILL
,
MODEL_NAME
,
DIST_BACKEND
):
if
VLLM_MULTI_NODE
and
DIST_BACKEND
==
"mp"
:
pytest
.
skip
(
"Skipping multi-node pipeline parallel test for "
"multiprocessing distributed backend"
)
USE_RAY_ADAG_NCCL
=
0
USE_RAY_ADAG
=
0
pp_args
=
[
# use half precision for speed and memory savings in CI environment
"--dtype"
,
"float16"
,
"--pipeline-parallel-size"
,
str
(
PP_SIZE
),
"--tensor-parallel-size"
,
str
(
TP_SIZE
),
"--distributed-executor-backend"
,
DIST_BACKEND
,
]
# compare without pipeline parallelism
# NOTE: use mp backend for TP
# PP tests might involve multiple nodes, and ray might
# schedule all workers in a node other than the head node,
# which can cause the test to fail.
tp_args
=
[
# use half precision for speed and memory savings in CI environment
"--dtype"
,
"bfloat16"
,
"--tensor-parallel-size"
,
str
(
max
(
TP_SIZE
,
2
)),
# We only use 2 GPUs in the CI.
"--distributed-executor-backend"
,
"mp"
,
]
if
CHUNKED_PREFILL
:
pp_args
.
append
(
"--enable-chunked-prefill"
)
tp_args
.
append
(
"--enable-chunked-prefill"
)
if
EAGER_MODE
:
pp_args
.
append
(
"--enforce-eager"
)
tp_args
.
append
(
"--enforce-eager"
)
pp_env
=
None
if
USE_RAY_ADAG
:
assert
DIST_BACKEND
==
"ray"
,
(
"Ray ADAG is only supported with Ray distributed backend"
)
pp_env
=
{
"VLLM_USE_RAY_COMPILED_DAG"
:
"1"
,
"VLLM_USE_RAY_SPMD_WORKER"
:
"1"
,
"VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL"
:
str
(
int
(
USE_RAY_ADAG_NCCL
)),
}
compare_two_settings
(
MODEL_NAME
,
pp_args
,
tp_args
,
pp_env
)
@
pytest
.
mark
.
parametrize
(
"PP_SIZE, MODEL_NAME"
,
[
(
2
,
"JackFram/llama-160m"
),
])
@
pytest
.
mark
.
parametrize
(
"ATTN_BACKEND"
,
[
"FLASH_ATTN"
,
"FLASHINFER"
,
])
@
fork_new_process_for_each_test
def
test_pp_cudagraph
(
PP_SIZE
,
MODEL_NAME
,
ATTN_BACKEND
):
cudagraph_args
=
[
# use half precision for speed and memory savings in CI environment
"--dtype"
,
"float16"
,
"--pipeline-parallel-size"
,
str
(
PP_SIZE
),
"--distributed-executor-backend"
,
"mp"
,
]
os
.
environ
[
"VLLM_ATTENTION_BACKEND"
]
=
ATTN_BACKEND
eager_args
=
cudagraph_args
+
[
"--enforce-eager"
]
compare_two_settings
(
MODEL_NAME
,
eager_args
,
cudagraph_args
)
tests/distributed/test_pipeline_partition.py
0 → 100644
View file @
e7c1b7f3
import
os
import
pytest
from
vllm.distributed.utils
import
get_pp_indices
def
test_custom_layer_partition
():
def
_verify
(
partition_str
,
num_layers
,
pp_size
,
goldens
):
bak
=
os
.
environ
.
get
(
"VLLM_PP_LAYER_PARTITION"
,
None
)
os
.
environ
[
"VLLM_PP_LAYER_PARTITION"
]
=
partition_str
for
pp_rank
,
golden
in
enumerate
(
goldens
):
assert
get_pp_indices
(
num_layers
,
pp_rank
,
pp_size
)
==
golden
if
bak
is
not
None
:
os
.
environ
[
"VLLM_PP_LAYER_PARTITION"
]
=
bak
# Even partition
_verify
(
"5,5,5,5"
,
20
,
4
,
[(
0
,
5
),
(
5
,
10
),
(
10
,
15
),
(
15
,
20
)])
# Balanced partition
_verify
(
"4,6,6,4"
,
20
,
4
,
[(
0
,
4
),
(
4
,
10
),
(
10
,
16
),
(
16
,
20
)])
# Put reminder somewhere
_verify
(
"5,6,5,6"
,
22
,
4
,
[(
0
,
5
),
(
5
,
11
),
(
11
,
16
),
(
16
,
22
)])
# Invalid partition strings
with
pytest
.
raises
(
ValueError
):
_verify
(
"5,5,5,5,"
,
20
,
4
,
[(
0
,
5
),
(
5
,
10
),
(
10
,
15
),
(
15
,
20
)])
with
pytest
.
raises
(
ValueError
):
_verify
(
"5,5,5,a"
,
20
,
4
,
[(
0
,
5
),
(
5
,
10
),
(
10
,
15
),
(
15
,
20
)])
# Wrong number of partitions
with
pytest
.
raises
(
ValueError
):
_verify
(
"5,5,5"
,
20
,
4
,
[(
0
,
5
),
(
5
,
10
),
(
10
,
15
),
(
15
,
20
)])
# Wrong number of layers
with
pytest
.
raises
(
ValueError
):
_verify
(
"5,5,5,5"
,
21
,
4
,
[(
0
,
5
),
(
5
,
10
),
(
10
,
15
),
(
15
,
20
)])
tests/distributed/test_pynccl.py
View file @
e7c1b7f3
import
multiprocessing
import
os
from
typing
import
Dict
,
List
import
pytest
import
torch
...
...
@@ -17,9 +18,9 @@ from vllm.utils import update_environment_variables
def
distributed_run
(
fn
,
world_size
):
number_of_processes
=
world_size
processes
=
[]
processes
:
List
[
multiprocessing
.
Process
]
=
[]
for
i
in
range
(
number_of_processes
):
env
=
{}
env
:
Dict
[
str
,
str
]
=
{}
env
[
'RANK'
]
=
str
(
i
)
env
[
'LOCAL_RANK'
]
=
str
(
i
)
env
[
'WORLD_SIZE'
]
=
str
(
number_of_processes
)
...
...
@@ -167,9 +168,13 @@ def send_recv_worker_fn():
dtype
=
torch
.
float32
).
cuda
(
pynccl_comm
.
rank
)
with
pynccl_comm
.
change_state
(
enable
=
True
):
if
pynccl_comm
.
rank
==
0
:
pynccl_comm
.
send
(
tensor
)
pynccl_comm
.
send
(
tensor
,
dst
=
(
pynccl_comm
.
rank
+
1
)
%
pynccl_comm
.
world_size
)
else
:
pynccl_comm
.
recv
(
tensor
)
pynccl_comm
.
recv
(
tensor
,
src
=
(
pynccl_comm
.
rank
-
1
)
%
pynccl_comm
.
world_size
)
result
=
tensor
.
mean
().
cpu
().
item
()
assert
result
==
1
...
...
@@ -202,9 +207,13 @@ def multiple_send_recv_worker_fn():
device
=
device
)
with
pynccl_comm
.
change_state
(
enable
=
True
):
if
torch
.
distributed
.
get_rank
()
in
[
0
,
1
]:
pynccl_comm
.
send
(
tensor
)
pynccl_comm
.
send
(
tensor
,
dst
=
(
pynccl_comm
.
rank
+
1
)
%
pynccl_comm
.
world_size
)
else
:
pynccl_comm
.
recv
(
tensor
)
pynccl_comm
.
recv
(
tensor
,
src
=
(
pynccl_comm
.
rank
-
1
)
%
pynccl_comm
.
world_size
)
result
=
tensor
.
mean
().
cpu
().
item
()
if
torch
.
distributed
.
get_rank
()
in
[
0
,
2
]:
assert
result
==
1
...
...
tests/distributed/test_same_node.py
View file @
e7c1b7f3
...
...
@@ -2,10 +2,12 @@ import os
import
torch
from
vllm.distributed.parallel_state
import
is_
in_the_same_node
from
vllm.distributed.parallel_state
import
in_the_same_node
_as
torch
.
distributed
.
init_process_group
(
backend
=
"gloo"
)
test_result
=
is_in_the_same_node
(
torch
.
distributed
.
group
.
WORLD
)
test_result
=
all
(
in_the_same_node_as
(
torch
.
distributed
.
group
.
WORLD
,
source_rank
=
0
))
expected
=
os
.
environ
.
get
(
"VLLM_TEST_SAME_HOST"
,
"1"
)
==
"1"
assert
test_result
==
expected
,
f
"Expected
{
expected
}
, got
{
test_result
}
"
print
(
"Same node test passed!"
)
tests/distributed/test_shm_broadcast.py
0 → 100644
View file @
e7c1b7f3
import
multiprocessing
import
random
import
time
from
typing
import
List
import
numpy
as
np
import
torch.distributed
as
dist
from
vllm.distributed.device_communicators.shm_broadcast
import
MessageQueue
from
vllm.utils
import
update_environment_variables
def
get_arrays
(
n
:
int
,
seed
:
int
=
0
)
->
List
[
np
.
ndarray
]:
np
.
random
.
seed
(
seed
)
sizes
=
np
.
random
.
randint
(
1
,
10_000
,
n
)
# on average, each array will have 5k elements
# with int64, each array will have 40kb
return
[
np
.
random
.
randint
(
1
,
100
,
i
)
for
i
in
sizes
]
def
distributed_run
(
fn
,
world_size
):
number_of_processes
=
world_size
processes
=
[]
for
i
in
range
(
number_of_processes
):
env
=
{}
env
[
'RANK'
]
=
str
(
i
)
env
[
'LOCAL_RANK'
]
=
str
(
i
)
env
[
'WORLD_SIZE'
]
=
str
(
number_of_processes
)
env
[
'LOCAL_WORLD_SIZE'
]
=
str
(
number_of_processes
)
env
[
'MASTER_ADDR'
]
=
'localhost'
env
[
'MASTER_PORT'
]
=
'12345'
p
=
multiprocessing
.
Process
(
target
=
fn
,
args
=
(
env
,
))
processes
.
append
(
p
)
p
.
start
()
for
p
in
processes
:
p
.
join
()
for
p
in
processes
:
assert
p
.
exitcode
==
0
def
worker_fn_wrapper
(
fn
):
# `multiprocessing.Process` cannot accept environment variables directly
# so we need to pass the environment variables as arguments
# and update the environment variables in the function
def
wrapped_fn
(
env
):
update_environment_variables
(
env
)
dist
.
init_process_group
(
backend
=
"gloo"
)
fn
()
return
wrapped_fn
@
worker_fn_wrapper
def
worker_fn
():
writer_rank
=
2
broadcaster
=
MessageQueue
.
create_from_process_group
(
dist
.
group
.
WORLD
,
40
*
1024
,
2
,
writer_rank
)
if
dist
.
get_rank
()
==
writer_rank
:
seed
=
random
.
randint
(
0
,
1000
)
dist
.
broadcast_object_list
([
seed
],
writer_rank
)
else
:
recv
=
[
None
]
dist
.
broadcast_object_list
(
recv
,
writer_rank
)
seed
=
recv
[
0
]
# type: ignore
dist
.
barrier
()
# in case we find a race condition
# print the seed so that we can reproduce the error
print
(
f
"Rank
{
dist
.
get_rank
()
}
got seed
{
seed
}
"
)
# test broadcasting with about 400MB of data
N
=
10_000
if
dist
.
get_rank
()
==
writer_rank
:
arrs
=
get_arrays
(
N
,
seed
)
for
x
in
arrs
:
broadcaster
.
broadcast_object
(
x
)
time
.
sleep
(
random
.
random
()
/
1000
)
else
:
arrs
=
get_arrays
(
N
,
seed
)
for
x
in
arrs
:
y
=
broadcaster
.
broadcast_object
(
None
)
assert
np
.
array_equal
(
x
,
y
)
time
.
sleep
(
random
.
random
()
/
1000
)
dist
.
barrier
()
def
test_shm_broadcast
():
distributed_run
(
worker_fn
,
4
)
tests/distributed/test_utils.py
View file @
e7c1b7f3
import
os
import
ray
from
vllm.utils
import
cuda_device_count_stateless
import
vllm.envs
as
envs
from
vllm.utils
import
(
cuda_device_count_stateless
,
update_environment_variables
)
@
ray
.
remote
class
_CUDADeviceCountStatelessTestActor
()
:
class
_CUDADeviceCountStatelessTestActor
:
def
get_count
(
self
):
return
cuda_device_count_stateless
()
def
set_cuda_visible_devices
(
self
,
cuda_visible_devices
:
str
):
os
.
environ
[
"CUDA_VISIBLE_DEVICES"
]
=
cuda_visible_devices
update_environment_variables
(
{
"CUDA_VISIBLE_DEVICES"
:
cuda_visible_devices
})
def
get_cuda_visible_devices
(
self
):
return
os
.
environ
[
"
CUDA_VISIBLE_DEVICES
"
]
return
envs
.
CUDA_VISIBLE_DEVICES
def
test_cuda_device_count_stateless
():
"""Test that cuda_device_count_stateless changes return value if
CUDA_VISIBLE_DEVICES is changed."""
actor
=
_CUDADeviceCountStatelessTestActor
.
options
(
num_gpus
=
2
).
remote
()
assert
ray
.
get
(
actor
.
get_cuda_visible_devices
.
remote
())
==
"0,1"
actor
=
_CUDADeviceCountStatelessTestActor
.
options
(
# type: ignore
num_gpus
=
2
).
remote
()
assert
sorted
(
ray
.
get
(
actor
.
get_cuda_visible_devices
.
remote
()).
split
(
","
))
==
[
"0"
,
"1"
]
assert
ray
.
get
(
actor
.
get_count
.
remote
())
==
2
ray
.
get
(
actor
.
set_cuda_visible_devices
.
remote
(
"0"
))
assert
ray
.
get
(
actor
.
get_count
.
remote
())
==
1
...
...
Prev
1
…
10
11
12
13
14
15
16
17
18
…
23
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment