Unverified Commit 56339b34 authored by Zihao Ye, committed by GitHub

[Fix] Update ogb examples according to new API. (#1922)



* upd

* upd
Co-authored-by: Ubuntu <ubuntu@ip-172-31-29-3.us-east-2.compute.internal>
parent 001d7937
@@ -96,7 +96,7 @@ class GAT(nn.Module):
                 num_workers=args.num_workers)
             for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
-                block = blocks[0].to(device)
+                block = blocks[0].int().to(device)
                 h = x[input_nodes].to(device)
                 h_dst = h[:block.number_of_dst_nodes()].to(device)
                 if l < self.n_layers - 1:
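A note on the `.int().to(device)` pattern this commit applies throughout: DGL graphs carry int64 node and edge IDs by default, and `g.int()` returns a copy of the graph with int32 IDs, which are cheaper to transfer to and process on the GPU. A minimal sketch (assumes a CUDA device is available; drop the final line for CPU-only runs):

    import torch
    import dgl

    g = dgl.graph((torch.tensor([0, 1, 2]), torch.tensor([1, 2, 0])))
    print(g.idtype)          # torch.int64, the default ID type
    g32 = g.int()            # same structure, int32 IDs
    g32 = g32.to('cuda:0')   # move to GPU; 'cuda:0' is an assumed device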
@@ -164,7 +164,7 @@ def run(args, device, data):
         # blocks.
         tic_start = time.time()
         for step, cluster in enumerate(cluster_iterator):
-            cluster = cluster.to(device)
+            cluster = cluster.int().to(device)
             mask = cluster.ndata['train_mask']
             if mask.sum() == 0:
                 continue
@@ -265,7 +265,7 @@ if __name__ == '__main__':
     cluster_iter_data = ClusterIter(
         'ogbn-products', graph, args.num_partitions, args.batch_size)
-    cluster_iterator = DataLoader(cluster_iter_data, batch_size=args.batch_size, shuffle=True, pin_memory=True, num_workers=0, collate_fn=partial(subgraph_collate_fn, graph))
+    cluster_iterator = DataLoader(cluster_iter_data, batch_size=args.batch_size, shuffle=True, pin_memory=True, num_workers=4, collate_fn=partial(subgraph_collate_fn, graph))

     in_feats = graph.ndata['feat'].shape[1]
     n_classes = (labels.max() + 1).item()
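The `num_workers=0` to `num_workers=4` changes pair with the format-materialization change further down: once the graph's sparse formats are built up front, subgraph collation can run in parallel loader worker processes. A sketch of the resulting loader setup, reusing `cluster_iter_data`, `args`, `subgraph_collate_fn`, and `graph` from the hunk above:

    from functools import partial
    from torch.utils.data import DataLoader

    cluster_iterator = DataLoader(
        cluster_iter_data,        # ClusterIter instance from the hunk above
        batch_size=args.batch_size,
        shuffle=True,
        pin_memory=True,
        num_workers=4,            # was 0: everything in the main process
        collate_fn=partial(subgraph_collate_fn, graph))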
...
@@ -7,4 +7,4 @@ We use builtin metris to do the graph partition.
 Run `main.py` and you should directly see the result.

-Accuracy over 10 runs: 0.7565994 ± 0.00654619
+Accuracy over 10 runs: 0.7830701 ± 0.0035093208
@@ -133,8 +133,7 @@ def run(args, device, data):
         # blocks.
         tic_start = time.time()
         for step, cluster in enumerate(cluster_iterator):
-            #cluster.copy_from_parent()
-            #cluster.ndata['train_mask'] = g.ndata['train_mask'][cluster.ndata[dgl.NID]]
+            cluster = cluster.int().to(device)
             mask = cluster.ndata['train_mask'].to(device)
             if mask.sum() == 0:
                 continue
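The deleted `copy_from_parent()` lines are no longer needed: in the 0.5-style API, subgraph extraction (e.g. `dgl.node_subgraph`) copies node features from the parent graph automatically and records the original node IDs under `dgl.NID`. A small sketch of that behaviour:

    import torch
    import dgl

    g = dgl.graph((torch.tensor([0, 1, 2, 3]), torch.tensor([1, 2, 3, 0])))
    g.ndata['train_mask'] = torch.tensor([True, False, True, False])
    sg = dgl.node_subgraph(g, [0, 1])
    print(sg.ndata['train_mask'])   # tensor([ True, False]), copied automatically
    print(sg.ndata[dgl.NID])        # original IDs of the subgraph's nodes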
@@ -184,14 +183,14 @@ if __name__ == '__main__':
     argparser = argparse.ArgumentParser("multi-gpu training")
     argparser.add_argument('--gpu', type=int, default=0,
                            help="GPU device ID. Use -1 for CPU training")
-    argparser.add_argument('--num-epochs', type=int, default=20)
+    argparser.add_argument('--num-epochs', type=int, default=30)
     argparser.add_argument('--num-hidden', type=int, default=256)
     argparser.add_argument('--num-layers', type=int, default=3)
     argparser.add_argument('--batch-size', type=int, default=32)
     argparser.add_argument('--val-batch-size', type=int, default=10000)
     argparser.add_argument('--log-every', type=int, default=20)
     argparser.add_argument('--eval-every', type=int, default=1)
-    argparser.add_argument('--lr', type=float, default=0.003)
+    argparser.add_argument('--lr', type=float, default=0.001)
     argparser.add_argument('--dropout', type=float, default=0.5)
     argparser.add_argument('--save-pred', type=str, default='')
     argparser.add_argument('--wd', type=float, default=0)
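In practice these defaults mean a bare run of the example (e.g. `python main.py --gpu 0`) now trains for 30 epochs at a learning rate of 0.001; the previous schedule is still reachable with explicit flags, e.g. `--num-epochs 20 --lr 0.003`.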
@@ -229,7 +228,7 @@ if __name__ == '__main__':
     cluster_iter_data = ClusterIter(
         'ogbn-products', graph, args.num_partitions, args.batch_size, th.cat([train_idx, val_idx, test_idx]))
     idx = th.arange(args.num_partitions // args.batch_size)
-    cluster_iterator = DataLoader(cluster_iter_data, batch_size=32, shuffle=True, pin_memory=True, num_workers=0, collate_fn=partial(subgraph_collate_fn, graph))
+    cluster_iterator = DataLoader(cluster_iter_data, batch_size=32, shuffle=True, pin_memory=True, num_workers=4, collate_fn=partial(subgraph_collate_fn, graph))
     in_feats = graph.ndata['feat'].shape[1]
     print(in_feats)
...
@@ -116,10 +116,7 @@ def make_undirected(G):
     return G

 def find_connected_nodes(G):
-    nodes = []
-    for n in G.nodes():
-        if G.out_degree(n) > 0:
-            nodes.append(n.item())
+    nodes = G.out_degrees().nonzero().squeeze(-1)
     return nodes

 class DeepwalkDataset:
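The rewritten `find_connected_nodes` replaces a Python-level loop over all nodes with a single batched degree query. A sketch of the equivalence, using the 0.5-style API where `out_degrees()` with no argument returns every node's out-degree:

    import torch
    import dgl

    g = dgl.graph((torch.tensor([0, 2]), torch.tensor([1, 3])), num_nodes=5)
    # Loop style: one degree query per node.
    slow = [n for n in range(g.number_of_nodes()) if g.out_degrees(n) > 0]
    # Batched style: one call, then keep indices with nonzero degree.
    fast = g.out_degrees().nonzero().squeeze(-1)
    print(slow)   # [0, 2]
    print(fast)   # tensor([0, 2])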
@@ -188,7 +185,7 @@ class DeepwalkDataset:
         # negative table for true negative sampling
         if not fast_neg:
-            node_degree = np.array(list(map(lambda x: self.G.out_degree(x), self.valid_seeds)))
+            node_degree = self.G.out_degrees(self.valid_seeds).numpy()
             node_degree = np.power(node_degree, 0.75)
             node_degree /= np.sum(node_degree)
             node_degree = np.array(node_degree * 1e8, dtype=np.int)
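For context, this block builds a word2vec-style negative-sampling table: nodes are drawn with probability proportional to degree^0.75, materialized as a flat array of node IDs. A toy-scale sketch of the same construction (table size shrunk from 1e8; note also that `np.int` is deprecated in modern NumPy, where `np.int64` should be used):

    import numpy as np

    degrees = np.array([1.0, 10.0, 100.0])   # out-degrees of three nodes
    p = np.power(degrees, 0.75)
    p /= p.sum()
    table_size = 100                          # 1e8 in the real code
    counts = (p * table_size).astype(np.int64)
    neg_table = np.repeat(np.arange(len(degrees)), counts)
    # Uniform draws from neg_table now pick node i with probability ~ p[i].
    print(len(neg_table), neg_table[:10])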
@@ -224,6 +221,5 @@ class DeepwalkSampler(object):
         self.walk_length = walk_length

     def sample(self, seeds):
-        walks = dgl.contrib.sampling.random_walk(self.G, seeds,
-            1, self.walk_length-1)
+        walks = dgl.sampling.random_walk(self.G, seeds, length=self.walk_length-1)[0]
         return walks
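The random-walk call moves from the removed `dgl.contrib.sampling` namespace to `dgl.sampling.random_walk`, which returns a `(traces, node_types)` pair; the trailing `[0]` keeps only the traces, one row per seed with `length + 1` node IDs. A minimal sketch:

    import torch
    import dgl

    g = dgl.graph((torch.tensor([0, 1, 2, 3]), torch.tensor([1, 2, 3, 0])))
    seeds = torch.tensor([0, 1])
    traces, _ = dgl.sampling.random_walk(g, seeds, length=3)
    print(traces.shape)   # torch.Size([2, 4]): each walk visits length + 1 nodes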
@@ -108,7 +108,7 @@ class GAT(nn.Module):
                 num_workers=args.num_workers)
             for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
-                block = blocks[0].to(device)
+                block = blocks[0].int().to(device)
                 h = x[input_nodes].to(device)
                 h_dst = h[:block.number_of_dst_nodes()]
@@ -124,17 +124,6 @@ class GAT(nn.Module):
             x = y
         return y

-def prepare_mp(g):
-    """
-    Explicitly materialize the CSR, CSC and COO representation of the given graph
-    so that they could be shared via copy-on-write to sampler workers and GPU
-    trainers.
-    This is a workaround before full shared memory support on heterogeneous graphs.
-    """
-    g.request_format('csr')
-    g.request_format('coo')
-    g.request_format('csc')
-
 def compute_acc(pred, labels):
     """
     Compute the accuracy of prediction given the labels.
@@ -277,7 +266,7 @@ if __name__ == '__main__':
     in_feats = graph.ndata['feat'].shape[1]
     n_classes = (labels.max() + 1).item()
-    prepare_mp(graph)
+    graph.create_format_()

     # Pack data
     data = train_idx, val_idx, test_idx, in_feats, labels, n_classes, graph, args.head
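`graph.create_format_()` replaces the `prepare_mp` workaround deleted above: it materializes the graph's CSR/CSC/COO representations once, up front, so that forked sampler workers share them copy-on-write instead of each rebuilding them. Later DGL releases renamed the call to `create_formats_()`; a sketch against that current spelling:

    import torch
    import dgl

    g = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
    # Build all sparse formats before spawning dataloader workers.
    # This diff uses the older spelling create_format_().
    g.create_formats_()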
...
@@ -81,7 +81,7 @@ class SAGE(nn.Module):
                 num_workers=args.num_workers)
             for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
-                block = blocks[0].to(device)
+                block = blocks[0].int().to(device)
                 h = x[input_nodes].to(device)
                 h_dst = h[:block.number_of_dst_nodes()]
@@ -95,17 +95,6 @@ class SAGE(nn.Module):
             x = y
         return y

-def prepare_mp(g):
-    """
-    Explicitly materialize the CSR, CSC and COO representation of the given graph
-    so that they could be shared via copy-on-write to sampler workers and GPU
-    trainers.
-    This is a workaround before full shared memory support on heterogeneous graphs.
-    """
-    g.in_degree(0)
-    g.out_degree(0)
-    g.find_edges([0])
-
 def compute_acc(pred, labels):
     """
     Compute the accuracy of prediction given the labels.
@@ -175,7 +164,7 @@ def run(args, device, data):
             tic_step = time.time()

             # copy block to gpu
-            blocks = [blk.to(device) for blk in blocks]
+            blocks = [blk.int().to(device) for blk in blocks]

             # Load the input features as well as output labels
             batch_inputs, batch_labels = load_subtensor(g, labels, seeds, input_nodes, device)
@@ -245,7 +234,7 @@ if __name__ == '__main__':
     in_feats = graph.ndata['feat'].shape[1]
     n_classes = (labels.max() + 1).item()
-    prepare_mp(graph)
+    graph.create_format_()

     # Pack data
     data = train_idx, val_idx, test_idx, in_feats, labels, n_classes, graph
...
@@ -27,13 +27,8 @@ class GATConv(nn.Block):
     Parameters
     ----------
-    in_feats : int or pair of ints
-        Input feature size.
-
-        If the layer is to be applied to a unidirectional bipartite graph, ``in_feats``
-        specifies the input feature size on both the source and destination nodes. If
-        a scalar is given, the source and destination node feature size would take the
-        same value.
+    in_feats : int
+        Number of input features.
     out_feats : int
         Output feature size.
     num_heads : int
@@ -65,14 +60,6 @@ class GATConv(nn.Block):
         self._in_feats = in_feats
         self._out_feats = out_feats
         with self.name_scope():
-            if isinstance(in_feats, tuple):
-                self.fc_src = nn.Dense(out_feats * num_heads, use_bias=False,
-                                       weight_initializer=mx.init.Xavier(magnitude=math.sqrt(2.0)),
-                                       in_units=self._in_src_feats)
-                self.fc_dst = nn.Dense(out_feats * num_heads, use_bias=False,
-                                       weight_initializer=mx.init.Xavier(magnitude=math.sqrt(2.0)),
-                                       in_units=self._in_dst_feats)
-            else:
-                self.fc = nn.Dense(out_feats * num_heads, use_bias=False,
-                                   weight_initializer=mx.init.Xavier(magnitude=math.sqrt(2.0)),
-                                   in_units=in_feats)
+            self.fc = nn.Dense(out_feats * num_heads, use_bias=False,
+                               weight_initializer=mx.init.Xavier(magnitude=math.sqrt(2.0)),
+                               in_units=in_feats)
@@ -121,9 +108,9 @@ class GATConv(nn.Block):
         if isinstance(feat, tuple):
             h_src = self.feat_drop(feat[0])
             h_dst = self.feat_drop(feat[1])
-            feat_src = self.fc_src(h_src).reshape(
+            feat_src = self.fc(h_src).reshape(
                 -1, self._num_heads, self._out_feats)
-            feat_dst = self.fc_dst(h_dst).reshape(
+            feat_dst = self.fc(h_dst).reshape(
                 -1, self._num_heads, self._out_feats)
         else:
             h_src = h_dst = self.feat_drop(feat)
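The MXNet `GATConv` (and its TensorFlow counterpart below) drops the separate `fc_src`/`fc_dst` projections and applies one shared `fc` to both sides of a bipartite input. This is also why the unit tests at the end change the destination feature size from 10 to 5: source and destination features must now share `in_feats`. A minimal sketch of the shared projection, with hypothetical sizes:

    from mxnet import nd
    from mxnet.gluon import nn

    in_feats, out_feats, num_heads = 5, 2, 4
    fc = nn.Dense(out_feats * num_heads, use_bias=False, in_units=in_feats)
    fc.initialize()
    h_src = nd.random.randn(10, in_feats)   # source features
    h_dst = nd.random.randn(3, in_feats)    # destination features: same width now
    feat_src = fc(h_src).reshape(-1, num_heads, out_feats)
    feat_dst = fc(h_dst).reshape(-1, num_heads, out_feats)
    print(feat_src.shape, feat_dst.shape)   # (10, 4, 2) (3, 4, 2)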
...
@@ -67,12 +67,6 @@ class GATConv(layers.Layer):
         self._out_feats = out_feats
         xinit = tf.keras.initializers.VarianceScaling(scale=np.sqrt(
             2), mode="fan_avg", distribution="untruncated_normal")
-        if isinstance(in_feats, tuple):
-            self.fc_src = layers.Dense(
-                out_feats * num_heads, use_bias=False, kernel_initializer=xinit)
-            self.fc_dst = layers.Dense(
-                out_feats * num_heads, use_bias=False, kernel_initializer=xinit)
-        else:
-            self.fc = layers.Dense(
-                out_feats * num_heads, use_bias=False, kernel_initializer=xinit)
+        self.fc = layers.Dense(
+            out_feats * num_heads, use_bias=False, kernel_initializer=xinit)
         self.attn_l = tf.Variable(initial_value=xinit(
@@ -116,8 +110,8 @@ class GATConv(layers.Layer):
         if isinstance(feat, tuple):
             h_src = self.feat_drop(feat[0])
             h_dst = self.feat_drop(feat[1])
-            feat_src = tf.reshape(self.fc_src(h_src), (-1, self._num_heads, self._out_feats))
-            feat_dst = tf.reshape(self.fc_dst(h_dst), (-1, self._num_heads, self._out_feats))
+            feat_src = tf.reshape(self.fc(h_src), (-1, self._num_heads, self._out_feats))
+            feat_dst = tf.reshape(self.fc(h_dst), (-1, self._num_heads, self._out_feats))
         else:
             h_src = h_dst = self.feat_drop(feat)
             feat_src = feat_dst = tf.reshape(
...
@@ -173,9 +173,9 @@ def test_gat_conv(g, idtype):
 def test_gat_conv_bi(g, idtype):
     g = g.astype(idtype).to(F.ctx())
     ctx = F.ctx()
-    gat = nn.GATConv((5, 10), 2, 4)
+    gat = nn.GATConv(5, 2, 4)
     gat.initialize(ctx=ctx)
-    feat = (F.randn((g.number_of_src_nodes(), 5)), F.randn((g.number_of_dst_nodes(), 10)))
+    feat = (F.randn((g.number_of_src_nodes(), 5)), F.randn((g.number_of_dst_nodes(), 5)))
     h = gat(g, feat)
     assert h.shape == (g.number_of_dst_nodes(), 4, 2)
...
@@ -371,8 +371,8 @@ def test_gat_conv(g, idtype):
 def test_gat_conv_bi(g, idtype):
     g = g.astype(idtype).to(F.ctx())
     ctx = F.ctx()
-    gat = nn.GATConv((5, 10), 2, 4)
-    feat = (F.randn((g.number_of_src_nodes(), 5)), F.randn((g.number_of_dst_nodes(), 10)))
+    gat = nn.GATConv(5, 2, 4)
+    feat = (F.randn((g.number_of_src_nodes(), 5)), F.randn((g.number_of_dst_nodes(), 5)))
     h = gat(g, feat)
     assert h.shape == (g.number_of_dst_nodes(), 4, 2)