"platforms/reference/vscode:/vscode.git/clone" did not exist on "e45f1f289c2dff4847d80d9941d9eee9254de1ac"
Commit 80a37498 authored by yongshk's avatar yongshk
Browse files

Initial commit

parents
Pipeline #3463 failed with stages
in 0 seconds
# Copyright (c) Facebook, Inc. and its affiliates.
from .build import build_lr_scheduler, build_optimizer, get_default_optimizer_params
from .lr_scheduler import (
LRMultiplier,
LRScheduler,
WarmupCosineLR,
WarmupMultiStepLR,
WarmupParamScheduler,
)
__all__ = [k for k in globals().keys() if not k.startswith("_")]
# Copyright (c) Facebook, Inc. and its affiliates.
import copy
import itertools
import logging
from collections import defaultdict
from enum import Enum
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Type, Union
import torch
from fvcore.common.param_scheduler import (
CosineParamScheduler,
MultiStepParamScheduler,
StepWithFixedGammaParamScheduler,
)
from detectron2.config import CfgNode
from detectron2.utils.env import TORCH_VERSION
from .lr_scheduler import LRMultiplier, LRScheduler, WarmupParamScheduler
_GradientClipperInput = Union[torch.Tensor, Iterable[torch.Tensor]]
_GradientClipper = Callable[[_GradientClipperInput], None]
class GradientClipType(Enum):
VALUE = "value"
NORM = "norm"
def _create_gradient_clipper(cfg: CfgNode) -> _GradientClipper:
"""
Creates gradient clipping closure to clip by value or by norm,
according to the provided config.
"""
cfg = copy.deepcopy(cfg)
def clip_grad_norm(p: _GradientClipperInput):
torch.nn.utils.clip_grad_norm_(p, cfg.CLIP_VALUE, cfg.NORM_TYPE)
def clip_grad_value(p: _GradientClipperInput):
torch.nn.utils.clip_grad_value_(p, cfg.CLIP_VALUE)
_GRADIENT_CLIP_TYPE_TO_CLIPPER = {
GradientClipType.VALUE: clip_grad_value,
GradientClipType.NORM: clip_grad_norm,
}
return _GRADIENT_CLIP_TYPE_TO_CLIPPER[GradientClipType(cfg.CLIP_TYPE)]
def _generate_optimizer_class_with_gradient_clipping(
optimizer: Type[torch.optim.Optimizer],
*,
per_param_clipper: Optional[_GradientClipper] = None,
global_clipper: Optional[_GradientClipper] = None,
) -> Type[torch.optim.Optimizer]:
"""
Dynamically creates a new type that inherits the type of a given instance
and overrides the `step` method to add gradient clipping
"""
assert (
per_param_clipper is None or global_clipper is None
), "Not allowed to use both per-parameter clipping and global clipping"
def optimizer_wgc_step(self, closure=None):
if per_param_clipper is not None:
for group in self.param_groups:
for p in group["params"]:
per_param_clipper(p)
else:
# global clipper for future use with detr
# (https://github.com/facebookresearch/detr/pull/287)
all_params = itertools.chain(*[g["params"] for g in self.param_groups])
global_clipper(all_params)
super(type(self), self).step(closure)
OptimizerWithGradientClip = type(
optimizer.__name__ + "WithGradientClip",
(optimizer,),
{"step": optimizer_wgc_step},
)
return OptimizerWithGradientClip
def maybe_add_gradient_clipping(
cfg: CfgNode, optimizer: Type[torch.optim.Optimizer]
) -> Type[torch.optim.Optimizer]:
"""
If gradient clipping is enabled through config options, wraps the existing
optimizer type to become a new dynamically created class OptimizerWithGradientClip
that inherits the given optimizer and overrides the `step` method to
include gradient clipping.
Args:
cfg: CfgNode, configuration options
optimizer: type. A subclass of torch.optim.Optimizer
Return:
type: either the input `optimizer` (if gradient clipping is disabled), or
a subclass of it with gradient clipping included in the `step` method.
"""
if not cfg.SOLVER.CLIP_GRADIENTS.ENABLED:
return optimizer
if isinstance(optimizer, torch.optim.Optimizer):
optimizer_type = type(optimizer)
else:
assert issubclass(optimizer, torch.optim.Optimizer), optimizer
optimizer_type = optimizer
grad_clipper = _create_gradient_clipper(cfg.SOLVER.CLIP_GRADIENTS)
OptimizerWithGradientClip = _generate_optimizer_class_with_gradient_clipping(
optimizer_type, per_param_clipper=grad_clipper
)
if isinstance(optimizer, torch.optim.Optimizer):
optimizer.__class__ = OptimizerWithGradientClip # a bit hacky, not recommended
return optimizer
else:
return OptimizerWithGradientClip
def build_optimizer(cfg: CfgNode, model: torch.nn.Module) -> torch.optim.Optimizer:
"""
Build an optimizer from config.
"""
params = get_default_optimizer_params(
model,
base_lr=cfg.SOLVER.BASE_LR,
weight_decay_norm=cfg.SOLVER.WEIGHT_DECAY_NORM,
bias_lr_factor=cfg.SOLVER.BIAS_LR_FACTOR,
weight_decay_bias=cfg.SOLVER.WEIGHT_DECAY_BIAS,
)
sgd_args = {
"params": params,
"lr": cfg.SOLVER.BASE_LR,
"momentum": cfg.SOLVER.MOMENTUM,
"nesterov": cfg.SOLVER.NESTEROV,
"weight_decay": cfg.SOLVER.WEIGHT_DECAY,
}
if TORCH_VERSION >= (1, 12):
sgd_args["foreach"] = True
return maybe_add_gradient_clipping(cfg, torch.optim.SGD(**sgd_args))
def get_default_optimizer_params(
model: torch.nn.Module,
base_lr: Optional[float] = None,
weight_decay: Optional[float] = None,
weight_decay_norm: Optional[float] = None,
bias_lr_factor: Optional[float] = 1.0,
weight_decay_bias: Optional[float] = None,
lr_factor_func: Optional[Callable] = None,
overrides: Optional[Dict[str, Dict[str, float]]] = None,
) -> List[Dict[str, Any]]:
"""
Get default param list for optimizer, with support for a few types of
overrides. If no overrides needed, this is equivalent to `model.parameters()`.
Args:
base_lr: lr for every group by default. Can be omitted to use the one in optimizer.
weight_decay: weight decay for every group by default. Can be omitted to use the one
in optimizer.
weight_decay_norm: override weight decay for params in normalization layers
bias_lr_factor: multiplier of lr for bias parameters.
weight_decay_bias: override weight decay for bias parameters.
lr_factor_func: function to calculate lr decay rate by mapping the parameter names to
corresponding lr decay rate. Note that setting this option requires
also setting ``base_lr``.
overrides: if not `None`, provides values for optimizer hyperparameters
(LR, weight decay) for module parameters with a given name; e.g.
``{"embedding": {"lr": 0.01, "weight_decay": 0.1}}`` will set the LR and
weight decay values for all module parameters named `embedding`.
For common detection models, ``weight_decay_norm`` is the only option
needed to be set. ``bias_lr_factor,weight_decay_bias`` are legacy settings
from Detectron1 that are not found useful.
Example:
::
torch.optim.SGD(get_default_optimizer_params(model, weight_decay_norm=0),
lr=0.01, weight_decay=1e-4, momentum=0.9)
"""
if overrides is None:
overrides = {}
defaults = {}
if base_lr is not None:
defaults["lr"] = base_lr
if weight_decay is not None:
defaults["weight_decay"] = weight_decay
bias_overrides = {}
if bias_lr_factor is not None and bias_lr_factor != 1.0:
# NOTE: unlike Detectron v1, we now by default make bias hyperparameters
# exactly the same as regular weights.
if base_lr is None:
raise ValueError("bias_lr_factor requires base_lr")
bias_overrides["lr"] = base_lr * bias_lr_factor
if weight_decay_bias is not None:
bias_overrides["weight_decay"] = weight_decay_bias
if len(bias_overrides):
if "bias" in overrides:
raise ValueError("Conflicting overrides for 'bias'")
overrides["bias"] = bias_overrides
if lr_factor_func is not None:
if base_lr is None:
raise ValueError("lr_factor_func requires base_lr")
norm_module_types = (
torch.nn.BatchNorm1d,
torch.nn.BatchNorm2d,
torch.nn.BatchNorm3d,
torch.nn.SyncBatchNorm,
# NaiveSyncBatchNorm inherits from BatchNorm2d
torch.nn.GroupNorm,
torch.nn.InstanceNorm1d,
torch.nn.InstanceNorm2d,
torch.nn.InstanceNorm3d,
torch.nn.LayerNorm,
torch.nn.LocalResponseNorm,
)
params: List[Dict[str, Any]] = []
memo: Set[torch.nn.parameter.Parameter] = set()
for module_name, module in model.named_modules():
for module_param_name, value in module.named_parameters(recurse=False):
if not value.requires_grad:
continue
# Avoid duplicating parameters
if value in memo:
continue
memo.add(value)
hyperparams = copy.copy(defaults)
if isinstance(module, norm_module_types) and weight_decay_norm is not None:
hyperparams["weight_decay"] = weight_decay_norm
if lr_factor_func is not None:
hyperparams["lr"] *= lr_factor_func(f"{module_name}.{module_param_name}")
hyperparams.update(overrides.get(module_param_name, {}))
params.append({"params": [value], **hyperparams})
return reduce_param_groups(params)
def _expand_param_groups(params: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
# Transform parameter groups into per-parameter structure.
# Later items in `params` can overwrite parameters set in previous items.
ret = defaultdict(dict)
for item in params:
assert "params" in item
cur_params = {x: y for x, y in item.items() if x != "params" and x != "param_names"}
if "param_names" in item:
for param_name, param in zip(item["param_names"], item["params"]):
ret[param].update({"param_names": [param_name], "params": [param], **cur_params})
else:
for param in item["params"]:
ret[param].update({"params": [param], **cur_params})
return list(ret.values())
def reduce_param_groups(params: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
# Reorganize the parameter groups and merge duplicated groups.
# The number of parameter groups needs to be as small as possible in order
# to efficiently use the PyTorch multi-tensor optimizer. Therefore instead
# of using a parameter_group per single parameter, we reorganize the
# parameter groups and merge duplicated groups. This approach speeds
# up multi-tensor optimizer significantly.
params = _expand_param_groups(params)
groups = defaultdict(list) # re-group all parameter groups by their hyperparams
for item in params:
cur_params = tuple((x, y) for x, y in item.items() if x != "params" and x != "param_names")
groups[cur_params].append({"params": item["params"]})
if "param_names" in item:
groups[cur_params][-1]["param_names"] = item["param_names"]
ret = []
for param_keys, param_values in groups.items():
cur = {kv[0]: kv[1] for kv in param_keys}
cur["params"] = list(
itertools.chain.from_iterable([params["params"] for params in param_values])
)
if len(param_values) > 0 and "param_names" in param_values[0]:
cur["param_names"] = list(
itertools.chain.from_iterable([params["param_names"] for params in param_values])
)
ret.append(cur)
return ret
def build_lr_scheduler(cfg: CfgNode, optimizer: torch.optim.Optimizer) -> LRScheduler:
"""
Build a LR scheduler from config.
"""
name = cfg.SOLVER.LR_SCHEDULER_NAME
if name == "WarmupMultiStepLR":
steps = [x for x in cfg.SOLVER.STEPS if x <= cfg.SOLVER.MAX_ITER]
if len(steps) != len(cfg.SOLVER.STEPS):
logger = logging.getLogger(__name__)
logger.warning(
"SOLVER.STEPS contains values larger than SOLVER.MAX_ITER. "
"These values will be ignored."
)
sched = MultiStepParamScheduler(
values=[cfg.SOLVER.GAMMA**k for k in range(len(steps) + 1)],
milestones=steps,
num_updates=cfg.SOLVER.MAX_ITER,
)
elif name == "WarmupCosineLR":
end_value = cfg.SOLVER.BASE_LR_END / cfg.SOLVER.BASE_LR
assert end_value >= 0.0 and end_value <= 1.0, end_value
sched = CosineParamScheduler(1, end_value)
elif name == "WarmupStepWithFixedGammaLR":
sched = StepWithFixedGammaParamScheduler(
base_value=1.0,
gamma=cfg.SOLVER.GAMMA,
num_decays=cfg.SOLVER.NUM_DECAYS,
num_updates=cfg.SOLVER.MAX_ITER,
)
else:
raise ValueError("Unknown LR scheduler: {}".format(name))
sched = WarmupParamScheduler(
sched,
cfg.SOLVER.WARMUP_FACTOR,
min(cfg.SOLVER.WARMUP_ITERS / cfg.SOLVER.MAX_ITER, 1.0),
cfg.SOLVER.WARMUP_METHOD,
cfg.SOLVER.RESCALE_INTERVAL,
)
return LRMultiplier(optimizer, multiplier=sched, max_iter=cfg.SOLVER.MAX_ITER)
# Copyright (c) Facebook, Inc. and its affiliates.
import logging
import math
from bisect import bisect_right
from typing import List
import torch
from fvcore.common.param_scheduler import (
CompositeParamScheduler,
ConstantParamScheduler,
LinearParamScheduler,
ParamScheduler,
)
try:
from torch.optim.lr_scheduler import LRScheduler
except ImportError:
from torch.optim.lr_scheduler import _LRScheduler as LRScheduler
logger = logging.getLogger(__name__)
class WarmupParamScheduler(CompositeParamScheduler):
"""
Add an initial warmup stage to another scheduler.
"""
def __init__(
self,
scheduler: ParamScheduler,
warmup_factor: float,
warmup_length: float,
warmup_method: str = "linear",
rescale_interval: bool = False,
):
"""
Args:
scheduler: warmup will be added at the beginning of this scheduler
warmup_factor: the factor w.r.t the initial value of ``scheduler``, e.g. 0.001
warmup_length: the relative length (in [0, 1]) of warmup steps w.r.t the entire
training, e.g. 0.01
warmup_method: one of "linear" or "constant"
rescale_interval: whether we will rescale the interval of the scheduler after
warmup
"""
# the value to reach when warmup ends
end_value = scheduler(0.0) if rescale_interval else scheduler(warmup_length)
start_value = warmup_factor * scheduler(0.0)
if warmup_method == "constant":
warmup = ConstantParamScheduler(start_value)
elif warmup_method == "linear":
warmup = LinearParamScheduler(start_value, end_value)
else:
raise ValueError("Unknown warmup method: {}".format(warmup_method))
super().__init__(
[warmup, scheduler],
interval_scaling=["rescaled", "rescaled" if rescale_interval else "fixed"],
lengths=[warmup_length, 1 - warmup_length],
)
class LRMultiplier(LRScheduler):
"""
A LRScheduler which uses fvcore :class:`ParamScheduler` to multiply the
learning rate of each param in the optimizer.
Every step, the learning rate of each parameter becomes its initial value
multiplied by the output of the given :class:`ParamScheduler`.
The absolute learning rate value of each parameter can be different.
This scheduler can be used as long as the relative scale among them do
not change during training.
Examples:
::
LRMultiplier(
opt,
WarmupParamScheduler(
MultiStepParamScheduler(
[1, 0.1, 0.01],
milestones=[60000, 80000],
num_updates=90000,
), 0.001, 100 / 90000
),
max_iter=90000
)
"""
# NOTES: in the most general case, every LR can use its own scheduler.
# Supporting this requires interaction with the optimizer when its parameter
# group is initialized. For example, classyvision implements its own optimizer
# that allows different schedulers for every parameter group.
# To avoid this complexity, we use this class to support the most common cases
# where the relative scale among all LRs stay unchanged during training. In this
# case we only need a total of one scheduler that defines the relative LR multiplier.
def __init__(
self,
optimizer: torch.optim.Optimizer,
multiplier: ParamScheduler,
max_iter: int,
last_iter: int = -1,
):
"""
Args:
optimizer, last_iter: See ``torch.optim.lr_scheduler.LRScheduler``.
``last_iter`` is the same as ``last_epoch``.
multiplier: a fvcore ParamScheduler that defines the multiplier on
every LR of the optimizer
max_iter: the total number of training iterations
"""
if not isinstance(multiplier, ParamScheduler):
raise ValueError(
"_LRMultiplier(multiplier=) must be an instance of fvcore "
f"ParamScheduler. Got {multiplier} instead."
)
self._multiplier = multiplier
self._max_iter = max_iter
super().__init__(optimizer, last_epoch=last_iter)
def state_dict(self):
# fvcore schedulers are stateless. Only keep pytorch scheduler states
return {"base_lrs": self.base_lrs, "last_epoch": self.last_epoch}
def get_lr(self) -> List[float]:
multiplier = self._multiplier(self.last_epoch / self._max_iter)
return [base_lr * multiplier for base_lr in self.base_lrs]
"""
Content below is no longer needed!
"""
# NOTE: PyTorch's LR scheduler interface uses names that assume the LR changes
# only on epoch boundaries. We typically use iteration based schedules instead.
# As a result, "epoch" (e.g., as in self.last_epoch) should be understood to mean
# "iteration" instead.
# FIXME: ideally this would be achieved with a CombinedLRScheduler, separating
# MultiStepLR with WarmupLR but the current LRScheduler design doesn't allow it.
class WarmupMultiStepLR(LRScheduler):
def __init__(
self,
optimizer: torch.optim.Optimizer,
milestones: List[int],
gamma: float = 0.1,
warmup_factor: float = 0.001,
warmup_iters: int = 1000,
warmup_method: str = "linear",
last_epoch: int = -1,
):
logger.warning(
"WarmupMultiStepLR is deprecated! Use LRMultipilier with fvcore ParamScheduler instead!"
)
if not list(milestones) == sorted(milestones):
raise ValueError(
"Milestones should be a list of" " increasing integers. Got {}", milestones
)
self.milestones = milestones
self.gamma = gamma
self.warmup_factor = warmup_factor
self.warmup_iters = warmup_iters
self.warmup_method = warmup_method
super().__init__(optimizer, last_epoch)
def get_lr(self) -> List[float]:
warmup_factor = _get_warmup_factor_at_iter(
self.warmup_method, self.last_epoch, self.warmup_iters, self.warmup_factor
)
return [
base_lr * warmup_factor * self.gamma ** bisect_right(self.milestones, self.last_epoch)
for base_lr in self.base_lrs
]
def _compute_values(self) -> List[float]:
# The new interface
return self.get_lr()
class WarmupCosineLR(LRScheduler):
def __init__(
self,
optimizer: torch.optim.Optimizer,
max_iters: int,
warmup_factor: float = 0.001,
warmup_iters: int = 1000,
warmup_method: str = "linear",
last_epoch: int = -1,
):
logger.warning(
"WarmupCosineLR is deprecated! Use LRMultipilier with fvcore ParamScheduler instead!"
)
self.max_iters = max_iters
self.warmup_factor = warmup_factor
self.warmup_iters = warmup_iters
self.warmup_method = warmup_method
super().__init__(optimizer, last_epoch)
def get_lr(self) -> List[float]:
warmup_factor = _get_warmup_factor_at_iter(
self.warmup_method, self.last_epoch, self.warmup_iters, self.warmup_factor
)
# Different definitions of half-cosine with warmup are possible. For
# simplicity we multiply the standard half-cosine schedule by the warmup
# factor. An alternative is to start the period of the cosine at warmup_iters
# instead of at 0. In the case that warmup_iters << max_iters the two are
# very close to each other.
return [
base_lr
* warmup_factor
* 0.5
* (1.0 + math.cos(math.pi * self.last_epoch / self.max_iters))
for base_lr in self.base_lrs
]
def _compute_values(self) -> List[float]:
# The new interface
return self.get_lr()
def _get_warmup_factor_at_iter(
method: str, iter: int, warmup_iters: int, warmup_factor: float
) -> float:
"""
Return the learning rate warmup factor at a specific iteration.
See :paper:`ImageNet in 1h` for more details.
Args:
method (str): warmup method; either "constant" or "linear".
iter (int): iteration at which to calculate the warmup factor.
warmup_iters (int): the number of warmup iterations.
warmup_factor (float): the base warmup factor (the meaning changes according
to the method used).
Returns:
float: the effective warmup factor at the given iteration.
"""
if iter >= warmup_iters:
return 1.0
if method == "constant":
return warmup_factor
elif method == "linear":
alpha = iter / warmup_iters
return warmup_factor * (1 - alpha) + alpha
else:
raise ValueError("Unknown warmup method: {}".format(method))
# Copyright (c) Facebook, Inc. and its affiliates.
from .boxes import Boxes, BoxMode, pairwise_iou, pairwise_ioa, pairwise_point_box_distance
from .image_list import ImageList
from .instances import Instances
from .keypoints import Keypoints, heatmaps_to_keypoints
from .masks import BitMasks, PolygonMasks, polygons_to_bitmask, ROIMasks
from .rotated_boxes import RotatedBoxes
from .rotated_boxes import pairwise_iou as pairwise_iou_rotated
__all__ = [k for k in globals().keys() if not k.startswith("_")]
from detectron2.utils.env import fixup_module_metadata
fixup_module_metadata(__name__, globals(), __all__)
del fixup_module_metadata
# Copyright (c) Facebook, Inc. and its affiliates.
import math
import numpy as np
from enum import IntEnum, unique
from typing import List, Tuple, Union
import torch
from torch import device
_RawBoxType = Union[List[float], Tuple[float, ...], torch.Tensor, np.ndarray]
@unique
class BoxMode(IntEnum):
"""
Enum of different ways to represent a box.
"""
XYXY_ABS = 0
"""
(x0, y0, x1, y1) in absolute floating points coordinates.
The coordinates in range [0, width or height].
"""
XYWH_ABS = 1
"""
(x0, y0, w, h) in absolute floating points coordinates.
"""
XYXY_REL = 2
"""
Not yet supported!
(x0, y0, x1, y1) in range [0, 1]. They are relative to the size of the image.
"""
XYWH_REL = 3
"""
Not yet supported!
(x0, y0, w, h) in range [0, 1]. They are relative to the size of the image.
"""
XYWHA_ABS = 4
"""
(xc, yc, w, h, a) in absolute floating points coordinates.
(xc, yc) is the center of the rotated box, and the angle a is in degrees ccw.
"""
@staticmethod
def convert(box: _RawBoxType, from_mode: "BoxMode", to_mode: "BoxMode") -> _RawBoxType:
"""
Args:
box: can be a k-tuple, k-list or an Nxk array/tensor, where k = 4 or 5
from_mode, to_mode (BoxMode)
Returns:
The converted box of the same type.
"""
if from_mode == to_mode:
return box
original_type = type(box)
is_numpy = isinstance(box, np.ndarray)
single_box = isinstance(box, (list, tuple))
if single_box:
assert len(box) == 4 or len(box) == 5, (
"BoxMode.convert takes either a k-tuple/list or an Nxk array/tensor,"
" where k == 4 or 5"
)
arr = torch.tensor(box)[None, :]
else:
# avoid modifying the input box
if is_numpy:
arr = torch.from_numpy(np.asarray(box)).clone()
else:
arr = box.clone()
assert to_mode not in [BoxMode.XYXY_REL, BoxMode.XYWH_REL] and from_mode not in [
BoxMode.XYXY_REL,
BoxMode.XYWH_REL,
], "Relative mode not yet supported!"
if from_mode == BoxMode.XYWHA_ABS and to_mode == BoxMode.XYXY_ABS:
assert (
arr.shape[-1] == 5
), "The last dimension of input shape must be 5 for XYWHA format"
original_dtype = arr.dtype
arr = arr.double()
w = arr[:, 2]
h = arr[:, 3]
a = arr[:, 4]
c = torch.abs(torch.cos(a * math.pi / 180.0))
s = torch.abs(torch.sin(a * math.pi / 180.0))
# This basically computes the horizontal bounding rectangle of the rotated box
new_w = c * w + s * h
new_h = c * h + s * w
# convert center to top-left corner
arr[:, 0] -= new_w / 2.0
arr[:, 1] -= new_h / 2.0
# bottom-right corner
arr[:, 2] = arr[:, 0] + new_w
arr[:, 3] = arr[:, 1] + new_h
arr = arr[:, :4].to(dtype=original_dtype)
elif from_mode == BoxMode.XYWH_ABS and to_mode == BoxMode.XYWHA_ABS:
original_dtype = arr.dtype
arr = arr.double()
arr[:, 0] += arr[:, 2] / 2.0
arr[:, 1] += arr[:, 3] / 2.0
angles = torch.zeros((arr.shape[0], 1), dtype=arr.dtype)
arr = torch.cat((arr, angles), axis=1).to(dtype=original_dtype)
else:
if to_mode == BoxMode.XYXY_ABS and from_mode == BoxMode.XYWH_ABS:
arr[:, 2] += arr[:, 0]
arr[:, 3] += arr[:, 1]
elif from_mode == BoxMode.XYXY_ABS and to_mode == BoxMode.XYWH_ABS:
arr[:, 2] -= arr[:, 0]
arr[:, 3] -= arr[:, 1]
else:
raise NotImplementedError(
"Conversion from BoxMode {} to {} is not supported yet".format(
from_mode, to_mode
)
)
if single_box:
return original_type(arr.flatten().tolist())
if is_numpy:
return arr.numpy()
else:
return arr
class Boxes:
"""
This structure stores a list of boxes as a Nx4 torch.Tensor.
It supports some common methods about boxes
(`area`, `clip`, `nonempty`, etc),
and also behaves like a Tensor
(support indexing, `to(device)`, `.device`, and iteration over all boxes)
Attributes:
tensor (torch.Tensor): float matrix of Nx4. Each row is (x1, y1, x2, y2).
"""
def __init__(self, tensor: torch.Tensor):
"""
Args:
tensor (Tensor[float]): a Nx4 matrix. Each row is (x1, y1, x2, y2).
"""
if not isinstance(tensor, torch.Tensor):
tensor = torch.as_tensor(tensor, dtype=torch.float32, device=torch.device("cpu"))
else:
tensor = tensor.to(torch.float32)
if tensor.numel() == 0:
# Use reshape, so we don't end up creating a new tensor that does not depend on
# the inputs (and consequently confuses jit)
tensor = tensor.reshape((-1, 4)).to(dtype=torch.float32)
assert tensor.dim() == 2 and tensor.size(-1) == 4, tensor.size()
self.tensor = tensor
def clone(self) -> "Boxes":
"""
Clone the Boxes.
Returns:
Boxes
"""
return Boxes(self.tensor.clone())
def to(self, device: torch.device):
# Boxes are assumed float32 and does not support to(dtype)
return Boxes(self.tensor.to(device=device))
def area(self) -> torch.Tensor:
"""
Computes the area of all the boxes.
Returns:
torch.Tensor: a vector with areas of each box.
"""
box = self.tensor
area = (box[:, 2] - box[:, 0]) * (box[:, 3] - box[:, 1])
return area
def clip(self, box_size: Tuple[int, int]) -> None:
"""
Clip (in place) the boxes by limiting x coordinates to the range [0, width]
and y coordinates to the range [0, height].
Args:
box_size (height, width): The clipping box's size.
"""
assert torch.isfinite(self.tensor).all(), "Box tensor contains infinite or NaN!"
h, w = box_size
x1 = self.tensor[:, 0].clamp(min=0, max=w)
y1 = self.tensor[:, 1].clamp(min=0, max=h)
x2 = self.tensor[:, 2].clamp(min=0, max=w)
y2 = self.tensor[:, 3].clamp(min=0, max=h)
self.tensor = torch.stack((x1, y1, x2, y2), dim=-1)
def nonempty(self, threshold: float = 0.0) -> torch.Tensor:
"""
Find boxes that are non-empty.
A box is considered empty, if either of its side is no larger than threshold.
Returns:
Tensor:
a binary vector which represents whether each box is empty
(False) or non-empty (True).
"""
box = self.tensor
widths = box[:, 2] - box[:, 0]
heights = box[:, 3] - box[:, 1]
keep = (widths > threshold) & (heights > threshold)
return keep
def __getitem__(self, item) -> "Boxes":
"""
Args:
item: int, slice, or a BoolTensor
Returns:
Boxes: Create a new :class:`Boxes` by indexing.
The following usage are allowed:
1. `new_boxes = boxes[3]`: return a `Boxes` which contains only one box.
2. `new_boxes = boxes[2:10]`: return a slice of boxes.
3. `new_boxes = boxes[vector]`, where vector is a torch.BoolTensor
with `length = len(boxes)`. Nonzero elements in the vector will be selected.
Note that the returned Boxes might share storage with this Boxes,
subject to Pytorch's indexing semantics.
"""
if isinstance(item, int):
return Boxes(self.tensor[item].view(1, -1))
b = self.tensor[item]
assert b.dim() == 2, "Indexing on Boxes with {} failed to return a matrix!".format(item)
return Boxes(b)
def __len__(self) -> int:
return self.tensor.shape[0]
def __repr__(self) -> str:
return "Boxes(" + str(self.tensor) + ")"
def inside_box(self, box_size: Tuple[int, int], boundary_threshold: int = 0) -> torch.Tensor:
"""
Args:
box_size (height, width): Size of the reference box.
boundary_threshold (int): Boxes that extend beyond the reference box
boundary by more than boundary_threshold are considered "outside".
Returns:
a binary vector, indicating whether each box is inside the reference box.
"""
height, width = box_size
inds_inside = (
(self.tensor[..., 0] >= -boundary_threshold)
& (self.tensor[..., 1] >= -boundary_threshold)
& (self.tensor[..., 2] < width + boundary_threshold)
& (self.tensor[..., 3] < height + boundary_threshold)
)
return inds_inside
def get_centers(self) -> torch.Tensor:
"""
Returns:
The box centers in a Nx2 array of (x, y).
"""
return (self.tensor[:, :2] + self.tensor[:, 2:]) / 2
def scale(self, scale_x: float, scale_y: float) -> None:
"""
Scale the box with horizontal and vertical scaling factors
"""
self.tensor[:, 0::2] *= scale_x
self.tensor[:, 1::2] *= scale_y
@classmethod
def cat(cls, boxes_list: List["Boxes"]) -> "Boxes":
"""
Concatenates a list of Boxes into a single Boxes
Arguments:
boxes_list (list[Boxes])
Returns:
Boxes: the concatenated Boxes
"""
assert isinstance(boxes_list, (list, tuple))
if len(boxes_list) == 0:
return cls(torch.empty(0))
assert all([isinstance(box, Boxes) for box in boxes_list])
# use torch.cat (v.s. layers.cat) so the returned boxes never share storage with input
cat_boxes = cls(torch.cat([b.tensor for b in boxes_list], dim=0))
return cat_boxes
@property
def device(self) -> device:
return self.tensor.device
# type "Iterator[torch.Tensor]", yield, and iter() not supported by torchscript
# https://github.com/pytorch/pytorch/issues/18627
@torch.jit.unused
def __iter__(self):
"""
Yield a box as a Tensor of shape (4,) at a time.
"""
yield from self.tensor
def pairwise_intersection(boxes1: Boxes, boxes2: Boxes) -> torch.Tensor:
"""
Given two lists of boxes of size N and M,
compute the intersection area between __all__ N x M pairs of boxes.
The box order must be (xmin, ymin, xmax, ymax)
Args:
boxes1,boxes2 (Boxes): two `Boxes`. Contains N & M boxes, respectively.
Returns:
Tensor: intersection, sized [N,M].
"""
boxes1, boxes2 = boxes1.tensor, boxes2.tensor
width_height = torch.min(boxes1[:, None, 2:], boxes2[:, 2:]) - torch.max(
boxes1[:, None, :2], boxes2[:, :2]
) # [N,M,2]
width_height.clamp_(min=0) # [N,M,2]
intersection = width_height.prod(dim=2) # [N,M]
return intersection
# implementation from https://github.com/kuangliu/torchcv/blob/master/torchcv/utils/box.py
# with slight modifications
def pairwise_iou(boxes1: Boxes, boxes2: Boxes) -> torch.Tensor:
"""
Given two lists of boxes of size N and M, compute the IoU
(intersection over union) between **all** N x M pairs of boxes.
The box order must be (xmin, ymin, xmax, ymax).
Args:
boxes1,boxes2 (Boxes): two `Boxes`. Contains N & M boxes, respectively.
Returns:
Tensor: IoU, sized [N,M].
"""
area1 = boxes1.area() # [N]
area2 = boxes2.area() # [M]
inter = pairwise_intersection(boxes1, boxes2)
# handle empty boxes
iou = torch.where(
inter > 0,
inter / (area1[:, None] + area2 - inter),
torch.zeros(1, dtype=inter.dtype, device=inter.device),
)
return iou
def pairwise_ioa(boxes1: Boxes, boxes2: Boxes) -> torch.Tensor:
"""
Similar to :func:`pariwise_iou` but compute the IoA (intersection over boxes2 area).
Args:
boxes1,boxes2 (Boxes): two `Boxes`. Contains N & M boxes, respectively.
Returns:
Tensor: IoA, sized [N,M].
"""
area2 = boxes2.area() # [M]
inter = pairwise_intersection(boxes1, boxes2)
# handle empty boxes
ioa = torch.where(
inter > 0, inter / area2, torch.zeros(1, dtype=inter.dtype, device=inter.device)
)
return ioa
def pairwise_point_box_distance(points: torch.Tensor, boxes: Boxes):
"""
Pairwise distance between N points and M boxes. The distance between a
point and a box is represented by the distance from the point to 4 edges
of the box. Distances are all positive when the point is inside the box.
Args:
points: Nx2 coordinates. Each row is (x, y)
boxes: M boxes
Returns:
Tensor: distances of size (N, M, 4). The 4 values are distances from
the point to the left, top, right, bottom of the box.
"""
x, y = points.unsqueeze(dim=2).unbind(dim=1) # (N, 1)
x0, y0, x1, y1 = boxes.tensor.unsqueeze(dim=0).unbind(dim=2) # (1, M)
return torch.stack([x - x0, y - y0, x1 - x, y1 - y], dim=2)
def matched_pairwise_iou(boxes1: Boxes, boxes2: Boxes) -> torch.Tensor:
"""
Compute pairwise intersection over union (IOU) of two sets of matched
boxes that have the same number of boxes.
Similar to :func:`pairwise_iou`, but computes only diagonal elements of the matrix.
Args:
boxes1 (Boxes): bounding boxes, sized [N,4].
boxes2 (Boxes): same length as boxes1
Returns:
Tensor: iou, sized [N].
"""
assert len(boxes1) == len(
boxes2
), "boxlists should have the same" "number of entries, got {}, {}".format(
len(boxes1), len(boxes2)
)
area1 = boxes1.area() # [N]
area2 = boxes2.area() # [N]
box1, box2 = boxes1.tensor, boxes2.tensor
lt = torch.max(box1[:, :2], box2[:, :2]) # [N,2]
rb = torch.min(box1[:, 2:], box2[:, 2:]) # [N,2]
wh = (rb - lt).clamp(min=0) # [N,2]
inter = wh[:, 0] * wh[:, 1] # [N]
iou = inter / (area1 + area2 - inter) # [N]
return iou
# Copyright (c) Facebook, Inc. and its affiliates.
from __future__ import division
from typing import Any, Dict, List, Optional, Tuple
import torch
from torch import device
from torch.nn import functional as F
from detectron2.layers.wrappers import move_device_like, shapes_to_tensor
from detectron2.utils.torch_version_utils import min_torch_version
class ImageList:
"""
Structure that holds a list of images (of possibly
varying sizes) as a single tensor.
This works by padding the images to the same size.
The original sizes of each image is stored in `image_sizes`.
Attributes:
image_sizes (list[tuple[int, int]]): each tuple is (h, w).
During tracing, it becomes list[Tensor] instead.
"""
def __init__(self, tensor: torch.Tensor, image_sizes: List[Tuple[int, int]]):
"""
Arguments:
tensor (Tensor): of shape (N, H, W) or (N, C_1, ..., C_K, H, W) where K >= 1
image_sizes (list[tuple[int, int]]): Each tuple is (h, w). It can
be smaller than (H, W) due to padding.
"""
self.tensor = tensor
self.image_sizes = image_sizes
def __len__(self) -> int:
return len(self.image_sizes)
def __getitem__(self, idx) -> torch.Tensor:
"""
Access the individual image in its original size.
Args:
idx: int or slice
Returns:
Tensor: an image of shape (H, W) or (C_1, ..., C_K, H, W) where K >= 1
"""
size = self.image_sizes[idx]
return self.tensor[idx, ..., : size[0], : size[1]]
@torch.jit.unused
def to(self, *args: Any, **kwargs: Any) -> "ImageList":
cast_tensor = self.tensor.to(*args, **kwargs)
return ImageList(cast_tensor, self.image_sizes)
@property
def device(self) -> device:
return self.tensor.device
@staticmethod
def from_tensors(
tensors: List[torch.Tensor],
size_divisibility: int = 0,
pad_value: float = 0.0,
padding_constraints: Optional[Dict[str, int]] = None,
) -> "ImageList":
"""
Args:
tensors: a tuple or list of `torch.Tensor`, each of shape (Hi, Wi) or
(C_1, ..., C_K, Hi, Wi) where K >= 1. The Tensors will be padded
to the same shape with `pad_value`.
size_divisibility (int): If `size_divisibility > 0`, add padding to ensure
the common height and width is divisible by `size_divisibility`.
This depends on the model and many models need a divisibility of 32.
pad_value (float): value to pad.
padding_constraints (optional[Dict]): If given, it would follow the format as
{"size_divisibility": int, "square_size": int}, where `size_divisibility` will
overwrite the above one if presented and `square_size` indicates the
square padding size if `square_size` > 0.
Returns:
an `ImageList`.
"""
assert len(tensors) > 0
assert isinstance(tensors, (tuple, list))
for t in tensors:
assert isinstance(t, torch.Tensor), type(t)
assert t.shape[:-2] == tensors[0].shape[:-2], t.shape
image_sizes = [(im.shape[-2], im.shape[-1]) for im in tensors]
image_sizes_tensor = [shapes_to_tensor(x) for x in image_sizes]
max_size = torch.stack(image_sizes_tensor).max(0).values
if padding_constraints is not None:
square_size = padding_constraints.get("square_size", 0)
if square_size > 0:
# pad to square.
max_size[0] = max_size[1] = square_size
if "size_divisibility" in padding_constraints:
size_divisibility = padding_constraints["size_divisibility"]
if size_divisibility > 1:
stride = size_divisibility
# the last two dims are H,W, both subject to divisibility requirement
max_size = (max_size + (stride - 1)).div(stride, rounding_mode="floor") * stride
# handle weirdness of scripting and tracing ...
if torch.jit.is_scripting():
max_size: List[int] = max_size.to(dtype=torch.long).tolist()
else:
if torch.jit.is_tracing():
image_sizes = image_sizes_tensor
if len(tensors) == 1:
# This seems slightly (2%) faster.
# TODO: check whether it's faster for multiple images as well
image_size = image_sizes[0]
u0 = max_size[-1] - image_size[1]
u1 = max_size[-2] - image_size[0]
padding_size = [0, u0, 0, u1]
if not torch.jit.is_scripting():
if min_torch_version("2.6.0") and torch.compiler.is_compiling():
torch._check(u0.item() >= 0)
torch._check(u1.item() >= 0)
batched_imgs = F.pad(tensors[0], padding_size, value=pad_value).unsqueeze_(0)
else:
# max_size can be a tensor in tracing mode, therefore convert to list
batch_shape = [len(tensors)] + list(tensors[0].shape[:-2]) + list(max_size)
device = (
None if torch.jit.is_scripting() else ("cpu" if torch.jit.is_tracing() else None)
)
batched_imgs = tensors[0].new_full(batch_shape, pad_value, device=device)
batched_imgs = move_device_like(batched_imgs, tensors[0])
for i, img in enumerate(tensors):
# Use `batched_imgs` directly instead of `img, pad_img = zip(tensors, batched_imgs)`
# Tracing mode cannot capture `copy_()` of temporary locals
batched_imgs[i, ..., : img.shape[-2], : img.shape[-1]].copy_(img)
return ImageList(batched_imgs.contiguous(), image_sizes)
# Copyright (c) Facebook, Inc. and its affiliates.
import itertools
import warnings
from typing import Any, Dict, List, Tuple, Union
import torch
class Instances:
"""
This class represents a list of instances in an image.
It stores the attributes of instances (e.g., boxes, masks, labels, scores) as "fields".
All fields must have the same ``__len__`` which is the number of instances.
All other (non-field) attributes of this class are considered private:
they must start with '_' and are not modifiable by a user.
Some basic usage:
1. Set/get/check a field:
.. code-block:: python
instances.gt_boxes = Boxes(...)
print(instances.pred_masks) # a tensor of shape (N, H, W)
print('gt_masks' in instances)
2. ``len(instances)`` returns the number of instances
3. Indexing: ``instances[indices]`` will apply the indexing on all the fields
and returns a new :class:`Instances`.
Typically, ``indices`` is a integer vector of indices,
or a binary mask of length ``num_instances``
.. code-block:: python
category_3_detections = instances[instances.pred_classes == 3]
confident_detections = instances[instances.scores > 0.9]
"""
def __init__(self, image_size: Tuple[int, int], **kwargs: Any):
"""
Args:
image_size (height, width): the spatial size of the image.
kwargs: fields to add to this `Instances`.
"""
self._image_size = image_size
self._fields: Dict[str, Any] = {}
for k, v in kwargs.items():
self.set(k, v)
@property
def image_size(self) -> Tuple[int, int]:
"""
Returns:
tuple: height, width
"""
return self._image_size
def __setattr__(self, name: str, val: Any) -> None:
if name.startswith("_"):
super().__setattr__(name, val)
else:
self.set(name, val)
def __getattr__(self, name: str) -> Any:
if name == "_fields" or name not in self._fields:
raise AttributeError("Cannot find field '{}' in the given Instances!".format(name))
return self._fields[name]
def set(self, name: str, value: Any) -> None:
"""
Set the field named `name` to `value`.
The length of `value` must be the number of instances,
and must agree with other existing fields in this object.
"""
with warnings.catch_warnings(record=True):
data_len = len(value)
if len(self._fields):
assert (
len(self) == data_len
), "Adding a field of length {} to a Instances of length {}".format(data_len, len(self))
self._fields[name] = value
def has(self, name: str) -> bool:
"""
Returns:
bool: whether the field called `name` exists.
"""
return name in self._fields
def remove(self, name: str) -> None:
"""
Remove the field called `name`.
"""
del self._fields[name]
def get(self, name: str) -> Any:
"""
Returns the field called `name`.
"""
return self._fields[name]
def get_fields(self) -> Dict[str, Any]:
"""
Returns:
dict: a dict which maps names (str) to data of the fields
Modifying the returned dict will modify this instance.
"""
return self._fields
# Tensor-like methods
def to(self, *args: Any, **kwargs: Any) -> "Instances":
"""
Returns:
Instances: all fields are called with a `to(device)`, if the field has this method.
"""
ret = Instances(self._image_size)
for k, v in self._fields.items():
if hasattr(v, "to"):
v = v.to(*args, **kwargs)
ret.set(k, v)
return ret
def __getitem__(self, item: Union[int, slice, torch.BoolTensor]) -> "Instances":
"""
Args:
item: an index-like object and will be used to index all the fields.
Returns:
If `item` is a string, return the data in the corresponding field.
Otherwise, returns an `Instances` where all fields are indexed by `item`.
"""
if type(item) is int:
if item >= len(self) or item < -len(self):
raise IndexError("Instances index out of range!")
else:
item = slice(item, None, len(self))
ret = Instances(self._image_size)
for k, v in self._fields.items():
ret.set(k, v[item])
return ret
def __len__(self) -> int:
for v in self._fields.values():
# use __len__ because len() has to be int and is not friendly to tracing
return v.__len__()
raise NotImplementedError("Empty Instances does not support __len__!")
def __iter__(self):
raise NotImplementedError("`Instances` object is not iterable!")
@staticmethod
def cat(instance_lists: List["Instances"]) -> "Instances":
"""
Args:
instance_lists (list[Instances])
Returns:
Instances
"""
assert all(isinstance(i, Instances) for i in instance_lists)
assert len(instance_lists) > 0
if len(instance_lists) == 1:
return instance_lists[0]
image_size = instance_lists[0].image_size
if not isinstance(image_size, torch.Tensor): # could be a tensor in tracing
for i in instance_lists[1:]:
assert i.image_size == image_size
ret = Instances(image_size)
for k in instance_lists[0]._fields.keys():
values = [i.get(k) for i in instance_lists]
v0 = values[0]
if isinstance(v0, torch.Tensor):
values = torch.cat(values, dim=0)
elif isinstance(v0, list):
values = list(itertools.chain(*values))
elif hasattr(type(v0), "cat"):
values = type(v0).cat(values)
else:
raise ValueError("Unsupported type {} for concatenation".format(type(v0)))
ret.set(k, values)
return ret
def __str__(self) -> str:
s = self.__class__.__name__ + "("
s += "num_instances={}, ".format(len(self))
s += "image_height={}, ".format(self._image_size[0])
s += "image_width={}, ".format(self._image_size[1])
s += "fields=[{}])".format(", ".join((f"{k}: {v}" for k, v in self._fields.items())))
return s
__repr__ = __str__
# Copyright (c) Facebook, Inc. and its affiliates.
import numpy as np
from typing import Any, List, Tuple, Union
import torch
from torch.nn import functional as F
class Keypoints:
"""
Stores keypoint **annotation** data. GT Instances have a `gt_keypoints` property
containing the x,y location and visibility flag of each keypoint. This tensor has shape
(N, K, 3) where N is the number of instances and K is the number of keypoints per instance.
The visibility flag follows the COCO format and must be one of three integers:
* v=0: not labeled (in which case x=y=0)
* v=1: labeled but not visible
* v=2: labeled and visible
"""
def __init__(self, keypoints: Union[torch.Tensor, np.ndarray, List[List[float]]]):
"""
Arguments:
keypoints: A Tensor, numpy array, or list of the x, y, and visibility of each keypoint.
The shape should be (N, K, 3) where N is the number of
instances, and K is the number of keypoints per instance.
"""
device = keypoints.device if isinstance(keypoints, torch.Tensor) else torch.device("cpu")
keypoints = torch.as_tensor(keypoints, dtype=torch.float32, device=device)
assert keypoints.dim() == 3 and keypoints.shape[2] == 3, keypoints.shape
self.tensor = keypoints
def __len__(self) -> int:
return self.tensor.size(0)
def to(self, *args: Any, **kwargs: Any) -> "Keypoints":
return type(self)(self.tensor.to(*args, **kwargs))
@property
def device(self) -> torch.device:
return self.tensor.device
def to_heatmap(self, boxes: torch.Tensor, heatmap_size: int) -> torch.Tensor:
"""
Convert keypoint annotations to a heatmap of one-hot labels for training,
as described in :paper:`Mask R-CNN`.
Arguments:
boxes: Nx4 tensor, the boxes to draw the keypoints to
Returns:
heatmaps:
A tensor of shape (N, K), each element is integer spatial label
in the range [0, heatmap_size**2 - 1] for each keypoint in the input.
valid:
A tensor of shape (N, K) containing whether each keypoint is in the roi or not.
"""
return _keypoints_to_heatmap(self.tensor, boxes, heatmap_size)
def __getitem__(self, item: Union[int, slice, torch.BoolTensor]) -> "Keypoints":
"""
Create a new `Keypoints` by indexing on this `Keypoints`.
The following usage are allowed:
1. `new_kpts = kpts[3]`: return a `Keypoints` which contains only one instance.
2. `new_kpts = kpts[2:10]`: return a slice of key points.
3. `new_kpts = kpts[vector]`, where vector is a torch.ByteTensor
with `length = len(kpts)`. Nonzero elements in the vector will be selected.
Note that the returned Keypoints might share storage with this Keypoints,
subject to Pytorch's indexing semantics.
"""
if isinstance(item, int):
return Keypoints([self.tensor[item]])
return Keypoints(self.tensor[item])
def __repr__(self) -> str:
s = self.__class__.__name__ + "("
s += "num_instances={})".format(len(self.tensor))
return s
@staticmethod
def cat(keypoints_list: List["Keypoints"]) -> "Keypoints":
"""
Concatenates a list of Keypoints into a single Keypoints
Arguments:
keypoints_list (list[Keypoints])
Returns:
Keypoints: the concatenated Keypoints
"""
assert isinstance(keypoints_list, (list, tuple))
assert len(keypoints_list) > 0
assert all(isinstance(keypoints, Keypoints) for keypoints in keypoints_list)
cat_kpts = type(keypoints_list[0])(
torch.cat([kpts.tensor for kpts in keypoints_list], dim=0)
)
return cat_kpts
# TODO make this nicer, this is a direct translation from C2 (but removing the inner loop)
def _keypoints_to_heatmap(
keypoints: torch.Tensor, rois: torch.Tensor, heatmap_size: int
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Encode keypoint locations into a target heatmap for use in SoftmaxWithLoss across space.
Maps keypoints from the half-open interval [x1, x2) on continuous image coordinates to the
closed interval [0, heatmap_size - 1] on discrete image coordinates. We use the
continuous-discrete conversion from Heckbert 1990 ("What is the coordinate of a pixel?"):
d = floor(c) and c = d + 0.5, where d is a discrete coordinate and c is a continuous coordinate.
Arguments:
keypoints: tensor of keypoint locations in of shape (N, K, 3).
rois: Nx4 tensor of rois in xyxy format
heatmap_size: integer side length of square heatmap.
Returns:
heatmaps: A tensor of shape (N, K) containing an integer spatial label
in the range [0, heatmap_size**2 - 1] for each keypoint in the input.
valid: A tensor of shape (N, K) containing whether each keypoint is in
the roi or not.
"""
if rois.numel() == 0:
return rois.new().long(), rois.new().long()
offset_x = rois[:, 0]
offset_y = rois[:, 1]
scale_x = heatmap_size / (rois[:, 2] - rois[:, 0])
scale_y = heatmap_size / (rois[:, 3] - rois[:, 1])
offset_x = offset_x[:, None]
offset_y = offset_y[:, None]
scale_x = scale_x[:, None]
scale_y = scale_y[:, None]
x = keypoints[..., 0]
y = keypoints[..., 1]
x_boundary_inds = x == rois[:, 2][:, None]
y_boundary_inds = y == rois[:, 3][:, None]
x = (x - offset_x) * scale_x
x = x.floor().long()
y = (y - offset_y) * scale_y
y = y.floor().long()
x[x_boundary_inds] = heatmap_size - 1
y[y_boundary_inds] = heatmap_size - 1
valid_loc = (x >= 0) & (y >= 0) & (x < heatmap_size) & (y < heatmap_size)
vis = keypoints[..., 2] > 0
valid = (valid_loc & vis).long()
lin_ind = y * heatmap_size + x
heatmaps = lin_ind * valid
return heatmaps, valid
@torch.jit.script_if_tracing
def heatmaps_to_keypoints(maps: torch.Tensor, rois: torch.Tensor) -> torch.Tensor:
"""
Extract predicted keypoint locations from heatmaps.
Args:
maps (Tensor): (#ROIs, #keypoints, POOL_H, POOL_W). The predicted heatmap of logits for
each ROI and each keypoint.
rois (Tensor): (#ROIs, 4). The box of each ROI.
Returns:
Tensor of shape (#ROIs, #keypoints, 4) with the last dimension corresponding to
(x, y, logit, score) for each keypoint.
When converting discrete pixel indices in an NxN image to a continuous keypoint coordinate,
we maintain consistency with :meth:`Keypoints.to_heatmap` by using the conversion from
Heckbert 1990: c = d + 0.5, where d is a discrete coordinate and c is a continuous coordinate.
"""
offset_x = rois[:, 0]
offset_y = rois[:, 1]
widths = (rois[:, 2] - rois[:, 0]).clamp(min=1)
heights = (rois[:, 3] - rois[:, 1]).clamp(min=1)
widths_ceil = widths.ceil()
heights_ceil = heights.ceil()
num_rois, num_keypoints = maps.shape[:2]
xy_preds = maps.new_zeros(rois.shape[0], num_keypoints, 4)
width_corrections = widths / widths_ceil
height_corrections = heights / heights_ceil
keypoints_idx = torch.arange(num_keypoints, device=maps.device)
for i in range(num_rois):
outsize = (int(heights_ceil[i]), int(widths_ceil[i]))
roi_map = F.interpolate(maps[[i]], size=outsize, mode="bicubic", align_corners=False)
# Although semantically equivalent, `reshape` is used instead of `squeeze` due
# to limitation during ONNX export of `squeeze` in scripting mode
roi_map = roi_map.reshape(roi_map.shape[1:]) # keypoints x H x W
# softmax over the spatial region
max_score, _ = roi_map.view(num_keypoints, -1).max(1)
max_score = max_score.view(num_keypoints, 1, 1)
tmp_full_resolution = (roi_map - max_score).exp_()
tmp_pool_resolution = (maps[i] - max_score).exp_()
# Produce scores over the region H x W, but normalize with POOL_H x POOL_W,
# so that the scores of objects of different absolute sizes will be more comparable
roi_map_scores = tmp_full_resolution / tmp_pool_resolution.sum((1, 2), keepdim=True)
w = roi_map.shape[2]
pos = roi_map.view(num_keypoints, -1).argmax(1)
x_int = pos % w
y_int = (pos - x_int) // w
assert (
roi_map_scores[keypoints_idx, y_int, x_int]
== roi_map_scores.view(num_keypoints, -1).max(1)[0]
).all()
x = (x_int.float() + 0.5) * width_corrections[i]
y = (y_int.float() + 0.5) * height_corrections[i]
xy_preds[i, :, 0] = x + offset_x[i]
xy_preds[i, :, 1] = y + offset_y[i]
xy_preds[i, :, 2] = roi_map[keypoints_idx, y_int, x_int]
xy_preds[i, :, 3] = roi_map_scores[keypoints_idx, y_int, x_int]
return xy_preds
# Copyright (c) Facebook, Inc. and its affiliates.
import copy
import itertools
import numpy as np
from typing import Any, Iterator, List, Union
import pycocotools.mask as mask_util
import torch
from torch import device
from detectron2.layers.roi_align import ROIAlign
from detectron2.utils.memory import retry_if_cuda_oom
from .boxes import Boxes
def polygon_area(x, y):
# Using the shoelace formula
# https://stackoverflow.com/questions/24467972/calculate-area-of-polygon-given-x-y-coordinates
return 0.5 * np.abs(np.dot(x, np.roll(y, 1)) - np.dot(y, np.roll(x, 1)))
def polygons_to_bitmask(polygons: List[np.ndarray], height: int, width: int) -> np.ndarray:
"""
Args:
polygons (list[ndarray]): each array has shape (Nx2,)
height, width (int)
Returns:
ndarray: a bool mask of shape (height, width)
"""
if len(polygons) == 0:
# COCOAPI does not support empty polygons
return np.zeros((height, width)).astype(bool)
rles = mask_util.frPyObjects(polygons, height, width)
rle = mask_util.merge(rles)
return mask_util.decode(rle).astype(bool)
def rasterize_polygons_within_box(
polygons: List[np.ndarray], box: np.ndarray, mask_size: int
) -> torch.Tensor:
"""
Rasterize the polygons into a mask image and
crop the mask content in the given box.
The cropped mask is resized to (mask_size, mask_size).
This function is used when generating training targets for mask head in Mask R-CNN.
Given original ground-truth masks for an image, new ground-truth mask
training targets in the size of `mask_size x mask_size`
must be provided for each predicted box. This function will be called to
produce such targets.
Args:
polygons (list[ndarray[float]]): a list of polygons, which represents an instance.
box: 4-element numpy array
mask_size (int):
Returns:
Tensor: BoolTensor of shape (mask_size, mask_size)
"""
# 1. Shift the polygons w.r.t the boxes
w, h = box[2] - box[0], box[3] - box[1]
polygons = copy.deepcopy(polygons)
for p in polygons:
p[0::2] = p[0::2] - box[0]
p[1::2] = p[1::2] - box[1]
# 2. Rescale the polygons to the new box size
# max() to avoid division by small number
ratio_h = mask_size / max(h, 0.1)
ratio_w = mask_size / max(w, 0.1)
if ratio_h == ratio_w:
for p in polygons:
p *= ratio_h
else:
for p in polygons:
p[0::2] *= ratio_w
p[1::2] *= ratio_h
# 3. Rasterize the polygons with coco api
mask = polygons_to_bitmask(polygons, mask_size, mask_size)
mask = torch.from_numpy(mask)
return mask
class BitMasks:
"""
This class stores the segmentation masks for all objects in one image, in
the form of bitmaps.
Attributes:
tensor: bool Tensor of N,H,W, representing N instances in the image.
"""
def __init__(self, tensor: Union[torch.Tensor, np.ndarray]):
"""
Args:
tensor: bool Tensor of N,H,W, representing N instances in the image.
"""
if isinstance(tensor, torch.Tensor):
tensor = tensor.to(torch.bool)
else:
tensor = torch.as_tensor(tensor, dtype=torch.bool, device=torch.device("cpu"))
assert tensor.dim() == 3, tensor.size()
self.image_size = tensor.shape[1:]
self.tensor = tensor
@torch.jit.unused
def to(self, *args: Any, **kwargs: Any) -> "BitMasks":
return BitMasks(self.tensor.to(*args, **kwargs))
@property
def device(self) -> torch.device:
return self.tensor.device
@torch.jit.unused
def __getitem__(self, item: Union[int, slice, torch.BoolTensor]) -> "BitMasks":
"""
Returns:
BitMasks: Create a new :class:`BitMasks` by indexing.
The following usage are allowed:
1. `new_masks = masks[3]`: return a `BitMasks` which contains only one mask.
2. `new_masks = masks[2:10]`: return a slice of masks.
3. `new_masks = masks[vector]`, where vector is a torch.BoolTensor
with `length = len(masks)`. Nonzero elements in the vector will be selected.
Note that the returned object might share storage with this object,
subject to Pytorch's indexing semantics.
"""
if isinstance(item, int):
return BitMasks(self.tensor[item].unsqueeze(0))
m = self.tensor[item]
assert m.dim() == 3, "Indexing on BitMasks with {} returns a tensor with shape {}!".format(
item, m.shape
)
return BitMasks(m)
@torch.jit.unused
def __iter__(self) -> torch.Tensor:
yield from self.tensor
@torch.jit.unused
def __repr__(self) -> str:
s = self.__class__.__name__ + "("
s += "num_instances={})".format(len(self.tensor))
return s
def __len__(self) -> int:
return self.tensor.shape[0]
def nonempty(self) -> torch.Tensor:
"""
Find masks that are non-empty.
Returns:
Tensor: a BoolTensor which represents
whether each mask is empty (False) or non-empty (True).
"""
return self.tensor.flatten(1).any(dim=1)
@staticmethod
def from_polygon_masks(
polygon_masks: Union["PolygonMasks", List[List[np.ndarray]]], height: int, width: int
) -> "BitMasks":
"""
Args:
polygon_masks (list[list[ndarray]] or PolygonMasks)
height, width (int)
"""
if isinstance(polygon_masks, PolygonMasks):
polygon_masks = polygon_masks.polygons
masks = [polygons_to_bitmask(p, height, width) for p in polygon_masks]
if len(masks):
return BitMasks(torch.stack([torch.from_numpy(x) for x in masks]))
else:
return BitMasks(torch.empty(0, height, width, dtype=torch.bool))
@staticmethod
def from_roi_masks(roi_masks: "ROIMasks", height: int, width: int) -> "BitMasks":
"""
Args:
roi_masks:
height, width (int):
"""
return roi_masks.to_bitmasks(height, width)
def crop_and_resize(self, boxes: torch.Tensor, mask_size: int) -> torch.Tensor:
"""
Crop each bitmask by the given box, and resize results to (mask_size, mask_size).
This can be used to prepare training targets for Mask R-CNN.
It has less reconstruction error compared to rasterization with polygons.
However we observe no difference in accuracy,
but BitMasks requires more memory to store all the masks.
Args:
boxes (Tensor): Nx4 tensor storing the boxes for each mask
mask_size (int): the size of the rasterized mask.
Returns:
Tensor:
A bool tensor of shape (N, mask_size, mask_size), where
N is the number of predicted boxes for this image.
"""
assert len(boxes) == len(self), "{} != {}".format(len(boxes), len(self))
device = self.tensor.device
batch_inds = torch.arange(len(boxes), device=device).to(dtype=boxes.dtype)[:, None]
rois = torch.cat([batch_inds, boxes], dim=1) # Nx5
bit_masks = self.tensor.to(dtype=torch.float32)
rois = rois.to(device=device)
output = (
ROIAlign((mask_size, mask_size), 1.0, 0, aligned=True)
.forward(bit_masks[:, None, :, :], rois)
.squeeze(1)
)
output = output >= 0.5
return output
def get_bounding_boxes(self) -> Boxes:
"""
Returns:
Boxes: tight bounding boxes around bitmasks.
If a mask is empty, it's bounding box will be all zero.
"""
boxes = torch.zeros(self.tensor.shape[0], 4, dtype=torch.float32)
x_any = torch.any(self.tensor, dim=1)
y_any = torch.any(self.tensor, dim=2)
for idx in range(self.tensor.shape[0]):
x = torch.where(x_any[idx, :])[0]
y = torch.where(y_any[idx, :])[0]
if len(x) > 0 and len(y) > 0:
boxes[idx, :] = torch.as_tensor(
[x[0], y[0], x[-1] + 1, y[-1] + 1], dtype=torch.float32
)
return Boxes(boxes)
@staticmethod
def cat(bitmasks_list: List["BitMasks"]) -> "BitMasks":
"""
Concatenates a list of BitMasks into a single BitMasks
Arguments:
bitmasks_list (list[BitMasks])
Returns:
BitMasks: the concatenated BitMasks
"""
assert isinstance(bitmasks_list, (list, tuple))
assert len(bitmasks_list) > 0
assert all(isinstance(bitmask, BitMasks) for bitmask in bitmasks_list)
cat_bitmasks = type(bitmasks_list[0])(torch.cat([bm.tensor for bm in bitmasks_list], dim=0))
return cat_bitmasks
class PolygonMasks:
"""
This class stores the segmentation masks for all objects in one image, in the form of polygons.
Attributes:
polygons: list[list[ndarray]]. Each ndarray is a float64 vector representing a polygon.
"""
def __init__(self, polygons: List[List[Union[torch.Tensor, np.ndarray]]]):
"""
Arguments:
polygons (list[list[np.ndarray]]): The first
level of the list correspond to individual instances,
the second level to all the polygons that compose the
instance, and the third level to the polygon coordinates.
The third level array should have the format of
[x0, y0, x1, y1, ..., xn, yn] (n >= 3).
"""
if not isinstance(polygons, list):
raise ValueError(
"Cannot create PolygonMasks: Expect a list of list of polygons per image. "
"Got '{}' instead.".format(type(polygons))
)
def _make_array(t: Union[torch.Tensor, np.ndarray]) -> np.ndarray:
# Use float64 for higher precision, because why not?
# Always put polygons on CPU (self.to is a no-op) since they
# are supposed to be small tensors.
# May need to change this assumption if GPU placement becomes useful
if isinstance(t, torch.Tensor):
t = t.cpu().numpy()
return np.asarray(t).astype("float64")
def process_polygons(
polygons_per_instance: List[Union[torch.Tensor, np.ndarray]]
) -> List[np.ndarray]:
if not isinstance(polygons_per_instance, list):
raise ValueError(
"Cannot create polygons: Expect a list of polygons per instance. "
"Got '{}' instead.".format(type(polygons_per_instance))
)
# transform each polygon to a numpy array
polygons_per_instance = [_make_array(p) for p in polygons_per_instance]
for polygon in polygons_per_instance:
if len(polygon) % 2 != 0 or len(polygon) < 6:
raise ValueError(f"Cannot create a polygon from {len(polygon)} coordinates.")
return polygons_per_instance
self.polygons: List[List[np.ndarray]] = [
process_polygons(polygons_per_instance) for polygons_per_instance in polygons
]
def to(self, *args: Any, **kwargs: Any) -> "PolygonMasks":
return self
@property
def device(self) -> torch.device:
return torch.device("cpu")
def get_bounding_boxes(self) -> Boxes:
"""
Returns:
Boxes: tight bounding boxes around polygon masks.
"""
boxes = torch.zeros(len(self.polygons), 4, dtype=torch.float32)
for idx, polygons_per_instance in enumerate(self.polygons):
minxy = torch.as_tensor([float("inf"), float("inf")], dtype=torch.float32)
maxxy = torch.zeros(2, dtype=torch.float32)
for polygon in polygons_per_instance:
coords = torch.from_numpy(polygon).view(-1, 2).to(dtype=torch.float32)
minxy = torch.min(minxy, torch.min(coords, dim=0).values)
maxxy = torch.max(maxxy, torch.max(coords, dim=0).values)
boxes[idx, :2] = minxy
boxes[idx, 2:] = maxxy
return Boxes(boxes)
def nonempty(self) -> torch.Tensor:
"""
Find masks that are non-empty.
Returns:
Tensor:
a BoolTensor which represents whether each mask is empty (False) or not (True).
"""
keep = [1 if len(polygon) > 0 else 0 for polygon in self.polygons]
return torch.from_numpy(np.asarray(keep, dtype=bool))
def __getitem__(self, item: Union[int, slice, List[int], torch.BoolTensor]) -> "PolygonMasks":
"""
Support indexing over the instances and return a `PolygonMasks` object.
`item` can be:
1. An integer. It will return an object with only one instance.
2. A slice. It will return an object with the selected instances.
3. A list[int]. It will return an object with the selected instances,
correpsonding to the indices in the list.
4. A vector mask of type BoolTensor, whose length is num_instances.
It will return an object with the instances whose mask is nonzero.
"""
if isinstance(item, int):
selected_polygons = [self.polygons[item]]
elif isinstance(item, slice):
selected_polygons = self.polygons[item]
elif isinstance(item, list):
selected_polygons = [self.polygons[i] for i in item]
elif isinstance(item, torch.Tensor):
# Polygons is a list, so we have to move the indices back to CPU.
if item.dtype == torch.bool:
assert item.dim() == 1, item.shape
item = item.nonzero().squeeze(1).cpu().numpy().tolist()
elif item.dtype in [torch.int32, torch.int64]:
item = item.cpu().numpy().tolist()
else:
raise ValueError("Unsupported tensor dtype={} for indexing!".format(item.dtype))
selected_polygons = [self.polygons[i] for i in item]
return PolygonMasks(selected_polygons)
def __iter__(self) -> Iterator[List[np.ndarray]]:
"""
Yields:
list[ndarray]: the polygons for one instance.
Each Tensor is a float64 vector representing a polygon.
"""
return iter(self.polygons)
def __repr__(self) -> str:
s = self.__class__.__name__ + "("
s += "num_instances={})".format(len(self.polygons))
return s
def __len__(self) -> int:
return len(self.polygons)
def crop_and_resize(self, boxes: torch.Tensor, mask_size: int) -> torch.Tensor:
"""
Crop each mask by the given box, and resize results to (mask_size, mask_size).
This can be used to prepare training targets for Mask R-CNN.
Args:
boxes (Tensor): Nx4 tensor storing the boxes for each mask
mask_size (int): the size of the rasterized mask.
Returns:
Tensor: A bool tensor of shape (N, mask_size, mask_size), where
N is the number of predicted boxes for this image.
"""
assert len(boxes) == len(self), "{} != {}".format(len(boxes), len(self))
device = boxes.device
# Put boxes on the CPU, as the polygon representation is not efficient GPU-wise
# (several small tensors for representing a single instance mask)
boxes = boxes.to(torch.device("cpu"))
results = [
rasterize_polygons_within_box(poly, box.numpy(), mask_size)
for poly, box in zip(self.polygons, boxes)
]
"""
poly: list[list[float]], the polygons for one instance
box: a tensor of shape (4,)
"""
if len(results) == 0:
return torch.empty(0, mask_size, mask_size, dtype=torch.bool, device=device)
return torch.stack(results, dim=0).to(device=device)
def area(self):
"""
Computes area of the mask.
Only works with Polygons, using the shoelace formula:
https://stackoverflow.com/questions/24467972/calculate-area-of-polygon-given-x-y-coordinates
Returns:
Tensor: a vector, area for each instance
"""
area = []
for polygons_per_instance in self.polygons:
area_per_instance = 0
for p in polygons_per_instance:
area_per_instance += polygon_area(p[0::2], p[1::2])
area.append(area_per_instance)
return torch.tensor(area)
@staticmethod
def cat(polymasks_list: List["PolygonMasks"]) -> "PolygonMasks":
"""
Concatenates a list of PolygonMasks into a single PolygonMasks
Arguments:
polymasks_list (list[PolygonMasks])
Returns:
PolygonMasks: the concatenated PolygonMasks
"""
assert isinstance(polymasks_list, (list, tuple))
assert len(polymasks_list) > 0
assert all(isinstance(polymask, PolygonMasks) for polymask in polymasks_list)
cat_polymasks = type(polymasks_list[0])(
list(itertools.chain.from_iterable(pm.polygons for pm in polymasks_list))
)
return cat_polymasks
class ROIMasks:
"""
Represent masks by N smaller masks defined in some ROIs. Once ROI boxes are given,
full-image bitmask can be obtained by "pasting" the mask on the region defined
by the corresponding ROI box.
"""
def __init__(self, tensor: torch.Tensor):
"""
Args:
tensor: (N, M, M) mask tensor that defines the mask within each ROI.
"""
if tensor.dim() != 3:
raise ValueError("ROIMasks must take a masks of 3 dimension.")
self.tensor = tensor
def to(self, device: torch.device) -> "ROIMasks":
return ROIMasks(self.tensor.to(device))
@property
def device(self) -> device:
return self.tensor.device
def __len__(self):
return self.tensor.shape[0]
def __getitem__(self, item) -> "ROIMasks":
"""
Returns:
ROIMasks: Create a new :class:`ROIMasks` by indexing.
The following usage are allowed:
1. `new_masks = masks[2:10]`: return a slice of masks.
2. `new_masks = masks[vector]`, where vector is a torch.BoolTensor
with `length = len(masks)`. Nonzero elements in the vector will be selected.
Note that the returned object might share storage with this object,
subject to Pytorch's indexing semantics.
"""
t = self.tensor[item]
if t.dim() != 3:
raise ValueError(
f"Indexing on ROIMasks with {item} returns a tensor with shape {t.shape}!"
)
return ROIMasks(t)
@torch.jit.unused
def __repr__(self) -> str:
s = self.__class__.__name__ + "("
s += "num_instances={})".format(len(self.tensor))
return s
@torch.jit.unused
def to_bitmasks(self, boxes: torch.Tensor, height, width, threshold=0.5):
"""
Args: see documentation of :func:`paste_masks_in_image`.
"""
from detectron2.layers.mask_ops import paste_masks_in_image, _paste_masks_tensor_shape
if torch.jit.is_tracing():
if isinstance(height, torch.Tensor):
paste_func = _paste_masks_tensor_shape
else:
paste_func = paste_masks_in_image
else:
paste_func = retry_if_cuda_oom(paste_masks_in_image)
bitmasks = paste_func(self.tensor, boxes.tensor, (height, width), threshold=threshold)
return BitMasks(bitmasks)
# Copyright (c) Facebook, Inc. and its affiliates.
import math
from typing import List, Tuple
import torch
from detectron2.layers.rotated_boxes import pairwise_iou_rotated
from .boxes import Boxes
class RotatedBoxes(Boxes):
"""
This structure stores a list of rotated boxes as a Nx5 torch.Tensor.
It supports some common methods about boxes
(`area`, `clip`, `nonempty`, etc),
and also behaves like a Tensor
(support indexing, `to(device)`, `.device`, and iteration over all boxes)
"""
def __init__(self, tensor: torch.Tensor):
"""
Args:
tensor (Tensor[float]): a Nx5 matrix. Each row is
(x_center, y_center, width, height, angle),
in which angle is represented in degrees.
While there's no strict range restriction for it,
the recommended principal range is between [-180, 180) degrees.
Assume we have a horizontal box B = (x_center, y_center, width, height),
where width is along the x-axis and height is along the y-axis.
The rotated box B_rot (x_center, y_center, width, height, angle)
can be seen as:
1. When angle == 0:
B_rot == B
2. When angle > 0:
B_rot is obtained by rotating B w.r.t its center by :math:`|angle|` degrees CCW;
3. When angle < 0:
B_rot is obtained by rotating B w.r.t its center by :math:`|angle|` degrees CW.
Mathematically, since the right-handed coordinate system for image space
is (y, x), where y is top->down and x is left->right, the 4 vertices of the
rotated rectangle :math:`(yr_i, xr_i)` (i = 1, 2, 3, 4) can be obtained from
the vertices of the horizontal rectangle :math:`(y_i, x_i)` (i = 1, 2, 3, 4)
in the following way (:math:`\\theta = angle*\\pi/180` is the angle in radians,
:math:`(y_c, x_c)` is the center of the rectangle):
.. math::
yr_i = \\cos(\\theta) (y_i - y_c) - \\sin(\\theta) (x_i - x_c) + y_c,
xr_i = \\sin(\\theta) (y_i - y_c) + \\cos(\\theta) (x_i - x_c) + x_c,
which is the standard rigid-body rotation transformation.
Intuitively, the angle is
(1) the rotation angle from y-axis in image space
to the height vector (top->down in the box's local coordinate system)
of the box in CCW, and
(2) the rotation angle from x-axis in image space
to the width vector (left->right in the box's local coordinate system)
of the box in CCW.
More intuitively, consider the following horizontal box ABCD represented
in (x1, y1, x2, y2): (3, 2, 7, 4),
covering the [3, 7] x [2, 4] region of the continuous coordinate system
which looks like this:
.. code:: none
O--------> x
|
| A---B
| | |
| D---C
|
v y
Note that each capital letter represents one 0-dimensional geometric point
instead of a 'square pixel' here.
In the example above, using (x, y) to represent a point we have:
.. math::
O = (0, 0), A = (3, 2), B = (7, 2), C = (7, 4), D = (3, 4)
We name vector AB = vector DC as the width vector in box's local coordinate system, and
vector AD = vector BC as the height vector in box's local coordinate system. Initially,
when angle = 0 degree, they're aligned with the positive directions of x-axis and y-axis
in the image space, respectively.
For better illustration, we denote the center of the box as E,
.. code:: none
O--------> x
|
| A---B
| | E |
| D---C
|
v y
where the center E = ((3+7)/2, (2+4)/2) = (5, 3).
Also,
.. math::
width = |AB| = |CD| = 7 - 3 = 4,
height = |AD| = |BC| = 4 - 2 = 2.
Therefore, the corresponding representation for the same shape in rotated box in
(x_center, y_center, width, height, angle) format is:
(5, 3, 4, 2, 0),
Now, let's consider (5, 3, 4, 2, 90), which is rotated by 90 degrees
CCW (counter-clockwise) by definition. It looks like this:
.. code:: none
O--------> x
| B-C
| | |
| |E|
| | |
| A-D
v y
The center E is still located at the same point (5, 3), while the vertices
ABCD are rotated by 90 degrees CCW with regard to E:
A = (4, 5), B = (4, 1), C = (6, 1), D = (6, 5)
Here, 90 degrees can be seen as the CCW angle to rotate from y-axis to
vector AD or vector BC (the top->down height vector in box's local coordinate system),
or the CCW angle to rotate from x-axis to vector AB or vector DC (the left->right
width vector in box's local coordinate system).
.. math::
width = |AB| = |CD| = 5 - 1 = 4,
height = |AD| = |BC| = 6 - 4 = 2.
Next, how about (5, 3, 4, 2, -90), which is rotated by 90 degrees CW (clockwise)
by definition? It looks like this:
.. code:: none
O--------> x
| D-A
| | |
| |E|
| | |
| C-B
v y
The center E is still located at the same point (5, 3), while the vertices
ABCD are rotated by 90 degrees CW with regard to E:
A = (6, 1), B = (6, 5), C = (4, 5), D = (4, 1)
.. math::
width = |AB| = |CD| = 5 - 1 = 4,
height = |AD| = |BC| = 6 - 4 = 2.
This covers exactly the same region as (5, 3, 4, 2, 90) does, and their IoU
will be 1. However, these two will generate different RoI Pooling results and
should not be treated as an identical box.
On the other hand, it's easy to see that (X, Y, W, H, A) is identical to
(X, Y, W, H, A+360N), for any integer N. For example (5, 3, 4, 2, 270) would be
identical to (5, 3, 4, 2, -90), because rotating the shape 270 degrees CCW is
equivalent to rotating the same shape 90 degrees CW.
We could rotate further to get (5, 3, 4, 2, 180), or (5, 3, 4, 2, -180):
.. code:: none
O--------> x
|
| C---D
| | E |
| B---A
|
v y
.. math::
A = (7, 4), B = (3, 4), C = (3, 2), D = (7, 2),
width = |AB| = |CD| = 7 - 3 = 4,
height = |AD| = |BC| = 4 - 2 = 2.
Finally, this is a very inaccurate (heavily quantized) illustration of
how (5, 3, 4, 2, 60) looks like in case anyone wonders:
.. code:: none
O--------> x
| B\
| / C
| /E /
| A /
| `D
v y
It's still a rectangle with center of (5, 3), width of 4 and height of 2,
but its angle (and thus orientation) is somewhere between
(5, 3, 4, 2, 0) and (5, 3, 4, 2, 90).
"""
device = tensor.device if isinstance(tensor, torch.Tensor) else torch.device("cpu")
tensor = torch.as_tensor(tensor, dtype=torch.float32, device=device)
if tensor.numel() == 0:
# Use reshape, so we don't end up creating a new tensor that does not depend on
# the inputs (and consequently confuses jit)
tensor = tensor.reshape((0, 5)).to(dtype=torch.float32, device=device)
assert tensor.dim() == 2 and tensor.size(-1) == 5, tensor.size()
self.tensor = tensor
def clone(self) -> "RotatedBoxes":
"""
Clone the RotatedBoxes.
Returns:
RotatedBoxes
"""
return RotatedBoxes(self.tensor.clone())
def to(self, device: torch.device, non_blocking: bool = False):
# Boxes are assumed float32 and does not support to(dtype)
return RotatedBoxes(self.tensor.to(device=device, non_blocking=non_blocking))
def area(self) -> torch.Tensor:
"""
Computes the area of all the boxes.
Returns:
torch.Tensor: a vector with areas of each box.
"""
box = self.tensor
area = box[:, 2] * box[:, 3]
return area
# Avoid in-place operations so that we can torchscript; NOTE: this creates a new tensor
def normalize_angles(self) -> None:
"""
Restrict angles to the range of [-180, 180) degrees
"""
angle_tensor = (self.tensor[:, 4] + 180.0) % 360.0 - 180.0
self.tensor = torch.cat((self.tensor[:, :4], angle_tensor[:, None]), dim=1)
def clip(self, box_size: Tuple[int, int], clip_angle_threshold: float = 1.0) -> None:
"""
Clip (in place) the boxes by limiting x coordinates to the range [0, width]
and y coordinates to the range [0, height].
For RRPN:
Only clip boxes that are almost horizontal with a tolerance of
clip_angle_threshold to maintain backward compatibility.
Rotated boxes beyond this threshold are not clipped for two reasons:
1. There are potentially multiple ways to clip a rotated box to make it
fit within the image.
2. It's tricky to make the entire rectangular box fit within the image
and still be able to not leave out pixels of interest.
Therefore we rely on ops like RoIAlignRotated to safely handle this.
Args:
box_size (height, width): The clipping box's size.
clip_angle_threshold:
Iff. abs(normalized(angle)) <= clip_angle_threshold (in degrees),
we do the clipping as horizontal boxes.
"""
h, w = box_size
# normalize angles to be within (-180, 180] degrees
self.normalize_angles()
idx = torch.where(torch.abs(self.tensor[:, 4]) <= clip_angle_threshold)[0]
# convert to (x1, y1, x2, y2)
x1 = self.tensor[idx, 0] - self.tensor[idx, 2] / 2.0
y1 = self.tensor[idx, 1] - self.tensor[idx, 3] / 2.0
x2 = self.tensor[idx, 0] + self.tensor[idx, 2] / 2.0
y2 = self.tensor[idx, 1] + self.tensor[idx, 3] / 2.0
# clip
x1.clamp_(min=0, max=w)
y1.clamp_(min=0, max=h)
x2.clamp_(min=0, max=w)
y2.clamp_(min=0, max=h)
# convert back to (xc, yc, w, h)
self.tensor[idx, 0] = (x1 + x2) / 2.0
self.tensor[idx, 1] = (y1 + y2) / 2.0
# make sure widths and heights do not increase due to numerical errors
self.tensor[idx, 2] = torch.min(self.tensor[idx, 2], x2 - x1)
self.tensor[idx, 3] = torch.min(self.tensor[idx, 3], y2 - y1)
def nonempty(self, threshold: float = 0.0) -> torch.Tensor:
"""
Find boxes that are non-empty.
A box is considered empty, if either of its side is no larger than threshold.
Returns:
Tensor: a binary vector which represents
whether each box is empty (False) or non-empty (True).
"""
box = self.tensor
widths = box[:, 2]
heights = box[:, 3]
keep = (widths > threshold) & (heights > threshold)
return keep
def __getitem__(self, item) -> "RotatedBoxes":
"""
Returns:
RotatedBoxes: Create a new :class:`RotatedBoxes` by indexing.
The following usage are allowed:
1. `new_boxes = boxes[3]`: return a `RotatedBoxes` which contains only one box.
2. `new_boxes = boxes[2:10]`: return a slice of boxes.
3. `new_boxes = boxes[vector]`, where vector is a torch.ByteTensor
with `length = len(boxes)`. Nonzero elements in the vector will be selected.
Note that the returned RotatedBoxes might share storage with this RotatedBoxes,
subject to Pytorch's indexing semantics.
"""
if isinstance(item, int):
return RotatedBoxes(self.tensor[item].view(1, -1))
b = self.tensor[item]
assert b.dim() == 2, "Indexing on RotatedBoxes with {} failed to return a matrix!".format(
item
)
return RotatedBoxes(b)
def __len__(self) -> int:
return self.tensor.shape[0]
def __repr__(self) -> str:
return "RotatedBoxes(" + str(self.tensor) + ")"
def inside_box(self, box_size: Tuple[int, int], boundary_threshold: int = 0) -> torch.Tensor:
"""
Args:
box_size (height, width): Size of the reference box covering
[0, width] x [0, height]
boundary_threshold (int): Boxes that extend beyond the reference box
boundary by more than boundary_threshold are considered "outside".
For RRPN, it might not be necessary to call this function since it's common
for rotated box to extend to outside of the image boundaries
(the clip function only clips the near-horizontal boxes)
Returns:
a binary vector, indicating whether each box is inside the reference box.
"""
height, width = box_size
cnt_x = self.tensor[..., 0]
cnt_y = self.tensor[..., 1]
half_w = self.tensor[..., 2] / 2.0
half_h = self.tensor[..., 3] / 2.0
a = self.tensor[..., 4]
c = torch.abs(torch.cos(a * math.pi / 180.0))
s = torch.abs(torch.sin(a * math.pi / 180.0))
# This basically computes the horizontal bounding rectangle of the rotated box
max_rect_dx = c * half_w + s * half_h
max_rect_dy = c * half_h + s * half_w
inds_inside = (
(cnt_x - max_rect_dx >= -boundary_threshold)
& (cnt_y - max_rect_dy >= -boundary_threshold)
& (cnt_x + max_rect_dx < width + boundary_threshold)
& (cnt_y + max_rect_dy < height + boundary_threshold)
)
return inds_inside
def get_centers(self) -> torch.Tensor:
"""
Returns:
The box centers in a Nx2 array of (x, y).
"""
return self.tensor[:, :2]
def scale(self, scale_x: float, scale_y: float) -> None:
"""
Scale the rotated box with horizontal and vertical scaling factors
Note: when scale_factor_x != scale_factor_y,
the rotated box does not preserve the rectangular shape when the angle
is not a multiple of 90 degrees under resize transformation.
Instead, the shape is a parallelogram (that has skew)
Here we make an approximation by fitting a rotated rectangle to the parallelogram.
"""
self.tensor[:, 0] *= scale_x
self.tensor[:, 1] *= scale_y
theta = self.tensor[:, 4] * math.pi / 180.0
c = torch.cos(theta)
s = torch.sin(theta)
# In image space, y is top->down and x is left->right
# Consider the local coordintate system for the rotated box,
# where the box center is located at (0, 0), and the four vertices ABCD are
# A(-w / 2, -h / 2), B(w / 2, -h / 2), C(w / 2, h / 2), D(-w / 2, h / 2)
# the midpoint of the left edge AD of the rotated box E is:
# E = (A+D)/2 = (-w / 2, 0)
# the midpoint of the top edge AB of the rotated box F is:
# F(0, -h / 2)
# To get the old coordinates in the global system, apply the rotation transformation
# (Note: the right-handed coordinate system for image space is yOx):
# (old_x, old_y) = (s * y + c * x, c * y - s * x)
# E(old) = (s * 0 + c * (-w/2), c * 0 - s * (-w/2)) = (-c * w / 2, s * w / 2)
# F(old) = (s * (-h / 2) + c * 0, c * (-h / 2) - s * 0) = (-s * h / 2, -c * h / 2)
# After applying the scaling factor (sfx, sfy):
# E(new) = (-sfx * c * w / 2, sfy * s * w / 2)
# F(new) = (-sfx * s * h / 2, -sfy * c * h / 2)
# The new width after scaling tranformation becomes:
# w(new) = |E(new) - O| * 2
# = sqrt[(sfx * c * w / 2)^2 + (sfy * s * w / 2)^2] * 2
# = sqrt[(sfx * c)^2 + (sfy * s)^2] * w
# i.e., scale_factor_w = sqrt[(sfx * c)^2 + (sfy * s)^2]
#
# For example,
# when angle = 0 or 180, |c| = 1, s = 0, scale_factor_w == scale_factor_x;
# when |angle| = 90, c = 0, |s| = 1, scale_factor_w == scale_factor_y
self.tensor[:, 2] *= torch.sqrt((scale_x * c) ** 2 + (scale_y * s) ** 2)
# h(new) = |F(new) - O| * 2
# = sqrt[(sfx * s * h / 2)^2 + (sfy * c * h / 2)^2] * 2
# = sqrt[(sfx * s)^2 + (sfy * c)^2] * h
# i.e., scale_factor_h = sqrt[(sfx * s)^2 + (sfy * c)^2]
#
# For example,
# when angle = 0 or 180, |c| = 1, s = 0, scale_factor_h == scale_factor_y;
# when |angle| = 90, c = 0, |s| = 1, scale_factor_h == scale_factor_x
self.tensor[:, 3] *= torch.sqrt((scale_x * s) ** 2 + (scale_y * c) ** 2)
# The angle is the rotation angle from y-axis in image space to the height
# vector (top->down in the box's local coordinate system) of the box in CCW.
#
# angle(new) = angle_yOx(O - F(new))
# = angle_yOx( (sfx * s * h / 2, sfy * c * h / 2) )
# = atan2(sfx * s * h / 2, sfy * c * h / 2)
# = atan2(sfx * s, sfy * c)
#
# For example,
# when sfx == sfy, angle(new) == atan2(s, c) == angle(old)
self.tensor[:, 4] = torch.atan2(scale_x * s, scale_y * c) * 180 / math.pi
@classmethod
def cat(cls, boxes_list: List["RotatedBoxes"]) -> "RotatedBoxes":
"""
Concatenates a list of RotatedBoxes into a single RotatedBoxes
Arguments:
boxes_list (list[RotatedBoxes])
Returns:
RotatedBoxes: the concatenated RotatedBoxes
"""
assert isinstance(boxes_list, (list, tuple))
if len(boxes_list) == 0:
return cls(torch.empty(0))
assert all([isinstance(box, RotatedBoxes) for box in boxes_list])
# use torch.cat (v.s. layers.cat) so the returned boxes never share storage with input
cat_boxes = cls(torch.cat([b.tensor for b in boxes_list], dim=0))
return cat_boxes
@property
def device(self) -> torch.device:
return self.tensor.device
@torch.jit.unused
def __iter__(self):
"""
Yield a box as a Tensor of shape (5,) at a time.
"""
yield from self.tensor
def pairwise_iou(boxes1: RotatedBoxes, boxes2: RotatedBoxes) -> None:
"""
Given two lists of rotated boxes of size N and M,
compute the IoU (intersection over union)
between **all** N x M pairs of boxes.
The box order must be (x_center, y_center, width, height, angle).
Args:
boxes1, boxes2 (RotatedBoxes):
two `RotatedBoxes`. Contains N & M rotated boxes, respectively.
Returns:
Tensor: IoU, sized [N,M].
"""
return pairwise_iou_rotated(boxes1.tensor, boxes2.tensor)
# Copyright (c) Facebook, Inc. and its affiliates.
from .base_tracker import ( # noqa
BaseTracker,
build_tracker_head,
TRACKER_HEADS_REGISTRY,
)
from .bbox_iou_tracker import BBoxIOUTracker # noqa
from .hungarian_tracker import BaseHungarianTracker # noqa
from .iou_weighted_hungarian_bbox_iou_tracker import ( # noqa
IOUWeightedHungarianBBoxIOUTracker,
)
from .utils import create_prediction_pairs # noqa
from .vanilla_hungarian_bbox_iou_tracker import VanillaHungarianBBoxIOUTracker # noqa
__all__ = [k for k in globals().keys() if not k.startswith("_")]
#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.
from detectron2.config import configurable
from detectron2.utils.registry import Registry
from ..config.config import CfgNode as CfgNode_
from ..structures import Instances
TRACKER_HEADS_REGISTRY = Registry("TRACKER_HEADS")
TRACKER_HEADS_REGISTRY.__doc__ = """
Registry for tracking classes.
"""
class BaseTracker:
"""
A parent class for all trackers
"""
@configurable
def __init__(self, **kwargs):
self._prev_instances = None # (D2)instances for previous frame
self._matched_idx = set() # indices in prev_instances found matching
self._matched_ID = set() # idendities in prev_instances found matching
self._untracked_prev_idx = set() # indices in prev_instances not found matching
self._id_count = 0 # used to assign new id
@classmethod
def from_config(cls, cfg: CfgNode_):
raise NotImplementedError("Calling BaseTracker::from_config")
def update(self, predictions: Instances) -> Instances:
"""
Args:
predictions: D2 Instances for predictions of the current frame
Return:
D2 Instances for predictions of the current frame with ID assigned
_prev_instances and instances will have the following fields:
.pred_boxes (shape=[N, 4])
.scores (shape=[N,])
.pred_classes (shape=[N,])
.pred_keypoints (shape=[N, M, 3], Optional)
.pred_masks (shape=List[2D_MASK], Optional) 2D_MASK: shape=[H, W]
.ID (shape=[N,])
N: # of detected bboxes
H and W: height and width of 2D mask
"""
raise NotImplementedError("Calling BaseTracker::update")
def build_tracker_head(cfg: CfgNode_) -> BaseTracker:
"""
Build a tracker head from `cfg.TRACKER_HEADS.TRACKER_NAME`.
Args:
cfg: D2 CfgNode, config file with tracker information
Return:
tracker object
"""
name = cfg.TRACKER_HEADS.TRACKER_NAME
tracker_class = TRACKER_HEADS_REGISTRY.get(name)
return tracker_class(cfg)
#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.
import copy
import numpy as np
from typing import List
import torch
from detectron2.config import configurable
from detectron2.structures import Boxes, Instances
from detectron2.structures.boxes import pairwise_iou
from ..config.config import CfgNode as CfgNode_
from .base_tracker import TRACKER_HEADS_REGISTRY, BaseTracker
@TRACKER_HEADS_REGISTRY.register()
class BBoxIOUTracker(BaseTracker):
"""
A bounding box tracker to assign ID based on IoU between current and previous instances
"""
@configurable
def __init__(
self,
*,
video_height: int,
video_width: int,
max_num_instances: int = 200,
max_lost_frame_count: int = 0,
min_box_rel_dim: float = 0.02,
min_instance_period: int = 1,
track_iou_threshold: float = 0.5,
**kwargs,
):
"""
Args:
video_height: height the video frame
video_width: width of the video frame
max_num_instances: maximum number of id allowed to be tracked
max_lost_frame_count: maximum number of frame an id can lost tracking
exceed this number, an id is considered as lost
forever
min_box_rel_dim: a percentage, smaller than this dimension, a bbox is
removed from tracking
min_instance_period: an instance will be shown after this number of period
since its first showing up in the video
track_iou_threshold: iou threshold, below this number a bbox pair is removed
from tracking
"""
super().__init__(**kwargs)
self._video_height = video_height
self._video_width = video_width
self._max_num_instances = max_num_instances
self._max_lost_frame_count = max_lost_frame_count
self._min_box_rel_dim = min_box_rel_dim
self._min_instance_period = min_instance_period
self._track_iou_threshold = track_iou_threshold
@classmethod
def from_config(cls, cfg: CfgNode_):
"""
Old style initialization using CfgNode
Args:
cfg: D2 CfgNode, config file
Return:
dictionary storing arguments for __init__ method
"""
assert "VIDEO_HEIGHT" in cfg.TRACKER_HEADS
assert "VIDEO_WIDTH" in cfg.TRACKER_HEADS
video_height = cfg.TRACKER_HEADS.get("VIDEO_HEIGHT")
video_width = cfg.TRACKER_HEADS.get("VIDEO_WIDTH")
max_num_instances = cfg.TRACKER_HEADS.get("MAX_NUM_INSTANCES", 200)
max_lost_frame_count = cfg.TRACKER_HEADS.get("MAX_LOST_FRAME_COUNT", 0)
min_box_rel_dim = cfg.TRACKER_HEADS.get("MIN_BOX_REL_DIM", 0.02)
min_instance_period = cfg.TRACKER_HEADS.get("MIN_INSTANCE_PERIOD", 1)
track_iou_threshold = cfg.TRACKER_HEADS.get("TRACK_IOU_THRESHOLD", 0.5)
return {
"_target_": "detectron2.tracking.bbox_iou_tracker.BBoxIOUTracker",
"video_height": video_height,
"video_width": video_width,
"max_num_instances": max_num_instances,
"max_lost_frame_count": max_lost_frame_count,
"min_box_rel_dim": min_box_rel_dim,
"min_instance_period": min_instance_period,
"track_iou_threshold": track_iou_threshold,
}
def update(self, instances: Instances) -> Instances:
"""
See BaseTracker description
"""
instances = self._initialize_extra_fields(instances)
if self._prev_instances is not None:
# calculate IoU of all bbox pairs
iou_all = pairwise_iou(
boxes1=instances.pred_boxes,
boxes2=self._prev_instances.pred_boxes,
)
# sort IoU in descending order
bbox_pairs = self._create_prediction_pairs(instances, iou_all)
# assign previous ID to current bbox if IoU > track_iou_threshold
self._reset_fields()
for bbox_pair in bbox_pairs:
idx = bbox_pair["idx"]
prev_id = bbox_pair["prev_id"]
if (
idx in self._matched_idx
or prev_id in self._matched_ID
or bbox_pair["IoU"] < self._track_iou_threshold
):
continue
instances.ID[idx] = prev_id
instances.ID_period[idx] = bbox_pair["prev_period"] + 1
instances.lost_frame_count[idx] = 0
self._matched_idx.add(idx)
self._matched_ID.add(prev_id)
self._untracked_prev_idx.remove(bbox_pair["prev_idx"])
instances = self._assign_new_id(instances)
instances = self._merge_untracked_instances(instances)
self._prev_instances = copy.deepcopy(instances)
return instances
def _create_prediction_pairs(self, instances: Instances, iou_all: np.ndarray) -> List:
"""
For all instances in previous and current frames, create pairs. For each
pair, store index of the instance in current frame predcitions, index in
previous predictions, ID in previous predictions, IoU of the bboxes in this
pair, period in previous predictions.
Args:
instances: D2 Instances, for predictions of the current frame
iou_all: IoU for all bboxes pairs
Return:
A list of IoU for all pairs
"""
bbox_pairs = []
for i in range(len(instances)):
for j in range(len(self._prev_instances)):
bbox_pairs.append(
{
"idx": i,
"prev_idx": j,
"prev_id": self._prev_instances.ID[j],
"IoU": iou_all[i, j],
"prev_period": self._prev_instances.ID_period[j],
}
)
return bbox_pairs
def _initialize_extra_fields(self, instances: Instances) -> Instances:
"""
If input instances don't have ID, ID_period, lost_frame_count fields,
this method is used to initialize these fields.
Args:
instances: D2 Instances, for predictions of the current frame
Return:
D2 Instances with extra fields added
"""
if not instances.has("ID"):
instances.set("ID", [None] * len(instances))
if not instances.has("ID_period"):
instances.set("ID_period", [None] * len(instances))
if not instances.has("lost_frame_count"):
instances.set("lost_frame_count", [None] * len(instances))
if self._prev_instances is None:
instances.ID = list(range(len(instances)))
self._id_count += len(instances)
instances.ID_period = [1] * len(instances)
instances.lost_frame_count = [0] * len(instances)
return instances
def _reset_fields(self):
"""
Before each uodate call, reset fields first
"""
self._matched_idx = set()
self._matched_ID = set()
self._untracked_prev_idx = set(range(len(self._prev_instances)))
def _assign_new_id(self, instances: Instances) -> Instances:
"""
For each untracked instance, assign a new id
Args:
instances: D2 Instances, for predictions of the current frame
Return:
D2 Instances with new ID assigned
"""
untracked_idx = set(range(len(instances))).difference(self._matched_idx)
for idx in untracked_idx:
instances.ID[idx] = self._id_count
self._id_count += 1
instances.ID_period[idx] = 1
instances.lost_frame_count[idx] = 0
return instances
def _merge_untracked_instances(self, instances: Instances) -> Instances:
"""
For untracked previous instances, under certain condition, still keep them
in tracking and merge with the current instances.
Args:
instances: D2 Instances, for predictions of the current frame
Return:
D2 Instances merging current instances and instances from previous
frame decided to keep tracking
"""
untracked_instances = Instances(
image_size=instances.image_size,
pred_boxes=[],
pred_classes=[],
scores=[],
ID=[],
ID_period=[],
lost_frame_count=[],
)
prev_bboxes = list(self._prev_instances.pred_boxes)
prev_classes = list(self._prev_instances.pred_classes)
prev_scores = list(self._prev_instances.scores)
prev_ID_period = self._prev_instances.ID_period
if instances.has("pred_masks"):
untracked_instances.set("pred_masks", [])
prev_masks = list(self._prev_instances.pred_masks)
if instances.has("pred_keypoints"):
untracked_instances.set("pred_keypoints", [])
prev_keypoints = list(self._prev_instances.pred_keypoints)
if instances.has("pred_keypoint_heatmaps"):
untracked_instances.set("pred_keypoint_heatmaps", [])
prev_keypoint_heatmaps = list(self._prev_instances.pred_keypoint_heatmaps)
for idx in self._untracked_prev_idx:
x_left, y_top, x_right, y_bot = prev_bboxes[idx]
if (
(1.0 * (x_right - x_left) / self._video_width < self._min_box_rel_dim)
or (1.0 * (y_bot - y_top) / self._video_height < self._min_box_rel_dim)
or self._prev_instances.lost_frame_count[idx] >= self._max_lost_frame_count
or prev_ID_period[idx] <= self._min_instance_period
):
continue
untracked_instances.pred_boxes.append(list(prev_bboxes[idx].numpy()))
untracked_instances.pred_classes.append(int(prev_classes[idx]))
untracked_instances.scores.append(float(prev_scores[idx]))
untracked_instances.ID.append(self._prev_instances.ID[idx])
untracked_instances.ID_period.append(self._prev_instances.ID_period[idx])
untracked_instances.lost_frame_count.append(
self._prev_instances.lost_frame_count[idx] + 1
)
if instances.has("pred_masks"):
untracked_instances.pred_masks.append(prev_masks[idx].numpy().astype(np.uint8))
if instances.has("pred_keypoints"):
untracked_instances.pred_keypoints.append(
prev_keypoints[idx].numpy().astype(np.uint8)
)
if instances.has("pred_keypoint_heatmaps"):
untracked_instances.pred_keypoint_heatmaps.append(
prev_keypoint_heatmaps[idx].numpy().astype(np.float32)
)
untracked_instances.pred_boxes = Boxes(torch.FloatTensor(untracked_instances.pred_boxes))
untracked_instances.pred_classes = torch.IntTensor(untracked_instances.pred_classes)
untracked_instances.scores = torch.FloatTensor(untracked_instances.scores)
if instances.has("pred_masks"):
untracked_instances.pred_masks = torch.IntTensor(untracked_instances.pred_masks)
if instances.has("pred_keypoints"):
untracked_instances.pred_keypoints = torch.IntTensor(untracked_instances.pred_keypoints)
if instances.has("pred_keypoint_heatmaps"):
untracked_instances.pred_keypoint_heatmaps = torch.FloatTensor(
untracked_instances.pred_keypoint_heatmaps
)
return Instances.cat(
[
instances,
untracked_instances,
]
)
#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.
import copy
import numpy as np
from typing import Dict
import torch
from scipy.optimize import linear_sum_assignment
from detectron2.config import configurable
from detectron2.structures import Boxes, Instances
from ..config.config import CfgNode as CfgNode_
from .base_tracker import BaseTracker
class BaseHungarianTracker(BaseTracker):
"""
A base class for all Hungarian trackers
"""
@configurable
def __init__(
self,
video_height: int,
video_width: int,
max_num_instances: int = 200,
max_lost_frame_count: int = 0,
min_box_rel_dim: float = 0.02,
min_instance_period: int = 1,
**kwargs
):
"""
Args:
video_height: height the video frame
video_width: width of the video frame
max_num_instances: maximum number of id allowed to be tracked
max_lost_frame_count: maximum number of frame an id can lost tracking
exceed this number, an id is considered as lost
forever
min_box_rel_dim: a percentage, smaller than this dimension, a bbox is
removed from tracking
min_instance_period: an instance will be shown after this number of period
since its first showing up in the video
"""
super().__init__(**kwargs)
self._video_height = video_height
self._video_width = video_width
self._max_num_instances = max_num_instances
self._max_lost_frame_count = max_lost_frame_count
self._min_box_rel_dim = min_box_rel_dim
self._min_instance_period = min_instance_period
@classmethod
def from_config(cls, cfg: CfgNode_) -> Dict:
raise NotImplementedError("Calling HungarianTracker::from_config")
def build_cost_matrix(self, instances: Instances, prev_instances: Instances) -> np.ndarray:
raise NotImplementedError("Calling HungarianTracker::build_matrix")
def update(self, instances: Instances) -> Instances:
if instances.has("pred_keypoints"):
raise NotImplementedError("Need to add support for keypoints")
instances = self._initialize_extra_fields(instances)
if self._prev_instances is not None:
self._untracked_prev_idx = set(range(len(self._prev_instances)))
cost_matrix = self.build_cost_matrix(instances, self._prev_instances)
matched_idx, matched_prev_idx = linear_sum_assignment(cost_matrix)
instances = self._process_matched_idx(instances, matched_idx, matched_prev_idx)
instances = self._process_unmatched_idx(instances, matched_idx)
instances = self._process_unmatched_prev_idx(instances, matched_prev_idx)
self._prev_instances = copy.deepcopy(instances)
return instances
def _initialize_extra_fields(self, instances: Instances) -> Instances:
"""
If input instances don't have ID, ID_period, lost_frame_count fields,
this method is used to initialize these fields.
Args:
instances: D2 Instances, for predictions of the current frame
Return:
D2 Instances with extra fields added
"""
if not instances.has("ID"):
instances.set("ID", [None] * len(instances))
if not instances.has("ID_period"):
instances.set("ID_period", [None] * len(instances))
if not instances.has("lost_frame_count"):
instances.set("lost_frame_count", [None] * len(instances))
if self._prev_instances is None:
instances.ID = list(range(len(instances)))
self._id_count += len(instances)
instances.ID_period = [1] * len(instances)
instances.lost_frame_count = [0] * len(instances)
return instances
def _process_matched_idx(
self, instances: Instances, matched_idx: np.ndarray, matched_prev_idx: np.ndarray
) -> Instances:
assert matched_idx.size == matched_prev_idx.size
for i in range(matched_idx.size):
instances.ID[matched_idx[i]] = self._prev_instances.ID[matched_prev_idx[i]]
instances.ID_period[matched_idx[i]] = (
self._prev_instances.ID_period[matched_prev_idx[i]] + 1
)
instances.lost_frame_count[matched_idx[i]] = 0
return instances
def _process_unmatched_idx(self, instances: Instances, matched_idx: np.ndarray) -> Instances:
untracked_idx = set(range(len(instances))).difference(set(matched_idx))
for idx in untracked_idx:
instances.ID[idx] = self._id_count
self._id_count += 1
instances.ID_period[idx] = 1
instances.lost_frame_count[idx] = 0
return instances
def _process_unmatched_prev_idx(
self, instances: Instances, matched_prev_idx: np.ndarray
) -> Instances:
untracked_instances = Instances(
image_size=instances.image_size,
pred_boxes=[],
pred_masks=[],
pred_classes=[],
scores=[],
ID=[],
ID_period=[],
lost_frame_count=[],
)
prev_bboxes = list(self._prev_instances.pred_boxes)
prev_classes = list(self._prev_instances.pred_classes)
prev_scores = list(self._prev_instances.scores)
prev_ID_period = self._prev_instances.ID_period
if instances.has("pred_masks"):
prev_masks = list(self._prev_instances.pred_masks)
untracked_prev_idx = set(range(len(self._prev_instances))).difference(set(matched_prev_idx))
for idx in untracked_prev_idx:
x_left, y_top, x_right, y_bot = prev_bboxes[idx]
if (
(1.0 * (x_right - x_left) / self._video_width < self._min_box_rel_dim)
or (1.0 * (y_bot - y_top) / self._video_height < self._min_box_rel_dim)
or self._prev_instances.lost_frame_count[idx] >= self._max_lost_frame_count
or prev_ID_period[idx] <= self._min_instance_period
):
continue
untracked_instances.pred_boxes.append(list(prev_bboxes[idx].numpy()))
untracked_instances.pred_classes.append(int(prev_classes[idx]))
untracked_instances.scores.append(float(prev_scores[idx]))
untracked_instances.ID.append(self._prev_instances.ID[idx])
untracked_instances.ID_period.append(self._prev_instances.ID_period[idx])
untracked_instances.lost_frame_count.append(
self._prev_instances.lost_frame_count[idx] + 1
)
if instances.has("pred_masks"):
untracked_instances.pred_masks.append(prev_masks[idx].numpy().astype(np.uint8))
untracked_instances.pred_boxes = Boxes(torch.FloatTensor(untracked_instances.pred_boxes))
untracked_instances.pred_classes = torch.IntTensor(untracked_instances.pred_classes)
untracked_instances.scores = torch.FloatTensor(untracked_instances.scores)
if instances.has("pred_masks"):
untracked_instances.pred_masks = torch.IntTensor(untracked_instances.pred_masks)
else:
untracked_instances.remove("pred_masks")
return Instances.cat(
[
instances,
untracked_instances,
]
)
#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.
import numpy as np
from typing import List
from detectron2.config import CfgNode as CfgNode_
from detectron2.config import configurable
from .base_tracker import TRACKER_HEADS_REGISTRY
from .vanilla_hungarian_bbox_iou_tracker import VanillaHungarianBBoxIOUTracker
@TRACKER_HEADS_REGISTRY.register()
class IOUWeightedHungarianBBoxIOUTracker(VanillaHungarianBBoxIOUTracker):
"""
A tracker using IoU as weight in Hungarian algorithm, also known
as Munkres or Kuhn-Munkres algorithm
"""
@configurable
def __init__(
self,
*,
video_height: int,
video_width: int,
max_num_instances: int = 200,
max_lost_frame_count: int = 0,
min_box_rel_dim: float = 0.02,
min_instance_period: int = 1,
track_iou_threshold: float = 0.5,
**kwargs,
):
"""
Args:
video_height: height the video frame
video_width: width of the video frame
max_num_instances: maximum number of id allowed to be tracked
max_lost_frame_count: maximum number of frame an id can lost tracking
exceed this number, an id is considered as lost
forever
min_box_rel_dim: a percentage, smaller than this dimension, a bbox is
removed from tracking
min_instance_period: an instance will be shown after this number of period
since its first showing up in the video
track_iou_threshold: iou threshold, below this number a bbox pair is removed
from tracking
"""
super().__init__(
video_height=video_height,
video_width=video_width,
max_num_instances=max_num_instances,
max_lost_frame_count=max_lost_frame_count,
min_box_rel_dim=min_box_rel_dim,
min_instance_period=min_instance_period,
track_iou_threshold=track_iou_threshold,
)
@classmethod
def from_config(cls, cfg: CfgNode_):
"""
Old style initialization using CfgNode
Args:
cfg: D2 CfgNode, config file
Return:
dictionary storing arguments for __init__ method
"""
assert "VIDEO_HEIGHT" in cfg.TRACKER_HEADS
assert "VIDEO_WIDTH" in cfg.TRACKER_HEADS
video_height = cfg.TRACKER_HEADS.get("VIDEO_HEIGHT")
video_width = cfg.TRACKER_HEADS.get("VIDEO_WIDTH")
max_num_instances = cfg.TRACKER_HEADS.get("MAX_NUM_INSTANCES", 200)
max_lost_frame_count = cfg.TRACKER_HEADS.get("MAX_LOST_FRAME_COUNT", 0)
min_box_rel_dim = cfg.TRACKER_HEADS.get("MIN_BOX_REL_DIM", 0.02)
min_instance_period = cfg.TRACKER_HEADS.get("MIN_INSTANCE_PERIOD", 1)
track_iou_threshold = cfg.TRACKER_HEADS.get("TRACK_IOU_THRESHOLD", 0.5)
return {
"_target_": "detectron2.tracking.iou_weighted_hungarian_bbox_iou_tracker.IOUWeightedHungarianBBoxIOUTracker", # noqa
"video_height": video_height,
"video_width": video_width,
"max_num_instances": max_num_instances,
"max_lost_frame_count": max_lost_frame_count,
"min_box_rel_dim": min_box_rel_dim,
"min_instance_period": min_instance_period,
"track_iou_threshold": track_iou_threshold,
}
def assign_cost_matrix_values(self, cost_matrix: np.ndarray, bbox_pairs: List) -> np.ndarray:
"""
Based on IoU for each pair of bbox, assign the associated value in cost matrix
Args:
cost_matrix: np.ndarray, initialized 2D array with target dimensions
bbox_pairs: list of bbox pair, in each pair, iou value is stored
Return:
np.ndarray, cost_matrix with assigned values
"""
for pair in bbox_pairs:
# assign (-1 * IoU) for above threshold pairs, algorithms will minimize cost
cost_matrix[pair["idx"]][pair["prev_idx"]] = -1 * pair["IoU"]
return cost_matrix
#!/usr/bin/env python3
import numpy as np
from typing import List
from detectron2.structures import Instances
def create_prediction_pairs(
instances: Instances,
prev_instances: Instances,
iou_all: np.ndarray,
threshold: float = 0.5,
) -> List:
"""
Args:
instances: predictions from current frame
prev_instances: predictions from previous frame
iou_all: 2D numpy array containing iou for each bbox pair
threshold: below the threshold, doesn't consider the pair of bbox is valid
Return:
List of bbox pairs
"""
bbox_pairs = []
for i in range(len(instances)):
for j in range(len(prev_instances)):
if iou_all[i, j] < threshold:
continue
bbox_pairs.append(
{
"idx": i,
"prev_idx": j,
"prev_id": prev_instances.ID[j],
"IoU": iou_all[i, j],
"prev_period": prev_instances.ID_period[j],
}
)
return bbox_pairs
LARGE_COST_VALUE = 100000
#!/usr/bin/env python3
# Copyright 2004-present Facebook. All Rights Reserved.
import numpy as np
from typing import List
from detectron2.config import CfgNode as CfgNode_
from detectron2.config import configurable
from detectron2.structures import Instances
from detectron2.structures.boxes import pairwise_iou
from detectron2.tracking.utils import LARGE_COST_VALUE, create_prediction_pairs
from .base_tracker import TRACKER_HEADS_REGISTRY
from .hungarian_tracker import BaseHungarianTracker
@TRACKER_HEADS_REGISTRY.register()
class VanillaHungarianBBoxIOUTracker(BaseHungarianTracker):
"""
Hungarian algo based tracker using bbox iou as metric
"""
@configurable
def __init__(
self,
*,
video_height: int,
video_width: int,
max_num_instances: int = 200,
max_lost_frame_count: int = 0,
min_box_rel_dim: float = 0.02,
min_instance_period: int = 1,
track_iou_threshold: float = 0.5,
**kwargs,
):
"""
Args:
video_height: height the video frame
video_width: width of the video frame
max_num_instances: maximum number of id allowed to be tracked
max_lost_frame_count: maximum number of frame an id can lost tracking
exceed this number, an id is considered as lost
forever
min_box_rel_dim: a percentage, smaller than this dimension, a bbox is
removed from tracking
min_instance_period: an instance will be shown after this number of period
since its first showing up in the video
track_iou_threshold: iou threshold, below this number a bbox pair is removed
from tracking
"""
super().__init__(
video_height=video_height,
video_width=video_width,
max_num_instances=max_num_instances,
max_lost_frame_count=max_lost_frame_count,
min_box_rel_dim=min_box_rel_dim,
min_instance_period=min_instance_period,
)
self._track_iou_threshold = track_iou_threshold
@classmethod
def from_config(cls, cfg: CfgNode_):
"""
Old style initialization using CfgNode
Args:
cfg: D2 CfgNode, config file
Return:
dictionary storing arguments for __init__ method
"""
assert "VIDEO_HEIGHT" in cfg.TRACKER_HEADS
assert "VIDEO_WIDTH" in cfg.TRACKER_HEADS
video_height = cfg.TRACKER_HEADS.get("VIDEO_HEIGHT")
video_width = cfg.TRACKER_HEADS.get("VIDEO_WIDTH")
max_num_instances = cfg.TRACKER_HEADS.get("MAX_NUM_INSTANCES", 200)
max_lost_frame_count = cfg.TRACKER_HEADS.get("MAX_LOST_FRAME_COUNT", 0)
min_box_rel_dim = cfg.TRACKER_HEADS.get("MIN_BOX_REL_DIM", 0.02)
min_instance_period = cfg.TRACKER_HEADS.get("MIN_INSTANCE_PERIOD", 1)
track_iou_threshold = cfg.TRACKER_HEADS.get("TRACK_IOU_THRESHOLD", 0.5)
return {
"_target_": "detectron2.tracking.vanilla_hungarian_bbox_iou_tracker.VanillaHungarianBBoxIOUTracker", # noqa
"video_height": video_height,
"video_width": video_width,
"max_num_instances": max_num_instances,
"max_lost_frame_count": max_lost_frame_count,
"min_box_rel_dim": min_box_rel_dim,
"min_instance_period": min_instance_period,
"track_iou_threshold": track_iou_threshold,
}
def build_cost_matrix(self, instances: Instances, prev_instances: Instances) -> np.ndarray:
"""
Build the cost matrix for assignment problem
(https://en.wikipedia.org/wiki/Assignment_problem)
Args:
instances: D2 Instances, for current frame predictions
prev_instances: D2 Instances, for previous frame predictions
Return:
the cost matrix in numpy array
"""
assert instances is not None and prev_instances is not None
# calculate IoU of all bbox pairs
iou_all = pairwise_iou(
boxes1=instances.pred_boxes,
boxes2=self._prev_instances.pred_boxes,
)
bbox_pairs = create_prediction_pairs(
instances, self._prev_instances, iou_all, threshold=self._track_iou_threshold
)
# assign large cost value to make sure pair below IoU threshold won't be matched
cost_matrix = np.full((len(instances), len(prev_instances)), LARGE_COST_VALUE)
return self.assign_cost_matrix_values(cost_matrix, bbox_pairs)
def assign_cost_matrix_values(self, cost_matrix: np.ndarray, bbox_pairs: List) -> np.ndarray:
"""
Based on IoU for each pair of bbox, assign the associated value in cost matrix
Args:
cost_matrix: np.ndarray, initialized 2D array with target dimensions
bbox_pairs: list of bbox pair, in each pair, iou value is stored
Return:
np.ndarray, cost_matrix with assigned values
"""
for pair in bbox_pairs:
# assign -1 for IoU above threshold pairs, algorithms will minimize cost
cost_matrix[pair["idx"]][pair["prev_idx"]] = -1
return cost_matrix
# Utility functions
This folder contain utility functions that are not used in the
core library, but are useful for building models or training
code using the config system.
# Copyright (c) Facebook, Inc. and its affiliates.
# -*- coding: utf-8 -*-
import typing
from typing import Any, List
import fvcore
from fvcore.nn import activation_count, flop_count, parameter_count, parameter_count_table
from torch import nn
from detectron2.export import TracingAdapter
__all__ = [
"activation_count_operators",
"flop_count_operators",
"parameter_count_table",
"parameter_count",
"FlopCountAnalysis",
]
FLOPS_MODE = "flops"
ACTIVATIONS_MODE = "activations"
# Some extra ops to ignore from counting, including elementwise and reduction ops
_IGNORED_OPS = {
"aten::add",
"aten::add_",
"aten::argmax",
"aten::argsort",
"aten::batch_norm",
"aten::constant_pad_nd",
"aten::div",
"aten::div_",
"aten::exp",
"aten::log2",
"aten::max_pool2d",
"aten::meshgrid",
"aten::mul",
"aten::mul_",
"aten::neg",
"aten::nonzero_numpy",
"aten::reciprocal",
"aten::repeat_interleave",
"aten::rsub",
"aten::sigmoid",
"aten::sigmoid_",
"aten::softmax",
"aten::sort",
"aten::sqrt",
"aten::sub",
"torchvision::nms", # TODO estimate flop for nms
}
class FlopCountAnalysis(fvcore.nn.FlopCountAnalysis):
"""
Same as :class:`fvcore.nn.FlopCountAnalysis`, but supports detectron2 models.
"""
def __init__(self, model, inputs):
"""
Args:
model (nn.Module):
inputs (Any): inputs of the given model. Does not have to be tuple of tensors.
"""
wrapper = TracingAdapter(model, inputs, allow_non_tensor=True)
super().__init__(wrapper, wrapper.flattened_inputs)
self.set_op_handle(**{k: None for k in _IGNORED_OPS})
def flop_count_operators(model: nn.Module, inputs: list) -> typing.DefaultDict[str, float]:
"""
Implement operator-level flops counting using jit.
This is a wrapper of :func:`fvcore.nn.flop_count` and adds supports for standard
detection models in detectron2.
Please use :class:`FlopCountAnalysis` for more advanced functionalities.
Note:
The function runs the input through the model to compute flops.
The flops of a detection model is often input-dependent, for example,
the flops of box & mask head depends on the number of proposals &
the number of detected objects.
Therefore, the flops counting using a single input may not accurately
reflect the computation cost of a model. It's recommended to average
across a number of inputs.
Args:
model: a detectron2 model that takes `list[dict]` as input.
inputs (list[dict]): inputs to model, in detectron2's standard format.
Only "image" key will be used.
supported_ops (dict[str, Handle]): see documentation of :func:`fvcore.nn.flop_count`
Returns:
Counter: Gflop count per operator
"""
old_train = model.training
model.eval()
ret = FlopCountAnalysis(model, inputs).by_operator()
model.train(old_train)
return {k: v / 1e9 for k, v in ret.items()}
def activation_count_operators(
model: nn.Module, inputs: list, **kwargs
) -> typing.DefaultDict[str, float]:
"""
Implement operator-level activations counting using jit.
This is a wrapper of fvcore.nn.activation_count, that supports standard detection models
in detectron2.
Note:
The function runs the input through the model to compute activations.
The activations of a detection model is often input-dependent, for example,
the activations of box & mask head depends on the number of proposals &
the number of detected objects.
Args:
model: a detectron2 model that takes `list[dict]` as input.
inputs (list[dict]): inputs to model, in detectron2's standard format.
Only "image" key will be used.
Returns:
Counter: activation count per operator
"""
return _wrapper_count_operators(model=model, inputs=inputs, mode=ACTIVATIONS_MODE, **kwargs)
def _wrapper_count_operators(
model: nn.Module, inputs: list, mode: str, **kwargs
) -> typing.DefaultDict[str, float]:
# ignore some ops
supported_ops = {k: lambda *args, **kwargs: {} for k in _IGNORED_OPS}
supported_ops.update(kwargs.pop("supported_ops", {}))
kwargs["supported_ops"] = supported_ops
assert len(inputs) == 1, "Please use batch size=1"
tensor_input = inputs[0]["image"]
inputs = [{"image": tensor_input}] # remove other keys, in case there are any
old_train = model.training
if isinstance(model, (nn.parallel.distributed.DistributedDataParallel, nn.DataParallel)):
model = model.module
wrapper = TracingAdapter(model, inputs)
wrapper.eval()
if mode == FLOPS_MODE:
ret = flop_count(wrapper, (tensor_input,), **kwargs)
elif mode == ACTIVATIONS_MODE:
ret = activation_count(wrapper, (tensor_input,), **kwargs)
else:
raise NotImplementedError("Count for mode {} is not supported yet.".format(mode))
# compatible with change in fvcore
if isinstance(ret, tuple):
ret = ret[0]
model.train(old_train)
return ret
def find_unused_parameters(model: nn.Module, inputs: Any) -> List[str]:
"""
Given a model, find parameters that do not contribute
to the loss.
Args:
model: a model in training mode that returns losses
inputs: argument or a tuple of arguments. Inputs of the model
Returns:
list[str]: the name of unused parameters
"""
assert model.training
for _, prm in model.named_parameters():
prm.grad = None
if isinstance(inputs, tuple):
losses = model(*inputs)
else:
losses = model(inputs)
if isinstance(losses, dict):
losses = sum(losses.values())
losses.backward()
unused: List[str] = []
for name, prm in model.named_parameters():
if prm.grad is None:
unused.append(name)
prm.grad = None
return unused
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment