"docs/source/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "98e11533e266c5b30b60d3852bc9f76c6ca2898f"
Unverified Commit 938deec8 authored by Muhammed Fatih BALIN, committed by GitHub

[GraphBolt][Doc] Updated MultiGPU tutorial (#7126)

parent e60262d3
@@ -68,12 +68,7 @@ class SAGE(nn.Module):
         self.hidden_size = hidden_size
         self.out_size = out_size
         # Set the dtype for the layers manually.
-        self.set_layer_dtype(torch.float32)
-
-    def set_layer_dtype(self, dtype):
-        for layer in self.layers:
-            for param in layer.parameters():
-                param.data = param.data.to(dtype)
+        self.float()
 
     def forward(self, blocks, x):
         hidden_x = x
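
The removed set_layer_dtype helper re-implemented what nn.Module.float() already provides: it casts every floating-point parameter and buffer to torch.float32. A minimal standalone sketch of the equivalence (the two-layer module here is hypothetical, not the tutorial's SAGE model):

    import torch
    from torch import nn

    # Start in float64 so the cast is observable.
    model = nn.Sequential(nn.Linear(4, 8), nn.Linear(8, 2)).double()
    assert all(p.dtype == torch.float64 for p in model.parameters())
    model.float()  # built-in equivalent of the removed per-parameter loop
    assert all(p.dtype == torch.float32 for p in model.parameters())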
@@ -105,22 +100,38 @@ def create_dataloader(
     features,
     itemset,
     device,
-    drop_last=False,
-    shuffle=True,
-    drop_uneven_inputs=False,
+    is_train,
 ):
     datapipe = gb.DistributedItemSampler(
         item_set=itemset,
         batch_size=1024,
-        drop_last=drop_last,
-        shuffle=shuffle,
-        drop_uneven_inputs=drop_uneven_inputs,
+        drop_last=is_train,
+        shuffle=is_train,
+        drop_uneven_inputs=is_train,
     )
+    datapipe = datapipe.copy_to(device, extra_attrs=["seed_nodes"])
+    # Now that we have moved to device, sample_neighbor and fetch_feature steps
+    # will be executed on GPUs.
     datapipe = datapipe.sample_neighbor(graph, [10, 10, 10])
     datapipe = datapipe.fetch_feature(features, node_feature_keys=["feat"])
-    datapipe = datapipe.copy_to(device)
-    dataloader = gb.DataLoader(datapipe)
-    return dataloader
+    return gb.DataLoader(datapipe)
+
+
+def weighted_reduce(tensor, weight, dst=0):
+    ########################################################################
+    # (HIGHLIGHT) Collect accuracy and loss values from sub-processes and
+    # obtain overall average values.
+    #
+    # `torch.distributed.reduce` is used to reduce tensors from all the
+    # sub-processes to a specified process, ReduceOp.SUM is used by default.
+    #
+    # Because the GPUs may have differing numbers of processed items, we
+    # perform a weighted mean to calculate the exact loss and accuracy.
+    ########################################################################
+    dist.reduce(tensor=tensor, dst=dst)
+    weight = torch.tensor(weight, device=tensor.device)
+    dist.reduce(tensor=weight, dst=dst)
+    return tensor / weight
 
 ######################################################################
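
Two things happen in this hunk. The copy_to step now runs before sample_neighbor and fetch_feature, so those stages execute on the GPU rather than on the host. And the new weighted_reduce helper sums both the value and its weight across ranks (torch.distributed.reduce defaults to ReduceOp.SUM) before dividing, giving an exact weighted mean even when ranks processed different numbers of items. A single-process sketch of that arithmetic, with made-up two-rank numbers:

    import torch

    # Rank 0: 1024 items at mean loss 0.50; rank 1: 512 items at mean loss 0.20.
    # Each rank contributes loss_sum = mean_loss * item_count.
    loss_sums = torch.tensor([0.50 * 1024, 0.20 * 512])
    counts = torch.tensor([1024.0, 512.0])
    # dist.reduce would sum these across ranks; locally the same reduction
    # is a plain sum over the rank dimension.
    print((loss_sums.sum() / counts.sum()).item())  # 0.40, not the naive 0.35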
@@ -140,15 +151,11 @@ def evaluate(rank, model, graph, features, itemset, num_classes, device):
         graph,
         features,
         itemset,
-        drop_last=False,
-        shuffle=False,
-        drop_uneven_inputs=False,
-        device=device,
+        device,
+        is_train=False,
     )
 
-    for step, data in (
-        tqdm.tqdm(enumerate(dataloader)) if rank == 0 else enumerate(dataloader)
-    ):
+    for data in tqdm.tqdm(dataloader) if rank == 0 else dataloader:
         blocks = data.blocks
         x = data.node_features["feat"]
         y.append(data.labels)
@@ -161,7 +168,7 @@ def evaluate(rank, model, graph, features, itemset, num_classes, device):
         num_classes=num_classes,
     )
 
-    return res.to(device)
+    return res.to(device), sum(y_i.size(0) for y_i in y)
 
 ######################################################################
@@ -196,22 +203,17 @@ def train(
         features,
         train_set,
         device,
-        drop_last=False,
-        shuffle=True,
-        drop_uneven_inputs=False,
+        is_train=True,
     )
 
     for epoch in range(5):
         epoch_start = time.time()
         model.train()
-        total_loss = torch.tensor(0, dtype=torch.float).to(device)
+        total_loss = torch.tensor(0, dtype=torch.float, device=device)
+        num_train_items = 0
         with Join([model]):
-            for step, data in (
-                tqdm.tqdm(enumerate(dataloader))
-                if rank == 0
-                else enumerate(dataloader)
-            ):
+            for data in tqdm.tqdm(dataloader) if rank == 0 else dataloader:
                 # The input features are from the source nodes in the first
                 # layer's computation graph.
                 x = data.node_features["feat"]
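
The rewritten loop header also uses a compact pattern for progress bars in multi-process training: wrap the dataloader in tqdm only on rank 0, so one process prints a bar while every rank iterates identically. A standalone sketch (rank is hard-coded here; the tutorial gets it from the process group):

    import tqdm

    rank = 0  # hypothetical stand-in for the distributed rank
    items = range(1000)
    for item in tqdm.tqdm(items) if rank == 0 else items:
        pass  # per-item work goes here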
@@ -231,40 +233,31 @@ def train(
                 loss.backward()
                 optimizer.step()
 
-                total_loss += loss
+                total_loss += loss * y.size(0)
+                num_train_items += y.size(0)
 
         # Evaluate the model.
         if rank == 0:
             print("Validating...")
-        acc = (
-            evaluate(
-                rank,
-                model,
-                graph,
-                features,
-                valid_set,
-                num_classes,
-                device,
-            )
-            / world_size
-        )
-        ########################################################################
-        # (HIGHLIGHT) Collect accuracy and loss values from sub-processes and
-        # obtain overall average values.
-        #
-        # `torch.distributed.reduce` is used to reduce tensors from all the
-        # sub-processes to a specified process, ReduceOp.SUM is used by default.
-        ########################################################################
-        dist.reduce(tensor=acc, dst=0)
-        total_loss /= step + 1
-        dist.reduce(tensor=total_loss, dst=0)
-        dist.barrier()
-
+        acc, num_val_items = evaluate(
+            rank,
+            model,
+            graph,
+            features,
+            valid_set,
+            num_classes,
+            device,
+        )
+        total_loss = weighted_reduce(total_loss, num_train_items)
+        acc = weighted_reduce(acc * num_val_items, num_val_items)
+
+        # We synchronize before measuring the epoch time.
+        torch.cuda.synchronize()
         epoch_end = time.time()
         if rank == 0:
             print(
                 f"Epoch {epoch:05d} | "
-                f"Average Loss {total_loss.item() / world_size:.4f} | "
+                f"Average Loss {total_loss.item():.4f} | "
                 f"Accuracy {acc.item():.4f} | "
                 f"Time {epoch_end - epoch_start:.4f}"
             )
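
Two changes in this hunk are easy to miss. Each batch's loss is now weighted by y.size(0) before accumulation, so dividing by num_train_items later yields an exact per-item average even when batches are unevenly sized. And torch.cuda.synchronize() now precedes the clock read because CUDA kernels launch asynchronously; without it, the timer can stop while queued GPU work is still running. A minimal timing sketch of the latter:

    import time

    import torch

    if torch.cuda.is_available():
        x = torch.randn(4096, 4096, device="cuda")
        start = time.time()
        for _ in range(10):
            x = x @ x  # queued asynchronously on the GPU
        torch.cuda.synchronize()  # wait for the matmuls to actually finish
        print(f"elapsed: {time.time() - start:.4f}s")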
@@ -292,8 +285,9 @@ def run(rank, world_size, devices, dataset):
         rank=rank,
     )
 
-    graph = dataset.graph
-    features = dataset.feature
+    # Pin the graph and features in-place to enable GPU access.
+    graph = dataset.graph.pin_memory_()
+    features = dataset.feature.pin_memory_()
     train_set = dataset.tasks[0].train_set
     valid_set = dataset.tasks[0].validation_set
     num_classes = dataset.tasks[0].metadata["num_classes"]
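
Pinning the graph and feature tensors in place is what makes the GPU-resident sampling pipeline above possible: page-locked host memory can be read directly by the device and is required for non-blocking host-to-device copies. A toy sketch of the underlying PyTorch mechanism, using a plain tensor rather than the GraphBolt dataset:

    import torch

    if torch.cuda.is_available():
        feat = torch.randn(1000, 128).pin_memory()  # page-locked host memory
        print(feat.is_pinned())  # True
        # A pinned source allows the copy to be asynchronous.
        feat_gpu = feat.to("cuda", non_blocking=True)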
@@ -325,20 +319,17 @@ def run(rank, world_size, devices, dataset):
     if rank == 0:
         print("Testing...")
     test_set = dataset.tasks[0].test_set
-    test_acc = (
-        evaluate(
-            rank,
-            model,
-            graph,
-            features,
-            itemset=test_set,
-            num_classes=num_classes,
-            device=device,
-        )
-        / world_size
-    )
-    dist.reduce(tensor=test_acc, dst=0)
-    dist.barrier()
+    test_acc, num_test_items = evaluate(
+        rank,
+        model,
+        graph,
+        features,
+        itemset=test_set,
+        num_classes=num_classes,
+        device=device,
+    )
+    test_acc = weighted_reduce(test_acc * num_test_items, num_test_items)
     if rank == 0:
         print(f"Test Accuracy {test_acc.item():.4f}")