Unverified Commit 0c6dc9b1 authored by Xiaomeng Zhao, committed by GitHub

Merge pull request #971 from LollipopsAndWine/dev

parents 94f6bd83 ebfab424
 import json
 import re
+import os
+import shutil
 import traceback
 from pathlib import Path
 from flask import current_app, url_for
@@ -7,7 +9,7 @@ from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
 from magic_pdf.pipe.UNIPipe import UNIPipe
 import magic_pdf.model as model_config
 from magic_pdf.libs.json_compressor import JsonCompressor
-from magic_pdf.dict2md.ocr_mkcontent import ocr_mk_mm_markdown_with_para_and_pagination
+from common.mk_markdown.mk_markdown import ocr_mk_mm_markdown_with_para_and_pagination
 from .ext import find_file
 from ..extentions import app, db
 from .models import AnalysisPdf, AnalysisTask
@@ -17,7 +19,7 @@ from loguru import logger
 model_config.__use_inside_model__ = True

-def analysis_pdf(image_dir, pdf_bytes, is_ocr=False):
+def analysis_pdf(image_url_prefix, image_dir, pdf_bytes, is_ocr=False):
     try:
         model_json = []  # pass an empty list as model_json to parse with the built-in model
         logger.info(f"is_ocr: {is_ocr}")
@@ -40,7 +42,7 @@ def analysis_pdf(image_dir, pdf_bytes, is_ocr=False):
         pipe.pipe_parse()
         pdf_mid_data = JsonCompressor.decompress_json(pipe.get_compress_pdf_mid_data())
         pdf_info_list = pdf_mid_data["pdf_info"]
-        md_content = json.dumps(ocr_mk_mm_markdown_with_para_and_pagination(pdf_info_list, image_dir),
+        md_content = json.dumps(ocr_mk_mm_markdown_with_para_and_pagination(pdf_info_list, image_url_prefix),
                                 ensure_ascii=False)
         bbox_info = get_bbox_info(pdf_info_list)
         return md_content, bbox_info
@@ -77,20 +79,22 @@ def analysis_pdf_task(pdf_dir, image_dir, pdf_path, is_ocr, analysis_pdf_id):
     logger.info(f"image_dir: {image_dir}")
     if not Path(image_dir).exists():
         Path(image_dir).mkdir(parents=True, exist_ok=True)
+    else:
+        # empty image_dir so images do not pile up when the same file is parsed repeatedly
+        shutil.rmtree(image_dir, ignore_errors=True)
+        os.makedirs(image_dir, exist_ok=True)
+    # read the file contents
     with open(pdf_path, 'rb') as file:
         pdf_bytes = file.read()
-    md_content, bbox_info = analysis_pdf(image_dir, pdf_bytes, is_ocr)
-    img_list = Path(image_dir).glob('*') if Path(image_dir).exists() else []
+    # generate the image links
+    with app.app_context():
+        image_url_prefix = f"http://{current_app.config['SERVER_NAME']}{current_app.config['FILE_API']}&pdf={Path(pdf_path).name}&filename="
+    # parse the file
+    md_content, bbox_info = analysis_pdf(image_url_prefix, image_dir, pdf_bytes, is_ocr)
+    # ############ markdown #############
     pdf_name = Path(pdf_path).name
-    with app.app_context():
-        for img in img_list:
-            img_name = Path(img).name
-            regex = re.compile(fr'.*\((.*?{img_name})')
-            regex_result = regex.search(md_content)
-            if regex_result:
-                img_url = url_for('analysis.imgview', filename=img_name, as_attachment=False)
-                md_content = md_content.replace(regex_result.group(1), f"{img_url}&pdf={pdf_name}")
     full_md_content = ""
     for item in json.loads(md_content):
...
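For orientation (not part of this commit): the image_url_prefix built in analysis_pdf_task combines SERVER_NAME, the new FILE_API entry from the config file below, and the PDF name. A minimal sketch with made-up values:

# Hypothetical values; the two config strings mirror the changes elsewhere in this commit.
server_name = "192.168.1.2:5559"  # SERVER_NAME = f'{ip_address}:{port}'
file_api = "/api/v2/analysis/pdf_img?as_attachment=False"  # FILE_API
image_url_prefix = f"http://{server_name}{file_api}&pdf=demo.pdf&filename="
print(image_url_prefix + "images/0.jpg")
# -> http://192.168.1.2:5559/api/v2/analysis/pdf_img?as_attachment=False&pdf=demo.pdf&filename=images/0.jpg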
@@ -42,7 +42,7 @@ if database:
 ip_address = get_local_ip()
 port = config.get("PORT", 5559)
 # configure SERVER_NAME
-config['SERVER_NAME'] = f'{ip_address}:5559'
+config['SERVER_NAME'] = f'{ip_address}:{port}'
 # configure APPLICATION_ROOT
 config['APPLICATION_ROOT'] = '/'
 # configure PREFERRED_URL_SCHEME
...
import os
import unicodedata
if not os.getenv("FTLANG_CACHE"):
current_file_path = os.path.abspath(__file__)
current_dir = os.path.dirname(current_file_path)
root_dir = os.path.dirname(current_dir)
ftlang_cache_dir = os.path.join(root_dir, 'resources', 'fasttext-langdetect')
os.environ["FTLANG_CACHE"] = str(ftlang_cache_dir)
# print(os.getenv("FTLANG_CACHE"))
from fast_langdetect import detect_language
def detect_lang(text: str) -> str:
    if len(text) == 0:
        return ""
    try:
        lang_upper = detect_language(text)
    except Exception:
        # retry without control characters, which can break detection
        html_no_ctrl_chars = ''.join(ch for ch in text if unicodedata.category(ch)[0] != 'C')
        lang_upper = detect_language(html_no_ctrl_chars)
    try:
        lang = lang_upper.lower()
    except Exception:
        lang = ""
    return lang
if __name__ == '__main__':
print(os.getenv("FTLANG_CACHE"))
print(detect_lang("This is a test."))
print(detect_lang("<html>This is a test</html>"))
print(detect_lang("这个是中文测试。"))
print(detect_lang("<html>这个是中文测试。</html>"))
import re
def escape_special_markdown_char(pymu_blocks):
    """
    Escape characters in body text that have special meaning in Markdown.
    """
    special_chars = ["*", "`", "~", "$"]
    for blk in pymu_blocks:
        for line in blk['lines']:
            for span in line['spans']:
                # equation spans must keep their raw characters
                if span.get("_type", None) in ['inline-equation', 'interline-equation']:
                    continue
                if span['text']:
                    for char in special_chars:
                        span['text'] = span['text'].replace(char, "\\" + char)
    return pymu_blocks
def ocr_escape_special_markdown_char(content):
    """
    Escape characters in body text that have special meaning in Markdown.
    """
special_chars = ["*", "`", "~", "$"]
for char in special_chars:
content = content.replace(char, "\\" + char)
return content
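For illustration only (not part of this commit), how the OCR escape helper behaves on a made-up string:

# Hypothetical usage of ocr_escape_special_markdown_char; the input is invented.
print(ocr_escape_special_markdown_char("price: $5 * 2 in `code`"))
# -> price: \$5 \* 2 in \`code\`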
class ContentType:
Image = 'image'
Table = 'table'
Text = 'text'
InlineEquation = 'inline_equation'
InterlineEquation = 'interline_equation'
class BlockType:
Image = 'image'
ImageBody = 'image_body'
ImageCaption = 'image_caption'
ImageFootnote = 'image_footnote'
Table = 'table'
TableBody = 'table_body'
TableCaption = 'table_caption'
TableFootnote = 'table_footnote'
Text = 'text'
Title = 'title'
InterlineEquation = 'interline_equation'
Footnote = 'footnote'
Discarded = 'discarded'
class CategoryId:
Title = 0
Text = 1
Abandon = 2
ImageBody = 3
ImageCaption = 4
TableBody = 5
TableCaption = 6
TableFootnote = 7
InterlineEquation_Layout = 8
InlineEquation = 13
InterlineEquation_YOLO = 14
OcrText = 15
ImageFootnote = 101
import re
import wordninja
from .libs.language import detect_lang
from .libs.markdown_utils import ocr_escape_special_markdown_char
from .libs.ocr_content_type import BlockType, ContentType
def __is_hyphen_at_line_end(line):
"""
Check if a line ends with one or more letters followed by a hyphen.
Args:
line (str): The line of text to check.
Returns:
bool: True if the line ends with one or more letters followed by a hyphen, False otherwise.
"""
# Use regex to check if the line ends with one or more letters followed by a hyphen
return bool(re.search(r'[A-Za-z]+-\s*$', line))
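# Illustrative behaviour (examples invented, not from the commit):
#   __is_hyphen_at_line_end('state-of-the-')  -> True
#   __is_hyphen_at_line_end('well-known')     -> False  (no trailing hyphen)
#   __is_hyphen_at_line_end('3-')             -> False  (digit, not a letter, before the hyphen)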
def split_long_words(text):
segments = text.split(' ')
for i in range(len(segments)):
words = re.findall(r'\w+|[^\w]', segments[i], re.UNICODE)
for j in range(len(words)):
if len(words[j]) > 10:
words[j] = ' '.join(wordninja.split(words[j]))
segments[i] = ''.join(words)
return ' '.join(segments)
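# Illustrative behaviour (input invented; exact splits depend on wordninja's
# frequency model): only tokens longer than 10 characters are re-split.
#   split_long_words('derivedfromtheabove equations')
#   -> 'derived from the above equations'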
def join_path(*args):
return ''.join(str(s).rstrip('/') for s in args)
def ocr_mk_mm_markdown_with_para_and_pagination(pdf_info_dict: list,
img_buket_path):
markdown_with_para_and_pagination = []
page_no = 0
for page_info in pdf_info_dict:
paras_of_layout = page_info.get('para_blocks')
if not paras_of_layout:
continue
page_markdown = ocr_mk_markdown_with_para_core_v2(
paras_of_layout, 'mm', img_buket_path)
        markdown_with_para_and_pagination.append({
            'page_no': page_no,
            'md_content': '\n\n'.join(page_markdown),
        })
page_no += 1
return markdown_with_para_and_pagination
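# Illustrative output shape (values invented, not from the commit): for a
# document whose pages all have para_blocks this returns e.g.
#   [{'page_no': 0, 'md_content': '# Title \n\nFirst paragraph '},
#    {'page_no': 1, 'md_content': 'Second page text '}]
# img_buket_path is prepended to every image path, so the web app passes in
# the image_url_prefix built in analysis_pdf_task.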
def merge_para_with_text(para_block):
def detect_language(text):
en_pattern = r'[a-zA-Z]+'
en_matches = re.findall(en_pattern, text)
en_length = sum(len(match) for match in en_matches)
if len(text) > 0:
if en_length / len(text) >= 0.5:
return 'en'
else:
return 'unknown'
else:
return 'empty'
para_text = ''
for line in para_block['lines']:
line_text = ''
line_lang = ''
for span in line['spans']:
span_type = span['type']
if span_type == ContentType.Text:
line_text += span['content'].strip()
if line_text != '':
line_lang = detect_lang(line_text)
for span in line['spans']:
span_type = span['type']
content = ''
if span_type == ContentType.Text:
content = span['content']
# language = detect_lang(content)
language = detect_language(content)
                if language == 'en':  # only split long English words; word-splitting Chinese text would lose characters
                    content = ocr_escape_special_markdown_char(
                        split_long_words(content))
                else:
                    content = ocr_escape_special_markdown_char(content)
elif span_type == ContentType.InlineEquation:
content = f" ${span['content']}$ "
elif span_type == ContentType.InterlineEquation:
content = f"\n$$\n{span['content']}\n$$\n"
            if content != '':
                langs = ['zh', 'ja', 'ko']
                if line_lang in langs:  # some documents put one character per span; per-span detection is unreliable there, so judge by the whole line's text
                    para_text += content  # in Chinese/Japanese/Korean text, no space is needed between contents
                elif line_lang == 'en':
                    # if the line ends with a letter + hyphen, drop the hyphen and add no space so the split word rejoins
                    if __is_hyphen_at_line_end(content):
                        para_text += content[:-1]
                    else:
                        para_text += content + ' '
                else:
                    para_text += content + ' '  # in Western text, contents are separated by spaces
return para_text
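# Illustrative spacing rules (examples invented): on an English line the spans
# 'Results' + 'are shown' merge to 'Results are shown ', a CJK line is
# concatenated with no separators, and a span ending in letter+hyphen has the
# hyphen dropped so the split word rejoins: 'experi-' + 'ment' -> 'experiment '.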
def ocr_mk_markdown_with_para_core_v2(paras_of_layout,
mode,
img_buket_path=''):
page_markdown = []
for para_block in paras_of_layout:
para_text = ''
para_type = para_block['type']
if para_type == BlockType.Text:
para_text = merge_para_with_text(para_block)
elif para_type == BlockType.Title:
para_text = f'# {merge_para_with_text(para_block)}'
elif para_type == BlockType.InterlineEquation:
para_text = merge_para_with_text(para_block)
elif para_type == BlockType.Image:
if mode == 'nlp':
continue
elif mode == 'mm':
                for block in para_block['blocks']:  # 1st: append image_body
if block['type'] == BlockType.ImageBody:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Image:
para_text += f"\n![]({join_path(img_buket_path, span['image_path'])}) \n"
                for block in para_block['blocks']:  # 2nd: append image_caption
                    if block['type'] == BlockType.ImageCaption:
                        para_text += merge_para_with_text(block)
                for block in para_block['blocks']:  # 3rd: append image_footnote
                    if block['type'] == BlockType.ImageFootnote:
                        para_text += merge_para_with_text(block)
elif para_type == BlockType.Table:
if mode == 'nlp':
continue
elif mode == 'mm':
                for block in para_block['blocks']:  # 1st: append table_caption
if block['type'] == BlockType.TableCaption:
para_text += merge_para_with_text(block)
                for block in para_block['blocks']:  # 2nd: append table_body
if block['type'] == BlockType.TableBody:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Table:
# if processed by table model
if span.get('latex', ''):
para_text += f"\n\n$\n {span['latex']}\n$\n\n"
elif span.get('html', ''):
para_text += f"\n\n{span['html']}\n\n"
else:
para_text += f"\n![]({join_path(img_buket_path, span['image_path'])}) \n"
                for block in para_block['blocks']:  # 3rd: append table_footnote
if block['type'] == BlockType.TableFootnote:
para_text += merge_para_with_text(block)
if para_text.strip() == '':
continue
else:
page_markdown.append(para_text.strip() + ' ')
return page_markdown
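For illustration only (not part of this commit), a minimal smoke test of the new module; the block and span dicts are hand-built to match the shapes the code above expects, and wordninja plus fast_langdetect must be installed (the latter may download its model on first use):

if __name__ == '__main__':
    demo_page = {
        'para_blocks': [
            {'type': BlockType.Title,
             'lines': [{'spans': [{'type': ContentType.Text, 'content': 'A Demo Title'}]}]},
            {'type': BlockType.Text,
             'lines': [{'spans': [{'type': ContentType.Text, 'content': 'Hello world.'}]}]},
        ]
    }
    print(ocr_mk_mm_markdown_with_para_and_pagination([demo_page], 'http://host/img='))
    # expected shape: [{'page_no': 0, 'md_content': '# A Demo Title \n\nHello world. '}]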
@@ -13,6 +13,8 @@ BaseConfig: &base
   PDF_ANALYSIS_FOLDER: "analysis_pdf"
   # path where the built front-end project lives
   REACT_APP_DIST: "../../web/dist/"
+  # file access path
+  FILE_API: "/api/v2/analysis/pdf_img?as_attachment=False"
 # development config
 DevelopmentConfig:
...