Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
vllm_cscc
Commits
b8025f24
Commit
b8025f24
authored
Jan 23, 2026
by
zhuwenwen
Browse files
remove unused code
parent
3b2aefb1
Changes
11
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
2 additions
and
3730 deletions
+2
-3730
CMakeLists.txt
CMakeLists.txt
+1
-1
vllm/_custom_ops.py
vllm/_custom_ops.py
+1
-9
vllm/worker/cpu_enc_dec_model_runner.py
vllm/worker/cpu_enc_dec_model_runner.py
+0
-326
vllm/worker/cpu_model_runner.py
vllm/worker/cpu_model_runner.py
+0
-671
vllm/worker/cpu_pooling_model_runner.py
vllm/worker/cpu_pooling_model_runner.py
+0
-125
vllm/worker/cpu_worker.py
vllm/worker/cpu_worker.py
+0
-452
vllm/worker/multi_step_tpu_worker.py
vllm/worker/multi_step_tpu_worker.py
+0
-108
vllm/worker/tpu_model_runner.py
vllm/worker/tpu_model_runner.py
+0
-909
vllm/worker/tpu_worker.py
vllm/worker/tpu_worker.py
+0
-337
vllm/worker/xpu_model_runner.py
vllm/worker/xpu_model_runner.py
+0
-606
vllm/worker/xpu_worker.py
vllm/worker/xpu_worker.py
+0
-186
No files found.
CMakeLists.txt
View file @
b8025f24
...
@@ -37,7 +37,7 @@ install(CODE "set(CMAKE_INSTALL_LOCAL_ONLY TRUE)" ALL_COMPONENTS)
...
@@ -37,7 +37,7 @@ install(CODE "set(CMAKE_INSTALL_LOCAL_ONLY TRUE)" ALL_COMPONENTS)
set
(
PYTHON_SUPPORTED_VERSIONS
"3.10"
"3.11"
"3.12"
"3.13"
)
set
(
PYTHON_SUPPORTED_VERSIONS
"3.10"
"3.11"
"3.12"
"3.13"
)
# Supported AMD GPU architectures.
# Supported AMD GPU architectures.
set
(
HIP_SUPPORTED_ARCHS
"gfx906;gfx908;gfx90a;gfx942;gfx950;gfx1030;gfx1100;gfx1101;gfx1200;gfx1201;gfx1150;gfx1151;gfx928;gfx936"
)
set
(
HIP_SUPPORTED_ARCHS
"gfx906;gfx908;gfx90a;gfx942;gfx950;gfx1030;gfx1100;gfx1101;gfx1200;gfx1201;gfx1150;gfx1151;gfx928;gfx936
;gfx938
"
)
# ROCm installation prefix. Default to /opt/rocm but allow override via
# ROCm installation prefix. Default to /opt/rocm but allow override via
# -DROCM_PATH=/your/rocm/path when invoking cmake.
# -DROCM_PATH=/your/rocm/path when invoking cmake.
...
...
vllm/_custom_ops.py
View file @
b8025f24
...
@@ -415,6 +415,7 @@ def apply_repetition_penalties(
...
@@ -415,6 +415,7 @@ def apply_repetition_penalties(
logits
,
prompt_mask
,
output_mask
,
repetition_penalties
logits
,
prompt_mask
,
output_mask
,
repetition_penalties
)
)
# fused quant layer norm ops
# fused quant layer norm ops
def
rms_norm_dynamic_per_token_quant
(
def
rms_norm_dynamic_per_token_quant
(
input
:
torch
.
Tensor
,
input
:
torch
.
Tensor
,
...
@@ -2539,15 +2540,6 @@ def cp_gather_indexer_k_quant_cache(
...
@@ -2539,15 +2540,6 @@ def cp_gather_indexer_k_quant_cache(
)
)
def
indexer_k_quant_and_cache
(
k
:
torch
.
Tensor
,
kv_cache
:
torch
.
Tensor
,
slot_mapping
:
torch
.
Tensor
,
quant_block_size
:
int
,
kv_cache_dtype
:
str
)
->
None
:
torch
.
ops
.
_C_cache_ops
.
indexer_k_quant_and_cache
(
k
,
kv_cache
,
slot_mapping
,
quant_block_size
,
kv_cache_dtype
)
def
get_device_attribute
(
attribute
:
int
,
device
:
int
)
->
int
:
def
get_device_attribute
(
attribute
:
int
,
device
:
int
)
->
int
:
return
torch
.
ops
.
_C_cuda_utils
.
get_device_attribute
(
attribute
,
device
)
return
torch
.
ops
.
_C_cuda_utils
.
get_device_attribute
(
attribute
,
device
)
...
...
vllm/worker/cpu_enc_dec_model_runner.py
deleted
100644 → 0
View file @
3b2aefb1
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import
dataclasses
from
typing
import
TYPE_CHECKING
,
Any
,
Dict
,
List
,
Optional
,
Tuple
,
Type
,
cast
import
torch
from
vllm.attention
import
AttentionMetadata
from
vllm.forward_context
import
set_forward_context
from
vllm.model_executor
import
SamplingMetadata
from
vllm.model_executor.layers.sampler
import
SamplerOutput
from
vllm.multimodal
import
MultiModalKwargs
from
vllm.sequence
import
IntermediateTensors
,
SequenceGroupMetadata
from
vllm.utils
import
make_tensor_with_pad
from
vllm.worker.cpu_model_runner
import
(
CPUModelRunnerBase
,
ModelInputForCPUBuilder
,
ModelInputForCPUWithSamplingMetadata
)
from
vllm.worker.model_runner_base
import
(
_add_attn_metadata_broadcastable_dict
,
_add_sampling_metadata_broadcastable_dict
)
if
TYPE_CHECKING
:
from
vllm.attention.backends.abstract
import
AttentionBackend
@
dataclasses
.
dataclass
(
frozen
=
True
)
class
EncoderDecoderModelInputForCPU
(
ModelInputForCPUWithSamplingMetadata
):
"""
Used by the EncoderDecoderModelRunner.
"""
encoder_input_tokens
:
Optional
[
torch
.
Tensor
]
=
None
encoder_input_positions
:
Optional
[
torch
.
Tensor
]
=
None
def
as_broadcastable_tensor_dict
(
self
)
->
Dict
[
str
,
Any
]:
tensor_dict
=
{
"input_tokens"
:
self
.
input_tokens
,
"input_positions"
:
self
.
input_positions
,
"encoder_input_tokens"
:
self
.
encoder_input_tokens
,
"encoder_input_positions"
:
self
.
encoder_input_positions
,
"multi_modal_kwargs"
:
self
.
multi_modal_kwargs
,
}
_add_attn_metadata_broadcastable_dict
(
tensor_dict
,
self
.
attn_metadata
)
_add_sampling_metadata_broadcastable_dict
(
tensor_dict
,
self
.
sampling_metadata
)
return
tensor_dict
@
classmethod
def
from_broadcasted_tensor_dict
(
cls
,
tensor_dict
:
Dict
[
str
,
Any
],
attn_backend
:
Optional
[
"AttentionBackend"
]
=
None
,
)
->
"EncoderDecoderModelInputForCPU"
:
return
cast
(
EncoderDecoderModelInputForCPU
,
super
().
from_broadcasted_tensor_dict
(
tensor_dict
,
attn_backend
))
class
CPUEncoderDecoderModelRunner
(
CPUModelRunnerBase
[
EncoderDecoderModelInputForCPU
]):
_model_input_cls
:
Type
[
EncoderDecoderModelInputForCPU
]
=
(
EncoderDecoderModelInputForCPU
)
_builder_cls
:
Type
[
ModelInputForCPUBuilder
]
=
ModelInputForCPUBuilder
def
_list_to_int32_tensor
(
self
,
_list
:
List
[
int
],
)
->
torch
.
Tensor
:
return
torch
.
tensor
(
_list
,
dtype
=
torch
.
int32
,
device
=
self
.
device
)
def
_list_to_long_tensor
(
self
,
_list
:
List
[
int
],
)
->
torch
.
Tensor
:
return
torch
.
tensor
(
_list
,
dtype
=
torch
.
long
,
device
=
self
.
device
)
def
_empty_int32_tensor
(
self
)
->
torch
.
Tensor
:
return
self
.
_list_to_int32_tensor
([])
def
_empty_long_tensor
(
self
)
->
torch
.
Tensor
:
return
self
.
_list_to_long_tensor
([])
def
make_model_input_from_broadcasted_tensor_dict
(
self
,
tensor_dict
:
Dict
[
str
,
Any
])
->
EncoderDecoderModelInputForCPU
:
return
EncoderDecoderModelInputForCPU
.
from_broadcasted_tensor_dict
(
tensor_dict
,
attn_backend
=
self
.
attn_backend
,
)
def
prepare_model_input
(
self
,
seq_group_metadata_list
:
List
[
SequenceGroupMetadata
],
virtual_engine
:
int
=
0
,
finished_requests_ids
:
Optional
[
List
[
str
]]
=
None
)
->
EncoderDecoderModelInputForCPU
:
model_input
=
self
.
_prepare_model_input_tensors
(
seq_group_metadata_list
,
finished_requests_ids
)
(
attn_metadata
,
encoder_input_tokens_tensor
,
encoder_input_positions_tensor
,
)
=
self
.
_prepare_encoder_model_input_tensors
(
seq_group_metadata_list
,
model_input
)
# Sampling metadata is only required for the final pp group
generators
=
self
.
get_generators
(
finished_requests_ids
)
sampling_metadata
=
SamplingMetadata
.
prepare
(
seq_group_metadata_list
,
model_input
.
seq_lens
,
model_input
.
query_lens
,
self
.
device
,
pin_memory
=
False
,
generators
=
generators
)
return
dataclasses
.
replace
(
model_input
,
sampling_metadata
=
sampling_metadata
,
attn_metadata
=
attn_metadata
,
encoder_input_tokens
=
encoder_input_tokens_tensor
,
encoder_input_positions
=
encoder_input_positions_tensor
,
virtual_engine
=
virtual_engine
,
)
def
_prepare_encoder_model_input_tensors
(
self
,
seq_group_metadata_list
:
List
[
SequenceGroupMetadata
],
model_input
:
EncoderDecoderModelInputForCPU
,
)
->
Tuple
[
AttentionMetadata
,
Optional
[
torch
.
Tensor
],
Optional
[
torch
.
Tensor
]]:
"""Helper method to prepare the encoder- and cross-attn-related
model inputs based on a given sequence group. These additional inputs
are used to augment an already-computed `EncoderDecoderModelInput`
data structure which already has decoder-related model inputs
populated.
Sets the following attn_metadata fields:
* `num_encoder_tokens`
* `encoder_seq_lens`
* `encoder_seq_lens_tensor`
* `max_encoder_seq_len`
* `cross_slot_mapping`
* `cross_block_tables`
Constructs a new model inputs data structure, based on
(1) the existing fields in the `model_inputs` argument,
and (2) the following additional fields which are
computed (or in the case of `attn_metadata`, updated)
by this function:
* attn_metadata
* encoder_input_tokens
* encoder_input_positions
Arguments:
* seq_group_metadata_list: list of sequence groups for which to
compute inputs
* model_inputs: model inputs data structure with decoder-oriented
fields already computed.
Return:
* Updated model inputs data structure
"""
if
len
(
seq_group_metadata_list
)
==
0
:
return
(
model_input
.
attn_metadata
,
None
,
None
)
# Since we are not supporting chunked prefill either the entire
# batch is prefill or it is decode
is_prompt
=
seq_group_metadata_list
[
0
].
is_prompt
# Build encoder inputs
encoder_seq_lens
:
List
[
int
]
=
[]
if
is_prompt
:
# Prefill phase.
cross_block_tables
=
self
.
_empty_int32_tensor
().
view
(
len
(
seq_group_metadata_list
),
-
1
)
# Extract input tokens/positions, cross-attention slot-mapping,
# & seq len from each sequence group metadata
(
encoder_input_tokens
,
encoder_input_positions
,
cross_slot_mapping
,
)
=
(
[],
[],
[],
)
for
seq_group_metadata
in
seq_group_metadata_list
:
# Build seq lens
seq_len
=
seq_group_metadata
.
encoder_seq_data
.
get_len
()
token_ids
=
seq_group_metadata
.
encoder_seq_data
.
get_token_ids
()
encoder_seq_lens
.
append
(
seq_len
)
# Build slot mapping
for
i
in
range
(
0
,
seq_len
):
block_number
=
seq_group_metadata
.
cross_block_table
[
i
//
self
.
block_size
]
block_offset
=
i
%
self
.
block_size
slot
=
block_number
*
self
.
block_size
+
block_offset
cross_slot_mapping
.
append
(
slot
)
# Build encoder input tokens
encoder_input_tokens
.
extend
(
token_ids
)
encoder_input_positions
.
extend
(
list
(
range
(
0
,
seq_len
)))
# Convert tokens/positions & cross-attention
# slot-mapping to encoder input tensors
encoder_input_tokens_tensor
=
self
.
_list_to_long_tensor
(
encoder_input_tokens
)
encoder_input_positions_tensor
=
self
.
_list_to_long_tensor
(
encoder_input_positions
)
cross_slot_mapping_tensor
=
self
.
_list_to_long_tensor
(
cross_slot_mapping
)
else
:
# Decode phase.
encoder_input_tokens_tensor
=
self
.
_empty_long_tensor
()
encoder_input_positions_tensor
=
self
.
_empty_long_tensor
()
cross_slot_mapping_tensor
=
self
.
_empty_long_tensor
()
# Extract cross-attention block tables &
# seq len from each sequence group metadata.
# Cross-attention block tables are empty
# during vLLM memory profiling.
cross_block_tables
=
[]
for
seq_group_metadata
in
seq_group_metadata_list
:
for
_
in
range
(
len
(
seq_group_metadata
.
seq_data
)):
encoder_seq_lens
.
append
(
seq_group_metadata
.
encoder_seq_data
.
get_len
())
cross_block_table
=
seq_group_metadata
.
cross_block_table
cross_block_tables
.
append
([]
if
(
cross_block_table
is
None
)
else
cross_block_table
)
max_len_of_block_table
=
max
(
len
(
block_table
)
for
block_table
in
cross_block_tables
)
cross_block_tables
=
make_tensor_with_pad
(
cross_block_tables
,
max_len
=
max_len_of_block_table
,
pad
=
0
,
dtype
=
torch
.
int32
,
device
=
self
.
device
,
)
# Compute encoder sequence lengths & encoder
# sequence starting offset tensors
max_encoder_seq_len
=
max
(
encoder_seq_lens
,
default
=
0
)
encoder_seq_lens_tensor
=
self
.
_list_to_int32_tensor
(
encoder_seq_lens
)
encoder_seq_start_loc
=
torch
.
zeros
(
encoder_seq_lens_tensor
.
shape
[
0
]
+
1
,
dtype
=
torch
.
int32
,
device
=
self
.
device
)
torch
.
cumsum
(
encoder_seq_lens_tensor
,
dim
=
0
,
dtype
=
encoder_seq_start_loc
.
dtype
,
out
=
encoder_seq_start_loc
[
1
:])
# Update attention metadata with encoder-oriented attributes
attn_metadata
=
model_input
.
attn_metadata
assert
attn_metadata
is
not
None
(
attn_metadata
.
num_encoder_tokens
,
attn_metadata
.
encoder_seq_lens
,
attn_metadata
.
encoder_seq_lens_tensor
,
attn_metadata
.
max_encoder_seq_len
,
attn_metadata
.
cross_slot_mapping
,
attn_metadata
.
cross_block_tables
,
)
=
(
sum
(
encoder_seq_lens
),
encoder_seq_lens
,
encoder_seq_lens_tensor
,
max_encoder_seq_len
,
cross_slot_mapping_tensor
,
cross_block_tables
,
)
return
(
attn_metadata
,
encoder_input_tokens_tensor
,
encoder_input_positions_tensor
)
@
torch
.
no_grad
()
def
execute_model
(
self
,
model_input
:
EncoderDecoderModelInputForCPU
,
kv_caches
:
List
[
torch
.
Tensor
],
intermediate_tensors
:
Optional
[
IntermediateTensors
]
=
None
,
num_steps
:
int
=
1
,
)
->
Optional
[
List
[
SamplerOutput
]]:
if
num_steps
>
1
:
raise
ValueError
(
"CPU worker does not support multi-step execution."
)
model_executable
=
self
.
model
execute_model_kwargs
=
{
"input_ids"
:
model_input
.
input_tokens
,
"positions"
:
model_input
.
input_positions
,
"encoder_input_ids"
:
model_input
.
encoder_input_tokens
,
"encoder_positions"
:
model_input
.
encoder_input_positions
,
**
MultiModalKwargs
.
as_kwargs
(
model_input
.
multi_modal_kwargs
or
{},
device
=
self
.
device
,
),
"intermediate_tensors"
:
intermediate_tensors
,
}
with
set_forward_context
(
model_input
.
attn_metadata
,
self
.
vllm_config
,
model_input
.
virtual_engine
):
hidden_states
=
model_executable
(
**
execute_model_kwargs
)
# Compute the logits.
logits
=
self
.
model
.
compute_logits
(
hidden_states
,
model_input
.
sampling_metadata
)
# Only perform sampling in the driver worker.
if
not
self
.
is_driver_worker
:
return
[]
# Sample the next token.
output
=
self
.
sampler
(
logits
=
logits
,
sampling_metadata
=
model_input
.
sampling_metadata
,
)
return
[
output
]
vllm/worker/cpu_model_runner.py
deleted
100644 → 0
View file @
3b2aefb1
This diff is collapsed.
Click to expand it.
vllm/worker/cpu_pooling_model_runner.py
deleted
100644 → 0
View file @
3b2aefb1
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import
dataclasses
from
typing
import
Any
,
Dict
,
List
,
Optional
,
Tuple
,
Type
,
Union
import
torch
from
vllm.forward_context
import
set_forward_context
from
vllm.model_executor.pooling_metadata
import
PoolingMetadata
from
vllm.multimodal
import
MultiModalKwargs
from
vllm.pooling_params
import
PoolingParams
from
vllm.sequence
import
(
IntermediateTensors
,
PoolerOutput
,
SequenceData
,
SequenceGroupMetadata
)
from
vllm.worker.cpu_model_runner
import
(
CPUModelRunnerBase
,
ModelInputForCPU
,
ModelInputForCPUBuilder
)
@
dataclasses
.
dataclass
(
frozen
=
True
)
class
ModelInputForCPUWithPoolingMetadata
(
ModelInputForCPU
):
"""
Used by the CPUPoolingModelRunner.
"""
pooling_metadata
:
Optional
[
"PoolingMetadata"
]
=
None
class
CPUPoolingModelRunner
(
CPUModelRunnerBase
[
ModelInputForCPUWithPoolingMetadata
]):
_model_input_cls
:
Type
[
ModelInputForCPUWithPoolingMetadata
]
=
(
ModelInputForCPUWithPoolingMetadata
)
_builder_cls
:
Type
[
ModelInputForCPUBuilder
]
=
ModelInputForCPUBuilder
@
torch
.
inference_mode
()
def
execute_model
(
self
,
model_input
:
ModelInputForCPUWithPoolingMetadata
,
kv_caches
:
List
[
torch
.
Tensor
],
intermediate_tensors
:
Optional
[
IntermediateTensors
]
=
None
,
num_steps
:
int
=
1
,
)
->
Optional
[
Union
[
List
[
PoolerOutput
],
IntermediateTensors
]]:
if
num_steps
>
1
:
raise
ValueError
(
"CPU worker does not support multi-step execution."
)
model_executable
=
self
.
model
cross_enc_kwargs
=
{}
if
model_input
.
token_type_ids
is
not
None
:
cross_enc_kwargs
[
"token_type_ids"
]
=
model_input
.
token_type_ids
execute_model_kwargs
=
{
"input_ids"
:
model_input
.
input_tokens
,
"positions"
:
model_input
.
input_positions
,
**
MultiModalKwargs
.
as_kwargs
(
model_input
.
multi_modal_kwargs
or
{},
device
=
self
.
device
,
),
**
cross_enc_kwargs
,
"intermediate_tensors"
:
intermediate_tensors
,
}
with
set_forward_context
(
model_input
.
attn_metadata
,
self
.
vllm_config
,
model_input
.
virtual_engine
):
hidden_states
=
model_executable
(
**
execute_model_kwargs
)
# Only perform pooling in the driver worker.
if
not
self
.
is_driver_worker
:
return
[]
return
[
self
.
model
.
pooler
(
hidden_states
=
hidden_states
,
pooling_metadata
=
model_input
.
pooling_metadata
)
]
def
make_model_input_from_broadcasted_tensor_dict
(
self
,
tensor_dict
:
Dict
[
str
,
Any
])
->
ModelInputForCPUWithPoolingMetadata
:
return
ModelInputForCPUWithPoolingMetadata
.
from_broadcasted_tensor_dict
(
tensor_dict
,
attn_backend
=
self
.
attn_backend
,
)
def
prepare_model_input
(
self
,
seq_group_metadata_list
:
Optional
[
List
[
SequenceGroupMetadata
]],
virtual_engine
:
int
=
0
,
finished_requests_ids
:
Optional
[
List
[
str
]]
=
None
)
->
ModelInputForCPUWithPoolingMetadata
:
assert
seq_group_metadata_list
is
not
None
model_input
=
self
.
_prepare_model_input_tensors
(
seq_group_metadata_list
,
finished_requests_ids
)
# Prepare PoolingMetadata.
assert
model_input
.
seq_lens
is
not
None
pooling_metadata
=
self
.
_prepare_pooling
(
seq_group_metadata_list
,
model_input
.
seq_lens
)
return
dataclasses
.
replace
(
model_input
,
virtual_engine
=
virtual_engine
,
pooling_metadata
=
pooling_metadata
)
def
_prepare_pooling
(
self
,
seq_group_metadata_list
:
List
[
SequenceGroupMetadata
],
prompt_lens
:
List
[
int
],
)
->
PoolingMetadata
:
"""Prepare PoolingMetadata for the sequence group metadata list."""
seq_groups
:
List
[
Tuple
[
List
[
int
],
PoolingParams
]]
=
[]
for
i
,
seq_group_metadata
in
enumerate
(
seq_group_metadata_list
):
seq_ids
=
list
(
seq_group_metadata
.
seq_data
.
keys
())
pooling_params
=
seq_group_metadata
.
pooling_params
seq_groups
.
append
((
seq_ids
,
pooling_params
))
seq_data
:
Dict
[
int
,
SequenceData
]
=
{}
for
seq_group_metadata
in
seq_group_metadata_list
:
seq_data
.
update
(
seq_group_metadata
.
seq_data
)
pooling_metadata
=
PoolingMetadata
(
seq_groups
=
seq_groups
,
seq_data
=
seq_data
,
prompt_lens
=
prompt_lens
,
)
return
pooling_metadata
vllm/worker/cpu_worker.py
deleted
100644 → 0
View file @
3b2aefb1
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""A CPU worker class."""
import
os
from
importlib
import
util
from
typing
import
List
,
Optional
,
Set
,
Tuple
,
Type
import
torch
import
torch.distributed
import
vllm.envs
as
envs
from
vllm.attention
import
get_attn_backend
from
vllm.config
import
(
CacheConfig
,
DeviceConfig
,
ModelConfig
,
ParallelConfig
,
VllmConfig
)
from
vllm.distributed
import
(
ensure_model_parallel_initialized
,
init_distributed_environment
)
from
vllm.logger
import
init_logger
from
vllm.lora.request
import
LoRARequest
from
vllm.model_executor
import
set_random_seed
from
vllm.sequence
import
ExecuteModelRequest
from
vllm.utils
import
bind_kv_cache
from
vllm.worker.cpu_enc_dec_model_runner
import
CPUEncoderDecoderModelRunner
from
vllm.worker.cpu_model_runner
import
CPUModelRunner
,
CPUModelRunnerBase
from
vllm.worker.cpu_pooling_model_runner
import
CPUPoolingModelRunner
from
vllm.worker.worker_base
import
(
LocalOrDistributedWorkerBase
,
WorkerBase
,
WorkerInput
)
logger
=
init_logger
(
__name__
)
class
CPUCacheEngine
:
"""Manages the KV cache for CPU backend.
This class is responsible for initializing and managing CPU KV
caches. It also provides methods for performing KV cache operations, such
as copying.
"""
def
__init__
(
self
,
cache_config
:
CacheConfig
,
model_config
:
ModelConfig
,
parallel_config
:
ParallelConfig
,
device_config
:
DeviceConfig
)
->
None
:
assert
device_config
.
device_type
==
"cpu"
self
.
cache_config
=
cache_config
self
.
model_config
=
model_config
self
.
parallel_config
=
parallel_config
self
.
head_size
=
model_config
.
get_head_size
()
self
.
num_layers
=
model_config
.
get_num_layers
(
parallel_config
)
self
.
num_heads
=
model_config
.
get_num_kv_heads
(
parallel_config
)
self
.
block_size
=
cache_config
.
block_size
# Note: In CacheConfig, num_gpu_blocks actual is num_cpu_blocks
# for CPU backend, because we want to reuse KV cache management
# in the scheduler.
self
.
num_cpu_blocks
=
cache_config
.
num_gpu_blocks
self
.
dtype
=
CPUCacheEngine
.
get_kv_cache_dtype
(
cache_config
,
model_config
)
# Get attention backend.
self
.
attn_backend
=
get_attn_backend
(
self
.
model_config
.
get_head_size
(),
self
.
model_config
.
dtype
,
cache_config
.
cache_dtype
,
self
.
block_size
,
self
.
model_config
.
is_attention_free
,
use_mla
=
self
.
model_config
.
use_mla
,
)
# Initialize the cache.
self
.
cpu_cache
=
self
.
_allocate_kv_cache
(
self
.
num_cpu_blocks
)
def
_allocate_kv_cache
(
self
,
num_blocks
:
int
,
)
->
List
[
torch
.
Tensor
]:
"""Allocates KV cache on CPU."""
kv_cache_shape
=
self
.
attn_backend
.
get_kv_cache_shape
(
num_blocks
,
self
.
block_size
,
self
.
num_heads
,
self
.
head_size
)
kv_cache
:
List
[
torch
.
Tensor
]
=
[]
for
_
in
range
(
self
.
num_layers
):
kv_cache
.
append
(
torch
.
empty
(
kv_cache_shape
,
dtype
=
self
.
dtype
,
device
=
"cpu"
))
return
kv_cache
def
swap_in
(
self
,
src_to_dst
:
torch
.
Tensor
)
->
None
:
raise
NotImplementedError
(
"Swap is not supported in CPUCacheEngine."
)
def
swap_out
(
self
,
src_to_dst
:
torch
.
Tensor
)
->
None
:
raise
NotImplementedError
(
"Swap is not supported in CPUCacheEngine."
)
def
copy
(
self
,
src_to_dsts
:
torch
.
Tensor
)
->
None
:
self
.
attn_backend
.
copy_blocks
(
self
.
cpu_cache
,
src_to_dsts
)
@
staticmethod
def
get_kv_cache_dtype
(
cache_config
:
CacheConfig
,
model_config
:
ModelConfig
):
if
cache_config
.
cache_dtype
==
"auto"
:
return
model_config
.
dtype
elif
cache_config
.
cache_dtype
in
[
"fp8"
,
"fp8_e5m2"
]:
return
torch
.
float8_e5m2
else
:
raise
NotImplementedError
(
f
"Unsupported KV cache type "
f
"
{
cache_config
.
cache_dtype
}
."
)
@
staticmethod
def
get_cache_block_size
(
cache_config
:
CacheConfig
,
model_config
:
ModelConfig
,
parallel_config
:
ParallelConfig
,
)
->
int
:
head_size
=
model_config
.
get_head_size
()
num_heads
=
model_config
.
get_num_kv_heads
(
parallel_config
)
num_layers
=
model_config
.
get_num_layers
(
parallel_config
)
key_cache_block
=
cache_config
.
block_size
*
num_heads
*
head_size
value_cache_block
=
key_cache_block
if
not
model_config
.
use_mla
else
0
total
=
num_layers
*
(
key_cache_block
+
value_cache_block
)
dtype
=
CPUCacheEngine
.
get_kv_cache_dtype
(
cache_config
,
model_config
)
dtype_size
=
torch
.
tensor
([],
dtype
=
dtype
).
element_size
()
return
dtype_size
*
total
class
CPUWorker
(
LocalOrDistributedWorkerBase
):
"""A worker class that executes (a partition of) the model on a CPU socket.
Each worker is associated with a single CPU socket. The worker is
responsible for maintaining the KV cache and executing the model on the
CPU. In case of distributed inference, each worker is assigned a partition
of the model.
"""
def
__init__
(
self
,
vllm_config
:
VllmConfig
,
local_rank
:
int
,
rank
:
int
,
distributed_init_method
:
str
,
kv_cache_dtype
:
Optional
[
str
]
=
"auto"
,
is_driver_worker
:
bool
=
False
,
model_runner_cls
:
Optional
[
Type
[
CPUModelRunner
]]
=
None
,
)
->
None
:
WorkerBase
.
__init__
(
self
,
vllm_config
=
vllm_config
)
self
.
local_rank
=
local_rank
self
.
rank
=
rank
vllm_config
.
parallel_config
.
rank
=
rank
self
.
distributed_init_method
=
distributed_init_method
self
.
is_driver_worker
=
is_driver_worker
if
self
.
is_driver_worker
:
assert
self
.
rank
==
0
,
"The driver worker must have rank 0."
if
self
.
model_config
.
trust_remote_code
:
# note: lazy import to avoid importing torch before initializing
from
vllm.utils
import
init_cached_hf_modules
init_cached_hf_modules
()
# Setup OpenMP threads affinity.
omp_cpuids
=
envs
.
VLLM_CPU_OMP_THREADS_BIND
self
.
local_omp_cpuid
=
"all"
if
omp_cpuids
==
"auto"
:
self
.
local_omp_cpuid
=
self
.
get_cpus_id_binding_based_on_numa_nodes
(
)
else
:
self
.
local_omp_cpuid
=
omp_cpuids
.
split
(
"|"
)[
rank
]
# Return hidden states from target model if the draft model is an
# mlp_speculator
speculative_config
=
self
.
speculative_config
model_config
=
self
.
model_config
speculative_args
=
{}
if
speculative_config
is
None
\
or
(
speculative_config
.
draft_model_config
.
model
==
model_config
.
model
)
\
or
(
speculative_config
.
draft_model_config
.
hf_config
.
model_type
not
in
[
"medusa"
,
"mlp_speculator"
,
"eagle"
])
\
else
{
"return_hidden_states"
:
True
}
ModelRunnerClass
:
Type
[
CPUModelRunnerBase
]
=
CPUModelRunner
if
self
.
model_config
.
runner_type
==
"pooling"
:
ModelRunnerClass
=
CPUPoolingModelRunner
elif
self
.
model_config
.
is_encoder_decoder
:
ModelRunnerClass
=
CPUEncoderDecoderModelRunner
self
.
model_runner
:
CPUModelRunnerBase
=
ModelRunnerClass
(
vllm_config
=
vllm_config
,
kv_cache_dtype
=
kv_cache_dtype
,
is_driver_worker
=
is_driver_worker
,
**
speculative_args
,
)
if
model_runner_cls
is
not
None
:
self
.
model_runner
=
model_runner_cls
(
self
.
model_runner
)
# Uninitialized cache engine. Will be initialized by
# initialize_cache.
self
.
cache_engine
:
List
[
CPUCacheEngine
]
# Initialize cpu_cache as pooling models don't initialize kv_caches
self
.
cpu_cache
:
Optional
[
List
[
List
[
torch
.
Tensor
]]]
=
None
# Torch profiler. Enabled and configured through env vars:
# VLLM_TORCH_PROFILER_DIR=/path/to/save/trace
if
envs
.
VLLM_TORCH_PROFILER_DIR
:
torch_profiler_trace_dir
=
envs
.
VLLM_TORCH_PROFILER_DIR
logger
.
info
(
"Profiling enabled. Traces will be saved to: %s"
,
torch_profiler_trace_dir
)
self
.
profiler
=
torch
.
profiler
.
profile
(
activities
=
[
torch
.
profiler
.
ProfilerActivity
.
CPU
,
],
with_stack
=
True
,
on_trace_ready
=
torch
.
profiler
.
tensorboard_trace_handler
(
torch_profiler_trace_dir
,
use_gzip
=
True
))
else
:
self
.
profiler
=
None
def
start_profile
(
self
):
if
self
.
profiler
is
None
:
raise
RuntimeError
(
"Profiler is not enabled."
)
self
.
profiler
.
start
()
def
stop_profile
(
self
):
if
self
.
profiler
is
None
:
raise
RuntimeError
(
"Profiler is not enabled."
)
self
.
profiler
.
stop
()
def
init_device
(
self
)
->
None
:
if
self
.
local_omp_cpuid
!=
"all"
:
ret
=
torch
.
ops
.
_C_utils
.
init_cpu_threads_env
(
self
.
local_omp_cpuid
)
if
ret
:
logger
.
info
(
ret
)
# Note: unique identifier for creating allreduce shared memory
os
.
environ
[
"VLLM_DIST_IDENT"
]
=
self
.
distributed_init_method
.
split
(
":"
)[
-
1
]
self
.
device
=
torch
.
device
(
"cpu"
)
self
.
init_distributed_environment
()
# Set random seed.
set_random_seed
(
self
.
model_config
.
seed
)
def
load_model
(
self
):
self
.
model_runner
.
load_model
()
def
determine_num_available_blocks
(
self
)
->
Tuple
[
int
,
int
]:
"""Determine the number of blocks available for the KV cache.
This determines how many KV blocks can fit into the configured CPU
KV cache space.
Note that since vLLM assumes a block resides on GPU if it can be
modified, we return num_gpu_blocks=num_cpu_blocks and num_cpu_blocks=0.
This allows us to reuse the scheduler of vLLM without generalizing it
to different devices.
"""
# For CPU device, the block number will be calculated based on the
# cpu_kvcache_space.
cache_block_size
=
self
.
get_cache_block_size_bytes
()
num_cpu_blocks
=
int
(
self
.
cache_config
.
cpu_kvcache_space_bytes
//
cache_block_size
)
num_cpu_blocks
=
max
(
num_cpu_blocks
,
0
)
# Note: To reuse the cache management procedure,
# use cpu cache as 'gpu cache'.
num_gpu_blocks
=
num_cpu_blocks
num_cpu_blocks
=
0
return
num_gpu_blocks
,
num_cpu_blocks
def
initialize_cache
(
self
,
num_gpu_blocks
:
int
,
num_cpu_blocks
:
int
)
->
None
:
"""Initialize the KV cache. Currently, swappable CPU memory is not
supported.
Since this worker does not support GPUs, we use the num_gpu_blocks to
determine how many non-swappable CPU blocks to allocate.
"""
assert
(
num_cpu_blocks
==
0
),
f
"
{
type
(
self
)
}
does not support swappable cache"
# Note: To reuse the cache management procedure,
# use cpu cache as 'gpu cache'.
num_cpu_blocks
=
num_gpu_blocks
self
.
_validate_num_cpu_blocks
(
num_cpu_blocks
)
self
.
cache_config
.
num_gpu_blocks
=
num_cpu_blocks
self
.
cache_config
.
num_cpu_blocks
=
0
# Initialize the cache.
self
.
_init_cache_engine
()
def
add_lora
(
self
,
lora_request
:
LoRARequest
)
->
bool
:
return
self
.
model_runner
.
add_lora
(
lora_request
)
def
remove_lora
(
self
,
lora_id
:
int
)
->
bool
:
return
self
.
model_runner
.
remove_lora
(
lora_id
)
def
pin_lora
(
self
,
lora_id
:
int
)
->
bool
:
return
self
.
model_runner
.
pin_lora
(
lora_id
)
def
list_loras
(
self
)
->
Set
[
int
]:
return
self
.
model_runner
.
list_loras
()
def
_validate_num_cpu_blocks
(
self
,
num_cpu_blocks
:
int
)
->
None
:
"""Raise errors if the num_cpu_blocks is invalid.
"""
if
num_cpu_blocks
<=
0
:
raise
ValueError
(
"No available memory for the cache blocks. "
"Try increasing `VLLM_CPU_KVCACHE_SPACE` when "
"initializing the engine."
)
max_seq_len
=
self
.
cache_config
.
block_size
*
num_cpu_blocks
if
self
.
model_config
.
max_model_len
>
max_seq_len
:
raise
ValueError
(
f
"The model's max seq len (
{
self
.
model_config
.
max_model_len
}
) "
"is larger than the maximum number of tokens that can be "
f
"stored in KV cache (
{
max_seq_len
}
). Try increasing "
"`VLLM_CPU_KVCACHE_SPACE` or decreasing `max_model_len` when "
"initializing the engine."
)
def
_init_cache_engine
(
self
)
->
None
:
self
.
cache_engine
=
[
CPUCacheEngine
(
self
.
cache_config
,
self
.
model_config
,
self
.
parallel_config
,
self
.
device_config
)
for
_
in
range
(
self
.
parallel_config
.
pipeline_parallel_size
)
]
self
.
cpu_cache
=
[
self
.
cache_engine
[
ve
].
cpu_cache
for
ve
in
range
(
self
.
parallel_config
.
pipeline_parallel_size
)
]
bind_kv_cache
(
self
.
compilation_config
.
static_forward_context
,
self
.
cpu_cache
)
self
.
model_runner
.
block_size
=
self
.
cache_engine
[
0
].
block_size
assert
all
(
self
.
cpu_cache
[
ve
]
is
not
None
for
ve
in
range
(
self
.
parallel_config
.
pipeline_parallel_size
))
# Populate the cache to warmup the memory
for
ve
in
range
(
self
.
parallel_config
.
pipeline_parallel_size
):
for
layer_cache
in
self
.
cpu_cache
[
ve
]:
layer_cache
.
fill_
(
0
)
@
property
def
do_metadata_broadcast
(
self
)
->
bool
:
return
self
.
parallel_config
.
tensor_parallel_size
>
1
@
property
def
kv_cache
(
self
)
->
Optional
[
List
[
List
[
torch
.
Tensor
]]]:
return
self
.
cpu_cache
@
property
def
vocab_size
(
self
)
->
int
:
return
self
.
model_runner
.
vocab_size
@
property
def
max_model_len
(
self
)
->
int
:
return
self
.
model_config
.
max_model_len
def
execute_worker
(
self
,
worker_input
:
WorkerInput
,
)
->
None
:
if
(
worker_input
.
blocks_to_copy
is
not
None
and
worker_input
.
blocks_to_copy
.
numel
()
>
0
):
self
.
cache_engine
[
worker_input
.
virtual_engine
].
copy
(
worker_input
.
blocks_to_copy
)
@
torch
.
inference_mode
()
def
prepare_worker_input
(
self
,
execute_model_req
:
ExecuteModelRequest
)
->
WorkerInput
:
assert
execute_model_req
is
not
None
virtual_engine
:
int
=
execute_model_req
.
virtual_engine
num_seq_groups
:
int
=
len
(
execute_model_req
.
seq_group_metadata_list
)
blocks_to_copy
=
torch
.
tensor
(
execute_model_req
.
blocks_to_copy
,
device
=
"cpu"
,
dtype
=
torch
.
int64
).
view
(
-
1
,
2
)
assert
len
(
execute_model_req
.
blocks_to_swap_in
)
==
0
assert
len
(
execute_model_req
.
blocks_to_swap_out
)
==
0
return
WorkerInput
(
num_seq_groups
=
num_seq_groups
,
blocks_to_copy
=
blocks_to_copy
,
virtual_engine
=
virtual_engine
,
)
def
init_distributed_environment
(
self
)
->
None
:
"""Initialize the distributed environment."""
parallel_config
=
self
.
parallel_config
rank
=
self
.
rank
distributed_init_method
=
self
.
distributed_init_method
init_distributed_environment
(
world_size
=
parallel_config
.
world_size
,
rank
=
rank
,
distributed_init_method
=
distributed_init_method
,
backend
=
"gloo"
,
)
# A small all_reduce for warmup.
torch
.
distributed
.
all_reduce
(
torch
.
zeros
(
1
).
cpu
())
ensure_model_parallel_initialized
(
parallel_config
.
tensor_parallel_size
,
parallel_config
.
pipeline_parallel_size
)
def
get_cache_block_size_bytes
(
self
)
->
int
:
"""Return the size in bytes of a single KV cache block.
"""
return
CPUCacheEngine
.
get_cache_block_size
(
self
.
cache_config
,
self
.
model_config
,
self
.
parallel_config
)
def
get_cpus_id_binding_based_on_numa_nodes
(
self
)
->
str
:
"""Return CPUs id binding based on NUMA nodes.
"""
rank_to_cpus
=
self
.
local_omp_cpuid
# Setup OpenMP thread affinity based on NUMA nodes automatically
world_size
=
self
.
vllm_config
.
parallel_config
.
world_size
libnuma_found
=
util
.
find_spec
(
"numa"
)
is
not
None
psutil_found
=
util
.
find_spec
(
"psutil"
)
is
not
None
if
libnuma_found
and
psutil_found
:
import
psutil
from
numa
import
info
cpu_count
=
psutil
.
cpu_count
(
logical
=
False
)
cpus_allow_list
=
psutil
.
Process
().
cpu_affinity
()
numa_size
=
info
.
get_num_configured_nodes
()
cpu_count_per_numa
=
cpu_count
//
numa_size
num_of_reserved_cpu
=
min
(
envs
.
VLLM_CPU_NUM_OF_RESERVED_CPU
,
cpu_count_per_numa
//
2
)
# check allow node_to_cpus list
node_to_cpus
=
[]
for
i
in
range
(
numa_size
):
node_intersect
=
set
(
info
.
node_to_cpus
(
i
)).
intersection
(
cpus_allow_list
)
if
bool
(
node_intersect
):
node_to_cpus
.
append
(
list
(
node_intersect
))
if
world_size
>
len
(
node_to_cpus
):
logger
.
error
(
"Auto thread-binding failed due to "
"world size: %d is larger than "
"allowed NUMA nodes number: %d."
"Please try to bind threads manually."
,
world_size
,
len
(
node_to_cpus
))
else
:
end
=
cpu_count_per_numa
-
num_of_reserved_cpu
rank_to_cpus_list
=
node_to_cpus
[
self
.
rank
][:
end
]
rank_to_cpus
=
','
.
join
(
str
(
x
)
for
x
in
rank_to_cpus_list
)
logger
.
info
(
"auto thread-binding list: %s"
,
rank_to_cpus
)
else
:
logger
.
warning
(
"Auto thread-binding is not supported due to "
"the lack of package numa and psutil,"
"fallback to no thread-binding. To get better performance,"
"please try to manually bind threads."
)
return
rank_to_cpus
vllm/worker/multi_step_tpu_worker.py
deleted
100644 → 0
View file @
3b2aefb1
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import
dataclasses
from
typing
import
Dict
,
Optional
,
Tuple
import
torch
from
vllm.distributed
import
broadcast_tensor_dict
from
vllm.sequence
import
ExecuteModelRequest
from
vllm.worker.tpu_model_runner
import
ModelInputForTPU
from
vllm.worker.tpu_worker
import
TPUWorker
from
vllm.worker.worker_base
import
WorkerInput
class
MultiStepTPUWorker
(
TPUWorker
):
def
__init__
(
self
,
*
args
,
**
kwargs
):
super
().
__init__
(
*
args
,
**
kwargs
)
self
.
cached_model_input
:
Optional
[
ModelInputForTPU
]
=
None
def
_get_driver_input_and_broadcast
(
self
,
execute_model_req
:
ExecuteModelRequest
)
->
Tuple
[
ModelInputForTPU
,
WorkerInput
,
Dict
[
str
,
torch
.
Tensor
]]:
assert
self
.
is_driver_worker
assert
execute_model_req
.
virtual_engine
==
0
is_first_multi_step
=
execute_model_req
.
is_first_multi_step
is_last_step
=
execute_model_req
.
is_last_step
if
is_first_multi_step
:
worker_input
:
WorkerInput
=
self
.
prepare_worker_input
(
execute_model_req
=
execute_model_req
)
worker_input
=
dataclasses
.
replace
(
worker_input
,
num_steps
=
execute_model_req
.
num_lookahead_slots
+
1
)
model_input
:
ModelInputForTPU
=
(
self
.
model_runner
.
prepare_model_input
(
execute_model_req
.
seq_group_metadata_list
,
execute_model_req
.
virtual_engine
,
execute_model_req
.
finished_requests_ids
))
if
execute_model_req
.
async_callback
:
model_input
=
dataclasses
.
replace
(
model_input
,
async_callback
=
execute_model_req
.
async_callback
)
else
:
assert
self
.
cached_model_input
is
not
None
model_input
=
self
.
cached_model_input
worker_input
=
WorkerInput
()
model_input
=
dataclasses
.
replace
(
model_input
,
is_first_multi_step
=
is_first_multi_step
,
is_last_step
=
is_last_step
)
if
self
.
do_metadata_broadcast
:
if
is_first_multi_step
:
broadcast_data
=
worker_input
.
as_broadcastable_tensor_dict
()
broadcast_data
.
update
(
model_input
.
as_broadcastable_tensor_dict
())
broadcast_tensor_dict
(
broadcast_data
,
src
=
0
)
else
:
broadcast_data
=
{
"is_first_multi_step"
:
is_first_multi_step
,
"is_last_step"
:
is_last_step
,
}
broadcast_tensor_dict
(
broadcast_data
,
src
=
0
)
# Retuning empty dict here to keep this compatible with
# `LocalOrDistributedWorkerBase._get_driver_input_and_broadcast`
return
model_input
,
worker_input
,
{}
def
prepare_input
(
self
,
execute_model_req
:
Optional
[
ExecuteModelRequest
]
=
None
,
)
->
Optional
[
Tuple
[
ModelInputForTPU
,
WorkerInput
,
Dict
[
str
,
torch
.
Tensor
]]]:
if
self
.
is_driver_worker
:
if
execute_model_req
is
None
:
if
self
.
do_metadata_broadcast
:
broadcast_tensor_dict
({},
src
=
0
)
return
None
model_input
,
worker_input
,
_
=
self
.
_get_driver_input_and_broadcast
(
execute_model_req
)
if
model_input
.
is_first_multi_step
:
self
.
cached_model_input
=
model_input
return
model_input
,
worker_input
,
{}
else
:
broadcast_data
=
broadcast_tensor_dict
(
src
=
0
)
if
not
broadcast_data
:
return
None
if
len
(
broadcast_data
)
==
2
:
assert
self
.
cached_model_input
is
not
None
self
.
cached_model_input
=
dataclasses
.
replace
(
self
.
cached_model_input
,
is_first_multi_step
=
broadcast_data
[
"is_first_multi_step"
],
is_last_step
=
broadcast_data
[
"is_last_step"
])
empty_worker_input
=
WorkerInput
()
return
self
.
cached_model_input
,
empty_worker_input
,
{}
worker_input
=
WorkerInput
.
from_broadcasted_tensor_dict
(
broadcast_data
)
model_input
=
(
self
.
model_runner
.
make_model_input_from_broadcasted_tensor_dict
(
broadcast_data
))
self
.
cached_model_input
=
model_input
return
model_input
,
worker_input
,
{}
vllm/worker/tpu_model_runner.py
deleted
100644 → 0
View file @
3b2aefb1
This diff is collapsed.
Click to expand it.
vllm/worker/tpu_worker.py
deleted
100644 → 0
View file @
3b2aefb1
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import
os
from
typing
import
List
,
Optional
,
Tuple
,
Union
import
torch
import
torch_xla.core.xla_model
as
xm
import
torch_xla.debug.profiler
as
xp
import
torch_xla.runtime
as
xr
import
vllm.envs
as
envs
from
vllm.config
import
VllmConfig
from
vllm.distributed
import
(
ensure_model_parallel_initialized
,
init_distributed_environment
)
from
vllm.logger
import
init_logger
from
vllm.model_executor
import
set_random_seed
from
vllm.sequence
import
ExecuteModelRequest
from
vllm.utils
import
STR_DTYPE_TO_TORCH_DTYPE
,
bind_kv_cache
,
get_dtype_size
from
vllm.worker.tpu_model_runner
import
ExecutionMode
,
TPUModelRunner
from
vllm.worker.worker_base
import
(
LocalOrDistributedWorkerBase
,
LoRANotSupportedWorkerBase
,
WorkerBase
,
WorkerInput
)
logger
=
init_logger
(
__name__
)
class
TPUWorker
(
LoRANotSupportedWorkerBase
,
LocalOrDistributedWorkerBase
):
def
__init__
(
self
,
vllm_config
:
VllmConfig
,
local_rank
:
int
,
rank
:
int
,
distributed_init_method
:
str
,
is_driver_worker
:
bool
,
)
->
None
:
WorkerBase
.
__init__
(
self
,
vllm_config
=
vllm_config
)
self
.
parallel_config
.
rank
=
rank
self
.
local_rank
=
local_rank
self
.
rank
=
rank
self
.
distributed_init_method
=
distributed_init_method
self
.
is_driver_worker
=
is_driver_worker
assert
self
.
device_config
.
device_type
==
"tpu"
if
self
.
cache_config
.
cache_dtype
==
"auto"
:
self
.
cache_dtype
=
self
.
model_config
.
dtype
else
:
self
.
cache_dtype
=
STR_DTYPE_TO_TORCH_DTYPE
[
self
.
cache_config
.
cache_dtype
]
self
.
model_runner
:
TPUModelRunner
=
TPUModelRunner
(
vllm_config
=
vllm_config
,
is_driver_worker
=
is_driver_worker
)
if
self
.
model_config
.
seed
is
None
:
self
.
model_config
.
seed
=
0
if
vllm_config
.
lora_config
is
not
None
:
raise
NotImplementedError
(
"The V0 TPU backend doesn't support LoRA serving"
)
def
init_device
(
self
)
->
None
:
os
.
environ
[
"PJRT_DEVICE"
]
=
"TPU"
torch
.
set_grad_enabled
(
False
)
torch
.
set_default_dtype
(
self
.
model_config
.
dtype
)
# NOTE(woosuk): This is just to initialize the TP group and broadcast
# the input objects on CPU. The all-reduce and all-gather ops on TPU
# are invoked by `xm.all_reduce` and `xm.all_gather` which use their
# own context.
init_distributed_environment
(
world_size
=
self
.
parallel_config
.
world_size
,
rank
=
self
.
rank
,
local_rank
=
self
.
local_rank
,
distributed_init_method
=
self
.
distributed_init_method
,
backend
=
"gloo"
,
)
ensure_model_parallel_initialized
(
self
.
parallel_config
.
tensor_parallel_size
,
self
.
parallel_config
.
pipeline_parallel_size
)
# Device initialization should happen after initializing the distributed
# runtime.
self
.
device
=
xm
.
xla_device
()
self
.
device_config
.
device
=
self
.
device
# Set random seed.
set_random_seed
(
self
.
model_config
.
seed
)
xm
.
set_rng_state
(
self
.
model_config
.
seed
,
self
.
device
)
# Increase the cache size limit, which is the maximum number of
# dynamo graphs that can be compiled.
# NOTE(woosuk): Usually, we compile 10-15 graphs for prefill and
# 30-40 graphs for decode. 128 is an arbitrary safe number.
torch
.
_dynamo
.
config
.
cache_size_limit
=
128
# Use persistent cache to avoid XLA recompilation.
# NOTE(woosuk): Set per-rank cache path since different ranks
# can have slightly different XLA graphs.
world_size
=
self
.
parallel_config
.
world_size
rank
=
xr
.
global_ordinal
()
# The PyTorch/XLA compilation cache uses the Torch IR to generate keys.
# Consequently, changes in optimization flags, which affect compilation
# results, don't change the cache key. This can result in the wrong
# compilation being used. To prevent this, disabling the XLA compilation
# cache during development is recommended.We can disable it by
# `export VLLM_XLA_CACHE_PATH=`
if
envs
.
VLLM_XLA_CACHE_PATH
:
per_rank_path
=
os
.
path
.
join
(
envs
.
VLLM_XLA_CACHE_PATH
,
f
"tp
{
world_size
}
_rank
{
rank
}
"
)
xr
.
initialize_cache
(
per_rank_path
,
readonly
=
False
)
self
.
profiler
=
None
if
envs
.
VLLM_TORCH_PROFILER_DIR
and
self
.
rank
<
1
:
# For TPU, we can only have 1 active profiler session for 1 profiler
# server. So we only profile on rank0.
self
.
profile_dir
=
envs
.
VLLM_TORCH_PROFILER_DIR
logger
.
info
(
"Profiling enabled. Traces will be saved to: %s"
,
self
.
profile_dir
)
self
.
profiler
=
xp
.
start_server
(
9012
)
def
start_profile
(
self
):
if
self
.
rank
<
1
:
if
self
.
profiler
is
None
:
raise
RuntimeError
(
"Profiler is not enabled."
)
xp
.
start_trace
(
self
.
profile_dir
)
def
stop_profile
(
self
):
if
self
.
rank
<
1
:
if
self
.
profiler
is
None
:
raise
RuntimeError
(
"Profiler is not enabled."
)
xp
.
stop_trace
()
def
load_model
(
self
):
self
.
model_runner
.
load_model
()
def
determine_num_available_blocks
(
self
)
->
Tuple
[
int
,
int
]:
num_layers
=
self
.
model_config
.
get_num_layers
(
self
.
parallel_config
)
head_size
=
self
.
model_config
.
get_head_size
()
num_kv_heads
=
self
.
model_config
.
get_num_kv_heads
(
self
.
parallel_config
)
# use an empty tensor instead of `None`` to force Dynamo to pass
# it by reference, rather by specializing on the value ``None``.
# the `dtype` argument does not matter, and we use `float32` as
# a placeholder (it has wide hardware support).
kv_caches
=
[(
torch
.
tensor
([],
dtype
=
torch
.
float32
,
device
=
self
.
device
),
torch
.
tensor
([],
dtype
=
torch
.
float32
,
device
=
self
.
device
))
for
_
in
range
(
num_layers
)]
bind_kv_cache
(
self
.
compilation_config
.
static_forward_context
,
[
kv_caches
])
self
.
model_runner
.
_dummy_run
(
batch_size
=
1
,
seq_len
=
self
.
scheduler_config
.
max_num_batched_tokens
,
kv_caches
=
kv_caches
,
exec_mode
=
ExecutionMode
.
PREFILL
,
)
# Synchronize before measuring the memory usage.
xm
.
wait_device_ops
()
# Get the maximum amount of memory used by the model weights and
# intermediate activations.
m
=
xm
.
get_memory_info
(
self
.
device
)
total_memory_size
=
m
[
"bytes_limit"
]
profiled
=
m
[
"peak_bytes_used"
]
# Weights + intermediate activations.
# Calculate the TPU KV cache size based on profiling.
usable_memory_size
=
int
(
total_memory_size
*
self
.
cache_config
.
gpu_memory_utilization
)
tpu_kv_cache_bytes
=
max
(
usable_memory_size
-
profiled
,
0
)
dtype_bytes
=
get_dtype_size
(
self
.
cache_dtype
)
block_size_bytes
=
(
dtype_bytes
*
self
.
cache_config
.
block_size
*
num_layers
*
2
*
head_size
*
num_kv_heads
)
num_tpu_blocks
=
tpu_kv_cache_bytes
//
block_size_bytes
num_tpu_blocks
=
(
num_tpu_blocks
//
8
)
*
8
# Round down to 8.
# Calculate the CPU KV cache size based on the config.
num_cpu_blocks
=
int
(
self
.
cache_config
.
swap_space_bytes
//
block_size_bytes
)
num_cpu_blocks
=
(
num_cpu_blocks
//
8
)
*
8
# Round down to 8.
return
num_tpu_blocks
,
num_cpu_blocks
def
initialize_cache
(
self
,
num_gpu_blocks
:
int
,
num_cpu_blocks
:
int
,
)
->
None
:
self
.
cache_config
.
num_gpu_blocks
=
num_gpu_blocks
self
.
cache_config
.
num_cpu_blocks
=
num_cpu_blocks
self
.
block_size
=
self
.
cache_config
.
block_size
dtype
=
self
.
cache_dtype
num_layers
=
self
.
model_config
.
get_num_layers
(
self
.
parallel_config
)
num_kv_heads
=
self
.
model_config
.
get_num_kv_heads
(
self
.
parallel_config
)
head_size
=
self
.
model_config
.
get_head_size
()
self
.
cpu_cache
:
List
[
Tuple
[
torch
.
Tensor
,
torch
.
Tensor
]]
=
[]
self
.
tpu_cache
:
List
[
Tuple
[
torch
.
Tensor
,
torch
.
Tensor
]]
=
[]
tpu_cache_shape
=
self
.
model_runner
.
attn_backend
.
get_kv_cache_shape
(
num_gpu_blocks
,
self
.
block_size
,
num_kv_heads
,
head_size
)
cpu_cache_shape
=
self
.
model_runner
.
attn_backend
.
get_kv_cache_shape
(
num_cpu_blocks
,
self
.
block_size
,
num_kv_heads
,
head_size
)
for
_
in
range
(
num_layers
):
tpu_k_cache
=
torch
.
zeros
(
tpu_cache_shape
,
dtype
=
dtype
,
device
=
self
.
device
)
tpu_v_cache
=
torch
.
zeros_like
(
tpu_k_cache
)
self
.
tpu_cache
.
append
((
tpu_k_cache
,
tpu_v_cache
))
cpu_k_cache
=
torch
.
zeros
(
cpu_cache_shape
,
dtype
=
dtype
,
device
=
"cpu"
)
cpu_v_cache
=
torch
.
zeros_like
(
cpu_k_cache
)
self
.
cpu_cache
.
append
((
cpu_k_cache
,
cpu_v_cache
))
bind_kv_cache
(
self
.
compilation_config
.
static_forward_context
,
[
self
.
tpu_cache
])
self
.
_warmup_model
()
def
_warmup_model
(
self
)
->
None
:
# FIXME(woosuk): Here we are abusing `enforce_eager` which is defined
# for CUDA graphs. We should refactor this part.
if
not
self
.
model_config
.
enforce_eager
:
# Warm up the model with all possible input shapes so that
# compilation never happens during the actual execution.
# This may take ~30 mins for the first run and ~20 mins for the
# subsequent runs.
# If `enforce_eager` is True, the ahead-of-time compilation is
# skipped and the compilation happens during the actual execution,
# which is bad for performance but useful for development.
self
.
model_runner
.
warmup_model
(
self
.
tpu_cache
)
def
get_cache_block_size_bytes
(
self
)
->
int
:
head_size
=
self
.
model_config
.
get_head_size
()
num_heads
=
self
.
model_config
.
get_num_kv_heads
(
self
.
parallel_config
)
num_layers
=
self
.
model_config
.
get_num_layers
(
self
.
parallel_config
)
key_cache_block
=
self
.
cache_config
.
block_size
*
num_heads
*
head_size
value_cache_block
=
key_cache_block
total
=
num_layers
*
(
key_cache_block
+
value_cache_block
)
dtype_size
=
get_dtype_size
(
self
.
cache_dtype
)
return
dtype_size
*
total
@
property
def
do_metadata_broadcast
(
self
)
->
bool
:
return
self
.
parallel_config
.
tensor_parallel_size
>
1
@
property
def
kv_cache
(
self
)
->
Optional
[
List
[
List
[
torch
.
Tensor
]]]:
# NOTE(woosuk): This assumes virtual_engine == 0, i.e., no pipeline
# parallelism.
return
[
self
.
tpu_cache
]
def
prepare_worker_input
(
self
,
execute_model_req
:
ExecuteModelRequest
,
)
->
WorkerInput
:
virtual_engine
=
execute_model_req
.
virtual_engine
num_seq_groups
=
len
(
execute_model_req
.
seq_group_metadata_list
)
blocks_to_swap_in
=
_make_src_to_dst
(
execute_model_req
.
blocks_to_swap_in
,
"cpu"
,
self
.
device
)
blocks_to_swap_out
=
_make_src_to_dst
(
execute_model_req
.
blocks_to_swap_out
,
self
.
device
,
"cpu"
)
blocks_to_copy
=
_make_src_to_dst
(
execute_model_req
.
blocks_to_copy
,
self
.
device
,
self
.
device
)
return
WorkerInput
(
num_seq_groups
=
num_seq_groups
,
blocks_to_swap_in
=
blocks_to_swap_in
,
blocks_to_swap_out
=
blocks_to_swap_out
,
blocks_to_copy
=
blocks_to_copy
,
virtual_engine
=
virtual_engine
,
)
def
execute_worker
(
self
,
worker_input
:
WorkerInput
)
->
None
:
virtual_engine
=
worker_input
.
virtual_engine
assert
virtual_engine
==
0
attn_backend
=
self
.
model_runner
.
attn_backend
num_layers
=
self
.
model_config
.
get_num_layers
(
self
.
parallel_config
)
# Issue cache operations.
if
worker_input
.
blocks_to_swap_in
is
not
None
:
src_indices
,
dst_indices
=
worker_input
.
blocks_to_swap_in
if
src_indices
.
numel
()
>
0
:
# Swap from CPU to TPU.
for
i
in
range
(
num_layers
):
tpu_k_cache
,
tpu_v_cache
=
self
.
tpu_cache
[
i
]
cpu_k_cache
,
cpu_v_cache
=
self
.
cpu_cache
[
i
]
k
=
cpu_k_cache
[:,
src_indices
].
to
(
self
.
device
)
v
=
cpu_v_cache
[:,
src_indices
].
to
(
self
.
device
)
_insert_kv
(
k
,
v
,
dst_indices
,
tpu_k_cache
,
tpu_v_cache
)
if
worker_input
.
blocks_to_swap_out
is
not
None
:
src_indices
,
dst_indices
=
worker_input
.
blocks_to_swap_out
if
src_indices
.
numel
()
>
0
:
# Swap from TPU to CPU.
for
i
in
range
(
num_layers
):
tpu_k_cache
,
tpu_v_cache
=
self
.
tpu_cache
[
i
]
cpu_k_cache
,
cpu_v_cache
=
self
.
cpu_cache
[
i
]
cpu_k_cache
[:,
dst_indices
]
=
tpu_k_cache
[:,
src_indices
]
cpu_v_cache
[:,
dst_indices
]
=
tpu_v_cache
[:,
src_indices
]
if
worker_input
.
blocks_to_copy
is
not
None
:
src_indices
,
dst_indices
=
worker_input
.
blocks_to_copy
if
src_indices
.
numel
()
>
0
:
attn_backend
.
copy_blocks
(
self
.
tpu_cache
,
(
src_indices
,
dst_indices
))
def
_make_src_to_dst
(
mapping
:
List
[
Tuple
[
int
,
int
]],
src_device
:
Union
[
torch
.
device
,
str
],
dst_device
:
Union
[
torch
.
device
,
str
],
)
->
Optional
[
Tuple
[
torch
.
Tensor
,
torch
.
Tensor
]]:
if
not
mapping
:
return
None
src_indices
=
[
i
for
i
,
_
in
mapping
]
dst_indices
=
[
i
for
_
,
i
in
mapping
]
src_indices
=
torch
.
tensor
(
src_indices
,
device
=
src_device
,
dtype
=
torch
.
int64
)
dst_indices
=
torch
.
tensor
(
dst_indices
,
device
=
dst_device
,
dtype
=
torch
.
int64
)
return
src_indices
,
dst_indices
@
torch
.
compile
(
backend
=
"openxla"
)
def
_insert_kv
(
k
:
torch
.
Tensor
,
v
:
torch
.
Tensor
,
indices
:
torch
.
Tensor
,
tpu_k_cache
:
torch
.
Tensor
,
tpu_v_cache
:
torch
.
Tensor
,
)
->
None
:
torch
.
ops
.
xla
.
dynamo_set_buffer_donor_
(
tpu_k_cache
,
True
)
torch
.
ops
.
xla
.
dynamo_set_buffer_donor_
(
tpu_v_cache
,
True
)
tpu_k_cache
[:,
indices
]
=
k
tpu_v_cache
[:,
indices
]
=
v
vllm/worker/xpu_model_runner.py
deleted
100644 → 0
View file @
3b2aefb1
This diff is collapsed.
Click to expand it.
vllm/worker/xpu_worker.py
deleted
100644 → 0
View file @
3b2aefb1
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""A XPU worker class."""
import
gc
import
os
from
typing
import
List
,
Optional
,
Tuple
import
intel_extension_for_pytorch
# noqa: F401
import
oneccl_bindings_for_pytorch
# noqa: F401
import
torch
import
torch.distributed
from
vllm.config
import
VllmConfig
from
vllm.distributed
import
(
ensure_model_parallel_initialized
,
init_distributed_environment
)
from
vllm.distributed.parallel_state
import
get_pp_group
from
vllm.logger
import
init_logger
from
vllm.model_executor
import
set_random_seed
from
vllm.platforms
import
current_platform
from
vllm.worker.cache_engine
import
CacheEngine
from
vllm.worker.worker
import
Worker
from
vllm.worker.worker_base
import
LoRANotSupportedWorkerBase
,
WorkerBase
from
vllm.worker.xpu_model_runner
import
XPUModelRunner
logger
=
init_logger
(
__name__
)
class
XPUWorker
(
LoRANotSupportedWorkerBase
,
Worker
):
"""A worker class that executes (a partition of) the model on a GPU.
Each worker is associated with a single XPU device. The worker is
responsible for maintaining the KV cache and executing the model on the
XPU. In case of distributed inference, each worker is assigned a partition
of the model.
"""
def
__init__
(
self
,
vllm_config
:
VllmConfig
,
local_rank
:
int
,
rank
:
int
,
distributed_init_method
:
str
,
is_driver_worker
:
bool
=
False
,
)
->
None
:
WorkerBase
.
__init__
(
self
,
vllm_config
=
vllm_config
)
device_config
=
self
.
device_config
parallel_config
=
self
.
parallel_config
assert
device_config
.
device_type
==
"xpu"
assert
current_platform
.
is_xpu
()
self
.
parallel_config
.
rank
=
rank
self
.
local_rank
=
local_rank
self
.
rank
=
rank
self
.
distributed_init_method
=
distributed_init_method
self
.
is_driver_worker
=
is_driver_worker
if
parallel_config
and
is_driver_worker
:
assert
rank
%
parallel_config
.
tensor_parallel_size
==
0
,
\
"Driver worker should be rank 0 of tensor parallel group."
self
.
model_runner
=
XPUModelRunner
(
# type: ignore
vllm_config
=
vllm_config
,
kv_cache_dtype
=
self
.
cache_config
.
cache_dtype
,
is_driver_worker
=
is_driver_worker
,
)
# Uninitialized cache engine. Will be initialized by
# initialize_cache.
self
.
cache_engine
:
List
[
CacheEngine
]
self
.
gpu_cache
:
Optional
[
List
[
List
[
torch
.
Tensor
]]]
def
init_device
(
self
)
->
None
:
if
self
.
device_config
.
device
.
type
==
"xpu"
and
current_platform
.
is_xpu
(
):
self
.
device
=
torch
.
device
(
f
"xpu:
{
self
.
local_rank
}
"
)
torch
.
xpu
.
set_device
(
self
.
device
)
torch
.
xpu
.
empty_cache
()
self
.
init_gpu_memory
=
torch
.
xpu
.
get_device_properties
(
self
.
local_rank
).
total_memory
else
:
raise
RuntimeError
(
f
"Not support device type:
{
self
.
device_config
.
device
}
"
)
# Initialize the distributed environment.
self
.
init_worker_distributed_environment
()
# Initialize the model.
set_random_seed
(
self
.
model_config
.
seed
)
# keep this method for `empty_cache` and `synchronize` api
@
torch
.
inference_mode
()
def
determine_num_available_blocks
(
self
)
->
Tuple
[
int
,
int
]:
"""Profiles the peak memory usage of the model to determine how many
KV blocks may be allocated without OOMs.
The engine will first conduct a profiling of the existing memory usage.
Then, it calculate the maximum possible number of GPU and CPU blocks
that can be allocated with the remaining free memory.
Tip:
You may limit the usage of GPU memory
by adjusting the `gpu_memory_utilization` parameter.
"""
# Profile the memory usage of the model and get the maximum number of
# cache blocks that can be allocated with the remaining free memory.
torch
.
xpu
.
empty_cache
()
# Execute a forward pass with dummy inputs to profile the memory usage
# of the model.
self
.
model_runner
.
profile_run
()
# Calculate the number of blocks that can be allocated with the
# profiled peak memory.
torch
.
xpu
.
synchronize
()
used_memory
=
torch
.
xpu
.
memory_allocated
()
total_gpu_memory
=
torch
.
xpu
.
get_device_properties
(
self
.
local_rank
).
total_memory
free_gpu_memory
=
total_gpu_memory
-
used_memory
# NOTE(woosuk): Here we assume that the other processes using the same
# GPU did not change their memory usage during the profiling.
peak_memory
=
self
.
init_gpu_memory
-
free_gpu_memory
assert
peak_memory
>
0
,
(
"Error in memory profiling. "
f
"Initial free memory
{
self
.
init_gpu_memory
}
, current free memory"
f
"
{
free_gpu_memory
}
. This happens when the GPU memory was "
"not properly cleaned up before initializing the vLLM instance."
)
cache_block_size
=
self
.
get_cache_block_size_bytes
()
num_gpu_blocks
=
int
(
(
total_gpu_memory
*
self
.
cache_config
.
gpu_memory_utilization
-
peak_memory
)
//
cache_block_size
)
num_cpu_blocks
=
int
(
self
.
cache_config
.
swap_space_bytes
//
cache_block_size
)
num_gpu_blocks
=
max
(
num_gpu_blocks
,
0
)
num_cpu_blocks
=
max
(
num_cpu_blocks
,
0
)
gc
.
collect
()
torch
.
xpu
.
empty_cache
()
return
num_gpu_blocks
,
num_cpu_blocks
def
_warm_up_model
(
self
)
->
None
:
# IPEX don't support capture graph yet
pass
def
init_worker_distributed_environment
(
self
)
->
None
:
"""Initialize the distributed environment."""
parallel_config
=
self
.
parallel_config
rank
=
self
.
rank
distributed_init_method
=
self
.
distributed_init_method
if
torch
.
distributed
.
is_initialized
():
torch_world_size
=
torch
.
distributed
.
get_world_size
()
if
torch_world_size
!=
parallel_config
.
world_size
:
raise
RuntimeError
(
"torch.distributed is already initialized but the torch "
"world size does not match parallel_config.world_size "
f
"(
{
torch_world_size
}
vs.
{
parallel_config
.
world_size
}
)."
)
elif
not
distributed_init_method
:
raise
ValueError
(
"distributed_init_method must be set if torch.distributed "
"is not already initialized"
)
else
:
# use sockets as default Level zero IPC exchange backend. By
# default oneccl will use `drmfd` as mechanism which need extra
# dependency (libdrm and drm headers) on your system.
ENV_CCL_ATL_TRANSPORT
=
os
.
getenv
(
"CCL_ATL_TRANSPORT"
,
"ofi"
)
ENV_LOCAL_WORLD_SIZE
=
os
.
getenv
(
"LOCAL_WORLD_SIZE"
,
str
(
parallel_config
.
world_size
))
os
.
environ
[
"CCL_ATL_TRANSPORT"
]
=
ENV_CCL_ATL_TRANSPORT
os
.
environ
[
"LOCAL_WORLD_SIZE"
]
=
ENV_LOCAL_WORLD_SIZE
os
.
environ
[
"LOCAL_RANK"
]
=
str
(
self
.
local_rank
)
init_distributed_environment
(
world_size
=
parallel_config
.
world_size
,
rank
=
rank
,
distributed_init_method
=
distributed_init_method
,
local_rank
=
self
.
local_rank
,
backend
=
"ccl"
)
ensure_model_parallel_initialized
(
parallel_config
.
tensor_parallel_size
,
parallel_config
.
pipeline_parallel_size
)
# global all_reduce needed for overall oneccl warm up
torch
.
distributed
.
all_reduce
(
torch
.
zeros
(
1
).
xpu
())
if
parallel_config
.
pipeline_parallel_size
>
1
:
# Add pp group init to avoid
# p2p communication as the first call
get_pp_group
().
all_reduce
(
torch
.
zeros
(
1
).
xpu
())
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment