"...git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "f57e7bd92c57bf59cba45496fc85ea77797bedcc"
Unverified Commit 311bd88a authored by Rhett Ying, committed by GitHub

[Doc] update doc page for distributed partition pipeline (#5064)

* [Doc] update doc page for distributed partition pipeline

* update
parent 3eac464a
...@@ -15,102 +15,6 @@ where the amount of network communication is proportional to the number of
cross-partition edges. DGL has integrated METIS as the default partitioning
algorithm in its :func:`dgl.distributed.partition_graph` API.
Load balancing
~~~~~~~~~~~~~~~~
When partitioning a graph, by default, METIS only balances the number of nodes
in each partition. This can result in a suboptimal configuration, depending on
the task at hand. For example, in the case of semi-supervised node
classification, a trainer performs computation on a subset of labeled nodes in
a local partition. A partitioning that only balances the nodes in a graph (both
labeled and unlabeled) may end up with a computational load imbalance. To get a
balanced workload in each partition, the partition API allows balancing between
partitions with respect to the number of nodes of each node type, by specifying
``balance_ntypes`` in :func:`~dgl.distributed.partition_graph`. Users can take
advantage of this and treat nodes in the training set, validation set, and
test set as different node types.
The following example treats nodes inside and outside the training set as two
node types:
.. code:: python
dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test', balance_ntypes=g.ndata['train_mask'])
In addition to balancing the node types,
:func:`dgl.distributed.partition_graph` also allows balancing the
in-degrees of nodes of different node types by specifying ``balance_edges``.
This balances the number of edges incident to nodes of different types.
ID mapping
~~~~~~~~~~~~~
After partitioning, :func:`~dgl.distributed.partition_graph` remaps node
and edge IDs so that nodes of the same partition are arranged together
(in a consecutive ID range), making it easier to store partitioned node/edge
features. The API also automatically shuffles the node/edge features
according to the new IDs. However, some downstream tasks may want to
recover the original node/edge IDs (such as extracting the computed node
embeddings for later use). For such cases, pass ``return_mapping=True``
to :func:`~dgl.distributed.partition_graph`, which makes the API return
the ID mappings between the remapped node/edge IDs and their original ones.
For a homogeneous graph, it returns two vectors. The first vector maps every new
node ID to its original ID; the second vector maps every new edge ID to
its original ID. For a heterogeneous graph, it returns two dictionaries of
vectors. The first dictionary contains the mapping for each node type; the
second dictionary contains the mapping for each edge type.
.. code:: python
node_map, edge_map = dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
balance_ntypes=g.ndata['train_mask'],
return_mapping=True)
# Let's assume that node_emb is saved from the distributed training.
orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
orig_node_emb[node_map] = node_emb
As a counterpart of ``return_mapping=True`` in :func:`~dgl.distributed.partition_graph`, the
:ref:`distributed partitioning pipeline <guide-distributed-preprocessing>`
provides two arguments in ``dispatch_data.py`` to save the original node/edge IDs to disk.
* ``--save-orig-nids``: save the original node IDs to files.
* ``--save-orig-eids``: save the original edge IDs to files.
Specifying the two options will create two files ``orig_nids.dgl`` and ``orig_eids.dgl``
under each partition folder.
.. code-block:: none
data_root_dir/
|-- graph_name.json # partition configuration file in JSON
|-- part0/ # data for partition 0
| |-- orig_nids.dgl # original node IDs
| |-- orig_eids.dgl # original edge IDs
| |-- ... # other data such as graph and node/edge feats
|
|-- part1/ # data for partition 1
| |-- orig_nids.dgl
| |-- orig_eids.dgl
| |-- ...
|
|-- ... # data for other partitions
The two files store the original IDs as a dictionary of tensors, where keys are node/edge
type names and values are ID tensors. Users can use the :func:`dgl.data.load_tensors`
utility to load them:
.. code:: python
# Load the original IDs for the nodes in partition 0.
orig_nids_0 = dgl.data.load_tensors('/path/to/data/part0/orig_nids.dgl')
# Get the original node IDs for node type 'user'
user_orig_nids_0 = orig_nids_0['user']
# Load the original IDs for the edges in partition 0.
orig_eids_0 = dgl.data.load_tensors('/path/to/data/part0/orig_eids.dgl')
# Get the original edge IDs for edge type 'like'
like_orig_eids_0 = orig_eids_0['like']
Output format
~~~~~~~~~~~~~~~~~~~~~~~~~~
...
...@@ -4,9 +4,18 @@
------------------------------------------
Before launching training jobs, DGL requires the input data to be partitioned
and distributed to the target machines. In order to handle different scales
of graphs, DGL provides two partitioning approaches:
* A partitioning API for graphs that can fit in a single machine's memory.
* A distributed partition pipeline for graphs beyond a single machine's capacity.
7.1.1 Partitioning API
^^^^^^^^^^^^^^^^^^^^^^
For relatively small graphs, DGL provides a partitioning API
:func:`~dgl.distributed.partition_graph` that partitions
an in-memory :class:`~dgl.DGLGraph` object. It supports
multiple partitioning algorithms such as random partitioning and
`Metis <http://glaros.dtc.umn.edu/gkhome/views/metis>`__.
The benefit of Metis partitioning is that it can generate partitions with
...@@ -49,8 +58,97 @@ CPU RAM to hold the entire graph structure and features, which may not be viable
graphs with hundreds of billions of edges or large features. We describe how to use
the *parallel data preparation pipeline* for such cases next.
Load balancing
~~~~~~~~~~~~~~
When partitioning a graph, by default, METIS only balances the number of nodes
in each partition. This can result in a suboptimal configuration, depending on
the task at hand. For example, in the case of semi-supervised node
classification, a trainer performs computation on a subset of labeled nodes in
a local partition. A partitioning that only balances the nodes in a graph (both
labeled and unlabeled) may end up with a computational load imbalance. To get a
balanced workload in each partition, the partition API allows balancing between
partitions with respect to the number of nodes of each node type, by specifying
``balance_ntypes`` in :func:`~dgl.distributed.partition_graph`. Users can take
advantage of this and treat nodes in the training set, validation set, and
test set as different node types.
The following example treats nodes inside and outside the training set as two
node types:
.. code:: python
dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test', balance_ntypes=g.ndata['train_mask'])
In addition to balancing the node types,
:func:`dgl.distributed.partition_graph` also allows balancing the
in-degrees of nodes of different node types by specifying ``balance_edges``.
This balances the number of edges incident to nodes of different types.
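For instance, a minimal sketch that combines both balancing options in a single call (reusing the graph and output path from the example above):

.. code:: python

   dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
                                   balance_ntypes=g.ndata['train_mask'],
                                   balance_edges=True)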
ID mapping
~~~~~~~~~~~~~
After partitioning, :func:`~dgl.distributed.partition_graph` remaps node
and edge IDs so that nodes of the same partition are arranged together
(in a consecutive ID range), making it easier to store partitioned node/edge
features. The API also automatically shuffles the node/edge features
according to the new IDs. However, some downstream tasks may want to
recover the original node/edge IDs (such as extracting the computed node
embeddings for later use). For such cases, pass ``return_mapping=True``
to :func:`~dgl.distributed.partition_graph`, which makes the API return
the ID mappings between the remapped node/edge IDs and their original ones.
For a homogeneous graph, it returns two vectors. The first vector maps every new
node ID to its original ID; the second vector maps every new edge ID to
its original ID. For a heterogeneous graph, it returns two dictionaries of
vectors. The first dictionary contains the mapping for each node type; the
second dictionary contains the mapping for each edge type.
.. code:: python
node_map, edge_map = dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
balance_ntypes=g.ndata['train_mask'],
return_mapping=True)
# Let's assume that node_emb is saved from the distributed training.
orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
orig_node_emb[node_map] = node_emb
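For a heterogeneous graph, the returned ``node_map`` and ``edge_map`` are dictionaries instead; a brief sketch using a hypothetical node type ``'user'``:

.. code:: python

   # For heterogeneous graphs, the mappings are dictionaries of tensors keyed
   # by node/edge type: node_map['user'][new_id] gives the original 'user' ID.
   user_orig_nids = node_map['user']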
Load partitioned graphs
^^^^^^^^^^^^^^^^^^^^^^^
DGL provides a :func:`dgl.distributed.load_partition` function to load one partition
for inspection.
.. code:: python
>>> import dgl
>>> # load partition 0
>>> part_data = dgl.distributed.load_partition('data_root_dir/graph_name.json', 0)
>>> g, nfeat, efeat, partition_book, graph_name, ntypes, etypes = part_data # unpack
>>> print(g)
Graph(num_nodes=966043, num_edges=34270118,
ndata_schemes={'orig_id': Scheme(shape=(), dtype=torch.int64),
'part_id': Scheme(shape=(), dtype=torch.int64),
'_ID': Scheme(shape=(), dtype=torch.int64),
'inner_node': Scheme(shape=(), dtype=torch.int32)}
edata_schemes={'_ID': Scheme(shape=(), dtype=torch.int64),
'inner_edge': Scheme(shape=(), dtype=torch.int8),
'orig_id': Scheme(shape=(), dtype=torch.int64)})
As mentioned in the `ID mapping`_ section, each partition carries auxiliary information
saved as ndata or edata, such as the original node/edge IDs, partition IDs, etc. Each partition
not only stores the nodes/edges it owns, but also includes nodes/edges that are adjacent to
the partition (called **HALO** nodes/edges). The ``inner_node`` and ``inner_edge`` fields
indicate whether a node/edge truly belongs to the partition (value ``True``)
or is a HALO node/edge (value ``False``).
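For example, a minimal sketch (continuing the ``load_partition`` example above) that keeps only the nodes this partition truly owns by masking with ``inner_node``:

.. code:: python

   # Boolean mask over local nodes: True for owned nodes, False for HALO nodes.
   inner_mask = g.ndata['inner_node'].bool()
   # Original IDs of the nodes owned by this partition.
   owned_orig_nids = g.ndata['orig_id'][inner_mask]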
The :func:`~dgl.distributed.load_partition` function loads all data at once. To load only
the node/edge features or only the partition book, use the :func:`dgl.distributed.load_partition_feats`
and :func:`dgl.distributed.load_partition_book` APIs, respectively.
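For example, a minimal sketch of loading only the features or only the partition book of partition 0 (assuming the return values shown below, which mirror the tuple returned by :func:`~dgl.distributed.load_partition`):

.. code:: python

   # Load only the node/edge features of partition 0.
   node_feats, edge_feats = dgl.distributed.load_partition_feats(
       'data_root_dir/graph_name.json', 0)
   # Load only the partition book plus graph metadata of partition 0.
   gpb, graph_name, ntypes, etypes = dgl.distributed.load_partition_book(
       'data_root_dir/graph_name.json', 0)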
7.1.2 Distributed Graph Partitioning Pipeline
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To handle massive graph data that cannot fit in the CPU RAM of a
single machine, DGL utilizes data chunking and parallel processing to reduce
...@@ -250,10 +348,12 @@ strict requirement as long as ``metadata.json`` contains valid file paths.
* ``edges``: Dict of ``ChunkFileSpec``. Edge index files.
  Dictionary keys are edge type names in the form of
  ``<source node type>:<relation>:<destination node type>``.
* ``node_data``: Dict of ``ChunkFileSpec``. Data files that store node attributes;
  they may consist of an arbitrary number of files regardless of ``num_parts``.
  Dictionary keys are node type names.
* ``edge_data``: Dict of ``ChunkFileSpec``. Data files that store edge attributes;
  they may consist of an arbitrary number of files regardless of ``num_parts``.
  Dictionary keys are edge type names in the form of
  ``<source node type>:<relation>:<destination node type>``.
``ChunkFileSpec`` has two keys:
...@@ -262,6 +362,7 @@ strict requirement as long as ``metadata.json`` contains valid file paths.
  details about how to parse each data file.

  - ``"csv"``: CSV file. Use the ``delimiter`` key to specify the delimiter in use.
  - ``"numpy"``: NumPy array binary file created by :func:`numpy.save`.
  - ``"parquet"``: Parquet table binary file created by :func:`pyarrow.parquet.write_table`.

* ``data``: List of string. File path to each data chunk. Absolute paths are supported
  (see the sketch after this list).
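For illustration, here is a hypothetical ``ChunkFileSpec`` for an attribute stored as two CSV chunks, written as the Python dict it would parse into (the file paths are placeholders, and the exact nesting of the ``format`` entry is an assumption):

.. code:: python

   chunk_file_spec = {
       # How to parse each chunk; 'delimiter' applies to the 'csv' format.
       # The 'name'/'delimiter' nesting shown here is assumed.
       'format': {'name': 'csv', 'delimiter': ','},
       # One path per data chunk; absolute paths are supported.
       'data': [
           '/path/to/chunks/user-feat-part0.csv',
           '/path/to/chunks/user-feat-part1.csv',
       ],
   }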
Tips for making chunked graph data
...@@ -325,10 +426,6 @@ the partition ID of node i.
0
...
.. note::
DGL currently requires the number of data chunks and the number of partitions to be the same.
Despite its simplicity, random partitioning may result in frequent
cross-machine communication. Check out chapter
:ref:`guide-distributed-partition` for more advanced options.
...@@ -361,8 +458,50 @@ An example IP configuration file is as follows:
172.31.19.1
172.31.23.205
As a counterpart of ``return_mapping=True`` in :func:`~dgl.distributed.partition_graph`, the
:ref:`distributed partitioning pipeline <guide-distributed-preprocessing>`
provides two arguments in ``dispatch_data.py`` to save the original node/edge IDs to disk.
* ``--save-orig-nids``: save the original node IDs to files.
* ``--save-orig-eids``: save the original edge IDs to files.
Specifying the two options will create two files ``orig_nids.dgl`` and ``orig_eids.dgl``
under each partition folder.
.. code-block:: none
data_root_dir/
|-- graph_name.json # partition configuration file in JSON
|-- part0/ # data for partition 0
| |-- orig_nids.dgl # original node IDs
| |-- orig_eids.dgl # original edge IDs
| |-- ... # other data such as graph and node/edge feats
|
|-- part1/ # data for partition 1
| |-- orig_nids.dgl
| |-- orig_eids.dgl
| |-- ...
|
|-- ... # data for other partitions
The two files store the original IDs as a dictionary of tensors, where keys are node/edge
type names and values are ID tensors. Users can use the :func:`dgl.data.load_tensors`
utility to load them:
.. code:: python
# Load the original IDs for the nodes in partition 0.
orig_nids_0 = dgl.data.load_tensors('/path/to/data/part0/orig_nids.dgl')
# Get the original node IDs for node type 'user'
user_orig_nids_0 = orig_nids_0['user']
# Load the original IDs for the edges in partition 0.
orig_eids_0 = dgl.data.load_tensors('/path/to/data/part0/orig_eids.dgl')
# Get the original edge IDs for edge type 'like'
like_orig_eids_0 = orig_eids_0['like']
During data dispatching, DGL assumes that the combined CPU RAM of the cluster
is able to hold the entire graph data. Node ownership is determined by the result
of the partitioning algorithm, whereas for edges, the owner of the destination node
also owns the edge.