Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
675ba75f
Commit
675ba75f
authored
Apr 07, 2025
by
zhuwenwen
Browse files
Merge tag 'v0.8.3' into v0.8.3-ori
parents
5cc98918
296c6572
Changes
501
Hide whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
1110 additions
and
521 deletions
+1110
-521
vllm/compilation/compiler_interface.py
vllm/compilation/compiler_interface.py
+53
-5
vllm/compilation/fusion.py
vllm/compilation/fusion.py
+2
-2
vllm/config.py
vllm/config.py
+175
-82
vllm/device_allocator/cumem.py
vllm/device_allocator/cumem.py
+24
-14
vllm/distributed/device_communicators/cpu_communicator.py
vllm/distributed/device_communicators/cpu_communicator.py
+117
-12
vllm/distributed/device_communicators/custom_all_reduce.py
vllm/distributed/device_communicators/custom_all_reduce.py
+42
-49
vllm/distributed/device_communicators/shm_broadcast.py
vllm/distributed/device_communicators/shm_broadcast.py
+7
-2
vllm/distributed/device_communicators/tpu_communicator.py
vllm/distributed/device_communicators/tpu_communicator.py
+6
-1
vllm/distributed/kv_transfer/kv_connector/factory.py
vllm/distributed/kv_transfer/kv_connector/factory.py
+5
-0
vllm/distributed/kv_transfer/kv_connector/mooncake_store_connector.py
...uted/kv_transfer/kv_connector/mooncake_store_connector.py
+216
-0
vllm/distributed/kv_transfer/kv_lookup_buffer/base.py
vllm/distributed/kv_transfer/kv_lookup_buffer/base.py
+75
-10
vllm/distributed/kv_transfer/kv_lookup_buffer/mooncake_store.py
...istributed/kv_transfer/kv_lookup_buffer/mooncake_store.py
+160
-0
vllm/distributed/parallel_state.py
vllm/distributed/parallel_state.py
+4
-1
vllm/distributed/utils.py
vllm/distributed/utils.py
+14
-4
vllm/engine/arg_utils.py
vllm/engine/arg_utils.py
+86
-232
vllm/engine/async_llm_engine.py
vllm/engine/async_llm_engine.py
+7
-4
vllm/engine/llm_engine.py
vllm/engine/llm_engine.py
+27
-21
vllm/engine/metrics.py
vllm/engine/metrics.py
+85
-78
vllm/engine/multiprocessing/__init__.py
vllm/engine/multiprocessing/__init__.py
+3
-2
vllm/engine/multiprocessing/client.py
vllm/engine/multiprocessing/client.py
+2
-2
No files found.
vllm/compilation/compiler_interface.py
View file @
675ba75f
# SPDX-License-Identifier: Apache-2.0
import
contextlib
import
copy
import
hashlib
import
importlib.metadata
import
os
from
contextlib
import
ExitStack
from
typing
import
Any
,
Callable
,
Dict
,
List
,
Optional
,
Tuple
...
...
@@ -9,6 +11,7 @@ from unittest.mock import patch
import
torch
import
torch._inductor.compile_fx
import
torch.fx
as
fx
from
packaging.version
import
Version
from
vllm.config
import
VllmConfig
...
...
@@ -139,10 +142,12 @@ class InductorAdaptor(CompilerInterface):
from
torch._inductor.codecache
import
torch_key
torch_factors
=
torch_key
()
factors
.
append
(
torch_factors
)
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()[:
10
]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()[:
10
]
return
hash_str
def
initialize_cache
(
self
,
cache_dir
:
str
,
disable_cache
:
bool
=
False
):
self
.
cache_dir
=
cache_dir
if
disable_cache
:
return
# redirect the cache directory to a sub-directory
...
...
@@ -155,7 +160,6 @@ class InductorAdaptor(CompilerInterface):
triton_cache
=
os
.
path
.
join
(
cache_dir
,
"triton_cache"
)
os
.
makedirs
(
triton_cache
,
exist_ok
=
True
)
os
.
environ
[
"TRITON_CACHE_DIR"
]
=
triton_cache
self
.
cache_dir
=
cache_dir
def
compile
(
self
,
...
...
@@ -228,7 +232,20 @@ class InductorAdaptor(CompilerInterface):
inductor_compiled_graph
=
output
if
inductor_compiled_graph
is
not
None
:
nonlocal
file_path
file_path
=
inductor_compiled_graph
.
current_callable
.
__code__
.
co_filename
# noqa
compiled_fn
=
inductor_compiled_graph
.
current_callable
file_path
=
compiled_fn
.
__code__
.
co_filename
# noqa
if
not
file_path
.
startswith
(
self
.
cache_dir
):
# hooked in the align_inputs_from_check_idxs function
# in torch/_inductor/utils.py
for
cell
in
compiled_fn
.
__closure__
:
if
not
callable
(
cell
.
cell_contents
):
continue
code
=
cell
.
cell_contents
.
__code__
if
code
.
co_filename
.
startswith
(
self
.
cache_dir
):
# this is the real file path
# compiled from Inductor
file_path
=
code
.
co_filename
break
hash_str
=
inductor_compiled_graph
.
_fx_graph_cache_key
return
output
...
...
@@ -271,6 +288,9 @@ class InductorAdaptor(CompilerInterface):
"torch._inductor.codecache.FxGraphCache._check_can_cache"
,
_check_can_cache
))
# Dynamo metrics context, see method for more details.
stack
.
enter_context
(
self
.
metrics_context
())
compiled_graph
=
compile_fx
(
graph
,
example_inputs
,
...
...
@@ -295,8 +315,14 @@ class InductorAdaptor(CompilerInterface):
hash_str
=
handle
[
0
]
from
torch._inductor.codecache
import
FxGraphCache
with
patch
(
"torch._inductor.codecache.FxGraphCache._get_shape_env"
,
lambda
*
args
,
**
kwargs
:
AlwaysHitShapeEnv
()):
with
ExitStack
()
as
exit_stack
:
exit_stack
.
enter_context
(
patch
(
"torch._inductor.codecache.FxGraphCache._get_shape_env"
,
lambda
*
args
,
**
kwargs
:
AlwaysHitShapeEnv
()))
# Dynamo metrics context, see method for more details.
exit_stack
.
enter_context
(
self
.
metrics_context
())
if
torch
.
__version__
.
startswith
(
"2.5"
):
inductor_compiled_graph
=
FxGraphCache
.
_lookup_graph
(
hash_str
,
example_inputs
,
True
,
False
)
...
...
@@ -337,6 +363,28 @@ class InductorAdaptor(CompilerInterface):
return
compiled_graph
def
metrics_context
(
self
)
->
contextlib
.
AbstractContextManager
:
"""
This method returns the Dynamo metrics context (if it exists,
otherwise a null context). It is used by various compile components.
Present in torch>=2.6, it's used inside FxGraphCache in
torch==2.6 (but not after). It might also be used in various other
torch.compile internal functions.
Because it is re-entrant, we always set it (even if entering via Dynamo
and the context was already entered). We might want to revisit if it
should be set at a different level of compilation.
This is likely a bug in PyTorch: public APIs should not rely on
manually setting up internal contexts. But we also rely on non-public
APIs which might not provide these guarantees.
"""
if
Version
(
importlib
.
metadata
.
version
(
'torch'
))
>=
Version
(
"2.6"
):
import
torch._dynamo.utils
return
torch
.
_dynamo
.
utils
.
get_metrics_context
()
else
:
return
contextlib
.
nullcontext
()
class
EagerAdaptor
(
CompilerInterface
):
name
=
"eager"
...
...
vllm/compilation/fusion.py
View file @
675ba75f
...
...
@@ -4,8 +4,6 @@ from typing import Callable, Dict, List, NamedTuple, Optional, Tuple
import
torch
import
torch._inductor.pattern_matcher
as
pm
# TODO(luka) use vllm.utils once #10836 landed
from
compressed_tensors.quantization
import
FP8_DTYPE
from
torch
import
fx
from
torch._higher_order_ops.auto_functionalize
import
auto_functionalized
from
torch._inductor.pattern_matcher
import
PatternMatcherPass
...
...
@@ -13,12 +11,14 @@ from torch._ops import OpOverload
from
vllm.config
import
CompilationConfig
from
vllm.logger
import
init_logger
from
vllm.platforms
import
current_platform
from
.fx_utils
import
find_getitem_maybe
from
.multi_output_match
import
MultiOutputMatch
from
.vllm_inductor_pass
import
VllmInductorPass
logger
=
init_logger
(
__name__
)
FP8_DTYPE
=
current_platform
.
fp8_dtype
()
def
empty_bf16
(
*
args
,
**
kwargs
):
...
...
vllm/config.py
View file @
675ba75f
...
...
@@ -29,7 +29,7 @@ from vllm.logger import init_logger
from
vllm.model_executor.layers.quantization
import
(
QUANTIZATION_METHODS
,
get_quantization_config
)
from
vllm.model_executor.models
import
ModelRegistry
from
vllm.platforms
import
CpuArchEnum
from
vllm.platforms
import
CpuArchEnum
,
current_platform
from
vllm.sampling_params
import
GuidedDecodingParams
from
vllm.tracing
import
is_otel_available
,
otel_import_error_traceback
from
vllm.transformers_utils.config
import
(
...
...
@@ -38,9 +38,10 @@ from vllm.transformers_utils.config import (
get_sentence_transformer_tokenizer_config
,
is_encoder_decoder
,
try_get_generation_config
,
uses_mrope
)
from
vllm.transformers_utils.s3_utils
import
S3Model
from
vllm.transformers_utils.utils
import
is_s3
from
vllm.transformers_utils.utils
import
is_s3
,
maybe_model_redirect
from
vllm.utils
import
(
GiB_bytes
,
LayerBlockType
,
cuda_device_count_stateless
,
get_cpu_memory
,
random_uuid
,
resolve_obj_by_qualname
)
get_cpu_memory
,
get_open_port
,
random_uuid
,
resolve_obj_by_qualname
)
if
TYPE_CHECKING
:
from
ray.util.placement_group
import
PlacementGroup
...
...
@@ -221,6 +222,9 @@ class ModelConfig:
factors
.
append
(
self
.
trust_remote_code
)
factors
.
append
(
self
.
rope_scaling
)
factors
.
append
(
self
.
rope_theta
)
# rope cos/sin cache depends on the max_position_embeddings
factors
.
append
(
getattr
(
self
.
hf_config
,
"max_position_embeddings"
,
"None"
))
return
hashlib
.
sha256
(
str
(
factors
).
encode
()).
hexdigest
()
def
__init__
(
...
...
@@ -263,9 +267,13 @@ class ModelConfig:
override_generation_config
:
Optional
[
dict
[
str
,
Any
]]
=
None
,
model_impl
:
Union
[
str
,
ModelImpl
]
=
ModelImpl
.
AUTO
,
)
->
None
:
self
.
model
=
model
self
.
model
=
maybe_model_redirect
(
model
)
self
.
tokenizer
=
maybe_model_redirect
(
tokenizer
)
self
.
hf_config_path
=
hf_config_path
self
.
tokenizer
=
tokenizer
if
isinstance
(
hf_config_path
,
str
):
self
.
hf_config_path
=
maybe_model_redirect
(
hf_config_path
)
self
.
tokenizer_mode
=
tokenizer_mode
self
.
trust_remote_code
=
trust_remote_code
self
.
allowed_local_media_path
=
allowed_local_media_path
...
...
@@ -309,8 +317,8 @@ class ModelConfig:
)
and
backend
==
"FLASHINFER"
and
find_spec
(
"flashinfer"
)
is
None
:
raise
ValueError
(
"VLLM_ATTENTION_BACKEND is set to FLASHINFER, but flashinfer "
"module was not found."
"
See
https://github.com/vllm-project/vllm/blob/main/Dockerfile
"
"module was not found.
See
"
"https://github.com/vllm-project/vllm/blob/main/
docker/
Dockerfile
"
# noqa: E501
"for instructions on how to install it."
)
# The tokenizer version is consistent with the model version by default.
...
...
@@ -346,6 +354,8 @@ class ModelConfig:
self
.
hf_config
=
hf_config
self
.
hf_text_config
=
get_hf_text_config
(
self
.
hf_config
)
self
.
attention_chunk_size
=
getattr
(
self
.
hf_text_config
,
"attention_chunk_size"
,
None
)
self
.
encoder_config
=
self
.
_get_encoder_config
()
self
.
hf_image_processor_config
=
get_hf_image_processor_config
(
self
.
model
,
revision
)
...
...
@@ -403,6 +413,7 @@ class ModelConfig:
self
.
is_attention_free
=
self
.
_init_attention_free
()
self
.
is_hybrid
=
self
.
_init_is_hybrid
()
self
.
has_noops
=
self
.
_init_has_noops
()
self
.
has_inner_state
=
self
.
_init_has_inner_state
()
if
current_platform
.
is_neuron
():
...
...
@@ -502,6 +513,10 @@ class ModelConfig:
def
_init_is_hybrid
(
self
)
->
bool
:
return
self
.
registry
.
is_hybrid_model
(
self
.
architectures
)
def
_init_has_noops
(
self
)
->
bool
:
architectures
=
getattr
(
self
.
hf_config
,
"architectures"
,
[])
return
self
.
registry
.
is_noops_model
(
architectures
)
def
_init_has_inner_state
(
self
)
->
bool
:
return
self
.
registry
.
model_has_inner_state
(
self
.
architectures
)
...
...
@@ -671,11 +686,19 @@ class ModelConfig:
self
.
max_seq_len_to_capture
=
self
.
max_model_len
self
.
max_seq_len_to_capture
=
min
(
self
.
max_seq_len_to_capture
,
self
.
max_model_len
)
ROCM_UNSUPPORTED_MODELS
=
[
'mllama'
]
if
(
self
.
hf_config
.
model_type
in
ROCM_UNSUPPORTED_MODELS
and
not
self
.
enforce_eager
and
current_platform
.
is_rocm
()):
logger
.
warning
(
"CUDA graph is not supported for %s on ROCm yet, fallback "
"to the eager mode."
,
self
.
hf_config
.
model_type
)
self
.
enforce_eager
=
True
def
_verify_bnb_config
(
self
)
->
None
:
"""
The current version of bitsandbytes (0.4
4.0
) with 8-bit models does not
The current version of bitsandbytes (0.4
5.3
) with 8-bit models does not
yet support CUDA graph.
# TODO Remove this when bitsandbytes supports.
"""
is_bitsandbytes
=
self
.
quantization
==
"bitsandbytes"
has_quantization_config
=
(
getattr
(
self
.
hf_config
,
...
...
@@ -690,8 +713,9 @@ class ModelConfig:
not
self
.
enforce_eager
,
]):
logger
.
warning
(
"CUDA graph is not supported on BitAndBytes 8bit yet, "
"CUDA graph is not supported on Bit
s
AndBytes 8bit yet, "
"fallback to the eager mode."
)
self
.
enforce_eager
=
True
def
_verify_with_expert_parallelism
(
self
)
->
None
:
...
...
@@ -746,6 +770,12 @@ class ModelConfig:
self
,
parallel_config
:
"ParallelConfig"
,
)
->
None
:
if
parallel_config
.
distributed_executor_backend
==
"external_launcher"
:
assert
self
.
seed
is
not
None
,
(
"Seed must be set when using external launcher backend to "
"make sure sampling results are the same across workers."
)
total_num_attention_heads
=
getattr
(
self
.
hf_text_config
,
"num_attention_heads"
,
0
)
tensor_parallel_size
=
parallel_config
.
tensor_parallel_size
...
...
@@ -797,10 +827,18 @@ class ModelConfig:
@
property
def
is_deepseek_mla
(
self
)
->
bool
:
return
(
hasattr
(
self
.
hf_text_config
,
"model_type"
))
\
and
(
self
.
hf_text_config
.
model_type
in
\
(
'deepseek_v2'
,
'deepseek_v3'
,
'deepseek_mtp'
))
\
and
(
self
.
hf_text_config
.
kv_lora_rank
is
not
None
)
if
not
hasattr
(
self
.
hf_text_config
,
"model_type"
):
return
False
elif
self
.
hf_text_config
.
model_type
in
\
(
'deepseek_v2'
,
'deepseek_v3'
,
'deepseek_mtp'
):
return
self
.
hf_text_config
.
kv_lora_rank
is
not
None
elif
self
.
hf_text_config
.
model_type
==
'eagle'
:
# if the model is an EAGLE module, check for the
# underlying architecture
return
self
.
hf_text_config
.
model
.
model_type
in
\
(
'deepseek_v2'
,
'deepseek_v3'
)
\
and
self
.
hf_text_config
.
kv_lora_rank
is
not
None
return
False
def
get_head_size
(
self
)
->
int
:
# TODO remove hard code
...
...
@@ -854,6 +892,14 @@ class ModelConfig:
return
getattr
(
self
.
hf_config
.
attn_config
,
"kv_n_heads"
,
self
.
hf_config
.
num_attention_heads
)
if
self
.
hf_config
.
model_type
==
"nemotron-nas"
:
for
block
in
self
.
hf_config
.
block_configs
:
if
not
block
.
attention
.
no_op
:
return
self
.
hf_config
.
num_attention_heads
\
//
block
.
attention
.
n_heads_in_group
raise
RuntimeError
(
"Couldn't determine number of kv heads"
)
if
self
.
is_attention_free
:
return
0
...
...
@@ -922,7 +968,9 @@ class ModelConfig:
# This function relies on 'layers_block_type' in hf_config,
# for w/o this attribute, we will need to have workarounds like so
attn_block_type
=
block_type
==
LayerBlockType
.
attention
is_transformer
=
not
self
.
is_hybrid
and
not
self
.
is_attention_free
is_transformer
=
not
self
.
is_hybrid
and
\
not
self
.
has_noops
and
\
not
self
.
is_attention_free
start
,
end
=
self
.
get_layers_start_end_indices
(
parallel_config
)
if
is_transformer
:
...
...
@@ -933,27 +981,39 @@ class ModelConfig:
# Note that this code assumes there
# is only one type of attention-free block type.
return
0
if
attn_block_type
else
end
-
start
elif
self
.
has_noops
:
block_configs
=
self
.
hf_config
.
block_configs
return
sum
(
not
bc
.
attention
.
no_op
for
bc
in
block_configs
[
start
:
end
])
else
:
# Hybrid model
# Hybrid model
Jamba
layers_block_type_value
=
getattr
(
self
.
hf_config
,
"layers_block_type"
,
None
)
if
layers_block_type_value
is
None
:
raise
ValueError
(
"The model is an hybrid without a "
"layers_block_type in the hf_config, "
"cannot determine the num of "
f
"
{
block_type
.
value
}
layers"
)
if
hasattr
(
self
.
hf_text_config
,
"model_type"
)
and
(
self
.
hf_text_config
.
model_type
==
"zamba2"
):
if
attn_block_type
:
return
sum
(
t
==
"hybrid"
for
t
in
layers_block_type_value
[
start
:
end
])
else
:
return
self
.
get_num_layers
(
parallel_config
)
if
layers_block_type_value
is
not
None
:
if
hasattr
(
self
.
hf_text_config
,
"model_type"
)
and
(
self
.
hf_text_config
.
model_type
==
"zamba2"
):
if
attn_block_type
:
return
sum
(
t
==
"hybrid"
for
t
in
layers_block_type_value
[
start
:
end
])
else
:
return
self
.
get_num_layers
(
parallel_config
)
return
sum
(
t
==
block_type
.
value
for
t
in
layers_block_type_value
[
start
:
end
])
return
sum
(
t
==
block_type
.
value
for
t
in
layers_block_type_value
[
start
:
end
])
# Hybrid model Minimax
attn_type_list
=
getattr
(
self
.
hf_config
,
"attn_type_list"
,
None
)
if
attn_type_list
:
return
sum
(
t
==
1
for
t
in
attn_type_list
[
start
:
end
])
if
layers_block_type_value
is
None
and
attn_type_list
is
None
:
raise
ValueError
(
"The model is an hybrid without a"
"layers_block_type or an attn_type_list in the hf_config,"
"cannot determine the num of "
f
"
{
block_type
.
value
}
layers"
)
return
sum
(
t
==
1
for
t
in
attn_type_list
[
start
:
end
])
def
get_multimodal_config
(
self
)
->
"MultiModalConfig"
:
"""
...
...
@@ -1079,8 +1139,7 @@ class CacheConfig:
is_attention_free: Whether the model is attention-free.
num_gpu_blocks_override: Number of GPU blocks to use. This overrides the
profiled num_gpu_blocks if specified. Does nothing if None.
sliding_window: Sliding window size for the KV cache. Can not work with
prefix caching enabled.
sliding_window: Sliding window size for the KV cache.
enable_prefix_caching: Whether to enable prefix caching.
cpu_offload_gb: Size of the CPU offload buffer in GiB.
"""
...
...
@@ -1100,7 +1159,8 @@ class CacheConfig:
factors
:
list
[
Any
]
=
[]
factors
.
append
(
self
.
cache_dtype
)
# `cpu_offload_gb` does not use `torch.compile` yet.
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
__init__
(
...
...
@@ -1113,6 +1173,7 @@ class CacheConfig:
num_gpu_blocks_override
:
Optional
[
int
]
=
None
,
sliding_window
:
Optional
[
int
]
=
None
,
enable_prefix_caching
:
bool
=
False
,
prefix_caching_hash_algo
:
str
=
"builtin"
,
cpu_offload_gb
:
float
=
0
,
calculate_kv_scales
:
Optional
[
bool
]
=
None
,
)
->
None
:
...
...
@@ -1124,6 +1185,7 @@ class CacheConfig:
self
.
is_attention_free
=
is_attention_free
self
.
sliding_window
=
sliding_window
self
.
enable_prefix_caching
=
enable_prefix_caching
self
.
prefix_caching_hash_algo
=
prefix_caching_hash_algo
self
.
cpu_offload_gb
=
cpu_offload_gb
self
.
calculate_kv_scales
=
calculate_kv_scales
self
.
_verify_args
()
...
...
@@ -1174,6 +1236,13 @@ class CacheConfig:
"Prefix caching is not supported with sliding window. "
"Run with --disable-sliding-window to use prefix caching."
)
if
self
.
enable_prefix_caching
and
self
.
prefix_caching_hash_algo
not
in
(
"builtin"
,
"sha256"
):
raise
ValueError
(
"Unknown prefix caching hash algorithm: "
f
"
{
self
.
prefix_caching_hash_algo
}
. Must be either "
"'builtin' or 'sha256'."
)
def
verify_with_parallel_config
(
self
,
parallel_config
:
"ParallelConfig"
,
...
...
@@ -1223,7 +1292,8 @@ class TokenizerPoolConfig:
# no factors to consider.
# this config will not affect the computation graph.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
__post_init__
(
self
):
...
...
@@ -1334,7 +1404,8 @@ class LoadConfig:
# no factors to consider.
# this config will not affect the computation graph.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
__post_init__
(
self
):
...
...
@@ -1362,6 +1433,8 @@ class ParallelConfig:
tensor_parallel_size
:
int
=
1
# Number of tensor parallel groups.
data_parallel_size
:
int
=
1
# Number of data parallel groups.
data_parallel_rank
:
int
=
0
# Rank of the data parallel group.
# Local rank of the data parallel group, defaults to global rank.
data_parallel_rank_local
:
Optional
[
int
]
=
None
# IP of the data parallel master.
data_parallel_master_ip
:
str
=
"127.0.0.1"
data_parallel_master_port
:
int
=
29500
# Port of the data parallel master.
...
...
@@ -1466,10 +1539,18 @@ class ParallelConfig:
self
.
world_size
=
self
.
pipeline_parallel_size
*
\
self
.
tensor_parallel_size
self
.
data_parallel_size
=
envs
.
VLLM_DP_SIZE
self
.
data_parallel_rank
=
envs
.
VLLM_DP_RANK
self
.
data_parallel_master_ip
=
envs
.
VLLM_DP_MASTER_IP
self
.
data_parallel_master_port
=
envs
.
VLLM_DP_MASTER_PORT
if
self
.
data_parallel_size
>
1
:
# Data parallel was specified in the engine args.
self
.
data_parallel_master_port
=
get_open_port
()
# TODO multi-node
else
:
# Otherwise fall back to env vars (e.g. for offline SPMD case).
self
.
data_parallel_size
=
envs
.
VLLM_DP_SIZE
self
.
data_parallel_rank
=
envs
.
VLLM_DP_RANK
self
.
data_parallel_rank_local
=
envs
.
VLLM_DP_RANK_LOCAL
self
.
data_parallel_master_ip
=
envs
.
VLLM_DP_MASTER_IP
self
.
data_parallel_master_port
=
envs
.
VLLM_DP_MASTER_PORT
self
.
world_size_across_dp
=
self
.
world_size
*
self
.
data_parallel_size
if
self
.
distributed_executor_backend
==
"external_launcher"
:
...
...
@@ -1547,11 +1628,12 @@ class ParallelConfig:
if
self
.
use_ray
:
from
vllm.executor
import
ray_utils
ray_utils
.
assert_ray_available
()
if
current_platform
.
is_rocm
():
if
not
current_platform
.
use_custom_allreduce
():
self
.
disable_custom_all_reduce
=
True
logger
.
info
(
"Disabled the custom all-reduce kernel because it is not "
"supported on
AMD GPUs
."
)
"supported on
current platform
."
)
if
self
.
ray_workers_use_nsight
and
not
self
.
use_ray
:
raise
ValueError
(
"Unable to use nsight profiling unless workers "
"run with Ray."
)
...
...
@@ -1654,7 +1736,8 @@ class SchedulerConfig:
# no factors to consider.
# this config will not affect the computation graph.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
__post_init__
(
self
)
->
None
:
...
...
@@ -1790,7 +1873,8 @@ class DeviceConfig:
# the device/platform information will be summarized
# by torch/vllm automatically.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
__init__
(
self
,
device
:
str
=
"auto"
)
->
None
:
...
...
@@ -1799,7 +1883,10 @@ class DeviceConfig:
from
vllm.platforms
import
current_platform
self
.
device_type
=
current_platform
.
device_type
if
not
self
.
device_type
:
raise
RuntimeError
(
"Failed to infer device type"
)
raise
RuntimeError
(
"Failed to infer device type, please set "
"the environment variable `VLLM_LOGGING_LEVEL=DEBUG` "
"to turn on verbose logging to help debug the issue."
)
else
:
# Device type is assigned explicitly
self
.
device_type
=
device
...
...
@@ -1963,7 +2050,8 @@ class SpeculativeConfig:
# no factors to consider.
# spec decode does not use `torch.compile` yet.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
@
classmethod
...
...
@@ -1985,14 +2073,13 @@ class SpeculativeConfig:
def
__post_init__
(
self
):
# Note: After next release, the method parameter will be used to
# specify the speculative method, which helps to extend the
# configuration of non-model-based proposers, and the model parameter
# will be used when the draft model or head is needed.
# If users do not specify the method, the speculative method will
# be detected automatically if possible. If the speculative method can
# not be detected, it will be considered as the draft-model-based
# method by default.
# Note: "method" is a new parameter that helps to extend the
# configuration of non-model-based proposers, and the "model" parameter
# will be used to set the draft model, eagle head, or additional weight
# when needed. If users do not specify "method", the speculative method
# will be detected automatically if possible. If the speculative method
# can not be detected, it will be considered as the "draft_model" by
# default.
if
self
.
model
is
None
and
self
.
num_speculative_tokens
is
not
None
:
# TODO(Shangming): Refactor mtp configuration logic when supporting
...
...
@@ -2007,8 +2094,8 @@ class SpeculativeConfig:
raise
ValueError
(
"num_speculative_tokens was provided without "
"speculative model."
)
# Automatically configure the
ngram
method
during configuration
#
refactoring to ensure a smooth transition.
# Automatically configure the method
for ngram when "model" is used
#
instead of "method"
if
self
.
method
is
None
and
(
self
.
model
is
not
None
and
self
.
model
in
(
"ngram"
,
"[ngram]"
)):
self
.
method
=
"ngram"
...
...
@@ -2090,9 +2177,10 @@ class SpeculativeConfig:
# Replace hf_config for EAGLE draft_model
if
self
.
method
==
"eagle"
:
if
self
.
enable_chunked_prefill
:
if
self
.
enable_chunked_prefill
and
not
envs
.
VLLM_USE_V1
:
raise
ValueError
(
"Chunked prefill and EAGLE are not compatible."
)
"Chunked prefill and EAGLE are not compatible "
"when using V0."
)
from
vllm.transformers_utils.configs.eagle
import
(
EAGLEConfig
)
...
...
@@ -2297,12 +2385,10 @@ class SpeculativeConfig:
return
self
.
num_speculative_tokens
def
__repr__
(
self
)
->
str
:
if
self
.
prompt_lookup_max
is
not
None
and
self
.
prompt_lookup_max
>
0
:
draft_model
=
"ngram"
else
:
draft_model
=
self
.
draft_model_config
.
model
method
=
self
.
method
model
=
None
if
method
==
"ngram"
else
self
.
draft_model_config
.
model
num_spec_tokens
=
self
.
num_speculative_tokens
return
f
"SpeculativeConfig(
{
draft_
model
=
}
,
{
num_spec_tokens
=
}
)"
return
f
"SpeculativeConfig(
{
method
=
}
,
{
model
=
}
,
{
num_spec_tokens
=
}
)"
@
dataclass
...
...
@@ -2338,7 +2424,8 @@ class LoRAConfig:
factors
.
append
(
self
.
lora_extra_vocab_size
)
factors
.
append
(
self
.
long_lora_scaling_factors
)
factors
.
append
(
self
.
bias_enabled
)
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
__post_init__
(
self
):
...
...
@@ -2364,9 +2451,9 @@ class LoRAConfig:
f
"max_loras (
{
self
.
max_loras
}
)"
)
def
verify_with_cache_config
(
self
,
cache_config
:
CacheConfig
):
# TODO LoRA supports CPU offload.
if
cache_config
.
cpu_offload_gb
>
0
:
raise
ValueError
(
"CPU offload is not supported with LoRA yet
."
)
if
cache_config
.
cpu_offload_gb
>
0
and
not
envs
.
VLLM_USE_V1
:
raise
ValueError
(
"V0 LoRA does not support CPU offload, please use V1
."
)
def
verify_with_model_config
(
self
,
model_config
:
ModelConfig
):
if
self
.
lora_dtype
in
(
None
,
"auto"
):
...
...
@@ -2404,7 +2491,8 @@ class PromptAdapterConfig:
# no factors to consider.
# this config will not affect the computation graph.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
__post_init__
(
self
):
...
...
@@ -2449,7 +2537,8 @@ class MultiModalConfig:
# no factors to consider.
# this config will not affect the computation graph.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
get_limit_per_prompt
(
self
,
modality
:
str
)
->
int
:
...
...
@@ -2515,7 +2604,8 @@ class PoolerConfig:
# no factors to consider.
# this config will not affect the computation graph.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
@
staticmethod
...
...
@@ -2652,6 +2742,10 @@ def _get_and_verify_max_len(
max_len_key
=
key
if
max_len
<
derived_max_model_len
\
else
max_len_key
derived_max_model_len
=
min
(
derived_max_model_len
,
max_len
)
# For Command-R / Cohere, Cohere2 / Aya Vision models
if
tmp_max_len
:
=
getattr
(
hf_config
,
"model_max_length"
,
None
):
max_len_key
=
"model_max_length"
derived_max_model_len
=
tmp_max_len
# If sliding window is manually disabled, max_length should be less
# than the sliding window length in the model config.
...
...
@@ -2796,7 +2890,8 @@ class DecodingConfig:
# no factors to consider.
# this config will not affect the computation graph.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
__post_init__
(
self
):
...
...
@@ -2846,7 +2941,8 @@ class ObservabilityConfig:
# no factors to consider.
# this config will not affect the computation graph.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
def
__post_init__
(
self
):
...
...
@@ -2908,7 +3004,8 @@ class KVTransferConfig(BaseModel):
# no factors to consider.
# this config will not affect the computation graph.
factors
:
list
[
Any
]
=
[]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()
return
hash_str
@
classmethod
...
...
@@ -2936,12 +3033,6 @@ class KVTransferConfig(BaseModel):
return
self
.
kv_connector
is
not
None
and
\
self
.
kv_role
in
[
"kv_producer"
,
"kv_consumer"
,
"kv_both"
]
@
property
def
need_kv_parallel_group
(
self
)
->
bool
:
# for those database-based connector, vLLM does not need to create
# parallel group, and in that case the kv parallel size will be 1.
return
self
.
kv_connector
is
not
None
and
self
.
kv_parallel_size
>
1
@
property
def
is_kv_producer
(
self
)
->
bool
:
return
self
.
kv_connector
is
not
None
and
\
...
...
@@ -3405,7 +3496,8 @@ class VllmConfig:
vllm_factors
.
append
(
"None"
)
factors
.
append
(
vllm_factors
)
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
()).
hexdigest
()[:
10
]
hash_str
=
hashlib
.
md5
(
str
(
factors
).
encode
(),
usedforsecurity
=
False
).
hexdigest
()[:
10
]
return
hash_str
def
pad_for_cudagraph
(
self
,
batch_size
:
int
)
->
int
:
...
...
@@ -3517,9 +3609,10 @@ class VllmConfig:
if
self
.
cache_config
is
not
None
and
\
self
.
cache_config
.
cpu_offload_gb
>
0
and
\
self
.
compilation_config
.
level
!=
CompilationLevel
.
NO_COMPILATION
:
self
.
compilation_config
.
level
!=
CompilationLevel
.
NO_COMPILATION
\
and
not
envs
.
VLLM_USE_V1
:
logger
.
warning
(
"CPU offload is not supported with `torch.compile` yet."
"CPU offload is not supported with `torch.compile`
in v0
yet."
" Disabling `torch.compile`."
)
self
.
compilation_config
.
level
=
CompilationLevel
.
NO_COMPILATION
...
...
vllm/device_allocator/cumem.py
View file @
675ba75f
...
...
@@ -8,6 +8,7 @@
# not sure why, they are created from a different context.
# the only successful approach is to call cuda driver API in C.
import
dataclasses
import
gc
import
os
from
contextlib
import
contextmanager
from
typing
import
Any
,
Callable
,
Dict
,
Optional
,
Tuple
,
Union
...
...
@@ -175,7 +176,7 @@ class CuMemAllocator:
str
]]
=
None
)
->
None
:
"""
Put the allocator in sleep mode.
All data in the memory allocation with the specified tag will be
All data in the memory allocation with the specified tag will be
offloaded to CPU memory, and others will be discarded.
:param offload_tags: The tags of the memory allocation that will be
...
...
@@ -204,28 +205,37 @@ class CuMemAllocator:
data
.
cpu_backup_tensor
=
cpu_backup_tensor
unmap_and_release
(
handle
)
def
wake_up
(
self
):
gc
.
collect
()
torch
.
cuda
.
empty_cache
()
def
wake_up
(
self
,
tags
:
Optional
[
list
[
str
]]
=
None
)
->
None
:
"""
Wake up the allocator from sleep mode.
All data that is previously offloaded will be loaded back to GPU
memory, and the rest of the data will have empty memory."""
memory, and the rest of the data will have empty memory.
:param tags: The tags of the memory allocation that will be loaded
back to GPU memory. If None, all memory allocation will be loaded
back to GPU memory.
"""
for
ptr
,
data
in
self
.
pointer_to_data
.
items
():
handle
=
data
.
handle
create_and_map
(
handle
)
if
data
.
cpu_backup_tensor
is
not
None
:
cpu_backup_tensor
=
data
.
cpu_backup_tensor
if
cpu_backup_tensor
is
not
None
:
size_in_bytes
=
cpu_backup_tensor
.
numel
(
)
*
cpu_backup_tensor
.
element_size
()
cpu_ptr
=
cpu_backup_tensor
.
data_ptr
()
libcudart
.
cudaMemcpy
(
ptr
,
cpu_ptr
,
size_in_bytes
)
data
.
cpu_backup_tensor
=
None
if
tags
is
None
or
data
.
tag
in
tags
:
handle
=
data
.
handle
create_and_map
(
handle
)
if
data
.
cpu_backup_tensor
is
not
None
:
cpu_backup_tensor
=
data
.
cpu_backup_tensor
if
cpu_backup_tensor
is
not
None
:
size_in_bytes
=
cpu_backup_tensor
.
numel
(
)
*
cpu_backup_tensor
.
element_size
()
cpu_ptr
=
cpu_backup_tensor
.
data_ptr
()
libcudart
.
cudaMemcpy
(
ptr
,
cpu_ptr
,
size_in_bytes
)
data
.
cpu_backup_tensor
=
None
@
contextmanager
def
use_memory_pool
(
self
,
tag
:
Optional
[
str
]
=
None
):
"""
A context manager to use the memory pool.
All memory allocation created inside the context will be allocated
All memory allocation created inside the context will be allocated
in the memory pool, and has the specified tag.
:param tag: The tag of the memory allocation. If None, the default tag
...
...
vllm/distributed/device_communicators/cpu_communicator.py
View file @
675ba75f
# SPDX-License-Identifier: Apache-2.0
from
typing
import
Optional
import
os
from
typing
import
List
,
Optional
import
torch
from
torch.distributed
import
ProcessGroup
from
vllm.platforms
import
current_platform
from
vllm.platforms.interface
import
CpuArchEnum
from
.base_device_communicator
import
DeviceCommunicatorBase
...
...
@@ -16,19 +20,120 @@ class CpuCommunicator(DeviceCommunicatorBase):
device_group
:
Optional
[
ProcessGroup
]
=
None
,
unique_name
:
str
=
""
):
super
().
__init__
(
cpu_group
,
device
,
device_group
,
unique_name
)
self
.
ipex_available
=
False
self
.
dist_module
=
torch
.
distributed
try
:
import
intel_extension_for_pytorch
as
ipex
self
.
ipex_available
=
True
self
.
dist_module
=
ipex
.
distributed
except
ImportError
:
"""
Intel IPEX not found. Falling back to PyTorch native
all_reduce for CPU (e.g. MacOS)
"""
pass
if
current_platform
.
get_cpu_architecture
()
==
CpuArchEnum
.
X86
:
self
.
dist_module
=
_CPUSHMDistributed
(
self
)
def
all_reduce
(
self
,
input_
):
self
.
dist_module
.
all_reduce
(
input_
,
group
=
self
.
device_group
)
return
input_
def
gather
(
self
,
input_
:
torch
.
Tensor
,
dst
:
int
=
0
,
dim
:
int
=
-
1
)
->
Optional
[
torch
.
Tensor
]:
"""
NOTE: We assume that the input tensor is on the same device across
all the ranks.
NOTE: `dst` is the local rank of the destination rank.
"""
world_size
=
self
.
world_size
assert
-
input_
.
dim
()
<=
dim
<
input_
.
dim
(),
(
f
"Invalid dim (
{
dim
}
) for input tensor with shape
{
input_
.
size
()
}
"
)
if
dim
<
0
:
# Convert negative dim to positive.
dim
+=
input_
.
dim
()
# Allocate output tensor.
if
self
.
rank_in_group
==
dst
:
gather_list
=
[
torch
.
empty_like
(
input_
)
for
_
in
range
(
world_size
)]
else
:
gather_list
=
None
# Gather.
self
.
dist_module
.
gather
(
input_
,
gather_list
,
dst
=
self
.
ranks
[
dst
],
group
=
self
.
device_group
)
if
self
.
rank_in_group
==
dst
:
output_tensor
=
torch
.
cat
(
gather_list
,
dim
=
dim
)
else
:
output_tensor
=
None
return
output_tensor
def
all_gather
(
self
,
input_
:
torch
.
Tensor
,
dim
:
int
=
-
1
)
->
torch
.
Tensor
:
if
dim
<
0
:
# Convert negative dim to positive.
dim
+=
input_
.
dim
()
input_size
=
input_
.
size
()
# NOTE: we have to use concat-style all-gather here,
# stack-style all-gather has compatibility issues with
# torch.compile . see https://github.com/pytorch/pytorch/issues/138795
output_size
=
(
input_size
[
0
]
*
self
.
world_size
,
)
+
input_size
[
1
:]
# Allocate output tensor.
output_tensor
=
torch
.
empty
(
output_size
,
dtype
=
input_
.
dtype
,
device
=
input_
.
device
)
# All-gather.
self
.
dist_module
.
all_gather_into_tensor
(
output_tensor
,
input_
,
group
=
self
.
device_group
)
# Reshape
output_tensor
=
output_tensor
.
reshape
((
self
.
world_size
,
)
+
input_size
)
output_tensor
=
output_tensor
.
movedim
(
0
,
dim
)
output_tensor
=
output_tensor
.
reshape
(
input_size
[:
dim
]
+
(
self
.
world_size
*
input_size
[
dim
],
)
+
input_size
[
dim
+
1
:])
return
output_tensor
class
_CPUSHMDistributed
:
def
__init__
(
self
,
communicator
:
CpuCommunicator
):
instance_identifier
=
os
.
environ
[
"VLLM_DIST_IDENT"
]
self
.
communicator
=
communicator
group_ranks
=
[
str
(
rank
)
for
rank
in
self
.
communicator
.
ranks
]
shm_group_identifier
=
f
"[
{
'-'
.
join
(
group_ranks
)
}
]"
self
.
group_name
=
f
"
{
instance_identifier
}
-
{
shm_group_identifier
}
-cpushm"
self
.
handle
=
self
.
_init_cpu_shm
()
def
_init_cpu_shm
(
self
)
->
int
:
handle
=
torch
.
ops
.
_C
.
init_shm_manager
(
self
.
group_name
,
self
.
communicator
.
world_size
,
self
.
communicator
.
rank
,
)
torch
.
distributed
.
barrier
(
self
.
communicator
.
device_group
)
torch
.
ops
.
_C
.
join_shm_manager
(
handle
,
self
.
group_name
,
)
torch
.
distributed
.
barrier
(
self
.
communicator
.
device_group
)
return
handle
def
all_reduce
(
self
,
input
:
torch
.
Tensor
,
group
:
Optional
[
ProcessGroup
]
=
None
)
->
None
:
torch
.
ops
.
_C
.
shm_allreduce
(
self
.
handle
,
input
)
def
gather
(
self
,
input
:
torch
.
Tensor
,
gather_list
:
Optional
[
List
[
torch
.
Tensor
]],
dst
:
int
=
-
1
,
group
:
Optional
[
ProcessGroup
]
=
None
)
->
None
:
# Note: different from the torch gather, here we use local dst rank.
torch
.
ops
.
_C
.
shm_gather
(
self
.
handle
,
input
,
gather_list
,
torch
.
distributed
.
get_group_rank
(
group
,
dst
))
def
all_gather_into_tensor
(
self
,
output
:
torch
.
Tensor
,
input
:
torch
.
Tensor
,
group
:
Optional
[
ProcessGroup
]
=
None
)
->
None
:
torch
.
ops
.
_C
.
shm_all_gather
(
self
.
handle
,
input
,
output
)
vllm/distributed/device_communicators/custom_all_reduce.py
View file @
675ba75f
# SPDX-License-Identifier: Apache-2.0
import
ctypes
from
contextlib
import
contextmanager
from
typing
import
List
,
Optional
,
Union
...
...
@@ -10,7 +9,6 @@ from torch.distributed import ProcessGroup
import
vllm.envs
as
envs
from
vllm
import
_custom_ops
as
ops
from
vllm.distributed.device_communicators.cuda_wrapper
import
CudaRTLibrary
from
vllm.distributed.device_communicators.custom_all_reduce_utils
import
(
gpu_p2p_access_check
)
from
vllm.distributed.parallel_state
import
in_the_same_node_as
...
...
@@ -22,7 +20,7 @@ try:
ops
.
meta_size
()
custom_ar
=
True
except
Exception
:
# For
AMD GPUs and
CPUs
# For CPUs
custom_ar
=
False
logger
=
init_logger
(
__name__
)
...
...
@@ -71,7 +69,9 @@ class CustomAllreduce:
if
not
custom_ar
:
# disable because of missing custom allreduce library
# e.g. in a non-cuda environment
# e.g. in a non-GPU environment
logger
.
info
(
"Custom allreduce is disabled because "
"of missing custom allreduce library"
)
return
self
.
group
=
group
...
...
@@ -129,11 +129,10 @@ class CustomAllreduce:
# test nvlink first, this will filter out most of the cases
# where custom allreduce is not supported
# this checks hardware and driver support for NVLink
assert
current_platform
.
is_cuda
()
from
vllm.platforms.cuda
import
CudaPlatform
cuda_platform
:
CudaPlatform
=
current_platform
full_nvlink
=
cuda_platform
.
is_full_nvlink
(
physical_device_ids
)
if
world_size
>
2
and
not
full_nvlink
:
assert
current_platform
.
is_cuda_alike
()
fully_connected
=
current_platform
.
is_fully_connected
(
physical_device_ids
)
if
world_size
>
2
and
not
fully_connected
:
logger
.
warning
(
"Custom allreduce is disabled because it's not supported on"
" more than two PCIe-only GPUs. To silence this warning, "
...
...
@@ -142,7 +141,8 @@ class CustomAllreduce:
# test P2P capability, this checks software/cudaruntime support
# this is expensive to compute at the first time
# then we cache the result
if
not
_can_p2p
(
rank
,
world_size
):
# On AMD GPU, p2p is always enabled between XGMI connected GPUs
if
not
current_platform
.
is_rocm
()
and
not
_can_p2p
(
rank
,
world_size
):
logger
.
warning
(
"Custom allreduce is disabled because your platform lacks "
"GPU P2P capability or P2P test failed. To silence this "
...
...
@@ -154,7 +154,8 @@ class CustomAllreduce:
# Meta data composes of two parts: meta data for synchronization and a
# temporary buffer for storing intermediate allreduce results.
self
.
meta_ptrs
=
self
.
create_shared_buffer
(
ops
.
meta_size
()
+
max_size
,
group
=
group
)
group
=
group
,
uncached
=
True
)
# This is a pre-registered IPC buffer. In eager mode, input tensors
# are first copied into this buffer before allreduce is performed
self
.
buffer_ptrs
=
self
.
create_shared_buffer
(
max_size
,
group
=
group
)
...
...
@@ -169,46 +170,11 @@ class CustomAllreduce:
self
.
max_size
=
max_size
self
.
rank
=
rank
self
.
world_size
=
world_size
self
.
full
_nvlink
=
full_nvlink
self
.
full
y_connected
=
fully_connected
self
.
_ptr
=
ops
.
init_custom_ar
(
self
.
meta_ptrs
,
self
.
rank_data
,
rank
,
self
.
full
_nvlink
)
self
.
full
y_connected
)
ops
.
register_buffer
(
self
.
_ptr
,
self
.
buffer_ptrs
)
@
staticmethod
def
create_shared_buffer
(
size_in_bytes
:
int
,
group
:
Optional
[
ProcessGroup
]
=
None
)
->
List
[
int
]:
"""
Creates a shared buffer and returns a list of pointers
representing the buffer on all processes in the group.
"""
lib
=
CudaRTLibrary
()
pointer
=
lib
.
cudaMalloc
(
size_in_bytes
)
handle
=
lib
.
cudaIpcGetMemHandle
(
pointer
)
world_size
=
dist
.
get_world_size
(
group
=
group
)
rank
=
dist
.
get_rank
(
group
=
group
)
handles
=
[
None
]
*
world_size
dist
.
all_gather_object
(
handles
,
handle
,
group
=
group
)
pointers
:
List
[
int
]
=
[]
for
i
,
h
in
enumerate
(
handles
):
if
i
==
rank
:
pointers
.
append
(
pointer
.
value
)
# type: ignore
else
:
pointers
.
append
(
lib
.
cudaIpcOpenMemHandle
(
h
).
value
)
# type: ignore
return
pointers
@
staticmethod
def
free_shared_buffer
(
pointers
:
List
[
int
],
group
:
Optional
[
ProcessGroup
]
=
None
,
rank
:
Optional
[
int
]
=
None
)
->
None
:
if
rank
is
None
:
rank
=
dist
.
get_rank
(
group
=
group
)
lib
=
CudaRTLibrary
()
lib
.
cudaFree
(
ctypes
.
c_void_p
(
pointers
[
rank
]))
@
contextmanager
def
capture
(
self
):
"""
...
...
@@ -255,7 +221,7 @@ class CustomAllreduce:
return
False
# for 4 or more non NVLink-capable GPUs, custom allreduce provides
# little performance improvement over NCCL.
if
self
.
world_size
==
2
or
self
.
full
_nvlink
:
if
self
.
world_size
==
2
or
self
.
full
y_connected
:
return
inp_size
<
self
.
max_size
return
False
...
...
@@ -306,3 +272,30 @@ class CustomAllreduce:
def
__del__
(
self
):
self
.
close
()
@
staticmethod
def
create_shared_buffer
(
size_in_bytes
:
int
,
group
:
Optional
[
ProcessGroup
]
=
None
,
uncached
:
Optional
[
bool
]
=
False
)
->
List
[
int
]:
pointer
,
handle
=
ops
.
allocate_shared_buffer_and_handle
(
size_in_bytes
)
world_size
=
dist
.
get_world_size
(
group
=
group
)
rank
=
dist
.
get_rank
(
group
=
group
)
handles
=
[
None
]
*
world_size
dist
.
all_gather_object
(
handles
,
handle
,
group
=
group
)
pointers
:
List
[
int
]
=
[]
for
i
,
h
in
enumerate
(
handles
):
if
i
==
rank
:
pointers
.
append
(
pointer
)
# type: ignore
else
:
pointers
.
append
(
ops
.
open_mem_handle
(
h
))
return
pointers
@
staticmethod
def
free_shared_buffer
(
pointers
:
List
[
int
],
group
:
Optional
[
ProcessGroup
]
=
None
,
rank
:
Optional
[
int
]
=
0
)
->
None
:
if
rank
is
None
:
rank
=
dist
.
get_rank
(
group
=
group
)
ops
.
free_shared_buffer
(
pointers
[
rank
])
vllm/distributed/device_communicators/shm_broadcast.py
View file @
675ba75f
...
...
@@ -125,8 +125,13 @@ class ShmRingBuffer:
lambda
*
args
,
**
kwargs
:
None
):
try
:
self
.
shared_memory
=
shared_memory
.
SharedMemory
(
name
=
name
)
assert
(
self
.
shared_memory
.
size
==
self
.
total_bytes_of_buffer
)
# See https://docs.python.org/3/library/multiprocessing.shared_memory.html # noqa
# Some platforms allocate memory based on page size,
# so the shared memory block size may be larger or equal
# to the requested size. The size parameter is ignored
# when attaching to an existing block.
assert
(
self
.
shared_memory
.
size
>=
self
.
total_bytes_of_buffer
)
except
FileNotFoundError
:
# we might deserialize the object in a different node
# in this case, this object is not used,
...
...
vllm/distributed/device_communicators/tpu_communicator.py
View file @
675ba75f
...
...
@@ -22,6 +22,8 @@ if current_platform.is_tpu():
import
torch_xla.core.xla_model
as
xm
import
torch_xla.runtime
as
xr
from
torch_xla._internal
import
pjrt
from
torch_xla.distributed.xla_multiprocessing
import
(
create_optimized_replica_groups
)
if
USE_RAY
:
from
vllm.executor
import
ray_utils
...
...
@@ -79,9 +81,12 @@ class TpuCommunicator(DeviceCommunicatorBase):
pjrt
.
initialize_multiprocess
(
local_rank
,
local_world_size
)
xr
.
_init_world_size_ordinal
()
self
.
groups
=
create_optimized_replica_groups
()
def
all_reduce
(
self
,
input_
:
torch
.
Tensor
)
->
torch
.
Tensor
:
return
xm
.
all_reduce
(
xm
.
REDUCE_SUM
,
input_
)
# TODO: Remove the groups specification after XLA compiler can support
# auto-reordering the ring order for all-reduce.
return
xm
.
all_reduce
(
xm
.
REDUCE_SUM
,
input_
,
groups
=
self
.
groups
)
def
all_gather
(
self
,
input_
:
torch
.
Tensor
,
dim
:
int
=
-
1
)
->
torch
.
Tensor
:
assert
dim
==
-
1
,
"TPUs only support dim=-1 for all-gather."
...
...
vllm/distributed/kv_transfer/kv_connector/factory.py
View file @
675ba75f
...
...
@@ -53,3 +53,8 @@ KVConnectorFactory.register_connector(
"LMCacheConnector"
,
"vllm.distributed.kv_transfer.kv_connector.lmcache_connector"
,
"LMCacheConnector"
)
KVConnectorFactory
.
register_connector
(
"MooncakeStoreConnector"
,
"vllm.distributed.kv_transfer.kv_connector.mooncake_store_connector"
,
"MooncakeStoreConnector"
)
\ No newline at end of file
vllm/distributed/kv_transfer/kv_connector/mooncake_store_connector.py
0 → 100644
View file @
675ba75f
# SPDX-License-Identifier: Apache-2.0
"""
MooncakeStore Connector for Distributed Machine Learning Inference
The MooncakeStoreConnector transfers KV caches between prefill vLLM workers
(KV cache producer) and decode vLLM workers (KV cache consumer) using a
database-style KVStore.
"""
import
hashlib
from
typing
import
TYPE_CHECKING
,
List
,
Tuple
,
Union
import
torch
from
vllm
import
_custom_ops
as
ops
from
vllm.config
import
VllmConfig
from
vllm.distributed.kv_transfer.kv_connector.base
import
KVConnectorBase
from
vllm.logger
import
init_logger
from
vllm.sequence
import
IntermediateTensors
if
TYPE_CHECKING
:
from
vllm.worker.model_runner
import
ModelInputForGPUWithSamplingMetadata
logger
=
init_logger
(
__name__
)
class
MooncakeStoreConnector
(
KVConnectorBase
):
def
__init__
(
self
,
rank
:
int
,
local_rank
:
int
,
config
:
VllmConfig
,
):
self
.
config
=
config
.
kv_transfer_config
self
.
tp_size
=
config
.
parallel_config
.
tensor_parallel_size
self
.
local_tp_rank
=
local_rank
# Init kv_store
if
self
.
config
.
kv_connector
==
"MooncakeStoreConnector"
:
# Check if MOONCAKE_CONFIG_PATH is set
import
os
use_mooncake_store
=
os
.
getenv
(
'MOONCAKE_CONFIG_PATH'
)
is
not
None
if
not
use_mooncake_store
:
raise
ValueError
(
"To use MooncakeStoreConnector, you need to pass the ENV: "
"'MOONCAKE_CONFIG_PATH=/path/to/mooncake_config.json'."
)
else
:
from
vllm.distributed.kv_transfer.kv_lookup_buffer.mooncake_store
import
(
# noqa: E501
MooncakeStore
)
logger
.
info
(
"Initializing KVStoreConnector under kv_transfer_config %s"
,
self
.
config
)
self
.
kv_store
=
MooncakeStore
(
config
)
else
:
logger
.
error
(
"Can not find %s"
,
self
.
config
.
kv_connector
)
assert
self
.
kv_store
is
not
None
def
close
(
self
)
->
None
:
"""Close the buffer and release resources.
This method is responsible for cleaning up resources related to the
connector when it is no longer needed.
Raises:
NotImplementedError: This method must be implemented in subclasses.
"""
self
.
kv_store
.
close
()
def
send_kv_caches_and_hidden_states
(
self
,
model_executable
:
torch
.
nn
.
Module
,
model_input
:
"ModelInputForGPUWithSamplingMetadata"
,
kv_caches
:
List
[
torch
.
Tensor
],
hidden_or_intermediate_states
:
Union
[
torch
.
Tensor
,
IntermediateTensors
],
)
->
None
:
input_tokens_tensor
=
model_input
.
input_tokens
seq_lens
=
model_input
.
attn_metadata
.
seq_lens
slot_mapping_flat
=
model_input
.
attn_metadata
.
slot_mapping
.
flatten
()
start_layer
=
model_executable
.
model
.
start_layer
end_layer
=
model_executable
.
model
.
end_layer
model_config
=
model_executable
.
model
.
config
num_heads
=
int
(
model_config
.
num_key_value_heads
/
self
.
tp_size
)
hidden_size
=
model_config
.
hidden_size
num_attention_heads
=
model_config
.
num_attention_heads
head_size
=
int
(
hidden_size
/
num_attention_heads
)
for
idx
,
slen
in
enumerate
(
seq_lens
):
start_pos
=
sum
(
seq_lens
[:
idx
])
end_pos
=
start_pos
+
slen
current_tokens
=
input_tokens_tensor
[
start_pos
:
end_pos
]
store_key_prefix
=
self
.
tensor_hash
(
current_tokens
)
keys
,
values
=
[],
[]
for
layer_id
in
range
(
start_layer
,
end_layer
):
kv_cache
=
kv_caches
[
layer_id
-
start_layer
]
key_cache
=
kv_cache
[
0
].
reshape
(
-
1
,
num_heads
,
head_size
)
value_cache
=
kv_cache
[
1
].
reshape
(
-
1
,
num_heads
,
head_size
)
current_slot_mapping
=
slot_mapping_flat
[
start_pos
:
end_pos
]
keys
.
append
(
key_cache
[
current_slot_mapping
].
unsqueeze
(
0
))
values
.
append
(
value_cache
[
current_slot_mapping
].
unsqueeze
(
0
))
keys
=
torch
.
cat
(
keys
,
dim
=
0
)
values
=
torch
.
cat
(
values
,
dim
=
0
)
kvcache_to_sent
=
torch
.
stack
((
keys
,
values
),
dim
=
0
)
store_kvcache_key
=
f
"
{
store_key_prefix
}
_
{
self
.
local_tp_rank
}
"
self
.
kv_store
.
put
(
store_kvcache_key
,
kvcache_to_sent
)
hidden_key
=
f
"
{
store_key_prefix
}
_hidden_
{
self
.
local_tp_rank
}
"
self
.
kv_store
.
put
(
hidden_key
,
hidden_or_intermediate_states
[
start_pos
:
end_pos
])
logger
.
debug
(
"[rank%d]: KV send DONE."
,
torch
.
distributed
.
get_rank
())
def
recv_kv_caches_and_hidden_states
(
self
,
model_executable
:
torch
.
nn
.
Module
,
model_input
:
"ModelInputForGPUWithSamplingMetadata"
,
kv_caches
:
List
[
torch
.
Tensor
]
)
->
Tuple
[
Union
[
torch
.
Tensor
,
IntermediateTensors
],
bool
,
"ModelInputForGPUWithSamplingMetadata"
]:
bypass_model_exec
=
True
input_tokens_tensor
=
model_input
.
input_tokens
seq_lens
=
model_input
.
attn_metadata
.
seq_lens
num_prefill_tokens
=
model_input
.
attn_metadata
.
num_prefill_tokens
slot_mapping
=
model_input
.
attn_metadata
.
slot_mapping
.
flatten
()
start_layer
=
model_executable
.
model
.
start_layer
end_layer
=
model_executable
.
model
.
end_layer
hidden_or_intermediate_states_for_one_req
=
[]
for
idx
,
slen
in
enumerate
(
seq_lens
):
start_pos
=
sum
(
seq_lens
[:
idx
])
end_pos
=
start_pos
+
slen
if
start_pos
>=
num_prefill_tokens
:
# This can happen during inflight batching. See:
# vllm/worker/model_runner.py::_prepare_model_input_tensors:
# - input_tokens[:num_prefill_tokens] contains prefill tokens.
# - input_tokens[num_prefill_tokens:] contains decode tokens.
logger
.
warning
(
"You should set --enable_chunked_prefill=False "
"and --max_num_batched_tokens "
"should be equal to max_seq_len_to_capture"
)
bypass_model_exec
=
False
assert
start_pos
==
num_prefill_tokens
break
current_tokens
=
input_tokens_tensor
[
start_pos
:
end_pos
]
# get roi for current seq
load_key_prefix
=
self
.
tensor_hash
(
current_tokens
)
load_kvcache_key
=
f
"
{
load_key_prefix
}
_
{
self
.
local_tp_rank
}
"
remote_kv
=
self
.
kv_store
.
get
(
load_kvcache_key
)
hidden_key
=
f
"
{
load_key_prefix
}
_hidden_
{
self
.
local_tp_rank
}
"
hidden
=
self
.
kv_store
.
get
(
hidden_key
)
if
remote_kv
is
None
or
hidden
is
None
:
# didn't find any match.
bypass_model_exec
=
False
continue
num_computed_tokens
=
current_tokens
.
shape
[
0
]
# update the end position based on how many tokens are cached.
end_pos
=
start_pos
+
num_computed_tokens
# call self.kv_store to get kv layer by layer
for
layer_id
in
range
(
start_layer
,
end_layer
):
layer
=
model_executable
.
model
.
layers
[
layer_id
]
# get kvcache object
kv_cache
=
kv_caches
[
layer_id
-
start_layer
]
key_cache
,
value_cache
=
kv_cache
[
0
],
kv_cache
[
1
]
# get remote kvcache
remote_k
,
remote_v
=
remote_kv
[
0
][
layer_id
],
remote_kv
[
1
][
layer_id
]
# use ops.reshape_and_cache_flash to put kv into kvcache
ops
.
reshape_and_cache_flash
(
remote_k
.
to
(
key_cache
.
device
),
remote_v
.
to
(
value_cache
.
device
),
key_cache
,
value_cache
,
slot_mapping
[
start_pos
:
end_pos
],
layer
.
self_attn
.
attn
.
kv_cache_dtype
,
layer
.
self_attn
.
attn
.
_k_scale
,
layer
.
self_attn
.
attn
.
_v_scale
,
)
hidden_or_intermediate_states_for_one_req
.
append
(
hidden
)
if
not
bypass_model_exec
:
logger
.
warning
(
"[rank%d]: Failed to receive all KVs and hidden "
"states, redo model forwarding."
,
torch
.
distributed
.
get_rank
())
hidden_or_intermediate_states
=
None
else
:
logger
.
debug
(
"[rank%d]: Successfully received all KVs and hidden "
"states, skip model forwarding."
,
torch
.
distributed
.
get_rank
())
hidden_or_intermediate_states
=
torch
.
cat
(
hidden_or_intermediate_states_for_one_req
,
dim
=
0
)
return
hidden_or_intermediate_states
,
bypass_model_exec
,
model_input
@
staticmethod
def
tensor_hash
(
tensor
:
torch
.
Tensor
)
->
int
:
"""Calculate the hash value of the tensor."""
tensor_bytes
=
tensor
.
clone
().
detach
().
cpu
().
numpy
().
tobytes
()
hash_object
=
hashlib
.
blake2b
(
tensor_bytes
)
hash_hex
=
hash_object
.
hexdigest
()
return
int
(
hash_hex
[:
16
],
16
)
vllm/distributed/kv_transfer/kv_lookup_buffer/base.py
View file @
675ba75f
# SPDX-License-Identifier: Apache-2.0
"""
This file contains a new class `KVLookupBufferBase` that allows developers to
think of KV cache operations as inserting new KV cache entries (`insert`)
into the lookup buffer and querying existing KV caches (`drop_select`)
This file contains a new class `KVLookupBufferBase` that allows developers to
think of KV cache operations as inserting new KV cache entries (`insert`)
into the lookup buffer and querying existing KV caches (`drop_select`)
from the lookup buffer.
All distributed communications are abstracted behind this class.
This file also contains a new class `KVStoreBufferBase` that allows developers
to manage the KVCache buffer as a simple key-value storage buffer with basic
put/get operations.
These classes above are abstracted behind class `KVCacheBufferBase`.
"""
from
abc
import
ABC
,
abstractmethod
...
...
@@ -14,9 +18,27 @@ from typing import List, Optional
import
torch
class
KVLookupBufferBase
(
ABC
):
class
KVCacheBufferBase
(
ABC
):
"""
Abstract base class for a KVCache buffer.
"""
Abstract base class for a lookup buffer.
@
abstractmethod
def
close
(
self
)
->
None
:
"""Close the buffer and release resources.
This method is responsible for cleaning up resources related to the
KVCache buffer when it is no longer needed.
Raises:
NotImplementedError: This method must be implemented in subclasses.
"""
raise
NotImplementedError
class
KVLookupBufferBase
(
KVCacheBufferBase
):
"""
Abstract base class for a KVCache lookup buffer.
This class provides an abstraction for a key-value (KV) cache lookup buffer.
...
...
@@ -96,12 +118,55 @@ class KVLookupBufferBase(ABC):
"""
raise
NotImplementedError
class
KVStoreBufferBase
(
KVCacheBufferBase
):
"""
Abstract base class for a KVCache storage buffer with key-value semantics.
This class provides a simple key-value storage buffer abstract with basic
put/get operations, which enables flexible KVCache transfer granular
control.
The functionality is similar to a distributed key-value store, where:
- Key: A unique string identifier for the cached entry
- Value:
- Tensor to be stored and retrieved
- None (indicating deletion or empty value)
"""
@
abstractmethod
def
put
(
self
,
key
:
str
,
value
:
Optional
[
torch
.
Tensor
],
)
->
None
:
"""Store a key-value pair in the buffer.
Args:
key (str): Unique identifier for a tensor, this tensor could be the
key cache tensor, value cache tensor, or hidden state tensor
generated during model forwarding.
value (Optional[torch.Tensor]): Tensor to be stored.
Raises:
NotImplementedError: This method must be implemented in subclasses.
"""
raise
NotImplementedError
@
abstractmethod
def
close
(
self
)
->
None
:
"""Close the buffer and release resources.
def
get
(
self
,
key
:
str
,
)
->
Optional
[
torch
.
Tensor
]:
"""Retrieve a value from the buffer by key.
Args:
key (str): Unique identifier for a tensor, this tensor could be the
key cache tensor, value cache tensor, or hidden state tensor
generated during model forwarding.
This method is responsible for cleaning up resources related to the
lookup buffer when it is no longer needed
.
Returns:
Optional[torch.Tensor]: Stored tensor if exists, None otherwise
.
Raises:
NotImplementedError: This method must be implemented in subclasses.
...
...
vllm/distributed/kv_transfer/kv_lookup_buffer/mooncake_store.py
0 → 100644
View file @
675ba75f
# SPDX-License-Identifier: Apache-2.0
"""
This file contains a new class `MooncakeStore` that allows developers to
think of KV cache transfer operations as putting new KV cache entries
into a remote KVStore-based lookup buffer and getting existing KV caches
from this remote lookup buffer.
"""
import
json
import
os
from
dataclasses
import
dataclass
from
typing
import
Optional
import
torch
from
safetensors.torch
import
load
as
safetensors_load
from
safetensors.torch
import
save
as
safetensors_save
from
vllm.config
import
VllmConfig
from
vllm.distributed.kv_transfer.kv_lookup_buffer.base
import
(
KVStoreBufferBase
)
from
vllm.logger
import
init_logger
DEFAULT_GLOBAL_SEGMENT_SIZE
=
3355443200
# 3.125 GiB
DEFAULT_LOCAL_BUFFER_SIZE
=
1073741824
# 1.0 GiB
logger
=
init_logger
(
__name__
)
@
dataclass
class
MooncakeStoreConfig
:
local_hostname
:
str
metadata_server
:
str
global_segment_size
:
int
local_buffer_size
:
int
protocol
:
str
device_name
:
str
master_server_address
:
str
@
staticmethod
def
from_file
(
file_path
:
str
)
->
'MooncakeStoreConfig'
:
"""Load the config from a JSON file."""
with
open
(
file_path
)
as
fin
:
config
=
json
.
load
(
fin
)
return
MooncakeStoreConfig
(
local_hostname
=
config
.
get
(
"local_hostname"
),
metadata_server
=
config
.
get
(
"metadata_server"
),
global_segment_size
=
config
.
get
(
"global_segment_size"
,
DEFAULT_GLOBAL_SEGMENT_SIZE
),
local_buffer_size
=
config
.
get
(
"local_buffer_size"
,
DEFAULT_LOCAL_BUFFER_SIZE
),
protocol
=
config
.
get
(
"protocol"
,
"tcp"
),
device_name
=
config
.
get
(
"device_name"
,
""
),
master_server_address
=
config
.
get
(
"master_server_address"
),
)
@
staticmethod
def
load_from_env
()
->
'MooncakeStoreConfig'
:
"""Load config from a file specified in the environment variable."""
config_file_path
=
os
.
getenv
(
'MOONCAKE_CONFIG_PATH'
)
if
config_file_path
is
None
:
raise
ValueError
(
"The environment variable 'MOONCAKE_CONFIG_PATH' is not set."
)
return
MooncakeStoreConfig
.
from_file
(
config_file_path
)
class
MooncakeStore
(
KVStoreBufferBase
):
def
__init__
(
self
,
config
:
VllmConfig
,
):
try
:
from
mooncake_vllm_adaptor
import
MooncakeDistributedStore
except
ImportError
as
e
:
raise
ImportError
(
"Please install mooncake by following the instructions at "
"https://github.com/kvcache-ai/Mooncake/blob/main/doc/en/build.md "
# noqa: E501
"to run vLLM with MooncakeConnector."
)
from
e
try
:
self
.
store
=
MooncakeDistributedStore
()
self
.
config
=
MooncakeStoreConfig
.
load_from_env
()
logger
.
info
(
"Mooncake Configuration loaded successfully."
)
self
.
store
.
setup
(
self
.
config
.
local_hostname
,
self
.
config
.
metadata_server
,
self
.
config
.
global_segment_size
,
self
.
config
.
local_buffer_size
,
self
.
config
.
protocol
,
self
.
config
.
device_name
,
self
.
config
.
master_server_address
)
except
ValueError
as
e
:
logger
.
error
(
"Configuration loading failed: %s"
,
e
)
raise
except
Exception
as
exc
:
logger
.
error
(
"An error occurred while loading the configuration: %s"
,
exc
)
raise
def
close
(
self
):
# MooncakeDistributedStore will automatically call the destructor, so
# it is unnecessary to close it manually.
pass
def
put
(
self
,
key
:
str
,
value
:
Optional
[
torch
.
Tensor
],
)
->
None
:
# A message queue needs to be introduced before making it asynchronous.
if
value
is
not
None
:
self
.
_put_impl
(
key
,
value
)
def
get
(
self
,
key
:
str
,
)
->
Optional
[
torch
.
Tensor
]:
# A message queue needs to be introduced before making it asynchronous.
value
=
self
.
_get_impl
(
key
)
return
value
def
_put_impl
(
self
,
key
:
str
,
value
:
torch
.
Tensor
,
)
->
None
:
"""Put KVCache to Mooncake Store"""
device_id
=
value
.
device
.
index
if
value
.
device
.
type
==
'cuda'
else
-
1
device_tensor
=
torch
.
tensor
(
device_id
,
dtype
=
torch
.
int32
)
value_bytes
=
safetensors_save
({
"tensor"
:
value
,
"device_id"
:
device_tensor
})
try
:
self
.
store
.
put
(
key
,
value_bytes
)
except
TypeError
as
err
:
logger
.
error
(
"Failed to put value into Mooncake Store: %s"
,
err
)
raise
TypeError
(
"Mooncake Store Put Type Error."
)
from
err
def
_get_impl
(
self
,
key
:
str
,
)
->
Optional
[
torch
.
Tensor
]:
"""Get KVCache from Mooncake Store"""
try
:
data
=
self
.
store
.
get
(
key
)
except
TypeError
as
err
:
logger
.
error
(
"Failed to get value from Mooncake Store: %s"
,
err
)
raise
TypeError
(
"Mooncake Store Get Type Error."
)
from
err
if
data
:
loaded_tensors
=
safetensors_load
(
data
)
tensor
=
loaded_tensors
[
"tensor"
]
device_id_tensor
=
loaded_tensors
[
"device_id"
]
device_id
=
int
(
device_id_tensor
.
item
())
device
=
torch
.
device
(
'cuda'
,
device_id
)
if
device_id
>=
0
else
torch
.
device
(
'cpu'
)
return
tensor
.
to
(
device
)
return
None
vllm/distributed/parallel_state.py
View file @
675ba75f
...
...
@@ -119,11 +119,13 @@ def all_reduce_fake(tensor: torch.Tensor, group_name: str) -> torch.Tensor:
if
supports_custom_op
():
from
vllm.platforms
import
current_platform
direct_register_custom_op
(
op_name
=
"all_reduce"
,
op_func
=
all_reduce
,
mutates_args
=
[],
fake_impl
=
all_reduce_fake
,
dispatch_key
=
current_platform
.
dispatch_key
,
)
...
...
@@ -219,7 +221,8 @@ class GroupCoordinator:
self
.
cpu_group
,
1
<<
22
,
6
)
from
vllm.platforms
import
current_platform
self
.
use_custom_op_call
=
current_platform
.
is_cuda_alike
()
self
.
use_custom_op_call
=
(
current_platform
.
is_cuda_alike
()
or
current_platform
.
is_tpu
())
@
property
def
first_rank
(
self
):
...
...
vllm/distributed/utils.py
View file @
675ba75f
...
...
@@ -15,6 +15,7 @@ import torch
from
torch.distributed
import
ProcessGroup
,
TCPStore
from
torch.distributed.distributed_c10d
import
(
Backend
,
PrefixStore
,
_get_default_timeout
,
_unregister_process_group
,
is_nccl_available
)
from
torch.distributed.rendezvous
import
rendezvous
...
...
@@ -206,10 +207,7 @@ class StatelessProcessGroup:
def
barrier
(
self
):
"""A barrier to synchronize all ranks."""
for
i
in
range
(
self
.
world_size
):
if
i
==
self
.
rank
:
self
.
broadcast_obj
(
None
,
src
=
self
.
rank
)
else
:
self
.
broadcast_obj
(
None
,
src
=
i
)
self
.
broadcast_obj
(
None
,
src
=
i
)
@
staticmethod
def
create
(
...
...
@@ -333,3 +331,15 @@ def stateless_init_torch_distributed_process_group(
pg
.
_register_backend
(
device
,
backend_type
,
backend_class
)
return
pg
def
stateless_destroy_torch_distributed_process_group
(
pg
:
ProcessGroup
)
->
None
:
"""
Destroy ProcessGroup returned by
stateless_init_torch_distributed_process_group().
"""
# Lazy import for non-CUDA backends.
from
torch.distributed.distributed_c10d
import
_shutdown_backend
_shutdown_backend
(
pg
)
_unregister_process_group
(
pg
.
group_name
)
vllm/engine/arg_utils.py
View file @
675ba75f
...
...
@@ -23,6 +23,7 @@ from vllm.executor.executor_base import ExecutorBase
from
vllm.logger
import
init_logger
from
vllm.model_executor.layers.quantization
import
QUANTIZATION_METHODS
from
vllm.plugins
import
load_general_plugins
from
vllm.reasoning
import
ReasoningParserManager
from
vllm.test_utils
import
MODEL_WEIGHTS_S3_BUCKET
,
MODELS_ON_S3
from
vllm.transformers_utils.utils
import
check_gguf_file
from
vllm.usage.usage_lib
import
UsageContext
...
...
@@ -114,10 +115,12 @@ class EngineArgs:
# number of P/D disaggregation (or other disaggregation) workers
pipeline_parallel_size
:
int
=
1
tensor_parallel_size
:
int
=
1
data_parallel_size
:
int
=
1
enable_expert_parallel
:
bool
=
False
max_parallel_loading_workers
:
Optional
[
int
]
=
None
block_size
:
Optional
[
int
]
=
None
enable_prefix_caching
:
Optional
[
bool
]
=
None
prefix_caching_hash_algo
:
str
=
"builtin"
disable_sliding_window
:
bool
=
False
disable_cascade_attn
:
bool
=
False
use_v2_block_manager
:
bool
=
True
...
...
@@ -178,22 +181,7 @@ class EngineArgs:
guided_decoding_backend
:
str
=
'xgrammar'
logits_processor_pattern
:
Optional
[
str
]
=
None
speculative_config
:
Optional
[
Union
[
str
,
Dict
[
str
,
Any
]]]
=
None
# TODO(Shangming): Deprecate these out-of-date params after next release
speculative_model
:
Optional
[
str
]
=
None
speculative_model_quantization
:
Optional
[
str
]
=
None
speculative_draft_tensor_parallel_size
:
Optional
[
int
]
=
None
num_speculative_tokens
:
Optional
[
int
]
=
None
speculative_disable_mqa_scorer
:
Optional
[
bool
]
=
False
speculative_max_model_len
:
Optional
[
int
]
=
None
speculative_disable_by_batch_size
:
Optional
[
int
]
=
None
ngram_prompt_lookup_max
:
Optional
[
int
]
=
None
ngram_prompt_lookup_min
:
Optional
[
int
]
=
None
spec_decoding_acceptance_method
:
str
=
'rejection_sampler'
typical_acceptance_sampler_posterior_threshold
:
Optional
[
float
]
=
None
typical_acceptance_sampler_posterior_alpha
:
Optional
[
float
]
=
None
disable_logprobs_during_spec_decoding
:
Optional
[
bool
]
=
None
speculative_config
:
Optional
[
Dict
[
str
,
Any
]]
=
None
qlora_adapter_name_or_path
:
Optional
[
str
]
=
None
show_hidden_metrics_for_version
:
Optional
[
str
]
=
None
...
...
@@ -319,9 +307,7 @@ class EngineArgs:
parser
.
add_argument
(
'--download-dir'
,
type
=
nullable_str
,
default
=
EngineArgs
.
download_dir
,
help
=
'Directory to download and load the weights, '
'default to the default cache dir of '
'huggingface.'
)
help
=
'Directory to download and load the weights.'
)
parser
.
add_argument
(
'--load-format'
,
type
=
str
,
...
...
@@ -396,8 +382,7 @@ class EngineArgs:
'Valid backend values are "xgrammar", "guidance", and "auto". '
'With "auto", we will make opinionated choices based on request'
'contents and what the backend libraries currently support, so '
'the behavior is subject to change in each release. '
'The default is xgrammar.'
)
'the behavior is subject to change in each release.'
)
parser
.
add_argument
(
'--logits-processor-pattern'
,
type
=
nullable_str
,
...
...
@@ -441,6 +426,14 @@ class EngineArgs:
type
=
int
,
default
=
EngineArgs
.
tensor_parallel_size
,
help
=
'Number of tensor parallel replicas.'
)
parser
.
add_argument
(
'--data-parallel-size'
,
'-dp'
,
type
=
int
,
default
=
EngineArgs
.
data_parallel_size
,
help
=
'Number of data parallel replicas. '
'MoE layers will be sharded according to the '
'product of the tensor-parallel-size and '
'data-parallel-size.'
)
parser
.
add_argument
(
'--enable-expert-parallel'
,
action
=
'store_true'
,
...
...
@@ -475,6 +468,15 @@ class EngineArgs:
help
=
"Enables automatic prefix caching. "
"Use ``--no-enable-prefix-caching`` to disable explicitly."
,
)
parser
.
add_argument
(
"--prefix-caching-hash-algo"
,
type
=
str
,
choices
=
[
"builtin"
,
"sha256"
],
default
=
EngineArgs
.
prefix_caching_hash_algo
,
help
=
"Set the hash algorithm for prefix caching. "
"Options are 'builtin' (Python's built-in hash) or 'sha256' "
"(collision resistant but with certain overheads)."
,
)
parser
.
add_argument
(
'--disable-sliding-window'
,
action
=
'store_true'
,
help
=
'Disables sliding window, '
...
...
@@ -547,9 +549,7 @@ class EngineArgs:
type
=
int
,
default
=
EngineArgs
.
max_num_partial_prefills
,
help
=
"For chunked prefill, the max number of concurrent
\
partial prefills."
"Defaults to 1"
,
)
partial prefills."
)
parser
.
add_argument
(
"--max-long-partial-prefills"
,
type
=
int
,
...
...
@@ -558,15 +558,13 @@ class EngineArgs:
"than --long-prefill-token-threshold that will be prefilled "
"concurrently. Setting this less than --max-num-partial-prefills "
"will allow shorter prompts to jump the queue in front of longer "
"prompts in some cases, improving latency.
Defaults to 1.
"
)
"prompts in some cases, improving latency."
)
parser
.
add_argument
(
"--long-prefill-token-threshold"
,
type
=
float
,
default
=
EngineArgs
.
long_prefill_token_threshold
,
help
=
"For chunked prefill, a request is considered long if the "
"prompt is longer than this number of tokens. Defaults to 4%% of "
"the model's context length."
,
)
"prompt is longer than this number of tokens."
)
parser
.
add_argument
(
'--max-num-seqs'
,
type
=
int
,
default
=
EngineArgs
.
max_num_seqs
,
...
...
@@ -654,7 +652,7 @@ class EngineArgs:
type
=
nullable_kvs
,
default
=
EngineArgs
.
limit_mm_per_prompt
,
# The default value is given in
# MultiModal
Registry.init_mm
_limit
s
_per_prompt
# MultiModal
Config.get
_limit_per_prompt
help
=
(
'For each multimodal plugin, limit how many '
'input instances to allow for each prompt. '
'Expects a comma-separated list of items, '
...
...
@@ -718,8 +716,7 @@ class EngineArgs:
type
=
int
,
default
=
EngineArgs
.
max_cpu_loras
,
help
=
(
'Maximum number of LoRAs to store in CPU memory. '
'Must be >= than max_loras. '
'Defaults to max_loras.'
))
'Must be >= than max_loras.'
))
parser
.
add_argument
(
'--fully-sharded-loras'
,
action
=
'store_true'
,
...
...
@@ -781,122 +778,10 @@ class EngineArgs:
help
=
'If set, the prefill requests can be chunked based on the '
'max_num_batched_tokens.'
)
parser
.
add_argument
(
'--speculative-config'
,
type
=
nullable_str
,
type
=
json
.
loads
,
default
=
None
,
help
=
'The configurations for speculative decoding.'
' Should be a JSON string.'
)
parser
.
add_argument
(
'--speculative-model'
,
type
=
nullable_str
,
default
=
EngineArgs
.
speculative_model
,
help
=
'The name of the draft model to be used in speculative decoding.'
)
# Quantization settings for speculative model.
parser
.
add_argument
(
'--speculative-model-quantization'
,
type
=
nullable_str
,
choices
=
[
*
QUANTIZATION_METHODS
,
None
],
default
=
EngineArgs
.
speculative_model_quantization
,
help
=
'Method used to quantize the weights of speculative model. '
'If None, we first check the `quantization_config` '
'attribute in the model config file. If that is '
'None, we assume the model weights are not '
'quantized and use `dtype` to determine the data '
'type of the weights.'
)
parser
.
add_argument
(
'--num-speculative-tokens'
,
type
=
int
,
default
=
EngineArgs
.
num_speculative_tokens
,
help
=
'The number of speculative tokens to sample from '
'the draft model in speculative decoding.'
)
parser
.
add_argument
(
'--speculative-disable-mqa-scorer'
,
action
=
'store_true'
,
help
=
'If set to True, the MQA scorer will be disabled in speculative '
' and fall back to batch expansion'
)
parser
.
add_argument
(
'--speculative-draft-tensor-parallel-size'
,
'-spec-draft-tp'
,
type
=
int
,
default
=
EngineArgs
.
speculative_draft_tensor_parallel_size
,
help
=
'Number of tensor parallel replicas for '
'the draft model in speculative decoding.'
)
parser
.
add_argument
(
'--speculative-max-model-len'
,
type
=
int
,
default
=
EngineArgs
.
speculative_max_model_len
,
help
=
'The maximum sequence length supported by the '
'draft model. Sequences over this length will skip '
'speculation.'
)
parser
.
add_argument
(
'--speculative-disable-by-batch-size'
,
type
=
int
,
default
=
EngineArgs
.
speculative_disable_by_batch_size
,
help
=
'Disable speculative decoding for new incoming requests '
'if the number of enqueue requests is larger than this value.'
)
parser
.
add_argument
(
'--ngram-prompt-lookup-max'
,
type
=
int
,
default
=
EngineArgs
.
ngram_prompt_lookup_max
,
help
=
'Max size of window for ngram prompt lookup in speculative '
'decoding.'
)
parser
.
add_argument
(
'--ngram-prompt-lookup-min'
,
type
=
int
,
default
=
EngineArgs
.
ngram_prompt_lookup_min
,
help
=
'Min size of window for ngram prompt lookup in speculative '
'decoding.'
)
parser
.
add_argument
(
'--spec-decoding-acceptance-method'
,
type
=
str
,
default
=
EngineArgs
.
spec_decoding_acceptance_method
,
choices
=
[
'rejection_sampler'
,
'typical_acceptance_sampler'
],
help
=
'Specify the acceptance method to use during draft token '
'verification in speculative decoding. Two types of acceptance '
'routines are supported: '
'1) RejectionSampler which does not allow changing the '
'acceptance rate of draft tokens, '
'2) TypicalAcceptanceSampler which is configurable, allowing for '
'a higher acceptance rate at the cost of lower quality, '
'and vice versa.'
)
parser
.
add_argument
(
'--typical-acceptance-sampler-posterior-threshold'
,
type
=
float
,
default
=
EngineArgs
.
typical_acceptance_sampler_posterior_threshold
,
help
=
'Set the lower bound threshold for the posterior '
'probability of a token to be accepted. This threshold is '
'used by the TypicalAcceptanceSampler to make sampling decisions '
'during speculative decoding. Defaults to 0.09'
)
parser
.
add_argument
(
'--typical-acceptance-sampler-posterior-alpha'
,
type
=
float
,
default
=
EngineArgs
.
typical_acceptance_sampler_posterior_alpha
,
help
=
'A scaling factor for the entropy-based threshold for token '
'acceptance in the TypicalAcceptanceSampler. Typically defaults '
'to sqrt of --typical-acceptance-sampler-posterior-threshold '
'i.e. 0.3'
)
parser
.
add_argument
(
'--disable-logprobs-during-spec-decoding'
,
action
=
StoreBoolean
,
default
=
EngineArgs
.
disable_logprobs_during_spec_decoding
,
nargs
=
"?"
,
const
=
"True"
,
help
=
'If set to True, token log probabilities are not returned '
'during speculative decoding. If set to False, log probabilities '
'are returned according to the settings in SamplingParams. If '
'not specified, it defaults to True. Disabling log probabilities '
'during speculative decoding reduces latency by skipping logprob '
'calculation in proposal sampling, target sampling, and after '
'accepted tokens are determined.'
)
parser
.
add_argument
(
'--model-loader-extra-config'
,
type
=
nullable_str
,
...
...
@@ -1099,7 +984,7 @@ class EngineArgs:
parser
.
add_argument
(
"--reasoning-parser"
,
type
=
str
,
choices
=
[
"deepseek_r1"
]
,
choices
=
list
(
ReasoningParserManager
.
reasoning_parsers
)
,
default
=
None
,
help
=
"Select the reasoning parser depending on the model that you're "
...
...
@@ -1209,58 +1094,14 @@ class EngineArgs:
This function utilizes `speculative_config` to create a
SpeculativeConfig object. The `speculative_config` can either be
provided as a JSON string input via CLI arguments or directly as a
dictionary from the engine. If `speculative_config` is not set, this
function will attempt to construct a configuration dictionary using
certain parameters, which are scheduled for deprecation in the next
release. Note that in next releases, `speculative_config` must be
provided, and the deprecated standalone speculative-related parameters
will be removed.
dictionary from the engine.
"""
if
self
.
speculative_config
is
None
:
if
(
self
.
speculative_model
is
None
and
self
.
num_speculative_tokens
is
None
):
return
None
# TODO(Shangming): Deprecate this way of setting SpeculativeConfig,
# only allow '--speculative-config' after next release
logger
.
warning_once
(
"Please use '--speculative-config' to set all configurations "
"related to speculative decoding. The current method of "
"specifying the model through '--speculative-model' and "
"adding related parameters (e.g., '--num-speculative-tokens') "
"separately will be deprecated in the next release."
)
spec_config_dict
=
{
"model"
:
self
.
speculative_model
,
"quantization"
:
self
.
speculative_model_quantization
,
"max_model_len"
:
self
.
speculative_max_model_len
,
"draft_tensor_parallel_size"
:
self
.
speculative_draft_tensor_parallel_size
,
"num_speculative_tokens"
:
self
.
num_speculative_tokens
,
"disable_mqa_scorer"
:
self
.
speculative_disable_mqa_scorer
,
"disable_by_batch_size"
:
self
.
speculative_disable_by_batch_size
,
"prompt_lookup_max"
:
self
.
ngram_prompt_lookup_max
,
"prompt_lookup_min"
:
self
.
ngram_prompt_lookup_min
,
"acceptance_method"
:
self
.
spec_decoding_acceptance_method
,
"posterior_threshold"
:
self
.
typical_acceptance_sampler_posterior_threshold
,
"posterior_alpha"
:
self
.
typical_acceptance_sampler_posterior_alpha
,
"disable_logprobs"
:
self
.
disable_logprobs_during_spec_decoding
,
}
return
None
self
.
speculative_config
=
spec_config_dict
else
:
if
isinstance
(
self
.
speculative_config
,
str
):
import
ast
self
.
speculative_config
=
ast
.
literal_eval
(
self
.
speculative_config
)
# Note(Shangming): These parameters are not obtained from the cli arg
# '--speculative-config' and must be passed in when creating the engine
# config.
assert
isinstance
(
self
.
speculative_config
,
dict
)
self
.
speculative_config
.
update
({
"target_model_config"
:
target_model_config
,
"target_parallel_config"
:
target_parallel_config
,
...
...
@@ -1329,6 +1170,7 @@ class EngineArgs:
num_gpu_blocks_override
=
self
.
num_gpu_blocks_override
,
sliding_window
=
model_config
.
get_sliding_window
(),
enable_prefix_caching
=
self
.
enable_prefix_caching
,
prefix_caching_hash_algo
=
self
.
prefix_caching_hash_algo
,
cpu_offload_gb
=
self
.
cpu_offload_gb
,
calculate_kv_scales
=
self
.
calculate_kv_scales
,
)
...
...
@@ -1347,6 +1189,7 @@ class EngineArgs:
parallel_config
=
ParallelConfig
(
pipeline_parallel_size
=
self
.
pipeline_parallel_size
,
tensor_parallel_size
=
self
.
tensor_parallel_size
,
data_parallel_size
=
self
.
data_parallel_size
,
enable_expert_parallel
=
self
.
enable_expert_parallel
,
max_parallel_loading_workers
=
self
.
max_parallel_loading_workers
,
disable_custom_all_reduce
=
self
.
disable_custom_all_reduce
,
...
...
@@ -1538,7 +1381,8 @@ class EngineArgs:
# Xgrammar and Guidance are supported.
SUPPORTED_GUIDED_DECODING
=
[
"xgrammar"
,
"xgrammar:disable-any-whitespace"
,
"guidance"
,
"auto"
"xgrammar"
,
"xgrammar:disable-any-whitespace"
,
"guidance"
,
"guidance:disable-any-whitespace"
,
"auto"
]
if
self
.
guided_decoding_backend
not
in
SUPPORTED_GUIDED_DECODING
:
_raise_or_fallback
(
feature_name
=
"--guided-decoding-backend"
,
...
...
@@ -1580,12 +1424,6 @@ class EngineArgs:
recommend_to_remove
=
False
)
return
False
# No CPU offloading yet.
if
self
.
cpu_offload_gb
!=
EngineArgs
.
cpu_offload_gb
:
_raise_or_fallback
(
feature_name
=
"--cpu-offload-gb"
,
recommend_to_remove
=
False
)
return
False
# Only Fp16 and Bf16 dtypes since we only support FA.
V1_SUPPORTED_DTYPES
=
[
torch
.
bfloat16
,
torch
.
float16
]
if
model_config
.
dtype
not
in
V1_SUPPORTED_DTYPES
:
...
...
@@ -1594,7 +1432,7 @@ class EngineArgs:
return
False
# Some quantization is not compatible with torch.compile.
V1_UNSUPPORTED_QUANT
=
[
"bitsandbytes"
,
"gguf"
]
V1_UNSUPPORTED_QUANT
=
[
"gguf"
]
if
model_config
.
quantization
in
V1_UNSUPPORTED_QUANT
:
_raise_or_fallback
(
feature_name
=
f
"--quantization
{
model_config
.
quantization
}
"
,
...
...
@@ -1613,21 +1451,11 @@ class EngineArgs:
recommend_to_remove
=
False
)
return
False
# No TransformersModel support so far.
if
(
model_config
.
model_impl
==
ModelImpl
.
TRANSFORMERS
or
model_config
.
model_impl
==
"transformers"
):
_raise_or_fallback
(
feature_name
=
f
"model_impl=
{
model_config
.
model_impl
}
"
,
recommend_to_remove
=
False
)
return
False
# No Concurrent Partial Prefills so far.
if
(
self
.
max_num_partial_prefills
!=
EngineArgs
.
max_num_partial_prefills
or
self
.
max_long_partial_prefills
!=
EngineArgs
.
max_long_partial_prefills
or
self
.
long_prefill_token_threshold
!=
EngineArgs
.
long_prefill_token_threshold
):
!=
EngineArgs
.
max_long_partial_prefills
):
_raise_or_fallback
(
feature_name
=
"Concurrent Partial Prefill"
,
recommend_to_remove
=
False
)
return
False
...
...
@@ -1639,12 +1467,22 @@ class EngineArgs:
return
False
# Only Ngram speculative decoding so far.
if
(
self
.
speculative_model
is
not
None
or
self
.
num_speculative_tokens
is
not
None
):
is_ngram_enabled
=
False
is_eagle_enabled
=
False
if
self
.
speculative_config
is
not
None
:
# This is supported but experimental (handled below).
if
self
.
speculative_model
in
(
"ngram"
,
"[ngram]"
):
pass
speculative_method
=
self
.
speculative_config
.
get
(
"method"
)
if
speculative_method
:
if
speculative_method
in
(
"ngram"
,
"[ngram]"
):
is_ngram_enabled
=
True
elif
speculative_method
==
"eagle"
:
is_eagle_enabled
=
True
else
:
speculative_model
=
self
.
speculative_config
.
get
(
"model"
)
if
speculative_model
in
(
"ngram"
,
"[ngram]"
):
is_ngram_enabled
=
True
if
not
(
is_ngram_enabled
or
is_eagle_enabled
):
# Other speculative decoding methods are not supported yet.
_raise_or_fallback
(
feature_name
=
"Speculative Decoding"
,
recommend_to_remove
=
False
)
return
False
...
...
@@ -1666,9 +1504,8 @@ class EngineArgs:
_raise_or_fallback
(
feature_name
=
name
,
recommend_to_remove
=
True
)
return
False
# No support for device type other than CUDA, AMD (experiemntal) or
# TPU (experimental) so far.
if
not
(
current_platform
.
is_cuda_alike
()
or
current_platform
.
is_tpu
()):
# Platforms must decide if they can support v1 for this model
if
not
current_platform
.
supports_v1
(
model_config
=
model_config
):
_raise_or_fallback
(
feature_name
=
f
"device type=
{
current_platform
.
device_type
}
"
,
recommend_to_remove
=
False
)
...
...
@@ -1681,23 +1518,26 @@ class EngineArgs:
and
_warn_or_fallback
(
"Engine in background thread"
)):
return
False
# LoRA is supported on V1, but off by default for now.
if
self
.
enable_lora
and
_warn_or_fallback
(
"LORA"
):
# PP is supported on V1 with Ray distributed executor,
# but off for MP distributed executor for now.
if
(
self
.
pipeline_parallel_size
>
1
and
self
.
distributed_executor_backend
!=
"ray"
):
name
=
"Pipeline Parallelism without Ray distributed executor"
_raise_or_fallback
(
feature_name
=
name
,
recommend_to_remove
=
False
)
return
False
#
PP
is supported on V1, but off by default for now.
if
self
.
pipeline_parallel_size
>
1
and
_warn_or_fallback
(
"
PP
"
):
#
ngram
is supported on V1, but off by default for now.
if
is_ngram_enabled
and
_warn_or_fallback
(
"
ngram
"
):
return
False
# ngram is supported on V1, but off by default for now.
if
self
.
speculative_model
in
(
"ngram"
,
"[ngram]"
)
and
_warn_or_fallback
(
"ngram"
):
# Eagle is under development, so we don't support it yet.
if
is_eagle_enabled
and
_warn_or_fallback
(
"Eagle"
):
return
False
# Non-CUDA is supported on V1, but off by default for now.
not_cuda
=
not
current_platform
.
is_cuda
()
if
not_cuda
and
_warn_or_fallback
(
# noqa: SIM103
current_platform
.
device_
typ
e
):
current_platform
.
device_
nam
e
):
return
False
#############################################################
...
...
@@ -1720,7 +1560,7 @@ class EngineArgs:
is_gpu
=
current_platform
.
is_cuda
()
use_sliding_window
=
(
model_config
.
get_sliding_window
()
is
not
None
)
use_spec_decode
=
self
.
speculative_
model
is
not
None
use_spec_decode
=
self
.
speculative_
config
is
not
None
if
(
is_gpu
and
not
use_sliding_window
and
not
use_spec_decode
and
not
self
.
enable_lora
...
...
@@ -1748,12 +1588,22 @@ class EngineArgs:
msg
=
"Chunked prefill is not supported for pooling models"
raise
ValueError
(
msg
)
# Disable prefix caching for multimodal models for VLLM_V0.
if
(
model_config
.
is_multimodal_model
and
self
.
enable_prefix_caching
):
logger
.
warning
(
"--enable-prefix-caching is not supported for multimodal "
"models in V0 and has been disabled."
)
self
.
enable_prefix_caching
=
False
# if using prefix caching, we must set a hash algo
if
self
.
enable_prefix_caching
:
# Disable prefix caching for multimodal models for VLLM_V0.
if
model_config
.
is_multimodal_model
:
logger
.
warning
(
"--enable-prefix-caching is not supported for multimodal "
"models in V0 and has been disabled."
)
self
.
enable_prefix_caching
=
False
# VLLM_V0 only supports builtin hash algo for prefix caching.
if
self
.
prefix_caching_hash_algo
is
None
:
self
.
prefix_caching_hash_algo
=
"builtin"
elif
self
.
prefix_caching_hash_algo
==
"sha256"
:
raise
ValueError
(
"sha256 is not supported for prefix caching in V0 engine. "
"Please use 'builtin'."
)
# Set max_num_seqs to 256 for VLLM_V0.
if
self
.
max_num_seqs
is
None
:
...
...
@@ -1769,6 +1619,10 @@ class EngineArgs:
if
self
.
enable_prefix_caching
is
None
:
self
.
enable_prefix_caching
=
True
# if using prefix caching, we must set a hash algo
if
self
.
enable_prefix_caching
and
self
.
prefix_caching_hash_algo
is
None
:
self
.
prefix_caching_hash_algo
=
"builtin"
# V1 should use the new scheduler by default.
# Swap it only if this arg is set to the original V0 default
if
self
.
scheduler_cls
==
EngineArgs
.
scheduler_cls
:
...
...
vllm/engine/async_llm_engine.py
View file @
675ba75f
...
...
@@ -303,8 +303,11 @@ class _AsyncLLMEngine(LLMEngine):
ctx
.
seq_group_metadata_list
=
seq_group_metadata_list
ctx
.
scheduler_outputs
=
scheduler_outputs
finished_requests_ids
=
self
.
scheduler
[
virtual_engine
].
get_and_reset_finished_requests_ids
()
if
not
scheduler_outputs
.
is_empty
():
# this will cause mamba_cache/minimax_cache failed
# to release finished_requests_ids of the last steps
finished_requests_ids
=
self
.
scheduler
[
virtual_engine
].
get_and_reset_finished_requests_ids
()
# Maybe switch from async mode to sync mode
if
not
allow_async_output_proc
and
len
(
ctx
.
output_queue
)
>
0
:
...
...
@@ -1222,8 +1225,8 @@ class AsyncLLMEngine(EngineClient):
async
def
sleep
(
self
,
level
:
int
=
1
)
->
None
:
self
.
engine
.
sleep
(
level
)
async
def
wake_up
(
self
)
->
None
:
self
.
engine
.
wake_up
()
async
def
wake_up
(
self
,
tags
:
Optional
[
list
[
str
]]
=
None
)
->
None
:
self
.
engine
.
wake_up
(
tags
)
async
def
is_sleeping
(
self
)
->
bool
:
return
self
.
engine
.
is_sleeping
()
...
...
vllm/engine/llm_engine.py
View file @
675ba75f
...
...
@@ -7,8 +7,8 @@ from collections import deque
from
contextlib
import
contextmanager
from
dataclasses
import
dataclass
from
functools
import
partial
from
typing
import
(
TYPE_CHECKING
,
Callable
,
ClassVar
,
Deque
,
Dict
,
Iterable
,
List
,
Mapping
,
NamedTuple
,
Optional
)
from
typing
import
(
TYPE_CHECKING
,
Any
,
Callable
,
ClassVar
,
Deque
,
Dict
,
Iterable
,
List
,
Mapping
,
NamedTuple
,
Optional
)
from
typing
import
Sequence
as
GenericSequence
from
typing
import
Set
,
Type
,
Union
,
cast
,
overload
...
...
@@ -30,8 +30,8 @@ from vllm.entrypoints.openai.logits_processors import (
get_logits_processors
as
get_openai_logits_processors
)
from
vllm.executor.executor_base
import
ExecutorBase
from
vllm.inputs
import
(
INPUT_REGISTRY
,
InputRegistry
,
ProcessorInputs
,
PromptType
,
SingletonInputsAdapter
)
from
vllm.inputs.parse
import
is_
encoder_decoder_inputs
,
is_token_prompt
PromptType
)
from
vllm.inputs.parse
import
is_
token_prompt
,
split_enc_dec_inputs
from
vllm.inputs.preprocess
import
InputPreprocessor
from
vllm.logger
import
init_logger
from
vllm.logits_process
import
get_bad_words_logits_processors
...
...
@@ -67,6 +67,7 @@ _LOCAL_LOGGING_INTERVAL_SEC = 5
_G
=
TypeVar
(
"_G"
,
bound
=
BaseTokenizerGroup
,
default
=
BaseTokenizerGroup
)
_O
=
TypeVar
(
"_O"
,
RequestOutput
,
PoolingRequestOutput
)
_R
=
TypeVar
(
"_R"
,
default
=
Any
)
@
dataclass
...
...
@@ -609,12 +610,7 @@ class LLMEngine:
seq_id
=
next
(
self
.
seq_counter
)
eos_token_id
=
self
.
input_preprocessor
.
get_eos_token_id
(
lora_request
)
if
is_encoder_decoder_inputs
(
processed_inputs
):
decoder_inputs
=
processed_inputs
[
"decoder"
]
encoder_inputs
=
processed_inputs
[
"encoder"
]
else
:
decoder_inputs
=
processed_inputs
encoder_inputs
=
None
encoder_inputs
,
decoder_inputs
=
split_enc_dec_inputs
(
processed_inputs
)
seq
=
Sequence
(
seq_id
,
decoder_inputs
,
block_size
,
eos_token_id
,
lora_request
,
prompt_adapter_request
)
...
...
@@ -1942,10 +1938,10 @@ class LLMEngine:
"Sleep mode is not enabled in the model config"
)
self
.
model_executor
.
sleep
(
level
=
level
)
def
wake_up
(
self
)
->
None
:
def
wake_up
(
self
,
tags
:
Optional
[
list
[
str
]]
=
None
)
->
None
:
assert
self
.
vllm_config
.
model_config
.
enable_sleep_mode
,
(
"Sleep mode is not enabled in the model config"
)
self
.
model_executor
.
wake_up
()
self
.
model_executor
.
wake_up
(
tags
)
def
is_sleeping
(
self
)
->
bool
:
return
self
.
model_executor
.
is_sleeping
...
...
@@ -2031,15 +2027,16 @@ class LLMEngine:
def
_validate_model_inputs
(
self
,
inputs
:
ProcessorInputs
,
lora_request
:
Optional
[
LoRARequest
]):
if
is_encoder_decoder_inputs
(
inputs
):
# For encoder-decoder multimodal models, the max_prompt_len
# restricts the decoder prompt length
prompt_inputs
=
inputs
[
"decoder"
if
self
.
model_config
.
is_multimodal_model
else
"encoder"
]
encoder_inputs
,
decoder_inputs
=
split_enc_dec_inputs
(
inputs
)
# For encoder-decoder multimodal models, the max_prompt_len
# restricts the decoder prompt length
if
self
.
model_config
.
is_multimodal_model
:
prompt_inputs
=
decoder_inputs
else
:
prompt_inputs
=
inputs
prompt_inputs
=
encoder_inputs
or
decoder_
inputs
prompt_ids
=
SingletonInputsAdapter
(
prompt_inputs
).
prompt_token_ids
prompt_ids
=
prompt_inputs
[
"
prompt_token_ids
"
]
if
prompt_ids
is
None
or
len
(
prompt_ids
)
==
0
:
raise
ValueError
(
"Prompt cannot be empty"
)
...
...
@@ -2084,8 +2081,9 @@ class LLMEngine:
guided_decoding
.
backend
=
guided_decoding
.
backend
or
\
self
.
decoding_config
.
guided_decoding_backend
logger
.
debug
(
"Reasoning backend: %s"
,
self
.
decoding_config
.
reasoning_backend
)
if
self
.
decoding_config
.
reasoning_backend
is
not
None
:
logger
.
debug
(
"Building with reasoning backend %s"
,
self
.
decoding_config
.
reasoning_backend
)
processor
=
get_local_guided_decoding_logits_processor
(
guided_params
=
guided_decoding
,
...
...
@@ -2126,6 +2124,14 @@ class LLMEngine:
return
sampling_params
def
collective_rpc
(
self
,
method
:
Union
[
str
,
Callable
[...,
_R
]],
timeout
:
Optional
[
float
]
=
None
,
args
:
tuple
=
(),
kwargs
:
Optional
[
dict
[
str
,
Any
]]
=
None
)
->
list
[
_R
]:
return
self
.
model_executor
.
collective_rpc
(
method
,
timeout
,
args
,
kwargs
)
if
envs
.
is_set
(
"VLLM_USE_V1"
)
and
envs
.
VLLM_USE_V1
:
from
vllm.v1.engine.llm_engine
import
LLMEngine
as
V1LLMEngine
...
...
vllm/engine/metrics.py
View file @
675ba75f
...
...
@@ -52,6 +52,11 @@ class Metrics:
max_model_len
=
vllm_config
.
model_config
.
max_model_len
# Use this flag to hide metrics that were deprecated in
# a previous release and which will be removed future
self
.
show_hidden_metrics
=
\
vllm_config
.
observability_config
.
show_hidden_metrics
# System stats
# Scheduler State
self
.
gauge_scheduler_running
=
self
.
_gauge_cls
(
...
...
@@ -76,14 +81,15 @@ class Metrics:
)
# Deprecated in 0.8 - KV cache offloading is not used in V1
# TODO: in 0.9, only enable if show_hidden_metrics=True
self
.
gauge_scheduler_swapped
=
self
.
_gauge_cls
(
name
=
"vllm:num_requests_swapped"
,
documentation
=
(
"Number of requests swapped to CPU. "
"DEPRECATED: KV cache offloading is not used in V1"
),
labelnames
=
labelnames
,
multiprocess_mode
=
"sum"
)
# Hidden in 0.9, due to be removed in 0.10
if
self
.
show_hidden_metrics
:
self
.
gauge_scheduler_swapped
=
self
.
_gauge_cls
(
name
=
"vllm:num_requests_swapped"
,
documentation
=
(
"Number of requests swapped to CPU. "
"DEPRECATED: KV cache offloading is not used in V1"
),
labelnames
=
labelnames
,
multiprocess_mode
=
"sum"
)
# KV Cache Usage in %
self
.
gauge_gpu_cache_usage
=
self
.
_gauge_cls
(
...
...
@@ -93,34 +99,33 @@ class Metrics:
multiprocess_mode
=
"sum"
)
# Deprecated in 0.8 - KV cache offloading is not used in V1
# TODO: in 0.9, only enable if show_hidden_metrics=True
self
.
gauge_cpu_cache_usage
=
self
.
_gauge_cls
(
name
=
"vllm:cpu_cache_usage_perc"
,
documentation
=
(
"CPU KV-cache usage. 1 means 100 percent usage. "
"DEPRECATED: KV cache offloading is not used in V1"
),
labelnames
=
labelnames
,
multiprocess_mode
=
"sum"
)
# Deprecated in 0.8 - KV cache offloading is not used in V1
# TODO: in 0.9, only enable if show_hidden_metrics=True
self
.
gauge_cpu_prefix_cache_hit_rate
=
self
.
_gauge_cls
(
name
=
"vllm:cpu_prefix_cache_hit_rate"
,
documentation
=
(
"CPU prefix cache block hit rate. "
"DEPRECATED: KV cache offloading is not used in V1"
),
labelnames
=
labelnames
,
multiprocess_mode
=
"sum"
)
# Hidden in 0.9, due to be removed in 0.10
if
self
.
show_hidden_metrics
:
self
.
gauge_cpu_cache_usage
=
self
.
_gauge_cls
(
name
=
"vllm:cpu_cache_usage_perc"
,
documentation
=
(
"CPU KV-cache usage. 1 means 100 percent usage. "
"DEPRECATED: KV cache offloading is not used in V1"
),
labelnames
=
labelnames
,
multiprocess_mode
=
"sum"
)
self
.
gauge_cpu_prefix_cache_hit_rate
=
self
.
_gauge_cls
(
name
=
"vllm:cpu_prefix_cache_hit_rate"
,
documentation
=
(
"CPU prefix cache block hit rate. "
"DEPRECATED: KV cache offloading is not used in V1"
),
labelnames
=
labelnames
,
multiprocess_mode
=
"sum"
)
# Deprecated in 0.8 - replaced by queries+hits counters in V1
# TODO: in 0.9, only enable if show_hidden_metrics=True
self
.
gauge_gpu_prefix_cache_hit_rate
=
self
.
_gauge_cls
(
name
=
"vllm:gpu_prefix_cache_hit_rate"
,
documentation
=
(
"GPU prefix cache block hit rate. "
"DEPRECATED: use vllm:gpu_prefix_cache_queries and "
"vllm:gpu_prefix_cache_queries in V1"
),
labelnames
=
labelnames
,
multiprocess_mode
=
"sum"
)
# Hidden in 0.9, due to be removed in 0.10
if
self
.
show_hidden_metrics
:
self
.
gauge_gpu_prefix_cache_hit_rate
=
self
.
_gauge_cls
(
name
=
"vllm:gpu_prefix_cache_hit_rate"
,
documentation
=
(
"GPU prefix cache block hit rate. "
"DEPRECATED: use vllm:gpu_prefix_cache_queries "
"and vllm:gpu_prefix_cache_queries in V1"
),
labelnames
=
labelnames
,
multiprocess_mode
=
"sum"
)
# Iteration stats
self
.
counter_num_preemption
=
self
.
_counter_cls
(
...
...
@@ -198,33 +203,35 @@ class Metrics:
labelnames
=
labelnames
,
buckets
=
request_latency_buckets
)
# Deprecated in 0.8 - duplicates vllm:request_queue_time_seconds:
# TODO: in 0.9, only enable if show_hidden_metrics=True
self
.
histogram_time_in_queue_request
=
self
.
_histogram_cls
(
name
=
"vllm:time_in_queue_requests"
,
documentation
=
(
"Histogram of time the request spent in the queue in seconds. "
"DEPRECATED: use vllm:request_queue_time_seconds instead."
),
labelnames
=
labelnames
,
buckets
=
request_latency_buckets
)
# Hidden in 0.9, due to be removed in 0.10
if
self
.
show_hidden_metrics
:
self
.
histogram_time_in_queue_request
=
self
.
_histogram_cls
(
name
=
"vllm:time_in_queue_requests"
,
documentation
=
(
"Histogram of time the request spent in the queue in seconds. "
"DEPRECATED: use vllm:request_queue_time_seconds instead."
),
labelnames
=
labelnames
,
buckets
=
request_latency_buckets
)
# Deprecated in 0.8 - use prefill/decode/inference time metrics
# TODO: in 0.9, only enable if show_hidden_metrics=True
self
.
histogram_model_forward_time_request
=
self
.
_histogram_cls
(
name
=
"vllm:model_forward_time_milliseconds"
,
documentation
=
(
"Histogram of time spent in the model forward pass in ms. "
"DEPRECATED: use prefill/decode/inference time metrics instead."
),
labelnames
=
labelnames
,
buckets
=
build_1_2_3_5_8_buckets
(
3000
))
self
.
histogram_model_execute_time_request
=
self
.
_histogram_cls
(
name
=
"vllm:model_execute_time_milliseconds"
,
documentation
=
(
"Histogram of time spent in the model execute function in ms."
"DEPRECATED: use prefill/decode/inference time metrics instead."
),
labelnames
=
labelnames
,
buckets
=
build_1_2_3_5_8_buckets
(
3000
))
# Hidden in 0.9, due to be removed in 0.10
if
self
.
show_hidden_metrics
:
self
.
histogram_model_forward_time_request
=
self
.
_histogram_cls
(
name
=
"vllm:model_forward_time_milliseconds"
,
documentation
=
(
"Histogram of time spent in the model forward pass in ms. "
"DEPRECATED: use prefill/decode/inference time metrics instead"
),
labelnames
=
labelnames
,
buckets
=
build_1_2_3_5_8_buckets
(
3000
))
self
.
histogram_model_execute_time_request
=
self
.
_histogram_cls
(
name
=
"vllm:model_execute_time_milliseconds"
,
documentation
=
(
"Histogram of time spent in the model execute function in ms."
"DEPRECATED: use prefill/decode/inference time metrics instead"
),
labelnames
=
labelnames
,
buckets
=
build_1_2_3_5_8_buckets
(
3000
))
# Metadata
self
.
histogram_num_prompt_tokens_request
=
self
.
_histogram_cls
(
...
...
@@ -543,11 +550,6 @@ class PrometheusStatLogger(StatLoggerBase):
self
.
metrics
=
self
.
_metrics_cls
(
labelnames
=
list
(
labels
.
keys
()),
vllm_config
=
vllm_config
)
# Use this flag to hide metrics that were deprecated in
# a previous release and which will be removed future
self
.
show_hidden_metrics
=
\
vllm_config
.
observability_config
.
show_hidden_metrics
def
_log_gauge
(
self
,
gauge
,
data
:
Union
[
int
,
float
])
->
None
:
# Convenience function for logging to gauge.
gauge
.
labels
(
**
self
.
labels
).
set
(
data
)
...
...
@@ -580,18 +582,20 @@ class PrometheusStatLogger(StatLoggerBase):
# System state data
self
.
_log_gauge
(
self
.
metrics
.
gauge_scheduler_running
,
stats
.
num_running_sys
)
self
.
_log_gauge
(
self
.
metrics
.
gauge_scheduler_swapped
,
stats
.
num_swapped_sys
)
if
self
.
metrics
.
show_hidden_metrics
:
self
.
_log_gauge
(
self
.
metrics
.
gauge_scheduler_swapped
,
stats
.
num_swapped_sys
)
self
.
_log_gauge
(
self
.
metrics
.
gauge_scheduler_waiting
,
stats
.
num_waiting_sys
)
self
.
_log_gauge
(
self
.
metrics
.
gauge_gpu_cache_usage
,
stats
.
gpu_cache_usage_sys
)
self
.
_log_gauge
(
self
.
metrics
.
gauge_cpu_cache_usage
,
stats
.
cpu_cache_usage_sys
)
self
.
_log_gauge
(
self
.
metrics
.
gauge_cpu_prefix_cache_hit_rate
,
stats
.
cpu_prefix_cache_hit_rate
)
self
.
_log_gauge
(
self
.
metrics
.
gauge_gpu_prefix_cache_hit_rate
,
stats
.
gpu_prefix_cache_hit_rate
)
if
self
.
metrics
.
show_hidden_metrics
:
self
.
_log_gauge
(
self
.
metrics
.
gauge_cpu_cache_usage
,
stats
.
cpu_cache_usage_sys
)
self
.
_log_gauge
(
self
.
metrics
.
gauge_cpu_prefix_cache_hit_rate
,
stats
.
cpu_prefix_cache_hit_rate
)
self
.
_log_gauge
(
self
.
metrics
.
gauge_gpu_prefix_cache_hit_rate
,
stats
.
gpu_prefix_cache_hit_rate
)
# Including max-lora in metric, in future this property of lora
# config maybe extended to be dynamic.
lora_info
=
{
...
...
@@ -629,12 +633,15 @@ class PrometheusStatLogger(StatLoggerBase):
stats
.
time_prefill_requests
)
self
.
_log_histogram
(
self
.
metrics
.
histogram_decode_time_request
,
stats
.
time_decode_requests
)
self
.
_log_histogram
(
self
.
metrics
.
histogram_time_in_queue_request
,
stats
.
time_in_queue_requests
)
self
.
_log_histogram
(
self
.
metrics
.
histogram_model_forward_time_request
,
stats
.
model_forward_time_requests
)
self
.
_log_histogram
(
self
.
metrics
.
histogram_model_execute_time_request
,
stats
.
model_execute_time_requests
)
if
self
.
metrics
.
show_hidden_metrics
:
self
.
_log_histogram
(
self
.
metrics
.
histogram_time_in_queue_request
,
stats
.
time_in_queue_requests
)
self
.
_log_histogram
(
self
.
metrics
.
histogram_model_forward_time_request
,
stats
.
model_forward_time_requests
)
self
.
_log_histogram
(
self
.
metrics
.
histogram_model_execute_time_request
,
stats
.
model_execute_time_requests
)
# Metadata
finished_reason_counter
=
CollectionsCounter
(
stats
.
finished_reason_requests
)
...
...
vllm/engine/multiprocessing/__init__.py
View file @
675ba75f
...
...
@@ -133,8 +133,9 @@ class RPCSleepRequest(Enum):
SLEEP_LEVEL_2
=
2
class
RPCWakeUpRequest
(
Enum
):
WAKE_UP
=
1
@
dataclass
class
RPCWakeUpRequest
:
tags
:
Optional
[
list
[
str
]]
=
None
@
dataclass
...
...
vllm/engine/multiprocessing/client.py
View file @
675ba75f
...
...
@@ -697,10 +697,10 @@ class MQLLMEngineClient(EngineClient):
return
await
self
.
_send_one_way_rpc_request
(
request
=
RPCSleepRequest
(
level
),
socket
=
self
.
input_socket
)
async
def
wake_up
(
self
)
->
None
:
async
def
wake_up
(
self
,
tags
:
Optional
[
list
[
str
]]
=
None
)
->
None
:
"""Wake up the engine"""
return
await
self
.
_send_one_way_rpc_request
(
request
=
RPCWakeUpRequest
.
WAKE_UP
,
socket
=
self
.
input_socket
)
request
=
RPCWakeUpRequest
(
tags
)
,
socket
=
self
.
input_socket
)
async
def
is_sleeping
(
self
)
->
bool
:
"""Check whether the engine is sleeping"""
...
...
Prev
1
…
11
12
13
14
15
16
17
18
19
…
26
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment