Unverified Commit ef7e4750 authored by Chao Ma's avatar Chao Ma Committed by GitHub
Browse files

[Distributed] Add GraphPartitionBook (#1496)

* Add graph-partition-book

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* fix lint

* update

* update

* update

* update

* update

* update

* update

* skip test on GPU

* update

* update

* update

* update

* update

* update

* update

* fix lint

* update

* update

* update

* update

* fix unittest

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update
parent dc8ca88e
...@@ -2,3 +2,4 @@ ...@@ -2,3 +2,4 @@
from .dist_graph import DistGraphServer, DistGraph from .dist_graph import DistGraphServer, DistGraph
from .partition import partition_graph, load_partition from .partition import partition_graph, load_partition
from .graph_partition_book import GraphPartitionBook
"""Define graph partition book."""
import numpy as np
from .. import backend as F
from ..base import NID, EID
class GraphPartitionBook:
    """GraphPartitionBook is used to store partition information.

    It answers, for a distributed graph split into ``num_parts`` partitions:
    which partition owns a global node/edge ID, which global IDs a partition
    owns, and (for the local partition only) the global-to-local ID mapping.

    Parameters
    ----------
    part_id : int
        partition id of current GraphPartitionBook
    num_parts : int
        number of total partitions
    node_map : numpy array
        global node id mapping to partition id
    edge_map : numpy array
        global edge id mapping to partition id
    part_graph : DGLGraph
        The graph partition structure.
    """
    def __init__(self, part_id, num_parts, node_map, edge_map, part_graph):
        assert part_id >= 0, 'part_id cannot be a negative number.'
        assert num_parts > 0, 'num_parts must be greater than zero.'
        self._part_id = part_id
        self._num_partitions = num_parts
        self._nid2partid = F.zerocopy_from_numpy(node_map)
        self._eid2partid = F.zerocopy_from_numpy(edge_map)
        self._graph = part_graph
        # Count nodes/edges owned by each partition. np.bincount with
        # minlength is robust to empty partitions: np.unique(return_counts=
        # True) would return a shorter counts array when a partition id never
        # occurs, silently misaligning counts with partition ids below.
        nid_count = np.bincount(F.asnumpy(self._nid2partid),
                                minlength=self._num_partitions)
        eid_count = np.bincount(F.asnumpy(self._eid2partid),
                                minlength=self._num_partitions)
        # Get meta data of GraphPartitionBook
        self._partition_meta_data = []
        for partid in range(self._num_partitions):
            part_info = {}
            part_info['machine_id'] = partid
            part_info['num_nodes'] = nid_count[partid]
            part_info['num_edges'] = eid_count[partid]
            self._partition_meta_data.append(part_info)
        # Get partid2nids: argsort groups global node ids by partition id,
        # so consecutive slices of sorted_nid are each partition's node set.
        self._partid2nids = []
        sorted_nid = F.tensor(np.argsort(F.asnumpy(self._nid2partid)))
        start = 0
        for offset in nid_count:
            part_nids = sorted_nid[start:start+offset]
            start += offset
            self._partid2nids.append(part_nids)
        # Get partid2eids (same grouping trick as partid2nids)
        self._partid2eids = []
        sorted_eid = F.tensor(np.argsort(F.asnumpy(self._eid2partid)))
        start = 0
        for offset in eid_count:
            part_eids = sorted_eid[start:start+offset]
            start += offset
            self._partid2eids.append(part_eids)
        # Get nidg2l: a dense lookup table mapping global node id -> local
        # node id, built only for the local partition (others stay None).
        self._nidg2l = [None] * self._num_partitions
        global_id = self._graph.ndata[NID]
        max_global_id = np.amax(F.asnumpy(global_id))
        # TODO(chao): support int32 index
        g2l = F.zeros((max_global_id+1), F.int64, F.context(global_id))
        g2l = F.scatter_row(g2l, global_id, F.arange(0, len(global_id)))
        self._nidg2l[self._part_id] = g2l
        # Get eidg2l: dense global edge id -> local edge id table, local only
        self._eidg2l = [None] * self._num_partitions
        global_id = self._graph.edata[EID]
        max_global_id = np.amax(F.asnumpy(global_id))
        # TODO(chao): support int32 index
        g2l = F.zeros((max_global_id+1), F.int64, F.context(global_id))
        g2l = F.scatter_row(g2l, global_id, F.arange(0, len(global_id)))
        self._eidg2l[self._part_id] = g2l

    def num_partitions(self):
        """Return the number of partitions.

        Returns
        -------
        int
            number of partitions
        """
        return self._num_partitions

    def metadata(self):
        """Return the partition meta data.

        The meta data includes:

        * The machine ID.
        * Number of nodes and edges of each partition.

        Examples
        --------
        >>> print(g.get_partition_book().metadata())
        >>> [{'machine_id' : 0, 'num_nodes' : 3000, 'num_edges' : 5000},
        ...  {'machine_id' : 1, 'num_nodes' : 2000, 'num_edges' : 4888},
        ...  ...]

        Returns
        -------
        list[dict[str, any]]
            Meta data of each partition.
        """
        return self._partition_meta_data

    def nid2partid(self, nids):
        """From global node IDs to partition IDs

        Parameters
        ----------
        nids : tensor
            global node IDs

        Returns
        -------
        tensor
            partition IDs
        """
        return F.gather_row(self._nid2partid, nids)

    def eid2partid(self, eids):
        """From global edge IDs to partition IDs

        Parameters
        ----------
        eids : tensor
            global edge IDs

        Returns
        -------
        tensor
            partition IDs
        """
        return F.gather_row(self._eid2partid, eids)

    def partid2nids(self, partid):
        """From partition id to node IDs

        Parameters
        ----------
        partid : int
            partition id

        Returns
        -------
        tensor
            node IDs
        """
        return self._partid2nids[partid]

    def partid2eids(self, partid):
        """From partition id to edge IDs

        Parameters
        ----------
        partid : int
            partition id

        Returns
        -------
        tensor
            edge IDs
        """
        return self._partid2eids[partid]

    def nid2localnid(self, nids, partid):
        """Get local node IDs within the given partition.

        Only the local partition's mapping is materialized, so ``partid``
        must equal this book's own partition id.

        Parameters
        ----------
        nids : tensor
            global node IDs
        partid : int
            partition ID

        Returns
        -------
        tensor
            local node IDs

        Raises
        ------
        RuntimeError
            If ``partid`` refers to a remote partition.
        """
        if partid != self._part_id:
            raise RuntimeError('Now GraphPartitionBook does not support '
                               'getting remote tensor of nid2localnid.')
        return F.gather_row(self._nidg2l[partid], nids)

    def eid2localeid(self, eids, partid):
        """Get the local edge ids within the given partition.

        Only the local partition's mapping is materialized, so ``partid``
        must equal this book's own partition id.

        Parameters
        ----------
        eids : tensor
            global edge ids
        partid : int
            partition ID

        Returns
        -------
        tensor
            local edge ids

        Raises
        ------
        RuntimeError
            If ``partid`` refers to a remote partition.
        """
        if partid != self._part_id:
            raise RuntimeError('Now GraphPartitionBook does not support '
                               'getting remote tensor of eid2localeid.')
        return F.gather_row(self._eidg2l[partid], eids)

    def get_partition(self, partid):
        """Get the graph of one partition.

        Only the local partition's graph is stored, so ``partid`` must equal
        this book's own partition id.

        Parameters
        ----------
        partid : int
            Partition ID.

        Returns
        -------
        DGLGraph
            The graph of the partition.

        Raises
        ------
        RuntimeError
            If ``partid`` refers to a remote partition.
        """
        if partid != self._part_id:
            raise RuntimeError('Now GraphPartitionBook does not support '
                               'getting remote partitions.')
        return self._graph
...@@ -25,7 +25,7 @@ the configuration file will look like the following: ...@@ -25,7 +25,7 @@ the configuration file will look like the following:
{ {
"graph_name" : "test", "graph_name" : "test",
"part_method" : "metis", "part_method" : "metis",
"num_parts" : 2 "num_parts" : 2,
"halo_hops" : 1, "halo_hops" : 1,
"node_map" : "data_root_dir/node_map.npy", "node_map" : "data_root_dir/node_map.npy",
"edge_map" : "data_root_dir/edge_map.npy" "edge_map" : "data_root_dir/edge_map.npy"
...@@ -120,6 +120,8 @@ def load_partition(conf_file, part_id): ...@@ -120,6 +120,8 @@ def load_partition(conf_file, part_id):
''' '''
with open(conf_file) as conf_f: with open(conf_file) as conf_f:
part_metadata = json.load(conf_f) part_metadata = json.load(conf_f)
assert 'num_parts' in part_metadata, 'num_parts does not exist.'
num_parts = part_metadata['num_parts']
assert 'part-{}'.format(part_id) in part_metadata, "part-{} does not exist".format(part_id) assert 'part-{}'.format(part_id) in part_metadata, "part-{} does not exist".format(part_id)
part_files = part_metadata['part-{}'.format(part_id)] part_files = part_metadata['part-{}'.format(part_id)]
assert 'node_feats' in part_files, "the partition does not contain node features." assert 'node_feats' in part_files, "the partition does not contain node features."
...@@ -134,7 +136,7 @@ def load_partition(conf_file, part_id): ...@@ -134,7 +136,7 @@ def load_partition(conf_file, part_id):
assert 'edge_map' in part_metadata, "cannot get the edge map." assert 'edge_map' in part_metadata, "cannot get the edge map."
node_map = np.load(part_metadata['node_map']) node_map = np.load(part_metadata['node_map'])
edge_map = np.load(part_metadata['edge_map']) edge_map = np.load(part_metadata['edge_map'])
meta = (part_metadata['num_nodes'], part_metadata['num_edges'], node_map, edge_map) meta = (part_metadata['num_nodes'], part_metadata['num_edges'], node_map, edge_map, num_parts)
assert NID in graph.ndata, "the partition graph should contain node mapping to global node Id" assert NID in graph.ndata, "the partition graph should contain node mapping to global node Id"
assert EID in graph.edata, "the partition graph should contain edge mapping to global edge Id" assert EID in graph.edata, "the partition graph should contain edge mapping to global edge Id"
......
import dgl
import sys
import numpy as np
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from dgl.graph_index import create_graph_index
from dgl.distributed import partition_graph, load_partition, GraphPartitionBook
import backend as F
import unittest
import pickle
def create_ip_config():
    """Write a 4-machine cluster configuration to ``ip_config.txt``.

    Each line has the form ``<ip> <port> <machine_id>``.
    """
    # Use a context manager so the file is closed even if a write fails.
    with open("ip_config.txt", "w") as ip_config:
        ip_config.write('192.168.9.12 30050 0\n')
        ip_config.write('192.168.9.13 30050 1\n')
        ip_config.write('192.168.9.14 30050 2\n')
        ip_config.write('192.168.9.15 30050 3\n')
def create_random_graph(n):
    """Build a random ``DGLGraph`` with ``n`` nodes and ~0.001 edge density."""
    adj = (spsp.random(n, n, density=0.001, format='coo') != 0).astype(np.int64)
    return dgl.DGLGraph(create_graph_index(adj, readonly=True))
@unittest.skipIf(F._default_context_str == 'gpu', reason="METIS doesn't support GPU")
def test_graph_partition_book():
    """End-to-end check of GraphPartitionBook against an on-disk partitioning."""
    graph = create_random_graph(10000)
    graph.ndata['labels'] = F.arange(0, graph.number_of_nodes())
    graph.ndata['feats'] = F.tensor(np.random.randn(graph.number_of_nodes(), 10))
    num_parts = 4
    num_hops = 2
    create_ip_config()
    partition_graph(graph, 'test', num_parts, '/tmp', num_hops=num_hops,
                    part_method='metis')
    for part_id in range(num_parts):
        part_g, node_feats, edge_feats, meta = load_partition('/tmp/test.json', part_id)
        num_nodes, num_edges, node_map, edge_map, num_partitions = meta
        gpb = GraphPartitionBook(part_id=part_id,
                                 num_parts=num_partitions,
                                 node_map=node_map,
                                 edge_map=edge_map,
                                 part_graph=part_g)
        # Partition count and per-partition metadata match the request.
        assert gpb.num_partitions() == num_parts
        gpb_meta = gpb.metadata()
        assert len(gpb_meta) == num_parts
        # Global-ID -> partition-ID lookups reproduce the raw maps.
        assert np.all(F.asnumpy(gpb.nid2partid(F.arange(0, len(node_map)))) == node_map)
        assert np.all(F.asnumpy(gpb.eid2partid(F.arange(0, len(edge_map)))) == edge_map)
        assert len(gpb.partid2nids(part_id)) == gpb_meta[part_id]['num_nodes']
        assert len(gpb.partid2eids(part_id)) == gpb_meta[part_id]['num_edges']
        # Local IDs of this partition's own nodes/edges form 0..k-1.
        local_nid = gpb.nid2localnid(part_g.ndata[dgl.NID], part_id)
        assert np.all(F.asnumpy(local_nid) == F.asnumpy(F.arange(0, len(local_nid))))
        local_eid = gpb.eid2localeid(part_g.edata[dgl.EID], part_id)
        assert np.all(F.asnumpy(local_eid) == F.asnumpy(F.arange(0, len(local_eid))))
# Allow running this test file directly, without a test runner.
if __name__ == '__main__':
    test_graph_partition_book()
\ No newline at end of file
...@@ -24,11 +24,12 @@ def test_partition(): ...@@ -24,11 +24,12 @@ def test_partition():
partition_graph(g, 'test', num_parts, '/tmp', num_hops=num_hops, part_method='metis') partition_graph(g, 'test', num_parts, '/tmp', num_hops=num_hops, part_method='metis')
for i in range(num_parts): for i in range(num_parts):
part_g, node_feats, edge_feats, meta = load_partition('/tmp/test.json', i) part_g, node_feats, edge_feats, meta = load_partition('/tmp/test.json', i)
num_nodes, num_edges, node_map, edge_map = meta num_nodes, num_edges, node_map, edge_map, num_partitions = meta
# Check the metadata # Check the metadata
assert num_nodes == g.number_of_nodes() assert num_nodes == g.number_of_nodes()
assert num_edges == g.number_of_edges() assert num_edges == g.number_of_edges()
assert num_partitions == num_parts
# Check the node map. # Check the node map.
local_nodes = np.nonzero(node_map == i)[0] local_nodes = np.nonzero(node_map == i)[0]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment