"""auto_test_for_api_pipeline.py

Execute API Pipelines in batches: automatically switch the working directory,
run every Pipeline class found in the target directory, and record a complete log.
"""

import os
import sys
import json
import argparse
import importlib
import inspect
import logging
import traceback
from datetime import datetime
from pathlib import Path
from io import StringIO
from typing import List, Tuple, Dict, Optional


CURRENT_SCRIPT_PATH = Path(__file__).resolve()  
TEST_DIR = CURRENT_SCRIPT_PATH.parent  
PROJECT_ROOT = TEST_DIR.parent  


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s:%(filename)s:%(lineno)d - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger("PipelineRunner") 



def parse_arguments() -> argparse.Namespace:
    """Parse command-line arguments with support for custom Pipeline directories."""
    parser = argparse.ArgumentParser(
        description="Execute API Pipeline in batches, automatically switch the working directory, and record a complete log.",
        formatter_class=argparse.RawTextHelpFormatter  # preserve newlines in the help text
    )
    # Default Pipeline directory: <project root>/run_dataflow/api_pipelines
    default_pipeline_dir = PROJECT_ROOT / "run_dataflow" / "api_pipelines"
    parser.add_argument(
        "--pipeline-dir",
        type=str,
        default=str(default_pipeline_dir.resolve()),
        help=f"API Pipeline absolute path\nexample:/home/user/project/run_dataflow/api_pipelines\ndefault:{default_pipeline_dir.resolve()}"
    )
    return parser.parse_args()
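
# Illustrative invocation (the path below mirrors the example in the --pipeline-dir
# help text; adjust it to your own checkout):
#
#   python auto_test_for_api_pipeline.py --pipeline-dir /home/user/project/run_dataflow/api_pipelines
#
# Omitting --pipeline-dir falls back to <project root>/run_dataflow/api_pipelines.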


def switch_working_directory(target_dir: Path) -> Optional[str]:
    """Switch to the target working directory and return the original directory for subsequent recovery.
    If the switch fails, raise an exception and terminate the script.
    """
    if not target_dir.exists():
        raise FileNotFoundError(f"Pipeline directory does not exist: {target_dir}")
    if not target_dir.is_dir():
        raise NotADirectoryError(f"The specified path is not a directory: {target_dir}")
    
    original_dir = os.getcwd()
    try:
        os.chdir(str(target_dir))
        logger.info(f"Working directory switched successfully: [{original_dir}] → [{os.getcwd()}]")
        return original_dir
    except Exception as e:
        raise RuntimeError(f"Failed to switch working directory: {str(e)}") from e


def collect_pipeline_classes(pipeline_dir: Path) -> Tuple[List[Tuple[str, type, str]], List[Dict]]:
    """Collect all custom Pipeline classes in the specified directory.
    Returns: (successfully collected class list, import error detail list)
    Class list format: (class name, class object, file name)
    Error list format: {file name, error type, error message, stack trace}
    """
    pipeline_classes: List[Tuple[str, type, str]] = []
    import_errors: List[Dict] = []
    current_script_name = CURRENT_SCRIPT_PATH.name  # skip self script
    

    for py_file in pipeline_dir.glob("*.py"):
        if py_file.name == current_script_name:
            continue
        
        try:
            # construct module path (based on project root, ensure correct import)
            relative_path = py_file.relative_to(PROJECT_ROOT)
            module_name = ".".join(relative_path.with_suffix("").parts)
            
            # import module and filter "classes defined in the current file" (exclude external imported classes)
            module = importlib.import_module(module_name)
            for class_name, cls in inspect.getmembers(module, inspect.isclass):
                if cls.__module__ == module_name:  # ensure the class is defined in the current file
                    pipeline_classes.append((class_name, cls, py_file.name))
                    logger.info(f"发现Pipeline类:{class_name}(文件:{py_file.name})")
        
        except Exception as e:
            # record complete error information (including stack trace) for troubleshooting
            error_detail = {
                "file_name": py_file.name,
                "error_type": type(e).__name__,
                "error_msg": str(e),
                "stack_trace": traceback.format_exc()  # complete stack trace, more detailed than inspect
            }
            import_errors.append(error_detail)
            logger.error(f"Failed to import file {py_file.name}: {str(e)}")
    
    return pipeline_classes, import_errors
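
# Note on the module-name construction in collect_pipeline_classes: a file path is
# mapped to a dotted import path relative to the project root, e.g. (hypothetical
# file name):
#   <project root>/run_dataflow/api_pipelines/my_pipeline.py -> "run_dataflow.api_pipelines.my_pipeline"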


def run_single_pipeline(pipeline_class: type, class_name: str, file_name: str) -> Dict:
    """
    Run a single Pipeline and return the execution result (including error details).
    New: Preserve terminal output when redirecting, implement "cache record + terminal display" dual output.
    """
    result = {
        "pipeline_class": class_name,
        "file_name": file_name,
        "status": "success",
        "error": None,
        "error_details": None
    }

    # ------------------------------
    # Custom tee output stream (writes to the capture buffer and to the terminal at the same time)
    # ------------------------------
    class TeeStringIO(StringIO):
        def __init__(self, original_stream):
            super().__init__()
            self.original_stream = original_stream  # save the original terminal stream (for real-time output)

        def write(self, s):
            count = super().write(s)  # write to the in-memory buffer (used later for error detection)
            self.original_stream.write(s)  # echo to the terminal (preserve real-time display)
            self.original_stream.flush()
            return count

        def flush(self):
            super().flush()
            self.original_stream.flush()
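
    # Note: once sys.stdout / sys.stderr are swapped below, anything the Pipeline prints
    # reaches both the real terminal and the in-memory buffers, so captured_print.getvalue()
    # and captured_stderr.getvalue() later contain the full output.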

    # initialize output capture (stdout + stderr use dual output stream)
    old_stdout = sys.stdout
    old_stderr = sys.stderr
    # use custom TeeStringIO, implement "cache + terminal" dual output
    captured_print = TeeStringIO(old_stdout)
    captured_stderr = TeeStringIO(old_stderr)
    sys.stdout = captured_print
    sys.stderr = captured_stderr

    # custom Logging Handler: capture the output of the root logger (e.g. ERROR:root:xxx)
    class CaptureLogHandler(logging.Handler):
        def __init__(self):
            super().__init__()
            self.logs = []

        def emit(self, record: logging.LogRecord) -> None:
            log_msg = self.format(record)
            self.logs.append(log_msg)  # record to cache
            # logging already outputs to terminal by default, no additional processing needed

    log_handler = CaptureLogHandler()
    log_handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
    root_logger = logging.getLogger()
    root_logger.addHandler(log_handler)

    try:        
        logger.info(f"Start running Pipeline: {class_name} (file: {file_name})")    
        # initialize Pipeline
        pipeline = pipeline_class()

        # configure LLM Serving (avoid duplicate code)
        if hasattr(pipeline, "llm_serving"):
            llm_config = {
                "api_url": "http://123.129.219.111:3000/v1/chat/completions",
                "model_name": "gpt-3.5-turbo",
                "max_workers": 15
            }
            for key, value in llm_config.items():
                setattr(pipeline.llm_serving, key, value)
            logger.debug(f"LLM Serving configured: {llm_config}")

        # execute Pipeline (support forward/run two methods)
        output = None
        if hasattr(pipeline, "forward"):
            output = pipeline.forward()
        elif hasattr(pipeline, "run"):
            output = pipeline.run()
        else:
            raise AttributeError("No forward or run method found, cannot execute Pipeline")


        # merge all outputs (stdout/stderr/logging all included)
        full_output = "\n".join([
            "=== Captured Print Output ===",
            captured_print.getvalue(),
            "=== Captured STDERR Output ===",
            captured_stderr.getvalue(),
            "=== Captured Logging Output ===",
            "\n".join(log_handler.logs),
            "=== Pipeline Return Value ===",
            str(output) if output is not None else "No return value"
        ])

        # ------------------------------
        # Detect "error" in the captured output (case-insensitive)
        # ------------------------------
        # 1. detect "error" in stdout/stderr (covers lowercase/uppercase/mixed case)
        if "error" in full_output.lower():
            # extract the lines containing "error" to help locate the problem
            error_lines = [line for line in full_output.split("\n") if "error" in line.lower()]
            error_context = "\n".join(error_lines[:5])  # keep the first 5 matching lines as context
            raise RuntimeError(f"Detected error in stdout/stderr, context: \n{error_context}")

        # 2. detect if there is an ERROR level log (e.g. logging.error() output log)
        for log in log_handler.logs:
            if log.startswith("ERROR:"):
                raise RuntimeError(f"Detected ERROR level log: {log}")

        logger.info(f"Pipeline executed successfully: {class_name}")
        return result

    except Exception as e:
        # record error details (including complete context)
        full_output = "\n".join([
            "=== Captured Print Output ===",
            captured_print.getvalue(),
            "=== Captured STDERR Output ===",
            captured_stderr.getvalue(),
            "=== Captured Logging Output ===",
            "\n".join(log_handler.logs),
            "=== Pipeline Return Value ===",
            str(output) if 'output' in locals() else "No return value"
        ])
        error_msg = f"{type(e).__name__}: {str(e)}"
        error_details = f"{traceback.format_exc()}\n\n=== Complete Execution Context ===\n{full_output}"

        result.update({
            "status": "failed",
            "error": error_msg,
            "error_details": error_details
        })
        logger.error(f"Pipeline execution failed: {error_msg}", exc_info=True)
        return result

    finally:
        # restore the original output stream, avoid affecting subsequent logic
        sys.stdout = old_stdout
        sys.stderr = old_stderr
        if log_handler in root_logger.handlers:
            root_logger.removeHandler(log_handler)


# ------------------------------
# 6. export logs (distinguish import errors from run errors)
# ------------------------------
def export_run_logs(
    run_results: List[Dict],
    import_errors: List[Dict],
    log_dir: Path = TEST_DIR / "pipeline_logs"
) -> None:
    """
    Export run logs:
    1. JSON file: complete run results + import errors (for debugging)
    2. TXT log: key error information (for quick viewing)
    """
    os.makedirs(log_dir, exist_ok=True)
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")  # timestamp to avoid overwriting earlier logs
    
    # 1. complete JSON log (including all details)
    json_data = {
        "run_time": current_time,
        "pipeline_dir": str(PIPELINE_DIR),
        "import_errors": import_errors,
        "run_results": run_results
    }
    json_path = log_dir / f"pipeline_full_results_{current_time}.json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(json_data, f, ensure_ascii=False, indent=2)
    
    # 2. simplified TXT log (key information, for quick viewing)
    log_path = log_dir / f"pipeline_error_summary_{current_time}.log"
    with open(log_path, "w", encoding="utf-8") as f:
        f.write(f"===== Pipeline Batch Running Log ({current_time}) =====\n")
        f.write(f"Pipeline Directory: {PIPELINE_DIR}\n")
        f.write(f"Total Detected Files: {len(import_errors) + len(run_results)}\n")
        f.write(f"Import Errors: {len(import_errors)}\n")
        f.write(f"Success: {sum(1 for res in run_results if res['status'] == 'success')}\n")
        f.write(f"Failed: {sum(1 for res in run_results if res['status'] == 'failed')}\n\n")
        
        # write import errors
        if import_errors:
            f.write("===== Import Errors Details =====\n")
            for idx, err in enumerate(import_errors, 1):
                f.write(f"[{idx}] File: {err['file_name']}\n")
                f.write(f"    Error Type: {err['error_type']}\n")
                f.write(f"    Error Message: {err['error_msg']}\n")
                f.write(f"    Complete Stack Trace: see JSON log import_errors[{idx-1}].stack_trace\n\n")
        
        # write run errors
        run_errors = [res for res in run_results if res['status'] == 'failed']
        if run_errors:
            f.write("===== Run Errors Details =====\n")
            for idx, err in enumerate(run_errors, 1):
                f.write(f"[{idx}] Class Name: {err['pipeline_class']} (File: {err['file_name']})\n")
                f.write(f"    Error Message: {err['error']}\n")
                f.write(f"    Complete Details: see JSON log run_results[{run_results.index(err)}].error_details\n\n")
    
    logger.info(f"Logs exported to: {log_dir}")
    logger.info(f"Complete JSON Log: {json_path.name}")
    logger.info(f"Simplified Error Log: {log_path.name}")


# ------------------------------
# 7. main function (full process chain with top-level exception handling)
# ------------------------------
def main() -> None:
    try:
        # step 1: parse arguments
        args = parse_arguments()
        global PIPELINE_DIR  # global variable, for log export usage
        PIPELINE_DIR = Path(args.pipeline_dir).resolve()
        
        # step 2: add project root to Python path (ensure import dataflow)
        if str(PROJECT_ROOT) not in sys.path:
            sys.path.append(str(PROJECT_ROOT))
            logger.debug(f"Added project root to Python path: {PROJECT_ROOT}")
        
        # step 3: switch working directory
        original_dir = switch_working_directory(PIPELINE_DIR)
        
        # step 4: collect Pipeline classes (including import errors)
        pipeline_classes, import_errors = collect_pipeline_classes(PIPELINE_DIR)
        if not pipeline_classes and not import_errors:
            logger.warning("No .py files found, no need to run")
            return
        if not pipeline_classes:
            logger.error("No runnable Pipeline classes found, exiting")
            export_run_logs([], import_errors)  # export import errors log
            return
        
        # step 5: batch run Pipeline
        logger.info(f"Start batch running, total {len(pipeline_classes)} Pipelines")
        run_results = []
        for class_name, cls, file_name in pipeline_classes:
            run_res = run_single_pipeline(cls, class_name, file_name)
            run_results.append(run_res)
        
        # step 6: export logs
        export_run_logs(run_results, import_errors)
        
        # step 7: output run summary
        success_count = sum(1 for res in run_results if res['status'] == 'success')
        fail_count = len(run_results) - success_count
        print(f"\n===== Batch Running Completed =====\n")
        print(f"📊 Run Summary:")
        print(f"   Total Pipelines: {len(run_results)}")
        print(f"   ✅ Success: {success_count}")
        print(f"   ❌ Failed: {fail_count}")
        print(f"   ⚠️  Import Errors: {len(import_errors)}")
        print(f"\nLog Location: {TEST_DIR / 'pipeline_logs'}")
        
    except Exception as e:
        # capture script-level exceptions (e.g. directory switch failure, parameter error)
        logger.error(f"Script execution failed: {str(e)}", exc_info=True)
        print(f"\n❌ Script execution error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()