Unverified Commit b377e1b9 authored by Rhett Ying's avatar Rhett Ying Committed by GitHub
Browse files

[Dist][Examples] refactor dist graphsage examples (#4269)

* [Dist][Examples] refactor dist graphsage examples

* refine train_dist.py

* update train_dist_unsupervised.py

* fix debug info

* update train_dist_transductive

* update unsupervised_transductive

* remove distgnn

* fix join() in standalone mode

* change batch_labels to long() for ogbn-papers100M

* free unnecessary mem

* lint

* fix lint

* refine

* fix lint

* fix incorrect args

* refine
parent ff090f69
......@@ -137,7 +137,6 @@ python3 ~/workspace/dgl/tools/launch.py \
--num_servers 1 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
--graph_format csc,coo \
"python3 train_dist_unsupervised.py --graph_name ogb-product --ip_config ip_config.txt --num_epochs 3 --batch_size 1000"
```
......@@ -158,24 +157,22 @@ To run supervised with transductive setting (nodes are initialized with node emb
```bash
python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pytorch/graphsage/dist/ \
--num_trainers 4 \
--num_samplers 4 \
--num_servers 1 \
--num_samplers 0 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
"python3 train_dist_transductive.py --graph_name ogb-product --ip_config ip_config.txt --batch_size 1000 --num_gpu 4 --eval_every 5"
"python3 train_dist_transductive.py --graph_name ogb-product --ip_config ip_config.txt --batch_size 1000 --num_gpus 4 --eval_every 5"
```
To run supervised with transductive setting using dgl distributed DistEmbedding
```bash
python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pytorch/graphsage/dist/ \
--num_trainers 4 \
--num_samplers 4 \
--num_servers 1 \
--num_samplers 0 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
"python3 train_dist_transductive.py --graph_name ogb-product --ip_config ip_config.txt --batch_size 1000 --num_gpu 4 --eval_every 5 --dgl_sparse"
"python3 train_dist_transductive.py --graph_name ogb-product --ip_config ip_config.txt --batch_size 1000 --num_gpus 4 --eval_every 5 --dgl_sparse"
```
To run unsupervised with transductive setting (nodes are initialized with node embedding)
......@@ -186,7 +183,6 @@ python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pyt
--num_servers 1 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
--graph_format csc,coo \
"python3 train_dist_unsupervised_transductive.py --graph_name ogb-product --ip_config ip_config.txt --num_epochs 3 --batch_size 1000 --num_gpus 4"
```
......@@ -198,7 +194,6 @@ python3 ~/workspace/dgl/tools/launch.py --workspace ~/workspace/dgl/examples/pyt
--num_servers 1 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
--graph_format csc,coo \
"python3 train_dist_unsupervised_transductive.py --graph_name ogb-product --ip_config ip_config.txt --num_epochs 3 --batch_size 1000 --num_gpus 4 --dgl_sparse"
```
......
import os
os.environ["DGLBACKEND"] = "pytorch"
import argparse
import math
import socket
import time
from functools import wraps
from multiprocessing import Process
from contextlib import contextmanager
import numpy as np
import torch as th
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
from torch.utils.data import DataLoader
import dgl
import dgl.function as fn
import dgl.nn.pytorch as dglnn
from dgl import DGLGraph
from dgl.data import load_data, register_data_args
from dgl.data.utils import load_graphs
from dgl.distributed import DistDataLoader
def load_subtensor(g, seeds, input_nodes, device, load_feat=True):
"""
......@@ -37,40 +23,6 @@ def load_subtensor(g, seeds, input_nodes, device, load_feat=True):
return batch_inputs, batch_labels
class NeighborSampler(object):
    """Layer-wise neighbor sampler used as the ``collate_fn`` of a
    DistDataLoader.

    For a mini-batch of seed nodes it samples ``fanouts[i]`` neighbors per
    node at layer ``i`` and returns the result as a list of DGL blocks
    ordered from the input layer to the output layer.
    """

    def __init__(self, g, fanouts, sample_neighbors, device, load_feat=True):
        # g : graph (presumably a DistGraph) to sample from.
        # fanouts : per-layer number of neighbors to sample for each seed.
        # sample_neighbors : sampling callable, e.g.
        #     dgl.distributed.sample_neighbors.
        # device : stored target device (features are fetched to "cpu" below).
        # load_feat : whether to attach input features to the first block.
        self.g = g
        self.fanouts = fanouts
        self.sample_neighbors = sample_neighbors
        self.device = device
        self.load_feat = load_feat

    def sample_blocks(self, seeds):
        """Return message-passing blocks (input layer first) for ``seeds``."""
        seeds = th.LongTensor(np.asarray(seeds))
        blocks = []
        for fanout in self.fanouts:
            # For each seed node, sample ``fanout`` neighbors.
            frontier = self.sample_neighbors(
                self.g, seeds, fanout, replace=True
            )
            # Then we compact the frontier into a bipartite graph for
            # message passing.
            block = dgl.to_block(frontier, seeds)
            # Obtain the seed nodes for next layer.
            seeds = block.srcdata[dgl.NID]
            # Prepend so the final list runs input layer -> output layer.
            blocks.insert(0, block)

        input_nodes = blocks[0].srcdata[dgl.NID]
        seeds = blocks[-1].dstdata[dgl.NID]
        # Fetch features/labels for this batch; "cpu" is passed as the
        # destination so the caller decides when to move them to a GPU.
        batch_inputs, batch_labels = load_subtensor(
            self.g, seeds, input_nodes, "cpu", self.load_feat
        )
        if self.load_feat:
            blocks[0].srcdata["features"] = batch_inputs
        blocks[-1].dstdata["labels"] = batch_labels
        return blocks
class DistSAGE(nn.Module):
def __init__(
self, in_feats, n_hidden, n_classes, n_layers, activation, dropout
......@@ -89,72 +41,68 @@ class DistSAGE(nn.Module):
def forward(self, blocks, x):
h = x
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
for i, (layer, block) in enumerate(zip(self.layers, blocks)):
h = layer(block, h)
if l != len(self.layers) - 1:
if i != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
return h
def inference(self, g, x, batch_size, device):
"""
Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling).
Inference with the GraphSAGE model on full neighbors (i.e. without
neighbor sampling).
g : the entire graph.
x : the input of entire node set.
The inference code is written in a fashion that it could handle any number of nodes and
layers.
Distributed layer-wise inference.
"""
# During inference with sampling, multi-layer blocks are very inefficient because
# lots of computations in the first few layers are repeated.
# Therefore, we compute the representation of all nodes layer by layer. The nodes
# on each layer are of course splitted in batches.
# During inference with sampling, multi-layer blocks are very
# inefficient because lots of computations in the first few layers
# are repeated. Therefore, we compute the representation of all nodes
# layer by layer. The nodes on each layer are of course splitted in
# batches.
# TODO: can we standardize this?
nodes = dgl.distributed.node_split(
np.arange(g.number_of_nodes()),
np.arange(g.num_nodes()),
g.get_partition_book(),
force_even=True,
)
y = dgl.distributed.DistTensor(
(g.number_of_nodes(), self.n_hidden),
(g.num_nodes(), self.n_hidden),
th.float32,
"h",
persistent=True,
)
for l, layer in enumerate(self.layers):
if l == len(self.layers) - 1:
for i, layer in enumerate(self.layers):
if i == len(self.layers) - 1:
y = dgl.distributed.DistTensor(
(g.number_of_nodes(), self.n_classes),
(g.num_nodes(), self.n_classes),
th.float32,
"h_last",
persistent=True,
)
sampler = NeighborSampler(
g, [-1], dgl.distributed.sample_neighbors, device
)
print(
"|V|={}, eval batch size: {}".format(
g.number_of_nodes(), batch_size
)
f"|V|={g.num_nodes()}, eval batch size: {batch_size}"
)
# Create PyTorch DataLoader for constructing blocks
dataloader = DistDataLoader(
dataset=nodes,
sampler = dgl.dataloading.NeighborSampler([-1])
dataloader = dgl.dataloading.DistNodeDataLoader(
g,
nodes,
sampler,
batch_size=batch_size,
collate_fn=sampler.sample_blocks,
shuffle=False,
drop_last=False,
)
for blocks in tqdm.tqdm(dataloader):
for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
block = blocks[0].to(device)
input_nodes = block.srcdata[dgl.NID]
output_nodes = block.dstdata[dgl.NID]
h = x[input_nodes].to(device)
h_dst = h[: block.number_of_dst_nodes()]
h = layer(block, (h, h_dst))
if l != len(self.layers) - 1:
if i != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
......@@ -164,6 +112,11 @@ class DistSAGE(nn.Module):
g.barrier()
return y
@contextmanager
def join(self):
"""dummy join for standalone"""
yield
def compute_acc(pred, labels):
"""
......@@ -196,23 +149,18 @@ def run(args, device, data):
# Unpack data
train_nid, val_nid, test_nid, in_feats, n_classes, g = data
shuffle = True
# Create sampler
sampler = NeighborSampler(
g,
[int(fanout) for fanout in args.fan_out.split(",")],
dgl.distributed.sample_neighbors,
device,
# prefetch_node_feats/prefetch_labels are not supported for DistGraph yet.
sampler = dgl.dataloading.NeighborSampler(
[int(fanout) for fanout in args.fan_out.split(",")]
)
# Create DataLoader for constructing blocks
dataloader = DistDataLoader(
dataset=train_nid.numpy(),
dataloader = dgl.dataloading.DistNodeDataLoader(
g,
train_nid,
sampler,
batch_size=args.batch_size,
collate_fn=sampler.sample_blocks,
shuffle=shuffle,
drop_last=False,
)
# Define model and optimizer
model = DistSAGE(
in_feats,
......@@ -247,28 +195,27 @@ def run(args, device, data):
num_seeds = 0
num_inputs = 0
start = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
# Loop over the dataloader to sample the computation dependency graph
# as a list of blocks.
step_time = []
with model.join():
for step, blocks in enumerate(dataloader):
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
tic_step = time.time()
sample_time += tic_step - start
# The nodes for input lies at the LHS side of the first block.
# The nodes for output lies at the RHS side of the last block.
batch_inputs = blocks[0].srcdata["features"]
batch_labels = blocks[-1].dstdata["labels"]
# fetch features/labels
batch_inputs, batch_labels = load_subtensor(
g, seeds, input_nodes, "cpu"
)
batch_labels = batch_labels.long()
num_seeds += len(blocks[-1].dstdata[dgl.NID])
num_inputs += len(blocks[0].srcdata[dgl.NID])
# move to target device
blocks = [block.to(device) for block in blocks]
batch_inputs = batch_inputs.to(device)
batch_labels = batch_labels.to(device)
# Compute loss and prediction
start = time.time()
# print(g.rank(), blocks[0].device, model.module.layers[0].fc_neigh.weight.device, dev_id)
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
forward_end = time.time()
......@@ -292,7 +239,9 @@ def run(args, device, data):
else 0
)
print(
"Part {} | Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB | time {:.3f} s".format(
"Part {} | Epoch {:05d} | Step {:05d} | Loss {:.4f} | "
"Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU "
"{:.1f} MB | time {:.3f} s".format(
g.rank(),
epoch,
step,
......@@ -300,14 +249,16 @@ def run(args, device, data):
acc.item(),
np.mean(iter_tput[3:]),
gpu_mem_alloc,
np.sum(step_time[-args.log_every :]),
np.sum(step_time[-args.log_every:]),
)
)
start = time.time()
toc = time.time()
print(
"Part {}, Epoch Time(s): {:.4f}, sample+data_copy: {:.4f}, forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, #inputs: {}".format(
"Part {}, Epoch Time(s): {:.4f}, sample+data_copy: {:.4f}, "
"forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, "
"#inputs: {}".format(
g.rank(),
toc - tic,
sample_time,
......@@ -323,7 +274,7 @@ def run(args, device, data):
if epoch % args.eval_every == 0 and epoch != 0:
start = time.time()
val_acc, test_acc = evaluate(
model.module,
model if args.standalone else model.module,
g,
g.ndata["features"],
g.ndata["labels"],
......@@ -333,7 +284,8 @@ def run(args, device, data):
device,
)
print(
"Part {}, Val Acc {:.4f}, Test Acc {:.4f}, time: {:.4f}".format(
"Part {}, Val Acc {:.4f}, Test Acc {:.4f}, time: {:.4f}".format
(
g.rank(), val_acc, test_acc, time.time() - start
)
)
......@@ -346,7 +298,10 @@ def main(args):
print(socket.gethostname(), "Initializing DGL process group")
th.distributed.init_process_group(backend=args.backend)
print(socket.gethostname(), "Initializing DistGraph")
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.part_config)
g = dgl.distributed.DistGraph(
args.graph_name,
part_config=args.part_config
)
print(socket.gethostname(), "rank:", g.rank())
pb = g.get_partition_book()
......@@ -381,7 +336,8 @@ def main(args):
)
local_nid = pb.partid2nids(pb.partid).detach().numpy()
print(
"part {}, train: {} (local: {}), val: {} (local: {}), test: {} (local: {})".format(
"part {}, train: {} (local: {}), val: {} (local: {}), test: {} "
"(local: {})".format(
g.rank(),
len(train_nid),
len(np.intersect1d(train_nid.numpy(), local_nid)),
......@@ -398,8 +354,8 @@ def main(args):
dev_id = g.rank() % args.num_gpus
device = th.device("cuda:" + str(dev_id))
n_classes = args.n_classes
if n_classes == -1:
labels = g.ndata["labels"][np.arange(g.number_of_nodes())]
if n_classes == 0:
labels = g.ndata["labels"][np.arange(g.num_nodes())]
n_classes = len(th.unique(labels[th.logical_not(th.isnan(labels))]))
del labels
print("#labels:", n_classes)
......@@ -413,7 +369,6 @@ def main(args):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="GCN")
register_data_args(parser)
parser.add_argument("--graph_name", type=str, help="graph name")
parser.add_argument("--id", type=int, help="the partition id")
parser.add_argument(
......@@ -422,14 +377,8 @@ if __name__ == "__main__":
parser.add_argument(
"--part_config", type=str, help="The path to the partition config file"
)
parser.add_argument("--num_clients", type=int, help="The number of clients")
parser.add_argument(
"--n_classes",
type=int,
default=-1,
help="The number of classes. If not specified, this"
" value will be calculated via scaning all the labels"
" in the dataset which probably causes memory burst.",
"--n_classes", type=int, default=0, help="the number of classes"
)
parser.add_argument(
"--backend",
......@@ -463,7 +412,8 @@ if __name__ == "__main__":
"--pad-data",
default=False,
action="store_true",
help="Pad train nid to the same length across machine, to ensure num of batches to be the same.",
help="Pad train nid to the same length across machine, to ensure num "
"of batches to be the same.",
)
parser.add_argument(
"--net_type",
......
import os
os.environ["DGLBACKEND"] = "pytorch"
import argparse
import math
import time
from functools import wraps
from multiprocessing import Process
import numpy as np
import torch as th
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
from torch.utils.data import DataLoader
from train_dist import DistSAGE, NeighborSampler, compute_acc
import dgl
import dgl.function as fn
import dgl.nn.pytorch as dglnn
from dgl import DGLGraph
from dgl.data import load_data, register_data_args
from dgl.data.utils import load_graphs
from dgl.distributed import DistDataLoader, DistEmbedding
class TransDistSAGE(DistSAGE):
    """DistSAGE variant whose inference takes externally supplied node
    embeddings (transductive setting) instead of reading features from the
    graph."""

    def __init__(
        self, in_feats, n_hidden, n_classes, n_layers, activation, dropout
    ):
        # Construction is fully delegated to DistSAGE; only ``inference``
        # differs.
        super(TransDistSAGE, self).__init__(
            in_feats, n_hidden, n_classes, n_layers, activation, dropout
        )

    def inference(self, standalone, g, x, batch_size, device):
        """
        Inference with the GraphSAGE model on full neighbors (i.e. without
        neighbor sampling).

        standalone : accepted for API symmetry with callers; not used in
            this body.
        g : the entire graph.
        x : the input of entire node set (e.g. an embedding table result).
        batch_size : number of nodes evaluated per step.
        device : device each layer is computed on.

        The inference code is written in a fashion that it could handle any
        number of nodes and layers.
        """
        # During inference with sampling, multi-layer blocks are very
        # inefficient because lots of computations in the first few layers
        # are repeated. Therefore, we compute the representation of all
        # nodes layer by layer. The nodes on each layer are of course
        # splitted in batches.
        # TODO: can we standardize this?
        nodes = dgl.distributed.node_split(
            np.arange(g.number_of_nodes()),
            g.get_partition_book(),
            force_even=True,
        )
        # Distributed buffer holding this layer's output for every node.
        y = dgl.distributed.DistTensor(
            (g.number_of_nodes(), self.n_hidden),
            th.float32,
            "h",
            persistent=True,
        )
        for l, layer in enumerate(self.layers):
            if l == len(self.layers) - 1:
                # The last layer emits class scores, not hidden features.
                y = dgl.distributed.DistTensor(
                    (g.number_of_nodes(), self.n_classes),
                    th.float32,
                    "h_last",
                    persistent=True,
                )
            # Full-neighbor sampling (fanout -1); features are not loaded
            # because the inputs come from ``x``.
            sampler = NeighborSampler(
                g,
                [-1],
                dgl.distributed.sample_neighbors,
                device,
                load_feat=False,
            )
            print(
                "|V|={}, eval batch size: {}".format(
                    g.number_of_nodes(), batch_size
                )
            )
            # Create PyTorch DataLoader for constructing blocks
            dataloader = DistDataLoader(
                dataset=nodes,
                batch_size=batch_size,
                collate_fn=sampler.sample_blocks,
                shuffle=False,
                drop_last=False,
            )
            for blocks in tqdm.tqdm(dataloader):
                block = blocks[0].to(device)
                input_nodes = block.srcdata[dgl.NID]
                output_nodes = block.dstdata[dgl.NID]
                h = x[input_nodes].to(device)
                # dgl.to_block places destination nodes at the front of the
                # source node list, so the first rows are the dst features.
                h_dst = h[: block.number_of_dst_nodes()]
                h = layer(block, (h, h_dst))
                if l != len(self.layers) - 1:
                    h = self.activation(h)
                    h = self.dropout(h)
                y[output_nodes] = h.cpu()

            # This layer's output becomes the next layer's input.
            x = y
            # Synchronize workers before anyone reads ``y`` for layer l+1.
            g.barrier()
        return y
from dgl.distributed import DistEmbedding
from train_dist import DistSAGE, compute_acc
def initializer(shape, dtype):
......@@ -114,7 +18,9 @@ def initializer(shape, dtype):
class DistEmb(nn.Module):
def __init__(self, num_nodes, emb_size, dgl_sparse_emb=False, dev_id="cpu"):
def __init__(
self, num_nodes, emb_size, dgl_sparse_emb=False, dev_id="cpu"
):
super().__init__()
self.dev_id = dev_id
self.emb_size = emb_size
......@@ -138,11 +44,11 @@ class DistEmb(nn.Module):
def load_embs(standalone, emb_layer, g):
nodes = dgl.distributed.node_split(
np.arange(g.number_of_nodes()), g.get_partition_book(), force_even=True
np.arange(g.num_nodes()), g.get_partition_book(), force_even=True
)
x = dgl.distributed.DistTensor(
(
g.number_of_nodes(),
g.num_nodes(),
emb_layer.module.emb_size
if isinstance(emb_layer, th.nn.parallel.DistributedDataParallel)
else emb_layer.emb_size,
......@@ -154,7 +60,7 @@ def load_embs(standalone, emb_layer, g):
num_nodes = nodes.shape[0]
for i in range((num_nodes + 1023) // 1024):
idx = nodes[
i * 1024 : (i + 1) * 1024
i * 1024: (i + 1) * 1024
if (i + 1) * 1024 < num_nodes
else num_nodes
]
......@@ -187,11 +93,13 @@ def evaluate(
batch_size : Number of nodes to compute at the same time.
device : The GPU device to evaluate on.
"""
if not standalone:
model = model.module
model.eval()
emb_layer.eval()
with th.no_grad():
inputs = load_embs(standalone, emb_layer, g)
pred = model.inference(standalone, g, inputs, batch_size, device)
pred = model.inference(g, inputs, batch_size, device)
model.train()
emb_layer.train()
return compute_acc(pred[val_nid], labels[val_nid]), compute_acc(
......@@ -202,24 +110,17 @@ def evaluate(
def run(args, device, data):
# Unpack data
train_nid, val_nid, test_nid, n_classes, g = data
# Create sampler
sampler = NeighborSampler(
g,
[int(fanout) for fanout in args.fan_out.split(",")],
dgl.distributed.sample_neighbors,
device,
load_feat=False,
sampler = dgl.dataloading.NeighborSampler(
[int(fanout) for fanout in args.fan_out.split(",")]
)
# Create DataLoader for constructing blocks
dataloader = DistDataLoader(
dataset=train_nid.numpy(),
dataloader = dgl.dataloading.DistNodeDataLoader(
g,
train_nid,
sampler,
batch_size=args.batch_size,
collate_fn=sampler.sample_blocks,
shuffle=True,
drop_last=False,
)
# Define model and optimizer
emb_layer = DistEmb(
g.num_nodes(),
......@@ -227,7 +128,7 @@ def run(args, device, data):
dgl_sparse_emb=args.dgl_sparse,
dev_id=device,
)
model = TransDistSAGE(
model = DistSAGE(
args.num_hidden,
args.num_hidden,
n_classes,
......@@ -263,9 +164,10 @@ def run(args, device, data):
emb_optimizer = th.optim.SparseAdam(
list(emb_layer.module.sparse_emb.parameters()), lr=args.sparse_lr
)
print("optimize Pytorch sparse embedding:", emb_layer.module.sparse_emb)
train_size = th.sum(g.ndata["train_mask"][0 : g.number_of_nodes()])
print(
"optimize Pytorch sparse embedding:",
emb_layer.module.sparse_emb
)
# Training loop
iter_tput = []
......@@ -280,67 +182,65 @@ def run(args, device, data):
num_seeds = 0
num_inputs = 0
start = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
step_time = []
for step, blocks in enumerate(dataloader):
tic_step = time.time()
sample_time += tic_step - start
# The nodes for input lies at the LHS side of the first block.
# The nodes for output lies at the RHS side of the last block.
batch_inputs = blocks[0].srcdata[dgl.NID]
batch_labels = blocks[-1].dstdata["labels"]
batch_labels = batch_labels.long()
num_seeds += len(blocks[-1].dstdata[dgl.NID])
num_inputs += len(blocks[0].srcdata[dgl.NID])
blocks = [block.to(device) for block in blocks]
batch_labels = batch_labels.to(device)
# Compute loss and prediction
start = time.time()
batch_inputs = emb_layer(batch_inputs)
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
forward_end = time.time()
emb_optimizer.zero_grad()
optimizer.zero_grad()
loss.backward()
compute_end = time.time()
forward_time += forward_end - start
backward_time += compute_end - forward_end
emb_optimizer.step()
optimizer.step()
update_time += time.time() - compute_end
step_t = time.time() - tic_step
step_time.append(step_t)
iter_tput.append(len(blocks[-1].dstdata[dgl.NID]) / step_t)
if step % args.log_every == 0:
acc = compute_acc(batch_pred, batch_labels)
gpu_mem_alloc = (
th.cuda.max_memory_allocated() / 1000000
if th.cuda.is_available()
else 0
)
print(
"Part {} | Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB | time {:.3f} s".format(
g.rank(),
epoch,
step,
loss.item(),
acc.item(),
np.mean(iter_tput[3:]),
gpu_mem_alloc,
np.sum(step_time[-args.log_every :]),
with model.join():
# Loop over the dataloader to sample the computation dependency
# graph as a list of blocks.
step_time = []
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
tic_step = time.time()
sample_time += tic_step - start
num_seeds += len(blocks[-1].dstdata[dgl.NID])
num_inputs += len(blocks[0].srcdata[dgl.NID])
blocks = [block.to(device) for block in blocks]
batch_labels = g.ndata["labels"][seeds].long().to(device)
# Compute loss and prediction
start = time.time()
batch_inputs = emb_layer(input_nodes)
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
forward_end = time.time()
emb_optimizer.zero_grad()
optimizer.zero_grad()
loss.backward()
compute_end = time.time()
forward_time += forward_end - start
backward_time += compute_end - forward_end
emb_optimizer.step()
optimizer.step()
update_time += time.time() - compute_end
step_t = time.time() - tic_step
step_time.append(step_t)
iter_tput.append(len(blocks[-1].dstdata[dgl.NID]) / step_t)
if step % args.log_every == 0:
acc = compute_acc(batch_pred, batch_labels)
gpu_mem_alloc = (
th.cuda.max_memory_allocated() / 1000000
if th.cuda.is_available()
else 0
)
)
start = time.time()
print(
"Part {} | Epoch {:05d} | Step {:05d} | Loss {:.4f} | "
"Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU "
"{:.1f} MB | time {:.3f} s".format(
g.rank(),
epoch,
step,
loss.item(),
acc.item(),
np.mean(iter_tput[3:]),
gpu_mem_alloc,
np.sum(step_time[-args.log_every:]),
)
)
start = time.time()
toc = time.time()
print(
"Part {}, Epoch Time(s): {:.4f}, sample+data_copy: {:.4f}, forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, #inputs: {}".format(
"Part {}, Epoch Time(s): {:.4f}, sample+data_copy: {:.4f}, forward"
": {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, #inputs"
": {}".format(
g.rank(),
toc - tic,
sample_time,
......@@ -357,7 +257,7 @@ def run(args, device, data):
start = time.time()
val_acc, test_acc = evaluate(
args.standalone,
model.module,
model,
emb_layer,
g,
g.ndata["labels"],
......@@ -367,7 +267,8 @@ def run(args, device, data):
device,
)
print(
"Part {}, Val Acc {:.4f}, Test Acc {:.4f}, time: {:.4f}".format(
"Part {}, Val Acc {:.4f}, Test Acc {:.4f}, time: {:.4f}".format
(
g.rank(), val_acc, test_acc, time.time() - start
)
)
......@@ -377,7 +278,10 @@ def main(args):
dgl.distributed.initialize(args.ip_config)
if not args.standalone:
th.distributed.init_process_group(backend="gloo")
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.part_config)
g = dgl.distributed.DistGraph(
args.graph_name,
part_config=args.part_config
)
print("rank:", g.rank())
pb = g.get_partition_book()
......@@ -392,7 +296,8 @@ def main(args):
)
local_nid = pb.partid2nids(pb.partid).detach().numpy()
print(
"part {}, train: {} (local: {}), val: {} (local: {}), test: {} (local: {})".format(
"part {}, train: {} (local: {}), val: {} (local: {}), test: {} "
"(local: {})".format(
g.rank(),
len(train_nid),
len(np.intersect1d(train_nid.numpy(), local_nid)),
......@@ -405,8 +310,9 @@ def main(args):
if args.num_gpus == -1:
device = th.device("cpu")
else:
device = th.device("cuda:" + str(args.local_rank))
labels = g.ndata["labels"][np.arange(g.number_of_nodes())]
dev_id = g.rank() % args.num_gpus
device = th.device("cuda:" + str(dev_id))
labels = g.ndata["labels"][np.arange(g.num_nodes())]
n_classes = len(th.unique(labels[th.logical_not(th.isnan(labels))]))
print("#labels:", n_classes)
......@@ -418,7 +324,6 @@ def main(args):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="GCN")
register_data_args(parser)
parser.add_argument("--graph_name", type=str, help="graph name")
parser.add_argument("--id", type=int, help="the partition id")
parser.add_argument(
......@@ -427,7 +332,6 @@ if __name__ == "__main__":
parser.add_argument(
"--part_config", type=str, help="The path to the partition config file"
)
parser.add_argument("--num_clients", type=int, help="The number of clients")
parser.add_argument("--n_classes", type=int, help="the number of classes")
parser.add_argument(
"--num_gpus",
......
import os
os.environ["DGLBACKEND"] = "pytorch"
import argparse
import math
import time
from functools import wraps
from multiprocessing import Process
from contextlib import contextmanager
import numpy as np
import sklearn.linear_model as lm
import sklearn.metrics as skm
import torch as th
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
import dgl
import dgl.function as fn
import dgl.nn.pytorch as dglnn
from dgl import DGLGraph
from dgl.data import load_data, register_data_args
from dgl.data.utils import load_graphs
from dgl.distributed import DistDataLoader
class SAGE(nn.Module):
class DistSAGE(nn.Module):
def __init__(
self, in_feats, n_hidden, n_classes, n_layers, activation, dropout
):
......@@ -44,224 +32,66 @@ class SAGE(nn.Module):
def forward(self, blocks, x):
h = x
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
for i, (layer, block) in enumerate(zip(self.layers, blocks)):
h = layer(block, h)
if l != len(self.layers) - 1:
if i != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
return h
def inference(self, g, x, batch_size, device):
    """
    Inference with the GraphSAGE model on full neighbors (i.e. without
    neighbor sampling).

    g : the entire graph.
    x : the input of entire node set.
    batch_size : number of nodes evaluated per step.
    device : device each layer is computed on.

    The inference code is written in a fashion that it could handle any
    number of nodes and layers. Results are accumulated into a local
    (non-distributed) tensor ``y``.
    """
    # During inference with sampling, multi-layer blocks are very
    # inefficient because lots of computations in the first few layers are
    # repeated. Therefore, we compute the representation of all nodes
    # layer by layer. The nodes on each layer are of course splitted in
    # batches.
    # TODO: can we standardize this?
    for l, layer in enumerate(self.layers):
        # Hidden width for intermediate layers, class count for the last.
        y = th.zeros(
            g.number_of_nodes(),
            self.n_hidden if l != len(self.layers) - 1 else self.n_classes,
        )
        # ``[None]`` requests full-neighbor (no-sampling) blocks.
        sampler = dgl.dataloading.MultiLayerNeighborSampler([None])
        dataloader = dgl.dataloading.DistNodeDataLoader(
            g,
            th.arange(g.number_of_nodes()),
            sampler,
            batch_size=batch_size,
            # NOTE(review): shuffling is unnecessary for layer-wise
            # inference (results are scattered by output_nodes anyway);
            # shuffle=False would be clearer — confirm before changing.
            shuffle=True,
            drop_last=False,
            num_workers=0,
        )
        for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
            block = blocks[0]
            block = block.int().to(device)
            h = x[input_nodes].to(device)
            h = layer(block, h)
            if l != len(self.layers) - 1:
                h = self.activation(h)
                h = self.dropout(h)
            y[output_nodes] = h.cpu()

        # Feed this layer's full-node output into the next layer.
        x = y
    return y
class NegativeSampler(object):
    """Uniform negative sampler over a fixed pool of candidate nodes.

    Calling the instance draws node IDs (with replacement) from the pool
    given at construction time.
    """

    def __init__(self, g, neg_nseeds):
        # ``g`` is accepted for interface symmetry with other samplers but
        # is not used; only the candidate pool is retained.
        self.neg_nseeds = neg_nseeds

    def __call__(self, num_samples):
        # select local neg nodes as seeds
        pool_size = self.neg_nseeds.shape[0]
        idx = th.randint(pool_size, (num_samples,))
        return self.neg_nseeds[idx]
class NeighborSampler(object):
def __init__(
self, g, fanouts, neg_nseeds, sample_neighbors, num_negs, remove_edge
):
self.g = g
self.fanouts = fanouts
self.sample_neighbors = sample_neighbors
self.neg_sampler = NegativeSampler(g, neg_nseeds)
self.num_negs = num_negs
self.remove_edge = remove_edge
def sample_blocks(self, seed_edges):
n_edges = len(seed_edges)
seed_edges = th.LongTensor(np.asarray(seed_edges))
heads, tails = self.g.find_edges(seed_edges)
neg_tails = self.neg_sampler(self.num_negs * n_edges)
neg_heads = heads.view(-1, 1).expand(n_edges, self.num_negs).flatten()
# Maintain the correspondence between heads, tails and negative tails as two
# graphs.
# pos_graph contains the correspondence between each head and its positive tail.
# neg_graph contains the correspondence between each head and its negative tails.
# Both pos_graph and neg_graph are first constructed with the same node space as
# the original graph. Then they are compacted together with dgl.compact_graphs.
pos_graph = dgl.graph(
(heads, tails), num_nodes=self.g.number_of_nodes()
)
neg_graph = dgl.graph(
(neg_heads, neg_tails), num_nodes=self.g.number_of_nodes()
)
pos_graph, neg_graph = dgl.compact_graphs([pos_graph, neg_graph])
seeds = pos_graph.ndata[dgl.NID]
blocks = []
for fanout in self.fanouts:
# For each seed node, sample ``fanout`` neighbors.
frontier = self.sample_neighbors(
self.g, seeds, fanout, replace=True
)
if self.remove_edge:
# Remove all edges between heads and tails, as well as heads and neg_tails.
_, _, edge_ids = frontier.edge_ids(
th.cat([heads, tails, neg_heads, neg_tails]),
th.cat([tails, heads, neg_tails, neg_heads]),
return_uv=True,
)
frontier = dgl.remove_edges(frontier, edge_ids)
# Then we compact the frontier into a bipartite graph for message passing.
block = dgl.to_block(frontier, seeds)
# Obtain the seed nodes for next layer.
seeds = block.srcdata[dgl.NID]
blocks.insert(0, block)
Inference with the GraphSAGE model on full neighbors (i.e. without
neighbor sampling).
input_nodes = blocks[0].srcdata[dgl.NID]
blocks[0].srcdata["features"] = load_subtensor(
self.g, input_nodes, "cpu"
)
# Pre-generate CSR format that it can be used in training directly
return pos_graph, neg_graph, blocks
class PosNeighborSampler(object):
    """Multi-layer neighbor sampler that turns a batch of seed nodes into a
    list of message-passing blocks, ordered input layer first."""

    def __init__(self, g, fanouts, sample_neighbors):
        self.g = g
        self.fanouts = fanouts
        self.sample_neighbors = sample_neighbors

    def sample_blocks(self, seeds):
        """Sample one block per fanout, expanding outward from ``seeds``."""
        cur_seeds = th.LongTensor(np.asarray(seeds))
        blocks = []
        for fanout in self.fanouts:
            # Sample ``fanout`` neighbors (with replacement) for every
            # current seed node.
            frontier = self.sample_neighbors(
                self.g, cur_seeds, fanout, replace=True
            )
            # Compact the frontier into a bipartite block for message
            # passing.
            block = dgl.to_block(frontier, cur_seeds)
            # The block's source nodes become the seeds of the next layer.
            cur_seeds = block.srcdata[dgl.NID]
            blocks.append(block)
        # Blocks were produced output-layer first; reverse so callers get
        # them input-layer first.
        blocks.reverse()
        return blocks
class DistSAGE(SAGE):
    """GraphSAGE model for distributed training; reuses the layer stack and
    forward pass defined on ``SAGE``."""

    def __init__(
        self, in_feats, n_hidden, n_classes, n_layers, activation, dropout
    ):
        # Construction is fully delegated to the base SAGE model.
        super(DistSAGE, self).__init__(
            in_feats, n_hidden, n_classes, n_layers, activation, dropout
        )
def inference(self, g, x, batch_size, device):
"""
Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling).
g : the entire graph.
x : the input of entire node set.
The inference code is written in a fashion that it could handle any number of nodes and
layers.
The inference code is written in a fashion that it could handle any
number of nodes and layers.
"""
# During inference with sampling, multi-layer blocks are very inefficient because
# lots of computations in the first few layers are repeated.
# Therefore, we compute the representation of all nodes layer by layer. The nodes
# on each layer are of course splitted in batches.
# During inference with sampling, multi-layer blocks are very
# inefficient because lots of computations in the first few layers are
# repeated. Therefore, we compute the representation of all nodes layer
# by layer. The nodes on each layer are of course splitted in batches.
# TODO: can we standardize this?
nodes = dgl.distributed.node_split(
np.arange(g.number_of_nodes()),
np.arange(g.num_nodes()),
g.get_partition_book(),
force_even=True,
)
y = dgl.distributed.DistTensor(
(g.number_of_nodes(), self.n_hidden),
(g.num_nodes(), self.n_hidden),
th.float32,
"h",
persistent=True,
)
for l, layer in enumerate(self.layers):
if l == len(self.layers) - 1:
for i, layer in enumerate(self.layers):
if i == len(self.layers) - 1:
y = dgl.distributed.DistTensor(
(g.number_of_nodes(), self.n_classes),
(g.num_nodes(), self.n_classes),
th.float32,
"h_last",
persistent=True,
)
sampler = PosNeighborSampler(
g, [-1], dgl.distributed.sample_neighbors
)
print(
"|V|={}, eval batch size: {}".format(
g.number_of_nodes(), batch_size
)
)
# Create PyTorch DataLoader for constructing blocks
dataloader = DistDataLoader(
dataset=nodes,
# Create sampler
sampler = dgl.dataloading.NeighborSampler([-1])
# Create dataloader
dataloader = dgl.dataloading.DistNodeDataLoader(
g,
nodes,
sampler,
batch_size=batch_size,
collate_fn=sampler.sample_blocks,
shuffle=False,
drop_last=False,
)
for blocks in tqdm.tqdm(dataloader):
for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
block = blocks[0].to(device)
input_nodes = block.srcdata[dgl.NID]
output_nodes = block.dstdata[dgl.NID]
h = x[input_nodes].to(device)
h_dst = h[: block.number_of_dst_nodes()]
h = layer(block, (h, h_dst))
if l != len(self.layers) - 1:
if i != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
......@@ -271,6 +101,11 @@ class DistSAGE(SAGE):
g.barrier()
return y
@contextmanager
def join(self):
    """No-op stand-in for ``DistributedDataParallel.join()``.

    In standalone (single-process) mode the model is not wrapped in
    DDP, so it has no real ``join``; this dummy context manager lets
    the training loop use ``with model.join():`` unconditionally.
    """
    yield
def load_subtensor(g, input_nodes, device):
"""
......@@ -359,24 +194,24 @@ def run(args, device, data):
labels,
) = data
# Create sampler
sampler = NeighborSampler(
g,
[int(fanout) for fanout in args.fan_out.split(",")],
train_nids,
dgl.distributed.sample_neighbors,
args.num_negs,
args.remove_edge,
neg_sampler = dgl.dataloading.negative_sampler.Uniform(args.num_negs)
sampler = dgl.dataloading.NeighborSampler(
[int(fanout) for fanout in args.fan_out.split(",")]
)
# Create PyTorch DataLoader for constructing blocks
dataloader = dgl.distributed.DistDataLoader(
dataset=train_eids.numpy(),
# Create dataloader
exclude = "reverse_id" if args.remove_edge else None
reverse_eids = th.arange(g.num_edges()) if args.remove_edge else None
dataloader = dgl.dataloading.DistEdgeDataLoader(
g,
train_eids,
sampler,
negative_sampler=neg_sampler,
exclude=exclude,
reverse_eids=reverse_eids,
batch_size=args.batch_size,
collate_fn=sampler.sample_blocks,
shuffle=True,
drop_last=False,
)
# Define model and optimizer
model = DistSAGE(
in_feats,
......@@ -402,16 +237,10 @@ def run(args, device, data):
# Training loop
epoch = 0
for epoch in range(args.num_epochs):
sample_time = 0
copy_time = 0
forward_time = 0
backward_time = 0
update_time = 0
num_seeds = 0
num_inputs = 0
step_time = []
iter_t = []
sample_t = []
feat_copy_t = []
forward_t = []
......@@ -420,65 +249,68 @@ def run(args, device, data):
iter_tput = []
start = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
for step, (pos_graph, neg_graph, blocks) in enumerate(dataloader):
tic_step = time.time()
sample_t.append(tic_step - start)
pos_graph = pos_graph.to(device)
neg_graph = neg_graph.to(device)
blocks = [block.to(device) for block in blocks]
# The nodes for input lies at the LHS side of the first block.
# The nodes for output lies at the RHS side of the last block.
# Load the input features as well as output labels
batch_inputs = blocks[0].srcdata["features"]
copy_time = time.time()
feat_copy_t.append(copy_time - tic_step)
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, pos_graph, neg_graph)
forward_end = time.time()
optimizer.zero_grad()
loss.backward()
compute_end = time.time()
forward_t.append(forward_end - copy_time)
backward_t.append(compute_end - forward_end)
# Aggregate gradients in multiple nodes.
optimizer.step()
update_t.append(time.time() - compute_end)
pos_edges = pos_graph.number_of_edges()
neg_edges = neg_graph.number_of_edges()
step_t = time.time() - start
step_time.append(step_t)
iter_tput.append(pos_edges / step_t)
num_seeds += pos_edges
if step % args.log_every == 0:
print(
"[{}] Epoch {:05d} | Step {:05d} | Loss {:.4f} | Speed (samples/sec) {:.4f} | time {:.3f} s"
"| sample {:.3f} | copy {:.3f} | forward {:.3f} | backward {:.3f} | update {:.3f}".format(
g.rank(),
epoch,
step,
loss.item(),
np.mean(iter_tput[3:]),
np.sum(step_time[-args.log_every :]),
np.sum(sample_t[-args.log_every :]),
np.sum(feat_copy_t[-args.log_every :]),
np.sum(forward_t[-args.log_every :]),
np.sum(backward_t[-args.log_every :]),
np.sum(update_t[-args.log_every :]),
with model.join():
# Loop over the dataloader to sample the computation dependency
# graph as a list of blocks.
for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(
dataloader
):
tic_step = time.time()
sample_t.append(tic_step - start)
copy_t = time.time()
pos_graph = pos_graph.to(device)
neg_graph = neg_graph.to(device)
blocks = [block.to(device) for block in blocks]
batch_inputs = load_subtensor(g, input_nodes, device)
copy_time = time.time()
feat_copy_t.append(copy_time - copy_t)
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, pos_graph, neg_graph)
forward_end = time.time()
optimizer.zero_grad()
loss.backward()
compute_end = time.time()
forward_t.append(forward_end - copy_time)
backward_t.append(compute_end - forward_end)
# Aggregate gradients in multiple nodes.
optimizer.step()
update_t.append(time.time() - compute_end)
pos_edges = pos_graph.num_edges()
step_t = time.time() - start
step_time.append(step_t)
iter_tput.append(pos_edges / step_t)
num_seeds += pos_edges
if step % args.log_every == 0:
print(
"[{}] Epoch {:05d} | Step {:05d} | Loss {:.4f} | Speed "
"(samples/sec) {:.4f} | time {:.3f}s | sample {:.3f} | "
"copy {:.3f} | forward {:.3f} | backward {:.3f} | "
"update {:.3f}".format(
g.rank(),
epoch,
step,
loss.item(),
np.mean(iter_tput[3:]),
np.sum(step_time[-args.log_every:]),
np.sum(sample_t[-args.log_every:]),
np.sum(feat_copy_t[-args.log_every:]),
np.sum(forward_t[-args.log_every:]),
np.sum(backward_t[-args.log_every:]),
np.sum(update_t[-args.log_every:]),
)
)
)
start = time.time()
start = time.time()
print(
"[{}]Epoch Time(s): {:.4f}, sample: {:.4f}, data copy: {:.4f}, forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, #inputs: {}".format(
"[{}]Epoch Time(s): {:.4f}, sample: {:.4f}, data copy: {:.4f}, "
"forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, "
"#inputs: {}".format(
g.rank(),
np.sum(step_time),
np.sum(sample_t),
......@@ -493,14 +325,13 @@ def run(args, device, data):
epoch += 1
# evaluate the embedding using LogisticRegression
if args.standalone:
pred = generate_emb(
model, g, g.ndata["features"], args.batch_size_eval, device
)
else:
pred = generate_emb(
model.module, g, g.ndata["features"], args.batch_size_eval, device
)
pred = generate_emb(
model if args.standalone else model.module,
g,
g.ndata["features"],
args.batch_size_eval,
device,
)
if g.rank() == 0:
eval_acc, test_acc = compute_acc(
pred, labels, global_train_nid, global_valid_nid, global_test_nid
......@@ -518,7 +349,6 @@ def run(args, device, data):
if g.rank() == 0:
th.save(pred, "emb.pt")
else:
feat = g.ndata["features"]
th.save(pred, "emb.pt")
......@@ -526,32 +356,35 @@ def main(args):
dgl.distributed.initialize(args.ip_config)
if not args.standalone:
th.distributed.init_process_group(backend="gloo")
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.part_config)
g = dgl.distributed.DistGraph(
args.graph_name, part_config=args.part_config
)
print("rank:", g.rank())
print("number of edges", g.number_of_edges())
print("number of edges", g.num_edges())
train_eids = dgl.distributed.edge_split(
th.ones((g.number_of_edges(),), dtype=th.bool),
th.ones((g.num_edges(),), dtype=th.bool),
g.get_partition_book(),
force_even=True,
)
train_nids = dgl.distributed.node_split(
th.ones((g.number_of_nodes(),), dtype=th.bool), g.get_partition_book()
th.ones((g.num_nodes(),), dtype=th.bool), g.get_partition_book()
)
global_train_nid = th.LongTensor(
np.nonzero(g.ndata["train_mask"][np.arange(g.number_of_nodes())])
np.nonzero(g.ndata["train_mask"][np.arange(g.num_nodes())])
)
global_valid_nid = th.LongTensor(
np.nonzero(g.ndata["val_mask"][np.arange(g.number_of_nodes())])
np.nonzero(g.ndata["val_mask"][np.arange(g.num_nodes())])
)
global_test_nid = th.LongTensor(
np.nonzero(g.ndata["test_mask"][np.arange(g.number_of_nodes())])
np.nonzero(g.ndata["test_mask"][np.arange(g.num_nodes())])
)
labels = g.ndata["labels"][np.arange(g.number_of_nodes())]
labels = g.ndata["labels"][np.arange(g.num_nodes())]
if args.num_gpus == -1:
device = th.device("cpu")
else:
device = th.device("cuda:" + str(args.local_rank))
dev_id = g.rank() % args.num_gpus
device = th.device("cuda:" + str(dev_id))
# Pack data
in_feats = g.ndata["features"].shape[1]
......@@ -577,7 +410,6 @@ def main(args):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="GCN")
register_data_args(parser)
parser.add_argument("--graph_name", type=str, help="graph name")
parser.add_argument("--id", type=int, help="the partition id")
parser.add_argument(
......@@ -610,12 +442,6 @@ if __name__ == "__main__":
"--standalone", action="store_true", help="run in the standalone mode"
)
parser.add_argument("--num_negs", type=int, default=1)
parser.add_argument(
"--neg_share",
default=False,
action="store_true",
help="sharing neg nodes for positive nodes",
)
parser.add_argument(
"--remove_edge",
default=False,
......
import os
os.environ["DGLBACKEND"] = "pytorch"
import argparse
import math
import time
from functools import wraps
from multiprocessing import Process
import numpy as np
import sklearn.linear_model as lm
import sklearn.metrics as skm
import torch as th
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
from train_dist_transductive import DistEmb, load_embs
from train_dist_unsupervised import (
SAGE,
CrossEntropyLoss,
NeighborSampler,
PosNeighborSampler,
compute_acc,
)
import dgl
import dgl.function as fn
import dgl.nn.pytorch as dglnn
from dgl import DGLGraph
from dgl.data import load_data, register_data_args
from dgl.data.utils import load_graphs
from dgl.distributed import DistDataLoader
from train_dist_transductive import DistEmb, load_embs
from train_dist_unsupervised import CrossEntropyLoss, DistSAGE, compute_acc
def generate_emb(standalone, model, emb_layer, g, batch_size, device):
......@@ -43,6 +19,8 @@ def generate_emb(standalone, model, emb_layer, g, batch_size, device):
batch_size : Number of nodes to compute at the same time.
device : The GPU device to evaluate on.
"""
if not standalone:
model = model.module
model.eval()
emb_layer.eval()
with th.no_grad():
......@@ -64,24 +42,24 @@ def run(args, device, data):
labels,
) = data
# Create sampler
sampler = NeighborSampler(
g,
[int(fanout) for fanout in args.fan_out.split(",")],
train_nids,
dgl.distributed.sample_neighbors,
args.num_negs,
args.remove_edge,
neg_sampler = dgl.dataloading.negative_sampler.Uniform(args.num_negs)
sampler = dgl.dataloading.NeighborSampler(
[int(fanout) for fanout in args.fan_out.split(",")]
)
# Create PyTorch DataLoader for constructing blocks
dataloader = dgl.distributed.DistDataLoader(
dataset=train_eids.numpy(),
# Create dataloader
exclude = "reverse_id" if args.remove_edge else None
reverse_eids = th.arange(g.num_edges()) if args.remove_edge else None
dataloader = dgl.dataloading.DistEdgeDataLoader(
g,
train_eids,
sampler,
negative_sampler=neg_sampler,
exclude=exclude,
reverse_eids=reverse_eids,
batch_size=args.batch_size,
collate_fn=sampler.sample_blocks,
shuffle=True,
drop_last=False,
)
# Define model and optimizer
emb_layer = DistEmb(
g.num_nodes(),
......@@ -89,7 +67,7 @@ def run(args, device, data):
dgl_sparse_emb=args.dgl_sparse,
dev_id=device,
)
model = SAGE(
model = DistSAGE(
args.num_hidden,
args.num_hidden,
args.num_hidden,
......@@ -126,21 +104,16 @@ def run(args, device, data):
emb_optimizer = th.optim.SparseAdam(
list(emb_layer.module.sparse_emb.parameters()), lr=args.sparse_lr
)
print("optimize Pytorch sparse embedding:", emb_layer.module.sparse_emb)
print(
"optimize Pytorch sparse embedding:", emb_layer.module.sparse_emb
)
# Training loop
epoch = 0
for epoch in range(args.num_epochs):
sample_time = 0
copy_time = 0
forward_time = 0
backward_time = 0
update_time = 0
num_seeds = 0
num_inputs = 0
step_time = []
iter_t = []
sample_t = []
feat_copy_t = []
forward_t = []
......@@ -149,69 +122,71 @@ def run(args, device, data):
iter_tput = []
start = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
for step, (pos_graph, neg_graph, blocks) in enumerate(dataloader):
tic_step = time.time()
sample_t.append(tic_step - start)
with model.join():
# Loop over the dataloader to sample the computation dependency
# graph as a list of blocks.
for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(
dataloader
):
tic_step = time.time()
sample_t.append(tic_step - start)
pos_graph = pos_graph.to(device)
neg_graph = neg_graph.to(device)
blocks = [block.to(device) for block in blocks]
# The nodes for input lies at the LHS side of the first block.
# The nodes for output lies at the RHS side of the last block.
copy_t = time.time()
pos_graph = pos_graph.to(device)
neg_graph = neg_graph.to(device)
blocks = [block.to(device) for block in blocks]
feat_copy_t.append(copy_t - tic_step)
copy_time = time.time()
# Load the input features as well as output labels
batch_inputs = blocks[0].srcdata[dgl.NID]
copy_time = time.time()
feat_copy_t.append(copy_time - tic_step)
# Compute loss and prediction
batch_inputs = emb_layer(input_nodes)
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, pos_graph, neg_graph)
forward_end = time.time()
emb_optimizer.zero_grad()
optimizer.zero_grad()
loss.backward()
compute_end = time.time()
forward_t.append(forward_end - copy_time)
backward_t.append(compute_end - forward_end)
# Compute loss and prediction
batch_inputs = emb_layer(batch_inputs)
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, pos_graph, neg_graph)
forward_end = time.time()
emb_optimizer.zero_grad()
optimizer.zero_grad()
loss.backward()
compute_end = time.time()
forward_t.append(forward_end - copy_time)
backward_t.append(compute_end - forward_end)
# Aggregate gradients in multiple nodes.
emb_optimizer.step()
optimizer.step()
update_t.append(time.time() - compute_end)
# Aggregate gradients in multiple nodes.
emb_optimizer.step()
optimizer.step()
update_t.append(time.time() - compute_end)
pos_edges = pos_graph.num_edges()
pos_edges = pos_graph.number_of_edges()
neg_edges = neg_graph.number_of_edges()
step_t = time.time() - start
step_time.append(step_t)
iter_tput.append(pos_edges / step_t)
num_seeds += pos_edges
if step % args.log_every == 0:
print(
"[{}] Epoch {:05d} | Step {:05d} | Loss {:.4f} | Speed (samples/sec) {:.4f} | time {:.3f} s"
"| sample {:.3f} | copy {:.3f} | forward {:.3f} | backward {:.3f} | update {:.3f}".format(
g.rank(),
epoch,
step,
loss.item(),
np.mean(iter_tput[3:]),
np.sum(step_time[-args.log_every :]),
np.sum(sample_t[-args.log_every :]),
np.sum(feat_copy_t[-args.log_every :]),
np.sum(forward_t[-args.log_every :]),
np.sum(backward_t[-args.log_every :]),
np.sum(update_t[-args.log_every :]),
step_t = time.time() - start
step_time.append(step_t)
iter_tput.append(pos_edges / step_t)
num_seeds += pos_edges
if step % args.log_every == 0:
print(
"[{}] Epoch {:05d} | Step {:05d} | Loss {:.4f} | Speed "
"(samples/sec) {:.4f} | time {:.3f}s | sample {:.3f} | "
"copy {:.3f} | forward {:.3f} | backward {:.3f} | "
"update {:.3f}".format(
g.rank(),
epoch,
step,
loss.item(),
np.mean(iter_tput[3:]),
np.sum(step_time[-args.log_every:]),
np.sum(sample_t[-args.log_every:]),
np.sum(feat_copy_t[-args.log_every:]),
np.sum(forward_t[-args.log_every:]),
np.sum(backward_t[-args.log_every:]),
np.sum(update_t[-args.log_every:]),
)
)
)
start = time.time()
start = time.time()
print(
"[{}]Epoch Time(s): {:.4f}, sample: {:.4f}, data copy: {:.4f}, forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, #inputs: {}".format(
"[{}]Epoch Time(s): {:.4f}, sample: {:.4f}, data copy: {:.4f}, "
"forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, "
"#inputs: {}".format(
g.rank(),
np.sum(step_time),
np.sum(sample_t),
......@@ -226,14 +201,9 @@ def run(args, device, data):
epoch += 1
# evaluate the embedding using LogisticRegression
if args.standalone:
pred = generate_emb(
True, model, emb_layer, g, args.batch_size_eval, device
)
else:
pred = generate_emb(
False, model.module, emb_layer, g, args.batch_size_eval, device
)
pred = generate_emb(
args.standalone, model, emb_layer, g, args.batch_size_eval, device
)
if g.rank() == 0:
eval_acc, test_acc = compute_acc(
pred, labels, global_train_nid, global_valid_nid, global_test_nid
......@@ -251,7 +221,6 @@ def run(args, device, data):
if g.rank() == 0:
th.save(pred, "emb.pt")
else:
feat = g.ndata["features"]
th.save(pred, "emb.pt")
......@@ -259,32 +228,35 @@ def main(args):
dgl.distributed.initialize(args.ip_config)
if not args.standalone:
th.distributed.init_process_group(backend="gloo")
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.part_config)
g = dgl.distributed.DistGraph(
args.graph_name, part_config=args.part_config
)
print("rank:", g.rank())
print("number of edges", g.number_of_edges())
print("number of edges", g.num_edges())
train_eids = dgl.distributed.edge_split(
th.ones((g.number_of_edges(),), dtype=th.bool),
th.ones((g.num_edges(),), dtype=th.bool),
g.get_partition_book(),
force_even=True,
)
train_nids = dgl.distributed.node_split(
th.ones((g.number_of_nodes(),), dtype=th.bool), g.get_partition_book()
th.ones((g.num_nodes(),), dtype=th.bool), g.get_partition_book()
)
global_train_nid = th.LongTensor(
np.nonzero(g.ndata["train_mask"][np.arange(g.number_of_nodes())])
np.nonzero(g.ndata["train_mask"][np.arange(g.num_nodes())])
)
global_valid_nid = th.LongTensor(
np.nonzero(g.ndata["val_mask"][np.arange(g.number_of_nodes())])
np.nonzero(g.ndata["val_mask"][np.arange(g.num_nodes())])
)
global_test_nid = th.LongTensor(
np.nonzero(g.ndata["test_mask"][np.arange(g.number_of_nodes())])
np.nonzero(g.ndata["test_mask"][np.arange(g.num_nodes())])
)
labels = g.ndata["labels"][np.arange(g.number_of_nodes())]
labels = g.ndata["labels"][np.arange(g.num_nodes())]
if args.num_gpus == -1:
device = th.device("cpu")
else:
device = th.device("cuda:" + str(args.local_rank))
dev_id = g.rank() % args.num_gpus
device = th.device("cuda:" + str(dev_id))
# Pack data
global_train_nid = global_train_nid.squeeze()
......@@ -308,7 +280,6 @@ def main(args):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="GCN")
register_data_args(parser)
parser.add_argument("--graph_name", type=str, help="graph name")
parser.add_argument("--id", type=int, help="the partition id")
parser.add_argument(
......
## DistGNN vertex-cut based graph partitioning (using Libra)
### How to run graph partitioning
```python partition_graph.py --dataset <dataset> --num-parts <num_parts> --out-dir <output_location>```
Example: The following command-line creates 4 partitions of pubmed graph
``` python partition_graph.py --dataset pubmed --num-parts 4 --out-dir ./```
The output partitions are created in the current directory in the Libra_result_\<dataset\>/ folder.
The *upcoming DistGNN* application can directly use these partitions for distributed training.
### How Libra partitioning works
Libra is a vertex-cut based graph partitioning method. It applies greedy heuristics to uniquely distribute the input graph edges among the partitions. It generates the partitions as a list of edges. The script ```libra_partition.py``` generates the Libra partitions and then converts the Libra output to the DGL/DistGNN input format.
Note: Current Libra implementation is sequential. Extra overhead is paid due to the additional work of format conversion of the partitioned graph.
### Expected partitioning timings
Cora, Pubmed, Citeseer: < 10 sec (<10GB)
Reddit: ~150 sec (~ 25GB)
OGBN-Products: ~200 sec (~30GB)
Proteins: 1800 sec (Format conversion from public data takes time) (~100GB)
OGBN-Paper100M: 2500 sec (~200GB)
### Settings
Tested with:
Cent OS 7.6
gcc v8.3.0
PyTorch 1.7.1
Python 3.7.10
r"""
Copyright (c) 2021 Intel Corporation
\file Graph partitioning
\brief Calls Libra - Vertex-cut based graph partitioner for distirbuted training
\author Vasimuddin Md <vasimuddin.md@intel.com>,
Guixiang Ma <guixiang.ma@intel.com>
Sanchit Misra <sanchit.misra@intel.com>,
Ramanarayan Mohanty <ramanarayan.mohanty@intel.com>,
Sasikanth Avancha <sasikanth.avancha@intel.com>
Nesreen K. Ahmed <nesreen.k.ahmed@intel.com>
"""
import argparse
import csv
import os
import random
import sys
import time
from statistics import mean
import numpy as np
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from load_graph import load_ogb
import dgl
from dgl.base import DGLError
from dgl.data import load_data
from dgl.distgnn.partition import partition_graph
from dgl.distgnn.tools import load_proteins
if __name__ == "__main__":
    # CLI for Libra vertex-cut partitioning: pick a dataset, the number
    # of partitions, and where to write the result.
    argparser = argparse.ArgumentParser()
    argparser.add_argument("--dataset", type=str, default="cora")
    argparser.add_argument("--num-parts", type=int, default=2)
    argparser.add_argument("--out-dir", type=str, default="./")
    args = argparser.parse_args()

    dataset = args.dataset
    num_community = args.num_parts
    # The "Libra_result_" prefix is mandatory: downstream tooling locates
    # the partitions by this naming convention.
    out_dir = "Libra_result_" + dataset
    resultdir = os.path.join(args.out_dir, out_dir)

    print("Input dataset for partitioning: ", dataset)
    if dataset == "ogbn-products":
        print("Loading ogbn-products")
        G, _ = load_ogb("ogbn-products")
    elif dataset == "ogbn-papers100M":
        print("Loading ogbn-papers100M")
        G, _ = load_ogb("ogbn-papers100M")
    elif dataset == "proteins":
        G = load_proteins("proteins")
    elif dataset == "ogbn-arxiv":
        print("Loading ogbn-arxiv")
        G, _ = load_ogb("ogbn-arxiv")
    else:
        try:
            G = load_data(args)[0]
        except Exception as e:
            # Narrow from a bare ``except`` (which also swallowed
            # SystemExit/KeyboardInterrupt) and chain the original
            # failure so the root cause is not lost.
            raise DGLError(
                "Error: Dataset {} not found !!!".format(dataset)
            ) from e
    print("Done loading the graph.", flush=True)

    partition_graph(num_community, G, resultdir)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment