Commit 6b7cdbf4 authored by 王敏
Browse files

优化pp+mtp代码

parent 19f117d8
......@@ -266,6 +266,7 @@ class DeepSeekMTP(nn.Module, DeepseekV2MixtureOfExperts, SupportsPP):
os.environ['LLAMA_NN'] = '0'
os.environ['LM_NN'] = '0'
self.use_llama_nn = os.environ.get('LLAMA_NN') == '1'
self.use_pp = vllm_config.parallel_config.pipeline_parallel_size > 1
def set_moe_parameters(self):
......@@ -351,8 +352,8 @@ class DeepSeekMTP(nn.Module, DeepseekV2MixtureOfExperts, SupportsPP):
spec_layer = get_spec_layer_idx_from_weight_name(self.config, name)
if spec_layer is None:
# load embed_tokens weight from target model if mtp weights missing embed_tokens
if "embed_tokens" in name:
# load embed_tokens weight from target model in pp mode
if "embed_tokens" in name and self.use_pp:
for local_name in params_dict.keys():
if "embed_tokens" in local_name:
param = params_dict[local_name]
......
......@@ -1657,11 +1657,10 @@ class Scheduler(SchedulerInterface):
for idx, req in enumerate(itertools.chain(running_reqs, resumed_reqs)):
req_id = req.request_id
req_ids.append(req_id)
#if self.use_pp:
# NOTE: In PP+async scheduling, we consume token ids via a direct GPU
# broadcast path (`input_batch.prev_sampled_token_ids`), so we can
# omit this payload.
if self.use_pp and not self.scheduler_config.async_scheduling:
if self.use_pp:
# When using PP, the scheduler sends the sampled tokens back,
# because there's no direct communication between the first-
# stage worker and the last-stage worker. Otherwise, we don't
......
......@@ -931,7 +931,7 @@ class GPUModelRunner(
The SamplingMetadata is updated and copied to the GPU if there is a
new/resumed/paused/finished request in the batch.
"""
if scheduler_output.total_num_scheduled_tokens == 0:
if scheduler_output.total_num_scheduled_tokens == 0 and not self.use_async_scheduling:
return
# Remove finished requests from the cached states.
for req_id in scheduler_output.finished_req_ids:
......@@ -1092,13 +1092,9 @@ class GPUModelRunner(
req_state.num_computed_tokens = num_computed_tokens
if not is_last_rank:
if not req_data.new_token_ids:
# Async scheduled PP: Sampled tokens propagated via GPU broadcast.
new_token_ids: list[int] = []
else:
# Non-async scheduling with PP: The scheduler sends
# sampled token ids back because there's no direct communication
# between the first-stage worker and the last-stage worker.
# When using PP, the scheduler sends the sampled tokens back,
# because there's no direct communication between the first-
# stage worker and the last-stage worker.
new_token_ids = req_data.new_token_ids[i]
# Add the sampled token(s) from the previous step (if any).
# This doesn't include "unverified" tokens like spec tokens.
......@@ -1109,9 +1105,7 @@ class GPUModelRunner(
# Avoid slicing list in most common case.
req_state.output_token_ids.append(new_token_ids[-1])
elif num_new_tokens > 0:
req_state.output_token_ids.extend(
new_token_ids[-num_new_tokens:]
)
req_state.output_token_ids.extend(new_token_ids[-num_new_tokens:])
elif num_output_tokens < len(req_state.output_token_ids):
# Some output tokens were discarded due to a sync-KV-load
# failure. Align the cached state.
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment