Commit 3f8b2afe authored by lizhigong's avatar lizhigong
Browse files

merge and debug tbo on 0.9.2

parent 20e75ed6
This diff is collapsed.
from typing import Any, Optional, Union
import numpy as np
import torch
from vllm import envs
from vllm.distributed.kv_transfer.kv_transfer_state import get_kv_transfer_group, has_kv_transfer_group
from vllm.distributed.parallel_state import get_pp_group, get_tp_group
from vllm.forward_context import set_forward_context
from vllm.sequence import IntermediateTensors
from vllm.two_batch_overlap.v1.two_batch_overlap_v1 import tbo_model_executable_v1
from vllm.utils import async_tensor_h2d
from vllm.v1.attention.backends.utils import CommonAttentionMetadata
from vllm.v1.core.sched.output import CachedRequestData, SchedulerOutput
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.spec_decode.metadata import SpecDecodeMetadata
from vllm.v1.worker.block_table import BlockTable
class TBOModelInputSplit():
def __init__(self):
self.req_ids_left = []
self.req_ids_right = []
self.req_num_left = 0
self.req_num_right = 0
self.scheduler_output_left = None
self.scheduler_output_right = None
input_split = TBOModelInputSplit()
def split_scheduler_output(runner, scheduler_output:SchedulerOutput):
split_tokens = scheduler_output.total_num_scheduled_tokens // 2
req_ids = runner.input_batch.req_ids
tokens_counter = 0
min_idx = -1
min_counter = 0
for i, id in enumerate(req_ids):
tokens_counter += scheduler_output.num_scheduled_tokens[id]
diff = abs(tokens_counter - split_tokens)
if min_idx == -1 or diff < min_counter:
min_idx = i
min_counter = diff
if tokens_counter > split_tokens or diff == 0:
break
input_split.req_num_left = min_idx + 1
if input_split.req_num_left == len(req_ids):
input_split.req_num_left = input_split.req_num_left - 1
input_split.req_ids_left = req_ids[:input_split.req_num_left]
input_split.req_ids_right = req_ids[input_split.req_num_left:]
input_split.req_num_right = len(req_ids) - input_split.req_num_left
new_req_data_left = []
new_req_data_right = []
cached_reqs_left = []
cached_reqs_right = []
num_scheduled_tokens_left = {}
num_scheduled_tokens_right = {}
total_num_scheduled_tokens_left = 0
total_num_scheduled_tokens_right = 0
for new_req in scheduler_output.scheduled_new_reqs:
if new_req.req_id in input_split.req_ids_left:
new_req_data_left.append(new_req)
else:
new_req_data_right.append(new_req)
#print('###scheduler_output.scheduled_cached_reqs', scheduler_output.scheduled_cached_reqs)
cached_reqs_left = CachedRequestData.make_empty()
cached_reqs_right = CachedRequestData.make_empty()
for req_idx, req_id in enumerate(scheduler_output.scheduled_cached_reqs.req_ids):
if req_id in input_split.req_ids_left:
cached_reqs_left.req_ids.append(req_id)
cached_reqs_left.resumed_from_preemption.append(scheduler_output.scheduled_cached_reqs.resumed_from_preemption[req_idx])
if len(scheduler_output.scheduled_cached_reqs.new_token_ids) > 0:
cached_reqs_left.new_token_ids.append(scheduler_output.scheduled_cached_reqs.new_token_ids[req_idx])
cached_reqs_left.new_block_ids.append(scheduler_output.scheduled_cached_reqs.new_block_ids[req_idx])
cached_reqs_left.num_computed_tokens.append(scheduler_output.scheduled_cached_reqs.num_computed_tokens[req_idx])
else:
cached_reqs_right.req_ids.append(req_id)
cached_reqs_right.resumed_from_preemption.append(scheduler_output.scheduled_cached_reqs.resumed_from_preemption[req_idx])
if len(scheduler_output.scheduled_cached_reqs.new_token_ids) > 0:
cached_reqs_right.new_token_ids.append(scheduler_output.scheduled_cached_reqs.new_token_ids[req_idx])
cached_reqs_right.new_block_ids.append(scheduler_output.scheduled_cached_reqs.new_block_ids[req_idx])
cached_reqs_right.num_computed_tokens.append(scheduler_output.scheduled_cached_reqs.num_computed_tokens[req_idx])
for key, value in scheduler_output.num_scheduled_tokens.items():
if key in input_split.req_ids_left:
num_scheduled_tokens_left[key] = value
total_num_scheduled_tokens_left += value
else:
num_scheduled_tokens_right[key] = value
total_num_scheduled_tokens_right += value
input_split.scheduler_output_left = SchedulerOutput(
scheduled_new_reqs=new_req_data_left,
scheduled_cached_reqs=cached_reqs_left,
num_scheduled_tokens=num_scheduled_tokens_left,
total_num_scheduled_tokens=total_num_scheduled_tokens_left,
scheduled_spec_decode_tokens=scheduler_output.scheduled_spec_decode_tokens,
scheduled_encoder_inputs=scheduler_output.scheduled_encoder_inputs, ##unsupport yet
num_common_prefix_blocks=scheduler_output.num_common_prefix_blocks,
# finished_req_ids is an existing state in the scheduler,
# instead of being newly scheduled in this step.
# It contains the request IDs that are finished in between
# the previous and the current steps.
finished_req_ids=scheduler_output.finished_req_ids,
free_encoder_input_ids=scheduler_output.free_encoder_input_ids,
structured_output_request_ids=scheduler_output.structured_output_request_ids,
grammar_bitmask=scheduler_output.grammar_bitmask,
)
input_split.scheduler_output_right = SchedulerOutput(
scheduled_new_reqs=new_req_data_right,
scheduled_cached_reqs=cached_reqs_right,
num_scheduled_tokens=num_scheduled_tokens_right,
total_num_scheduled_tokens=total_num_scheduled_tokens_right,
scheduled_spec_decode_tokens=scheduler_output.scheduled_spec_decode_tokens,
scheduled_encoder_inputs=scheduler_output.scheduled_encoder_inputs, ##unsupport yet
num_common_prefix_blocks=scheduler_output.num_common_prefix_blocks,
# finished_req_ids is an existing state in the scheduler,
# instead of being newly scheduled in this step.
# It contains the request IDs that are finished in between
# the previous and the current steps.
finished_req_ids=scheduler_output.finished_req_ids,
free_encoder_input_ids=scheduler_output.free_encoder_input_ids,
structured_output_request_ids=scheduler_output.structured_output_request_ids,
grammar_bitmask=scheduler_output.grammar_bitmask,
)
def prepare_tbo_atten_metadata(
runner,
scheduler_output: "SchedulerOutput",
req_ids,
req_offset
) -> tuple[dict[str, Any], torch.Tensor, Optional[SpecDecodeMetadata]]:
total_num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
assert total_num_scheduled_tokens > 0
num_reqs = len(req_ids)
assert num_reqs > 0
seq_len_offset = req_offset
if req_offset == 0: #left
query_start_offset = 0
else:
query_start_offset = req_offset + 1
# Get the number of scheduled tokens for each request.
tokens = [scheduler_output.num_scheduled_tokens[i] for i in req_ids]
num_scheduled_tokens = np.array(tokens, dtype=np.int32)
max_num_scheduled_tokens = max(tokens)
# Get request indices.
# E.g., [2, 5, 3] -> [0, 0, 1, 1, 1, 1, 1, 2, 2, 2]
req_indices = np.repeat(runner.arange_np[:num_reqs],
num_scheduled_tokens) + req_offset
# cu_num_tokens: [2, 5, 3] -> [2, 7, 10]
# arange: [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
cu_num_tokens, arange = runner._get_cumsum_and_arange(
num_scheduled_tokens)
# Get positions.
positions_np = runner.positions_np[:total_num_scheduled_tokens]
np.add(runner.input_batch.num_computed_tokens_cpu[req_indices],
arange,
out=positions_np)
# Calculate the slot mapping for each KV cache group.
for kv_cache_group_id, kv_cache_group_spec in enumerate(
runner.kv_cache_config.kv_cache_groups):
block_size = kv_cache_group_spec.kv_cache_spec.block_size
block_table: BlockTable = runner.input_batch.block_table[
kv_cache_group_id]
# E.g., [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
# -> [0, 0, K, K, K + 1, K + 1, K + 2, 2 * K, 2 * K, 2 * K + 1]
# where K is the max_num_blocks_per_req and the block size is 2.
# NOTE(woosuk): We can't simply use `token_indices // block_size`
# here because M (max_model_len) is not necessarily divisible by
# block_size.
block_table_indices = (
req_indices * block_table.max_num_blocks_per_req +
positions_np // block_size)
block_table_cpu = block_table.get_cpu_tensor()
block_numbers = block_table_cpu.flatten(
)[block_table_indices].numpy()
block_offsets = positions_np % block_size
np.add(
block_numbers * block_size,
block_offsets,
out=block_table.slot_mapping_np[:total_num_scheduled_tokens])
# Prepare the attention metadata.
runner.query_start_loc_np[0] = 0
runner.query_start_loc_np[1:num_reqs + 1] = cu_num_tokens
runner.seq_lens_np[:num_reqs] = (
runner.input_batch.num_computed_tokens_cpu[req_offset : req_offset + num_reqs] +
num_scheduled_tokens)
runner.query_start_loc[query_start_offset: query_start_offset + num_reqs + 1].copy_(
runner.query_start_loc_cpu[:num_reqs + 1], non_blocking=True)
# Note: pad query_start_loc to be non-decreasing, as kernels
# like FlashAttention requires that
if req_offset > 0: #right
runner.query_start_loc[query_start_offset + num_reqs + 1:].fill_(
runner.query_start_loc_cpu[num_reqs].item())
runner.seq_lens[seq_len_offset :seq_len_offset + num_reqs].copy_(runner.seq_lens_cpu[:num_reqs],
non_blocking=True)
# Fill unused with -1. Needed for reshape_and_cache
if req_offset > 0: #right
runner.seq_lens[seq_len_offset + num_reqs:].fill_(0)
query_start_loc = runner.query_start_loc[query_start_offset: query_start_offset + num_reqs + 1]
seq_lens = runner.seq_lens[seq_len_offset : seq_len_offset + num_reqs]
common_attn_metadata = CommonAttentionMetadata(
query_start_loc=query_start_loc,
seq_lens=seq_lens,
num_reqs=num_reqs,
num_actual_tokens=total_num_scheduled_tokens,
max_query_len=max_num_scheduled_tokens)
attn_metadata: dict[str, Any] = {}
# Prepare the attention metadata for each KV cache group and make layers
# in the same group share the same metadata.
for kv_cache_group_id, kv_cache_group_spec in enumerate(
runner.kv_cache_config.kv_cache_groups):
# Prepare for cascade attention if enabled & beneficial.
common_prefix_len = 0
if runner.cascade_attn_enabled:
common_prefix_len = runner._compute_cascade_attn_prefix_len(
num_scheduled_tokens,
scheduler_output.
num_common_prefix_blocks[kv_cache_group_id],
kv_cache_group_spec.kv_cache_spec,
runner.attn_metadata_builders[kv_cache_group_id],
)
if req_offset > 0:
origin_block_table = runner.attn_metadata_builders[kv_cache_group_id].block_table.block_table
runner.attn_metadata_builders[kv_cache_group_id].block_table.block_table = origin_block_table[req_offset:, :]
origin_slot_mapping = runner.attn_metadata_builders[kv_cache_group_id].block_table.slot_mapping
runner.attn_metadata_builders[kv_cache_group_id].block_table.slot_mapping = \
origin_slot_mapping[input_split.scheduler_output_left.total_num_scheduled_tokens:]
attn_metadata_i = (
runner.attn_metadata_builders[kv_cache_group_id].build(
common_prefix_len=common_prefix_len,
common_attn_metadata=common_attn_metadata)) # maybe FlashAttentionMetadata
if req_offset > 0:
runner.attn_metadata_builders[kv_cache_group_id].block_table.block_table = origin_block_table
runner.attn_metadata_builders[kv_cache_group_id].block_table.slot_mapping = origin_slot_mapping
for layer_name in kv_cache_group_spec.layer_names:
attn_metadata[layer_name] = attn_metadata_i
return attn_metadata
def pad_num_input_tokens(self, scheduler_output):
num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
if (self.use_cuda_graph
and num_scheduled_tokens <= self.cudagraph_batch_sizes[-1]):
# Use piecewise CUDA graphs.
# Add padding to the batch size.
num_input_tokens = self.vllm_config.pad_for_cudagraph(
num_scheduled_tokens)
else:
# Eager mode.
# Pad tokens to multiple of tensor_parallel_size when
# enabled collective fusion for SP
tp_size = self.vllm_config.parallel_config.tensor_parallel_size
if self.vllm_config.compilation_config.pass_config. \
enable_sequence_parallelism and tp_size > 1:
from vllm.utils import round_up
num_input_tokens = round_up(num_scheduled_tokens, tp_size)
else:
num_input_tokens = num_scheduled_tokens
# Padding for DP
num_pad, num_tokens_across_dp = self.get_dp_padding(num_input_tokens)
num_input_tokens += num_pad
return num_input_tokens, num_tokens_across_dp
def tbo_split_and_execute_model(
runner,
attn_metadata,
num_input_tokens,
num_tokens_across_dp,
input_ids,
positions,
inputs_embeds,
scheduler_output: "SchedulerOutput",
intermediate_tensors: Optional[IntermediateTensors] = None,
) -> Union[ModelRunnerOutput, IntermediateTensors]:
use_tbo = False
if len(scheduler_output.num_scheduled_tokens) > 1:
split_scheduler_output(runner, scheduler_output)
if input_split.scheduler_output_left.total_num_scheduled_tokens >= envs.VLLM_TBO_MIN_TOKENS and \
input_split.scheduler_output_right.total_num_scheduled_tokens >= envs.VLLM_TBO_MIN_TOKENS:
use_tbo = True
if use_tbo:
num_input_tokens_left = input_split.scheduler_output_left.total_num_scheduled_tokens
num_input_tokens_right = num_input_tokens - num_input_tokens_left
attn_metadata_left = prepare_tbo_atten_metadata(runner, input_split.scheduler_output_left, input_split.req_ids_left, 0)
attn_metadata_right = prepare_tbo_atten_metadata(runner, input_split.scheduler_output_right, input_split.req_ids_right, input_split.req_num_left)
model_output = tbo_model_executable_v1(
runner,
attn_metadata_left,
attn_metadata_right,
num_input_tokens_left,
num_input_tokens_right,
num_tokens_across_dp,
input_ids,
positions,
intermediate_tensors,
inputs_embeds)
finished_sending, finished_recving = None, None
else:
# Run the decoder.
# Use persistent buffers for CUDA graphs.
with set_forward_context(attn_metadata,
runner.vllm_config,
num_tokens=num_input_tokens,
num_tokens_across_dp=num_tokens_across_dp):
runner.maybe_setup_kv_connector(scheduler_output)
model_output = runner.model(
input_ids=input_ids,
positions=positions,
intermediate_tensors=intermediate_tensors,
inputs_embeds=inputs_embeds,
)
runner.maybe_wait_for_kv_save()
finished_sending, finished_recving = (
runner.get_finished_kv_transfers(scheduler_output))
return model_output, finished_sending, finished_recving
\ No newline at end of file
...@@ -68,6 +68,7 @@ from vllm.v1.worker.block_table import BlockTable ...@@ -68,6 +68,7 @@ from vllm.v1.worker.block_table import BlockTable
from vllm.v1.worker.gpu_input_batch import CachedRequestState, InputBatch from vllm.v1.worker.gpu_input_batch import CachedRequestState, InputBatch
from vllm.v1.worker.lora_model_runner_mixin import LoRAModelRunnerMixin from vllm.v1.worker.lora_model_runner_mixin import LoRAModelRunnerMixin
from vllm.platforms import current_platform from vllm.platforms import current_platform
from vllm.two_batch_overlap.v1.model_input_split_v1 import tbo_split_and_execute_model
from ..sample.logits_processor import LogitsProcessorManager from ..sample.logits_processor import LogitsProcessorManager
from .utils import (gather_mm_placeholders, initialize_kv_cache_for_kv_sharing, from .utils import (gather_mm_placeholders, initialize_kv_cache_for_kv_sharing,
...@@ -1361,6 +1362,12 @@ class GPUModelRunner(LoRAModelRunnerMixin): ...@@ -1361,6 +1362,12 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# compiled with full CUDA graphs, we have to skip them entirely. # compiled with full CUDA graphs, we have to skip them entirely.
skip_cuda_graphs = self.full_cuda_graph and not attention_cuda_graphs skip_cuda_graphs = self.full_cuda_graph and not attention_cuda_graphs
if envs.VLLM_ENABLE_TBO and not self.use_cuda_graph:
model_output, finished_sending, finished_recving = \
tbo_split_and_execute_model(self, attn_metadata, num_input_tokens,
num_tokens_across_dp, input_ids, positions,
inputs_embeds, scheduler_output, intermediate_tensors)
else:
# Run the model. # Run the model.
# Use persistent buffers for CUDA graphs. # Use persistent buffers for CUDA graphs.
with set_forward_context( with set_forward_context(
......
...@@ -22,7 +22,6 @@ from vllm.lora.request import LoRARequest ...@@ -22,7 +22,6 @@ from vllm.lora.request import LoRARequest
from vllm.model_executor import set_random_seed from vllm.model_executor import set_random_seed
from vllm.platforms import current_platform from vllm.platforms import current_platform
from vllm.sequence import IntermediateTensors from vllm.sequence import IntermediateTensors
from vllm.two_batch_overlap.v1.gpu_model_runner import TBO_GPUModelRunner
from vllm.utils import GiB_bytes, MemorySnapshot, memory_profiling from vllm.utils import GiB_bytes, MemorySnapshot, memory_profiling
from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec
from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.outputs import ModelRunnerOutput
...@@ -163,10 +162,6 @@ class Worker(WorkerBase): ...@@ -163,10 +162,6 @@ class Worker(WorkerBase):
set_random_seed(self.model_config.seed) set_random_seed(self.model_config.seed)
# Construct the model runner # Construct the model runner
if envs.VLLM_ENABLE_TBO:
self.model_runner: TBO_GPUModelRunner = TBO_GPUModelRunner(
self.vllm_config, self.device)
else:
self.model_runner: GPUModelRunner = GPUModelRunner( self.model_runner: GPUModelRunner = GPUModelRunner(
self.vllm_config, self.device) self.vllm_config, self.device)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment