Unverified commit 1a9ff0d7 authored by Philip Meier, committed by GitHub

Port remaining transforms tests (#7954)

parent 997384cf
@@ -272,57 +272,6 @@ class TestSmoke:
)
assert transforms.SanitizeBoundingBoxes()(sample)["boxes"].shape == (0, 4)
@parametrize(
[
(
transform,
itertools.chain.from_iterable(
fn(
color_spaces=[
"GRAY",
"RGB",
],
dtypes=[torch.uint8],
extra_dims=[(), (4,)],
**(dict(num_frames=[3]) if fn is make_videos else dict()),
)
for fn in [
make_images,
make_vanilla_tensor_images,
make_pil_images,
make_videos,
]
),
)
for transform in (
transforms.RandAugment(),
transforms.TrivialAugmentWide(),
transforms.AutoAugment(),
transforms.AugMix(),
)
]
)
def test_auto_augment(self, transform, input):
transform(input)
@parametrize(
[
(
transforms.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0]),
itertools.chain.from_iterable(
fn(color_spaces=["RGB"], dtypes=[torch.float32])
for fn in [
make_images,
make_vanilla_tensor_images,
make_videos,
]
),
),
]
)
def test_normalize(self, transform, input):
transform(input)
@pytest.mark.parametrize(
"flat_inputs",
@@ -385,40 +334,6 @@ def test_pure_tensor_heuristic(flat_inputs):
assert transform.was_applied(output, input)
class TestElasticTransform:
def test_assertions(self):
with pytest.raises(TypeError, match="alpha should be a number or a sequence of numbers"):
transforms.ElasticTransform({})
with pytest.raises(ValueError, match="alpha is a sequence its length should be 1 or 2"):
transforms.ElasticTransform([1.0, 2.0, 3.0])
with pytest.raises(TypeError, match="sigma should be a number or a sequence of numbers"):
transforms.ElasticTransform(1.0, {})
with pytest.raises(ValueError, match="sigma is a sequence its length should be 1 or 2"):
transforms.ElasticTransform(1.0, [1.0, 2.0, 3.0])
with pytest.raises(TypeError, match="Got inappropriate fill arg"):
transforms.ElasticTransform(1.0, 2.0, fill="abc")
def test__get_params(self):
alpha = 2.0
sigma = 3.0
transform = transforms.ElasticTransform(alpha, sigma)
h, w = size = (24, 32)
image = make_image(size)
params = transform._get_params([image])
displacement = params["displacement"]
assert displacement.shape == (1, h, w, 2)
assert (-alpha / w <= displacement[0, ..., 0]).all() and (displacement[0, ..., 0] <= alpha / w).all()
assert (-alpha / h <= displacement[0, ..., 1]).all() and (displacement[0, ..., 1] <= alpha / h).all()
class TestTransform:
@pytest.mark.parametrize(
"inpt_type",
@@ -705,25 +620,6 @@ class TestRandomResize:
assert min_size <= size < max_size
class TestUniformTemporalSubsample:
@pytest.mark.parametrize(
"inpt",
[
torch.zeros(10, 3, 8, 8),
torch.zeros(1, 10, 3, 8, 8),
tv_tensors.Video(torch.zeros(1, 10, 3, 8, 8)),
],
)
def test__transform(self, inpt):
num_samples = 5
transform = transforms.UniformTemporalSubsample(num_samples)
output = transform(inpt)
assert type(output) is type(inpt)
assert output.shape[-4] == num_samples
assert output.dtype == inpt.dtype
@pytest.mark.parametrize("image_type", (PIL.Image, torch.Tensor, tv_tensors.Image)) @pytest.mark.parametrize("image_type", (PIL.Image, torch.Tensor, tv_tensors.Image))
@pytest.mark.parametrize("label_type", (torch.Tensor, int)) @pytest.mark.parametrize("label_type", (torch.Tensor, int))
@pytest.mark.parametrize("dataset_return_type", (dict, tuple)) @pytest.mark.parametrize("dataset_return_type", (dict, tuple))
......
...@@ -72,34 +72,6 @@ LINEAR_TRANSFORMATION_MEAN = torch.rand(36) ...@@ -72,34 +72,6 @@ LINEAR_TRANSFORMATION_MEAN = torch.rand(36)
LINEAR_TRANSFORMATION_MATRIX = torch.rand([LINEAR_TRANSFORMATION_MEAN.numel()] * 2) LINEAR_TRANSFORMATION_MATRIX = torch.rand([LINEAR_TRANSFORMATION_MEAN.numel()] * 2)
CONSISTENCY_CONFIGS = [ CONSISTENCY_CONFIGS = [
ConsistencyConfig(
v2_transforms.Normalize,
legacy_transforms.Normalize,
[
ArgsKwargs(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
],
supports_pil=False,
make_images_kwargs=dict(DEFAULT_MAKE_IMAGES_KWARGS, dtypes=[torch.float]),
),
ConsistencyConfig(
v2_transforms.FiveCrop,
legacy_transforms.FiveCrop,
[
ArgsKwargs(18),
ArgsKwargs((18, 13)),
],
make_images_kwargs=dict(DEFAULT_MAKE_IMAGES_KWARGS, sizes=[(20, 19)]),
),
ConsistencyConfig(
v2_transforms.TenCrop,
legacy_transforms.TenCrop,
[
ArgsKwargs(18),
ArgsKwargs((18, 13)),
ArgsKwargs(18, vertical_flip=True),
],
make_images_kwargs=dict(DEFAULT_MAKE_IMAGES_KWARGS, sizes=[(20, 19)]),
),
*[
ConsistencyConfig(
v2_transforms.LinearTransformation,
@@ -147,65 +119,6 @@ CONSISTENCY_CONFIGS = [
# images given that the transform does nothing but call it anyway.
supports_pil=False,
),
ConsistencyConfig(
v2_transforms.RandomEqualize,
legacy_transforms.RandomEqualize,
[
ArgsKwargs(p=0),
ArgsKwargs(p=1),
],
make_images_kwargs=dict(DEFAULT_MAKE_IMAGES_KWARGS, dtypes=[torch.uint8]),
),
ConsistencyConfig(
v2_transforms.RandomInvert,
legacy_transforms.RandomInvert,
[
ArgsKwargs(p=0),
ArgsKwargs(p=1),
],
),
ConsistencyConfig(
v2_transforms.RandomPosterize,
legacy_transforms.RandomPosterize,
[
ArgsKwargs(p=0, bits=5),
ArgsKwargs(p=1, bits=1),
ArgsKwargs(p=1, bits=3),
],
make_images_kwargs=dict(DEFAULT_MAKE_IMAGES_KWARGS, dtypes=[torch.uint8]),
),
ConsistencyConfig(
v2_transforms.RandomSolarize,
legacy_transforms.RandomSolarize,
[
ArgsKwargs(p=0, threshold=0.5),
ArgsKwargs(p=1, threshold=0.3),
ArgsKwargs(p=1, threshold=0.99),
],
),
*[
ConsistencyConfig(
v2_transforms.RandomAutocontrast,
legacy_transforms.RandomAutocontrast,
[
ArgsKwargs(p=0),
ArgsKwargs(p=1),
],
make_images_kwargs=dict(DEFAULT_MAKE_IMAGES_KWARGS, dtypes=[dt]),
closeness_kwargs=ckw,
)
for dt, ckw in [(torch.uint8, dict(atol=1, rtol=0)), (torch.float32, dict(rtol=None, atol=None))]
],
ConsistencyConfig(
v2_transforms.RandomAdjustSharpness,
legacy_transforms.RandomAdjustSharpness,
[
ArgsKwargs(p=0, sharpness_factor=0.5),
ArgsKwargs(p=1, sharpness_factor=0.2),
ArgsKwargs(p=1, sharpness_factor=0.99),
],
closeness_kwargs={"atol": 1e-6, "rtol": 1e-6},
),
ConsistencyConfig(
v2_transforms.PILToTensor,
legacy_transforms.PILToTensor,
@@ -230,22 +143,6 @@ CONSISTENCY_CONFIGS = [
v2_transforms.RandomOrder,
legacy_transforms.RandomOrder,
),
ConsistencyConfig(
v2_transforms.AugMix,
legacy_transforms.AugMix,
),
ConsistencyConfig(
v2_transforms.AutoAugment,
legacy_transforms.AutoAugment,
),
ConsistencyConfig(
v2_transforms.RandAugment,
legacy_transforms.RandAugment,
),
ConsistencyConfig(
v2_transforms.TrivialAugmentWide,
legacy_transforms.TrivialAugmentWide,
),
]
@@ -753,36 +650,9 @@ class TestRefSegTransforms:
(legacy_F.pil_to_tensor, {}),
(legacy_F.convert_image_dtype, {}),
(legacy_F.to_pil_image, {}),
(legacy_F.normalize, {}),
(legacy_F.resize, {"interpolation"}),
(legacy_F.pad, {"padding", "fill"}),
(legacy_F.crop, {}),
(legacy_F.center_crop, {}),
(legacy_F.resized_crop, {"interpolation"}),
(legacy_F.hflip, {}),
(legacy_F.perspective, {"startpoints", "endpoints", "fill", "interpolation"}),
(legacy_F.vflip, {}),
(legacy_F.five_crop, {}),
(legacy_F.ten_crop, {}),
(legacy_F.adjust_brightness, {}),
(legacy_F.adjust_contrast, {}),
(legacy_F.adjust_saturation, {}),
(legacy_F.adjust_hue, {}),
(legacy_F.adjust_gamma, {}),
(legacy_F.rotate, {"center", "fill", "interpolation"}),
(legacy_F.affine, {"angle", "translate", "center", "fill", "interpolation"}),
(legacy_F.to_grayscale, {}),
(legacy_F.rgb_to_grayscale, {}),
(legacy_F.to_tensor, {}),
(legacy_F.erase, {}),
(legacy_F.gaussian_blur, {}),
(legacy_F.invert, {}),
(legacy_F.posterize, {}),
(legacy_F.solarize, {}),
(legacy_F.adjust_sharpness, {}),
(legacy_F.autocontrast, {}),
(legacy_F.equalize, {}),
(legacy_F.elastic_transform, {"fill", "interpolation"}),
],
)
def test_dispatcher_signature_consistency(legacy_dispatcher, name_only_params):
...
import inspect
import re
import numpy as np
import PIL.Image
import pytest
import torch
from common_utils import assert_close, cache, cpu_and_cuda, needs_cuda, set_rng_seed
from torch.utils._pytree import tree_map
from torchvision import tv_tensors
from torchvision.transforms.v2 import functional as F
from torchvision.transforms.v2._utils import is_pure_tensor
from transforms_v2_dispatcher_infos import DISPATCHER_INFOS
from transforms_v2_kernel_infos import KERNEL_INFOS
from transforms_v2_legacy_utils import (
DEFAULT_SQUARE_SPATIAL_SIZE,
make_multiple_bounding_boxes,
parametrized_error_message,
)
KERNEL_INFOS_MAP = {info.kernel: info for info in KERNEL_INFOS}
DISPATCHER_INFOS_MAP = {info.dispatcher: info for info in DISPATCHER_INFOS}
@cache
def script(fn):
try:
return torch.jit.script(fn)
except Exception as error:
raise AssertionError(f"Trying to `torch.jit.script` '{fn.__name__}' raised the error above.") from error
# Scripting a function often triggers a warning like
# `UserWarning: operator() profile_node %$INT1 : int[] = prim::profile_ivalue($INT2) does not have profile information`
# with varying `INT1` and `INT2`. Since these are uninteresting for us and only clutter the test summary, we ignore
# them.
ignore_jit_warning_no_profile = pytest.mark.filterwarnings(
f"ignore:{re.escape('operator() profile_node %')}:UserWarning"
)
def make_info_args_kwargs_params(info, *, args_kwargs_fn, test_id=None):
args_kwargs = list(args_kwargs_fn(info))
if not args_kwargs:
raise pytest.UsageError(
f"Couldn't collect a single `ArgsKwargs` for `{info.id}`{f' in {test_id}' if test_id else ''}"
)
idx_field_len = len(str(len(args_kwargs)))
return [
pytest.param(
info,
args_kwargs_,
marks=info.get_marks(test_id, args_kwargs_) if test_id else [],
id=f"{info.id}-{idx:0{idx_field_len}}",
)
for idx, args_kwargs_ in enumerate(args_kwargs)
]
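# For example, assuming a hypothetical info with id "resize" and twelve
# collected `ArgsKwargs`, `idx_field_len` is 2 and the generated pytest ids run
# "resize-00" through "resize-11"; the zero-padding keeps the ids sorted
# lexicographically in the test summary.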
def make_info_args_kwargs_parametrization(infos, *, args_kwargs_fn):
def decorator(test_fn):
parts = test_fn.__qualname__.split(".")
if len(parts) == 1:
test_class_name = None
test_function_name = parts[0]
elif len(parts) == 2:
test_class_name, test_function_name = parts
else:
raise pytest.UsageError("Unable to parse the test class name and test function name from test function")
test_id = (test_class_name, test_function_name)
argnames = ("info", "args_kwargs")
argvalues = []
for info in infos:
argvalues.extend(make_info_args_kwargs_params(info, args_kwargs_fn=args_kwargs_fn, test_id=test_id))
return pytest.mark.parametrize(argnames, argvalues)(test_fn)
return decorator
@pytest.fixture(autouse=True)
def fix_rng_seed():
set_rng_seed(0)
yield
@pytest.fixture()
def test_id(request):
test_class_name = request.cls.__name__ if request.cls is not None else None
test_function_name = request.node.originalname
return test_class_name, test_function_name
class TestKernels:
sample_inputs = make_info_args_kwargs_parametrization(
KERNEL_INFOS,
args_kwargs_fn=lambda kernel_info: kernel_info.sample_inputs_fn(),
)
reference_inputs = make_info_args_kwargs_parametrization(
[info for info in KERNEL_INFOS if info.reference_fn is not None],
args_kwargs_fn=lambda info: info.reference_inputs_fn(),
)
@make_info_args_kwargs_parametrization(
[info for info in KERNEL_INFOS if info.logs_usage],
args_kwargs_fn=lambda info: info.sample_inputs_fn(),
)
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_logging(self, spy_on, info, args_kwargs, device):
spy = spy_on(torch._C._log_api_usage_once)
(input, *other_args), kwargs = args_kwargs.load(device)
info.kernel(input.as_subclass(torch.Tensor), *other_args, **kwargs)
spy.assert_any_call(f"{info.kernel.__module__}.{info.id}")
@ignore_jit_warning_no_profile
@sample_inputs
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_scripted_vs_eager(self, test_id, info, args_kwargs, device):
kernel_eager = info.kernel
kernel_scripted = script(kernel_eager)
(input, *other_args), kwargs = args_kwargs.load(device)
input = input.as_subclass(torch.Tensor)
actual = kernel_scripted(input, *other_args, **kwargs)
expected = kernel_eager(input, *other_args, **kwargs)
assert_close(
actual,
expected,
**info.get_closeness_kwargs(test_id, dtype=input.dtype, device=input.device),
msg=parametrized_error_message(input, other_args, **kwargs),
)
def _unbatch(self, batch, *, data_dims):
if isinstance(batch, torch.Tensor):
batched_tensor = batch
metadata = ()
else:
batched_tensor, *metadata = batch
if batched_tensor.ndim == data_dims:
return batch
return [
self._unbatch(unbatched, data_dims=data_dims)
for unbatched in (
batched_tensor.unbind(0) if not metadata else [(t, *metadata) for t in batched_tensor.unbind(0)]
)
]
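# A sketch of the unbatching logic above (shapes assumed for illustration):
# with data_dims=3, a batched image of shape (2, 3, 32, 32) unbinds into a list
# of two (3, 32, 32) tensors, while a (2, 5, 3, 32, 32) input recurses into a
# nested 2 x 5 list of (3, 32, 32) tensors; any metadata returned next to the
# tensor is replicated into every leaf of that structure.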
@sample_inputs
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_batched_vs_single(self, test_id, info, args_kwargs, device):
(batched_input, *other_args), kwargs = args_kwargs.load(device)
tv_tensor_type = tv_tensors.Image if is_pure_tensor(batched_input) else type(batched_input)
# This dictionary contains the number of rightmost dimensions that contain the actual data.
# Everything to the left is considered a batch dimension.
data_dims = {
tv_tensors.Image: 3,
tv_tensors.BoundingBoxes: 1,
# `Mask`s are special in the sense that the data dimensions depend on the type of mask. For detection masks
# it is 3 `(*, N, H, W)`, but for segmentation masks it is 2 `(*, H, W)`. Since both are grouped under one
# type, all kernels should also work without differentiating between the two. Thus, we go with 2 here as
# common ground.
tv_tensors.Mask: 2,
tv_tensors.Video: 4,
}.get(tv_tensor_type)
if data_dims is None:
raise pytest.UsageError(
f"The number of data dimensions cannot be determined for input of type {tv_tensor_type.__name__}."
) from None
elif batched_input.ndim <= data_dims:
pytest.skip("Input is not batched.")
elif not all(batched_input.shape[:-data_dims]):
pytest.skip("Input has a degenerate batch shape.")
batched_input = batched_input.as_subclass(torch.Tensor)
batched_output = info.kernel(batched_input, *other_args, **kwargs)
actual = self._unbatch(batched_output, data_dims=data_dims)
single_inputs = self._unbatch(batched_input, data_dims=data_dims)
expected = tree_map(lambda single_input: info.kernel(single_input, *other_args, **kwargs), single_inputs)
assert_close(
actual,
expected,
**info.get_closeness_kwargs(test_id, dtype=batched_input.dtype, device=batched_input.device),
msg=parametrized_error_message(batched_input, *other_args, **kwargs),
)
@sample_inputs
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_no_inplace(self, info, args_kwargs, device):
(input, *other_args), kwargs = args_kwargs.load(device)
input = input.as_subclass(torch.Tensor)
if input.numel() == 0:
pytest.skip("The input has a degenerate shape.")
input_version = input._version
info.kernel(input, *other_args, **kwargs)
assert input._version == input_version
@sample_inputs
@needs_cuda
def test_cuda_vs_cpu(self, test_id, info, args_kwargs):
(input_cpu, *other_args), kwargs = args_kwargs.load("cpu")
input_cpu = input_cpu.as_subclass(torch.Tensor)
input_cuda = input_cpu.to("cuda")
output_cpu = info.kernel(input_cpu, *other_args, **kwargs)
output_cuda = info.kernel(input_cuda, *other_args, **kwargs)
assert_close(
output_cuda,
output_cpu,
check_device=False,
**info.get_closeness_kwargs(test_id, dtype=input_cuda.dtype, device=input_cuda.device),
msg=parametrized_error_message(input_cpu, *other_args, **kwargs),
)
@sample_inputs
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_dtype_and_device_consistency(self, info, args_kwargs, device):
(input, *other_args), kwargs = args_kwargs.load(device)
input = input.as_subclass(torch.Tensor)
output = info.kernel(input, *other_args, **kwargs)
# Most kernels just return a tensor, but some also return some additional metadata
if not isinstance(output, torch.Tensor):
output, *_ = output
assert output.dtype == input.dtype
assert output.device == input.device
@reference_inputs
def test_against_reference(self, test_id, info, args_kwargs):
(input, *other_args), kwargs = args_kwargs.load("cpu")
actual = info.kernel(input.as_subclass(torch.Tensor), *other_args, **kwargs)
# We intentionally don't unwrap the input of the reference function in order for it to have access to all
# metadata regardless of whether the kernel takes it explicitly or not
expected = info.reference_fn(input, *other_args, **kwargs)
assert_close(
actual,
expected,
**info.get_closeness_kwargs(test_id, dtype=input.dtype, device=input.device),
msg=parametrized_error_message(input, *other_args, **kwargs),
)
@make_info_args_kwargs_parametrization(
[info for info in KERNEL_INFOS if info.float32_vs_uint8],
args_kwargs_fn=lambda info: info.reference_inputs_fn(),
)
def test_float32_vs_uint8(self, test_id, info, args_kwargs):
(input, *other_args), kwargs = args_kwargs.load("cpu")
input = input.as_subclass(torch.Tensor)
if input.dtype != torch.uint8:
pytest.skip(f"Input dtype is {input.dtype}.")
adapted_other_args, adapted_kwargs = info.float32_vs_uint8(other_args, kwargs)
actual = info.kernel(
F.to_dtype_image(input, dtype=torch.float32, scale=True),
*adapted_other_args,
**adapted_kwargs,
)
expected = F.to_dtype_image(info.kernel(input, *other_args, **kwargs), dtype=torch.float32, scale=True)
assert_close(
actual,
expected,
**info.get_closeness_kwargs(test_id, dtype=torch.float32, device=input.device),
msg=parametrized_error_message(input, *other_args, **kwargs),
)
@pytest.fixture
def spy_on(mocker):
def make_spy(fn, *, module=None, name=None):
# TODO: we can probably get rid of the non-default modules and names if we eliminate aliasing
module = module or fn.__module__
name = name or fn.__name__
spy = mocker.patch(f"{module}.{name}", wraps=fn)
return spy
return make_spy
class TestDispatchers:
image_sample_inputs = make_info_args_kwargs_parametrization(
[info for info in DISPATCHER_INFOS if tv_tensors.Image in info.kernels],
args_kwargs_fn=lambda info: info.sample_inputs(tv_tensors.Image),
)
@make_info_args_kwargs_parametrization(
DISPATCHER_INFOS,
args_kwargs_fn=lambda info: info.sample_inputs(),
)
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_logging(self, spy_on, info, args_kwargs, device):
spy = spy_on(torch._C._log_api_usage_once)
args, kwargs = args_kwargs.load(device)
info.dispatcher(*args, **kwargs)
spy.assert_any_call(f"{info.dispatcher.__module__}.{info.id}")
@ignore_jit_warning_no_profile
@image_sample_inputs
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_scripted_smoke(self, info, args_kwargs, device):
dispatcher = script(info.dispatcher)
(image_tv_tensor, *other_args), kwargs = args_kwargs.load(device)
image_pure_tensor = torch.Tensor(image_tv_tensor)
dispatcher(image_pure_tensor, *other_args, **kwargs)
# TODO: We need this until the dispatchers below also have `DispatcherInfo`s. Once they do, `test_scripted_smoke`
# replaces this test for them.
@ignore_jit_warning_no_profile
@pytest.mark.parametrize(
"dispatcher",
[
F.get_dimensions,
F.get_image_num_channels,
F.get_image_size,
F.get_num_channels,
F.get_num_frames,
F.get_size,
F.rgb_to_grayscale,
F.uniform_temporal_subsample,
],
ids=lambda dispatcher: dispatcher.__name__,
)
def test_scriptable(self, dispatcher):
script(dispatcher)
@image_sample_inputs
def test_pure_tensor_output_type(self, info, args_kwargs):
(image_tv_tensor, *other_args), kwargs = args_kwargs.load()
image_pure_tensor = image_tv_tensor.as_subclass(torch.Tensor)
output = info.dispatcher(image_pure_tensor, *other_args, **kwargs)
# We cannot use `isinstance` here since all tv_tensors are instances of `torch.Tensor` as well
assert type(output) is torch.Tensor
@make_info_args_kwargs_parametrization(
[info for info in DISPATCHER_INFOS if info.pil_kernel_info is not None],
args_kwargs_fn=lambda info: info.sample_inputs(tv_tensors.Image),
)
def test_pil_output_type(self, info, args_kwargs):
(image_tv_tensor, *other_args), kwargs = args_kwargs.load()
if image_tv_tensor.ndim > 3:
pytest.skip("Input is batched")
image_pil = F.to_pil_image(image_tv_tensor)
output = info.dispatcher(image_pil, *other_args, **kwargs)
assert isinstance(output, PIL.Image.Image)
@make_info_args_kwargs_parametrization(
DISPATCHER_INFOS,
args_kwargs_fn=lambda info: info.sample_inputs(),
)
def test_tv_tensor_output_type(self, info, args_kwargs):
(tv_tensor, *other_args), kwargs = args_kwargs.load()
output = info.dispatcher(tv_tensor, *other_args, **kwargs)
assert isinstance(output, type(tv_tensor))
if isinstance(tv_tensor, tv_tensors.BoundingBoxes) and info.dispatcher is not F.convert_bounding_box_format:
assert output.format == tv_tensor.format
@pytest.mark.parametrize(
("dispatcher_info", "tv_tensor_type", "kernel_info"),
[
pytest.param(
dispatcher_info, tv_tensor_type, kernel_info, id=f"{dispatcher_info.id}-{tv_tensor_type.__name__}"
)
for dispatcher_info in DISPATCHER_INFOS
for tv_tensor_type, kernel_info in dispatcher_info.kernel_infos.items()
],
)
def test_dispatcher_kernel_signatures_consistency(self, dispatcher_info, tv_tensor_type, kernel_info):
dispatcher_signature = inspect.signature(dispatcher_info.dispatcher)
dispatcher_params = list(dispatcher_signature.parameters.values())[1:]
kernel_signature = inspect.signature(kernel_info.kernel)
kernel_params = list(kernel_signature.parameters.values())[1:]
# We filter out metadata that is implicitly passed to the dispatcher through the input tv_tensor, but has to be
# explicitly passed to the kernel.
input_type = {v: k for k, v in dispatcher_info.kernels.items()}.get(kernel_info.kernel)
explicit_metadata = {
tv_tensors.BoundingBoxes: {"format", "canvas_size"},
}
kernel_params = [param for param in kernel_params if param.name not in explicit_metadata.get(input_type, set())]
dispatcher_params = iter(dispatcher_params)
for dispatcher_param, kernel_param in zip(dispatcher_params, kernel_params):
try:
# In general, the dispatcher parameters are a superset of the kernel parameters. Thus, we filter out
# dispatcher parameters that have no kernel equivalent while keeping the order intact.
while dispatcher_param.name != kernel_param.name:
dispatcher_param = next(dispatcher_params)
except StopIteration:
raise AssertionError(
f"Parameter `{kernel_param.name}` of kernel `{kernel_info.id}` "
f"has no corresponding parameter on the dispatcher `{dispatcher_info.id}`."
) from None
assert dispatcher_param == kernel_param
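# Concretely (parameter names assumed for illustration, not taken from an
# actual pair): checking a dispatcher `resize(inpt, size, interpolation,
# max_size, antialias)` against a kernel `resize_image(image, size,
# interpolation, antialias)` aligns size and interpolation directly, skips the
# dispatcher-only max_size, and then aligns antialias; a parameter that exists
# only on the kernel would exhaust the dispatcher iterator and trigger the
# AssertionError above.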
@pytest.mark.parametrize("info", DISPATCHER_INFOS, ids=lambda info: info.id)
def test_unknown_type(self, info):
unknown_input = object()
(_, *other_args), kwargs = next(iter(info.sample_inputs())).load("cpu")
with pytest.raises(TypeError, match=re.escape(str(type(unknown_input)))):
info.dispatcher(unknown_input, *other_args, **kwargs)
@make_info_args_kwargs_parametrization(
[
info
for info in DISPATCHER_INFOS
if tv_tensors.BoundingBoxes in info.kernels and info.dispatcher is not F.convert_bounding_box_format
],
args_kwargs_fn=lambda info: info.sample_inputs(tv_tensors.BoundingBoxes),
)
def test_bounding_boxes_format_consistency(self, info, args_kwargs):
(bounding_boxes, *other_args), kwargs = args_kwargs.load()
format = bounding_boxes.format
output = info.dispatcher(bounding_boxes, *other_args, **kwargs)
assert output.format == format
@pytest.mark.parametrize(
@@ -471,56 +24,6 @@ def test_alias(alias, target):
assert alias is target
@pytest.mark.parametrize("device", cpu_and_cuda())
@pytest.mark.parametrize("num_channels", [1, 3])
def test_normalize_image_tensor_stats(device, num_channels):
stats = pytest.importorskip("scipy.stats", reason="SciPy is not available")
def assert_samples_from_standard_normal(t):
p_value = stats.kstest(t.flatten(), cdf="norm", args=(0, 1)).pvalue
assert p_value > 1e-4
image = torch.rand(num_channels, DEFAULT_SQUARE_SPATIAL_SIZE, DEFAULT_SQUARE_SPATIAL_SIZE)
mean = image.mean(dim=(1, 2)).tolist()
std = image.std(dim=(1, 2)).tolist()
assert_samples_from_standard_normal(F.normalize_image(image, mean, std))
class TestClampBoundingBoxes:
@pytest.mark.parametrize(
"metadata",
[
dict(),
dict(format=tv_tensors.BoundingBoxFormat.XYXY),
dict(canvas_size=(1, 1)),
],
)
def test_pure_tensor_insufficient_metadata(self, metadata):
pure_tensor = next(make_multiple_bounding_boxes()).as_subclass(torch.Tensor)
with pytest.raises(ValueError, match=re.escape("`format` and `canvas_size` has to be passed")):
F.clamp_bounding_boxes(pure_tensor, **metadata)
@pytest.mark.parametrize(
"metadata",
[
dict(format=tv_tensors.BoundingBoxFormat.XYXY),
dict(canvas_size=(1, 1)),
dict(format=tv_tensors.BoundingBoxFormat.XYXY, canvas_size=(1, 1)),
],
)
def test_tv_tensor_explicit_metadata(self, metadata):
tv_tensor = next(make_multiple_bounding_boxes())
with pytest.raises(ValueError, match=re.escape("`format` and `canvas_size` must not be passed")):
F.clamp_bounding_boxes(tv_tensor, **metadata)
# TODO: All correctness checks below this line should be ported to be references on a `KernelInfo` in
# `transforms_v2_kernel_infos.py`
@pytest.mark.parametrize(
"inpt",
[
@@ -549,24 +52,3 @@ def test_to_pil_image(inpt, mode):
assert isinstance(output, PIL.Image.Image)
assert np.asarray(inpt).sum() == np.asarray(output).sum()
def test_equalize_image_tensor_edge_cases():
inpt = torch.zeros(3, 200, 200, dtype=torch.uint8)
output = F.equalize_image(inpt)
torch.testing.assert_close(inpt, output)
inpt = torch.zeros(5, 3, 200, 200, dtype=torch.uint8)
inpt[..., 100:, 100:] = 1
output = F.equalize_image(inpt)
assert output.unique().tolist() == [0, 255]
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_correctness_uniform_temporal_subsample(device):
video = torch.arange(10, device=device)[:, None, None, None].expand(-1, 3, 8, 8)
out_video = F.uniform_temporal_subsample(video, 5)
assert out_video.unique().tolist() == [0, 2, 4, 6, 9]
out_video = F.uniform_temporal_subsample(video, 8)
assert out_video.unique().tolist() == [0, 1, 2, 3, 5, 6, 7, 9]
@@ -3859,6 +3859,729 @@ class TestPerspective:
assert_close(actual, expected, rtol=0, atol=1)
class TestEqualize:
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
check_kernel(F.equalize_image, make_image(dtype=dtype, device=device))
def test_kernel_video(self):
check_kernel(F.equalize_image, make_video())
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image_pil, make_image, make_video])
def test_functional(self, make_input):
check_functional(F.equalize, make_input())
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.equalize_image, torch.Tensor),
(F._equalize_image_pil, PIL.Image.Image),
(F.equalize_image, tv_tensors.Image),
(F.equalize_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.equalize, kernel=kernel, input_type=input_type)
@pytest.mark.parametrize(
"make_input",
[make_image_tensor, make_image_pil, make_image, make_video],
)
def test_transform(self, make_input):
check_transform(transforms.RandomEqualize(p=1), make_input())
@pytest.mark.parametrize(("low", "high"), [(0, 64), (64, 192), (192, 256), (0, 1), (127, 128), (255, 256)])
@pytest.mark.parametrize("fn", [F.equalize, transform_cls_to_functional(transforms.RandomEqualize, p=1)])
def test_image_correctness(self, low, high, fn):
# We are not using the default `make_image` here since that uniformly samples the values over the whole value
# range. Since the whole point of F.equalize is to transform an arbitrary distribution of values into a uniform
# one over the full range, the information gain is low if we already provide something really close to the
# expected value.
image = tv_tensors.Image(
torch.testing.make_tensor((3, 117, 253), dtype=torch.uint8, device="cpu", low=low, high=high)
)
actual = fn(image)
expected = F.to_image(F.equalize(F.to_pil_image(image)))
assert_equal(actual, expected)
class TestUniformTemporalSubsample:
def test_kernel_video(self):
check_kernel(F.uniform_temporal_subsample_video, make_video(), num_samples=2)
@pytest.mark.parametrize("make_input", [make_video_tensor, make_video])
def test_functional(self, make_input):
check_functional(F.uniform_temporal_subsample, make_input(), num_samples=2)
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.uniform_temporal_subsample_video, torch.Tensor),
(F.uniform_temporal_subsample_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.uniform_temporal_subsample, kernel=kernel, input_type=input_type)
@pytest.mark.parametrize("make_input", [make_video_tensor, make_video])
def test_transform(self, make_input):
check_transform(transforms.UniformTemporalSubsample(num_samples=2), make_input())
def _reference_uniform_temporal_subsample_video(self, video, *, num_samples):
# Adapted from
# https://github.com/facebookresearch/pytorchvideo/blob/c8d23d8b7e597586a9e2d18f6ed31ad8aa379a7a/pytorchvideo/transforms/functional.py#L19
t = video.shape[-4]
assert num_samples > 0 and t > 0
# Sample by nearest neighbor interpolation if num_samples > t.
indices = torch.linspace(0, t - 1, num_samples, device=video.device)
indices = torch.clamp(indices, 0, t - 1).long()
return tv_tensors.Video(torch.index_select(video, -4, indices))
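# Worked example of the index selection above: for t=10 frames and
# num_samples=5, torch.linspace(0, 9, 5) yields [0.0, 2.25, 4.5, 6.75, 9.0],
# which .long() truncates to the frame indices [0, 2, 4, 6, 9].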
CORRECTNESS_NUM_FRAMES = 5
@pytest.mark.parametrize("num_samples", list(range(1, CORRECTNESS_NUM_FRAMES + 1)))
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
@pytest.mark.parametrize(
"fn", [F.uniform_temporal_subsample, transform_cls_to_functional(transforms.UniformTemporalSubsample)]
)
def test_video_correctness(self, num_samples, dtype, device, fn):
video = make_video(num_frames=self.CORRECTNESS_NUM_FRAMES, dtype=dtype, device=device)
actual = fn(video, num_samples=num_samples)
expected = self._reference_uniform_temporal_subsample_video(video, num_samples=num_samples)
assert_equal(actual, expected)
class TestNormalize:
MEANS_STDS = [
((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
([0.0, 0.0, 0.0], [1.0, 1.0, 1.0]),
]
MEAN, STD = MEANS_STDS[0]
@pytest.mark.parametrize(("mean", "std"), [*MEANS_STDS, (0.5, 2.0)])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, mean, std, device):
check_kernel(F.normalize_image, make_image(dtype=torch.float32, device=device), mean=mean, std=std)
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image_inplace(self, device):
input = make_image_tensor(dtype=torch.float32, device=device)
input_version = input._version
output_out_of_place = F.normalize_image(input, mean=self.MEAN, std=self.STD)
assert output_out_of_place.data_ptr() != input.data_ptr()
assert output_out_of_place is not input
output_inplace = F.normalize_image(input, mean=self.MEAN, std=self.STD, inplace=True)
assert output_inplace.data_ptr() == input.data_ptr()
assert output_inplace._version > input_version
assert output_inplace is input
assert_equal(output_inplace, output_out_of_place)
def test_kernel_video(self):
check_kernel(F.normalize_video, make_video(dtype=torch.float32), mean=self.MEAN, std=self.STD)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_video])
def test_functional(self, make_input):
check_functional(F.normalize, make_input(dtype=torch.float32), mean=self.MEAN, std=self.STD)
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.normalize_image, torch.Tensor),
(F.normalize_image, tv_tensors.Image),
(F.normalize_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.normalize, kernel=kernel, input_type=input_type)
def test_functional_error(self):
with pytest.raises(TypeError, match="should be a float tensor"):
F.normalize_image(make_image(dtype=torch.uint8), mean=self.MEAN, std=self.STD)
with pytest.raises(ValueError, match="tensor image of size"):
F.normalize_image(torch.rand(16, 16, dtype=torch.float32), mean=self.MEAN, std=self.STD)
for std in [0, [0, 0, 0], [0, 1, 1]]:
with pytest.raises(ValueError, match="std evaluated to zero, leading to division by zero"):
F.normalize_image(make_image(dtype=torch.float32), mean=self.MEAN, std=std)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_video])
def test_transform(self, make_input):
check_transform(transforms.Normalize(mean=self.MEAN, std=self.STD), make_input(dtype=torch.float32))
def _reference_normalize_image(self, image, *, mean, std):
image = image.numpy()
mean, std = [np.array(stat, dtype=image.dtype).reshape((-1, 1, 1)) for stat in [mean, std]]
return tv_tensors.Image((image - mean) / std)
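# The reshape((-1, 1, 1)) above turns per-channel stats into (C, 1, 1) arrays
# that broadcast over H and W, so each channel c is normalized as
# (image[c] - mean[c]) / std[c]; a scalar stat such as 0.5 reshapes to
# (1, 1, 1) and is applied to all channels alike.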
@pytest.mark.parametrize(("mean", "std"), MEANS_STDS)
@pytest.mark.parametrize("dtype", [torch.float16, torch.float32, torch.float64])
@pytest.mark.parametrize("fn", [F.normalize, transform_cls_to_functional(transforms.Normalize)])
def test_correctness_image(self, mean, std, dtype, fn):
image = make_image(dtype=dtype)
actual = fn(image, mean=mean, std=std)
expected = self._reference_normalize_image(image, mean=mean, std=std)
assert_equal(actual, expected)
class TestClampBoundingBoxes:
@pytest.mark.parametrize("format", list(tv_tensors.BoundingBoxFormat))
@pytest.mark.parametrize("dtype", [torch.int64, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel(self, format, dtype, device):
bounding_boxes = make_bounding_boxes(format=format, dtype=dtype, device=device)
check_kernel(
F.clamp_bounding_boxes,
bounding_boxes,
format=bounding_boxes.format,
canvas_size=bounding_boxes.canvas_size,
)
@pytest.mark.parametrize("format", list(tv_tensors.BoundingBoxFormat))
def test_functional(self, format):
check_functional(F.clamp_bounding_boxes, make_bounding_boxes(format=format))
def test_errors(self):
input_tv_tensor = make_bounding_boxes()
input_pure_tensor = input_tv_tensor.as_subclass(torch.Tensor)
format, canvas_size = input_tv_tensor.format, input_tv_tensor.canvas_size
for format_, canvas_size_ in [(None, None), (format, None), (None, canvas_size)]:
with pytest.raises(
ValueError, match="For pure tensor inputs, `format` and `canvas_size` have to be passed."
):
F.clamp_bounding_boxes(input_pure_tensor, format=format_, canvas_size=canvas_size_)
for format_, canvas_size_ in [(format, canvas_size), (format, None), (None, canvas_size)]:
with pytest.raises(
ValueError, match="For bounding box tv_tensor inputs, `format` and `canvas_size` must not be passed."
):
F.clamp_bounding_boxes(input_tv_tensor, format=format_, canvas_size=canvas_size_)
def test_transform(self):
check_transform(transforms.ClampBoundingBoxes(), make_bounding_boxes())
class TestInvert:
@pytest.mark.parametrize("dtype", [torch.uint8, torch.int16, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
check_kernel(F.invert_image, make_image(dtype=dtype, device=device))
def test_kernel_video(self):
check_kernel(F.invert_video, make_video())
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_image_pil, make_video])
def test_functional(self, make_input):
check_functional(F.invert, make_input())
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.invert_image, torch.Tensor),
(F._invert_image_pil, PIL.Image.Image),
(F.invert_image, tv_tensors.Image),
(F.invert_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.invert, kernel=kernel, input_type=input_type)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image_pil, make_image, make_video])
def test_transform(self, make_input):
check_transform(transforms.RandomInvert(p=1), make_input())
@pytest.mark.parametrize("fn", [F.invert, transform_cls_to_functional(transforms.RandomInvert, p=1)])
def test_correctness_image(self, fn):
image = make_image(dtype=torch.uint8, device="cpu")
actual = fn(image)
expected = F.to_image(F.invert(F.to_pil_image(image)))
assert_equal(actual, expected)
class TestPosterize:
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
check_kernel(F.posterize_image, make_image(dtype=dtype, device=device), bits=1)
def test_kernel_video(self):
check_kernel(F.posterize_video, make_video(), bits=1)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_image_pil, make_video])
def test_functional(self, make_input):
check_functional(F.posterize, make_input(), bits=1)
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.posterize_image, torch.Tensor),
(F._posterize_image_pil, PIL.Image.Image),
(F.posterize_image, tv_tensors.Image),
(F.posterize_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.posterize, kernel=kernel, input_type=input_type)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image_pil, make_image, make_video])
def test_transform(self, make_input):
check_transform(transforms.RandomPosterize(bits=1, p=1), make_input())
@pytest.mark.parametrize("bits", [1, 4, 8])
@pytest.mark.parametrize("fn", [F.posterize, transform_cls_to_functional(transforms.RandomPosterize, p=1)])
def test_correctness_image(self, bits, fn):
image = make_image(dtype=torch.uint8, device="cpu")
actual = fn(image, bits=bits)
expected = F.to_image(F.posterize(F.to_pil_image(image), bits=bits))
assert_equal(actual, expected)
class TestSolarize:
def _make_threshold(self, input, *, factor=0.5):
dtype = input.dtype if isinstance(input, torch.Tensor) else torch.uint8
return (float if dtype.is_floating_point else int)(get_max_value(dtype) * factor)
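# For example, with the default factor=0.5 this yields int(255 * 0.5) == 127
# for torch.uint8 inputs and float(1.0 * 0.5) == 0.5 for torch.float32 inputs,
# assuming get_max_value returns 255 and 1.0 for those dtypes.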
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
image = make_image(dtype=dtype, device=device)
check_kernel(F.solarize_image, image, threshold=self._make_threshold(image))
def test_kernel_video(self):
video = make_video()
check_kernel(F.solarize_video, video, threshold=self._make_threshold(video))
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_image_pil, make_video])
def test_functional(self, make_input):
input = make_input()
check_functional(F.solarize, input, threshold=self._make_threshold(input))
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.solarize_image, torch.Tensor),
(F._solarize_image_pil, PIL.Image.Image),
(F.solarize_image, tv_tensors.Image),
(F.solarize_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.solarize, kernel=kernel, input_type=input_type)
@pytest.mark.parametrize(("dtype", "threshold"), [(torch.uint8, 256), (torch.float, 1.5)])
def test_functional_error(self, dtype, threshold):
with pytest.raises(TypeError, match="Threshold should be less or equal the maximum value of the dtype"):
F.solarize(make_image(dtype=dtype), threshold=threshold)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image_pil, make_image, make_video])
def test_transform(self, make_input):
input = make_input()
check_transform(transforms.RandomSolarize(threshold=self._make_threshold(input), p=1), input)
@pytest.mark.parametrize("threshold_factor", [0.0, 0.1, 0.5, 0.9, 1.0])
@pytest.mark.parametrize("fn", [F.solarize, transform_cls_to_functional(transforms.RandomSolarize, p=1)])
def test_correctness_image(self, threshold_factor, fn):
image = make_image(dtype=torch.uint8, device="cpu")
threshold = self._make_threshold(image, factor=threshold_factor)
actual = fn(image, threshold=threshold)
expected = F.to_image(F.solarize(F.to_pil_image(image), threshold=threshold))
assert_equal(actual, expected)
class TestAutocontrast:
@pytest.mark.parametrize("dtype", [torch.uint8, torch.int16, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
check_kernel(F.autocontrast_image, make_image(dtype=dtype, device=device))
def test_kernel_video(self):
check_kernel(F.autocontrast_video, make_video())
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_image_pil, make_video])
def test_functional(self, make_input):
check_functional(F.autocontrast, make_input())
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.autocontrast_image, torch.Tensor),
(F._autocontrast_image_pil, PIL.Image.Image),
(F.autocontrast_image, tv_tensors.Image),
(F.autocontrast_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.autocontrast, kernel=kernel, input_type=input_type)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image_pil, make_image, make_video])
def test_transform(self, make_input):
check_transform(transforms.RandomAutocontrast(p=1), make_input(), check_v1_compatibility=dict(rtol=0, atol=1))
@pytest.mark.parametrize("fn", [F.autocontrast, transform_cls_to_functional(transforms.RandomAutocontrast, p=1)])
def test_correctness_image(self, fn):
image = make_image(dtype=torch.uint8, device="cpu")
actual = fn(image)
expected = F.to_image(F.autocontrast(F.to_pil_image(image)))
assert_close(actual, expected, rtol=0, atol=1)
class TestAdjustSharpness:
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
check_kernel(F.adjust_sharpness_image, make_image(dtype=dtype, device=device), sharpness_factor=0.5)
def test_kernel_video(self):
check_kernel(F.adjust_sharpness_video, make_video(), sharpness_factor=0.5)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_image_pil, make_video])
def test_functional(self, make_input):
check_functional(F.adjust_sharpness, make_input(), sharpness_factor=0.5)
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.adjust_sharpness_image, torch.Tensor),
(F._adjust_sharpness_image_pil, PIL.Image.Image),
(F.adjust_sharpness_image, tv_tensors.Image),
(F.adjust_sharpness_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.adjust_sharpness, kernel=kernel, input_type=input_type)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image_pil, make_image, make_video])
def test_transform(self, make_input):
check_transform(transforms.RandomAdjustSharpness(sharpness_factor=0.5, p=1), make_input())
def test_functional_error(self):
with pytest.raises(TypeError, match="can have 1 or 3 channels"):
F.adjust_sharpness(make_image(color_space="RGBA"), sharpness_factor=0.5)
with pytest.raises(ValueError, match="is not non-negative"):
F.adjust_sharpness(make_image(), sharpness_factor=-1)
@pytest.mark.parametrize("sharpness_factor", [0.1, 0.5, 1.0])
@pytest.mark.parametrize(
"fn", [F.adjust_sharpness, transform_cls_to_functional(transforms.RandomAdjustSharpness, p=1)]
)
def test_correctness_image(self, sharpness_factor, fn):
image = make_image(dtype=torch.uint8, device="cpu")
actual = fn(image, sharpness_factor=sharpness_factor)
expected = F.to_image(F.adjust_sharpness(F.to_pil_image(image), sharpness_factor=sharpness_factor))
assert_equal(actual, expected)
class TestAdjustContrast:
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
check_kernel(F.adjust_contrast_image, make_image(dtype=dtype, device=device), contrast_factor=0.5)
def test_kernel_video(self):
check_kernel(F.adjust_contrast_video, make_video(), contrast_factor=0.5)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_image_pil, make_video])
def test_functional(self, make_input):
check_functional(F.adjust_contrast, make_input(), contrast_factor=0.5)
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.adjust_contrast_image, torch.Tensor),
(F._adjust_contrast_image_pil, PIL.Image.Image),
(F.adjust_contrast_image, tv_tensors.Image),
(F.adjust_contrast_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.adjust_contrast, kernel=kernel, input_type=input_type)
def test_functional_error(self):
with pytest.raises(TypeError, match="permitted channel values are 1 or 3"):
F.adjust_contrast(make_image(color_space="RGBA"), contrast_factor=0.5)
with pytest.raises(ValueError, match="is not non-negative"):
F.adjust_contrast(make_image(), contrast_factor=-1)
@pytest.mark.parametrize("contrast_factor", [0.1, 0.5, 1.0])
def test_correctness_image(self, contrast_factor):
image = make_image(dtype=torch.uint8, device="cpu")
actual = F.adjust_contrast(image, contrast_factor=contrast_factor)
expected = F.to_image(F.adjust_contrast(F.to_pil_image(image), contrast_factor=contrast_factor))
assert_close(actual, expected, rtol=0, atol=1)
class TestAdjustGamma:
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
check_kernel(F.adjust_gamma_image, make_image(dtype=dtype, device=device), gamma=0.5)
def test_kernel_video(self):
check_kernel(F.adjust_gamma_video, make_video(), gamma=0.5)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_image_pil, make_video])
def test_functional(self, make_input):
check_functional(F.adjust_gamma, make_input(), gamma=0.5)
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.adjust_gamma_image, torch.Tensor),
(F._adjust_gamma_image_pil, PIL.Image.Image),
(F.adjust_gamma_image, tv_tensors.Image),
(F.adjust_gamma_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.adjust_gamma, kernel=kernel, input_type=input_type)
def test_functional_error(self):
with pytest.raises(ValueError, match="Gamma should be a non-negative real number"):
F.adjust_gamma(make_image(), gamma=-1)
@pytest.mark.parametrize("gamma", [0.1, 0.5, 1.0])
@pytest.mark.parametrize("gain", [0.1, 1.0, 2.0])
def test_correctness_image(self, gamma, gain):
image = make_image(dtype=torch.uint8, device="cpu")
actual = F.adjust_gamma(image, gamma=gamma, gain=gain)
expected = F.to_image(F.adjust_gamma(F.to_pil_image(image), gamma=gamma, gain=gain))
assert_equal(actual, expected)
class TestAdjustHue:
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
check_kernel(F.adjust_hue_image, make_image(dtype=dtype, device=device), hue_factor=0.25)
def test_kernel_video(self):
check_kernel(F.adjust_hue_video, make_video(), hue_factor=0.25)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_image_pil, make_video])
def test_functional(self, make_input):
check_functional(F.adjust_hue, make_input(), hue_factor=0.25)
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.adjust_hue_image, torch.Tensor),
(F._adjust_hue_image_pil, PIL.Image.Image),
(F.adjust_hue_image, tv_tensors.Image),
(F.adjust_hue_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.adjust_hue, kernel=kernel, input_type=input_type)
def test_functional_error(self):
with pytest.raises(TypeError, match="permitted channel values are 1 or 3"):
F.adjust_hue(make_image(color_space="RGBA"), hue_factor=0.25)
for hue_factor in [-1, 1]:
with pytest.raises(ValueError, match=re.escape("is not in [-0.5, 0.5]")):
F.adjust_hue(make_image(), hue_factor=hue_factor)
@pytest.mark.parametrize("hue_factor", [-0.5, -0.3, 0.0, 0.2, 0.5])
def test_correctness_image(self, hue_factor):
image = make_image(dtype=torch.uint8, device="cpu")
actual = F.adjust_hue(image, hue_factor=hue_factor)
expected = F.to_image(F.adjust_hue(F.to_pil_image(image), hue_factor=hue_factor))
mae = (actual.float() - expected.float()).abs().mean()
assert mae < 2
class TestAdjustSaturation:
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
def test_kernel_image(self, dtype, device):
check_kernel(F.adjust_saturation_image, make_image(dtype=dtype, device=device), saturation_factor=0.5)
def test_kernel_video(self):
check_kernel(F.adjust_saturation_video, make_video(), saturation_factor=0.5)
@pytest.mark.parametrize("make_input", [make_image_tensor, make_image, make_image_pil, make_video])
def test_functional(self, make_input):
check_functional(F.adjust_saturation, make_input(), saturation_factor=0.5)
@pytest.mark.parametrize(
("kernel", "input_type"),
[
(F.adjust_saturation_image, torch.Tensor),
(F._adjust_saturation_image_pil, PIL.Image.Image),
(F.adjust_saturation_image, tv_tensors.Image),
(F.adjust_saturation_video, tv_tensors.Video),
],
)
def test_functional_signature(self, kernel, input_type):
check_functional_kernel_signature_match(F.adjust_saturation, kernel=kernel, input_type=input_type)
def test_functional_error(self):
with pytest.raises(TypeError, match="permitted channel values are 1 or 3"):
F.adjust_saturation(make_image(color_space="RGBA"), saturation_factor=0.5)
with pytest.raises(ValueError, match="is not non-negative"):
F.adjust_saturation(make_image(), saturation_factor=-1)
@pytest.mark.parametrize("saturation_factor", [0.1, 0.5, 1.0])
def test_correctness_image(self, saturation_factor):
image = make_image(dtype=torch.uint8, device="cpu")
actual = F.adjust_saturation(image, saturation_factor=saturation_factor)
expected = F.to_image(F.adjust_saturation(F.to_pil_image(image), saturation_factor=saturation_factor))
assert_close(actual, expected, rtol=0, atol=1)
class TestFiveTenCrop:
INPUT_SIZE = (17, 11)
OUTPUT_SIZE = (3, 5)
@pytest.mark.parametrize("dtype", [torch.uint8, torch.float32])
@pytest.mark.parametrize("device", cpu_and_cuda())
@pytest.mark.parametrize("kernel", [F.five_crop_image, F.ten_crop_image])
def test_kernel_image(self, dtype, device, kernel):
check_kernel(
kernel,
make_image(self.INPUT_SIZE, dtype=dtype, device=device),
size=self.OUTPUT_SIZE,
check_batched_vs_unbatched=False,
)
@pytest.mark.parametrize("kernel", [F.five_crop_video, F.ten_crop_video])
def test_kernel_video(self, kernel):
check_kernel(kernel, make_video(self.INPUT_SIZE), size=self.OUTPUT_SIZE, check_batched_vs_unbatched=False)
def _functional_wrapper(self, fn):
# This wrapper is needed to make five_crop / ten_crop compatible with check_functional, since that requires a
# single output rather than a sequence.
@functools.wraps(fn)
def wrapper(*args, **kwargs):
outputs = fn(*args, **kwargs)
return outputs[0]
return wrapper
@pytest.mark.parametrize(
"make_input",
[make_image_tensor, make_image_pil, make_image, make_video],
)
@pytest.mark.parametrize("functional", [F.five_crop, F.ten_crop])
def test_functional(self, make_input, functional):
check_functional(
self._functional_wrapper(functional),
make_input(self.INPUT_SIZE),
size=self.OUTPUT_SIZE,
check_scripted_smoke=False,
)
@pytest.mark.parametrize(
("functional", "kernel", "input_type"),
[
(F.five_crop, F.five_crop_image, torch.Tensor),
(F.five_crop, F._five_crop_image_pil, PIL.Image.Image),
(F.five_crop, F.five_crop_image, tv_tensors.Image),
(F.five_crop, F.five_crop_video, tv_tensors.Video),
(F.ten_crop, F.ten_crop_image, torch.Tensor),
(F.ten_crop, F._ten_crop_image_pil, PIL.Image.Image),
(F.ten_crop, F.ten_crop_image, tv_tensors.Image),
(F.ten_crop, F.ten_crop_video, tv_tensors.Video),
],
)
def test_functional_signature(self, functional, kernel, input_type):
check_functional_kernel_signature_match(functional, kernel=kernel, input_type=input_type)
class _TransformWrapper(nn.Module):
# This wrapper is needed to make FiveCrop / TenCrop compatible with check_transform, since that requires a
# single output rather than a sequence.
_v1_transform_cls = None
def _extract_params_for_v1_transform(self):
return dict(five_ten_crop_transform=self.five_ten_crop_transform)
def __init__(self, five_ten_crop_transform):
super().__init__()
type(self)._v1_transform_cls = type(self)
self.five_ten_crop_transform = five_ten_crop_transform
def forward(self, input: torch.Tensor) -> torch.Tensor:
outputs = self.five_ten_crop_transform(input)
return outputs[0]
@pytest.mark.parametrize(
"make_input",
[make_image_tensor, make_image_pil, make_image, make_video],
)
@pytest.mark.parametrize("transform_cls", [transforms.FiveCrop, transforms.TenCrop])
def test_transform(self, make_input, transform_cls):
check_transform(self._TransformWrapper(transform_cls(size=self.OUTPUT_SIZE)), make_input(self.INPUT_SIZE))
@pytest.mark.parametrize("make_input", [make_bounding_boxes, make_detection_mask])
@pytest.mark.parametrize("transform_cls", [transforms.FiveCrop, transforms.TenCrop])
def test_transform_error(self, make_input, transform_cls):
transform = transform_cls(size=self.OUTPUT_SIZE)
with pytest.raises(TypeError, match="not supported"):
transform(make_input(self.INPUT_SIZE))
@pytest.mark.parametrize("fn", [F.five_crop, transform_cls_to_functional(transforms.FiveCrop)])
def test_correctness_image_five_crop(self, fn):
image = make_image(self.INPUT_SIZE, dtype=torch.uint8, device="cpu")
actual = fn(image, size=self.OUTPUT_SIZE)
expected = F.five_crop(F.to_pil_image(image), size=self.OUTPUT_SIZE)
assert isinstance(actual, tuple)
assert_equal(actual, [F.to_image(e) for e in expected])
@pytest.mark.parametrize("fn_or_class", [F.ten_crop, transforms.TenCrop])
@pytest.mark.parametrize("vertical_flip", [False, True])
def test_correctness_image_ten_crop(self, fn_or_class, vertical_flip):
if fn_or_class is transforms.TenCrop:
fn = transform_cls_to_functional(fn_or_class, size=self.OUTPUT_SIZE, vertical_flip=vertical_flip)
kwargs = dict()
else:
fn = fn_or_class
kwargs = dict(size=self.OUTPUT_SIZE, vertical_flip=vertical_flip)
image = make_image(self.INPUT_SIZE, dtype=torch.uint8, device="cpu")
actual = fn(image, **kwargs)
expected = F.ten_crop(F.to_pil_image(image), size=self.OUTPUT_SIZE, vertical_flip=vertical_flip)
assert isinstance(actual, tuple)
assert_equal(actual, [F.to_image(e) for e in expected])
class TestColorJitter:
@pytest.mark.parametrize(
"make_input",
...
import itertools
import pytest
import torchvision.transforms.v2.functional as F
from torchvision import tv_tensors
from transforms_v2_kernel_infos import KERNEL_INFOS
from transforms_v2_legacy_utils import InfoBase, TestMark
__all__ = ["DispatcherInfo", "DISPATCHER_INFOS"]
class PILKernelInfo(InfoBase):
def __init__(
self,
kernel,
*,
# Defaults to `kernel.__name__`. Should be set if the function is exposed under a different name
# TODO: This can probably be removed after roll-out since we shouldn't have any aliasing then
kernel_name=None,
):
super().__init__(id=kernel_name or kernel.__name__)
self.kernel = kernel
class DispatcherInfo(InfoBase):
_KERNEL_INFO_MAP = {info.kernel: info for info in KERNEL_INFOS}
def __init__(
self,
dispatcher,
*,
# Dictionary of types that map to the kernel the dispatcher dispatches to.
kernels,
# If omitted, no PIL dispatch test will be performed.
pil_kernel_info=None,
# See InfoBase
test_marks=None,
# See InfoBase
closeness_kwargs=None,
):
super().__init__(id=dispatcher.__name__, test_marks=test_marks, closeness_kwargs=closeness_kwargs)
self.dispatcher = dispatcher
self.kernels = kernels
self.pil_kernel_info = pil_kernel_info
kernel_infos = {}
for tv_tensor_type, kernel in self.kernels.items():
kernel_info = self._KERNEL_INFO_MAP.get(kernel)
if not kernel_info:
raise pytest.UsageError(
f"Can't register {kernel.__name__} for type {tv_tensor_type} since there is no `KernelInfo` for it. "
f"Please add a `KernelInfo` for it in `transforms_v2_kernel_infos.py`."
)
kernel_infos[tv_tensor_type] = kernel_info
self.kernel_infos = kernel_infos
def sample_inputs(self, *tv_tensor_types, filter_metadata=True):
for tv_tensor_type in tv_tensor_types or self.kernel_infos.keys():
kernel_info = self.kernel_infos.get(tv_tensor_type)
if not kernel_info:
raise pytest.UsageError(f"There is no kernel registered for type {type.__name__}")
sample_inputs = kernel_info.sample_inputs_fn()
if not filter_metadata:
yield from sample_inputs
return
for args_kwargs in sample_inputs:
if hasattr(tv_tensor_type, "__annotations__"):
for name in itertools.chain(
tv_tensor_type.__annotations__.keys(),
# FIXME: this seems ok for conversion dispatchers, but we should probably handle this on a
# per-dispatcher level. However, so far there is no option for that.
(f"old_{name}" for name in tv_tensor_type.__annotations__.keys()),
):
if name in args_kwargs.kwargs:
del args_kwargs.kwargs[name]
yield args_kwargs
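# Illustrative sketch (added for clarity, mirroring the entries in
# `DISPATCHER_INFOS` below): a `DispatcherInfo` pairs a dispatcher with the
# kernel it routes to for each tv_tensor type, e.g.
#
#     DispatcherInfo(
#         F.equalize,
#         kernels={
#             tv_tensors.Image: F.equalize_image,
#             tv_tensors.Video: F.equalize_video,
#         },
#         pil_kernel_info=PILKernelInfo(F._equalize_image_pil),
#     )
#
# The test suite then iterates `info.sample_inputs(tv_tensors.Image)` to get
# `ArgsKwargs` for that type, with the tv_tensor metadata kwargs stripped.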
def xfail_jit(reason, *, condition=None):
return TestMark(
("TestDispatchers", "test_scripted_smoke"),
pytest.mark.xfail(reason=reason),
condition=condition,
)
def xfail_jit_python_scalar_arg(name, *, reason=None):
return xfail_jit(
reason or f"Python scalar int or float for `{name}` is not supported when scripting",
condition=lambda args_kwargs: isinstance(args_kwargs.kwargs.get(name), (int, float)),
)
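# E.g. (added for clarity) `xfail_jit_python_scalar_arg("size")` xfails the
# scripted smoke test only for sample inputs where `size` was passed as a bare
# int or float (size=7), not for sequence values such as size=(6, 5).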
skip_dispatch_tv_tensor = TestMark(
("TestDispatchers", "test_dispatch_tv_tensor"),
pytest.mark.skip(reason="Dispatcher doesn't support arbitrary tv_tensor dispatch."),
)
multi_crop_skips = [
TestMark(
("TestDispatchers", test_name),
pytest.mark.skip(reason="Multi-crop dispatchers return a sequence of items rather than a single one."),
)
for test_name in ["test_pure_tensor_output_type", "test_pil_output_type", "test_tv_tensor_output_type"]
]
multi_crop_skips.append(skip_dispatch_tv_tensor)
DISPATCHER_INFOS = [
DispatcherInfo(
F.elastic,
kernels={
tv_tensors.Image: F.elastic_image,
tv_tensors.Video: F.elastic_video,
tv_tensors.BoundingBoxes: F.elastic_bounding_boxes,
tv_tensors.Mask: F.elastic_mask,
},
pil_kernel_info=PILKernelInfo(F._elastic_image_pil),
test_marks=[xfail_jit_python_scalar_arg("fill")],
),
DispatcherInfo(
F.equalize,
kernels={
tv_tensors.Image: F.equalize_image,
tv_tensors.Video: F.equalize_video,
},
pil_kernel_info=PILKernelInfo(F._equalize_image_pil, kernel_name="equalize_image_pil"),
),
DispatcherInfo(
F.invert,
kernels={
tv_tensors.Image: F.invert_image,
tv_tensors.Video: F.invert_video,
},
pil_kernel_info=PILKernelInfo(F._invert_image_pil, kernel_name="invert_image_pil"),
),
DispatcherInfo(
F.posterize,
kernels={
tv_tensors.Image: F.posterize_image,
tv_tensors.Video: F.posterize_video,
},
pil_kernel_info=PILKernelInfo(F._posterize_image_pil, kernel_name="posterize_image_pil"),
),
DispatcherInfo(
F.solarize,
kernels={
tv_tensors.Image: F.solarize_image,
tv_tensors.Video: F.solarize_video,
},
pil_kernel_info=PILKernelInfo(F._solarize_image_pil, kernel_name="solarize_image_pil"),
),
DispatcherInfo(
F.autocontrast,
kernels={
tv_tensors.Image: F.autocontrast_image,
tv_tensors.Video: F.autocontrast_video,
},
pil_kernel_info=PILKernelInfo(F._autocontrast_image_pil, kernel_name="autocontrast_image_pil"),
),
DispatcherInfo(
F.adjust_sharpness,
kernels={
tv_tensors.Image: F.adjust_sharpness_image,
tv_tensors.Video: F.adjust_sharpness_video,
},
pil_kernel_info=PILKernelInfo(F._adjust_sharpness_image_pil, kernel_name="adjust_sharpness_image_pil"),
),
DispatcherInfo(
F.adjust_contrast,
kernels={
tv_tensors.Image: F.adjust_contrast_image,
tv_tensors.Video: F.adjust_contrast_video,
},
pil_kernel_info=PILKernelInfo(F._adjust_contrast_image_pil, kernel_name="adjust_contrast_image_pil"),
),
DispatcherInfo(
F.adjust_gamma,
kernels={
tv_tensors.Image: F.adjust_gamma_image,
tv_tensors.Video: F.adjust_gamma_video,
},
pil_kernel_info=PILKernelInfo(F._adjust_gamma_image_pil, kernel_name="adjust_gamma_image_pil"),
),
DispatcherInfo(
F.adjust_hue,
kernels={
tv_tensors.Image: F.adjust_hue_image,
tv_tensors.Video: F.adjust_hue_video,
},
pil_kernel_info=PILKernelInfo(F._adjust_hue_image_pil, kernel_name="adjust_hue_image_pil"),
),
DispatcherInfo(
F.adjust_saturation,
kernels={
tv_tensors.Image: F.adjust_saturation_image,
tv_tensors.Video: F.adjust_saturation_video,
},
pil_kernel_info=PILKernelInfo(F._adjust_saturation_image_pil, kernel_name="adjust_saturation_image_pil"),
),
DispatcherInfo(
F.five_crop,
kernels={
tv_tensors.Image: F.five_crop_image,
tv_tensors.Video: F.five_crop_video,
},
pil_kernel_info=PILKernelInfo(F._five_crop_image_pil),
test_marks=[
xfail_jit_python_scalar_arg("size"),
*multi_crop_skips,
],
),
DispatcherInfo(
F.ten_crop,
kernels={
tv_tensors.Image: F.ten_crop_image,
tv_tensors.Video: F.ten_crop_video,
},
test_marks=[
xfail_jit_python_scalar_arg("size"),
*multi_crop_skips,
],
pil_kernel_info=PILKernelInfo(F._ten_crop_image_pil),
),
DispatcherInfo(
F.normalize,
kernels={
tv_tensors.Image: F.normalize_image,
tv_tensors.Video: F.normalize_video,
},
test_marks=[
xfail_jit_python_scalar_arg("mean"),
xfail_jit_python_scalar_arg("std"),
],
),
DispatcherInfo(
F.uniform_temporal_subsample,
kernels={
tv_tensors.Video: F.uniform_temporal_subsample_video,
},
test_marks=[
skip_dispatch_tv_tensor,
],
),
DispatcherInfo(
F.clamp_bounding_boxes,
kernels={tv_tensors.BoundingBoxes: F.clamp_bounding_boxes},
test_marks=[
skip_dispatch_tv_tensor,
],
),
]
import functools
import itertools
import PIL.Image
import pytest
import torch.testing
import torchvision.transforms.v2.functional as F
from torchvision.transforms._functional_tensor import _max_value as get_max_value
from transforms_v2_legacy_utils import (
ArgsKwargs,
DEFAULT_PORTRAIT_SPATIAL_SIZE,
get_num_channels,
ImageLoader,
InfoBase,
make_bounding_box_loaders,
make_image_loader,
make_image_loaders,
make_image_loaders_for_interpolation,
make_mask_loaders,
make_video_loaders,
mark_framework_limitation,
TestMark,
)
__all__ = ["KernelInfo", "KERNEL_INFOS"]
class KernelInfo(InfoBase):
def __init__(
self,
kernel,
*,
# Defaults to `kernel.__name__`. Should be set if the function is exposed under a different name
# TODO: This can probably be removed after roll-out since we shouldn't have any aliasing then
kernel_name=None,
# Most common tests use these inputs to check the kernel. As such it should cover all valid code paths, but
# should not include extensive parameter combinations to keep the overall test count moderate.
sample_inputs_fn,
# This function should mirror the kernel. It should have the same signature as the `kernel` and as such also
# take tensors as inputs. Any conversion into another object type, e.g. PIL images or numpy arrays, should
# happen inside the function. It should return a tensor, or more precisely an object that can be compared to
# a tensor by `assert_close`. If omitted, no reference test will be performed.
reference_fn=None,
# These inputs are only used for the reference tests and thus can be comprehensive with regard to the parameter
# values to be tested. If not specified, `sample_inputs_fn` will be used.
reference_inputs_fn=None,
# If true-ish, triggers a test that checks the kernel for consistency between uint8 and float32 inputs with the
# reference inputs. This is usually used whenever we use a PIL kernel as reference.
# Can be a callable in which case it will be called with `other_args, kwargs`. It should return the same
# structure, but with adapted parameters. This is useful in case a parameter value is closely tied to the input
# dtype.
float32_vs_uint8=False,
# Some kernels don't have dispatchers that would handle logging the usage. Thus, the kernel has to do it
# manually. If set, triggers a test that makes sure this happens.
logs_usage=False,
# See InfoBase
test_marks=None,
# See InfoBase
closeness_kwargs=None,
):
super().__init__(id=kernel_name or kernel.__name__, test_marks=test_marks, closeness_kwargs=closeness_kwargs)
self.kernel = kernel
self.sample_inputs_fn = sample_inputs_fn
self.reference_fn = reference_fn
self.reference_inputs_fn = reference_inputs_fn
if float32_vs_uint8 and not callable(float32_vs_uint8):
float32_vs_uint8 = lambda other_args, kwargs: (other_args, kwargs) # noqa: E731
self.float32_vs_uint8 = float32_vs_uint8
self.logs_usage = logs_usage
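# Illustrative sketch (added for clarity, mirroring the entries in
# `KERNEL_INFOS` below): a minimal `KernelInfo` only needs the kernel and a
# sample-inputs generator, e.g.
#
#     KernelInfo(
#         F.invert_video,
#         sample_inputs_fn=sample_inputs_invert_video,
#     )
#
# Reference tests are opted into by additionally passing `reference_fn` and
# `reference_inputs_fn`.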
def pixel_difference_closeness_kwargs(uint8_atol, *, dtype=torch.uint8, mae=False):
return dict(atol=uint8_atol / 255 * get_max_value(dtype), rtol=0, mae=mae)
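# Worked example (added for clarity): `pixel_difference_closeness_kwargs(2, dtype=torch.float32)`
# returns `dict(atol=2 / 255 * 1.0, rtol=0, mae=False)`, i.e. a tolerance of
# two uint8 pixel steps rescaled to the [0, 1] float range, since
# `get_max_value(torch.float32)` is 1.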
def cuda_vs_cpu_pixel_difference(atol=1):
return {
(("TestKernels", "test_cuda_vs_cpu"), dtype, "cuda"): pixel_difference_closeness_kwargs(atol, dtype=dtype)
for dtype in [torch.uint8, torch.float32]
}
def pil_reference_pixel_difference(atol=1, mae=False):
return {
(("TestKernels", "test_against_reference"), torch.uint8, "cpu"): pixel_difference_closeness_kwargs(
atol, mae=mae
)
}
def float32_vs_uint8_pixel_difference(atol=1, mae=False):
return {
(
("TestKernels", "test_float32_vs_uint8"),
torch.float32,
"cpu",
): pixel_difference_closeness_kwargs(atol, dtype=torch.float32, mae=mae)
}
def scripted_vs_eager_float64_tolerances(device, atol=1e-6, rtol=1e-6):
return {
(("TestKernels", "test_scripted_vs_eager"), torch.float64, device): {"atol": atol, "rtol": rtol, "mae": False},
}
def pil_reference_wrapper(pil_kernel):
@functools.wraps(pil_kernel)
def wrapper(input_tensor, *other_args, **kwargs):
if input_tensor.dtype != torch.uint8:
raise pytest.UsageError(f"Can only test uint8 tensor images against PIL, but input is {input_tensor.dtype}")
if input_tensor.ndim > 3:
raise pytest.UsageError(
f"Can only test single tensor images against PIL, but input has shape {input_tensor.shape}"
)
input_pil = F.to_pil_image(input_tensor)
output_pil = pil_kernel(input_pil, *other_args, **kwargs)
if not isinstance(output_pil, PIL.Image.Image):
return output_pil
output_tensor = F.to_image(output_pil)
# 2D mask shenanigans
if output_tensor.ndim == 2 and input_tensor.ndim == 3:
output_tensor = output_tensor.unsqueeze(0)
elif output_tensor.ndim == 3 and input_tensor.ndim == 2:
output_tensor = output_tensor.squeeze(0)
return output_tensor
return wrapper
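# Note (added for clarity): this wrapper lets a PIL kernel act as the reference
# for a tensor kernel, e.g. `reference_fn=pil_reference_wrapper(F._invert_image_pil)`
# below. It converts the uint8 tensor input to a PIL image, runs the PIL
# kernel, and converts the result back to a tensor for comparison via
# `assert_close`.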
def xfail_jit(reason, *, condition=None):
return TestMark(("TestKernels", "test_scripted_vs_eager"), pytest.mark.xfail(reason=reason), condition=condition)
def xfail_jit_python_scalar_arg(name, *, reason=None):
return xfail_jit(
reason or f"Python scalar int or float for `{name}` is not supported when scripting",
condition=lambda args_kwargs: isinstance(args_kwargs.kwargs.get(name), (int, float)),
)
KERNEL_INFOS = []
def get_fills(*, num_channels, dtype):
yield None
int_value = get_max_value(dtype)
float_value = int_value / 2
yield int_value
yield float_value
for vector_type in [list, tuple]:
yield vector_type([int_value])
yield vector_type([float_value])
if num_channels > 1:
yield vector_type(float_value * c / 10 for c in range(num_channels))
yield vector_type(int_value if c % 2 == 0 else 0 for c in range(num_channels))
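# Worked example (added for clarity): for num_channels=3 and dtype=torch.uint8
# this yields, in order: None, 255, 127.5, [255], [127.5], [0.0, 12.75, 25.5],
# [255, 0, 255], followed by the same vector values as tuples.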
def float32_vs_uint8_fill_adapter(other_args, kwargs):
fill = kwargs.get("fill")
if fill is None:
return other_args, kwargs
if isinstance(fill, (int, float)):
fill /= 255
else:
fill = type(fill)(fill_ / 255 for fill_ in fill)
return other_args, dict(kwargs, fill=fill)
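# Worked example (added for clarity): the adapter rescales uint8 fill values to
# the float32 range, so fill=255 becomes 1.0 and fill=[255, 0, 255] becomes
# [1.0, 0.0, 1.0]; all other arguments pass through unchanged.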
def _get_elastic_displacement(canvas_size):
return torch.rand(1, *canvas_size, 2)
def sample_inputs_elastic_image_tensor():
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE]):
displacement = _get_elastic_displacement(image_loader.canvas_size)
for fill in get_fills(num_channels=image_loader.num_channels, dtype=image_loader.dtype):
yield ArgsKwargs(image_loader, displacement=displacement, fill=fill)
def reference_inputs_elastic_image_tensor():
for image_loader, interpolation in itertools.product(
make_image_loaders_for_interpolation(),
[
F.InterpolationMode.NEAREST,
F.InterpolationMode.BILINEAR,
F.InterpolationMode.BICUBIC,
],
):
displacement = _get_elastic_displacement(image_loader.canvas_size)
for fill in get_fills(num_channels=image_loader.num_channels, dtype=image_loader.dtype):
yield ArgsKwargs(image_loader, interpolation=interpolation, displacement=displacement, fill=fill)
def sample_inputs_elastic_bounding_boxes():
for bounding_boxes_loader in make_bounding_box_loaders():
displacement = _get_elastic_displacement(bounding_boxes_loader.canvas_size)
yield ArgsKwargs(
bounding_boxes_loader,
format=bounding_boxes_loader.format,
canvas_size=bounding_boxes_loader.canvas_size,
displacement=displacement,
)
def sample_inputs_elastic_mask():
for mask_loader in make_mask_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE]):
displacement = _get_elastic_displacement(mask_loader.shape[-2:])
yield ArgsKwargs(mask_loader, displacement=displacement)
def sample_inputs_elastic_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
displacement = _get_elastic_displacement(video_loader.shape[-2:])
yield ArgsKwargs(video_loader, displacement=displacement)
KERNEL_INFOS.extend(
[
KernelInfo(
F.elastic_image,
sample_inputs_fn=sample_inputs_elastic_image_tensor,
reference_inputs_fn=reference_inputs_elastic_image_tensor,
float32_vs_uint8=float32_vs_uint8_fill_adapter,
closeness_kwargs={
**float32_vs_uint8_pixel_difference(6, mae=True),
**cuda_vs_cpu_pixel_difference(),
},
test_marks=[xfail_jit_python_scalar_arg("fill")],
),
KernelInfo(
F.elastic_bounding_boxes,
sample_inputs_fn=sample_inputs_elastic_bounding_boxes,
),
KernelInfo(
F.elastic_mask,
sample_inputs_fn=sample_inputs_elastic_mask,
),
KernelInfo(
F.elastic_video,
sample_inputs_fn=sample_inputs_elastic_video,
closeness_kwargs=cuda_vs_cpu_pixel_difference(),
),
]
)
def sample_inputs_equalize_image_tensor():
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=("GRAY", "RGB")):
yield ArgsKwargs(image_loader)
def reference_inputs_equalize_image_tensor():
# We are not using `make_image_loaders` here since that uniformly samples the values over the whole value range.
# Since the whole point of this kernel is to transform an arbitrary distribution of values into a uniform one,
# the information gain is low if we already provide something really close to the expected value.
def make_uniform_band_image(shape, dtype, device, *, low_factor, high_factor, memory_format):
if dtype.is_floating_point:
low = low_factor
high = high_factor
else:
max_value = torch.iinfo(dtype).max
low = int(low_factor * max_value)
high = int(high_factor * max_value)
return torch.testing.make_tensor(shape, dtype=dtype, device=device, low=low, high=high).to(
memory_format=memory_format, copy=True
)
def make_beta_distributed_image(shape, dtype, device, *, alpha, beta, memory_format):
image = torch.distributions.Beta(alpha, beta).sample(shape)
if not dtype.is_floating_point:
image.mul_(torch.iinfo(dtype).max).round_()
return image.to(dtype=dtype, device=device, memory_format=memory_format, copy=True)
canvas_size = (256, 256)
for dtype, color_space, fn in itertools.product(
[torch.uint8],
["GRAY", "RGB"],
[
lambda shape, dtype, device, memory_format: torch.zeros(shape, dtype=dtype, device=device).to(
memory_format=memory_format, copy=True
),
lambda shape, dtype, device, memory_format: torch.full(
shape, 1.0 if dtype.is_floating_point else torch.iinfo(dtype).max, dtype=dtype, device=device
).to(memory_format=memory_format, copy=True),
*[
functools.partial(make_uniform_band_image, low_factor=low_factor, high_factor=high_factor)
for low_factor, high_factor in [
(0.0, 0.25),
(0.25, 0.75),
(0.75, 1.0),
]
],
*[
functools.partial(make_beta_distributed_image, alpha=alpha, beta=beta)
for alpha, beta in [
(0.5, 0.5),
(2, 2),
(2, 5),
(5, 2),
]
],
],
):
image_loader = ImageLoader(fn, shape=(get_num_channels(color_space), *canvas_size), dtype=dtype)
yield ArgsKwargs(image_loader)
def sample_inputs_equalize_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader)
KERNEL_INFOS.extend(
[
KernelInfo(
F.equalize_image,
kernel_name="equalize_image_tensor",
sample_inputs_fn=sample_inputs_equalize_image_tensor,
reference_fn=pil_reference_wrapper(F._equalize_image_pil),
float32_vs_uint8=True,
reference_inputs_fn=reference_inputs_equalize_image_tensor,
),
KernelInfo(
F.equalize_video,
sample_inputs_fn=sample_inputs_equalize_video,
),
]
)
def sample_inputs_invert_image_tensor():
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=("GRAY", "RGB")):
yield ArgsKwargs(image_loader)
def reference_inputs_invert_image_tensor():
for image_loader in make_image_loaders(color_spaces=("GRAY", "RGB"), extra_dims=[()], dtypes=[torch.uint8]):
yield ArgsKwargs(image_loader)
def sample_inputs_invert_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader)
KERNEL_INFOS.extend(
[
KernelInfo(
F.invert_image,
kernel_name="invert_image_tensor",
sample_inputs_fn=sample_inputs_invert_image_tensor,
reference_fn=pil_reference_wrapper(F._invert_image_pil),
reference_inputs_fn=reference_inputs_invert_image_tensor,
float32_vs_uint8=True,
),
KernelInfo(
F.invert_video,
sample_inputs_fn=sample_inputs_invert_video,
),
]
)
_POSTERIZE_BITS = [1, 4, 8]
def sample_inputs_posterize_image_tensor():
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=("GRAY", "RGB")):
yield ArgsKwargs(image_loader, bits=_POSTERIZE_BITS[0])
def reference_inputs_posterize_image_tensor():
for image_loader, bits in itertools.product(
make_image_loaders(color_spaces=("GRAY", "RGB"), extra_dims=[()], dtypes=[torch.uint8]),
_POSTERIZE_BITS,
):
yield ArgsKwargs(image_loader, bits=bits)
def sample_inputs_posterize_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader, bits=_POSTERIZE_BITS[0])
KERNEL_INFOS.extend(
[
KernelInfo(
F.posterize_image,
kernel_name="posterize_image_tensor",
sample_inputs_fn=sample_inputs_posterize_image_tensor,
reference_fn=pil_reference_wrapper(F._posterize_image_pil),
reference_inputs_fn=reference_inputs_posterize_image_tensor,
float32_vs_uint8=True,
closeness_kwargs=float32_vs_uint8_pixel_difference(),
),
KernelInfo(
F.posterize_video,
sample_inputs_fn=sample_inputs_posterize_video,
),
]
)
def _get_solarize_thresholds(dtype):
for factor in [0.1, 0.5]:
max_value = get_max_value(dtype)
yield (float if dtype.is_floating_point else int)(max_value * factor)
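# Worked example (added for clarity): for torch.uint8 this yields
# int(255 * 0.1) = 25 and int(255 * 0.5) = 127; for torch.float32 it yields
# the float thresholds 0.1 and 0.5 directly.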
def sample_inputs_solarize_image_tensor():
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=("GRAY", "RGB")):
yield ArgsKwargs(image_loader, threshold=next(_get_solarize_thresholds(image_loader.dtype)))
def reference_inputs_solarize_image_tensor():
for image_loader in make_image_loaders(color_spaces=("GRAY", "RGB"), extra_dims=[()], dtypes=[torch.uint8]):
for threshold in _get_solarize_thresholds(image_loader.dtype):
yield ArgsKwargs(image_loader, threshold=threshold)
def uint8_to_float32_threshold_adapter(other_args, kwargs):
return other_args, dict(threshold=kwargs["threshold"] / 255)
def sample_inputs_solarize_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader, threshold=next(_get_solarize_thresholds(video_loader.dtype)))
KERNEL_INFOS.extend(
[
KernelInfo(
F.solarize_image,
kernel_name="solarize_image_tensor",
sample_inputs_fn=sample_inputs_solarize_image_tensor,
reference_fn=pil_reference_wrapper(F._solarize_image_pil),
reference_inputs_fn=reference_inputs_solarize_image_tensor,
float32_vs_uint8=uint8_to_float32_threshold_adapter,
closeness_kwargs=float32_vs_uint8_pixel_difference(),
),
KernelInfo(
F.solarize_video,
sample_inputs_fn=sample_inputs_solarize_video,
),
]
)
def sample_inputs_autocontrast_image_tensor():
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=("GRAY", "RGB")):
yield ArgsKwargs(image_loader)
def reference_inputs_autocontrast_image_tensor():
for image_loader in make_image_loaders(color_spaces=("GRAY", "RGB"), extra_dims=[()], dtypes=[torch.uint8]):
yield ArgsKwargs(image_loader)
def sample_inputs_autocontrast_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader)
KERNEL_INFOS.extend(
[
KernelInfo(
F.autocontrast_image,
kernel_name="autocontrast_image_tensor",
sample_inputs_fn=sample_inputs_autocontrast_image_tensor,
reference_fn=pil_reference_wrapper(F._autocontrast_image_pil),
reference_inputs_fn=reference_inputs_autocontrast_image_tensor,
float32_vs_uint8=True,
closeness_kwargs={
**pil_reference_pixel_difference(),
**float32_vs_uint8_pixel_difference(),
},
),
KernelInfo(
F.autocontrast_video,
sample_inputs_fn=sample_inputs_autocontrast_video,
),
]
)
_ADJUST_SHARPNESS_FACTORS = [0.1, 0.5]
def sample_inputs_adjust_sharpness_image_tensor():
for image_loader in make_image_loaders(
sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE, (2, 2)],
color_spaces=("GRAY", "RGB"),
):
yield ArgsKwargs(image_loader, sharpness_factor=_ADJUST_SHARPNESS_FACTORS[0])
def reference_inputs_adjust_sharpness_image_tensor():
for image_loader, sharpness_factor in itertools.product(
make_image_loaders(color_spaces=("GRAY", "RGB"), extra_dims=[()], dtypes=[torch.uint8]),
_ADJUST_SHARPNESS_FACTORS,
):
yield ArgsKwargs(image_loader, sharpness_factor=sharpness_factor)
def sample_inputs_adjust_sharpness_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader, sharpness_factor=_ADJUST_SHARPNESS_FACTORS[0])
KERNEL_INFOS.extend(
[
KernelInfo(
F.adjust_sharpness_image,
kernel_name="adjust_sharpness_image_tensor",
sample_inputs_fn=sample_inputs_adjust_sharpness_image_tensor,
reference_fn=pil_reference_wrapper(F._adjust_sharpness_image_pil),
reference_inputs_fn=reference_inputs_adjust_sharpness_image_tensor,
float32_vs_uint8=True,
closeness_kwargs=float32_vs_uint8_pixel_difference(2),
),
KernelInfo(
F.adjust_sharpness_video,
sample_inputs_fn=sample_inputs_adjust_sharpness_video,
),
]
)
_ADJUST_CONTRAST_FACTORS = [0.1, 0.5]
def sample_inputs_adjust_contrast_image_tensor():
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=("GRAY", "RGB")):
yield ArgsKwargs(image_loader, contrast_factor=_ADJUST_CONTRAST_FACTORS[0])
def reference_inputs_adjust_contrast_image_tensor():
for image_loader, contrast_factor in itertools.product(
make_image_loaders(color_spaces=("GRAY", "RGB"), extra_dims=[()], dtypes=[torch.uint8]),
_ADJUST_CONTRAST_FACTORS,
):
yield ArgsKwargs(image_loader, contrast_factor=contrast_factor)
def sample_inputs_adjust_contrast_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader, contrast_factor=_ADJUST_CONTRAST_FACTORS[0])
KERNEL_INFOS.extend(
[
KernelInfo(
F.adjust_contrast_image,
kernel_name="adjust_contrast_image_tensor",
sample_inputs_fn=sample_inputs_adjust_contrast_image_tensor,
reference_fn=pil_reference_wrapper(F._adjust_contrast_image_pil),
reference_inputs_fn=reference_inputs_adjust_contrast_image_tensor,
float32_vs_uint8=True,
closeness_kwargs={
**pil_reference_pixel_difference(),
**float32_vs_uint8_pixel_difference(2),
**cuda_vs_cpu_pixel_difference(),
(("TestKernels", "test_against_reference"), torch.uint8, "cpu"): pixel_difference_closeness_kwargs(1),
},
),
KernelInfo(
F.adjust_contrast_video,
sample_inputs_fn=sample_inputs_adjust_contrast_video,
closeness_kwargs={
**cuda_vs_cpu_pixel_difference(),
(("TestKernels", "test_against_reference"), torch.uint8, "cpu"): pixel_difference_closeness_kwargs(1),
},
),
]
)
_ADJUST_GAMMA_GAMMAS_GAINS = [
(0.5, 2.0),
(0.0, 1.0),
]
def sample_inputs_adjust_gamma_image_tensor():
gamma, gain = _ADJUST_GAMMA_GAMMAS_GAINS[0]
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=("GRAY", "RGB")):
yield ArgsKwargs(image_loader, gamma=gamma, gain=gain)
def reference_inputs_adjust_gamma_image_tensor():
for image_loader, (gamma, gain) in itertools.product(
make_image_loaders(color_spaces=("GRAY", "RGB"), extra_dims=[()], dtypes=[torch.uint8]),
_ADJUST_GAMMA_GAMMAS_GAINS,
):
yield ArgsKwargs(image_loader, gamma=gamma, gain=gain)
def sample_inputs_adjust_gamma_video():
gamma, gain = _ADJUST_GAMMA_GAMMAS_GAINS[0]
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader, gamma=gamma, gain=gain)
KERNEL_INFOS.extend(
[
KernelInfo(
F.adjust_gamma_image,
kernel_name="adjust_gamma_image_tensor",
sample_inputs_fn=sample_inputs_adjust_gamma_image_tensor,
reference_fn=pil_reference_wrapper(F._adjust_gamma_image_pil),
reference_inputs_fn=reference_inputs_adjust_gamma_image_tensor,
float32_vs_uint8=True,
closeness_kwargs={
**pil_reference_pixel_difference(),
**float32_vs_uint8_pixel_difference(),
},
),
KernelInfo(
F.adjust_gamma_video,
sample_inputs_fn=sample_inputs_adjust_gamma_video,
),
]
)
_ADJUST_HUE_FACTORS = [-0.1, 0.5]
def sample_inputs_adjust_hue_image_tensor():
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=("GRAY", "RGB")):
yield ArgsKwargs(image_loader, hue_factor=_ADJUST_HUE_FACTORS[0])
def reference_inputs_adjust_hue_image_tensor():
for image_loader, hue_factor in itertools.product(
make_image_loaders(color_spaces=("GRAY", "RGB"), extra_dims=[()], dtypes=[torch.uint8]),
_ADJUST_HUE_FACTORS,
):
yield ArgsKwargs(image_loader, hue_factor=hue_factor)
def sample_inputs_adjust_hue_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader, hue_factor=_ADJUST_HUE_FACTORS[0])
KERNEL_INFOS.extend(
[
KernelInfo(
F.adjust_hue_image,
kernel_name="adjust_hue_image_tensor",
sample_inputs_fn=sample_inputs_adjust_hue_image_tensor,
reference_fn=pil_reference_wrapper(F._adjust_hue_image_pil),
reference_inputs_fn=reference_inputs_adjust_hue_image_tensor,
float32_vs_uint8=True,
closeness_kwargs={
**pil_reference_pixel_difference(2, mae=True),
**float32_vs_uint8_pixel_difference(),
},
),
KernelInfo(
F.adjust_hue_video,
sample_inputs_fn=sample_inputs_adjust_hue_video,
),
]
)
_ADJUST_SATURATION_FACTORS = [0.1, 0.5]
def sample_inputs_adjust_saturation_image_tensor():
for image_loader in make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=("GRAY", "RGB")):
yield ArgsKwargs(image_loader, saturation_factor=_ADJUST_SATURATION_FACTORS[0])
def reference_inputs_adjust_saturation_image_tensor():
for image_loader, saturation_factor in itertools.product(
make_image_loaders(color_spaces=("GRAY", "RGB"), extra_dims=[()], dtypes=[torch.uint8]),
_ADJUST_SATURATION_FACTORS,
):
yield ArgsKwargs(image_loader, saturation_factor=saturation_factor)
def sample_inputs_adjust_saturation_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[3]):
yield ArgsKwargs(video_loader, saturation_factor=_ADJUST_SATURATION_FACTORS[0])
KERNEL_INFOS.extend(
[
KernelInfo(
F.adjust_saturation_image,
kernel_name="adjust_saturation_image_tensor",
sample_inputs_fn=sample_inputs_adjust_saturation_image_tensor,
reference_fn=pil_reference_wrapper(F._adjust_saturation_image_pil),
reference_inputs_fn=reference_inputs_adjust_saturation_image_tensor,
float32_vs_uint8=True,
closeness_kwargs={
**pil_reference_pixel_difference(),
**float32_vs_uint8_pixel_difference(2),
**cuda_vs_cpu_pixel_difference(),
},
),
KernelInfo(
F.adjust_saturation_video,
sample_inputs_fn=sample_inputs_adjust_saturation_video,
closeness_kwargs=cuda_vs_cpu_pixel_difference(),
),
]
)
def sample_inputs_clamp_bounding_boxes():
for bounding_boxes_loader in make_bounding_box_loaders():
yield ArgsKwargs(
bounding_boxes_loader,
format=bounding_boxes_loader.format,
canvas_size=bounding_boxes_loader.canvas_size,
)
KERNEL_INFOS.append(
KernelInfo(
F.clamp_bounding_boxes,
sample_inputs_fn=sample_inputs_clamp_bounding_boxes,
logs_usage=True,
)
)
_FIVE_TEN_CROP_SIZES = [7, (6,), [5], (6, 5), [7, 6]]
def _get_five_ten_crop_canvas_size(size):
if isinstance(size, int):
crop_height = crop_width = size
elif len(size) == 1:
crop_height = crop_width = size[0]
else:
crop_height, crop_width = size
return 2 * crop_height, 2 * crop_width
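# Worked example (added for clarity): the canvas is twice the crop size in each
# dimension, so size=7 maps to a (14, 14) canvas and size=(6, 5) to (12, 10),
# guaranteeing that all five/ten crops fit inside the input.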
def sample_inputs_five_crop_image_tensor():
for size in _FIVE_TEN_CROP_SIZES:
for image_loader in make_image_loaders(
sizes=[_get_five_ten_crop_canvas_size(size)],
color_spaces=["RGB"],
dtypes=[torch.float32],
):
yield ArgsKwargs(image_loader, size=size)
def reference_inputs_five_crop_image_tensor():
for size in _FIVE_TEN_CROP_SIZES:
for image_loader in make_image_loaders(
sizes=[_get_five_ten_crop_canvas_size(size)], extra_dims=[()], dtypes=[torch.uint8]
):
yield ArgsKwargs(image_loader, size=size)
def sample_inputs_five_crop_video():
size = _FIVE_TEN_CROP_SIZES[0]
for video_loader in make_video_loaders(sizes=[_get_five_ten_crop_canvas_size(size)]):
yield ArgsKwargs(video_loader, size=size)
def sample_inputs_ten_crop_image_tensor():
for size, vertical_flip in itertools.product(_FIVE_TEN_CROP_SIZES, [False, True]):
for image_loader in make_image_loaders(
sizes=[_get_five_ten_crop_canvas_size(size)],
color_spaces=["RGB"],
dtypes=[torch.float32],
):
yield ArgsKwargs(image_loader, size=size, vertical_flip=vertical_flip)
def reference_inputs_ten_crop_image_tensor():
for size, vertical_flip in itertools.product(_FIVE_TEN_CROP_SIZES, [False, True]):
for image_loader in make_image_loaders(
sizes=[_get_five_ten_crop_canvas_size(size)], extra_dims=[()], dtypes=[torch.uint8]
):
yield ArgsKwargs(image_loader, size=size, vertical_flip=vertical_flip)
def sample_inputs_ten_crop_video():
size = _FIVE_TEN_CROP_SIZES[0]
for video_loader in make_video_loaders(sizes=[_get_five_ten_crop_canvas_size(size)]):
yield ArgsKwargs(video_loader, size=size)
def multi_crop_pil_reference_wrapper(pil_kernel):
def wrapper(input_tensor, *other_args, **kwargs):
output = pil_reference_wrapper(pil_kernel)(input_tensor, *other_args, **kwargs)
return type(output)(
F.to_dtype_image(F.to_image(output_pil), dtype=input_tensor.dtype, scale=True) for output_pil in output
)
return wrapper
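# Note (added for clarity): five/ten crop kernels return a tuple of crops, so
# the PIL reference output is converted crop by crop back to tensors of the
# input dtype before comparison.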
_common_five_ten_crop_marks = [
xfail_jit_python_scalar_arg("size"),
mark_framework_limitation(("TestKernels", "test_batched_vs_single"), "Custom batching needed."),
]
KERNEL_INFOS.extend(
[
KernelInfo(
F.five_crop_image,
sample_inputs_fn=sample_inputs_five_crop_image_tensor,
reference_fn=multi_crop_pil_reference_wrapper(F._five_crop_image_pil),
reference_inputs_fn=reference_inputs_five_crop_image_tensor,
test_marks=_common_five_ten_crop_marks,
),
KernelInfo(
F.five_crop_video,
sample_inputs_fn=sample_inputs_five_crop_video,
test_marks=_common_five_ten_crop_marks,
),
KernelInfo(
F.ten_crop_image,
sample_inputs_fn=sample_inputs_ten_crop_image_tensor,
reference_fn=multi_crop_pil_reference_wrapper(F._ten_crop_image_pil),
reference_inputs_fn=reference_inputs_ten_crop_image_tensor,
test_marks=_common_five_ten_crop_marks,
),
KernelInfo(
F.ten_crop_video,
sample_inputs_fn=sample_inputs_ten_crop_video,
test_marks=_common_five_ten_crop_marks,
),
]
)
_NORMALIZE_MEANS_STDS = [
((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
([0.0, 0.0, 0.0], [1.0, 1.0, 1.0]),
(0.5, 2.0),
]
def sample_inputs_normalize_image_tensor():
for image_loader, (mean, std) in itertools.product(
make_image_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=["RGB"], dtypes=[torch.float32]),
_NORMALIZE_MEANS_STDS,
):
yield ArgsKwargs(image_loader, mean=mean, std=std)
def reference_normalize_image_tensor(image, mean, std, inplace=False):
mean = torch.tensor(mean).view(-1, 1, 1)
std = torch.tensor(std).view(-1, 1, 1)
sub = torch.Tensor.sub_ if inplace else torch.Tensor.sub
return sub(image, mean).div_(std)
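# Worked example (added for clarity): with mean=0.5 and std=2.0, a pixel value
# of 0.75 is mapped to (0.75 - 0.5) / 2.0 = 0.125.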
def reference_inputs_normalize_image_tensor():
yield ArgsKwargs(
make_image_loader(size=(32, 32), color_space="RGB", extra_dims=[1]),
mean=[0.5, 0.5, 0.5],
std=[1.0, 1.0, 1.0],
)
def sample_inputs_normalize_video():
mean, std = _NORMALIZE_MEANS_STDS[0]
for video_loader in make_video_loaders(
sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=["RGB"], num_frames=[3], dtypes=[torch.float32]
):
yield ArgsKwargs(video_loader, mean=mean, std=std)
KERNEL_INFOS.extend(
[
KernelInfo(
F.normalize_image,
kernel_name="normalize_image_tensor",
sample_inputs_fn=sample_inputs_normalize_image_tensor,
reference_fn=reference_normalize_image_tensor,
reference_inputs_fn=reference_inputs_normalize_image_tensor,
test_marks=[
xfail_jit_python_scalar_arg("mean"),
xfail_jit_python_scalar_arg("std"),
],
),
KernelInfo(
F.normalize_video,
sample_inputs_fn=sample_inputs_normalize_video,
),
]
)
def sample_inputs_uniform_temporal_subsample_video():
for video_loader in make_video_loaders(sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], num_frames=[4]):
yield ArgsKwargs(video_loader, num_samples=2)
def reference_uniform_temporal_subsample_video(x, num_samples):
# Copy-pasted from
# https://github.com/facebookresearch/pytorchvideo/blob/c8d23d8b7e597586a9e2d18f6ed31ad8aa379a7a/pytorchvideo/transforms/functional.py#L19
t = x.shape[-4]
assert num_samples > 0 and t > 0
# Sample by nearest neighbor interpolation if num_samples > t.
indices = torch.linspace(0, t - 1, num_samples)
indices = torch.clamp(indices, 0, t - 1).long()
return torch.index_select(x, -4, indices)
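# Worked example (added for clarity): subsampling num_samples=5 frames from a
# video with t=10 picks torch.linspace(0, 9, 5) = [0.0, 2.25, 4.5, 6.75, 9.0],
# which `.long()` truncates to frame indices [0, 2, 4, 6, 9].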
def reference_inputs_uniform_temporal_subsample_video():
for video_loader in make_video_loaders(
sizes=[DEFAULT_PORTRAIT_SPATIAL_SIZE], color_spaces=["RGB"], num_frames=[10]
):
for num_samples in range(1, video_loader.shape[-4] + 1):
yield ArgsKwargs(video_loader, num_samples)
KERNEL_INFOS.append(
KernelInfo(
F.uniform_temporal_subsample_video,
sample_inputs_fn=sample_inputs_uniform_temporal_subsample_video,
reference_fn=reference_uniform_temporal_subsample_video,
reference_inputs_fn=reference_inputs_uniform_temporal_subsample_video,
)
)
...@@ -5,11 +5,9 @@ implemented there and must not use any of the utilities here.
The following legacy modules depend on this module
- transforms_v2_kernel_infos.py
- transforms_v2_dispatcher_infos.py
- test_transforms_v2_functional.py
- test_transforms_v2_consistency.py
- test_transforms_v2.py
When all the logic is ported from the files above to test_transforms_v2_refactored.py, delete
all the legacy modules including this one and drop the _refactored prefix from the name.
...
...@@ -328,6 +328,11 @@ class RandomSolarize(_RandomApplyTransform):
_v1_transform_cls = _transforms.RandomSolarize
def _extract_params_for_v1_transform(self) -> Dict[str, Any]:
params = super()._extract_params_for_v1_transform()
params["threshold"] = float(params["threshold"])
return params
def __init__(self, threshold: float, p: float = 0.5) -> None:
super().__init__(p=p)
self.threshold = threshold
...
...@@ -261,7 +261,7 @@ def clamp_bounding_boxes(
if torch.jit.is_scripting() or is_pure_tensor(inpt):
if format is None or canvas_size is None:
raise ValueError("For pure tensor inputs, `format` and `canvas_size` have to be passed.")
return _clamp_bounding_boxes(inpt, format=format, canvas_size=canvas_size)
elif isinstance(inpt, tv_tensors.BoundingBoxes):
if format is not None or canvas_size is not None:
...