Unverified Commit 539335ce authored by Jinjing Zhou's avatar Jinjing Zhou Committed by GitHub
Browse files

DGL Enter (#3690)



* add

* fix

* fix

* fix

* fix

* add

* add

* fix

* fix

* fix

* new loader

* fix

* fix

* fix for 3.6

* fix

* add

* add recipes and also some bug fixes

* fix

* fix

* fix

* fix recipes

* allow AsNodeDataset to work on ogb

* add ut

* many fixes for nodepred-ns pipeline

* recipe for nodepred-ns

* Update enter/README.md
Co-authored-by: default avatarZihao Ye <zihaoye.cs@gmail.com>

* fix layers

* fix

* fix

* fix

* fix

* fix multiple issues

* fix for citation2

* fix comment

* fix

* fix

* clean up

* fix
Co-authored-by: default avatarMinjie Wang <wmjlyjemaine@gmail.com>
Co-authored-by: default avatarMinjie Wang <minjie.wang@nyu.edu>
Co-authored-by: default avatarZihao Ye <zihaoye.cs@gmail.com>
parent 80fb4dbe
from pathlib import Path
from jinja2 import Template
import copy
import typer
from pydantic import BaseModel, Field
from typing import Optional
import yaml
from ...utils.factory import PipelineFactory, NodeModelFactory, PipelineBase, DataFactory, EdgeModelFactory, NegativeSamplerFactory
from ...utils.base_model import EarlyStopConfig, DeviceEnum
from ...utils.yaml_dump import deep_convert_dict, merge_comment
import ruamel.yaml
from ruamel.yaml.comments import CommentedMap
class LinkpredPipelineCfg(BaseModel):
    """Default hyper-parameters for the link prediction training pipeline."""
    hidden_size: int = 256          # intermediate size between node model and edge model
    eval_batch_size: int = 32769    # edges scored per batch at evaluation time
    train_batch_size: int = 32769   # edges per mini-batch during training
    num_epochs: int = 200
    eval_period: int = 5            # epochs between validation evaluations
    optimizer: dict = {"name": "Adam", "lr": 0.005}
    loss: str = "BCELoss"           # name of a torch.nn loss class (instantiated in the template)
    num_runs: int = 1               # independent repetitions of the experiment
# Help strings rendered as inline YAML comments next to the generated
# general_pipeline options (consumed by merge_comment in get_cfg_func).
pipeline_comments = {
    "hidden_size": "The intermediate hidden size between node model and edge model",
    "eval_batch_size": "Edge batch size when evaluating",
    "train_batch_size": "Edge batch size when training",
    "num_epochs": "Number of training epochs",
    "eval_period": "Interval epochs between evaluations",
    "num_runs": "Number of experiments to run",
}
@PipelineFactory.register("linkpred")
class LinkpredPipeline(PipelineBase):
    """Link prediction pipeline: generates a commented YAML config and renders
    the corresponding training script from a jinja template."""

    # Populated lazily by setup_user_cfg_cls().
    user_cfg_cls = None
    pipeline_name = "linkpred"

    def __init__(self):
        self.pipeline_name = "linkpred"

    @classmethod
    def setup_user_cfg_cls(cls):
        # Imported here (not at module top) to avoid a circular import.
        from ...utils.enter_config import UserConfig

        class LinkPredUserConfig(UserConfig):
            pipeline_name: str = "linkpred"
            # Discriminated unions: the "name" field selects the concrete sub-config.
            data: DataFactory.filter("linkpred").get_pydantic_config() = Field(..., discriminator="name")
            node_model: NodeModelFactory.get_pydantic_model_config() = Field(...,
                                                                             discriminator="name")
            edge_model: EdgeModelFactory.get_pydantic_model_config() = Field(...,
                                                                             discriminator="name")
            neg_sampler: NegativeSamplerFactory.get_pydantic_model_config() = Field(...,
                                                                                    discriminator="name")
            general_pipeline: LinkpredPipelineCfg = LinkpredPipelineCfg()
        # NOTE(review): this assignment replaces the `user_cfg_cls` property object
        # on the class, so later reads return the config class directly — confirm intended.
        cls.user_cfg_cls = LinkPredUserConfig

    @property
    def user_cfg_cls(self):
        return self.__class__.user_cfg_cls

    def get_cfg_func(self):
        """Return the typer command that writes a default, commented cfg.yml."""
        def config(
            data: DataFactory.filter("linkpred").get_dataset_enum() = typer.Option(..., help="input data name"),
            cfg: str = typer.Option(
                "cfg.yml", help="output configuration path"),
            node_model: NodeModelFactory.get_model_enum() = typer.Option(...,
                                                                         help="Model name"),
            edge_model: EdgeModelFactory.get_model_enum() = typer.Option(...,
                                                                         help="Model name"),
            neg_sampler: NegativeSamplerFactory.get_model_enum() = typer.Option(
                "uniform", help="Negative sampler name"),
            device: DeviceEnum = typer.Option(
                "cpu", help="Device, cpu or cuda"),
        ):
            self.__class__.setup_user_cfg_cls()
            # Minimal user choices; pydantic fills in every remaining default.
            generated_cfg = {
                "pipeline_name": "linkpred",
                "device": device.value,
                "data": {"name": data.name},
                "neg_sampler": {"name": neg_sampler.value},
                "node_model": {"name": node_model.value},
                "edge_model": {"name": edge_model.value},
            }
            output_cfg = self.user_cfg_cls(**generated_cfg).dict()
            output_cfg = deep_convert_dict(output_cfg)
            # Inline YAML comments shown next to each generated option.
            comment_dict = {
                "general_pipeline": pipeline_comments,
                "node_model": NodeModelFactory.get_constructor_doc_dict(node_model.value),
                "edge_model": EdgeModelFactory.get_constructor_doc_dict(edge_model.value),
                "neg_sampler": NegativeSamplerFactory.get_constructor_doc_dict(neg_sampler.value),
                "data": {
                    "split_ratio": 'List of float, e.q. [0.8, 0.1, 0.1]. Split ratios for training, validation and test sets. Must sum to one. Leave blank to use builtin split in original dataset',
                    "neg_ratio": 'Int, e.q. 2. Indicate how much negative samples to be sampled per positive samples. Leave blank to use builtin split in original dataset'
                },
            }
            comment_dict = merge_comment(output_cfg, comment_dict)
            # ruamel (not plain yaml) so the comments survive the dump.
            yaml = ruamel.yaml.YAML()
            yaml.dump(comment_dict, Path(cfg).open("w"))
            print("Configuration file is generated at {}".format(Path(cfg).absolute()))
        return config

    @classmethod
    def gen_script(cls, user_cfg_dict):
        """Render the linkpred training script from the jinja template and the
        (validated) user config dict; returns the script source as a string."""
        cls.setup_user_cfg_cls()
        # Check validation
        user_cfg = cls.user_cfg_cls(**user_cfg_dict)
        file_current_dir = Path(__file__).resolve().parent
        with open(file_current_dir / "linkpred.jinja-py", "r") as f:
            template = Template(f.read())
        render_cfg = copy.deepcopy(user_cfg_dict)
        render_cfg["node_model_code"] = NodeModelFactory.get_source_code(
            user_cfg_dict["node_model"]["name"])
        render_cfg["edge_model_code"] = EdgeModelFactory.get_source_code(
            user_cfg_dict["edge_model"]["name"])
        render_cfg["node_model_class_name"] = NodeModelFactory.get_model_class_name(
            user_cfg_dict["node_model"]["name"])
        render_cfg["edge_model_class_name"] = EdgeModelFactory.get_model_class_name(
            user_cfg_dict["edge_model"]["name"])
        render_cfg["neg_sampler_name"] = NegativeSamplerFactory.get_model_class_name(
            user_cfg_dict["neg_sampler"]["name"])
        render_cfg["loss"] = user_cfg_dict["general_pipeline"]["loss"]
        # update import and initialization code
        render_cfg.update(DataFactory.get_generated_code_dict(user_cfg_dict["data"]["name"], '**cfg["data"]'))
        # The copy embedded into the generated script: strip keys that are
        # already baked into the rendered code (names, loss, pipeline_name).
        generated_user_cfg = copy.deepcopy(user_cfg_dict)
        if len(generated_user_cfg["data"]) == 1:
            generated_user_cfg.pop("data")
        else:
            generated_user_cfg["data"].pop("name")
        generated_user_cfg.pop("pipeline_name")
        generated_user_cfg["node_model"].pop("name")
        generated_user_cfg["edge_model"].pop("name")
        generated_user_cfg["neg_sampler"].pop("name")
        generated_user_cfg["general_pipeline"]["optimizer"].pop("name")
        generated_user_cfg["general_pipeline"].pop("loss")
        # NOTE(review): generated_train_cfg is built but never used below — confirm leftover.
        generated_train_cfg = copy.deepcopy(user_cfg_dict["general_pipeline"])
        generated_train_cfg["optimizer"].pop("name")
        if user_cfg_dict["data"].get("split_ratio", None) is not None:
            assert user_cfg_dict["data"].get("neg_ratio", None) is not None, "Please specify both split_ratio and neg_ratio"
            render_cfg["data_initialize_code"] = "{}, split_ratio={}, neg_ratio={}".format(render_cfg["data_initialize_code"], user_cfg_dict["data"]["split_ratio"], user_cfg_dict["data"]["neg_ratio"])
            generated_user_cfg["data"].pop("split_ratio")
            generated_user_cfg["data"].pop("neg_ratio")
        render_cfg["user_cfg_str"] = f"cfg = {str(generated_user_cfg)}"
        render_cfg["user_cfg"] = user_cfg_dict
        return template.render(**render_cfg)

    @staticmethod
    def get_description() -> str:
        return "Link prediction pipeline"
import dgl
from dgl.data import AsLinkPredDataset
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import numpy as np
{{ data_import_code }}
{{ edge_model_code }}
{{ node_model_code}}
class Model(nn.Module):
    """Combines a node encoder and an edge scorer for link prediction.

    ``neg_sampler`` is stored for use by the training loop; ``inference``
    scores a given set of edges in mini-batches of ``eval_batch_size``.
    """

    def __init__(self, node_model, edge_model, neg_sampler, eval_batch_size):
        super().__init__()
        self.node_model = node_model
        self.edge_model = edge_model
        self.neg_sampler = neg_sampler
        self.eval_batch_size = eval_batch_size

    def inference(self, g, x, edges):
        """Return a single tensor of scores for every (src, dst) pair in ``edges``."""
        src, dst = edges
        node_repr = self.node_model(g, x)
        # Batch over edge indices so very large edge sets fit in memory.
        batches = DataLoader(range(src.shape[-1]),
                             batch_size=self.eval_batch_size)
        scores = [
            self.edge_model(node_repr[src[batch]], node_repr[dst[batch]])
            for batch in batches
        ]
        return torch.cat(scores, dim=0)
def calc_hitsk(y_pred_pos, y_pred_neg, k):
    """Hits@k: fraction of positive scores strictly above the k-th highest negative score."""
    threshold = torch.topk(y_pred_neg.flatten(), k).values[-1]
    return (y_pred_pos > threshold).float().mean().item()
def train(cfg, pipeline_cfg, device, dataset, model, optimizer, loss_fcn):
    """Full-graph link prediction training loop; returns the final test Hits@50."""
    train_g = dataset.train_graph
    train_g = train_g.to(device)
    node_feat = train_g.ndata['feat']
    train_src, train_dst = train_g.edges()
    for epoch in range(pipeline_cfg['num_epochs']):
        model.train()
        # Mini-batch over edge ids; node representations are recomputed per batch.
        eid_dataloader = DataLoader(range(train_g.num_edges()), batch_size = pipeline_cfg["train_batch_size"], shuffle=True)
        for eids in eid_dataloader:
            h = model.node_model(train_g, node_feat)
            eids = eids.to(device)
            src, dst = train_src[eids], train_dst[eids]
            pos_score = model.edge_model(h[src], h[dst])
            neg_src, neg_dst = model.neg_sampler(train_g, eids)
            neg_score = model.edge_model(h[neg_src], h[neg_dst])
            # Binary classification: positive edges vs sampled negatives.
            loss = loss_fcn(torch.cat([pos_score, neg_score]), torch.cat(
                [torch.ones_like(pos_score), torch.zeros_like(neg_score)]))
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
        with torch.no_grad():
            model.eval()
            # NOTE(review): "train" Hits@50 mixes the last batch's positive scores
            # with *validation* negative scores — confirm this is intended.
            val_neg_edges = dataset.val_edges[1]
            val_neg_score = model.inference(train_g, node_feat, val_neg_edges)
            train_hits = calc_hitsk(pos_score, val_neg_score, k=50)
            print("Epoch {:05d} | Loss {:.4f} | Train Hits@50 {:.4f}".format(epoch, loss, train_hits))
        if epoch != 0 and epoch % pipeline_cfg['eval_period'] == 0:
            with torch.no_grad():
                model.eval()
                val_pos_edge, val_neg_edges = dataset.val_edges
                pos_result = model.inference(train_g, node_feat, val_pos_edge)
                neg_result = model.inference(train_g, node_feat, val_neg_edges)
                val_hits = calc_hitsk(pos_result, neg_result, k=50)
                print("Epoch {:05d} | Val Hits@50 {:.4f}".format(epoch, val_hits))
    # Final evaluation on the test split.
    with torch.no_grad():
        model.eval()
        test_pos_edge, test_neg_edges = dataset.test_edges
        pos_result = model.inference(train_g, node_feat, test_pos_edge)
        neg_result = model.inference(train_g, node_feat, test_neg_edges)
        test_hits = calc_hitsk(pos_result, neg_result, k=50)
        print("Test Hits@50 {:.4f}".format(test_hits))
    return test_hits
def main():
    # Rendered by the generator into `cfg = {...}` with the user's config baked in.
    {{user_cfg_str}}
    device = cfg['device']
    pipeline_cfg = cfg['general_pipeline']
    dataset = AsLinkPredDataset({{ data_initialize_code }})
    # Graphs without input features require a learnable node embedding.
    if 'feat' not in dataset[0].ndata:
        assert cfg["node_model"]["embed_size"] > 0, "Need to specify embed size if graph doesn't have feat in ndata"
    cfg["node_model"]["data_info"] = {
        "in_size": cfg["node_model"]['embed_size'] if cfg["node_model"]['embed_size'] > 0 else dataset[0].ndata['feat'].shape[1],
        "out_size": pipeline_cfg['hidden_size'],
        "num_nodes": dataset[0].num_nodes()
    }
    cfg["edge_model"]["data_info"] = {
        "in_size": pipeline_cfg['hidden_size'],
        "out_size": 1 # output each edge score
    }
    node_model = {{node_model_class_name}}(**cfg["node_model"])
    edge_model = {{edge_model_class_name}}(**cfg["edge_model"])
    neg_sampler = dgl.dataloading.negative_sampler.{{ neg_sampler_name }}(**cfg["neg_sampler"])
    model = Model(node_model, edge_model, neg_sampler, pipeline_cfg["eval_batch_size"])
    model = model.to(device)
    params = model.parameters()
    loss = torch.nn.{{ loss }}()
    optimizer = torch.optim.Adam(params, **pipeline_cfg["optimizer"])
    test_hits = train(cfg, pipeline_cfg, device, dataset, model, optimizer, loss)
    return test_hits
if __name__ == '__main__':
    all_acc = []
    num_runs = {{ user_cfg.general_pipeline.num_runs }}
    for run in range(num_runs):
        print(f'Run experiment #{run}')
        test_acc = main()
        print("Test Hits@50 {:.4f}".format(test_acc))
        all_acc.append(test_acc)
    # Mean/std aggregated across independent runs.
    avg_acc = np.round(np.mean(all_acc), 6)
    std_acc = np.round(np.std(all_acc), 6)
    print(f'Test Hits@50 across {num_runs} runs: {avg_acc} ± {std_acc}')
from .gen import *
\ No newline at end of file
from pathlib import Path
from jinja2 import Template
import copy
import typer
from pydantic import BaseModel, Field
from typing import Optional
import yaml
from ...utils.factory import PipelineFactory, NodeModelFactory, PipelineBase, DataFactory
from ...utils.base_model import EarlyStopConfig, DeviceEnum
from ...utils.yaml_dump import deep_convert_dict, merge_comment
import ruamel.yaml
from ruamel.yaml.comments import CommentedMap
# Help strings rendered as inline YAML comments for general_pipeline options.
pipeline_comments = {
    "num_epochs": "Number of training epochs",
    "eval_period": "Interval epochs between evaluations",
    "early_stop": {
        "patience": "Steps before early stop",
        "checkpoint_path": "Early stop checkpoint model file path"
    },
    "num_runs": "Number of experiments to run",
}
class NodepredPipelineCfg(BaseModel):
    """Default hyper-parameters for the full-graph node classification pipeline."""
    early_stop: Optional[EarlyStopConfig] = EarlyStopConfig()  # None disables early stopping
    num_epochs: int = 200
    eval_period: int = 5  # epochs between validation evaluations
    optimizer: dict = {"name": "Adam", "lr": 0.01, "weight_decay": 5e-4}
    loss: str = "CrossEntropyLoss"  # name of a torch.nn loss class
    num_runs: int = 1  # independent repetitions of the experiment
@PipelineFactory.register("nodepred")
class NodepredPipeline(PipelineBase):
    """Full-graph node classification pipeline: config generation + script rendering."""

    # Populated lazily by setup_user_cfg_cls().
    user_cfg_cls = None

    def __init__(self):
        self.pipeline_name = "nodepred"

    @classmethod
    def setup_user_cfg_cls(cls):
        # Imported here (not at module top) to avoid a circular import.
        from ...utils.enter_config import UserConfig

        class NodePredUserConfig(UserConfig):
            # Discriminated unions: the "name" field selects the concrete sub-config.
            data: DataFactory.filter("nodepred").get_pydantic_config() = Field(..., discriminator="name")
            model : NodeModelFactory.get_pydantic_model_config() = Field(..., discriminator="name")
            general_pipeline: NodepredPipelineCfg = NodepredPipelineCfg()
        cls.user_cfg_cls = NodePredUserConfig

    @property
    def user_cfg_cls(self):
        return self.__class__.user_cfg_cls

    def get_cfg_func(self):
        """Return the typer command that writes a default, commented cfg.yml."""
        def config(
            data: DataFactory.filter("nodepred").get_dataset_enum() = typer.Option(..., help="input data name"),
            cfg: str = typer.Option(
                "cfg.yml", help="output configuration path"),
            model: NodeModelFactory.get_model_enum() = typer.Option(..., help="Model name"),
            device: DeviceEnum = typer.Option("cpu", help="Device, cpu or cuda"),
        ):
            self.__class__.setup_user_cfg_cls()
            # NOTE(review): linkpred passes device.value here but this passes the enum;
            # DGLBaseModel's use_enum_values makes both serialize identically — confirm.
            generated_cfg = {
                "pipeline_name": self.pipeline_name,
                "device": device,
                "data": {"name": data.name},
                "model": {"name": model.value},
                "general_pipeline": {}
            }
            output_cfg = self.user_cfg_cls(**generated_cfg).dict()
            output_cfg = deep_convert_dict(output_cfg)
            # Inline YAML comments shown next to each generated option.
            comment_dict = {
                "data": {
                    "split_ratio": 'Ratio to generate split masks, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset'
                },
                "general_pipeline": pipeline_comments,
                "model": NodeModelFactory.get_constructor_doc_dict(model.value)
            }
            comment_dict = merge_comment(output_cfg, comment_dict)
            # ruamel (not plain yaml) so the comments survive the dump.
            yaml = ruamel.yaml.YAML()
            yaml.dump(comment_dict, Path(cfg).open("w"))
            print("Configuration file is generated at {}".format(Path(cfg).absolute()))
        return config

    @classmethod
    def gen_script(cls, user_cfg_dict):
        """Render the nodepred training script; returns the script source as a string."""
        # Check validation
        cls.setup_user_cfg_cls()
        user_cfg = cls.user_cfg_cls(**user_cfg_dict)
        file_current_dir = Path(__file__).resolve().parent
        with open(file_current_dir / "nodepred.jinja-py", "r") as f:
            template = Template(f.read())
        render_cfg = copy.deepcopy(user_cfg_dict)
        model_code = NodeModelFactory.get_source_code(
            user_cfg_dict["model"]["name"])
        render_cfg["model_code"] = model_code
        render_cfg["model_class_name"] = NodeModelFactory.get_model_class_name(
            user_cfg_dict["model"]["name"])
        render_cfg.update(DataFactory.get_generated_code_dict(user_cfg_dict["data"]["name"], '**cfg["data"]'))
        # The copy embedded into the generated script: strip keys already baked in.
        generated_user_cfg = copy.deepcopy(user_cfg_dict)
        if len(generated_user_cfg["data"]) == 1:
            generated_user_cfg.pop("data")
        else:
            generated_user_cfg["data"].pop("name")
        generated_user_cfg.pop("pipeline_name")
        generated_user_cfg["model"].pop("name")
        generated_user_cfg["general_pipeline"]["optimizer"].pop("name")
        # NOTE(review): generated_train_cfg is built but never used below — confirm leftover.
        generated_train_cfg = copy.deepcopy(user_cfg_dict["general_pipeline"])
        generated_train_cfg["optimizer"].pop("name")
        if user_cfg_dict["data"].get("split_ratio", None) is not None:
            render_cfg["data_initialize_code"] = "{}, split_ratio={}".format(render_cfg["data_initialize_code"], user_cfg_dict["data"]["split_ratio"])
            if "split_ratio" in generated_user_cfg["data"]:
                generated_user_cfg["data"].pop("split_ratio")
        render_cfg["user_cfg_str"] = f"cfg = {str(generated_user_cfg)}"
        render_cfg["user_cfg"] = user_cfg_dict
        return template.render(**render_cfg)

    @staticmethod
    def get_description() -> str:
        return "Node classification pipeline"
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import dgl
from dgl.data import AsNodePredDataset
{{ data_import_code }}
{{ model_code }}
{% if user_cfg.general_pipeline.early_stop %}
class EarlyStopping:
    """Stop training after `patience` consecutive evaluations without
    improvement; the best weights are checkpointed to `checkpoint_path`."""
    def __init__(self,
                 patience: int = -1,
                 checkpoint_path: str = 'checkpoint.pt'):
        self.patience = patience
        self.checkpoint_path = checkpoint_path
        self.counter = 0  # evaluations since the last improvement
        self.best_score = None
        self.early_stop = False
    def step(self, acc, model):
        """Record one metric value; return True once training should stop."""
        score = acc
        if self.best_score is None:
            # First call: establish the baseline and checkpoint it.
            self.best_score = score
            self.save_checkpoint(model)
        elif score < self.best_score:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            # NOTE: with the default patience=-1 the first regression stops training.
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(model)
            self.counter = 0
        return self.early_stop
    def save_checkpoint(self, model):
        '''Save model when validation loss decreases.'''
        torch.save(model.state_dict(), self.checkpoint_path)
    def load_checkpoint(self, model):
        # Restore the most recently checkpointed (best) weights.
        model.load_state_dict(torch.load(self.checkpoint_path))
{% endif %}
def accuracy(logits, labels):
    """Fraction of rows whose predicted class (row-wise max of ``logits``) matches ``labels``."""
    _, predicted = torch.max(logits, dim=1)
    n_correct = torch.sum(predicted == labels).item()
    return n_correct * 1.0 / len(labels)
def train(cfg, pipeline_cfg, device, data, model, optimizer, loss_fcn):
    """Full-graph node classification training loop; returns test accuracy."""
    g = data[0] # Only train on the first graph
    # Normalize self-loops so every node has exactly one.
    g = dgl.remove_self_loop(g)
    g = dgl.add_self_loop(g)
    g = g.to(device)
    node_feat = g.ndata.get('feat', None)
    edge_feat = g.edata.get('feat', None)
    label = g.ndata['label']
    train_mask, val_mask, test_mask = g.ndata['train_mask'].bool(), g.ndata['val_mask'].bool(), g.ndata['test_mask'].bool()
    {% if user_cfg.general_pipeline.early_stop %}
    stopper = EarlyStopping(**pipeline_cfg['early_stop'])
    {% endif %}
    val_acc = 0.
    for epoch in range(pipeline_cfg['num_epochs']):
        model.train()
        # Full-graph forward pass every epoch.
        logits = model(g, node_feat, edge_feat)
        loss = loss_fcn(logits[train_mask], label[train_mask])
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_acc = accuracy(logits[train_mask], label[train_mask])
        if epoch != 0 and epoch % pipeline_cfg['eval_period'] == 0:
            val_acc = accuracy(logits[val_mask], label[val_mask])
            {% if user_cfg.general_pipeline.early_stop %}
            if stopper.step(val_acc, model):
                break
            {% endif %}
        print("Epoch {:05d} | Loss {:.4f} | TrainAcc {:.4f} | ValAcc {:.4f}".
              format(epoch, loss.item(), train_acc, val_acc))
    {% if user_cfg.general_pipeline.early_stop %}
    # Restore the best checkpointed weights before the final test evaluation.
    stopper.load_checkpoint(model)
    {% endif %}
    model.eval()
    with torch.no_grad():
        logits = model(g, node_feat, edge_feat)
        test_acc = accuracy(logits[test_mask], label[test_mask])
    return test_acc
def main():
    # Rendered by the generator into `cfg = {...}` with the user's config baked in.
    {{ user_cfg_str }}
    device = cfg['device']
    pipeline_cfg = cfg['general_pipeline']
    # load data
    data = AsNodePredDataset({{data_initialize_code}})
    # create model
    model_cfg = cfg["model"]
    # embed_size > 0 switches the model to a learnable node embedding instead of 'feat'.
    cfg["model"]["data_info"] = {
        "in_size": model_cfg['embed_size'] if model_cfg['embed_size'] > 0 else data[0].ndata['feat'].shape[1],
        "out_size": data.num_classes,
        "num_nodes": data[0].num_nodes()
    }
    model = {{ model_class_name }}(**cfg["model"])
    model = model.to(device)
    loss = torch.nn.{{ user_cfg.general_pipeline.loss }}()
    optimizer = torch.optim.{{ user_cfg.general_pipeline.optimizer.name }}(model.parameters(), **pipeline_cfg["optimizer"])
    # train
    test_acc = train(cfg, pipeline_cfg, device, data, model, optimizer, loss)
    return test_acc
if __name__ == '__main__':
    all_acc = []
    num_runs = {{ user_cfg.general_pipeline.num_runs }}
    for run in range(num_runs):
        print(f'Run experiment #{run}')
        test_acc = main()
        print("Test Accuracy {:.4f}".format(test_acc))
        all_acc.append(test_acc)
    # Mean/std aggregated across independent runs.
    avg_acc = np.round(np.mean(all_acc), 6)
    std_acc = np.round(np.std(all_acc), 6)
    print(f'Accuracy across {num_runs} runs: {avg_acc} ± {std_acc}')
from .gen import *
\ No newline at end of file
from enum import Enum
from pathlib import Path
from typing import Optional, List, Union
from typing_extensions import Literal
from jinja2 import Template, ext
from pydantic import BaseModel, Field
import copy
import yaml
import typer
from ...utils.factory import PipelineFactory, NodeModelFactory, PipelineBase, DataFactory
from ...utils.base_model import extract_name, EarlyStopConfig, DeviceEnum
from ...utils.yaml_dump import deep_convert_dict, merge_comment
import ruamel.yaml
from ruamel.yaml.comments import CommentedMap
class SamplerConfig(BaseModel):
    """Neighbor-sampling options for the nodepred-ns pipeline."""
    name: Literal["neighbor"]     # discriminator value for the sampler union
    fan_out: List[int] = [5, 10]  # neighbors sampled per GNN layer
    batch_size: int = Field(64, description="Batch size")
    num_workers: int = 4          # dataloader workers during training
    eval_batch_size: int = 1024
    eval_num_workers: int = 4
    class Config:
        # Reject unknown keys in the YAML instead of silently accepting them.
        extra = 'forbid'
# Help strings rendered as inline YAML comments for general_pipeline options.
pipeline_comments = {
    "num_epochs": "Number of training epochs",
    "eval_period": "Interval epochs between evaluations",
    "early_stop": {
        "patience": "Steps before early stop",
        "checkpoint_path": "Early stop checkpoint model file path"
    },
    "num_runs": "Number of experiments to run",
}
class NodepredNSPipelineCfg(BaseModel):
    """Default hyper-parameters for the sampling-based node classification pipeline."""
    # Bug fix: the default must be a SamplerConfig instance, not the bare string
    # "neighbor" — pydantic v1 does not validate defaults, so the previous
    # `Field("neighbor")` silently leaked a str into code expecting a mapping.
    sampler: SamplerConfig = SamplerConfig(name="neighbor")
    early_stop: Optional[EarlyStopConfig] = EarlyStopConfig()  # None disables early stopping
    num_epochs: int = 200
    eval_period: int = 5  # epochs between validation evaluations
    optimizer: dict = {"name": "Adam", "lr": 0.005, "weight_decay": 0.0}
    loss: str = "CrossEntropyLoss"  # name of a torch.nn loss class
    num_runs: int = 1  # independent repetitions of the experiment
@PipelineFactory.register("nodepred-ns")
class NodepredNsPipeline(PipelineBase):
    """Node classification with neighbor sampling: generates a commented YAML
    config and renders the training script from the nodepred-ns jinja template."""

    def __init__(self):
        self.pipeline_name = "nodepred-ns"
        self.default_cfg = None

    @classmethod
    def setup_user_cfg_cls(cls):
        # Imported here (not at module top) to avoid a circular import.
        from ...utils.enter_config import UserConfig

        class NodePredUserConfig(UserConfig):
            # Evaluation runs full-graph and may need a different (larger-memory) device.
            eval_device: DeviceEnum = Field("cpu")
            # Discriminated unions: the "name" field selects the concrete sub-config.
            data: DataFactory.filter("nodepred-ns").get_pydantic_config() = Field(..., discriminator="name")
            model : NodeModelFactory.get_pydantic_model_config() = Field(..., discriminator="name")
            general_pipeline: NodepredNSPipelineCfg
        cls.user_cfg_cls = NodePredUserConfig

    @property
    def user_cfg_cls(self):
        return self.__class__.user_cfg_cls

    def get_cfg_func(self):
        """Return the typer command that writes a default, commented cfg.yml."""
        def config(
            data: DataFactory.filter("nodepred-ns").get_dataset_enum() = typer.Option(..., help="input data name"),
            cfg: str = typer.Option(
                "cfg.yml", help="output configuration path"),
            model: NodeModelFactory.get_model_enum() = typer.Option(..., help="Model name"),
            device: DeviceEnum = typer.Option(
                "cpu", help="Device, cpu or cuda"),
        ):
            self.__class__.setup_user_cfg_cls()
            # Minimal user choices; pydantic fills in every remaining default.
            generated_cfg = {
                "pipeline_name": "nodepred-ns",
                "device": device,
                "data": {"name": data.name},
                "model": {"name": model.value},
                "general_pipeline": {"sampler": {"name": "neighbor"}}
            }
            output_cfg = self.user_cfg_cls(**generated_cfg).dict()
            output_cfg = deep_convert_dict(output_cfg)
            # Inline YAML comments shown next to each generated option.
            comment_dict = {
                "data": {
                    "split_ratio": 'Ratio to generate split masks, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset'
                },
                "general_pipeline": pipeline_comments,
                "model": NodeModelFactory.get_constructor_doc_dict(model.value)
            }
            comment_dict = merge_comment(output_cfg, comment_dict)
            # ruamel (not plain yaml) so the comments survive the dump.
            yaml = ruamel.yaml.YAML()
            yaml.dump(comment_dict, Path(cfg).open("w"))
            print("Configuration file is generated at {}".format(
                Path(cfg).absolute()))
        return config

    @staticmethod
    def gen_script(user_cfg_dict):
        """Render the nodepred-ns training script; returns the script source as a string."""
        file_current_dir = Path(__file__).resolve().parent
        template_filename = file_current_dir / "nodepred-ns.jinja-py"
        with open(template_filename, "r") as f:
            template = Template(f.read())
        # Instantiation validates general_pipeline; the object itself is unused.
        pipeline_cfg = NodepredNSPipelineCfg(
            **user_cfg_dict["general_pipeline"])
        render_cfg = copy.deepcopy(user_cfg_dict)
        model_code = NodeModelFactory.get_source_code(
            user_cfg_dict["model"]["name"])
        render_cfg["model_code"] = model_code
        render_cfg["model_class_name"] = NodeModelFactory.get_model_class_name(
            user_cfg_dict["model"]["name"])
        render_cfg.update(DataFactory.get_generated_code_dict(
            user_cfg_dict["data"]["name"], '**cfg["data"]'))
        # The copy embedded into the generated script: strip keys already baked in.
        generated_user_cfg = copy.deepcopy(user_cfg_dict)
        if len(generated_user_cfg["data"]) == 1:
            generated_user_cfg.pop("data")
        else:
            generated_user_cfg["data"].pop("name")
        generated_user_cfg.pop("pipeline_name")
        generated_user_cfg["model"].pop("name")
        generated_user_cfg['general_pipeline']["optimizer"].pop("name")
        if user_cfg_dict["data"].get("split_ratio", None) is not None:
            render_cfg["data_initialize_code"] = "{}, split_ratio={}".format(render_cfg["data_initialize_code"], user_cfg_dict["data"]["split_ratio"])
            if "split_ratio" in generated_user_cfg["data"]:
                generated_user_cfg["data"].pop("split_ratio")
        render_cfg["user_cfg_str"] = f"cfg = {str(generated_user_cfg)}"
        render_cfg["user_cfg"] = user_cfg_dict
        # Bug fix: removed a stray `with open("output.py", "w")` that truncated
        # ./output.py as a side effect without ever writing to it.
        return template.render(**render_cfg)

    @staticmethod
    def get_description() -> str:
        return "Node classification sampling pipeline"
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import dgl
from dgl.data import AsNodePredDataset
{{ data_import_code }}
{{ model_code }}
{% if user_cfg.early_stop %}
class EarlyStopping:
    """Stop training after `patience` consecutive evaluations without
    improvement; the best weights are checkpointed to `checkpoint_path`."""
    def __init__(self,
                 patience: int = -1,
                 checkpoint_path: str = 'checkpoint.pt'):
        self.patience = patience
        self.checkpoint_path = checkpoint_path
        self.counter = 0  # evaluations since the last improvement
        self.best_score = None
        self.early_stop = False
    def step(self, acc, model):
        """Record one metric value; return True once training should stop."""
        score = acc
        if self.best_score is None:
            # First call: establish the baseline and checkpoint it.
            self.best_score = score
            self.save_checkpoint(model)
        elif score < self.best_score:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(model)
            self.counter = 0
        return self.early_stop
    def save_checkpoint(self, model):
        '''Save model when validation loss decreases.'''
        torch.save(model.state_dict(), self.checkpoint_path)
    def load_checkpoint(self, model):
        # Restore the most recently checkpointed (best) weights.
        model.load_state_dict(torch.load(self.checkpoint_path))
{% endif %}
def load_subtensor(nfeat, labels, seeds, input_nodes, device):
    """
    Gather the input features (for ``input_nodes``) and output labels
    (for ``seeds``) of one mini-batch, moved to ``device``.
    """
    return nfeat[input_nodes].to(device), labels[seeds].to(device)
def evaluate(model, g, nfeat, labels, val_nid, eval_device):
    """
    Evaluate the model on the validation set specified by ``val_nid``.
    g : The entire graph.
    inputs : The features of all the nodes.
    labels : The labels of all the nodes.
    val_nid : the node Ids for validation.
    device : The GPU device to evaluate on.
    """
    model.eval()
    # NOTE(review): nn.Module.to() moves `model` itself to eval_device (it is
    # in-place for modules); the training loop moves it back to `device` each
    # epoch — confirm this round-trip is intended.
    eval_model = model.to(eval_device)
    g = g.to(eval_device)
    nfeat = nfeat.to(eval_device)
    with torch.no_grad():
        # Full-graph forward pass (no neighbor sampling) for evaluation.
        y = eval_model(g, nfeat)
    model.train()
    return accuracy(y[val_nid], labels[val_nid].to(y.device))
def accuracy(logits, labels):
    """Return the fraction of rows whose highest-scoring class equals the label."""
    _, pred_classes = torch.max(logits, dim=1)
    hits = torch.sum(pred_classes == labels)
    return hits.item() * 1.0 / len(labels)
def train(cfg, pipeline_cfg, device, data, model, optimizer, loss_fcn):
    """Neighbor-sampling node classification training loop; returns test accuracy."""
    g = data[0] # Only train on the first graph
    # Normalize self-loops so every node has exactly one.
    g = dgl.remove_self_loop(g)
    g = dgl.add_self_loop(g)
    # The same graph/features/labels serve all three splits.
    train_g = val_g = test_g = g
    train_nfeat = val_nfeat = test_nfeat = train_g.ndata['feat']
    train_labels = val_labels = test_labels = train_g.ndata['label']
    train_nid = torch.nonzero(train_g.ndata['train_mask'], as_tuple=True)[0]
    val_nid = torch.nonzero(val_g.ndata['val_mask'], as_tuple=True)[0]
    # NOTE(review): test ids are "neither train nor val" rather than test_mask — confirm intended.
    test_nid = torch.nonzero(~(test_g.ndata['train_mask'] | test_g.ndata['val_mask']), as_tuple=True)[0]
    sampler = dgl.dataloading.MultiLayerNeighborSampler(
        [int(fanout) for fanout in pipeline_cfg["sampler"]["fan_out"]])
    dataloader = dgl.dataloading.NodeDataLoader(
        train_g,
        train_nid,
        sampler,
        device=device,
        batch_size=pipeline_cfg["sampler"]["batch_size"],
        shuffle=True,
        drop_last=False,
        num_workers=pipeline_cfg["sampler"]["num_workers"])
    {% if user_cfg.early_stop %}
    # NOTE(review): this guard reads user_cfg.early_stop, but early_stop lives
    # under general_pipeline in the config classes (the nodepred template checks
    # user_cfg.general_pipeline.early_stop), and 'patience'/'checkpoint_path'
    # sit inside pipeline_cfg['early_stop'], not at the top level — verify both.
    stopper = EarlyStopping(pipeline_cfg['patience'], pipeline_cfg['checkpoint_path'])
    {% endif %}
    val_acc = 0.
    for epoch in range(pipeline_cfg['num_epochs']):
        model.train()
        # Move back to the training device (evaluate() may have relocated the model).
        model = model.to(device)
        for step, (input_nodes, seeds, subgs) in enumerate(dataloader):
            # Load the input features as well as output labels
            batch_inputs, batch_labels = load_subtensor(train_nfeat, train_labels,
                                                        seeds, input_nodes, device)
            subgs = [subg.int().to(device) for subg in subgs]
            batch_pred = model.forward_block(subgs, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_acc = accuracy(batch_pred, batch_labels)
            print("Epoch {:05d} | Step {:05d} | Loss {:.4f} | TrainAcc {:.4f}".
                  format(epoch, step, loss.item(), train_acc))
        if epoch % pipeline_cfg["eval_period"] == 0 and epoch != 0:
            val_acc = evaluate(model, val_g, val_nfeat, val_labels, val_nid, cfg["eval_device"])
            print('Eval Acc {:.4f}'.format(val_acc))
            {% if user_cfg.early_stop %}
            if stopper.step(val_acc, model):
                break
            {% endif %}
    {% if user_cfg.early_stop %}
    # Restore the best checkpointed weights before the final test evaluation.
    stopper.load_checkpoint(model)
    {% endif %}
    model.eval()
    with torch.no_grad():
        test_acc = evaluate(model, test_g, test_nfeat, test_labels, test_nid, cfg["eval_device"])
    return test_acc
def main():
    # Rendered by the generator into `cfg = {...}` with the user's config baked in.
    {{ user_cfg_str }}
    device = cfg['device']
    pipeline_cfg = cfg["general_pipeline"]
    # load data
    data = AsNodePredDataset({{data_initialize_code}})
    # create model
    model_cfg = cfg["model"]
    # embed_size > 0 switches the model to a learnable node embedding instead of 'feat'.
    cfg["model"]["data_info"] = {
        "in_size": model_cfg['embed_size'] if model_cfg['embed_size'] > 0 else data[0].ndata['feat'].shape[1],
        "out_size": data.num_classes,
        "num_nodes": data[0].num_nodes()
    }
    model = {{ model_class_name }}(**cfg["model"])
    model = model.to(device)
    loss = torch.nn.{{ user_cfg.general_pipeline.loss }}()
    optimizer = torch.optim.{{ user_cfg.general_pipeline.optimizer.name }}(model.parameters(), **pipeline_cfg["optimizer"])
    # train
    test_acc = train(cfg, pipeline_cfg, device, data, model, optimizer, loss)
    return test_acc
if __name__ == '__main__':
    all_acc = []
    num_runs = {{ user_cfg.general_pipeline.num_runs }}
    for run in range(num_runs):
        print(f'Run experiment #{run}')
        test_acc = main()
        print("Test Accuracy {:.4f}".format(test_acc))
        all_acc.append(test_acc)
    # Mean/std aggregated across independent runs.
    avg_acc = np.round(np.mean(all_acc), 6)
    std_acc = np.round(np.std(all_acc), 6)
    print(f'Accuracy across {num_runs} runs: {avg_acc} ± {std_acc}')
from .factory import *
\ No newline at end of file
import enum
from typing import Optional
from jinja2 import Template
from enum import Enum, IntEnum
import copy
from pydantic import create_model, BaseModel as PydanticBaseModel, Field, create_model
class DeviceEnum(str, Enum):
    """Accepted device strings for pipeline configurations."""
    cpu = "cpu"
    cuda = "cuda"
class DGLBaseModel(PydanticBaseModel):
    """Base pydantic model for all DGL-Enter configuration classes."""
    class Config:
        # Keep unknown keys instead of rejecting them, and serialize enum
        # fields by their value.
        extra = "allow"
        use_enum_values = True
    @classmethod
    def with_fields(cls, model_name, **field_definitions):
        # Dynamically derive a new model class with extra fields.
        return create_model(model_name, __base__=cls, **field_definitions)
def get_literal_value(type_):
    """Return the single literal value carried by a ``Literal`` type.

    Handles both the legacy typing_extensions representation (``__values__``)
    and the standard ``typing.Literal`` one (``__args__``).

    Raises:
        TypeError: if ``type_`` is neither representation (previously this
            path crashed with an obscure ``NameError`` on the unbound ``name``).
    """
    if hasattr(type_, "__values__"):
        return type_.__values__[0]
    if hasattr(type_, "__args__"):
        return type_.__args__[0]
    raise TypeError(f"{type_!r} does not look like a Literal type")
def extract_name(union_type):
    """Build a ``Choice`` enum from the literal ``name`` field of every member
    of a pydantic discriminated union."""
    values = [
        get_literal_value(member.__fields__['name'].type_)
        for member in union_type.__args__
    ]
    return enum.Enum("Choice", {value: value for value in values})
class EarlyStopConfig(DGLBaseModel):
    """Early-stopping options shared by the pipelines."""
    patience: int = 20                       # evaluations without improvement before stopping
    checkpoint_path: str = "checkpoint.pth"  # where the best weights are saved
import torch
class EarlyStopping:
    """Stop training when the tracked metric fails to improve for ``patience``
    consecutive steps; the best weights are checkpointed to ``checkpoint_path``
    and can be restored with :meth:`load_checkpoint`."""

    def __init__(self,
                 patience: int = -1,
                 checkpoint_path: str = 'checkpoint.pt'):
        self.patience = patience
        self.checkpoint_path = checkpoint_path
        self.counter = 0          # steps since the last improvement
        self.best_score = None    # best metric value seen so far
        self.early_stop = False

    def step(self, acc, model):
        """Record one metric value; return True once training should stop."""
        if self.best_score is None:
            # First observation: establish the baseline and checkpoint it.
            self.best_score = acc
            self.save_checkpoint(model)
            return self.early_stop
        if acc < self.best_score:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = acc
            self.save_checkpoint(model)
            self.counter = 0
        return self.early_stop

    def save_checkpoint(self, model):
        '''Snapshot the current best model weights.'''
        torch.save(model.state_dict(), self.checkpoint_path)

    def load_checkpoint(self, model):
        '''Restore the most recently checkpointed (best) weights.'''
        model.load_state_dict(torch.load(self.checkpoint_path))
\ No newline at end of file
from typing import Optional
import yaml
import jinja2
from jinja2 import Template
from enum import Enum, IntEnum
import copy
from pydantic import create_model, BaseModel as PydanticBaseModel, Field
# from ..pipeline import nodepred, nodepred_sample
from .factory import ModelFactory, PipelineFactory, DataFactory
from .base_model import DGLBaseModel
class PipelineConfig(DGLBaseModel):
    """Training-loop options shared by generated pipelines."""
    node_embed_size: Optional[int] = -1  # -1 presumably means "use the dataset's own node features" — verify against pipeline code.
    early_stop: Optional[dict]  # Early-stopping settings; None presumably disables it — verify in pipeline code.
    num_epochs: int = 200  # Number of training epochs.
    eval_period: int = 5  # Run evaluation every this many epochs.
    optimizer: dict = {"name": "Adam", "lr": 0.005}  # Optimizer name plus keyword args.
    loss: str = "CrossEntropyLoss"  # torch loss class name used by the script.
class UserConfig(DGLBaseModel):
    """Top-level schema of a user-supplied config YAML."""
    version: Optional[str] = "0.0.1"  # Config schema version string.
    # Evaluated at class-creation time, so all pipelines must already be
    # registered with PipelineFactory when this module is imported.
    pipeline_name: PipelineFactory.get_pipeline_enum()
    device: str = "cpu"  # Target device string, e.g. "cpu" or "cuda".
    # general_pipeline: PipelineConfig = PipelineConfig()
\ No newline at end of file
import enum
import logging
from typing import Callable, Dict, Union, List, Tuple, Optional
from typing_extensions import Literal
from pathlib import Path
from abc import ABC, abstractmethod, abstractstaticmethod
from .base_model import DGLBaseModel
import yaml
from pydantic import create_model_from_typeddict, create_model, Field
from dgl.dataloading.negative_sampler import GlobalUniform, PerSourceUniform
import inspect
from numpydoc import docscrape
# Module-level logger used for factory duplicate-registration warnings.
logger = logging.getLogger(__name__)
# Names of every pipeline DGL-Enter ships recipes for.
ALL_PIPELINE = ["nodepred", "nodepred-ns", "linkpred"]
class PipelineBase(ABC):
    """Abstract interface every pipeline implementation must provide."""

    @abstractmethod
    def __init__(self) -> None:
        super().__init__()

    @abstractmethod
    def get_cfg_func(self):
        """Return the callable that generates this pipeline's default config."""
        pass

    # ``abc.abstractstaticmethod`` is deprecated since Python 3.3; stacking
    # ``@staticmethod`` over ``@abstractmethod`` is the supported spelling
    # and behaves identically.
    @staticmethod
    @abstractmethod
    def gen_script(user_cfg_dict: dict):
        """Render the training script text for the given user config dict."""
        pass

    @staticmethod
    @abstractmethod
    def get_description() -> str:
        """One-line human-readable description of the pipeline."""
        pass
class DataFactoryClass:
    """Registry of datasets plus the code snippets needed to instantiate them."""

    def __init__(self):
        self.registry = {}          # dataset name -> metadata dict
        self.pipeline_name = None   # set by ``filter``
        self.pipeline_allowed = {}  # pipeline name -> list of dataset names

    def register(self,
                 name: str,
                 import_code: str,
                 class_name: str,
                 allowed_pipeline: List[str],
                 extra_args=None):
        """Register a dataset.  Returns ``self`` so calls can be chained.

        ``extra_args`` previously defaulted to a shared mutable ``{}``;
        use None-sentinel to avoid cross-registration aliasing.
        """
        if extra_args is None:
            extra_args = {}
        self.registry[name] = {
            "name": name,
            "import_code": import_code,
            "class_name": class_name,
            "extra_args": extra_args
        }
        for pipeline in allowed_pipeline:
            self.pipeline_allowed.setdefault(pipeline, []).append(name)
        return self

    def get_dataset_enum(self):
        """Enum of registered dataset names (used for CLI choices)."""
        return enum.Enum(
            "DatasetName", {v["name"]: k for k, v in self.registry.items()})

    def get_dataset_classname(self, name):
        return self.registry[name]["class_name"]

    def get_constructor_arg_type(self, model_name):
        """Map each ``__init__`` parameter to its type annotation.

        NOTE(review): registry values are plain dicts, so this inspects
        ``dict.__init__`` rather than the dataset class — confirm intent.
        """
        sigs = inspect.signature(self.registry[model_name].__init__)
        return {k: param.annotation for k, param in sigs.parameters.items()}

    def get_pydantic_config(self):
        """Union of per-dataset pydantic config models for the active pipeline."""
        dataset_list = []
        for v in self.registry.values():
            dataset_name = v["name"]
            # Copy before dropping "name": the previous implementation
            # deleted the key from the registry's own extra_args in place.
            type_annotation_dict = dict(v["extra_args"])
            type_annotation_dict.pop("name", None)
            base = self.get_base_class(dataset_name, self.pipeline_name)
            dataset_list.append(create_model(
                f'{dataset_name}Config', **type_annotation_dict, __base__=base))
        output = dataset_list[0]
        for d in dataset_list[1:]:
            output = Union[output, d]
        return output

    # Note: this method was previously defined twice verbatim; one copy kept.
    def get_import_code(self, name):
        return self.registry[name]["import_code"]

    def get_extra_args(self, name):
        return self.registry[name]["extra_args"]

    def get_class_name(self, name):
        return self.registry[name]["class_name"]

    def get_generated_code_dict(self, name, args='**cfg["data"]'):
        """Return import + constructor snippets for script generation.

        ``args`` is currently unused; kept for interface compatibility.
        """
        d = {}
        d["data_import_code"] = self.registry[name]["import_code"]
        data_initialize_code = self.registry[name]["class_name"]
        extra_args_dict = self.registry[name]["extra_args"]
        if len(extra_args_dict) > 0:
            data_initialize_code = data_initialize_code.format('**cfg["data"]')
        d["data_initialize_code"] = data_initialize_code
        return d

    def filter(self, pipeline_name):
        """Return a copy restricted to datasets allowed for ``pipeline_name``."""
        allowed_name = self.pipeline_allowed[pipeline_name]
        new_registry = {k: v for k, v in self.registry.items() if k in allowed_name}
        d = DataFactoryClass()
        d.registry = new_registry
        d.pipeline_name = pipeline_name
        return d

    @staticmethod
    def get_base_class(dataset_name, pipeline_name):
        """Base pydantic model for a dataset config; linkpred adds neg_ratio."""
        if pipeline_name == "linkpred":
            class EdgeBase(DGLBaseModel):
                name: Literal[dataset_name]
                split_ratio: Optional[Tuple[float, float, float]] = None
                neg_ratio: Optional[int] = None
            return EdgeBase

        class NodeBase(DGLBaseModel):
            name: Literal[dataset_name]
            split_ratio: Optional[Tuple[float, float, float]] = None
        return NodeBase
DataFactory = DataFactoryClass()

# Builtin datasets as (name, import code, constructor snippet,
# allowed pipelines, extra constructor args).
_EVERY_PIPELINE = ["nodepred", "nodepred-ns", "linkpred"]
_BUILTIN_DATASETS = [
    ("cora", "from dgl.data import CoraGraphDataset",
     "CoraGraphDataset()", _EVERY_PIPELINE, {}),
    ("citeseer", "from dgl.data import CiteseerGraphDataset",
     "CiteseerGraphDataset()", _EVERY_PIPELINE, {}),
    ("pubmed", "from dgl.data import PubmedGraphDataset",
     "PubmedGraphDataset()", _EVERY_PIPELINE, {}),
    ("csv", "from dgl.data import DGLCSVDataset",
     "DGLCSVDataset({})", _EVERY_PIPELINE, {"data_path": "./"}),
    ("reddit", "from dgl.data import RedditDataset",
     "RedditDataset()", _EVERY_PIPELINE, {}),
    ("co-buy-computer", "from dgl.data import AmazonCoBuyComputerDataset",
     "AmazonCoBuyComputerDataset()", _EVERY_PIPELINE, {}),
    ("ogbn-arxiv", "from ogb.nodeproppred import DglNodePropPredDataset",
     "DglNodePropPredDataset('ogbn-arxiv')", _EVERY_PIPELINE, {}),
    ("ogbn-products", "from ogb.nodeproppred import DglNodePropPredDataset",
     "DglNodePropPredDataset('ogbn-products')", _EVERY_PIPELINE, {}),
    ("ogbl-collab", "from ogb.linkproppred import DglLinkPropPredDataset",
     "DglLinkPropPredDataset('ogbl-collab')", ["linkpred"], {}),
    ("ogbl-citation2", "from ogb.linkproppred import DglLinkPropPredDataset",
     "DglLinkPropPredDataset('ogbl-citation2')", ["linkpred"], {}),
]
for _ds_name, _ds_import, _ds_class, _ds_pipelines, _ds_extra in _BUILTIN_DATASETS:
    DataFactory.register(
        _ds_name,
        import_code=_ds_import,
        class_name=_ds_class,
        allowed_pipeline=_ds_pipelines,
        extra_args=_ds_extra)
class PipelineFactory:
    """The factory class for creating executors."""

    # Maps pipeline name -> registered pipeline *instance* (register()
    # instantiates the decorated class).  Forward-ref string keeps the
    # annotation lazy with respect to PipelineBase.
    registry: Dict[str, "PipelineBase"] = {}
    # Maps pipeline name -> default-config generator callable.
    default_config_registry = {}

    @classmethod
    def register(cls, name: str) -> Callable:
        """Class decorator registering an instance of the pipeline class."""
        def inner_wrapper(wrapped_class) -> Callable:
            if name in cls.registry:
                logger.warning(
                    'Executor %s already exists. Will replace it', name)
            cls.registry[name] = wrapped_class()
            return wrapped_class
        return inner_wrapper

    @classmethod
    def register_default_config_generator(cls, name: str) -> Callable:
        """Decorator registering a default-config generator for ``name``."""
        def inner_wrapper(wrapped_class) -> Callable:
            # Bug fix: the duplicate check previously consulted
            # ``cls.registry`` (the pipeline registry) instead of the
            # config-generator registry it is about to write to.
            if name in cls.default_config_registry:
                logger.warning(
                    'Executor %s already exists. Will replace it', name)
            cls.default_config_registry[name] = wrapped_class
            return wrapped_class
        return inner_wrapper

    @classmethod
    def call_default_config_generator(cls, generator_name, model_name, dataset_name):
        """Invoke the registered default-config generator."""
        return cls.default_config_registry[generator_name](model_name, dataset_name)

    @classmethod
    def call_generator(cls, generator_name, cfg):
        """Invoke the registered pipeline instance with a user config."""
        return cls.registry[generator_name](cfg)

    @classmethod
    def get_pipeline_enum(cls):
        """Enum of registered pipeline names (used for CLI choices)."""
        return enum.Enum(
            "PipelineName", {k: k for k in cls.registry})
# Directory of bundled model implementation files, two levels up from this
# module (<package>/model).
model_dir = Path(__file__).parent.parent / "model"
class ModelFactory:
    """Registry of model classes plus their source files for script generation."""

    def __init__(self):
        self.registry = {}       # model name -> model class
        self.code_registry = {}  # model name -> source text of the class's file

    def get_model_enum(self):
        """Enum of registered model names (used for CLI choices)."""
        return enum.Enum(
            "ModelName", {k: k for k in self.registry})

    def register(self, model_name: str) -> Callable:
        """Class decorator recording the class and its defining file's source."""
        def inner_wrapper(wrapped_class) -> Callable:
            if model_name in self.registry:
                logger.warning(
                    'Executor %s already exists. Will replace it', model_name)
            self.registry[model_name] = wrapped_class
            # Keep the whole source file so generated scripts can embed it.
            code_filename = Path(inspect.getfile(wrapped_class))
            self.code_registry[model_name] = code_filename.read_text()
            return wrapped_class
        return inner_wrapper

    def get_source_code(self, model_name):
        return self.code_registry[model_name]

    def get_constructor_default_args(self, model_name):
        """Map each ``__init__`` parameter name to its default value."""
        sigs = inspect.signature(self.registry[model_name].__init__)
        return {k: param.default for k, param in sigs.parameters.items()}

    def get_pydantic_constructor_arg_type(self, model_name: str):
        """Build a pydantic config model from the constructor's defaults.

        pydantic infers each field's type from the default value supplied.
        (Removed an unused ``model_enum`` local from the old implementation.)
        """
        arg_dict = self.get_constructor_default_args(model_name)
        exempt_keys = ['self', 'in_size', 'out_size', 'data_info']
        type_annotation_dict = {
            k: default for k, default in arg_dict.items() if k not in exempt_keys
        }

        class Base(DGLBaseModel):
            name: Literal[model_name]
        return create_model(f'{model_name.upper()}ModelConfig', **type_annotation_dict, __base__=Base)

    def get_constructor_doc_dict(self, name):
        """Map parameter name -> first description line of the numpydoc docstring."""
        model_class = self.registry[name]
        docs = inspect.getdoc(model_class.__init__)
        param_docs = docscrape.NumpyDocString(docs)
        return {param.name: param.desc[0] for param in param_docs["Parameters"]}

    def get_pydantic_model_config(self):
        """Union of all registered models' pydantic configs."""
        model_list = [self.get_pydantic_constructor_arg_type(k) for k in self.registry]
        output = model_list[0]
        for m in model_list[1:]:
            output = Union[output, m]
        return output

    def get_model_class_name(self, model_name):
        return self.registry[model_name].__name__

    def get_constructor_arg_type(self, model_name):
        """Map each ``__init__`` parameter name to its type annotation."""
        sigs = inspect.signature(self.registry[model_name].__init__)
        return {k: param.annotation for k, param in sigs.parameters.items()}
class SamplerFactory:
    """Registry of negative-sampler classes for the linkpred pipeline."""

    def __init__(self):
        self.registry = {}  # sampler name -> sampler class

    def get_model_enum(self):
        """Enum of registered sampler names (used for CLI choices)."""
        return enum.Enum(
            "NegativeSamplerName", {k: k for k in self.registry})

    def register(self, sampler_name: str) -> Callable:
        """Class decorator registering a sampler class under ``sampler_name``."""
        def inner_wrapper(wrapped_class) -> Callable:
            if sampler_name in self.registry:
                logger.warning(
                    'Sampler %s already exists. Will replace it', sampler_name)
            self.registry[sampler_name] = wrapped_class
            return wrapped_class
        return inner_wrapper

    def get_constructor_default_args(self, sampler_name):
        """Map each ``__init__`` parameter name to its default value."""
        sigs = inspect.signature(self.registry[sampler_name].__init__)
        return {k: param.default for k, param in sigs.parameters.items()}

    def get_pydantic_constructor_arg_type(self, sampler_name: str):
        """Build a pydantic config model from the constructor's defaults.

        (Removed an unused ``model_enum`` local from the old implementation.)
        """
        arg_dict = self.get_constructor_default_args(sampler_name)
        type_annotation_dict = {}
        # NOTE(review): the ``or param is None`` clause re-admits exempt keys
        # whose default is None, and 'k'/'redundancy' are forced to a default
        # of 3 — preserved verbatim; confirm this matches the intended CLI
        # behavior.
        exempt_keys = ['self', 'in_size', 'out_size', 'redundancy']
        for k, param in arg_dict.items():
            if k not in exempt_keys or param is None:
                if k == 'k' or k == 'redundancy':
                    type_annotation_dict[k] = 3
                else:
                    type_annotation_dict[k] = arg_dict[k]

        class Base(DGLBaseModel):
            name: Literal[sampler_name]
        return create_model(f'{sampler_name.upper()}SamplerConfig', **type_annotation_dict, __base__=Base)

    def get_pydantic_model_config(self):
        """Union of all registered samplers' pydantic configs."""
        model_list = [self.get_pydantic_constructor_arg_type(k) for k in self.registry]
        output = model_list[0]
        for m in model_list[1:]:
            output = Union[output, m]
        return output

    def get_model_class_name(self, model_name):
        return self.registry[model_name].__name__

    def get_constructor_arg_type(self, model_name):
        """Map each ``__init__`` parameter name to its type annotation."""
        sigs = inspect.signature(self.registry[model_name].__init__)
        return {k: param.annotation for k, param in sigs.parameters.items()}

    def get_constructor_doc_dict(self, name):
        """Map parameter name -> first description line of the numpydoc docstring."""
        model_class = self.registry[name]
        docs = inspect.getdoc(model_class)
        param_docs = docscrape.NumpyDocString(docs)
        return {param.name: param.desc[0] for param in param_docs["Parameters"]}
# DGL's builtin negative samplers, keyed by the names used in config files.
NegativeSamplerFactory = SamplerFactory()
NegativeSamplerFactory.register("uniform")(GlobalUniform)
NegativeSamplerFactory.register("persource")(PerSourceUniform)
# Separate registries for node-level and edge-level model architectures.
NodeModelFactory = ModelFactory()
EdgeModelFactory = ModelFactory()
from ruamel.yaml.comments import CommentedMap
def deep_convert_dict(layer):
    """Recursively convert plain dicts into ruamel ``CommentedMap`` objects.

    Non-mapping values (scalars, lists) are returned unchanged; only dict
    values are recursed into.
    """
    converted = CommentedMap(layer) if isinstance(layer, dict) else layer
    try:
        pairs = converted.items()
    except AttributeError:
        # Not a mapping -- nothing to recurse into.
        return converted
    for key, value in pairs:
        converted[key] = deep_convert_dict(value)
    return converted
import collections.abc
def merge_comment(d, comment_dict, column=30):
    """Attach end-of-line comments from ``comment_dict`` onto ``d`` in place.

    ``comment_dict`` mirrors ``d``'s structure: leaf values are comment
    strings for the matching key, nested mappings recurse into the matching
    sub-map (created empty if absent).

    Parameters
    ----------
    d : ruamel.yaml CommentedMap
        Config mapping to annotate; modified in place and returned.
    comment_dict : Mapping
        Nested mapping of key -> comment string or sub-mapping.
    column : int
        Column at which the comments are aligned.
    """
    for k, v in comment_dict.items():
        if isinstance(v, collections.abc.Mapping):
            # Bug fix: propagate ``column`` into the recursion; it was
            # previously dropped, so nested levels always used the default 30
            # even when a caller asked for a different column.
            d[k] = merge_comment(d.get(k, CommentedMap()), v, column)
        else:
            d.yaml_add_eol_comment(v, key=k, column=column)
    return d
\ No newline at end of file
version: 0.0.1
pipeline_name: linkpred
device: cpu
data:
name: ogbl-citation2
  split_ratio: # List of float, e.g. [0.8, 0.1, 0.1]. Split ratios for training, validation and test sets. Must sum to one. Leave blank to use builtin split in original dataset
  neg_ratio: # Int, e.g. 2. Indicates how many negative samples to sample per positive sample. Leave blank to use builtin split in original dataset
node_model:
name: sage
embed_size: -1 # The dimension of created embedding table. -1 means using original node embedding
hidden_size: 16 # Hidden size.
num_layers: 1 # Number of hidden layers.
activation: relu
dropout: 0.5 # Dropout rate.
aggregator_type: gcn # Aggregator type to use (``mean``, ``gcn``, ``pool``, ``lstm``).
edge_model:
name: ele
hidden_size: 64 # Hidden size.
num_layers: 2 # Number of hidden layers.
    bias: true # Whether to use bias in the linear layer.
neg_sampler:
name: persource
k: 3 # The number of negative samples per edge.
general_pipeline:
hidden_size: 256 # The intermediate hidden size between node model and edge model
eval_batch_size: 32769 # Edge batch size when evaluating
train_batch_size: 32769 # Edge batch size when training
num_epochs: 200 # Number of training epochs
eval_period: 5 # Interval epochs between evaluations
optimizer:
name: Adam
lr: 0.005
loss: BCELoss
num_runs: 1 # Number of experiments to run
version: 0.0.1
pipeline_name: linkpred
device: cpu
data:
name: ogbl-collab
  split_ratio: # List of float, e.g. [0.8, 0.1, 0.1]. Split ratios for training, validation and test sets. Must sum to one. Leave blank to use builtin split in original dataset
  neg_ratio: # Int, e.g. 2. Indicates how many negative samples to sample per positive sample. Leave blank to use builtin split in original dataset
node_model:
name: sage
embed_size: -1 # The dimension of created embedding table. -1 means using original node embedding
hidden_size: 16 # Hidden size.
num_layers: 1 # Number of hidden layers.
activation: relu
dropout: 0.5 # Dropout rate.
aggregator_type: gcn # Aggregator type to use (``mean``, ``gcn``, ``pool``, ``lstm``).
edge_model:
name: ele
hidden_size: 64 # Hidden size.
num_layers: 2 # Number of hidden layers.
    bias: true # Whether to use bias in the linear layer.
neg_sampler:
name: persource
k: 3 # The number of negative samples per edge.
general_pipeline:
hidden_size: 256 # The intermediate hidden size between node model and edge model
eval_batch_size: 32769 # Edge batch size when evaluating
train_batch_size: 32769 # Edge batch size when training
num_epochs: 200 # Number of training epochs
eval_period: 5 # Interval epochs between evaluations
optimizer:
name: Adam
lr: 0.005
loss: BCELoss
num_runs: 1 # Number of experiments to run
version: 0.0.1
pipeline_name: linkpred
device: cuda
data:
name: cora
  split_ratio: [0.8, 0.1, 0.1] # List of float, e.g. [0.8, 0.1, 0.1]. Split ratios for training, validation and test sets. Must sum to one. Leave blank to use builtin split in original dataset
  neg_ratio: 3 # Int, e.g. 2. Indicates how many negative samples to sample per positive sample. Leave blank to use builtin split in original dataset
node_model:
name: sage
embed_size: -1 # The dimension of created embedding table. -1 means using original node embedding
hidden_size: 32 # Hidden size.
num_layers: 2 # Number of hidden layers.
activation: relu
dropout: 0.5 # Dropout rate.
aggregator_type: gcn # Aggregator type to use (``mean``, ``gcn``, ``pool``, ``lstm``).
edge_model:
name: ele
hidden_size: 64 # Hidden size.
num_layers: 2 # Number of hidden layers.
    bias: true # Whether to use bias in the linear layer.
neg_sampler:
name: persource
k: 3 # The number of negative samples per edge.
general_pipeline:
hidden_size: 256 # The intermediate hidden size between node model and edge model
eval_batch_size: 32769 # Edge batch size when evaluating
train_batch_size: 32769 # Edge batch size when training
num_epochs: 200 # Number of training epochs
eval_period: 5 # Interval epochs between evaluations
optimizer:
name: Adam
lr: 0.005
loss: BCELoss
num_runs: 1 # Number of experiments to run
# Accuracy across 5 runs: 0.593288 ± 0.006103
version: 0.0.1
pipeline_name: nodepred-ns
device: 'cuda:0'
eval_device: 'cpu'
data:
name: ogbn-arxiv
model:
name: gcn
embed_size: -1 # The dimension of created embedding table. -1 means using original node embedding
hidden_size: 256 # Hidden size.
num_layers: 2 # Number of layers.
norm: both # GCN normalization type. Can be 'both', 'right', 'left', 'none'.
activation: relu # Activation function.
dropout: 0.5 # Dropout rate.
use_edge_weight: false # If true, scale the messages by edge weights.
general_pipeline:
sampler:
name: neighbor
fan_out:
- 5
- 10
batch_size: 1024
num_workers: 4
eval_batch_size: 10240
eval_num_workers: 4
num_epochs: 20 # Number of training epochs
eval_period: 1 # Interval epochs between evaluations
optimizer:
name: Adam
lr: 0.005
weight_decay: 0.0
loss: CrossEntropyLoss
num_runs: 5
# Accuracy across 1 runs: 0.796911
version: 0.0.1
pipeline_name: nodepred-ns
device: cuda
eval_device: cpu
data:
name: ogbn-products
split_ratio: # Ratio to generate split masks, for example set to [0.8, 0.1, 0.1] for 80% train/10% val/10% test. Leave blank to use builtin split in original dataset
model:
name: sage
embed_size: -1 # The dimension of created embedding table. -1 means using original node embedding
hidden_size: 256 # Hidden size.
num_layers: 3 # Number of hidden layers.
activation: relu
dropout: 0.5 # Dropout rate.
aggregator_type: gcn # Aggregator type to use (``mean``, ``gcn``, ``pool``, ``lstm``).
general_pipeline:
sampler:
name: neighbor
fan_out:
- 5
- 10
- 15
batch_size: 1000
num_workers: 4
eval_batch_size: 10000
eval_num_workers: 4
early_stop:
patience: 20 # Steps before early stop
checkpoint_path: checkpoint.pth # Early stop checkpoint model file path
num_epochs: 20 # Number of training epochs
eval_period: 5 # Interval epochs between evaluations
optimizer:
name: Adam
lr: 0.005
weight_decay: 0.0
loss: CrossEntropyLoss
num_runs: 5 # Number of experiments to run
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment