Unverified commit 403dba62 authored by Rhett Ying, committed by GitHub

[Examples] enable train on ogb-lsc-mag240m (#6402)

parent 5ce8b83d
# Node classification on heterogeneous graph with RGCN
This example demonstrates how to run a node classification task on a heterogeneous graph with **DGL**. The models are not yet tuned for the best accuracy.
## Run on `ogbn-mag` dataset
In the preprocessing stage, reverse edges are added and duplicate edges are removed. Feature data for the `author` and `institution` node types are generated dynamically by an embedding layer.
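Concretely, the graph preprocessing corresponds to the following transform from `hetero_rgcn.py` (also visible in the diff below), where `g` is the heterogeneous graph returned by `DglNodePropPredDataset`:
```
from dgl import AddReverse, Compose, ToSimple

# "ToSimple()" merges duplicate edges between the same pair of nodes;
# "AddReverse()" adds a reverse edge for every existing edge.
transform = Compose([ToSimple(), AddReverse()])
g = transform(g)
```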
### Sample on CPU and train/infer on CPU
```
python3 hetero_rgcn.py --dataset ogbn-mag
```
### Sample on CPU and train/infer on GPU
```
python3 hetero_rgcn.py --dataset ogbn-mag --num_gpus 1
```
### Resource usage and time cost
The results below were roughly collected on an AWS EC2 **g4dn.metal** instance: 384GB RAM, 96 vCPUs (Cascade Lake P-8259L), 8 NVIDIA T4 GPUs (16GB RAM each). CPU RAM usage is the peak value of the `used` field of the `free` command, which is a rough measure; refer to `RSS`/`USS`/`PSS` for more accurate numbers (see the `psutil` sketch below the table). GPU RAM usage is the peak value recorded by `nvidia-smi`.
| Dataset Size | CPU RAM Usage | Num of GPUs | GPU RAM Usage | Time Per Epoch(Training) | Time Per Epoch(Inference: train/val/test set) |
| ------------ | ------------- | ----------- | ---------- | --------- | --------------------------- |
| ~1.1GB | ~5GB | 0 | 0GB | ~4min03s(615it, 2.53it/s) | ~0min22s(154it, 6.86it/s) + ~0min2s(16it, 6.92it/s) + ~0min1s(11it, 7.34it/s) |
| ~1.1GB | ~3GB | 1 | 4.4GB | ~1min20s(615it, 7.65it/s) | ~0min14s(154it, 10.79it/s) + ~0min1s(16it, 10.07it/s) + ~0min1s(11it, 10.42it/s) |
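For reference, here is a minimal sketch of reading the more accurate per-process numbers with `psutil` (which the script already imports); `memory_full_info()` exposes `uss`/`pss` on Linux:
```
import psutil

proc = psutil.Process()  # the training process
mem = proc.memory_full_info()  # rss, plus uss/pss on Linux
print(
    f"RSS={mem.rss / 2**30:.2f}GiB, "
    f"USS={mem.uss / 2**30:.2f}GiB, "
    f"PSS={mem.pss / 2**30:.2f}GiB"
)
```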
### Accuracies
```
Final performance:
All runs:
Highest Train: 83.22 ± 0.00
Highest Valid: 48.25 ± 0.20
Final Train: 68.45 ± 9.81
Final Test: 47.51 ± 0.19
```
## Run on `ogb-lsc-mag240m` dataset
In the preprocessing stage, reverse edges are added and duplicate edges are removed. In addition, feature data for the `author` and `institution` node types are generated in advance via message passing (see the sketch after the list below). Since this preprocessing usually takes a long time, we also offer the resulting files for download:
* [`paper-feat.npy`](https://dgl-data.s3-accelerate.amazonaws.com/dataset/OGB-LSC/paper-feat.npy)
* [`author-feat.npy`](https://dgl-data.s3-accelerate.amazonaws.com/dataset/OGB-LSC/author-feat.npy)
* [`inst-feat.npy`](https://dgl-data.s3-accelerate.amazonaws.com/dataset/OGB-LSC/inst-feat.npy)
* [`hetero-graph.dgl`](https://dgl-data.s3-accelerate.amazonaws.com/dataset/OGB-LSC/hetero-graph.dgl)
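The idea behind the feature precomputation is one round of mean-aggregation message passing: each author's feature is the average of the papers they wrote, and each institution's feature is the average of its authors. A minimal sketch with DGL built-in functions follows; the edge type names (`rev_writes`, `affiliated_with`) follow the usual MAG conventions and are assumptions here, not taken from the script:
```
import dgl.function as fn

def precompute_features(g, paper_feat):
    """Sketch: propagate paper features to author and institution nodes."""
    g.nodes["paper"].data["feat"] = paper_feat
    # Average the papers written by each author (paper -> author).
    g.update_all(
        fn.copy_u("feat", "m"), fn.mean("m", "feat"),
        etype=("paper", "rev_writes", "author"),
    )
    # Average the authors affiliated with each institution (author -> institution).
    g.update_all(
        fn.copy_u("feat", "m"), fn.mean("m", "feat"),
        etype=("author", "affiliated_with", "institution"),
    )
    return g.nodes["author"].data["feat"], g.nodes["institution"].data["feat"]
```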
### Sample on CPU and train/infer on CPU
```
python3 hetero_rgcn.py --dataset ogb-lsc-mag240m
```
### Sample on CPU and train/infer on GPU
```
python3 hetero_rgcn.py --dataset ogb-lsc-mag240m --num_gpus 1
```
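The script reads the preprocessed files from the paths given by its arguments, and the defaults (e.g. `./graph.dgl`) differ from the downloaded file names, so a full invocation with the downloaded files in the working directory might look like this (a usage sketch based on the argument list in the diff below):
```
python3 hetero_rgcn.py --dataset ogb-lsc-mag240m --num_gpus 1 \
    --graph_path ./hetero-graph.dgl \
    --paper_feature_path ./paper-feat.npy \
    --author_feature_path ./author-feat.npy \
    --inst_feature_path ./inst-feat.npy
```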
### Resource usage and time cost
The results below were collected on the same AWS EC2 **g4dn.metal** instance described above, with CPU and GPU RAM usage measured in the same way.
| Dataset Size | CPU RAM Usage | Num of GPUs | GPU RAM Usage | Time Per Epoch(Training) | Time Per Epoch(Inference: train/val/test set) |
| ------------ | ------------- | ----------- | ---------- | --------- | --------------------------- |
| ~404GB | ~60GB | 0 | 0GB | ~4min12s(1087it, 4.31it/s) | ~2min40s(272it, 1.70it/s) + ~0min25s(34it, 1.35it/s) + ~0min15s(22it, 1.43it/s) |
| ~404GB | ~60GB | 1 | 7GB | ~2min46s(1087it, 6.52it/s) | ~1min49s(272it, 2.48it/s) + ~0min17s(34it, 1.76it/s) + ~0min12s(22it, 1.81it/s) |
### Accuracies
```
Final performance:
All runs:
Highest Train: 54.85 ± 1.02
Highest Valid: 52.29 ± 0.50
Final Train: 54.78 ± 1.12
Final Test: 0.00 ± 0.00
```
Note: the test accuracy reads 0.00 because the labels of the MAG240M test set are hidden. Instead, predictions on the `test-dev` split are saved to a submission file via `MAG240MEvaluator.save_test_submission` (see the diff below).
Diff of the example script (`hetero_rgcn.py`):
```
@@ -52,6 +52,7 @@ import sys
 import dgl
 import dgl.nn as dglnn
+import numpy as np
 import psutil
```
```
@@ -60,36 +61,59 @@ import torch.nn as nn
 import torch.nn.functional as F
 from dgl import AddReverse, Compose, ToSimple
 from dgl.nn import HeteroEmbedding
+from ogb.lsc import MAG240MDataset, MAG240MEvaluator
 from ogb.nodeproppred import DglNodePropPredDataset, Evaluator
 from tqdm import tqdm
 
 
 def prepare_data(args, device):
-    dataset = DglNodePropPredDataset(name="ogbn-mag")
+    feats = {}
+    if args.dataset == "ogbn-mag":
+        dataset = DglNodePropPredDataset(name="ogbn-mag", root=args.rootdir)
+        # - graph: dgl graph object.
+        # - label: torch tensor of shape (num_nodes, num_tasks).
+        g, labels = dataset[0]
+        # Flatten the labels for "paper" type nodes. This step reduces the
+        # dimensionality of the labels. We need to flatten the labels because
+        # the model requires a 1-dimensional label tensor.
+        labels = labels["paper"].flatten().long()
+        # Apply transformation to the graph.
+        # - "ToSimple()" removes multi-edge between two nodes.
+        # - "AddReverse()" adds reverse edges to the graph.
+        transform = Compose([ToSimple(), AddReverse()])
+        g = transform(g)
+    else:
+        dataset = MAG240MDataset(root=args.rootdir)
+        (g,), _ = dgl.load_graphs(args.graph_path)
+        g = g.formats(["csc"])
+        labels = th.as_tensor(dataset.paper_label).long()
+        # As feature data is too large to fit in memory, we read it from disk.
+        feats["paper"] = th.as_tensor(
+            np.load(args.paper_feature_path, mmap_mode="r+")
+        )
+        feats["author"] = th.as_tensor(
+            np.load(args.author_feature_path, mmap_mode="r+")
+        )
+        feats["institution"] = th.as_tensor(
+            np.load(args.inst_feature_path, mmap_mode="r+")
+        )
+    print(f"Loaded graph: {g}")
 
     # Get train/valid/test index.
     split_idx = dataset.get_idx_split()
-
-    # - graph: dgl graph object.
-    # - label: torch tensor of shape (num_nodes, num_tasks).
-    g, labels = dataset[0]
-
-    # Flatten the labels for "paper" type nodes. This step reduces the
-    # dimensionality of the labels. We need to flatten the labels because
-    # the model requires a 1-dimensional label tensor.
-    labels = labels["paper"].flatten()
-
-    # Apply transformation to the graph.
-    # - "ToSimple()" removes multi-edge between two nodes.
-    # - "AddReverse()" adds reverse edges to the graph.
-    transform = Compose([ToSimple(), AddReverse()])
-    g = transform(g)
-    print(f"Loaded graph: {g}")
+    if args.dataset == "ogb-lsc-mag240m":
+        split_idx = {
+            split_type: {"paper": split_idx[split_type]}
+            for split_type in split_idx
+        }
 
     # Initialize a train sampler that samples neighbors for multi-layer graph
-    # convolution. It samples 25 and 20 neighbors for the first and second
+    # convolution. It samples 25 and 10 neighbors for the first and second
     # layers respectively.
-    sampler = dgl.dataloading.MultiLayerNeighborSampler([25, 20])
+    sampler = dgl.dataloading.MultiLayerNeighborSampler([25, 10])
 
     num_workers = args.num_workers
     train_loader = dgl.dataloading.DataLoader(
         g,
```
```
@@ -101,7 +125,7 @@ def prepare_data(args, device):
         device=device,
     )
 
-    return g, labels, dataset.num_classes, split_idx, train_loader
+    return g, labels, dataset.num_classes, split_idx, train_loader, feats
 
 
 def extract_embed(node_embed, input_nodes):
```
```
@@ -383,8 +407,33 @@ class Logger(object):
         print(f" Final Test: {r.mean():.2f} ± {r.std():.2f}")
 
 
+def extract_node_features(name, g, input_nodes, node_embed, feats, device):
+    """Extract the node features from embedding layer or raw features."""
+    if name == "ogbn-mag":
+        # Extract node embeddings for the input nodes.
+        node_features = extract_embed(node_embed, input_nodes)
+        # Add the batch's raw "paper" features. Corresponds to the content
+        # in the function `rel_graph_embed` comment.
+        node_features.update(
+            {"paper": g.ndata["feat"]["paper"][input_nodes["paper"].cpu()]}
+        )
+        node_features = {k: e.to(device) for k, e in node_features.items()}
+    else:
+        node_features = {
+            ntype: feats[ntype][input_nodes[ntype].cpu()].to(device)
+            for ntype in input_nodes
+        }
+        # Original feature data are stored in float16 while model weights are
+        # float32, so we need to convert the features to float32.
+        # [TODO] Enable mixed precision training on GPU.
+        node_features = {k: v.float() for k, v in node_features.items()}
+    return node_features
+
+
 def train(
+    dataset,
     g,
+    feats,
     model,
     node_embed,
     optimizer,
```
```
@@ -414,24 +463,17 @@ def train(
         # We only predict the nodes with type "category".
         seeds = seeds[category]
         batch_size = seeds.shape[0]
-        input_nodes_indexes = input_nodes[category].to(g.device)
-        seeds = seeds.to(labels.device)
 
-        # Extract node embeddings for the input nodes.
-        emb = extract_embed(node_embed, input_nodes)
-        # Add the batch's raw "paper" features. Corresponds to the content
-        # in the function `rel_graph_embed` comment.
-        emb.update(
-            {category: g.ndata["feat"][category][input_nodes_indexes]}
-        )
-
-        emb = {k: e.to(device) for k, e in emb.items()}
-        lbl = labels[seeds].to(device)
+        # Extract the node features from embedding layer or raw features.
+        node_features = extract_node_features(
+            dataset, g, input_nodes, node_embed, feats, device
+        )
+        lbl = labels[seeds.cpu()].to(device)
 
         # Reset gradients.
         optimizer.zero_grad()
         # Generate predictions.
-        logits = model(emb, blocks)[category]
+        logits = model(node_features, blocks)[category]
 
         y_hat = logits.log_softmax(dim=-1)
         loss = F.nll_loss(y_hat, lbl)
```
```
@@ -442,10 +484,40 @@ def train(
         loss = total_loss / num_train
 
-        # Evaluate the model on the test set.
-        result = test(g, model, node_embed, labels, device, split_idx)
-        logger.add_result(run, result)
-        train_acc, valid_acc, test_acc = result
+        # Evaluate the model on the train/val/test set.
+        train_acc = evaluate(
+            dataset,
+            g,
+            feats,
+            model,
+            node_embed,
+            labels,
+            device,
+            split_idx["train"],
+        )
+        valid_acc = evaluate(
+            dataset,
+            g,
+            feats,
+            model,
+            node_embed,
+            labels,
+            device,
+            split_idx["valid"],
+        )
+        test_key = "test" if dataset == "ogbn-mag" else "test-dev"
+        test_acc = evaluate(
+            dataset,
+            g,
+            feats,
+            model,
+            node_embed,
+            labels,
+            device,
+            split_idx[test_key],
+            save_test_submission=(dataset == "ogb-lsc-mag240m"),
+        )
+        logger.add_result(run, (train_acc, valid_acc, test_acc))
         print(
             f"Run: {run + 1:02d}, "
             f"Epoch: {epoch +1 :02d}, "
```
```
@@ -459,30 +531,31 @@ def train(
 @th.no_grad()
-def test(g, model, node_embed, y_true, device, split_idx):
+def evaluate(
+    dataset,
+    g,
+    feats,
+    model,
+    node_embed,
+    labels,
+    device,
+    idx,
+    save_test_submission=False,
+):
     # Switches the model to evaluation mode.
     model.eval()
     category = "paper"
-    # An evaluator for the dataset 'ogbn-mag'.
-    evaluator = Evaluator(name="ogbn-mag")
-
-    # Initialize a neighbor sampler that samples all neighbors. The model
-    # has 2 GNN layers, so we create a sampler of 2 layers.
-    ######################################################################
-    # [Why we need to sample all neighbors?]
-    # During the testing phase, we use a `MultiLayerFullNeighborSampler` to
-    # sample all neighbors for each node. This is done to achieve the most
-    # accurate evaluation of the model's performance, despite the increased
-    # computational cost. This contrasts with the training phase where we
-    # prefer a balance between computational efficiency and model accuracy,
-    # hence only a subset of neighbors is sampled.
-    ######################################################################
-    sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
-    loader = dgl.dataloading.DataLoader(
+    if dataset == "ogbn-mag":
+        evaluator = Evaluator(name="ogbn-mag")
+    else:
+        evaluator = MAG240MEvaluator()
+
+    sampler = dgl.dataloading.MultiLayerNeighborSampler([25, 10])
+    dataloader = dgl.dataloading.DataLoader(
         g,
-        {category: th.arange(g.num_nodes(category))},
+        idx,
         sampler,
-        batch_size=16384,
+        batch_size=4096,
         shuffle=False,
         num_workers=0,
         device=device,
```
```
@@ -490,73 +563,63 @@ def test(g, model, node_embed, y_true, device, split_idx):
     # To store the predictions.
     y_hats = list()
+    y_true = list()
 
-    for input_nodes, seeds, blocks in tqdm(loader, desc="Inference"):
+    for input_nodes, seeds, blocks in tqdm(dataloader, desc="Inference"):
         blocks = [blk.to(device) for blk in blocks]
         # We only predict the nodes with type "category".
-        seeds = seeds[category]
-        input_nodes_indexes = input_nodes[category].to(g.device)
-
-        # Extract node embeddings for the input nodes.
-        emb = extract_embed(node_embed, input_nodes)
-        # Add the batch's raw "paper" features.
-        # Corresponds to the content in the function `rel_graph_embed` comment.
-        emb.update({category: g.ndata["feat"][category][input_nodes_indexes]})
-
-        emb = {k: e.to(device) for k, e in emb.items()}
+        node_features = extract_node_features(
+            dataset, g, input_nodes, node_embed, feats, device
+        )
 
         # Generate predictions.
-        logits = model(emb, blocks)[category]
+        logits = model(node_features, blocks)[category]
 
         # Apply softmax to the logits and get the prediction by selecting the
         # argmax.
         y_hat = logits.log_softmax(dim=-1).argmax(dim=1, keepdims=True)
         y_hats.append(y_hat.cpu())
+        y_true.append(labels[seeds["paper"].cpu()])
 
     y_pred = th.cat(y_hats, dim=0)
+    y_true = th.cat(y_true, dim=0)
     y_true = th.unsqueeze(y_true, 1)
 
-    # Calculate the accuracy of the predictions for the train, valid and
-    # test splits.
-    train_acc = evaluator.eval(
-        {
-            "y_true": y_true[split_idx["train"]["paper"]],
-            "y_pred": y_pred[split_idx["train"]["paper"]],
-        }
-    )["acc"]
-    valid_acc = evaluator.eval(
-        {
-            "y_true": y_true[split_idx["valid"]["paper"]],
-            "y_pred": y_pred[split_idx["valid"]["paper"]],
-        }
-    )["acc"]
-    test_acc = evaluator.eval(
-        {
-            "y_true": y_true[split_idx["test"]["paper"]],
-            "y_pred": y_pred[split_idx["test"]["paper"]],
-        }
-    )["acc"]
+    if dataset == "ogb-lsc-mag240m":
+        y_pred = y_pred.view(-1)
+        y_true = y_true.view(-1)
 
-    return train_acc, valid_acc, test_acc
+    if save_test_submission:
+        evaluator.save_test_submission(
+            input_dict={"y_pred": y_pred}, dir_path=".", mode="test-dev"
+        )
+    return evaluator.eval({"y_true": y_true, "y_pred": y_pred})["acc"]
 
 
 def main(args):
-    device = "cuda:0" if th.cuda.is_available() else "cpu"
+    device = "cuda:0" if th.cuda.is_available() and args.num_gpus > 0 else "cpu"
 
     # Initialize a logger.
     logger = Logger(args.runs)
 
     # Prepare the data.
-    g, labels, num_classes, split_idx, train_loader = prepare_data(args, device)
+    g, labels, num_classes, split_idx, train_loader, feats = prepare_data(
+        args, device
+    )
+
+    feat_size = 128 if args.dataset == "ogbn-mag" else 768
 
     # Create the embedding layer and move it to the appropriate device.
-    embed_layer = rel_graph_embed(g, 128).to(device)
+    embed_layer = None
+    if args.dataset == "ogbn-mag":
+        embed_layer = rel_graph_embed(g, feat_size).to(device)
+        print(
+            "Number of embedding parameters: "
+            f"{sum(p.numel() for p in embed_layer.parameters())}"
+        )
 
     # Initialize the entity classification model.
-    model = EntityClassify(g, 128, num_classes).to(device)
-
-    print(
-        "Number of embedding parameters: "
-        f"{sum(p.numel() for p in embed_layer.parameters())}"
-    )
+    model = EntityClassify(g, feat_size, num_classes).to(device)
     print(
         "Number of model parameters: "
         f"{sum(p.numel() for p in model.parameters())}"
```
```
@@ -564,7 +627,8 @@ def main(args):
     for run in range(args.runs):
         try:
-            embed_layer.reset_parameters()
+            if embed_layer is not None:
+                embed_layer.reset_parameters()
             model.reset_parameters()
         except:
             # Old pytorch version doesn't support reset_parameters() API.
```
```
@@ -585,7 +649,8 @@ def main(args):
         # which is passed to the optimizer. The optimizer then updates all
         # these parameters during the training process.
         all_params = itertools.chain(
-            model.parameters(), embed_layer.parameters()
+            model.parameters(),
+            [] if embed_layer is None else embed_layer.parameters(),
         )
         optimizer = th.optim.Adam(all_params, lr=0.01)
```
```
@@ -601,7 +666,9 @@ def main(args):
             file=sys.stderr,
         )
         logger = train(
+            args.dataset,
             g,
+            feats,
             model,
             embed_layer,
             optimizer,
```
```
@@ -620,8 +687,60 @@ def main(args):
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="RGCN")
-    parser.add_argument("--runs", type=int, default=10)
-    parser.add_argument("--num_workers", type=int, default=0)
+    parser.add_argument(
+        "--dataset",
+        type=str,
+        default="ogbn-mag",
+        help="Dataset for train: ogbn-mag, ogb-lsc-mag240m",
+    )
+    parser.add_argument(
+        "--num_gpus",
+        type=int,
+        default=0,
+        help="Number of GPUs. Use 0 for CPU training.",
+    )
+    parser.add_argument(
+        "--runs",
+        type=int,
+        default=5,
+        help="Number of runs. Each run will train the model from scratch.",
+    )
+    parser.add_argument(
+        "--num_workers",
+        type=int,
+        default=0,
+        help="Number of worker processes for data loading.",
+    )
+    parser.add_argument(
+        "--rootdir",
+        type=str,
+        default="./",
+        help="Directory to download the OGB dataset.",
+    )
+    parser.add_argument(
+        "--graph_path",
+        type=str,
+        default="./graph.dgl",
+        help="Path to the graph file.",
+    )
+    parser.add_argument(
+        "--paper_feature_path",
+        type=str,
+        default="./paper-feat.npy",
+        help="Path to the features of paper nodes.",
+    )
+    parser.add_argument(
+        "--author_feature_path",
+        type=str,
+        default="./author-feat.npy",
+        help="Path to the features of author nodes.",
+    )
+    parser.add_argument(
+        "--inst_feature_path",
+        type=str,
+        default="./inst-feat.npy",
+        help="Path to the features of institution nodes.",
+    )
 
     args = parser.parse_args()
```