"tests/python/common/test_traversal.py" did not exist on "66676a548dd5ef77c8dcafe5218c04e572a4f2fb"
Unverified commit 4b8eaf20 authored by Da Zheng, committed by GitHub

[Distributed] move feature copy to samplers in distributed GraphSage. (#1975)



* fix example.

* move feature copy to sampler.
Co-authored-by: Ubuntu <ubuntu@ip-172-31-19-1.us-west-2.compute.internal>
parent f7ce48bc
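
What the change does: load_subtensor, which copies the features and labels for a batch of nodes to the target device, moves out of the training loop in run() and into NeighborSampler.sample_blocks, so the copy happens when a batch is collated and the loop simply reads the tensors off the blocks. The sketch below illustrates that pattern in a self-contained way; ToyNeighborSampler, the dict-based "blocks", and the in-memory tensors are stand-ins invented for illustration, not the example's actual DGL distributed API.

import torch as th

class ToyNeighborSampler(object):
    """Stand-in for the example's NeighborSampler; blocks are plain dicts."""
    def __init__(self, features, labels, device):
        self.features = features   # toy stand-in for g.ndata['features']
        self.labels = labels       # toy stand-in for g.ndata['labels']
        self.device = device

    def sample_blocks(self, seeds):
        seeds = th.LongTensor(list(seeds))
        # A real sampler builds message-flow-graph blocks here; this toy
        # uses the seeds themselves as both input and output nodes.
        block = {'input_nodes': seeds, 'seeds': seeds}
        # The commit's key change: copy features/labels inside the sampler,
        # so the training loop receives blocks with tensors already attached.
        block['features'] = self.features[block['input_nodes']].to(self.device)
        block['labels'] = self.labels[block['seeds']].to(self.device)
        return [block]

features = th.randn(10, 4)
labels = th.randint(0, 2, (10,))
sampler = ToyNeighborSampler(features, labels, device='cpu')
blocks = sampler.sample_blocks([1, 3, 5])
batch_inputs = blocks[0]['features']   # analogous to blocks[0].srcdata['features']
batch_labels = blocks[-1]['labels']    # analogous to blocks[-1].dstdata['labels']
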
@@ -22,11 +22,20 @@ import torch.multiprocessing as mp
 from torch.utils.data import DataLoader
 from pyinstrument import Profiler
 
+def load_subtensor(g, seeds, input_nodes, device):
+    """
+    Copies features and labels of a set of nodes onto GPU.
+    """
+    batch_inputs = g.ndata['features'][input_nodes].to(device)
+    batch_labels = g.ndata['labels'][seeds].to(device)
+    return batch_inputs, batch_labels
+
 class NeighborSampler(object):
-    def __init__(self, g, fanouts, sample_neighbors):
+    def __init__(self, g, fanouts, sample_neighbors, device):
         self.g = g
         self.fanouts = fanouts
         self.sample_neighbors = sample_neighbors
+        self.device = device
 
     def sample_blocks(self, seeds):
         seeds = th.LongTensor(np.asarray(seeds))
@@ -40,6 +49,12 @@ class NeighborSampler(object):
             seeds = block.srcdata[dgl.NID]
 
             blocks.insert(0, block)
+
+        input_nodes = blocks[0].srcdata[dgl.NID]
+        seeds = blocks[-1].dstdata[dgl.NID]
+        batch_inputs, batch_labels = load_subtensor(self.g, seeds, input_nodes, self.device)
+        blocks[0].srcdata['features'] = batch_inputs
+        blocks[-1].dstdata['labels'] = batch_labels
         return blocks
 
 class DistSAGE(nn.Module):
@@ -89,7 +104,7 @@ class DistSAGE(nn.Module):
         y = dgl.distributed.DistTensor(g, (g.number_of_nodes(), self.n_classes),
                                        th.float32, 'h_last', persistent=True)
 
-        sampler = NeighborSampler(g, [-1], dgl.distributed.sample_neighbors)
+        sampler = NeighborSampler(g, [-1], dgl.distributed.sample_neighbors, device)
         print('|V|={}, eval batch size: {}'.format(g.number_of_nodes(), batch_size))
         # Create PyTorch DataLoader for constructing blocks
         dataloader = DistDataLoader(
@@ -97,8 +112,7 @@ class DistSAGE(nn.Module):
             batch_size=batch_size,
             collate_fn=sampler.sample_blocks,
             shuffle=False,
-            drop_last=False,
-            num_workers=args.num_workers)
+            drop_last=False)
 
         for blocks in tqdm.tqdm(dataloader):
             block = blocks[0]
@@ -140,20 +154,12 @@ def evaluate(model, g, inputs, labels, val_nid, test_nid, batch_size, device):
     model.train()
     return compute_acc(pred[val_nid], labels[val_nid]), compute_acc(pred[test_nid], labels[test_nid])
 
-def load_subtensor(g, seeds, input_nodes, device):
-    """
-    Copies features and labels of a set of nodes onto GPU.
-    """
-    batch_inputs = g.ndata['features'][input_nodes].to(device)
-    batch_labels = g.ndata['labels'][seeds].to(device)
-    return batch_inputs, batch_labels
-
 def run(args, device, data):
     # Unpack data
     train_nid, val_nid, test_nid, in_feats, n_classes, g = data
     # Create sampler
     sampler = NeighborSampler(g, [int(fanout) for fanout in args.fan_out.split(',')],
-                              dgl.distributed.sample_neighbors)
+                              dgl.distributed.sample_neighbors, device)
 
     # Create DataLoader for constructing blocks
     dataloader = DistDataLoader(
@@ -199,15 +205,9 @@ def run(args, device, data):
             # The nodes for input lie at the LHS side of the first block.
             # The nodes for output lie at the RHS side of the last block.
-            input_nodes = blocks[0].srcdata[dgl.NID]
-            seeds = blocks[-1].dstdata[dgl.NID]
-
-            # Load the input features as well as output labels
-            start = time.time()
-            batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device)
-            assert th.all(th.logical_not(th.isnan(batch_labels)))
+            batch_inputs = blocks[0].srcdata['features']
+            batch_labels = blocks[-1].dstdata['labels']
             batch_labels = batch_labels.long()
-            copy_time += time.time() - start
 
             num_seeds += len(blocks[-1].dstdata[dgl.NID])
             num_inputs += len(blocks[0].srcdata[dgl.NID])
...
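
For completeness, a usage sketch of the wiring the diff shows (collate_fn=sampler.sample_blocks): here torch.utils.data.DataLoader stands in for dgl.distributed's DistDataLoader, and the sampler is the ToyNeighborSampler from the sketch above, so treat this as an illustrative assumption rather than the distributed API.

from torch.utils.data import DataLoader

# Reuses ToyNeighborSampler/sampler from the earlier sketch.
train_nid = list(range(10))
dataloader = DataLoader(
    dataset=train_nid,
    batch_size=4,
    collate_fn=sampler.sample_blocks,   # feature copy now happens at collation
    shuffle=True,
    drop_last=False)

for blocks in dataloader:
    batch_inputs = blocks[0]['features']
    batch_labels = blocks[-1]['labels'].long()   # matches the diff's .long() cast
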