base.py 7.05 KB
Newer Older
rusty1s's avatar
rusty1s committed
1
from typing import Optional, Callable, Dict, Any
rusty1s's avatar
rusty1s committed
2
3
4
5
6
7
8

import warnings

import torch
from torch import Tensor
from torch_sparse import SparseTensor

rusty1s's avatar
rusty1s committed
9
10
from torch_geometric_autoscale import History, AsyncIOPool
from torch_geometric_autoscale import SubgraphLoader, EvalSubgraphLoader
rusty1s's avatar
rusty1s committed
11
12
13
14
15
16


class ScalableGNN(torch.nn.Module):
    r"""Base class for GNNs that store intermediate node embeddings in
    per-layer histories.

    The module owns :obj:`num_layers - 1` :class:`History` modules. If the
    history embeddings sit on the CPU while the model itself sits on a CUDA
    device (and both :obj:`pool_size` and :obj:`buffer_size` are set), an
    :class:`AsyncIOPool` is created in :meth:`_apply` and used to transfer
    embeddings between the histories and the model.

    Sub-classes are expected to implement :meth:`forward` and
    :meth:`forward_layer`, and to provide an :obj:`out_channels` attribute,
    which is read when the output matrix of :meth:`mini_inference` is
    allocated.

    Args:
        num_nodes (int): Number of nodes; forwarded to every history and
            used as the number of rows of the inference output matrix.
        hidden_channels (int): Embedding size forwarded to every history.
        num_layers (int): Number of layers; :obj:`num_layers - 1` histories
            are created.
        pool_size (int, optional): First argument of :class:`AsyncIOPool`.
            Defaults to :obj:`num_layers - 1` if :obj:`None`.
        buffer_size (int, optional): Second argument of
            :class:`AsyncIOPool`. No pool is created if :obj:`None`.
        device (optional): Forwarded to every history.
    """
    def __init__(self, num_nodes: int, hidden_channels: int, num_layers: int,
                 pool_size: Optional[int] = None,
                 buffer_size: Optional[int] = None, device=None):
        super().__init__()

        self.num_nodes = num_nodes
        self.hidden_channels = hidden_channels
        self.num_layers = num_layers
        self.pool_size = num_layers - 1 if pool_size is None else pool_size
        self.buffer_size = buffer_size

        self.histories = torch.nn.ModuleList([
            History(num_nodes, hidden_channels, device)
            for _ in range(num_layers - 1)
        ])

        # The pool is created lazily in `_apply`, i.e., once the module is
        # moved to its final device.
        self.pool: Optional[AsyncIOPool] = None
        # Whether the current `__call__` uses asynchronous history transfer.
        self._async = False
        # Lazily allocated output matrix of `mini_inference` (see `_out`).
        self.__out: Optional[Tensor] = None

    @property
    def emb_device(self):
        r"""The device of the embedding matrix of the first history.

        Requires at least one history, i.e., :obj:`num_layers > 1`.
        """
        return self.histories[0].emb.device

    @property
    def device(self):
        r"""The device recorded in :obj:`_device` of the first history.

        Requires at least one history, i.e., :obj:`num_layers > 1`.
        """
        return self.histories[0]._device

    def _apply(self, fn: Callable, *args, **kwargs) -> 'ScalableGNN':
        r"""Applies :obj:`fn` via :meth:`torch.nn.Module._apply` and then
        (re-)creates the :class:`AsyncIOPool` if it is needed.

        Additional arguments (*e.g.*, :obj:`recurse` in newer PyTorch
        releases) are passed through unchanged to the parent implementation.
        """
        super()._apply(fn, *args, **kwargs)
        # We only initialize the AsyncIOPool in case histories are on CPU:
        if (str(self.emb_device) == 'cpu' and str(self.device)[:4] == 'cuda'
                and self.pool_size is not None
                and self.buffer_size is not None):
            self.pool = AsyncIOPool(self.pool_size, self.buffer_size,
                                    self.histories[0].embedding_dim)
            self.pool.to(self.device)
        return self

    def reset_parameters(self):
        r"""Calls :meth:`reset_parameters` on every history."""
        for history in self.histories:
            history.reset_parameters()

    def __call__(
        self,
        x: Optional[Tensor] = None,
        adj_t: Optional[SparseTensor] = None,
        batch_size: Optional[int] = None,
        n_id: Optional[Tensor] = None,
        offset: Optional[Tensor] = None,
        count: Optional[Tensor] = None,
        loader: Optional[EvalSubgraphLoader] = None,
        **kwargs,
    ) -> Tensor:
        r"""Runs :meth:`forward`, wrapping it with asynchronous history
        transfer whenever possible.

        If :obj:`loader` is given, all other arguments are ignored and the
        result of :meth:`mini_inference` is returned instead.

        Args:
            x (Tensor, optional): Node features, forwarded to
                :meth:`forward`.
            adj_t (SparseTensor, optional): Adjacency, forwarded to
                :meth:`forward`.
            batch_size (int, optional): The first :obj:`batch_size` entries
                of :obj:`n_id` are treated as in-batch nodes; embeddings of
                the remaining entries are pulled from the histories.
            n_id (Tensor, optional): Node indices of the mini-batch.
            offset (Tensor, optional): Forwarded to the history/pool push.
            count (Tensor, optional): Forwarded to the history/pool push.
            loader (EvalSubgraphLoader, optional): If set, layer-wise
                inference over this loader is performed.
            **kwargs: Additional arguments forwarded to :meth:`forward`.
        """

        if loader is not None:
            return self.mini_inference(loader)

        # We only perform asynchronous history transfer in case the following
        # conditions are met:
        self._async = (self.pool is not None and batch_size is not None
                       and n_id is not None and offset is not None
                       and count is not None)

        if (batch_size is not None and not self._async
                and str(self.emb_device) == 'cpu'
                and str(self.device)[:4] == 'cuda'):
            warnings.warn('Asynchronous I/O disabled, although history and '
                          'model sit on different devices.')

        if self._async:
            # Request the out-of-batch embeddings of every history up-front.
            # They are collected one-by-one in `push_and_pull`.
            for hist in self.histories:
                self.pool.async_pull(hist.emb, None, None, n_id[batch_size:])

        out = self.forward(x, adj_t, batch_size, n_id, offset, count, **kwargs)

        if self._async:
            # One synchronization call per history, mirroring the pull
            # requests issued above.
            for _ in self.histories:
                self.pool.synchronize_push()

        self._async = False

        return out

    def push_and_pull(self, history, x: Tensor,
                      batch_size: Optional[int] = None,
                      n_id: Optional[Tensor] = None,
                      offset: Optional[Tensor] = None,
                      count: Optional[Tensor] = None) -> Tensor:
        r"""Push and pull information from `x` to `history` and vice versa.

        * If :obj:`n_id` is :obj:`None`, :obj:`x` is pushed to the history
          only if it holds one row per node, and is returned unchanged.
        * If only :obj:`batch_size` is :obj:`None`, :obj:`x` is pushed at
          positions :obj:`n_id` and returned unchanged.
        * Otherwise, the first :obj:`batch_size` rows of :obj:`x` are pushed
          and the remaining rows are replaced by the embeddings pulled for
          :obj:`n_id[batch_size:]` (synchronously from :obj:`history`, or
          via the pool while asynchronous transfer is active).
        """

        if n_id is None:
            # Without node indices we can only store `x` if it covers the
            # whole graph. Otherwise, do nothing...
            if x.size(0) == self.num_nodes:
                history.push(x)
            return x

        if batch_size is None:
            history.push(x, n_id)
            return x

        if not self._async:
            history.push(x[:batch_size], n_id[:batch_size], offset, count)
            h = history.pull(n_id[batch_size:])
            return torch.cat([x[:batch_size], h], dim=0)

        # Asynchronous case: the pull has been requested in `__call__`.
        out = self.pool.synchronize_pull()[:n_id.numel() - batch_size]
        self.pool.async_push(x[:batch_size], offset, count, history.emb)
        out = torch.cat([x[:batch_size], out], dim=0)
        self.pool.free_pull()
        return out

    @property
    def _out(self):
        r"""Lazily allocated :obj:`[num_nodes, out_channels]` matrix in
        pinned memory that holds the result of :meth:`mini_inference`.

        Reads :obj:`self.out_channels`, which is not set by this class.
        """
        if self.__out is None:
            self.__out = torch.empty(self.num_nodes, self.out_channels,
                                     pin_memory=True)
        return self.__out

    @torch.no_grad()
    def mini_inference(self, loader: SubgraphLoader) -> Tensor:
        r"""Performs layer-wise inference over all mini-batches in
        :obj:`loader` and returns the output matrix :attr:`_out`.

        Raises:
            RuntimeError: If no :class:`AsyncIOPool` has been initialized.
        """
        if self.pool is None:
            raise RuntimeError(
                "'mini_inference' requires an initialized AsyncIOPool, which "
                "is only created when the histories sit on the CPU, the "
                "model sits on a CUDA device and 'buffer_size' is set.")

        # We iterate over the loader in a layer-wise fashion.
        # In order to re-use some intermediate representations, we maintain a
        # `state` dictionary for each individual mini-batch.

        loader = [sub_data + ({}, ) for sub_data in loader]

        # We push the outputs of the first layer to the history:
        for data, batch_size, n_id, offset, count, state in loader:
            x = data.x.to(self.device)
            adj_t = data.adj_t.to(self.device)
            out = self.forward_layer(0, x, adj_t, state)[:batch_size]
            self.pool.async_push(out, offset, count, self.histories[0].emb)
        self.pool.synchronize_push()

        for i in range(1, len(self.histories)):
            # Pull the complete layer-wise history:
            for _, batch_size, n_id, offset, count, _ in loader:
                self.pool.async_pull(self.histories[i - 1].emb, offset, count,
                                     n_id[batch_size:])

            # Compute new output embeddings one-by-one and start pushing them
            # to the history.
            for batch, batch_size, n_id, offset, count, state in loader:
                adj_t = batch.adj_t.to(self.device)
                x = self.pool.synchronize_pull()[:n_id.numel()]
                out = self.forward_layer(i, x, adj_t, state)[:batch_size]
                self.pool.async_push(out, offset, count, self.histories[i].emb)
                self.pool.free_pull()
            self.pool.synchronize_push()

        # We pull the histories from the last layer:
        for _, batch_size, n_id, offset, count, _ in loader:
            self.pool.async_pull(self.histories[-1].emb, offset, count,
                                 n_id[batch_size:])

        # And compute final output embeddings, which we write into a private
        # output embedding matrix:
        for batch, batch_size, n_id, offset, count, state in loader:
            adj_t = batch.adj_t.to(self.device)
            x = self.pool.synchronize_pull()[:n_id.numel()]
            out = self.forward_layer(self.num_layers - 1, x, adj_t,
                                     state)[:batch_size]
            self.pool.async_push(out, offset, count, self._out)
            self.pool.free_pull()
        self.pool.synchronize_push()

        return self._out

    @torch.no_grad()
    def forward_layer(self, layer: int, x: Tensor, adj_t: SparseTensor,
                      state: Dict[str, Any]) -> Tensor:
        r"""Computes the output of layer :obj:`layer` for a single
        mini-batch during :meth:`mini_inference`.

        :obj:`state` is a dictionary that is kept per mini-batch across
        layers. Needs to be implemented by sub-classes.
        """
        raise NotImplementedError