Unverified Commit 31e4a89b authored by Mufei Li's avatar Mufei Li Committed by GitHub
Browse files

[DGL-Go] Inference for Node Prediction Pipeline (full & ns) (#4095)

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update

* Update
parent 69226588
......@@ -10,7 +10,23 @@ experiments.
## Installation and get started
DGL-Go requires DGL v0.8+ so please make sure DGL is updated properly.
Install DGL-Go by `pip install dglgo` and type `dgl` in your console:
### Install the latest stable version
```
pip install dglgo
```
### Install from source for experimental features
```
python setup.py install
```
### Get started
Type `dgl` in your console:
```
Usage: dgl [OPTIONS] COMMAND [ARGS]...
......@@ -63,14 +79,16 @@ generate a configurate file `cora_sage.yaml` which includes:
* Training hyperparameters (e.g., learning rate, loss function, etc.).
Different choices of task, model and datasets may give very different options,
so DGL-Go also adds a comment for what each option does in the file.
so DGL-Go also adds a comment per option for explanation.
At this point you can also change options to explore optimization potentials.
Below shows the configuration file generated by the command above.
The snippet below shows the configuration file generated by the command above.
```yaml
version: 0.0.1
pipeline_name: nodepred
pipeline:
name: nodepred
mode: train
device: cpu
data:
name: cora
......@@ -94,7 +112,7 @@ general_pipeline:
lr: 0.01
weight_decay: 0.0005
loss: CrossEntropyLoss
save_path: model.pth # Path to save the model
save_path: results # Directory to save the experiment results
num_runs: 1 # Number of experiments to run
```
......@@ -216,14 +234,18 @@ class GraphSAGE(nn.Module):
for i in range(num_layers):
in_hidden = hidden_size if i > 0 else in_size
out_hidden = hidden_size if i < num_layers - 1 else data_info["out_size"]
self.layers.append(dgl.nn.SAGEConv( in_hidden, out_hidden, aggregator_type))
out_hidden = hidden_size if i < num_layers - \
1 else data_info["out_size"]
self.layers.append(
dgl.nn.SAGEConv(
in_hidden,
out_hidden,
aggregator_type))
def forward(self, graph, node_feat, edge_feat=None):
if self.embed_size > 0:
dgl_warning(
"The embedding for node feature is used, and input node_feat is ignored, due to the provided embed_size.",
norepeat=True)
"The embedding for node feature is used, and input node_feat is ignored, due to the provided embed_size.")
h = self.embed.weight
else:
h = node_feat
......@@ -272,6 +294,7 @@ def train(cfg, pipeline_cfg, device, data, model, optimizer, loss_fcn):
format(epoch, loss.item(), train_acc, val_acc))
stopper.load_checkpoint(model)
stopper.close()
model.eval()
with torch.no_grad():
......@@ -280,40 +303,10 @@ def train(cfg, pipeline_cfg, device, data, model, optimizer, loss_fcn):
return test_acc
def main():
cfg = {
'version': '0.0.1',
'device': 'cuda:0',
'model': {
'embed_size': -1,
'hidden_size': 16,
'num_layers': 2,
'activation': 'relu',
'dropout': 0.5,
'aggregator_type': 'gcn'},
'general_pipeline': {
'early_stop': {
'patience': 100,
'checkpoint_path': 'checkpoint.pth'},
'num_epochs': 200,
'eval_period': 5,
'optimizer': {
'lr': 0.01,
'weight_decay': 0.0005},
'loss': 'CrossEntropyLoss',
'save_path': 'model.pth',
'num_runs': 10}}
def main(run, cfg, data):
device = cfg['device']
pipeline_cfg = cfg['general_pipeline']
# load data
data = AsNodePredDataset(CoraGraphDataset())
# create model
model_cfg = cfg["model"]
cfg["model"]["data_info"] = {
"in_size": model_cfg['embed_size'] if model_cfg['embed_size'] > 0 else data[0].ndata['feat'].shape[1],
"out_size": data.num_classes,
"num_nodes": data[0].num_nodes()
}
model = GraphSAGE(**cfg["model"])
model = model.to(device)
loss = torch.nn.CrossEntropyLoss()
......@@ -322,10 +315,36 @@ def main():
**pipeline_cfg["optimizer"])
# train
test_acc = train(cfg, pipeline_cfg, device, data, model, optimizer, loss)
torch.save(model.state_dict(), pipeline_cfg["save_path"])
torch.save({'cfg': cfg, 'model': model.state_dict()},
os.path.join(pipeline_cfg["save_path"], 'run_{}.pth'.format(run)))
return test_acc
...
if __name__ == '__main__':
...
# load data
data = AsNodePredDataset(CoraGraphDataset())
model_cfg = cfg["model"]
cfg["model"]["data_info"] = {
"in_size": model_cfg['embed_size'] if model_cfg['embed_size'] > 0 else data[0].ndata['feat'].shape[1],
"out_size": data.num_classes,
"num_nodes": data[0].num_nodes()
}
os.makedirs(cfg['general_pipeline']["save_path"])
all_acc = []
num_runs = 1
for run in range(num_runs):
print(f'Run experiment #{run}')
test_acc = main(run, cfg, data)
print("Test Accuracy {:.4f}".format(test_acc))
all_acc.append(test_acc)
avg_acc = np.round(np.mean(all_acc), 6)
std_acc = np.round(np.std(all_acc), 6)
print(f'Accuracy across {num_runs} runs: {avg_acc} ± {std_acc}')
```
You can see that everything is collected into one Python script which includes the
......@@ -396,6 +415,15 @@ all the available pipelines.
A: Currently not supported. We will enable this feature soon. Please stay tuned!
**Q: After training a model on some dataset, how can I apply it to another one?**
A: The `save_path` option in the generated configuration file allows you to specify where
to save the model after training. You can then modify the script generated by `dgl export`
to load the model checkpoint and evaluate it on another dataset.
A: The `save_path` option in the generated configuration file allows you to specify the directory to save the experiment results. After training, `{save_path}/run_{i}.pth` will be the checkpoint for the i-th run, consisting of the training configuration and trained model state dict. You can then use `dgl apply` as follows.
```
dgl configure-apply X --data Y --cpt {save_path}/run_{i}.pth --cfg Z
dgl apply --cfg Z
```
- `X` is the pipeline name as in `dgl configure`.
- `Y` is the dataset to apply the trained model to, and can be omitted if you are applying it to the training dataset.
- `Z` is the configuration file and a default value will be used if not specified.
You can also use `dgl export --cfg Z` to generate a python script for further modification.
from .nodepred import ApplyNodepredPipeline
from .nodepred_sample import ApplyNodepredNsPipeline
import ruamel.yaml
import torch
import typer
from copy import deepcopy
from jinja2 import Template
from pathlib import Path
from pydantic import Field
from typing import Optional
from ...utils.factory import ApplyPipelineFactory, PipelineBase, DataFactory, NodeModelFactory
from ...utils.yaml_dump import deep_convert_dict, merge_comment
@ApplyPipelineFactory.register("nodepred")
class ApplyNodepredPipeline(PipelineBase):
    """Node classification pipeline for inference (``dgl configure-apply nodepred``).

    Generates an inference configuration file from a training checkpoint and
    renders a runnable Python script from the ``nodepred.jinja-py`` template.
    """

    def __init__(self):
        # Pipeline identity; "mode" distinguishes inference from training pipelines.
        self.pipeline = {
            "name": "nodepred",
            "mode": "apply"
        }

    @classmethod
    def setup_user_cfg_cls(cls):
        """Build and cache the pydantic class used to validate user configs."""
        from ...utils.enter_config import UserConfig

        class ApplyNodePredUserConfig(UserConfig):
            # Dataset choices restricted to those registered for "nodepred".
            data: DataFactory.filter("nodepred").get_pydantic_config() = Field(..., discriminator="name")

        cls.user_cfg_cls = ApplyNodePredUserConfig

    @property
    def user_cfg_cls(self):
        # Instance-level access to the class attribute set by setup_user_cfg_cls.
        return self.__class__.user_cfg_cls

    def get_cfg_func(self):
        """Return the typer callback implementing ``dgl configure-apply nodepred``."""
        def config(
            data: DataFactory.filter("nodepred").get_dataset_enum() = typer.Option(None, help="input data name"),
            cfg: Optional[str] = typer.Option(None, help="output configuration file path"),
            cpt: str = typer.Option(..., help="input checkpoint file path")
        ):
            # Training configuration stored in the checkpoint under the "cfg" key.
            train_cfg = torch.load(cpt)["cfg"]
            if data is None:
                print("data is not specified, use the training dataset")
                data = train_cfg["data_name"]
            else:
                data = data.name
            if cfg is None:
                # Default output name: apply_nodepred_<data>_<model>.yaml
                cfg = "_".join(["apply", "nodepred", data, train_cfg["model_name"]]) + ".yaml"
            self.__class__.setup_user_cfg_cls()
            generated_cfg = {
                "pipeline_name": self.pipeline["name"],
                "pipeline_mode": self.pipeline["mode"],
                # Reuse the device recorded at training time by default.
                "device": train_cfg["device"],
                "data": {"name": data},
                "cpt_path": cpt,
                "general_pipeline": {"save_path": "apply_results"}
            }
            output_cfg = self.user_cfg_cls(**generated_cfg).dict()
            output_cfg = deep_convert_dict(output_cfg)
            # Not applicable for inference
            output_cfg['data'].pop('split_ratio')
            # Inline comments written next to the corresponding YAML options.
            comment_dict = {
                "device": "Torch device name, e.g., cpu or cuda or cuda:0",
                "cpt_path": "Path to the checkpoint file",
                "general_pipeline": {"save_path": "Directory to save the inference results"}
            }
            comment_dict = merge_comment(output_cfg, comment_dict)
            yaml = ruamel.yaml.YAML()
            yaml.dump(comment_dict, Path(cfg).open("w"))
            print("Configuration file is generated at {}".format(Path(cfg).absolute()))
        return config

    @classmethod
    def gen_script(cls, user_cfg_dict):
        """Render the runnable inference script for a validated user config."""
        # Check validation
        cls.setup_user_cfg_cls()
        cls.user_cfg_cls(**user_cfg_dict)
        # Training configuration
        train_cfg = torch.load(user_cfg_dict["cpt_path"])["cfg"]
        # Dict for code rendering
        render_cfg = deepcopy(user_cfg_dict)
        model_name = train_cfg["model_name"]
        model_code = NodeModelFactory.get_source_code(model_name)
        render_cfg["model_code"] = model_code
        render_cfg["model_class_name"] = NodeModelFactory.get_model_class_name(model_name)
        render_cfg.update(DataFactory.get_generated_code_dict(user_cfg_dict["data"]["name"]))
        # Dict for defining cfg in the rendered code
        generated_user_cfg = deepcopy(user_cfg_dict)
        generated_user_cfg["data"].pop("name")
        generated_user_cfg.pop("pipeline_name")
        generated_user_cfg.pop("pipeline_mode")
        # model arch configuration taken from training, not from the user config
        generated_user_cfg["model"] = train_cfg["model"]
        render_cfg["user_cfg_str"] = f"cfg = {str(generated_user_cfg)}"
        render_cfg["user_cfg"] = user_cfg_dict
        file_current_dir = Path(__file__).resolve().parent
        with open(file_current_dir / "nodepred.jinja-py", "r") as f:
            template = Template(f.read())
        return template.render(**render_cfg)

    @staticmethod
    def get_description() -> str:
        return "Node classification pipeline for inference"
import torch
import dgl
import os
import csv
from dgl.data import AsNodePredDataset
{{ data_import_code }}
{{ model_code }}
def infer(device, data, model):
    """Run a full-graph forward pass and return the model's raw logits."""
    # Inference is restricted to the first graph of the dataset.
    graph = dgl.add_self_loop(dgl.remove_self_loop(data[0])).to(device)
    node_features = graph.ndata.get('feat', None)
    edge_features = graph.edata.get('feat', None)
    net = model.to(device)
    net.eval()
    with torch.no_grad():
        scores = net(graph, node_features, edge_features)
    return scores
def main():
    # Rendered by DGL-Go: expands to `cfg = {...}` with the merged
    # training/inference configuration.
    {{ user_cfg_str }}

    device = cfg['device']
    # Fall back to CPU when CUDA is unavailable on the inference machine.
    if not torch.cuda.is_available():
        device = 'cpu'

    # load data
    data = AsNodePredDataset({{data_initialize_code}})

    # validation: the checkpointed model must be compatible with the dataset
    if cfg['model']['embed_size'] > 0:
        # Learned node embeddings are tied to node IDs, so the node count must match.
        model_num_nodes = cfg['model']['data_info']['num_nodes']
        data_num_nodes = data[0].num_nodes()
        assert model_num_nodes == data_num_nodes, \
            'Training and inference need to be on the same dataset when node embeddings were learned from scratch'
    else:
        # Otherwise only the input feature size must match.
        model_in_size = cfg['model']['data_info']['in_size']
        data_in_size = data[0].ndata['feat'].shape[1]
        assert model_in_size == data_in_size, \
            'Expect the training data and inference data to have the same number of input node \
features, got {:d} and {:d}'.format(model_in_size, data_in_size)

    model = {{ model_class_name }}(**cfg['model'])
    # Load on CPU first; infer() moves the model to the target device.
    model.load_state_dict(torch.load(cfg['cpt_path'], map_location='cpu')['model'])
    logits = infer(device, data, model)
    pred = logits.argmax(dim=1).cpu()

    # Dump the results
    # NOTE(review): os.makedirs raises if save_path already exists — presumably
    # intentional to avoid overwriting earlier results; confirm.
    os.makedirs(cfg['general_pipeline']["save_path"])
    file_path = os.path.join(cfg['general_pipeline']["save_path"], 'output.csv')
    with open(file_path, 'w') as f:
        writer = csv.writer(f)
        writer.writerow(['node id', 'predicted label'])
        writer.writerows([
            [i, pred[i].item()] for i in range(len(pred))
        ])
    print('Saved inference results to {}'.format(file_path))
if __name__ == '__main__':
main()
import ruamel.yaml
import typer
import torch
from copy import deepcopy
from jinja2 import Template
from pathlib import Path
from pydantic import Field
from typing import Optional
from ...utils.factory import ApplyPipelineFactory, PipelineBase, DataFactory, NodeModelFactory
from ...utils.yaml_dump import deep_convert_dict, merge_comment
@ApplyPipelineFactory.register("nodepred-ns")
class ApplyNodepredNsPipeline(PipelineBase):
    """Node classification (neighbor sampling) pipeline for inference
    (``dgl configure-apply nodepred-ns``).

    Generates an inference configuration file from a training checkpoint and
    renders a runnable Python script from the ``nodepred-ns.jinja-py`` template.
    """

    def __init__(self):
        # Pipeline identity; "mode" distinguishes inference from training pipelines.
        self.pipeline = {
            "name": "nodepred-ns",
            "mode": "apply"
        }

    @classmethod
    def setup_user_cfg_cls(cls):
        """Build and cache the pydantic class used to validate user configs."""
        from ...utils.enter_config import UserConfig

        class ApplyNodePredUserConfig(UserConfig):
            # Dataset choices restricted to those registered for "nodepred-ns".
            data: DataFactory.filter("nodepred-ns").get_pydantic_config() = Field(..., discriminator="name")

        cls.user_cfg_cls = ApplyNodePredUserConfig

    @property
    def user_cfg_cls(self):
        # Instance-level access to the class attribute set by setup_user_cfg_cls.
        return self.__class__.user_cfg_cls

    def get_cfg_func(self):
        """Return the typer callback implementing ``dgl configure-apply nodepred-ns``."""
        def config(
            data: DataFactory.filter("nodepred-ns").get_dataset_enum() = typer.Option(None, help="input data name"),
            cfg: Optional[str] = typer.Option(None, help="output configuration file path"),
            cpt: str = typer.Option(..., help="input checkpoint file path")
        ):
            # Training configuration stored in the checkpoint under the "cfg" key.
            train_cfg = torch.load(cpt)["cfg"]
            if data is None:
                print("data is not specified, use the training dataset")
                data = train_cfg["data_name"]
            else:
                data = data.name
            if cfg is None:
                # Default output name: apply_nodepred-ns_<data>_<model>.yaml
                cfg = "_".join(["apply", "nodepred-ns", data, train_cfg["model_name"]]) + ".yaml"
            self.__class__.setup_user_cfg_cls()
            generated_cfg = {
                "pipeline_name": self.pipeline["name"],
                "pipeline_mode": self.pipeline["mode"],
                # Reuse the device recorded at training time by default.
                "device": train_cfg["device"],
                "data": {"name": data},
                "cpt_path": cpt,
                "general_pipeline": {"save_path": "apply_results"}
            }
            output_cfg = self.user_cfg_cls(**generated_cfg).dict()
            output_cfg = deep_convert_dict(output_cfg)
            # Not applicable for inference
            output_cfg['data'].pop('split_ratio')
            # Inline comments written next to the corresponding YAML options.
            comment_dict = {
                "device": "Torch device name, e.g., cpu or cuda or cuda:0",
                "cpt_path": "Path to the checkpoint file",
                "general_pipeline": {"save_path": "Directory to save the inference results"}
            }
            comment_dict = merge_comment(output_cfg, comment_dict)
            yaml = ruamel.yaml.YAML()
            yaml.dump(comment_dict, Path(cfg).open("w"))
            print("Configuration file is generated at {}".format(Path(cfg).absolute()))
        return config

    @classmethod
    def gen_script(cls, user_cfg_dict):
        """Render the runnable inference script for a validated user config."""
        # Check validation
        cls.setup_user_cfg_cls()
        cls.user_cfg_cls(**user_cfg_dict)
        # Training configuration
        train_cfg = torch.load(user_cfg_dict["cpt_path"])["cfg"]
        # Dict for code rendering
        render_cfg = deepcopy(user_cfg_dict)
        model_name = train_cfg["model_name"]
        model_code = NodeModelFactory.get_source_code(model_name)
        render_cfg["model_code"] = model_code
        render_cfg["model_class_name"] = NodeModelFactory.get_model_class_name(model_name)
        render_cfg.update(DataFactory.get_generated_code_dict(user_cfg_dict["data"]["name"]))
        # Dict for defining cfg in the rendered code
        generated_user_cfg = deepcopy(user_cfg_dict)
        generated_user_cfg["data"].pop("name")
        generated_user_cfg.pop("pipeline_name")
        generated_user_cfg.pop("pipeline_mode")
        # model arch configuration taken from training, not from the user config
        generated_user_cfg["model"] = train_cfg["model"]
        render_cfg["user_cfg_str"] = f"cfg = {str(generated_user_cfg)}"
        render_cfg["user_cfg"] = user_cfg_dict
        file_current_dir = Path(__file__).resolve().parent
        with open(file_current_dir / "nodepred-ns.jinja-py", "r") as f:
            template = Template(f.read())
        return template.render(**render_cfg)

    @staticmethod
    def get_description() -> str:
        return "Node classification neighbor sampling pipeline for inference"
import torch
import dgl
import os
import csv
from dgl.data import AsNodePredDataset
{{ data_import_code }}
{{ model_code }}
def infer(device, data, model):
    """Run a full-graph forward pass and return the model's raw logits."""
    # Inference is restricted to the first graph of the dataset.
    graph = dgl.add_self_loop(dgl.remove_self_loop(data[0])).to(device)
    node_features = graph.ndata.get('feat', None)
    edge_features = graph.edata.get('feat', None)
    net = model.to(device)
    net.eval()
    with torch.no_grad():
        scores = net(graph, node_features, edge_features)
    return scores
def main():
    # Rendered by DGL-Go: expands to `cfg = {...}` with the merged
    # training/inference configuration.
    {{ user_cfg_str }}

    device = cfg['device']
    # Fall back to CPU when CUDA is unavailable on the inference machine.
    if not torch.cuda.is_available():
        device = 'cpu'

    # load data
    data = AsNodePredDataset({{data_initialize_code}})

    # validation: the checkpointed model must be compatible with the dataset
    if cfg['model']['embed_size'] > 0:
        # Learned node embeddings are tied to node IDs, so the node count must match.
        model_num_nodes = cfg['model']['data_info']['num_nodes']
        data_num_nodes = data[0].num_nodes()
        assert model_num_nodes == data_num_nodes, \
            'Training and inference need to be on the same dataset when node embeddings were learned from scratch'
    else:
        # Otherwise only the input feature size must match.
        model_in_size = cfg['model']['data_info']['in_size']
        data_in_size = data[0].ndata['feat'].shape[1]
        assert model_in_size == data_in_size, \
            'Expect the training data and inference data to have the same number of input node \
features, got {:d} and {:d}'.format(model_in_size, data_in_size)

    model = {{ model_class_name }}(**cfg['model'])
    # Load on CPU first; infer() moves the model to the target device.
    model.load_state_dict(torch.load(cfg['cpt_path'], map_location='cpu')['model'])
    logits = infer(device, data, model)
    pred = logits.argmax(dim=1).cpu()

    # Dump the results
    # NOTE(review): os.makedirs raises if save_path already exists — presumably
    # intentional to avoid overwriting earlier results; confirm.
    os.makedirs(cfg['general_pipeline']["save_path"])
    file_path = os.path.join(cfg['general_pipeline']["save_path"], 'output.csv')
    with open(file_path, 'w') as f:
        writer = csv.writer(f)
        writer.writerow(['node id', 'predicted label'])
        writer.writerows([
            [i, pred[i].item()] for i in range(len(pred))
        ])
    print('Saved inference results to {}'.format(file_path))
if __name__ == '__main__':
main()
from ..utils.factory import ApplyPipelineFactory
import autopep8
import isort
import typer
import yaml
from pathlib import Path
def apply(
    cfg: str = typer.Option(..., help="config yaml file name")
):
    """Launch inference from a configuration file.

    Loads the user configuration, renders the matching apply pipeline into a
    Python script, formats it, and executes it in the current process.

    Parameters
    ----------
    cfg : str
        Path to the YAML configuration file generated by ``dgl configure-apply``.
    """
    # Close the config file deterministically; the original left the handle open.
    with Path(cfg).open("r") as f:
        user_cfg = yaml.safe_load(f)
    pipeline_name = user_cfg["pipeline_name"]
    output_file_content = ApplyPipelineFactory.registry[pipeline_name].gen_script(user_cfg)
    f_code = autopep8.fix_code(output_file_content, options={'aggressive': 1})
    f_code = isort.code(f_code)
    # NOTE(review): this exec's generated code; the config and checkpoint must
    # come from a trusted source.
    code = compile(f_code, 'dglgo_tmp.py', 'exec')
    exec(code, {'__name__': '__main__'})
......@@ -5,6 +5,8 @@ from .config_cli import config_app
from .train_cli import train
from .export_cli import export
from .recipe_cli import recipe_app
from .config_apply_cli import config_apply_app
from .apply_cli import apply
no_args_is_help = False
app = typer.Typer(no_args_is_help=True, add_completion=False)
......@@ -12,6 +14,8 @@ app.add_typer(config_app, name="configure", no_args_is_help=no_args_is_help)
app.add_typer(recipe_app, name="recipe", no_args_is_help=True)
app.command(help="Launch training", no_args_is_help=no_args_is_help)(train)
app.command(help="Export a runnable python script", no_args_is_help=no_args_is_help)(export)
app.add_typer(config_apply_app, name="configure-apply", no_args_is_help=no_args_is_help)
app.command(help="Launch inference", no_args_is_help=no_args_is_help)(apply)
def main():
    """Entry point for the ``dgl`` console command; dispatches to the typer app."""
    app()
......
from ..apply_pipeline import *
from ..utils.factory import ApplyPipelineFactory
import typer
# Typer sub-application backing ``dgl configure-apply``.
config_apply_app = typer.Typer(help="Generate a configuration file for inference")

# Register one sub-command per registered apply pipeline,
# e.g. ``dgl configure-apply nodepred``.
for key, pipeline in ApplyPipelineFactory.registry.items():
    config_apply_app.command(key, help=pipeline.get_description())(pipeline.get_cfg_func())
from ..utils.factory import ModelFactory, PipelineFactory
from ..utils.enter_config import UserConfig
from ..utils.factory import ModelFactory, PipelineFactory, ApplyPipelineFactory
import typer
from enum import Enum
import typing
......@@ -15,7 +14,11 @@ def export(
):
user_cfg = yaml.safe_load(Path(cfg).open("r"))
pipeline_name = user_cfg["pipeline_name"]
output_file_content = PipelineFactory.registry[pipeline_name].gen_script(user_cfg)
pipeline_mode = user_cfg["pipeline_mode"]
if pipeline_mode == 'train':
output_file_content = PipelineFactory.registry[pipeline_name].gen_script(user_cfg)
else:
output_file_content = ApplyPipelineFactory.registry[pipeline_name].gen_script(user_cfg)
f_code = autopep8.fix_code(output_file_content, options={'aggressive': 1})
f_code = isort.code(f_code)
......
from ..utils.factory import ModelFactory, PipelineFactory
from ..utils.enter_config import UserConfig
import typer
from enum import Enum
import typing
......
......@@ -14,7 +14,7 @@ pipeline_comments = {
"eval_batch_size": "Graph batch size when evaluating",
"num_workers": "Number of workers for data loading",
"num_epochs": "Number of training epochs",
"save_path": "Path to save the model"
"save_path": "Directory to save the experiment results"
}
class GraphpredPipelineCfg(BaseModel):
......@@ -28,12 +28,15 @@ class GraphpredPipelineCfg(BaseModel):
loss: str = "BCEWithLogitsLoss"
metric: str = "roc_auc_score"
num_epochs: int = 100
save_path: str = "model.pth"
save_path: str = "results"
@PipelineFactory.register("graphpred")
class GraphpredPipeline(PipelineBase):
def __init__(self):
self.pipeline_name = "graphpred"
self.pipeline = {
"name": "graphpred",
"mode": "train"
}
@classmethod
def setup_user_cfg_cls(cls):
......@@ -58,7 +61,8 @@ class GraphpredPipeline(PipelineBase):
):
self.__class__.setup_user_cfg_cls()
generated_cfg = {
"pipeline_name": self.pipeline_name,
"pipeline_name": self.pipeline["name"],
"pipeline_mode": self.pipeline["mode"],
"device": "cpu",
"data": {"name": data.name},
"model": {"name": model.value},
......@@ -104,6 +108,7 @@ class GraphpredPipeline(PipelineBase):
generated_user_cfg["data"].pop("split_ratio")
generated_user_cfg["data_name"] = generated_user_cfg["data"].pop("name")
generated_user_cfg.pop("pipeline_name")
generated_user_cfg.pop("pipeline_mode")
generated_user_cfg["model_name"] = generated_user_cfg["model"].pop("name")
generated_user_cfg["general_pipeline"]["optimizer"].pop("name")
generated_user_cfg["general_pipeline"]["lr_scheduler"].pop("name")
......
......@@ -2,10 +2,12 @@ import numpy as np
import sklearn
import torch
import torch.nn as nn
from dgl.data import AsGraphPredDataset
from dgl.dataloading import GraphDataLoader
import os
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm
from dgl.data import AsGraphPredDataset
from dgl.dataloading import GraphDataLoader
{{ data_import_code }}
{{ model_code }}
......@@ -61,16 +63,10 @@ def evaluate(device, loader, model):
return calc_metric(y_true, y_pred)
def main(run):
{{ user_cfg_str }}
def main(run, cfg, data):
device = cfg['device']
if not torch.cuda.is_available():
device = 'cpu'
pipeline_cfg = cfg['general_pipeline']
save_path = pipeline_cfg['save_path']
# load data
data = AsGraphPredDataset({{data_initialize_code}})
train_loader = GraphDataLoader(data[data.train_idx], batch_size=pipeline_cfg['train_batch_size'],
shuffle=True, num_workers=pipeline_cfg['num_workers'])
val_loader = GraphDataLoader(data[data.val_idx], batch_size=pipeline_cfg['eval_batch_size'],
......@@ -79,17 +75,6 @@ def main(run):
shuffle=False, num_workers=pipeline_cfg['num_workers'])
# create model
model_cfg = cfg["model"]
# data[0] is a tuple (g, label)
cfg["model"]["data_info"] = {
"name": cfg["data_name"],
"node_feat_size": data.node_feat_size,
"edge_feat_size": data.edge_feat_size,
"out_size": data.num_tasks
}
if cfg["model_name"] == 'pna':
in_deg = torch.cat([g.in_degrees() for (g, _) in data[data.train_idx]])
cfg["model"]["data_info"]["delta"] = torch.mean(torch.log(in_deg + 1))
model = {{ model_class_name }}(**cfg["model"])
model = model.to(device)
......@@ -100,12 +85,14 @@ def main(run):
optimizer, **pipeline_cfg["lr_scheduler"])
best_val_metric = 0.
tmp_cpt_path = 'checkpoint.pth'
for epoch in range(pipeline_cfg['num_epochs']):
train(device, train_loader, model, criterion, optimizer)
val_metric = evaluate(device, val_loader, model)
if val_metric >= best_val_metric:
best_val_metric = val_metric
torch.save(model.state_dict(), save_path)
torch.save(model.state_dict(), tmp_cpt_path)
print('Run {:d} | Epoch {:d} | Val Metric {:.4f} | Best Val Metric {:.4f}'.format(
run, epoch, val_metric, best_val_metric))
......@@ -114,18 +101,42 @@ def main(run):
else:
lr_scheduler.step()
model.load_state_dict(torch.load(save_path))
model.load_state_dict(torch.load(tmp_cpt_path))
os.remove(tmp_cpt_path)
test_metric = evaluate(device, test_loader, model)
print('Test Metric: {:.4f}'.format(test_metric))
cpt_path = os.path.join(pipeline_cfg["save_path"], 'run_{}.pth'.format(run))
torch.save({'cfg': cfg, 'model': model.state_dict()}, cpt_path)
print('Saved training checkpoint to {}'.format(cpt_path))
return test_metric
if __name__ == '__main__':
{{ user_cfg_str }}
if not torch.cuda.is_available():
cfg['device'] = 'cpu'
# load data
data = AsGraphPredDataset({{data_initialize_code}})
cfg["model"]["data_info"] = {
"name": cfg["data_name"],
"node_feat_size": data.node_feat_size,
"edge_feat_size": data.edge_feat_size,
"out_size": data.num_tasks
}
if cfg["model_name"] == 'pna':
in_deg = torch.cat([g.in_degrees() for (g, _) in data[data.train_idx]])
cfg["model"]["data_info"]["delta"] = torch.mean(torch.log(in_deg + 1))
os.makedirs(cfg['general_pipeline']["save_path"])
all_run_metrics = []
num_runs = {{ user_cfg.general_pipeline.num_runs }}
for run in range(num_runs):
print('Run experiment {:d}'.format(run))
test_metric = main(run)
test_metric = main(run, cfg, data)
all_run_metrics.append(test_metric)
avg_metric = np.round(np.mean(all_run_metrics), 6)
std_metric = np.round(np.std(all_run_metrics), 6)
......
......@@ -20,7 +20,7 @@ class LinkpredPipelineCfg(BaseModel):
eval_period: int = 5
optimizer: dict = {"name": "Adam", "lr": 0.005}
loss: str = "BCELoss"
save_path: str = "model.pth"
save_path: str = "results"
num_runs: int = 1
......@@ -30,7 +30,7 @@ pipeline_comments = {
"train_batch_size": "Edge batch size when training",
"num_epochs": "Number of training epochs",
"eval_period": "Interval epochs between evaluations",
"save_path": "Path to save the model",
"save_path": "Directory to save the experiment results",
"num_runs": "Number of experiments to run",
}
......@@ -42,14 +42,16 @@ class LinkpredPipeline(PipelineBase):
pipeline_name = "linkpred"
def __init__(self):
self.pipeline_name = "linkpred"
self.pipeline = {
"name": "linkpred",
"mode": "train"
}
@classmethod
def setup_user_cfg_cls(cls):
from ...utils.enter_config import UserConfig
class LinkPredUserConfig(UserConfig):
pipeline_name: str = "linkpred"
data: DataFactory.filter("linkpred").get_pydantic_config() = Field(..., discriminator="name")
node_model: NodeModelFactory.get_pydantic_model_config() = Field(...,
discriminator="name")
......@@ -79,7 +81,8 @@ class LinkpredPipeline(PipelineBase):
):
self.__class__.setup_user_cfg_cls()
generated_cfg = {
"pipeline_name": "linkpred",
"pipeline_name": self.pipeline["name"],
"pipeline_mode": self.pipeline["mode"],
"device": "cpu",
"data": {"name": data.name},
"neg_sampler": {"name": neg_sampler.value},
......@@ -138,6 +141,7 @@ class LinkpredPipeline(PipelineBase):
else:
generated_user_cfg["data"].pop("name")
generated_user_cfg.pop("pipeline_name")
generated_user_cfg.pop("pipeline_mode")
generated_user_cfg["node_model"].pop("name")
generated_user_cfg["edge_model"].pop("name")
generated_user_cfg["neg_sampler"].pop("name")
......
import dgl
from dgl.data import AsLinkPredDataset
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import numpy as np
import dgl
import os
from torch.utils.data import DataLoader
from dgl.data import AsLinkPredDataset
{{ data_import_code }}
{{ node_model_code}}
{{ edge_model_code }}
{{ node_model_code}}
class Model(nn.Module):
def __init__(self, node_model, edge_model, neg_sampler, eval_batch_size):
super().__init__()
......@@ -39,8 +39,8 @@ def calc_hitsk(y_pred_pos, y_pred_neg, k):
hitsK = (y_pred_pos > kth_score_in_negative_edges).float().mean()
return hitsK.item()
def train(cfg, pipeline_cfg, device, dataset, model, optimizer, loss_fcn):
train_g = dataset.train_graph
def train(cfg, pipeline_cfg, device, data, model, optimizer, loss_fcn):
train_g = data.train_graph
train_g = train_g.to(device)
node_feat = train_g.ndata['feat']
train_src, train_dst = train_g.edges()
......@@ -63,7 +63,7 @@ def train(cfg, pipeline_cfg, device, dataset, model, optimizer, loss_fcn):
optimizer.step()
with torch.no_grad():
model.eval()
val_neg_edges = dataset.val_edges[1]
val_neg_edges = data.val_edges[1]
val_neg_score = model.inference(train_g, node_feat, val_neg_edges)
train_hits = calc_hitsk(pos_score, val_neg_score, k=50)
print("Epoch {:05d} | Loss {:.4f} | Train Hits@50 {:.4f}".format(epoch, loss, train_hits))
......@@ -71,7 +71,7 @@ def train(cfg, pipeline_cfg, device, dataset, model, optimizer, loss_fcn):
if epoch != 0 and epoch % pipeline_cfg['eval_period'] == 0:
with torch.no_grad():
model.eval()
val_pos_edge, val_neg_edges = dataset.val_edges
val_pos_edge, val_neg_edges = data.val_edges
pos_result = model.inference(train_g, node_feat, val_pos_edge)
neg_result = model.inference(train_g, node_feat, val_neg_edges)
val_hits = calc_hitsk(pos_result, neg_result, k=50)
......@@ -79,7 +79,7 @@ def train(cfg, pipeline_cfg, device, dataset, model, optimizer, loss_fcn):
with torch.no_grad():
model.eval()
test_pos_edge, test_neg_edges = dataset.test_edges
test_pos_edge, test_neg_edges = data.test_edges
pos_result = model.inference(train_g, node_feat, test_pos_edge)
neg_result = model.inference(train_g, node_feat, test_neg_edges)
test_hits = calc_hitsk(pos_result, neg_result, k=50)
......@@ -87,40 +87,53 @@ def train(cfg, pipeline_cfg, device, dataset, model, optimizer, loss_fcn):
return test_hits
def main():
{{user_cfg_str}}
def main(run, cfg, data):
device = cfg['device']
pipeline_cfg = cfg['general_pipeline']
dataset = AsLinkPredDataset({{ data_initialize_code }})
if 'feat' not in dataset[0].ndata:
assert cfg["node_model"]["embed_size"] > 0, "Need to specify embed size if graph doesn't have feat in ndata"
cfg["node_model"]["data_info"] = {
"in_size": cfg["node_model"]['embed_size'] if cfg["node_model"]['embed_size'] > 0 else dataset[0].ndata['feat'].shape[1],
"out_size": pipeline_cfg['hidden_size'],
"num_nodes": dataset[0].num_nodes()
}
cfg["edge_model"]["data_info"] = {
"in_size": pipeline_cfg['hidden_size'],
"out_size": 1 # output each edge score
}
node_model = {{node_model_class_name}}(**cfg["node_model"])
edge_model = {{edge_model_class_name}}(**cfg["edge_model"])
neg_sampler = dgl.dataloading.negative_sampler.{{ neg_sampler_name }}(**cfg["neg_sampler"])
model = Model(node_model, edge_model, neg_sampler, pipeline_cfg["eval_batch_size"])
model = model.to(device)
params = model.parameters()
loss = torch.nn.{{ loss }}()
optimizer = torch.optim.Adam(params, **pipeline_cfg["optimizer"])
test_hits = train(cfg, pipeline_cfg, device, dataset, model, optimizer, loss)
torch.save(model.state_dict(), pipeline_cfg["save_path"])
optimizer = torch.optim.Adam(model.parameters(), **pipeline_cfg["optimizer"])
test_hits = train(cfg, pipeline_cfg, device, data, model, optimizer, loss)
cpt_path = os.path.join(pipeline_cfg["save_path"], 'run_{}.pth'.format(run))
torch.save({'cfg': cfg, 'model': model.state_dict()}, cpt_path)
print('Saved training checkpoint to {}'.format(cpt_path))
return test_hits
if __name__ == '__main__':
{{user_cfg_str}}
if not torch.cuda.is_available():
cfg['device'] = 'cpu'
# load data
data = AsLinkPredDataset({{ data_initialize_code }})
nmodel_cfg = cfg["node_model"]
pipeline_cfg = cfg['general_pipeline']
if 'feat' not in data[0].ndata:
assert nmodel_cfg["embed_size"] > 0, "Need to specify embed size if graph doesn't have feat in ndata"
cfg["node_model"]["data_info"] = {
"in_size": nmodel_cfg['embed_size'] if nmodel_cfg['embed_size'] > 0 else data[0].ndata['feat'].shape[1],
"out_size": pipeline_cfg['hidden_size'],
"num_nodes": data[0].num_nodes()
}
cfg["edge_model"]["data_info"] = {
"in_size": pipeline_cfg['hidden_size'],
"out_size": 1 # output each edge score
}
os.makedirs(pipeline_cfg["save_path"])
all_acc = []
num_runs = {{ user_cfg.general_pipeline.num_runs }}
for run in range(num_runs):
print(f'Run experiment #{run}')
test_acc = main()
test_acc = main(run, cfg, data)
print("Test Hits@50 {:.4f}".format(test_acc))
all_acc.append(test_acc)
avg_acc = np.round(np.mean(all_acc), 6)
......
......@@ -18,7 +18,7 @@ pipeline_comments = {
"patience": "Steps before early stop",
"checkpoint_path": "Early stop checkpoint model file path"
},
"save_path": "Path to save the model",
"save_path": "Directory to save the experiment results",
"num_runs": "Number of experiments to run",
}
......@@ -28,7 +28,7 @@ class NodepredPipelineCfg(BaseModel):
eval_period: int = 5
optimizer: dict = {"name": "Adam", "lr": 0.01, "weight_decay": 5e-4}
loss: str = "CrossEntropyLoss"
save_path: str = "model.pth"
save_path: str = "results"
num_runs: int = 1
@PipelineFactory.register("nodepred")
......@@ -37,7 +37,10 @@ class NodepredPipeline(PipelineBase):
user_cfg_cls = None
def __init__(self):
self.pipeline_name = "nodepred"
self.pipeline = {
"name": "nodepred",
"mode": "train"
}
@classmethod
def setup_user_cfg_cls(cls):
......@@ -62,7 +65,8 @@ class NodepredPipeline(PipelineBase):
):
self.__class__.setup_user_cfg_cls()
generated_cfg = {
"pipeline_name": self.pipeline_name,
"pipeline_name": self.pipeline["name"],
"pipeline_mode": self.pipeline["mode"],
"device": "cpu",
"data": {"name": data.name},
"model": {"name": model.value},
......@@ -108,12 +112,10 @@ class NodepredPipeline(PipelineBase):
generated_user_cfg = copy.deepcopy(user_cfg_dict)
if "split_ratio" in generated_user_cfg["data"]:
generated_user_cfg["data"].pop("split_ratio")
if len(generated_user_cfg["data"]) == 1:
generated_user_cfg.pop("data")
else:
generated_user_cfg["data"].pop("name")
generated_user_cfg["data_name"] = generated_user_cfg["data"].pop("name")
generated_user_cfg.pop("pipeline_name")
generated_user_cfg["model"].pop("name")
generated_user_cfg.pop("pipeline_mode")
generated_user_cfg["model_name"] = generated_user_cfg["model"].pop("name")
generated_user_cfg["general_pipeline"]["optimizer"].pop("name")
generated_train_cfg = copy.deepcopy(user_cfg_dict["general_pipeline"])
......@@ -128,4 +130,4 @@ class NodepredPipeline(PipelineBase):
@staticmethod
def get_description() -> str:
return "Node classification pipeline"
return "Node classification pipeline for training"
......@@ -3,6 +3,8 @@ import torch
import torch.nn as nn
import torch.nn.functional as F
import dgl
import os
from dgl.data import AsNodePredDataset
{{ data_import_code }}
......@@ -12,7 +14,7 @@ from dgl.data import AsNodePredDataset
class EarlyStopping:
def __init__(self,
patience: int = -1,
checkpoint_path: str = 'checkpoint.pt'):
checkpoint_path: str = 'checkpoint.pth'):
self.patience = patience
self.checkpoint_path = checkpoint_path
self.counter = 0
......@@ -41,6 +43,9 @@ class EarlyStopping:
def load_checkpoint(self, model):
model.load_state_dict(torch.load(self.checkpoint_path))
def close(self):
os.remove(self.checkpoint_path)
{% endif %}
......@@ -86,6 +91,7 @@ def train(cfg, pipeline_cfg, device, data, model, optimizer, loss_fcn):
{% if user_cfg.general_pipeline.early_stop %}
stopper.load_checkpoint(model)
stopper.close()
{% endif %}
model.eval()
with torch.no_grad():
......@@ -93,34 +99,42 @@ def train(cfg, pipeline_cfg, device, data, model, optimizer, loss_fcn):
test_acc = accuracy(logits[test_mask], label[test_mask])
return test_acc
def main():
{{ user_cfg_str }}
def main(run, cfg, data):
device = cfg['device']
pipeline_cfg = cfg['general_pipeline']
model = {{ model_class_name }}(**cfg["model"])
model = model.to(device)
loss = torch.nn.{{ user_cfg.general_pipeline.loss }}()
optimizer = torch.optim.{{ user_cfg.general_pipeline.optimizer.name }}(model.parameters(), **pipeline_cfg["optimizer"])
test_acc = train(cfg, pipeline_cfg, device, data, model, optimizer, loss)
cpt_path = os.path.join(pipeline_cfg["save_path"], 'run_{}.pth'.format(run))
torch.save({'cfg': cfg, 'model': model.state_dict()}, cpt_path)
print('Saved training checkpoint to {}'.format(cpt_path))
return test_acc
if __name__ == '__main__':
{{ user_cfg_str }}
if not torch.cuda.is_available():
cfg['device'] = 'cpu'
# load data
data = AsNodePredDataset({{data_initialize_code}})
# create model
model_cfg = cfg["model"]
cfg["model"]["data_info"] = {
"in_size": model_cfg['embed_size'] if model_cfg['embed_size'] > 0 else data[0].ndata['feat'].shape[1],
"out_size": data.num_classes,
"num_nodes": data[0].num_nodes()
}
model = {{ model_class_name }}(**cfg["model"])
model = model.to(device)
loss = torch.nn.{{ user_cfg.general_pipeline.loss }}()
optimizer = torch.optim.{{ user_cfg.general_pipeline.optimizer.name }}(model.parameters(), **pipeline_cfg["optimizer"])
# train
test_acc = train(cfg, pipeline_cfg, device, data, model, optimizer, loss)
torch.save(model.state_dict(), pipeline_cfg["save_path"])
return test_acc
if __name__ == '__main__':
os.makedirs(cfg['general_pipeline']["save_path"])
all_acc = []
num_runs = {{ user_cfg.general_pipeline.num_runs }}
for run in range(num_runs):
print(f'Run experiment #{run}')
test_acc = main()
test_acc = main(run, cfg, data)
print("Test Accuracy {:.4f}".format(test_acc))
all_acc.append(test_acc)
avg_acc = np.round(np.mean(all_acc), 6)
......
......@@ -43,7 +43,7 @@ pipeline_comments = {
"eval_batch_size": "Batch size of seed nodes in training stage in evaluation stage",
"eval_num_workers": "Number of workers to accelerate the graph data processing step in evaluation stage"
},
"save_path": "Path to save the model",
"save_path": "Directory to save the experiment results",
"num_runs": "Number of experiments to run",
}
......@@ -55,12 +55,15 @@ class NodepredNSPipelineCfg(BaseModel):
optimizer: dict = {"name": "Adam", "lr": 0.005, "weight_decay": 0.0}
loss: str = "CrossEntropyLoss"
num_runs: int = 1
save_path: str = "model.pth"
save_path: str = "results"
@PipelineFactory.register("nodepred-ns")
class NodepredNsPipeline(PipelineBase):
def __init__(self):
self.pipeline_name = "nodepred-ns"
self.pipeline = {
"name": "nodepred-ns",
"mode": "train"
}
self.default_cfg = None
@classmethod
......@@ -87,7 +90,8 @@ class NodepredNsPipeline(PipelineBase):
):
self.__class__.setup_user_cfg_cls()
generated_cfg = {
"pipeline_name": "nodepred-ns",
"pipeline_name": self.pipeline["name"],
"pipeline_mode": self.pipeline["mode"],
"device": "cpu",
"data": {"name": data.name},
"model": {"name": model.value},
......@@ -143,13 +147,10 @@ class NodepredNsPipeline(PipelineBase):
if "split_ratio" in generated_user_cfg["data"]:
generated_user_cfg["data"].pop("split_ratio")
if len(generated_user_cfg["data"]) == 1:
generated_user_cfg.pop("data")
else:
generated_user_cfg["data"].pop("name")
generated_user_cfg["data_name"] = generated_user_cfg["data"].pop("name")
generated_user_cfg.pop("pipeline_name")
generated_user_cfg["model"].pop("name")
generated_user_cfg.pop("pipeline_mode")
generated_user_cfg["model_name"] = generated_user_cfg["model"].pop("name")
generated_user_cfg['general_pipeline']["optimizer"].pop("name")
......@@ -163,4 +164,4 @@ class NodepredNsPipeline(PipelineBase):
@staticmethod
def get_description() -> str:
return "Node classification neighbor sampling pipeline"
return "Node classification neighbor sampling pipeline for training"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment