test_deterministic_dataloader.py 2.63 KB
Newer Older
zbian's avatar
zbian committed
1
2
3
4
5
6
7
8
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

import os
from functools import partial
from pathlib import Path

import pytest
Frank Lee's avatar
Frank Lee committed
9
import torch
zbian's avatar
zbian committed
10
11
import torch.distributed as dist
import torch.multiprocessing as mp
Frank Lee's avatar
Frank Lee committed
12
from torchvision import transforms
zbian's avatar
zbian committed
13
14
15
from torch.utils.data import DataLoader

import colossalai
16
from colossalai.builder import build_dataset
Frank Lee's avatar
Frank Lee committed
17
from colossalai.context import ParallelMode, Config
zbian's avatar
zbian committed
18
from colossalai.core import global_context as gpc
19
from colossalai.utils import free_port
20
from colossalai.testing import rerun_if_address_is_in_use
21
from torchvision import transforms
zbian's avatar
zbian committed
22

Frank Lee's avatar
Frank Lee committed
23
24
# Test configuration: CIFAR10 train split, a tiny 2-sample batch, and a
# trivial (size-1) pipeline/tensor parallel layout so all ranks fall into
# one data-parallel group. Requires the DATA env var to point at the
# dataset root.
CONFIG = Config(
    dict(
        train_data=dict(
            dataset=dict(
                type='CIFAR10',
                root=Path(os.environ['DATA']),
                train=True,
                download=True,
            ),
            # shuffle=True on purpose: without a distributed sampler every
            # rank still shuffles identically because the seed is shared.
            dataloader=dict(num_workers=2, batch_size=2, shuffle=True),
        ),
        parallel=dict(
            pipeline=dict(size=1),
            tensor=dict(size=1, mode=None),
        ),
        seed=1024,
    ))


def run_data_sampler(rank, world_size, port):
    """Worker entry point spawned by ``mp.spawn``.

    Launches a colossalai gloo process group, builds a plain ``DataLoader``
    (no distributed sampler) over CIFAR10, and asserts that every
    data-parallel rank receives the same first image — i.e. that data
    loading is deterministic across ranks when no sampler shards the data.

    Args:
        rank (int): this process's rank (filled in by ``mp.spawn``).
        world_size (int): total number of spawned processes.
        port (int): free TCP port used for the rendezvous.
    """
    dist_args = dict(config=CONFIG, rank=rank, world_size=world_size, backend='gloo', port=port, host='localhost')
    colossalai.launch(**dist_args)

    dataset_cfg = gpc.config.train_data.dataset
    dataloader_cfg = gpc.config.train_data.dataloader
    # NOTE(fix): the previous `gpc.config.train_data.transform_pipeline`
    # read was removed — CONFIG defines no such key (it would raise) and
    # the value was never used; the transform is built inline below.

    # build transform
    transform_pipeline = [transforms.ToTensor(), transforms.RandomCrop(size=32)]
    transform_pipeline = transforms.Compose(transform_pipeline)
    dataset_cfg['transform'] = transform_pipeline

    # build dataset
    dataset = build_dataset(dataset_cfg)

    # build dataloader (deliberately without a DistributedSampler)
    dataloader = DataLoader(dataset=dataset, **dataloader_cfg)

    data_iter = iter(dataloader)
    # FIX: use the builtin next(); the Python-2-style `.next()` method does
    # not exist on Python 3 iterators (and was dropped from PyTorch's
    # dataloader iterator in torch >= 1.13), so this line used to crash.
    img, label = next(data_iter)
    img = img[0]

    # Rank 0 broadcasts its image; other ranks compare against a clone so
    # their own `img` is not overwritten by the broadcast.
    if gpc.get_local_rank(ParallelMode.DATA) != 0:
        img_to_compare = img.clone()
    else:
        img_to_compare = img
    dist.broadcast(img_to_compare, src=0, group=gpc.get_group(ParallelMode.DATA))

    if gpc.get_local_rank(ParallelMode.DATA) != 0:
        # this is without a sampler; this assertion would fail if a
        # data-parallel sampler were given to the dataloader
        assert torch.equal(img,
                           img_to_compare), 'Same image was distributed across ranks and expected it to be the same'
    torch.cuda.empty_cache()
zbian's avatar
zbian committed
77
78


79
@pytest.mark.skip
@pytest.mark.cpu
@rerun_if_address_is_in_use()
def test_data_sampler():
    """Spawn four gloo worker processes and verify that a DataLoader
    without a distributed sampler yields identical batches on every
    data-parallel rank."""
    nprocs = 4
    worker = partial(run_data_sampler, world_size=nprocs, port=free_port())
    mp.spawn(worker, nprocs=nprocs)


# Allow running this test directly as a script, bypassing pytest collection.
if __name__ == '__main__':
    test_data_sampler()