Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
680ee839
Commit
680ee839
authored
Jan 16, 2026
by
zhuwenwen
Browse files
Merge remote-tracking branch 'origin/v0.9.2-dev_mtp_sampler' into v0.9.2-dev
parents
2c560dc5
a2f0ce42
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
410 additions
and
0 deletions
+410
-0
vllm/envs.py
vllm/envs.py
+27
-0
vllm/model_executor/layers/fused_moe/router_capture.py
vllm/model_executor/layers/fused_moe/router_capture.py
+360
-0
vllm/model_executor/models/qwen3_moe.py
vllm/model_executor/models/qwen3_moe.py
+23
-0
No files found.
vllm/envs.py
View file @
680ee839
...
...
@@ -208,6 +208,12 @@ if TYPE_CHECKING:
VLLM_V1_FAST_TOKEN_ID_COPY
:
bool
=
False
VLLM_DISABLE_SHARED_EXPERTS_STREAM
:
bool
=
True
VLLM_W8A8_BACKEND
:
int
=
3
VLLM_MOE_ROUTER_CAPTURE
:
bool
=
False
VLLM_MOE_ROUTER_CAPTURE_DIR
:
str
=
"/tmp"
VLLM_MOE_ROUTER_CAPTURE_RANK
:
int
=
-
1
VLLM_MOE_ROUTER_CAPTURE_MAX_LAYERS
:
int
=
0
VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_GT
:
int
=
-
1
VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_LT
:
int
=
-
1
def
get_default_cache_root
():
return
os
.
getenv
(
...
...
@@ -1343,6 +1349,27 @@ environment_variables: dict[str, Callable[[], Any]] = {
# rocblas: others
"VLLM_W8A8_BACKEND"
:
lambda
:
int
(
os
.
getenv
(
"VLLM_W8A8_BACKEND"
,
"3"
)),
# Capture MoE router logits for debugging/analysis.
"VLLM_MOE_ROUTER_CAPTURE"
:
lambda
:
(
os
.
getenv
(
"VLLM_MOE_ROUTER_CAPTURE"
,
"0"
).
lower
()
in
(
"true"
,
"1"
)),
# Output directory for MoE router capture dumps.
"VLLM_MOE_ROUTER_CAPTURE_DIR"
:
lambda
:
os
.
environ
.
get
(
"VLLM_MOE_ROUTER_CAPTURE_DIR"
,
"/tmp"
,
),
# Capture only the specified rank; set to -1 to capture all ranks.
"VLLM_MOE_ROUTER_CAPTURE_RANK"
:
lambda
:
int
(
os
.
environ
.
get
(
"VLLM_MOE_ROUTER_CAPTURE_RANK"
,
"-1"
)),
# Max number of MoE layers to record per process (0 = unlimited).
"VLLM_MOE_ROUTER_CAPTURE_MAX_LAYERS"
:
lambda
:
int
(
os
.
environ
.
get
(
"VLLM_MOE_ROUTER_CAPTURE_MAX_LAYERS"
,
"0"
)),
# Only capture when num_tokens > N (negative disables).
"VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_GT"
:
lambda
:
int
(
os
.
environ
.
get
(
"VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_GT"
,
"-1"
)),
# Only capture when num_tokens < N (values <= 0 disable this bound).
"VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_LT"
:
lambda
:
int
(
os
.
environ
.
get
(
"VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_LT"
,
"-1"
)),
}
# --8<-- [end:env-vars-definition]
...
...
vllm/model_executor/layers/fused_moe/router_capture.py
0 → 100644
View file @
680ee839
"""
Utilities for capturing MoE router distributions from real workloads.
This is intentionally lightweight and gated behind env vars so it has zero
runtime impact unless explicitly enabled.
Env vars (defaults from vllm.envs):
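- VLLM_MOE_ROUTER_CAPTURE=0/1: enable capture (default: 0). Capture stays off
unless at least one of the NUM_TOKENS filters below is set, and also when both
are set but NUM_TOKENS_GT >= NUM_TOKENS_LT.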
- VLLM_MOE_ROUTER_CAPTURE_DIR=/path: output directory for per-process dumps
(default: /tmp).
- VLLM_MOE_ROUTER_CAPTURE_RANK=N: only capture on the given torch.distributed
rank (default: -1; set to -1 to capture all ranks).
- VLLM_MOE_ROUTER_CAPTURE_MAX_LAYERS=N: max number of layers to record per
process (default: 0; 0 = unlimited).
- VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_GT=A: only record calls where router_logits
has num_tokens > A (default: -1; <0 = disabled).
- VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_LT=B: only record calls where router_logits
has num_tokens < B (default: -1; <=0 = disabled).
Output format:
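- A single `.pt` per completed num_tokens bucket; a bucket completes once it
holds VLLM_MOE_ROUTER_CAPTURE_MAX_LAYERS layers. Buckets still pending at
flush/exit are written together into one file. File names carry the rank when
torch.distributed is initialized.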
- Payload includes `layers_by_num_tokens: dict[str, dict[layer_name, layer_state]]`.
- A convenience `layers` field is also included (same as
`layers_by_num_tokens[str(num_tokens)]`) for easy loading.
- For each captured MoE layer, stores a list of 2D tensors
`router_logits_chunks: list[Tensor[num_tokens_i, num_experts]]` on CPU,
typically in fp16 for space efficiency.
"""
from
__future__
import
annotations
import
atexit
import
inspect
import
os
import
socket
import
threading
import
time
from
dataclasses
import
dataclass
from
typing
import
Optional
import
torch
import
vllm.envs
as
envs
# Function names whose presence on the call stack marks a vLLM startup
# profiling / warmup run (see `_in_profile_run`).
_DEFAULT_SKIP_STACK_FUNCS = (
    "profile_run",
    "_dummy_run",
    "determine_available_memory",
)


@dataclass(frozen=True)
class RouterCaptureConfig:
    """Immutable settings controlling MoE router-logit capture.

    `only_rank=None` means "capture on every rank"; `max_layers=0` means
    "no layer limit"; a `None` token bound means that bound is not applied.
    """

    enabled: bool = False
    out_dir: str = "/tmp"
    skip_profile: bool = True
    skip_stack_funcs: tuple[str, ...] = _DEFAULT_SKIP_STACK_FUNCS
    only_rank: Optional[int] = 0
    max_layers: int = 0
    num_tokens_gt: Optional[int] = None
    num_tokens_lt: Optional[int] = None

    @staticmethod
    def from_env() -> "RouterCaptureConfig":
        """Build a config from the `VLLM_MOE_ROUTER_CAPTURE*` values in `vllm.envs`.

        Capture is forced off when no token-count bound is configured, or when
        both bounds are configured but leave no admissible token count.
        """
        capture_on = envs.VLLM_MOE_ROUTER_CAPTURE
        dump_dir = envs.VLLM_MOE_ROUTER_CAPTURE_DIR
        rank_env = envs.VLLM_MOE_ROUTER_CAPTURE_RANK
        layer_cap = envs.VLLM_MOE_ROUTER_CAPTURE_MAX_LAYERS
        gt_env = envs.VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_GT
        lt_env = envs.VLLM_MOE_ROUTER_CAPTURE_NUM_TOKENS_LT

        # A negative rank selects all ranks.
        rank_filter: Optional[int] = rank_env if rank_env >= 0 else None
        # The lower bound is active for values >= 0, the upper bound only for
        # values > 0.
        lower = gt_env if gt_env >= 0 else None
        upper = lt_env if lt_env > 0 else None

        # Per-size mode requires an explicit token-count filter to avoid
        # unbounded captures by default.
        no_filter = lower is None and upper is None
        empty_window = (lower is not None and upper is not None
                        and lower >= upper)
        if no_filter or empty_window:
            capture_on = False

        return RouterCaptureConfig(
            enabled=capture_on,
            out_dir=dump_dir,
            skip_profile=True,
            skip_stack_funcs=_DEFAULT_SKIP_STACK_FUNCS,
            only_rank=rank_filter,
            max_layers=layer_cap,
            num_tokens_gt=lower,
            num_tokens_lt=upper,
        )
def _in_profile_run(skip_stack_funcs: tuple[str, ...]) -> bool:
    """Report whether any caller on the current stack is named in `skip_stack_funcs`.

    This is a best-effort way to spot vLLM startup profiling/warmup runs,
    which often execute MoE kernels with synthetic shapes, so that the first
    capture comes from a real request instead.

    Returns False immediately when `skip_stack_funcs` is empty.
    """
    if not skip_stack_funcs:
        return False

    current = inspect.currentframe()
    try:
        matched = False
        # Walk outward from this frame until a listed name shows up or the
        # stack is exhausted.
        while current is not None and not matched:
            matched = current.f_code.co_name in skip_stack_funcs
            current = current.f_back
        return matched
    finally:
        # Drop the frame reference to avoid reference cycles.
        del current
class _RouterCapture:
    """In-memory collector for MoE router logits, bucketed by token count.

    Each `(num_tokens, layer_name)` pair is stored at most once. When
    `cfg.max_layers` is non-zero, a bucket that has gathered that many layers
    is written to disk right away, marked complete and dropped from memory;
    anything still pending is written by `flush()`, which is also registered
    with `atexit`.
    """

    def __init__(self, cfg: "RouterCaptureConfig") -> None:
        """Create the output directory and register `flush` to run at exit."""
        self.cfg = cfg
        # Pending captures: num_tokens -> layer_name -> layer state.
        self._layers_by_num_tokens: dict[int, dict[str, dict[str,
                                                             object]]] = {}
        # Every layer name admitted so far (bounded by cfg.max_layers).
        self._layer_names: set[str] = set()
        # Token counts whose bucket was already written out.
        self._completed_num_tokens: set[int] = set()
        self._lock = threading.Lock()
        self._flush_counter = 0
        self._pid = os.getpid()
        self._host = socket.gethostname()
        self._start_time = time.time()
        os.makedirs(cfg.out_dir, exist_ok=True)
        atexit.register(self.flush)

    def _bucket_for_num_tokens(self, num_tokens: int) -> Optional[int]:
        """Return the per-size bucket key for this record call, or None if filtered."""
        lower = self.cfg.num_tokens_gt
        upper = self.cfg.num_tokens_lt
        if lower is None and upper is None:
            return None
        size = int(num_tokens)
        if lower is not None and size <= int(lower):
            return None
        if upper is not None and size >= int(upper):
            return None
        # A size whose bucket was already written is not captured again.
        if size != 0 and size in self._completed_num_tokens:
            return None
        return size

    def _snapshot_layers_by_num_tokens(
        self,
        layers_by_num_tokens: dict[int, dict[str, dict[str, object]]],
    ) -> dict[int, dict[str, dict[str, object]]]:
        """Return a copy of the nested dicts and chunk lists (tensors are shared)."""
        return {
            int(num_tokens): {
                layer_name: {
                    "num_experts": int(state.get("num_experts", 0)),
                    "num_tokens": int(state.get("num_tokens", 0)),
                    "router_logits_chunks":
                    list(state.get("router_logits_chunks", [])),
                }
                for layer_name, state in bucket.items()
            }
            for num_tokens, bucket in layers_by_num_tokens.items()
        }

    @torch.no_grad()
    def record(self, layer_name: str, router_logits: torch.Tensor,
               top_k: int) -> None:
        """Store one layer's router logits if they pass every filter.

        Args:
            layer_name: key the logits are stored under within their bucket.
            router_logits: 2D tensor; its shape is read as
                (num_tokens, num_experts). Anything else is ignored.
            top_k: accepted for interface compatibility; not used here.
        """
        # Cheap shape/size filters run first so that filtered calls never pay
        # for the rank lookup or the stack walk below.
        if router_logits.dim() != 2:
            return
        num_tokens, num_experts = router_logits.shape
        if num_tokens == 0 or num_experts == 0:
            return
        bucket_num_tokens = self._bucket_for_num_tokens(int(num_tokens))
        if bucket_num_tokens is None:
            return

        if self.cfg.only_rank is not None:
            rank = _get_rank()
            if rank is not None and rank != self.cfg.only_rank:
                return
        if self.cfg.skip_profile and _in_profile_run(
                self.cfg.skip_stack_funcs):
            return

        with self._lock:
            # Already captured for this size: return before doing any
            # tensor conversion/copy.
            if layer_name in self._layers_by_num_tokens.get(
                    bucket_num_tokens, ()):
                return
            # Limit the number of recorded layers to avoid unbounded dumps.
            if layer_name not in self._layer_names:
                limit = self.cfg.max_layers
                if limit != 0 and len(self._layer_names) >= limit:
                    return
                self._layer_names.add(layer_name)

        # Store on CPU to avoid consuming GPU memory during long runs.
        # fp16 is typically sufficient because we primarily care about
        # distribution and relative ordering (top-k), not exact values.
        # copy=True guarantees the stored tensor never shares storage with the
        # caller's tensor, even when that one is already a CPU fp16 tensor.
        router_logits_cpu = router_logits.detach().to(device="cpu",
                                                      dtype=torch.float16,
                                                      copy=True)

        completed_bucket: Optional[dict[str, dict[str, object]]] = None
        with self._lock:
            bucket = self._layers_by_num_tokens.setdefault(
                bucket_num_tokens, {})
            # Re-check: another thread may have stored this layer while the
            # copy above ran outside the lock.
            if layer_name in bucket:
                return
            bucket[layer_name] = {
                "num_experts": int(num_experts),
                "num_tokens": int(num_tokens),
                "router_logits_chunks": [router_logits_cpu],
            }
            limit = self.cfg.max_layers
            if limit != 0 and len(bucket) >= int(limit):
                # Bucket is full: detach it from the pending state so it can
                # be written without holding the lock.
                completed_bucket = self._snapshot_layers_by_num_tokens(
                    {bucket_num_tokens: bucket})[bucket_num_tokens]
                self._completed_num_tokens.add(bucket_num_tokens)
                self._layers_by_num_tokens.pop(bucket_num_tokens, None)

        if completed_bucket is not None:
            self._flush_payload(
                layers_by_num_tokens={bucket_num_tokens: completed_bucket},
                file_tag=f"nt{bucket_num_tokens}",
            )

    def _flush_payload(
        self,
        *,
        layers_by_num_tokens: dict[int, dict[str, dict[str, object]]],
        file_tag: Optional[str] = None,
    ) -> Optional[str]:
        """Write the given buckets to one `.pt` file under `cfg.out_dir`.

        Returns the written path, or None when capture is disabled, this rank
        is filtered out, or `torch.save` raised (the error is deliberately
        swallowed so a failed debug dump never breaks the caller).
        """
        if not self.cfg.enabled:
            return None
        rank = _get_rank()
        if (self.cfg.only_rank is not None and rank is not None
                and rank != self.cfg.only_rank):
            return None

        now = time.time()
        ts = time.strftime("%Y%m%d_%H%M%S", time.localtime(now))
        ts_us = int(now * 1_000_000)
        with self._lock:
            flush_idx = self._flush_counter
            self._flush_counter += 1

        rank_str = f"rank{rank}" if rank is not None else "rankNA"
        tag = f"{file_tag}_" if file_tag else ""
        out_path = os.path.join(
            self.cfg.out_dir,
            f"moe_router_stats_{tag}{ts_us}_{self._host}_{rank_str}"
            f"_pid{self._pid}_flush{flush_idx}.pt",
        )

        # Bucket keys are stringified in the saved payload.
        layers_by_num_tokens_out: dict[str, object] = {
            str(int(num_tokens)): {
                layer_name: {
                    "num_experts": int(state["num_experts"]),
                    "num_tokens": int(state["num_tokens"]),
                    "router_logits_chunks": state["router_logits_chunks"],
                }
                for layer_name, state in bucket.items()
            }
            for num_tokens, bucket in layers_by_num_tokens.items()
        }

        payload: dict[str, object] = {
            "meta": {
                "timestamp": ts,
                "timestamp_us": ts_us,
                "flush_index": int(flush_idx),
                "host": self._host,
                "pid": self._pid,
                "rank": rank,
                "wall_time_s": float(now - self._start_time),
            },
            "layers_by_num_tokens": layers_by_num_tokens_out,
        }
        # Backward-compatible convenience field when there is a single bucket.
        if len(layers_by_num_tokens) == 1:
            (only_bucket_key, ) = layers_by_num_tokens.keys()
            payload["layers"] = layers_by_num_tokens_out[str(
                int(only_bucket_key))]

        try:
            torch.save(payload, out_path)
        except Exception:
            # Best-effort dump: report failure through the return value only.
            return None
        return out_path

    def flush(self) -> Optional[str]:
        """Write all pending buckets to a single file; None if nothing is pending.

        Pending state is left in place; call `reset()` to clear it.
        """
        with self._lock:
            if not self._layers_by_num_tokens:
                return None
            snapshot = self._snapshot_layers_by_num_tokens(
                self._layers_by_num_tokens)
        return self._flush_payload(layers_by_num_tokens=snapshot)

    def reset(self) -> None:
        """Forget pending buckets, admitted layer names and completed sizes."""
        with self._lock:
            self._layers_by_num_tokens.clear()
            self._layer_names.clear()
            self._completed_num_tokens.clear()
            self._start_time = time.time()
# Process-wide capture instance, created lazily by `_get_capture`.
_CAPTURE: Optional["_RouterCapture"] = None
# Set once capture has been ruled out for this process so later lookups
# return immediately.
_CAPTURE_DISABLED: bool = False


def _disable_global_capture() -> None:
    """Drop the process-wide capture instance and mark capture as disabled."""
    global _CAPTURE, _CAPTURE_DISABLED
    _CAPTURE, _CAPTURE_DISABLED = None, True
def _get_rank() -> Optional[int]:
    """Return the torch.distributed rank, or None when it cannot be determined.

    None is returned when torch.distributed is unavailable, not initialized,
    or when `get_rank()` raises.
    """
    dist = torch.distributed
    if not (dist.is_available() and dist.is_initialized()):
        return None
    try:
        return dist.get_rank()
    except Exception:
        return None
def _get_capture() -> Optional[_RouterCapture]:
    """Return the process-wide capture object, creating it on first use.

    Returns None, and permanently disables capture for this process, when the
    environment config is not enabled or when a rank filter is set and the
    current rank is known and differs from it.
    """
    global _CAPTURE, _CAPTURE_DISABLED
    if _CAPTURE_DISABLED:
        return None

    if _CAPTURE is None:
        cfg = RouterCaptureConfig.from_env()
        wanted = cfg.enabled
        if wanted and cfg.only_rank is not None:
            rank = _get_rank()
            # An unknown rank (None) does not rule this process out.
            wanted = rank is None or rank == cfg.only_rank
        if not wanted:
            _disable_global_capture()
            return None
        _CAPTURE = _RouterCapture(cfg)

    return _CAPTURE
@torch.no_grad()
def maybe_record_router_logits(*, layer_name: str,
                               router_logits: torch.Tensor,
                               top_k: int) -> None:
    """Forward the router logits to the active capture object, if there is one.

    Does nothing when `_get_capture()` returns None.
    """
    active = _get_capture()
    if active is not None:
        active.record(layer_name=layer_name,
                      router_logits=router_logits,
                      top_k=top_k)
def maybe_flush_router_capture(*, reset: bool = False) -> Optional[str]:
    """Flush capture buffers to disk without exiting the process.

    Args:
        reset: when True, clear the capture state, but only if the flush
            actually produced a file.

    Returns:
        The path written by the flush, or None when there is no active
        capture or the flush wrote nothing.
    """
    active = _get_capture()
    if active is None:
        return None

    written = active.flush()
    if reset and written is not None:
        active.reset()
    return written
vllm/model_executor/models/qwen3_moe.py
View file @
680ee839
...
...
@@ -38,6 +38,19 @@ from vllm.distributed import get_pp_group, get_tensor_model_parallel_world_size
from
vllm.logger
import
init_logger
from
vllm.model_executor.layers.activation
import
SiluAndMul
from
vllm.model_executor.layers.fused_moe
import
FusedMoE
# Router capture is optional: when its module cannot be imported, install a
# no-op with the same keyword-only signature so call sites stay unchanged.
try:
    from vllm.model_executor.layers.fused_moe.router_capture import (
        maybe_record_router_logits)
except ImportError:

    def maybe_record_router_logits(
        *,
        layer_name: str,
        router_logits: torch.Tensor,
        top_k: int,
    ) -> None:
        """No-op stand-in used when router capture is unavailable."""
        return None
from
vllm.model_executor.layers.layernorm
import
RMSNorm
from
vllm.model_executor.layers.linear
import
(
MergedColumnParallelLinear
,
QKVParallelLinear
,
...
...
@@ -111,6 +124,8 @@ class Qwen3MoeSparseMoeBlock(nn.Module):
):
super
().
__init__
()
self
.
tp_size
=
get_tensor_model_parallel_world_size
()
self
.
_router_top_k
=
int
(
config
.
num_experts_per_tok
)
self
.
_router_capture_layer_name
=
prefix
if
self
.
tp_size
>
config
.
num_experts
:
raise
ValueError
(
...
...
@@ -140,6 +155,14 @@ class Qwen3MoeSparseMoeBlock(nn.Module):
# router_logits: (num_tokens, n_experts)
router_logits
,
_
=
self
.
gate
(
hidden_states
)
if
not
(
hasattr
(
torch
,
"_dynamo"
)
and
torch
.
_dynamo
.
is_compiling
()):
capture_enabled
=
envs
.
VLLM_MOE_ROUTER_CAPTURE
if
capture_enabled
:
maybe_record_router_logits
(
layer_name
=
self
.
_router_capture_layer_name
,
router_logits
=
router_logits
,
top_k
=
self
.
_router_top_k
,
)
final_hidden_states
=
self
.
experts
(
hidden_states
=
hidden_states
,
router_logits
=
router_logits
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment