"...api/git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "4c6152c2fb0ade468aadb417102605a07a8635d3"
Unverified commit efe28493 authored by yxy235, committed by GitHub

[GraphBolt] Modify examples to use `seeds`. (#7231)


Co-authored-by: Ubuntu <ubuntu@ip-172-31-0-133.us-west-2.compute.internal>
parent b725ee53
@@ -205,7 +205,7 @@ if __name__ == "__main__":
    )
    args = parser.parse_args()
-    dataset = gb.BuiltinDataset("ogbn-products").load()
+    dataset = gb.BuiltinDataset("ogbn-products-seeds").load()
    datamodule = DataModule(
        dataset,
        [10, 10, 10],
......
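The change above only swaps the builtin dataset name for its "seeds" variant. As a point of reference, here is a minimal sketch of loading such a dataset and peeking at its training items; the tasks/ItemSet layout shown is assumed from the other examples in this commit, not verified against this script.

# Hypothetical sketch: load a "-seeds" builtin dataset and inspect its splits.
import dgl.graphbolt as gb

dataset = gb.BuiltinDataset("ogbn-products-seeds").load()
graph = dataset.graph        # sampling graph in fused CSC form
features = dataset.feature   # feature store for node features
task = dataset.tasks[0]      # node classification task

# After this commit the item sets are keyed by "seeds" (plus "labels")
# instead of "seed_nodes"; printing the set shows the field names.
print(task.train_set)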
@@ -51,7 +51,7 @@ import torch
import torch.nn as nn
import torch.nn.functional as F
import tqdm
-from ogb.linkproppred import Evaluator
+from torchmetrics.retrieval import RetrievalMRR


class SAGE(nn.Module):
@@ -243,43 +243,38 @@ def create_dataloader(args, graph, features, itemset, is_train=True):
@torch.no_grad()
-def compute_mrr(args, model, evaluator, node_emb, src, dst, neg_dst):
+def compute_mrr(args, model, node_emb, seeds, labels, indexes):
    """Compute the Mean Reciprocal Rank (MRR) for given source and destination
    nodes.

    This function computes the MRR for a set of node pairs, dividing the task
    into batches to handle potentially large graphs.
    """
-    rr = torch.zeros(src.shape[0])
-    # Loop over node pairs in batches.
-    for start in tqdm.trange(
-        0, src.shape[0], args.eval_batch_size, desc="Evaluate"
-    ):
-        end = min(start + args.eval_batch_size, src.shape[0])
-
-        # Concatenate positive and negative destination nodes.
-        all_dst = torch.cat([dst[start:end, None], neg_dst[start:end]], 1)
-
-        # Fetch embeddings for current batch of source and destination nodes.
-        h_src = node_emb[src[start:end]][:, None, :].to(args.device)
-        h_dst = (
-            node_emb[all_dst.view(-1)].view(*all_dst.shape, -1).to(args.device)
-        )
-
-        # Compute prediction scores using the model.
-        pred = model.predictor(h_src * h_dst).squeeze(-1)
-
-        # Evaluate the predictions to obtain MRR values.
-        input_dict = {"y_pred_pos": pred[:, 0], "y_pred_neg": pred[:, 1:]}
-        rr[start:end] = evaluator.eval(input_dict)["mrr_list"]
-    return rr.mean()
+    preds = torch.empty(seeds.shape[0])
+    mrr = RetrievalMRR()
+    seeds_src, seeds_dst = seeds.T
+    # The constant 1001 comes from the negative sampling ratio of 1000 in the
+    # `ogbl-citation2` dataset: each source pairs with 1 positive and 1000
+    # negative destinations.
+    eval_size = args.eval_batch_size * 1001
+    # Loop over node pairs in batches.
+    for start in tqdm.trange(0, seeds_src.shape[0], eval_size, desc="Evaluate"):
+        end = min(start + eval_size, seeds_src.shape[0])
+
+        # Fetch embeddings for current batch of source and destination nodes.
+        h_src = node_emb[seeds_src[start:end]].to(args.device)
+        h_dst = node_emb[seeds_dst[start:end]].to(args.device)
+
+        # Compute prediction scores using the model.
+        pred = model.predictor(h_src * h_dst).squeeze()
+        preds[start:end] = pred
+    return mrr(preds, labels, indexes=indexes)
@torch.no_grad()
def evaluate(args, model, graph, features, all_nodes_set, valid_set, test_set):
    """Evaluate the model on validation and test sets."""
    model.eval()
-    evaluator = Evaluator(name="ogbl-citation2")

    dataloader = create_dataloader(
        args, graph, features, all_nodes_set, is_train=False
@@ -292,13 +287,13 @@ def evaluate(args, model, graph, features, all_nodes_set, valid_set, test_set):
    # Loop over both validation and test sets.
    for split in [valid_set, test_set]:
        # Unpack the item set.
-        src = split._items[0][:, 0].to(node_emb.device)
-        dst = split._items[0][:, 1].to(node_emb.device)
-        neg_dst = split._items[1].to(node_emb.device)
+        seeds = split._items[0].to(node_emb.device)
+        labels = split._items[1].to(node_emb.device)
+        indexes = split._items[2].to(node_emb.device)

        # Compute MRR values for the current split.
        results.append(
-            compute_mrr(args, model, evaluator, node_emb, src, dst, neg_dst)
+            compute_mrr(args, model, node_emb, seeds, labels, indexes)
        )
    return results
@@ -313,7 +308,8 @@ def train(args, model, graph, features, train_set):
    start_epoch_time = time.time()
    for step, data in tqdm.tqdm(enumerate(dataloader)):
        # Get node pairs with labels for loss calculation.
-        compacted_pairs, labels = data.node_pairs_with_labels
+        compacted_seeds = data.compacted_seeds.T
+        labels = data.labels

        node_feature = data.node_features["feat"]
        blocks = data.blocks
@@ -321,7 +317,7 @@ def train(args, model, graph, features, train_set):
        # Get the embeddings of the input nodes.
        y = model(blocks, node_feature)
        logits = model.predictor(
-            y[compacted_pairs[0]] * y[compacted_pairs[1]]
+            y[compacted_seeds[0]] * y[compacted_seeds[1]]
        ).squeeze()

        # Compute loss.
@@ -389,7 +385,7 @@ def main(args):
    # Load and preprocess dataset.
    print("Loading data")
-    dataset = gb.BuiltinDataset("ogbl-citation2").load()
+    dataset = gb.BuiltinDataset("ogbl-citation2-seeds").load()

    # Move the dataset to the selected storage.
    if args.storage_device == "pinned":
......
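The Evaluator-to-torchmetrics switch above works because RetrievalMRR groups flat prediction/label vectors by a query index: every source node contributes one positive and 1000 negative candidates that all share the same index, and the metric averages the reciprocal rank of the positive within each group. A small self-contained sketch of that grouping (the toy sizes and scores below are illustrative, not taken from the dataset):

# Sketch: how RetrievalMRR consumes flat preds/labels/indexes.
import torch
from torchmetrics.retrieval import RetrievalMRR

# Two "queries" (source nodes), each with 1 positive and 3 negatives.
preds = torch.tensor([0.9, 0.2, 0.1, 0.3, 0.4, 0.8, 0.3, 0.2])
labels = torch.tensor([1, 0, 0, 0, 1, 0, 0, 0], dtype=torch.bool)
indexes = torch.tensor([0, 0, 0, 0, 1, 1, 1, 1])

mrr = RetrievalMRR()
# Group 0: positive ranked 1st -> 1.0; group 1: positive ranked 2nd -> 0.5.
print(mrr(preds, labels, indexes=indexes))  # tensor(0.7500)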
@@ -101,7 +101,7 @@ def create_dataloader(
    # ensures that the rest of the operations run on the GPU.
    ############################################################################
    if args.storage_device != "cpu":
-        datapipe = datapipe.copy_to(device=device, extra_attrs=["seed_nodes"])
+        datapipe = datapipe.copy_to(device=device, extra_attrs=["seeds"])

    ############################################################################
    # [Step-3]:
@@ -364,8 +364,12 @@ def parse_args():
    parser.add_argument(
        "--dataset",
        type=str,
-        default="ogbn-products",
-        choices=["ogbn-arxiv", "ogbn-products", "ogbn-papers100M"],
+        default="ogbn-products-seeds",
+        choices=[
+            "ogbn-arxiv-seeds",
+            "ogbn-products-seeds",
+            "ogbn-papers100M-seeds",
+        ],
        help="The dataset we can use for node classification example. Currently"
        " ogbn-products, ogbn-arxiv, ogbn-papers100M datasets are supported.",
    )
......
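The extra_attrs=["seeds"] argument names the mini-batch attribute that the CopyTo stage moves to the GPU along with the batch, so the sampling and feature-fetch stages after it can run on-device. A hedged sketch of where that stage sits in a typical GraphBolt pipeline (batch size, fanouts, and the feature key are illustrative values, not taken from this file):

# Sketch of a GraphBolt datapipe that samples on the GPU.
import dgl.graphbolt as gb

def build_dataloader(itemset, graph, features, device):
    datapipe = gb.ItemSampler(itemset, batch_size=1024, shuffle=True)
    # Move the seeds to the target device first so that neighbor sampling
    # and feature fetching below run there ("seeds" replaces "seed_nodes").
    datapipe = datapipe.copy_to(device, extra_attrs=["seeds"])
    datapipe = datapipe.sample_neighbor(graph, fanouts=[10, 10])
    datapipe = datapipe.fetch_feature(features, node_feature_keys=["feat"])
    return gb.DataLoader(datapipe)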
@@ -208,8 +208,9 @@ def main():
    parser.add_argument(
        "--dataset",
        type=str,
-        default="ogbn-products",
-        help='Name of the dataset to use (e.g., "ogbn-products", "ogbn-arxiv")',
+        default="ogbn-products-seeds",
+        help='Name of the dataset to use (e.g., "ogbn-products-seeds",'
+        + ' "ogbn-arxiv-seeds")',
    )
    parser.add_argument(
        "--epochs", type=int, default=10, help="Number of training epochs."
......
@@ -175,7 +175,7 @@ def create_dataloader(
    )
    # Copy the data to the specified device.
    if args.graph_device != "cpu":
-        datapipe = datapipe.copy_to(device=device, extra_attrs=["seed_nodes"])
+        datapipe = datapipe.copy_to(device=device, extra_attrs=["seeds"])
    # Sample neighbors for each node in the mini-batch.
    datapipe = getattr(datapipe, args.sample_mode)(
        graph, fanout if job != "infer" else [-1]
@@ -320,8 +320,12 @@ def parse_args():
    parser.add_argument(
        "--dataset",
        type=str,
-        default="ogbn-products",
-        choices=["ogbn-arxiv", "ogbn-products", "ogbn-papers100M"],
+        default="ogbn-products-seeds",
+        choices=[
+            "ogbn-arxiv-seeds",
+            "ogbn-products-seeds",
+            "ogbn-papers100M-seeds",
+        ],
        help="The dataset we can use for node classification example. Currently"
        " ogbn-products, ogbn-arxiv, ogbn-papers100M datasets are supported.",
    )
......
@@ -85,7 +85,8 @@ def evaluate(model, dataset, device):
    labels = []
    for step, data in enumerate(dataloader):
        # Get node pairs with labels for loss calculation.
-        compacted_pairs, label = data.node_pairs_with_labels
+        compacted_seeds = data.compacted_seeds.T
+        label = data.labels

        # The features of sampled nodes.
        x = data.node_features["feat"]
@@ -94,7 +95,7 @@ def evaluate(model, dataset, device):
        y = model(data.blocks, x)
        logit = (
            model.predictor(
-                y[compacted_pairs[0].long()] * y[compacted_pairs[1].long()]
+                y[compacted_seeds[0].long()] * y[compacted_seeds[1].long()]
            )
            .squeeze()
            .detach()
@@ -126,7 +127,8 @@ def train(model, dataset, device):
    ########################################################################
    for step, data in enumerate(dataloader):
        # Get node pairs with labels for loss calculation.
-        compacted_pairs, labels = data.node_pairs_with_labels
+        compacted_seeds = data.compacted_seeds.T
+        labels = data.labels

        # The features of sampled nodes.
        x = data.node_features["feat"]
@@ -134,7 +136,7 @@ def train(model, dataset, device):
        # Forward.
        y = model(data.blocks, x)
        logits = model.predictor(
-            y[compacted_pairs[0].long()] * y[compacted_pairs[1].long()]
+            y[compacted_seeds[0].long()] * y[compacted_seeds[1].long()]
        ).squeeze()

        # Compute loss.
@@ -156,7 +158,7 @@ if __name__ == "__main__":
    # Load and preprocess dataset.
    print("Loading data...")
-    dataset = gb.BuiltinDataset("cora").load()
+    dataset = gb.BuiltinDataset("cora-seeds").load()

    # If a CUDA device is selected, we pin the graph and the features so that
    # the GPU can access them.
......
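The replacement of node_pairs_with_labels with compacted_seeds and labels assumes that compacted_seeds is an (N, 2) tensor of [source, destination] pairs; transposing it makes row 0 the sources and row 1 the destinations, which is exactly what the [0]/[1] indexing into the predictor expects. A tiny sketch of that reshaping with a plain dot-product scorer standing in for model.predictor:

# Sketch: why compacted_seeds.T is indexed with [0] and [1].
import torch

compacted_seeds = torch.tensor([[0, 3], [1, 4], [2, 5]])  # N=3 (src, dst) pairs
pairs = compacted_seeds.T            # shape (2, N)
src, dst = pairs[0], pairs[1]        # src = [0, 1, 2], dst = [3, 4, 5]

y = torch.randn(6, 8)                # stand-in for the node embeddings
scores = (y[src] * y[dst]).sum(-1)   # one score per pair, shape (N,)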
@@ -18,7 +18,7 @@ def create_dataloader(dataset, itemset, device):
    datapipe = gb.ItemSampler(itemset, batch_size=16)
    # Copy the mini-batch to the designated device for sampling and training.
-    datapipe = datapipe.copy_to(device, extra_attrs=["seed_nodes"])
+    datapipe = datapipe.copy_to(device, extra_attrs=["seeds"])
    # Sample neighbors for the seed nodes.
    datapipe = datapipe.sample_neighbor(dataset.graph, fanouts=[4, 2])
@@ -117,7 +117,7 @@ if __name__ == "__main__":
    # Load and preprocess dataset.
    print("Loading data...")
-    dataset = gb.BuiltinDataset("cora").load()
+    dataset = gb.BuiltinDataset("cora-seeds").load()

    # If a CUDA device is selected, we pin the graph and the features so that
    # the GPU can access them.
......
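For completeness, a hedged sketch of how the mini-batches produced by this pipeline are typically consumed in a training step; it uses only the MiniBatch attributes that appear elsewhere in this commit (blocks, node_features, labels), while `model` and `opt` are assumed to be a compatible DGL module and a torch optimizer:

# Sketch of one training epoch over GraphBolt mini-batches.
import torch.nn.functional as F

def train_one_epoch(dataloader, model, opt):
    model.train()
    for step, data in enumerate(dataloader):
        x = data.node_features["feat"]  # features of the sampled nodes
        y = data.labels                 # labels of the seed nodes
        y_hat = model(data.blocks, x)   # message passing over sampled blocks
        loss = F.cross_entropy(y_hat, y)
        opt.zero_grad()
        loss.backward()
        opt.step()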
@@ -117,7 +117,7 @@ def create_dataloader(
    # Move the mini-batch to the appropriate device.
    # `device`:
    #   The device to move the mini-batch to.
-    datapipe = datapipe.copy_to(device, extra_attrs=["seed_nodes"])
+    datapipe = datapipe.copy_to(device, extra_attrs=["seeds"])
    # Sample neighbors for each seed node in the mini-batch.
    # `graph`:
@@ -153,7 +153,7 @@ def extract_embed(node_embed, input_nodes):
def extract_node_features(name, block, data, node_embed, device):
    """Extract the node features from embedding layer or raw features."""
-    if name == "ogbn-mag":
+    if name == "ogbn-mag-seeds":
        input_nodes = {
            k: v.to(device) for k, v in block.srcdata[dgl.NID].items()
        }
@@ -419,8 +419,8 @@ def evaluate(
    model.eval()
    category = "paper"
    # An evaluator for the dataset.
-    if name == "ogbn-mag":
-        evaluator = Evaluator(name=name)
+    if name == "ogbn-mag-seeds":
+        evaluator = Evaluator(name="ogbn-mag")
    else:
        evaluator = MAG240MEvaluator()
@@ -578,7 +578,7 @@ def main(args):
    # `institution` are generated in advance and stored in the feature store.
    # For `ogbn-mag`, we generate the features on the fly.
    embed_layer = None
-    if args.dataset == "ogbn-mag":
+    if args.dataset == "ogbn-mag-seeds":
        # Create the embedding layer and move it to the appropriate device.
        embed_layer = rel_graph_embed(g, feat_size).to(device)
        print(
@@ -652,9 +652,9 @@ if __name__ == "__main__":
    parser.add_argument(
        "--dataset",
        type=str,
-        default="ogbn-mag",
-        choices=["ogbn-mag", "ogb-lsc-mag240m"],
-        help="Dataset name. Possible values: ogbn-mag, ogb-lsc-mag240m",
+        default="ogbn-mag-seeds",
+        choices=["ogbn-mag-seeds", "ogb-lsc-mag240m"],
+        help="Dataset name. Possible values: ogbn-mag-seeds, ogb-lsc-mag240m",
    )
    parser.add_argument("--num_epochs", type=int, default=3)
    parser.add_argument("--num_workers", type=int, default=0)
......
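Note the asymmetry this file now has: the GraphBolt dataset is loaded as "ogbn-mag-seeds", but ogb's Evaluator only recognizes the original OGB names, so it is still constructed with "ogbn-mag". A short sketch of what that evaluator expects, assuming the standard OGB node-property input format of (N, 1)-shaped tensors:

# Sketch: the OGB evaluator keeps the original dataset name.
import torch
from ogb.nodeproppred import Evaluator

evaluator = Evaluator(name="ogbn-mag")   # not "ogbn-mag-seeds"
y_true = torch.tensor([[0], [2], [1]])   # ground-truth classes, shape (N, 1)
y_pred = torch.tensor([[0], [1], [1]])   # argmax predictions, shape (N, 1)
print(evaluator.eval({"y_true": y_true, "y_pred": y_pred})["acc"])  # ~0.667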
@@ -116,7 +116,7 @@ class SparseNeighborSampler(SubgraphSampler):
    def sample_subgraphs(self, seeds, seeds_timestamp=None):
        sampled_matrices = []
-        src = seeds
+        src = seeds.long()

        #####################################################################
        # (HIGHLIGHT) Using the sparse sample operator to perform random
@@ -242,7 +242,7 @@ if __name__ == "__main__":
    # Load and preprocess dataset.
    print("Loading data")
    device = torch.device("cpu" if args.mode == "cpu" else "cuda")
-    dataset = gb.BuiltinDataset("ogbn-products").load()
+    dataset = gb.BuiltinDataset("ogbn-products-seeds").load()
    g = dataset.graph
    features = dataset.feature
@@ -254,7 +254,7 @@ if __name__ == "__main__":
    # Create sparse.
    N = g.num_nodes
-    A = dglsp.from_csc(g.csc_indptr, g.indices, shape=(N, N))
+    A = dglsp.from_csc(g.csc_indptr.long(), g.indices.long(), shape=(N, N))

    # Model training.
    print("Training...")
......
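The .long() casts are needed because dgl.sparse builds its matrices from int64 index tensors, while the fused CSC graph's csc_indptr/indices may be stored in a narrower integer type (that storage dtype is an assumption here). A tiny sketch of the same construction from raw CSC arrays:

# Sketch: build a DGL sparse matrix from CSC arrays, casting indices to int64.
import torch
import dgl.sparse as dglsp

# A 3x3 adjacency in CSC form; int32 mimics a compact storage dtype.
indptr = torch.tensor([0, 1, 3, 4], dtype=torch.int32)
indices = torch.tensor([1, 0, 2, 1], dtype=torch.int32)

A = dglsp.from_csc(indptr.long(), indices.long(), shape=(3, 3))
print(A.shape, A.nnz)  # (3, 3) 4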
@@ -996,12 +996,22 @@ class BuiltinDataset(OnDiskDataset):
    )
    _datasets = [
        "cora",
+        "cora-seeds",
        "ogbn-mag",
+        "ogbn-mag-seeds",
        "ogbl-citation2",
+        "ogbl-citation2-seeds",
        "ogbn-products",
+        "ogbn-products-seeds",
        "ogbn-arxiv",
+        "ogbn-arxiv-seeds",
    ]
-    _large_datasets = ["ogb-lsc-mag240m", "ogbn-papers100M"]
+    _large_datasets = [
+        "ogb-lsc-mag240m",
+        "ogb-lsc-mag240m-seeds",
+        "ogbn-papers100M",
+        "ogbn-papers100M-seeds",
+    ]
    _all_datasets = _datasets + _large_datasets

    def __init__(self, name: str, root: str = "datasets") -> OnDiskDataset:
......