"src/array/cuda/negative_sampling.hip" did not exist on "8d14a739bc9e446d6c92ef83eafe5782398118de"
Unverified Commit ede0558c authored by Muhammed Fatih BALIN's avatar Muhammed Fatih BALIN Committed by GitHub
Browse files

[Feature] Dataloader gpu cache (#6033)


Co-authored-by: Hongzhi (Steve) Chen <chenhongzhi.nkcs@gmail.com>
parent 0c3b2b78
...@@ -3,11 +3,13 @@ import atexit ...@@ -3,11 +3,13 @@ import atexit
import inspect import inspect
import itertools import itertools
import math import math
import operator
import os import os
import re import re
import threading import threading
from collections.abc import Mapping, Sequence from collections.abc import Mapping, Sequence
from contextlib import contextmanager from contextlib import contextmanager
from functools import reduce
from queue import Empty, Full, Queue from queue import Empty, Full, Queue
import numpy as np import numpy as np
...@@ -21,6 +23,7 @@ from .._ffi.base import is_tensor_adaptor_enabled ...@@ -21,6 +23,7 @@ from .._ffi.base import is_tensor_adaptor_enabled
from ..base import dgl_warning, DGLError, EID, NID from ..base import dgl_warning, DGLError, EID, NID
from ..batch import batch as batch_graphs from ..batch import batch as batch_graphs
from ..cuda import GPUCache
from ..distributed import DistGraph from ..distributed import DistGraph
from ..frame import LazyFeature from ..frame import LazyFeature
from ..heterograph import DGLGraph from ..heterograph import DGLGraph
...@@ -336,8 +339,45 @@ class DDPTensorizedDataset(torch.utils.data.IterableDataset): ...@@ -336,8 +339,45 @@ class DDPTensorizedDataset(torch.utils.data.IterableDataset):
) // self.batch_size ) // self.batch_size
def _numel_of_shape(shape):
return reduce(operator.mul, shape, 1)
def _init_gpu_caches(graph, gpu_caches):
if not hasattr(graph, "_gpu_caches"):
graph._gpu_caches = {"node": {}, "edge": {}}
if gpu_caches is None:
return
assert isinstance(gpu_caches, dict), "GPU cache argument should be a dict"
for i, frames in enumerate([graph._node_frames, graph._edge_frames]):
node_or_edge = ["node", "edge"][i]
cache_inf = gpu_caches.get(node_or_edge, {})
for tid, frame in enumerate(frames):
type_ = [graph.ntypes, graph.canonical_etypes][i][tid]
for key in frame.keys():
if key in cache_inf and cache_inf[key] > 0:
column = frame._columns[key]
if (key, type_) not in graph._gpu_caches[node_or_edge]:
cache = GPUCache(
cache_inf[key],
_numel_of_shape(column.shape),
graph.idtype,
)
graph._gpu_caches[node_or_edge][key, type_] = (
cache,
column.shape,
)
def _prefetch_update_feats( def _prefetch_update_feats(
feats, frames, types, get_storage_func, id_name, device, pin_prefetcher feats,
frames,
types,
get_storage_func,
id_name,
device,
pin_prefetcher,
gpu_caches,
): ):
for tid, frame in enumerate(frames): for tid, frame in enumerate(frames):
type_ = types[tid] type_ = types[tid]
...@@ -351,8 +391,25 @@ def _prefetch_update_feats( ...@@ -351,8 +391,25 @@ def _prefetch_update_feats(
"Found a LazyFeature with no ID specified, " "Found a LazyFeature with no ID specified, "
"and the graph does not have dgl.NID or dgl.EID columns" "and the graph does not have dgl.NID or dgl.EID columns"
) )
ids = column.id_ or default_id
if (parent_key, type_) in gpu_caches:
cache, item_shape = gpu_caches[parent_key, type_]
values, missing_index, missing_keys = cache.query(ids)
missing_values = get_storage_func(parent_key, type_).fetch(
missing_keys, device, pin_prefetcher
)
cache.replace(
missing_keys, F.astype(missing_values, F.float32)
)
values = F.astype(values, F.dtype(missing_values))
F.scatter_row_inplace(values, missing_index, missing_values)
# Reshape the flattened result to match the original shape.
F.reshape(values, (values.shape[0],) + item_shape)
values.__cache_miss__ = missing_keys.shape[0] / ids.shape[0]
feats[tid, key] = values
else:
feats[tid, key] = get_storage_func(parent_key, type_).fetch( feats[tid, key] = get_storage_func(parent_key, type_).fetch(
column.id_ or default_id, device, pin_prefetcher ids, device, pin_prefetcher
) )
...@@ -376,6 +433,7 @@ def _prefetch_for_subgraph(subg, dataloader): ...@@ -376,6 +433,7 @@ def _prefetch_for_subgraph(subg, dataloader):
NID, NID,
dataloader.device, dataloader.device,
dataloader.pin_prefetcher, dataloader.pin_prefetcher,
dataloader.graph._gpu_caches["node"],
) )
_prefetch_update_feats( _prefetch_update_feats(
edge_feats, edge_feats,
...@@ -385,6 +443,7 @@ def _prefetch_for_subgraph(subg, dataloader): ...@@ -385,6 +443,7 @@ def _prefetch_for_subgraph(subg, dataloader):
EID, EID,
dataloader.device, dataloader.device,
dataloader.pin_prefetcher, dataloader.pin_prefetcher,
dataloader.graph._gpu_caches["edge"],
) )
return _PrefetchedGraphFeatures(node_feats, edge_feats) return _PrefetchedGraphFeatures(node_feats, edge_feats)
...@@ -791,6 +850,17 @@ class DataLoader(torch.utils.data.DataLoader): ...@@ -791,6 +850,17 @@ class DataLoader(torch.utils.data.DataLoader):
Whether to pin the feature tensors into pinned memory. Whether to pin the feature tensors into pinned memory.
Default: True if the graph is on CPU and :attr:`device` is CUDA. False otherwise. Default: True if the graph is on CPU and :attr:`device` is CUDA. False otherwise.
gpu_cache : dict[dict], optional
Which node and edge features to cache using HugeCTR gpu_cache. Example:
{"node": {"features": 500000}, "edge": {"types": 4000000}} would
indicate that we want to cache 500k of the node "features" and 4M of the
edge "types" in GPU caches.
        This feature is supported only on NVIDIA GPUs with compute capability 7.0 or above.
The dictionary holds the keys of features along with the corresponding
cache sizes. Please see
https://github.com/NVIDIA-Merlin/HugeCTR/blob/main/gpu_cache/ReadMe.md
for further reference.
kwargs : dict kwargs : dict
Key-word arguments to be passed to the parent PyTorch Key-word arguments to be passed to the parent PyTorch
:py:class:`torch.utils.data.DataLoader` class. Common arguments are: :py:class:`torch.utils.data.DataLoader` class. Common arguments are:
...@@ -867,6 +937,7 @@ class DataLoader(torch.utils.data.DataLoader): ...@@ -867,6 +937,7 @@ class DataLoader(torch.utils.data.DataLoader):
use_alternate_streams=None, use_alternate_streams=None,
pin_prefetcher=None, pin_prefetcher=None,
use_uva=False, use_uva=False,
gpu_cache=None,
**kwargs, **kwargs,
): ):
# (BarclayII) PyTorch Lightning sometimes will recreate a DataLoader from an existing # (BarclayII) PyTorch Lightning sometimes will recreate a DataLoader from an existing
...@@ -1055,6 +1126,8 @@ class DataLoader(torch.utils.data.DataLoader): ...@@ -1055,6 +1126,8 @@ class DataLoader(torch.utils.data.DataLoader):
self.other_storages = {} self.other_storages = {}
_init_gpu_caches(self.graph, gpu_cache)
super().__init__( super().__init__(
self.dataset, self.dataset,
collate_fn=CollateWrapper( collate_fn=CollateWrapper(
......
...@@ -14,4 +14,4 @@ class BaseTensorStorage(FeatureStorage): ...@@ -14,4 +14,4 @@ class BaseTensorStorage(FeatureStorage):
def fetch(
    self, indices, device, pin_memory=False, **kwargs
):  # pylint: disable=unused-argument
    """Gather the rows of the wrapped tensor at *indices* and copy to *device*.

    The pre-fix code referenced an undefined local ``tensor``; the wrapped
    tensor is held in ``self.storage``. Extra keyword arguments are
    forwarded to the backend copy (``pin_memory`` is currently unused here).
    """
    return F.copy_to(F.gather_row(self.storage, indices), device, **kwargs)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.