Commit e63cf68a authored by chenzk's avatar chenzk
Browse files

v1.0

parents
Pipeline #2842 canceled with stages
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
from .model import SAM
from .predict import Predictor, SAM2Predictor, SAM2VideoPredictor
__all__ = "SAM", "Predictor", "SAM2Predictor", "SAM2VideoPredictor" # tuple or list
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
import math
from itertools import product
from typing import Any, Generator, List, Tuple
import numpy as np
import torch
def is_box_near_crop_edge(
boxes: torch.Tensor, crop_box: List[int], orig_box: List[int], atol: float = 20.0
) -> torch.Tensor:
"""Determines if bounding boxes are near the edge of a cropped image region using a specified tolerance."""
crop_box_torch = torch.as_tensor(crop_box, dtype=torch.float, device=boxes.device)
orig_box_torch = torch.as_tensor(orig_box, dtype=torch.float, device=boxes.device)
boxes = uncrop_boxes_xyxy(boxes, crop_box).float()
near_crop_edge = torch.isclose(boxes, crop_box_torch[None, :], atol=atol, rtol=0)
near_image_edge = torch.isclose(boxes, orig_box_torch[None, :], atol=atol, rtol=0)
near_crop_edge = torch.logical_and(near_crop_edge, ~near_image_edge)
return torch.any(near_crop_edge, dim=1)
def batch_iterator(batch_size: int, *args) -> Generator[List[Any], None, None]:
"""Yields batches of data from input arguments with specified batch size for efficient processing."""
assert args and all(len(a) == len(args[0]) for a in args), "Batched iteration must have same-size inputs."
n_batches = len(args[0]) // batch_size + int(len(args[0]) % batch_size != 0)
for b in range(n_batches):
yield [arg[b * batch_size : (b + 1) * batch_size] for arg in args]
def calculate_stability_score(masks: torch.Tensor, mask_threshold: float, threshold_offset: float) -> torch.Tensor:
"""
Computes the stability score for a batch of masks.
The stability score is the IoU between binary masks obtained by thresholding the predicted mask logits at
high and low values.
Args:
masks (torch.Tensor): Batch of predicted mask logits.
mask_threshold (float): Threshold value for creating binary masks.
threshold_offset (float): Offset applied to the threshold for creating high and low binary masks.
Returns:
(torch.Tensor): Stability scores for each mask in the batch.
Notes:
- One mask is always contained inside the other.
- Memory is saved by preventing unnecessary cast to torch.int64.
Examples:
>>> masks = torch.rand(10, 256, 256) # Batch of 10 masks
>>> mask_threshold = 0.5
>>> threshold_offset = 0.1
>>> stability_scores = calculate_stability_score(masks, mask_threshold, threshold_offset)
"""
intersections = (masks > (mask_threshold + threshold_offset)).sum(-1, dtype=torch.int16).sum(-1, dtype=torch.int32)
unions = (masks > (mask_threshold - threshold_offset)).sum(-1, dtype=torch.int16).sum(-1, dtype=torch.int32)
return intersections / unions
def build_point_grid(n_per_side: int) -> np.ndarray:
"""Generate a 2D grid of evenly spaced points in the range [0,1]x[0,1] for image segmentation tasks."""
offset = 1 / (2 * n_per_side)
points_one_side = np.linspace(offset, 1 - offset, n_per_side)
points_x = np.tile(points_one_side[None, :], (n_per_side, 1))
points_y = np.tile(points_one_side[:, None], (1, n_per_side))
return np.stack([points_x, points_y], axis=-1).reshape(-1, 2)
def build_all_layer_point_grids(n_per_side: int, n_layers: int, scale_per_layer: int) -> List[np.ndarray]:
"""Generates point grids for multiple crop layers with varying scales and densities."""
return [build_point_grid(int(n_per_side / (scale_per_layer**i))) for i in range(n_layers + 1)]
def generate_crop_boxes(
im_size: Tuple[int, ...], n_layers: int, overlap_ratio: float
) -> Tuple[List[List[int]], List[int]]:
"""Generates crop boxes of varying sizes for multiscale image processing, with layered overlapping regions."""
crop_boxes, layer_idxs = [], []
im_h, im_w = im_size
short_side = min(im_h, im_w)
# Original image
crop_boxes.append([0, 0, im_w, im_h])
layer_idxs.append(0)
def crop_len(orig_len, n_crops, overlap):
"""Crops bounding boxes to the size of the input image."""
return int(math.ceil((overlap * (n_crops - 1) + orig_len) / n_crops))
for i_layer in range(n_layers):
n_crops_per_side = 2 ** (i_layer + 1)
overlap = int(overlap_ratio * short_side * (2 / n_crops_per_side))
crop_w = crop_len(im_w, n_crops_per_side, overlap)
crop_h = crop_len(im_h, n_crops_per_side, overlap)
crop_box_x0 = [int((crop_w - overlap) * i) for i in range(n_crops_per_side)]
crop_box_y0 = [int((crop_h - overlap) * i) for i in range(n_crops_per_side)]
# Crops in XYWH format
for x0, y0 in product(crop_box_x0, crop_box_y0):
box = [x0, y0, min(x0 + crop_w, im_w), min(y0 + crop_h, im_h)]
crop_boxes.append(box)
layer_idxs.append(i_layer + 1)
return crop_boxes, layer_idxs
def uncrop_boxes_xyxy(boxes: torch.Tensor, crop_box: List[int]) -> torch.Tensor:
"""Uncrop bounding boxes by adding the crop box offset to their coordinates."""
x0, y0, _, _ = crop_box
offset = torch.tensor([[x0, y0, x0, y0]], device=boxes.device)
# Check if boxes has a channel dimension
if len(boxes.shape) == 3:
offset = offset.unsqueeze(1)
return boxes + offset
def uncrop_points(points: torch.Tensor, crop_box: List[int]) -> torch.Tensor:
"""Uncrop points by adding the crop box offset to their coordinates."""
x0, y0, _, _ = crop_box
offset = torch.tensor([[x0, y0]], device=points.device)
# Check if points has a channel dimension
if len(points.shape) == 3:
offset = offset.unsqueeze(1)
return points + offset
def uncrop_masks(masks: torch.Tensor, crop_box: List[int], orig_h: int, orig_w: int) -> torch.Tensor:
"""Uncrop masks by padding them to the original image size, handling coordinate transformations."""
x0, y0, x1, y1 = crop_box
if x0 == 0 and y0 == 0 and x1 == orig_w and y1 == orig_h:
return masks
# Coordinate transform masks
pad_x, pad_y = orig_w - (x1 - x0), orig_h - (y1 - y0)
pad = (x0, pad_x - x0, y0, pad_y - y0)
return torch.nn.functional.pad(masks, pad, value=0)
def remove_small_regions(mask: np.ndarray, area_thresh: float, mode: str) -> Tuple[np.ndarray, bool]:
"""Removes small disconnected regions or holes in a mask based on area threshold and mode."""
import cv2 # type: ignore
assert mode in {"holes", "islands"}, f"Provided mode {mode} is invalid"
correct_holes = mode == "holes"
working_mask = (correct_holes ^ mask).astype(np.uint8)
n_labels, regions, stats, _ = cv2.connectedComponentsWithStats(working_mask, 8)
sizes = stats[:, -1][1:] # Row 0 is background label
small_regions = [i + 1 for i, s in enumerate(sizes) if s < area_thresh]
if not small_regions:
return mask, False
fill_labels = [0] + small_regions
if not correct_holes:
# If every region is below threshold, keep largest
fill_labels = [i for i in range(n_labels) if i not in fill_labels] or [int(np.argmax(sizes)) + 1]
mask = np.isin(regions, fill_labels)
return mask, True
def batched_mask_to_box(masks: torch.Tensor) -> torch.Tensor:
"""Calculates bounding boxes in XYXY format around binary masks, handling empty masks and various input shapes."""
# torch.max below raises an error on empty inputs, just skip in this case
if torch.numel(masks) == 0:
return torch.zeros(*masks.shape[:-2], 4, device=masks.device)
# Normalize shape to CxHxW
shape = masks.shape
h, w = shape[-2:]
masks = masks.flatten(0, -3) if len(shape) > 2 else masks.unsqueeze(0)
# Get top and bottom edges
in_height, _ = torch.max(masks, dim=-1)
in_height_coords = in_height * torch.arange(h, device=in_height.device)[None, :]
bottom_edges, _ = torch.max(in_height_coords, dim=-1)
in_height_coords = in_height_coords + h * (~in_height)
top_edges, _ = torch.min(in_height_coords, dim=-1)
# Get left and right edges
in_width, _ = torch.max(masks, dim=-2)
in_width_coords = in_width * torch.arange(w, device=in_width.device)[None, :]
right_edges, _ = torch.max(in_width_coords, dim=-1)
in_width_coords = in_width_coords + w * (~in_width)
left_edges, _ = torch.min(in_width_coords, dim=-1)
# If the mask is empty the right edge will be to the left of the left edge.
# Replace these boxes with [0, 0, 0, 0]
empty_filter = (right_edges < left_edges) | (bottom_edges < top_edges)
out = torch.stack([left_edges, top_edges, right_edges, bottom_edges], dim=-1)
out = out * (~empty_filter).unsqueeze(-1)
# Return to original shape
return out.reshape(*shape[:-2], 4) if len(shape) > 2 else out[0]
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
from functools import partial
import torch
from ultralytics.utils.downloads import attempt_download_asset
from .modules.decoders import MaskDecoder
from .modules.encoders import FpnNeck, Hiera, ImageEncoder, ImageEncoderViT, MemoryEncoder, PromptEncoder
from .modules.memory_attention import MemoryAttention, MemoryAttentionLayer
from .modules.sam import SAM2Model, SAMModel
from .modules.tiny_encoder import TinyViT
from .modules.transformer import TwoWayTransformer
def build_sam_vit_h(checkpoint=None):
"""Builds and returns a Segment Anything Model (SAM) h-size model with specified encoder parameters."""
return _build_sam(
encoder_embed_dim=1280,
encoder_depth=32,
encoder_num_heads=16,
encoder_global_attn_indexes=[7, 15, 23, 31],
checkpoint=checkpoint,
)
def build_sam_vit_l(checkpoint=None):
"""Builds and returns a Segment Anything Model (SAM) l-size model with specified encoder parameters."""
return _build_sam(
encoder_embed_dim=1024,
encoder_depth=24,
encoder_num_heads=16,
encoder_global_attn_indexes=[5, 11, 17, 23],
checkpoint=checkpoint,
)
def build_sam_vit_b(checkpoint=None):
"""Constructs and returns a Segment Anything Model (SAM) with b-size architecture and optional checkpoint."""
return _build_sam(
encoder_embed_dim=768,
encoder_depth=12,
encoder_num_heads=12,
encoder_global_attn_indexes=[2, 5, 8, 11],
checkpoint=checkpoint,
)
def build_mobile_sam(checkpoint=None):
"""Builds and returns a Mobile Segment Anything Model (Mobile-SAM) for efficient image segmentation."""
return _build_sam(
encoder_embed_dim=[64, 128, 160, 320],
encoder_depth=[2, 2, 6, 2],
encoder_num_heads=[2, 4, 5, 10],
encoder_global_attn_indexes=None,
mobile_sam=True,
checkpoint=checkpoint,
)
def build_sam2_t(checkpoint=None):
"""Builds and returns a Segment Anything Model 2 (SAM2) tiny-size model with specified architecture parameters."""
return _build_sam2(
encoder_embed_dim=96,
encoder_stages=[1, 2, 7, 2],
encoder_num_heads=1,
encoder_global_att_blocks=[5, 7, 9],
encoder_window_spec=[8, 4, 14, 7],
encoder_backbone_channel_list=[768, 384, 192, 96],
checkpoint=checkpoint,
)
def build_sam2_s(checkpoint=None):
"""Builds and returns a small-size Segment Anything Model (SAM2) with specified architecture parameters."""
return _build_sam2(
encoder_embed_dim=96,
encoder_stages=[1, 2, 11, 2],
encoder_num_heads=1,
encoder_global_att_blocks=[7, 10, 13],
encoder_window_spec=[8, 4, 14, 7],
encoder_backbone_channel_list=[768, 384, 192, 96],
checkpoint=checkpoint,
)
def build_sam2_b(checkpoint=None):
"""Builds and returns a SAM2 base-size model with specified architecture parameters."""
return _build_sam2(
encoder_embed_dim=112,
encoder_stages=[2, 3, 16, 3],
encoder_num_heads=2,
encoder_global_att_blocks=[12, 16, 20],
encoder_window_spec=[8, 4, 14, 7],
encoder_window_spatial_size=[14, 14],
encoder_backbone_channel_list=[896, 448, 224, 112],
checkpoint=checkpoint,
)
def build_sam2_l(checkpoint=None):
"""Builds and returns a large-size Segment Anything Model (SAM2) with specified architecture parameters."""
return _build_sam2(
encoder_embed_dim=144,
encoder_stages=[2, 6, 36, 4],
encoder_num_heads=2,
encoder_global_att_blocks=[23, 33, 43],
encoder_window_spec=[8, 4, 16, 8],
encoder_backbone_channel_list=[1152, 576, 288, 144],
checkpoint=checkpoint,
)
def _build_sam(
encoder_embed_dim,
encoder_depth,
encoder_num_heads,
encoder_global_attn_indexes,
checkpoint=None,
mobile_sam=False,
):
"""
Builds a Segment Anything Model (SAM) with specified encoder parameters.
Args:
encoder_embed_dim (int | List[int]): Embedding dimension for the encoder.
encoder_depth (int | List[int]): Depth of the encoder.
encoder_num_heads (int | List[int]): Number of attention heads in the encoder.
encoder_global_attn_indexes (List[int] | None): Indexes for global attention in the encoder.
checkpoint (str | None): Path to the model checkpoint file.
mobile_sam (bool): Whether to build a Mobile-SAM model.
Returns:
(SAMModel): A Segment Anything Model instance with the specified architecture.
Examples:
>>> sam = _build_sam(768, 12, 12, [2, 5, 8, 11])
>>> sam = _build_sam([64, 128, 160, 320], [2, 2, 6, 2], [2, 4, 5, 10], None, mobile_sam=True)
"""
prompt_embed_dim = 256
image_size = 1024
vit_patch_size = 16
image_embedding_size = image_size // vit_patch_size
image_encoder = (
TinyViT(
img_size=1024,
in_chans=3,
num_classes=1000,
embed_dims=encoder_embed_dim,
depths=encoder_depth,
num_heads=encoder_num_heads,
window_sizes=[7, 7, 14, 7],
mlp_ratio=4.0,
drop_rate=0.0,
drop_path_rate=0.0,
use_checkpoint=False,
mbconv_expand_ratio=4.0,
local_conv_size=3,
layer_lr_decay=0.8,
)
if mobile_sam
else ImageEncoderViT(
depth=encoder_depth,
embed_dim=encoder_embed_dim,
img_size=image_size,
mlp_ratio=4,
norm_layer=partial(torch.nn.LayerNorm, eps=1e-6),
num_heads=encoder_num_heads,
patch_size=vit_patch_size,
qkv_bias=True,
use_rel_pos=True,
global_attn_indexes=encoder_global_attn_indexes,
window_size=14,
out_chans=prompt_embed_dim,
)
)
sam = SAMModel(
image_encoder=image_encoder,
prompt_encoder=PromptEncoder(
embed_dim=prompt_embed_dim,
image_embedding_size=(image_embedding_size, image_embedding_size),
input_image_size=(image_size, image_size),
mask_in_chans=16,
),
mask_decoder=MaskDecoder(
num_multimask_outputs=3,
transformer=TwoWayTransformer(
depth=2,
embedding_dim=prompt_embed_dim,
mlp_dim=2048,
num_heads=8,
),
transformer_dim=prompt_embed_dim,
iou_head_depth=3,
iou_head_hidden_dim=256,
),
pixel_mean=[123.675, 116.28, 103.53],
pixel_std=[58.395, 57.12, 57.375],
)
if checkpoint is not None:
checkpoint = attempt_download_asset(checkpoint)
with open(checkpoint, "rb") as f:
state_dict = torch.load(f)
sam.load_state_dict(state_dict)
sam.eval()
return sam
def _build_sam2(
encoder_embed_dim=1280,
encoder_stages=[2, 6, 36, 4],
encoder_num_heads=2,
encoder_global_att_blocks=[7, 15, 23, 31],
encoder_backbone_channel_list=[1152, 576, 288, 144],
encoder_window_spatial_size=[7, 7],
encoder_window_spec=[8, 4, 16, 8],
checkpoint=None,
):
"""
Builds and returns a Segment Anything Model 2 (SAM2) with specified architecture parameters.
Args:
encoder_embed_dim (int): Embedding dimension for the encoder.
encoder_stages (List[int]): Number of blocks in each stage of the encoder.
encoder_num_heads (int): Number of attention heads in the encoder.
encoder_global_att_blocks (List[int]): Indices of global attention blocks in the encoder.
encoder_backbone_channel_list (List[int]): Channel dimensions for each level of the encoder backbone.
encoder_window_spatial_size (List[int]): Spatial size of the window for position embeddings.
encoder_window_spec (List[int]): Window specifications for each stage of the encoder.
checkpoint (str | None): Path to the checkpoint file for loading pre-trained weights.
Returns:
(SAM2Model): A configured and initialized SAM2 model.
Examples:
>>> sam2_model = _build_sam2(encoder_embed_dim=96, encoder_stages=[1, 2, 7, 2])
>>> sam2_model.eval()
"""
image_encoder = ImageEncoder(
trunk=Hiera(
embed_dim=encoder_embed_dim,
num_heads=encoder_num_heads,
stages=encoder_stages,
global_att_blocks=encoder_global_att_blocks,
window_pos_embed_bkg_spatial_size=encoder_window_spatial_size,
window_spec=encoder_window_spec,
),
neck=FpnNeck(
d_model=256,
backbone_channel_list=encoder_backbone_channel_list,
fpn_top_down_levels=[2, 3],
fpn_interp_model="nearest",
),
scalp=1,
)
memory_attention = MemoryAttention(d_model=256, pos_enc_at_input=True, num_layers=4, layer=MemoryAttentionLayer())
memory_encoder = MemoryEncoder(out_dim=64)
is_sam2_1 = checkpoint is not None and "sam2.1" in checkpoint
sam2 = SAM2Model(
image_encoder=image_encoder,
memory_attention=memory_attention,
memory_encoder=memory_encoder,
num_maskmem=7,
image_size=1024,
sigmoid_scale_for_mem_enc=20.0,
sigmoid_bias_for_mem_enc=-10.0,
use_mask_input_as_output_without_sam=True,
directly_add_no_mem_embed=True,
use_high_res_features_in_sam=True,
multimask_output_in_sam=True,
iou_prediction_use_sigmoid=True,
use_obj_ptrs_in_encoder=True,
add_tpos_enc_to_obj_ptrs=True,
only_obj_ptrs_in_the_past_for_eval=True,
pred_obj_scores=True,
pred_obj_scores_mlp=True,
fixed_no_obj_ptr=True,
multimask_output_for_tracking=True,
use_multimask_token_for_obj_ptr=True,
multimask_min_pt_num=0,
multimask_max_pt_num=1,
use_mlp_for_obj_ptr_proj=True,
compile_image_encoder=False,
no_obj_embed_spatial=is_sam2_1,
proj_tpos_enc_in_obj_ptrs=is_sam2_1,
use_signed_tpos_enc_to_obj_ptrs=is_sam2_1,
sam_mask_decoder_extra_args=dict(
dynamic_multimask_via_stability=True,
dynamic_multimask_stability_delta=0.05,
dynamic_multimask_stability_thresh=0.98,
),
)
if checkpoint is not None:
checkpoint = attempt_download_asset(checkpoint)
with open(checkpoint, "rb") as f:
state_dict = torch.load(f)["model"]
sam2.load_state_dict(state_dict)
sam2.eval()
return sam2
sam_model_map = {
"sam_h.pt": build_sam_vit_h,
"sam_l.pt": build_sam_vit_l,
"sam_b.pt": build_sam_vit_b,
"mobile_sam.pt": build_mobile_sam,
"sam2_t.pt": build_sam2_t,
"sam2_s.pt": build_sam2_s,
"sam2_b.pt": build_sam2_b,
"sam2_l.pt": build_sam2_l,
"sam2.1_t.pt": build_sam2_t,
"sam2.1_s.pt": build_sam2_s,
"sam2.1_b.pt": build_sam2_b,
"sam2.1_l.pt": build_sam2_l,
}
def build_sam(ckpt="sam_b.pt"):
"""
Builds and returns a Segment Anything Model (SAM) based on the provided checkpoint.
Args:
ckpt (str | Path): Path to the checkpoint file or name of a pre-defined SAM model.
Returns:
(SAMModel | SAM2Model): A configured and initialized SAM or SAM2 model instance.
Raises:
FileNotFoundError: If the provided checkpoint is not a supported SAM model.
Examples:
>>> sam_model = build_sam("sam_b.pt")
>>> sam_model = build_sam("path/to/custom_checkpoint.pt")
Notes:
Supported pre-defined models include:
- SAM: 'sam_h.pt', 'sam_l.pt', 'sam_b.pt', 'mobile_sam.pt'
- SAM2: 'sam2_t.pt', 'sam2_s.pt', 'sam2_b.pt', 'sam2_l.pt'
"""
model_builder = None
ckpt = str(ckpt) # to allow Path ckpt types
for k in sam_model_map.keys():
if ckpt.endswith(k):
model_builder = sam_model_map.get(k)
if not model_builder:
raise FileNotFoundError(f"{ckpt} is not a supported SAM model. Available models are: \n {sam_model_map.keys()}")
return model_builder(ckpt)
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
"""
SAM model interface.
This module provides an interface to the Segment Anything Model (SAM) from Ultralytics, designed for real-time image
segmentation tasks. The SAM model allows for promptable segmentation with unparalleled versatility in image analysis,
and has been trained on the SA-1B dataset. It features zero-shot performance capabilities, enabling it to adapt to new
image distributions and tasks without prior knowledge.
Key Features:
- Promptable segmentation
- Real-time performance
- Zero-shot transfer capabilities
- Trained on SA-1B dataset
"""
from pathlib import Path
from ultralytics.engine.model import Model
from ultralytics.utils.torch_utils import model_info
from .build import build_sam
from .predict import Predictor, SAM2Predictor
class SAM(Model):
"""
SAM (Segment Anything Model) interface class for real-time image segmentation tasks.
This class provides an interface to the Segment Anything Model (SAM) from Ultralytics, designed for
promptable segmentation with versatility in image analysis. It supports various prompts such as bounding
boxes, points, or labels, and features zero-shot performance capabilities.
Attributes:
model (torch.nn.Module): The loaded SAM model.
is_sam2 (bool): Indicates whether the model is SAM2 variant.
task (str): The task type, set to "segment" for SAM models.
Methods:
predict: Performs segmentation prediction on the given image or video source.
info: Logs information about the SAM model.
Examples:
>>> sam = SAM("sam_b.pt")
>>> results = sam.predict("image.jpg", points=[[500, 375]])
>>> for r in results:
>>> print(f"Detected {len(r.masks)} masks")
"""
def __init__(self, model="sam_b.pt") -> None:
"""
Initializes the SAM (Segment Anything Model) instance.
Args:
model (str): Path to the pre-trained SAM model file. File should have a .pt or .pth extension.
Raises:
NotImplementedError: If the model file extension is not .pt or .pth.
Examples:
>>> sam = SAM("sam_b.pt")
>>> print(sam.is_sam2)
"""
if model and Path(model).suffix not in {".pt", ".pth"}:
raise NotImplementedError("SAM prediction requires pre-trained *.pt or *.pth model.")
self.is_sam2 = "sam2" in Path(model).stem
super().__init__(model=model, task="segment")
def _load(self, weights: str, task=None):
"""
Loads the specified weights into the SAM model.
This method initializes the SAM model with the provided weights file, setting up the model architecture
and loading the pre-trained parameters.
Args:
weights (str): Path to the weights file. Should be a .pt or .pth file containing the model parameters.
task (str | None): Task name. If provided, it specifies the particular task the model is being loaded for.
Examples:
>>> sam = SAM("sam_b.pt")
>>> sam._load("path/to/custom_weights.pt")
"""
self.model = build_sam(weights)
def predict(self, source, stream=False, bboxes=None, points=None, labels=None, **kwargs):
"""
Performs segmentation prediction on the given image or video source.
Args:
source (str | PIL.Image | numpy.ndarray): Path to the image or video file, or a PIL.Image object, or
a numpy.ndarray object.
stream (bool): If True, enables real-time streaming.
bboxes (List[List[float]] | None): List of bounding box coordinates for prompted segmentation.
points (List[List[float]] | None): List of points for prompted segmentation.
labels (List[int] | None): List of labels for prompted segmentation.
**kwargs (Any): Additional keyword arguments for prediction.
Returns:
(List): The model predictions.
Examples:
>>> sam = SAM("sam_b.pt")
>>> results = sam.predict("image.jpg", points=[[500, 375]])
>>> for r in results:
... print(f"Detected {len(r.masks)} masks")
"""
overrides = dict(conf=0.25, task="segment", mode="predict", imgsz=1024)
kwargs = {**overrides, **kwargs}
prompts = dict(bboxes=bboxes, points=points, labels=labels)
return super().predict(source, stream, prompts=prompts, **kwargs)
def __call__(self, source=None, stream=False, bboxes=None, points=None, labels=None, **kwargs):
"""
Performs segmentation prediction on the given image or video source.
This method is an alias for the 'predict' method, providing a convenient way to call the SAM model
for segmentation tasks.
Args:
source (str | PIL.Image | numpy.ndarray | None): Path to the image or video file, or a PIL.Image
object, or a numpy.ndarray object.
stream (bool): If True, enables real-time streaming.
bboxes (List[List[float]] | None): List of bounding box coordinates for prompted segmentation.
points (List[List[float]] | None): List of points for prompted segmentation.
labels (List[int] | None): List of labels for prompted segmentation.
**kwargs (Any): Additional keyword arguments to be passed to the predict method.
Returns:
(List): The model predictions, typically containing segmentation masks and other relevant information.
Examples:
>>> sam = SAM("sam_b.pt")
>>> results = sam("image.jpg", points=[[500, 375]])
>>> print(f"Detected {len(results[0].masks)} masks")
"""
return self.predict(source, stream, bboxes, points, labels, **kwargs)
def info(self, detailed=False, verbose=True):
"""
Logs information about the SAM model.
This method provides details about the Segment Anything Model (SAM), including its architecture,
parameters, and computational requirements.
Args:
detailed (bool): If True, displays detailed information about the model layers and operations.
verbose (bool): If True, prints the information to the console.
Returns:
(tuple): A tuple containing the model's information (string representations of the model).
Examples:
>>> sam = SAM("sam_b.pt")
>>> info = sam.info()
>>> print(info[0]) # Print summary information
"""
return model_info(self.model, detailed=detailed, verbose=verbose)
@property
def task_map(self):
"""
Provides a mapping from the 'segment' task to its corresponding 'Predictor'.
Returns:
(Dict[str, Type[Predictor]]): A dictionary mapping the 'segment' task to its corresponding Predictor
class. For SAM2 models, it maps to SAM2Predictor, otherwise to the standard Predictor.
Examples:
>>> sam = SAM("sam_b.pt")
>>> task_map = sam.task_map
>>> print(task_map)
{'segment': <class 'ultralytics.models.sam.predict.Predictor'>}
"""
return {"segment": {"predictor": SAM2Predictor if self.is_sam2 else Predictor}}
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
import copy
import math
from functools import partial
from typing import Any, Optional, Tuple, Type, Union
import numpy as np
import torch
import torch.nn.functional as F
from torch import Tensor, nn
from ultralytics.nn.modules import MLP, LayerNorm2d, MLPBlock
from .transformer import Attention, TwoWayAttentionBlock, TwoWayTransformer
from .utils import add_decomposed_rel_pos, apply_rotary_enc, compute_axial_cis, window_partition, window_unpartition
class DropPath(nn.Module):
"""
Implements stochastic depth regularization for neural networks during training.
Attributes:
drop_prob (float): Probability of dropping a path during training.
scale_by_keep (bool): Whether to scale the output by the keep probability.
Methods:
forward: Applies stochastic depth to input tensor during training, with optional scaling.
Examples:
>>> drop_path = DropPath(drop_prob=0.2, scale_by_keep=True)
>>> x = torch.randn(32, 64, 224, 224)
>>> output = drop_path(x)
"""
def __init__(self, drop_prob=0.0, scale_by_keep=True):
"""Initialize DropPath module for stochastic depth regularization during training."""
super().__init__()
self.drop_prob = drop_prob
self.scale_by_keep = scale_by_keep
def forward(self, x):
"""Applies stochastic depth to input tensor during training, with optional scaling."""
if self.drop_prob == 0.0 or not self.training:
return x
keep_prob = 1 - self.drop_prob
shape = (x.shape[0],) + (1,) * (x.ndim - 1)
random_tensor = x.new_empty(shape).bernoulli_(keep_prob)
if keep_prob > 0.0 and self.scale_by_keep:
random_tensor.div_(keep_prob)
return x * random_tensor
class MaskDownSampler(nn.Module):
"""
A mask downsampling and embedding module for efficient processing of input masks.
This class implements a mask downsampler that progressively reduces the spatial dimensions of input masks
while expanding their channel dimensions using convolutional layers, layer normalization, and activation
functions.
Attributes:
encoder (nn.Sequential): A sequential container of convolutional layers, layer normalization, and
activation functions for downsampling and embedding masks.
Methods:
forward: Downsamples and encodes input mask to embed_dim channels.
Examples:
>>> mask_downsampler = MaskDownSampler(embed_dim=256, kernel_size=4, stride=4, padding=0, total_stride=16)
>>> input_mask = torch.randn(1, 1, 256, 256)
>>> output = mask_downsampler(input_mask)
>>> print(output.shape)
torch.Size([1, 256, 16, 16])
"""
def __init__(
self,
embed_dim=256,
kernel_size=4,
stride=4,
padding=0,
total_stride=16,
activation=nn.GELU,
):
"""Initializes a mask downsampler module for progressive downsampling and channel expansion."""
super().__init__()
num_layers = int(math.log2(total_stride) // math.log2(stride))
assert stride**num_layers == total_stride
self.encoder = nn.Sequential()
mask_in_chans, mask_out_chans = 1, 1
for _ in range(num_layers):
mask_out_chans = mask_in_chans * (stride**2)
self.encoder.append(
nn.Conv2d(
mask_in_chans,
mask_out_chans,
kernel_size=kernel_size,
stride=stride,
padding=padding,
)
)
self.encoder.append(LayerNorm2d(mask_out_chans))
self.encoder.append(activation())
mask_in_chans = mask_out_chans
self.encoder.append(nn.Conv2d(mask_out_chans, embed_dim, kernel_size=1))
def forward(self, x):
"""Downsamples and encodes input mask to embed_dim channels using convolutional layers and LayerNorm2d."""
return self.encoder(x)
class CXBlock(nn.Module):
"""
ConvNeXt Block for efficient feature extraction in convolutional neural networks.
This block implements a modified version of the ConvNeXt architecture, offering improved performance and
flexibility in feature extraction.
Attributes:
dwconv (nn.Conv2d): Depthwise or standard 2D convolution layer.
norm (LayerNorm2d): Layer normalization applied to channels.
pwconv1 (nn.Linear): First pointwise convolution implemented as a linear layer.
act (nn.GELU): GELU activation function.
pwconv2 (nn.Linear): Second pointwise convolution implemented as a linear layer.
gamma (nn.Parameter | None): Learnable scale parameter for layer scaling.
drop_path (nn.Module): DropPath layer for stochastic depth regularization.
Methods:
forward: Processes the input tensor through the ConvNeXt block.
Examples:
>>> import torch
>>> x = torch.randn(1, 64, 56, 56)
>>> block = CXBlock(dim=64, kernel_size=7, padding=3)
>>> output = block(x)
>>> print(output.shape)
torch.Size([1, 64, 56, 56])
"""
def __init__(
self,
dim,
kernel_size=7,
padding=3,
drop_path=0.0,
layer_scale_init_value=1e-6,
use_dwconv=True,
):
"""
Initialize a ConvNeXt Block for efficient feature extraction in convolutional neural networks.
This block implements a modified version of the ConvNeXt architecture, offering improved performance and
flexibility in feature extraction.
Args:
dim (int): Number of input channels.
kernel_size (int): Size of the convolutional kernel.
padding (int): Padding size for the convolution.
drop_path (float): Stochastic depth rate.
layer_scale_init_value (float): Initial value for Layer Scale.
use_dwconv (bool): Whether to use depthwise convolution.
Examples:
>>> block = CXBlock(dim=64, kernel_size=7, padding=3)
>>> x = torch.randn(1, 64, 32, 32)
>>> output = block(x)
>>> print(output.shape)
torch.Size([1, 64, 32, 32])
"""
super().__init__()
self.dwconv = nn.Conv2d(
dim,
dim,
kernel_size=kernel_size,
padding=padding,
groups=dim if use_dwconv else 1,
) # depthwise conv
self.norm = LayerNorm2d(dim, eps=1e-6)
self.pwconv1 = nn.Linear(dim, 4 * dim) # pointwise/1x1 convs, implemented with linear layers
self.act = nn.GELU()
self.pwconv2 = nn.Linear(4 * dim, dim)
self.gamma = (
nn.Parameter(layer_scale_init_value * torch.ones(dim), requires_grad=True)
if layer_scale_init_value > 0
else None
)
self.drop_path = DropPath(drop_path) if drop_path > 0.0 else nn.Identity()
def forward(self, x):
"""Applies ConvNeXt block operations to input tensor, including convolutions and residual connection."""
input = x
x = self.dwconv(x)
x = self.norm(x)
x = x.permute(0, 2, 3, 1) # (N, C, H, W) -> (N, H, W, C)
x = self.pwconv1(x)
x = self.act(x)
x = self.pwconv2(x)
if self.gamma is not None:
x = self.gamma * x
x = x.permute(0, 3, 1, 2) # (N, H, W, C) -> (N, C, H, W)
x = input + self.drop_path(x)
return x
class Fuser(nn.Module):
"""
A module for fusing features through multiple layers of a neural network.
This class applies a series of identical layers to an input tensor, optionally projecting the input first.
Attributes:
proj (nn.Module): An optional input projection layer. Identity if no projection is needed.
layers (nn.ModuleList): A list of identical layers to be applied sequentially.
Methods:
forward: Applies the fuser to an input tensor.
Examples:
>>> layer = CXBlock(dim=256)
>>> fuser = Fuser(layer, num_layers=3, dim=256, input_projection=True)
>>> x = torch.randn(1, 256, 32, 32)
>>> output = fuser(x)
>>> print(output.shape)
torch.Size([1, 256, 32, 32])
"""
def __init__(self, layer, num_layers, dim=None, input_projection=False):
"""
Initializes the Fuser module for feature fusion through multiple layers.
This module creates a sequence of identical layers and optionally applies an input projection.
Args:
layer (nn.Module): The layer to be replicated in the fuser.
num_layers (int): The number of times to replicate the layer.
dim (int | None): The dimension for input projection, if used.
input_projection (bool): Whether to use input projection.
Examples:
>>> layer = nn.Linear(64, 64)
>>> fuser = Fuser(layer, num_layers=3, dim=64, input_projection=True)
>>> input_tensor = torch.randn(1, 64)
>>> output = fuser(input_tensor)
"""
super().__init__()
self.proj = nn.Identity()
self.layers = nn.ModuleList([copy.deepcopy(layer) for _ in range(num_layers)])
if input_projection:
assert dim is not None
self.proj = nn.Conv2d(dim, dim, kernel_size=1)
def forward(self, x):
"""Applies a series of layers to the input tensor, optionally projecting it first."""
x = self.proj(x)
for layer in self.layers:
x = layer(x)
return x
class SAM2TwoWayAttentionBlock(TwoWayAttentionBlock):
"""
A two-way attention block for performing self-attention and cross-attention in both directions.
This block extends the TwoWayAttentionBlock and consists of four main components: self-attention on
sparse inputs, cross-attention from sparse to dense inputs, an MLP block on sparse inputs, and
cross-attention from dense to sparse inputs.
Attributes:
self_attn (Attention): Self-attention layer for queries.
norm1 (nn.LayerNorm): Layer normalization after the first attention block.
cross_attn_token_to_image (Attention): Cross-attention layer from queries to keys.
norm2 (nn.LayerNorm): Layer normalization after the second attention block.
mlp (MLP): MLP block for transforming query embeddings.
norm3 (nn.LayerNorm): Layer normalization after the MLP block.
norm4 (nn.LayerNorm): Layer normalization after the third attention block.
cross_attn_image_to_token (Attention): Cross-attention layer from keys to queries.
skip_first_layer_pe (bool): Flag to skip positional encoding in the first layer.
Methods:
forward: Processes input through the attention blocks and MLP.
Examples:
>>> block = SAM2TwoWayAttentionBlock(embedding_dim=256, num_heads=8)
>>> sparse_input = torch.randn(1, 100, 256)
>>> dense_input = torch.randn(1, 256, 16, 16)
>>> sparse_output, dense_output = block(sparse_input, dense_input)
"""
def __init__(
self,
embedding_dim: int,
num_heads: int,
mlp_dim: int = 2048,
activation: Type[nn.Module] = nn.ReLU,
attention_downsample_rate: int = 2,
skip_first_layer_pe: bool = False,
) -> None:
"""
Initializes a SAM2TwoWayAttentionBlock for performing self-attention and cross-attention in two directions.
This block extends the TwoWayAttentionBlock and consists of four main components: self-attention on sparse
inputs, cross-attention from sparse to dense inputs, an MLP block on sparse inputs, and cross-attention
from dense to sparse inputs.
Args:
embedding_dim (int): The channel dimension of the embeddings.
num_heads (int): The number of heads in the attention layers.
mlp_dim (int): The hidden dimension of the MLP block.
activation (Type[nn.Module]): The activation function of the MLP block.
attention_downsample_rate (int): The downsample rate for attention computations.
skip_first_layer_pe (bool): Whether to skip the positional encoding in the first layer.
Examples:
>>> block = SAM2TwoWayAttentionBlock(embedding_dim=256, num_heads=8, mlp_dim=2048)
>>> sparse_inputs = torch.randn(1, 100, 256)
>>> dense_inputs = torch.randn(1, 256, 32, 32)
>>> sparse_outputs, dense_outputs = block(sparse_inputs, dense_inputs)
"""
super().__init__(embedding_dim, num_heads, mlp_dim, activation, attention_downsample_rate, skip_first_layer_pe)
self.mlp = MLP(embedding_dim, mlp_dim, embedding_dim, num_layers=2, act=activation)
class SAM2TwoWayTransformer(TwoWayTransformer):
"""
A Two-Way Transformer module for simultaneous attention to image and query points.
This class extends the TwoWayTransformer, implementing a specialized transformer decoder that attends to an
input image using queries with supplied positional embeddings. It is particularly useful for tasks like
object detection, image segmentation, and point cloud processing.
Attributes:
depth (int): Number of layers in the transformer.
embedding_dim (int): Channel dimension for input embeddings.
num_heads (int): Number of heads for multihead attention.
mlp_dim (int): Internal channel dimension for the MLP block.
layers (nn.ModuleList): List of SAM2TwoWayAttentionBlock layers comprising the transformer.
final_attn_token_to_image (Attention): Final attention layer from queries to image.
norm_final_attn (nn.LayerNorm): Layer normalization applied to final queries.
Methods:
forward: Processes input image embeddings and query embeddings through the transformer.
Examples:
>>> transformer = SAM2TwoWayTransformer(depth=5, embedding_dim=256, num_heads=8, mlp_dim=2048)
>>> image_embedding = torch.randn(1, 256, 64, 64)
>>> query_embedding = torch.randn(1, 100, 256)
>>> output = transformer(image_embedding, query_embedding)
>>> print(output[0].shape, output[1].shape)
torch.Size([1, 100, 256]) torch.Size([1, 256, 64, 64])
"""
def __init__(
self,
depth: int,
embedding_dim: int,
num_heads: int,
mlp_dim: int,
activation: Type[nn.Module] = nn.ReLU,
attention_downsample_rate: int = 2,
) -> None:
"""
Initializes a SAM2TwoWayTransformer instance.
This transformer decoder attends to an input image using queries with supplied positional embeddings.
It is designed for tasks like object detection, image segmentation, and point cloud processing.
Args:
depth (int): Number of layers in the transformer.
embedding_dim (int): Channel dimension for the input embeddings.
num_heads (int): Number of heads for multihead attention. Must divide embedding_dim.
mlp_dim (int): Channel dimension internal to the MLP block.
activation (Type[nn.Module]): Activation function to use in the MLP block.
attention_downsample_rate (int): Downsampling rate for attention computations.
Examples:
>>> transformer = SAM2TwoWayTransformer(depth=5, embedding_dim=256, num_heads=8, mlp_dim=2048)
>>> transformer
SAM2TwoWayTransformer(
(layers): ModuleList(
(0-4): 5 x SAM2TwoWayAttentionBlock(...)
)
(final_attn_token_to_image): Attention(...)
(norm_final_attn): LayerNorm(...)
)
"""
super().__init__(depth, embedding_dim, num_heads, mlp_dim, activation, attention_downsample_rate)
self.layers = nn.ModuleList()
for i in range(depth):
self.layers.append(
SAM2TwoWayAttentionBlock(
embedding_dim=embedding_dim,
num_heads=num_heads,
mlp_dim=mlp_dim,
activation=activation,
attention_downsample_rate=attention_downsample_rate,
skip_first_layer_pe=(i == 0),
)
)
class RoPEAttention(Attention):
"""
Implements rotary position encoding for attention mechanisms in transformer architectures.
This class extends the base Attention class by incorporating Rotary Position Encoding (RoPE) to enhance
the positional awareness of the attention mechanism.
Attributes:
compute_cis (Callable): Function to compute axial complex numbers for rotary encoding.
freqs_cis (Tensor): Precomputed frequency tensor for rotary encoding.
rope_k_repeat (bool): Flag to repeat query RoPE to match key length for cross-attention to memories.
Methods:
forward: Applies rotary position encoding and computes attention between query, key, and value tensors.
Examples:
>>> rope_attn = RoPEAttention(embedding_dim=256, num_heads=8, rope_theta=10000.0, feat_sizes=(32, 32))
>>> q = torch.randn(1, 1024, 256)
>>> k = torch.randn(1, 1024, 256)
>>> v = torch.randn(1, 1024, 256)
>>> output = rope_attn(q, k, v)
>>> print(output.shape)
torch.Size([1, 1024, 256])
"""
def __init__(
self,
*args,
rope_theta=10000.0,
rope_k_repeat=False,
feat_sizes=(32, 32), # [w, h] for stride 16 feats at 512 resolution
**kwargs,
):
"""Initializes RoPEAttention with rotary position encoding for enhanced positional awareness."""
super().__init__(*args, **kwargs)
self.compute_cis = partial(compute_axial_cis, dim=self.internal_dim // self.num_heads, theta=rope_theta)
freqs_cis = self.compute_cis(end_x=feat_sizes[0], end_y=feat_sizes[1])
self.freqs_cis = freqs_cis
self.rope_k_repeat = rope_k_repeat # repeat q rope to match k length, needed for cross-attention to memories
def forward(self, q: Tensor, k: Tensor, v: Tensor, num_k_exclude_rope: int = 0) -> Tensor:
"""Applies rotary position encoding and computes attention between query, key, and value tensors."""
q = self.q_proj(q)
k = self.k_proj(k)
v = self.v_proj(v)
# Separate into heads
q = self._separate_heads(q, self.num_heads)
k = self._separate_heads(k, self.num_heads)
v = self._separate_heads(v, self.num_heads)
# Apply rotary position encoding
w = h = math.sqrt(q.shape[-2])
self.freqs_cis = self.freqs_cis.to(q.device)
if self.freqs_cis.shape[0] != q.shape[-2]:
self.freqs_cis = self.compute_cis(end_x=w, end_y=h).to(q.device)
if q.shape[-2] != k.shape[-2]:
assert self.rope_k_repeat
num_k_rope = k.size(-2) - num_k_exclude_rope
q, k[:, :, :num_k_rope] = apply_rotary_enc(
q,
k[:, :, :num_k_rope],
freqs_cis=self.freqs_cis,
repeat_freqs_k=self.rope_k_repeat,
)
# Attention
_, _, _, c_per_head = q.shape
attn = q @ k.permute(0, 1, 3, 2) # B x N_heads x N_tokens x N_tokens
attn = attn / math.sqrt(c_per_head)
attn = torch.softmax(attn, dim=-1)
# Get output
out = attn @ v
out = self._recombine_heads(out)
out = self.out_proj(out)
return out
def do_pool(x: torch.Tensor, pool: nn.Module, norm: nn.Module = None) -> torch.Tensor:
"""Applies pooling and optional normalization to a tensor, handling spatial dimension permutations."""
if pool is None:
return x
# (B, H, W, C) -> (B, C, H, W)
x = x.permute(0, 3, 1, 2)
x = pool(x)
# (B, C, H', W') -> (B, H', W', C)
x = x.permute(0, 2, 3, 1)
if norm:
x = norm(x)
return x
class MultiScaleAttention(nn.Module):
"""
Implements multiscale self-attention with optional query pooling for efficient feature extraction.
This class provides a flexible implementation of multiscale attention, allowing for optional
downsampling of query features through pooling. It's designed to enhance the model's ability to
capture multiscale information in visual tasks.
Attributes:
dim (int): Input dimension of the feature map.
dim_out (int): Output dimension of the attention module.
num_heads (int): Number of attention heads.
scale (float): Scaling factor for dot-product attention.
q_pool (nn.Module | None): Optional pooling module for query features.
qkv (nn.Linear): Linear projection for query, key, and value.
proj (nn.Linear): Output projection.
Methods:
forward: Applies multiscale attention to the input tensor.
Examples:
>>> import torch
>>> from torch import nn
>>> x = torch.randn(1, 64, 64, 256)
>>> msa = MultiScaleAttention(dim=256, dim_out=256, num_heads=8)
>>> output = msa(x)
>>> print(output.shape)
torch.Size([1, 64, 64, 256])
"""
def __init__(
self,
dim: int,
dim_out: int,
num_heads: int,
q_pool: nn.Module = None,
):
"""Initializes multiscale attention with optional query pooling for efficient feature extraction."""
super().__init__()
self.dim = dim
self.dim_out = dim_out
self.num_heads = num_heads
head_dim = dim_out // num_heads
self.scale = head_dim**-0.5
self.q_pool = q_pool
self.qkv = nn.Linear(dim, dim_out * 3)
self.proj = nn.Linear(dim_out, dim_out)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Applies multiscale attention with optional query pooling to extract multiscale features."""
B, H, W, _ = x.shape
# qkv with shape (B, H * W, 3, nHead, C)
qkv = self.qkv(x).reshape(B, H * W, 3, self.num_heads, -1)
# q, k, v with shape (B, H * W, nheads, C)
q, k, v = torch.unbind(qkv, 2)
# Q pooling (for downsample at stage changes)
if self.q_pool:
q = do_pool(q.reshape(B, H, W, -1), self.q_pool)
H, W = q.shape[1:3] # downsampled shape
q = q.reshape(B, H * W, self.num_heads, -1)
# Torch's SDPA expects [B, nheads, H*W, C] so we transpose
x = F.scaled_dot_product_attention(
q.transpose(1, 2),
k.transpose(1, 2),
v.transpose(1, 2),
)
# Transpose back
x = x.transpose(1, 2)
x = x.reshape(B, H, W, -1)
x = self.proj(x)
return x
class MultiScaleBlock(nn.Module):
"""
A multiscale attention block with window partitioning and query pooling for efficient vision transformers.
This class implements a multiscale attention mechanism with optional window partitioning and downsampling,
designed for use in vision transformer architectures.
Attributes:
dim (int): Input dimension of the block.
dim_out (int): Output dimension of the block.
norm1 (nn.Module): First normalization layer.
window_size (int): Size of the window for partitioning.
pool (nn.Module | None): Pooling layer for query downsampling.
q_stride (Tuple[int, int] | None): Stride for query pooling.
attn (MultiScaleAttention): Multi-scale attention module.
drop_path (nn.Module): Drop path layer for regularization.
norm2 (nn.Module): Second normalization layer.
mlp (MLP): Multi-layer perceptron module.
proj (nn.Linear | None): Projection layer for dimension mismatch.
Methods:
forward: Processes input tensor through the multiscale block.
Examples:
>>> block = MultiScaleBlock(dim=256, dim_out=512, num_heads=8, window_size=7)
>>> x = torch.randn(1, 56, 56, 256)
>>> output = block(x)
>>> print(output.shape)
torch.Size([1, 28, 28, 512])
"""
def __init__(
self,
dim: int,
dim_out: int,
num_heads: int,
mlp_ratio: float = 4.0,
drop_path: float = 0.0,
norm_layer: Union[nn.Module, str] = "LayerNorm",
q_stride: Tuple[int, int] = None,
act_layer: nn.Module = nn.GELU,
window_size: int = 0,
):
"""Initializes a multiscale attention block with window partitioning and optional query pooling."""
super().__init__()
if isinstance(norm_layer, str):
norm_layer = partial(getattr(nn, norm_layer), eps=1e-6)
self.dim = dim
self.dim_out = dim_out
self.norm1 = norm_layer(dim)
self.window_size = window_size
self.pool, self.q_stride = None, q_stride
if self.q_stride:
self.pool = nn.MaxPool2d(kernel_size=q_stride, stride=q_stride, ceil_mode=False)
self.attn = MultiScaleAttention(
dim,
dim_out,
num_heads=num_heads,
q_pool=self.pool,
)
self.drop_path = DropPath(drop_path) if drop_path > 0.0 else nn.Identity()
self.norm2 = norm_layer(dim_out)
self.mlp = MLP(
dim_out,
int(dim_out * mlp_ratio),
dim_out,
num_layers=2,
act=act_layer,
)
if dim != dim_out:
self.proj = nn.Linear(dim, dim_out)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Processes input through multiscale attention and MLP, with optional windowing and downsampling."""
shortcut = x # B, H, W, C
x = self.norm1(x)
# Skip connection
if self.dim != self.dim_out:
shortcut = do_pool(self.proj(x), self.pool)
# Window partition
window_size = self.window_size
if window_size > 0:
H, W = x.shape[1], x.shape[2]
x, pad_hw = window_partition(x, window_size)
# Window Attention + Q Pooling (if stage change)
x = self.attn(x)
if self.q_stride:
# Shapes have changed due to Q pooling
window_size = self.window_size // self.q_stride[0]
H, W = shortcut.shape[1:3]
pad_h = (window_size - H % window_size) % window_size
pad_w = (window_size - W % window_size) % window_size
pad_hw = (H + pad_h, W + pad_w)
# Reverse window partition
if self.window_size > 0:
x = window_unpartition(x, window_size, pad_hw, (H, W))
x = shortcut + self.drop_path(x)
# MLP
x = x + self.drop_path(self.mlp(self.norm2(x)))
return x
class PositionEmbeddingSine(nn.Module):
"""
A module for generating sinusoidal positional embeddings for 2D inputs like images.
This class implements sinusoidal position encoding for 2D spatial positions, which can be used in
transformer-based models for computer vision tasks.
Attributes:
num_pos_feats (int): Number of positional features (half of the embedding dimension).
temperature (int): Temperature parameter for the sinusoidal functions.
normalize (bool): Whether to normalize the positional embeddings.
scale (float): Scaling factor for the embeddings when normalize is True.
cache (Dict): Cache for storing precomputed embeddings.
Methods:
_encode_xy: Encodes 2D positions using sine and cosine functions.
encode_boxes: Encodes box coordinates and dimensions into positional embeddings.
encode_points: Encodes 2D point coordinates with sinusoidal positional embeddings.
forward: Generates sinusoidal position embeddings for 2D inputs.
Examples:
>>> pos_emb = PositionEmbeddingSine(num_pos_feats=128)
>>> x = torch.randn(1, 3, 224, 224)
>>> embeddings = pos_emb(x)
>>> print(embeddings.shape)
torch.Size([1, 256, 224, 224])
"""
def __init__(
self,
num_pos_feats,
temperature: int = 10000,
normalize: bool = True,
scale: Optional[float] = None,
):
"""Initializes sinusoidal position embeddings for 2D image inputs."""
super().__init__()
assert num_pos_feats % 2 == 0, "Expecting even model width"
self.num_pos_feats = num_pos_feats // 2
self.temperature = temperature
self.normalize = normalize
if scale is not None and not normalize:
raise ValueError("normalize should be True if scale is passed")
if scale is None:
scale = 2 * math.pi
self.scale = scale
self.cache = {}
def _encode_xy(self, x, y):
"""Encodes 2D positions using sine/cosine functions for transformer positional embeddings."""
assert len(x) == len(y) and x.ndim == y.ndim == 1
x_embed = x * self.scale
y_embed = y * self.scale
dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device)
dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats)
pos_x = x_embed[:, None] / dim_t
pos_y = y_embed[:, None] / dim_t
pos_x = torch.stack((pos_x[:, 0::2].sin(), pos_x[:, 1::2].cos()), dim=2).flatten(1)
pos_y = torch.stack((pos_y[:, 0::2].sin(), pos_y[:, 1::2].cos()), dim=2).flatten(1)
return pos_x, pos_y
@torch.no_grad()
def encode_boxes(self, x, y, w, h):
"""Encodes box coordinates and dimensions into positional embeddings for detection."""
pos_x, pos_y = self._encode_xy(x, y)
return torch.cat((pos_y, pos_x, h[:, None], w[:, None]), dim=1)
encode = encode_boxes # Backwards compatibility
@torch.no_grad()
def encode_points(self, x, y, labels):
"""Encodes 2D points with sinusoidal embeddings and appends labels."""
(bx, nx), (by, ny), (bl, nl) = x.shape, y.shape, labels.shape
assert bx == by and nx == ny and bx == bl and nx == nl
pos_x, pos_y = self._encode_xy(x.flatten(), y.flatten())
pos_x, pos_y = pos_x.reshape(bx, nx, -1), pos_y.reshape(by, ny, -1)
return torch.cat((pos_y, pos_x, labels[:, :, None]), dim=2)
@torch.no_grad()
def forward(self, x: torch.Tensor):
"""Generates sinusoidal position embeddings for 2D inputs like images."""
cache_key = (x.shape[-2], x.shape[-1])
if cache_key in self.cache:
return self.cache[cache_key][None].repeat(x.shape[0], 1, 1, 1)
y_embed = (
torch.arange(1, x.shape[-2] + 1, dtype=torch.float32, device=x.device)
.view(1, -1, 1)
.repeat(x.shape[0], 1, x.shape[-1])
)
x_embed = (
torch.arange(1, x.shape[-1] + 1, dtype=torch.float32, device=x.device)
.view(1, 1, -1)
.repeat(x.shape[0], x.shape[-2], 1)
)
if self.normalize:
eps = 1e-6
y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale
x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale
dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device)
dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats)
pos_x = x_embed[:, :, :, None] / dim_t
pos_y = y_embed[:, :, :, None] / dim_t
pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3)
pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3)
pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2)
self.cache[cache_key] = pos[0]
return pos
class PositionEmbeddingRandom(nn.Module):
"""
Positional encoding using random spatial frequencies.
This class generates positional embeddings for input coordinates using random spatial frequencies. It is
particularly useful for transformer-based models that require position information.
Attributes:
positional_encoding_gaussian_matrix (torch.Tensor): A buffer containing random values for encoding.
Methods:
_pe_encoding: Positionally encodes points that are normalized to [0,1].
forward: Generates positional encoding for a grid of the specified size.
forward_with_coords: Positionally encodes points that are not normalized to [0,1].
Examples:
>>> pe = PositionEmbeddingRandom(num_pos_feats=64)
>>> size = (32, 32)
>>> encoding = pe(size)
>>> print(encoding.shape)
torch.Size([128, 32, 32])
"""
def __init__(self, num_pos_feats: int = 64, scale: Optional[float] = None) -> None:
"""Initializes random spatial frequency position embedding for transformers."""
super().__init__()
if scale is None or scale <= 0.0:
scale = 1.0
self.register_buffer("positional_encoding_gaussian_matrix", scale * torch.randn((2, num_pos_feats)))
# Set non-deterministic for forward() error 'cumsum_cuda_kernel does not have a deterministic implementation'
torch.use_deterministic_algorithms(False)
torch.backends.cudnn.deterministic = False
def _pe_encoding(self, coords: torch.Tensor) -> torch.Tensor:
"""Encodes normalized [0,1] coordinates using random spatial frequencies."""
# Assuming coords are in [0, 1]^2 square and have d_1 x ... x d_n x 2 shape
coords = 2 * coords - 1
coords = coords @ self.positional_encoding_gaussian_matrix
coords = 2 * np.pi * coords
# Outputs d_1 x ... x d_n x C shape
return torch.cat([torch.sin(coords), torch.cos(coords)], dim=-1)
def forward(self, size: Tuple[int, int]) -> torch.Tensor:
"""Generates positional encoding for a grid using random spatial frequencies."""
h, w = size
device: Any = self.positional_encoding_gaussian_matrix.device
grid = torch.ones((h, w), device=device, dtype=torch.float32)
y_embed = grid.cumsum(dim=0) - 0.5
x_embed = grid.cumsum(dim=1) - 0.5
y_embed = y_embed / h
x_embed = x_embed / w
pe = self._pe_encoding(torch.stack([x_embed, y_embed], dim=-1))
return pe.permute(2, 0, 1) # C x H x W
def forward_with_coords(self, coords_input: torch.Tensor, image_size: Tuple[int, int]) -> torch.Tensor:
"""Positionally encodes input coordinates, normalizing them to [0,1] based on the given image size."""
coords = coords_input.clone()
coords[:, :, 0] = coords[:, :, 0] / image_size[1]
coords[:, :, 1] = coords[:, :, 1] / image_size[0]
return self._pe_encoding(coords.to(torch.float)) # B x N x C
class Block(nn.Module):
"""
Transformer block with support for window attention and residual propagation.
This class implements a transformer block that can use either global or windowed self-attention,
followed by a feed-forward network. It supports relative positional embeddings and is designed
for use in vision transformer architectures.
Attributes:
norm1 (nn.Module): First normalization layer.
attn (REAttention): Self-attention layer with optional relative positional encoding.
norm2 (nn.Module): Second normalization layer.
mlp (MLPBlock): Multi-layer perceptron block.
window_size (int): Size of attention window. If 0, global attention is used.
Methods:
forward: Processes input through the transformer block.
Examples:
>>> import torch
>>> block = Block(dim=256, num_heads=8, window_size=7)
>>> x = torch.randn(1, 56, 56, 256)
>>> output = block(x)
>>> print(output.shape)
torch.Size([1, 56, 56, 256])
"""
def __init__(
self,
dim: int,
num_heads: int,
mlp_ratio: float = 4.0,
qkv_bias: bool = True,
norm_layer: Type[nn.Module] = nn.LayerNorm,
act_layer: Type[nn.Module] = nn.GELU,
use_rel_pos: bool = False,
rel_pos_zero_init: bool = True,
window_size: int = 0,
input_size: Optional[Tuple[int, int]] = None,
) -> None:
"""
Initializes a transformer block with optional window attention and relative positional embeddings.
This constructor sets up a transformer block that can use either global or windowed self-attention,
followed by a feed-forward network. It supports relative positional embeddings and is designed
for use in vision transformer architectures.
Args:
dim (int): Number of input channels.
num_heads (int): Number of attention heads in the self-attention layer.
mlp_ratio (float): Ratio of mlp hidden dimension to embedding dimension.
qkv_bias (bool): If True, adds a learnable bias to query, key, value projections.
norm_layer (Type[nn.Module]): Type of normalization layer to use.
act_layer (Type[nn.Module]): Type of activation function to use in the MLP block.
use_rel_pos (bool): If True, uses relative positional embeddings in attention.
rel_pos_zero_init (bool): If True, initializes relative positional parameters to zero.
window_size (int): Size of attention window. If 0, uses global attention.
input_size (Optional[Tuple[int, int]]): Input resolution for calculating relative positional parameter size.
Examples:
>>> block = Block(dim=256, num_heads=8, window_size=7)
>>> x = torch.randn(1, 56, 56, 256)
>>> output = block(x)
>>> print(output.shape)
torch.Size([1, 56, 56, 256])
"""
super().__init__()
self.norm1 = norm_layer(dim)
self.attn = REAttention(
dim,
num_heads=num_heads,
qkv_bias=qkv_bias,
use_rel_pos=use_rel_pos,
rel_pos_zero_init=rel_pos_zero_init,
input_size=input_size if window_size == 0 else (window_size, window_size),
)
self.norm2 = norm_layer(dim)
self.mlp = MLPBlock(embedding_dim=dim, mlp_dim=int(dim * mlp_ratio), act=act_layer)
self.window_size = window_size
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Processes input through transformer block with optional windowed self-attention and residual connection."""
shortcut = x
x = self.norm1(x)
# Window partition
if self.window_size > 0:
H, W = x.shape[1], x.shape[2]
x, pad_hw = window_partition(x, self.window_size)
x = self.attn(x)
# Reverse window partition
if self.window_size > 0:
x = window_unpartition(x, self.window_size, pad_hw, (H, W))
x = shortcut + x
return x + self.mlp(self.norm2(x))
class REAttention(nn.Module):
"""
Rotary Embedding Attention module for efficient self-attention in transformer architectures.
This class implements a multi-head attention mechanism with rotary positional embeddings, designed
for use in vision transformer models. It supports optional query pooling and window partitioning
for efficient processing of large inputs.
Attributes:
compute_cis (Callable): Function to compute axial complex numbers for rotary encoding.
freqs_cis (Tensor): Precomputed frequency tensor for rotary encoding.
rope_k_repeat (bool): Flag to repeat query RoPE to match key length for cross-attention to memories.
q_proj (nn.Linear): Linear projection for query.
k_proj (nn.Linear): Linear projection for key.
v_proj (nn.Linear): Linear projection for value.
out_proj (nn.Linear): Output projection.
num_heads (int): Number of attention heads.
internal_dim (int): Internal dimension for attention computation.
Methods:
forward: Applies rotary position encoding and computes attention between query, key, and value tensors.
Examples:
>>> rope_attn = REAttention(embedding_dim=256, num_heads=8, rope_theta=10000.0, feat_sizes=(32, 32))
>>> q = torch.randn(1, 1024, 256)
>>> k = torch.randn(1, 1024, 256)
>>> v = torch.randn(1, 1024, 256)
>>> output = rope_attn(q, k, v)
>>> print(output.shape)
torch.Size([1, 1024, 256])
"""
def __init__(
self,
dim: int,
num_heads: int = 8,
qkv_bias: bool = True,
use_rel_pos: bool = False,
rel_pos_zero_init: bool = True,
input_size: Optional[Tuple[int, int]] = None,
) -> None:
"""
Initializes a Relative Position Attention module for transformer-based architectures.
This module implements multi-head attention with optional relative positional encodings, designed
specifically for vision tasks in transformer models.
Args:
dim (int): Number of input channels.
num_heads (int): Number of attention heads. Default is 8.
qkv_bias (bool): If True, adds a learnable bias to query, key, value projections. Default is True.
use_rel_pos (bool): If True, uses relative positional encodings. Default is False.
rel_pos_zero_init (bool): If True, initializes relative positional parameters to zero. Default is True.
input_size (Tuple[int, int] | None): Input resolution for calculating relative positional parameter size.
Required if use_rel_pos is True. Default is None.
Examples:
>>> attention = REAttention(dim=256, num_heads=8, input_size=(32, 32))
>>> x = torch.randn(1, 32, 32, 256)
>>> output = attention(x)
>>> print(output.shape)
torch.Size([1, 32, 32, 256])
"""
super().__init__()
self.num_heads = num_heads
head_dim = dim // num_heads
self.scale = head_dim**-0.5
self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
self.proj = nn.Linear(dim, dim)
self.use_rel_pos = use_rel_pos
if self.use_rel_pos:
assert input_size is not None, "Input size must be provided if using relative positional encoding."
# Initialize relative positional embeddings
self.rel_pos_h = nn.Parameter(torch.zeros(2 * input_size[0] - 1, head_dim))
self.rel_pos_w = nn.Parameter(torch.zeros(2 * input_size[1] - 1, head_dim))
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Applies multi-head attention with optional relative positional encoding to input tensor."""
B, H, W, _ = x.shape
# qkv with shape (3, B, nHead, H * W, C)
qkv = self.qkv(x).reshape(B, H * W, 3, self.num_heads, -1).permute(2, 0, 3, 1, 4)
# q, k, v with shape (B * nHead, H * W, C)
q, k, v = qkv.reshape(3, B * self.num_heads, H * W, -1).unbind(0)
attn = (q * self.scale) @ k.transpose(-2, -1)
if self.use_rel_pos:
attn = add_decomposed_rel_pos(attn, q, self.rel_pos_h, self.rel_pos_w, (H, W), (H, W))
attn = attn.softmax(dim=-1)
x = (attn @ v).view(B, self.num_heads, H, W, -1).permute(0, 2, 3, 1, 4).reshape(B, H, W, -1)
return self.proj(x)
class PatchEmbed(nn.Module):
"""
Image to Patch Embedding module for vision transformer architectures.
This module converts an input image into a sequence of patch embeddings using a convolutional layer.
It is commonly used as the first layer in vision transformer architectures to transform image data
into a suitable format for subsequent transformer blocks.
Attributes:
proj (nn.Conv2d): Convolutional layer for projecting image patches to embeddings.
Methods:
forward: Applies patch embedding to the input tensor.
Examples:
>>> patch_embed = PatchEmbed(kernel_size=(16, 16), stride=(16, 16), in_chans=3, embed_dim=768)
>>> x = torch.randn(1, 3, 224, 224)
>>> output = patch_embed(x)
>>> print(output.shape)
torch.Size([1, 768, 14, 14])
"""
def __init__(
self,
kernel_size: Tuple[int, int] = (16, 16),
stride: Tuple[int, int] = (16, 16),
padding: Tuple[int, int] = (0, 0),
in_chans: int = 3,
embed_dim: int = 768,
) -> None:
"""
Initializes the PatchEmbed module for converting image patches to embeddings.
This module is typically used as the first layer in vision transformer architectures to transform
image data into a suitable format for subsequent transformer blocks.
Args:
kernel_size (Tuple[int, int]): Size of the convolutional kernel for patch extraction.
stride (Tuple[int, int]): Stride of the convolutional operation.
padding (Tuple[int, int]): Padding applied to the input before convolution.
in_chans (int): Number of input image channels.
embed_dim (int): Dimensionality of the output patch embeddings.
Examples:
>>> patch_embed = PatchEmbed(kernel_size=(16, 16), stride=(16, 16), in_chans=3, embed_dim=768)
>>> x = torch.randn(1, 3, 224, 224)
>>> output = patch_embed(x)
>>> print(output.shape)
torch.Size([1, 768, 14, 14])
"""
super().__init__()
self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=kernel_size, stride=stride, padding=padding)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Computes patch embedding by applying convolution and transposing resulting tensor."""
return self.proj(x).permute(0, 2, 3, 1) # B C H W -> B H W C
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment