"""Module taking care of important package constants."""

import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

import numpy as np
from deepmd.cluster import get_resource
from deepmd.env import get_tf_default_nthreads, tf, GLOBAL_CONFIG, global_float_prec
from deepmd.loggers import set_log_handles

if TYPE_CHECKING:
    import horovod.tensorflow as HVD


__all__ = [
    "WELCOME",
    "CITATION",
    "BUILD",
    "RunOptions",
]

log = logging.getLogger(__name__)


# http://patorjk.com/software/taag. Font: Big
WELCOME = (  # noqa
    " _____               _____   __  __  _____           _     _  _   ",
    "|  __ \             |  __ \ |  \/  ||  __ \         | |   (_)| |  ",
    "| |  | |  ___   ___ | |__) || \  / || |  | | ______ | | __ _ | |_ ",
    "| |  | | / _ \ / _ \|  ___/ | |\/| || |  | ||______|| |/ /| || __|",
    "| |__| ||  __/|  __/| |     | |  | || |__| |        |   < | || |_ ",
    "|_____/  \___| \___||_|     |_|  |_||_____/         |_|\_\|_| \__|",
)

CITATION = (
    "Please read and cite:",
    "Wang, Zhang, Han and E, Comput.Phys.Comm. 228, 178-184 (2018)",
)

_sep = "\n                      "
BUILD = (
    f"installed to:         {GLOBAL_CONFIG['install_prefix']}",
    f"source :              {GLOBAL_CONFIG['git_summ']}",
    f"source brach:         {GLOBAL_CONFIG['git_branch']}",
    f"source commit:        {GLOBAL_CONFIG['git_hash']}",
    f"source commit at:     {GLOBAL_CONFIG['git_date']}",
    f"build float prec:     {global_float_prec}",
    f"build variant:        {GLOBAL_CONFIG['dp_variant']}",
    f"build with tf inc:    {GLOBAL_CONFIG['tf_include_dir']}",
    f"build with tf lib:    {GLOBAL_CONFIG['tf_libs'].replace(';', _sep)}"  # noqa
)
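
# A sketch of how these banner tuples are typically consumed elsewhere in the
# package (illustrative only, not part of this module's API): each entry is
# logged line by line at startup, e.g.
#
#     for line in (*WELCOME, *CITATION, *BUILD):
#         log.info(line)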


class RunOptions:
    """Class with inf oon how to run training (cluster, MPI and GPU config).

    Attributes
    ----------
    gpus: Optional[List[int]]
        list of GPUs if any are present else None
    is_chief: bool
        in distribured training it is true for tha main MPI process in serail it is
        always true
    world_size: int
        total worker count
    my_rank: int
        index of the MPI task
    nodename: str
        name of the node
    node_list_ : List[str]
        the list of nodes of the current mpirun
    my_device: str
        deviice type - gpu or cpu
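
    Examples
    --------
    A minimal usage sketch (assuming a serial run; the argument values below
    are illustrative only)::

        run_opt = RunOptions(log_level=20, mpi_log="master")
        run_opt.print_resource_summary()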
    """

    gpus: Optional[List[int]]
    world_size: int
    my_rank: int
    nodename: str
    nodelist: List[str]
    my_device: str

    _HVD: Optional["HVD"]
    _log_handles_already_set: bool = False

    def __init__(
        self,
        init_model: Optional[str] = None,
        init_frz_model: Optional[str] = None,
        restart: Optional[str] = None,
        log_path: Optional[str] = None,
        log_level: int = 0,
        mpi_log: str = "master"
    ):
        self._try_init_distrib()

        if all((init_model, restart)):
            raise RuntimeError(
                "--init-model and --restart should not be set at the same time"
            )

        # model init options
        self.restart = restart
        self.init_model = init_model
        self.init_frz_model = init_frz_model
        self.init_mode = "init_from_scratch"

        if restart is not None:
            self.restart = os.path.abspath(restart)
            self.init_mode = "restart"
        elif init_model is not None:
            self.init_model = os.path.abspath(init_model)
            self.init_mode = "init_from_model"
        elif init_frz_model is not None:
            self.init_frz_model = os.path.abspath(init_frz_model)
            self.init_mode = "init_from_frz_model"

        self._setup_logger(Path(log_path) if log_path else None, log_level, mpi_log)

    @property
    def is_chief(self):
        """Whether my rank is 0."""
        return self.my_rank == 0

    def print_resource_summary(self):
        """Print build and current running cluster configuration summary."""
        log.info("---Summary of the training---------------------------------------")
        if self.is_distrib:
            log.info("distributed")
            log.info(f"world size:           {self.world_size}")
            log.info(f"my rank:              {self.my_rank}")
            log.info(f"node list:            {self.nodelist}")
        log.info(f"running on:           {self.nodename}")
        log.info(f"computing device:     {self.my_device}")
        env_value = os.environ.get("CUDA_VISIBLE_DEVICES", "unset")
        log.info(f"CUDA_VISIBLE_DEVICES: {env_value}")
        log.info(f"Count of visible GPU: {len(self.gpus or [])}")
        intra, inter = get_tf_default_nthreads()
        log.info(f"num_intra_threads:    {intra:d}")
        log.info(f"num_inter_threads:    {inter:d}")
        log.info("-----------------------------------------------------------------")

    def _setup_logger(
        self,
        log_path: Optional[Path],
        log_level: int,
        mpi_log: Optional[str],
    ):
        """Set up package loggers.

        Parameters
        ----------
        log_path : Optional[Path]
            path to log file; if None, logs will be sent only to console. If the
            parent directory does not exist it will be automatically created, by
            default None
        log_level : int
            logging level
        mpi_log : Optional[str], optional
            mpi log type. Has three options. `master` will output logs to file and
            console only from rank==0. `collect` will write messages from all ranks to
            one file opened under rank==0 and to console. `workers` will open one log
            file for each worker designated by its rank, console behaviour is the same
            as for `collect`.
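
        Examples
        --------
        A hedged sketch of a call from ``__init__`` (path and level are
        illustrative only)::

            self._setup_logger(Path("train.log"), logging.INFO, mpi_log="collect")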
        """
        if not self._log_handles_already_set:
            if not self._HVD:
                mpi_log = None
            set_log_handles(log_level, log_path, mpi_log=mpi_log)
            self._log_handles_already_set = True
            log.debug("Log handles were successfully set")
        else:
            log.warning(
                f"Log handles have already been set. It is not advisable to "
                f"reset them{', especially when runnig with MPI!' if self._HVD else ''}"
            )

    def _try_init_distrib(self):
        try:
            import horovod.tensorflow as HVD
            HVD.init()
            self.is_distrib = HVD.size() > 1
        except ImportError:
            log.warning("Switch to serial execution due to lack of horovod module.")
            self.is_distrib = False

        # Do real initialization
        if self.is_distrib:
            self._init_distributed(HVD)
            self._HVD = HVD
        else:
            self._init_serial()
            self._HVD = None

    def _init_distributed(self, HVD: "HVD"):
        """Initialize  settings for distributed training.

        Parameters
        ----------
        HVD : HVD
            horovod object
        """
        nodename, nodelist, gpus = get_resource()
        self.nodename = nodename
        self.nodelist = nodelist
        self.gpus = gpus
        self.my_rank = HVD.rank()
        self.world_size = HVD.size()

        if gpus is not None:
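            # Horovod runs one process per local rank on a node; bind each local
            # rank to its own GPU so processes do not share a device.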
            gpu_idx = HVD.local_rank()
            if gpu_idx >= len(gpus):
                raise RuntimeError(
                    "Count of local processes is larger than that of available GPUs!"
                )
            self.my_device = f"gpu:{gpu_idx:d}"
        else:
            self.my_device = "cpu:0"

    def _init_serial(self):
        """Initialize setting for serial training."""
        nodename, _, gpus = get_resource()

        self.gpus = gpus
        self.world_size = 1
        self.my_rank = 0
        self.nodename = nodename
        self.nodelist = [nodename]

        if gpus is not None:
            self.my_device = "gpu:0"
        else:
            self.my_device = "cpu:0"

        self._HVD = None