import torch
import torch.distributed as dist
import deepspeed
from deepspeed.runtime.zero.stage2 import FP16_DeepSpeedZeroOptimizer
from deepspeed.runtime.zero.stage1 import FP16_DeepSpeedZeroOptimizer_Stage1
from deepspeed.runtime.fp16.fused_optimizer import FP16_Optimizer
from deepspeed.runtime.fp16.unfused_optimizer import FP16_UnfusedOptimizer

from deepspeed.runtime.pipe.topology import *
PipeTopo = PipeDataParallelTopology

from deepspeed.ops.op_builder import FusedLambBuilder, CPUAdamBuilder

import argparse
import pytest
import json
import os
import numbers
from common import distributed_test
from simple_model import *


def compare_deepspeed_states(saved_model, loaded_model):
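    """Sanity-check engine bookkeeping that should survive checkpointing:
    sparse (CSR) tensor module names and the skipped/global step counters."""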
    # These are compared in more depth in other places
    assert hasattr(loaded_model, 'module')

    assert saved_model.csr_tensor_module_names == loaded_model.csr_tensor_module_names
    assert saved_model.skipped_steps == loaded_model.skipped_steps
    assert saved_model.global_steps == loaded_model.global_steps


def compare_model_states(saved_model, loaded_model, compare_optimizer=True):
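    """Verify the saved and loaded modules hold distinct but numerically equal
    fp16 parameters, and optionally that the optimizer's fp32 master weights
    match for whichever DeepSpeed optimizer wrapper is in use."""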
    compare_deepspeed_states(saved_model, loaded_model)

    for p0, p1 in zip(saved_model.module.parameters(),
                      loaded_model.module.parameters()):
        assert id(p0) != id(p1), f'Comparing fp16 model state tensor against itself: {id(p0)} <====> {id(p1)}'
        assert torch.allclose(p0, p1, atol=1e-07), f"FP16 model state {p0} is not equal to {p1}"

    if not compare_optimizer:
        return

    if isinstance(saved_model.optimizer, FP16_DeepSpeedZeroOptimizer):
        for p0, p1 in zip(saved_model.optimizer.single_partition_of_fp32_groups,
                          loaded_model.optimizer.single_partition_of_fp32_groups):
            assert id(p0) != id(p1), f'Comparing fp32 model state tensor against itself: {id(p0)} <====> {id(p1)}'
            assert torch.allclose(p0, p1, atol=1e-07), f"FP32 model state {p0} is not equal to {p1}"

    elif isinstance(saved_model.optimizer, FP16_DeepSpeedZeroOptimizer_Stage1):
        for partition0, partition1 in zip(saved_model.optimizer.local_sub_partitions_of_fp32_groups,
                                          loaded_model.optimizer.local_sub_partitions_of_fp32_groups):
            for p0, p1 in zip(partition0, partition1):
                assert id(p0) != id(p1), f'Comparing fp32 model state tensor against itself: {id(p0)} <====> {id(p1)}'
                assert torch.allclose(p0, p1, atol=1e-07), f"FP32 model state {p0} is not equal to {p1}"

    elif isinstance(saved_model.optimizer, FP16_Optimizer):
        for p0, p1 in zip(saved_model.optimizer.fp32_groups_flat,
                          loaded_model.optimizer.fp32_groups_flat):
            assert id(p0) != id(p1), f'Comparing fp32 model state tensor against itself: {id(p0)} <====> {id(p1)}'
            assert torch.allclose(p0, p1, atol=1e-07), f"FP32 model state {p0} is not equal to {p1}"

    elif isinstance(saved_model.optimizer, FP16_UnfusedOptimizer):
        for params0, params1 in zip(saved_model.optimizer.fp32_groups,
                                    loaded_model.optimizer.fp32_groups):
            for p0, p1 in zip(params0, params1):
                assert id(p0) != id(p1), f'Comparing fp32 model state tensor against itself: {id(p0)} <====> {id(p1)}'
                assert torch.allclose(p0, p1, atol=1e-07), f"FP32 model state {p0} is not equal to {p1}"

    elif isinstance(saved_model.optimizer, torch.optim.Optimizer):
        pass

    else:
        assert False, f'Unexpected Optimizer Type: {saved_model.optimizer}'


def compare_optimizer_states(saved_model, loaded_model, hidden_dim, fp16=True):
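    """Compare the base optimizer's per-parameter state between engines.

    With fp16=True the base optimizer is nested inside DeepSpeed's fp16
    wrapper, so it is reached via optimizer.optimizer; state tensors must be
    distinct objects with equal values."""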
    saved_optimizer = saved_model.optimizer.optimizer if fp16 else saved_model.optimizer
    loaded_optimizer = loaded_model.optimizer.optimizer if fp16 else loaded_model.optimizer

    for state0, state1 in zip(saved_optimizer.state.values(),
                              loaded_optimizer.state.values()):
        for s0, s1 in zip(state0.values(), state1.values()):
            if isinstance(s0, torch.Tensor) and isinstance(s1, torch.Tensor):
                assert id(s0) != id(s1), f'Comparing optimizer state tensor against itself: {id(s0)} <====> {id(s1)}'
                assert torch.equal(s0, s1)
            else:
                assert s0 == s1


def compare_lr_scheduler_states(saved_model, loaded_model):
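    """Assert that the saved and loaded LR schedulers produce state_dicts with
    identical keys and equal numeric values."""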
    assert hasattr(saved_model, 'lr_scheduler')
    assert hasattr(loaded_model, 'lr_scheduler')

    saved_scheduler = saved_model.lr_scheduler
    loaded_scheduler = loaded_model.lr_scheduler

    assert hasattr(saved_scheduler, 'state_dict')
    assert hasattr(loaded_scheduler, 'state_dict')

    saved_sd = saved_scheduler.state_dict()
    loaded_sd = loaded_scheduler.state_dict()

    print(f"saved_sd = {saved_sd}")
    print(f"loaded_sd = {loaded_sd}")

    assert saved_sd.keys() == loaded_sd.keys()

    for state0, state1 in zip(saved_sd.values(), loaded_sd.values()):
        if isinstance(state0, numbers.Number) and isinstance(state1, numbers.Number):
            assert state0 == state1


def create_deepspeed_model(args, model, base_optimizer):
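    """Initialize a DeepSpeed engine, either letting the config construct the
    optimizer (base_optimizer is None) or wrapping a client-provided one."""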
    if base_optimizer is None:
        ds_model, _, _, _ = deepspeed.initialize(args=args,
                                                 model=model,
                                                 model_parameters=model.parameters())
    else:
        ds_model, _, _, _ = deepspeed.initialize(args=args,
                                                 model=model,
                                                 optimizer=base_optimizer)

    return ds_model


def checkpoint_correctness_verification(args,
                                        models,
                                        hidden_dim,
                                        tmpdir,
                                        load_optimizer_states=False,
                                        load_lr_scheduler_states=False,
                                        fp16=True,
                                        train_batch=False,
                                        base_optimizers=[None, None],
                                        empty_tag=False):
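    """Train models[0] for a few steps, save a checkpoint, load it into a
    fresh engine built from models[1], and assert that module weights and
    (optionally) optimizer and LR-scheduler state survive the round trip.
    With train_batch=True the pipeline engine's train_batch() API drives
    training instead of an explicit forward/backward/step loop."""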
    dtype = torch.half if fp16 else torch.float32

    ds_model = create_deepspeed_model(args=args,
                                      model=models[0],
                                      base_optimizer=base_optimizers[0])

    data_loader = random_dataloader(model=ds_model,
                                    total_samples=50,
                                    hidden_dim=hidden_dim,
                                    device=ds_model.device,
                                    dtype=dtype)

    if train_batch:
        ds_model.set_dataloader(data_loader)
        for n, batch in enumerate(data_loader):
            loss = ds_model.train_batch()
    else:
        for n, batch in enumerate(data_loader):
            loss = ds_model(batch[0], batch[1])
            ds_model.backward(loss)
            ds_model.step()

    trained_model = ds_model

    save_folder = os.path.join(tmpdir, 'saved_checkpoint')
    save_tag = None if empty_tag else '1'

    trained_model.save_checkpoint(save_folder, tag=save_tag)

    loaded_model = create_deepspeed_model(args=args,
                                          model=models[1],
                                          base_optimizer=base_optimizers[1])

    loaded_model.load_checkpoint(save_folder,
                                 tag=save_tag,
                                 load_optimizer_states=load_optimizer_states,
                                 load_lr_scheduler_states=load_lr_scheduler_states)

    compare_model_states(trained_model, loaded_model)

    if load_optimizer_states:
        compare_optimizer_states(trained_model, loaded_model, hidden_dim, fp16)

    if load_lr_scheduler_states:
        compare_lr_scheduler_states(trained_model, loaded_model)


@pytest.mark.skipif(not deepspeed.ops.__compatible_ops__[FusedLambBuilder.NAME],
                    reason="lamb is not compatible")
def test_checkpoint_unfused_optimizer(tmpdir):
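    """Checkpoint round trip for an fp16 Lamb + OneCycle configuration,
    exercising the unfused FP16 optimizer path, with and without restoring
    optimizer state."""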
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Lamb",
            "params": {
                "lr": 0.00015
            }
        },
        "gradient_clipping": 1.0,
        "fp16": {
            "enabled": True
        },
        "scheduler": {
            "type": "OneCycle",
            "params": {
                "cycle_first_step_size": 1000,
                "cycle_first_stair_count": 500,
                "cycle_second_step_size": 1000,
                "cycle_second_stair_count": 500,
                "decay_step_size": 1000,
                "cycle_min_lr": 0.0001,
                "cycle_max_lr": 0.0010,
                "decay_lr_rate": 0.001,
                "cycle_min_mom": 0.85,
                "cycle_max_mom": 0.99,
                "decay_mom_rate": 0.0
            }
        }
    }

    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10

    models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]

    @distributed_test(world_size=[2])
    def _test_checkpoint_unfused_optimizer(args,
                                           models,
                                           hidden_dim,
                                           load_optimizer_states):
        checkpoint_correctness_verification(args,
                                            models=models,
                                            hidden_dim=hidden_dim,
                                            tmpdir=tmpdir,
                                            load_optimizer_states=load_optimizer_states)

    _test_checkpoint_unfused_optimizer(args=args,
                                       models=models,
                                       hidden_dim=hidden_dim,
                                       load_optimizer_states=True)

    _test_checkpoint_unfused_optimizer(args=args,
                                       models=models,
                                       hidden_dim=hidden_dim,
                                       load_optimizer_states=False)


def test_checkpoint_fused_optimizer(tmpdir):
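    """Checkpoint round trip for an fp16 Adam configuration, exercising the
    fused FP16 optimizer path, with and without restoring optimizer state."""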
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015,
                "betas": [0.8,
                          0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": True
        }
    }

    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10

    models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]

    @distributed_test(world_size=[2])
    def _test_checkpoint_fused_optimizer(args,
                                         models,
                                         hidden_dim,
                                         load_optimizer_states):
        checkpoint_correctness_verification(args,
                                            models=models,
                                            hidden_dim=hidden_dim,
                                            tmpdir=tmpdir,
                                            load_optimizer_states=load_optimizer_states)

    _test_checkpoint_fused_optimizer(args=args,
                                     models=models,
                                     hidden_dim=hidden_dim,
                                     load_optimizer_states=True)

    _test_checkpoint_fused_optimizer(args=args,
                                     models=models,
                                     hidden_dim=hidden_dim,
                                     load_optimizer_states=False)


@pytest.mark.parametrize('zero_stage, use_cpu_offload',
                         [
                             (1, False),
                             (2, False),
                             (2, True),
                         ])
def test_checkpoint_zero_optimizer(tmpdir, zero_stage, use_cpu_offload):
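    """Checkpoint round trip for ZeRO stages 1 and 2 (optionally with CPU
    offload), restoring optimizer state."""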
    if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
        pytest.skip("cpu-adam is not compatible")

    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10

    models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]

    @distributed_test(world_size=[2])
    def _test_checkpoint_zero_optimizer(args, models, hidden_dim, load_optimizer_states):
        checkpoint_correctness_verification(args,
                                            models=models,
                                            hidden_dim=hidden_dim,
                                            tmpdir=tmpdir,
                                            load_optimizer_states=load_optimizer_states)

    _test_checkpoint_zero_optimizer(args=args,
                                    models=models,
                                    hidden_dim=hidden_dim,
                                    load_optimizer_states=True)


@pytest.mark.parametrize('zero_stage, use_cpu_offload',
                         [
                             (1, False),
                             (2, False),
                             (2, True),
                         ])
def test_checkpoint_zero_no_optimizer(tmpdir, zero_stage, use_cpu_offload):
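    """Load a ZeRO checkpoint with load_optimizer_states=False and verify the
    module weights still round-trip correctly."""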
    if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
        pytest.skip("cpu-adam is not compatible")

    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10

    models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]

    @distributed_test(world_size=[2])
    def _test_checkpoint_zero_no_optimizer(args,
                                           models,
                                           hidden_dim,
                                           load_optimizer_states):
        checkpoint_correctness_verification(args,
                                            models=models,
                                            hidden_dim=hidden_dim,
                                            tmpdir=tmpdir,
                                            load_optimizer_states=load_optimizer_states)

    _test_checkpoint_zero_no_optimizer(args=args,
                                       models=models,
                                       hidden_dim=hidden_dim,
                                       load_optimizer_states=False)


@pytest.mark.parametrize('zero_stage, use_cpu_offload',
                         [
                             (0, False),
                             (1, False),
                             (2, False),
                             (2, True),
                         ])
def test_checkpoint_lr_scheduler(tmpdir, zero_stage, use_cpu_offload):
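    """Verify WarmupLR scheduler state survives a checkpoint round trip across
    ZeRO stages 0-2, without reloading optimizer state."""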
    if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
        pytest.skip("cpu-adam is not compatible")

    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 0.001,
                "warmup_num_steps": 1000
            }
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10

    models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]

    @distributed_test(world_size=[2])
    def _test_checkpoint_lr_scheduler(args,
                                      models,
                                      hidden_dim,
                                      load_optimizer_states,
                                      load_lr_scheduler_states):
        checkpoint_correctness_verification(
            args,
            models=models,
            hidden_dim=hidden_dim,
            tmpdir=tmpdir,
            load_optimizer_states=load_optimizer_states,
            load_lr_scheduler_states=load_lr_scheduler_states)

    _test_checkpoint_lr_scheduler(args=args,
                                  models=models,
                                  hidden_dim=hidden_dim,
                                  load_optimizer_states=False,
                                  load_lr_scheduler_states=True)


@pytest.mark.parametrize('zero_stage, use_cpu_offload',
                         [
                             (0, False),
                             (1, False),
                             (2, False),
                             (2, True),
                         ])
def test_checkpoint_no_lr_scheduler(tmpdir, zero_stage, use_cpu_offload):
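    """Checkpoint round trip that restores neither optimizer nor LR-scheduler
    state, across ZeRO stages 0-2."""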
    if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]:
        pytest.skip("cpu-adam is not compatible")

    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 1e-5
            }
        },
        "fp16": {
            "enabled": True
        },
        "zero_optimization": {
            "stage": zero_stage,
            "cpu_offload": use_cpu_offload
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 0.001,
                "warmup_num_steps": 1000
            }
        }
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10

    models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]

    @distributed_test(world_size=[2])
    def _test_checkpoint_no_lr_scheduler(args,
                                         models,
                                         hidden_dim,
                                         load_optimizer_states,
                                         load_lr_scheduler_states):
        checkpoint_correctness_verification(
            args,
            models=models,
            hidden_dim=hidden_dim,
            tmpdir=tmpdir,
            load_optimizer_states=load_optimizer_states,
            load_lr_scheduler_states=load_lr_scheduler_states)

    _test_checkpoint_no_lr_scheduler(args=args,
                                     models=models,
                                     hidden_dim=hidden_dim,
                                     load_optimizer_states=False,
                                     load_lr_scheduler_states=False)


def test_checkpoint_fp32_optimizer(tmpdir):
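    """Checkpoint round trip with fp16 disabled, exercising the fp32
    optimizer path."""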
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015,
                "betas": [0.8,
                          0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7
            }
        },
        "fp16": {
            "enabled": False
        }
    }

    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10

    models = [SimpleModel(hidden_dim, empty_grad=False) for _ in range(2)]

    @distributed_test(world_size=[2])
    def _test_checkpoint_fp32_optimizer(args, models, hidden_dim):
        checkpoint_correctness_verification(args,
                                            models=models,
                                            hidden_dim=hidden_dim,
                                            tmpdir=tmpdir,
                                            fp16=False)

    _test_checkpoint_fp32_optimizer(args=args, models=models, hidden_dim=hidden_dim)


@pytest.mark.parametrize("zero_stage", [0, 1])
def test_checkpoint_pipe_engine(zero_stage, tmpdir, stages=2):
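    """Checkpoint round trip for the pipeline engine via train_batch(),
    including optimizer and OneCycle scheduler state, at ZeRO stages 0 and 1."""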
    config_dict = {
        "train_batch_size": 2,
        "train_micro_batch_size_per_gpu": 1,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 1e-5
            }
        },
        "zero_optimization": {
            "stage": zero_stage
        },
        "fp16": {
            "enabled": zero_stage > 0
        },
        "scheduler": {
            "type": "OneCycle",
            "params": {
                "cycle_first_step_size": 1000,
                "cycle_first_stair_count": 500,
                "cycle_second_step_size": 1000,
                "cycle_second_stair_count": 500,
                "decay_step_size": 1000,
                "cycle_min_lr": 0.0001,
                "cycle_max_lr": 0.0010,
                "decay_lr_rate": 0.001,
                "cycle_min_mom": 0.85,
                "cycle_max_mom": 0.99,
                "decay_mom_rate": 0.0
            }
        }
    }

    @distributed_test(world_size=4)
    def _test(save_folder, num_stages):
        args = args_from_dict(tmpdir, config_dict)
        models = [LinearStackPipe(num_stages=num_stages) for _ in range(2)]
        checkpoint_correctness_verification(args=args,
                                            models=models,
                                            hidden_dim=models[0].hidden_dim,
                                            tmpdir=save_folder,
                                            fp16=config_dict['fp16']['enabled'],
                                            load_optimizer_states=True,
                                            load_lr_scheduler_states=True,
                                            train_batch=True)

    _test(tmpdir, num_stages=stages)


@pytest.mark.parametrize("base_topo,test_topo",
                         [
                             (PipeTopo(num_pp=1, num_dp=4), PipeTopo(num_pp=4, num_dp=1)),
                             (PipeTopo(num_pp=2, num_dp=2), PipeTopo(num_pp=2, num_dp=2)),
                             (PipeTopo(num_pp=4, num_dp=1), PipeTopo(num_pp=2, num_dp=2)),
                         ])
def test_checkpoint_pipe_module(base_topo, test_topo, tmpdir):
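    """Save a LinearStackPipe under one pipeline/data-parallel topology and
    reload it under another, then compare corresponding layer parameters
    across the two partitionings."""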
    @distributed_test(world_size=4)
    def _test(base_topo, test_topo, save_folder):
        base_model = LinearStackPipe(topology=base_topo)
        base_model.save_state_dict(save_folder)

        dist.barrier()

        test_model = LinearStackPipe(topology=test_topo)
        test_model.load_state_dir(save_folder)

        # Base and test can have different lengths, so make sure we map from the
        # smaller to larger model
        if len(base_model.forward_funcs) < len(test_model.forward_funcs):
            A = base_model
            B = test_model
        else:
            A = test_model
            B = base_model

        # Compare layers individually since partitions are different
        for idx, A_layer in enumerate(A.forward_funcs):
            if not hasattr(A_layer, 'parameters'):
                # Skip functionals, etc.
                continue

            # Find the corresponding layer in B
            global_idx = idx + A._local_start
            B_local_idx = global_idx - B._local_start
            B_layer = B.forward_funcs[B_local_idx]

            # Compare layer parameters
            for p0, p1 in zip(A_layer.parameters(), B_layer.parameters()):
                assert torch.allclose(p0, p1, atol=1e-07), f"Model state {p0} is not equal to {p1}"

    _test(base_topo, test_topo, save_folder=tmpdir)


@pytest.mark.parametrize('zero_stage', [1, 2])
def test_checkpoint_zero_hybrid_optimizer_state(tmpdir, zero_stage):
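    """Checkpoint ZeRO with a client-supplied HybridStateOptimizer (permitted
    via zero_allow_untested_optimizer) and verify its state is restored."""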
    config_dict = {
        "train_micro_batch_size_per_gpu": 2,
        "gradient_accumulation_steps": 2,
        "steps_per_print": 1,
        "zero_optimization": {
            "stage": zero_stage
        },
        "zero_allow_untested_optimizer": True,
        "fp16": {
            "enabled": True,
            "initial_scale_power": 8
        }
    }

    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    models = [SimpleModel(hidden_dim=hidden_dim) for _ in range(2)]
    optimizers = [HybridStateOptimizer(model.parameters()) for model in models]

    @distributed_test(world_size=[2])
    def _test_checkpoint_zero_hybrid_optimizer_state(args,
                                                     models,
                                                     optimizers,
                                                     hidden_dim):
        checkpoint_correctness_verification(args,
                                            models=models,
                                            base_optimizers=optimizers,
                                            hidden_dim=hidden_dim,
                                            tmpdir=tmpdir,
                                            load_optimizer_states=True)

    _test_checkpoint_zero_hybrid_optimizer_state(args=args,
                                                 models=models,
                                                 optimizers=optimizers,
                                                 hidden_dim=hidden_dim)


def test_checkpoint_latest(tmpdir):
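    """Save with an empty tag so the engine falls back to its automatic
    'latest' tag handling, then load the checkpoint the same way."""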
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        }
    }
    hidden_dim = 10
    args = args_from_dict(tmpdir, config_dict)
    models = [SimpleModel(hidden_dim=hidden_dim) for _ in range(2)]

    @distributed_test(world_size=[1])
    def _helper(args, models):
        checkpoint_correctness_verification(args,
                                            models=models,
                                            hidden_dim=hidden_dim,
                                            tmpdir=tmpdir,
                                            load_optimizer_states=True,
                                            load_lr_scheduler_states=False,
                                            fp16=False,
                                            empty_tag=True)

    _helper(args, models)


def test_checkpoint_missing_latest(tmpdir):
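    """Loading from a directory with no checkpoint should be a graceful no-op
    rather than an error."""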
    config_dict = {
        "train_batch_size": 2,
        "steps_per_print": 1,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.00015
            }
        }
    }
    hidden_dim = 10
    args = args_from_dict(tmpdir, config_dict)

    model = SimpleModel(hidden_dim, rank=args.local_rank)

    @distributed_test(world_size=[1])
    def _helper(args, model, hidden_dim):
        model, _, _, _ = deepspeed.initialize(args=args,
                                              model=model,
                                              model_parameters=model.parameters())

        # should be no-op, since latest doesn't exist
        model.load_checkpoint(tmpdir)

    _helper(args=args, model=model, hidden_dim=hidden_dim)