Unverified Commit 6ce2dc34 authored by Jinjing Zhou, committed by GitHub

[Test] Regression Test for update_all and apply_edges (#2568)



* add bench jenkins

* instance type

* fix

* fix

* fix

* 111

* test

* 111

* 111

* fix

* test

* run

* fix

* fix

* fix

* fix

* fix

* publish results

* 111

* regression

* launch ec2 script

* fix

* add

* run on master

* change

* rrr

* run gpu

* fix

* fix

* try fix

* fix

* ff

* fix

* fix

* fix

* refactor

* fix

* fix

* update

* fix

* fix

* fix

* fix

* remove import torchtext

* add shm size

* update

* fix

* fix

* fix

* fix

* fix this!!!!

* 111

* fix

* remove verbose

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* update readme

* fix

* fix

* fix

* change asv default to head

* commit sage and rgcn

* fix

* update

* add benchmarks

* add

* fix

* update

* remove RandomState

* tmp remove

* new batch

* fix

* fix

* fix

* address comment

* fix warning

* fix

* fix

* fix

* fix

* add multiupdate all

* address comment

* fix
Co-authored-by: Minjie Wang <wmjlyjemaine@gmail.com>
parent fb3c0709
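
The new benchmark modules below all follow the same harness convention: each file defines a single track_time function, stacked with utils.benchmark and utils.parametrize decorators, and returns an average wall-clock time in seconds; the harness then sweeps the full parameter grid. The repo's real utils module is only partially shown in this commit, so the stand-in decorators and driver below are an illustrative sketch of that pattern, not the actual implementation:

import itertools

# Hypothetical, simplified stand-ins for utils.parametrize and
# utils.benchmark -- for illustration only.
def parametrize(name, values):
    def deco(func):
        func.params = getattr(func, 'params', {})
        func.params[name] = values
        return func
    return deco

def benchmark(track_type, timeout=60):
    def deco(func):
        func.track_type = track_type
        func.timeout = timeout
        return func
    return deco

def run_all(track_time):
    # Sweep the cartesian product of all parametrized values,
    # the way an asv-style harness would.
    names = list(track_time.params)
    for combo in itertools.product(*(track_time.params[n] for n in names)):
        kwargs = dict(zip(names, combo))
        print(kwargs, '->', track_time(**kwargs), 'seconds')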
import time
import dgl
import torch
import numpy as np
import dgl.function as fn

from .. import utils


@utils.benchmark('time', timeout=600)
@utils.parametrize('graph_name', ['cora', 'livejournal'])
@utils.parametrize('format', ['coo', 'csr'])
@utils.parametrize('feat_size', [8, 32, 128, 512])
@utils.parametrize('reduce_type', ['u->e', 'u+v'])
def track_time(graph_name, format, feat_size, reduce_type):
    device = utils.get_bench_device()
    graph = utils.get_graph(graph_name, format)
    graph = graph.to(device)
    graph.ndata['h'] = torch.randn(
        (graph.num_nodes(), feat_size), device=device)

    reduce_builtin_dict = {
        'u->e': fn.copy_u('h', 'x'),
        'u+v': fn.u_add_v('h', 'h', 'x'),
    }

    # dry run
    graph.apply_edges(reduce_builtin_dict[reduce_type])

    # timing
    t0 = time.time()
    for i in range(3):
        graph.apply_edges(reduce_builtin_dict[reduce_type])
    t1 = time.time()

    return (t1 - t0) / 3
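
Note on reading the timings: CUDA launches kernels asynchronously, so on a GPU device time.time() can return before the queued kernels actually finish. A defensive variant of the timing loop (my own sketch, not part of the commit) would synchronize around the timed region:

import time
import torch

def time_synced(fn, iters=3):
    # Wait for outstanding CUDA work before and after timing so the
    # measurement covers the kernels themselves, not just the launches.
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    t0 = time.time()
    for _ in range(iters):
        fn()
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    return (time.time() - t0) / iters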
import time
import dgl
import torch
import numpy as np
import dgl.function as fn

from .. import utils


@utils.benchmark('time', timeout=600)
@utils.parametrize('feat_size', [32, 128, 512])
@utils.parametrize('num_relations', [5, 50, 500])
@utils.parametrize('multi_reduce_type', ["sum", "stack"])
def track_time(feat_size, num_relations, multi_reduce_type):
    device = utils.get_bench_device()
    dd = {}
    candidate_edges = [
        dgl.data.CoraGraphDataset(verbose=False)[0].edges(),
        dgl.data.PubmedGraphDataset(verbose=False)[0].edges(),
        dgl.data.CiteseerGraphDataset(verbose=False)[0].edges(),
    ]
    for i in range(num_relations):
        dd[('n1', 'e_{}'.format(i), 'n2')] = candidate_edges[i % len(candidate_edges)]
    graph = dgl.heterograph(dd)
    graph = graph.to(device)
    graph.nodes['n1'].data['h'] = torch.randn(
        (graph.num_nodes('n1'), feat_size), device=device)
    graph.nodes['n2'].data['h'] = torch.randn(
        (graph.num_nodes('n2'), feat_size), device=device)

    update_dict = {}
    for i in range(num_relations):
        update_dict['e_{}'.format(i)] = (
            fn.copy_src('h', 'm'), fn.sum('m', 'h'))

    # dry run
    graph.multi_update_all(
        update_dict,
        multi_reduce_type)

    # timing
    t0 = time.time()
    for i in range(3):
        graph.multi_update_all(
            update_dict,
            multi_reduce_type)
    t1 = time.time()

    return (t1 - t0) / 3
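
For context on the API under test: multi_update_all runs one (message, reduce) pair per relation and then merges the per-relation node results with the cross reducer ('sum' adds them, 'stack' keeps one slice per relation). A tiny self-contained example on a toy heterograph (my own, not from the commit):

import dgl
import dgl.function as fn
import torch

g = dgl.heterograph({
    ('user', 'follows', 'topic'): (torch.tensor([0, 1]), torch.tensor([0, 0])),
    ('user', 'likes', 'topic'): (torch.tensor([1]), torch.tensor([0])),
})
g.nodes['user'].data['h'] = torch.ones(2, 4)

# Each relation aggregates with sum, then the two per-relation
# results for 'topic' nodes are summed across relations.
g.multi_update_all(
    {'follows': (fn.copy_u('h', 'm'), fn.sum('m', 'h')),
     'likes': (fn.copy_u('h', 'm'), fn.sum('m', 'h'))},
    'sum')
print(g.nodes['topic'].data['h'])  # tensor([[3., 3., 3., 3.]])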
import time
import dgl
import torch
import numpy as np
import dgl.function as fn

from .. import utils


@utils.benchmark('time', timeout=7200)
@utils.parametrize('graph_name', ['cora', 'livejournal'])
@utils.parametrize('format', ['coo', 'csr'])
@utils.parametrize('feat_size', [8, 32, 128, 512])
@utils.parametrize('msg_type', ['copy_u', 'u_mul_e'])
@utils.parametrize('reduce_type', ['sum', 'mean', 'max'])
def track_time(graph_name, format, feat_size, msg_type, reduce_type):
    device = utils.get_bench_device()
    graph = utils.get_graph(graph_name, format)
    graph = graph.to(device)
    graph.ndata['h'] = torch.randn(
        (graph.num_nodes(), feat_size), device=device)
    graph.edata['e'] = torch.randn(
        (graph.num_edges(), feat_size), device=device)

    msg_builtin_dict = {
        'copy_u': fn.copy_u('h', 'x'),
        'u_mul_e': fn.u_mul_e('h', 'e', 'x'),
    }
    reduce_builtin_dict = {
        'sum': fn.sum('x', 'h_new'),
        'mean': fn.mean('x', 'h_new'),
        'max': fn.max('x', 'h_new'),
    }

    # dry run
    graph.update_all(msg_builtin_dict[msg_type],
                     reduce_builtin_dict[reduce_type])

    # timing
    t0 = time.time()
    for i in range(3):
        graph.update_all(msg_builtin_dict[msg_type],
                         reduce_builtin_dict[reduce_type])
    t1 = time.time()

    return (t1 - t0) / 3
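
As a reminder of what these builtin pairs compute: update_all with copy_u/sum gives each node the sum of its in-neighbors' features (an SpMM over the graph). A toy check, independent of the benchmark:

import dgl
import dgl.function as fn
import torch

# Edges 0->2, 1->2, 2->0.
g = dgl.graph((torch.tensor([0, 1, 2]), torch.tensor([2, 2, 0])))
g.ndata['h'] = torch.eye(3)

# h_new[v] = sum of h[u] over in-edges (u, v).
g.update_all(fn.copy_u('h', 'x'), fn.sum('x', 'h_new'))
print(g.ndata['h_new'])
# row 0 = h[2], row 1 = zeros, row 2 = h[0] + h[1]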
@@ -21,15 +21,15 @@ def track_time(batch_size, feat_size, readout_op, type):
         g.ndata['h'] = torch.randn((g.num_nodes(), feat_size), device=device)
         t0 = time.time()
         for i in range(10):
-            out = dgl.readout_nodes(g, 'h', readout_op)
+            out = dgl.readout_nodes(g, 'h', op=readout_op)
         t1 = time.time()
     elif type == 'edge':
         g.edata['h'] = torch.randn((g.num_edges(), feat_size), device=device)
         t0 = time.time()
         for i in range(10):
-            out = dgl.readout_edges(g, 'h', readout_op)
+            out = dgl.readout_edges(g, 'h', op=readout_op)
         t1 = time.time()
     else:
         raise Exception("Unknown type")
-    return (t1 - t0) / 10
+    return (t1 - t0) / 10
\ No newline at end of file
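
The one-line fix above is worth a note: in dgl.readout_nodes and dgl.readout_edges the third positional parameter is weight, not the reduction op, so the op has to be passed by keyword. A quick illustration of the corrected call (toy graph, my own example):

import dgl
import torch

g = dgl.graph((torch.tensor([0]), torch.tensor([1])))
g.ndata['h'] = torch.ones(2, 4)

# Signature is readout_nodes(graph, feat, weight=None, op='sum'),
# hence op is given as a keyword argument.
out = dgl.readout_nodes(g, 'h', op='max')
print(out.shape)  # (1, 4): one row per graph in the batch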
import time
import dgl
import torch
import numpy as np
import dgl.function as fn

from .. import utils


@utils.benchmark('time', timeout=7200)
@utils.parametrize('graph_name', ['cora', 'livejournal'])
@utils.parametrize('format', ['coo', 'csr'])
@utils.parametrize('feat_size', [8, 32, 128, 512])
@utils.parametrize('reduce_type', ['u->e', 'u+v'])
def track_time(graph_name, format, feat_size, reduce_type):
    device = utils.get_bench_device()
    graph = utils.get_graph(graph_name, format)
    graph = graph.to(device)
    graph.ndata['h'] = torch.randn(
        (graph.num_nodes(), feat_size), device=device)

    reduce_udf_dict = {
        'u->e': lambda edges: {'x': edges.src['h']},
        'u+v': lambda edges: {'x': edges.src['h'] + edges.dst['h']},
    }

    # dry run
    graph.apply_edges(reduce_udf_dict[reduce_type])

    # timing
    t0 = time.time()
    for i in range(3):
        graph.apply_edges(reduce_udf_dict[reduce_type])
    t1 = time.time()

    return (t1 - t0) / 3
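
Because each UDF benchmark mirrors a builtin benchmark above, it is easy to sanity-check that the two paths agree; a throwaway consistency check along these lines (not part of the commit):

import dgl
import dgl.function as fn
import torch

g = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
g.ndata['h'] = torch.randn(3, 8)

g.apply_edges(fn.u_add_v('h', 'h', 'x_builtin'))
g.apply_edges(lambda edges: {'x_udf': edges.src['h'] + edges.dst['h']})
assert torch.allclose(g.edata['x_builtin'], g.edata['x_udf'])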
import time
import dgl
import torch
import numpy as np
import dgl.function as fn

from .. import utils


@utils.benchmark('time', timeout=600)
@utils.parametrize('feat_size', [32, 128, 512])
@utils.parametrize('num_relations', [5, 50, 500])
@utils.parametrize('multi_reduce_type', ["sum", "stack"])
def track_time(feat_size, num_relations, multi_reduce_type):
    device = utils.get_bench_device()
    dd = {}
    candidate_edges = [
        dgl.data.CoraGraphDataset(verbose=False)[0].edges(),
        dgl.data.PubmedGraphDataset(verbose=False)[0].edges(),
        dgl.data.CiteseerGraphDataset(verbose=False)[0].edges(),
    ]
    for i in range(num_relations):
        dd[('n1', 'e_{}'.format(i), 'n2')] = candidate_edges[i % len(candidate_edges)]
    graph = dgl.heterograph(dd)
    graph = graph.to(device)
    graph.nodes['n1'].data['h'] = torch.randn(
        (graph.num_nodes('n1'), feat_size), device=device)
    graph.nodes['n2'].data['h'] = torch.randn(
        (graph.num_nodes('n2'), feat_size), device=device)

    update_dict = {}
    for i in range(num_relations):
        update_dict['e_{}'.format(i)] = (
            lambda edges: {'x': edges.src['h']},
            lambda nodes: {'h_new': torch.sum(nodes.mailbox['x'], dim=1)})

    # dry run
    graph.multi_update_all(
        update_dict,
        multi_reduce_type)

    # timing
    t0 = time.time()
    for i in range(3):
        graph.multi_update_all(
            update_dict,
            multi_reduce_type)
    t1 = time.time()

    return (t1 - t0) / 3
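
A note on the 'stack' cross reducer that this benchmark sweeps alongside 'sum': rather than adding the per-relation results, it stacks them along a new relation dimension. A quick shape check on a toy graph (my own example; the output layout is my understanding of the API):

import dgl
import dgl.function as fn
import torch

g = dgl.heterograph({
    ('n1', 'e_0', 'n2'): (torch.tensor([0]), torch.tensor([0])),
    ('n1', 'e_1', 'n2'): (torch.tensor([0]), torch.tensor([0])),
})
g.nodes['n1'].data['h'] = torch.randn(1, 8)

g.multi_update_all(
    {e: (fn.copy_u('h', 'm'), fn.sum('m', 'h')) for e in ['e_0', 'e_1']},
    'stack')
# One slice per relation: shape (num_n2_nodes, num_relations, feat_size).
print(g.nodes['n2'].data['h'].shape)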
import time
import dgl
import torch
import numpy as np
import dgl.function as fn

from .. import utils


@utils.benchmark('time', timeout=7200)
@utils.parametrize('graph_name', ['cora', 'livejournal'])
@utils.parametrize('format', ['coo', 'csr'])
@utils.parametrize('feat_size', [8, 32, 128, 512])
@utils.parametrize('msg_type', ['copy_u', 'u_mul_e'])
@utils.parametrize('reduce_type', ['sum', 'mean', 'max'])
def track_time(graph_name, format, feat_size, msg_type, reduce_type):
    device = utils.get_bench_device()
    graph = utils.get_graph(graph_name, format)
    graph = graph.to(device)
    graph.ndata['h'] = torch.randn(
        (graph.num_nodes(), feat_size), device=device)
    graph.edata['e'] = torch.randn(
        (graph.num_edges(), feat_size), device=device)

    msg_udf_dict = {
        'copy_u': lambda edges: {'x': edges.src['h']},
        'u_mul_e': lambda edges: {'x': edges.src['h'] * edges.data['e']},
    }
    reduce_udf_dict = {
        'sum': lambda nodes: {'h_new': torch.sum(nodes.mailbox['x'], dim=1)},
        'mean': lambda nodes: {'h_new': torch.mean(nodes.mailbox['x'], dim=1)},
        'max': lambda nodes: {'h_new': torch.max(nodes.mailbox['x'], dim=1)[0]},
    }

    # dry run
    graph.update_all(msg_udf_dict[msg_type], reduce_udf_dict[reduce_type])

    # timing
    t0 = time.time()
    for i in range(3):
        graph.update_all(msg_udf_dict[msg_type], reduce_udf_dict[reduce_type])
    t1 = time.time()

    return (t1 - t0) / 3
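
One detail the UDF reducers above rely on: DGL buckets destination nodes by in-degree before calling a UDF reducer, so nodes.mailbox['x'] arrives as a (bucket_size, degree, feat_size) tensor and the reductions run over dim=1. A small illustration (toy graph, not from the commit):

import dgl
import torch

# All three edges point at node 2, so node 2 sits in a degree-3 bucket.
g = dgl.graph((torch.tensor([0, 1, 2]), torch.tensor([2, 2, 2])))
g.ndata['h'] = torch.randn(3, 4)

def reducer(nodes):
    print(nodes.mailbox['x'].shape)  # (1, 3, 4) for node 2's bucket
    return {'h_new': torch.sum(nodes.mailbox['x'], dim=1)}

g.update_all(lambda edges: {'x': edges.src['h']}, reducer)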
@@ -12,6 +12,42 @@ import torch
 import time
 from ogb.nodeproppred import DglNodePropPredDataset
 from functools import partial, reduce, wraps
+import torch.multiprocessing as mp
+from _thread import start_new_thread
+import traceback
+
+
+def thread_wrapped_func(func):
+    """
+    Wraps a process entry point to make it work with OpenMP.
+    """
+    @wraps(func)
+    def decorated_function(*args, **kwargs):
+        queue = mp.Queue()
+
+        def _queue_result():
+            exception, trace, res = None, None, None
+            try:
+                res = func(*args, **kwargs)
+            except Exception as e:
+                exception = e
+                trace = traceback.format_exc()
+            queue.put((res, exception, trace))
+
+        start_new_thread(_queue_result, ())
+        result, exception, trace = queue.get()
+        if exception is None:
+            return result
+        else:
+            assert isinstance(exception, Exception)
+            raise exception.__class__(trace)
+    return decorated_function
+
+
 def _download(url, path, filename):
     fn = os.path.join(path, filename)
     if os.path.exists(fn):
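
Usage-wise, thread_wrapped_func decorates a function that will run as a multiprocessing entry point, executing it inside a fresh native thread so OpenMP state is initialized cleanly in the child; a hedged usage sketch (the worker name and arguments are my own):

import torch.multiprocessing as mp

@thread_wrapped_func
def worker(rank):
    # Heavy OpenMP-backed PyTorch work would go here.
    return rank * 2

if __name__ == '__main__':
    p = mp.Process(target=worker, args=(0,))
    p.start()
    p.join()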
@@ -30,7 +66,7 @@ def _download(url, path, filename):
 def get_graph(name, format):
     g = None
     if name == 'cora':
-        g = dgl.data.CoraGraphDataset()[0]
+        g = dgl.data.CoraGraphDataset(verbose=False)[0]
     elif name == 'livejournal':
         bin_path = "/tmp/dataset/livejournal/livejournal_{}.bin".format(format)
         if os.path.exists(bin_path):
@@ -60,6 +96,8 @@ def get_graph(name, format):
     else:
         raise Exception("Unknown dataset")
     g = g.formats([format])
+    # Remove format restriction
+    g = g.formats(['coo', 'csr', 'csc'])
     return g

 def get_ogb_graph(name):
@@ -469,7 +507,7 @@ def benchmark(track_type, timeout=60):
         if not filter.check(func):
             # skip if not enabled
             func.benchmark_name = "skip_" + func.__name__
-        return func
+        return thread_wrapped_func(func)
     return _wrapper

 #####################################