"vscode:/vscode.git/clone" did not exist on "3b04cdc8160700c75f9b5575444b4b37160275f3"
test_shared_mem.py 3.42 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import networkx as nx
import scipy.sparse as ssp
import dgl
import dgl.contrib as contrib
from dgl.frame import Frame, FrameRef, Column
from dgl.graph_index import create_graph_index
from dgl.utils import toindex
import backend as F
import dgl.function as fn
import pickle
import io
import unittest
from utils import parametrize_dtype
import multiprocessing as mp
import os

def create_test_graph(idtype):
    plays_spmat = ssp.coo_matrix(([1, 1, 1, 1], ([0, 1, 2, 1], [0, 0, 1, 1])))
    wishes_nx = nx.DiGraph()
    wishes_nx.add_nodes_from(['u0', 'u1', 'u2'], bipartite=0)
    wishes_nx.add_nodes_from(['g0', 'g1'], bipartite=1)
    wishes_nx.add_edge('u0', 'g1', id=0)
    wishes_nx.add_edge('u2', 'g0', id=1)

    follows_g = dgl.graph([(0, 1), (1, 2)], 'user', 'follows', idtype=idtype)
    plays_g = dgl.bipartite(plays_spmat, 'user', 'plays', 'game', idtype=idtype)
    wishes_g = dgl.bipartite(wishes_nx, 'user', 'wishes', 'game', idtype=idtype)
    develops_g = dgl.bipartite([(0, 0), (1, 1)], 'developer', 'develops', 'game', idtype=idtype)
    g = dgl.hetero_from_relations([follows_g, plays_g, wishes_g, develops_g])
    return g

def _assert_is_identical_hetero(g, g2):
    assert g.is_readonly == g2.is_readonly
    assert g.ntypes == g2.ntypes
    assert g.canonical_etypes == g2.canonical_etypes

    # check if two metagraphs are identical
    for edges, features in g.metagraph.edges(keys=True).items():
        assert g2.metagraph.edges(keys=True)[edges] == features

    # check if node ID spaces and feature spaces are equal
    for ntype in g.ntypes:
        assert g.number_of_nodes(ntype) == g2.number_of_nodes(ntype)

    # check if edge ID spaces and feature spaces are equal
    for etype in g.canonical_etypes:
        src, dst = g.all_edges(etype=etype, order='eid')
        src2, dst2 = g2.all_edges(etype=etype, order='eid')
        assert F.array_equal(src, src2)
        assert F.array_equal(dst, dst2)

@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@parametrize_dtype
def test_single_process(idtype):
    hg = create_test_graph(idtype=idtype)
    hg_share = hg.shared_memory("hg")
    hg_rebuild = dgl.hetero_from_shared_memory('hg')
    hg_save_again = hg_rebuild.shared_memory("hg")
    _assert_is_identical_hetero(hg, hg_share)
    _assert_is_identical_hetero(hg, hg_rebuild)
    _assert_is_identical_hetero(hg, hg_save_again)

def sub_proc(hg_origin, name):
    hg_rebuild = dgl.hetero_from_shared_memory(name)
    hg_save_again = hg_rebuild.shared_memory(name)
    _assert_is_identical_hetero(hg_origin, hg_rebuild)
    _assert_is_identical_hetero(hg_origin, hg_save_again)

@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@parametrize_dtype
def test_multi_process(idtype):
    hg = create_test_graph(idtype=idtype)
    hg_share = hg.shared_memory("hg1")
    p = mp.Process(target=sub_proc, args=(hg, "hg1"))
    p.start()
    p.join()

@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F._default_context_str == 'cpu', reason="Need gpu for this test")
def test_copy_from_gpu():
    hg = create_test_graph(idtype=F.int32)
    hg_gpu = hg.to(F.cuda())
    hg_share = hg_gpu.shared_memory("hg_gpu")
    p = mp.Process(target=sub_proc, args=(hg, "hg_gpu"))
    p.start()
    p.join()

# TODO: Test calling shared_memory with Blocks (a subclass of HeteroGraph)
if __name__ == "__main__":
    test_single_process(F.int64)
    test_multi_process(F.int32)
    test_copy_from_gpu()