# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
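
# Note: all tests here are marked @slow, so with the standard transformers test setup
# they are expected to be run with something like:
#
#   RUN_SLOW=1 pytest tests/test_deepspeed.py
#
# and need deepspeed itself installed (e.g. `pip install deepspeed`).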

import json
import os
import sys
import unittest

from transformers.integrations import is_deepspeed_available
from transformers.testing_utils import (
    CaptureStd,
    TestCasePlus,
    execute_subprocess_async,
    get_gpu_count,
    mockenv,
    require_torch_gpu,
    require_torch_multi_gpu,
    slow,
)
from transformers.trainer_utils import set_seed


set_seed(42)
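# a tiny random mbart checkpoint, so downloads and forward passes stay fast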
MBART_TINY = "sshleifer/tiny-mbart"


def load_json(path):
    with open(path) as f:
        return json.load(f)


# a candidate for testing_utils
def require_deepspeed(test_case):
    """
    Decorator marking a test that requires deepspeed
    """
    if not is_deepspeed_available():
        return unittest.skip("test requires deepspeed")(test_case)
    else:
        return test_case


@slow
@require_deepspeed
@require_torch_gpu
class TestDeepSpeed(TestCasePlus):

    # this setup emulates a notebook, where no launcher is used, so the distributed
    # environment has to be set up by hand via env vars
    @mockenv(MASTER_ADDR="localhost", MASTER_PORT="10999", RANK="0", LOCAL_RANK="0", WORLD_SIZE="1")
    def test_fake_notebook_no_launcher(self):
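        # make the tests dir importable so we can borrow get_regression_trainer from test_trainer.py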
        sys.path.append(self.tests_dir_str)
        from test_trainer import get_regression_trainer

        del sys.path[-1]  # restore
        ds_config_file = f"{self.test_file_dir_str}/ds_config.json"
        with CaptureStd() as cs:
            trainer = get_regression_trainer(local_rank=0, deepspeed=ds_config_file)
            trainer.train()
        assert "DeepSpeed info" in cs.out, "expected DeepSpeed logger output but got none"

    @require_torch_multi_gpu
    def test_basic_distributed(self):
        self.run_quick(distributed=True)

    @require_torch_multi_gpu
    def test_grad_accum(self):
        self.run_quick(distributed=True, extra_args_str="--gradient_accumulation_steps 2")

    def test_do_eval_no_train(self):
        # we should not fail if train is skipped
        output_dir = self.run_trainer(
            eval_steps=1,
            max_len=12,
            model_name=MBART_TINY,
            num_train_epochs=1,
            distributed=False,
            extra_args_str="--do_eval",
            remove_args_str="--do_train",
        )
        val_metrics = load_json(os.path.join(output_dir, "val_results.json"))
        assert "val_bleu" in val_metrics

    # XXX: need to do better validation beyond just that the run was successful
    def run_quick(self, distributed=True, extra_args_str=None, remove_args_str=None):
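        # short end-to-end run: currently only checks that training completed and wrote metrics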
        output_dir = self.run_trainer(
            eval_steps=1,
            max_len=12,
            model_name=MBART_TINY,
            num_train_epochs=1,
            distributed=distributed,
            extra_args_str=extra_args_str,
            remove_args_str=remove_args_str,
        )
        train_metrics = load_json(os.path.join(output_dir, "train_results.json"))
        assert "train_runtime" in train_metrics

    def run_trainer(
        self,
        eval_steps: int,
        max_len: int,
        model_name: str,
        num_train_epochs: int,
        distributed: bool = True,
        extra_args_str: str = None,
        remove_args_str: str = None,
    ):
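        # assembles the run_seq2seq.py arguments, launches the script under the
        # `deepspeed` launcher in a subprocess and returns the output dir with the metrics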
        data_dir = self.examples_dir / "test_data/wmt_en_ro"
        output_dir = self.get_auto_remove_tmp_dir()
        args = f"""
            --model_name_or_path {model_name}
            --train_file {data_dir}/train.json
            --validation_file {data_dir}/val.json
            --output_dir {output_dir}
            --overwrite_output_dir
            --max_train_samples 8
            --max_val_samples 8
            --max_source_length {max_len}
            --max_target_length {max_len}
            --val_max_target_length {max_len}
            --do_train
            --num_train_epochs {str(num_train_epochs)}
            --per_device_train_batch_size 4
            --learning_rate 3e-3
            --warmup_steps 8
            --predict_with_generate
            --logging_steps 0
            --save_steps {str(eval_steps)}
            --group_by_length
            --label_smoothing_factor 0.1
            --adafactor
            --task translation
            --target_lang ro_RO
            --source_lang en_XX
        """.split()

        if extra_args_str is not None:
            args.extend(extra_args_str.split())

        if remove_args_str is not None:
            remove_args = remove_args_str.split()
            args = [x for x in args if x not in remove_args]

        ds_args = f"--deepspeed {self.test_file_dir_str}/ds_config.json".split()
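        # ds_config.json lives next to this test file and is shared by all the tests here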
        script = [f"{self.examples_dir_str}/seq2seq/run_seq2seq.py"]
        num_gpus = get_gpu_count() if distributed else 1
        launcher = f"deepspeed --num_gpus {num_gpus}".split()
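        # e.g. the final command: `deepspeed --num_gpus 2 run_seq2seq.py <script args> --deepspeed ds_config.json`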

        cmd = launcher + script + args + ds_args
        # keep for quick debug
        # print(" ".join([f"PYTHONPATH={self.src_dir_str}"] + cmd)); die
        execute_subprocess_async(cmd, env=self.get_env())

        return output_dir