Commit e326b648 authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge branch 'v0.9.2-dev' of http://10.16.6.30/dcutoolkit/deeplearing/vllm into v0.9.2-dev

parents 8a1e7a3d 89626cfc
......@@ -61,7 +61,6 @@ def split_scheduler_output(runner, scheduler_output:SchedulerOutput):
else:
new_req_data_right.append(new_req)
#print('###scheduler_output.scheduled_cached_reqs', scheduler_output.scheduled_cached_reqs)
cached_reqs_left = CachedRequestData.make_empty()
cached_reqs_right = CachedRequestData.make_empty()
for req_idx, req_id in enumerate(scheduler_output.scheduled_cached_reqs.req_ids):
......
......@@ -637,12 +637,8 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
decode_metadata = None
if self._num_decodes > 0:
if self.use_spec_decode:
if self.use_spec_decode and not common_attn_metadata.spec_layer_decoding:
query_lens = self.num_scheduled_tokens_np[:self._num_decodes]
if common_attn_metadata.num_rejected_tokens is not None:
num_rejected_tokens = common_attn_metadata.num_rejected_tokens[:self._num_decodes]
query_lens = query_lens - np.array(num_rejected_tokens, dtype=np.int32)
self._num_decode_tokens -= sum(num_rejected_tokens)
cu_num_blocks = np.cumsum(query_lens)
virtual_batches = cu_num_blocks[-1]
block_offsets = np.repeat(cu_num_blocks - query_lens, query_lens)
......@@ -673,10 +669,20 @@ class MLACommonMetadataBuilder(AttentionMetadataBuilder[M]):
seq_lens=decode_seq_lens,
)
else:
decode_metadata = self._build_decode(
block_table_tensor=block_table_tensor[:self._num_decodes, ...],
seq_lens=seq_lens[:self._num_decodes],
)
self._num_decode_tokens = self._num_decodes
if self.use_spec_decode and self.spec_decode_block_table_tensor is not None:
self.spec_decode_block_table_tensor[:self._num_decode_tokens].copy_(block_table_tensor[:self._num_decode_tokens, ...])
self.spec_decode_seq_lens[:self._num_decode_tokens].copy_(seq_lens[:self._num_decode_tokens])
decode_metadata = self._build_decode(
block_table_tensor=self.spec_decode_block_table_tensor[:self._num_decode_tokens, ...],
seq_lens=self.spec_decode_seq_lens[:self._num_decode_tokens],
)
else:
decode_metadata = self._build_decode(
block_table_tensor=block_table_tensor[:self._num_decode_tokens, ...],
seq_lens=seq_lens[:self._num_decode_tokens],
)
return self.metadata_cls(
num_actual_tokens=num_actual_tokens,
......
......@@ -43,12 +43,11 @@ class CommonAttentionMetadata:
"""Total number of tokens in batch"""
max_query_len: int
"""Longest query in batch"""
num_rejected_tokens: list[int] = None
"""(batch_size,), record the rejected tokens number in cpu and gpu"""
num_speculative_tokens: int = 0
"""Number of speculative tokens"""
slot_mapping: torch.Tensor = None
"""(batch_size, seq_len), slot mapping"""
spec_layer_decoding: bool = False
M = TypeVar("M")
......
......@@ -15,6 +15,8 @@ from logging import DEBUG
from typing import Any, Callable, Optional, TypeVar, Union
import msgspec
from vllm import envs
from vllm.zero_overhead.v1.core import engine_core_step
import zmq
from vllm.config import ParallelConfig, VllmConfig
......@@ -226,6 +228,8 @@ class EngineCore:
Returns tuple of outputs and a flag indicating whether the model
was executed.
"""
if envs.VLLM_ZERO_OVERHEAD:
return engine_core_step(self)
# Check for any requests remaining in the scheduler - unfinished,
# or finished and not yet removed from the batch.
......@@ -235,7 +239,6 @@ class EngineCore:
model_output = self.execute_model(scheduler_output)
engine_core_outputs = self.scheduler.update_from_output(
scheduler_output, model_output) # type: ignore
return (engine_core_outputs,
scheduler_output.total_num_scheduled_tokens > 0)
......
......@@ -104,9 +104,8 @@ class EagleProposer:
# [batch_size, max_num_blocks_per_req]
block_table: torch.Tensor,
# [batch_size]
num_rejected_tokens: list[int],
# [batch_size]
sampling_metadata: SamplingMetadata
sampling_metadata: SamplingMetadata,
decoding: bool = False,
) -> torch.Tensor:
num_tokens = target_token_ids.shape[0]
batch_size = next_token_ids.shape[0]
......@@ -158,8 +157,8 @@ class EagleProposer:
num_reqs=batch_size,
num_actual_tokens=num_tokens,
max_query_len=max_query_len,
num_rejected_tokens=num_rejected_tokens,
slot_mapping=target_slot_mapping
slot_mapping=target_slot_mapping,
spec_layer_decoding=decoding
)
assert self.runner is not None
......@@ -186,7 +185,7 @@ class EagleProposer:
self.positions[:num_tokens] = target_positions
self.hidden_states[:num_tokens] = target_hidden_states
if (self.use_full_cuda_graph
if (decoding and self.use_full_cuda_graph
and num_tokens <= self.cudagraph_batch_sizes[-1]):
assert self.attn_metadata_cudagraph
if self.method in ["eagle", "eagle3"]:
......@@ -214,13 +213,14 @@ class EagleProposer:
if attn_metadata.decode is not None:
self.attn_metadata_cudagraph.decode.block_table[:attn_metadata.num_decode_tokens] = (
attn_metadata.decode.block_table)
attn_metadata.decode.block_table)
self.attn_metadata_cudagraph.decode.seq_lens[:attn_metadata.num_decode_tokens] = (
attn_metadata.decode.seq_lens)
with set_forward_context(per_layer_attn_metadata,
self.vllm_config,
num_tokens=num_input_tokens):
num_tokens=num_input_tokens,
skip_cuda_graphs=not decoding):
ret_hidden_states = self.model(
self.input_ids[:num_input_tokens],
self.positions[:num_input_tokens],
......@@ -390,45 +390,56 @@ class EagleProposer:
return draft_token_ids
# @staticmethod
# def prepare_inputs(
# # [batch_size + 1]
# cu_target_query_lens: torch.Tensor,
# # [batch_size]
# num_rejected_tokens: torch.Tensor,
# num_tokens: int,
# ) -> tuple[torch.Tensor, torch.Tensor]:
# # cu_target_query_lens: [0, a, a + b, a + b + c]
# # num_rejected_tokens: [n1, n2, n3]
# # num_tokens_per_req: [a - n1, b - n2, c - n3]
# # cu_num_tokens: [0, a - n1, a + b - n1 - n2, a + b + c - n1 - n2 - n3]
# # token_indices: [0, 1, ..., a - n1 - 1,
# # a, a + 1, ..., a + b - n2 - 1,
# # a + b, a + b + 1, ..., a + b + c - n3 - 1]
# # [0, a, a + b, a + b + c] -> [a, b, c]
# query_len_per_req = (cu_target_query_lens[1:] -
# cu_target_query_lens[:-1])
# # [a, b, c] -> [a - n1, b - n2, c - n3]
# num_tokens_per_req = query_len_per_req - num_rejected_tokens
# # [a - n1, b - n2, c - n3] ->
# # [0, a - n1, a + b - n1 - n2, a + b + c - n1 - n2 - n3]
# cu_num_tokens = torch.zeros_like(cu_target_query_lens)
# torch.cumsum(num_tokens_per_req, dim=0, out=cu_num_tokens[1:])
# token_indices = torch.empty(
# num_tokens,
# dtype=torch.int32,
# device=cu_target_query_lens.device,
# )
# batch_size = num_rejected_tokens.shape[0]
# BLOCK_SIZE = 1024
# prepare_eagle_input_kernel[(batch_size, )](
# token_indices,
# cu_target_query_lens,
# cu_num_tokens,
# BLOCK_SIZE=BLOCK_SIZE,
# )
# return cu_num_tokens, token_indices
@staticmethod
def prepare_inputs(
    # [batch_size + 1]
    cu_target_query_lens: torch.Tensor,
    # [batch_size]
    num_rejected_tokens: torch.Tensor,
    num_tokens: int,
    num_accepted_tokens_tensor: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
    """Select one target-token index per request for the draft model.

    Args:
        cu_target_query_lens: Cumulative query lengths, one more entry
            than there are requests.
        num_rejected_tokens: Unused. Kept so existing callers that still
            pass it keep working.
        num_tokens: Unused. Kept for the same reason.
        num_accepted_tokens_tensor: Per-request offset added to the start
            of that request's query.

    Returns:
        ``(cu_num_tokens, token_indices)`` where ``cu_num_tokens`` is
        ``[0, 1, ..., batch_size]`` and ``token_indices[i]`` is
        ``cu_target_query_lens[i] + num_accepted_tokens_tensor[i]``.
    """
    # The previous implementation also ran a cumsum and launched
    # `prepare_eagle_input_kernel`, but both of its results were
    # unconditionally overwritten by the two assignments below, so that
    # work (and its dependency on `num_rejected_tokens` / `num_tokens`)
    # has been dropped.
    #
    # cu_target_query_lens: [0, a, a + b, a + b + c]
    # num_accepted_tokens:  [k1, k2, k3]
    # cu_num_tokens:        [0, 1, 2, 3]
    # token_indices:        [k1, a + k2, a + b + k3]
    cu_num_tokens = torch.arange(
        cu_target_query_lens.shape[0],
        device=cu_target_query_lens.device,
    )
    token_indices = num_accepted_tokens_tensor + cu_target_query_lens[:-1]
    return cu_num_tokens, token_indices
def load_model(self, target_model: nn.Module) -> None:
......
......@@ -69,6 +69,7 @@ from vllm.v1.worker.gpu_input_batch import CachedRequestState, InputBatch
from vllm.v1.worker.lora_model_runner_mixin import LoRAModelRunnerMixin
from vllm.platforms import current_platform
from vllm.two_batch_overlap.v1.model_input_split_v1 import tbo_split_and_execute_model
from vllm.zero_overhead.v1.gpu_model_runner import execute_model_sampled, zero_prepare_inputs
from ..sample.logits_processor import LogitsProcessorManager
from .utils import (gather_mm_placeholders, initialize_kv_cache_for_kv_sharing,
......@@ -1365,6 +1366,8 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# compiled with full CUDA graphs, we have to skip them entirely.
skip_cuda_graphs = self.full_cuda_graph and not attention_cuda_graphs
if envs.VLLM_ZERO_OVERHEAD:
zero_prepare_inputs(self, scheduler_output, input_ids)
if envs.VLLM_ENABLE_TBO and not self.use_cuda_graph:
model_output, finished_sending, finished_recving = \
tbo_split_and_execute_model(self, attn_metadata, num_input_tokens,
......@@ -1505,6 +1508,22 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# Get the valid generated tokens.
sampled_token_ids = sampler_output.sampled_token_ids
max_gen_len = sampled_token_ids.shape[-1]
if envs.VLLM_ZERO_OVERHEAD:
return execute_model_sampled(self, max_gen_len, sampled_token_ids,
discard_sampled_tokens_req_indices, scheduler_output,
sampling_metadata,
hidden_states,
sample_hidden_states,
aux_hidden_states,
spec_decode_metadata,
attn_metadata,
logprobs_lists,
prompt_logprobs_dict,
finished_sending,
finished_recving,
num_nans_in_logits)
if max_gen_len == 1:
# No spec decode tokens.
valid_sampled_token_ids = sampled_token_ids.tolist()
......@@ -1643,7 +1662,6 @@ class GPUModelRunner(LoRAModelRunnerMixin):
else:
block_table = None
num_rejected_tokens = None
if spec_decode_metadata is None:
# input_ids can be None for multimodal models.
target_token_ids = self.input_ids[:num_scheduled_tokens]
......@@ -1659,21 +1677,15 @@ class GPUModelRunner(LoRAModelRunnerMixin):
cu_num_tokens = eagle_attn_metadata.query_start_loc
else:
# TODO(woosuk): Refactor this.
num_draft_tokens = spec_decode_metadata.num_draft_tokens
num_rejected_tokens = [
n + 1 - len(sampled_token_ids[i]) if n > 0 else 0
for i, n in enumerate(num_draft_tokens)
]
num_rejected_tokens_tensor = async_tensor_h2d(
num_rejected_tokens,
num_accepted_tokens = [len(s) - 1 for s in sampled_token_ids]
num_accepted_tokens_tensor = async_tensor_h2d(
num_accepted_tokens,
dtype=torch.int32,
target_device=self.device,
pin_memory=True)
num_tokens = num_scheduled_tokens - sum(num_rejected_tokens)
cu_num_tokens, token_indices = self.drafter.prepare_inputs(
eagle_attn_metadata.query_start_loc,
num_rejected_tokens_tensor,
num_tokens,
num_accepted_tokens_tensor,
)
target_token_ids = self.input_ids[token_indices]
# TODO(woosuk): Support M-RoPE.
......@@ -1694,7 +1706,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
cu_num_tokens=cu_num_tokens,
block_table=block_table,
sampling_metadata=sampling_metadata,
num_rejected_tokens=num_rejected_tokens
decoding=spec_decode_metadata is not None
)
spec_token_ids = draft_token_ids.tolist()
return spec_token_ids
......
......@@ -28,6 +28,7 @@ from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.utils import report_usage_stats
from vllm.v1.worker.gpu_model_runner import GPUModelRunner
from vllm.v1.worker.worker_base import WorkerBase
from vllm.zero_overhead.utils import zero_overhead_stream
logger = init_logger(__name__)
......@@ -304,9 +305,14 @@ class Worker(WorkerBase):
intermediate_tensors = IntermediateTensors(
get_pp_group().recv_tensor_dict(
all_gather_group=get_tp_group()))
output = self.model_runner.execute_model(scheduler_output,
intermediate_tensors)
if envs.VLLM_ZERO_OVERHEAD:
use_stream = zero_overhead_stream(self.device)
with torch.cuda.stream(use_stream):
output = self.model_runner.execute_model(scheduler_output,
intermediate_tensors)
else:
output = self.model_runner.execute_model(scheduler_output,
intermediate_tensors)
parallel_config = self.vllm_config.parallel_config
if parallel_config.distributed_executor_backend != "external_launcher" \
and not get_pp_group().is_last_rank:
......
import torch
from collections import defaultdict
from typing import Optional
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.core.sched.scheduler import Scheduler
from vllm.v1.engine import EngineCoreOutput, EngineCoreOutputs
from vllm.v1.request import Request, RequestStatus
from vllm.v1.spec_decode.metrics import SpecDecodingStats
from vllm.zero_overhead.v1.outputs import ZeroV1ModelRunnerOutput
# Module-level map: request id -> number of output tokens of that request
# whose real sampled values have already been written back by the fix-up loop
# in `zero_overhead_update_from_output`. `check_stop` reads it to decide which
# output token to inspect. Nothing in this file ever removes an entry.
# NOTE(review): the name is misspelled ("requsets"); it is kept as is because
# other modules may import it.
requsets_valid_token_len = {}
def check_stop(request: Request,
               max_model_len: int,
               pooler_output: Optional[torch.Tensor] = None) -> bool:
    """Decide whether `request` is finished, based on its validated tokens.

    The first time a request id is seen it is registered in
    `requsets_valid_token_len` with a count of 0 and the function returns
    False without performing any check.

    On every later call the validated output length recorded for the request
    drives the decision: length caps first, then pooling, then EOS and stop
    tokens. When the request is finished, `request.status` (and, for stop
    tokens, `request.stop_reason`) is updated before returning True.
    """
    req_id = request.request_id
    if req_id not in requsets_valid_token_len:
        requsets_valid_token_len[req_id] = 0
        return False

    confirmed_len = requsets_valid_token_len[req_id]
    over_model_len = (request.num_prompt_tokens + confirmed_len
                      >= max_model_len)
    if over_model_len or confirmed_len >= request.max_tokens:
        request.status = RequestStatus.FINISHED_LENGTH_CAPPED
        return True

    if request.pooling_params:
        # Pooling requests finish as soon as a pooler output is available.
        if pooler_output is None:
            return False
        request.status = RequestStatus.FINISHED_STOPPED
        return True

    params = request.sampling_params
    assert params is not None
    # Token at the end of the validated prefix of the output.
    newest_confirmed = request.output_token_ids[confirmed_len - 1]

    hit_eos = (not params.ignore_eos
               and newest_confirmed == request.eos_token_id)
    if hit_eos:
        request.status = RequestStatus.FINISHED_STOPPED
        return True

    if newest_confirmed not in (params.stop_token_ids or ()):
        return False
    request.status = RequestStatus.FINISHED_STOPPED
    request.stop_reason = newest_confirmed
    return True
def zero_overhead_update_from_output(
        scheduler: Scheduler,
        scheduler_output: SchedulerOutput,
        model_runner_output: ZeroV1ModelRunnerOutput
) -> dict[int, EngineCoreOutputs]:
    """Update scheduler state from a zero-overhead model runner output.

    Works in two passes:

    1. Fix-up pass over `model_runner_output.fix_req_ids`: the real token
       values in `fix_sampled_token_ids` are written over the tail of each
       request's token lists, the validated-token counter is advanced,
       stop conditions are checked, and an `EngineCoreOutput` is emitted.
       Requests that stop are released with `scheduler._free_request`.
    2. Pass over `scheduler.running`: the token ids reported for this step
       are appended to each scheduled request and spec-decode bookkeeping
       is updated. This pass emits no `EngineCoreOutput` and frees nothing.

    Args:
        scheduler: Scheduler whose `requests` / `running` state is updated.
        scheduler_output: What was scheduled for the step being processed.
        model_runner_output: Runner output carrying this step's token ids
            plus the optional `fix_*` fields.

    Returns:
        Mapping of client index to `EngineCoreOutputs`.
    """
    global requsets_valid_token_len
    sampled_token_ids = model_runner_output.sampled_token_ids
    spec_token_ids = model_runner_output.spec_token_ids
    logprobs = model_runner_output.logprobs
    prompt_logprobs_dict = model_runner_output.prompt_logprobs_dict
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    pooler_outputs = model_runner_output.pooler_output
    num_nans_in_logits = model_runner_output.num_nans_in_logits
    req_id_to_index = model_runner_output.req_id_to_index

    new_running: list[Request] = []
    outputs: dict[int, list[EngineCoreOutput]] = defaultdict(list)
    spec_decoding_stats: Optional[SpecDecodingStats] = None

    # fix last model out in zero overhead
    # `fix_req_ids` defaults to None and is passed as None when the runner has
    # nothing to fix, so fall back to an empty sequence instead of iterating
    # over None.
    fix_req_ids = model_runner_output.fix_req_ids or ()
    for req_idx, req_id in enumerate(fix_req_ids):
        if req_id not in scheduler.requests:
            continue
        request = scheduler.requests[req_id]
        generated_token_ids = model_runner_output.fix_sampled_token_ids[
            req_idx]
        # Row of this request in the output of the current step, or None if
        # the request is not part of it. The per-row lookups below used to
        # read `req_index` before it was ever assigned in this function.
        # NOTE(review): `logprobs` / `spec_token_ids` belong to the current
        # step while the tokens fixed here come from `fix_sampled_token_ids`;
        # confirm that pairing them by the current row is what is intended.
        req_index = req_id_to_index.get(req_id)

        if req_id not in requsets_valid_token_len:
            requsets_valid_token_len[req_id] = 0
        valid_output_len = requsets_valid_token_len[req_id]
        # Negative offset (counted from the end of the token lists) of the
        # first entry that still needs its real value.
        fix_offset = valid_output_len - request.num_output_tokens
        if isinstance(generated_token_ids, int):
            request._output_token_ids[fix_offset] = generated_token_ids
            request._all_token_ids[fix_offset] = generated_token_ids
            requsets_valid_token_len[req_id] += 1
            # Normalise to a list so the code below can iterate / trim it.
            new_token_ids = [generated_token_ids]
        else:
            valid_output_end = (valid_output_len + len(generated_token_ids) -
                                request.num_output_tokens)
            if valid_output_end == 0:
                # A slice ending at 0 would be empty; the fix reaches the
                # end of the list, so leave the slice open-ended.
                request._output_token_ids[fix_offset:] = generated_token_ids
                request._all_token_ids[fix_offset:] = generated_token_ids
            else:
                request._output_token_ids[
                    fix_offset:valid_output_end] = generated_token_ids
                request._all_token_ids[
                    fix_offset:valid_output_end] = generated_token_ids
            requsets_valid_token_len[req_id] += len(generated_token_ids)
            new_token_ids = generated_token_ids

        stopped = False
        new_logprobs = None
        kv_transfer_params = None

        # Check for stop and update request state.
        # This must be called before we make the EngineCoreOutput.
        for num_new, _ in enumerate(new_token_ids, 1):
            stopped = check_stop(request, scheduler.max_model_len)
            if stopped:
                kv_transfer_params = scheduler._free_request(request)
                del new_token_ids[num_new:]  # Trim new tokens if needed.
                break

        pooler_output = None
        if pooler_outputs and req_index is not None:
            pooler_output = pooler_outputs[req_index]
            stopped = check_stop(request, scheduler.max_model_len,
                                 pooler_output)
            if stopped:
                kv_transfer_params = scheduler._free_request(request)

        # Extract sample logprobs if needed.
        if request.sampling_params is not None \
            and request.sampling_params.logprobs is not None \
            and logprobs and req_index is not None:
            # NOTE: once we support N tokens per step (spec decode),
            # the outer lists can be of length > 1.
            new_logprobs = logprobs.slice(req_index, req_index + 1)

        if new_token_ids and scheduler.structured_output_manager.should_advance(
                request):
            # NOTE: structured_output_request
            # should not be None if use_structured_output, we have
            # check above, so safe to ignore type warning
            request.structured_output_request.grammar.accept_tokens(  # type: ignore[union-attr]
                req_id, new_token_ids)

        # spec_token_ids comes from the model runner output
        if num_nans_in_logits is not None and req_id in num_nans_in_logits:
            request.num_nans_in_logits = num_nans_in_logits[req_id]

        # Add newly generated spec token ids to the request.
        if spec_token_ids is not None and req_index is not None:
            if scheduler.structured_output_manager.should_advance(request):
                metadata = request.structured_output_request
                # Needs to happen after new_token_ids are accepted.
                request.spec_token_ids = metadata.grammar.validate_tokens(  # type: ignore[union-attr]
                    spec_token_ids[req_index])
            else:
                request.spec_token_ids = spec_token_ids[req_index]

        # Get prompt logprobs for this request.
        prompt_logprobs_tensors = prompt_logprobs_dict.get(req_id)
        if new_token_ids or pooler_output is not None \
            or kv_transfer_params:
            # Add EngineCoreOutput for this Request.
            outputs[request.client_index].append(
                EngineCoreOutput(
                    request_id=req_id,
                    new_token_ids=new_token_ids,
                    finish_reason=request.get_finished_reason(),
                    new_logprobs=new_logprobs,
                    new_prompt_logprobs_tensors=prompt_logprobs_tensors,
                    pooling_output=pooler_output,
                    stop_reason=request.stop_reason,
                    events=request.take_events(),
                    kv_transfer_params=kv_transfer_params,
                    num_cached_tokens=request.num_cached_tokens,
                ))
        else:
            # Invariant: EngineCore returns no partial prefill outputs.
            assert not prompt_logprobs_tensors

    # NOTE(woosuk): As len(self.running) can be up to 1K or more, the below
    # loop can be a performance bottleneck. We should do our best to avoid
    # expensive operations inside the loop.
    for request in scheduler.running:
        req_id = request.request_id
        num_tokens_scheduled = num_scheduled_tokens.get(req_id, 0)
        if num_tokens_scheduled == 0:
            # The request was not scheduled in this step.
            new_running.append(request)
            continue

        req_index = req_id_to_index[req_id]
        generated_token_ids = sampled_token_ids[
            req_index] if sampled_token_ids else []

        scheduled_spec_token_ids = (
            scheduler_output.scheduled_spec_decode_tokens.get(req_id))
        if scheduled_spec_token_ids:
            # num_computed_tokens represents the number of tokens
            # processed in the current step, considering scheduled
            # tokens and rejections. If some tokens are rejected,
            # num_computed_tokens is decreased by the number of rejected
            # tokens, where is given by:
            # len(scheduled_spec_token_ids) + 1 - len(generated_token_ids).
            num_tokens_rejected = (len(scheduled_spec_token_ids) + 1 -
                                   len(generated_token_ids))
            request.num_computed_tokens -= num_tokens_rejected
            spec_decoding_stats = scheduler.make_spec_decoding_stats(
                spec_decoding_stats,
                num_draft_tokens=len(scheduled_spec_token_ids),
                num_accepted_tokens=len(generated_token_ids) - 1)

        # NOTE(woosuk): This has to be executed after updating
        # `request.num_computed_tokens`.
        if request.has_encoder_inputs:
            scheduler._free_encoder_inputs(request)

        stopped = False
        new_logprobs = None
        new_token_ids = generated_token_ids

        # Append generated tokens and check for stop. Note that if
        # a request is still being prefilled, we expect the model runner
        # to return empty token ids for the request.
        # Unlike the fix-up pass above, this loop neither frees the request
        # nor trims the new tokens when `check_stop` reports a stop; a
        # stopped request is only dropped from `scheduler.running` below.
        for output_token_id in new_token_ids:
            request.append_output_token_ids(output_token_id)
            # Check for stop and update request state.
            stopped = check_stop(request, scheduler.max_model_len)

        pooler_output = None
        if pooler_outputs:
            pooler_output = pooler_outputs[req_index]
            stopped = check_stop(request, scheduler.max_model_len,
                                 pooler_output)

        # Extract sample logprobs if needed.
        if request.sampling_params is not None \
            and request.sampling_params.logprobs is not None and logprobs:
            # NOTE: once we support N tokens per step (spec decode),
            # the outer lists can be of length > 1.
            # NOTE(review): the slice is computed but not used in this pass.
            new_logprobs = logprobs.slice(req_index, req_index + 1)

        if new_token_ids and scheduler.structured_output_manager.should_advance(
                request):
            # NOTE: structured_output_request
            # should not be None if use_structured_output, we have
            # check above, so safe to ignore type warning
            request.structured_output_request.grammar.accept_tokens(  # type: ignore[union-attr]
                req_id, new_token_ids)

        # spec_token_ids comes from the model runner output
        if num_nans_in_logits is not None and req_id in num_nans_in_logits:
            request.num_nans_in_logits = num_nans_in_logits[req_id]

        # Add newly generated spec token ids to the request.
        if spec_token_ids is not None:
            if scheduler.structured_output_manager.should_advance(request):
                metadata = request.structured_output_request
                # Needs to happen after new_token_ids are accepted.
                request.spec_token_ids = metadata.grammar.validate_tokens(  # type: ignore[union-attr]
                    spec_token_ids[req_index])
            else:
                request.spec_token_ids = spec_token_ids[req_index]

        if not stopped:
            new_running.append(request)

    scheduler.running = new_running

    # KV Connector: update state for finished KV Transfers.
    scheduler._update_from_kv_xfer_finished(model_runner_output)

    # Create EngineCoreOutputs for all clients that have requests with
    # outputs in this step.
    engine_core_outputs = {
        client_index: EngineCoreOutputs(outputs=outs)
        for client_index, outs in outputs.items()
    }

    finished_req_ids = scheduler.finished_req_ids_dict
    if finished_req_ids:
        # Include ids of requests that finished since last outputs
        # were sent.
        for client_index, finished_set in finished_req_ids.items():
            # Set finished request set in EngineCoreOutputs for this client.
            if (eco := engine_core_outputs.get(client_index)) is not None:
                eco.finished_requests = finished_set
            else:
                engine_core_outputs[client_index] = EngineCoreOutputs(
                    finished_requests=finished_set)
        finished_req_ids.clear()

    if engine_core_outputs:
        # Return stats to only one of the front-ends.
        next(iter(engine_core_outputs.values())).scheduler_stats = (
            scheduler.make_stats(spec_decoding_stats))

    return engine_core_outputs
def engine_core_step(core) -> tuple[dict[int, EngineCoreOutputs], bool]:
    """Run one schedule / execute / update cycle on `core`.

    Returns a pair ``(outputs, executed)``: the per-client engine outputs and
    whether any token was scheduled in this step. When the scheduler has no
    requests at all, ``({}, False)`` is returned without scheduling.
    """
    sched = core.scheduler

    # Nothing unfinished, and nothing finished-but-not-yet-removed either.
    if not sched.has_requests():
        return {}, False

    plan = sched.schedule()
    result = core.execute_model(plan)

    # Zero-overhead runner outputs carry fix-up data and need the dedicated
    # update routine; anything else goes through the regular scheduler path.
    if isinstance(result, ZeroV1ModelRunnerOutput):
        step_outputs = zero_overhead_update_from_output(
            sched, plan, result)  # type: ignore
    else:
        step_outputs = sched.update_from_output(plan,
                                                result)  # type: ignore

    executed = plan.total_num_scheduled_tokens > 0
    return (step_outputs, executed)
\ No newline at end of file
import torch
import numpy as np
from vllm.distributed.kv_transfer.kv_transfer_state import get_kv_transfer_group, has_kv_transfer_group
from vllm.distributed.parallel_state import get_tp_group
from vllm.utils import async_tensor_h2d
from vllm.zero_overhead.v1.outputs import ZeroV1ModelRunnerOutput
from vllm.profiler.prof import profile
class V1ZeroModelRunner:
    """Mutable state carried from one sampling step to the next.

    Holds the most recent sampled-token tensor, the request ids and output
    lengths recorded alongside it, a CUDA event plus the host copy of the
    sampled tokens, and the list of `token_ids_cpu` ranges that still have
    to be overwritten with real token values.
    """

    def __init__(self):
        # Start with "nothing sampled yet" and empty per-request records.
        self.set_last_sampled_token_ids(None)
        self.last_sampler_event = torch.cuda.Event(enable_timing=False)
        self.last_sampler_host_tokens = None
        self.token_ids_cpu_fix_recode = []

    def set_last_sampled_token_ids(self, sampled_token_ids):
        """Store a new sampled-token tensor and reset the per-request lists."""
        self.last_sampled_req_ids, self.last_sampled_token_lens = [], []
        self.last_sampled_token_ids = sampled_token_ids
# Single module-level instance shared by `zero_prepare_inputs` and
# `execute_model_sampled`. It is created at import time, which also constructs
# the `torch.cuda.Event` held by the instance.
v1_zero_overhead = V1ZeroModelRunner()
def zero_prepare_inputs(runner, scheduler_output, input_ids):
    """Patch `input_ids` in place with the tokens sampled in the last step.

    For every request of the current batch that also appears in
    `v1_zero_overhead.last_sampled_req_ids`, the token(s) stored for it in
    `v1_zero_overhead.last_sampled_token_ids` are copied to the start of that
    request's span in `input_ids`. The copy indexes the stored tensor
    directly, without going through a Python list of token values.

    Args:
        runner: Model runner; `input_batch.req_ids` and `device` are read.
        scheduler_output: Provides `num_scheduled_tokens` per request id,
            used to locate each request's span in `input_ids`.
        input_ids: Flat token tensor for the current step; modified in place.

    Returns:
        None. Does nothing when no previous sampling result is stored or
        when no request of the current batch was sampled in the last step.
    """
    last_sampled = v1_zero_overhead.last_sampled_token_ids
    if last_sampled is None:
        return
    sampled_tokens_num = last_sampled.shape[1]

    # Position of each request id in the previous step's record. Building
    # the map once replaces a linear `in` test plus `list.index` call per
    # request; `setdefault` keeps the first occurrence, like `list.index`.
    # NOTE(review): the position is used as a row number of the stored
    # tensor below. That is only correct if every row of the previous step
    # was recorded in `last_sampled_req_ids` -- confirm against
    # `execute_model_sampled`, which skips rows with no sampled ids.
    last_positions = {}
    for pos, prev_req_id in enumerate(v1_zero_overhead.last_sampled_req_ids):
        last_positions.setdefault(prev_req_id, pos)

    update_req_indices = []
    input_ids_indices = []
    token_idx = 0  # start of the current request's span in `input_ids`
    for req_id in runner.input_batch.req_ids:
        pos = last_positions.get(req_id)
        if pos is not None:
            update_req_indices.append(pos * sampled_tokens_num)
            input_ids_indices.append(token_idx)
        token_idx += scheduler_output.num_scheduled_tokens[req_id]

    if not update_req_indices:
        return

    update_req_indices_tensor = async_tensor_h2d(update_req_indices,
                                                 torch.int32,
                                                 runner.device,
                                                 True)
    input_ids_indices_tensor = async_tensor_h2d(input_ids_indices,
                                                torch.int32,
                                                runner.device,
                                                True)
    flat_last_sampled = last_sampled.flatten()
    for i in range(sampled_tokens_num):
        input_ids[input_ids_indices_tensor + i] = flat_last_sampled[
            update_req_indices_tensor + i]
def execute_model_sampled(runner, max_gen_len, sampled_token_ids,
                          discard_sampled_tokens_req_indices, scheduler_output,
                          sampling_metadata,
                          hidden_states,
                          sample_hidden_states,
                          aux_hidden_states,
                          spec_decode_metadata,
                          attn_metadata,
                          logprobs_lists,
                          prompt_logprobs_dict,
                          finished_sending,
                          finished_recving,
                          num_nans_in_logits
                          ):
    """Finish a model step after sampling and build the runner output.

    When `max_gen_len == 1`:
      * if a host copy of the previous step's tokens exists, wait for its
        event, then write those real token values into
        `runner.input_batch.token_ids_cpu` and into each request's
        `output_token_ids`; they are also returned as `fix_req_ids` /
        `fix_sampled_token_ids`;
      * start a non-blocking copy of the current `sampled_token_ids` to the
        host and record the event;
      * report the placeholder token id 1 for every row of this step.

    Otherwise the rejection sampler's parsed output is used directly.

    In both cases the per-request CPU token buffers and output lists are
    extended with the reported ids, draft tokens are proposed when a
    speculative config is set, and a `ZeroV1ModelRunnerOutput` is returned.
    """
    fix_req_ids = None
    fix_sampled_token_ids = None
    if max_gen_len == 1:
        # No spec decode tokens.
        # Identity check: `!= None` dispatches to the stored object's own
        # comparison operator, which for a tensor is not a plain None test.
        if v1_zero_overhead.last_sampler_host_tokens is not None:
            # Wait until the previous step's device-to-host copy has landed.
            v1_zero_overhead.last_sampler_event.synchronize()
            fix_sampled_token_ids = (
                v1_zero_overhead.last_sampler_host_tokens.tolist())
            # NOTE(review): `req_idx` here is the batch row recorded in the
            # previous step; this assumes the batch layout has not changed
            # since then -- confirm.
            for req_idx, start_idx, end_idx in (
                    v1_zero_overhead.token_ids_cpu_fix_recode):
                runner.input_batch.token_ids_cpu[
                    req_idx, start_idx:end_idx] = fix_sampled_token_ids[req_idx]
            fix_req_ids = v1_zero_overhead.last_sampled_req_ids
            # NOTE(review): below, `req_idx` is a position in
            # `last_sampled_req_ids` but is also used as a row of
            # `fix_sampled_token_ids`; the two agree only if every row of the
            # previous step was recorded -- confirm.
            for req_idx, req_id in enumerate(fix_req_ids):
                if req_id in runner.requests:
                    req_state = runner.requests[req_id]
                    token_idx = v1_zero_overhead.last_sampled_token_lens[
                        req_idx]
                    req_state.output_token_ids[token_idx] = (
                        fix_sampled_token_ids[req_idx][0])
        v1_zero_overhead.last_sampler_host_tokens = sampled_token_ids.to(
            'cpu', non_blocking=True)
        v1_zero_overhead.last_sampler_event.record()
        # Also resets last_sampled_req_ids / last_sampled_token_lens.
        v1_zero_overhead.set_last_sampled_token_ids(sampled_token_ids)
        # Placeholder ids (all ones) with the same shape as the real tokens.
        valid_sampled_token_ids = np.ones(sampled_token_ids.shape,
                                          dtype=int).tolist()
    else:
        # Includes spec decode tokens.
        # NOTE(review): this branch does not reset the per-request lists on
        # `v1_zero_overhead`, yet the loop below still appends to them.
        valid_sampled_token_ids = runner.rejection_sampler.parse_output(
            sampled_token_ids,
            runner.input_batch.vocab_size,
        )

    # Mask out the sampled tokens that should not be sampled.
    for i in discard_sampled_tokens_req_indices:
        valid_sampled_token_ids[i].clear()

    # Cache the sampled tokens in the model runner, so that the scheduler
    # doesn't need to send them back.
    # NOTE(woosuk): As an exception, when using PP, the scheduler sends
    # the sampled tokens back, because there's no direct communication
    # between the first-stage worker and the last-stage worker.
    v1_zero_overhead.token_ids_cpu_fix_recode.clear()
    for req_idx, sampled_ids in enumerate(valid_sampled_token_ids):
        if not sampled_ids:
            continue

        start_idx = runner.input_batch.num_tokens_no_spec[req_idx]
        end_idx = start_idx + len(sampled_ids)
        assert end_idx <= runner.max_model_len, (
            "Sampled token IDs exceed the max model length. "
            f"Total number of tokens: {end_idx} > max_model_len: "
            f"{runner.max_model_len}")

        runner.input_batch.token_ids_cpu[req_idx,
                                         start_idx:end_idx] = sampled_ids
        # Remember the range so the next step can overwrite it with the
        # real token values.
        v1_zero_overhead.token_ids_cpu_fix_recode.append(
            [req_idx, start_idx, end_idx])
        runner.input_batch.num_tokens_no_spec[req_idx] = end_idx
        runner.input_batch.num_tokens[req_idx] = end_idx
        req_id = runner.input_batch.req_ids[req_idx]
        if req_id in runner.requests:
            req_state = runner.requests[req_id]
            v1_zero_overhead.last_sampled_req_ids.append(req_id)
            # Index in output_token_ids where this step's first id lands.
            v1_zero_overhead.last_sampled_token_lens.append(
                len(req_state.output_token_ids))
            req_state.output_token_ids.extend(sampled_ids)

    if not runner.speculative_config:
        # Speculative decoding is not enabled.
        spec_token_ids = None
    else:
        spec_token_ids = runner.propose_draft_token_ids(
            scheduler_output,
            valid_sampled_token_ids,
            sampling_metadata,
            hidden_states,
            sample_hidden_states,
            aux_hidden_states,
            spec_decode_metadata,
            attn_metadata,
        )

    # Clear KVConnector state after all KVs are generated.
    if has_kv_transfer_group():
        get_kv_transfer_group().clear_connector_metadata()

    runner.eplb_step()

    model_output = ZeroV1ModelRunnerOutput(
        req_ids=runner.input_batch.req_ids,
        req_id_to_index=runner.input_batch.req_id_to_index,
        sampled_token_ids=valid_sampled_token_ids,
        spec_token_ids=spec_token_ids,
        logprobs=logprobs_lists,
        prompt_logprobs_dict=prompt_logprobs_dict,
        pooler_output=[],
        finished_sending=finished_sending,
        finished_recving=finished_recving,
        num_nans_in_logits=num_nans_in_logits,
        fix_req_ids=fix_req_ids,
        fix_sampled_token_ids=fix_sampled_token_ids
    )
    return model_output
\ No newline at end of file
from dataclasses import dataclass
from vllm.v1.outputs import ModelRunnerOutput
@dataclass
class ZeroV1ModelRunnerOutput(ModelRunnerOutput):
    """`ModelRunnerOutput` extended with two optional "fix" fields.

    Both fields default to None, so consumers must handle the None case
    before iterating over them.
    """
    # [num_reqs]
    # Request ids that `fix_sampled_token_ids` refers to.
    fix_req_ids: list[str] = None
    # Token ids paired with `fix_req_ids`; consumers index this list with the
    # position of the request id in `fix_req_ids`.
    fix_sampled_token_ids:list[list[int]] = None
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment