import requests
import json
import re

API_BASE_URL = "http://localhost:8000/v1"
# MODEL_NAME = "/home/zwq/model/Qwen3-30B-A3B"
MODEL_NAME = "/home/zwq/model/Qwen3-30B-A3B-Instruct-2507"


class Qwen3ChatClient:
    """Minimal client for a vLLM OpenAI-compatible /chat/completions endpoint
    serving a Qwen3 model, with helpers to separate <think> reasoning content
    from the final answer and to collect per-token rank-1 logprobs."""

    def __init__(self, api_base_url=API_BASE_URL, model_name=MODEL_NAME):
        self.api_base_url = api_base_url
        self.model_name = model_name
        self.history = []
        # Raw per-token logprob entries (token text + logprob) from the most
        # recent successful request. Exposed so callers can align logprobs
        # with decoded token texts (fixes the NameError in the original
        # __main__, which referenced generate_response's local response_data).
        self.last_token_logprobs = []

    def _parse_response(self, text):
        """Split a raw completion into (thinking_content, main_content).

        BUG FIX: the original pattern had lost the literal <think>/</think>
        tags (it read r'(.*?)(.*)'), so the lazy first group always matched
        the empty string and the thinking part was never extracted.

        :param text: full assistant output, possibly containing a
                     ``<think>...</think>`` block.
        :return: (thinking_content, main_content) — both stripped; if no
                 <think> block is present, thinking_content is "" and
                 main_content is the original text unchanged.
        """
        thinking_content = ""
        main_content = text
        # re.DOTALL lets '.' match newlines so a multi-line thinking block
        # is captured in one group.
        match = re.search(r'<think>(.*?)</think>(.*)', text, re.DOTALL)
        if match:
            thinking_content = match.group(1).strip()
            main_content = match.group(2).strip()
        return thinking_content, main_content

    def generate_response(self, user_input, enable_thinking=True, conversation_history=None):
        """Send one chat request to the vLLM server and return its output.

        :param user_input: the user's message.
        :param enable_thinking: True to request thinking mode (default),
                                False for a non-thinking completion.
        :param conversation_history: optional prior messages, each a dict
                                     {"role": ..., "content": ...}; a default
                                     system prompt is used when None.
        :return: (full_assistant_content, list_of_rank1_logprobs) where
                 full_assistant_content is the complete raw answer (thinking
                 content included) and the list holds the top-rank logprob of
                 each generated token. Returns ("", []) on any failure.
        """
        if conversation_history is None:
            conversation_history = [{"role": "system", "content": "You are a helpful assistant."}]
        # Append the user turn to a copy of the history for this request only.
        current_messages = conversation_history + [{"role": "user", "content": user_input}]

        headers = {"Content-Type": "application/json"}
        # Deterministic (greedy) sampling for reproducible comparisons.
        temperature, top_p, top_k = 0.0, 1.0, 1

        payload = {
            "model": self.model_name,
            "messages": current_messages,
            "temperature": temperature,
            "top_p": top_p,
            "top_k": top_k,
            "max_tokens": 8192,
            "stream": False,
            "logprobs": True,
            # BUG FIX: "extra_body" is an OpenAI *Python SDK* client-side
            # parameter that merges keys into the top-level JSON; sent as a
            # literal field over raw HTTP the server never saw it. vLLM
            # controls Qwen3 thinking via chat_template_kwargs.
            # NOTE(review): field name per vLLM docs — confirm against the
            # deployed server version.
            "chat_template_kwargs": {"enable_thinking": enable_thinking},
        }

        try:
            response = requests.post(f"{self.api_base_url}/chat/completions",
                                     headers=headers, json=payload)
            response.raise_for_status()  # surface HTTP errors as exceptions
            response_data = response.json()

            if not response_data.get("choices"):
                print("错误: 响应中未找到 choices。")
                return "", []

            full_assistant_content = response_data["choices"][0]["message"]["content"]

            # --- Extract logprobs ---
            # In vLLM's OpenAI-compatible response, the top-level "logprob"
            # of each entry is already the rank-1 logprob.
            logprobs_data = response_data["choices"][0].get("logprobs", {}).get("content", [])
            self.last_token_logprobs = logprobs_data  # keep raw entries for token alignment
            list_of_rank1_logprobs = [entry.get("logprob") for entry in logprobs_data]

            return full_assistant_content, list_of_rank1_logprobs

        except requests.exceptions.HTTPError as e:
            print(f"HTTP请求失败: {e}")
            print(f"响应内容: {e.response.text}")
            return "", []
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            return "", []
        except json.JSONDecodeError as e:
            print(f"JSON解析失败: {e} - 响应文本: {response.text[:200]}...")
            return "", []
        except Exception as e:
            # Last-resort boundary handler: log and return the failure value.
            print(f"发生未知错误: {e}")
            return "", []


# --- Example usage ---
if __name__ == "__main__":
    chatbot = Qwen3ChatClient()

    print("欢迎使用 Qwen3-30B-A3B 聊天客户端!")
    print(f"已连接到 vLLM 服务,使用模型: {MODEL_NAME}")
    print("--------------------------------------------------")

    # Ten hard-coded test questions.
    test_questions = [
        "介绍一下北京.",
        "写一首关于春天的五言绝句.",
        "请解释一下黑洞的形成原理.",
        "推荐三部值得一看的科幻电影,并简述理由.",
        "如何有效提高编程能力?",
        "给我讲一个关于人工智能的笑话.",
        "你认为未来教育会发展成什么样?",
        "如何制作一道美味的麻婆豆腐?",
        "量子计算的原理是什么?它有哪些潜在应用?",
        "请用英语介绍一下中国长城.",
    ]

    results_to_save = []

    for i, question in enumerate(test_questions):
        print(f"\n--- 问题 {i+1}: {question!r} ---")
        full_content, rank1_logprobs = chatbot.generate_response(question, enable_thinking=True)
        thinking_part, main_answer = chatbot._parse_response(full_content)

        print(f"完整回答 (包含思考): {full_content!r}")
        if thinking_part:
            print(f"【思考过程】: {thinking_part!r}")
        print(f"【主要回答】: {main_answer!r}")

        # BUG FIX: the original tag string had been stripped to '' (always a
        # substring) and the loop referenced the undefined name
        # response_data; use the restored tag and the client's stashed
        # per-token entries instead.
        thinking_end_tag = '</think>'
        logprobs_after_thinking = []
        if thinking_end_tag in full_content and rank1_logprobs:
            # Character offset just past the closing tag in the full text.
            end_char_idx = full_content.find(thinking_end_tag) + len(thinking_end_tag)
            current_decoded_length = 0
            raw_logprobs_data = chatbot.last_token_logprobs
            for j in range(len(rank1_logprobs)):
                if j < len(raw_logprobs_data):
                    token_text_from_api = raw_logprobs_data[j].get("token", "")
                    # Accumulate decoded characters until we pass the end of
                    # the thinking block, then keep the remaining logprobs.
                    # NOTE(review): len() on token text approximates detokenized
                    # length — adequate here, but not exact for all tokenizers.
                    current_decoded_length += len(token_text_from_api)
                    if current_decoded_length > end_char_idx:
                        logprobs_after_thinking = rank1_logprobs[j:]
                        break
            if not logprobs_after_thinking:
                print("Warning: Could not accurately find logprobs after tag. Using all logprobs.")
                logprobs_after_thinking = rank1_logprobs
        else:
            # No thinking block (or no logprobs): the whole sequence is the answer.
            logprobs_after_thinking = rank1_logprobs

        print("\n答案部分前10个Token的Rank 1 Logprobs:")
        for k, logprob_val in enumerate(logprobs_after_thinking[:10]):
            print(f"  Step {k}: {logprob_val:.4f}")

        results_to_save.append({
            "input": question,
            "output": main_answer,
            "logprobs_of_rank1_for_the_first_10_tokens": logprobs_after_thinking[:10],
        })
        print("--------------------------------------------------")

    output_filename_client_all_results = './Qwen3-30B-A3B-Instruct-2507_logprobs_K100AI_fp16.json'
    with open(output_filename_client_all_results, 'w', encoding='utf-8') as f:
        json.dump(results_to_save, f, indent=4, ensure_ascii=False)
    print(f"\n所有测试结果已保存到文件: {output_filename_client_all_results}")