mlperf_logger.py 2.8 KB
Newer Older
xinghao's avatar
xinghao committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# @lint-ignore-every LICENSELINT
# Copyright (c) 2020, NVIDIA CORPORATION.  All rights reserved.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.


"""
Utilities for MLPerf logging
"""

import os

import torch

try:
    from mlperf_logging import mllog
    from mlperf_logging.mllog import constants

    _MLLOGGER = mllog.get_mllogger()
except ImportError as error:
    print("Unable to import mlperf_logging, ", error)


def log_start(*args, **kwargs):
    "log with start tag"
    _log_print(_MLLOGGER.start, *args, **kwargs)


def log_end(*args, **kwargs):
    "log with end tag"
    _log_print(_MLLOGGER.end, *args, **kwargs)


def log_event(*args, **kwargs):
    "log with event tag"
    _log_print(_MLLOGGER.event, *args, **kwargs)


def _log_print(logger, *args, **kwargs):
    "makes mlperf logger aware of distributed execution"
    if "stack_offset" not in kwargs:
        kwargs["stack_offset"] = 3
    if "value" not in kwargs:
        kwargs["value"] = None

    if kwargs.pop("log_all_ranks", False):
        log = True
    else:
        log = get_rank() == 0

    if log:
        logger(*args, **kwargs)


def config_logger(benchmark):
    "initiates mlperf logger"
    mllog.config(
        filename=os.path.join(
            os.path.dirname(os.path.abspath(__file__)), f"{benchmark}.log"
        )
    )
    _MLLOGGER.logger.propagate = False


def barrier():
    """
    Works as a temporary distributed barrier, currently pytorch
    doesn't implement barrier for NCCL backend.
    Calls all_reduce on dummy tensor and synchronizes with GPU.
    """
    if torch.distributed.is_available() and torch.distributed.is_initialized():
        torch.distributed.all_reduce(torch.cuda.FloatTensor(1))
        torch.cuda.synchronize()


def get_rank():
    """
    Gets distributed rank or returns zero if distributed is not initialized.
    """
    if torch.distributed.is_available() and torch.distributed.is_initialized():
        rank = torch.distributed.get_rank()
    else:
        rank = 0
    return rank


def mlperf_submission_log(benchmark):
    """
    Logs information needed for MLPerf submission
    """

    config_logger(benchmark)

    log_event(
        key=constants.SUBMISSION_BENCHMARK,
        value=benchmark,
    )

    log_event(key=constants.SUBMISSION_ORG, value="reference_implementation")

    log_event(key=constants.SUBMISSION_DIVISION, value="closed")

    log_event(key=constants.SUBMISSION_STATUS, value="onprem")

    log_event(key=constants.SUBMISSION_PLATFORM, value="reference_implementation")

    log_event(key=constants.SUBMISSION_ENTRY, value="reference_implementation")

    log_event(key=constants.SUBMISSION_POC_NAME, value="reference_implementation")

    log_event(key=constants.SUBMISSION_POC_EMAIL, value="reference_implementation")