Unverified commit 2c88f7c0, authored by Da Zheng, committed by GitHub

[Distributed] Split data to ensure data locality for multiple clients (#1710)

* fix.

* fix tests.

* fix

* add tests.

* fix.

* have default rank.

* add comment.

* fix test.

* remove check

* simplify code.

* add test.

* split data evenly.

* simplify the distributed training code.

* add comments.

* add comments.
parent 69e6afdf
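In short, `node_split` and `edge_split` now take an optional `rank` and a `force_even` flag, and `DistGraph.rank()` maps each client onto its partition. A minimal sketch of how a trainer script calls the new API, mirroring the example change below (the graph name and ip_config path are placeholders):

```python
# Minimal sketch of the new splitting API; 'ip_config.txt' and 'my_graph'
# are placeholder arguments, not values from this commit.
import dgl
import torch as th

th.distributed.init_process_group(backend='gloo')
g = dgl.distributed.DistGraph('ip_config.txt', 'my_graph')

# force_even=True splits the masked nodes so that every trainer process gets
# almost the same number of nodes; rank defaults to the current client.
train_nid = dgl.distributed.node_split(g.ndata['train_mask'],
                                       g.get_partition_book(),
                                       force_even=True)
```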
@@ -53,14 +53,13 @@ def run(args, device, data):
     optimizer = optim.Adam(model.parameters(), lr=args.lr)
     train_size = th.sum(g.ndata['train_mask'][0:g.number_of_nodes()])
-    num_steps = int(args.num_epochs * train_size / args.batch_size / args.num_client)
     # Training loop
     iter_tput = []
     profiler = Profiler()
     profiler.start()
     epoch = 0
-    while num_steps > 0:
+    for epoch in range(args.num_epochs):
         tic = time.time()
         sample_time = 0
@@ -120,10 +119,6 @@ def run(args, device, data):
                 print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MiB | time {:.3f} s'.format(
                     epoch, step, loss.item(), acc.item(), np.mean(iter_tput[3:]), gpu_mem_alloc, np.sum(step_time[-args.log_every:])))
             start = time.time()
-            num_steps -= 1
-            # We have to ensure all trainer process run the same number of steps.
-            if num_steps == 0:
-                break
         toc = time.time()
         print('Epoch Time(s): {:.4f}, sample: {:.4f}, data copy: {:.4f}, forward: {:.4f}, backward: {:.4f}, update: {:.4f}, #seeds: {}, #inputs: {}'.format(
@@ -147,10 +142,11 @@ def run(args, device, data):
 def main(args):
     th.distributed.init_process_group(backend='gloo')
     g = dgl.distributed.DistGraph(args.ip_config, args.graph_name)
+    print('rank:', g.rank())
-    train_nid = dgl.distributed.node_split(g.ndata['train_mask'], g.get_partition_book(), g.rank())
-    val_nid = dgl.distributed.node_split(g.ndata['val_mask'], g.get_partition_book(), g.rank())
-    test_nid = dgl.distributed.node_split(g.ndata['test_mask'], g.get_partition_book(), g.rank())
+    train_nid = dgl.distributed.node_split(g.ndata['train_mask'], g.get_partition_book(), force_even=True)
+    val_nid = dgl.distributed.node_split(g.ndata['val_mask'], g.get_partition_book(), force_even=True)
+    test_nid = dgl.distributed.node_split(g.ndata['test_mask'], g.get_partition_book(), force_even=True)
     print('part {}, train: {}, val: {}, test: {}'.format(g.rank(), len(train_nid),
                                                          len(val_nid), len(test_nid)))
     device = th.device('cpu')
...
"""Define distributed graph.""" """Define distributed graph."""
from collections.abc import MutableMapping from collections.abc import MutableMapping
import numpy as np
from ..graph import DGLGraph from ..graph import DGLGraph
from .. import backend as F from .. import backend as F
...@@ -13,6 +14,7 @@ from .partition import load_partition ...@@ -13,6 +14,7 @@ from .partition import load_partition
from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book
from .. import utils from .. import utils
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from . import rpc
from .rpc_client import connect_to_server from .rpc_client import connect_to_server
from .server_state import ServerState from .server_state import ServerState
from .rpc_server import start_server from .rpc_server import start_server
@@ -494,10 +496,20 @@ class DistGraph:
         int
             The rank of the current graph store.
         '''
+        # If DistGraph doesn't have a local partition, it doesn't matter what rank
+        # it returns. There is no data locality anyway, as long as the returned rank
+        # is unique in the system.
         if self._g is None:
-            return self._client.client_id
+            return rpc.get_rank()
         else:
-            return self._gpb.partid
+            # If DistGraph has a local partition, we should be careful about the rank
+            # we return. We need to return a rank with which node_split or edge_split
+            # can split the workload with respect to data locality.
+            num_client = rpc.get_num_client()
+            num_client_per_part = num_client // self._gpb.num_partitions()
+            # All ranks of the clients on the same machine fall in a contiguous range.
+            client_id_in_part = rpc.get_rank() % num_client_per_part
+            return int(self._gpb.partid * num_client_per_part + client_id_in_part)

     def get_partition_book(self):
         """Get the partition information.
@@ -558,7 +570,70 @@ def _get_overlap(mask_arr, ids):
     masks = F.gather_row(mask_arr.tousertensor(), ids)
     return F.boolean_mask(ids, masks)

-def node_split(nodes, partition_book, rank):
+def _split_local(partition_book, rank, elements, local_eles):
+    ''' Split the input element list with respect to data locality.
+    '''
+    num_clients = rpc.get_num_client()
+    num_client_per_part = num_clients // partition_book.num_partitions()
+    if rank is None:
+        rank = rpc.get_rank()
+    # All ranks of the clients on the same machine fall in a contiguous range.
+    client_id_in_part = rank % num_client_per_part
+    local_eles = _get_overlap(elements, local_eles)
+    # Get a subset for the local client.
+    size = len(local_eles) // num_client_per_part
+    # If this isn't the last client in the partition, take a fixed-size chunk.
+    if client_id_in_part + 1 < num_client_per_part:
+        return local_eles[(size * client_id_in_part):(size * (client_id_in_part + 1))]
+    else:
+        return local_eles[(size * client_id_in_part):]
+
+def _split_even(partition_book, rank, elements):
+    ''' Split the input element list evenly.
+    '''
+    num_clients = rpc.get_num_client()
+    num_client_per_part = num_clients // partition_book.num_partitions()
+    if rank is None:
+        rank = rpc.get_rank()
+    # All ranks of the clients on the same machine fall in a contiguous range.
+    client_id_in_part = rank % num_client_per_part
+    rank = client_id_in_part + num_client_per_part * partition_book.partid
+    if isinstance(elements, DistTensor):
+        # Here we need to fetch all elements from the kvstore server.
+        # I hope it's OK.
+        eles = F.nonzero_1d(elements[0:len(elements)])
+    else:
+        elements = utils.toindex(elements)
+        eles = F.nonzero_1d(elements.tousertensor())
+
+    # Here we divide the element list as evenly as possible. If we use range partitioning,
+    # the split results also respect data locality. Range partitioning is the default
+    # strategy.
+    # TODO(zhengda) we need another way to divide the list for other partitioning strategies.
+
+    # Compute the offset of each split and ensure that the sizes of any two partitions
+    # differ by at most 1.
+    part_size = len(eles) // num_clients
+    sizes = [part_size] * num_clients
+    remain = len(eles) - part_size * num_clients
+    if remain > 0:
+        for i in range(num_clients):
+            sizes[i] += 1
+            remain -= 1
+            if remain == 0:
+                break
+    offsets = np.cumsum(sizes)
+    assert offsets[-1] == len(eles)
+    if rank == 0:
+        return eles[0:offsets[0]]
+    else:
+        return eles[offsets[rank-1]:offsets[rank]]
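A worked example of the offset computation above, with assumed numbers (10 selected elements, 4 clients):

```python
import numpy as np

# Assumed numbers: 10 selected elements split across 4 clients.
num_clients = 4
num_eles = 10
part_size = num_eles // num_clients            # 2
sizes = [part_size] * num_clients              # [2, 2, 2, 2]
remain = num_eles - part_size * num_clients    # 2 leftover elements
for i in range(remain):
    sizes[i] += 1                              # [3, 3, 2, 2]
offsets = np.cumsum(sizes)                     # [3, 6, 8, 10]
# rank 0 gets eles[0:3], rank 1 gets eles[3:6], rank 2 gets eles[6:8],
# rank 3 gets eles[8:10]; any two splits differ in size by at most 1.
```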
+def node_split(nodes, partition_book, rank=None, force_even=False):
     ''' Split nodes and return a subset for the local rank.

     This function splits the input nodes based on the partition book and
@@ -569,6 +644,14 @@ def node_split(nodes, partition_book, rank):
     the same as the number of nodes in a graph; 1 indicates that the vertex in
     the corresponding location exists.

+    There are two strategies to split the nodes. By default, the nodes are split
+    in a way that maximizes data locality; that is, all nodes that belong to a process
+    are returned. If `force_even` is set to true, the nodes are split evenly so that
+    each process gets almost the same number of nodes. The current implementation
+    can still preserve data locality when a graph is partitioned with range partitioning:
+    in this case, the majority of the nodes returned for a process are the ones that
+    belong to the process. If range partitioning is not used, data locality isn't guaranteed.
+
     Parameters
     ----------
     nodes : 1D tensor or DistTensor
@@ -576,7 +659,9 @@ def node_split(nodes, partition_book, rank):
     partition_book : GraphPartitionBook
         The graph partition book
     rank : int
-        The rank of the current process
+        The rank of a process. If not given, the rank of the current process is used.
+    force_even : bool
+        Force the nodes to be split evenly.

     Returns
     -------
@@ -588,11 +673,14 @@ def node_split(nodes, partition_book, rank):
         num_nodes += part['num_nodes']
     assert len(nodes) == num_nodes, \
         'The length of boolean mask vector should be the number of nodes in the graph.'
-    # Get all nodes that belong to the rank.
-    local_nids = partition_book.partid2nids(rank)
-    return _get_overlap(nodes, local_nids)
+    if force_even:
+        return _split_even(partition_book, rank, nodes)
+    else:
+        # Get all nodes that belong to the rank.
+        local_nids = partition_book.partid2nids(partition_book.partid)
+        return _split_local(partition_book, rank, nodes, local_nids)
-def edge_split(edges, partition_book, rank):
+def edge_split(edges, partition_book, rank=None, force_even=False):
     ''' Split edges and return a subset for the local rank.

     This function splits the input edges based on the partition book and
@@ -603,14 +691,24 @@ def edge_split(edges, partition_book, rank):
     the same as the number of edges in a graph; 1 indicates that the edge in
     the corresponding location exists.

+    There are two strategies to split the edges. By default, the edges are split
+    in a way that maximizes data locality; that is, all edges that belong to a process
+    are returned. If `force_even` is set to true, the edges are split evenly so that
+    each process gets almost the same number of edges. The current implementation
+    can still preserve data locality when a graph is partitioned with range partitioning:
+    in this case, the majority of the edges returned for a process are the ones that
+    belong to the process. If range partitioning is not used, data locality isn't guaranteed.
+
     Parameters
     ----------
     edges : 1D tensor or DistTensor
-        A boolean mask vector that indicates input nodes.
+        A boolean mask vector that indicates input edges.
     partition_book : GraphPartitionBook
         The graph partition book
     rank : int
-        The rank of the current process
+        The rank of a process. If not given, the rank of the current process is used.
+    force_even : bool
+        Force the edges to be split evenly.

     Returns
     -------
@@ -622,6 +720,10 @@ def edge_split(edges, partition_book, rank):
         num_edges += part['num_edges']
     assert len(edges) == num_edges, \
         'The length of boolean mask vector should be the number of edges in the graph.'
-    # Get all edges that belong to the rank.
-    local_eids = partition_book.partid2eids(rank)
-    return _get_overlap(edges, local_eids)
+    if force_even:
+        return _split_even(partition_book, rank, edges)
+    else:
+        # Get all edges that belong to the rank.
+        local_eids = partition_book.partid2eids(partition_book.partid)
+        return _split_local(partition_book, rank, edges, local_eids)
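For comparison with the even split, the default locality-preserving path gives each client a contiguous chunk of its own partition's elements. A small sketch with assumed numbers (one partition holding 9 masked elements, 2 clients per partition):

```python
# Assumed numbers: a partition owns 9 masked elements and hosts 2 clients.
local_eles = list(range(9))                      # stand-in for partition-local IDs
num_client_per_part = 2
size = len(local_eles) // num_client_per_part    # 4

chunk0 = local_eles[size * 0:size * 1]           # client 0 gets [0, 1, 2, 3]
chunk1 = local_eles[size * 1:]                   # last client gets [4, 5, 6, 7, 8]
# Every element a client receives already lives in its own partition,
# which is the data-locality guarantee of the default strategy.
```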
@@ -107,7 +107,7 @@ def run_client(graph_name, part_id, num_nodes, num_edges):
     selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
     # Test node split
-    nodes = node_split(selected_nodes, g.get_partition_book(), g.rank())
+    nodes = node_split(selected_nodes, g.get_partition_book())
     nodes = F.asnumpy(nodes)
     # We only have one partition, so the local nodes are basically all nodes in the graph.
     local_nids = np.arange(g.number_of_nodes())
@@ -172,6 +172,7 @@ def test_split():
     selected_nodes = np.nonzero(node_mask)[0]
     selected_edges = np.nonzero(edge_mask)[0]
     for i in range(num_parts):
+        dgl.distributed.set_num_client(num_parts)
         part_g, node_feats, edge_feats, gpb = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
         local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
         local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
@@ -182,6 +183,13 @@ def test_split():
         for n in nodes1:
            assert n in local_nids

+        dgl.distributed.set_num_client(num_parts * 2)
+        nodes3 = node_split(node_mask, gpb, i * 2)
+        nodes4 = node_split(node_mask, gpb, i * 2 + 1)
+        nodes5 = F.cat([nodes3, nodes4], 0)
+        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes5)))
+        dgl.distributed.set_num_client(num_parts)
+
         local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
         local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
         edges1 = np.intersect1d(selected_edges, F.asnumpy(local_eids))
@@ -191,6 +199,71 @@ def test_split():
         for e in edges1:
            assert e in local_eids

+        dgl.distributed.set_num_client(num_parts * 2)
+        edges3 = edge_split(edge_mask, gpb, i * 2)
+        edges4 = edge_split(edge_mask, gpb, i * 2 + 1)
+        edges5 = F.cat([edges3, edges4], 0)
+        assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges5)))
+
+def test_split_even():
+    prepare_dist()
+    g = create_random_graph(10000)
+    num_parts = 4
+    num_hops = 2
+    partition_graph(g, 'dist_graph_test', num_parts, '/tmp/dist_graph', num_hops=num_hops, part_method='metis')
+
+    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
+    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
+    selected_nodes = np.nonzero(node_mask)[0]
+    selected_edges = np.nonzero(edge_mask)[0]
+    all_nodes1 = []
+    all_nodes2 = []
+    all_edges1 = []
+    all_edges2 = []
+    for i in range(num_parts):
+        dgl.distributed.set_num_client(num_parts)
+        part_g, node_feats, edge_feats, gpb = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
+        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
+        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
+        nodes = node_split(node_mask, gpb, i, force_even=True)
+        all_nodes1.append(nodes)
+        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
+        print('part {} gets {} nodes and {} of them are in the partition'.format(i, len(nodes), len(subset)))
+
+        dgl.distributed.set_num_client(num_parts * 2)
+        nodes1 = node_split(node_mask, gpb, i * 2, force_even=True)
+        nodes2 = node_split(node_mask, gpb, i * 2 + 1, force_even=True)
+        nodes3 = F.cat([nodes1, nodes2], 0)
+        all_nodes2.append(nodes3)
+        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(nodes3))
+        print('intersection has', len(subset))
+
+        dgl.distributed.set_num_client(num_parts)
+        local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
+        local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
+        edges = edge_split(edge_mask, gpb, i, force_even=True)
+        all_edges1.append(edges)
+        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(local_eids))
+        print('part {} gets {} edges and {} of them are in the partition'.format(i, len(edges), len(subset)))
+
+        dgl.distributed.set_num_client(num_parts * 2)
+        edges1 = edge_split(edge_mask, gpb, i * 2, force_even=True)
+        edges2 = edge_split(edge_mask, gpb, i * 2 + 1, force_even=True)
+        edges3 = F.cat([edges1, edges2], 0)
+        all_edges2.append(edges3)
+        subset = np.intersect1d(F.asnumpy(edges), F.asnumpy(edges3))
+        print('intersection has', len(subset))
+
+    all_nodes1 = F.cat(all_nodes1, 0)
+    all_edges1 = F.cat(all_edges1, 0)
+    all_nodes2 = F.cat(all_nodes2, 0)
+    all_edges2 = F.cat(all_edges2, 0)
+    all_nodes = np.nonzero(node_mask)[0]
+    all_edges = np.nonzero(edge_mask)[0]
+    assert np.all(all_nodes == F.asnumpy(all_nodes1))
+    assert np.all(all_edges == F.asnumpy(all_edges1))
+    assert np.all(all_nodes == F.asnumpy(all_nodes2))
+    assert np.all(all_edges == F.asnumpy(all_edges2))
 def prepare_dist():
     ip_config = open("kv_ip_config.txt", "w")
     ip_addr = get_local_usable_addr()
@@ -199,5 +272,6 @@ def prepare_dist():
 if __name__ == '__main__':
     os.makedirs('/tmp/dist_graph', exist_ok=True)
-    #test_split()
+    test_split()
+    test_split_even()
     test_server_client()