Unverified Commit 71683ca6 authored by Woosuk Kwon's avatar Woosuk Kwon Committed by GitHub
Browse files

[V0 Deprecation] Remove multi-step scheduling (#22138)


Signed-off-by: default avatarWoosuk Kwon <woosuk.kwon@berkeley.edu>
Signed-off-by: default avatarWoosuk Kwon <woosuk@thinkingmachines.ai>
parent e1885929
...@@ -3779,8 +3779,6 @@ class VllmConfig: ...@@ -3779,8 +3779,6 @@ class VllmConfig:
f"observability_config={self.observability_config!r}, " f"observability_config={self.observability_config!r}, "
f"seed={self.model_config.seed}, " f"seed={self.model_config.seed}, "
f"served_model_name={self.model_config.served_model_name}, " f"served_model_name={self.model_config.served_model_name}, "
f"num_scheduler_steps={self.scheduler_config.num_scheduler_steps}, "
f"multi_step_stream_outputs={self.scheduler_config.multi_step_stream_outputs}, " # noqa
f"enable_prefix_caching={self.cache_config.enable_prefix_caching}, " f"enable_prefix_caching={self.cache_config.enable_prefix_caching}, "
f"chunked_prefill_enabled={self.scheduler_config.chunked_prefill_enabled}, " # noqa f"chunked_prefill_enabled={self.scheduler_config.chunked_prefill_enabled}, " # noqa
f"use_async_output_proc={self.model_config.use_async_output_proc}, " f"use_async_output_proc={self.model_config.use_async_output_proc}, "
......
...@@ -929,8 +929,7 @@ class Scheduler: ...@@ -929,8 +929,7 @@ class Scheduler:
) )
def _get_prompt_limit(self, seq_group: SequenceGroup) -> int: def _get_prompt_limit(self, seq_group: SequenceGroup) -> int:
if (self.scheduler_config.chunked_prefill_enabled if self.scheduler_config.chunked_prefill_enabled:
and not self.scheduler_config.is_multi_step):
prompt_limit = self.scheduler_config.max_model_len prompt_limit = self.scheduler_config.max_model_len
else: else:
prompt_limit = min( prompt_limit = min(
...@@ -1114,9 +1113,6 @@ class Scheduler: ...@@ -1114,9 +1113,6 @@ class Scheduler:
continue continue
num_lookahead_slots: int = 0 num_lookahead_slots: int = 0
if self.scheduler_config.is_multi_step and enable_chunking:
num_lookahead_slots = self._get_num_lookahead_slots(
True, enable_chunking)
# If the sequence group cannot be allocated, stop. # If the sequence group cannot be allocated, stop.
can_allocate = self.block_manager.can_allocate( can_allocate = self.block_manager.can_allocate(
...@@ -1195,24 +1191,6 @@ class Scheduler: ...@@ -1195,24 +1191,6 @@ class Scheduler:
partial_prefill_metadata.maybe_increment_partial_prefills( partial_prefill_metadata.maybe_increment_partial_prefills(
seq_group) seq_group)
if enable_chunking and self.scheduler_config.is_multi_step:
blocks_to_copy: List[Tuple[int, int]] = []
# init_multi_step_from_lookahead_slots happens in append_slots
self._append_slots(seq_group, blocks_to_copy, enable_chunking)
# This assert will trip when a copy-on-write happens. This is
# not a concern as the very first sequence-group block
# allocation happens above. Still, we have the assert to
# catch any edge-cases.
assert not blocks_to_copy
else:
seq_group.init_multi_step_from_lookahead_slots(
num_lookahead_slots,
num_scheduler_steps=self.scheduler_config.
num_scheduler_steps,
is_multi_step=self.scheduler_config.is_multi_step,
enable_chunking=enable_chunking,
)
seq_groups.append( seq_groups.append(
ScheduledSequenceGroup(seq_group=seq_group, ScheduledSequenceGroup(seq_group=seq_group,
token_chunk_size=num_new_tokens)) token_chunk_size=num_new_tokens))
...@@ -1453,14 +1431,6 @@ class Scheduler: ...@@ -1453,14 +1431,6 @@ class Scheduler:
num_prefill_groups = (len(prefills.seq_groups) + num_prefill_groups = (len(prefills.seq_groups) +
len(swapped_in.prefill_seq_groups) + len(swapped_in.prefill_seq_groups) +
len(running_scheduled.prefill_seq_groups)) len(running_scheduled.prefill_seq_groups))
# If all prompts, then we set num_lookahead_slots to 0
# this allows us to go through the `no_spec` path in
# `spec_decode_worker.py`
all_prefills = len(scheduled_seq_groups) == num_prefill_groups
num_lookahead_slots = (0 if
(all_prefills
and not self.scheduler_config.is_multi_step)
else running_scheduled.num_lookahead_slots)
return SchedulerOutputs( return SchedulerOutputs(
scheduled_seq_groups=scheduled_seq_groups, scheduled_seq_groups=scheduled_seq_groups,
num_prefill_groups=num_prefill_groups, num_prefill_groups=num_prefill_groups,
...@@ -1472,7 +1442,7 @@ class Scheduler: ...@@ -1472,7 +1442,7 @@ class Scheduler:
swapped_in.blocks_to_copy, swapped_in.blocks_to_copy,
ignored_seq_groups=prefills.ignored_seq_groups + ignored_seq_groups=prefills.ignored_seq_groups +
swapped_in.infeasible_seq_groups, swapped_in.infeasible_seq_groups,
num_lookahead_slots=num_lookahead_slots, num_lookahead_slots=0,
running_queue_size=len(self.running), running_queue_size=len(self.running),
preempted=(len(running_scheduled.preempted) + preempted=(len(running_scheduled.preempted) +
len(running_scheduled.swapped_out)), len(running_scheduled.swapped_out)),
...@@ -1516,11 +1486,6 @@ class Scheduler: ...@@ -1516,11 +1486,6 @@ class Scheduler:
num_lookahead_slots = self._get_num_lookahead_slots( num_lookahead_slots = self._get_num_lookahead_slots(
is_prefill, enable_chunking) is_prefill, enable_chunking)
if is_prefill and num_lookahead_slots > 0:
# Appending prefill slots only happens multi-step and
# chunked-prefill are enabled together.
assert self.scheduler_config.is_multi_step and enable_chunking
return self.block_manager.can_append_slots( return self.block_manager.can_append_slots(
seq_group=seq_group, num_lookahead_slots=num_lookahead_slots) seq_group=seq_group, num_lookahead_slots=num_lookahead_slots)
...@@ -1776,19 +1741,7 @@ class Scheduler: ...@@ -1776,19 +1741,7 @@ class Scheduler:
num_lookahead_slots: int = self._get_num_lookahead_slots( num_lookahead_slots: int = self._get_num_lookahead_slots(
is_prefill, enable_chunking) is_prefill, enable_chunking)
seq_group.init_multi_step_from_lookahead_slots(
num_lookahead_slots,
num_scheduler_steps=self.scheduler_config.num_scheduler_steps,
is_multi_step=self.scheduler_config.is_multi_step,
enable_chunking=enable_chunking,
)
seq_status: Optional[SequenceStatus] = SequenceStatus.RUNNING seq_status: Optional[SequenceStatus] = SequenceStatus.RUNNING
if self.scheduler_config.is_multi_step and enable_chunking:
# In multi-step chunked-prefill any sequence type can have
# slots appended.
seq_status = None
for seq in seq_group.get_seqs(status=seq_status): for seq in seq_group.get_seqs(status=seq_status):
cows = self.block_manager.append_slots(seq, num_lookahead_slots) cows = self.block_manager.append_slots(seq, num_lookahead_slots)
if len(cows) > 0: if len(cows) > 0:
...@@ -1904,29 +1857,8 @@ class Scheduler: ...@@ -1904,29 +1857,8 @@ class Scheduler:
"""The number of slots to allocate per sequence per step, beyond known """The number of slots to allocate per sequence per step, beyond known
token ids. Speculative decoding uses these slots to store KV activations token ids. Speculative decoding uses these slots to store KV activations
of tokens which may or may not be accepted. of tokens which may or may not be accepted.
Speculative decoding does not yet support prefill, so we do not perform
lookahead allocation for prefill.
When chunking is enabled with multi-step, we allocate lookahead slots
for the prefills for when the prefills turn into decodes in the first
step.
""" """
if is_prefill: return 0
if self.scheduler_config.is_multi_step and enable_chunking:
# num_lookahead_slots was introduced in the context of decodes,
# in Speculative Decoding.
# When the num_scheduler_steps is 8, say, then the
# num_lookahead_slots is 7. Meaning, we are doing a 1-step of
# decode anyways and we wish to do 7 more.
#
# "lookaheads" for prefills, is introduced in support for
# Chunked-Prefill in Multi-Step.
return self.scheduler_config.num_lookahead_slots + 1
else:
return 0
return self.scheduler_config.num_lookahead_slots
def _get_num_new_uncached_and_cached_tokens( def _get_num_new_uncached_and_cached_tokens(
self, self,
...@@ -2068,24 +2000,6 @@ class Scheduler: ...@@ -2068,24 +2000,6 @@ class Scheduler:
The number of new tokens to schedule after chunking. The number of new tokens to schedule after chunking.
""" """
remaining_token_budget = budget.remaining_token_budget() remaining_token_budget = budget.remaining_token_budget()
if scheduler_config.is_multi_step:
# The current multi-step + chunked prefill capability does
# not actually support chunking prompts.
#
# Therefore, `num_new_tokens` is computed in the same fashion
# for both multi-step+chunked-prefill &
# multi-step+chunked-prefill+APC
#
# Prompts with more tokens than the current remaining budget
# are postponed to future scheduler steps
if num_new_tokens > prompt_limit:
# If the seq_group is in prompt-stage, pass the
# num_new_tokens as-is so the caller can ignore
# the sequence.
return num_new_tokens
return 0 if num_new_tokens > \
remaining_token_budget else num_new_tokens
# Get the number of tokens to allocate to this prefill slot # Get the number of tokens to allocate to this prefill slot
prefill_slot_budget = ( prefill_slot_budget = (
......
...@@ -362,8 +362,6 @@ class EngineArgs: ...@@ -362,8 +362,6 @@ class EngineArgs:
lora_dtype: Optional[Union[str, torch.dtype]] = LoRAConfig.lora_dtype lora_dtype: Optional[Union[str, torch.dtype]] = LoRAConfig.lora_dtype
lora_extra_vocab_size: int = LoRAConfig.lora_extra_vocab_size lora_extra_vocab_size: int = LoRAConfig.lora_extra_vocab_size
num_scheduler_steps: int = SchedulerConfig.num_scheduler_steps
multi_step_stream_outputs: bool = SchedulerConfig.multi_step_stream_outputs
ray_workers_use_nsight: bool = ParallelConfig.ray_workers_use_nsight ray_workers_use_nsight: bool = ParallelConfig.ray_workers_use_nsight
num_gpu_blocks_override: Optional[ num_gpu_blocks_override: Optional[
int] = CacheConfig.num_gpu_blocks_override int] = CacheConfig.num_gpu_blocks_override
...@@ -799,11 +797,8 @@ class EngineArgs: ...@@ -799,11 +797,8 @@ class EngineArgs:
**scheduler_kwargs["delay_factor"]) **scheduler_kwargs["delay_factor"])
scheduler_group.add_argument("--preemption-mode", scheduler_group.add_argument("--preemption-mode",
**scheduler_kwargs["preemption_mode"]) **scheduler_kwargs["preemption_mode"])
scheduler_group.add_argument("--num-scheduler-steps", # multi-step scheduling has been removed; corresponding arguments
**scheduler_kwargs["num_scheduler_steps"]) # are no longer supported.
scheduler_group.add_argument(
"--multi-step-stream-outputs",
**scheduler_kwargs["multi_step_stream_outputs"])
scheduler_group.add_argument("--scheduling-policy", scheduler_group.add_argument("--scheduling-policy",
**scheduler_kwargs["policy"]) **scheduler_kwargs["policy"])
scheduler_group.add_argument( scheduler_group.add_argument(
...@@ -1257,28 +1252,11 @@ class EngineArgs: ...@@ -1257,28 +1252,11 @@ class EngineArgs:
disable_log_stats=self.disable_log_stats, disable_log_stats=self.disable_log_stats,
) )
# Reminder: Please update docs/features/compatibility_matrix.md # make sure num_lookahead_slots is set appropriately depending on
# If the feature combo become valid # whether speculative decoding is enabled
if self.num_scheduler_steps > 1: num_lookahead_slots = self.num_lookahead_slots
if speculative_config is not None: if speculative_config is not None:
raise ValueError("Speculative decoding is not supported with " num_lookahead_slots = speculative_config.num_lookahead_slots
"multi-step (--num-scheduler-steps > 1)")
if self.enable_chunked_prefill and self.pipeline_parallel_size > 1:
raise ValueError("Multi-Step Chunked-Prefill is not supported "
"for pipeline-parallel-size > 1")
if current_platform.is_cpu():
logger.warning("Multi-Step (--num-scheduler-steps > 1) is "
"currently not supported for CPUs and has been "
"disabled.")
self.num_scheduler_steps = 1
# make sure num_lookahead_slots is set the higher value depending on
# if we are using speculative decoding or multi-step
num_lookahead_slots = max(self.num_lookahead_slots,
self.num_scheduler_steps - 1)
num_lookahead_slots = num_lookahead_slots \
if speculative_config is None \
else speculative_config.num_lookahead_slots
scheduler_config = SchedulerConfig( scheduler_config = SchedulerConfig(
runner_type=model_config.runner_type, runner_type=model_config.runner_type,
...@@ -1292,8 +1270,6 @@ class EngineArgs: ...@@ -1292,8 +1270,6 @@ class EngineArgs:
disable_chunked_mm_input=self.disable_chunked_mm_input, disable_chunked_mm_input=self.disable_chunked_mm_input,
is_multimodal_model=model_config.is_multimodal_model, is_multimodal_model=model_config.is_multimodal_model,
preemption_mode=self.preemption_mode, preemption_mode=self.preemption_mode,
num_scheduler_steps=self.num_scheduler_steps,
multi_step_stream_outputs=self.multi_step_stream_outputs,
send_delta_data=(envs.VLLM_USE_RAY_SPMD_WORKER send_delta_data=(envs.VLLM_USE_RAY_SPMD_WORKER
and parallel_config.use_ray), and parallel_config.use_ray),
policy=self.scheduling_policy, policy=self.scheduling_policy,
...@@ -1392,11 +1368,6 @@ class EngineArgs: ...@@ -1392,11 +1368,6 @@ class EngineArgs:
recommend_to_remove=True) recommend_to_remove=True)
return False return False
if self.num_scheduler_steps != SchedulerConfig.num_scheduler_steps:
_raise_or_fallback(feature_name="--num-scheduler-steps",
recommend_to_remove=True)
return False
if self.scheduler_delay_factor != SchedulerConfig.delay_factor: if self.scheduler_delay_factor != SchedulerConfig.delay_factor:
_raise_or_fallback(feature_name="--scheduler-delay-factor", _raise_or_fallback(feature_name="--scheduler-delay-factor",
recommend_to_remove=True) recommend_to_remove=True)
......
...@@ -15,7 +15,7 @@ from vllm.config import (DecodingConfig, LoRAConfig, ModelConfig, ...@@ -15,7 +15,7 @@ from vllm.config import (DecodingConfig, LoRAConfig, ModelConfig,
from vllm.core.scheduler import SchedulerOutputs from vllm.core.scheduler import SchedulerOutputs
from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_timeout import asyncio_timeout from vllm.engine.async_timeout import asyncio_timeout
from vllm.engine.llm_engine import LLMEngine, SchedulerOutputState from vllm.engine.llm_engine import LLMEngine
from vllm.engine.metrics_types import StatLoggerBase from vllm.engine.metrics_types import StatLoggerBase
from vllm.engine.protocol import EngineClient from vllm.engine.protocol import EngineClient
from vllm.executor.executor_base import ExecutorBase from vllm.executor.executor_base import ExecutorBase
...@@ -308,13 +308,6 @@ class _AsyncLLMEngine(LLMEngine): ...@@ -308,13 +308,6 @@ class _AsyncLLMEngine(LLMEngine):
if not allow_async_output_proc and len(ctx.output_queue) > 0: if not allow_async_output_proc and len(ctx.output_queue) > 0:
self._process_model_outputs(ctx=ctx) self._process_model_outputs(ctx=ctx)
if (self.scheduler_config.is_multi_step
and scheduler_outputs.num_lookahead_slots > 0):
# cache the scheduler outputs for the next iteration if we have
# lookahead slots
self._cache_scheduler_outputs_for_multi_step(
virtual_engine, seq_group_metadata_list, scheduler_outputs,
allow_async_output_proc)
else: else:
finished_requests_ids = list() finished_requests_ids = list()
...@@ -351,29 +344,14 @@ class _AsyncLLMEngine(LLMEngine): ...@@ -351,29 +344,14 @@ class _AsyncLLMEngine(LLMEngine):
outputs = await self.model_executor.execute_model_async( outputs = await self.model_executor.execute_model_async(
execute_model_req) execute_model_req)
# we need to do this here so that last step's sampled_token_ids can
# be passed to the next iteration for PP.
if self.scheduler_config.is_multi_step:
self._update_cached_scheduler_output(virtual_engine, outputs)
else: else:
if len(ctx.output_queue) > 0: if len(ctx.output_queue) > 0:
self._process_model_outputs(ctx=ctx) self._process_model_outputs(ctx=ctx)
outputs = [] outputs = []
# Finish the current step for all the sequence groups.
if self.scheduler_config.is_multi_step:
for seq_group in seq_group_metadata_list:
seq_group.finish_step()
if not self._has_remaining_steps(seq_group_metadata_list): if not self._has_remaining_steps(seq_group_metadata_list):
# Clear the cache if we have finished all the steps
if self.scheduler_config.is_multi_step:
self.cached_scheduler_outputs[
virtual_engine] = SchedulerOutputState()
# is_first_step_output is True only when the num_steps of all # is_first_step_output is True only when the num_steps of all
# the sequences are 1. When the num_steps > 1, # the sequences are 1.
# multi_step_model_runner does the first-step output append.
is_first_step_output: bool = False if not seq_group_metadata_list \ is_first_step_output: bool = False if not seq_group_metadata_list \
else seq_group_metadata_list[0].state.num_steps == 1 else seq_group_metadata_list[0].state.num_steps == 1
......
...@@ -25,7 +25,6 @@ from vllm.engine.metrics_types import StatLoggerBase, Stats ...@@ -25,7 +25,6 @@ from vllm.engine.metrics_types import StatLoggerBase, Stats
from vllm.engine.output_processor.interfaces import ( from vllm.engine.output_processor.interfaces import (
SequenceGroupOutputProcessor) SequenceGroupOutputProcessor)
from vllm.engine.output_processor.stop_checker import StopChecker from vllm.engine.output_processor.stop_checker import StopChecker
from vllm.engine.output_processor.util import create_output_by_sequence_group
from vllm.entrypoints.openai.logits_processors import ( from vllm.entrypoints.openai.logits_processors import (
get_logits_processors as get_openai_logits_processors) get_logits_processors as get_openai_logits_processors)
from vllm.executor.executor_base import ExecutorBase from vllm.executor.executor_base import ExecutorBase
...@@ -91,7 +90,7 @@ class OutputData(NamedTuple): ...@@ -91,7 +90,7 @@ class OutputData(NamedTuple):
class SchedulerContext: class SchedulerContext:
def __init__(self, multi_step_stream_outputs: bool = False): def __init__(self) -> None:
self.output_queue: Deque[OutputData] = deque() self.output_queue: Deque[OutputData] = deque()
self.request_outputs: List[Union[RequestOutput, self.request_outputs: List[Union[RequestOutput,
PoolingRequestOutput]] = [] PoolingRequestOutput]] = []
...@@ -99,8 +98,6 @@ class SchedulerContext: ...@@ -99,8 +98,6 @@ class SchedulerContext:
List[SequenceGroupMetadata]] = None List[SequenceGroupMetadata]] = None
self.scheduler_outputs: Optional[SchedulerOutputs] = None self.scheduler_outputs: Optional[SchedulerOutputs] = None
self.multi_step_stream_outputs: bool = multi_step_stream_outputs
def append_output(self, outputs: List[SamplerOutput], def append_output(self, outputs: List[SamplerOutput],
seq_group_metadata_list: List[SequenceGroupMetadata], seq_group_metadata_list: List[SequenceGroupMetadata],
scheduler_outputs: SchedulerOutputs, is_async: bool, scheduler_outputs: SchedulerOutputs, is_async: bool,
...@@ -303,8 +300,7 @@ class LLMEngine: ...@@ -303,8 +300,7 @@ class LLMEngine:
] ]
self.scheduler_contexts = [ self.scheduler_contexts = [
SchedulerContext(multi_step_stream_outputs=self.scheduler_config. SchedulerContext()
multi_step_stream_outputs)
for _ in range(self.parallel_config.pipeline_parallel_size) for _ in range(self.parallel_config.pipeline_parallel_size)
] ]
...@@ -683,8 +679,7 @@ class LLMEngine: ...@@ -683,8 +679,7 @@ class LLMEngine:
"Priority scheduling is not enabled.") "Priority scheduling is not enabled.")
if isinstance(params, SamplingParams) \ if isinstance(params, SamplingParams) \
and params.logits_processors \ and params.logits_processors:
and self.scheduler_config.num_scheduler_steps > 1:
raise ValueError( raise ValueError(
"Logits processors are not supported in multi-step decoding") "Logits processors are not supported in multi-step decoding")
...@@ -868,45 +863,6 @@ class LLMEngine: ...@@ -868,45 +863,6 @@ class LLMEngine:
return return
def _update_num_computed_tokens_for_multi_step_prefill(
self, seq_group: SequenceGroup,
seq_group_meta: SequenceGroupMetadata,
is_first_step_output: Optional[bool]):
"""
This function updates num_computed_tokens for prompt sequences
when Multi-Step is enabled.
seq_group: SequenceGroup to update the num_computed_tokens for.
seq_group_meta: Metadata of the given SequenceGroup.
is_first_step_output: Optional[bool] -
When available, is_first_step_output indicates if the appended
output token is the output of the first-step in multi-step.
A value of None indicates that outputs from all steps in
in multi-step are submitted in a single burst.
"""
assert self.scheduler_config.is_multi_step
if not seq_group_meta.is_prompt:
# num_computed_token updates for multi-step decodes happen after
# the tokens are appended to the sequence.
return
do_update: bool = False
if self.scheduler_config.chunked_prefill_enabled:
# In multi-step + chunked-prefill case, the prompt sequences
# that are scheduled are fully processed in the first step.
do_update = is_first_step_output is None or is_first_step_output
else:
# Normal multi-step decoding case. In this case prompt-sequences
# are actually single-stepped. Always update in this case.
assert seq_group.state.num_steps == 1
do_update = True
if do_update:
seq_group.update_num_computed_tokens(
seq_group_meta.token_chunk_size)
def _process_model_outputs(self, def _process_model_outputs(self,
ctx: SchedulerContext, ctx: SchedulerContext,
request_id: Optional[str] = None) -> None: request_id: Optional[str] = None) -> None:
...@@ -939,33 +895,8 @@ class LLMEngine: ...@@ -939,33 +895,8 @@ class LLMEngine:
has_multiple_outputs: bool = len(outputs) > 1 has_multiple_outputs: bool = len(outputs) > 1
outputs_by_sequence_group: List[List[SequenceGroupOutput]] outputs_by_sequence_group: List[List[SequenceGroupOutput]]
if has_multiple_outputs: assert not has_multiple_outputs
assert self.scheduler_config.is_multi_step or \ outputs_by_sequence_group = outputs
self.speculative_config
# Organize outputs by [step][sequence group] instead of
# [sequence group][step].
if self.scheduler_config.is_multi_step:
outputs_by_sequence_group = create_output_by_sequence_group(
outputs, len(seq_group_metadata_list))
elif self.speculative_config:
# Decodes are multi-steps while prefills are not, outputting at
# most 1 token. Separate them so that we can trigger chunk
# processing without having to pad or copy over prompts K times
# to match decodes structure (costly with prompt_logprobs).
num_prefills = sum(sg.is_prompt
for sg in seq_group_metadata_list)
prefills, decodes = outputs[:num_prefills], outputs[
num_prefills:]
outputs_by_sequence_group = create_output_by_sequence_group(
decodes,
num_seq_groups=len(seq_group_metadata_list) - num_prefills)
outputs_by_sequence_group = [p.outputs for p in prefills
] + outputs_by_sequence_group
# We have outputs for multiple steps submitted in a single burst,
# so invalidate is_first_step_output.
is_first_step_output = None
else:
outputs_by_sequence_group = outputs
# Determine the requests we need to operate on # Determine the requests we need to operate on
if request_id: if request_id:
...@@ -1006,13 +937,8 @@ class LLMEngine: ...@@ -1006,13 +937,8 @@ class LLMEngine:
output = [outputs_by_sequence_group[0][i]] output = [outputs_by_sequence_group[0][i]]
if not is_async: if not is_async:
if self.scheduler_config.is_multi_step: seq_group.update_num_computed_tokens(
# Updates happen only if the sequence is prefill seq_group_meta.token_chunk_size or 0)
self._update_num_computed_tokens_for_multi_step_prefill(
seq_group, seq_group_meta, is_first_step_output)
else:
seq_group.update_num_computed_tokens(
seq_group_meta.token_chunk_size or 0)
if outputs: if outputs:
for o in outputs: for o in outputs:
...@@ -1074,15 +1000,6 @@ class LLMEngine: ...@@ -1074,15 +1000,6 @@ class LLMEngine:
for scheduler in self.scheduler: for scheduler in self.scheduler:
scheduler.free_finished_seq_groups() scheduler.free_finished_seq_groups()
# For multi-step without streaming, don't create outputs each iteration
if not is_last_step and not ctx.multi_step_stream_outputs:
# Immediately process request outputs here (if callback is given)
if (finished_now
and self.process_request_outputs_callback is not None):
self.process_request_outputs_callback(ctx.request_outputs)
ctx.request_outputs.clear()
return
# Create the outputs # Create the outputs
for i in indices: for i in indices:
if i in skip or i in finished_before or i in finished_now: if i in skip or i in finished_before or i in finished_now:
...@@ -1101,13 +1018,7 @@ class LLMEngine: ...@@ -1101,13 +1018,7 @@ class LLMEngine:
if request_output: if request_output:
ctx.request_outputs.append(request_output) ctx.request_outputs.append(request_output)
# For multi-step with streaming, create outputs each iteration # Create outputs only after processing the scheduler's results
if not is_last_step and ctx.multi_step_stream_outputs:
# Immediately process request outputs here (if callback is given)
if self.process_request_outputs_callback is not None:
self.process_request_outputs_callback(ctx.request_outputs)
ctx.request_outputs.clear()
return
for seq_group in scheduler_outputs.ignored_seq_groups: for seq_group in scheduler_outputs.ignored_seq_groups:
params = seq_group.sampling_params params = seq_group.sampling_params
...@@ -1157,16 +1068,10 @@ class LLMEngine: ...@@ -1157,16 +1068,10 @@ class LLMEngine:
if seq_group.is_finished(): if seq_group.is_finished():
continue continue
if self.scheduler_config.is_multi_step: token_chunk_size = (seq_group_metadata.token_chunk_size
# Updates happen only if the sequence is prefill if seq_group_metadata.token_chunk_size
self._update_num_computed_tokens_for_multi_step_prefill( is not None else 0)
seq_group, seq_group_metadata, seq_group.update_num_computed_tokens(token_chunk_size)
seq_group.state.num_steps == 1)
else:
token_chunk_size = (seq_group_metadata.token_chunk_size
if seq_group_metadata.token_chunk_size
is not None else 0)
seq_group.update_num_computed_tokens(token_chunk_size)
if seq_group_metadata.do_sample: if seq_group_metadata.do_sample:
assert len(sequence_group_outputs.samples) == 1, ( assert len(sequence_group_outputs.samples) == 1, (
...@@ -1177,16 +1082,8 @@ class LLMEngine: ...@@ -1177,16 +1082,8 @@ class LLMEngine:
assert len(seq_group.seqs) == 1 assert len(seq_group.seqs) == 1
seq = seq_group.seqs[0] seq = seq_group.seqs[0]
if self.scheduler_config.is_multi_step: seq.append_token_id(sample.output_token, sample.logprobs,
is_prefill_append = seq.data.get_num_uncomputed_tokens( sample.output_embed)
) == 0
seq.append_token_id(sample.output_token, sample.logprobs,
sample.output_embed)
if not is_prefill_append:
seq_group.update_num_computed_tokens(1)
else:
seq.append_token_id(sample.output_token, sample.logprobs,
sample.output_embed)
def step(self) -> List[Union[RequestOutput, PoolingRequestOutput]]: def step(self) -> List[Union[RequestOutput, PoolingRequestOutput]]:
"""Performs one decoding iteration and returns newly generated results. """Performs one decoding iteration and returns newly generated results.
...@@ -1289,13 +1186,6 @@ class LLMEngine: ...@@ -1289,13 +1186,6 @@ class LLMEngine:
if not allow_async_output_proc and len(ctx.output_queue) > 0: if not allow_async_output_proc and len(ctx.output_queue) > 0:
self._process_model_outputs(ctx=ctx) self._process_model_outputs(ctx=ctx)
if (self.scheduler_config.is_multi_step
and scheduler_outputs.num_lookahead_slots > 0):
# cache the scheduler outputs for the next iteration if we have
# lookahead slots
self._cache_scheduler_outputs_for_multi_step(
virtual_engine, seq_group_metadata_list, scheduler_outputs,
allow_async_output_proc)
else: else:
finished_requests_ids = list() finished_requests_ids = list()
...@@ -1345,10 +1235,6 @@ class LLMEngine: ...@@ -1345,10 +1235,6 @@ class LLMEngine:
# Raise so the caller is notified that this request failed # Raise so the caller is notified that this request failed
raise raise
# We need to do this here so that last step's sampled_token_ids can
# be passed to the next iteration for PP.
if self.scheduler_config.is_multi_step:
self._update_cached_scheduler_output(virtual_engine, outputs)
else: else:
# Nothing scheduled => If there is pending async postprocessor, # Nothing scheduled => If there is pending async postprocessor,
# then finish it here. # then finish it here.
...@@ -1357,19 +1243,9 @@ class LLMEngine: ...@@ -1357,19 +1243,9 @@ class LLMEngine:
# No outputs in this case # No outputs in this case
outputs = [] outputs = []
# Finish the current step for all the sequence groups.
if self.scheduler_config.is_multi_step:
for seq_group in seq_group_metadata_list:
seq_group.finish_step()
if not self._has_remaining_steps(seq_group_metadata_list): if not self._has_remaining_steps(seq_group_metadata_list):
# clear the cache if we have finished all the steps.
if self.scheduler_config.is_multi_step:
self.cached_scheduler_outputs[0] = SchedulerOutputState()
# is_first_step_output is True only when the num_steps of all # is_first_step_output is True only when the num_steps of all
# the sequences are 1. When the num_steps > 1, # the sequences are 1.
# multi_step_model_runner does the first-step output append.
is_first_step_output: bool = False if not seq_group_metadata_list \ is_first_step_output: bool = False if not seq_group_metadata_list \
else seq_group_metadata_list[0].state.num_steps == 1 else seq_group_metadata_list[0].state.num_steps == 1
...@@ -1453,22 +1329,7 @@ class LLMEngine: ...@@ -1453,22 +1329,7 @@ class LLMEngine:
def _has_remaining_steps( def _has_remaining_steps(
self, seq_group_metadata_list: Optional[List[SequenceGroupMetadata]] self, seq_group_metadata_list: Optional[List[SequenceGroupMetadata]]
) -> bool: ) -> bool:
if (not self.scheduler_config.is_multi_step return False
or not seq_group_metadata_list):
return False
# TODO(will) this is a sanity check for nowto make sure that all the
# seqs are on the same steps. Eventually we will want to do some sort of
# dynamic scheduling when doing multi-step decoding.
ref_remaining_steps = seq_group_metadata_list[0].state.remaining_steps
if any([
seq_group.state.remaining_steps != ref_remaining_steps
for seq_group in seq_group_metadata_list[1:]
]):
raise AssertionError("All running sequence groups should "
"have the same remaining steps.")
return ref_remaining_steps > 0
def _cache_scheduler_outputs_for_multi_step( def _cache_scheduler_outputs_for_multi_step(
self, virtual_engine: int, self, virtual_engine: int,
...@@ -1497,13 +1358,6 @@ class LLMEngine: ...@@ -1497,13 +1358,6 @@ class LLMEngine:
def _get_last_sampled_token_ids( def _get_last_sampled_token_ids(
self, virtual_engine: int) -> Optional[torch.Tensor]: self, virtual_engine: int) -> Optional[torch.Tensor]:
cached_last_output = self.cached_scheduler_outputs[
virtual_engine].last_output
if (self.scheduler_config.is_multi_step
and self.parallel_config.pipeline_parallel_size > 1
and cached_last_output is not None
and cached_last_output.sampled_token_ids_cpu is not None):
return cached_last_output.sampled_token_ids_cpu
return None return None
def add_logger(self, logger_name: str, logger: StatLoggerBase) -> None: def add_logger(self, logger_name: str, logger: StatLoggerBase) -> None:
......
...@@ -36,27 +36,13 @@ class SequenceGroupOutputProcessor(ABC): ...@@ -36,27 +36,13 @@ class SequenceGroupOutputProcessor(ABC):
): ):
"""Create an output processor. """Create an output processor.
This returns a single-step output processor if num_lookahead_slots is Multi-step scheduling is no longer supported. Always return a
zero, else returns a multi-step output processor. single-step output processor.
""" """
if scheduler_config.num_lookahead_slots == 0: from vllm.engine.output_processor.single_step import (
# Importing here to avoid cycle. SingleStepOutputProcessor)
from vllm.engine.output_processor.single_step import ( return SingleStepOutputProcessor(scheduler_config, detokenizer,
SingleStepOutputProcessor) scheduler, seq_counter, stop_checker)
return SingleStepOutputProcessor(scheduler_config, detokenizer,
scheduler, seq_counter,
stop_checker)
else:
# Importing here to avoid cycle.
from vllm.engine.output_processor.multi_step import (
MultiStepOutputProcessor)
return MultiStepOutputProcessor(
detokenizer,
scheduler,
seq_counter,
get_tokenizer_for_seq,
stop_checker,
)
@abstractmethod @abstractmethod
def process_outputs(self, sequence_group: SequenceGroup, def process_outputs(self, sequence_group: SequenceGroup,
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import functools
from typing import Callable, List, cast
from vllm.core.scheduler import Scheduler
from vllm.engine.output_processor.interfaces import (
SequenceGroupOutputProcessor)
from vllm.engine.output_processor.single_step import (
single_step_process_prompt_logprob)
from vllm.engine.output_processor.stop_checker import StopChecker
from vllm.logger import init_logger
from vllm.sampling_params import SamplingParams
from vllm.sequence import (VLLM_INVALID_TOKEN_ID,
CompletionSequenceGroupOutput, Sequence,
SequenceGroup, SequenceGroupOutput, SequenceOutput,
SequenceStatus)
from vllm.transformers_utils.detokenizer import Detokenizer
from vllm.transformers_utils.tokenizer import AnyTokenizer
from vllm.utils import Counter
logger = init_logger(__name__)
class MultiStepOutputProcessor(SequenceGroupOutputProcessor):
"""SequenceGroupOutputProcessor which handles logic related to
detokenization and stopping conditions. It specializes to "multi-step
decoding", where vLLM's worker may generate multiple tokens per invocation.
This is currently mutually exclusive with advanced sampling techniques like
beam search, which motivates the separation of this logic from the single
step output processor.
This class is responsible for things such as correctly appending all new
token ids to their sequence, detokenizing new token ids, truncating new
output tokens after an eos token, and correctly handling the case where the
number of new output tokens per sequence differs in a single batch.
"""
def __init__(
self,
detokenizer: Detokenizer,
scheduler: List[Scheduler],
seq_counter: Counter,
get_tokenizer_for_seq: Callable[[Sequence], AnyTokenizer],
stop_checker: StopChecker,
):
self.detokenizer = detokenizer
self.scheduler = scheduler
self.seq_counter = seq_counter
self.get_tokenizer_for_seq = get_tokenizer_for_seq
self.stop_checker = stop_checker
def process_prompt_logprob(self, seq_group: SequenceGroup,
outputs: List[SequenceGroupOutput]) -> None:
"""Process prompt logprobs associated with each step of a multi-step-
scheduled computation.
Args:
seq_group: the outputs are associated with this
[`SequenceGroup`][vllm.sequence.SequenceGroup]
outputs: the
[`SequenceGroupOutput`][vllm.sequence.SequenceGroupOutput]s
for all scheduler steps
"""
for output in outputs:
# Concatenate single-step prompt logprob processing results.
assert isinstance(output, CompletionSequenceGroupOutput)
single_step_process_prompt_logprob(self, seq_group, output)
@staticmethod
@functools.lru_cache
def _log_prompt_logprob_unsupported_warning_once():
# Reminder: Please update docs/features/compatibility_matrix.md
# If the feature combo become valid
logger.warning(
"Prompt logprob is not supported by multi step workers. "
"(e.g., speculative decode uses multi step workers).")
def process_outputs(self,
sequence_group: SequenceGroup,
outputs: List[SequenceGroupOutput],
is_async: bool = False) -> None:
"""Append new tokens in the outputs to sequences in the sequence group.
This only supports sequence groups of size 1. It supports greater than
one new token per sequence.
This applies logic like stop condition checking and detokenization.
It also handles cases where there are tokens emitted after
the EOS token.
is_async - Indicates whether this postprocessor runs in
parallel with the GPU forward pass and is processing
tokens from the previous step. If this is true, then
no tokens need to be appended since it is already done
externally (before the next schedule() call)
"""
# Sequences can be in RUNNING or FINISHED_ABORTED state
# once scheduled, as a sequence is moved to FINISHED_ABORTED
# if a client disconnects from the api server.
seqs = sequence_group.get_seqs(status=SequenceStatus.RUNNING)
if seqs is None:
seqs = sequence_group.get_seqs(
status=SequenceStatus.FINISHED_ABORTED)
assert seqs, "Expected RUNNING or FINISHED_ABORTED sequences"
assert len(seqs) == 1, (
"Beam search not supported in multi-step decoding.")
seq = seqs[0]
seq_id = seq.seq_id
# This method is defined in the more generic
# SequenceGroupOutputProcessor, but here we assume that the outputs are
# of a more specific type.
assert all([
isinstance(output, CompletionSequenceGroupOutput)
for output in outputs
])
compl_outputs = cast(List[CompletionSequenceGroupOutput], outputs)
assert all([
seq_id == output.samples[0].parent_seq_id
for output in compl_outputs
])
if is_async:
# Async case: We process tokens one by one. Here, we know the token
# was already appended, so we only need to do the rest of the
# postprocessor: Detokenization + stopping logic
self._process_decode_and_stop(seq, sequence_group.sampling_params)
else:
# Standard multi-step case
# Since there's only one sequence per sequence group,
# we can take the first sample.
samples = [output.samples[0] for output in compl_outputs]
# entries in sample tokens may be invalid (eg. due to spec decode
# rejecting tokens).
valid_samples = [
sample for sample in samples
if sample.output_token != VLLM_INVALID_TOKEN_ID
]
# When both spec-decode and pre-fill chunking are enabled, we
# don't have guaranteed samples here (e.g. all -1s).
if valid_samples:
self._process_seq_outputs(seq, valid_samples,
sequence_group.sampling_params)
def _process_decode_and_stop(self, seq: Sequence,
sampling_params: SamplingParams) -> None:
new_char_count = 0
if sampling_params.detokenize and self.detokenizer:
new_char_count = self.detokenizer.decode_sequence_inplace(
seq, sampling_params)
# TODO(sang): Support lora.
self.stop_checker.maybe_stop_sequence(
seq,
new_char_count=new_char_count,
sampling_params=sampling_params,
)
def _process_seq_outputs(self, seq: Sequence,
valid_samples: List[SequenceOutput],
sampling_params: SamplingParams) -> None:
output_token_ids = [sample.output_token for sample in valid_samples]
output_logprobs = [sample.logprobs for sample in valid_samples]
output_embeds = [sample.output_embed for sample in valid_samples]
# Truncate to max_tokens if necessary.
remaining_tokens = sampling_params.max_tokens - (seq.get_output_len() +
len(output_token_ids))
if remaining_tokens < 0:
output_token_ids = output_token_ids[:remaining_tokens]
# Truncate any tokens after EOS. This is required as spec decode
# generates a fixed number of tokens without evaluating stopping
# conditions within the block. This can cause an eos token to be
# unintentionally ignored.
if not sampling_params.ignore_eos and self.detokenizer:
eos_token_id = self.get_tokenizer_for_seq(seq).eos_token_id
# Avoiding .index calls as exception throwing in the happy path
# is expensive.
for i in range(len(output_token_ids)):
if output_token_ids[i] == eos_token_id:
output_token_ids = output_token_ids[:i + 1]
break
is_prefill_sampled_token = seq.data.get_num_uncomputed_tokens() == 0
# Incrementally append tokens to the sequence, as if we had only one new
# token.
for output_token_id, output_logprob, output_embed in zip(
output_token_ids, output_logprobs, output_embeds):
seq.append_token_id(
token_id=output_token_id,
logprobs=output_logprob,
token_embed=output_embed,
)
if is_prefill_sampled_token:
is_prefill_sampled_token = False
else:
# Update num_computed_tokens iff the sampled token is not from
# a prefill step.
seq.data.update_num_computed_tokens(1)
self._process_decode_and_stop(seq, sampling_params)
if seq.is_finished():
break
...@@ -118,20 +118,10 @@ class CudaPlatformBase(Platform): ...@@ -118,20 +118,10 @@ class CudaPlatformBase(Platform):
@classmethod @classmethod
def check_and_update_config(cls, vllm_config: "VllmConfig") -> None: def check_and_update_config(cls, vllm_config: "VllmConfig") -> None:
parallel_config = vllm_config.parallel_config parallel_config = vllm_config.parallel_config
scheduler_config = vllm_config.scheduler_config
model_config = vllm_config.model_config model_config = vllm_config.model_config
if parallel_config.worker_cls == "auto": if parallel_config.worker_cls == "auto":
if scheduler_config.is_multi_step: if vllm_config.speculative_config:
if envs.VLLM_USE_V1:
raise NotImplementedError(
"Multi-step scheduling is not supported (and not "
"needed) on vLLM V1. Please launch without "
"--num-scheduler-steps.")
else:
parallel_config.worker_cls = \
"vllm.worker.multi_step_worker.MultiStepWorker"
elif vllm_config.speculative_config:
if not envs.VLLM_USE_V1: if not envs.VLLM_USE_V1:
raise NotImplementedError( raise NotImplementedError(
"Speculative decoding is not supported on vLLM V0.") "Speculative decoding is not supported on vLLM V0.")
...@@ -139,7 +129,7 @@ class CudaPlatformBase(Platform): ...@@ -139,7 +129,7 @@ class CudaPlatformBase(Platform):
else: else:
if envs.VLLM_USE_V1: if envs.VLLM_USE_V1:
parallel_config.worker_cls = \ parallel_config.worker_cls = \
"vllm.v1.worker.gpu_worker.Worker" "vllm.v1.worker.gpu_worker.Worker"
else: else:
parallel_config.worker_cls = "vllm.worker.worker.Worker" parallel_config.worker_cls = "vllm.worker.worker.Worker"
......
...@@ -327,18 +327,8 @@ class RocmPlatform(Platform): ...@@ -327,18 +327,8 @@ class RocmPlatform(Platform):
cache_config.block_size = 16 cache_config.block_size = 16
parallel_config = vllm_config.parallel_config parallel_config = vllm_config.parallel_config
scheduler_config = vllm_config.scheduler_config
if parallel_config.worker_cls == "auto": if parallel_config.worker_cls == "auto":
if scheduler_config.is_multi_step: if vllm_config.speculative_config:
if envs.VLLM_USE_V1:
raise NotImplementedError(
"Multi-step scheduling is not supported (and not "
"needed) on vLLM V1. Please launch without "
"--num-scheduler-steps.")
else:
parallel_config.worker_cls = \
"vllm.worker.multi_step_worker.MultiStepWorker"
elif vllm_config.speculative_config:
if not envs.VLLM_USE_V1: if not envs.VLLM_USE_V1:
raise NotImplementedError( raise NotImplementedError(
"Speculative decoding is not supported on vLLM V0.") "Speculative decoding is not supported on vLLM V0.")
...@@ -346,7 +336,7 @@ class RocmPlatform(Platform): ...@@ -346,7 +336,7 @@ class RocmPlatform(Platform):
else: else:
if envs.VLLM_USE_V1: if envs.VLLM_USE_V1:
parallel_config.worker_cls = \ parallel_config.worker_cls = \
"vllm.v1.worker.gpu_worker.Worker" "vllm.v1.worker.gpu_worker.Worker"
else: else:
parallel_config.worker_cls = "vllm.worker.worker.Worker" parallel_config.worker_cls = "vllm.worker.worker.Worker"
......
...@@ -133,18 +133,13 @@ class TpuPlatform(Platform): ...@@ -133,18 +133,13 @@ class TpuPlatform(Platform):
parallel_config = vllm_config.parallel_config parallel_config = vllm_config.parallel_config
scheduler_config = vllm_config.scheduler_config scheduler_config = vllm_config.scheduler_config
if parallel_config.worker_cls == "auto": if parallel_config.worker_cls == "auto":
if scheduler_config.is_multi_step:
raise NotImplementedError(
"Multi-step scheduling is not supported (and not "
"needed) on vLLM V1. Please launch without "
"--num-scheduler-steps.")
parallel_config.worker_cls = "vllm.v1.worker.tpu_worker.TPUWorker" parallel_config.worker_cls = "vllm.v1.worker.tpu_worker.TPUWorker"
assert not vllm_config.speculative_config, ( assert not vllm_config.speculative_config, (
"Speculative decoding is not yet supported for TPU backend") "Speculative decoding is not yet supported for TPU backend")
if scheduler_config.is_multimodal_model and not \ if scheduler_config.is_multimodal_model and not \
scheduler_config.disable_chunked_mm_input: scheduler_config.disable_chunked_mm_input:
logger.warning("TPU does not support running Multimodal models"\ logger.warning("TPU does not support running Multimodal models"\
" without setting `--disable_chunked_mm_input`. " \ " without setting `--disable_chunked_mm_input`. " \
"Forcing --disable_chunked_mm_input.") "Forcing --disable_chunked_mm_input.")
......
...@@ -794,35 +794,6 @@ class SequenceGroup: ...@@ -794,35 +794,6 @@ class SequenceGroup:
def lora_int_id(self) -> int: def lora_int_id(self) -> int:
return self.lora_request.lora_int_id if self.lora_request else 0 return self.lora_request.lora_int_id if self.lora_request else 0
def init_multi_step(self, num_steps: int) -> None:
self.state.num_steps = num_steps
self.state.current_step = 0
def init_multi_step_from_lookahead_slots(self, num_lookahead_slots: int,
num_scheduler_steps: int,
is_multi_step: bool,
enable_chunking: bool) -> None:
if not is_multi_step:
self.init_multi_step(num_steps=num_scheduler_steps)
return
# Multi-Step case
is_prefill = self.is_prefill()
# The asserts below reflect the expectations of the current system.
if is_prefill and enable_chunking:
assert num_lookahead_slots == num_scheduler_steps
self.init_multi_step(num_steps=num_lookahead_slots)
else:
is_decode: bool = not is_prefill
# If it is a prefill, num_lookahead_slots must be 0
assert num_lookahead_slots == 0 or is_decode
# If it is a decode, num_lookahead_slots + 1 must match
# the scheduler steps.
assert num_lookahead_slots + 1 == num_scheduler_steps or is_prefill
self.init_multi_step(num_steps=num_lookahead_slots + 1)
def set_last_token_time(self, now: float) -> None: def set_last_token_time(self, now: float) -> None:
"""Sets the last token time for Request level timings.""" """Sets the last token time for Request level timings."""
# If still in prefill phase, assertion fails. # If still in prefill phase, assertion fails.
...@@ -1367,15 +1338,6 @@ class ExecuteModelRequest( ...@@ -1367,15 +1338,6 @@ class ExecuteModelRequest(
# Async callback # Async callback
async_callback: Optional[Callable] = None async_callback: Optional[Callable] = None
@property
def is_first_multi_step(self) -> bool:
# TODO(will) make this be able to handle batches with variable number of
# steps
assert len(self.seq_group_metadata_list) > 0
first_seq_group = self.seq_group_metadata_list[0]
assert first_seq_group.state is not None
return first_seq_group.state.current_step == 0
@property @property
def is_last_step(self) -> bool: def is_last_step(self) -> bool:
# TODO(will) make this be able to handle batches with variable number of # TODO(will) make this be able to handle batches with variable number of
......
...@@ -508,8 +508,7 @@ class ModelInputForGPUBuilder(ModelRunnerInputBuilderBase[ModelInputForGPU]): ...@@ -508,8 +508,7 @@ class ModelInputForGPUBuilder(ModelRunnerInputBuilderBase[ModelInputForGPU]):
if inter_data.is_prompt: if inter_data.is_prompt:
context_len = seq_data.get_num_computed_tokens() context_len = seq_data.get_num_computed_tokens()
seq_len = min(seq_len, context_len + token_chunk_size) seq_len = min(seq_len, context_len + token_chunk_size)
elif self.runner.scheduler_config.is_multi_step or \ elif self.runner.model_config.is_encoder_decoder:
self.runner.model_config.is_encoder_decoder:
context_len = seq_len - 1 context_len = seq_len - 1
else: else:
context_len = seq_data.get_num_computed_tokens() context_len = seq_data.get_num_computed_tokens()
...@@ -778,9 +777,7 @@ class ModelInputForGPUBuilder(ModelRunnerInputBuilderBase[ModelInputForGPU]): ...@@ -778,9 +777,7 @@ class ModelInputForGPUBuilder(ModelRunnerInputBuilderBase[ModelInputForGPU]):
int: Returns the determined number of padding sequences. If int: Returns the determined number of padding sequences. If
CUDA graphs is not viable, returns -1. CUDA graphs is not viable, returns -1.
""" """
is_mscp: bool = self.runner.scheduler_config.is_multi_step and \ decode_only = self.decode_only
self.runner.scheduler_config.chunked_prefill_enabled
decode_only = self.decode_only or is_mscp
if not decode_only: if not decode_only:
# Early exit so we can treat num_seqs as the batch_size below. # Early exit so we can treat num_seqs as the batch_size below.
return -1 return -1
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import dataclasses
import functools
from dataclasses import dataclass, field
from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple,
Union)
import torch
from vllm.distributed import get_pp_group
from vllm.logger import init_logger
from vllm.model_executor.layers.sampler import (PromptLogprobs, SampleLogprobs,
SamplerOutput,
SamplingMetadata, get_logprobs,
get_pythonized_sample_results)
from vllm.platforms import current_platform
from vllm.sequence import (CompletionSequenceGroupOutput, IntermediateTensors,
Logprob, SequenceGroupMetadata, SequenceOutput)
from vllm.utils import PyObjectCache, async_tensor_h2d, current_stream
from vllm.worker.model_runner import (GPUModelRunnerBase,
ModelInputForGPUWithSamplingMetadata)
from vllm.worker.model_runner_base import (
BroadcastableModelInput, _init_attn_metadata_from_tensor_dict,
_init_frozen_model_input_from_tensor_dict,
_init_sampling_metadata_from_tensor_dict)
from ..model_executor.model_loader.tensorizer import TensorizerConfig
if TYPE_CHECKING:
from vllm.attention.backends.abstract import AttentionBackend
logger = init_logger(__name__)
MULTI_STEP_ATTENTION_BACKENDS = [
"FLASH_ATTN", "ROCM_FLASH", "FLASHINFER", "NO_ATTENTION"
]
MULTI_STEP_CHUNKED_PREFILL_ATTENTION_BACKENDS = ["FLASH_ATTN", "FLASHINFER"]
def _get_supported_attention_backends(chunked_prefill_enabled: bool) \
-> List[str]:
if chunked_prefill_enabled:
return MULTI_STEP_CHUNKED_PREFILL_ATTENTION_BACKENDS
else:
return MULTI_STEP_ATTENTION_BACKENDS
def seq_output_builder():
return SequenceOutput(
0, 0,
{0: Logprob(logprob=float('inf'), rank=None, decoded_token=None)})
def completion_seq_group_output_builder():
return CompletionSequenceGroupOutput([], None)
# Used by pythonization to reduce python object allocations
class PythonizationCache:
def __init__(self):
self.cached_seq_output = PyObjectCache(seq_output_builder)
self.cached_completion_seq_group_output = PyObjectCache(
completion_seq_group_output_builder)
def reset(self):
self.cached_seq_output.reset()
self.cached_completion_seq_group_output.reset()
@dataclass
class ModelOutput:
"""The output of a single model forward pass.
The sampler_output_ready_event is set when the tensors in
sampler_output are ready (the model+sampler forward pass has
completed). We use the event to synchronize the GPU->CPU transfer,
which we want to only run when the data has been written to the
GPU tensors. Until the event is ready, the tensors in sampler_output
will have garbage data.
There are two scenarios:
1. The output tensors are ready and we can pythonize them immediately.
2. The output tensors are not ready and we need to wait for the event to be
ready.
"""
sampler_output: SamplerOutput
sampler_output_ready_event: torch.cuda.Event
sampled_token_ids: Optional[torch.Tensor] = None
pythonized: bool = False
# On-device tensor containing the logprobs of each token.
logprobs: Optional["torch.Tensor"] = None
pythonization_cache: Optional[PythonizationCache] = None
def pythonize(self, input_metadata: "StatefulModelInput",
copy_stream: torch.cuda.Stream,
pinned_sampled_token_buffer: torch.Tensor) -> None:
"""Pythonize the output. Blocking."""
if not self.pythonized:
self._pythonize_sampler_output(input_metadata, copy_stream,
pinned_sampled_token_buffer, True)
self.pythonized = True
def maybe_pythonize(self, input_metadata: "StatefulModelInput",
copy_stream: torch.cuda.Stream,
pinned_sampled_token_buffer: torch.Tensor) -> None:
"""Pythonize the output if ready, else return None. Non-blocking."""
if not self.pythonized:
self.pythonized = self._pythonize_sampler_output(
input_metadata, copy_stream, pinned_sampled_token_buffer,
False)
def _pythonize_sampler_output(self, input_metadata: "StatefulModelInput",
copy_stream: torch.cuda.Stream,
pinned_sampled_token_buffer: torch.Tensor,
blocking: bool) -> bool:
"""
If blocking is set, will block until the forward pass for the output is
ready and pythonize the output. Upon completing Pythonization, erases
self.logprobs (note that a non-blocking call that is performed when
the sampler output is not yet ready, will not erase self.logprobs.)
"""
assert self.sampled_token_ids is not None
if not blocking and not self.sampler_output_ready_event.query():
return False
if blocking:
self.sampler_output_ready_event.synchronize()
with torch.cuda.stream(copy_stream):
_pythonize_sampler_output(input_metadata, self.sampler_output,
pinned_sampled_token_buffer,
self.sampled_token_ids, self.logprobs,
self.pythonization_cache)
# Erase the logprobs GPU-side tensor.
# Note that although _pythonize_sampler_output() runs in its
# own CUDA stream, nonetheless _pythonize_sampler_output()
# cannot return until Pythonization is complete; therefore
# we know that by the time the CPU reaches this point,
# `self.logprobs` is no longer needed.
self.logprobs = None
return True
@dataclass(frozen=False)
class StatefulModelInput(BroadcastableModelInput):
# actual frozen model input dataclass passed to _base_model_runner
frozen_model_input: Optional[ModelInputForGPUWithSamplingMetadata] = None
# list of model outputs for each step, may not be all pythonized
cached_outputs: List[ModelOutput] = field(default_factory=list)
# used to pass sampled token ids from the last step to the current step for
# TP workers. Used to append to end of outputs and used by advance_step
last_sampled_token_ids: Optional[torch.Tensor] = None
current_step: int = 0
is_multi_step: bool = True
is_last_step: bool = False
is_first_multi_step: bool = False
base_output_proc_callback: Optional[Callable] = None
# ping-pong data structures for multi-step to wait on the previous step
step_cuda_events: List[current_platform.Event] = field(
default_factory=lambda: [current_platform.Event(blocking=True)] * 2)
num_seqs: int = -1
num_queries: int = -1
num_single_step_prefills: int = 0
def as_broadcastable_tensor_dict(self) -> Dict[str, Any]:
assert self.frozen_model_input is not None
tensor_dict = self.frozen_model_input.as_broadcastable_tensor_dict()
new_tensor_dict = {
'last_sampled_token_ids': self.last_sampled_token_ids,
'current_step': self.current_step,
'is_multi_step': self.is_multi_step,
'is_last_step': self.is_last_step,
'is_first_multi_step': self.is_first_multi_step,
'num_seqs': self.num_seqs,
'num_queries': self.num_queries,
'num_single_step_prefills': self.num_single_step_prefills,
}
tensor_dict.update(new_tensor_dict)
return tensor_dict
@classmethod
def from_broadcasted_tensor_dict(
cls,
tensor_dict: Dict[str, Any],
attn_backend: Optional["AttentionBackend"] = None,
) -> "StatefulModelInput":
tensor_dict = _init_sampling_metadata_from_tensor_dict(tensor_dict)
if attn_backend is not None:
tensor_dict = _init_attn_metadata_from_tensor_dict(
attn_backend, tensor_dict)
tensor_dict = _init_frozen_model_input_from_tensor_dict(
ModelInputForGPUWithSamplingMetadata, tensor_dict)
return cls(**tensor_dict)
def record_step_event(self, current_stream: torch.cuda.Stream):
# record the event for the current step so that the next step can sync
# on it. We modulo by 2 to keep the events in a circular buffer and
# support any attn backends that may be supported in the future. ie
# Flashinfer would want two DecodeWrappers to overlap the CPU and GPU.
self.step_cuda_events[self.current_step & 1] = \
torch.cuda.Event(blocking=True)
self.step_cuda_events[self.current_step & 1].record(current_stream)
def wait_previous_step(self):
# These cuda events are an explicit synchronization to ensure that
# advance_step() (for other attn backends that may be supported in the
# future) do not clobber any data structures that is also used by any
# enqueued forwards steps. For distributed case, only a single event is
# needed, but for single GPU case, since we can let the CPU run much
# further ahead, two events allow us to overlap the advance_step with
# the previous forward (ie using two DecodeWrappers for flashinfer
# backend)
self.step_cuda_events[(self.current_step + 1) & 1].wait()
def add_sampler_output(self,
sampler_output: SamplerOutput,
sampled_token_ids: Optional[torch.Tensor] = None):
self.cached_outputs.append(
ModelOutput(sampler_output=sampler_output,
sampler_output_ready_event=None,
sampled_token_ids=sampled_token_ids,
pythonized=False))
def maybe_advance_sampling_metadata(self, device: str, pin_memory: bool):
"""
sampling_metadata.selected_token_indices is constructed for the
first-step in Multi-Step. However, when chunked-prefill is enabled with
multi-step, the scheduled prompts are fully processed in the
first-step and are processed as decodes in the rest of the steps.
This function updates the sampling_metadata.selected_token_indices
to account for this conversion.
Example:
Let 2 prompts and 2 decodes be scheduled together. Let the
num-tokens to process for the 2 prompts be 5 and 8 respectively.
In that case, sampling_metadata.sampled_token_indices will be,
[4, 12, 13, 14] as it is constructed for the first-step in
multi-step.
However, the prompts turns to decodes after the first-step
and the num-tokens for the previously-prompt sequences will
be 1 and 1 as they are decodes now. The self.sampled_token_indices
must be updated to [0,1,2,3].
"""
assert self.current_step == 1 and self.num_single_step_prefills > 0
if not get_pp_group().is_last_rank:
return
assert self.frozen_model_input is not None
assert self.frozen_model_input.sampling_metadata is not None
self.frozen_model_input.sampling_metadata.selected_token_indices = \
async_tensor_h2d(list(range(self.num_queries)),
dtype=torch.long,
target_device=device,
pin_memory=pin_memory)
def maybe_advance_frozen_model_input(self, device: str, pin_memory: bool):
"""
Advancing the datastructures of StatefulModelInput::frozen_model_input
is only required when prefills are scheduled with decodes to run in
multi-step. This advancement/correction is required to account for
the conversion of Prefills to Decodes after the first multi-step.
"""
if self.current_step != 1 or self.num_single_step_prefills == 0:
return
assert self.frozen_model_input is not None
fmi = self.frozen_model_input
# Truncate input_tokens
assert fmi.input_tokens is not None
assert fmi.input_tokens.shape[0] >= self.num_seqs
fmi_new_input_tokens: torch.Tensor = fmi.input_tokens[:self.num_seqs]
# Update frozen_model_input::input_positions.
assert fmi.input_positions is not None
assert fmi.input_positions.shape[0] >= self.num_seqs
fmi_new_input_positions: torch.Tensor = fmi.input_positions[:self.
num_seqs]
# Assert unsupported
assert fmi.lora_mapping is None
assert fmi.lora_requests is not None
assert len(fmi.lora_requests) == 0
assert fmi.attn_metadata is not None
assert fmi.multi_modal_kwargs is not None
assert len(fmi.multi_modal_kwargs) == 0
self.frozen_model_input = dataclasses.replace(
self.frozen_model_input,
input_tokens=fmi_new_input_tokens,
input_positions=fmi_new_input_positions)
self.maybe_advance_sampling_metadata(device, pin_memory)
# MutableModelInputForGPUWithMultiStepMetadata is not subclass of
# ModelInputForGPU but it wraps the actual input dataclass and adds multi-step
# metadata
# mypy: disable-error-code=type-var
class MultiStepModelRunner(GPUModelRunnerBase[StatefulModelInput]):
# mypy: enable-error-code=type-var
def __init__(self, base_model_runner: GPUModelRunnerBase, *args, **kwargs):
super().__init__(*args, **kwargs)
# Check attention backend support.
supported_attention_backends: List[str] = \
_get_supported_attention_backends(
self.scheduler_config.chunked_prefill_enabled)
if self.attn_backend.get_name() not in supported_attention_backends:
ms_config_str: str = "Multi-Step + Chunked-Prefill" \
if self.scheduler_config.chunked_prefill_enabled \
else "Multi-Step"
raise ValueError(
f"{ms_config_str} not supported for attention backend: "
f"{self.attn_backend.get_name()}. Set VLLM_ATTENTION_BACKEND "
f"to a value from {supported_attention_backends}.")
# uses the base model runner to execute the model and wraps it with
# multi-step logic
self._base_model_runner: GPUModelRunnerBase = base_model_runner
self.is_multi_step = self.scheduler_config.is_multi_step
self.pinned_sampled_token_ids: Optional[torch.Tensor] = None
# Using the PythonizationCache in Pipeline-Parallel clobbers the
# SequenceOutput and CompletionSequenceGroupOutput object.
# When cache-reset happens at the last step of a multi-step
# execution, there may be other on-going single-step/multi-step
# executions. The current caching implementation does not check
# for this.
self.pythonization_cache = PythonizationCache() \
if self.parallel_config.pipeline_parallel_size == 1 else None
@functools.cached_property
def _copy_stream(self):
# used to copy tensors from GPU to CPU asynchronously
return torch.cuda.Stream()
def make_model_input_from_broadcasted_tensor_dict(
self, tensor_dict: Dict[str, Any]) -> StatefulModelInput:
model_input = (StatefulModelInput.from_broadcasted_tensor_dict(
tensor_dict,
attn_backend=self.attn_backend,
))
return model_input
def prepare_model_input(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
virtual_engine: int = 0,
finished_requests_ids: Optional[List[str]] = None
) -> StatefulModelInput:
frozen_model_input: ModelInputForGPUWithSamplingMetadata = \
self._base_model_runner.prepare_model_input(
seq_group_metadata_list,
virtual_engine,
finished_requests_ids)
assert frozen_model_input.query_lens is not None
assert frozen_model_input.seq_lens is not None
assert frozen_model_input.attn_metadata is not None
num_queries = len(frozen_model_input.query_lens)
num_seqs = len(frozen_model_input.seq_lens)
num_single_step_prefills = frozen_model_input.attn_metadata.num_prefills
model_input = StatefulModelInput(
frozen_model_input=frozen_model_input,
num_seqs=num_seqs,
num_queries=num_queries,
num_single_step_prefills=num_single_step_prefills)
return model_input
def _async_process_outputs(self, model_input: StatefulModelInput,
output_proc_callback: Callable):
# Proceed with pythonization and output_proc in order.
# Stop on the first one that fails to pythonize
output_proc_callback()
cont = True
for step_num, model_output in enumerate(model_input.cached_outputs):
if not model_output.pythonized:
model_output.maybe_pythonize(model_input, self._copy_stream,
self.pinned_sampled_token_ids)
if model_output.pythonized:
ctx = output_proc_callback.keywords["ctx"]
ctx.append_output(
outputs=[model_output.sampler_output],
seq_group_metadata_list=ctx.seq_group_metadata_list,
scheduler_outputs=ctx.scheduler_outputs,
is_async=False,
is_last_step=False,
is_first_step_output=step_num == 0)
output_proc_callback()
else:
cont = False
if not cont:
break
def _final_process_outputs(
self, model_input: StatefulModelInput,
output_proc_callback: Optional[Callable]) -> List[SamplerOutput]:
assert model_input.frozen_model_input is not None
has_async_callback = output_proc_callback is not None
outputs = []
for step_num, output in enumerate(model_input.cached_outputs):
is_last_step = step_num == len(model_input.cached_outputs) - 1
# For non-async case:
# -- We simply add the outputs
# For async case:
# -- Invoke callback, pythonize, add to callback queue and repeat
# -- For last output, just add to callback queue
if has_async_callback:
assert output_proc_callback is not None
# Invoke callback before pythonize (to overlap with GPU)
output_proc_callback()
# Pythonize
if not output.pythonized:
output.pythonize(model_input, self._copy_stream,
self.pinned_sampled_token_ids)
# For non last step, add to callback queue to chain
# callbacks=>pythonize pairs (for GPU overlap)
if not is_last_step:
ctx = output_proc_callback.keywords[ # type: ignore
"ctx"] # type: ignore
ctx.append_output(
outputs=[output.sampler_output],
seq_group_metadata_list=ctx.
seq_group_metadata_list,
scheduler_outputs=ctx.scheduler_outputs,
is_async=False,
is_last_step=False,
is_first_step_output=step_num == 0)
else:
outputs.append(output.sampler_output)
else:
output.pythonize(model_input, self._copy_stream,
self.pinned_sampled_token_ids)
outputs.append(output.sampler_output)
return outputs
@torch.inference_mode()
def execute_model(
self,
model_input: StatefulModelInput,
kv_caches: List[torch.Tensor],
intermediate_tensors: Optional[IntermediateTensors] = None,
num_steps: int = 1,
) -> Optional[Union[List[SamplerOutput], IntermediateTensors]]:
"""
Execute the model for a single step and update multi-step
metadata
"""
assert num_steps == 1, "MultiStepModelRunner only supports num_steps=1"
frozen_model_input = model_input.frozen_model_input
assert frozen_model_input is not None
# path for warm up runs
if not model_input.is_multi_step:
return self._base_model_runner.execute_model(
frozen_model_input, None, intermediate_tensors, num_steps)
# make sure we skip the sampler on the lask rank and only pythonize
# if CPU is ahead.
if self.is_driver_worker and get_pp_group().is_last_rank:
if self.pinned_sampled_token_ids is None:
self.pinned_sampled_token_ids = torch.zeros(
(self.scheduler_config.max_num_seqs, 1),
dtype=torch.long,
device="cpu",
pin_memory=True)
self._base_model_runner.sampler.include_gpu_probs_tensor = True
if frozen_model_input.sampling_metadata:
frozen_model_input.sampling_metadata.skip_sampler_cpu_output = (
True)
# some pre-execute model logic for multi-step:
# - if it's the first step, we need to reset the sampling tensors
# - if it's not the first step, we need to advance the step using the
# appended sampler output from last iteration
# - also maybe pythonize if CPU is ahead of GPU
stream = current_stream()
if not model_input.is_first_multi_step:
# Explicitly block on the previous step's forward to make sure we
# don't clobber any GPU tensors still in use.
# This is not needed for flashattn backend, but for other attn
# backends such as flashinfer that performs extra CPU operations on
# input metadata we may need to synchronize any CPU operations that
# might clobber enqueued forwards. (prevents CPU from running too
# far ahead if needed)
model_input.wait_previous_step()
model_input = self._advance_step(
model_input, model_input.cached_outputs[-1].sampler_output)
# frozen_model_input may have been updated
frozen_model_input = model_input.frozen_model_input
assert frozen_model_input is not None
if model_input.base_output_proc_callback is None:
assert frozen_model_input is not None
model_input.base_output_proc_callback = \
frozen_model_input.async_callback
if frozen_model_input.async_callback is not None:
assert model_input.base_output_proc_callback is not None
async_callback = functools.partial(
self._async_process_outputs,
model_input=model_input,
output_proc_callback=model_input.base_output_proc_callback)
model_input.frozen_model_input = dataclasses.replace( # type: ignore
model_input.frozen_model_input,
async_callback=async_callback)
# Update the local instance
frozen_model_input = model_input.frozen_model_input
assert frozen_model_input is not None
# Execute the model
output = self._base_model_runner.execute_model(frozen_model_input,
None,
intermediate_tensors,
num_steps=1)
# record the event for the current step so that the next step can sync
model_input.record_step_event(stream)
if get_pp_group().is_last_rank and self.is_driver_worker:
assert isinstance(output, list)
assert len(
output
) == 1, "MultiStepModelRunner requires single-step base_models"
# event for the pythonization so that we only pythonize if the
# tensors are ready. May be able to be combined with the step event
output_ready_event = torch.cuda.Event()
output_ready_event.record(stream)
if self.parallel_config.pipeline_parallel_size > 1:
output[0].sampled_token_ids_cpu = output[
0].sampled_token_ids.cpu()
model_input.cached_outputs.append(
ModelOutput(output[0], output_ready_event,
output[0].sampled_token_ids, False,
output[0].logprobs, self.pythonization_cache))
# These GPU tensors are not required by multi-step;
# erase them to ensure they are not pythonized or
# transferred to CPU
output[0].sampled_token_ids = None
output[0].sampled_token_probs = None
output[0].logprobs = None
# Pythonize the output if CPU is ahead and the previous step is
# ready.
if frozen_model_input.async_callback is None:
for model_output in model_input.cached_outputs:
model_output.maybe_pythonize(model_input,
self._copy_stream,
self.pinned_sampled_token_ids)
model_input.current_step += 1
if not get_pp_group().is_last_rank:
# Should be IntermediateTensors
assert isinstance(output, IntermediateTensors)
return output
if not self.is_driver_worker:
return []
# Pythonize the output and block if needed since it is the last step
if model_input.is_last_step:
outputs = self._final_process_outputs(
model_input, model_input.base_output_proc_callback)
if self.pythonization_cache:
self.pythonization_cache.reset()
return outputs
# should be [SamplerOutput]
return output
def _update_sampling_metadata(self, sampling_metadata: SamplingMetadata,
num_seqs: Optional[int], num_queries: int):
assert sampling_metadata.num_prompts == 0
assert len(sampling_metadata.seq_groups) == num_queries
assert sampling_metadata.selected_token_indices.shape == (
num_queries, )
# assert sampling_metadata.categorized_sample_indices == TODO: Add if needed # noqa: E501
# Verify that all sequences are decodes
for i in range(num_queries):
seq_group = sampling_metadata.seq_groups[i]
assert seq_group.is_prompt is False # No prompt
assert seq_group.prompt_logprob_indices == [] # No prompt
assert seq_group.sample_indices == [i] # Simple
assert seq_group.seq_len is None # Decode
assert seq_group.query_len is None # Decode
def _advance_step(self, model_input: StatefulModelInput,
out: SamplerOutput) -> StatefulModelInput:
model_input.maybe_advance_frozen_model_input(self.device,
self.pin_memory)
frozen_model_input = model_input.frozen_model_input
assert frozen_model_input is not None
assert frozen_model_input.input_tokens is not None
assert frozen_model_input.input_tokens.shape[0] == model_input.num_seqs
assert frozen_model_input.attn_metadata is not None
sampled_token_ids = model_input.cached_outputs[-1].sampled_token_ids
num_seqs = model_input.num_seqs
num_queries = model_input.num_queries
frozen_model_input = model_input.frozen_model_input
assert frozen_model_input is not None
attn_metadata = frozen_model_input.attn_metadata
assert attn_metadata is not None
turn_prefills_into_decodes: bool = model_input.current_step == 1 and \
model_input.num_single_step_prefills != 0
attn_metadata.advance_step(
frozen_model_input,
sampled_token_ids,
self.block_size,
num_seqs,
num_queries,
turn_prefills_into_decodes=turn_prefills_into_decodes)
return model_input
def load_model(self) -> None:
self._base_model_runner.load_model()
self.model_memory_usage = self._base_model_runner.model_memory_usage
def save_sharded_state(
self,
path: str,
pattern: Optional[str] = None,
max_size: Optional[int] = None,
) -> None:
return self._base_model_runner.save_sharded_state(
path, pattern, max_size)
def save_tensorized_model(self,
tensorizer_config: TensorizerConfig) -> None:
return self._base_model_runner.save_tensorized_model(tensorizer_config)
def profile_run(self) -> None:
return self._base_model_runner.profile_run()
def remove_all_loras(self):
return self._base_model_runner.remove_all_loras()
def capture_model(self, kv_caches: List[List]) -> None:
return self._base_model_runner.capture_model(kv_caches)
@property
def vocab_size(self) -> int:
return self._base_model_runner.vocab_size
DeferredLogprobsReturnType = Tuple[Optional[List[Optional[PromptLogprobs]]],
Optional[List[SampleLogprobs]]]
def deferred_pythonize_logprobs(
output: SamplerOutput,
sampling_metadata: SamplingMetadata,
logprobs_tensor: Optional[torch.Tensor],
) -> DeferredLogprobsReturnType:
"""Perform deferred logprob Pythonization.
1. Pythonize GPU-side sampler result tensors into CPU-side sampler result.
2. Pythonize GPU-side logprobs tensor into CPU-side logprobs lists,
utilizing the Pythonized sampler result computed in step 1.
These deferred computations are not required for single-step scheduling
or the `profile_run()` phase of multi-step scheduling.
Args:
output: sampler output (under deferred Pythonization)
sampling_metadata
Returns:
prompt_logprobs (CPU), sample_logprobs (CPU)
"""
# - Deferred pythonization of sample result
sampler_result = get_pythonized_sample_results(
output.deferred_sample_results_args)
# - Erase the GPU-side deferred sample_result
# computation args to ensure it is never
# pythonized or transferred to CPU
output.deferred_sample_results_args = None
# - Deferred pythonization of logprobs
(
prompt_logprobs,
sample_logprobs,
) = get_logprobs(logprobs_tensor, sampling_metadata, sampler_result)
assert len(prompt_logprobs) == len(sampling_metadata.seq_groups)
assert len(sample_logprobs) == len(sampling_metadata.seq_groups)
return prompt_logprobs, sample_logprobs
def _pythonize_sampler_output(
model_input: StatefulModelInput,
output: SamplerOutput,
pinned_sampled_token_buffer: torch.Tensor,
sampled_token_ids: torch.Tensor,
logprobs_tensor: Optional[torch.Tensor],
cache: Optional[PythonizationCache],
) -> None:
""" This function is only called when the output tensors are ready.
See [`ModelOutput`][vllm.worker.multi_step_model_runner.ModelOutput].
Modifies `output.outputs` and `pinned_sampled_token_buffer` in-place,
adding a Pythonized output data structure
([`CompletionSequenceGroupOutput`][vllm.sequence.CompletionSequenceGroupOutput])
for each [`SequenceGroup`][vllm.sequence.SequenceGroup].
Args:
model_input
output: sampler output
pinned_sampled_token_token_buffer: CPU-side pinned memory
(receives copy of
GPU-side token buffer.)
sampled_token_ids: GPU-side token buffer
logprobs_tensor: GPU-side tensor containing
logprobs computed during sampling
"""
assert model_input.frozen_model_input is not None
frozen_model_input = model_input.frozen_model_input
assert frozen_model_input.sampling_metadata is not None
sampling_metadata = frozen_model_input.sampling_metadata
# samples generation should have been skipped
assert not output.outputs
pinned_buffer = pinned_sampled_token_buffer[:model_input.num_queries]
# We guarantee output tensors are ready, so it is safe to
# pythonize the sampler output & obtain CPU-side logprobs.
#
# However we should check whether logprobs pythonization may
# be skipped entirely, i.e. because no logprobs were requested
# or pythonization was not deferred. To that end,
#
# * `prompt_logprobs_are_requested_for_prefill` signals that
# there are *any* prefill-phase requests which specify that
# prompt logprobs should be returned.
#
# * `any_logprobs_are_requested` signals that there are any
# requests which (1) specify that sample logprobs should be
# returned, or (2) are in the prefill phase AND specify that
# prompt logprobs should be returned.
#
# Later on, these flags cause adjustments to the pythonization
# process to accommodate logprobs.
seq_groups = sampling_metadata.seq_groups
prompt_logprobs_are_requested_for_prefill = any([
sg.sampling_params.prompt_logprobs is not None and sg.is_prompt
for sg in seq_groups
])
any_logprobs_are_requested = (
prompt_logprobs_are_requested_for_prefill
or any([sg.sampling_params.logprobs is not None for sg in seq_groups]))
if prompt_logprobs_are_requested_for_prefill:
# CPU GPU sync, after gathering *only* sampled tokens (since
# requesting prompt logprobs leads `sampled_token_ids` to
# include prompt token ids in addition to sampled token ids.)
sample_idx_tensor = torch.tensor(
[sdx for sg in seq_groups for sdx in sg.sample_indices])
pinned_buffer = pinned_buffer.copy_(
sampled_token_ids[sample_idx_tensor, :], non_blocking=False)
else:
# CPU GPU sync
pinned_buffer = pinned_buffer.copy_(sampled_token_ids,
non_blocking=False)
# this will not block as the tensors are already on CPU
samples_list = pinned_buffer.tolist()
skip_sampler_cpu_output = (
frozen_model_input.sampling_metadata.skip_sampler_cpu_output)
# *Don't* skip logprobs pythonization *if*:
# * Any requests require logprobs to be returned in this
# iteration AND
# * These requests are being scheduled in a fashion which
# defers pythonization (i.e. multi-step scheduling.)
do_pythonize_logprobs = (skip_sampler_cpu_output
and any_logprobs_are_requested)
(
prompt_logprobs,
sample_logprobs,
) = (deferred_pythonize_logprobs(output, sampling_metadata,
logprobs_tensor)
if do_pythonize_logprobs else (None, None))
for sgdx, (seq_group,
sample_result) in enumerate(zip(seq_groups, samples_list)):
# Reminder: Please update docs/features/compatibility_matrix.md
# If the feature combo become valid
# (Check for Guided Decoding)
if seq_group.sampling_params.logits_processors:
assert len(seq_group.sampling_params.logits_processors) == 0, (
"Logits Processors are not supported in multi-step decoding")
if do_pythonize_logprobs:
assert prompt_logprobs is not None
assert sample_logprobs is not None
(
group_prompt_logprobs,
group_sample_logprobs,
) = ( # Utilize deferred pythonization results
prompt_logprobs[sgdx],
sample_logprobs[sgdx],
)
elif any_logprobs_are_requested:
(
group_prompt_logprobs,
group_sample_logprobs,
) = (
# profile_run: use already-computed logprobs
output.outputs[sgdx].prompt_logprobs,
[sample.logprobs for sample in output.outputs[sgdx].samples])
seq_ids = seq_group.seq_ids
next_token_ids = sample_result
parent_ids = [0]
seq_outputs: List[SequenceOutput]
if cache is not None:
completion_seq_group_output: CompletionSequenceGroupOutput = \
cache.cached_completion_seq_group_output.get_object()
completion_seq_group_output.samples.clear()
seq_outputs = completion_seq_group_output.samples
else:
seq_outputs = []
for tdx, (parent_id,
next_token_id) in enumerate(zip(parent_ids, next_token_ids)):
if cache is not None:
seq_output: SequenceOutput = cache.cached_seq_output.get_object(
)
seq_output.parent_seq_id = seq_ids[parent_id]
seq_output.output_token = next_token_id
if any_logprobs_are_requested:
seq_output.logprobs = group_sample_logprobs[tdx]
else:
logprobs = next(iter(seq_output.logprobs.values()))
seq_output.logprobs.clear()
logprobs.logprob = float('inf')
logprobs.rank = None
logprobs.decoded_token = None
seq_output.logprobs[next_token_id] = logprobs
seq_outputs.append(seq_output)
else:
seq_outputs.append(
SequenceOutput(seq_ids[parent_id], next_token_id,
(group_sample_logprobs[tdx]
if any_logprobs_are_requested else {
next_token_id:
Logprob(logprob=float('inf'),
rank=None,
decoded_token=None)
})))
if cache is not None:
completion_seq_group_output.prompt_logprobs = \
group_prompt_logprobs if any_logprobs_are_requested else None
output.outputs.append(completion_seq_group_output)
else:
output.outputs.append(
CompletionSequenceGroupOutput(
seq_outputs, (group_prompt_logprobs
if any_logprobs_are_requested else None)))
assert len(output.outputs) > 0
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from importlib.util import find_spec
from typing import List, Optional
import torch
from vllm.config import VllmConfig
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.multimodal import MultiModalKwargs
from vllm.sequence import IntermediateTensors
from vllm.worker.neuron_model_runner import (ModelInputForNeuron,
NeuronModelRunner)
class MultiStepNeuronModelRunner(NeuronModelRunner):
"""A model runner for multi step decoding using the transformers_neuronx
framework"""
def __init__(
self,
vllm_config: VllmConfig,
):
super().__init__(vllm_config)
self.speculation_config = self.speculative_config
from transformers_neuronx.config import GenerationConfig
self.speculation_config.draft_model_config.neuron_sampling_params = (
GenerationConfig(
max_length=self.scheduler_config.max_model_len,
do_sample=True,
per_batch_line=True,
top_k=[self._MAX_NEURON_SAMPLING_TOP_K] \
* self.scheduler_config.max_num_seqs,
top_p=[1.0] * self.scheduler_config.max_num_seqs,
temperature=[1.0] * self.scheduler_config.max_num_seqs,
dynamic=True,
global_top_k=self._MAX_NEURON_SAMPLING_TOP_K
))
def load_model(self) -> None:
if find_spec("transformers_neuronx") is not None:
from vllm.model_executor.model_loader.neuron import (
get_neuron_eagle_speculation_model,
get_neuron_speculation_model)
if self.speculation_config.speculative_token_tree is not None:
self.model = get_neuron_eagle_speculation_model(
self.model_config,
parallel_config=self.parallel_config,
scheduler_config=self.scheduler_config,
speculation_config=self.speculation_config)
else:
self.model = get_neuron_speculation_model(
self.model_config,
parallel_config=self.parallel_config,
scheduler_config=self.scheduler_config,
speculation_config=self.speculation_config)
else:
raise NotImplementedError(
"Supports only Transformer-NeuronX based models.")
@torch.inference_mode()
def execute_model(
self,
model_input: ModelInputForNeuron,
kv_caches: Optional[List[torch.Tensor]] = None,
intermediate_tensors: Optional[IntermediateTensors] = None,
num_steps: int = 1,
) -> Optional[List[SamplerOutput]]:
logits = self.model(
input_ids=model_input.input_tokens,
positions=model_input.input_positions,
input_block_ids=model_input.input_block_ids,
**MultiModalKwargs.as_kwargs(
model_input.multi_modal_kwargs or {},
device=self.device,
),
)
output = self.model.sample(
logits=logits,
sampling_metadata=model_input.sampling_metadata,
)
return output
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from typing import List, Optional
import torch
from vllm.config import VllmConfig
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.multimodal import MultiModalKwargs
from vllm.sequence import IntermediateTensors
from vllm.worker.neuronx_distributed_model_runner import (
NeuronxDistributedModelRunner)
class MultiStepNeuronxDistributedModelRunner(NeuronxDistributedModelRunner):
"""A model runner for multi-step decoding using the
neuronx-distributed-inference framework"""
def __init__(
self,
vllm_config: VllmConfig,
):
super().__init__(vllm_config)
def load_model(self) -> None:
from vllm.model_executor.model_loader.neuronx_distributed import (
get_neuron_speculation_model)
self.model = get_neuron_speculation_model(
self.model_config,
parallel_config=self.parallel_config,
scheduler_config=self.scheduler_config,
speculation_config=self.speculative_config)
@torch.inference_mode()
def execute_model(
self,
model_input,
kv_caches: Optional[List[torch.Tensor]] = None,
intermediate_tensors: Optional[IntermediateTensors] = None,
num_steps: int = 1,
) -> Optional[List[SamplerOutput]]:
sampling_params = torch.tensor([[
seq_group.sampling_params.top_k,
seq_group.sampling_params.top_p,
seq_group.sampling_params.temperature,
] for seq_group in model_input.sampling_metadata.seq_groups])
logits = self.model(
input_ids=model_input.input_tokens,
positions=model_input.input_positions,
input_block_ids=model_input.input_block_ids,
sampling_params=sampling_params,
**MultiModalKwargs.as_kwargs(
model_input.multi_modal_kwargs or {},
device=self.device,
),
)
output = self.model.sample(
logits=logits,
sampling_metadata=model_input.sampling_metadata,
)
return output
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import dataclasses
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import torch
from vllm.distributed import broadcast_tensor_dict, get_pp_group
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.sequence import ExecuteModelRequest
from vllm.worker.model_runner_base import BroadcastableModelInput
from vllm.worker.multi_step_model_runner import (MultiStepModelRunner,
StatefulModelInput)
from vllm.worker.worker import Worker, WorkerInput
@dataclass
class MultiStepState:
worker_input: WorkerInput
model_input: StatefulModelInput
class MultiStepWorker(Worker):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
base_model_runner = self.model_runner
# for multi-step model, wrap the model runner with MultiStepModelRunner
self.model_runner = MultiStepModelRunner(
base_model_runner,
vllm_config=base_model_runner.vllm_config,
kv_cache_dtype=self.cache_config.cache_dtype,
is_driver_worker=base_model_runner.is_driver_worker,
)
pipeline_parallel_size = self.parallel_config.pipeline_parallel_size
self.multi_step_states: List[
Optional[MultiStepState]] = [None] * pipeline_parallel_size
self.temp_output = None
def _get_driver_input_and_broadcast(
self, execute_model_req: ExecuteModelRequest
) -> Tuple[BroadcastableModelInput, WorkerInput, Dict[str, torch.Tensor]]:
"""
Get the driver input and broadcast it to other workers.
"""
assert self.is_driver_worker
virtual_engine = execute_model_req.virtual_engine
is_first_multi_step = execute_model_req.is_first_multi_step
if is_first_multi_step:
# on first step we prepare the worker input and model input normally
worker_input: WorkerInput = self.prepare_worker_input(
execute_model_req=execute_model_req)
model_input: StatefulModelInput = (
self.model_runner.prepare_model_input(
execute_model_req.seq_group_metadata_list,
execute_model_req.virtual_engine,
execute_model_req.finished_requests_ids))
if execute_model_req.async_callback:
model_input.frozen_model_input = dataclasses.replace( # type: ignore
model_input.frozen_model_input,
async_callback=execute_model_req.async_callback)
else:
# on subsequent steps we reuse the worker input and model input
multi_step_state = self.multi_step_states[virtual_engine]
worker_input = multi_step_state.worker_input
model_input = multi_step_state.model_input
frozen_model_input = model_input.frozen_model_input
assert frozen_model_input is not None
assert frozen_model_input.attn_metadata is not None
# clear the cached metadata so that it can be recomputed on
# the workers.
frozen_model_input.attn_metadata._cached_prefill_metadata = None
frozen_model_input.attn_metadata._cached_decode_metadata = None
model_input.is_first_multi_step = is_first_multi_step
model_input.is_last_step = execute_model_req.is_last_step
if not is_first_multi_step:
# we broadcast the last sampled token ids to all TP workers so they
# can update their model input metadata in-place.
self._prepare_last_sampled_token_ids_for_tp_workers(
execute_model_req=execute_model_req, model_input=model_input)
if self.do_metadata_broadcast:
broadcast_data = worker_input.as_broadcastable_tensor_dict()
broadcast_data.update(model_input.as_broadcastable_tensor_dict())
broadcast_tensor_dict(broadcast_data, src=0)
# Retuning empty dict here to keep this compatible with
# `LocalOrDistributedWorkerBase._get_driver_input_and_broadcast`
return model_input, worker_input, {}
def _prepare_last_sampled_token_ids_for_tp_workers(
self,
execute_model_req: ExecuteModelRequest,
model_input: StatefulModelInput,
) -> None:
"""
Prepare the last sampled token ids for TP workers. If it's the last
PP rank, then the last sampled token ids are already in the model_input.
If it is NOT the last PP rank, then we need to get the last sampled
token that is cached in the execute_model_req.
"""
if get_pp_group().is_last_rank:
assert model_input.cached_outputs[
-1].sampler_output.sampled_token_ids is None
assert model_input.cached_outputs[-1].sampled_token_ids is not None
model_input.last_sampled_token_ids = model_input.cached_outputs[
-1].sampled_token_ids
# free sampled token ids from the previous step if it has been
# pythonized. Cannot free the last sampled token ids because
# we need it for GPU advance_step.
for output in model_input.cached_outputs[:-1]:
if output.pythonized:
output.sampled_token_ids = None
else:
# otherwise we need to get the cached sampled token ids from the
# execute_model_req
assert execute_model_req.last_sampled_token_ids is not None
model_input.last_sampled_token_ids = (
execute_model_req.last_sampled_token_ids.cuda())
model_input.add_sampler_output(
SamplerOutput(outputs=[], sampled_token_ids=None),
model_input.last_sampled_token_ids)
# free sampled token ids from the previous step.
# TODO(will) we could reuse the sampled token ids tensor from
# the previous step instead.
for output in model_input.cached_outputs[:-1]:
output.sampled_token_ids = None
assert model_input.cached_outputs[-1].sampled_token_ids is not None
def prepare_input(
self,
execute_model_req: Optional[ExecuteModelRequest] = None,
) -> Optional[Tuple[StatefulModelInput, WorkerInput, Dict[str,
torch.Tensor]]]:
"""
Depending on the current state of the request and multi step worker,
this method may skip the normal _prepare_model_input and
_prepare_worker_input methods and instead used cached values.
"""
if self.is_driver_worker:
if execute_model_req is None:
if self.do_metadata_broadcast:
# This signals that there's no more requests to process for
# now. All workers are running infinite loop with
# broadcast_tensor_dict, and it stops the loop when the
# driver broadcasts an empty input. Send an empty input to
# notify all other workers to stop their execution loop.
broadcast_tensor_dict({}, src=0)
return None
virtual_engine = execute_model_req.virtual_engine
(model_input, worker_input,
kwargs) = self._get_driver_input_and_broadcast(execute_model_req)
assert isinstance(model_input, StatefulModelInput)
if execute_model_req.is_first_multi_step:
# cache the worker input and model input for the next steps
self.multi_step_states[virtual_engine] = MultiStepState(
worker_input=worker_input, model_input=model_input)
# if TP workers
else:
broadcast_data = self._get_worker_input_from_broadcast()
# if the driver has sent an empty input, we should stop the worker
# loop
if broadcast_data is None:
return None
model_input, worker_input, kwargs = broadcast_data
assert isinstance(model_input, StatefulModelInput)
virtual_engine = worker_input.virtual_engine
if model_input.is_first_multi_step:
pass
# TODO(will) Can cache the worker input and model input for the
# next steps. See below for details
else:
# TODO(will) possible to also cache and reuse the cached worker
# input and model input. The idea is essentially the delta
# optimization for model_inputs. Where the TP workers can cache
# the model input states and we only broadcast the delta need
# for the next step (sampled_token_ids from the previous step)
assert isinstance(model_input, StatefulModelInput)
# we need to update the last sampled token ids in the model
# input for the workers so that they can run inplace
# advance_step
model_input.add_sampler_output(
SamplerOutput(outputs=[], sampled_token_ids=None),
model_input.last_sampled_token_ids)
assert model_input is not None
assert worker_input is not None
return model_input, worker_input, kwargs
...@@ -64,25 +64,21 @@ class NeuronWorker(LocalOrDistributedWorkerBase): ...@@ -64,25 +64,21 @@ class NeuronWorker(LocalOrDistributedWorkerBase):
assert (self.lora_config assert (self.lora_config
is None), ("LoRA is not supported for TransformersNeuronX " is None), ("LoRA is not supported for TransformersNeuronX "
"framework.") "framework.")
from vllm.worker.multi_step_neuron_model_runner import (
MultiStepNeuronModelRunner)
if self.speculative_config is not None: if self.speculative_config is not None:
return MultiStepNeuronModelRunner(vllm_config=vllm_config) raise NotImplementedError(
else: "Speculative decoding is not supported for TransformersNeuronX"
return NeuronModelRunner(vllm_config=vllm_config) )
return NeuronModelRunner(vllm_config=vllm_config)
def get_neuronx_distributed_model_runner(self, vllm_config): def get_neuronx_distributed_model_runner(self, vllm_config):
from vllm.worker.multi_step_neuronx_distributed_model_runner import (
MultiStepNeuronxDistributedModelRunner)
from vllm.worker.neuronx_distributed_model_runner import ( from vllm.worker.neuronx_distributed_model_runner import (
NeuronxDistributedModelRunner) NeuronxDistributedModelRunner)
if self.speculative_config is not None: if self.speculative_config is not None:
assert (self.lora_config assert (self.lora_config is None), (
is None), "LoRA is not supported for Speculative Decoding" "LoRA is not supported for Speculative Decoding")
return MultiStepNeuronxDistributedModelRunner( raise NotImplementedError(
vllm_config=vllm_config) "Speculative decoding is not supported for NeuronxDistributed")
else: return NeuronxDistributedModelRunner(vllm_config=vllm_config)
return NeuronxDistributedModelRunner(vllm_config=vllm_config)
def init_device(self) -> None: def init_device(self) -> None:
self.init_distributed_environment() self.init_distributed_environment()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment