import warnings
from typing import Any, Callable, Dict, List, Optional, Sequence, Type, Union

import PIL.Image

import torch
from torch.utils._pytree import tree_flatten, tree_unflatten

from torchvision import transforms as _transforms, tv_tensors
from torchvision.transforms.v2 import functional as F, Transform

from ._utils import _parse_labels_getter, _setup_number_or_seq, _setup_size, get_bounding_boxes, has_any, is_pure_tensor


# TODO: do we want/need to expose this?
class Identity(Transform):
    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        return inpt


class Lambda(Transform):
    """Apply a user-defined function as a transform.

    This transform does not support torchscript.

    Args:
        lambd (function): Lambda/function to be used for transform.
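
    Example:
        A minimal sketch; the function and the ``types`` filter here are
        illustrative::

            double = Lambda(lambda t: t * 2, torch.Tensor)
            out = double(torch.ones(3))  # tensor([2., 2., 2.]); other input types pass through unchanged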
    """

    _transformed_types = (object,)

    def __init__(self, lambd: Callable[[Any], Any], *types: Type):
        super().__init__()
        self.lambd = lambd
        self.types = types or self._transformed_types

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        if isinstance(inpt, self.types):
            return self.lambd(inpt)
        else:
            return inpt

    def extra_repr(self) -> str:
        extras = []
        name = getattr(self.lambd, "__name__", None)
        if name:
            extras.append(name)
        extras.append(f"types={[type.__name__ for type in self.types]}")
        return ", ".join(extras)


class LinearTransformation(Transform):
    """Transform a tensor image or video with a square transformation matrix and a mean_vector computed offline.

    This transform does not support PIL Image.
    Given ``transformation_matrix`` and ``mean_vector``, this transform flattens the
    ``torch.*Tensor``, subtracts ``mean_vector`` from it, computes the dot product
    with the transformation matrix, and reshapes the tensor back to its original
    shape.

    Applications:
        Whitening transformation: suppose X is a matrix of zero-centered data with
        shape [N x D]. Compute the data covariance matrix [D x D] with
        torch.mm(X.t(), X), perform SVD on this matrix, and pass the resulting
        whitening matrix as transformation_matrix (see the example below).

    Args:
        transformation_matrix (Tensor): tensor [D x D], D = C x H x W
        mean_vector (Tensor): tensor [D], D = C x H x W
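
    Example:
        A rough sketch of computing a ZCA whitening matrix offline. The shapes and
        the stabilizing epsilon are illustrative, not prescribed by this transform::

            data = torch.randn(1000, 3 * 8 * 8)             # flattened images, [N, D]
            mean = data.mean(dim=0)                         # mean_vector, [D]
            X = data - mean                                 # zero-centered data
            cov = torch.mm(X.t(), X) / X.shape[0]           # covariance matrix, [D x D]
            U, S, _ = torch.linalg.svd(cov)
            W = U @ torch.diag((S + 1e-5).rsqrt()) @ U.t()  # whitening matrix, [D x D]
            whiten = LinearTransformation(W, mean)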
    """

    _v1_transform_cls = _transforms.LinearTransformation

    _transformed_types = (is_pure_tensor, tv_tensors.Image, tv_tensors.Video)

    def __init__(self, transformation_matrix: torch.Tensor, mean_vector: torch.Tensor):
        super().__init__()
        if transformation_matrix.size(0) != transformation_matrix.size(1):
            raise ValueError(
                "transformation_matrix should be square. Got "
                f"{tuple(transformation_matrix.size())} rectangular matrix."
            )

        if mean_vector.size(0) != transformation_matrix.size(0):
            raise ValueError(
                f"mean_vector should have the same length {mean_vector.size(0)}"
                f" as any one of the dimensions of the transformation_matrix [{tuple(transformation_matrix.size())}]"
            )

        if transformation_matrix.device != mean_vector.device:
            raise ValueError(
                f"Input tensors should be on the same device. Got {transformation_matrix.device} and {mean_vector.device}"
            )

        if transformation_matrix.dtype != mean_vector.dtype:
            raise ValueError(
                f"Input tensors should have the same dtype. Got {transformation_matrix.dtype} and {mean_vector.dtype}"
            )

        self.transformation_matrix = transformation_matrix
        self.mean_vector = mean_vector

    def _check_inputs(self, sample: Any) -> Any:
        if has_any(sample, PIL.Image.Image):
            raise TypeError(f"{type(self).__name__}() does not support PIL images.")

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        shape = inpt.shape
        n = shape[-3] * shape[-2] * shape[-1]
        if n != self.transformation_matrix.shape[0]:
            raise ValueError(
                "Input tensor and transformation matrix have incompatible shape."
                + f"[{shape[-3]} x {shape[-2]} x {shape[-1]}] != "
                + f"{self.transformation_matrix.shape[0]}"
            )

        if inpt.device.type != self.mean_vector.device.type:
            raise ValueError(
                "Input tensor should be on the same device as transformation matrix and mean vector. "
                f"Got {inpt.device} vs {self.mean_vector.device}"
            )

        flat_inpt = inpt.reshape(-1, n) - self.mean_vector

        transformation_matrix = self.transformation_matrix.to(flat_inpt.dtype)
        output = torch.mm(flat_inpt, transformation_matrix)
        output = output.reshape(shape)

        if isinstance(inpt, (tv_tensors.Image, tv_tensors.Video)):
            output = tv_tensors.wrap(output, like=inpt)
        return output


class Normalize(Transform):
    """Normalize a tensor image or video with mean and standard deviation.

    This transform does not support PIL Image.
    Given mean: ``(mean[1],...,mean[n])`` and std: ``(std[1],...,std[n])`` for ``n``
    channels, this transform will normalize each channel of the input
    ``torch.*Tensor`` i.e.,
    ``output[channel] = (input[channel] - mean[channel]) / std[channel]``

    .. note::
        This transform acts out of place, i.e., it does not mutate the input tensor.

    Args:
        mean (sequence): Sequence of means for each channel.
        std (sequence): Sequence of standard deviations for each channel.
        inplace (bool, optional): Whether to perform this operation in-place. Default: ``False``.
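
    Example:
        A minimal sketch using the widely used ImageNet statistics; any
        per-channel values work::

            normalize = Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
            out = normalize(torch.rand(3, 224, 224))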

    """

    _v1_transform_cls = _transforms.Normalize

    def __init__(self, mean: Sequence[float], std: Sequence[float], inplace: bool = False):
        super().__init__()
        self.mean = list(mean)
        self.std = list(std)
        self.inplace = inplace

    def _check_inputs(self, sample: Any) -> Any:
        if has_any(sample, PIL.Image.Image):
            raise TypeError(f"{type(self).__name__}() does not support PIL images.")

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        return self._call_kernel(F.normalize, inpt, mean=self.mean, std=self.std, inplace=self.inplace)


class GaussianBlur(Transform):
    """Blurs image with randomly chosen Gaussian blur kernel.

    The convolution uses reflection padding corresponding to the kernel size, so that the input shape is maintained.

    If the input is a Tensor, it is expected
    to have [..., C, H, W] shape, where ... means an arbitrary number of leading dimensions.

    Args:
        kernel_size (int or sequence): Size of the Gaussian kernel.
        sigma (float or tuple of float (min, max)): Standard deviation to be used for
            creating kernel to perform blurring. If float, sigma is fixed. If it is tuple
            of float (min, max), sigma is chosen uniformly at random to lie in the
            given range.
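
    Example:
        A minimal sketch; the kernel size and sigma range are illustrative::

            blur = GaussianBlur(kernel_size=5, sigma=(0.1, 2.0))
            out = blur(torch.rand(1, 3, 64, 64))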
    """

    _v1_transform_cls = _transforms.GaussianBlur

    def __init__(
        self, kernel_size: Union[int, Sequence[int]], sigma: Union[int, float, Sequence[float]] = (0.1, 2.0)
    ) -> None:
        super().__init__()
        self.kernel_size = _setup_size(kernel_size, "Kernel size should be a tuple/list of two integers")
        for ks in self.kernel_size:
            if ks <= 0 or ks % 2 == 0:
                raise ValueError("Kernel size value should be an odd and positive number.")

        self.sigma = _setup_number_or_seq(sigma, "sigma")

        if not 0.0 < self.sigma[0] <= self.sigma[1]:
            raise ValueError(f"sigma values should be positive and of the form (min, max). Got {self.sigma}")

    def _get_params(self, flat_inputs: List[Any]) -> Dict[str, Any]:
        sigma = torch.empty(1).uniform_(self.sigma[0], self.sigma[1]).item()
        return dict(sigma=[sigma, sigma])

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        return self._call_kernel(F.gaussian_blur, inpt, self.kernel_size, **params)


class GaussianNoise(Transform):
    """Add gaussian noise to images or videos.

    The input tensor is expected to be in [..., 1 or 3, H, W] format,
    where ... means it can have an arbitrary number of leading dimensions.
    Each image or frame in a batch will be transformed independently, i.e. the
    noise added to each image will be different.

    The input tensor is also expected to be of float dtype in ``[0, 1]``.
    This transform does not support PIL images.

    Args:
        mean (float): Mean of the sampled normal distribution. Default is 0.
        sigma (float): Standard deviation of the sampled normal distribution. Default is 0.1.
        clip (bool, optional): Whether to clip the values in ``[0, 1]`` after adding noise. Default is True.
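
    Example:
        A minimal sketch; a float image in ``[0, 1]`` is assumed::

            noise = GaussianNoise(mean=0.0, sigma=0.1, clip=True)
            out = noise(torch.rand(3, 32, 32))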
    """

    def __init__(self, mean: float = 0.0, sigma: float = 0.1, clip: bool = True) -> None:
        super().__init__()
        self.mean = mean
        self.sigma = sigma
        self.clip = clip

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        return self._call_kernel(F.gaussian_noise, inpt, mean=self.mean, sigma=self.sigma, clip=self.clip)


class ToDtype(Transform):
    """Converts the input to a specific dtype, optionally scaling the values for images or videos.

    .. note::
        ``ToDtype(dtype, scale=True)`` is the recommended replacement for ``ConvertImageDtype(dtype)``.

    Args:
        dtype (``torch.dtype`` or dict of ``TVTensor`` -> ``torch.dtype``): The dtype to convert to.
            If a ``torch.dtype`` is passed, e.g. ``torch.float32``, only images and videos will be converted
            to that dtype: this is for compatibility with :class:`~torchvision.transforms.v2.ConvertImageDtype`.
            A dict can be passed to specify per-tv_tensor conversions, e.g.
            ``dtype={tv_tensors.Image: torch.float32, tv_tensors.Mask: torch.int64, "others": None}``. The "others"
            key can be used as a catch-all for any other tv_tensor type, and ``None`` means no conversion.
        scale (bool, optional): Whether to scale the values for images or videos. See :ref:`range_and_dtype`.
            Default: ``False``.
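
    Example:
        A sketch of a per-type conversion; the dict values are illustrative::

            transform = ToDtype(
                dtype={tv_tensors.Image: torch.float32, tv_tensors.Mask: torch.int64, "others": None},
                scale=True,
            )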
    """

    _transformed_types = (torch.Tensor,)

    def __init__(
        self, dtype: Union[torch.dtype, Dict[Union[Type, str], Optional[torch.dtype]]], scale: bool = False
    ) -> None:
        super().__init__()

        if not isinstance(dtype, (dict, torch.dtype)):
            raise ValueError(f"dtype must be a dict or a torch.dtype, got {type(dtype)} instead")

        if (
            isinstance(dtype, dict)
            and torch.Tensor in dtype
            and any(cls in dtype for cls in [tv_tensors.Image, tv_tensors.Video])
        ):
            warnings.warn(
                "Got `dtype` values for `torch.Tensor` and either `tv_tensors.Image` or `tv_tensors.Video`. "
                "Note that a plain `torch.Tensor` will *not* be transformed by this (or any other transformation) "
                "in case a `tv_tensors.Image` or `tv_tensors.Video` is present in the input."
            )
        self.dtype = dtype
        self.scale = scale

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        if isinstance(self.dtype, torch.dtype):
            # For consistency / BC with ConvertImageDtype, we only care about images or videos when dtype
            # is a simple torch.dtype
            if not is_pure_tensor(inpt) and not isinstance(inpt, (tv_tensors.Image, tv_tensors.Video)):
                return inpt

            dtype: Optional[torch.dtype] = self.dtype
        elif type(inpt) in self.dtype:
            dtype = self.dtype[type(inpt)]
        elif "others" in self.dtype:
            dtype = self.dtype["others"]
        else:
            raise ValueError(
                f"No dtype was specified for type {type(inpt)}. "
                "If you only need to convert the dtype of images or videos, you can just pass e.g. dtype=torch.float32. "
                "If you're passing a dict as dtype, "
                'you can use "others" as a catch-all key '
                'e.g. dtype={tv_tensors.Mask: torch.int64, "others": None} to pass-through the rest of the inputs.'
            )

        supports_scaling = is_pure_tensor(inpt) or isinstance(inpt, (tv_tensors.Image, tv_tensors.Video))
        if dtype is None:
            if self.scale and supports_scaling:
                warnings.warn(
                    "scale was set to True but no dtype was specified for images or videos: no scaling will be done."
                )
            return inpt

        return self._call_kernel(F.to_dtype, inpt, dtype=dtype, scale=self.scale)


class ConvertImageDtype(Transform):
    """[DEPRECATED] Use ``v2.ToDtype(dtype, scale=True)`` instead.

    Convert input image to the given ``dtype`` and scale the values accordingly.

    .. warning::
        Consider using ``ToDtype(dtype, scale=True)`` instead. See :class:`~torchvision.transforms.v2.ToDtype`.

    This function does not support PIL Image.

    Args:
        dtype (torch.dtype): Desired data type of the output

    .. note::

        When converting from a smaller to a larger integer ``dtype`` the maximum values are **not** mapped exactly.
        If converted back and forth, this mismatch has no effect.

    Raises:
        RuntimeError: When trying to cast :class:`torch.float32` to :class:`torch.int32` or :class:`torch.int64` as
            well as for trying to cast :class:`torch.float64` to :class:`torch.int64`. These conversions might lead to
            overflow errors since the floating point ``dtype`` cannot store consecutive integers over the whole range
            of the integer ``dtype``.
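
    Example:
        Equivalent usage via the recommended replacement::

            transform = ToDtype(torch.float32, scale=True)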
    """

    _v1_transform_cls = _transforms.ConvertImageDtype

    def __init__(self, dtype: torch.dtype = torch.float32) -> None:
        super().__init__()
        self.dtype = dtype

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        return self._call_kernel(F.to_dtype, inpt, dtype=self.dtype, scale=True)


class SanitizeBoundingBoxes(Transform):
    """Remove degenerate/invalid bounding boxes and their corresponding labels and masks.

    This transform removes bounding boxes and their associated labels/masks that:

    - are below a given ``min_size``: by default this also removes degenerate boxes that have e.g. X2 <= X1.
    - have any coordinate outside of their corresponding image. You may want to
      call :class:`~torchvision.transforms.v2.ClampBoundingBoxes` first to avoid undesired removals.

    It can also sanitize other tensors like the "iscrowd" or "area" properties from COCO
    (see ``labels_getter`` parameter).

    It is recommended to call it at the end of a pipeline, before passing the
    input to the models. It is critical to call this transform if
    :class:`~torchvision.transforms.v2.RandomIoUCrop` was called.
    If you want to be extra careful, you may call it after all transforms that
    may modify bounding boxes, but once at the end should be enough in most
    cases.

    Args:
        min_size (float, optional): The size below which bounding boxes are removed. Default is 1.
        labels_getter (callable or str or None, optional): indicates how to identify the labels in the input
            (or anything else that needs to be sanitized along with the bounding boxes).
            By default, this will try to find a "labels" key in the input (case-insensitive), if
            the input is a dict or it is a tuple whose second element is a dict.
            This heuristic should work well with a lot of datasets, including the built-in torchvision datasets.

            It can also be a callable that takes the same input as the transform, and returns either:

            - A single tensor (the labels)
            - A tuple/list of tensors, each of which will be subject to the same sanitization as the bounding boxes.
              This is useful to sanitize multiple tensors like the labels, and the "iscrowd" or "area" properties
              from COCO.

            If ``labels_getter`` is None then only bounding boxes are sanitized.
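
    Example:
        A sketch with a dict sample; the keys and values are illustrative. The
        degenerate second box and its label are removed::

            sample = {
                "image": tv_tensors.Image(torch.rand(3, 100, 100)),
                "boxes": tv_tensors.BoundingBoxes(
                    torch.tensor([[0, 0, 10, 10], [5, 5, 5, 5]]),
                    format="XYXY",
                    canvas_size=(100, 100),
                ),
                "labels": torch.tensor([1, 2]),
            }
            out = SanitizeBoundingBoxes()(sample)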
    """

    def __init__(
        self,
        min_size: float = 1.0,
        labels_getter: Union[Callable[[Any], Any], str, None] = "default",
    ) -> None:
        super().__init__()

        if min_size < 1:
            raise ValueError(f"min_size must be >= 1, got {min_size}.")
        self.min_size = min_size

        self.labels_getter = labels_getter
        self._labels_getter = _parse_labels_getter(labels_getter)

    def forward(self, *inputs: Any) -> Any:
        inputs = inputs if len(inputs) > 1 else inputs[0]

        labels = self._labels_getter(inputs)
        if labels is not None:
            msg = "The labels in the input to forward() must be a tensor or None, got {type} instead."
            if isinstance(labels, torch.Tensor):
                labels = (labels,)
            elif isinstance(labels, (tuple, list)):
                for entry in labels:
                    if not isinstance(entry, torch.Tensor):
                        # TODO: we don't need to enforce tensors, just that entries are indexable as t[bool_mask]
                        raise ValueError(msg.format(type=type(entry)))
            else:
                raise ValueError(msg.format(type=type(labels)))

        flat_inputs, spec = tree_flatten(inputs)
        boxes = get_bounding_boxes(flat_inputs)

        if labels is not None:
            for label in labels:
                if boxes.shape[0] != label.shape[0]:
                    raise ValueError(
                        f"Number of boxes (shape={boxes.shape}) and must match the number of labels."
                        f"Found labels with shape={label.shape})."
                    )

        valid = F._misc._get_sanitize_bounding_boxes_mask(
            boxes,
            format=boxes.format,
            canvas_size=boxes.canvas_size,
            min_size=self.min_size,
        )
        params = dict(valid=valid, labels=labels)
        flat_outputs = [self._transform(inpt, params) for inpt in flat_inputs]

        return tree_unflatten(flat_outputs, spec)

    def _transform(self, inpt: Any, params: Dict[str, Any]) -> Any:
        is_label = params["labels"] is not None and any(inpt is label for label in params["labels"])
        is_bounding_boxes_or_mask = isinstance(inpt, (tv_tensors.BoundingBoxes, tv_tensors.Mask))

        if not (is_label or is_bounding_boxes_or_mask):
            return inpt

        output = inpt[params["valid"]]

        if is_label:
            return output
        else:
            return tv_tensors.wrap(output, like=inpt)