"tests/vscode:/vscode.git/clone" did not exist on "f4608c2227c85a3ab36aa1e11cbaec928e9b67c8"
.. _guide-distributed:

Chapter 7: Distributed Training
=====================================

:ref:`(中文版) <guide_cn-distributed>`

DGL adopts a fully distributed approach that distributes both data and computation
across a collection of computation resources. This chapter assumes a cluster setting
(i.e., a group of machines). DGL partitions a graph into subgraphs, and each machine
in the cluster is responsible for one subgraph (partition).
DGL runs an identical training script on all machines in the cluster to parallelize
the computation, and runs servers on the same machines to serve partitioned data to the trainers.
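
As a rough mental model of partition ownership, the following pure-Python sketch splits
node IDs into contiguous ranges, one per machine, and looks up which machine owns a given
node. It is a toy illustration only: the helpers ``range_partition`` and ``owner_of`` are
hypothetical names, not DGL APIs, and contiguous ranges are an assumption made for
simplicity rather than the way DGL partitions a graph (see
:ref:`guide-distributed-preprocessing` for that).

.. code:: python

    # Toy illustration only: contiguous range partitioning, not DGL's algorithm.
    def range_partition(num_nodes, num_parts):
        """Assign node IDs 0..num_nodes-1 to num_parts contiguous ranges."""
        base, extra = divmod(num_nodes, num_parts)
        parts, start = [], 0
        for part_id in range(num_parts):
            # The first `extra` partitions take one additional node.
            size = base + (1 if part_id < extra else 0)
            parts.append(range(start, start + size))
            start += size
        return parts

    def owner_of(node_id, parts):
        """Return the ID of the partition (machine) that owns node_id."""
        for part_id, node_range in enumerate(parts):
            if node_id in node_range:
                return part_id
        raise ValueError(f"node {node_id} is not in any partition")

    parts = range_partition(num_nodes=10, num_parts=3)
    print([list(p) for p in parts])   # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
    print(owner_of(5, parts))         # 1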

For the training script, DGL provides distributed APIs that are similar to the ones for
mini-batch training, so distributed training requires only small code modifications
to a mini-batch training script written for a single machine. The example below trains
GraphSAGE in a distributed fashion. The only code modifications are the statements
that follow the imports:
1) initialize DGL's distributed module, 2) create a distributed graph object, and
3) split the training set and calculate the nodes for the local process.
The rest of the code, including sampler creation, model definition, and the training loop,
is the same as in :ref:`mini-batch training <guide-minibatch>`.

.. code:: python

    import dgl
    import torch as th
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    from dgl.distributed import DistDataLoader

    # NeighborSampler, SAGE and load_subtensor are user-defined helpers;
    # num_servers, num_workers, batch_size, device, args and the model sizes
    # are assumed to be set elsewhere in the script.

    # 1) Initialize DGL's distributed module and PyTorch's process group.
    dgl.distributed.initialize('ip_config.txt', num_servers, num_workers)
    th.distributed.init_process_group(backend='gloo')
    # 2) Create the distributed graph object.
    g = dgl.distributed.DistGraph('graph_name', 'part_config.json')
    # 3) Split the training set and get the training nodes of the local process.
    pb = g.get_partition_book()
    train_nid = dgl.distributed.node_split(g.ndata['train_mask'], pb, force_even=True)

    # Create sampler
    sampler = NeighborSampler(g, [10, 25],
                              dgl.distributed.sample_neighbors,
                              device)

    dataloader = DistDataLoader(
        dataset=train_nid.numpy(),
        batch_size=batch_size,
        collate_fn=sampler.sample_blocks,
        shuffle=True,
        drop_last=False)

    # Define model and optimizer
    model = SAGE(in_feats, num_hidden, n_classes, num_layers, F.relu, dropout)
    model = th.nn.parallel.DistributedDataParallel(model)
    loss_fcn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # training loop
    for epoch in range(args.num_epochs):
        for step, blocks in enumerate(dataloader):
            batch_inputs, batch_labels = load_subtensor(g, blocks[0].srcdata[dgl.NID],
                                                        blocks[-1].dstdata[dgl.NID])
            batch_pred = model(blocks, batch_inputs)
            loss = loss_fcn(batch_pred, batch_labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
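
To make the training-set split in the script above more concrete: each trainer keeps only
its own share of the training nodes. The sketch below is a simplified, pure-Python stand-in
for that idea, assuming a plain even split by position. The helper ``even_split`` is a
hypothetical name, and the sketch is not how ``dgl.distributed.node_split`` is implemented;
in particular, it ignores which partition a node is stored on.

.. code:: python

    def even_split(mask, num_trainers, rank):
        """Return the training node IDs assigned to trainer `rank`."""
        train_ids = [nid for nid, flag in enumerate(mask) if flag]
        base, extra = divmod(len(train_ids), num_trainers)
        # The first `extra` trainers receive one additional node each.
        start = rank * base + min(rank, extra)
        size = base + (1 if rank < extra else 0)
        return train_ids[start:start + size]

    train_mask = [1, 0, 1, 1, 0, 1, 1, 0, 1, 1]   # 7 training nodes
    shares = [even_split(train_mask, num_trainers=3, rank=r) for r in range(3)]
    print(shares)   # [[0, 2, 3], [5, 6], [8, 9]]

Every training node lands in exactly one share, and share sizes differ by at most one,
so all trainers run roughly the same number of mini-batches per epoch.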

To run the training script on a cluster of machines, DGL provides tools that copy data
to the cluster's machines and launch the training job on all of them.

**Note**: The current distributed training API only supports the PyTorch backend.

**Note**: The current implementation only supports graphs with one node type and one edge type.

DGL implements a few distributed components to support distributed training. The figure below
shows the components and their interactions.

.. figure:: https://data.dgl.ai/asset/image/distributed.png
   :alt: Components of DGL distributed training and their interactions

Specifically, DGL's distributed training has three types of interacting processes:
*server*, *sampler* and *trainer*.

* Server processes run on each machine that stores a graph partition
  (this includes the graph structure and node/edge features). These servers
  work together to serve the graph data to trainers. Note that one machine may run
  multiple server processes simultaneously to parallelize computation as well as
  network communication.
* Sampler processes interact with the servers and sample nodes and edges to
  generate mini-batches for training.
* Trainers use several classes to interact with the servers. They use
  :class:`~dgl.distributed.DistGraph` to access the partitioned graph data, and
  :class:`~dgl.distributed.DistEmbedding` and :class:`~dgl.distributed.DistTensor` to access
  the node/edge features and embeddings. They use
  :class:`~dgl.distributed.dist_dataloader.DistDataLoader` to
  interact with samplers to get mini-batches.
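
The division of labor among the three process types can be sketched with a toy,
single-process model. Everything here is an assumption made for illustration: the
``Server`` and ``Sampler`` classes and ``trainer_step`` are hypothetical names, not DGL
classes, and in DGL these roles are separate processes that communicate over the network
rather than objects calling each other directly.

.. code:: python

    import random

    class Server:
        """Holds one graph partition: adjacency lists and node features."""
        def __init__(self, adj, feats):
            self.adj, self.feats = adj, feats

        def neighbors(self, nid):
            return self.adj[nid]

        def features(self, nids):
            return {nid: self.feats[nid] for nid in nids}

    class Sampler:
        """Asks the owning server for neighbors and builds a mini-batch."""
        def __init__(self, servers, owner):
            self.servers, self.owner = servers, owner

        def sample(self, seeds, fanout, rng):
            batch = set(seeds)
            for nid in seeds:
                nbrs = self.servers[self.owner[nid]].neighbors(nid)
                batch.update(rng.sample(nbrs, min(fanout, len(nbrs))))
            return sorted(batch)

    def trainer_step(servers, owner, sampler, seeds, rng):
        """Get a mini-batch from the sampler, then pull its features from servers."""
        batch = sampler.sample(seeds, fanout=2, rng=rng)
        feats = {}
        for nid in batch:
            feats.update(servers[owner[nid]].features([nid]))
        return batch, feats

    # Two partitions: nodes 0-1 live on server 0, nodes 2-3 on server 1.
    owner = {0: 0, 1: 0, 2: 1, 3: 1}
    servers = [
        Server({0: [1, 2], 1: [0]}, {0: [0.0], 1: [0.1]}),
        Server({2: [0, 3], 3: [2]}, {2: [0.2], 3: [0.3]}),
    ]
    sampler = Sampler(servers, owner)
    batch, feats = trainer_step(servers, owner, sampler, [0, 3], random.Random(0))
    print(batch)   # [0, 1, 2, 3]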


With these components in mind, the rest of this chapter covers
the following topics:

* :ref:`guide-distributed-preprocessing`
* :ref:`guide-distributed-apis`
* :ref:`guide-distributed-hetero`
* :ref:`guide-distributed-tools`

.. toctree::
    :maxdepth: 1
    :hidden:
    :glob:

    distributed-preprocessing
    distributed-apis
    distributed-hetero
    distributed-tools