"src/vscode:/vscode.git/clone" did not exist on "0d4dfbbd0a26d463d45a79b4667e288e00c3e0a0"
Unverified commit 22ccf436 authored by Minjie Wang, committed by GitHub

[Test] Tweak end2end benchmarks to be more reasonable (#2643)



* change timeouts to reasonable ranges

* update the RGCN neighbor-sampling (ns) benchmarks

* fix all neighbor-sampling speed tests
Co-authored-by: Jinjing Zhou <VoVAllen@users.noreply.github.com>
parent e4ff4844
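The same restructuring is applied across the end-to-end speed benchmarks below: instead of timing whole training epochs under a very generous timeout, each `track_time` function now dry-runs a few batches to absorb one-time costs, then times roughly ten batches and reports the average seconds per iteration. A minimal sketch of that structure in plain Python, with illustrative names (`dataloader` and `train_step` stand in for the real sampler and training step):

```python
import time

def track_time(dataloader, train_step):
    # Dry run: absorb one-time costs (worker startup, sparse-format
    # creation, feature caching) so they do not skew the measurement.
    for step, batch in enumerate(dataloader):
        train_step(batch)
        if step >= 3:
            break

    # Timed loop: average over a small, fixed number of iterations
    # instead of whole epochs, so the benchmark finishes well inside
    # its (now much smaller) timeout.
    t0 = time.time()
    for step, batch in enumerate(dataloader):
        train_step(batch)
        if step >= 9:  # time 10 iterations
            break
    t1 = time.time()
    return (t1 - t0) / (step + 1)
```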
@@ -182,21 +182,21 @@ def evaluate(model, embed_layer, eval_loader, node_feats):
with th.no_grad():
for sample_data in eval_loader:
th.cuda.empty_cache()
seeds, blocks = sample_data
_, _, blocks = sample_data
feats = embed_layer(blocks[0].srcdata[dgl.NID],
blocks[0].srcdata[dgl.NTYPE],
blocks[0].srcdata['type_id'],
node_feats)
logits = model(blocks, feats)
eval_logits.append(logits.cpu().detach())
eval_seeds.append(seeds.cpu().detach())
eval_seeds.append(blocks[-1].dstdata['type_id'].cpu().detach())
eval_logits = th.cat(eval_logits)
eval_seeds = th.cat(eval_seeds)
return eval_logits, eval_seeds
@utils.benchmark('time', 3600)
@utils.benchmark('time', 3600) # ogbn-mag takes ~1 hour to train
@utils.parametrize('data', ['am', 'ogbn-mag'])
def track_acc(data):
dataset = utils.process_data(data)
@@ -205,9 +205,11 @@ def track_acc(data):
if data == 'am':
n_bases = 40
l2norm = 5e-4
n_epochs = 20
elif data == 'ogbn-mag':
n_bases = 2
l2norm = 0
n_epochs = 20
else:
raise ValueError()
@@ -218,7 +220,6 @@ def track_acc(data):
dropout = 0.5
use_self_loop = True
lr = 0.01
n_epochs = 20
low_mem = True
num_workers = 4
@@ -264,26 +265,28 @@ def track_acc(data):
node_tids = g.ndata[dgl.NTYPE]
loc = (node_tids == category_id)
target_nids = node_ids[loc]
train_nids = target_nids[train_idx]
# Create csr/coo/csc formats before launching training processes with multi-gpu.
# This avoids creating certain formats in each sub-process, which saves memory and CPU.
g.create_formats_()
g = g.formats('csc')
sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
collator = dgl.dataloading.NodeCollator(g, train_nids, sampler, return_indices=True)
loader = dgl.dataloading.DataLoader(
collator.dataset, collate_fn=collator.collate,
batch_size=batch_size, shuffle=True, num_workers=4)
# test_sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
test_loader = DataLoader(dataset=test_idx.numpy(),
train_loader = dgl.dataloading.NodeDataLoader(
g,
target_nids[train_idx],
sampler,
batch_size=batch_size,
collate_fn=collator.collate,
shuffle=False,
num_workers=4)
shuffle=True,
drop_last=False,
num_workers=num_workers)
test_loader = dgl.dataloading.NodeDataLoader(
g,
target_nids[test_idx],
sampler,
batch_size=batch_size,
shuffle=True,
drop_last=False,
num_workers=num_workers)
# node features
# None means one-hot features; otherwise it should be the feature tensor.
#
embed_layer = RelGraphEmbedLayer(device,
g.number_of_nodes(),
node_tids,
@@ -314,19 +317,19 @@ def track_acc(data):
emb_optimizer = th.optim.SparseAdam(list(embed_layer.node_embeds.parameters()), lr=lr)
print("start training...")
t0 = time.time()
for epoch in range(n_epochs):
model.train()
embed_layer.train()
for i, sample_data in enumerate(loader):
input_nodes, output_nodes, seed_idx, blocks = sample_data
for i, sample_data in enumerate(train_loader):
input_nodes, output_nodes, blocks = sample_data
feats = embed_layer(input_nodes,
blocks[0].srcdata['ntype'],
blocks[0].srcdata['type_id'],
node_feats)
logits = model(blocks, feats)
loss = F.cross_entropy(logits, labels[train_idx][seed_idx])
seed_idx = blocks[-1].dstdata['type_id']
loss = F.cross_entropy(logits, labels[seed_idx])
optimizer.zero_grad()
emb_optimizer.zero_grad()
@@ -334,8 +337,10 @@ def track_acc(data):
optimizer.step()
emb_optimizer.step()
print('start testing...')
test_logits, test_seeds = evaluate(model, embed_layer, test_loader, node_feats)
test_loss = F.cross_entropy(test_logits, labels[test_seeds].cpu()).item()
test_acc = th.sum(test_logits.argmax(dim=1) == labels[test_seeds].cpu()).item() / len(test_seeds)
t1 = time.time()
return test_acc
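The rewritten RGCN benchmark above replaces the `NodeCollator(..., return_indices=True)` plus plain `DataLoader` pair with `dgl.dataloading.NodeDataLoader`, and reads the seed nodes' `type_id` off the last block instead of carrying a separate index tensor through the collator. A small self-contained sketch of that loader pattern on a toy graph, assuming a DGL 0.6-era API (graph and feature names here are illustrative):

```python
import dgl
import torch as th

# Toy homogeneous graph; the benchmark uses the homogenized heterograph instead.
g = dgl.rand_graph(1000, 5000)
g.ndata['type_id'] = th.arange(g.number_of_nodes())
train_nids = th.arange(200)

sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
train_loader = dgl.dataloading.NodeDataLoader(
    g, train_nids, sampler,
    batch_size=64, shuffle=True, drop_last=False, num_workers=0)

for input_nodes, output_nodes, blocks in train_loader:
    # Seed (output) node data lives on the last block's dstdata, so the
    # collator no longer needs to return the seed indices explicitly.
    seed_type_ids = blocks[-1].dstdata['type_id']
    break
```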
@@ -118,7 +118,7 @@ def load_subtensor(g, seeds, input_nodes, device):
return batch_inputs, batch_labels
@utils.benchmark('acc', 3600)
@utils.benchmark('acc', 600)
@utils.parametrize('data', ['ogbn-products', "reddit"])
def track_acc(data):
data = utils.process_data(data)
@@ -66,7 +66,7 @@ def load_subtensor(g, seeds, input_nodes, device):
batch_labels = g.ndata['labels'][seeds].to(device)
return batch_inputs, batch_labels
@utils.benchmark('time', 3600)
@utils.benchmark('time', 600)
@utils.parametrize('data', ['reddit', 'ogbn-products'])
def track_time(data):
data = utils.process_data(data)
@@ -82,7 +82,6 @@ def track_time(data):
# This avoids creating certain formats in each sub-process, which saves memory and CPU.
g.create_formats_()
num_epochs = 20
num_hidden = 16
num_heads = 8
num_layers = 2
@@ -113,7 +112,7 @@ def track_time(data):
loss_fcn = loss_fcn.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
# dry run one epoch
# dry run
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
# Load the input features as well as output labels
#batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device)
@@ -128,16 +127,17 @@ def track_time(data):
loss.backward()
optimizer.step()
if step >= 3:
break
# Training loop
avg = 0
iter_tput = []
t0 = time.time()
for epoch in range(num_epochs):
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
# Load the input features as well as output labels
#batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device)
blocks = [block.int().to(device) for block in blocks]
batch_inputs = blocks[0].srcdata['features']
batch_labels = blocks[-1].dstdata['labels']
@@ -149,6 +149,9 @@ def track_time(data):
loss.backward()
optimizer.step()
if step >= 9: # time 10 loops
break
t1 = time.time()
return (t1 - t0) / num_epochs
return (t1 - t0) / (step + 1)
@@ -358,7 +358,7 @@ class PinSAGECollator(object):
assign_features_to_blocks(blocks, self.g, self.textset, self.ntype)
return blocks
@utils.benchmark('time', 36000)
@utils.benchmark('time', 600)
@utils.parametrize('data', ['nowplaying_rs'])
def track_time(data):
dataset = utils.process_data(data)
@@ -377,8 +377,6 @@ def track_time(data):
num_workers = 0
hidden_dims = 16
lr = 3e-5
num_epochs = 5
batches_per_epoch = 20000
g = dataset[0]
# Sampler
@@ -398,7 +396,6 @@ def track_time(data):
batch_size=batch_size,
collate_fn=collator.collate_test,
num_workers=num_workers)
dataloader_it = iter(dataloader)
# Model
model = PinSAGEModel(g, item_ntype, textset, hidden_dims, num_layers).to(device)
@@ -406,8 +403,7 @@ def track_time(data):
opt = torch.optim.Adam(model.parameters(), lr=lr)
model.train()
for batch_id in range(batches_per_epoch):
pos_graph, neg_graph, blocks = next(dataloader_it)
for batch_id, (pos_graph, neg_graph, blocks) in enumerate(dataloader):
# Copy to GPU
for i in range(len(blocks)):
blocks[i] = blocks[i].to(device)
@@ -419,13 +415,13 @@ def track_time(data):
loss.backward()
opt.step()
if batch_id >= 3:
break
print("start training...")
t0 = time.time()
# For each batch of head-tail-negative triplets...
for epoch_id in range(num_epochs):
model.train()
for batch_id in range(batches_per_epoch):
pos_graph, neg_graph, blocks = next(dataloader_it)
for batch_id, (pos_graph, neg_graph, blocks) in enumerate(dataloader):
# Copy to GPU
for i in range(len(blocks)):
blocks[i] = blocks[i].to(device)
@@ -437,6 +433,9 @@ def track_time(data):
loss.backward()
opt.step()
if batch_id >= 10: # time 10 loops
break
t1 = time.time()
return (t1 - t0) / num_epochs
return (t1 - t0) / (batch_id + 1)
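The PinSAGE benchmark above also drops the manual `iter(dataloader)` / `next()` bookkeeping in favor of `enumerate(dataloader)` with an early `break`; each `for` loop then starts its own pass over the sampler, so the timed loop does not depend on how many batches the dry run consumed. A tiny sketch of the two styles with a plain PyTorch loader and toy data:

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

loader = DataLoader(TensorDataset(torch.randn(1024, 8)), batch_size=32)

# Old style: a shared iterator advanced by hand; it keeps state across
# loops and raises StopIteration once exhausted.
it = iter(loader)
for _ in range(4):
    (batch,) = next(it)

# New style: enumerate the loader directly and break after a fixed
# number of batches; every loop is a fresh, self-contained pass.
for batch_id, (batch,) in enumerate(loader):
    if batch_id >= 9:  # e.g. time 10 batches
        break
```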
@@ -38,7 +38,7 @@ class RGCN(nn.Module):
h = layer(g, h, r, norm)
return h
@utils.benchmark('time', 3600)
@utils.benchmark('time', 300)
@utils.parametrize('data', ['aifb'])
@utils.parametrize('lowmem', [True, False])
@utils.parametrize('use_type_count', [True, False])
@@ -227,7 +227,7 @@ class EntityClassify(nn.Module):
h = layer(block, h)
return h
@utils.benchmark('time', 3600)
@utils.benchmark('time', 600)
@utils.parametrize('data', ['am', 'ogbn-mag'])
def track_time(data):
dataset = utils.process_data(data)
@@ -249,7 +249,6 @@ def track_time(data):
dropout = 0.5
use_self_loop = True
lr = 0.01
n_epochs = 5
hg = dataset[0]
category = dataset.predict_category
@@ -284,12 +283,7 @@ def track_time(data):
hg, {category: train_idx}, sampler,
batch_size=batch_size, shuffle=True, num_workers=4)
for epoch in range(1):
model.train()
embed_layer.train()
optimizer.zero_grad()
sparse_optimizer.zero_grad()
# dry run
for i, (input_nodes, seeds, blocks) in enumerate(loader):
blocks = [blk.to(device) for blk in blocks]
seeds = seeds[category] # we only predict the nodes with type "category"
@@ -303,14 +297,16 @@ def track_time(data):
optimizer.step()
sparse_optimizer.step()
if i >= 3:
break
print("start training...")
t0 = time.time()
for epoch in range(n_epochs):
model.train()
embed_layer.train()
optimizer.zero_grad()
sparse_optimizer.zero_grad()
t0 = time.time()
for i, (input_nodes, seeds, blocks) in enumerate(loader):
blocks = [blk.to(device) for blk in blocks]
seeds = seeds[category] # we only predict the nodes with type "category"
@@ -324,6 +320,9 @@ def track_time(data):
optimizer.step()
sparse_optimizer.step()
if i >= 9: # time 10 loops
break
t1 = time.time()
return (t1 - t0) / n_epochs
return (t1 - t0) / (i + 1)
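In the heterogeneous variant above, the loader is built on the heterograph with a per-node-type seed dict (`{category: train_idx}`), so the `seeds` it yields is also a dict and the training loop keeps only the prediction node type. A short sketch of that pattern on a toy heterograph, assuming a DGL 0.6-era API (node types and sizes here are made up for illustration):

```python
import dgl
import torch as th

# Toy heterograph; 'paper' stands in for dataset.predict_category.
hg = dgl.heterograph({
    ('author', 'writes', 'paper'): (th.randint(0, 100, (500,)), th.randint(0, 200, (500,))),
    ('paper', 'cites', 'paper'): (th.randint(0, 200, (500,)), th.randint(0, 200, (500,))),
}, num_nodes_dict={'author': 100, 'paper': 200})
category = 'paper'
train_idx = th.arange(50)

sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
loader = dgl.dataloading.NodeDataLoader(
    hg, {category: train_idx}, sampler,
    batch_size=16, shuffle=True, num_workers=0)

for input_nodes, seeds, blocks in loader:
    seeds = seeds[category]  # we only predict nodes of the target type
    break
```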
@@ -173,7 +173,7 @@ class RelGraphEmbedLayer(nn.Module):
return embeds
@utils.benchmark('time', 3600)
@utils.benchmark('time', 600)
@utils.parametrize('data', ['am', 'ogbn-mag'])
def track_time(data):
dataset = utils.process_data(data)
@@ -195,7 +195,6 @@ def track_time(data):
dropout = 0.5
use_self_loop = True
lr = 0.01
n_epochs = 5
low_mem = True
num_workers = 4
@@ -241,14 +240,16 @@ def track_time(data):
target_nids = node_ids[loc]
train_nids = target_nids[train_idx]
# Create csr/coo/csc formats before launching training processes with multi-gpu.
# This avoids creating certain formats in each sub-process, which saves memory and CPU.
g.create_formats_()
g = g.formats('csc')
sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
collator = dgl.dataloading.NodeCollator(g, train_nids, sampler, return_indices=True)
loader = dgl.dataloading.DataLoader(
collator.dataset, collate_fn=collator.collate,
batch_size=batch_size, shuffle=True, num_workers=4)
loader = dgl.dataloading.NodeDataLoader(
g,
target_nids[train_idx],
sampler,
batch_size=batch_size,
shuffle=True,
drop_last=False,
num_workers=num_workers)
# node features
# None means one-hot features; otherwise it should be the feature tensor.
@@ -282,20 +283,40 @@ def track_time(data):
optimizer = th.optim.Adam(all_params, lr=lr, weight_decay=l2norm)
emb_optimizer = th.optim.SparseAdam(list(embed_layer.node_embeds.parameters()), lr=lr)
# dry run
for i, sample_data in enumerate(loader):
input_nodes, output_nodes, blocks = sample_data
feats = embed_layer(input_nodes,
blocks[0].srcdata['ntype'],
blocks[0].srcdata['type_id'],
node_feats)
logits = model(blocks, feats)
seed_idx = blocks[-1].dstdata['type_id']
loss = F.cross_entropy(logits, labels[seed_idx])
optimizer.zero_grad()
emb_optimizer.zero_grad()
loss.backward()
optimizer.step()
emb_optimizer.step()
if i >= 3:
break
print("start training...")
t0 = time.time()
for epoch in range(n_epochs):
model.train()
embed_layer.train()
t0 = time.time()
for i, sample_data in enumerate(loader):
input_nodes, output_nodes, seed_idx, blocks = sample_data
input_nodes, output_nodes, blocks = sample_data
feats = embed_layer(input_nodes,
blocks[0].srcdata['ntype'],
blocks[0].srcdata['type_id'],
node_feats)
logits = model(blocks, feats)
loss = F.cross_entropy(logits, labels[train_idx][seed_idx])
seed_idx = blocks[-1].dstdata['type_id']
loss = F.cross_entropy(logits, labels[seed_idx])
optimizer.zero_grad()
emb_optimizer.zero_grad()
@@ -303,6 +324,8 @@ def track_time(data):
optimizer.step()
emb_optimizer.step()
if i >= 9: # time 10 loops
break
t1 = time.time()
return (t1 - t0) / n_epochs
return (t1 - t0) / (i + 1)
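Several of the neighbor-sampling benchmarks above also restrict the graph to the CSC sparse format before building the loader: materializing formats once in the parent process keeps each dataloader worker from rebuilding them, and dropping the unused formats saves memory. A minimal sketch of that step, assuming the `create_formats_` / `formats` API used in the diff:

```python
import dgl

g = dgl.rand_graph(1000, 5000)  # toy graph for illustration

# Materialize the sparse formats once, before any dataloader workers are
# forked, so each subprocess does not pay the construction cost again.
g.create_formats_()

# Keep only CSC, which is what neighbor sampling reads; the unused
# COO/CSR copies would otherwise sit in every worker's memory.
g = g.formats('csc')
print(g.formats())  # reports which formats the graph may now use
```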
@@ -47,7 +47,7 @@ def load_subtensor(g, seeds, input_nodes, device):
batch_labels = g.ndata['labels'][seeds].to(device)
return batch_inputs, batch_labels
@utils.benchmark('time', 3600)
@utils.benchmark('time', 600)
@utils.parametrize('data', ['reddit', 'ogbn-products'])
def track_time(data):
data = utils.process_data(data)
@@ -92,7 +92,7 @@ def track_time(data):
loss_fcn = loss_fcn.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
# dry run one epoch
# dry run
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
# Load the input features as well as output labels
#batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device)
@@ -107,13 +107,13 @@ def track_time(data):
loss.backward()
optimizer.step()
if step >= 3:
break
# Training loop
avg = 0
iter_tput = []
t0 = time.time()
for epoch in range(num_epochs):
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
# Load the input features as well as output labels
#batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device)
@@ -128,6 +128,9 @@ def track_time(data):
loss.backward()
optimizer.step()
if step >= 9: # time 10 loops
break
t1 = time.time()
return (t1 - t0) / num_epochs
return (t1 - t0) / (step + 1)
@@ -89,9 +89,11 @@ class CrossEntropyLoss(nn.Module):
loss = F.binary_cross_entropy_with_logits(score, label.float())
return loss
@utils.benchmark('time', 72000)
@utils.benchmark('time', 600)
@utils.parametrize('data', ['reddit'])
def track_time(data):
@utils.parametrize('num_negs', [2, 8, 32])
@utils.parametrize('batch_size', [1024, 2048, 8192])
def track_time(data, num_negs, batch_size):
data = utils.process_data(data)
device = utils.get_bench_device()
g = data[0]
@@ -108,10 +110,9 @@ def track_time(data):
num_hidden = 16
num_layers = 2
fan_out = '10,25'
batch_size = 10000
lr = 0.003
dropout = 0.5
num_workers = 0
num_workers = 4
num_negs = 2
n_edges = g.number_of_edges()
@@ -140,14 +141,30 @@ def track_time(data):
loss_fcn = loss_fcn.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
# dry run
for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(dataloader):
# Load the input features as well as output labels
batch_inputs = load_subtensor(g, input_nodes, device)
pos_graph = pos_graph.to(device)
neg_graph = neg_graph.to(device)
blocks = [block.int().to(device) for block in blocks]
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, pos_graph, neg_graph)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if step >= 3:
break
# Training loop
avg = 0
iter_tput = []
t0 = time.time()
for epoch in range(num_epochs):
for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(dataloader):
# Load the input features as well as output labels
#batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device)
batch_inputs = load_subtensor(g, input_nodes, device)
pos_graph = pos_graph.to(device)
@@ -160,6 +177,9 @@ def track_time(data):
loss.backward()
optimizer.step()
if step >= 9: # time 10 loops
break
t1 = time.time()
return (t1 - t0) / num_epochs
return (t1 - t0) / (step + 1)
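The unsupervised GraphSAGE benchmark above is now parametrized over `num_negs` and `batch_size`, and its loop consumes `(input_nodes, pos_graph, neg_graph, blocks)` tuples, which is the shape produced by an edge dataloader configured with a negative sampler. The construction itself is outside this diff, so the following is only a hedged sketch on a toy graph, assuming DGL 0.6-era `EdgeDataLoader` and `negative_sampler.Uniform`:

```python
import dgl
import torch as th

g = dgl.rand_graph(1000, 5000)  # toy graph
train_eids = th.arange(g.number_of_edges())

sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
dataloader = dgl.dataloading.EdgeDataLoader(
    g, train_eids, sampler,
    negative_sampler=dgl.dataloading.negative_sampler.Uniform(2),  # num_negs = 2
    batch_size=1024, shuffle=True, drop_last=False, num_workers=0)

for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(dataloader):
    # pos_graph holds the sampled positive edges, neg_graph the uniform
    # negatives; blocks carry the message-passing dependencies.
    if step >= 3:
        break
```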