Unverified commit 69226588 authored by Rhett Ying, committed by GitHub
Browse files

[Dist] defer to load node/edge feats (#4143)



* [Dist] defer to load node/edge feats

* fix lint

* Update python/dgl/distributed/partition.py
Co-authored-by: Minjie Wang <minjie.wang@nyu.edu>

* Update python/dgl/distributed/partition.py
Co-authored-by: Minjie Wang <minjie.wang@nyu.edu>

* fix lint
Co-authored-by: default avatarMinjie Wang <minjie.wang@nyu.edu>
parent 532d4ac3
...@@ -98,6 +98,7 @@ Split and Load Graphs ...@@ -98,6 +98,7 @@ Split and Load Graphs
:toctree: ../../generated/ :toctree: ../../generated/
load_partition load_partition
load_partition_feats
load_partition_book load_partition_book
partition_graph partition_graph
...@@ -16,7 +16,7 @@ import sys ...@@ -16,7 +16,7 @@ import sys
from .dist_graph import DistGraphServer, DistGraph, node_split, edge_split from .dist_graph import DistGraphServer, DistGraph, node_split, edge_split
from .dist_tensor import DistTensor from .dist_tensor import DistTensor
from .partition import partition_graph, load_partition, load_partition_book from .partition import partition_graph, load_partition, load_partition_feats, load_partition_book
from .graph_partition_book import GraphPartitionBook, PartitionPolicy from .graph_partition_book import GraphPartitionBook, PartitionPolicy
from .nn import * from .nn import *
from . import optim from . import optim
......
...@@ -17,7 +17,7 @@ from .kvstore import KVServer, get_kvstore ...@@ -17,7 +17,7 @@ from .kvstore import KVServer, get_kvstore
from .._ffi.ndarray import empty_shared_mem from .._ffi.ndarray import empty_shared_mem
from ..ndarray import exist_shared_mem_array from ..ndarray import exist_shared_mem_array
from ..frame import infer_scheme from ..frame import infer_scheme
from .partition import load_partition, load_partition_book from .partition import load_partition, load_partition_feats, load_partition_book
from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book
from .graph_partition_book import HeteroDataName, parse_hetero_data_name from .graph_partition_book import HeteroDataName, parse_hetero_data_name
from .graph_partition_book import NodePartitionPolicy, EdgePartitionPolicy from .graph_partition_book import NodePartitionPolicy, EdgePartitionPolicy
...@@ -332,8 +332,9 @@ class DistGraphServer(KVServer): ...@@ -332,8 +332,9 @@ class DistGraphServer(KVServer):
self.gpb, graph_name, ntypes, etypes = load_partition_book(part_config, self.part_id) self.gpb, graph_name, ntypes, etypes = load_partition_book(part_config, self.part_id)
self.client_g = None self.client_g = None
else: else:
self.client_g, node_feats, edge_feats, self.gpb, graph_name, \ # Loading of node/edge_feats are deferred to lower the peak memory consumption.
ntypes, etypes = load_partition(part_config, self.part_id) self.client_g, _, _, self.gpb, graph_name, \
ntypes, etypes = load_partition(part_config, self.part_id, load_feats=False)
print('load ' + graph_name) print('load ' + graph_name)
# Create the graph formats specified the users. # Create the graph formats specified the users.
self.client_g = self.client_g.formats(graph_format) self.client_g = self.client_g.formats(graph_format)
...@@ -352,6 +353,7 @@ class DistGraphServer(KVServer): ...@@ -352,6 +353,7 @@ class DistGraphServer(KVServer):
self.add_part_policy(PartitionPolicy(edge_name.policy_str, self.gpb)) self.add_part_policy(PartitionPolicy(edge_name.policy_str, self.gpb))
if not self.is_backup_server(): if not self.is_backup_server():
node_feats, edge_feats = load_partition_feats(part_config, self.part_id)
for name in node_feats: for name in node_feats:
# The feature name has the following format: node_type + "/" + feature_name to avoid # The feature name has the following format: node_type + "/" + feature_name to avoid
# feature name collision for different node types. # feature name collision for different node types.
......
...@@ -42,7 +42,7 @@ def _get_part_ranges(id_ranges): ...@@ -42,7 +42,7 @@ def _get_part_ranges(id_ranges):
res[key] = np.concatenate([np.array(l) for l in id_ranges[key]]).reshape(-1, 2) res[key] = np.concatenate([np.array(l) for l in id_ranges[key]]).reshape(-1, 2)
return res return res
def load_partition(part_config, part_id): def load_partition(part_config, part_id, load_feats=True):
''' Load data of a partition from the data path. ''' Load data of a partition from the data path.
A partition data includes a graph structure of the partition, a dict of node tensors, A partition data includes a graph structure of the partition, a dict of node tensors,
...@@ -61,6 +61,9 @@ def load_partition(part_config, part_id): ...@@ -61,6 +61,9 @@ def load_partition(part_config, part_id):
The path of the partition config file. The path of the partition config file.
part_id : int part_id : int
The partition ID. The partition ID.
load_feats : bool, optional
Whether to load node/edge feats. If False, the returned node/edge feature
dictionaries will be empty. Default: True.
Returns Returns
------- -------
...@@ -86,28 +89,8 @@ def load_partition(part_config, part_id): ...@@ -86,28 +89,8 @@ def load_partition(part_config, part_id):
part_metadata = json.load(conf_f) part_metadata = json.load(conf_f)
assert 'part-{}'.format(part_id) in part_metadata, "part-{} does not exist".format(part_id) assert 'part-{}'.format(part_id) in part_metadata, "part-{} does not exist".format(part_id)
part_files = part_metadata['part-{}'.format(part_id)] part_files = part_metadata['part-{}'.format(part_id)]
assert 'node_feats' in part_files, "the partition does not contain node features."
assert 'edge_feats' in part_files, "the partition does not contain edge feature."
assert 'part_graph' in part_files, "the partition does not contain graph structure." assert 'part_graph' in part_files, "the partition does not contain graph structure."
node_feats = load_tensors(relative_to_config(part_files['node_feats']))
edge_feats = load_tensors(relative_to_config(part_files['edge_feats']))
graph = load_graphs(relative_to_config(part_files['part_graph']))[0][0] graph = load_graphs(relative_to_config(part_files['part_graph']))[0][0]
# In the old format, the feature name doesn't contain node/edge type.
# For compatibility, let's add node/edge types to the feature names.
node_feats1 = {}
edge_feats1 = {}
for name in node_feats:
feat = node_feats[name]
if name.find('/') == -1:
name = '_N/' + name
node_feats1[name] = feat
for name in edge_feats:
feat = edge_feats[name]
if name.find('/') == -1:
name = '_E/' + name
edge_feats1[name] = feat
node_feats = node_feats1
edge_feats = edge_feats1
assert NID in graph.ndata, "the partition graph should contain node mapping to global node ID" assert NID in graph.ndata, "the partition graph should contain node mapping to global node ID"
assert EID in graph.edata, "the partition graph should contain edge mapping to global edge ID" assert EID in graph.edata, "the partition graph should contain edge mapping to global edge ID"
...@@ -134,8 +117,61 @@ def load_partition(part_config, part_id): ...@@ -134,8 +117,61 @@ def load_partition(part_config, part_id):
assert np.all(F.asnumpy(partids1 == part_id)), 'load a wrong partition' assert np.all(F.asnumpy(partids1 == part_id)), 'load a wrong partition'
assert np.all(F.asnumpy(partids2 == part_id)), 'load a wrong partition' assert np.all(F.asnumpy(partids2 == part_id)), 'load a wrong partition'
etypes_list.append(etype) etypes_list.append(etype)
node_feats = {}
edge_feats = {}
if load_feats:
node_feats, edge_feats = load_partition_feats(part_config, part_id)
return graph, node_feats, edge_feats, gpb, graph_name, ntypes_list, etypes_list return graph, node_feats, edge_feats, gpb, graph_name, ntypes_list, etypes_list
def load_partition_feats(part_config, part_id):
    '''Load node/edge feature data from a partition.

    Parameters
    ----------
    part_config : str
        The path of the partition config file.
    part_id : int
        The partition ID.

    Returns
    -------
    Dict[str, Tensor]
        Node features.
    Dict[str, Tensor]
        Edge features.
    '''
    # Feature files are stored relative to the directory of the config file.
    config_path = os.path.dirname(part_config)
    relative_to_config = lambda path: os.path.join(config_path, path)
    with open(part_config) as conf_f:
        part_metadata = json.load(conf_f)
    assert 'part-{}'.format(part_id) in part_metadata, "part-{} does not exist".format(part_id)
    part_files = part_metadata['part-{}'.format(part_id)]
    assert 'node_feats' in part_files, "the partition does not contain node features."
    assert 'edge_feats' in part_files, "the partition does not contain edge features."
    node_feats = load_tensors(relative_to_config(part_files['node_feats']))
    edge_feats = load_tensors(relative_to_config(part_files['edge_feats']))
    # In the old format, feature names do not carry a node/edge type prefix.
    # For backward compatibility, prepend the default node type '_N' (or edge
    # type '_E') to any name that lacks a '<type>/' prefix.
    node_feats = {name if '/' in name else '_N/' + name: feat
                  for name, feat in node_feats.items()}
    edge_feats = {name if '/' in name else '_E/' + name: feat
                  for name, feat in edge_feats.items()}

    return node_feats, edge_feats
def load_partition_book(part_config, part_id, graph=None): def load_partition_book(part_config, part_id, graph=None):
''' Load a graph partition book from the partition config file. ''' Load a graph partition book from the partition config file.
......
...@@ -5,7 +5,7 @@ import numpy as np ...@@ -5,7 +5,7 @@ import numpy as np
from scipy import sparse as spsp from scipy import sparse as spsp
from numpy.testing import assert_array_equal from numpy.testing import assert_array_equal
from dgl.heterograph_index import create_unitgraph_from_coo from dgl.heterograph_index import create_unitgraph_from_coo
from dgl.distributed import partition_graph, load_partition from dgl.distributed import partition_graph, load_partition, load_partition_feats
from dgl import function as fn from dgl import function as fn
import backend as F import backend as F
import unittest import unittest
...@@ -160,7 +160,7 @@ def verify_graph_feats(g, gpb, part, node_feats, edge_feats): ...@@ -160,7 +160,7 @@ def verify_graph_feats(g, gpb, part, node_feats, edge_feats):
edata = F.gather_row(edge_feats[etype + '/' + name], local_eids) edata = F.gather_row(edge_feats[etype + '/' + name], local_eids)
assert np.all(F.asnumpy(edata == true_feats)) assert np.all(F.asnumpy(edata == true_feats))
def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machine=1): def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machine=1, load_feats=True):
hg.nodes['n1'].data['labels'] = F.arange(0, hg.number_of_nodes('n1')) hg.nodes['n1'].data['labels'] = F.arange(0, hg.number_of_nodes('n1'))
hg.nodes['n1'].data['feats'] = F.tensor(np.random.randn(hg.number_of_nodes('n1'), 10), F.float32) hg.nodes['n1'].data['feats'] = F.tensor(np.random.randn(hg.number_of_nodes('n1'), 10), F.float32)
hg.edges['r1'].data['feats'] = F.tensor(np.random.randn(hg.number_of_edges('r1'), 10), F.float32) hg.edges['r1'].data['feats'] = F.tensor(np.random.randn(hg.number_of_edges('r1'), 10), F.float32)
...@@ -180,7 +180,12 @@ def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machin ...@@ -180,7 +180,12 @@ def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machin
shuffled_labels = [] shuffled_labels = []
shuffled_elabels = [] shuffled_elabels = []
for i in range(num_parts): for i in range(num_parts):
part_g, node_feats, edge_feats, gpb, _, ntypes, etypes = load_partition('/tmp/partition/test.json', i) part_g, node_feats, edge_feats, gpb, _, ntypes, etypes = load_partition(
'/tmp/partition/test.json', i, load_feats=load_feats)
if not load_feats:
assert not node_feats
assert not edge_feats
node_feats, edge_feats = load_partition_feats('/tmp/partition/test.json', i)
if num_trainers_per_machine > 1: if num_trainers_per_machine > 1:
for ntype in hg.ntypes: for ntype in hg.ntypes:
name = ntype + '/trainer_id' name = ntype + '/trainer_id'
...@@ -237,7 +242,7 @@ def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machin ...@@ -237,7 +242,7 @@ def check_hetero_partition(hg, part_method, num_parts=4, num_trainers_per_machin
assert np.all(orig_labels == F.asnumpy(hg.nodes['n1'].data['labels'])) assert np.all(orig_labels == F.asnumpy(hg.nodes['n1'].data['labels']))
assert np.all(orig_elabels == F.asnumpy(hg.edges['r1'].data['labels'])) assert np.all(orig_elabels == F.asnumpy(hg.edges['r1'].data['labels']))
def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_machine=1): def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_machine=1, load_feats=True):
g.ndata['labels'] = F.arange(0, g.number_of_nodes()) g.ndata['labels'] = F.arange(0, g.number_of_nodes())
g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10), F.float32) g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10), F.float32)
g.edata['feats'] = F.tensor(np.random.randn(g.number_of_edges(), 10), F.float32) g.edata['feats'] = F.tensor(np.random.randn(g.number_of_edges(), 10), F.float32)
...@@ -252,7 +257,12 @@ def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_mac ...@@ -252,7 +257,12 @@ def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_mac
shuffled_labels = [] shuffled_labels = []
shuffled_edata = [] shuffled_edata = []
for i in range(num_parts): for i in range(num_parts):
part_g, node_feats, edge_feats, gpb, _, ntypes, etypes = load_partition('/tmp/partition/test.json', i) part_g, node_feats, edge_feats, gpb, _, ntypes, etypes = load_partition(
'/tmp/partition/test.json', i, load_feats=load_feats)
if not load_feats:
assert not node_feats
assert not edge_feats
node_feats, edge_feats = load_partition_feats('/tmp/partition/test.json', i)
if num_trainers_per_machine > 1: if num_trainers_per_machine > 1:
for ntype in g.ntypes: for ntype in g.ntypes:
name = ntype + '/trainer_id' name = ntype + '/trainer_id'
...@@ -402,6 +412,7 @@ def test_partition(): ...@@ -402,6 +412,7 @@ def test_partition():
check_partition(g, 'metis', True, 1, 8) check_partition(g, 'metis', True, 1, 8)
check_partition(g, 'random', False) check_partition(g, 'random', False)
check_partition(g, 'random', True) check_partition(g, 'random', True)
check_partition(g, 'metis', True, 4, 8, load_feats=False)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet') @unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph") @unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
...@@ -413,6 +424,7 @@ def test_hetero_partition(): ...@@ -413,6 +424,7 @@ def test_hetero_partition():
check_hetero_partition(hg, 'metis', 1, 8) check_hetero_partition(hg, 'metis', 1, 8)
check_hetero_partition(hg, 'metis', 4, 8) check_hetero_partition(hg, 'metis', 4, 8)
check_hetero_partition(hg, 'random') check_hetero_partition(hg, 'random')
check_hetero_partition(hg, 'metis', 4, 8, load_feats=False)
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment