import gradio as gr
import requests
import json
import time
import sqlite3
import os
import yaml
from datetime import datetime, timedelta
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import threading
import queue
from typing import List, Dict, Any
import re
import traceback
from contextlib import closing


class DCUMultiMonitor:
    """Poll DCU metric endpoints, store the samples in SQLite and chart them.

    Monitored targets are HTTP URLs kept in ``yaml_path``. Despite the name,
    that file is read and written as plain text with one URL per line; it is
    not parsed as YAML.
    """

    # Text format of monitor_data.timestamp (naive local time). Both writes and
    # range queries use it, so SQLite's string comparison orders rows
    # correctly; always emitting microseconds keeps the strings uniform.
    _TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
    # Samples older than this are purged whenever new samples are saved.
    _RETENTION = timedelta(hours=24)
    # Seconds to wait between polling rounds.
    _POLL_INTERVAL = 5
    # Height (px) of every chart figure.
    _CHART_HEIGHT = 600
    # (title suffix, y-axis title, DataFrame column, fixed y-axis range) for
    # each chart, in the order create_charts_for_target returns them.
    _CHART_SPECS = (
        ('功率监控 (W)', '功率 (W)', 'power_usage', None),
        ('内存利用率 (%)', '内存利用率 (%)', 'mem_util', [0, 100]),
        ('利用率 (%)', '利用率 (%)', 'utilization_rate', [0, 100]),
        ('温度 (°C)', '温度 (°C)', 'temperature', None),
    )

    def __init__(self, db_path="/public/wkx/dcgm/mon/monitor.db", yaml_path="dev_list.yaml"):
        """Prepare the database and load targets from ``yaml_path``.

        Args:
            db_path: SQLite database file; its directory is created if needed.
            yaml_path: Plain-text target list, one URL per line.
        """
        self.db_path = db_path
        self.yaml_path = yaml_path
        self.targets = set()  # All monitored target URLs.
        self.target_status = {}  # URL -> enabled flag; False makes the poller skip the URL.
        self.data_lock = threading.Lock()  # Serialises SQLite access between UI and poller threads.
        self.monitoring = False
        self.monitor_thread = None
        self.failed_targets = set()  # URLs whose most recent poll failed.
        # Signals the current polling thread to exit; a fresh Event is created per start.
        self._stop_event = threading.Event()
        self.ensure_db_exists()
        self.load_targets_from_yaml()

    def ensure_db_exists(self):
        """Create the database file, its directory and the required tables.

        If ``monitor_data`` exists but lacks any expected column, it is dropped
        and recreated, which discards its rows.
        """
        db_dir = os.path.dirname(self.db_path)
        if db_dir:  # os.makedirs("") raises, e.g. for a bare "monitor.db".
            os.makedirs(db_dir, exist_ok=True)

        expected_columns = ['url', 'minor_number', 'timestamp', 'power_usage',
                            'memory_used', 'memory_cap', 'utilization_rate', 'temperature']

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='monitor_data';")
            if cursor.fetchone():
                cursor.execute("PRAGMA table_info(monitor_data);")
                columns = [col[1] for col in cursor.fetchall()]
                missing_columns = [col for col in expected_columns if col not in columns]
                if missing_columns:
                    print(f"检测到表结构不完整,缺少列: {missing_columns}")
                    print("正在重建表...")
                    # The CREATE TABLE IF NOT EXISTS below recreates it.
                    cursor.execute("DROP TABLE monitor_data;")

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS monitor_targets (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT UNIQUE NOT NULL,
                    added_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS monitor_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    url TEXT NOT NULL,
                    minor_number INTEGER NOT NULL,
                    timestamp DATETIME NOT NULL,
                    power_usage REAL,
                    memory_used REAL,
                    memory_cap REAL,
                    utilization_rate REAL,
                    temperature REAL,
                    FOREIGN KEY (url) REFERENCES monitor_targets (url)
                )
            ''')
            conn.commit()

    def load_targets_from_yaml(self):
        """(Re)load targets from ``yaml_path``, one URL per non-blank line.

        Creates an empty file if none exists. Every loaded target is marked
        enabled; status and failure state of targets that are no longer listed
        is dropped.
        """
        if os.path.exists(self.yaml_path):
            try:
                with open(self.yaml_path, 'r', encoding='utf-8') as f:
                    urls = [line.strip() for line in f if line.strip()]
                self.targets = set(urls)
                print(f"从YAML文件加载了 {len(self.targets)} 个监控目标")
            except Exception as e:
                print(f"读取YAML文件失败: {e}")
                self.targets = set()
        else:
            print(f"YAML文件 {self.yaml_path} 不存在,创建新文件")
            self.targets = set()
            self.save_targets_to_yaml()

        self.target_status = {target: True for target in self.targets}
        self.failed_targets.intersection_update(self.targets)

    def save_targets_to_yaml(self):
        """Write all targets, sorted, one per line to ``yaml_path``.

        Returns:
            True on success, False if writing failed (the error is printed).
        """
        try:
            with open(self.yaml_path, 'w', encoding='utf-8') as f:
                f.writelines(f"{url}\n" for url in sorted(self.targets))
            print(f"保存 {len(self.targets)} 个监控目标到YAML文件")
            return True
        except Exception as e:
            print(f"保存YAML文件失败: {e}")
            return False

    def add_target(self, url: str):
        """Add ``url`` (surrounding whitespace stripped) and persist the list.

        Returns:
            (success, message). The addition is rolled back if saving fails.
        """
        url = (url or "").strip()
        if not self.is_valid_url(url):
            return False, f"无效URL格式: {url}"
        if url in self.targets:
            return False, f"目标已存在: {url}"

        self.targets.add(url)
        self.target_status[url] = True
        if self.save_targets_to_yaml():
            print(f"已添加目标: {url}")
            return True, f"成功添加: {url}"
        self.targets.discard(url)
        self.target_status.pop(url, None)
        return False, f"保存失败: {url}"

    def remove_target_by_ip(self, ip: str):
        """Remove the target whose host equals ``ip`` and delete its stored rows.

        The host parsed from each URL must equal ``ip`` exactly; a plain
        substring test would let "10.0.0.1" match "10.0.0.12". If no host
        matches exactly, a substring match is accepted only when exactly one
        URL contains ``ip``. When several URLs share the host, the first in
        sorted order is removed.

        Returns:
            (True, message) when a target was removed, otherwise (False, message).
        """
        ip = (ip or "").strip()
        if not ip:
            return False, f"未找到包含IP {ip} 的设备"

        candidates = sorted(self.targets)
        matches = [url for url in candidates if self.extract_ip_from_url(url) == ip]
        if not matches:
            partial = [url for url in candidates if ip in url]
            if len(partial) == 1:
                matches = partial
        if not matches:
            return False, f"未找到包含IP {ip} 的设备"

        matched_url = matches[0]
        self.targets.discard(matched_url)
        self.target_status.pop(matched_url, None)
        self.failed_targets.discard(matched_url)
        self.save_targets_to_yaml()

        with self.data_lock, closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute("DELETE FROM monitor_targets WHERE url = ?", (matched_url,))
            conn.execute("DELETE FROM monitor_data WHERE url = ?", (matched_url,))
            conn.commit()
        return True, f"已删除: {matched_url}"

    def is_valid_url(self, url: str) -> bool:
        """Return True if ``url`` looks like http(s)://host:port/path."""
        pattern = r'^https?://[\w\.-]+:\d+/.*$'
        return bool(re.match(pattern, url))

    def extract_ip_from_url(self, url: str) -> str:
        """Return the host part of ``url`` (text between '://' and ':'), or ``url`` itself if absent."""
        match = re.search(r'https?://([\w\.-]+):', url)
        if match:
            return match.group(1)
        return url

    def fetch_dcu_data(self, url: str) -> List[Dict[str, Any]]:
        """Fetch one metrics snapshot from ``url``.

        Expects a JSON list of per-device dicts. Adds ``MemoryUtilization``
        (integer percent, 0 when MemoryCap is missing or not positive) to each.
        NOTE(review): the key names (MemoryUsed, MemoryCap, ...) are assumed to
        match the exporter's payload — verify against the endpoint.

        Returns:
            The list of device dicts, or ``[]`` on timeout, connection error,
            non-200 status, non-list payload or any other failure.
        """
        try:
            # Short timeout so one unreachable host cannot stall a polling round for long.
            response = requests.get(url, timeout=3)
            if response.status_code != 200:
                print(f"获取数据失败 {url}: HTTP {response.status_code}")
                return []
            data = response.json()
            if not isinstance(data, list):
                return []
            for item in data:
                memory_cap = item.get('MemoryCap') or 0
                if memory_cap > 0:
                    item['MemoryUtilization'] = int(item['MemoryUsed'] / memory_cap * 100)
                else:
                    item['MemoryUtilization'] = 0
            return data
        except requests.exceptions.Timeout:
            print(f"获取数据超时 {url}")
            return []
        except requests.exceptions.ConnectionError:
            print(f"连接失败 {url}")
            return []
        except Exception as e:
            print(f"获取数据失败 {url}: {str(e)}")
            return []

    def save_data_to_db(self, url: str, data: List[Dict[str, Any]]):
        """Store one row per device dict for ``url`` and purge rows older than 24 hours.

        All rows from one call share a single local timestamp. Errors are
        printed, not raised; on error nothing from this call is committed.
        """
        now = datetime.now()
        timestamp = now.strftime(self._TS_FORMAT)
        cutoff = (now - self._RETENTION).strftime(self._TS_FORMAT)
        with self.data_lock:
            try:
                rows = [
                    (url, item['MinorNumber'], timestamp, item['PowerUsage'], item['MemoryUsed'],
                     item['MemoryCap'], item['UtilizationRate'], item['Temperature'])
                    for item in data
                ]
                with closing(sqlite3.connect(self.db_path)) as conn:
                    conn.execute("DELETE FROM monitor_data WHERE timestamp < ?", (cutoff,))
                    conn.executemany('''
                        INSERT INTO monitor_data
                        (url, minor_number, timestamp, power_usage, memory_used, memory_cap, utilization_rate, temperature)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    ''', rows)
                    conn.commit()
            except Exception as e:
                print(f"保存数据到数据库失败 {url}: {str(e)}")

    def get_recent_data(self, url: str, minutes: int = 30) -> pd.DataFrame:
        """Return the last ``minutes`` minutes of samples for ``url``, oldest first.

        The cutoff is computed in local time, the same clock used when rows are
        written. SQLite's ``datetime('now')`` is UTC and would shift the window
        by the UTC offset. Returns an empty DataFrame on error.
        """
        cutoff = (datetime.now() - timedelta(minutes=minutes)).strftime(self._TS_FORMAT)
        query = '''
            SELECT url, minor_number, timestamp, power_usage, memory_used, memory_cap, utilization_rate, temperature
            FROM monitor_data
            WHERE url = ? AND timestamp >= ?
            ORDER BY timestamp ASC
        '''
        try:
            with self.data_lock, closing(sqlite3.connect(self.db_path)) as conn:
                return pd.read_sql_query(query, conn, params=(url, cutoff))
        except Exception as e:
            print(f"获取数据失败: {str(e)}")
            return pd.DataFrame()

    def _poll_target(self, url: str):
        """Fetch one target, store its samples and update ``failed_targets``."""
        try:
            data = self.fetch_dcu_data(url)
            if url not in self.targets:
                return  # Removed while the request was in flight; don't resurrect its data.
            if data:
                self.save_data_to_db(url, data)
                self.failed_targets.discard(url)
            elif url not in self.failed_targets:
                # Mark as failed on the first unsuccessful poll.
                self.failed_targets.add(url)
                print(f"设备 {url} 连接失败")
        except Exception as e:
            print(f"监控循环中发生错误 {url}: {str(e)}")
            self.failed_targets.add(url)

    def start_monitoring(self):
        """Start a daemon thread that polls every enabled target every few seconds.

        Returns:
            A status message for the UI.
        """
        if self.monitoring:
            return "监控已在运行中"
        self.monitoring = True
        # A per-run Event lets a previous loop that is still sleeping exit even
        # if monitoring is restarted before it wakes up.
        stop_event = threading.Event()
        self._stop_event = stop_event

        def monitor_loop():
            while self.monitoring and not stop_event.is_set():
                for url in list(self.targets):  # Snapshot: UI may edit targets concurrently.
                    if self.target_status.get(url, True):
                        self._poll_target(url)
                stop_event.wait(self._POLL_INTERVAL)

        self.monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        self.monitor_thread.start()
        return f"监控已启动,当前监控 {len(self.targets)} 个目标"

    def stop_monitoring(self):
        """Signal the polling thread to stop and wait briefly for it."""
        self.monitoring = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=1)
        return "监控已停止"

    def get_all_targets(self):
        """Return all target URLs as a list."""
        return list(self.targets)

    def get_target_status_info(self):
        """Return ``[{'url', 'ip', 'status'}]`` per target, sorted by URL; status is ✅ or ❌ (last poll failed)."""
        return [
            {
                'url': target,
                'ip': self.extract_ip_from_url(target),
                'status': "❌" if target in self.failed_targets else "✅",
            }
            for target in sorted(self.targets)
        ]

    def refresh_targets(self):
        """Reload targets from file and return their status info."""
        self.load_targets_from_yaml()
        return self.get_target_status_info()

    def create_charts_for_target(self, url: str):
        """Build the power, memory-utilisation, utilisation and temperature figures for ``url``.

        One trace per device (``minor_number``) over the last 30 minutes.
        Memory utilisation is used/cap in percent, rounded to an integer, and 0
        when the cap is missing or not positive. That matches fetch_dcu_data and
        avoids the NaN-to-int crash.

        Returns:
            A 4-tuple of plotly Figures (empty but titled when there is no data).
        """
        try:
            ip = self.extract_ip_from_url(url)
            df = self.get_recent_data(url)
            figures = [go.Figure() for _ in self._CHART_SPECS]

            if df.empty:
                for fig, (suffix, _y_title, _column, _y_range) in zip(figures, self._CHART_SPECS):
                    fig.update_layout(title=f'{ip} - {suffix}', height=self._CHART_HEIGHT)
                return tuple(figures)

            df = df.copy()
            cap = df['memory_cap'].where(df['memory_cap'] > 0)
            df['mem_util'] = (df['memory_used'] / cap * 100).fillna(0).round().astype(int)

            for device_id, device_data in df.groupby('minor_number', sort=False):
                for fig, (_suffix, _y_title, column, _y_range) in zip(figures, self._CHART_SPECS):
                    fig.add_trace(go.Scatter(
                        x=device_data['timestamp'],
                        y=device_data[column],
                        mode='lines+markers',
                        name=f'Device {device_id}',
                        line=dict(width=2),
                        marker=dict(size=4)
                    ))

            for fig, (suffix, y_title, _column, y_range) in zip(figures, self._CHART_SPECS):
                layout = dict(title=f'{ip} - {suffix}', xaxis_title='时间', yaxis_title=y_title)
                if y_range is not None:
                    layout['yaxis'] = dict(range=y_range)
                layout.update(hovermode='x unified', height=self._CHART_HEIGHT)
                fig.update_layout(**layout)
            return tuple(figures)
        except Exception as e:
            print(f"创建图表时发生错误: {str(e)}")
            error_fig = go.Figure()
            error_fig.update_layout(title=f'数据加载失败: {str(e)}', height=self._CHART_HEIGHT)
            return error_fig, error_fig, error_fig, error_fig

    def refresh_all_charts(self, selected_target):
        """Return (4 figures, status text) for ``selected_target``.

        Placeholder figures are returned when nothing valid is selected or the
        target's last poll failed.
        """
        try:
            if not selected_target or selected_target not in self.targets:
                empty_fig = go.Figure()
                empty_fig.update_layout(title="请选择一个监控目标", height=self._CHART_HEIGHT)
                return empty_fig, empty_fig, empty_fig, empty_fig, ""

            if selected_target in self.failed_targets:
                error_msg = f"错误:无法获取 {selected_target} 的数据,请检查连接"
                empty_fig = go.Figure()
                empty_fig.update_layout(title=f"{self.extract_ip_from_url(selected_target)} - 连接失败",
                                        height=self._CHART_HEIGHT)
                return empty_fig, empty_fig, empty_fig, empty_fig, error_msg

            charts = self.create_charts_for_target(selected_target)
            return *charts, f"最后更新: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        except Exception as e:
            print(f"刷新图表时发生错误: {str(e)}")
            error_fig = go.Figure()
            error_fig.update_layout(title=f'刷新失败: {str(e)}', height=self._CHART_HEIGHT)
            return error_fig, error_fig, error_fig, error_fig, f"刷新失败: {str(e)}"


# Module-level monitor instance: creating it prepares the database and reads the target file.
monitor = DCUMultiMonitor()

# Build the Gradio UI.
with gr.Blocks(title="DCU多机监控平台", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# DCU多机监控平台")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 添加监控目标")
            single_url = gr.Textbox(label="单个URL", placeholder="http://ip:16081/CollectDeviceMetrics")
            single_add_btn = gr.Button("添加单个URL", variant="primary")

            gr.Markdown("### 监控控制")
            start_btn = gr.Button("开始监控", variant="primary")
            stop_btn = gr.Button("停止监控", variant="stop")
            refresh_btn = gr.Button("刷新设备列表", variant="secondary")
            status_output = gr.Textbox(label="状态", interactive=False)

            gr.Markdown("### 监控设备列表")
            device_radio = gr.Radio(label="选择设备", choices=[], interactive=True)

            gr.Markdown("### 删除设备")
            delete_ip = gr.Textbox(label="输入IP地址", placeholder="例如: 10.20.100.12")
            delete_btn = gr.Button("删除设备", variant="stop")

        with gr.Column(scale=3):
            gr.Markdown("### 监控图表")
            with gr.Tab("功率监控"):
                power_chart = gr.Plot()
            with gr.Tab("内存利用率"):
                memory_chart = gr.Plot()
            with gr.Tab("利用率"):
                utilization_chart = gr.Plot()
            with gr.Tab("温度"):
                temperature_chart = gr.Plot()
            last_update = gr.Textbox(label="更新时间", interactive=False)

    def update_device_list():
        """Return a Radio update listing each target as '<status> <ip>' (value = URL), with no selection."""
        try:
            choices = [(f"{info['status']} {info['ip']}", info['url'])
                       for info in monitor.get_target_status_info()]
        except Exception as e:
            print(f"更新设备列表时发生错误: {str(e)}")
            choices = []
        return gr.Radio(choices=choices, label="选择设备", value=None)

    def handle_single_add(url):
        """Add one URL; return (status message, refreshed device list)."""
        try:
            _success, message = monitor.add_target(url)
        except Exception as e:
            message = f"添加失败: {str(e)}"
        return message, update_device_list()

    def handle_delete_device(ip):
        """Delete a device by IP; return (status, device list, new input-box value)."""
        try:
            ip_text = (ip or "").strip()
            if not ip_text:
                return "请输入IP地址", update_device_list(), ip
            success, message = monitor.remove_target_by_ip(ip_text)
            # Clear the input box only after a successful deletion.
            return message, update_device_list(), "" if success else ip
        except Exception as e:
            return f"删除失败: {str(e)}", update_device_list(), ip

    def handle_refresh():
        """Reload the target file; return (status message, refreshed device list)."""
        try:
            targets_info = monitor.refresh_targets()
            return f"已刷新,当前监控 {len(targets_info)} 个设备", update_device_list()
        except Exception as e:
            return f"刷新失败: {str(e)}", update_device_list()

    def handle_device_select(selected_url):
        """Redraw all charts for the newly selected device (placeholders when nothing is selected)."""
        try:
            return monitor.refresh_all_charts(selected_url or None)
        except Exception as e:
            error_msg = f"刷新图表失败: {str(e)}"
            error_fig = go.Figure()
            error_fig.update_layout(title=error_msg, height=600)
            return error_fig, error_fig, error_fig, error_fig, error_msg

    # Wire up events.
    single_add_btn.click(
        fn=handle_single_add,
        inputs=single_url,
        outputs=[status_output, device_radio]
    )

    delete_btn.click(
        fn=handle_delete_device,
        inputs=delete_ip,
        outputs=[status_output, device_radio, delete_ip]
    )

    start_btn.click(
        fn=monitor.start_monitoring,
        inputs=None,
        outputs=status_output
    )

    stop_btn.click(
        fn=monitor.stop_monitoring,
        inputs=None,
        outputs=status_output
    )

    refresh_btn.click(
        fn=handle_refresh,
        inputs=None,
        outputs=[status_output, device_radio]
    )

    # Redraw the charts whenever the selected device changes.
    device_radio.change(
        fn=handle_device_select,
        inputs=device_radio,
        outputs=[power_chart, memory_chart, utilization_chart, temperature_chart, last_update]
    )

    # Populate the device list when the page loads.
    demo.load(fn=update_device_list, outputs=device_radio)

if __name__ == "__main__":
    # NOTE(review): share=True publishes this UI through a public Gradio share
    # link, and the UI can add/delete targets without authentication — confirm
    # this exposure is intended.
    demo.launch(server_name="0.0.0.0", server_port=7860, share=True)