Commit 54a066bf authored by mashun1's avatar mashun1
Browse files

ootdiffusion

parents
Pipeline #1004 canceled with stages
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
from __future__ import division
from typing import Any, List, Sequence, Tuple, Union
import torch
from torch.nn import functional as F
class ImageList(object):
"""
Structure that holds a list of images (of possibly
varying sizes) as a single tensor.
This works by padding the images to the same size,
and storing in a field the original sizes of each image
Attributes:
image_sizes (list[tuple[int, int]]): each tuple is (h, w)
"""
def __init__(self, tensor: torch.Tensor, image_sizes: List[Tuple[int, int]]):
"""
Arguments:
tensor (Tensor): of shape (N, H, W) or (N, C_1, ..., C_K, H, W) where K >= 1
image_sizes (list[tuple[int, int]]): Each tuple is (h, w). It can
be smaller than (H, W) due to padding.
"""
self.tensor = tensor
self.image_sizes = image_sizes
def __len__(self) -> int:
return len(self.image_sizes)
def __getitem__(self, idx: Union[int, slice]) -> torch.Tensor:
"""
Access the individual image in its original size.
Returns:
Tensor: an image of shape (H, W) or (C_1, ..., C_K, H, W) where K >= 1
"""
size = self.image_sizes[idx]
return self.tensor[idx, ..., : size[0], : size[1]] # type: ignore
def to(self, *args: Any, **kwargs: Any) -> "ImageList":
cast_tensor = self.tensor.to(*args, **kwargs)
return ImageList(cast_tensor, self.image_sizes)
@property
def device(self) -> torch.device:
return self.tensor.device
@staticmethod
def from_tensors(
tensors: Sequence[torch.Tensor], size_divisibility: int = 0, pad_value: float = 0.0
) -> "ImageList":
"""
Args:
tensors: a tuple or list of `torch.Tensors`, each of shape (Hi, Wi) or
(C_1, ..., C_K, Hi, Wi) where K >= 1. The Tensors will be padded
to the same shape with `pad_value`.
size_divisibility (int): If `size_divisibility > 0`, add padding to ensure
the common height and width is divisible by `size_divisibility`.
This depends on the model and many models need a divisibility of 32.
pad_value (float): value to pad
Returns:
an `ImageList`.
"""
assert len(tensors) > 0
assert isinstance(tensors, (tuple, list))
for t in tensors:
assert isinstance(t, torch.Tensor), type(t)
assert t.shape[1:-2] == tensors[0].shape[1:-2], t.shape
# per dimension maximum (H, W) or (C_1, ..., C_K, H, W) where K >= 1 among all tensors
max_size = (
# In tracing mode, x.shape[i] is Tensor, and should not be converted
# to int: this will cause the traced graph to have hard-coded shapes.
# Instead we should make max_size a Tensor that depends on these tensors.
# Using torch.stack twice seems to be the best way to convert
# list[list[ScalarTensor]] to a Tensor
torch.stack(
[
torch.stack([torch.as_tensor(dim) for dim in size])
for size in [tuple(img.shape) for img in tensors]
]
)
.max(0)
.values
)
if size_divisibility > 0:
stride = size_divisibility
# the last two dims are H,W, both subject to divisibility requirement
max_size = torch.cat([max_size[:-2], (max_size[-2:] + (stride - 1)) // stride * stride])
image_sizes = [tuple(im.shape[-2:]) for im in tensors]
if len(tensors) == 1:
# This seems slightly (2%) faster.
# TODO: check whether it's faster for multiple images as well
image_size = image_sizes[0]
padding_size = [0, max_size[-1] - image_size[1], 0, max_size[-2] - image_size[0]]
if all(x == 0 for x in padding_size): # https://github.com/pytorch/pytorch/issues/31734
batched_imgs = tensors[0].unsqueeze(0)
else:
padded = F.pad(tensors[0], padding_size, value=pad_value)
batched_imgs = padded.unsqueeze_(0)
else:
# max_size can be a tensor in tracing mode, therefore use tuple()
batch_shape = (len(tensors),) + tuple(max_size)
batched_imgs = tensors[0].new_full(batch_shape, pad_value)
for img, pad_img in zip(tensors, batched_imgs):
pad_img[..., : img.shape[-2], : img.shape[-1]].copy_(img)
return ImageList(batched_imgs.contiguous(), image_sizes)
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import itertools
from typing import Any, Dict, List, Tuple, Union
import torch
class Instances:
"""
This class represents a list of instances in an image.
It stores the attributes of instances (e.g., boxes, masks, labels, scores) as "fields".
All fields must have the same ``__len__`` which is the number of instances.
All other (non-field) attributes of this class are considered private:
they must start with '_' and are not modifiable by a user.
Some basic usage:
1. Set/Get a field:
.. code-block:: python
instances.gt_boxes = Boxes(...)
print(instances.pred_masks) # a tensor of shape (N, H, W)
print('gt_masks' in instances)
2. ``len(instances)`` returns the number of instances
3. Indexing: ``instances[indices]`` will apply the indexing on all the fields
and returns a new :class:`Instances`.
Typically, ``indices`` is a integer vector of indices,
or a binary mask of length ``num_instances``,
"""
def __init__(self, image_size: Tuple[int, int], **kwargs: Any):
"""
Args:
image_size (height, width): the spatial size of the image.
kwargs: fields to add to this `Instances`.
"""
self._image_size = image_size
self._fields: Dict[str, Any] = {}
for k, v in kwargs.items():
self.set(k, v)
@property
def image_size(self) -> Tuple[int, int]:
"""
Returns:
tuple: height, width
"""
return self._image_size
def __setattr__(self, name: str, val: Any) -> None:
if name.startswith("_"):
super().__setattr__(name, val)
else:
self.set(name, val)
def __getattr__(self, name: str) -> Any:
if name == "_fields" or name not in self._fields:
raise AttributeError("Cannot find field '{}' in the given Instances!".format(name))
return self._fields[name]
def set(self, name: str, value: Any) -> None:
"""
Set the field named `name` to `value`.
The length of `value` must be the number of instances,
and must agree with other existing fields in this object.
"""
data_len = len(value)
if len(self._fields):
assert (
len(self) == data_len
), "Adding a field of length {} to a Instances of length {}".format(data_len, len(self))
self._fields[name] = value
def has(self, name: str) -> bool:
"""
Returns:
bool: whether the field called `name` exists.
"""
return name in self._fields
def remove(self, name: str) -> None:
"""
Remove the field called `name`.
"""
del self._fields[name]
def get(self, name: str) -> Any:
"""
Returns the field called `name`.
"""
return self._fields[name]
def get_fields(self) -> Dict[str, Any]:
"""
Returns:
dict: a dict which maps names (str) to data of the fields
Modifying the returned dict will modify this instance.
"""
return self._fields
# Tensor-like methods
def to(self, device: str) -> "Instances":
"""
Returns:
Instances: all fields are called with a `to(device)`, if the field has this method.
"""
ret = Instances(self._image_size)
for k, v in self._fields.items():
if hasattr(v, "to"):
v = v.to(device)
ret.set(k, v)
return ret
def __getitem__(self, item: Union[int, slice, torch.BoolTensor]) -> "Instances":
"""
Args:
item: an index-like object and will be used to index all the fields.
Returns:
If `item` is a string, return the data in the corresponding field.
Otherwise, returns an `Instances` where all fields are indexed by `item`.
"""
if type(item) == int:
if item >= len(self) or item < -len(self):
raise IndexError("Instances index out of range!")
else:
item = slice(item, None, len(self))
ret = Instances(self._image_size)
for k, v in self._fields.items():
ret.set(k, v[item])
return ret
def __len__(self) -> int:
for v in self._fields.values():
return len(v)
raise NotImplementedError("Empty Instances does not support __len__!")
def __iter__(self):
raise NotImplementedError("`Instances` object is not iterable!")
@staticmethod
def cat(instance_lists: List["Instances"]) -> "Instances":
"""
Args:
instance_lists (list[Instances])
Returns:
Instances
"""
assert all(isinstance(i, Instances) for i in instance_lists)
assert len(instance_lists) > 0
if len(instance_lists) == 1:
return instance_lists[0]
image_size = instance_lists[0].image_size
for i in instance_lists[1:]:
assert i.image_size == image_size
ret = Instances(image_size)
for k in instance_lists[0]._fields.keys():
values = [i.get(k) for i in instance_lists]
v0 = values[0]
if isinstance(v0, torch.Tensor):
values = torch.cat(values, dim=0)
elif isinstance(v0, list):
values = list(itertools.chain(*values))
elif hasattr(type(v0), "cat"):
values = type(v0).cat(values)
else:
raise ValueError("Unsupported type {} for concatenation".format(type(v0)))
ret.set(k, values)
return ret
def __str__(self) -> str:
s = self.__class__.__name__ + "("
s += "num_instances={}, ".format(len(self))
s += "image_height={}, ".format(self._image_size[0])
s += "image_width={}, ".format(self._image_size[1])
s += "fields=[{}])".format(", ".join((f"{k}: {v}" for k, v in self._fields.items())))
return s
__repr__ = __str__
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import numpy as np
from typing import Any, List, Tuple, Union
import torch
from detectron2.layers import interpolate
class Keypoints:
"""
Stores keypoint annotation data. GT Instances have a `gt_keypoints` property
containing the x,y location and visibility flag of each keypoint. This tensor has shape
(N, K, 3) where N is the number of instances and K is the number of keypoints per instance.
The visibility flag follows the COCO format and must be one of three integers:
* v=0: not labeled (in which case x=y=0)
* v=1: labeled but not visible
* v=2: labeled and visible
"""
def __init__(self, keypoints: Union[torch.Tensor, np.ndarray, List[List[float]]]):
"""
Arguments:
keypoints: A Tensor, numpy array, or list of the x, y, and visibility of each keypoint.
The shape should be (N, K, 3) where N is the number of
instances, and K is the number of keypoints per instance.
"""
device = keypoints.device if isinstance(keypoints, torch.Tensor) else torch.device("cpu")
keypoints = torch.as_tensor(keypoints, dtype=torch.float32, device=device)
assert keypoints.dim() == 3 and keypoints.shape[2] == 3, keypoints.shape
self.tensor = keypoints
def __len__(self) -> int:
return self.tensor.size(0)
def to(self, *args: Any, **kwargs: Any) -> "Keypoints":
return type(self)(self.tensor.to(*args, **kwargs))
@property
def device(self) -> torch.device:
return self.tensor.device
def to_heatmap(self, boxes: torch.Tensor, heatmap_size: int) -> torch.Tensor:
"""
Arguments:
boxes: Nx4 tensor, the boxes to draw the keypoints to
Returns:
heatmaps:
A tensor of shape (N, K) containing an integer spatial label
in the range [0, heatmap_size**2 - 1] for each keypoint in the input.
valid:
A tensor of shape (N, K) containing whether each keypoint is in the roi or not.
"""
return _keypoints_to_heatmap(self.tensor, boxes, heatmap_size)
def __getitem__(self, item: Union[int, slice, torch.BoolTensor]) -> "Keypoints":
"""
Create a new `Keypoints` by indexing on this `Keypoints`.
The following usage are allowed:
1. `new_kpts = kpts[3]`: return a `Keypoints` which contains only one instance.
2. `new_kpts = kpts[2:10]`: return a slice of key points.
3. `new_kpts = kpts[vector]`, where vector is a torch.ByteTensor
with `length = len(kpts)`. Nonzero elements in the vector will be selected.
Note that the returned Keypoints might share storage with this Keypoints,
subject to Pytorch's indexing semantics.
"""
if isinstance(item, int):
return Keypoints([self.tensor[item]])
return Keypoints(self.tensor[item])
def __repr__(self) -> str:
s = self.__class__.__name__ + "("
s += "num_instances={})".format(len(self.tensor))
return s
# TODO make this nicer, this is a direct translation from C2 (but removing the inner loop)
def _keypoints_to_heatmap(
keypoints: torch.Tensor, rois: torch.Tensor, heatmap_size: int
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Encode keypoint locations into a target heatmap for use in SoftmaxWithLoss across space.
Maps keypoints from the half-open interval [x1, x2) on continuous image coordinates to the
closed interval [0, heatmap_size - 1] on discrete image coordinates. We use the
continuous-discrete conversion from Heckbert 1990 ("What is the coordinate of a pixel?"):
d = floor(c) and c = d + 0.5, where d is a discrete coordinate and c is a continuous coordinate.
Arguments:
keypoints: tensor of keypoint locations in of shape (N, K, 3).
rois: Nx4 tensor of rois in xyxy format
heatmap_size: integer side length of square heatmap.
Returns:
heatmaps: A tensor of shape (N, K) containing an integer spatial label
in the range [0, heatmap_size**2 - 1] for each keypoint in the input.
valid: A tensor of shape (N, K) containing whether each keypoint is in
the roi or not.
"""
if rois.numel() == 0:
return rois.new().long(), rois.new().long()
offset_x = rois[:, 0]
offset_y = rois[:, 1]
scale_x = heatmap_size / (rois[:, 2] - rois[:, 0])
scale_y = heatmap_size / (rois[:, 3] - rois[:, 1])
offset_x = offset_x[:, None]
offset_y = offset_y[:, None]
scale_x = scale_x[:, None]
scale_y = scale_y[:, None]
x = keypoints[..., 0]
y = keypoints[..., 1]
x_boundary_inds = x == rois[:, 2][:, None]
y_boundary_inds = y == rois[:, 3][:, None]
x = (x - offset_x) * scale_x
x = x.floor().long()
y = (y - offset_y) * scale_y
y = y.floor().long()
x[x_boundary_inds] = heatmap_size - 1
y[y_boundary_inds] = heatmap_size - 1
valid_loc = (x >= 0) & (y >= 0) & (x < heatmap_size) & (y < heatmap_size)
vis = keypoints[..., 2] > 0
valid = (valid_loc & vis).long()
lin_ind = y * heatmap_size + x
heatmaps = lin_ind * valid
return heatmaps, valid
@torch.no_grad()
def heatmaps_to_keypoints(maps: torch.Tensor, rois: torch.Tensor) -> torch.Tensor:
"""
Extract predicted keypoint locations from heatmaps.
Args:
maps (Tensor): (#ROIs, #keypoints, POOL_H, POOL_W). The predicted heatmap of logits for
each ROI and each keypoint.
rois (Tensor): (#ROIs, 4). The box of each ROI.
Returns:
Tensor of shape (#ROIs, #keypoints, 4) with the last dimension corresponding to
(x, y, logit, score) for each keypoint.
When converting discrete pixel indices in an NxN image to a continuous keypoint coordinate,
we maintain consistency with :meth:`Keypoints.to_heatmap` by using the conversion from
Heckbert 1990: c = d + 0.5, where d is a discrete coordinate and c is a continuous coordinate.
"""
offset_x = rois[:, 0]
offset_y = rois[:, 1]
widths = (rois[:, 2] - rois[:, 0]).clamp(min=1)
heights = (rois[:, 3] - rois[:, 1]).clamp(min=1)
widths_ceil = widths.ceil()
heights_ceil = heights.ceil()
num_rois, num_keypoints = maps.shape[:2]
xy_preds = maps.new_zeros(rois.shape[0], num_keypoints, 4)
width_corrections = widths / widths_ceil
height_corrections = heights / heights_ceil
keypoints_idx = torch.arange(num_keypoints, device=maps.device)
for i in range(num_rois):
outsize = (int(heights_ceil[i]), int(widths_ceil[i]))
roi_map = interpolate(maps[[i]], size=outsize, mode="bicubic", align_corners=False).squeeze(
0
) # #keypoints x H x W
# softmax over the spatial region
max_score, _ = roi_map.view(num_keypoints, -1).max(1)
max_score = max_score.view(num_keypoints, 1, 1)
tmp_full_resolution = (roi_map - max_score).exp_()
tmp_pool_resolution = (maps[i] - max_score).exp_()
# Produce scores over the region H x W, but normalize with POOL_H x POOL_W,
# so that the scores of objects of different absolute sizes will be more comparable
roi_map_scores = tmp_full_resolution / tmp_pool_resolution.sum((1, 2), keepdim=True)
w = roi_map.shape[2]
pos = roi_map.view(num_keypoints, -1).argmax(1)
x_int = pos % w
y_int = (pos - x_int) // w
assert (
roi_map_scores[keypoints_idx, y_int, x_int]
== roi_map_scores.view(num_keypoints, -1).max(1)[0]
).all()
x = (x_int.float() + 0.5) * width_corrections[i]
y = (y_int.float() + 0.5) * height_corrections[i]
xy_preds[i, :, 0] = x + offset_x[i]
xy_preds[i, :, 1] = y + offset_y[i]
xy_preds[i, :, 2] = roi_map[keypoints_idx, y_int, x_int]
xy_preds[i, :, 3] = roi_map_scores[keypoints_idx, y_int, x_int]
return xy_preds
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import copy
import itertools
import numpy as np
from typing import Any, Iterator, List, Union
import pycocotools.mask as mask_utils
import torch
from detectron2.layers.roi_align import ROIAlign
from .boxes import Boxes
def polygon_area(x, y):
# Using the shoelace formula
# https://stackoverflow.com/questions/24467972/calculate-area-of-polygon-given-x-y-coordinates
return 0.5 * np.abs(np.dot(x, np.roll(y, 1)) - np.dot(y, np.roll(x, 1)))
def polygons_to_bitmask(polygons: List[np.ndarray], height: int, width: int) -> np.ndarray:
"""
Args:
polygons (list[ndarray]): each array has shape (Nx2,)
height, width (int)
Returns:
ndarray: a bool mask of shape (height, width)
"""
assert len(polygons) > 0, "COCOAPI does not support empty polygons"
rles = mask_utils.frPyObjects(polygons, height, width)
rle = mask_utils.merge(rles)
return mask_utils.decode(rle).astype(np.bool)
def rasterize_polygons_within_box(
polygons: List[np.ndarray], box: np.ndarray, mask_size: int
) -> torch.Tensor:
"""
Rasterize the polygons into a mask image and
crop the mask content in the given box.
The cropped mask is resized to (mask_size, mask_size).
This function is used when generating training targets for mask head in Mask R-CNN.
Given original ground-truth masks for an image, new ground-truth mask
training targets in the size of `mask_size x mask_size`
must be provided for each predicted box. This function will be called to
produce such targets.
Args:
polygons (list[ndarray[float]]): a list of polygons, which represents an instance.
box: 4-element numpy array
mask_size (int):
Returns:
Tensor: BoolTensor of shape (mask_size, mask_size)
"""
# 1. Shift the polygons w.r.t the boxes
w, h = box[2] - box[0], box[3] - box[1]
polygons = copy.deepcopy(polygons)
for p in polygons:
p[0::2] = p[0::2] - box[0]
p[1::2] = p[1::2] - box[1]
# 2. Rescale the polygons to the new box size
# max() to avoid division by small number
ratio_h = mask_size / max(h, 0.1)
ratio_w = mask_size / max(w, 0.1)
if ratio_h == ratio_w:
for p in polygons:
p *= ratio_h
else:
for p in polygons:
p[0::2] *= ratio_w
p[1::2] *= ratio_h
# 3. Rasterize the polygons with coco api
mask = polygons_to_bitmask(polygons, mask_size, mask_size)
mask = torch.from_numpy(mask)
return mask
class BitMasks:
"""
This class stores the segmentation masks for all objects in one image, in
the form of bitmaps.
Attributes:
tensor: bool Tensor of N,H,W, representing N instances in the image.
"""
def __init__(self, tensor: Union[torch.Tensor, np.ndarray]):
"""
Args:
tensor: bool Tensor of N,H,W, representing N instances in the image.
"""
device = tensor.device if isinstance(tensor, torch.Tensor) else torch.device("cpu")
tensor = torch.as_tensor(tensor, dtype=torch.bool, device=device)
assert tensor.dim() == 3, tensor.size()
self.image_size = tensor.shape[1:]
self.tensor = tensor
def to(self, device: str) -> "BitMasks":
return BitMasks(self.tensor.to(device))
@property
def device(self) -> torch.device:
return self.tensor.device
def __getitem__(self, item: Union[int, slice, torch.BoolTensor]) -> "BitMasks":
"""
Returns:
BitMasks: Create a new :class:`BitMasks` by indexing.
The following usage are allowed:
1. `new_masks = masks[3]`: return a `BitMasks` which contains only one mask.
2. `new_masks = masks[2:10]`: return a slice of masks.
3. `new_masks = masks[vector]`, where vector is a torch.BoolTensor
with `length = len(masks)`. Nonzero elements in the vector will be selected.
Note that the returned object might share storage with this object,
subject to Pytorch's indexing semantics.
"""
if isinstance(item, int):
return BitMasks(self.tensor[item].view(1, -1))
m = self.tensor[item]
assert m.dim() == 3, "Indexing on BitMasks with {} returns a tensor with shape {}!".format(
item, m.shape
)
return BitMasks(m)
def __iter__(self) -> torch.Tensor:
yield from self.tensor
def __repr__(self) -> str:
s = self.__class__.__name__ + "("
s += "num_instances={})".format(len(self.tensor))
return s
def __len__(self) -> int:
return self.tensor.shape[0]
def nonempty(self) -> torch.Tensor:
"""
Find masks that are non-empty.
Returns:
Tensor: a BoolTensor which represents
whether each mask is empty (False) or non-empty (True).
"""
return self.tensor.flatten(1).any(dim=1)
@staticmethod
def from_polygon_masks(
polygon_masks: Union["PolygonMasks", List[List[np.ndarray]]], height: int, width: int
) -> "BitMasks":
"""
Args:
polygon_masks (list[list[ndarray]] or PolygonMasks)
height, width (int)
"""
if isinstance(polygon_masks, PolygonMasks):
polygon_masks = polygon_masks.polygons
masks = [polygons_to_bitmask(p, height, width) for p in polygon_masks]
return BitMasks(torch.stack([torch.from_numpy(x) for x in masks]))
def crop_and_resize(self, boxes: torch.Tensor, mask_size: int) -> torch.Tensor:
"""
Crop each bitmask by the given box, and resize results to (mask_size, mask_size).
This can be used to prepare training targets for Mask R-CNN.
It has less reconstruction error compared to rasterization with polygons.
However we observe no difference in accuracy,
but BitMasks requires more memory to store all the masks.
Args:
boxes (Tensor): Nx4 tensor storing the boxes for each mask
mask_size (int): the size of the rasterized mask.
Returns:
Tensor:
A bool tensor of shape (N, mask_size, mask_size), where
N is the number of predicted boxes for this image.
"""
assert len(boxes) == len(self), "{} != {}".format(len(boxes), len(self))
device = self.tensor.device
batch_inds = torch.arange(len(boxes), device=device).to(dtype=boxes.dtype)[:, None]
rois = torch.cat([batch_inds, boxes], dim=1) # Nx5
bit_masks = self.tensor.to(dtype=torch.float32)
rois = rois.to(device=device)
output = (
ROIAlign((mask_size, mask_size), 1.0, 0, aligned=True)
.forward(bit_masks[:, None, :, :], rois)
.squeeze(1)
)
output = output >= 0.5
return output
def get_bounding_boxes(self) -> None:
# not needed now
raise NotImplementedError
@staticmethod
def cat(bitmasks_list: List["BitMasks"]) -> "BitMasks":
"""
Concatenates a list of BitMasks into a single BitMasks
Arguments:
bitmasks_list (list[BitMasks])
Returns:
BitMasks: the concatenated BitMasks
"""
assert isinstance(bitmasks_list, (list, tuple))
assert len(bitmasks_list) > 0
assert all(isinstance(bitmask, BitMasks) for bitmask in bitmasks_list)
cat_bitmasks = type(bitmasks_list[0])(torch.cat([bm.tensor for bm in bitmasks_list], dim=0))
return cat_bitmasks
class PolygonMasks:
"""
This class stores the segmentation masks for all objects in one image, in the form of polygons.
Attributes:
polygons: list[list[ndarray]]. Each ndarray is a float64 vector representing a polygon.
"""
def __init__(self, polygons: List[List[Union[torch.Tensor, np.ndarray]]]):
"""
Arguments:
polygons (list[list[np.ndarray]]): The first
level of the list correspond to individual instances,
the second level to all the polygons that compose the
instance, and the third level to the polygon coordinates.
The third level array should have the format of
[x0, y0, x1, y1, ..., xn, yn] (n >= 3).
"""
assert isinstance(polygons, list), (
"Cannot create PolygonMasks: Expect a list of list of polygons per image. "
"Got '{}' instead.".format(type(polygons))
)
def _make_array(t: Union[torch.Tensor, np.ndarray]) -> np.ndarray:
# Use float64 for higher precision, because why not?
# Always put polygons on CPU (self.to is a no-op) since they
# are supposed to be small tensors.
# May need to change this assumption if GPU placement becomes useful
if isinstance(t, torch.Tensor):
t = t.cpu().numpy()
return np.asarray(t).astype("float64")
def process_polygons(
polygons_per_instance: List[Union[torch.Tensor, np.ndarray]]
) -> List[np.ndarray]:
assert isinstance(polygons_per_instance, list), (
"Cannot create polygons: Expect a list of polygons per instance. "
"Got '{}' instead.".format(type(polygons_per_instance))
)
# transform the polygon to a tensor
polygons_per_instance = [_make_array(p) for p in polygons_per_instance]
for polygon in polygons_per_instance:
assert len(polygon) % 2 == 0 and len(polygon) >= 6
return polygons_per_instance
self.polygons: List[List[np.ndarray]] = [
process_polygons(polygons_per_instance) for polygons_per_instance in polygons
]
def to(self, *args: Any, **kwargs: Any) -> "PolygonMasks":
return self
@property
def device(self) -> torch.device:
return torch.device("cpu")
def get_bounding_boxes(self) -> Boxes:
"""
Returns:
Boxes: tight bounding boxes around polygon masks.
"""
boxes = torch.zeros(len(self.polygons), 4, dtype=torch.float32)
for idx, polygons_per_instance in enumerate(self.polygons):
minxy = torch.as_tensor([float("inf"), float("inf")], dtype=torch.float32)
maxxy = torch.zeros(2, dtype=torch.float32)
for polygon in polygons_per_instance:
coords = torch.from_numpy(polygon).view(-1, 2).to(dtype=torch.float32)
minxy = torch.min(minxy, torch.min(coords, dim=0).values)
maxxy = torch.max(maxxy, torch.max(coords, dim=0).values)
boxes[idx, :2] = minxy
boxes[idx, 2:] = maxxy
return Boxes(boxes)
def nonempty(self) -> torch.Tensor:
"""
Find masks that are non-empty.
Returns:
Tensor:
a BoolTensor which represents whether each mask is empty (False) or not (True).
"""
keep = [1 if len(polygon) > 0 else 0 for polygon in self.polygons]
return torch.from_numpy(np.asarray(keep, dtype=np.bool))
def __getitem__(self, item: Union[int, slice, List[int], torch.BoolTensor]) -> "PolygonMasks":
"""
Support indexing over the instances and return a `PolygonMasks` object.
`item` can be:
1. An integer. It will return an object with only one instance.
2. A slice. It will return an object with the selected instances.
3. A list[int]. It will return an object with the selected instances,
correpsonding to the indices in the list.
4. A vector mask of type BoolTensor, whose length is num_instances.
It will return an object with the instances whose mask is nonzero.
"""
if isinstance(item, int):
selected_polygons = [self.polygons[item]]
elif isinstance(item, slice):
selected_polygons = self.polygons[item]
elif isinstance(item, list):
selected_polygons = [self.polygons[i] for i in item]
elif isinstance(item, torch.Tensor):
# Polygons is a list, so we have to move the indices back to CPU.
if item.dtype == torch.bool:
assert item.dim() == 1, item.shape
item = item.nonzero().squeeze(1).cpu().numpy().tolist()
elif item.dtype in [torch.int32, torch.int64]:
item = item.cpu().numpy().tolist()
else:
raise ValueError("Unsupported tensor dtype={} for indexing!".format(item.dtype))
selected_polygons = [self.polygons[i] for i in item]
return PolygonMasks(selected_polygons)
def __iter__(self) -> Iterator[List[np.ndarray]]:
"""
Yields:
list[ndarray]: the polygons for one instance.
Each Tensor is a float64 vector representing a polygon.
"""
return iter(self.polygons)
def __repr__(self) -> str:
s = self.__class__.__name__ + "("
s += "num_instances={})".format(len(self.polygons))
return s
def __len__(self) -> int:
return len(self.polygons)
def crop_and_resize(self, boxes: torch.Tensor, mask_size: int) -> torch.Tensor:
"""
Crop each mask by the given box, and resize results to (mask_size, mask_size).
This can be used to prepare training targets for Mask R-CNN.
Args:
boxes (Tensor): Nx4 tensor storing the boxes for each mask
mask_size (int): the size of the rasterized mask.
Returns:
Tensor: A bool tensor of shape (N, mask_size, mask_size), where
N is the number of predicted boxes for this image.
"""
assert len(boxes) == len(self), "{} != {}".format(len(boxes), len(self))
device = boxes.device
# Put boxes on the CPU, as the polygon representation is not efficient GPU-wise
# (several small tensors for representing a single instance mask)
boxes = boxes.to(torch.device("cpu"))
results = [
rasterize_polygons_within_box(poly, box.numpy(), mask_size)
for poly, box in zip(self.polygons, boxes)
]
"""
poly: list[list[float]], the polygons for one instance
box: a tensor of shape (4,)
"""
if len(results) == 0:
return torch.empty(0, mask_size, mask_size, dtype=torch.bool, device=device)
return torch.stack(results, dim=0).to(device=device)
def area(self):
"""
Computes area of the mask.
Only works with Polygons, using the shoelace formula:
https://stackoverflow.com/questions/24467972/calculate-area-of-polygon-given-x-y-coordinates
Returns:
Tensor: a vector, area for each instance
"""
area = []
for polygons_per_instance in self.polygons:
area_per_instance = 0
for p in polygons_per_instance:
area_per_instance += polygon_area(p[0::2], p[1::2])
area.append(area_per_instance)
return torch.tensor(area)
@staticmethod
def cat(polymasks_list: List["PolygonMasks"]) -> "PolygonMasks":
"""
Concatenates a list of PolygonMasks into a single PolygonMasks
Arguments:
polymasks_list (list[PolygonMasks])
Returns:
PolygonMasks: the concatenated PolygonMasks
"""
assert isinstance(polymasks_list, (list, tuple))
assert len(polymasks_list) > 0
assert all(isinstance(polymask, PolygonMasks) for polymask in polymasks_list)
cat_polymasks = type(polymasks_list[0])(
list(itertools.chain.from_iterable(pm.polygons for pm in polymasks_list))
)
return cat_polymasks
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import math
from typing import Iterator, Union
import torch
from detectron2.layers.rotated_boxes import pairwise_iou_rotated
from .boxes import Boxes
class RotatedBoxes(Boxes):
"""
This structure stores a list of rotated boxes as a Nx5 torch.Tensor.
It supports some common methods about boxes
(`area`, `clip`, `nonempty`, etc),
and also behaves like a Tensor
(support indexing, `to(device)`, `.device`, and iteration over all boxes)
"""
def __init__(self, tensor: torch.Tensor):
"""
Args:
tensor (Tensor[float]): a Nx5 matrix. Each row is
(x_center, y_center, width, height, angle),
in which angle is represented in degrees.
While there's no strict range restriction for it,
the recommended principal range is between [-180, 180) degrees.
Assume we have a horizontal box B = (x_center, y_center, width, height),
where width is along the x-axis and height is along the y-axis.
The rotated box B_rot (x_center, y_center, width, height, angle)
can be seen as:
1. When angle == 0:
B_rot == B
2. When angle > 0:
B_rot is obtained by rotating B w.r.t its center by :math:`|angle|` degrees CCW;
3. When angle < 0:
B_rot is obtained by rotating B w.r.t its center by :math:`|angle|` degrees CW.
Mathematically, since the right-handed coordinate system for image space
is (y, x), where y is top->down and x is left->right, the 4 vertices of the
rotated rectangle :math:`(yr_i, xr_i)` (i = 1, 2, 3, 4) can be obtained from
the vertices of the horizontal rectangle (y_i, x_i) (i = 1, 2, 3, 4)
in the following way (:math:`\\theta = angle*\\pi/180` is the angle in radians,
(y_c, x_c) is the center of the rectangle):
.. math::
yr_i = \\cos(\\theta) (y_i - y_c) - \\sin(\\theta) (x_i - x_c) + y_c,
xr_i = \\sin(\\theta) (y_i - y_c) + \\cos(\\theta) (x_i - x_c) + x_c,
which is the standard rigid-body rotation transformation.
Intuitively, the angle is
(1) the rotation angle from y-axis in image space
to the height vector (top->down in the box's local coordinate system)
of the box in CCW, and
(2) the rotation angle from x-axis in image space
to the width vector (left->right in the box's local coordinate system)
of the box in CCW.
More intuitively, consider the following horizontal box ABCD represented
in (x1, y1, x2, y2): (3, 2, 7, 4),
covering the [3, 7] x [2, 4] region of the continuous coordinate system
which looks like this:
.. code:: none
O--------> x
|
| A---B
| | |
| D---C
|
v y
Note that each capital letter represents one 0-dimensional geometric point
instead of a 'square pixel' here.
In the example above, using (x, y) to represent a point we have:
.. math::
O = (0, 0), A = (3, 2), B = (7, 2), C = (7, 4), D = (3, 4)
We name vector AB = vector DC as the width vector in box's local coordinate system, and
vector AD = vector BC as the height vector in box's local coordinate system. Initially,
when angle = 0 degree, they're aligned with the positive directions of x-axis and y-axis
in the image space, respectively.
For better illustration, we denote the center of the box as E,
.. code:: none
O--------> x
|
| A---B
| | E |
| D---C
|
v y
where the center E = ((3+7)/2, (2+4)/2) = (5, 3).
Also,
.. math::
width = |AB| = |CD| = 7 - 3 = 4,
height = |AD| = |BC| = 4 - 2 = 2.
Therefore, the corresponding representation for the same shape in rotated box in
(x_center, y_center, width, height, angle) format is:
(5, 3, 4, 2, 0),
Now, let's consider (5, 3, 4, 2, 90), which is rotated by 90 degrees
CCW (counter-clockwise) by definition. It looks like this:
.. code:: none
O--------> x
| B-C
| | |
| |E|
| | |
| A-D
v y
The center E is still located at the same point (5, 3), while the vertices
ABCD are rotated by 90 degrees CCW with regard to E:
A = (4, 5), B = (4, 1), C = (6, 1), D = (6, 5)
Here, 90 degrees can be seen as the CCW angle to rotate from y-axis to
vector AD or vector BC (the top->down height vector in box's local coordinate system),
or the CCW angle to rotate from x-axis to vector AB or vector DC (the left->right
width vector in box's local coordinate system).
.. math::
width = |AB| = |CD| = 5 - 1 = 4,
height = |AD| = |BC| = 6 - 4 = 2.
Next, how about (5, 3, 4, 2, -90), which is rotated by 90 degrees CW (clockwise)
by definition? It looks like this:
.. code:: none
O--------> x
| D-A
| | |
| |E|
| | |
| C-B
v y
The center E is still located at the same point (5, 3), while the vertices
ABCD are rotated by 90 degrees CW with regard to E:
A = (6, 1), B = (6, 5), C = (4, 5), D = (4, 1)
.. math::
width = |AB| = |CD| = 5 - 1 = 4,
height = |AD| = |BC| = 6 - 4 = 2.
This covers exactly the same region as (5, 3, 4, 2, 90) does, and their IoU
will be 1. However, these two will generate different RoI Pooling results and
should not be treated as an identical box.
On the other hand, it's easy to see that (X, Y, W, H, A) is identical to
(X, Y, W, H, A+360N), for any integer N. For example (5, 3, 4, 2, 270) would be
identical to (5, 3, 4, 2, -90), because rotating the shape 270 degrees CCW is
equivalent to rotating the same shape 90 degrees CW.
We could rotate further to get (5, 3, 4, 2, 180), or (5, 3, 4, 2, -180):
.. code:: none
O--------> x
|
| C---D
| | E |
| B---A
|
v y
.. math::
A = (7, 4), B = (3, 4), C = (3, 2), D = (7, 2),
width = |AB| = |CD| = 7 - 3 = 4,
height = |AD| = |BC| = 4 - 2 = 2.
Finally, this is a very inaccurate (heavily quantized) illustration of
how (5, 3, 4, 2, 60) looks like in case anyone wonders:
.. code:: none
O--------> x
| B\
| / C
| /E /
| A /
| `D
v y
It's still a rectangle with center of (5, 3), width of 4 and height of 2,
but its angle (and thus orientation) is somewhere between
(5, 3, 4, 2, 0) and (5, 3, 4, 2, 90).
"""
device = tensor.device if isinstance(tensor, torch.Tensor) else torch.device("cpu")
tensor = torch.as_tensor(tensor, dtype=torch.float32, device=device)
if tensor.numel() == 0:
# Use reshape, so we don't end up creating a new tensor that does not depend on
# the inputs (and consequently confuses jit)
tensor = tensor.reshape((0, 5)).to(dtype=torch.float32, device=device)
assert tensor.dim() == 2 and tensor.size(-1) == 5, tensor.size()
self.tensor = tensor
def clone(self) -> "RotatedBoxes":
"""
Clone the RotatedBoxes.
Returns:
RotatedBoxes
"""
return RotatedBoxes(self.tensor.clone())
def to(self, device: str) -> "RotatedBoxes":
return RotatedBoxes(self.tensor.to(device))
def area(self) -> torch.Tensor:
"""
Computes the area of all the boxes.
Returns:
torch.Tensor: a vector with areas of each box.
"""
box = self.tensor
area = box[:, 2] * box[:, 3]
return area
def normalize_angles(self) -> None:
"""
Restrict angles to the range of [-180, 180) degrees
"""
self.tensor[:, 4] = (self.tensor[:, 4] + 180.0) % 360.0 - 180.0
def clip(self, box_size: Boxes.BoxSizeType, clip_angle_threshold: float = 1.0) -> None:
"""
Clip (in place) the boxes by limiting x coordinates to the range [0, width]
and y coordinates to the range [0, height].
For RRPN:
Only clip boxes that are almost horizontal with a tolerance of
clip_angle_threshold to maintain backward compatibility.
Rotated boxes beyond this threshold are not clipped for two reasons:
1. There are potentially multiple ways to clip a rotated box to make it
fit within the image.
2. It's tricky to make the entire rectangular box fit within the image
and still be able to not leave out pixels of interest.
Therefore we rely on ops like RoIAlignRotated to safely handle this.
Args:
box_size (height, width): The clipping box's size.
clip_angle_threshold:
Iff. abs(normalized(angle)) <= clip_angle_threshold (in degrees),
we do the clipping as horizontal boxes.
"""
h, w = box_size
# normalize angles to be within (-180, 180] degrees
self.normalize_angles()
idx = torch.where(torch.abs(self.tensor[:, 4]) <= clip_angle_threshold)[0]
# convert to (x1, y1, x2, y2)
x1 = self.tensor[idx, 0] - self.tensor[idx, 2] / 2.0
y1 = self.tensor[idx, 1] - self.tensor[idx, 3] / 2.0
x2 = self.tensor[idx, 0] + self.tensor[idx, 2] / 2.0
y2 = self.tensor[idx, 1] + self.tensor[idx, 3] / 2.0
# clip
x1.clamp_(min=0, max=w)
y1.clamp_(min=0, max=h)
x2.clamp_(min=0, max=w)
y2.clamp_(min=0, max=h)
# convert back to (xc, yc, w, h)
self.tensor[idx, 0] = (x1 + x2) / 2.0
self.tensor[idx, 1] = (y1 + y2) / 2.0
# make sure widths and heights do not increase due to numerical errors
self.tensor[idx, 2] = torch.min(self.tensor[idx, 2], x2 - x1)
self.tensor[idx, 3] = torch.min(self.tensor[idx, 3], y2 - y1)
def nonempty(self, threshold: float = 0.0) -> torch.Tensor:
"""
Find boxes that are non-empty.
A box is considered empty, if either of its side is no larger than threshold.
Returns:
Tensor: a binary vector which represents
whether each box is empty (False) or non-empty (True).
"""
box = self.tensor
widths = box[:, 2]
heights = box[:, 3]
keep = (widths > threshold) & (heights > threshold)
return keep
def __getitem__(self, item: Union[int, slice, torch.BoolTensor]) -> "RotatedBoxes":
"""
Returns:
RotatedBoxes: Create a new :class:`RotatedBoxes` by indexing.
The following usage are allowed:
1. `new_boxes = boxes[3]`: return a `RotatedBoxes` which contains only one box.
2. `new_boxes = boxes[2:10]`: return a slice of boxes.
3. `new_boxes = boxes[vector]`, where vector is a torch.ByteTensor
with `length = len(boxes)`. Nonzero elements in the vector will be selected.
Note that the returned RotatedBoxes might share storage with this RotatedBoxes,
subject to Pytorch's indexing semantics.
"""
if isinstance(item, int):
return RotatedBoxes(self.tensor[item].view(1, -1))
b = self.tensor[item]
assert b.dim() == 2, "Indexing on RotatedBoxes with {} failed to return a matrix!".format(
item
)
return RotatedBoxes(b)
def __len__(self) -> int:
return self.tensor.shape[0]
def __repr__(self) -> str:
return "RotatedBoxes(" + str(self.tensor) + ")"
def inside_box(self, box_size: Boxes.BoxSizeType, boundary_threshold: int = 0) -> torch.Tensor:
"""
Args:
box_size (height, width): Size of the reference box covering
[0, width] x [0, height]
boundary_threshold (int): Boxes that extend beyond the reference box
boundary by more than boundary_threshold are considered "outside".
For RRPN, it might not be necessary to call this function since it's common
for rotated box to extend to outside of the image boundaries
(the clip function only clips the near-horizontal boxes)
Returns:
a binary vector, indicating whether each box is inside the reference box.
"""
height, width = box_size
cnt_x = self.tensor[..., 0]
cnt_y = self.tensor[..., 1]
half_w = self.tensor[..., 2] / 2.0
half_h = self.tensor[..., 3] / 2.0
a = self.tensor[..., 4]
c = torch.abs(torch.cos(a * math.pi / 180.0))
s = torch.abs(torch.sin(a * math.pi / 180.0))
# This basically computes the horizontal bounding rectangle of the rotated box
max_rect_dx = c * half_w + s * half_h
max_rect_dy = c * half_h + s * half_w
inds_inside = (
(cnt_x - max_rect_dx >= -boundary_threshold)
& (cnt_y - max_rect_dy >= -boundary_threshold)
& (cnt_x + max_rect_dx < width + boundary_threshold)
& (cnt_y + max_rect_dy < height + boundary_threshold)
)
return inds_inside
def get_centers(self) -> torch.Tensor:
"""
Returns:
The box centers in a Nx2 array of (x, y).
"""
return self.tensor[:, :2]
def scale(self, scale_x: float, scale_y: float) -> None:
"""
Scale the rotated box with horizontal and vertical scaling factors
Note: when scale_factor_x != scale_factor_y,
the rotated box does not preserve the rectangular shape when the angle
is not a multiple of 90 degrees under resize transformation.
Instead, the shape is a parallelogram (that has skew)
Here we make an approximation by fitting a rotated rectangle to the parallelogram.
"""
self.tensor[:, 0] *= scale_x
self.tensor[:, 1] *= scale_y
theta = self.tensor[:, 4] * math.pi / 180.0
c = torch.cos(theta)
s = torch.sin(theta)
# In image space, y is top->down and x is left->right
# Consider the local coordintate system for the rotated box,
# where the box center is located at (0, 0), and the four vertices ABCD are
# A(-w / 2, -h / 2), B(w / 2, -h / 2), C(w / 2, h / 2), D(-w / 2, h / 2)
# the midpoint of the left edge AD of the rotated box E is:
# E = (A+D)/2 = (-w / 2, 0)
# the midpoint of the top edge AB of the rotated box F is:
# F(0, -h / 2)
# To get the old coordinates in the global system, apply the rotation transformation
# (Note: the right-handed coordinate system for image space is yOx):
# (old_x, old_y) = (s * y + c * x, c * y - s * x)
# E(old) = (s * 0 + c * (-w/2), c * 0 - s * (-w/2)) = (-c * w / 2, s * w / 2)
# F(old) = (s * (-h / 2) + c * 0, c * (-h / 2) - s * 0) = (-s * h / 2, -c * h / 2)
# After applying the scaling factor (sfx, sfy):
# E(new) = (-sfx * c * w / 2, sfy * s * w / 2)
# F(new) = (-sfx * s * h / 2, -sfy * c * h / 2)
# The new width after scaling tranformation becomes:
# w(new) = |E(new) - O| * 2
# = sqrt[(sfx * c * w / 2)^2 + (sfy * s * w / 2)^2] * 2
# = sqrt[(sfx * c)^2 + (sfy * s)^2] * w
# i.e., scale_factor_w = sqrt[(sfx * c)^2 + (sfy * s)^2]
#
# For example,
# when angle = 0 or 180, |c| = 1, s = 0, scale_factor_w == scale_factor_x;
# when |angle| = 90, c = 0, |s| = 1, scale_factor_w == scale_factor_y
self.tensor[:, 2] *= torch.sqrt((scale_x * c) ** 2 + (scale_y * s) ** 2)
# h(new) = |F(new) - O| * 2
# = sqrt[(sfx * s * h / 2)^2 + (sfy * c * h / 2)^2] * 2
# = sqrt[(sfx * s)^2 + (sfy * c)^2] * h
# i.e., scale_factor_h = sqrt[(sfx * s)^2 + (sfy * c)^2]
#
# For example,
# when angle = 0 or 180, |c| = 1, s = 0, scale_factor_h == scale_factor_y;
# when |angle| = 90, c = 0, |s| = 1, scale_factor_h == scale_factor_x
self.tensor[:, 3] *= torch.sqrt((scale_x * s) ** 2 + (scale_y * c) ** 2)
# The angle is the rotation angle from y-axis in image space to the height
# vector (top->down in the box's local coordinate system) of the box in CCW.
#
# angle(new) = angle_yOx(O - F(new))
# = angle_yOx( (sfx * s * h / 2, sfy * c * h / 2) )
# = atan2(sfx * s * h / 2, sfy * c * h / 2)
# = atan2(sfx * s, sfy * c)
#
# For example,
# when sfx == sfy, angle(new) == atan2(s, c) == angle(old)
self.tensor[:, 4] = torch.atan2(scale_x * s, scale_y * c) * 180 / math.pi
@property
def device(self) -> str:
return self.tensor.device
def __iter__(self) -> Iterator[torch.Tensor]:
"""
Yield a box as a Tensor of shape (5,) at a time.
"""
yield from self.tensor
def pairwise_iou(boxes1: RotatedBoxes, boxes2: RotatedBoxes) -> None:
"""
Given two lists of rotated boxes of size N and M,
compute the IoU (intersection over union)
between __all__ N x M pairs of boxes.
The box order must be (x_center, y_center, width, height, angle).
Args:
boxes1, boxes2 (RotatedBoxes):
two `RotatedBoxes`. Contains N & M rotated boxes, respectively.
Returns:
Tensor: IoU, sized [N,M].
"""
return pairwise_iou_rotated(boxes1.tensor, boxes2.tensor)
# Utility functions
This folder contain utility functions that are not used in the
core library, but are useful for building models or training
code using the config system.
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
# -*- coding: utf-8 -*-
import logging
import typing
import torch
from fvcore.nn import activation_count, flop_count, parameter_count, parameter_count_table
from torch import nn
from detectron2.structures import BitMasks, Boxes, ImageList, Instances
from .logger import log_first_n
__all__ = [
"activation_count_operators",
"flop_count_operators",
"parameter_count_table",
"parameter_count",
]
FLOPS_MODE = "flops"
ACTIVATIONS_MODE = "activations"
# some extra ops to ignore from counting.
_IGNORED_OPS = [
"aten::add",
"aten::add_",
"aten::batch_norm",
"aten::constant_pad_nd",
"aten::div",
"aten::div_",
"aten::exp",
"aten::log2",
"aten::max_pool2d",
"aten::meshgrid",
"aten::mul",
"aten::mul_",
"aten::nonzero_numpy",
"aten::relu",
"aten::relu_",
"aten::rsub",
"aten::sigmoid",
"aten::sigmoid_",
"aten::softmax",
"aten::sort",
"aten::sqrt",
"aten::sub",
"aten::upsample_nearest2d",
"prim::PythonOp",
"torchvision::nms",
]
def flop_count_operators(
model: nn.Module, inputs: list, **kwargs
) -> typing.DefaultDict[str, float]:
"""
Implement operator-level flops counting using jit.
This is a wrapper of fvcore.nn.flop_count, that supports standard detection models
in detectron2.
Note:
The function runs the input through the model to compute flops.
The flops of a detection model is often input-dependent, for example,
the flops of box & mask head depends on the number of proposals &
the number of detected objects.
Therefore, the flops counting using a single input may not accurately
reflect the computation cost of a model.
Args:
model: a detectron2 model that takes `list[dict]` as input.
inputs (list[dict]): inputs to model, in detectron2's standard format.
"""
return _wrapper_count_operators(model=model, inputs=inputs, mode=FLOPS_MODE, **kwargs)
def activation_count_operators(
model: nn.Module, inputs: list, **kwargs
) -> typing.DefaultDict[str, float]:
"""
Implement operator-level activations counting using jit.
This is a wrapper of fvcore.nn.activation_count, that supports standard detection models
in detectron2.
Note:
The function runs the input through the model to compute activations.
The activations of a detection model is often input-dependent, for example,
the activations of box & mask head depends on the number of proposals &
the number of detected objects.
Args:
model: a detectron2 model that takes `list[dict]` as input.
inputs (list[dict]): inputs to model, in detectron2's standard format.
"""
return _wrapper_count_operators(model=model, inputs=inputs, mode=ACTIVATIONS_MODE, **kwargs)
def _flatten_to_tuple(outputs):
result = []
if isinstance(outputs, torch.Tensor):
result.append(outputs)
elif isinstance(outputs, (list, tuple)):
for v in outputs:
result.extend(_flatten_to_tuple(v))
elif isinstance(outputs, dict):
for _, v in outputs.items():
result.extend(_flatten_to_tuple(v))
elif isinstance(outputs, Instances):
result.extend(_flatten_to_tuple(outputs.get_fields()))
elif isinstance(outputs, (Boxes, BitMasks, ImageList)):
result.append(outputs.tensor)
else:
log_first_n(
logging.WARN,
f"Output of type {type(outputs)} not included in flops/activations count.",
n=10,
)
return tuple(result)
def _wrapper_count_operators(
model: nn.Module, inputs: list, mode: str, **kwargs
) -> typing.DefaultDict[str, float]:
# ignore some ops
supported_ops = {k: lambda *args, **kwargs: {} for k in _IGNORED_OPS}
supported_ops.update(kwargs.pop("supported_ops", {}))
kwargs["supported_ops"] = supported_ops
assert len(inputs) == 1, "Please use batch size=1"
tensor_input = inputs[0]["image"]
class WrapModel(nn.Module):
def __init__(self, model):
super().__init__()
if isinstance(
model, (nn.parallel.distributed.DistributedDataParallel, nn.DataParallel)
):
self.model = model.module
else:
self.model = model
def forward(self, image):
# jit requires the input/output to be Tensors
inputs = [{"image": image}]
outputs = self.model.forward(inputs)
# Only the subgraph that computes the returned tuple of tensor will be
# counted. So we flatten everything we found to tuple of tensors.
return _flatten_to_tuple(outputs)
old_train = model.training
with torch.no_grad():
if mode == FLOPS_MODE:
ret = flop_count(WrapModel(model).train(False), (tensor_input,), **kwargs)
elif mode == ACTIVATIONS_MODE:
ret = activation_count(WrapModel(model).train(False), (tensor_input,), **kwargs)
else:
raise NotImplementedError("Count for mode {} is not supported yet.".format(mode))
# compatible with change in fvcore
if isinstance(ret, tuple):
ret = ret[0]
model.train(old_train)
return ret
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import importlib
import numpy as np
import os
import re
import subprocess
import sys
from collections import defaultdict
import PIL
import torch
import torchvision
from tabulate import tabulate
__all__ = ["collect_env_info"]
def collect_torch_env():
try:
import torch.__config__
return torch.__config__.show()
except ImportError:
# compatible with older versions of pytorch
from torch.utils.collect_env import get_pretty_env_info
return get_pretty_env_info()
def get_env_module():
var_name = "DETECTRON2_ENV_MODULE"
return var_name, os.environ.get(var_name, "<not set>")
def detect_compute_compatibility(CUDA_HOME, so_file):
try:
cuobjdump = os.path.join(CUDA_HOME, "bin", "cuobjdump")
if os.path.isfile(cuobjdump):
output = subprocess.check_output(
"'{}' --list-elf '{}'".format(cuobjdump, so_file), shell=True
)
output = output.decode("utf-8").strip().split("\n")
sm = []
for line in output:
line = re.findall(r"\.sm_[0-9]*\.", line)[0]
sm.append(line.strip("."))
sm = sorted(set(sm))
return ", ".join(sm)
else:
return so_file + "; cannot find cuobjdump"
except Exception:
# unhandled failure
return so_file
def collect_env_info():
has_cuda = torch.cuda.is_available()
# NOTE: the use of CUDA_HOME requires the CUDA build deps, though in
# theory detectron2 should be made runnable with only the CUDA runtime
from torch.utils.cpp_extension import CUDA_HOME
data = []
data.append(("sys.platform", sys.platform))
data.append(("Python", sys.version.replace("\n", "")))
data.append(("numpy", np.__version__))
try:
import detectron2 # noqa
data.append(
("detectron2", detectron2.__version__ + " @" + os.path.dirname(detectron2.__file__))
)
except ImportError:
data.append(("detectron2", "failed to import"))
else:
try:
from detectron2 import _C
except ImportError:
data.append(("detectron2._C", "failed to import"))
else:
data.append(("detectron2 compiler", _C.get_compiler_version()))
data.append(("detectron2 CUDA compiler", _C.get_cuda_version()))
if has_cuda:
data.append(
("detectron2 arch flags", detect_compute_compatibility(CUDA_HOME, _C.__file__))
)
data.append(get_env_module())
data.append(("PyTorch", torch.__version__ + " @" + os.path.dirname(torch.__file__)))
data.append(("PyTorch debug build", torch.version.debug))
data.append(("CUDA available", has_cuda))
if has_cuda:
devices = defaultdict(list)
for k in range(torch.cuda.device_count()):
devices[torch.cuda.get_device_name(k)].append(str(k))
for name, devids in devices.items():
data.append(("GPU " + ",".join(devids), name))
from torch.utils.cpp_extension import CUDA_HOME
data.append(("CUDA_HOME", str(CUDA_HOME)))
if CUDA_HOME is not None and os.path.isdir(CUDA_HOME):
try:
nvcc = os.path.join(CUDA_HOME, "bin", "nvcc")
nvcc = subprocess.check_output("'{}' -V | tail -n1".format(nvcc), shell=True)
nvcc = nvcc.decode("utf-8").strip()
except subprocess.SubprocessError:
nvcc = "Not Available"
data.append(("NVCC", nvcc))
cuda_arch_list = os.environ.get("TORCH_CUDA_ARCH_LIST", None)
if cuda_arch_list:
data.append(("TORCH_CUDA_ARCH_LIST", cuda_arch_list))
data.append(("Pillow", PIL.__version__))
try:
data.append(
(
"torchvision",
str(torchvision.__version__) + " @" + os.path.dirname(torchvision.__file__),
)
)
if has_cuda:
try:
torchvision_C = importlib.util.find_spec("torchvision._C").origin
msg = detect_compute_compatibility(CUDA_HOME, torchvision_C)
data.append(("torchvision arch flags", msg))
except ImportError:
data.append(("torchvision._C", "failed to find"))
except AttributeError:
data.append(("torchvision", "unknown"))
try:
import fvcore
data.append(("fvcore", fvcore.__version__))
except ImportError:
pass
try:
import cv2
data.append(("cv2", cv2.__version__))
except ImportError:
pass
env_str = tabulate(data) + "\n"
env_str += collect_torch_env()
return env_str
if __name__ == "__main__":
try:
import detectron2 # noqa
except ImportError:
print(collect_env_info())
else:
from detectron2.utils.collect_env import collect_env_info
print(collect_env_info())
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
"""
An awesome colormap for really neat visualizations.
Copied from Detectron, and removed gray colors.
"""
import numpy as np
__all__ = ["colormap", "random_color"]
# fmt: off
# RGB:
_COLORS = np.array(
[
0.000, 0.447, 0.741,
0.850, 0.325, 0.098,
0.929, 0.694, 0.125,
0.494, 0.184, 0.556,
0.466, 0.674, 0.188,
0.301, 0.745, 0.933,
0.635, 0.078, 0.184,
0.300, 0.300, 0.300,
0.600, 0.600, 0.600,
1.000, 0.000, 0.000,
1.000, 0.500, 0.000,
0.749, 0.749, 0.000,
0.000, 1.000, 0.000,
0.000, 0.000, 1.000,
0.667, 0.000, 1.000,
0.333, 0.333, 0.000,
0.333, 0.667, 0.000,
0.333, 1.000, 0.000,
0.667, 0.333, 0.000,
0.667, 0.667, 0.000,
0.667, 1.000, 0.000,
1.000, 0.333, 0.000,
1.000, 0.667, 0.000,
1.000, 1.000, 0.000,
0.000, 0.333, 0.500,
0.000, 0.667, 0.500,
0.000, 1.000, 0.500,
0.333, 0.000, 0.500,
0.333, 0.333, 0.500,
0.333, 0.667, 0.500,
0.333, 1.000, 0.500,
0.667, 0.000, 0.500,
0.667, 0.333, 0.500,
0.667, 0.667, 0.500,
0.667, 1.000, 0.500,
1.000, 0.000, 0.500,
1.000, 0.333, 0.500,
1.000, 0.667, 0.500,
1.000, 1.000, 0.500,
0.000, 0.333, 1.000,
0.000, 0.667, 1.000,
0.000, 1.000, 1.000,
0.333, 0.000, 1.000,
0.333, 0.333, 1.000,
0.333, 0.667, 1.000,
0.333, 1.000, 1.000,
0.667, 0.000, 1.000,
0.667, 0.333, 1.000,
0.667, 0.667, 1.000,
0.667, 1.000, 1.000,
1.000, 0.000, 1.000,
1.000, 0.333, 1.000,
1.000, 0.667, 1.000,
0.333, 0.000, 0.000,
0.500, 0.000, 0.000,
0.667, 0.000, 0.000,
0.833, 0.000, 0.000,
1.000, 0.000, 0.000,
0.000, 0.167, 0.000,
0.000, 0.333, 0.000,
0.000, 0.500, 0.000,
0.000, 0.667, 0.000,
0.000, 0.833, 0.000,
0.000, 1.000, 0.000,
0.000, 0.000, 0.167,
0.000, 0.000, 0.333,
0.000, 0.000, 0.500,
0.000, 0.000, 0.667,
0.000, 0.000, 0.833,
0.000, 0.000, 1.000,
0.000, 0.000, 0.000,
0.143, 0.143, 0.143,
0.857, 0.857, 0.857,
1.000, 1.000, 1.000
]
).astype(np.float32).reshape(-1, 3)
# fmt: on
def colormap(rgb=False, maximum=255):
"""
Args:
rgb (bool): whether to return RGB colors or BGR colors.
maximum (int): either 255 or 1
Returns:
ndarray: a float32 array of Nx3 colors, in range [0, 255] or [0, 1]
"""
assert maximum in [255, 1], maximum
c = _COLORS * maximum
if not rgb:
c = c[:, ::-1]
return c
def random_color(rgb=False, maximum=255):
"""
Args:
rgb (bool): whether to return RGB colors or BGR colors.
maximum (int): either 255 or 1
Returns:
ndarray: a vector of 3 numbers
"""
idx = np.random.randint(0, len(_COLORS))
ret = _COLORS[idx] * maximum
if not rgb:
ret = ret[::-1]
return ret
if __name__ == "__main__":
import cv2
size = 100
H, W = 10, 10
canvas = np.random.rand(H * size, W * size, 3).astype("float32")
for h in range(H):
for w in range(W):
idx = h * W + w
if idx >= len(_COLORS):
break
canvas[h * size : (h + 1) * size, w * size : (w + 1) * size] = _COLORS[idx]
cv2.imshow("a", canvas)
cv2.waitKey(0)
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
This file contains primitives for multi-gpu communication.
This is useful when doing distributed training.
"""
import functools
import logging
import numpy as np
import pickle
import torch
import torch.distributed as dist
_LOCAL_PROCESS_GROUP = None
"""
A torch process group which only includes processes that on the same machine as the current process.
This variable is set when processes are spawned by `launch()` in "engine/launch.py".
"""
def get_world_size() -> int:
if not dist.is_available():
return 1
if not dist.is_initialized():
return 1
return dist.get_world_size()
def get_rank() -> int:
if not dist.is_available():
return 0
if not dist.is_initialized():
return 0
return dist.get_rank()
def get_local_rank() -> int:
"""
Returns:
The rank of the current process within the local (per-machine) process group.
"""
if not dist.is_available():
return 0
if not dist.is_initialized():
return 0
assert _LOCAL_PROCESS_GROUP is not None
return dist.get_rank(group=_LOCAL_PROCESS_GROUP)
def get_local_size() -> int:
"""
Returns:
The size of the per-machine process group,
i.e. the number of processes per machine.
"""
if not dist.is_available():
return 1
if not dist.is_initialized():
return 1
return dist.get_world_size(group=_LOCAL_PROCESS_GROUP)
def is_main_process() -> bool:
return get_rank() == 0
def synchronize():
"""
Helper function to synchronize (barrier) among all processes when
using distributed training
"""
if not dist.is_available():
return
if not dist.is_initialized():
return
world_size = dist.get_world_size()
if world_size == 1:
return
dist.barrier()
@functools.lru_cache()
def _get_global_gloo_group():
"""
Return a process group based on gloo backend, containing all the ranks
The result is cached.
"""
if dist.get_backend() == "nccl":
return dist.new_group(backend="gloo")
else:
return dist.group.WORLD
def _serialize_to_tensor(data, group):
backend = dist.get_backend(group)
assert backend in ["gloo", "nccl"]
device = torch.device("cpu" if backend == "gloo" else "cuda")
buffer = pickle.dumps(data)
if len(buffer) > 1024 ** 3:
logger = logging.getLogger(__name__)
logger.warning(
"Rank {} trying to all-gather {:.2f} GB of data on device {}".format(
get_rank(), len(buffer) / (1024 ** 3), device
)
)
storage = torch.ByteStorage.from_buffer(buffer)
tensor = torch.ByteTensor(storage).to(device=device)
return tensor
def _pad_to_largest_tensor(tensor, group):
"""
Returns:
list[int]: size of the tensor, on each rank
Tensor: padded tensor that has the max size
"""
world_size = dist.get_world_size(group=group)
assert (
world_size >= 1
), "comm.gather/all_gather must be called from ranks within the given group!"
local_size = torch.tensor([tensor.numel()], dtype=torch.int64, device=tensor.device)
size_list = [
torch.zeros([1], dtype=torch.int64, device=tensor.device) for _ in range(world_size)
]
dist.all_gather(size_list, local_size, group=group)
size_list = [int(size.item()) for size in size_list]
max_size = max(size_list)
# we pad the tensor because torch all_gather does not support
# gathering tensors of different shapes
if local_size != max_size:
padding = torch.zeros((max_size - local_size,), dtype=torch.uint8, device=tensor.device)
tensor = torch.cat((tensor, padding), dim=0)
return size_list, tensor
def all_gather(data, group=None):
"""
Run all_gather on arbitrary picklable data (not necessarily tensors).
Args:
data: any picklable object
group: a torch process group. By default, will use a group which
contains all ranks on gloo backend.
Returns:
list[data]: list of data gathered from each rank
"""
if get_world_size() == 1:
return [data]
if group is None:
group = _get_global_gloo_group()
if dist.get_world_size(group) == 1:
return [data]
tensor = _serialize_to_tensor(data, group)
size_list, tensor = _pad_to_largest_tensor(tensor, group)
max_size = max(size_list)
# receiving Tensor from all ranks
tensor_list = [
torch.empty((max_size,), dtype=torch.uint8, device=tensor.device) for _ in size_list
]
dist.all_gather(tensor_list, tensor, group=group)
data_list = []
for size, tensor in zip(size_list, tensor_list):
buffer = tensor.cpu().numpy().tobytes()[:size]
data_list.append(pickle.loads(buffer))
return data_list
def gather(data, dst=0, group=None):
"""
Run gather on arbitrary picklable data (not necessarily tensors).
Args:
data: any picklable object
dst (int): destination rank
group: a torch process group. By default, will use a group which
contains all ranks on gloo backend.
Returns:
list[data]: on dst, a list of data gathered from each rank. Otherwise,
an empty list.
"""
if get_world_size() == 1:
return [data]
if group is None:
group = _get_global_gloo_group()
if dist.get_world_size(group=group) == 1:
return [data]
rank = dist.get_rank(group=group)
tensor = _serialize_to_tensor(data, group)
size_list, tensor = _pad_to_largest_tensor(tensor, group)
# receiving Tensor from all ranks
if rank == dst:
max_size = max(size_list)
tensor_list = [
torch.empty((max_size,), dtype=torch.uint8, device=tensor.device) for _ in size_list
]
dist.gather(tensor, tensor_list, dst=dst, group=group)
data_list = []
for size, tensor in zip(size_list, tensor_list):
buffer = tensor.cpu().numpy().tobytes()[:size]
data_list.append(pickle.loads(buffer))
return data_list
else:
dist.gather(tensor, [], dst=dst, group=group)
return []
def shared_random_seed():
"""
Returns:
int: a random number that is the same across all workers.
If workers need a shared RNG, they can use this shared seed to
create one.
All workers must call this function, otherwise it will deadlock.
"""
ints = np.random.randint(2 ** 31)
all_ints = all_gather(ints)
return all_ints[0]
def reduce_dict(input_dict, average=True):
"""
Reduce the values in the dictionary from all processes so that process with rank
0 has the reduced results.
Args:
input_dict (dict): inputs to be reduced. All the values must be scalar CUDA Tensor.
average (bool): whether to do average or sum
Returns:
a dict with the same keys as input_dict, after reduction.
"""
world_size = get_world_size()
if world_size < 2:
return input_dict
with torch.no_grad():
names = []
values = []
# sort the keys so that they are consistent across processes
for k in sorted(input_dict.keys()):
names.append(k)
values.append(input_dict[k])
values = torch.stack(values, dim=0)
dist.reduce(values, dst=0)
if dist.get_rank() == 0 and average:
# only main process gets accumulated, so only divide by
# world_size in this case
values /= world_size
reduced_dict = {k: v for k, v in zip(names, values)}
return reduced_dict
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import importlib
import importlib.util
import logging
import numpy as np
import os
import random
import sys
from datetime import datetime
import torch
__all__ = ["seed_all_rng"]
def seed_all_rng(seed=None):
"""
Set the random seed for the RNG in torch, numpy and python.
Args:
seed (int): if None, will use a strong random seed.
"""
if seed is None:
seed = (
os.getpid()
+ int(datetime.now().strftime("%S%f"))
+ int.from_bytes(os.urandom(2), "big")
)
logger = logging.getLogger(__name__)
logger.info("Using a generated random seed {}".format(seed))
np.random.seed(seed)
torch.set_rng_state(torch.manual_seed(seed).get_state())
random.seed(seed)
# from https://stackoverflow.com/questions/67631/how-to-import-a-module-given-the-full-path
def _import_file(module_name, file_path, make_importable=False):
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if make_importable:
sys.modules[module_name] = module
return module
def _configure_libraries():
"""
Configurations for some libraries.
"""
# An environment option to disable `import cv2` globally,
# in case it leads to negative performance impact
disable_cv2 = int(os.environ.get("DETECTRON2_DISABLE_CV2", False))
if disable_cv2:
sys.modules["cv2"] = None
else:
# Disable opencl in opencv since its interaction with cuda often has negative effects
# This envvar is supported after OpenCV 3.4.0
os.environ["OPENCV_OPENCL_RUNTIME"] = "disabled"
try:
import cv2
if int(cv2.__version__.split(".")[0]) >= 3:
cv2.ocl.setUseOpenCL(False)
except ImportError:
pass
def get_version(module, digit=2):
return tuple(map(int, module.__version__.split(".")[:digit]))
# fmt: off
assert get_version(torch) >= (1, 4), "Requires torch>=1.4"
import fvcore
assert get_version(fvcore, 3) >= (0, 1, 1), "Requires fvcore>=0.1.1"
import yaml
assert get_version(yaml) >= (5, 1), "Requires pyyaml>=5.1"
# fmt: on
_ENV_SETUP_DONE = False
def setup_environment():
"""Perform environment setup work. The default setup is a no-op, but this
function allows the user to specify a Python source file or a module in
the $DETECTRON2_ENV_MODULE environment variable, that performs
custom setup work that may be necessary to their computing environment.
"""
global _ENV_SETUP_DONE
if _ENV_SETUP_DONE:
return
_ENV_SETUP_DONE = True
_configure_libraries()
custom_module_path = os.environ.get("DETECTRON2_ENV_MODULE")
if custom_module_path:
setup_custom_environment(custom_module_path)
else:
# The default setup is a no-op
pass
def setup_custom_environment(custom_module):
"""
Load custom environment setup by importing a Python source file or a
module, and run the setup function.
"""
if custom_module.endswith(".py"):
module = _import_file("detectron2.utils.env.custom_module", custom_module)
else:
module = importlib.import_module(custom_module)
assert hasattr(module, "setup_environment") and callable(module.setup_environment), (
"Custom environment module defined in {} does not have the "
"required callable attribute 'setup_environment'."
).format(custom_module)
module.setup_environment()
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import datetime
import json
import logging
import os
import time
from collections import defaultdict
from contextlib import contextmanager
import torch
from fvcore.common.file_io import PathManager
from fvcore.common.history_buffer import HistoryBuffer
_CURRENT_STORAGE_STACK = []
def get_event_storage():
"""
Returns:
The :class:`EventStorage` object that's currently being used.
Throws an error if no :class:`EventStorage` is currently enabled.
"""
assert len(
_CURRENT_STORAGE_STACK
), "get_event_storage() has to be called inside a 'with EventStorage(...)' context!"
return _CURRENT_STORAGE_STACK[-1]
class EventWriter:
"""
Base class for writers that obtain events from :class:`EventStorage` and process them.
"""
def write(self):
raise NotImplementedError
def close(self):
pass
class JSONWriter(EventWriter):
"""
Write scalars to a json file.
It saves scalars as one json per line (instead of a big json) for easy parsing.
Examples parsing such a json file:
.. code-block:: none
$ cat metrics.json | jq -s '.[0:2]'
[
{
"data_time": 0.008433341979980469,
"iteration": 20,
"loss": 1.9228371381759644,
"loss_box_reg": 0.050025828182697296,
"loss_classifier": 0.5316952466964722,
"loss_mask": 0.7236229181289673,
"loss_rpn_box": 0.0856662318110466,
"loss_rpn_cls": 0.48198649287223816,
"lr": 0.007173333333333333,
"time": 0.25401854515075684
},
{
"data_time": 0.007216215133666992,
"iteration": 40,
"loss": 1.282649278640747,
"loss_box_reg": 0.06222952902317047,
"loss_classifier": 0.30682939291000366,
"loss_mask": 0.6970193982124329,
"loss_rpn_box": 0.038663312792778015,
"loss_rpn_cls": 0.1471673548221588,
"lr": 0.007706666666666667,
"time": 0.2490077018737793
}
]
$ cat metrics.json | jq '.loss_mask'
0.7126231789588928
0.689423680305481
0.6776131987571716
...
"""
def __init__(self, json_file, window_size=20):
"""
Args:
json_file (str): path to the json file. New data will be appended if the file exists.
window_size (int): the window size of median smoothing for the scalars whose
`smoothing_hint` are True.
"""
self._file_handle = PathManager.open(json_file, "a")
self._window_size = window_size
def write(self):
storage = get_event_storage()
to_save = {"iteration": storage.iter}
to_save.update(storage.latest_with_smoothing_hint(self._window_size))
self._file_handle.write(json.dumps(to_save, sort_keys=True) + "\n")
self._file_handle.flush()
try:
os.fsync(self._file_handle.fileno())
except AttributeError:
pass
def close(self):
self._file_handle.close()
class TensorboardXWriter(EventWriter):
"""
Write all scalars to a tensorboard file.
"""
def __init__(self, log_dir: str, window_size: int = 20, **kwargs):
"""
Args:
log_dir (str): the directory to save the output events
window_size (int): the scalars will be median-smoothed by this window size
kwargs: other arguments passed to `torch.utils.tensorboard.SummaryWriter(...)`
"""
self._window_size = window_size
from torch.utils.tensorboard import SummaryWriter
self._writer = SummaryWriter(log_dir, **kwargs)
def write(self):
storage = get_event_storage()
for k, v in storage.latest_with_smoothing_hint(self._window_size).items():
self._writer.add_scalar(k, v, storage.iter)
# storage.put_{image,histogram} is only meant to be used by
# tensorboard writer. So we access its internal fields directly from here.
if len(storage._vis_data) >= 1:
for img_name, img, step_num in storage._vis_data:
self._writer.add_image(img_name, img, step_num)
# Storage stores all image data and rely on this writer to clear them.
# As a result it assumes only one writer will use its image data.
# An alternative design is to let storage store limited recent
# data (e.g. only the most recent image) that all writers can access.
# In that case a writer may not see all image data if its period is long.
storage.clear_images()
if len(storage._histograms) >= 1:
for params in storage._histograms:
self._writer.add_histogram_raw(**params)
storage.clear_histograms()
def close(self):
if hasattr(self, "_writer"): # doesn't exist when the code fails at import
self._writer.close()
class CommonMetricPrinter(EventWriter):
"""
Print **common** metrics to the terminal, including
iteration time, ETA, memory, all losses, and the learning rate.
To print something different, please implement a similar printer by yourself.
"""
def __init__(self, max_iter):
"""
Args:
max_iter (int): the maximum number of iterations to train.
Used to compute ETA.
"""
self.logger = logging.getLogger(__name__)
self._max_iter = max_iter
self._last_write = None
def write(self):
storage = get_event_storage()
iteration = storage.iter
try:
data_time = storage.history("data_time").avg(20)
except KeyError:
# they may not exist in the first few iterations (due to warmup)
# or when SimpleTrainer is not used
data_time = None
eta_string = None
try:
iter_time = storage.history("time").global_avg()
eta_seconds = storage.history("time").median(1000) * (self._max_iter - iteration)
storage.put_scalar("eta_seconds", eta_seconds, smoothing_hint=False)
eta_string = str(datetime.timedelta(seconds=int(eta_seconds)))
except KeyError:
iter_time = None
# estimate eta on our own - more noisy
if self._last_write is not None:
estimate_iter_time = (time.perf_counter() - self._last_write[1]) / (
iteration - self._last_write[0]
)
eta_seconds = estimate_iter_time * (self._max_iter - iteration)
eta_string = str(datetime.timedelta(seconds=int(eta_seconds)))
self._last_write = (iteration, time.perf_counter())
try:
lr = "{:.6f}".format(storage.history("lr").latest())
except KeyError:
lr = "N/A"
if torch.cuda.is_available():
max_mem_mb = torch.cuda.max_memory_allocated() / 1024.0 / 1024.0
else:
max_mem_mb = None
# NOTE: max_mem is parsed by grep in "dev/parse_results.sh"
self.logger.info(
" {eta}iter: {iter} {losses} {time}{data_time}lr: {lr} {memory}".format(
eta=f"eta: {eta_string} " if eta_string else "",
iter=iteration,
losses=" ".join(
[
"{}: {:.3f}".format(k, v.median(20))
for k, v in storage.histories().items()
if "loss" in k
]
),
time="time: {:.4f} ".format(iter_time) if iter_time is not None else "",
data_time="data_time: {:.4f} ".format(data_time) if data_time is not None else "",
lr=lr,
memory="max_mem: {:.0f}M".format(max_mem_mb) if max_mem_mb is not None else "",
)
)
class EventStorage:
"""
The user-facing class that provides metric storage functionalities.
In the future we may add support for storing / logging other types of data if needed.
"""
def __init__(self, start_iter=0):
"""
Args:
start_iter (int): the iteration number to start with
"""
self._history = defaultdict(HistoryBuffer)
self._smoothing_hints = {}
self._latest_scalars = {}
self._iter = start_iter
self._current_prefix = ""
self._vis_data = []
self._histograms = []
def put_image(self, img_name, img_tensor):
"""
Add an `img_tensor` associated with `img_name`, to be shown on
tensorboard.
Args:
img_name (str): The name of the image to put into tensorboard.
img_tensor (torch.Tensor or numpy.array): An `uint8` or `float`
Tensor of shape `[channel, height, width]` where `channel` is
3. The image format should be RGB. The elements in img_tensor
can either have values in [0, 1] (float32) or [0, 255] (uint8).
The `img_tensor` will be visualized in tensorboard.
"""
self._vis_data.append((img_name, img_tensor, self._iter))
def put_scalar(self, name, value, smoothing_hint=True):
"""
Add a scalar `value` to the `HistoryBuffer` associated with `name`.
Args:
smoothing_hint (bool): a 'hint' on whether this scalar is noisy and should be
smoothed when logged. The hint will be accessible through
:meth:`EventStorage.smoothing_hints`. A writer may ignore the hint
and apply custom smoothing rule.
It defaults to True because most scalars we save need to be smoothed to
provide any useful signal.
"""
name = self._current_prefix + name
history = self._history[name]
value = float(value)
history.update(value, self._iter)
self._latest_scalars[name] = value
existing_hint = self._smoothing_hints.get(name)
if existing_hint is not None:
assert (
existing_hint == smoothing_hint
), "Scalar {} was put with a different smoothing_hint!".format(name)
else:
self._smoothing_hints[name] = smoothing_hint
def put_scalars(self, *, smoothing_hint=True, **kwargs):
"""
Put multiple scalars from keyword arguments.
Examples:
storage.put_scalars(loss=my_loss, accuracy=my_accuracy, smoothing_hint=True)
"""
for k, v in kwargs.items():
self.put_scalar(k, v, smoothing_hint=smoothing_hint)
def put_histogram(self, hist_name, hist_tensor, bins=1000):
"""
Create a histogram from a tensor.
Args:
hist_name (str): The name of the histogram to put into tensorboard.
hist_tensor (torch.Tensor): A Tensor of arbitrary shape to be converted
into a histogram.
bins (int): Number of histogram bins.
"""
ht_min, ht_max = hist_tensor.min().item(), hist_tensor.max().item()
# Create a histogram with PyTorch
hist_counts = torch.histc(hist_tensor, bins=bins)
hist_edges = torch.linspace(start=ht_min, end=ht_max, steps=bins + 1, dtype=torch.float32)
# Parameter for the add_histogram_raw function of SummaryWriter
hist_params = dict(
tag=hist_name,
min=ht_min,
max=ht_max,
num=len(hist_tensor),
sum=float(hist_tensor.sum()),
sum_squares=float(torch.sum(hist_tensor ** 2)),
bucket_limits=hist_edges[1:].tolist(),
bucket_counts=hist_counts.tolist(),
global_step=self._iter,
)
self._histograms.append(hist_params)
def history(self, name):
"""
Returns:
HistoryBuffer: the scalar history for name
"""
ret = self._history.get(name, None)
if ret is None:
raise KeyError("No history metric available for {}!".format(name))
return ret
def histories(self):
"""
Returns:
dict[name -> HistoryBuffer]: the HistoryBuffer for all scalars
"""
return self._history
def latest(self):
"""
Returns:
dict[name -> number]: the scalars that's added in the current iteration.
"""
return self._latest_scalars
def latest_with_smoothing_hint(self, window_size=20):
"""
Similar to :meth:`latest`, but the returned values
are either the un-smoothed original latest value,
or a median of the given window_size,
depend on whether the smoothing_hint is True.
This provides a default behavior that other writers can use.
"""
result = {}
for k, v in self._latest_scalars.items():
result[k] = self._history[k].median(window_size) if self._smoothing_hints[k] else v
return result
def smoothing_hints(self):
"""
Returns:
dict[name -> bool]: the user-provided hint on whether the scalar
is noisy and needs smoothing.
"""
return self._smoothing_hints
def step(self):
"""
User should call this function at the beginning of each iteration, to
notify the storage of the start of a new iteration.
The storage will then be able to associate the new data with the
correct iteration number.
"""
self._iter += 1
self._latest_scalars = {}
@property
def iter(self):
return self._iter
@property
def iteration(self):
# for backward compatibility
return self._iter
def __enter__(self):
_CURRENT_STORAGE_STACK.append(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
assert _CURRENT_STORAGE_STACK[-1] == self
_CURRENT_STORAGE_STACK.pop()
@contextmanager
def name_scope(self, name):
"""
Yields:
A context within which all the events added to this storage
will be prefixed by the name scope.
"""
old_prefix = self._current_prefix
self._current_prefix = name.rstrip("/") + "/"
yield
self._current_prefix = old_prefix
def clear_images(self):
"""
Delete all the stored images for visualization. This should be called
after images are written to tensorboard.
"""
self._vis_data = []
def clear_histograms(self):
"""
Delete all the stored histograms for visualization.
This should be called after histograms are written to tensorboard.
"""
self._histograms = []
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import functools
import logging
import os
import sys
import time
from collections import Counter
from fvcore.common.file_io import PathManager
from tabulate import tabulate
from termcolor import colored
class _ColorfulFormatter(logging.Formatter):
def __init__(self, *args, **kwargs):
self._root_name = kwargs.pop("root_name") + "."
self._abbrev_name = kwargs.pop("abbrev_name", "")
if len(self._abbrev_name):
self._abbrev_name = self._abbrev_name + "."
super(_ColorfulFormatter, self).__init__(*args, **kwargs)
def formatMessage(self, record):
record.name = record.name.replace(self._root_name, self._abbrev_name)
log = super(_ColorfulFormatter, self).formatMessage(record)
if record.levelno == logging.WARNING:
prefix = colored("WARNING", "red", attrs=["blink"])
elif record.levelno == logging.ERROR or record.levelno == logging.CRITICAL:
prefix = colored("ERROR", "red", attrs=["blink", "underline"])
else:
return log
return prefix + " " + log
@functools.lru_cache() # so that calling setup_logger multiple times won't add many handlers
def setup_logger(
output=None, distributed_rank=0, *, color=True, name="detectron2", abbrev_name=None
):
"""
Initialize the detectron2 logger and set its verbosity level to "DEBUG".
Args:
output (str): a file name or a directory to save log. If None, will not save log file.
If ends with ".txt" or ".log", assumed to be a file name.
Otherwise, logs will be saved to `output/log.txt`.
name (str): the root module name of this logger
abbrev_name (str): an abbreviation of the module, to avoid long names in logs.
Set to "" to not log the root module in logs.
By default, will abbreviate "detectron2" to "d2" and leave other
modules unchanged.
Returns:
logging.Logger: a logger
"""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
logger.propagate = False
if abbrev_name is None:
abbrev_name = "d2" if name == "detectron2" else name
plain_formatter = logging.Formatter(
"[%(asctime)s] %(name)s %(levelname)s: %(message)s", datefmt="%m/%d %H:%M:%S"
)
# stdout logging: master only
if distributed_rank == 0:
ch = logging.StreamHandler(stream=sys.stdout)
ch.setLevel(logging.DEBUG)
if color:
formatter = _ColorfulFormatter(
colored("[%(asctime)s %(name)s]: ", "green") + "%(message)s",
datefmt="%m/%d %H:%M:%S",
root_name=name,
abbrev_name=str(abbrev_name),
)
else:
formatter = plain_formatter
ch.setFormatter(formatter)
logger.addHandler(ch)
# file logging: all workers
if output is not None:
if output.endswith(".txt") or output.endswith(".log"):
filename = output
else:
filename = os.path.join(output, "log.txt")
if distributed_rank > 0:
filename = filename + ".rank{}".format(distributed_rank)
PathManager.mkdirs(os.path.dirname(filename))
fh = logging.StreamHandler(_cached_log_stream(filename))
fh.setLevel(logging.DEBUG)
fh.setFormatter(plain_formatter)
logger.addHandler(fh)
return logger
# cache the opened file object, so that different calls to `setup_logger`
# with the same file name can safely write to the same file.
@functools.lru_cache(maxsize=None)
def _cached_log_stream(filename):
return PathManager.open(filename, "a")
"""
Below are some other convenient logging methods.
They are mainly adopted from
https://github.com/abseil/abseil-py/blob/master/absl/logging/__init__.py
"""
def _find_caller():
"""
Returns:
str: module name of the caller
tuple: a hashable key to be used to identify different callers
"""
frame = sys._getframe(2)
while frame:
code = frame.f_code
if os.path.join("utils", "logger.") not in code.co_filename:
mod_name = frame.f_globals["__name__"]
if mod_name == "__main__":
mod_name = "detectron2"
return mod_name, (code.co_filename, frame.f_lineno, code.co_name)
frame = frame.f_back
_LOG_COUNTER = Counter()
_LOG_TIMER = {}
def log_first_n(lvl, msg, n=1, *, name=None, key="caller"):
"""
Log only for the first n times.
Args:
lvl (int): the logging level
msg (str):
n (int):
name (str): name of the logger to use. Will use the caller's module by default.
key (str or tuple[str]): the string(s) can be one of "caller" or
"message", which defines how to identify duplicated logs.
For example, if called with `n=1, key="caller"`, this function
will only log the first call from the same caller, regardless of
the message content.
If called with `n=1, key="message"`, this function will log the
same content only once, even if they are called from different places.
If called with `n=1, key=("caller", "message")`, this function
will not log only if the same caller has logged the same message before.
"""
if isinstance(key, str):
key = (key,)
assert len(key) > 0
caller_module, caller_key = _find_caller()
hash_key = ()
if "caller" in key:
hash_key = hash_key + caller_key
if "message" in key:
hash_key = hash_key + (msg,)
_LOG_COUNTER[hash_key] += 1
if _LOG_COUNTER[hash_key] <= n:
logging.getLogger(name or caller_module).log(lvl, msg)
def log_every_n(lvl, msg, n=1, *, name=None):
"""
Log once per n times.
Args:
lvl (int): the logging level
msg (str):
n (int):
name (str): name of the logger to use. Will use the caller's module by default.
"""
caller_module, key = _find_caller()
_LOG_COUNTER[key] += 1
if n == 1 or _LOG_COUNTER[key] % n == 1:
logging.getLogger(name or caller_module).log(lvl, msg)
def log_every_n_seconds(lvl, msg, n=1, *, name=None):
"""
Log no more than once per n seconds.
Args:
lvl (int): the logging level
msg (str):
n (int):
name (str): name of the logger to use. Will use the caller's module by default.
"""
caller_module, key = _find_caller()
last_logged = _LOG_TIMER.get(key, None)
current_time = time.time()
if last_logged is None or current_time - last_logged >= n:
logging.getLogger(name or caller_module).log(lvl, msg)
_LOG_TIMER[key] = current_time
def create_small_table(small_dict):
"""
Create a small table using the keys of small_dict as headers. This is only
suitable for small dictionaries.
Args:
small_dict (dict): a result dictionary of only a few items.
Returns:
str: the table as a string.
"""
keys, values = tuple(zip(*small_dict.items()))
table = tabulate(
[values],
headers=keys,
tablefmt="pipe",
floatfmt=".3f",
stralign="center",
numalign="center",
)
return table
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
import logging
from contextlib import contextmanager
from functools import wraps
import torch
__all__ = ["retry_if_cuda_oom"]
@contextmanager
def _ignore_torch_cuda_oom():
"""
A context which ignores CUDA OOM exception from pytorch.
"""
try:
yield
except RuntimeError as e:
# NOTE: the string may change?
if "CUDA out of memory. " in str(e):
pass
else:
raise
def retry_if_cuda_oom(func):
"""
Makes a function retry itself after encountering
pytorch's CUDA OOM error.
It will first retry after calling `torch.cuda.empty_cache()`.
If that still fails, it will then retry by trying to convert inputs to CPUs.
In this case, it expects the function to dispatch to CPU implementation.
The return values may become CPU tensors as well and it's user's
responsibility to convert it back to CUDA tensor if needed.
Args:
func: a stateless callable that takes tensor-like objects as arguments
Returns:
a callable which retries `func` if OOM is encountered.
Examples:
.. code-block:: python
output = retry_if_cuda_oom(some_torch_function)(input1, input2)
# output may be on CPU even if inputs are on GPU
Note:
1. When converting inputs to CPU, it will only look at each argument and check
if it has `.device` and `.to` for conversion. Nested structures of tensors
are not supported.
2. Since the function might be called more than once, it has to be
stateless.
"""
def maybe_to_cpu(x):
try:
like_gpu_tensor = x.device.type == "cuda" and hasattr(x, "to")
except AttributeError:
like_gpu_tensor = False
if like_gpu_tensor:
return x.to(device="cpu")
else:
return x
@wraps(func)
def wrapped(*args, **kwargs):
with _ignore_torch_cuda_oom():
return func(*args, **kwargs)
# Clear cache and retry
torch.cuda.empty_cache()
with _ignore_torch_cuda_oom():
return func(*args, **kwargs)
# Try on CPU. This slows down the code significantly, therefore print a notice.
logger = logging.getLogger(__name__)
logger.info("Attempting to copy inputs of {} to CPU due to CUDA OOM".format(str(func)))
new_args = (maybe_to_cpu(x) for x in args)
new_kwargs = {k: maybe_to_cpu(v) for k, v in kwargs.items()}
return func(*new_args, **new_kwargs)
return wrapped
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
# Keep this module for backward compatibility.
from fvcore.common.registry import Registry # noqa
__all__ = ["Registry"]
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import cloudpickle
class PicklableWrapper(object):
"""
Wrap an object to make it more picklable, note that it uses
heavy weight serialization libraries that are slower than pickle.
It's best to use it only on closures (which are usually not picklable).
This is a simplified version of
https://github.com/joblib/joblib/blob/master/joblib/externals/loky/cloudpickle_wrapper.py
"""
def __init__(self, obj):
self._obj = obj
def __reduce__(self):
s = cloudpickle.dumps(self._obj)
return cloudpickle.loads, (s,)
def __call__(self, *args, **kwargs):
return self._obj(*args, **kwargs)
def __getattr__(self, attr):
# Ensure that the wrapped object can be used seamlessly as the previous object.
if attr not in ["_obj"]:
return getattr(self._obj, attr)
return getattr(self, attr)
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import numpy as np
import pycocotools.mask as mask_util
from detectron2.utils.visualizer import (
ColorMode,
Visualizer,
_create_text_labels,
_PanopticPrediction,
)
from .colormap import random_color
class _DetectedInstance:
"""
Used to store data about detected objects in video frame,
in order to transfer color to objects in the future frames.
Attributes:
label (int):
bbox (tuple[float]):
mask_rle (dict):
color (tuple[float]): RGB colors in range (0, 1)
ttl (int): time-to-live for the instance. For example, if ttl=2,
the instance color can be transferred to objects in the next two frames.
"""
__slots__ = ["label", "bbox", "mask_rle", "color", "ttl"]
def __init__(self, label, bbox, mask_rle, color, ttl):
self.label = label
self.bbox = bbox
self.mask_rle = mask_rle
self.color = color
self.ttl = ttl
class VideoVisualizer:
def __init__(self, metadata, instance_mode=ColorMode.IMAGE):
"""
Args:
metadata (MetadataCatalog): image metadata.
"""
self.metadata = metadata
self._old_instances = []
assert instance_mode in [
ColorMode.IMAGE,
ColorMode.IMAGE_BW,
], "Other mode not supported yet."
self._instance_mode = instance_mode
def draw_instance_predictions(self, frame, predictions):
"""
Draw instance-level prediction results on an image.
Args:
frame (ndarray): an RGB image of shape (H, W, C), in the range [0, 255].
predictions (Instances): the output of an instance detection/segmentation
model. Following fields will be used to draw:
"pred_boxes", "pred_classes", "scores", "pred_masks" (or "pred_masks_rle").
Returns:
output (VisImage): image object with visualizations.
"""
frame_visualizer = Visualizer(frame, self.metadata)
num_instances = len(predictions)
if num_instances == 0:
return frame_visualizer.output
boxes = predictions.pred_boxes.tensor.numpy() if predictions.has("pred_boxes") else None
scores = predictions.scores if predictions.has("scores") else None
classes = predictions.pred_classes.numpy() if predictions.has("pred_classes") else None
keypoints = predictions.pred_keypoints if predictions.has("pred_keypoints") else None
if predictions.has("pred_masks"):
masks = predictions.pred_masks
# mask IOU is not yet enabled
# masks_rles = mask_util.encode(np.asarray(masks.permute(1, 2, 0), order="F"))
# assert len(masks_rles) == num_instances
else:
masks = None
detected = [
_DetectedInstance(classes[i], boxes[i], mask_rle=None, color=None, ttl=8)
for i in range(num_instances)
]
colors = self._assign_colors(detected)
labels = _create_text_labels(classes, scores, self.metadata.get("thing_classes", None))
if self._instance_mode == ColorMode.IMAGE_BW:
# any() returns uint8 tensor
frame_visualizer.output.img = frame_visualizer._create_grayscale_image(
(masks.any(dim=0) > 0).numpy() if masks is not None else None
)
alpha = 0.3
else:
alpha = 0.5
frame_visualizer.overlay_instances(
boxes=None if masks is not None else boxes, # boxes are a bit distracting
masks=masks,
labels=labels,
keypoints=keypoints,
assigned_colors=colors,
alpha=alpha,
)
return frame_visualizer.output
def draw_sem_seg(self, frame, sem_seg, area_threshold=None):
"""
Args:
sem_seg (ndarray or Tensor): semantic segmentation of shape (H, W),
each value is the integer label.
area_threshold (Optional[int]): only draw segmentations larger than the threshold
"""
# don't need to do anything special
frame_visualizer = Visualizer(frame, self.metadata)
frame_visualizer.draw_sem_seg(sem_seg, area_threshold=None)
return frame_visualizer.output
def draw_panoptic_seg_predictions(
self, frame, panoptic_seg, segments_info, area_threshold=None, alpha=0.5
):
frame_visualizer = Visualizer(frame, self.metadata)
pred = _PanopticPrediction(panoptic_seg, segments_info)
if self._instance_mode == ColorMode.IMAGE_BW:
frame_visualizer.output.img = frame_visualizer._create_grayscale_image(
pred.non_empty_mask()
)
# draw mask for all semantic segments first i.e. "stuff"
for mask, sinfo in pred.semantic_masks():
category_idx = sinfo["category_id"]
try:
mask_color = [x / 255 for x in self.metadata.stuff_colors[category_idx]]
except AttributeError:
mask_color = None
frame_visualizer.draw_binary_mask(
mask,
color=mask_color,
text=self.metadata.stuff_classes[category_idx],
alpha=alpha,
area_threshold=area_threshold,
)
all_instances = list(pred.instance_masks())
if len(all_instances) == 0:
return frame_visualizer.output
# draw mask for all instances second
masks, sinfo = list(zip(*all_instances))
num_instances = len(masks)
masks_rles = mask_util.encode(
np.asarray(np.asarray(masks).transpose(1, 2, 0), dtype=np.uint8, order="F")
)
assert len(masks_rles) == num_instances
category_ids = [x["category_id"] for x in sinfo]
detected = [
_DetectedInstance(category_ids[i], bbox=None, mask_rle=masks_rles[i], color=None, ttl=8)
for i in range(num_instances)
]
colors = self._assign_colors(detected)
labels = [self.metadata.thing_classes[k] for k in category_ids]
frame_visualizer.overlay_instances(
boxes=None,
masks=masks,
labels=labels,
keypoints=None,
assigned_colors=colors,
alpha=alpha,
)
return frame_visualizer.output
def _assign_colors(self, instances):
"""
Naive tracking heuristics to assign same color to the same instance,
will update the internal state of tracked instances.
Returns:
list[tuple[float]]: list of colors.
"""
# Compute iou with either boxes or masks:
is_crowd = np.zeros((len(instances),), dtype=np.bool)
if instances[0].bbox is None:
assert instances[0].mask_rle is not None
# use mask iou only when box iou is None
# because box seems good enough
rles_old = [x.mask_rle for x in self._old_instances]
rles_new = [x.mask_rle for x in instances]
ious = mask_util.iou(rles_old, rles_new, is_crowd)
threshold = 0.5
else:
boxes_old = [x.bbox for x in self._old_instances]
boxes_new = [x.bbox for x in instances]
ious = mask_util.iou(boxes_old, boxes_new, is_crowd)
threshold = 0.6
if len(ious) == 0:
ious = np.zeros((len(self._old_instances), len(instances)), dtype="float32")
# Only allow matching instances of the same label:
for old_idx, old in enumerate(self._old_instances):
for new_idx, new in enumerate(instances):
if old.label != new.label:
ious[old_idx, new_idx] = 0
matched_new_per_old = np.asarray(ious).argmax(axis=1)
max_iou_per_old = np.asarray(ious).max(axis=1)
# Try to find match for each old instance:
extra_instances = []
for idx, inst in enumerate(self._old_instances):
if max_iou_per_old[idx] > threshold:
newidx = matched_new_per_old[idx]
if instances[newidx].color is None:
instances[newidx].color = inst.color
continue
# If an old instance does not match any new instances,
# keep it for the next frame in case it is just missed by the detector
inst.ttl -= 1
if inst.ttl > 0:
extra_instances.append(inst)
# Assign random color to newly-detected instances:
for inst in instances:
if inst.color is None:
inst.color = random_color(rgb=True, maximum=1)
self._old_instances = instances[:] + extra_instances
return [d.color for d in instances]
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import colorsys
import logging
import math
import numpy as np
from enum import Enum, unique
import cv2
import matplotlib as mpl
import matplotlib.colors as mplc
import matplotlib.figure as mplfigure
import pycocotools.mask as mask_util
import torch
from fvcore.common.file_io import PathManager
from matplotlib.backends.backend_agg import FigureCanvasAgg
from PIL import Image
from detectron2.structures import BitMasks, Boxes, BoxMode, Keypoints, PolygonMasks, RotatedBoxes
from .colormap import random_color
logger = logging.getLogger(__name__)
__all__ = ["ColorMode", "VisImage", "Visualizer"]
_SMALL_OBJECT_AREA_THRESH = 1000
_LARGE_MASK_AREA_THRESH = 120000
_OFF_WHITE = (1.0, 1.0, 240.0 / 255)
_BLACK = (0, 0, 0)
_RED = (1.0, 0, 0)
_KEYPOINT_THRESHOLD = 0.05
@unique
class ColorMode(Enum):
"""
Enum of different color modes to use for instance visualizations.
"""
IMAGE = 0
"""
Picks a random color for every instance and overlay segmentations with low opacity.
"""
SEGMENTATION = 1
"""
Let instances of the same category have similar colors
(from metadata.thing_colors), and overlay them with
high opacity. This provides more attention on the quality of segmentation.
"""
IMAGE_BW = 2
"""
Same as IMAGE, but convert all areas without masks to gray-scale.
Only available for drawing per-instance mask predictions.
"""
class GenericMask:
"""
Attribute:
polygons (list[ndarray]): list[ndarray]: polygons for this mask.
Each ndarray has format [x, y, x, y, ...]
mask (ndarray): a binary mask
"""
def __init__(self, mask_or_polygons, height, width):
self._mask = self._polygons = self._has_holes = None
self.height = height
self.width = width
m = mask_or_polygons
if isinstance(m, dict):
# RLEs
assert "counts" in m and "size" in m
if isinstance(m["counts"], list): # uncompressed RLEs
h, w = m["size"]
assert h == height and w == width
m = mask_util.frPyObjects(m, h, w)
self._mask = mask_util.decode(m)[:, :]
return
if isinstance(m, list): # list[ndarray]
self._polygons = [np.asarray(x).reshape(-1) for x in m]
return
if isinstance(m, np.ndarray): # assumed to be a binary mask
assert m.shape[1] != 2, m.shape
assert m.shape == (height, width), m.shape
self._mask = m.astype("uint8")
return
raise ValueError("GenericMask cannot handle object {} of type '{}'".format(m, type(m)))
@property
def mask(self):
if self._mask is None:
self._mask = self.polygons_to_mask(self._polygons)
return self._mask
@property
def polygons(self):
if self._polygons is None:
self._polygons, self._has_holes = self.mask_to_polygons(self._mask)
return self._polygons
@property
def has_holes(self):
if self._has_holes is None:
if self._mask is not None:
self._polygons, self._has_holes = self.mask_to_polygons(self._mask)
else:
self._has_holes = False # if original format is polygon, does not have holes
return self._has_holes
def mask_to_polygons(self, mask):
# cv2.RETR_CCOMP flag retrieves all the contours and arranges them to a 2-level
# hierarchy. External contours (boundary) of the object are placed in hierarchy-1.
# Internal contours (holes) are placed in hierarchy-2.
# cv2.CHAIN_APPROX_NONE flag gets vertices of polygons from contours.
mask = np.ascontiguousarray(mask) # some versions of cv2 does not support incontiguous arr
res = cv2.findContours(mask.astype("uint8"), cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE)
hierarchy = res[-1]
if hierarchy is None: # empty mask
return [], False
has_holes = (hierarchy.reshape(-1, 4)[:, 3] >= 0).sum() > 0
res = res[-2]
res = [x.flatten() for x in res]
res = [x for x in res if len(x) >= 6]
return res, has_holes
def polygons_to_mask(self, polygons):
rle = mask_util.frPyObjects(polygons, self.height, self.width)
rle = mask_util.merge(rle)
return mask_util.decode(rle)[:, :]
def area(self):
return self.mask.sum()
def bbox(self):
p = mask_util.frPyObjects(self.polygons, self.height, self.width)
p = mask_util.merge(p)
bbox = mask_util.toBbox(p)
bbox[2] += bbox[0]
bbox[3] += bbox[1]
return bbox
class _PanopticPrediction:
def __init__(self, panoptic_seg, segments_info):
self._seg = panoptic_seg
self._sinfo = {s["id"]: s for s in segments_info} # seg id -> seg info
segment_ids, areas = torch.unique(panoptic_seg, sorted=True, return_counts=True)
areas = areas.numpy()
sorted_idxs = np.argsort(-areas)
self._seg_ids, self._seg_areas = segment_ids[sorted_idxs], areas[sorted_idxs]
self._seg_ids = self._seg_ids.tolist()
for sid, area in zip(self._seg_ids, self._seg_areas):
if sid in self._sinfo:
self._sinfo[sid]["area"] = float(area)
def non_empty_mask(self):
"""
Returns:
(H, W) array, a mask for all pixels that have a prediction
"""
empty_ids = []
for id in self._seg_ids:
if id not in self._sinfo:
empty_ids.append(id)
if len(empty_ids) == 0:
return np.zeros(self._seg.shape, dtype=np.uint8)
assert (
len(empty_ids) == 1
), ">1 ids corresponds to no labels. This is currently not supported"
return (self._seg != empty_ids[0]).numpy().astype(np.bool)
def semantic_masks(self):
for sid in self._seg_ids:
sinfo = self._sinfo.get(sid)
if sinfo is None or sinfo["isthing"]:
# Some pixels (e.g. id 0 in PanopticFPN) have no instance or semantic predictions.
continue
yield (self._seg == sid).numpy().astype(np.bool), sinfo
def instance_masks(self):
for sid in self._seg_ids:
sinfo = self._sinfo.get(sid)
if sinfo is None or not sinfo["isthing"]:
continue
mask = (self._seg == sid).numpy().astype(np.bool)
if mask.sum() > 0:
yield mask, sinfo
def _create_text_labels(classes, scores, class_names):
"""
Args:
classes (list[int] or None):
scores (list[float] or None):
class_names (list[str] or None):
Returns:
list[str] or None
"""
labels = None
if classes is not None and class_names is not None and len(class_names) > 1:
labels = [class_names[i] for i in classes]
if scores is not None:
if labels is None:
labels = ["{:.0f}%".format(s * 100) for s in scores]
else:
labels = ["{} {:.0f}%".format(l, s * 100) for l, s in zip(labels, scores)]
return labels
class VisImage:
def __init__(self, img, scale=1.0):
"""
Args:
img (ndarray): an RGB image of shape (H, W, 3).
scale (float): scale the input image
"""
self.img = img
self.scale = scale
self.width, self.height = img.shape[1], img.shape[0]
self._setup_figure(img)
def _setup_figure(self, img):
"""
Args:
Same as in :meth:`__init__()`.
Returns:
fig (matplotlib.pyplot.figure): top level container for all the image plot elements.
ax (matplotlib.pyplot.Axes): contains figure elements and sets the coordinate system.
"""
fig = mplfigure.Figure(frameon=False)
self.dpi = fig.get_dpi()
# add a small 1e-2 to avoid precision lost due to matplotlib's truncation
# (https://github.com/matplotlib/matplotlib/issues/15363)
fig.set_size_inches(
(self.width * self.scale + 1e-2) / self.dpi,
(self.height * self.scale + 1e-2) / self.dpi,
)
self.canvas = FigureCanvasAgg(fig)
# self.canvas = mpl.backends.backend_cairo.FigureCanvasCairo(fig)
ax = fig.add_axes([0.0, 0.0, 1.0, 1.0])
ax.axis("off")
ax.set_xlim(0.0, self.width)
ax.set_ylim(self.height)
self.fig = fig
self.ax = ax
def save(self, filepath):
"""
Args:
filepath (str): a string that contains the absolute path, including the file name, where
the visualized image will be saved.
"""
if filepath.lower().endswith(".jpg") or filepath.lower().endswith(".png"):
# faster than matplotlib's imshow
cv2.imwrite(filepath, self.get_image()[:, :, ::-1])
else:
# support general formats (e.g. pdf)
self.ax.imshow(self.img, interpolation="nearest")
self.fig.savefig(filepath)
def get_image(self):
"""
Returns:
ndarray:
the visualized image of shape (H, W, 3) (RGB) in uint8 type.
The shape is scaled w.r.t the input image using the given `scale` argument.
"""
canvas = self.canvas
s, (width, height) = canvas.print_to_buffer()
if (self.width, self.height) != (width, height):
img = cv2.resize(self.img, (width, height))
else:
img = self.img
# buf = io.BytesIO() # works for cairo backend
# canvas.print_rgba(buf)
# width, height = self.width, self.height
# s = buf.getvalue()
buffer = np.frombuffer(s, dtype="uint8")
# imshow is slow. blend manually (still quite slow)
img_rgba = buffer.reshape(height, width, 4)
rgb, alpha = np.split(img_rgba, [3], axis=2)
try:
import numexpr as ne # fuse them with numexpr
visualized_image = ne.evaluate("demo * (1 - alpha / 255.0) + rgb * (alpha / 255.0)")
except ImportError:
alpha = alpha.astype("float32") / 255.0
visualized_image = img * (1 - alpha) + rgb * alpha
visualized_image = visualized_image.astype("uint8")
return visualized_image
class Visualizer:
def __init__(self, img_rgb, metadata, scale=1.0, instance_mode=ColorMode.IMAGE):
"""
Args:
img_rgb: a numpy array of shape (H, W, C), where H and W correspond to
the height and width of the image respectively. C is the number of
color channels. The image is required to be in RGB format since that
is a requirement of the Matplotlib library. The image is also expected
to be in the range [0, 255].
metadata (MetadataCatalog): image metadata.
"""
self.img = np.asarray(img_rgb).clip(0, 255).astype(np.uint8)
self.metadata = metadata
self.output = VisImage(self.img, scale=scale)
self.cpu_device = torch.device("cpu")
# too small texts are useless, therefore clamp to 9
self._default_font_size = max(
np.sqrt(self.output.height * self.output.width) // 90, 10 // scale
)
self._instance_mode = instance_mode
def draw_instance_predictions(self, predictions):
"""
Draw instance-level prediction results on an image.
Args:
predictions (Instances): the output of an instance detection/segmentation
model. Following fields will be used to draw:
"pred_boxes", "pred_classes", "scores", "pred_masks" (or "pred_masks_rle").
Returns:
output (VisImage): image object with visualizations.
"""
boxes = predictions.pred_boxes if predictions.has("pred_boxes") else None
scores = predictions.scores if predictions.has("scores") else None
classes = predictions.pred_classes if predictions.has("pred_classes") else None
labels = _create_text_labels(classes, scores, self.metadata.get("thing_classes", None))
keypoints = predictions.pred_keypoints if predictions.has("pred_keypoints") else None
if predictions.has("pred_masks"):
masks = np.asarray(predictions.pred_masks)
masks = [GenericMask(x, self.output.height, self.output.width) for x in masks]
else:
masks = None
if self._instance_mode == ColorMode.SEGMENTATION and self.metadata.get("thing_colors"):
colors = [
self._jitter([x / 255 for x in self.metadata.thing_colors[c]]) for c in classes
]
alpha = 0.8
else:
colors = None
alpha = 0.5
if self._instance_mode == ColorMode.IMAGE_BW:
self.output.img = self._create_grayscale_image(
(predictions.pred_masks.any(dim=0) > 0).numpy()
)
alpha = 0.3
self.overlay_instances(
masks=masks,
boxes=boxes,
labels=labels,
keypoints=keypoints,
assigned_colors=colors,
alpha=alpha,
)
return self.output
def draw_sem_seg(self, sem_seg, area_threshold=None, alpha=0.8):
"""
Draw semantic segmentation predictions/labels.
Args:
sem_seg (Tensor or ndarray): the segmentation of shape (H, W).
Each value is the integer label of the pixel.
area_threshold (int): segments with less than `area_threshold` are not drawn.
alpha (float): the larger it is, the more opaque the segmentations are.
Returns:
output (VisImage): image object with visualizations.
"""
if isinstance(sem_seg, torch.Tensor):
sem_seg = sem_seg.numpy()
labels, areas = np.unique(sem_seg, return_counts=True)
sorted_idxs = np.argsort(-areas).tolist()
labels = labels[sorted_idxs]
for label in filter(lambda l: l < len(self.metadata.stuff_classes), labels):
try:
mask_color = [x / 255 for x in self.metadata.stuff_colors[label]]
except (AttributeError, IndexError):
mask_color = None
binary_mask = (sem_seg == label).astype(np.uint8)
text = self.metadata.stuff_classes[label]
self.draw_binary_mask(
binary_mask,
color=mask_color,
edge_color=_OFF_WHITE,
text=text,
alpha=alpha,
area_threshold=area_threshold,
)
return self.output
def draw_panoptic_seg_predictions(
self, panoptic_seg, segments_info, area_threshold=None, alpha=0.7
):
"""
Draw panoptic prediction results on an image.
Args:
panoptic_seg (Tensor): of shape (height, width) where the values are ids for each
segment.
segments_info (list[dict]): Describe each segment in `panoptic_seg`.
Each dict contains keys "id", "category_id", "isthing".
area_threshold (int): stuff segments with less than `area_threshold` are not drawn.
Returns:
output (VisImage): image object with visualizations.
"""
pred = _PanopticPrediction(panoptic_seg, segments_info)
if self._instance_mode == ColorMode.IMAGE_BW:
self.output.img = self._create_grayscale_image(pred.non_empty_mask())
# draw mask for all semantic segments first i.e. "stuff"
for mask, sinfo in pred.semantic_masks():
category_idx = sinfo["category_id"]
try:
mask_color = [x / 255 for x in self.metadata.stuff_colors[category_idx]]
except AttributeError:
mask_color = None
text = self.metadata.stuff_classes[category_idx]
self.draw_binary_mask(
mask,
color=mask_color,
edge_color=_OFF_WHITE,
text=text,
alpha=alpha,
area_threshold=area_threshold,
)
# draw mask for all instances second
all_instances = list(pred.instance_masks())
if len(all_instances) == 0:
return self.output
masks, sinfo = list(zip(*all_instances))
category_ids = [x["category_id"] for x in sinfo]
try:
scores = [x["score"] for x in sinfo]
except KeyError:
scores = None
labels = _create_text_labels(category_ids, scores, self.metadata.thing_classes)
try:
colors = [random_color(rgb=True, maximum=1) for k in category_ids]
except AttributeError:
colors = None
self.overlay_instances(masks=masks, labels=labels, assigned_colors=colors, alpha=alpha)
return self.output
def draw_dataset_dict(self, dic):
"""
Draw annotations/segmentaions in Detectron2 Dataset format.
Args:
dic (dict): annotation/segmentation data of one image, in Detectron2 Dataset format.
Returns:
output (VisImage): image object with visualizations.
"""
annos = dic.get("annotations", None)
if annos:
if "segmentation" in annos[0]:
masks = [x["segmentation"] for x in annos]
else:
masks = None
if "keypoints" in annos[0]:
keypts = [x["keypoints"] for x in annos]
keypts = np.array(keypts).reshape(len(annos), -1, 3)
else:
keypts = None
boxes = [BoxMode.convert(x["bbox"], x["bbox_mode"], BoxMode.XYXY_ABS) for x in annos]
labels = [x["category_id"] for x in annos]
colors = None
if self._instance_mode == ColorMode.SEGMENTATION and self.metadata.get("thing_colors"):
colors = [
self._jitter([x / 255 for x in self.metadata.thing_colors[c]]) for c in labels
]
names = self.metadata.get("thing_classes", None)
if names:
labels = [names[i] for i in labels]
labels = [
"{}".format(i) + ("|crowd" if a.get("iscrowd", 0) else "")
for i, a in zip(labels, annos)
]
self.overlay_instances(
labels=labels, boxes=boxes, masks=masks, keypoints=keypts, assigned_colors=colors
)
sem_seg = dic.get("sem_seg", None)
if sem_seg is None and "sem_seg_file_name" in dic:
with PathManager.open(dic["sem_seg_file_name"], "rb") as f:
sem_seg = Image.open(f)
sem_seg = np.asarray(sem_seg, dtype="uint8")
if sem_seg is not None:
self.draw_sem_seg(sem_seg, area_threshold=0, alpha=0.5)
return self.output
def overlay_instances(
self,
*,
boxes=None,
labels=None,
masks=None,
keypoints=None,
assigned_colors=None,
alpha=0.5
):
"""
Args:
boxes (Boxes, RotatedBoxes or ndarray): either a :class:`Boxes`,
or an Nx4 numpy array of XYXY_ABS format for the N objects in a single image,
or a :class:`RotatedBoxes`,
or an Nx5 numpy array of (x_center, y_center, width, height, angle_degrees) format
for the N objects in a single image,
labels (list[str]): the text to be displayed for each instance.
masks (masks-like object): Supported types are:
* :class:`detectron2.structures.PolygonMasks`,
:class:`detectron2.structures.BitMasks`.
* list[list[ndarray]]: contains the segmentation masks for all objects in one image.
The first level of the list corresponds to individual instances. The second
level to all the polygon that compose the instance, and the third level
to the polygon coordinates. The third level should have the format of
[x0, y0, x1, y1, ..., xn, yn] (n >= 3).
* list[ndarray]: each ndarray is a binary mask of shape (H, W).
* list[dict]: each dict is a COCO-style RLE.
keypoints (Keypoint or array like): an array-like object of shape (N, K, 3),
where the N is the number of instances and K is the number of keypoints.
The last dimension corresponds to (x, y, visibility or score).
assigned_colors (list[matplotlib.colors]): a list of colors, where each color
corresponds to each mask or box in the image. Refer to 'matplotlib.colors'
for full list of formats that the colors are accepted in.
Returns:
output (VisImage): image object with visualizations.
"""
num_instances = None
if boxes is not None:
boxes = self._convert_boxes(boxes)
num_instances = len(boxes)
if masks is not None:
masks = self._convert_masks(masks)
if num_instances:
assert len(masks) == num_instances
else:
num_instances = len(masks)
if keypoints is not None:
if num_instances:
assert len(keypoints) == num_instances
else:
num_instances = len(keypoints)
keypoints = self._convert_keypoints(keypoints)
if labels is not None:
assert len(labels) == num_instances
if assigned_colors is None:
assigned_colors = [random_color(rgb=True, maximum=1) for _ in range(num_instances)]
if num_instances == 0:
return self.output
if boxes is not None and boxes.shape[1] == 5:
return self.overlay_rotated_instances(
boxes=boxes, labels=labels, assigned_colors=assigned_colors
)
# Display in largest to smallest order to reduce occlusion.
areas = None
if boxes is not None:
areas = np.prod(boxes[:, 2:] - boxes[:, :2], axis=1)
elif masks is not None:
areas = np.asarray([x.area() for x in masks])
if areas is not None:
sorted_idxs = np.argsort(-areas).tolist()
# Re-order overlapped instances in descending order.
boxes = boxes[sorted_idxs] if boxes is not None else None
labels = [labels[k] for k in sorted_idxs] if labels is not None else None
masks = [masks[idx] for idx in sorted_idxs] if masks is not None else None
assigned_colors = [assigned_colors[idx] for idx in sorted_idxs]
keypoints = keypoints[sorted_idxs] if keypoints is not None else None
for i in range(num_instances):
color = assigned_colors[i]
if boxes is not None:
self.draw_box(boxes[i], edge_color=color)
if masks is not None:
for segment in masks[i].polygons:
self.draw_polygon(segment.reshape(-1, 2), color, alpha=alpha)
if labels is not None:
# first get a box
if boxes is not None:
x0, y0, x1, y1 = boxes[i]
text_pos = (x0, y0) # if drawing boxes, put text on the box corner.
horiz_align = "left"
elif masks is not None:
x0, y0, x1, y1 = masks[i].bbox()
# draw text in the center (defined by median) when box is not drawn
# median is less sensitive to outliers.
text_pos = np.median(masks[i].mask.nonzero(), axis=1)[::-1]
horiz_align = "center"
else:
continue # drawing the box confidence for keypoints isn't very useful.
# for small objects, draw text at the side to avoid occlusion
instance_area = (y1 - y0) * (x1 - x0)
if (
instance_area < _SMALL_OBJECT_AREA_THRESH * self.output.scale
or y1 - y0 < 40 * self.output.scale
):
if y1 >= self.output.height - 5:
text_pos = (x1, y0)
else:
text_pos = (x0, y1)
height_ratio = (y1 - y0) / np.sqrt(self.output.height * self.output.width)
lighter_color = self._change_color_brightness(color, brightness_factor=0.7)
font_size = (
np.clip((height_ratio - 0.02) / 0.08 + 1, 1.2, 2)
* 0.5
* self._default_font_size
)
self.draw_text(
labels[i],
text_pos,
color=lighter_color,
horizontal_alignment=horiz_align,
font_size=font_size,
)
# draw keypoints
if keypoints is not None:
for keypoints_per_instance in keypoints:
self.draw_and_connect_keypoints(keypoints_per_instance)
return self.output
def overlay_rotated_instances(self, boxes=None, labels=None, assigned_colors=None):
"""
Args:
boxes (ndarray): an Nx5 numpy array of
(x_center, y_center, width, height, angle_degrees) format
for the N objects in a single image.
labels (list[str]): the text to be displayed for each instance.
assigned_colors (list[matplotlib.colors]): a list of colors, where each color
corresponds to each mask or box in the image. Refer to 'matplotlib.colors'
for full list of formats that the colors are accepted in.
Returns:
output (VisImage): image object with visualizations.
"""
num_instances = len(boxes)
if assigned_colors is None:
assigned_colors = [random_color(rgb=True, maximum=1) for _ in range(num_instances)]
if num_instances == 0:
return self.output
# Display in largest to smallest order to reduce occlusion.
if boxes is not None:
areas = boxes[:, 2] * boxes[:, 3]
sorted_idxs = np.argsort(-areas).tolist()
# Re-order overlapped instances in descending order.
boxes = boxes[sorted_idxs]
labels = [labels[k] for k in sorted_idxs] if labels is not None else None
colors = [assigned_colors[idx] for idx in sorted_idxs]
for i in range(num_instances):
self.draw_rotated_box_with_label(
boxes[i], edge_color=colors[i], label=labels[i] if labels is not None else None
)
return self.output
def draw_and_connect_keypoints(self, keypoints):
"""
Draws keypoints of an instance and follows the rules for keypoint connections
to draw lines between appropriate keypoints. This follows color heuristics for
line color.
Args:
keypoints (Tensor): a tensor of shape (K, 3), where K is the number of keypoints
and the last dimension corresponds to (x, y, probability).
Returns:
output (VisImage): image object with visualizations.
"""
visible = {}
keypoint_names = self.metadata.get("keypoint_names")
for idx, keypoint in enumerate(keypoints):
# draw keypoint
x, y, prob = keypoint
if prob > _KEYPOINT_THRESHOLD:
self.draw_circle((x, y), color=_RED)
if keypoint_names:
keypoint_name = keypoint_names[idx]
visible[keypoint_name] = (x, y)
if self.metadata.get("keypoint_connection_rules"):
for kp0, kp1, color in self.metadata.keypoint_connection_rules:
if kp0 in visible and kp1 in visible:
x0, y0 = visible[kp0]
x1, y1 = visible[kp1]
color = tuple(x / 255.0 for x in color)
self.draw_line([x0, x1], [y0, y1], color=color)
# draw lines from nose to mid-shoulder and mid-shoulder to mid-hip
# Note that this strategy is specific to person keypoints.
# For other keypoints, it should just do nothing
try:
ls_x, ls_y = visible["left_shoulder"]
rs_x, rs_y = visible["right_shoulder"]
mid_shoulder_x, mid_shoulder_y = (ls_x + rs_x) / 2, (ls_y + rs_y) / 2
except KeyError:
pass
else:
# draw line from nose to mid-shoulder
nose_x, nose_y = visible.get("nose", (None, None))
if nose_x is not None:
self.draw_line([nose_x, mid_shoulder_x], [nose_y, mid_shoulder_y], color=_RED)
try:
# draw line from mid-shoulder to mid-hip
lh_x, lh_y = visible["left_hip"]
rh_x, rh_y = visible["right_hip"]
except KeyError:
pass
else:
mid_hip_x, mid_hip_y = (lh_x + rh_x) / 2, (lh_y + rh_y) / 2
self.draw_line([mid_hip_x, mid_shoulder_x], [mid_hip_y, mid_shoulder_y], color=_RED)
return self.output
"""
Primitive drawing functions:
"""
def draw_text(
self,
text,
position,
*,
font_size=None,
color="g",
horizontal_alignment="center",
rotation=0
):
"""
Args:
text (str): class label
position (tuple): a tuple of the x and y coordinates to place text on image.
font_size (int, optional): font of the text. If not provided, a font size
proportional to the image width is calculated and used.
color: color of the text. Refer to `matplotlib.colors` for full list
of formats that are accepted.
horizontal_alignment (str): see `matplotlib.text.Text`
rotation: rotation angle in degrees CCW
Returns:
output (VisImage): image object with text drawn.
"""
if not font_size:
font_size = self._default_font_size
# since the text background is dark, we don't want the text to be dark
color = np.maximum(list(mplc.to_rgb(color)), 0.2)
color[np.argmax(color)] = max(0.8, np.max(color))
x, y = position
self.output.ax.text(
x,
y,
text,
size=font_size * self.output.scale,
family="sans-serif",
bbox={"facecolor": "black", "alpha": 0.8, "pad": 0.7, "edgecolor": "none"},
verticalalignment="top",
horizontalalignment=horizontal_alignment,
color=color,
zorder=10,
rotation=rotation,
)
return self.output
def draw_box(self, box_coord, alpha=0.5, edge_color="g", line_style="-"):
"""
Args:
box_coord (tuple): a tuple containing x0, y0, x1, y1 coordinates, where x0 and y0
are the coordinates of the image's top left corner. x1 and y1 are the
coordinates of the image's bottom right corner.
alpha (float): blending efficient. Smaller values lead to more transparent masks.
edge_color: color of the outline of the box. Refer to `matplotlib.colors`
for full list of formats that are accepted.
line_style (string): the string to use to create the outline of the boxes.
Returns:
output (VisImage): image object with box drawn.
"""
x0, y0, x1, y1 = box_coord
width = x1 - x0
height = y1 - y0
linewidth = max(self._default_font_size / 4, 1)
self.output.ax.add_patch(
mpl.patches.Rectangle(
(x0, y0),
width,
height,
fill=False,
edgecolor=edge_color,
linewidth=linewidth * self.output.scale,
alpha=alpha,
linestyle=line_style,
)
)
return self.output
def draw_rotated_box_with_label(
self, rotated_box, alpha=0.5, edge_color="g", line_style="-", label=None
):
"""
Args:
rotated_box (tuple): a tuple containing (cnt_x, cnt_y, w, h, angle),
where cnt_x and cnt_y are the center coordinates of the box.
w and h are the width and height of the box. angle represents how
many degrees the box is rotated CCW with regard to the 0-degree box.
alpha (float): blending efficient. Smaller values lead to more transparent masks.
edge_color: color of the outline of the box. Refer to `matplotlib.colors`
for full list of formats that are accepted.
line_style (string): the string to use to create the outline of the boxes.
label (string): label for rotated box. It will not be rendered when set to None.
Returns:
output (VisImage): image object with box drawn.
"""
cnt_x, cnt_y, w, h, angle = rotated_box
area = w * h
# use thinner lines when the box is small
linewidth = self._default_font_size / (
6 if area < _SMALL_OBJECT_AREA_THRESH * self.output.scale else 3
)
theta = angle * math.pi / 180.0
c = math.cos(theta)
s = math.sin(theta)
rect = [(-w / 2, h / 2), (-w / 2, -h / 2), (w / 2, -h / 2), (w / 2, h / 2)]
# x: left->right ; y: top->down
rotated_rect = [(s * yy + c * xx + cnt_x, c * yy - s * xx + cnt_y) for (xx, yy) in rect]
for k in range(4):
j = (k + 1) % 4
self.draw_line(
[rotated_rect[k][0], rotated_rect[j][0]],
[rotated_rect[k][1], rotated_rect[j][1]],
color=edge_color,
linestyle="--" if k == 1 else line_style,
linewidth=linewidth,
)
if label is not None:
text_pos = rotated_rect[1] # topleft corner
height_ratio = h / np.sqrt(self.output.height * self.output.width)
label_color = self._change_color_brightness(edge_color, brightness_factor=0.7)
font_size = (
np.clip((height_ratio - 0.02) / 0.08 + 1, 1.2, 2) * 0.5 * self._default_font_size
)
self.draw_text(label, text_pos, color=label_color, font_size=font_size, rotation=angle)
return self.output
def draw_circle(self, circle_coord, color, radius=3):
"""
Args:
circle_coord (list(int) or tuple(int)): contains the x and y coordinates
of the center of the circle.
color: color of the polygon. Refer to `matplotlib.colors` for a full list of
formats that are accepted.
radius (int): radius of the circle.
Returns:
output (VisImage): image object with box drawn.
"""
x, y = circle_coord
self.output.ax.add_patch(
mpl.patches.Circle(circle_coord, radius=radius, fill=True, color=color)
)
return self.output
def draw_line(self, x_data, y_data, color, linestyle="-", linewidth=None):
"""
Args:
x_data (list[int]): a list containing x values of all the points being drawn.
Length of list should match the length of y_data.
y_data (list[int]): a list containing y values of all the points being drawn.
Length of list should match the length of x_data.
color: color of the line. Refer to `matplotlib.colors` for a full list of
formats that are accepted.
linestyle: style of the line. Refer to `matplotlib.lines.Line2D`
for a full list of formats that are accepted.
linewidth (float or None): width of the line. When it's None,
a default value will be computed and used.
Returns:
output (VisImage): image object with line drawn.
"""
if linewidth is None:
linewidth = self._default_font_size / 3
linewidth = max(linewidth, 1)
self.output.ax.add_line(
mpl.lines.Line2D(
x_data,
y_data,
linewidth=linewidth * self.output.scale,
color=color,
linestyle=linestyle,
)
)
return self.output
def draw_binary_mask(
self, binary_mask, color=None, *, edge_color=None, text=None, alpha=0.5, area_threshold=4096
):
"""
Args:
binary_mask (ndarray): numpy array of shape (H, W), where H is the image height and
W is the image width. Each value in the array is either a 0 or 1 value of uint8
type.
color: color of the mask. Refer to `matplotlib.colors` for a full list of
formats that are accepted. If None, will pick a random color.
edge_color: color of the polygon edges. Refer to `matplotlib.colors` for a
full list of formats that are accepted.
text (str): if None, will be drawn in the object's center of mass.
alpha (float): blending efficient. Smaller values lead to more transparent masks.
area_threshold (float): a connected component small than this will not be shown.
Returns:
output (VisImage): image object with mask drawn.
"""
if color is None:
color = random_color(rgb=True, maximum=1)
if area_threshold is None:
area_threshold = 4096
has_valid_segment = False
binary_mask = binary_mask.astype("uint8") # opencv needs uint8
mask = GenericMask(binary_mask, self.output.height, self.output.width)
shape2d = (binary_mask.shape[0], binary_mask.shape[1])
if not mask.has_holes:
# draw polygons for regular masks
for segment in mask.polygons:
area = mask_util.area(mask_util.frPyObjects([segment], shape2d[0], shape2d[1]))
if area < area_threshold:
continue
has_valid_segment = True
segment = segment.reshape(-1, 2)
self.draw_polygon(segment, color=color, edge_color=edge_color, alpha=alpha)
else:
rgba = np.zeros(shape2d + (4,), dtype="float32")
rgba[:, :, :3] = color
rgba[:, :, 3] = (mask.mask == 1).astype("float32") * alpha
has_valid_segment = True
self.output.ax.imshow(rgba)
if text is not None and has_valid_segment:
# TODO sometimes drawn on wrong objects. the heuristics here can improve.
lighter_color = self._change_color_brightness(color, brightness_factor=0.7)
_num_cc, cc_labels, stats, centroids = cv2.connectedComponentsWithStats(binary_mask, 8)
largest_component_id = np.argmax(stats[1:, -1]) + 1
# draw text on the largest component, as well as other very large components.
for cid in range(1, _num_cc):
if cid == largest_component_id or stats[cid, -1] > _LARGE_MASK_AREA_THRESH:
# median is more stable than centroid
# center = centroids[largest_component_id]
center = np.median((cc_labels == cid).nonzero(), axis=1)[::-1]
self.draw_text(text, center, color=lighter_color)
return self.output
def draw_polygon(self, segment, color, edge_color=None, alpha=0.5):
"""
Args:
segment: numpy array of shape Nx2, containing all the points in the polygon.
color: color of the polygon. Refer to `matplotlib.colors` for a full list of
formats that are accepted.
edge_color: color of the polygon edges. Refer to `matplotlib.colors` for a
full list of formats that are accepted. If not provided, a darker shade
of the polygon color will be used instead.
alpha (float): blending efficient. Smaller values lead to more transparent masks.
Returns:
output (VisImage): image object with polygon drawn.
"""
if edge_color is None:
# make edge color darker than the polygon color
if alpha > 0.8:
edge_color = self._change_color_brightness(color, brightness_factor=-0.7)
else:
edge_color = color
edge_color = mplc.to_rgb(edge_color) + (1,)
polygon = mpl.patches.Polygon(
segment,
fill=True,
facecolor=mplc.to_rgb(color) + (alpha,),
edgecolor=edge_color,
linewidth=max(self._default_font_size // 15 * self.output.scale, 1),
)
self.output.ax.add_patch(polygon)
return self.output
"""
Internal methods:
"""
def _jitter(self, color):
"""
Randomly modifies given color to produce a slightly different color than the color given.
Args:
color (tuple[double]): a tuple of 3 elements, containing the RGB values of the color
picked. The values in the list are in the [0.0, 1.0] range.
Returns:
jittered_color (tuple[double]): a tuple of 3 elements, containing the RGB values of the
color after being jittered. The values in the list are in the [0.0, 1.0] range.
"""
color = mplc.to_rgb(color)
vec = np.random.rand(3)
# better to do it in another color space
vec = vec / np.linalg.norm(vec) * 0.5
res = np.clip(vec + color, 0, 1)
return tuple(res)
def _create_grayscale_image(self, mask=None):
"""
Create a grayscale version of the original image.
The colors in masked area, if given, will be kept.
"""
img_bw = self.img.astype("f4").mean(axis=2)
img_bw = np.stack([img_bw] * 3, axis=2)
if mask is not None:
img_bw[mask] = self.img[mask]
return img_bw
def _change_color_brightness(self, color, brightness_factor):
"""
Depending on the brightness_factor, gives a lighter or darker color i.e. a color with
less or more saturation than the original color.
Args:
color: color of the polygon. Refer to `matplotlib.colors` for a full list of
formats that are accepted.
brightness_factor (float): a value in [-1.0, 1.0] range. A lightness factor of
0 will correspond to no change, a factor in [-1.0, 0) range will result in
a darker color and a factor in (0, 1.0] range will result in a lighter color.
Returns:
modified_color (tuple[double]): a tuple containing the RGB values of the
modified color. Each value in the tuple is in the [0.0, 1.0] range.
"""
assert brightness_factor >= -1.0 and brightness_factor <= 1.0
color = mplc.to_rgb(color)
polygon_color = colorsys.rgb_to_hls(*mplc.to_rgb(color))
modified_lightness = polygon_color[1] + (brightness_factor * polygon_color[1])
modified_lightness = 0.0 if modified_lightness < 0.0 else modified_lightness
modified_lightness = 1.0 if modified_lightness > 1.0 else modified_lightness
modified_color = colorsys.hls_to_rgb(polygon_color[0], modified_lightness, polygon_color[2])
return modified_color
def _convert_boxes(self, boxes):
"""
Convert different format of boxes to an NxB array, where B = 4 or 5 is the box dimension.
"""
if isinstance(boxes, Boxes) or isinstance(boxes, RotatedBoxes):
return boxes.tensor.numpy()
else:
return np.asarray(boxes)
def _convert_masks(self, masks_or_polygons):
"""
Convert different format of masks or polygons to a tuple of masks and polygons.
Returns:
list[GenericMask]:
"""
m = masks_or_polygons
if isinstance(m, PolygonMasks):
m = m.polygons
if isinstance(m, BitMasks):
m = m.tensor.numpy()
if isinstance(m, torch.Tensor):
m = m.numpy()
ret = []
for x in m:
if isinstance(x, GenericMask):
ret.append(x)
else:
ret.append(GenericMask(x, self.output.height, self.output.width))
return ret
def _convert_keypoints(self, keypoints):
if isinstance(keypoints, Keypoints):
keypoints = keypoints.tensor
keypoints = np.asarray(keypoints)
return keypoints
def get_output(self):
"""
Returns:
output (VisImage): the image output containing the visualizations added
to the image.
"""
return self.output
## Some scripts for developers to use, include:
- `linter.sh`: lint the codebase before commit
- `run_{inference,instant}_tests.sh`: run inference/training for a few iterations.
Note that these tests require 2 GPUs.
- `parse_results.sh`: parse results from a log file.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment