Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
0b467604
Commit
0b467604
authored
Oct 13, 2025
by
王敏
Browse files
去掉all2all ep相关代码
parent
766663e6
Changes
10
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
45 additions
and
1252 deletions
+45
-1252
vllm/distributed/communication_op.py
vllm/distributed/communication_op.py
+1
-12
vllm/envs.py
vllm/envs.py
+0
-5
vllm/model_executor/layers/fused_moe/ep_moe/ep_moe_utlis.py
vllm/model_executor/layers/fused_moe/ep_moe/ep_moe_utlis.py
+0
-341
vllm/model_executor/layers/fused_moe/ep_moe/layer.py
vllm/model_executor/layers/fused_moe/ep_moe/layer.py
+0
-302
vllm/model_executor/layers/fused_moe/ep_moe/token_dispatcher.py
...odel_executor/layers/fused_moe/ep_moe/token_dispatcher.py
+0
-470
vllm/model_executor/layers/fused_moe/layer.py
vllm/model_executor/layers/fused_moe/layer.py
+5
-12
vllm/model_executor/layers/quantization/slimquant_w4a8.py
vllm/model_executor/layers/quantization/slimquant_w4a8.py
+0
-18
vllm/model_executor/models/deepseek_mtp.py
vllm/model_executor/models/deepseek_mtp.py
+0
-17
vllm/model_executor/models/deepseek_v2.py
vllm/model_executor/models/deepseek_v2.py
+38
-70
vllm/v1/worker/gpu_model_runner.py
vllm/v1/worker/gpu_model_runner.py
+1
-5
No files found.
vllm/distributed/communication_op.py
View file @
0b467604
...
...
@@ -6,7 +6,7 @@ from typing import Any, Optional, Union
import
torch
import
torch.distributed
from
.parallel_state
import
get_tp_group
,
get_ep_group
from
.parallel_state
import
get_tp_group
def
tensor_model_parallel_all_reduce
(
input_
:
torch
.
Tensor
)
->
torch
.
Tensor
:
...
...
@@ -32,17 +32,6 @@ def tensor_model_parallel_gather(input_: torch.Tensor,
"""Gather the input tensor across model parallel group."""
return
get_tp_group
().
gather
(
input_
,
dst
,
dim
)
def expert_parallel_all_gather(input_: torch.Tensor,
                               dim: int = -1) -> torch.Tensor:
    """All-gather ``input_`` over the group returned by ``get_ep_group()``.

    Args:
        input_: Tensor contributed by the calling rank.
        dim: Dimension argument forwarded to the group's ``all_gather``.

    Returns:
        Whatever the group's ``all_gather`` returns for this rank.
    """
    ep_group = get_ep_group()
    gathered = ep_group.all_gather(input_, dim)
    return gathered
def expert_parallel_gather(input_: torch.Tensor,
                           dst: int = 0,
                           dim: int = -1) -> Optional[torch.Tensor]:
    """Gather ``input_`` over the group returned by ``get_ep_group()``.

    Args:
        input_: Tensor contributed by the calling rank.
        dst: Destination rank forwarded to the group's ``gather``.
        dim: Dimension argument forwarded to the group's ``gather``.

    Returns:
        The result of the group's ``gather``; the return annotation allows
        ``None``.
    """
    ep_group = get_ep_group()
    gathered = ep_group.gather(input_, dst, dim)
    return gathered
def
broadcast_tensor_dict
(
tensor_dict
:
Optional
[
dict
[
Any
,
Union
[
torch
.
Tensor
,
Any
]]]
=
None
,
...
...
vllm/envs.py
View file @
0b467604
...
...
@@ -172,7 +172,6 @@ if TYPE_CHECKING:
VLLM_USE_MERGE_ATTN_STATES_OPT
:
bool
=
False
USE_FUSED_RMS_QUANT
:
bool
=
False
USE_FUSED_SILU_MUL_QUANT
:
bool
=
False
VLLM_USE_ALLTOALL_EP
:
bool
=
False
VLLM_P2P_ASYNC
:
bool
=
False
def
get_default_cache_root
():
...
...
@@ -1131,10 +1130,6 @@ environment_variables: dict[str, Callable[[], Any]] = {
"USE_FUSED_SILU_MUL_QUANT"
:
lambda
:
(
os
.
getenv
(
'USE_FUSED_SILU_MUL_QUANT'
,
'0'
).
lower
()
in
(
"true"
,
"1"
)),
# vLLM will use all_to_all ep mode
"VLLM_USE_ALLTOALL_EP"
:
lambda
:
(
os
.
environ
.
get
(
"VLLM_USE_ALLTOALL_EP"
,
"True"
).
lower
()
in
(
"true"
,
"1"
)),
# vllm pd separation will be used async
"VLLM_P2P_ASYNC"
:
lambda
:
bool
(
int
(
os
.
getenv
(
"VLLM_P2P_ASYNC"
,
"0"
))),
...
...
vllm/model_executor/layers/fused_moe/ep_moe/ep_moe_utlis.py
deleted
100644 → 0
View file @
766663e6
import
math
from
typing
import
Callable
,
List
,
Optional
,
Tuple
,
Union
from
dataclasses
import
dataclass
import
torch
from
torch
import
nn
from
vllm.model_executor.layers.quantization.base_config
import
(
QuantizationConfig
,
QuantizeMethodBase
)
from
vllm.model_executor.layers.linear
import
(
ColumnParallelLinear
,
MergedColumnParallelLinear
,
ReplicatedLinear
,
RowParallelLinear
)
from
vllm.model_executor.layers.activation
import
SiluAndMul
from
vllm.distributed
import
(
get_dp_group
,
get_ep_group
,
get_tensor_model_parallel_rank
,
get_tensor_model_parallel_world_size
,
tensor_model_parallel_all_reduce
)
# Optional dependency: the fused permutation kernels come from Transformer
# Engine. When the import fails, every alias is None and HAVE_TE is False so
# callers can check availability before using the fused path.
try:
    from transformer_engine.pytorch.permutation import (
        moe_permute,
        moe_sort_chunks_by_index,
        moe_unpermute,
    )
except ImportError:
    HAVE_TE = False
    fused_permute = None
    fused_unpermute = None
    fused_sort_chunks_by_index = None
else:
    HAVE_TE = True
    fused_permute = moe_permute
    fused_unpermute = moe_unpermute
    fused_sort_chunks_by_index = moe_sort_chunks_by_index
shared_experts_overlap_stream
=
torch
.
cuda
.
Stream
()
@dataclass
class EpMoeConfig:
    """Plain settings record for the expert-parallel MoE code path.

    Every field has a default, so ``EpMoeConfig()`` is valid. ``make`` is a
    keyword-friendly factory that produces the same object.
    """

    moe_router_topk: int = 2
    moe_permute_fusion: bool = False
    moe_shared_expert_overlap: bool = False
    ep_size: int = 1
    num_moe_experts: int = 256
    apply_router_weight_on_input: bool = False
    routed_scaling_factor: float = 1.0

    @staticmethod
    def make(moe_router_topk: int = 2,
             moe_permute_fusion: bool = False,
             moe_shared_expert_overlap: bool = False,
             ep_size: int = 1,
             num_moe_experts: int = 256,
             routed_scaling_factor: float = 1.0,
             apply_router_weight_on_input: bool = False) -> "EpMoeConfig":
        """Build an ``EpMoeConfig`` from the given values.

        Note that the positional order here differs from the dataclass field
        order: ``routed_scaling_factor`` comes before
        ``apply_router_weight_on_input``. Values are therefore forwarded by
        keyword.
        """
        settings = dict(
            moe_router_topk=moe_router_topk,
            moe_permute_fusion=moe_permute_fusion,
            moe_shared_expert_overlap=moe_shared_expert_overlap,
            ep_size=ep_size,
            num_moe_experts=num_moe_experts,
            routed_scaling_factor=routed_scaling_factor,
            apply_router_weight_on_input=apply_router_weight_on_input,
        )
        return EpMoeConfig(**settings)
class EPSharedExperts(nn.Module):
    """Shared-expert MLP: gate/up projection, SiLU-and-mul, down projection.

    Besides the ordinary ``forward``, the module offers a staged API
    (``pre_forward_comm`` -> ``linear_fc1_forward_and_act`` ->
    ``linear_fc2_forward`` -> ``post_forward_comm`` -> ``get_output``). Each
    stage runs under ``torch.cuda.stream(self.stream)`` and passes its result
    to the next stage through a ``cached_*`` attribute, which is reset to
    ``None`` once consumed.
    """

    def __init__(
        self,
        hidden_size: int,
        intermediate_size: int,
        hidden_act: str,
        quant_config: Optional[QuantizationConfig] = None,
        reduce_results: bool = True,
        prefix: str = "",
        moe_shared_expert_overlap: bool = True,
    ) -> None:
        """Create the two projections and, if requested, the staging state.

        Raises:
            ValueError: if ``hidden_act`` is anything other than ``"silu"``.
        """
        super().__init__()
        # Gate and up projections are merged into one column-parallel linear.
        self.gate_up_proj = MergedColumnParallelLinear(
            hidden_size,
            [intermediate_size] * 2,
            bias=False,
            quant_config=quant_config,
            prefix=f"{prefix}.gate_up_proj",
        )
        self.down_proj = RowParallelLinear(
            intermediate_size,
            hidden_size,
            bias=False,
            quant_config=quant_config,
            reduce_results=reduce_results,
            prefix=f"{prefix}.down_proj",
        )
        if hidden_act != "silu":
            raise ValueError(f"Unsupported activation: {hidden_act}. "
                             "Only silu is supported for now.")
        self.act_fn = SiluAndMul()

        self.moe_shared_expert_overlap = moe_shared_expert_overlap
        if self.moe_shared_expert_overlap:
            # Hand-over slots between the staged calls.
            self.cached_fc1_input = None
            self.cached_fc2_input = None
            self.cached_fc2_output = None
            self.cached_output = None
            self.gate_score = None
            # Module-level side stream shared by all instances.
            self.stream = shared_experts_overlap_stream

    def forward(self, x):
        """Run the whole MLP in one call and return the down-projection output."""
        merged, _ = self.gate_up_proj(x)
        activated = self.act_fn(merged)
        projected, _ = self.down_proj(activated)
        return projected

    def linear_fc1_forward_and_act(self, overlapped_comm_output=None):
        """Staged step: gate/up projection plus activation on ``self.stream``.

        Consumes ``self.cached_fc1_input`` and stores the activated result in
        ``self.cached_fc2_input``. ``overlapped_comm_output`` is accepted but
        not used.
        """
        assert self.moe_shared_expert_overlap
        with torch.cuda.stream(self.stream):
            # [s, b, 4 * h/p]
            fc1_out, fc1_bias = self.gate_up_proj(self.cached_fc1_input)
            self.cached_fc1_input = None
            if fc1_bias is not None:
                fc1_out = fc1_out + fc1_bias
            self.cached_fc2_input = self.act_fn(fc1_out)

    def linear_fc2_forward(self, overlapped_comm_output=None):
        """Staged step: down projection on ``self.stream``.

        Consumes ``self.cached_fc2_input`` and stores the result in
        ``self.cached_fc2_output``. ``overlapped_comm_output`` is accepted but
        not used.
        """
        assert self.moe_shared_expert_overlap
        assert self.cached_fc2_input is not None
        with torch.cuda.stream(self.stream):
            # [s, b, h]
            self.cached_fc2_output, _ = self.down_proj(self.cached_fc2_input)
            self.cached_fc2_input = None

    def pre_forward_comm(self, input):
        """Staged step: register ``input`` for the upcoming FC1 stage.

        Makes ``self.stream`` wait for the current stream, then stores
        ``input`` in ``self.cached_fc1_input``. Requires that no finished
        output is still pending in ``self.cached_output``.
        """
        assert self.cached_output is None
        self.stream.wait_stream(torch.cuda.current_stream())
        with torch.cuda.stream(self.stream):
            self.cached_fc1_input = input

    def post_forward_comm(self):
        """Staged step: all-reduce the FC2 output on ``self.stream``.

        Consumes ``self.cached_fc2_output`` and stores the result of
        ``tensor_model_parallel_all_reduce`` in ``self.cached_output``.
        """
        assert self.moe_shared_expert_overlap
        assert self.cached_fc2_output is not None
        with torch.cuda.stream(self.stream):
            self.cached_output = tensor_model_parallel_all_reduce(
                self.cached_fc2_output)
            self.cached_fc2_output = None

    def get_output(self):
        """Staged step: hand out the final result.

        Takes ``self.cached_output`` (clearing the slot), makes the current
        stream wait for ``self.stream``, and returns the tensor.
        """
        assert self.moe_shared_expert_overlap
        assert self.cached_output is not None
        with torch.cuda.stream(self.stream):
            result = self.cached_output
            self.cached_output = None
        torch.cuda.current_stream().wait_stream(self.stream)
        return result
def maybe_move_tensor_to_cpu(tensor, as_numpy=False, record_stream=False):
    """Return a host copy of ``tensor`` when it is a CUDA tensor.

    Anything else (``None``, CPU tensors, non-tensor objects) is returned
    unchanged, and in that case ``as_numpy`` and ``record_stream`` have no
    effect.

    Args:
        tensor (torch.Tensor or None): Candidate to copy to the host.
        as_numpy (bool): Convert the host copy with ``.numpy()``.
        record_stream (bool): Call ``tensor.record_stream`` with the current
            CUDA stream on the source tensor after issuing the copy.

    Returns:
        The host copy (tensor or numpy array), or the input itself.
    """
    is_device_tensor = torch.is_tensor(tensor) and tensor.is_cuda
    if not is_device_tensor:
        return tensor

    # The copy is issued with non_blocking=True; this function does not
    # synchronize afterwards.
    host_copy = tensor.to(torch.device("cpu"), non_blocking=True)
    if as_numpy:
        host_copy = host_copy.numpy()
    if record_stream:
        tensor.record_stream(torch.cuda.current_stream())
    return host_copy
def sort_chunks_by_idxs(
    input: torch.Tensor,
    split_sizes: torch.Tensor,
    sorted_idxs: torch.Tensor,
    fused: bool = False,
):
    """Cut ``input`` into chunks along dim 0 and concatenate them reordered.

    Args:
        input: Tensor to split along dimension 0.
        split_sizes: Chunk lengths; converted with ``tolist()`` on the
            non-fused path.
        sorted_idxs: For each output position, the index of the chunk to
            place there.
        fused: Delegate to ``fused_sort_chunks_by_index`` instead.

    Raises:
        ValueError: if ``fused`` is requested but the fused kernel is missing.
    """
    if not fused:
        pieces = torch.split(input, split_sizes.tolist(), dim=0)
        order = sorted_idxs.tolist()
        return torch.cat([pieces[k] for k in order], dim=0)

    if not HAVE_TE or fused_sort_chunks_by_index is None:
        raise ValueError(
            "fused_sort_chunks_by_index is not available. Please install TE >= 2.1.0."
        )
    return fused_sort_chunks_by_index(input, split_sizes, sorted_idxs)
def permute(
    tokens,
    routing_map,
    num_out_tokens: Optional[int] = None,
    fused: bool = False,
):
    """Group token rows by the expert they are routed to.

    The output lists all rows routed to expert 0 first, then expert 1, and
    so on; a token routed to several experts appears once per expert.

    Args:
        tokens (torch.Tensor): Input rows, [num_tokens, hidden].
        routing_map (torch.Tensor): Token-to-expert mask,
            [num_tokens, num_experts]; converted with ``.bool()``.
        num_out_tokens (int, optional): Only forwarded to the fused kernel;
            the non-fused path does not read it.
        fused (bool, optional): Delegate to ``fused_permute`` instead.

    Returns:
        On the non-fused path, ``(permuted_rows, source_row_indices)``. On
        the fused path, whatever ``fused_permute`` returns.

    Raises:
        ValueError: if ``fused`` is requested but the fused kernel is missing.
    """
    if fused:
        if not HAVE_TE or fused_permute is None:
            raise ValueError(
                "fused_permute is not available. Please install TE >= 2.1.0."
            )
        return fused_permute(tokens, routing_map, num_out_tokens)

    # Unpacking also enforces that `tokens` is two-dimensional.
    num_tokens, hidden = tokens.shape
    num_experts = routing_map.shape[1]

    # [num_tokens, num_experts] -> [num_experts, num_tokens]
    expert_major_mask = routing_map.bool().T.contiguous()

    # Row r of this grid is 0..num_tokens-1, so selecting with the mask
    # yields token ids in expert-major order.
    token_ids = torch.arange(num_tokens, device=expert_major_mask.device)
    token_grid = token_ids.unsqueeze(0).expand(num_experts, -1)
    sorted_indices = token_grid.masked_select(expert_major_mask)

    permuted_input = tokens.index_select(0, sorted_indices)
    return permuted_input, sorted_indices
def unpermute(
    permuted_tokens: torch.Tensor,
    sorted_indices: torch.Tensor,
    restore_shape: torch.Size,
    probs: torch.Tensor = None,
    routing_map: torch.Tensor = None,
    fused: bool = False,
):
    """Scatter permuted rows back to their original token positions.

    Rows that map to the same token are summed. When ``probs`` is given,
    each row is first multiplied by the probability of its (token, expert)
    pair.

    Args:
        permuted_tokens (torch.Tensor): Rows in permuted order.
        sorted_indices (torch.Tensor): Source token index of every permuted row.
        restore_shape (torch.Size): Two-element shape of the restored tensor.
        probs (torch.Tensor, optional): Unpermuted probabilities, indexed like
            ``routing_map``.
        routing_map (torch.Tensor, optional): Token-to-expert mask,
            [num_tokens, num_experts]; required when ``probs`` is given.
        fused (bool, optional): Delegate to ``fused_unpermute`` instead.

    Returns:
        torch.Tensor: Restored tensor, cast back to the dtype of the input
        ``permuted_tokens``.

    Raises:
        ValueError: if ``fused`` is requested but the fused kernel is missing.
    """
    if fused:
        if not HAVE_TE or fused_unpermute is None:
            raise ValueError(
                "fused_unpermute is not available. Please install TE >= 2.1.0."
            )
        return fused_unpermute(permuted_tokens, sorted_indices, probs,
                               restore_shape)

    _, hidden = restore_shape
    input_dtype = permuted_tokens.dtype

    if probs is not None:
        assert routing_map is not None, "Mask must be provided to permute the probs."
        # Bring the probabilities into the same expert-major order as the rows.
        expert_major_probs = probs.T.contiguous()
        expert_major_mask = routing_map.T.contiguous()
        permuted_probs = expert_major_probs.masked_select(expert_major_mask)
        # The product may be promoted to the dtype of `probs`; the final cast
        # below restores the input dtype.
        permuted_tokens = permuted_tokens * permuted_probs.unsqueeze(-1)

    output_tokens = torch.zeros(
        restore_shape,
        dtype=permuted_tokens.dtype,
        device=permuted_tokens.device,
    )
    # One target-row index per element, so whole rows accumulate together.
    row_index = sorted_indices.unsqueeze(1).expand(-1, hidden)
    output_tokens.scatter_add_(0, row_index, permuted_tokens)
    return output_tokens.to(dtype=input_dtype)
def all_to_all(group, input, output_split_sizes, input_split_sizes):
    """Exchange rows of ``input`` between the ranks of ``group``.

    Args:
        group: Process group to communicate over.
        input: Tensor to send; made contiguous before the exchange.
        output_split_sizes: Per-rank receive counts along dim 0, or ``None``
            for an equal split (the output then has the shape of ``input``).
        input_split_sizes: Per-rank send counts along dim 0, forwarded as is.

    Returns:
        The received tensor, or ``input`` itself when the group has a single
        rank.
    """
    # Nothing to exchange with a single rank.
    if torch.distributed.get_world_size(group=group) == 1:
        return input

    input = input.contiguous()
    if output_split_sizes is None:
        # Equal split (all2all)
        output = torch.empty_like(input)
    else:
        # Unequal split (all2all-v): dim 0 is the total number of rows this
        # rank receives, trailing dims follow the input.
        recv_shape = [sum(output_split_sizes), *input.size()[1:]]
        output = input.new_empty(
            size=recv_shape,
            dtype=input.dtype,
            device=torch.cuda.current_device(),
        )

    torch.distributed.all_to_all_single(
        output,
        input,
        output_split_sizes=output_split_sizes,
        input_split_sizes=input_split_sizes,
        group=group,
    )
    return output
vllm/model_executor/layers/fused_moe/ep_moe/layer.py
deleted
100644 → 0
View file @
766663e6
import
os
import
logging
from
typing
import
Callable
,
List
,
Optional
,
Tuple
from
dataclasses
import
dataclass
import
torch
import
torch.nn.functional
as
F
from
vllm.logger
import
init_logger
from
vllm.platforms
import
current_platform
from
vllm.model_executor.custom_op
import
CustomOp
from
vllm.forward_context
import
ForwardContext
,
get_forward_context
from
vllm.model_executor.layers.fused_moe.config
import
FusedMoEConfig
from
vllm.model_executor.layers.quantization.base_config
import
(
QuantizationConfig
,
QuantizeMethodBase
)
from
vllm.model_executor.layers.fused_moe
import
FusedMoE
from
vllm.model_executor.layers.fused_moe.layer
import
FusedMoEMethodBase
,
UnquantizedFusedMoEMethod
from
vllm.model_executor.layers.fused_moe.ep_moe.token_dispatcher
import
MoEAlltoAllTokenDispatcher
from
vllm.model_executor.layers.fused_moe.ep_moe.ep_moe_utlis
import
EpMoeConfig
from
vllm.utils
import
direct_register_custom_op
logger
=
init_logger
(
__name__
)
@CustomOp.register("unquantized_ep_moe")
class UnquantizedEPGroupedGemmMethod(UnquantizedFusedMoEMethod):
    """Unquantized MoE method that runs each local expert on its token slice.

    The input rows are expected to be already grouped by local expert, with
    ``tokens_per_expert[i]`` consecutive rows belonging to expert ``i``.
    """

    def __init__(self, moe: FusedMoEConfig):
        super().__init__(moe)
        self.topk_indices_dtype = None
        self.moe = moe
        self.rocm_aiter_moe_enabled = False  # is_rocm_aiter_moe_enabled()

    def apply_ep(
        self,
        layer: torch.nn.Module,
        hidden_states: torch.Tensor,
        tokens_per_expert: torch.Tensor,
    ) -> torch.Tensor:
        """Entry point used by the EP layer; defers to ``self.forward``."""
        return self.forward(hidden_states=hidden_states,
                            layer=layer,
                            tokens_per_expert=tokens_per_expert)

    def forward_cuda(
        self,
        layer: torch.nn.Module,
        hidden_states: torch.Tensor,
        tokens_per_expert: torch.Tensor,
    ) -> torch.Tensor:
        """Apply gate/up matmul, SiLU-and-mul and down matmul per expert.

        Args:
            layer: Module providing ``w13_weight`` and ``w2_weight``, indexed
                by local expert.
            hidden_states: Rows grouped by expert in expert order.
            tokens_per_expert: Number of rows for each local expert; copied
                to the host before the loop.

        Returns:
            Concatenated expert outputs in the same row order, or
            ``hidden_states`` itself when no expert received any row.
        """
        counts = tokens_per_expert.cpu().numpy()
        per_expert_out = []
        offset = 0
        for expert_id, count in enumerate(counts):
            if count == 0:
                continue
            stop = offset + count

            w1 = layer.w13_weight[expert_id]
            w2 = layer.w2_weight[expert_id]
            expert_rows = hidden_states[offset:stop]

            gateup_output = torch.matmul(expert_rows, w1)

            # Act: the fused kernel writes into a buffer half as wide as the
            # gate/up output.
            down_input = torch.zeros(gateup_output.shape[0],
                                     gateup_output.shape[1] // 2,
                                     device=gateup_output.device,
                                     dtype=hidden_states.dtype)
            torch.ops._C.silu_and_mul(down_input,
                                      gateup_output.view(-1, w1.shape[1]))

            per_expert_out.append(torch.matmul(down_input, w2))
            offset = stop

        if len(per_expert_out) > 0:
            return torch.cat(per_expert_out, dim=0)

        assert hidden_states.numel() == 0, f"sorted_tokens: should be empty, but got {hidden_states.shape}"
        return hidden_states

    def forward_cpu(
        self,
        layer: torch.nn.Module,
        hidden_states: torch.Tensor,
        tokens_per_expert: torch.Tensor,
        **kwargs,
    ):
        """Not implemented for CPU."""
        raise NotImplementedError

    def forward_hpu(
        self,
        layer: torch.nn.Module,
        hidden_states: torch.Tensor,
        tokens_per_expert: torch.Tensor,
    ) -> torch.Tensor:
        """Not implemented for HPU."""
        raise NotImplementedError

    def forward_tpu(
        self,
        layer: torch.nn.Module,
        hidden_states: torch.Tensor,
        tokens_per_expert: torch.Tensor,
    ) -> torch.Tensor:
        """Not implemented for TPU."""
        raise NotImplementedError

    # Pick the native implementation once, at class-definition time.
    if current_platform.is_tpu():
        forward_native = forward_tpu
    elif current_platform.is_cpu():
        forward_native = forward_cpu
    else:
        forward_native = forward_cuda
class EPMoE(FusedMoE):
    """
    dp+ep MoE Expert Parallel Impl

    Routes tokens with ``select_experts``, moves them to the owning rank
    through a ``MoEAlltoAllTokenDispatcher``, runs the local experts via
    ``quant_method.apply_ep`` and restores the original order. An optional
    shared-experts module can be attached with ``set_shared_experts``.
    """

    def __init__(
        self,
        num_experts: int,  # Global number of experts
        top_k: int,
        hidden_size: int,
        intermediate_size: int,
        params_dtype: Optional[torch.dtype] = None,
        reduce_results: bool = False,
        renormalize: bool = True,
        use_grouped_topk: bool = False,
        num_expert_group: Optional[int] = None,
        topk_group: Optional[int] = None,
        quant_config: Optional[QuantizationConfig] = None,
        tp_size: Optional[int] = None,
        ep_size: Optional[int] = None,
        dp_size: Optional[int] = None,
        prefix: str = "",
        custom_routing_function: Optional[Callable] = None,
        scoring_func: str = "softmax",
        e_score_correction_bias: Optional[torch.Tensor] = None,
        apply_router_weight_on_input: bool = False,
        activation: str = "silu",
        routed_scaling_factor: Optional[float] = None,
        enable_eplb: bool = False,
        num_redundant_experts: int = 0,
        moe_permute_fusion: bool = True,
        moe_shared_expert_overlap: bool = False,
    ):
        """Initialise the base layer, then the EP config and dispatcher.

        All arguments except ``moe_permute_fusion`` and
        ``moe_shared_expert_overlap`` are forwarded to ``FusedMoE.__init__``.
        """
        super().__init__(
            num_experts,
            top_k,
            hidden_size,
            intermediate_size,
            params_dtype,
            reduce_results,
            renormalize,
            use_grouped_topk,
            num_expert_group,
            topk_group,
            quant_config,
            tp_size,
            ep_size,
            dp_size,
            prefix,
            custom_routing_function,
            scoring_func,
            e_score_correction_bias,
            apply_router_weight_on_input,
            activation,
            routed_scaling_factor=routed_scaling_factor,
            enable_eplb=enable_eplb,
            num_redundant_experts=num_redundant_experts,
        )

        # The config is built from attributes set by the base constructor.
        self.ep_moe_config: EpMoeConfig = EpMoeConfig.make(
            moe_router_topk=self.top_k,
            # TODO: support fusion permute
            moe_permute_fusion=moe_permute_fusion,
            moe_shared_expert_overlap=moe_shared_expert_overlap,
            ep_size=self.ep_size,
            num_moe_experts=self.global_num_experts,
            routed_scaling_factor=self.routed_scaling_factor,
            apply_router_weight_on_input=self.apply_router_weight_on_input,
        )

        # This rank owns a contiguous run of global expert ids.
        first_local_expert = self.ep_rank * self.local_num_experts
        self.local_expert_indices = list(
            range(first_local_expert,
                  first_local_expert + self.local_num_experts))

        self.use_shared_expert = False
        self.token_dispatcher = MoEAlltoAllTokenDispatcher(
            self.local_num_experts,
            self.local_expert_indices,
            config=self.ep_moe_config,
        )
        self.shared_expert_overlap = moe_shared_expert_overlap
        self.shared_experts = None
        self.dpsk_fp16_quick = os.environ.get('DPSK_FP16_QUICK') == '1'

    def set_shared_experts(self, shared_experts: torch.nn.Module):
        """Attach the shared-experts module (first call wins).

        With overlap enabled, the currently attached module is also passed
        to the token dispatcher.
        """
        if self.shared_experts is None:
            self.shared_experts = shared_experts
        if self.shared_expert_overlap:
            self.token_dispatcher.set_shared_experts(self.shared_experts)

    def create_quant_method(self, moe, quant_config, prefix):
        """Return the MoE method for this layer.

        Uses ``UnquantizedEPGroupedGemmMethod`` when ``quant_config`` is
        ``None``; otherwise asks ``quant_config.get_quant_method``.
        """
        # Note: get_quant_method will look at the layer's local_num_experts
        # for heuristic purposes, so it must be initialized first.
        quant_method: Optional[QuantizeMethodBase]
        if quant_config is None:
            quant_method = UnquantizedEPGroupedGemmMethod(moe)
        else:
            quant_method = quant_config.get_quant_method(self, prefix)

        assert quant_method is not None
        assert isinstance(quant_method, FusedMoEMethodBase)
        return quant_method

    def forward(self, hidden_states: torch.Tensor,
                router_logits: torch.Tensor):
        """Dispatch through the registered ``ep_moe_forward`` custom op."""
        return torch.ops.vllm.ep_moe_forward(hidden_states, router_logits,
                                             self.layer_name)

    def forward_impl(self, hidden_states: torch.Tensor,
                     router_logits: torch.Tensor):
        """Route, dispatch, compute and combine; add shared experts if inline.

        Returns:
            The combined routed output, plus the shared-expert output when a
            shared-experts module is attached and overlap is disabled.
        """
        topk_weights, topk_ids = self.select_experts(
            hidden_states=hidden_states,
            router_logits=router_logits,
            use_grouped_topk=self.use_grouped_topk,
            top_k=self.top_k,
            renormalize=self.renormalize,
            topk_group=self.topk_group,
            num_expert_group=self.num_expert_group,
            custom_routing_function=self.custom_routing_function,
            scoring_func=self.scoring_func,
            e_score_correction_bias=self.e_score_correction_bias,
            indices_type=torch.int64,
            routed_scaling_factor=self.routed_scaling_factor,
            use_fused_gate=self.use_fused_gate,
        )

        # Shared experts are computed here only when they are not handled by
        # the overlap mechanism.
        shared_inline = (not self.ep_moe_config.moe_shared_expert_overlap
                         and self.shared_experts is not None)
        if shared_inline:
            shared_output = self.shared_experts(hidden_states)

        # Dense [num_tokens, num_experts] views of the top-k selection:
        # weights at the chosen experts, and a boolean mask of the choices.
        dense_weights = torch.zeros_like(router_logits,
                                         dtype=topk_weights.dtype)
        probs = dense_weights.scatter(1, topk_ids, topk_weights)
        dense_flags = torch.zeros_like(router_logits).int()
        routing_map = dense_flags.scatter(1, topk_ids, 1).bool()

        dispatched_input, tokens_per_expert = (
            self.token_dispatcher.token_permutation(hidden_states, probs,
                                                    routing_map))

        # Matrix multiply.
        expert_output = self.quant_method.apply_ep(
            layer=self,
            hidden_states=dispatched_input,
            tokens_per_expert=tokens_per_expert,
        )

        final_hidden_states = self.token_dispatcher.token_unpermutation(
            expert_output)

        if shared_inline:
            # if shared_expert_overlap is True, the expert calculation happens in
            # the token_dispatcher to overlap communications and computations
            shared_output = self.maybe_all_reduce_tensor_model_parallel(
                shared_output)
            needs_fp16_rescale = (hidden_states.dtype == torch.float16
                                  and not self.dpsk_fp16_quick)
            if needs_fp16_rescale:
                # Fix FP16 overflow
                # See DeepseekV2DecoderLayer for more details.
                final_hidden_states = final_hidden_states + shared_output \
                    * (1. / self.routed_scaling_factor)
            else:
                final_hidden_states = final_hidden_states + shared_output

        return final_hidden_states
def ep_moe_forward(hidden_states: torch.Tensor, router_logits: torch.Tensor,
                   layer_name: str) -> torch.Tensor:
    """Custom-op body: find the layer by name and run its ``forward_impl``.

    The layer object is looked up under ``layer_name`` in the current
    forward context's ``no_compile_layers`` mapping.
    """
    forward_context: ForwardContext = get_forward_context()
    layer = forward_context.no_compile_layers[layer_name]
    assert layer.quant_method is not None
    return layer.forward_impl(hidden_states, router_logits)
def ep_moe_forward_fake(hidden_states: torch.Tensor,
                        router_logits: torch.Tensor,
                        layer_name: str) -> torch.Tensor:
    """Fake implementation of ``ep_moe_forward``.

    Returns an uninitialised tensor shaped like ``hidden_states``;
    ``router_logits`` and ``layer_name`` are not used.
    """
    placeholder = torch.empty_like(hidden_states)
    return placeholder
# Register the real and fake implementations under the op name
# "ep_moe_forward" for the current platform's dispatch key.
direct_register_custom_op(
    op_name="ep_moe_forward",
    op_func=ep_moe_forward,
    mutates_args=["hidden_states"],
    fake_impl=ep_moe_forward_fake,
    dispatch_key=current_platform.dispatch_key,
    tags=(torch.Tag.needs_fixed_stride_order, ),
)
\ No newline at end of file
vllm/model_executor/layers/fused_moe/ep_moe/token_dispatcher.py
deleted
100644 → 0
View file @
766663e6
import
os
from
abc
import
ABC
,
abstractmethod
from
typing
import
List
,
Optional
,
Tuple
import
torch
from
vllm.distributed.parallel_state
import
(
get_dp_group
,
get_tp_group
,
get_ep_group
,
get_tensor_model_parallel_rank
)
from
vllm.model_executor.layers.fused_moe.ep_moe.ep_moe_utlis
import
(
EPSharedExperts
,
maybe_move_tensor_to_cpu
,
permute
,
sort_chunks_by_idxs
,
unpermute
,
all_to_all
,
EpMoeConfig
)
from
vllm.distributed
import
(
tensor_model_parallel_all_gather
,
tensor_model_parallel_gather
,
expert_parallel_all_gather
,
expert_parallel_gather
)
from
vllm.platforms
import
current_platform
cuda_dtoh_stream
=
torch
.
cuda
.
Stream
()
cuda_dtoh_sync_event
=
torch
.
cuda
.
Event
(
enable_timing
=
False
)
class MoETokenDispatcher(ABC):
    """
    MoE Token Dispatcher

    Abstract base for dispatchers that send tokens to experts
    (``token_permutation``) and bring the expert outputs back
    (``token_unpermutation``). The class derives from ``ABC`` so that the
    ``@abstractmethod`` markers are enforced: a subclass must override both
    methods before it can be instantiated.
    """

    def __init__(self, config: EpMoeConfig) -> None:
        """
        Initialize the MoE Token Dispatcher.

        Args:
            config (EpMoeConfig): Settings object; ``config.ep_size`` is
                copied to ``self.ep_size``.
        """
        self.config = config
        # Tensor parallelism is fixed to 1 here; see ``tp_rank`` below.
        self.tp_size = 1
        self.ep_size = config.ep_size

    @property
    def ep_group(self):
        """Get expert model parallel group."""
        return get_ep_group()

    @property
    def tp_group(self):
        """Get expert tensor parallel group."""
        return get_tp_group()

    @property
    def tp_rank(self):
        """Get expert tensor parallel rank (always 0 in this implementation)."""
        return 0  # get_tensor_model_parallel_rank()

    @property
    def tp_ep_group(self):
        """Get expert tensor and model parallel group.

        This returns the same group as ``ep_group``.
        """
        return get_ep_group()

    @abstractmethod
    def token_permutation(self, tokens: torch.Tensor, probs: torch.Tensor,
                          routing_map: torch.Tensor):
        """Dispatch tokens to experts.

        Args:
            tokens (torch.Tensor): Input tokens.
            probs (torch.Tensor): The routing probability tensor [num_tokens, num_experts].
            routing_map (torch.Tensor): Token to expert mapping tensor.

        Returns:
            torch.Tensor: Tokens tensor.
        """
        raise NotImplementedError("Dispatch function not implemented.")

    @abstractmethod
    def token_unpermutation(self,
                            expert_output: torch.Tensor,
                            bias: torch.Tensor = None):
        """Restores the expert output to its original ordering.

        Args:
            expert_output (torch.Tensor): The output tensor from the expert models.
            bias (torch.Tensor): The bias tensor.

        Returns:
            The restored activations. NOTE(review): the exact return shape
            (single tensor vs. tensor plus bias) is defined by the subclass;
            confirm against the concrete implementation.
        """
        raise NotImplementedError("Restore function not implemented.")

    def set_shared_experts(self, shared_experts):
        """Set shared expert to the dispatcher.

        Only valid when ``config.moe_shared_expert_overlap`` is enabled.
        """
        assert self.config.moe_shared_expert_overlap
        self.shared_experts = shared_experts
class
MoEAlltoAllTokenDispatcher
(
MoETokenDispatcher
):
"""
AlltoAll-based token dispatcher.
The workflow of AlltoAll token dispatcher is as follows:
(1) preprocess(): calculate necessary metadata for communication and permute
(2) token_permutation(): permute->A2A(EP)->AG(TP)->sort_chunk(if num_local_experts>1)
(3) token_unpermutation(): sort_chunk(if num_local_experts>1)->RS(TP)->A2A(EP)->unpermute
"""
def
__init__
(
self
,
num_local_experts
:
int
,
local_expert_indices
:
List
[
int
],
config
:
EpMoeConfig
)
->
None
:
"""
Initialize the AlltoAll token dispatcher.
Args:
num_local_experts (int): Number of local experts on the current device.
local_expert_indices (List[int]): Indices of local experts on the current device.
config (TransformerConfig): Configuration for the transformer model.
"""
super
().
__init__
(
config
=
config
)
self
.
num_local_experts
=
num_local_experts
assert
config
.
num_moe_experts
is
not
None
self
.
num_experts
=
config
.
num_moe_experts
assert
self
.
num_local_experts
>
0
,
"Expected at least one expert"
self
.
local_expert_indices
=
local_expert_indices
assert
(
len
(
self
.
local_expert_indices
)
==
self
.
num_local_experts
),
"Invalid local expert indices"
for
i
in
range
(
len
(
self
.
local_expert_indices
)
-
1
):
assert
(
self
.
local_expert_indices
[
i
]
==
self
.
local_expert_indices
[
i
+
1
]
-
1
),
"local_expert_indices must be continous"
# [ep_size]. Represents the number of tokens sent by the current rank to other
# EP ranks.
self
.
input_splits
=
None
# [ep_size]. Represents the number of tokens received by the current rank from
# other EP ranks.
self
.
output_splits
=
None
# [tp_size]. Represents the number of tokens received by the current rank from
# other TP ranks.
#self.output_splits_tp = None
self
.
permute_idx_device
=
torch
.
device
(
"cuda"
)
if
self
.
config
.
moe_permute_fusion
else
None
input_chunk_idxs
=
torch
.
arange
(
self
.
num_experts
*
self
.
tp_size
,
device
=
self
.
permute_idx_device
)
# [num_local_experts, tp_size * ep_size]. Sort the input chunks by local experts.
self
.
sort_input_by_local_experts
=
input_chunk_idxs
.
reshape
(
-
1
,
self
.
num_local_experts
).
T
.
ravel
()
# [tp_size * ep_size, num_local_experts]. Restore the output chunks by local experts.
self
.
restore_output_by_local_experts
=
input_chunk_idxs
.
reshape
(
self
.
num_local_experts
,
-
1
).
T
.
ravel
()
# A cuda stream synchronization is needed in self.token_permutation() in some cases,
# because there are several non-blocking DtoH data transfers called at
# `self.cuda_dtoh_point`. The synchronization happens at `self.cuda_sync_point`, which is
# decided based on the MoE and parallel settings. Valid points are "before_permutation_1",
# "before_ep_alltoall", "before_permutation_2", "before_finish", and "no_sync".
self
.
cuda_sync_point
=
"no_sync"
self
.
cuda_sync_point_priority
=
{
"before_permutation_1"
:
0
,
"before_ep_alltoall"
:
1
,
"before_permutation_2"
:
2
,
"before_finish"
:
3
,
"no_sync"
:
4
,
}
self
.
cuda_dtoh_point
=
"before_permutation_1"
#self.cuda_dtoh_stream = torch.cuda.Stream()
# Whether to use gather or all-gather to gather the logits.
self
.
use_all_gather
=
current_platform
.
use_all_gather
()
self
.
probs
=
None
self
.
dpsk_fp16_quick
=
os
.
environ
.
get
(
'DPSK_FP16_QUICK'
)
==
'1'
def
preprocess
(
self
,
routing_map
:
torch
.
Tensor
)
->
torch
.
Tensor
:
"""
Preprocess token routing map for AlltoAll communication and token permutation.
This method computes the number of tokens assigned to each expert based on the routing_map.
It also initializes the necessary data structures for AlltoAll communication, such as input
and output splits, and the mapping between global tokens and local experts. This method
should not call any DtoH data copying due to performance consideration. The necessary DtoH
copies are made on the `self.cuda_dtoh_stream` at `self.cuda_dtoh_point`.
Args:
routing_map (torch.Tensor): The mapping of tokens to experts, with shape
[num_tokens, num_experts].
Returns:
torch.Tensor: Tensor containing the number of tokens assigned to local expert.
"""
# [num_experts], number of tokens assigned to each expert from the current rank's input.
num_local_tokens_per_expert
=
routing_map
.
sum
(
dim
=
0
).
long
()
self
.
num_out_tokens
=
routing_map
.
size
(
0
)
*
self
.
config
.
moe_router_topk
if
self
.
ep_size
>
1
or
self
.
tp_size
>
1
:
# ===================================================
# Calculate input_splits, output_splits for alltoall/allgather in variable size.
# ===================================================
# [ep_size]. Represents the number of tokens sent by the current rank to other
# EP ranks.
self
.
input_splits
=
num_local_tokens_per_expert
.
reshape
(
self
.
ep_size
,
self
.
num_local_experts
).
sum
(
axis
=
1
)
# Gather the global distribution of tokens across ranks.
# num_global_tokens_per_expert represents the number of tokens sent to each
# expert by all ranks.
# [tp_size, ep_size, num_experts]
if
self
.
use_all_gather
:
# Gather is not supported for some devices such as TPUs.
# Use all-gather instead.
num_global_tokens_per_expert
=
expert_parallel_all_gather
(
num_local_tokens_per_expert
)
\
.
reshape
(
self
.
ep_size
,
self
.
tp_size
,
self
.
num_experts
)
\
.
transpose
(
0
,
1
)
else
:
# None may be returned for rank > 0
num_global_tokens_per_expert
=
expert_parallel_gather
(
num_local_tokens_per_expert
)
\
.
reshape
(
self
.
ep_size
,
self
.
tp_size
,
self
.
num_experts
)
\
.
transpose
(
0
,
1
)
# [tp_size, ep_size, num_experts] -> [tp_size, ep_size, num_local_experts]
num_global_tokens_per_local_expert
=
num_global_tokens_per_expert
[
:,
:,
self
.
local_expert_indices
[
0
]
:
self
.
local_expert_indices
[
-
1
]
+
1
].
contiguous
()
# [tp_size, ep_size, num_local_experts] -> [tp_size, ep_size]
num_global_tokens_per_rank
=
num_global_tokens_per_local_expert
.
sum
(
axis
=
2
)
# [tp_size, ep_size] -> [ep_size]
# self.output_splits represents the number of tokens received by the current rank
# from other EP rank.
self
.
output_splits
=
num_global_tokens_per_rank
[
self
.
tp_rank
]
# [tp_size, ep_size] -> [tp_size]
# self.output_splits_tp represents the number of tokens received by the current
# rank from other TP rank.
#self.output_splits_tp = num_global_tokens_per_rank.sum(axis=1)
# [tp_size, ep_size, num_local_experts] -> [num_local_experts]
num_tokens_per_local_expert
=
num_global_tokens_per_local_expert
.
sum
(
dim
=
(
0
,
1
))
# A synchronization is needed before expert parallel AlltoAll communication
# to get the `input_splits` and `output_splits` CPU values.
self
.
_maybe_update_cuda_sync_point
(
"before_ep_alltoall"
)
else
:
num_global_tokens_per_local_expert
=
num_local_tokens_per_expert
.
reshape
(
self
.
num_experts
)
num_tokens_per_local_expert
=
num_local_tokens_per_expert
# A synchronization is needed before the returns
# to get the `num_tokens_per_local_expert` CPU value.
self
.
_maybe_update_cuda_sync_point
(
"before_finish"
)
if
self
.
num_local_experts
>
1
:
# [tp_size * ep_size, num_local_experts]. Represents the number of tokens sent
# to each local expert by all ranks.
self
.
num_global_tokens_per_local_expert
=
num_global_tokens_per_local_expert
.
view
(
-
1
,
self
.
num_local_experts
)
if
not
self
.
config
.
moe_permute_fusion
:
# A synchronization is needed before permutation 2
# to get the `num_global_tokens_per_local_expert` CPU value.
self
.
_maybe_update_cuda_sync_point
(
"before_permutation_2"
)
assert
(
self
.
cuda_sync_point_priority
[
self
.
cuda_dtoh_point
]
<=
self
.
cuda_sync_point_priority
[
self
.
cuda_sync_point
]
),
"cuda_sync_point must be after cuda_dtoh_point."
return
num_tokens_per_local_expert
def token_permutation(
    self,
    hidden_states: torch.Tensor,
    probs: torch.Tensor,
    routing_map: torch.Tensor,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Dispatch tokens to local experts using AlltoAll communication.

    This method performs the following steps:
        1. Preprocess the routing map to get metadata for communication and permutation.
        2. Permute input tokens for AlltoAll communication.
        3. Perform expert parallel AlltoAll communication.
        4. Sort tokens by local expert (if multiple local experts exist).

    Besides returning a value, the call stores state on ``self`` that
    ``token_unpermutation`` reads later: ``hidden_shape``, ``routing_map``,
    ``hidden_shape_before_permute``,
    ``reversed_local_input_permutation_mapping`` and (conditionally) ``probs``.

    Args:
        hidden_states (torch.Tensor): Input token embeddings.
        probs (torch.Tensor): The probabilities of token to experts assignment.
            Must be 2D.
        routing_map (torch.Tensor): The mapping of token to experts assignment.
            Must be a 2D bool tensor.

    Returns:
        Tuple[torch.Tensor, torch.Tensor]:
            - Permuted token embeddings for local experts.
            - Number of tokens per expert, as returned by ``self.preprocess``
              and passed through ``self._maybe_dtoh_and_synchronize``.
    """
    # Preprocess: Get the metadata for communication, permutation and computation operations.
    self.hidden_shape = hidden_states.shape
    # NOTE(review): the extracted source lost its indentation. This layout
    # assumes only the `self.probs` assignment is conditional, because
    # `self.routing_map` is read unconditionally just below — confirm
    # against the original file.
    if self.config.apply_router_weight_on_input:
        self.probs = probs
    self.routing_map = routing_map
    assert probs.dim() == 2, "Expected 2D tensor for probs"
    assert routing_map.dim() == 2, "Expected 2D tensor for token2expert mask"
    assert routing_map.dtype == torch.bool, "Expected bool tensor for mask"
    # Flatten every leading dimension so tokens are laid out as rows.
    hidden_states = hidden_states.view(-1, self.hidden_shape[-1])
    tokens_per_expert = self.preprocess(self.routing_map)

    if self.config.moe_shared_expert_overlap and self.shared_experts is not None:
        # The shared experts receive the tensor in its original shape.
        self.shared_experts.pre_forward_comm(hidden_states.view(self.hidden_shape))

    # Permutation 1: input to AlltoAll input
    tokens_per_expert = self._maybe_dtoh_and_synchronize(
        "before_permutation_1", tokens_per_expert)
    self.hidden_shape_before_permute = hidden_states.shape
    permutated_local_input_tokens, self.reversed_local_input_permutation_mapping = permute(
        hidden_states,
        routing_map,
        num_out_tokens=self.num_out_tokens,
        fused=self.config.moe_permute_fusion)

    # Perform expert parallel AlltoAll communication
    # tokens_per_expert = self._maybe_dtoh_and_synchronize(
    #     "before_ep_alltoall", tokens_per_expert
    # )
    # The per-point synchronization above is disabled. Instead the host waits
    # on the module-level event that `_maybe_dtoh_and_synchronize` records
    # after queuing the device-to-host copies of the split sizes.
    #cuda_dtoh_stream.synchronize()
    cuda_dtoh_sync_event.synchronize()
    global_input_tokens = all_to_all(
        self.ep_group.device_group,
        permutated_local_input_tokens,
        self.output_splits,
        self.input_splits)

    if self.config.moe_shared_expert_overlap and self.shared_experts is not None:
        self.shared_experts.linear_fc1_forward_and_act(global_input_tokens)

    # Permutation 2: Sort tokens by local expert.
    # tokens_per_expert = self._maybe_dtoh_and_synchronize(
    #     "before_permutation_2", tokens_per_expert
    # )
    if self.num_local_experts > 1:
        global_input_tokens = sort_chunks_by_idxs(
            global_input_tokens,
            self.num_global_tokens_per_local_expert.ravel(),
            self.sort_input_by_local_experts,
            fused=self.config.moe_permute_fusion,
        )
    #tokens_per_expert = self._maybe_dtoh_and_synchronize("before_finish", tokens_per_expert)

    return global_input_tokens, tokens_per_expert
def token_unpermutation(
    self,
    hidden_states: torch.Tensor,
) -> torch.Tensor:
    """
    Reverse the token permutation to restore the original order.

    This method performs the following steps:
        1. Unsort tokens by local expert (if multiple local experts exist).
        2. Perform expert parallel AlltoAll communication to restore the original order.
        3. Unpermute tokens to restore the original order.
        4. Add the shared-expert output when shared-expert overlap is enabled.

    It relies on state stored on ``self`` by ``token_permutation``
    (``input_splits``, ``output_splits``, ``hidden_shape``,
    ``hidden_shape_before_permute``,
    ``reversed_local_input_permutation_mapping``, ``probs``, ``routing_map``).

    Args:
        hidden_states (torch.Tensor): Output from local experts.

    Returns:
        torch.Tensor: Unpermuted token embeddings, viewed as
        ``self.hidden_shape``. A single tensor is returned; there is no
        bias value.
    """
    # Unpermutation 2: Unsort tokens by local expert.
    if self.num_local_experts > 1:
        hidden_states = sort_chunks_by_idxs(
            hidden_states,
            self.num_global_tokens_per_local_expert.T.ravel(),
            self.restore_output_by_local_experts,
            fused=self.config.moe_permute_fusion,
        )

    # Perform expert parallel AlltoAll communication
    # hidden_states: [SEQL, H] -> [SEQL, H/TP]
    # The split arguments are passed in the opposite order to the dispatch
    # call in token_permutation.
    permutated_local_input_tokens = all_to_all(
        self.ep_group.device_group,
        hidden_states,
        self.input_splits,
        self.output_splits)

    if self.config.moe_shared_expert_overlap and self.shared_experts is not None:
        self.shared_experts.linear_fc2_forward(permutated_local_input_tokens)
        self.shared_experts.post_forward_comm()

    # Unpermutation 1: AlltoAll output to output
    output = unpermute(
        permutated_local_input_tokens,
        self.reversed_local_input_permutation_mapping,
        restore_shape=self.hidden_shape_before_permute,
        probs=self.probs,
        routing_map=self.routing_map,
        fused=self.config.moe_permute_fusion,
    )

    # Reshape the output tensor
    output = output.view(self.hidden_shape)

    # Add shared experts output
    if self.config.moe_shared_expert_overlap and self.shared_experts is not None:
        shared_output = self.shared_experts.get_output()
        if hidden_states.dtype != torch.float16 or self.dpsk_fp16_quick:
            output = output + shared_output
        else:
            # Fix FP16 overflow
            # See DeepseekV2DecoderLayer for more details.
            # In this branch the shared output is scaled by
            # 1 / routed_scaling_factor before it is added.
            output = output + shared_output \
                * (1. / self.config.routed_scaling_factor)
    return output
def
_maybe_update_cuda_sync_point
(
self
,
point
:
str
):
"""
Update the CUDA sync point if the priority of the new point is higher than the current
sync point, which means the new point is reached earlier than the current sync point.
"""
if
(
self
.
cuda_sync_point_priority
[
point
]
<
self
.
cuda_sync_point_priority
[
self
.
cuda_sync_point
]
):
self
.
cuda_sync_point
=
point
def _maybe_dtoh_and_synchronize(
    self, point: str, tokens_per_expert: Optional[torch.Tensor] = None
) -> Optional[torch.Tensor]:
    """
    Queue device-to-host copies of the dispatch metadata at the dtoh point.

    When ``point`` equals ``self.cuda_dtoh_point``, ``input_splits``,
    ``output_splits``, ``num_out_tokens`` and (for several local experts
    without permute fusion) ``num_global_tokens_per_local_expert`` are
    replaced by the results of ``maybe_move_tensor_to_cpu``, issued under
    the module-level ``cuda_dtoh_stream``, and ``cuda_dtoh_sync_event`` is
    recorded. For any other ``point`` nothing happens.

    No synchronization is done here: the ``cuda_sync_point`` branch is
    commented out, so callers must wait on ``cuda_dtoh_sync_event``
    themselves before reading the CPU values.

    Args:
        point (str): Name of the call site.
        tokens_per_expert (torch.Tensor, optional): Passed through unchanged;
            its own copy to CPU is commented out.

    Returns:
        The ``tokens_per_expert`` argument, unmodified.
    """
    if point == self.cuda_dtoh_point:
        # Move all possible GPU tensors to CPU at self.cuda_dtoh_point.
        on_side_stream = torch.cuda.current_stream() != cuda_dtoh_stream
        if on_side_stream:
            # Order the copy stream after the work already queued on the
            # current stream.
            cuda_dtoh_stream.wait_stream(torch.cuda.current_stream())
        with torch.cuda.stream(cuda_dtoh_stream):
            # TODO: use MemcpyBatchAsync instead.
            # tokens_per_expert = maybe_move_tensor_to_cpu(
            #     tokens_per_expert, record_stream=on_side_stream
            # )
            self.input_splits = maybe_move_tensor_to_cpu(
                self.input_splits, as_numpy=True, record_stream=on_side_stream)
            self.output_splits = maybe_move_tensor_to_cpu(
                self.output_splits, as_numpy=True, record_stream=on_side_stream)
            # self.output_splits_tp = maybe_move_tensor_to_cpu(
            #     self.output_splits_tp, as_numpy=True, record_stream=on_side_stream
            # )
            self.num_out_tokens = maybe_move_tensor_to_cpu(
                self.num_out_tokens, record_stream=on_side_stream)
            if self.num_local_experts > 1 and not self.config.moe_permute_fusion:
                self.num_global_tokens_per_local_expert = maybe_move_tensor_to_cpu(
                    self.num_global_tokens_per_local_expert, record_stream=on_side_stream)
            # NOTE(review): the extracted source lost its indentation. The
            # record() call is placed inside the stream context on the
            # assumption that the event should mark completion of the copies
            # queued above — confirm against the original file.
            cuda_dtoh_sync_event.record()

    # if point == self.cuda_sync_point:
    #     # Synchronize with the dtoh stream at self.cuda_sync_point.
    #     cuda_dtoh_stream.synchronize()

    return tokens_per_expert
\ No newline at end of file
vllm/model_executor/layers/fused_moe/layer.py
View file @
0b467604
...
...
@@ -779,7 +779,11 @@ class FusedMoE(torch.nn.Module):
self
.
moe_config
=
moe
self
.
quant_config
=
quant_config
quant_method
=
self
.
create_quant_method
(
moe
,
quant_config
,
prefix
)
# Note: get_quant_method will look at the layer's local_num_experts
# for heuristic purposes, so it must be initialized first.
quant_method
:
Optional
[
QuantizeMethodBase
]
=
None
quant_method
=
(
UnquantizedFusedMoEMethod
(
moe
)
if
quant_config
is
None
else
quant_config
.
get_quant_method
(
self
,
prefix
))
assert
quant_method
is
not
None
assert
isinstance
(
quant_method
,
FusedMoEMethodBase
)
...
...
@@ -854,17 +858,6 @@ class FusedMoE(torch.nn.Module):
dtype
=
moe
.
in_dtype
,
device
=
torch
.
cuda
.
current_device
())
def create_quant_method(self, moe, quant_config, prefix):
    """
    Build the fused-MoE quantization method for this layer.

    Args:
        moe: MoE configuration handed to ``UnquantizedFusedMoEMethod`` when
            no quantization config is given.
        quant_config: Quantization config, or ``None`` for the unquantized
            path.
        prefix: Passed to ``quant_config.get_quant_method`` together with
            this layer.

    Returns:
        The selected method; asserted to be a non-``None``
        ``FusedMoEMethodBase`` instance.
    """
    # Note: get_quant_method will look at the layer's local_num_experts
    # for heuristic purposes, so it must be initialized first.
    selected: Optional[QuantizeMethodBase]
    if quant_config is None:
        selected = UnquantizedFusedMoEMethod(moe)
    else:
        selected = quant_config.get_quant_method(self, prefix)

    assert selected is not None
    assert isinstance(selected, FusedMoEMethodBase)
    return selected
@property
def tp_size(self):
    """Return ``tp_size`` as stored on ``self.moe_parallel_config``."""
    parallel_cfg = self.moe_parallel_config
    return parallel_cfg.tp_size
...
...
vllm/model_executor/layers/quantization/slimquant_w4a8.py
View file @
0b467604
...
...
@@ -24,11 +24,6 @@ from vllm import _custom_ops as ops
from
vllm
import
envs
try
:
from
lmslim.layers.fused_moe.fuse_moe_w4a8
import
fused_experts_impl_w4a8_ep
except
Exception
:
print
(
"INFO: Please install lmslim if you want to infer the quantitative model of moe.
\n
"
)
W8A8_TRITONJSON
=
W8a8GetCacheJSON
()
def
baseline_scaled_mm
(
a
:
torch
.
Tensor
,
...
...
@@ -343,19 +338,6 @@ class SlimQuantW4A8Int8MoEMethod:
layer
.
w2_weight_scale
.
data
,
requires_grad
=
False
)
def apply_ep(  # dp+ep
    self,
    layer: torch.nn.Module,
    hidden_states: torch.Tensor,
    tokens_per_expert: torch.Tensor,
) -> torch.Tensor:
    """
    Run the expert-parallel W4A8 fused-experts kernel for ``layer``.

    Args:
        layer (torch.nn.Module): Module providing ``w13_weight``,
            ``w2_weight``, ``w13_weight_scale`` and ``w2_weight_scale``.
        hidden_states (torch.Tensor): Input forwarded to the kernel.
        tokens_per_expert (torch.Tensor): Per-expert token counts forwarded
            to the kernel.

    Returns:
        torch.Tensor: Whatever ``fused_experts_impl_w4a8_ep`` returns.
    """
    run_experts = fused_experts_impl_w4a8_ep
    w13 = layer.w13_weight
    w2 = layer.w2_weight
    w13_scale = layer.w13_weight_scale
    w2_scale = layer.w2_weight_scale
    return run_experts(hidden_states, w13, w2, w13_scale, w2_scale,
                       tokens_per_expert)
def
apply
(
# tp
self
,
...
...
vllm/model_executor/models/deepseek_mtp.py
View file @
0b467604
...
...
@@ -11,7 +11,6 @@ import torch
import
torch.nn
as
nn
from
transformers
import
PretrainedConfig
import
vllm.envs
as
envs
from
vllm.config
import
CacheConfig
,
ModelConfig
,
VllmConfig
from
vllm.model_executor.layers.fused_moe
import
FusedMoE
from
vllm.model_executor.layers.layernorm
import
RMSNorm
...
...
@@ -25,7 +24,6 @@ from vllm.sequence import IntermediateTensors
from
vllm.compilation.decorators
import
support_torch_compile
from
.deepseek_v2
import
(
DeepseekV2DecoderLayer
,
get_spec_layer_idx_from_weight_name
)
from
vllm.distributed
import
get_dp_group
from
.interfaces
import
SupportsPP
from
.utils
import
maybe_prefix
from
vllm
import
_custom_ops
as
ops
...
...
@@ -176,10 +174,6 @@ class DeepSeekMTP(nn.Module, SupportsPP):
prefix
,
"model"
))
self
.
use_llama_nn
=
os
.
environ
.
get
(
'LLAMA_NN'
)
==
'1'
parallel_config
=
vllm_config
.
parallel_config
dp_size
=
get_dp_group
().
world_size
self
.
use_all2all_ep
=
envs
.
VLLM_USE_ALLTOALL_EP
and
dp_size
>
1
and
parallel_config
.
enable_expert_parallel
def
forward
(
self
,
...
...
@@ -211,10 +205,6 @@ class DeepSeekMTP(nn.Module, SupportsPP):
(
"gate_up_proj"
,
"up_proj"
,
1
),
]
if
self
.
use_all2all_ep
:
ep_moe_shared_experts_keys
=
"mlp.shared_experts"
ep_moe_shared_experts_mapping
=
{
ep_moe_shared_experts_keys
:
"mlp.experts.shared_experts"
}
expert_params_mapping
=
FusedMoE
.
make_expert_params_mapping
(
ckpt_gate_proj_name
=
"gate_proj"
,
ckpt_down_proj_name
=
"down_proj"
,
...
...
@@ -244,8 +234,6 @@ class DeepSeekMTP(nn.Module, SupportsPP):
continue
name
=
name
.
replace
(
weight_name
,
param_name
)
if
self
.
use_all2all_ep
:
name
=
name
.
replace
(
ep_moe_shared_experts_keys
,
ep_moe_shared_experts_mapping
[
ep_moe_shared_experts_keys
])
# Skip loading extra bias for GPTQ models.
if
name
.
endswith
(
".bias"
)
and
name
not
in
params_dict
:
continue
...
...
@@ -261,9 +249,6 @@ class DeepSeekMTP(nn.Module, SupportsPP):
continue
name
=
name
.
replace
(
weight_name
,
param_name
)
if
self
.
use_all2all_ep
:
name
=
name
.
replace
(
ep_moe_shared_experts_keys
,
ep_moe_shared_experts_mapping
[
ep_moe_shared_experts_keys
])
param
=
params_dict
[
name
]
weight_loader
=
param
.
weight_loader
weight_loader
(
param
,
...
...
@@ -273,8 +258,6 @@ class DeepSeekMTP(nn.Module, SupportsPP):
expert_id
=
expert_id
)
break
else
:
if
self
.
use_all2all_ep
:
name
=
name
.
replace
(
ep_moe_shared_experts_keys
,
ep_moe_shared_experts_mapping
[
ep_moe_shared_experts_keys
])
# Skip loading extra bias for GPTQ models.
if
name
.
endswith
(
".bias"
)
and
name
not
in
params_dict
:
continue
...
...
vllm/model_executor/models/deepseek_v2.py
View file @
0b467604
...
...
@@ -43,8 +43,6 @@ from vllm.distributed import (get_ep_group, get_pp_group, get_dp_group,
get_tensor_model_parallel_world_size
)
from
vllm.model_executor.layers.activation
import
SiluAndMul
from
vllm.model_executor.layers.fused_moe
import
FusedMoE
from
vllm.model_executor.layers.fused_moe.ep_moe.layer
import
EPMoE
from
vllm.model_executor.layers.fused_moe.ep_moe.ep_moe_utlis
import
EPSharedExperts
from
vllm.model_executor.layers.layernorm
import
RMSNorm
from
vllm.model_executor.layers.linear
import
(
ColumnParallelLinear
,
MergedColumnParallelLinear
,
...
...
@@ -168,11 +166,7 @@ class DeepseekV2MoE(nn.Module):
self
.
physical_expert_end
=
(
self
.
physical_expert_start
+
self
.
n_local_physical_experts
)
dp_size
=
get_dp_group
().
world_size
self
.
use_all2all_ep
=
envs
.
VLLM_USE_ALLTOALL_EP
and
dp_size
>
1
and
parallel_config
.
enable_expert_parallel
moe_cls
=
FusedMoE
if
not
self
.
use_all2all_ep
else
EPMoE
self
.
experts
=
moe_cls
(
self
.
experts
=
FusedMoE
(
num_experts
=
config
.
n_routed_experts
,
top_k
=
config
.
num_experts_per_tok
,
hidden_size
=
config
.
hidden_size
,
...
...
@@ -193,8 +187,7 @@ class DeepseekV2MoE(nn.Module):
if
config
.
n_shared_experts
is
not
None
:
intermediate_size
=
(
config
.
moe_intermediate_size
*
config
.
n_shared_experts
)
shared_expert_cls
=
DeepseekV2MLP
if
not
self
.
use_all2all_ep
else
EPSharedExperts
self
.
shared_experts
=
shared_expert_cls
(
self
.
shared_experts
=
DeepseekV2MLP
(
hidden_size
=
config
.
hidden_size
,
intermediate_size
=
intermediate_size
,
hidden_act
=
config
.
hidden_act
,
...
...
@@ -203,8 +196,6 @@ class DeepseekV2MoE(nn.Module):
),
prefix
=
f
"
{
prefix
}
.shared_experts"
,
)
if
self
.
use_all2all_ep
:
self
.
experts
.
set_shared_experts
(
self
.
shared_experts
)
from
vllm.two_batch_overlap.two_batch_overlap
import
tbo_all_reduce
self
.
tbo_all_reduce
=
tbo_all_reduce
...
...
@@ -216,59 +207,52 @@ class DeepseekV2MoE(nn.Module):
num_tokens
,
hidden_dim
=
hidden_states
.
shape
hidden_states
=
hidden_states
.
view
(
-
1
,
hidden_dim
)
if
not
self
.
use_all2all_ep
:
if
self
.
n_shared_experts
is
not
None
:
if
envs
.
USE_FUSED_RMS_QUANT
:
shared_output
,
new_resi
=
self
.
shared_experts
(
hidden_states
,
rms_weight
,
residual
,
update_hd
=
True
)
else
:
shared_output
=
self
.
shared_experts
(
hidden_states
)
if
self
.
n_shared_experts
is
not
None
:
if
envs
.
USE_FUSED_RMS_QUANT
:
shared_output
,
new_resi
=
self
.
shared_experts
(
hidden_states
,
rms_weight
,
residual
,
update_hd
=
True
)
else
:
shared_output
=
self
.
shared_experts
(
hidden_states
)
# router_logits: (num_tokens, n_experts)
router_logits
,
_
=
self
.
gate
(
hidden_states
)
if
not
self
.
use_all2all_ep
:
if
envs
.
VLLM_USE_LIGHTOP
and
not
self
.
dpsk_fp16_quick
:
if
envs
.
VLLM_USE_LIGHTOP
and
not
self
.
dpsk_fp16_quick
:
final_hidden_states
=
self
.
experts
(
hidden_states
=
hidden_states
,
router_logits
=
router_logits
,
shared_output
=
shared_output
)
else
:
if
hidden_states
.
dtype
!=
torch
.
float16
or
self
.
dpsk_fp16_quick
:
final_hidden_states
=
self
.
experts
(
hidden_states
=
hidden_states
,
router_logits
=
router_logits
,
shared_output
=
shared_output
)
router_logits
=
router_logits
)
*
self
.
routed_scaling_factor
else
:
if
hidden_states
.
dtype
!=
torch
.
float16
or
self
.
dpsk_fp16_quick
:
final_hidden_states
=
self
.
experts
(
hidden_states
=
hidden_states
,
router_logits
=
router_logits
)
*
self
.
routed_scaling_factor
else
:
# Fix FP16 overflow
# See DeepseekV2DecoderLayer for more details.
final_hidden_states
=
self
.
experts
(
hidden_states
=
hidden_states
,
router_logits
=
router_logits
)
else
:
final_hidden_states
=
self
.
experts
(
hidden_states
=
hidden_states
,
# Fix FP16 overflow
# See DeepseekV2DecoderLayer for more details.
final_hidden_states
=
self
.
experts
(
hidden_states
=
hidden_states
,
router_logits
=
router_logits
)
if
not
self
.
use_all2all_ep
:
if
shared_output
is
not
None
:
if
hidden_states
.
dtype
!=
torch
.
float16
or
self
.
dpsk_fp16_quick
:
final_hidden_states
=
final_hidden_states
+
shared_output
else
:
# Fix FP16 overflow
# See DeepseekV2DecoderLayer for more details.
final_hidden_states
=
final_hidden_states
+
shared_output
\
*
(
1.
/
self
.
routed_scaling_factor
)
if
self
.
tp_size
>
1
:
if
envs
.
VLLM_ENABLE_TBO
:
final_hidden_states
=
self
.
tbo_all_reduce
(
final_hidden_states
)
else
:
final_hidden_states
=
(
self
.
experts
.
maybe_all_reduce_tensor_model_parallel
(
final_hidden_states
))
if
shared_output
is
not
None
:
if
hidden_states
.
dtype
!=
torch
.
float16
or
self
.
dpsk_fp16_quick
:
final_hidden_states
=
final_hidden_states
+
shared_output
else
:
# Fix FP16 overflow
# See DeepseekV2DecoderLayer for more details.
final_hidden_states
=
final_hidden_states
+
shared_output
\
*
(
1.
/
self
.
routed_scaling_factor
)
if
envs
.
USE_FUSED_RMS_QUANT
:
return
final_hidden_states
.
view
(
num_tokens
,
hidden_dim
),
new_resi
if
self
.
tp_size
>
1
:
if
envs
.
VLLM_ENABLE_TBO
:
final_hidden_states
=
self
.
tbo_all_reduce
(
final_hidden_states
)
else
:
return
final_hidden_states
.
view
(
num_tokens
,
hidden_dim
)
final_hidden_states
=
(
self
.
experts
.
maybe_all_reduce_tensor_model_parallel
(
final_hidden_states
))
if
envs
.
USE_FUSED_RMS_QUANT
:
return
final_hidden_states
.
view
(
num_tokens
,
hidden_dim
),
new_resi
else
:
return
final_hidden_states
.
view
(
num_tokens
,
hidden_dim
)
def
yarn_get_mscale
(
scale
:
float
=
1
,
mscale
:
float
=
1
)
->
float
:
...
...
@@ -928,11 +912,7 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
self
.
use_awq_pad
=
os
.
environ
.
get
(
'AWQ_PAD'
)
==
'1'
self
.
tritonsingleton
=
W8a8GetCacheJSON
()
self
.
tritonsingleton
.
topk
=
config
.
num_experts_per_tok
self
.
tritonsingleton
.
quant_method
=
self
.
quant_method
parallel_config
=
vllm_config
.
parallel_config
dp_size
=
get_dp_group
().
world_size
self
.
use_all2all_ep
=
envs
.
VLLM_USE_ALLTOALL_EP
and
dp_size
>
1
and
parallel_config
.
enable_expert_parallel
self
.
tritonsingleton
.
quant_method
=
self
.
quant_method
def
set_eplb_state
(
self
,
...
...
@@ -1015,10 +995,6 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
(
"gate_up_proj"
,
"up_proj"
,
1
),
]
if
self
.
use_all2all_ep
:
ep_moe_shared_experts_keys
=
"mlp.shared_experts"
ep_moe_shared_experts_mapping
=
{
ep_moe_shared_experts_keys
:
"mlp.experts.shared_experts"
}
# Params for weights, fp8 weight scales, fp8 activation scales
# (param_name, weight_name, expert_id, shard_id)
expert_params_mapping
=
FusedMoE
.
make_expert_params_mapping
(
...
...
@@ -1052,9 +1028,6 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
continue
name
=
name
.
replace
(
weight_name
,
param_name
)
if
self
.
use_all2all_ep
:
name
=
name
.
replace
(
ep_moe_shared_experts_keys
,
ep_moe_shared_experts_mapping
[
ep_moe_shared_experts_keys
])
# Skip loading extra bias for GPTQ models.
if
name
.
endswith
(
".bias"
)
and
name
not
in
params_dict
:
continue
...
...
@@ -1081,9 +1054,6 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
# Instead, create a new variable
name_mapped
=
name
.
replace
(
weight_name
,
param_name
)
if
self
.
use_all2all_ep
:
name_mapped
=
name_mapped
.
replace
(
ep_moe_shared_experts_keys
,
ep_moe_shared_experts_mapping
[
ep_moe_shared_experts_keys
])
if
is_pp_missing_parameter
(
name_mapped
,
self
):
continue
...
...
@@ -1109,8 +1079,6 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP, MixtureOfExperts):
# So we simply skip it
continue
if
self
.
use_all2all_ep
:
name
=
name
.
replace
(
ep_moe_shared_experts_keys
,
ep_moe_shared_experts_mapping
[
ep_moe_shared_experts_keys
])
# Skip loading extra bias for GPTQ models.
if
name
.
endswith
(
".bias"
)
and
name
not
in
params_dict
:
continue
...
...
vllm/v1/worker/gpu_model_runner.py
View file @
0b467604
...
...
@@ -323,9 +323,6 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# from the KV cache of `shared_kv_cache_layers[layer_name]`.
self
.
shared_kv_cache_layers
:
dict
[
str
,
str
]
=
{}
dp_size
=
self
.
vllm_config
.
parallel_config
.
data_parallel_size
self
.
use_all2all_ep
=
envs
.
VLLM_USE_ALLTOALL_EP
and
dp_size
>
1
and
parallel_config
.
enable_expert_parallel
def
_may_reorder_batch
(
self
,
scheduler_output
:
"SchedulerOutput"
)
->
None
:
"""
Update the order of requests in the batch based on the attention
...
...
@@ -1238,7 +1235,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
# TODO(tms) : There are many cases where padding is enabled for
# prefills, causing unnecessary and excessive padding of activations.
if
dp_size
==
1
or
self
.
vllm_config
.
model_config
.
enforce_eager
or
self
.
use_all2all_ep
:
if
dp_size
==
1
or
self
.
vllm_config
.
model_config
.
enforce_eager
:
# Early exit.
return
0
,
None
...
...
@@ -2092,7 +2089,6 @@ class GPUModelRunner(LoRAModelRunnerMixin):
input_ids
=
None
inputs_embeds
=
self
.
inputs_embeds
[:
num_tokens
]
else
:
self
.
input_ids
[:
num_tokens
]
=
torch
.
randint
(
0
,
self
.
model_config
.
get_vocab_size
(),
(
num_tokens
,),
dtype
=
torch
.
int32
)
input_ids
=
self
.
input_ids
[:
num_tokens
]
inputs_embeds
=
None
if
self
.
uses_mrope
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment