import time
import os
import configparser
import argparse
from multiprocessing import Value
from aiohttp import web
import torch
from loguru import logger
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel

# Placeholder tags emitted by the model (e.g. <官网>) and the canned replies that
# replace them. NOTE: several keys are empty strings; those entries shadow one
# another and are never matched by the <...> pattern in chat()/chat_stream().
COMMON = {
    "<光合组织登记网址>": "https://www.hieco.com.cn/partner?from=timeline",
    "<官网>": "https://www.sugon.com/after_sale/policy?sh=1",
    "<平台联系方式>": "1、访问官网,根据您所在地地址联系平台人员,网址地址:https://www.sugon.com/about/contact;\n2、点击人工客服进行咨询;\n3、请您拨打中科曙光服务热线400-810-0466联系人工进行咨询。",
    "<购买与维修的咨询方法>": "1、确定付费处理,可以微信搜索'sugon中科曙光服务'小程序,选择'在线报修'业务\n2、先了解价格,可以微信搜索'sugon中科曙光服务'小程序,选择'其他咨询'业务\n3、请您拨打中科曙光服务热线400-810-0466",
    "<服务器续保流程>": "1、微信搜索'sugon中科曙光服务'小程序,选择'延保与登记'业务\n2、点击人工客服进行登记\n3、请您拨打中科曙光服务热线400-810-0466根据语音提示选择维保与购买",
    "": "【腾讯文档】XC内外网OS网盘链接:https://docs.qq.com/sheet/DTWtXbU1BZHJvWkJm",
    "": "W360-G30机器,安装Win7使用的镜像链接:https://pan.baidu.com/s/1SjHqCP6kJ9KzdJEBZDEynw;提取码:x6m4",
    "<麒麟系统搜狗输入法下载链接>": "软件下载链接(百度云盘):链接:https://pan.baidu.com/s/18Iluvs4BOAfFET0yFMBeLQ,提取码:bhkf",
    "": "链接: https://pan.baidu.com/s/1RkRGh4XY1T2oYftGnjLp4w;提取码: v2qi",
    "": "链接:https://pan.baidu.com/s/1euG9HGbPfrVbThEB8BX76g;提取码:o2ya",
    "": "链接:https://pan.baidu.com/s/17KDpm-Z9lp01WGp9sQaQ4w;提取码:0802",
    "": "链接:https://pan.baidu.com/s/1KQ-hxUIbTWNkc0xzrEQLjg;提取码:0802",
    "": "下载链接如下:http://10.2.68.104/tools/bytedance/eeprom/",
    "": "网盘下载:https://pan.baidu.com/s/1tZJIf_IeQLOWsvuOawhslQ?pwd=kgf1;提取码:kgf1",
    "<福昕阅读器补丁链接>": "补丁链接: https://pan.baidu.com/s/1QJQ1kHRplhhFly-vxJquFQ,提取码: aupx1",
    "": "硬盘链接: https://pan.baidu.com/s/1fDdGPH15mXiw0J-fMmLt6Q提取码: k97i",
    "": "云盘连接下载:链接:https://pan.baidu.com/s/1gaok13DvNddtkmk6Q-qLYg?pwd=xyhb提取码:xyhb",
}


def build_history_messages(prompt, history, system: str = None):
    '''Convert (question, answer) pairs plus the current prompt into chat messages.'''
    history_messages = []
    if system is not None and len(system) > 0:
        history_messages.append({'role': 'system', 'content': system})
    for item in history:
        history_messages.append({'role': 'user', 'content': item[0]})
        history_messages.append({'role': 'assistant', 'content': item[1]})
    history_messages.append({'role': 'user', 'content': prompt})
    return history_messages


class InferenceWrapper:

    def __init__(self, model_path: str, use_vllm: bool, stream_chat: bool, tensor_parallel_size: int):
        # NOTE: despite its name, use_vllm currently selects the fastllm backend;
        # the vllm code below is commented out.
        self.use_vllm = use_vllm
        self.stream_chat = stream_chat

        # huggingface
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        # self.model = AutoModelForCausalLM.from_pretrained(model_path,
        #                                                   trust_remote_code=True,
        #                                                   torch_dtype=torch.float16).cuda().eval()
        model = AutoModel.from_pretrained(model_path, trust_remote_code=True).half().cuda()
        self.model = model.eval()

        if self.use_vllm:
            ## vllm
            # from vllm import LLM, SamplingParams
            #
            # self.sampling_params = SamplingParams(temperature=1, top_p=0.95)
            # self.llm = LLM(model=model_path,
            #                trust_remote_code=True,
            #                enforce_eager=True,
            #                tensor_parallel_size=tensor_parallel_size)

            ## fastllm
            from fastllm_pytools import llm
            try:
                if self.stream_chat:
                    # streaming mode: load the fastllm model directly from model_path
                    self.model = llm.model(model_path)
                else:
                    # convert the HuggingFace model loaded above into a fastllm model
                    self.model = llm.from_hf(self.model, self.tokenizer, dtype="float16")
            except Exception as e:
                logger.error(f"fastllm initial failed, {e}")

    def chat(self, prompt: str, history=[]):
        '''Single-turn Q&A.'''
        import re
        output_text = ''
        try:
            if self.use_vllm:
                output_text = self.model.response(prompt)
            else:
                output_text, _ = self.model.chat(self.tokenizer, prompt, history, do_sample=False)
            # replace a placeholder tag such as <官网> with its canned reply
            matchObj = re.match('.*(<.*>).*', output_text)
            if matchObj:
                obj = matchObj.group(1)
                replace_str = COMMON.get(obj)
                if replace_str is not None:
                    output_text = output_text.replace(obj, replace_str)
                    logger.info(f"{obj} replaced with {replace_str}, after: {output_text}")
{output_text}") except Exception as e: logger.error(f"chat inference failed, {e}") return output_text def chat_stream(self, prompt: str, history=[]): '''流式服务''' import re if self.use_vllm: from fastllm_pytools import llm # Fastllm for response in self.model.stream_response(prompt, history=[]): matchObj = re.match('.*(<.*>).*', response) if matchObj: obj = matchObj.group(1) replace_str = COMMON.get(obj) response = response.replace(obj, replace_str) logger.info(f"{obj} be replaced {replace_str}, after {response}") yield response else: # HuggingFace current_length = 0 for response, _, past_key_values in self.model.stream_chat(self.tokenizer, prompt, history=history, past_key_values=None, return_past_key_values=True): output_text = response[current_length:] matchObj = re.match('.*(<.*>).*', output_text) if matchObj: obj = matchObj.group(1) replace_str = COMMON.get(obj) output_text = output_text.replace(obj, replace_str) logger.info(f"{obj} be replaced {replace_str}, after {output_text}") yield output_text current_length = len(response) class LLMInference: def __init__(self, model_path: str, tensor_parallel_size: int, device: str = 'cuda', use_vllm: bool = False, stream_chat: bool = False ) -> None: self.device = device self.inference = InferenceWrapper(model_path=model_path, use_vllm=use_vllm, stream_chat=stream_chat, tensor_parallel_size=tensor_parallel_size) def generate_response(self, prompt, history=[]): output_text = '' error = '' time_tokenizer = time.time() try: output_text = self.inference.chat(prompt, history) except Exception as e: error = str(e) logger.error(error) time_finish = time.time() logger.debug('output_text:{} \ntimecost {} '.format(output_text, time_finish - time_tokenizer)) return output_text, error def llm_inference(args): '''启动 Web 服务器,接收 HTTP 请求,并通过调用本地的 LLM 推理服务生成响应. ''' config = configparser.ConfigParser() config.read(args.config_path) bind_port = int(config['default']['bind_port']) model_path = config['llm']['local_llm_path'] use_vllm = config.getboolean('llm', 'use_vllm') inference_wrapper = InferenceWrapper(model_path, use_vllm=use_vllm, tensor_parallel_size=1, stream_chat=args.stream_chat) async def inference(request): start = time.time() input_json = await request.json() prompt = input_json['prompt'] history = input_json['history'] if args.stream_chat: text = inference_wrapper.stream_chat(prompt=prompt, history=history) else: text = inference_wrapper.chat(prompt=prompt, history=history) end = time.time() logger.debug('问题:{} 回答:{} \ntimecost {} '.format(prompt, text, end - start)) return web.json_response({'text': text}) app = web.Application() app.add_routes([web.post('/inference', inference)]) web.run_app(app, host='0.0.0.0', port=bind_port) def set_envs(dcu_ids): try: os.environ["CUDA_VISIBLE_DEVICES"] = dcu_ids logger.info(f"Set environment variable CUDA_VISIBLE_DEVICES to {dcu_ids}") except Exception as e: logger.error(f"{e}, but got {dcu_ids}") raise ValueError(f"{e}") def parse_args(): '''参数''' parser = argparse.ArgumentParser( description='Feature store for processing directories.') parser.add_argument( '--config_path', default='../config.ini', help='config目录') parser.add_argument( '--query', default=['请问下产品的服务器保修或保修政策?'], help='提问的问题.') parser.add_argument( '--DCU_ID', type=str, default='1', help='设置DCU卡号,卡号之间用英文逗号隔开,输入样例:"0,1,2"') parser.add_argument( '--stream_chat', action='store_true', help='启用流式对话方式') args = parser.parse_args() return args def main(): args = parse_args() set_envs(args.DCU_ID) llm_inference(args) if __name__ == '__main__': main()