Unverified Commit 2cf4bd0a authored by Minjie Wang's avatar Minjie Wang Committed by GitHub
Browse files

Merge branch 'master' into dist_part

parents 2e8ae9f9 d077d371
...@@ -114,29 +114,32 @@ def track_time(data): ...@@ -114,29 +114,32 @@ def track_time(data):
loss_fcn = loss_fcn.to(device) loss_fcn = loss_fcn.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr) optimizer = optim.Adam(model.parameters(), lr=lr)
# Training loop # Enable dataloader cpu affinitization for cpu devices (no effect on gpu)
avg = 0 with dataloader.enable_cpu_affinity():
iter_tput = [] # Loop over the dataloader to sample the computation dependency graph as a list of
# Loop over the dataloader to sample the computation dependency graph as a list of # blocks.
# blocks.
for step, (input_nodes, seeds, blocks) in enumerate(dataloader): # Training loop
# Load the input features as well as output labels avg = 0
blocks = [block.int().to(device) for block in blocks] iter_tput = []
batch_inputs = blocks[0].srcdata['features'] for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
batch_labels = blocks[-1].dstdata['labels'] # Load the input features as well as output labels
blocks = [block.int().to(device) for block in blocks]
batch_inputs = blocks[0].srcdata['features']
batch_labels = blocks[-1].dstdata['labels']
# Compute loss and prediction # Compute loss and prediction
batch_pred = model(blocks, batch_inputs) batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels) loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad() optimizer.zero_grad()
loss.backward() loss.backward()
optimizer.step() optimizer.step()
# start timer at before iter_start # start timer at before iter_start
if step == iter_start - 1: if step == iter_start - 1:
t0 = time.time() t0 = time.time()
elif step == iter_count + iter_start - 1: # time iter_count iterations elif step == iter_count + iter_start - 1: # time iter_count iterations
break break
t1 = time.time() t1 = time.time()
......
...@@ -288,24 +288,26 @@ def track_time(data): ...@@ -288,24 +288,26 @@ def track_time(data):
optimizer.zero_grad() optimizer.zero_grad()
sparse_optimizer.zero_grad() sparse_optimizer.zero_grad()
for step, (input_nodes, seeds, blocks) in enumerate(loader): # Enable dataloader cpu affinitization for cpu devices (no effect on gpu)
blocks = [blk.to(device) for blk in blocks] with loader.enable_cpu_affinity():
seeds = seeds[category] # we only predict the nodes with type "category" for step, (input_nodes, seeds, blocks) in enumerate(loader):
batch_tic = time.time() blocks = [blk.to(device) for blk in blocks]
emb = embed_layer(blocks[0]) seeds = seeds[category] # we only predict the nodes with type "category"
lbl = labels[seeds].to(device) batch_tic = time.time()
emb = {k : e.to(device) for k, e in emb.items()} emb = embed_layer(blocks[0])
logits = model(emb, blocks)[category] lbl = labels[seeds].to(device)
loss = F.cross_entropy(logits, lbl) emb = {k : e.to(device) for k, e in emb.items()}
loss.backward() logits = model(emb, blocks)[category]
optimizer.step() loss = F.cross_entropy(logits, lbl)
sparse_optimizer.step() loss.backward()
optimizer.step()
# start timer at before iter_start sparse_optimizer.step()
if step == iter_start - 1:
t0 = time.time() # start timer at before iter_start
elif step == iter_count + iter_start - 1: # time iter_count iterations if step == iter_start - 1:
break t0 = time.time()
elif step == iter_count + iter_start - 1: # time iter_count iterations
break
t1 = time.time() t1 = time.time()
......
...@@ -283,27 +283,29 @@ def track_time(data): ...@@ -283,27 +283,29 @@ def track_time(data):
model.train() model.train()
embed_layer.train() embed_layer.train()
for step, sample_data in enumerate(loader): # Enable dataloader cpu affinitization for cpu devices (no effect on gpu)
input_nodes, output_nodes, blocks = sample_data with loader.enable_cpu_affinity():
feats = embed_layer(input_nodes, for step, sample_data in enumerate(loader):
blocks[0].srcdata['ntype'], input_nodes, output_nodes, blocks = sample_data
blocks[0].srcdata['type_id'], feats = embed_layer(input_nodes,
node_feats) blocks[0].srcdata['ntype'],
logits = model(blocks, feats) blocks[0].srcdata['type_id'],
seed_idx = blocks[-1].dstdata['type_id'] node_feats)
loss = F.cross_entropy(logits, labels[seed_idx]) logits = model(blocks, feats)
optimizer.zero_grad() seed_idx = blocks[-1].dstdata['type_id']
emb_optimizer.zero_grad() loss = F.cross_entropy(logits, labels[seed_idx])
optimizer.zero_grad()
emb_optimizer.zero_grad()
loss.backward() loss.backward()
optimizer.step() optimizer.step()
emb_optimizer.step() emb_optimizer.step()
# start timer at before iter_start # start timer at before iter_start
if step == iter_start - 1: if step == iter_start - 1:
t0 = time.time() t0 = time.time()
elif step == iter_count + iter_start - 1: # time iter_count iterations elif step == iter_count + iter_start - 1: # time iter_count iterations
break break
t1 = time.time() t1 = time.time()
......
...@@ -93,29 +93,32 @@ def track_time(data): ...@@ -93,29 +93,32 @@ def track_time(data):
loss_fcn = nn.CrossEntropyLoss() loss_fcn = nn.CrossEntropyLoss()
loss_fcn = loss_fcn.to(device) loss_fcn = loss_fcn.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr) optimizer = optim.Adam(model.parameters(), lr=lr)
# Enable dataloader cpu affinitization for cpu devices (no effect on gpu)
with dataloader.enable_cpu_affinity():
# Training loop
avg = 0
iter_tput = []
# Training loop for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
avg = 0 # Load the input features as well as output labels
iter_tput = [] #batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device)
for step, (input_nodes, seeds, blocks) in enumerate(dataloader): blocks = [block.int().to(device) for block in blocks]
# Load the input features as well as output labels batch_inputs = blocks[0].srcdata['features']
#batch_inputs, batch_labels = load_subtensor(g, seeds, input_nodes, device) batch_labels = blocks[-1].dstdata['labels']
blocks = [block.int().to(device) for block in blocks]
batch_inputs = blocks[0].srcdata['features']
batch_labels = blocks[-1].dstdata['labels']
# Compute loss and prediction # Compute loss and prediction
batch_pred = model(blocks, batch_inputs) batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels) loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad() optimizer.zero_grad()
loss.backward() loss.backward()
optimizer.step() optimizer.step()
# start timer at before iter_start # start timer at before iter_start
if step == iter_start - 1: if step == iter_start - 1:
t0 = time.time() t0 = time.time()
elif step == iter_count + iter_start - 1: # time iter_count iterations elif step == iter_count + iter_start - 1: # time iter_count iterations
break break
t1 = time.time() t1 = time.time()
......
...@@ -3,7 +3,20 @@ ...@@ -3,7 +3,20 @@
dgl.distributed dgl.distributed
================================= =================================
.. automodule:: dgl.distributed .. currentmodule:: dgl.distributed
DGL distributed module contains classes and functions to support
distributed Graph Neural Network training and inference on a cluster of
machines.
This includes a few submodules:
* distributed data structures including distributed graph, distributed tensor
and distributed embeddings.
* distributed sampling.
* distributed workload split at runtime.
* graph partition.
Initialization Initialization
--------------- ---------------
...@@ -27,26 +40,22 @@ Distributed Tensor ...@@ -27,26 +40,22 @@ Distributed Tensor
Distributed Node Embedding Distributed Node Embedding
--------------------- ---------------------
.. currentmodule:: dgl.distributed
.. autoclass:: DistEmbedding .. autoclass:: DistEmbedding
Distributed embedding optimizer Distributed embedding optimizer
------------------------- -------------------------
.. currentmodule:: dgl.distributed.optim.pytorch
.. autoclass:: SparseAdagrad .. autoclass:: dgl.distributed.optim.SparseAdagrad
:members: step :members: step
.. autoclass:: SparseAdam .. autoclass:: dgl.distributed.optim.SparseAdam
:members: step :members: step
Distributed workload split Distributed workload split
-------------------------- --------------------------
.. currentmodule:: dgl.distributed.dist_graph
.. autosummary:: .. autosummary::
:toctree: ../../generated/ :toctree: ../../generated/
...@@ -59,19 +68,17 @@ Distributed Sampling ...@@ -59,19 +68,17 @@ Distributed Sampling
Distributed DataLoader Distributed DataLoader
`````````````````````` ``````````````````````
.. currentmodule:: dgl.distributed.dist_dataloader
.. autoclass:: DistDataLoader .. autoclass:: DistDataLoader
Distributed Neighbor Sampling .. _api-distributed-sampling-ops:
````````````````````````````` Distributed Graph Sampling Operators
```````````````````````````````````````
.. currentmodule:: dgl.distributed.graph_services
.. autosummary:: .. autosummary::
:toctree: ../../generated/ :toctree: ../../generated/
sample_neighbors sample_neighbors
sample_etype_neighbors
find_edges find_edges
in_subgraph in_subgraph
...@@ -81,18 +88,14 @@ Partition ...@@ -81,18 +88,14 @@ Partition
Graph partition book Graph partition book
```````````````````` ````````````````````
.. currentmodule:: dgl.distributed.graph_partition_book
.. autoclass:: GraphPartitionBook .. autoclass:: GraphPartitionBook
:members: shared_memory, num_partitions, metadata, nid2partid, eid2partid, partid2nids, partid2eids, nid2localnid, eid2localeid, partid, map_to_per_ntype, map_to_per_etype, map_to_homo_nid, map_to_homo_eid :members: shared_memory, num_partitions, metadata, nid2partid, eid2partid, partid2nids, partid2eids, nid2localnid, eid2localeid, partid, map_to_per_ntype, map_to_per_etype, map_to_homo_nid, map_to_homo_eid, canonical_etypes
.. autoclass:: PartitionPolicy .. autoclass:: PartitionPolicy
:members: policy_str, part_id, partition_book, to_local, to_partid, get_part_size, get_size :members: policy_str, part_id, partition_book, to_local, to_partid, get_part_size, get_size
Split and Load Graphs Split and Load Partitions
````````````````````` ````````````````````````````
.. currentmodule:: dgl.distributed.partition
.. autosummary:: .. autosummary::
:toctree: ../../generated/ :toctree: ../../generated/
...@@ -101,4 +104,3 @@ Split and Load Graphs ...@@ -101,4 +104,3 @@ Split and Load Graphs
load_partition_feats load_partition_feats
load_partition_book load_partition_book
partition_graph partition_graph
.. _guide-distributed-apis: .. _guide-distributed-apis:
7.2 Distributed APIs 7.3 Programming APIs
-------------------- -----------------------------------
:ref:`(中文版) <guide_cn-distributed-apis>` :ref:`(中文版) <guide_cn-distributed-apis>`
This section covers the distributed APIs used in the training script. DGL provides three distributed This section covers the core python components commonly used in a training script. DGL
data structures and various APIs for initialization, distributed sampling and workload split. provides three distributed data structures and various APIs for initialization,
For distributed training/inference, DGL provides three distributed data structures: distributed sampling and workload split.
:class:`~dgl.distributed.DistGraph` for distributed graphs, :class:`~dgl.distributed.DistTensor` for
distributed tensors and :class:`~dgl.distributed.DistEmbedding` for distributed learnable embeddings. * :class:`~dgl.distributed.DistGraph` for accessing structure and feature of a distributedly
stored graph.
* :class:`~dgl.distributed.DistTensor` for accessing node/edge feature tensor that
is partitioned across machines.
* :class:`~dgl.distributed.DistEmbedding` for accessing learnable node/edge embedding
tensor that is partitioned across machines.
Initialization of the DGL distributed module Initialization of the DGL distributed module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:func:`~dgl.distributed.initialize` initializes the distributed module. When the training script runs :func:`dgl.distributed.initialize` initializes the distributed module. If invoked
in the trainer mode, this API builds connections with DGL servers and creates sampler processes; by a trainer, this API creates sampler processes and builds connections with graph
when the script runs in the server mode, this API runs the server code and never returns. This API servers; if invoked by graph server, this API starts a service loop to listen to
has to be called before any of DGL's distributed APIs. When working with Pytorch, trainer/sampler requests. The API *must* be called before
:func:`~dgl.distributed.initialize` has to be invoked before ``torch.distributed.init_process_group``. :func:`torch.distributed.init_process_group` and any other ``dgl.distributed`` APIs
Typically, the initialization APIs should be invoked in the following order: as shown in the order below:
.. code:: python .. code:: python
dgl.distributed.initialize('ip_config.txt') dgl.distributed.initialize('ip_config.txt')
th.distributed.init_process_group(backend='gloo') th.distributed.init_process_group(backend='gloo')
**Note**: If the training script contains user-defined functions (UDFs) that have to be invoked on .. note::
the servers (see the section of DistTensor and DistEmbedding for more details), these UDFs have to
be declared before :func:`~dgl.distributed.initialize`. If the training script contains user-defined functions (UDFs) that have to be invoked on
the servers (see the section of DistTensor and DistEmbedding for more details), these UDFs have to
be declared before :func:`~dgl.distributed.initialize`.
Distributed graph Distributed graph
~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~
:class:`~dgl.distributed.DistGraph` is a Python class to access the graph structure and node/edge features :class:`~dgl.distributed.DistGraph` is a Python class to access the graph
in a cluster of machines. Each machine is responsible for one and only one partition. It loads structure and node/edge features in a cluster of machines. Each machine is
the partition data (the graph structure and the node data and edge data in the partition) and makes responsible for one and only one partition. It loads the partition data (the
it accessible to all trainers in the cluster. :class:`~dgl.distributed.DistGraph` provides a small subset graph structure and the node data and edge data in the partition) and makes it
of :class:`~dgl.DGLGraph` APIs for data access. accessible to all trainers in the cluster. :class:`~dgl.distributed.DistGraph`
provides a small subset of :class:`~dgl.DGLGraph` APIs for data access.
**Note**: :class:`~dgl.distributed.DistGraph` currently only supports graphs of one node type and one edge type.
Distributed mode vs. standalone mode Distributed mode vs. standalone mode
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
:class:`~dgl.distributed.DistGraph` can run in two modes: distributed mode and standalone mode. :class:`~dgl.distributed.DistGraph` can run in two modes: *distributed mode* and *standalone mode*.
When a user executes a training script in a Python command line or Jupyter Notebook, it runs in When a user executes a training script in a Python command line or Jupyter Notebook, it runs in
a standalone mode. That is, it runs all computation in a single process and does not communicate a standalone mode. That is, it runs all computation in a single process and does not communicate
with any other processes. Thus, the standalone mode requires the input graph to have only one partition. with any other processes. Thus, the standalone mode requires the input graph to have only one partition.
...@@ -58,32 +64,36 @@ of machines and access them through the network. ...@@ -58,32 +64,36 @@ of machines and access them through the network.
DistGraph creation DistGraph creation
^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^
In the distributed mode, the creation of :class:`~dgl.distributed.DistGraph` requires the graph name used In the distributed mode, the creation of :class:`~dgl.distributed.DistGraph`
during graph partitioning. The graph name identifies the graph loaded in the cluster. requires the graph name given during graph partitioning. The graph name
identifies the graph loaded in the cluster.
.. code:: python .. code:: python
import dgl import dgl
g = dgl.distributed.DistGraph('graph_name') g = dgl.distributed.DistGraph('graph_name')
When running in the standalone mode, it loads the graph data in the local machine. Therefore, users need When running in the standalone mode, it loads the graph data in the local
to provide the partition configuration file, which contains all information about the input graph. machine. Therefore, users need to provide the partition configuration file,
which contains all information about the input graph.
.. code:: python .. code:: python
import dgl import dgl
g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json') g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json')
**Note**: In the current implementation, DGL only allows the creation of a single DistGraph object. The behavior .. note::
of destroying a DistGraph and creating a new one is undefined.
DGL only allows one single ``DistGraph`` object. The behavior
of destroying a DistGraph and creating a new one is undefined.
Access graph structure Accessing graph structure
^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
:class:`~dgl.distributed.DistGraph` provides a very small number of APIs to access the graph structure. :class:`~dgl.distributed.DistGraph` provides a set of APIs to
Currently, most APIs provide graph information, such as the number of nodes and edges. The main use case access the graph structure. Currently, most APIs provide graph information,
of DistGraph is to run sampling APIs to support mini-batch training (see the section of distributed such as the number of nodes and edges. The main use case of DistGraph is to run
graph sampling). sampling APIs to support mini-batch training (see `Distributed sampling`_).
.. code:: python .. code:: python
...@@ -124,8 +134,10 @@ in the cluster even if the :class:`~dgl.distributed.DistTensor` object disappear ...@@ -124,8 +134,10 @@ in the cluster even if the :class:`~dgl.distributed.DistTensor` object disappear
tensor = dgl.distributed.DistTensor((g.number_of_nodes(), 10), th.float32, name='test') tensor = dgl.distributed.DistTensor((g.number_of_nodes(), 10), th.float32, name='test')
**Note**: :class:`~dgl.distributed.DistTensor` creation is a synchronized operation. All trainers .. note::
have to invoke the creation and the creation succeeds only when all trainers call it.
:class:`~dgl.distributed.DistTensor` creation is a synchronized operation. All trainers
have to invoke the creation and the creation succeeds only when all trainers call it.
A user can add a :class:`~dgl.distributed.DistTensor` to a :class:`~dgl.distributed.DistGraph` A user can add a :class:`~dgl.distributed.DistTensor` to a :class:`~dgl.distributed.DistGraph`
object as one of the node data or edge data. object as one of the node data or edge data.
...@@ -134,13 +146,15 @@ object as one of the node data or edge data. ...@@ -134,13 +146,15 @@ object as one of the node data or edge data.
g.ndata['feat'] = tensor g.ndata['feat'] = tensor
**Note**: The node data name and the tensor name do not have to be the same. The former identifies .. note::
node data from :class:`~dgl.distributed.DistGraph` (in the trainer process) while the latter
identifies a distributed tensor in DGL servers. The node data name and the tensor name do not have to be the same. The former identifies
node data from :class:`~dgl.distributed.DistGraph` (in the trainer process) while the latter
identifies a distributed tensor in DGL servers.
:class:`~dgl.distributed.DistTensor` provides a small set of functions. It has the same APIs as :class:`~dgl.distributed.DistTensor` has the same APIs as
regular tensors to access its metadata, such as the shape and dtype. regular tensors to access its metadata, such as the shape and dtype. It also
:class:`~dgl.distributed.DistTensor` supports indexed reads and writes but does not support supports indexed reads and writes but does not support
computation operators, such as sum and mean. computation operators, such as sum and mean.
.. code:: python .. code:: python
...@@ -149,12 +163,16 @@ computation operators, such as sum and mean. ...@@ -149,12 +163,16 @@ computation operators, such as sum and mean.
print(data) print(data)
g.ndata['feat'][[3, 4, 5]] = data g.ndata['feat'][[3, 4, 5]] = data
**Note**: Currently, DGL does not provide protection for concurrent writes from multiple trainers
when a machine runs multiple servers. This may result in data corruption. One way to avoid concurrent .. note::
writes to the same row of data is to run one server process on a machine.
Currently, DGL does not provide protection for concurrent writes from
multiple trainers when a machine runs multiple servers. This may result in
data corruption. One way to avoid concurrent writes to the same row of data
is to run one server process on a machine.
Distributed DistEmbedding Distributed DistEmbedding
~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
DGL provides :class:`~dgl.distributed.DistEmbedding` to support transductive models that require DGL provides :class:`~dgl.distributed.DistEmbedding` to support transductive models that require
node embeddings. Creating distributed embeddings is very similar to creating distributed tensors. node embeddings. Creating distributed embeddings is very similar to creating distributed tensors.
...@@ -167,20 +185,25 @@ node embeddings. Creating distributed embeddings is very similar to creating dis ...@@ -167,20 +185,25 @@ node embeddings. Creating distributed embeddings is very similar to creating dis
return arr return arr
emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer) emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer)
Internally, distributed embeddings are built on top of distributed tensors, and, thus, has Internally, distributed embeddings are built on top of distributed tensors,
very similar behaviors to distributed tensors. For example, when embeddings are created, they and, thus, has very similar behaviors to distributed tensors. For example, when
are sharded and stored across all machines in the cluster. It can be uniquely identified by a name. embeddings are created, they are sharded and stored across all machines in the
cluster. It can be uniquely identified by a name.
.. note::
**Note**: The initializer function is invoked in the server process. Therefore, it has to be The initializer function is invoked in the server process. Therefore, it has to be
declared before :class:`~dgl.distributed.initialize`. declared before :class:`dgl.distributed.initialize`.
Because the embeddings are part of the model, a user has to attach them to an optimizer for Because the embeddings are part of the model, a user has to attach them to an
mini-batch training. Currently, DGL provides a sparse Adagrad optimizer optimizer for mini-batch training. Currently, DGL provides a sparse Adagrad
:class:`~dgl.distributed.SparseAdagrad` (DGL will add more optimizers for sparse embeddings later). optimizer :class:`~dgl.distributed.SparseAdagrad` (DGL will add more optimizers
Users need to collect all distributed embeddings from a model and pass them to the sparse optimizer. for sparse embeddings later). Users need to collect all distributed embeddings
If a model has both node embeddings and regular dense model parameters and users want to perform from a model and pass them to the sparse optimizer. If a model has both node
sparse updates on the embeddings, they need to create two optimizers, one for node embeddings and embeddings and regular dense model parameters and users want to perform sparse
the other for dense model parameters, as shown in the code below: updates on the embeddings, they need to create two optimizers, one for node
embeddings and the other for dense model parameters, as shown in the code
below:
.. code:: python .. code:: python
...@@ -192,31 +215,41 @@ the other for dense model parameters, as shown in the code below: ...@@ -192,31 +215,41 @@ the other for dense model parameters, as shown in the code below:
optimizer.step() optimizer.step()
sparse_optimizer.step() sparse_optimizer.step()
**Note**: :class:`~dgl.distributed.DistEmbedding` is not an Pytorch nn module, so we cannot .. note::
get access to it from parameters of a Pytorch nn module.
:class:`~dgl.distributed.DistEmbedding` does not inherit :class:`torch.nn.Module`,
so we recommend using it outside of your own NN module.
Distributed sampling Distributed sampling
~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~
DGL provides two levels of APIs for sampling nodes and edges to generate mini-batches DGL provides two levels of APIs for sampling nodes and edges to generate
(see the section of mini-batch training). The low-level APIs require users to write code mini-batches (see the section of mini-batch training). The low-level APIs
to explicitly define how a layer of nodes are sampled (e.g., using :func:`dgl.sampling.sample_neighbors` ). require users to write code to explicitly define how a layer of nodes are
The high-level sampling APIs implement a few popular sampling algorithms for node classification sampled (e.g., using :func:`dgl.sampling.sample_neighbors` ). The high-level
and link prediction tasks (e.g., :class:`~dgl.dataloading.NodeDataLoader` and sampling APIs implement a few popular sampling algorithms for node
classification and link prediction tasks (e.g.,
:class:`~dgl.dataloading.NodeDataLoader` and
:class:`~dgl.dataloading.EdgeDataLoader` ). :class:`~dgl.dataloading.EdgeDataLoader` ).
The distributed sampling module follows the same design and provides two levels of sampling APIs. The distributed sampling module follows the same design and provides two levels
For the lower-level sampling API, it provides :func:`~dgl.distributed.sample_neighbors` for of sampling APIs. For the lower-level sampling API, it provides
distributed neighborhood sampling on :class:`~dgl.distributed.DistGraph`. In addition, DGL provides :func:`~dgl.distributed.sample_neighbors` for distributed neighborhood sampling
a distributed DataLoader (:class:`~dgl.distributed.DistDataLoader` ) for distributed sampling. on :class:`~dgl.distributed.DistGraph`. In addition, DGL provides a distributed
The distributed DataLoader has the same interface as Pytorch DataLoader except that users cannot DataLoader (:class:`~dgl.distributed.DistDataLoader` ) for distributed
specify the number of worker processes when creating a dataloader. The worker processes are created sampling. The distributed DataLoader has the same interface as Pytorch
in :func:`dgl.distributed.initialize`. DataLoader except that users cannot specify the number of worker processes when
creating a dataloader. The worker processes are created in
**Note**: When running :func:`dgl.distributed.sample_neighbors` on :class:`~dgl.distributed.DistGraph`, :func:`dgl.distributed.initialize`.
the sampler cannot run in Pytorch DataLoader with multiple worker processes. The main reason is that
Pytorch DataLoader creates new sampling worker processes in every epoch, which leads to creating and .. note::
destroying :class:`~dgl.distributed.DistGraph` objects many times.
When running :func:`dgl.distributed.sample_neighbors` on
:class:`~dgl.distributed.DistGraph`, the sampler cannot run in Pytorch
DataLoader with multiple worker processes. The main reason is that Pytorch
DataLoader creates new sampling worker processes in every epoch, which
leads to creating and destroying :class:`~dgl.distributed.DistGraph`
objects many times.
When using the low-level API, the sampling code is similar to single-process sampling. The only When using the low-level API, the sampling code is similar to single-process sampling. The only
difference is that users need to use :func:`dgl.distributed.sample_neighbors` and difference is that users need to use :func:`dgl.distributed.sample_neighbors` and
...@@ -243,8 +276,8 @@ difference is that users need to use :func:`dgl.distributed.sample_neighbors` an ...@@ -243,8 +276,8 @@ difference is that users need to use :func:`dgl.distributed.sample_neighbors` an
The high-level sampling APIs (:class:`~dgl.dataloading.NodeDataLoader` and The high-level sampling APIs (:class:`~dgl.dataloading.NodeDataLoader` and
:class:`~dgl.dataloading.EdgeDataLoader` ) has distributed counterparts :class:`~dgl.dataloading.EdgeDataLoader` ) has distributed counterparts
(:class:`~dgl.dataloading.DistNodeDataLoader` and (:class:`~dgl.dataloading.DistNodeDataLoader` and
:class:`~dgl.dataloading.DistEdgeDataLoader`). The code is exactly the :class:`~dgl.dataloading.DistEdgeDataLoader`). The code is exactly the same as
same as single-process sampling otherwise. single-process sampling otherwise.
.. code:: python .. code:: python
...@@ -256,30 +289,33 @@ same as single-process sampling otherwise. ...@@ -256,30 +289,33 @@ same as single-process sampling otherwise.
Split workloads Split workloads
~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~
To train a model, users first need to split the dataset into training, validation and test sets. To train a model, users first need to split the dataset into training,
For distributed training, this step is usually done before we invoke :func:`dgl.distributed.partition_graph` validation and test sets. For distributed training, this step is usually done
to partition a graph. We recommend to store the data split in boolean arrays as node data or edge data. before we invoke :func:`dgl.distributed.partition_graph` to partition a graph.
For node classification tasks, the length of these boolean arrays is the number of nodes in a graph We recommend to store the data split in boolean arrays as node data or edge
and each of their elements indicates the existence of a node in a training/validation/test set. data. For node classification tasks, the length of these boolean arrays is the
Similar boolean arrays should be used for link prediction tasks. number of nodes in a graph and each of their elements indicates the existence
:func:`dgl.distributed.partition_graph` splits these boolean arrays (because they are stored as of a node in a training/validation/test set. Similar boolean arrays should be
the node data or edge data of the graph) based on the graph partitioning used for link prediction tasks. :func:`dgl.distributed.partition_graph` splits
result and store them with graph partitions. these boolean arrays (because they are stored as the node data or edge data of
the graph) based on the graph partitioning result and store them with graph
During distributed training, users need to assign training nodes/edges to each trainer. Similarly, partitions.
we also need to split the validation and test set in the same way.
DGL provides :func:`~dgl.distributed.node_split` and :func:`~dgl.distributed.edge_split` to During distributed training, users need to assign training nodes/edges to each
split the training, validation and test set at runtime for distributed training. The two functions trainer. Similarly, we also need to split the validation and test set in the
take the boolean arrays constructed before graph partitioning as input, split them and same way. DGL provides :func:`~dgl.distributed.node_split` and
return a portion for the local trainer. :func:`~dgl.distributed.edge_split` to split the training, validation and test
By default, they ensure that all portions have the same number of nodes/edges. This is set at runtime for distributed training. The two functions take the boolean
important for synchronous SGD, which assumes each trainer has the same number of mini-batches. arrays constructed before graph partitioning as input, split them and return a
portion for the local trainer. By default, they ensure that all portions have
The example below splits the training set and returns a subset of nodes for the local process. the same number of nodes/edges. This is important for synchronous SGD, which
assumes each trainer has the same number of mini-batches.
The example below splits the training set and returns a subset of nodes for the
local process.
.. code:: python .. code:: python
train_nids = dgl.distributed.node_split(g.ndata['train_mask']) train_nids = dgl.distributed.node_split(g.ndata['train_mask'])
.. _guide-distributed-hetero: .. _guide-distributed-hetero:
7.3 Distributed Heterogeneous graph training 7.5 Heterogeneous Graph Under The Hood
-------------------------------------------- --------------------------------------------
DGL v0.6.0 provides an experimental support for distributed training on heterogeneous graphs. The chapter covers the implementation details of distributed heterogeneous
In DGL, a node or edge in a heterogeneous graph has a unique ID in its own node type or edge type. graph. They are transparent to users in most scenarios but could be useful
DGL identifies a node or edge with a tuple: node/edge type and type-wise ID. In distributed training, for advanced customization.
a node or edge can be identified by a homogeneous ID, in addition to the tuple of node/edge type
and type-wise ID. The homogeneous ID is unique regardless of the node type and edge type. In DGL, a node or edge in a heterogeneous graph has a unique ID in its own node
DGL arranges nodes and edges so that all nodes of the same type have contiguous type or edge type. Therefore, DGL can identify a node or an edge
homogeneous IDs. with a tuple: ``(node/edge type, type-wise ID)``. We call IDs of such form as
**heterogeneous IDs**. To patition a heterogeneous graph for distributed training,
Below is an example adjancency matrix of a heterogeneous graph showing the homogeneous ID assignment. DGL converts it to a homogeneous graph so that we can reuse the partitioning
Here, the graph has two types of nodes (`T0` and `T1` ), and four types of edges (`R0`, `R1`, `R2`, `R3` ). algorithms designed for homogeneous graphs. Each node/edge is thus uniquely mapped
There are a total of 400 nodes in the graph and each type has 200 nodes. Nodes to an integer ID in a consecutive ID range (e.g., from 0 to the total number of
of `T0` have IDs in [0,200), while nodes of `T1` have IDs in [200, 400). nodes of all types). We call the IDs after conversion as **homogeneous IDs**.
In this example, if we use a tuple to identify the nodes, nodes of `T0` are identified as
(T0, type-wise ID), where type-wise ID falls in [0, 200); nodes of `T1` are identified as Below is an illustration of the ID conversion process. Here, the graph has two
(T1, type-wise ID), where type-wise ID also falls in [0, 200). types of nodes (:math:`T0` and :math:`T1` ), and four types of edges
(:math:`R0`, :math:`R1`, :math:`R2`, :math:`R3` ). There are a total of 400
nodes in the graph and each type has 200 nodes. Nodes of :math:`T0` have IDs in
[0,200), while nodes of :math:`T1` have IDs in [200, 400). In this example, if
we use a tuple to identify the nodes, nodes of :math:`T0` are identified as
(T0, type-wise ID), where type-wise ID falls in [0, 200); nodes of :math:`T1`
are identified as (T1, type-wise ID), where type-wise ID also falls in [0,
200).
.. figure:: https://data.dgl.ai/tutorial/hetero/heterograph_ids.png .. figure:: https://data.dgl.ai/tutorial/hetero/heterograph_ids.png
:alt: Imgur :alt: Imgur
7.3.1 Access distributed graph data ID Conversion Utilities
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^
For distributed training, :class:`~dgl.distributed.DistGraph` supports the heterogeneous graph API
in :class:`~dgl.DGLGraph`. Below shows an example of getting node data of `T0` on some nodes
by using type-wise node IDs. When accessing data in :class:`~dgl.distributed.DistGraph`, a user
needs to use type-wise IDs and corresponding node types or edge types.
.. code:: python
import dgl During Preprocessing
g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json') ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
feat = g.nodes['T0'].data['feat'][type_wise_ids]
A user can create distributed tensors and distributed embeddings for a particular node type or The steps of :ref:`Parallel Processing Pipeline <guide-distributed-preprocessing>`
edge type. Distributed tensors and embeddings are split and stored in multiple machines. To create all use heterogeneous IDs for their inputs and outputs. Nevertheless, some steps such as
one, a user needs to specify how it is partitioned with :class:`~dgl.distributed.PartitionPolicy`. ParMETIS partitioning are easier to be implemented using homogeneous IDs, thus
By default, DGL chooses the right partition policy based on the size of the first dimension. requiring a utility to perform ID conversion.
However, if multiple node types or edge types have the same number of nodes or edges, DGL cannot The code below implements a simple ``IDConverter`` using the metadata information
determine the partition policy automatically. A user needs to explicitly specify the partition policy. in the metadata JSON from the chunked graph data format. It starts from some
Below shows an example of creating a distributed tensor for node type `T0` by using the partition policy node type :math:`A` as node type 0, then assigns all its nodes with IDs
for `T0` and store it as node data of `T0`. in range :math:`[0, |V_A|-1)`. It then moves to the next node
type B as node type 1 and assigns all its nodes with IDs in range
:math:`[|V_A|, |V_A|+|V_B|-1)`.
.. code:: python .. code:: python
g.nodes['T0'].data['feat1'] = dgl.distributed.DistTensor((g.number_of_nodes('T0'), 1), th.float32, 'feat1', from bisect import bisect_left
part_policy=g.get_node_partition_policy('T0')) import numpy as np
The partition policies used for creating distributed tensors and embeddings are initialized when a heterogeneous class IDConverter:
graph is loaded into the graph server. A user cannot create a new partition policy at runtime. Therefore, a user def __init__(self, meta):
can only create distributed tensors or embeddings for a node type or edge type. # meta is the JSON object loaded from metadata.json
Accessing distributed tensors and embeddings also requires type-wise IDs. self.node_type = meta['node_type']
self.edge_type = meta['edge_type']
7.3.2 Distributed sampling self.ntype2id_map = {ntype : i for i, ntype in enumerate(self.node_type)}
^^^^^^^^^^^^^^^^^^^^^^^^^^ self.etype2id_map = {etype : i for i, etype in enumerate(self.edge_type)}
self.num_nodes = [sum(ns) for ns in meta['num_nodes_per_chunk']]
DGL v0.6 uses homogeneous IDs in distributed sampling. **Note**: this may change in the future release. self.num_edges = [sum(ns) for ns in meta['num_edges_per_chunk']]
DGL provides four APIs to convert node IDs and edge IDs between the homogeneous IDs and type-wise IDs: self.nid_offset = np.cumsum([0] + self.num_nodes)
self.eid_offset = np.cumsum([0] + self.num_edges)
def ntype2id(self, ntype):
"""From node type name to node type ID"""
return self.ntype2id_map[ntype]
def etype2id(self, etype):
"""From edge type name to edge type ID"""
return self.etype2id_map[etype]
def id2ntype(self, id):
"""From node type ID to node type name"""
return self.node_type[id]
def id2etype(self, id):
"""From edge type ID to edge type name"""
return self.edge_type[id]
def nid_het2hom(self, ntype, id):
"""From heterogeneous node ID to homogeneous node ID"""
tid = self.ntype2id(ntype)
if id < 0 or id >= self.num_nodes[tid]:
raise ValueError(f'Invalid node ID of type {ntype}. Must be within range [0, {self.num_nodes[tid]})')
return self.nid_offset[tid] + id
def nid_hom2het(self, id):
"""From heterogeneous node ID to homogeneous node ID"""
if id < 0 or id >= self.nid_offset[-1]:
raise ValueError(f'Invalid homogeneous node ID. Must be within range [0, self.nid_offset[-1])')
tid = bisect_left(self.nid_offset, id) - 1
# Return a pair (node_type, type_wise_id)
return self.id2ntype(tid), id - self.nid_offset[tid]
def eid_het2hom(self, etype, id):
"""From heterogeneous edge ID to homogeneous edge ID"""
tid = self.etype2id(etype)
if id < 0 or id >= self.num_edges[tid]:
raise ValueError(f'Invalid edge ID of type {etype}. Must be within range [0, {self.num_edges[tid]})')
return self.eid_offset[tid] + id
def eid_hom2het(self, id):
"""From heterogeneous edge ID to homogeneous edge ID"""
if id < 0 or id >= self.eid_offset[-1]:
raise ValueError(f'Invalid homogeneous edge ID. Must be within range [0, self.eid_offset[-1])')
tid = bisect_left(self.eid_offset, id) - 1
# Return a pair (edge_type, type_wise_id)
return self.id2etype(tid), id - self.eid_offset[tid]
After Partition Loading
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
After the partitions are loaded into trainer or server processes, the loaded
:class:`~dgl.distributed.GraphPartitionBook` provides utilities for conversion
between homogeneous IDs and heterogeneous IDs.
* :func:`~dgl.distributed.GraphPartitionBook.map_to_per_ntype`: convert a homogeneous node ID to type-wise ID and node type ID. * :func:`~dgl.distributed.GraphPartitionBook.map_to_per_ntype`: convert a homogeneous node ID to type-wise ID and node type ID.
* :func:`~dgl.distributed.GraphPartitionBook.map_to_per_etype`: convert a homogeneous edge ID to type-wise ID and edge type ID. * :func:`~dgl.distributed.GraphPartitionBook.map_to_per_etype`: convert a homogeneous edge ID to type-wise ID and edge type ID.
* :func:`~dgl.distributed.GraphPartitionBook.map_to_homo_nid`: convert type-wise ID and node type to a homogeneous node ID. * :func:`~dgl.distributed.GraphPartitionBook.map_to_homo_nid`: convert type-wise ID and node type to a homogeneous node ID.
* :func:`~dgl.distributed.GraphPartitionBook.map_to_homo_eid`: convert type-wise ID and edge type to a homogeneous edge ID. * :func:`~dgl.distributed.GraphPartitionBook.map_to_homo_eid`: convert type-wise ID and edge type to a homogeneous edge ID.
Below shows an example of sampling a subgraph with :func:`~dgl.distributed.sample_neighbors` from a heterogeneous graph Because all DGL's low-level :ref:`distributed graph sampling operators
with a node type called `paper`. It first converts type-wise node IDs to homogeneous node IDs. After sampling a subgraph <api-distributed-sampling-ops>` use homogeneous IDs, DGL internally converts
from the seed nodes, it converts homogeneous node IDs and edge IDs to type-wise IDs and also stores type IDs as node data the heterogeneous IDs specified by users to homogeneous IDs before invoking
and edge data. sampling operators. Below shows an example of sampling a subgraph by
:func:`~dgl.distributed.sample_neighbors` from nodes of type ``"paper"``. It
first performs ID conversion, and after getting the sampled subgraph, converts
the homogeneous node/edge IDs back to heterogeneous ones.
.. code:: python .. code:: python
...@@ -89,5 +147,43 @@ and edge data. ...@@ -89,5 +147,43 @@ and edge data.
block.srcdata[dgl.NTYPE], block.srcdata[dgl.NID] = gpb.map_to_per_ntype(block.srcdata[dgl.NID]) block.srcdata[dgl.NTYPE], block.srcdata[dgl.NID] = gpb.map_to_per_ntype(block.srcdata[dgl.NID])
block.dstdata[dgl.NTYPE], block.dstdata[dgl.NID] = gpb.map_to_per_ntype(block.dstdata[dgl.NID]) block.dstdata[dgl.NTYPE], block.dstdata[dgl.NID] = gpb.map_to_per_ntype(block.dstdata[dgl.NID])
From node/edge type IDs, a user can retrieve node/edge types. For example, `g.ntypes[node_type_id]`. Note that getting node/edge types from type IDs is simple -- just getting them
With node/edge types and type-wise IDs, a user can retrieve node/edge data from `DistGraph` for mini-batch computation. from the ``ntypes`` attributes of a ``DistGraph``, i.e., ``g.ntypes[node_type_id]``.
Access distributed graph data
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The :class:`~dgl.distributed.DistGraph` class supports similar interface as
:class:`~dgl.DGLGraph`. Below shows an example of getting the feature data of
nodes 0, 10, 20 of type :math:`T0`. When accessing data in
:class:`~dgl.distributed.DistGraph`, a user needs to use type-wise IDs and
corresponding node types or edge types.
.. code:: python
import dgl
g = dgl.distributed.DistGraph('graph_name', part_config='data/graph_name.json')
feat = g.nodes['T0'].data['feat'][[0, 10, 20]]
A user can create distributed tensors and distributed embeddings for a
particular node type or edge type. Distributed tensors and embeddings are split
and stored in multiple machines. To create one, a user needs to specify how it
is partitioned with :class:`~dgl.distributed.PartitionPolicy`. By default, DGL
chooses the right partition policy based on the size of the first dimension.
However, if multiple node types or edge types have the same number of nodes or
edges, DGL cannot determine the partition policy automatically. A user needs to
explicitly specify the partition policy. Below shows an example of creating a
distributed tensor for node type :math:`T0` by using the partition policy for :math:`T0`
and store it as node data of :math:`T0`.
.. code:: python
g.nodes['T0'].data['feat1'] = dgl.distributed.DistTensor(
(g.number_of_nodes('T0'), 1), th.float32, 'feat1',
part_policy=g.get_node_partition_policy('T0'))
The partition policies used for creating distributed tensors and embeddings are
initialized when a heterogeneous graph is loaded into the graph server. A user
cannot create a new partition policy at runtime. Therefore, a user can only
create distributed tensors or embeddings for a node type or edge type.
Accessing distributed tensors and embeddings also requires type-wise IDs.
.. _guide-distributed-partition:
7.4 Advanced Graph Partitioning
---------------------------------------
The chapter covers some of the advanced topics for graph partitioning.
METIS partition algorithm
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
`METIS <http://glaros.dtc.umn.edu/gkhome/views/metis>`__ is a state-of-the-art
graph partitioning algorithm that can generate partitions with minimal number
of cross-partition edges, making it suitable for distributed message passing
where the amount of network communication is proportional to the number of
cross-partition edges. DGL has integrated METIS as the default partitioning
algorithm in its :func:`dgl.distributed.partition_graph` API.
Load balancing
~~~~~~~~~~~~~~~~
When partitioning a graph, by default, METIS only balances the number of nodes
in each partition. This can result in suboptimal configuration, depending on
the task at hand. For example, in the case of semi-supervised node
classification, a trainer performs computation on a subset of labeled nodes in
a local partition. A partitioning that only balances nodes in a graph (both
labeled and unlabeled), may end up with computational load imbalance. To get a
balanced workload in each partition, the partition API allows balancing between
partitions with respect to the number of nodes in each node type, by specifying
``balance_ntypes`` in :func:`~dgl.distributed.partition_graph`. Users can take
advantage of this and consider nodes in the training set, validation set and
test set are of different node types.
The following example considers nodes inside the training set and outside the
training set are two types of nodes:
.. code:: python
dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test', balance_ntypes=g.ndata['train_mask'])
In addition to balancing the node types,
:func:`dgl.distributed.partition_graph` also allows balancing between
in-degrees of nodes of different node types by specifying ``balance_edges``.
This balances the number of edges incident to the nodes of different types.
ID mapping
~~~~~~~~~~~~~
After partitioning, :func:`~dgl.distributed.partition_graph` remap node
and edge IDs so that nodes of the same partition are aranged together
(in a consecutive ID range), making it easier to store partitioned node/edge
features. The API also automatically shuffles the node/edge features
according to the new IDs. However, some downstream tasks may want to
recover the original node/edge IDs (such as extracting the computed node
embeddings for later use). For such cases, pass ``return_mapping=True``
to :func:`~dgl.distributed.partition_graph`, which makes the API returns
the ID mappings between the remapped node/edge IDs and their origianl ones.
For a homogeneous graph, it returns two vectors. The first vector maps every new
node ID to its original ID; the second vector maps every new edge ID to
its original ID. For a heterogeneous graph, it returns two dictionaries of
vectors. The first dictionary contains the mapping for each node type; the
second dictionary contains the mapping for each edge type.
.. code:: python
node_map, edge_map = dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
balance_ntypes=g.ndata['train_mask'],
return_mapping=True)
# Let's assume that node_emb is saved from the distributed training.
orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
orig_node_emb[node_map] = node_emb
Output format
~~~~~~~~~~~~~~~~~~~~~~~~~~
Regardless of the partitioning algorithm in use, the partitioned results are stored
in data files organized as follows:
.. code-block:: none
data_root_dir/
|-- graph_name.json # partition configuration file in JSON
|-- part0/ # data for partition 0
| |-- node_feats.dgl # node features stored in binary format
| |-- edge_feats.dgl # edge features stored in binary format
| |-- graph.dgl # graph structure of this partition stored in binary format
|
|-- part1/ # data for partition 1
| |-- node_feats.dgl
| |-- edge_feats.dgl
| |-- graph.dgl
|
|-- ... # data for other partitions
When distributed to a cluster, the metadata JSON should be copied to all the machines
while the ``partX`` folders should be dispatched accordingly.
DGL provides a :func:`dgl.distributed.load_partition` function to load one partition
for inspection.
.. code:: python
>>> import dgl
>>> # load partition 0
>>> part_data = dgl.distributed.load_partition('data_root_dir/graph_name.json', 0)
>>> g, nfeat, efeat, partition_book, graph_name, ntypes, etypes = part_data # unpack
>>> print(g)
Graph(num_nodes=966043, num_edges=34270118,
ndata_schemes={'orig_id': Scheme(shape=(), dtype=torch.int64),
'part_id': Scheme(shape=(), dtype=torch.int64),
'_ID': Scheme(shape=(), dtype=torch.int64),
'inner_node': Scheme(shape=(), dtype=torch.int32)}
edata_schemes={'_ID': Scheme(shape=(), dtype=torch.int64),
'inner_edge': Scheme(shape=(), dtype=torch.int8),
'orig_id': Scheme(shape=(), dtype=torch.int64)})
As mentioned in the `ID mapping`_ section, each partition carries auxiliary information
saved as ndata or edata such as original node/edge IDs, partition IDs, etc. Each partition
not only saves nodes/edges it owns, but also includes node/edges that are adjacent to
the partition (called **HALO** nodes/edges). The ``inner_node`` and ``inner_edge``
indicate whether a node/edge truely belongs to the partition (value is ``True``)
or is a HALO node/edge (value is ``False``).
The :func:`~dgl.distributed.load_partition` function loads all data at once. Users can
load features or the partition book using the :func:`dgl.distributed.load_partition_feats`
and :func:`dgl.distributed.load_partition_book` APIs respectively.
Parallel METIS partitioning
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
For massive graphs where parallel preprocessing is desired, DGL supports
`ParMETIS <http://glaros.dtc.umn.edu/gkhome/metis/parmetis/overview>`__ as one
of the choices of partitioning algorithms.
.. note::
Because ParMETIS does not support heterogeneous graph, users need to
conduct ID conversion before and after running ParMETIS.
Check out chapter :ref:`guide-distributed-hetero` for explanation.
.. note::
Please make sure that the input graph to ParMETIS does not have
duplicate edges (or parallel edges) and self-loop edges.
ParMETIS Installation
^^^^^^^^^^^^^^^^^^^^^^
ParMETIS requires METIS and GKLib. Please follow the instructions `here
<https://github.com/KarypisLab/GKlib>`__ to compile and install GKLib. For
compiling and install METIS, please follow the instructions below to clone
METIS with GIT and compile it with int64 support.
.. code-block:: bash
git clone https://github.com/KarypisLab/METIS.git
make config shared=1 cc=gcc prefix=~/local i64=1
make install
For now, we need to compile and install ParMETIS manually. We clone the DGL branch of ParMETIS as follows:
.. code-block:: bash
git clone --branch dgl https://github.com/KarypisLab/ParMETIS.git
Then compile and install ParMETIS.
.. code-block:: bash
make config cc=mpicc prefix=~/local
make install
Before running ParMETIS, we need to set two environment variables: ``PATH`` and ``LD_LIBRARY_PATH``.
.. code-block:: bash
export PATH=$PATH:$HOME/local/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/local/lib/
Input format
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. note::
As a prerequisite, read chapter :doc:`guide-distributed-hetero` to understand
how DGL organize heterogeneous graph for distributed training.
The input graph for ParMETIS is stored in three files with the following names:
``xxx_nodes.txt``, ``xxx_edges.txt`` and ``xxx_stats.txt``, where ``xxx`` is a
graph name.
Each row in ``xxx_nodes.txt`` stores the information of a node. Row ID is
also the *homogeneous* ID of a node, e.g., row 0 is for node 0; row 1 is for
node 1, etc. Each row has the following format:
.. code-block:: none
<node_type_id> <node_weight_list> <type_wise_node_id>
All fields are separated by whitespace:
* ``<node_type_id>`` is an integer starting from 0. Each node type is mapped to
an integer. For a homogeneous graph, its value is always 0.
* ``<node_weight_list>`` are integers (separated by whitespace) that indicate
the node weights used by ParMETIS to balance graph partitions. For homogeneous
graphs, the list has only one integer while for heterogeneous graphs with
:math:`T` node types, the list should has :math:`T` integers. If the node
belongs to node type :math:`t`, then all the integers except the :math:`t^{th}`
one are zero; the :math:`t^{th}` integer is the weight of that node. ParMETIS
will try to balance the total node weight of each partition. For heterogeneous
graph, it will try to distribute nodes of the same type to all partitions.
The recommended node weights are 1 for balancing the number of nodes in each
partition or node degrees for balancing the number of edges in each partition.
* ``<type_wise_node_id>`` is an integer representing the node ID in its own type.
Below shows an example of a node file for a heterogeneous graph with two node
types. Node type 0 has three nodes; node type 1 has four nodes. It uses two
node weights to ensure that ParMETIS will generate partitions with roughly the
same number of nodes for type 0 and the same number of nodes for type 1.
.. code-block:: none
0 1 0 0
0 1 0 1
0 1 0 2
1 0 1 0
1 0 1 1
1 0 1 2
1 0 1 3
Similarly, each row in ``xxx_edges.txt`` stores the information of an edge. Row ID is
also the *homogeneous* ID of an edge, e.g., row 0 is for edge 0; row 1 is for
edge 1, etc. Each row has the following format:
.. code-block:: none
<src_node_id> <dst_node_id> <type_wise_edge_id> <edge_type_id>
All fields are separated by whitespace:
* ``<src_node_id>`` is the *homogeneous* ID of the source node.
* ``<dst_node_id>`` is the *homogeneous* ID of the destination node.
* ``<type_wise_edge_id>`` is the edge ID for the edge type.
* ``<edge_type_id>`` is an integer starting from 0. Each edge type is mapped to
an integer. For a homogeneous graph, its value is always 0.
``xxx_stats.txt`` stores some basic statistics of the graph. It has only one line with three fields
separated by whitespace:
.. code-block:: none
<num_nodes> <num_edges> <total_node_weights>
* ``num_nodes`` stores the total number of nodes regardless of node types.
* ``num_edges`` stores the total number of edges regardless of edge types.
* ``total_node_weights`` stores the number of node weights in the node file.
Run ParMETIS and output format
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ParMETIS contains a command called ``pm_dglpart``, which loads the graph stored
in the three files from the machine where ``pm_dglpart`` is invoked, distributes
data to all machines in the cluster and invokes ParMETIS to partition the
graph. When it completes, it generates three files for each partition:
``p<part_id>-xxx_nodes.txt``, ``p<part_id>-xxx_edges.txt``,
``p<part_id>-xxx_stats.txt``.
.. note::
ParMETIS reassigns IDs to nodes during the partitioning. After ID reassignment,
the nodes in a partition are assigned with contiguous IDs; furthermore, the nodes of
the same type are assigned with contiguous IDs.
``p<part_id>-xxx_nodes.txt`` stores the node data of the partition. Each row represents
a node with the following fields:
.. code-block:: none
<node_id> <node_type_id> <node_weight_list> <type_wise_node_id>
* ``<node_id>`` is the *homogeneous* node ID after ID reassignment.
* ``<node_type_id>`` is the node type ID.
* ``<node_weight_list>`` is the node weight used by ParMETIS (copied from the input file).
* ``<type_wise_node_id>`` is an integer representing the node ID in its own type.
``p<part_id>-xxx_edges.txt`` stores the edge data of the partition. Each row represents
an edge with the following fields:
.. code-block:: none
<src_id> <dst_id> <orig_src_id> <orig_dst_id> <type_wise_edge_id> <edge_type_id>
* ``<src_id>`` is the *homogeneous* ID of the source node after ID reassignment.
* ``<dst_id>`` is the *homogeneous* ID of the destination node after ID reassignment.
* ``<orig_src_id>`` is the *homogeneous* ID of the source node in the input graph.
* ``<orig_dst_id>`` is the *homogeneous* ID of the destination node in the input graph.
* ``<type_wise_edge_id>`` is the edge ID in its own type.
* ``<edge_type_id>`` is the edge type ID.
When invoking ``pm_dglpart``, the three input files: ``xxx_nodes.txt``,
``xxx_edges.txt``, ``xxx_stats.txt`` should be located in the directory where
``pm_dglpart`` runs. The following command run four ParMETIS processes to
partition the graph named ``xxx`` into eight partitions (each process handles
two partitions).
.. code-block:: bash
mpirun -np 4 pm_dglpart xxx 2
The output files from ParMETIS then need to be converted to the
:ref:`partition assignment format <guide-distributed-prep-partition>` to in
order to run subsequent preprocessing steps.
.. _guide-distributed-preprocessing: .. _guide-distributed-preprocessing:
7.1 Preprocessing for Distributed Training 7.1 Data Preprocessing
------------------------------------------ ------------------------------------------
:ref:`(中文版) <guide_cn-distributed-preprocessing>` Before launching training jobs, DGL requires the input data to be partitioned
and distributed to the target machines. For relatively small graphs, DGL
DGL requires to preprocess the graph data for distributed training. This includes two steps: provides a partitioning API :func:`~dgl.distributed.partition_graph` that
1) partition a graph into subgraphs, 2) assign nodes/edges with new IDs. For relatively small partitions an in-memory :class:`~dgl.DGLGraph` object. It supports
graphs, DGL provides a partitioning API :func:`dgl.distributed.partition_graph` that performs multiple partitioning algorithms such as random partitioning and
the two steps above. The API runs on one machine. Therefore, if a graph is large, users will `Metis <http://glaros.dtc.umn.edu/gkhome/views/metis>`__.
need a large machine to partition a graph when using this API. In addition to this API, we also The benefit of Metis partitioning is that it can generate partitions with
provide a solution to partition a large graph in a cluster of machines below (see Section 7.1.1). minimal edge cuts to reduce network communication for distributed training and
inference. DGL uses the latest version of Metis with the options optimized for
:func:`dgl.distributed.partition_graph` supports both random partitioning the real-world graphs with power-law distribution. After partitioning, the API
and a `Metis <http://glaros.dtc.umn.edu/gkhome/views/metis>`__-based partitioning. constructs the partitioned results in a format that is easy to load during the
The benefit of Metis partitioning is that it can generate training. For example,
partitions with minimal edge cuts to reduce network communication for distributed training
and inference. DGL uses the latest version of Metis with the options optimized for the real-world
graphs with power-law distribution. After partitioning, the API constructs the partitioned results
in a format that is easy to load during the training.
By default, the partition API assigns new IDs to the nodes and edges in the input graph to help locate
nodes/edges during distributed training/inference. After assigning IDs, the partition API shuffles
all node data and edge data accordingly. After generating partitioned subgraphs, each subgraph is stored
as a ``DGLGraph`` object. The original node/edge IDs before reshuffling are stored in the field of
'orig_id' in the node/edge data of the subgraphs. The node data `dgl.NID` and the edge data `dgl.EID`
of the subgraphs store new node/edge IDs of the full graph after nodes/edges reshuffle.
During the training, users just use the new node/edge IDs.
The partitioned results are stored in multiple files in the output directory. It always contains
a JSON file called xxx.json, where xxx is the graph name provided to the partition API. The JSON file
contains all the partition configurations. If the partition API does not assign new IDs to nodes and edges,
it generates two additional Numpy files: `node_map.npy` and `edge_map.npy`, which stores the mapping between
node/edge IDs and partition IDs. The Numpy arrays in the two files are large for a graph with billions of
nodes and edges because they have an entry for each node and edge in the graph. Inside the folders for
each partition, there are three files that store the partition data in the DGL format. `graph.dgl` stores
the graph structure of the partition as well as some metadata on nodes and edges. `node_feats.dgl` and
`edge_feats.dgl` stores all features of nodes and edges that belong to the partition.
.. code-block:: none .. code-block:: python
data_root_dir/
|-- xxx.json # partition configuration file in JSON
|-- node_map.npy # partition id of each node stored in a numpy array (optional)
|-- edge_map.npy # partition id of each edge stored in a numpy array (optional)
|-- part0/ # data for partition 0
|-- node_feats.dgl # node features stored in binary format
|-- edge_feats.dgl # edge features stored in binary format
|-- graph.dgl # graph structure of this partition stored in binary format
|-- part1/
|-- node_feats.dgl
|-- edge_feats.dgl
|-- graph.dgl
Load balancing
~~~~~~~~~~~~~~
When partitioning a graph, by default, Metis only balances the number of nodes in each partition.
This can result in suboptimal configuration, depending on the task at hand. For example, in the case
of semi-supervised node classification, a trainer performs computation on a subset of labeled nodes in
a local partition. A partitioning that only balances nodes in a graph (both labeled and unlabeled), may
end up with computational load imbalance. To get a balanced workload in each partition, the partition API
allows balancing between partitions with respect to the number of nodes in each node type, by specifying
``balance_ntypes`` in :func:`dgl.distributed.partition_graph`. Users can take advantage of this and consider
nodes in the training set, validation set and test set are of different node types.
The following example considers nodes inside the training set and outside the training set are two types of nodes:
.. code:: python
dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test', balance_ntypes=g.ndata['train_mask'])
In addition to balancing the node types, :func:`dgl.distributed.partition_graph` also allows balancing
between in-degrees of nodes of different node types by specifying ``balance_edges``. This balances
the number of edges incident to the nodes of different types.
**Note**: The graph name passed to :func:`dgl.distributed.partition_graph` is an important argument.
The graph name will be used by :class:`dgl.distributed.DistGraph` to identify a distributed graph.
A legal graph name should only contain alphabetic characters and underscores.
ID mapping
~~~~~~~~~~
:func:`dgl.distributed.partition_graph` shuffles node IDs and edge IDs during the partitioning and shuffles
node data and edge data accordingly. After training, we may need to save the computed node embeddings for
any downstream tasks. Therefore, we need to reshuffle the saved node embeddings according to their original
IDs.
When `return_mapping=True`, :func:`dgl.distributed.partition_graph` returns the mappings between shuffled
node/edge IDs and their original IDs. For a homogeneous graph, it returns two vectors. The first
vector maps every shuffled node ID to its original ID; the second vector maps every shuffled edge ID to its
original ID. For a heterogeneous graph, it returns two dictionaries of vectors. The first dictionary contains
the mapping for each node type; the second dictionary contains the mapping for each edge type.
.. code:: python
node_map, edge_map = dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
balance_ntypes=g.ndata['train_mask'],
return_mapping=True)
# Let's assume that node_emb is saved from the distributed training.
orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
orig_node_emb[node_map] = node_emb
7.1.1 Distributed partitioning
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
For a large graph, DGL uses `ParMetis <http://glaros.dtc.umn.edu/gkhome/metis/parmetis/overview>`__ to partition import dgl
a graph in a cluster of machines. This solution requires users to prepare data for ParMETIS and use a DGL script
`tools/convert_partition.py` to construct :class:`dgl.DGLGraph` for the partitions output by ParMETIS.
**Note**: `convert_partition.py` uses the `pyarrow` package to load csv files. Please install `pyarrow`. g = ... # create or load a DGLGraph object
dgl.distributed.partition_graph(g, 'mygraph', 2, 'data_root_dir')
ParMETIS Installation will outputs the following data file.
~~~~~~~~~~~~~~~~~~~~~
ParMETIS requires METIS and GKLib. Please follow the instructions `here <https://github.com/KarypisLab/GKlib>`__
to compile and install GKLib. For compiling and install METIS, please follow the instructions below to
clone METIS with GIT and compile it with int64 support.
.. code-block:: none .. code-block:: none
git clone https://github.com/KarypisLab/METIS.git data_root_dir/
make config shared=1 cc=gcc prefix=~/local i64=1 |-- mygraph.json # metadata JSON. File name is the given graph name.
make install |-- part0/ # data for partition 0
| |-- node_feats.dgl # node features stored in binary format
| |-- edge_feats.dgl # edge features stored in binary format
For now, we need to compile and install ParMETIS manually. We clone the DGL branch of ParMETIS as follows: | |-- graph.dgl # graph structure of this partition stored in binary format
|
.. code-block:: none |-- part1/ # data for partition 1
|-- node_feats.dgl
git clone --branch dgl https://github.com/KarypisLab/ParMETIS.git |-- edge_feats.dgl
|-- graph.dgl
Then compile and install ParMETIS.
Chapter :ref:`guide-distributed-partition` covers more details about the
.. code-block:: none partition format. To distribute the partitions to a cluster, users can either save
the data in some shared folder accessible by all machines, or copy the metadata
make config cc=mpicc prefix=~/local JSON as well as the corresponding partition folder ``partX`` to the X^th machine.
make install
Using :func:`~dgl.distributed.partition_graph` requires an instance with large enough
Before running ParMETIS, we need to set two environment variables: `PATH` and `LD_LIBRARY_PATH`. CPU RAM to hold the entire graph structure and features, which may not be viable for
graphs with hundreds of billions of edges or large features. We describe how to use
.. code-block:: none the *parallel data preparation pipeline* for such cases next.
export PATH=$PATH:$HOME/local/bin Parallel Data Preparation Pipeline
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/local/lib/ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Input format for ParMETIS To handle massive graph data that cannot fit in the CPU RAM of a
~~~~~~~~~~~~~~~~~~~~~~~~~ single machine, DGL utilizes data chunking and parallel processing to reduce
memory footprint and running time. The figure below illustrates the
The input graph for ParMETIS is stored in three files with the following names: `xxx_nodes.txt`, pipeline:
`xxx_edges.txt` and `xxx_stats.txt`, where `xxx` is a graph name.
.. figure:: https://data.dgl.ai/asset/image/guide_7_distdataprep.png
Each row in `xxx_nodes.txt` stores the information of a node with the following format:
* The pipeline takes input data stored in *Chunked Graph Format* and
.. code-block:: none produces and dispatches data partitions to the target machines.
* **Step.1 Graph Partitioning:** It calculates the ownership of each partition
<node_type> <weight1> ... <orig_type_node_id> <attributes> and saves the results as a set of files called *partition assignment*.
To speedup the step, some algorithms (e.g., ParMETIS) support parallel computing
All fields are separated by whitespace: using multiple machines.
* **Step.2 Data Dispatching:** Given the partition assignment, the step then
* `<node_type>` is an integer. For a homogeneous graph, its value is always 0. For heterogeneous graphs, physically partitions the graph data and dispatches them to the machines user
its value indicates the type of each node. specified. It also converts the graph data into formats that are suitable for
* `<weight1>`, `<weight2>`, etc are integers that indicate the node weights used by ParMETIS to balance distributed training and evaluation.
graph partitions. If a user does not provide node weights, ParMETIS partitions a graph and balance
the number of nodes in each partition (it is important to balance graph partitions in order to achieve The whole pipeline is modularized so that each step can be invoked
good training speed). However, this default strategy may not be sufficient for many use cases. individually. For example, users can replace Step.1 with some custom graph partition
For example, in a heterogeneous graph, we want to partition the graph so that all partitions have algorithm as long as it produces partition assignment files
roughly the same number of nodes for each node type. The toy example below shows how we can use correctly.
node weights to balance the number of nodes of different types.
* `<orig_type_node_id>` is an integer representing the node ID in its own type. In DGL, nodes of each type .. _guide-distributed-prep-chunk:
are assigned with IDs starting from 0. For a homogeneous graph, this field is the same as the node ID. Chunked Graph Format
* `<attributes>` are optional fields. They can be used to store any values and ParMETIS does not interpret ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
these fields. Potentially, we can store the node features and edge features in these fields for
homogeneous graphs. To run the pipeline, DGL requires the input graph to be stored in multiple data
* The row ID indicates the *homogeneous* ID of nodes in a graph (all nodes are assigned with a unique ID). chunks. Each data chunk is the unit of data preprocessing and thus should fit
All nodes of the same type should be assigned with contiguous IDs. That is, nodes of the same type should into CPU RAM. In this section, we use the MAG240M-LSC data from `Open Graph
be stored together in `xxx_nodes.txt`. Benchmark <https://ogb.stanford.edu/docs/lsc/mag240m/>`__ as an example to
describe the overall design, followed by a formal specification and
Below shows an example of a node file for a heterogeneous graph with two node types. Node type 0 has three tips for creating data in such format.
nodes; node type 1 has four nodes. It uses two node weights to ensure that ParMETIS will generate partitions
with roughly the same number of nodes for type 0 and the same number of nodes for type 1. Example: MAG240M-LSC
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The MAG240M-LSC graph is a heterogeneous academic graph
extracted from the Microsoft Academic Graph (MAG), whose schema diagram is
illustrated below:
.. figure:: https://data.dgl.ai/asset/image/guide_7_mag240m.png
Its raw data files are organized as follows:
.. code-block:: none .. code-block:: none
0 1 0 0 /mydata/MAG240M-LSC/
0 1 0 1 |-- meta.pt # # A dictionary of the number of nodes for each type saved by torch.save,
0 1 0 2 | # as well as num_classes
1 0 1 0 |-- processed/
1 0 1 1 |-- author___affiliated_with___institution/
1 0 1 2 | |-- edge_index.npy # graph, 713 MB
1 0 1 3 |
|-- paper/
Similarly, each row in `xxx_edges.txt` stores the information of an edge with the following format: | |-- node_feat.npy # feature, 187 GB, (numpy memmap format)
| |-- node_label.npy # label, 974 MB
| |-- node_year.npy # year, 974 MB
|
|-- paper___cites___paper/
| |-- edge_index.npy # graph, 21 GB
|
|-- author___writes___paper/
|-- edge_index.npy # graph, 6GB
The graph has three node types (``"paper"``, ``"author"`` and ``"institution"``),
three edge types/relations (``"cites"``, ``"writes"`` and ``"affiliated_with"``). The
``"paper"`` nodes have three attributes (``"feat"``, ``"label"``, ``"year"'``), while
other types of nodes and edges are featureless. Below shows the data files when
it is stored in DGL Chunked Graph Format:
.. code-block:: none .. code-block:: none
<src_id> <dst_id> <type_edge_id> <edge_type> <attributes> /mydata/MAG240M-LSC_chunked/
|-- metadata.json # metadata json file
All fields are separated by whitespace: |-- edges/ # stores edge ID data
| |-- writes-part1.csv
* `<src_id>` is the *homogeneous* ID of the source node. | |-- writes-part2.csv
* `<dst_id>` is the *homogeneous* ID of the destination node. | |-- affiliated_with-part1.csv
* `<type_edge_id>` is the edge ID for the edge type. | |-- affiliated_with-part2.csv
* `<edge_type>` is the edge type. | |-- cites-part1.csv
* `<attributes>` are optional fields. They can be used to store any values and ParMETIS does not | |-- cites-part1.csv
interpret these fields. |
|-- node_data/ # stores node feature data
**Note**: please make sure that there are no duplicated edges and self-loop edges in the edge file. |-- paper-feat-part1.npy
|-- paper-feat-part2.npy
`xxx_stats.txt` stores some basic statistics of the graph. It has only one line with three fields |-- paper-label-part1.npy
separated by whitespace: |-- paper-label-part2.npy
|-- paper-year-part1.npy
.. code-block:: none |-- paper-year-part2.npy
<num_nodes> <num_edges> <num_node_weights> All the data files are chunked into two parts, including the edges of each relation
(e.g., writes, affiliates, cites) and node features. If the graph has edge features,
* `num_nodes` stores the total number of nodes regardless of node types. they will be chunked into multiple files too. All ID data are stored in
* `num_edges` stores the total number of edges regardless of edge types. CSV (we will illustrate the contents soon) while node features are stored in
* `num_node_weights` stores the number of node weights in the node file. numpy arrays.
Run ParMETIS and output formats The ``metadata.json`` stores all the metadata information such as file names
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ and chunk sizes (e.g., number of nodes, number of edges).
ParMETIS contains a command called `pm_dglpart`, which loads the graph stored in the three .. code-block:: python
files from the machine where `pm_dglpart` is invoked, distributes data to all machines in
the cluster and invokes ParMETIS to partition the graph. When it completes, it generates
three files for each partition: `p<part_id>-xxx_nodes.txt`, `p<part_id>-xxx_edges.txt`,
`p<part_id>-xxx_stats.txt`.
**Note**: ParMETIS reassigns IDs to nodes during the partitioning. After ID reassignment,
the nodes in a partition are assigned with contiguous IDs; furthermore, the nodes of
the same type are assigned with contiguous IDs.
`p<part_id>-xxx_nodes.txt` stores the node data of the partition. Each row represents
a node with the following fields:
.. code-block:: none {
"graph_name" : "MAG240M-LSC", # given graph name
<node_id> <node_type> <weight1> ... <orig_type_node_id> <attributes> "node_type": ["author", "paper", "institution"],
"num_nodes_per_chunk": [
* `<node_id>` is the *homogeneous* node IDs after ID reassignment. [61191556, 61191556], # number of author nodes per chunk
* `<node_type>` is the node type. [61191553, 61191552], # number of paper nodes per chunk
* `<weight1>` is the node weight used by ParMETIS. [12861, 12860] # number of institution nodes per chunk
* `<orig_type_node_id>` is the original node ID for a specific node type in the input heterogeneous graph. ],
* `<attributes>` are optional fields that contain any node attributes in the input node file. # The edge type name is a colon-joined string of source, edge, and destination type.
"edge_type": [
"author:writes:paper",
"author:affiliated_with:institution",
"paper:cites:paper"
],
"num_edges_per_chunk": [
[193011360, 193011360], # number of author:writes:paper edges per chunk
[22296293, 22296293], # number of author:affiliated_with:institution edges per chunk
[648874463, 648874463] # number of paper:cites:paper edges per chunk
],
"edges" : {
"author:write:paper" : { # edge type
"format" : {"name": "csv", "delimiter": " "},
# The list of paths. Can be relative or absolute.
"data" : ["edges/writes-part1.csv", "edges/writes-part2.csv"]
},
"author:affiliated_with:institution" : {
"format" : {"name": "csv", "delimiter": " "},
"data" : ["edges/affiliated_with-part1.csv", "edges/affiliated_with-part2.csv"]
},
"author:affiliated_with:institution" : {
"format" : {"name": "csv", "delimiter": " "},
"data" : ["edges/cites-part1.csv", "edges/cites-part2.csv"]
}
},
"node_data" : {
"paper": { # node type
"feat": { # feature key
"format": {"name": "numpy"},
"data": ["node_data/paper-feat-part1.npy", "node_data/paper-feat-part2.npy"]
},
"label": { # feature key
"format": {"name": "numpy"},
"data": ["node_data/paper-label-part1.npy", "node_data/paper-label-part2.npy"]
},
"year": { # feature key
"format": {"name": "numpy"},
"data": ["node_data/paper-year-part1.npy", "node_data/paper-year-part2.npy"]
}
}
},
"edge_data" : {} # MAG240M-LSC does not have edge features
}
`p<part_id>-xxx_edges.txt` stores the edge data of the partition. Each row represents There are three parts in ``metadata.json``:
an edge with the following fields:
* Graph schema information and chunk sizes, e.g., ``"node_type"`` , ``"num_nodes_per_chunk"``, etc.
* Edge index data under key ``"edges"``.
* Node/edge feature data under keys ``"node_data"`` and ``"edge_data"``.
The edge index files contain edges in the form of node ID pairs:
.. code-block:: bash
# writes-part1.csv
0 0
0 1
0 20
0 29
0 1203
...
Specification
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In general, a chunked graph data folder just needs a ``metadata.json`` and a
bunch of data files. The folder structure in the MAG240M-LSC example is not a
strict requirement as long as ``metadata.json`` contains valid file paths.
``metadata.json`` top-level keys:
* ``graph_name``: String. Unique name used by :class:`dgl.distributed.DistGraph`
to load graph.
* ``node_type``: List of string. Node type names.
* ``num_nodes_per_chunk``: List of list of integer. For graphs with :math:`T` node
types stored in :math:`P` chunks, the value contains :math:`T` integer lists.
Each list contains :math:`P` integers, which specify the number of nodes
in each chunk.
* ``edge_type``: List of string. Edge type names in the form of
``<source node type>:<relation>:<destination node type>``.
* ``num_edges_per_chunk``: List of list of integer. For graphs with :math:`R` edge
types stored in :math:`P` chunks, the value contains :math:`R` integer lists.
Each list contains :math:`P` integers, which specify the number of edges
in each chunk.
* ``edges``: Dict of ``ChunkFileSpec``. Edge index files.
Dictionary keys are edge type names in the form of
``<source node type>:<relation>:<destination node type>``.
* ``node_data``: Dict of ``ChunkFileSpec``. Data files that store node attributes.
Dictionary keys are node type names.
* ``edge_data``: Dict of ``ChunkFileSpec``. Data files that store edge attributes.
Dictionary keys are edge type names in the form of
``<source node type>:<relation>:<destination node type>``.
``ChunkFileSpec`` has two keys:
* ``format``: File format. Depending on the format ``name``, users can configure more
details about how to parse each data file.
- ``"csv"``: CSV file. Use the ``delimiter`` key to specify delimiter in use.
- ``"numpy"``: NumPy array binary file created by :func:`numpy.save`.
* ``data``: List of string. File path to each data chunk. Support absolute path.
Tips for making chunked graph data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Depending on the raw data, the implementation could include:
* Construct graphs out of non-structured data such as texts or tabular data.
* Augment or transform the input graph struture or features. E.g., adding reverse
or self-loop edges, normalizing features, etc.
* Chunk the input graph structure and features into multiple data files so that
each one can fit in CPU RAM for subsequent preprocessing steps.
To avoid running into out-of-memory error, it is recommended to process graph
structures and feature data separately. Processing one chunk at a time can also
reduce the maximal runtime memory footprint. As an example, DGL provides a
`tools/chunk_graph.py
<https://github.com/dmlc/dgl/blob/master/tools/chunk_graph.py>`_ script that
chunks an in-memory feature-less :class:`~dgl.DGLGraph` and feature tensors
stored in :class:`numpy.memmap`.
.. _guide-distributed-prep-partition:
Step.1 Graph Partitioning
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This step reads the chunked graph data and calculates which partition each node
should belong to. The results are saved in a set of *partition assignment files*.
For example, to randomly partition MAG240M-LSC to two parts, run the
``partition_algo/random.py`` script in the ``tools`` folder:
.. code-block:: bash
python /my/repo/dgl/tools/partition_algo/random.py
--metadata /mydata/MAG240M-LSC_chunked/metadata.json
--output_path /mydata/MAG240M-LSC_2parts/
--num_partitions 2
, which outputs files as follows:
.. code-block:: none .. code-block:: none
<src_id> <dst_id> <orig_src_id> <orig_dst_id> <orig_type_edge_id> <edge_type> <attributes> MAG240M-LSC_2parts/
|-- paper.txt
* `<src_id>` is the *homogeneous* ID of the source node after ID reassignment. |-- author.txt
* `<dst_id>` is the *homogeneous* ID of the destination node after ID reassignment. |-- institution.txt
* `<orig_src_id>` is the *homogeneous* ID of the source node in the input graph.
* `<orig_dst_id>` is the *homogeneous* ID of the destination node in the input graph.
* `<orig_type_edge_id>` is the edge ID for the specific edge type in the input graph.
* `<edge_type>` is the edge type.
* `<attributes>` are optional fields that contain any edge attributes in the input edge file.
When invoking `pm_dglpart`, the three input files: `xxx_nodes.txt`, `xxx_edges.txt`, `xxx_stats.txt` Each file stores the partition assignment of the corresponding node type.
should be located in the directory where `pm_dglpart` runs. The following command run four ParMETIS The contents are the partition ID of each node stored in lines, i.e., line i is
processes to partition the graph named `xxx` into eight partitions (each process handles two partitions). the partition ID of node i.
.. code-block:: none .. code-block:: bash
mpirun -np 4 pm_dglpart xxx 2 # paper.txt
0
Convert ParMETIS outputs to DGLGraph 1
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 1
0
DGL provides a script named `convert_partition.py`, located in the `tools` directory, to convert the data 0
in the partition files into :class:`dgl.DGLGraph` objects and save them into files. 1
**Note**: `convert_partition.py` runs in a single machine. In the future, we will extend it to convert 0
graph data in parallel across multiple machines. **Note**: please install the `pyarrow` package ...
for loading data in csv files.
`convert_partition.py` has the following arguments:
* `--input-dir INPUT_DIR` specifies the directory that contains the partition files generated by ParMETIS.
* `--graph-name GRAPH_NAME` specifies the graph name.
* `--schema SCHEMA` provides a file that specifies the schema of the input heterogeneous graph.
The schema file is a JSON file that lists node types and edge types as well as homogeneous ID ranges
for each node type and edge type.
* `--num-parts NUM_PARTS` specifies the number of partitions.
* `--num-node-weights NUM_NODE_WEIGHTS` specifies the number of node weights used by ParMETIS
to balance partitions.
* `[--workspace WORKSPACE]` is an optional argument that specifies a workspace directory to
store some intermediate results.
* `[--node-attr-dtype NODE_ATTR_DTYPE]` is an optional argument that specifies the data type of
node attributes in the remaining fields `<attributes>` of the node files.
* `[--edge-attr-dtype EDGE_ATTR_DTYPE]` is an optional argument that specifies the data type of
edge attributes in the remaining fields `<attributes>` of the edge files.
* `--output OUTPUT` specifies the output directory that stores the partition results.
`convert_partition.py` outputs files as below:
.. code-block:: none .. note::
data_root_dir/ DGL currently requires the number of data chunks and the number of partitions to be the same.
|-- xxx.json # partition configuration file in JSON
|-- part0/ # data for partition 0
|-- node_feats.dgl # node features stored in binary format (optional)
|-- edge_feats.dgl # edge features stored in binary format (optional)
|-- graph.dgl # graph structure of this partition stored in binary format
|-- part1/
|-- node_feats.dgl
|-- edge_feats.dgl
|-- graph.dgl
**Note**: if the data type of node attributes or edge attributes is specified, `convert_partition.py`
assumes all nodes/edges of any types have exactly these attributes. Therefore, if
nodes or edges of different types contain different numbers of attributes, users need to construct
them manually.
Below shows an example of the schema of the OGBN-MAG graph for `convert_partition.py`. It has two fields:
"nid" and "eid". Inside "nid", it lists all node types and the homogeneous ID ranges for each node type;
inside "eid", it lists all edge types and the homogeneous ID ranges for each edge type.
.. code-block:: none Despite its simplicity, random partitioning may result in frequent
cross-machine communication. Check out chapter
:ref:`guide-distributed-partition` for more advanced options.
{ Step.2 Data Dispatching
"nid": { ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
"author": [
0,
1134649
],
"field_of_study": [
1134649,
1194614
],
"institution": [
1194614,
1203354
],
"paper": [
1203354,
1939743
]
},
"eid": {
"affiliated_with": [
0,
1043998
],
"writes": [
1043998,
8189658
],
"rev-has_topic": [
8189658,
15694736
],
"rev-affiliated_with": [
15694736,
16738734
],
"cites": [
16738734,
22155005
],
"has_topic": [
22155005,
29660083
],
"rev-cites": [
29660083,
35076354
],
"rev-writes": [
35076354,
42222014
]
}
}
Below shows the demo code to construct the schema file. DGL provides a ``dispatch_data.py`` script to physically partition the data and
dispatch partitions to each training machines. It will also convert the data
once again to data objects that can be loaded by DGL training processes
efficiently. The entire step can be further accelerated using multi-processing.
.. code-block:: none .. code-block:: bash
nid_ranges = {} python /myrepo/dgl/tools/dispatch_data.py \
eid_ranges = {} --in-dir /mydata/MAG240M-LSC_chunked/ \
for ntype in hg.ntypes: --partitions-dir /mydata/MAG240M-LSC_2parts/ \
ntype_id = hg.get_ntype_id(ntype) --out-dir data/MAG_LSC_partitioned \
nid = th.nonzero(g.ndata[dgl.NTYPE] == ntype_id, as_tuple=True)[0] --ip-config ip_config.txt
nid_ranges[ntype] = [int(nid[0]), int(nid[-1] + 1)]
for etype in hg.etypes:
etype_id = hg.get_etype_id(etype)
eid = th.nonzero(g.edata[dgl.ETYPE] == etype_id, as_tuple=True)[0]
eid_ranges[etype] = [int(eid[0]), int(eid[-1] + 1)]
with open('mag.json', 'w') as outfile:
json.dump({'nid': nid_ranges, 'eid': eid_ranges}, outfile, indent=4)
Construct node/edge features for a heterogeneous graph
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
:class:`dgl.DGLGraph` output by `convert_partition.py` stores a heterogeneous graph partition
as a homogeneous graph. Its node data contains a field called `orig_id` to store the node IDs
of a specific node type in the original heterogeneous graph and a field of `NTYPE` to store
the node type. In addition, it contains node data called `inner_node` that indicates
whether a node in the graph partition is assigned to the partition. If a node is assigned
to the partition, `inner_node` has 1; otherwise, its value is 0. Note: a graph partition
also contains some HALO nodes, which are assigned to other partitions but are connected with
some edges in this graph partition. By using this information, we can construct node features
for each node type separately and store them in a dictionary whose keys are
`<node_type>/<feature_name>` and values are node feature tensors. The code below illustrates
the construction of node feature dictionary. After the dictionary of tensors are constructed,
they are saved into a file.
.. code-block:: none * ``--in-dir`` specifies the path to the folder of the input chunked graph data produced
* ``--partitions-dir`` specifies the path to the partition assignment folder produced by Step.1.
* ``--out-dir`` specifies the path to stored the data partition on each machine.
* ``--ip-config`` specifies the IP configuration file of the cluster.
node_data = {} An example IP configuration file is as follows:
for ntype in hg.ntypes:
local_node_idx = th.logical_and(part.ndata['inner_node'].bool(),
part.ndata[dgl.NTYPE] == hg.get_ntype_id(ntype))
local_nodes = part.ndata['orig_id'][local_node_idx].numpy()
for name in hg.nodes[ntype].data:
node_data[ntype + '/' + name] = hg.nodes[ntype].data[name][local_nodes]
dgl.data.utils.save_tensors(metadata['part-{}'.format(part_id)]['node_feats'], node_data)
We can construct the edge features in a very similar way. The only difference is that .. code-block:: bash
all edges in the :class:`dgl.DGLGraph` object belong to the partition. So the construction
is even simpler.
.. code-block:: none 172.31.19.1
172.31.23.205
edge_data = {} During data dispatching, DGL assumes that the combined CPU RAM of the cluster
for etype in hg.etypes: is able to hold the entire graph data. Moreover, the number of machines (IPs) must be the
local_edges = subg.edata['orig_id'][subg.edata[dgl.ETYPE] == hg.get_etype_id(etype)] same as the number of partitions. Node ownership is determined by the result
for name in hg.edges[etype].data: of partitioning algorithm where as for edges the owner of the destination node
edge_data[etype + '/' + name] = hg.edges[etype].data[name][local_edges] also owns the edge as well.
dgl.data.utils.save_tensors(metadata['part-{}'.format(part_id)]['edge_feats'], edge_data)
.. _guide-distributed-tools: .. _guide-distributed-tools:
7.4 Tools for launching distributed training/inference 7.2 Tools for launching distributed training/inference
------------------------------------------------------ ------------------------------------------------------
:ref:`(中文版) <guide_cn-distributed-tools>` DGL provides a launching script ``launch.py`` under
`dgl/tools <https://github.com/dmlc/dgl/tree/master/tools>`__ to launch a distributed
training job in a cluster. This script makes the following assumptions:
DGL provides two scripts to assist in distributed training: * The partitioned data and the training script have been provisioned to the cluster or
a shared storage (e.g., NFS) accessible to all the worker machines.
* *tools/copy_files.py* for copying graph partitions to a graph, * The machine that invokes ``launch.py`` has passwordless ssh access
* *tools/launch.py* for launching a distributed training job in a cluster of machines. to all other machines. The launching machine must be one of the worker machines.
*copy_files.py* copies partitioned data and related files (e.g., training script)
in a machine (where the graph is partitioned) to a cluster of machines (where the distributed
training occurs). The script copies a partition to a machine where the distributed training job
will require the partition. The script contains four arguments:
* ``--part_config`` specifies the partition configuration file that contains the information
of the partitioned data in the local machine.
* ``--ip_config`` specifies the IP configuration file of the cluster.
* ``--workspace`` specifies the directory in the training machines where all data related
to distributed training are stored.
* ``--rel_data_path`` specifies the relative path under the workspace directory where
the partitioned data will be stored.
* ``--script_folder`` specifies the relative path under the workspace directory where
user's training scripts are stored.
**Note**: *copy_files.py* finds the right machine to store a partition based on the IP
configuration file. Therefore, the same IP configuration file should be used by copy_files.py
and launch.py.
DGL provides tools/launch.py to launch a distributed training job in a cluster.
This script makes the following assumptions:
* The partitioned data and the training script have been copied to the cluster or
a global storage (e.g., NFS) accessible to all machines in the cluster.
* The master machine (where the launch script is executed) has passwordless ssh access
to all other machines.
**Note**: The launch script has to be invoked on one of the machines in the cluster.
Below shows an example of launching a distributed training job in a cluster. Below shows an example of launching a distributed training job in a cluster.
.. code:: none .. code:: bash
python3 tools/launch.py \ python3 tools/launch.py \
--workspace ~graphsage/ \ --workspace /my/workspace/ \
--num_trainers 2 \ --num_trainers 2 \
--num_samplers 4 \ --num_samplers 4 \
--num_servers 1 \ --num_servers 1 \
--part_config data/ogb-product.json \ --part_config data/mygraph.json \
--ip_config ip_config.txt \ --ip_config ip_config.txt \
"python3 code/train_dist.py --graph-name ogb-product --ip_config ip_config.txt --num-epochs 5 --batch-size 1000 --lr 0.1" "python3 my_train_script.py"
The configuration file *ip_config.txt* contains the IP addresses of the machines in a cluster. The argument specifies the workspace path, where to find the partition metadata JSON
A typical example of *ip_config.txt* is as follows: and machine IP configurations, how many trainer, sampler, and server processes to be launched
on each machine. The last argument is the command to launch which is usually the
model training/evaluation script.
Each line of ``ip_config.txt`` is the IP address of a machine in the cluster.
Optionally, the IP address can be followed by a network port (default is ``30050``).
A typical example is as follows:
.. code:: none .. code:: none
...@@ -62,61 +41,70 @@ A typical example of *ip_config.txt* is as follows: ...@@ -62,61 +41,70 @@ A typical example of *ip_config.txt* is as follows:
172.31.29.175 172.31.29.175
172.31.16.98 172.31.16.98
Each row is an IP address of a machine. Optionally, the IP address can be followed by a port The workspace specified in the launch script is the working directory in the
that specifies the port used by network communication between trainers. When the port is not machines, which contains the training script, the IP configuration file, the
provided, a default one is ``30050``. partition configuration file as well as the graph partitions. All paths of the
files should be specified as relative paths to the workspace.
The launch script creates a specified number of training jobs
(``--num_trainers``) on each machine. In addition, users need to specify the
number of sampler processes for each trainer (``--num_samplers``).
The workspace specified in the launch script is the working directory in the machines, Launching a Persistent Graph Server
which contains the training script, the IP configuration file, the partition configuration ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
file as well as the graph partitions. All paths of the files should be specified as relative
paths to the workspace.
The launch script creates a specified number of training jobs (``--num_trainers``) on each machine. .. warning::
In addition, a user needs to specify the number of sampler processes for each trainer
(``--num_samplers``). The number of sampler processes has to match with the number of worker processes
specified in :func:`~dgl.distributed.initialize`.
It is common that users may want to try different models or training configurations Persistent graph server is an experimental feature. It is only available
against the same graph data. To avoid repetitively loading the same graph data, DGL when the ``net_etype`` argument of :func:`dgl.distributed.initialize`
is ``"tensorpipe"``.
Normally, all the server and trainer processes will be killed after the training is done.
However, sometimes users may wish to try out different models or training configurations
against the *same* graph data. Repetitively loading the same graph data
could be costly. To avoid that, DGL
allows users to launch a persistent graph server to be shared across multiple training allows users to launch a persistent graph server to be shared across multiple training
jobs. A persistent graph server will stay alive even all training workers have jobs. A persistent graph server will stay alive even all training workers have
finished and exited. Below shows an example of launching a persistent graph server: finished and exited. Below shows an example of launching a persistent graph server:
We first launch the graph server together with the first group of training workers. We first launch the graph server together with the first group of training workers.
.. code:: none .. code:: bash
python3 tools/launch.py \ python3 tools/launch.py \
--workspace ~graphsage/ \ --workspace /my/workspace/ \
--num_trainers 2 \ --num_trainers 2 \
--num_samplers 4 \ --num_samplers 4 \
--num_servers 1 \ --num_servers 1 \
--part_config data/ogb-product.json \ --part_config data/mygraph.json \
--ip_config ip_config.txt \ --ip_config ip_config.txt \
--keep_alive \ --keep_alive \
--server_name long_live \ --server_name long_live \
"python3 code/train_dist.py --graph-name ogb-product --ip_config ip_config.txt --num-epochs 5 --batch-size 1000 --lr 0.1" "python3 my_train_script.py"
Pay attention to the ``--keep_alive`` option, which indicates the server should Pay attention to the ``--keep_alive`` option, which indicates the server should
stay alive after workers have finished. ``--server_name`` is the given name of stay alive after workers have finished. ``--server_name`` is the given name of
the server which will be referred when launching new training jobs. the server which will be referred when launching new training jobs.
Launch another group of distributed training job and connect to the existing persistent server. Then launch trainers as normal which will automatically connect to the existing
persistent server.
.. code:: none .. code:: bash
python3 tools/launch.py \
--workspace /my/workspace/ \
--num_trainers 2 \
--num_samplers 4 \
--num_servers 1 \
--part_config data/mygraph.json \
--ip_config ip_config.txt \
"python3 my_train_script.py"
There are several restrictions when using persistent graph servers:
python3 tools/launch.py \ * All the arguments for ``launch.py`` should be kept same as previous launch. And below
--workspace ~graphsage/ \
--num_trainers 2 \
--num_samplers 4 \
--num_servers 1 \
--part_config data/ogb-product.json \
--ip_config ip_config.txt \
--server_name long_live \
"python3 code/train_dist.py --graph-name ogb-product --ip_config ip_config.txt --num-epochs 5 --batch-size 1000 --lr 0.1"
.. note::
All the arguments for ``launch.py`` should be kept same as previous launch. And below
arguments for specific training script should be kept same as well: ``--graph-name``, arguments for specific training script should be kept same as well: ``--graph-name``,
``--ip_config``. The rest arguments such as ``--num-epochs``, ``--batch-size`` and so ``--ip_config``.
on are free to change. * There is no data consistency control on the server side so data update must be carefully
handled. For example, it is recommended to avoid having multiple groups of trainers
update node/edge embeddings at the same time.
...@@ -5,6 +5,10 @@ Chapter 7: Distributed Training ...@@ -5,6 +5,10 @@ Chapter 7: Distributed Training
:ref:`(中文版) <guide_cn-distributed>` :ref:`(中文版) <guide_cn-distributed>`
.. note::
Distributed training is only available for PyTorch backend.
DGL adopts a fully distributed approach that distributes both data and computation DGL adopts a fully distributed approach that distributes both data and computation
across a collection of computation resources. In the context of this section, we across a collection of computation resources. In the context of this section, we
will assume a cluster setting (i.e., a group of machines). DGL partitions a graph will assume a cluster setting (i.e., a group of machines). DGL partitions a graph
...@@ -15,8 +19,8 @@ the computation and runs servers on the same machines to serve partitioned data ...@@ -15,8 +19,8 @@ the computation and runs servers on the same machines to serve partitioned data
For the training script, DGL provides distributed APIs that are similar to the ones for For the training script, DGL provides distributed APIs that are similar to the ones for
mini-batch training. This makes distributed training require only small code modifications mini-batch training. This makes distributed training require only small code modifications
from mini-batch training on a single machine. Below shows an example of training GraphSage from mini-batch training on a single machine. Below shows an example of training GraphSage
in a distributed fashion. The only code modifications are located on line 4-7: in a distributed fashion. The notable code modifications are:
1) initialize DGL's distributed module, 2) create a distributed graph object, and 1) initialization of DGL's distributed module, 2) create a distributed graph object, and
3) split the training set and calculate the nodes for the local process. 3) split the training set and calculate the nodes for the local process.
The rest of the code, including sampler creation, model definition, training loops The rest of the code, including sampler creation, model definition, training loops
are the same as :ref:`mini-batch training <guide-minibatch>`. are the same as :ref:`mini-batch training <guide-minibatch>`.
...@@ -24,13 +28,18 @@ are the same as :ref:`mini-batch training <guide-minibatch>`. ...@@ -24,13 +28,18 @@ are the same as :ref:`mini-batch training <guide-minibatch>`.
.. code:: python .. code:: python
import dgl import dgl
from dgl.dataloading import NeighborSampler
from dgl.distributed import DistGraph, DistDataLoader, node_split
import torch as th import torch as th
# initialize distributed contexts
dgl.distributed.initialize('ip_config.txt') dgl.distributed.initialize('ip_config.txt')
th.distributed.init_process_group(backend='gloo') th.distributed.init_process_group(backend='gloo')
g = dgl.distributed.DistGraph('graph_name', 'part_config.json') # load distributed graph
g = DistGraph('graph_name', 'part_config.json')
pb = g.get_partition_book() pb = g.get_partition_book()
train_nid = dgl.distributed.node_split(g.ndata['train_mask'], pb, force_even=True) # get training workload, i.e., training node IDs
train_nid = node_split(g.ndata['train_mask'], pb, force_even=True)
# Create sampler # Create sampler
...@@ -63,11 +72,6 @@ are the same as :ref:`mini-batch training <guide-minibatch>`. ...@@ -63,11 +72,6 @@ are the same as :ref:`mini-batch training <guide-minibatch>`.
loss.backward() loss.backward()
optimizer.step() optimizer.step()
When running the training script in a cluster of machines, DGL provides tools to copy data
to the cluster's machines and launch the training job on all machines.
**Note**: The current distributed training API only supports the Pytorch backend.
DGL implements a few distributed components to support distributed training. The figure below DGL implements a few distributed components to support distributed training. The figure below
shows the components and their interactions. shows the components and their interactions.
...@@ -77,28 +81,35 @@ shows the components and their interactions. ...@@ -77,28 +81,35 @@ shows the components and their interactions.
Specifically, DGL's distributed training has three types of interacting processes: Specifically, DGL's distributed training has three types of interacting processes:
*server*, *sampler* and *trainer*. *server*, *sampler* and *trainer*.
* Server processes run on each machine that stores a graph partition * **Servers** store graph partitions which includes both structure data and node/edge
(this includes the graph structure and node/edge features). These servers features. They provide services such as sampling, getting or updating node/edge
work together to serve the graph data to trainers. Note that one machine may run features. Note that each machine may run multiple server processes simultaneously
multiple server processes simultaneously to parallelize computation as well as to increase service throughput. One of them is *main server* in charge of data
network communication. loading and sharing data via shared memory with *backup servers* that provide
* Sampler processes interact with the servers and sample nodes and edges to services.
* **Sampler processes** interact with the servers and sample nodes and edges to
generate mini-batches for training. generate mini-batches for training.
* Trainers contain multiple classes to interact with servers. It has * **Trainers** are in charge of training networks on mini-batches. They utilize
:class:`~dgl.distributed.DistGraph` to get access to partitioned graph data and has APIs such as :class:`~dgl.distributed.DistGraph` to access partitioned graph data,
:class:`~dgl.distributed.DistEmbedding` and :class:`~dgl.distributed.DistTensor` to access :class:`~dgl.distributed.DistEmbedding` and :class:`~dgl.distributed.DistTensor` to access
the node/edge features/embeddings. It has node/edge features/embeddings and :class:`~dgl.distributed.DistDataLoader` to interact
:class:`~dgl.distributed.dist_dataloader.DistDataLoader` to with samplers to get mini-batches. Trainers communicate gradients among each other
interact with samplers to get mini-batches. using PyTorch's native ``DistributedDataParallel`` paradigm.
Besides Python APIs, DGL also provides `tools <https://github.com/dmlc/dgl/tree/master/tools>`__
for provisioning graph data and processes to the entire cluster.
Having the distributed components in mind, the rest of the section will cover Having the distributed components in mind, the rest of the section will cover
the following distributed components: the following distributed components:
* :ref:`guide-distributed-preprocessing` * :ref:`guide-distributed-preprocessing`
* :ref:`guide-distributed-tools`
* :ref:`guide-distributed-apis` * :ref:`guide-distributed-apis`
For more advanced users who are interested in more details:
* :ref:`guide-distributed-partition`
* :ref:`guide-distributed-hetero` * :ref:`guide-distributed-hetero`
* :ref:`guide-distributed-tools`
.. toctree:: .. toctree::
:maxdepth: 1 :maxdepth: 1
...@@ -106,6 +117,7 @@ the following distributed components: ...@@ -106,6 +117,7 @@ the following distributed components:
:glob: :glob:
distributed-preprocessing distributed-preprocessing
distributed-tools
distributed-apis distributed-apis
distributed-partition
distributed-hetero distributed-hetero
distributed-tools
...@@ -31,6 +31,9 @@ To quickly locate the examples of your interest, search for the tagged keywords ...@@ -31,6 +31,9 @@ To quickly locate the examples of your interest, search for the tagged keywords
- <a name='bgrl'></a> Thakoor et al. Large-Scale Representation Learning on Graphs via Bootstrapping. [Paper link](https://arxiv.org/abs/2102.06514). - <a name='bgrl'></a> Thakoor et al. Large-Scale Representation Learning on Graphs via Bootstrapping. [Paper link](https://arxiv.org/abs/2102.06514).
- Example code: [PyTorch](../examples/pytorch/bgrl) - Example code: [PyTorch](../examples/pytorch/bgrl)
- Tags: contrastive learning for node classification. - Tags: contrastive learning for node classification.
- <a name='ngnn'></a> Song et al. Network In Graph Neural Network. [Paper link](https://arxiv.org/abs/2111.11638).
- Example code: [PyTorch](../examples/pytorch/ogb/ngnn)
- Tags: model-agnostic methodology, link prediction, open graph benchmark.
## 2020 ## 2020
- <a name="eeg-gcnn"></a> Wagh et al. EEG-GCNN: Augmenting Electroencephalogram-based Neurological Disease Diagnosis using a Domain-guided Graph Convolutional Neural Network. [Paper link](http://proceedings.mlr.press/v136/wagh20a.html). - <a name="eeg-gcnn"></a> Wagh et al. EEG-GCNN: Augmenting Electroencephalogram-based Neurological Disease Diagnosis using a Domain-guided Graph Convolutional Neural Network. [Paper link](http://proceedings.mlr.press/v136/wagh20a.html).
......
...@@ -36,7 +36,6 @@ Train w/ mini-batch sampling in mixed mode (CPU+GPU) for node classification on ...@@ -36,7 +36,6 @@ Train w/ mini-batch sampling in mixed mode (CPU+GPU) for node classification on
```bash ```bash
python3 node_classification.py python3 node_classification.py
python3 multi_gpu_node_classification.py
``` ```
Results: Results:
......
Multiple GPU Training
============
Requirements
------------
```bash
pip install torchmetrics
```
How to run
-------
### Graph property prediction
Run with following (available dataset: "ogbg-molhiv", "ogbg-molpcba")
```bash
python3 multi_gpu_graph_prediction.py --dataset ogbg-molhiv
```
#### __Results__
```
* ogbg-molhiv: ~0.7965
* ogbg-molpcba: ~0.2239
```
#### __Scalability__
We test scalability of the code with dataset "ogbg-molhiv" in a machine of type <a href="https://aws.amazon.com/blogs/aws/now-available-ec2-instances-g4-with-nvidia-t4-tensor-core-gpus/">Amazon EC2 g4dn.metal</a>
, which has **8 Nvidia T4 Tensor Core GPUs**.
|GPU number |Speed Up |Batch size |Test accuracy |Average epoch Time|
| --- | ----------- | ----------- | -----------|-----------|
| 1 | x | 32 | 0.7765| 45.0s|
| 2 | 3.7x |64 | 0.7761|12.1s|
| 4 | 5.9x| 128 | 0.7854|7.6s|
| 8 | 9.5x| 256 | 0.7751|4.7s|
### Node classification
Run with following on dataset "ogbn-products"
```bash
python3 multi_gpu_node_classification.py
```
#### __Results__
```
Test Accuracy: ~0.7632
```
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist
import torch.optim as optim
import dgl
import dgl.nn as dglnn
from dgl.data import AsGraphPredDataset
from dgl.dataloading import GraphDataLoader
from ogb.graphproppred import DglGraphPropPredDataset, Evaluator
from ogb.graphproppred.mol_encoder import AtomEncoder, BondEncoder
from tqdm import tqdm
import argparse
class MLP(nn.Module):
def __init__(self, in_feats):
super().__init__()
self.mlp = nn.Sequential(
nn.Linear(in_feats, 2 * in_feats),
nn.BatchNorm1d(2 * in_feats),
nn.ReLU(),
nn.Linear(2 * in_feats, in_feats),
nn.BatchNorm1d(in_feats)
)
def forward(self, h):
return self.mlp(h)
class GIN(nn.Module):
def __init__(self, n_hidden, n_output, n_layers=5):
super().__init__()
self.node_encoder = AtomEncoder(n_hidden)
self.edge_encoders = nn.ModuleList([
BondEncoder(n_hidden) for _ in range(n_layers)])
self.pool = dglnn.AvgPooling()
self.dropout = nn.Dropout(0.5)
self.layers = nn.ModuleList()
for _ in range(n_layers):
self.layers.append(dglnn.GINEConv(MLP(n_hidden), learn_eps=True))
self.predictor = nn.Linear(n_hidden, n_output)
# add virtual node
self.virtual_emb = nn.Embedding(1, n_hidden)
nn.init.constant_(self.virtual_emb.weight.data, 0)
self.virtual_layers = nn.ModuleList()
for _ in range(n_layers - 1):
self.virtual_layers.append(MLP(n_hidden))
self.virtual_pool = dglnn.SumPooling()
def forward(self, g, x, x_e):
v_emb = self.virtual_emb.weight.expand(g.batch_size, -1)
hn = self.node_encoder(x)
for i in range(len(self.layers)):
v_hn = dgl.broadcast_nodes(g, v_emb)
hn = hn + v_hn
he = self.edge_encoders[i](x_e)
hn = self.layers[i](g, hn, he)
hn = F.relu(hn)
hn = self.dropout(hn)
if i != len(self.layers) - 1:
v_emb_tmp = self.virtual_pool(g, hn) + v_emb
v_emb = self.virtual_layers[i](v_emb_tmp)
v_emb = self.dropout(F.relu(v_emb))
hn = self.pool(g, hn)
return self.predictor(hn)
@torch.no_grad()
def evaluate(dataloader, device, model, evaluator):
model.eval()
y_true = []
y_pred = []
for batched_graph, labels in tqdm(dataloader):
batched_graph, labels = batched_graph.to(device), labels.to(device)
node_feat, edge_feat = batched_graph.ndata['feat'], batched_graph.edata['feat']
y_hat = model(batched_graph, node_feat, edge_feat)
y_true.append(labels.view(y_hat.shape).detach().cpu())
y_pred.append(y_hat.detach().cpu())
y_true = torch.cat(y_true, dim=0).numpy()
y_pred = torch.cat(y_pred, dim=0).numpy()
input_dict = {'y_true': y_true, 'y_pred': y_pred}
return evaluator.eval(input_dict)
def train(rank, world_size, dataset_name, root):
dist.init_process_group('nccl', 'tcp://127.0.0.1:12347', world_size=world_size, rank=rank)
torch.cuda.set_device(rank)
dataset = AsGraphPredDataset(DglGraphPropPredDataset(dataset_name, root))
evaluator = Evaluator(dataset_name)
model = GIN(300, dataset.num_tasks).to(rank)
model = nn.parallel.DistributedDataParallel(model, device_ids=[rank])
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)
train_dataloader = GraphDataLoader(
dataset[dataset.train_idx], batch_size=256,
use_ddp=True, shuffle=True)
valid_dataloader = GraphDataLoader(
dataset[dataset.val_idx], batch_size=256)
test_dataloader = GraphDataLoader(
dataset[dataset.test_idx], batch_size=256)
for epoch in range(50):
model.train()
train_dataloader.set_epoch(epoch)
for batched_graph, labels in train_dataloader:
batched_graph, labels = batched_graph.to(rank), labels.to(rank)
node_feat, edge_feat = batched_graph.ndata['feat'], batched_graph.edata['feat']
logits = model(batched_graph, node_feat, edge_feat)
optimizer.zero_grad()
is_labeled = labels == labels
loss = F.binary_cross_entropy_with_logits(logits.float()[is_labeled], labels.float()[is_labeled])
loss.backward()
optimizer.step()
scheduler.step()
if rank == 0:
val_metric = evaluate(valid_dataloader, rank, model.module, evaluator)[evaluator.eval_metric]
test_metric = evaluate(test_dataloader, rank, model.module, evaluator)[evaluator.eval_metric]
print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}, '
f'Val: {val_metric:.4f}, Test: {test_metric:.4f}')
dist.destroy_process_group()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--dataset', type=str, default="ogbg-molhiv",
choices=['ogbg-molhiv', 'ogbg-molpcba'],
help='name of dataset (default: ogbg-molhiv)')
dataset_name = parser.parse_args().dataset
root = './data/OGB'
DglGraphPropPredDataset(dataset_name, root)
world_size = torch.cuda.device_count()
print('Let\'s use', world_size, 'GPUs!')
args = (world_size, dataset_name, root)
import torch.multiprocessing as mp
mp.spawn(train, args=args, nprocs=world_size, join=True)
# NGNN + GraphSage/GCN
## Introduction
This is an example of implementing [NGNN](https://arxiv.org/abs/2111.11638) for link prediction in DGL.
We use a model-agnostic methodology, namely Network In Graph Neural Network (NGNN), which allows arbitrary GNN models to increase their model capacity.
The script in this folder experiments full-batch GCN/GraphSage (with/without NGNN) on the datasets: ogbl-ddi, ogbl-collab and ogbl-ppa.
## Installation requirements
```
ogb>=1.3.3
torch>=1.11.0
dgl>=0.8
```
## Experiments
We do not fix random seeds at all, and take over 10 runs for all models. All models are trained on a single V100 GPU with 16GB memory.
### ogbl-ddi
#### performance
<table>
<tr>
<th></th>
<th colspan=3 style="text-align: center;">test set</th>
<th colspan=3 style="text-align: center;">validation set</th>
<th>#parameters</th>
</tr>
<tr>
<td></td>
<td>Hits@20</td>
<td>Hits@50</td>
<td>Hits@100</td>
<td>Hits@20</td>
<td>Hits@50</td>
<td>Hits@100</td>
<td></td>
</tr>
<tr>
<td>GCN+NGNN(paper)</td>
<td>48.22% ± 7.00%</td>
<td>82.56% ± 4.03%</td>
<td>89.48% ± 1.68%</td>
<td>65.95% ± 1.16%</td>
<td>70.24% ± 0.50%</td>
<td>72.54% ± 0.62%</td>
<td rowspan=2>1,487,361</td>
</tr>
<tr>
<td>GCN+NGNN(ours; 50runs)</td>
<td><b>54.83% ± 15.81%</b></td>
<td><b>93.15% ± 2.59%</b></td>
<td><b>97.05% ± 0.56%</b></td>
<td>71.21% ± 0.38%</td>
<td>73.55% ± 0.25%</td>
<td>76.24% ± 1.33%</td>
</tr>
<tr>
<td>GraphSage+NGNN(paper)</td>
<td>60.75% ± 4.94%</td>
<td>84.58% ± 1.89%</td>
<td>92.58% ± 0.88%</td>
<td>68.05% ± 0.68%</td>
<td>71.14% ± 0.33%</td>
<td>72.77% ± 0.09%</td>
<td rowspan=2>1,618,433</td>
</tr>
<tr>
<td>GraphSage+NGNN(ours; 50runs)</td>
<td>57.70% ± 15.23%</td>
<td><b>96.18% ± 0.94%</b></td>
<td><b>98.58% ± 0.17%</b></td>
<td>73.23% ± 0.40%</td>
<td>87.20% ± 5.29%</td>
<td>98.71% ± 0.22%</td>
</tr>
</table>
A 3-layer MLP is used as LinkPredictor here, while a 2-layer one is used by the NGNN paper. This is the main reason for the better performance.
#### Reproduction of performance
- GCN + NGNN
```{.bash}
python main.py --dataset ogbl-ddi --device 0 --ngnn_type input --epochs 800 --dropout 0.5 --num_layers 2 --lr 0.0025 --batch_size 16384 --runs 50
```
- GraphSage + NGNN
```{.bash}
python main.py --dataset ogbl-ddi --device 1 --ngnn_type input --use_sage --epochs 600 --dropout 0.25 --num_layers 2 --lr 0.0012 --batch_size 32768 --runs 50
```
### ogbl-collab
#### Performance
<table>
<tr>
<th></th>
<th colspan=3 style="text-align: center;">test set</th>
<th colspan=3 style="text-align: center;">validation set</th>
<th>#parameters</th>
</tr>
<tr>
<td></td>
<td>Hits@10</td>
<td>Hits@50</td>
<td>Hits@100</td>
<td>Hits@10</td>
<td>Hits@50</td>
<td>Hits@100</td>
<td></td>
</tr>
<tr>
<td>GCN+NGNN(paper)</td>
<td>36.69% ± 0.82%</td>
<td>51.83% ± 0.50%</td>
<td>57.41% ± 0.22%</td>
<td>44.97% ± 0.97%</td>
<td>60.84% ± 0.63%</td>
<td>66.09% ± 0.30%</td>
<td rowspan=2>428,033</td>
</tr>
<tr>
<td>GCN+NGNN(ours)</td>
<td><b>39.29% ± 1.21%</b></td>
<td><b>53.48% ± 0.40%</b></td>
<td>58.34% ± 0.45%</td>
<td>48.28% ± 1.39%</td>
<td>62.73% ± 0.40%</td>
<td>67.13% ± 0.39%</td>
</tr>
<tr>
<td>GraphSage+NGNN(paper)</td>
<td>36.83% ± 2.56%</td>
<td>52.62% ± 1.04%</td>
<td>57.96% ± 0.56%</td>
<td>45.62% ± 2.56%</td>
<td>61.34% ± 1.05%</td>
<td>66.26% ± 0.44%</td>
<td rowspan=2>591,873</td>
</tr>
<tr>
<td>GraphSage+NGNN(ours)</td>
<td><b>40.30% ± 1.03%</b></td>
<td>53.59% ± 0.56%</td>
<td>58.75% ± 0.57%</td>
<td>49.85% ± 1.07%</td>
<td>62.81% ± 0.46%</td>
<td>67.33% ± 0.38%</td>
</tr>
</table>
#### Reproduction of performance
- GCN + NGNN
```{.bash}
python main.py --dataset ogbl-collab --device 2 --ngnn_type hidden --epochs 600 --dropout 0.2 --num_layers 3 --lr 0.001 --batch_size 32768 --runs 10
```
- GraphSage + NGNN
```{.bash}
python main.py --dataset ogbl-collab --device 3 --ngnn_type input --use_sage --epochs 800 --dropout 0.2 --num_layers 3 --lr 0.0005 --batch_size 32768 --runs 10
```
### ogbl-ppa
#### Performance
<table>
<tr>
<th></th>
<th colspan=3 style="text-align: center;">test set</th>
<th colspan=3 style="text-align: center;">validation set</th>
<th>#parameters</th>
</tr>
<tr>
<td></td>
<td>Hits@10</td>
<td>Hits@50</td>
<td>Hits@100</td>
<td>Hits@10</td>
<td>Hits@50</td>
<td>Hits@100</td>
<td></td>
</tr>
<tr>
<td>GCN+NGNN(paper)</td>
<td>5.64% ± 0.93%</td>
<td>18.44% ± 1.88%</td>
<td>26.78% ± 0.9%</td>
<td>8.14% ± 0.71%</td>
<td>19.69% ± 0.94%</td>
<td>27.86% ± 0.81%</td>
<td rowspan=1>673,281</td>
</tr>
<tr>
<td>GCN+NGNN(ours)</td>
<td><b>13.07% ± 3.24%</b></td>
<td><b>28.55% ± 1.62%</b></td>
<td><b>36.83% ± 0.99%</b></td>
<td>16.36% ± 1.89%</td>
<td>30.56% ± 0.72%</td>
<td>38.34% ± 0.82%</td>
<td>410,113</td>
</tr>
<tr>
<td>GraphSage+NGNN(paper)</td>
<td>3.52% ± 1.24%</td>
<td>15.55% ± 1.92%</td>
<td>24.45% ± 2.34%</td>
<td>5.59% ± 0.93%</td>
<td>17.21% ± 0.69%</td>
<td>25.42% ± 0.50%</td>
<td rowspan=1>819,201</td>
</tr>
<tr>
<td>GraphSage+NGNN(ours)</td>
<td><b>11.73% ± 2.42%</b></td>
<td><b>29.88% ± 1.84%</b></td>
<td><b>40.05% ± 1.38%</b></td>
<td>14.73% ± 2.36%</td>
<td>31.59% ± 1.72%</td>
<td>40.58% ± 1.23%</td>
<td>556,033</td>
</tr>
</table>
The main difference between this implementation and NGNN paper is the position of NGNN (all -> input).
#### Reproduction of performance
- GCN + NGNN
```{.bash}
python main.py --dataset ogbl-ppa --device 4 --ngnn_type input --epochs 80 --dropout 0.2 --num_layers 3 --lr 0.001 --batch_size 49152 --runs 10
```
- GraphSage + NGNN
```{.bash}
python main.py --dataset ogbl-ppa --device 5 --ngnn_type input --use_sage --epochs 80 --dropout 0.2 --num_layers 3 --lr 0.001 --batch_size 49152 --runs 10
```
## References
```{.tex}
@article{DBLP:journals/corr/abs-2111-11638,
author = {Xiang Song and
Runjie Ma and
Jiahang Li and
Muhan Zhang and
David Paul Wipf},
title = {Network In Graph Neural Network},
journal = {CoRR},
volume = {abs/2111.11638},
year = {2021},
url = {https://arxiv.org/abs/2111.11638},
eprinttype = {arXiv},
eprint = {2111.11638},
timestamp = {Fri, 26 Nov 2021 13:48:43 +0100},
biburl = {https://dblp.org/rec/journals/corr/abs-2111-11638.bib},
bibsource = {dblp computer science bibliography, https://dblp.org}
}
```
import argparse
import math
import torch
import torch.nn.functional as F
from torch.nn import Linear
from torch.utils.data import DataLoader
import dgl
from dgl.nn.pytorch import GraphConv, SAGEConv
from dgl.dataloading.negative_sampler import GlobalUniform
from ogb.linkproppred import DglLinkPropPredDataset, Evaluator
class Logger(object):
def __init__(self, runs, info=None):
self.info = info
self.results = [[] for _ in range(runs)]
def add_result(self, run, result):
assert len(result) == 3
assert run >= 0 and run < len(self.results)
self.results[run].append(result)
def print_statistics(self, run=None):
if run is not None:
result = 100 * torch.tensor(self.results[run])
argmax = result[:, 1].argmax().item()
print(f"Run {run + 1:02d}:")
print(f"Highest Train: {result[:, 0].max():.2f}")
print(f"Highest Valid: {result[:, 1].max():.2f}")
print(f" Final Train: {result[argmax, 0]:.2f}")
print(f" Final Test: {result[argmax, 2]:.2f}")
else:
result = 100 * torch.tensor(self.results)
best_results = []
for r in result:
train1 = r[:, 0].max().item()
valid = r[:, 1].max().item()
train2 = r[r[:, 1].argmax(), 0].item()
test = r[r[:, 1].argmax(), 2].item()
best_results.append((train1, valid, train2, test))
best_result = torch.tensor(best_results)
print(f"All runs:")
r = best_result[:, 0]
print(f"Highest Train: {r.mean():.2f} ± {r.std():.2f}")
r = best_result[:, 1]
print(f"Highest Valid: {r.mean():.2f} ± {r.std():.2f}")
r = best_result[:, 2]
print(f" Final Train: {r.mean():.2f} ± {r.std():.2f}")
r = best_result[:, 3]
print(f" Final Test: {r.mean():.2f} ± {r.std():.2f}")
class NGNN_GCNConv(torch.nn.Module):
def __init__(self, in_channels, hidden_channels, out_channels, num_nonl_layers):
super(NGNN_GCNConv, self).__init__()
self.num_nonl_layers = num_nonl_layers # number of nonlinear layers in each conv layer
self.conv = GraphConv(in_channels, hidden_channels)
self.fc = Linear(hidden_channels, hidden_channels)
self.fc2 = Linear(hidden_channels, out_channels)
self.reset_parameters()
def reset_parameters(self):
self.conv.reset_parameters()
gain = torch.nn.init.calculate_gain('relu')
torch.nn.init.xavier_uniform_(self.fc.weight, gain=gain)
torch.nn.init.xavier_uniform_(self.fc2.weight, gain=gain)
for bias in [self.fc.bias, self.fc2.bias]:
stdv = 1.0 / math.sqrt(bias.size(0))
bias.data.uniform_(-stdv, stdv)
def forward(self, g, x):
x = self.conv(g, x)
if self.num_nonl_layers == 2:
x = F.relu(x)
x = self.fc(x)
x = F.relu(x)
x = self.fc2(x)
return x
class GCN(torch.nn.Module):
def __init__(self, in_channels, hidden_channels, out_channels, num_layers, dropout, ngnn_type, dataset):
super(GCN, self).__init__()
self.dataset = dataset
self.convs = torch.nn.ModuleList()
num_nonl_layers = 1 if num_layers <= 2 else 2 # number of nonlinear layers in each conv layer
if ngnn_type == 'input':
self.convs.append(NGNN_GCNConv(in_channels, hidden_channels, hidden_channels, num_nonl_layers))
for _ in range(num_layers - 2):
self.convs.append(GraphConv(hidden_channels, hidden_channels))
elif ngnn_type == 'hidden':
self.convs.append(GraphConv(in_channels, hidden_channels))
for _ in range(num_layers - 2):
self.convs.append(NGNN_GCNConv(hidden_channels, hidden_channels, hidden_channels, num_nonl_layers))
self.convs.append(GraphConv(hidden_channels, out_channels))
self.dropout = dropout
self.reset_parameters()
def reset_parameters(self):
for conv in self.convs:
conv.reset_parameters()
def forward(self, g, x):
for conv in self.convs[:-1]:
x = conv(g, x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.convs[-1](g, x)
return x
class NGNN_SAGEConv(torch.nn.Module):
def __init__(self, in_channels, hidden_channels, out_channels, num_nonl_layers,
*, reduce):
super(NGNN_SAGEConv, self).__init__()
self.num_nonl_layers = num_nonl_layers # number of nonlinear layers in each conv layer
self.conv = SAGEConv(in_channels, hidden_channels, reduce)
self.fc = Linear(hidden_channels, hidden_channels)
self.fc2 = Linear(hidden_channels, out_channels)
self.reset_parameters()
def reset_parameters(self):
self.conv.reset_parameters()
gain = torch.nn.init.calculate_gain('relu')
torch.nn.init.xavier_uniform_(self.fc.weight, gain=gain)
torch.nn.init.xavier_uniform_(self.fc2.weight, gain=gain)
for bias in [self.fc.bias, self.fc2.bias]:
stdv = 1.0 / math.sqrt(bias.size(0))
bias.data.uniform_(-stdv, stdv)
def forward(self, g, x):
x = self.conv(g, x)
if self.num_nonl_layers == 2:
x = F.relu(x)
x = self.fc(x)
x = F.relu(x)
x = self.fc2(x)
return x
class SAGE(torch.nn.Module):
def __init__(self, in_channels, hidden_channels, out_channels, num_layers, dropout, ngnn_type, dataset, reduce='mean'):
super(SAGE, self).__init__()
self.dataset = dataset
self.convs = torch.nn.ModuleList()
num_nonl_layers = 1 if num_layers <= 2 else 2 # number of nonlinear layers in each conv layer
if ngnn_type == 'input':
self.convs.append(NGNN_SAGEConv(in_channels, hidden_channels, hidden_channels, num_nonl_layers, reduce=reduce))
for _ in range(num_layers - 2):
self.convs.append(SAGEConv(hidden_channels, hidden_channels, reduce))
elif ngnn_type == 'hidden':
self.convs.append(SAGEConv(in_channels, hidden_channels, reduce))
for _ in range(num_layers - 2):
self.convs.append(NGNN_SAGEConv(hidden_channels, hidden_channels, hidden_channels, num_nonl_layers, reduce=reduce))
self.convs.append(SAGEConv(hidden_channels, out_channels, reduce))
self.dropout = dropout
self.reset_parameters()
def reset_parameters(self):
for conv in self.convs:
conv.reset_parameters()
def forward(self, g, x):
for conv in self.convs[:-1]:
x = conv(g, x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.convs[-1](g, x)
return x
class LinkPredictor(torch.nn.Module):
def __init__(self, in_channels, hidden_channels, out_channels, num_layers, dropout):
super(LinkPredictor, self).__init__()
self.lins = torch.nn.ModuleList()
self.lins.append(Linear(in_channels, hidden_channels))
for _ in range(num_layers - 2):
self.lins.append(Linear(hidden_channels, hidden_channels))
self.lins.append(Linear(hidden_channels, out_channels))
self.dropout = dropout
self.reset_parameters()
def reset_parameters(self):
for lin in self.lins:
lin.reset_parameters()
def forward(self, x_i, x_j):
x = x_i * x_j
for lin in self.lins[:-1]:
x = lin(x)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.lins[-1](x)
return torch.sigmoid(x)
def train(model, predictor, g, x, split_edge, optimizer, batch_size):
model.train()
predictor.train()
pos_train_edge = split_edge['train']['edge'].to(x.device)
neg_sampler = GlobalUniform(1)
total_loss = total_examples = 0
for perm in DataLoader(range(pos_train_edge.size(0)), batch_size,
shuffle=True):
optimizer.zero_grad()
h = model(g, x)
edge = pos_train_edge[perm].t()
pos_out = predictor(h[edge[0]], h[edge[1]])
pos_loss = -torch.log(pos_out + 1e-15).mean()
edge = neg_sampler(g, edge[0])
neg_out = predictor(h[edge[0]], h[edge[1]])
neg_loss = -torch.log(1 - neg_out + 1e-15).mean()
loss = pos_loss + neg_loss
loss.backward()
if model.dataset == 'ogbl-ddi':
torch.nn.utils.clip_grad_norm_(x, 1.0)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
torch.nn.utils.clip_grad_norm_(predictor.parameters(), 1.0)
optimizer.step()
num_examples = pos_out.size(0)
total_loss += loss.item() * num_examples
total_examples += num_examples
return total_loss / total_examples
@torch.no_grad()
def test(model, predictor, g, x, split_edge, evaluator, batch_size):
model.eval()
predictor.eval()
h = model(g, x)
pos_train_edge = split_edge['eval_train']['edge'].to(h.device)
pos_valid_edge = split_edge['valid']['edge'].to(h.device)
neg_valid_edge = split_edge['valid']['edge_neg'].to(h.device)
pos_test_edge = split_edge['test']['edge'].to(h.device)
neg_test_edge = split_edge['test']['edge_neg'].to(h.device)
def get_pred(test_edges, h):
preds = []
for perm in DataLoader(range(test_edges.size(0)), batch_size):
edge = test_edges[perm].t()
preds += [predictor(h[edge[0]], h[edge[1]]).squeeze().cpu()]
pred = torch.cat(preds, dim=0)
return pred
pos_train_pred = get_pred(pos_train_edge, h)
pos_valid_pred = get_pred(pos_valid_edge, h)
neg_valid_pred = get_pred(neg_valid_edge, h)
pos_test_pred = get_pred(pos_test_edge, h)
neg_test_pred = get_pred(neg_test_edge, h)
results = {}
for K in [20, 50, 100]:
evaluator.K = K
train_hits = evaluator.eval({
'y_pred_pos': pos_train_pred,
'y_pred_neg': neg_valid_pred,
})[f'hits@{K}']
valid_hits = evaluator.eval({
'y_pred_pos': pos_valid_pred,
'y_pred_neg': neg_valid_pred,
})[f'hits@{K}']
test_hits = evaluator.eval({
'y_pred_pos': pos_test_pred,
'y_pred_neg': neg_test_pred,
})[f'hits@{K}']
results[f'Hits@{K}'] = (train_hits, valid_hits, test_hits)
return results
def main():
parser = argparse.ArgumentParser(description='OGBL(Full Batch GCN/GraphSage + NGNN)')
# dataset setting
parser.add_argument('--dataset', type=str, default='ogbl-ddi', choices=['ogbl-ddi', 'ogbl-collab', 'ogbl-ppa'])
# device setting
parser.add_argument('--device', type=int, default=0, help='GPU device ID. Use -1 for CPU training.')
# model structure settings
parser.add_argument('--use_sage', action='store_true', help='If not set, use GCN by default.')
parser.add_argument('--ngnn_type', type=str, default="input", choices=['input', 'hidden'], help="You can set this value from 'input' or 'hidden' to apply NGNN to different GNN layers.")
parser.add_argument('--num_layers', type=int, default=3, help='number of GNN layers')
parser.add_argument('--hidden_channels', type=int, default=256)
parser.add_argument('--dropout', type=float, default=0.0)
parser.add_argument('--batch_size', type=int, default=64 * 1024)
parser.add_argument('--lr', type=float, default=0.001)
parser.add_argument('--epochs', type=int, default=400)
# training settings
parser.add_argument('--eval_steps', type=int, default=1)
parser.add_argument('--runs', type=int, default=10)
args = parser.parse_args()
print(args)
device = f'cuda:{args.device}' if args.device != -1 and torch.cuda.is_available() else 'cpu'
device = torch.device(device)
dataset = DglLinkPropPredDataset(name=args.dataset)
g = dataset[0]
split_edge = dataset.get_edge_split()
# We randomly pick some training samples that we want to evaluate on:
idx = torch.randperm(split_edge['train']['edge'].size(0))
idx = idx[:split_edge['valid']['edge'].size(0)]
split_edge['eval_train'] = {'edge': split_edge['train']['edge'][idx]}
if dataset.name == 'ogbl-ppa':
g.ndata['feat'] = g.ndata['feat'].to(torch.float)
if dataset.name == 'ogbl-ddi':
emb = torch.nn.Embedding(g.num_nodes(), args.hidden_channels).to(device)
in_channels = args.hidden_channels
else: # ogbl-collab, ogbl-ppa
in_channels = g.ndata['feat'].size(-1)
# select model
if args.use_sage:
model = SAGE(in_channels, args.hidden_channels,
args.hidden_channels, args.num_layers,
args.dropout, args.ngnn_type, dataset.name)
else: # GCN
g = dgl.add_self_loop(g)
model = GCN(in_channels, args.hidden_channels,
args.hidden_channels, args.num_layers,
args.dropout, args.ngnn_type, dataset.name)
predictor = LinkPredictor(args.hidden_channels, args.hidden_channels, 1, 3, args.dropout)
g, model, predictor = map(lambda x: x.to(device), (g, model, predictor))
evaluator = Evaluator(name=dataset.name)
loggers = {
'Hits@20': Logger(args.runs, args),
'Hits@50': Logger(args.runs, args),
'Hits@100': Logger(args.runs, args),
}
for run in range(args.runs):
model.reset_parameters()
predictor.reset_parameters()
if dataset.name == 'ogbl-ddi':
torch.nn.init.xavier_uniform_(emb.weight)
g.ndata['feat'] = emb.weight
optimizer = torch.optim.Adam(
list(model.parameters()) + list(predictor.parameters()) + (
list(emb.parameters()) if dataset.name == 'ogbl-ddi' else []
),
lr=args.lr)
for epoch in range(1, 1 + args.epochs):
loss = train(model, predictor, g, g.ndata['feat'], split_edge, optimizer,
args.batch_size)
if epoch % args.eval_steps == 0:
results = test(model, predictor, g, g.ndata['feat'], split_edge, evaluator,
args.batch_size)
for key, result in results.items():
loggers[key].add_result(run, result)
train_hits, valid_hits, test_hits = result
print(key)
print(f'Run: {run + 1:02d}, '
f'Epoch: {epoch:02d}, '
f'Loss: {loss:.4f}, '
f'Train: {100 * train_hits:.2f}%, '
f'Valid: {100 * valid_hits:.2f}%, '
f'Test: {100 * test_hits:.2f}%')
print('---')
for key in loggers.keys():
print(key)
loggers[key].print_statistics(run)
for key in loggers.keys():
print(key)
loggers[key].print_statistics()
if __name__ == "__main__":
main()
...@@ -573,7 +573,8 @@ def main(args): ...@@ -573,7 +573,8 @@ def main(args):
if args.num_gpus == -1: if args.num_gpus == -1:
device = th.device('cpu') device = th.device('cpu')
else: else:
device = th.device('cuda:'+str(args.local_rank)) dev_id = g.rank() % args.num_gpus
device = th.device('cuda:'+str(dev_id))
labels = g.nodes['paper'].data['labels'][np.arange(g.number_of_nodes('paper'))] labels = g.nodes['paper'].data['labels'][np.arange(g.number_of_nodes('paper'))]
all_val_nid = th.LongTensor(np.nonzero(g.nodes['paper'].data['val_mask'][np.arange(g.number_of_nodes('paper'))])).squeeze() all_val_nid = th.LongTensor(np.nonzero(g.nodes['paper'].data['val_mask'][np.arange(g.number_of_nodes('paper'))])).squeeze()
all_test_nid = th.LongTensor(np.nonzero(g.nodes['paper'].data['test_mask'][np.arange(g.number_of_nodes('paper'))])).squeeze() all_test_nid = th.LongTensor(np.nonzero(g.nodes['paper'].data['test_mask'][np.arange(g.number_of_nodes('paper'))])).squeeze()
......
...@@ -9,6 +9,7 @@ import inspect ...@@ -9,6 +9,7 @@ import inspect
import re import re
import atexit import atexit
import os import os
from contextlib import contextmanager
import psutil import psutil
import numpy as np import numpy as np
...@@ -20,8 +21,8 @@ from ..base import NID, EID, dgl_warning, DGLError ...@@ -20,8 +21,8 @@ from ..base import NID, EID, dgl_warning, DGLError
from ..batch import batch as batch_graphs from ..batch import batch as batch_graphs
from ..heterograph import DGLHeteroGraph from ..heterograph import DGLHeteroGraph
from ..utils import ( from ..utils import (
recursive_apply, ExceptionWrapper, recursive_apply_pair, set_num_threads, recursive_apply, ExceptionWrapper, recursive_apply_pair, set_num_threads, get_num_threads,
context_of, dtype_of) get_numa_nodes_cores, context_of, dtype_of)
from ..frame import LazyFeature from ..frame import LazyFeature
from ..storages import wrap_storage from ..storages import wrap_storage
from .base import BlockSampler, as_edge_prediction_sampler from .base import BlockSampler, as_edge_prediction_sampler
...@@ -697,8 +698,7 @@ class DataLoader(torch.utils.data.DataLoader): ...@@ -697,8 +698,7 @@ class DataLoader(torch.utils.data.DataLoader):
def __init__(self, graph, indices, graph_sampler, device=None, use_ddp=False, def __init__(self, graph, indices, graph_sampler, device=None, use_ddp=False,
ddp_seed=0, batch_size=1, drop_last=False, shuffle=False, ddp_seed=0, batch_size=1, drop_last=False, shuffle=False,
use_prefetch_thread=None, use_alternate_streams=None, use_prefetch_thread=None, use_alternate_streams=None,
pin_prefetcher=None, use_uva=False, pin_prefetcher=None, use_uva=False, **kwargs):
use_cpu_worker_affinity=False, cpu_worker_affinity_cores=None, **kwargs):
# (BarclayII) PyTorch Lightning sometimes will recreate a DataLoader from an existing # (BarclayII) PyTorch Lightning sometimes will recreate a DataLoader from an existing
# DataLoader with modifications to the original arguments. The arguments are retrieved # DataLoader with modifications to the original arguments. The arguments are retrieved
# from the attributes with the same name, and because we change certain arguments # from the attributes with the same name, and because we change certain arguments
...@@ -840,31 +840,12 @@ class DataLoader(torch.utils.data.DataLoader): ...@@ -840,31 +840,12 @@ class DataLoader(torch.utils.data.DataLoader):
self.use_alternate_streams = use_alternate_streams self.use_alternate_streams = use_alternate_streams
self.pin_prefetcher = pin_prefetcher self.pin_prefetcher = pin_prefetcher
self.use_prefetch_thread = use_prefetch_thread self.use_prefetch_thread = use_prefetch_thread
self.cpu_affinity_enabled = False
worker_init_fn = WorkerInitWrapper(kwargs.get('worker_init_fn', None)) worker_init_fn = WorkerInitWrapper(kwargs.get('worker_init_fn', None))
self.other_storages = {} self.other_storages = {}
if use_cpu_worker_affinity:
nw_work = kwargs.get('num_workers', 0)
if cpu_worker_affinity_cores is None:
cpu_worker_affinity_cores = []
if not isinstance(cpu_worker_affinity_cores, list):
raise Exception('ERROR: cpu_worker_affinity_cores should be a list of cores')
if not nw_work > 0:
raise Exception('ERROR: affinity should be used with --num_workers=X')
if len(cpu_worker_affinity_cores) not in [0, nw_work]:
raise Exception('ERROR: cpu_affinity incorrect '
'settings for cores={} num_workers={}'
.format(cpu_worker_affinity_cores, nw_work))
self.cpu_cores = (cpu_worker_affinity_cores
if len(cpu_worker_affinity_cores)
else range(0, nw_work))
worker_init_fn = WorkerInitWrapper(self.worker_init_function)
super().__init__( super().__init__(
self.dataset, self.dataset,
collate_fn=CollateWrapper( collate_fn=CollateWrapper(
...@@ -875,6 +856,11 @@ class DataLoader(torch.utils.data.DataLoader): ...@@ -875,6 +856,11 @@ class DataLoader(torch.utils.data.DataLoader):
**kwargs) **kwargs)
def __iter__(self): def __iter__(self):
if self.device.type == 'cpu' and not self.cpu_affinity_enabled:
link = 'https://docs.dgl.ai/tutorials/cpu/cpu_best_practises.html'
dgl_warning(f'Dataloader CPU affinity opt is not enabled, consider switching it on '
f'(see enable_cpu_affinity() or CPU best practices for DGL [{link}])')
if self.shuffle: if self.shuffle:
self.dataset.shuffle() self.dataset.shuffle()
# When using multiprocessing PyTorch sometimes set the number of PyTorch threads to 1 # When using multiprocessing PyTorch sometimes set the number of PyTorch threads to 1
...@@ -882,20 +868,89 @@ class DataLoader(torch.utils.data.DataLoader): ...@@ -882,20 +868,89 @@ class DataLoader(torch.utils.data.DataLoader):
num_threads = torch.get_num_threads() if self.num_workers > 0 else None num_threads = torch.get_num_threads() if self.num_workers > 0 else None
return _PrefetchingIter(self, super().__iter__(), num_threads=num_threads) return _PrefetchingIter(self, super().__iter__(), num_threads=num_threads)
def worker_init_function(self, worker_id): @contextmanager
"""Worker init default function. def enable_cpu_affinity(self, loader_cores=None, compute_cores=None, verbose=True):
Parameters """ Helper method for enabling cpu affinity for compute threads and dataloader workers
---------- Only for CPU devices
worker_id : int Uses only NUMA node 0 by default for multi-node systems
Worker ID.
Parameters
----------
loader_cores : [int] (optional)
List of cpu cores to which dataloader workers should affinitize to.
default: node0_cores[0:num_workers]
compute_cores : [int] (optional)
List of cpu cores to which compute threads should affinitize to
default: node0_cores[num_workers:]
verbose : bool (optional)
If True, affinity information will be printed to the console
Usage
-----
with dataloader.enable_cpu_affinity():
<training loop>
""" """
try: if self.device.type == 'cpu':
psutil.Process().cpu_affinity([self.cpu_cores[worker_id]]) if not self.num_workers > 0:
print('CPU-affinity worker {} has been assigned to core={}' raise Exception('ERROR: affinity should be used with at least one DL worker')
.format(worker_id, self.cpu_cores[worker_id])) if loader_cores and len(loader_cores) != self.num_workers:
except: raise Exception('ERROR: cpu_affinity incorrect '
raise Exception('ERROR: cannot use affinity id={} cpu_cores={}' 'number of loader_cores={} for num_workers={}'
.format(worker_id, self.cpu_cores)) .format(loader_cores, self.num_workers))
# False positive E0203 (access-member-before-definition) linter warning
worker_init_fn_old = self.worker_init_fn # pylint: disable=E0203
affinity_old = psutil.Process().cpu_affinity()
nthreads_old = get_num_threads()
compute_cores = compute_cores[:] if compute_cores else []
loader_cores = loader_cores[:] if loader_cores else []
def init_fn(worker_id):
try:
psutil.Process().cpu_affinity([loader_cores[worker_id]])
except:
raise Exception('ERROR: cannot use affinity id={} cpu={}'
.format(worker_id, loader_cores))
worker_init_fn_old(worker_id)
if not loader_cores or not compute_cores:
numa_info = get_numa_nodes_cores()
if numa_info and len(numa_info[0]) > self.num_workers:
# take one thread per each node 0 core
node0_cores = [cpus[0] for core_id, cpus in numa_info[0]]
else:
node0_cores = list(range(psutil.cpu_count(logical = False)))
if len(node0_cores) <= self.num_workers:
raise Exception('ERROR: more workers than available cores')
loader_cores = loader_cores or node0_cores[0:self.num_workers]
compute_cores = [cpu for cpu in node0_cores if cpu not in loader_cores]
try:
psutil.Process().cpu_affinity(compute_cores)
set_num_threads(len(compute_cores))
self.worker_init_fn = init_fn
self.cpu_affinity_enabled = True
if verbose:
print('{} DL workers are assigned to cpus {}, main process will use cpus {}'
.format(self.num_workers, loader_cores, compute_cores))
yield
finally:
# restore omp_num_threads and cpu affinity
psutil.Process().cpu_affinity(affinity_old)
set_num_threads(nthreads_old)
self.worker_init_fn = worker_init_fn_old
self.cpu_affinity_enabled = False
else:
yield
# To allow data other than node/edge data to be prefetched. # To allow data other than node/edge data to be prefetched.
def attach_data(self, name, data): def attach_data(self, name, data):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment