"...git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "e51f19aee82c8dd874b715a09dbc521d88835d68"
Unverified Commit 89a4cc4d authored by Hongzhi (Steve), Chen's avatar Hongzhi (Steve), Chen Committed by GitHub
Browse files

[Misc] Black auto fix. (#4694)


Co-authored-by: default avatarSteve <ubuntu@ip-172-31-34-29.ap-northeast-1.compute.internal>
parent 303b150f
import dgl
import numpy as np
import unittest
import backend as F
import networkx as nx
import unittest
import numpy as np
import pytest
from test_utils.graph_cases import get_cases
from test_utils import parametrize_idtype
from test_utils.graph_cases import get_cases
import dgl
@parametrize_idtype
def test_sum_case1(idtype):
    """Check dgl.sum_nodes on a single graph, a batched graph, and weighted sums."""
    # NOTE: If you want to update this test case, remember to update the docstring
    # example too!!!
    g1 = dgl.graph(([0, 1], [1, 0]), idtype=idtype, device=F.ctx())
    g1.ndata["h"] = F.tensor([1.0, 2.0])
    g2 = dgl.graph(([0, 1], [1, 2]), idtype=idtype, device=F.ctx())
    g2.ndata["h"] = F.tensor([1.0, 2.0, 3.0])
    bg = dgl.batch([g1, g2])
    bg.ndata["w"] = F.tensor([0.1, 0.2, 0.1, 0.5, 0.2])
    # Unweighted sum over a single graph and over each graph in the batch.
    assert F.allclose(F.tensor([3.0]), dgl.sum_nodes(g1, "h"))
    assert F.allclose(F.tensor([3.0, 6.0]), dgl.sum_nodes(bg, "h"))
    # Weighted sum: per-node feature "h" scaled by per-node weight "w".
    assert F.allclose(F.tensor([0.5, 1.7]), dgl.sum_nodes(bg, "h", "w"))
@parametrize_idtype
@pytest.mark.parametrize("g", get_cases(["homo"], exclude=["dglgraph"]))
@pytest.mark.parametrize("reducer", ["sum", "max", "mean"])
def test_reduce_readout(g, idtype, reducer):
    """Readout on a batched graph must equal concatenated per-subgraph readouts."""
    g = g.astype(idtype).to(F.ctx())
    g.ndata["h"] = F.randn((g.number_of_nodes(), 3))
    g.edata["h"] = F.randn((g.number_of_edges(), 2))

    # Test.1: node readout
    x = dgl.readout_nodes(g, "h", op=reducer)
    # check correctness against per-subgraph readouts
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        subx.append(dgl.readout_nodes(sg, "h", op=reducer))
    assert F.allclose(x, F.cat(subx, dim=0))

    # The convenience aliases dgl.{sum,max,mean}_nodes must agree.
    x = getattr(dgl, "{}_nodes".format(reducer))(g, "h")
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        subx.append(getattr(dgl, "{}_nodes".format(reducer))(sg, "h"))
    assert F.allclose(x, F.cat(subx, dim=0))

    # Test.2: edge readout
    x = dgl.readout_edges(g, "h", op=reducer)
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        subx.append(dgl.readout_edges(sg, "h", op=reducer))
    assert F.allclose(x, F.cat(subx, dim=0))

    # The convenience aliases dgl.{sum,max,mean}_edges must agree.
    x = getattr(dgl, "{}_edges".format(reducer))(g, "h")
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        subx.append(getattr(dgl, "{}_edges".format(reducer))(sg, "h"))
    assert F.allclose(x, F.cat(subx, dim=0))
@parametrize_idtype
@pytest.mark.parametrize("g", get_cases(["homo"], exclude=["dglgraph"]))
@pytest.mark.parametrize("reducer", ["sum", "max", "mean"])
def test_weighted_reduce_readout(g, idtype, reducer):
    """Weighted readout on a batched graph must equal per-subgraph readouts."""
    g = g.astype(idtype).to(F.ctx())
    g.ndata["h"] = F.randn((g.number_of_nodes(), 3))
    g.ndata["w"] = F.randn((g.number_of_nodes(), 1))
    g.edata["h"] = F.randn((g.number_of_edges(), 2))
    g.edata["w"] = F.randn((g.number_of_edges(), 1))

    # Test.1: node readout, feature "h" weighted by "w"
    x = dgl.readout_nodes(g, "h", "w", op=reducer)
    # check correctness against per-subgraph readouts
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        subx.append(dgl.readout_nodes(sg, "h", "w", op=reducer))
    assert F.allclose(x, F.cat(subx, dim=0))

    # The convenience aliases dgl.{sum,max,mean}_nodes must agree.
    x = getattr(dgl, "{}_nodes".format(reducer))(g, "h", "w")
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        subx.append(getattr(dgl, "{}_nodes".format(reducer))(sg, "h", "w"))
    assert F.allclose(x, F.cat(subx, dim=0))

    # Test.2: edge readout, feature "h" weighted by "w"
    x = dgl.readout_edges(g, "h", "w", op=reducer)
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        subx.append(dgl.readout_edges(sg, "h", "w", op=reducer))
    assert F.allclose(x, F.cat(subx, dim=0))

    # The convenience aliases dgl.{sum,max,mean}_edges must agree.
    x = getattr(dgl, "{}_edges".format(reducer))(g, "h", "w")
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        subx.append(getattr(dgl, "{}_edges".format(reducer))(sg, "h", "w"))
    assert F.allclose(x, F.cat(subx, dim=0))
@parametrize_idtype
@pytest.mark.parametrize('g', get_cases(['homo'], exclude=['dglgraph']))
@pytest.mark.parametrize('descending', [True, False])
@pytest.mark.parametrize("g", get_cases(["homo"], exclude=["dglgraph"]))
@pytest.mark.parametrize("descending", [True, False])
def test_topk(g, idtype, descending):
g = g.astype(idtype).to(F.ctx())
g.ndata['x'] = F.randn((g.number_of_nodes(), 3))
g.ndata["x"] = F.randn((g.number_of_nodes(), 3))
# Test.1: to test the case where k > number of nodes.
dgl.topk_nodes(g, 'x', 100, sortby=-1)
dgl.topk_nodes(g, "x", 100, sortby=-1)
# Test.2: test correctness
min_nnodes = F.asnumpy(g.batch_num_nodes()).min()
if min_nnodes <= 1:
return
k = min_nnodes - 1
val, indices = dgl.topk_nodes(g, 'x', k, descending=descending, sortby=-1)
val, indices = dgl.topk_nodes(g, "x", k, descending=descending, sortby=-1)
print(k)
print(g.ndata['x'])
print('val', val)
print('indices', indices)
print(g.ndata["x"])
print("val", val)
print("indices", indices)
subg = dgl.unbatch(g)
subval, subidx = [], []
for sg in subg:
subx = F.asnumpy(sg.ndata['x'])
ai = np.argsort(subx[:,-1:].flatten())
subx = F.asnumpy(sg.ndata["x"])
ai = np.argsort(subx[:, -1:].flatten())
if descending:
ai = np.ascontiguousarray(ai[::-1])
subx = np.expand_dims(subx[ai[:k]], 0)
......@@ -150,28 +156,28 @@ def test_topk(g, idtype, descending):
assert F.allclose(indices, F.cat(subidx, dim=0))
# Test.3: sorby=None
dgl.topk_nodes(g, 'x', k, sortby=None)
dgl.topk_nodes(g, "x", k, sortby=None)
g.edata['x'] = F.randn((g.number_of_edges(), 3))
g.edata["x"] = F.randn((g.number_of_edges(), 3))
# Test.4: topk edges where k > number of edges.
dgl.topk_edges(g, 'x', 100, sortby=-1)
dgl.topk_edges(g, "x", 100, sortby=-1)
# Test.5: topk edges test correctness
min_nedges = F.asnumpy(g.batch_num_edges()).min()
if min_nedges <= 1:
return
k = min_nedges - 1
val, indices = dgl.topk_edges(g, 'x', k, descending=descending, sortby=-1)
val, indices = dgl.topk_edges(g, "x", k, descending=descending, sortby=-1)
print(k)
print(g.edata['x'])
print('val', val)
print('indices', indices)
print(g.edata["x"])
print("val", val)
print("indices", indices)
subg = dgl.unbatch(g)
subval, subidx = [], []
for sg in subg:
subx = F.asnumpy(sg.edata['x'])
ai = np.argsort(subx[:,-1:].flatten())
subx = F.asnumpy(sg.edata["x"])
ai = np.argsort(subx[:, -1:].flatten())
if descending:
ai = np.ascontiguousarray(ai[::-1])
subx = np.expand_dims(subx[ai[:k]], 0)
......@@ -181,45 +187,51 @@ def test_topk(g, idtype, descending):
assert F.allclose(val, F.cat(subval, dim=0))
assert F.allclose(indices, F.cat(subidx, dim=0))
@parametrize_idtype
@pytest.mark.parametrize("g", get_cases(["homo"], exclude=["dglgraph"]))
def test_softmax(g, idtype):
    """softmax_nodes/edges on a batched graph must equal per-subgraph softmax."""
    g = g.astype(idtype).to(F.ctx())
    g.ndata["h"] = F.randn((g.number_of_nodes(), 3))
    g.edata["h"] = F.randn((g.number_of_edges(), 2))

    # Test.1: node readout
    x = dgl.softmax_nodes(g, "h")
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        # softmax over dim=0 normalizes within each subgraph independently
        subx.append(F.softmax(sg.ndata["h"], dim=0))
    assert F.allclose(x, F.cat(subx, dim=0))

    # Test.2: edge readout
    x = dgl.softmax_edges(g, "h")
    subg = dgl.unbatch(g)
    subx = []
    for sg in subg:
        subx.append(F.softmax(sg.edata["h"], dim=0))
    assert F.allclose(x, F.cat(subx, dim=0))
@parametrize_idtype
@pytest.mark.parametrize("g", get_cases(["homo"], exclude=["dglgraph"]))
def test_broadcast(idtype, g):
    """broadcast_nodes/edges must copy each graph-level feature row to every
    node/edge of the corresponding graph in the batch."""
    g = g.astype(idtype).to(F.ctx())
    gfeat = F.randn((g.batch_size, 3))

    # Test.0: broadcast_nodes
    g.ndata["h"] = dgl.broadcast_nodes(g, gfeat)
    subg = dgl.unbatch(g)
    for i, sg in enumerate(subg):
        assert F.allclose(
            sg.ndata["h"],
            F.repeat(F.reshape(gfeat[i], (1, 3)), sg.number_of_nodes(), dim=0),
        )

    # Test.1: broadcast_edges
    g.edata["h"] = dgl.broadcast_edges(g, gfeat)
    subg = dgl.unbatch(g)
    for i, sg in enumerate(subg):
        assert F.allclose(
            sg.edata["h"],
            F.repeat(F.reshape(gfeat[i], (1, 3)), sg.number_of_edges(), dim=0),
        )
import backend as F
import numpy as np
import dgl
from test_utils import parametrize_idtype
import dgl
@parametrize_idtype
def test_node_removal(idtype):
g = dgl.DGLGraph()
......@@ -10,27 +12,30 @@ def test_node_removal(idtype):
g.add_nodes(10)
g.add_edge(0, 0)
assert g.number_of_nodes() == 10
g.ndata['id'] = F.arange(0, 10)
g.ndata["id"] = F.arange(0, 10)
# remove nodes
g.remove_nodes(range(4, 7))
assert g.number_of_nodes() == 7
assert F.array_equal(g.ndata['id'], F.tensor([0, 1, 2, 3, 7, 8, 9]))
assert F.array_equal(g.ndata["id"], F.tensor([0, 1, 2, 3, 7, 8, 9]))
assert dgl.NID not in g.ndata
assert dgl.EID not in g.edata
# add nodes
g.add_nodes(3)
assert g.number_of_nodes() == 10
assert F.array_equal(g.ndata['id'], F.tensor([0, 1, 2, 3, 7, 8, 9, 0, 0, 0]))
assert F.array_equal(
g.ndata["id"], F.tensor([0, 1, 2, 3, 7, 8, 9, 0, 0, 0])
)
# remove nodes
g.remove_nodes(range(1, 4), store_ids=True)
assert g.number_of_nodes() == 7
assert F.array_equal(g.ndata['id'], F.tensor([0, 7, 8, 9, 0, 0, 0]))
assert F.array_equal(g.ndata["id"], F.tensor([0, 7, 8, 9, 0, 0, 0]))
assert dgl.NID in g.ndata
assert dgl.EID in g.edata
@parametrize_idtype
def test_multigraph_node_removal(idtype):
g = dgl.DGLGraph()
......@@ -59,6 +64,7 @@ def test_multigraph_node_removal(idtype):
assert g.number_of_nodes() == 3
assert g.number_of_edges() == 6
@parametrize_idtype
def test_multigraph_edge_removal(idtype):
g = dgl.DGLGraph()
......@@ -86,6 +92,7 @@ def test_multigraph_edge_removal(idtype):
assert g.number_of_nodes() == 5
assert g.number_of_edges() == 8
@parametrize_idtype
def test_edge_removal(idtype):
g = dgl.DGLGraph()
......@@ -94,13 +101,15 @@ def test_edge_removal(idtype):
for i in range(5):
for j in range(5):
g.add_edge(i, j)
g.edata['id'] = F.arange(0, 25)
g.edata["id"] = F.arange(0, 25)
# remove edges
g.remove_edges(range(13, 20))
assert g.number_of_nodes() == 5
assert g.number_of_edges() == 18
assert F.array_equal(g.edata['id'], F.tensor(list(range(13)) + list(range(20, 25))))
assert F.array_equal(
g.edata["id"], F.tensor(list(range(13)) + list(range(20, 25)))
)
assert dgl.NID not in g.ndata
assert dgl.EID not in g.edata
......@@ -108,15 +117,20 @@ def test_edge_removal(idtype):
g.add_edge(3, 3)
assert g.number_of_nodes() == 5
assert g.number_of_edges() == 19
assert F.array_equal(g.edata['id'], F.tensor(list(range(13)) + list(range(20, 25)) + [0]))
assert F.array_equal(
g.edata["id"], F.tensor(list(range(13)) + list(range(20, 25)) + [0])
)
# remove edges
g.remove_edges(range(2, 10), store_ids=True)
assert g.number_of_nodes() == 5
assert g.number_of_edges() == 11
assert F.array_equal(g.edata['id'], F.tensor([0, 1, 10, 11, 12, 20, 21, 22, 23, 24, 0]))
assert F.array_equal(
g.edata["id"], F.tensor([0, 1, 10, 11, 12, 20, 21, 22, 23, 24, 0])
)
assert dgl.EID in g.edata
@parametrize_idtype
def test_node_and_edge_removal(idtype):
g = dgl.DGLGraph()
......@@ -125,7 +139,7 @@ def test_node_and_edge_removal(idtype):
for i in range(10):
for j in range(10):
g.add_edge(i, j)
g.edata['id'] = F.arange(0, 100)
g.edata["id"] = F.arange(0, 100)
assert g.number_of_nodes() == 10
assert g.number_of_edges() == 100
......@@ -156,6 +170,7 @@ def test_node_and_edge_removal(idtype):
assert g.number_of_nodes() == 10
assert g.number_of_edges() == 48
@parametrize_idtype
def test_node_frame(idtype):
g = dgl.DGLGraph()
......@@ -163,11 +178,12 @@ def test_node_frame(idtype):
g.add_nodes(10)
data = np.random.rand(10, 3)
new_data = data.take([0, 1, 2, 7, 8, 9], axis=0)
g.ndata['h'] = F.tensor(data)
g.ndata["h"] = F.tensor(data)
# remove nodes
g.remove_nodes(range(3, 7))
assert F.allclose(g.ndata['h'], F.tensor(new_data))
assert F.allclose(g.ndata["h"], F.tensor(new_data))
@parametrize_idtype
def test_edge_frame(idtype):
......@@ -177,11 +193,12 @@ def test_edge_frame(idtype):
g.add_edges(list(range(10)), list(range(1, 10)) + [0])
data = np.random.rand(10, 3)
new_data = data.take([0, 1, 2, 7, 8, 9], axis=0)
g.edata['h'] = F.tensor(data)
g.edata["h"] = F.tensor(data)
# remove edges
g.remove_edges(range(3, 7))
assert F.allclose(g.edata['h'], F.tensor(new_data))
assert F.allclose(g.edata["h"], F.tensor(new_data))
@parametrize_idtype
def test_issue1287(idtype):
......@@ -192,20 +209,21 @@ def test_issue1287(idtype):
g.add_nodes(5)
g.add_edges([0, 2, 3, 1, 1], [1, 0, 3, 1, 0])
g.remove_nodes([0, 1])
g.ndata['h'] = F.randn((g.number_of_nodes(), 3))
g.edata['h'] = F.randn((g.number_of_edges(), 2))
g.ndata["h"] = F.randn((g.number_of_nodes(), 3))
g.edata["h"] = F.randn((g.number_of_edges(), 2))
# remove edges
# remove edges
g = dgl.DGLGraph()
g = g.astype(idtype).to(F.ctx())
g.add_nodes(5)
g.add_edges([0, 2, 3, 1, 1], [1, 0, 3, 1, 0])
g.remove_edges([0, 1])
g = g.to(F.ctx())
g.ndata['h'] = F.randn((g.number_of_nodes(), 3))
g.edata['h'] = F.randn((g.number_of_edges(), 2))
g.ndata["h"] = F.randn((g.number_of_nodes(), 3))
g.edata["h"] = F.randn((g.number_of_edges(), 2))
if __name__ == '__main__':
if __name__ == "__main__":
test_node_removal()
test_edge_removal()
test_multigraph_node_removal()
......
import os
import tempfile
import time
import unittest
import backend as F
import numpy as np
import scipy as sp
import time
import tempfile
import os
import pytest
import unittest
import scipy as sp
from dgl import DGLGraph
import dgl
import dgl.ndarray as nd
from dgl.data.utils import load_labels, save_tensors, load_tensors
from dgl import DGLGraph
from dgl.data.utils import load_labels, load_tensors, save_tensors
np.random.seed(44)
def generate_rand_graph(n, is_hetero):
arr = (sp.sparse.random(n, n, density=0.1,
format='coo') != 0).astype(np.int64)
arr = (sp.sparse.random(n, n, density=0.1, format="coo") != 0).astype(
np.int64
)
if is_hetero:
return dgl.from_scipy(arr)
else:
......@@ -28,15 +30,15 @@ def construct_graph(n, is_hetero):
g_list = []
for i in range(n):
g = generate_rand_graph(30, is_hetero)
g.edata['e1'] = F.randn((g.number_of_edges(), 32))
g.edata['e2'] = F.ones((g.number_of_edges(), 32))
g.ndata['n1'] = F.randn((g.number_of_nodes(), 64))
g.edata["e1"] = F.randn((g.number_of_edges(), 32))
g.edata["e2"] = F.ones((g.number_of_edges(), 32))
g.ndata["n1"] = F.randn((g.number_of_nodes(), 64))
g_list.append(g)
return g_list
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@pytest.mark.parametrize('is_hetero', [True, False])
@unittest.skipIf(F._default_context_str == "gpu", reason="GPU not implemented")
@pytest.mark.parametrize("is_hetero", [True, False])
def test_graph_serialize_with_feature(is_hetero):
num_graphs = 100
......@@ -66,19 +68,19 @@ def test_graph_serialize_with_feature(is_hetero):
assert F.allclose(load_g.nodes(), g_list[idx].nodes())
load_edges = load_g.all_edges('uv', 'eid')
g_edges = g_list[idx].all_edges('uv', 'eid')
load_edges = load_g.all_edges("uv", "eid")
g_edges = g_list[idx].all_edges("uv", "eid")
assert F.allclose(load_edges[0], g_edges[0])
assert F.allclose(load_edges[1], g_edges[1])
assert F.allclose(load_g.edata['e1'], g_list[idx].edata['e1'])
assert F.allclose(load_g.edata['e2'], g_list[idx].edata['e2'])
assert F.allclose(load_g.ndata['n1'], g_list[idx].ndata['n1'])
assert F.allclose(load_g.edata["e1"], g_list[idx].edata["e1"])
assert F.allclose(load_g.edata["e2"], g_list[idx].edata["e2"])
assert F.allclose(load_g.ndata["n1"], g_list[idx].ndata["n1"])
os.unlink(path)
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@pytest.mark.parametrize('is_hetero', [True, False])
@unittest.skipIf(F._default_context_str == "gpu", reason="GPU not implemented")
@pytest.mark.parametrize("is_hetero", [True, False])
def test_graph_serialize_without_feature(is_hetero):
num_graphs = 100
g_list = [generate_rand_graph(30, is_hetero) for _ in range(num_graphs)]
......@@ -98,15 +100,16 @@ def test_graph_serialize_without_feature(is_hetero):
assert F.allclose(load_g.nodes(), g_list[idx].nodes())
load_edges = load_g.all_edges('uv', 'eid')
g_edges = g_list[idx].all_edges('uv', 'eid')
load_edges = load_g.all_edges("uv", "eid")
g_edges = g_list[idx].all_edges("uv", "eid")
assert F.allclose(load_edges[0], g_edges[0])
assert F.allclose(load_edges[1], g_edges[1])
os.unlink(path)
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@pytest.mark.parametrize('is_hetero', [True, False])
@unittest.skipIf(F._default_context_str == "gpu", reason="GPU not implemented")
@pytest.mark.parametrize("is_hetero", [True, False])
def test_graph_serialize_with_labels(is_hetero):
num_graphs = 100
g_list = [generate_rand_graph(30, is_hetero) for _ in range(num_graphs)]
......@@ -122,16 +125,16 @@ def test_graph_serialize_with_labels(is_hetero):
idx_list = np.random.permutation(np.arange(num_graphs)).tolist()
loadg_list, l_labels0 = dgl.load_graphs(path, idx_list)
l_labels = load_labels(path)
assert F.allclose(l_labels['label'], labels['label'])
assert F.allclose(l_labels0['label'], labels['label'])
assert F.allclose(l_labels["label"], labels["label"])
assert F.allclose(l_labels0["label"], labels["label"])
idx = idx_list[0]
load_g = loadg_list[0]
assert F.allclose(load_g.nodes(), g_list[idx].nodes())
load_edges = load_g.all_edges('uv', 'eid')
g_edges = g_list[idx].all_edges('uv', 'eid')
load_edges = load_g.all_edges("uv", "eid")
g_edges = g_list[idx].all_edges("uv", "eid")
assert F.allclose(load_edges[0], g_edges[0])
assert F.allclose(load_edges[1], g_edges[1])
......@@ -144,8 +147,10 @@ def test_serialize_tensors():
path = f.name
f.close()
tensor_dict = {"a": F.tensor(
[1, 3, -1, 0], dtype=F.int64), "1@1": F.tensor([1.5, 2], dtype=F.float32)}
tensor_dict = {
"a": F.tensor([1, 3, -1, 0], dtype=F.int64),
"1@1": F.tensor([1.5, 2], dtype=F.float32),
}
save_tensors(path, tensor_dict)
......@@ -154,7 +159,8 @@ def test_serialize_tensors():
for key in tensor_dict:
assert key in load_tensor_dict
assert np.array_equal(
F.asnumpy(load_tensor_dict[key]), F.asnumpy(tensor_dict[key]))
F.asnumpy(load_tensor_dict[key]), F.asnumpy(tensor_dict[key])
)
load_nd_dict = load_tensors(path, return_dgl_ndarray=True)
......@@ -162,7 +168,8 @@ def test_serialize_tensors():
assert key in load_nd_dict
assert isinstance(load_nd_dict[key], nd.NDArray)
assert np.array_equal(
load_nd_dict[key].asnumpy(), F.asnumpy(tensor_dict[key]))
load_nd_dict[key].asnumpy(), F.asnumpy(tensor_dict[key])
)
os.unlink(path)
......@@ -185,103 +192,120 @@ def test_serialize_empty_dict():
def test_load_old_files1():
    """Graphs saved by an older DGL version (data/1.bin) must load with
    edges and node/edge features identical to the reference arrays in data/1.npy."""
    loadg_list, _ = dgl.load_graphs(
        os.path.join(os.path.dirname(__file__), "data/1.bin")
    )
    idx, num_nodes, edge0, edge1, edata_e1, edata_e2, ndata_n1 = np.load(
        os.path.join(os.path.dirname(__file__), "data/1.npy"), allow_pickle=True
    )
    load_g = loadg_list[idx]
    load_edges = load_g.all_edges("uv", "eid")

    assert np.allclose(F.asnumpy(load_edges[0]), edge0)
    assert np.allclose(F.asnumpy(load_edges[1]), edge1)
    assert np.allclose(F.asnumpy(load_g.edata["e1"]), edata_e1)
    assert np.allclose(F.asnumpy(load_g.edata["e2"]), edata_e2)
    assert np.allclose(F.asnumpy(load_g.ndata["n1"]), ndata_n1)
def test_load_old_files2():
    """Graphs + labels saved by an older DGL version (data/2.bin) must load
    with edges and labels matching the reference arrays in data/2.npy."""
    loadg_list, labels0 = dgl.load_graphs(
        os.path.join(os.path.dirname(__file__), "data/2.bin")
    )
    labels1 = load_labels(os.path.join(os.path.dirname(__file__), "data/2.bin"))
    idx, edges0, edges1, np_labels = np.load(
        os.path.join(os.path.dirname(__file__), "data/2.npy"), allow_pickle=True
    )
    # Labels must match whether obtained via load_graphs or load_labels.
    assert np.allclose(F.asnumpy(labels0["label"]), np_labels)
    assert np.allclose(F.asnumpy(labels1["label"]), np_labels)

    load_g = loadg_list[idx]
    print(load_g)
    load_edges = load_g.all_edges("uv", "eid")
    assert np.allclose(F.asnumpy(load_edges[0]), edges0)
    assert np.allclose(F.asnumpy(load_edges[1]), edges1)
def create_heterographs(idtype):
    """Build serialization fixtures: return [combined graph, follows-only
    graph, knows-only graph], all sharing the same node/edge features."""
    g_x = dgl.heterograph(
        {("user", "follows", "user"): ([0, 1, 2], [1, 2, 3])}, idtype=idtype
    )
    # g_y is restricted to CSR format to exercise format round-tripping.
    g_y = dgl.heterograph(
        {("user", "knows", "user"): ([0, 2], [2, 3])}, idtype=idtype
    ).formats("csr")
    g_x.ndata["h"] = F.randn((4, 3))
    g_x.edata["w"] = F.randn((3, 2))
    g_y.ndata["hh"] = F.ones((4, 5))
    g_y.edata["ww"] = F.randn((2, 10))
    # Combined graph carrying both relations and all features.
    g = dgl.heterograph(
        {
            ("user", "follows", "user"): ([0, 1, 2], [1, 2, 3]),
            ("user", "knows", "user"): ([0, 2], [2, 3]),
        },
        idtype=idtype,
    )
    g.nodes["user"].data["h"] = g_x.ndata["h"]
    g.nodes["user"].data["hh"] = g_y.ndata["hh"]
    g.edges["follows"].data["w"] = g_x.edata["w"]
    g.edges["knows"].data["ww"] = g_y.edata["ww"]
    return [g, g_x, g_y]
def create_heterographs2(idtype):
    """Build serialization fixtures with an extra node type: return
    [combined graph, follows-only, knows-only, user->knowledge graph]."""
    g_x = dgl.heterograph(
        {("user", "follows", "user"): ([0, 1, 2], [1, 2, 3])}, idtype=idtype
    )
    # g_y is restricted to CSR format to exercise format round-tripping.
    g_y = dgl.heterograph(
        {("user", "knows", "user"): ([0, 2], [2, 3])}, idtype=idtype
    ).formats("csr")
    # Same etype name "knows" but a different destination node type.
    g_z = dgl.heterograph(
        {("user", "knows", "knowledge"): ([0, 1, 3], [2, 3, 4])}, idtype=idtype
    )
    g_x.ndata["h"] = F.randn((4, 3))
    g_x.edata["w"] = F.randn((3, 2))
    g_y.ndata["hh"] = F.ones((4, 5))
    g_y.edata["ww"] = F.randn((2, 10))
    g = dgl.heterograph(
        {
            ("user", "follows", "user"): ([0, 1, 2], [1, 2, 3]),
            ("user", "knows", "user"): ([0, 2], [2, 3]),
            ("user", "knows", "knowledge"): ([0, 1, 3], [2, 3, 4]),
        },
        idtype=idtype,
    )
    g.nodes["user"].data["h"] = g_x.ndata["h"]
    g.edges["follows"].data["w"] = g_x.edata["w"]
    g.nodes["user"].data["hh"] = g_y.ndata["hh"]
    # "knows" is ambiguous here, so address the edge type canonically.
    g.edges[("user", "knows", "user")].data["ww"] = g_y.edata["ww"]
    return [g, g_x, g_y, g_z]
def test_deserialize_old_heterograph_file():
    """Heterographs saved by an older DGL version (data/hetero1.bin) must
    deserialize with correct idtypes, features, edges, and labels."""
    path = os.path.join(os.path.dirname(__file__), "data/hetero1.bin")
    g_list, label_dict = dgl.load_graphs(path)
    # The fixture stores int64 graphs followed by int32 graphs.
    assert g_list[0].idtype == F.int64
    assert g_list[3].idtype == F.int32
    assert np.allclose(
        F.asnumpy(g_list[2].nodes["user"].data["hh"]), np.ones((4, 5))
    )
    assert np.allclose(
        F.asnumpy(g_list[5].nodes["user"].data["hh"]), np.ones((4, 5))
    )
    edges = g_list[0]["follows"].edges()
    assert np.allclose(F.asnumpy(edges[0]), np.array([0, 1, 2]))
    assert np.allclose(F.asnumpy(edges[1]), np.array([1, 2, 3]))
    assert F.allclose(label_dict["graph_label"], F.ones(54))
def create_old_heterograph_files():
    """Regenerate the data/hetero1.bin fixture used by the deserialization
    tests above (run manually; not invoked during normal test runs)."""
    path = os.path.join(os.path.dirname(__file__), "data/hetero1.bin")
    g_list0 = create_heterographs(F.int64) + create_heterographs(F.int32)
    labels_dict = {"graph_label": F.ones(54)}
    dgl.save_graphs(path, g_list0, labels_dict)
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@unittest.skipIf(F._default_context_str == "gpu", reason="GPU not implemented")
def test_serialize_heterograph():
f = tempfile.NamedTemporaryFile(delete=False)
path = f.name
......@@ -295,15 +319,17 @@ def test_serialize_heterograph():
for i in range(len(g_list0)):
for j, etypes in enumerate(g_list0[i].canonical_etypes):
assert g_list[i].canonical_etypes[j] == etypes
#assert g_list[1].restrict_format() == 'any'
#assert g_list[2].restrict_format() == 'csr'
# assert g_list[1].restrict_format() == 'any'
# assert g_list[2].restrict_format() == 'csr'
assert g_list[4].idtype == F.int32
assert np.allclose(
F.asnumpy(g_list[2].nodes['user'].data['hh']), np.ones((4, 5)))
F.asnumpy(g_list[2].nodes["user"].data["hh"]), np.ones((4, 5))
)
assert np.allclose(
F.asnumpy(g_list[6].nodes['user'].data['hh']), np.ones((4, 5)))
edges = g_list[0]['follows'].edges()
F.asnumpy(g_list[6].nodes["user"].data["hh"]), np.ones((4, 5))
)
edges = g_list[0]["follows"].edges()
assert np.allclose(F.asnumpy(edges[0]), np.array([0, 1, 2]))
assert np.allclose(F.asnumpy(edges[1]), np.array([1, 2, 3]))
for i in range(len(g_list)):
......@@ -311,12 +337,13 @@ def test_serialize_heterograph():
assert g_list[i].etypes == g_list0[i].etypes
# test set feature after load_graph
g_list[3].nodes['user'].data['test'] = F.tensor([0, 1, 2, 4])
g_list[3].edata['test'] = F.tensor([0, 1, 2])
g_list[3].nodes["user"].data["test"] = F.tensor([0, 1, 2, 4])
g_list[3].edata["test"] = F.tensor([0, 1, 2])
os.unlink(path)
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@unittest.skipIf(F._default_context_str == "gpu", reason="GPU not implemented")
@pytest.mark.skip(reason="lack of permission on CI")
def test_serialize_heterograph_s3():
path = "s3://dglci-data-test/graph2.bin"
......@@ -325,30 +352,31 @@ def test_serialize_heterograph_s3():
g_list = dgl.load_graphs(path, [0, 2, 5])
assert g_list[0].idtype == F.int64
#assert g_list[1].restrict_format() == 'csr'
# assert g_list[1].restrict_format() == 'csr'
assert np.allclose(
F.asnumpy(g_list[1].nodes['user'].data['hh']), np.ones((4, 5)))
F.asnumpy(g_list[1].nodes["user"].data["hh"]), np.ones((4, 5))
)
assert np.allclose(
F.asnumpy(g_list[2].nodes['user'].data['hh']), np.ones((4, 5)))
edges = g_list[0]['follows'].edges()
F.asnumpy(g_list[2].nodes["user"].data["hh"]), np.ones((4, 5))
)
edges = g_list[0]["follows"].edges()
assert np.allclose(F.asnumpy(edges[0]), np.array([0, 1, 2]))
assert np.allclose(F.asnumpy(edges[1]), np.array([1, 2, 3]))
if __name__ == "__main__":
    # Most tests are kept commented out for ad-hoc local debugging;
    # uncomment the ones you want to run directly.
    pass
    # test_graph_serialize_with_feature(True)
    # test_graph_serialize_with_feature(False)
    # test_graph_serialize_without_feature(True)
    # test_graph_serialize_without_feature(False)
    # test_graph_serialize_with_labels(True)
    # test_graph_serialize_with_labels(False)
    # test_serialize_tensors()
    # test_serialize_empty_dict()
    # test_load_old_files1()
    test_load_old_files2()
    # test_serialize_heterograph()
    # test_serialize_heterograph_s3()
    # test_deserialize_old_heterograph_file()
    # create_old_heterograph_files()
import dgl
import dgl.function as fn
from collections import Counter
import numpy as np
import scipy.sparse as ssp
import itertools
import unittest
from collections import Counter
import backend as F
import networkx as nx
import unittest, pytest
from dgl import DGLError
import numpy as np
import pytest
import scipy.sparse as ssp
from test_utils import parametrize_idtype
import dgl
import dgl.function as fn
from dgl import DGLError
def create_test_heterograph(num_nodes, num_adj, idtype):
    """Create a random dgl.graph with `num_nodes` nodes where each node's
    out-degree is drawn from `num_adj` (an int is widened to [n, n+1)).

    NOTE(review): despite the name, this builds a homogeneous graph via
    dgl.graph — confirm callers expect that.
    """
    if isinstance(num_adj, int):
        num_adj = [num_adj, num_adj + 1]
    # Per-node out-degree, sampled uniformly from [num_adj[0], num_adj[1]).
    num_adj_list = list(
        np.random.choice(np.arange(num_adj[0], num_adj[1]), num_nodes)
    )
    src = np.concatenate([[i] * num_adj_list[i] for i in range(num_nodes)])
    # Sample distinct destinations per source (replace=False avoids multi-edges
    # from the same source).
    dst = [
        np.random.choice(num_nodes, nadj, replace=False)
        for nadj in num_adj_list
    ]
    dst = np.concatenate(dst)
    return dgl.graph((src, dst), idtype=idtype)
def check_sort(spm, tag_arr=None, tag_pos=None):
if tag_arr is None:
tag_arr = np.arange(spm.shape[0])
......@@ -37,18 +47,20 @@ def check_sort(spm, tag_arr=None, tag_pos=None):
# `tag_pos_ptr` is the expected tag value. Here we check whether the
# tag value is equal to `tag_pos_ptr`
return False
if tag_arr[dst[j]] > tag_arr[dst[j+1]]:
if tag_arr[dst[j]] > tag_arr[dst[j + 1]]:
# The tag should be in descending order after sorting
return False
if tag_pos is not None and tag_arr[dst[j]] < tag_arr[dst[j+1]]:
if j+1 != int(tag_pos_row[tag_pos_ptr+1]):
if tag_pos is not None and tag_arr[dst[j]] < tag_arr[dst[j + 1]]:
if j + 1 != int(tag_pos_row[tag_pos_ptr + 1]):
# The boundary of tag should be consistent with `tag_pos`
return False
tag_pos_ptr = tag_arr[dst[j+1]]
tag_pos_ptr = tag_arr[dst[j + 1]]
return True
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU sorting by tag not implemented")
@unittest.skipIf(
F._default_context_str == "gpu", reason="GPU sorting by tag not implemented"
)
@parametrize_idtype
def test_sort_with_tag(idtype):
num_nodes, num_adj, num_tags = 200, [20, 50], 5
......@@ -58,42 +70,50 @@ def test_sort_with_tag(idtype):
edge_tag_dst = F.gather_row(tag, F.tensor(dst))
edge_tag_src = F.gather_row(tag, F.tensor(src))
for tag_type in ['node', 'edge']:
for tag_type in ["node", "edge"]:
new_g = dgl.sort_csr_by_tag(
g, tag if tag_type == 'node' else edge_tag_dst, tag_type=tag_type)
old_csr = g.adjacency_matrix(scipy_fmt='csr')
new_csr = new_g.adjacency_matrix(scipy_fmt='csr')
assert(check_sort(new_csr, tag, new_g.dstdata["_TAG_OFFSET"]))
assert(not check_sort(old_csr, tag)) # Check the original csr is not modified.
g, tag if tag_type == "node" else edge_tag_dst, tag_type=tag_type
)
old_csr = g.adjacency_matrix(scipy_fmt="csr")
new_csr = new_g.adjacency_matrix(scipy_fmt="csr")
assert check_sort(new_csr, tag, new_g.dstdata["_TAG_OFFSET"])
assert not check_sort(
old_csr, tag
) # Check the original csr is not modified.
for tag_type in ['node', 'edge']:
for tag_type in ["node", "edge"]:
new_g = dgl.sort_csc_by_tag(
g, tag if tag_type == 'node' else edge_tag_src, tag_type=tag_type)
old_csc = g.adjacency_matrix(transpose=True, scipy_fmt='csr')
new_csc = new_g.adjacency_matrix(transpose=True, scipy_fmt='csr')
assert(check_sort(new_csc, tag, new_g.srcdata["_TAG_OFFSET"]))
assert(not check_sort(old_csc, tag))
g, tag if tag_type == "node" else edge_tag_src, tag_type=tag_type
)
old_csc = g.adjacency_matrix(transpose=True, scipy_fmt="csr")
new_csc = new_g.adjacency_matrix(transpose=True, scipy_fmt="csr")
assert check_sort(new_csc, tag, new_g.srcdata["_TAG_OFFSET"])
assert not check_sort(old_csc, tag)
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU sorting by tag not implemented")
@unittest.skipIf(
F._default_context_str == "gpu", reason="GPU sorting by tag not implemented"
)
@parametrize_idtype
def test_sort_with_tag_bipartite(idtype):
num_nodes, num_adj, num_tags = 200, [20, 50], 5
g = create_test_heterograph(num_nodes, num_adj, idtype=idtype)
g = dgl.heterograph({('_U', '_E', '_V') : g.edges()})
utag = F.tensor(np.random.choice(num_tags, g.number_of_nodes('_U')))
vtag = F.tensor(np.random.choice(num_tags, g.number_of_nodes('_V')))
g = dgl.heterograph({("_U", "_E", "_V"): g.edges()})
utag = F.tensor(np.random.choice(num_tags, g.number_of_nodes("_U")))
vtag = F.tensor(np.random.choice(num_tags, g.number_of_nodes("_V")))
new_g = dgl.sort_csr_by_tag(g, vtag)
old_csr = g.adjacency_matrix(scipy_fmt='csr')
new_csr = new_g.adjacency_matrix(scipy_fmt='csr')
assert(check_sort(new_csr, vtag, new_g.nodes['_U'].data['_TAG_OFFSET']))
assert(not check_sort(old_csr, vtag))
old_csr = g.adjacency_matrix(scipy_fmt="csr")
new_csr = new_g.adjacency_matrix(scipy_fmt="csr")
assert check_sort(new_csr, vtag, new_g.nodes["_U"].data["_TAG_OFFSET"])
assert not check_sort(old_csr, vtag)
new_g = dgl.sort_csc_by_tag(g, utag)
old_csc = g.adjacency_matrix(transpose=True, scipy_fmt='csr')
new_csc = new_g.adjacency_matrix(transpose=True, scipy_fmt='csr')
assert(check_sort(new_csc, utag, new_g.nodes['_V'].data['_TAG_OFFSET']))
assert(not check_sort(old_csc, utag))
old_csc = g.adjacency_matrix(transpose=True, scipy_fmt="csr")
new_csc = new_g.adjacency_matrix(transpose=True, scipy_fmt="csr")
assert check_sort(new_csc, utag, new_g.nodes["_V"].data["_TAG_OFFSET"])
assert not check_sort(old_csc, utag)
if __name__ == "__main__":
test_sort_with_tag(F.int32)
......
from dgl.ops import gspmm, gsddmm, edge_softmax, segment_reduce
from test_utils.graph_cases import get_cases
from test_utils import parametrize_idtype
import dgl
import random
import pytest, unittest
import networkx as nx
import unittest
import backend as F
import numpy as np
import networkx as nx
import numpy as np
import pytest
import torch
from test_utils import parametrize_idtype
from test_utils.graph_cases import get_cases
import dgl
from dgl.ops import edge_softmax, gsddmm, gspmm, segment_reduce
random.seed(42)
np.random.seed(42)
udf_msg = {
'add': lambda edges: {'m': edges.src['x'] + edges.data['w']},
'sub': lambda edges: {'m': edges.src['x'] - edges.data['w']},
'mul': lambda edges: {'m': edges.src['x'] * edges.data['w']},
'div': lambda edges: {'m': edges.src['x'] / edges.data['w']},
'copy_lhs': lambda edges: {'m': edges.src['x']},
'copy_rhs': lambda edges: {'m': edges.data['w']}
"add": lambda edges: {"m": edges.src["x"] + edges.data["w"]},
"sub": lambda edges: {"m": edges.src["x"] - edges.data["w"]},
"mul": lambda edges: {"m": edges.src["x"] * edges.data["w"]},
"div": lambda edges: {"m": edges.src["x"] / edges.data["w"]},
"copy_lhs": lambda edges: {"m": edges.src["x"]},
"copy_rhs": lambda edges: {"m": edges.data["w"]},
}
def select(target, src, edge, dst):
if target == 'u':
if target == "u":
return src
elif target == 'v':
elif target == "v":
return dst
elif target == 'e':
elif target == "e":
return edge
def binary_op(msg, x, y):
if msg == 'add':
if msg == "add":
return x + y
elif msg == 'sub':
elif msg == "sub":
return x - y
elif msg == 'mul':
elif msg == "mul":
return x * y
elif msg == 'div':
elif msg == "div":
return x / y
elif msg == 'dot':
elif msg == "dot":
return F.sum(x * y, -1, keepdims=True)
elif msg == 'copy_lhs':
elif msg == "copy_lhs":
return x
elif msg == 'copy_rhs':
elif msg == "copy_rhs":
return y
def edge_func(lhs_target, rhs_target, msg):
def foo(edges):
return {
'm': binary_op(
"m": binary_op(
msg,
select(lhs_target, edges.src, edges.data, edges.dst)['x'],
select(rhs_target, edges.src, edges.data, edges.dst)['y']
select(lhs_target, edges.src, edges.data, edges.dst)["x"],
select(rhs_target, edges.src, edges.data, edges.dst)["y"],
)
}
return foo
udf_apply_edges = {
lhs_target + '_' + msg + '_' + rhs_target: edge_func(lhs_target, rhs_target, msg)
for lhs_target in ['u', 'v', 'e']
for rhs_target in ['u', 'v', 'e']
for msg in ['add', 'sub', 'mul', 'div', 'dot', 'copy_lhs', 'copy_rhs']
lhs_target
+ "_"
+ msg
+ "_"
+ rhs_target: edge_func(lhs_target, rhs_target, msg)
for lhs_target in ["u", "v", "e"]
for rhs_target in ["u", "v", "e"]
for msg in ["add", "sub", "mul", "div", "dot", "copy_lhs", "copy_rhs"]
}
udf_reduce = {
'sum': lambda nodes: {'v': F.sum(nodes.mailbox['m'], 1)},
'min': lambda nodes: {'v': F.min(nodes.mailbox['m'], 1)},
'max': lambda nodes: {'v': F.max(nodes.mailbox['m'], 1)}
"sum": lambda nodes: {"v": F.sum(nodes.mailbox["m"], 1)},
"min": lambda nodes: {"v": F.min(nodes.mailbox["m"], 1)},
"max": lambda nodes: {"v": F.max(nodes.mailbox["m"], 1)},
}
graphs = [
# dgl.rand_graph(30, 0),
# dgl.rand_graph(30, 0),
dgl.rand_graph(30, 100),
dgl.rand_bipartite('_U', '_E', '_V', 30, 40, 300)
dgl.rand_bipartite("_U", "_E", "_V", 30, 40, 300),
]
spmm_shapes = [
......@@ -81,7 +93,7 @@ spmm_shapes = [
((1,), (3,)),
((3,), (1,)),
((1,), (1,)),
((), ())
((), ()),
]
sddmm_shapes = [
......@@ -89,17 +101,18 @@ sddmm_shapes = [
((5, 3, 1, 7), (1, 3, 7, 7)),
((1, 3, 3), (4, 1, 3)),
((3,), (3,)),
((1,), (1,))
((1,), (1,)),
]
edge_softmax_shapes = [
(1,), (1, 3), (3, 4, 5)
]
edge_softmax_shapes = [(1,), (1, 3), (3, 4, 5)]
@pytest.mark.parametrize('g', graphs)
@pytest.mark.parametrize('shp', spmm_shapes)
@pytest.mark.parametrize('msg', ['add', 'sub', 'mul', 'div', 'copy_lhs', 'copy_rhs'])
@pytest.mark.parametrize('reducer', ['sum', 'min', 'max'])
@pytest.mark.parametrize("g", graphs)
@pytest.mark.parametrize("shp", spmm_shapes)
@pytest.mark.parametrize(
"msg", ["add", "sub", "mul", "div", "copy_lhs", "copy_rhs"]
)
@pytest.mark.parametrize("reducer", ["sum", "min", "max"])
@parametrize_idtype
def test_spmm(idtype, g, shp, msg, reducer):
g = g.astype(idtype).to(F.ctx())
......@@ -108,65 +121,74 @@ def test_spmm(idtype, g, shp, msg, reducer):
hu = F.tensor(np.random.rand(*((g.number_of_src_nodes(),) + shp[0])) + 1)
he = F.tensor(np.random.rand(*((g.number_of_edges(),) + shp[1])) + 1)
print('u shape: {}, e shape: {}'.format(F.shape(hu), F.shape(he)))
print("u shape: {}, e shape: {}".format(F.shape(hu), F.shape(he)))
g.srcdata['x'] = F.attach_grad(F.clone(hu))
g.edata['w'] = F.attach_grad(F.clone(he))
print('SpMM(message func: {}, reduce func: {})'.format(msg, reducer))
g.srcdata["x"] = F.attach_grad(F.clone(hu))
g.edata["w"] = F.attach_grad(F.clone(he))
print("SpMM(message func: {}, reduce func: {})".format(msg, reducer))
u = F.attach_grad(F.clone(hu))
e = F.attach_grad(F.clone(he))
with F.record_grad():
v = gspmm(g, msg, reducer, u, e)
if reducer in ['max', 'min']:
if reducer in ["max", "min"]:
v = F.replace_inf_with_zero(v)
if g.number_of_edges() > 0:
F.backward(F.reduce_sum(v))
if msg != 'copy_rhs':
if msg != "copy_rhs":
grad_u = F.grad(u)
if msg != 'copy_lhs':
if msg != "copy_lhs":
grad_e = F.grad(e)
with F.record_grad():
g.update_all(udf_msg[msg], udf_reduce[reducer])
if g.number_of_edges() > 0:
v1 = g.dstdata['v']
v1 = g.dstdata["v"]
assert F.allclose(v, v1)
print('forward passed')
print("forward passed")
F.backward(F.reduce_sum(v1))
if msg != 'copy_rhs':
if reducer in ['min', 'max']: # there might be some numerical errors
rate = F.reduce_sum(F.abs(F.grad(g.srcdata['x']) - grad_u)) /\
F.reduce_sum(F.abs(grad_u))
if msg != "copy_rhs":
if reducer in [
"min",
"max",
]: # there might be some numerical errors
rate = F.reduce_sum(
F.abs(F.grad(g.srcdata["x"]) - grad_u)
) / F.reduce_sum(F.abs(grad_u))
assert F.as_scalar(rate) < 1e-2, rate
else:
assert F.allclose(F.grad(g.srcdata['x']), grad_u)
if msg != 'copy_lhs':
if reducer in ['min', 'max']:
rate = F.reduce_sum(F.abs(F.grad(g.edata['w']) - grad_e)) /\
F.reduce_sum(F.abs(grad_e))
assert F.allclose(F.grad(g.srcdata["x"]), grad_u)
if msg != "copy_lhs":
if reducer in ["min", "max"]:
rate = F.reduce_sum(
F.abs(F.grad(g.edata["w"]) - grad_e)
) / F.reduce_sum(F.abs(grad_e))
assert F.as_scalar(rate) < 1e-2, rate
else:
assert F.allclose(F.grad(g.edata['w']), grad_e)
print('backward passed')
g.srcdata.pop('x')
g.edata.pop('w')
if 'v' in g.dstdata: g.dstdata.pop('v')
@pytest.mark.parametrize('g', graphs)
@pytest.mark.parametrize('shp', sddmm_shapes)
@pytest.mark.parametrize('lhs_target', ['u', 'v', 'e'])
@pytest.mark.parametrize('rhs_target', ['u', 'v', 'e'])
@pytest.mark.parametrize('msg', ['add', 'sub', 'mul', 'div', 'dot', 'copy_lhs', 'copy_rhs'])
assert F.allclose(F.grad(g.edata["w"]), grad_e)
print("backward passed")
g.srcdata.pop("x")
g.edata.pop("w")
if "v" in g.dstdata:
g.dstdata.pop("v")
@pytest.mark.parametrize("g", graphs)
@pytest.mark.parametrize("shp", sddmm_shapes)
@pytest.mark.parametrize("lhs_target", ["u", "v", "e"])
@pytest.mark.parametrize("rhs_target", ["u", "v", "e"])
@pytest.mark.parametrize(
"msg", ["add", "sub", "mul", "div", "dot", "copy_lhs", "copy_rhs"]
)
@parametrize_idtype
def test_sddmm(g, shp, lhs_target, rhs_target, msg, idtype):
if lhs_target == rhs_target:
return
g = g.astype(idtype).to(F.ctx())
if dgl.backend.backend_name == 'mxnet' and g.number_of_edges() == 0:
pytest.skip() # mxnet do not support zero shape tensor
if dgl.backend.backend_name == "mxnet" and g.number_of_edges() == 0:
pytest.skip() # mxnet do not support zero shape tensor
print(g)
print(g.idtype)
......@@ -174,37 +196,37 @@ def test_sddmm(g, shp, lhs_target, rhs_target, msg, idtype):
lhs_target,
g.number_of_src_nodes(),
g.number_of_edges(),
g.number_of_dst_nodes())
g.number_of_dst_nodes(),
)
lhs_shp = (len_lhs,) + shp[0]
len_rhs = select(
rhs_target,
g.number_of_src_nodes(),
g.number_of_edges(),
g.number_of_dst_nodes())
g.number_of_dst_nodes(),
)
rhs_shp = (len_rhs,) + shp[1]
feat_lhs = F.tensor(np.random.rand(*lhs_shp) + 1)
feat_rhs = F.tensor(np.random.rand(*rhs_shp) + 1)
print('lhs shape: {}, rhs shape: {}'.format(F.shape(feat_lhs), F.shape(feat_rhs)))
lhs_frame = select(
lhs_target,
g.srcdata,
g.edata,
g.dstdata)
rhs_frame = select(
rhs_target,
g.srcdata,
g.edata,
g.dstdata)
lhs_frame['x'] = F.attach_grad(F.clone(feat_lhs))
rhs_frame['y'] = F.attach_grad(F.clone(feat_rhs))
msg_func = lhs_target + '_' + msg + '_' + rhs_target
print('SDDMM(message func: {})'.format(msg_func))
print(
"lhs shape: {}, rhs shape: {}".format(
F.shape(feat_lhs), F.shape(feat_rhs)
)
)
lhs_frame = select(lhs_target, g.srcdata, g.edata, g.dstdata)
rhs_frame = select(rhs_target, g.srcdata, g.edata, g.dstdata)
lhs_frame["x"] = F.attach_grad(F.clone(feat_lhs))
rhs_frame["y"] = F.attach_grad(F.clone(feat_rhs))
msg_func = lhs_target + "_" + msg + "_" + rhs_target
print("SDDMM(message func: {})".format(msg_func))
lhs = F.attach_grad(F.clone(feat_lhs))
rhs = F.attach_grad(F.clone(feat_rhs))
with F.record_grad():
e = gsddmm(g, msg, lhs, rhs, lhs_target=lhs_target, rhs_target=rhs_target)
e = gsddmm(
g, msg, lhs, rhs, lhs_target=lhs_target, rhs_target=rhs_target
)
F.backward(F.reduce_sum(e))
grad_lhs = F.grad(lhs)
grad_rhs = F.grad(rhs)
......@@ -212,24 +234,26 @@ def test_sddmm(g, shp, lhs_target, rhs_target, msg, idtype):
with F.record_grad():
g.apply_edges(udf_apply_edges[msg_func])
if g.number_of_edges() > 0:
e1 = g.edata['m']
e1 = g.edata["m"]
assert F.allclose(e, e1)
print('forward passed')
print("forward passed")
F.backward(F.reduce_sum(e1))
if msg != 'copy_rhs':
assert F.allclose(F.grad(lhs_frame['x']), grad_lhs)
if msg != 'copy_lhs':
assert F.allclose(F.grad(rhs_frame['y']), grad_rhs)
print('backward passed')
lhs_frame.pop('x')
rhs_frame.pop('y')
if 'm' in g.edata: g.edata.pop('m')
@pytest.mark.parametrize('g', get_cases(['clique']))
@pytest.mark.parametrize('norm_by', ['src', 'dst'])
@pytest.mark.parametrize('shp', edge_softmax_shapes)
if msg != "copy_rhs":
assert F.allclose(F.grad(lhs_frame["x"]), grad_lhs)
if msg != "copy_lhs":
assert F.allclose(F.grad(rhs_frame["y"]), grad_rhs)
print("backward passed")
lhs_frame.pop("x")
rhs_frame.pop("y")
if "m" in g.edata:
g.edata.pop("m")
@pytest.mark.parametrize("g", get_cases(["clique"]))
@pytest.mark.parametrize("norm_by", ["src", "dst"])
@pytest.mark.parametrize("shp", edge_softmax_shapes)
@parametrize_idtype
def test_edge_softmax(g, norm_by, shp, idtype):
g = g.astype(idtype).to(F.ctx())
......@@ -244,21 +268,24 @@ def test_edge_softmax(g, norm_by, shp, idtype):
with F.record_grad():
e2 = F.attach_grad(F.clone(edata))
e2_2d = F.reshape(
e2, (g.number_of_src_nodes(), g.number_of_dst_nodes(), *e2.shape[1:]))
if norm_by == 'src':
e2,
(g.number_of_src_nodes(), g.number_of_dst_nodes(), *e2.shape[1:]),
)
if norm_by == "src":
score2 = F.softmax(e2_2d, 1)
score2 = F.reshape(score2, (-1, *e2.shape[1:]))
if norm_by == 'dst':
if norm_by == "dst":
score2 = F.softmax(e2_2d, 0)
score2 = F.reshape(score2, (-1, *e2.shape[1:]))
assert F.allclose(score1, score2)
print('forward passed')
print("forward passed")
F.backward(F.reduce_sum(score2))
assert F.allclose(F.grad(e2), grad_edata)
print('backward passed')
print("backward passed")
@pytest.mark.parametrize('reducer', ['sum', 'max', 'min', 'mean'])
@pytest.mark.parametrize("reducer", ["sum", "max", "min", "mean"])
def test_segment_reduce(reducer):
ctx = F.ctx()
value = F.tensor(np.random.rand(10, 5))
......@@ -266,14 +293,17 @@ def test_segment_reduce(reducer):
v2 = F.attach_grad(F.clone(value))
seglen = F.tensor([2, 3, 0, 4, 1, 0, 0])
u = F.copy_to(F.arange(0, F.shape(value)[0], F.int32), ctx)
v = F.repeat(F.copy_to(F.arange(0, len(seglen), F.int32), ctx),
seglen, dim=0)
num_nodes = {'_U': len(u), '_V': len(seglen)}
g = dgl.convert.heterograph({('_U', '_E', '_V'): (u, v)}, num_nodes_dict=num_nodes)
v = F.repeat(
F.copy_to(F.arange(0, len(seglen), F.int32), ctx), seglen, dim=0
)
num_nodes = {"_U": len(u), "_V": len(seglen)}
g = dgl.convert.heterograph(
{("_U", "_E", "_V"): (u, v)}, num_nodes_dict=num_nodes
)
with F.record_grad():
rst1 = gspmm(g, 'copy_lhs', reducer, v1, None)
if reducer in ['max', 'min']:
rst1 = gspmm(g, "copy_lhs", reducer, v1, None)
if reducer in ["max", "min"]:
rst1 = F.replace_inf_with_zero(rst1)
F.backward(F.reduce_sum(rst1))
grad1 = F.grad(v1)
......@@ -282,24 +312,36 @@ def test_segment_reduce(reducer):
rst2 = segment_reduce(seglen, v2, reducer=reducer)
F.backward(F.reduce_sum(rst2))
assert F.allclose(rst1, rst2)
print('forward passed')
print("forward passed")
grad2 = F.grad(v2)
assert F.allclose(grad1, grad2)
print('backward passed')
print("backward passed")
@unittest.skipIf(dgl.backend.backend_name != 'pytorch', reason='Only support PyTorch for now')
@unittest.skipIf(
dgl.backend.backend_name != "pytorch", reason="Only support PyTorch for now"
)
@parametrize_idtype
@pytest.mark.parametrize('feat_size', [1, 8, 16, 64, 256])
@pytest.mark.parametrize('dtype,tol', [(torch.float16,1e-2),(torch.float32,3e-3),(torch.float64,1e-4)])
@pytest.mark.parametrize("feat_size", [1, 8, 16, 64, 256])
@pytest.mark.parametrize(
"dtype,tol",
[(torch.float16, 1e-2), (torch.float32, 3e-3), (torch.float64, 1e-4)],
)
def test_segment_mm(idtype, feat_size, dtype, tol):
if F._default_context_str == 'cpu' and dtype == torch.float16:
pytest.skip("fp16 support for CPU linalg functions has been removed in PyTorch.")
if F._default_context_str == "cpu" and dtype == torch.float16:
pytest.skip(
"fp16 support for CPU linalg functions has been removed in PyTorch."
)
dev = F.ctx()
# input
a = torch.tensor(np.random.rand(100, feat_size)).to(dev).to(dtype)
a.requires_grad_()
b = torch.tensor(np.random.rand(10, feat_size, feat_size + 1)).to(dev).to(dtype)
b = (
torch.tensor(np.random.rand(10, feat_size, feat_size + 1))
.to(dev)
.to(dtype)
)
b.requires_grad_()
seglen_a = torch.tensor([10, 15, 8, 0, 1, 9, 18, 24, 15, 0])
dc = torch.tensor(np.random.rand(100, feat_size + 1)).to(dev).to(dtype)
......@@ -312,7 +354,7 @@ def test_segment_mm(idtype, feat_size, dtype, tol):
c_t = []
off = 0
for i, l in enumerate(seglen_a):
c_t.append(a[off:off+l] @ b[i])
c_t.append(a[off : off + l] @ b[i])
off += l
c_t = torch.cat(c_t).to(dtype)
a.grad.zero_()
......@@ -325,11 +367,15 @@ def test_segment_mm(idtype, feat_size, dtype, tol):
assert torch.allclose(da, da_t, atol=tol, rtol=tol)
assert torch.allclose(db, db_t, atol=tol, rtol=tol)
@unittest.skipIf(dgl.backend.backend_name != 'pytorch', reason='Only support PyTorch for now')
@unittest.skipIf(
dgl.backend.backend_name != "pytorch", reason="Only support PyTorch for now"
)
@parametrize_idtype
@pytest.mark.parametrize('feat_size', [1, 8, 16, 64, 256])
@pytest.mark.parametrize("feat_size", [1, 8, 16, 64, 256])
def test_gather_mm_idx_b(idtype, feat_size):
import torch
dev = F.ctx()
# input
a = torch.tensor(np.random.rand(100, feat_size)).to(dev)
......@@ -355,12 +401,16 @@ def test_gather_mm_idx_b(idtype, feat_size):
assert torch.allclose(da, da_t, atol=1e-4, rtol=1e-4)
assert torch.allclose(db, db_t, atol=1e-4, rtol=1e-4)
@unittest.skipIf(dgl.backend.backend_name != 'pytorch', reason='Only support PyTorch for now')
@unittest.skipIf(
dgl.backend.backend_name != "pytorch", reason="Only support PyTorch for now"
)
@parametrize_idtype
@pytest.mark.parametrize('feat_size', [1, 8, 16, 64, 256])
@pytest.mark.parametrize("feat_size", [1, 8, 16, 64, 256])
def _test_gather_mm_idx_a(idtype, feat_size):
# TODO(minjie): currently disabled due to bugs in the CUDA kernel. Need to fix it later.
import torch
dev = F.ctx()
# input
a = torch.tensor(np.random.rand(10, feat_size)).to(dev)
......@@ -386,10 +436,16 @@ def _test_gather_mm_idx_a(idtype, feat_size):
assert torch.allclose(da, da_t, atol=1e-4, rtol=1e-4)
assert torch.allclose(db, db_t, atol=1e-4, rtol=1e-4)
@unittest.skipIf(dgl.backend.backend_name != 'pytorch', reason='Only support PyTorch for now')
@unittest.skipIf(F._default_context_str == 'gpu', reason="Libxsmm only fit in CPU.")
@unittest.skipIf(
dgl.backend.backend_name != "pytorch", reason="Only support PyTorch for now"
)
@unittest.skipIf(
F._default_context_str == "gpu", reason="Libxsmm only fit in CPU."
)
def test_use_libxsmm_switch():
import torch
g = dgl.graph(([0, 0, 0, 1, 1, 2], [0, 1, 2, 1, 2, 2]))
x = torch.ones(3, 2, requires_grad=True)
y = torch.arange(1, 13).float().view(6, 2).requires_grad_()
......
This diff is collapsed.
import itertools
import random
import sys
import time
import unittest
import dgl
import backend as F
import networkx as nx
import numpy as np
import scipy.sparse as sp
import backend as F
import itertools
from test_utils import parametrize_idtype
import dgl
np.random.seed(42)
def toset(x):
# F.zerocopy_to_numpy may return a int
return set(F.zerocopy_to_numpy(x).tolist())
@parametrize_idtype
def test_bfs(idtype, n=100):
def _bfs_nx(g_nx, src):
......@@ -59,6 +61,7 @@ def test_bfs(idtype, n=100):
assert len(edges_dgl) == len(edges_nx)
assert all(toset(x) == y for x, y in zip(edges_dgl, edges_nx))
@parametrize_idtype
def test_topological_nodes(idtype, n=100):
a = sp.random(n, n, 3 / n, data_rvs=lambda n: np.ones(n))
......@@ -68,12 +71,13 @@ def test_topological_nodes(idtype, n=100):
layers_dgl = dgl.topological_nodes_generator(g)
adjmat = g.adjacency_matrix(transpose=True)
def tensor_topo_traverse():
n = g.number_of_nodes()
mask = F.copy_to(F.ones((n, 1)), F.cpu())
degree = F.spmm(adjmat, mask)
while F.reduce_sum(mask) != 0.:
v = F.astype((degree == 0.), F.float32)
while F.reduce_sum(mask) != 0.0:
v = F.astype((degree == 0.0), F.float32)
v = v * mask
mask = mask - v
frontier = F.copy_to(F.nonzero_1d(F.squeeze(v, 1)), F.cpu())
......@@ -85,33 +89,41 @@ def test_topological_nodes(idtype, n=100):
assert len(layers_dgl) == len(layers_spmv)
assert all(toset(x) == toset(y) for x, y in zip(layers_dgl, layers_spmv))
DFS_LABEL_NAMES = ['forward', 'reverse', 'nontree']
DFS_LABEL_NAMES = ["forward", "reverse", "nontree"]
@parametrize_idtype
def test_dfs_labeled_edges(idtype, example=False):
dgl_g = dgl.DGLGraph().astype(idtype)
dgl_g.add_nodes(6)
dgl_g.add_edges([0, 1, 0, 3, 3], [1, 2, 2, 4, 5])
dgl_edges, dgl_labels = dgl.dfs_labeled_edges_generator(
dgl_g, [0, 3], has_reverse_edge=True, has_nontree_edge=True)
dgl_g, [0, 3], has_reverse_edge=True, has_nontree_edge=True
)
dgl_edges = [toset(t) for t in dgl_edges]
dgl_labels = [toset(t) for t in dgl_labels]
g1_solutions = [
# edges labels
[[0, 1, 1, 0, 2], [0, 0, 1, 1, 2]],
[[2, 2, 0, 1, 0], [0, 1, 0, 2, 1]],
# edges labels
[[0, 1, 1, 0, 2], [0, 0, 1, 1, 2]],
[[2, 2, 0, 1, 0], [0, 1, 0, 2, 1]],
]
g2_solutions = [
# edges labels
[[3, 3, 4, 4], [0, 1, 0, 1]],
[[4, 4, 3, 3], [0, 1, 0, 1]],
# edges labels
[[3, 3, 4, 4], [0, 1, 0, 1]],
[[4, 4, 3, 3], [0, 1, 0, 1]],
]
def combine_frontiers(sol):
es, ls = zip(*sol)
es = [set(i for i in t if i is not None)
for t in itertools.zip_longest(*es)]
ls = [set(i for i in t if i is not None)
for t in itertools.zip_longest(*ls)]
es = [
set(i for i in t if i is not None)
for t in itertools.zip_longest(*es)
]
ls = [
set(i for i in t if i is not None)
for t in itertools.zip_longest(*ls)
]
return es, ls
for sol_set in itertools.product(g1_solutions, g2_solutions):
......@@ -121,7 +133,8 @@ def test_dfs_labeled_edges(idtype, example=False):
else:
assert False
if __name__ == '__main__':
test_bfs(idtype='int32')
test_topological_nodes(idtype='int32')
test_dfs_labeled_edges(idtype='int32')
if __name__ == "__main__":
test_bfs(idtype="int32")
test_topological_nodes(idtype="int32")
test_dfs_labeled_edges(idtype="int32")
# NOTE(vibwu): Currently cugraph must be imported before torch to avoid a resource cleanup issue.
# See https://github.com/rapidsai/cugraph/issues/2718
import cugraph
import unittest
import backend as F
import dgl
import cugraph
import numpy as np
from dgl import DGLGraph
import unittest
import pytest
import dgl
from dgl import DGLGraph
def test_dummy():
cg = cugraph.Graph()
assert cg is not None
def test_to_cugraph_conversion():
g = dgl.graph((F.tensor([0, 1, 2, 3]), F.tensor([1, 0, 3, 2]))).to('cuda')
g = dgl.graph((F.tensor([0, 1, 2, 3]), F.tensor([1, 0, 3, 2]))).to("cuda")
cugraph_g = g.to_cugraph()
assert cugraph_g.number_of_nodes()==g.number_of_nodes()
assert cugraph_g.number_of_edges()==g.number_of_edges()
assert cugraph_g.number_of_nodes() == g.number_of_nodes()
assert cugraph_g.number_of_edges() == g.number_of_edges()
assert cugraph_g.has_edge(0, 1)
assert cugraph_g.has_edge(1, 0)
assert cugraph_g.has_edge(3, 2)
def test_from_cugraph_conversion():
# cudf is a dependency of cugraph
import cudf
# directed graph conversion test
cugraph_g = cugraph.Graph(directed=True)
df = cudf.DataFrame({"source":[0, 1, 2, 3],
"destination":[1, 2, 3, 2]})
df = cudf.DataFrame({"source": [0, 1, 2, 3], "destination": [1, 2, 3, 2]})
cugraph_g.from_cudf_edgelist(df)
g = dgl.from_cugraph(cugraph_g)
assert g.device.type == 'cuda'
assert g.device.type == "cuda"
assert g.number_of_nodes() == cugraph_g.number_of_nodes()
assert g.number_of_edges() == cugraph_g.number_of_edges()
......@@ -50,14 +53,13 @@ def test_from_cugraph_conversion():
# undirected graph conversion test
cugraph_g = cugraph.Graph(directed=False)
df = cudf.DataFrame({"source":[0, 1, 2, 3],
"destination":[1, 2, 3, 2]})
df = cudf.DataFrame({"source": [0, 1, 2, 3], "destination": [1, 2, 3, 2]})
cugraph_g.from_cudf_edgelist(df)
g = dgl.from_cugraph(cugraph_g)
assert g.device.type == 'cuda'
assert g.device.type == "cuda"
assert g.number_of_nodes() == cugraph_g.number_of_nodes()
# assert reverse edges are present
assert g.has_edges_between(0, 1)
......
import os
import dgl
import backend as F
from numpy.testing import assert_array_equal
import dgl
INTEGER = 2
STR = 'hello world!'
STR = "hello world!"
HELLO_SERVICE_ID = 901231
TENSOR = F.zeros((1000, 1000), F.int64, F.cpu())
......@@ -47,25 +49,36 @@ class HelloRequest(dgl.distributed.Request):
return res
def start_server(server_id, ip_config, num_servers, num_clients, net_type, keep_alive):
def start_server(
server_id, ip_config, num_servers, num_clients, net_type, keep_alive
):
server_state = dgl.distributed.ServerState(
None, local_g=None, partition_book=None, keep_alive=keep_alive)
None, local_g=None, partition_book=None, keep_alive=keep_alive
)
dgl.distributed.register_service(
HELLO_SERVICE_ID, HelloRequest, HelloResponse)
HELLO_SERVICE_ID, HelloRequest, HelloResponse
)
print("Start server {}".format(server_id))
dgl.distributed.start_server(server_id=server_id,
ip_config=ip_config,
num_servers=num_servers,
num_clients=num_clients,
server_state=server_state,
net_type=net_type)
dgl.distributed.start_server(
server_id=server_id,
ip_config=ip_config,
num_servers=num_servers,
num_clients=num_clients,
server_state=server_state,
net_type=net_type,
)
def start_client(ip_config, num_servers, group_id, net_type):
dgl.distributed.register_service(
HELLO_SERVICE_ID, HelloRequest, HelloResponse)
HELLO_SERVICE_ID, HelloRequest, HelloResponse
)
dgl.distributed.connect_to_server(
ip_config=ip_config, num_servers=num_servers, group_id=group_id, net_type=net_type)
ip_config=ip_config,
num_servers=num_servers,
group_id=group_id,
net_type=net_type,
)
req = HelloRequest(STR, INTEGER, TENSOR, tensor_func)
server_namebook = dgl.distributed.read_ip_config(ip_config, num_servers)
for server_id in server_namebook.keys():
......@@ -102,19 +115,20 @@ def start_client(ip_config, num_servers, group_id, net_type):
def main():
ip_config = os.environ.get('DIST_DGL_TEST_IP_CONFIG')
num_servers = int(os.environ.get('DIST_DGL_TEST_NUM_SERVERS'))
net_type = os.environ.get('DIST_DGL_TEST_NET_TYPE', 'tensorpipe')
if os.environ.get('DIST_DGL_TEST_ROLE', 'server') == 'server':
server_id = int(os.environ.get('DIST_DGL_TEST_SERVER_ID'))
num_clients = int(os.environ.get('DIST_DGL_TEST_NUM_CLIENTS'))
keep_alive = 'DIST_DGL_TEST_KEEP_ALIVE' in os.environ
start_server(server_id, ip_config, num_servers,
num_clients, net_type, keep_alive)
ip_config = os.environ.get("DIST_DGL_TEST_IP_CONFIG")
num_servers = int(os.environ.get("DIST_DGL_TEST_NUM_SERVERS"))
net_type = os.environ.get("DIST_DGL_TEST_NET_TYPE", "tensorpipe")
if os.environ.get("DIST_DGL_TEST_ROLE", "server") == "server":
server_id = int(os.environ.get("DIST_DGL_TEST_SERVER_ID"))
num_clients = int(os.environ.get("DIST_DGL_TEST_NUM_CLIENTS"))
keep_alive = "DIST_DGL_TEST_KEEP_ALIVE" in os.environ
start_server(
server_id, ip_config, num_servers, num_clients, net_type, keep_alive
)
else:
group_id = int(os.environ.get('DIST_DGL_TEST_GROUP_ID', '0'))
group_id = int(os.environ.get("DIST_DGL_TEST_GROUP_ID", "0"))
start_client(ip_config, num_servers, group_id, net_type)
if __name__ == '__main__':
if __name__ == "__main__":
main()
import dgl
import os
import numpy as np
import dgl
import dgl.backend as F
from dgl.distributed import load_partition_book
mode = os.environ.get('DIST_DGL_TEST_MODE', "")
graph_name = os.environ.get('DIST_DGL_TEST_GRAPH_NAME', 'random_test_graph')
num_part = int(os.environ.get('DIST_DGL_TEST_NUM_PART'))
num_servers_per_machine = int(os.environ.get('DIST_DGL_TEST_NUM_SERVER'))
num_client_per_machine = int(os.environ.get('DIST_DGL_TEST_NUM_CLIENT'))
shared_workspace = os.environ.get('DIST_DGL_TEST_WORKSPACE')
graph_path = os.environ.get('DIST_DGL_TEST_GRAPH_PATH')
part_id = int(os.environ.get('DIST_DGL_TEST_PART_ID'))
net_type = os.environ.get('DIST_DGL_TEST_NET_TYPE')
ip_config = os.environ.get('DIST_DGL_TEST_IP_CONFIG', 'ip_config.txt')
mode = os.environ.get("DIST_DGL_TEST_MODE", "")
graph_name = os.environ.get("DIST_DGL_TEST_GRAPH_NAME", "random_test_graph")
num_part = int(os.environ.get("DIST_DGL_TEST_NUM_PART"))
num_servers_per_machine = int(os.environ.get("DIST_DGL_TEST_NUM_SERVER"))
num_client_per_machine = int(os.environ.get("DIST_DGL_TEST_NUM_CLIENT"))
shared_workspace = os.environ.get("DIST_DGL_TEST_WORKSPACE")
graph_path = os.environ.get("DIST_DGL_TEST_GRAPH_PATH")
part_id = int(os.environ.get("DIST_DGL_TEST_PART_ID"))
net_type = os.environ.get("DIST_DGL_TEST_NET_TYPE")
ip_config = os.environ.get("DIST_DGL_TEST_IP_CONFIG", "ip_config.txt")
os.environ["DGL_DIST_MODE"] = "distributed"
os.environ['DGL_DIST_MODE'] = 'distributed'
def zeros_init(shape, dtype):
return F.zeros(shape, dtype=dtype, ctx=F.cpu())
def run_server(graph_name, server_id, server_count, num_clients, shared_mem, keep_alive=False):
def run_server(
graph_name,
server_id,
server_count,
num_clients,
shared_mem,
keep_alive=False,
):
# server_count = num_servers_per_machine
g = dgl.distributed.DistGraphServer(server_id, ip_config,
server_count, num_clients,
graph_path + '/{}.json'.format(graph_name),
disable_shared_mem=not shared_mem,
graph_format=['csc', 'coo'], keep_alive=keep_alive,
net_type=net_type)
print('start server', server_id)
g = dgl.distributed.DistGraphServer(
server_id,
ip_config,
server_count,
num_clients,
graph_path + "/{}.json".format(graph_name),
disable_shared_mem=not shared_mem,
graph_format=["csc", "coo"],
keep_alive=keep_alive,
net_type=net_type,
)
print("start server", server_id)
g.start()
##########################################
############### DistTensor ###############
##########################################
def dist_tensor_test_sanity(data_shape, name=None):
local_rank = dgl.distributed.get_rank() % num_client_per_machine
dist_ten = dgl.distributed.DistTensor(data_shape,
F.int32,
init_func=zeros_init,
name=name)
dist_ten = dgl.distributed.DistTensor(
data_shape, F.int32, init_func=zeros_init, name=name
)
# arbitrary value
stride = 3
pos = (part_id // 2) * num_client_per_machine + local_rank
if part_id % 2 == 0:
dist_ten[pos*stride:(pos+1)*stride] = F.ones((stride, 2), dtype=F.int32, ctx=F.cpu()) * (pos+1)
dist_ten[pos * stride : (pos + 1) * stride] = F.ones(
(stride, 2), dtype=F.int32, ctx=F.cpu()
) * (pos + 1)
dgl.distributed.client_barrier()
assert F.allclose(dist_ten[pos*stride:(pos+1)*stride],
F.ones((stride, 2), dtype=F.int32, ctx=F.cpu()) * (pos+1))
assert F.allclose(
dist_ten[pos * stride : (pos + 1) * stride],
F.ones((stride, 2), dtype=F.int32, ctx=F.cpu()) * (pos + 1),
)
def dist_tensor_test_destroy_recreate(data_shape, name):
dist_ten = dgl.distributed.DistTensor(data_shape, F.float32, name, init_func=zeros_init)
dist_ten = dgl.distributed.DistTensor(
data_shape, F.float32, name, init_func=zeros_init
)
del dist_ten
dgl.distributed.client_barrier()
new_shape = (data_shape[0], 4)
dist_ten = dgl.distributed.DistTensor(new_shape, F.float32, name, init_func=zeros_init)
dist_ten = dgl.distributed.DistTensor(
new_shape, F.float32, name, init_func=zeros_init
)
def dist_tensor_test_persistent(data_shape):
dist_ten_name = 'persistent_dist_tensor'
dist_ten = dgl.distributed.DistTensor(data_shape, F.float32, dist_ten_name, init_func=zeros_init,
persistent=True)
dist_ten_name = "persistent_dist_tensor"
dist_ten = dgl.distributed.DistTensor(
data_shape,
F.float32,
dist_ten_name,
init_func=zeros_init,
persistent=True,
)
del dist_ten
try:
dist_ten = dgl.distributed.DistTensor(data_shape, F.float32, dist_ten_name)
raise Exception('')
dist_ten = dgl.distributed.DistTensor(
data_shape, F.float32, dist_ten_name
)
raise Exception("")
except:
pass
......@@ -86,17 +119,20 @@ def test_dist_tensor(g):
############# DistEmbedding ##############
##########################################
def dist_embedding_check_sanity(num_nodes, optimizer, name=None):
local_rank = dgl.distributed.get_rank() % num_client_per_machine
local_rank = dgl.distributed.get_rank() % num_client_per_machine
emb = dgl.distributed.DistEmbedding(num_nodes, 1, name=name, init_func=zeros_init)
emb = dgl.distributed.DistEmbedding(
num_nodes, 1, name=name, init_func=zeros_init
)
lr = 0.001
optim = optimizer(params=[emb], lr=lr)
stride = 3
pos = (part_id // 2) * num_client_per_machine + local_rank
idx = F.arange(pos*stride, (pos+1)*stride)
idx = F.arange(pos * stride, (pos + 1) * stride)
if part_id % 2 == 0:
with F.record_grad():
......@@ -110,43 +146,64 @@ def dist_embedding_check_sanity(num_nodes, optimizer, name=None):
value = emb(idx)
F.allclose(value, F.ones((len(idx), 1), dtype=F.int32, ctx=F.cpu()) * -lr)
not_update_idx = F.arange(((num_part + 1) / 2) * num_client_per_machine * stride, num_nodes)
not_update_idx = F.arange(
((num_part + 1) / 2) * num_client_per_machine * stride, num_nodes
)
value = emb(not_update_idx)
assert np.all(F.asnumpy(value) == np.zeros((len(not_update_idx), 1)))
def dist_embedding_check_existing(num_nodes):
dist_emb_name = "UniqueEmb"
emb = dgl.distributed.DistEmbedding(num_nodes, 1, name=dist_emb_name, init_func=zeros_init)
emb = dgl.distributed.DistEmbedding(
num_nodes, 1, name=dist_emb_name, init_func=zeros_init
)
try:
emb1 = dgl.distributed.DistEmbedding(num_nodes, 2, name=dist_emb_name, init_func=zeros_init)
raise Exception('')
emb1 = dgl.distributed.DistEmbedding(
num_nodes, 2, name=dist_emb_name, init_func=zeros_init
)
raise Exception("")
except:
pass
def test_dist_embedding(g):
num_nodes = g.number_of_nodes(g.ntypes[0])
dist_embedding_check_sanity(num_nodes, dgl.distributed.optim.SparseAdagrad)
dist_embedding_check_sanity(num_nodes, dgl.distributed.optim.SparseAdagrad, name='SomeEmbedding')
dist_embedding_check_sanity(num_nodes, dgl.distributed.optim.SparseAdam, name='SomeEmbedding')
dist_embedding_check_sanity(
num_nodes, dgl.distributed.optim.SparseAdagrad, name="SomeEmbedding"
)
dist_embedding_check_sanity(
num_nodes, dgl.distributed.optim.SparseAdam, name="SomeEmbedding"
)
dist_embedding_check_existing(num_nodes)
if mode == "server":
shared_mem = bool(int(os.environ.get('DIST_DGL_TEST_SHARED_MEM')))
server_id = int(os.environ.get('DIST_DGL_TEST_SERVER_ID'))
run_server(graph_name, server_id, server_count=num_servers_per_machine,
num_clients=num_part*num_client_per_machine, shared_mem=shared_mem, keep_alive=False)
shared_mem = bool(int(os.environ.get("DIST_DGL_TEST_SHARED_MEM")))
server_id = int(os.environ.get("DIST_DGL_TEST_SERVER_ID"))
run_server(
graph_name,
server_id,
server_count=num_servers_per_machine,
num_clients=num_part * num_client_per_machine,
shared_mem=shared_mem,
keep_alive=False,
)
elif mode == "client":
os.environ['DGL_NUM_SERVER'] = str(num_servers_per_machine)
os.environ["DGL_NUM_SERVER"] = str(num_servers_per_machine)
dgl.distributed.initialize(ip_config, net_type=net_type)
gpb, graph_name, _, _ = load_partition_book(graph_path + '/{}.json'.format(graph_name), part_id, None)
gpb, graph_name, _, _ = load_partition_book(
graph_path + "/{}.json".format(graph_name), part_id, None
)
g = dgl.distributed.DistGraph(graph_name, gpb=gpb)
target_func_map = {"DistTensor": test_dist_tensor,
"DistEmbedding": test_dist_embedding,
}
target_func_map = {
"DistTensor": test_dist_tensor,
"DistEmbedding": test_dist_embedding,
}
target = os.environ.get("DIST_DGL_TEST_OBJECT_TYPE", "")
if target not in target_func_map:
......@@ -158,4 +215,3 @@ elif mode == "client":
else:
print("DIST_DGL_TEST_MODE has to be either server or client")
exit(1)
import os
import unittest
from utils import execute_remote, get_ips
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(os.name == "nt", reason="Do not support windows yet")
def test_tensorpipe_comm():
base_dir = os.environ.get('DIST_DGL_TEST_CPP_BIN_DIR', '.')
ip_config = os.environ.get('DIST_DGL_TEST_IP_CONFIG', 'ip_config.txt')
client_bin = os.path.join(base_dir, 'rpc_client')
server_bin = os.path.join(base_dir, 'rpc_server')
base_dir = os.environ.get("DIST_DGL_TEST_CPP_BIN_DIR", ".")
ip_config = os.environ.get("DIST_DGL_TEST_IP_CONFIG", "ip_config.txt")
client_bin = os.path.join(base_dir, "rpc_client")
server_bin = os.path.join(base_dir, "rpc_server")
ips = get_ips(ip_config)
num_machines = len(ips)
procs = []
for ip in ips:
procs.append(execute_remote(server_bin + " " +
str(num_machines) + " " + ip, ip))
procs.append(
execute_remote(server_bin + " " + str(num_machines) + " " + ip, ip)
)
for ip in ips:
procs.append(execute_remote(client_bin + " " + ip_config, ip))
for p in procs:
......
import multiprocessing as mp
import os
import subprocess
import unittest
import numpy as np
import pytest
import multiprocessing as mp
import subprocess
import utils
import dgl
import numpy as np
import dgl.backend as F
from dgl.distributed import partition_graph
graph_name = os.environ.get('DIST_DGL_TEST_GRAPH_NAME', 'random_test_graph')
target = os.environ.get('DIST_DGL_TEST_OBJECT_TYPE', '')
shared_workspace = os.environ.get('DIST_DGL_TEST_WORKSPACE')
graph_name = os.environ.get("DIST_DGL_TEST_GRAPH_NAME", "random_test_graph")
target = os.environ.get("DIST_DGL_TEST_OBJECT_TYPE", "")
shared_workspace = os.environ.get("DIST_DGL_TEST_WORKSPACE")
def create_graph(num_part, dist_graph_path, hetero):
if not hetero:
g = dgl.rand_graph(10000, 42000)
g.ndata['feat'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
g.edata['feat'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
g.ndata["feat"] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
g.edata["feat"] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
partition_graph(g, graph_name, num_part, dist_graph_path)
else:
from scipy import sparse as spsp
num_nodes = {'n1': 10000, 'n2': 10010, 'n3': 10020}
etypes = [('n1', 'r1', 'n2'),
('n1', 'r2', 'n3'),
('n2', 'r3', 'n3')]
num_nodes = {"n1": 10000, "n2": 10010, "n3": 10020}
etypes = [("n1", "r1", "n2"), ("n1", "r2", "n3"), ("n2", "r3", "n3")]
edges = {}
for etype in etypes:
src_ntype, _, dst_ntype = etype
arr = spsp.random(num_nodes[src_ntype], num_nodes[dst_ntype], density=0.001, format='coo',
random_state=100)
arr = spsp.random(
num_nodes[src_ntype],
num_nodes[dst_ntype],
density=0.001,
format="coo",
random_state=100,
)
edges[etype] = (arr.row, arr.col)
g = dgl.heterograph(edges, num_nodes)
g.nodes['n1'].data['feat'] = F.unsqueeze(F.arange(0, g.number_of_nodes('n1')), 1)
g.edges['r1'].data['feat'] = F.unsqueeze(F.arange(0, g.number_of_edges('r1')), 1)
g.nodes["n1"].data["feat"] = F.unsqueeze(
F.arange(0, g.number_of_nodes("n1")), 1
)
g.edges["r1"].data["feat"] = F.unsqueeze(
F.arange(0, g.number_of_edges("r1")), 1
)
partition_graph(g, graph_name, num_part, dist_graph_path)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@pytest.mark.parametrize("net_type", ['tensorpipe', 'socket'])
@unittest.skipIf(os.name == "nt", reason="Do not support windows yet")
@pytest.mark.parametrize("net_type", ["tensorpipe", "socket"])
@pytest.mark.parametrize("num_servers", [1, 4])
@pytest.mark.parametrize("num_clients", [1, 4])
@pytest.mark.parametrize("hetero", [False, True])
@pytest.mark.parametrize("shared_mem", [False, True])
def test_dist_objects(net_type, num_servers, num_clients, hetero, shared_mem):
if not shared_mem and num_servers > 1:
pytest.skip(f"Backup servers are not supported when shared memory is disabled")
ip_config = os.environ.get('DIST_DGL_TEST_IP_CONFIG', 'ip_config.txt')
workspace = os.environ.get('DIST_DGL_TEST_WORKSPACE', '/shared_workspace/dgl_dist_tensor_test/')
pytest.skip(
f"Backup servers are not supported when shared memory is disabled"
)
ip_config = os.environ.get("DIST_DGL_TEST_IP_CONFIG", "ip_config.txt")
workspace = os.environ.get(
"DIST_DGL_TEST_WORKSPACE", "/shared_workspace/dgl_dist_tensor_test/"
)
ips = utils.get_ips(ip_config)
num_part = len(ips)
test_bin = os.path.join(os.environ.get(
'DIST_DGL_TEST_PY_BIN_DIR', '.'), 'run_dist_objects.py')
test_bin = os.path.join(
os.environ.get("DIST_DGL_TEST_PY_BIN_DIR", "."), "run_dist_objects.py"
)
dist_graph_path = os.path.join(workspace, 'hetero_dist_graph' if hetero else 'dist_graph')
dist_graph_path = os.path.join(
workspace, "hetero_dist_graph" if hetero else "dist_graph"
)
if not os.path.isdir(dist_graph_path):
create_graph(num_part, dist_graph_path, hetero)
base_envs = f"DIST_DGL_TEST_WORKSPACE={workspace} " \
f"DIST_DGL_TEST_NUM_PART={num_part} " \
f"DIST_DGL_TEST_NUM_SERVER={num_servers} " \
f"DIST_DGL_TEST_NUM_CLIENT={num_clients} " \
f"DIST_DGL_TEST_NET_TYPE={net_type} " \
f"DIST_DGL_TEST_GRAPH_PATH={dist_graph_path} " \
f"DIST_DGL_TEST_IP_CONFIG={ip_config} "
base_envs = (
f"DIST_DGL_TEST_WORKSPACE={workspace} "
f"DIST_DGL_TEST_NUM_PART={num_part} "
f"DIST_DGL_TEST_NUM_SERVER={num_servers} "
f"DIST_DGL_TEST_NUM_CLIENT={num_clients} "
f"DIST_DGL_TEST_NET_TYPE={net_type} "
f"DIST_DGL_TEST_GRAPH_PATH={dist_graph_path} "
f"DIST_DGL_TEST_IP_CONFIG={ip_config} "
)
procs = []
# Start server
server_id = 0
for part_id, ip in enumerate(ips):
for _ in range(num_servers):
cmd_envs = base_envs + \
f"DIST_DGL_TEST_SERVER_ID={server_id} " \
f"DIST_DGL_TEST_PART_ID={part_id} " \
f"DIST_DGL_TEST_SHARED_MEM={str(int(shared_mem))} " \
f"DIST_DGL_TEST_MODE=server "
procs.append(utils.execute_remote(
f"{cmd_envs} python3 {test_bin}",
ip))
cmd_envs = (
base_envs + f"DIST_DGL_TEST_SERVER_ID={server_id} "
f"DIST_DGL_TEST_PART_ID={part_id} "
f"DIST_DGL_TEST_SHARED_MEM={str(int(shared_mem))} "
f"DIST_DGL_TEST_MODE=server "
)
procs.append(
utils.execute_remote(f"{cmd_envs} python3 {test_bin}", ip)
)
server_id += 1
# Start client processes
for part_id, ip in enumerate(ips):
for _ in range(num_clients):
cmd_envs = base_envs + \
f"DIST_DGL_TEST_PART_ID={part_id} " \
f"DIST_DGL_TEST_OBJECT_TYPE={target} " \
f"DIST_DGL_TEST_MODE=client "
procs.append(utils.execute_remote(
f"{cmd_envs} python3 {test_bin}",
ip))
cmd_envs = (
base_envs + f"DIST_DGL_TEST_PART_ID={part_id} "
f"DIST_DGL_TEST_OBJECT_TYPE={target} "
f"DIST_DGL_TEST_MODE=client "
)
procs.append(
utils.execute_remote(f"{cmd_envs} python3 {test_bin}", ip)
)
for p in procs:
p.join()
assert p.exitcode == 0
import multiprocessing as mp
import os
import unittest
import pytest
import multiprocessing as mp
import utils
dgl_envs = f"PYTHONUNBUFFERED=1 DMLC_LOG_DEBUG=1 DGLBACKEND={os.environ.get('DGLBACKEND')} DGL_LIBRARY_PATH={os.environ.get('DGL_LIBRARY_PATH')} PYTHONPATH={os.environ.get('PYTHONPATH')} "
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@pytest.mark.parametrize("net_type", ['socket', 'tensorpipe'])
@unittest.skipIf(os.name == "nt", reason="Do not support windows yet")
@pytest.mark.parametrize("net_type", ["socket", "tensorpipe"])
def test_rpc(net_type):
ip_config = os.environ.get('DIST_DGL_TEST_IP_CONFIG', 'ip_config.txt')
ip_config = os.environ.get("DIST_DGL_TEST_IP_CONFIG", "ip_config.txt")
num_clients = 1
num_servers = 1
ips = utils.get_ips(ip_config)
num_machines = len(ips)
test_bin = os.path.join(os.environ.get(
'DIST_DGL_TEST_PY_BIN_DIR', '.'), 'rpc_basic.py')
base_envs = dgl_envs + \
f" DGL_DIST_MODE=distributed DIST_DGL_TEST_IP_CONFIG={ip_config} DIST_DGL_TEST_NUM_SERVERS={num_servers} DIST_DGL_TEST_NET_TYPE={net_type} "
test_bin = os.path.join(
os.environ.get("DIST_DGL_TEST_PY_BIN_DIR", "."), "rpc_basic.py"
)
base_envs = (
dgl_envs
+ f" DGL_DIST_MODE=distributed DIST_DGL_TEST_IP_CONFIG={ip_config} DIST_DGL_TEST_NUM_SERVERS={num_servers} DIST_DGL_TEST_NET_TYPE={net_type} "
)
procs = []
# start server processes
server_id = 0
for ip in ips:
for _ in range(num_servers):
server_envs = base_envs + \
f" DIST_DGL_TEST_ROLE=server DIST_DGL_TEST_SERVER_ID={server_id} DIST_DGL_TEST_NUM_CLIENTS={num_clients * num_machines} "
procs.append(utils.execute_remote(
server_envs + " python3 " + test_bin, ip))
server_envs = (
base_envs
+ f" DIST_DGL_TEST_ROLE=server DIST_DGL_TEST_SERVER_ID={server_id} DIST_DGL_TEST_NUM_CLIENTS={num_clients * num_machines} "
)
procs.append(
utils.execute_remote(server_envs + " python3 " + test_bin, ip)
)
server_id += 1
# start client processes
client_envs = base_envs + " DIST_DGL_TEST_ROLE=client DIST_DGL_TEST_GROUP_ID=0 "
client_envs = (
base_envs + " DIST_DGL_TEST_ROLE=client DIST_DGL_TEST_GROUP_ID=0 "
)
for ip in ips:
for _ in range(num_clients):
procs.append(utils.execute_remote(
client_envs + " python3 "+test_bin, ip))
procs.append(
utils.execute_remote(client_envs + " python3 " + test_bin, ip)
)
for p in procs:
p.join()
assert p.exitcode == 0
import subprocess
import multiprocessing as mp
from typing import Optional
import os
import subprocess
from typing import Optional
def run(ssh_cmd):
subprocess.check_call(ssh_cmd, shell=True)
def execute_remote(
cmd: str,
ip: str,
port: Optional[int] = 22,
username: Optional[str] = ""
cmd: str, ip: str, port: Optional[int] = 22, username: Optional[str] = ""
) -> mp.Process:
"""Execute command line on remote machine via ssh.
......@@ -30,18 +28,18 @@ def execute_remote(
if username:
ip_prefix += "{username}@".format(username=username)
custom_port = os.getenv('DIST_DGL_TEST_SSH_PORT', '')
custom_port = os.getenv("DIST_DGL_TEST_SSH_PORT", "")
if custom_port:
port = custom_port
custom_ssh_key = os.getenv('DIST_DGL_TEST_SSH_KEY', '')
custom_ssh_key = os.getenv("DIST_DGL_TEST_SSH_KEY", "")
if custom_ssh_key:
custom_ssh_key = os.path.expanduser(custom_ssh_key)
custom_ssh_key = "-i " + custom_ssh_key
ssh_setup = os.getenv('DIST_DGL_TEST_SSH_SETUP', '')
ssh_setup = os.getenv("DIST_DGL_TEST_SSH_SETUP", "")
if ssh_setup:
cmd = ssh_setup + ';' + cmd
cmd = ssh_setup + ";" + cmd
# Construct ssh command that executes `cmd` on the remote host
ssh_cmd = "ssh -o StrictHostKeyChecking=no {ssh_key} -p {port} {ip_prefix}{ip} '{cmd}'".format(
ssh_key=custom_ssh_key,
......@@ -50,11 +48,12 @@ def execute_remote(
ip=ip,
cmd=cmd,
)
ctx = mp.get_context('spawn')
ctx = mp.get_context("spawn")
proc = ctx.Process(target=run, args=(ssh_cmd,))
proc.start()
return proc
def get_ips(ip_config):
ips = []
with open(ip_config) as f:
......@@ -62,6 +61,7 @@ def get_ips(ip_config):
result = line.strip().split()
if len(result) != 1:
raise RuntimeError(
"Invalid format of ip_config:{}".format(ip_config))
"Invalid format of ip_config:{}".format(ip_config)
)
ips.append(result[0])
return ips
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import socket
import os
import random
import scipy.sparse as spsp
import socket
import numpy as np
import scipy.sparse as spsp
import dgl
......@@ -13,10 +14,10 @@ def generate_ip_config(file_name, num_machines, num_servers):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
sock.connect(('10.255.255.255', 1))
sock.connect(("10.255.255.255", 1))
ip = sock.getsockname()[0]
except ValueError:
ip = '127.0.0.1'
ip = "127.0.0.1"
finally:
sock.close()
......@@ -35,16 +36,23 @@ def generate_ip_config(file_name, num_machines, num_servers):
sock.close()
if len(ports) < num_machines * num_servers:
raise RuntimeError(
"Failed to get available IP/PORT with required numbers.")
with open(file_name, 'w') as f:
"Failed to get available IP/PORT with required numbers."
)
with open(file_name, "w") as f:
for i in range(num_machines):
f.write('{} {}\n'.format(ip, ports[i*num_servers]))
f.write("{} {}\n".format(ip, ports[i * num_servers]))
def reset_envs():
"""Reset common environment variable which are set in tests. """
for key in ['DGL_ROLE', 'DGL_NUM_SAMPLER', 'DGL_NUM_SERVER', \
'DGL_DIST_MODE', 'DGL_NUM_CLIENT', 'DGL_DIST_MAX_TRY_TIMES']:
"""Reset common environment variable which are set in tests."""
for key in [
"DGL_ROLE",
"DGL_NUM_SAMPLER",
"DGL_NUM_SERVER",
"DGL_DIST_MODE",
"DGL_NUM_CLIENT",
"DGL_DIST_MAX_TRY_TIMES",
]:
if key in os.environ:
os.environ.pop(key)
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment