Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
fc5bfc66
Commit
fc5bfc66
authored
Oct 10, 2025
by
maxiao1
Committed by
lizhigong
Oct 10, 2025
Browse files
keep same version
parent
12291212
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
125 additions
and
64 deletions
+125
-64
vllm/attention/layer.py
vllm/attention/layer.py
+8
-8
vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py
...ted/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py
+66
-27
vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_engine.py
...ibuted/kv_transfer/kv_connector/v1/p2p/p2p_nccl_engine.py
+2
-0
vllm/two_batch_overlap/v1/model_input_split_v1.py
vllm/two_batch_overlap/v1/model_input_split_v1.py
+20
-15
vllm/two_batch_overlap/v1/two_batch_overlap_v1.py
vllm/two_batch_overlap/v1/two_batch_overlap_v1.py
+14
-1
vllm/v1/worker/gpu_model_runner.py
vllm/v1/worker/gpu_model_runner.py
+3
-2
vllm/version.py
vllm/version.py
+12
-11
No files found.
vllm/attention/layer.py
View file @
fc5bfc66
...
...
@@ -414,10 +414,10 @@ def unified_attention(
output
=
self
.
impl
.
forward
(
self
,
query
,
key
,
value
,
kv_cache
,
attn_metadata
)
#
if envs.VLLM_ENABLE_TBO:
#
tbo_maybe_save_kv_layer_to_connector(layer_name, kv_cache)
#
else:
maybe_save_kv_layer_to_connector
(
layer_name
,
kv_cache
)
if
envs
.
VLLM_ENABLE_TBO
:
tbo_maybe_save_kv_layer_to_connector
(
layer_name
,
kv_cache
)
else
:
maybe_save_kv_layer_to_connector
(
layer_name
,
kv_cache
)
return
output
...
...
@@ -462,10 +462,10 @@ def unified_attention_with_output(
attn_metadata
,
output
=
output
,
output_scale
=
output_scale
)
#
if envs.VLLM_ENABLE_TBO:
#
tbo_maybe_save_kv_layer_to_connector(layer_name, kv_cache)
#
else:
maybe_save_kv_layer_to_connector
(
layer_name
,
kv_cache
)
if
envs
.
VLLM_ENABLE_TBO
:
tbo_maybe_save_kv_layer_to_connector
(
layer_name
,
kv_cache
)
else
:
maybe_save_kv_layer_to_connector
(
layer_name
,
kv_cache
)
def
unified_attention_with_output_fake
(
...
...
vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_connector.py
View file @
fc5bfc66
...
...
@@ -18,7 +18,7 @@ from vllm.forward_context import get_forward_context
from
vllm.logger
import
init_logger
from
vllm.v1.attention.backends.mla.common
import
MLACommonMetadata
from
vllm.v1.core.sched.output
import
SchedulerOutput
from
vllm.two_batch_overlap.v1.two_batch_overlap_v1
import
tbo_get_done_event
if
TYPE_CHECKING
:
from
vllm.attention.backends.abstract
import
AttentionMetadata
from
vllm.forward_context
import
ForwardContext
...
...
@@ -274,8 +274,8 @@ class P2pNcclConnector(KVConnectorBase_V1):
Assume the shape of the layer is (2, num_pages, page_size, xxx)
if MLA is not used, and (num_pages, page_size, xxx) otherwise.
"""
if
envs
.
VLLM_ENABLE_TBO
:
slot_mapping
=
slot_mapping
.
pin_memory
().
to
(
device
=
layer
.
device
,
non_blocking
=
True
)
#
if envs.VLLM_ENABLE_TBO:
#
slot_mapping = slot_mapping.pin_memory().to(device=layer.device, non_blocking=True)
if
isinstance
(
attn_metadata
,
MLACommonMetadata
):
num_pages
,
page_size
=
layer
.
shape
[
0
],
layer
.
shape
[
1
]
return
layer
.
reshape
(
num_pages
*
page_size
,
-
1
)[
slot_mapping
,
...
...
@@ -286,34 +286,73 @@ class P2pNcclConnector(KVConnectorBase_V1):
connector_metadata
=
self
.
_get_connector_metadata
()
assert
isinstance
(
connector_metadata
,
P2pNcclConnectorMetadata
)
for
request
in
connector_metadata
.
requests
:
request_id
=
request
.
request_id
ip
,
port
=
self
.
parse_request_id
(
request_id
,
True
)
remote_address
=
ip
+
":"
+
str
(
port
+
self
.
_rank
)
kv_cache
=
extract_kv_from_layer
(
kv_layer
,
request
.
slot_mapping
)
pp_rank
=
(
self
.
parallel_config
.
rank
//
self
.
parallel_config
.
tensor_parallel_size
)
%
self
.
parallel_config
.
pipeline_parallel_size
if
(
self
.
pp_size
==
1
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
elif
(
self
.
pp_size
==
2
):
if
(
pp_rank
==
0
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
if
envs
.
VLLM_ENABLE_TBO
:
send_stream
=
self
.
p2p_nccl_engine
.
send_stream
for
request
in
connector_metadata
.
requests
:
request_id
=
request
.
request_id
ip
,
port
=
self
.
parse_request_id
(
request_id
,
True
)
remote_address
=
ip
+
":"
+
str
(
port
+
self
.
_rank
)
kv_cache
=
extract_kv_from_layer
(
kv_layer
,
request
.
slot_mapping
)
# tbo_evt = torch.cuda.Event(enable_timing=False)
# tbo_evt.record()
# with torch.cuda.stream(send_stream):
# send_stream.wait_event(tbo_evt) # 等 TBO all_reduce_stream 完成本轮
# kv_cache.record_stream(send_stream)
pp_rank
=
(
self
.
parallel_config
.
rank
//
self
.
parallel_config
.
tensor_parallel_size
)
%
\
self
.
parallel_config
.
pipeline_parallel_size
if
(
self
.
pp_size
==
1
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
self
.
_rank
+
4
))
kv_cache
,
remote_address
)
elif
(
self
.
pp_size
==
2
):
if
(
pp_rank
==
0
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
self
.
_rank
+
4
))
else
:
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
self
.
_rank
-
4
))
elif
(
self
.
pp_size
==
8
):
for
i
in
range
(
8
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
i
))
else
:
print
(
"Error: only suppprt pp1 pp2 pp8!!!!!!"
)
else
:
for
request
in
connector_metadata
.
requests
:
request_id
=
request
.
request_id
ip
,
port
=
self
.
parse_request_id
(
request_id
,
True
)
remote_address
=
ip
+
":"
+
str
(
port
+
self
.
_rank
)
kv_cache
=
extract_kv_from_layer
(
kv_layer
,
request
.
slot_mapping
)
pp_rank
=
(
self
.
parallel_config
.
rank
//
self
.
parallel_config
.
tensor_parallel_size
)
%
self
.
parallel_config
.
pipeline_parallel_size
if
(
self
.
pp_size
==
1
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
self
.
_rank
-
4
))
elif
(
self
.
pp_size
==
8
):
for
i
in
range
(
8
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
i
))
else
:
print
(
"Error: only suppprt pp1 pp2 pp8!!!!!!"
)
elif
(
self
.
pp_size
==
2
):
if
(
pp_rank
==
0
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
self
.
_rank
+
4
))
else
:
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
remote_address
)
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
self
.
_rank
-
4
))
elif
(
self
.
pp_size
==
8
):
for
i
in
range
(
8
):
self
.
p2p_nccl_engine
.
send_tensor
(
request_id
+
"#"
+
layer_name
,
kv_cache
,
ip
+
":"
+
str
(
port
+
i
))
else
:
print
(
"Error: only suppprt pp1 pp2 pp8!!!!!!"
)
def
wait_for_save
(
self
):
pass
...
...
vllm/distributed/kv_transfer/kv_connector/v1/p2p/p2p_nccl_engine.py
View file @
fc5bfc66
...
...
@@ -20,6 +20,7 @@ from vllm.distributed.device_communicators.pynccl_wrapper import (
from
vllm.distributed.kv_transfer.kv_connector.v1.p2p.tensor_memory_pool
import
(
# noqa: E501
TensorMemoryPool
)
from
vllm.utils
import
current_stream
,
get_ip
from
vllm.two_batch_overlap.v1.two_batch_overlap_v1
import
all_reduce_stream
as
tbo_all_reduce_stream
if
TYPE_CHECKING
:
from
vllm.forward_context
import
ForwardContext
...
...
@@ -110,6 +111,7 @@ class P2pNcclEngine:
self
.
recv_store_cv
=
threading
.
Condition
()
self
.
send_stream
=
torch
.
cuda
.
Stream
()
# self.send_stream = tbo_all_reduce_stream
self
.
recv_stream
=
torch
.
cuda
.
Stream
()
mem_pool_size_gb
=
self
.
config
.
get_from_extra_config
(
...
...
vllm/two_batch_overlap/v1/model_input_split_v1.py
View file @
fc5bfc66
...
...
@@ -159,7 +159,9 @@ def prepare_tbo_atten_metadata(
# The block_table for RIGHT starts from (req_offset-1).
# Align both offsets to that, and re-build the seq_lens for row-0.
seq_len_offset
=
req_offset
-
1
query_start_offset
=
req_offset
-
1
# query_start_offset = req_offset - 1
query_start_offset
=
req_offset
# row-0 is the split request (global row index = req_offset-1):
base_hist
=
runner
.
input_batch
.
num_computed_tokens_cpu
[
req_offset
-
1
].
item
()
...
...
@@ -180,7 +182,8 @@ def prepare_tbo_atten_metadata(
else
:
# RIGHT without split-in-req: natural positions
seq_len_offset
=
req_offset
query_start_offset
=
req_offset
# query_start_offset = req_offset
query_start_offset
=
req_offset
+
1
seq_lens_cpu_local
=
torch
.
as_tensor
(
default_seq_lens
,
device
=
runner
.
seq_lens_cpu
.
device
)
# Copy query_start_loc into global GPU buffer window
...
...
@@ -201,8 +204,10 @@ def prepare_tbo_atten_metadata(
runner
.
seq_lens
[
seq_len_offset
+
num_reqs
:].
fill_
(
0
)
# Build common metadata (pass CLONES to avoid aliasing between threads)
query_start_loc
=
runner
.
query_start_loc
[
query_start_offset
:
query_start_offset
+
num_reqs
+
1
].
clone
()
seq_lens
=
runner
.
seq_lens
[
seq_len_offset
:
seq_len_offset
+
num_reqs
].
clone
()
# query_start_loc = runner.query_start_loc[query_start_offset: query_start_offset + num_reqs + 1].clone()
# seq_lens = runner.seq_lens[seq_len_offset : seq_len_offset + num_reqs].clone()
query_start_loc
=
runner
.
query_start_loc
[
query_start_offset
:
query_start_offset
+
num_reqs
+
1
]
seq_lens
=
runner
.
seq_lens
[
seq_len_offset
:
seq_len_offset
+
num_reqs
]
common_attn_metadata
=
CommonAttentionMetadata
(
query_start_loc
=
query_start_loc
,
seq_lens
=
seq_lens
,
...
...
@@ -319,20 +324,20 @@ def tbo_split_and_execute_model(
# === Added: split inputs_embeds & intermediate_tensors per half; setup KV connector ===
# 真实 token
real_L
=
int
(
input_split
.
scheduler_output_left
.
total_num_scheduled_tokens
)
real_R
=
int
(
input_split
.
scheduler_output_right
.
total_num_scheduled_tokens
)
num_tokens_left
=
int
(
input_split
.
scheduler_output_left
.
total_num_scheduled_tokens
)
num_tokens_right
=
int
(
input_split
.
scheduler_output_right
.
total_num_scheduled_tokens
)
# 按左右半批切成两份
def
_split_i
t
(
it
,
l
,
r
):
def
_split_i
ntermediate_tensors
(
it
,
l
,
r
):
if
it
is
None
:
return
None
,
None
l
m
,
rm
=
{},
{}
for
k
,
v
in
it
.
tensors
.
items
():
vl
,
vr
=
torch
.
split
(
v
[:
l
+
r
],
[
l
,
r
],
dim
=
0
)
l
m
[
k
],
rm
[
k
]
=
vl
,
vr
return
IntermediateTensors
(
l
m
),
IntermediateTensors
(
r
m
)
intermediate_tensors_left
,
intermediate_tensors_right
=
_split_i
t
(
intermediate_tensors
,
real_L
,
real_R
l
eft_tensor_map
,
right_tensor_map
=
{},
{}
for
name
,
tensor
in
it
.
tensors
.
items
():
vl
,
vr
=
torch
.
split
(
tensor
[:
l
+
r
],
[
l
,
r
],
dim
=
0
)
l
eft_tensor_map
[
name
],
right_tensor_map
[
name
]
=
vl
,
vr
return
IntermediateTensors
(
l
eft_tensor_map
),
IntermediateTensors
(
r
ight_tensor_map
)
intermediate_tensors_left
,
intermediate_tensors_right
=
_split_i
ntermediate_tensors
(
intermediate_tensors
,
num_tokens_left
,
num_tokens_right
)
with
set_forward_context
(
attn_metadata
,
...
...
vllm/two_batch_overlap/v1/two_batch_overlap_v1.py
View file @
fc5bfc66
...
...
@@ -220,6 +220,7 @@ def tbo_all_reduce_v1(obj):
all_reduce_stream
.
wait_event
(
event_c2t
)
output
=
tensor_model_parallel_all_reduce
(
obj
)
event_t2c
.
record
()
#tbo_mark_allreduce_done()
tbo_obj_v1
.
tbo_thread_synchronize
(
tid
)
tbo_step_stream
.
wait_event
(
event_t2c
)
return
output
...
...
@@ -280,10 +281,22 @@ def tbo_model_executable_v1(
return
hidden_or_intermediate_states
_tbo_done_event
=
torch
.
cuda
.
Event
(
enable_timing
=
False
)
def
tbo_mark_allreduce_done
():
"""Record completion of all_reduce_stream for external synchronization."""
global
all_reduce_stream
,
_tbo_done_event
_tbo_done_event
.
record
(
all_reduce_stream
)
def
tbo_get_done_event
():
"""Return the event recorded by all_reduce_stream."""
return
_tbo_done_event
def
finalize_two_batch_overlap
():
global
tbo_obj_v1
if
tbo_obj_v1
is
not
None
:
try
:
tbo_obj_v1
.
shutdown
()
finally
:
tbo_obj_v1
=
None
tbo_obj_v1
=
None
\ No newline at end of file
vllm/v1/worker/gpu_model_runner.py
View file @
fc5bfc66
...
...
@@ -69,7 +69,7 @@ from vllm.v1.worker.gpu_input_batch import CachedRequestState, InputBatch
from
vllm.v1.worker.lora_model_runner_mixin
import
LoRAModelRunnerMixin
from
vllm.platforms
import
current_platform
from
vllm.two_batch_overlap.v1.model_input_split_v1
import
tbo_split_and_execute_model
from
vllm.profiler.prof
import
profile
from
..sample.logits_processor
import
LogitsProcessorManager
from
.utils
import
(
gather_mm_placeholders
,
initialize_kv_cache_for_kv_sharing
,
sanity_check_mm_encoder_outputs
,
scatter_mm_placeholders
)
...
...
@@ -1295,6 +1295,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
scheduler_output
:
"SchedulerOutput"
,
intermediate_tensors
:
Optional
[
IntermediateTensors
]
=
None
,
)
->
Union
[
ModelRunnerOutput
,
IntermediateTensors
]:
profile
.
StartTracer
()
self
.
_update_states
(
scheduler_output
)
if
not
scheduler_output
.
total_num_scheduled_tokens
:
if
not
has_kv_transfer_group
():
...
...
@@ -1573,7 +1574,7 @@ class GPUModelRunner(LoRAModelRunnerMixin):
get_kv_transfer_group
().
clear_connector_metadata
()
self
.
eplb_step
()
print
(
'###valid_sampled_token_ids'
,
valid_sampled_token_ids
)
return
ModelRunnerOutput
(
req_ids
=
self
.
input_batch
.
req_ids
,
req_id_to_index
=
self
.
input_batch
.
req_id_to_index
,
...
...
vllm/version.py
View file @
fc5bfc66
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
try
:
from
._version
import
__version__
,
__version_tuple__
__version__
=
"0.9.2"
__version_tuple__
=
(
0
,
9
,
2
)
__hcu_version__
=
f
'0.9.2+das.opt1.rc2.dtk25041'
from
vllm.version
import
__version__
,
__version_tuple__
,
__hcu_version__
except
Exception
as
e
:
import
warnings
warnings
.
warn
(
f
"Failed to read commit hash:
\n
{
e
}
"
,
warnings
.
warn
(
f
"Failed to read commit hash:
\n
+ str(e)
"
,
RuntimeWarning
,
stacklevel
=
2
)
__version__
=
"dev"
__version_tuple__
=
(
0
,
0
,
__version__
)
def
_prev_minor_version_was
(
version_str
):
"""
Check whether a given version matches the previous minor version.
'''
Check whether a given version matches the previous minor version.
Return True if version_str matches the previous minor version.
...
...
@@ -23,19 +24,19 @@ def _prev_minor_version_was(version_str):
supplied version_str is '0.6'.
Used for --show-hidden-metrics-for-version.
"""
'''
# Match anything if this is a dev tree
if
__version_tuple__
[
0
:
2
]
==
(
0
,
0
):
return
True
# Note - this won't do the right thing when we release 1.0!
assert
__version_tuple__
[
0
]
==
0
#
assert __version_tuple__[0] == 0
assert
isinstance
(
__version_tuple__
[
1
],
int
)
return
version_str
==
f
"
{
__version_tuple__
[
0
]
}
.
{
__version_tuple__
[
1
]
-
1
}
"
def
_prev_minor_version
():
"""
For the purpose of testing, return a previous minor version number.
"""
'''
For the purpose of testing, return a previous minor version number.
'''
# In dev tree, this will return "0.-1", but that will work fine"
assert
isinstance
(
__version_tuple__
[
1
],
int
)
return
f
"
{
__version_tuple__
[
0
]
}
.
{
__version_tuple__
[
1
]
-
1
}
"
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment