Unverified Commit 29b38d12 authored by Xiaomeng Zhao's avatar Xiaomeng Zhao Committed by GitHub
Browse files

Merge pull request #1071 from icecraft/fix/demo

Fix/demo
parents 4e0b3a8f e9ace3eb
import os
from loguru import logger
from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.data.data_reader_writer import FileBasedDataWriter
from magic_pdf.pipe.UNIPipe import UNIPipe
# Parse demo1.pdf (located next to this script) and write demo1.md with the
# extracted markdown.  Any failure is logged rather than raised, since this
# is a best-effort demo script.
try:
    current_script_dir = os.path.dirname(os.path.abspath(__file__))
    demo_name = 'demo1'
    pdf_path = os.path.join(current_script_dir, f'{demo_name}.pdf')
    # Read the whole PDF up front; 'with' guarantees the handle is closed
    # (the original open(...).read() leaked it).
    with open(pdf_path, 'rb') as pdf_file:
        pdf_bytes = pdf_file.read()
    # Empty model_list means: let the pipeline run its built-in model.
    jso_useful_key = {'_pdf_type': '', 'model_list': []}
    local_image_dir = os.path.join(current_script_dir, 'images')
    # Images are referenced from the markdown by this relative directory name.
    image_dir = str(os.path.basename(local_image_dir))
    image_writer = FileBasedDataWriter(local_image_dir)
    pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
    pipe.pipe_classify()
    pipe.pipe_analyze()
    pipe.pipe_parse()
    md_content = pipe.pipe_mk_markdown(image_dir, drop_mode='none')
    with open(f'{demo_name}.md', 'w', encoding='utf-8') as f:
        f.write(md_content)
except Exception as e:
    logger.exception(e)
import os
import json
import copy
from loguru import logger
from magic_pdf.libs.draw_bbox import draw_layout_bbox, draw_span_bbox
from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.pipe.OCRPipe import OCRPipe
from magic_pdf.pipe.TXTPipe import TXTPipe
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
# todo: 设备类型选择 (?)
def json_md_dump(
    pipe,
    md_writer,
    pdf_name,
    content_list,
    md_content,
    orig_model_list,
):
    """Persist all parsing artifacts for *pdf_name* through *md_writer*.

    Writes, in order: the raw model output (<name>_model.json), the
    intermediate pipeline state (<name>_middle.json), the unified text
    content (<name>_content_list.json), and the rendered markdown (<name>.md).
    """
    def as_json(obj):
        # All JSON artifacts share the same pretty-printed, non-ASCII-safe form.
        return json.dumps(obj, ensure_ascii=False, indent=4)

    artifacts = (
        (f'{pdf_name}_model.json', as_json(orig_model_list)),
        (f'{pdf_name}_middle.json', as_json(pipe.pdf_mid_data)),
        (f'{pdf_name}_content_list.json', as_json(content_list)),
        (f'{pdf_name}.md', md_content),
    )
    for target, payload in artifacts:
        md_writer.write(content=payload, path=target)
# 可视化
def draw_visualization_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name):
    """Render debugging overlays for a parsed PDF into *local_md_dir*.

    Draws layout-level bounding boxes (annotated with their sort order)
    first, then the fine-grained span bounding boxes.
    """
    args = (pdf_info, pdf_bytes, local_md_dir, pdf_file_name)
    for painter in (draw_layout_bbox, draw_span_bbox):
        painter(*args)
def pdf_parse_main(
    pdf_path: str,
    parse_method: str = 'auto',
    model_json_path: str = None,
    is_json_md_dump: bool = True,
    is_draw_visualization_bbox: bool = True,
    output_dir: str = None
):
    """Convert a PDF into json/md outputs placed next to the PDF (or under *output_dir*).

    :param pdf_path: path to the .pdf file, relative or absolute
    :param parse_method: one of 'auto', 'ocr', 'txt' (default 'auto'; try 'ocr' if results are poor)
    :param model_json_path: pre-computed model data file; when empty the built-in model is used.
        The PDF and the model json must correspond to each other.
    :param is_json_md_dump: whether to write the parsed data to .json and .md files
        (three staged .json files plus one .md), default True
    :param is_draw_visualization_bbox: whether to draw layout/span bbox visualizations
    :param output_dir: output directory; a folder named after the PDF is created inside it
    """
    try:
        pdf_name = os.path.basename(pdf_path).split('.')[0]
        pdf_path_parent = os.path.dirname(pdf_path)

        if output_dir:
            output_path = os.path.join(output_dir, pdf_name)
        else:
            output_path = os.path.join(pdf_path_parent, pdf_name)

        output_image_path = os.path.join(output_path, 'images')
        # Basename only: images are referenced by this relative path from the
        # .md and content_list.json outputs.
        image_path_parent = os.path.basename(output_image_path)

        # Read the PDF bytes; 'with' closes the handle (the original
        # open(...).read() leaked it).
        with open(pdf_path, 'rb') as pdf_file:
            pdf_bytes = pdf_file.read()

        orig_model_list = []
        if model_json_path:
            # Load the pre-computed model output for this PDF (a list).
            with open(model_json_path, encoding='utf-8') as model_file:
                model_json = json.load(model_file)
            orig_model_list = copy.deepcopy(model_json)
        else:
            model_json = []

        # Writers for the image directory and the output directory.
        image_writer, md_writer = DiskReaderWriter(output_image_path), DiskReaderWriter(output_path)

        # Choose the parsing pipeline.
        if parse_method == 'auto':
            jso_useful_key = {'_pdf_type': '', 'model_list': model_json}
            pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
        elif parse_method == 'txt':
            pipe = TXTPipe(pdf_bytes, model_json, image_writer)
        elif parse_method == 'ocr':
            pipe = OCRPipe(pdf_bytes, model_json, image_writer)
        else:
            logger.error('unknown parse method, only auto, ocr, txt allowed')
            # NOTE(review): SystemExit from library code; consider raising
            # ValueError instead — kept for backward compatibility.
            exit(1)

        # Classify the PDF type.
        pipe.pipe_classify()
        # No model data supplied: run the built-in model and keep a snapshot
        # of its raw output for the dump below.
        if len(model_json) == 0:
            pipe.pipe_analyze()
            orig_model_list = copy.deepcopy(pipe.model_list)

        # Parse the document.
        pipe.pipe_parse()

        # Produce the unified content list and the markdown text.
        content_list = pipe.pipe_mk_uni_format(image_path_parent, drop_mode='none')
        md_content = pipe.pipe_mk_markdown(image_path_parent, drop_mode='none')

        if is_json_md_dump:
            json_md_dump(pipe, md_writer, pdf_name, content_list, md_content, orig_model_list)
        if is_draw_visualization_bbox:
            draw_visualization_bbox(pipe.pdf_mid_data['pdf_info'], pdf_bytes, output_path, pdf_name)
    except Exception as e:
        logger.exception(e)


# Manual test entry point.
if __name__ == '__main__':
    pdf_path = r'D:\project\20240617magicpdf\Magic-PDF\demo\demo1.pdf'
    pdf_parse_main(pdf_path)
import copy
import json
import os
from loguru import logger
from magic_pdf.data.data_reader_writer import FileBasedDataWriter
from magic_pdf.libs.draw_bbox import draw_layout_bbox, draw_span_bbox
from magic_pdf.pipe.OCRPipe import OCRPipe
from magic_pdf.pipe.TXTPipe import TXTPipe
from magic_pdf.pipe.UNIPipe import UNIPipe
# todo: 设备类型选择 (?)
def json_md_dump(
    pipe,
    md_writer,
    pdf_name,
    content_list,
    md_content,
    orig_model_list,
):
    """Write every parsing artifact for *pdf_name* via *md_writer*.

    Outputs, in order: model results (<name>_model.json), intermediate
    pipeline data (<name>_middle.json), the text content list
    (<name>_content_list.json), and the markdown document (<name>.md).
    """
    # Filename -> payload, in the exact order the files must be written.
    outputs = {
        f'{pdf_name}_model.json': json.dumps(orig_model_list, ensure_ascii=False, indent=4),
        f'{pdf_name}_middle.json': json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4),
        f'{pdf_name}_content_list.json': json.dumps(content_list, ensure_ascii=False, indent=4),
        f'{pdf_name}.md': md_content,
    }
    for filename, text in outputs.items():
        md_writer.write_string(filename, text)
# 可视化
def draw_visualization_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name):
    """Write visual debugging output for a parsed PDF.

    Draws the layout bounding boxes (including their reading/sort order)
    and then the span bounding boxes into *local_md_dir*.
    """
    common = (pdf_info, pdf_bytes, local_md_dir, pdf_file_name)
    draw_layout_bbox(*common)
    draw_span_bbox(*common)
def pdf_parse_main(
    pdf_path: str,
    parse_method: str = 'auto',
    model_json_path: str = None,
    is_json_md_dump: bool = True,
    is_draw_visualization_bbox: bool = True,
    output_dir: str = None
):
    """Convert a PDF into json/md outputs placed next to the PDF (or under *output_dir*).

    :param pdf_path: path to the .pdf file, relative or absolute
    :param parse_method: one of 'auto', 'ocr', 'txt' (default 'auto'; try 'ocr' if results are poor)
    :param model_json_path: pre-computed model data file; when empty the built-in model is used.
        The PDF and the model json must correspond to each other.
    :param is_json_md_dump: whether to write the parsed data to .json and .md files
        (three staged .json files plus one .md), default True
    :param is_draw_visualization_bbox: whether to draw layout/span bbox visualizations
    :param output_dir: output directory; a folder named after the PDF is created inside it
    """
    try:
        pdf_name = os.path.basename(pdf_path).split('.')[0]
        pdf_path_parent = os.path.dirname(pdf_path)

        if output_dir:
            output_path = os.path.join(output_dir, pdf_name)
        else:
            output_path = os.path.join(pdf_path_parent, pdf_name)

        output_image_path = os.path.join(output_path, 'images')
        # Basename only: images are referenced by this relative path from the
        # .md and content_list.json outputs.
        image_path_parent = os.path.basename(output_image_path)

        # Read the PDF bytes; 'with' closes the handle (the original
        # open(...).read() leaked it).
        with open(pdf_path, 'rb') as pdf_file:
            pdf_bytes = pdf_file.read()

        orig_model_list = []
        if model_json_path:
            # Load the pre-computed model output for this PDF (a list).
            with open(model_json_path, encoding='utf-8') as model_file:
                model_json = json.load(model_file)
            orig_model_list = copy.deepcopy(model_json)
        else:
            model_json = []

        # Writers for the image directory and the output directory.
        image_writer, md_writer = FileBasedDataWriter(output_image_path), FileBasedDataWriter(output_path)

        # Choose the parsing pipeline.
        if parse_method == 'auto':
            jso_useful_key = {'_pdf_type': '', 'model_list': model_json}
            pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
        elif parse_method == 'txt':
            pipe = TXTPipe(pdf_bytes, model_json, image_writer)
        elif parse_method == 'ocr':
            pipe = OCRPipe(pdf_bytes, model_json, image_writer)
        else:
            logger.error('unknown parse method, only auto, ocr, txt allowed')
            # NOTE(review): SystemExit from library code; consider raising
            # ValueError instead — kept for backward compatibility.
            exit(1)

        # Classify the PDF type.
        pipe.pipe_classify()
        # No model data supplied: run the built-in model and keep a snapshot
        # of its raw output for the dump below.
        if len(model_json) == 0:
            pipe.pipe_analyze()
            orig_model_list = copy.deepcopy(pipe.model_list)

        # Parse the document.
        pipe.pipe_parse()

        # Produce the unified content list and the markdown text.
        content_list = pipe.pipe_mk_uni_format(image_path_parent, drop_mode='none')
        md_content = pipe.pipe_mk_markdown(image_path_parent, drop_mode='none')

        if is_json_md_dump:
            json_md_dump(pipe, md_writer, pdf_name, content_list, md_content, orig_model_list)
        if is_draw_visualization_bbox:
            draw_visualization_bbox(pipe.pdf_mid_data['pdf_info'], pdf_bytes, output_path, pdf_name)
    except Exception as e:
        logger.exception(e)


# Manual test entry point.
if __name__ == '__main__':
    pdf_path = r'D:\project\20240617magicpdf\Magic-PDF\demo\demo1.pdf'
    pdf_parse_main(pdf_path)
......@@ -55,5 +55,8 @@ class FileBasedDataWriter(DataWriter):
if not os.path.isabs(fn_path) and len(self._parent_dir) > 0:
fn_path = os.path.join(self._parent_dir, path)
if not os.path.exists(os.path.dirname(fn_path)):
os.makedirs(os.path.dirname(fn_path), exist_ok=True)
with open(fn_path, 'wb') as f:
f.write(data)
......@@ -3,75 +3,79 @@ import json
import os
from tempfile import NamedTemporaryFile
import magic_pdf.model as model_config
import uvicorn
from fastapi import FastAPI, File, UploadFile, Form
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from loguru import logger
import magic_pdf.model as model_config
from magic_pdf.data.data_reader_writer import FileBasedDataWriter
from magic_pdf.pipe.OCRPipe import OCRPipe
from magic_pdf.pipe.TXTPipe import TXTPipe
from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
model_config.__use_inside_model__ = True
app = FastAPI()
def json_md_dump(
    pipe,
    md_writer,
    pdf_name,
    content_list,
    md_content,
):
    """Write model/middle/content-list JSON plus the markdown result via *md_writer*.

    :param pipe: parsing pipeline exposing ``model_list`` and ``pdf_mid_data``
    :param md_writer: writer with a ``write_string(path, content)`` method
    :param pdf_name: base name used to build the output filenames
    :param content_list: unified text content (JSON-serializable)
    :param md_content: rendered markdown text
    """
    # Snapshot the model output so later pipeline mutation cannot affect the dump.
    orig_model_list = copy.deepcopy(pipe.model_list)
    # Write model results to model.json
    md_writer.write_string(
        f'{pdf_name}_model.json',
        json.dumps(orig_model_list, ensure_ascii=False, indent=4),
    )
    # Write intermediate results to middle.json
    md_writer.write_string(
        f'{pdf_name}_middle.json',
        json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4),
    )
    # Write text content results to content_list.json
    md_writer.write_string(
        f'{pdf_name}_content_list.json',
        json.dumps(content_list, ensure_ascii=False, indent=4),
    )
    # Write results to .md file
    md_writer.write_string(
        f'{pdf_name}.md',
        md_content,
    )
@app.post("/pdf_parse", tags=["projects"], summary="Parse PDF file")
@app.post('/pdf_parse', tags=['projects'], summary='Parse PDF file')
async def pdf_parse_main(
pdf_file: UploadFile = File(...),
parse_method: str = 'auto',
model_json_path: str = None,
is_json_md_dump: bool = True,
output_dir: str = "output"
pdf_file: UploadFile = File(...),
parse_method: str = 'auto',
model_json_path: str = None,
is_json_md_dump: bool = True,
output_dir: str = 'output',
):
"""
Execute the process of converting PDF to JSON and MD, outputting MD and JSON files to the specified directory
"""Execute the process of converting PDF to JSON and MD, outputting MD and
JSON files to the specified directory.
:param pdf_file: The PDF file to be parsed
:param parse_method: Parsing method, can be auto, ocr, or txt. Default is auto. If results are not satisfactory, try ocr
:param model_json_path: Path to existing model data file. If empty, use built-in model. PDF and model_json must correspond
:param is_json_md_dump: Whether to write parsed data to .json and .md files. Default is True. Different stages of data will be written to different .json files (3 in total), md content will be saved to .md file
:param is_json_md_dump: Whether to write parsed data to .json and .md files. Default is True. Different stages of data will be written to different .json files (3 in total), md content will be saved to .md file # noqa E501
:param output_dir: Output directory for results. A folder named after the PDF file will be created to store all results
"""
try:
# Create a temporary file to store the uploaded PDF
with NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
with NamedTemporaryFile(delete=False, suffix='.pdf') as temp_pdf:
temp_pdf.write(await pdf_file.read())
temp_pdf_path = temp_pdf.name
pdf_name = os.path.basename(pdf_file.filename).split(".")[0]
pdf_name = os.path.basename(pdf_file.filename).split('.')[0]
if output_dir:
output_path = os.path.join(output_dir, pdf_name)
......@@ -83,28 +87,32 @@ async def pdf_parse_main(
# Get parent path of images for relative path in .md and content_list.json
image_path_parent = os.path.basename(output_image_path)
pdf_bytes = open(temp_pdf_path, "rb").read() # Read binary data of PDF file
pdf_bytes = open(temp_pdf_path, 'rb').read() # Read binary data of PDF file
if model_json_path:
# Read original JSON data of PDF file parsed by model, list type
model_json = json.loads(open(model_json_path, "r", encoding="utf-8").read())
model_json = json.loads(open(model_json_path, 'r', encoding='utf-8').read())
else:
model_json = []
# Execute parsing steps
image_writer, md_writer = DiskReaderWriter(output_image_path), DiskReaderWriter(output_path)
image_writer, md_writer = FileBasedDataWriter(
output_image_path
), FileBasedDataWriter(output_path)
# Choose parsing method
if parse_method == "auto":
jso_useful_key = {"_pdf_type": "", "model_list": model_json}
if parse_method == 'auto':
jso_useful_key = {'_pdf_type': '', 'model_list': model_json}
pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
elif parse_method == "txt":
elif parse_method == 'txt':
pipe = TXTPipe(pdf_bytes, model_json, image_writer)
elif parse_method == "ocr":
elif parse_method == 'ocr':
pipe = OCRPipe(pdf_bytes, model_json, image_writer)
else:
logger.error("Unknown parse method, only auto, ocr, txt allowed")
return JSONResponse(content={"error": "Invalid parse method"}, status_code=400)
logger.error('Unknown parse method, only auto, ocr, txt allowed')
return JSONResponse(
content={'error': 'Invalid parse method'}, status_code=400
)
# Execute classification
pipe.pipe_classify()
......@@ -114,28 +122,36 @@ async def pdf_parse_main(
if model_config.__use_inside_model__:
pipe.pipe_analyze() # Parse
else:
logger.error("Need model list input")
return JSONResponse(content={"error": "Model list input required"}, status_code=400)
logger.error('Need model list input')
return JSONResponse(
content={'error': 'Model list input required'}, status_code=400
)
# Execute parsing
pipe.pipe_parse()
# Save results in text and md format
content_list = pipe.pipe_mk_uni_format(image_path_parent, drop_mode="none")
md_content = pipe.pipe_mk_markdown(image_path_parent, drop_mode="none")
content_list = pipe.pipe_mk_uni_format(image_path_parent, drop_mode='none')
md_content = pipe.pipe_mk_markdown(image_path_parent, drop_mode='none')
if is_json_md_dump:
json_md_dump(pipe, md_writer, pdf_name, content_list, md_content)
data = {"layout": copy.deepcopy(pipe.model_list), "info": pipe.pdf_mid_data, "content_list": content_list,'md_content':md_content}
data = {
'layout': copy.deepcopy(pipe.model_list),
'info': pipe.pdf_mid_data,
'content_list': content_list,
'md_content': md_content,
}
return JSONResponse(data, status_code=200)
except Exception as e:
logger.exception(e)
return JSONResponse(content={"error": str(e)}, status_code=500)
return JSONResponse(content={'error': str(e)}, status_code=500)
finally:
# Clean up the temporary file
if 'temp_pdf_path' in locals():
os.unlink(temp_pdf_path)
# if __name__ == '__main__':
# uvicorn.run(app, host="0.0.0.0", port=8888)
\ No newline at end of file
if __name__ == '__main__':
uvicorn.run(app, host='0.0.0.0', port=8888)
import json
import re
import os
import shutil
import traceback
from pathlib import Path
from common.error_types import ApiException
from common.mk_markdown.mk_markdown import \
ocr_mk_mm_markdown_with_para_and_pagination
from flask import current_app, url_for
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.pipe.UNIPipe import UNIPipe
from loguru import logger
import magic_pdf.model as model_config
from magic_pdf.data.data_reader_writer import FileBasedDataWriter
from magic_pdf.libs.json_compressor import JsonCompressor
from common.mk_markdown.mk_markdown import ocr_mk_mm_markdown_with_para_and_pagination
from magic_pdf.pipe.UNIPipe import UNIPipe
from ..extensions import app, db
from .ext import find_file
from ..extentions import app, db
from .models import AnalysisPdf, AnalysisTask
from common.error_types import ApiException
from loguru import logger
model_config.__use_inside_model__ = True
......@@ -22,51 +25,51 @@ model_config.__use_inside_model__ = True
def analysis_pdf(image_url_prefix, image_dir, pdf_bytes, is_ocr=False):
    """Parse *pdf_bytes* with the UNIPipe pipeline and return (md_content, bbox_info).

    :param image_url_prefix: URL prefix prepended to image references in the markdown
    :param image_dir: directory where extracted images are written
    :param pdf_bytes: raw PDF file content
    :param is_ocr: force the OCR pipeline instead of auto classification
    :return: tuple of (paginated markdown as a JSON string, per-page bbox info);
        returns None implicitly when parsing fails (the exception is logged).
    """
    try:
        model_json = []  # empty list -> use the built-in model for parsing
        logger.info(f'is_ocr: {is_ocr}')
        if not is_ocr:
            jso_useful_key = {'_pdf_type': '', 'model_list': model_json}
            image_writer = FileBasedDataWriter(image_dir)
            pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer, is_debug=True)
            # Auto mode: classify the PDF type before analysis.
            pipe.pipe_classify()
        else:
            jso_useful_key = {'_pdf_type': 'ocr', 'model_list': model_json}
            image_writer = FileBasedDataWriter(image_dir)
            pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer, is_debug=True)
        # No valid model data supplied: run the built-in model.
        if len(model_json) == 0:
            if model_config.__use_inside_model__:
                pipe.pipe_analyze()
            else:
                logger.error('need model list input')
                # NOTE(review): SystemExit from library code — kept as-is.
                exit(1)
        pipe.pipe_parse()
        pdf_mid_data = JsonCompressor.decompress_json(pipe.get_compress_pdf_mid_data())
        pdf_info_list = pdf_mid_data['pdf_info']
        md_content = json.dumps(ocr_mk_mm_markdown_with_para_and_pagination(pdf_info_list, image_url_prefix),
                                ensure_ascii=False)
        bbox_info = get_bbox_info(pdf_info_list)
        return md_content, bbox_info
    except Exception as e:  # noqa: F841
        # Best-effort: log the traceback and fall through (caller gets None).
        logger.error(traceback.format_exc())
def get_bbox_info(data):
    """Collect per-page bounding-box information from parsed pdf_info pages.

    :param data: iterable of page dicts (entries of ``pdf_info``)
    :return: list with one dict per page containing the preprocessed blocks,
        the page index, the page size, and the discarded blocks
    """
    bbox_info = []
    for page in data:
        # Missing keys default to empty lists so partial pages don't break callers.
        preproc_blocks = page.get('preproc_blocks', [])
        discarded_blocks = page.get('discarded_blocks', [])
        bbox_info.append({
            'preproc_blocks': preproc_blocks,
            'page_idx': page.get('page_idx'),
            'page_size': page.get('page_size'),
            'discarded_blocks': discarded_blocks,
        })
    return bbox_info
def analysis_pdf_task(pdf_dir, image_dir, pdf_path, is_ocr, analysis_pdf_id):
"""
解析pdf
"""解析pdf.
:param pdf_dir: pdf解析目录
:param image_dir: 图片目录
:param pdf_path: pdf路径
......@@ -75,8 +78,8 @@ def analysis_pdf_task(pdf_dir, image_dir, pdf_path, is_ocr, analysis_pdf_id):
:return:
"""
try:
logger.info(f"start task: {pdf_path}")
logger.info(f"image_dir: {image_dir}")
logger.info(f'start task: {pdf_path}')
logger.info(f'image_dir: {image_dir}')
if not Path(image_dir).exists():
Path(image_dir).mkdir(parents=True, exist_ok=True)
else:
......@@ -96,26 +99,26 @@ def analysis_pdf_task(pdf_dir, image_dir, pdf_path, is_ocr, analysis_pdf_id):
# ############ markdown #############
pdf_name = Path(pdf_path).name
full_md_content = ""
full_md_content = ''
for item in json.loads(md_content):
full_md_content += item["md_content"] + "\n"
full_md_content += item['md_content'] + '\n'
full_md_name = "full.md"
with open(f"{pdf_dir}/{full_md_name}", "w", encoding="utf-8") as file:
full_md_name = 'full.md'
with open(f'{pdf_dir}/{full_md_name}', 'w', encoding='utf-8') as file:
file.write(full_md_content)
with app.app_context():
full_md_link = url_for('analysis.mdview', filename=full_md_name, as_attachment=False)
full_md_link = f"{full_md_link}&pdf={pdf_name}"
full_md_link = f'{full_md_link}&pdf={pdf_name}'
md_link_list = []
with app.app_context():
for n, md in enumerate(json.loads(md_content)):
md_content = md["md_content"]
md_content = md['md_content']
md_name = f"{md.get('page_no', n)}.md"
with open(f"{pdf_dir}/{md_name}", "w", encoding="utf-8") as file:
with open(f'{pdf_dir}/{md_name}', 'w', encoding='utf-8') as file:
file.write(md_content)
md_url = url_for('analysis.mdview', filename=md_name, as_attachment=False)
md_link_list.append(f"{md_url}&pdf={pdf_name}")
md_link_list.append(f'{md_url}&pdf={pdf_name}')
with app.app_context():
with db.auto_commit():
......@@ -129,8 +132,8 @@ def analysis_pdf_task(pdf_dir, image_dir, pdf_path, is_ocr, analysis_pdf_id):
analysis_task_object = AnalysisTask.query.filter_by(analysis_pdf_id=analysis_pdf_id).first()
analysis_task_object.status = 1
db.session.add(analysis_task_object)
logger.info(f"finished!")
except Exception as e:
logger.info('finished!')
except Exception as e: # noqa: F841
logger.error(traceback.format_exc())
with app.app_context():
with db.auto_commit():
......@@ -141,7 +144,7 @@ def analysis_pdf_task(pdf_dir, image_dir, pdf_path, is_ocr, analysis_pdf_id):
analysis_task_object = AnalysisTask.query.filter_by(analysis_pdf_id=analysis_pdf_id).first()
analysis_task_object.status = 1
db.session.add(analysis_task_object)
raise ApiException(code=500, msg="PDF parsing failed", msgZH="pdf解析失败")
raise ApiException(code=500, msg='PDF parsing failed', msgZH='pdf解析失败')
finally:
# 执行pending
with app.app_context():
......@@ -149,12 +152,12 @@ def analysis_pdf_task(pdf_dir, image_dir, pdf_path, is_ocr, analysis_pdf_id):
AnalysisTask.update_date.asc()).first()
if analysis_task_object:
pdf_upload_folder = current_app.config['PDF_UPLOAD_FOLDER']
upload_dir = f"{current_app.static_folder}/{pdf_upload_folder}"
upload_dir = f'{current_app.static_folder}/{pdf_upload_folder}'
file_path = find_file(analysis_task_object.file_key, upload_dir)
file_stem = Path(file_path).stem
pdf_analysis_folder = current_app.config['PDF_ANALYSIS_FOLDER']
pdf_dir = f"{current_app.static_folder}/{pdf_analysis_folder}/{file_stem}"
image_dir = f"{pdf_dir}/images"
pdf_dir = f'{current_app.static_folder}/{pdf_analysis_folder}/{file_stem}'
image_dir = f'{pdf_dir}/images'
with db.auto_commit():
analysis_pdf_object = AnalysisPdf.query.filter_by(id=analysis_task_object.analysis_pdf_id).first()
analysis_pdf_object.status = 0
......@@ -164,4 +167,4 @@ def analysis_pdf_task(pdf_dir, image_dir, pdf_path, is_ocr, analysis_pdf_id):
db.session.add(analysis_task_object)
analysis_pdf_task(pdf_dir, image_dir, file_path, analysis_task_object.is_ocr, analysis_task_object.analysis_pdf_id)
else:
logger.info(f"all task finished!")
logger.info('all task finished!')
from contextlib import contextmanager
from common.error_types import ApiException
from flask import Flask, jsonify
from flask_restful import Api as _Api
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
from flask_migrate import Migrate
from contextlib import contextmanager
from flask_jwt_extended import JWTManager
from flask_marshmallow import Marshmallow
from common.error_types import ApiException
from werkzeug.exceptions import HTTPException
from flask_migrate import Migrate
from flask_restful import Api as _Api
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
from loguru import logger
from werkzeug.exceptions import HTTPException
class Api(_Api):
......@@ -21,23 +22,23 @@ class Api(_Api):
elif isinstance(e, HTTPException):
code = e.code
msg = e.description
msgZH = "服务异常,详细信息请查看日志"
msgZH = '服务异常,详细信息请查看日志'
error_code = e.code
else:
code = 500
msg = str(e)
error_code = 500
msgZH = "服务异常,详细信息请查看日志"
msgZH = '服务异常,详细信息请查看日志'
# 使用 loguru 记录异常信息
logger.opt(exception=e).error(f"An error occurred: {msg}")
logger.opt(exception=e).error(f'An error occurred: {msg}')
return jsonify({
"error": "Internal Server Error" if code == 500 else e.name,
"msg": msg,
"msgZH": msgZH,
"code": code,
"error_code": error_code
'error': 'Internal Server Error' if code == 500 else e.name,
'msg': msg,
'msgZH': msgZH,
'code': code,
'error_code': error_code
}), code
......@@ -59,4 +60,4 @@ db = SQLAlchemy()
migrate = Migrate()
jwt = JWTManager()
ma = Marshmallow()
folder = app.config.get("REACT_APP_DIST")
folder = app.config.get('REACT_APP_DIST')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment