Unverified Commit e22bd78f authored by Rhett Ying, committed by GitHub

[GraphBolt] enable training on ogb-lsc-mag240m (#6366)

parent f1e7060b
# Node classification on heterogeneous graph with RGCN
This example demonstrates how to run a node classification task on a heterogeneous graph with **GraphBolt**. The models are not yet tuned for best accuracy.
## Run on `ogbn-mag` dataset
### Command
@@ -8,6 +10,12 @@ python3 hetero_rgcn.py
```
### Statistics of train/validation/test
The results below were obtained on an AWS EC2 r6idn.metal instance: 1024GB RAM, 128 vCPUs (Ice Lake 8375C), 0 GPUs.
| Dataset Size | Peak CPU RAM Usage | Time Per Epoch (Training) | Time Per Epoch (Inference: train/val/test set) |
| ------------ | ------------------ | ------------------------- | ---------------------------------------------- |
| ~1.1GB | ~5GB | ~3min | ~1min40s + ~0min9s + ~0min7s |
```
Final performance:
All runs:
@@ -15,4 +23,31 @@ Highest Train: 49.29 ± 0.85
Highest Valid: 34.69 ± 0.49
Final Train: 48.14 ± 1.09
Final Test: 33.65 ± 0.63
```
## Run on `ogb-lsc-mag240m` dataset
### Command
```
python3 hetero_rgcn.py --dataset ogb-lsc-mag240m --runs 2
```
### Statistics of train/validation/test
The results below were obtained on an AWS EC2 r6idn.metal instance: 1024GB RAM, 128 vCPUs (Ice Lake 8375C), 0 GPUs.
| Dataset Size | Peak CPU RAM Usage | Time Per Epoch (Training) | Time Per Epoch (Inference: train/val/test set) |
| ------------ | ------------------ | ------------------------- | ------------------------------- |
| ~404GB | ~110GB | ~2min45s | ~28min25s + ~4min21s + ~2min54s |

As the labels of the test set are hidden, test accuracy is always **0.00**. The test submission is saved as `y_pred_mag240m_test-dev.npz` under the current directory.

As the table above shows, the training time per epoch is quite close to that of `ogbn-mag`. This is because no embedding layer is applied for `ogb-lsc-mag240m`: all required node features are generated in advance.
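
For a quick sanity check before uploading, the saved submission can be inspected with NumPy. This is a minimal sketch, not part of the example script; it assumes the archive stores the predictions under the `y_pred` key written by `MAG240MEvaluator.save_test_submission`.
```
import numpy as np

# Load the saved test-dev submission and check its shape and dtype before
# uploading it to the OGB-LSC leaderboard.
submission = np.load("y_pred_mag240m_test-dev.npz")
y_pred = submission["y_pred"]  # assumed key name
print(y_pred.shape, y_pred.dtype, y_pred[:10])
```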
```
Final performance:
All runs:
Highest Train: 54.75 ± 0.29
Highest Valid: 52.08 ± 0.09
Final Train: 54.75 ± 0.29
Final Test: 0.00 ± 0.00
```
\ No newline at end of file
@@ -2,8 +2,10 @@
This script is a GraphBolt counterpart of
``/examples/core/rgcn/hetero_rgcn.py``. It demonstrates how to use GraphBolt
to train an R-GCN model for node classification on the Open Graph Benchmark
(OGB) datasets "ogbn-mag" and "ogb-lsc-mag240m". For more details on "ogbn-mag",
please refer to the OGB website: (https://ogb.stanford.edu/docs/nodeprop/). For
more details on "ogb-lsc-mag240m", please refer to the OGB website:
(https://ogb.stanford.edu/docs/lsc/mag240m/).

Paper [Modeling Relational Data with Graph Convolutional Networks]
(https://arxiv.org/abs/1703.06103).
@@ -53,6 +55,7 @@ import torch as th
import torch.nn as nn
import torch.nn.functional as F
from dgl.nn import HeteroEmbedding
from ogb.lsc import MAG240MEvaluator
from ogb.nodeproppred import Evaluator
from tqdm import tqdm
@@ -73,12 +76,21 @@ def load_dataset(dataset_name):
    valid_set = dataset.tasks[0].validation_set
    test_set = dataset.tasks[0].test_set
    num_classes = dataset.tasks[0].metadata["num_classes"]
    print(len(train_set), len(valid_set), len(test_set))
    return graph, features, train_set, valid_set, test_set, num_classes


def create_dataloader(
    name,
    graph,
    features,
    item_set,
    device,
    batch_size,
    fanouts,
    shuffle,
    num_workers,
):
    """Create a GraphBolt dataloader for training, validation or testing."""
@@ -106,9 +118,11 @@ def create_dataloader(
    # `node_feature_keys`:
    #   The node features to fetch. This is a dictionary where the keys are
    #   node types and the values are lists of feature names.
    node_feature_keys = {"paper": ["feat"]}
    if name == "ogb-lsc-mag240m":
        node_feature_keys["author"] = ["feat"]
        node_feature_keys["institution"] = ["feat"]
    datapipe = datapipe.fetch_feature(features, node_feature_keys)
    # Move the mini-batch to the appropriate device.
    # `device`:
@@ -407,15 +421,46 @@ class Logger(object):
            print(f" Final Test: {r.mean():.2f} ± {r.std():.2f}")


def extract_node_features(name, block, data, node_embed, device):
    """Extract the node features from embedding layer or raw features."""
    if name == "ogbn-mag":
        # Extract node embeddings for the input nodes.
        node_features = extract_embed(node_embed, data.input_nodes)
        # Add the batch's raw "paper" features. Corresponds to the content
        # in the function `rel_graph_embed` comment.
        node_features.update({"paper": data.node_features[("paper", "feat")]})
    else:
        node_features = {
            ntype: block.srcnodes[ntype].data["feat"]
            for ntype in block.srctypes
        }
        # The original feature data are stored in float16, which many CPU
        # kernels do not support, so convert to float32 explicitly.
        if device == th.device("cpu"):
            node_features = {k: v.float() for k, v in node_features.items()}
    return node_features
@th.no_grad()
def evaluate(
    name,
    g,
    model,
    node_embed,
    device,
    item_set,
    features,
    num_workers,
    save_test_submission=False,
):
    # Switches the model to evaluation mode.
    model.eval()
    category = "paper"
    # An evaluator for the dataset.
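    # `ogbn-mag` uses the generic OGB node-property `Evaluator`, while
    # `ogb-lsc-mag240m` ships its own `MAG240MEvaluator` in `ogb.lsc`.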
    if name == "ogbn-mag":
        evaluator = Evaluator(name=name)
    else:
        evaluator = MAG240MEvaluator()
    # Initialize a neighbor sampler that samples all neighbors. The model
    # has 2 GNN layers, so we create a sampler of 2 layers.
@@ -430,6 +475,7 @@ def evaluate(
    ######################################################################
    data_loader = create_dataloader(
        name,
        g,
        features,
        item_set,
@@ -445,25 +491,32 @@ def evaluate(
    y_true = list()
    for data in tqdm(data_loader, desc="Inference"):
        blocks = data.to_dgl_blocks()
        node_features = extract_node_features(
            name, blocks[0], data, node_embed, device
        )
        # Generate predictions.
        logits = model(node_features, blocks)[category]
        # Apply softmax to the logits and get the prediction by selecting the
        # argmax.
        y_hat = logits.log_softmax(dim=-1).argmax(dim=1, keepdims=True)
        y_hats.append(y_hat.cpu())
        y_true.append(data.labels[category].long().cpu())
    y_pred = th.cat(y_hats, dim=0)
    y_true = th.cat(y_true, dim=0)
    y_true = th.unsqueeze(y_true, 1)
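    # The MAG240M evaluator works on 1-D arrays, so flatten the predictions
    # and labels, and optionally save the test-dev predictions as an OGB-LSC
    # submission file.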
if name == "ogb-lsc-mag240m":
y_pred = y_pred.view(-1)
y_true = y_true.view(-1)
if save_test_submission:
evaluator.save_test_submission(
input_dict={"y_pred": y_pred}, dir_path=".", mode="test-dev"
)
return evaluator.eval({"y_true": y_true, "y_pred": y_pred})["acc"] return evaluator.eval({"y_true": y_true, "y_pred": y_pred})["acc"]
@@ -494,6 +547,7 @@ def run(
        total_loss = 0
        data_loader = create_dataloader(
            name,
            g,
            features,
            train_set,
@@ -506,19 +560,22 @@ def run(
        for data in tqdm(data_loader, desc=f"Training~Epoch {epoch:02d}"):
            # Fetch the number of seed nodes in the batch.
            num_seeds = data.seed_nodes[category].shape[0]
            # Convert MiniBatch to DGL Blocks.
            blocks = data.to_dgl_blocks()
            # Extract the node features from embedding layer or raw features.
            node_features = extract_node_features(
                name, blocks[0], data, node_embed, device
            )
            # Reset gradients.
            optimizer.zero_grad()
            # Generate predictions.
            logits = model(node_features, blocks)[category]
            y_hat = logits.log_softmax(dim=-1)
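            # `F.nll_loss` expects int64 targets, so cast the labels explicitly.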
            loss = F.nll_loss(y_hat, data.labels[category].long())
            loss.backward()
            optimizer.step()
@@ -541,7 +598,15 @@ def run(
    print("Evaluating the model on the test set.")
    test_acc = evaluate(
        name,
        g,
        model,
        node_embed,
        device,
        test_set,
        features,
        num_workers,
        save_test_submission=(name == "ogb-lsc-mag240m"),
    )
    print("Finish evaluating on test set.")
@@ -572,17 +637,27 @@ def main(args):
        args.dataset
    )
    # TODO: fetch from ``feature store``.
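    # `ogbn-mag` ships 128-dimensional paper features, while `ogb-lsc-mag240m`
    # ships 768-dimensional (RoBERTa-based) paper features.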
    if args.dataset == "ogbn-mag":
        feat_size = 128
    else:
        feat_size = 768
    # As `ogb-lsc-mag240m` is a large dataset, features of `author` and
    # `institution` are generated in advance and stored in the feature store.
    # For `ogbn-mag`, we generate the features on the fly.
    embed_layer = None
    if args.dataset == "ogbn-mag":
        # Create the embedding layer and move it to the appropriate device.
        embed_layer = rel_graph_embed(g, feat_size).to(device)
        print(
            "Number of embedding parameters: "
            f"{sum(p.numel() for p in embed_layer.parameters())}"
        )
    # Initialize the entity classification model.
    model = EntityClassify(g, feat_size, num_classes).to(device)
    print(
        "Number of model parameters: "
        f"{sum(p.numel() for p in model.parameters())}"
@@ -594,7 +669,8 @@ def main(args):
        # parameters learned from the last run, potentially resulting
        # in biased outcomes or sub-optimal performance if the model was
        # previously stuck in a poor local minimum.
        if embed_layer is not None:
            embed_layer.reset_parameters()
        model.reset_parameters()
        # `itertools.chain()` is a function in Python's itertools module.
@@ -605,7 +681,8 @@
        # which is passed to the optimizer. The optimizer then updates all
        # these parameters during the training process.
        all_params = itertools.chain(
            model.parameters(),
            [] if embed_layer is None else embed_layer.parameters(),
        )
        optimizer = th.optim.Adam(all_params, lr=0.01)
@@ -648,8 +725,9 @@ if __name__ == "__main__":
        "--dataset",
        type=str,
        default="ogbn-mag",
        help="Dataset name. Possible values: ogbn-mag, ogb-lsc-mag240m",
    )
    parser.add_argument("--runs", type=int, default=5)
    parser.add_argument("--num_workers", type=int, default=0)
    parser.add_argument("--gpu", type=int, default=0)