Commit c11b09df authored by zhuwenwen's avatar zhuwenwen
Browse files

Merge branch 'v0.9.2-dev' of https://developer.sourcefind.cn/codes/OpenDAS/vllm into v0.9.2-dev

parents ca034fd5 3a596f0d
...@@ -6,7 +6,7 @@ from typing import Any, Optional, Union ...@@ -6,7 +6,7 @@ from typing import Any, Optional, Union
import torch import torch
import torch.distributed import torch.distributed
from .parallel_state import get_tp_group from .parallel_state import get_tp_group, get_ep_group
def tensor_model_parallel_all_reduce(input_: torch.Tensor) -> torch.Tensor: def tensor_model_parallel_all_reduce(input_: torch.Tensor) -> torch.Tensor:
...@@ -32,6 +32,17 @@ def tensor_model_parallel_gather(input_: torch.Tensor, ...@@ -32,6 +32,17 @@ def tensor_model_parallel_gather(input_: torch.Tensor,
"""Gather the input tensor across model parallel group.""" """Gather the input tensor across model parallel group."""
return get_tp_group().gather(input_, dst, dim) return get_tp_group().gather(input_, dst, dim)
def expert_parallel_all_gather(input_: torch.Tensor,
dim: int = -1) -> torch.Tensor:
"""All-gather the input tensor across model parallel group."""
return get_ep_group().all_gather(input_, dim)
def expert_parallel_gather(input_: torch.Tensor,
dst: int = 0,
dim: int = -1) -> Optional[torch.Tensor]:
"""Gather the input tensor across model parallel group."""
return get_ep_group().gather(input_, dst, dim)
def broadcast_tensor_dict(tensor_dict: Optional[dict[Any, Union[torch.Tensor, def broadcast_tensor_dict(tensor_dict: Optional[dict[Any, Union[torch.Tensor,
Any]]] = None, Any]]] = None,
......
...@@ -164,6 +164,7 @@ if TYPE_CHECKING: ...@@ -164,6 +164,7 @@ if TYPE_CHECKING:
VLLM_USE_FLASH_ATTN_PA: bool = False VLLM_USE_FLASH_ATTN_PA: bool = False
VLLM_USE_APEX_RN: bool = False VLLM_USE_APEX_RN: bool = False
VLLM_USE_GLOBAL_CACHE13: bool = False VLLM_USE_GLOBAL_CACHE13: bool = False
VLLM_USE_ALLTOALL_EP: bool = False
def get_default_cache_root(): def get_default_cache_root():
return os.getenv( return os.getenv(
...@@ -1089,7 +1090,10 @@ environment_variables: dict[str, Callable[[], Any]] = { ...@@ -1089,7 +1090,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
"VLLM_USE_GLOBAL_CACHE13": "VLLM_USE_GLOBAL_CACHE13":
lambda: (os.environ.get("VLLM_USE_GLOBAL_CACHE13", "False").lower() in lambda: (os.environ.get("VLLM_USE_GLOBAL_CACHE13", "False").lower() in
("true", "1")), ("true", "1")),
# vLLM will use all_to_all ep mode
"VLLM_USE_ALLTOALL_EP":
lambda: (os.environ.get("VLLM_USE_ALLTOALL_EP", "True").lower() in
("true", "1")),
} }
# --8<-- [end:env-vars-definition] # --8<-- [end:env-vars-definition]
......
import math
from typing import Callable, List, Optional, Tuple, Union
from dataclasses import dataclass
import torch
from torch import nn
from vllm.model_executor.layers.quantization.base_config import (
QuantizationConfig, QuantizeMethodBase)
from vllm.model_executor.layers.linear import (ColumnParallelLinear,
MergedColumnParallelLinear,
ReplicatedLinear,
RowParallelLinear)
from vllm.model_executor.layers.activation import SiluAndMul
from vllm.distributed import (get_dp_group, get_ep_group,
get_tensor_model_parallel_rank,
get_tensor_model_parallel_world_size,
tensor_model_parallel_all_reduce)
try:
from transformer_engine.pytorch.permutation import (
moe_permute,
moe_sort_chunks_by_index,
moe_unpermute,
)
fused_permute = moe_permute
fused_unpermute = moe_unpermute
fused_sort_chunks_by_index = moe_sort_chunks_by_index
HAVE_TE = True
except ImportError:
fused_permute = None
fused_unpermute = None
fused_sort_chunks_by_index = None
HAVE_TE = False
shared_experts_overlap_stream = torch.cuda.Stream()
@dataclass
class EpMoeConfig:
moe_router_topk: int = 2
moe_permute_fusion: bool = False
moe_shared_expert_overlap: bool = False
ep_size: int = 1
num_moe_experts: int = 256
apply_router_weight_on_input: bool = False
routed_scaling_factor: float = 1.0
@staticmethod
def make(moe_router_topk: int = 2,
moe_permute_fusion: bool = False,
moe_shared_expert_overlap: bool = False,
ep_size: int = 1,
num_moe_experts: int = 256,
routed_scaling_factor: float = 1.0,
apply_router_weight_on_input: bool = False) -> "EpMoeConfig":
return EpMoeConfig(moe_router_topk=moe_router_topk,
moe_permute_fusion=moe_permute_fusion,
moe_shared_expert_overlap=moe_shared_expert_overlap,
ep_size=ep_size,
num_moe_experts=num_moe_experts,
routed_scaling_factor=routed_scaling_factor,
apply_router_weight_on_input=apply_router_weight_on_input)
class EPSharedExperts(nn.Module):
def __init__(
self,
hidden_size: int,
intermediate_size: int,
hidden_act: str,
quant_config: Optional[QuantizationConfig] = None,
reduce_results: bool = True,
prefix: str = "",
moe_shared_expert_overlap: bool = True,
) -> None:
super().__init__()
self.gate_up_proj = MergedColumnParallelLinear(
hidden_size, [intermediate_size] * 2,
bias=False,
quant_config=quant_config,
prefix=f"{prefix}.gate_up_proj")
self.down_proj = RowParallelLinear(intermediate_size,
hidden_size,
bias=False,
quant_config=quant_config,
reduce_results=reduce_results,
prefix=f"{prefix}.down_proj")
if hidden_act != "silu":
raise ValueError(f"Unsupported activation: {hidden_act}. "
"Only silu is supported for now.")
self.act_fn = SiluAndMul()
self.moe_shared_expert_overlap = moe_shared_expert_overlap
if self.moe_shared_expert_overlap:
self.cached_fc1_input = None
self.cached_fc2_input = None
self.cached_fc2_output = None
self.cached_output = None
self.gate_score = None
self.stream = shared_experts_overlap_stream
def forward(self, x):
gate_up, _ = self.gate_up_proj(x)
x = self.act_fn(gate_up)
x, _ = self.down_proj(x)
return x
def linear_fc1_forward_and_act(self, overlapped_comm_output=None):
"""
Do Linear FC1 and activation function forward.
This function is used to overlap shared experts with the dispatcher.
It is only useful when --moe-shared-expert-overlap is set and may be changed.
"""
assert self.moe_shared_expert_overlap
with torch.cuda.stream(self.stream):
# [s, b, 4 * h/p]
intermediate_parallel, bias_parallel = self.gate_up_proj(self.cached_fc1_input)
self.cached_fc1_input = None
if bias_parallel is not None:
intermediate_parallel = intermediate_parallel + bias_parallel
intermediate_parallel = self.act_fn(intermediate_parallel)
self.cached_fc2_input = intermediate_parallel
def linear_fc2_forward(self, overlapped_comm_output=None):
"""
Do Linear FC2 forward.
This function is used to overlap shared experts with the dispatcher.
It is only useful when --moe-shared-expert-overlap is set and may be changed.
"""
assert self.moe_shared_expert_overlap
assert self.cached_fc2_input is not None
with torch.cuda.stream(self.stream):
# [s, b, h]
self.cached_fc2_output, _ = self.down_proj(self.cached_fc2_input)
self.cached_fc2_input = None
def pre_forward_comm(self, input):
"""
All Gather for SP before forward.
This function is used to overlap shared experts with the dispatcher.
It is only useful when --moe-shared-expert-overlap is set and may be changed.
"""
assert self.cached_output is None
self.stream.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(self.stream):
self.cached_fc1_input = input
def post_forward_comm(self):
"""
Reduce scatter for SP after forward.
This function is used to overlap shared experts with the dispatcher.
It is only useful when --moe-shared-expert-overlap is set and may be changed.
"""
assert self.moe_shared_expert_overlap
assert self.cached_fc2_output is not None
with torch.cuda.stream(self.stream):
self.cached_output = tensor_model_parallel_all_reduce(
self.cached_fc2_output
)
self.cached_fc2_output = None
def get_output(self):
"""
Gets the module forward output.
This function is used to overlap shared experts with the dispatcher.
It is only useful when --moe-shared-expert-overlap is set and may be changed.
"""
assert self.moe_shared_expert_overlap
assert self.cached_output is not None
with torch.cuda.stream(self.stream):
output = self.cached_output
self.cached_output = None
torch.cuda.current_stream().wait_stream(self.stream)
return output
def maybe_move_tensor_to_cpu(tensor, as_numpy=False, record_stream=False):
"""Move a tensor to CPU if it is on GPU.
Args:
tensor (torch.Tensor or None): The tensor to move to CPU.
as_numpy (bool): Whether to convert the tensor to a numpy array.
record_stream (bool): Whether to record the stream of the tensor, to prevent memory leak
when the DtoH data transfer is on a side stream.
"""
if torch.is_tensor(tensor) and tensor.is_cuda:
cpu_tensor = tensor.to(torch.device("cpu"), non_blocking=True)
if as_numpy:
cpu_tensor = cpu_tensor.numpy()
if record_stream:
tensor.record_stream(torch.cuda.current_stream())
tensor = cpu_tensor
return tensor
def sort_chunks_by_idxs(
input: torch.Tensor, split_sizes: torch.Tensor, sorted_idxs: torch.Tensor, fused: bool = False
):
"""Split and sort the input tensor based on the split_sizes and sorted indices."""
if fused:
if not HAVE_TE or fused_sort_chunks_by_index is None:
raise ValueError(
"fused_sort_chunks_by_index is not available. Please install TE >= 2.1.0."
)
return fused_sort_chunks_by_index(input, split_sizes, sorted_idxs)
input = torch.split(input, split_sizes.tolist(), dim=0)
output = torch.cat([input[i] for i in sorted_idxs.tolist()], dim=0)
return output
def permute(
tokens,
routing_map,
num_out_tokens: Optional[int] = None,
fused: bool = False,
):
"""Permute the tokens and probs based on the mask.
Tokens with the same designated expert will be grouped together.
The shape of mask is [tokens, num_experts], it indicates which experts were selected
by each token.
Args:
tokens (torch.Tensor): The input token tensor, [num_tokens, hidden].
routing_map (torch.Tensor): The sparse token to expert mapping, [num_tokens, num_experts].
num_out_tokens (int, optional): The number of output tokens. If None, it's set to
the number of input tokens.
fused (bool, optional): Whether use the fused permute function.
"""
if fused:
if not HAVE_TE or fused_permute is None:
raise ValueError("fused_permute is not available. Please install TE >= 2.1.0.")
return fused_permute(tokens, routing_map, num_out_tokens)
num_tokens, hidden = tokens.shape
num_experts = routing_map.shape[1]
# mask [num_tokens, num_experts] -> [num_experts, num_tokens]
routing_map = routing_map.bool().T.contiguous()
# Create a dense expert-to-token mapping from the sparse token-to-expert mapping
token_indices = (
torch.arange(num_tokens, device=routing_map.device).unsqueeze(0).expand(num_experts, -1)
)
sorted_indices = token_indices.masked_select(routing_map)
# use the mapping to permute the tokens
permuted_input = tokens.index_select(0, sorted_indices)
return permuted_input, sorted_indices
def unpermute(
permuted_tokens: torch.Tensor,
sorted_indices: torch.Tensor,
restore_shape: torch.Size,
probs: torch.Tensor = None,
routing_map: torch.Tensor = None,
fused: bool = False,
):
"""
Restore the original order of tokens after permutation. If probs are provided, it
will also apply them to the tokens before restoring the order.
This function exploits these features to use ops that support cuda graph.
Args:
permuted_tokens (torch.Tensor): The permuted token tensor.
sorted_indices (torch.Tensor): The indices used to sort the tokens.
restore_shape (torch.Size): The shape of the unpermuted tensor.
probs (torch.Tensor, optional): The unpermuted probs tensor,
routing_map (torch.Tensor, optional): Token to expert mapping, shape
[num_tokens, num_experts].
fused (bool, optional): Whether use the fused unpermute function.
Returns:
torch.Tensor: The tokens restored to their original order.
"""
if fused:
if not HAVE_TE or fused_unpermute is None:
raise ValueError("fused_unpermute is not available. Please install TE >= 2.1.0.")
return fused_unpermute(permuted_tokens, sorted_indices, probs, restore_shape)
_, hidden = restore_shape
input_dtype = permuted_tokens.dtype
if probs is not None:
assert routing_map is not None, "Mask must be provided to permute the probs."
permuted_probs = probs.T.contiguous().masked_select(routing_map.T.contiguous())
# Here may promote permuted_tokens to higher precision (fp32/fp64) if probs is in
# higher precision due to moe_router_dtype being enabled. This can lead to
# additional GPU memory usage. Use --moe-permute-fusion flag to avoid this extra memory
# allocation.
permuted_tokens = permuted_tokens * permuted_probs.unsqueeze(-1)
# Create an output tensor filled with zeros
output_tokens = torch.zeros(
restore_shape, dtype=permuted_tokens.dtype, device=permuted_tokens.device
)
# Scatter add the permuted_input back to the original positions
output_tokens.scatter_add_(0, sorted_indices.unsqueeze(1).expand(-1, hidden), permuted_tokens)
return output_tokens.to(dtype=input_dtype)
def all_to_all(group, input, output_split_sizes, input_split_sizes):
world_size = torch.distributed.get_world_size(group=group)
# Bypass the function if we are using only 1 GPU.
if world_size == 1:
return input
input = input.contiguous()
if output_split_sizes is None:
# Equal split (all2all)
output = torch.empty_like(input)
else:
# Unequal split (all2all-v)
output = input.new_empty(
size=[sum(output_split_sizes)] + list(input.size()[1:]),
dtype=input.dtype,
device=torch.cuda.current_device(),
)
torch.distributed.all_to_all_single(
output,
input,
output_split_sizes=output_split_sizes,
input_split_sizes=input_split_sizes,
group=group,
)
return output
import os
import logging
from typing import Callable, List, Optional, Tuple
from dataclasses import dataclass
import torch
import torch.nn.functional as F
from vllm.logger import init_logger
from vllm.platforms import current_platform
from vllm.model_executor.custom_op import CustomOp
from vllm.forward_context import ForwardContext, get_forward_context
from vllm.model_executor.layers.fused_moe.config import FusedMoEConfig
from vllm.model_executor.layers.quantization.base_config import (
QuantizationConfig, QuantizeMethodBase)
from vllm.model_executor.layers.fused_moe import FusedMoE
from vllm.model_executor.layers.fused_moe.layer import FusedMoEMethodBase, UnquantizedFusedMoEMethod
from vllm.model_executor.layers.fused_moe.ep_moe.token_dispatcher import MoEAlltoAllTokenDispatcher
from vllm.model_executor.layers.fused_moe.ep_moe.ep_moe_utlis import EpMoeConfig
from vllm.utils import direct_register_custom_op
logger = init_logger(__name__)
@CustomOp.register("unquantized_ep_moe")
class UnquantizedEPGroupedGemmMethod(UnquantizedFusedMoEMethod):
"""MoE method without quantization."""
def __init__(self, moe: FusedMoEConfig):
super().__init__(moe)
self.topk_indices_dtype = None
self.moe = moe
self.rocm_aiter_moe_enabled = False # is_rocm_aiter_moe_enabled()
def apply(
self,
layer: torch.nn.Module,
hidden_states: torch.Tensor,
tokens_per_expert: torch.Tensor,
) -> torch.Tensor:
return self.forward(
hidden_states=hidden_states,
layer=layer,
tokens_per_expert=tokens_per_expert)
def forward_cuda(
self,
layer: torch.nn.Module,
hidden_states: torch.Tensor,
tokens_per_expert: torch.Tensor,
) -> torch.Tensor:
# process MoE
def custom_forward(layer, hidden_states, tokens_per_expert):
tokens_per_expert = tokens_per_expert.cpu().numpy()
outputs = []
start_idx = 0
for i, num_tokens in enumerate(tokens_per_expert):
end_idx = start_idx + num_tokens
if num_tokens == 0:
continue
w1 = layer.w13_weight[i]
w2 = layer.w2_weight[i]
tokens_for_this_expert = hidden_states[start_idx:end_idx]
gateup_output = torch.matmul(tokens_for_this_expert, w1)
# Act
down_input = torch.zeros(
gateup_output.shape[0],
gateup_output.shape[1] // 2,
device=gateup_output.device,
dtype=hidden_states.dtype
)
torch.ops._C.silu_and_mul(down_input,
gateup_output.view(-1, w1.shape[1]))
expert_out = torch.matmul(down_input, w2)
outputs.append(expert_out)
start_idx = end_idx
if len(outputs) > 0:
expert_output = torch.cat(outputs, dim=0)
else:
assert hidden_states.numel() == 0, f"sorted_tokens: should be empty, but got {hidden_states.shape}"
expert_output = hidden_states
return expert_output
output = custom_forward(layer, hidden_states, tokens_per_expert)
return output
def forward_cpu(
self,
layer: torch.nn.Module,
hidden_states: torch.Tensor,
tokens_per_expert: torch.Tensor,
**kwargs,
):
raise NotImplementedError
def forward_hpu(
self,
layer: torch.nn.Module,
hidden_states: torch.Tensor,
tokens_per_expert: torch.Tensor,
) -> torch.Tensor:
raise NotImplementedError
def forward_tpu(
self,
layer: torch.nn.Module,
hidden_states: torch.Tensor,
tokens_per_expert: torch.Tensor,
) -> torch.Tensor:
raise NotImplementedError
if current_platform.is_tpu():
forward_native = forward_tpu
elif current_platform.is_cpu():
forward_native = forward_cpu
else:
forward_native = forward_cuda
class EPMoE(FusedMoE):
"""
dp+ep MoE Expert Parallel Impl
"""
def __init__(
self,
num_experts: int, # Global number of experts
top_k: int,
hidden_size: int,
intermediate_size: int,
params_dtype: Optional[torch.dtype] = None,
reduce_results: bool = False,
renormalize: bool = True,
use_grouped_topk: bool = False,
num_expert_group: Optional[int] = None,
topk_group: Optional[int] = None,
quant_config: Optional[QuantizationConfig] = None,
tp_size: Optional[int] = None,
ep_size: Optional[int] = None,
dp_size: Optional[int] = None,
prefix: str = "",
custom_routing_function: Optional[Callable] = None,
scoring_func: str = "softmax",
e_score_correction_bias: Optional[torch.Tensor] = None,
apply_router_weight_on_input: bool = False,
activation: str = "silu",
routed_scaling_factor: Optional[float] = None,
enable_eplb: bool = False,
num_redundant_experts: int = 0,
moe_permute_fusion: bool = True,
moe_shared_expert_overlap: bool = False
):
super().__init__(num_experts, top_k, hidden_size,
intermediate_size, params_dtype,
reduce_results, renormalize,
use_grouped_topk, num_expert_group,
topk_group, quant_config, tp_size,
ep_size, dp_size, prefix,
custom_routing_function, scoring_func,
e_score_correction_bias,
apply_router_weight_on_input,
activation,
routed_scaling_factor=routed_scaling_factor,
enable_eplb=enable_eplb,
num_redundant_experts=num_redundant_experts,
)
self.ep_moe_config: EpMoeConfig = EpMoeConfig.make(
moe_router_topk=self.top_k,
# TODO: support fusion permute
moe_permute_fusion=moe_permute_fusion,
moe_shared_expert_overlap=moe_shared_expert_overlap,
ep_size=self.ep_size,
num_moe_experts=self.global_num_experts,
routed_scaling_factor=self.routed_scaling_factor,
apply_router_weight_on_input=self.apply_router_weight_on_input
)
local_expert_indices_offset = (
self.ep_rank * self.local_num_experts
)
self.local_expert_indices = [
local_expert_indices_offset + i for i in range(self.local_num_experts)
]
self.use_shared_expert = False
self.token_dispatcher = MoEAlltoAllTokenDispatcher(
self.local_num_experts, self.local_expert_indices, config=self.ep_moe_config
)
self.shared_expert_overlap = moe_shared_expert_overlap
self.shared_experts = None
self.dpsk_fp16_quick = os.environ.get('DPSK_FP16_QUICK') == '1'
def set_shared_experts(self, shared_experts: torch.nn.Module):
if self.shared_experts is None:
self.shared_experts = shared_experts
if self.shared_expert_overlap:
self.token_dispatcher.set_shared_experts(self.shared_experts)
def create_quant_method(self, moe, quant_config, prefix):
# Note: get_quant_method will look at the layer's local_num_experts
# for heuristic purposes, so it must be initialized first.
quant_method: Optional[QuantizeMethodBase] = None
quant_method = (UnquantizedEPGroupedGemmMethod(moe) if quant_config is None
else quant_config.get_quant_method(self, prefix))
assert quant_method is not None
assert isinstance(quant_method, FusedMoEMethodBase)
return quant_method
def forward(self, hidden_states: torch.Tensor,
router_logits: torch.Tensor):
return torch.ops.vllm.ep_moe_forward(hidden_states, router_logits,
self.layer_name)
def forward_impl(self, hidden_states: torch.Tensor, router_logits: torch.Tensor):
topk_weights, topk_ids = self.select_experts(
hidden_states=hidden_states,
router_logits=router_logits,
use_grouped_topk=self.use_grouped_topk,
top_k=self.top_k,
renormalize=self.renormalize,
topk_group=self.topk_group,
num_expert_group=self.num_expert_group,
custom_routing_function=self.custom_routing_function,
scoring_func=self.scoring_func,
e_score_correction_bias=self.e_score_correction_bias,
indices_type=torch.int64,
routed_scaling_factor=self.routed_scaling_factor,
use_fused_gate=self.use_fused_gate)
if not self.ep_moe_config.moe_shared_expert_overlap and self.shared_experts is not None:
shared_output = self.shared_experts(hidden_states)
probs = torch.zeros_like(router_logits, dtype=topk_weights.dtype).scatter(1, topk_ids, topk_weights)
routing_map = torch.zeros_like(router_logits).int().scatter(1, topk_ids, 1).bool()
(dispatched_input, tokens_per_expert) = self.token_dispatcher.token_permutation(
hidden_states, probs, routing_map
)
# Matrix multiply.
expert_output = self.quant_method.apply(
layer=self,
hidden_states=dispatched_input,
tokens_per_expert=tokens_per_expert
)
final_hidden_states = self.token_dispatcher.token_unpermutation(expert_output)
if not self.ep_moe_config.moe_shared_expert_overlap and self.shared_experts is not None:
# if shared_expert_overlap is True, the expert calculation happens in
# the token_dispatcher to overlap communications and computations
shared_output = (
self.maybe_all_reduce_tensor_model_parallel(
shared_output))
if hidden_states.dtype != torch.float16 or self.dpsk_fp16_quick:
final_hidden_states = final_hidden_states + shared_output
else:
# Fix FP16 overflow
# See DeepseekV2DecoderLayer for more details.
final_hidden_states = final_hidden_states + shared_output \
* (1. / self.routed_scaling_factor)
return final_hidden_states
def ep_moe_forward(hidden_states: torch.Tensor, router_logits: torch.Tensor,
layer_name: str) -> torch.Tensor:
forward_context: ForwardContext = get_forward_context()
self = forward_context.no_compile_layers[layer_name]
assert self.quant_method is not None
return self.forward_impl(hidden_states, router_logits)
def ep_moe_forward_fake(hidden_states: torch.Tensor, router_logits: torch.Tensor,
layer_name: str) -> torch.Tensor:
return torch.empty_like(hidden_states)
direct_register_custom_op(
op_name="ep_moe_forward",
op_func=ep_moe_forward,
mutates_args=["hidden_states"],
fake_impl=ep_moe_forward_fake,
dispatch_key=current_platform.dispatch_key,
tags=(torch.Tag.needs_fixed_stride_order, ),
)
\ No newline at end of file
...@@ -772,11 +772,7 @@ class FusedMoE(torch.nn.Module): ...@@ -772,11 +772,7 @@ class FusedMoE(torch.nn.Module):
self.moe_config = moe self.moe_config = moe
self.quant_config = quant_config self.quant_config = quant_config
# Note: get_quant_method will look at the layer's local_num_experts quant_method = self.create_quant_method(moe, quant_config, prefix)
# for heuristic purposes, so it must be initialized first.
quant_method: Optional[QuantizeMethodBase] = None
quant_method = (UnquantizedFusedMoEMethod(moe) if quant_config is None
else quant_config.get_quant_method(self, prefix))
assert quant_method is not None assert quant_method is not None
assert isinstance(quant_method, FusedMoEMethodBase) assert isinstance(quant_method, FusedMoEMethodBase)
...@@ -851,6 +847,17 @@ class FusedMoE(torch.nn.Module): ...@@ -851,6 +847,17 @@ class FusedMoE(torch.nn.Module):
dtype=moe.in_dtype, dtype=moe.in_dtype,
device=torch.cuda.current_device()) device=torch.cuda.current_device())
def create_quant_method(self, moe, quant_config, prefix):
# Note: get_quant_method will look at the layer's local_num_experts
# for heuristic purposes, so it must be initialized first.
quant_method: Optional[QuantizeMethodBase] = None
quant_method = (UnquantizedFusedMoEMethod(moe) if quant_config is None
else quant_config.get_quant_method(self, prefix))
assert quant_method is not None
assert isinstance(quant_method, FusedMoEMethodBase)
return quant_method
@property @property
def tp_size(self): def tp_size(self):
return self.moe_parallel_config.tp_size return self.moe_parallel_config.tp_size
......
...@@ -11,6 +11,7 @@ import torch ...@@ -11,6 +11,7 @@ import torch
import torch.nn as nn import torch.nn as nn
from transformers import PretrainedConfig from transformers import PretrainedConfig
import vllm.envs as envs
from vllm.config import CacheConfig, ModelConfig, VllmConfig from vllm.config import CacheConfig, ModelConfig, VllmConfig
from vllm.model_executor.layers.fused_moe import FusedMoE from vllm.model_executor.layers.fused_moe import FusedMoE
from vllm.model_executor.layers.layernorm import RMSNorm from vllm.model_executor.layers.layernorm import RMSNorm
...@@ -24,6 +25,7 @@ from vllm.sequence import IntermediateTensors ...@@ -24,6 +25,7 @@ from vllm.sequence import IntermediateTensors
from vllm.compilation.decorators import support_torch_compile from vllm.compilation.decorators import support_torch_compile
from .deepseek_v2 import (DeepseekV2DecoderLayer, from .deepseek_v2 import (DeepseekV2DecoderLayer,
get_spec_layer_idx_from_weight_name) get_spec_layer_idx_from_weight_name)
from vllm.distributed import get_dp_group
from .interfaces import SupportsPP from .interfaces import SupportsPP
from .utils import maybe_prefix from .utils import maybe_prefix
from vllm import _custom_ops as ops from vllm import _custom_ops as ops
...@@ -174,6 +176,10 @@ class DeepSeekMTP(nn.Module, SupportsPP): ...@@ -174,6 +176,10 @@ class DeepSeekMTP(nn.Module, SupportsPP):
prefix, "model")) prefix, "model"))
self.use_llama_nn = os.environ.get('LLAMA_NN') == '1' self.use_llama_nn = os.environ.get('LLAMA_NN') == '1'
parallel_config = vllm_config.parallel_config
dp_size = get_dp_group().world_size
self.use_all2all_ep = envs.VLLM_USE_ALLTOALL_EP and dp_size > 1 and parallel_config.enable_expert_parallel
def forward( def forward(
self, self,
...@@ -205,6 +211,10 @@ class DeepSeekMTP(nn.Module, SupportsPP): ...@@ -205,6 +211,10 @@ class DeepSeekMTP(nn.Module, SupportsPP):
("gate_up_proj", "up_proj", 1), ("gate_up_proj", "up_proj", 1),
] ]
if self.use_all2all_ep:
ep_moe_shared_experts_keys = "mlp.shared_experts"
ep_moe_shared_experts_mapping = {ep_moe_shared_experts_keys:"mlp.experts.shared_experts"}
expert_params_mapping = FusedMoE.make_expert_params_mapping( expert_params_mapping = FusedMoE.make_expert_params_mapping(
ckpt_gate_proj_name="gate_proj", ckpt_gate_proj_name="gate_proj",
ckpt_down_proj_name="down_proj", ckpt_down_proj_name="down_proj",
...@@ -233,6 +243,9 @@ class DeepSeekMTP(nn.Module, SupportsPP): ...@@ -233,6 +243,9 @@ class DeepSeekMTP(nn.Module, SupportsPP):
if (("mlp.experts." in name) and name not in params_dict): if (("mlp.experts." in name) and name not in params_dict):
continue continue
name = name.replace(weight_name, param_name) name = name.replace(weight_name, param_name)
if self.use_all2all_ep:
name = name.replace(ep_moe_shared_experts_keys, ep_moe_shared_experts_mapping[ep_moe_shared_experts_keys])
# Skip loading extra bias for GPTQ models. # Skip loading extra bias for GPTQ models.
if name.endswith(".bias") and name not in params_dict: if name.endswith(".bias") and name not in params_dict:
continue continue
...@@ -248,6 +261,9 @@ class DeepSeekMTP(nn.Module, SupportsPP): ...@@ -248,6 +261,9 @@ class DeepSeekMTP(nn.Module, SupportsPP):
continue continue
name = name.replace(weight_name, param_name) name = name.replace(weight_name, param_name)
if self.use_all2all_ep:
name = name.replace(ep_moe_shared_experts_keys, ep_moe_shared_experts_mapping[ep_moe_shared_experts_keys])
param = params_dict[name] param = params_dict[name]
weight_loader = param.weight_loader weight_loader = param.weight_loader
weight_loader(param, weight_loader(param,
...@@ -257,6 +273,8 @@ class DeepSeekMTP(nn.Module, SupportsPP): ...@@ -257,6 +273,8 @@ class DeepSeekMTP(nn.Module, SupportsPP):
expert_id=expert_id) expert_id=expert_id)
break break
else: else:
if self.use_all2all_ep:
name = name.replace(ep_moe_shared_experts_keys, ep_moe_shared_experts_mapping[ep_moe_shared_experts_keys])
# Skip loading extra bias for GPTQ models. # Skip loading extra bias for GPTQ models.
if name.endswith(".bias") and name not in params_dict: if name.endswith(".bias") and name not in params_dict:
continue continue
......
...@@ -39,10 +39,12 @@ from vllm.attention import Attention ...@@ -39,10 +39,12 @@ from vllm.attention import Attention
from vllm.compilation.decorators import support_torch_compile from vllm.compilation.decorators import support_torch_compile
from vllm.config import (CacheConfig, ModelConfig, VllmConfig, from vllm.config import (CacheConfig, ModelConfig, VllmConfig,
get_current_vllm_config) get_current_vllm_config)
from vllm.distributed import (get_ep_group, get_pp_group, from vllm.distributed import (get_ep_group, get_pp_group, get_dp_group,
get_tensor_model_parallel_world_size) get_tensor_model_parallel_world_size)
from vllm.model_executor.layers.activation import SiluAndMul from vllm.model_executor.layers.activation import SiluAndMul
from vllm.model_executor.layers.fused_moe import FusedMoE from vllm.model_executor.layers.fused_moe import FusedMoE
from vllm.model_executor.layers.fused_moe.ep_moe.layer import EPMoE
from vllm.model_executor.layers.fused_moe.ep_moe.ep_moe_utlis import EPSharedExperts
from vllm.model_executor.layers.layernorm import RMSNorm from vllm.model_executor.layers.layernorm import RMSNorm
from vllm.model_executor.layers.linear import (ColumnParallelLinear, from vllm.model_executor.layers.linear import (ColumnParallelLinear,
MergedColumnParallelLinear, MergedColumnParallelLinear,
...@@ -151,8 +153,12 @@ class DeepseekV2MoE(nn.Module): ...@@ -151,8 +153,12 @@ class DeepseekV2MoE(nn.Module):
self.n_local_physical_experts) self.n_local_physical_experts)
self.physical_expert_end = (self.physical_expert_start + self.physical_expert_end = (self.physical_expert_start +
self.n_local_physical_experts) self.n_local_physical_experts)
self.experts = FusedMoE( dp_size = get_dp_group().world_size
self.use_all2all_ep = envs.VLLM_USE_ALLTOALL_EP and dp_size > 1 and parallel_config.enable_expert_parallel
moe_cls = FusedMoE if not self.use_all2all_ep else EPMoE
self.experts = moe_cls(
num_experts=config.n_routed_experts, num_experts=config.n_routed_experts,
top_k=config.num_experts_per_tok, top_k=config.num_experts_per_tok,
hidden_size=config.hidden_size, hidden_size=config.hidden_size,
...@@ -173,7 +179,8 @@ class DeepseekV2MoE(nn.Module): ...@@ -173,7 +179,8 @@ class DeepseekV2MoE(nn.Module):
if config.n_shared_experts is not None: if config.n_shared_experts is not None:
intermediate_size = (config.moe_intermediate_size * intermediate_size = (config.moe_intermediate_size *
config.n_shared_experts) config.n_shared_experts)
self.shared_experts = DeepseekV2MLP( shared_expert_cls = DeepseekV2MLP if not self.use_all2all_ep else EPSharedExperts
self.shared_experts = shared_expert_cls(
hidden_size=config.hidden_size, hidden_size=config.hidden_size,
intermediate_size=intermediate_size, intermediate_size=intermediate_size,
hidden_act=config.hidden_act, hidden_act=config.hidden_act,
...@@ -182,43 +189,52 @@ class DeepseekV2MoE(nn.Module): ...@@ -182,43 +189,52 @@ class DeepseekV2MoE(nn.Module):
), ),
prefix=f"{prefix}.shared_experts", prefix=f"{prefix}.shared_experts",
) )
if self.use_all2all_ep:
self.experts.set_shared_experts(self.shared_experts)
from vllm.two_batch_overlap.two_batch_overlap import tbo_all_reduce from vllm.two_batch_overlap.two_batch_overlap import tbo_all_reduce
self.tbo_all_reduce = tbo_all_reduce self.tbo_all_reduce = tbo_all_reduce
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
num_tokens, hidden_dim = hidden_states.shape num_tokens, hidden_dim = hidden_states.shape
hidden_states = hidden_states.view(-1, hidden_dim) hidden_states = hidden_states.view(-1, hidden_dim)
if self.n_shared_experts is not None: if not self.use_all2all_ep:
shared_output = self.shared_experts(hidden_states) if self.n_shared_experts is not None:
# router_logits: (num_tokens, n_experts) shared_output = self.shared_experts(hidden_states)
router_logits, _ = self.gate(hidden_states) router_logits, _ = self.gate(hidden_states)
if hidden_states.dtype != torch.float16 or self.dpsk_fp16_quick: if not self.use_all2all_ep:
final_hidden_states = self.experts( if hidden_states.dtype != torch.float16:
hidden_states=hidden_states, final_hidden_states = self.experts(
router_logits=router_logits) * self.routed_scaling_factor hidden_states=hidden_states,
else: router_logits=router_logits) * self.routed_scaling_factor
# Fix FP16 overflow
# See DeepseekV2DecoderLayer for more details.
final_hidden_states = self.experts(hidden_states=hidden_states,
router_logits=router_logits)
if shared_output is not None:
if hidden_states.dtype != torch.float16 or self.dpsk_fp16_quick:
final_hidden_states = final_hidden_states + shared_output
else: else:
# Fix FP16 overflow # Fix FP16 overflow
# See DeepseekV2DecoderLayer for more details. # See DeepseekV2DecoderLayer for more details.
final_hidden_states = final_hidden_states + shared_output \ final_hidden_states = self.experts(hidden_states=hidden_states,
* (1. / self.routed_scaling_factor) router_logits=router_logits)
else:
if self.tp_size > 1: final_hidden_states = self.experts(hidden_states=hidden_states,
if envs.VLLM_ENABLE_TBO: router_logits=router_logits)
final_hidden_states = self.tbo_all_reduce(final_hidden_states)
else: if not self.use_all2all_ep:
final_hidden_states = ( if shared_output is not None:
self.experts.maybe_all_reduce_tensor_model_parallel( if hidden_states.dtype != torch.float16 or self.dpsk_fp16_quick:
final_hidden_states)) final_hidden_states = final_hidden_states + shared_output
else:
# Fix FP16 overflow
# See DeepseekV2DecoderLayer for more details.
final_hidden_states = final_hidden_states + shared_output \
* (1. / self.routed_scaling_factor)
if self.tp_size > 1:
if envs.VLLM_ENABLE_TBO:
final_hidden_states = self.tbo_all_reduce(final_hidden_states)
else:
final_hidden_states = (
self.experts.maybe_all_reduce_tensor_model_parallel(
final_hidden_states))
return final_hidden_states.view(num_tokens, hidden_dim) return final_hidden_states.view(num_tokens, hidden_dim)
...@@ -790,6 +806,10 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts): ...@@ -790,6 +806,10 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
self.tritonsingleton.topk = config.num_experts_per_tok self.tritonsingleton.topk = config.num_experts_per_tok
self.tritonsingleton.quant_method=self.quant_method self.tritonsingleton.quant_method=self.quant_method
parallel_config = vllm_config.parallel_config
dp_size = get_dp_group().world_size
self.use_all2all_ep = envs.VLLM_USE_ALLTOALL_EP and dp_size > 1 and parallel_config.enable_expert_parallel
def set_eplb_state( def set_eplb_state(
self, self,
expert_load_view: torch.Tensor, expert_load_view: torch.Tensor,
...@@ -871,6 +891,10 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts): ...@@ -871,6 +891,10 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
("gate_up_proj", "up_proj", 1), ("gate_up_proj", "up_proj", 1),
] ]
if self.use_all2all_ep:
ep_moe_shared_experts_keys = "mlp.shared_experts"
ep_moe_shared_experts_mapping = {ep_moe_shared_experts_keys:"mlp.experts.shared_experts"}
# Params for weights, fp8 weight scales, fp8 activation scales # Params for weights, fp8 weight scales, fp8 activation scales
# (param_name, weight_name, expert_id, shard_id) # (param_name, weight_name, expert_id, shard_id)
expert_params_mapping = FusedMoE.make_expert_params_mapping( expert_params_mapping = FusedMoE.make_expert_params_mapping(
...@@ -903,6 +927,10 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts): ...@@ -903,6 +927,10 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
if (("mlp.experts." in name) and name not in params_dict): if (("mlp.experts." in name) and name not in params_dict):
continue continue
name = name.replace(weight_name, param_name) name = name.replace(weight_name, param_name)
if self.use_all2all_ep:
name = name.replace(ep_moe_shared_experts_keys, ep_moe_shared_experts_mapping[ep_moe_shared_experts_keys])
# Skip loading extra bias for GPTQ models. # Skip loading extra bias for GPTQ models.
if name.endswith(".bias") and name not in params_dict: if name.endswith(".bias") and name not in params_dict:
continue continue
...@@ -929,6 +957,9 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts): ...@@ -929,6 +957,9 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
# Instead, create a new variable # Instead, create a new variable
name_mapped = name.replace(weight_name, param_name) name_mapped = name.replace(weight_name, param_name)
if self.use_all2all_ep:
name_mapped = name_mapped.replace(ep_moe_shared_experts_keys, ep_moe_shared_experts_mapping[ep_moe_shared_experts_keys])
if is_pp_missing_parameter(name_mapped, self): if is_pp_missing_parameter(name_mapped, self):
continue continue
...@@ -953,7 +984,9 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts): ...@@ -953,7 +984,9 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
# However it's not mapped locally to this rank # However it's not mapped locally to this rank
# So we simply skip it # So we simply skip it
continue continue
if self.use_all2all_ep:
name = name.replace(ep_moe_shared_experts_keys, ep_moe_shared_experts_mapping[ep_moe_shared_experts_keys])
# Skip loading extra bias for GPTQ models. # Skip loading extra bias for GPTQ models.
if name.endswith(".bias") and name not in params_dict: if name.endswith(".bias") and name not in params_dict:
continue continue
......
...@@ -244,11 +244,18 @@ class CoreEngineActorManager: ...@@ -244,11 +244,18 @@ class CoreEngineActorManager:
local_engine_count = \ local_engine_count = \
vllm_config.parallel_config.data_parallel_size_local vllm_config.parallel_config.data_parallel_size_local
nodes = sorted(list_nodes(), # nodes = sorted(list_nodes(),
key=lambda node: node.node_ip != dp_master_ip) # key=lambda node: node.node_ip != dp_master_ip)
assert nodes[0].node_ip == dp_master_ip, ( # assert nodes[0].node_ip == dp_master_ip, (
# "The first node must be the head node")
# assert len(nodes) == 1 or nodes[1].node_ip != dp_master_ip, (
# "There can only be one head node")
nodes = ray.nodes()
nodes = sorted(nodes,
key=lambda node: node["NodeManagerAddress"] != dp_master_ip)
assert nodes[0]["NodeManagerAddress"] == dp_master_ip, (
"The first node must be the head node") "The first node must be the head node")
assert len(nodes) == 1 or nodes[1].node_ip != dp_master_ip, ( assert len(nodes) == 1 or nodes[1]["NodeManagerAddress"] != dp_master_ip, (
"There can only be one head node") "There can only be one head node")
available_resources = available_resources_per_node() available_resources = available_resources_per_node()
...@@ -257,8 +264,11 @@ class CoreEngineActorManager: ...@@ -257,8 +264,11 @@ class CoreEngineActorManager:
local_dp_ranks: list[int] = [] local_dp_ranks: list[int] = []
for node in nodes: for node in nodes:
node_ip = node.node_ip # node_ip = node.node_ip
node_resources = available_resources[node.node_id] # node_resources = available_resources[node.node_id]
node_ip = node["NodeManagerAddress"]
node_resources = available_resources[node["NodeID"]]
# For now, each DP rank can only be assigned to one node # For now, each DP rank can only be assigned to one node
# TODO(rui): support allocating a single DP rank # TODO(rui): support allocating a single DP rank
# to multiple nodes # to multiple nodes
......
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import os
import copy import copy
import gc import gc
import time import time
...@@ -319,6 +320,9 @@ class GPUModelRunner(LoRAModelRunnerMixin): ...@@ -319,6 +320,9 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# from the KV cache of `shared_kv_cache_layers[layer_name]`. # from the KV cache of `shared_kv_cache_layers[layer_name]`.
self.shared_kv_cache_layers: dict[str, str] = {} self.shared_kv_cache_layers: dict[str, str] = {}
dp_size = self.vllm_config.parallel_config.data_parallel_size
self.use_all2all_ep = envs.VLLM_USE_ALLTOALL_EP and dp_size > 1 and parallel_config.enable_expert_parallel
def _may_reorder_batch(self, scheduler_output: "SchedulerOutput") -> None: def _may_reorder_batch(self, scheduler_output: "SchedulerOutput") -> None:
""" """
Update the order of requests in the batch based on the attention Update the order of requests in the batch based on the attention
...@@ -1231,7 +1235,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): ...@@ -1231,7 +1235,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# TODO(tms) : There are many cases where padding is enabled for # TODO(tms) : There are many cases where padding is enabled for
# prefills, causing unnecessary and excessive padding of activations. # prefills, causing unnecessary and excessive padding of activations.
if dp_size == 1 or self.vllm_config.model_config.enforce_eager: if dp_size == 1 or self.vllm_config.model_config.enforce_eager or self.use_all2all_ep:
# Early exit. # Early exit.
return 0, None return 0, None
...@@ -2005,7 +2009,10 @@ class GPUModelRunner(LoRAModelRunnerMixin): ...@@ -2005,7 +2009,10 @@ class GPUModelRunner(LoRAModelRunnerMixin):
num_reqs = min(num_tokens, max_num_reqs) num_reqs = min(num_tokens, max_num_reqs)
min_tokens_per_req = num_tokens // num_reqs min_tokens_per_req = num_tokens // num_reqs
if not is_profile and self.speculative_config is not None and self.speculative_config.num_lookahead_slots > 0: if not is_profile and self.speculative_config is not None \
and self.speculative_config.num_lookahead_slots > 0 \
and num_tokens >= 1 + self.speculative_config.num_lookahead_slots:
min_tokens_per_req = (1 + self.speculative_config.num_lookahead_slots) min_tokens_per_req = (1 + self.speculative_config.num_lookahead_slots)
num_reqs = num_tokens // min_tokens_per_req num_reqs = num_tokens // min_tokens_per_req
num_scheduled_tokens_list = [min_tokens_per_req] * num_reqs num_scheduled_tokens_list = [min_tokens_per_req] * num_reqs
...@@ -2054,6 +2061,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): ...@@ -2054,6 +2061,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
input_ids = None input_ids = None
inputs_embeds = self.inputs_embeds[:num_tokens] inputs_embeds = self.inputs_embeds[:num_tokens]
else: else:
self.input_ids[:num_tokens] = torch.randint(0, self.model_config.get_vocab_size(), (num_tokens,), dtype=torch.int32)
input_ids = self.input_ids[:num_tokens] input_ids = self.input_ids[:num_tokens]
inputs_embeds = None inputs_embeds = None
if self.uses_mrope: if self.uses_mrope:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment