#!/usr/bin/env python
# -*- encoding: utf-8 -*-
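"""
Checks that, with the seed fixed in the config and no data-parallel sampler
attached to the DataLoader, every data-parallel rank receives the same first
batch: rank 0 broadcasts its image and the other ranks assert equality.
"""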

import os
from functools import partial
from pathlib import Path

import pytest
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torchvision import transforms
from torch.utils.data import DataLoader

import colossalai
from colossalai.builder import build_dataset, build_transform
from colossalai.context import ParallelMode, Config
from colossalai.core import global_context as gpc

CONFIG = Config(
    dict(
        train_data=dict(
            dataset=dict(
                type='CIFAR10',
                root=Path(os.environ['DATA']),
                train=True,
                download=True,
            ),
            dataloader=dict(
                num_workers=2,
                batch_size=2,
                shuffle=True
            ),
            transform_pipeline=[
                dict(type='ToTensor'),
                dict(type='RandomCrop', size=32),
                dict(type='Normalize', mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
            ]
        ),
        parallel=dict(
            pipeline=dict(size=1),
            tensor=dict(size=1, mode=None),
        ),
        seed=1024,
    )
)
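# NOTE: CONFIG reads the CIFAR10 root from the DATA environment variable, so
# the test requires DATA to point at a dataset directory (downloaded if missing).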


def run_data_sampler(rank, world_size):
    dist_args = dict(
        config=CONFIG,
        rank=rank,
        world_size=world_size,
        backend='gloo',
        port='29904',
        host='localhost'
    )
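    # launch() initializes torch.distributed (gloo backend here) and populates
    # the global context (gpc) from CONFIG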
    colossalai.launch(**dist_args)

    dataset_cfg = gpc.config.train_data.dataset
    dataloader_cfg = gpc.config.train_data.dataloader
    transform_cfg = gpc.config.train_data.transform_pipeline

    # build transform
    transform_pipeline = [build_transform(cfg) for cfg in transform_cfg]
    transform_pipeline = transforms.Compose(transform_pipeline)
    dataset_cfg['transform'] = transform_pipeline

    # build dataset
    dataset = build_dataset(dataset_cfg)

    # build dataloader
    dataloader = DataLoader(dataset=dataset, **dataloader_cfg)

    data_iter = iter(dataloader)
    img, label = next(data_iter)
    img = img[0]

    if gpc.get_local_rank(ParallelMode.DATA) != 0:
        img_to_compare = img.clone()
    else:
        img_to_compare = img
    dist.broadcast(img_to_compare, src=0, group=gpc.get_group(ParallelMode.DATA))

    if gpc.get_local_rank(ParallelMode.DATA) != 0:
        # no data-parallel sampler is attached, so every rank should draw the
        # same batch; with a sampler attached this check would be expected to
        # fail (see the DistributedSampler sketch below)
        assert torch.equal(img, img_to_compare), \
            'Expected every rank to receive the same image since no data-parallel sampler is used'
    torch.cuda.empty_cache()


@pytest.mark.cpu
def test_data_sampler():
    world_size = 4
    test_func = partial(run_data_sampler, world_size=world_size)
    mp.spawn(test_func, nprocs=world_size)
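

# The assertion in run_data_sampler holds only because no distributed sampler
# is attached, so every rank draws identical batches from the shared seed.
# Below is a minimal sketch (not invoked by this test) of the opposite setup,
# using torch's stock DistributedSampler so that each data-parallel rank gets
# a different shard; with such a sampler the torch.equal check above would be
# expected to fail. The helper name and batch size are illustrative only.
def build_sharded_dataloader_sketch(dataset):
    from torch.utils.data.distributed import DistributedSampler

    sampler = DistributedSampler(
        dataset,
        num_replicas=gpc.get_world_size(ParallelMode.DATA),
        rank=gpc.get_local_rank(ParallelMode.DATA),
        shuffle=True,
    )
    # shuffle must stay False on the DataLoader because the sampler shuffles
    return DataLoader(dataset=dataset, sampler=sampler, batch_size=2, shuffle=False)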


if __name__ == '__main__':
    test_data_sampler()