Commit f48954a4 authored by zhuwenwen's avatar zhuwenwen
Browse files

merge v0.5.0

parents 1dba29d3 8f89d720
...@@ -247,7 +247,7 @@ class ROCmFlashAttentionImpl(AttentionImpl): ...@@ -247,7 +247,7 @@ class ROCmFlashAttentionImpl(AttentionImpl):
self.use_naive_attn = True self.use_naive_attn = True
if self.use_naive_attn: if self.use_naive_attn:
self.attn_func = _naive_attention self.attn_func = _sdpa_attention
logger.debug("Using naive attention in ROCmBackend") logger.debug("Using naive attention in ROCmBackend")
def repeat_kv(self, x: torch.Tensor, n_rep: int) -> torch.Tensor: def repeat_kv(self, x: torch.Tensor, n_rep: int) -> torch.Tensor:
...@@ -342,11 +342,18 @@ class ROCmFlashAttentionImpl(AttentionImpl): ...@@ -342,11 +342,18 @@ class ROCmFlashAttentionImpl(AttentionImpl):
# Interleave for MQA workaround. # Interleave for MQA workaround.
key = self.repeat_kv(key, self.num_queries_per_kv) key = self.repeat_kv(key, self.num_queries_per_kv)
value = self.repeat_kv(value, self.num_queries_per_kv) value = self.repeat_kv(value, self.num_queries_per_kv)
query = query.movedim(0, query.dim() - 2)
key = key.movedim(0, key.dim() - 2)
value = value.movedim(0, value.dim() - 2)
# sdpa math backend attention
out = self.attn_func( out = self.attn_func(
query, query,
key, key,
value, value,
prefill_meta.seq_lens, prefill_meta.seq_lens,
num_tokens,
self.num_heads,
self.head_size,
self.scale, self.scale,
) )
else: else:
...@@ -402,45 +409,34 @@ class ROCmFlashAttentionImpl(AttentionImpl): ...@@ -402,45 +409,34 @@ class ROCmFlashAttentionImpl(AttentionImpl):
return output.view(num_tokens, hidden_size) return output.view(num_tokens, hidden_size)
def _naive_attention( def _sdpa_attention(
query: torch.Tensor, query: torch.Tensor,
key: torch.Tensor, key: torch.Tensor,
value: torch.Tensor, value: torch.Tensor,
seq_lens: List[int], seq_lens: List[int],
num_tokens: int,
num_heads: int,
head_size: int,
scale: float, scale: float,
) -> torch.Tensor: ) -> torch.Tensor:
output = torch.empty_like(query)
start = 0 start = 0
for _, seq_len in enumerate(seq_lens): output = torch.empty((num_tokens, num_heads, head_size),
dtype=query.dtype,
device=query.device)
for seq_len in seq_lens:
end = start + seq_len end = start + seq_len
out = _naive_masked_attention( with torch.backends.cuda.sdp_kernel(enable_math=True,
query[start:end], enable_flash=False,
key[start:end], enable_mem_efficient=False):
value[start:end], sub_out = torch.nn.functional.scaled_dot_product_attention(
scale, query[:, start:end, :],
) key[:, start:end, :],
# TODO(woosuk): Unnecessary copy. Optimize. value[:, start:end, :],
output[start:end].copy_(out) dropout_p=0.0,
start += seq_len is_causal=True,
scale=scale).movedim(query.dim() - 2, 0)
output[start:end, :, :] = sub_out
start = end
return output return output
def _naive_masked_attention(
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
scale: float,
) -> torch.Tensor:
seq_len, head_size, head_dim = query.shape
attn_mask = torch.triu(torch.ones(seq_len,
seq_len,
dtype=query.dtype,
device=query.device),
diagonal=1)
attn_mask = attn_mask * torch.finfo(query.dtype).min
attn_weights = scale * torch.einsum("qhd,khd->hqk", query, key).float()
attn_weights = attn_weights + attn_mask.float()
attn_weights = torch.softmax(attn_weights, dim=-1).to(value.dtype)
out = torch.einsum("hqk,khd->qhd", attn_weights, value)
return out
...@@ -31,15 +31,14 @@ def get_attn_backend( ...@@ -31,15 +31,14 @@ def get_attn_backend(
block_size: int, block_size: int,
is_blocksparse: bool = False, is_blocksparse: bool = False,
) -> Type[AttentionBackend]: ) -> Type[AttentionBackend]:
"""Selects which attention backend to use and lazily imports it."""
if is_blocksparse: if is_blocksparse:
logger.info("Using BlocksparseFlashAttention backend.") logger.info("Using BlocksparseFlashAttention backend.")
from vllm.attention.backends.blocksparse_attn import ( from vllm.attention.backends.blocksparse_attn import (
BlocksparseFlashAttentionBackend) BlocksparseFlashAttentionBackend)
return BlocksparseFlashAttentionBackend return BlocksparseFlashAttentionBackend
"""Determine which attention backend to use and only import
the selected backend module.
"""
backend = which_attn_to_use(num_heads, head_size, num_kv_heads, backend = which_attn_to_use(num_heads, head_size, num_kv_heads,
sliding_window, dtype, kv_cache_dtype, sliding_window, dtype, kv_cache_dtype,
block_size) block_size)
......
import enum import enum
import json import json
from dataclasses import dataclass, field, fields from dataclasses import dataclass, field, fields
from typing import TYPE_CHECKING, ClassVar, List, Optional, Tuple, Union from typing import (TYPE_CHECKING, Any, ClassVar, Dict, List, Optional, Tuple,
Union)
import torch import torch
from transformers import PretrainedConfig from transformers import PretrainedConfig, PreTrainedTokenizerBase
from vllm.logger import init_logger from vllm.logger import init_logger
from vllm.model_executor.layers.quantization import QUANTIZATION_METHODS from vllm.model_executor.layers.quantization import QUANTIZATION_METHODS
...@@ -92,6 +93,7 @@ class ModelConfig: ...@@ -92,6 +93,7 @@ class ModelConfig:
revision: Optional[str] = None, revision: Optional[str] = None,
code_revision: Optional[str] = None, code_revision: Optional[str] = None,
rope_scaling: Optional[dict] = None, rope_scaling: Optional[dict] = None,
rope_theta: Optional[float] = None,
tokenizer_revision: Optional[str] = None, tokenizer_revision: Optional[str] = None,
max_model_len: Optional[int] = None, max_model_len: Optional[int] = None,
quantization: Optional[str] = None, quantization: Optional[str] = None,
...@@ -99,7 +101,7 @@ class ModelConfig: ...@@ -99,7 +101,7 @@ class ModelConfig:
enforce_eager: bool = False, enforce_eager: bool = False,
max_context_len_to_capture: Optional[int] = None, max_context_len_to_capture: Optional[int] = None,
max_seq_len_to_capture: Optional[int] = None, max_seq_len_to_capture: Optional[int] = None,
max_logprobs: int = 5, max_logprobs: int = 20,
disable_sliding_window: bool = False, disable_sliding_window: bool = False,
skip_tokenizer_init: bool = False, skip_tokenizer_init: bool = False,
served_model_name: Optional[Union[str, List[str]]] = None, served_model_name: Optional[Union[str, List[str]]] = None,
...@@ -112,7 +114,12 @@ class ModelConfig: ...@@ -112,7 +114,12 @@ class ModelConfig:
self.revision = revision self.revision = revision
self.code_revision = code_revision self.code_revision = code_revision
self.rope_scaling = rope_scaling self.rope_scaling = rope_scaling
self.tokenizer_revision = tokenizer_revision self.rope_theta = rope_theta
# The tokenizer version is consistent with the model version by default.
if tokenizer_revision is None:
self.tokenizer_revision = revision
else:
self.tokenizer_revision = tokenizer_revision
self.quantization = quantization self.quantization = quantization
self.quantization_param_path = quantization_param_path self.quantization_param_path = quantization_param_path
self.enforce_eager = enforce_eager self.enforce_eager = enforce_eager
...@@ -127,7 +134,7 @@ class ModelConfig: ...@@ -127,7 +134,7 @@ class ModelConfig:
self.skip_tokenizer_init = skip_tokenizer_init self.skip_tokenizer_init = skip_tokenizer_init
self.hf_config = get_config(self.model, trust_remote_code, revision, self.hf_config = get_config(self.model, trust_remote_code, revision,
code_revision, rope_scaling) code_revision, rope_scaling, rope_theta)
self.hf_text_config = get_hf_text_config(self.hf_config) self.hf_text_config = get_hf_text_config(self.hf_config)
self.dtype = _get_and_verify_dtype(self.hf_text_config, dtype) self.dtype = _get_and_verify_dtype(self.hf_text_config, dtype)
self.max_model_len = _get_and_verify_max_len( self.max_model_len = _get_and_verify_max_len(
...@@ -159,12 +166,8 @@ class ModelConfig: ...@@ -159,12 +166,8 @@ class ModelConfig:
def _parse_quant_hf_config(self): def _parse_quant_hf_config(self):
quant_cfg = getattr(self.hf_config, "quantization_config", None) quant_cfg = getattr(self.hf_config, "quantization_config", None)
if quant_cfg is None: if quant_cfg is None:
# SparseML uses a "compression_config" with a "quantization_config". # compress-tensors uses a "compression_config" key
compression_cfg = getattr(self.hf_config, "compression_config", quant_cfg = getattr(self.hf_config, "compression_config", None)
None)
if compression_cfg is not None:
quant_cfg = compression_cfg.get("quantization_config", None)
return quant_cfg return quant_cfg
def _verify_quantization(self) -> None: def _verify_quantization(self) -> None:
...@@ -241,6 +244,12 @@ class ModelConfig: ...@@ -241,6 +244,12 @@ class ModelConfig:
"must be divisible by pipeline parallel size " "must be divisible by pipeline parallel size "
f"({pipeline_parallel_size}).") f"({pipeline_parallel_size}).")
if self.quantization == "bitsandbytes" and (
parallel_config.tensor_parallel_size > 1
or parallel_config.pipeline_parallel_size > 1):
raise ValueError(
"BitAndBytes quantization with TP or PP is not supported yet.")
def get_hf_config_sliding_window(self) -> Optional[int]: def get_hf_config_sliding_window(self) -> Optional[int]:
"""Get the sliding window size, or None if disabled. """Get the sliding window size, or None if disabled.
""" """
...@@ -327,7 +336,7 @@ class ModelConfig: ...@@ -327,7 +336,7 @@ class ModelConfig:
def get_num_attention_heads(self, def get_num_attention_heads(self,
parallel_config: "ParallelConfig") -> int: parallel_config: "ParallelConfig") -> int:
return self.hf_text_config.num_attention_heads // \ return self.hf_text_config.num_attention_heads // \
parallel_config.tensor_parallel_size parallel_config.tensor_parallel_size
def get_num_layers(self, parallel_config: "ParallelConfig") -> int: def get_num_layers(self, parallel_config: "ParallelConfig") -> int:
total_num_hidden_layers = self.hf_text_config.num_hidden_layers total_num_hidden_layers = self.hf_text_config.num_hidden_layers
...@@ -487,6 +496,7 @@ class LoadFormat(str, enum.Enum): ...@@ -487,6 +496,7 @@ class LoadFormat(str, enum.Enum):
DUMMY = "dummy" DUMMY = "dummy"
TENSORIZER = "tensorizer" TENSORIZER = "tensorizer"
SHARDED_STATE = "sharded_state" SHARDED_STATE = "sharded_state"
BITSANDBYTES = "bitsandbytes"
@dataclass @dataclass
...@@ -593,9 +603,25 @@ class ParallelConfig: ...@@ -593,9 +603,25 @@ class ParallelConfig:
f"'{self.distributed_executor_backend}'.") f"'{self.distributed_executor_backend}'.")
if self.distributed_executor_backend is None and self.world_size > 1: if self.distributed_executor_backend is None and self.world_size > 1:
# We use multiprocessing by default if world_size fits on the
# current node and we aren't in a ray placement group.
from torch.cuda import device_count
from vllm.executor import ray_utils from vllm.executor import ray_utils
backend = "mp"
ray_found = ray_utils.ray is not None ray_found = ray_utils.ray is not None
self.distributed_executor_backend = "ray" if ray_found else "mp" if device_count() < self.world_size:
if not ray_found:
raise ValueError("Unable to load Ray which is "
"required for multi-node inference")
backend = "ray"
elif ray_found:
from ray.util import get_current_placement_group
if self.placement_group or get_current_placement_group():
backend = "ray"
self.distributed_executor_backend = backend
logger.info("Defaulting to use %s for distributed inference",
backend)
self._verify_args() self._verify_args()
...@@ -644,19 +670,24 @@ class SchedulerConfig: ...@@ -644,19 +670,24 @@ class SchedulerConfig:
enable_chunked_prefill: If True, prefill requests can be chunked based enable_chunked_prefill: If True, prefill requests can be chunked based
on the remaining max_num_batched_tokens. on the remaining max_num_batched_tokens.
embedding_mode: Whether the running model is for embedding. embedding_mode: Whether the running model is for embedding.
preemption_mode: Whether to perform preemption by swapping or
recomputation. If not specified, we determine the mode as follows:
We use recomputation by default since it incurs lower overhead than
swapping. However, when the sequence group has multiple sequences
(e.g., beam search), recomputation is not currently supported. In
such a case, we use swapping instead.
""" """
def __init__( def __init__(self,
self, max_num_batched_tokens: Optional[int],
max_num_batched_tokens: Optional[int], max_num_seqs: int,
max_num_seqs: int, max_model_len: int,
max_model_len: int, use_v2_block_manager: bool = False,
use_v2_block_manager: bool = False, num_lookahead_slots: int = 0,
num_lookahead_slots: int = 0, delay_factor: float = 0.0,
delay_factor: float = 0.0, enable_chunked_prefill: bool = False,
enable_chunked_prefill: bool = False, embedding_mode: Optional[bool] = False,
embedding_mode: Optional[bool] = False, preemption_mode: Optional[str] = None) -> None:
) -> None:
if max_num_batched_tokens is not None: if max_num_batched_tokens is not None:
self.max_num_batched_tokens = max_num_batched_tokens self.max_num_batched_tokens = max_num_batched_tokens
else: else:
...@@ -682,6 +713,7 @@ class SchedulerConfig: ...@@ -682,6 +713,7 @@ class SchedulerConfig:
self.delay_factor = delay_factor self.delay_factor = delay_factor
self.chunked_prefill_enabled = enable_chunked_prefill self.chunked_prefill_enabled = enable_chunked_prefill
self.embedding_mode = embedding_mode self.embedding_mode = embedding_mode
self.preemption_mode = preemption_mode
self._verify_args() self._verify_args()
...@@ -1087,10 +1119,12 @@ class VisionLanguageConfig: ...@@ -1087,10 +1119,12 @@ class VisionLanguageConfig:
# worst case scenario (biggest supported resolution). # worst case scenario (biggest supported resolution).
image_input_shape: tuple image_input_shape: tuple
image_feature_size: int image_feature_size: int
# The image processor to load from HuggingFace
image_processor: Optional[str]
image_processor_revision: Optional[str]
@classmethod @classmethod
def get_image_input_enum_type( def get_image_input_enum_type(cls, value: str) -> ImageInputType:
cls, value: str) -> "VisionLanguageConfig.ImageInputType":
"""Get the image input type from a string.""" """Get the image input type from a string."""
try: try:
return cls.ImageInputType[value.upper()] return cls.ImageInputType[value.upper()]
...@@ -1099,6 +1133,35 @@ class VisionLanguageConfig: ...@@ -1099,6 +1133,35 @@ class VisionLanguageConfig:
f"Expecting to choose from " f"Expecting to choose from "
f"{[x.name for x in cls.ImageInputType]}.") from e f"{[x.name for x in cls.ImageInputType]}.") from e
#TODO(ywang96): make this a cached property once we refactor the
# VisionLanguageConfig class.
def get_image_token_text(
        self, tokenizer: PreTrainedTokenizerBase) -> Tuple[str, str]:
    """Build the image placeholder text for a text prompt.

    Decodes ``self.image_token_id`` with ``tokenizer`` and returns a pair:
    the decoded token repeated ``self.image_feature_size`` times (the text
    to insert into the prompt), followed by the decoded token on its own.
    """
    placeholder = tokenizer.decode(self.image_token_id)
    repeated = placeholder * self.image_feature_size
    return repeated, placeholder
def as_cli_args_dict(self) -> Dict[str, Any]:
    """Flatten this vision-language config into plain argument values.

    Compatible with what the llm entrypoint expects.  Enum members are
    emitted as their lower-cased name, tuples as a comma-separated string,
    and every other field value is passed through unchanged.  A
    ``disable_image_processor`` entry is added; it is True when
    ``self.image_processor`` is None.
    """

    def _flatten(value: Any) -> Any:
        # Enums and tuples are turned into strings; the rest is kept as is.
        if isinstance(value, enum.Enum):
            return value.name.lower()
        if isinstance(value, tuple):
            return ",".join([str(item) for item in value])
        return value

    result: Dict[str, Any] = {
        f.name: _flatten(getattr(self, f.name))
        for f in fields(self)
    }
    result["disable_image_processor"] = self.image_processor is None
    return result
_STR_DTYPE_TO_TORCH_DTYPE = { _STR_DTYPE_TO_TORCH_DTYPE = {
"half": torch.float16, "half": torch.float16,
...@@ -1208,7 +1271,7 @@ def _get_and_verify_max_len( ...@@ -1208,7 +1271,7 @@ def _get_and_verify_max_len(
logger.warning( logger.warning(
"The model's config.json does not contain any of the following " "The model's config.json does not contain any of the following "
"keys to determine the original maximum length of the model: " "keys to determine the original maximum length of the model: "
"%d. Assuming the model's maximum length is %d.", possible_keys, "%s. Assuming the model's maximum length is %d.", possible_keys,
default_max_len) default_max_len)
derived_max_model_len = default_max_len derived_max_model_len = default_max_len
......
...@@ -283,6 +283,10 @@ class BlockTable: ...@@ -283,6 +283,10 @@ class BlockTable:
def _is_allocated(self) -> bool: def _is_allocated(self) -> bool:
return len(self._blocks) > 0 return len(self._blocks) > 0
@property
def blocks(self) -> Optional[List[Block]]:
    """The blocks currently held by this table (``self._blocks``)."""
    return self._blocks
@property @property
def _num_empty_slots(self) -> int: def _num_empty_slots(self) -> int:
assert self._is_allocated assert self._is_allocated
......
...@@ -140,7 +140,6 @@ class CopyOnWriteTracker: ...@@ -140,7 +140,6 @@ class CopyOnWriteTracker:
assert refcount != 0 assert refcount != 0
if refcount > 1: if refcount > 1:
src_block_id = block_id src_block_id = block_id
# Decrement refcount of the old block. # Decrement refcount of the old block.
self._allocator.free(block) self._allocator.free(block)
......
...@@ -90,11 +90,8 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator): ...@@ -90,11 +90,8 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
gpu_block_allocator=gpu_allocator, gpu_block_allocator=gpu_allocator,
) )
def __init__( def __init__(self, cpu_block_allocator: BlockAllocator,
self, gpu_block_allocator: BlockAllocator):
cpu_block_allocator: BlockAllocator,
gpu_block_allocator: BlockAllocator,
):
assert not ( assert not (
cpu_block_allocator.all_block_ids cpu_block_allocator.all_block_ids
& gpu_block_allocator.all_block_ids & gpu_block_allocator.all_block_ids
...@@ -105,6 +102,7 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator): ...@@ -105,6 +102,7 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
Device.GPU: gpu_block_allocator, Device.GPU: gpu_block_allocator,
} }
self._swap_mapping: Dict[int, int] = {}
self._null_block: Optional[Block] = None self._null_block: Optional[Block] = None
self._block_ids_to_allocator: Dict[int, BlockAllocator] = {} self._block_ids_to_allocator: Dict[int, BlockAllocator] = {}
...@@ -198,6 +196,68 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator): ...@@ -198,6 +196,68 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
def get_num_total_blocks(self, device: Device) -> int: def get_num_total_blocks(self, device: Device) -> int:
return self._allocators[device].get_num_total_blocks() return self._allocators[device].get_num_total_blocks()
def get_physical_block_id(self, device: Device, absolute_id: int) -> int:
    """Translate an absolute block id into a zero-offset id on ``device``.

    Args:
        device (Device): The device for which to query the relative
            block id.
        absolute_id (int): The absolute block id for the block in the
            whole allocator.

    Returns:
        int: The zero-offset block id on ``device``, as reported by that
            device's allocator.
    """
    device_allocator = self._allocators[device]
    return device_allocator.get_physical_block_id(absolute_id)
def swap(self, blocks: List[Block], source_device: Device,
         dest_device: Device) -> Dict[int, int]:
    """Swap ``blocks`` from ``source_device`` on to ``dest_device``.

    The blocks are swapped out of the source allocator and swapped in on
    the destination allocator.  The resulting id mapping is returned and
    also merged into the accumulated ``self._swap_mapping``.

    Args:
        blocks: List of blocks to be swapped.
        source_device (Device): Device to swap the blocks from.
        dest_device (Device): Device to swap the blocks to.

    Returns:
        Dict[int, int]: Mapping from each block's id before the swap to
            its id after the swap.
    """
    ids_before = [blk.block_id for blk in blocks]
    self._allocators[source_device].swap_out(blocks)
    self._allocators[dest_device].swap_in(blocks)
    ids_after = [blk.block_id for blk in blocks]

    # Pairs with a missing id on either side are left out of the mapping.
    current_swap_mapping: Dict[int, int] = {
        src: dest
        for src, dest in zip(ids_before, ids_after)
        if src is not None and dest is not None
    }
    self._swap_mapping.update(current_swap_mapping)
    return current_swap_mapping
def get_num_blocks_touched(self,
                           blocks: List[Block],
                           device: Device,
                           num_lookahead_slots: int = 0) -> int:
    """Count the blocks touched by swapping ``blocks`` in/out on ``device``.

    The computation is delegated to the allocator registered for
    ``device``.

    Args:
        blocks: List of blocks to be swapped.
        device (Device): Device to swap the blocks on.
        num_lookahead_slots (int): Number of lookahead slots used in
            speculative decoding, defaults to 0.

    Returns:
        int: The number of blocks that will be touched by swapping the
            given blocks in/out on ``device``.
    """
    device_allocator = self._allocators[device]
    return device_allocator.get_num_blocks_touched(blocks,
                                                   num_lookahead_slots)
def clear_copy_on_writes(self) -> List[Tuple[int, int]]: def clear_copy_on_writes(self) -> List[Tuple[int, int]]:
"""Clears the copy-on-write (CoW) state and returns the mapping of """Clears the copy-on-write (CoW) state and returns the mapping of
source to destination block IDs. source to destination block IDs.
...@@ -240,6 +300,18 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator): ...@@ -240,6 +300,18 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
def cow_block_if_not_appendable(self, block: Block) -> Optional[BlockId]: def cow_block_if_not_appendable(self, block: Block) -> Optional[BlockId]:
raise NotImplementedError raise NotImplementedError
def get_and_reset_swaps(self) -> List[Tuple[int, int]]:
    """Return the accumulated swap mapping and clear it.

    Called after every swapping operation for now, and after every
    schedule once BlockManagerV2 becomes the default.  Currently not
    useful.

    Returns:
        List[Tuple[int, int]]: The (source, destination) block id pairs
            recorded in ``self._swap_mapping`` before it was cleared.
    """
    pairs = list(self._swap_mapping.items())
    self._swap_mapping.clear()
    return pairs
class NullBlock(Block): class NullBlock(Block):
""" """
......
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import FrozenSet, List, Optional, Protocol, Tuple from typing import Dict, FrozenSet, List, Optional, Protocol, Tuple
from vllm.utils import Device from vllm.utils import Device
...@@ -116,6 +116,18 @@ class BlockAllocator(ABC): ...@@ -116,6 +116,18 @@ class BlockAllocator(ABC):
def get_num_free_blocks(self) -> int: def get_num_free_blocks(self) -> int:
pass pass
@abstractmethod
def get_physical_block_id(self, absolute_id: int) -> int:
    """Return the zero-offset block id for the given absolute block id."""


@abstractmethod
def swap_out(self, blocks: List[Block]) -> None:
    """Swap the given blocks out of this allocator."""


@abstractmethod
def swap_in(self, blocks: List[Block]) -> None:
    """Swap the given blocks in to this allocator."""
@property @property
@abstractmethod @abstractmethod
def all_block_ids(self) -> FrozenSet[int]: def all_block_ids(self) -> FrozenSet[int]:
...@@ -149,6 +161,12 @@ class BlockAllocator(ABC): ...@@ -149,6 +161,12 @@ class BlockAllocator(ABC):
"""NOTE: This should not be used besides Block""" """NOTE: This should not be used besides Block"""
pass pass
@abstractmethod
def get_num_blocks_touched(self,
                           blocks: List[Block],
                           num_lookahead_slots: int = 0) -> int:
    """Return the number of blocks touched by swapping ``blocks`` in/out.

    ``num_lookahead_slots`` defaults to 0.
    """
class NoFreeBlocksError(ValueError): class NoFreeBlocksError(ValueError):
pass pass
...@@ -204,6 +222,22 @@ class DeviceAwareBlockAllocator(ABC): ...@@ -204,6 +222,22 @@ class DeviceAwareBlockAllocator(ABC):
self, seq_block_ids: List[List[int]]) -> List[int]: self, seq_block_ids: List[List[int]]) -> List[int]:
pass pass
@abstractmethod
def get_num_blocks_touched(self,
                           blocks: List[Block],
                           device: Device,
                           num_lookahead_slots: int = 0) -> int:
    """Return the number of blocks touched by swapping ``blocks`` in/out
    on ``device``.
    """


@abstractmethod
def swap(self, blocks: List[Block], source_device: Device,
         dest_device: Device) -> Dict[int, int]:
    """Swap ``blocks`` from ``source_device`` on to ``dest_device`` and
    return the resulting block id mapping.
    """


@abstractmethod
def get_physical_block_id(self, device: Device, absolute_id: int) -> int:
    """Return the zero-offset block id on ``device`` for the given
    absolute block id.
    """
@abstractmethod @abstractmethod
def allocate_or_get_null_block(self) -> Block: def allocate_or_get_null_block(self) -> Block:
""" """
......
...@@ -3,6 +3,7 @@ from typing import FrozenSet, Iterable, List, Optional, Set, Tuple ...@@ -3,6 +3,7 @@ from typing import FrozenSet, Iterable, List, Optional, Set, Tuple
from vllm.core.block.common import (CopyOnWriteTracker, RefCounter, from vllm.core.block.common import (CopyOnWriteTracker, RefCounter,
get_all_blocks_recursively) get_all_blocks_recursively)
from vllm.core.block.interfaces import Block, BlockAllocator, BlockId, Device from vllm.core.block.interfaces import Block, BlockAllocator, BlockId, Device
from vllm.utils import cdiv
Refcount = int Refcount = int
...@@ -95,8 +96,6 @@ class NaiveBlockAllocator(BlockAllocator): ...@@ -95,8 +96,6 @@ class NaiveBlockAllocator(BlockAllocator):
def free(self, block: Block) -> None: def free(self, block: Block) -> None:
assert block.block_id is not None assert block.block_id is not None
self._free_block_id(block.block_id) self._free_block_id(block.block_id)
# Mark the block as having no allocation.
block.block_id = None block.block_id = None
def fork(self, last_block: Block) -> List[Block]: def fork(self, last_block: Block) -> List[Block]:
...@@ -153,6 +152,19 @@ class NaiveBlockAllocator(BlockAllocator): ...@@ -153,6 +152,19 @@ class NaiveBlockAllocator(BlockAllocator):
if refcount == 0: if refcount == 0:
self._free_block_indices.add(block_id) self._free_block_indices.add(block_id)
def get_physical_block_id(self, absolute_id: int) -> int:
    """Return the zero-offset block id for an absolute block id.

    The result is the position of ``absolute_id`` among this allocator's
    block ids in ascending order.

    Args:
        absolute_id (int): The absolute block id for the block in the
            whole allocator.

    Returns:
        int: The zero-offset block id on this allocator.

    Raises:
        ValueError: If ``absolute_id`` is not one of this allocator's
            block ids.
    """
    ordered_ids = sorted(self._all_block_indices)
    return ordered_ids.index(absolute_id)
@property @property
def refcounter(self): def refcounter(self):
return self._refcounter return self._refcounter
...@@ -213,6 +225,56 @@ class NaiveBlockAllocator(BlockAllocator): ...@@ -213,6 +225,56 @@ class NaiveBlockAllocator(BlockAllocator):
def promote_to_immutable_block(self, block: Block) -> BlockId: def promote_to_immutable_block(self, block: Block) -> BlockId:
raise NotImplementedError raise NotImplementedError
def get_num_blocks_touched(self,
                           blocks: List[Block],
                           num_lookahead_slots: int = 0) -> int:
    """Determine the number of blocks that will be touched by
    swapping in/out the given blocks from certain sequence
    group with the provided num_lookahead_slots.

    Args:
        blocks (List[Block]): The potential blocks to swap.
        num_lookahead_slots (int): number of lookahead slots (0 for swap
            out).

    Returns:
        int: the number of blocks that will be touched by
            swapping in/out the given blocks and num_lookahead_slots.
    """
    # NOTE: for naive block, we use set to eliminate common blocks among
    # seqs, also we compare the empty slots in the mutable blocks with
    # lookahead slots to get the number of unique new block that are
    # needed.
    old_block_set = set()
    new_block_count = 0
    for block in blocks:
        if not block.is_full and num_lookahead_slots != 0:
            # The partially filled block itself is always touched ...
            new_block_count += 1
            if num_lookahead_slots > block.num_empty_slots:
                # ... and lookahead slots that do not fit into its empty
                # slots spill over into additional blocks.
                new_block_count += cdiv(
                    num_lookahead_slots - block.num_empty_slots,
                    self._block_size)
        else:
            old_block_set.add(block.block_id)
    return new_block_count + len(old_block_set)
def swap_out(self, blocks: List[Block]) -> None:
    """Swap out the given blocks by freeing each of them, in order."""
    for blk in blocks:
        self.free(blk)
def swap_in(self, blocks: List[Block]) -> None:
    """Swap in the given blocks by re-allocating them on this allocator.

    A full block is re-created as an immutable block with its token ids;
    any other block is re-created as a mutable block and its token ids
    are appended.  Each given block then takes over the id of its new
    allocation.
    """
    for blk in blocks:
        if not blk.is_full:
            replacement = self.allocate_mutable(blk.prev_block)
            replacement.append_token_ids(blk.token_ids)
        else:
            replacement = self.allocate_immutable(blk.prev_block,
                                                  blk.token_ids)
        blk.block_id = replacement.block_id
class NaiveBlock(Block): class NaiveBlock(Block):
"""An implementation of the Block class that does not support prefix """An implementation of the Block class that does not support prefix
......
"""Token blocks.""" """Token blocks."""
from itertools import takewhile from itertools import takewhile
from os.path import commonprefix from os.path import commonprefix
from typing import Dict, FrozenSet, Iterable, List, Optional, Tuple from typing import Dict, FrozenSet, Iterable, List, Optional, Tuple
...@@ -8,6 +9,7 @@ from vllm.core.block.common import (CopyOnWriteTracker, ...@@ -8,6 +9,7 @@ from vllm.core.block.common import (CopyOnWriteTracker,
from vllm.core.block.interfaces import Block, BlockAllocator, BlockId, Device from vllm.core.block.interfaces import Block, BlockAllocator, BlockId, Device
from vllm.core.block.naive_block import NaiveBlock, NaiveBlockAllocator from vllm.core.block.naive_block import NaiveBlock, NaiveBlockAllocator
from vllm.core.evictor_v2 import EvictionPolicy, Evictor, make_evictor from vllm.core.evictor_v2 import EvictionPolicy, Evictor, make_evictor
from vllm.utils import cdiv
PrefixHash = int PrefixHash = int
...@@ -294,10 +296,29 @@ class PrefixCachingBlockAllocator(BlockAllocator): ...@@ -294,10 +296,29 @@ class PrefixCachingBlockAllocator(BlockAllocator):
def get_num_total_blocks(self) -> int: def get_num_total_blocks(self) -> int:
return self._hashless_allocator.get_num_total_blocks() return self._hashless_allocator.get_num_total_blocks()
def get_physical_block_id(self, absolute_id: int) -> int:
    """Return the zero-offset block id for an absolute block id.

    The result is the position of ``absolute_id`` among
    ``self.all_block_ids`` in ascending order.

    Args:
        absolute_id (int): The absolute block id for the block in the
            whole allocator.

    Returns:
        int: The zero-offset block id on this allocator.

    Raises:
        ValueError: If ``absolute_id`` is not in ``self.all_block_ids``.
    """
    ordered_ids = sorted(self.all_block_ids)
    return ordered_ids.index(absolute_id)
@property @property
def all_block_ids(self) -> FrozenSet[int]: def all_block_ids(self) -> FrozenSet[int]:
return self._hashless_allocator.all_block_ids return self._hashless_allocator.all_block_ids
def is_block_cached(self, block: Block) -> bool:
    """Return whether ``block``'s content hash is in ``self._cached_blocks``.

    The block must already have a content hash; this is asserted.
    """
    assert block.content_hash is not None
    return block.content_hash in self._cached_blocks
def promote_to_immutable_block(self, block: Block) -> BlockId: def promote_to_immutable_block(self, block: Block) -> BlockId:
"""Once a mutable block is full, it can be promoted to an immutable """Once a mutable block is full, it can be promoted to an immutable
block. This means that its content can be referenced by future blocks block. This means that its content can be referenced by future blocks
...@@ -411,6 +432,63 @@ class PrefixCachingBlockAllocator(BlockAllocator): ...@@ -411,6 +432,63 @@ class PrefixCachingBlockAllocator(BlockAllocator):
if ids != [] if ids != []
]) ])
def get_num_blocks_touched(self,
                           blocks: List[Block],
                           num_lookahead_slots: int = 0) -> int:
    """Determine the number of blocks that will be touched by
    swapping in/out the given blocks from certain sequence
    group with the provided num_lookahead_slots.

    Args:
        blocks (List[Block]): The potential blocks to swap.
        num_lookahead_slots (int): number of lookahead slots (0 for
            swap out).

    Returns:
        int: the number of blocks that will be touched by
            swapping in/out the given blocks and num_lookahead_slots.
    """
    num_touched_blocks = 0
    for block in blocks:
        if not block.is_full:
            # The partially filled block itself is always touched ...
            num_touched_blocks += 1
            if num_lookahead_slots > block.num_empty_slots:
                # ... and lookahead slots that do not fit into its empty
                # slots spill over into additional blocks.
                num_touched_blocks += cdiv(
                    num_lookahead_slots - block.num_empty_slots,
                    self._block_size)
        elif not self.is_block_cached(block):
            # A full block only counts when it is not already cached.
            num_touched_blocks += 1
    return num_touched_blocks
def swap_out(self, blocks: List[Block]) -> None:
    """Execute the swap-out action, which just frees the given blocks.

    Args:
        blocks: List of blocks to be swapped out.
    """
    for blk in blocks:
        self.free(blk)
def swap_in(self, blocks: List[Block]) -> None:
    """Execute the swap-in action.

    Each block is re-allocated on this allocator (immutable when it is
    full, otherwise mutable with its token ids appended) and its block id
    is changed to the id of the new allocation, which finishes the block
    table update.

    Args:
        blocks: List of blocks to be swapped in.
    """
    for blk in blocks:
        if not blk.is_full:
            replacement = self.allocate_mutable(blk.prev_block)
            replacement.append_token_ids(blk.token_ids)
        else:
            replacement = self.allocate_immutable(blk.prev_block,
                                                  blk.token_ids)
        blk.block_id = replacement.block_id
class PrefixCachingBlock(Block): class PrefixCachingBlock(Block):
"""A block implementation that supports prefix caching. """A block implementation that supports prefix caching.
......
...@@ -541,11 +541,7 @@ class BlockSpaceManagerV1(BlockSpaceManager): ...@@ -541,11 +541,7 @@ class BlockSpaceManagerV1(BlockSpaceManager):
return new_block_table return new_block_table
def swap_in(self, def swap_in(self, seq_group: SequenceGroup) -> List[Tuple[int, int]]:
seq_group: SequenceGroup,
num_lookahead_slots: int = 0) -> List[Tuple[int, int]]:
assert (num_lookahead_slots == 0
), "BlockSpaceManagerV1 does not support lookahead allocation"
request_id = seq_group.request_id request_id = seq_group.request_id
......
"""A block manager that manages token blocks.""" """A block manager that manages token blocks."""
from itertools import chain
from typing import Dict, List, Optional from typing import Dict, List, Optional
from typing import Sequence as GenericSequence from typing import Sequence as GenericSequence
from typing import Tuple from typing import Tuple
from vllm.core.block.block_table import BlockTable from vllm.core.block.block_table import BlockTable
from vllm.core.block.cpu_gpu_block_allocator import CpuGpuBlockAllocator from vllm.core.block.cpu_gpu_block_allocator import CpuGpuBlockAllocator
from vllm.core.block.interfaces import Block
from vllm.core.block.utils import check_no_caching_or_swa_for_blockmgr_encdec from vllm.core.block.utils import check_no_caching_or_swa_for_blockmgr_encdec
from vllm.core.interfaces import AllocStatus, BlockSpaceManager from vllm.core.interfaces import AllocStatus, BlockSpaceManager
from vllm.sequence import Sequence, SequenceGroup, SequenceStatus from vllm.sequence import Sequence, SequenceGroup, SequenceStatus
...@@ -217,7 +219,6 @@ class BlockSpaceManagerV2(BlockSpaceManager): ...@@ -217,7 +219,6 @@ class BlockSpaceManagerV2(BlockSpaceManager):
num_lookahead_slots=num_lookahead_slots, num_lookahead_slots=num_lookahead_slots,
num_computed_slots=seq.data.get_num_computed_tokens(), num_computed_slots=seq.data.get_num_computed_tokens(),
) )
# Return any new copy-on-writes. # Return any new copy-on-writes.
new_cows = self.block_allocator.clear_copy_on_writes() new_cows = self.block_allocator.clear_copy_on_writes()
return new_cows return new_cows
...@@ -297,20 +298,145 @@ class BlockSpaceManagerV2(BlockSpaceManager): ...@@ -297,20 +298,145 @@ class BlockSpaceManagerV2(BlockSpaceManager):
def can_swap_in(self, seq_group: SequenceGroup,
                num_lookahead_slots: int) -> AllocStatus:
    """Report whether the given sequence group can be swapped in.

    Delegates to ``_can_swap`` for ``Device.GPU`` and sequences in the
    ``SWAPPED`` state.

    Args:
        seq_group (SequenceGroup): The sequence group to swap in.
        num_lookahead_slots (int): Number of lookahead slots used in
            speculative decoding.

    Returns:
        AllocStatus: The AllocStatus for the given sequence group.
    """
    swap_in_status = self._can_swap(seq_group,
                                    Device.GPU,
                                    SequenceStatus.SWAPPED,
                                    num_lookahead_slots)
    return swap_in_status
def swap_in(self, seq_group: SequenceGroup) -> List[Tuple[int, int]]:
    """Swap the SWAPPED sequences of ``seq_group`` from CPU to GPU.

    Args:
        seq_group (SequenceGroup): The sequence group to swap in.

    Returns:
        List[Tuple[int, int]]: The mapping of swapping block from CPU
            to GPU, as (physical CPU block id, physical GPU block id).
    """
    blocks = self._get_blocks_for_swap(seq_group, SequenceStatus.SWAPPED)
    current_swap_mapping = self.block_allocator.swap(
        blocks=blocks,
        source_device=Device.CPU,
        dest_device=Device.GPU,
    )

    # Translate the allocator's id pairs into physical block ids. A dict is
    # used on purpose so that a repeated physical CPU id keeps one entry.
    physical_mapping = {}
    for cpu_block_id, gpu_block_id in current_swap_mapping.items():
        physical_cpu_id = self.block_allocator.get_physical_block_id(
            Device.CPU, cpu_block_id)
        physical_gpu_id = self.block_allocator.get_physical_block_id(
            Device.GPU, gpu_block_id)
        physical_mapping[physical_cpu_id] = physical_gpu_id

    # convert to list of tuples once here
    return list(physical_mapping.items())
def can_swap_out(self, seq_group: SequenceGroup) -> bool:
    """Report whether the given sequence group can be swapped out.

    Delegates to ``_can_swap`` for ``Device.CPU`` and sequences in the
    ``RUNNING`` state (no lookahead slots are requested).

    Args:
        seq_group (SequenceGroup): The sequence group to swap out.

    Returns:
        bool: Whether it's possible to swap out current sequence group.
    """
    swap_out_status = self._can_swap(seq_group, Device.CPU,
                                     SequenceStatus.RUNNING)
    # Only AllocStatus.OK counts as "can swap out".
    return bool(swap_out_status == AllocStatus.OK)
def swap_out(self, sequence_group: SequenceGroup) -> List[Tuple[int, int]]:
    """Swap the RUNNING sequences of ``sequence_group`` from GPU to CPU.

    Args:
        sequence_group (SequenceGroup): The sequence group to swap out.

    Returns:
        List[Tuple[int, int]]: The mapping of swapping block from
            GPU to CPU, as (physical GPU block id, physical CPU block id).
    """
    blocks = self._get_blocks_for_swap(sequence_group,
                                       SequenceStatus.RUNNING)
    current_swap_mapping = self.block_allocator.swap(
        blocks=blocks,
        source_device=Device.GPU,
        dest_device=Device.CPU,
    )

    # Translate the allocator's id pairs into physical block ids. A dict is
    # used on purpose so that a repeated physical GPU id keeps one entry.
    physical_mapping = {}
    for gpu_block_id, cpu_block_id in current_swap_mapping.items():
        physical_gpu_id = self.block_allocator.get_physical_block_id(
            Device.GPU, gpu_block_id)
        physical_cpu_id = self.block_allocator.get_physical_block_id(
            Device.CPU, cpu_block_id)
        physical_mapping[physical_gpu_id] = physical_cpu_id

    # convert to list of tuples once here
    return list(physical_mapping.items())
def get_num_free_gpu_blocks(self) -> int:
    """Return the block allocator's free-block count for ``Device.GPU``."""
    free_gpu_blocks = self.block_allocator.get_num_free_blocks(Device.GPU)
    return free_gpu_blocks
def get_num_free_cpu_blocks(self) -> int:
    """Return the block allocator's free-block count for ``Device.CPU``."""
    free_cpu_blocks = self.block_allocator.get_num_free_blocks(Device.CPU)
    return free_cpu_blocks
def _can_swap(self,
              seq_group: SequenceGroup,
              device: Device,
              status: SequenceStatus,
              num_lookahead_slots: int = 0) -> AllocStatus:
    """Returns the AllocStatus for swapping in/out the given seq_group
    on to the 'device'.

    Args:
        seq_group (SequenceGroup): The sequence group to swap.
        device (Device): device to swap the 'seq_group' on.
        status (SequenceStatus): The status of sequence which is needed
            for action. RUNNING for swap out and SWAPPED for swap in
        num_lookahead_slots (int): Number of lookahead slots used in
            speculative decoding, default to 0.

    Returns:
        AllocStatus: NEVER when the device has fewer blocks in total than
            the swap touches, OK when the free blocks left after the swap
            stay at or above the watermark, LATER otherwise.
    """
    blocks = self._get_blocks_for_swap(seq_group, status)
    num_blocks_touched = self.block_allocator.get_num_blocks_touched(
        blocks, device, num_lookahead_slots)

    # The watermark is only applied when the target device is the GPU.
    watermark_blocks = self.watermark_blocks if device == Device.GPU else 0

    total_blocks = self.block_allocator.get_num_total_blocks(device)
    if total_blocks < num_blocks_touched:
        return AllocStatus.NEVER

    free_after_swap = (self.block_allocator.get_num_free_blocks(device) -
                       num_blocks_touched)
    if free_after_swap >= watermark_blocks:
        return AllocStatus.OK
    return AllocStatus.LATER
def _get_blocks_for_swap(self, seq_group: SequenceGroup,
                         status: SequenceStatus) -> List[Block]:
    """Returns the list of blocks that are touched by the seq_group.

    Args:
        seq_group (SequenceGroup): The sequence group to swap.
        status (SequenceStatus): The status of sequence which is needed
            for action. RUNNING for swap out and SWAPPED for swap in

    Returns:
        The blocks of every matching sequence's block table, concatenated
        in sequence order. Block tables whose ``blocks`` is None are
        skipped.
    """
    blocks_by_seq: Dict[int, List[Block]] = {}
    for seq in seq_group.get_seqs(status=status):
        table = self.block_tables[seq.seq_id]
        if table.blocks is None:
            continue
        blocks_by_seq[seq.seq_id] = table.blocks
    return list(chain.from_iterable(blocks_by_seq.values()))
...@@ -46,8 +46,7 @@ class EmbeddingModelBlockSpaceManager(BlockSpaceManager): ...@@ -46,8 +46,7 @@ class EmbeddingModelBlockSpaceManager(BlockSpaceManager):
num_lookahead_slots: int) -> AllocStatus: num_lookahead_slots: int) -> AllocStatus:
return AllocStatus.OK return AllocStatus.OK
def swap_in(self, seq_group: SequenceGroup) -> List[Tuple[int, int]]:
    """Do nothing for ``seq_group`` and return None."""
    no_mapping = None
    return no_mapping  # type: ignore
def can_swap_out(self, seq_group: SequenceGroup) -> bool: def can_swap_out(self, seq_group: SequenceGroup) -> bool:
......
import enum import enum
from abc import ABC, abstractmethod, abstractproperty from abc import ABC, abstractmethod
from typing import OrderedDict from typing import OrderedDict
from vllm.block import PhysicalTokenBlock from vllm.block import PhysicalTokenBlock
...@@ -44,7 +44,8 @@ class Evictor(ABC): ...@@ -44,7 +44,8 @@ class Evictor(ABC):
""" """
pass pass
@property
@abstractmethod
def num_blocks(self) -> int:
    """Abstract read-only property; subclasses return a block count."""
......
import enum import enum
from abc import ABC, abstractmethod, abstractproperty from abc import ABC, abstractmethod
from typing import OrderedDict, Tuple from typing import OrderedDict, Tuple
...@@ -46,7 +46,8 @@ class Evictor(ABC): ...@@ -46,7 +46,8 @@ class Evictor(ABC):
"""Remove a given block id from the cache.""" """Remove a given block id from the cache."""
pass pass
@property
@abstractmethod
def num_blocks(self) -> int:
    """Abstract read-only property; subclasses return a block count."""
......
...@@ -73,8 +73,7 @@ class BlockSpaceManager(ABC): ...@@ -73,8 +73,7 @@ class BlockSpaceManager(ABC):
pass pass
@abstractmethod
def swap_in(self, seq_group: SequenceGroup) -> List[Tuple[int, int]]:
    """Abstract hook: swap in ``seq_group`` and return (int, int) pairs."""
@abstractmethod @abstractmethod
......
...@@ -297,6 +297,8 @@ class Scheduler: ...@@ -297,6 +297,8 @@ class Scheduler:
self.prev_prompt = False self.prev_prompt = False
# Latency of the last prompt step # Latency of the last prompt step
self.last_prompt_latency = 0.0 self.last_prompt_latency = 0.0
# preemption mode, RECOMPUTE or SWAP
self.user_specified_preemption_mode = scheduler_config.preemption_mode
# The following field is test-only. It is used to inject artificial # The following field is test-only. It is used to inject artificial
# preemption. # preemption.
...@@ -421,7 +423,9 @@ class Scheduler: ...@@ -421,7 +423,9 @@ class Scheduler:
num_running_seqs = seq_group.get_max_num_running_seqs() num_running_seqs = seq_group.get_max_num_running_seqs()
budget.subtract_num_seqs(seq_group.request_id, budget.subtract_num_seqs(seq_group.request_id,
num_running_seqs) num_running_seqs)
if curr_loras is not None and seq_group.lora_int_id > 0:
if (curr_loras is not None and seq_group.lora_int_id > 0
and seq_group.lora_int_id in curr_loras):
curr_loras.remove(seq_group.lora_int_id) curr_loras.remove(seq_group.lora_int_id)
if running_queue: if running_queue:
...@@ -522,7 +526,9 @@ class Scheduler: ...@@ -522,7 +526,9 @@ class Scheduler:
seq_group = swapped_queue[0] seq_group = swapped_queue[0]
# If the sequence group cannot be swapped in, stop. # If the sequence group cannot be swapped in, stop.
alloc_status = self.block_manager.can_swap_in(seq_group) is_prefill = seq_group.is_prefill()
alloc_status = self.block_manager.can_swap_in(
seq_group, self._get_num_lookahead_slots(is_prefill))
if alloc_status == AllocStatus.LATER: if alloc_status == AllocStatus.LATER:
break break
elif alloc_status == AllocStatus.NEVER: elif alloc_status == AllocStatus.NEVER:
...@@ -901,7 +907,8 @@ class Scheduler: ...@@ -901,7 +907,8 @@ class Scheduler:
blocks_to_swap_out=running_scheduled.blocks_to_swap_out, blocks_to_swap_out=running_scheduled.blocks_to_swap_out,
blocks_to_copy=running_scheduled.blocks_to_copy + blocks_to_copy=running_scheduled.blocks_to_copy +
swapped_in.blocks_to_copy, swapped_in.blocks_to_copy,
ignored_seq_groups=prefills.ignored_seq_groups, ignored_seq_groups=prefills.ignored_seq_groups +
swapped_in.infeasible_seq_groups,
num_lookahead_slots=running_scheduled.num_lookahead_slots, num_lookahead_slots=running_scheduled.num_lookahead_slots,
running_queue_size=len(self.running), running_queue_size=len(self.running),
preempted=(len(running_scheduled.preempted) + preempted=(len(running_scheduled.preempted) +
...@@ -1067,12 +1074,17 @@ class Scheduler: ...@@ -1067,12 +1074,17 @@ class Scheduler:
# over sequence groups with a single sequence. # over sequence groups with a single sequence.
# TODO(woosuk): Support recomputation for sequence groups with multiple # TODO(woosuk): Support recomputation for sequence groups with multiple
# sequences. This may require a more sophisticated CUDA kernel. # sequences. This may require a more sophisticated CUDA kernel.
if preemption_mode is None: if self.user_specified_preemption_mode is None:
if seq_group.get_max_num_running_seqs() == 1: if seq_group.get_max_num_running_seqs() == 1:
preemption_mode = PreemptionMode.RECOMPUTE preemption_mode = PreemptionMode.RECOMPUTE
else: else:
preemption_mode = PreemptionMode.SWAP preemption_mode = PreemptionMode.SWAP
elif self.user_specified_preemption_mode == "swap":
preemption_mode = PreemptionMode.SWAP
else:
preemption_mode = PreemptionMode.RECOMPUTE
if self.num_cumulative_preemption % 50 == 0: if self.num_cumulative_preemption % 50 == 0:
logger.warning( logger.warning(
"Sequence group %s is preempted by %s mode because there is " "Sequence group %s is preempted by %s mode because there is "
......
...@@ -6,16 +6,21 @@ import torch.distributed as dist ...@@ -6,16 +6,21 @@ import torch.distributed as dist
from torch.distributed import ProcessGroup from torch.distributed import ProcessGroup
import vllm.envs as envs import vllm.envs as envs
from vllm import _custom_ops as ops
from vllm.distributed.device_communicators.custom_all_reduce_utils import ( from vllm.distributed.device_communicators.custom_all_reduce_utils import (
gpu_p2p_access_check) gpu_p2p_access_check)
from vllm.distributed.parallel_state import ( from vllm.distributed.parallel_state import (
get_local_rank, get_tensor_model_parallel_cpu_group) get_local_rank, get_tensor_model_parallel_cpu_group, is_in_the_same_node)
from vllm.logger import init_logger from vllm.logger import init_logger
try: try:
import pynvml import pynvml
from vllm._C import custom_ar # Simulate ImportError if custom_ar ops are not supported.
if not ops.is_custom_op_supported("_C_custom_ar::meta_size"):
raise ImportError("custom_ar", __file__)
custom_ar = True
@contextmanager @contextmanager
def _nvml(): def _nvml():
...@@ -27,7 +32,7 @@ try: ...@@ -27,7 +32,7 @@ try:
except ImportError: except ImportError:
# For AMD GPUs # For AMD GPUs
custom_ar = None custom_ar = False
pynvml = None pynvml = None
@contextmanager @contextmanager
...@@ -97,7 +102,7 @@ class CustomAllreduce: ...@@ -97,7 +102,7 @@ class CustomAllreduce:
self._IS_CAPTURING = False self._IS_CAPTURING = False
self.disabled = True self.disabled = True
if custom_ar is None: if not custom_ar:
# disable because of missing custom allreduce library # disable because of missing custom allreduce library
# e.g. in a non-cuda environment # e.g. in a non-cuda environment
return return
...@@ -108,6 +113,13 @@ class CustomAllreduce: ...@@ -108,6 +113,13 @@ class CustomAllreduce:
assert dist.get_backend(group) != dist.Backend.NCCL, ( assert dist.get_backend(group) != dist.Backend.NCCL, (
"CustomAllreduce should be attached to a non-NCCL group.") "CustomAllreduce should be attached to a non-NCCL group.")
if not is_in_the_same_node(group):
# No need to initialize custom allreduce for multi-node case.
logger.warning(
"Custom allreduce is disabled because this process group"
" spans across nodes.")
return
rank = dist.get_rank(group=self.group) rank = dist.get_rank(group=self.group)
world_size = dist.get_world_size(group=self.group) world_size = dist.get_world_size(group=self.group)
if world_size == 1: if world_size == 1:
...@@ -175,7 +187,7 @@ class CustomAllreduce: ...@@ -175,7 +187,7 @@ class CustomAllreduce:
# meta data composes of two parts: meta data for synchronization # meta data composes of two parts: meta data for synchronization
# (256 bytes) and a temporary buffer for storing intermediate # (256 bytes) and a temporary buffer for storing intermediate
# allreduce results. # allreduce results.
self.meta = torch.zeros(custom_ar.meta_size() + max_size, self.meta = torch.zeros(ops.meta_size() + max_size,
dtype=torch.uint8, dtype=torch.uint8,
device=self.device) device=self.device)
# This is a pre-registered IPC buffer. In eager mode, input tensors # This is a pre-registered IPC buffer. In eager mode, input tensors
...@@ -196,9 +208,8 @@ class CustomAllreduce: ...@@ -196,9 +208,8 @@ class CustomAllreduce:
self.world_size = world_size self.world_size = world_size
handles, offsets = self._get_ipc_meta(self.meta) handles, offsets = self._get_ipc_meta(self.meta)
self.full_nvlink = full_nvlink self.full_nvlink = full_nvlink
self._ptr = custom_ar.init_custom_ar(self.meta, self.rank_data, self._ptr = ops.init_custom_ar(self.meta, self.rank_data, handles,
handles, offsets, rank, offsets, rank, self.full_nvlink)
self.full_nvlink)
self.register_buffer(self.buffer) self.register_buffer(self.buffer)
@contextmanager @contextmanager
...@@ -252,31 +263,31 @@ class CustomAllreduce: ...@@ -252,31 +263,31 @@ class CustomAllreduce:
def register_buffer(self, inp: torch.Tensor):
    """Register ``inp`` with the custom allreduce op.

    The IPC handles and offsets for ``inp`` come from ``_get_ipc_meta``
    and are passed to ``ops.register_buffer`` together with ``self._ptr``.
    """
    ipc_handles, ipc_offsets = self._get_ipc_meta(inp)
    ops.register_buffer(self._ptr, inp, ipc_handles, ipc_offsets)
def register_graph_buffers(self):
    """Register the graph buffers reported by the custom allreduce op.

    Fetches this rank's graph-buffer IPC metadata, gathers it via
    ``_gather_ipc_meta`` and passes the gathered handles and offsets to
    ``ops.register_graph_buffers``.
    """
    local_handle, local_offsets = ops.get_graph_buffer_ipc_meta(self._ptr)
    local_meta = (bytes(local_handle), local_offsets)
    all_handles, all_offsets = self._gather_ipc_meta(local_meta)
    logger.info("Registering %d cuda graph addresses", len(local_offsets))
    ops.register_graph_buffers(self._ptr, all_handles, all_offsets)
def should_custom_ar(self, inp: torch.Tensor):
    """Return the result of ``ops.should_custom_ar`` for ``inp``.

    The op is given this instance's ``max_size``, ``world_size`` and
    ``full_nvlink`` settings.
    """
    use_custom_ar = ops.should_custom_ar(
        inp,
        self.max_size,
        self.world_size,
        self.full_nvlink,
    )
    return use_custom_ar
# all reduce, assuming inp tensor is IPC registered with register_buffer, # all reduce, assuming inp tensor is IPC registered with register_buffer,
# or, in the context of cuda graphs, register_graph_buffers # or, in the context of cuda graphs, register_graph_buffers
def all_reduce_reg(self, inp: torch.Tensor, out: torch.Tensor = None):
    """All-reduce ``inp`` into ``out`` and return ``out``.

    Assumes ``inp`` is IPC registered with ``register_buffer`` or, in the
    context of cuda graphs, ``register_graph_buffers``. When ``out`` is
    None a new tensor shaped like ``inp`` is allocated for the result.
    """
    result = torch.empty_like(inp) if out is None else out
    ops.all_reduce_reg(self._ptr, inp, result)
    return result
# all reduce, assuming inp tensor is NOT IPC registered # all reduce, assuming inp tensor is NOT IPC registered
def all_reduce_unreg(self, inp: torch.Tensor, out: torch.Tensor = None):
    """All-reduce ``inp`` into ``out`` and return ``out``.

    Assumes ``inp`` is NOT IPC registered; ``self.buffer`` is handed to
    the op alongside it. When ``out`` is None a new tensor shaped like
    ``inp`` is allocated for the result.
    """
    result = torch.empty_like(inp) if out is None else out
    ops.all_reduce_unreg(self._ptr, inp, self.buffer, result)
    return result
def custom_all_reduce(self, input: torch.Tensor) -> Optional[torch.Tensor]: def custom_all_reduce(self, input: torch.Tensor) -> Optional[torch.Tensor]:
...@@ -304,7 +315,7 @@ class CustomAllreduce: ...@@ -304,7 +315,7 @@ class CustomAllreduce:
def close(self):
    """Dispose of the custom allreduce handle, if there is one.

    Does nothing when the instance is disabled or ``_ptr`` is falsy;
    otherwise disposes ``_ptr`` through ``ops.dispose`` and resets it to 0.
    """
    if self.disabled or not self._ptr:
        return
    ops.dispose(self._ptr)
    self._ptr = 0
def __del__(self): def __del__(self):
......
...@@ -166,7 +166,7 @@ def gpu_p2p_access_check(i: int, j: int) -> bool: ...@@ -166,7 +166,7 @@ def gpu_p2p_access_check(i: int, j: int) -> bool:
and (not os.path.exists(path))): and (not os.path.exists(path))):
# only the local master process (with local_rank == 0) can # only the local master process (with local_rank == 0) can
# enter this block to calculate the cache # enter this block to calculate the cache
logger.info("generating GPU P2P access cache for in %s", path) logger.info("generating GPU P2P access cache in %s", path)
cache = {} cache = {}
for _i in range(num_dev): for _i in range(num_dev):
for _j in range(num_dev): for _j in range(num_dev):
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
# https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/parallel_state.py # https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/parallel_state.py
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. # Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
"""Tensor and pipeline parallel groups.""" """Tensor and pipeline parallel groups."""
import contextlib
from multiprocessing import resource_tracker, shared_memory
from typing import List, Optional from typing import List, Optional
import torch import torch
...@@ -376,3 +378,68 @@ def destroy_model_parallel(): ...@@ -376,3 +378,68 @@ def destroy_model_parallel():
_PP_DEVICE_GROUP = None _PP_DEVICE_GROUP = None
global _PP_GLOBAL_RANKS global _PP_GLOBAL_RANKS
_PP_GLOBAL_RANKS = None _PP_GLOBAL_RANKS = None
def is_in_the_same_node(pg: ProcessGroup):
    """
    Collectively check whether all processes in ``pg`` are in the same node.

    This is a collective operation: every process of the group must call it.
    Group rank 0 creates a shared memory segment holding a magic message and
    broadcasts the segment name; every other rank tries to open that segment
    and compare its content. Processes that succeed are attached to the same
    memory system (shared access to shared memory).

    Returns True only if every rank of the group recorded success.
    """
    assert torch.distributed.get_backend(
        pg) != torch.distributed.Backend.NCCL, (
            "is_in_the_same_node should be tested with a non-NCCL group.")

    # rank and size local to the group
    group_rank = torch.distributed.get_rank(group=pg)
    group_size = torch.distributed.get_world_size(group=pg)

    # one int32 flag per group rank; each process only sets its own slot and
    # the slots are summed across the group at the end
    same_node_flags = torch.tensor([0] * group_size, dtype=torch.int32)

    # global ranks of the processes in the group
    global_ranks = torch.distributed.get_process_group_ranks(pg)

    magic_message = b"magic_message"
    shm = None

    try:
        with contextlib.suppress(OSError):
            if group_rank != 0:
                # receive the segment name, then try to open the segment
                received = [None]
                torch.distributed.broadcast_object_list(received,
                                                        src=global_ranks[0],
                                                        group=pg)
                segment_name = received[0]
                shm = shared_memory.SharedMemory(name=segment_name)
                if shm.buf[:len(magic_message)] == magic_message:
                    same_node_flags[group_rank] = 1
            else:
                # create the shared memory segment and publish its name
                shm = shared_memory.SharedMemory(create=True, size=128)
                shm.buf[:len(magic_message)] = magic_message
                torch.distributed.broadcast_object_list([shm.name],
                                                        src=global_ranks[0],
                                                        group=pg)
                same_node_flags[0] = 1
    except Exception as e:
        logger.error("Error ignored in is_in_the_same_node: %s", e)
    finally:
        if shm:
            shm.close()

    # every rank is done with the segment before it gets removed
    torch.distributed.barrier(group=pg)

    # clean up the shared memory segment
    with contextlib.suppress(OSError):
        if shm:
            if group_rank == 0:
                shm.unlink()
            else:
                # fix to https://stackoverflow.com/q/62748654/9191338
                resource_tracker.unregister(
                    shm._name, "shared_memory")  # type: ignore[attr-defined]

    torch.distributed.all_reduce(same_node_flags, group=pg)
    return same_node_flags.sum().item() == group_size
import argparse import argparse
import dataclasses import dataclasses
import json import json
import warnings
from dataclasses import dataclass from dataclasses import dataclass
from typing import List, Optional, Tuple, Union from typing import List, Optional, Tuple, Union
...@@ -47,11 +48,12 @@ class EngineArgs: ...@@ -47,11 +48,12 @@ class EngineArgs:
gpu_memory_utilization: float = 0.90 gpu_memory_utilization: float = 0.90
max_num_batched_tokens: Optional[int] = None max_num_batched_tokens: Optional[int] = None
max_num_seqs: int = 256 max_num_seqs: int = 256
max_logprobs: int = 5 # OpenAI default value max_logprobs: int = 20 # Default value for OpenAI Chat Completions API
disable_log_stats: bool = False disable_log_stats: bool = False
revision: Optional[str] = None revision: Optional[str] = None
code_revision: Optional[str] = None code_revision: Optional[str] = None
rope_scaling: Optional[dict] = None rope_scaling: Optional[dict] = None
rope_theta: Optional[float] = None
tokenizer_revision: Optional[str] = None tokenizer_revision: Optional[str] = None
quantization: Optional[str] = None quantization: Optional[str] = None
enforce_eager: bool = False enforce_eager: bool = False
...@@ -67,19 +69,24 @@ class EngineArgs: ...@@ -67,19 +69,24 @@ class EngineArgs:
fully_sharded_loras: bool = False fully_sharded_loras: bool = False
lora_extra_vocab_size: int = 256 lora_extra_vocab_size: int = 256
long_lora_scaling_factors: Optional[Tuple[float]] = None long_lora_scaling_factors: Optional[Tuple[float]] = None
lora_dtype = 'auto' lora_dtype: str = 'auto'
max_cpu_loras: Optional[int] = None max_cpu_loras: Optional[int] = None
device: str = 'auto' device: str = 'auto'
ray_workers_use_nsight: bool = False ray_workers_use_nsight: bool = False
num_gpu_blocks_override: Optional[int] = None num_gpu_blocks_override: Optional[int] = None
num_lookahead_slots: int = 0 num_lookahead_slots: int = 0
model_loader_extra_config: Optional[dict] = None model_loader_extra_config: Optional[dict] = None
preemption_mode: Optional[str] = None
# Related to Vision-language models such as llava # Related to Vision-language models such as llava
image_input_type: Optional[str] = None image_input_type: Optional[str] = None
image_token_id: Optional[int] = None image_token_id: Optional[int] = None
image_input_shape: Optional[str] = None image_input_shape: Optional[str] = None
image_feature_size: Optional[int] = None image_feature_size: Optional[int] = None
image_processor: Optional[str] = None
image_processor_revision: Optional[str] = None
disable_image_processor: bool = False
scheduler_delay_factor: float = 0.0 scheduler_delay_factor: float = 0.0
enable_chunked_prefill: bool = False enable_chunked_prefill: bool = False
...@@ -92,10 +99,59 @@ class EngineArgs: ...@@ -92,10 +99,59 @@ class EngineArgs:
ngram_prompt_lookup_max: Optional[int] = None ngram_prompt_lookup_max: Optional[int] = None
ngram_prompt_lookup_min: Optional[int] = None ngram_prompt_lookup_min: Optional[int] = None
qlora_adapter_name_or_path: Optional[str] = None
def __post_init__(self): def __post_init__(self):
if self.tokenizer is None: if self.tokenizer is None:
self.tokenizer = self.model self.tokenizer = self.model
@staticmethod
def add_cli_args_for_vlm(
        parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """Register the vision-language-model CLI options on ``parser``.

    Args:
        parser: The argument parser to extend.

    Returns:
        The same ``parser``, with the ``--image-*`` options and
        ``--disable-image-processor`` added.
    """
    image_input_type_names = [
        t.name.lower() for t in VisionLanguageConfig.ImageInputType
    ]
    parser.add_argument(
        '--image-input-type',
        type=nullable_str,
        default=None,
        choices=image_input_type_names,
        help='The image input type passed into vLLM.',
    )
    parser.add_argument(
        '--image-token-id',
        type=int,
        default=None,
        help='Input id for image token.',
    )
    parser.add_argument(
        '--image-input-shape',
        type=nullable_str,
        default=None,
        help='The biggest image input shape (worst for memory footprint) '
        'given an input type. Only used for vLLM\'s profile_run.',
    )
    parser.add_argument(
        '--image-feature-size',
        type=int,
        default=None,
        help='The image feature size along the context dimension.',
    )
    parser.add_argument(
        '--image-processor',
        type=str,
        default=EngineArgs.image_processor,
        help='Name or path of the huggingface image processor to use. '
        'If unspecified, model name or path will be used.',
    )
    parser.add_argument(
        '--image-processor-revision',
        type=str,
        default=None,
        help='Revision of the huggingface image processor version to use. '
        'It can be a branch name, a tag name, or a commit id. '
        'If unspecified, will use the default version.',
    )
    parser.add_argument(
        '--disable-image-processor',
        action='store_true',
        help='Disables the use of image processor, even if one is defined '
        'for the model on huggingface.',
    )
    return parser
@staticmethod @staticmethod
def add_cli_args( def add_cli_args(
parser: argparse.ArgumentParser) -> argparse.ArgumentParser: parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
...@@ -111,7 +167,8 @@ class EngineArgs: ...@@ -111,7 +167,8 @@ class EngineArgs:
'--tokenizer', '--tokenizer',
type=nullable_str, type=nullable_str,
default=EngineArgs.tokenizer, default=EngineArgs.tokenizer,
help='Name or path of the huggingface tokenizer to use.') help='Name or path of the huggingface tokenizer to use. '
'If unspecified, model name or path will be used.')
parser.add_argument( parser.add_argument(
'--skip-tokenizer-init', '--skip-tokenizer-init',
action='store_true', action='store_true',
...@@ -134,9 +191,9 @@ class EngineArgs: ...@@ -134,9 +191,9 @@ class EngineArgs:
'--tokenizer-revision', '--tokenizer-revision',
type=nullable_str, type=nullable_str,
default=None, default=None,
help='The specific tokenizer version to use. It can be a branch ' help='Revision of the huggingface tokenizer to use. '
'name, a tag name, or a commit id. If unspecified, will use ' 'It can be a branch name, a tag name, or a commit id. '
'the default version.') 'If unspecified, will use the default version.')
parser.add_argument( parser.add_argument(
'--tokenizer-mode', '--tokenizer-mode',
type=str, type=str,
...@@ -159,7 +216,8 @@ class EngineArgs: ...@@ -159,7 +216,8 @@ class EngineArgs:
type=str, type=str,
default=EngineArgs.load_format, default=EngineArgs.load_format,
choices=[ choices=[
'auto', 'pt', 'safetensors', 'npcache', 'dummy', 'tensorizer' 'auto', 'pt', 'safetensors', 'npcache', 'dummy', 'tensorizer',
'bitsandbytes'
], ],
help='The format of the model weights to load.\n\n' help='The format of the model weights to load.\n\n'
'* "auto" will try to load the weights in the safetensors format ' '* "auto" will try to load the weights in the safetensors format '
...@@ -173,7 +231,9 @@ class EngineArgs: ...@@ -173,7 +231,9 @@ class EngineArgs:
'which is mainly for profiling.\n' 'which is mainly for profiling.\n'
'* "tensorizer" will load the weights using tensorizer from ' '* "tensorizer" will load the weights using tensorizer from '
'CoreWeave. See the Tensorize vLLM Model script in the Examples' 'CoreWeave. See the Tensorize vLLM Model script in the Examples'
'section for more information.\n') 'section for more information.\n'
'* "bitsandbytes" will load the weights using bitsandbytes '
'quantization.\n')
parser.add_argument( parser.add_argument(
'--dtype', '--dtype',
type=str, type=str,
...@@ -341,6 +401,12 @@ class EngineArgs: ...@@ -341,6 +401,12 @@ class EngineArgs:
type=json.loads, type=json.loads,
help='RoPE scaling configuration in JSON format. ' help='RoPE scaling configuration in JSON format. '
'For example, {"type":"dynamic","factor":2.0}') 'For example, {"type":"dynamic","factor":2.0}')
parser.add_argument('--rope-theta',
default=None,
type=float,
help='RoPE theta. Use with `rope_scaling`. In '
'some cases, changing the RoPE theta improves the '
'performance of the scaled model.')
parser.add_argument('--enforce-eager', parser.add_argument('--enforce-eager',
action='store_true', action='store_true',
help='Always use eager-mode PyTorch. If False, ' help='Always use eager-mode PyTorch. If False, '
...@@ -440,31 +506,10 @@ class EngineArgs: ...@@ -440,31 +506,10 @@ class EngineArgs:
default=EngineArgs.device, default=EngineArgs.device,
choices=["auto", "cuda", "neuron", "cpu"], choices=["auto", "cuda", "neuron", "cpu"],
help='Device type for vLLM execution.') help='Device type for vLLM execution.')
# Related to Vision-language models such as llava # Related to Vision-language models such as llava
parser.add_argument( parser = EngineArgs.add_cli_args_for_vlm(parser)
'--image-input-type',
type=nullable_str,
default=None,
choices=[
t.name.lower() for t in VisionLanguageConfig.ImageInputType
],
help=('The image input type passed into vLLM. '
'Should be one of "pixel_values" or "image_features".'))
parser.add_argument('--image-token-id',
type=int,
default=None,
help=('Input id for image token.'))
parser.add_argument(
'--image-input-shape',
type=nullable_str,
default=None,
help=('The biggest image input shape (worst for memory footprint) '
'given an input type. Only used for vLLM\'s profile_run.'))
parser.add_argument(
'--image-feature-size',
type=int,
default=None,
help=('The image feature size along the context dimension.'))
parser.add_argument( parser.add_argument(
'--scheduler-delay-factor', '--scheduler-delay-factor',
type=float, type=float,
...@@ -483,7 +528,6 @@ class EngineArgs: ...@@ -483,7 +528,6 @@ class EngineArgs:
default=EngineArgs.speculative_model, default=EngineArgs.speculative_model,
help= help=
'The name of the draft model to be used in speculative decoding.') 'The name of the draft model to be used in speculative decoding.')
parser.add_argument( parser.add_argument(
'--num-speculative-tokens', '--num-speculative-tokens',
type=int, type=int,
...@@ -528,6 +572,13 @@ class EngineArgs: ...@@ -528,6 +572,13 @@ class EngineArgs:
'corresponding to the chosen load_format. ' 'corresponding to the chosen load_format. '
'This should be a JSON string that will be ' 'This should be a JSON string that will be '
'parsed into a dictionary.') 'parsed into a dictionary.')
# User-selected preemption mode. When left unset (None) the scheduler picks
# the mode itself; 'swap' selects swapping and any other value is treated as
# recompute.
parser.add_argument(
    '--preemption_mode',
    type=str,
    default=None,
    help='If \'recompute\', the engine performs preemption by '
    'recomputing; If \'swap\', the engine performs preemption by block '
    'swapping.')
parser.add_argument( parser.add_argument(
"--served-model-name", "--served-model-name",
...@@ -543,7 +594,10 @@ class EngineArgs: ...@@ -543,7 +594,10 @@ class EngineArgs:
"will also be used in `model_name` tag content of " "will also be used in `model_name` tag content of "
"prometheus metrics, if multiple names provided, metrics" "prometheus metrics, if multiple names provided, metrics"
"tag will take the first one.") "tag will take the first one.")
parser.add_argument('--qlora-adapter-name-or-path',
type=str,
default=None,
help='Name or path of the QLoRA adapter.')
return parser return parser
@classmethod @classmethod
...@@ -555,34 +609,66 @@ class EngineArgs: ...@@ -555,34 +609,66 @@ class EngineArgs:
return engine_args return engine_args
def create_engine_config(self, ) -> EngineConfig: def create_engine_config(self, ) -> EngineConfig:
device_config = DeviceConfig(self.device)
# bitsandbytes quantization needs a specific model loader
# so we make sure the quant method and the load format are consistent
if (self.quantization == "bitsandbytes" or
self.qlora_adapter_name_or_path is not None) and \
self.load_format != "bitsandbytes":
raise ValueError(
"BitsAndBytes quantization and QLoRA adapter only support "
f"'bitsandbytes' load format, but got {self.load_format}")
if (self.load_format == "bitsandbytes" or
self.qlora_adapter_name_or_path is not None) and \
self.quantization != "bitsandbytes":
raise ValueError(
"BitsAndBytes load format and QLoRA adapter only support "
f"'bitsandbytes' quantization, but got {self.quantization}")
device_config = DeviceConfig(device=self.device)
model_config = ModelConfig( model_config = ModelConfig(
self.model, self.tokenizer, self.tokenizer_mode, model=self.model,
self.trust_remote_code, self.dtype, self.seed, self.revision, tokenizer=self.tokenizer,
self.code_revision, self.rope_scaling, self.tokenizer_revision, tokenizer_mode=self.tokenizer_mode,
self.max_model_len, self.quantization, trust_remote_code=self.trust_remote_code,
self.quantization_param_path, self.enforce_eager, dtype=self.dtype,
self.max_context_len_to_capture, self.max_seq_len_to_capture, seed=self.seed,
self.max_logprobs, self.disable_sliding_window, revision=self.revision,
self.skip_tokenizer_init, self.served_model_name) code_revision=self.code_revision,
cache_config = CacheConfig(self.block_size, rope_scaling=self.rope_scaling,
self.gpu_memory_utilization, rope_theta=self.rope_theta,
self.swap_space, self.kv_cache_dtype, tokenizer_revision=self.tokenizer_revision,
self.num_gpu_blocks_override, max_model_len=self.max_model_len,
model_config.get_sliding_window(), quantization=self.quantization,
self.enable_prefix_caching) quantization_param_path=self.quantization_param_path,
enforce_eager=self.enforce_eager,
max_context_len_to_capture=self.max_context_len_to_capture,
max_seq_len_to_capture=self.max_seq_len_to_capture,
max_logprobs=self.max_logprobs,
disable_sliding_window=self.disable_sliding_window,
skip_tokenizer_init=self.skip_tokenizer_init,
served_model_name=self.served_model_name)
cache_config = CacheConfig(
block_size=self.block_size,
gpu_memory_utilization=self.gpu_memory_utilization,
swap_space=self.swap_space,
cache_dtype=self.kv_cache_dtype,
num_gpu_blocks_override=self.num_gpu_blocks_override,
sliding_window=model_config.get_sliding_window(),
enable_prefix_caching=self.enable_prefix_caching)
parallel_config = ParallelConfig( parallel_config = ParallelConfig(
self.pipeline_parallel_size, pipeline_parallel_size=self.pipeline_parallel_size,
self.tensor_parallel_size, tensor_parallel_size=self.tensor_parallel_size,
self.worker_use_ray, worker_use_ray=self.worker_use_ray,
self.max_parallel_loading_workers, max_parallel_loading_workers=self.max_parallel_loading_workers,
self.disable_custom_all_reduce, disable_custom_all_reduce=self.disable_custom_all_reduce,
TokenizerPoolConfig.create_config( tokenizer_pool_config=TokenizerPoolConfig.create_config(
self.tokenizer_pool_size, self.tokenizer_pool_size,
self.tokenizer_pool_type, self.tokenizer_pool_type,
self.tokenizer_pool_extra_config, self.tokenizer_pool_extra_config,
), ),
self.ray_workers_use_nsight, ray_workers_use_nsight=self.ray_workers_use_nsight,
distributed_executor_backend=self.distributed_executor_backend) distributed_executor_backend=self.distributed_executor_backend)
speculative_config = SpeculativeConfig.maybe_create_spec_config( speculative_config = SpeculativeConfig.maybe_create_spec_config(
...@@ -601,16 +687,17 @@ class EngineArgs: ...@@ -601,16 +687,17 @@ class EngineArgs:
) )
scheduler_config = SchedulerConfig( scheduler_config = SchedulerConfig(
self.max_num_batched_tokens, max_num_batched_tokens=self.max_num_batched_tokens,
self.max_num_seqs, max_num_seqs=self.max_num_seqs,
model_config.max_model_len, max_model_len=model_config.max_model_len,
self.use_v2_block_manager, use_v2_block_manager=self.use_v2_block_manager,
num_lookahead_slots=(self.num_lookahead_slots num_lookahead_slots=(self.num_lookahead_slots
if speculative_config is None else if speculative_config is None else
speculative_config.num_lookahead_slots), speculative_config.num_lookahead_slots),
delay_factor=self.scheduler_delay_factor, delay_factor=self.scheduler_delay_factor,
enable_chunked_prefill=self.enable_chunked_prefill, enable_chunked_prefill=self.enable_chunked_prefill,
embedding_mode=model_config.embedding_mode, embedding_mode=model_config.embedding_mode,
preemption_mode=self.preemption_mode,
) )
lora_config = LoRAConfig( lora_config = LoRAConfig(
max_lora_rank=self.max_lora_rank, max_lora_rank=self.max_lora_rank,
...@@ -622,6 +709,13 @@ class EngineArgs: ...@@ -622,6 +709,13 @@ class EngineArgs:
max_cpu_loras=self.max_cpu_loras if self.max_cpu_loras max_cpu_loras=self.max_cpu_loras if self.max_cpu_loras
and self.max_cpu_loras > 0 else None) if self.enable_lora else None and self.max_cpu_loras > 0 else None) if self.enable_lora else None
if self.qlora_adapter_name_or_path is not None and \
self.qlora_adapter_name_or_path != "":
if self.model_loader_extra_config is None:
self.model_loader_extra_config = {}
self.model_loader_extra_config[
"qlora_adapter_name_or_path"] = self.qlora_adapter_name_or_path
load_config = LoadConfig( load_config = LoadConfig(
load_format=self.load_format, load_format=self.load_format,
download_dir=self.download_dir, download_dir=self.download_dir,
...@@ -634,12 +728,27 @@ class EngineArgs: ...@@ -634,12 +728,27 @@ class EngineArgs:
raise ValueError( raise ValueError(
'Specify `image_token_id`, `image_input_shape` and ' 'Specify `image_token_id`, `image_input_shape` and '
'`image_feature_size` together with `image_input_type`.') '`image_feature_size` together with `image_input_type`.')
if self.image_processor is None:
self.image_processor = self.model
if self.disable_image_processor:
if self.image_processor != self.model:
warnings.warn(
"You've specified an image processor "
f"({self.image_processor}) but also disabled "
"it via `--disable-image-processor`.",
stacklevel=2)
self.image_processor = None
vision_language_config = VisionLanguageConfig( vision_language_config = VisionLanguageConfig(
image_input_type=VisionLanguageConfig. image_input_type=VisionLanguageConfig.
get_image_input_enum_type(self.image_input_type), get_image_input_enum_type(self.image_input_type),
image_token_id=self.image_token_id, image_token_id=self.image_token_id,
image_input_shape=str_to_int_tuple(self.image_input_shape), image_input_shape=str_to_int_tuple(self.image_input_shape),
image_feature_size=self.image_feature_size, image_feature_size=self.image_feature_size,
image_processor=self.image_processor,
image_processor_revision=self.image_processor_revision,
) )
else: else:
vision_language_config = None vision_language_config = None
...@@ -702,3 +811,7 @@ def _engine_args_parser(): ...@@ -702,3 +811,7 @@ def _engine_args_parser():
def _async_engine_args_parser(): def _async_engine_args_parser():
return AsyncEngineArgs.add_cli_args(argparse.ArgumentParser(), return AsyncEngineArgs.add_cli_args(argparse.ArgumentParser(),
async_args_only=True) async_args_only=True)
def _vlm_engine_args_parser():
    """Return the result of ``EngineArgs.add_cli_args_for_vlm`` applied to
    a freshly created ``argparse.ArgumentParser``."""
    vlm_parser = argparse.ArgumentParser()
    return EngineArgs.add_cli_args_for_vlm(vlm_parser)
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment