import os
import weakref
from pathlib import Path
from shutil import rmtree
from tempfile import TemporaryDirectory
from typing import Optional, Union

from tests.unit_tests.dist_checkpointing.utils import (
    init_basic_mock_args,
    init_checkpointing_mock_args,
    initialize_gpt_model,
    setup_model_and_optimizer,
    setup_moe_model_and_optimizer,
)
from tests.unit_tests.test_utilities import Utils


def empty_dir(path: Path):
    """Remove all contents of ``path``. Only rank 0 performs the deletion."""
    if Utils.rank > 0:
        return
    for p in path.iterdir():
        if p.is_dir():
            rmtree(p)
        else:
            p.unlink()


class TempNamedDir(TemporaryDirectory):
    """TemporaryDirectory with a fully named directory. Empties the dir if not empty."""

    def __init__(self, name: Union[str, Path], sync=True, ignore_cleanup_errors=False) -> None:
        self.name = str(name)
        # Rank 0 creates the directory and clears any leftovers from previous runs.
        if Utils.rank == 0:
            os.makedirs(name, exist_ok=True)
            empty_dir(Path(name))
        if sync:
            # Wait for rank 0 to finish preparing the directory before other ranks use it.
            import torch

            if torch.distributed.is_available() and torch.distributed.is_initialized():
                torch.distributed.barrier()
        else:
            # Without synchronization, each rank ensures the directory exists itself.
            os.makedirs(name, exist_ok=True)

        self._ignore_cleanup_errors = ignore_cleanup_errors
        self._finalizer = weakref.finalize(
            self, self._cleanup, self.name, warn_message="Implicitly cleaning up {!r}".format(self)
        )
        self.sync = sync

    def cleanup(self, override_sync: Optional[bool] = None) -> None:
        """Remove the directory on rank 0, optionally synchronizing all ranks first."""
        sync = self.sync if override_sync is None else override_sync
        if sync:
            import torch

            if torch.distributed.is_available() and torch.distributed.is_initialized():
                torch.distributed.barrier()
        if Utils.rank == 0:
            super().cleanup()

    def __enter__(self):
        path = Path(super().__enter__())
        if self.sync:
            import torch

            if torch.distributed.is_available() and torch.distributed.is_initialized():
                torch.distributed.barrier()
        return path

    def __exit__(self, exc_type, exc_val, exc_tb):
        raised = exc_type is not None
        # On failure the directory is kept on disk to simplify debugging.
        if not raised:
            self.cleanup()
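

# A minimal usage sketch (illustrative, not part of the package API): in the
# dist_checkpointing tests, TempNamedDir gives every rank the same predictable
# checkpoint directory. The fixture name `tmp_path_dist_ckpt` and the test body
# below are assumptions made for this example.
#
#     def test_save_and_load(tmp_path_dist_ckpt):
#         Utils.initialize_model_parallel(1, 1)
#         with TempNamedDir(tmp_path_dist_ckpt / 'test_save_and_load') as ckpt_dir:
#             ...  # save a checkpoint under `ckpt_dir`, then load it back
#         Utils.destroy_model_parallel()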