"tests/python/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "d853a010e0dbba8b1d07f8de284e0e71b52a3dfd"
Unverified commit 05b0f1ea, authored by Quan (Andy) Gan and committed by GitHub

[Model] Shared memory history tensor for multi-gpu training of control variate methods (#1479)



* shared memory history for multi-gpu training

* reorg
Co-authored-by: Minjie Wang <wmjlyjemaine@gmail.com>
parent dc0432e7
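The core idea of the change is to keep the per-layer history tensors in shared memory, so all GPU worker processes read and write the same storage instead of each holding a private copy. A minimal sketch of that pattern in plain PyTorch (the names `worker` and `hist` are illustrative, not taken from this diff):

```python
import torch as th
import torch.multiprocessing as mp

def worker(rank, hist):
    # Each worker writes its own row of the shared tensor; the write is
    # visible to the parent process and to the other workers.
    hist[rank].fill_(rank + 1)

if __name__ == '__main__':
    # .share_memory_() moves the tensor's storage into shared memory
    # before the worker processes are spawned.
    hist = th.zeros(4, 3).share_memory_()
    mp.spawn(worker, args=(hist,), nprocs=4, join=True)
    print(hist)  # every row was filled by a different process
```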
@@ -102,11 +102,10 @@ class SAGE(nn.Module):
         # on each layer are of course splitted in batches.
         # TODO: can we standardize this?
         nodes = th.arange(g.number_of_nodes())
-        ys = []
         for l, layer in enumerate(self.layers):
-            y = th.zeros(g.number_of_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes)
-            for start in range(0, len(nodes), batch_size):
+            y = g.ndata['hist_%d' % (l + 1)]
+            for start in tqdm.trange(0, len(nodes), batch_size):
                 end = start + batch_size
                 batch_nodes = nodes[start:end]
                 block = dgl.to_block(dgl.in_subgraph(g, batch_nodes), batch_nodes)
@@ -118,9 +117,8 @@ class SAGE(nn.Module):
                 y[start:end] = h.cpu()
-            ys.append(y)
             x = y
-        return y, ys
+        return y
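With this change, `inference()` no longer allocates fresh per-layer output tensors and returns them; each layer's output is written slice by slice into the preallocated `g.ndata['hist_%d']` tensor, so when that tensor lives in shared memory the history is refreshed in place for every process. A rough sketch of the write pattern, assuming `hist` was allocated up front with `.share_memory_()` and `compute_batch` stands in for the per-block forward pass:

```python
import torch as th

def write_layer_output(hist, nodes, batch_size, compute_batch):
    # hist: preallocated (num_nodes, dim) tensor, possibly in shared memory.
    for start in range(0, len(nodes), batch_size):
        end = start + batch_size
        h = compute_batch(nodes[start:end])
        hist[start:end] = h.cpu()  # in-place slice assignment, no new allocation
    return hist
```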
@@ -207,7 +205,7 @@ def evaluate(model, g, labels, val_mask, batch_size, device):
     model.eval()
     with th.no_grad():
         inputs = g.ndata['features']
-        pred, _ = model.inference(g, inputs, batch_size, device)
+        pred = model.inference(g, inputs, batch_size, device)  # also recomputes history tensors
     model.train()
     return compute_acc(pred[val_mask], labels[val_mask])
@@ -230,13 +228,15 @@ def load_subtensor(g, labels, blocks, hist_blocks, dev_id, aggregation_on_device
         if not aggregation_on_device:
             block.dstdata['agg_hist'] = block.dstdata['agg_hist'].to(dev_id)
 
-def init_history(g, model, dev_id):
+def create_history_storage(g, args, n_classes):
+    # Initialize history storage
+    for l in range(args.num_layers):
+        dim = args.num_hidden if l != args.num_layers - 1 else n_classes
+        g.ndata['hist_%d' % (l + 1)] = th.zeros(g.number_of_nodes(), dim).share_memory_()
+
+def init_history(g, model, dev_id, batch_size):
     with th.no_grad():
-        history = model.inference(g, g.ndata['features'], 1000, dev_id)[1]
-        for layer in range(args.num_layers + 1):
-            if layer > 0:
-                hist_col = 'hist_%d' % layer
-                g.ndata['hist_%d' % layer] = history[layer - 1]
+        model.inference(g, g.ndata['features'], batch_size, dev_id)  # replaces hist_i features in-place
 
 def update_history(g, blocks):
     with th.no_grad():
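`create_history_storage` preallocates one shared-memory history tensor per layer before the worker processes are spawned: hidden-sized for all but the last layer, class-sized for the last. A stand-alone version of the same allocation, with the `args` namespace replaced by plain parameters purely for illustration:

```python
import torch as th

def create_history_storage(g, num_layers, num_hidden, n_classes):
    # One history tensor per layer; the last layer stores class logits.
    # .share_memory_() must run in the parent process, before spawning/forking
    # workers, so every worker maps the same underlying storage.
    for l in range(num_layers):
        dim = num_hidden if l != num_layers - 1 else n_classes
        g.ndata['hist_%d' % (l + 1)] = th.zeros(g.number_of_nodes(), dim).share_memory_()
```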
@@ -298,10 +298,11 @@ def run(proc_id, n_gpus, args, devices, data):
     # Compute history tensor and their aggregation before training on CPU
     model.eval()
    if n_gpus > 1:
-        init_history(g, model.module, dev_id)
+        if proc_id == 0:
+            init_history(g, model.module, dev_id, args.val_batch_size)
         th.distributed.barrier()
     else:
-        init_history(g, model, dev_id)
+        init_history(g, model, dev_id, args.val_batch_size)
     model.train()
 
     # Training loop
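Because the history tensors are now shared, only one process needs to fill them; the other ranks simply wait at the barrier until rank 0 has finished running inference. The same pattern in isolation, assuming a `torch.distributed` process group is already initialized (the helper name `init_shared_state` and `fill_fn` are hypothetical):

```python
import torch.distributed as dist

def init_shared_state(rank, fill_fn):
    # fill_fn writes into shared-memory tensors, so doing it once on rank 0
    # is enough; the barrier keeps the other ranks from reading stale zeros.
    if rank == 0:
        fill_fn()
    dist.barrier()
```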
@@ -392,7 +393,9 @@ if __name__ == '__main__':
     n_classes = data.num_labels
     # Construct graph
     g = dgl.graph(data.graph.all_edges())
-    g.ndata['features'] = features
+    g.ndata['features'] = features.share_memory_()
+    create_history_storage(g, args, n_classes)
     prepare_mp(g)
     # Pack data
     data = train_mask, val_mask, in_feats, labels, n_classes, g