Commit 9beadbfb authored by 王敏's avatar 王敏
Browse files

去掉自研专家并行代码

parent 3d68ca03
......@@ -44,7 +44,6 @@ def benchmark_config(
num_iters: int = 100,
block_quant_shape: List[int] = None,
nn_moe: Optional[bool] = False,
moe_ep_size: int = 1,
) -> float:
init_dtype = torch.float16 if use_fp8_w8a8 else dtype
x = torch.randn(num_tokens, hidden_size, dtype=dtype)
......@@ -160,9 +159,6 @@ def benchmark_config(
a2_scale=a2_scale,
block_shape=block_quant_shape,
use_nn_moe=nn_moe,
moe_ep_size=moe_ep_size,
start_expert=0,
end_expert=num_experts,
)
# JIT compilation & warmup
......@@ -410,7 +406,6 @@ class BenchmarkWorker:
use_int8_w8a16: bool,
block_quant_shape: List[int] = None,
nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
) -> tuple[dict[str, int], float]:
current_platform.seed_everything(self.seed)
dtype_str = get_config_dtype_str(dtype,
......@@ -443,8 +438,7 @@ class BenchmarkWorker:
use_int8_w8a16,
num_iters=100,
block_quant_shape=block_quant_shape,
nn_moe=nn_moe,
moe_ep_size=moe_ep_size)
nn_moe=nn_moe,)
return config, kernel_time
def tune(
......@@ -460,7 +454,6 @@ class BenchmarkWorker:
search_space: list[dict[str, int]],
block_quant_shape: list[int],
nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
) -> dict[str, int]:
best_config = None
best_time = float("inf")
......@@ -487,8 +480,7 @@ class BenchmarkWorker:
use_int8_w8a16,
num_iters=20,
block_quant_shape=block_quant_shape,
nn_moe=nn_moe,
moe_ep_size=moe_ep_size)
nn_moe=nn_moe,)
except triton.runtime.autotuner.OutOfResources:
# Some configurations may be invalid and fail to compile.
continue
......@@ -564,35 +556,28 @@ def main(args: argparse.Namespace):
block_quant_shape = None
moe_ep_size = args.moe_ep_size
tp_size = args.tp_size
if moe_ep_size > 1:
tp_size = tp_size // moe_ep_size
config = AutoConfig.from_pretrained(
args.model, trust_remote_code=args.trust_remote_code)
if config.architectures[0] == "DbrxForCausalLM":
E = config.ffn_config.moe_num_experts
E = E // moe_ep_size
topk = config.ffn_config.moe_top_k
intermediate_size = config.ffn_config.ffn_hidden_size
shard_intermediate_size = 2 * intermediate_size // tp_size
elif config.architectures[0] == "JambaForCausalLM":
E = config.num_experts
E = E // moe_ep_size
topk = config.num_experts_per_tok
intermediate_size = config.intermediate_size
shard_intermediate_size = 2 * intermediate_size // tp_size
elif (config.architectures[0] == "DeepseekV3ForCausalLM"
or config.architectures[0] == "DeepseekV2ForCausalLM"):
E = config.n_routed_experts
E = E // moe_ep_size
topk = config.num_experts_per_tok
intermediate_size = config.moe_intermediate_size
shard_intermediate_size = 2 * intermediate_size // tp_size
elif config.architectures[0] == "Qwen2MoeForCausalLM":
E = config.num_experts
E = E // moe_ep_size
topk = config.num_experts_per_tok
intermediate_size = config.moe_intermediate_size
shard_intermediate_size = 2 * intermediate_size // tp_size
......@@ -605,7 +590,6 @@ def main(args: argparse.Namespace):
else:
# Default: Mixtral.
E = config.num_local_experts
E = E // moe_ep_size
topk = config.num_experts_per_tok
intermediate_size = config.intermediate_size
shard_intermediate_size = 2 * intermediate_size // tp_size
......@@ -649,7 +633,7 @@ def main(args: argparse.Namespace):
configs = _distribute(
"tune",
[(batch_size, E, shard_intermediate_size, hidden_size, topk, dtype,
use_fp8_w8a8, use_int8_w8a16, search_space, block_quant_shape, args.nn_moe, moe_ep_size)
use_fp8_w8a8, use_int8_w8a16, search_space, block_quant_shape, args.nn_moe)
for batch_size in batch_sizes])
best_configs = {
M: sort_config(config)
......@@ -664,7 +648,7 @@ def main(args: argparse.Namespace):
outputs = _distribute(
"benchmark",
[(batch_size, E, shard_intermediate_size, hidden_size, topk, dtype,
use_fp8_w8a8, use_int8_w8a16, block_quant_shape, args.nn_moe, moe_ep_size)
use_fp8_w8a8, use_int8_w8a16, block_quant_shape, args.nn_moe)
for batch_size in batch_sizes])
for batch_size, (config, kernel_time) in zip(batch_sizes, outputs):
......@@ -691,7 +675,6 @@ if __name__ == "__main__":
parser.add_argument("--tune", action="store_true")
parser.add_argument("--nn-moe", action='store_true', default=False)
parser.add_argument("--trust-remote-code", action="store_true")
parser.add_argument("--moe-ep-size", "-ep", type=int, default=1)
parser.add_argument("--num-gpus", type=int, default=1)
args = parser.parse_args()
......
......@@ -363,104 +363,6 @@ __global__ void moe_sum_kernel(
}
}
template <typename scalar_t, typename token_cnts_t>
__global__ void ep_moe_align_block_size_kernel(scalar_t* __restrict__ topk_ids,
int32_t* sorted_token_ids,
int32_t* expert_ids,
int32_t* total_tokens_post_pad,
int32_t num_experts,
int32_t block_size, size_t numel,
int32_t start_expert, int32_t end_expert) {
const size_t tokens_per_thread = CEILDIV(numel, blockDim.x);
const size_t start_idx = threadIdx.x * tokens_per_thread;
extern __shared__ int32_t shared_mem[];
int32_t* cumsum = shared_mem; // 1d tensor with shape (num_experts + 1)
token_cnts_t* tokens_cnts =
(token_cnts_t*)(shared_mem + num_experts +
1); // 2d tensor with shape (blockDim.x + 1, num_experts)
for (int i = 0; i < num_experts; ++i) {
tokens_cnts[index(num_experts, threadIdx.x + 1, i)] = 0;
}
/**
* In the first step we compute token_cnts[thread_index + 1][expert_index],
* which counts how many tokens in the token shard of thread_index are
* assigned to expert expert_index.
*/
for (int i = start_idx; i < numel && i < start_idx + tokens_per_thread; ++i) {
int32_t expert_id = topk_ids[i];
if (expert_id >= start_expert && expert_id < end_expert) {
++tokens_cnts[index(num_experts, threadIdx.x + 1, expert_id - start_expert)];
}
}
__syncthreads();
// For each expert we accumulate the token counts from the different threads.
if (threadIdx.x < num_experts) {
tokens_cnts[index(num_experts, 0, threadIdx.x)] = 0;
for (int i = 1; i <= blockDim.x; ++i) {
tokens_cnts[index(num_experts, i, threadIdx.x)] +=
tokens_cnts[index(num_experts, i - 1, threadIdx.x)];
}
}
__syncthreads();
// We accumulate the token counts of all experts in thread 0.
if (threadIdx.x == 0) {
cumsum[0] = 0;
for (int i = 1; i <= num_experts; ++i) {
cumsum[i] = cumsum[i - 1] +
CEILDIV(tokens_cnts[index(num_experts, blockDim.x, i - 1)],
block_size) *
block_size;
}
*total_tokens_post_pad = static_cast<int32_t>(cumsum[num_experts]);
}
__syncthreads();
/**
* For each expert, each thread processes the tokens of the corresponding
* blocks and stores the corresponding expert_id for each block.
*/
if (threadIdx.x < num_experts) {
for (int i = cumsum[threadIdx.x]; i < cumsum[threadIdx.x + 1];
i += block_size) {
expert_ids[i / block_size] = threadIdx.x;
}
}
/**
* Each thread processes a token shard, calculating the index of each token
* after sorting by expert number. Given the example topk_ids =
* [0,1,2,1,2,3,0,3,4] and block_size = 4, then the output would be [0, 6, *,
* *, 1, 3, *, *, 2, 4, *, *, 5, 7, *, *, 8, *, *, *], where * represents a
* padding value(preset in python).
*/
for (int i = start_idx; i < numel && i < start_idx + tokens_per_thread; ++i) {
int32_t expert_id = topk_ids[i];
if (expert_id >= start_expert && expert_id < end_expert) {
expert_id -= start_expert;
/** The cumsum[expert_id] stores the starting index of the tokens that the
* expert with expert_id needs to process, and
* tokens_cnts[threadIdx.x][expert_id] stores the indices of the tokens
* processed by the expert with expert_id within the current thread's token
* shard.
*/
int32_t rank_post_pad =
tokens_cnts[index(num_experts, threadIdx.x, expert_id)] +
cumsum[expert_id];
sorted_token_ids[rank_post_pad] = i;
++tokens_cnts[index(num_experts, threadIdx.x, expert_id)];
}
}
}
} // namespace moe
} // namespace vllm
......@@ -553,91 +455,6 @@ void moe_align_block_size(torch::Tensor topk_ids, int64_t num_experts,
}
}
void ep_moe_align_block_size(torch::Tensor topk_ids, int64_t num_experts,
int64_t block_size, torch::Tensor sorted_token_ids,
torch::Tensor experts_ids,
torch::Tensor num_tokens_post_pad,
int64_t start_expert, int64_t end_expert) {
const cudaStream_t stream = at::cuda::getCurrentCUDAStream();
int device_max_shared_mem;
auto dev = topk_ids.get_device();
cudaDeviceGetAttribute(&device_max_shared_mem,
cudaDevAttrMaxSharedMemoryPerBlockOptin, dev);
const int32_t num_thread = max((int32_t)num_experts, WARP_SIZE);
const int32_t shared_mem_i32 =
((num_thread + 1) * num_experts + (num_experts + 1)) * sizeof(int32_t);
const int32_t shared_mem_i16 =
((num_thread + 1) * num_experts) * sizeof(uint16_t) +
(num_experts + 1) * sizeof(int32_t);
bool use_sgl_kernel = false;
bool use_i16 = false; // Use uint16_t for shared memory token counts
if (shared_mem_i32 < device_max_shared_mem) {
// Do nothing in this case. We're all set to use int32_t token counts
} else if (shared_mem_i16 < device_max_shared_mem &&
topk_ids.numel() <= 65535) {
// when nelements of topk_ids is smaller than 65535 (max value of uint16),
// element value of token_cnts would also smaller than 65535,
// so we can use uint16 as dtype of token_cnts
use_i16 = true;
} else {
use_sgl_kernel = true;
}
if (use_sgl_kernel) {
VLLM_DISPATCH_INTEGRAL_TYPES(
topk_ids.scalar_type(), "sgl_ep_moe_align_block_size_kernel", [&] {
// calc needed amount of shared mem for `tokens_cnts` and `cumsum`
// tensors
auto options_int =
torch::TensorOptions().dtype(torch::kInt).device(topk_ids.device());
// torch::Tensor token_cnts_buffer =
// torch::empty({(num_experts + 1) * num_experts}, options_int);
torch::Tensor cumsum_buffer =
torch::empty({num_experts + 1}, options_int);
auto kernel = vllm::moe::sgl_ep_moe_align_block_size_kernel<scalar_t>;
kernel<<<1, 1024, 0, stream>>>(
topk_ids.data_ptr<scalar_t>(), sorted_token_ids.data_ptr<int32_t>(),
experts_ids.data_ptr<int32_t>(),
num_tokens_post_pad.data_ptr<int32_t>(), num_experts, block_size,
topk_ids.numel(), cumsum_buffer.data_ptr<int32_t>(), start_expert, end_expert);
});
} else if (use_i16) {
VLLM_DISPATCH_INTEGRAL_TYPES(
topk_ids.scalar_type(), "ep_moe_align_block_size_kernel", [&] {
auto kernel =
vllm::moe::ep_moe_align_block_size_kernel<scalar_t, uint16_t>;
AT_CUDA_CHECK(VLLM_DevFuncAttribute_SET_MaxDynamicSharedMemorySize(
(void*)kernel, shared_mem_i16));
kernel<<<1, num_thread, shared_mem_i16, stream>>>(
topk_ids.data_ptr<scalar_t>(),
sorted_token_ids.data_ptr<int32_t>(),
experts_ids.data_ptr<int32_t>(),
num_tokens_post_pad.data_ptr<int32_t>(), num_experts, block_size,
topk_ids.numel(), start_expert, end_expert);
});
} else {
VLLM_DISPATCH_INTEGRAL_TYPES(
topk_ids.scalar_type(), "ep_moe_align_block_size_kernel", [&] {
auto kernel =
vllm::moe::ep_moe_align_block_size_kernel<scalar_t, int32_t>;
AT_CUDA_CHECK(VLLM_DevFuncAttribute_SET_MaxDynamicSharedMemorySize(
(void*)kernel, shared_mem_i32));
kernel<<<1, num_thread, shared_mem_i32, stream>>>(
topk_ids.data_ptr<scalar_t>(),
sorted_token_ids.data_ptr<int32_t>(),
experts_ids.data_ptr<int32_t>(),
num_tokens_post_pad.data_ptr<int32_t>(), num_experts, block_size,
topk_ids.numel(), start_expert, end_expert);
});
}
}
void sgl_moe_align_block_size(torch::Tensor topk_ids, int64_t num_experts,
int64_t block_size,
torch::Tensor sorted_token_ids,
......
......@@ -13,12 +13,6 @@ void moe_align_block_size(torch::Tensor topk_ids, int64_t num_experts,
torch::Tensor experts_ids,
torch::Tensor num_tokens_post_pad);
void ep_moe_align_block_size(torch::Tensor topk_ids, int64_t num_experts,
int64_t block_size, torch::Tensor sorted_token_ids,
torch::Tensor experts_ids,
torch::Tensor num_tokens_post_pad,
int64_t start_expert, int64_t end_expert);
void sgl_moe_align_block_size(torch::Tensor topk_ids, int64_t num_experts,
int64_t block_size,
torch::Tensor sorted_token_ids,
......
......@@ -22,14 +22,6 @@ TORCH_LIBRARY_EXPAND(TORCH_EXTENSION_NAME, m) {
" Tensor! num_tokens_post_pad) -> ()");
m.impl("moe_align_block_size", torch::kCUDA, &moe_align_block_size);
m.def(
"ep_moe_align_block_size(Tensor topk_ids, int num_experts,"
" int block_size, Tensor! sorted_token_ids,"
" Tensor! experts_ids,"
" Tensor! num_tokens_post_pad,"
" int start_expert, int end_expert) -> ()");
m.impl("ep_moe_align_block_size", torch::kCUDA, &ep_moe_align_block_size);
// temporarily adapted from
// https://github.com/sgl-project/sglang/commit/ded9fcd09a43d5e7d5bb31a2bc3e9fc21bf65d2a
m.def(
......
......@@ -1557,16 +1557,6 @@ def sgl_moe_align_block_size(topk_ids: torch.Tensor, num_experts: int,
block_size, sorted_token_ids,
experts_ids, num_tokens_post_pad)
def ep_moe_align_block_size(topk_ids: torch.Tensor, num_experts: int,
block_size: int, sorted_token_ids: torch.Tensor,
experts_ids: torch.Tensor,
num_tokens_post_pad: torch.Tensor,
start_expert, end_expert) -> None:
torch.ops._moe_C.ep_moe_align_block_size(topk_ids, num_experts, block_size,
sorted_token_ids, experts_ids,
num_tokens_post_pad, start_expert,
end_expert)
def moe_wna16_gemm(input: torch.Tensor, output: torch.Tensor,
b_qweight: torch.Tensor, b_scales: torch.Tensor,
......
......@@ -1408,8 +1408,6 @@ class ParallelConfig:
rank: int = 0
moe_ep_size: Optional[int] = 1
def get_next_dp_init_port(self) -> int:
"""
We might need to initialize process groups in multiple
......
......@@ -226,7 +226,6 @@ class EngineArgs:
reasoning_parser: Optional[str] = None
use_tqdm_on_load: bool = True
moe_ep_size: int = 1
def __post_init__(self):
if not self.tokenizer:
......@@ -446,11 +445,6 @@ class EngineArgs:
type=int,
default=EngineArgs.tensor_parallel_size,
help='Number of tensor parallel replicas.')
parser.add_argument('--moe-ep-size',
'-ep',
type=int,
default=EngineArgs.moe_ep_size,
help='Number of moe expert parallel replicas.')
parser.add_argument(
'--enable-expert-parallel',
action='store_true',
......@@ -1386,7 +1380,6 @@ class EngineArgs:
distributed_executor_backend=self.distributed_executor_backend,
worker_cls=self.worker_cls,
worker_extension_cls=self.worker_extension_cls,
moe_ep_size=self.moe_ep_size,
)
speculative_config = self.create_speculative_config(
......
......@@ -923,66 +923,6 @@ def moe_align_block_size(
return sorted_ids, expert_ids, num_tokens_post_pad
def moe_ep_align_block_size(
topk_ids: torch.Tensor, block_size: int,
num_experts: int, start_expert: int,
end_expert: int) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
Aligns the token distribution across experts to be compatible with block
size for matrix multiplication.
Parameters:
- topk_ids: A tensor of shape [total_tokens, top_k] representing the
top-k expert indices for each token.
- block_size: The block size used in block matrix multiplication.
- num_experts: The total number of experts.
Returns:
- sorted_token_ids: A tensor containing the sorted token indices according
to their allocated expert.
- expert_ids: A tensor indicating the assigned expert index for each block.
- num_tokens_post_padded: The total number of tokens after padding,
ensuring divisibility by block_size.
This function pads the number of tokens that each expert needs to process
so that it is divisible by block_size.
Padding ensures that during block matrix multiplication, the dimensions
align correctly.
Example:
Given topk_ids = [[2, 3, 4], [1, 2, 4], [1, 3, 4], [1, 2, 3]],
block_size = 4, and num_experts = 4:
- We initially have 12 tokens (after repeating 'top_k' times) and 4 experts,
with each expert needing to process 3 tokens.
- As block_size is 4, we pad 1 token for each expert.
- First, flatten topk_ids to [2, 3, 4, 1, 2, 4, 1, 3, 4, 1, 2, 3].
- Then append padding tokens [12, 12, 12, 12] for each block.
- After sorting by expert index, we obtain token_ids
[3, 6, 9, 12, 0, 4, 10, 12, 1, 7, 11, 12, 2, 5, 8, 12].
Tokens 12 are non-existent (padding) and are ignored in
the subsequent matrix multiplication.
- The padding ensures that the total number of tokens is now divisible
by block_size for proper block matrix operations.
"""
max_num_tokens_padded = topk_ids.numel() + num_experts * (block_size - 1)
sorted_ids = torch.empty((max_num_tokens_padded, ),
dtype=torch.int32,
device=topk_ids.device)
sorted_ids.fill_(topk_ids.numel())
max_num_m_blocks = triton.cdiv(max_num_tokens_padded, block_size)
expert_ids = torch.empty((max_num_m_blocks, ),
dtype=torch.int32,
device=topk_ids.device)
num_tokens_post_pad = torch.empty((1),
dtype=torch.int32,
device=topk_ids.device)
ops.ep_moe_align_block_size(topk_ids, num_experts, block_size, sorted_ids,
expert_ids, num_tokens_post_pad, start_expert,
end_expert)
return sorted_ids, expert_ids, num_tokens_post_pad
def invoke_fused_moe_kernel(A: torch.Tensor,
B: torch.Tensor,
C: torch.Tensor,
......@@ -1565,16 +1505,12 @@ def inplace_fused_experts(hidden_states: torch.Tensor,
a1_scale: Optional[torch.Tensor] = None,
a2_scale: Optional[torch.Tensor] = None,
block_shape: Optional[List[int]] = None,
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = -1,
end_expert: Optional[int] = -1) -> None:
use_nn_moe: Optional[bool] = False,) -> None:
fused_experts_impl(hidden_states, w1, w2, topk_weights, topk_ids, True,
activation, use_fp8_w8a8, use_int8_w8a8, use_int8_w8a16,
use_int4_w4a16, global_num_experts, expert_map,
w1_scale, w2_scale, w1_zp, w2_zp, a1_scale, a2_scale,
block_shape, use_nn_moe, moe_ep_size=moe_ep_size,
start_expert=start_expert, end_expert=end_expert)
block_shape, use_nn_moe,)
def inplace_fused_experts_fake(
......@@ -1597,10 +1533,7 @@ def inplace_fused_experts_fake(
a1_scale: Optional[torch.Tensor] = None,
a2_scale: Optional[torch.Tensor] = None,
block_shape: Optional[List[int]] = None,
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = -1,
end_expert: Optional[int] = -1) -> None:
use_nn_moe: Optional[bool] = False,) -> None:
pass
......@@ -1632,17 +1565,13 @@ def outplace_fused_experts(
a1_scale: Optional[torch.Tensor] = None,
a2_scale: Optional[torch.Tensor] = None,
block_shape: Optional[List[int]] = None,
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = -1,
end_expert: Optional[int] = -1) -> torch.Tensor:
use_nn_moe: Optional[bool] = False,) -> torch.Tensor:
return fused_experts_impl(hidden_states, w1, w2, topk_weights, topk_ids,
False, activation, use_fp8_w8a8, use_int8_w8a8, use_int8_w8a16,
use_int4_w4a16, global_num_experts, expert_map,
w1_scale, w2_scale, w1_zp, w2_zp, a1_scale,
a2_scale, block_shape,
use_nn_moe, moe_ep_size=moe_ep_size,
start_expert=start_expert, end_expert=end_expert)
use_nn_moe,)
def outplace_fused_experts_fake(
......@@ -1665,10 +1594,7 @@ def outplace_fused_experts_fake(
a1_scale: Optional[torch.Tensor] = None,
a2_scale: Optional[torch.Tensor] = None,
block_shape: Optional[List[int]] = None,
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = -1,
end_expert: Optional[int] = -1) -> torch.Tensor:
use_nn_moe: Optional[bool] = False,) -> torch.Tensor:
return torch.empty_like(hidden_states)
......@@ -1700,10 +1626,7 @@ def fused_experts(hidden_states: torch.Tensor,
a1_scale: Optional[torch.Tensor] = None,
a2_scale: Optional[torch.Tensor] = None,
block_shape: Optional[List[int]] = None,
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = -1,
end_expert: Optional[int] = -1) -> torch.Tensor:
use_nn_moe: Optional[bool] = False,) -> torch.Tensor:
if inplace:
torch.ops.vllm.inplace_fused_experts(
......@@ -1711,10 +1634,7 @@ def fused_experts(hidden_states: torch.Tensor,
use_fp8_w8a8, use_int8_w8a8, use_int8_w8a16, use_int4_w4a16, global_num_experts,
expert_map, w1_scale, w2_scale, w1_zp, w2_zp, a1_scale, a2_scale,
block_shape,
use_nn_moe,
moe_ep_size=moe_ep_size,
start_expert=start_expert,
end_expert=end_expert)
use_nn_moe,)
return hidden_states
else:
return torch.ops.vllm.outplace_fused_experts(
......@@ -1722,10 +1642,7 @@ def fused_experts(hidden_states: torch.Tensor,
use_fp8_w8a8, use_int8_w8a8, use_int8_w8a16, use_int4_w4a16, global_num_experts,
expert_map, w1_scale, w2_scale, w1_zp, w2_zp, a1_scale, a2_scale,
block_shape,
use_nn_moe,
moe_ep_size=moe_ep_size,
start_expert=start_expert,
end_expert=end_expert)
use_nn_moe,)
def fused_experts_impl(hidden_states: torch.Tensor,
......@@ -1748,10 +1665,7 @@ def fused_experts_impl(hidden_states: torch.Tensor,
a1_scale: Optional[torch.Tensor] = None,
a2_scale: Optional[torch.Tensor] = None,
block_shape: Optional[List[int]] = None,
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = -1,
end_expert: Optional[int] = -1):
use_nn_moe: Optional[bool] = False,):
# Check constraints.
if use_int4_w4a16:
assert hidden_states.shape[1] // 2 == w1.shape[
......@@ -1817,9 +1731,6 @@ def fused_experts_impl(hidden_states: torch.Tensor,
device=hidden_states.device,
dtype=hidden_states.dtype)
if moe_ep_size > 1:
intermediate_cache3.zero_()
if hidden_states.dtype == torch.bfloat16:
compute_type = tl.bfloat16
elif hidden_states.dtype == torch.float16:
......@@ -1885,17 +1796,12 @@ def fused_experts_impl(hidden_states: torch.Tensor,
"num_warps": 4
}
if moe_ep_size == 1:
if use_int4_w4a16:
sorted_token_ids, expert_ids, num_tokens_post_padded = (
moe_align_block_size(curr_topk_ids, config['BLOCK_SIZE_M'], global_num_experts, expert_map, curr_hidden_states.shape[0]))
else:
sorted_token_ids, expert_ids, num_tokens_post_padded = (
moe_align_block_size(curr_topk_ids, config['BLOCK_SIZE_M'], global_num_experts, expert_map))
else:
sorted_token_ids, expert_ids, num_tokens_post_padded = (
moe_ep_align_block_size(curr_topk_ids, config['BLOCK_SIZE_M'], global_num_experts, expert_map,
start_expert, end_expert))
invoke_fused_moe_kernel(curr_hidden_states,
w1,
......@@ -2009,9 +1915,6 @@ def fused_moe(
a2_scale: Optional[torch.Tensor] = None,
block_shape: Optional[List[int]] = None,
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = None,
end_expert: Optional[int] = None,
) -> torch.Tensor:
"""
This function computes a Mixture of Experts (MoE) layer using two sets of
......@@ -2093,7 +1996,4 @@ def fused_moe(
a1_scale=a1_scale,
a2_scale=a2_scale,
block_shape=block_shape,
use_nn_moe=use_nn_moe,
moe_ep_size=moe_ep_size,
start_expert=start_expert,
end_expert=end_expert)
\ No newline at end of file
use_nn_moe=use_nn_moe,)
\ No newline at end of file
......@@ -164,9 +164,6 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp):
e_score_correction_bias: Optional[torch.Tensor] = None,
activation: str = "silu",
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = -1,
end_expert: Optional[int] = -1,
) -> torch.Tensor:
return self.forward(x=x,
layer=layer,
......@@ -182,10 +179,7 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp):
scoring_func=scoring_func,
e_score_correction_bias=e_score_correction_bias,
activation=activation,
use_nn_moe=use_nn_moe,
moe_ep_size=moe_ep_size,
start_expert=start_expert,
end_expert=end_expert)
use_nn_moe=use_nn_moe,)
def forward_cuda(
self,
......@@ -204,9 +198,6 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp):
e_score_correction_bias: Optional[torch.Tensor] = None,
activation: str = "silu",
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = -1,
end_expert: Optional[int] = -1,
) -> torch.Tensor:
topk_weights, topk_ids = FusedMoE.select_experts(
hidden_states=x,
......@@ -229,10 +220,7 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp):
activation=activation,
global_num_experts=global_num_experts,
expert_map=expert_map,
use_nn_moe=use_nn_moe,
moe_ep_size=moe_ep_size,
start_expert=start_expert,
end_expert=end_expert)
use_nn_moe=use_nn_moe,)
def forward_cpu(
self,
......@@ -281,9 +269,6 @@ class UnquantizedFusedMoEMethod(FusedMoEMethodBase, CustomOp):
scoring_func: str = "softmax",
e_score_correction_bias: Optional[torch.Tensor] = None,
use_nn_moe: Optional[bool] = False,
moe_ep_size: Optional[int] = 1,
start_expert: Optional[int] = -1,
end_expert: Optional[int] = -1
) -> torch.Tensor:
assert not use_grouped_topk
assert num_expert_group is None
......@@ -426,7 +411,6 @@ class FusedMoE(torch.nn.Module):
scoring_func: str = "softmax",
e_score_correction_bias: Optional[torch.Tensor] = None,
activation: str = "silu",
moe_ep_size: Optional[int] = 1,
):
super().__init__()
......@@ -495,17 +479,6 @@ class FusedMoE(torch.nn.Module):
self.e_score_correction_bias = e_score_correction_bias
self.activation = activation
self.tp_rank = get_tensor_model_parallel_rank()
self.moe_ep_size = moe_ep_size
self.moe_tp_rank = self.tp_rank // self.moe_ep_size
self.moe_tp_size = self.tp_size // self.moe_ep_size
if self.moe_ep_size > 1:
self.intermediate_size_per_partition = intermediate_size // self.moe_tp_size
self.moe_ep_rank = self.tp_rank % self.moe_ep_size
num_experts_per_node = num_experts // self.moe_ep_size
self.start_expert = num_experts_per_node * self.moe_ep_rank
self.end_expert = self.start_expert + num_experts_per_node
if self.scoring_func != "softmax" and not self.use_grouped_topk:
raise ValueError("Only softmax scoring function is supported for "
"non-grouped topk.")
......@@ -529,7 +502,7 @@ class FusedMoE(torch.nn.Module):
self.use_nn_moe = False
moe_quant_params = {
"num_experts": self.local_num_experts if self.moe_ep_size == 1 else num_experts_per_node,
"num_experts": self.local_num_experts,
"hidden_size": hidden_size,
"intermediate_size_per_partition":
self.intermediate_size_per_partition,
......@@ -707,10 +680,7 @@ class FusedMoE(torch.nn.Module):
# dimension intermediate_size_per_partition is used.
SHARD_ID_TO_SHARDED_DIM = {"w1": 0, "w2": 1, "w3": 0}
expert_id = expert_id - self.start_expert
expert_data = param.data[expert_id]
tp_rank = get_tensor_model_parallel_rank()
tp_rank = tp_rank // self.moe_ep_size
is_gguf_weight = getattr(param, "is_gguf_weight", False)
is_gguf_weight_type = getattr(param, "is_gguf_weight_type", False)
......@@ -917,9 +887,6 @@ class FusedMoE(torch.nn.Module):
e_score_correction_bias=self.e_score_correction_bias,
activation=self.activation,
use_nn_moe=self.use_nn_moe,
moe_ep_size=self.moe_ep_size,
start_expert=self.start_expert,
end_expert=self.end_expert,
)
if self.dp_size > 1:
......@@ -955,32 +922,6 @@ class FusedMoE(torch.nn.Module):
]
]
@classmethod
def make_expert_params_mapping_ep(
cls, ckpt_gate_proj_name: str, ckpt_down_proj_name: str,
ckpt_up_proj_name: str,
num_experts: int,
moe_ep_size) -> List[Tuple[str, str, int, str]]:
# tp_size = get_tensor_model_parallel_world_size()
tp_rank = get_tensor_model_parallel_rank()
# moe_tp_rank = tp_rank // moe_ep_size
moe_ep_rank = tp_rank % moe_ep_size
experts_per_rank = num_experts // moe_ep_size
experts_range = range(moe_ep_rank * experts_per_rank,
(moe_ep_rank + 1) * experts_per_rank)
return [
# (param_name, weight_name, expert_id, shard_id)
("experts.w13_" if weight_name
in [ckpt_gate_proj_name, ckpt_up_proj_name] else "experts.w2_",
f"experts.{expert_id}.{weight_name}.", expert_id, shard_id)
for expert_id in experts_range for shard_id, weight_name in [
("w1", ckpt_gate_proj_name),
("w2", ckpt_down_proj_name),
("w3", ckpt_up_proj_name),
]
]
def _load_fp8_scale(self, param: torch.nn.Parameter,
loaded_weight: torch.Tensor, weight_name: str,
......
......@@ -109,7 +109,6 @@ class DeepseekV2MoE(nn.Module):
config: PretrainedConfig,
quant_config: Optional[QuantizationConfig] = None,
prefix: str = "",
moe_ep_size: int = 1
):
super().__init__()
self.tp_size = get_tensor_model_parallel_world_size()
......@@ -144,8 +143,7 @@ class DeepseekV2MoE(nn.Module):
topk_group=config.topk_group,
prefix=f"{prefix}.experts",
scoring_func=config.scoring_func,
e_score_correction_bias=self.gate.e_score_correction_bias,
moe_ep_size=moe_ep_size)
e_score_correction_bias=self.gate.e_score_correction_bias,)
if config.n_shared_experts is not None:
intermediate_size = (config.moe_intermediate_size *
......@@ -500,7 +498,6 @@ class DeepseekV2DecoderLayer(nn.Module):
model_config: ModelConfig,
cache_config: Optional[CacheConfig] = None,
quant_config: Optional[QuantizationConfig] = None,
moe_ep_size : int = 1,
) -> None:
super().__init__()
self.hidden_size = config.hidden_size
......@@ -540,7 +537,6 @@ class DeepseekV2DecoderLayer(nn.Module):
config=config,
quant_config=quant_config,
prefix=f"{prefix}.mlp",
moe_ep_size=moe_ep_size
)
else:
self.mlp = DeepseekV2MLP(
......@@ -595,7 +591,7 @@ class DeepseekV2Model(nn.Module):
fall_back_to_pt_during_load = False
def __init__(self, *, vllm_config: VllmConfig, prefix: str = "", moe_ep_size: int = 1):
def __init__(self, *, vllm_config: VllmConfig, prefix: str = "",):
super().__init__()
config = vllm_config.model_config.hf_config
......@@ -623,7 +619,6 @@ class DeepseekV2Model(nn.Module):
model_config=model_config,
cache_config=cache_config,
quant_config=quant_config,
moe_ep_size=moe_ep_size
),
prefix=f"{prefix}.layers")
......@@ -687,11 +682,9 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP):
self.config = config
self.quant_config = quant_config
self.parallel_config = vllm_config.parallel_config
self.moe_ep_size = self.parallel_config.moe_ep_size
self.model = DeepseekV2Model(vllm_config=vllm_config,
prefix=maybe_prefix(prefix, "model"),
moe_ep_size=self.moe_ep_size)
prefix=maybe_prefix(prefix, "model"),)
if get_pp_group().is_last_rank:
self.lm_head = ParallelLMHead(config.vocab_size,
config.hidden_size,
......@@ -780,19 +773,11 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP):
# Params for weights, fp8 weight scales, fp8 activation scales
# (param_name, weight_name, expert_id, shard_id)
if self.moe_ep_size == 1:
expert_params_mapping = FusedMoE.make_expert_params_mapping(
ckpt_gate_proj_name="gate_proj",
ckpt_down_proj_name="down_proj",
ckpt_up_proj_name="up_proj",
num_experts=self.config.n_routed_experts)
else:
expert_params_mapping = FusedMoE.make_expert_params_mapping_ep(
ckpt_gate_proj_name="gate_proj",
ckpt_down_proj_name="down_proj",
ckpt_up_proj_name="up_proj",
num_experts=self.config.n_routed_experts,
moe_ep_size=self.moe_ep_size)
params_dict = dict(self.named_parameters())
loaded_params: Set[str] = set()
......@@ -857,10 +842,6 @@ class DeepseekV2ForCausalLM(nn.Module, SupportsPP):
continue
if is_pp_missing_parameter(name, self):
continue
# Skip loading extra expert weights for ep moe mode
if name not in params_dict:
continue
param = params_dict[name]
......
......@@ -99,7 +99,6 @@ class DeepseekV3MoE(nn.Module):
config: PretrainedConfig,
quant_config: Optional[QuantizationConfig] = None,
prefix: str = "",
moe_ep_size: int = 1
):
super().__init__()
self.tp_size = get_tensor_model_parallel_world_size()
......@@ -139,8 +138,7 @@ class DeepseekV3MoE(nn.Module):
topk_group=config.topk_group,
prefix=f"{prefix}.experts",
scoring_func=config.scoring_func,
e_score_correction_bias=self.gate.e_score_correction_bias,
moe_ep_size=moe_ep_size)
e_score_correction_bias=self.gate.e_score_correction_bias,)
if config.n_shared_experts is not None:
intermediate_size = (config.moe_intermediate_size *
......@@ -490,7 +488,6 @@ class DeepseekV3DecoderLayer(nn.Module):
model_config: ModelConfig,
cache_config: Optional[CacheConfig] = None,
quant_config: Optional[QuantizationConfig] = None,
moe_ep_size : int = 1,
) -> None:
super().__init__()
self.hidden_size = config.hidden_size
......@@ -529,7 +526,6 @@ class DeepseekV3DecoderLayer(nn.Module):
config=config,
quant_config=quant_config,
prefix=f"{prefix}.mlp",
moe_ep_size=moe_ep_size
)
else:
self.mlp = DeepseekV3MLP(
......@@ -579,7 +575,7 @@ class DeepseekV3Model(nn.Module):
fall_back_to_pt_during_load = False
def __init__(self, *, vllm_config: VllmConfig, prefix: str = "", moe_ep_size: int = 1):
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
super().__init__()
config = vllm_config.model_config.hf_config
......@@ -606,7 +602,6 @@ class DeepseekV3Model(nn.Module):
model_config=model_config,
cache_config=cache_config,
quant_config=quant_config,
moe_ep_size=moe_ep_size
),
prefix=f"{prefix}.layers")
......@@ -666,11 +661,9 @@ class DeepseekV3ForCausalLM(nn.Module, SupportsPP):
self.config = config
self.quant_config = quant_config
self.parallel_config = vllm_config.parallel_config
self.moe_ep_size = self.parallel_config.moe_ep_size
self.model = DeepseekV3Model(vllm_config=vllm_config,
prefix=maybe_prefix(prefix, "model"),
moe_ep_size=self.moe_ep_size)
prefix=maybe_prefix(prefix, "model"),)
self.lm_head = ParallelLMHead(config.vocab_size,
config.hidden_size,
quant_config=quant_config)
......@@ -743,19 +736,11 @@ class DeepseekV3ForCausalLM(nn.Module, SupportsPP):
# Params for weights, fp8 weight scales, fp8 activation scales
# (param_name, weight_name, expert_id, shard_id)
if self.moe_ep_size == 1:
expert_params_mapping = FusedMoE.make_expert_params_mapping(
ckpt_gate_proj_name="gate_proj",
ckpt_down_proj_name="down_proj",
ckpt_up_proj_name="up_proj",
num_experts=self.config.n_routed_experts)
else:
expert_params_mapping = FusedMoE.make_expert_params_mapping_ep(
ckpt_gate_proj_name="gate_proj",
ckpt_down_proj_name="down_proj",
ckpt_up_proj_name="up_proj",
num_experts=self.config.n_routed_experts,
moe_ep_size=self.moe_ep_size)
params_dict = dict(self.named_parameters())
loaded_params: Set[str] = set()
......@@ -821,10 +806,6 @@ class DeepseekV3ForCausalLM(nn.Module, SupportsPP):
if is_pp_missing_parameter(name, self):
continue
# Skip loading extra expert weights for ep moe mode
if name not in params_dict:
continue
param = params_dict[name]
weight_loader = getattr(param, "weight_loader",
default_weight_loader)
......
......@@ -76,8 +76,7 @@ class MixtralMoE(nn.Module):
quant_config: Optional[QuantizationConfig] = None,
tp_size: Optional[int] = None,
dp_size: Optional[int] = None,
prefix: str = "",
moe_ep_size: int = 1):
prefix: str = "",):
super().__init__()
self.hidden_size = hidden_size
......@@ -100,8 +99,7 @@ class MixtralMoE(nn.Module):
quant_config=quant_config,
tp_size=tp_size,
dp_size=dp_size,
prefix=f"{prefix}.experts",
moe_ep_size=moe_ep_size)
prefix=f"{prefix}.experts",)
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
# NOTE: hidden_states can have either 1D or 2D shape.
......@@ -203,7 +201,6 @@ class MixtralDecoderLayer(nn.Module):
cache_config: Optional[CacheConfig] = None,
quant_config: Optional[QuantizationConfig] = None,
prefix: str = "",
moe_ep_size : int = 1,
) -> None:
super().__init__()
self.hidden_size = config.hidden_size
......@@ -225,8 +222,7 @@ class MixtralDecoderLayer(nn.Module):
hidden_size=config.hidden_size,
intermediate_size=config.intermediate_size,
quant_config=quant_config,
prefix=f"{prefix}.block_sparse_moe",
moe_ep_size=moe_ep_size)
prefix=f"{prefix}.block_sparse_moe",)
self.input_layernorm = RMSNorm(config.hidden_size,
eps=config.rms_norm_eps)
self.post_attention_layernorm = RMSNorm(config.hidden_size,
......@@ -260,7 +256,7 @@ class MixtralDecoderLayer(nn.Module):
@support_torch_compile
class MixtralModel(nn.Module):
def __init__(self, *, vllm_config: VllmConfig, prefix: str = "", moe_ep_size: int = 1):
def __init__(self, *, vllm_config: VllmConfig, prefix: str = "",):
super().__init__()
config = vllm_config.model_config.hf_config
......@@ -282,7 +278,7 @@ class MixtralModel(nn.Module):
self.start_layer, self.end_layer, self.layers = make_layers(
config.num_hidden_layers,
lambda prefix: MixtralDecoderLayer(
config, cache_config, quant_config=quant_config, prefix=prefix, moe_ep_size=moe_ep_size
config, cache_config, quant_config=quant_config, prefix=prefix,
),
prefix=f"{prefix}.layers")
......@@ -350,11 +346,9 @@ class MixtralForCausalLM(nn.Module, SupportsLoRA, SupportsPP):
self.quant_config = quant_config
self.parallel_config = vllm_config.parallel_config
self.moe_ep_size = self.parallel_config.moe_ep_size
self.model = MixtralModel(vllm_config=vllm_config,
prefix=maybe_prefix(prefix, "model"),
moe_ep_size=self.moe_ep_size)
prefix=maybe_prefix(prefix, "model"),)
self.unpadded_vocab_size = config.vocab_size
if lora_config:
self.unpadded_vocab_size += lora_config.lora_extra_vocab_size
......@@ -425,19 +419,11 @@ class MixtralForCausalLM(nn.Module, SupportsLoRA, SupportsPP):
# Params for weights, fp8 weight scales, fp8 activation scales
# (param_name, weight_name, expert_id, shard_id)
if self.moe_ep_size == 1:
expert_params_mapping = FusedMoE.make_expert_params_mapping(
ckpt_gate_proj_name="w1",
ckpt_down_proj_name="w2",
ckpt_up_proj_name="w3",
num_experts=self.config.num_local_experts)
else:
expert_params_mapping = FusedMoE.make_expert_params_mapping_ep(
ckpt_gate_proj_name="w1",
ckpt_down_proj_name="w2",
ckpt_up_proj_name="w3",
num_experts=self.config.num_local_experts,
moe_ep_size=self.moe_ep_size)
params_dict = dict(self.named_parameters())
loaded_params: Set[str] = set()
......@@ -511,10 +497,6 @@ class MixtralForCausalLM(nn.Module, SupportsLoRA, SupportsPP):
if name is None:
continue
# Skip loading extra expert weights for ep moe mode
if name not in params_dict:
continue
param = params_dict[name]
weight_loader = getattr(param, "weight_loader",
default_weight_loader)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment