Unverified Commit e4ff4844 authored by Da Zheng, committed by GitHub

[Distributed] Distributed METIS partition (#2576)



* add convert.

* fix.

* add write_mag.

* fix convert_partition.py

* write data.

* use pyarrow to read.

* update write_mag.py

* fix convert_partition.py.

* load node/edge features when necessary.

* reshuffle nodes.

* write mag correctly.

* fix a bug: inner nodes in a partition might be empty.

* fix bugs.

* add verify code.

* insert reverse edges.

* fix a bug.

* add get node/edge data.

* add instructions.

* remove unnecessary argument.

* update distributed preprocessing.

* fix readme.

* fix.

* fix.

* fix.

* fix readme.

* fix doc.

* fix.

* update readme

* update doc.

* update readme.
Co-authored-by: Ubuntu <ubuntu@ip-172-31-9-132.us-west-1.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-2-202.us-west-1.compute.internal>
parent 0aca5660
......@@ -5,20 +5,21 @@
:ref:`(中文版) <guide_cn-distributed-preprocessing>`
DGL requires preprocessing the graph data for distributed training. This includes two steps:
1) partition a graph into subgraphs, 2) assign nodes/edges with new IDs. For relatively small
graphs, DGL provides a partitioning API :func:`dgl.distributed.partition_graph` that performs
the two steps above. The API runs on one machine, so if a graph is large, users will
need a large machine to partition it when using this API. In addition to this API, we also
provide a solution to partition a large graph on a cluster of machines (see Section 7.1.1 below).
:func:`dgl.distributed.partition_graph` supports both random partitioning
and a `Metis <http://glaros.dtc.umn.edu/gkhome/views/metis>`__-based partitioning.
The benefit of Metis partitioning is that it can generate
partitions with minimal edge cuts to reduce network communication for distributed training
and inference. DGL uses the latest version of Metis with options optimized for real-world
graphs with power-law degree distributions. After partitioning, the API constructs the partitioned
results in a format that is easy to load during training.

By default, the partition API assigns new IDs to the nodes and edges in the input graph to help locate
nodes/edges during distributed training/inference. After assigning IDs, the partition API shuffles
all node data and edge data accordingly. During training, users just use the new node/edge IDs.
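As a minimal sketch, partitioning a toy graph with this API looks like the following (the graph,
the graph name and the output directory below are placeholders for illustration):

.. code-block:: python

    import dgl
    import torch as th

    # A random toy graph for illustration; real use cases load their own graph.
    g = dgl.rand_graph(1000, 10000)
    g.ndata['feat'] = th.randn(1000, 16)
    # Partition the graph into four parts with Metis and write the results
    # to 'data_root_dir'.
    dgl.distributed.partition_graph(g, graph_name='my_graph', num_parts=4,
                                    out_path='data_root_dir',
                                    part_method='metis')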
......@@ -39,19 +40,19 @@ the graph structure of the partition as well as some metadata on nodes and edges
data_root_dir/
|-- xxx.json # partition configuration file in JSON
|-- node_map.npy # partition id of each node stored in a numpy array (optional)
|-- edge_map.npy # partition id of each edge stored in a numpy array (optional)
|-- part0/ # data for partition 0
|-- node_feats.dgl # node features stored in binary format
|-- edge_feats.dgl # edge features stored in binary format
|-- graph.dgl # graph structure of this partition stored in binary format
|-- part1/
|-- node_feats.dgl
|-- edge_feats.dgl
|-- graph.dgl
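A partition produced by this API can be loaded back with :func:`dgl.distributed.load_partition`.
The sketch below assumes that the first four returned values are the partition structure, its node
features, its edge features and the partition book (the exact return signature may vary across
DGL versions):

.. code-block:: python

    import dgl

    # Load partition 0 from the partition configuration file.
    part_data = dgl.distributed.load_partition('data_root_dir/xxx.json', 0)
    g, node_feats, edge_feats, gpb = part_data[:4]
    print(g)    # graph structure of partition 0
    print(gpb)  # partition book mapping global IDs to partitions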
Load balancing
^^^^^^^^^^^^^^
When partitioning a graph, by default, Metis only balances the number of nodes in each partition.
This can result in suboptimal configuration, depending on the task at hand. For example, in the case
......@@ -75,3 +76,252 @@ the number of edges incident to the nodes of different types.
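For example, a sketch of enabling these balancing options with
:func:`dgl.distributed.partition_graph` (assuming `g` carries a boolean `train_mask` in its node data):

.. code-block:: python

    import dgl

    # Balance the number of training nodes (balance_ntypes) as well as the
    # number of edges (balance_edges) in each partition.
    dgl.distributed.partition_graph(g, graph_name='my_graph', num_parts=4,
                                    out_path='data_root_dir',
                                    balance_ntypes=g.ndata['train_mask'],
                                    balance_edges=True)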
**Note**: The graph name passed to :func:`dgl.distributed.partition_graph` is an important argument.
The graph name will be used by :class:`dgl.distributed.DistGraph` to identify a distributed graph.
A legal graph name should only contain alphabetic characters and underscores.
7.1.1 Distributed partitioning
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
For a large graph, DGL uses `ParMetis <http://glaros.dtc.umn.edu/gkhome/metis/parmetis/overview>`__ to partition
a graph on a cluster of machines. This solution requires users to prepare data for ParMETIS and use the DGL script
`tools/convert_partition.py` to construct :class:`dgl.DGLGraph` objects for the partitions output by ParMETIS.

**Note**: `convert_partition.py` uses the `pyarrow` package to load CSV files. Please install `pyarrow`.
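For example:

.. code-block:: none

    pip3 install pyarrow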
ParMETIS Installation
^^^^^^^^^^^^^^^^^^^^^
For now, we need to compile and install ParMETIS manually. We clone the DGL branch of ParMETIS as follows:
.. code-block:: none
git clone --branch dgl https://github.com/KarypisLab/ParMETIS.git
Then we follow the instructions in its GitHub repository to install its dependencies, including METIS,
and then compile and install ParMETIS.
.. code-block:: none
make config cc=mpicc prefix=~/local
make install
Before running ParMETIS, we need to set two environment variables: `PATH` and `LD_LIBRARY_PATH`.
.. code-block:: none
export PATH=$PATH:$HOME/local/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/local/lib/
Input format for ParMETIS
^^^^^^^^^^^^^^^^^^^^^^^^^
The input graph for ParMETIS is stored in three files with the following names: `xxx_nodes.txt`,
`xxx_edges.txt` and `xxx_stats.txt`, where `xxx` is a graph name.
Each row in `xxx_nodes.txt` stores the information of a node with the following format:
.. code-block:: none
<node_type> <weight1> ... <orig_type_node_id> <attributes>
All fields are separated by whitespace:
* `<node_type>` is an integer. For a homogeneous graph, its value is always 0. For heterogeneous graphs,
its value indicates the type of each node.
* `<weight1>`, `<weight2>`, etc. are integers that indicate the node weights used by ParMETIS to balance
graph partitions. If a user does not provide node weights, ParMETIS partitions a graph and balances
the number of nodes in each partition (it is important to balance graph partitions in order to achieve
good training speed). However, this default strategy may not be sufficient for many use cases.
For example, in a heterogeneous graph, we may want to partition the graph so that all partitions have
roughly the same number of nodes of each node type. The toy example below shows how we can use
node weights to balance the number of nodes of different types.
* `<orig_type_node_id>` is an integer representing the node ID within its own type. In DGL, nodes of each type
are assigned IDs starting from 0. For a homogeneous graph, this field is the same as the node ID.
* `<attributes>` are optional fields. They can be used to store any values and ParMETIS does not interpret
these fields. Potentially, we can store node features in these fields for homogeneous graphs.
* The row number indicates the *homogeneous* ID of a node in the graph (every node is assigned a unique ID).
All nodes of the same type should be assigned contiguous IDs; that is, nodes of the same type should
be stored together in `xxx_nodes.txt`.
Below is an example node file for a heterogeneous graph with two node types. Node type 0 has three
nodes; node type 1 has four nodes. It uses two node weights to ensure that ParMETIS generates partitions
with roughly the same number of nodes of type 0 and the same number of nodes of type 1.
.. code-block:: none
0 1 0 0
0 1 0 1
0 1 0 2
1 0 1 0
1 0 1 1
1 0 1 2
1 0 1 3
Similarly, each row in `xxx_edges.txt` stores the information of an edge with the following format:
.. code-block:: none
<src_id> <dst_id> <type_edge_id> <edge_type> <attributes>
All fields are separated by whitespace:
* `<src_id>` is the *homogeneous* ID of the source node.
* `<dst_id>` is the *homogeneous* ID of the destination node.
* `<type_edge_id>` is an integer representing the edge ID within its edge type.
* `<edge_type>` is an integer indicating the edge type.
* `<attributes>` are optional fields. They can be used to store any values and ParMETIS does not
interpret these fields.
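To continue the toy node file above, below is a hypothetical `xxx_edges.txt` with four edges of a
single edge type that connect type-0 nodes (homogeneous IDs 0-2) to type-1 nodes (homogeneous IDs 3-6);
the particular edge list is made up for illustration:

.. code-block:: none

    0 3 0 0
    0 4 1 0
    1 5 2 0
    2 6 3 0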
`xxx_stats.txt` stores some basic statistics of the graph. It has only one line with three fields
separated by whitespace:
.. code-block:: none
<num_nodes> <num_edges> <num_node_weights>
* `num_nodes` stores the total number of nodes regardless of node types.
* `num_edges` stores the total number of edges regardless of edge types.
* `num_node_weights` stores the number of node weights in the node file.
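For the toy graph above (seven nodes, four edges and two node weights), `xxx_stats.txt` contains
a single line:

.. code-block:: none

    7 4 2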
Run ParMETIS and output formats
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ParMETIS provides a command called `pm_dglpart`, which loads the graph stored in the three
files on the machine where `pm_dglpart` is invoked, distributes the data to all machines in
the cluster and invokes ParMETIS to partition the graph. When it completes, it generates
three files for each partition: `p<part_id>-xxx_nodes.txt`, `p<part_id>-xxx_edges.txt` and
`p<part_id>-xxx_stats.txt`.
**Note**: ParMETIS reassigns IDs to nodes during the partitioning. After ID reassignment,
the nodes in a partition are assigned contiguous IDs; furthermore, the nodes of
the same type are assigned contiguous IDs.
`p<part_id>-xxx_nodes.txt` stores the node data of the partition. Each row represents
a node with the following fields:
.. code-block:: none
<node_id> <node_type> <weight1> ... <orig_type_node_id> <attributes>
* `<node_id>` is the *homogeneous* node ID after ID reassignment.
* `<node_type>` is the node type.
* `<weight1>` is the node weight used by ParMETIS.
* `<orig_type_node_id>` is the original node ID for a specific node type in the input heterogeneous graph.
* `<attributes>` are optional fields that contain any node attributes in the input node file.
`p<part_id>-xxx_edges.txt` stores the edge data of the partition. Each row represents
an edge with the following fields:
.. code-block:: none
<src_id> <dst_id> <orig_src_id> <orig_dst_id> <orig_type_edge_id> <edge_type> <attributes>
* `<src_id>` is the *homogeneous* ID of the source node after ID reassignment.
* `<dst_id>` is the *homogeneous* ID of the destination node after ID reassignment.
* `<orig_src_id>` is the *homogeneous* ID of the source node in the input graph.
* `<orig_dst_id>` is the *homogeneous* ID of the destination node in the input graph.
* `<orig_type_edge_id>` is the edge ID for the specific edge type in the input graph.
* `<edge_type>` is the edge type.
* `<attributes>` are optional fields that contain any edge attributes in the input edge file.
When invoking `pm_dglpart`, the three input files: `xxx_nodes.txt`, `xxx_edges.txt`, `xxx_stats.txt`
should be located in the directory where `pm_dglpart` runs. The following command partitions the graph
named `xxx` into two partitions.
.. code-block:: none
pm_dglpart xxx 2
The following command runs `pm_dglpart` with four MPI processes, each of which generates two
partitions, so the input graph named `xxx` is split into eight partitions in total. The three input
files: `xxx_nodes.txt`, `xxx_edges.txt`, `xxx_stats.txt` should still be located in the directory
where `pm_dglpart` runs.
.. code-block:: none
mpirun -np 4 pm_dglpart xxx 2
Convert ParMETIS outputs to DGLGraph
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
DGL provides a script named `convert_partition.py`, located in the `tools` directory, to convert the data
in the partition files into :class:`dgl.DGLGraph` objects and save them into files.
**Note**: `convert_partition.py` runs on a single machine. In the future, we will extend it to convert
graph data in parallel across multiple machines. **Note**: please install the `pyarrow` package,
which is used for loading data from CSV files.
`convert_partition.py` has the following arguments:
* `--input-dir INPUT_DIR` specifies the directory that contains the partition files generated by ParMETIS.
* `--graph-name GRAPH_NAME` specifies the graph name.
* `--schema SCHEMA` provides a file that specifies the schema of the input heterogeneous graph.
* `--num-parts NUM_PARTS` specifies the number of partitions.
* `--num-node-weights NUM_NODE_WEIGHTS` specifies the number of node weights used by ParMETIS
to balance partitions.
* `[--workspace WORKSPACE]` is an optional argument that specifies a workspace directory to
store some intermediate results.
* `[--node-attr-dtype NODE_ATTR_DTYPE]` is an optional argument that specifies the data type of
node attributes in the remaining fields `<attributes>` of the node files.
* `[--edge-attr-dtype EDGE_ATTR_DTYPE]` is an optional argument that specifies the data type of
edge attributes in the remaining fields `<attributes>` of the edge files.
* `--output OUTPUT` specifies the output directory that stores the partition results.
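For example, the invocation below (matching the OGBN-MAG example in Step 3 of the RGCN README)
converts the ParMETIS output in the current directory into two partitions:

.. code-block:: none

    python3 tools/convert_partition.py --input-dir . --graph-name mag --schema mag.json --num-parts 2 --num-node-weights 4 --output outputs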
`convert_partition.py` outputs files as below:
.. code-block:: none
data_root_dir/
|-- xxx.json # partition configuration file in JSON
|-- part0/ # data for partition 0
|-- node_feats.dgl # node features stored in binary format (optional)
|-- edge_feats.dgl # edge features stored in binary format (optional)
|-- graph.dgl # graph structure of this partition stored in binary format
|-- part1/
|-- node_feats.dgl
|-- edge_feats.dgl
|-- graph.dgl
**Note**: if the data type of node attributes or edge attributes is specified, `convert_partition.py`
assumes that nodes/edges of all types have exactly the same attributes. Therefore, if
nodes or edges of different types contain different numbers of attributes, users need to construct
the features manually.
Construct node/edge features for a heterogeneous graph
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The :class:`dgl.DGLGraph` objects output by `convert_partition.py` store a heterogeneous graph partition
as a homogeneous graph. The node data contain a field called `orig_id` that stores the node IDs
within their node type in the original heterogeneous graph and a field `dgl.NTYPE` that stores
the node types. In addition, the node data contain a field called `inner_node` that indicates
whether a node is assigned to the partition: its value is 1 if the node is assigned
to the partition and 0 otherwise. **Note**: a graph partition
also contains some HALO nodes, which are assigned to other partitions but are connected by
some edges to nodes in this graph partition. Using this information, we can construct node features
for each node type separately and store them in a dictionary whose keys are
`<node_type>/<feature_name>` and whose values are node feature tensors. The code below illustrates
the construction of the node feature dictionary. After the dictionary of tensors is constructed,
it is saved to a file.
.. code-block:: python

    # Assumed context: `hg` is the original heterogeneous graph, `part` is the
    # DGLGraph of one partition loaded from its graph.dgl file, `metadata` is
    # the loaded partition configuration JSON and `part_id` is the partition ID.
    node_data = {}
    for ntype in hg.ntypes:
        # Select the nodes of this type that are assigned to the partition.
        local_node_idx = th.logical_and(part.ndata['inner_node'].bool(),
                                        part.ndata[dgl.NTYPE] == hg.get_ntype_id(ntype))
        local_nodes = part.ndata['orig_id'][local_node_idx].numpy()
        for name in hg.nodes[ntype].data:
            node_data[ntype + '/' + name] = hg.nodes[ntype].data[name][local_nodes]
    dgl.data.utils.save_tensors(metadata['part-{}'.format(part_id)]['node_feats'], node_data)
We can construct the edge features in a very similar way. The only difference is that
all edges in the :class:`dgl.DGLGraph` object belong to the partition, so the construction
is even simpler.
.. code-block:: python

    # Assumed context is the same as in the node feature example above.
    edge_data = {}
    for etype in hg.etypes:
        local_edges = part.edata['orig_id'][part.edata[dgl.ETYPE] == hg.get_etype_id(etype)]
        for name in hg.edges[etype].data:
            edge_data[etype + '/' + name] = hg.edges[etype].data[name][local_edges]
    dgl.data.utils.save_tensors(metadata['part-{}'.format(part_id)]['edge_feats'], edge_data)
......@@ -7,8 +7,16 @@ Before training, please install some python libs by pip:
```bash
sudo pip3 install ogb
sudo pip3 install pyinstrument
sudo pip3 install pyarrow
```
To use the examples and tools provided by DGL, please clone the DGL GitHub repository.
```bash
git clone --recursive https://github.com/dmlc/dgl.git
```
Below, we assume the repository is cloned in the home directory.
Training RGCN takes four steps:
### Step 0: set IP configuration file.
......@@ -44,7 +52,7 @@ DGL provides a script for copying partitioned data to the cluster. Before that,
```bash
mkdir ~/dgl_code
cp ~/dgl/examples/pytorch/rgcn/experimental/entity_classify_dist.py ~/dgl_code
```
......@@ -88,6 +96,67 @@ We can get the performance score at the second epoch:
Val Acc 0.4323, Test Acc 0.4255, time: 128.0379
```
## Partition a graph with ParMETIS
Partitioning a graph with ParMETIS for DGL's distributed training takes four steps.
More details about the four steps are explained in our
[user guide](https://doc.dgl.ai/guide/distributed-preprocessing.html).
### Step 1: write the graph into files.
The graph structure should be written as a node file and an edge file. The node features and edge features
can be written as DGL tensors. `write_mag.py` shows an example of writing the OGB MAG graph into files.
```bash
python3 write_mag.py
```
### Step 2: partition the graph with ParMETIS
Run the `pm_dglpart` program from ParMETIS, which reads the node file and the edge file output in Step 1
and partitions the graph.
```bash
pm_dglpart mag 2
```
This partitions the graph into two parts with a single process.

```bash
mpirun -np 4 pm_dglpart mag 2
```

This partitions the graph into eight parts with four processes (each process generates two parts).

```bash
mpirun --hostfile hostfile -np 4 pm_dglpart mag 2
```

This partitions the graph into eight parts with four processes on multiple machines.
`hostfile` specifies the IPs of the machines, one line per machine. The input files
should reside on the machine where the command runs. Each process writes
its partitions to files on its local machine. For simplicity, we recommend
writing the files to NFS.
### Step 3: Convert the ParMETIS partitions into DGLGraph
DGL provides a tool called `convert_partition.py` that loads one partition at a time, converts it
into a DGLGraph and saves it to a file.
```bash
python3 ~/dgl/tools/convert_partition.py --input-dir . --graph-name mag --schema mag.json --num-parts 2 --num-node-weights 4 --output outputs
```

Here `mag.json` is the schema file written by `write_mag.py` in Step 1.
### Step 4: Read node data and edge data for each partition
`get_mag_data.py` shows an example of reading the node data and edge data of each partition and saving them into files located in the same directory as the DGLGraph file.
```bash
python3 get_mag_data.py
```
### Step 5: Verify the partition result (Optional)
```bash
python3 verify_mag_partitions.py
```
## Running the distributed code in standalone mode
The standalone mode is mainly used for development and testing. The procedure to run the code is much simpler.
......
import dgl
import json
import torch as th
import numpy as np
from ogb.nodeproppred import DglNodePropPredDataset
from pyinstrument import Profiler
# Load OGB-MAG.
dataset = DglNodePropPredDataset(name='ogbn-mag')
hg_orig, labels = dataset[0]
subgs = {}
# Insert a reverse edge type for each edge type; this reproduces the
# effectively undirected graph that was constructed for partitioning.
for etype in hg_orig.canonical_etypes:
u, v = hg_orig.all_edges(etype=etype)
subgs[etype] = (u, v)
subgs[(etype[2], 'rev-'+etype[1], etype[0])] = (v, u)
hg = dgl.heterograph(subgs)
hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']
split_idx = dataset.get_idx_split()
train_idx = split_idx["train"]['paper']
val_idx = split_idx["valid"]['paper']
test_idx = split_idx["test"]['paper']
paper_labels = labels['paper'].squeeze()
train_mask = th.zeros((hg.number_of_nodes('paper'),), dtype=th.bool)
train_mask[train_idx] = True
val_mask = th.zeros((hg.number_of_nodes('paper'),), dtype=th.bool)
val_mask[val_idx] = True
test_mask = th.zeros((hg.number_of_nodes('paper'),), dtype=th.bool)
test_mask[test_idx] = True
hg.nodes['paper'].data['train_mask'] = train_mask
hg.nodes['paper'].data['val_mask'] = val_mask
hg.nodes['paper'].data['test_mask'] = test_mask
hg.nodes['paper'].data['labels'] = paper_labels
with open('outputs/mag.json') as json_file:
metadata = json.load(json_file)
for part_id in range(metadata['num_parts']):
subg = dgl.load_graphs('outputs/part{}/graph.dgl'.format(part_id))[0][0]
node_data = {}
for ntype in hg.ntypes:
local_node_idx = th.logical_and(subg.ndata['inner_node'].bool(),
subg.ndata[dgl.NTYPE] == hg.get_ntype_id(ntype))
local_nodes = subg.ndata['orig_id'][local_node_idx].numpy()
for name in hg.nodes[ntype].data:
node_data[ntype + '/' + name] = hg.nodes[ntype].data[name][local_nodes]
print('node features:', node_data.keys())
dgl.data.utils.save_tensors(metadata['part-{}'.format(part_id)]['node_feats'], node_data)
edge_data = {}
for etype in hg.etypes:
local_edges = subg.edata['orig_id'][subg.edata[dgl.ETYPE] == hg.get_etype_id(etype)]
for name in hg.edges[etype].data:
edge_data[etype + '/' + name] = hg.edges[etype].data[name][local_edges]
print('edge features:', edge_data.keys())
dgl.data.utils.save_tensors(metadata['part-{}'.format(part_id)]['edge_feats'], edge_data)
import os
import json
import numpy as np
import dgl
import torch as th
from ogb.nodeproppred import DglNodePropPredDataset
with open('outputs/mag.json') as json_file:
metadata = json.load(json_file)
num_parts = metadata['num_parts']
# Load OGB-MAG.
dataset = DglNodePropPredDataset(name='ogbn-mag')
hg_orig, labels = dataset[0]
subgs = {}
for etype in hg_orig.canonical_etypes:
u, v = hg_orig.all_edges(etype=etype)
subgs[etype] = (u, v)
subgs[(etype[2], 'rev-'+etype[1], etype[0])] = (v, u)
hg = dgl.heterograph(subgs)
hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']
# Construct node data and edge data after reshuffling.
node_feats = {}
edge_feats = {}
for partid in range(num_parts):
part_node_feats = dgl.data.utils.load_tensors('outputs/part{}/node_feat.dgl'.format(partid))
part_edge_feats = dgl.data.utils.load_tensors('outputs/part{}/edge_feat.dgl'.format(partid))
for key in part_node_feats:
if key in node_feats:
node_feats[key].append(part_node_feats[key])
else:
node_feats[key] = [part_node_feats[key]]
for key in part_edge_feats:
if key in edge_feats:
edge_feats[key].append(part_edge_feats[key])
else:
edge_feats[key] = [part_edge_feats[key]]
for key in node_feats:
node_feats[key] = th.cat(node_feats[key])
for key in edge_feats:
edge_feats[key] = th.cat(edge_feats[key])
ntype_map = metadata['ntypes']
ntypes = [None] * len(ntype_map)
for key in ntype_map:
ntype_id = ntype_map[key]
ntypes[ntype_id] = key
etype_map = metadata['etypes']
etypes = [None] * len(etype_map)
for key in etype_map:
etype_id = etype_map[key]
etypes[etype_id] = key
node_map = metadata['node_map']
for key in node_map:
node_map[key] = th.stack([th.tensor(row) for row in node_map[key]], 0)
nid_map = dgl.distributed.id_map.IdMap(node_map)
edge_map = metadata['edge_map']
for key in edge_map:
edge_map[key] = th.stack([th.tensor(row) for row in edge_map[key]], 0)
eid_map = dgl.distributed.id_map.IdMap(edge_map)
# Load the graph partition structure.
for partid in range(num_parts):
print('test part', partid)
part_file = 'outputs/part{}/graph.dgl'.format(partid)
subg = dgl.load_graphs(part_file)[0][0]
subg_src_id, subg_dst_id = subg.edges()
subg_src_id = subg.ndata['orig_id'][subg_src_id]
subg_dst_id = subg.ndata['orig_id'][subg_dst_id]
subg_ntype = subg.ndata[dgl.NTYPE]
subg_etype = subg.edata[dgl.ETYPE]
for ntype_id in th.unique(subg_ntype):
ntype = ntypes[ntype_id]
idx = subg_ntype == ntype_id
# This is global IDs after reshuffle.
nid = subg.ndata[dgl.NID][idx]
ntype_ids1, type_nid = nid_map(nid)
orig_type_nid = subg.ndata['orig_id'][idx]
# All nodes should have the same node type.
assert np.all(ntype_ids1.numpy() == int(ntype_id))
# Check node data.
for name in hg.nodes[ntype].data:
local_data = node_feats[ntype + '/' + name][type_nid]
local_data1 = hg.nodes[ntype].data[name][orig_type_nid]
assert np.all(local_data.numpy() == local_data1.numpy())
for etype_id in th.unique(subg_etype):
etype = etypes[etype_id]
idx = subg_etype == etype_id
exist = hg[etype].has_edges_between(subg_src_id[idx], subg_dst_id[idx])
assert np.all(exist.numpy())
eid = hg[etype].edge_ids(subg_src_id[idx], subg_dst_id[idx])
assert np.all(eid.numpy() == subg.edata['orig_id'][idx].numpy())
# This is global IDs after reshuffle.
eid = subg.edata[dgl.EID][idx]
etype_ids1, type_eid = eid_map(eid)
orig_type_eid = subg.edata['orig_id'][idx]
# All edges should have the same edge type.
assert np.all(etype_ids1.numpy() == int(etype_id))
# Check edge data.
for name in hg.edges[etype].data:
local_data = edge_feats[etype + '/' + name][type_eid]
local_data1 = hg.edges[etype].data[name][orig_type_eid]
assert np.all(local_data.numpy() == local_data1.numpy())
import dgl
import json
import torch as th
import numpy as np
from ogb.nodeproppred import DglNodePropPredDataset
from pyinstrument import Profiler
# Load OGB-MAG.
dataset = DglNodePropPredDataset(name='ogbn-mag')
hg_orig, labels = dataset[0]
subgs = {}
# Insert a reverse edge type for each edge type so that the graph passed to
# the partitioner is effectively undirected.
for etype in hg_orig.canonical_etypes:
u, v = hg_orig.all_edges(etype=etype)
subgs[etype] = (u, v)
subgs[(etype[2], 'rev-'+etype[1], etype[0])] = (v, u)
hg = dgl.heterograph(subgs)
hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']
print(hg)
#subg_nodes = {}
#for ntype in hg.ntypes:
# subg_nodes[ntype] = np.random.choice(hg.number_of_nodes(ntype), int(hg.number_of_nodes(ntype) / 5), replace=False)
#hg = dgl.compact_graphs(dgl.node_subgraph(hg, subg_nodes))
profiler = Profiler()
profiler.start()
# OGB-MAG is stored in heterogeneous format. We need to convert it into homogeneous format.
g = dgl.to_homogeneous(hg)
g.ndata['orig_id'] = g.ndata[dgl.NID]
g.edata['orig_id'] = g.edata[dgl.EID]
print('|V|=' + str(g.number_of_nodes()))
print('|E|=' + str(g.number_of_edges()))
print('|NTYPE|=' + str(len(th.unique(g.ndata[dgl.NTYPE]))))
# Store the metadata of nodes.
num_node_weights = 0
node_data = [g.ndata[dgl.NTYPE].numpy()]
for ntype_id in th.unique(g.ndata[dgl.NTYPE]):
node_data.append((g.ndata[dgl.NTYPE] == ntype_id).numpy())
num_node_weights += 1
node_data.append(g.ndata['orig_id'].numpy())
node_data = np.stack(node_data, 1)
np.savetxt('mag_nodes.txt', node_data, fmt='%d', delimiter=' ')
# Store the node features
node_feats = {}
for ntype in hg.ntypes:
for name in hg.nodes[ntype].data:
node_feats[ntype + '/' + name] = hg.nodes[ntype].data[name]
dgl.data.utils.save_tensors("node_feat.dgl", node_feats)
# Store the metadata of edges.
src_id, dst_id = g.edges()
edge_data = th.stack([src_id, dst_id,
g.edata['orig_id'],
g.edata[dgl.ETYPE]], 1)
np.savetxt('mag_edges.txt', edge_data.numpy(), fmt='%d', delimiter=' ')
# Store the edge features
edge_feats = {}
for etype in hg.etypes:
for name in hg.edges[etype].data:
edge_feats[etype + '/' + name] = hg.edges[etype].data[name]
dgl.data.utils.save_tensors("edge_feat.dgl", edge_feats)
# Store the basic metadata of the graph.
graph_stats = [g.number_of_nodes(), g.number_of_edges(), num_node_weights]
with open('mag_stats.txt', 'w') as filehandle:
    filehandle.write('{} {} {}'.format(graph_stats[0], graph_stats[1], graph_stats[2]))
# Store the ID ranges of nodes and edges of the entire graph.
nid_ranges = {}
eid_ranges = {}
for ntype in hg.ntypes:
ntype_id = hg.get_ntype_id(ntype)
nid = th.nonzero(g.ndata[dgl.NTYPE] == ntype_id, as_tuple=True)[0]
per_type_nid = g.ndata['orig_id'][nid]
assert np.all((per_type_nid == th.arange(len(per_type_nid))).numpy())
assert np.all((nid == th.arange(nid[0], nid[-1] + 1)).numpy())
nid_ranges[ntype] = [int(nid[0]), int(nid[-1] + 1)]
for etype in hg.etypes:
etype_id = hg.get_etype_id(etype)
eid = th.nonzero(g.edata[dgl.ETYPE] == etype_id, as_tuple=True)[0]
assert np.all((eid == th.arange(eid[0], eid[-1] + 1)).numpy())
eid_ranges[etype] = [int(eid[0]), int(eid[-1] + 1)]
with open('mag.json', 'w') as outfile:
json.dump({'nid': nid_ranges, 'eid': eid_ranges}, outfile, indent=4)
profiler.stop()
print(profiler.output_text(unicode=True, color=True))
import os
import json
import time
import argparse
import numpy as np
import dgl
import torch as th
import pyarrow
from pyarrow import csv
from pyinstrument import Profiler
parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
help='The directory path that contains the partition results.')
parser.add_argument('--graph-name', required=True, type=str,
help='The graph name')
parser.add_argument('--schema', required=True, type=str,
help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
help='The number of partitions')
parser.add_argument('--num-node-weights', required=True, type=int,
help='The number of node weights used by METIS.')
parser.add_argument('--workspace', type=str, default='/tmp',
help='The directory to store the intermediate results')
parser.add_argument('--node-attr-dtype', type=str, default=None,
help='The data type of the node attributes')
parser.add_argument('--edge-attr-dtype', type=str, default=None,
help='The data type of the edge attributes')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
args = parser.parse_args()
input_dir = args.input_dir
graph_name = args.graph_name
num_parts = args.num_parts
num_node_weights = args.num_node_weights
node_attr_dtype = args.node_attr_dtype
edge_attr_dtype = args.edge_attr_dtype
workspace_dir = args.workspace
output_dir = args.output
with open(args.schema) as json_file:
schema = json.load(json_file)
nid_ranges = schema['nid']
eid_ranges = schema['eid']
nid_ranges = {key: np.array(nid_ranges[key]).reshape(1, 2) for key in nid_ranges}
eid_ranges = {key: np.array(eid_ranges[key]).reshape(1, 2) for key in eid_ranges}
id_map = dgl.distributed.id_map.IdMap(nid_ranges)
ntypes = [(key, nid_ranges[key][0,0]) for key in nid_ranges]
ntypes.sort(key=lambda e: e[1])
ntype_offset_np = np.array([e[1] for e in ntypes])
ntypes = [e[0] for e in ntypes]
ntypes_map = {e:i for i, e in enumerate(ntypes)}
etypes = [(key, eid_ranges[key][0,0]) for key in eid_ranges]
etypes.sort(key=lambda e: e[1])
etype_offset_np = np.array([e[1] for e in etypes])
etypes = [e[0] for e in etypes]
etypes_map = {e:i for i, e in enumerate(etypes)}
profiler = Profiler()
profiler.start()
def read_feats(file_name):
attrs = csv.read_csv(file_name, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
num_cols = len(attrs.columns)
return np.stack([attrs.columns[i].to_numpy() for i in range(num_cols)], 1)
num_edges = 0
num_nodes = 0
node_map_val = {ntype:[] for ntype in ntypes}
edge_map_val = {etype:[] for etype in etypes}
for part_id in range(num_parts):
    # Create the per-partition output directory up front because node/edge
    # features may be saved below before the graph structure is written.
    part_dir = output_dir + '/part' + str(part_id)
    os.makedirs(part_dir, exist_ok=True)
    node_file = 'p{:03}-{}_nodes.txt'.format(part_id, graph_name)
# The format of each line in the node file:
# <node_id> <node_type> <weight1> ... <orig_type_node_id> <attributes>
# The node file contains nodes that belong to a partition. It doesn't include HALO nodes.
orig_type_nid_col = 3 + num_node_weights
first_attr_col = 4 + num_node_weights
    # Extract the node ID, node type and per-type node ID columns.
tmp_output = workspace_dir + '/' + node_file + '.tmp'
os.system('awk \'{print $1, $2, $' + str(orig_type_nid_col) + '}\''
+ ' {} > {}'.format(input_dir + '/' + node_file, tmp_output))
nodes = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
nids, ntype_ids, orig_type_nid = nodes.columns[0].to_numpy(), nodes.columns[1].to_numpy(), \
nodes.columns[2].to_numpy()
orig_homo_nid = ntype_offset_np[ntype_ids] + orig_type_nid
assert np.all(nids[1:] - nids[:-1] == 1)
nid_range = (nids[0], nids[-1])
num_nodes += len(nodes)
if node_attr_dtype is not None:
# Get node attributes
        # Here we assume all nodes have the same attributes.
        # In practice, this may not hold, and we would need a more complex
        # solution to encode and decode node attributes.
os.system('cut -d\' \' -f {}- {} > {}'.format(first_attr_col,
input_dir + '/' + node_file,
tmp_output))
node_attrs = read_feats(tmp_output)
node_feats = {}
    # Nodes in a partition have been sorted by node type.
for ntype_name in nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = nids[ntype_ids == ntype_id]
assert np.all(type_nids == np.arange(type_nids[0], type_nids[-1] + 1))
node_feats[ntype_name + '/feat'] = th.as_tensor(node_attrs[ntype_ids == ntype_id])
dgl.data.utils.save_tensors(os.path.join(part_dir, "node_feat.dgl"), node_feats)
# Determine the node ID ranges of different node types.
for ntype_name in nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = nids[ntype_ids == ntype_id]
node_map_val[ntype_name].append([int(type_nids[0]), int(type_nids[-1]) + 1])
edge_file = 'p{:03}-{}_edges.txt'.format(part_id, graph_name)
# The format of each line in the edge file:
# <src_id> <dst_id> <orig_src_id> <orig_dst_id> <orig_edge_id> <edge_type> <attributes>
tmp_output = workspace_dir + '/' + edge_file + '.tmp'
os.system('awk \'{print $1, $2, $3, $4, $5, $6}\'' + ' {} > {}'.format(input_dir + '/' + edge_file,
tmp_output))
edges = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
num_cols = len(edges.columns)
src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = [edges.columns[i].to_numpy() for i in range(num_cols)]
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
sort_idx = np.argsort(etype_ids)
src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = src_id[sort_idx], dst_id[sort_idx], \
orig_src_id[sort_idx], orig_dst_id[sort_idx], orig_edge_id[sort_idx], etype_ids[sort_idx]
assert np.all(np.diff(etype_ids) >= 0)
if edge_attr_dtype is not None:
# Get edge attributes
        # Here we assume all edges have the same attributes.
        # In practice, this may not hold, and we would need a more complex
        # solution to encode and decode edge attributes.
os.system('cut -d\' \' -f 7- {} > {}'.format(input_dir + '/' + edge_file, tmp_output))
edge_attrs = th.as_tensor(read_feats(tmp_output))[sort_idx]
edge_feats = {}
for etype_name in eid_ranges:
etype_id = etypes_map[etype_name]
edge_feats[etype_name + '/feat'] = th.as_tensor(edge_attrs[etype_ids == etype_id])
dgl.data.utils.save_tensors(os.path.join(part_dir, "edge_feat.dgl"), edge_feats)
# Determine the edge ID range of different edge types.
edge_id_start = num_edges
for etype_name in eid_ranges:
etype_id = etypes_map[etype_name]
edge_map_val[etype_name].append([int(edge_id_start),
int(edge_id_start + np.sum(etype_ids == etype_id))])
edge_id_start += np.sum(etype_ids == etype_id)
    # Here we compute the unique node IDs in the edge list.
    # It is possible that a node belongs to the partition but doesn't appear
    # in the edge list; that is, the node is assigned to this partition, but
    # all of its incident edges are assigned to other partitions.
    # This happens in a directed graph.
    # To avoid such nodes being removed from the graph, we append the node IDs
    # that belong to this partition.
ids = np.concatenate([src_id, dst_id, np.arange(nid_range[0], nid_range[1] + 1)])
uniq_ids, idx, inverse_idx = np.unique(ids, return_index=True, return_inverse=True)
assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range.
local_src_id, local_dst_id = np.split(inverse_idx[:len(src_id) * 2], 2)
compact_g = dgl.graph((local_src_id, local_dst_id))
compact_g.edata['orig_id'] = th.as_tensor(orig_edge_id)
compact_g.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
compact_g.edata['inner_edge'] = th.ones(compact_g.number_of_edges(), dtype=th.bool)
compact_g.edata[dgl.EID] = th.arange(num_edges, num_edges + compact_g.number_of_edges())
num_edges += compact_g.number_of_edges()
# The original IDs are homogeneous IDs.
# Similarly, we need to add the original homogeneous node IDs
orig_ids = np.concatenate([orig_src_id, orig_dst_id, orig_homo_nid])
orig_homo_ids = orig_ids[idx]
ntype, per_type_ids = id_map(orig_homo_ids)
compact_g.ndata['orig_id'] = th.as_tensor(per_type_ids)
compact_g.ndata[dgl.NTYPE] = th.as_tensor(ntype)
compact_g.ndata[dgl.NID] = th.as_tensor(uniq_ids)
compact_g.ndata['inner_node'] = th.as_tensor(np.logical_and(uniq_ids >= nid_range[0], uniq_ids <= nid_range[1]))
local_nids = compact_g.ndata[dgl.NID][compact_g.ndata['inner_node'].bool()]
assert np.all((local_nids == th.arange(local_nids[0], local_nids[-1] + 1)).numpy())
print('|V|={}'.format(compact_g.number_of_nodes()))
print('|E|={}'.format(compact_g.number_of_edges()))
# We need to reshuffle nodes in a partition so that all local nodes are labelled starting from 0.
reshuffle_nodes = th.arange(compact_g.number_of_nodes())
reshuffle_nodes = th.cat([reshuffle_nodes[compact_g.ndata['inner_node'].bool()],
reshuffle_nodes[compact_g.ndata['inner_node'] == 0]])
compact_g1 = dgl.node_subgraph(compact_g, reshuffle_nodes)
compact_g1.ndata['orig_id'] = compact_g.ndata['orig_id'][reshuffle_nodes]
compact_g1.ndata[dgl.NTYPE] = compact_g.ndata[dgl.NTYPE][reshuffle_nodes]
compact_g1.ndata[dgl.NID] = compact_g.ndata[dgl.NID][reshuffle_nodes]
compact_g1.ndata['inner_node'] = compact_g.ndata['inner_node'][reshuffle_nodes]
compact_g1.edata['orig_id'] = compact_g.edata['orig_id'][compact_g1.edata[dgl.EID]]
compact_g1.edata[dgl.ETYPE] = compact_g.edata[dgl.ETYPE][compact_g1.edata[dgl.EID]]
compact_g1.edata['inner_edge'] = compact_g.edata['inner_edge'][compact_g1.edata[dgl.EID]]
compact_g1.edata[dgl.EID] = compact_g.edata[dgl.EID][compact_g1.edata[dgl.EID]]
    dgl.save_graphs(part_dir + '/graph.dgl', [compact_g1])
part_metadata = {'graph_name': graph_name,
'num_nodes': num_nodes,
'num_edges': num_edges,
'part_method': 'metis',
'num_parts': num_parts,
'halo_hops': 1,
'node_map': node_map_val,
'edge_map': edge_map_val,
'ntypes': ntypes_map,
'etypes': etypes_map}
for part_id in range(num_parts):
part_dir = output_dir + '/part' + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
'edge_feats': edge_feat_file,
'part_graph': part_graph_file}
with open('{}/{}.json'.format(output_dir, graph_name), 'w') as outfile:
json.dump(part_metadata, outfile, sort_keys=True, indent=4)
profiler.stop()
print(profiler.output_text(unicode=True, color=True))