Commit 5561af45 (unverified), authored by Daniil Sizov, committed by GitHub
Browse files

[Feature] Dataloader worker affinitization (#3723)



* PR3355 + CSR conversion workaround

* Remove debug code

* Fix convention errors

* Remove wrongly added code section during merge

* Update to reflect dataloading changes

* Fix missing changes

* Remove comment

* Fix linter errors

* Fix trailing whitespace

* Add wrapper around worker init function
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>
parent 37be02a4
......@@ -9,6 +9,7 @@ import inspect
import re
import atexit
import os
import psutil
import torch
import torch.distributed as dist
......@@ -678,7 +679,8 @@ class DataLoader(torch.utils.data.DataLoader):
def __init__(self, graph, indices, graph_sampler, device=None, use_ddp=False,
ddp_seed=0, batch_size=1, drop_last=False, shuffle=False,
use_prefetch_thread=None, use_alternate_streams=None,
pin_prefetcher=None, use_uva=False, **kwargs):
pin_prefetcher=None, use_uva=False,
use_cpu_worker_affinity=False, cpu_worker_affinity_cores=None, **kwargs):
# (BarclayII) PyTorch Lightning sometimes will recreate a DataLoader from an existing
# DataLoader with modifications to the original arguments. The arguments are retrieved
# from the attributes with the same name, and because we change certain arguments
......@@ -826,6 +828,26 @@ class DataLoader(torch.utils.data.DataLoader):
self.other_storages = {}
if use_cpu_worker_affinity:
nw_work = kwargs.get('num_workers', 0)
if cpu_worker_affinity_cores is None:
cpu_worker_affinity_cores = []
if not isinstance(cpu_worker_affinity_cores, list):
raise Exception('ERROR: cpu_worker_affinity_cores should be a list of cores')
if not nw_work > 0:
raise Exception('ERROR: affinity should be used with --num_workers=X')
if len(cpu_worker_affinity_cores) not in [0, nw_work]:
raise Exception('ERROR: cpu_affinity incorrect '
'settings for cores={} num_workers={}'
.format(cpu_worker_affinity_cores, nw_work))
self.cpu_cores = (cpu_worker_affinity_cores
if len(cpu_worker_affinity_cores)
else range(0, nw_work))
worker_init_fn = WorkerInitWrapper(self.worker_init_function)
super().__init__(
self.dataset,
collate_fn=CollateWrapper(
......@@ -844,6 +866,21 @@ class DataLoader(torch.utils.data.DataLoader):
self, super().__iter__(), use_thread=self.use_prefetch_thread,
use_alternate_streams=self.use_alternate_streams, num_threads=num_threads)
def worker_init_function(self, worker_id):
    """Pin the current worker process to its assigned CPU core.

    The core is taken from ``self.cpu_cores`` at index ``worker_id`` and
    applied to the current process through ``psutil``.

    Parameters
    ----------
    worker_id : int
        Worker ID, used as an index into ``self.cpu_cores``.

    Raises
    ------
    Exception
        If the core cannot be looked up or the affinity cannot be set.
        The underlying error is attached as ``__cause__``.
    """
    try:
        core = self.cpu_cores[worker_id]
        psutil.Process().cpu_affinity([core])
    except Exception as err:
        # ``except Exception`` instead of a bare ``except`` so that
        # KeyboardInterrupt/SystemExit are not rewritten into a generic
        # error; ``from err`` keeps the root cause in the traceback.
        raise Exception('ERROR: cannot use affinity id={} cpu_cores={}'
                        .format(worker_id, self.cpu_cores)) from err
    # Reported outside the ``try`` so that a failure to print is not
    # mislabelled as an affinity failure.
    print('CPU-affinity worker {} has been assigned to core={}'
          .format(worker_id, core))
# To allow data other than node/edge data to be prefetched.
def attach_data(self, name, data):
"""Add a data other than node and edge features for prefetching."""
......
......@@ -172,7 +172,8 @@ setup(
'scipy>=1.1.0',
'networkx>=2.1',
'requests>=2.19.0',
'tqdm'
'tqdm',
'psutil>=5.8.0',
],
url='https://github.com/dmlc/dgl',
distclass=BinaryDistribution,
......
......@@ -14,7 +14,7 @@ SET DGLBACKEND=!BACKEND!
SET DGL_LIBRARY_PATH=!CD!\build
SET DGL_DOWNLOAD_DIR=!CD!
python -m pip install pytest pyyaml pandas pydantic rdflib || EXIT /B 1
python -m pip install pytest psutil pyyaml pandas pydantic rdflib || EXIT /B 1
python -m pytest -v --junitxml=pytest_backend.xml tests\!DGLBACKEND! || EXIT /B 1
python -m pytest -v --junitxml=pytest_compute.xml tests\compute || EXIT /B 1
ENDLOCAL
......
......@@ -32,7 +32,7 @@ fi
conda activate ${DGLBACKEND}-ci
python3 -m pip install pytest pyyaml pandas pydantic rdflib ogb || fail "pip install"
python3 -m pip install pytest psutil pyyaml pandas pydantic rdflib ogb || fail "pip install"
python3 -m pytest -v --junitxml=pytest_compute.xml tests/compute || fail "compute"
python3 -m pytest -v --junitxml=pytest_backend.xml tests/$DGLBACKEND || fail "backend-specific"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment