Commit 6d2051cc authored by zhuwenwen
Browse files

Merge tag 'v0.6.3.post1' into v0.6.3.post1-dev

parents 2c7f740a a2c71c54
import copy
import operator import operator
from typing import Callable, Dict, List, Optional, Tuple, Union
import torch import torch
import torch.fx as fx import torch.fx as fx
from vllm.logger import init_logger
from .compile_context import get_compile_context
from .levels import CompilationLevel
logger = init_logger(__name__)
def fix_functionalization(graph: fx.Graph): def fix_functionalization(graph: fx.Graph):
""" """
...@@ -148,9 +157,113 @@ def fix_functionalization(graph: fx.Graph): ...@@ -148,9 +157,113 @@ def fix_functionalization(graph: fx.Graph):
# print(graph.python_code(root_module="self", verbose=True).src, file=f) # print(graph.python_code(root_module="self", verbose=True).src, file=f)
def wrap_inductor(graph, example_inputs, additional_inductor_config):
    """Compile ``graph`` with Inductor's ``compile_fx``.

    A shallow copy of the current Inductor config is used as the set of
    config patches. Entries from ``additional_inductor_config`` (when it is
    not ``None``) are merged on top, and ``fix_functionalization`` is always
    installed as the ``post_grad_custom_post_pass``, replacing (with a
    warning) any pass that was already configured.

    Returns whatever ``compile_fx`` returns for ``graph`` and
    ``example_inputs``.
    """
    from torch._inductor import config as inductor_config

    patches = inductor_config.shallow_copy_dict()

    from torch._inductor.compile_fx import compile_fx

    if additional_inductor_config is not None:
        patches.update(additional_inductor_config)

    pass_key = 'post_grad_custom_post_pass'
    if patches[pass_key] is not None:
        # Only one custom post pass can be registered; ours takes priority.
        logger.warning(
            "post_grad_custom_post_pass is already set in the config. "
            "Overwriting it with the fix_functionalization")
    patches[pass_key] = fix_functionalization

    return compile_fx(graph, example_inputs, config_patches=patches)
def vllm_backend(
        graph,
        example_inputs,
        additional_inductor_config: Optional[Dict] = None) -> Callable:
    """Compile ``graph`` with Inductor and return a shape-dispatching callable.

    The graph is compiled once up front from ``example_inputs`` (the
    general-shape graph). On later calls, if the first symbolic size of the
    call is listed in the compile context (see ``get_compile_context``), a
    dedicated graph is compiled from that call's concrete arguments and
    cached per runtime-shape tuple; every other call runs the general-shape
    graph.

    Args:
        graph: the graph to compile (passed through to ``wrap_inductor``).
        example_inputs: example inputs for ``graph``; every position that
            holds a ``torch.SymInt`` is treated as a runtime-shape argument.
        additional_inductor_config: optional extra Inductor config entries,
            forwarded unchanged to ``wrap_inductor``.

    Returns:
        The wrapper callable that is handed back to Dynamo; it accepts the
        same positional arguments as the compiled graph.
    """
    context = get_compile_context()
    # Deep-copy so that later changes to the caller's context object cannot
    # alter the set of sizes this backend specializes for.
    context = copy.deepcopy(context) if context is not None else []
    sizes_to_specialize: List[int] = context

    # flags for all the seen shapes, whether we need to specialize
    runtime_shapes_to_compile_flags: Dict[Tuple[int, ...], bool] = {}

    # if we need to specialize, the compiled graph for that shape
    runtime_shapes_to_compiled_graph: Dict[Tuple[int, ...], Callable] = {}

    # this is the first compilation, we will compile a graph with
    # dynamic shape, as the caller will mark first dimension as dynamic
    logger.info("Compiling a graph for general shapes")
    graph_for_symbolic_shape = wrap_inductor(graph, example_inputs,
                                             additional_inductor_config)

    # TODO: Dynamo does not pass all dynamic shapes.
    # Need to investigate why. It works now because all the dynamic
    # shapes have the same value, and either of them can be used.
    sym_shape_indices = [
        i for i, x in enumerate(example_inputs) if isinstance(x, torch.SymInt)
    ]

    first_run = True

    # this is the function we return to Dynamo to run finally
    def compiled_graph_wrapper(*args):
        # Only ``first_run`` is rebound here; the two caches are mutated in
        # place and therefore need no ``nonlocal`` declaration.
        nonlocal first_run

        runtime_shapes: Tuple[int,
                              ...] = tuple(args[i] for i in sym_shape_indices)

        if first_run:
            # the first compilation is for profiling, we directly run it
            first_run = False
            return graph_for_symbolic_shape(*args)

        if not runtime_shapes:
            # No symbolic size was found among the example inputs, so there
            # is nothing to specialize on. Without this guard the
            # ``runtime_shapes[0]`` lookup below raises IndexError.
            return graph_for_symbolic_shape(*args)

        if runtime_shapes not in runtime_shapes_to_compile_flags:
            # we haven't seen this shape before
            # query if we need to specialize for this shape
            # we only specialize for the first dimension.
            # TODO: investigate if any model needs to specialize
            # beyond the first dimension
            runtime_shapes_to_compile_flags[runtime_shapes] = (
                runtime_shapes[0] in sizes_to_specialize)

        if not runtime_shapes_to_compile_flags[runtime_shapes]:
            # we don't need to specialize for this shape
            return graph_for_symbolic_shape(*args)

        if runtime_shapes not in runtime_shapes_to_compiled_graph:
            # we need to specialize for this shape and have not compiled a
            # graph for it yet: compile one from the concrete arguments
            logger.info("Compiling a graph for shapes %s", runtime_shapes)
            runtime_shapes_to_compiled_graph[runtime_shapes] = wrap_inductor(
                graph, args, additional_inductor_config)

        return runtime_shapes_to_compiled_graph[runtime_shapes](*args)

    return compiled_graph_wrapper
def select_default_backend(level: int) -> Union[str, Callable]:
    """Pick the ``torch.compile`` backend for a compilation level.

    ``DYNAMO_AS_IS`` and ``DYNAMO_ONCE`` map to the string ``"eager"``.
    ``INDUCTOR`` and ``INDUCTOR_MAX_AUTOTUNE`` map to ``vllm_backend`` bound
    to the additional Inductor configs obtained from
    ``get_inductor_additional_configs()``; for ``INDUCTOR_MAX_AUTOTUNE`` the
    ``max_autotune`` entry of that mapping is forced to ``True``.
    Any other level fails the assertion.
    """
    eager_levels = [
        CompilationLevel.DYNAMO_AS_IS, CompilationLevel.DYNAMO_ONCE
    ]
    if level in eager_levels:
        return "eager"

    assert level in [
        CompilationLevel.INDUCTOR, CompilationLevel.INDUCTOR_MAX_AUTOTUNE
    ], f"Invalid level {level}"

    from vllm.compilation.backends import vllm_backend
    from vllm.plugins import get_inductor_additional_configs

    inductor_configs = get_inductor_additional_configs()

    if level == CompilationLevel.INDUCTOR_MAX_AUTOTUNE:
        # Warn only when the key is present with a falsy value; the level
        # wins over that setting either way.
        if not inductor_configs.get("max_autotune", True):
            logger.warning(
                "max_autotune is disabled, but is overridden by level %s",
                CompilationLevel.INDUCTOR_MAX_AUTOTUNE)
        inductor_configs['max_autotune'] = True

    from functools import partial

    return partial(vllm_backend, additional_inductor_config=inductor_configs)
from contextlib import contextmanager
from typing import Any
# Module-wide slot holding the compile context that is currently in effect.
_compile_context: Any = None


def get_compile_context() -> Any:
    """Return the compile context currently in effect (``None`` if unset)."""
    return _compile_context


@contextmanager
def set_compile_context(context: Any):
    """Install ``context`` as the compile context for the ``with`` body.

    The context is usually a list of sizes to specialize. The value that was
    active before entering is put back on exit, including when the body
    raises, so nested uses unwind correctly.
    """
    global _compile_context
    saved = _compile_context
    _compile_context = context
    try:
        yield
    finally:
        # Always restore the outer context, even on error.
        _compile_context = saved
import inspect
from typing import Dict, List, Union
import torch
import vllm.envs as envs
from vllm.compilation.levels import CompilationLevel
from vllm.compilation.wrapper import TorchCompileWrapperWithCustomDispatcher
from vllm.sequence import IntermediateTensors
from vllm.utils import supports_dynamo
def support_torch_compile(dynamic_arg_dims: Dict[str, Union[int, List[int]]]):
    """
    Class decorator factory that adds `torch.compile` support to the
    `forward` method of the decorated class.

    `dynamic_arg_dims` maps names of `forward` arguments to the dimension(s)
    (a single integer or a list of integers) that should be marked dynamic
    for that argument.

    On the first compilation, each named argument is handled according to
    the value passed at call time:

    - a `torch.Tensor`: the given dimension(s) are marked as dynamic.
    - `None`: ignored.
    - an `IntermediateTensors`: the given dimension(s) are marked as dynamic
        on every tensor it holds.
    - anything else: a `ValueError` is raised.

    NOTE: if an argument is `None`, it should always be passed as `None`
    during the lifetime of the model, otherwise, it cannot be captured as a
    single computation graph.

    The returned decorator raises `TypeError` if the class has no `forward`
    attribute and `ValueError` if a key of `dynamic_arg_dims` is not a
    parameter of `forward`.
    """

    def cls_decorator_helper(cls: type):
        # Thin validation layer; the actual class rewriting is done by
        # `_support_torch_compile` to keep that function's nesting shallow.
        if not hasattr(cls, 'forward'):
            raise TypeError("decorated class should have a forward method.")

        forward_params = inspect.signature(cls.forward).parameters
        for arg_name in dynamic_arg_dims:
            if arg_name in forward_params:
                continue
            raise ValueError(
                f"Argument {arg_name} not found in the forward method of {cls}"
            )

        return _support_torch_compile(cls, dynamic_arg_dims)

    return cls_decorator_helper
def _support_torch_compile(cls: type,
                           dynamic_arg_dims: Dict[str, Union[int, List[int]]]):
    """
    Rewrite `cls` in place so that calling an instance runs the compiled
    `forward`, and return `cls`.

    The class is returned untouched when the compile level is
    `NO_COMPILATION` or `DYNAMO_AS_IS`, or when `supports_dynamo()` is false.
    Otherwise `TorchCompileWrapperWithCustomDispatcher` is appended to the
    bases, `__init__` is wrapped to also initialize that base, and `__call__`
    is replaced with the dispatch logic below.
    """
    # for CompilationLevel.DYNAMO_AS_IS , the upper level model runner
    # will handle the compilation, so we don't need to do anything here.
    untouched_levels = [
        CompilationLevel.NO_COMPILATION, CompilationLevel.DYNAMO_AS_IS
    ]
    if (envs.VLLM_TORCH_COMPILE_LEVEL in untouched_levels
            or not supports_dynamo()):
        return cls

    # take care of method resolution order
    # make sure super().__init__ is called on the base class
    #  other than TorchCompileWrapperWithCustomDispatcher
    cls.__bases__ = cls.__bases__ + (TorchCompileWrapperWithCustomDispatcher, )

    old_init = cls.__init__  # type: ignore

    def __init__(self, *args, **kwargs):
        # Run the class's own initializer first, then set up the wrapper.
        old_init(self, *args, **kwargs)
        TorchCompileWrapperWithCustomDispatcher.__init__(self)

    cls.__init__ = __init__  # type: ignore

    def __call__(self, *args, **kwargs):
        # torch.compiler.is_compiling() means we are inside the compilation
        # e.g. TPU has the compilation logic in model runner, so we don't
        # need to compile the model inside.
        if torch.compiler.is_compiling():
            return self.forward(*args, **kwargs)

        nothing_compiled_yet = len(self.compiled_codes) < 1

        # the first compilation needs to have dynamic shapes marked
        if nothing_compiled_yet:
            signature = inspect.signature(self.__class__.forward)
            bound = signature.bind(self, *args, **kwargs)
            bound.apply_defaults()
            for k, dims in dynamic_arg_dims.items():
                arg = bound.arguments.get(k)
                if arg is None:
                    # `None` arguments are skipped.
                    continue
                if isinstance(arg, torch.Tensor):
                    torch._dynamo.mark_dynamic(arg, dims)
                elif isinstance(arg, IntermediateTensors):
                    for tensor in arg.tensors.values():
                        torch._dynamo.mark_dynamic(tensor, dims)
                else:
                    raise ValueError(
                        "Unsupported dynamic dimensions"
                        f" {dims} for argument {k} with type {type(arg)}.")

        # if we don't use custom dispatcher, we can directly call the
        # compiled function and let torch.compile handle the dispatching,
        # with the overhead of guard evaluation and recompilation.
        if nothing_compiled_yet or not self.use_custom_dispatcher:
            return self.compiled_callable(*args, **kwargs)

        # usually, capturing the model once is enough, and then we can
        # dispatch to the compiled code directly, without going through
        # the Dynamo guard mechanism.
        with self.dispatch_to_code(0):
            return self.forward(*args, **kwargs)

    cls.__call__ = __call__  # type: ignore
    return cls
class CompilationLevel:
    """Integer constants naming the levels of the compilation process.

    The values are ordered, and code elsewhere in the project compares them
    numerically (e.g. ``level >= CompilationLevel.DYNAMO_ONCE``), so the
    numbering must stay stable.
    """

    # No compilation at all.
    NO_COMPILATION = 0
    # Compilation is left to the upper-level model runner.
    DYNAMO_AS_IS = 1
    # First level at which the custom dispatcher is enabled.
    DYNAMO_ONCE = 2
    # Inductor-backed compilation.
    INDUCTOR = 3
    # Inductor-backed compilation with ``max_autotune`` forced on.
    INDUCTOR_MAX_AUTOTUNE = 4
...@@ -3,12 +3,14 @@ import sys ...@@ -3,12 +3,14 @@ import sys
from abc import abstractmethod from abc import abstractmethod
from contextlib import contextmanager from contextlib import contextmanager
from types import CodeType from types import CodeType
from typing import Callable, List from typing import Callable, List, Optional
import torch import torch
import vllm.envs as envs import vllm.envs as envs
from .levels import CompilationLevel
class TorchCompileWrapperWithCustomDispatcher: class TorchCompileWrapperWithCustomDispatcher:
""" """
...@@ -23,7 +25,26 @@ class TorchCompileWrapperWithCustomDispatcher: ...@@ -23,7 +25,26 @@ class TorchCompileWrapperWithCustomDispatcher:
`torch.compile` over the forward method. `torch.compile` over the forward method.
""" """
def __init__(self, compiled_callable: Callable): def __init__(self, compiled_callable: Optional[Callable] = None):
if compiled_callable is None:
# default compilation settings
# compiling the forward method
# choose the compile backend
# if the user has set the backend, use it
from vllm.plugins import get_torch_compile_backend
backend = get_torch_compile_backend()
if backend is None:
from vllm.compilation.backends import select_default_backend
backend = select_default_backend(envs.VLLM_TORCH_COMPILE_LEVEL)
compiled_callable = torch.compile(
self.forward,
fullgraph=envs.VLLM_TEST_DYNAMO_FULLGRAPH_CAPTURE,
backend=backend)
self.compiled_callable = compiled_callable self.compiled_callable = compiled_callable
self.original_code_object = self.__class__.forward.__code__ self.original_code_object = self.__class__.forward.__code__
self.compiled_codes: List[CodeType] = [] self.compiled_codes: List[CodeType] = []
...@@ -33,7 +54,7 @@ class TorchCompileWrapperWithCustomDispatcher: ...@@ -33,7 +54,7 @@ class TorchCompileWrapperWithCustomDispatcher:
# subclasses can use this to switch between the custom dispatcher # subclasses can use this to switch between the custom dispatcher
# and the default Dynamo guard mechanism. # and the default Dynamo guard mechanism.
self.use_custom_dispatcher: bool = \ self.use_custom_dispatcher: bool = \
envs.VLLM_DYNAMO_USE_CUSTOM_DISPATCHER envs.VLLM_TORCH_COMPILE_LEVEL >= CompilationLevel.DYNAMO_ONCE
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
"""Implement the dispatch logic here, beyond the torch.compile level. """Implement the dispatch logic here, beyond the torch.compile level.
...@@ -52,7 +73,7 @@ class TorchCompileWrapperWithCustomDispatcher: ...@@ -52,7 +73,7 @@ class TorchCompileWrapperWithCustomDispatcher:
return return
# code borrowed from https://github.com/thuml/depyf/blob/f4ad79fadee27ea113b4c75202db1eb1a11c0dbc/depyf/explain/enable_debugging.py#L25 # code borrowed from https://github.com/thuml/depyf/blob/f4ad79fadee27ea113b4c75202db1eb1a11c0dbc/depyf/explain/enable_debugging.py#L25
frame = sys._getframe() frame = sys._getframe()
while True: while frame and frame.f_back:
frame = frame.f_back frame = frame.f_back
code_name = frame.f_code.co_name code_name = frame.f_code.co_name
file_name = frame.f_code.co_filename.split(os.path.sep)[-1] file_name = frame.f_code.co_filename.split(os.path.sep)[-1]
......
...@@ -31,28 +31,7 @@ if TYPE_CHECKING: ...@@ -31,28 +31,7 @@ if TYPE_CHECKING:
logger = init_logger(__name__) logger = init_logger(__name__)
_EMBEDDING_MODEL_MAX_NUM_BATCHED_TOKENS = 32768 _EMBEDDING_MODEL_MAX_NUM_BATCHED_TOKENS = 32768
_MULTIMODAL_MODEL_MAX_NUM_BATCHED_TOKENS = 4096 _MULTIMODAL_MODEL_MAX_NUM_BATCHED_TOKENS = 5120
_PP_SUPPORTED_MODELS = [
"AquilaForCausalLM",
"AquilaModel",
"DeepseekV2ForCausalLM",
"GPT2LMHeadModel",
"InternLM2ForCausalLM",
"InternLMForCausalLM",
"InternVLChatModel",
"JAISLMHeadModel",
"LlamaForCausalLM",
"LLaMAForCausalLM",
"MistralForCausalLM",
"MixtralForCausalLM",
"NemotronForCausalLM",
"Phi3ForCausalLM",
"Qwen2ForCausalLM",
"Qwen2MoeForCausalLM",
"QWenLMHeadModel",
"Qwen2VLForConditionalGeneration",
]
class ModelConfig: class ModelConfig:
...@@ -194,14 +173,20 @@ class ModelConfig: ...@@ -194,14 +173,20 @@ class ModelConfig:
if self.enforce_eager is None: if self.enforce_eager is None:
self.enforce_eager = False self.enforce_eager = False
if (not self.disable_sliding_window sliding_window = getattr(self.hf_text_config, "sliding_window", None)
and self.hf_text_config.model_type == "gemma2" has_interleaved_attention = (sliding_window is not None) and (
and self.hf_text_config.sliding_window is not None): isinstance(sliding_window, list) or
(self.hf_text_config.model_type in ["gemma2"]))
if (not self.disable_sliding_window and has_interleaved_attention):
sliding_window_len_min = get_min_sliding_window(
self.hf_text_config.sliding_window)
print_warning_once( print_warning_once(
"Gemma 2 uses sliding window attention for every odd layer, " f"{self.hf_text_config.model_type} has interleaved attention, "
"which is currently not supported by vLLM. Disabling sliding " "which is currently not supported by vLLM. Disabling sliding "
"window and capping the max length to the sliding window size " "window and capping the max length to the sliding window size "
f"({self.hf_text_config.sliding_window}).") f"({sliding_window_len_min}).")
self.disable_sliding_window = True self.disable_sliding_window = True
self.max_model_len = _get_and_verify_max_len( self.max_model_len = _get_and_verify_max_len(
...@@ -217,6 +202,9 @@ class ModelConfig: ...@@ -217,6 +202,9 @@ class ModelConfig:
if not self.skip_tokenizer_init: if not self.skip_tokenizer_init:
self._verify_tokenizer_mode() self._verify_tokenizer_mode()
self.is_attention_free = self._init_attention_free()
self.has_inner_state = self._init_has_inner_state()
self.override_neuron_config = override_neuron_config if is_neuron( self.override_neuron_config = override_neuron_config if is_neuron(
) else None ) else None
self._verify_embedding_mode() self._verify_embedding_mode()
...@@ -228,16 +216,22 @@ class ModelConfig: ...@@ -228,16 +216,22 @@ class ModelConfig:
self, limit_mm_per_prompt: Optional[Mapping[str, int]] self, limit_mm_per_prompt: Optional[Mapping[str, int]]
) -> Optional["MultiModalConfig"]: ) -> Optional["MultiModalConfig"]:
architectures = getattr(self.hf_config, "architectures", []) architectures = getattr(self.hf_config, "architectures", [])
if any( if ModelRegistry.is_multimodal_model(architectures):
ModelRegistry.is_multimodal_model(arch)
for arch in architectures):
return MultiModalConfig(limit_per_prompt=limit_mm_per_prompt or {}) return MultiModalConfig(limit_per_prompt=limit_mm_per_prompt or {})
else:
if limit_mm_per_prompt: if limit_mm_per_prompt:
raise ValueError( raise ValueError("`limit_mm_per_prompt` is only supported for "
"limit_mm_per_prompt is only supported for multimodal " "multimodal models.")
"models.")
return None return None
def _init_attention_free(self) -> bool:
    """Ask ``ModelRegistry`` whether the HF config's architectures are
    attention-free (an empty list is used when none are declared)."""
    return ModelRegistry.is_attention_free_model(
        getattr(self.hf_config, "architectures", []))
def _init_has_inner_state(self) -> bool:
    """Ask ``ModelRegistry`` whether the HF config's architectures have
    inner state (an empty list is used when none are declared)."""
    return ModelRegistry.model_has_inner_state(
        getattr(self.hf_config, "architectures", []))
def _verify_tokenizer_mode(self) -> None: def _verify_tokenizer_mode(self) -> None:
tokenizer_mode = self.tokenizer_mode.lower() tokenizer_mode = self.tokenizer_mode.lower()
...@@ -249,8 +243,16 @@ class ModelConfig: ...@@ -249,8 +243,16 @@ class ModelConfig:
def _verify_embedding_mode(self) -> None:
    """Set ``self.embedding_mode`` from the model's architectures.

    ``Phi3VForCausalLM`` is special-cased: it counts as an embedding model
    only when the model name/path contains ``"/VLM2Vec"``. Every other
    architecture list is classified by ``ModelRegistry.is_embedding_model``.
    """
    archs = getattr(self.hf_config, "architectures", [])

    # TODO: Allow the same model architecture to be specified as either
    # generation or embedding model
    if "Phi3VForCausalLM" not in archs:
        self.embedding_mode = ModelRegistry.is_embedding_model(archs)
        return

    # Match both remote and local names
    self.embedding_mode = "/VLM2Vec" in self.model
def _parse_quant_hf_config(self): def _parse_quant_hf_config(self):
quant_cfg = getattr(self.hf_config, "quantization_config", None) quant_cfg = getattr(self.hf_config, "quantization_config", None)
...@@ -381,9 +383,11 @@ class ModelConfig: ...@@ -381,9 +383,11 @@ class ModelConfig:
self.use_async_output_proc = False self.use_async_output_proc = False
return return
if device_config.device_type not in ("cuda", "tpu"): # Reminder: Please update docs/source/serving/compatibility_matrix.rst
# If the feature combo become valid
if device_config.device_type not in ("cuda", "tpu", "xpu"):
logger.warning( logger.warning(
"Async output processing is only supported for CUDA or TPU. " "Async output processing is only supported for CUDA, TPU, XPU. "
"Disabling it for other platforms.") "Disabling it for other platforms.")
self.use_async_output_proc = False self.use_async_output_proc = False
return return
...@@ -394,6 +398,8 @@ class ModelConfig: ...@@ -394,6 +398,8 @@ class ModelConfig:
self.use_async_output_proc = False self.use_async_output_proc = False
return return
# Reminder: Please update docs/source/serving/compatibility_matrix.rst
# If the feature combo become valid
if device_config.device_type == "cuda" and self.enforce_eager: if device_config.device_type == "cuda" and self.enforce_eager:
logger.warning( logger.warning(
"To see benefits of async output processing, enable CUDA " "To see benefits of async output processing, enable CUDA "
...@@ -407,6 +413,8 @@ class ModelConfig: ...@@ -407,6 +413,8 @@ class ModelConfig:
if self.embedding_mode: if self.embedding_mode:
self.use_async_output_proc = False self.use_async_output_proc = False
# Reminder: Please update docs/source/serving/compatibility_matrix.rst
# If the feature combo become valid
if speculative_config: if speculative_config:
logger.warning("Async output processing is not supported with" logger.warning("Async output processing is not supported with"
" speculative decoding currently.") " speculative decoding currently.")
...@@ -426,19 +434,20 @@ class ModelConfig: ...@@ -426,19 +434,20 @@ class ModelConfig:
f"({tensor_parallel_size}).") f"({tensor_parallel_size}).")
pipeline_parallel_size = parallel_config.pipeline_parallel_size pipeline_parallel_size = parallel_config.pipeline_parallel_size
architectures = getattr(self.hf_config, "architectures", []) if pipeline_parallel_size > 1:
if not all(arch in _PP_SUPPORTED_MODELS architectures = getattr(self.hf_config, "architectures", [])
for arch in architectures) and pipeline_parallel_size > 1: if not ModelRegistry.is_pp_supported_model(architectures):
raise NotImplementedError( raise NotImplementedError(
"Pipeline parallelism is only supported for the following " "Pipeline parallelism is not supported for this model. "
f" architectures: {_PP_SUPPORTED_MODELS}.") "Supported models implement the `SupportsPP` interface.")
if pipeline_parallel_size > 1 and self.use_async_output_proc: if self.use_async_output_proc:
logger.warning("Async output processor is not supported with " logger.warning("Async output processor is not supported with "
"pipeline parallelism currently. Disabling it.") "pipeline parallelism currently. Disabling it.")
self.use_async_output_proc = False self.use_async_output_proc = False
def get_hf_config_sliding_window(self) -> Optional[int]: def get_hf_config_sliding_window(
self) -> Union[Optional[int], List[Optional[int]]]:
"""Get the sliding window size, or None if disabled.""" """Get the sliding window size, or None if disabled."""
# Some models, like Qwen2 and Qwen1.5, use `use_sliding_window` in # Some models, like Qwen2 and Qwen1.5, use `use_sliding_window` in
...@@ -449,7 +458,7 @@ class ModelConfig: ...@@ -449,7 +458,7 @@ class ModelConfig:
return None return None
return getattr(self.hf_text_config, "sliding_window", None) return getattr(self.hf_text_config, "sliding_window", None)
def get_sliding_window(self) -> Optional[int]: def get_sliding_window(self) -> Optional[Union[int, List[Optional[int]]]]:
"""Get the sliding window size, or None if disabled. """Get the sliding window size, or None if disabled.
""" """
# If user disables sliding window, return None. # If user disables sliding window, return None.
...@@ -471,6 +480,10 @@ class ModelConfig: ...@@ -471,6 +480,10 @@ class ModelConfig:
# FlashAttention supports only head_size 32, 64, 128, 256, # FlashAttention supports only head_size 32, 64, 128, 256,
# we need to pad head_size 192 to 256 # we need to pad head_size 192 to 256
return 256 return 256
if self.is_attention_free:
return 0
if hasattr(self.hf_text_config, "head_dim"): if hasattr(self.hf_text_config, "head_dim"):
return self.hf_text_config.head_dim return self.hf_text_config.head_dim
# FIXME(woosuk): This may not be true for all models. # FIXME(woosuk): This may not be true for all models.
...@@ -502,6 +515,9 @@ class ModelConfig: ...@@ -502,6 +515,9 @@ class ModelConfig:
return getattr(self.hf_config.attn_config, "kv_n_heads", return getattr(self.hf_config.attn_config, "kv_n_heads",
self.hf_config.num_attention_heads) self.hf_config.num_attention_heads)
if self.is_attention_free:
return 0
attributes = [ attributes = [
# For Falcon: # For Falcon:
"n_head_kv", "n_head_kv",
...@@ -544,31 +560,17 @@ class ModelConfig: ...@@ -544,31 +560,17 @@ class ModelConfig:
start, end = get_pp_indices(total_num_hidden_layers, pp_rank, pp_size) start, end = get_pp_indices(total_num_hidden_layers, pp_rank, pp_size)
return end - start return end - start
def get_num_attention_layers(self,
                             parallel_config: "ParallelConfig") -> int:
    """Count the layers whose block type is ``"attention"``.

    Returns 0 for attention-free models. Otherwise the block types are read
    from ``hf_config.layers_block_type``; when that attribute is absent,
    all ``get_num_layers(parallel_config)`` layers are assumed to be
    attention layers.
    """
    if self.is_attention_free:
        return 0

    layer_count = self.get_num_layers(parallel_config)

    # Transformers supports layers_block_type @property
    block_types = getattr(self.hf_config, "layers_block_type",
                          ["attention"] * layer_count)
    return sum(1 for block_type in block_types if block_type == "attention")
def get_multimodal_config(self) -> "MultiModalConfig": def get_multimodal_config(self) -> "MultiModalConfig":
""" """
...@@ -618,6 +620,7 @@ class CacheConfig: ...@@ -618,6 +620,7 @@ class CacheConfig:
gpu_memory_utilization: float, gpu_memory_utilization: float,
swap_space: float, swap_space: float,
cache_dtype: str, cache_dtype: str,
is_attention_free: bool = False,
num_gpu_blocks_override: Optional[int] = None, num_gpu_blocks_override: Optional[int] = None,
sliding_window: Optional[int] = None, sliding_window: Optional[int] = None,
enable_prefix_caching: bool = False, enable_prefix_caching: bool = False,
...@@ -628,16 +631,18 @@ class CacheConfig: ...@@ -628,16 +631,18 @@ class CacheConfig:
self.swap_space_bytes = swap_space * GiB_bytes self.swap_space_bytes = swap_space * GiB_bytes
self.num_gpu_blocks_override = num_gpu_blocks_override self.num_gpu_blocks_override = num_gpu_blocks_override
self.cache_dtype = cache_dtype self.cache_dtype = cache_dtype
self.is_attention_free = is_attention_free
self.sliding_window = sliding_window self.sliding_window = sliding_window
self.enable_prefix_caching = enable_prefix_caching self.enable_prefix_caching = enable_prefix_caching
self.cpu_offload_gb = cpu_offload_gb self.cpu_offload_gb = cpu_offload_gb
self._verify_args() self._verify_args()
self._verify_cache_dtype() self._verify_cache_dtype()
self._verify_prefix_caching() self._verify_prefix_caching()
# Will be set after profiling. # Will be set after profiling.
self.num_gpu_blocks = None self.num_gpu_blocks: Optional[int] = None
self.num_cpu_blocks = None self.num_cpu_blocks: Optional[int] = None
def metrics_info(self): def metrics_info(self):
# convert cache_config to dict(key: str, value: str) for prometheus # convert cache_config to dict(key: str, value: str) for prometheus
...@@ -714,7 +719,8 @@ class TokenizerPoolConfig: ...@@ -714,7 +719,8 @@ class TokenizerPoolConfig:
@classmethod @classmethod
def create_config( def create_config(
cls, tokenizer_pool_size: int, tokenizer_pool_type: str, cls, tokenizer_pool_size: int,
tokenizer_pool_type: Union[str, Type["BaseTokenizerGroup"]],
tokenizer_pool_extra_config: Optional[Union[str, dict]] tokenizer_pool_extra_config: Optional[Union[str, dict]]
) -> Optional["TokenizerPoolConfig"]: ) -> Optional["TokenizerPoolConfig"]:
"""Create a TokenizerPoolConfig from the given parameters. """Create a TokenizerPoolConfig from the given parameters.
...@@ -952,7 +958,6 @@ class SchedulerConfig: ...@@ -952,7 +958,6 @@ class SchedulerConfig:
iteration. iteration.
max_model_len: Maximum length of a sequence (including prompt max_model_len: Maximum length of a sequence (including prompt
and generated text). and generated text).
use_v2_block_manager: Whether to use the BlockSpaceManagerV2 or not.
num_lookahead_slots: The number of slots to allocate per sequence per num_lookahead_slots: The number of slots to allocate per sequence per
step, beyond the known token ids. This is used in speculative step, beyond the known token ids. This is used in speculative
decoding to store KV activations of tokens which may or may not be decoding to store KV activations of tokens which may or may not be
...@@ -979,7 +984,6 @@ class SchedulerConfig: ...@@ -979,7 +984,6 @@ class SchedulerConfig:
max_num_batched_tokens: Optional[int], max_num_batched_tokens: Optional[int],
max_num_seqs: int, max_num_seqs: int,
max_model_len: int, max_model_len: int,
use_v2_block_manager: bool = False,
num_lookahead_slots: int = 0, num_lookahead_slots: int = 0,
delay_factor: float = 0.0, delay_factor: float = 0.0,
enable_chunked_prefill: bool = False, enable_chunked_prefill: bool = False,
...@@ -992,9 +996,16 @@ class SchedulerConfig: ...@@ -992,9 +996,16 @@ class SchedulerConfig:
policy: str = "fcfs") -> None: policy: str = "fcfs") -> None:
if max_num_batched_tokens is None: if max_num_batched_tokens is None:
if enable_chunked_prefill: if enable_chunked_prefill:
# It is the values that have the best balance between ITL if num_scheduler_steps > 1:
# and TTFT on A100. Note it is not optimized for throughput. # Multi-step Chunked-Prefill doesn't allow prompt-chunking
max_num_batched_tokens = 512 # for now. Have max_num_batched_tokens set to max_model_len
# so we don't reject sequences on account of a short
# max_num_batched_tokens.
max_num_batched_tokens = max(max_model_len, 2048)
else:
# It is the values that have the best balance between ITL
# and TTFT on A100. Note it is not optimized for throughput.
max_num_batched_tokens = 512
else: else:
# If max_model_len is too short, use 2048 as the default value # If max_model_len is too short, use 2048 as the default value
# for higher throughput. # for higher throughput.
...@@ -1022,7 +1033,6 @@ class SchedulerConfig: ...@@ -1022,7 +1033,6 @@ class SchedulerConfig:
self.max_num_seqs = max_num_seqs self.max_num_seqs = max_num_seqs
self.max_model_len = max_model_len self.max_model_len = max_model_len
self.use_v2_block_manager = use_v2_block_manager
self.num_lookahead_slots = num_lookahead_slots self.num_lookahead_slots = num_lookahead_slots
self.delay_factor = delay_factor self.delay_factor = delay_factor
self.chunked_prefill_enabled = enable_chunked_prefill self.chunked_prefill_enabled = enable_chunked_prefill
...@@ -1118,9 +1128,9 @@ class SpeculativeConfig: ...@@ -1118,9 +1128,9 @@ class SpeculativeConfig:
speculative_model_quantization: Optional[str], speculative_model_quantization: Optional[str],
speculative_draft_tensor_parallel_size: Optional[int], speculative_draft_tensor_parallel_size: Optional[int],
num_speculative_tokens: Optional[int], num_speculative_tokens: Optional[int],
speculative_disable_mqa_scorer: Optional[bool],
speculative_max_model_len: Optional[int], speculative_max_model_len: Optional[int],
enable_chunked_prefill: bool, enable_chunked_prefill: bool,
use_v2_block_manager: bool,
disable_log_stats: bool, disable_log_stats: bool,
speculative_disable_by_batch_size: Optional[int], speculative_disable_by_batch_size: Optional[int],
ngram_prompt_lookup_max: Optional[int], ngram_prompt_lookup_max: Optional[int],
...@@ -1152,15 +1162,15 @@ class SpeculativeConfig: ...@@ -1152,15 +1162,15 @@ class SpeculativeConfig:
num_speculative_tokens (Optional[int]): The number of speculative num_speculative_tokens (Optional[int]): The number of speculative
tokens, if provided. Will default to the number in the draft tokens, if provided. Will default to the number in the draft
model config if present, otherwise is required. model config if present, otherwise is required.
speculative_disable_mqa_scorer (Optional[bool]): Disable the MQA
scorer for the speculative model and fall back to batch
expansion for scoring.
speculative_max_model_len (Optional[int]): The maximum model len of speculative_max_model_len (Optional[int]): The maximum model len of
the speculative model. Used when testing the ability to skip the speculative model. Used when testing the ability to skip
speculation for some sequences. speculation for some sequences.
enable_chunked_prefill (bool): Whether vLLM is configured to use enable_chunked_prefill (bool): Whether vLLM is configured to use
chunked prefill or not. Used for raising an error since its not chunked prefill or not. Used for raising an error since its not
yet compatible with spec decode. yet compatible with spec decode.
use_v2_block_manager (bool): Whether vLLM is configured to use the
v2 block manager or not. Used for raising an error since the v2
block manager is required with spec decode.
speculative_disable_by_batch_size (Optional[int]): Disable speculative_disable_by_batch_size (Optional[int]): Disable
speculative decoding for new incoming requests when the number speculative decoding for new incoming requests when the number
of enqueue requests is larger than this value, if provided. of enqueue requests is larger than this value, if provided.
...@@ -1204,16 +1214,13 @@ class SpeculativeConfig: ...@@ -1204,16 +1214,13 @@ class SpeculativeConfig:
"speculative decoding is > 1, but got " "speculative decoding is > 1, but got "
f"{speculative_disable_by_batch_size=}") f"{speculative_disable_by_batch_size=}")
# Reminder: Please update docs/source/serving/compatibility_matrix.rst
# If the feature combo become valid
if enable_chunked_prefill: if enable_chunked_prefill:
raise ValueError( raise ValueError(
"Speculative decoding and chunked prefill are " "Speculative decoding and chunked prefill are "
f"currently mutually exclusive ({enable_chunked_prefill=}).") f"currently mutually exclusive ({enable_chunked_prefill=}).")
if not use_v2_block_manager:
raise ValueError(
"Speculative decoding requires usage of the V2 "
"block manager. Enable it with --use-v2-block-manager.")
# TODO: The user should be able to specify revision/max model len # TODO: The user should be able to specify revision/max model len
# for the draft model. It is not currently supported. # for the draft model. It is not currently supported.
draft_revision = None draft_revision = None
...@@ -1306,6 +1313,7 @@ class SpeculativeConfig: ...@@ -1306,6 +1313,7 @@ class SpeculativeConfig:
draft_model_config, draft_model_config,
draft_parallel_config, draft_parallel_config,
num_speculative_tokens, num_speculative_tokens,
speculative_disable_mqa_scorer,
speculative_disable_by_batch_size, speculative_disable_by_batch_size,
ngram_prompt_lookup_max, ngram_prompt_lookup_max,
ngram_prompt_lookup_min, ngram_prompt_lookup_min,
...@@ -1402,6 +1410,7 @@ class SpeculativeConfig: ...@@ -1402,6 +1410,7 @@ class SpeculativeConfig:
draft_model_config: ModelConfig, draft_model_config: ModelConfig,
draft_parallel_config: ParallelConfig, draft_parallel_config: ParallelConfig,
num_speculative_tokens: int, num_speculative_tokens: int,
speculative_disable_mqa_scorer: Optional[bool],
speculative_disable_by_batch_size: Optional[int], speculative_disable_by_batch_size: Optional[int],
ngram_prompt_lookup_max: Optional[int], ngram_prompt_lookup_max: Optional[int],
ngram_prompt_lookup_min: Optional[int], ngram_prompt_lookup_min: Optional[int],
...@@ -1448,6 +1457,7 @@ class SpeculativeConfig: ...@@ -1448,6 +1457,7 @@ class SpeculativeConfig:
self.draft_model_config = draft_model_config self.draft_model_config = draft_model_config
self.draft_parallel_config = draft_parallel_config self.draft_parallel_config = draft_parallel_config
self.num_speculative_tokens = num_speculative_tokens self.num_speculative_tokens = num_speculative_tokens
self.speculative_disable_mqa_scorer = speculative_disable_mqa_scorer
self.speculative_disable_by_batch_size = \ self.speculative_disable_by_batch_size = \
speculative_disable_by_batch_size speculative_disable_by_batch_size
self.ngram_prompt_lookup_max = ngram_prompt_lookup_max or 0 self.ngram_prompt_lookup_max = ngram_prompt_lookup_max or 0
...@@ -1521,7 +1531,7 @@ class LoRAConfig: ...@@ -1521,7 +1531,7 @@ class LoRAConfig:
max_loras: int max_loras: int
fully_sharded_loras: bool = False fully_sharded_loras: bool = False
max_cpu_loras: Optional[int] = None max_cpu_loras: Optional[int] = None
lora_dtype: Optional[torch.dtype] = None lora_dtype: Optional[Union[torch.dtype, str]] = None
lora_extra_vocab_size: int = 256 lora_extra_vocab_size: int = 256
# This is a constant. # This is a constant.
lora_vocab_padding_size: ClassVar[int] = 256 lora_vocab_padding_size: ClassVar[int] = 256
...@@ -1562,6 +1572,8 @@ class LoRAConfig: ...@@ -1562,6 +1572,8 @@ class LoRAConfig:
model_config.quantization) model_config.quantization)
def verify_with_scheduler_config(self, scheduler_config: SchedulerConfig): def verify_with_scheduler_config(self, scheduler_config: SchedulerConfig):
# Reminder: Please update docs/source/serving/compatibility_matrix.rst
# If the feature combo become valid
if scheduler_config.chunked_prefill_enabled: if scheduler_config.chunked_prefill_enabled:
raise ValueError("LoRA is not supported with chunked prefill yet.") raise ValueError("LoRA is not supported with chunked prefill yet.")
...@@ -1671,7 +1683,7 @@ def _get_and_verify_max_len( ...@@ -1671,7 +1683,7 @@ def _get_and_verify_max_len(
hf_config: PretrainedConfig, hf_config: PretrainedConfig,
max_model_len: Optional[int], max_model_len: Optional[int],
disable_sliding_window: bool, disable_sliding_window: bool,
sliding_window_len: Optional[int], sliding_window_len: Optional[Union[int, List[Optional[int]]]],
spec_target_max_model_len: Optional[int] = None, spec_target_max_model_len: Optional[int] = None,
) -> int: ) -> int:
"""Get and verify the model's maximum length.""" """Get and verify the model's maximum length."""
...@@ -1704,9 +1716,12 @@ def _get_and_verify_max_len( ...@@ -1704,9 +1716,12 @@ def _get_and_verify_max_len(
# If sliding window is manually disabled, max_length should be less # If sliding window is manually disabled, max_length should be less
# than the sliding window length in the model config. # than the sliding window length in the model config.
if disable_sliding_window and sliding_window_len is not None: if disable_sliding_window and sliding_window_len is not None:
sliding_window_len_min = get_min_sliding_window(sliding_window_len)
max_len_key = "sliding_window" \ max_len_key = "sliding_window" \
if sliding_window_len < derived_max_model_len else max_len_key if sliding_window_len_min < derived_max_model_len else max_len_key
derived_max_model_len = min(derived_max_model_len, sliding_window_len) derived_max_model_len = min(derived_max_model_len,
sliding_window_len_min)
# If none of the keys were found in the config, use a default and # If none of the keys were found in the config, use a default and
# log a warning. # log a warning.
...@@ -1730,16 +1745,10 @@ def _get_and_verify_max_len( ...@@ -1730,16 +1745,10 @@ def _get_and_verify_max_len(
rope_scaling = getattr(hf_config, "rope_scaling", None) rope_scaling = getattr(hf_config, "rope_scaling", None)
if rope_scaling is not None: if rope_scaling is not None:
if "type" in rope_scaling: # No need to consider "type" key because of patch_rope_scaling when
rope_type = rope_scaling["type"] # loading HF config
elif "rope_type" in rope_scaling: rope_type = rope_scaling["rope_type"]
rope_type = rope_scaling["rope_type"]
else:
raise ValueError(
"rope_scaling must have a 'type' or 'rope_type' key.")
# The correct one should be "longrope", kept "su" here
# to be backward compatible
if rope_type not in ("su", "longrope", "llama3"): if rope_type not in ("su", "longrope", "llama3"):
if disable_sliding_window: if disable_sliding_window:
# TODO(robertgshaw): Find a model that supports rope_scaling # TODO(robertgshaw): Find a model that supports rope_scaling
...@@ -1749,11 +1758,10 @@ def _get_and_verify_max_len( ...@@ -1749,11 +1758,10 @@ def _get_and_verify_max_len(
"with rope_scaling. Please raise an issue so we can " "with rope_scaling. Please raise an issue so we can "
"investigate.") "investigate.")
if rope_type == "mrope": # NOTE: rope_type == "default" does not define factor
scaling_factor = 1 # https://github.com/huggingface/transformers/blob/v4.45.2/src/transformers/modeling_rope_utils.py
else: scaling_factor = rope_scaling.get("factor", 1.0)
assert "factor" in rope_scaling
scaling_factor = rope_scaling["factor"]
if rope_type == "yarn": if rope_type == "yarn":
derived_max_model_len = rope_scaling[ derived_max_model_len = rope_scaling[
"original_max_position_embeddings"] "original_max_position_embeddings"]
...@@ -1794,6 +1802,14 @@ def _get_and_verify_max_len( ...@@ -1794,6 +1802,14 @@ def _get_and_verify_max_len(
return int(max_model_len) return int(max_model_len)
def get_min_sliding_window(
        sliding_window: Union[int, List[Optional[int]]]) -> int:
    """Return the smallest sliding-window length described by the argument.

    Args:
        sliding_window: Either a single window length, or a list of window
            lengths in which ``None`` entries are skipped.

    Returns:
        The argument itself when it is not a list, otherwise the minimum of
        the non-``None`` entries of the list.

    Raises:
        ValueError: If a list is given that has no non-``None`` entry.
    """
    if not isinstance(sliding_window, list):
        return sliding_window

    window_sizes = [size for size in sliding_window if size is not None]
    if not window_sizes:
        # min() over an empty iterable also raises ValueError, but its
        # message ("min() arg is an empty sequence") hides the real cause.
        raise ValueError(
            "sliding_window must contain at least one non-None entry, "
            f"got {sliding_window!r}.")
    return min(window_sizes)
def get_served_model_name(model: str, def get_served_model_name(model: str,
served_model_name: Optional[Union[str, List[str]]]): served_model_name: Optional[Union[str, List[str]]]):
""" """
......
...@@ -55,9 +55,12 @@ class BlockTable: ...@@ -55,9 +55,12 @@ class BlockTable:
self._num_full_slots = self._get_num_token_ids() self._num_full_slots = self._get_num_token_ids()
@staticmethod @staticmethod
def get_num_required_blocks(token_ids: List[int], block_size: int) -> int: def get_num_required_blocks(token_ids: List[int],
block_size: int,
num_lookahead_slots: int = 0) -> int:
"""Calculates the minimum number of blocks required to store a given """Calculates the minimum number of blocks required to store a given
sequence of token IDs. sequence of token IDs along with any look-ahead slots that may be
required (like in multi-step + chunked-prefill).
This assumes worst-case scenario, where every block requires a new This assumes worst-case scenario, where every block requires a new
allocation (e.g. ignoring prefix caching). allocation (e.g. ignoring prefix caching).
...@@ -66,12 +69,14 @@ class BlockTable: ...@@ -66,12 +69,14 @@ class BlockTable:
token_ids (List[int]): The sequence of token IDs to be stored. token_ids (List[int]): The sequence of token IDs to be stored.
block_size (int): The maximum number of tokens that can be stored in block_size (int): The maximum number of tokens that can be stored in
a single block. a single block.
num_lookahead_slots (int): look-ahead slots that the sequence may
require.
Returns: Returns:
int: The minimum number of blocks required to store the given int: The minimum number of blocks required to store the given
sequence of token IDs. sequence of token IDs along with any required look-ahead slots.
""" """
return cdiv(len(token_ids), block_size) return cdiv(len(token_ids) + num_lookahead_slots, block_size)
def allocate(self, def allocate(self,
token_ids: List[int], token_ids: List[int],
...@@ -215,7 +220,6 @@ class BlockTable: ...@@ -215,7 +220,6 @@ class BlockTable:
occupied by each block. After freeing all the blocks, the `_blocks` list occupied by each block. After freeing all the blocks, the `_blocks` list
is set to `None`. is set to `None`.
""" """
assert self._is_allocated
for block in self.blocks: for block in self.blocks:
self._allocator.free(block) self._allocator.free(block)
self._blocks.reset() self._blocks.reset()
...@@ -234,7 +238,6 @@ class BlockTable: ...@@ -234,7 +238,6 @@ class BlockTable:
List[int]: A list of physical block indices for the blocks in the List[int]: A list of physical block indices for the blocks in the
BlockTable. BlockTable.
""" """
assert self._is_allocated
return self._blocks.ids() return self._blocks.ids()
def get_unseen_token_ids(self, sequence_token_ids: List[int]) -> List[int]: def get_unseen_token_ids(self, sequence_token_ids: List[int]) -> List[int]:
......
...@@ -259,25 +259,22 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator): ...@@ -259,25 +259,22 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
current_swap_mapping[src_block_id] = dst_block_id current_swap_mapping[src_block_id] = dst_block_id
return current_swap_mapping return current_swap_mapping
def get_num_blocks_touched(self, def get_num_full_blocks_touched(self, blocks: List[Block],
blocks: List[Block], device: Device) -> int:
device: Device, """Returns the number of full blocks that will be touched by
num_lookahead_slots: int = 0) -> int:
"""Returns the number of blocks that will be touched by
swapping in/out the given blocks on to the 'device'. swapping in/out the given blocks on to the 'device'.
Args: Args:
blocks: List of blocks to be swapped. blocks: List of blocks to be swapped.
device (Device): Device to swap the 'blocks' on. device (Device): Device to swap the 'blocks' on.
num_lookahead_slots (int): Number of lookahead slots used in
speculative decoding, default to 0.
Returns: Returns:
int: the number of blocks that will be touched by int: the number of full blocks that will be touched by
swapping in/out the given blocks on to the 'device'. swapping in/out the given blocks on to the 'device'.
Non full blocks are ignored when deciding the number
of blocks to touch.
""" """
return self._allocators[device].get_num_blocks_touched( return self._allocators[device].get_num_full_blocks_touched(blocks)
blocks, num_lookahead_slots)
def clear_copy_on_writes(self) -> List[Tuple[int, int]]: def clear_copy_on_writes(self) -> List[Tuple[int, int]]:
"""Clears the copy-on-write (CoW) state and returns the mapping of """Clears the copy-on-write (CoW) state and returns the mapping of
......
...@@ -181,9 +181,7 @@ class BlockAllocator(ABC): ...@@ -181,9 +181,7 @@ class BlockAllocator(ABC):
pass pass
@abstractmethod @abstractmethod
def get_num_blocks_touched(self, def get_num_full_blocks_touched(self, blocks: List[Block]) -> int:
blocks: List[Block],
num_lookahead_slots: int = 0) -> int:
pass pass
@abstractmethod @abstractmethod
...@@ -260,10 +258,8 @@ class DeviceAwareBlockAllocator(ABC): ...@@ -260,10 +258,8 @@ class DeviceAwareBlockAllocator(ABC):
pass pass
@abstractmethod @abstractmethod
def get_num_blocks_touched(self, def get_num_full_blocks_touched(self, blocks: List[Block],
blocks: List[Block], device: Device) -> int:
device: Device,
num_lookahead_slots: int = 0) -> int:
pass pass
@abstractmethod @abstractmethod
......
...@@ -4,7 +4,6 @@ from typing import Deque, FrozenSet, Iterable, List, Optional, Tuple ...@@ -4,7 +4,6 @@ from typing import Deque, FrozenSet, Iterable, List, Optional, Tuple
from vllm.core.block.common import (BlockPool, CopyOnWriteTracker, RefCounter, from vllm.core.block.common import (BlockPool, CopyOnWriteTracker, RefCounter,
get_all_blocks_recursively) get_all_blocks_recursively)
from vllm.core.block.interfaces import Block, BlockAllocator, BlockId, Device from vllm.core.block.interfaces import Block, BlockAllocator, BlockId, Device
from vllm.utils import cdiv
Refcount = int Refcount = int
...@@ -282,40 +281,26 @@ class NaiveBlockAllocator(BlockAllocator): ...@@ -282,40 +281,26 @@ class NaiveBlockAllocator(BlockAllocator):
def promote_to_immutable_block(self, block: Block) -> BlockId: def promote_to_immutable_block(self, block: Block) -> BlockId:
raise NotImplementedError("There is no promotion for naive blocks") raise NotImplementedError("There is no promotion for naive blocks")
def get_num_blocks_touched(self, def get_num_full_blocks_touched(self, blocks: List[Block]) -> int:
blocks: List[Block], """Returns the number of full blocks that will be touched by
num_lookahead_slots: int = 0) -> int: swapping in/out.
"""Determine the number of blocks that will be touched by
swapping in/out the given blocks from certain sequence
group with the provided num_lookahead_slots.
Args: Args:
blocks (List[Block]): The potential blocks to swap. blocks: List of blocks to be swapped.
num_lookahead_slots (int): number of lookahead slots (0 for swap
out).
Returns: Returns:
int: the number of blocks that will be touched by int: the number of full blocks that will be touched by
swapping in/out the given blocks and num_lookahead_slots. swapping in/out the given blocks. Non full blocks are ignored
when deciding the number of blocks to touch.
""" """
# NOTE: for naive block, we use set to eliminate common blocks among # NOTE: for naive block, we use set to eliminate common blocks among
# seqs, also we compare the empty slots in the mutable blocks with # seqs, also we compare the empty slots in the mutable blocks with
# lookahead slots to get the number of unique new block that are # lookahead slots to get the number of unique new block that are
# needed. # needed.
old_block_set = set() old_block_set = set()
new_block_count = 0
# TODO(cade): make sure the logic is correct and clean it up.
for block in blocks: for block in blocks:
if not block.is_full and num_lookahead_slots != 0: if block.is_full:
new_block_count += 1 old_block_set.add(block)
if num_lookahead_slots > block.num_empty_slots: return len(old_block_set)
new_block_count += cdiv(
num_lookahead_slots - block.num_empty_slots,
self._block_size)
else:
old_block_set.add(block.block_id)
num_touched_blocks = new_block_count + len(old_block_set)
return num_touched_blocks
def swap_out(self, blocks: List[Block]) -> None: def swap_out(self, blocks: List[Block]) -> None:
for block in blocks: for block in blocks:
......
...@@ -8,7 +8,6 @@ from vllm.core.block.interfaces import Block, BlockAllocator, BlockId, Device ...@@ -8,7 +8,6 @@ from vllm.core.block.interfaces import Block, BlockAllocator, BlockId, Device
from vllm.core.block.naive_block import (BlockPool, NaiveBlock, from vllm.core.block.naive_block import (BlockPool, NaiveBlock,
NaiveBlockAllocator) NaiveBlockAllocator)
from vllm.core.evictor_v2 import EvictionPolicy, Evictor, make_evictor from vllm.core.evictor_v2 import EvictionPolicy, Evictor, make_evictor
from vllm.utils import cdiv
PrefixHash = int PrefixHash = int
...@@ -576,37 +575,27 @@ class PrefixCachingBlockAllocator(BlockAllocator): ...@@ -576,37 +575,27 @@ class PrefixCachingBlockAllocator(BlockAllocator):
if ids if ids
]) ])
def get_num_blocks_touched(self, def get_num_full_blocks_touched(self, blocks: List[Block]) -> int:
blocks: List[Block], """Returns the number of full blocks that will be touched by
num_lookahead_slots: int = 0) -> int: swapping in/out.
"""Determine the number of blocks that will be touched by
swapping in/out the given blocks from certain sequence
group with the provided num_lookahead_slots.
Args: Args:
blocks (List[Block]): The potential blocks to swap. blocks: List of blocks to be swapped.
num_lookahead_slots (int): number of lookahead slots (0 for
swap out).
Returns: Returns:
int: the number of blocks that will be touched by int: the number of full blocks that will be touched by
swapping in/out the given blocks and num_lookahead_slots. swapping in/out the given blocks. Non full blocks are ignored
when deciding the number of blocks to touch.
""" """
num_touched_blocks = 0 num_touched_blocks: int = 0
for block in blocks: for block in blocks:
if not block.is_full: # If the block has a match in the cache and the cached
# block is not referenced, then we still count it as a
# touched block
if block.is_full and (not self.is_block_cached(block) or \
(block.content_hash is not None and \
self._cached_blocks[block.content_hash] in \
self.evictor)):
num_touched_blocks += 1 num_touched_blocks += 1
if num_lookahead_slots > block.num_empty_slots:
num_touched_blocks += cdiv(
num_lookahead_slots - block.num_empty_slots,
self._block_size)
else:
# If the block has a match in the cache and the cached block
# is not referenced, then we still count it as a touched block
if not self.is_block_cached(block) or \
(block.content_hash is not None and \
self._cached_blocks[block.content_hash] in self.evictor):
num_touched_blocks += 1
return num_touched_blocks return num_touched_blocks
def swap_out(self, blocks: List[Block]) -> None: def swap_out(self, blocks: List[Block]) -> None:
......
...@@ -4,28 +4,6 @@ from vllm.utils import (STR_NOT_IMPL_ENC_DEC_PREFIX_CACHE, ...@@ -4,28 +4,6 @@ from vllm.utils import (STR_NOT_IMPL_ENC_DEC_PREFIX_CACHE,
STR_NOT_IMPL_ENC_DEC_SWA) STR_NOT_IMPL_ENC_DEC_SWA)
def _get_block_mgr_sliding_window_attr(block_mgr):
'''
BlockManagerV1 and BlockManagerV2 have slightly different
members related to sliding window attention (SWA). This
function extracts the appropriate member to use for determining
whether SWA is enabled.
Arguments:
* block_mgr: BlockManagerV1 or BlockManagerV2 instance
'''
if hasattr(block_mgr, 'block_sliding_window'):
return block_mgr.block_sliding_window
if hasattr(block_mgr, 'max_block_sliding_window'):
return block_mgr.max_block_sliding_window
raise AttributeError("Block manager instance has neither " + \
"block_sliding_window nor " + \
"max_block_sliding_window attributes.")
def check_no_caching_or_swa_for_blockmgr_encdec( def check_no_caching_or_swa_for_blockmgr_encdec(
block_mgr, seq_group: SequenceGroup) -> None: block_mgr, seq_group: SequenceGroup) -> None:
''' '''
...@@ -41,7 +19,7 @@ def check_no_caching_or_swa_for_blockmgr_encdec( ...@@ -41,7 +19,7 @@ def check_no_caching_or_swa_for_blockmgr_encdec(
''' '''
if seq_group.is_encoder_decoder(): if seq_group.is_encoder_decoder():
if _get_block_mgr_sliding_window_attr(block_mgr) is not None: if block_mgr.max_block_sliding_window is not None:
raise NotImplementedError(STR_NOT_IMPL_ENC_DEC_SWA) raise NotImplementedError(STR_NOT_IMPL_ENC_DEC_SWA)
if block_mgr.enable_caching: if block_mgr.enable_caching:
......
"""A block manager that manages token blocks.""" """A block manager that manages token blocks."""
from itertools import chain
from typing import Dict, List, Optional from typing import Dict, List, Optional
from typing import Sequence as GenericSequence from typing import Sequence as GenericSequence
from typing import Tuple from typing import Tuple
...@@ -18,16 +17,15 @@ SeqId = int ...@@ -18,16 +17,15 @@ SeqId = int
EncoderSeqId = str EncoderSeqId = str
class BlockSpaceManagerV2(BlockSpaceManager): class SelfAttnBlockSpaceManager(BlockSpaceManager):
"""BlockSpaceManager which manages the allocation of KV cache. """BlockSpaceManager which manages the allocation of KV cache.
It owns responsibility for allocation, swapping, allocating memory for It owns responsibility for allocation, swapping, allocating memory for
autoregressively-generated tokens, and other advanced features such as autoregressively-generated tokens, and other advanced features such as
prefix caching, forking/copy-on-write, and sliding-window memory allocation. prefix caching, forking/copy-on-write, and sliding-window memory allocation.
The current implementation is partial; in particular prefix caching and This class implements the design described in
sliding-window are not feature complete. This class implements the design https://github.com/vllm-project/vllm/pull/3492.
described in https://github.com/vllm-project/vllm/pull/3492.
Lookahead slots Lookahead slots
The block manager has the notion of a "lookahead slot". These are slots The block manager has the notion of a "lookahead slot". These are slots
...@@ -107,7 +105,9 @@ class BlockSpaceManagerV2(BlockSpaceManager): ...@@ -107,7 +105,9 @@ class BlockSpaceManagerV2(BlockSpaceManager):
self._last_access_blocks_tracker = LastAccessBlocksTracker( self._last_access_blocks_tracker = LastAccessBlocksTracker(
self.block_allocator) self.block_allocator)
def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus: def can_allocate(self,
seq_group: SequenceGroup,
num_lookahead_slots: int = 0) -> AllocStatus:
# FIXME(woosuk): Here we assume that all sequences in the group share # FIXME(woosuk): Here we assume that all sequences in the group share
# the same prompt. This may not be true for preempted sequences. # the same prompt. This may not be true for preempted sequences.
...@@ -117,6 +117,7 @@ class BlockSpaceManagerV2(BlockSpaceManager): ...@@ -117,6 +117,7 @@ class BlockSpaceManagerV2(BlockSpaceManager):
num_required_blocks = BlockTable.get_num_required_blocks( num_required_blocks = BlockTable.get_num_required_blocks(
seq.get_token_ids(), seq.get_token_ids(),
block_size=self.block_size, block_size=self.block_size,
num_lookahead_slots=num_lookahead_slots,
) )
if seq_group.is_encoder_decoder(): if seq_group.is_encoder_decoder():
...@@ -149,7 +150,9 @@ class BlockSpaceManagerV2(BlockSpaceManager): ...@@ -149,7 +150,9 @@ class BlockSpaceManagerV2(BlockSpaceManager):
block_allocator=self.block_allocator, block_allocator=self.block_allocator,
max_block_sliding_window=self.max_block_sliding_window, max_block_sliding_window=self.max_block_sliding_window,
) )
block_table.allocate(seq.get_token_ids()) if seq.get_token_ids():
# Add blocks to the block table only if the sequence is non empty.
block_table.allocate(seq.get_token_ids())
return block_table return block_table
...@@ -186,7 +189,7 @@ class BlockSpaceManagerV2(BlockSpaceManager): ...@@ -186,7 +189,7 @@ class BlockSpaceManagerV2(BlockSpaceManager):
assert (request_id assert (request_id
not in self.cross_block_tables), \ not in self.cross_block_tables), \
"block table already exists" "block table already exists"
check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group) check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group)
...@@ -467,12 +470,31 @@ class BlockSpaceManagerV2(BlockSpaceManager): ...@@ -467,12 +470,31 @@ class BlockSpaceManagerV2(BlockSpaceManager):
AllocStatus: The AllocStatus for swapping in/out the given AllocStatus: The AllocStatus for swapping in/out the given
sequence_group on to the 'device'. sequence_group on to the 'device'.
""" """
blocks = self._get_blocks_for_swap(seq_group, status) # First determine the number of blocks that will be touched by this
num_blocks_touched = self.block_allocator.get_num_blocks_touched( # swap. Then verify if there are available blocks in the device
blocks, device, num_lookahead_slots) # to perform the swap.
num_blocks_touched = 0
blocks: List[Block] = []
for seq in seq_group.get_seqs(status=status):
block_table = self.block_tables[seq.seq_id]
if block_table.blocks is not None:
# Compute the number blocks to touch for the tokens to be
# appended. This does NOT include the full blocks that need
# to be touched for the swap.
num_blocks_touched += \
block_table.get_num_blocks_touched_by_append_slots(
block_table.get_unseen_token_ids(seq.get_token_ids()),
num_lookahead_slots=num_lookahead_slots)
blocks.extend(block_table.blocks)
# Compute the number of full blocks to touch and add it to the
# existing count of blocks to touch.
num_blocks_touched += self.block_allocator.get_num_full_blocks_touched(
blocks, device=device)
watermark_blocks = 0 watermark_blocks = 0
if device == Device.GPU: if device == Device.GPU:
watermark_blocks = self.watermark_blocks watermark_blocks = self.watermark_blocks
if self.block_allocator.get_num_total_blocks( if self.block_allocator.get_num_total_blocks(
device) < num_blocks_touched: device) < num_blocks_touched:
return AllocStatus.NEVER return AllocStatus.NEVER
...@@ -481,23 +503,3 @@ class BlockSpaceManagerV2(BlockSpaceManager): ...@@ -481,23 +503,3 @@ class BlockSpaceManagerV2(BlockSpaceManager):
return AllocStatus.OK return AllocStatus.OK
else: else:
return AllocStatus.LATER return AllocStatus.LATER
def _get_blocks_for_swap(self, seq_group: SequenceGroup,
                         status: SequenceStatus) -> List[Block]:
    """Collect the blocks of every sequence in the group with `status`.

    Args:
        seq_group (SequenceGroup): The sequence group whose blocks are
            gathered.
        status (SequenceStatus): The status of sequence which is needed
            for action. RUNNING for swap out and SWAPPED for swap in.

    Returns:
        List[Block]: The blocks of the selected sequences, concatenated
        sequence by sequence. Sequences whose block table reports
        `blocks` as None contribute nothing.
    """
    blocks_by_seq: Dict[int, List[Block]] = {}
    for seq in seq_group.get_seqs(status=status):
        seq_blocks = self.block_tables[seq.seq_id].blocks
        if seq_blocks is None:
            continue
        blocks_by_seq[seq.seq_id] = seq_blocks

    # Flatten the per-sequence lists into one list.
    return list(chain.from_iterable(blocks_by_seq.values()))
"""A block manager that manages token blocks."""
import math
from abc import ABC, abstractmethod
from itertools import count, takewhile
from os.path import commonprefix
from typing import Dict, List, Optional
from typing import Sequence as GenericSequence
from typing import Set, Tuple
from vllm.block import BlockTable, PhysicalTokenBlock
from vllm.core.block.common import CacheMetricData
from vllm.core.block.utils import check_no_caching_or_swa_for_blockmgr_encdec
from vllm.core.evictor_v1 import EvictionPolicy, Evictor, make_evictor
from vllm.core.interfaces import AllocStatus, BlockSpaceManager
from vllm.logger import init_logger
from vllm.sequence import Sequence, SequenceGroup, SequenceStatus
from vllm.utils import Device
logger = init_logger(__name__)
class BlockAllocatorBase(ABC):
    """Abstract interface for an allocator of physical token blocks on one
    device.

    Implementations hand out blocks through `allocate` and take them back
    through `free`. Freeing decrements the block's reference count; once
    the count reaches zero the block becomes available for reuse.
    """

    @abstractmethod
    def __init__(self,
                 device: Device,
                 block_size: int,
                 num_blocks: int,
                 eviction_policy: EvictionPolicy = EvictionPolicy.LRU):
        """Set up an allocator for `num_blocks` blocks on `device`."""

    @abstractmethod
    def allocate(self,
                 block_hash: Optional[int] = None,
                 num_hashed_tokens: int = 0) -> PhysicalTokenBlock:
        """Hand out a physical block and return it."""

    @abstractmethod
    def free(self, block: PhysicalTokenBlock) -> None:
        """Give up one reference to `block`."""

    @abstractmethod
    def get_num_free_blocks(self) -> int:
        """Return how many blocks can currently be handed out."""

    @abstractmethod
    def get_num_total_blocks(self) -> int:
        """Return the total number of blocks managed by the allocator."""

    @abstractmethod
    def contains_block(self, block_hash: int) -> bool:
        """Report whether a block with `block_hash` is known."""

    @abstractmethod
    def update_hash(self, block_hash: int, block: PhysicalTokenBlock):
        """Re-key `block` under `block_hash`."""

    @abstractmethod
    def get_prefix_cache_hit_rate(self) -> float:
        """Prefix cache hit rate. -1 means not supported or disabled."""
class CachedBlockAllocator(BlockAllocatorBase):
    """Hash-keyed allocator of physical token blocks for one device.

    Referenced blocks live in `cached_blocks`, keyed by block hash. When a
    block's reference count drops to zero it is removed from that mapping
    and handed to the evictor. From there it can be revived by a later
    `allocate` with the same hash, or recycled for a different hash once
    `num_blocks` blocks have been created.
    """

    def __init__(self,
                 device: Device,
                 block_size: int,
                 num_blocks: int,
                 eviction_policy: EvictionPolicy = EvictionPolicy.LRU) -> None:
        self.device = device
        self.block_size = block_size
        self.num_blocks = num_blocks

        # Number of PhysicalTokenBlock objects created so far; also used as
        # the block number of the next block to be created.
        self.current_num_blocks = 0
        # Blocks with at least one reference, keyed by block hash.
        self.cached_blocks: Dict[int, PhysicalTokenBlock] = {}
        # Holder of blocks whose reference count dropped to zero.
        self.evictor: Evictor = make_evictor(eviction_policy)

        # Source of hashes for blocks allocated without an explicit hash.
        self.default_hash_ctr = count()

        self.cache_metric_data = CacheMetricData()

    def allocate_block(self, block_hash: int,
                       num_hashed_tokens: int) -> PhysicalTokenBlock:
        """Return a block labelled with `block_hash`.

        A new block is created while fewer than `num_blocks` exist;
        afterwards a block is taken from the evictor and relabelled.
        """
        if self.current_num_blocks != self.num_blocks:
            fresh_block = PhysicalTokenBlock(
                device=self.device,
                block_number=self.current_num_blocks,
                block_size=self.block_size,
                block_hash=block_hash,
                num_hashed_tokens=num_hashed_tokens)
            self.current_num_blocks += 1
            return fresh_block

        # Every block has been created already: recycle an evicted one.
        recycled_block = self.evictor.evict()
        recycled_block.block_hash = block_hash
        recycled_block.num_hashed_tokens = num_hashed_tokens
        return recycled_block

    def allocate(self,
                 block_hash: Optional[int] = None,
                 num_hashed_tokens: int = 0) -> PhysicalTokenBlock:
        """Return the block for `block_hash` with one more reference.

        When `block_hash` is None a hash is drawn from the internal
        counter. Each call records a hit or a miss in `cache_metric_data`.
        """
        if block_hash is None:
            block_hash = next(self.default_hash_ctr)

        if block_hash in self.evictor:
            # Revive a block that was freed but not yet recycled.
            assert block_hash not in self.cached_blocks
            revived_block = self.evictor.remove(block_hash)
            assert revived_block.ref_count == 0
            self.cached_blocks[block_hash] = revived_block

        is_hit = block_hash in self.cached_blocks
        self.cache_metric_data.query(hit=is_hit)
        if not is_hit:
            self.cached_blocks[block_hash] = self.allocate_block(
                block_hash, num_hashed_tokens)

        block = self.cached_blocks[block_hash]
        assert block.block_hash == block_hash
        block.ref_count += 1
        return block

    def free(self, block: PhysicalTokenBlock) -> None:
        """Drop one reference to `block`.

        Raises:
            ValueError: If the block has no references left to drop.
        """
        if block.ref_count == 0:
            raise ValueError(f"Double free! {block} is already freed.")

        block.ref_count -= 1
        if block.ref_count != 0:
            return

        # Last reference gone: hand the block over to the evictor.
        assert block.block_hash not in self.evictor
        self.evictor.add(block)

        # Remove the block from the cached_blocks
        del self.cached_blocks[block.block_hash]

    def get_num_free_blocks(self) -> int:
        """Return the not-yet-created blocks plus those in the evictor."""
        num_never_created = self.num_blocks - self.current_num_blocks
        return num_never_created + self.evictor.num_blocks

    def get_num_total_blocks(self) -> int:
        """Return the configured block budget."""
        return self.num_blocks

    def contains_block(self, block_hash: int) -> bool:
        """Report whether `block_hash` is referenced or held by the
        evictor."""
        if block_hash in self.cached_blocks:
            return True
        return block_hash in self.evictor

    def update_hash(self, block_hash: int, block: PhysicalTokenBlock):
        """Re-key `block` from its current hash to `block_hash`."""
        # Update the hash of block and the cached_blocks dictionary.
        assert not self.contains_block(block_hash)
        previous_hash = block.block_hash
        block.block_hash = block_hash
        del self.cached_blocks[previous_hash]
        self.cached_blocks[block_hash] = block

    def get_prefix_cache_hit_rate(self) -> float:
        """Return the hit rate reported by `cache_metric_data`."""
        return self.cache_metric_data.get_hit_rate()
class UncachedBlockAllocator(BlockAllocatorBase):
    """Free-list allocator of physical token blocks for one device.

    All `num_blocks` blocks are created up front and kept in
    `free_blocks`. `allocate` pops a block from that list; `free`
    decrements the block's reference count and returns the block to the
    list once the count reaches zero. Hash-related operations are not
    supported.
    """

    def __init__(
        self,
        device: Device,
        block_size: int,
        num_blocks: int,
    ) -> None:
        self.device = device
        self.block_size = block_size
        self.num_blocks = num_blocks

        # Initialize the free blocks.
        self.free_blocks: List[PhysicalTokenBlock] = [
            PhysicalTokenBlock(device=device,
                               block_number=block_number,
                               block_size=block_size,
                               block_hash=-1,
                               num_hashed_tokens=0)
            for block_number in range(num_blocks)
        ]

    def allocate(self,
                 block_hash: Optional[int] = None,
                 num_hashed_tokens: int = 0) -> PhysicalTokenBlock:
        """Pop a free block and return it with a reference count of 1.

        `block_hash` and `num_hashed_tokens` are accepted but not used.

        Raises:
            ValueError: If no free block is left.
        """
        if not self.free_blocks:
            raise ValueError("Out of memory! No free blocks are available.")
        taken_block = self.free_blocks.pop()
        taken_block.ref_count = 1
        return taken_block

    def free(self, block: PhysicalTokenBlock) -> None:
        """Drop one reference to `block`.

        Raises:
            ValueError: If the block has no references left to drop.
        """
        if block.ref_count == 0:
            raise ValueError(f"Double free! {block} is already freed.")
        block.ref_count -= 1
        if block.ref_count != 0:
            return
        # No references remain: the block can be handed out again.
        self.free_blocks.append(block)

    def get_num_free_blocks(self) -> int:
        """Return the length of the free list."""
        return len(self.free_blocks)

    def get_num_total_blocks(self) -> int:
        """Return the number of blocks created at construction."""
        return self.num_blocks

    def contains_block(self, block_hash: int) -> bool:
        """Not supported by this allocator; always raises."""
        raise NotImplementedError(
            "Invalid codepath for uncached block allocator.")

    def update_hash(self, block_hash: int, block: PhysicalTokenBlock):
        """Not supported by this allocator; always raises."""
        raise NotImplementedError(
            "Invalid codepath for uncached block allocator.")

    def get_prefix_cache_hit_rate(self) -> float:
        """Return -1, the marker for "not supported or disabled"."""
        return -1
class BlockSpaceManagerV1(BlockSpaceManager):
"""Manages the mapping between logical and physical token blocks."""
def __init__(
    self,
    block_size: int,
    num_gpu_blocks: int,
    num_cpu_blocks: int,
    watermark: float = 0.01,
    sliding_window: Optional[int] = None,
    enable_caching: bool = False,
) -> None:
    """Create the GPU/CPU allocators and the empty block-table maps.

    Args:
        block_size: Block size passed to both allocators and used to
            convert `sliding_window` into a number of blocks.
        num_gpu_blocks: Number of blocks given to the GPU allocator.
        num_cpu_blocks: Number of blocks given to the CPU allocator.
        watermark: Non-negative fraction of `num_gpu_blocks`; stored
            (truncated to whole blocks) as `watermark_blocks`.
        sliding_window: Sliding window length, or None when unused.
        enable_caching: Select the cached allocators instead of the
            uncached ones.

    Raises:
        NotImplementedError: If both `enable_caching` and
            `sliding_window` are requested.
    """
    self.block_size = block_size
    self.num_total_gpu_blocks = num_gpu_blocks
    self.num_total_cpu_blocks = num_cpu_blocks

    if enable_caching and sliding_window is not None:
        raise NotImplementedError(
            "Sliding window is not allowed with prefix caching enabled!")

    # Round up to nearest block size to regularize sliding window
    # allocation sizes.
    self.block_sliding_window = (None if sliding_window is None else
                                 math.ceil(sliding_window / block_size))

    self.watermark = watermark
    assert watermark >= 0.0

    self.enable_caching = enable_caching

    self.watermark_blocks = int(watermark * num_gpu_blocks)

    # Both devices use the same allocator flavour.
    if self.enable_caching:
        logger.info("Automatic prefix caching is enabled.")
        allocator_cls = CachedBlockAllocator
    else:
        allocator_cls = UncachedBlockAllocator
    self.gpu_allocator: BlockAllocatorBase = allocator_cls(
        Device.GPU, block_size, num_gpu_blocks)
    self.cpu_allocator: BlockAllocatorBase = allocator_cls(
        Device.CPU, block_size, num_cpu_blocks)

    # Mapping: seq_id -> BlockTable.
    self.block_tables: Dict[int, BlockTable] = {}

    # Mapping: req_id -> BlockTable
    # Note that each SequenceGroup has a unique
    # request ID
    self.cross_block_tables: Dict[str, BlockTable] = {}
def _get_seq_num_required_blocks(self, seq: Optional[Sequence]) -> int:
    """Return `seq.n_blocks`, or 0 when no sequence is given."""
    if seq is None:
        return 0
    return seq.n_blocks
def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus:
    """Decide whether the GPU can hold the blocks `seq_group` needs.

    The requirement is the block count of the first WAITING sequence plus
    that of the encoder sequence (0 when there is none), capped at
    `block_sliding_window` when a sliding window is configured.

    Returns:
        AllocStatus.NEVER if the requirement would not leave
        `watermark_blocks` spare even with every GPU block free,
        AllocStatus.OK if it does so with the currently free blocks, and
        AllocStatus.LATER otherwise.
    """
    # FIXME(woosuk): Here we assume that all sequences in the group share
    # the same prompt. This may not be true for preempted sequences.

    check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group)

    first_waiting_seq = seq_group.get_seqs(
        status=SequenceStatus.WAITING)[0]
    blocks_needed = (
        self._get_seq_num_required_blocks(first_waiting_seq) +
        self._get_seq_num_required_blocks(seq_group.get_encoder_seq()))

    if self.block_sliding_window is not None:
        blocks_needed = min(blocks_needed, self.block_sliding_window)

    free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()
    headroom = self.watermark_blocks

    # Use watermark to avoid frequent cache eviction.
    if self.num_total_gpu_blocks - blocks_needed < headroom:
        return AllocStatus.NEVER
    if free_gpu_blocks - blocks_needed < headroom:
        return AllocStatus.LATER
    return AllocStatus.OK
def _allocate_sequence(self,
                       seq: Optional[Sequence],
                       ref_count: int,
                       is_encoder_decoder: bool = True) -> BlockTable:
    """Build a block table holding the prompt blocks of ``seq``.

    Args:
        seq: Sequence to allocate for; must not be None (asserted).
        ref_count: Reference count written onto reused sliding-window
            blocks and onto blocks obtained without a hash.
        is_encoder_decoder: When True, the hash-based allocation path is
            never taken, even if caching is enabled.

    Returns:
        The newly populated ``BlockTable``.
    """
    # Allocate new physical token blocks that will store the prompt tokens.
    num_prompt_blocks = self._get_seq_num_required_blocks(seq)

    table: BlockTable = BlockTable()
    assert seq is not None

    for idx in range(num_prompt_blocks):
        window = self.block_sliding_window
        if window is not None and idx >= window:
            # Past the window: reuse an already-placed block cyclically.
            block = table[idx % window]
            # Set the reference counts of the token blocks.
            block.ref_count = ref_count
        elif not is_encoder_decoder and self.enable_caching:
            # Hash-based allocation; ref_count is not assigned here.
            block = self.gpu_allocator.allocate(
                seq.hash_of_block(idx),
                seq.num_hashed_tokens_of_block(idx))
        else:
            block = self.gpu_allocator.allocate()
            # Set the reference counts of the token blocks.
            block.ref_count = ref_count
        table.append(block)

    return table
def allocate(self, seq_group: SequenceGroup) -> None:
    """Allocate and register block tables for ``seq_group``.

    The first WAITING sequence is allocated once; each WAITING sequence then
    gets that table (or a copy of it when there are several). For
    encoder-decoder groups a cross-attention table is also allocated and
    stored under the group's request id.
    """
    is_encoder_decoder = seq_group.is_encoder_decoder()
    check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group)

    # Allocate decoder sequences
    #
    # NOTE: Here we assume that all sequences in the group have the same
    # decoder prompt.
    waiting = seq_group.get_seqs(status=SequenceStatus.WAITING)
    seq = waiting[0]
    decoder_table: BlockTable = self._allocate_sequence(
        seq, seq_group.num_seqs(), is_encoder_decoder)

    # Assign the self-attention block tables for each sequence.
    if len(waiting) == 1:
        self.block_tables[seq.seq_id] = decoder_table
    else:
        for seq in waiting:
            self.block_tables[seq.seq_id] = decoder_table.copy()

    if not is_encoder_decoder:
        return

    # Allocate encoder sequence.
    # A SequenceGroup has only a single encoder sequence (at most),
    # thus allocate with a ref count of 1
    cross_table = self._allocate_sequence(seq_group.get_encoder_seq(), 1,
                                          is_encoder_decoder)
    # Assign the cross-attention block table for the SequenceGroup.
    self.cross_block_tables[seq_group.request_id] = cross_table
def can_append_slots(self,
                     seq_group: SequenceGroup,
                     num_lookahead_slots: int = 0) -> bool:
    """Return True if every RUNNING sequence could get one free GPU block.

    ``num_lookahead_slots`` must be 0; any other value trips the assertion.
    """
    assert (num_lookahead_slots == 0
            ), "lookahead allocation not supported in BlockSpaceManagerV1"

    # Simple heuristic: If there is at least one free block
    # for each sequence, we can append.
    free_blocks = self.gpu_allocator.get_num_free_blocks()
    running = seq_group.num_seqs(status=SequenceStatus.RUNNING)
    return running <= free_blocks
def _promote_last_block(
    self,
    seq: Sequence,
    last_block: PhysicalTokenBlock,
) -> PhysicalTokenBlock:
    """Give the last block of ``seq`` a content hash, reusing a cached block.

    Requires caching to be enabled (asserted). If the GPU allocator already
    holds a block with the new hash, ``last_block`` is freed and the
    allocator's block for that hash is returned; otherwise ``last_block`` is
    re-registered under the new hash and returned.
    """
    assert self.enable_caching

    # Compute a new hash for the block so that it can be shared by other
    # Sequences
    new_hash = seq.hash_of_block(seq.n_blocks - 1)

    if not self.gpu_allocator.contains_block(new_hash):
        self.gpu_allocator.update_hash(new_hash, last_block)
        return last_block

    # new_hash is already in the cached table: free last_block and return
    # the cached version.
    self.gpu_allocator.free(last_block)
    return self.gpu_allocator.allocate(new_hash)
def _is_last_block_full(
    self,
    seq: Sequence,
) -> bool:
    """Return True if ``seq`` has tokens and they exactly fill whole blocks."""
    num_tokens = seq.data.get_len()
    if not num_tokens > 0:
        return False
    return num_tokens % seq.block_size == 0
def _maybe_promote_last_block(
    self,
    seq: Sequence,
    last_block: PhysicalTokenBlock,
) -> PhysicalTokenBlock:
    """Promote ``last_block`` when the last block of ``seq`` is full.

    Returns ``last_block`` unchanged if the last block is not full.
    """
    if not self._is_last_block_full(seq):
        return last_block
    return self._promote_last_block(seq, last_block)
def _allocate_last_physical_block(
    self,
    seq: Sequence,
) -> PhysicalTokenBlock:
    """Allocate the physical block for the last logical block of ``seq``.

    Without caching, a plain block is taken from the GPU allocator. With
    caching, the block is requested with the content hash of the last block
    when that block is full, and with a hash of None otherwise.
    """
    # Called before a new block is appended.
    # This is in charge of allocating a new physical block (to be appended).
    if not self.enable_caching:
        return self.gpu_allocator.allocate()

    last_idx = seq.n_blocks - 1
    # None if the last block is not full. Otherwise, we set it to the
    # content hash.
    block_hash: Optional[int] = (seq.hash_of_block(last_idx)
                                 if self._is_last_block_full(seq) else None)

    # num_hashed_tokens is used to compute future hashes
    # (e.g. in the hashing function, it is used to ask the sequence for
    # prefix tokens)
    num_hashed_tokens = seq.num_hashed_tokens_of_block(last_idx)
    allocated = self.gpu_allocator.allocate(block_hash, num_hashed_tokens)

    # If the block hash is None, then the block is not full.
    # If the block is not full, then we expect it to have a refcount of 1.
    if block_hash is None:
        assert allocated.ref_count == 1
    return allocated
def append_slots(
    self,
    seq: Sequence,
    num_lookahead_slots: int = 0,
) -> List[Tuple[int, int]]:
    """Allocate a physical slot for a new token.

    Returns:
        An empty list when no block copy is needed, or a single
        ``(old_block_number, new_block_number)`` pair when the last block
        had to be replaced because its ``ref_count`` was not 1.
    """
    n_blocks = seq.n_blocks
    table = self.block_tables[seq.seq_id]
    table_len = len(table)

    # If we need to allocate a new physical block
    if table_len < n_blocks:
        # Currently this code only supports adding one physical block
        assert table_len == n_blocks - 1

        window = self.block_sliding_window
        if not (window and table_len >= window):
            # The sequence has a new logical block.
            # Allocate a new physical block.
            table.append(self._allocate_last_physical_block(seq))
            return []

        # Sliding window is saturated: reuse a block, then fall through to
        # the shared-block handling below.
        table.append(table[table_len % window])

    # We want to append the token to the last physical block.
    tail = table[-1]
    assert tail.device == Device.GPU

    if tail.ref_count != 1:
        # The last block is shared with other sequences.
        # Copy on Write: Allocate a new block and copy the tokens.
        replacement = self._allocate_last_physical_block(seq)
        table[-1] = replacement
        self.gpu_allocator.free(tail)
        return [(tail.block_number, replacement.block_number)]

    # Not shared with other sequences. Appendable.
    if self.enable_caching:
        # If the last block is now complete, we may reuse an old block
        # to save memory.
        table[-1] = self._maybe_promote_last_block(seq, tail)
    return []
def fork(self, parent_seq: Sequence, child_seq: Sequence) -> None:
    """Give ``child_seq`` a copy of the parent's block table.

    Does nothing if the parent has no registered block table. Each distinct
    block in the parent's table has its ``ref_count`` incremented once.
    """
    # NOTE: fork does not allocate a new physical block.
    # Thus, it is always safe from OOM.
    try:
        parent_table = self.block_tables[parent_seq.seq_id]
    except KeyError:
        # Parent sequence has either been freed or never existed.
        return

    self.block_tables[child_seq.seq_id] = parent_table.copy()

    # When using a sliding window, blocks will be eventually reused.
    # In this case the block tables will contain repeated blocks.
    # When forking, we must make sure that each block's `ref_count`
    # is only incremented by one, so we deduplicate them by wrapping
    # them in a set.
    for shared_block in set(parent_table):
        shared_block.ref_count += 1
def _get_physical_blocks(
        self, seq_group: SequenceGroup) -> List[PhysicalTokenBlock]:
    """Collect the distinct blocks used by unfinished sequences in a group.

    For encoder-decoder groups the cross-attention blocks are included.
    The order of the returned list is unspecified (it is built from a set).
    """
    # NOTE: Here, we assume that the physical blocks are only shared by
    # the sequences in the same group.
    request_id = seq_group.request_id
    unique_blocks: Set[PhysicalTokenBlock] = set()

    for seq in seq_group.get_seqs():
        if not seq.is_finished():
            unique_blocks.update(self.block_tables[seq.seq_id])

    # Cross-attention blocks
    if seq_group.is_encoder_decoder():
        unique_blocks.update(self.cross_block_tables[request_id])

    return list(unique_blocks)
def can_swap_in(self,
                seq_group: SequenceGroup,
                num_lookahead_slots: int = 0) -> AllocStatus:
    """Report whether ``seq_group`` can be swapped onto the GPU.

    ``num_lookahead_slots`` must be 0; any other value trips the assertion.

    Returns:
        ``AllocStatus.NEVER`` if the GPU allocator's total block count is
        below the requirement, ``AllocStatus.OK`` if the free blocks cover
        it while keeping ``watermark_blocks``, else ``AllocStatus.LATER``.
    """
    assert (num_lookahead_slots == 0
            ), "BlockSpaceManagerV1 does not support lookahead allocation"

    group_blocks = self._get_physical_blocks(seq_group)
    swapped_count = seq_group.num_seqs(status=SequenceStatus.SWAPPED)
    if seq_group.is_encoder_decoder():
        # One extra block is budgeted for encoder-decoder groups.
        swapped_count += 1
    free_blocks = self.gpu_allocator.get_num_free_blocks()

    # NOTE: Conservatively, we assume that every sequence will allocate
    # at least one free block right after the swap-in.
    # NOTE: This should match the logic in can_append_slots().
    needed = len(group_blocks) + swapped_count

    if self.gpu_allocator.get_num_total_blocks() < needed:
        return AllocStatus.NEVER
    if free_blocks - needed >= self.watermark_blocks:
        return AllocStatus.OK
    return AllocStatus.LATER
def _swap_block_table(
        self, block_table: BlockTable, src_allocator: BlockAllocatorBase,
        dest_allocator: BlockAllocatorBase,
        mapping: Dict[PhysicalTokenBlock,
                      PhysicalTokenBlock]) -> BlockTable:
    """Rebuild ``block_table`` with blocks from ``dest_allocator``.

    ``mapping`` (source block -> destination block) is updated in place. A
    source block already in it reuses its destination block and bumps that
    block's ``ref_count``; otherwise a destination block is allocated with
    the source's hash and hashed-token count. Every source block visited is
    freed on ``src_allocator``.
    """
    swapped: BlockTable = BlockTable()

    for src_block in block_table:
        try:
            dst_block = mapping[src_block]
        except KeyError:
            dst_block = dest_allocator.allocate(src_block.block_hash,
                                                src_block.num_hashed_tokens)
            mapping[src_block] = dst_block
        else:
            dst_block.ref_count += 1
        swapped.append(dst_block)
        # Free the source block swapped in to destination.
        src_allocator.free(src_block)

    return swapped
def swap_in(self, seq_group: SequenceGroup) -> List[Tuple[int, int]]:
    """Swap the SWAPPED sequences of ``seq_group`` from CPU to GPU blocks.

    Returns:
        ``(cpu_block_number, gpu_block_number)`` pairs, one per CPU block
        that was swapped.
    """
    request_id = seq_group.request_id

    # CPU block -> GPU block.
    # dict is efficient in lookup `if cpu_block in mapping`
    cpu_to_gpu: Dict[PhysicalTokenBlock, PhysicalTokenBlock] = {}

    for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
        current = self.block_tables[seq.seq_id]
        self.block_tables[seq.seq_id] = self._swap_block_table(
            current, self.cpu_allocator, self.gpu_allocator, cpu_to_gpu)

    if seq_group.is_encoder_decoder():
        current_cross = self.cross_block_tables[request_id]
        self.cross_block_tables[request_id] = self._swap_block_table(
            current_cross, self.cpu_allocator, self.gpu_allocator,
            cpu_to_gpu)

    pairs = []
    for cpu_block, gpu_block in cpu_to_gpu.items():
        pairs.append((cpu_block.block_number, gpu_block.block_number))
    return pairs
def can_swap_out(self, seq_group: SequenceGroup) -> bool:
    """Return True if the CPU allocator has a free block per group block."""
    needed = len(self._get_physical_blocks(seq_group))
    return needed <= self.cpu_allocator.get_num_free_blocks()
def swap_out(self, seq_group: SequenceGroup) -> List[Tuple[int, int]]:
    """Swap the RUNNING sequences of ``seq_group`` from GPU to CPU blocks.

    Returns:
        ``(gpu_block_number, cpu_block_number)`` pairs, one per GPU block
        that was swapped.
    """
    request_id = seq_group.request_id

    # GPU block -> CPU block.
    # dict is efficient in lookup `if gpu_block in mapping`
    gpu_to_cpu: Dict[PhysicalTokenBlock, PhysicalTokenBlock] = {}

    for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
        current = self.block_tables[seq.seq_id]
        self.block_tables[seq.seq_id] = self._swap_block_table(
            current, self.gpu_allocator, self.cpu_allocator, gpu_to_cpu)

    if seq_group.is_encoder_decoder():
        current_cross = self.cross_block_tables[request_id]
        self.cross_block_tables[request_id] = self._swap_block_table(
            current_cross, self.gpu_allocator, self.cpu_allocator,
            gpu_to_cpu)

    # Keys are the GPU (source) blocks, values the CPU (destination) blocks.
    pairs = []
    for gpu_block, cpu_block in gpu_to_cpu.items():
        pairs.append((gpu_block.block_number, cpu_block.block_number))
    return pairs
def _free_block_table(self, block_table: BlockTable) -> None:
    """Free each distinct block of ``block_table`` on its device allocator."""
    # when using a sliding window, each seq will only use up
    # to `self.block_sliding_window` blocks. When freeing
    # the block table, we must make sure to not free blocks more
    # than once. If no sliding window is used, there is no block
    # reuse in the block table, so we must free all blocks.
    window = self.block_sliding_window
    if window is None:
        candidates = block_table
    else:
        candidates = block_table[-window:]

    for blk in set(candidates):
        owner = (self.gpu_allocator
                 if blk.device == Device.GPU else self.cpu_allocator)
        owner.free(blk)
def free(self, seq: Sequence) -> None:
    """Free the blocks of ``seq`` and drop its block table, if registered."""
    try:
        table = self.block_tables[seq.seq_id]
    except KeyError:
        # Already freed or haven't been scheduled yet.
        return
    self._free_block_table(table)
    del self.block_tables[seq.seq_id]
def free_cross(self, seq_group: SequenceGroup) -> None:
    """Free the cross-attention blocks of ``seq_group``, if registered."""
    try:
        cross_table = self.cross_block_tables[seq_group.request_id]
    except KeyError:
        # Already freed or hasn't been scheduled yet.
        return
    self._free_block_table(cross_table)
    del self.cross_block_tables[seq_group.request_id]
def reset(self) -> None:
    """Free every registered block table and clear both registries."""
    # Decoder block tables first, then cross-attention block tables.
    for registry in (self.block_tables, self.cross_block_tables):
        for table in registry.values():
            self._free_block_table(table)
        registry.clear()
def get_block_table(self, seq: Sequence) -> List[int]:
    """Return the result of ``ids()`` on the block table of ``seq``."""
    table = self.block_tables[seq.seq_id]
    return table.ids()
def get_cross_block_table(self, seq_group: SequenceGroup) -> List[int]:
    """Return the block numbers of the group's cross-attention table."""
    block_numbers: List[int] = []
    for blk in self.cross_block_tables[seq_group.request_id]:
        block_numbers.append(blk.block_number)
    return block_numbers
def get_num_free_gpu_blocks(self) -> int:
    """Return the free-block count reported by the GPU allocator."""
    free_count = self.gpu_allocator.get_num_free_blocks()
    return free_count
def get_num_free_cpu_blocks(self) -> int:
    """Return the free-block count reported by the CPU allocator."""
    free_count = self.cpu_allocator.get_num_free_blocks()
    return free_count
def access_all_blocks_in_seq(
    self,
    seq: Sequence,
    access_time: float,
) -> None:
    """Stamp every block of ``seq`` with ``access_time`` when caching is on.

    Does nothing when caching is disabled.
    """
    if not self.enable_caching:
        return
    # Update the last accessed time of all the blocks accessed
    # in this step.
    for blk in self.block_tables[seq.seq_id]:
        blk.last_accessed = access_time
def compute_full_blocks_in_seq(self, seq: Sequence, token_chunk_size: int):
    """Mark the fully computed leading blocks of ``seq`` as computed.

    Walks backwards from the last full block and stops at the first block
    that is already marked. Does nothing if ``seq`` has no block table.
    """
    if seq.seq_id not in self.block_tables:
        return

    # When chunked prefill is enabled, the computed full blocks
    # should be calculated based on the number of computed tokens.
    tokens_done = seq.data.get_num_computed_tokens() + token_chunk_size
    num_full = tokens_done // self.block_size

    table = self.block_tables[seq.seq_id]
    if num_full == 0:
        return

    idx = num_full - 1
    while idx >= 0 and not table[idx].computed:
        table[idx].computed = True
        idx -= 1
def get_all_computed_blocks(self, seq: Sequence) -> List[int]:
    """Return block numbers of the leading computed blocks of ``seq``.

    The last block of the table is never included. Returns an empty list
    if ``seq`` has no block table.
    """
    if seq.seq_id not in self.block_tables:
        return []
    table = self.block_tables[seq.seq_id]

    # NOTE We exclude the last block to avoid the case where the entire
    # prompt is cached. This would cause erroneous behavior in model
    # runner.
    computed_ids: List[int] = []
    for blk in table[:-1]:
        if not blk.computed:
            break
        computed_ids.append(blk.block_number)
    return computed_ids
def get_common_computed_block_ids(
        self, seqs: List[Sequence]) -> GenericSequence[int]:
    """Return the block ids that are common for a given sequence group.

    Used in prefill (can skip prefill of some blocks).

    Args:
        seqs: Sequences whose computed block ids are compared.

    Returns:
        The longest common leading run of computed block ids across the
        sequences that have any. An empty list when prefix caching is
        disabled or when no sequence has computed blocks.
    """
    # Can return non-empty result only with prefix caching enabled.
    if not self.enable_caching:
        return []

    ids_list = [self.get_all_computed_blocks(seq) for seq in seqs]
    non_empty = [ids for ids in ids_list if ids != []]
    if not non_empty:
        # commonprefix([]) returns the empty *string* '', which does not
        # match the declared return type; return an empty list instead.
        return []
    return commonprefix(non_empty)
def mark_blocks_as_computed(self, seq_group: SequenceGroup,
                            token_chunk_size: int):
    """Run ``compute_full_blocks_in_seq`` on each sequence of the group.

    Does nothing when caching is disabled.
    """
    if not self.enable_caching:
        return
    for member in seq_group.get_seqs():
        self.compute_full_blocks_in_seq(member, token_chunk_size)
def get_prefix_cache_hit_rate(self, device: Device) -> float:
    """Return the prefix-cache hit rate reported by ``device``'s allocator.

    Raises:
        ValueError: If ``device`` is neither ``Device.GPU`` nor
            ``Device.CPU``.
    """
    if device == Device.GPU:
        allocator = self.gpu_allocator
    elif device == Device.CPU:
        allocator = self.cpu_allocator
    else:
        raise ValueError(f"Invalid device: {device}")
    return allocator.get_prefix_cache_hit_rate()
...@@ -28,23 +28,21 @@ class BlockSpaceManager(ABC): ...@@ -28,23 +28,21 @@ class BlockSpaceManager(ABC):
def get_block_space_manager_class(version: str): def get_block_space_manager_class(version: str):
version = version.lower() version = version.lower()
if version == "v1": if version == "selfattn":
from vllm.core.block_manager_v1 import BlockSpaceManagerV1 from vllm.core.block_manager import SelfAttnBlockSpaceManager
return BlockSpaceManagerV1 return SelfAttnBlockSpaceManager
if version == "v2": if version == "placeholder":
from vllm.core.block_manager_v2 import BlockSpaceManagerV2 from vllm.core.placeholder_block_space_manager import (
return BlockSpaceManagerV2 PlaceholderBlockSpaceManager)
return PlaceholderBlockSpaceManager
if version == "embedding":
from vllm.core.embedding_model_block_manager import (
EmbeddingModelBlockSpaceManager)
return EmbeddingModelBlockSpaceManager
raise ValueError(f"Unknown version {version=}") raise ValueError(f"Unknown version {version=}")
@abstractmethod @abstractmethod
def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus: def can_allocate(self,
seq_group: SequenceGroup,
num_lookahead_slots: int = 0) -> AllocStatus:
pass pass
@abstractmethod @abstractmethod
......
...@@ -5,9 +5,10 @@ from vllm.sequence import Sequence, SequenceGroup ...@@ -5,9 +5,10 @@ from vllm.sequence import Sequence, SequenceGroup
from vllm.utils import Device from vllm.utils import Device
class EmbeddingModelBlockSpaceManager(BlockSpaceManager): class PlaceholderBlockSpaceManager(BlockSpaceManager):
"""An embedding version of BlockSpaceManager for use in environments """A version of BlockSpaceManager for use in environments
with embedding models where block management is not required. where block management is not required.
For example: embedding models or attention-free models like Mamba.
This class provides the same interface as BlockSpaceManager, but its This class provides the same interface as BlockSpaceManager, but its
methods perform no actions or return simple values like True in specific methods perform no actions or return simple values like True in specific
...@@ -21,7 +22,9 @@ class EmbeddingModelBlockSpaceManager(BlockSpaceManager): ...@@ -21,7 +22,9 @@ class EmbeddingModelBlockSpaceManager(BlockSpaceManager):
) -> None: ) -> None:
pass pass
def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus: def can_allocate(self,
seq_group: SequenceGroup,
num_lookahead_slots: int = 0) -> AllocStatus:
# Always return OK for dummy purposes # Always return OK for dummy purposes
return AllocStatus.OK return AllocStatus.OK
...@@ -38,7 +41,7 @@ class EmbeddingModelBlockSpaceManager(BlockSpaceManager): ...@@ -38,7 +41,7 @@ class EmbeddingModelBlockSpaceManager(BlockSpaceManager):
seq: Sequence, seq: Sequence,
num_lookahead_slots: int, num_lookahead_slots: int,
) -> List[Tuple[int, int]]: ) -> List[Tuple[int, int]]:
return None # type: ignore return []
def fork(self, parent_seq: Sequence, child_seq: Sequence) -> None: def fork(self, parent_seq: Sequence, child_seq: Sequence) -> None:
pass pass
......
...@@ -4,8 +4,9 @@ import random ...@@ -4,8 +4,9 @@ import random
import time import time
from collections import deque from collections import deque
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import (Callable, Deque, Dict, Iterable, List, Optional, Set, from typing import Callable, Deque, Dict, Iterable, List, Optional
Tuple, Union) from typing import Sequence as GenericSequence
from typing import Set, Tuple, Union
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus, BlockSpaceManager from vllm.core.interfaces import AllocStatus, BlockSpaceManager
...@@ -115,7 +116,7 @@ class ScheduledSequenceGroup: ...@@ -115,7 +116,7 @@ class ScheduledSequenceGroup:
class SchedulerOutputs: class SchedulerOutputs:
"""The scheduling decision made from a scheduler.""" """The scheduling decision made from a scheduler."""
# Scheduled sequence groups. # Scheduled sequence groups.
scheduled_seq_groups: Iterable[ScheduledSequenceGroup] scheduled_seq_groups: GenericSequence[ScheduledSequenceGroup]
# Number of prefill groups scheduled. # Number of prefill groups scheduled.
num_prefill_groups: int num_prefill_groups: int
# Total number of batched tokens. # Total number of batched tokens.
...@@ -311,11 +312,10 @@ class Scheduler: ...@@ -311,11 +312,10 @@ class Scheduler:
# LoRAs. This should be improved in the future. # LoRAs. This should be improved in the future.
self.lora_config = lora_config self.lora_config = lora_config
version = "v1" version = "selfattn"
if self.scheduler_config.use_v2_block_manager: if (self.scheduler_config.embedding_mode
version = "v2" or self.cache_config.is_attention_free):
if self.scheduler_config.embedding_mode: version = "placeholder"
version = "embedding"
BlockSpaceManagerImpl = BlockSpaceManager.get_block_space_manager_class( BlockSpaceManagerImpl = BlockSpaceManager.get_block_space_manager_class(
version) version)
...@@ -522,7 +522,7 @@ class Scheduler: ...@@ -522,7 +522,7 @@ class Scheduler:
ret.swapped_out.clear() ret.swapped_out.clear()
ret.num_lookahead_slots = self._get_num_lookahead_slots( ret.num_lookahead_slots = self._get_num_lookahead_slots(
is_prefill=False) is_prefill=False, enable_chunking=enable_chunking)
ret.decode_seq_groups_list.clear() ret.decode_seq_groups_list.clear()
ret.prefill_seq_groups_list.clear() ret.prefill_seq_groups_list.clear()
...@@ -561,7 +561,7 @@ class Scheduler: ...@@ -561,7 +561,7 @@ class Scheduler:
# NOTE(woosuk): Preemption happens only when there is no available # NOTE(woosuk): Preemption happens only when there is no available
# slot to keep all the sequence groups in the RUNNING state. # slot to keep all the sequence groups in the RUNNING state.
while not self._can_append_slots(seq_group): while not self._can_append_slots(seq_group, enable_chunking):
budget.subtract_num_batched_tokens(seq_group.request_id, budget.subtract_num_batched_tokens(seq_group.request_id,
num_running_tokens) num_running_tokens)
num_running_seqs = seq_group.get_max_num_running_seqs() num_running_seqs = seq_group.get_max_num_running_seqs()
...@@ -611,7 +611,7 @@ class Scheduler: ...@@ -611,7 +611,7 @@ class Scheduler:
if not cont_loop: if not cont_loop:
break break
else: else:
self._append_slots(seq_group, blocks_to_copy) self._append_slots(seq_group, blocks_to_copy, enable_chunking)
is_prefill = seq_group.is_prefill() is_prefill = seq_group.is_prefill()
scheduled_seq_group: ScheduledSequenceGroup = \ scheduled_seq_group: ScheduledSequenceGroup = \
...@@ -684,7 +684,8 @@ class Scheduler: ...@@ -684,7 +684,8 @@ class Scheduler:
# If the sequence group cannot be swapped in, stop. # If the sequence group cannot be swapped in, stop.
is_prefill = seq_group.is_prefill() is_prefill = seq_group.is_prefill()
alloc_status = self.block_manager.can_swap_in( alloc_status = self.block_manager.can_swap_in(
seq_group, self._get_num_lookahead_slots(is_prefill)) seq_group,
self._get_num_lookahead_slots(is_prefill, enable_chunking))
if alloc_status == AllocStatus.LATER: if alloc_status == AllocStatus.LATER:
break break
elif alloc_status == AllocStatus.NEVER: elif alloc_status == AllocStatus.NEVER:
...@@ -727,7 +728,7 @@ class Scheduler: ...@@ -727,7 +728,7 @@ class Scheduler:
curr_loras.add(lora_int_id) curr_loras.add(lora_int_id)
swapped_queue.popleft() swapped_queue.popleft()
self._swap_in(seq_group, blocks_to_swap_in) self._swap_in(seq_group, blocks_to_swap_in)
self._append_slots(seq_group, blocks_to_copy) self._append_slots(seq_group, blocks_to_copy, enable_chunking)
is_prefill = seq_group.is_prefill() is_prefill = seq_group.is_prefill()
if is_prefill: if is_prefill:
prefill_seq_groups.append( prefill_seq_groups.append(
...@@ -747,12 +748,13 @@ class Scheduler: ...@@ -747,12 +748,13 @@ class Scheduler:
blocks_to_swap_in=blocks_to_swap_in, blocks_to_swap_in=blocks_to_swap_in,
blocks_to_copy=blocks_to_copy, blocks_to_copy=blocks_to_copy,
num_lookahead_slots=self._get_num_lookahead_slots( num_lookahead_slots=self._get_num_lookahead_slots(
is_prefill=False), is_prefill=False, enable_chunking=enable_chunking),
infeasible_seq_groups=infeasible_seq_groups, infeasible_seq_groups=infeasible_seq_groups,
) )
def _get_prompt_limit(self, seq_group: SequenceGroup) -> int: def _get_prompt_limit(self, seq_group: SequenceGroup) -> int:
if self.scheduler_config.chunked_prefill_enabled: if self.scheduler_config.chunked_prefill_enabled and \
not self.scheduler_config.is_multi_step:
prompt_limit = self.scheduler_config.max_model_len prompt_limit = self.scheduler_config.max_model_len
else: else:
prompt_limit = min(self.scheduler_config.max_model_len, prompt_limit = min(self.scheduler_config.max_model_len,
...@@ -899,15 +901,21 @@ class Scheduler: ...@@ -899,15 +901,21 @@ class Scheduler:
waiting_queue.popleft() waiting_queue.popleft()
continue continue
num_lookahead_slots: int = 0
if self.scheduler_config.is_multi_step and enable_chunking:
num_lookahead_slots = self._get_num_lookahead_slots(
True, enable_chunking)
# If the sequence group cannot be allocated, stop. # If the sequence group cannot be allocated, stop.
can_allocate = self.block_manager.can_allocate(seq_group) can_allocate = self.block_manager.can_allocate(
seq_group, num_lookahead_slots=num_lookahead_slots)
if can_allocate == AllocStatus.LATER: if can_allocate == AllocStatus.LATER:
break break
elif can_allocate == AllocStatus.NEVER: elif can_allocate == AllocStatus.NEVER:
logger.warning( logger.warning(
"Input prompt (%d tokens) is too long" "Input prompt (%d tokens) + lookahead slots (%d) is "
" and exceeds the capacity of block_manager", "too long and exceeds the capacity of block_manager",
num_new_tokens) num_new_tokens, num_lookahead_slots)
for seq in waiting_seqs: for seq in waiting_seqs:
seq.status = SequenceStatus.FINISHED_IGNORED seq.status = SequenceStatus.FINISHED_IGNORED
ignored_seq_groups.append(seq_group) ignored_seq_groups.append(seq_group)
...@@ -939,9 +947,24 @@ class Scheduler: ...@@ -939,9 +947,24 @@ class Scheduler:
curr_loras.add(lora_int_id) curr_loras.add(lora_int_id)
waiting_queue.popleft() waiting_queue.popleft()
self._allocate_and_set_running(seq_group) self._allocate_and_set_running(seq_group)
seq_group.init_multi_step(
num_scheduler_steps=self._get_num_lookahead_slots( if enable_chunking and self.scheduler_config.is_multi_step:
is_prefill=True) + 1) blocks_to_copy: List[Tuple[int, int]] = []
# init_multi_step_from_lookahead_slots happens in append_slots
self._append_slots(seq_group, blocks_to_copy, enable_chunking)
# This assert will trip when a copy-on-write happens. This is
# not a concern as the very first sequence-group block
# allocation happens above. Still, we have the assert to
# catch any edge-cases.
assert not blocks_to_copy
else:
seq_group.init_multi_step_from_lookahead_slots(
num_lookahead_slots,
num_scheduler_steps=self.scheduler_config.
num_scheduler_steps,
is_multi_step=self.scheduler_config.is_multi_step,
enable_chunking=enable_chunking)
seq_groups.append( seq_groups.append(
ScheduledSequenceGroup(seq_group=seq_group, ScheduledSequenceGroup(seq_group=seq_group,
token_chunk_size=num_new_tokens)) token_chunk_size=num_new_tokens))
...@@ -956,7 +979,8 @@ class Scheduler: ...@@ -956,7 +979,8 @@ class Scheduler:
return SchedulerPrefillOutputs( return SchedulerPrefillOutputs(
seq_groups=seq_groups, seq_groups=seq_groups,
ignored_seq_groups=ignored_seq_groups, ignored_seq_groups=ignored_seq_groups,
num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=True)) num_lookahead_slots=self._get_num_lookahead_slots(
is_prefill=True, enable_chunking=enable_chunking))
def _schedule_default(self) -> SchedulerOutputs: def _schedule_default(self) -> SchedulerOutputs:
"""Schedule queued requests. """Schedule queued requests.
...@@ -1153,7 +1177,8 @@ class Scheduler: ...@@ -1153,7 +1177,8 @@ class Scheduler:
else: else:
return self._schedule_default() return self._schedule_default()
def _can_append_slots(self, seq_group: SequenceGroup) -> bool: def _can_append_slots(self, seq_group: SequenceGroup,
enable_chunking: bool) -> bool:
"""Determine whether or not we have enough space in the KV cache to """Determine whether or not we have enough space in the KV cache to
continue generation of the sequence group. continue generation of the sequence group.
""" """
...@@ -1164,19 +1189,24 @@ class Scheduler: ...@@ -1164,19 +1189,24 @@ class Scheduler:
self.artificial_preempt_cnt -= 1 self.artificial_preempt_cnt -= 1
return False return False
# Appending slots only occurs in decoding. is_prefill = seq_group.is_prefill()
is_prefill = False num_lookahead_slots = self._get_num_lookahead_slots(
is_prefill, enable_chunking)
if is_prefill and num_lookahead_slots > 0:
# Appending prefill slots only happens multi-step and
# chunked-prefill are enabled together.
assert self.scheduler_config.is_multi_step and enable_chunking
return self.block_manager.can_append_slots( return self.block_manager.can_append_slots(
seq_group=seq_group, seq_group=seq_group, num_lookahead_slots=num_lookahead_slots)
num_lookahead_slots=self._get_num_lookahead_slots(is_prefill),
)
def _allow_async_output_proc(self, seq_group: SequenceGroup) -> bool: def _allow_async_output_proc(self, seq_group: SequenceGroup) -> bool:
no_beam_search = seq_group.sampling_params is None or ( # async_output_proc is allowed only when we have a single sequence
seq_group.sampling_params.best_of == 1 # in the sequence group
and not seq_group.sampling_params.use_beam_search) no_single_seq = seq_group.sampling_params is None or (
return no_beam_search seq_group.sampling_params.n == 1)
return no_single_seq
def schedule( def schedule(
self self
...@@ -1186,7 +1216,7 @@ class Scheduler: ...@@ -1186,7 +1216,7 @@ class Scheduler:
# such as self.running, self.swapped, and self.waiting. # such as self.running, self.swapped, and self.waiting.
scheduler_start_time = time.perf_counter() scheduler_start_time = time.perf_counter()
scheduler_outputs = self._schedule() scheduler_outputs: SchedulerOutputs = self._schedule()
now = time.time() now = time.time()
if not self.cache_config.enable_prefix_caching: if not self.cache_config.enable_prefix_caching:
...@@ -1279,6 +1309,7 @@ class Scheduler: ...@@ -1279,6 +1309,7 @@ class Scheduler:
# `multi_modal_data` will be None. # `multi_modal_data` will be None.
multi_modal_data=seq_group.multi_modal_data multi_modal_data=seq_group.multi_modal_data
if scheduler_outputs.num_prefill_groups > 0 else None, if scheduler_outputs.num_prefill_groups > 0 else None,
mm_processor_kwargs=seq_group.mm_processor_kwargs,
prompt_adapter_request=seq_group.prompt_adapter_request, prompt_adapter_request=seq_group.prompt_adapter_request,
) )
else: else:
...@@ -1383,11 +1414,10 @@ class Scheduler: ...@@ -1383,11 +1414,10 @@ class Scheduler:
for seq in seq_group.get_seqs(status=SequenceStatus.WAITING): for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
seq.status = SequenceStatus.RUNNING seq.status = SequenceStatus.RUNNING
def _append_slots( def _append_slots(self,
self, seq_group: SequenceGroup,
seq_group: SequenceGroup, blocks_to_copy: List[Tuple[int, int]],
blocks_to_copy: List[Tuple[int, int]], enable_chunking: bool = False) -> None:
) -> None:
"""Appends new slots to the sequences in the given sequence group. """Appends new slots to the sequences in the given sequence group.
Args: Args:
...@@ -1398,11 +1428,25 @@ class Scheduler: ...@@ -1398,11 +1428,25 @@ class Scheduler:
int is the destination block index. This list is updated with int is the destination block index. This list is updated with
the new source and destination block indices for the appended the new source and destination block indices for the appended
slots. slots.
enable_chunking (bool): True if chunked prefill is enabled.
""" """
num_lookahead_slots = self._get_num_lookahead_slots(is_prefill=False) is_prefill: bool = seq_group.is_prefill()
seq_group.init_multi_step(num_scheduler_steps=num_lookahead_slots + 1) num_lookahead_slots: int = self._get_num_lookahead_slots(
is_prefill, enable_chunking)
for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
seq_group.init_multi_step_from_lookahead_slots(
num_lookahead_slots,
num_scheduler_steps=self.scheduler_config.num_scheduler_steps,
is_multi_step=self.scheduler_config.is_multi_step,
enable_chunking=enable_chunking)
seq_status: Optional[SequenceStatus] = SequenceStatus.RUNNING
if self.scheduler_config.is_multi_step and enable_chunking:
# In multi-step chunked-prefill any sequence type can have
# slots appended.
seq_status = None
for seq in seq_group.get_seqs(status=seq_status):
cows = self.block_manager.append_slots(seq, num_lookahead_slots) cows = self.block_manager.append_slots(seq, num_lookahead_slots)
if len(cows) > 0: if len(cows) > 0:
blocks_to_copy.extend(cows) blocks_to_copy.extend(cows)
...@@ -1513,16 +1557,32 @@ class Scheduler: ...@@ -1513,16 +1557,32 @@ class Scheduler:
passed_delay = True passed_delay = True
return passed_delay return passed_delay
def _get_num_lookahead_slots(self, is_prefill: bool) -> int: def _get_num_lookahead_slots(self, is_prefill: bool,
enable_chunking: bool) -> int:
"""The number of slots to allocate per sequence per step, beyond known """The number of slots to allocate per sequence per step, beyond known
token ids. Speculative decoding uses these slots to store KV activations token ids. Speculative decoding uses these slots to store KV activations
of tokens which may or may not be accepted. of tokens which may or may not be accepted.
Speculative decoding does not yet support prefill, so we do not perform Speculative decoding does not yet support prefill, so we do not perform
lookahead allocation for prefill. lookahead allocation for prefill.
When chunking is enabled with multi-step, we allocate lookahead slots
for the prefills for when the prefills turn into decodes in the first
step.
""" """
if is_prefill: if is_prefill:
return 0 if self.scheduler_config.is_multi_step and enable_chunking:
# num_lookahead_slots was introduced in the context of decodes,
# in Speculative Decoding.
# When the num_scheduler_steps is 8, say, then the
# num_lookahead_slots is 7. Meaning, we are doing a 1-step of
# decode anyways and we wish to do 7 more.
#
# "lookaheads" for prefills, is introduced in support for
# Chunked-Prefill in Multi-Step.
return self.scheduler_config.num_lookahead_slots + 1
else:
return 0
return self.scheduler_config.num_lookahead_slots return self.scheduler_config.num_lookahead_slots
...@@ -1549,10 +1609,29 @@ class Scheduler: ...@@ -1549,10 +1609,29 @@ class Scheduler:
# in a decode phase. Do not chunk. # in a decode phase. Do not chunk.
if enable_chunking and len(seqs) == 1: if enable_chunking and len(seqs) == 1:
remaining_token_budget = budget.remaining_token_budget() remaining_token_budget = budget.remaining_token_budget()
if self.cache_config.enable_prefix_caching: if self.scheduler_config.is_multi_step:
# The current multi-step + chunked prefill capability does
# not actually support chunking prompts.
#
# Therefore, `num_new_tokens` is computed in the same fashion
# for both multi-step+chunked-prefill &
# multi-step+chunked-prefill+APC
#
# Prompts with more tokens than the current remaining budget
# are postponed to future scheduler steps
if num_new_tokens > self._get_prompt_limit(seq_group):
# If the seq_group is in prompt-stage, pass the
# num_new_tokens as-is so the caller can ignore
# the sequence.
pass
else:
num_new_tokens = 0 \
if num_new_tokens > remaining_token_budget \
else num_new_tokens
elif self.cache_config.enable_prefix_caching:
# When prefix caching is enabled, we always allocate # When prefix caching is enabled, we always allocate
# the number of new tokens that is dividable by the block size # the number of new tokens that is dividable by the block
# to avoid partial block matching. # size to avoid partial block matching.
block_size = self.cache_config.block_size block_size = self.cache_config.block_size
remainder = budget.token_budget % block_size remainder = budget.token_budget % block_size
if remainder != 0: if remainder != 0:
......
...@@ -29,6 +29,10 @@ def _can_p2p(rank: int, world_size: int) -> bool: ...@@ -29,6 +29,10 @@ def _can_p2p(rank: int, world_size: int) -> bool:
for i in range(world_size): for i in range(world_size):
if i == rank: if i == rank:
continue continue
if envs.VLLM_SKIP_P2P_CHECK:
logger.info(
"Skipping P2P check and trusting the driver's P2P report.")
return torch.cuda.can_device_access_peer(rank, i)
if not gpu_p2p_access_check(rank, i): if not gpu_p2p_access_check(rank, i):
return False return False
return True return True
...@@ -262,24 +266,21 @@ class CustomAllreduce: ...@@ -262,24 +266,21 @@ class CustomAllreduce:
def custom_all_reduce(self, input: torch.Tensor) -> Optional[torch.Tensor]: def custom_all_reduce(self, input: torch.Tensor) -> Optional[torch.Tensor]:
# when custom allreduce is disabled, this will be None # when custom allreduce is disabled, this will be None
if self.disabled: if self.disabled or not self.should_custom_ar(input):
return None return None
if self._IS_CAPTURING: if self._IS_CAPTURING:
if torch.cuda.is_current_stream_capturing(): if torch.cuda.is_current_stream_capturing():
if self.should_custom_ar(input): return self.all_reduce_reg(input)
return self.all_reduce_reg(input)
else: else:
if self.should_custom_ar(input): # if warm up, mimic the allocation pattern
# if warm up, mimic the allocation pattern # since custom allreduce is out-of-place
# since custom allreduce is out-of-place return torch.empty_like(input)
return torch.empty_like(input)
else: else:
# note: outside of cuda graph context, # note: outside of cuda graph context,
# custom allreduce incurs a cost of cudaMemcpy, which should # custom allreduce incurs a cost of cudaMemcpy, which should
# be small(<=1% of overall latency) compared to the performance # be small(<=1% of overall latency) compared to the performance
# gains of using custom kernels # gains of using custom kernels
if self.should_custom_ar(input): return self.all_reduce_unreg(input)
return self.all_reduce_unreg(input)
return None return None
......
...@@ -105,7 +105,7 @@ if supports_custom_op(): ...@@ -105,7 +105,7 @@ if supports_custom_op():
group = _groups[group_name]() group = _groups[group_name]()
if group is None: if group is None:
raise ValueError(f"Group {group_name} is destroyed.") raise ValueError(f"Group {group_name} is destroyed.")
group._all_reduce(tensor) group._all_reduce_in_place(tensor)
@inplace_all_reduce.register_fake @inplace_all_reduce.register_fake
def _(tensor: torch.Tensor, group_name: str) -> None: def _(tensor: torch.Tensor, group_name: str) -> None:
...@@ -118,7 +118,7 @@ if supports_custom_op(): ...@@ -118,7 +118,7 @@ if supports_custom_op():
group = _groups[group_name]() group = _groups[group_name]()
if group is None: if group is None:
raise ValueError(f"Group {group_name} is destroyed.") raise ValueError(f"Group {group_name} is destroyed.")
return group._all_reduce(tensor) return group._all_reduce_out_place(tensor)
@outplace_all_reduce.register_fake @outplace_all_reduce.register_fake
def _(tensor: torch.Tensor, group_name: str) -> torch.Tensor: def _(tensor: torch.Tensor, group_name: str) -> torch.Tensor:
...@@ -338,14 +338,17 @@ class GroupCoordinator: ...@@ -338,14 +338,17 @@ class GroupCoordinator:
return input_ return input_
if not supports_custom_op(): if not supports_custom_op():
return self._all_reduce(input_) self._all_reduce_in_place(input_)
return input_
if self.tpu_communicator is not None and \ if self.tpu_communicator is not None and \
not self.tpu_communicator.disabled: not self.tpu_communicator.disabled:
# TPU handles Dynamo with its own logic. # TPU handles Dynamo with its own logic.
return self._all_reduce(input_) return self.tpu_communicator.all_reduce(input_)
if self.ca_comm is not None and self.ca_comm.should_custom_ar(input_): if self.ca_comm is not None and \
not self.ca_comm.disabled and \
self.ca_comm.should_custom_ar(input_):
return torch.ops.vllm.outplace_all_reduce( return torch.ops.vllm.outplace_all_reduce(
input_, group_name=self.unique_name) input_, group_name=self.unique_name)
else: else:
...@@ -353,25 +356,15 @@ class GroupCoordinator: ...@@ -353,25 +356,15 @@ class GroupCoordinator:
group_name=self.unique_name) group_name=self.unique_name)
return input_ return input_
def _all_reduce(self, input_: torch.Tensor) -> torch.Tensor: def _all_reduce_out_place(self, input_: torch.Tensor) -> torch.Tensor:
"""
The actual all-reduce implementation.
NOTE: This operation will be applied in-place or out-of-place.
Always assume this function modifies its input, but use the return
value as the output.
"""
ca_comm = self.ca_comm ca_comm = self.ca_comm
assert ca_comm is not None
assert not ca_comm.disabled
out = ca_comm.custom_all_reduce(input_)
assert out is not None
return out
# For TPUs, use TPU communicator. def _all_reduce_in_place(self, input_: torch.Tensor) -> None:
tpu_comm = self.tpu_communicator
if tpu_comm is not None and not tpu_comm.disabled:
return tpu_comm.all_reduce(input_)
if ca_comm is not None:
out = ca_comm.custom_all_reduce(input_)
if out is not None:
return out
pynccl_comm = self.pynccl_comm pynccl_comm = self.pynccl_comm
if (pynccl_comm is not None and not pynccl_comm.disabled): if (pynccl_comm is not None and not pynccl_comm.disabled):
pynccl_comm.all_reduce(input_) pynccl_comm.all_reduce(input_)
...@@ -380,7 +373,6 @@ class GroupCoordinator: ...@@ -380,7 +373,6 @@ class GroupCoordinator:
ipex.distributed.all_reduce(input_, group=self.device_group) ipex.distributed.all_reduce(input_, group=self.device_group)
else: else:
torch.distributed.all_reduce(input_, group=self.device_group) torch.distributed.all_reduce(input_, group=self.device_group)
return input_
def all_gather(self, input_: torch.Tensor, dim: int = -1) -> torch.Tensor: def all_gather(self, input_: torch.Tensor, dim: int = -1) -> torch.Tensor:
world_size = self.world_size world_size = self.world_size
......
...@@ -2,8 +2,8 @@ import argparse ...@@ -2,8 +2,8 @@ import argparse
import dataclasses import dataclasses
import json import json
from dataclasses import dataclass from dataclasses import dataclass
from typing import (TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, from typing import (TYPE_CHECKING, Any, Dict, List, Literal, Mapping, Optional,
Type, Union) Tuple, Type, Union, cast)
import torch import torch
...@@ -89,7 +89,7 @@ class EngineArgs: ...@@ -89,7 +89,7 @@ class EngineArgs:
trust_remote_code: bool = False trust_remote_code: bool = False
download_dir: Optional[str] = None download_dir: Optional[str] = None
load_format: str = 'auto' load_format: str = 'auto'
config_format: str = 'auto' config_format: ConfigFormat = ConfigFormat.AUTO
dtype: str = 'auto' dtype: str = 'auto'
kv_cache_dtype: str = 'auto' kv_cache_dtype: str = 'auto'
quantization_param_path: Optional[str] = None quantization_param_path: Optional[str] = None
...@@ -107,7 +107,7 @@ class EngineArgs: ...@@ -107,7 +107,7 @@ class EngineArgs:
block_size: int = 16 block_size: int = 16
enable_prefix_caching: bool = False enable_prefix_caching: bool = False
disable_sliding_window: bool = False disable_sliding_window: bool = False
use_v2_block_manager: bool = False use_v2_block_manager: bool = True
swap_space: float = 4 # GiB swap_space: float = 4 # GiB
cpu_offload_gb: float = 0 # GiB cpu_offload_gb: float = 0 # GiB
gpu_memory_utilization: float = 0.90 gpu_memory_utilization: float = 0.90
...@@ -145,7 +145,7 @@ class EngineArgs: ...@@ -145,7 +145,7 @@ class EngineArgs:
max_cpu_loras: Optional[int] = None max_cpu_loras: Optional[int] = None
device: str = 'auto' device: str = 'auto'
num_scheduler_steps: int = 1 num_scheduler_steps: int = 1
multi_step_stream_outputs: bool = False multi_step_stream_outputs: bool = True
ray_workers_use_nsight: bool = False ray_workers_use_nsight: bool = False
num_gpu_blocks_override: Optional[int] = None num_gpu_blocks_override: Optional[int] = None
num_lookahead_slots: int = 0 num_lookahead_slots: int = 0
...@@ -162,6 +162,7 @@ class EngineArgs: ...@@ -162,6 +162,7 @@ class EngineArgs:
speculative_model_quantization: Optional[str] = None speculative_model_quantization: Optional[str] = None
speculative_draft_tensor_parallel_size: Optional[int] = None speculative_draft_tensor_parallel_size: Optional[int] = None
num_speculative_tokens: Optional[int] = None num_speculative_tokens: Optional[int] = None
speculative_disable_mqa_scorer: Optional[bool] = False
speculative_max_model_len: Optional[int] = None speculative_max_model_len: Optional[int] = None
speculative_disable_by_batch_size: Optional[int] = None speculative_disable_by_batch_size: Optional[int] = None
ngram_prompt_lookup_max: Optional[int] = None ngram_prompt_lookup_max: Optional[int] = None
...@@ -177,11 +178,16 @@ class EngineArgs: ...@@ -177,11 +178,16 @@ class EngineArgs:
disable_async_output_proc: bool = False disable_async_output_proc: bool = False
override_neuron_config: Optional[Dict[str, Any]] = None override_neuron_config: Optional[Dict[str, Any]] = None
mm_processor_kwargs: Optional[Dict[str, Any]] = None mm_processor_kwargs: Optional[Dict[str, Any]] = None
scheduling_policy: Literal["fcfs", "priority"] = "fcfs"
def __post_init__(self): def __post_init__(self):
if self.tokenizer is None: if not self.tokenizer:
self.tokenizer = self.model self.tokenizer = self.model
# Setup plugins
from vllm.plugins import load_general_plugins
load_general_plugins()
@staticmethod @staticmethod
def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
"""Shared CLI arguments for vLLM engine.""" """Shared CLI arguments for vLLM engine."""
...@@ -369,7 +375,11 @@ class EngineArgs: ...@@ -369,7 +375,11 @@ class EngineArgs:
'capping to sliding window size') 'capping to sliding window size')
parser.add_argument('--use-v2-block-manager', parser.add_argument('--use-v2-block-manager',
action='store_true', action='store_true',
help='Use BlockSpaceMangerV2.') help='[DEPRECATED] block manager v1 has been '
'removed and SelfAttnBlockSpaceManager (i.e. '
'block manager v2) is now the default. '
'Setting this flag to True or False'
' has no effect on vLLM behavior.')
parser.add_argument( parser.add_argument(
'--num-lookahead-slots', '--num-lookahead-slots',
type=int, type=int,
...@@ -445,11 +455,12 @@ class EngineArgs: ...@@ -445,11 +455,12 @@ class EngineArgs:
'None, we assume the model weights are not ' 'None, we assume the model weights are not '
'quantized and use `dtype` to determine the data ' 'quantized and use `dtype` to determine the data '
'type of the weights.') 'type of the weights.')
parser.add_argument('--rope-scaling', parser.add_argument(
default=None, '--rope-scaling',
type=json.loads, default=None,
help='RoPE scaling configuration in JSON format. ' type=json.loads,
'For example, {"type":"dynamic","factor":2.0}') help='RoPE scaling configuration in JSON format. '
'For example, {"rope_type":"dynamic","factor":2.0}')
parser.add_argument('--rope-theta', parser.add_argument('--rope-theta',
default=None, default=None,
type=float, type=float,
...@@ -598,13 +609,17 @@ class EngineArgs: ...@@ -598,13 +609,17 @@ class EngineArgs:
parser.add_argument( parser.add_argument(
'--multi-step-stream-outputs', '--multi-step-stream-outputs',
action='store_true', action=StoreBoolean,
help='If True, then multi-step will stream outputs for every step') default=EngineArgs.multi_step_stream_outputs,
nargs="?",
const="True",
help='If False, then multi-step will stream outputs at the end '
'of all steps')
parser.add_argument( parser.add_argument(
'--scheduler-delay-factor', '--scheduler-delay-factor',
type=float, type=float,
default=EngineArgs.scheduler_delay_factor, default=EngineArgs.scheduler_delay_factor,
help='Apply a delay (of delay factor multiplied by previous' help='Apply a delay (of delay factor multiplied by previous '
'prompt latency) before scheduling next prompt.') 'prompt latency) before scheduling next prompt.')
parser.add_argument( parser.add_argument(
'--enable-chunked-prefill', '--enable-chunked-prefill',
...@@ -627,7 +642,7 @@ class EngineArgs: ...@@ -627,7 +642,7 @@ class EngineArgs:
type=nullable_str, type=nullable_str,
choices=[*QUANTIZATION_METHODS, None], choices=[*QUANTIZATION_METHODS, None],
default=EngineArgs.speculative_model_quantization, default=EngineArgs.speculative_model_quantization,
help='Method used to quantize the weights of speculative model.' help='Method used to quantize the weights of speculative model. '
'If None, we first check the `quantization_config` ' 'If None, we first check the `quantization_config` '
'attribute in the model config file. If that is ' 'attribute in the model config file. If that is '
'None, we assume the model weights are not ' 'None, we assume the model weights are not '
...@@ -639,6 +654,12 @@ class EngineArgs: ...@@ -639,6 +654,12 @@ class EngineArgs:
default=EngineArgs.num_speculative_tokens, default=EngineArgs.num_speculative_tokens,
help='The number of speculative tokens to sample from ' help='The number of speculative tokens to sample from '
'the draft model in speculative decoding.') 'the draft model in speculative decoding.')
parser.add_argument(
'--speculative-disable-mqa-scorer',
action='store_true',
help=
'If set to True, the MQA scorer will be disabled in speculative '
' and fall back to batch expansion')
parser.add_argument( parser.add_argument(
'--speculative-draft-tensor-parallel-size', '--speculative-draft-tensor-parallel-size',
'-spec-draft-tp', '-spec-draft-tp',
...@@ -789,13 +810,20 @@ class EngineArgs: ...@@ -789,13 +810,20 @@ class EngineArgs:
"lower performance.") "lower performance.")
parser.add_argument( parser.add_argument(
'--override-neuron-config', '--override-neuron-config',
type=lambda configs: { type=json.loads,
str(key): value
for key, value in
(config.split(':') for config in configs.split(','))
},
default=None, default=None,
help="override or set neuron device configuration.") help="Override or set neuron device configuration. "
"e.g. {\"cast_logits_dtype\": \"bloat16\"}.'")
parser.add_argument(
'--scheduling-policy',
choices=['fcfs', 'priority'],
default="fcfs",
help='The scheduling policy to use. "fcfs" (first come first served'
', i.e. requests are handled in order of arrival; default) '
'or "priority" (requests are handled based on given '
'priority (lower value means earlier handling) and time of '
'arrival deciding any ties).')
return parser return parser
...@@ -810,7 +838,8 @@ class EngineArgs: ...@@ -810,7 +838,8 @@ class EngineArgs:
def create_model_config(self) -> ModelConfig: def create_model_config(self) -> ModelConfig:
return ModelConfig( return ModelConfig(
model=self.model, model=self.model,
tokenizer=self.tokenizer, # We know this is not None because we set it in __post_init__
tokenizer=cast(str, self.tokenizer),
tokenizer_mode=self.tokenizer_mode, tokenizer_mode=self.tokenizer_mode,
trust_remote_code=self.trust_remote_code, trust_remote_code=self.trust_remote_code,
dtype=self.dtype, dtype=self.dtype,
...@@ -881,11 +910,13 @@ class EngineArgs: ...@@ -881,11 +910,13 @@ class EngineArgs:
self.enable_prefix_caching = False self.enable_prefix_caching = False
cache_config = CacheConfig( cache_config = CacheConfig(
# neuron needs block_size = max_model_len
block_size=self.block_size if self.device != "neuron" else block_size=self.block_size if self.device != "neuron" else
self.max_model_len, # neuron needs block_size = max_model_len (self.max_model_len if self.max_model_len is not None else 0),
gpu_memory_utilization=self.gpu_memory_utilization, gpu_memory_utilization=self.gpu_memory_utilization,
swap_space=self.swap_space, swap_space=self.swap_space,
cache_dtype=self.kv_cache_dtype, cache_dtype=self.kv_cache_dtype,
is_attention_free=model_config.is_attention_free,
num_gpu_blocks_override=self.num_gpu_blocks_override, num_gpu_blocks_override=self.num_gpu_blocks_override,
sliding_window=model_config.get_sliding_window(), sliding_window=model_config.get_sliding_window(),
enable_prefix_caching=self.enable_prefix_caching, enable_prefix_caching=self.enable_prefix_caching,
...@@ -919,13 +950,9 @@ class EngineArgs: ...@@ -919,13 +950,9 @@ class EngineArgs:
use_sliding_window = (model_config.get_sliding_window() use_sliding_window = (model_config.get_sliding_window()
is not None) is not None)
use_spec_decode = self.speculative_model is not None use_spec_decode = self.speculative_model is not None
has_seqlen_agnostic_layers = (
model_config.contains_seqlen_agnostic_layers(
parallel_config))
if (is_gpu and not use_sliding_window and not use_spec_decode if (is_gpu and not use_sliding_window and not use_spec_decode
and not self.enable_lora and not self.enable_lora
and not self.enable_prompt_adapter and not self.enable_prompt_adapter):
and not has_seqlen_agnostic_layers):
self.enable_chunked_prefill = True self.enable_chunked_prefill = True
logger.warning( logger.warning(
"Chunked prefill is enabled by default for models with " "Chunked prefill is enabled by default for models with "
...@@ -943,12 +970,6 @@ class EngineArgs: ...@@ -943,12 +970,6 @@ class EngineArgs:
"in low performance due to small KV cache space. Consider " "in low performance due to small KV cache space. Consider "
"setting --max-model-len to a smaller value.", max_model_len) "setting --max-model-len to a smaller value.", max_model_len)
if self.num_scheduler_steps > 1 and not self.use_v2_block_manager:
self.use_v2_block_manager = True
logger.warning(
"Enabled BlockSpaceManagerV2 because it is "
"required for multi-step (--num-scheduler-steps > 1)")
speculative_config = SpeculativeConfig.maybe_create_spec_config( speculative_config = SpeculativeConfig.maybe_create_spec_config(
target_model_config=model_config, target_model_config=model_config,
target_parallel_config=parallel_config, target_parallel_config=parallel_config,
...@@ -959,11 +980,11 @@ class EngineArgs: ...@@ -959,11 +980,11 @@ class EngineArgs:
speculative_draft_tensor_parallel_size = \ speculative_draft_tensor_parallel_size = \
self.speculative_draft_tensor_parallel_size, self.speculative_draft_tensor_parallel_size,
num_speculative_tokens=self.num_speculative_tokens, num_speculative_tokens=self.num_speculative_tokens,
speculative_disable_mqa_scorer=self.speculative_disable_mqa_scorer,
speculative_disable_by_batch_size=self. speculative_disable_by_batch_size=self.
speculative_disable_by_batch_size, speculative_disable_by_batch_size,
speculative_max_model_len=self.speculative_max_model_len, speculative_max_model_len=self.speculative_max_model_len,
enable_chunked_prefill=self.enable_chunked_prefill, enable_chunked_prefill=self.enable_chunked_prefill,
use_v2_block_manager=self.use_v2_block_manager,
disable_log_stats=self.disable_log_stats, disable_log_stats=self.disable_log_stats,
ngram_prompt_lookup_max=self.ngram_prompt_lookup_max, ngram_prompt_lookup_max=self.ngram_prompt_lookup_max,
ngram_prompt_lookup_min=self.ngram_prompt_lookup_min, ngram_prompt_lookup_min=self.ngram_prompt_lookup_min,
...@@ -976,13 +997,15 @@ class EngineArgs: ...@@ -976,13 +997,15 @@ class EngineArgs:
disable_logprobs=self.disable_logprobs_during_spec_decoding, disable_logprobs=self.disable_logprobs_during_spec_decoding,
) )
# Reminder: Please update docs/source/serving/compatibility_matrix.rst
# If the feature combo become valid
if self.num_scheduler_steps > 1: if self.num_scheduler_steps > 1:
if speculative_config is not None: if speculative_config is not None:
raise ValueError("Speculative decoding is not supported with " raise ValueError("Speculative decoding is not supported with "
"multi-step (--num-scheduler-steps > 1)") "multi-step (--num-scheduler-steps > 1)")
if self.enable_chunked_prefill: if self.enable_chunked_prefill and self.pipeline_parallel_size > 1:
raise ValueError("Chunked prefill is not supported with " raise ValueError("Multi-Step Chunked-Prefill is not supported "
"multi-step (--num-scheduler-steps > 1)") "for pipeline-parallel-size > 1")
# make sure num_lookahead_slots is set the higher value depending on # make sure num_lookahead_slots is set the higher value depending on
# if we are using speculative decoding or multi-step # if we are using speculative decoding or multi-step
...@@ -992,11 +1015,20 @@ class EngineArgs: ...@@ -992,11 +1015,20 @@ class EngineArgs:
if speculative_config is None \ if speculative_config is None \
else speculative_config.num_lookahead_slots else speculative_config.num_lookahead_slots
if not self.use_v2_block_manager:
logger.warning(
"[DEPRECATED] Block manager v1 has been removed, "
"and setting --use-v2-block-manager to True or False has "
"no effect on vLLM behavior. Please remove "
"--use-v2-block-manager in your engine argument. "
"If your use case is not supported by "
"SelfAttnBlockSpaceManager (i.e. block manager v2),"
" please file an issue with detailed information.")
scheduler_config = SchedulerConfig( scheduler_config = SchedulerConfig(
max_num_batched_tokens=self.max_num_batched_tokens, max_num_batched_tokens=self.max_num_batched_tokens,
max_num_seqs=self.max_num_seqs, max_num_seqs=self.max_num_seqs,
max_model_len=model_config.max_model_len, max_model_len=model_config.max_model_len,
use_v2_block_manager=self.use_v2_block_manager,
num_lookahead_slots=num_lookahead_slots, num_lookahead_slots=num_lookahead_slots,
delay_factor=self.scheduler_delay_factor, delay_factor=self.scheduler_delay_factor,
enable_chunked_prefill=self.enable_chunked_prefill, enable_chunked_prefill=self.enable_chunked_prefill,
...@@ -1007,6 +1039,7 @@ class EngineArgs: ...@@ -1007,6 +1039,7 @@ class EngineArgs:
multi_step_stream_outputs=self.multi_step_stream_outputs, multi_step_stream_outputs=self.multi_step_stream_outputs,
send_delta_data=(envs.VLLM_USE_RAY_SPMD_WORKER send_delta_data=(envs.VLLM_USE_RAY_SPMD_WORKER
and parallel_config.use_ray), and parallel_config.use_ray),
policy=self.scheduling_policy,
) )
lora_config = LoRAConfig( lora_config = LoRAConfig(
max_lora_rank=self.max_lora_rank, max_lora_rank=self.max_lora_rank,
...@@ -1051,13 +1084,6 @@ class EngineArgs: ...@@ -1051,13 +1084,6 @@ class EngineArgs:
or "all" in detailed_trace_modules, or "all" in detailed_trace_modules,
) )
if (model_config.get_sliding_window() is not None
and scheduler_config.chunked_prefill_enabled
and not scheduler_config.use_v2_block_manager):
raise ValueError(
"Chunked prefill is not supported with sliding window. "
"Set --disable-sliding-window to disable sliding window.")
return EngineConfig( return EngineConfig(
model_config=model_config, model_config=model_config,
cache_config=cache_config, cache_config=cache_config,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment