Commit 74767f88 authored by Rayyyyy

First add

parent c8ce1000
# chat_demo

技服智能问答服务 (an intelligent Q&A service for technical support)

## Environment Setup

### Docker (Option 1)

Adjust the `-v` mount paths, the container `docker_name`, and the `imageID` in the commands below to match your environment.
```bash
docker pull image.sourcefind.cn:5000/dcu/admin/base/pytorch:2.1.0-centos7.6-dtk23.10.1-py38
docker run -it -v /path/your_code_data/:/path/your_code_data/ --shm-size=80G --privileged=true --device=/dev/kfd --device=/dev/dri/ --group-add video --name docker_name imageID bash
# Load the runtime environment variables
source /opt/dtk/cuda/env.sh
# Clone the fastllm repository
git clone http://developer.hpccube.com/codes/OpenDAS/fastllm.git
# Build fastllm
cd fastllm
mkdir build
cd build
cmake ..
make -j
# After the build finishes, install the lightweight Python tool package with:
cd tools  # now in the fastllm/build/tools directory
python setup.py install
pip install transformers==4.28.0 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
pip install accelerate sentencepiece mdtex2html gradio rouge_chinese nltk jieba datasets protobuf peft==0.5.0 pydantic==1.10.9 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
```
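Before building anything inside the container, it can be worth confirming that the DCU devices are actually visible. A minimal check, assuming the `hy-smi` utility shipped with the DTK stack is on the PATH (the serving code in this repository shells out to the same tool when auto-selecting devices):
```bash
# List the DCU devices together with their memory and utilization inside the container
hy-smi
```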
### Dockerfile (Option 2)
```bash
docker build -t chatglm2:latest .
docker run -dit --network=host --name=chatglm2 --privileged --device=/dev/kfd --device=/dev/dri --ipc=host --shm-size=16G --group-add video --cap-add=SYS_PTRACE --security-opt seccomp=unconfined -u root --ulimit stack=-1:-1 --ulimit memlock=-1:-1 chatglm2:latest
docker exec -it chatglm2 /bin/bash
```
### Conda (Option 3)
1. Create a conda virtual environment:
```bash
conda create -n chatglm python=3.8
```
2. The toolkits and deep-learning libraries this project requires for DCU cards can all be downloaded from the [光合](https://developer.hpccube.com/tool/) developer community:
- [DTK 23.04](https://cancon.hpccube.com:65024/1/main/DTK-23.04.1)
- [Pytorch 1.13.1](https://cancon.hpccube.com:65024/4/main/pytorch/dtk23.04)
- [Deepspeed 0.9.2](https://cancon.hpccube.com:65024/4/main/deepspeed/dtk23.04)
Tips: the DTK driver, Python, and DeepSpeed versions above (and related tools) must strictly correspond to one another.
3. Install the remaining dependencies from requirements.txt:
```bash
pip install -r requirements.txt
```
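Once the retrieval database has been built and the services are running, the assistant is reachable over HTTP on the `bind_port` from `config.ini` (8888 by default). A minimal smoke test, assuming the default port and the `/work` and `/upload` routes registered by the serving code in this repository:
```bash
# Ask a question; the JSON response contains 'reply' and 'references'
curl -X POST http://127.0.0.1:8888/work \
     -H 'Content-Type: application/json' \
     -d '{"query": "请问DCU中如何查看显卡显存等信息?"}'

# Upload a document so it is parsed and merged into the retrieval database
curl -X POST http://127.0.0.1:8888/upload -F 'file=@/path/to/your/document.pdf'
```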
from .llm_service import ChatAgent  # noqa: F401
from .llm_service import ErrorCode  # noqa: F401
from .llm_service import FeatureDataBase  # noqa: F401
from .llm_service import LLMInference  # noqa: F401
from .llm_service import llm_inference  # noqa: F401
from .llm_service import Worker  # noqa: F401
from .llm_service import DocumentName  # noqa: F401
from .llm_service import DocumentProcessor  # noqa: F401
from .llm_service import rag_retrieve  # noqa: F401
[default]
work_dir = /path/to/your/ai/work_dir
bind_port = 8888
mem_threshold = 50
dcu_threshold = 100
[feature_database]
reject_throttle = 0.6165309870679363
embedding_model_path = /path/to/your/text2vec-large-chinese
reranker_model_path = /path/to/your/bce-reranker-base_v1
[llm]
local_llm_path = /path/to/your/internlm-chat-7b
accelerate = False
import matplotlib.pyplot as plt
precision = [0.5, 0.50684932, 0.51388889, 0.52112676, 0.52857143, 0.53623188,
0.54411765, 0.55223881, 0.56060606, 0.56923077, 0.578125, 0.58730159,
0.59677419, 0.60655738, 0.61666667, 0.62711864, 0.63793103, 0.64912281,
0.66071429, 0.67272727, 0.68518519, 0.69811321, 0.71153846, 0.7254902,
0.74, 0.75510204, 0.77083333, 0.78723404, 0.80434783, 0.82222222,
0.84090909, 0.86046512, 0.88095238, 0.87804878, 0.9 , 0.92307692,
0.94736842, 0.94594595, 0.94444444, 0.94285714, 0.94117647, 0.93939394,
0.9375, 0.93548387, 0.96666667, 0.96551724, 0.96428571, 0.96296296,
0.96153846, 0.96, 0.95833333, 0.95652174, 0.95454545, 0.95238095,
0.95, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 1.0 ]
recall =[1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 0.97297297, 0.97297297, 0.97297297,
0.97297297, 0.94594595, 0.91891892, 0.89189189, 0.86486486, 0.83783784,
0.81081081, 0.78378378, 0.78378378, 0.75675676, 0.72972973, 0.7027027,
0.67567568, 0.64864865, 0.62162162, 0.59459459, 0.56756757, 0.54054054,
0.51351351, 0.51351351, 0.48648649, 0.45945946, 0.43243243, 0.40540541,
0.37837838, 0.35135135, 0.32432432, 0.2972973, 0.27027027, 0.24324324,
0.21621622, 0.18918919, 0.16216216, 0.13513514, 0.10810811, 0.08108108,
0.05405405, 0]
thresholds = [0.05392042, 0.05747761, 0.08795847, 0.09711715, 0.10836032, 0.1201096,
0.12746203 ,0.14044166 ,0.14357233, 0.1931704, 0.19408794, 0.22400858,
0.22726139 ,0.23303272 ,0.25124016, 0.26014023, 0.26981908, 0.27072288,
0.27109875 ,0.28445365 ,0.28548161, 0.28880253, 0.30126451, 0.30273324,
0.3068595 ,0.31269419 ,0.33400304, 0.35578062, 0.37101008, 0.38200717,
0.38539771 ,0.38972295 ,0.39294773, 0.40920107, 0.4121291, 0.4369997,
0.44255718 ,0.44256915 ,0.46214362, 0.46447274, 0.46686736, 0.46977465,
0.47523671 ,0.47777227 ,0.48489664, 0.48920359, 0.52904117, 0.53225221,
0.53416311 ,0.54550706 ,0.55801574, 0.57259243, 0.57398918, 0.5908601,
0.60177331 ,0.60481593 ,0.61276247 ,0.62152893, 0.62966217, 0.64346738,
0.64541498 ,0.64878433 ,0.65476345, 0.65665078, 0.67432673, 0.67492593,
0.68424871 ,0.68872306 ,0.70297847 ,0.70474923,0.72355252, 0.7291418,
0.79367824, 1.0]
print(len(precision), len(recall), len(thresholds))
plt.figure(figsize=(10, 8))
plt.plot(recall, precision, label='Precision-Recall curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='best')
plt.grid(True)
# Show the figure
plt.show()
from vllm import LLM, SamplingParams
import os
os.environ["CUDA_VISIBLE_DEVICES"] = '7'
prompt = ''
model_path = ''
sampling_params = SamplingParams(temperature=1, top_p=0.95)
llm = LLM(model=model_path,
trust_remote_code=True,
enforce_eager=True,
tensor_parallel_size=1)
outputs = llm.generate(prompt, sampling_params)
for output in outputs:
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}")
from .feature_database import FeatureDataBase, DocumentProcessor, DocumentName  # noqa: F401
from .helper import TaskCode, ErrorCode, LogManager  # noqa: F401
from .inferencer import LLMInference, InferenceWrapper  # noqa: F401
from .retriever import CacheRetriever, Retriever, rag_retrieve  # noqa: F401
from .worker import Worker, ChatAgent  # noqa: F401
import argparse
import fitz
import re
import os
import time
import pandas as pd
import hashlib
import textract
import shutil
import configparser
import json
from multiprocessing import Pool
from typing import List
from loguru import logger
from BCEmbedding.tools.langchain import BCERerank
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from torch.cuda import empty_cache
from bs4 import BeautifulSoup
from .retriever import CacheRetriever, Retriever
def check_envs(args):
if all(isinstance(item, int) for item in args.DCU_ID):
os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(map(str, args.DCU_ID))
logger.info(f"Set environment variable CUDA_VISIBLE_DEVICES to {args.DCU_ID}")
else:
logger.error(f"The --DCU_ID argument must be a list of integers, but got {args.DCU_ID}")
raise ValueError("The --DCU_ID argument must be a list of integers")
class DocumentName:
def __init__(self, directory: str, name: str, category: str):
self.directory = directory
self.prefix = name.replace('/', '_')
self.basename = os.path.basename(name)
self.origin_path = os.path.join(directory, name)
self.copy_path = ''
self._category = category
self.status = True
self.message = ''
def __str__(self):
return '{},{},{},{}\n'.format(self.basename, self.copy_path, self.status,
self.message)
class DocumentProcessor:
def __init__(self):
self.image_suffix = ['.jpg', '.jpeg', '.png', '.bmp']
self.md_suffix = '.md'
self.text_suffix = ['.txt', '.text']
self.excel_suffix = ['.xlsx', '.xls', '.csv']
self.pdf_suffix = '.pdf'
self.ppt_suffix = '.pptx'
self.html_suffix = ['.html', '.htm', '.shtml', '.xhtml']
self.word_suffix = ['.docx', '.doc']
self.json_suffix = '.json'
def md5(self, filepath: str):
hash_object = hashlib.sha256()
with open(filepath, 'rb') as file:
chunk_size = 8192
while chunk := file.read(chunk_size):
hash_object.update(chunk)
return hash_object.hexdigest()[0:8]
def summarize(self, files: list):
success = 0
skip = 0
failed = 0
for file in files:
if file.status:
success += 1
elif file.message == 'skip':
skip += 1
else:
logger.info('{}文件异常, 异常信息: {} '.format(file.origin_path, file.message))
failed += 1
logger.info('解析{}文件,成功{}个,跳过{}个,异常{}个'.format(len(files), success,
skip, failed))
def read_file_type(self, filepath: str):
filepath = filepath.lower()
if filepath.endswith(self.pdf_suffix):
return 'pdf'
if filepath.endswith(self.md_suffix):
return 'md'
if filepath.endswith(self.ppt_suffix):
return 'ppt'
if filepath.endswith(self.json_suffix):
return 'json'
for suffix in self.image_suffix:
if filepath.endswith(suffix):
return 'image'
for suffix in self.text_suffix:
if filepath.endswith(suffix):
return 'text'
for suffix in self.word_suffix:
if filepath.endswith(suffix):
return 'word'
for suffix in self.excel_suffix:
if filepath.endswith(suffix):
return 'excel'
for suffix in self.html_suffix:
if filepath.endswith(suffix):
return 'html'
return None
def scan_directory(self, repo_dir: str):
documents = []
for directory, _, names in os.walk(repo_dir):
for name in names:
category = self.read_file_type(name)
if category is not None:
documents.append(
DocumentName(directory=directory, name=name, category=category))
return documents
def read(self, filepath: str):
file_type = self.read_file_type(filepath)
text = ''
if not os.path.exists(filepath):
return text
try:
if file_type == 'md' or file_type == 'text':
text = []
with open(filepath) as f:
txt = f.read()
cleaned_txt = re.sub(r'\n\s*\n', '\n\n', txt)
text.append(cleaned_txt)
elif file_type == 'pdf':
text += self.read_pdf(filepath)
text = re.sub(r'\n\s*\n', '\n\n', text)
elif file_type == 'excel':
text = []
df = pd.read_excel(filepath, header=None)
for row in df.index.values:
doc = dict()
doc['Que'] = df.iloc[row, 0]
doc['Ans'] = df.iloc[row, 1]
text.append(str(doc))
# text += self.read_excel(filepath)
elif file_type == 'word' or file_type == 'ppt':
# https://stackoverflow.com/questions/36001482/read-doc-file-with-python
# https://textract.readthedocs.io/en/latest/installation.html
text = textract.process(filepath).decode('utf8')
text = re.sub(r'\n\s*\n', '\n\n', text)
if file_type == 'ppt':
text = text.replace('\n', ' ')
elif file_type == 'html':
with open(filepath) as f:
soup = BeautifulSoup(f.read(), 'html.parser')
text += soup.text
            elif file_type == 'json':
                # Open the JSON file and read all of its lines
                with open(filepath, 'r', encoding='utf-8') as file:
                    text = file.readlines()
except Exception as e:
logger.error((filepath, str(e)))
return '', e
return text, None
def read_pdf(self, filepath: str):
# load pdf and serialize table
text = ''
with fitz.open(filepath) as pages:
for page in pages:
text += page.get_text()
tables = page.find_tables()
for table in tables:
tablename = '_'.join(
filter(lambda x: x is not None and 'Col' not in x,
table.header.names))
pan = table.to_pandas()
json_text = pan.dropna(axis=1).to_json(force_ascii=False)
text += tablename
text += '\n'
text += json_text
text += '\n'
return text
def read_and_save(file: DocumentName, file_opr: DocumentProcessor):
try:
if os.path.exists(file.copy_path):
# already exists, return
logger.info('{} already processed, output file: {}, skip load'
.format(file.origin_path, file.copy_path))
return
logger.info('reading {}, would save to {}'.format(file.origin_path,
file.copy_path))
content, error = file_opr.read(file.origin_path)
if error is not None:
logger.error('{} load error: {}'.format(file.origin_path, str(error)))
return
if content is None or len(content) < 1:
logger.warning('{} empty, skip save'.format(file.origin_path))
return
cleaned_content = re.sub(r'\n\s*\n', '\n\n', content)
with open(file.copy_path, 'w') as f:
f.write(os.path.splitext(file.basename)[0] + '\n')
f.write(cleaned_content)
except Exception as e:
logger.error(f"Error in read_and_save: {e}")
class FeatureDataBase:
def __init__(self,
embeddings: HuggingFaceEmbeddings,
reranker: BCERerank,
reject_throttle: float) -> None:
# logger.debug('loading text2vec model..')
self.embeddings = embeddings
self.reranker = reranker
self.compression_retriever = None
self.rejecter = None
self.retriever = None
self.reject_throttle = reject_throttle if reject_throttle else -1
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=768, chunk_overlap=32)
def get_documents(self, text, file):
# if len(text) <= 1:
# return []
chunks = self.text_splitter.create_documents(text)
documents = []
for chunk in chunks:
# `source` is for return references
# `read` is for LLM response
chunk.metadata = {'source': file.basename, 'read': file.origin_path}
documents.append(chunk)
return documents
def register_response(self, files: list, work_dir: str, file_opr: DocumentProcessor):
feature_dir = os.path.join(work_dir, 'db_response')
if not os.path.exists(feature_dir):
os.makedirs(feature_dir)
documents = []
for i, file in enumerate(files):
if not file.status:
continue
            # Read each file
text, error = file_opr.read(file.copy_path)
if error is not None:
file.status = False
file.message = str(error)
continue
file.message = str(text[0])
# file.message = str(len(text))
# logger.info('{} content length {}'.format(
# file._category, len(text)))
document = self.get_documents(text, file)
documents += document
logger.debug('Positive pipeline {}/{}.. register 《{}》 and split {} documents'
.format(i + 1, len(files), file.basename, len(document)))
logger.debug('Positive pipeline register {} documents into database...'.format(len(documents)))
time_before_register = time.time()
vs = FAISS.from_documents(documents, self.embeddings)
vs.save_local(feature_dir)
time_after_register = time.time()
logger.debug('Positive pipeline take time: {} '.format(time_after_register - time_before_register))
def register_reject(self, files: list, work_dir: str, file_opr: DocumentProcessor):
feature_dir = os.path.join(work_dir, 'db_reject')
if not os.path.exists(feature_dir):
os.makedirs(feature_dir)
documents = []
for i, file in enumerate(files):
            if not file.status:
continue
            text, error = file_opr.read(file.copy_path)
if len(text) < 1:
continue
if error is not None:
continue
document = self.get_documents(text, file)
documents += document
logger.debug('Negative pipeline {}/{}.. register 《{}》 and split {} documents'
.format(i + 1, len(files), file.basename, len(document)))
if len(documents) < 1:
return
logger.debug('Negative pipeline register {} documents into database...'.format(len(documents)))
time_before_register = time.time()
vs = FAISS.from_documents(documents, self.embeddings)
vs.save_local(feature_dir)
time_after_register = time.time()
logger.debug('Negative pipeline take time: {} '.format(time_after_register - time_before_register))
def preprocess(self, files: list, work_dir: str, file_opr: DocumentProcessor):
preproc_dir = os.path.join(work_dir, 'preprocess')
if not os.path.exists(preproc_dir):
os.makedirs(preproc_dir)
pool = Pool(processes=16)
for idx, file in enumerate(files):
if not os.path.exists(file.origin_path):
file.status = False
file.message = 'skip not exist'
continue
if file._category == 'image':
file.status = False
file.message = 'skip image'
elif file._category in ['pdf', 'word', 'ppt', 'html']:
# read pdf/word/excel file and save to text format
md5 = file_opr.md5(file.origin_path)
file.copy_path = os.path.join(preproc_dir,
'{}.text'.format(md5))
pool.apply_async(read_and_save, args=(file, file_opr))
elif file._category in ['md', 'text']:
# rename text files to new dir
file.copy_path = os.path.join(
preproc_dir,
file.origin_path.replace('/', '_')[-84:])
try:
shutil.copy(file.origin_path, file.copy_path)
file.status = True
file.message = 'preprocessed'
except Exception as e:
file.status = False
file.message = str(e)
elif file._category in ['json', 'excel']:
file.status = True
file.copy_path = file.origin_path
file.message = 'preprocessed'
else:
file.status = False
file.message = 'skip unknown format'
pool.close()
logger.debug('waiting for preprocess read finish..')
pool.join()
# check process result
for file in files:
            if file._category in ['pdf', 'word', 'ppt', 'html']:
if os.path.exists(file.copy_path):
file.status = True
file.message = 'preprocessed'
else:
file.status = False
file.message = 'read error'
def initialize(self, files: list, work_dir: str, file_opr: DocumentProcessor):
self.preprocess(files=files, work_dir=work_dir, file_opr=file_opr)
self.register_response(files=files, work_dir=work_dir, file_opr=file_opr)
# self.register_reject(files=files, work_dir=work_dir, file_opr=file_opr)
def merge_db_response(self, faiss: FAISS, files: list, work_dir: str, file_opr: DocumentProcessor):
feature_dir = os.path.join(work_dir, 'db_response')
if not os.path.exists(feature_dir):
os.makedirs(feature_dir)
documents = []
for i, file in enumerate(files):
logger.debug('{}/{}.. register 《{}》 into database...'.format(i + 1, len(files), file.basename))
if not file.status:
continue
            # Read each file
text, error = file_opr.read(file.copy_path)
if error is not None:
file.status = False
file.message = str(error)
continue
file.message = str(text[0])
# file.message = str(len(text))
# logger.info('{} content length {}'.format(
# file._category, len(text)))
documents += self.get_documents(text, file)
vs = FAISS.from_documents(documents, self.embeddings)
faiss.merge_from(vs)
faiss.save_local(feature_dir)
def test_reject(retriever: Retriever):
"""Simple test reject pipeline."""
real_questions = [
'姚明是谁?',
'CBBA是啥?',
'差多少嘞?',
'cnn 的全称是什么?',
'transformer啥意思?',
'成都有什么好吃的推荐?',
'树博士是什么?',
'白马非马啥意思?',
'mmpose 如何安装?',
'今天天气如何?',
'写一首五言律诗?',
'先有鸡还是先有蛋?',
'如何在Gromacs中进行蛋白质的动态模拟?',
'wy-vSphere 7 海光平台兼容补丁?',
'在Linux系统中,如何进行源码包的安装?'
]
for example in real_questions:
relative, _ = retriever.is_relative(example)
if relative:
logger.warning(f'process query: {example}')
retriever.query(example)
empty_cache()
else:
logger.error(f'reject query: {example}')
empty_cache()
def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description='Feature store for processing directories.')
parser.add_argument('--work_dir',
type=str,
default='/ai/work_dir',
help='自定义.')
parser.add_argument(
'--repo_dir',
type=str,
default='',
help='需要读取的文件目录.')
parser.add_argument(
'--config_path',
default='/ai/config.ini',
help='config目录')
parser.add_argument(
'--DCU_ID',
default=[4],
help='设置DCU')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
log_file_path = os.path.join(args.work_dir, 'application.log')
logger.add(log_file_path, rotation='10MB', compression='zip')
check_envs(args)
config = configparser.ConfigParser()
config.read(args.config_path)
embedding_model_path = config['feature_database']['embedding_model_path']
reranker_model_path = config['feature_database']['reranker_model_path']
reject_throttle = float(config['feature_database']['reject_throttle'])
cache = CacheRetriever(embedding_model_path=embedding_model_path,
reranker_model_path=reranker_model_path)
fs_init = FeatureDataBase(embeddings=cache.embeddings,
reranker=cache.reranker,
reject_throttle=reject_throttle)
# walk all files in repo dir
file_opr = DocumentProcessor()
files = file_opr.scan_directory(repo_dir=args.repo_dir)
fs_init.initialize(files=files, work_dir=args.work_dir, file_opr=file_opr)
file_opr.summarize(files)
del fs_init
retriever = cache.get(reject_throttle=reject_throttle,
work_dir=args.work_dir)
# with open(os.path.join(args.work_dir, 'sample', 'positive.json')) as f:
# positive_sample = json.load(f)
# with open(os.path.join(args.work_dir, 'sample', 'negative.json')) as f:
# negative_sample = json.load(f)
with open(os.path.join(args.work_dir, 'sample', 'positive.txt'), 'r', encoding='utf-8') as file:
positive_sample = []
for line in file:
positive_sample.append(line.strip())
with open(os.path.join(args.work_dir, 'sample', 'negative.txt'), 'r', encoding='utf-8') as file:
negative_sample = []
for line in file:
negative_sample.append(line.strip())
reject_throttle = retriever.update_throttle(work_dir=args.work_dir,
config_path=args.config_path,
positive_sample=positive_sample,
negative_sample=negative_sample)
cache.pop('default')
# test
retriever = cache.get(reject_throttle=reject_throttle,
work_dir=args.work_dir)
test_reject(retriever)
from enum import Enum
class TaskCode(Enum):
FS_ADD_DOC = 'add_doc'
FS_UPDATE_SAMPLE = 'update_sample'
FS_UPDATE_PIPELINE = 'update_pipeline'
CHAT = 'chat'
CHAT_RESPONSE = 'chat_response'
class ErrorCode(Enum):
SUCCESS = 0, 'success'
NOT_A_QUESTION = 1, 'query is not a question'
NO_TOPIC = 2, 'The question does not have a topic. It might be a meaningless sentence.' # noqa E501
    UNRELATED = 3, 'Topics unrelated to the knowledge base. Revising the question can improve accuracy.' # noqa E501
NO_SEARCH_KEYWORDS = 4, 'Cannot extract keywords.'
NO_SEARCH_RESULT = 5, 'Cannot retrieve results.'
BAD_ANSWER = 6, 'Irrelevant answer.'
SECURITY = 7, 'Reply has a high relevance to prohibited topics.'
NOT_WORK_TIME = 8, 'Non-working hours. The config.ini file can be modified to adjust this. **In scenarios where speech may pose risks, let the robot operate under human supervision**' # noqa E501
    PARAMETER_ERROR = 9, "HTTP interface parameter error. Query cannot be empty; history must be a list of pairs, like [['question1', 'reply1'], ['question2', 'reply2']]" # noqa E501
PARAMETER_MISS = 10, 'Missing key in http json input parameters.'
WORK_IN_PROGRESS = 11, 'not finish'
FAILED = 12, 'fail'
BAD_PARAMETER = 13, 'bad parameter'
INTERNAL_ERROR = 14, 'internal error'
SEARCH_FAIL = 15, 'Search fail, please check TOKEN and quota'
NOT_FIND_RELATED_DOCS = 16, 'No relevant documents found, the following answer is generated directly by LLM.'
NON_COMPLIANCE_QUESTION = 17, 'Non-compliance question, refusing to answer.'
def __new__(cls, value, description):
"""Create new instance of ErrorCode."""
obj = object.__new__(cls)
obj._value_ = value
obj.description = description
return obj
def __int__(self):
return self.value
def describe(self):
return self.description
@classmethod
def format(cls, code):
if isinstance(code, cls):
return {'code': int(code), 'message': code.describe()}
raise TypeError(f'Expected type {cls}, got {type(code)}')
class LogManager:
def __init__(self, log_path):
self.log_path = log_path
self.log_content_list = []
def log(self, operation, outcome=''):
self.log_content_list.append((operation, outcome))
def __del__(self):
try:
with open(self.log_path, 'a', encoding='utf8') as file:
for operation, outcome in self.log_content_list:
file.write(f'{operation}: {outcome}\n')
file.write('\n')
except Exception as e:
print(e)
import time
import os
import configparser
import argparse
from multiprocessing import Value
from aiohttp import web
import torch
from loguru import logger
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel
def check_envs(args):
if all(isinstance(item, int) for item in args.DCU_ID):
os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(map(str, args.DCU_ID))
logger.info(f"Set environment variable CUDA_VISIBLE_DEVICES to {args.gita}")
else:
logger.error(f"The --DCU_ID argument must be a list of integers, but got {args.DCU_ID}")
raise ValueError("The --DCU_ID argument must be a list of integers")
def build_history_messages(prompt, history, system: str = None):
history_messages = []
if system is not None and len(system) > 0:
history_messages.append({'role': 'system', 'content': system})
for item in history:
history_messages.append({'role': 'user', 'content': item[0]})
history_messages.append({'role': 'assistant', 'content': item[1]})
history_messages.append({'role': 'user', 'content': prompt})
return history_messages
class InferenceWrapper:
def __init__(self, model_path: str, accelerate: bool, stream_chat: bool):
self.accelerate = accelerate
self.stream_chat = stream_chat
# huggingface
self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
self.model = AutoModelForCausalLM.from_pretrained(model_path,
trust_remote_code=True,
device_map='auto',
torch_dtype=torch.bfloat16).eval()
if self.accelerate:
try:
# fastllm
from fastllm_pytools import llm
if self.stream_chat:
self.model = llm.model(model_path)
else:
self.model = llm.from_hf(self.model, self.tokenizer, dtype="float16").cuda()
except Exception as e:
logger.error(str(e))
def chat(self, prompt: str, history=[]):
output_text = ''
try:
if self.accelerate:
output_text = self.model.response(prompt)
else:
output_text, _ = self.model.chat(self.tokenizer,
prompt,
history,
do_sample=False)
except Exception as e:
logger.error(str(e))
return output_text
def chat_stream(self, prompt: str, history=[]):
        '''Streaming chat service.'''
if self.accelerate:
from fastllm_pytools import llm
# Fastllm
for response in self.model.stream_response(prompt, history=[]):
yield response
else:
# HuggingFace
current_length = 0
past_key_values = None
for response, _, past_key_values in self.model.stream_chat(self.tokenizer, prompt, history=history,
past_key_values=past_key_values,
return_past_key_values=True):
output_text = response[current_length:]
yield output_text
current_length = len(response)
class LLMInference:
def __init__(self,
model_path: str,
tensor_parallel_size: int,
device: str = 'cuda',
accelerate: bool = False
) -> None:
self.device = device
        # InferenceWrapper takes (model_path, accelerate, stream_chat); it does not accept tensor_parallel_size
        self.inference = InferenceWrapper(model_path,
                                          accelerate=accelerate,
                                          stream_chat=False)
def generate_response(self, prompt, history=[]):
output_text = ''
error = ''
time_tokenizer = time.time()
try:
output_text = self.inference.chat(prompt, history)
except Exception as e:
error = str(e)
logger.error(error)
time_finish = time.time()
logger.debug('output_text:{} \ntimecost {} '.format(output_text,
time_finish - time_tokenizer))
return output_text, error
def infer_test(args):
config = configparser.ConfigParser()
config.read(args.config_path)
model_path = config['llm']['local_llm_path']
accelerate = config.getboolean('llm', 'accelerate')
    inference_wrapper = InferenceWrapper(model_path,
                                         accelerate=accelerate,
                                         stream_chat=False)
# prompt = "hello,please introduce yourself..."
prompt = "你好,请介绍北京大学"
history = []
time_first = time.time()
    output_text = inference_wrapper.chat(prompt, history=history)
time_second = time.time()
logger.debug('问题:{} 回答:{} \ntimecost {} '.format(
prompt, output_text, time_second - time_first))
def llm_inference(args):
"""
    Start a web server that receives HTTP requests and generates responses by calling the local LLM inference service.
"""
config = configparser.ConfigParser()
config.read(args.config_path)
bind_port = int(config['default']['bind_port'])
model_path = config['llm']['local_llm_path']
accelerate = config.getboolean('llm', 'accelerate')
inference_wrapper = InferenceWrapper(model_path,
accelerate=accelerate,
stream_chat=args.stream_chat)
async def inference(request):
start = time.time()
input_json = await request.json()
prompt = input_json['prompt']
history = input_json['history']
        if args.stream_chat:
            # chat_stream yields incremental text; join the pieces into one string for the JSON reply
            text = ''.join(inference_wrapper.chat_stream(prompt=prompt, history=history))
        else:
            text = inference_wrapper.chat(prompt=prompt, history=history)
end = time.time()
logger.debug('问题:{} 回答:{} \ntimecost {} '.format(prompt, text, end - start))
return web.json_response({'text': text})
app = web.Application()
app.add_routes([web.post('/inference', inference)])
web.run_app(app, host='0.0.0.0', port=bind_port)
def parse_args():
    '''Parse command-line arguments.'''
parser = argparse.ArgumentParser(
description='Feature store for processing directories.')
parser.add_argument(
'--config_path',
default='/home/zhangwq/project/shu_new/ai/config.ini',
help='config目录')
parser.add_argument(
'--query',
default=['请问下产品的服务器保修或保修政策?'],
help='提问的问题.')
parser.add_argument(
'--DCU_ID',
default=[0],
help='设置DCU')
parser.add_argument(
'--stream_chat',
action='store_true',
help='启用流式对话方式')
args = parser.parse_args()
return args
def main():
args = parse_args()
check_envs(args)
#infer_test(args)
llm_inference(args)
if __name__ == '__main__':
main()
import os
import argparse
import time
import configparser
import numpy as np
from aiohttp import web
from multiprocessing import Value
from torch.cuda import empty_cache
from BCEmbedding.tools.langchain import BCERerank
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.retrievers import ContextualCompressionRetriever
from langchain_community.vectorstores import FAISS
from langchain_community.vectorstores.utils import DistanceStrategy
from sklearn.metrics import precision_recall_curve
from loguru import logger
def check_envs(args):
if all(isinstance(item, int) for item in args.DCU_ID):
os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(map(str, args.DCU_ID))
logger.info(f"Set environment variable CUDA_VISIBLE_DEVICES to {args.DCU_ID}")
else:
logger.error(f"The --DCU_ID argument must be a list of integers, but got {args.DCU_ID}")
raise ValueError("The --DCU_ID argument must be a list of integers")
class Retriever:
def __init__(self, embeddings, reranker, work_dir: str, reject_throttle: float) -> None:
self.reject_throttle = reject_throttle
self.rejecter = None
self.retriever = None
self.compression_retriever = None
self.embeddings = embeddings
self.reranker = reranker
self.rejecter = FAISS.load_local(
os.path.join(work_dir, 'db_response'),
embeddings=embeddings,
allow_dangerous_deserialization=True)
self.vector_store = FAISS.load_local(
os.path.join(work_dir, 'db_response'),
embeddings=embeddings,
allow_dangerous_deserialization=True,
distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT)
self.retriever = self.vector_store.as_retriever(
search_type='similarity',
search_kwargs={
'score_threshold': 0.4,
'k': 30
}
)
# retriever = self.vector_store.as_retriever(
# search_type='similarity',
# search_kwargs={
# 'score_threshold': 0.15,
# 'k': 30
# }
# )
self.compression_retriever = ContextualCompressionRetriever(
base_compressor=reranker, base_retriever=self.retriever)
if self.rejecter is None:
logger.warning('rejecter is None')
if self.retriever is None:
logger.warning('retriever is None')
def is_relative(self, sample, k=30, disable_throttle=False):
"""If no search results below the threshold can be found from the
database, reject this query."""
if self.rejecter is None:
return False, []
if disable_throttle:
# for searching throttle during update sample
docs_with_score = self.rejecter.similarity_search_with_relevance_scores(
sample, k=1)
if len(docs_with_score) < 1:
return False, docs_with_score
return True, docs_with_score
else:
# for retrieve result
# if no chunk passed the throttle, give the max
docs_with_score = self.rejecter.similarity_search_with_relevance_scores(
sample, k=k)
ret = []
max_score = -1
top1 = None
for (doc, score) in docs_with_score:
if score >= self.reject_throttle:
ret.append(doc)
if score > max_score:
max_score = score
top1 = (doc, score)
relative = True if len(ret) > 0 else False
return relative, [top1]
def update_throttle(self,
work_dir: str,
config_path: str = 'config.ini',
positive_sample=[],
negative_sample=[]):
        """Update reject throttle based on positive and negative examples."""
        import matplotlib.pyplot as plt
        if len(positive_sample) == 0 or len(negative_sample) == 0:
            raise Exception('positive and negative samples cannot be empty.')
all_samples = positive_sample + negative_sample
predictions = []
for sample in all_samples:
self.reject_throttle = -1
_, docs = self.is_relative(sample=sample,
disable_throttle=True)
score = docs[0][1]
predictions.append(max(0, score))
labels = [1 for _ in range(len(positive_sample))
] + [0 for _ in range(len(negative_sample))]
precision, recall, thresholds = precision_recall_curve(
labels, predictions)
plt.figure(figsize=(10, 8))
plt.plot(recall, precision, label='Precision-Recall curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='best')
plt.grid(True)
plt.savefig(os.path.join(work_dir, 'precision_recall_curve.png'), format='png')
plt.close()
logger.debug("Figure have been saved!")
thresholds = np.append(thresholds, 1)
max_precision = np.max(precision)
indices_with_max_precision = np.where(precision == max_precision)
optimal_recall = recall[indices_with_max_precision[0][0]]
optimal_threshold = thresholds[indices_with_max_precision[0][0]]
logger.debug(f"Optimal threshold with the highest recall under the highest precision is: {optimal_threshold}")
logger.debug(f"The corresponding precision is: {max_precision}")
logger.debug(f"The corresponding recall is: {optimal_recall}")
config = configparser.ConfigParser()
config.read(config_path)
config['feature_database']['reject_throttle'] = str(optimal_threshold)
with open(config_path, 'w') as configfile:
config.write(configfile)
logger.info(
f'Update optimal threshold: {optimal_threshold} to {config_path}' # noqa E501
)
return optimal_threshold
def query(self,
question: str,
):
time_1 = time.time()
if question is None or len(question) < 1:
            return [], []
if len(question) > 512:
logger.warning('input too long, truncate to 512')
question = question[0:512]
chunks = []
references = []
relative, docs = self.is_relative(sample=question)
if relative:
docs = self.compression_retriever.get_relevant_documents(question)
for doc in docs:
doc = [doc.page_content]
chunks.append(doc)
# chunks = [doc.page_content for doc in docs]
references = [doc.metadata['source'] for doc in docs]
time_2 = time.time()
logger.debug('query:{} \nchunks:{} \nreferences:{} \ntimecost:{}'
.format(question, chunks, references, time_2 - time_1))
return chunks, [os.path.basename(r) for r in references]
else:
if len(docs) > 0:
references.append(docs[0][0].metadata['source'])
logger.info('feature database rejected!')
return chunks, references
class CacheRetriever:
def __init__(self, embedding_model_path: str, reranker_model_path: str, max_len: int = 4):
self.cache = dict()
self.max_len = max_len
# load text2vec and rerank model
        logger.info('loading text2vec and rerank models')
self.embeddings = HuggingFaceEmbeddings(
model_name=embedding_model_path,
model_kwargs={'device': 'cuda'},
encode_kwargs={
'batch_size': 1,
'normalize_embeddings': True
})
# half
self.embeddings.client = self.embeddings.client.half()
reranker_args = {
'model': reranker_model_path,
'top_n': 3,
'device': 'cuda',
'use_fp16': True
}
self.reranker = BCERerank(**reranker_args)
def get(self,
reject_throttle: float,
fs_id: str = 'default',
work_dir='workdir'
):
if fs_id in self.cache:
self.cache[fs_id]['time'] = time.time()
return self.cache[fs_id]['retriever']
if len(self.cache) >= self.max_len:
# drop the oldest one
del_key = None
min_time = time.time()
for key, value in self.cache.items():
cur_time = value['time']
if cur_time < min_time:
min_time = cur_time
del_key = key
if del_key is not None:
del_value = self.cache[del_key]
self.cache.pop(del_key)
del del_value['retriever']
retriever = Retriever(embeddings=self.embeddings,
reranker=self.reranker,
work_dir=work_dir,
reject_throttle=reject_throttle)
self.cache[fs_id] = {'retriever': retriever, 'time': time.time()}
return retriever
def pop(self, fs_id: str):
if fs_id not in self.cache:
return
del_value = self.cache[fs_id]
self.cache.pop(fs_id)
# manually free memory
del del_value
def rag_retrieve(config_path: str, server_ready):
"""
启动 Web 服务器,接收 HTTP 请求,并通过调用本地的 rag 检索服务.
"""
config = configparser.ConfigParser()
config.read(config_path)
bind_port = int(config['default']['bind_port'])
work_dir = config['default']['work_dir']
    try:
        retriever = CacheRetriever(embedding_model_path=config['feature_database']['embedding_model_path'],
                                   reranker_model_path=config['feature_database']['reranker_model_path']).get(
                                       reject_throttle=float(config['feature_database']['reject_throttle']),
                                       work_dir=work_dir)
server_ready.value = 1
except Exception as e:
server_ready.value = -1
raise (e)
async def retrieve(request):
input_json = await request.json()
query = input_json['query']
chunks, ref = retriever.query(query)
return web.json_response({'chunks': chunks, 'ref': ref})
app = web.Application()
app.add_routes([web.post('/retrieve', retrieve)])
web.run_app(app, host='0.0.0.0', port=bind_port)
def test_query(retriever: Retriever, real_questions):
"""Simple test response pipeline."""
if real_questions is None or not real_questions:
logger.error("No questions provided or real_questions is empty.")
return None
else:
logger.add('logs/feature_store_query.log', rotation='4MB')
for example in real_questions:
example = example[0:400]
retriever.query(example)
empty_cache()
empty_cache()
def parse_args():
parser = argparse.ArgumentParser(
description='Feature store for processing directories.')
parser.add_argument(
'--config_path',
default='/home/zhangwq/project/shu/ai/config.ini',
help='config目录')
parser.add_argument(
'--query',
default=['先有鸡还是先有蛋?', '写一首五言律诗?'],
help='提问的问题.')
parser.add_argument(
'--DCU_ID',
default=[6],
help='设置DCU')
args = parser.parse_args()
return args
def main():
args = parse_args()
check_envs(args)
config = configparser.ConfigParser()
config.read(args.config_path)
embedding_model_path = config['feature_database']['embedding_model_path']
reranker_model_path = config['feature_database']['reranker_model_path']
cache = CacheRetriever(embedding_model_path=embedding_model_path,
reranker_model_path=reranker_model_path)
retriever = cache.get(reject_throttle=float(config['feature_database']['reject_throttle']),
work_dir=config['default']['work_dir'])
test_query(retriever, args.query)
# server_ready = Value('i', 0)
# rag_retrieve(config_path=args.config_path,
# server_ready=server_ready)
if __name__ == '__main__':
main()
from loguru import logger
from .helper import ErrorCode, LogManager
from .retriever import CacheRetriever
from .inferencer import LLMInference
from .feature_database import DocumentProcessor, FeatureDataBase
class ChatAgent:
def __init__(self, config, tensor_parallel_size) -> None:
self.work_dir = config['default']['work_dir']
self.embedding_model_path = config['feature_database']['embedding_model_path']
self.reranker_model_path = config['feature_database']['reranker_model_path']
reject_throttle = float(config['feature_database']['reject_throttle'])
local_llm_path = config['llm']['local_llm_path']
accelerate = config.getboolean('llm', 'accelerate')
self.retriever = CacheRetriever(self.embedding_model_path,
self.reranker_model_path).get(reject_throttle=reject_throttle,
work_dir=self.work_dir)
self.llm_server = LLMInference(local_llm_path, tensor_parallel_size, accelerate=accelerate)
def generate_prompt(self,
history_pair,
instruction: str,
template: str,
context: str = ''):
if context is not None and len(context) > 0:
instruction = template.format(context, instruction)
real_history = []
for pair in history_pair:
if pair[0] is None or pair[1] is None:
continue
if len(pair[0]) < 1 or len(pair[1]) < 1:
continue
real_history.append(pair)
return instruction, real_history
def call_rag_retrieve(self, query):
return self.retriever.query(query)
def call_llm_response(self, prompt, history=None):
text, error = self.llm_server.generate_response(prompt=prompt, history=history)
return text
def parse_file_and_merge(self, file_dir):
file_opr = DocumentProcessor()
files = file_opr.scan_directory(repo_dir=file_dir)
        file_handler = FeatureDataBase(embeddings=self.retriever.embeddings,
                                       reranker=self.retriever.reranker,
                                       reject_throttle=self.retriever.reject_throttle)
file_handler.preprocess(files=files, work_dir=self.work_dir, file_opr=file_opr)
file_handler.merge_db_response(self.retriever.vector_store, files=files, work_dir=self.work_dir, file_opr=file_opr)
file_opr.summarize(files)
        self.retriever = CacheRetriever(self.embedding_model_path, self.reranker_model_path).get(
            reject_throttle=self.retriever.reject_throttle, work_dir=self.work_dir)
class Worker:
def __init__(self, config, tensor_parallel_size):
self.agent = ChatAgent(config, tensor_parallel_size)
self.TOPIC_TEMPLATE = '告诉我这句话的主题,直接说主题不要解释:“{}”'
self.SCORING_RELAVANCE_TEMPLATE = '问题:“{}”\n材料:“{}”\n请仔细阅读以上内容,材料里为一个列表,列表里面有若干子列表,请判断每个子列表的内容和问题的相关度,不要解释直接给出相关度得分列表并以空格分隔,用0~10表示。判断标准:非常相关得 10 分;完全没关联得 0 分。\n' # noqa E501
self.KEYWORDS_TEMPLATE = '谷歌搜索是一个通用搜索引擎,可用于访问互联网、查询百科知识、了解时事新闻等。搜索参数类型 string, 内容是短语或关键字,以空格分隔。\n你现在是搜搜小助手,用户提问“{}”,你打算通过谷歌搜索查询相关资料,请提供用于搜索的关键字或短语,不要解释直接给出关键字或短语。' # noqa E501
self.SECURITY_TEMAPLTE = '判断以下句子是否涉及政治、辱骂、色情、恐暴、宗教、网络暴力、种族歧视等违禁内容,结果用 0~10 表示,不要解释直接给出得分。判断标准:涉其中任一问题直接得 10 分;完全不涉及得 0 分。直接给得分不要解释:“{}”' # noqa E501
self.PERPLESITY_TEMPLATE = '“question:{} answer:{}”\n阅读以上对话,answer 是否在表达自己不知道,回答越全面得分越少,用0~10表示,不要解释直接给出得分。\n判断标准:准确回答问题得 0 分;答案详尽得 1 分;知道部分答案但有不确定信息得 8 分;知道小部分答案但推荐求助其他人得 9 分;不知道任何答案直接推荐求助别人得 10 分。直接打分不要解释。' # noqa E501
self.SUMMARIZE_TEMPLATE = '{} \n 仔细阅读以上内容,总结得简短有力点' # noqa E501
self.GENERATE_TEMPLATE = '“{}” \n问题:“{}” \n请仔细阅读上述文字, 并使用markdown格式回答问题,直接给出回答不做任何解释。' # noqa E501
self.MARKDOWN_TEMPLATE = '问题:“{}” \n请使用markdown格式回答此问题'
def judgment_results(self, query, chunks, throttle):
relation_score = self.agent.call_llm_response(
prompt=self.SCORING_RELAVANCE_TEMPLATE.format(query, chunks))
logger.info('score: %s' % [relation_score, throttle])
        # Filter out chunks whose relevance score is below the throttle
filtered_chunks = []
for chunk, score in zip(chunks, relation_score.split()):
if float(score) >= float(throttle):
filtered_chunks.append(chunk)
return filtered_chunks
def extract_topic(self, query):
topic = self.agent.call_llm_response(self.TOPIC_TEMPLATE.format(query))
return topic
def response_direct_by_llm(self, query):
# Compliant check
prompt = self.SECURITY_TEMAPLTE.format(query)
score = self.agent.call_llm_response(prompt=prompt)
logger.debug("score:{}, prompt:{}".format(score, prompt))
if int(score) > 5:
return ErrorCode.NON_COMPLIANCE_QUESTION, "您的问题中涉及敏感话题,请重新提问。", None
logger.info('LLM direct response and prompt is: {}'.format(query))
prompt = self.MARKDOWN_TEMPLATE.format(query)
response_direct = self.agent.call_llm_response(prompt=prompt)
return ErrorCode.NOT_FIND_RELATED_DOCS, response_direct, None
def produce_response(self, query,
history,
judgment,
topic=False,
rag=True):
response = ''
references = []
if query is None:
return ErrorCode.NOT_A_QUESTION, response, references
logger.info('input: %s' % [query, history])
if rag:
if topic:
query = self.extract_topic(query)
logger.info('topic: %s' % query)
if len(query) <= 0:
return ErrorCode.NO_TOPIC, response, references
chunks, references = self.agent.call_rag_retrieve(query)
if len(chunks) == 0:
return self.response_direct_by_llm(query)
if judgment:
chunks = self.judgment_results(
query, chunks,
throttle=5,
)
            # If the database retrieved relevant chunks, answer using them
if len(chunks) > 0:
prompt, history = self.agent.generate_prompt(
instruction=query,
context=chunks,
history_pair=history,
template=self.GENERATE_TEMPLATE)
logger.debug('prompt: {}'.format(prompt))
response = self.agent.call_llm_response(prompt=prompt, history=history)
return ErrorCode.SUCCESS, response, references
else:
return self.response_direct_by_llm(query)
#!/usr/bin/env python3
import argparse
import configparser
import os
from multiprocessing import Process, Value
from loguru import logger
from llm_service import Worker, llm_inference
def check_envs(args):
if all(isinstance(item, int) for item in args.DCU_ID):
os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(map(str, args.DCU_ID))
logger.info(f"Set environment variable CUDA_VISIBLE_DEVICES to {args.DCU_ID}")
else:
logger.error(f"The --DCU_ID argument must be a list of integers, but got {args.DCU_ID}")
raise ValueError("The --DCU_ID argument must be a list of integers")
def parse_args():
"""Parse args."""
parser = argparse.ArgumentParser(description='Executor.')
parser.add_argument(
'--DCU_ID',
default=[1,2,6,7],
help='设置DCU')
parser.add_argument(
'--config_path',
default='/path/to/your/ai/config.ini',
type=str,
help='config.ini路径')
    parser.add_argument(
        '--standalone',
        action='store_true',
        help='部署LLM推理服务.')
    parser.add_argument(
        '--accelerate',
        action='store_true',
        help='LLM推理是否启用加速'
    )
args = parser.parse_args()
return args
def build_reply_text(reply: str, references: list):
if len(references) < 1:
return reply
ret = reply
for ref in references:
ret += '\n'
ret += ref
return ret
def reply_workflow(assistant):
queries = ['你好,我们公司想要购买几台测试机,请问需要联系贵公司哪位?']
for query in queries:
code, reply, references = assistant.produce_response(query=query,
history=[],
judgment=False)
logger.info(f'{code}, {query}, {reply}, {references}')
def run():
args = parse_args()
    if args.standalone:
import time
check_envs(args)
server_ready = Value('i', 0)
server_process = Process(target=llm_inference,
args=(args.config_path,
len(args.DCU_ID),
args.accelerate,
server_ready))
server_process.daemon = True
server_process.start()
while True:
if server_ready.value == 0:
logger.info('waiting for server to be ready..')
time.sleep(15)
elif server_ready.value == 1:
break
else:
logger.error('start local LLM server failed, quit.')
                raise Exception('failed to start local LLM server')
logger.info('LLM Server start.')
    config = configparser.ConfigParser()
    config.read(args.config_path)
    assistant = Worker(config, tensor_parallel_size=len(args.DCU_ID))
reply_workflow(assistant)
if __name__ == '__main__':
run()
import json
import requests
from loguru import logger
import argparse
def start(query):
url = 'http://127.0.0.1:8888/work'
try:
header = {'Content-Type': 'application/json'}
# Add history to data
data = {
'query': query
}
resp = requests.post(url,
headers=header,
data=json.dumps(data),
timeout=300)
if resp.status_code != 200:
raise Exception(str((resp.status_code, resp.reason)))
return resp.json()['reply'], resp.json()['references']
except Exception as e:
logger.error(str(e))
        return '', []
def parse_args():
parser = argparse.ArgumentParser(description='.')
parser.add_argument('--query',
default='your query',
help='')
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
reply, ref = start(args.query)
logger.debug('reply: {} \nref: {} '.format(reply,
ref))
import os
import argparse
import bisect
import configparser
import subprocess
from aiohttp import web
from loguru import logger
from llm_service import Worker
from scipy.stats import rankdata
divisible_by_32 = [1, 2, 4, 8, 16, 32]
recv_file_path = "%s/upload"
def workflow(args):
config = configparser.ConfigParser()
config.read(args.config_path)
bind_port = int(config['default']['bind_port'])
dcu_ids = auto_select_dcu(config)
tensor_parallel_size = len(dcu_ids)
try:
assistant = Worker(config, tensor_parallel_size)
except Exception as e:
raise (e)
async def work(request):
input_json = await request.json()
query = input_json['query']
code, reply, references = assistant.produce_response(query=query,
history=[],
judgment=False)
return web.json_response({'reply': reply, 'references': references})
async def handle_upload(request):
reader = await request.multipart()
while True:
field = await reader.next()
if field is None:
break
filename = field.filename
# Save to server
save_path = recv_file_path % config['default']['work_dir']
if not os.path.exists(save_path):
os.makedirs(save_path)
file_path = os.path.join(save_path, filename)
with open(file_path, 'wb') as f:
while True:
chunk = await field.read_chunk()
if not chunk:
break
f.write(chunk)
logger.debug("成功接收文件:%s" % file_path)
# Call file parse process
assistant.agent.parse_file_and_merge(save_path)
return web.json_response({"reply": "成功接收文件:{filename}\n"})
app = web.Application()
app.add_routes([web.post('/work', work),
web.post('/upload', handle_upload)])
web.run_app(app, host='0.0.0.0', port=bind_port)
def auto_select_dcu(config):
# Read threshold in config file
mem_threshold = config.getint('default', 'mem_threshold')
dcu_threshold = config.getint('default', 'dcu_threshold')
# Get dcu usage
process = subprocess.Popen("hy-smi | grep '^[0-9]' | awk '{print $1,$6,$7}' | sed 's/%//g'", shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
result = output.decode().strip().split('\n')
if not result:
raise Exception("There is no dcu on this node.")
dcu_map = {}
for line in result:
dcu_info = line.split()
if int(dcu_info[1]) >= mem_threshold or int(dcu_info[2]) >= dcu_threshold:
logger.debug("filter dcu:%s, which mem usage (%s) and dcu usage (%s) above the threshold." % (dcu_info[0],
dcu_info[1],
dcu_info[2]))
continue
logger.debug("dcu id:%s, mem usage: %s dcu usage: %s" % (dcu_info[0], dcu_info[1], dcu_info[2]))
dcu_map[dcu_info[0]] = [int(dcu_info[1]), int(dcu_info[2])]
    # The selected DCU count must be one of the power-of-two values in divisible_by_32
    # TODO temporary use 40% of available count
count = round(len(dcu_map.keys()) * 0.4)
if not count:
logger.error("There is no available dcu device, can not start the service.")
raise Exception("There is no available dcu device, can not start the service.")
insert_index = bisect.bisect_left(divisible_by_32, count)
#insert_index = bisect.bisect_left(divisible_by_32, len(dcu_map.keys()))
if insert_index > 0 and count != divisible_by_32[insert_index]:
index = insert_index - 1
elif count == divisible_by_32[insert_index]:
index = insert_index
else:
index = 0
select_count = divisible_by_32[index]
# Based on the ranking of memory and dcu usage.
dcu_mem_use_rank = [item[0] for item in dcu_map.values()]
dcu_use_rank = [item[1] for item in dcu_map.values()]
# Calculate the final ranking
final_rank = [(name, dcu_mem_use_rank[i] + dcu_use_rank[i]) for i, name in enumerate(dcu_map.keys())]
sorted_rank = sorted(final_rank, key=lambda x: x[1])
sorted_dcu_ids = [item[0] for item in sorted_rank]
select_dcu_ids = sorted_dcu_ids[:select_count]
os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(map(str, select_dcu_ids))
logger.info(f"Set environment variable CUDA_VISIBLE_DEVICES to {select_dcu_ids}")
return select_dcu_ids
def parse_args():
parser = argparse.ArgumentParser(description='Start all services.')
parser.add_argument('--config_path',
default='ai/config.ini',
help='Config directory')
parser.add_argument('--log_path',
default='',
help='Set log file path')
return parser.parse_args()
def main():
args = parse_args()
log_path = '/var/log/assistant.log'
if args.log_path:
log_path = args.log_path
logger.add(sink=log_path, level="DEBUG", rotation="500MB", compression="zip", encoding="utf-8", enqueue=True)
workflow(args)
if __name__ == '__main__':
main()
from typing import Tuple
__release__ = '0.0.1'
short_release = __release__
def extract_release_info(release_str: str) -> Tuple:
release_info = []
for part in release_str.split('.'):
if part.isdigit():
release_info.append(int(part))
elif 'rc' in part:
patch_info = part.split('rc')
release_info.append(int(patch_info[0]))
release_info.append(f'rc{patch_info[1]}')
return tuple(release_info)
release_info = extract_release_info(__release__)
[
"阿树是谁",
"具体在哪些位置进行修改?",
"你是谁?",
"1+1",
"你好",
"需要修改的内容的多吗",
"你好,介绍下自己",
"你能干什么",
"啊,不是吧",
"FCOS",
"那我怎么打印",
"?",
"MMSelfSup",
"百度",
"啥?",
"你是谁",
"那历史方面的问题呢",
"你被预设了什么人设",
"SIM卡鉴权过程中,跟踪区域码TAC起到了什么作用?请详述双向鉴权过程,尤其涉及TAC以及小区ID部分",
"DCOS",
"你帅吗",
"有新手入门教程吗",
"你说你是",
"你把两个问题合起来想一想",
"OpoenMMLab 会被取代吗",
"为什么",
"MMSelfSup有什么用?",
"群号有吗",
"有交流群吗",
"你会哪些问题啊",
"本垃圾在缓慢学习这些玩意",
"能不能找到上面的安装手册呢?",
"xtcocotools安装不了",
"你这是llm模型吗",
"在线难样本挖掘",
"ncnn全名是什么",
"先有鸡还是先有蛋?"
]
[
"请问DCU中如何查看显卡显存等信息?",
"Z100支持什么类型的计算精度?",
"请问DCU中如何查看显卡显存等信息?",
"能否概括一下DCU软件栈?",
"DCU如何实现分布式训练?",
"Rocm-smi的输出正常,但是rocminfo报错",
"什么是miopen?",
"怎样通过容器分割物理机上的DCU加速卡?",
"yolov5的内存出现memory access fault是因为什么?",
"为什么运行时找不到rocblas库",
"什么是服务器?",
"你能解释一下云服务器是什么吗?",
"在什么情况下可以提升服务器的性能?",
"什么是交换机?",
"负载均衡器是什么?",
"CDN是什么?",
"什么是人工智能(AI)?",
"深度学习指的是什么?",
"人工智能的伦理和道德问题有哪些?",
"人工智能与机器学习有何不同之处?",
"AI加速卡的工作原理是什么?",
"如何部署和使用AI加速卡?",
"DCU加速器计算单元微架构是什么样的?",
"有哪些HIP主机编程接口可供使用?",
"并行计算计算机的结构体系是什么样的?",
"MIGraphX都有哪些特性?",
"DTK是做什么用的?",
"曙光的HPC有哪些技术优势?",
"DCU在人工智能领域有哪些实际应用?",
"在DCU上如何配置onnxruntime环境?",
"为什么在本地可以运行,但在容器中无法运行?",
"MIOpen发生错误时应该如何处理?",
"如何进行DCU代码的移植?",
"DCU支持哪些深度学习框架或工具?",
"请问DCU上支持的大模型目前有哪些?",
"请问ac平台上slurm如何申请DCU资源?",
"请问DCU的AI生态包可以从哪里下载?"
]