#!/usr/bin/env python
# -*- encoding: utf-8 -*-

import os
from functools import partial
from pathlib import Path

import pytest
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torchvision import transforms
from torch.utils.data import DataLoader

import colossalai
from colossalai.builder import build_dataset, build_transform
from colossalai.context import ParallelMode, Config
from colossalai.core import global_context as gpc
from colossalai.utils import free_port
from colossalai.testing import rerun_on_exception

# Test configuration: a CIFAR10 training pipeline, a trivial (size-1)
# parallel topology, and a fixed seed so dataloading is deterministic.
# The dataset root is taken from the DATA environment variable.
CONFIG = Config({
    'train_data': {
        'dataset': {
            'type': 'CIFAR10',
            'root': Path(os.environ['DATA']),
            'train': True,
            'download': True,
        },
        'dataloader': {'num_workers': 2, 'batch_size': 2, 'shuffle': True},
        'transform_pipeline': [
            {'type': 'ToTensor'},
            {'type': 'RandomCrop', 'size': 32},
            {'type': 'Normalize', 'mean': (0.5, 0.5, 0.5), 'std': (0.5, 0.5, 0.5)},
        ],
    },
    'parallel': {
        'pipeline': {'size': 1},
        'tensor': {'size': 1, 'mode': None},
    },
    'seed': 1024,
})


def run_data_sampler(rank, world_size, port):
    """Worker entry point spawned by ``mp.spawn``.

    Builds a plain (sampler-less) CIFAR10 dataloader in each process and
    checks that every data-parallel rank draws the same first image —
    which is the expected behavior when no distributed sampler is used
    and all ranks share the configured seed.

    Args:
        rank (int): process rank, supplied automatically by ``mp.spawn``.
        world_size (int): total number of spawned processes.
        port (int): free TCP port used for the distributed rendezvous.
    """
    dist_args = dict(config=CONFIG, rank=rank, world_size=world_size, backend='gloo', port=port, host='localhost')
    colossalai.launch(**dist_args)

    dataset_cfg = gpc.config.train_data.dataset
    dataloader_cfg = gpc.config.train_data.dataloader
    transform_cfg = gpc.config.train_data.transform_pipeline

    # build transform
    transform_pipeline = [build_transform(cfg) for cfg in transform_cfg]
    transform_pipeline = transforms.Compose(transform_pipeline)
    dataset_cfg['transform'] = transform_pipeline

    # build dataset
    dataset = build_dataset(dataset_cfg)

    # build dataloader — note: no DistributedSampler, so every rank
    # iterates over the full dataset independently
    dataloader = DataLoader(dataset=dataset, **dataloader_cfg)

    # Bug fix: DataLoader iterators only implement `__next__`; the Python-2
    # style `.next()` method was removed from PyTorch and raises
    # AttributeError on current versions. Use the builtin `next()` instead.
    data_iter = iter(dataloader)
    img, label = next(data_iter)
    img = img[0]

    # Rank 0 broadcasts its first image; non-zero ranks keep their own copy
    # in `img` and receive rank 0's image into `img_to_compare`.
    if gpc.get_local_rank(ParallelMode.DATA) != 0:
        img_to_compare = img.clone()
    else:
        img_to_compare = img
    dist.broadcast(img_to_compare, src=0, group=gpc.get_group(ParallelMode.DATA))

    if gpc.get_local_rank(ParallelMode.DATA) != 0:
        # this is without sampler
        # this should be false if a data parallel sampler is given to the dataloader
        assert torch.equal(img,
                           img_to_compare), 'Same image was distributed across ranks and expected it to be the same'
    torch.cuda.empty_cache()


@pytest.mark.cpu
@rerun_on_exception(exception_type=mp.ProcessRaisedException, pattern=".*Address already in use.*")
def test_data_sampler():
    """Spawn four worker processes and verify that, without a distributed
    sampler, all data-parallel ranks load identical data.

    The test is retried automatically if the rendezvous port turns out to
    be already in use.
    """
    nprocs = 4
    worker = partial(run_data_sampler, world_size=nprocs, port=free_port())
    mp.spawn(worker, nprocs=nprocs)


# Allow running this test directly as a script, without pytest.
if __name__ == '__main__':
    test_data_sampler()