norm / vllm

Commit 331fa0b0 (parent 501c4bd0)
Authored Feb 23, 2023 by Woosuk Kwon

Implement scheduler.step & Add a threshold for batch size
Showing 1 changed file with 59 additions and 21 deletions.
cacheflow/master/scheduler.py  (+59 −21)
@@ -5,6 +5,8 @@ from cacheflow.sequence import Sequence
 from cacheflow.sequence import SequenceGroup
 from cacheflow.sequence import SequenceStatus
 
+_MAX_NUM_BATCHED_TOKENS = 2048
+
 
 class Scheduler:
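This new module-level constant caps how many tokens one scheduler step may process. As the step-3 hunk further down shows, the budget starts at one token per RUNNING sequence (each contributes a single generation token per step), and every admitted prompt adds its full length. A toy calculation of the budget check, with made-up numbers that are not part of this commit:

_MAX_NUM_BATCHED_TOKENS = 2048

num_running_seqs = 30                  # hypothetical: 30 decoding sequences
num_batched_tokens = num_running_seqs  # 1 generation token per sequence
num_prompt_tokens = 1500               # hypothetical incoming prompt
fits = num_batched_tokens + num_prompt_tokens <= _MAX_NUM_BATCHED_TOKENS
print(fits)  # True: 30 + 1500 = 1530 <= 2048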
@@ -27,8 +29,8 @@ class Scheduler:
             num_cpu_blocks=num_cpu_blocks,
         )
 
-        # Serving sequence groups (FIFO).
-        self.serving: List[SequenceGroup] = []
+        # Running sequence groups (FIFO).
+        self.running: List[SequenceGroup] = []
 
         # Mapping: group_id -> num_steps.
         self.num_steps: Dict[int, int] = {}
         # Mapping: group_id -> max_num_steps.
@@ -54,7 +56,7 @@ class Scheduler:
         self.block_manager.allocate(seq_group)
         for seq in seq_group.seqs:
             seq.status = SequenceStatus.RUNNING
-        self.serving.append(seq_group)
+        self.running.append(seq_group)
         # FIXME
         self.num_steps[seq_group.group_id] = 0
@@ -73,7 +75,7 @@ class Scheduler:
         for seq in seq_group.seqs:
             if seq.status == SequenceStatus.SWAPPED:
                 seq.status = SequenceStatus.RUNNING
-        self.serving.append(seq_group)
+        self.running.append(seq_group)
 
     def _swap_out(self, seq_group: SequenceGroup) -> None:
         assert self.block_manager.can_swap_out(seq_group)
@@ -89,14 +91,14 @@ class Scheduler:
         # NOTE: Here we implicitly assume FCFS scheduling.
         # That is, the most recently added sequence group is the first
         # to be swapped out.
-        victim_idx = len(self.serving) - 1
-        for i, seq_group in enumerate(self.serving):
+        victim_idx = len(self.running) - 1
+        for i, seq_group in enumerate(self.running):
             if i > victim_idx:
                 # The i-th sequence group has already been swapped out.
                 break
             # OOM. Swap out the victim sequence groups.
             while not self.block_manager.can_append(seq_group):
-                victim_seq_group = self.serving[victim_idx]
+                victim_seq_group = self.running[victim_idx]
                 self._swap_out(victim_seq_group)
                 victim_idx -= 1
                 if i > victim_idx:
@@ -104,7 +106,7 @@ class Scheduler:
                     break
             else:
                 self._append(seq_group)
-        self.serving = self.serving[:victim_idx + 1]
+        self.running = self.running[:victim_idx + 1]
 
         # 2. Swap in the swapped sequences if possible.
         # NOTE: Here we implicitly assume FCFS scheduling.
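The two hunks above implement preemption by swapping: the scheduler walks the running groups oldest-first and, whenever a group can no longer grow (can_append fails), swaps out victims from the tail of the list, so the most recently admitted groups are evicted first. A self-contained sketch of the same two-pointer pattern, with a toy can_append predicate standing in for the block manager (names here are illustrative, not cacheflow's API):

from typing import Callable, List

def keep_on_gpu(running: List[str],
                can_append: Callable[[int], bool]) -> List[str]:
    # Walk groups oldest-first; while the current group cannot grow,
    # shrink the list from the back (evict the youngest survivor).
    victim_idx = len(running) - 1
    for i, group in enumerate(running):
        if i > victim_idx:
            break  # This group was itself already evicted.
        while not can_append(victim_idx + 1):
            victim_idx -= 1  # Swap out running[victim_idx + 1].
            if i > victim_idx:
                break  # Even the current group had to go.
    return running[:victim_idx + 1]

# E.g. if memory only fits two groups:
# keep_on_gpu(["a", "b", "c"], lambda n: n <= 2) == ["a", "b"]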
@@ -121,16 +123,25 @@ class Scheduler:
             # All swapped sequences are swapped in.
             self.swapped.clear()
 
+        num_batched_tokens = sum(
+            seq_group.num_seqs(status=SequenceStatus.RUNNING)
+            for seq_group in self.running
+        )
+
         # 3. Join new sequences if possible.
         # NOTE: Here we implicitly assume FCFS scheduling.
-        # TODO(woosuk): Add a heuristic to control the maximum batch size.
+        # TODO(woosuk): Add a batching policy to control the batch size.
         if not self.swapped:
-            # FIXME: Acquire a lock.
+            # FIXME(woosuk): Acquire a lock to protect pending.
             for i, seq_group in enumerate(self.pending):
+                num_prompt_tokens = seq_group.seqs[0].get_len()
                 if self.block_manager.can_allocate(seq_group):
-                    # FIXME: Consider the race condition.
-                    self._allocate(seq_group)
-                else:
-                    self.pending = self.pending[i:]
-                    break
+                    if (num_batched_tokens + num_prompt_tokens
+                        <= _MAX_NUM_BATCHED_TOKENS):
+                        self._allocate(seq_group)
+                        num_batched_tokens += num_prompt_tokens
+                        continue
+
+                self.pending = self.pending[i:]
+                break
             else:
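Note the for ... else that closes this hunk: in Python the else arm runs only when the loop completes without a break, i.e. only when every pending group was admitted (the body of that arm is elided here). A standalone sketch of the admission loop, with a plain list of prompt lengths standing in for self.pending (illustrative names, and the can_allocate check is omitted):

_MAX_NUM_BATCHED_TOKENS = 2048

def join_pending(pending: list, num_batched_tokens: int):
    # Admit prompts FCFS while they fit under the token budget; keep
    # the rest pending. Returns (admitted, still_pending).
    admitted = []
    for i, num_prompt_tokens in enumerate(pending):
        if num_batched_tokens + num_prompt_tokens <= _MAX_NUM_BATCHED_TOKENS:
            num_batched_tokens += num_prompt_tokens
            admitted.append(num_prompt_tokens)
            continue
        pending = pending[i:]  # First prompt that does not fit stops admission.
        break
    else:
        pending = []  # No break: every prompt was admitted.
    return admitted, pending

# join_pending([1000, 1000, 1000], 0) == ([1000, 1000], [1000])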
@@ -141,8 +152,35 @@ class Scheduler:
         if self.blocks_to_swap_in:
             assert not self.blocks_to_swap_out
 
+        # Create input data structures.
+        prompt_tokens: Dict[int, List[int]] = {}
+        generation_tokens: Dict[int, int] = {}
+        context_lens: Dict[int, int] = {}
+        block_tables: Dict[int, List[int]] = {}
+        for seq_group in self.running:
+            group_id = seq_group.group_id
+            num_steps = self.num_steps[group_id]
+            # NOTE(woosuk): We assume that the number of steps is 0
+            # for the prompt sequences.
+            is_prompt = num_steps == 0
+            for seq in seq_group.seqs:
+                if seq.status != SequenceStatus.RUNNING:
+                    continue
+
+                seq_id = seq.seq_id
+                block_tables[seq_id] = self.block_manager.get_block_table(seq)
+                if is_prompt:
+                    prompt_tokens[seq_id] = seq.get_token_ids()
+                else:
+                    generation_tokens[seq_id] = seq.get_token_ids()[-1]
+                    context_lens[seq_id] = seq.get_len()
+
         # Execute the first stage of the pipeline.
         self.controllers[0].execute_stage(
+            prompt_tokens,
+            generation_tokens,
+            context_lens,
+            block_tables,
             self.blocks_to_swap_in.copy(),
             self.blocks_to_swap_out.copy(),
             self.blocks_to_copy.copy(),
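The bulk of the new step logic above splits the model inputs by phase: a group with num_steps == 0 is still in the prompt phase and ships all of its token ids, while a decoding group ships only the last generated token plus its context length (the KV cache already holds the rest). A minimal sketch of that split, using plain dicts as stand-ins for cacheflow's Sequence objects:

from typing import Dict, List

def build_inputs(seqs: List[dict]):
    # seqs: [{"seq_id": int, "num_steps": int, "token_ids": [int, ...]}]
    prompt_tokens: Dict[int, List[int]] = {}
    generation_tokens: Dict[int, int] = {}
    context_lens: Dict[int, int] = {}
    for seq in seqs:
        if seq["num_steps"] == 0:
            # Prompt phase: the model must see every prompt token.
            prompt_tokens[seq["seq_id"]] = seq["token_ids"]
        else:
            # Generation phase: only the newest token is fed in; the
            # context length says how much cache to attend to.
            generation_tokens[seq["seq_id"]] = seq["token_ids"][-1]
            context_lens[seq["seq_id"]] = len(seq["token_ids"])
    return prompt_tokens, generation_tokens, context_lens

# build_inputs([{"seq_id": 0, "num_steps": 0, "token_ids": [1, 2, 3]},
#               {"seq_id": 1, "num_steps": 2, "token_ids": [4, 5, 6, 7]}])
# == ({0: [1, 2, 3]}, {1: 7}, {1: 4})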
@@ -158,7 +196,7 @@ class Scheduler:
         next_tokens: Dict[int, Tuple[int, int]],
     ) -> None:
         # Update the running sequences and free blocks.
-        for seq_group in self.serving:
+        for seq_group in self.running:
             group_id = seq_group.group_id
             self.num_steps[group_id] += 1
             stop_token_ids = self.stop_token_ids[group_id]
@@ -190,14 +228,14 @@ class Scheduler:
                     self._free_seq(seq)
                     continue
 
-        # Update the serving states.
-        serving: List[SequenceGroup] = []
-        for seq_group in self.serving:
+        # Update the running sequences.
+        running: List[SequenceGroup] = []
+        for seq_group in self.running:
             if all(seq.status == SequenceStatus.FINISHED for seq in seq_group.seqs):
                 del self.num_steps[seq_group.group_id]
                 del self.max_num_steps[seq_group.group_id]
                 del self.stop_token_ids[seq_group.group_id]
                 # TODO: Return the seq_group to the client.
             else:
-                serving.append(seq_group)
-        self.serving = serving
+                running.append(seq_group)
+        self.running = running
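The final hunk rebuilds self.running, dropping any group whose sequences have all FINISHED and deleting its per-group bookkeeping. A compact sketch of the same rebuild-and-prune pattern, with toy types rather than cacheflow's classes:

from dataclasses import dataclass
from typing import Dict, List

@dataclass
class Group:
    group_id: int
    finished: List[bool]  # One flag per sequence in the group.

def prune_finished(running: List[Group],
                   num_steps: Dict[int, int]) -> List[Group]:
    kept: List[Group] = []
    for group in running:
        if all(group.finished):
            # Drop per-group bookkeeping once the whole group is done.
            del num_steps[group.group_id]
        else:
            kept.append(group)
    return kept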