Unverified Commit 6a91d181 authored by Mufei Li's avatar Mufei Li Committed by GitHub
Browse files

[DGL-Go] Graph Property Prediction Pipeline (#3927)



* Update

* Update

* Update

* Fix

* Update

* Update

* Update

* Update

* Update

* Fix

* Update

* Update

* Update

* Fix

* Update

* Update

* update

* Update

* Update

* Update

* Fix

* Fix

* Update

* Update

* Update

* Update

* lr_scheduler

* Update

* Update

* Update

* Update

* Update

* Update

* Fix

* Fix

* Fix

* Fix

* Update

* Update

* Update

* Update

* Update

* Fix

* Update

* Update

* Update

* Update

* Fix

* Update

* Update

* Update

* Fix

* Fix

* Fix

* Update

* Update

* Fix

* Fix

* Update

* Update

* Update

* Update

* Update

* Update

* update

* CI

* Update

* Update

* Update

* Update

* Update
Co-authored-by: default avatarMinjie Wang <wmjlyjemaine@gmail.com>
parent d31448dd
...@@ -112,6 +112,9 @@ will list the available recipes: ...@@ -112,6 +112,9 @@ will list the available recipes:
=============================================================================== ===============================================================================
| Filename | Pipeline | Dataset | | Filename | Pipeline | Dataset |
=============================================================================== ===============================================================================
| graphpred_pcba_gin.yaml | graphpred | ogbg-molpcba |
| graphpred_hiv_pna.yaml | graphpred | ogbg-molhiv |
| graphpred_hiv_gin.yaml | graphpred | ogbg-molhiv |
| linkpred_citation2_sage.yaml | linkpred | ogbl-citation2 | | linkpred_citation2_sage.yaml | linkpred | ogbl-citation2 |
| linkpred_collab_sage.yaml | linkpred | ogbl-collab | | linkpred_collab_sage.yaml | linkpred | ogbl-collab |
| nodepred_citeseer_sage.yaml | nodepred | citeseer | | nodepred_citeseer_sage.yaml | nodepred | citeseer |
...@@ -382,12 +385,11 @@ help message; use `dgl configure --help` for the configuration options; use ...@@ -382,12 +385,11 @@ help message; use `dgl configure --help` for the configuration options; use
`dgl configure nodepred --help` for the configuration options of node prediction pipeline. `dgl configure nodepred --help` for the configuration options of node prediction pipeline.
**Q: What exactly is nodepred/linkpred? How many are they?** **Q: What exactly is nodepred/linkpred? How many are they?**
A: They are called DGl-Go pipelines. A pipeline represents the training methodology for A: They are called DGL-Go pipelines. A pipeline represents the training methodology for
a certain task. Therefore, its naming convention is *<task_name>[-<method_name>]*. For example, a certain task. Therefore, its naming convention is *<task_name>[-<method_name>]*. For example,
`nodepred` trains the selected GNN model for node classification using full-graph training method; `nodepred` trains the selected GNN model for node classification using full-graph training method;
while `nodepred-ns` trains the model for node classification but using neighbor sampling. while `nodepred-ns` trains the model for node classification but using neighbor sampling.
The first release included three training pipelines (`nodepred`, `nodepred-ns` and `linkpred`) Currently DGL-Go provides four training pipelines (`nodepred`, `nodepred-ns`, `linkpred`, and `graphpred`). Use `dgl configure --help` to see
but you can expect more will be coming in the future. Use `dgl configure --help` to see
all the available pipelines. all the available pipelines.
**Q: How to add my model to the official model recipe zoo?** **Q: How to add my model to the official model recipe zoo?**
......
from .node_encoder import * from .node_encoder import *
from .edge_encoder import * from .edge_encoder import *
from .graph_encoder import *
\ No newline at end of file
from ...utils.factory import GraphModelFactory
from .gin_ogbg import OGBGGIN
from .pna import PNA
# Register the graph property prediction models with the factory so that
# `dgl configure graphpred --model gin|pna` can look them up by name.
GraphModelFactory.register("gin")(OGBGGIN)
GraphModelFactory.register("pna")(PNA)
\ No newline at end of file
import dgl
import torch
import torch.nn as nn
import torch.nn.functional as F
from dgl.nn import GINEConv, AvgPooling, SumPooling
from ogb.graphproppred.mol_encoder import AtomEncoder, BondEncoder
class MLP(nn.Module):
    """Two-layer perceptron used as the GINEConv update function.

    Expands the input to twice its width, then projects back down, with
    batch normalization after each linear layer and a ReLU in between.
    """
    def __init__(self,
                 feat_size: int):
        super(MLP, self).__init__()
        hidden_size = 2 * feat_size
        layers = [
            nn.Linear(feat_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, feat_size),
            nn.BatchNorm1d(feat_size),
        ]
        self.mlp = nn.Sequential(*layers)

    def forward(self, h):
        """Apply the MLP to a batch of features of shape (N, feat_size)."""
        return self.mlp(h)
class OGBGGIN(nn.Module):
    def __init__(self,
                 data_info: dict,
                 embed_size: int = 300,
                 num_layers: int = 5,
                 dropout: float = 0.5,
                 virtual_node : bool = False):
        """Graph Isomorphism Network (GIN) variant introduced in baselines
        for OGB graph property prediction datasets

        Parameters
        ----------
        data_info : dict
            The information about the input dataset. Keys read here: 'name',
            'out_size', and (for datasets other than ogbg-molhiv/ogbg-molpcba)
            'node_feat_size' and 'edge_feat_size'.
        embed_size : int
            Embedding size.
        num_layers : int
            Number of layers.
        dropout : float
            Dropout rate.
        virtual_node : bool
            Whether to use virtual node.
        """
        super(OGBGGIN, self).__init__()
        self.data_info = data_info
        self.embed_size = embed_size
        self.num_layers = num_layers
        self.virtual_node = virtual_node
        if data_info['name'] in ['ogbg-molhiv', 'ogbg-molpcba']:
            # ogbg-mol* datasets carry categorical atom/bond features, which the
            # OGB-provided encoders embed; one bond encoder per GINE layer.
            self.node_encoder = AtomEncoder(embed_size)
            self.edge_encoders = nn.ModuleList([
                BondEncoder(embed_size) for _ in range(num_layers)])
        else:
            # Handle other datasets
            self.node_encoder = nn.Linear(data_info['node_feat_size'], embed_size)
            self.edge_encoders = nn.ModuleList([nn.Linear(data_info['edge_feat_size'], embed_size)
                                                for _ in range(num_layers)])
        self.conv_layers = nn.ModuleList([GINEConv(MLP(embed_size)) for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)
        self.pool = AvgPooling()
        self.pred = nn.Linear(embed_size, data_info['out_size'])
        if virtual_node:
            # Single shared virtual-node embedding, initialized to zero, plus
            # one MLP per layer transition to update the virtual-node state.
            self.virtual_emb = nn.Embedding(1, embed_size)
            nn.init.constant_(self.virtual_emb.weight.data, 0)
            self.mlp_virtual = nn.ModuleList()
            for _ in range(num_layers - 1):
                self.mlp_virtual.append(MLP(embed_size))
            self.virtual_pool = SumPooling()

    def forward(self, graph, node_feat, edge_feat):
        """Return graph-level predictions of shape (batch_size, out_size)."""
        if self.virtual_node:
            # One virtual-node state per graph in the batch.
            virtual_emb = self.virtual_emb.weight.expand(graph.batch_size, -1)
        hn = self.node_encoder(node_feat)
        for layer in range(self.num_layers):
            if self.virtual_node:
                # messages from virtual nodes to graph nodes
                virtual_hn = dgl.broadcast_nodes(graph, virtual_emb)
                hn = hn + virtual_hn
            he = self.edge_encoders[layer](edge_feat)
            hn = self.conv_layers[layer](graph, hn, he)
            if layer != self.num_layers - 1:
                # No nonlinearity after the final message-passing layer.
                hn = F.relu(hn)
            hn = self.dropout(hn)
            if self.virtual_node and layer != self.num_layers - 1:
                # messages from graph nodes to virtual nodes
                virtual_emb_tmp = self.virtual_pool(graph, hn) + virtual_emb
                virtual_emb = self.mlp_virtual[layer](virtual_emb_tmp)
                virtual_emb = self.dropout(F.relu(virtual_emb))
        hg = self.pool(graph, hn)
        return self.pred(hg)
from typing import List
import dgl.function as fn
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from dgl.nn import SumPooling, AvgPooling
from ogb.graphproppred.mol_encoder import AtomEncoder
def aggregate_mean(h):
    """Average the messages over the neighbor dimension."""
    return h.mean(dim=1)

def aggregate_max(h):
    """Elementwise maximum over the neighbor dimension."""
    return h.max(dim=1).values

def aggregate_min(h):
    """Elementwise minimum over the neighbor dimension."""
    return h.min(dim=1).values

def aggregate_sum(h):
    """Sum the messages over the neighbor dimension."""
    return h.sum(dim=1)

def aggregate_var(h):
    """Biased variance of the messages over the neighbor dimension."""
    mean_of_squares = (h * h).mean(dim=1)
    mean = h.mean(dim=1)
    # relu guards against tiny negative values from floating-point error
    return torch.relu(mean_of_squares - mean * mean)

def aggregate_std(h):
    """Standard deviation of the messages (eps keeps sqrt stable at 0)."""
    return torch.sqrt(aggregate_var(h) + 1e-5)

# Name -> aggregation function lookup used by SimplePNAConv.
AGGREGATORS = {
    'mean': aggregate_mean,
    'sum': aggregate_sum,
    'max': aggregate_max,
    'min': aggregate_min,
    'std': aggregate_std,
    'var': aggregate_var,
}
def scale_identity(h, D, delta):
    """Leave the aggregated features unchanged."""
    return h

def scale_amplification(h, D, delta):
    """Amplify features of high-degree nodes by log(D + 1) / delta."""
    factor = np.log(D + 1) / delta
    return h * factor

def scale_attenuation(h, D, delta):
    """Attenuate features of high-degree nodes by delta / log(D + 1)."""
    factor = delta / np.log(D + 1)
    return h * factor

# Name -> scaler function lookup used by SimplePNAConv.
SCALERS = {
    'identity': scale_identity,
    'amplification': scale_amplification,
    'attenuation': scale_attenuation,
}
class MLP(nn.Module):
    def __init__(self,
                 in_feat_size: int,
                 out_feat_size: int,
                 num_layers: int=3,
                 decreasing_hidden_size=False):
        """Multilayer Perceptron (MLP) with ReLU between layers.

        When ``decreasing_hidden_size`` is set, each layer halves the width,
        going from ``in_feat_size`` down to ``out_feat_size``; otherwise the
        first layer maps to ``out_feat_size`` and the rest keep that width.
        """
        super(MLP, self).__init__()
        if decreasing_hidden_size:
            widths = [in_feat_size // 2 ** i for i in range(num_layers)]
            widths.append(out_feat_size)
        else:
            widths = [in_feat_size] + [out_feat_size] * num_layers
        # Consecutive width pairs define the linear layers.
        self.layers = nn.ModuleList(
            [nn.Linear(a, b) for a, b in zip(widths[:-1], widths[1:])])
        self.num_layers = num_layers

    def forward(self, h):
        """Apply all layers; no activation after the final layer."""
        last = self.num_layers - 1
        for i, layer in enumerate(self.layers):
            h = layer(h)
            if i != last:
                h = F.relu(h)
        return h
class SimplePNAConv(nn.Module):
    r"""A simplified PNAConv variant used in OGB submissions

    Each node aggregates incoming messages with multiple aggregators,
    rescales every aggregation with multiple degree-based scalers, and
    transforms the concatenated result with an MLP.

    Parameters
    ----------
    feat_size : int
        Input and output node feature size.
    aggregators : List[str]
        Aggregator names; must be keys of ``AGGREGATORS``.
    scalers : List[str]
        Scaler names; must be keys of ``SCALERS``.
    delta : float
        Degree normalization constant passed to the scalers.
    dropout : float
        Dropout rate applied to the layer output.
    batch_norm : bool
        Whether to batch-normalize before the ReLU.
    residual : bool
        Whether to add the input features to the output.
    num_mlp_layers : int
        Number of layers in the post-aggregation MLP.
    """
    def __init__(self,
                 feat_size: int,
                 aggregators: List[str],
                 scalers: List[str],
                 delta: float,
                 dropout: float,
                 batch_norm: bool,
                 residual: bool,
                 num_mlp_layers: int):
        super(SimplePNAConv, self).__init__()
        self.aggregators = [AGGREGATORS[aggr] for aggr in aggregators]
        self.scalers = [SCALERS[scale] for scale in scalers]
        self.delta = delta
        # Every (aggregator, scaler) pair contributes one feat_size-wide slice
        # to the concatenated input of the MLP.
        self.mlp = MLP(in_feat_size=(len(aggregators) * len(scalers)) * feat_size,
                       out_feat_size=feat_size, num_layers=num_mlp_layers)
        self.dropout = nn.Dropout(dropout)
        self.residual = residual
        if batch_norm:
            self.bn = nn.BatchNorm1d(feat_size)
        else:
            self.bn = None

    def reduce(self, nodes):
        """Custom DGL reduce function: apply all aggregators, then all scalers."""
        h = nodes.mailbox['m']
        # Number of incoming messages per node in this batch of nodes
        # (the aggregators reduce over this dimension).
        D = h.shape[-2]
        h = torch.cat([aggregate(h) for aggregate in self.aggregators], dim=1)
        h = torch.cat([scale(h, D=D, delta=self.delta) for scale in self.scalers], dim=1)
        return {'h': h}

    def forward(self, g, h):
        """One message-passing step over graph ``g`` with node features ``h``."""
        with g.local_scope():
            g.ndata['h'] = h
            g.update_all(fn.copy_u('h', 'm'), self.reduce)
            h_new = g.ndata['h']
            h_new = self.mlp(h_new)
            if self.bn is not None:
                h_new = self.bn(h_new)
            h_new = F.relu(h_new)
            if self.residual:
                h_new = h_new + h
            h_new = self.dropout(h_new)
            return h_new
class PNA(nn.Module):
    def __init__(self,
                 data_info: dict,
                 embed_size: int = 80,
                 aggregators: str = 'mean max min std',
                 scalers: str = 'identity amplification attenuation',
                 dropout: float = 0.3,
                 batch_norm: bool = True,
                 residual: bool = True,
                 num_mlp_layers: int = 1,
                 num_layers: int = 4,
                 readout: str = 'mean'):
        """Principal Neighbourhood Aggregation

        Parameters
        ----------
        data_info : dict
            The information about the input dataset. Keys read here: 'name',
            'out_size', 'delta', and (for datasets other than
            ogbg-molhiv/ogbg-molpcba) 'node_feat_size'.
        embed_size : int
            Embedding size.
        aggregators : str
            Aggregation function names separated by space, can include mean, max, min, std, sum
        scalers : str
            Scaler function names separated by space, can include identity, amplification, and attenuation
        dropout : float
            Dropout rate.
        batch_norm : bool
            Whether to use batch normalization.
        residual : bool
            Whether to use residual connection.
        num_mlp_layers : int
            Number of MLP layers to use after message aggregation in each PNA layer.
        num_layers : int
            Number of PNA layers.
        readout : str
            Readout for computing graph-level representations, can be 'sum' or 'mean'.
        """
        super(PNA, self).__init__()
        self.data_info = data_info
        self.embed_size = embed_size
        self.dropout = dropout
        self.batch_norm = batch_norm
        self.residual = residual
        self.num_mlp_layers = num_mlp_layers
        self.num_layers = num_layers
        self.readout = readout
        # Parse the space-separated specification strings into lists and
        # validate them against the supported function names.
        if aggregators is None:
            aggregators = ['mean', 'max', 'min', 'std']
        else:
            aggregators = [agg.strip() for agg in aggregators.split(' ')]
        assert set(aggregators).issubset({'mean', 'max', 'min', 'std', 'sum'}), \
            "Expect aggregators to be a subset of ['mean', 'max', 'min', 'std', 'sum'], \
            got {}".format(aggregators)
        if scalers is None:
            scalers = ['identity', 'amplification', 'attenuation']
        else:
            scalers = [scl.strip() for scl in scalers.split(' ')]
        assert set(scalers).issubset({'identity', 'amplification', 'attenuation'}), \
            "Expect scalers to be a subset of ['identity', 'amplification', 'attenuation'], \
            got {}".format(scalers)
        self.aggregators = aggregators
        self.scalers = scalers
        if data_info['name'] in ['ogbg-molhiv', 'ogbg-molpcba']:
            # ogbg-mol* datasets carry categorical atom features, which the
            # OGB-provided encoder embeds.
            self.node_encoder = AtomEncoder(embed_size)
        else:
            # Handle other datasets
            self.node_encoder = nn.Linear(data_info['node_feat_size'], embed_size)
        self.conv_layers = nn.ModuleList([SimplePNAConv(feat_size=embed_size,
                                                        aggregators=aggregators,
                                                        scalers=scalers,
                                                        delta=data_info['delta'],
                                                        dropout=dropout,
                                                        batch_norm=batch_norm,
                                                        residual=residual,
                                                        num_mlp_layers=num_mlp_layers)
                                          for _ in range(num_layers)])
        if readout == 'sum':
            self.pool = SumPooling()
        elif readout == 'mean':
            self.pool = AvgPooling()
        else:
            raise ValueError("Expect readout to be 'sum' or 'mean', got {}".format(readout))
        # Prediction head whose hidden widths halve layer by layer.
        self.pred = MLP(embed_size, data_info['out_size'], decreasing_hidden_size=True)

    def forward(self, graph, node_feat, edge_feat=None):
        """Return graph-level predictions; ``edge_feat`` is accepted for API
        compatibility with the other graphpred models but is unused."""
        hn = self.node_encoder(node_feat)
        for conv in self.conv_layers:
            hn = conv(graph, hn)
        hg = self.pool(graph, hn)
        return self.pred(hg)
from .nodepred import NodepredPipeline from .nodepred import NodepredPipeline
from .nodepred_sample import NodepredNsPipeline from .nodepred_sample import NodepredNsPipeline
from .linkpred import LinkpredPipeline from .linkpred import LinkpredPipeline
from .graphpred import GraphpredPipeline
\ No newline at end of file
from .gen import *
\ No newline at end of file
from pathlib import Path
from jinja2 import Template
import copy
import typer
from pydantic import BaseModel, Field
from typing import Optional
from ...utils.factory import PipelineFactory, GraphModelFactory, PipelineBase, DataFactory
from ...utils.yaml_dump import deep_convert_dict, merge_comment
import ruamel.yaml
# Help-text comments attached to the matching keys when the pipeline
# configuration is dumped to YAML (see merge_comment in get_cfg_func).
pipeline_comments = {
    "num_runs": "Number of experiments to run",
    "train_batch_size": "Graph batch size when training",
    "eval_batch_size": "Graph batch size when evaluating",
    "num_workers": "Number of workers for data loading",
    "num_epochs": "Number of training epochs",
    "save_path": "Path to save the model"
}
class GraphpredPipelineCfg(BaseModel):
    """Pydantic schema for the general_pipeline section of a graphpred config."""
    # Number of independent experiments to run
    num_runs: int = 1
    # Graph batch size when training
    train_batch_size: int = 32
    # Graph batch size when evaluating
    eval_batch_size: int = 32
    # Number of workers for data loading
    num_workers: int = 4
    # Optimizer class name (from torch.optim) plus its keyword arguments
    optimizer: dict = {"name": "Adam", "lr": 0.001, "weight_decay": 0}
    # Default to no lr decay
    lr_scheduler: dict = {"name": "StepLR", "step_size": 100, "gamma": 1}
    # Loss class name from torch.nn
    loss: str = "BCEWithLogitsLoss"
    # Metric function name from sklearn.metrics
    metric: str = "roc_auc_score"
    # Number of training epochs
    num_epochs: int = 100
    # Path to save the best model checkpoint
    save_path: str = "model.pth"
@PipelineFactory.register("graphpred")
class GraphpredPipeline(PipelineBase):
    """DGL-Go pipeline for graph property prediction.

    Backs two subcommands: ``dgl configure graphpred`` (generate a YAML
    configuration file) and ``dgl export`` (render a standalone training
    script from that configuration).
    """

    def __init__(self):
        self.pipeline_name = "graphpred"

    @classmethod
    def setup_user_cfg_cls(cls):
        """Build and cache the pydantic class used to validate user configs."""
        from ...utils.enter_config import UserConfig

        class GraphPredUserConfig(UserConfig):
            data: DataFactory.filter("graphpred").get_pydantic_config() = Field(..., discriminator="name")
            model: GraphModelFactory.get_pydantic_model_config() = Field(..., discriminator="name")
            general_pipeline: GraphpredPipelineCfg = GraphpredPipelineCfg()

        cls.user_cfg_cls = GraphPredUserConfig

    @property
    def user_cfg_cls(self):
        return self.__class__.user_cfg_cls

    def get_cfg_func(self):
        """Return the typer callback implementing ``dgl configure graphpred``."""
        def config(
            data: DataFactory.filter("graphpred").get_dataset_enum() = typer.Option(..., help="input data name"),
            cfg: Optional[str] = typer.Option(
                None, help="output configuration path"),
            model: GraphModelFactory.get_model_enum() = typer.Option(..., help="Model name"),
        ):
            self.__class__.setup_user_cfg_cls()
            # Validate the defaults through pydantic, then dump them to YAML
            # with inline help comments attached to the matching keys.
            generated_cfg = {
                "pipeline_name": self.pipeline_name,
                "device": "cpu",
                "data": {"name": data.name},
                "model": {"name": model.value},
                "general_pipeline": {}
            }
            output_cfg = self.user_cfg_cls(**generated_cfg).dict()
            output_cfg = deep_convert_dict(output_cfg)
            comment_dict = {
                "device": "Torch device name, e.g., cpu or cuda or cuda:0",
                "data": {
                    "split_ratio": 'Ratio to generate data split, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset'
                },
                "general_pipeline": pipeline_comments,
                "model": GraphModelFactory.get_constructor_doc_dict(model.value)
            }
            comment_dict = merge_comment(output_cfg, comment_dict)
            yaml = ruamel.yaml.YAML()
            if cfg is None:
                cfg = "_".join(["graphpred", data.value, model.value]) + ".yaml"
            yaml.dump(comment_dict, Path(cfg).open("w"))
            print("Configuration file is generated at {}".format(Path(cfg).absolute()))
        return config

    @classmethod
    def gen_script(cls, user_cfg_dict):
        """Render the standalone training script from a validated config dict."""
        cls.setup_user_cfg_cls()
        file_current_dir = Path(__file__).resolve().parent
        with open(file_current_dir / "graphpred.jinja-py", "r") as f:
            template = Template(f.read())
        render_cfg = copy.deepcopy(user_cfg_dict)
        model_code = GraphModelFactory.get_source_code(
            user_cfg_dict["model"]["name"])
        render_cfg["model_code"] = model_code
        render_cfg["model_class_name"] = GraphModelFactory.get_model_class_name(
            user_cfg_dict["model"]["name"])
        render_cfg.update(DataFactory.get_generated_code_dict(user_cfg_dict["data"]["name"], '**cfg["data"]'))
        # Build the `cfg = {...}` literal embedded in the generated script:
        # drop keys that are rendered separately or consumed by the template.
        generated_user_cfg = copy.deepcopy(user_cfg_dict)
        if "split_ratio" in generated_user_cfg["data"]:
            generated_user_cfg["data"].pop("split_ratio")
        generated_user_cfg["data_name"] = generated_user_cfg["data"].pop("name")
        generated_user_cfg.pop("pipeline_name")
        generated_user_cfg["model_name"] = generated_user_cfg["model"].pop("name")
        generated_user_cfg["general_pipeline"]["optimizer"].pop("name")
        generated_user_cfg["general_pipeline"]["lr_scheduler"].pop("name")
        # (Removed a dead deepcopy of user_cfg_dict["general_pipeline"] whose
        # result was never used.)
        if user_cfg_dict["data"].get("split_ratio", None) is not None:
            render_cfg["data_initialize_code"] = "{}, split_ratio={}".format(render_cfg["data_initialize_code"], user_cfg_dict["data"]["split_ratio"])
        render_cfg["user_cfg_str"] = f"cfg = {str(generated_user_cfg)}"
        render_cfg["user_cfg"] = user_cfg_dict
        return template.render(**render_cfg)

    @staticmethod
    def get_description() -> str:
        return "Graph property prediction pipeline"
import numpy as np
import sklearn
import torch
import torch.nn as nn
from dgl.data import AsGraphPredDataset
from dgl.dataloading import GraphDataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm
{{ data_import_code }}
{{ model_code }}
def train(device, loader, model, criterion, optimizer):
    """Run one training epoch over the loader, updating the model in place."""
    model.train()
    for batched_graph, targets in tqdm(loader, desc="Iteration"):
        batched_graph = batched_graph.to(device)
        targets = targets.to(device)
        pred = model(batched_graph,
                     batched_graph.ndata['feat'],
                     batched_graph.edata['feat'])
        # A NaN target marks an unlabeled task; NaN != NaN, so this mask keeps
        # only the labeled entries when computing the training loss.
        mask = targets == targets
        loss = criterion(pred.float()[mask], targets.float()[mask])
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
def calc_metric(y_true, y_pred):
    """Average the per-task metric over all tasks with both classes present.

    y_true and y_pred are (num_graphs, num_tasks) numpy arrays; NaN entries
    in y_true mark unlabeled graphs and are skipped per task.
    """
    task_metric_list = []
    for i in range(y_true.shape[1]):
        # AUC is only defined when there is at least one positive and negative datapoint.
        if np.sum(y_true[:, i] == 1) > 0 and np.sum(y_true[:, i] == 0) > 0:
            # ignore nan values
            is_labeled = y_true[:,i] == y_true[:,i]
            task_metric = sklearn.metrics.{{ user_cfg.general_pipeline.metric }}(
                y_true[is_labeled, i], y_pred[is_labeled, i])
            task_metric_list.append(task_metric)
    return sum(task_metric_list) / len(task_metric_list)
def evaluate(device, loader, model):
    """Compute the configured dataset-level metric of the model over loader."""
    model.eval()
    true_batches, pred_batches = [], []
    for batched_graph, targets in tqdm(loader, desc="Iteration"):
        batched_graph = batched_graph.to(device)
        targets = targets.to(device)
        with torch.no_grad():
            pred = model(batched_graph,
                         batched_graph.ndata['feat'],
                         batched_graph.edata['feat'])
        true_batches.append(targets.view(pred.shape).detach().cpu())
        pred_batches.append(pred.detach().cpu())
    return calc_metric(torch.cat(true_batches, dim=0).numpy(),
                       torch.cat(pred_batches, dim=0).numpy())
def main(run):
    """Run one full train/validate/test experiment; return the test metric."""
    {{ user_cfg_str }}
    device = cfg['device']
    # Fall back to CPU when CUDA was requested but is unavailable.
    if not torch.cuda.is_available():
        device = 'cpu'
    pipeline_cfg = cfg['general_pipeline']
    save_path = pipeline_cfg['save_path']
    # load data
    data = AsGraphPredDataset({{data_initialize_code}})
    train_loader = GraphDataLoader(data[data.train_idx], batch_size=pipeline_cfg['train_batch_size'],
                                   shuffle=True, num_workers=pipeline_cfg['num_workers'])
    val_loader = GraphDataLoader(data[data.val_idx], batch_size=pipeline_cfg['eval_batch_size'],
                                 shuffle=False, num_workers=pipeline_cfg['num_workers'])
    test_loader = GraphDataLoader(data[data.test_idx], batch_size=pipeline_cfg['eval_batch_size'],
                                  shuffle=False, num_workers=pipeline_cfg['num_workers'])
    # create model
    model_cfg = cfg["model"]  # NOTE(review): unused alias; cfg["model"] is used directly below
    # data[0] is a tuple (g, label)
    cfg["model"]["data_info"] = {
        "name": cfg["data_name"],
        "node_feat_size": data.node_feat_size,
        "edge_feat_size": data.edge_feat_size,
        "out_size": data.num_tasks
    }
    if cfg["model_name"] == 'pna':
        # PNA scalers need delta: mean of log(in-degree + 1) over training graphs.
        in_deg = torch.cat([g.in_degrees() for (g, _) in data[data.train_idx]])
        cfg["model"]["data_info"]["delta"] = torch.mean(torch.log(in_deg + 1))
    model = {{ model_class_name }}(**cfg["model"])
    model = model.to(device)
    criterion = nn.{{ user_cfg.general_pipeline.loss }}()
    optimizer = torch.optim.{{ user_cfg.general_pipeline.optimizer.name }}(
        model.parameters(), **pipeline_cfg["optimizer"])
    lr_scheduler = torch.optim.lr_scheduler.{{ user_cfg.general_pipeline.lr_scheduler.name }}(
        optimizer, **pipeline_cfg["lr_scheduler"])
    # Track the best validation metric and checkpoint the model at each improvement.
    best_val_metric = 0.
    for epoch in range(pipeline_cfg['num_epochs']):
        train(device, train_loader, model, criterion, optimizer)
        val_metric = evaluate(device, val_loader, model)
        if val_metric >= best_val_metric:
            best_val_metric = val_metric
            torch.save(model.state_dict(), save_path)
        print('Run {:d} | Epoch {:d} | Val Metric {:.4f} | Best Val Metric {:.4f}'.format(
            run, epoch, val_metric, best_val_metric))
        # ReduceLROnPlateau needs the monitored value; other schedulers do not.
        if isinstance(lr_scheduler, ReduceLROnPlateau):
            lr_scheduler.step(val_metric)
        else:
            lr_scheduler.step()
    # Evaluate the best checkpoint (not the last epoch) on the test split.
    model.load_state_dict(torch.load(save_path))
    test_metric = evaluate(device, test_loader, model)
    print('Test Metric: {:.4f}'.format(test_metric))
    return test_metric
if __name__ == '__main__':
    # Repeat the experiment num_runs times and report mean ± std of the test metric.
    all_run_metrics = []
    num_runs = {{ user_cfg.general_pipeline.num_runs }}
    for run in range(num_runs):
        print('Run experiment {:d}'.format(run))
        test_metric = main(run)
        all_run_metrics.append(test_metric)
    avg_metric = np.round(np.mean(all_run_metrics), 6)
    std_metric = np.round(np.std(all_run_metrics), 6)
    print('Test Metric across {:d} runs: {:.6f} ± {:.6f}'.format(
        num_runs, avg_metric, std_metric))
...@@ -89,7 +89,7 @@ class LinkpredPipeline(PipelineBase): ...@@ -89,7 +89,7 @@ class LinkpredPipeline(PipelineBase):
output_cfg = self.user_cfg_cls(**generated_cfg).dict() output_cfg = self.user_cfg_cls(**generated_cfg).dict()
output_cfg = deep_convert_dict(output_cfg) output_cfg = deep_convert_dict(output_cfg)
comment_dict = { comment_dict = {
"device": "Torch device name, e.q. cpu or cuda or cuda:0", "device": "Torch device name, e.g., cpu or cuda or cuda:0",
"general_pipeline": pipeline_comments, "general_pipeline": pipeline_comments,
"node_model": NodeModelFactory.get_constructor_doc_dict(node_model.value), "node_model": NodeModelFactory.get_constructor_doc_dict(node_model.value),
"edge_model": EdgeModelFactory.get_constructor_doc_dict(edge_model.value), "edge_model": EdgeModelFactory.get_constructor_doc_dict(edge_model.value),
......
...@@ -71,7 +71,7 @@ class NodepredPipeline(PipelineBase): ...@@ -71,7 +71,7 @@ class NodepredPipeline(PipelineBase):
output_cfg = self.user_cfg_cls(**generated_cfg).dict() output_cfg = self.user_cfg_cls(**generated_cfg).dict()
output_cfg = deep_convert_dict(output_cfg) output_cfg = deep_convert_dict(output_cfg)
comment_dict = { comment_dict = {
"device": "Torch device name, e.q. cpu or cuda or cuda:0", "device": "Torch device name, e.g., cpu or cuda or cuda:0",
"data": { "data": {
"split_ratio": 'Ratio to generate split masks, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset' "split_ratio": 'Ratio to generate split masks, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset'
}, },
......
...@@ -96,7 +96,7 @@ class NodepredNsPipeline(PipelineBase): ...@@ -96,7 +96,7 @@ class NodepredNsPipeline(PipelineBase):
output_cfg = self.user_cfg_cls(**generated_cfg).dict() output_cfg = self.user_cfg_cls(**generated_cfg).dict()
output_cfg = deep_convert_dict(output_cfg) output_cfg = deep_convert_dict(output_cfg)
comment_dict = { comment_dict = {
"device": "Torch device name, e.q. cpu or cuda or cuda:0", "device": "Torch device name, e.g., cpu or cuda or cuda:0",
"data": { "data": {
"split_ratio": 'Ratio to generate split masks, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset' "split_ratio": 'Ratio to generate split masks, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset'
}, },
......
...@@ -12,7 +12,7 @@ import inspect ...@@ -12,7 +12,7 @@ import inspect
from numpydoc import docscrape from numpydoc import docscrape
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
ALL_PIPELINE = ["nodepred", "nodepred-ns", "linkpred"] ALL_PIPELINE = ["nodepred", "nodepred-ns", "linkpred", "graphpred"]
class PipelineBase(ABC): class PipelineBase(ABC):
...@@ -164,7 +164,7 @@ DataFactory.register( ...@@ -164,7 +164,7 @@ DataFactory.register(
import_code="from dgl.data import CSVDataset", import_code="from dgl.data import CSVDataset",
extra_args={"data_path": "./"}, extra_args={"data_path": "./"},
class_name="CSVDataset({})", class_name="CSVDataset({})",
allowed_pipeline=["nodepred", "nodepred-ns", "linkpred"]) allowed_pipeline=["nodepred", "nodepred-ns", "linkpred", "graphpred"])
DataFactory.register( DataFactory.register(
"reddit", "reddit",
...@@ -206,6 +206,20 @@ DataFactory.register( ...@@ -206,6 +206,20 @@ DataFactory.register(
class_name="DglLinkPropPredDataset('ogbl-citation2')", class_name="DglLinkPropPredDataset('ogbl-citation2')",
allowed_pipeline=["linkpred"]) allowed_pipeline=["linkpred"])
DataFactory.register(
"ogbg-molhiv",
import_code="from ogb.graphproppred import DglGraphPropPredDataset",
extra_args={},
class_name="DglGraphPropPredDataset(name='ogbg-molhiv')",
allowed_pipeline=["graphpred"])
DataFactory.register(
"ogbg-molpcba",
import_code="from ogb.graphproppred import DglGraphPropPredDataset",
extra_args={},
class_name="DglGraphPropPredDataset(name='ogbg-molpcba')",
allowed_pipeline=["graphpred"])
class PipelineFactory: class PipelineFactory:
""" The factory class for creating executors""" """ The factory class for creating executors"""
...@@ -424,3 +438,4 @@ NegativeSamplerFactory.register("persource")(PerSourceUniform) ...@@ -424,3 +438,4 @@ NegativeSamplerFactory.register("persource")(PerSourceUniform)
NodeModelFactory = ModelFactory() NodeModelFactory = ModelFactory()
EdgeModelFactory = ModelFactory() EdgeModelFactory = ModelFactory()
GraphModelFactory = ModelFactory()
version: 0.0.1
pipeline_name: graphpred
device: cuda:0 # Torch device name, e.g., cpu or cuda or cuda:0
data:
name: ogbg-molhiv
split_ratio: # Ratio to generate data split, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset
model:
name: gin
embed_size: 300 # Embedding size.
num_layers: 5 # Number of layers.
dropout: 0.5 # Dropout rate.
virtual_node: true # Whether to use virtual node.
general_pipeline:
num_runs: 10 # Number of experiments to run
train_batch_size: 32 # Graph batch size when training
eval_batch_size: 32 # Graph batch size when evaluating
num_workers: 4 # Number of workers for data loading
optimizer:
name: Adam
lr: 0.001
weight_decay: 0
lr_scheduler:
name: StepLR
step_size: 100
gamma: 1
loss: BCEWithLogitsLoss
metric: roc_auc_score
num_epochs: 100 # Number of training epochs
save_path: model.pth # Path to save the model
version: 0.0.1
pipeline_name: graphpred
device: cuda:0 # Torch device name, e.g., cpu or cuda or cuda:0
data:
name: ogbg-molhiv
split_ratio: # Ratio to generate data split, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset
model:
name: pna
embed_size: 80 # Embedding size.
aggregators: mean max min std # Aggregation function names separated by space, can include mean, max, min, std, sum
scalers: identity amplification attenuation # Scaler function names separated by space, can include identity, amplification, and attenuation
dropout: 0.3 # Dropout rate.
batch_norm: true # Whether to use batch normalization.
residual: true # Whether to use residual connection.
num_mlp_layers: 1 # Number of MLP layers to use after message aggregation in each PNA layer.
num_layers: 4 # Number of PNA layers.
readout: mean # Readout for computing graph-level representations, can be 'sum' or 'mean'.
general_pipeline:
num_runs: 10 # Number of experiments to run
train_batch_size: 128 # Graph batch size when training
eval_batch_size: 128 # Graph batch size when evaluating
num_workers: 4 # Number of workers for data loading
optimizer:
name: Adam
lr: 0.01
weight_decay: 0.000003
lr_scheduler:
name: ReduceLROnPlateau
mode: max
factor: 0.5
patience: 20
verbose: true
loss: BCEWithLogitsLoss
metric: roc_auc_score
num_epochs: 200 # Number of training epochs
save_path: model.pth # Path to save the model
version: 0.0.1
pipeline_name: graphpred
device: cuda:0 # Torch device name, e.g., cpu or cuda or cuda:0
data:
name: ogbg-molpcba
split_ratio: # Ratio to generate data split, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset
model:
name: gin
embed_size: 300 # Embedding size.
num_layers: 5 # Number of layers.
dropout: 0.5 # Dropout rate.
virtual_node: true # Whether to use virtual node.
general_pipeline:
num_runs: 10 # Number of experiments to run
train_batch_size: 32 # Graph batch size when training
eval_batch_size: 32 # Graph batch size when evaluating
num_workers: 4 # Number of workers for data loading
optimizer:
name: Adam
lr: 0.001
weight_decay: 0
lr_scheduler:
name: StepLR
step_size: 100
gamma: 1
loss: BCEWithLogitsLoss
metric: average_precision_score
num_epochs: 100 # Number of training epochs
save_path: model.pth # Path to save the model
...@@ -16,7 +16,10 @@ setup(name='dglgo', ...@@ -16,7 +16,10 @@ setup(name='dglgo',
'numpydoc>=1.1.0', 'numpydoc>=1.1.0',
"pydantic>=1.9.0", "pydantic>=1.9.0",
"ruamel.yaml>=0.17.20", "ruamel.yaml>=0.17.20",
"PyYAML>=5.1" "PyYAML>=5.1",
"ogb>=1.3.3",
"rdkit-pypi",
"scikit-learn>=0.20.0"
], ],
package_data={"": ["./*"]}, package_data={"": ["./*"]},
include_package_data=True, include_package_data=True,
......
import dgl
import pytest import pytest
import torch import torch
from dglgo.model import * from dglgo.model import *
...@@ -171,3 +172,69 @@ def test_ele(): ...@@ -171,3 +172,69 @@ def test_ele():
h_src = torch.randn(num_pairs, data_info['in_size']) h_src = torch.randn(num_pairs, data_info['in_size'])
h_dst = torch.randn(num_pairs, data_info['in_size']) h_dst = torch.randn(num_pairs, data_info['in_size'])
model(h_src, h_dst) model(h_src, h_dst)
@pytest.mark.parametrize('virtual_node', [True, False])
def test_ogbg_gin(virtual_node):
    """Smoke-test OGBGGIN forward on both ogbg-mol and generic datasets."""
    batch = dgl.batch([dgl.rand_graph(5, 15), dgl.rand_graph(5, 15)])
    n, e = batch.num_nodes(), batch.num_edges()
    # Test for ogbg-mol datasets
    mol_info = {
        'name': 'ogbg-molhiv',
        'out_size': 1
    }
    mol_model = OGBGGIN(mol_info,
                        embed_size=10,
                        num_layers=2,
                        virtual_node=virtual_node)
    mol_model(batch, torch.zeros(n, 9).long(), torch.zeros(e, 3).long())
    # Test for non-ogbg-mol datasets
    generic_info = {
        'name': 'a_dataset',
        'out_size': 1,
        'node_feat_size': 15,
        'edge_feat_size': 5
    }
    generic_model = OGBGGIN(generic_info,
                            embed_size=10,
                            num_layers=2,
                            virtual_node=virtual_node)
    generic_model(batch,
                  torch.randn(n, generic_info['node_feat_size']),
                  torch.randn(e, generic_info['edge_feat_size']))
def test_pna():
    """Smoke-test PNA forward on both ogbg-mol and generic datasets."""
    g = dgl.rand_graph(5, 15)
    # Test for ogbg-mol datasets
    mol_info = {
        'name': 'ogbg-molhiv',
        'delta': 1,
        'out_size': 1
    }
    mol_model = PNA(mol_info, embed_size=10, num_layers=2)
    mol_model(g, torch.zeros(5, 9).long())
    # Test for non-ogbg-mol datasets
    generic_info = {
        'name': 'a_dataset',
        'node_feat_size': 15,
        'delta': 1,
        'out_size': 1
    }
    generic_model = PNA(generic_info, embed_size=10, num_layers=2)
    generic_model(g, torch.randn(5, generic_info['node_feat_size']))
...@@ -60,8 +60,26 @@ def test_linkpred_default_neg_sampler(data, node_model, edge_model): ...@@ -60,8 +60,26 @@ def test_linkpred_default_neg_sampler(data, node_model, edge_model):
data, node_model, edge_model, custom_config_file)) data, node_model, edge_model, custom_config_file))
assert os.path.exists(custom_config_file) assert os.path.exists(custom_config_file)
@pytest.mark.parametrize('data', ['csv', 'ogbg-molhiv', 'ogbg-molpcba'])
@pytest.mark.parametrize('model', ['gin', 'pna'])
def test_graphpred(data, model):
    """Exercise `dgl configure graphpred` and `dgl export` end to end."""
    os.system('dgl configure graphpred --data {} --model {}'.format(data, model))
    assert os.path.exists('graphpred_{}_{}.yaml'.format(data, model))
    custom_cfg = 'custom_{}_{}.yaml'.format(data, model)
    os.system('dgl configure graphpred --data {} --model {} --cfg {}'.format(
        data, model, custom_cfg))
    assert os.path.exists(custom_cfg)
    script = '{}_{}.py'.format(data, model)
    os.system('dgl export --cfg {} --output {}'.format(custom_cfg, script))
    assert os.path.exists(script)
@pytest.mark.parametrize('recipe', @pytest.mark.parametrize('recipe',
['linkpred_cora_sage.yaml', ['graphpred_hiv_gin.yaml',
'graphpred_hiv_pna.yaml',
'graphpred_pcba_gin.yaml',
'linkpred_cora_sage.yaml',
'linkpred_citation2_sage.yaml', 'linkpred_citation2_sage.yaml',
'linkpred_collab_sage.yaml', 'linkpred_collab_sage.yaml',
'nodepred_citeseer_gat.yaml', 'nodepred_citeseer_gat.yaml',
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment