OpenDAS / vision / Commits / 1101c3dc

Commit 1101c3dc (Unverified)
Authored Oct 05, 2023 by Philip Meier, committed by GitHub on Oct 05, 2023

Remove reference consistency tests (#8023)

parent 1d646d41

Showing 2 changed files with 0 additions and 840 deletions (+0 −840):

    test/test_transforms_v2_consistency.py    +0 −214
    test/transforms_v2_legacy_utils.py        +0 −626
test/test_transforms_v2_consistency.py    deleted 100644 → 0
import importlib.machinery
import importlib.util
import random
from pathlib import Path

import pytest
import torch
import torchvision.transforms.v2 as v2_transforms
from common_utils import assert_equal
from torchvision import tv_tensors
from torchvision.transforms import functional as legacy_F
from torchvision.transforms.v2 import functional as prototype_F
from torchvision.transforms.v2._utils import _get_fill, query_size
from torchvision.transforms.v2.functional import to_pil_image
from transforms_v2_legacy_utils import (
    make_bounding_boxes,
    make_detection_mask,
    make_image,
    make_segmentation_mask,
)


def import_transforms_from_references(reference):
    HERE = Path(__file__).parent
    PROJECT_ROOT = HERE.parent

    loader = importlib.machinery.SourceFileLoader(
        "transforms", str(PROJECT_ROOT / "references" / reference / "transforms.py")
    )
    spec = importlib.util.spec_from_loader("transforms", loader)
    module = importlib.util.module_from_spec(spec)
    loader.exec_module(module)
    return module


det_transforms = import_transforms_from_references("detection")


class TestRefDetTransforms:
    def make_tv_tensors(self, with_mask=True):
        size = (600, 800)
        num_objects = 22

        def make_label(extra_dims, categories):
            return torch.randint(categories, extra_dims, dtype=torch.int64)

        pil_image = to_pil_image(make_image(size=size, color_space="RGB"))
        target = {
            "boxes": make_bounding_boxes(canvas_size=size, format="XYXY", batch_dims=(num_objects,), dtype=torch.float),
            "labels": make_label(extra_dims=(num_objects,), categories=80),
        }
        if with_mask:
            target["masks"] = make_detection_mask(size=size, num_objects=num_objects, dtype=torch.long)

        yield (pil_image, target)

        tensor_image = torch.Tensor(make_image(size=size, color_space="RGB", dtype=torch.float32))
        target = {
            "boxes": make_bounding_boxes(canvas_size=size, format="XYXY", batch_dims=(num_objects,), dtype=torch.float),
            "labels": make_label(extra_dims=(num_objects,), categories=80),
        }
        if with_mask:
            target["masks"] = make_detection_mask(size=size, num_objects=num_objects, dtype=torch.long)

        yield (tensor_image, target)

        tv_tensor_image = make_image(size=size, color_space="RGB", dtype=torch.float32)
        target = {
            "boxes": make_bounding_boxes(canvas_size=size, format="XYXY", batch_dims=(num_objects,), dtype=torch.float),
            "labels": make_label(extra_dims=(num_objects,), categories=80),
        }
        if with_mask:
            target["masks"] = make_detection_mask(size=size, num_objects=num_objects, dtype=torch.long)

        yield (tv_tensor_image, target)

    @pytest.mark.parametrize(
        "t_ref, t, data_kwargs",
        [
            (det_transforms.RandomHorizontalFlip(p=1.0), v2_transforms.RandomHorizontalFlip(p=1.0), {}),
            (
                det_transforms.RandomIoUCrop(),
                v2_transforms.Compose(
                    [
                        v2_transforms.RandomIoUCrop(),
                        v2_transforms.SanitizeBoundingBoxes(labels_getter=lambda sample: sample[1]["labels"]),
                    ]
                ),
                {"with_mask": False},
            ),
            (det_transforms.RandomZoomOut(), v2_transforms.RandomZoomOut(), {"with_mask": False}),
            (det_transforms.ScaleJitter((1024, 1024)), v2_transforms.ScaleJitter((1024, 1024), antialias=True), {}),
            (
                det_transforms.RandomShortestSize(
                    min_size=(480, 512, 544, 576, 608, 640, 672, 704, 736, 768, 800), max_size=1333
                ),
                v2_transforms.RandomShortestSize(
                    min_size=(480, 512, 544, 576, 608, 640, 672, 704, 736, 768, 800), max_size=1333
                ),
                {},
            ),
        ],
    )
    def test_transform(self, t_ref, t, data_kwargs):
        for dp in self.make_tv_tensors(**data_kwargs):

            # We should use prototype transform first as reference transform performs inplace target update
            torch.manual_seed(12)
            output = t(dp)

            torch.manual_seed(12)
            expected_output = t_ref(*dp)

            assert_equal(expected_output, output)


seg_transforms = import_transforms_from_references("segmentation")


# We need this transform for two reasons:
# 1. transforms.RandomCrop uses a different scheme to pad images and masks of insufficient size than its name
#    counterpart in the detection references. Thus, we cannot use it with `pad_if_needed=True`
# 2. transforms.Pad only supports a fixed padding, but the segmentation datasets don't have a fixed image size.
class PadIfSmaller(v2_transforms.Transform):
    def __init__(self, size, fill=0):
        super().__init__()
        self.size = size
        self.fill = v2_transforms._geometry._setup_fill_arg(fill)

    def _get_params(self, sample):
        height, width = query_size(sample)
        padding = [0, 0, max(self.size - width, 0), max(self.size - height, 0)]
        needs_padding = any(padding)
        return dict(padding=padding, needs_padding=needs_padding)

    def _transform(self, inpt, params):
        if not params["needs_padding"]:
            return inpt

        fill = _get_fill(self.fill, type(inpt))
        return prototype_F.pad(inpt, padding=params["padding"], fill=fill)


class TestRefSegTransforms:
    def make_tv_tensors(self, supports_pil=True, image_dtype=torch.uint8):
        size = (256, 460)
        num_categories = 21

        conv_fns = []
        if supports_pil:
            conv_fns.append(to_pil_image)
        conv_fns.extend([torch.Tensor, lambda x: x])

        for conv_fn in conv_fns:
            tv_tensor_image = make_image(size=size, color_space="RGB", dtype=image_dtype)
            tv_tensor_mask = make_segmentation_mask(size=size, num_categories=num_categories, dtype=torch.uint8)

            dp = (conv_fn(tv_tensor_image), tv_tensor_mask)
            dp_ref = (
                to_pil_image(tv_tensor_image) if supports_pil else tv_tensor_image.as_subclass(torch.Tensor),
                to_pil_image(tv_tensor_mask),
            )

            yield dp, dp_ref

    def set_seed(self, seed=12):
        torch.manual_seed(seed)
        random.seed(seed)

    def check(self, t, t_ref, data_kwargs=None):
        for dp, dp_ref in self.make_tv_tensors(**data_kwargs or dict()):

            self.set_seed()
            actual = actual_image, actual_mask = t(dp)

            self.set_seed()
            expected_image, expected_mask = t_ref(*dp_ref)

            if isinstance(actual_image, torch.Tensor) and not isinstance(expected_image, torch.Tensor):
                expected_image = legacy_F.pil_to_tensor(expected_image)
            expected_mask = legacy_F.pil_to_tensor(expected_mask).squeeze(0)

            expected = (expected_image, expected_mask)

            assert_equal(actual, expected)

    @pytest.mark.parametrize(
        ("t_ref", "t", "data_kwargs"),
        [
            (
                seg_transforms.RandomHorizontalFlip(flip_prob=1.0),
                v2_transforms.RandomHorizontalFlip(p=1.0),
                dict(),
            ),
            (
                seg_transforms.RandomHorizontalFlip(flip_prob=0.0),
                v2_transforms.RandomHorizontalFlip(p=0.0),
                dict(),
            ),
            (
                seg_transforms.RandomCrop(size=480),
                v2_transforms.Compose(
                    [
                        PadIfSmaller(size=480, fill={tv_tensors.Mask: 255, "others": 0}),
                        v2_transforms.RandomCrop(size=480),
                    ]
                ),
                dict(),
            ),
            (
                seg_transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
                v2_transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
                dict(supports_pil=False, image_dtype=torch.float),
            ),
        ],
    )
    def test_common(self, t_ref, t, data_kwargs):
        self.check(t, t_ref, data_kwargs)
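
For context, the tests removed above all follow the same parity-check pattern: seed the RNG identically before running the v2 transform and the reference-script transform, then assert that the outputs match. Below is a minimal, illustrative sketch of that pattern (not part of this commit); the transform pair and the random input are placeholders, with the v2 transform standing in for the reference-script transform as well.

    # Illustrative sketch of the seed-and-compare pattern used in the removed tests.
    import torch
    import torchvision.transforms.v2 as v2_transforms
    from common_utils import assert_equal  # test helper from this repo's test suite

    t_v2 = v2_transforms.RandomHorizontalFlip(p=1.0)
    t_ref = v2_transforms.RandomHorizontalFlip(p=1.0)  # stand-in for a reference-script transform

    image = torch.rand(3, 600, 800)

    # The v2 transform runs first because the reference transforms update their targets in place.
    torch.manual_seed(12)
    output = t_v2(image)

    torch.manual_seed(12)
    expected = t_ref(image)

    assert_equal(expected, output)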
test/transforms_v2_legacy_utils.py    deleted 100644 → 0
"""
As the name implies, these are legacy utilities that are hopefully removed soon. The future of
transforms v2 testing is in test/test_transforms_v2_refactored.py. All new test should be
implemented there and must not use any of the utilities here.
The following legacy modules depend on this module
- test_transforms_v2_consistency.py
"""
import
collections.abc
import
dataclasses
import
enum
import
itertools
import
pathlib
from
collections
import
defaultdict
from
typing
import
Callable
,
Sequence
,
Tuple
,
Union
import
PIL.Image
import
pytest
import
torch
from
torchvision
import
tv_tensors
from
torchvision.transforms._functional_tensor
import
_max_value
as
get_max_value
from
torchvision.transforms.v2.functional
import
to_dtype_image
,
to_image
,
to_pil_image
def
combinations_grid
(
**
kwargs
):
"""Creates a grid of input combinations.
Each element in the returned sequence is a dictionary containing one possible combination as values.
Example:
>>> combinations_grid(foo=("bar", "baz"), spam=("eggs", "ham"))
[
{'foo': 'bar', 'spam': 'eggs'},
{'foo': 'bar', 'spam': 'ham'},
{'foo': 'baz', 'spam': 'eggs'},
{'foo': 'baz', 'spam': 'ham'}
]
"""
return
[
dict
(
zip
(
kwargs
.
keys
(),
values
))
for
values
in
itertools
.
product
(
*
kwargs
.
values
())]
DEFAULT_SIZE
=
(
17
,
11
)
NUM_CHANNELS_MAP
=
{
"GRAY"
:
1
,
"GRAY_ALPHA"
:
2
,
"RGB"
:
3
,
"RGBA"
:
4
,
}
def
make_image
(
size
=
DEFAULT_SIZE
,
*
,
color_space
=
"RGB"
,
batch_dims
=
(),
dtype
=
None
,
device
=
"cpu"
,
memory_format
=
torch
.
contiguous_format
,
):
num_channels
=
NUM_CHANNELS_MAP
[
color_space
]
dtype
=
dtype
or
torch
.
uint8
max_value
=
get_max_value
(
dtype
)
data
=
torch
.
testing
.
make_tensor
(
(
*
batch_dims
,
num_channels
,
*
size
),
low
=
0
,
high
=
max_value
,
dtype
=
dtype
,
device
=
device
,
memory_format
=
memory_format
,
)
if
color_space
in
{
"GRAY_ALPHA"
,
"RGBA"
}:
data
[...,
-
1
,
:,
:]
=
max_value
return
tv_tensors
.
Image
(
data
)
def
make_image_tensor
(
*
args
,
**
kwargs
):
return
make_image
(
*
args
,
**
kwargs
).
as_subclass
(
torch
.
Tensor
)
def
make_image_pil
(
*
args
,
**
kwargs
):
return
to_pil_image
(
make_image
(
*
args
,
**
kwargs
))
def
make_bounding_boxes
(
canvas_size
=
DEFAULT_SIZE
,
*
,
format
=
tv_tensors
.
BoundingBoxFormat
.
XYXY
,
batch_dims
=
(),
dtype
=
None
,
device
=
"cpu"
,
):
def
sample_position
(
values
,
max_value
):
# We cannot use torch.randint directly here, because it only allows integer scalars as values for low and high.
# However, if we have batch_dims, we need tensors as limits.
return
torch
.
stack
([
torch
.
randint
(
max_value
-
v
,
())
for
v
in
values
.
flatten
().
tolist
()]).
reshape
(
values
.
shape
)
if
isinstance
(
format
,
str
):
format
=
tv_tensors
.
BoundingBoxFormat
[
format
]
dtype
=
dtype
or
torch
.
float32
if
any
(
dim
==
0
for
dim
in
batch_dims
):
return
tv_tensors
.
BoundingBoxes
(
torch
.
empty
(
*
batch_dims
,
4
,
dtype
=
dtype
,
device
=
device
),
format
=
format
,
canvas_size
=
canvas_size
)
h
,
w
=
[
torch
.
randint
(
1
,
c
,
batch_dims
)
for
c
in
canvas_size
]
y
=
sample_position
(
h
,
canvas_size
[
0
])
x
=
sample_position
(
w
,
canvas_size
[
1
])
if
format
is
tv_tensors
.
BoundingBoxFormat
.
XYWH
:
parts
=
(
x
,
y
,
w
,
h
)
elif
format
is
tv_tensors
.
BoundingBoxFormat
.
XYXY
:
x1
,
y1
=
x
,
y
x2
=
x1
+
w
y2
=
y1
+
h
parts
=
(
x1
,
y1
,
x2
,
y2
)
elif
format
is
tv_tensors
.
BoundingBoxFormat
.
CXCYWH
:
cx
=
x
+
w
/
2
cy
=
y
+
h
/
2
parts
=
(
cx
,
cy
,
w
,
h
)
else
:
raise
ValueError
(
f
"Format
{
format
}
is not supported"
)
return
tv_tensors
.
BoundingBoxes
(
torch
.
stack
(
parts
,
dim
=-
1
).
to
(
dtype
=
dtype
,
device
=
device
),
format
=
format
,
canvas_size
=
canvas_size
)
def
make_detection_mask
(
size
=
DEFAULT_SIZE
,
*
,
num_objects
=
5
,
batch_dims
=
(),
dtype
=
None
,
device
=
"cpu"
):
"""Make a "detection" mask, i.e. (*, N, H, W), where each object is encoded as one of N boolean masks"""
return
tv_tensors
.
Mask
(
torch
.
testing
.
make_tensor
(
(
*
batch_dims
,
num_objects
,
*
size
),
low
=
0
,
high
=
2
,
dtype
=
dtype
or
torch
.
bool
,
device
=
device
,
)
)
def
make_segmentation_mask
(
size
=
DEFAULT_SIZE
,
*
,
num_categories
=
10
,
batch_dims
=
(),
dtype
=
None
,
device
=
"cpu"
):
"""Make a "segmentation" mask, i.e. (*, H, W), where the category is encoded as pixel value"""
return
tv_tensors
.
Mask
(
torch
.
testing
.
make_tensor
(
(
*
batch_dims
,
*
size
),
low
=
0
,
high
=
num_categories
,
dtype
=
dtype
or
torch
.
uint8
,
device
=
device
,
)
)
def
make_video
(
size
=
DEFAULT_SIZE
,
*
,
num_frames
=
3
,
batch_dims
=
(),
**
kwargs
):
return
tv_tensors
.
Video
(
make_image
(
size
,
batch_dims
=
(
*
batch_dims
,
num_frames
),
**
kwargs
))
def
make_video_tensor
(
*
args
,
**
kwargs
):
return
make_video
(
*
args
,
**
kwargs
).
as_subclass
(
torch
.
Tensor
)
DEFAULT_SQUARE_SPATIAL_SIZE
=
15
DEFAULT_LANDSCAPE_SPATIAL_SIZE
=
(
7
,
33
)
DEFAULT_PORTRAIT_SPATIAL_SIZE
=
(
31
,
9
)
DEFAULT_SPATIAL_SIZES
=
(
DEFAULT_LANDSCAPE_SPATIAL_SIZE
,
DEFAULT_PORTRAIT_SPATIAL_SIZE
,
DEFAULT_SQUARE_SPATIAL_SIZE
,
)
def
_parse_size
(
size
,
*
,
name
=
"size"
):
if
size
==
"random"
:
raise
ValueError
(
"This should never happen"
)
elif
isinstance
(
size
,
int
)
and
size
>
0
:
return
(
size
,
size
)
elif
(
isinstance
(
size
,
collections
.
abc
.
Sequence
)
and
len
(
size
)
==
2
and
all
(
isinstance
(
length
,
int
)
and
length
>
0
for
length
in
size
)
):
return
tuple
(
size
)
else
:
raise
pytest
.
UsageError
(
f
"'
{
name
}
' can either be `'random'`, a positive integer, or a sequence of two positive integers,"
f
"but got
{
size
}
instead."
)
def
get_num_channels
(
color_space
):
num_channels
=
NUM_CHANNELS_MAP
.
get
(
color_space
)
if
not
num_channels
:
raise
pytest
.
UsageError
(
f
"Can't determine the number of channels for color space
{
color_space
}
"
)
return
num_channels
VALID_EXTRA_DIMS
=
((),
(
4
,),
(
2
,
3
))
DEGENERATE_BATCH_DIMS
=
((
0
,),
(
5
,
0
),
(
0
,
5
))
DEFAULT_EXTRA_DIMS
=
(
*
VALID_EXTRA_DIMS
,
*
DEGENERATE_BATCH_DIMS
)
def
from_loader
(
loader_fn
):
def
wrapper
(
*
args
,
**
kwargs
):
device
=
kwargs
.
pop
(
"device"
,
"cpu"
)
loader
=
loader_fn
(
*
args
,
**
kwargs
)
return
loader
.
load
(
device
)
return
wrapper
def
from_loaders
(
loaders_fn
):
def
wrapper
(
*
args
,
**
kwargs
):
device
=
kwargs
.
pop
(
"device"
,
"cpu"
)
loaders
=
loaders_fn
(
*
args
,
**
kwargs
)
for
loader
in
loaders
:
yield
loader
.
load
(
device
)
return
wrapper
@
dataclasses
.
dataclass
class
TensorLoader
:
fn
:
Callable
[[
Sequence
[
int
],
torch
.
dtype
,
Union
[
str
,
torch
.
device
]],
torch
.
Tensor
]
shape
:
Sequence
[
int
]
dtype
:
torch
.
dtype
def
load
(
self
,
device
):
return
self
.
fn
(
self
.
shape
,
self
.
dtype
,
device
)
@
dataclasses
.
dataclass
class
ImageLoader
(
TensorLoader
):
spatial_size
:
Tuple
[
int
,
int
]
=
dataclasses
.
field
(
init
=
False
)
num_channels
:
int
=
dataclasses
.
field
(
init
=
False
)
memory_format
:
torch
.
memory_format
=
torch
.
contiguous_format
canvas_size
:
Tuple
[
int
,
int
]
=
dataclasses
.
field
(
init
=
False
)
def
__post_init__
(
self
):
self
.
spatial_size
=
self
.
canvas_size
=
self
.
shape
[
-
2
:]
self
.
num_channels
=
self
.
shape
[
-
3
]
def
load
(
self
,
device
):
return
self
.
fn
(
self
.
shape
,
self
.
dtype
,
device
,
memory_format
=
self
.
memory_format
)
def
make_image_loader
(
size
=
DEFAULT_PORTRAIT_SPATIAL_SIZE
,
*
,
color_space
=
"RGB"
,
extra_dims
=
(),
dtype
=
torch
.
float32
,
constant_alpha
=
True
,
memory_format
=
torch
.
contiguous_format
,
):
if
not
constant_alpha
:
raise
ValueError
(
"This should never happen"
)
size
=
_parse_size
(
size
)
num_channels
=
get_num_channels
(
color_space
)
def
fn
(
shape
,
dtype
,
device
,
memory_format
):
*
batch_dims
,
_
,
height
,
width
=
shape
return
make_image
(
(
height
,
width
),
color_space
=
color_space
,
batch_dims
=
batch_dims
,
dtype
=
dtype
,
device
=
device
,
memory_format
=
memory_format
,
)
return
ImageLoader
(
fn
,
shape
=
(
*
extra_dims
,
num_channels
,
*
size
),
dtype
=
dtype
,
memory_format
=
memory_format
)
def
make_image_loaders
(
*
,
sizes
=
DEFAULT_SPATIAL_SIZES
,
color_spaces
=
(
"GRAY"
,
"GRAY_ALPHA"
,
"RGB"
,
"RGBA"
,
),
extra_dims
=
DEFAULT_EXTRA_DIMS
,
dtypes
=
(
torch
.
float32
,
torch
.
float64
,
torch
.
uint8
),
constant_alpha
=
True
,
):
for
params
in
combinations_grid
(
size
=
sizes
,
color_space
=
color_spaces
,
extra_dims
=
extra_dims
,
dtype
=
dtypes
):
yield
make_image_loader
(
**
params
,
constant_alpha
=
constant_alpha
)
make_images
=
from_loaders
(
make_image_loaders
)
def
make_image_loader_for_interpolation
(
size
=
(
233
,
147
),
*
,
color_space
=
"RGB"
,
dtype
=
torch
.
uint8
,
memory_format
=
torch
.
contiguous_format
):
size
=
_parse_size
(
size
)
num_channels
=
get_num_channels
(
color_space
)
def
fn
(
shape
,
dtype
,
device
,
memory_format
):
height
,
width
=
shape
[
-
2
:]
image_pil
=
(
PIL
.
Image
.
open
(
pathlib
.
Path
(
__file__
).
parent
/
"assets"
/
"encode_jpeg"
/
"grace_hopper_517x606.jpg"
)
.
resize
((
width
,
height
))
.
convert
(
{
"GRAY"
:
"L"
,
"GRAY_ALPHA"
:
"LA"
,
"RGB"
:
"RGB"
,
"RGBA"
:
"RGBA"
,
}[
color_space
]
)
)
image_tensor
=
to_image
(
image_pil
)
if
memory_format
==
torch
.
contiguous_format
:
image_tensor
=
image_tensor
.
to
(
device
=
device
,
memory_format
=
memory_format
,
copy
=
True
)
else
:
image_tensor
=
image_tensor
.
to
(
device
=
device
)
image_tensor
=
to_dtype_image
(
image_tensor
,
dtype
=
dtype
,
scale
=
True
)
return
tv_tensors
.
Image
(
image_tensor
)
return
ImageLoader
(
fn
,
shape
=
(
num_channels
,
*
size
),
dtype
=
dtype
,
memory_format
=
memory_format
)
def
make_image_loaders_for_interpolation
(
sizes
=
((
233
,
147
),),
color_spaces
=
(
"RGB"
,),
dtypes
=
(
torch
.
uint8
,),
memory_formats
=
(
torch
.
contiguous_format
,
torch
.
channels_last
),
):
for
params
in
combinations_grid
(
size
=
sizes
,
color_space
=
color_spaces
,
dtype
=
dtypes
,
memory_format
=
memory_formats
):
yield
make_image_loader_for_interpolation
(
**
params
)
@
dataclasses
.
dataclass
class
BoundingBoxesLoader
(
TensorLoader
):
format
:
tv_tensors
.
BoundingBoxFormat
spatial_size
:
Tuple
[
int
,
int
]
canvas_size
:
Tuple
[
int
,
int
]
=
dataclasses
.
field
(
init
=
False
)
def
__post_init__
(
self
):
self
.
canvas_size
=
self
.
spatial_size
def
make_bounding_box_loader
(
*
,
extra_dims
=
(),
format
,
spatial_size
=
DEFAULT_PORTRAIT_SPATIAL_SIZE
,
dtype
=
torch
.
float32
):
if
isinstance
(
format
,
str
):
format
=
tv_tensors
.
BoundingBoxFormat
[
format
]
spatial_size
=
_parse_size
(
spatial_size
,
name
=
"spatial_size"
)
def
fn
(
shape
,
dtype
,
device
):
*
batch_dims
,
num_coordinates
=
shape
if
num_coordinates
!=
4
:
raise
pytest
.
UsageError
()
return
make_bounding_boxes
(
format
=
format
,
canvas_size
=
spatial_size
,
batch_dims
=
batch_dims
,
dtype
=
dtype
,
device
=
device
)
return
BoundingBoxesLoader
(
fn
,
shape
=
(
*
extra_dims
[
-
1
:],
4
),
dtype
=
dtype
,
format
=
format
,
spatial_size
=
spatial_size
)
def
make_bounding_box_loaders
(
*
,
extra_dims
=
tuple
(
d
for
d
in
DEFAULT_EXTRA_DIMS
if
len
(
d
)
<
2
),
formats
=
tuple
(
tv_tensors
.
BoundingBoxFormat
),
spatial_size
=
DEFAULT_PORTRAIT_SPATIAL_SIZE
,
dtypes
=
(
torch
.
float32
,
torch
.
float64
,
torch
.
int64
),
):
for
params
in
combinations_grid
(
extra_dims
=
extra_dims
,
format
=
formats
,
dtype
=
dtypes
):
yield
make_bounding_box_loader
(
**
params
,
spatial_size
=
spatial_size
)
make_multiple_bounding_boxes
=
from_loaders
(
make_bounding_box_loaders
)
class
MaskLoader
(
TensorLoader
):
pass
def
make_detection_mask_loader
(
size
=
DEFAULT_PORTRAIT_SPATIAL_SIZE
,
*
,
num_objects
=
5
,
extra_dims
=
(),
dtype
=
torch
.
uint8
):
# This produces "detection" masks, i.e. `(*, N, H, W)`, where `N` denotes the number of objects
size
=
_parse_size
(
size
)
def
fn
(
shape
,
dtype
,
device
):
*
batch_dims
,
num_objects
,
height
,
width
=
shape
return
make_detection_mask
(
(
height
,
width
),
num_objects
=
num_objects
,
batch_dims
=
batch_dims
,
dtype
=
dtype
,
device
=
device
)
return
MaskLoader
(
fn
,
shape
=
(
*
extra_dims
,
num_objects
,
*
size
),
dtype
=
dtype
)
def
make_detection_mask_loaders
(
sizes
=
DEFAULT_SPATIAL_SIZES
,
num_objects
=
(
1
,
0
,
5
),
extra_dims
=
DEFAULT_EXTRA_DIMS
,
dtypes
=
(
torch
.
uint8
,),
):
for
params
in
combinations_grid
(
size
=
sizes
,
num_objects
=
num_objects
,
extra_dims
=
extra_dims
,
dtype
=
dtypes
):
yield
make_detection_mask_loader
(
**
params
)
make_detection_masks
=
from_loaders
(
make_detection_mask_loaders
)
def
make_segmentation_mask_loader
(
size
=
DEFAULT_PORTRAIT_SPATIAL_SIZE
,
*
,
num_categories
=
10
,
extra_dims
=
(),
dtype
=
torch
.
uint8
):
# This produces "segmentation" masks, i.e. `(*, H, W)`, where the category is encoded in the values
size
=
_parse_size
(
size
)
def
fn
(
shape
,
dtype
,
device
):
*
batch_dims
,
height
,
width
=
shape
return
make_segmentation_mask
(
(
height
,
width
),
num_categories
=
num_categories
,
batch_dims
=
batch_dims
,
dtype
=
dtype
,
device
=
device
)
return
MaskLoader
(
fn
,
shape
=
(
*
extra_dims
,
*
size
),
dtype
=
dtype
)
def
make_segmentation_mask_loaders
(
*
,
sizes
=
DEFAULT_SPATIAL_SIZES
,
num_categories
=
(
1
,
2
,
10
),
extra_dims
=
DEFAULT_EXTRA_DIMS
,
dtypes
=
(
torch
.
uint8
,),
):
for
params
in
combinations_grid
(
size
=
sizes
,
num_categories
=
num_categories
,
extra_dims
=
extra_dims
,
dtype
=
dtypes
):
yield
make_segmentation_mask_loader
(
**
params
)
make_segmentation_masks
=
from_loaders
(
make_segmentation_mask_loaders
)
def
make_mask_loaders
(
*
,
sizes
=
DEFAULT_SPATIAL_SIZES
,
num_objects
=
(
1
,
0
,
5
),
num_categories
=
(
1
,
2
,
10
),
extra_dims
=
DEFAULT_EXTRA_DIMS
,
dtypes
=
(
torch
.
uint8
,),
):
yield
from
make_detection_mask_loaders
(
sizes
=
sizes
,
num_objects
=
num_objects
,
extra_dims
=
extra_dims
,
dtypes
=
dtypes
)
yield
from
make_segmentation_mask_loaders
(
sizes
=
sizes
,
num_categories
=
num_categories
,
extra_dims
=
extra_dims
,
dtypes
=
dtypes
)
make_masks
=
from_loaders
(
make_mask_loaders
)
class
VideoLoader
(
ImageLoader
):
pass
def
make_video_loader
(
size
=
DEFAULT_PORTRAIT_SPATIAL_SIZE
,
*
,
color_space
=
"RGB"
,
num_frames
=
3
,
extra_dims
=
(),
dtype
=
torch
.
uint8
,
):
size
=
_parse_size
(
size
)
def
fn
(
shape
,
dtype
,
device
,
memory_format
):
*
batch_dims
,
num_frames
,
_
,
height
,
width
=
shape
return
make_video
(
(
height
,
width
),
num_frames
=
num_frames
,
batch_dims
=
batch_dims
,
color_space
=
color_space
,
dtype
=
dtype
,
device
=
device
,
memory_format
=
memory_format
,
)
return
VideoLoader
(
fn
,
shape
=
(
*
extra_dims
,
num_frames
,
get_num_channels
(
color_space
),
*
size
),
dtype
=
dtype
)
def
make_video_loaders
(
*
,
sizes
=
DEFAULT_SPATIAL_SIZES
,
color_spaces
=
(
"GRAY"
,
"RGB"
,
),
num_frames
=
(
1
,
0
,
3
),
extra_dims
=
DEFAULT_EXTRA_DIMS
,
dtypes
=
(
torch
.
uint8
,
torch
.
float32
,
torch
.
float64
),
):
for
params
in
combinations_grid
(
size
=
sizes
,
color_space
=
color_spaces
,
num_frames
=
num_frames
,
extra_dims
=
extra_dims
,
dtype
=
dtypes
):
yield
make_video_loader
(
**
params
)
make_videos
=
from_loaders
(
make_video_loaders
)
class
TestMark
:
def
__init__
(
self
,
# Tuple of test class name and test function name that identifies the test the mark is applied to. If there is
# no test class, i.e. a standalone test function, use `None`.
test_id
,
# `pytest.mark.*` to apply, e.g. `pytest.mark.skip` or `pytest.mark.xfail`
mark
,
*
,
# Callable, that will be passed an `ArgsKwargs` and should return a boolean to indicate if the mark will be
# applied. If omitted, defaults to always apply.
condition
=
None
,
):
self
.
test_id
=
test_id
self
.
mark
=
mark
self
.
condition
=
condition
or
(
lambda
args_kwargs
:
True
)
def
mark_framework_limitation
(
test_id
,
reason
,
condition
=
None
):
# The purpose of this function is to have a single entry point for skip marks that are only there, because the test
# framework cannot handle the kernel in general or a specific parameter combination.
# As development progresses, we can change the `mark.skip` to `mark.xfail` from time to time to see if the skip is
# still justified.
# We don't want to use `mark.xfail` all the time, because that actually runs the test until an error happens. Thus,
# we are wasting CI resources for no reason for most of the time
return
TestMark
(
test_id
,
pytest
.
mark
.
skip
(
reason
=
reason
),
condition
=
condition
)
class
InfoBase
:
def
__init__
(
self
,
*
,
# Identifier if the info that shows up the parametrization.
id
,
# Test markers that will be (conditionally) applied to an `ArgsKwargs` parametrization.
# See the `TestMark` class for details
test_marks
=
None
,
# Additional parameters, e.g. `rtol=1e-3`, passed to `assert_close`. Keys are a 3-tuple of `test_id` (see
# `TestMark`), the dtype, and the device.
closeness_kwargs
=
None
,
):
self
.
id
=
id
self
.
test_marks
=
test_marks
or
[]
test_marks_map
=
defaultdict
(
list
)
for
test_mark
in
self
.
test_marks
:
test_marks_map
[
test_mark
.
test_id
].
append
(
test_mark
)
self
.
_test_marks_map
=
dict
(
test_marks_map
)
self
.
closeness_kwargs
=
closeness_kwargs
or
dict
()
def
get_marks
(
self
,
test_id
,
args_kwargs
):
return
[
test_mark
.
mark
for
test_mark
in
self
.
_test_marks_map
.
get
(
test_id
,
[])
if
test_mark
.
condition
(
args_kwargs
)
]
def
get_closeness_kwargs
(
self
,
test_id
,
*
,
dtype
,
device
):
if
not
(
isinstance
(
test_id
,
tuple
)
and
len
(
test_id
)
==
2
):
msg
=
"`test_id` should be a `Tuple[Optional[str], str]` denoting the test class and function name"
if
callable
(
test_id
):
msg
+=
". Did you forget to add the `test_id` fixture to parameters of the test?"
else
:
msg
+=
f
", but got
{
test_id
}
instead."
raise
pytest
.
UsageError
(
msg
)
if
isinstance
(
device
,
torch
.
device
):
device
=
device
.
type
return
self
.
closeness_kwargs
.
get
((
test_id
,
dtype
,
device
),
dict
())
class
ArgsKwargs
:
def
__init__
(
self
,
*
args
,
**
kwargs
):
self
.
args
=
args
self
.
kwargs
=
kwargs
def
__iter__
(
self
):
yield
self
.
args
yield
self
.
kwargs
def
load
(
self
,
device
=
"cpu"
):
return
ArgsKwargs
(
*
(
arg
.
load
(
device
)
if
isinstance
(
arg
,
TensorLoader
)
else
arg
for
arg
in
self
.
args
),
**
{
keyword
:
arg
.
load
(
device
)
if
isinstance
(
arg
,
TensorLoader
)
else
arg
for
keyword
,
arg
in
self
.
kwargs
.
items
()
},
)
def
parametrized_error_message
(
*
args
,
**
kwargs
):
def
to_str
(
obj
):
if
isinstance
(
obj
,
torch
.
Tensor
)
and
obj
.
numel
()
>
30
:
return
f
"tensor(shape=
{
list
(
obj
.
shape
)
}
, dtype=
{
obj
.
dtype
}
, device=
{
obj
.
device
}
)"
elif
isinstance
(
obj
,
enum
.
Enum
):
return
f
"
{
type
(
obj
).
__name__
}
.
{
obj
.
name
}
"
else
:
return
repr
(
obj
)
if
args
or
kwargs
:
postfix
=
"
\n
"
.
join
(
[
""
,
"Failure happened for the following parameters:"
,
""
,
*
[
to_str
(
arg
)
for
arg
in
args
],
*
[
f
"
{
name
}
=
{
to_str
(
kwarg
)
}
"
for
name
,
kwarg
in
kwargs
.
items
()],
]
)
else
:
postfix
=
""
def
wrapper
(
msg
):
return
msg
+
postfix
return
wrapper
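
For context on how the removed helpers were typically consumed: each `make_*_loader` function returns a `TensorLoader` (or subclass) that records the shape and dtype, and `from_loaders` turns a generator of such loaders into a generator of concrete tensors on a chosen device. A minimal, illustrative sketch (not part of this commit, and assuming this file is still importable as `transforms_v2_legacy_utils`):

    # Illustrative only; depends on torch and the legacy utils module shown above.
    import torch
    from transforms_v2_legacy_utils import from_loaders, make_image_loader, make_image_loaders

    # A single loader describes the sample; .load(...) materializes it on a device.
    loader = make_image_loader(size=(32, 48), color_space="RGB")
    image = loader.load("cpu")  # tv_tensors.Image of shape (3, 32, 48)

    # The `from_loaders` wrapper iterates the full parameter grid lazily.
    make_images = from_loaders(make_image_loaders)
    for image in make_images(sizes=((32, 48),), color_spaces=("RGB",), dtypes=(torch.uint8,)):
        print(image.shape, image.dtype)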