"""Poll ``hy-smi`` once per second and report DCU metrics.

Each sample is printed as a table row per card. When the user presses
Ctrl+C, overall and per-card averages of every collected sample are printed.
"""
import subprocess
import json
import time
import sys

# (metric name used by this script, key read from the hy-smi JSON output).
_METRIC_KEYS = (
    ('temperature', "Temperature (Sensor junction) (C)"),
    ('power', "Average Graphics Package Power (W)"),
    ('utilization', "HCU use (%)"),
    ('mem_usage_percent', "HCU memory use (%)"),
)


def run_hy_smi():
    """Run ``hy-smi`` and return its decoded JSON output.

    Returns:
        The object produced by ``json.loads`` on the command's stdout, or
        ``None`` when the command exits non-zero, times out (10 s), emits
        invalid JSON, or fails for any other reason. Every failure is
        reported with a printed message rather than raised.
    """
    try:
        # Request JSON so the output can be parsed reliably.
        result = subprocess.run(
            ['hy-smi', '--showtemp', '--showpower', '--showuse',
             '--showmemuse', '--json'],
            capture_output=True,
            text=True,
            timeout=10
        )

        if result.returncode != 0:
            print(f"命令执行错误: {result.stderr}")
            return None

        return json.loads(result.stdout)
    except subprocess.TimeoutExpired:
        print("命令执行超时")
        return None
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}")
        return None
    except Exception as e:
        # Top-level boundary: also covers a missing hy-smi executable.
        # KeyboardInterrupt is not an Exception, so Ctrl+C still propagates.
        print(f"意外错误: {e}")
        return None


def parse_data(data):
    """Extract per-card metrics from decoded ``hy-smi`` JSON.

    Args:
        data: Mapping of card key -> mapping of metric label -> value.
            Anything that is not a dict yields an empty result.

    Returns:
        A list of dicts with keys ``card``, ``temperature``, ``power``,
        ``utilization`` and ``mem_usage_percent`` (all metric values as
        floats). A metric label missing from a card defaults to 0. Entries
        that are not dicts are skipped silently; entries whose values cannot
        be converted to float are skipped with a printed message.
    """
    metrics = []

    # json.loads may legally return a list/str/number; there is nothing to
    # extract from those, and calling .items() on them would raise.
    if not isinstance(data, dict):
        return metrics

    for card_key, card_data in data.items():
        # Skip top-level entries that are not per-card mappings; calling
        # .get() on them would raise AttributeError and abort the monitor.
        if not isinstance(card_data, dict):
            continue

        try:
            metric = {'card': card_key}
            for name, label in _METRIC_KEYS:
                metric[name] = float(card_data.get(label, 0))
        except (ValueError, TypeError) as e:
            print(f"解析数据时出错: {e}")
            continue

        metrics.append(metric)

    return metrics


def _mean(values):
    """Return the arithmetic mean of a non-empty sequence of numbers."""
    return sum(values) / len(values)


def _print_summary(data_points):
    """Print overall and per-card averages of the collected samples."""
    if not data_points:
        print("未收集到有效数据")
        return

    # Averages over every sample from every card.
    print("\n所有卡的平均值统计:")
    print("平均温度: {:.1f}℃".format(
        _mean([m['temperature'] for m in data_points])))
    print("平均功耗: {:.1f}W".format(
        _mean([m['power'] for m in data_points])))
    print("平均利用率: {:.1f}%".format(
        _mean([m['utilization'] for m in data_points])))
    print("平均内存使用: {:.1f}%".format(
        _mean([m['mem_usage_percent'] for m in data_points])))

    # Group the samples by card, preserving first-seen order.
    card_stats = {}
    for metric in data_points:
        card = metric['card']
        if card not in card_stats:
            card_stats[card] = {'temp': [], 'power': [], 'util': [], 'mem': []}
        stats = card_stats[card]
        stats['temp'].append(metric['temperature'])
        stats['power'].append(metric['power'])
        stats['util'].append(metric['utilization'])
        stats['mem'].append(metric['mem_usage_percent'])

    print("\n每张卡的平均值统计:")
    # Column header for the per-card rows (previously the overall averages
    # were printed a second time here and the rows had no header).
    print("{:<8} {:<10} {:<10} {:<15} {:<15}".format(
        "卡号", "温度(℃)", "功耗(W)", "利用率(%)", "内存使用(%)"))
    for card, stats in card_stats.items():
        print("{:<8} {:<10.1f} {:<10.1f} {:<15.1f} {:<15.1f}".format(
            card,
            _mean(stats['temp']),
            _mean(stats['power']),
            _mean(stats['util']),
            _mean(stats['mem'])))


def main():
    """Sample DCU metrics every second until Ctrl+C, then print averages."""
    print("开始收集DCU指标,按Ctrl+C停止...")
    print("{:<10} {:<8} {:<10} {:<10} {:<15} {:<15}".format(
        "时间", "卡号", "温度(℃)", "功耗(W)", "利用率(%)", "内存使用(%)"))

    # NOTE: every sample is kept in memory until the run is interrupted.
    data_points = []

    try:
        while True:
            raw_data = run_hy_smi()
            if raw_data:
                current_metrics = parse_data(raw_data)
                # One timestamp per sample so all cards in a row group agree.
                timestamp = time.strftime("%H:%M:%S")
                for metric in current_metrics:
                    print("{:<10} {:<8} {:<10.1f} {:<10.1f} {:<15.1f} {:<15.0f}".format(
                        timestamp,
                        metric['card'],
                        metric['temperature'],
                        metric['power'],
                        metric['utilization'],
                        metric['mem_usage_percent']
                    ))
                data_points.extend(current_metrics)

            # Sleep on every iteration, including failed samples, so a
            # persistently failing hy-smi cannot turn this into a busy loop.
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n正在计算平均值...")
        _print_summary(data_points)


if __name__ == "__main__":
    main()