Commit b78549c2 authored by lizhigong
Browse files

debug zero overhead

parent 7d867671
......@@ -14,7 +14,7 @@ from vllm.attention.backends.abstract import AttentionType
from vllm.multimodal import MultiModalPlaceholderMap
from vllm.utils import async_tensor_h2d, make_tensor_with_pad
from vllm.profiler.prof import profile
if TYPE_CHECKING:
from vllm.worker.model_runner_base import ModelRunnerBase
......@@ -235,8 +235,10 @@ class CommonMetadataBuilder(AttentionMetadataBuilder[TAttentionMetadata]):
for i, block_table in enumerate(self.block_tables):
if block_table:
input_block_tables[i, :len(block_table)] = block_table
block_tables = torch.from_numpy(input_block_tables).to(
device, non_blocking=True)
# block_tables = torch.from_numpy(input_block_tables).to(
# device, non_blocking=True)
block_tables = async_tensor_h2d(input_block_tables.tolist(), torch.int32,
device, self.runner.pin_memory)
else:
block_tables = make_tensor_with_pad(
self.block_tables,
......@@ -245,7 +247,6 @@ class CommonMetadataBuilder(AttentionMetadataBuilder[TAttentionMetadata]):
device=device,
)
assert max_query_len > 0, "query_lens: {}".format(query_lens)
assert device is not None
context_lens_tensor = async_tensor_h2d(self.context_lens, torch.int,
device, self.runner.pin_memory)
......
......@@ -412,6 +412,8 @@ class LLMEngine:
self.zero_overhead = os.environ.get('VLLM_ZERO_OVERHEAD') == '1'
self.step_switch = 0 # 0 step A 1 step B
self.output_recorder = [None, None]
self.async_d2h = None
self.async_event = torch.cuda.Event(enable_timing=False)
profile.StartTracer()
def _initialize_kv_caches(self) -> None:
......@@ -1238,9 +1240,11 @@ class LLMEngine:
seq_group_metadata_list: List[SequenceGroupMetadata],
scheduled_seq_groups: List[ScheduledSequenceGroup]) -> None:
sample_out_list = output[0].sampler_out_tenosr.cpu().tolist()
for seq_group_metadata, sequence_group_outputs, scheduled_seq_group, token_id in \
zip(seq_group_metadata_list, output[0], scheduled_seq_groups, sample_out_list):
#sample_out_list = output[0].sampler_out_tenosr.cpu().tolist()
sample_out_list = self.async_d2h.tolist()
sample_out_ids = output[0].sampler_out_ids.tolist()
for seq_group_metadata, sequence_group_outputs, scheduled_seq_group in \
zip(seq_group_metadata_list, output[0], scheduled_seq_groups):
seq_group = scheduled_seq_group.seq_group
if seq_group.is_finished():
......@@ -1251,8 +1255,11 @@ class LLMEngine:
assert len(seq_group.seqs) == 1
seq = seq_group.seqs[0]
for token_id, seq_id in zip(sample_out_list, sample_out_ids):
if seq.seq_id == seq_id:
sample.output_token = token_id[0]
seq.fix_last_token_id(sample.output_token)
break
def _advance_to_next_step(
self, output: List[SamplerOutput],
......@@ -1406,15 +1413,15 @@ class LLMEngine:
assert seq_group_metadata_list is not None
assert scheduler_outputs is not None
profile.ProfRangeAutoPush('execute_model')
last_outputs_ids = None
last_outputs_tensor = None
if self.zero_overhead:
recode_output = self.output_recorder[self.step_switch]
recode_output = self.output_recorder[1 - self.step_switch]
if recode_output is not None:
last_output = recode_output[0][0]
last_outputs_ids, last_outputs_tensor = last_output.sampler_out_ids, last_output.sampler_out_tenosr
self.output_recorder[self.step_switch] = None # only use for once
self.async_d2h = last_outputs_tensor.to('cpu', non_blocking=True)
self.async_event.record()
if not scheduler_outputs.is_empty():
# Check if we have a cached last_output from the previous iteration.
......@@ -1441,9 +1448,10 @@ class LLMEngine:
execute_model_req.async_callback = self.async_callbacks[
virtual_engine]
profile.ProfRangeAutoPush('model_executor')
outputs = self.model_executor.execute_model(
execute_model_req=execute_model_req)
print('###outputs', outputs)
profile.ProfRangeAutoPush('end_executor')
# We need to do this here so that last step's sampled_token_ids can
# be passed to the next iteration for PP.
if self.scheduler_config.is_multi_step:
......@@ -1468,8 +1476,10 @@ class LLMEngine:
if recode_output is None:
return None
outputs, seq_group_metadata_list, scheduler_outputs = self.output_recorder[self.step_switch]
self.output_recorder[self.step_switch] = None # only use for once
ctx.seq_group_metadata_list = seq_group_metadata_list
ctx.scheduler_outputs = scheduler_outputs
self.async_event.synchronize()
self._fix_last_step(
outputs, seq_group_metadata_list,
scheduler_outputs.scheduled_seq_groups)
......
......@@ -1388,7 +1388,7 @@ class LLM:
total_out_toks = 0
while self.llm_engine.has_unfinished_requests():
step_outputs = self.llm_engine.step()
print('###step_outputs', step_outputs)
#print('###step_outputs', step_outputs)
if step_outputs is None:
continue
for output in step_outputs:
......
......@@ -24,6 +24,5 @@ def _update_input_tokens(
tl.store(input_tokens + pid, output_token)
def UpdateInputTokens(input_tokens, input_seq_ids, last_sample, last_ids):
last_ids = last_ids.to('cuda')
grid = [input_seq_ids.shape[0], 1, 1]
_update_input_tokens[grid](last_sample, last_ids, input_tokens, input_seq_ids, last_ids.shape[0], input_seq_ids.shape[0])
\ No newline at end of file
......@@ -32,6 +32,7 @@ if envs.VLLM_USE_FLASHINFER_SAMPLER and find_spec("flashinfer"):
# yapf: enable
else:
flashinfer_top_k_top_p_sampling = None
from vllm.profiler.prof import profile
def get_sampler() -> torch.nn.Module:
......@@ -266,6 +267,7 @@ class Sampler(nn.Module):
logits: (num_tokens, vocab_size).
sampling_metadata: Metadata for sampling.
"""
profile.ProfRangeAutoPush('sampler_forward')
assert logits is not None
_, vocab_size = logits.shape
......@@ -278,6 +280,7 @@ class Sampler(nn.Module):
# reuse sampling tensors, since "output_tokens" changes
# between decode runs.
self._init_sampling_tensors(logits, sampling_metadata)
profile.ProfRangeAutoPush('sampler1')
assert self._sampling_tensors is not None
sampling_tensors = self._sampling_tensors
......@@ -1292,9 +1295,6 @@ def _build_sampler_output(
sampled_token_probs, logprobs_tensor, sampled_token_ids = (None, None,
None)
if d2d_data.zero_overhead:
pass
return SamplerOutput(
outputs=sampler_output,
sampled_token_probs=sampled_token_probs,
......
......@@ -11,6 +11,7 @@ from vllm.sequence import (VLLM_TOKEN_ID_ARRAY_TYPE, SequenceData,
SequenceGroupMetadata)
from vllm.utils import (PyObjectCache, async_tensor_h2d,
is_pin_memory_available, make_tensor_with_pad)
from vllm.profiler.prof import profile
_SAMPLING_EPS = 1e-5
......@@ -511,10 +512,10 @@ class SamplingTensors:
) -> "SamplingTensors":
# Note that the performance will be very bad without
# pinned memory.
profile.ProfRangeAutoPush('from_lists')
pin_memory = is_pin_memory_available()
do_penalties = prompt_tokens or output_tokens
if do_penalties:
prompt_t = make_tensor_with_pad(
prompt_tokens,
......@@ -534,7 +535,7 @@ class SamplingTensors:
empty_tensor = torch.empty(0, device=device, dtype=torch.long)
prompt_t = empty_tensor
output_t = empty_tensor
profile.ProfRangeAutoPush('from_lists1')
temperatures_t = torch.tensor(
temperatures,
device="cpu",
......@@ -580,6 +581,7 @@ class SamplingTensors:
# Because the memory is pinned, we can do non-blocking
# transfer to device.
profile.ProfRangeAutoPush('from_lists2')
return cls(
temperatures=temperatures_t.to(device=device, non_blocking=True),
top_ps=top_ps_t.to(device=device, non_blocking=True),
......
......@@ -841,6 +841,7 @@ class ModelInputForGPUBuilder(ModelRunnerInputBuilderBase[ModelInputForGPU]):
# Combine and flatten intermediate data.
input_tokens = []
token_types = []
profile.ProfRangeAutoPush('build')
for inter_data in self.inter_data_list:
for cur_input_tokens in inter_data.input_tokens:
input_tokens.extend(cur_input_tokens)
......@@ -917,12 +918,13 @@ class ModelInputForGPUBuilder(ModelRunnerInputBuilderBase[ModelInputForGPU]):
self.runner.pin_memory)
if self.zero_overhead and self.last_sample_tensor is not None:
input_ids = torch.tensor(self.req_ids, device='cuda')
UpdateInputTokens(input_tokens_tensor, input_ids, self.last_sample_tensor, self.last_sample_ids)
print('####input_tokens_tensor', input_tokens_tensor)
print('####input_ids', input_ids)
print('####self.last_sample_tensor', self.last_sample_tensor)
print('####self.last_sample_ids', self.last_sample_ids)
input_ids = async_tensor_h2d(self.req_ids, torch.long,
self.runner.device,
self.runner.pin_memory)
last_ids = async_tensor_h2d(self.last_sample_ids.tolist(), torch.long,
self.runner.device,
self.runner.pin_memory)
UpdateInputTokens(input_tokens_tensor, input_ids, self.last_sample_tensor, last_ids)
token_types_tensor = async_tensor_h2d(token_types, torch.long,
self.runner.device,
......@@ -1004,6 +1006,7 @@ class ModelInputForGPUBuilder(ModelRunnerInputBuilderBase[ModelInputForGPU]):
]
multi_modal_kwargs = MultiModalKwargs.batch(multi_modal_kwargs_list)
profile.ProfRangeAutoPush('build_end')
return self.model_input_cls(
input_tokens=input_tokens_tensor,
input_positions=input_positions_tensor,
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment