"""Build images listed in an Excel sheet, test them, then package and rsync them.

For every row of the input sheet the script builds an image through
``build_space/build_ubuntu.sh``, runs the ``script/*_test.sh`` scripts, and
(unless ``--no-save-trans`` is given) packages the image with
``script/save.sh`` and transfers the archive with ``rsync`` on a background
thread.

NOTE: ``package_and_transfer`` and ``run`` read the module-level ``args``
namespace that is only created under the ``__main__`` guard, so they are meant
to be used by running this file as a script.
"""
import argparse
import logging
import os
import re
import shutil
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

import pandas as pd

# Placeholder passed to the build script when a version is not listed.
_VERSION_NOT_FOUND = "未找到版本号"


class MyLogger:
    """Own a named logger together with its file (and optional console) handler.

    Args:
        logger_name: Name handed to ``logging.getLogger``.
        log_file: Path of the file that receives the log records.
        console_handler: When true, records are also echoed through a
            ``StreamHandler``.
        level: Level applied to the logger and to the console handler.
    """

    def __init__(self, logger_name, log_file, console_handler=True, level=logging.INFO):
        self.logger_name = logger_name
        self.log_file = log_file
        self.vlog = logging.getLogger(logger_name)
        self.vlog.setLevel(level)

        formatter = logging.Formatter('%(asctime)s : %(message)s', "%Y-%m-%d %H:%M:%S")

        # Log messages are Chinese text, so do not depend on the locale encoding.
        self.file_handler = logging.FileHandler(log_file, encoding="utf-8")
        self.file_handler.setFormatter(formatter)
        self.vlog.addHandler(self.file_handler)

        # Always define the attribute so __del__ can test it safely.
        self.console_handler = None
        if console_handler:
            self.console_handler = logging.StreamHandler()
            self.console_handler.setFormatter(formatter)
            self.console_handler.setLevel(level)
            self.vlog.addHandler(self.console_handler)

    def get_vlog(self):
        """Return the configured ``logging.Logger``."""
        return self.vlog

    def __del__(self):
        # getattr keeps this safe even if __init__ failed part-way through.
        logger = getattr(self, "vlog", None)
        if logger is None:
            return
        file_handler = getattr(self, "file_handler", None)
        if file_handler is not None:
            logger.removeHandler(file_handler)
            file_handler.close()  # release the underlying file descriptor
        console = getattr(self, "console_handler", None)
        if console is not None:
            logger.removeHandler(console)


def package_and_transfer(image_name, tar_file, image_result_dir, logger):
    """Package an image into a tar archive and rsync it to ``args.des_path``.

    The transfer is retried up to ``args.trans_retry_max_num`` times with
    ``args.trans_retry_delay`` seconds between attempts. The outcome is
    appended to ``args.ok_file``; after a successful transfer the local
    archive and the rsync log are removed.

    Args:
        image_name: Image reference passed to ``script/save.sh``.
        tar_file: File name of the archive the save script is expected to produce.
        image_result_dir: Directory the archive is moved into before transfer.
        logger: Logger used for progress messages.
    """
    # Package the image and move the archive next to the test results.
    save_commands = [
        f"sh script/save.sh {image_name} > /dev/null 2>&1",
        f"mv {tar_file} {image_result_dir}/",
    ]
    packaged = True
    for save_command in save_commands:
        logger.info(f"打包镜像: {save_command}")
        result = subprocess.run(save_command, shell=True)
        if result.returncode != 0:
            # Stay best-effort (the rsync below will fail and be recorded),
            # but do not claim success for a failed command.
            packaged = False
            logger.warning(f"打包命令执行失败 (返回码 {result.returncode}): {save_command}")
    if packaged:
        logger.info(f"镜像 {image_name} 已成功打包 {tar_file}")

    # Prepare the remote transfer command; rsync progress goes to recvlog_file.
    recvlog_file = f"{image_name.replace(':', '-')}_recvlog"
    rsync_command = f'rsync -aP -e "ssh -p 65023 -i my_rsa -o StrictHostKeyChecking=no" {image_result_dir}/{tar_file} {args.des_path} > {recvlog_file}'
    logger.info(f"远程传输命令: {rsync_command}")

    max_attempts = args.trans_retry_max_num
    for attempt in range(1, max_attempts + 1):
        try:
            subprocess.run(rsync_command, shell=True, check=True)
        except subprocess.CalledProcessError:
            logger.info(f"镜像 {tar_file} 传输失败,尝试重试 {attempt}/{max_attempts} 次")
            if attempt < max_attempts:
                time.sleep(args.trans_retry_delay)  # back off before retrying
                continue
            # Retries exhausted: record the failure and give up on this image.
            logger.warning(f"传输失败超过最大重试次数,跳过镜像 {image_name}")
            with open(args.ok_file, "a", encoding="utf-8") as log:
                log.write(f"{image_name} 传输失败\n")
            break

        logger.info(f"镜像 {tar_file} 传输成功,日志保存到 {recvlog_file}")
        # Record the success in the result file.
        with open(args.ok_file, "a", encoding="utf-8") as log:
            log.write(f"{image_name} 成功传输\n")

        # Remove the local archive once it has been transferred.
        tar_file_path = os.path.join(image_result_dir, tar_file)
        if os.path.exists(tar_file_path):
            os.remove(tar_file_path)
            logger.info(f"{tar_file_path} 已删除")

        # Remove the rsync log as well.
        if os.path.exists(recvlog_file):
            os.remove(recvlog_file)
            logger.info(f"{recvlog_file} 已删除")
        break

    logger.info(f"==== 镜像 {image_name} 传输完毕 ====")


def _parse_torch_versions(other_dependencies):
    """Return ``(torchvision_version, torchaudio_version)`` parsed from a sheet cell.

    Each version falls back to the placeholder independently when it is not listed.
    """
    found = {"torchvision": _VERSION_NOT_FOUND, "torchaudio": _VERSION_NOT_FOUND}
    if pd.notna(other_dependencies):
        text = str(other_dependencies)
        for package in found:
            match = re.search(rf'{package}-([\d.]+)', text)
            if match:
                found[package] = match.group(1)
    return found["torchvision"], found["torchaudio"]


def _compose_build_command(image_name, base_image, framework_version,
                           torchvision_version, torchaudio_version, conda_url, log_dir):
    """Return the shell command that builds ``image_name``, or ``None`` if no rule matches."""
    if not isinstance(base_image, str):
        return None

    if "pytorch" in image_name:
        if "pytorch/pytorch" in base_image:
            # Official PyTorch base image: no framework versions are passed.
            return f"""
            cd build_space && \
            ./build_ubuntu.sh jupyterlab {image_name} {base_image} \
            2>&1 | tee ../{log_dir}/{image_name}/build.log
            """
        # Any other base image: pass the torch versions and the conda URL.
        return f"""
        cd build_space && \
        ./build_ubuntu.sh jupyterlab {image_name} {base_image} \
        TORCH_VERSION="{framework_version}" \
        TORCHVISION_VERSION="{torchvision_version}" \
        TORCHAUDIO_VERSION="{torchaudio_version}" \
        CONDA_URL="{conda_url}" \
        2>&1 | tee ../{log_dir}/{image_name}/build.log
        """

    if "tensorflow" in image_name:
        return f"""
        cd build_space && \
        ./build_ubuntu.sh jupyterlab {image_name} {base_image} \
        TENSORFLOW_VERSION="{framework_version}" \
        CONDA_URL="{conda_url}" \
        2>&1 | tee ../{log_dir}/{image_name}/build.log
        """

    return None


def run():
    """Build, test and (optionally) package/transfer every image in the input sheet.

    Builds and tests run sequentially in the calling thread; packaging and
    transfer of each image is submitted to a thread pool so the next build
    can start immediately.
    """
    df = pd.read_excel(args.input_file)
    os.makedirs(args.log_dir, exist_ok=True)

    # MyLogger.__del__ strips the handlers from its logger, so every instance
    # must stay referenced while a background task may still log through it.
    live_loggers = []
    # Maps each submitted future to (image_name, logger) for error reporting.
    pending = {}

    with ThreadPoolExecutor() as executor:
        for index, row in df.iterrows():
            image_name = row['镜像名']
            base_image = row['基础镜像']
            framework_version = row['框架版本']
            other_dependencies = row['其他依赖包']
            conda_url = row['conda url']

            # Without an image name there is no log directory or build target.
            if pd.isna(image_name):
                logging.getLogger(__name__).error(f"第 {index} 行缺少镜像名,跳过该行")
                continue

            # Start from a clean per-image log directory.
            image_log_dir = os.path.join(args.log_dir, image_name)
            if os.path.exists(image_log_dir):
                shutil.rmtree(image_log_dir)
            os.makedirs(image_log_dir)
            my_logger = MyLogger(image_name, os.path.join(image_log_dir, "run.log"))
            live_loggers.append(my_logger)
            logger = my_logger.get_vlog()

            if pd.isna(base_image):
                logger.error(f"基础镜像信息缺失,跳过该行: {image_name}")
                continue

            torchvision_version, torchaudio_version = _parse_torch_versions(other_dependencies)

            build_command = _compose_build_command(
                image_name, base_image, framework_version,
                torchvision_version, torchaudio_version, conda_url, args.log_dir)
            if build_command is None:
                # Neither a pytorch nor a tensorflow image (or a non-string base
                # image): there is nothing to run for this row.
                logger.error(f"无法确定构建方式,跳过该镜像: {image_name}")
                continue

            # Log the build command for debugging.
            logger.info(build_command)

            try:
                logger.info(f"==== 镜像 {image_name} 开始构建 ====")
                # The command pipes into `tee`; without pipefail the pipeline's
                # status is tee's and a failed build would go unnoticed.
                # NOTE(review): requires /bin/bash on the build host.
                subprocess.run(f"set -o pipefail; {build_command}", shell=True,
                               check=True, executable="/bin/bash")
            except subprocess.CalledProcessError:
                logger.info(f"==== 镜像 {image_name} 构建失败,跳过该镜像 ====")
                continue

            # Test logs are stored in the per-image directory.
            image_result_dir = os.path.join(args.log_dir, image_name)

            test_commands = [
                f"sh script/1_base_test.sh {image_name} > {image_result_dir}/1_base_test.log 2>&1",
                f"sh script/2_text_test.sh {image_name} > {image_result_dir}/2_text_test.log 2>&1",
                f"sh script/3_image_test.sh {image_name} > {image_result_dir}/3_image_test.log 2>&1",
            ]
            if "pytorch" in image_name:
                test_commands.append(
                    f"mv gpu-base-image-test/pytorch/stable-diffusion-v1-4/output.png {image_result_dir}")

            # Tests are best-effort: a failing test does not stop the pipeline.
            for test_command in test_commands:
                logger.info(f"执行测试: {test_command}")
                subprocess.run(test_command, shell=True)

            # Archive name: the image name with ":" replaced by "-", plus ".tar".
            tar_file = f"{image_name.replace(':', '-')}.tar"

            if not args.no_save_trans:
                # Package and transfer in the background; carry on with the next build.
                future = executor.submit(package_and_transfer, image_name, tar_file,
                                         image_result_dir, logger)
                pending[future] = (image_name, logger)

        # Surface exceptions raised inside worker threads instead of dropping them.
        if pending:
            finished, _ = wait(list(pending), return_when=ALL_COMPLETED)
            for future in finished:
                error = future.exception()
                if error is not None:
                    failed_image, failed_logger = pending[future]
                    failed_logger.error(f"镜像 {failed_image} 打包/传输任务异常: {error!r}")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Autobuild images from a excel file.')
    # `required=True` made the default unreachable; the option is now optional
    # and falls back to input.xlsx.
    parser.add_argument('--input-file', type=str, default="input.xlsx",
                        help='a excel file with images to build.')
    # NOTE(review): --index and --num are parsed but nothing in this file reads them.
    parser.add_argument('--index', type=str, help='the indexes for images to build, separated by ","')
    parser.add_argument('--num', type=int, help='the number of images to build')
    parser.add_argument('--log-dir', type=str, default="logs", help='logs directory')
    parser.add_argument('--ok-file', type=str, default="ok.txt", help='the file of succeed images')
    parser.add_argument('--trans-retry-max-num', type=int, default=3, help='transform retry max num')
    parser.add_argument('--trans-retry-delay', type=int, default=5, help='transform delay seconds')
    parser.add_argument('--des-path', type=str,
                        default="openaimodels@cancon.hpccube.com:/public/home/openaimodels/chenyh/",
                        help='destination path in scnet')
    parser.add_argument("--no-save-trans", action="store_true", help="do not save and transform image")
    args = parser.parse_args()
    run()