Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
5d423ddc
Commit
5d423ddc
authored
Dec 23, 2025
by
zhuwenwen
Browse files
Merge branch 'v0.9.2-dev' of
http://10.16.6.30/dcutoolkit/deeplearing/vllm
into v0.9.2-dev
parents
e80dcabe
3c117f20
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
31 additions
and
7 deletions
+31
-7
vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py
...ted/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py
+31
-7
No files found.
vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py
View file @
5d423ddc
...
@@ -90,6 +90,8 @@ class P2pNcclConnector(KVConnectorBase_V1):
...
@@ -90,6 +90,8 @@ class P2pNcclConnector(KVConnectorBase_V1):
if
role
==
KVConnectorRole
.
WORKER
else
0
if
role
==
KVConnectorRole
.
WORKER
else
0
self
.
_local_rank
=
get_world_group
().
local_rank
\
self
.
_local_rank
=
get_world_group
().
local_rank
\
if
role
==
KVConnectorRole
.
WORKER
else
0
if
role
==
KVConnectorRole
.
WORKER
else
0
self
.
_tp_rank
=
get_tp_group
().
rank_in_group
\
if
role
==
KVConnectorRole
.
WORKER
else
0
self
.
p2p_nccl_engine
=
P2pNcclEngine
(
self
.
p2p_nccl_engine
=
P2pNcclEngine
(
local_rank
=
self
.
_local_rank
,
local_rank
=
self
.
_local_rank
,
...
@@ -105,9 +107,19 @@ class P2pNcclConnector(KVConnectorBase_V1):
...
@@ -105,9 +107,19 @@ class P2pNcclConnector(KVConnectorBase_V1):
self
.
pp_size
=
self
.
parallel_config
.
pipeline_parallel_size
self
.
pp_size
=
self
.
parallel_config
.
pipeline_parallel_size
self
.
tp_size
=
self
.
parallel_config
.
tensor_parallel_size
self
.
tp_size
=
self
.
parallel_config
.
tensor_parallel_size
self
.
num_card
=
self
.
pp_size
*
self
.
tp_size
self
.
num_card
=
self
.
pp_size
*
self
.
tp_size
self
.
multiple_machines
=
1
if
self
.
num_card
>
8
else
0
if
self
.
is_producer
and
self
.
multiple_machines
==
1
:
self
.
remote_tp_size
=
self
.
config
.
get_from_extra_config
(
"remote_tp_size"
,
self
.
tp_size
)
self
.
remote_pp_size
=
self
.
config
.
get_from_extra_config
(
"remote_pp_size"
,
self
.
pp_size
)
self
.
enable_asymmetric_p2p
=
self
.
config
.
get_from_extra_config
(
"enable_asymmetric_p2p"
,
False
)
self
.
remote_num_card
=
self
.
remote_tp_size
*
self
.
remote_pp_size
self
.
multiple_machines_d
=
1
if
self
.
remote_num_card
>
8
else
0
self
.
multiple_machines_p
=
1
if
self
.
num_card
>
8
else
0
if
self
.
is_producer
and
self
.
multiple_machines_p
==
1
:
self
.
ip_map
=
{}
self
.
ip_map
=
{}
self
.
duplicate_keys
=
[]
self
.
duplicate_keys
=
[]
config_file
=
os
.
getenv
(
'IP_CONFIG_FILE'
)
config_file
=
os
.
getenv
(
'IP_CONFIG_FILE'
)
...
@@ -417,7 +429,7 @@ class P2pNcclConnector(KVConnectorBase_V1):
...
@@ -417,7 +429,7 @@ class P2pNcclConnector(KVConnectorBase_V1):
pp_rank
=
(
self
.
parallel_config
.
rank
//
self
.
parallel_config
.
tensor_parallel_size
pp_rank
=
(
self
.
parallel_config
.
rank
//
self
.
parallel_config
.
tensor_parallel_size
)
%
self
.
parallel_config
.
pipeline_parallel_size
)
%
self
.
parallel_config
.
pipeline_parallel_size
if
(
self
.
multiple_machines
):
if
(
self
.
multiple_machines
_p
and
self
.
multiple_machines_d
):
ip_second
=
self
.
get_ip_value
(
ip
)
ip_second
=
self
.
get_ip_value
(
ip
)
if
(
self
.
pp_size
==
1
):
if
(
self
.
pp_size
==
1
):
if
self
.
_rank
<
8
:
if
self
.
_rank
<
8
:
...
@@ -433,8 +445,16 @@ class P2pNcclConnector(KVConnectorBase_V1):
...
@@ -433,8 +445,16 @@ class P2pNcclConnector(KVConnectorBase_V1):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
str
(
ip_second
)
+
":"
+
str
(
port
+
self
.
_rank
))
kv_cache
,
str
(
ip_second
)
+
":"
+
str
(
port
+
self
.
_rank
))
else
:
else
:
print
(
"Error: only suppprt pp1 pp2 !!!!!!"
)
logger
.
error
(
"Error: multiple machines only suppprt pp1tp16 and pp2tp8!!!!!!"
)
else
:
elif
(
self
.
multiple_machines_p
and
not
self
.
multiple_machines_d
):
if
(
self
.
pp_size
==
2
):
remote_address
=
ip
+
":"
+
str
(
port
+
self
.
_tp_rank
)
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
else
:
logger
.
error
(
"Error: P multiple machines D machine only suppprt P:pp2tp8 D:tp8 !!!!!!"
)
elif
(
not
self
.
multiple_machines_p
and
not
self
.
multiple_machines_d
):
if
(
self
.
pp_size
==
1
):
if
(
self
.
pp_size
==
1
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
kv_cache
,
remote_address
)
...
@@ -453,9 +473,13 @@ class P2pNcclConnector(KVConnectorBase_V1):
...
@@ -453,9 +473,13 @@ class P2pNcclConnector(KVConnectorBase_V1):
for
i
in
range
(
8
):
for
i
in
range
(
8
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
i
))
kv_cache
,
ip
+
":"
+
str
(
port
+
i
))
elif
(
self
.
enable_asymmetric_p2p
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
else
:
else
:
print
(
"Error: only suppprt pp1 pp2 pp8!!!!!!"
)
logger
.
error
(
"Error: P/D single machine only suppprt multiple tp:: (P: pp2tp4 D:tp8 P:pp8tp1 D:tp8) !!!!!!"
)
else
:
logger
.
error
(
"Error: not support!!!!!!"
)
def
wait_for_save
(
self
):
def
wait_for_save
(
self
):
pass
pass
# if self.is_producer:
# if self.is_producer:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment