# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
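"""
Entry point for running the transformers benchmarks.

Discovers every ``*.py`` module in the sibling ``benches/`` directory that exposes a
``run_benchmark`` function, runs each one, and records the results through a shared
``MetricsRecorder`` that can write to a PostgreSQL database, to CSV files, or to both.

Example invocation (CSV-only, no database required):

    python benchmarks_entrypoint.py <repository> <branch> <commit_id> <commit_msg> --csv
"""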
import argparse
import importlib.util
import inspect
import json
import logging
import os
import sys
import uuid
from datetime import datetime

import pandas as pd


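# If psycopg2 is available, teach it to serialize Python dicts as JSON query
# parameters (used for the ``metadata`` and ``measurements`` columns below).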
try:
    from psycopg2.extensions import register_adapter
    from psycopg2.extras import Json

    register_adapter(dict, Json)
    PSYCOPG2_AVAILABLE = True
except ImportError:
    PSYCOPG2_AVAILABLE = False


class ImportModuleException(Exception):
    """Raised when a benchmark module cannot be imported."""


class MetricsRecorder:
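    """
    Records benchmark results to a PostgreSQL database, to in-memory pandas
    DataFrames for CSV export, or to both.

    Typical lifecycle (a sketch; the metadata keys and values are illustrative):

        recorder = MetricsRecorder(conn, logger, repository, branch, commit_id, commit_msg)
        benchmark_id = recorder.initialise_benchmark({"gpu_name": "A10G"})
        recorder.collect_device_measurements(benchmark_id, cpu_util, mem_mb, gpu_util, gpu_mem_mb)
        recorder.collect_model_measurements(benchmark_id, {"model_load_time": 1.23})
        recorder.export_to_csv()  # warns and returns unless collect_csv_data=True
        recorder.close()

    Pass ``connection=None`` to disable database writes.
    """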
    def __init__(
        self,
        connection,
        logger: logging.Logger,
        repository: str,
        branch: str,
        commit_id: str,
        commit_msg: str,
        collect_csv_data: bool = True,
    ):
        self.conn = connection
        self.use_database = connection is not None
        if self.use_database:
            self.conn.autocommit = True
        self.logger = logger
        self.repository = repository
        self.branch = branch
        self.commit_id = commit_id
        self.commit_msg = commit_msg
        self.collect_csv_data = collect_csv_data

        # For CSV export - store all data in pandas DataFrames (only if CSV collection is enabled)
        if self.collect_csv_data:
            # Initialize empty DataFrames with proper schemas
            self.benchmarks_df = pd.DataFrame(
                columns=[
                    "benchmark_id",
                    "repository",
                    "branch",
                    "commit_id",
                    "commit_message",
                    "metadata",
                    "created_at",
                ]
            )
            self.device_measurements_df = pd.DataFrame(
                columns=["benchmark_id", "cpu_util", "mem_megabytes", "gpu_util", "gpu_mem_megabytes", "time"]
            )
            self.model_measurements_df = pd.DataFrame(
                columns=[
                    "benchmark_id",
                    "time",
                    "model_load_time",
                    "first_eager_forward_pass_time_secs",
                    "second_eager_forward_pass_time_secs",
                    "first_eager_generate_time_secs",
                    "second_eager_generate_time_secs",
                    "time_to_first_token_secs",
                    "time_to_second_token_secs",
                    "time_to_third_token_secs",
                    "time_to_next_token_mean_secs",
                    "first_compile_generate_time_secs",
                    "second_compile_generate_time_secs",
                    "third_compile_generate_time_secs",
                    "fourth_compile_generate_time_secs",
                ]
            )
        else:
            self.benchmarks_df = None
            self.device_measurements_df = None
            self.model_measurements_df = None

    def initialise_benchmark(self, metadata: dict[str, str]) -> str:
        """
        Creates a new benchmark, returns the benchmark id (UUID)
        """
        # Generate a unique UUID for this benchmark
        benchmark_id = str(uuid.uuid4())

        if self.use_database:
            with self.conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO benchmarks (benchmark_id, repository, branch, commit_id, commit_message, metadata) VALUES (%s, %s, %s, %s, %s, %s)",
                    (benchmark_id, self.repository, self.branch, self.commit_id, self.commit_msg, metadata),
                )
                self.logger.debug(f"initialised benchmark #{benchmark_id}")

        # Store benchmark data for CSV export (if enabled)
        if self.collect_csv_data:
            # Add row to pandas DataFrame
            new_row = pd.DataFrame(
                [
                    {
                        "benchmark_id": benchmark_id,
                        "repository": self.repository,
                        "branch": self.branch,
                        "commit_id": self.commit_id,
                        "commit_message": self.commit_msg,
                        "metadata": json.dumps(metadata),
                        "created_at": datetime.utcnow().isoformat(),
                    }
                ]
            )
            self.benchmarks_df = pd.concat([self.benchmarks_df, new_row], ignore_index=True)

        mode_info = []
        if self.use_database:
            mode_info.append("database")
        if self.collect_csv_data:
            mode_info.append("CSV")
        mode_str = " + ".join(mode_info) if mode_info else "no storage"

        self.logger.debug(f"initialised benchmark #{benchmark_id} ({mode_str} mode)")
        return benchmark_id

    def collect_device_measurements(self, benchmark_id: str, cpu_util, mem_megabytes, gpu_util, gpu_mem_megabytes):
        """
        Collect device metrics, such as CPU & GPU usage. These are "static", as in you cannot pass arbitrary arguments to the function.
        """
        # Store device measurements for CSV export (if enabled)
        if self.collect_csv_data:
            # Add row to pandas DataFrame
            new_row = pd.DataFrame(
                [
                    {
                        "benchmark_id": benchmark_id,
                        "cpu_util": cpu_util,
                        "mem_megabytes": mem_megabytes,
                        "gpu_util": gpu_util,
                        "gpu_mem_megabytes": gpu_mem_megabytes,
                        "time": datetime.utcnow().isoformat(),
                    }
                ]
            )
            self.device_measurements_df = pd.concat([self.device_measurements_df, new_row], ignore_index=True)

        # Store in database if available
        if self.use_database:
            with self.conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO device_measurements (benchmark_id, cpu_util, mem_megabytes, gpu_util, gpu_mem_megabytes) VALUES (%s, %s, %s, %s, %s)",
                    (benchmark_id, cpu_util, mem_megabytes, gpu_util, gpu_mem_megabytes),
                )

        self.logger.debug(
            f"collected device measurements for benchmark #{benchmark_id} [CPU util: {cpu_util}, mem MBs: {mem_megabytes}, GPU util: {gpu_util}, GPU mem MBs: {gpu_mem_megabytes}]"
        )

    def collect_model_measurements(self, benchmark_id: str, measurements: dict[str, float]):
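        """
        Collect model-level timing measurements as a flat ``name -> value`` mapping,
        stored as a JSON blob in the database and as flattened columns in the CSV data.
        """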
        # Store model measurements for CSV export (if enabled)
        if self.collect_csv_data:
            # Add row to pandas DataFrame with flattened measurements
            row_data = {"benchmark_id": benchmark_id, "time": datetime.utcnow().isoformat()}
            # Flatten the measurements dict into the row
            row_data.update(measurements)

            new_row = pd.DataFrame([row_data])
            self.model_measurements_df = pd.concat([self.model_measurements_df, new_row], ignore_index=True)

        # Store in database if available
        if self.use_database:
            with self.conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO model_measurements (
                        benchmark_id,
                        measurements
                    ) VALUES (%s, %s)
                    """,
                    (
                        benchmark_id,
                        measurements,
                    ),
                )

        self.logger.debug(f"collected model measurements for benchmark #{benchmark_id}: {measurements}")

    def export_to_csv(self, output_dir: str = "benchmark_results"):
        """
        Export all collected data to CSV files using pandas DataFrames
        """
        if not self.collect_csv_data:
            self.logger.warning("CSV data collection is disabled - no CSV files will be generated")
            return

        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
            self.logger.info(f"Created output directory: {output_dir}")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        files_created = []

        # Export using pandas DataFrames
        self._export_pandas_data(output_dir, timestamp, files_created)

        self.logger.info(f"CSV export complete! Created {len(files_created)} files in {output_dir}")

    def _export_pandas_data(self, output_dir: str, timestamp: str, files_created: list):
        """
        Export CSV files using pandas DataFrames
        """
        # Export benchmarks
        benchmarks_file = os.path.join(output_dir, f"benchmarks_{timestamp}.csv")
        self.benchmarks_df.to_csv(benchmarks_file, index=False)
        files_created.append(benchmarks_file)
        self.logger.info(f"Exported {len(self.benchmarks_df)} benchmark records to {benchmarks_file}")

        # Export device measurements
        device_file = os.path.join(output_dir, f"device_measurements_{timestamp}.csv")
        self.device_measurements_df.to_csv(device_file, index=False)
        files_created.append(device_file)
        self.logger.info(f"Exported {len(self.device_measurements_df)} device measurement records to {device_file}")

        # Export model measurements (already flattened)
        model_file = os.path.join(output_dir, f"model_measurements_{timestamp}.csv")
        self.model_measurements_df.to_csv(model_file, index=False)
        files_created.append(model_file)
        self.logger.info(f"Exported {len(self.model_measurements_df)} model measurement records to {model_file}")

        # Create comprehensive summary using pandas operations
        summary_file = os.path.join(output_dir, f"benchmark_summary_{timestamp}.csv")
        self._create_summary(summary_file)
        files_created.append(summary_file)

    def _create_summary(self, summary_file: str):
        """
        Create a comprehensive summary CSV using pandas operations
        """
        if len(self.benchmarks_df) == 0:
            # Create empty summary file
            summary_df = pd.DataFrame()
            summary_df.to_csv(summary_file, index=False)
            self.logger.info(f"Created empty benchmark summary at {summary_file}")
            return

        # Start with benchmarks as the base
        summary_df = self.benchmarks_df.copy()

        # Add model measurements (join on benchmark_id)
        if len(self.model_measurements_df) > 0:
            # Drop 'time' column from model measurements to avoid conflicts
            model_df = self.model_measurements_df.drop(columns=["time"], errors="ignore")
            summary_df = summary_df.merge(model_df, on="benchmark_id", how="left")

        # Calculate device measurement aggregates using pandas groupby
        if len(self.device_measurements_df) > 0:
            device_agg = (
                self.device_measurements_df.groupby("benchmark_id")
                .agg(
                    {
                        "cpu_util": ["mean", "max", "std", "count"],
                        "mem_megabytes": ["mean", "max", "std"],
                        "gpu_util": ["mean", "max", "std"],
                        "gpu_mem_megabytes": ["mean", "max", "std"],
                    }
                )
                .round(3)
            )

            # Flatten column names
            device_agg.columns = [f"{col[0]}_{col[1]}" for col in device_agg.columns]
            device_agg = device_agg.reset_index()

            # Rename count column to be more descriptive
            if "cpu_util_count" in device_agg.columns:
                device_agg = device_agg.rename(columns={"cpu_util_count": "device_measurement_count"})

            # Merge with summary
            summary_df = summary_df.merge(device_agg, on="benchmark_id", how="left")

        # Export the comprehensive summary
        summary_df.to_csv(summary_file, index=False)
        self.logger.info(f"Created comprehensive benchmark summary with {len(summary_df)} records at {summary_file}")

    def close(self):
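        """Close the database connection, if one was opened."""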
        if self.use_database and self.conn:
            self.conn.close()


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
formatter = logging.Formatter("[%(levelname)s - %(asctime)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)


def parse_arguments() -> tuple[str, str, str, str, bool, str]:
    """
    Parse command line arguments for the benchmarking CLI.
    """
    parser = argparse.ArgumentParser(description="CLI for benchmarking the huggingface/transformers repository.")

    parser.add_argument(
        "repository",
        type=str,
        help="The repository name on which the benchmarking is performed.",
    )

    parser.add_argument(
        "branch",
        type=str,
        help="The branch name on which the benchmarking is performed.",
    )

    parser.add_argument(
        "commit_id",
        type=str,
        help="The commit hash on which the benchmarking is performed.",
    )

    parser.add_argument(
        "commit_msg",
        type=str,
        help="The commit message associated with the commit, truncated to 70 characters.",
    )

    parser.add_argument("--csv", action="store_true", default=False, help="Enable CSV output files generation.")

    parser.add_argument(
        "--csv-output-dir",
        type=str,
        default="benchmark_results",
        help="Directory for CSV output files (default: benchmark_results).",
    )

    args = parser.parse_args()

    # CSV output is opt-in via the --csv flag
    return args.repository, args.branch, args.commit_id, args.commit_msg, args.csv, args.csv_output_dir


def import_from_path(module_name, file_path):
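    """
    Load a Python module from a file path and register it in ``sys.modules``.
    Raises ``ImportModuleException`` if the module cannot be loaded.
    """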
    try:
        spec = importlib.util.spec_from_file_location(module_name, file_path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module
    except Exception as e:
        raise ImportModuleException(f"failed to load python module {module_name} from {file_path}: {e}") from e


def create_database_connection():
    """
    Try to create a database connection. Returns None if connection fails.
    """
    if not PSYCOPG2_AVAILABLE:
        logger.warning("psycopg2 not available - running in CSV-only mode")
        return None

    try:
        import psycopg2

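        # Only the database name is given; host, port, user and password fall back
        # to libpq defaults and the standard PG* environment variables.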
        conn = psycopg2.connect("dbname=metrics")
        logger.info("Successfully connected to database")
        return conn
    except Exception as e:
        logger.warning(f"Failed to connect to database: {e}. Running in CSV-only mode")
        return None


def create_global_metrics_recorder(
    repository: str, branch: str, commit_id: str, commit_msg: str, generate_csv: bool = False
) -> MetricsRecorder:
    """
    Create a global metrics recorder that will be used across all benchmarks.
    """
    connection = create_database_connection()
    recorder = MetricsRecorder(connection, logger, repository, branch, commit_id, commit_msg, generate_csv)

    # Log the storage mode
    storage_modes = []
    if connection is not None:
        storage_modes.append("database")
    if generate_csv:
        storage_modes.append("CSV")

    if not storage_modes:
        logger.warning("Running benchmarks with NO data storage (no database connection, CSV disabled)")
        logger.warning("Use --csv flag to enable CSV output when database is unavailable")
    else:
        logger.info(f"Running benchmarks with: {' + '.join(storage_modes)} storage")

    return recorder


if __name__ == "__main__":
    benchmarks_folder_path = os.path.dirname(os.path.realpath(__file__))
    benches_folder_path = os.path.join(benchmarks_folder_path, "benches")

    repository, branch, commit_id, commit_msg, generate_csv, csv_output_dir = parse_arguments()

    # Create a global metrics recorder
    global_metrics_recorder = create_global_metrics_recorder(repository, branch, commit_id, commit_msg, generate_csv)

    successful_benchmarks = 0
    failed_benchmarks = 0

    # Automatically discover all benchmark modules in the benches/ folder.
    # Map of file name -> imported module, so each module is executed only once.
    benchmark_modules = {}

    if os.path.exists(benches_folder_path):
        logger.debug(f"Scanning for benchmarks in: {benches_folder_path}")
        for entry in os.scandir(benches_folder_path):
            if not entry.name.endswith(".py"):
                continue
            if entry.name.startswith("__"):  # Skip __init__.py, __pycache__, etc.
                continue

            # Check if the file has a run_benchmark function
            try:
                logger.debug(f"checking if benches/{entry.name} has run_benchmark function")
                module = import_from_path(entry.name.split(".")[0], entry.path)
                if hasattr(module, "run_benchmark"):
                    benchmark_modules[entry.name] = module
                    logger.debug(f"discovered benchmark: {entry.name}")
                else:
                    logger.debug(f"skipping {entry.name} - no run_benchmark function found")
            except Exception as e:
                logger.debug(f"failed to check benches/{entry.name}: {e}")
    else:
        logger.warning(f"Benches directory not found: {benches_folder_path}")

    if benchmark_modules:
        logger.info(f"Discovered {len(benchmark_modules)} benchmark(s): {list(benchmark_modules)}")
    else:
        logger.warning("No benchmark modules found in benches/ directory")

    for module_name, module in benchmark_modules.items():
        try:
            logger.info(f"running benchmarks in: {module_name}")

            # Prefer the newer run_benchmark signature that accepts the shared metrics
            # recorder. Inspecting the signature (instead of catching TypeError) avoids
            # accidentally re-running a benchmark whose own body raised a TypeError.
            if len(inspect.signature(module.run_benchmark).parameters) >= 6:
                module.run_benchmark(logger, repository, branch, commit_id, commit_msg, global_metrics_recorder)
            else:
                # Fall back to the old signature for backward compatibility
                logger.warning(
                    f"Module {module_name} uses the old run_benchmark signature - a database connection will be created per module"
                )
                module.run_benchmark(logger, repository, branch, commit_id, commit_msg)

            successful_benchmarks += 1
        except ImportModuleException as e:
            logger.error(e)
            failed_benchmarks += 1
        except Exception as e:
            logger.error(f"error running benchmarks for {module_name}: {e}")
            failed_benchmarks += 1

    # Export CSV results at the end (if enabled)
    try:
        if generate_csv:
            global_metrics_recorder.export_to_csv(csv_output_dir)
            logger.info(f"CSV reports have been generated and saved to the {csv_output_dir} directory")
        else:
            logger.info("CSV generation disabled - no CSV files created (use --csv to enable)")

        logger.info(f"Benchmark run completed. Successful: {successful_benchmarks}, Failed: {failed_benchmarks}")
    except Exception as e:
        logger.error(f"Failed to export CSV results: {e}")
    finally:
        global_metrics_recorder.close()