'''Copyright The Microsoft DeepSpeed Team''' import os import time import inspect from abc import ABC, abstractmethod from pathlib import Path import torch import torch.multiprocessing as mp import deepspeed from deepspeed.accelerator import get_accelerator import deepspeed.comm as dist from torch.multiprocessing import Process import pytest from _pytest.outcomes import Skipped from _pytest.fixtures import FixtureLookupError, FixtureFunctionMarker # Worker timeout *after* the first worker has completed. DEEPSPEED_UNIT_WORKER_TIMEOUT = 120 # Worker timeout for tests that hang DEEPSPEED_TEST_TIMEOUT = 600 def get_xdist_worker_id(): xdist_worker = os.environ.get('PYTEST_XDIST_WORKER', None) if xdist_worker is not None: xdist_worker_id = xdist_worker.replace('gw', '') return int(xdist_worker_id) return None def get_master_port(): master_port = os.environ.get('DS_TEST_PORT', '29503') xdist_worker_id = get_xdist_worker_id() if xdist_worker_id is not None: master_port = str(int(master_port) + xdist_worker_id) return master_port def set_accelerator_visible(): cuda_visible = os.environ.get("CUDA_VISIBLE_DEVICES", None) xdist_worker_id = get_xdist_worker_id() if xdist_worker_id is None: xdist_worker_id = 0 if cuda_visible is None: # CUDA_VISIBLE_DEVICES is not set, discover it using accelerator specific command instead import subprocess if get_accelerator().device_name() == 'cuda': is_rocm_pytorch = hasattr(torch.version, 'hip') and torch.version.hip is not None if is_rocm_pytorch: rocm_smi = subprocess.check_output(['rocm-smi', '--showid']) gpu_ids = filter(lambda s: 'GPU' in s, rocm_smi.decode('utf-8').strip().split('\n')) num_gpus = len(list(gpu_ids)) else: nvidia_smi = subprocess.check_output(['nvidia-smi', '--list-gpus']) num_gpus = len(nvidia_smi.decode('utf-8').strip().split('\n')) else: assert get_accelerator().device_name() == 'xpu' import re clinfo = subprocess.check_output(['clinfo']) lines = clinfo.decode('utf-8').strip().split('\n') num_gpus = 0 for line in lines: match = re.search('Device Type.*GPU', line) if match: num_gpus += 1 cuda_visible = ",".join(map(str, range(num_gpus))) # rotate list based on xdist worker id, example below # wid=0 -> ['0', '1', '2', '3'] # wid=1 -> ['1', '2', '3', '0'] # wid=2 -> ['2', '3', '0', '1'] # wid=3 -> ['3', '0', '1', '2'] dev_id_list = cuda_visible.split(",") dev_id_list = dev_id_list[xdist_worker_id:] + dev_id_list[:xdist_worker_id] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(dev_id_list) class DistributedExec(ABC): """ Base class for distributed execution of functions/methods. Contains common methods needed for DistributedTest and DistributedFixture. """ world_size = 2 backend = get_accelerator().communication_backend_name() init_distributed = True set_dist_env = True requires_cuda_env = True @abstractmethod def run(self): ... def __call__(self, request=None): self._fixture_kwargs = self._get_fixture_kwargs(request, self.run) world_size = self.world_size if self.requires_cuda_env and not get_accelerator().is_available(): pytest.skip("only supported in accelerator environments.") if isinstance(world_size, int): world_size = [world_size] for procs in world_size: self._launch_procs(procs) time.sleep(0.5) def _get_fixture_kwargs(self, request, func): if not request: return {} # Grab fixture / parametrize kwargs from pytest request object fixture_kwargs = {} params = inspect.getfullargspec(func).args params.remove("self") for p in params: try: fixture_kwargs[p] = request.getfixturevalue(p) except FixtureLookupError: pass # test methods can have kwargs that are not fixtures return fixture_kwargs def _launch_procs(self, num_procs): if torch.cuda.is_available() and torch.cuda.device_count() < num_procs: pytest.skip( f"Skipping test because not enough GPUs are available: {num_procs} required, {torch.cuda.device_count()} available" ) mp.set_start_method('forkserver', force=True) skip_msg = mp.Queue() # Allows forked processes to share pytest.skip reason processes = [] for local_rank in range(num_procs): p = Process(target=self._dist_init, args=(local_rank, num_procs, skip_msg)) p.start() processes.append(p) # Now loop and wait for a test to complete. The spin-wait here isn't a big # deal because the number of processes will be O(#GPUs) << O(#CPUs). any_done = False start = time.time() while (not any_done) and ((time.time() - start) < DEEPSPEED_TEST_TIMEOUT): for p in processes: if not p.is_alive(): any_done = True break time.sleep(.1) # So we don't hog CPU # If we hit the timeout, then presume a test is hanged if not any_done: for p in processes: p.terminate() pytest.exit("Test hanged, exiting", returncode=0) # Wait for all other processes to complete for p in processes: p.join(DEEPSPEED_UNIT_WORKER_TIMEOUT) failed = [(rank, p) for rank, p in enumerate(processes) if p.exitcode != 0] for rank, p in failed: # If it still hasn't terminated, kill it because it hung. if p.exitcode is None: p.terminate() pytest.fail(f'Worker {rank} hung.', pytrace=False) if p.exitcode < 0: pytest.fail(f'Worker {rank} killed by signal {-p.exitcode}', pytrace=False) if p.exitcode > 0: pytest.fail(f'Worker {rank} exited with code {p.exitcode}', pytrace=False) if not skip_msg.empty(): # This assumed all skip messages are the same, it may be useful to # add a check here to assert all exit messages are equal pytest.skip(skip_msg.get()) def _dist_init(self, local_rank, num_procs, skip_msg): """Initialize deepspeed.comm and execute the user function. """ if self.set_dist_env: os.environ['MASTER_ADDR'] = '127.0.0.1' os.environ['MASTER_PORT'] = get_master_port() os.environ['LOCAL_RANK'] = str(local_rank) # NOTE: unit tests don't support multi-node so local_rank == global rank os.environ['RANK'] = str(local_rank) os.environ['WORLD_SIZE'] = str(num_procs) # turn off NCCL logging if set os.environ.pop('NCCL_DEBUG', None) if get_accelerator().is_available(): set_accelerator_visible() if self.init_distributed: deepspeed.init_distributed(dist_backend=self.backend) dist.barrier() if get_accelerator().is_available(): get_accelerator().set_device(local_rank) try: self.run(**self._fixture_kwargs) except BaseException as e: if isinstance(e, Skipped): skip_msg.put(e.msg) else: raise e if self.init_distributed or dist.is_initialized(): # make sure all ranks finish at the same time dist.barrier() # tear down after test completes dist.destroy_process_group() class DistributedFixture(DistributedExec): """ Implementation that extends @pytest.fixture to allow for distributed execution. This is primarily meant to be used when a test requires executing two pieces of code with different world sizes. There are 2 parameters that can be modified: - world_size: int = 2 -- the number of processes to launch - backend: Literal['nccl','mpi','gloo'] = 'nccl' -- which backend to use Features: - able to call pytest.skip() inside fixture - can be reused by multiple tests - can accept other fixtures as input Limitations: - cannot use @pytest.mark.parametrize - world_size cannot be modified after definition and only one world_size value is accepted - any fixtures used must also be used in the test that uses this fixture (see example below) - return values cannot be returned. Passing values to a DistributedTest object can be achieved using class_tmpdir and writing to file (see example below) Usage: - must implement a run(self, ...) method - fixture can be used by making the class name input to a test function Example: @pytest.fixture(params=[10,20]) def regular_pytest_fixture(request): return request.param class distributed_fixture_example(DistributedFixture): world_size = 4 def run(self, regular_pytest_fixture, class_tmpdir): assert int(os.environ["WORLD_SIZE"]) == self.world_size local_rank = os.environ["LOCAL_RANK"] print(f"Rank {local_rank} with value {regular_pytest_fixture}") with open(os.path.join(class_tmpdir, f"{local_rank}.txt"), "w") as f: f.write(f"{local_rank},{regular_pytest_fixture}") class TestExample(DistributedTest): world_size = 1 def test(self, distributed_fixture_example, regular_pytest_fixture, class_tmpdir): assert int(os.environ["WORLD_SIZE"]) == self.world_size for rank in range(4): with open(os.path.join(class_tmpdir, f"{rank}.txt"), "r") as f: assert f.read() == f"{rank},{regular_pytest_fixture}" """ is_dist_fixture = True # These values are just placeholders so that pytest recognizes this as a fixture _pytestfixturefunction = FixtureFunctionMarker(scope="function", params=None) __name__ = "" def __init__(self): assert isinstance(self.world_size, int), "Only one world size is allowed for distributed fixtures" self.__name__ = type(self).__name__ _pytestfixturefunction = FixtureFunctionMarker(scope="function", params=None, name=self.__name__) class DistributedTest(DistributedExec): """ Implementation for running pytest with distributed execution. There are 2 parameters that can be modified: - world_size: Union[int,List[int]] = 2 -- the number of processes to launch - backend: Literal['nccl','mpi','gloo'] = 'nccl' -- which backend to use Features: - able to call pytest.skip() inside tests - works with pytest fixtures, parametrize, mark, etc. - can contain multiple tests (each of which can be parametrized separately) - class methods can be fixtures (usable by tests in this class only) - world_size can be changed for individual tests using @pytest.mark.world_size(world_size) - class_tmpdir is a fixture that can be used to get a tmpdir shared among all tests (including DistributedFixture) Usage: - class name must start with "Test" - must implement one or more test*(self, ...) methods Example: @pytest.fixture(params=[10,20]) def val1(request): return request.param @pytest.mark.fast @pytest.mark.parametrize("val2", [30,40]) class TestExample(DistributedTest): world_size = 2 @pytest.fixture(params=[50,60]) def val3(self, request): return request.param def test_1(self, val1, val2, str1="hello world"): assert int(os.environ["WORLD_SIZE"]) == self.world_size assert all(val1, val2, str1) @pytest.mark.world_size(1) @pytest.mark.parametrize("val4", [70,80]) def test_2(self, val1, val2, val3, val4): assert int(os.environ["WORLD_SIZE"]) == 1 assert all(val1, val2, val3, val4) """ is_dist_test = True # Temporary directory that is shared among test methods in a class @pytest.fixture(autouse=True, scope="class") def class_tmpdir(self, tmpdir_factory): fn = tmpdir_factory.mktemp(self.__class__.__name__) return fn def run(self, **fixture_kwargs): self._current_test(**fixture_kwargs) def __call__(self, request): self._current_test = self._get_current_test_func(request) self._fixture_kwargs = self._get_fixture_kwargs(request, self._current_test) if self.requires_cuda_env and not get_accelerator().is_available(): pytest.skip("only supported in accelerator environments.") # Catch world_size override pytest mark for mark in getattr(request.function, "pytestmark", []): if mark.name == "world_size": world_size = mark.args[0] break else: world_size = self.world_size if isinstance(world_size, int): world_size = [world_size] for procs in world_size: self._launch_procs(procs) time.sleep(0.5) def _get_current_test_func(self, request): # DistributedTest subclasses may have multiple test methods func_name = request.function.__name__ return getattr(self, func_name) def get_test_path(filename): curr_path = Path(__file__).parent return str(curr_path.joinpath(filename))