"vscode:/vscode.git/clone" did not exist on "81aeea361da3936b875a678b9cb44596800510b5"
Unverified Commit 75ffc31f authored by Da Zheng's avatar Da Zheng Committed by GitHub
Browse files

[Doc] Update the docstring of distributed APIs. (#2025)



* add doc.

* update DistGraph.

* add DistTensor.

* update DistEmbedding.

* add partition.py

* add sampling.

* fix.

* add graph partition book and create a base class.

* fix test.

* add rst.

* update doc rst.

* update.

* fix.

* fix docs

* update distributed tensor and embeddings.

* add checks.

* update DistGraph.

* update initialization.

* fix graph partition book.

* update graph partition book.

* update partition.

* update partition.

* fix.

* add example code.

* update DistGraph

* Update python/dgl/distributed/dist_context.py
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>

* Update python/dgl/distributed/dist_context.py
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>

* Update python/dgl/distributed/dist_dataloader.py
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>

* Update python/dgl/distributed/dist_dataloader.py
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>

* Update python/dgl/distributed/dist_dataloader.py
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>

* update initialize.

* update dataloader.

* update distgraph.

* update DistGraph.

* update DistTensor.

* update.

* more updates.

* fix lint.

* add num_nodes and num_edges
Co-authored-by: Chao Ma <mctt90@gmail.com>
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>
Co-authored-by: xiang song(charlie.song) <classicxsong@gmail.com>
parent 3d654843
.. _api-distributed:
dgl.distributed
=================================
.. automodule:: dgl.distributed
Initialization
---------------
.. autosummary::
:toctree: ../../generated/
initialize
Distributed Graph
-----------------
.. autoclass:: DistGraph
:members: ndata, edata, idtype, device, ntypes, etypes, number_of_nodes, number_of_edges, node_attr_schemes, edge_attr_schemes, rank, find_edges, get_partition_book, barrier, local_partition
Distributed Tensor
------------------
.. autoclass:: DistTensor
:members: part_policy, shape, dtype, name
Distributed Embedding
---------------------
.. autoclass:: DistEmbedding
.. autoclass:: SparseAdagrad
:members: step
Distributed workload split
--------------------------
.. autosummary::
:toctree: ../../generated/
node_split
edge_split
Distributed Sampling
--------------------
Distributed DataLoader
``````````````````````
.. currentmodule:: dgl.distributed.dist_dataloader
.. autoclass:: DistDataLoader
Distributed Neighbor Sampling
`````````````````````````````
.. currentmodule:: dgl.distributed.graph_services
.. autosummary::
:toctree: ../../generated/
sample_neighbors
find_edges
in_subgraph
Partition
---------
Graph partition book
````````````````````
.. currentmodule:: dgl.distributed.graph_partition_book
.. autoclass:: GraphPartitionBook
:members: shared_memory, num_partitions, metadata, nid2partid, eid2partid, partid2nids, partid2eids, nid2localnid, eid2localeid, partid
.. autoclass:: PartitionPolicy
:members: policy_str, part_id, partition_book, to_local, to_partid, get_part_size, get_size
Split and Load Graphs
`````````````````````
.. currentmodule:: dgl.distributed.partition
.. autosummary::
:toctree: ../../generated/
load_partition
load_partition_book
partition_graph
......@@ -12,3 +12,4 @@ API Reference
dgl.function
sampling
dgl.dataloading
dgl.distributed
......@@ -110,6 +110,7 @@ Getting Started
api/python/dgl.function
api/python/sampling
api/python/dgl.dataloading
api/python/dgl.distributed
.. toctree::
:maxdepth: 3
......
"""DGL distributed."""
"""DGL distributed module contains classes and functions to support
distributed graph neural network training and inference in a cluster of
machines.
This includes a few submodules:
* distributed data structures including distributed graph, distributed tensor
and distributed embeddings.
* distributed sampling.
* distributed workload split at runtime.
* graph partition.
"""
import os
import sys
from .dist_graph import DistGraphServer, DistGraph, DistTensor, node_split, edge_split
from .dist_graph import DistGraphServer, DistGraph, node_split, edge_split
from .dist_tensor import DistTensor
from .partition import partition_graph, load_partition, load_partition_book
from .graph_partition_book import GraphPartitionBook, RangePartitionBook, PartitionPolicy
from .graph_partition_book import GraphPartitionBook, PartitionPolicy
from .sparse_emb import SparseAdagrad, DistEmbedding
from .rpc import *
......
......@@ -44,7 +44,15 @@ def _init_rpc(ip_config, num_servers, max_queue_size, net_type, role, num_thread
def initialize(ip_config, num_servers=1, num_workers=0,
max_queue_size=MAX_QUEUE_SIZE, net_type='socket',
num_worker_threads=1):
"""Init rpc service
"""Initialize DGL's distributed module
This function initializes DGL's distributed module. It behaves differently in the server
and client modes. In the server mode, it runs the server code and never returns.
In the client mode, it builds connections with the servers for communication and
creates worker processes for distributed sampling. `num_workers` specifies
the number of sampling worker processes per trainer process.
Users also have to provide the number of server processes on each machine so that
the clients can connect to all the server processes in the cluster correctly.
Parameters
----------
......@@ -57,12 +65,21 @@ def initialize(ip_config, num_servers=1, num_workers=0,
for distributed sampling.
max_queue_size : int
Maximal size (in bytes) of the client queue buffer (~20 GB by default).
Note that the 20 GB is only an upper bound; DGL uses zero-copy, so
it does not allocate 20 GB of memory at once.
net_type : str
Networking type. Current options are: 'socket'.
net_type : str, optional
Networking type. Currently the only valid option is ``'socket'``.
Default: ``'socket'``
num_worker_threads: int
The number of threads in a worker process.
Note
----
Users have to invoke this API before any of DGL's distributed APIs and any framework-specific
distributed API. For example, when used with PyTorch, users have to invoke this function
before PyTorch's `torch.distributed.init_process_group`.
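Examples
--------
A minimal client-side sketch (the ``ip_config.txt`` path and the ``gloo`` backend below are
illustrative assumptions, not requirements):

>>> import dgl
>>> import torch as th
>>> dgl.distributed.initialize('ip_config.txt')
>>> th.distributed.init_process_group(backend='gloo')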
"""
if os.environ.get('DGL_ROLE', 'client') == 'server':
from .dist_graph import DistGraphServer
......@@ -138,7 +155,14 @@ def is_initialized():
return INITIALIZED
def exit_client():
"""Register exit callback.
"""Trainer exits
This function is called automatically when a Python process exits. Normally,
the training script does not need to invoke this function at the end.
In the case that the training script needs to initialize the distributed module
multiple times (so far, this is needed in the unit tests), the training script
needs to call `exit_client` before calling `initialize` again.
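Examples
--------
A brief sketch of re-initializing the distributed module in the same process, e.g., in a unit
test (the ``ip_config.txt`` path is illustrative):

>>> dgl.distributed.initialize('ip_config.txt')
>>> # ... run the first round of tests or training ...
>>> dgl.distributed.exit_client()
>>> dgl.distributed.initialize('ip_config.txt')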
"""
# Only client with rank_0 will send shutdown request to servers.
finalize_worker() # finalizing workers should happen earlier than the barrier, and it is non-blocking
......
......@@ -58,31 +58,60 @@ def enable_mp_debug():
DATALOADER_ID = 0
class DistDataLoader:
"""DGL customized multiprocessing dataloader, which is designed for using with DistGraph."""
def __init__(self, dataset, batch_size, shuffle=False, collate_fn=None, drop_last=False,
queue_size=None):
"""
This class will utilize the worker process created by dgl.distributed.initialize function
Note that the iteration order is not guaranteed with this class. For example,
"""DGL customized multiprocessing dataloader.
DistDataLoader provides a similar interface to Pytorch's DataLoader to generate mini-batches
with multiprocessing. It utilizes the worker processes created by
:func:`dgl.distributed.initialize` to parallelize sampling.
Parameters
----------
dataset: a tensor
A tensor of node IDs or edge IDs.
batch_size: int
The number of samples per batch to load.
shuffle: bool, optional
Set to ``True`` to have the data reshuffled at every epoch (default: ``False``).
collate_fn: callable, optional
The function is typically used to sample neighbors of the nodes in a batch
or the endpoint nodes of the edges in a batch.
drop_last: bool, optional
Set to ``True`` to drop the last incomplete batch, if the dataset size is not
divisible by the batch size. If ``False`` and the size of dataset is not divisible
by the batch size, then the last batch will be smaller. (default: ``False``)
queue_size: int, optional
Size of multiprocessing queue
Examples
--------
>>> g = dgl.distributed.DistGraph('graph-name')
>>> def sample(seeds):
... seeds = th.LongTensor(np.asarray(seeds))
... frontier = dgl.distributed.sample_neighbors(g, seeds, 10)
... return dgl.to_block(frontier, seeds)
>>> dataloader = dgl.distributed.DistDataLoader(dataset=nodes, batch_size=1000,
... collate_fn=sample, shuffle=True)
>>> for block in dataloader:
... feat = g.ndata['features'][block.srcdata[dgl.NID]]
... labels = g.ndata['labels'][block.dstdata[dgl.NID]]
... pred = model(block, feat)
Note
----
When performing DGL's distributed sampling with multiprocessing, users have to use this class
instead of Pytorch's DataLoader because DGL's RPC requires that all processes establish
connections with servers before invoking any DGL's distributed API. Therefore, this dataloader
uses the worker processes created in :func:`dgl.distributed.initialize`.
Note
----
This dataloader does not guarantee the iteration order. For example,
if dataset = [1, 2, 3, 4], batch_size = 2 and shuffle = False, the order of [1, 2]
and [3, 4] is not guaranteed.
dataset (Dataset): dataset from which to load the data.
batch_size (int, optional): how many samples per batch to load
(default: ``1``).
shuffle (bool, optional): set to ``True`` to have the data reshuffled
at every epoch (default: ``False``).
collate_fn (callable, optional): merges a list of samples to form a
mini-batch of Tensor(s). Used when using batched loading from a
map-style dataset.
drop_last (bool, optional): set to ``True`` to drop the last incomplete batch,
if the dataset size is not divisible by the batch size. If ``False`` and
the size of dataset is not divisible by the batch size, then the last batch
will be smaller. (default: ``False``)
queue_size (int, optional): Size of multiprocessing queue
"""
def __init__(self, dataset, batch_size, shuffle=False, collate_fn=None, drop_last=False,
queue_size=None):
self.pool, self.num_workers = get_sampler_pool()
if queue_size is None:
queue_size = self.num_workers * 4 if self.num_workers > 0 else 4
......
......@@ -221,9 +221,9 @@ class EdgeDataView(MutableMapping):
class DistGraphServer(KVServer):
''' The DistGraph server.
This DistGraph server loads the graph data and sets up a service so that clients can read data
of a graph partition (graph structure, node data and edge data) from remote machines.
A server is responsible for one graph partition.
This DistGraph server loads the graph data and sets up a service so that trainers and
samplers can read data of a graph partition (graph structure, node data and edge data)
from remote machines. A server is responsible for one graph partition.
Currently, each machine runs only one main server with a set of backup servers to handle
clients' requests. The main server and the backup servers all handle the requests for the same
......@@ -297,35 +297,83 @@ class DistGraphServer(KVServer):
num_clients=self.num_clients, server_state=server_state)
class DistGraph:
''' The DistGraph client.
This provides the graph interface to access the partitioned graph data for distributed GNN
training. All data of partitions are loaded by the DistGraph server.
DistGraph can run in two modes: the standalone mode and the distributed mode.
* When a user runs the training script normally, DistGraph will be in the standalone mode.
In this mode, the input graph has to be constructed with only one partition. This mode is
used for testing and debugging purpose.
* When a user runs the training script with the distributed launch script, DistGraph will
be set into the distributed mode. This is used for actual distributed training.
When running in the distributed mode, `DistGraph` uses shared-memory to access
the partition data in the local machine.
This gives the best performance for distributed training when we run `DistGraphServer`
and `DistGraph` on the same machine. However, a user may want to run them in separate
machines. In this case, a user may want to disable shared memory by passing
`disable_shared_mem=False` when creating `DistGraphServer`. When shared-memory is disabled,
'''The class for accessing a distributed graph.
This class provides a subset of DGLGraph APIs for accessing partitioned graph data in
distributed GNN training and inference. Thus, its main use case is to work with
distributed sampling APIs to generate mini-batches and perform forward and
backward computation on the mini-batches.
The class can run in two modes: the standalone mode and the distributed mode.
* When a user runs the training script normally, ``DistGraph`` will be in the standalone mode.
In this mode, the input data must be constructed by
:py:meth:`~dgl.distributed.partition.partition_graph` with only one partition. This mode is
used for testing and debugging purpose. In this mode, users have to provide ``part_config``
so that ``DistGraph`` can load the input graph.
* When a user runs the training script with the distributed launch script, ``DistGraph`` will
be set into the distributed mode. This is used for actual distributed training. All data of
partitions are loaded by the ``DistGraph`` servers, which are created by DGL's launch script.
``DistGraph`` connects with the servers to access the partitioned graph data.
Currently, the ``DistGraph`` servers and clients run on the same set of machines
in the distributed mode. ``DistGraph`` uses shared-memory to access the partition data
in the local machine. This gives the best performance for distributed training.
Users may want to run ``DistGraph`` servers and clients on separate sets of machines.
In this case, a user may want to disable shared memory by passing
``disable_shared_mem=True`` when creating ``DistGraphServer``. When shared memory is disabled,
a user has to pass a partition book.
Parameters
----------
graph_name : str
The name of the graph. This name has to be the same as the one used in DistGraphServer.
gpb : PartitionBook
The partition book object
part_config : str
The partition config file. It's used in the standalone mode.
The name of the graph. This name has to be the same as the one used for
partitioning a graph in :py:meth:`dgl.distributed.partition.partition_graph`.
gpb : GraphPartitionBook, optional
The partition book object. Normally, users do not need to provide the partition book.
This argument is necessary only when users want to run server process and trainer
processes on different machines.
part_config : str, optional
The path of partition configuration file generated by
:py:meth:`dgl.distributed.partition.partition_graph`. It's used in the standalone mode.
Examples
--------
The example shows the creation of ``DistGraph`` in the standalone mode.
>>> dgl.distributed.partition_graph(g, 'graph_name', 1, num_hops=1, part_method='metis',
out_path='output/', reshuffle=True)
>>> g = dgl.distributed.DistGraph('graph_name', part_config='output/graph_name.json')
The example shows the creation of ``DistGraph`` in the distributed mode.
>>> g = dgl.distributed.DistGraph('graph-name')
The code below shows the mini-batch training using ``DistGraph``.
>>> def sample(seeds):
... seeds = th.LongTensor(np.asarray(seeds))
... frontier = dgl.distributed.sample_neighbors(g, seeds, 10)
... return dgl.to_block(frontier, seeds)
>>> dataloader = dgl.distributed.DistDataLoader(dataset=nodes, batch_size=1000,
... collate_fn=sample, shuffle=True)
>>> for block in dataloader:
... feat = g.ndata['features'][block.srcdata[dgl.NID]]
... labels = g.ndata['labels'][block.dstdata[dgl.NID]]
... pred = model(block, feat)
Note
----
``DistGraph`` currently supports only graphs with one node type and one edge type.
For heterogeneous graphs, users need to convert them into DGL graphs with one node type and
one edge type, and store the actual node types and edge types as node data and edge data.
Note
----
DGL's distributed training by default runs server processes and trainer processes on the same
set of machines. If users need to run them on different sets of machines, it requires
manually setting up servers and trainers. The setup is not fully tested yet.
'''
def __init__(self, graph_name, gpb=None, part_config=None):
self.graph_name = graph_name
......@@ -334,6 +382,8 @@ class DistGraph:
assert part_config is not None, \
'When running in the standalone mode, the partition config file is required'
self._client = get_kvstore()
assert self._client is not None, \
'Distributed module is not initialized. Please call dgl.distributed.initialize.'
# Load graph partition data.
g, node_feats, edge_feats, self._gpb, _ = load_partition(part_config, 0)
assert self._gpb.num_partitions() == 1, \
......@@ -367,6 +417,8 @@ class DistGraph:
def _init(self):
self._client = get_kvstore()
assert self._client is not None, \
'Distributed module is not initialized. Please call dgl.distributed.initialize.'
self._g = _get_graph_from_shared_mem(self.graph_name)
self._gpb = get_shared_mem_partition_book(self.graph_name, self._g)
if self._gpb is None:
......@@ -394,11 +446,12 @@ class DistGraph:
DistGraph provides a global view of the distributed graph. Internally,
it may contain a partition of the graph if it is co-located with
the server. If there is no co-location, this returns None.
the server. When servers and clients run on separate sets of machines,
this returns None.
Returns
-------
DGLHeterograph
DGLGraph
The local partition
'''
return self._g
......@@ -499,34 +552,109 @@ class DistGraph:
return ['_E']
def number_of_nodes(self):
"""Return the number of nodes"""
return self._num_nodes
"""Alias of :func:`num_nodes`"""
return self.num_nodes()
def number_of_edges(self):
"""Return the number of edges"""
"""Alias of :func:`num_edges`"""
return self.num_edges()
def num_nodes(self):
"""Return the total number of nodes in the distributed graph.
Returns
-------
int
The number of nodes
Examples
--------
>>> g = dgl.distributed.DistGraph('ogb-product')
>>> print(g.number_of_nodes())
2449029
"""
return self._num_nodes
def num_edges(self):
"""Return the total number of edges in the distributed graph.
Returns
-------
int
The number of edges
Examples
--------
>>> g = dgl.distributed.DistGraph('ogb-product')
>>> print(g.num_edges())
123718280
"""
return self._num_edges
def node_attr_schemes(self):
"""Return the node feature and embedding schemes."""
"""Return the node feature schemes.
Each feature scheme is a named tuple that stores the shape and data type
of the node feature.
Returns
-------
dict of str to schemes
The schemes of node feature columns.
Examples
--------
The following uses PyTorch backend.
>>> g.node_attr_schemes()
{'h': Scheme(shape=(4,), dtype=torch.float32)}
See Also
--------
edge_attr_schemes
"""
schemes = {}
for key in self.ndata:
schemes[key] = infer_scheme(self.ndata[key])
return schemes
def edge_attr_schemes(self):
"""Return the edge feature and embedding schemes."""
"""Return the edge feature schemes.
Each feature scheme is a named tuple that stores the shape and data type
of the edge feature.
Returns
-------
dict of str to schemes
The schemes of edge feature columns.
Examples
--------
The following uses PyTorch backend.
>>> g.edge_attr_schemes()
{'h': Scheme(shape=(4,), dtype=torch.float32)}
See Also
--------
node_attr_schemes
"""
schemes = {}
for key in self.edata:
schemes[key] = infer_scheme(self.edata[key])
return schemes
def rank(self):
''' The rank of the distributed graph store.
''' The rank of the current DistGraph.
This returns a unique number to identify the DistGraph object among all of
the client processes.
Returns
-------
int
The rank of the current graph store.
The rank of the current DistGraph.
'''
return role.get_global_rank()
......@@ -555,14 +683,15 @@ class DistGraph:
Returns
-------
GraphPartitionBook
Object that stores all kinds of partition information.
Object that stores all graph partition information.
"""
return self._gpb
def barrier(self):
'''Barrier for all client nodes.
This API will be blocked untill all the clients invoke this API.
This API blocks the current process until all the clients invoke this API.
Please use this API with caution.
'''
self._client.barrier()
......@@ -688,17 +817,19 @@ def node_split(nodes, partition_book=None, rank=None, force_even=True):
returns a subset of nodes for the local rank. This method is used for
dividing workloads for distributed training.
The input nodes can be stored as a vector of masks. The length of the vector is
The input nodes are stored as a mask vector. The length of the vector is
the same as the number of nodes in the graph; a value of 1 indicates that the node at
the corresponding position is included in the input set.
There are two strategies to split the nodes. By default, it splits the nodes
in a way to maximize data locality. That is, all nodes that belong to a process
are returned. If `force_even` is set to true, the nodes are split evenly so
that each process gets almost the same number of nodes. The current implementation
can still enable data locality when a graph is partitioned with range partitioning.
that each process gets almost the same number of nodes.
When `force_even` is True, the data locality is still preserved if a graph is partitioned
with Metis and the node/edge IDs are shuffled.
In this case, the majority of the nodes returned for a process are the ones that
belong to the process. If range partitioning is not used, data locality isn't guaranteed.
belong to the process. If node/edge IDs are not shuffled, data locality is not guaranteed.
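A brief usage sketch, assuming ``g`` is a ``DistGraph`` and ``train_mask`` is a hypothetical
boolean mask over all nodes:

>>> local_train_nids = dgl.distributed.node_split(train_mask, g.get_partition_book())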
Parameters
----------
......@@ -746,10 +877,12 @@ def edge_split(edges, partition_book=None, rank=None, force_even=True):
There are two strategies to split the edges. By default, it splits the edges
in a way to maximize data locality. That is, all edges that belong to a process
are returned. If `force_even` is set to true, the edges are split evenly so
that each process gets almost the same number of edges. The current implementation
can still enable data locality when a graph is partitioned with range partitioning.
In this case, majority of the edges returned for a process are the ones that
belong to the process. If range partitioning is not used, data locality isn't guaranteed.
that each process gets almost the same number of edges.
When `force_even` is True, the data locality is still preserved if a graph is partitioned
with Metis and the node/edge IDs are shuffled.
In this case, the majority of the edges returned for a process are the ones that
belong to the process. If node/edge IDs are not shuffled, data locality is not guaranteed.
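A brief usage sketch, assuming ``g`` is a ``DistGraph`` and ``train_edge_mask`` is a
hypothetical boolean mask over all edges:

>>> local_train_eids = dgl.distributed.edge_split(train_edge_mask, g.get_partition_book())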
Parameters
----------
......
......@@ -24,36 +24,91 @@ DIST_TENSOR_ID = 0
class DistTensor:
''' Distributed tensor.
DistTensor references to a tensor stored in the distributed KVStore.
When a DistTensor is created, it may reference to a tensor in the KVStore, or
create a new one. The tensor is identified by the name passed to the constructor
of DistTensor. If the name exists, DistTensor will reference the existing one.
In this case, the shape and the data type should match the existing tensor.
``DistTensor`` references a distributed tensor sharded and stored in a cluster of machines.
It has the same interface as Pytorch Tensor to access its metadata (e.g., shape and data type).
To access data in a distributed tensor, it supports slicing rows and writing data to rows.
It does not support any operators of a deep learning framework, such as addition and
multiplication.
Currently, distributed tensors are designed to store node data and edge data of a distributed
graph. Therefore, their first dimensions have to be the number of nodes or edges in the graph.
The tensors are sharded in the first dimension based on the partition policy of nodes
or edges. When a distributed tensor is created, the partition policy is automatically
determined based on the first dimension if the partition policy is not provided: if the first
dimension matches the number of nodes, ``DistTensor`` will use the node partition policy;
if the first dimension matches the number of edges, ``DistTensor`` will use the edge partition
policy. To determine the partition policy automatically, a DistGraph object has to be created.
Users can overwrite the rule by providing a partition policy directly.
A distributed tensor can be either named or anonymous.
When a distributed tensor has a name, the tensor can be persistent if ``persistent=True``.
Normally, DGL destroys the distributed tensor in the system when the ``DistTensor`` object
goes away. However, a persistent tensor lives in the system even if
the ``DistTensor`` object disappears in the trainer process. The persistent tensor has
the same life span as the DGL servers. DGL does not allow an anonymous tensor to be persistent.
When a ``DistTensor`` object is created, it may reference to an existing distributed tensor or
create a new one. A distributed tensor is identified by the name passed to the constructor.
If the name exists, ``DistTensor`` will reference the existing one.
In this case, the shape and the data type must match the existing tensor.
If the name doesn't exist, a new tensor will be created in the kvstore.
If persistent=True when creating DistTensor, the tensor in the KVStore will
be persistent. Even if DistTensor is destroyed in the local trainer process,
the tensor will still exist in KVStore. However, we do not allow an anonymous
tensor to be persistent.
When a distributed tensor is created, its values are initialized to zero. Users
can define an initialization function to control how the values are initialized.
The init function takes two input arguments, the shape and the data type, and returns a tensor.
Below is an example of an init function:
.. highlight:: python
.. code-block:: python
def init_func(shape, dtype):
return torch.ones(shape, dtype=dtype)
Parameters
----------
shape : tuple
The shape of the tensor
The shape of the tensor. The first dimension has to be the number of nodes or
the number of edges of a distributed graph.
dtype : dtype
The dtype of the tensor
name : string
The name of the tensor.
init_func : callable
The function to initialize data in the tensor.
part_policy : PartitionPolicy
The partition policy of the tensor
The dtype of the tensor. The data type has to be one supported by the deep learning framework backend.
name : string, optional
The name of the tensor. The name can uniquely identify the tensor in the system
so that another ``DistTensor`` object can reference the distributed tensor.
init_func : callable, optional
The function to initialize data in the tensor. If the init function is not provided,
the values of the tensor are initialized to zero.
part_policy : PartitionPolicy, optional
The partition policy of the rows of the tensor to different machines in the cluster.
Currently, it only supports node partition policy or edge partition policy.
The system determines the right partition policy automatically.
persistent : bool
Whether the created tensor is persistent.
Whether the created tensor lives after the ``DistTensor`` object is destroyed.
Examples
--------
>>> init = lambda shape, dtype: th.ones(shape, dtype=dtype)
>>> arr = dgl.distributed.DistTensor((g.number_of_nodes(), 2), th.int32, init_func=init)
>>> print(arr[0:3])
tensor([[1, 1],
[1, 1],
[1, 1]], dtype=torch.int32)
>>> arr[0:3] = th.ones((3, 2), dtype=th.int32) * 2
>>> print(arr[0:3])
tensor([[2, 2],
[2, 2],
[2, 2]], dtype=torch.int32)
Note
----
The creation of ``DistTensor`` is a synchronized operation. When a trainer process tries to
create a ``DistTensor`` object, the creation succeeds only when all trainer processes
do the same.
'''
def __init__(self, shape, dtype, name=None, init_func=None, part_policy=None,
persistent=False):
self.kvstore = get_kvstore()
assert self.kvstore is not None, \
'Distributed module is not initialized. Please call dgl.distributed.initialize.'
self._shape = shape
self._dtype = dtype
......@@ -72,10 +127,13 @@ class DistTensor:
+ 'Please provide a partition policy explicitly.'
part_policy = policy
assert part_policy is not None, \
'Cannot find a right partition policy. Currently, DistTensor only ' \
+ 'supports partition policy associated with nodes or edges.'
'Cannot find a right partition policy. It is either because ' \
+ 'its first dimension does not match the number of nodes or edges ' \
+ 'of a distributed graph or there does not exist a distributed graph.'
self._part_policy = part_policy
assert part_policy.get_size() == shape[0], \
'The partition policy does not match the input shape.'
if init_func is None:
init_func = _default_init_data
......@@ -122,20 +180,44 @@ class DistTensor:
@property
def part_policy(self):
''' Return the partition policy '''
'''Return the partition policy
Returns
-------
PartitionPolicy
The partition policy of the distributed tensor.
'''
return self._part_policy
@property
def shape(self):
''' Return the shape of the distributed tensor. '''
'''Return the shape of the distributed tensor.
Returns
-------
tuple
The shape of the distributed tensor.
'''
return self._shape
@property
def dtype(self):
''' Return the data type of the distributed tensor. '''
'''Return the data type of the distributed tensor.
Returns
-------
dtype
The data type of the tensor.
'''
return self._dtype
@property
def name(self):
''' Return the name of the distributed tensor '''
'''Return the name of the distributed tensor
Returns
-------
str
The name of the tensor.
'''
return self._name
......@@ -68,7 +68,7 @@ def get_shared_mem_partition_book(graph_name, graph_part):
Returns
-------
GraphPartitionBook or RangePartitionBook
GraphPartitionBook
A graph partition book for a particular partition.
'''
if not exist_shared_mem_array(_get_ndata_path(graph_name, 'meta')):
......@@ -77,10 +77,183 @@ def get_shared_mem_partition_book(graph_name, graph_part):
if is_range_part == 1:
return RangePartitionBook(part_id, num_parts, node_map, edge_map)
else:
return GraphPartitionBook(part_id, num_parts, node_map, edge_map, graph_part)
return BasicPartitionBook(part_id, num_parts, node_map, edge_map, graph_part)
class GraphPartitionBook:
"""GraphPartitionBook is used to store parition information.
""" The base class of the graph partition book.
For distributed training, a graph is partitioned into multiple parts and is loaded
in multiple machines. The partition book contains all necessary information to locate
nodes and edges in the cluster.
The partition book contains various partition information, including
* the number of partitions,
* the partition ID that a node or edge belongs to,
* the node IDs and the edge IDs that a partition has,
* the local IDs of nodes and edges in a partition.
Currently, there are two classes that implement `GraphPartitionBook`:
`BasicPartitionBook` and `RangePartitionBook`. `BasicPartitionBook`
stores the mappings between every individual node/edge ID and partition ID on
every machine, which usually consumes a lot of memory, while `RangePartitionBook`
calculates the mapping between node/edge IDs and partition IDs based on some small
metadata because nodes/edges are relabeled so that the IDs in the same partition
fall in a contiguous ID range. `RangePartitionBook` is usually the preferred way to
provide mappings between node/edge IDs and partition IDs.
A graph partition book is constructed automatically when a graph is partitioned.
When a graph partition is loaded, a graph partition book is loaded as well.
Please see :py:meth:`~dgl.distributed.partition.partition_graph`,
:py:meth:`~dgl.distributed.partition.load_partition` and
:py:meth:`~dgl.distributed.partition.load_partition_book` for more details.
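Examples
--------
A read-only usage sketch, assuming ``g`` is a ``DistGraph`` and ``nids`` is a tensor of
global node IDs:

>>> gpb = g.get_partition_book()
>>> print(gpb.num_partitions())
>>> part_ids = gpb.nid2partid(nids)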
"""
def shared_memory(self, graph_name):
"""Move the partition book to shared memory.
Parameters
----------
graph_name : str
The graph name. This name will be used to read the partition book from shared
memory in another process.
"""
def num_partitions(self):
"""Return the number of partitions.
Returns
-------
int
number of partitions
"""
def metadata(self):
"""Return the partition meta data.
The meta data includes:
* The machine ID.
* Number of nodes and edges of each partition.
Examples
--------
>>> print(g.get_partition_book().metadata())
>>> [{'machine_id' : 0, 'num_nodes' : 3000, 'num_edges' : 5000},
... {'machine_id' : 1, 'num_nodes' : 2000, 'num_edges' : 4888},
... ...]
Returns
-------
list[dict[str, any]]
Meta data of each partition.
"""
def nid2partid(self, nids):
"""From global node IDs to partition IDs
Parameters
----------
nids : tensor
global node IDs
Returns
-------
tensor
partition IDs
"""
def eid2partid(self, eids):
"""From global edge IDs to partition IDs
Parameters
----------
eids : tensor
global edge IDs
Returns
-------
tensor
partition IDs
"""
def partid2nids(self, partid):
"""From partition id to global node IDs
Parameters
----------
partid : int
partition id
Returns
-------
tensor
node IDs
"""
def partid2eids(self, partid):
"""From partition id to global edge IDs
Parameters
----------
partid : int
partition id
Returns
-------
tensor
edge IDs
"""
def nid2localnid(self, nids, partid):
"""Get local node IDs within the given partition.
Parameters
----------
nids : tensor
global node IDs
partid : int
partition ID
Returns
-------
tensor
local node IDs
"""
def eid2localeid(self, eids, partid):
"""Get the local edge ids within the given partition.
Parameters
----------
eids : tensor
global edge ids
partid : int
partition ID
Returns
-------
tensor
local edge ids
"""
@property
def partid(self):
"""Get the current partition id
Returns
-------
int
The partition ID of the current machine
"""
class BasicPartitionBook(GraphPartitionBook):
"""This provides the most flexible way to store parition information.
The partition book maintains the mapping of every single node IDs and edge IDs to
partition IDs. This is very flexible at the coast of large memory consumption.
On a large graph, the mapping consumes significant memory and this partition book
is not recommended.
Parameters
----------
......@@ -154,11 +327,6 @@ class GraphPartitionBook:
def shared_memory(self, graph_name):
"""Move data to shared memory.
Parameters
----------
graph_name : str
The graph name
"""
self._meta, self._nid2partid, self._eid2partid = _move_metadata_to_shared_mem(
graph_name, self._num_nodes(), self._num_edges(), self._part_id, self._num_partitions,
......@@ -166,33 +334,11 @@ class GraphPartitionBook:
def num_partitions(self):
"""Return the number of partitions.
Returns
-------
int
number of partitions
"""
return self._num_partitions
def metadata(self):
"""Return the partition meta data.
The meta data includes:
* The machine ID.
* Number of nodes and edges of each partition.
Examples
--------
>>> print(g.get_partition_book().metadata())
>>> [{'machine_id' : 0, 'num_nodes' : 3000, 'num_edges' : 5000},
... {'machine_id' : 1, 'num_nodes' : 2000, 'num_edges' : 4888},
... ...]
Returns
-------
list[dict[str, any]]
Meta data of each partition.
"""
return self._partition_meta_data
......@@ -208,78 +354,26 @@ class GraphPartitionBook:
def nid2partid(self, nids):
"""From global node IDs to partition IDs
Parameters
----------
nids : tensor
global node IDs
Returns
-------
tensor
partition IDs
"""
return F.gather_row(self._nid2partid, nids)
def eid2partid(self, eids):
"""From global edge IDs to partition IDs
Parameters
----------
eids : tensor
global edge IDs
Returns
-------
tensor
partition IDs
"""
return F.gather_row(self._eid2partid, eids)
def partid2nids(self, partid):
"""From partition id to global node IDs
Parameters
----------
partid : int
partition id
Returns
-------
tensor
node IDs
"""
return self._partid2nids[partid]
def partid2eids(self, partid):
"""From partition id to global edge IDs
Parameters
----------
partid : int
partition id
Returns
-------
tensor
edge IDs
"""
return self._partid2eids[partid]
def nid2localnid(self, nids, partid):
"""Get local node IDs within the given partition.
Parameters
----------
nids : tensor
global node IDs
partid : int
partition ID
Returns
-------
tensor
local node IDs
"""
if partid != self._part_id:
raise RuntimeError('Now GraphPartitionBook does not support \
......@@ -288,18 +382,6 @@ class GraphPartitionBook:
def eid2localeid(self, eids, partid):
"""Get the local edge ids within the given partition.
Parameters
----------
eids : tensor
global edge ids
partid : int
partition ID
Returns
-------
tensor
local edge ids
"""
if partid != self._part_id:
raise RuntimeError('Now GraphPartitionBook does not support \
......@@ -309,17 +391,16 @@ class GraphPartitionBook:
@property
def partid(self):
"""Get the current partition id
Return
------
int
The partition id of current machine
"""
return self._part_id
class RangePartitionBook:
"""RangePartitionBook is used to store parition information.
class RangePartitionBook(GraphPartitionBook):
"""This partition book supports more efficient storage of partition information.
This partition book is used if the nodes and edges of a graph partition are assigned
with contiguous IDs. It uses a very small amount of memory to store the partition
information.
Parameters
----------
......@@ -358,11 +439,6 @@ class RangePartitionBook:
def shared_memory(self, graph_name):
"""Move data to shared memory.
Parameters
----------
graph_name : str
The graph name
"""
self._meta = _move_metadata_to_shared_mem(
graph_name, self._num_nodes(), self._num_edges(), self._partid,
......@@ -370,11 +446,6 @@ class RangePartitionBook:
def num_partitions(self):
"""Return the number of partitions.
Returns
-------
int
number of partitions
"""
return self._num_partitions
......@@ -391,39 +462,12 @@ class RangePartitionBook:
def metadata(self):
"""Return the partition meta data.
The meta data includes:
* The machine ID.
* Number of nodes and edges of each partition.
Examples
--------
>>> print(g.get_partition_book().metadata())
>>> [{'machine_id' : 0, 'num_nodes' : 3000, 'num_edges' : 5000},
... {'machine_id' : 1, 'num_nodes' : 2000, 'num_edges' : 4888},
... ...]
Returns
-------
list[dict[str, any]]
Meta data of each partition.
"""
return self._partition_meta_data
def nid2partid(self, nids):
"""From global node IDs to partition IDs
Parameters
----------
nids : tensor
global node IDs
Returns
-------
tensor
partition IDs
"""
nids = utils.toindex(nids)
ret = np.searchsorted(self._node_map, nids.tonumpy(), side='right')
......@@ -433,16 +477,6 @@ class RangePartitionBook:
def eid2partid(self, eids):
"""From global edge IDs to partition IDs
Parameters
----------
eids : tensor
global edge IDs
Returns
-------
tensor
partition IDs
"""
eids = utils.toindex(eids)
ret = np.searchsorted(self._edge_map, eids.tonumpy(), side='right')
......@@ -452,16 +486,6 @@ class RangePartitionBook:
def partid2nids(self, partid):
"""From partition id to global node IDs
Parameters
----------
partid : int
partition id
Returns
-------
tensor
node IDs
"""
# TODO do we need to cache it?
start = self._node_map[partid - 1] if partid > 0 else 0
......@@ -471,16 +495,6 @@ class RangePartitionBook:
def partid2eids(self, partid):
"""From partition id to global edge IDs
Parameters
----------
partid : int
partition id
Returns
-------
tensor
edge IDs
"""
# TODO do we need to cache it?
start = self._edge_map[partid - 1] if partid > 0 else 0
......@@ -490,18 +504,6 @@ class RangePartitionBook:
def nid2localnid(self, nids, partid):
"""Get local node IDs within the given partition.
Parameters
----------
nids : tensor
global node IDs
partid : int
partition ID
Returns
-------
tensor
local node IDs
"""
if partid != self._partid:
raise RuntimeError('Now RangePartitionBook does not support \
......@@ -515,18 +517,6 @@ class RangePartitionBook:
def eid2localeid(self, eids, partid):
"""Get the local edge ids within the given partition.
Parameters
----------
eids : tensor
global edge ids
partid : int
partition ID
Returns
-------
tensor
local edge ids
"""
if partid != self._partid:
raise RuntimeError('Now RangePartitionBook does not support \
......@@ -541,11 +531,6 @@ class RangePartitionBook:
@property
def partid(self):
"""Get the current partition id
Return
------
int
The partition id of current machine
"""
return self._partid
......@@ -553,16 +538,21 @@ NODE_PART_POLICY = 'node'
EDGE_PART_POLICY = 'edge'
class PartitionPolicy(object):
"""Wrapper for GraphPartitionBook and RangePartitionBook.
"""This defines a partition policy for a distributed tensor or distributed embedding.
When DGL shards tensors and stores them in a cluster of machines, it requires
partition policies that map rows of the tensors to machines in the cluster.
We can extend this class to support HeteroGraph in the future.
Although an arbitrary partition policy can be defined, DGL currently supports
two partition policies for mapping nodes and edges to machines. To define a partition
policy from a graph partition book, users need to specify the policy name ('node' or 'edge').
Parameters
----------
policy_str : str
partition-policy string, e.g., 'edge' or 'node'.
partition_book : GraphPartitionBook or RangePartitionBook
Main class storing the partition information
Partition policy name, e.g., 'edge' or 'node'.
partition_book : GraphPartitionBook
A graph partition book
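Examples
--------
A minimal sketch that builds a node partition policy from an existing partition book,
assuming ``g`` is a ``DistGraph``:

>>> gpb = g.get_partition_book()
>>> policy = dgl.distributed.PartitionPolicy('node', gpb)
>>> print(policy.get_size())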
"""
def __init__(self, policy_str, partition_book):
# TODO(chao): support more policies for HeteroGraph
......@@ -574,17 +564,35 @@ class PartitionPolicy(object):
@property
def policy_str(self):
"""Get policy string"""
"""Get the policy name
Returns
-------
str
The name of the partition policy.
"""
return self._policy_str
@property
def part_id(self):
"""Get partition ID"""
"""Get partition ID
Returns
-------
int
The partition ID
"""
return self._part_id
@property
def partition_book(self):
"""Get partition book"""
"""Get partition book
Returns
-------
GraphPartitionBook
The graph partition book
"""
return self._partition_book
def to_local(self, id_tensor):
......
......@@ -247,10 +247,9 @@ def _distributed_access(g, nodes, issue_remote_req, local_access):
def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False):
"""Sample from the neighbors of the given nodes from a distributed graph.
When sampling with replacement, the sampled subgraph could have parallel edges.
For sampling without replace, if fanout > the number of neighbors, all the
neighbors are sampled.
For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
will be randomly chosen. The returned graph will contain all the nodes in the
original graph, but only the sampled edges.
Node/edge features are not preserved. The original IDs of
the sampled edges are stored as the `dgl.EID` feature in the returned graph.
......@@ -260,27 +259,38 @@ def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False):
Parameters
----------
g : DistGraph
The distributed graph.
The distributed graph.
nodes : tensor or dict
Node ids to sample neighbors from. If it's a dict, it should contain only
Node IDs to sample neighbors from. If it's a dict, it should contain only
one key-value pair to make this API consistent with dgl.sampling.sample_neighbors.
fanout : int
The number of sampled neighbors for each node.
The number of edges to be sampled for each node.
If -1 is given, all of the neighbors will be selected.
edge_dir : str, optional
Edge direction ('in' or 'out'). If is 'in', sample from in edges. Otherwise,
sample from out edges.
Determines whether to sample inbound or outbound edges.
Can take either ``in`` for inbound edges or ``out`` for outbound edges.
prob : str, optional
Feature name used as the probabilities associated with each neighbor of a node.
Its shape should be compatible with a scalar edge feature tensor.
Feature name used as the (unnormalized) probabilities associated with each
neighboring edge of a node. The feature must have only one element for each
edge.
The features must be non-negative floats, and the sum of the features of
inbound/outbound edges for every node must be positive (though they don't have
to sum up to one). Otherwise, the result will be undefined.
replace : bool, optional
If True, sample with replacement.
When sampling with replacement, the sampled subgraph could have parallel edges.
For sampling without replacement, if fanout > the number of neighbors, all the
neighbors are sampled. If fanout == -1, all neighbors are collected.
Returns
-------
DGLHeteroGraph
A sampled subgraph containing only the sampled neighbor edges from
``nodes``. The sampled subgraph has the same metagraph as the original
one.
DGLGraph
A sampled subgraph containing only the sampled neighboring edges. It is on CPU.
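Examples
--------
A brief sketch, assuming ``g`` is a ``DistGraph`` and ``seeds`` is a tensor of node IDs:

>>> frontier = dgl.distributed.sample_neighbors(g, seeds, 10)
>>> print(frontier.edata[dgl.EID])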
"""
if isinstance(nodes, dict):
assert len(nodes) == 1, 'The distributed sampler only supports one node type for now.'
......@@ -386,25 +396,31 @@ def find_edges(g, edge_ids):
return _distributed_edge_access(g, edge_ids, issue_remove_req, local_access)
def in_subgraph(g, nodes):
"""Extract the subgraph containing only the in edges of the given nodes.
"""Return the subgraph induced on the inbound edges of the given nodes.
The subgraph keeps the same type schema and all the nodes are preserved regardless
of whether they have an edge or not.
The subgraph keeps the same type schema and the cardinality of the original one.
Node/edge features are not preserved. The original IDs
Node/edge features are not preserved. The original IDs of
the extracted edges are stored as the `dgl.EID` feature in the returned graph.
For now, we only support the input graph with one node type and one edge type.
Parameters
----------
g : DistGraph
The distributed graph structure.
nodes : tensor
nodes : tensor or dict
The node IDs whose inbound edges will be extracted.
Returns
-------
DGLHeteroGraph
DGLGraph
The subgraph.
One can retrieve the mapping from subgraph edge ID to parent
edge ID via ``dgl.EID`` edge features of the subgraph.
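Examples
--------
A brief sketch, assuming ``g`` is a ``DistGraph`` and ``seeds`` is a tensor of node IDs:

>>> sg = dgl.distributed.in_subgraph(g, seeds)
>>> parent_eids = sg.edata[dgl.EID]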
"""
if isinstance(nodes, dict):
assert len(nodes) == 1, 'The distributed in_subgraph only supports one node type for now.'
......
......@@ -1055,7 +1055,7 @@ class KVClient(object):
Parameters
----------
partition_book : GraphPartitionBook or RangePartitionBook
partition_book : GraphPartitionBook
Store the partition information
"""
# Get shared data from server side
......
"""Functions for partitions.
For distributed training, a graph is partitioned and partitions are stored in files
organized as follows:
```
data_root_dir/
|-- part_conf.json # partition configuration file in JSON
|-- node_map # partition id of each node stored in a numpy array
|-- edge_map # partition id of each edge stored in a numpy array
|-- part0/ # data for partition 0
|-- node_feats # node features stored in binary format
|-- edge_feats # edge features stored in binary format
|-- graph # graph structure of this partition stored in binary format
|-- part1/ # data for partition 1
|-- node_feats
|-- edge_feats
|-- graph
```
The partition configuration file stores the file locations. For the above example,
the configuration file will look like the following:
```
{
"graph_name" : "test",
"part_method" : "metis",
"num_parts" : 2,
"halo_hops" : 1,
"node_map" : "data_root_dir/node_map.npy",
"edge_map" : "data_root_dir/edge_map.npy"
"num_nodes" : 1000000,
"num_edges" : 52000000,
"part-0" : {
"node_feats" : "data_root_dir/part0/node_feats.dgl",
"edge_feats" : "data_root_dir/part0/edge_feats.dgl",
"part_graph" : "data_root_dir/part0/graph.dgl",
},
"part-1" : {
"node_feats" : "data_root_dir/part1/node_feats.dgl",
"edge_feats" : "data_root_dir/part1/edge_feats.dgl",
"part_graph" : "data_root_dir/part1/graph.dgl",
},
}
```
Here are the definition of the fields in the partition configuration file:
* `graph_name` is the name of the graph given by a user.
* `part_method` is the method used to assign nodes to partitions.
Currently, it supports "random" and "metis".
* `num_parts` is the number of partitions.
* `halo_hops` is the number of HALO nodes we want to include in a partition.
* `node_map` is the node assignment map, which tells the partition Id a node is assigned to.
* `edge_map` is the edge assignment map, which tells the partition Id an edge is assigned to.
* `num_nodes` is the number of nodes in the global graph.
* `num_edges` is the number of edges in the global graph.
* `part-*` stores the data of a partition.
Nodes in each partition are *relabeled* to always start with zero. We call the node
ID in the original graph, *global ID*, while the relabeled ID in each partition,
*local ID*. Each partition graph has an integer node data tensor stored under name
`dgl.NID` and each value is the node's global ID. Similarly, edges are relabeled too
and the mapping from local ID to global ID is stored as an integer edge data tensor
under name `dgl.EID`.
Note that each partition can contain *HALO* nodes and edges, those belonging to
other partitions but are included in this partition for integrity or efficiency concerns.
We call nodes and edges that truly belong to one partition *local nodes/edges*, while
the rest "HALO nodes/edges".
Node and edge features are split and stored together with each graph partition.
We do not store features of HALO nodes and edges.
Two useful functions in this module:
* :func:`~dgl.distributed.load_partition` loads one partition and the meta data into memory.
* :func:`~dgl.distributed.partition` partitions a graph into files organized as above.
"""
"""Functions for partitions. """
import json
import os
......@@ -87,10 +10,10 @@ from ..base import NID, EID
from ..random import choice as random_choice
from ..data.utils import load_graphs, save_graphs, load_tensors, save_tensors
from ..transform import metis_partition_assignment, partition_graph_with_halo
from .graph_partition_book import GraphPartitionBook, RangePartitionBook
from .graph_partition_book import BasicPartitionBook, RangePartitionBook
def load_partition(conf_file, part_id):
''' Load data of a partition from the data path in the DistGraph server.
def load_partition(part_config, part_id):
''' Load data of a partition from the data path.
A partition data includes a graph structure of the partition, a dict of node tensors,
a dict of edge tensors and some metadata. The partition may contain the HALO nodes,
......@@ -100,12 +23,11 @@ def load_partition(conf_file, part_id):
the information of the global graph (not the local partition), which includes the number
of nodes, the number of edges as well as the node assignment of the global graph.
The function currently loads data through the normal filesystem interface. In the future,
we need to support loading data from other storage such as S3 and HDFS.
The function currently loads data through the local filesystem interface.
Parameters
----------
conf_file : str
part_config : str
The path of the partition config file.
part_id : int
The partition Id.
......@@ -115,15 +37,15 @@ def load_partition(conf_file, part_id):
DGLGraph
The graph partition structure.
dict of tensors
All node features.
Node features.
dict of tensors
All edge features.
Edge features.
GraphPartitionBook
The global partition information.
The graph partition information.
str
The graph name
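Examples
--------
A brief sketch, assuming the partition files were generated under ``output/`` for a graph
named ``graph_name``:

>>> part_g, node_feats, edge_feats, gpb, graph_name = dgl.distributed.load_partition(
...     'output/graph_name.json', 0)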
'''
with open(conf_file) as conf_f:
with open(part_config) as conf_f:
part_metadata = json.load(conf_f)
assert 'part-{}'.format(part_id) in part_metadata, "part-{} does not exist".format(part_id)
part_files = part_metadata['part-{}'.format(part_id)]
......@@ -137,18 +59,18 @@ def load_partition(conf_file, part_id):
assert NID in graph.ndata, "the partition graph should contain node mapping to global node Id"
assert EID in graph.edata, "the partition graph should contain edge mapping to global edge Id"
gpb, graph_name = load_partition_book(conf_file, part_id, graph)
gpb, graph_name = load_partition_book(part_config, part_id, graph)
nids = F.boolean_mask(graph.ndata[NID], graph.ndata['inner_node'])
partids = gpb.nid2partid(nids)
assert np.all(F.asnumpy(partids == part_id)), 'load a wrong partition'
return graph, node_feats, edge_feats, gpb, graph_name
def load_partition_book(conf_file, part_id, graph=None):
def load_partition_book(part_config, part_id, graph=None):
''' Load a graph partition book from the partition config file.
Parameters
----------
conf_file : str
part_config : str
The path of the partition config file.
part_id : int
The partition Id.
......@@ -162,7 +84,7 @@ def load_partition_book(conf_file, part_id, graph=None):
str
The graph name
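Examples
--------
A brief sketch (the config file path is an illustrative assumption):

>>> gpb, graph_name = dgl.distributed.load_partition_book('output/graph_name.json', 0)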
'''
with open(conf_file) as conf_f:
with open(part_config) as conf_f:
part_metadata = json.load(conf_f)
assert 'num_parts' in part_metadata, 'num_parts does not exist.'
assert part_metadata['num_parts'] > part_id, \
......@@ -187,7 +109,7 @@ def load_partition_book(conf_file, part_id, graph=None):
return RangePartitionBook(part_id, num_parts, np.array(node_map),
np.array(edge_map)), part_metadata['graph_name']
else:
return GraphPartitionBook(part_id, num_parts, node_map, edge_map,
return BasicPartitionBook(part_id, num_parts, node_map, edge_map,
graph), part_metadata['graph_name']
def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis",
......@@ -199,33 +121,95 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
the node assignment; 3) split the node features and edge features based on
the partition result.
The partitioned data is stored into multiple files.
When a graph is partitioned, each partition can contain *HALO* nodes and edges, which are
the ones that belong to
other partitions but are included in this partition for integrity or efficiency concerns.
In this document, *local nodes/edges* refers to the nodes and edges that truly belong to
a partition. The rest are "HALO nodes/edges".
The partitioned data is stored into multiple files organized as follows:
.. code-block:: none
data_root_dir/
|-- graph_name.json # partition configuration file in JSON
|-- node_map.npy # partition id of each node stored in a numpy array (optional)
|-- edge_map.npy # partition id of each edge stored in a numpy array (optional)
|-- part0/ # data for partition 0
|-- node_feats.dgl # node features stored in binary format
|-- edge_feats.dgl # edge features stored in binary format
|-- graph.dgl # graph structure of this partition stored in binary format
|-- part1/ # data for partition 1
|-- node_feats.dgl
|-- edge_feats.dgl
|-- graph.dgl
First, the metadata of the original graph and the partitioning is stored in a JSON file
named after `graph_name`. This JSON file contains the information of the original graph
as well as the file names that store each partition.
as well as the path of the files that store each partition. Below show an example.
.. code-block:: none
{
"graph_name" : "test",
"part_method" : "metis",
"num_parts" : 2,
"halo_hops" : 1,
"node_map" : "data_root_dir/node_map.npy",
"edge_map" : "data_root_dir/edge_map.npy"
"num_nodes" : 1000000,
"num_edges" : 52000000,
"part-0" : {
"node_feats" : "data_root_dir/part0/node_feats.dgl",
"edge_feats" : "data_root_dir/part0/edge_feats.dgl",
"part_graph" : "data_root_dir/part0/graph.dgl",
},
"part-1" : {
"node_feats" : "data_root_dir/part1/node_feats.dgl",
"edge_feats" : "data_root_dir/part1/edge_feats.dgl",
"part_graph" : "data_root_dir/part1/graph.dgl",
},
}
Here are the definition of the fields in the partition configuration file:
The node assignment is stored in a separate file if we don't reshuffle node Ids to ensure
that all nodes in a partition fall into a contiguous Id range. The node assignment is stored
in a numpy file.
* `graph_name` is the name of the graph given by a user.
* `part_method` is the method used to assign nodes to partitions.
Currently, it supports "random" and "metis".
* `num_parts` is the number of partitions.
* `halo_hops` is the number of hops of the HALO nodes included in a partition.
* `node_map` is the node assignment map, which tells the partition Id a node is assigned to.
* `edge_map` is the edge assignment map, which tells the partition Id an edge is assigned to.
* `num_nodes` is the number of nodes in the global graph.
* `num_edges` is the number of edges in the global graph.
* `part-*` stores the data of a partition.
All node features in a partition are stored in a file with DGL format. The node features are
stored in a dictionary, in which the key is the node data name and the value is a tensor.
If node IDs and edge IDs are not shuffled to ensure that all nodes/edges in a partition
fall into a contiguous ID range, DGL needs to store node/edge mappings (from
node/edge IDs to partition IDs) in separate files (node_map.npy and edge_map.npy).
The node/edge mappings are stored in numpy files.
All edge features in a partition are stored in a file with DGL format. The edge features are
stored in a dictionary, in which the key is the edge data name and the value is a tensor.
The graph structure of a partition is stored in a file in the DGLGraph format.
Nodes in each partition are *relabeled* to always start with zero. We call the node
ID in the original graph the *global ID* and the relabeled ID in each partition the
*local ID*. Each partition graph has an integer node data tensor stored under the name
`dgl.NID`, in which each value is the node's global ID. Similarly, edges are relabeled
and the mapping from local IDs to global IDs is stored as an integer edge data tensor
under the name `dgl.EID`. The mappings can be accessed with `part.ndata[dgl.NID]` and
`part.edata[dgl.EID]`, where `part` is the partition graph structure.
The partition graph contains additional node data ("inner_node" and "orig_id") and
edge data ("inner_edge"):
* "inner_node" indicates whether a node belongs to a partition.
* "inner_edge" indicates whether an edge belongs to a partition.
* "orig_id" exists when reshuffle=True. It indicates the original node Ids in the original
graph before reshuffling.
Node and edge features are split and stored together with each graph partition.
All node/edge features in a partition are stored in a file in the DGL format. The node/edge
features are stored in dictionaries, in which the key is the node/edge data name and
the value is a tensor. Features of HALO nodes and edges are not stored.
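As a concrete illustration, one partition can be loaded back and examined with
:py:meth:`~dgl.distributed.load_partition`. This is only a sketch; the config path
``output/test.json`` is hypothetical.
.. code-block:: python

    import dgl
    import torch as th

    # Load partition 0 back into memory (the config path is hypothetical).
    part_g, node_feats, edge_feats, gpb, _ = dgl.distributed.load_partition(
        'output/test.json', 0)
    # Local IDs start at zero; dgl.NID/dgl.EID map them back to global IDs.
    global_nids = part_g.ndata[dgl.NID]
    global_eids = part_g.edata[dgl.EID]
    # 'inner_node'/'inner_edge' distinguish local nodes/edges from HALO ones.
    num_local_nodes = int(th.sum(part_g.ndata['inner_node']))
    num_local_edges = int(th.sum(part_g.edata['inner_edge']))
    # Features are dictionaries from data name to tensor, covering only local nodes/edges.
    print(list(node_feats.keys()), num_local_nodes, num_local_edges)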
When performing Metis partitioning, we can impose constraints on the partitioning.
Currently, it supports two constraints to balance the partitioning. By default, Metis
only tries to balance the number of nodes in each partition.
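For example, a Metis partitioning that balances both a per-node label and the number of
edges might look like the sketch below; ``g.ndata['label']`` is an assumed integer array
and the output path is hypothetical.
.. code-block:: python

    import dgl

    # Sketch: balance the number of nodes of each label and the number of
    # edges across 4 partitions ('label' and 'output/' are assumptions).
    dgl.distributed.partition_graph(g, 'test', 4, out_path='output/',
                                    part_method='metis', reshuffle=True,
                                    balance_ntypes=g.ndata['label'],
                                    balance_edges=True)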
......@@ -241,22 +225,38 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
g : DGLGraph
The input graph to partition
graph_name : str
The name of the graph. The name will be used to construct
:py:class:`~dgl.distributed.DistGraph`.
num_parts : int
The number of partitions
out_path : str
The path to store the files for all partitioned data.
num_hops : int, optional
The number of hops of HALO nodes we construct on a partition graph structure.
The default value is 1.
part_method : str, optional
The partition method. It supports "random" and "metis". The default value is "metis".
reshuffle : bool, optional
Reshuffle nodes and edges so that nodes and edges in a partition are in
a contiguous ID range. The default value is True.
balance_ntypes : tensor, optional
Node type of each node. This is a 1D array of integers. Its values indicate the node
type of each node. This argument is used by the Metis algorithm. When the argument is
specified, the Metis algorithm tries to partition the input graph into partitions where
each partition has roughly the same number of nodes of each node type. The default value
is None, which means Metis partitions the graph to balance only the number of nodes.
balance_edges : bool, optional
Indicate whether to balance the number of edges in each partition. This argument is used by
the Metis algorithm.
Examples
--------
>>> dgl.distributed.partition_graph(g, 'test', 4, num_hops=1, part_method='metis',
...                                 out_path='output/', reshuffle=True,
...                                 balance_ntypes=g.ndata['train_mask'],
...                                 balance_edges=True)
>>> g, node_feats, edge_feats, gpb, graph_name = dgl.distributed.load_partition(
...     'output/test.json', 0)
'''
if num_parts == 1:
parts = {0: g}
......
......@@ -5,31 +5,62 @@ from .. import utils
from .dist_tensor import DistTensor
class DistEmbedding:
'''Distributed embeddings.
DGL provides a distributed embedding to support models that require learnable embeddings.
DGL's distributed embeddings are mainly used for learning node embeddings of graph models.
Because distributed embeddings are part of a model, they are updated by mini-batches.
The distributed embeddings have to be updated by DGL's optimizers instead of
the optimizers provided by the deep learning frameworks (e.g., Pytorch and MXNet).
To support efficient training on a graph with many nodes, the embeddings support sparse
updates. That is, only the embeddings involved in a mini-batch computation are updated.
Currently, DGL provides only one optimizer: `SparseAdagrad`. DGL will provide more
optimizers in the future.
Distributed embeddings are sharded and stored in a cluster of machines in the same way as
:py:class:`dgl.distributed.DistTensor`, except that distributed embeddings are trainable.
Because distributed embeddings are sharded
in the same way as nodes and edges of a distributed graph, it is usually much more
efficient to access than the sparse embeddings provided by the deep learning frameworks.
Parameters
----------
num_embeddings : int
The number of embeddings. Currently, the number of embeddings has to be the same as
the number of nodes or the number of edges.
embedding_dim : int
The dimension size of embeddings.
name : str, optional
The name of the embeddings. The name can uniquely identify the embeddings in the system
so that another DistEmbedding object can refer to the same embeddings.
init_func : callable, optional
The function to create the initial data. If the init function is not provided,
the values of the embeddings are initialized to zero.
part_policy : PartitionPolicy, optional
The partition policy that assigns embeddings to different machines in the cluster.
Currently, it only supports the node partition policy or the edge partition policy.
The system determines the right partition policy automatically.
Examples
--------
>>> def initializer(shape, dtype):
...     arr = th.zeros(shape, dtype=dtype)
...     arr.uniform_(-1, 1)
...     return arr
>>> emb = dgl.distributed.DistEmbedding(g.number_of_nodes(), 10, init_func=initializer)
>>> optimizer = dgl.distributed.SparseAdagrad([emb], lr=0.001)
>>> for blocks in dataloader:
...     feats = emb(nids)
...     loss = F.sum(feats + 1, 0)
...     loss.backward()
...     optimizer.step()
Note
----
When a ``DistEmbedding`` object is used while the deep learning framework is recording
the forward computation, users have to invoke :py:meth:`~dgl.distributed.SparseAdagrad.step`
afterwards. Otherwise, there will be a memory leak.
'''
def __init__(self, num_embeddings, embedding_dim, name=None,
init_func=None, part_policy=None):
......@@ -88,15 +119,17 @@ def _init_state(shape, dtype):
return F.zeros(shape, dtype, F.cpu())
class SparseAdagrad:
''' The sparse Adagrad optimizer.
This optimizer implements a sparse version of the Adagrad algorithm.
It works with DistEmbedding and only updates the embeddings
involved in a mini-batch to support efficient training on a graph with many
nodes and edges.
Parameters
----------
params : list of DistEmbeddings
The list of distributed embeddings.
lr : float
The learning rate.
'''
......@@ -105,6 +138,7 @@ class SparseAdagrad:
self._lr = lr
# We need to register a state sum for each embedding in the kvstore.
for emb in params:
assert isinstance(emb, DistEmbedding), 'SparseAdagrad only supports DistEmbedding'
name = emb._tensor.name
kvstore = emb._tensor.kvstore
policy = emb._tensor.part_policy
......@@ -117,8 +151,7 @@ class SparseAdagrad:
''' The step function.
The step function is invoked at the end of every batch to push the gradients
of the embeddings involved in a mini-batch to DGL's servers and update the embeddings.
'''
with F.no_grad():
for emb in self._params:
......
......@@ -58,7 +58,7 @@ g.add_edges(2, 5) # 6
g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid
gpb = dgl.distributed.graph_partition_book.BasicPartitionBook(part_id=0,
num_parts=1,
node_map=node_map,
edge_map=edge_map,
......
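A partition book built this way could then be queried through its lookup methods, roughly
as in the sketch below (``num_partitions``, ``metadata``, ``nid2partid`` and ``partid2nids``
are the partition book's query APIs; the node IDs used here are arbitrary).
.. code-block:: python

    import torch as th

    # Sketch: query the partition book constructed above.
    print(gpb.num_partitions())                      # -> 1
    print(gpb.metadata())                            # per-partition metadata
    part_ids = gpb.nid2partid(th.tensor([0, 1, 2]))  # partition ID of each node
    nids_in_part0 = gpb.partid2nids(0)               # global node IDs owned by partition 0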