Unverified Commit 96984fac authored by Da Zheng's avatar Da Zheng Committed by GitHub
Browse files

[Feature] Split data for distributed training (#1505)



* fix tests in graph partition book.

* implement node_split and edge_split.

* fix a bug.

* add tests.

* remove unnecessary code.

* avoid storing graph structure in partition book.

* add partition book in DistGraph.

* Revert "avoid storing graph structure in partition book."

This reverts commit 55b11fbf9293efcae5f8c97e93f1e121399dadae.

* small fixes.

* add a test for boolean mask vector.

* fix test.

* fix bugs.

* make it work for all different cases.

* fix tests.

* fix boolean mask

* fix for TF

* fix for tensorflow.

* fix test for TF

* only support boolean mask for now.

* fix tests.

* make the code more readable.

* fix test.
Co-authored-by: default avatarChao Ma <mctt90@gmail.com>
Co-authored-by: default avatarJinjing Zhou <VoVAllen@users.noreply.github.com>
parent cfb24790
......@@ -239,6 +239,8 @@ def unsorted_1d_segment_mean(input, seg_id, n_segs, dim):
return y
def boolean_mask(input, mask):
if 'bool' not in str(mask.dtype):
mask = th.tensor(mask, dtype=th.bool)
return input[mask]
def equal(x, y):
......
"""DGL distributed."""
from .dist_graph import DistGraphServer, DistGraph
from .dist_graph import DistGraphServer, DistGraph, node_split, edge_split
from .partition import partition_graph, load_partition
from .graph_partition_book import GraphPartitionBook
......@@ -12,7 +12,9 @@ from ..graph_index import from_shared_mem_graph_index
from .._ffi.ndarray import empty_shared_mem
from ..frame import infer_scheme
from .partition import load_partition
from .graph_partition_book import GraphPartitionBook
from .. import ndarray as nd
from .. import utils
def _get_ndata_path(graph_name, ndata_name):
return "/" + graph_name + "_node_" + ndata_name
......@@ -52,6 +54,7 @@ def _copy_graph_to_shared_mem(g, graph_name):
return new_g
FIELD_DICT = {'local_node': F.int64,
'local_edge': F.int64,
NID: F.int64,
EID: F.int64}
......@@ -116,22 +119,49 @@ def _get_graph_from_shared_mem(graph_name):
g = DGLGraph(gidx)
g.ndata['local_node'] = _get_shared_mem_ndata(g, graph_name, 'local_node')
g.edata['local_edge'] = _get_shared_mem_edata(g, graph_name, 'local_edge')
g.ndata[NID] = _get_shared_mem_ndata(g, graph_name, NID)
g.edata[EID] = _get_shared_mem_edata(g, graph_name, EID)
return g
def _move_metadata_to_shared_mam(graph_name, num_nodes, num_edges, part_id,
num_partitions, node_map, edge_map):
''' Move all metadata to the shared memory.
We need these metadata to construct graph partition book.
'''
meta = _move_data_to_shared_mem_array(F.tensor([num_nodes, num_edges,
num_partitions, part_id]),
_get_ndata_path(graph_name, 'meta'))
node_map = _move_data_to_shared_mem_array(node_map, _get_ndata_path(graph_name, 'node_map'))
edge_map = _move_data_to_shared_mem_array(edge_map, _get_edata_path(graph_name, 'edge_map'))
return meta, node_map, edge_map
def _get_shared_mem_metadata(graph_name):
''' Get the metadata of the graph through shared memory.
The metadata includes the number of nodes and the number of edges. In the future,
we can add more information, especially for heterograph.
'''
shape = (2,)
shape = (4,)
dtype = F.int64
dtype = DTYPE_DICT[dtype]
data = empty_shared_mem(_get_ndata_path(graph_name, 'meta'), False, shape, dtype)
dlpack = data.to_dlpack()
return F.zerocopy_from_dlpack(dlpack)
meta = F.asnumpy(F.zerocopy_from_dlpack(dlpack))
num_nodes, num_edges, num_partitions, part_id = meta[0], meta[1], meta[2], meta[3]
# Load node map
data = empty_shared_mem(_get_ndata_path(graph_name, 'node_map'), False, (num_nodes,), dtype)
dlpack = data.to_dlpack()
node_map = F.zerocopy_from_dlpack(dlpack)
# Load edge_map
data = empty_shared_mem(_get_edata_path(graph_name, 'edge_map'), False, (num_edges,), dtype)
dlpack = data.to_dlpack()
edge_map = F.zerocopy_from_dlpack(dlpack)
return num_nodes, num_edges, part_id, num_partitions, node_map, edge_map
class DistTensor:
''' Distributed tensor.
......@@ -312,16 +342,14 @@ class DistGraphServer(KVServer):
print('Server {}: host name: {}, ip: {}'.format(server_id, host_name, host_ip))
self.client_g, node_feats, edge_feats, self.meta = load_partition(conf_file, server_id)
num_nodes, num_edges, node_map, edge_map = self.meta
num_nodes, num_edges, node_map, edge_map, num_partitions = self.meta
self.client_g = _copy_graph_to_shared_mem(self.client_g, graph_name)
self.meta = _move_data_to_shared_mem_array(F.tensor([num_nodes, num_edges]),
_get_ndata_path(graph_name, 'meta'))
# Create node global2local map.
node_g2l = F.zeros((num_nodes), dtype=F.int64, ctx=F.cpu()) - 1
# The nodes that belong to this partition.
local_nids = F.nonzero_1d(self.client_g.ndata['local_node'])
nids = self.client_g.ndata[NID][local_nids]
nids = F.asnumpy(F.gather_row(self.client_g.ndata[NID], local_nids))
assert np.all(node_map[nids] == server_id), 'Load a wrong partition'
F.scatter_row_inplace(node_g2l, nids, F.arange(0, len(nids)))
......@@ -329,7 +357,7 @@ class DistGraphServer(KVServer):
if len(edge_feats) > 0:
edge_g2l = F.zeros((num_edges), dtype=F.int64, ctx=F.cpu()) - 1
local_eids = F.nonzero_1d(self.client_g.edata['local_edge'])
eids = self.client_g.edata[EID][local_eids]
eids = F.asnumpy(F.gather_row(self.client_g.edata[EID], local_eids))
assert np.all(edge_map[eids] == server_id), 'Load a wrong partition'
F.scatter_row_inplace(edge_g2l, eids, F.arange(0, len(eids)))
......@@ -354,6 +382,14 @@ class DistGraphServer(KVServer):
self.init_data(name=_get_edata_name(name))
self.set_partition_book(name=_get_edata_name(name), partition_book=edge_map)
# TODO(zhengda) this is temporary solution. We don't need this in the future.
self.meta, self.node_map, self.edge_map = _move_metadata_to_shared_mam(graph_name,
num_nodes,
num_edges,
server_id,
num_partitions,
node_map, edge_map)
class DistGraph:
''' The DistGraph client.
......@@ -383,19 +419,12 @@ class DistGraph:
self._client = KVClient(server_namebook=server_namebook)
self._client.connect()
self.g = _get_graph_from_shared_mem(graph_name)
self.graph_name = graph_name
self.meta = F.asnumpy(_get_shared_mem_metadata(graph_name))
self._g = _get_graph_from_shared_mem(graph_name)
self._tot_num_nodes, self._tot_num_edges, self._part_id, num_parts, node_map, \
edge_map = _get_shared_mem_metadata(graph_name)
self._gpb = GraphPartitionBook(self._part_id, num_parts, node_map, edge_map, self._g)
self._client.barrier()
if self.g is not None:
self._local_nids = F.nonzero_1d(self.g.ndata['local_node'])
self._local_gnid = self.g.ndata[NID][self._local_nids]
else:
self._local_nids = None
self._local_gnid = None
self._ndata = NodeDataView(self)
self._edata = EdgeDataView(self)
......@@ -496,11 +525,11 @@ class DistGraph:
def number_of_nodes(self):
"""Return the number of nodes"""
return self.meta[0]
return self._tot_num_nodes
def number_of_edges(self):
"""Return the number of edges"""
return self.meta[1]
return self._tot_num_edges
def node_attr_schemes(self):
"""Return the node feature and embedding schemes."""
......@@ -524,7 +553,20 @@ class DistGraph:
int
The rank of the current graph store.
'''
return self._client.get_id()
# Here the rank of the client should be the same as the partition Id to ensure
# that we always get the local partition.
# TODO(zhengda) we need to change this if we support two-level partitioning.
return self._part_id
def get_partition_book(self):
"""Get the partition information.
Returns
-------
GraphPartitionBook
Object that stores all kinds of partition information.
"""
return self._gpb
def shut_down(self):
"""Shut down all KVServer nodes.
......@@ -557,3 +599,98 @@ class DistGraph:
# Remove the prefix "edge:"
edata_names.append(name[5:])
return edata_names
def _get_overlap(mask_arr, ids):
""" Select the Ids given a boolean mask array.
The boolean mask array indicates all of the Ids to be selected. We want to
find the overlap between the Ids selected by the boolean mask array and
the Id array.
Parameters
----------
mask_arr : 1D tensor
A boolean mask array.
ids : 1D tensor
A vector with Ids.
Returns
-------
1D tensor
The selected Ids.
"""
if isinstance(mask_arr, DistTensor):
masks = mask_arr[ids]
return F.boolean_mask(ids, masks)
else:
mask_arr = utils.toindex(mask_arr)
masks = F.gather_row(mask_arr.tousertensor(), ids)
return F.boolean_mask(ids, masks)
def node_split(nodes, partition_book, rank):
''' Split nodes and return a subset for the local rank.
This function splits the input nodes based on the partition book and
returns a subset of nodes for the local rank. This method is used for
dividing workloads for distributed training.
The input nodes can be stored as a vector of masks. The length of the vector is
the same as the number of nodes in a graph; 1 indicates that the vertex in
the corresponding location exists.
Parameters
----------
nodes : 1D tensor or DistTensor
A boolean mask vector that indicates input nodes.
partition_book : GraphPartitionBook
The graph partition book
rank : int
The rank of the current process
Returns
-------
1D-tensor
The vector of node Ids that belong to the rank.
'''
num_nodes = 0
for part in partition_book.metadata():
num_nodes += part['num_nodes']
assert len(nodes) == num_nodes, \
'The length of boolean mask vector should be the number of nodes in the graph.'
# Get all nodes that belong to the rank.
local_nids = partition_book.partid2nids(rank)
return _get_overlap(nodes, local_nids)
def edge_split(edges, partition_book, rank):
''' Split edges and return a subset for the local rank.
This function splits the input edges based on the partition book and
returns a subset of edges for the local rank. This method is used for
dividing workloads for distributed training.
The input edges can be stored as a vector of masks. The length of the vector is
the same as the number of edges in a graph; 1 indicates that the edge in
the corresponding location exists.
Parameters
----------
edges : 1D tensor or DistTensor
A boolean mask vector that indicates input nodes.
partition_book : GraphPartitionBook
The graph partition book
rank : int
The rank of the current process
Returns
-------
1D-tensor
The vector of edge Ids that belong to the rank.
'''
num_edges = 0
for part in partition_book.metadata():
num_edges += part['num_edges']
assert len(edges) == num_edges, \
'The length of boolean mask vector should be the number of edges in the graph.'
# Get all edges that belong to the rank.
local_eids = partition_book.partid2eids(rank)
return _get_overlap(edges, local_eids)
......@@ -4,6 +4,7 @@ import numpy as np
from .. import backend as F
from ..base import NID, EID
from .. import utils
class GraphPartitionBook:
"""GraphPartitionBook is used to store parition information.
......@@ -14,9 +15,9 @@ class GraphPartitionBook:
partition id of current GraphPartitionBook
num_parts : int
number of total partitions
node_map : numpy array
node_map : tensor
global node id mapping to partition id
edge_map : numpy array
edge_map : tensor
global edge id mapping to partition id
part_graph : DGLGraph
The graph partition structure.
......@@ -26,8 +27,10 @@ class GraphPartitionBook:
assert num_parts > 0, 'num_parts must be greater than zero.'
self._part_id = part_id
self._num_partitions = num_parts
self._nid2partid = F.zerocopy_from_numpy(node_map)
self._eid2partid = F.zerocopy_from_numpy(edge_map)
node_map = utils.toindex(node_map)
self._nid2partid = node_map.tousertensor()
edge_map = utils.toindex(edge_map)
self._eid2partid = edge_map.tousertensor()
self._graph = part_graph
# Get meta data of GraphPartitionBook
self._partition_meta_data = []
......
......@@ -11,7 +11,7 @@ import multiprocessing as mp
from dgl.graph_index import create_graph_index
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import DistGraphServer, DistGraph
from dgl.distributed import partition_graph
from dgl.distributed import partition_graph, load_partition, GraphPartitionBook, node_split, edge_split
import backend as F
import unittest
import pickle
......@@ -76,10 +76,20 @@ def run_client(graph_name, barrier, num_nodes, num_edges):
assert g.node_attr_schemes()['test1'].dtype == F.int32
assert g.node_attr_schemes()['features'].shape == (1,)
selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
# Test node split
nodes = node_split(selected_nodes, g.get_partition_book(), g.rank())
nodes = F.asnumpy(nodes)
# We only have one partition, so the local nodes are basically all nodes in the graph.
local_nids = np.arange(g.number_of_nodes())
for n in nodes:
assert n in local_nids
g.shut_down()
print('end')
def run_server_client():
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_server_client():
g = create_random_graph(10000)
# Partition the graph
......@@ -110,5 +120,42 @@ def run_server_client():
p.join()
print('clients have terminated')
def test_split():
g = create_random_graph(10000)
num_parts = 4
num_hops = 2
partition_graph(g, 'test', num_parts, '/tmp', num_hops=num_hops, part_method='metis')
node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
selected_nodes = np.nonzero(node_mask)[0]
selected_edges = np.nonzero(edge_mask)[0]
for i in range(num_parts):
part_g, node_feats, edge_feats, meta = load_partition('/tmp/test.json', i)
num_nodes, num_edges, node_map, edge_map, num_partitions = meta
gpb = GraphPartitionBook(part_id=i,
num_parts=num_partitions,
node_map=node_map,
edge_map=edge_map,
part_graph=part_g)
local_nids = F.nonzero_1d(part_g.ndata['local_node'])
local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
nodes2 = node_split(node_mask, gpb, i)
assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes2)))
local_nids = F.asnumpy(local_nids)
for n in nodes1:
assert n in local_nids
local_eids = F.nonzero_1d(part_g.edata['local_edge'])
local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
edges1 = np.intersect1d(selected_edges, F.asnumpy(local_eids))
edges2 = edge_split(edge_mask, gpb, i)
assert np.all(np.sort(edges1) == np.sort(F.asnumpy(edges2)))
local_eids = F.asnumpy(local_eids)
for e in edges1:
assert e in local_eids
if __name__ == '__main__':
run_server_client()
test_split()
test_server_client()
......@@ -9,14 +9,6 @@ import backend as F
import unittest
import pickle
def create_ip_config():
ip_config = open("ip_config.txt", "w")
ip_config.write('192.168.9.12 30050 0\n')
ip_config.write('192.168.9.13 30050 1\n')
ip_config.write('192.168.9.14 30050 2\n')
ip_config.write('192.168.9.15 30050 3\n')
ip_config.close()
def create_random_graph(n):
arr = (spsp.random(n, n, density=0.001, format='coo') != 0).astype(np.int64)
ig = create_graph_index(arr, readonly=True)
......@@ -30,7 +22,6 @@ def test_graph_partition_book():
num_parts = 4
num_hops = 2
create_ip_config()
partition_graph(g, 'test', num_parts, '/tmp', num_hops=num_hops, part_method='metis')
for i in range(num_parts):
......@@ -55,4 +46,4 @@ def test_graph_partition_book():
if __name__ == '__main__':
test_graph_partition_book()
\ No newline at end of file
test_graph_partition_book()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment