test_compressor_tf.py 4.46 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
3
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

4
5
from pathlib import Path
import tempfile
liuzhe-lz's avatar
liuzhe-lz committed
6
7
8
9
10
11
12
13
import unittest

import numpy as np
import tensorflow as tf


####
#
14
15
16
17
# This file tests pruners on 3 models:
#   A classic CNN model built by inheriting `Model`;
#   The same CNN model built with `Sequential`;
#   A naive model with only one linear layer.
liuzhe-lz's avatar
liuzhe-lz committed
18
#
19
# The CNN models are used to test layer detecting and instrumenting.
liuzhe-lz's avatar
liuzhe-lz committed
20
21
22
23
24
25
26
27
28
29
30
31
#
# The naive model is used to test mask calculation.
# It has a single 10x10 linear layer without bias, and `reduce_sum` its result.
# To help predicting pruning result, the linear layer has fixed initial weights:
#     [ [ 0.0, 1.0, 2.0, ..., 9.0 ], [0.1, 1.1, 2.1, ..., 9.1 ], ... , [0.9, 1.0, 2.9, ..., 9.9 ] ]
#
####


# This tensor is used as input of 10x10 linear layer, the first dimension is batch size
tensor1x10 = tf.constant([[1.0] * 10])

32
33
34
# This tensor is used as input of CNN models
image_tensor = tf.zeros([1, 10, 10, 3])

liuzhe-lz's avatar
liuzhe-lz committed
35
36
37
38
39
40
41

@unittest.skipIf(tf.__version__[0] != '2', 'Skip TF 1.x setup')
class TfCompressorTestCase(unittest.TestCase):
    def test_layer_detection(self):
        # Conv and dense layers should be compressed, pool and flatten should not.
        # This also tests instrumenting functionality.
        self._test_layer_detection_on_model(CnnModel())
42
        self._test_layer_detection_on_model(build_sequential_model())
liuzhe-lz's avatar
liuzhe-lz committed
43
44
45
46

    def _test_layer_detection_on_model(self, model):
        pruner = pruners['level'](model)
        pruner.compress()
47
        layer_types = sorted(type(wrapper.layer).__name__ for wrapper in pruner.wrappers)
liuzhe-lz's avatar
liuzhe-lz committed
48
49
        assert layer_types == ['Conv2D', 'Dense', 'Dense'], layer_types

50
    def test_level_pruner_and_export_correctness(self):
liuzhe-lz's avatar
liuzhe-lz committed
51
52
        # prune 90% : 9.0 + 9.1 + ... + 9.9 = 94.5
        model = build_naive_model()
53
54
55
        pruner = pruners['level'](model)
        model = pruner.compress()

liuzhe-lz's avatar
liuzhe-lz committed
56
57
58
        x = model(tensor1x10)
        assert x.numpy() == 94.5

59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
        temp_dir = Path(tempfile.gettempdir())
        pruner.export_model(temp_dir / 'model', temp_dir / 'mask')

        # because exporting will uninstrument and re-instrument the model,
        # we must test the model again
        x = model(tensor1x10)
        assert x.numpy() == 94.5

        # load and test exported model
        exported_model = tf.keras.models.load_model(temp_dir / 'model')
        x = exported_model(tensor1x10)
        assert x.numpy() == 94.5

    def test_export_not_crash(self):
        for model in [CnnModel(), build_sequential_model()]:
            pruner = pruners['level'](model)
            model = pruner.compress()
            # cannot use model.build(image_tensor.shape) here
            # it fails even without compression
            # seems TF's bug, not ours
            model(image_tensor)
            pruner.export_model(tempfile.TemporaryDirectory().name)
liuzhe-lz's avatar
liuzhe-lz committed
81
82
83
84
85

try:
    from tensorflow.keras import Model, Sequential
    from tensorflow.keras.layers import (Conv2D, Dense, Flatten, MaxPool2D)

86
    from nni.algorithms.compression.tensorflow.pruning import LevelPruner
liuzhe-lz's avatar
liuzhe-lz committed
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

    pruners = {
        'level': (lambda model: LevelPruner(model, [{'sparsity': 0.9, 'op_types': ['default']}])),
    }

    class CnnModel(Model):
        def __init__(self):
            super().__init__()
            self.conv = Conv2D(filters=10, kernel_size=3, activation='relu')
            self.pool = MaxPool2D(pool_size=2)
            self.flatten = Flatten()
            self.fc1 = Dense(units=10, activation='relu')
            self.fc2 = Dense(units=5, activation='softmax')

        def call(self, x):
            x = self.conv(x)
            x = self.pool(x)
            x = self.flatten(x)
            x = self.fc1(x)
            x = self.fc2(x)
            return x

109
110
111
112
113
114
115
116
117
    def build_sequential_model():
        return Sequential([
            Conv2D(filters=10, kernel_size=3, activation='relu'),
            MaxPool2D(pool_size=2),
            Flatten(),
            Dense(units=10, activation='relu'),
            Dense(units=5, activation='softmax'),
        ])

liuzhe-lz's avatar
liuzhe-lz committed
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
    class NaiveModel(Model):
        def __init__(self):
            super().__init__()
            self.fc = Dense(units=10, use_bias=False)

        def call(self, x):
            return tf.math.reduce_sum(self.fc(x))

except Exception:
    pass


def build_naive_model():
    model = NaiveModel()
    model.build(tensor1x10.shape)
    weight = [[(i + j * 0.1) for i in range(10)] for j in range(10)]
    model.set_weights([np.array(weight)])
    return model


if __name__ == '__main__':
    unittest.main()