"""
ARGO: An Auto-Tuning Runtime System for Scalable GNN Training on Multi-Core Processor
--------------------------------------------
Graph Neural Network (GNN) training suffers from low scalability on multi-core CPUs.
Specifically, performance often plateaus at 16 cores, and no improvement is observed when more than 16 cores are used.
ARGO is a runtime system that offers scalable performance by overlapping computation and communication during GNN training.
With ARGO enabled, GNN training scales beyond 64 cores, speeding up training (in terms of epoch time) by up to 4.30x and 3.32x on a Xeon 8380H and a Xeon 6430L, respectively.
--------------------------------------------
Paper Link: https://arxiv.org/abs/2402.03671
"""

import time
from typing import Callable, List, Tuple

import dgl.multiprocessing as dmp
import numpy as np
import psutil
from skopt import gp_minimize
from skopt.space.transformers import Normalize


def transform(self, X):
    X = np.asarray(X)
    if self.is_int:
        if np.any(np.round(X) > self.high):
            raise ValueError(
                "All integer values should be less than %f" % self.high
            )
        if np.any(np.round(X) < self.low):
            raise ValueError(
                "All integer values should be greater than %f" % self.low
            )
    else:
        if np.any(X > self.high + self._eps):
            raise ValueError("All values should be less than %f" % self.high)
        if np.any(X < self.low - self._eps):
            raise ValueError(
                "All values should be greater than %f" % self.low
            )
    if (self.high - self.low) == 0.0:
        return X * 0.0
    if self.is_int:
        return (np.round(X).astype(int) - self.low) / (self.high - self.low)
    else:
        return (X - self.low) / (self.high - self.low)


def inverse_transform(self, X):
    X = np.asarray(X)
    if np.any(X > 1.0 + self._eps):
        raise ValueError("All values should be less than 1.0")
    if np.any(X < 0.0 - self._eps):
        raise ValueError("All values should be greater than 0.0")
    X_orig = X * (self.high - self.low) + self.low
    if self.is_int:
        return np.round(X_orig).astype(int)
    return X_orig


# This is a workaround for scikit-optimize's incompatibility with newer NumPy releases
# (the deprecated `np.int` alias was removed in NumPy 1.24), which otherwise raises:
# AttributeError: module 'numpy' has no attribute 'int'
Normalize.transform = transform
Normalize.inverse_transform = inverse_transform


class ARGO:
    def __init__(
        self,
        n_search=10,
        epoch=200,
        batch_size=4096,
        space=[(2, 8), (1, 4), (1, 32)],
        random_state=1,
    ):
        """
        Initialization

        Parameters
        ----------
        n_search: int
            Number of configuration searches the auto-tuner will conduct

        epoch: int
            Number of epochs of GNN training

        batch_size: int
            Size of the mini-batch

        space: List[Tuple[int, int]]
            Ranges of the search space: [range of processes, range of samplers per process, range of trainers per process]

        random_state: int
            Random seed passed to `gp_minimize` so that the search is reproducible

        """
        self.n_search = n_search
        self.epoch = epoch
        self.batch_size = batch_size
        self.space = space
        self.random_state = random_state
        self.acq_func = "EI"
        self.counter = [0]

    def core_binder(
        self, num_cpu_proc: int, n_samp: int, n_train: int, rank: int
    ) -> Tuple[List[int], List[int]]:
        """
        Core Binder

        The Core Binder binds CPU cores to perform sampling (i.e., sampling cores) and model propagation (i.e., training cores).
        The actual binding is done by the CPU affinity support of the data loader;
        core_binder only produces the lists of CPU core IDs that are passed to it.

        Parameters
        ----------
        num_cpu_proc: int
            Number of processes instantiated

        n_samp: int
            Number of sampling cores for each process

        n_train: int
            Number of training cores for each process

        rank: int
            The rank of the current process

        Returns
        -------
        load_core: List[int]
            For the given process rank, the list of CPU core IDs to be used for sampling; len(load_core) == n_samp.

        comp_core: List[int]
            For the given process rank, the list of CPU core IDs to be used for training; len(comp_core) == n_train.

        .. note:: Each process is assigned a unique list of sampling cores and training cores; no CPU core appears in more than one list.

        """
        n = psutil.cpu_count(logical=False)  # number of physical cores
        cores_per_proc = n // num_cpu_proc  # consecutive cores owned by each process
        start = cores_per_proc * rank
        load_core = list(range(start, start + n_samp))
        comp_core = list(range(start + n_samp, start + n_samp + n_train))
        return load_core, comp_core

    def auto_tuning(self, train: Callable, args) -> List[int]:
        """
        Auto-tuner

        The auto-tuner runs Bayesian Optimization (BO) to search for the optimal configuration (number of processes, samplers, and trainers).
        During the search, the auto-tuner explores the design space by feeding the Multi-Process Engine with candidate configurations and recording the resulting epoch time.
        After the search completes, the optimal configuration is reused for the remaining epochs of model training.

        Parameters
        ----------
        train: Callable
            The GNN training function.

        args:
            The inputs of the GNN training function.

        Returns
        -------
        result: OptimizeResult
            The optimization result returned by `gp_minimize`; `result.x` holds the configuration with the shortest epoch time:
            - result.x[0]: number of processes to instantiate
            - result.x[1]: number of sampling cores for each process
            - result.x[2]: number of training cores for each process

        """
        ep = 1
        result = gp_minimize(
            lambda x: self.mp_engine(x, train, args, ep),
            dimensions=self.space,
            n_calls=self.n_search,
            random_state=self.random_state,
            acq_func=self.acq_func,
        )
        return result

    def mp_engine(self, x: List[int], train: Callable, args, ep: int) -> float:
        """
        Multi-Process Engine (MP Engine)

        The MP Engine launches multiple GNN training processes in parallel to overlap computation with communication.
        This approach effectively improves the utilization of memory bandwidth and CPU cores.
        The MP Engine also adjusts the per-process batch size according to the number of processes instantiated, so that the effective batch size matches that of the original program without ARGO.

        Parameters
        ----------
        x: List[int]
            A configuration proposed by the auto-tuner.
            - x[0]: number of processes to instantiate
            - x[1]: number of sampling cores for each process
            - x[2]: number of training cores for each process

        train: Callable
            The GNN training function.

        args:
            The inputs of the GNN training function.

        ep: int
            Number of epochs to train with this configuration.

        Returns
        -------
        t: float
            The wall-clock time of running `ep` epoch(s) with configuration `x`; with ep=1 this is the epoch time used as the BO objective.
        """
        n_proc = x[0]
        n_samp = x[1]
        n_train = x[2]
        n_total = psutil.cpu_count(logical=False)

        if n_proc * (n_samp + n_train) > n_total:
            # The proposed configuration oversubscribes the physical cores;
            # fall back to a safe configuration that fits.
            n_proc = 2
            n_samp = 2
            n_train = (n_total // n_proc) - n_samp

        processes = []
        cnt = self.counter
        b_size = self.batch_size // n_proc  # split the global batch across processes to keep the effective batch size unchanged

        tik = time.time()
        for i in range(n_proc):
            load_core, comp_core = self.core_binder(n_proc, n_samp, n_train, i)
            p = dmp.Process(
                target=train,
                args=(*args, i, n_proc, comp_core, load_core, cnt, b_size, ep),
            )
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
        t = time.time() - tik

        self.counter[0] += 1

        return t

    def run(self, train, args):
        """
        The "run" function launches ARGO to traing GNN model
        Step 1: run the auto-tuner to search for the optimal configuration
        Step 2: record the optimal configuration
        Step 3: use the optimal configuration repeatedly until the end of the model training

        Parameters
        ----------
        train: Callable
            The GNN training function.

        args:
            The inputs of the GNN training function.
        """
        result = self.auto_tuning(train, args)  # Step 1
        x = result.x  # Step 2
        self.mp_engine(
            x, train, args, ep=(self.epoch - self.n_search)
        )  # Step 3
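
# ---------------------------------------------------------------------------
# Example usage (a minimal sketch; `train`, `dataset`, and `model` are
# hypothetical placeholders, not part of this module). ARGO appends
# (rank, world_size, comp_core, load_core, counter, batch_size, epochs) to the
# user-supplied `args`, so the training function must accept these extra
# arguments and is expected to pin its sampling workers to `load_core` and its
# compute threads to `comp_core` (e.g., via the data loader's CPU affinity
# support mentioned in `core_binder`).
#
#   def train(dataset, model, rank, world_size, comp_core, load_core,
#             counter, b_size, ep):
#       ...  # mini-batch GNN training loop running for `ep` epochs
#
#   runtime = ARGO(n_search=10, epoch=200, batch_size=4096)
#   runtime.run(train, args=(dataset, model))
# ---------------------------------------------------------------------------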