Unverified Commit 42f5154a authored by Mingbang Wang's avatar Mingbang Wang Committed by GitHub
Browse files

[GrahpBolt] refine hetero_rgcn example (#6808)


Co-authored-by: default avatarRhett Ying <85214957+Rhett-Ying@users.noreply.github.com>
parent d4b5ddcd
......@@ -24,12 +24,11 @@ Below results are roughly collected from an AWS EC2 **g4dn.metal**, 384GB RAM, 9
### Accuracies
```
Final performance:
All runs:
Highest Train: 64.66 ± 0.74
Highest Valid: 41.31 ± 0.12
Final Train: 64.66 ± 0.74
Final Test: 40.07 ± 0.02
Epoch: 01, Loss: 2.6244, Valid accuracy: 33.57%, Time 61.9471
Epoch: 02, Loss: 2.0163, Valid accuracy: 35.10%, Time 60.7852
Epoch: 03, Loss: 1.7061, Valid accuracy: 36.70%, Time 60.7745
Test accuracy 35.2679
```
## Run on `ogb-lsc-mag240m` dataset
......
......@@ -42,9 +42,11 @@ main
└───> EntityClassify.evaluate
"""
import argparse
import itertools
import sys
import time
import dgl
import dgl.graphbolt as gb
......@@ -52,7 +54,7 @@ import dgl.nn as dglnn
import psutil
import torch as th
import torch
import torch.nn as nn
import torch.nn.functional as F
from dgl.nn import HeteroEmbedding
......@@ -78,7 +80,14 @@ def load_dataset(dataset_name):
test_set = dataset.tasks[0].test_set
num_classes = dataset.tasks[0].metadata["num_classes"]
return graph, features, train_set, valid_set, test_set, num_classes
return (
graph,
features,
train_set,
valid_set,
test_set,
num_classes,
)
def create_dataloader(
......@@ -128,7 +137,7 @@ def create_dataloader(
# `device`:
# The device to move the mini-batch to.
# [TODO] Moving `MiniBatch` to GPU is not supported yet.
device = th.device("cpu")
device = torch.device("cpu")
datapipe = datapipe.copy_to(device)
# Create a DataLoader from the datapipe.
......@@ -144,6 +153,32 @@ def extract_embed(node_embed, input_nodes):
return emb
def extract_node_features(name, block, data, node_embed, device):
"""Extract the node features from embedding layer or raw features."""
if name == "ogbn-mag":
input_nodes = {
k: v.to(device) for k, v in block.srcdata[dgl.NID].items()
}
# Extract node embeddings for the input nodes.
node_features = extract_embed(node_embed, input_nodes)
# Add the batch's raw "paper" features. Corresponds to the content
# in the function `rel_graph_embed` comment.
node_features.update(
{"paper": data.node_features[("paper", "feat")].to(device)}
)
else:
node_features = {
ntype: data.node_features[(ntype, "feat")]
for ntype in block.srctypes
}
# Original feature data are stored in float16 while model weights are
# float32, so we need to convert the features to float32.
node_features = {
k: v.to(device).float() for k, v in node_features.items()
}
return node_features
def rel_graph_embed(graph, embed_size):
"""Initialize a heterogenous embedding layer for all node types in the
graph, except for the "paper" node type.
......@@ -298,7 +333,6 @@ class RelGraphConvLayer(nn.Module):
inputs_dst = {
k: v[: g.number_of_dst_nodes(k)] for k, v in inputs.items()
}
# Apply the convolution operation on the graph. mod_kwargs are
# additional arguments for each relation function defined in the
# HeteroGraphConv. In this case, it's the weights for each relation.
......@@ -366,90 +400,13 @@ class EntityClassify(nn.Module):
for layer in self.layers:
layer.reset_parameters()
def forward(self, h, blocks):
def forward(self, blocks, h):
for layer, block in zip(self.layers, blocks):
h = layer(block, h)
return h
class Logger(object):
r"""
This class was taken directly from the PyG implementation and can be found
here: https://github.com/snap-stanford/ogb/blob/master/examples/nodeproppre
d/mag/logger.py
This was done to ensure that performance was measured in precisely the same
way
"""
def __init__(self, runs):
self.results = [[] for _ in range(runs)]
def add_result(self, run, result):
assert len(result) == 3
assert run >= 0 and run < len(self.results)
self.results[run].append(result)
def print_statistics(self, run=None):
if run is not None:
result = 100 * th.tensor(self.results[run])
argmax = result[:, 1].argmax().item()
print(f"Run {run + 1:02d}:")
print(f"Highest Train: {result[:, 0].max():.2f}")
print(f"Highest Valid: {result[:, 1].max():.2f}")
print(f" Final Train: {result[argmax, 0]:.2f}")
print(f" Final Test: {result[argmax, 2]:.2f}")
else:
result = 100 * th.tensor(self.results)
best_results = []
for r in result:
train1 = r[:, 0].max().item()
valid = r[:, 1].max().item()
train2 = r[r[:, 1].argmax(), 0].item()
test = r[r[:, 1].argmax(), 2].item()
best_results.append((train1, valid, train2, test))
best_result = th.tensor(best_results)
print("All runs:")
r = best_result[:, 0]
print(f"Highest Train: {r.mean():.2f} ± {r.std():.2f}")
r = best_result[:, 1]
print(f"Highest Valid: {r.mean():.2f} ± {r.std():.2f}")
r = best_result[:, 2]
print(f" Final Train: {r.mean():.2f} ± {r.std():.2f}")
r = best_result[:, 3]
print(f" Final Test: {r.mean():.2f} ± {r.std():.2f}")
def extract_node_features(name, block, data, node_embed, device):
"""Extract the node features from embedding layer or raw features."""
if name == "ogbn-mag":
input_nodes = {
k: v.to(device) for k, v in block.srcdata[dgl.NID].items()
}
# Extract node embeddings for the input nodes.
node_features = extract_embed(node_embed, input_nodes)
# Add the batch's raw "paper" features. Corresponds to the content
# in the function `rel_graph_embed` comment.
node_features.update(
{"paper": data.node_features[("paper", "feat")].to(device)}
)
else:
node_features = {
ntype: data.node_features[(ntype, "feat")]
for ntype in block.srctypes
}
# Original feature data are stored in float16 while model weights are
# float32, so we need to convert the features to float32.
node_features = {
k: v.to(device).float() for k, v in node_features.items()
}
return node_features
@th.no_grad()
@torch.no_grad()
def evaluate(
name,
g,
......@@ -459,7 +416,6 @@ def evaluate(
item_set,
features,
num_workers,
save_test_submission=False,
):
# Switches the model to evaluation mode.
model.eval()
......@@ -493,7 +449,9 @@ def evaluate(
)
# Generate predictions.
logits = model(node_features, blocks)[category]
logits = model(blocks, node_features)
logits = logits[category]
# Apply softmax to the logits and get the prediction by selecting the
# argmax.
......@@ -501,22 +459,18 @@ def evaluate(
y_hats.append(y_hat.cpu())
y_true.append(data.labels[category].long())
y_pred = th.cat(y_hats, dim=0)
y_true = th.cat(y_true, dim=0)
y_true = th.unsqueeze(y_true, 1)
y_pred = torch.cat(y_hats, dim=0)
y_true = torch.cat(y_true, dim=0)
y_true = torch.unsqueeze(y_true, 1)
if name == "ogb-lsc-mag240m":
y_pred = y_pred.view(-1)
y_true = y_true.view(-1)
if save_test_submission:
evaluator.save_test_submission(
input_dict={"y_pred": y_pred}, dir_path=".", mode="test-dev"
)
return evaluator.eval({"y_true": y_true, "y_pred": y_pred})["acc"]
def run(
def train(
name,
g,
model,
......@@ -524,14 +478,12 @@ def run(
optimizer,
train_set,
valid_set,
test_set,
logger,
device,
run_id,
features,
num_workers,
num_epochs,
):
print("start to run...")
print("Start to train...")
category = "paper"
data_loader = create_dataloader(
......@@ -548,13 +500,14 @@ def run(
# Typically, the best Validation performance is obtained after
# the 1st or 2nd epoch. This is why the max epoch is set to 3.
for epoch in range(3):
for epoch in range(num_epochs):
num_train = len(train_set)
t0 = time.time()
model.train()
total_loss = 0
for data in tqdm(data_loader, desc=f"Training~Epoch {epoch:02d}"):
for data in tqdm(data_loader, desc=f"Training~Epoch {epoch + 1:02d}"):
# Convert MiniBatch to DGL Blocks.
blocks = [block.to(device) for block in data.blocks]
......@@ -569,7 +522,7 @@ def run(
# Reset gradients.
optimizer.zero_grad()
# Generate predictions.
logits = model(node_features, blocks)[category]
logits = model(blocks, node_features)[category]
y_hat = logits.log_softmax(dim=-1).cpu()
loss = F.nll_loss(y_hat, data.labels[category].long())
......@@ -578,14 +531,10 @@ def run(
total_loss += loss.item() * num_seeds
t1 = time.time()
loss = total_loss / num_train
# Evaluate the model on the train/val/test set.
print("Evaluating the model on the training set.")
train_acc = evaluate(
name, g, model, node_embed, device, train_set, features, num_workers
)
print("Finish evaluating on training set.")
# Evaluate the model on the val/test set.
print("Evaluating the model on the validation set.")
valid_acc = evaluate(
......@@ -593,44 +542,26 @@ def run(
)
print("Finish evaluating on validation set.")
print("Evaluating the model on the test set.")
test_acc = evaluate(
name,
g,
model,
node_embed,
device,
test_set,
features,
num_workers,
save_test_submission=(name == "ogb-lsc-mag240m"),
)
print("Finish evaluating on test set.")
logger.add_result(run_id, (train_acc, valid_acc, test_acc))
print(
f"Run: {run_id + 1:02d}, "
f"Epoch: {epoch +1 :02d}, "
f"Epoch: {epoch + 1:02d}, "
f"Loss: {loss:.4f}, "
f"Train: {100 * train_acc:.2f}%, "
f"Valid: {100 * valid_acc:.2f}%, "
f"Test: {100 * test_acc:.2f}%"
f"Valid accuracy: {100 * valid_acc:.2f}%, "
f"Time {t1 - t0:.4f}"
)
print("Finish evaluating on test set.")
return logger
def main(args):
device = th.device("cuda") if args.num_gpus > 0 else th.device("cpu")
# Initialize a logger.
logger = Logger(args.runs)
device = torch.device("cuda") if args.num_gpus > 0 else torch.device("cpu")
# Load dataset.
g, features, train_set, valid_set, test_set, num_classes = load_dataset(
args.dataset
)
(
g,
features,
train_set,
valid_set,
test_set,
num_classes,
) = load_dataset(args.dataset)
feat_size = features.size("node", "paper", "feat")[0]
......@@ -654,13 +585,6 @@ def main(args):
f"{sum(p.numel() for p in model.parameters())}"
)
for run_id in range(args.runs):
# [Why we need to reset the parameters?]
# If parameters are not reset, the model will start with the
# parameters learned from the last run, potentially resulting
# in biased outcomes or sub-optimal performance if the model was
# previously stuck in a poor local minimum.
if embed_layer is not None:
embed_layer.reset_parameters()
model.reset_parameters()
......@@ -675,12 +599,8 @@ def main(args):
model.parameters(),
[] if embed_layer is None else embed_layer.parameters(),
)
optimizer = th.optim.Adam(all_params, lr=0.01)
optimizer = torch.optim.Adam(all_params, lr=0.01)
# `expected_max`` is the number of physical cores on your machine.
# The `logical` parameter, when set to False, ensures that the count
# returned is the number of physical cores instead of logical cores
# (which could be higher due to technologies like Hyper-Threading).
expected_max = int(psutil.cpu_count(logical=False))
if args.num_workers >= expected_max:
print(
......@@ -688,7 +608,8 @@ def main(args):
f"cores, please set any number less than {expected_max}",
file=sys.stderr,
)
logger = run(
train(
args.dataset,
g,
model,
......@@ -696,18 +617,24 @@ def main(args):
optimizer,
train_set,
valid_set,
test_set,
logger,
device,
run_id,
features,
args.num_workers,
args.num_epochs,
)
logger.print_statistics(run_id)
print("Final performance: ")
logger.print_statistics()
print("Testing...")
test_acc = evaluate(
args.dataset,
g,
model,
embed_layer,
device,
test_set,
features,
args.num_workers,
)
print(f"Test accuracy {test_acc*100:.4f}")
if __name__ == "__main__":
......@@ -716,9 +643,10 @@ if __name__ == "__main__":
"--dataset",
type=str,
default="ogbn-mag",
choices=["ogbn-mag", "ogb-lsc-mag240m"],
help="Dataset name. Possible values: ogbn-mag, ogb-lsc-mag240m",
)
parser.add_argument("--runs", type=int, default=5)
parser.add_argument("--num_epochs", type=int, default=3)
parser.add_argument("--num_workers", type=int, default=0)
parser.add_argument("--num_gpus", type=int, default=0)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment