Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
DeepEP
Commits
0f80da84
Commit
0f80da84
authored
Apr 10, 2025
by
fujianhao.fjh
Browse files
fix: not output result in some linux system
parent
42494864
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
20 additions
and
20 deletions
+20
-20
tests/test_internode.py
tests/test_internode.py
+9
-9
tests/test_intranode.py
tests/test_intranode.py
+9
-9
tests/test_low_latency.py
tests/test_low_latency.py
+2
-2
No files found.
tests/test_internode.py
View file @
0f80da84
...
...
@@ -76,7 +76,7 @@ def test_main(num_sms: int, local_rank: int, num_local_ranks: int, num_ranks: in
t
=
bench
(
lambda
:
buffer
.
get_dispatch_layout
(
topk_idx
,
num_experts
))[
0
]
if
local_rank
==
0
:
print
(
f
'[layout] Kernel performance:
{
t
*
1000
:.
3
f
}
ms'
,
flush
=
True
)
print
()
print
(
''
,
flush
=
True
)
group
.
barrier
()
time
.
sleep
(
1
)
...
...
@@ -163,7 +163,7 @@ def test_main(num_sms: int, local_rank: int, num_local_ranks: int, num_ranks: in
if
local_rank
==
0
:
print
(
' passed'
,
flush
=
True
)
if
local_rank
==
0
:
print
()
print
(
''
,
flush
=
True
)
# Tune dispatch performance
best_dispatch_results
=
None
...
...
@@ -180,10 +180,10 @@ def test_main(num_sms: int, local_rank: int, num_local_ranks: int, num_ranks: in
if
t
<
best_time
:
best_time
,
best_results
=
t
,
(
num_sms
,
nvl_chunk_size
,
rdma_chunk_size
)
if
local_rank
==
0
:
print
(
f
'[tuning] SMs
{
num_sms
}
, NVL chunk
{
nvl_chunk_size
}
, RDMA chunk
{
rdma_chunk_size
}
:
{
rdma_send_bytes
/
1e9
/
t
:.
2
f
}
GB/s (RDMA),
{
nvl_recv_bytes
/
1e9
/
t
:.
2
f
}
GB/s (NVL) '
)
print
(
f
'[tuning] SMs
{
num_sms
}
, NVL chunk
{
nvl_chunk_size
}
, RDMA chunk
{
rdma_chunk_size
}
:
{
rdma_send_bytes
/
1e9
/
t
:.
2
f
}
GB/s (RDMA),
{
nvl_recv_bytes
/
1e9
/
t
:.
2
f
}
GB/s (NVL) '
,
flush
=
True
)
if
local_rank
==
0
:
print
(
f
'[tuning] Best dispatch (
{
"FP8"
if
isinstance
(
current_x
,
tuple
)
else
"BF16"
}
): SMs
{
best_results
[
0
]
}
, NVL chunk
{
best_results
[
1
]
}
, RDMA chunk
{
best_results
[
2
]
}
:
{
rdma_send_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (RDMA),
{
nvl_recv_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (NVL)'
)
print
()
print
(
f
'[tuning] Best dispatch (
{
"FP8"
if
isinstance
(
current_x
,
tuple
)
else
"BF16"
}
): SMs
{
best_results
[
0
]
}
, NVL chunk
{
best_results
[
1
]
}
, RDMA chunk
{
best_results
[
2
]
}
:
{
rdma_send_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (RDMA),
{
nvl_recv_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (NVL)'
,
flush
=
True
)
print
(
''
,
flush
=
True
)
if
isinstance
(
current_x
,
tuple
):
# Gather FP8 the best config from rank 0
...
...
@@ -206,13 +206,13 @@ def test_main(num_sms: int, local_rank: int, num_local_ranks: int, num_ranks: in
tune_args
=
{
'x'
:
recv_x
,
'handle'
:
handle
,
'config'
:
config
}
t
=
bench
(
lambda
:
buffer
.
combine
(
**
tune_args
))[
0
]
if
local_rank
==
0
:
print
(
f
'[tuning] SMs
{
num_sms
}
, NVL chunk
{
nvl_chunk_size
}
, RDMA chunk
{
rdma_chunk_size
}
:
{
combine_bf16_rdma_recv_bytes
/
1e9
/
t
:.
2
f
}
GB/s (RDMA),
{
combine_bf16_nvl_send_bytes
/
1e9
/
t
:.
2
f
}
GB/s (NVL) '
)
print
(
f
'[tuning] SMs
{
num_sms
}
, NVL chunk
{
nvl_chunk_size
}
, RDMA chunk
{
rdma_chunk_size
}
:
{
combine_bf16_rdma_recv_bytes
/
1e9
/
t
:.
2
f
}
GB/s (RDMA),
{
combine_bf16_nvl_send_bytes
/
1e9
/
t
:.
2
f
}
GB/s (NVL) '
,
flush
=
True
)
if
t
<
best_time
:
best_time
,
best_results
=
t
,
(
num_sms
,
nvl_chunk_size
,
rdma_chunk_size
)
if
local_rank
==
0
:
print
(
f
'[tuning] Best combine: SMs
{
best_results
[
0
]
}
, NVL chunk
{
best_results
[
1
]
}
, RDMA chunk
{
best_results
[
2
]
}
:
{
combine_bf16_rdma_recv_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (RDMA),
{
combine_bf16_nvl_send_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (NVL)'
)
print
()
print
(
f
'[tuning] Best combine: SMs
{
best_results
[
0
]
}
, NVL chunk
{
best_results
[
1
]
}
, RDMA chunk
{
best_results
[
2
]
}
:
{
combine_bf16_rdma_recv_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (RDMA),
{
combine_bf16_nvl_send_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (NVL)'
,
flush
=
True
)
print
(
''
,
flush
=
True
)
# noinspection PyUnboundLocalVariable
...
...
@@ -231,7 +231,7 @@ def test_loop(local_rank: int, num_local_ranks: int):
for
i
in
(
24
,
):
test_main
(
i
,
local_rank
,
num_local_ranks
,
num_ranks
,
num_nodes
,
rank
,
buffer
,
group
)
if
local_rank
==
0
:
print
()
print
(
''
,
flush
=
True
)
# Test compatibility with low latency functions
if
test_ll_compatibility
:
...
...
tests/test_intranode.py
View file @
0f80da84
...
...
@@ -60,7 +60,7 @@ def test_main(num_sms: int, local_rank: int, num_ranks: int, rank: int, buffer:
t
=
bench
(
lambda
:
buffer
.
get_dispatch_layout
(
topk_idx
,
num_experts
))[
0
]
if
local_rank
==
0
:
print
(
f
'[layout] Kernel performance:
{
t
*
1000
:.
3
f
}
ms'
,
flush
=
True
)
print
()
print
(
''
,
flush
=
True
)
group
.
barrier
()
time
.
sleep
(
1
)
...
...
@@ -145,7 +145,7 @@ def test_main(num_sms: int, local_rank: int, num_ranks: int, rank: int, buffer:
if
local_rank
==
0
:
print
(
' passed'
,
flush
=
True
)
if
local_rank
==
0
:
print
()
print
(
''
,
flush
=
True
)
# Tune dispatch performance
best_dispatch_results
=
None
...
...
@@ -160,10 +160,10 @@ def test_main(num_sms: int, local_rank: int, num_ranks: int, rank: int, buffer:
if
t
<
best_time
:
best_time
,
best_results
=
t
,
(
num_sms
,
nvl_chunk_size
)
if
local_rank
==
0
:
print
(
f
'[tuning] SMs
{
num_sms
}
, NVL chunk
{
nvl_chunk_size
}
:
{
nvl_recv_bytes
/
1e9
/
t
:.
2
f
}
GB/s (NVL) '
)
print
(
f
'[tuning] SMs
{
num_sms
}
, NVL chunk
{
nvl_chunk_size
}
:
{
nvl_recv_bytes
/
1e9
/
t
:.
2
f
}
GB/s (NVL) '
,
flush
=
True
)
if
local_rank
==
0
:
print
(
f
'[tuning] Best dispatch (
{
"FP8"
if
isinstance
(
current_x
,
tuple
)
else
"BF16"
}
): SMs
{
best_results
[
0
]
}
, NVL chunk
{
best_results
[
1
]
}
,
{
nvl_recv_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (NVL)'
)
print
()
print
(
f
'[tuning] Best dispatch (
{
"FP8"
if
isinstance
(
current_x
,
tuple
)
else
"BF16"
}
): SMs
{
best_results
[
0
]
}
, NVL chunk
{
best_results
[
1
]
}
,
{
nvl_recv_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (NVL)'
,
flush
=
True
)
print
(
''
,
flush
=
True
)
if
isinstance
(
current_x
,
tuple
):
# Gather FP8 the best config from rank 0
...
...
@@ -185,13 +185,13 @@ def test_main(num_sms: int, local_rank: int, num_ranks: int, rank: int, buffer:
tune_args
=
{
'x'
:
recv_x
,
'handle'
:
handle
,
'config'
:
config
}
t
=
bench
(
lambda
:
buffer
.
combine
(
**
tune_args
))[
0
]
if
local_rank
==
0
:
print
(
f
'[tuning] SMs
{
num_sms
}
, NVL chunk
{
nvl_chunk_size
}
:
{
combine_bf16_nvl_send_bytes
/
1e9
/
t
:.
2
f
}
GB/s (NVL) '
)
print
(
f
'[tuning] SMs
{
num_sms
}
, NVL chunk
{
nvl_chunk_size
}
:
{
combine_bf16_nvl_send_bytes
/
1e9
/
t
:.
2
f
}
GB/s (NVL) '
,
flush
=
True
)
if
t
<
best_time
:
best_time
,
best_results
=
t
,
(
num_sms
,
nvl_chunk_size
)
if
local_rank
==
0
:
print
(
f
'[tuning] Best combine: SMs
{
best_results
[
0
]
}
, NVL chunk
{
best_results
[
1
]
}
:
{
combine_bf16_nvl_send_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (NVL)'
)
print
()
print
(
f
'[tuning] Best combine: SMs
{
best_results
[
0
]
}
, NVL chunk
{
best_results
[
1
]
}
:
{
combine_bf16_nvl_send_bytes
/
1e9
/
best_time
:.
2
f
}
GB/s (NVL)'
,
flush
=
True
)
print
(
''
,
flush
=
True
)
# noinspection PyUnboundLocalVariable
...
...
@@ -209,7 +209,7 @@ def test_loop(local_rank: int, num_local_ranks: int):
for
i
in
(
24
,
):
test_main
(
i
,
local_rank
,
num_ranks
,
rank
,
buffer
,
group
)
if
local_rank
==
0
:
print
()
print
(
''
,
flush
=
True
)
# Test compatibility with low latency functions
if
test_ll_compatibility
:
...
...
tests/test_low_latency.py
View file @
0f80da84
...
...
@@ -137,10 +137,10 @@ def test_main(num_tokens: int, hidden: int, num_experts: int, num_topk: int,
suppress_kineto_output
=
True
)
if
not
return_recv_hook
:
print
(
f
'[rank
{
rank
}
] Dispatch bandwidth:
{
num_dispatch_comm_bytes
/
1e9
/
dispatch_t
:.
2
f
}
GB/s, avg_t=
{
dispatch_t
*
1e6
:.
2
f
}
us | '
f
'Combine bandwidth:
{
num_combine_comm_bytes
/
1e9
/
combine_t
:.
2
f
}
GB/s, avg_t=
{
combine_t
*
1e6
:.
2
f
}
us'
)
f
'Combine bandwidth:
{
num_combine_comm_bytes
/
1e9
/
combine_t
:.
2
f
}
GB/s, avg_t=
{
combine_t
*
1e6
:.
2
f
}
us'
,
flush
=
True
)
else
:
print
(
f
'[rank
{
rank
}
] Dispatch send/recv time:
{
dispatch_t
*
2
*
1e6
:.
2
f
}
us | '
f
'Combine send/recv time:
{
combine_t
*
2
*
1e6
:.
2
f
}
us'
)
f
'Combine send/recv time:
{
combine_t
*
2
*
1e6
:.
2
f
}
us'
,
flush
=
True
)
return
hash_value
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment