Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
20e9c3eb
Commit
20e9c3eb
authored
Apr 03, 2025
by
lizhigong
Browse files
debug and fix error
parent
e6bbec9e
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
70 additions
and
73 deletions
+70
-73
vllm/engine/llm_engine.py
vllm/engine/llm_engine.py
+56
-64
vllm/model_executor/layers/sampler.py
vllm/model_executor/layers/sampler.py
+10
-7
vllm/spec_decode/target_model_runner.py
vllm/spec_decode/target_model_runner.py
+4
-2
No files found.
vllm/engine/llm_engine.py
View file @
20e9c3eb
...
...
@@ -1326,65 +1326,62 @@ class LLMEngine:
self
.
sem_m2s
.
release
()
def
thread_zero_overhead
(
self
):
while
True
:
self
.
sem_m2s
.
acquire
()
if
not
self
.
thread_running
:
break
last_outputs_ids
=
None
last_outputs_tensor
=
None
if
self
.
last_record
is
not
None
:
last_output
=
self
.
last_record
[
0
][
0
]
last_outputs_ids
,
last_outputs_tensor
=
last_output
.
sampler_out_ids
,
last_output
.
sampler_out_tenosr
self
.
async_d2h
=
last_outputs_tensor
.
to
(
'cpu'
,
non_blocking
=
True
)
self
.
async_event
.
record
()
self
.
q_recorder
.
put
(
self
.
last_record
)
virtual_engine
=
0
ctx
=
self
.
scheduler_contexts
[
virtual_engine
]
# Clear outputs for each new scheduler iteration
ctx
.
request_outputs
.
clear
()
# Schedule iteration
(
seq_group_metadata_list
,
scheduler_outputs
,
allow_async_output_proc
)
=
self
.
scheduler
[
virtual_engine
].
schedule
()
ctx
.
seq_group_metadata_list
=
seq_group_metadata_list
ctx
.
scheduler_outputs
=
scheduler_outputs
finished_requests_ids
=
self
.
scheduler
[
virtual_engine
].
get_and_reset_finished_requests_ids
()
assert
seq_group_metadata_list
is
not
None
assert
scheduler_outputs
is
not
None
last_sampled_token_ids
=
\
self
.
_get_last_sampled_token_ids
(
virtual_engine
)
execute_model_req
=
ExecuteModelRequest
(
seq_group_metadata_list
=
seq_group_metadata_list
,
blocks_to_swap_in
=
scheduler_outputs
.
blocks_to_swap_in
,
blocks_to_swap_out
=
scheduler_outputs
.
blocks_to_swap_out
,
blocks_to_copy
=
scheduler_outputs
.
blocks_to_copy
,
num_lookahead_slots
=
scheduler_outputs
.
num_lookahead_slots
,
running_queue_size
=
scheduler_outputs
.
running_queue_size
,
finished_requests_ids
=
finished_requests_ids
,
# We use ExecuteModelRequest to pass the last sampled_token_ids
# to each of the non-last PP stages for in-place prepare_input.
last_sampled_token_ids
=
last_sampled_token_ids
,
last_outputs_ids
=
last_outputs_ids
,
last_outputs_sample
=
last_outputs_tensor
)
# if allow_async_output_proc:
# execute_model_req.async_callback = self.async_callbacks[
# virtual_engine]
#profile.ProfRangeAutoPush('model_executor')
outputs
=
self
.
model_executor
.
execute_model
(
execute_model_req
=
execute_model_req
)
try
:
while
True
:
self
.
sem_m2s
.
acquire
()
if
not
self
.
thread_running
:
break
self
.
_advance_to_next_step
(
outputs
[
0
],
seq_group_metadata_list
,
scheduler_outputs
.
scheduled_seq_groups
)
self
.
last_record
=
[
outputs
,
seq_group_metadata_list
,
scheduler_outputs
]
virtual_engine
=
0
# Clear outputs for each new scheduler iteration
# Schedule iteration
(
seq_group_metadata_list
,
scheduler_outputs
,
allow_async_output_proc
)
=
self
.
scheduler
[
virtual_engine
].
schedule
()
last_outputs_ids
=
None
last_outputs_tensor
=
None
if
self
.
last_record
is
not
None
:
last_output
=
self
.
last_record
[
0
][
0
]
last_outputs_ids
,
last_outputs_tensor
=
last_output
.
sampler_out_ids
,
last_output
.
sampler_out_tenosr
self
.
async_d2h
=
last_outputs_tensor
.
to
(
'cpu'
,
non_blocking
=
True
)
self
.
async_event
.
record
()
self
.
q_recorder
.
put
(
self
.
last_record
)
finished_requests_ids
=
self
.
scheduler
[
virtual_engine
].
get_and_reset_finished_requests_ids
()
assert
seq_group_metadata_list
is
not
None
assert
scheduler_outputs
is
not
None
last_sampled_token_ids
=
\
self
.
_get_last_sampled_token_ids
(
virtual_engine
)
execute_model_req
=
ExecuteModelRequest
(
seq_group_metadata_list
=
seq_group_metadata_list
,
blocks_to_swap_in
=
scheduler_outputs
.
blocks_to_swap_in
,
blocks_to_swap_out
=
scheduler_outputs
.
blocks_to_swap_out
,
blocks_to_copy
=
scheduler_outputs
.
blocks_to_copy
,
num_lookahead_slots
=
scheduler_outputs
.
num_lookahead_slots
,
running_queue_size
=
scheduler_outputs
.
running_queue_size
,
finished_requests_ids
=
finished_requests_ids
,
# We use ExecuteModelRequest to pass the last sampled_token_ids
# to each of the non-last PP stages for in-place prepare_input.
last_sampled_token_ids
=
last_sampled_token_ids
,
last_outputs_ids
=
last_outputs_ids
,
last_outputs_sample
=
last_outputs_tensor
)
outputs
=
self
.
model_executor
.
execute_model
(
execute_model_req
=
execute_model_req
)
if
len
(
outputs
)
==
1
:
self
.
_advance_to_next_step
(
outputs
[
0
],
seq_group_metadata_list
,
scheduler_outputs
.
scheduled_seq_groups
)
self
.
last_record
=
[
outputs
,
seq_group_metadata_list
,
scheduler_outputs
]
except
Exception
as
e
:
print
(
f
"thread_zero_overhead error :
{
e
}
"
)
traceback
.
print_exc
()
def
zero_overhead_step
(
self
)
->
List
[
Union
[
RequestOutput
,
PoolingRequestOutput
]]:
if
not
self
.
thread_running
:
...
...
@@ -1398,6 +1395,7 @@ class LLMEngine:
return
None
virtual_engine
=
0
ctx
=
self
.
scheduler_contexts
[
virtual_engine
]
ctx
.
request_outputs
.
clear
()
outputs
,
seq_group_metadata_list
,
scheduler_outputs
=
recode_output
ctx
.
seq_group_metadata_list
=
seq_group_metadata_list
ctx
.
scheduler_outputs
=
scheduler_outputs
...
...
@@ -1424,12 +1422,6 @@ class LLMEngine:
#if not allow_async_output_proc:
self
.
_process_model_outputs
(
ctx
=
ctx
)
# Log stats.
self
.
do_log_stats
(
scheduler_outputs
,
outputs
)
# Tracing
self
.
do_tracing
(
scheduler_outputs
)
#profile.ProfRangeAutoPush('has_unfinish')
if
not
self
.
has_unfinished_requests
():
# Drain async postprocessor (if exists)
...
...
vllm/model_executor/layers/sampler.py
View file @
20e9c3eb
...
...
@@ -75,7 +75,7 @@ class SampleResultArgsType:
class
SampleDeviceToDevices
:
def
__init__
(
self
):
self
.
seq_id
:
torch
.
Tensor
=
None
self
.
random_samples
:
torch
.
Tensor
=
None
self
.
sampled_token_ids_tensor
:
torch
.
Tensor
=
None
self
.
zero_overhead
:
bool
=
False
d2d_data
=
SampleDeviceToDevices
()
...
...
@@ -312,7 +312,6 @@ class Sampler(nn.Module):
probs
=
torch
.
softmax
(
logits
,
dim
=-
1
,
dtype
=
torch
.
float
)
# Compute the log probabilities.
logprobs
=
torch
.
log_softmax
(
logits
,
dim
=-
1
,
dtype
=
torch
.
float
)
# Sample the next tokens.
maybe_deferred_sample_results
,
maybe_sampled_tokens_tensor
=
_sample
(
probs
,
...
...
@@ -725,10 +724,8 @@ def get_pythonized_sample_results(
continue
(
seq_group_id
,
seq_groups
)
=
sample_metadata
[
sampling_type
]
if
sampling_type
==
SamplingType
.
GREEDY
:
d2d_data
.
random_samples
=
greedy_samples
sample_results
=
_greedy_sample
(
seq_groups
,
greedy_samples
)
elif
sampling_type
in
(
SamplingType
.
RANDOM
,
SamplingType
.
RANDOM_SEED
):
d2d_data
.
random_samples
=
multinomial_samples
[
sampling_type
]
#记录random_samples的数据
sample_results
=
_random_sample
(
seq_groups
,
multinomial_samples
[
sampling_type
])
elif
sampling_type
==
SamplingType
.
BEAM
:
...
...
@@ -766,14 +763,13 @@ def _sample_with_torch(
t
:
[]
for
t
in
SamplingType
}
d2d_data
.
seq_id
=
torch
.
zeros
(
len
(
sampling_metadata
.
seq_groups
))
d2d_data
.
seq_id
=
torch
.
zeros
(
len
(
sampling_metadata
.
seq_groups
)
,
dtype
=
torch
.
int32
)
categorized_sample_indices
=
sampling_metadata
.
categorized_sample_indices
for
i
,
seq_group
in
enumerate
(
sampling_metadata
.
seq_groups
):
d2d_data
.
seq_id
[
i
]
=
seq_group
.
seq_ids
[
0
]
sampling_params
=
seq_group
.
sampling_params
sampling_type
=
sampling_params
.
sampling_type
categorized_seq_group_ids
[
sampling_type
].
append
(
i
)
sample_results_dict
:
SampleResultsDictType
=
{}
sample_metadata
:
SampleMetadataType
=
{}
multinomial_samples
:
MultinomialSamplesType
=
{}
...
...
@@ -804,6 +800,9 @@ def _sample_with_torch(
if
sampling_type
==
SamplingType
.
GREEDY
:
greedy_samples
=
torch
.
argmax
(
logprobs
[
long_sample_indices
],
dim
=-
1
)
if
d2d_data
.
zero_overhead
:
d2d_data
.
sampled_token_ids_tensor
=
greedy_samples
.
unsqueeze
(
-
1
)
if
sampled_token_ids_tensor
is
not
None
:
# Store sampled tokens in output tensor.
...
...
@@ -841,6 +840,10 @@ def _sample_with_torch(
probs
[
long_sample_indices
],
max_n_in_batch
,
seq_groups
=
seq_groups_arg
)
if
d2d_data
.
zero_overhead
:
d2d_data
.
sampled_token_ids_tensor
=
\
multinomial_samples
[
sampling_type
].
to
(
torch
.
long
)
if
sampled_token_ids_tensor
is
not
None
:
# Store sampled tokens in output tensor.
...
...
@@ -1306,7 +1309,7 @@ def _build_sampler_output(
logprobs
=
logprobs_tensor
,
deferred_sample_results_args
=
deferred_sample_results_args
,
logits
=
logits
,
sampler_out_tenosr
=
d2d_data
.
random_samples
,
sampler_out_tenosr
=
d2d_data
.
sampled_token_ids_tensor
,
sampler_out_ids
=
d2d_data
.
seq_id
)
...
...
vllm/spec_decode/target_model_runner.py
View file @
20e9c3eb
# SPDX-License-Identifier: Apache-2.0
from
typing
import
List
,
Optional
import
torch
from
vllm.sequence
import
SequenceGroupMetadata
from
vllm.worker.model_runner_base
import
(
ModelRunnerBase
,
ModelRunnerInputBase
,
...
...
@@ -31,10 +31,12 @@ class TargetModelRunner(ModelRunnerWrapperBase):
seq_group_metadata_list
:
List
[
SequenceGroupMetadata
],
virtual_engine
:
int
=
0
,
finished_requests_ids
:
Optional
[
List
[
str
]]
=
None
,
last_outputs_ids
:
torch
.
Tensor
=
None
,
last_output_sample
:
torch
.
Tensor
=
None
,
)
->
ModelRunnerInputBase
:
model_input
:
ModelRunnerInputBase
=
\
self
.
model_runner
.
prepare_model_input
(
seq_group_metadata_list
,
virtual_engine
,
finished_requests_ids
)
seq_group_metadata_list
,
virtual_engine
,
finished_requests_ids
,
last_outputs_ids
,
last_output_sample
)
# If token log probabilities is disabled then skip generating sampler
# CPU output. We directly serialize the GPU sampled_token_id tensors
# as needed. If log probabilities is enabled then synchronize all the
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment