"csrc/vscode:/vscode.git/clone" did not exist on "bd363067da87c46e57f41bbf7f36df8045a62e31"
Commit 3a49f3d2 authored by guanyu1's avatar guanyu1
Browse files

detok修改

parent 052059d9
......@@ -1244,8 +1244,255 @@ class LLMEngine:
return None
def fix_process_model_output(self,
ctx_output_queue,
ctx_request_outputs,
ctx_multi_step_stream_outputs,
request_id: Optional[str] = None) -> None:
now = time.time()
if len(ctx_output_queue) == 0:
return None
# Get pending async postprocessor
if request_id:
# When we process only one request, no pop is required
# (since later we will process all of the rest)
(outputs, seq_group_metadata_list, scheduler_outputs, is_async,
is_last_step, is_first_step_output, skip) = ctx_output_queue[0]
else:
(outputs, seq_group_metadata_list, scheduler_outputs, is_async,
is_last_step, is_first_step_output,
skip) = ctx_output_queue.popleft()
# Sanity check
assert len(seq_group_metadata_list) == len(
scheduler_outputs.scheduled_seq_groups)
has_multiple_outputs: bool = len(outputs) > 1
outputs_by_sequence_group: List[List[SequenceGroupOutput]]
if has_multiple_outputs:
assert self.scheduler_config.is_multi_step or \
self.speculative_config
# Organize outputs by [step][sequence group] instead of
# [sequence group][step].
if self.scheduler_config.is_multi_step:
outputs_by_sequence_group = create_output_by_sequence_group(
outputs, len(seq_group_metadata_list))
elif self.speculative_config:
# Decodes are multi-steps while prefills are not, outputting at
# most 1 token. Separate them so that we can trigger chunk
# processing without having to pad or copy over prompts K times
# to match decodes structure (costly with prompt_logprobs).
num_prefills = sum(sg.is_prompt
for sg in seq_group_metadata_list)
prefills, decodes = outputs[:num_prefills], outputs[
num_prefills:]
outputs_by_sequence_group = create_output_by_sequence_group(
decodes,
num_seq_groups=len(seq_group_metadata_list) - num_prefills)
outputs_by_sequence_group = [p.outputs for p in prefills
] + outputs_by_sequence_group
# We have outputs for multiple steps submitted in a single burst,
# so invalidate is_first_step_output.
is_first_step_output = None
elif len(outputs) == 1:
outputs_by_sequence_group = outputs
else:
return None
# Determine the requests we need to operate on
if request_id:
indices = []
for i, seq_group_meta in enumerate(seq_group_metadata_list):
if seq_group_meta.request_id == request_id:
assert i not in skip # Cannot be called twice
indices.append(i)
break
# If the request_id was not found, then it means that
# this is a new request that has no pending async
# postprocessor
if not indices:
return
else:
indices = range(len(seq_group_metadata_list)) # type: ignore
finished_before: List[int] = []
finished_now: List[int] = []
empty_seq_indices: List[int] = []
for i in indices:
if i in skip:
continue
seq_group_meta = seq_group_metadata_list[i]
scheduled_seq_group = scheduler_outputs.scheduled_seq_groups[i]
seq_group: SequenceGroup = scheduled_seq_group.seq_group
if seq_group.is_finished():
finished_before.append(i)
continue
output: List[SequenceGroupOutput]
if has_multiple_outputs:
output = outputs_by_sequence_group[i]
else:
output = [outputs_by_sequence_group[0][i]]
# tree style speculative decoding may generate empty output in first step
if self.tree_decoding and outputs and isinstance(output[0], CompletionSequenceGroupOutput):
samples = [o.samples[0] for o in output]
valid_samples = [
sample for sample in samples
if sample.output_token != VLLM_INVALID_TOKEN_ID
]
if len(valid_samples) == 0:
empty_seq_indices.append(i)
continue
if not is_async:
#print("hello")
if self.scheduler_config.is_multi_step:
# Updates happen only if the sequence is prefill
self._update_num_computed_tokens_for_multi_step_prefill(
seq_group, seq_group_meta, is_first_step_output)
else:
seq_group.update_num_computed_tokens(
seq_group_meta.token_chunk_size or 0)
if outputs:
for o in outputs:
if (isinstance(o, SamplerOutput)
and seq_group.metrics is not None):
if seq_group.metrics.model_forward_time is not None:
seq_group.metrics.model_forward_time += (
o.model_forward_time or 0)
else:
seq_group.metrics.model_forward_time = (
o.model_forward_time)
if seq_group.metrics.model_execute_time is not None:
seq_group.metrics.model_execute_time += (
o.model_execute_time or 0)
else:
seq_group.metrics.model_execute_time = (
o.model_execute_time)
if self.model_config.runner_type == "pooling":
self._process_sequence_group_outputs(seq_group, output)
else:
self.output_processor.process_prompt_logprob(seq_group, output)
if seq_group_meta.do_sample:
self.output_processor.process_outputs(
seq_group, output, is_async)
if seq_group.is_finished():
finished_now.append(i)
# Generate outputs for the requests that finished this iteration
for i in finished_now:
scheduled_seq_group = scheduler_outputs.scheduled_seq_groups[i]
seq_group = scheduled_seq_group.seq_group
seq_group.maybe_set_first_token_time(now)
if not seq_group.is_prefill():
seq_group.set_last_token_time(now)
request_output = RequestOutputFactory.create(
seq_group,
self.seq_id_to_seq_group,
use_cache=self.use_cached_outputs)
if request_output:
ctx_request_outputs.append(request_output)
# When we process a single request, we skip it for the next time,
# and invoke the request output callback (if there was final output)
if request_id:
assert len(indices) == 1
skip.append(indices[0])
if (finished_now
and self.process_request_outputs_callback is not None):
self.process_request_outputs_callback(ctx_request_outputs)
ctx_request_outputs.clear()
return
# Free currently finished requests
if finished_now:
for scheduler in self.scheduler:
scheduler.free_finished_seq_groups()
# For multi-step without streaming, don't create outputs each iteration
if not is_last_step and not ctx_multi_step_stream_outputs:
# Immediately process request outputs here (if callback is given)
if (finished_now
and self.process_request_outputs_callback is not None):
self.process_request_outputs_callback(ctx_request_outputs)
ctx_request_outputs.clear()
return
# Create the outputs
for i in indices:
if i in skip or i in finished_before or i in finished_now or i in empty_seq_indices:
continue # Avoids double processing
scheduled_seq_group = scheduler_outputs.scheduled_seq_groups[i]
seq_group = scheduled_seq_group.seq_group
seq_group.maybe_set_first_token_time(now)
if not seq_group.is_prefill():
seq_group.set_last_token_time(now)
request_output = RequestOutputFactory.create(
seq_group,
self.seq_id_to_seq_group,
use_cache=self.use_cached_outputs)
if request_output:
ctx_request_outputs.append(request_output)
# For multi-step with streaming, create outputs each iteration
if not is_last_step and ctx_multi_step_stream_outputs:
# Immediately process request outputs here (if callback is given)
if self.process_request_outputs_callback is not None:
self.process_request_outputs_callback(ctx_request_outputs)
ctx_request_outputs.clear()
return
for seq_group in scheduler_outputs.ignored_seq_groups:
params = seq_group.sampling_params
if params is not None and params.output_kind == (
RequestOutputKind.DELTA) and not seq_group.is_finished():
continue
request_output = RequestOutputFactory.create(
seq_group,
self.seq_id_to_seq_group,
use_cache=self.use_cached_outputs,
)
if request_output:
ctx_request_outputs.append(request_output)
# Immediately process request outputs here (if callback is given)
if (ctx_request_outputs
and self.process_request_outputs_callback is not None):
self.process_request_outputs_callback(ctx_request_outputs)
ctx_request_outputs.clear()
# For async case, we need to record the stats here.
# For non-async case, the stats are done in the
# LLMEngine/AsyncLLMEngine directly
if is_async:
# Log stats.
self.do_log_stats(scheduler_outputs, outputs, finished_before,
skip)
# Tracing
self.do_tracing(scheduler_outputs, finished_before)
return None
def _fix_last_step(
self, output: List[SamplerOutput],
self, ctx_output_queue,ctx_request_outputs,
ctx_multi_step_stream_outputs,output: List[SamplerOutput],
seq_group_metadata_list: List[SequenceGroupMetadata],
scheduled_seq_groups: List[ScheduledSequenceGroup]) -> None:
......@@ -1388,6 +1635,9 @@ class LLMEngine:
ctx.scheduler_outputs = scheduler_outputs
self.async_event.synchronize()
self._fix_last_step(
ctx.output_queue,
ctx.request_outputs,
ctx.multi_step_stream_outputs,
outputs, seq_group_metadata_list,
scheduler_outputs.scheduled_seq_groups)
......@@ -1568,7 +1818,8 @@ class LLMEngine:
# to each of the non-last PP stages for in-place prepare_input.
last_sampled_token_ids=last_sampled_token_ids)
if allow_async_output_proc:
execute_model_req.async_callback = self.async_callbacks[
if not self.zero_overhead:
execute_model_req.async_callback = self.async_callbacks[
virtual_engine]
#profile.ProfRangeAutoPush('model_executor')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment