Unverified commit 59bf5634, authored by Ma Zerun, committed by GitHub

[Feature] Support CUDA_VISIBLE_DEVICES and multiple tasks on one GPU (#148)

* [Feature] Support CUDA_VISIBLE_DEVICES and multiple tasks on one GPU

* Fix UT

* Update according to comments
parent 312095de
Runner implementation (`LocalRunner`):

```diff
@@ -1,5 +1,6 @@
 import os
 import os.path as osp
+import re
 import subprocess
 import time
 from concurrent.futures import ThreadPoolExecutor
@@ -26,6 +27,8 @@ class LocalRunner(BaseRunner):
         task (ConfigDict): Task type config.
         max_num_workers (int): Max number of workers to run in parallel.
             Defaults to 16.
+        max_workers_per_gpu (int): Max number of workers to run for one GPU.
+            Defaults to 1.
         debug (bool): Whether to run in debug mode.
         lark_bot_url (str): Lark bot url.
     """
@@ -34,9 +37,11 @@ class LocalRunner(BaseRunner):
                  task: ConfigDict,
                  max_num_workers: int = 16,
                  debug: bool = False,
+                 max_workers_per_gpu: int = 1,
                  lark_bot_url: str = None):
         super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url)
         self.max_num_workers = max_num_workers
+        self.max_workers_per_gpu = max_workers_per_gpu
 
     def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
         """Launch multiple tasks.
@@ -58,7 +63,20 @@ class LocalRunner(BaseRunner):
                 status.append((task_name, 0))
         else:
             import torch
-            gpus = np.ones(torch.cuda.device_count(), dtype=np.bool_)
+            if 'CUDA_VISIBLE_DEVICES' in os.environ:
+                all_gpu_ids = [
+                    int(i) for i in re.findall(
+                        r'(?<!-)\d+', os.getenv('CUDA_VISIBLE_DEVICES'))
+                ]
+            else:
+                all_gpu_ids = list(range(torch.cuda.device_count()))
+
+            if len(all_gpu_ids) > 0:
+                gpus = np.zeros(max(all_gpu_ids) + 1, dtype=np.uint)
+                gpus[all_gpu_ids] = self.max_workers_per_gpu
+            else:
+                gpus = np.array([], dtype=np.uint)
 
             pbar = tqdm(total=len(tasks))
             lock = Lock()
```
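For reference, here is a minimal standalone sketch of the new device-discovery logic (the helper name `parse_visible_devices` is hypothetical, not part of the commit). The `(?<!-)` lookbehind means `CUDA_VISIBLE_DEVICES=-1`, the conventional way to hide all GPUs, parses to an empty device list:

```python
import re

import numpy as np


def parse_visible_devices(value: str, max_workers_per_gpu: int = 1):
    """Hypothetical helper mirroring the LocalRunner change above."""
    # Extract non-negative device ids; a leading '-' blocks the match,
    # so '-1' (hide all GPUs) yields no ids.
    all_gpu_ids = [int(i) for i in re.findall(r'(?<!-)\d+', value)]
    if all_gpu_ids:
        # Index i holds the number of free worker slots on GPU i;
        # devices not listed keep 0 slots and are never scheduled.
        gpus = np.zeros(max(all_gpu_ids) + 1, dtype=np.uint)
        gpus[all_gpu_ids] = max_workers_per_gpu
        return gpus
    return np.array([], dtype=np.uint)


print(parse_visible_devices('0,1,3', max_workers_per_gpu=2))  # [2 2 0 2]
print(parse_visible_devices('-1'))                            # []
```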
```diff
@@ -69,9 +87,9 @@ class LocalRunner(BaseRunner):
             while True:
                 lock.acquire()
-                if sum(gpus) >= num_gpus:
+                if sum(gpus > 0) >= num_gpus:
                     gpu_ids = np.where(gpus)[0][:num_gpus]
-                    gpus[gpu_ids] = False
+                    gpus[gpu_ids] -= 1
                     lock.release()
                     break
                 lock.release()
@@ -87,7 +105,7 @@ class LocalRunner(BaseRunner):
                 pbar.update()
                 with lock:
-                    gpus[gpu_ids] = True
+                    gpus[gpu_ids] += 1
 
         return res
```
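Taken together, these hunks replace a boolean free/busy flag per GPU with a slot counter. A condensed sketch of the acquire/release protocol, assuming the same numpy array and `threading.Lock` as in the runner:

```python
import threading

import numpy as np

gpus = np.array([2, 2], dtype=np.uint)  # two GPUs, two worker slots each
lock = threading.Lock()


def acquire_gpus(num_gpus: int) -> np.ndarray:
    # Spin until enough GPUs have at least one free slot, then take
    # one slot on each of the first num_gpus devices that do.
    while True:
        with lock:
            if sum(gpus > 0) >= num_gpus:
                gpu_ids = np.where(gpus)[0][:num_gpus]
                gpus[gpu_ids] -= 1
                return gpu_ids


def release_gpus(gpu_ids: np.ndarray) -> None:
    # Return the slots once the task's subprocess has exited.
    with lock:
        gpus[gpu_ids] += 1
```

With `max_workers_per_gpu=1` this reduces exactly to the old boolean behaviour; values above 1 let several task subprocesses share one device.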
The CLI entry point gains a matching flag (`parse_args`):

```diff
@@ -98,6 +98,11 @@ def parse_args():
         'in the config.',
         type=int,
         default=32)
+    parser.add_argument('--max-workers-per-gpu',
+                        help='Max tasks to run in parallel on one GPU. '
+                        'It will only be used in the local runner.',
+                        type=int,
+                        default=32)
     parser.add_argument(
         '--retry',
         help='Number of retries if the job failed when using slurm or dlc. '
```
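A quick sanity check of the new flag in isolation (a sketch only; the real parser defines many more options). argparse converts the dashes to underscores, which is why the wiring below reads `args.max_workers_per_gpu`:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--max-workers-per-gpu', type=int, default=32)

args = parser.parse_args(['--max-workers-per-gpu', '2'])
print(args.max_workers_per_gpu)  # 2
```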
```diff
@@ -337,6 +342,7 @@ def exec_infer_runner(tasks, args, cfg):
     else:
         runner = LocalRunner(task=dict(type='OpenICLInferTask'),
                              max_num_workers=args.max_num_workers,
+                             max_workers_per_gpu=args.max_workers_per_gpu,
                              debug=args.debug,
                              lark_bot_url=cfg['lark_bot_url'])
     runner(tasks)
```
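Finally, a hedged usage sketch of the extended runner (the import path and the config values are assumptions based on the class shown above, not taken from the commit):

```python
from opencompass.runners import LocalRunner  # import path assumed

tasks = []  # filled by a partitioner in a real run

runner = LocalRunner(task=dict(type='OpenICLInferTask'),
                     max_num_workers=16,
                     max_workers_per_gpu=2,  # two tasks may share each GPU
                     debug=False)
runner(tasks)
```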