"src/git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "5b0dab1253194881eb826adf819b6c7a0558536e"
Unverified Commit 16169f3a authored by xiang song (charlie.song), committed by GitHub

[Test] Regression speed test for rgcn with neighbor sampling (#2461)



* Add sage neighbor sample test for reddit

* Add ogbn-products dataset

* upd

* upd

* Add ogbn-mag

* rgcn heterogeneous performance test

* upd

* update

* upd

* upd

* use dgl dataloader

* upd

* Update with new collator
Co-authored-by: Ubuntu <ubuntu@ip-172-31-59-204.ec2.internal>
parent 400687d7
import dgl
import itertools
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
import dgl.nn.pytorch as dglnn
import time
import traceback
from .. import utils
class RelGraphConvLayer(nn.Module):
r"""Relational graph convolution layer.
Parameters
----------
in_feat : int
Input feature size.
out_feat : int
Output feature size.
rel_names : list[str]
Relation names.
num_bases : int, optional
        Number of bases. If None, use the number of relations. Default: None.
weight : bool, optional
True if a linear layer is applied after message passing. Default: True
bias : bool, optional
True if bias is added. Default: True
activation : callable, optional
Activation function. Default: None
self_loop : bool, optional
True to include self loop message. Default: False
dropout : float, optional
Dropout rate. Default: 0.0
"""
def __init__(self,
in_feat,
out_feat,
rel_names,
num_bases,
*,
weight=True,
bias=True,
activation=None,
self_loop=False,
dropout=0.0):
super(RelGraphConvLayer, self).__init__()
self.in_feat = in_feat
self.out_feat = out_feat
self.rel_names = rel_names
self.num_bases = num_bases
self.bias = bias
self.activation = activation
self.self_loop = self_loop
self.conv = dglnn.HeteroGraphConv({
rel : dglnn.GraphConv(in_feat, out_feat, norm='right', weight=False, bias=False)
for rel in rel_names
})
self.use_weight = weight
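        # Basis decomposition is only useful when there are fewer bases than relations;
        # otherwise each relation keeps its own full weight matrix.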
self.use_basis = num_bases < len(self.rel_names) and weight
if self.use_weight:
if self.use_basis:
self.basis = dglnn.WeightBasis((in_feat, out_feat), num_bases, len(self.rel_names))
else:
self.weight = nn.Parameter(th.Tensor(len(self.rel_names), in_feat, out_feat))
nn.init.xavier_uniform_(self.weight, gain=nn.init.calculate_gain('relu'))
# bias
if bias:
self.h_bias = nn.Parameter(th.Tensor(out_feat))
nn.init.zeros_(self.h_bias)
# weight for self loop
if self.self_loop:
self.loop_weight = nn.Parameter(th.Tensor(in_feat, out_feat))
nn.init.xavier_uniform_(self.loop_weight,
gain=nn.init.calculate_gain('relu'))
self.dropout = nn.Dropout(dropout)
def forward(self, g, inputs):
"""Forward computation
Parameters
----------
g : DGLHeteroGraph
Input graph.
inputs : dict[str, torch.Tensor]
Node feature for each node type.
Returns
-------
dict[str, torch.Tensor]
New node features for each node type.
"""
g = g.local_var()
if self.use_weight:
weight = self.basis() if self.use_basis else self.weight
wdict = {self.rel_names[i] : {'weight' : w.squeeze(0)}
for i, w in enumerate(th.split(weight, 1, dim=0))}
else:
wdict = {}
if g.is_block:
inputs_src = inputs
inputs_dst = {k: v[:g.number_of_dst_nodes(k)] for k, v in inputs.items()}
else:
inputs_src = inputs_dst = inputs
hs = self.conv(g, inputs, mod_kwargs=wdict)
def _apply(ntype, h):
if self.self_loop:
h = h + th.matmul(inputs_dst[ntype], self.loop_weight)
if self.bias:
h = h + self.h_bias
if self.activation:
h = self.activation(h)
return self.dropout(h)
return {ntype : _apply(ntype, h) for ntype, h in hs.items()}
class RelGraphEmbed(nn.Module):
r"""Embedding layer for featureless heterograph."""
def __init__(self,
g,
device,
embed_size,
num_nodes,
node_feats,
embed_name='embed',
activation=None,
dropout=0.0):
super(RelGraphEmbed, self).__init__()
self.g = g
self.device = device
self.embed_size = embed_size
self.embed_name = embed_name
self.activation = activation
self.dropout = nn.Dropout(dropout)
self.node_feats = node_feats
        # For featureless node types, create a learnable sparse embedding table; for
        # node types with input features, create a projection matrix to the embed size.
self.embeds = nn.ParameterDict()
self.node_embeds = nn.ModuleDict()
for ntype in g.ntypes:
if node_feats[ntype] is None:
sparse_emb = th.nn.Embedding(num_nodes[ntype], embed_size, sparse=True)
nn.init.uniform_(sparse_emb.weight, -1.0, 1.0)
self.node_embeds[ntype] = sparse_emb
else:
input_emb_size = node_feats[ntype].shape[1]
embed = nn.Parameter(th.Tensor(input_emb_size, embed_size))
nn.init.xavier_uniform_(embed)
self.embeds[ntype] = embed
def forward(self, block=None):
"""Forward computation
Parameters
----------
        block : DGLHeteroGraph
            The sampled block whose nodes need input embeddings.
        Returns
        -------
        dict[str, torch.Tensor]
            Input node embeddings for each node type in the block.
"""
embeds = {}
for ntype in block.ntypes:
if self.node_feats[ntype] is None:
embeds[ntype] = self.node_embeds[ntype](block.nodes(ntype)).to(self.device)
else:
embeds[ntype] = self.node_feats[ntype][block.nodes(ntype)].to(self.device) @ self.embeds[ntype]
return embeds
class EntityClassify(nn.Module):
def __init__(self,
g,
h_dim, out_dim,
num_bases,
num_hidden_layers=1,
dropout=0,
use_self_loop=False):
super(EntityClassify, self).__init__()
self.g = g
self.h_dim = h_dim
self.out_dim = out_dim
self.rel_names = list(set(g.etypes))
self.rel_names.sort()
if num_bases < 0 or num_bases > len(self.rel_names):
self.num_bases = len(self.rel_names)
else:
self.num_bases = num_bases
self.num_hidden_layers = num_hidden_layers
self.dropout = dropout
self.use_self_loop = use_self_loop
self.layers = nn.ModuleList()
# i2h
self.layers.append(RelGraphConvLayer(
self.h_dim, self.h_dim, self.rel_names,
self.num_bases, activation=F.relu, self_loop=self.use_self_loop,
dropout=self.dropout, weight=False))
# h2h
for i in range(self.num_hidden_layers):
self.layers.append(RelGraphConvLayer(
self.h_dim, self.h_dim, self.rel_names,
self.num_bases, activation=F.relu, self_loop=self.use_self_loop,
dropout=self.dropout))
# h2o
self.layers.append(RelGraphConvLayer(
self.h_dim, self.out_dim, self.rel_names,
self.num_bases, activation=None,
self_loop=self.use_self_loop))
def forward(self, h, blocks):
for layer, block in zip(self.layers, blocks):
h = layer(block, h)
return h
@utils.benchmark('time', 3600)
@utils.parametrize('data', ['am', 'ogbn-mag'])
def track_time(data):
dataset = utils.process_data(data)
device = utils.get_bench_device()
if data == 'am':
n_bases = 40
l2norm = 5e-4
elif data == 'ogbn-mag':
n_bases = 2
l2norm = 0
else:
        raise ValueError("Unsupported dataset: {}".format(data))
fanout = 4
n_layers = 2
batch_size = 1024
n_hidden = 64
dropout = 0.5
use_self_loop = True
lr = 0.01
n_epochs = 5
hg = dataset[0]
category = dataset.predict_category
num_classes = dataset.num_classes
train_mask = hg.nodes[category].data.pop('train_mask')
train_idx = th.nonzero(train_mask, as_tuple=False).squeeze()
labels = hg.nodes[category].data.pop('labels')
node_feats = {}
num_nodes = {}
for ntype in hg.ntypes:
node_feats[ntype] = hg.nodes[ntype].data['feat'] if 'feat' in hg.nodes[ntype].data else None
num_nodes[ntype] = hg.number_of_nodes(ntype)
embed_layer = RelGraphEmbed(hg, device, n_hidden, num_nodes, node_feats)
model = EntityClassify(hg,
n_hidden,
num_classes,
num_bases=n_bases,
num_hidden_layers=n_layers - 2,
dropout=dropout,
use_self_loop=use_self_loop)
embed_layer = embed_layer.to(device)
model = model.to(device)
all_params = itertools.chain(model.parameters(), embed_layer.embeds.parameters())
optimizer = th.optim.Adam(all_params, lr=lr, weight_decay=l2norm)
sparse_optimizer = th.optim.SparseAdam(list(embed_layer.node_embeds.parameters()), lr=lr)
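    # Dense model parameters are optimized with Adam; the sparse node embeddings are
    # optimized with SparseAdam, which supports sparse gradients.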
sampler = dgl.dataloading.MultiLayerNeighborSampler([fanout] * n_layers)
loader = dgl.dataloading.NodeDataLoader(
hg, {category: train_idx}, sampler,
batch_size=batch_size, shuffle=True, num_workers=4)
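    # One untimed pass over the data (warm-up) before the timed epochs below.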
for epoch in range(1):
model.train()
embed_layer.train()
optimizer.zero_grad()
sparse_optimizer.zero_grad()
for i, (input_nodes, seeds, blocks) in enumerate(loader):
blocks = [blk.to(device) for blk in blocks]
seeds = seeds[category] # we only predict the nodes with type "category"
batch_tic = time.time()
emb = embed_layer(blocks[0])
lbl = labels[seeds].to(device)
emb = {k : e.to(device) for k, e in emb.items()}
logits = model(emb, blocks)[category]
loss = F.cross_entropy(logits, lbl)
loss.backward()
optimizer.step()
sparse_optimizer.step()
print("start training...")
t0 = time.time()
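    # Timed epochs: the benchmark returns the average wall-clock time per epoch.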
for epoch in range(n_epochs):
model.train()
embed_layer.train()
optimizer.zero_grad()
sparse_optimizer.zero_grad()
for i, (input_nodes, seeds, blocks) in enumerate(loader):
blocks = [blk.to(device) for blk in blocks]
seeds = seeds[category] # we only predict the nodes with type "category"
batch_tic = time.time()
emb = embed_layer(blocks[0])
lbl = labels[seeds].to(device)
emb = {k : e.to(device) for k, e in emb.items()}
logits = model(emb, blocks)[category]
loss = F.cross_entropy(logits, lbl)
loss.backward()
optimizer.step()
sparse_optimizer.step()
t1 = time.time()
return (t1 - t0) / n_epochs
import dgl
import itertools
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
import dgl.nn.pytorch as dglnn
from dgl.nn import RelGraphConv
import time
from .. import utils
class EntityClassify(nn.Module):
""" Entity classification class for RGCN
Parameters
----------
device : int
Device to run the layer.
num_nodes : int
Number of nodes.
h_dim : int
Hidden dim size.
out_dim : int
Output dim size.
num_rels : int
        Number of relation types.
num_bases : int
        Number of bases. If None, use the number of relations.
num_hidden_layers : int
        Number of hidden RelGraphConv layers.
    dropout : float
        Dropout rate.
use_self_loop : bool
Use self loop if True, default False.
    low_mem : bool
        True to use the low-memory implementation of relation message passing,
        trading speed for lower memory consumption.
    layer_norm : bool
        True to apply layer normalization in each RelGraphConv layer. Default: False.
"""
def __init__(self,
device,
num_nodes,
h_dim,
out_dim,
num_rels,
num_bases=None,
num_hidden_layers=1,
dropout=0,
use_self_loop=False,
low_mem=False,
layer_norm=False):
super(EntityClassify, self).__init__()
self.device = device
self.num_nodes = num_nodes
self.h_dim = h_dim
self.out_dim = out_dim
self.num_rels = num_rels
self.num_bases = None if num_bases < 0 else num_bases
self.num_hidden_layers = num_hidden_layers
self.dropout = dropout
self.use_self_loop = use_self_loop
self.low_mem = low_mem
self.layer_norm = layer_norm
self.layers = nn.ModuleList()
# i2h
self.layers.append(RelGraphConv(
self.h_dim, self.h_dim, self.num_rels, "basis",
self.num_bases, activation=F.relu, self_loop=self.use_self_loop,
low_mem=self.low_mem, dropout=self.dropout, layer_norm = layer_norm))
# h2h
for idx in range(self.num_hidden_layers):
self.layers.append(RelGraphConv(
self.h_dim, self.h_dim, self.num_rels, "basis",
self.num_bases, activation=F.relu, self_loop=self.use_self_loop,
low_mem=self.low_mem, dropout=self.dropout, layer_norm = layer_norm))
# h2o
self.layers.append(RelGraphConv(
self.h_dim, self.out_dim, self.num_rels, "basis",
self.num_bases, activation=None,
self_loop=self.use_self_loop,
low_mem=self.low_mem, layer_norm = layer_norm))
def forward(self, blocks, feats, norm=None):
if blocks is None:
# full graph training
blocks = [self.g] * len(self.layers)
h = feats
for layer, block in zip(self.layers, blocks):
block = block.to(self.device)
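            # RelGraphConv consumes the per-edge relation type ('etype') and the
            # precomputed normalization factor ('norm') stored on the graph.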
h = layer(block, h, block.edata['etype'], block.edata['norm'])
return h
class RelGraphEmbedLayer(nn.Module):
r"""Embedding layer for featureless heterograph.
Parameters
----------
device : int
Device to run the layer.
num_nodes : int
Number of nodes.
    node_tids : tensor
        Node type id of each node, starting from 0.
num_of_ntype : int
Number of node types
    input_size : list
        Input features for each node type. If an entry is None, the corresponding node
        type is treated as featureless and uses a one-hot style learnable embedding.
embed_size : int
Output embed size
embed_name : str, optional
Embed name
"""
def __init__(self,
device,
num_nodes,
node_tids,
num_of_ntype,
input_size,
embed_size,
sparse_emb=False,
embed_name='embed'):
super(RelGraphEmbedLayer, self).__init__()
self.device = device
self.embed_size = embed_size
self.embed_name = embed_name
self.num_nodes = num_nodes
self.sparse_emb = sparse_emb
        # For node types with input features, create a projection matrix to the embed
        # size; featureless node types use the shared sparse embedding table created below.
self.embeds = nn.ParameterDict()
self.num_of_ntype = num_of_ntype
self.idmap = th.empty(num_nodes).long()
for ntype in range(num_of_ntype):
if input_size[ntype] is not None:
input_emb_size = input_size[ntype].shape[1]
embed = nn.Parameter(th.Tensor(input_emb_size, self.embed_size))
nn.init.xavier_uniform_(embed)
self.embeds[str(ntype)] = embed
self.node_embeds = th.nn.Embedding(node_tids.shape[0], self.embed_size, sparse=self.sparse_emb)
nn.init.uniform_(self.node_embeds.weight, -1.0, 1.0)
def forward(self, node_ids, node_tids, type_ids, features):
"""Forward computation
Parameters
----------
node_ids : tensor
node ids to generate embedding for.
        node_tids : tensor
            Node type id of each node.
        type_ids : tensor
            Per-type node id of each node, used to index into the per-type feature tensors.
        features : list of tensors or None
            Initial features for each node type. If an entry is None, the corresponding
            node type uses the learnable embedding (one-hot style input); otherwise the
            features are projected by the per-type projection matrix.
Returns
-------
tensor
embeddings as the input of the next layer
"""
tsd_ids = node_ids.to(self.node_embeds.weight.device)
embeds = th.empty(node_ids.shape[0], self.embed_size, device=self.device)
for ntype in range(self.num_of_ntype):
if features[ntype] is not None:
loc = node_tids == ntype
embeds[loc] = features[ntype][type_ids[loc]].to(self.device) @ self.embeds[str(ntype)].to(self.device)
else:
loc = node_tids == ntype
embeds[loc] = self.node_embeds(tsd_ids[loc]).to(self.device)
return embeds
@utils.benchmark('time', 3600)
@utils.parametrize('data', ['am', 'ogbn-mag'])
def track_time(data):
dataset = utils.process_data(data)
device = utils.get_bench_device()
if data == 'am':
n_bases = 40
l2norm = 5e-4
elif data == 'ogbn-mag':
n_bases = 2
l2norm = 0
else:
        raise ValueError("Unsupported dataset: {}".format(data))
fanouts = [25,15]
n_layers = 2
batch_size = 1024
n_hidden = 64
dropout = 0.5
use_self_loop = True
lr = 0.01
n_epochs = 5
low_mem = True
num_workers = 4
hg = dataset[0]
category = dataset.predict_category
num_classes = dataset.num_classes
train_mask = hg.nodes[category].data.pop('train_mask')
train_idx = th.nonzero(train_mask, as_tuple=False).squeeze()
labels = hg.nodes[category].data.pop('labels').to(device)
num_of_ntype = len(hg.ntypes)
num_rels = len(hg.canonical_etypes)
node_feats = []
for ntype in hg.ntypes:
if len(hg.nodes[ntype].data) == 0 or 'feat' not in hg.nodes[ntype].data:
node_feats.append(None)
else:
feat = hg.nodes[ntype].data.pop('feat')
node_feats.append(feat.share_memory_())
# get target category id
category_id = len(hg.ntypes)
for i, ntype in enumerate(hg.ntypes):
if ntype == category:
category_id = i
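    # Convert the heterograph to a homogeneous graph; the original node/edge type ids
    # and per-type ids are kept in ndata/edata (dgl.NTYPE, dgl.NID, dgl.ETYPE) and are
    # copied into named fields below.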
g = dgl.to_homogeneous(hg)
u, v, eid = g.all_edges(form='all')
# global norm
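    # Each edge is normalized by the in-degree of its destination node (norm = 1/deg(v)).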
_, inverse_index, count = th.unique(v, return_inverse=True, return_counts=True)
degrees = count[inverse_index]
norm = th.ones(eid.shape[0]) / degrees
norm = norm.unsqueeze(1)
g.edata['norm'] = norm
g.edata['etype'] = g.edata[dgl.ETYPE]
g.ndata['type_id'] = g.ndata[dgl.NID]
g.ndata['ntype'] = g.ndata[dgl.NTYPE]
node_ids = th.arange(g.number_of_nodes())
# find out the target node ids
node_tids = g.ndata[dgl.NTYPE]
loc = (node_tids == category_id)
target_nids = node_ids[loc]
train_nids = target_nids[train_idx]
# Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
g.create_formats_()
sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
collator = dgl.dataloading.NodeCollator(g, train_nids, sampler, return_indices=True)
loader = dgl.dataloading.DataLoader(
collator.dataset, collate_fn=collator.collate,
batch_size=batch_size, shuffle=True, num_workers=4)
    # Node features: an entry is None for featureless node types (handled with a
    # learnable one-hot style embedding); otherwise it is that node type's feature tensor.
embed_layer = RelGraphEmbedLayer(device,
g.number_of_nodes(),
node_tids,
num_of_ntype,
node_feats,
n_hidden,
sparse_emb=True)
# create model
# all model params are in device.
model = EntityClassify(device,
g.number_of_nodes(),
n_hidden,
num_classes,
num_rels,
num_bases=n_bases,
num_hidden_layers=n_layers - 2,
dropout=dropout,
use_self_loop=use_self_loop,
low_mem=low_mem,
layer_norm=False)
embed_layer = embed_layer.to(device)
model = model.to(device)
all_params = itertools.chain(model.parameters(), embed_layer.embeds.parameters())
optimizer = th.optim.Adam(all_params, lr=lr, weight_decay=l2norm)
emb_optimizer = th.optim.SparseAdam(list(embed_layer.node_embeds.parameters()), lr=lr)
print("start training...")
t0 = time.time()
for epoch in range(n_epochs):
model.train()
embed_layer.train()
for i, sample_data in enumerate(loader):
input_nodes, output_nodes, seed_idx, blocks = sample_data
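            # seed_idx gives the positions of this batch's seeds within train_nids,
            # so the matching labels are labels[train_idx][seed_idx].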
feats = embed_layer(input_nodes,
blocks[0].srcdata['ntype'],
blocks[0].srcdata['type_id'],
node_feats)
logits = model(blocks, feats)
loss = F.cross_entropy(logits, labels[train_idx][seed_idx])
optimizer.zero_grad()
emb_optimizer.zero_grad()
loss.backward()
optimizer.step()
emb_optimizer.step()
t1 = time.time()
return (t1 - t0) / n_epochs
@@ -40,9 +40,10 @@ def get_graph(name):
return None
class OGBDataset(object):
def __init__(self, g, num_labels):
def __init__(self, g, num_labels, predict_category=None):
self._g = g
self._num_labels = num_labels
self._predict_category = predict_category
@property
def num_labels(self):
@@ -52,10 +53,15 @@ class OGBDataset(object):
def num_classes(self):
return self._num_labels
@property
def predict_category(self):
return self._predict_category
def __getitem__(self, idx):
return self._g
def load_ogb_product(name):
def load_ogb_product():
name = 'ogbn-products'
from ogb.nodeproppred import DglNodePropPredDataset
os.symlink('/tmp/dataset/', os.path.join(os.getcwd(), 'dataset'))
@@ -85,6 +91,41 @@ def load_ogb_product(name):
return OGBDataset(graph, num_labels)
def load_ogb_mag():
name = 'ogbn-mag'
from ogb.nodeproppred import DglNodePropPredDataset
os.symlink('/tmp/dataset/', os.path.join(os.getcwd(), 'dataset'))
print('load', name)
dataset = DglNodePropPredDataset(name=name)
print('finish loading', name)
split_idx = dataset.get_idx_split()
train_idx = split_idx["train"]['paper']
val_idx = split_idx["valid"]['paper']
test_idx = split_idx["test"]['paper']
hg_orig, labels = dataset[0]
subgs = {}
for etype in hg_orig.canonical_etypes:
u, v = hg_orig.all_edges(etype=etype)
subgs[etype] = (u, v)
subgs[(etype[2], 'rev-'+etype[1], etype[0])] = (v, u)
hg = dgl.heterograph(subgs)
hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']
hg.nodes['paper'].data['labels'] = labels['paper'].squeeze()
train_mask = torch.zeros((hg.number_of_nodes('paper'),), dtype=torch.bool)
train_mask[train_idx] = True
val_mask = torch.zeros((hg.number_of_nodes('paper'),), dtype=torch.bool)
val_mask[val_idx] = True
test_mask = torch.zeros((hg.number_of_nodes('paper'),), dtype=torch.bool)
test_mask[test_idx] = True
hg.nodes['paper'].data['train_mask'] = train_mask
hg.nodes['paper'].data['val_mask'] = val_mask
hg.nodes['paper'].data['test_mask'] = test_mask
num_classes = dataset.num_classes
return OGBDataset(hg, num_classes, 'paper')
class PinsageDataset:
def __init__(self, g, user_ntype, item_ntype, textset):
self._g = g
@@ -163,7 +204,9 @@ def process_data(name):
elif name == 'reddit':
return dgl.data.RedditDataset(self_loop=True)
elif name == 'ogbn-products':
return load_ogb_product('ogbn-products')
return load_ogb_product()
elif name == 'ogbn-mag':
return load_ogb_mag()
elif name == 'nowplaying_rs':
return load_nowplaying_rs()
else: