Unverified commit 053c8221, authored by Muhammed Fatih BALIN and committed by GitHub
Browse files

[GraphBolt][CUDA] Enable GPU sampling in examples (#6861)

parent c81ff6ad
...@@ -144,6 +144,16 @@ def create_dataloader(args, graph, features, itemset, is_train=True): ...@@ -144,6 +144,16 @@ def create_dataloader(args, graph, features, itemset, is_train=True):
shuffle=is_train, shuffle=is_train,
) )
############################################################################
# [Input]:
# 'device': The device to copy the data to.
# [Output]:
# A CopyTo object to copy the data to the specified device. Copying here
# ensures that the rest of the operations run on the GPU.
############################################################################
if args.storage_device != "cpu":
datapipe = datapipe.copy_to(device=args.device)
############################################################################ ############################################################################
# [Input]: # [Input]:
# 'args.neg_ratio': Specify the ratio of negative to positive samples. # 'args.neg_ratio': Specify the ratio of negative to positive samples.
...@@ -216,7 +226,8 @@ def create_dataloader(args, graph, features, itemset, is_train=True): ...@@ -216,7 +226,8 @@ def create_dataloader(args, graph, features, itemset, is_train=True):
# [Output]: # [Output]:
# A CopyTo object to copy the data to the specified device. # A CopyTo object to copy the data to the specified device.
############################################################################ ############################################################################
datapipe = datapipe.copy_to(device=args.device) if args.storage_device == "cpu":
datapipe = datapipe.copy_to(device=args.device)
############################################################################ ############################################################################
# [Input]: # [Input]:
...@@ -304,11 +315,11 @@ def train(args, model, graph, features, train_set): ...@@ -304,11 +315,11 @@ def train(args, model, graph, features, train_set):
optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)
dataloader = create_dataloader(args, graph, features, train_set) dataloader = create_dataloader(args, graph, features, train_set)
for epoch in tqdm.trange(args.epochs): for epoch in range(args.epochs):
model.train() model.train()
total_loss = 0 total_loss = 0
start_epoch_time = time.time() start_epoch_time = time.time()
for step, data in enumerate(dataloader): for step, data in tqdm.tqdm(enumerate(dataloader)):
# Get node pairs with labels for loss calculation. # Get node pairs with labels for loss calculation.
compacted_pairs, labels = data.node_pairs_with_labels compacted_pairs, labels = data.node_pairs_with_labels
...@@ -366,24 +377,30 @@ def parse_args(): ...@@ -366,24 +377,30 @@ def parse_args():
help="Whether to exclude reverse edges during sampling. Default: 1", help="Whether to exclude reverse edges during sampling. Default: 1",
) )
parser.add_argument( parser.add_argument(
"--device", "--mode",
default="cpu", default="pinned-cuda",
choices=["cpu", "cuda"], choices=["cpu-cpu", "cpu-cuda", "pinned-cuda", "cuda-cuda"],
help="Train device: 'cpu' for CPU, 'cuda' for GPU.", help="Dataset storage placement and Train device: 'cpu' for CPU and RAM,"
" 'pinned' for pinned memory in RAM, 'cuda' for GPU and GPU memory.",
) )
return parser.parse_args() return parser.parse_args()
def main(args): def main(args):
if not torch.cuda.is_available(): if not torch.cuda.is_available():
args.device = "cpu" args.mode = "cpu-cpu"
print(f"Training in {args.device} mode.") print(f"Training in {args.mode} mode.")
args.storage_device, args.device = args.mode.split("-")
args.device = torch.device(args.device)
# Load and preprocess dataset. # Load and preprocess dataset.
print("Loading data") print("Loading data")
dataset = gb.BuiltinDataset("ogbl-citation2").load() dataset = gb.BuiltinDataset("ogbl-citation2").load()
graph = dataset.graph
features = dataset.feature # Move the dataset to the selected storage.
graph = dataset.graph.to(args.storage_device)
features = dataset.feature.to(args.storage_device)
train_set = dataset.tasks[0].train_set train_set = dataset.tasks[0].train_set
args.fanout = list(map(int, args.fanout.split(","))) args.fanout = list(map(int, args.fanout.split(",")))
......
...@@ -92,6 +92,19 @@ def create_dataloader( ...@@ -92,6 +92,19 @@ def create_dataloader(
############################################################################ ############################################################################
# [Step-2]: # [Step-2]:
# self.copy_to()
# [Input]:
# 'device': The device to copy the data to.
# 'extra_attrs': The extra attributes to copy.
# [Output]:
# A CopyTo object to copy the data to the specified device. Copying here
# ensures that the rest of the operations run on the GPU.
############################################################################
if args.storage_device != "cpu":
datapipe = datapipe.copy_to(device=device, extra_attrs=["seed_nodes"])
############################################################################
# [Step-3]:
# self.sample_neighbor() # self.sample_neighbor()
# [Input]: # [Input]:
# 'graph': The network topology for sampling. # 'graph': The network topology for sampling.
...@@ -109,7 +122,7 @@ def create_dataloader( ...@@ -109,7 +122,7 @@ def create_dataloader(
) )
############################################################################ ############################################################################
# [Step-3]: # [Step-4]:
# self.fetch_feature() # self.fetch_feature()
# [Input]: # [Input]:
# 'features': The node features. # 'features': The node features.
...@@ -125,17 +138,18 @@ def create_dataloader( ...@@ -125,17 +138,18 @@ def create_dataloader(
datapipe = datapipe.fetch_feature(features, node_feature_keys=["feat"]) datapipe = datapipe.fetch_feature(features, node_feature_keys=["feat"])
############################################################################ ############################################################################
# [Step-4]: # [Step-5]:
# self.copy_to() # self.copy_to()
# [Input]: # [Input]:
# 'device': The device to copy the data to. # 'device': The device to copy the data to.
# [Output]: # [Output]:
# A CopyTo object to copy the data to the specified device. # A CopyTo object to copy the data to the specified device.
############################################################################ ############################################################################
datapipe = datapipe.copy_to(device=device) if args.storage_device == "cpu":
datapipe = datapipe.copy_to(device=device)
############################################################################ ############################################################################
# [Step-5]: # [Step-6]:
# gb.DataLoader() # gb.DataLoader()
# [Input]: # [Input]:
# 'datapipe': The datapipe object to be used for data loading. # 'datapipe': The datapipe object to be used for data loading.
...@@ -259,7 +273,7 @@ def evaluate(args, model, graph, features, itemset, num_classes): ...@@ -259,7 +273,7 @@ def evaluate(args, model, graph, features, itemset, num_classes):
job="evaluate", job="evaluate",
) )
for step, data in tqdm(enumerate(dataloader)): for step, data in tqdm(enumerate(dataloader), "Evaluating"):
x = data.node_features["feat"] x = data.node_features["feat"]
y.append(data.labels) y.append(data.labels)
y_hats.append(model(data.blocks, x)) y_hats.append(model(data.blocks, x))
...@@ -289,7 +303,7 @@ def train(args, graph, features, train_set, valid_set, num_classes, model): ...@@ -289,7 +303,7 @@ def train(args, graph, features, train_set, valid_set, num_classes, model):
t0 = time.time() t0 = time.time()
model.train() model.train()
total_loss = 0 total_loss = 0
for step, data in enumerate(dataloader): for step, data in tqdm(enumerate(dataloader), "Training"):
# The input features from the source nodes in the first layer's # The input features from the source nodes in the first layer's
# computation graph. # computation graph.
x = data.node_features["feat"] x = data.node_features["feat"]
...@@ -349,28 +363,30 @@ def parse_args(): ...@@ -349,28 +363,30 @@ def parse_args():
" identical with the number of layers in your model. Default: 10,10,10", " identical with the number of layers in your model. Default: 10,10,10",
) )
parser.add_argument( parser.add_argument(
"--device", "--mode",
default="cpu", default="pinned-cuda",
choices=["cpu", "cuda"], choices=["cpu-cpu", "cpu-cuda", "pinned-cuda", "cuda-cuda"],
help="Train device: 'cpu' for CPU, 'cuda' for GPU.", help="Dataset storage placement and Train device: 'cpu' for CPU and RAM,"
" 'pinned' for pinned memory in RAM, 'cuda' for GPU and GPU memory.",
) )
return parser.parse_args() return parser.parse_args()
def main(args): def main(args):
if not torch.cuda.is_available(): if not torch.cuda.is_available():
args.device = "cpu" args.mode = "cpu-cpu"
print(f"Training in {args.device} mode.") print(f"Training in {args.mode} mode.")
args.storage_device, args.device = args.mode.split("-")
args.device = torch.device(args.device) args.device = torch.device(args.device)
# Load and preprocess dataset. # Load and preprocess dataset.
print("Loading data...") print("Loading data...")
dataset = gb.BuiltinDataset("ogbn-products").load() dataset = gb.BuiltinDataset("ogbn-products").load()
graph = dataset.graph # Move the dataset to the selected storage.
# Currently the neighbor-sampling process can only be done on the CPU, graph = dataset.graph.to(args.storage_device)
# therefore there is no need to copy the graph to the GPU. features = dataset.feature.to(args.storage_device)
features = dataset.feature
train_set = dataset.tasks[0].train_set train_set = dataset.tasks[0].train_set
valid_set = dataset.tasks[0].validation_set valid_set = dataset.tasks[0].validation_set
test_set = dataset.tasks[0].test_set test_set = dataset.tasks[0].test_set
......
...@@ -18,7 +18,7 @@ from torcheval.metrics import BinaryAUROC ...@@ -18,7 +18,7 @@ from torcheval.metrics import BinaryAUROC
############################################################################ ############################################################################
# (HIGHLIGHT) Create a single process dataloader with dgl graphbolt package. # (HIGHLIGHT) Create a single process dataloader with dgl graphbolt package.
############################################################################ ############################################################################
def create_dataloader(dateset, device, is_train=True): def create_dataloader(dataset, device, is_train=True):
# The second of two tasks in the dataset is link prediction. # The second of two tasks in the dataset is link prediction.
task = dataset.tasks[1] task = dataset.tasks[1]
itemset = task.train_set if is_train else task.test_set itemset = task.train_set if is_train else task.test_set
...@@ -26,6 +26,9 @@ def create_dataloader(dateset, device, is_train=True): ...@@ -26,6 +26,9 @@ def create_dataloader(dateset, device, is_train=True):
# Sample seed edges from the itemset. # Sample seed edges from the itemset.
datapipe = gb.ItemSampler(itemset, batch_size=256) datapipe = gb.ItemSampler(itemset, batch_size=256)
# Copy the mini-batch to the designated device for sampling and training.
datapipe = datapipe.copy_to(device)
if is_train: if is_train:
# Sample negative edges for the seed edges. # Sample negative edges for the seed edges.
datapipe = datapipe.sample_uniform_negative( datapipe = datapipe.sample_uniform_negative(
...@@ -47,9 +50,6 @@ def create_dataloader(dateset, device, is_train=True): ...@@ -47,9 +50,6 @@ def create_dataloader(dateset, device, is_train=True):
dataset.feature, node_feature_keys=["feat"] dataset.feature, node_feature_keys=["feat"]
) )
# Copy the mini-batch to the designated device for training.
datapipe = datapipe.copy_to(device)
# Initiate the dataloader for the datapipe. # Initiate the dataloader for the datapipe.
return gb.DataLoader(datapipe) return gb.DataLoader(datapipe)
...@@ -158,6 +158,12 @@ if __name__ == "__main__": ...@@ -158,6 +158,12 @@ if __name__ == "__main__":
print("Loading data...") print("Loading data...")
dataset = gb.BuiltinDataset("cora").load() dataset = gb.BuiltinDataset("cora").load()
# If a CUDA device is selected, we pin the graph and the features so that
# the GPU can access them.
if device == torch.device("cuda:0"):
dataset.graph.pin_memory_()
dataset.feature.pin_memory_()
in_size = dataset.feature.size("node", None, "feat")[0] in_size = dataset.feature.size("node", None, "feat")[0]
model = GraphSAGE(in_size).to(device) model = GraphSAGE(in_size).to(device)
......
...@@ -13,10 +13,13 @@ import torchmetrics.functional as MF ...@@ -13,10 +13,13 @@ import torchmetrics.functional as MF
############################################################################ ############################################################################
# (HIGHLIGHT) Create a single process dataloader with dgl graphbolt package. # (HIGHLIGHT) Create a single process dataloader with dgl graphbolt package.
############################################################################ ############################################################################
def create_dataloader(dateset, itemset, device): def create_dataloader(dataset, itemset, device):
# Sample seed nodes from the itemset. # Sample seed nodes from the itemset.
datapipe = gb.ItemSampler(itemset, batch_size=16) datapipe = gb.ItemSampler(itemset, batch_size=16)
# Copy the mini-batch to the designated device for sampling and training.
datapipe = datapipe.copy_to(device, extra_attrs=["seed_nodes"])
# Sample neighbors for the seed nodes. # Sample neighbors for the seed nodes.
datapipe = datapipe.sample_neighbor(dataset.graph, fanouts=[4, 2]) datapipe = datapipe.sample_neighbor(dataset.graph, fanouts=[4, 2])
...@@ -25,9 +28,6 @@ def create_dataloader(dateset, itemset, device): ...@@ -25,9 +28,6 @@ def create_dataloader(dateset, itemset, device):
dataset.feature, node_feature_keys=["feat"] dataset.feature, node_feature_keys=["feat"]
) )
# Copy the mini-batch to the designated device for training.
datapipe = datapipe.copy_to(device)
# Initiate the dataloader for the datapipe. # Initiate the dataloader for the datapipe.
return gb.DataLoader(datapipe) return gb.DataLoader(datapipe)
...@@ -119,6 +119,12 @@ if __name__ == "__main__": ...@@ -119,6 +119,12 @@ if __name__ == "__main__":
print("Loading data...") print("Loading data...")
dataset = gb.BuiltinDataset("cora").load() dataset = gb.BuiltinDataset("cora").load()
# If a CUDA device is selected, we pin the graph and the features so that
# the GPU can access them.
if device == torch.device("cuda:0"):
dataset.graph.pin_memory_()
dataset.feature.pin_memory_()
in_size = dataset.feature.size("node", None, "feat")[0] in_size = dataset.feature.size("node", None, "feat")[0]
out_size = dataset.tasks[0].metadata["num_classes"] out_size = dataset.tasks[0].metadata["num_classes"]
model = GCN(in_size, out_size).to(device) model = GCN(in_size, out_size).to(device)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment