# train.py: train a Vision Transformer on synthetic data with ColossalAI's legacy engine.
import os

import torch
from titans.model.vit.vit import _create_vit_model
from tqdm import tqdm

import colossalai
from colossalai.legacy.context import ParallelMode
from colossalai.legacy.core import global_context as gpc
from colossalai.legacy.nn import CrossEntropyLoss
from colossalai.legacy.pipeline.pipelinable import PipelinableContext
from colossalai.logging import get_dist_logger
from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR
from colossalai.utils import is_using_pp


class DummyDataloader:
    """Finite, re-iterable source of random ``(data, label)`` batches.

    Each pass yields ``length`` batches.

    - ``data`` is a ``torch.rand`` float tensor of shape
      ``(batch_size, in_chans, img_size, img_size)``.
    - ``label`` is a ``torch.randint`` tensor of shape ``(batch_size,)`` with
      values in ``[0, num_classes)``.

    The defaults give 3x224x224 images and 10 classes.

    Args:
        length: number of batches per pass (also returned by ``len()``).
        batch_size: number of samples per batch.
        img_size: height and width of the generated images.
        num_classes: exclusive upper bound for the random labels.
        in_chans: number of image channels.
    """

    def __init__(self, length, batch_size, img_size=224, num_classes=10, in_chans=3):
        self.length = length
        self.batch_size = batch_size
        self.img_size = img_size
        self.num_classes = num_classes
        self.in_chans = in_chans
        # Initialise the cursor here so ``next()`` also works when ``iter()``
        # has not been called first.
        self.step = 0

    def generate(self):
        """Return one freshly sampled ``(data, label)`` batch."""
        data = torch.rand(self.batch_size, self.in_chans, self.img_size, self.img_size)
        label = torch.randint(low=0, high=self.num_classes, size=(self.batch_size,))
        return data, label

    def __iter__(self):
        # The loader is its own iterator: starting a new pass resets the cursor,
        # so only one pass can be in progress at a time.
        self.step = 0
        return self

    def __next__(self):
        if self.step >= self.length:
            raise StopIteration
        self.step += 1
        return self.generate()

    def __len__(self):
        return self.length


def main():
    """Train a ViT on synthetic data using ColossalAI's legacy engine.

    Parses the default ColossalAI command line (which supplies ``--config``),
    initialises the distributed environment with ``launch_from_torch``, and
    builds the model. When pipeline parallelism is enabled, the model is
    partitioned so that each rank keeps only its own stage. Training then runs
    for ``NUM_EPOCHS`` epochs on ``DummyDataloader`` batches.

    Config keys read:
        ``LOG_PATH`` (optional), ``IMG_SIZE``, ``PATCH_SIZE``,
        ``HIDDEN_SIZE``, ``DEPTH``, ``NUM_HEADS``, ``MLP_RATIO``,
        ``CHECKPOINT``, ``BATCH_SIZE``, ``LEARNING_RATE``, ``WEIGHT_DECAY``,
        ``NUM_EPOCHS``, ``WARMUP_EPOCHS``.
    """
    # launch from torch
    parser = colossalai.legacy.get_default_parser()
    args = parser.parse_args()
    colossalai.legacy.launch_from_torch(config=args.config)

    # get logger
    logger = get_dist_logger()
    logger.info("initialized distributed environment", ranks=[0])

    # Only global rank 0 writes log files.
    if hasattr(gpc.config, "LOG_PATH") and gpc.get_global_rank() == 0:
        log_path = gpc.config.LOG_PATH
        # makedirs also creates missing parent directories, and exist_ok
        # avoids failing when the directory is already there.
        os.makedirs(log_path, exist_ok=True)
        logger.log_to_file(log_path)

    use_pipeline = is_using_pp()

    # create model
    model_kwargs = dict(
        img_size=gpc.config.IMG_SIZE,
        patch_size=gpc.config.PATCH_SIZE,
        hidden_size=gpc.config.HIDDEN_SIZE,
        depth=gpc.config.DEPTH,
        num_heads=gpc.config.NUM_HEADS,
        mlp_ratio=gpc.config.MLP_RATIO,
        num_classes=10,
        init_method="jax",
        checkpoint=gpc.config.CHECKPOINT,
    )

    if use_pipeline:
        # Build the model inside PipelinableContext so it can be turned into a
        # layer list and split with the "uniform" policy; each rank keeps the
        # partition for its own pipeline rank.
        # NOTE(review): the leading ``1`` is presumably the number of model
        # chunks per stage; confirm against PipelinableContext.partition.
        pipelinable = PipelinableContext()
        with pipelinable:
            model = _create_vit_model(**model_kwargs)
        pipelinable.to_layer_list()
        pipelinable.policy = "uniform"
        model = pipelinable.partition(1, gpc.pipeline_parallel_size, gpc.get_local_rank(ParallelMode.PIPELINE))
    else:
        model = _create_vit_model(**model_kwargs)

    # Count parameters held by this rank (only its own partition under pipeline parallelism).
    total_numel = sum(p.numel() for p in model.parameters())
    if gpc.is_initialized(ParallelMode.PIPELINE):
        pipeline_stage = gpc.get_local_rank(ParallelMode.PIPELINE)
    else:
        pipeline_stage = 0
    logger.info(f"number of parameters: {total_numel} on pipeline stage {pipeline_stage}")

    # Use a synthetic dataset: 10 training steps per epoch. The 5-step test
    # loader is handed to colossalai.initialize but is not evaluated below.
    # NOTE(review): DummyDataloader yields 3x224x224 images with labels in
    # [0, 10); this only matches the model if IMG_SIZE == 224. Confirm the config.
    train_dataloader = DummyDataloader(length=10, batch_size=gpc.config.BATCH_SIZE)
    test_dataloader = DummyDataloader(length=5, batch_size=gpc.config.BATCH_SIZE)

    # create loss function
    criterion = CrossEntropyLoss(label_smoothing=0.1)

    # create optimizer
    optimizer = torch.optim.AdamW(model.parameters(), lr=gpc.config.LEARNING_RATE, weight_decay=gpc.config.WEIGHT_DECAY)

    # create lr scheduler
    # total_steps / warmup_steps are measured in epochs, so the scheduler is
    # stepped once per epoch in the training loop, not once per iteration.
    lr_scheduler = CosineAnnealingWarmupLR(
        optimizer=optimizer, total_steps=gpc.config.NUM_EPOCHS, warmup_steps=gpc.config.WARMUP_EPOCHS
    )

    # initialize
    engine, train_dataloader, test_dataloader, _ = colossalai.initialize(
        model=model,
        optimizer=optimizer,
        criterion=criterion,
        train_dataloader=train_dataloader,
        test_dataloader=test_dataloader,
    )

    logger.info("Engine is built", ranks=[0])

    for epoch in range(gpc.config.NUM_EPOCHS):
        # training
        engine.train()
        data_iter = iter(train_dataloader)
        iterations = range(len(train_dataloader))

        # Only global rank 0 shows a progress bar; the epoch is shown 1-based.
        if gpc.get_global_rank() == 0:
            progress = tqdm(iterations, desc=f"Epoch {epoch + 1} / {gpc.config.NUM_EPOCHS}")
        else:
            progress = iterations

        for _ in progress:
            engine.zero_grad()
            # execute_schedule consumes batches from data_iter itself.
            engine.execute_schedule(data_iter, return_output_label=False)
            engine.step()

        # One scheduler step per epoch, matching total_steps=NUM_EPOCHS.
        lr_scheduler.step()

    gpc.destroy()


if __name__ == "__main__":
    # Run training only when executed as a script, not when imported.
    main()