# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.

# pylint: disable=missing-module-docstring
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring

""" Test utility classes from containers.py. """

import random
from collections import OrderedDict, namedtuple

import pytest
import torch
import torch.nn as nn

from fairscale.internal.containers import (
    apply_to_tensors,
    pack_kwargs,
    split_non_tensors,
    unpack_kwargs,
    unpack_non_tensors,
)


@pytest.mark.parametrize("devices", [["cpu"], ["cuda"], ["cpu", "cuda"]])
def test_apply_to_tensors(devices):
    """Test apply_to_tensors for cpu, gpu, and mixed device lists.

    Builds a nested container (list/dict/set/tuple/OrderedDict/namedtuple)
    holding random tensors, then checks that apply_to_tensors visits every
    tensor exactly once and preserves all container types.
    """
    # Bug fix: the original condition was `A and B or C`, which (by Python
    # precedence) skipped even the pure-CPU case on machines with no GPU.
    # Parenthesize so the skip applies only when "cuda" is actually requested.
    if "cuda" in devices and (not torch.cuda.is_available() or torch.cuda.device_count() < 1):
        pytest.skip("Skipped due to lack of GPU")
    expected = 0

    def get_a_tensor():
        """Return a random tensor on a randomly chosen requested device."""
        dev = random.choice(devices)
        shape = random.choice((1, (2, 3), (4, 5, 6), (7, 8, 9, 10)))
        t = torch.rand(shape).to(dev)
        nonlocal expected
        expected += t.numel()  # running total of elements created
        return t

    # Create a mixed bag of data with tensors at various nesting depths.
    data = [1, "str"]  # list
    # dict (with a nested dict value)
    data.append({"key1": get_a_tensor(), "key2": {1: get_a_tensor()}, "key3": 3})
    # set
    data.insert(0, set(["x", get_a_tensor(), get_a_tensor()]))
    # tuple (containing a nested list and set)
    data.append(([1], get_a_tensor(), 1, [get_a_tensor()], set((1, 2))))
    # OrderedDict
    od = OrderedDict()
    od["k"] = "value"
    data.append(od)
    # namedtuple
    NT = namedtuple("NT", ["key1", "key2"])
    nt = NT(key1=1, key2=get_a_tensor())
    data.append(nt)

    total = 0

    # Bug fix: removed the unused mutable default argument `x=[[total]]`
    # from the original; mutable defaults are shared across calls and this
    # one served no purpose.
    def fn(t):
        nonlocal total
        total += t.numel()
        return t

    new_data = apply_to_tensors(fn, data)
    # Every tensor element must have been visited exactly once.
    assert total == expected, f"{total} vs. {expected}"
    # Container types must be preserved by the traversal.
    for i, v in enumerate(data):
        assert type(new_data[i]) == type(v), f"expected type {type(v)} got {type(new_data[i])}"


def test_pack_unpack():
    """Verify that pack_kwargs flattens args/kwargs and unpack_kwargs inverts it."""
    # Positional-only call: no keyword keys are recorded.
    keys, flat = pack_kwargs(1, 2, 3, 4)
    assert flat == (1, 2, 3, 4)
    assert keys == ()

    # Keyword-only call: every key is recorded; values keep their container types.
    keys, flat = pack_kwargs(a=1, b={2: "2"}, c={3}, d=[4], e=(5,))
    assert flat == (1, {2: "2"}, {3}, [4], (5,))
    assert keys == ("a", "b", "c", "d", "e")

    # Mixed positional and keyword arguments.
    keys, flat = pack_kwargs(1, 2, a=3, b=4)
    assert flat == (1, 2, 3, 4)
    assert keys == ("a", "b")

    # Round-trip the mixed case back through unpack_kwargs.
    args, kwargs = unpack_kwargs(keys, flat)
    assert kwargs == {"a": 3, "b": 4}
    assert args == (1, 2)

    # No keys: everything comes back positional.
    args, kwargs = unpack_kwargs([], flat)
    assert args == (1, 2, 3, 4)
    assert kwargs == {}

    # One key per value: everything comes back as keyword arguments.
    args, kwargs = unpack_kwargs(["a", "b", "c", "d"], flat)
    assert args == ()
    assert kwargs == {"a": 1, "b": 2, "c": 3, "d": 4}

    # More keys than flat values is a caller error and must assert.
    with pytest.raises(AssertionError):
        args, kwargs = unpack_kwargs(["a", "b", "c", "d", "e"], flat)


def test_split_unpack():
    """Verify split_non_tensors/unpack_non_tensors round-trip mixed sequences."""
    a = torch.Tensor([1])
    b = torch.Tensor([2])

    # Degenerate case: a bare tensor (not a tuple) passes through unsplit.
    tensors, packed = split_non_tensors(a)
    assert packed is None
    assert tensors == (a,)

    # Mixed tuple: tensors are extracted in order; the rest is packed
    # alongside a per-position is_tensor bitmap.
    tensors, packed = split_non_tensors((a, b, None, 3))
    assert tensors == (a, b)
    assert packed == {
        "is_tensor": [True, True, False, False],
        "objects": [None, 3],
    }
    assert unpack_non_tensors(tensors, packed) == (a, b, None, 3)

    # Non-tensors first: original ordering is still restored.
    tensors, packed = split_non_tensors((None, 3, a, b))
    assert unpack_non_tensors(tensors, packed) == (None, 3, a, b)

    # No tensors at all.
    tensors, packed = split_non_tensors((None, 3))
    assert unpack_non_tensors(tensors, packed) == (None, 3)

    # Only tensors.
    tensors, packed = split_non_tensors((a, b))
    assert unpack_non_tensors(tensors, packed) == (a, b)

    # A None packed-structure means every element was a tensor.
    assert unpack_non_tensors(tensors, None) == (a, b)

    # The packed structure must be a dict ...
    with pytest.raises(AssertionError):
        unpack_non_tensors(tensors, set())

    # ... and its bookkeeping must be consistent with the tensors given.
    with pytest.raises(AssertionError):
        unpack_non_tensors(tensors, {"is_tensor": [], "objects": []})


def test_packed_sequence():
    """Ensure apply_to_tensors reaches the data inside an RNN PackedSequence."""
    rnn = nn.RNN(5, 5)

    inp = torch.rand((5, 1, 5), dtype=torch.float)
    lengths = torch.tensor([4], dtype=torch.int)

    def zero_fill(t):
        # In-place zeroing; only the side effect matters here.
        t.fill_(0)

    packed = nn.utils.rnn.pack_padded_sequence(inp, lengths)
    packed, _hidden = rnn(packed)
    packed = apply_to_tensors(zero_fill, packed)
    padded, _ = nn.utils.rnn.pad_packed_sequence(packed)
    assert torch.sum(padded) == 0