benchmark_utils.py 4.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import inspect
from logging import getLogger
from time import time
from typing import Callable

import torch
import yaml
from torch.optim.lr_scheduler import _LRScheduler as LRScheduler
from torch.utils.data import DataLoader
from tqdm import tqdm

from colossalai.booster import Booster
from colossalai.cluster import DistCoordinator

# Logger shared by the helpers in this module.
logger = getLogger("colossalai-booster-benchmark")
# NaN sentinel: measure_params() returns it when the parameter count cannot be
# computed, and _is_valid() detects it because NaN never compares equal to itself.
_INVALID = float("nan")


def format_num(num: int, bytes=False):
    """Scale a number to a human-readable string with a unit prefix.

    Examples: ``format_num(1253656, bytes=True)`` -> ``'1.20 MB'``,
    ``format_num(1500)`` -> ``'1.50 K'``.

    Args:
        num: The value to format.
        bytes: When true, scale by 1024 and append ``"B"``; otherwise scale
            by 1000 with no suffix.

    Returns:
        The value with two decimals followed by the unit prefix (and ``"B"``
        when ``bytes`` is true). Values beyond the largest unit stay
        expressed in that unit instead of yielding ``None``.
    """
    factor = 1024 if bytes else 1000
    suffix = "B" if bytes else ""
    units = ("", " K", " M", " G", " T", " P")
    for unit in units[:-1]:
        # Compare on magnitude so negative values are scaled as well.
        if abs(num) < factor:
            return f"{num:.2f}{unit}{suffix}"
        num /= factor
    # Anything that survived every division is reported in the largest unit.
    return f"{num:.2f}{units[-1]}{suffix}"


def _is_valid(val):
    """Return the result of comparing ``val`` with itself.

    A float NaN is unequal to itself, so the check is false for NaN and true
    for ordinary numbers.
    """
    equals_itself = val == val
    return equals_itself


def get_call_arg_names(module_or_fn):
    """Return the positional argument names accepted when calling the object.

    For a ``torch.nn.Module`` the names come from its bound ``forward`` method
    with the first entry (the bound instance) removed; for anything else they
    come from the callable itself.
    """
    if not isinstance(module_or_fn, torch.nn.Module):
        return inspect.getfullargspec(module_or_fn).args

    forward_args = inspect.getfullargspec(module_or_fn.forward).args
    # Drop the leading bound-instance argument.
    return forward_args[1:]


def measure_params(model):
    """Count the elements of every parameter of ``model`` that requires grad.

    Returns:
        The summed ``numel()`` of parameters with ``requires_grad`` set, or
        the ``_INVALID`` sentinel when an ``AttributeError`` is raised while
        counting (the error is logged).
    """
    try:
        return sum(param.numel() for param in model.parameters() if param.requires_grad)
    except AttributeError as e:
        logger.error(f"Unable to measure model params due to error: {e}")
    return _INVALID


def warm_up(
    model,
    booster,
    dataloader,
    criterion,
    optimizer,
    lr_scheduler,
    num_runs=10,
):
    """Run up to ``num_runs`` training steps before measurements start.

    Each step moves the first two items of the batch to CUDA, runs the model
    with ``labels=...``, computes the loss via ``criterion(outputs)``, then
    performs backward through ``booster`` followed by the optimizer step, the
    scheduler step and gradient zeroing.

    Args:
        model: Callable invoked as ``model(inputs, labels=labels)``.
        booster: Object whose ``backward(loss, optimizer)`` runs the backward pass.
        dataloader: Iterable of batches; ``data[0]`` and ``data[1]`` must
            expose ``.cuda()``.
        criterion: Callable mapping the model outputs to a loss.
        optimizer: Optimizer providing ``step()`` and ``zero_grad()``.
        lr_scheduler: Scheduler providing ``step()``.
        num_runs: Maximum number of steps to execute; fewer run when the
            dataloader is shorter.
    """
    for i, data in enumerate(dataloader):
        # `>=` (not `>`) so that exactly `num_runs` steps execute.
        if i >= num_runs:
            break
        inputs, labels = data[0].cuda(), data[1].cuda()
        outputs = model(inputs, labels=labels)
        loss = criterion(outputs)
        booster.backward(loss, optimizer)
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()


def fmt(d: dict):
    """Serialize ``d`` to a YAML string via ``yaml.dump``."""
    rendered = yaml.dump(d)
    return rendered


def benchmark(
    model: torch.nn.Module,
    booster: Booster,
    optimizer: torch.optim.Optimizer,
    lr_scheduler: LRScheduler,
    dataloader: DataLoader,
    criterion: Callable = None,
    warm_up_fn=warm_up,
    epoch_num: int = 3,
    batch_size: int = 32,
    warm_up_steps: int = 3,
):
    """Train for ``epoch_num`` epochs and report parameters, memory and throughput.

    The function first calls ``warm_up_fn``, then counts trainable parameters,
    then runs the training loop while tracking CUDA memory on the current
    device and wall-clock time.

    Args:
        model: Model invoked as ``model(inputs, labels=labels)``.
        booster: Booster whose ``backward(loss, optimizer)`` runs the backward pass.
        optimizer: Optimizer stepped and zeroed after every batch.
        lr_scheduler: Scheduler stepped after every batch.
        dataloader: Source of batches; ``data[0]``/``data[1]`` are moved to CUDA.
        criterion: Callable mapping model outputs to a loss. It is called
            unconditionally, so the ``None`` default must be overridden.
        warm_up_fn: Warm-up routine, called with
            ``(model, booster, dataloader, criterion, optimizer, lr_scheduler,
            num_runs=warm_up_steps)``.
        epoch_num: Number of passes over ``dataloader``.
        batch_size: Only used to label the result entries
            (``"batch_size_<n>"``) and log lines.
        warm_up_steps: Forwarded to ``warm_up_fn`` as ``num_runs``.

    Returns:
        A dict with ``"throughput"`` and ``"memory"`` sub-dicts keyed by
        ``"batch_size_<n>"``, plus ``"params"`` when the parameter count
        could be measured.
    """
    results = {}
    model_device = torch.cuda.current_device()

    # Warm up
    warm_up_fn(
        model,
        booster,
        dataloader,
        criterion,
        optimizer,
        lr_scheduler,
        num_runs=warm_up_steps,
    )
    # Measure params
    params = measure_params(model)
    if _is_valid(params):
        results["params"] = format_num(params)
        logger.info(f"Model parameters: {params} ({format_num(params)})")

    # Measure Allocated Memory and Throughput
    memory = {}
    throughput = {}
    result_key = f"batch_size_{batch_size}"

    # Built once instead of once per epoch; also keeps it out of the timed region.
    coordinator = DistCoordinator()
    show_progress = coordinator.is_master()

    torch.cuda.reset_peak_memory_stats(device=model_device)
    pre_mem = torch.cuda.memory_allocated(device=model_device)

    # CUDA work is queued asynchronously: drain pending (warm-up) kernels so
    # they are not charged to the measured interval.
    torch.cuda.synchronize(device=model_device)
    start_time = time()

    for epoch in range(epoch_num):
        with tqdm(dataloader, desc=f'Epoch [{epoch + 1}/{epoch_num}]',
                  disable=not show_progress) as pbar:
            for data in pbar:
                inputs, labels = data[0].cuda(), data[1].cuda()
                outputs = model(inputs, labels=labels)
                loss = criterion(outputs)
                booster.backward(loss, optimizer)
                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()

    # Wait for all queued GPU work to finish before stopping the clock.
    torch.cuda.synchronize(device=model_device)
    end_time = time()

    # NOTE(review): len(dataloader) counts batches, so this is batches (not
    # individual samples) processed; batch_size is not factored in -- confirm
    # the intended throughput unit.
    all_sample = epoch_num * len(dataloader)

    post_mem = torch.cuda.memory_allocated(device=model_device)
    max_mem = torch.cuda.max_memory_allocated(device=model_device)

    memory[result_key] = {
        "cuda_pre_training_bytes": format_num(pre_mem, bytes=True),
        "cuda_max_training_bytes": format_num(max_mem, bytes=True),
        "cuda_post_training_bytes": format_num(post_mem, bytes=True),
    }
    logger.info(fmt({f"Memory results (batch_size={batch_size})": memory[result_key]}))

    elapsed = end_time - start_time
    # The "throughput:" key (with trailing colon) is kept as-is for existing consumers.
    throughput[result_key] = {"throughput:": "{:.1f}".format(all_sample * coordinator.world_size / elapsed)}
    logger.info(fmt({f"Throughput results (batch_size={batch_size})": throughput[result_key]}))

    results["throughput"] = throughput
    results["memory"] = memory

    return results