import time import os import configparser import argparse from multiprocessing import Value from aiohttp import web import torch from loguru import logger from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel def check_envs(args): if all(isinstance(item, int) for item in args.DCU_ID): os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(map(str, args.DCU_ID)) logger.info(f"Set environment variable CUDA_VISIBLE_DEVICES to {args.gita}") else: logger.error(f"The --DCU_ID argument must be a list of integers, but got {args.DCU_ID}") raise ValueError("The --DCU_ID argument must be a list of integers") def build_history_messages(prompt, history, system: str = None): history_messages = [] if system is not None and len(system) > 0: history_messages.append({'role': 'system', 'content': system}) for item in history: history_messages.append({'role': 'user', 'content': item[0]}) history_messages.append({'role': 'assistant', 'content': item[1]}) history_messages.append({'role': 'user', 'content': prompt}) return history_messages class InferenceWrapper: def __init__(self, model_path: str, accelerate: bool, stream_chat: bool): self.accelerate = accelerate self.stream_chat = stream_chat # huggingface self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) self.model = AutoModelForCausalLM.from_pretrained(model_path, trust_remote_code=True, device_map='auto', torch_dtype=torch.bfloat16).eval() if self.accelerate: try: # fastllm from fastllm_pytools import llm if self.stream_chat: self.model = llm.model(model_path) else: self.model = llm.from_hf(self.model, self.tokenizer, dtype="float16").cuda() except Exception as e: logger.error(str(e)) def chat(self, prompt: str, history=[]): output_text = '' try: if self.accelerate: output_text = self.model.response(prompt) else: output_text, _ = self.model.chat(self.tokenizer, prompt, history, do_sample=False) except Exception as e: logger.error(str(e)) return output_text def chat_stream(self, prompt: str, history=[]): '''流式服务''' if self.accelerate: from fastllm_pytools import llm # Fastllm for response in self.model.stream_response(prompt, history=[]): yield response else: # HuggingFace current_length = 0 past_key_values = None for response, _, past_key_values in self.model.stream_chat(self.tokenizer, prompt, history=history, past_key_values=past_key_values, return_past_key_values=True): output_text = response[current_length:] yield output_text current_length = len(response) class LLMInference: def __init__(self, model_path: str, tensor_parallel_size: int, device: str = 'cuda', accelerate: bool = False ) -> None: self.device = device self.inference = InferenceWrapper(model_path, accelerate=accelerate, tensor_parallel_size=tensor_parallel_size) def generate_response(self, prompt, history=[]): output_text = '' error = '' time_tokenizer = time.time() try: output_text = self.inference.chat(prompt, history) except Exception as e: error = str(e) logger.error(error) time_finish = time.time() logger.debug('output_text:{} \ntimecost {} '.format(output_text, time_finish - time_tokenizer)) return output_text, error def infer_test(args): config = configparser.ConfigParser() config.read(args.config_path) model_path = config['llm']['local_llm_path'] accelerate = config.getboolean('llm', 'accelerate') inference_wrapper = InferenceWrapper(model_path, accelerate=accelerate, tensor_parallel_size=1) # prompt = "hello,please introduce yourself..." prompt = "你好,请介绍北京大学" history = [] time_first = time.time() output_text = inference_wrapper.chat(prompt, use_history=True, history=history) time_second = time.time() logger.debug('问题:{} 回答:{} \ntimecost {} '.format( prompt, output_text, time_second - time_first)) def llm_inference(args): """ 启动 Web 服务器,接收 HTTP 请求,并通过调用本地的 LLM 推理服务生成响应. """ config = configparser.ConfigParser() config.read(args.config_path) bind_port = int(config['default']['bind_port']) model_path = config['llm']['local_llm_path'] accelerate = config.getboolean('llm', 'accelerate') inference_wrapper = InferenceWrapper(model_path, accelerate=accelerate, stream_chat=args.stream_chat) async def inference(request): start = time.time() input_json = await request.json() prompt = input_json['prompt'] history = input_json['history'] if args.stream_chat: text = inference_wrapper.stream_chat(prompt=prompt, history=history) else: text = inference_wrapper.chat(prompt=prompt, history=history) end = time.time() logger.debug('问题:{} 回答:{} \ntimecost {} '.format(prompt, text, end - start)) return web.json_response({'text': text}) app = web.Application() app.add_routes([web.post('/inference', inference)]) web.run_app(app, host='0.0.0.0', port=bind_port) def parse_args(): '''参数''' parser = argparse.ArgumentParser( description='Feature store for processing directories.') parser.add_argument( '--config_path', default='/home/zhangwq/project/shu_new/ai/config.ini', help='config目录') parser.add_argument( '--query', default=['请问下产品的服务器保修或保修政策?'], help='提问的问题.') parser.add_argument( '--DCU_ID', default=[0], help='设置DCU') parser.add_argument( '--stream_chat', action='store_true', help='启用流式对话方式') args = parser.parse_args() return args def main(): args = parse_args() check_envs(args) #infer_test(args) llm_inference(args) if __name__ == '__main__': main()