Commit c2e5c36f authored by 赵小蒙's avatar 赵小蒙
Browse files

Initial commit

parents
import json
import brotli
import base64
class JsonCompressor:
@staticmethod
def compress_json(data):
"""
Compress a json object and encode it with base64
"""
json_str = json.dumps(data)
json_bytes = json_str.encode('utf-8')
compressed = brotli.compress(json_bytes, quality=6)
compressed_str = base64.b64encode(compressed).decode('utf-8') # convert bytes to string
return compressed_str
@staticmethod
def decompress_json(compressed_str):
"""
Decode the base64 string and decompress the json object
"""
compressed = base64.b64decode(compressed_str.encode('utf-8')) # convert string to bytes
decompressed_bytes = brotli.decompress(compressed)
json_str = decompressed_bytes.decode('utf-8')
data = json.loads(json_str)
return data
import pycld2 as cld2
import regex
import unicodedata
RE_BAD_CHARS = regex.compile(r"\p{Cc}|\p{Cs}")
def remove_bad_chars(text):
return RE_BAD_CHARS.sub("", text)
def detect_lang(text: str) -> str:
if len(text) == 0:
return ""
try:
_, _, details = cld2.detect(text)
except:
# cld2 doesn't like control characters
# https://github.com/mikemccand/chromium-compact-language-detector/issues/22#issuecomment-435904616
html_no_ctrl_chars = ''.join([l for l in text if unicodedata.category(l)[0] not in ['C',]])
_, _, details = cld2.detect(html_no_ctrl_chars)
lang = ""
try:
lang = details[0][1].lower()
except:
lang = ""
return lang
if __name__ == '__main__':
print(detect_lang("This is a test."))
print(detect_lang("<html>This is a test</html>"))
print(detect_lang("这个是中文测试。"))
print(detect_lang("<html>这个是中文测试。</html>"))
\ No newline at end of file
import re
def escape_special_markdown_char(pymu_blocks):
"""
转义正文里对markdown语法有特殊意义的字符
"""
special_chars = ["*", "`", "~", "$"]
for blk in pymu_blocks:
for line in blk['lines']:
for span in line['spans']:
for char in special_chars:
span_text = span['text']
span_type = span.get("_type", None)
if span_type in ['inline-equation', 'interline-equation']:
continue
elif span_text:
span['text'] = span['text'].replace(char, "\\" + char)
return pymu_blocks
import re
from os import path
from collections import Counter
from loguru import logger
# from langdetect import detect
import spacy
import en_core_web_sm
import zh_core_web_sm
from libs.language import detect_lang
class NLPModels:
"""
How to upload local models to s3:
- config aws cli:
doc\SETUP-CLI.md
doc\setup_cli.sh
app\config\__init__.py
- $ cd {local_dir_storing_models}
- $ ls models
en_core_web_sm-3.7.1/
zh_core_web_sm-3.7.0/
- $ aws s3 sync models/ s3://llm-infra/models --profile=p_project_norm
- $ aws s3 --profile=p_project_norm ls s3://llm-infra/models/
PRE en_core_web_sm-3.7.1/
PRE zh_core_web_sm-3.7.0/
"""
def __init__(self):
# if OS is windows, set "TMP_DIR" to "D:/tmp"
home_dir = path.expanduser("~")
self.default_local_path = path.join(home_dir, ".nlp_models")
self.default_shared_path = "/share/pdf_processor/nlp_models"
self.default_hdfs_path = "hdfs://pdf_processor/nlp_models"
self.default_s3_path = "s3://llm-infra/models"
self.nlp_models = self.nlp_models = {
"en_core_web_sm": {
"type": "spacy",
"version": "3.7.1",
},
"en_core_web_md": {
"type": "spacy",
"version": "3.7.1",
},
"en_core_web_lg": {
"type": "spacy",
"version": "3.7.1",
},
"zh_core_web_sm": {
"type": "spacy",
"version": "3.7.0",
},
"zh_core_web_md": {
"type": "spacy",
"version": "3.7.0",
},
"zh_core_web_lg": {
"type": "spacy",
"version": "3.7.0",
},
}
self.en_core_web_sm_model = en_core_web_sm.load()
self.zh_core_web_sm_model = zh_core_web_sm.load()
def load_model(self, model_name, model_type, model_version):
if (
model_name in self.nlp_models
and self.nlp_models[model_name]["type"] == model_type
and self.nlp_models[model_name]["version"] == model_version
):
return spacy.load(model_name) if spacy.util.is_package(model_name) else None
else:
logger.error(f"Unsupported model name or version: {model_name} {model_version}")
return None
def detect_language(self, text, use_langdetect=False):
if len(text) == 0:
return None
if use_langdetect:
# print("use_langdetect")
# print(detect_lang(text))
# return detect_lang(text)
if detect_lang(text) == "zh":
return "zh"
else:
return "en"
if not use_langdetect:
en_count = len(re.findall(r"[a-zA-Z]", text))
cn_count = len(re.findall(r"[\u4e00-\u9fff]", text))
if en_count > cn_count:
return "en"
if cn_count > en_count:
return "zh"
def detect_entity_catgr_using_nlp(self, text, threshold=0.5):
"""
Detect entity categories using NLP models and return the most frequent entity types.
Parameters
----------
text : str
Text to be processed.
Returns
-------
str
The most frequent entity type.
"""
lang = self.detect_language(text, use_langdetect=True)
if lang == "en":
nlp_model = self.en_core_web_sm_model
elif lang == "zh":
nlp_model = self.zh_core_web_sm_model
else:
# logger.error(f"Unsupported language: {lang}")
return {}
# Splitting text into smaller parts
text_parts = re.split(r"[,;,;、\s & |]+", text)
text_parts = [part for part in text_parts if not re.match(r"[\d\W]+", part)] # Remove non-words
text_combined = " ".join(text_parts)
try:
doc = nlp_model(text_combined)
entity_counts = Counter([ent.label_ for ent in doc.ents])
word_counts_in_entities = Counter()
for ent in doc.ents:
word_counts_in_entities[ent.label_] += len(ent.text.split())
total_words_in_entities = sum(word_counts_in_entities.values())
total_words = len([token for token in doc if not token.is_punct])
if total_words_in_entities == 0 or total_words == 0:
return None
entity_percentage = total_words_in_entities / total_words
if entity_percentage < 0.5:
return None
most_common_entity, word_count = word_counts_in_entities.most_common(1)[0]
entity_percentage = word_count / total_words_in_entities
if entity_percentage >= threshold:
return most_common_entity
else:
return None
except Exception as e:
logger.error(f"Error in entity detection: {e}")
return None
def __main__():
nlpModel = NLPModels()
test_strings = [
"张三",
"张三, 李四,王五; 赵六",
"John Doe",
"Jane Smith",
"Lee, John",
"John Doe, Jane Smith; Alice Johnson,Bob Lee",
"孙七, Michael Jordan;赵八",
"David Smith Michael O'Connor; Kevin ßáçøñ",
"李雷·韩梅梅, 张三·李四",
"Charles Robert Darwin, Isaac Newton",
"莱昂纳多·迪卡普里奥, 杰克·吉伦哈尔",
"John Doe, Jane Smith; Alice Johnson",
"张三, 李四,王五; 赵六",
"Lei Wang, Jia Li, and Xiaojun Chen, LINKE YANG OU, and YUAN ZHANG",
"Rachel Mills & William Barry & Susanne B. Haga",
"Claire Chabut* and Jean-François Bussières",
"1 Department of Chemistry, Northeastern University, Shenyang 110004, China 2 State Key Laboratory of Polymer Physics and Chemistry, Changchun Institute of Applied Chemistry, Chinese Academy of Sciences, Changchun 130022, China",
"Changchun",
"china",
"Rongjun Song, 1,2 Baoyan Zhang, 1 Baotong Huang, 2 Tao Tang 2",
"Synergistic Effect of Supported Nickel Catalyst with Intumescent Flame-Retardants on Flame Retardancy and Thermal Stability of Polypropylene",
"Synergistic Effect of Supported Nickel Catalyst with",
"Intumescent Flame-Retardants on Flame Retardancy",
"and Thermal Stability of Polypropylene",
]
for test in test_strings:
print()
print(f"Original String: {test}")
result = nlpModel.detect_entity_catgr_using_nlp(test)
print(f"Detected entities: {result}")
if __name__ == "__main__":
__main__()
import os
from pathlib import Path
from typing import Tuple
import io
# from app.common.s3 import get_s3_client
from libs.commons import fitz
from loguru import logger
from libs.commons import parse_bucket_key, join_path
def cut_image(bbox: Tuple, page_num: int, page: fitz.Page, save_parent_path: str, s3_return_path=None, img_s3_client=None, upload_switch=True):
"""
从第page_num页的page中,根据bbox进行裁剪出一张jpg图片,返回图片路径
save_path:需要同时支持s3和本地, 图片存放在save_path下,文件名是: {page_num}_{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}.jpg , bbox内数字取整。
"""
# 拼接文件名
filename = f"{page_num}_{int(bbox[0])}_{int(bbox[1])}_{int(bbox[2])}_{int(bbox[3])}.jpg"
# 拼接路径
image_save_path = join_path(save_parent_path, filename)
s3_img_path = join_path(s3_return_path, filename) if s3_return_path is not None else None
# 打印图片文件名
# print(f"Saved {image_save_path}")
#检查坐标
# x_check = int(bbox[2]) - int(bbox[0])
# y_check = int(bbox[3]) - int(bbox[1])
# if x_check <= 0 or y_check <= 0:
#
# if image_save_path.startswith("s3://"):
# logger.exception(f"传入图片坐标有误,x1<x0或y1<y0,{s3_img_path}")
# return s3_img_path
# else:
# logger.exception(f"传入图片坐标有误,x1<x0或y1<y0,{image_save_path}")
# return image_save_path
# 将坐标转换为fitz.Rect对象
rect = fitz.Rect(*bbox)
# 配置缩放倍数为3倍
zoom = fitz.Matrix(3, 3)
# 截取图片
pix = page.get_pixmap(clip=rect, matrix=zoom)
if image_save_path.startswith("s3://"):
if not upload_switch:
pass
else:
# 图片保存到s3
bucket_name, bucket_key = parse_bucket_key(image_save_path)
# 将字节流上传到s3
byte_data = pix.tobytes(output='jpeg', jpg_quality=95)
file_obj = io.BytesIO(byte_data)
if img_s3_client is not None:
img_s3_client.upload_fileobj(file_obj, bucket_name, bucket_key)
# 每个图片上传任务都创建一个新的client
# img_s3_client_once = get_s3_client(image_save_path)
# img_s3_client_once.upload_fileobj(file_obj, bucket_name, bucket_key)
else:
logger.exception("must input img_s3_client")
return s3_img_path
else:
# 保存图片到本地
# 先检查一下image_save_path的父目录是否存在,如果不存在,就创建
parent_dir = os.path.dirname(image_save_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
pix.save(image_save_path, jpg_quality=95)
# 为了直接能在markdown里看,这里把地址改为相对于mardown的地址
pth = Path(image_save_path)
image_save_path = f"{pth.parent.name}/{pth.name}"
return image_save_path
def save_images_by_bboxes(book_name: str, page_num: int, page: fitz.Page, save_path: str,
image_bboxes: list, images_overlap_backup:list, table_bboxes: list, equation_inline_bboxes: list,
equation_interline_bboxes: list, img_s3_client) -> dict:
"""
返回一个dict, key为bbox, 值是图片地址
"""
image_info = []
image_backup_info = []
table_info = []
inline_eq_info = []
interline_eq_info = []
# 图片的保存路径组成是这样的: {s3_or_local_path}/{book_name}/{images|tables|equations}/{page_num}_{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}.jpg
s3_return_image_path = join_path(book_name, "images")
image_save_path = join_path(save_path, s3_return_image_path)
s3_return_table_path = join_path(book_name, "tables")
table_save_path = join_path(save_path, s3_return_table_path)
s3_return_equations_inline_path = join_path(book_name, "equations_inline")
equation_inline_save_path = join_path(save_path, s3_return_equations_inline_path)
s3_return_equation_interline_path = join_path(book_name, "equation_interline")
equation_interline_save_path = join_path(save_path, s3_return_equation_interline_path)
for bbox in image_bboxes:
if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
logger.warning(f"image_bboxes: 错误的box, {bbox}")
continue
image_path = cut_image(bbox, page_num, page, image_save_path, s3_return_image_path, img_s3_client)
image_info.append({"bbox": bbox, "image_path": image_path})
for bbox in images_overlap_backup:
if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
logger.warning(f"images_overlap_backup: 错误的box, {bbox}")
continue
image_path = cut_image(bbox, page_num, page, image_save_path, s3_return_image_path, img_s3_client)
image_backup_info.append({"bbox": bbox, "image_path": image_path})
for bbox in table_bboxes:
if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
logger.warning(f"table_bboxes: 错误的box, {bbox}")
continue
image_path = cut_image(bbox, page_num, page, table_save_path, s3_return_table_path, img_s3_client)
table_info.append({"bbox": bbox, "image_path": image_path})
for bbox in equation_inline_bboxes:
if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
logger.warning(f"equation_inline_bboxes: 错误的box, {bbox}")
continue
image_path = cut_image(bbox[:4], page_num, page, equation_inline_save_path, s3_return_equations_inline_path, img_s3_client, upload_switch=False)
inline_eq_info.append({'bbox':bbox[:4], "image_path":image_path, "latex_text":bbox[4]})
for bbox in equation_interline_bboxes:
if any([bbox[0]>=bbox[2], bbox[1]>=bbox[3]]):
logger.warning(f"equation_interline_bboxes: 错误的box, {bbox}")
continue
image_path = cut_image(bbox[:4], page_num, page, equation_interline_save_path, s3_return_equation_interline_path, img_s3_client, upload_switch=False)
interline_eq_info.append({"bbox":bbox[:4], "image_path":image_path, "latex_text":bbox[4]})
return image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info
\ No newline at end of file
import os
def sanitize_filename(filename, replacement="_"):
if os.name == 'nt':
invalid_chars = '<>:"|?*'
for char in invalid_chars:
filename = filename.replace(char, replacement)
return filename
import math
def __inc_dict_val(mp, key, val_inc:int):
if mp.get(key):
mp[key] = mp[key] + val_inc
else:
mp[key] = val_inc
def get_text_block_base_info(block):
"""
获取这个文本块里的字体的颜色、字号、字体
按照正文字数最多的返回
"""
counter = {}
for line in block['lines']:
for span in line['spans']:
color = span['color']
size = round(span['size'], 2)
font = span['font']
txt_len = len(span['text'])
__inc_dict_val(counter, (color, size, font), txt_len)
c, s, ft = max(counter, key=counter.get)
return c, s, ft
\ No newline at end of file
from libs.commons import fitz
import os
from loguru import logger
from layout.bbox_sort import CONTENT_TYPE_IDX
def draw_bbox_on_page(raw_pdf_doc: fitz.Document, paras_dict:dict, save_path: str):
"""
在page上画出bbox,保存到save_path
"""
# 检查文件是否存在
is_new_pdf = False
if os.path.exists(save_path):
# 打开现有的 PDF 文件
doc = fitz.open(save_path)
else:
# 创建一个新的空白 PDF 文件
is_new_pdf = True
doc = fitz.open('')
color_map = {
'image': fitz.pdfcolor["yellow"],
'text': fitz.pdfcolor['blue'],
"table": fitz.pdfcolor['green']
}
for k, v in paras_dict.items():
page_idx = v['page_idx']
width = raw_pdf_doc[page_idx].rect.width
height = raw_pdf_doc[page_idx].rect.height
new_page = doc.new_page(width=width, height=height)
shape = new_page.new_shape()
for order, block in enumerate(v['preproc_blocks']):
rect = fitz.Rect(block['bbox'])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=None, fill=color_map['text'], fill_opacity=0.2)
shape.finish()
shape.commit()
for img in v['images']:
# 原始box画上去
rect = fitz.Rect(img['bbox'])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=None, fill=fitz.pdfcolor['yellow'])
shape.finish()
shape.commit()
for img in v['image_backup']:
# 原始box画上去
rect = fitz.Rect(img['bbox'])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor['yellow'], fill=None)
shape.finish()
shape.commit()
for tb in v['droped_text_block']:
# 原始box画上去
rect = fitz.Rect(tb['bbox'])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=None, fill=fitz.pdfcolor['black'], fill_opacity=0.4)
shape.finish()
shape.commit()
# TODO table
for tb in v['tables']:
rect = fitz.Rect(tb['bbox'])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=None, fill=fitz.pdfcolor['green'], fill_opacity=0.2)
shape.finish()
shape.commit()
parent_dir = os.path.dirname(save_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
if is_new_pdf:
doc.save(save_path)
else:
doc.saveIncr()
doc.close()
def debug_show_bbox(raw_pdf_doc: fitz.Document, page_idx: int, bboxes: list, droped_bboxes:list, expect_drop_bboxes:list, save_path: str, expected_page_id:int):
"""
以覆盖的方式写个临时的pdf,用于debug
"""
if page_idx!=expected_page_id:
return
if os.path.exists(save_path):
# 删除已经存在的文件
os.remove(save_path)
# 创建一个新的空白 PDF 文件
doc = fitz.open('')
width = raw_pdf_doc[page_idx].rect.width
height = raw_pdf_doc[page_idx].rect.height
new_page = doc.new_page(width=width, height=height)
shape = new_page.new_shape()
for bbox in bboxes:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor['red'], fill=fitz.pdfcolor['blue'], fill_opacity=0.2)
shape.finish()
shape.commit()
for bbox in droped_bboxes:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=None, fill=fitz.pdfcolor['yellow'], fill_opacity=0.2)
shape.finish()
shape.commit()
for bbox in expect_drop_bboxes:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor['red'], fill=None)
shape.finish()
shape.commit()
# shape.insert_textbox(fitz.Rect(200, 0, 600, 20), f"total bboxes: {len(bboxes)}", fontname="helv", fontsize=12,
# color=(0, 0, 0))
# shape.finish(color=fitz.pdfcolor['black'])
# shape.commit()
parent_dir = os.path.dirname(save_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
doc.save(save_path)
doc.close()
def debug_show_page(page, bboxes1: list,bboxes2: list,bboxes3: list,):
save_path = "./tmp/debug.pdf"
if os.path.exists(save_path):
# 删除已经存在的文件
os.remove(save_path)
# 创建一个新的空白 PDF 文件
doc = fitz.open('')
width = page.rect.width
height = page.rect.height
new_page = doc.new_page(width=width, height=height)
shape = new_page.new_shape()
for bbox in bboxes1:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor['red'], fill=fitz.pdfcolor['blue'], fill_opacity=0.2)
shape.finish()
shape.commit()
for bbox in bboxes2:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=None, fill=fitz.pdfcolor['yellow'], fill_opacity=0.2)
shape.finish()
shape.commit()
for bbox in bboxes3:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor['red'], fill=None)
shape.finish()
shape.commit()
parent_dir = os.path.dirname(save_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
doc.save(save_path)
doc.close()
def draw_layout_bbox_on_page(raw_pdf_doc: fitz.Document, paras_dict:dict, header, footer, pdf_path: str):
"""
在page上画出bbox,保存到save_path
"""
# 检查文件是否存在
is_new_pdf = False
if os.path.exists(pdf_path):
# 打开现有的 PDF 文件
doc = fitz.open(pdf_path)
else:
# 创建一个新的空白 PDF 文件
is_new_pdf = True
doc = fitz.open('')
for k, v in paras_dict.items():
page_idx = v['page_idx']
layouts = v['layout_bboxes']
page = doc[page_idx]
shape = page.new_shape()
for order, layout in enumerate(layouts):
border_offset = 1
rect_box = layout['layout_bbox']
layout_label = layout['layout_label']
fill_color = fitz.pdfcolor['pink'] if layout_label=='U' else None
rect_box = [rect_box[0]+1, rect_box[1]-border_offset, rect_box[2]-1, rect_box[3]+border_offset]
rect = fitz.Rect(*rect_box)
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor['red'], fill=fill_color, fill_opacity=0.4)
"""
draw order text on layout box
"""
font_size = 10
shape.insert_text((rect_box[0] + 1, rect_box[1] + font_size), f"{order}", fontsize=font_size, color=(0, 0, 0))
"""画上footer header"""
if header:
shape.draw_rect(fitz.Rect(header))
shape.finish(color=None, fill=fitz.pdfcolor['black'], fill_opacity=0.2)
if footer:
shape.draw_rect(fitz.Rect(footer))
shape.finish(color=None, fill=fitz.pdfcolor['black'], fill_opacity=0.2)
shape.commit()
if is_new_pdf:
doc.save(pdf_path)
else:
doc.saveIncr()
doc.close()
@DeprecationWarning
def draw_layout_on_page(raw_pdf_doc: fitz.Document, page_idx: int, page_layout: list, pdf_path: str):
"""
把layout的box用红色边框花在pdf_path的page_idx上
"""
def draw(shape, layout, fill_color=fitz.pdfcolor['pink']):
border_offset = 1
rect_box = layout['layout_bbox']
layout_label = layout['layout_label']
sub_layout = layout['sub_layout']
if len(sub_layout)==0:
fill_color = fill_color if layout_label=='U' else None
rect_box = [rect_box[0]+1, rect_box[1]-border_offset, rect_box[2]-1, rect_box[3]+border_offset]
rect = fitz.Rect(*rect_box)
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor['red'], fill=fill_color, fill_opacity=0.2)
# if layout_label=='U':
# bad_boxes = layout.get("bad_boxes", [])
# for bad_box in bad_boxes:
# rect = fitz.Rect(*bad_box)
# shape.draw_rect(rect)
# shape.finish(color=fitz.pdfcolor['red'], fill=fitz.pdfcolor['red'], fill_opacity=0.2)
# else:
# rect = fitz.Rect(*rect_box)
# shape.draw_rect(rect)
# shape.finish(color=fitz.pdfcolor['blue'])
for sub_layout in sub_layout:
draw(shape, sub_layout)
shape.commit()
# 检查文件是否存在
is_new_pdf = False
if os.path.exists(pdf_path):
# 打开现有的 PDF 文件
doc = fitz.open(pdf_path)
else:
# 创建一个新的空白 PDF 文件
is_new_pdf = True
doc = fitz.open('')
page = doc[page_idx]
shape = page.new_shape()
for order, layout in enumerate(page_layout):
draw(shape, layout, fitz.pdfcolor['yellow'])
# shape.insert_textbox(fitz.Rect(200, 0, 600, 20), f"total bboxes: {len(layout)}", fontname="helv", fontsize=12,
# color=(0, 0, 0))
# shape.finish(color=fitz.pdfcolor['black'])
# shape.commit()
parent_dir = os.path.dirname(pdf_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
if is_new_pdf:
doc.save(pdf_path)
else:
doc.saveIncr()
doc.close()
\ No newline at end of file
import re
import math
from loguru import logger
from libs.boxbase import find_bottom_nearest_text_bbox, find_top_nearest_text_bbox
def mk_nlp_markdown(para_dict: dict):
"""
对排序后的bboxes拼接内容
"""
content_lst = []
for _, page_info in para_dict.items():
para_blocks = page_info.get("para_blocks")
if not para_blocks:
continue
for block in para_blocks:
item = block["paras"]
for _, p in item.items():
para_text = p["para_text"]
is_title = p["is_para_title"]
title_level = p['para_title_level']
md_title_prefix = "#"*title_level
if is_title:
content_lst.append(f"{md_title_prefix} {para_text}")
else:
content_lst.append(para_text)
content_text = "\n\n".join(content_lst)
return content_text
# 找到目标字符串在段落中的索引
def __find_index(paragraph, target):
index = paragraph.find(target)
if index != -1:
return index
else:
return None
def __insert_string(paragraph, target, postion):
new_paragraph = paragraph[:postion] + target + paragraph[postion:]
return new_paragraph
def __insert_after(content, image_content, target):
"""
在content中找到target,将image_content插入到target后面
"""
index = content.find(target)
if index != -1:
content = content[:index+len(target)] + "\n\n" + image_content + "\n\n" + content[index+len(target):]
else:
logger.error(f"Can't find the location of image {image_content} in the markdown file, search target is {target}")
return content
def __insert_before(content, image_content, target):
"""
在content中找到target,将image_content插入到target前面
"""
index = content.find(target)
if index != -1:
content = content[:index] + "\n\n" + image_content + "\n\n" + content[index:]
else:
logger.error(f"Can't find the location of image {image_content} in the markdown file, search target is {target}")
return content
def mk_mm_markdown(para_dict: dict):
"""拼装多模态markdown"""
content_lst = []
for _, page_info in para_dict.items():
page_lst = [] # 一个page内的段落列表
para_blocks = page_info.get("para_blocks")
pymu_raw_blocks = page_info.get("preproc_blocks")
all_page_images = []
all_page_images.extend(page_info.get("images",[]))
all_page_images.extend(page_info.get("image_backup", []) )
all_page_images.extend(page_info.get("tables",[]))
all_page_images.extend(page_info.get("table_backup",[]) )
if not para_blocks or not pymu_raw_blocks: # 只有图片的拼接的场景
for img in all_page_images:
page_lst.append(f"![]({img['image_path']})") # TODO 图片顺序
page_md = "\n\n".join(page_lst)
else:
for block in para_blocks:
item = block["paras"]
for _, p in item.items():
para_text = p["para_text"]
is_title = p["is_para_title"]
title_level = p['para_title_level']
md_title_prefix = "#"*title_level
if is_title:
page_lst.append(f"{md_title_prefix} {para_text}")
else:
page_lst.append(para_text)
"""拼装成一个页面的文本"""
page_md = "\n\n".join(page_lst)
"""插入图片"""
for img in all_page_images:
imgbox = img['bbox']
img_content = f"![]({img['image_path']})"
# 先看在哪个block内
for block in pymu_raw_blocks:
bbox = block['bbox']
if bbox[0]-1 <= imgbox[0] < bbox[2]+1 and bbox[1]-1 <= imgbox[1] < bbox[3]+1:# 确定在block内
for l in block['lines']:
line_box = l['bbox']
if line_box[0]-1 <= imgbox[0] < line_box[2]+1 and line_box[1]-1 <= imgbox[1] < line_box[3]+1: # 在line内的,插入line前面
line_txt = "".join([s['text'] for s in l['spans']])
page_md = __insert_before(page_md, img_content, line_txt)
break
break
else:# 在行与行之间
# 找到图片x0,y0与line的x0,y0最近的line
min_distance = 100000
min_line = None
for l in block['lines']:
line_box = l['bbox']
distance = math.sqrt((line_box[0] - imgbox[0])**2 + (line_box[1] - imgbox[1])**2)
if distance < min_distance:
min_distance = distance
min_line = l
if min_line:
line_txt = "".join([s['text'] for s in min_line['spans']])
img_h = imgbox[3] - imgbox[1]
if min_distance<img_h: # 文字在图片前面
page_md = __insert_after(page_md, img_content, line_txt)
else:
page_md = __insert_before(page_md, img_content, line_txt)
else:
logger.error(f"Can't find the location of image {img['image_path']} in the markdown file")
else:# 应当在两个block之间
# 找到上方最近的block,如果上方没有就找大下方最近的block
top_txt_block = find_top_nearest_text_bbox(pymu_raw_blocks, imgbox)
if top_txt_block:
line_txt = "".join([s['text'] for s in top_txt_block['lines'][-1]['spans']])
page_md = __insert_after(page_md, img_content, line_txt)
else:
bottom_txt_block = find_bottom_nearest_text_bbox(pymu_raw_blocks, imgbox)
if bottom_txt_block:
line_txt = "".join([s['text'] for s in bottom_txt_block['lines'][0]['spans']])
page_md = __insert_before(page_md, img_content, line_txt)
else:
logger.error(f"Can't find the location of image {img['image_path']} in the markdown file")
content_lst.append(page_md)
"""拼装成全部页面的文本"""
content_text = "\n\n".join(content_lst)
return content_text
@DeprecationWarning
def mk_mm_markdown_1(para_dict: dict):
"""
得到images和tables变量
"""
image_all_list = []
for _, page_info in para_dict.items():
images = page_info.get("images",[])
tables = page_info.get("tables",[])
image_backup = page_info.get("image_backup", [])
table_backup = page_info.get("table_backup",[])
all_page_images = []
all_page_images.extend(images)
all_page_images.extend(image_backup)
all_page_images.extend(tables)
all_page_images.extend(table_backup)
pymu_raw_blocks = page_info.get("pymu_raw_blocks")
# 提取每个图片所在位置
for image_info in all_page_images:
x0_image, y0_image, x1_image, y1_image = image_info['bbox'][:4]
image_path = image_info['image_path']
# 判断图片处于原始PDF中哪个模块之间
image_internal_dict = {}
image_external_dict = {}
between_dict = {}
for block in pymu_raw_blocks:
x0, y0, x1, y1 = block['bbox'][:4]
# 在某个模块内部
if x0 <= x0_image < x1 and y0 <= y0_image < y1:
image_internal_dict['bbox'] = [x0_image, y0_image, x1_image, y1_image]
image_internal_dict['path'] = image_path
# 确定图片在哪句文本之前
y_pre = 0
for line in block['lines']:
x0, y0, x1, y1 = line['spans'][0]['bbox']
if x0 <= x0_image < x1 and y_pre <= y0_image < y0:
text = line['spans']['text']
image_internal_dict['text'] = text
image_internal_dict['markdown_image'] = f'![image_path]({image_path})'
break
else:
y_pre = y0
# 在某两个模块之间
elif x0 <= x0_image < x1:
distance = math.sqrt((x1_image - x0)**2 + (y1_image - y0)**2)
between_dict[block['number']] = distance
# 找到与定位点距离最小的文本block
if between_dict:
min_key = min(between_dict, key=between_dict.get)
spans_list = []
for span in pymu_raw_blocks[min_key]['lines']:
for text_piece in span['spans']:
# 防止索引定位文本内容过多
if len(spans_list) < 60:
spans_list.append(text_piece['text'])
text1 = ''.join(spans_list)
image_external_dict['bbox'] = [x0_image, y0_image, x1_image, y1_image]
image_external_dict['path'] = image_path
image_external_dict['text'] = text1
image_external_dict['markdown_image'] = f'![image_path]({image_path})'
# 将内部图片或外部图片存入当页所有图片的列表
if len(image_internal_dict) != 0:
image_all_list.append(image_internal_dict)
elif len(image_external_dict) != 0:
image_all_list.append(image_external_dict)
else:
logger.error(f"Can't find the location of image {image_path} in the markdown file")
content_text = mk_nlp_markdown(para_dict)
for image_info_extract in image_all_list:
loc = __find_index(content_text, image_info_extract['text'])
if loc is not None:
content_text = __insert_string(content_text, image_info_extract['markdown_image'], loc)
else:
logger.error(f"Can't find the location of image {image_info_extract['path']} in the markdown file")
return content_text
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
import sys
from libs.commons import fitz
from termcolor import cprint
if sys.version_info[0] >= 3:
sys.stdout.reconfigure(encoding="utf-8") # type: ignore
def open_pdf(pdf_path):
try:
pdf_document = fitz.open(pdf_path) # type: ignore
return pdf_document
except Exception as e:
print(f"无法打开PDF文件:{pdf_path}。原因是:{e}")
raise e
def print_green_on_red(text):
cprint(text, "green", "on_red", attrs=["bold"], end="\n\n")
def print_green(text):
print()
cprint(text, "green", attrs=["bold"], end="\n\n")
def print_red(text):
print()
cprint(text, "red", attrs=["bold"], end="\n\n")
def print_yellow(text):
print()
cprint(text, "yellow", attrs=["bold"], end="\n\n")
def safe_get(dict_obj, key, default):
val = dict_obj.get(key)
if val is None:
return default
else:
return val
def is_bbox_overlap(bbox1, bbox2):
"""
This function checks if bbox1 and bbox2 overlap or not
Parameters
----------
bbox1 : list
bbox1
bbox2 : list
bbox2
Returns
-------
bool
True if bbox1 and bbox2 overlap, else False
"""
x0_1, y0_1, x1_1, y1_1 = bbox1
x0_2, y0_2, x1_2, y1_2 = bbox2
if x0_1 > x1_2 or x0_2 > x1_1:
return False
if y0_1 > y1_2 or y0_2 > y1_1:
return False
return True
def is_in_bbox(bbox1, bbox2):
"""
This function checks if bbox1 is in bbox2
Parameters
----------
bbox1 : list
bbox1
bbox2 : list
bbox2
Returns
-------
bool
True if bbox1 is in bbox2, else False
"""
x0_1, y0_1, x1_1, y1_1 = bbox1
x0_2, y0_2, x1_2, y1_2 = bbox2
if x0_1 >= x0_2 and y0_1 >= y0_2 and x1_1 <= x1_2 and y1_1 <= y1_2:
return True
else:
return False
def calculate_para_bbox(lines):
"""
This function calculates the minimum bbox of the paragraph
Parameters
----------
lines : list
lines
Returns
-------
para_bbox : list
bbox of the paragraph
"""
x0 = min(line["bbox"][0] for line in lines)
y0 = min(line["bbox"][1] for line in lines)
x1 = max(line["bbox"][2] for line in lines)
y1 = max(line["bbox"][3] for line in lines)
return [x0, y0, x1, y1]
def is_line_right_aligned_from_neighbors(curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, direction=2):
"""
This function checks if the line is right aligned from its neighbors
Parameters
----------
curr_line_bbox : list
bbox of the current line
prev_line_bbox : list
bbox of the previous line
next_line_bbox : list
bbox of the next line
avg_char_width : float
average of char widths
direction : int
0 for prev, 1 for next, 2 for both
Returns
-------
bool
True if the line is right aligned from its neighbors, False otherwise.
"""
horizontal_ratio = 0.5
horizontal_thres = horizontal_ratio * avg_char_width
_, _, x1, _ = curr_line_bbox
_, _, prev_x1, _ = prev_line_bbox if prev_line_bbox else (0, 0, 0, 0)
_, _, next_x1, _ = next_line_bbox if next_line_bbox else (0, 0, 0, 0)
if direction == 0:
return abs(x1 - prev_x1) < horizontal_thres
elif direction == 1:
return abs(x1 - next_x1) < horizontal_thres
elif direction == 2:
return abs(x1 - prev_x1) < horizontal_thres and abs(x1 - next_x1) < horizontal_thres
else:
return False
def is_line_left_aligned_from_neighbors(curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, direction=2):
"""
This function checks if the line is left aligned from its neighbors
Parameters
----------
curr_line_bbox : list
bbox of the current line
prev_line_bbox : list
bbox of the previous line
next_line_bbox : list
bbox of the next line
avg_char_width : float
average of char widths
direction : int
0 for prev, 1 for next, 2 for both
Returns
-------
bool
True if the line is left aligned from its neighbors, False otherwise.
"""
horizontal_ratio = 0.5
horizontal_thres = horizontal_ratio * avg_char_width
x0, _, _, _ = curr_line_bbox
prev_x0, _, _, _ = prev_line_bbox if prev_line_bbox else (0, 0, 0, 0)
next_x0, _, _, _ = next_line_bbox if next_line_bbox else (0, 0, 0, 0)
if direction == 0:
return abs(x0 - prev_x0) < horizontal_thres
elif direction == 1:
return abs(x0 - next_x0) < horizontal_thres
elif direction == 2:
return abs(x0 - prev_x0) < horizontal_thres and abs(x0 - next_x0) < horizontal_thres
else:
return False
def end_with_punctuation(line_text):
"""
This function checks if the line ends with punctuation marks
"""
english_end_puncs = [".", "?", "!"]
chinese_end_puncs = ["。", "?", "!"]
end_puncs = english_end_puncs + chinese_end_puncs
last_non_space_char = None
for ch in line_text[::-1]:
if not ch.isspace():
last_non_space_char = ch
break
if last_non_space_char is None:
return False
return last_non_space_char in end_puncs
def is_nested_list(lst):
if isinstance(lst, list):
return any(isinstance(sub, list) for sub in lst)
return False
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import sys
import math
from para.commons import *
if sys.version_info[0] >= 3:
sys.stdout.reconfigure(encoding="utf-8") # type: ignore
class LayoutFilterProcessor:
def __init__(self) -> None:
pass
def batch_process_blocks(self, pdf_dict):
for page_id, blocks in pdf_dict.items():
if page_id.startswith("page_"):
if "layout_bboxes" in blocks.keys() and "para_blocks" in blocks.keys():
layout_bbox_objs = blocks["layout_bboxes"]
if layout_bbox_objs is None:
continue
layout_bboxes = [bbox_obj["layout_bbox"] for bbox_obj in layout_bbox_objs]
# Use math.ceil function to enlarge each value of x0, y0, x1, y1 of each layout_bbox
layout_bboxes = [
[math.ceil(x0), math.ceil(y0), math.ceil(x1), math.ceil(y1)] for x0, y0, x1, y1 in layout_bboxes
]
para_blocks = blocks["para_blocks"]
if para_blocks is None:
continue
for lb_bbox in layout_bboxes:
for i, para_block in enumerate(para_blocks):
para_bbox = para_block["bbox"]
para_blocks[i]["in_layout"] = 0
if is_in_bbox(para_bbox, lb_bbox):
para_blocks[i]["in_layout"] = 1
blocks["para_blocks"] = para_blocks
return pdf_dict
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment