Unverified Commit 3bcb268a authored by Minjie Wang, committed by GitHub

[Doc] Update distributed chapter according to new pipeline (#4275)

* dist index chapter

* preproc chapter

* rst

* tools page

* partition chapter

* rst

* hetero chapter

* 7.1 step1

* add parmetis back

* changed based on feedback

* address comments
parent 39987bc5
@@ -3,7 +3,20 @@
dgl.distributed
=================================
.. automodule:: dgl.distributed
.. currentmodule:: dgl.distributed
The DGL distributed module contains classes and functions to support
distributed Graph Neural Network training and inference on a cluster of
machines.
This includes a few submodules:
* distributed data structures including distributed graph, distributed tensor
and distributed embeddings.
* distributed sampling.
* distributed workload split at runtime.
* graph partition.
Initialization
---------------
@@ -27,26 +40,22 @@ Distributed Tensor
Distributed Node Embedding
--------------------------
.. currentmodule:: dgl.distributed
.. autoclass:: DistEmbedding
Distributed embedding optimizer
-------------------------------
.. currentmodule:: dgl.distributed.optim.pytorch
.. autoclass:: SparseAdagrad
.. autoclass:: dgl.distributed.optim.SparseAdagrad
:members: step
.. autoclass:: SparseAdam
.. autoclass:: dgl.distributed.optim.SparseAdam
:members: step
Distributed workload split
--------------------------
.. currentmodule:: dgl.distributed.dist_graph
.. autosummary::
:toctree: ../../generated/
@@ -59,19 +68,17 @@ Distributed Sampling
Distributed DataLoader
``````````````````````
.. currentmodule:: dgl.distributed.dist_dataloader
.. autoclass:: DistDataLoader
Distributed Neighbor Sampling
`````````````````````````````
.. currentmodule:: dgl.distributed.graph_services
.. _api-distributed-sampling-ops:
Distributed Graph Sampling Operators
```````````````````````````````````````
.. autosummary::
:toctree: ../../generated/
sample_neighbors
sample_etype_neighbors
find_edges
in_subgraph
@@ -81,18 +88,14 @@ Partition
Graph partition book
````````````````````
.. currentmodule:: dgl.distributed.graph_partition_book
.. autoclass:: GraphPartitionBook
:members: shared_memory, num_partitions, metadata, nid2partid, eid2partid, partid2nids, partid2eids, nid2localnid, eid2localeid, partid, map_to_per_ntype, map_to_per_etype, map_to_homo_nid, map_to_homo_eid, canonical_etypes
.. autoclass:: PartitionPolicy
:members: policy_str, part_id, partition_book, to_local, to_partid, get_part_size, get_size
Split and Load Graphs
`````````````````````
.. currentmodule:: dgl.distributed.partition
Split and Load Partitions
````````````````````````````
.. autosummary::
:toctree: ../../generated/
@@ -101,4 +104,3 @@ Split and Load Graphs
load_partition_feats
load_partition_book
partition_graph
.. _guide-distributed-apis:
7.2 Distributed APIs
--------------------
7.3 Programming APIs
-----------------------------------
:ref:`(中文版) <guide_cn-distributed-apis>`
This section covers the distributed APIs used in the training script. DGL provides three distributed
data structures and various APIs for initialization, distributed sampling and workload split.
For distributed training/inference, DGL provides three distributed data structures:
:class:`~dgl.distributed.DistGraph` for distributed graphs, :class:`~dgl.distributed.DistTensor` for
distributed tensors and :class:`~dgl.distributed.DistEmbedding` for distributed learnable embeddings.
This section covers the core Python components commonly used in a training script. DGL
provides three distributed data structures and various APIs for initialization,
distributed sampling and workload split.
* :class:`~dgl.distributed.DistGraph` for accessing the structure and features of a
graph that is stored distributedly across machines.
* :class:`~dgl.distributed.DistTensor` for accessing a node/edge feature tensor that
is partitioned across machines.
* :class:`~dgl.distributed.DistEmbedding` for accessing a learnable node/edge embedding
tensor that is partitioned across machines.
Initialization of the DGL distributed module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:func:`~dgl.distributed.initialize` initializes the distributed module. When the training script runs
in the trainer mode, this API builds connections with DGL servers and creates sampler processes;
when the script runs in the server mode, this API runs the server code and never returns. This API
has to be called before any of DGL's distributed APIs. When working with Pytorch,
:func:`~dgl.distributed.initialize` has to be invoked before ``torch.distributed.init_process_group``.
Typically, the initialization APIs should be invoked in the following order:
:func:`dgl.distributed.initialize` initializes the distributed module. If invoked
by a trainer, this API creates sampler processes and builds connections with graph
servers; if invoked by a graph server, this API starts a service loop to listen for
trainer/sampler requests. The API *must* be called before
:func:`torch.distributed.init_process_group` and any other ``dgl.distributed`` APIs,
as shown in the order below:
.. code:: python
dgl.distributed.initialize('ip_config.txt')
th.distributed.init_process_group(backend='gloo')
**Note**: If the training script contains user-defined functions (UDFs) that have to be invoked on
the servers (see the section of DistTensor and DistEmbedding for more details), these UDFs have to
be declared before :func:`~dgl.distributed.initialize`.
.. note::
If the training script contains user-defined functions (UDFs) that have to be invoked on
the servers (see the section of DistTensor and DistEmbedding for more details), these UDFs have to
be declared before :func:`~dgl.distributed.initialize`.
Distributed graph
~~~~~~~~~~~~~~~~~
:class:`~dgl.distributed.DistGraph` is a Python class to access the graph structure and node/edge features
in a cluster of machines. Each machine is responsible for one and only one partition. It loads
the partition data (the graph structure and the node data and edge data in the partition) and makes
it accessible to all trainers in the cluster. :class:`~dgl.distributed.DistGraph` provides a small subset
of :class:`~dgl.DGLGraph` APIs for data access.
**Note**: :class:`~dgl.distributed.DistGraph` currently only supports graphs of one node type and one edge type.
:class:`~dgl.distributed.DistGraph` is a Python class to access the graph
structure and node/edge features in a cluster of machines. Each machine is
responsible for one and only one partition. It loads the partition data (the
graph structure and the node data and edge data in the partition) and makes it
accessible to all trainers in the cluster. :class:`~dgl.distributed.DistGraph`
provides a small subset of :class:`~dgl.DGLGraph` APIs for data access.
Distributed mode vs. standalone mode
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
:class:`~dgl.distributed.DistGraph` can run in two modes: distributed mode and standalone mode.
:class:`~dgl.distributed.DistGraph` can run in two modes: *distributed mode* and *standalone mode*.
When a user executes a training script in a Python command line or Jupyter Notebook, it runs in
a standalone mode. That is, it runs all computation in a single process and does not communicate
with any other processes. Thus, the standalone mode requires the input graph to have only one partition.
@@ -58,32 +64,36 @@ of machines and access them through the network.
DistGraph creation
^^^^^^^^^^^^^^^^^^
In the distributed mode, the creation of :class:`~dgl.distributed.DistGraph` requires the graph name used
during graph partitioning. The graph name identifies the graph loaded in the cluster.
In the distributed mode, the creation of :class:`~dgl.distributed.DistGraph`
requires the graph name given during graph partitioning. The graph name
identifies the graph loaded in the cluster.
.. code:: python
import dgl
g = dgl.distributed.DistGraph('graph_name')
When running in the standalone mode, it loads the graph data in the local machine. Therefore, users need
to provide the partition configuration file, which contains all information about the input graph.
When running in the standalone mode, it loads the graph data in the local
machine. Therefore, users need to provide the partition configuration file,
which contains all information about the input graph.
.. code:: python
import dgl
g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json')
**Note**: In the current implementation, DGL only allows the creation of a single DistGraph object. The behavior
of destroying a DistGraph and creating a new one is undefined.
.. note::
DGL only allows one single ``DistGraph`` object. The behavior
of destroying a DistGraph and creating a new one is undefined.
Access graph structure
^^^^^^^^^^^^^^^^^^^^^^
Accessing graph structure
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
:class:`~dgl.distributed.DistGraph` provides a very small number of APIs to access the graph structure.
Currently, most APIs provide graph information, such as the number of nodes and edges. The main use case
of DistGraph is to run sampling APIs to support mini-batch training (see the section of distributed
graph sampling).
:class:`~dgl.distributed.DistGraph` provides a set of APIs to
access the graph structure. Currently, most APIs provide graph information,
such as the number of nodes and edges. The main use case of DistGraph is to run
sampling APIs to support mini-batch training (see `Distributed sampling`_).
.. code:: python
@@ -124,8 +134,10 @@ in the cluster even if the :class:`~dgl.distributed.DistTensor` object disappear
tensor = dgl.distributed.DistTensor((g.number_of_nodes(), 10), th.float32, name='test')
**Note**: :class:`~dgl.distributed.DistTensor` creation is a synchronized operation. All trainers
have to invoke the creation and the creation succeeds only when all trainers call it.
.. note::
:class:`~dgl.distributed.DistTensor` creation is a synchronized operation. All trainers
have to invoke the creation and the creation succeeds only when all trainers call it.
A user can add a :class:`~dgl.distributed.DistTensor` to a :class:`~dgl.distributed.DistGraph`
object as one of the node data or edge data.
@@ -134,13 +146,15 @@ object as one of the node data or edge data.
g.ndata['feat'] = tensor
**Note**: The node data name and the tensor name do not have to be the same. The former identifies
node data from :class:`~dgl.distributed.DistGraph` (in the trainer process) while the latter
identifies a distributed tensor in DGL servers.
.. note::
The node data name and the tensor name do not have to be the same. The former identifies
node data from :class:`~dgl.distributed.DistGraph` (in the trainer process) while the latter
identifies a distributed tensor in DGL servers.
:class:`~dgl.distributed.DistTensor` provides a small set of functions. It has the same APIs as
regular tensors to access its metadata, such as the shape and dtype.
:class:`~dgl.distributed.DistTensor` supports indexed reads and writes but does not support
:class:`~dgl.distributed.DistTensor` has the same APIs as
regular tensors to access its metadata, such as the shape and dtype. It also
supports indexed reads and writes but does not support
computation operators, such as sum and mean.
.. code:: python
@@ -149,12 +163,16 @@ computation operators, such as sum and mean.
print(data)
g.ndata['feat'][[3, 4, 5]] = data
**Note**: Currently, DGL does not provide protection for concurrent writes from multiple trainers
when a machine runs multiple servers. This may result in data corruption. One way to avoid concurrent
writes to the same row of data is to run one server process on a machine.
.. note::
Currently, DGL does not provide protection for concurrent writes from
multiple trainers when a machine runs multiple servers. This may result in
data corruption. One way to avoid concurrent writes to the same row of data
is to run one server process on a machine.
Distributed DistEmbedding
~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
DGL provides :class:`~dgl.distributed.DistEmbedding` to support transductive models that require
node embeddings. Creating distributed embeddings is very similar to creating distributed tensors.
@@ -167,20 +185,25 @@ node embeddings. Creating distributed embeddings is very similar to creating dis
return arr
emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer)
Internally, distributed embeddings are built on top of distributed tensors, and, thus, has
very similar behaviors to distributed tensors. For example, when embeddings are created, they
are sharded and stored across all machines in the cluster. It can be uniquely identified by a name.
Internally, distributed embeddings are built on top of distributed tensors,
and thus have very similar behaviors to distributed tensors. For example, when
embeddings are created, they are sharded and stored across all machines in the
cluster and can be uniquely identified by a name.
.. note::
**Note**: The initializer function is invoked in the server process. Therefore, it has to be
declared before :class:`~dgl.distributed.initialize`.
The initializer function is invoked in the server process. Therefore, it has to be
declared before :class:`dgl.distributed.initialize`.
Because the embeddings are part of the model, a user has to attach them to an optimizer for
mini-batch training. Currently, DGL provides a sparse Adagrad optimizer
:class:`~dgl.distributed.SparseAdagrad` (DGL will add more optimizers for sparse embeddings later).
Users need to collect all distributed embeddings from a model and pass them to the sparse optimizer.
If a model has both node embeddings and regular dense model parameters and users want to perform
sparse updates on the embeddings, they need to create two optimizers, one for node embeddings and
the other for dense model parameters, as shown in the code below:
Because the embeddings are part of the model, a user has to attach them to an
optimizer for mini-batch training. Currently, DGL provides a sparse Adagrad
optimizer :class:`~dgl.distributed.SparseAdagrad` (DGL will add more optimizers
for sparse embeddings later). Users need to collect all distributed embeddings
from a model and pass them to the sparse optimizer. If a model has both node
embeddings and regular dense model parameters and users want to perform sparse
updates on the embeddings, they need to create two optimizers, one for node
embeddings and the other for dense model parameters, as shown in the code
below:
.. code:: python
@@ -192,31 +215,41 @@ the other for dense model parameters, as shown in the code below:
optimizer.step()
sparse_optimizer.step()
**Note**: :class:`~dgl.distributed.DistEmbedding` is not an Pytorch nn module, so we cannot
get access to it from parameters of a Pytorch nn module.
.. note::
:class:`~dgl.distributed.DistEmbedding` does not inherit :class:`torch.nn.Module`,
so we recommend using it outside of your own NN module.
Distributed sampling
~~~~~~~~~~~~~~~~~~~~
DGL provides two levels of APIs for sampling nodes and edges to generate mini-batches
(see the section of mini-batch training). The low-level APIs require users to write code
to explicitly define how a layer of nodes are sampled (e.g., using :func:`dgl.sampling.sample_neighbors` ).
The high-level sampling APIs implement a few popular sampling algorithms for node classification
and link prediction tasks (e.g., :class:`~dgl.dataloading.NodeDataLoader` and
DGL provides two levels of APIs for sampling nodes and edges to generate
mini-batches (see the section of mini-batch training). The low-level APIs
require users to write code to explicitly define how a layer of nodes is
sampled (e.g., using :func:`dgl.sampling.sample_neighbors` ). The high-level
sampling APIs implement a few popular sampling algorithms for node
classification and link prediction tasks (e.g.,
:class:`~dgl.dataloading.NodeDataLoader` and
:class:`~dgl.dataloading.EdgeDataLoader` ).
The distributed sampling module follows the same design and provides two levels of sampling APIs.
For the lower-level sampling API, it provides :func:`~dgl.distributed.sample_neighbors` for
distributed neighborhood sampling on :class:`~dgl.distributed.DistGraph`. In addition, DGL provides
a distributed DataLoader (:class:`~dgl.distributed.DistDataLoader` ) for distributed sampling.
The distributed DataLoader has the same interface as Pytorch DataLoader except that users cannot
specify the number of worker processes when creating a dataloader. The worker processes are created
in :func:`dgl.distributed.initialize`.
**Note**: When running :func:`dgl.distributed.sample_neighbors` on :class:`~dgl.distributed.DistGraph`,
the sampler cannot run in Pytorch DataLoader with multiple worker processes. The main reason is that
Pytorch DataLoader creates new sampling worker processes in every epoch, which leads to creating and
destroying :class:`~dgl.distributed.DistGraph` objects many times.
The distributed sampling module follows the same design and provides two levels
of sampling APIs. For the lower-level sampling API, it provides
:func:`~dgl.distributed.sample_neighbors` for distributed neighborhood sampling
on :class:`~dgl.distributed.DistGraph`. In addition, DGL provides a distributed
DataLoader (:class:`~dgl.distributed.DistDataLoader` ) for distributed
sampling. The distributed DataLoader has the same interface as Pytorch
DataLoader except that users cannot specify the number of worker processes when
creating a dataloader. The worker processes are created in
:func:`dgl.distributed.initialize`.
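For illustration, a minimal sketch of constructing a distributed dataloader with a
custom collate function; ``train_nids`` and the one-layer ``sample_blocks`` helper are
assumptions for this example:

.. code:: python

import torch as th
import dgl

def sample_blocks(seeds):
    # Hypothetical collate function performing one layer of neighbor sampling.
    seeds = th.LongTensor(seeds)
    frontier = dgl.distributed.sample_neighbors(g, seeds, 10)
    return dgl.to_block(frontier, seeds)

dataloader = dgl.distributed.DistDataLoader(dataset=train_nids.tolist(),
                                            batch_size=1024,
                                            collate_fn=sample_blocks,
                                            shuffle=True)
for block in dataloader:
    pass  # forward/backward computation on the sampled block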
.. note::
When running :func:`dgl.distributed.sample_neighbors` on
:class:`~dgl.distributed.DistGraph`, the sampler cannot run in Pytorch
DataLoader with multiple worker processes. The main reason is that Pytorch
DataLoader creates new sampling worker processes in every epoch, which
leads to creating and destroying :class:`~dgl.distributed.DistGraph`
objects many times.
When using the low-level API, the sampling code is similar to single-process sampling. The only
difference is that users need to use :func:`dgl.distributed.sample_neighbors` and
@@ -243,8 +276,8 @@ difference is that users need to use :func:`dgl.distributed.sample_neighbors` an
The high-level sampling APIs (:class:`~dgl.dataloading.NodeDataLoader` and
:class:`~dgl.dataloading.EdgeDataLoader` ) have distributed counterparts
(:class:`~dgl.dataloading.DistNodeDataLoader` and
:class:`~dgl.dataloading.DistEdgeDataLoader`). The code is exactly the
same as single-process sampling otherwise.
:class:`~dgl.dataloading.DistEdgeDataLoader`). The code is exactly the same as
single-process sampling otherwise.
.. code:: python
@@ -256,30 +289,33 @@ same as single-process sampling.
Split workloads
~~~~~~~~~~~~~~~
To train a model, users first need to split the dataset into training, validation and test sets.
For distributed training, this step is usually done before we invoke :func:`dgl.distributed.partition_graph`
to partition a graph. We recommend to store the data split in boolean arrays as node data or edge data.
For node classification tasks, the length of these boolean arrays is the number of nodes in a graph
and each of their elements indicates the existence of a node in a training/validation/test set.
Similar boolean arrays should be used for link prediction tasks.
:func:`dgl.distributed.partition_graph` splits these boolean arrays (because they are stored as
the node data or edge data of the graph) based on the graph partitioning
result and store them with graph partitions.
During distributed training, users need to assign training nodes/edges to each trainer. Similarly,
we also need to split the validation and test set in the same way.
DGL provides :func:`~dgl.distributed.node_split` and :func:`~dgl.distributed.edge_split` to
split the training, validation and test set at runtime for distributed training. The two functions
take the boolean arrays constructed before graph partitioning as input, split them and
return a portion for the local trainer.
By default, they ensure that all portions have the same number of nodes/edges. This is
important for synchronous SGD, which assumes each trainer has the same number of mini-batches.
The example below splits the training set and returns a subset of nodes for the local process.
~~~~~~~~~~~~~~~~~~
To train a model, users first need to split the dataset into training,
validation and test sets. For distributed training, this step is usually done
before we invoke :func:`dgl.distributed.partition_graph` to partition a graph.
We recommend storing the data split in boolean arrays as node data or edge
data. For node classification tasks, the length of these boolean arrays is the
number of nodes in the graph and each element indicates whether the node
belongs to the training/validation/test set. Similar boolean arrays should be
used for link prediction tasks. :func:`dgl.distributed.partition_graph` splits
these boolean arrays (because they are stored as the node data or edge data of
the graph) based on the graph partitioning result and stores them with the
graph partitions.
During distributed training, users need to assign training nodes/edges to each
trainer. Similarly, we also need to split the validation and test set in the
same way. DGL provides :func:`~dgl.distributed.node_split` and
:func:`~dgl.distributed.edge_split` to split the training, validation and test
set at runtime for distributed training. The two functions take the boolean
arrays constructed before graph partitioning as input, split them and return a
portion for the local trainer. By default, they ensure that all portions have
the same number of nodes/edges. This is important for synchronous SGD, which
assumes each trainer has the same number of mini-batches.
The example below splits the training set and returns a subset of nodes for the
local process.
.. code:: python
train_nids = dgl.distributed.node_split(g.ndata['train_mask'])
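Similarly, a hedged one-liner for link prediction workloads, assuming a boolean
``train_mask`` has been stored as edge data before partitioning:

.. code:: python

train_eids = dgl.distributed.edge_split(g.edata['train_mask'])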
.. _guide-distributed-hetero:
7.3 Distributed Heterogeneous graph training
7.5 Heterogeneous Graph Under The Hood
--------------------------------------------
DGL v0.6.0 provides an experimental support for distributed training on heterogeneous graphs.
In DGL, a node or edge in a heterogeneous graph has a unique ID in its own node type or edge type.
DGL identifies a node or edge with a tuple: node/edge type and type-wise ID. In distributed training,
a node or edge can be identified by a homogeneous ID, in addition to the tuple of node/edge type
and type-wise ID. The homogeneous ID is unique regardless of the node type and edge type.
DGL arranges nodes and edges so that all nodes of the same type have contiguous
homogeneous IDs.
Below is an example adjacency matrix of a heterogeneous graph showing the homogeneous ID assignment.
Here, the graph has two types of nodes (`T0` and `T1` ), and four types of edges (`R0`, `R1`, `R2`, `R3` ).
There are a total of 400 nodes in the graph and each type has 200 nodes. Nodes
of `T0` have IDs in [0,200), while nodes of `T1` have IDs in [200, 400).
In this example, if we use a tuple to identify the nodes, nodes of `T0` are identified as
(T0, type-wise ID), where type-wise ID falls in [0, 200); nodes of `T1` are identified as
(T1, type-wise ID), where type-wise ID also falls in [0, 200).
This chapter covers the implementation details of distributed heterogeneous
graphs. They are transparent to users in most scenarios but could be useful
for advanced customization.

In DGL, a node or edge in a heterogeneous graph has a unique ID in its own node
type or edge type. Therefore, DGL can identify a node or an edge
with a tuple: ``(node/edge type, type-wise ID)``. We call IDs of this form
**heterogeneous IDs**. To partition a heterogeneous graph for distributed training,
DGL converts it to a homogeneous graph so that we can reuse the partitioning
algorithms designed for homogeneous graphs. Each node/edge is thus uniquely mapped
to an integer ID in a consecutive ID range (e.g., from 0 to the total number of
nodes of all types). We call the IDs after conversion **homogeneous IDs**.
Below is an illustration of the ID conversion process. Here, the graph has two
types of nodes (:math:`T0` and :math:`T1` ), and four types of edges
(:math:`R0`, :math:`R1`, :math:`R2`, :math:`R3` ). There are a total of 400
nodes in the graph and each type has 200 nodes. Nodes of :math:`T0` have IDs in
[0,200), while nodes of :math:`T1` have IDs in [200, 400). In this example, if
we use a tuple to identify the nodes, nodes of :math:`T0` are identified as
(T0, type-wise ID), where type-wise ID falls in [0, 200); nodes of :math:`T1`
are identified as (T1, type-wise ID), where type-wise ID also falls in [0,
200).
.. figure:: https://data.dgl.ai/tutorial/hetero/heterograph_ids.png
:alt: Imgur
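With the offsets in this example, converting between the two ID spaces is simple
offset arithmetic (a toy illustration, not a DGL API):

.. code:: python

ntype_offset = {'T0': 0, 'T1': 200}    # first homogeneous ID of each node type

# heterogeneous -> homogeneous: (T1, 10) maps to 210
homo_id = ntype_offset['T1'] + 10

# homogeneous -> heterogeneous: 210 maps back to (T1, 10)
ntype = 'T1' if homo_id >= ntype_offset['T1'] else 'T0'
type_wise_id = homo_id - ntype_offset[ntype]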
7.3.1 Access distributed graph data
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
For distributed training, :class:`~dgl.distributed.DistGraph` supports the heterogeneous graph API
in :class:`~dgl.DGLGraph`. Below shows an example of getting node data of `T0` on some nodes
by using type-wise node IDs. When accessing data in :class:`~dgl.distributed.DistGraph`, a user
needs to use type-wise IDs and corresponding node types or edge types.
.. code:: python
ID Conversion Utilities
^^^^^^^^^^^^^^^^^^^^^^^^
import dgl
g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json')
feat = g.nodes['T0'].data['feat'][type_wise_ids]
During Preprocessing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
A user can create distributed tensors and distributed embeddings for a particular node type or
edge type. Distributed tensors and embeddings are split and stored in multiple machines. To create
one, a user needs to specify how it is partitioned with :class:`~dgl.distributed.PartitionPolicy`.
By default, DGL chooses the right partition policy based on the size of the first dimension.
However, if multiple node types or edge types have the same number of nodes or edges, DGL cannot
determine the partition policy automatically. A user needs to explicitly specify the partition policy.
Below shows an example of creating a distributed tensor for node type `T0` by using the partition policy
for `T0` and store it as node data of `T0`.
The steps of the :ref:`Parallel Processing Pipeline <guide-distributed-preprocessing>`
all use heterogeneous IDs for their inputs and outputs. Nevertheless, some steps such as
ParMETIS partitioning are easier to implement using homogeneous IDs, thus
requiring a utility to perform ID conversion.

The code below implements a simple ``IDConverter`` using the metadata information
in the metadata JSON from the chunked graph data format. It starts from some
node type :math:`A` as node type 0 and assigns all its nodes IDs
in the range :math:`[0, |V_A|)`. It then moves to the next node
type :math:`B` as node type 1 and assigns all its nodes IDs in the range
:math:`[|V_A|, |V_A|+|V_B|)`.
.. code:: python
g.nodes['T0'].data['feat1'] = dgl.distributed.DistTensor((g.number_of_nodes('T0'), 1), th.float32, 'feat1',
part_policy=g.get_node_partition_policy('T0'))
The partition policies used for creating distributed tensors and embeddings are initialized when a heterogeneous
graph is loaded into the graph server. A user cannot create a new partition policy at runtime. Therefore, a user
can only create distributed tensors or embeddings for a node type or edge type.
Accessing distributed tensors and embeddings also requires type-wise IDs.
7.3.2 Distributed sampling
^^^^^^^^^^^^^^^^^^^^^^^^^^
DGL v0.6 uses homogeneous IDs in distributed sampling. **Note**: this may change in the future release.
DGL provides four APIs to convert node IDs and edge IDs between the homogeneous IDs and type-wise IDs:
from bisect import bisect_right

import numpy as np

class IDConverter:
    def __init__(self, meta):
        # meta is the JSON object loaded from metadata.json
        self.node_type = meta['node_type']
        self.edge_type = meta['edge_type']
        self.ntype2id_map = {ntype : i for i, ntype in enumerate(self.node_type)}
        self.etype2id_map = {etype : i for i, etype in enumerate(self.edge_type)}
        self.num_nodes = [sum(ns) for ns in meta['num_nodes_per_chunk']]
        self.num_edges = [sum(ns) for ns in meta['num_edges_per_chunk']]
        self.nid_offset = np.cumsum([0] + self.num_nodes)
        self.eid_offset = np.cumsum([0] + self.num_edges)

    def ntype2id(self, ntype):
        """From node type name to node type ID"""
        return self.ntype2id_map[ntype]

    def etype2id(self, etype):
        """From edge type name to edge type ID"""
        return self.etype2id_map[etype]

    def id2ntype(self, id):
        """From node type ID to node type name"""
        return self.node_type[id]

    def id2etype(self, id):
        """From edge type ID to edge type name"""
        return self.edge_type[id]

    def nid_het2hom(self, ntype, id):
        """From heterogeneous node ID to homogeneous node ID"""
        tid = self.ntype2id(ntype)
        if id < 0 or id >= self.num_nodes[tid]:
            raise ValueError(f'Invalid node ID of type {ntype}. Must be within range [0, {self.num_nodes[tid]})')
        return self.nid_offset[tid] + id

    def nid_hom2het(self, id):
        """From homogeneous node ID to heterogeneous node ID"""
        if id < 0 or id >= self.nid_offset[-1]:
            raise ValueError(f'Invalid homogeneous node ID. Must be within range [0, {self.nid_offset[-1]})')
        # bisect_right finds the node type whose ID range contains the given ID
        tid = bisect_right(self.nid_offset, id) - 1
        # Return a pair (node_type, type_wise_id)
        return self.id2ntype(tid), id - self.nid_offset[tid]

    def eid_het2hom(self, etype, id):
        """From heterogeneous edge ID to homogeneous edge ID"""
        tid = self.etype2id(etype)
        if id < 0 or id >= self.num_edges[tid]:
            raise ValueError(f'Invalid edge ID of type {etype}. Must be within range [0, {self.num_edges[tid]})')
        return self.eid_offset[tid] + id

    def eid_hom2het(self, id):
        """From homogeneous edge ID to heterogeneous edge ID"""
        if id < 0 or id >= self.eid_offset[-1]:
            raise ValueError(f'Invalid homogeneous edge ID. Must be within range [0, {self.eid_offset[-1]})')
        # bisect_right finds the edge type whose ID range contains the given ID
        tid = bisect_right(self.eid_offset, id) - 1
        # Return a pair (edge_type, type_wise_id)
        return self.id2etype(tid), id - self.eid_offset[tid]
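A hedged usage example, assuming a ``metadata.json`` file from the chunked graph
format and a hypothetical node type named ``'paper'``:

.. code:: python

import json

with open('metadata.json') as f:
    meta = json.load(f)

conv = IDConverter(meta)
homo_nid = conv.nid_het2hom('paper', 42)           # heterogeneous -> homogeneous
ntype, type_wise_id = conv.nid_hom2het(homo_nid)   # homogeneous -> heterogeneous
assert (ntype, type_wise_id) == ('paper', 42)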
After Partition Loading
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
After the partitions are loaded into trainer or server processes, the loaded
:class:`~dgl.distributed.GraphPartitionBook` provides utilities for conversion
between homogeneous IDs and heterogeneous IDs.
* :func:`~dgl.distributed.GraphPartitionBook.map_to_per_ntype`: convert a homogeneous node ID to type-wise ID and node type ID.
* :func:`~dgl.distributed.GraphPartitionBook.map_to_per_etype`: convert a homogeneous edge ID to type-wise ID and edge type ID.
* :func:`~dgl.distributed.GraphPartitionBook.map_to_homo_nid`: convert type-wise ID and node type to a homogeneous node ID.
* :func:`~dgl.distributed.GraphPartitionBook.map_to_homo_eid`: convert type-wise ID and edge type to a homogeneous edge ID.
Below shows an example of sampling a subgraph with :func:`~dgl.distributed.sample_neighbors` from a heterogeneous graph
with a node type called `paper`. It first converts type-wise node IDs to homogeneous node IDs. After sampling a subgraph
from the seed nodes, it converts homogeneous node IDs and edge IDs to type-wise IDs and also stores type IDs as node data
and edge data.
Because all DGL's low-level :ref:`distributed graph sampling operators
<api-distributed-sampling-ops>` use homogeneous IDs, DGL internally converts
the heterogeneous IDs specified by users to homogeneous IDs before invoking
sampling operators. Below shows an example of sampling a subgraph by
:func:`~dgl.distributed.sample_neighbors` from nodes of type ``"paper"``. It
first performs ID conversion, and after getting the sampled subgraph, converts
the homogeneous node/edge IDs back to heterogeneous ones.
.. code:: python
@@ -89,5 +147,43 @@ and edge data.
block.srcdata[dgl.NTYPE], block.srcdata[dgl.NID] = gpb.map_to_per_ntype(block.srcdata[dgl.NID])
block.dstdata[dgl.NTYPE], block.dstdata[dgl.NID] = gpb.map_to_per_ntype(block.dstdata[dgl.NID])
From node/edge type IDs, a user can retrieve node/edge types. For example, `g.ntypes[node_type_id]`.
With node/edge types and type-wise IDs, a user can retrieve node/edge data from `DistGraph` for mini-batch computation.
Note that getting node/edge types from type IDs is simple -- just index into the
``ntypes`` (or ``etypes``) attribute of a ``DistGraph``, e.g., ``g.ntypes[node_type_id]``.
Access distributed graph data
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The :class:`~dgl.distributed.DistGraph` class supports a similar interface to
:class:`~dgl.DGLGraph`. Below shows an example of getting the feature data of
nodes 0, 10, 20 of type :math:`T0`. When accessing data in
:class:`~dgl.distributed.DistGraph`, a user needs to use type-wise IDs and
corresponding node types or edge types.
.. code:: python
import dgl
g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json')
feat = g.nodes['T0'].data['feat'][[0, 10, 20]]
A user can create distributed tensors and distributed embeddings for a
particular node type or edge type. Distributed tensors and embeddings are split
and stored in multiple machines. To create one, a user needs to specify how it
is partitioned with :class:`~dgl.distributed.PartitionPolicy`. By default, DGL
chooses the right partition policy based on the size of the first dimension.
However, if multiple node types or edge types have the same number of nodes or
edges, DGL cannot determine the partition policy automatically. A user needs to
explicitly specify the partition policy. Below shows an example of creating a
distributed tensor for node type :math:`T0` by using the partition policy for :math:`T0`
and storing it as node data of :math:`T0`.
.. code:: python
g.nodes['T0'].data['feat1'] = dgl.distributed.DistTensor(
(g.number_of_nodes('T0'), 1), th.float32, 'feat1',
part_policy=g.get_node_partition_policy('T0'))
The partition policies used for creating distributed tensors and embeddings are
initialized when a heterogeneous graph is loaded into the graph server. A user
cannot create a new partition policy at runtime. Therefore, a user can only
create distributed tensors or embeddings for a node type or edge type.
Accessing distributed tensors and embeddings also requires type-wise IDs.
.. _guide-distributed-partition:
7.4 Advanced Graph Partitioning
---------------------------------------
This chapter covers some advanced topics for graph partitioning.
METIS partition algorithm
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
`METIS <http://glaros.dtc.umn.edu/gkhome/views/metis>`__ is a state-of-the-art
graph partitioning algorithm that can generate partitions with minimal number
of cross-partition edges, making it suitable for distributed message passing
where the amount of network communication is proportional to the number of
cross-partition edges. DGL has integrated METIS as the default partitioning
algorithm in its :func:`dgl.distributed.partition_graph` API.
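For example, the partitioning algorithm can be selected explicitly through the
``part_method`` argument (``'metis'`` is the default; ``'random'`` selects random
partitioning); the graph ``g`` and the paths below are placeholders:

.. code:: python

import dgl

dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test', part_method='metis')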
Load balancing
~~~~~~~~~~~~~~~~
When partitioning a graph, by default, METIS only balances the number of nodes
in each partition. This can result in suboptimal configuration, depending on
the task at hand. For example, in the case of semi-supervised node
classification, a trainer performs computation on a subset of labeled nodes in
a local partition. A partitioning that only balances nodes in a graph (both
labeled and unlabeled), may end up with computational load imbalance. To get a
balanced workload in each partition, the partition API allows balancing between
partitions with respect to the number of nodes in each node type, by specifying
``balance_ntypes`` in :func:`~dgl.distributed.partition_graph`. Users can take
advantage of this by treating nodes in the training set, validation set and
test set as different node types.

The following example treats nodes inside the training set and outside the
training set as two types of nodes:
.. code:: python
dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test', balance_ntypes=g.ndata['train_mask'])
In addition to balancing the node types,
:func:`dgl.distributed.partition_graph` also allows balancing between
in-degrees of nodes of different node types by specifying ``balance_edges``.
This balances the number of edges incident to the nodes of different types.
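For example, a sketch combining both balancing options:

.. code:: python

dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
                                balance_ntypes=g.ndata['train_mask'],
                                balance_edges=True)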
ID mapping
~~~~~~~~~~~~~
After partitioning, :func:`~dgl.distributed.partition_graph` remaps node
and edge IDs so that nodes of the same partition are arranged together
(in a consecutive ID range), making it easier to store partitioned node/edge
features. The API also automatically shuffles the node/edge features
according to the new IDs. However, some downstream tasks may want to
recover the original node/edge IDs (such as extracting the computed node
embeddings for later use). For such cases, pass ``return_mapping=True``
to :func:`~dgl.distributed.partition_graph`, which makes the API return
the ID mappings between the remapped node/edge IDs and their original ones.
For a homogeneous graph, it returns two vectors. The first vector maps every new
node ID to its original ID; the second vector maps every new edge ID to
its original ID. For a heterogeneous graph, it returns two dictionaries of
vectors. The first dictionary contains the mapping for each node type; the
second dictionary contains the mapping for each edge type.
.. code:: python
node_map, edge_map = dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
balance_ntypes=g.ndata['train_mask'],
return_mapping=True)
# Let's assume that node_emb is saved from the distributed training.
orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
orig_node_emb[node_map] = node_emb
Output format
~~~~~~~~~~~~~~~~~~~~~~~~~~
Regardless of the partitioning algorithm in use, the partitioned results are stored
in data files organized as follows:
.. code-block:: none
data_root_dir/
|-- graph_name.json # partition configuration file in JSON
|-- part0/ # data for partition 0
| |-- node_feats.dgl # node features stored in binary format
| |-- edge_feats.dgl # edge features stored in binary format
| |-- graph.dgl # graph structure of this partition stored in binary format
|
|-- part1/ # data for partition 1
| |-- node_feats.dgl
| |-- edge_feats.dgl
| |-- graph.dgl
|
|-- ... # data for other partitions
When distributed to a cluster, the metadata JSON should be copied to all the machines
while the ``partX`` folders should be dispatched accordingly.
DGL provides a :func:`dgl.distributed.load_partition` function to load one partition
for inspection.
.. code:: python
>>> import dgl
>>> # load partition 0
>>> part_data = dgl.distributed.load_partition('data_root_dir/graph_name.json', 0)
>>> g, nfeat, efeat, partition_book, graph_name, ntypes, etypes = part_data # unpack
>>> print(g)
Graph(num_nodes=966043, num_edges=34270118,
ndata_schemes={'orig_id': Scheme(shape=(), dtype=torch.int64),
'part_id': Scheme(shape=(), dtype=torch.int64),
'_ID': Scheme(shape=(), dtype=torch.int64),
'inner_node': Scheme(shape=(), dtype=torch.int32)}
edata_schemes={'_ID': Scheme(shape=(), dtype=torch.int64),
'inner_edge': Scheme(shape=(), dtype=torch.int8),
'orig_id': Scheme(shape=(), dtype=torch.int64)})
As mentioned in the `ID mapping`_ section, each partition carries auxiliary information
saved as ndata or edata such as original node/edge IDs, partition IDs, etc. Each partition
not only saves the nodes/edges it owns, but also includes nodes/edges that are adjacent to
the partition (called **HALO** nodes/edges). The ``inner_node`` and ``inner_edge`` fields
indicate whether a node/edge truly belongs to the partition (value is ``True``)
or is a HALO node/edge (value is ``False``).
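For example, a small snippet that keeps only the nodes truly owned by the loaded
partition (using the unpacked ``g`` from :func:`~dgl.distributed.load_partition` above):

.. code:: python

inner_mask = g.ndata['inner_node'].bool()
# Original (pre-partition) IDs of the nodes owned by this partition.
owned_orig_nids = g.ndata['orig_id'][inner_mask]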
The :func:`~dgl.distributed.load_partition` function loads all data at once. Users can
load features or the partition book using the :func:`dgl.distributed.load_partition_feats`
and :func:`dgl.distributed.load_partition_book` APIs respectively.
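A hedged sketch of loading only the features or the partition book; the exact return
values may vary slightly across DGL versions:

.. code:: python

import dgl

# Node and edge feature dictionaries of partition 0.
nfeat, efeat = dgl.distributed.load_partition_feats('data_root_dir/graph_name.json', 0)
# Partition book plus graph metadata.
gpb, graph_name, ntypes, etypes = dgl.distributed.load_partition_book('data_root_dir/graph_name.json', 0)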
Parallel METIS partitioning
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
For massive graphs where parallel preprocessing is desired, DGL supports
`ParMETIS <http://glaros.dtc.umn.edu/gkhome/metis/parmetis/overview>`__ as one
of the choices of partitioning algorithms.
.. note::
Because ParMETIS does not support heterogeneous graphs, users need to
conduct ID conversion before and after running ParMETIS.
Check out chapter :ref:`guide-distributed-hetero` for an explanation.
.. note::
Please make sure that the input graph to ParMETIS does not have
duplicate edges (or parallel edges) and self-loop edges.
ParMETIS Installation
^^^^^^^^^^^^^^^^^^^^^^
ParMETIS requires METIS and GKLib. Please follow the instructions `here
<https://github.com/KarypisLab/GKlib>`__ to compile and install GKLib. To
compile and install METIS, please follow the instructions below to clone
METIS with Git and compile it with int64 support.
.. code-block:: bash
git clone https://github.com/KarypisLab/METIS.git
cd METIS
make config shared=1 cc=gcc prefix=~/local i64=1
make install
For now, we need to compile and install ParMETIS manually. We clone the DGL branch of ParMETIS as follows:
.. code-block:: bash
git clone --branch dgl https://github.com/KarypisLab/ParMETIS.git
Then compile and install ParMETIS.
.. code-block:: bash
cd ParMETIS
make config cc=mpicc prefix=~/local
make install
Before running ParMETIS, we need to set two environment variables: ``PATH`` and ``LD_LIBRARY_PATH``.
.. code-block:: bash
export PATH=$PATH:$HOME/local/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/local/lib/
Input format
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. note::
As a prerequisite, read chapter :ref:`guide-distributed-hetero` to understand
how DGL organizes heterogeneous graphs for distributed training.
The input graph for ParMETIS is stored in three files with the following names:
``xxx_nodes.txt``, ``xxx_edges.txt`` and ``xxx_stats.txt``, where ``xxx`` is a
graph name.
Each row in ``xxx_nodes.txt`` stores the information of a node. Row ID is
also the *homogeneous* ID of a node, e.g., row 0 is for node 0; row 1 is for
node 1, etc. Each row has the following format:
.. code-block:: none
<node_type_id> <node_weight_list> <type_wise_node_id>
All fields are separated by whitespace:
* ``<node_type_id>`` is an integer starting from 0. Each node type is mapped to
an integer. For a homogeneous graph, its value is always 0.
* ``<node_weight_list>`` are integers (separated by whitespace) that indicate
the node weights used by ParMETIS to balance graph partitions. For homogeneous
graphs, the list has only one integer while for heterogeneous graphs with
:math:`T` node types, the list should have :math:`T` integers. If the node
belongs to node type :math:`t`, then all the integers except the :math:`t^{th}`
one are zero; the :math:`t^{th}` integer is the weight of that node. ParMETIS
will try to balance the total node weight of each partition. For heterogeneous
graphs, it will try to distribute nodes of the same type to all partitions.
The recommended node weights are 1 for balancing the number of nodes in each
partition, or node degrees for balancing the number of edges in each partition.
* ``<type_wise_node_id>`` is an integer representing the node ID in its own type.
Below shows an example of a node file for a heterogeneous graph with two node
types. Node type 0 has three nodes; node type 1 has four nodes. It uses two
node weights to ensure that ParMETIS will generate partitions with roughly the
same number of nodes for type 0 and the same number of nodes for type 1.
.. code-block:: none
0 1 0 0
0 1 0 1
0 1 0 2
1 0 1 0
1 0 1 1
1 0 1 2
1 0 1 3
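For reference, a small hedged script that would generate the toy node file above
from per-type node counts:

.. code:: python

num_nodes_per_type = [3, 4]   # node type 0 has 3 nodes, node type 1 has 4 nodes

with open('xxx_nodes.txt', 'w') as f:
    for type_id, num in enumerate(num_nodes_per_type):
        for type_wise_id in range(num):
            # One-hot node weight so ParMETIS balances each node type separately.
            weights = [1 if i == type_id else 0 for i in range(len(num_nodes_per_type))]
            fields = [type_id] + weights + [type_wise_id]
            f.write(' '.join(str(x) for x in fields) + '\n')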
Similarly, each row in ``xxx_edges.txt`` stores the information of an edge. Row ID is
also the *homogeneous* ID of an edge, e.g., row 0 is for edge 0; row 1 is for
edge 1, etc. Each row has the following format:
.. code-block:: none
<src_node_id> <dst_node_id> <type_wise_edge_id> <edge_type_id>
All fields are separated by whitespace:
* ``<src_node_id>`` is the *homogeneous* ID of the source node.
* ``<dst_node_id>`` is the *homogeneous* ID of the destination node.
* ``<type_wise_edge_id>`` is the edge ID for the edge type.
* ``<edge_type_id>`` is an integer starting from 0. Each edge type is mapped to
an integer. For a homogeneous graph, its value is always 0.
``xxx_stats.txt`` stores some basic statistics of the graph. It has only one line with three fields
separated by whitespace:
.. code-block:: none
<num_nodes> <num_edges> <total_node_weights>
* ``num_nodes`` stores the total number of nodes regardless of node types.
* ``num_edges`` stores the total number of edges regardless of edge types.
* ``total_node_weights`` stores the number of node weights in the node file.
Run ParMETIS and output format
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ParMETIS contains a command called ``pm_dglpart``, which loads the graph stored
in the three files from the machine where ``pm_dglpart`` is invoked, distributes
data to all machines in the cluster and invokes ParMETIS to partition the
graph. When it completes, it generates three files for each partition:
``p<part_id>-xxx_nodes.txt``, ``p<part_id>-xxx_edges.txt``,
``p<part_id>-xxx_stats.txt``.
.. note::
ParMETIS reassigns IDs to nodes during the partitioning. After ID reassignment,
the nodes in a partition are assigned with contiguous IDs; furthermore, the nodes of
the same type are assigned with contiguous IDs.
``p<part_id>-xxx_nodes.txt`` stores the node data of the partition. Each row represents
a node with the following fields:
.. code-block:: none
<node_id> <node_type_id> <node_weight_list> <type_wise_node_id>
* ``<node_id>`` is the *homogeneous* node ID after ID reassignment.
* ``<node_type_id>`` is the node type ID.
* ``<node_weight_list>`` is the node weight used by ParMETIS (copied from the input file).
* ``<type_wise_node_id>`` is an integer representing the node ID in its own type.
``p<part_id>-xxx_edges.txt`` stores the edge data of the partition. Each row represents
an edge with the following fields:
.. code-block:: none
<src_id> <dst_id> <orig_src_id> <orig_dst_id> <type_wise_edge_id> <edge_type_id>
* ``<src_id>`` is the *homogeneous* ID of the source node after ID reassignment.
* ``<dst_id>`` is the *homogeneous* ID of the destination node after ID reassignment.
* ``<orig_src_id>`` is the *homogeneous* ID of the source node in the input graph.
* ``<orig_dst_id>`` is the *homogeneous* ID of the destination node in the input graph.
* ``<type_wise_edge_id>`` is the edge ID in its own type.
* ``<edge_type_id>`` is the edge type ID.
When invoking ``pm_dglpart``, the three input files: ``xxx_nodes.txt``,
``xxx_edges.txt``, ``xxx_stats.txt`` should be located in the directory where
``pm_dglpart`` runs. The following command runs four ParMETIS processes to
partition the graph named ``xxx`` into eight partitions (each process handles
two partitions).
.. code-block:: bash
mpirun -np 4 pm_dglpart xxx 2
The output files from ParMETIS then need to be converted to the
:ref:`partition assignment format <guide-distributed-prep-partition>` in
order to run the subsequent preprocessing steps.
.. _guide-distributed-preprocessing:
7.1 Preprocessing for Distributed Training
7.1 Data Preprocessing
------------------------------------------
:ref:`(中文版) <guide_cn-distributed-preprocessing>`
DGL requires to preprocess the graph data for distributed training. This includes two steps:
1) partition a graph into subgraphs, 2) assign nodes/edges with new IDs. For relatively small
graphs, DGL provides a partitioning API :func:`dgl.distributed.partition_graph` that performs
the two steps above. The API runs on one machine. Therefore, if a graph is large, users will
need a large machine to partition a graph when using this API. In addition to this API, we also
provide a solution to partition a large graph in a cluster of machines below (see Section 7.1.1).
:func:`dgl.distributed.partition_graph` supports both random partitioning
and a `Metis <http://glaros.dtc.umn.edu/gkhome/views/metis>`__-based partitioning.
The benefit of Metis partitioning is that it can generate
partitions with minimal edge cuts to reduce network communication for distributed training
and inference. DGL uses the latest version of Metis with the options optimized for the real-world
graphs with power-law distribution. After partitioning, the API constructs the partitioned results
in a format that is easy to load during the training.
By default, the partition API assigns new IDs to the nodes and edges in the input graph to help locate
nodes/edges during distributed training/inference. After assigning IDs, the partition API shuffles
all node data and edge data accordingly. After generating partitioned subgraphs, each subgraph is stored
as a ``DGLGraph`` object. The original node/edge IDs before reshuffling are stored in the field of
'orig_id' in the node/edge data of the subgraphs. The node data `dgl.NID` and the edge data `dgl.EID`
of the subgraphs store new node/edge IDs of the full graph after nodes/edges reshuffle.
During the training, users just use the new node/edge IDs.
The partitioned results are stored in multiple files in the output directory. It always contains
a JSON file called xxx.json, where xxx is the graph name provided to the partition API. The JSON file
contains all the partition configurations. If the partition API does not assign new IDs to nodes and edges,
it generates two additional Numpy files: `node_map.npy` and `edge_map.npy`, which stores the mapping between
node/edge IDs and partition IDs. The Numpy arrays in the two files are large for a graph with billions of
nodes and edges because they have an entry for each node and edge in the graph. Inside the folders for
each partition, there are three files that store the partition data in the DGL format. `graph.dgl` stores
the graph structure of the partition as well as some metadata on nodes and edges. `node_feats.dgl` and
`edge_feats.dgl` stores all features of nodes and edges that belong to the partition.
Before launching training jobs, DGL requires the input data to be partitioned
and distributed to the target machines. For relatively small graphs, DGL
provides a partitioning API :func:`~dgl.distributed.partition_graph` that
partitions an in-memory :class:`~dgl.DGLGraph` object. It supports
multiple partitioning algorithms such as random partitioning and
`Metis <http://glaros.dtc.umn.edu/gkhome/views/metis>`__.
The benefit of Metis partitioning is that it can generate partitions with
minimal edge cuts to reduce network communication for distributed training and
inference. DGL uses the latest version of Metis with the options optimized for
the real-world graphs with power-law distribution. After partitioning, the API
constructs the partitioned results in a format that is easy to load during the
training. For example,
.. code-block:: none
data_root_dir/
|-- xxx.json # partition configuration file in JSON
|-- node_map.npy # partition id of each node stored in a numpy array (optional)
|-- edge_map.npy # partition id of each edge stored in a numpy array (optional)
|-- part0/ # data for partition 0
|-- node_feats.dgl # node features stored in binary format
|-- edge_feats.dgl # edge features stored in binary format
|-- graph.dgl # graph structure of this partition stored in binary format
|-- part1/
|-- node_feats.dgl
|-- edge_feats.dgl
|-- graph.dgl
Load balancing
~~~~~~~~~~~~~~
When partitioning a graph, by default, Metis only balances the number of nodes in each partition.
This can result in suboptimal configuration, depending on the task at hand. For example, in the case
of semi-supervised node classification, a trainer performs computation on a subset of labeled nodes in
a local partition. A partitioning that only balances nodes in a graph (both labeled and unlabeled), may
end up with computational load imbalance. To get a balanced workload in each partition, the partition API
allows balancing between partitions with respect to the number of nodes in each node type, by specifying
``balance_ntypes`` in :func:`dgl.distributed.partition_graph`. Users can take advantage of this and consider
nodes in the training set, validation set and test set are of different node types.
The following example considers nodes inside the training set and outside the training set are two types of nodes:
.. code:: python
dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test', balance_ntypes=g.ndata['train_mask'])
In addition to balancing the node types, :func:`dgl.distributed.partition_graph` also allows balancing
between in-degrees of nodes of different node types by specifying ``balance_edges``. This balances
the number of edges incident to the nodes of different types.
**Note**: The graph name passed to :func:`dgl.distributed.partition_graph` is an important argument.
The graph name will be used by :class:`dgl.distributed.DistGraph` to identify a distributed graph.
A legal graph name should only contain alphabetic characters and underscores.
ID mapping
~~~~~~~~~~
:func:`dgl.distributed.partition_graph` shuffles node IDs and edge IDs during the partitioning and shuffles
node data and edge data accordingly. After training, we may need to save the computed node embeddings for
any downstream tasks. Therefore, we need to reshuffle the saved node embeddings according to their original
IDs.
When `return_mapping=True`, :func:`dgl.distributed.partition_graph` returns the mappings between shuffled
node/edge IDs and their original IDs. For a homogeneous graph, it returns two vectors. The first
vector maps every shuffled node ID to its original ID; the second vector maps every shuffled edge ID to its
original ID. For a heterogeneous graph, it returns two dictionaries of vectors. The first dictionary contains
the mapping for each node type; the second dictionary contains the mapping for each edge type.
.. code:: python
node_map, edge_map = dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
balance_ntypes=g.ndata['train_mask'],
return_mapping=True)
# Let's assume that node_emb is saved from the distributed training.
orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
orig_node_emb[node_map] = node_emb
7.1.1 Distributed partitioning
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
For a large graph, DGL uses `ParMetis <http://glaros.dtc.umn.edu/gkhome/metis/parmetis/overview>`__ to partition
a graph in a cluster of machines. This solution requires users to prepare data for ParMETIS and use a DGL script
`tools/convert_partition.py` to construct :class:`dgl.DGLGraph` for the partitions output by ParMETIS.
import dgl
**Note**: `convert_partition.py` uses the `pyarrow` package to load csv files. Please install `pyarrow`.
g = ...  # create or load a DGLGraph object
dgl.distributed.partition_graph(g, 'mygraph', 2, 'data_root_dir')
ParMETIS Installation
~~~~~~~~~~~~~~~~~~~~~
ParMETIS requires METIS and GKLib. Please follow the instructions `here <https://github.com/KarypisLab/GKlib>`__
to compile and install GKLib. For compiling and install METIS, please follow the instructions below to
clone METIS with GIT and compile it with int64 support.
will output the following data files.
.. code-block:: none
git clone https://github.com/KarypisLab/METIS.git
make config shared=1 cc=gcc prefix=~/local i64=1
make install
For now, we need to compile and install ParMETIS manually. We clone the DGL branch of ParMETIS as follows:
.. code-block:: none
git clone --branch dgl https://github.com/KarypisLab/ParMETIS.git
Then compile and install ParMETIS.
.. code-block:: none
make config cc=mpicc prefix=~/local
make install
Before running ParMETIS, we need to set two environment variables: `PATH` and `LD_LIBRARY_PATH`.
.. code-block:: none
export PATH=$PATH:$HOME/local/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/local/lib/
Input format for ParMETIS
~~~~~~~~~~~~~~~~~~~~~~~~~
The input graph for ParMETIS is stored in three files with the following names: `xxx_nodes.txt`,
`xxx_edges.txt` and `xxx_stats.txt`, where `xxx` is a graph name.
Each row in `xxx_nodes.txt` stores the information of a node with the following format:
.. code-block:: none
<node_type> <weight1> ... <orig_type_node_id> <attributes>
All fields are separated by whitespace:
* `<node_type>` is an integer. For a homogeneous graph, its value is always 0. For heterogeneous graphs,
its value indicates the type of each node.
* `<weight1>`, `<weight2>`, etc are integers that indicate the node weights used by ParMETIS to balance
graph partitions. If a user does not provide node weights, ParMETIS partitions a graph and balance
the number of nodes in each partition (it is important to balance graph partitions in order to achieve
good training speed). However, this default strategy may not be sufficient for many use cases.
For example, in a heterogeneous graph, we want to partition the graph so that all partitions have
roughly the same number of nodes for each node type. The toy example below shows how we can use
node weights to balance the number of nodes of different types.
* `<orig_type_node_id>` is an integer representing the node ID in its own type. In DGL, nodes of each type
are assigned with IDs starting from 0. For a homogeneous graph, this field is the same as the node ID.
* `<attributes>` are optional fields. They can be used to store any values and ParMETIS does not interpret
these fields. Potentially, we can store the node features and edge features in these fields for
homogeneous graphs.
* The row ID indicates the *homogeneous* ID of nodes in a graph (all nodes are assigned with a unique ID).
All nodes of the same type should be assigned with contiguous IDs. That is, nodes of the same type should
be stored together in `xxx_nodes.txt`.
Below shows an example of a node file for a heterogeneous graph with two node types. Node type 0 has three
nodes; node type 1 has four nodes. It uses two node weights to ensure that ParMETIS will generate partitions
with roughly the same number of nodes for type 0 and the same number of nodes for type 1.
data_root_dir/
|-- mygraph.json # metadata JSON. File name is the given graph name.
|-- part0/ # data for partition 0
| |-- node_feats.dgl # node features stored in binary format
| |-- edge_feats.dgl # edge features stored in binary format
| |-- graph.dgl # graph structure of this partition stored in binary format
|
|-- part1/ # data for partition 1
|-- node_feats.dgl
|-- edge_feats.dgl
|-- graph.dgl
Chapter :ref:`guide-distributed-partition` covers more details about the
partition format. To distribute the partitions to a cluster, users can either save
the data in some shared folder accessible by all machines, or copy the metadata
JSON as well as the corresponding partition folder ``partX`` to the X^th machine.
Using :func:`~dgl.distributed.partition_graph` requires an instance with large enough
CPU RAM to hold the entire graph structure and features, which may not be viable for
graphs with hundreds of billions of edges or large features. We describe how to use
the *parallel data preparation pipeline* for such cases next.
Parallel Data Preparation Pipeline
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To handle massive graph data that cannot fit in the CPU RAM of a
single machine, DGL utilizes data chunking and parallel processing to reduce
memory footprint and running time. The figure below illustrates the
pipeline:
.. figure:: https://data.dgl.ai/asset/image/guide_7_distdataprep.png
* The pipeline takes input data stored in *Chunked Graph Format* and
produces and dispatches data partitions to the target machines.
* **Step.1 Graph Partitioning:** It calculates which partition each node belongs to
and saves the results as a set of files called *partition assignment*.
To speed up this step, some algorithms (e.g., ParMETIS) support parallel computation
using multiple machines.
* **Step.2 Data Dispatching:** Given the partition assignment, this step then
physically partitions the graph data and dispatches it to the machines the user
specified. It also converts the graph data into formats that are suitable for
distributed training and evaluation.
The whole pipeline is modularized so that each step can be invoked
individually. For example, users can replace Step.1 with some custom graph partition
algorithm as long as it produces partition assignment files
correctly.
.. _guide-distributed-prep-chunk:
Chunked Graph Format
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To run the pipeline, DGL requires the input graph to be stored in multiple data
chunks. Each data chunk is the unit of data preprocessing and thus should fit
into CPU RAM. In this section, we use the MAG240M-LSC data from `Open Graph
Benchmark <https://ogb.stanford.edu/docs/lsc/mag240m/>`__ as an example to
describe the overall design, followed by a formal specification and
tips for creating data in such format.
Example: MAG240M-LSC
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The MAG240M-LSC graph is a heterogeneous academic graph
extracted from the Microsoft Academic Graph (MAG), whose schema diagram is
illustrated below:
.. figure:: https://data.dgl.ai/asset/image/guide_7_mag240m.png
Its raw data files are organized as follows:
.. code-block:: none
0 1 0 0
0 1 0 1
0 1 0 2
1 0 1 0
1 0 1 1
1 0 1 2
1 0 1 3
Similarly, each row in `xxx_edges.txt` stores the information of an edge with the following format:
/mydata/MAG240M-LSC/
|-- meta.pt # A dictionary of the number of nodes for each type saved by torch.save,
| # as well as num_classes
|-- processed/
|-- author___affiliated_with___institution/
| |-- edge_index.npy # graph, 713 MB
|
|-- paper/
| |-- node_feat.npy # feature, 187 GB, (numpy memmap format)
| |-- node_label.npy # label, 974 MB
| |-- node_year.npy # year, 974 MB
|
|-- paper___cites___paper/
| |-- edge_index.npy # graph, 21 GB
|
|-- author___writes___paper/
|-- edge_index.npy # graph, 6GB
The graph has three node types (``"paper"``, ``"author"`` and ``"institution"``),
and three edge types/relations (``"cites"``, ``"writes"`` and ``"affiliated_with"``). The
``"paper"`` nodes have three attributes (``"feat"``, ``"label"``, ``"year"``), while
other types of nodes and edges are featureless. Below shows the data files when
the graph is stored in DGL Chunked Graph Format:
.. code-block:: none
<src_id> <dst_id> <type_edge_id> <edge_type> <attributes>
All fields are separated by whitespace:
* `<src_id>` is the *homogeneous* ID of the source node.
* `<dst_id>` is the *homogeneous* ID of the destination node.
* `<type_edge_id>` is the edge ID for the edge type.
* `<edge_type>` is the edge type.
* `<attributes>` are optional fields. They can be used to store any values and ParMETIS does not
interpret these fields.
**Note**: please make sure that there are no duplicated edges and self-loop edges in the edge file.
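For illustration, assume the toy graph above has a single edge type 0 that connects type-0
nodes to type-1 nodes, with four hypothetical edges. A few rows of `xxx_edges.txt` without
optional attributes could then look like:

.. code-block:: none

    0 3 0 0
    1 4 1 0
    2 5 2 0
    2 6 3 0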
`xxx_stats.txt` stores some basic statistics of the graph. It has only one line with three fields
separated by whitespace:
.. code-block:: none
<num_nodes> <num_edges> <num_node_weights>
* `num_nodes` stores the total number of nodes regardless of node types.
* `num_edges` stores the total number of edges regardless of edge types.
* `num_node_weights` stores the number of node weights in the node file.
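Continuing the toy example (7 nodes, the 4 hypothetical edges above, and 2 node weights),
`xxx_stats.txt` would contain a single line:

.. code-block:: none

    7 4 2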
Run ParMETIS and output formats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ParMETIS contains a command called `pm_dglpart`, which loads the graph stored in the three
files from the machine where `pm_dglpart` is invoked, distributes data to all machines in
the cluster and invokes ParMETIS to partition the graph. When it completes, it generates
three files for each partition: `p<part_id>-xxx_nodes.txt`, `p<part_id>-xxx_edges.txt`,
`p<part_id>-xxx_stats.txt`.
**Note**: ParMETIS reassigns IDs to nodes during the partitioning. After ID reassignment,
the nodes in a partition are assigned with contiguous IDs; furthermore, the nodes of
the same type are assigned with contiguous IDs.
`p<part_id>-xxx_nodes.txt` stores the node data of the partition. Each row represents
a node with the following fields:
/mydata/MAG240M-LSC_chunked/
|-- metadata.json # metadata json file
|-- edges/ # stores edge ID data
| |-- writes-part1.csv
| |-- writes-part2.csv
| |-- affiliated_with-part1.csv
| |-- affiliated_with-part2.csv
| |-- cites-part1.csv
| |-- cites-part2.csv
|
|-- node_data/ # stores node feature data
|-- paper-feat-part1.npy
|-- paper-feat-part2.npy
|-- paper-label-part1.npy
|-- paper-label-part2.npy
|-- paper-year-part1.npy
|-- paper-year-part2.npy
All the data files are chunked into two parts, including the edges of each relation
(e.g., writes, affiliated_with, cites) and the node features. If the graph has edge features,
they will be chunked into multiple files too. All ID data are stored in
CSV files (we will illustrate the contents soon) while node features are stored in
numpy arrays.
The ``metadata.json`` file stores all the metadata such as file names
and chunk sizes (e.g., the number of nodes or edges in each chunk).
.. code-block:: python
.. code-block:: none
<node_id> <node_type> <weight1> ... <orig_type_node_id> <attributes>
* `<node_id>` is the *homogeneous* node IDs after ID reassignment.
* `<node_type>` is the node type.
* `<weight1>` is the node weight used by ParMETIS.
* `<orig_type_node_id>` is the original node ID for a specific node type in the input heterogeneous graph.
* `<attributes>` are optional fields that contain any node attributes in the input node file.
{
"graph_name" : "MAG240M-LSC", # given graph name
"node_type": ["author", "paper", "institution"],
"num_nodes_per_chunk": [
[61191556, 61191556], # number of author nodes per chunk
[60875833, 60875833], # number of paper nodes per chunk
[12861, 12860] # number of institution nodes per chunk
],
# The edge type name is a colon-joined string of source, edge, and destination type.
"edge_type": [
"author:writes:paper",
"author:affiliated_with:institution",
"paper:cites:paper"
],
"num_edges_per_chunk": [
[193011360, 193011360], # number of author:writes:paper edges per chunk
[22296293, 22296293], # number of author:affiliated_with:institution edges per chunk
[648874463, 648874463] # number of paper:cites:paper edges per chunk
],
"edges" : {
"author:write:paper" : { # edge type
"format" : {"name": "csv", "delimiter": " "},
# The list of paths. Can be relative or absolute.
"data" : ["edges/writes-part1.csv", "edges/writes-part2.csv"]
},
"author:affiliated_with:institution" : {
"format" : {"name": "csv", "delimiter": " "},
"data" : ["edges/affiliated_with-part1.csv", "edges/affiliated_with-part2.csv"]
},
"author:affiliated_with:institution" : {
"format" : {"name": "csv", "delimiter": " "},
"data" : ["edges/cites-part1.csv", "edges/cites-part2.csv"]
}
},
"node_data" : {
"paper": { # node type
"feat": { # feature key
"format": {"name": "numpy"},
"data": ["node_data/paper-feat-part1.npy", "node_data/paper-feat-part2.npy"]
},
"label": { # feature key
"format": {"name": "numpy"},
"data": ["node_data/paper-label-part1.npy", "node_data/paper-label-part2.npy"]
},
"year": { # feature key
"format": {"name": "numpy"},
"data": ["node_data/paper-year-part1.npy", "node_data/paper-year-part2.npy"]
}
}
},
"edge_data" : {} # MAG240M-LSC does not have edge features
}
`p<part_id>-xxx_edges.txt` stores the edge data of the partition. Each row represents
an edge with the following fields:
There are three parts in ``metadata.json``:
* Graph schema information and chunk sizes, e.g., ``"node_type"`` , ``"num_nodes_per_chunk"``, etc.
* Edge index data under key ``"edges"``.
* Node/edge feature data under keys ``"node_data"`` and ``"edge_data"``.
The edge index files contain edges in the form of node ID pairs:
.. code-block:: bash
# writes-part1.csv
0 0
0 1
0 20
0 29
0 1203
...
Specification
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In general, a chunked graph data folder just needs a ``metadata.json`` and a
bunch of data files. The folder structure in the MAG240M-LSC example is not a
strict requirement as long as ``metadata.json`` contains valid file paths.
``metadata.json`` top-level keys:
* ``graph_name``: String. Unique name used by :class:`dgl.distributed.DistGraph`
to load graph.
* ``node_type``: List of string. Node type names.
* ``num_nodes_per_chunk``: List of list of integer. For graphs with :math:`T` node
types stored in :math:`P` chunks, the value contains :math:`T` integer lists.
Each list contains :math:`P` integers, which specify the number of nodes
in each chunk.
* ``edge_type``: List of string. Edge type names in the form of
``<source node type>:<relation>:<destination node type>``.
* ``num_edges_per_chunk``: List of list of integer. For graphs with :math:`R` edge
types stored in :math:`P` chunks, the value contains :math:`R` integer lists.
Each list contains :math:`P` integers, which specify the number of edges
in each chunk.
* ``edges``: Dict of ``ChunkFileSpec``. Edge index files.
Dictionary keys are edge type names in the form of
``<source node type>:<relation>:<destination node type>``.
* ``node_data``: Dict of ``ChunkFileSpec``. Data files that store node attributes.
Dictionary keys are node type names.
* ``edge_data``: Dict of ``ChunkFileSpec``. Data files that store edge attributes.
Dictionary keys are edge type names in the form of
``<source node type>:<relation>:<destination node type>``.
``ChunkFileSpec`` has two keys:
* ``format``: File format. Depending on the format ``name``, users can configure more
details about how to parse each data file.
- ``"csv"``: CSV file. Use the ``delimiter`` key to specify delimiter in use.
- ``"numpy"``: NumPy array binary file created by :func:`numpy.save`.
* ``data``: List of string. File path to each data chunk. Support absolute path
or path relative to the location of ``metadata.json``.
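For example, the ``ChunkFileSpec`` for the ``author:writes:paper`` edge index in the
MAG240M-LSC ``metadata.json`` above is:

.. code-block:: none

    {
        "format" : {"name": "csv", "delimiter": " "},
        "data" : ["edges/writes-part1.csv", "edges/writes-part2.csv"]
    }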
Tips for making chunked graph data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Users need to write their own scripts to convert raw data into the chunked graph format.
Depending on the raw data, the implementation could include:
* Construct graphs out of non-structured data such as texts or tabular data.
* Augment or transform the input graph structure or features. E.g., adding reverse
or self-loop edges, normalizing features, etc.
* Chunk the input graph structure and features into multiple data files so that
each one can fit in CPU RAM for subsequent preprocessing steps.
To avoid running into out-of-memory errors, it is recommended to process the graph
structure and feature data separately. Processing one chunk at a time can also
reduce the peak runtime memory footprint. As an example, DGL provides a
`tools/chunk_graph.py
<https://github.com/dmlc/dgl/blob/master/tools/chunk_graph.py>`_ script that
chunks an in-memory feature-less :class:`~dgl.DGLGraph` and feature tensors
stored in :class:`numpy.memmap`.
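For reference, below is a minimal sketch of how one node feature array could be chunked
(this is not the actual ``tools/chunk_graph.py`` implementation; the function, paths and chunk
layout are hypothetical and only follow the MAG240M-LSC example above):

.. code:: python

    import numpy as np

    def chunk_node_feature(feat_path, num_chunks, out_prefix):
        """Split one node feature array into `num_chunks` .npy files."""
        # Memory-map the source array so only the slice being written is in RAM.
        feat = np.load(feat_path, mmap_mode='r')
        num_nodes = feat.shape[0]
        chunk_size = (num_nodes + num_chunks - 1) // num_chunks
        for i in range(num_chunks):
            start = i * chunk_size
            end = min(start + chunk_size, num_nodes)
            np.save(f'{out_prefix}-part{i + 1}.npy', np.asarray(feat[start:end]))

    # Hypothetical usage for the 'paper' features in the MAG240M-LSC example:
    # chunk_node_feature('processed/paper/node_feat.npy', 2, 'node_data/paper-feat')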
.. _guide-distributed-prep-partition:
Step.1 Graph Partitioning
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This step reads the chunked graph data and calculates which partition each node
should belong to. The results are saved in a set of *partition assignment files*.
For example, to randomly partition MAG240M-LSC to two parts, run the
``partition_algo/random.py`` script in the ``tools`` folder:
.. code-block:: bash
python /my/repo/dgl/tools/partition_algo/random.py \
    --in-dir=/mydata/MAG240M-LSC_chunked/ \
    --out-dir=/mydata/MAG240M-LSC_2parts/ \
    --num-parts=2
The command outputs files as follows:
.. code-block:: none
<src_id> <dst_id> <orig_src_id> <orig_dst_id> <orig_type_edge_id> <edge_type> <attributes>
* `<src_id>` is the *homogeneous* ID of the source node after ID reassignment.
* `<dst_id>` is the *homogeneous* ID of the destination node after ID reassignment.
* `<orig_src_id>` is the *homogeneous* ID of the source node in the input graph.
* `<orig_dst_id>` is the *homogeneous* ID of the destination node in the input graph.
* `<orig_type_edge_id>` is the edge ID for the specific edge type in the input graph.
* `<edge_type>` is the edge type.
* `<attributes>` are optional fields that contain any edge attributes in the input edge file.
MAG240M-LSC_2parts/
|-- paper.txt
|-- author.txt
|-- institution.txt
When invoking `pm_dglpart`, the three input files: `xxx_nodes.txt`, `xxx_edges.txt`, `xxx_stats.txt`
should be located in the directory where `pm_dglpart` runs. The following command runs four ParMETIS
processes to partition the graph named `xxx` into eight partitions (each process handles two partitions).
Each file stores the partition assignment of the corresponding node type.
The content is one partition ID per line, i.e., line i is
the partition ID of node i.
.. code-block:: none
.. code-block:: bash
mpirun -np 4 pm_dglpart xxx 2
Convert ParMETIS outputs to DGLGraph
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
DGL provides a script named `convert_partition.py`, located in the `tools` directory, to convert the data
in the partition files into :class:`dgl.DGLGraph` objects and save them into files.
**Note**: `convert_partition.py` runs on a single machine. In the future, we will extend it to convert
graph data in parallel across multiple machines. **Note**: please install the `pyarrow` package,
which is used for loading data from CSV files.
`convert_partition.py` has the following arguments:
* `--input-dir INPUT_DIR` specifies the directory that contains the partition files generated by ParMETIS.
* `--graph-name GRAPH_NAME` specifies the graph name.
* `--schema SCHEMA` provides a file that specifies the schema of the input heterogeneous graph.
The schema file is a JSON file that lists node types and edge types as well as homogeneous ID ranges
for each node type and edge type.
* `--num-parts NUM_PARTS` specifies the number of partitions.
* `--num-node-weights NUM_NODE_WEIGHTS` specifies the number of node weights used by ParMETIS
to balance partitions.
* `[--workspace WORKSPACE]` is an optional argument that specifies a workspace directory to
store some intermediate results.
* `[--node-attr-dtype NODE_ATTR_DTYPE]` is an optional argument that specifies the data type of
node attributes in the remaining fields `<attributes>` of the node files.
* `[--edge-attr-dtype EDGE_ATTR_DTYPE]` is an optional argument that specifies the data type of
edge attributes in the remaining fields `<attributes>` of the edge files.
* `--output OUTPUT` specifies the output directory that stores the partition results.
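A hypothetical invocation (the argument values below are only illustrative) looks like:

.. code-block:: bash

    python tools/convert_partition.py \
        --input-dir parmetis_output/ \
        --graph-name mygraph \
        --schema mygraph_schema.json \
        --num-parts 2 \
        --num-node-weights 1 \
        --output data_root_dir/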
`convert_partition.py` outputs files as below:
# paper.txt
0
1
1
0
0
1
0
...
.. code-block:: none
.. note::
data_root_dir/
|-- xxx.json # partition configuration file in JSON
|-- part0/ # data for partition 0
|-- node_feats.dgl # node features stored in binary format (optional)
|-- edge_feats.dgl # edge features stored in binary format (optional)
|-- graph.dgl # graph structure of this partition stored in binary format
|-- part1/
|-- node_feats.dgl
|-- edge_feats.dgl
|-- graph.dgl
**Note**: if the data type of node attributes or edge attributes is specified, `convert_partition.py`
assumes all nodes/edges of any types have exactly these attributes. Therefore, if
nodes or edges of different types contain different numbers of attributes, users need to construct
them manually.
Below shows an example of the schema of the OGBN-MAG graph for `convert_partition.py`. It has two fields:
"nid" and "eid". Inside "nid", it lists all node types and the homogeneous ID ranges for each node type;
inside "eid", it lists all edge types and the homogeneous ID ranges for each edge type.
DGL currently requires the number of data chunks and the number of partitions to be the same.
.. code-block:: none
Despite its simplicity, random partitioning may result in frequent
cross-machine communication. Check out chapter
:ref:`guide-distributed-partition` for more advanced options.
{
"nid": {
"author": [
0,
1134649
],
"field_of_study": [
1134649,
1194614
],
"institution": [
1194614,
1203354
],
"paper": [
1203354,
1939743
]
},
"eid": {
"affiliated_with": [
0,
1043998
],
"writes": [
1043998,
8189658
],
"rev-has_topic": [
8189658,
15694736
],
"rev-affiliated_with": [
15694736,
16738734
],
"cites": [
16738734,
22155005
],
"has_topic": [
22155005,
29660083
],
"rev-cites": [
29660083,
35076354
],
"rev-writes": [
35076354,
42222014
]
}
}
Step.2 Data Dispatching
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Below shows the demo code to construct the schema file.
DGL provides a ``dispatch_data.py`` script to physically partition the data and
dispatch partitions to each training machines. It will also convert the data
once again to data objects that can be loaded by DGL training processes
efficiently. The entire step can be further accelerated using multi-processing.
.. code-block:: none
.. code-block:: bash
import json

import torch as th
import dgl

# `hg` is the input heterogeneous graph; `g` is its homogeneous version
# (e.g., g = dgl.to_homogeneous(hg)), which stores dgl.NTYPE/dgl.ETYPE per node/edge.
nid_ranges = {}
eid_ranges = {}
for ntype in hg.ntypes:
    ntype_id = hg.get_ntype_id(ntype)
    nid = th.nonzero(g.ndata[dgl.NTYPE] == ntype_id, as_tuple=True)[0]
    nid_ranges[ntype] = [int(nid[0]), int(nid[-1] + 1)]
for etype in hg.etypes:
    etype_id = hg.get_etype_id(etype)
    eid = th.nonzero(g.edata[dgl.ETYPE] == etype_id, as_tuple=True)[0]
    eid_ranges[etype] = [int(eid[0]), int(eid[-1] + 1)]
with open('mag.json', 'w') as outfile:
    json.dump({'nid': nid_ranges, 'eid': eid_ranges}, outfile, indent=4)
Construct node/edge features for a heterogeneous graph
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The :class:`dgl.DGLGraph` output by `convert_partition.py` stores a heterogeneous graph partition
as a homogeneous graph. Its node data contains a field called `orig_id` that stores each node's ID
within its own type in the original heterogeneous graph, and a field `NTYPE` that stores
the node type. In addition, it contains node data called `inner_node` that indicates
whether a node in the graph partition is assigned to the partition: `inner_node` is 1 if the node
is assigned to the partition and 0 otherwise. Note: a graph partition
also contains some HALO nodes, which are assigned to other partitions but are connected by
some edges to this graph partition. Using this information, we can construct node features
for each node type separately and store them in a dictionary whose keys are
`<node_type>/<feature_name>` and whose values are node feature tensors. The code below illustrates
the construction of the node feature dictionary. After the dictionary of tensors is constructed,
it is saved into a file.
python /myrepo/dgl/tools/dispatch_data.py \
--in-dir=/mydata/MAG240M-LSC_chunked/ \
--partition-file=/mydata/MAG240M-LSC_2parts/ \
--out-dir=/data/MAG_LSC_partitioned \
--ip-config=ip_config.txt
.. code-block:: none
* ``--in-dir`` specifies the path to the folder of the input chunked graph data.
* ``--partition-file`` specifies the path to the partition assignment produced by Step.1.
* ``--out-dir`` specifies the path to store the data partitions on each machine.
* ``--ip-config`` specifies the IP configuration file of the cluster.
node_data = {}
for ntype in hg.ntypes:
    local_node_idx = th.logical_and(part.ndata['inner_node'].bool(),
                                    part.ndata[dgl.NTYPE] == hg.get_ntype_id(ntype))
    local_nodes = part.ndata['orig_id'][local_node_idx].numpy()
    for name in hg.nodes[ntype].data:
        node_data[ntype + '/' + name] = hg.nodes[ntype].data[name][local_nodes]
dgl.data.utils.save_tensors(metadata['part-{}'.format(part_id)]['node_feats'], node_data)
An example IP configuration file is as follows:
We can construct the edge features in a very similar way. The only difference is that
all edges in the :class:`dgl.DGLGraph` object belong to the partition. So the construction
is even simpler.
.. code-block:: bash
.. code-block:: none
172.31.19.1
172.31.23.205
edge_data = {}
for etype in hg.etypes:
    local_edges = part.edata['orig_id'][part.edata[dgl.ETYPE] == hg.get_etype_id(etype)]
    for name in hg.edges[etype].data:
        edge_data[etype + '/' + name] = hg.edges[etype].data[name][local_edges]
dgl.data.utils.save_tensors(metadata['part-{}'.format(part_id)]['edge_feats'], edge_data)
During data dispatching, DGL assumes that the combined CPU RAM of the cluster
is able to hold the entire graph data. Moreover, the number of machines (IPs) must be the
same as the number of partitions. Node ownership is determined by the result
of the partitioning algorithm, whereas an edge is owned by the machine that owns its
destination node.
.. _guide-distributed-tools:
7.4 Tools for launching distributed training/inference
7.2 Tools for launching distributed training/inference
------------------------------------------------------
:ref:`(中文版) <guide_cn-distributed-tools>`
DGL provides a launching script ``launch.py`` under
`dgl/tools <https://github.com/dmlc/dgl/tree/master/tools>`__ to launch a distributed
training job in a cluster. This script makes the following assumptions:
DGL provides two scripts to assist in distributed training:
* *tools/copy_files.py* for copying graph partitions to a cluster of machines,
* *tools/launch.py* for launching a distributed training job in a cluster of machines.
*copy_files.py* copies partitioned data and related files (e.g., the training script)
from the machine where the graph is partitioned to a cluster of machines where the distributed
training occurs. The script copies each partition to the machine where the distributed training job
will require it. The script takes the following arguments:
* ``--part_config`` specifies the partition configuration file that contains the information
of the partitioned data in the local machine.
* ``--ip_config`` specifies the IP configuration file of the cluster.
* ``--workspace`` specifies the directory in the training machines where all data related
to distributed training are stored.
* ``--rel_data_path`` specifies the relative path under the workspace directory where
the partitioned data will be stored.
* ``--script_folder`` specifies the relative path under the workspace directory where
user's training scripts are stored.
**Note**: *copy_files.py* finds the right machine to store a partition based on the IP
configuration file. Therefore, the same IP configuration file should be used by copy_files.py
and launch.py.
DGL provides tools/launch.py to launch a distributed training job in a cluster.
This script makes the following assumptions:
* The partitioned data and the training script have been copied to the cluster or
a global storage (e.g., NFS) accessible to all machines in the cluster.
* The master machine (where the launch script is executed) has passwordless ssh access
to all other machines.
**Note**: The launch script has to be invoked on one of the machines in the cluster.
* The partitioned data and the training script have been provisioned to the cluster or
a shared storage (e.g., NFS) accessible to all the worker machines.
* The machine that invokes ``launch.py`` has passwordless ssh access
to all other machines. The launching machine must be one of the worker machines.
Below shows an example of launching a distributed training job in a cluster.
.. code:: none
.. code:: bash
python3 tools/launch.py \
--workspace ~graphsage/ \
--num_trainers 2 \
--num_samplers 4 \
--num_servers 1 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
"python3 code/train_dist.py --graph-name ogb-product --ip_config ip_config.txt --num-epochs 5 --batch-size 1000 --lr 0.1"
python3 tools/launch.py \
--workspace /my/workspace/ \
--num_trainers 2 \
--num_samplers 4 \
--num_servers 1 \
--part_config data/mygraph.json \
--ip_config ip_config.txt \
"python3 my_train_script.py"
The configuration file *ip_config.txt* contains the IP addresses of the machines in a cluster.
A typical example of *ip_config.txt* is as follows:
The arguments specify the workspace path, where to find the partition metadata JSON
and the machine IP configuration, and how many trainer, sampler, and server processes to launch
on each machine. The last argument is the command to run, which is usually the
model training/evaluation script.
Each line of ``ip_config.txt`` is the IP address of a machine in the cluster.
Optionally, the IP address can be followed by a network port (default is ``30050``).
A typical example is as follows:
.. code:: none
......@@ -62,61 +41,70 @@ A typical example of *ip_config.txt* is as follows:
172.31.29.175
172.31.16.98
Each row is an IP address of a machine. Optionally, the IP address can be followed by a port
used for network communication between trainers. When the port is not
provided, the default port ``30050`` is used.
The workspace specified in the launch script is the working directory in the
machines, which contains the training script, the IP configuration file, the
partition configuration file as well as the graph partitions. All paths of the
files should be specified as relative paths to the workspace.
The launch script creates a specified number of training jobs
(``--num_trainers``) on each machine. In addition, users need to specify the
number of sampler processes for each trainer (``--num_samplers``).
The workspace specified in the launch script is the working directory in the machines,
which contains the training script, the IP configuration file, the partition configuration
file as well as the graph partitions. All paths of the files should be specified as relative
paths to the workspace.
Launching a Persistent Graph Server
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The launch script creates a specified number of training jobs (``--num_trainers``) on each machine.
In addition, a user needs to specify the number of sampler processes for each trainer
(``--num_samplers``). The number of sampler processes has to match the number of worker processes
specified in :func:`~dgl.distributed.initialize`.
.. warning::
It is common that users may want to try different models or training configurations
against the same graph data. To avoid repetitively loading the same graph data, DGL
Persistent graph server is an experimental feature. It is only available
when the ``net_type`` argument of :func:`dgl.distributed.initialize`
is ``"tensorpipe"``.
Normally, all the server and trainer processes will be killed after the training is done.
However, sometimes users may wish to try out different models or training configurations
against the *same* graph data. Repetitively loading the same graph data
could be costly. To avoid that, DGL
allows users to launch a persistent graph server to be shared across multiple training
jobs. A persistent graph server will stay alive even after all training workers have
finished and exited. Below shows an example of launching a persistent graph server:
We first launch the graph server together with the first group of training workers.
.. code:: none
.. code:: bash
python3 tools/launch.py \
--workspace ~graphsage/ \
--num_trainers 2 \
--num_samplers 4 \
--num_servers 1 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
--keep_alive \
--server_name long_live \
"python3 code/train_dist.py --graph-name ogb-product --ip_config ip_config.txt --num-epochs 5 --batch-size 1000 --lr 0.1"
python3 tools/launch.py \
--workspace /my/workspace/ \
--num_trainers 2 \
--num_samplers 4 \
--num_servers 1 \
--part_config data/mygraph.json \
--ip_config ip_config.txt \
--keep_alive \
--server_name long_live \
"python3 my_train_script.py"
Pay attention to the ``--keep_alive`` option, which indicates the server should
stay alive after workers have finished. ``--server_name`` is the given name of
the server, which will be referred to when launching new training jobs.
Launch another group of distributed training job and connect to the existing persistent server.
Then launch trainers as normal; they will automatically connect to the existing
persistent server.
.. code:: none
.. code:: bash
python3 tools/launch.py \
--workspace /my/workspace/ \
--num_trainers 2 \
--num_samplers 4 \
--num_servers 1 \
--part_config data/mygraph.json \
--ip_config ip_config.txt \
"python3 my_train_script.py"
There are several restrictions when using persistent graph servers:
python3 tools/launch.py \
--workspace ~graphsage/ \
--num_trainers 2 \
--num_samplers 4 \
--num_servers 1 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
--server_name long_live \
"python3 code/train_dist.py --graph-name ogb-product --ip_config ip_config.txt --num-epochs 5 --batch-size 1000 --lr 0.1"
.. note::
All the arguments for ``launch.py`` should be kept same as previous launch. And below
* All the arguments for ``launch.py`` should be kept the same as in the previous launch. The following
arguments of the training script should be kept the same as well: ``--graph-name``,
``--ip_config``. The rest of the arguments, such as ``--num-epochs``, ``--batch-size`` and so
on, are free to change.
``--ip_config``.
* There is no data consistency control on the server side, so data updates must be handled
carefully. For example, it is recommended to avoid having multiple groups of trainers
update node/edge embeddings at the same time.
......@@ -5,6 +5,10 @@ Chapter 7: Distributed Training
:ref:`(中文版) <guide_cn-distributed>`
.. note::
Distributed training is only available for the PyTorch backend.
DGL adopts a fully distributed approach that distributes both data and computation
across a collection of computation resources. In the context of this section, we
will assume a cluster setting (i.e., a group of machines). DGL partitions a graph
......@@ -15,8 +19,8 @@ the computation and runs servers on the same machines to serve partitioned data
For the training script, DGL provides distributed APIs that are similar to the ones for
mini-batch training. This makes distributed training require only small code modifications
from mini-batch training on a single machine. Below shows an example of training GraphSage
in a distributed fashion. The only code modifications are located on line 4-7:
1) initialize DGL's distributed module, 2) create a distributed graph object, and
in a distributed fashion. The notable code modifications are:
1) initialization of DGL's distributed module, 2) create a distributed graph object, and
3) split the training set and calculate the nodes for the local process.
The rest of the code, including sampler creation, model definition, training loops
are the same as :ref:`mini-batch training <guide-minibatch>`.
......@@ -24,13 +28,18 @@ are the same as :ref:`mini-batch training <guide-minibatch>`.
.. code:: python
import dgl
from dgl.dataloading import NeighborSampler
from dgl.distributed import DistGraph, DistDataLoader, node_split
import torch as th
# initialize distributed contexts
dgl.distributed.initialize('ip_config.txt')
th.distributed.init_process_group(backend='gloo')
g = dgl.distributed.DistGraph('graph_name', 'part_config.json')
# load distributed graph
g = DistGraph('graph_name', 'part_config.json')
pb = g.get_partition_book()
train_nid = dgl.distributed.node_split(g.ndata['train_mask'], pb, force_even=True)
# get training workload, i.e., training node IDs
train_nid = node_split(g.ndata['train_mask'], pb, force_even=True)
# Create sampler
......@@ -63,11 +72,6 @@ are the same as :ref:`mini-batch training <guide-minibatch>`.
loss.backward()
optimizer.step()
When running the training script in a cluster of machines, DGL provides tools to copy data
to the cluster's machines and launch the training job on all machines.
**Note**: The current distributed training API only supports the Pytorch backend.
DGL implements a few distributed components to support distributed training. The figure below
shows the components and their interactions.
......@@ -77,28 +81,35 @@ shows the components and their interactions.
Specifically, DGL's distributed training has three types of interacting processes:
*server*, *sampler* and *trainer*.
* Server processes run on each machine that stores a graph partition
(this includes the graph structure and node/edge features). These servers
work together to serve the graph data to trainers. Note that one machine may run
multiple server processes simultaneously to parallelize computation as well as
network communication.
* Sampler processes interact with the servers and sample nodes and edges to
* **Servers** store graph partitions, which include both the graph structure and node/edge
features. They provide services such as sampling and getting or updating node/edge
features. Note that each machine may run multiple server processes simultaneously
to increase service throughput. One of them is the *main server*, which is in charge of
loading the data and sharing it via shared memory with the *backup servers* that serve
requests.
* **Sampler processes** interact with the servers and sample nodes and edges to
generate mini-batches for training.
* Trainers contain multiple classes to interact with servers. It has
:class:`~dgl.distributed.DistGraph` to get access to partitioned graph data and has
* **Trainers** are in charge of training networks on mini-batches. They utilize
APIs such as :class:`~dgl.distributed.DistGraph` to access partitioned graph data,
:class:`~dgl.distributed.DistEmbedding` and :class:`~dgl.distributed.DistTensor` to access
the node/edge features/embeddings. It has
:class:`~dgl.distributed.dist_dataloader.DistDataLoader` to
interact with samplers to get mini-batches.
node/edge features/embeddings and :class:`~dgl.distributed.DistDataLoader` to interact
with samplers to get mini-batches. Trainers communicate gradients among each other
using PyTorch's native ``DistributedDataParallel`` paradigm.
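For example, a trainer script typically wraps its model as below (a minimal sketch; it assumes
``torch.distributed.init_process_group`` has already been called as in the example above):

.. code:: python

    import torch as th

    # Average gradients across all trainer processes during backward().
    model = th.nn.parallel.DistributedDataParallel(model)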
Besides Python APIs, DGL also provides `tools <https://github.com/dmlc/dgl/tree/master/tools>`__
for provisioning graph data and processes to the entire cluster.
Having these distributed components in mind, the rest of this chapter covers
the following topics:
* :ref:`guide-distributed-preprocessing`
* :ref:`guide-distributed-tools`
* :ref:`guide-distributed-apis`
For more advanced users who are interested in more details:
* :ref:`guide-distributed-partition`
* :ref:`guide-distributed-hetero`
* :ref:`guide-distributed-tools`
.. toctree::
:maxdepth: 1
......@@ -106,6 +117,7 @@ the following distributed components:
:glob:
distributed-preprocessing
distributed-tools
distributed-apis
distributed-partition
distributed-hetero
distributed-tools
"""DGL distributed module contains classes and functions to support
distributed graph neural network training and inference in a cluster of
machines.
This includes a few submodules:
* distributed data structures including distributed graph, distributed tensor
and distributed embeddings.
* distributed sampling.
* distributed workload split at runtime.
* graph partition.
"""
import os
import sys
"""DGL distributed module"""
from .dist_graph import DistGraphServer, DistGraph, node_split, edge_split
from .dist_tensor import DistTensor
from .partition import partition_graph, load_partition, load_partition_feats, load_partition_book
......@@ -28,4 +13,4 @@ from .dist_context import initialize, exit_client
from .kvstore import KVServer, KVClient
from .server_state import ServerState
from .dist_dataloader import DistDataLoader
from .graph_services import sample_neighbors, sample_etype_neighbors, in_subgraph
from .graph_services import *
......@@ -12,7 +12,10 @@ from ..base import NID, EID
from ..utils import toindex
from .. import backend as F
__all__ = ['sample_neighbors', 'in_subgraph', 'find_edges']
__all__ = [
'sample_neighbors', 'sample_etype_neighbors',
'in_subgraph', 'find_edges'
]
SAMPLING_SERVICE_ID = 6657
INSUBGRAPH_SERVICE_ID = 6658
......
......@@ -173,7 +173,7 @@ def load_partition_feats(part_config, part_id):
return node_feats, edge_feats
def load_partition_book(part_config, part_id, graph=None):
''' Load a graph partition book from the partition config file.
'''Load a graph partition book from the partition config file.
Parameters
----------
......