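"""Tests for sharing DGL heterographs between processes through named shared-memory segments."""
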
import io
import multiprocessing as mp
import os
import pickle
import unittest

import backend as F

import dgl
import dgl.function as fn
import networkx as nx
import scipy.sparse as ssp
from dgl.graph_index import create_graph_index
from dgl.utils import toindex
from utils import parametrize_idtype


def create_test_graph(idtype):
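    """Build a small heterograph with user, game, and developer nodes and four relations."""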
    g = dgl.heterograph(
        {
            ("user", "follows", "user"): ([0, 1], [1, 2]),
            ("user", "plays", "game"): ([0, 1, 2, 1], [0, 0, 1, 1]),
            ("user", "wishes", "game"): ([0, 2], [1, 0]),
            ("developer", "develops", "game"): ([0, 1], [0, 1]),
        },
        idtype=idtype,
    )
    return g


def _assert_is_identical_hetero(g, g2):
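    """Assert that two heterographs share the same schema, node counts, and edge lists."""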
    assert g.ntypes == g2.ntypes
    assert g.canonical_etypes == g2.canonical_etypes

    # check if two metagraphs are identical
    for edges, features in g.metagraph().edges(keys=True).items():
        assert g2.metagraph().edges(keys=True)[edges] == features

    # check if node ID spaces and feature spaces are equal
    for ntype in g.ntypes:
        assert g.number_of_nodes(ntype) == g2.number_of_nodes(ntype)

    # check if edge ID spaces and feature spaces are equal
    for etype in g.canonical_etypes:
        src, dst = g.all_edges(etype=etype, order="eid")
        src2, dst2 = g2.all_edges(etype=etype, order="eid")
        assert F.array_equal(src, src2)
        assert F.array_equal(dst, dst2)


@unittest.skipIf(
    dgl.backend.backend_name == "tensorflow",
    reason="Not support tensorflow for now",
)
@parametrize_idtype
def test_single_process(idtype):
    hg = create_test_graph(idtype=idtype)
    hg_share = hg.shared_memory("hg")
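    # Attach a new graph to the named shared-memory segment created above.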
    hg_rebuild = dgl.hetero_from_shared_memory("hg")
    hg_save_again = hg_rebuild.shared_memory("hg")
    _assert_is_identical_hetero(hg, hg_share)
    _assert_is_identical_hetero(hg, hg_rebuild)
    _assert_is_identical_hetero(hg, hg_save_again)


def sub_proc(hg_origin, name):
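    """Child-process body: attach to the named segment and compare against the original graph."""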
    hg_rebuild = dgl.hetero_from_shared_memory(name)
    hg_save_again = hg_rebuild.shared_memory(name)
    _assert_is_identical_hetero(hg_origin, hg_rebuild)
    _assert_is_identical_hetero(hg_origin, hg_save_again)


@unittest.skipIf(
    dgl.backend.backend_name == "tensorflow",
    reason="Not support tensorflow for now",
)
@parametrize_idtype
def test_multi_process(idtype):
    hg = create_test_graph(idtype=idtype)
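    # Keep a reference to the shared-memory copy; dropping it could release the
    # segment before the child process attaches.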
    hg_share = hg.shared_memory("hg1")
    p = mp.Process(target=sub_proc, args=(hg, "hg1"))
    p.start()
    p.join()


@unittest.skipIf(
    F._default_context_str == "cpu", reason="Need gpu for this test"
)
@unittest.skipIf(
    dgl.backend.backend_name == "tensorflow",
    reason="Not support tensorflow for now",
)
def test_copy_from_gpu():
    hg = create_test_graph(idtype=F.int32)
    hg_gpu = hg.to(F.cuda())
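    # Sharing a GPU-resident graph is expected to place its structure in host
    # shared memory, so the child process can attach and compare it with the
    # original CPU graph.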
    hg_share = hg_gpu.shared_memory("hg_gpu")
    p = mp.Process(target=sub_proc, args=(hg, "hg_gpu"))
    p.start()
    p.join()


# TODO: Test calling shared_memory with Blocks (a subclass of HeteroGraph)
if __name__ == "__main__":
    test_single_process(F.int64)
    test_multi_process(F.int32)
    test_copy_from_gpu()