Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
norm
vllm
Commits
d3d31766
"src/vscode:/vscode.git/clone" did not exist on "42077e6c734df2fc7bbed373abceab99635500ad"
Commit
d3d31766
authored
Feb 13, 2023
by
Woosuk Kwon
Browse files
Fix scheduler
parent
fffa2e1f
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
38 additions
and
11 deletions
+38
-11
cacheflow/master/scheduler.py
cacheflow/master/scheduler.py
+38
-11
No files found.
cacheflow/master/scheduler.py
View file @
d3d31766
...
@@ -10,10 +10,17 @@ class Scheduler:
...
@@ -10,10 +10,17 @@ class Scheduler:
def
__int__
(
def
__int__
(
self
,
self
,
controllers
:
List
,
block_size
:
int
,
block_size
:
int
,
num_gpu_blocks
:
int
,
num_gpu_blocks
:
int
,
num_cpu_blocks
:
int
,
num_cpu_blocks
:
int
,
)
->
None
:
)
->
None
:
self
.
controllers
=
controllers
self
.
block_size
=
block_size
self
.
num_gpu_blocks
=
num_gpu_blocks
self
.
num_cpu_blocks
=
num_cpu_blocks
# Create the block space manager.
self
.
block_manager
=
BlockSpaceManager
(
self
.
block_manager
=
BlockSpaceManager
(
block_size
=
block_size
,
block_size
=
block_size
,
num_gpu_blocks
=
num_gpu_blocks
,
num_gpu_blocks
=
num_gpu_blocks
,
...
@@ -31,9 +38,13 @@ class Scheduler:
...
@@ -31,9 +38,13 @@ class Scheduler:
# Swapped sequence groups (LIFO).
# Swapped sequence groups (LIFO).
self
.
swapped
:
List
[
SequenceGroup
]
=
[]
self
.
swapped
:
List
[
SequenceGroup
]
=
[]
# Pending sequence groups (FIFO).
# Pending sequence groups (FIFO).
self
.
queue
:
List
[
SequenceGroup
]
=
[]
self
.
pending
:
List
[
SequenceGroup
]
=
[]
# Blocks that need to be swapped or copied before model execution.
self
.
blocks_to_swap_in
:
Dict
[
int
,
int
]
=
[]
self
.
blocks_to_swap_out
:
Dict
[
int
,
int
]
=
[]
self
.
blocks_to_copy
:
Dict
[
int
,
int
]
=
[]
def
_free_seq
(
self
,
seq
:
Sequence
)
->
None
:
def
_free_seq
(
self
,
seq
:
Sequence
)
->
None
:
seq
.
status
=
SequenceStatus
.
FINISHED
seq
.
status
=
SequenceStatus
.
FINISHED
...
@@ -52,12 +63,11 @@ class Scheduler:
...
@@ -52,12 +63,11 @@ class Scheduler:
ret
=
self
.
block_manager
.
append
(
seq
)
ret
=
self
.
block_manager
.
append
(
seq
)
if
ret
is
not
None
:
if
ret
is
not
None
:
src_block
,
dst_block
=
ret
src_block
,
dst_block
=
ret
# TODO: Issue COPY commands to the workers.
self
.
blocks_to_copy
[
src_block
]
=
dst_block
def
_swap_in
(
self
,
seq_group
:
SequenceGroup
)
->
None
:
def
_swap_in
(
self
,
seq_group
:
SequenceGroup
)
->
None
:
# TODO: Issue SWAP_IN commands to the workers.
mapping
=
self
.
block_manager
.
swap_in
(
seq_group
)
self
.
block_manager
.
swap_in
(
seq_group
)
self
.
blocks_to_swap_in
.
update
(
mapping
)
self
.
block_manager
.
append
(
seq_group
)
for
seq
in
seq_group
.
seqs
:
for
seq
in
seq_group
.
seqs
:
if
seq
.
status
==
SequenceStatus
.
SWAPPED
:
if
seq
.
status
==
SequenceStatus
.
SWAPPED
:
seq
.
status
=
SequenceStatus
.
RUNNING
seq
.
status
=
SequenceStatus
.
RUNNING
...
@@ -65,14 +75,14 @@ class Scheduler:
...
@@ -65,14 +75,14 @@ class Scheduler:
def
_swap_out
(
self
,
seq_group
:
SequenceGroup
)
->
None
:
def
_swap_out
(
self
,
seq_group
:
SequenceGroup
)
->
None
:
assert
self
.
block_manager
.
can_swap_out
(
seq_group
)
assert
self
.
block_manager
.
can_swap_out
(
seq_group
)
# TODO: Issue SWAP_OUT commands to the workers.
mapping
=
self
.
block_manager
.
swap_out
(
seq_group
)
self
.
block
_manager
.
swap_out
(
seq_group
)
self
.
block
s_to_swap_out
.
update
(
mapping
)
for
seq
in
seq_group
.
seqs
:
for
seq
in
seq_group
.
seqs
:
if
seq
.
status
==
SequenceStatus
.
RUNNING
:
if
seq
.
status
==
SequenceStatus
.
RUNNING
:
seq
.
status
=
SequenceStatus
.
SWAPPED
seq
.
status
=
SequenceStatus
.
SWAPPED
self
.
swapped
.
append
(
seq_group
)
self
.
swapped
.
append
(
seq_group
)
def
step
(
self
)
->
None
:
def
prepare
(
self
)
->
None
:
# 1. Prepare new slots for the running sequences.
# 1. Prepare new slots for the running sequences.
# NOTE: Here we implicitly assume FCFS scheduling.
# NOTE: Here we implicitly assume FCFS scheduling.
# That is, the most recently added sequence group is the first
# That is, the most recently added sequence group is the first
...
@@ -100,6 +110,7 @@ class Scheduler:
...
@@ -100,6 +110,7 @@ class Scheduler:
for
i
,
seq_group
in
enumerate
(
reversed
(
self
.
swapped
)):
for
i
,
seq_group
in
enumerate
(
reversed
(
self
.
swapped
)):
if
self
.
block_manager
.
can_swap_in
(
seq_group
):
if
self
.
block_manager
.
can_swap_in
(
seq_group
):
self
.
_swap_in
(
seq_group
)
self
.
_swap_in
(
seq_group
)
self
.
_append
(
seq_group
)
else
:
else
:
# OOM. Stop swapping.
# OOM. Stop swapping.
self
.
swapped
=
self
.
swapped
[:
len
(
self
.
swapped
)
-
i
]
self
.
swapped
=
self
.
swapped
[:
len
(
self
.
swapped
)
-
i
]
...
@@ -112,14 +123,30 @@ class Scheduler:
...
@@ -112,14 +123,30 @@ class Scheduler:
# NOTE: Here we implicitly assume FCFS scheduling.
# NOTE: Here we implicitly assume FCFS scheduling.
# TODO(woosuk): Add a heuristic to control the maximum batch size.
# TODO(woosuk): Add a heuristic to control the maximum batch size.
if
not
self
.
swapped
:
if
not
self
.
swapped
:
for
i
,
seq_group
in
enumerate
(
self
.
queue
):
for
i
,
seq_group
in
enumerate
(
self
.
pending
):
if
self
.
block_manager
.
can_allocate
(
seq_group
):
if
self
.
block_manager
.
can_allocate
(
seq_group
):
self
.
_allocate
(
seq_group
)
self
.
_allocate
(
seq_group
)
else
:
else
:
# FIXME: Consider the race condition.
# FIXME: Consider the race condition.
self
.
queue
=
self
.
queue
[
i
:]
self
.
pending
=
self
.
pending
[
i
:]
break
break
def step(self) -> None:
    """Send the queued block operations to the first pipeline stage.

    Copies of the pending swap-in, swap-out and copy mappings are passed
    (in that order) to ``self.controllers[0].execute_stage``; the three
    mappings are then emptied so the next step starts clean.

    Raises:
        AssertionError: If swap-in and swap-out work are both queued for
            the same step.
    """
    # Ensure that at most one of swap-in / swap-out is performed per step.
    # The mappings are containers that get cleared rather than reset to
    # None, so emptiness is the meaningful test here; comparing them
    # against None by identity would trip the assertion on every call.
    if self.blocks_to_swap_in:
        assert not self.blocks_to_swap_out, (
            "swap-in and swap-out must not be scheduled in the same step"
        )

    # Execute the first stage of the pipeline. Copies are handed over so
    # that clearing the scheduler's own mappings below cannot affect what
    # the controller received.
    self.controllers[0].execute_stage(
        self.blocks_to_swap_in.copy(),
        self.blocks_to_swap_out.copy(),
        self.blocks_to_copy.copy(),
    )

    # Clear for the next step.
    self.blocks_to_swap_in.clear()
    self.blocks_to_swap_out.clear()
    self.blocks_to_copy.clear()
def
post_step
(
def
post_step
(
self
,
self
,
next_tokens
:
Dict
[
int
,
Tuple
[
int
,
int
]],
next_tokens
:
Dict
[
int
,
Tuple
[
int
,
int
]],
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment