"vllm/vscode:/vscode.git/clone" did not exist on "b40f2ffccc19db2e7186d7dbade5ed507a4c9115"
Commit 8bfc2d65 authored by yangql's avatar yangql
Browse files

Merge branch 'v0.9.2-dev-ds-yql_auto' into v0.9.2-dev-ds_auto_21

parents 8813afd8 094f1299
......@@ -194,6 +194,7 @@ class DeepEPHTAll2AllManager(DeepEPAll2AllManagerBase):
num_rdma_bytes=num_rdma_bytes,
low_latency_mode=False,
num_qps_per_rank=num_qps_per_rank,
allow_mnnvl=envs.VLLM_ALLOW_MNNVL,
explicitly_destroy=False)
def get_handle(self, kwargs):
......@@ -275,3 +276,56 @@ class DeepEPLLAll2AllManager(DeepEPAll2AllManagerBase):
# in get_or_create must be updated.
handle.set_num_sms(self.num_sms)
return handle
class DeepEPAutoAll2AllManager(All2AllManagerBase):
"""
Simplified auto manager that always builds handles through the
low-latency DeepEP manager. This avoids creating multiple buffer
instances and mirrors the sglang behavior of relying on LL buffers.
"""
def __init__(self, cpu_group):
super().__init__(cpu_group)
self.ll_manager = DeepEPLLAll2AllManager(cpu_group)
self.ht_manager = DeepEPHTAll2AllManager(cpu_group)
def get_handle(self, kwargs):
"""
Build a DeepEP Buffer using LL args but sized to the larger of HT/LL
requirements (max of num_nvl_bytes/num_rdma_bytes).
"""
import deep_ep
kwargs = dict(kwargs)
# Build canonical kwargs for each path.
ll_kwargs = self.ll_manager._make_all2all_kwargs(**kwargs)
ht_kwargs = self.ht_manager._make_all2all_kwargs()
# Take the max for buffer sizes to be compatible with both modes.
merged_kwargs = dict(ll_kwargs)
merged_kwargs["num_nvl_bytes"] = max(ll_kwargs["num_nvl_bytes"],
ht_kwargs["num_nvl_bytes"])
merged_kwargs["num_rdma_bytes"] = max(ll_kwargs["num_rdma_bytes"],
ht_kwargs["num_rdma_bytes"])
logger.debug("DeepEP auto merged args %s", merged_kwargs)
handle: deep_ep.Buffer = self.ll_manager.handle_cache.get_or_create(
merged_kwargs, deep_ep.Buffer)
handle.set_num_sms(self.ll_manager.num_sms)
return handle
def dispatch(self, hidden_states: torch.Tensor,
router_logits: torch.Tensor):
raise NotImplementedError(
"DeepEPAutoAll2AllManager does not support dispatch directly; "
"use the underlying HT/LL managers.")
def combine(self, hidden_states: torch.Tensor) -> torch.Tensor:
raise NotImplementedError(
"DeepEPAutoAll2AllManager does not support combine directly; "
"use the underlying HT/LL managers.")
def destroy(self):
self.ll_manager.destroy()
......@@ -87,6 +87,10 @@ class CudaCommunicator(DeviceCommunicatorBase):
from .all2all import DeepEPLLAll2AllManager
self.all2all_manager = DeepEPLLAll2AllManager(self.cpu_group)
logger.info("Using DeepEP Low-Latency all2all manager.")
elif all2all_backend == "deepep_auto":
from .all2all import DeepEPAutoAll2AllManager
self.all2all_manager = DeepEPAutoAll2AllManager(self.cpu_group)
logger.info("Using DeepEP Auto all2all manager.")
elif all2all_backend == "mori":
pass
else:
......
......@@ -128,6 +128,7 @@ if TYPE_CHECKING:
VLLM_NIXL_SIDE_CHANNEL_HOST: str = "localhost"
VLLM_NIXL_SIDE_CHANNEL_PORT: int = 5557
VLLM_ALL2ALL_BACKEND: str = "naive"
VLLM_MOE_HT_THRESHOLD: int = 128
VLLM_ALLOW_MNNVL: bool = False
VLLM_MAX_TOKENS_PER_EXPERT_FP4_MOE: int = 163840
VLLM_TOOL_PARSE_REGEX_TIMEOUT_SECONDS: int = 1
......@@ -954,6 +955,9 @@ environment_variables: dict[str, Callable[[], Any]] = {
"VLLM_ALL2ALL_BACKEND":
lambda: os.getenv("VLLM_ALL2ALL_BACKEND", "naive"),
# VLLM_MOE_HT_THRESHOLD
"VLLM_MOE_HT_THRESHOLD":
lambda: int(os.getenv("VLLM_MOE_HT_THRESHOLD", "128")),
# use ALLOW_MNNVL
"VLLM_ALLOW_MNNVL":
lambda: (os.environ.get("VLLM_ALLOW_MNNVL", "False").lower() in
......
......@@ -187,6 +187,11 @@ class FusedMoEParallelConfig:
return (self.use_all2all_kernels
and envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency")
@property
def use_deepep_auto_kernels(self):
return (self.use_all2all_kernels
and envs.VLLM_ALL2ALL_BACKEND == "deepep_auto")
@staticmethod
def make(tp_size_: int, dp_size_: int,
vllm_parallel_config: ParallelConfig) -> "FusedMoEParallelConfig":
......@@ -385,6 +390,10 @@ class FusedMoEConfig:
def use_deepep_ll_kernels(self):
return self.moe_parallel_config.use_deepep_ll_kernels
@property
def use_deepep_auto_kernels(self):
return self.moe_parallel_config.use_deepep_auto_kernels
@staticmethod
def make(
num_experts: int,
......
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from typing import Optional
import torch
import vllm.model_executor.layers.fused_moe.modular_kernel as mk
from vllm.model_executor.layers.fused_moe.config import FusedMoEQuantConfig
from vllm.forward_context import get_forward_context
class DeepEPAutoPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
"""
Auto Prepare/Finalize that wraps both DeepEP High-Throughput and
Low-Latency implementations and selects one based on prefill/decode phase.
"""
def __init__(self,
ht_prepare_finalize: mk.FusedMoEPrepareAndFinalize,
ll_prepare_finalize: mk.FusedMoEPrepareAndFinalize):
super().__init__()
self.ht_prepare_finalize = ht_prepare_finalize
self.ll_prepare_finalize = ll_prepare_finalize
self._current_phase = "decode" # default to decode (LL)
def _get_current_prepare_finalize(self) -> mk.FusedMoEPrepareAndFinalize:
"""Get the appropriate prepare_finalize based on current phase."""
# Try to infer phase from forward_context if available:
# try:
# forward_context = get_forward_context()
# attn_metadata = forward_context.attn_metadata
# # Handle both v0 (single AttentionMetadata) and v1 (dict) formats
# if isinstance(attn_metadata, dict):
# if attn_metadata:
# attn_metadata = next(iter(attn_metadata.values()))
# else:
# attn_metadata = None
# if attn_metadata is not None and hasattr(attn_metadata,
# "num_decode_tokens"):
# # 只根据 decode tokens 判定:有 decode -> decode,否则 prefill
# self._current_phase = ("decode"
# if attn_metadata.num_decode_tokens > 0
# else "prefill")
# except Exception:
# # If forward_context is not available, use stored phase
# pass
# Prefill uses HT, decode uses LL
if self._current_phase == "prefill":
#rint("************prefill***********")
return self.ll_prepare_finalize
else:
# print("attn_metadata.num_decode_tokens",attn_metadata.num_decode_tokens)
return self.ht__prepare_finalize
#return self.ht_prepare_finalize
@property
def activation_format(self) -> mk.FusedMoEActivationFormat:
pf = self._get_current_prepare_finalize()
try:
return pf.activation_format
except NotImplementedError:
# Fallback to standard format if underlying impl does not provide it.
return mk.FusedMoEActivationFormat.Standard
def topk_indices_dtype(self) -> Optional[torch.dtype]:
pf = self._get_current_prepare_finalize()
return pf.topk_indices_dtype()
def max_num_tokens_per_rank(self) -> Optional[int]:
pf = self._get_current_prepare_finalize()
return pf.max_num_tokens_per_rank()
def num_dispatchers(self) -> int:
pf = self._get_current_prepare_finalize()
return pf.num_dispatchers()
def prepare_async(
self,
a1: torch.Tensor,
a1_scale: Optional[torch.Tensor],
a2_scale: Optional[torch.Tensor],
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
num_experts: int,
expert_map: Optional[torch.Tensor],
apply_router_weight_on_input: bool,
quant_config: FusedMoEQuantConfig,
):
pf = self._get_current_prepare_finalize()
return pf.prepare_async(
a1, a1_scale, a2_scale, topk_weights, topk_ids,
num_experts, expert_map, apply_router_weight_on_input, quant_config)
def prepare(
self,
a1: torch.Tensor,
a1_scale: Optional[torch.Tensor],
a2_scale: Optional[torch.Tensor],
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
num_experts: int,
expert_map: Optional[torch.Tensor],
apply_router_weight_on_input: bool,
quant_config: FusedMoEQuantConfig,
) -> tuple[torch.Tensor, Optional[torch.Tensor], Optional[torch.Tensor],
Optional[torch.Tensor], Optional[torch.Tensor]]:
pf = self._get_current_prepare_finalize()
return pf.prepare(
a1, a1_scale, a2_scale, topk_weights, topk_ids,
num_experts, expert_map, apply_router_weight_on_input, quant_config)
def finalize(
self,
output: torch.Tensor,
fused_expert_output: torch.Tensor,
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
apply_router_weight_on_input: bool,
apply_weights_and_reduce: bool = True
) -> None:
pf = self._get_current_prepare_finalize()
return pf.finalize(
output, fused_expert_output, topk_weights, topk_ids,
apply_router_weight_on_input, apply_weights_and_reduce)
def finalize_async(
self,
output: torch.Tensor,
fused_expert_output: torch.Tensor,
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
apply_router_weight_on_input: bool,
apply_weights_and_reduce: bool = True
):
pf = self._get_current_prepare_finalize()
if hasattr(pf, "finalize_async"):
return pf.finalize_async(
output, fused_expert_output, topk_weights, topk_ids,
apply_router_weight_on_input, apply_weights_and_reduce)
return pf.finalize(
output, fused_expert_output, topk_weights, topk_ids,
apply_router_weight_on_input, apply_weights_and_reduce)
......@@ -55,6 +55,7 @@ if current_platform.is_cuda_alike():
from .deepep_ht_prepare_finalize import DeepEPHTPrepareAndFinalize
from .deepep_ll_prepare_finalize import (DEEPEP_QUANT_BLOCK_SHAPE,
DeepEPLLPrepareAndFinalize)
from .deepep_auto_prepare_finalize import DeepEPAutoPrepareAndFinalize
else:
fused_experts = None # type: ignore
FusedMoEPermuteExpertsUnpermute = None # type: ignore
......@@ -140,6 +141,62 @@ class FusedMoEMethodBase(QuantizeMethodBase):
num_local_experts=moe.num_local_experts,
num_dispatchers=num_dispatchers,
)
elif moe.use_deepep_auto_kernels:
# Initialize both HT and LL prepare_finalize but reuse the single
# LL handle for both (sglang-style single handle)
assert moe.dp_size == all2all_manager.dp_world_size
ll_all_to_all_args = dict(
max_num_tokens_per_dp_rank=moe.max_num_tokens,
token_hidden_size=moe.hidden_dim,
num_ep_ranks=all2all_manager.world_size,
num_global_experts=moe.num_experts,
num_local_experts=moe.num_experts //
all2all_manager.world_size,
)
ll_handle = all2all_manager.get_handle(ll_all_to_all_args)
# HT prepare/finalize built on the same LL handle per request
ht_prepare_finalize = DeepEPHTPrepareAndFinalize(
ll_handle,
num_dispatchers=all2all_manager.world_size,
dp_size=all2all_manager.dp_world_size,
rank_expert_offset=all2all_manager.rank *
moe.num_local_experts,
)
use_fp8_dispatch = (moe.quant_config is not None
and moe.quant_config.quant_dtype
== current_platform.fp8_dtype()
and moe.quant_config.block_shape
== DEEPEP_QUANT_BLOCK_SHAPE)
use_int8_dispatch = False
ll_prepare_finalize = DeepEPLLPrepareAndFinalize(
ll_handle,
max_tokens_per_rank=moe.max_num_tokens,
num_dispatchers=all2all_manager.world_size,
use_fp8_dispatch=use_fp8_dispatch,
use_int8_dispatch=use_int8_dispatch,
)
prepare_finalize = DeepEPAutoPrepareAndFinalize(
ht_prepare_finalize, ll_prepare_finalize)
experts_ht = self.select_gemm_impl(ht_prepare_finalize, moe)
experts_ll = self.select_gemm_impl(ll_prepare_finalize, moe)
self.topk_indices_dtype = ll_prepare_finalize.topk_indices_dtype()
self.fused_experts = DeepGemmDisabledFusedMoEModularKernel(
prepare_finalize,
experts_ll,
experts_ht=experts_ht,
experts_ll=experts_ll,
shared_experts=layer.shared_experts if hasattr(layer, "shared_experts") else None,
)
return
elif moe.use_deepep_ht_kernels:
assert moe.dp_size == all2all_manager.dp_world_size
......@@ -916,6 +973,9 @@ class FusedMoE(torch.nn.Module):
@property
def use_deepep_ll_kernels(self):
return self.moe_parallel_config.use_deepep_ll_kernels
@property
def use_deepep_auto_kernels(self):
return self.moe_parallel_config.use_deepep_auto_kernels
@property
def shared_experts(self) -> Optional[torch.nn.Module]:
......@@ -1443,7 +1503,7 @@ class FusedMoE(torch.nn.Module):
early.
"""
return (self.use_pplx_kernels or self.use_deepep_ht_kernels
or self.use_deepep_ll_kernels)
or self.use_deepep_ll_kernels or self.use_deepep_auto_kernels)
def maybe_all_reduce_tensor_model_parallel(
self, final_hidden_states: torch.Tensor):
......@@ -1451,7 +1511,7 @@ class FusedMoE(torch.nn.Module):
The pplx combine kernel reduces across GPU ranks by default.
"""
if (self.use_pplx_kernels or self.use_deepep_ht_kernels
or self.use_deepep_ll_kernels):
or self.use_deepep_ll_kernels or self.use_deepep_auto_kernels):
return final_hidden_states
else:
return tensor_model_parallel_all_reduce(final_hidden_states)
......
......@@ -6,7 +6,9 @@ from math import prod
from typing import Optional, final
from dataclasses import dataclass
from collections.abc import Callable
from vllm.logger import init_logger
logger = init_logger(__name__)
import torch
import vllm.envs as envs
......@@ -828,11 +830,16 @@ class DeepGemmDisabledFusedMoEModularKernel(torch.nn.Module):
self,
prepare_finalize: FusedMoEPrepareAndFinalize,
fused_experts: CustomizedFusedMoEPermuteExpertsUnpermute,
experts_ht: CustomizedFusedMoEPermuteExpertsUnpermute = None,
experts_ll: CustomizedFusedMoEPermuteExpertsUnpermute = None,
shared_experts: Optional[torch.nn.Module] = None,
):
super().__init__()
self.prepare_finalize = prepare_finalize
self.fused_experts = fused_experts
self.fused_experts_ht = experts_ht
self.fused_experts_ll = experts_ll
self.shared_experts = shared_experts
# assert prepare_finalize.activation_format == \
......@@ -899,7 +906,29 @@ class DeepGemmDisabledFusedMoEModularKernel(torch.nn.Module):
Returns:
- torch.Tensor: The output tensor after applying the MoE layer.
"""
prepare_finalize = self.prepare_finalize
fused_experts = self.fused_experts
# from vllm.config import get_current_vllm_config
# vllm_cfg = get_current_vllm_config()
# max_tokens_for_cudagraph = vllm_cfg.compilation_config.max_capture_size
# num_ht_ll_tokens = max_tokens_for_cudagraph
if envs.VLLM_ALL2ALL_BACKEND == "deepep_auto":
num_ht_ll_tokens = envs.VLLM_MOE_HT_THRESHOLD
num_tokens = hidden_states.size(0)
logger.info("num_tokens=%d", num_tokens)
if num_tokens > num_ht_ll_tokens and False:
prepare_finalize = self.prepare_finalize.ht_prepare_finalize
fused_experts = self.fused_experts_ht
else:
prepare_finalize = self.prepare_finalize.ll_prepare_finalize
fused_experts = self.fused_experts_ll
a1 = hidden_states
if inplace and self.shared_experts is None:
......@@ -911,7 +940,7 @@ class DeepGemmDisabledFusedMoEModularKernel(torch.nn.Module):
if global_num_experts == -1:
global_num_experts = local_num_experts
prepare_ret = self.prepare_finalize.prepare_async(
prepare_ret = prepare_finalize.prepare_async(
a1,
a1_scale,
a2_scale,
......@@ -920,7 +949,7 @@ class DeepGemmDisabledFusedMoEModularKernel(torch.nn.Module):
global_num_experts,
expert_map,
apply_router_weight_on_input,
self.fused_experts.quant_config,
fused_experts.quant_config,
)
hook, receiver = (
prepare_ret if isinstance(prepare_ret, tuple) else (None, prepare_ret)
......@@ -951,7 +980,7 @@ class DeepGemmDisabledFusedMoEModularKernel(torch.nn.Module):
# and can never run into the tensor.numel() == 0 case.
fused_out = torch.empty_like(a1q).to(dtype=a1.dtype)
else:
fused_out = self.fused_experts.apply(
fused_out = fused_experts.apply(
None,
a1,
a1q,
......@@ -978,7 +1007,7 @@ class DeepGemmDisabledFusedMoEModularKernel(torch.nn.Module):
)
shared_output = None
hook = self.prepare_finalize.finalize_async(output, fused_out, topk_weights,
hook = prepare_finalize.finalize_async(output, fused_out, topk_weights,
topk_ids, apply_router_weight_on_input, apply_weights_and_reduce=True)
if self.shared_experts is not None:
......
......@@ -87,9 +87,11 @@ class CompressedTensorsW8A8Int8MarlinMoEMethod(CompressedTensorsMarlinMoEMethod)
parallel_config = vllm_config.parallel_config
self.dp_size = get_dp_group().world_size
self.ep_size = get_ep_group().world_size
self.use_deepep = self.dp_size > 1 and parallel_config.enable_expert_parallel and \
(envs.VLLM_ALL2ALL_BACKEND == "deepep_high_throughput" or \
envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency")
envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency" or \
envs.VLLM_ALL2ALL_BACKEND == "deepep_auto")
self.use_deepgemm = False
if self.use_deepep:
......@@ -97,7 +99,7 @@ class CompressedTensorsW8A8Int8MarlinMoEMethod(CompressedTensorsMarlinMoEMethod)
assert all2all_manager is not None
self.num_dispatchers = all2all_manager.world_size
self.block_shape = [256, 256]
self.use_deepgemm = envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency" or envs.VLLM_ENABLE_DEEPEP_HT_DEEPGEMM
self.use_deepgemm = envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency" or envs.VLLM_ENABLE_DEEPEP_HT_DEEPGEMM or envs.VLLM_ALL2ALL_BACKEND == "deepep_auto"
def create_weights(self, layer: torch.nn.Module, num_experts: int,
......
......@@ -176,7 +176,8 @@ class DeepseekV2MoE(nn.Module):
self.enable_expert_parallel = parallel_config.enable_expert_parallel
self.use_deepep = dp_size > 1 and parallel_config.enable_expert_parallel and \
(envs.VLLM_ALL2ALL_BACKEND == "deepep_high_throughput" or \
envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency")
envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency" or \
envs.VLLM_ALL2ALL_BACKEND == "deepep_auto")
if not self.use_deepep:
moe_cls = FusedMoE if not self.use_mori_ep else MoriMoE
......@@ -724,7 +725,8 @@ class DeepseekV2DecoderLayer(nn.Module):
parallel_config = vllm_config.parallel_config
self.use_deepep = self.dp_size > 1 and parallel_config.enable_expert_parallel and \
(envs.VLLM_ALL2ALL_BACKEND == "deepep_high_throughput" or \
envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency")
envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency" or \
envs.VLLM_ALL2ALL_BACKEND == "deepep_auto")
self.tp_size = get_tensor_model_parallel_world_size()
self.config = config
......
......@@ -1238,6 +1238,8 @@ class GPUModelRunnerBase(LoRAModelRunnerMixin):
# prefills, causing unnecessary and excessive padding of activations.
if dp_size == 1 or self.vllm_config.model_config.enforce_eager or envs.VLLM_ALL2ALL_BACKEND != 'naive':
# auto
if not envs.VLLM_ALL2ALL_BACKEND == "deepep_auto":
# Early exit.
return 0, None
......@@ -1314,6 +1316,7 @@ class GPUModelRunnerBase(LoRAModelRunnerMixin):
spec_decode_metadata,
num_scheduled_tokens_np) = (self._prepare_inputs(scheduler_output))
num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
logger.info("***********self.cudagraph_batch_sizes_max",self.cudagraph_batch_sizes[-1])
if (self.use_cuda_graph
and num_scheduled_tokens <= self.cudagraph_batch_sizes[-1]):
# Use piecewise CUDA graphs.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment