import os
import unittest

import pytest
import torch as th

import dgl
import dgl.multiprocessing as mp

import backend as F

def start_unified_tensor_worker(dev_id, input, seq_idx, rand_idx, output_seq, output_rand):
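    """Worker entry point for the multi-GPU test.

    Wraps the shared CPU tensor ``input`` in a UnifiedTensor on this worker's
    GPU, gathers the sequential and random index sets through it, and writes
    the results into the shared-memory output tensors for the parent process
    to verify.
    """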
    device = th.device('cuda:'+str(dev_id))
    th.cuda.set_device(device)
    input_unified = dgl.contrib.UnifiedTensor(input, device=device)
    output_seq.copy_(input_unified[seq_idx.to(device)].cpu())
    output_rand.copy_(input_unified[rand_idx.to(device)].cpu())

@unittest.skipIf(os.name == 'nt', reason='Windows is not supported yet')
@unittest.skipIf(F.ctx().type == 'cpu', reason='GPU-only test')
def test_unified_tensor():
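    """Check that a UnifiedTensor returns the same rows as the plain CPU
    tensor it wraps, for both sequential and random index sets, whether the
    indices live on the CPU or on the GPU."""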
    test_row_size = 65536
    test_col_size = 128
    rand_test_size = 8192

    device = th.device('cuda:0')

    input = th.rand((test_row_size, test_col_size))
    input_unified = dgl.contrib.UnifiedTensor(input, device=device)
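    # The unified tensor keeps its storage in host memory but exposes it to
    # the GPU, so the same rows can be gathered with CPU or GPU index tensors.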

    seq_idx = th.arange(0, test_row_size)
    # CPU indexing
    assert th.all(th.eq(input[seq_idx], input_unified[seq_idx]))
    # GPU indexing
    assert th.all(th.eq(input[seq_idx].to(device), input_unified[seq_idx.to(device)]))

    rand_idx = th.randint(0, test_row_size, (rand_test_size,))
    # CPU indexing
    assert th.all(th.eq(input[rand_idx], input_unified[rand_idx]))
    # GPU indexing
    assert th.all(th.eq(input[rand_idx].to(device), input_unified[rand_idx.to(device)]))

@unittest.skipIf(os.name == 'nt', reason='Windows is not supported yet')
@unittest.skipIf(F.ctx().type == 'cpu', reason='GPU-only test')
@pytest.mark.parametrize("num_workers", [1, 2])
def test_multi_gpu_unified_tensor(num_workers):
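    """Spawn ``num_workers`` processes that each index the same shared CPU
    tensor through a UnifiedTensor on their own GPU, then compare every
    worker's output against the plain CPU indexing result."""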
    if F.ctx().type == 'cuda' and th.cuda.device_count() < num_workers:
        pytest.skip("Not enough number of GPUs to do this test, skip multi-gpu test.")

    test_row_size = 65536
    test_col_size = 128

    rand_test_size = 8192

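    # Keep the input, the index sets, and the per-worker outputs in shared
    # memory so the spawned workers and the parent operate on the same buffers.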
    input = th.rand((test_row_size, test_col_size)).share_memory_()
    seq_idx = th.arange(0, test_row_size).share_memory_()
    rand_idx = th.randint(0, test_row_size, (rand_test_size,)).share_memory_()

    output_seq = []
    output_rand = []

    output_seq_cpu = input[seq_idx]
    output_rand_cpu = input[rand_idx]

    worker_list = []

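    # Use the 'spawn' start method: the CUDA runtime cannot be safely
    # (re)initialized in worker processes created with fork().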
    ctx = mp.get_context('spawn')
    for i in range(num_workers):
        output_seq.append(th.zeros((test_row_size, test_col_size)).share_memory_())
        output_rand.append(th.zeros((rand_test_size, test_col_size)).share_memory_())
        p = ctx.Process(target=start_unified_tensor_worker,
                        args=(i, input, seq_idx, rand_idx, output_seq[i], output_rand[i],))
        p.start()
        worker_list.append(p)

    for p in worker_list:
        p.join()
    for p in worker_list:
        assert p.exitcode == 0
    for i in range(num_workers):
        assert th.all(th.eq(output_seq_cpu, output_seq[i]))
        assert th.all(th.eq(output_rand_cpu, output_rand[i]))


if __name__ == '__main__':
    test_unified_tensor()
    test_multi_gpu_unified_tensor(1)
    test_multi_gpu_unified_tensor(2)