test_general_checkpoint_io.py 3.35 KB
Newer Older
1
import tempfile
2

3
import pytest
4
5
6
7
import torch
from torch.optim import Adam
from torchvision.models import resnet18

8
from colossalai.booster.plugin.gemini_plugin import GeminiCheckpointIO
9
10
from colossalai.checkpoint_io import GeneralCheckpointIO
from colossalai.testing import check_state_dict_equal, clear_cache_before_run, parameterize
11

12
13
14
15
# ========
# Note:
# 1. because checkpoint IO can be quite slow if tested with all models, we only test on resnet for now
# 2. we test both sharded and unsharded checkpoints
# 3. implement sharded checkpoint and test it
# ========


20
21
@clear_cache_before_run()
@parameterize('use_safetensors', [True, False])
def test_unsharded_checkpoint(use_safetensors: bool):
    """Check that an unsharded checkpoint round-trips through GeneralCheckpointIO.

    A resnet18 and its Adam optimizer take one optimization step, are saved to
    temporary files, and are loaded back into freshly constructed instances.
    The reloaded state dicts must equal the originals.

    Args:
        use_safetensors: Forwarded to ``save_model``; also selects the
            ``.safetensors`` or ``.bin`` suffix of the model checkpoint file.
    """
    # create a model and optimizer
    model = resnet18()
    optimizer = Adam(model.parameters(), lr=0.001)

    # create test data sample
    x = torch.randn(1, 3, 224, 224)

    # run fwd and bwd so that the optimizer holds non-empty state
    y = model(x)
    loss = y.sum()
    loss.backward()
    optimizer.step()

    # the model checkpoint suffix follows the requested serialization format
    suffix = ".safetensors" if use_safetensors else ".bin"

    # context managers guarantee that the temp files are closed and removed
    # even when a save/load call or an equality check fails
    with tempfile.NamedTemporaryFile(suffix=suffix) as model_ckpt_tempfile, \
            tempfile.NamedTemporaryFile() as optimizer_ckpt_tempfile:
        # save the model and optimizer
        ckpt_io = GeneralCheckpointIO()
        ckpt_io.save_model(model, model_ckpt_tempfile.name, use_safetensors=use_safetensors)
        ckpt_io.save_optimizer(optimizer, optimizer_ckpt_tempfile.name)

        # create new model
        new_model = resnet18()
        new_optimizer = Adam(new_model.parameters(), lr=0.001)

        # load the model and optimizer
        ckpt_io.load_model(new_model, model_ckpt_tempfile.name)
        ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name)

        # check for model and optimizer state dict recursively
        check_state_dict_equal(model.state_dict(), new_model.state_dict())
        check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict())

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

@pytest.mark.parametrize('use_safetensors', [True, False])
def test_sharded_checkpoint(use_safetensors: bool):
    """Check that a sharded model checkpoint round-trips through GeneralCheckpointIO.

    A resnet18 and its Adam optimizer take one optimization step. The model is
    saved into a temporary directory and the optimizer into a temporary file
    (with ``shard=False``); both are then loaded into freshly constructed
    instances and the reloaded state dicts must equal the originals.

    Args:
        use_safetensors: Forwarded to ``save_model``.
    """
    # create a model and optimizer
    model = resnet18()
    optimizer = Adam(model.parameters(), lr=0.001)

    # create test data sample
    x = torch.randn(1, 3, 224, 224)

    # run fwd and bwd so that the optimizer holds non-empty state
    y = model(x)
    loss = y.sum()
    loss.backward()
    optimizer.step()

    # context managers guarantee that the temp directory and file are removed
    # even when a save/load call or an equality check fails
    with tempfile.TemporaryDirectory() as model_ckpt_dir, \
            tempfile.NamedTemporaryFile() as optimizer_ckpt_tempfile:
        # save the model and optimizer
        ckpt_io = GeneralCheckpointIO()

        # NOTE(review): the positional arguments are presumably shard=True,
        # gather_dtensor=True, an empty prefix and a small size_per_shard of 10
        # to force several shards -- confirm against the save_model signature.
        ckpt_io.save_model(model, model_ckpt_dir, True, True, "", 10, use_safetensors=use_safetensors)
        ckpt_io.save_optimizer(optimizer, optimizer_ckpt_tempfile.name, shard=False)

        # create new model
        new_model = resnet18()
        new_optimizer = Adam(new_model.parameters(), lr=0.001)

        # load the model from the checkpoint directory and the optimizer from its file
        ckpt_io.load_model(new_model, model_ckpt_dir, strict=True)
        ckpt_io.load_optimizer(new_optimizer, optimizer_ckpt_tempfile.name)

        # check for model and optimizer state dict recursively
        check_state_dict_equal(model.state_dict(), new_model.state_dict())
        check_state_dict_equal(optimizer.state_dict(), new_optimizer.state_dict())