parse_factory.py 11.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Factory module for auto-detecting and parsing test results."""

import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional


def detect_result_type(log_dir: str) -> Optional[str]:
    """Classify a results directory as AI-Perf output, legacy output, or unknown.

    Detection is based on file and directory names only; no file is opened.

    - AI-Perf: a ``client_*`` sub-directory containing an ``attempt_*``
      sub-directory that holds ``profile_export_aiperf.json``,
      ``profile_export_aiperf.csv`` or ``genai_perf.log``. Each client
      directory contributes at most one indicator.
    - Legacy: a regular file named ``client_*.log.txt`` directly inside
      ``log_dir``. Each such file contributes one indicator.

    Args:
        log_dir: Directory containing test results.

    Returns:
        ``"aiperf"`` if AI-Perf indicators were found (also when legacy files
        are present alongside them; a warning is logged in that case),
        ``"legacy"`` if only legacy files were found, or ``None`` if
        ``log_dir`` does not exist, is not a directory, or holds no
        indicators (a warning is logged in each of these cases).
    """
    # Guard clauses: nothing to inspect unless log_dir is an existing directory.
    if not os.path.exists(log_dir):
        logging.warning(f"Directory does not exist: {log_dir}")
        return None
    if not os.path.isdir(log_dir):
        logging.warning(f"Not a directory: {log_dir}")
        return None

    # Any one of these files inside an attempt directory marks AI-Perf output.
    marker_files = (
        "profile_export_aiperf.json",
        "profile_export_aiperf.csv",
        "genai_perf.log",
    )

    aiperf_indicators = 0
    legacy_indicators = 0

    for entry in Path(log_dir).iterdir():
        # Both layouts key off the "client_" prefix; skip everything else.
        if not entry.name.startswith("client_"):
            continue

        if entry.is_dir():
            # any() stops at the first marker found, so a client directory is
            # counted once no matter how many attempts or marker files it has.
            has_marker = any(
                (attempt / marker).exists()
                for attempt in entry.iterdir()
                if attempt.is_dir() and attempt.name.startswith("attempt_")
                for marker in marker_files
            )
            if has_marker:
                aiperf_indicators += 1
        elif entry.is_file() and entry.name.endswith(".log.txt"):
            legacy_indicators += 1

    if aiperf_indicators > 0 and legacy_indicators > 0:
        # Mixed results - AI-Perf wins because it is the newer format.
        logging.warning(
            f"Mixed result types detected in {log_dir}. "
            f"Found {aiperf_indicators} AI-Perf indicators and {legacy_indicators} legacy indicators. "
            f"Using AI-Perf parser."
        )
        return "aiperf"
    if aiperf_indicators > 0:
        return "aiperf"
    if legacy_indicators > 0:
        return "legacy"

    # Neither layout was recognised.
    logging.warning(
        f"Unable to detect result type in {log_dir}. "
        f"No client result files found."
    )
    return None


def parse_test_results(
    log_dir: Optional[str] = None,
    log_paths: Optional[List[str]] = None,
    tablefmt: str = "grid",
    sla: Optional[float] = None,
    force_parser: Optional[str] = None,
) -> Any:
    """Pick the right results parser (AI-Perf or legacy) and run it.

    Unless ``force_parser`` is given, the parser is chosen with
    ``detect_result_type``. A non-empty ``log_paths`` takes precedence over
    ``log_dir``, both for detection and for the arguments handed to the
    parser. With several paths, the type detected for the first path is used
    for all of them, and a warning is logged for each later path whose
    detected type differs.

    Args:
        log_dir: Base directory for logs (single-directory processing).
        log_paths: List of log directories to process (multi-directory
            processing).
        tablefmt: Table format for output (e.g., "fancy_grid", "pipe");
            forwarded unchanged to the selected parser.
        sla: Optional SLA threshold for latency violations; forwarded
            unchanged to the selected parser.
        force_parser: Optional override, "aiperf" or "legacy", that skips
            auto-detection.

    Returns:
        Whatever the selected parser's ``main`` function returns.

    Raises:
        ValueError: If ``force_parser`` is not one of the valid options, if
            neither ``log_dir`` nor ``log_paths`` is supplied while
            auto-detecting, or if the result type cannot be detected.

    Example:
        >>> # Auto-detect and parse single directory
        >>> parse_test_results(log_dir="test_fault_scenario[...]")

        >>> # Auto-detect and parse multiple directories
        >>> parse_test_results(log_paths=["test1", "test2"])

        >>> # Force use of legacy parser
        >>> parse_test_results(log_dir="test_dir", force_parser="legacy")
    """
    # Reject an unknown override before doing any filesystem work.
    if force_parser is not None and force_parser not in ("aiperf", "legacy"):
        raise ValueError(
            f"Invalid force_parser value: '{force_parser}'. "
            f"Valid options are: 'aiperf', 'legacy'"
        )

    if force_parser:
        # Explicit override: no detection needed.
        parser_type = force_parser
        logging.info(f"Using forced parser: {parser_type}")
    elif log_paths:
        # The first path decides; the remaining paths are only checked for
        # consistency and never change the decision.
        parser_type = detect_result_type(log_paths[0])
        for log_path in log_paths[1:]:
            if detect_result_type(log_path) != parser_type:
                logging.warning(
                    f"Inconsistent result types detected. "
                    f"Using {parser_type} for all paths."
                )
    elif log_dir:
        parser_type = detect_result_type(log_dir)
    else:
        raise ValueError("Must provide either log_dir or log_paths")

    if parser_type is None:
        raise ValueError(
            "Unable to auto-detect result type. "
            "Use force_parser='aiperf' or force_parser='legacy' to specify explicitly."
        )

    logging.info(f"Using {parser_type} parser for results")

    # The parser modules are imported lazily, so only the one that is
    # actually selected gets loaded.
    if parser_type == "aiperf":
        from tests.fault_tolerance.deploy.parse_results import main as run_parser
    elif parser_type == "legacy":
        from tests.fault_tolerance.deploy.legacy_parse_results import (
            main as run_parser,
        )
    else:
        raise ValueError(f"Unknown parser type: {parser_type}")

    # Both parsers are called with the same keyword arguments; exactly one of
    # logs_dir / log_paths is populated, the other is passed as None.
    if log_paths:
        return run_parser(
            logs_dir=None,
            log_paths=log_paths,
            tablefmt=tablefmt,
            sla=sla,
        )
    return run_parser(
        logs_dir=log_dir,
        log_paths=None,
        tablefmt=tablefmt,
        sla=sla,
    )


def get_result_info(log_dir: str) -> Dict[str, Any]:
    """Summarise what kind of test results a directory contains.

    Args:
        log_dir: Directory containing test results.

    Returns:
        Dictionary with result information:
        {
            "type": "aiperf" or "legacy" or None,
            "client_count": number of clients detected,
            "has_test_log": whether test.log.txt exists,
            "details": additional format-specific details
        }
        For AI-Perf results, "details" holds "attempt_counts" (one entry per
        ``client_*`` directory) and "total_attempts" (their sum). When
        ``log_dir`` is missing or not a directory, the defaults
        (None / 0 / False / {}) are returned unchanged.
    """
    info: Dict[str, Any] = {
        "type": None,
        "client_count": 0,
        "has_test_log": False,
        "details": {},
    }

    # Nothing to inspect: hand back the defaults.
    if not (os.path.exists(log_dir) and os.path.isdir(log_dir)):
        return info

    root = Path(log_dir)
    info["has_test_log"] = (root / "test.log.txt").exists()

    result_type = detect_result_type(log_dir)
    info["type"] = result_type

    if result_type == "aiperf":
        # One entry per client directory: how many attempt_* sub-directories
        # it contains (zero is a valid entry).
        attempt_counts = [
            sum(
                1
                for attempt in client_dir.iterdir()
                if attempt.is_dir() and attempt.name.startswith("attempt_")
            )
            for client_dir in root.iterdir()
            if client_dir.is_dir() and client_dir.name.startswith("client_")
        ]
        info["client_count"] = len(attempt_counts)
        info["details"]["attempt_counts"] = attempt_counts
        info["details"]["total_attempts"] = sum(attempt_counts)
    elif result_type == "legacy":
        # Each client_*.log.txt file is one legacy client.
        info["client_count"] = sum(
            1
            for log_file in root.iterdir()
            if log_file.is_file()
            and log_file.name.startswith("client_")
            and log_file.name.endswith(".log.txt")
        )

    return info


def print_result_info(log_dir: str) -> None:
    """Print a human-readable summary of the test results in a directory.

    The summary is built from ``get_result_info(log_dir)`` and written to
    standard output: a header naming the directory, the result type
    ("Unknown" when none was detected), the client count, whether a test log
    exists, and - only when present - the format-specific details, one
    ``key: value`` pair per line.

    Args:
        log_dir: Directory containing test results.
    """
    info = get_result_info(log_dir)
    rule = "=" * 60

    # Collect the report first, then emit it line by line.
    report = [
        f"\nTest Results Information: {log_dir}",
        rule,
        f"Result Type: {info['type'] or 'Unknown'}",
        f"Client Count: {info['client_count']}",
        f"Has Test Log: {info['has_test_log']}",
    ]

    details = info["details"]
    if details:
        report.append("\nDetails:")
        report.extend(f"  {key}: {value}" for key, value in details.items())

    report.append(rule)

    for line in report:
        print(line)


if __name__ == "__main__":
    # Command-line entry point.
    #
    # Two modes:
    #   * default: auto-detect (or force) a parser and parse the results;
    #   * --info:  only print a summary of what each directory contains.
    #
    # The process exits with status 1 when parsing fails or when --info is
    # used without any directory.
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description="Auto-detect and parse fault tolerance test results"
    )
    parser.add_argument(
        "log_dir", nargs="?", default=None, help="Directory containing test results"
    )
    parser.add_argument(
        "--log-paths", nargs="+", help="Multiple log directories to process"
    )
    parser.add_argument(
        "--format", choices=["fancy", "markdown"], default="fancy", help="Table format"
    )
    parser.add_argument(
        "--sla", type=float, default=None, help="SLA threshold for latency"
    )
    parser.add_argument(
        "--force-parser",
        choices=["aiperf", "legacy"],
        default=None,
        help="Force use of specific parser (skip auto-detection)",
    )
    parser.add_argument(
        "--info",
        action="store_true",
        help="Print information about results without parsing",
    )

    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    # Map format choices to tabulate formats
    tablefmt = "fancy_grid" if args.format == "fancy" else "pipe"

    if args.info:
        # Info mode: the positional log_dir wins over --log-paths when both
        # are supplied.
        if args.log_dir:
            print_result_info(args.log_dir)
        elif args.log_paths:
            for log_path in args.log_paths:
                print_result_info(log_path)
        else:
            # Report the usage error on stderr and signal failure to the
            # caller instead of exiting with status 0.
            print("Error: Must provide log_dir or --log-paths", file=sys.stderr)
            sys.exit(1)
    else:
        # Parse mode
        try:
            parse_test_results(
                log_dir=args.log_dir,
                log_paths=args.log_paths,
                tablefmt=tablefmt,
                sla=args.sla,
                force_parser=args.force_parser,
            )
        except Exception as e:
            # Top-level boundary: log the failure and exit non-zero.
            # sys.exit is used rather than the site-provided exit() helper,
            # which is intended for the interactive shell and is not
            # guaranteed to be defined (e.g. under `python -S`).
            logging.error(f"Failed to parse results: {e}")
            sys.exit(1)