"vscode:/vscode.git/clone" did not exist on "038bc5d521bc73a997f9f64fcc79cc4279d2f985"
benchmark_data.py 5.83 KB
Newer Older
Yanghan Wang's avatar
Yanghan Wang committed
1
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
Tool for benchmarking data loading
"""

import logging
import time
from dataclasses import dataclass
from typing import Type, Union

import detectron2.utils.comm as comm
import numpy as np
from d2go.config import CfgNode
from d2go.distributed import get_num_processes_per_machine, launch
from d2go.evaluation.api import AccuracyDict, MetricsDict
from d2go.runner import BaseRunner
from d2go.setup import (
    basic_argument_parser,
    post_mortem_if_fail_for_main,
    prepare_for_launch,
    setup_after_launch,
)
from d2go.utils.misc import print_metrics_table
from detectron2.fb.env import get_launch_environment
from detectron2.utils.logger import log_every_n_seconds
from fvcore.common.history_buffer import HistoryBuffer

logger = logging.getLogger("d2go.tools.benchmark_data")


@dataclass
class BenchmarkDataOutput:
    accuracy: AccuracyDict[float]
    metrics: MetricsDict[float]
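    # Note: main() currently fills both fields with the same metrics dict; this
    # tool does not compute a separate accuracy measurement.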


def main(
    cfg: CfgNode,
    output_dir: str,
    runner_class: Union[str, Type[BaseRunner]],
    is_train: bool = True,
) -> BenchmarkDataOutput:
    runner = setup_after_launch(cfg, output_dir, runner_class)

    if is_train:
        data_loader = runner.build_detection_train_loader(cfg)
    else:
        assert len(cfg.DATASETS.TEST) > 0, cfg.DATASETS.TEST
        data_loader = runner.build_detection_test_loader(cfg, cfg.DATASETS.TEST[0])

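    # Benchmark knobs: total measured data-loading time to accumulate (wall-clock
    # seconds), the sliding window and interval used for logging, and the number of
    # leading iterations to discard from the final statistics as warm-up.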
    TOTAL_BENCHMARK_TIME = (
        100 if get_launch_environment() == "local" else 600
    )  # seconds of measured loading time: ~100s locally, 10 min otherwise
    LOGGING_METER_WINDOW_SIZE = 20
    LOGGING_METER_TIME_INTERVAL = 5
    WARMUP_ITERS = 5

    # initialize
    time_per_iter = HistoryBuffer(max_length=10000)
    total_time = 0

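    # The timer measures only the wait on the data loader: it is (re)started right
    # before requesting the next batch and read as soon as the batch arrives, so
    # logging and the cross-process sync below are excluded from the measurement.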
    start = time.time()
    for no, batch in enumerate(data_loader):
        data_time = time.time() - start
        time_per_iter.update(data_time)
        total_time += data_time

        if no == 0:
            logger.info("Show the first batch as example:\n{}".format(batch))

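        # cfg.SOLVER.IMS_PER_BATCH is the global (all-process) batch size, so the
        # per-process batch size is obtained by dividing by the world size.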
        # Assume batch size is constant
        batch_size = cfg.SOLVER.IMS_PER_BATCH // comm.get_world_size()
        assert len(batch) == batch_size, f"{len(batch)} vs {batch_size}"

        median = time_per_iter.median(window_size=LOGGING_METER_WINDOW_SIZE)
        avg = time_per_iter.avg(window_size=LOGGING_METER_WINDOW_SIZE)
        log_every_n_seconds(
            logging.INFO,
            "iter: {};"
            " recent per-iter seconds: {:.4f} (avg) {:.4f} (median);"
            " recent per-image seconds: {:.4f} (avg) {:.4f} (median).".format(
                no,
                avg,
                median,
                avg / batch_size,
                median / batch_size,
            ),
            n=LOGGING_METER_TIME_INTERVAL,
        )

        # Synchronize between processes and exit once every process has been running
        # for long enough. This mimics the sync point of loss.backward(); the logged
        # time does not include the time spent on synchronization.
        finished = comm.all_gather(total_time >= TOTAL_BENCHMARK_TIME)
        if all(finished):
            logger.info("Benchmarking finished after {} seconds".format(total_time))
            break

        start = time.time()

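    # Aggregate: flatten the HistoryBuffer into a plain list of per-iteration seconds
    # and drop the leading warm-up iterations (fewer are dropped if not enough
    # samples were recorded).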
    dataset_name = ":".join(cfg.DATASETS.TRAIN) if is_train else cfg.DATASETS.TEST[0]
    time_per_iter = [x[0] for x in time_per_iter.values()]
    time_per_iter = time_per_iter[
        min(WARMUP_ITERS, max(len(time_per_iter) - WARMUP_ITERS, 0)) :
    ]
    results = {
        "environment": {
            "num_workers": cfg.DATALOADER.NUM_WORKERS,
            "world_size": comm.get_world_size(),
            "processes_per_machine": get_num_processes_per_machine(),
        },
        "main_processes_stats": {
            "batch_size_per_process": batch_size,
            "per_iter_avg": np.average(time_per_iter),
            "per_iter_p1": np.percentile(time_per_iter, 1, interpolation="nearest"),
            "per_iter_p10": np.percentile(time_per_iter, 10, interpolation="nearest"),
            "per_iter_p50": np.percentile(time_per_iter, 50, interpolation="nearest"),
            "per_iter_p90": np.percentile(time_per_iter, 90, interpolation="nearest"),
            "per_iter_p99": np.percentile(time_per_iter, 99, interpolation="nearest"),
            "per_image_avg": np.average(time_per_iter) / batch_size,
            "per_image_p1": np.percentile(time_per_iter, 1, interpolation="nearest")
            / batch_size,
            "per_image_p10": np.percentile(time_per_iter, 10, interpolation="nearest")
            / batch_size,
            "per_image_p50": np.percentile(time_per_iter, 50, interpolation="nearest")
            / batch_size,
            "per_image_p90": np.percentile(time_per_iter, 90, interpolation="nearest")
            / batch_size,
            "per_image_p99": np.percentile(time_per_iter, 99, interpolation="nearest")
            / batch_size,
        },
        "data_processes_stats": {},  # TODO: add worker stats
    }
    # Metrics follow the hierarchy: name -> dataset -> task -> metric -> number
    metrics = {"_name_": {dataset_name: results}}
    print_metrics_table(metrics)

    return BenchmarkDataOutput(
        accuracy=metrics,
        metrics=metrics,
    )
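
# Sketch of in-process usage (not part of the original tool): assuming `cfg` is a
# loaded d2go CfgNode and GeneralizedRCNNRunner is the runner your project uses,
# something like the following would benchmark the train loader in one process:
#
#     from d2go.runner import GeneralizedRCNNRunner
#     out = main(cfg, "/tmp/benchmark_data", GeneralizedRCNNRunner, is_train=True)
#     print(out.metrics)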


def run_with_cmdline_args(args):
    cfg, output_dir, runner_name = prepare_for_launch(args)
    launch(
        post_mortem_if_fail_for_main(main),
        num_processes_per_machine=args.num_processes,
        num_machines=args.num_machines,
        machine_rank=args.machine_rank,
        dist_url=args.dist_url,
        backend=args.dist_backend,
        args=(cfg, output_dir, runner_name),
        kwargs={
            "is_train": args.is_train,
        },
    )


if __name__ == "__main__":
    parser = basic_argument_parser(requires_output_dir=True)
    # NOTE: argparse's type=bool treats any non-empty string (including "False") as True.
    parser.add_argument(
        "--is-train",
        type=bool,
        default=True,
        help="benchmark the training data loader (otherwise the first test dataset)",
    )
    run_with_cmdline_args(parser.parse_args())
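
# Command-line usage: the shared flags (config file, output directory, number of
# processes and machines, distributed rank/URL/backend) are provided by
# d2go.setup.basic_argument_parser; this script only adds --is-train on top of them.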