"tests/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "75e2af79934af67871bd9b204672dcecf9af45df"
Unverified commit 2190c39d authored by Da Zheng, committed by GitHub

[Feature] Distributed graph store (#1383)

* initial version from distributed training.

This is copied from multiprocessing training.

* modify for distributed training.

* it's runnable now.

* measure time in neighbor sampling.

* simplify neighbor sampling.

* fix a bug in distributed neighbor sampling.

* allow single-machine training.

* fix a bug.

* fix a bug.

* fix openmp.

* make some improvement.

* fix.

* add prepare in the sampler.

* prepare nodeflow async.

* fix a bug.

* get id.

* simplify the code.

* improve.

* fix partition.py

* fix the example.

* add more features.

* fix the example.

* allow one partition

* use distributed kvstore.

* do g2l map manually.

* fix commandline.

* a temp script to save reddit.

* fix pull_handler.

* add pytorch version.

* estimate the time for copying data.

* delete unused code.

* fix a bug.

* print id.

* fix a bug

* fix a bug

* fix a bug.

* remove redundant code.

* revert modify in sampler.

* fix temp script.

* remove pytorch version.

* fix.

* distributed training with pytorch.

* add distributed graph store.

* fix.

* add metis_partition_assignment.

* fix a few bugs in distributed graph store.

* fix test.

* fix bugs in distributed graph store.

* fix tests.

* remove code of defining DistGraphStore.

* fix partition.

* fix example.

* update run.sh.

* only read necessary node data.

* batching data fetch of multiple NodeFlows.

* simplify gcn.

* remove unnecessary code.

* use the new copy_from_kvstore.

* update training script.

* print time in graphsage.

* make distributed training runnable.

* use val_nid.

* fix train_sampling.

* add distributed training.

* add run.sh

* add more timing.

* fix a bug.

* save graph metadata when partitioning.

* create ndata and edata in distributed graph store.

* add timing in minibatch training of GraphSage.

* use pytorch distributed.

* add checks.

* fix a bug in global vs. local ids.

* remove fast pull

* fix a compile error.

* update and add new APIs.

* implement more methods in DistGraphStore.

* update more APIs.

* rename it to DistGraph.

* rename to DistTensor

* remove some unnecessary API.

* remove unnecessary files.

* revert changes in sampler.

* Revert "simplify gcn."

This reverts commit 0ed3a34ca714203a5b45240af71555d4227ce452.

* Revert "simplify neighbor sampling."

This reverts commit 551c72d20f05a029360ba97f312c7a7a578aacec.

* Revert "measure time in neighbor sampling."

This reverts commit 63ae80c7b402bb626e24acbbc8fdfe9fffd0bc64.

* Revert "add timing in minibatch training of GraphSage."

This reverts commit e59dc8957a414c7df5c316f51d78bce822bdef5e.

* Revert "fix train_sampling."

This reverts commit ea6aea9a4aabb8ba0ff63070aa51e7ca81536ad9.

* fix lint.

* add comments and small update.

* add more comments.

* add more unit tests and fix bugs.

* check the existence of shared-mem graph index.

* use new partitioned graph storage.

* fix bugs.

* print error in fast pull.

* fix lint

* fix a compile error.

* save absolute path after partitioning.

* small fixes in the example

* Revert "[kvstore] support any data type for init_data() (#1465)"

This reverts commit 87b6997b.

* fix a bug.

* disable evaluation.

* Revert "Revert "[kvstore] support any data type for init_data() (#1465)""

This reverts commit f5b8039c6326eb73bad8287db3d30d93175e5bee.

* support set and init data.

* support set and init data.

* Revert "Revert "[kvstore] support any data type for init_data() (#1465)""

This reverts commit f5b8039c6326eb73bad8287db3d30d93175e5bee.

* fix bugs.

* fix unit test.

* move to dgl.distributed.

* fix lint.

* fix lint.

* remove local_nids.

* fix lint.

* fix test.

* remove train_dist.

* revert train_sampling.

* rename funcs.

* address comments.

* address comments.

Use NodeDataView/EdgeDataView to keep track of data.

* address comments.

* address comments.

* revert.

* save data with DGL serializer.

* use the right way of getting shape.

* fix lint.

* address comments.

* address comments.

* fix an error in mxnet.

* address comments.

* add edge_map.

* add more test and fix bugs.
Co-authored-by: Zheng <dzzhen@186590dc80ff.ant.amazon.com>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-6-131.us-east-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-26-167.us-east-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-16-150.us-west-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-16-250.us-west-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-30-135.us-west-2.compute.internal>
parent 5fc334fc
@@ -78,7 +78,6 @@ class SAGE(nn.Module):
        Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling).
        g : the entire graph.
        x : the input of entire node set.
        The inference code is written in a fashion that it could handle any number of nodes and
        layers.
        """
@@ -114,7 +113,6 @@ def prepare_mp(g):
    Explicitly materialize the CSR, CSC and COO representation of the given graph
    so that they could be shared via copy-on-write to sampler workers and GPU
    trainers.
    This is a workaround before full shared memory support on heterogeneous graphs.
    """
    g.in_degree(0)
...
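The `prepare_mp` docstring above describes the trick this example relies on: touch the graph once per sparse format so that the materialized CSR/CSC/COO structures are inherited by sampler workers through copy-on-write instead of being rebuilt per process. A minimal sketch of that idea follows; only `g.in_degree(0)` is visible in this hunk, so the `out_degree`/`find_edges` calls are an assumption about how the remaining formats would be touched.

```
def prepare_mp_sketch(g):
    """Force materialization of the sparse formats of a DGLGraph (illustrative sketch only)."""
    g.in_degree(0)      # touches the in-CSR (CSC) structure, as in the hunk above
    g.out_degree(0)     # assumed: touches the out-CSR structure
    g.find_edges([0])   # assumed: touches the COO structure
```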
@@ -58,13 +58,20 @@ class SharedMemory {
   * \param size the size of the shared memory.
   * \return the address of the shared memory
   */
-  void *create_new(size_t size);
+  void *CreateNew(size_t size);
  /*
   * \brief allocate shared memory that has been created.
   * \param size the size of the shared memory.
   * \return the address of the shared memory
   */
-  void *open(size_t size);
+  void *Open(size_t size);
+  /*
+   * \brief check whether the shared memory exists.
+   * \param name the name of the shared memory.
+   * \return a boolean value indicating whether the shared memory exists.
+   */
+  static bool Exist(const std::string &name);
};
#endif  // _WIN32
...
from . import sampling from . import sampling
from . import graph_store from . import graph_store
from .dis_kvstore import KVClient, KVServer from .dis_kvstore import KVClient, KVServer
from .dis_kvstore import read_ip_config from .dis_kvstore import read_ip_config
\ No newline at end of file
@@ -1381,4 +1381,4 @@ class KVClient(object):
            self._data_store
        """
        target[name][ID] = data
\ No newline at end of file
"""DGL distributed."""
from .dist_graph import DistGraphServer, DistGraph
from .partition import partition_graph, load_partition
"""Functions for partitions.
For distributed training, a graph is partitioned and partitions are stored in files
organized as follows:
```
data_root_dir/
|-- part_conf.json # partition configuration file in JSON
|-- node_map # partition id of each node stored in a numpy array
|-- edge_map # partition id of each edge stored in a numpy array
|-- part0/ # data for partition 0
|-- node_feats # node features stored in binary format
|-- edge_feats # edge features stored in binary format
|-- graph # graph structure of this partition stored in binary format
|-- part1/ # data for partition 1
|-- node_feats
|-- edge_feats
|-- graph
```
The partition configuration file stores the file locations. For the above example,
the configuration file will look like the following:
```
{
"graph_name" : "test",
"part_method" : "metis",
"num_parts" : 2
"halo_hops" : 1,
"node_map" : "data_root_dir/node_map.npy",
"edge_map" : "data_root_dir/edge_map.npy"
"num_nodes" : 1000000,
"num_edges" : 52000000,
"part-0" : {
"node_feats" : "data_root_dir/part0/node_feats.dgl",
"edge_feats" : "data_root_dir/part0/edge_feats.dgl",
"part_graph" : "data_root_dir/part0/graph.dgl",
},
"part-1" : {
"node_feats" : "data_root_dir/part1/node_feats.dgl",
"edge_feats" : "data_root_dir/part1/edge_feats.dgl",
"part_graph" : "data_root_dir/part1/graph.dgl",
},
}
```
Here are the definition of the fields in the partition configuration file:
* `graph_name` is the name of the graph given by a user.
* `part_method` is the method used to assign nodes to partitions.
Currently, it supports "random" and "metis".
* `num_parts` is the number of partitions.
* `halo_hops` is the number of HALO nodes we want to include in a partition.
* `node_map` is the node assignment map, which tells the partition Id a node is assigned to.
* `edge_map` is the edge assignment map, which tells the partition Id an edge is assigned to.
* `num_nodes` is the number of nodes in the global graph.
* `num_edges` is the number of edges in the global graph.
* `part-*` stores the data of a partition.
Nodes in each partition is *relabeled* to always start with zero. We call the node
ID in the original graph, *global ID*, while the relabeled ID in each partition,
*local ID*. Each partition graph has an integer node data tensor stored under name
`dgl.NID` and each value is the node's global ID. Similarly, edges are relabeled too
and the mapping from local ID to global ID is stored as an integer edge data tensor
under name `dgl.EID`.
Note that each partition can contain *HALO* nodes and edges, those belonging to
other partitions but are included in this partition for integrity or efficiency concerns.
We call nodes and edges that truly belong to one partition *local nodes/edges*, while
the rest "HALO nodes/edges".
Node and edge features are splitted and stored together with each graph partition.
We do not store features of HALO nodes and edges.
Two useful functions in this module:
* :func:`~dgl.distributed.load_partition` loads one partition and the meta data into memory.
* :func:`~dgl.distributed.partition` partitions a graph into files organized as above.
"""
import json
import os
import numpy as np
from .. import backend as F
from ..base import NID, EID
from ..data.utils import load_graphs, save_graphs, load_tensors, save_tensors
from ..transform import metis_partition_assignment, partition_graph_with_halo
def load_partition(conf_file, part_id):
    ''' Load the data of a partition from the data path in the DistGraph server.

    The partition data includes the graph structure of the partition, a dict of node tensors,
    a dict of edge tensors and some metadata. The partition may contain HALO nodes,
    i.e., nodes replicated from other partitions. However, the dict of node tensors
    only contains the node data that belongs to the local partition. Similarly, the edge
    tensors only contain the edge data that belongs to the local partition. The metadata
    describes the global graph (not the local partition): the number of nodes, the number
    of edges, as well as the node and edge assignments of the global graph.

    The function currently loads data through the normal filesystem interface. In the future,
    we need to support loading data from other storage such as S3 and HDFS.

    Parameters
    ----------
    conf_file : str
        The path of the partition config file.
    part_id : int
        The partition ID.

    Returns
    -------
    DGLGraph
        The graph partition structure.
    dict of tensors
        All node features.
    dict of tensors
        All edge features.
    (int, int, NumPy ndarray, NumPy ndarray)
        The metadata of the global graph: number of nodes, number of edges, node map, edge map.
    '''
    with open(conf_file) as conf_f:
        part_metadata = json.load(conf_f)
    assert 'part-{}'.format(part_id) in part_metadata, "part-{} does not exist".format(part_id)
    part_files = part_metadata['part-{}'.format(part_id)]
    assert 'node_feats' in part_files, "the partition does not contain node features."
    assert 'edge_feats' in part_files, "the partition does not contain edge features."
    assert 'part_graph' in part_files, "the partition does not contain the graph structure."
    node_feats = load_tensors(part_files['node_feats'])
    edge_feats = load_tensors(part_files['edge_feats'])
    graph = load_graphs(part_files['part_graph'])[0][0]

    assert 'num_nodes' in part_metadata, "cannot get the number of nodes of the global graph."
    assert 'num_edges' in part_metadata, "cannot get the number of edges of the global graph."
    assert 'node_map' in part_metadata, "cannot get the node map."
    assert 'edge_map' in part_metadata, "cannot get the edge map."
    node_map = np.load(part_metadata['node_map'])
    edge_map = np.load(part_metadata['edge_map'])
    meta = (part_metadata['num_nodes'], part_metadata['num_edges'], node_map, edge_map)

    assert NID in graph.ndata, "the partition graph should contain the mapping to global node IDs"
    assert EID in graph.edata, "the partition graph should contain the mapping to global edge IDs"
    # TODO we need to fix this. The DGL backend doesn't support boolean or byte;
    # int64 is unnecessary.
    part_ids = F.zerocopy_from_numpy(node_map)[graph.ndata[NID]]
    graph.ndata['local_node'] = F.astype(part_ids == part_id, F.int64)
    part_ids = F.zerocopy_from_numpy(edge_map)[graph.edata[EID]]
    graph.edata['local_edge'] = F.astype(part_ids == part_id, F.int64)

    return graph, node_feats, edge_feats, meta
def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis"):
    ''' Partition a graph for distributed training and store the partitions in files.

    The partitioning happens in three steps: 1) run a partition algorithm (e.g., METIS) to
    assign nodes to partitions; 2) construct the partition graph structure based on
    the node assignment; 3) split the node features and edge features based on
    the partition result.

    The partitioned data is stored in multiple files.

    First, the metadata of the original graph and of the partitioning is stored in a JSON file
    named after `graph_name`. This JSON file contains the information of the original graph
    as well as the names of the files that store each partition.

    If node IDs are not reshuffled to ensure that all nodes in a partition fall into a
    contiguous ID range, the node assignment is stored in a separate numpy file.

    All node features in a partition are stored in a file with DGL format. The node features are
    stored in a dictionary, in which the key is the node data name and the value is a tensor.
    All edge features in a partition are stored in a file with DGL format. The edge features are
    stored in a dictionary, in which the key is the edge data name and the value is a tensor.

    The graph structure of a partition is stored in a file with the DGLGraph format. The DGLGraph
    contains the mapping of node/edge IDs to the IDs in the original graph.

    Parameters
    ----------
    g : DGLGraph
        The input graph to partition.
    graph_name : str
        The name of the graph.
    num_parts : int
        The number of partitions.
    out_path : str
        The path to store the files for all partitioned data.
    num_hops : int
        The number of hops of HALO nodes we construct on a partition graph structure.
    part_method : str
        The partition method. It supports "random" and "metis".
    '''
    if num_parts == 1:
        client_parts = {0: g}
        node_parts = F.zeros((g.number_of_nodes(),), F.int64, F.cpu())
        g.ndata[NID] = F.arange(0, g.number_of_nodes())
        g.edata[EID] = F.arange(0, g.number_of_edges())
    elif part_method == 'metis':
        node_parts = metis_partition_assignment(g, num_parts)
        client_parts = partition_graph_with_halo(g, node_parts, num_hops)
    elif part_method == 'random':
        # Draw the random assignment with numpy and convert it to a backend tensor,
        # so it has the same form as the METIS assignment above.
        node_parts = F.tensor(np.random.choice(num_parts, g.number_of_nodes()))
        client_parts = partition_graph_with_halo(g, node_parts, num_hops)
    else:
        raise Exception('Unknown partitioning method: ' + part_method)

    # Let's calculate the edge assignment.
    # TODO(zhengda) we should replace int64 with int16. int16 should be sufficient.
    if num_parts > 1:
        edge_parts = np.zeros((g.number_of_edges(),), dtype=np.int64) - 1
        num_edges = 0
        lnodes_list = []  # The node IDs of each partition
        ledges_list = []  # The edge IDs of each partition
        for part_id in range(num_parts):
            part = client_parts[part_id]
            local_nodes = F.boolean_mask(part.ndata[NID], part.ndata['inner_node'] == 1)
            local_edges = F.asnumpy(g.in_edges(local_nodes, form='eid'))
            edge_parts[local_edges] = part_id
            num_edges += len(local_edges)
            lnodes_list.append(local_nodes)
            ledges_list.append(local_edges)
        assert num_edges == g.number_of_edges()
    else:
        edge_parts = np.zeros((g.number_of_edges(),), dtype=np.int64)

    os.makedirs(out_path, mode=0o775, exist_ok=True)
    tot_num_inner_edges = 0
    out_path = os.path.abspath(out_path)
    node_part_file = os.path.join(out_path, "node_map")
    edge_part_file = os.path.join(out_path, "edge_map")
    np.save(node_part_file, F.asnumpy(node_parts), allow_pickle=False)
    np.save(edge_part_file, edge_parts, allow_pickle=False)
    part_metadata = {'graph_name': graph_name,
                     'num_nodes': g.number_of_nodes(),
                     'num_edges': g.number_of_edges(),
                     'part_method': part_method,
                     'num_parts': num_parts,
                     'halo_hops': num_hops,
                     'node_map': node_part_file + ".npy",
                     'edge_map': edge_part_file + ".npy"}
    for part_id in range(num_parts):
        part = client_parts[part_id]

        # Get the node/edge features of each partition.
        node_feats = {}
        edge_feats = {}
        if num_parts > 1:
            local_nodes = lnodes_list[part_id]
            local_edges = ledges_list[part_id]
            print('part {} has {} nodes and {} edges.'.format(
                part_id, part.number_of_nodes(), part.number_of_edges()))
            print('{} nodes and {} edges are inside the partition'.format(
                len(local_nodes), len(local_edges)))
            tot_num_inner_edges += len(local_edges)
            for name in g.ndata:
                node_feats[name] = g.ndata[name][local_nodes]
            for name in g.edata:
                edge_feats[name] = g.edata[name][local_edges]
        else:
            for name in g.ndata:
                node_feats[name] = g.ndata[name]
            for name in g.edata:
                edge_feats[name] = g.edata[name]

        part_dir = os.path.join(out_path, "part" + str(part_id))
        node_feat_file = os.path.join(part_dir, "node_feat.dgl")
        edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
        part_graph_file = os.path.join(part_dir, "graph.dgl")
        part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
                                                    'edge_feats': edge_feat_file,
                                                    'part_graph': part_graph_file}
        os.makedirs(part_dir, mode=0o775, exist_ok=True)
        save_tensors(node_feat_file, node_feats)
        save_tensors(edge_feat_file, edge_feats)
        save_graphs(part_graph_file, [part])

    with open('{}/{}.json'.format(out_path, graph_name), 'w') as outfile:
        json.dump(part_metadata, outfile, sort_keys=True, indent=4)

    num_cuts = g.number_of_edges() - tot_num_inner_edges
    if num_parts == 1:
        num_cuts = 0
    print('There are {} edges in the graph and {} edge cuts for {} partitions.'.format(
        g.number_of_edges(), num_cuts, num_parts))
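For reference, here is a minimal end-to-end sketch of the two public functions in this new module. The graph construction mirrors the unit tests added later in this commit; the graph name, output directory and feature sizes are arbitrary placeholders.

```
import numpy as np
from scipy import sparse as spsp

import dgl
from dgl import backend as F
from dgl.graph_index import create_graph_index
from dgl.distributed import partition_graph, load_partition

# A small random readonly graph with one node feature (placeholder data).
arr = (spsp.random(1000, 1000, density=0.01, format='coo') != 0).astype(np.int64)
g = dgl.DGLGraph(create_graph_index(arr, readonly=True))
g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 4))

# Partition into 2 parts with METIS and 1 hop of HALO nodes; files go under /tmp/demo.
partition_graph(g, 'demo', 2, '/tmp/demo', num_hops=1, part_method='metis')

# Load partition 0 back. The JSON config file is named after the graph.
part_g, node_feats, edge_feats, meta = load_partition('/tmp/demo/demo.json', 0)
num_nodes, num_edges, node_map, edge_map = meta

# Local node IDs in the partition map back to global IDs through dgl.NID.
global_nids = F.asnumpy(part_g.ndata[dgl.NID])
print('partition 0 holds {} of {} global nodes'.format(part_g.number_of_nodes(), num_nodes))
```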
@@ -1025,6 +1025,11 @@ def from_csr(indptr, indices, direction):
    indices : Tensor
        column index array in the CSR format
    direction : str
        the edge direction. Either "in" or "out".
+
+    Returns
+    -------
+    GraphIndex
+        The graph index
    """
    indptr = utils.toindex(indptr)
@@ -1042,6 +1047,11 @@ def from_shared_mem_graph_index(shared_mem_name):
    ----------
    shared_mem_name : string
        the name of shared memory
+
+    Returns
+    -------
+    GraphIndex
+        The graph index
    """
    return _CAPI_DGLGraphCSRCreateMMap(shared_mem_name)
...
@@ -549,8 +549,7 @@ def remove_self_loop(g):
    return new_g

def partition_graph_with_halo(g, node_part, num_hops):
-    '''
-    This is to partition a graph. Each partition contains HALO nodes
+    ''' This is to partition a graph. Each partition contains HALO nodes
    so that we can generate NodeFlow in each partition correctly.

    Parameters
@@ -586,9 +585,35 @@ def partition_graph_with_halo(g, node_part, num_hops):
        subg_dict[i] = subg
    return subg_dict

-def metis_partition(g, k, extra_cached_hops=0):
-    '''
-    This is to partition a graph with Metis partitioning.
+def metis_partition_assignment(g, k):
+    ''' This assigns nodes to different partitions with the METIS partitioning algorithm.
+
+    After the partition assignment, we construct partitions.
+
+    Parameters
+    ----------
+    g : DGLGraph
+        The graph to be partitioned
+    k : int
+        The number of partitions.
+
+    Returns
+    -------
+    a 1-D tensor
+        A vector in which each element indicates the partition ID of a vertex.
+    '''
+    # METIS works only on symmetric graphs.
+    # METIS runs on the symmetric graph to generate the node assignment to partitions.
+    sym_g = to_bidirected(g, readonly=True)
+    node_part = _CAPI_DGLMetisPartition(sym_g._graph, k)
+    if len(node_part) == 0:
+        return None
+    else:
+        node_part = utils.toindex(node_part)
+        return node_part.tousertensor()
+
+def metis_partition(g, k, extra_cached_hops=0):
+    ''' This is to partition a graph with METIS partitioning.
    Metis assigns vertices to partitions. This API constructs graphs with the vertices assigned
    to the partitions and their incoming edges.
...
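A short sketch of how the new `metis_partition_assignment` pairs with the existing `partition_graph_with_halo`, which is exactly the combination `partition_graph` above relies on. The toy graph construction is a placeholder borrowed from the tests in this commit.

```
import numpy as np
from scipy import sparse as spsp

import dgl
from dgl import backend as F
from dgl.graph_index import create_graph_index
from dgl.transform import metis_partition_assignment, partition_graph_with_halo

# A small random readonly graph (placeholder).
arr = (spsp.random(1000, 1000, density=0.01, format='coo') != 0).astype(np.int64)
g = dgl.DGLGraph(create_graph_index(arr, readonly=True))

# Assign every node to one of 4 parts with METIS, then materialize the per-part
# subgraphs with 1 hop of HALO nodes around each partition.
node_parts = metis_partition_assignment(g, 4)         # 1-D tensor of partition IDs
parts = partition_graph_with_halo(g, node_parts, 1)   # dict: part_id -> subgraph
for part_id, part in parts.items():
    # 'inner_node' marks the nodes that truly belong to this partition (vs. HALO).
    num_inner = int(F.asnumpy(part.ndata['inner_node']).sum())
    print('part {}: {} nodes, {} inner'.format(part_id, part.number_of_nodes(), num_inner))
```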
@@ -517,6 +517,12 @@ ImmutableGraphPtr ImmutableGraph::CreateFromCSR(
}

ImmutableGraphPtr ImmutableGraph::CreateFromCSR(const std::string &name) {
+  // If the shared memory graph index doesn't exist, we return null directly.
+#ifndef _WIN32
+  if (!SharedMemory::Exist(GetSharedMemName(name, "meta"))) {
+    return nullptr;
+  }
+#endif  // _WIN32
  GraphIndexMetadata meta = DeserializeMetadata(GetSharedMemName(name, "meta"));
  CSRPtr in_csr, out_csr;
  if (meta.has_in_csr) {
...
@@ -766,6 +766,9 @@ DGL_REGISTER_GLOBAL("network._CAPI_FastPull")
        }
      }
      row_size *= (local_data->dtype.bits / 8);
+     size_t data_size = local_data.GetSize();
+     CHECK_GT(local_data_shape.size(), 0);
+     CHECK_EQ(row_size * local_data_shape[0], data_size);
      // Get local id and remote id
      if (str_flag.compare("has_g2l") == 0) {
        NDArray g2l = args[11];
@@ -775,9 +778,12 @@ DGL_REGISTER_GLOBAL("network._CAPI_FastPull")
          int64_t part_id = pb_data[id];
          if (part_id == local_machine_id) {
            int64_t local_id = g2l_data[id];
+           CHECK_LT(local_id, local_data_shape[0]);
+           CHECK_GE(local_id, 0);
            local_ids.push_back(local_id);
            local_ids_orginal.push_back(i);
          } else {
+           CHECK_LT(part_id, machine_count) << "invalid partition ID";
            remote_ids[part_id].push_back(id);
            remote_ids_original[part_id].push_back(i);
          }
@@ -787,6 +793,8 @@ DGL_REGISTER_GLOBAL("network._CAPI_FastPull")
          int64_t id = ID_data[i];
          int64_t part_id = pb_data[id];
          if (part_id == local_machine_id) {
+           CHECK_LT(id, local_data_shape[0]);
+           CHECK_GE(id, 0);
            local_ids.push_back(id);
            local_ids_orginal.push_back(i);
          } else {
@@ -822,6 +830,9 @@ DGL_REGISTER_GLOBAL("network._CAPI_FastPull")
      // Copy local data
      #pragma omp parallel for
      for (int64_t i = 0; i < local_ids.size(); ++i) {
+       CHECK_GE(ID_size*row_size, local_ids_orginal[i] * row_size + row_size);
+       CHECK_GE(data_size, local_ids[i] * row_size + row_size);
+       CHECK_GE(local_ids[i], 0);
        memcpy(return_data + local_ids_orginal[i] * row_size,
               local_data_char + local_ids[i] * row_size,
               row_size);
...
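The `CHECK_*` statements added above guard the index arithmetic of the fast pull path: every requested global ID is either resolved against the local shard (optionally through the global-to-local map) or routed to the machine that owns it according to the partition book. A rough NumPy sketch of that routing logic, not of the C API itself:

```
import numpy as np

def fast_pull_sketch(ids, partition_book, local_machine_id, num_machines,
                     local_data, g2l=None):
    """Split requested global IDs into local rows and per-machine remote requests.

    partition_book[i] is the machine owning global ID i; g2l optionally maps
    global IDs to local row indices. Remote rows would be fetched over the
    network; this sketch only builds the request lists. (Illustrative only.)
    """
    result = np.zeros((len(ids),) + local_data.shape[1:], dtype=local_data.dtype)
    remote_ids = {m: [] for m in range(num_machines)}   # global IDs per machine
    remote_pos = {m: [] for m in range(num_machines)}   # where replies go in 'result'
    for pos, gid in enumerate(ids):
        owner = partition_book[gid]
        assert 0 <= owner < num_machines                 # "invalid partition ID"
        if owner == local_machine_id:
            row = g2l[gid] if g2l is not None else gid
            assert 0 <= row < local_data.shape[0]        # mirrors the added bounds checks
            result[pos] = local_data[row]                # copy the local row
        else:
            remote_ids[owner].append(gid)
            remote_pos[owner].append(pos)
    return result, remote_ids, remote_pos
```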
@@ -167,9 +167,9 @@ NDArray NDArray::EmptyShared(const std::string &name,
#ifndef _WIN32
  auto mem = std::make_shared<SharedMemory>(name);
  if (is_create) {
-    ret.data_->dl_tensor.data = mem->create_new(size);
+    ret.data_->dl_tensor.data = mem->CreateNew(size);
  } else {
-    ret.data_->dl_tensor.data = mem->open(size);
+    ret.data_->dl_tensor.data = mem->Open(size);
  }
  ret.data_->mem = mem;
...
@@ -34,7 +34,7 @@ SharedMemory::~SharedMemory() {
  }
}

-void *SharedMemory::create_new(size_t size) {
+void *SharedMemory::CreateNew(size_t size) {
  this->own = true;

  int flag = O_RDWR|O_CREAT;
@@ -49,7 +49,7 @@ void *SharedMemory::create_new(size_t size) {
  return ptr;
}

-void *SharedMemory::open(size_t size) {
+void *SharedMemory::Open(size_t size) {
  int flag = O_RDWR;
  fd = shm_open(name.c_str(), flag, S_IRUSR | S_IWUSR);
  CHECK_NE(fd, -1) << "fail to open " << name << ": " << strerror(errno);
@@ -58,6 +58,16 @@ void *SharedMemory::open(size_t size) {
      << "Failed to map shared memory. mmap failed with error " << strerror(errno);
  return ptr;
}

+bool SharedMemory::Exist(const std::string &name) {
+  int fd = shm_open(name.c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
+  if (fd >= 0) {
+    close(fd);
+    return true;
+  } else {
+    return false;
+  }
+}

#endif  // _WIN32
}  // namespace runtime
...
import os
os.environ['OMP_NUM_THREADS'] = '1'
import dgl
import sys
import numpy as np
import time
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from multiprocessing import Process, Manager, Condition, Value
import multiprocessing as mp
from dgl.graph_index import create_graph_index
from dgl.data.utils import load_graphs, save_graphs
from dgl.distributed import DistGraphServer, DistGraph
from dgl.distributed import partition_graph
import backend as F
import unittest
import pickle

server_namebook = {0: [0, '127.0.0.1', 30000, 1]}

def create_random_graph(n):
    arr = (spsp.random(n, n, density=0.001, format='coo') != 0).astype(np.int64)
    ig = create_graph_index(arr, readonly=True)
    return dgl.DGLGraph(ig)

def run_server(graph_name, server_id, num_clients, barrier):
    g = DistGraphServer(server_id, server_namebook, num_clients, graph_name,
                        '/tmp/{}.json'.format(graph_name))
    barrier.wait()
    print('start server', server_id)
    g.start()

def run_client(graph_name, barrier, num_nodes, num_edges):
    barrier.wait()
    g = DistGraph(server_namebook, graph_name)

    # Test API
    assert g.number_of_nodes() == num_nodes
    assert g.number_of_edges() == num_edges

    # Test reading node data
    nids = F.arange(0, int(g.number_of_nodes() / 2))
    feats1 = g.ndata['features'][nids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == nids))

    # Test reading edge data
    eids = F.arange(0, int(g.number_of_edges() / 2))
    feats1 = g.edata['features'][eids]
    feats = F.squeeze(feats1, 1)
    assert np.all(F.asnumpy(feats == eids))

    # Test init node data
    new_shape = (g.number_of_nodes(), 2)
    g.init_ndata('test1', new_shape, F.int32)
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 0)

    # Test init edge data
    new_shape = (g.number_of_edges(), 2)
    g.init_edata('test1', new_shape, F.int32)
    feats = g.edata['test1'][eids]
    assert np.all(F.asnumpy(feats) == 0)

    # Test write data
    new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = new_feats
    feats = g.ndata['test1'][nids]
    assert np.all(F.asnumpy(feats) == 1)

    # Test metadata operations.
    assert len(g.ndata['features']) == g.number_of_nodes()
    assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
    assert g.ndata['features'].dtype == F.int64
    assert g.node_attr_schemes()['features'].dtype == F.int64
    assert g.node_attr_schemes()['test1'].dtype == F.int32
    assert g.node_attr_schemes()['features'].shape == (1,)

    g.shut_down()
    print('end')

def run_server_client():
    g = create_random_graph(10000)

    # Partition the graph
    num_parts = 1
    graph_name = 'test'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    partition_graph(g, graph_name, num_parts, '/tmp')

    # Let's just test on one partition for now.
    # We cannot run multiple servers and clients on the same machine.
    barrier = mp.Barrier(2)
    serv_ps = []
    for serv_id in range(1):
        p = Process(target=run_server, args=(graph_name, serv_id, 1, barrier))
        serv_ps.append(p)
        p.start()

    cli_ps = []
    for cli_id in range(1):
        print('start client', cli_id)
        p = Process(target=run_client, args=(graph_name, barrier, g.number_of_nodes(),
                                             g.number_of_edges()))
        p.start()
        cli_ps.append(p)

    for p in cli_ps:
        p.join()
    print('clients have terminated')

if __name__ == '__main__':
    run_server_client()
import dgl
import sys
import numpy as np
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from dgl.graph_index import create_graph_index
from dgl.distributed import partition_graph, load_partition
import backend as F
import unittest
import pickle

def create_random_graph(n):
    arr = (spsp.random(n, n, density=0.001, format='coo') != 0).astype(np.int64)
    ig = create_graph_index(arr, readonly=True)
    return dgl.DGLGraph(ig)

def test_partition():
    g = create_random_graph(10000)
    g.ndata['labels'] = F.arange(0, g.number_of_nodes())
    g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10))
    num_parts = 4
    num_hops = 2

    partition_graph(g, 'test', num_parts, '/tmp', num_hops=num_hops, part_method='metis')
    for i in range(num_parts):
        part_g, node_feats, edge_feats, meta = load_partition('/tmp/test.json', i)
        num_nodes, num_edges, node_map, edge_map = meta

        # Check the metadata
        assert num_nodes == g.number_of_nodes()
        assert num_edges == g.number_of_edges()

        # Check the node map.
        local_nodes = np.nonzero(node_map == i)[0]
        part_ids = node_map[F.asnumpy(part_g.ndata[dgl.NID])]
        local_nodes1 = F.asnumpy(part_g.ndata[dgl.NID])[part_ids == i]
        assert np.all(local_nodes == local_nodes1)

        # Check the edge map.
        assert np.all(edge_map >= 0)
        local_edges = np.nonzero(edge_map == i)[0]
        part_ids = edge_map[F.asnumpy(part_g.edata[dgl.EID])]
        local_edges1 = F.asnumpy(part_g.edata[dgl.EID])[part_ids == i]
        assert np.all(local_edges == np.sort(local_edges1))

        for name in ['labels', 'feats']:
            assert name in node_feats
            assert node_feats[name].shape[0] == len(local_nodes)
            assert len(local_nodes) == len(node_feats[name])
            assert np.all(F.asnumpy(g.ndata[name][local_nodes]) == F.asnumpy(node_feats[name]))
        assert len(edge_feats) == 0

if __name__ == '__main__':
    test_partition()
@@ -4,11 +4,15 @@ import signal
import dgl
from dgl import backend as F
from dgl.data.utils import load_graphs, save_graphs
+from dgl.contrib.dist_graph import partition_graph
+import pickle

def main():
    parser = argparse.ArgumentParser(description='Partition a graph')
    parser.add_argument('--data', required=True, type=str,
                        help='The file path of the input graph in the DGL format.')
+    parser.add_argument('--graph-name', required=True, type=str,
+                        help='The graph name')
    parser.add_argument('-k', '--num-parts', required=True, type=int,
                        help='The number of partitions')
    parser.add_argument('--num-hops', type=int, default=1,
@@ -18,39 +22,11 @@ def main():
    parser.add_argument('-o', '--output', required=True, type=str,
                        help='The output directory of the partitioned results')
    args = parser.parse_args()

-    data_path = args.data
-    num_parts = args.num_parts
-    num_hops = args.num_hops
-    method = args.method
-    output = args.output
-    glist, _ = load_graphs(data_path)
+    glist, _ = load_graphs(args.data)
    g = glist[0]
+    partition_graph(g, args.graph_name, args.num_parts, args.output,
+                    num_hops=args.num_hops, part_method=args.method)

-    if args.method == 'metis':
-        part_dict = dgl.transform.metis_partition(g, num_parts, num_hops)
-    elif args.method == 'random':
-        node_parts = np.random.choice(num_parts, g.number_of_nodes())
-        part_dict = dgl.transform.partition_graph_with_halo(g, node_parts, num_hops)
-    else:
-        raise Exception('unknown partitioning method: ' + args.method)
-
-    tot_num_inner_edges = 0
-    for part_id in part_dict:
-        part = part_dict[part_id]
-        num_inner_nodes = len(np.nonzero(F.asnumpy(part.ndata['inner_node']))[0])
-        num_inner_edges = len(np.nonzero(F.asnumpy(part.edata['inner_edge']))[0])
-        print('part {} has {} nodes and {} edges. {} nodes and {} edges are inside the partition'.format(
-            part_id, part.number_of_nodes(), part.number_of_edges(),
-            num_inner_nodes, num_inner_edges))
-        tot_num_inner_edges += num_inner_edges
-
-        # TODO I duplicate some node features.
-        part.copy_from_parent()
-        save_graphs(output + '/' + str(part_id) + '.dgl', [part])
-    print('there are {} edges in the graph and {} edge cuts for {} partitions.'.format(
-        g.number_of_edges(), g.number_of_edges() - tot_num_inner_edges, len(part_dict)))

if __name__ == '__main__':
    main()