Unverified Commit 158e556b authored by Xiaomeng Zhao's avatar Xiaomeng Zhao Committed by GitHub
Browse files

Merge pull request #1063 from opendatalab/release-0.10.0

Release 0.10.0
parents 038f48d3 30be5017
from loguru import logger
from magic_pdf.libs.MakeContentConfig import DropMode, MakeMode
from magic_pdf.config.make_content_config import DropMode, MakeMode
from magic_pdf.data.data_reader_writer import DataWriter
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.json_compressor import JsonCompressor
from magic_pdf.pipe.AbsPipe import AbsPipe
from magic_pdf.user_api import parse_txt_pdf
class TXTPipe(AbsPipe):
def __init__(self, pdf_bytes: bytes, model_list: list, image_writer: AbsReaderWriter, is_debug: bool = False,
def __init__(self, pdf_bytes: bytes, model_list: list, image_writer: DataWriter, is_debug: bool = False,
start_page_id=0, end_page_id=None, lang=None,
layout_model=None, formula_enable=None, table_enable=None):
super().__init__(pdf_bytes, model_list, image_writer, is_debug, start_page_id, end_page_id, lang,
......@@ -33,10 +32,10 @@ class TXTPipe(AbsPipe):
def pipe_mk_uni_format(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF):
result = super().pipe_mk_uni_format(img_parent_path, drop_mode)
logger.info("txt_pipe mk content list finished")
logger.info('txt_pipe mk content list finished')
return result
def pipe_mk_markdown(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF, md_make_mode=MakeMode.MM_MD):
result = super().pipe_mk_markdown(img_parent_path, drop_mode, md_make_mode)
logger.info(f"txt_pipe mk {md_make_mode} finished")
logger.info(f'txt_pipe mk {md_make_mode} finished')
return result
......@@ -2,22 +2,21 @@ import json
from loguru import logger
from magic_pdf.libs.MakeContentConfig import DropMode, MakeMode
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.config.make_content_config import DropMode, MakeMode
from magic_pdf.data.data_reader_writer import DataWriter
from magic_pdf.libs.commons import join_path
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.pipe.AbsPipe import AbsPipe
from magic_pdf.user_api import parse_union_pdf, parse_ocr_pdf
from magic_pdf.user_api import parse_ocr_pdf, parse_union_pdf
class UNIPipe(AbsPipe):
def __init__(self, pdf_bytes: bytes, jso_useful_key: dict, image_writer: AbsReaderWriter, is_debug: bool = False,
def __init__(self, pdf_bytes: bytes, jso_useful_key: dict, image_writer: DataWriter, is_debug: bool = False,
start_page_id=0, end_page_id=None, lang=None,
layout_model=None, formula_enable=None, table_enable=None):
self.pdf_type = jso_useful_key["_pdf_type"]
super().__init__(pdf_bytes, jso_useful_key["model_list"], image_writer, is_debug, start_page_id, end_page_id,
self.pdf_type = jso_useful_key['_pdf_type']
super().__init__(pdf_bytes, jso_useful_key['model_list'], image_writer, is_debug, start_page_id, end_page_id,
lang, layout_model, formula_enable, table_enable)
if len(self.model_list) == 0:
self.input_model_is_empty = True
......@@ -54,27 +53,28 @@ class UNIPipe(AbsPipe):
def pipe_mk_uni_format(self, img_parent_path: str, drop_mode=DropMode.NONE_WITH_REASON):
result = super().pipe_mk_uni_format(img_parent_path, drop_mode)
logger.info("uni_pipe mk content list finished")
logger.info('uni_pipe mk content list finished')
return result
def pipe_mk_markdown(self, img_parent_path: str, drop_mode=DropMode.WHOLE_PDF, md_make_mode=MakeMode.MM_MD):
result = super().pipe_mk_markdown(img_parent_path, drop_mode, md_make_mode)
logger.info(f"uni_pipe mk {md_make_mode} finished")
logger.info(f'uni_pipe mk {md_make_mode} finished')
return result
if __name__ == '__main__':
# 测试
drw = DiskReaderWriter(r"D:/project/20231108code-clean")
from magic_pdf.data.data_reader_writer import DataReader
drw = DataReader(r'D:/project/20231108code-clean')
pdf_file_path = r"linshixuqiu\19983-00.pdf"
model_file_path = r"linshixuqiu\19983-00.json"
pdf_bytes = drw.read(pdf_file_path, AbsReaderWriter.MODE_BIN)
model_json_txt = drw.read(model_file_path, AbsReaderWriter.MODE_TXT)
pdf_file_path = r'linshixuqiu\19983-00.pdf'
model_file_path = r'linshixuqiu\19983-00.json'
pdf_bytes = drw.read(pdf_file_path)
model_json_txt = drw.read(model_file_path).decode()
model_list = json.loads(model_json_txt)
write_path = r"D:\project\20231108code-clean\linshixuqiu\19983-00"
img_bucket_path = "imgs"
img_writer = DiskReaderWriter(join_path(write_path, img_bucket_path))
write_path = r'D:\project\20231108code-clean\linshixuqiu\19983-00'
img_bucket_path = 'imgs'
img_writer = DataWriter(join_path(write_path, img_bucket_path))
# pdf_type = UNIPipe.classify(pdf_bytes)
# jso_useful_key = {
......@@ -83,8 +83,8 @@ if __name__ == '__main__':
# }
jso_useful_key = {
"_pdf_type": "",
"model_list": model_list
'_pdf_type': '',
'model_list': model_list
}
pipe = UNIPipe(pdf_bytes, jso_useful_key, img_writer)
pipe.pipe_classify()
......@@ -92,8 +92,7 @@ if __name__ == '__main__':
md_content = pipe.pipe_mk_markdown(img_bucket_path)
content_list = pipe.pipe_mk_uni_format(img_bucket_path)
md_writer = DiskReaderWriter(write_path)
md_writer.write(md_content, "19983-00.md", AbsReaderWriter.MODE_TXT)
md_writer.write(json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4), "19983-00.json",
AbsReaderWriter.MODE_TXT)
md_writer.write(str(content_list), "19983-00.txt", AbsReaderWriter.MODE_TXT)
md_writer = DataWriter(write_path)
md_writer.write_string('19983-00.md', md_content)
md_writer.write_string('19983-00.json', json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4))
md_writer.write_string('19983-00.txt', str(content_list))
from loguru import logger
from magic_pdf.config.drop_reason import DropReason
from magic_pdf.layout.layout_sort import get_columns_cnt_of_layout
from magic_pdf.libs.drop_reason import DropReason
def __is_pseudo_single_column(page_info) -> bool:
"""
判断一个页面是否伪单列。
"""判断一个页面是否伪单列。
Args:
page_info (dict): 页面信息字典,包括'_layout_tree'和'preproc_blocks'。
Returns:
Tuple[bool, Optional[str]]: 如果页面伪单列返回(True, extra_info),否则返回(False, None)。
"""
layout_tree = page_info['_layout_tree']
layout_column_width = get_columns_cnt_of_layout(layout_tree)
......@@ -41,27 +39,22 @@ def __is_pseudo_single_column(page_info) -> bool:
if num_lines > 20:
radio = num_satisfying_lines / num_lines
if radio >= 0.5:
extra_info = f"{{num_lines: {num_lines}, num_satisfying_lines: {num_satisfying_lines}}}"
extra_info = f'{{num_lines: {num_lines}, num_satisfying_lines: {num_satisfying_lines}}}'
block_text = []
for line in lines:
if line['spans']:
for span in line['spans']:
block_text.append(span['text'])
logger.warning(f"pseudo_single_column block_text: {block_text}")
logger.warning(f'pseudo_single_column block_text: {block_text}')
return True, extra_info
return False, None
def pdf_post_filter(page_info) -> tuple:
"""
return:(True|False, err_msg)
True, 如果pdf符合要求
False, 如果pdf不符合要求
"""
"""return:(True|False, err_msg) True, 如果pdf符合要求 False, 如果pdf不符合要求."""
bool_is_pseudo_single_column, extra_info = __is_pseudo_single_column(page_info)
if bool_is_pseudo_single_column:
return False, {"_need_drop": True, "_drop_reason": DropReason.PSEUDO_SINGLE_COLUMN, "extra_info": extra_info}
return False, {'_need_drop': True, '_drop_reason': DropReason.PSEUDO_SINGLE_COLUMN, 'extra_info': extra_info}
return True, None
\ No newline at end of file
return True, None
from loguru import logger
from magic_pdf.config.ocr_content_type import ContentType
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.libs.pdf_image_tools import cut_image
......@@ -29,9 +29,7 @@ def txt_save_images_by_bboxes(page_num: int, page, pdf_bytes_md5: str,
image_bboxes: list, images_overlap_backup: list, table_bboxes: list,
equation_inline_bboxes: list,
equation_interline_bboxes: list, imageWriter) -> dict:
"""
返回一个dict, key为bbox, 值是图片地址
"""
"""返回一个dict, key为bbox, 值是图片地址."""
image_info = []
image_backup_info = []
table_info = []
......@@ -46,26 +44,26 @@ def txt_save_images_by_bboxes(page_num: int, page, pdf_bytes_md5: str,
for bbox in image_bboxes:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path("images"), imageWriter)
image_info.append({"bbox": bbox, "image_path": image_path})
image_path = cut_image(bbox, page_num, page, return_path('images'), imageWriter)
image_info.append({'bbox': bbox, 'image_path': image_path})
for bbox in images_overlap_backup:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path("images"), imageWriter)
image_backup_info.append({"bbox": bbox, "image_path": image_path})
image_path = cut_image(bbox, page_num, page, return_path('images'), imageWriter)
image_backup_info.append({'bbox': bbox, 'image_path': image_path})
for bbox in table_bboxes:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path("tables"), imageWriter)
table_info.append({"bbox": bbox, "image_path": image_path})
image_path = cut_image(bbox, page_num, page, return_path('tables'), imageWriter)
table_info.append({'bbox': bbox, 'image_path': image_path})
return image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info
def check_img_bbox(bbox) -> bool:
if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
logger.warning(f"image_bboxes: 错误的box, {bbox}")
logger.warning(f'image_bboxes: 错误的box, {bbox}')
return False
return True
"""
对pymupdf返回的结构里的公式进行替换,替换为模型识别的公式结果
"""
"""对pymupdf返回的结构里的公式进行替换,替换为模型识别的公式结果."""
from magic_pdf.libs.commons import fitz
import json
import os
from pathlib import Path
from loguru import logger
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.config.ocr_content_type import ContentType
from magic_pdf.libs.commons import fitz
TYPE_INLINE_EQUATION = ContentType.InlineEquation
TYPE_INTERLINE_EQUATION = ContentType.InterlineEquation
def combine_chars_to_pymudict(block_dict, char_dict):
"""
把block级别的pymupdf 结构里加入char结构
"""
"""把block级别的pymupdf 结构里加入char结构."""
# 因为block_dict 被裁剪过,因此先把他和char_dict文字块对齐,才能进行补充
char_map = {tuple(item["bbox"]): item for item in char_dict}
char_map = {tuple(item['bbox']): item for item in char_dict}
for i in range(len(block_dict)): # blcok
for i in range(len(block_dict)): # block
block = block_dict[i]
key = block["bbox"]
key = block['bbox']
char_dict_item = char_map[tuple(key)]
char_dict_map = {tuple(item["bbox"]): item for item in char_dict_item["lines"]}
for j in range(len(block["lines"])):
lines = block["lines"][j]
with_char_lines = char_dict_map[lines["bbox"]]
for k in range(len(lines["spans"])):
spans = lines["spans"][k]
char_dict_map = {tuple(item['bbox']): item for item in char_dict_item['lines']}
for j in range(len(block['lines'])):
lines = block['lines'][j]
with_char_lines = char_dict_map[lines['bbox']]
for k in range(len(lines['spans'])):
spans = lines['spans'][k]
try:
chars = with_char_lines["spans"][k]["chars"]
except Exception as e:
logger.error(char_dict[i]["lines"][j])
chars = with_char_lines['spans'][k]['chars']
except Exception:
logger.error(char_dict[i]['lines'][j])
spans["chars"] = chars
spans['chars'] = chars
return block_dict
def calculate_overlap_area_2_minbox_area_ratio(bbox1, min_bbox):
"""
计算box1和box2的重叠面积占最小面积的box的比例
"""
"""计算box1和box2的重叠面积占最小面积的box的比例."""
# Determine the coordinates of the intersection rectangle
x_left = max(bbox1[0], min_bbox[0])
y_top = max(bbox1[1], min_bbox[1])
......@@ -74,13 +70,13 @@ def _is_xin(bbox1, bbox2):
def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
"""消除掉整个块都在行间公式块内部的文本块"""
"""消除掉整个块都在行间公式块内部的文本块."""
for eq_bbox in interline_bboxes:
removed_txt_blk = []
for text_blk in text_blocks:
text_bbox = text_blk["bbox"]
text_bbox = text_blk['bbox']
if (
calculate_overlap_area_2_minbox_area_ratio(eq_bbox["bbox"], text_bbox)
calculate_overlap_area_2_minbox_area_ratio(eq_bbox['bbox'], text_bbox)
>= 0.7
):
removed_txt_blk.append(text_blk)
......@@ -91,9 +87,7 @@ def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
def _is_in_or_part_overlap(box1, box2) -> bool:
"""
两个bbox是否有部分重叠或者包含
"""
"""两个bbox是否有部分重叠或者包含."""
if box1 is None or box2 is None:
return False
......@@ -111,62 +105,65 @@ def _is_in_or_part_overlap(box1, box2) -> bool:
def remove_text_block_overlap_interline_equation_bbox(
interline_eq_bboxes, pymu_block_list
):
"""消除掉行行内公式有部分重叠的文本块的内容。
同时重新计算消除重叠之后文本块的大小"""
"""消除掉行行内公式有部分重叠的文本块的内容。 同时重新计算消除重叠之后文本块的大小."""
deleted_block = []
for text_block in pymu_block_list:
deleted_line = []
for line in text_block["lines"]:
for line in text_block['lines']:
deleted_span = []
for span in line["spans"]:
for span in line['spans']:
deleted_chars = []
for char in span["chars"]:
for char in span['chars']:
if any(
[
(calculate_overlap_area_2_minbox_area_ratio(eq_bbox["bbox"], char["bbox"]) > 0.5)
for eq_bbox in interline_eq_bboxes
]
[
(
calculate_overlap_area_2_minbox_area_ratio(
eq_bbox['bbox'], char['bbox']
)
> 0.5
)
for eq_bbox in interline_eq_bboxes
]
):
deleted_chars.append(char)
# 检查span里没有char则删除这个span
for char in deleted_chars:
span["chars"].remove(char)
span['chars'].remove(char)
# 重新计算这个span的大小
if len(span["chars"]) == 0: # 删除这个span
if len(span['chars']) == 0: # 删除这个span
deleted_span.append(span)
else:
span["bbox"] = (
min([b["bbox"][0] for b in span["chars"]]),
min([b["bbox"][1] for b in span["chars"]]),
max([b["bbox"][2] for b in span["chars"]]),
max([b["bbox"][3] for b in span["chars"]]),
span['bbox'] = (
min([b['bbox'][0] for b in span['chars']]),
min([b['bbox'][1] for b in span['chars']]),
max([b['bbox'][2] for b in span['chars']]),
max([b['bbox'][3] for b in span['chars']]),
)
# 检查这个span
for span in deleted_span:
line["spans"].remove(span)
if len(line["spans"]) == 0: # 删除这个line
line['spans'].remove(span)
if len(line['spans']) == 0: # 删除这个line
deleted_line.append(line)
else:
line["bbox"] = (
min([b["bbox"][0] for b in line["spans"]]),
min([b["bbox"][1] for b in line["spans"]]),
max([b["bbox"][2] for b in line["spans"]]),
max([b["bbox"][3] for b in line["spans"]]),
line['bbox'] = (
min([b['bbox'][0] for b in line['spans']]),
min([b['bbox'][1] for b in line['spans']]),
max([b['bbox'][2] for b in line['spans']]),
max([b['bbox'][3] for b in line['spans']]),
)
# 检查这个block是否可以删除
for line in deleted_line:
text_block["lines"].remove(line)
if len(text_block["lines"]) == 0: # 删除block
text_block['lines'].remove(line)
if len(text_block['lines']) == 0: # 删除block
deleted_block.append(text_block)
else:
text_block["bbox"] = (
min([b["bbox"][0] for b in text_block["lines"]]),
min([b["bbox"][1] for b in text_block["lines"]]),
max([b["bbox"][2] for b in text_block["lines"]]),
max([b["bbox"][3] for b in text_block["lines"]]),
text_block['bbox'] = (
min([b['bbox'][0] for b in text_block['lines']]),
min([b['bbox'][1] for b in text_block['lines']]),
max([b['bbox'][2] for b in text_block['lines']]),
max([b['bbox'][3] for b in text_block['lines']]),
)
# 检查text block删除
......@@ -179,33 +176,33 @@ def remove_text_block_overlap_interline_equation_bbox(
def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list):
"""在行间公式对应的地方插上一个伪造的block"""
"""在行间公式对应的地方插上一个伪造的block."""
for eq in interline_eq_bboxes:
bbox = eq["bbox"]
latex_content = eq["latex"]
bbox = eq['bbox']
latex_content = eq['latex']
text_block = {
"number": len(pymu_block_list),
"type": 0,
"bbox": bbox,
"lines": [
'number': len(pymu_block_list),
'type': 0,
'bbox': bbox,
'lines': [
{
"spans": [
'spans': [
{
"size": 9.962599754333496,
"type": TYPE_INTERLINE_EQUATION,
"flags": 4,
"font": TYPE_INTERLINE_EQUATION,
"color": 0,
"ascender": 0.9409999847412109,
"descender": -0.3050000071525574,
"latex": latex_content,
"origin": [bbox[0], bbox[1]],
"bbox": bbox,
'size': 9.962599754333496,
'type': TYPE_INTERLINE_EQUATION,
'flags': 4,
'font': TYPE_INTERLINE_EQUATION,
'color': 0,
'ascender': 0.9409999847412109,
'descender': -0.3050000071525574,
'latex': latex_content,
'origin': [bbox[0], bbox[1]],
'bbox': bbox,
}
],
"wmode": 0,
"dir": [1.0, 0.0],
"bbox": bbox,
'wmode': 0,
'dir': [1.0, 0.0],
'bbox': bbox,
}
],
}
......@@ -250,53 +247,52 @@ def __y_overlap_ratio(box1, box2):
def replace_line_v2(eqinfo, line):
"""
扫描这一行所有的和公式框X方向重叠的char,然后计算char的左、右x0, x1,位于这个区间内的span删除掉。
最后与这个x0,x1有相交的span0, span1内部进行分割。
"""
"""扫描这一行所有的和公式框X方向重叠的char,然后计算char的左、右x0, x1,位于这个区间内的span删除掉。
最后与这个x0,x1有相交的span0, span1内部进行分割。"""
first_overlap_span = -1
first_overlap_span_idx = -1
last_overlap_span = -1
delete_chars = []
for i in range(0, len(line["spans"])):
if "chars" not in line["spans"][i]:
for i in range(0, len(line['spans'])):
if 'chars' not in line['spans'][i]:
continue
if line["spans"][i].get("_type", None) is not None:
if line['spans'][i].get('_type', None) is not None:
continue # 忽略,因为已经是插入的伪造span公式了
for char in line["spans"][i]["chars"]:
if __is_x_dir_overlap(eqinfo["bbox"], char["bbox"]):
line_txt = ""
for span in line["spans"]:
span_txt = "<span>"
for ch in span["chars"]:
span_txt = span_txt + ch["c"]
for char in line['spans'][i]['chars']:
if __is_x_dir_overlap(eqinfo['bbox'], char['bbox']):
line_txt = ''
for span in line['spans']:
span_txt = '<span>'
for ch in span['chars']:
span_txt = span_txt + ch['c']
span_txt = span_txt + "</span>"
span_txt = span_txt + '</span>'
line_txt = line_txt + span_txt
if first_overlap_span_idx == -1:
first_overlap_span = line["spans"][i]
first_overlap_span = line['spans'][i]
first_overlap_span_idx = i
last_overlap_span = line["spans"][i]
last_overlap_span = line['spans'][i]
delete_chars.append(char)
# 第一个和最后一个char要进行检查,到底属于公式多还是属于正常span多
if len(delete_chars) > 0:
ch0_bbox = delete_chars[0]["bbox"]
if x_overlap_ratio(eqinfo["bbox"], ch0_bbox) < 0.51:
ch0_bbox = delete_chars[0]['bbox']
if x_overlap_ratio(eqinfo['bbox'], ch0_bbox) < 0.51:
delete_chars.remove(delete_chars[0])
if len(delete_chars) > 0:
ch0_bbox = delete_chars[-1]["bbox"]
if x_overlap_ratio(eqinfo["bbox"], ch0_bbox) < 0.51:
ch0_bbox = delete_chars[-1]['bbox']
if x_overlap_ratio(eqinfo['bbox'], ch0_bbox) < 0.51:
delete_chars.remove(delete_chars[-1])
# 计算x方向上被删除区间内的char的真实x0, x1
if len(delete_chars):
x0, x1 = min([b["bbox"][0] for b in delete_chars]), max(
[b["bbox"][2] for b in delete_chars]
x0, x1 = (
min([b['bbox'][0] for b in delete_chars]),
max([b['bbox'][2] for b in delete_chars]),
)
else:
# logger.debug(f"行内公式替换没有发生,尝试下一行匹配, eqinfo={eqinfo}")
......@@ -304,101 +300,101 @@ def replace_line_v2(eqinfo, line):
# 删除位于x0, x1这两个中间的span
delete_span = []
for span in line["spans"]:
span_box = span["bbox"]
for span in line['spans']:
span_box = span['bbox']
if x0 <= span_box[0] and span_box[2] <= x1:
delete_span.append(span)
for span in delete_span:
line["spans"].remove(span)
line['spans'].remove(span)
equation_span = {
"size": 9.962599754333496,
"type": TYPE_INLINE_EQUATION,
"flags": 4,
"font": TYPE_INLINE_EQUATION,
"color": 0,
"ascender": 0.9409999847412109,
"descender": -0.3050000071525574,
"latex": "",
"origin": [337.1410153102337, 216.0205245153934],
"bbox": eqinfo["bbox"]
'size': 9.962599754333496,
'type': TYPE_INLINE_EQUATION,
'flags': 4,
'font': TYPE_INLINE_EQUATION,
'color': 0,
'ascender': 0.9409999847412109,
'descender': -0.3050000071525574,
'latex': '',
'origin': [337.1410153102337, 216.0205245153934],
'bbox': eqinfo['bbox'],
}
# equation_span = line['spans'][0].copy()
equation_span["latex"] = eqinfo['latex']
equation_span["bbox"] = [x0, equation_span["bbox"][1], x1, equation_span["bbox"][3]]
equation_span["origin"] = [equation_span["bbox"][0], equation_span["bbox"][1]]
equation_span["chars"] = delete_chars
equation_span["type"] = TYPE_INLINE_EQUATION
equation_span["_eq_bbox"] = eqinfo["bbox"]
line["spans"].insert(first_overlap_span_idx + 1, equation_span) # 放入公式
equation_span['latex'] = eqinfo['latex']
equation_span['bbox'] = [x0, equation_span['bbox'][1], x1, equation_span['bbox'][3]]
equation_span['origin'] = [equation_span['bbox'][0], equation_span['bbox'][1]]
equation_span['chars'] = delete_chars
equation_span['type'] = TYPE_INLINE_EQUATION
equation_span['_eq_bbox'] = eqinfo['bbox']
line['spans'].insert(first_overlap_span_idx + 1, equation_span) # 放入公式
# logger.info(f"==>text is 【{line_txt}】, equation is 【{eqinfo['latex_text']}】")
# 第一个、和最后一个有overlap的span进行分割,然后插入对应的位置
first_span_chars = [
char
for char in first_overlap_span["chars"]
if (char["bbox"][2] + char["bbox"][0]) / 2 < x0
for char in first_overlap_span['chars']
if (char['bbox'][2] + char['bbox'][0]) / 2 < x0
]
tail_span_chars = [
char
for char in last_overlap_span["chars"]
if (char["bbox"][0] + char["bbox"][2]) / 2 > x1
for char in last_overlap_span['chars']
if (char['bbox'][0] + char['bbox'][2]) / 2 > x1
]
if len(first_span_chars) > 0:
first_overlap_span["chars"] = first_span_chars
first_overlap_span["text"] = "".join([char["c"] for char in first_span_chars])
first_overlap_span["bbox"] = (
first_overlap_span["bbox"][0],
first_overlap_span["bbox"][1],
max([chr["bbox"][2] for chr in first_span_chars]),
first_overlap_span["bbox"][3],
first_overlap_span['chars'] = first_span_chars
first_overlap_span['text'] = ''.join([char['c'] for char in first_span_chars])
first_overlap_span['bbox'] = (
first_overlap_span['bbox'][0],
first_overlap_span['bbox'][1],
max([chr['bbox'][2] for chr in first_span_chars]),
first_overlap_span['bbox'][3],
)
# first_overlap_span['_type'] = "first"
else:
# 删掉
if first_overlap_span not in delete_span:
line["spans"].remove(first_overlap_span)
line['spans'].remove(first_overlap_span)
if len(tail_span_chars) > 0:
min_of_tail_span_x0 = min([chr["bbox"][0] for chr in tail_span_chars])
min_of_tail_span_y0 = min([chr["bbox"][1] for chr in tail_span_chars])
max_of_tail_span_x1 = max([chr["bbox"][2] for chr in tail_span_chars])
max_of_tail_span_y1 = max([chr["bbox"][3] for chr in tail_span_chars])
min_of_tail_span_x0 = min([chr['bbox'][0] for chr in tail_span_chars])
min_of_tail_span_y0 = min([chr['bbox'][1] for chr in tail_span_chars])
max_of_tail_span_x1 = max([chr['bbox'][2] for chr in tail_span_chars])
max_of_tail_span_y1 = max([chr['bbox'][3] for chr in tail_span_chars])
if last_overlap_span == first_overlap_span: # 这个时候应该插入一个新的
tail_span_txt = "".join([char["c"] for char in tail_span_chars])
tail_span_txt = ''.join([char['c'] for char in tail_span_chars]) # noqa: F841
last_span_to_insert = last_overlap_span.copy()
last_span_to_insert["chars"] = tail_span_chars
last_span_to_insert["text"] = "".join(
[char["c"] for char in tail_span_chars]
last_span_to_insert['chars'] = tail_span_chars
last_span_to_insert['text'] = ''.join(
[char['c'] for char in tail_span_chars]
)
if equation_span["bbox"][2] >= last_overlap_span["bbox"][2]:
last_span_to_insert["bbox"] = (
if equation_span['bbox'][2] >= last_overlap_span['bbox'][2]:
last_span_to_insert['bbox'] = (
min_of_tail_span_x0,
min_of_tail_span_y0,
max_of_tail_span_x1,
max_of_tail_span_y1
max_of_tail_span_y1,
)
else:
last_span_to_insert["bbox"] = (
min([chr["bbox"][0] for chr in tail_span_chars]),
last_overlap_span["bbox"][1],
last_overlap_span["bbox"][2],
last_overlap_span["bbox"][3],
last_span_to_insert['bbox'] = (
min([chr['bbox'][0] for chr in tail_span_chars]),
last_overlap_span['bbox'][1],
last_overlap_span['bbox'][2],
last_overlap_span['bbox'][3],
)
# 插入到公式对象之后
equation_idx = line["spans"].index(equation_span)
line["spans"].insert(equation_idx + 1, last_span_to_insert) # 放入公式
equation_idx = line['spans'].index(equation_span)
line['spans'].insert(equation_idx + 1, last_span_to_insert) # 放入公式
else: # 直接修改原来的span
last_overlap_span["chars"] = tail_span_chars
last_overlap_span["text"] = "".join([char["c"] for char in tail_span_chars])
last_overlap_span["bbox"] = (
min([chr["bbox"][0] for chr in tail_span_chars]),
last_overlap_span["bbox"][1],
last_overlap_span["bbox"][2],
last_overlap_span["bbox"][3],
last_overlap_span['chars'] = tail_span_chars
last_overlap_span['text'] = ''.join([char['c'] for char in tail_span_chars])
last_overlap_span['bbox'] = (
min([chr['bbox'][0] for chr in tail_span_chars]),
last_overlap_span['bbox'][1],
last_overlap_span['bbox'][2],
last_overlap_span['bbox'][3],
)
else:
# 删掉
......@@ -406,15 +402,15 @@ def replace_line_v2(eqinfo, line):
last_overlap_span not in delete_span
and last_overlap_span != first_overlap_span
):
line["spans"].remove(last_overlap_span)
line['spans'].remove(last_overlap_span)
remain_txt = ""
for span in line["spans"]:
span_txt = "<span>"
for char in span["chars"]:
span_txt = span_txt + char["c"]
remain_txt = ''
for span in line['spans']:
span_txt = '<span>'
for char in span['chars']:
span_txt = span_txt + char['c']
span_txt = span_txt + "</span>"
span_txt = span_txt + '</span>'
remain_txt = remain_txt + span_txt
......@@ -424,17 +420,15 @@ def replace_line_v2(eqinfo, line):
def replace_eq_blk(eqinfo, text_block):
"""替换行内公式"""
for line in text_block["lines"]:
line_bbox = line["bbox"]
"""替换行内公式."""
for line in text_block['lines']:
line_bbox = line['bbox']
if (
_is_xin(eqinfo["bbox"], line_bbox)
or __y_overlap_ratio(eqinfo["bbox"], line_bbox) > 0.6
_is_xin(eqinfo['bbox'], line_bbox)
or __y_overlap_ratio(eqinfo['bbox'], line_bbox) > 0.6
): # 定位到行, 使用y方向重合率是因为有的时候,一个行的宽度会小于公式位置宽度:行很高,公式很窄,
replace_succ = replace_line_v2(eqinfo, line)
if (
not replace_succ
): # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行
if not replace_succ: # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行
continue
else:
break
......@@ -444,13 +438,13 @@ def replace_eq_blk(eqinfo, text_block):
def replace_inline_equations(inline_equation_bboxes, raw_text_blocks):
"""替换行内公式"""
"""替换行内公式."""
for eqinfo in inline_equation_bboxes:
eqbox = eqinfo["bbox"]
eqbox = eqinfo['bbox']
for blk in raw_text_blocks:
if _is_xin(eqbox, blk["bbox"]):
if _is_xin(eqbox, blk['bbox']):
if not replace_eq_blk(eqinfo, blk):
logger.warning(f"行内公式没有替换成功:{eqinfo} ")
logger.warning(f'行内公式没有替换成功:{eqinfo} ')
else:
break
......@@ -458,20 +452,18 @@ def replace_inline_equations(inline_equation_bboxes, raw_text_blocks):
def remove_chars_in_text_blocks(text_blocks):
"""删除text_blocks里的char"""
"""删除text_blocks里的char."""
for blk in text_blocks:
for line in blk["lines"]:
for span in line["spans"]:
_ = span.pop("chars", "no such key")
for line in blk['lines']:
for span in line['spans']:
_ = span.pop('chars', 'no such key')
return text_blocks
def replace_equations_in_textblock(
raw_text_blocks, inline_equation_bboxes, interline_equation_bboxes
):
"""
替换行间和和行内公式为latex
"""
"""替换行间和和行内公式为latex."""
raw_text_blocks = remove_text_block_in_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # 消除重叠:第一步,在公式内部的
......@@ -486,22 +478,22 @@ def replace_equations_in_textblock(
def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
""" """
new_pdf = f"{Path(pdf_path).parent}/{Path(pdf_path).stem}.step3-消除行内公式text_block.pdf"
with open(json_path, "r", encoding="utf-8") as f:
""""""
new_pdf = f'{Path(pdf_path).parent}/{Path(pdf_path).stem}.step3-消除行内公式text_block.pdf'
with open(json_path, 'r', encoding='utf-8') as f:
obj = json.loads(f.read())
if os.path.exists(new_pdf):
os.remove(new_pdf)
new_doc = fitz.open("")
new_doc = fitz.open('')
doc = fitz.open(pdf_path)
doc = fitz.open(pdf_path) # noqa: F841
new_doc = fitz.open(pdf_path)
for i in range(len(new_doc)):
page = new_doc[i]
inline_equation_bboxes = obj[f"page_{i}"]["inline_equations"]
interline_equation_bboxes = obj[f"page_{i}"]["interline_equations"]
raw_text_blocks = obj[f"page_{i}"]["preproc_blocks"]
inline_equation_bboxes = obj[f'page_{i}']['inline_equations']
interline_equation_bboxes = obj[f'page_{i}']['interline_equations']
raw_text_blocks = obj[f'page_{i}']['preproc_blocks']
raw_text_blocks = remove_text_block_in_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # 消除重叠:第一步,在公式内部的
......@@ -514,11 +506,10 @@ def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
)
# 为了检验公式是否重复,把每一行里,含有公式的span背景改成黄色的
color_map = [fitz.pdfcolor["blue"], fitz.pdfcolor["green"]]
j = 0
color_map = [fitz.pdfcolor['blue'], fitz.pdfcolor['green']] # noqa: F841
j = 0 # noqa: F841
for blk in raw_text_blocks:
for i, line in enumerate(blk["lines"]):
for i, line in enumerate(blk['lines']):
# line_box = line['bbox']
# shape = page.new_shape()
# shape.draw_rect(line_box)
......@@ -526,34 +517,34 @@ def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
# shape.commit()
# j = j+1
for i, span in enumerate(line["spans"]):
for i, span in enumerate(line['spans']):
shape_page = page.new_shape()
span_type = span.get("_type")
color = fitz.pdfcolor["blue"]
if span_type == "first":
color = fitz.pdfcolor["blue"]
elif span_type == "tail":
color = fitz.pdfcolor["green"]
span_type = span.get('_type')
color = fitz.pdfcolor['blue']
if span_type == 'first':
color = fitz.pdfcolor['blue']
elif span_type == 'tail':
color = fitz.pdfcolor['green']
elif span_type == TYPE_INLINE_EQUATION:
color = fitz.pdfcolor["black"]
color = fitz.pdfcolor['black']
else:
color = None
b = span["bbox"]
b = span['bbox']
shape_page.draw_rect(b)
shape_page.finish(color=None, fill=color, fill_opacity=0.3)
shape_page.commit()
new_doc.save(new_pdf)
logger.info(f"save ok {new_pdf}")
logger.info(f'save ok {new_pdf}')
final_json = json.dumps(obj, ensure_ascii=False, indent=2)
with open("equations_test/final_json.json", "w") as f:
with open('equations_test/final_json.json', 'w') as f:
f.write(final_json)
return new_pdf
if __name__ == "__main__":
if __name__ == '__main__':
# draw_block_on_pdf_with_txt_replace_eq_bbox(new_json_path, equation_color_pdf)
pass
from loguru import logger
from magic_pdf.libs.boxbase import get_minbox_if_overlap_by_ratio, calculate_overlap_area_in_bbox1_area_ratio, \
calculate_iou, calculate_vertical_projection_overlap_ratio
from magic_pdf.libs.drop_tag import DropTag
from magic_pdf.libs.ocr_content_type import BlockType
from magic_pdf.pre_proc.remove_bbox_overlap import remove_overlap_between_bbox_for_block
def ocr_prepare_bboxes_for_layout_split(img_blocks, table_blocks, discarded_blocks, text_blocks,
title_blocks, interline_equation_blocks, page_w, page_h):
from magic_pdf.config.ocr_content_type import BlockType
from magic_pdf.libs.boxbase import (
calculate_iou, calculate_overlap_area_in_bbox1_area_ratio,
calculate_vertical_projection_overlap_ratio,
get_minbox_if_overlap_by_ratio)
from magic_pdf.pre_proc.remove_bbox_overlap import \
remove_overlap_between_bbox_for_block
def ocr_prepare_bboxes_for_layout_split(
img_blocks,
table_blocks,
discarded_blocks,
text_blocks,
title_blocks,
interline_equation_blocks,
page_w,
page_h,
):
all_bboxes = []
all_discarded_blocks = []
for image in img_blocks:
x0, y0, x1, y1 = image['bbox']
all_bboxes.append([x0, y0, x1, y1, None, None, None, BlockType.Image, None, None, None, None, image["score"]])
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Image,
None,
None,
None,
None,
image['score'],
]
)
for table in table_blocks:
x0, y0, x1, y1 = table['bbox']
all_bboxes.append([x0, y0, x1, y1, None, None, None, BlockType.Table, None, None, None, None, table["score"]])
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Table,
None,
None,
None,
None,
table['score'],
]
)
for text in text_blocks:
x0, y0, x1, y1 = text['bbox']
all_bboxes.append([x0, y0, x1, y1, None, None, None, BlockType.Text, None, None, None, None, text["score"]])
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Text,
None,
None,
None,
None,
text['score'],
]
)
for title in title_blocks:
x0, y0, x1, y1 = title['bbox']
all_bboxes.append([x0, y0, x1, y1, None, None, None, BlockType.Title, None, None, None, None, title["score"]])
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Title,
None,
None,
None,
None,
title['score'],
]
)
for interline_equation in interline_equation_blocks:
x0, y0, x1, y1 = interline_equation['bbox']
all_bboxes.append([x0, y0, x1, y1, None, None, None, BlockType.InterlineEquation, None, None, None, None, interline_equation["score"]])
'''block嵌套问题解决'''
'''文本框与标题框重叠,优先信任文本框'''
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.InterlineEquation,
None,
None,
None,
None,
interline_equation['score'],
]
)
"""block嵌套问题解决"""
"""文本框与标题框重叠,优先信任文本框"""
all_bboxes = fix_text_overlap_title_blocks(all_bboxes)
'''任何框体与舍弃框重叠,优先信任舍弃框'''
"""任何框体与舍弃框重叠,优先信任舍弃框"""
all_bboxes = remove_need_drop_blocks(all_bboxes, discarded_blocks)
# interline_equation 与title或text框冲突的情况,分两种情况处理
'''interline_equation框与文本类型框iou比较接近1的时候,信任行间公式框'''
"""interline_equation框与文本类型框iou比较接近1的时候,信任行间公式框"""
all_bboxes = fix_interline_equation_overlap_text_blocks_with_hi_iou(all_bboxes)
'''interline_equation框被包含在文本类型框内,且interline_equation比文本区块小很多时信任文本框,这时需要舍弃公式框'''
"""interline_equation框被包含在文本类型框内,且interline_equation比文本区块小很多时信任文本框,这时需要舍弃公式框"""
# 通过后续大框套小框逻辑删除
'''discarded_blocks中只保留宽度超过1/3页面宽度的,高度超过10的,处于页面下半50%区域的(限定footnote)'''
"""discarded_blocks中只保留宽度超过1/3页面宽度的,高度超过10的,处于页面下半50%区域的(限定footnote)"""
for discarded in discarded_blocks:
x0, y0, x1, y1 = discarded['bbox']
all_discarded_blocks.append([x0, y0, x1, y1, None, None, None, BlockType.Discarded, None, None, None, None, discarded["score"]])
all_discarded_blocks.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Discarded,
None,
None,
None,
None,
discarded['score'],
]
)
# 将footnote加入到all_bboxes中,用来计算layout
if (x1 - x0) > (page_w / 3) and (y1 - y0) > 10 and y0 > (page_h / 2):
all_bboxes.append([x0, y0, x1, y1, None, None, None, BlockType.Footnote, None, None, None, None, discarded["score"]])
'''经过以上处理后,还存在大框套小框的情况,则删除小框'''
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Footnote,
None,
None,
None,
None,
discarded['score'],
]
)
"""经过以上处理后,还存在大框套小框的情况,则删除小框"""
all_bboxes = remove_overlaps_min_blocks(all_bboxes)
all_discarded_blocks = remove_overlaps_min_blocks(all_discarded_blocks)
'''将剩余的bbox做分离处理,防止后面分layout时出错'''
"""将剩余的bbox做分离处理,防止后面分layout时出错"""
all_bboxes, drop_reasons = remove_overlap_between_bbox_for_block(all_bboxes)
return all_bboxes, all_discarded_blocks, drop_reasons
......@@ -64,18 +185,64 @@ def add_bboxes(blocks, block_type, bboxes):
for block in blocks:
x0, y0, x1, y1 = block['bbox']
if block_type in [
BlockType.ImageBody, BlockType.ImageCaption, BlockType.ImageFootnote,
BlockType.TableBody, BlockType.TableCaption, BlockType.TableFootnote
BlockType.ImageBody,
BlockType.ImageCaption,
BlockType.ImageFootnote,
BlockType.TableBody,
BlockType.TableCaption,
BlockType.TableFootnote,
]:
bboxes.append([x0, y0, x1, y1, None, None, None, block_type, None, None, None, None, block["score"], block["group_id"]])
bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
block_type,
None,
None,
None,
None,
block['score'],
block['group_id'],
]
)
else:
bboxes.append([x0, y0, x1, y1, None, None, None, block_type, None, None, None, None, block["score"]])
bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
block_type,
None,
None,
None,
None,
block['score'],
]
)
def ocr_prepare_bboxes_for_layout_split_v2(
img_body_blocks, img_caption_blocks, img_footnote_blocks,
table_body_blocks, table_caption_blocks, table_footnote_blocks,
discarded_blocks, text_blocks, title_blocks, interline_equation_blocks, page_w, page_h
img_body_blocks,
img_caption_blocks,
img_footnote_blocks,
table_body_blocks,
table_caption_blocks,
table_footnote_blocks,
discarded_blocks,
text_blocks,
title_blocks,
interline_equation_blocks,
page_w,
page_h,
):
all_bboxes = []
......@@ -89,40 +256,40 @@ def ocr_prepare_bboxes_for_layout_split_v2(
add_bboxes(title_blocks, BlockType.Title, all_bboxes)
add_bboxes(interline_equation_blocks, BlockType.InterlineEquation, all_bboxes)
'''block嵌套问题解决'''
'''文本框与标题框重叠,优先信任文本框'''
"""block嵌套问题解决"""
"""文本框与标题框重叠,优先信任文本框"""
all_bboxes = fix_text_overlap_title_blocks(all_bboxes)
'''任何框体与舍弃框重叠,优先信任舍弃框'''
"""任何框体与舍弃框重叠,优先信任舍弃框"""
all_bboxes = remove_need_drop_blocks(all_bboxes, discarded_blocks)
# interline_equation 与title或text框冲突的情况,分两种情况处理
'''interline_equation框与文本类型框iou比较接近1的时候,信任行间公式框'''
"""interline_equation框与文本类型框iou比较接近1的时候,信任行间公式框"""
all_bboxes = fix_interline_equation_overlap_text_blocks_with_hi_iou(all_bboxes)
'''interline_equation框被包含在文本类型框内,且interline_equation比文本区块小很多时信任文本框,这时需要舍弃公式框'''
"""interline_equation框被包含在文本类型框内,且interline_equation比文本区块小很多时信任文本框,这时需要舍弃公式框"""
# 通过后续大框套小框逻辑删除
'''discarded_blocks'''
"""discarded_blocks"""
all_discarded_blocks = []
add_bboxes(discarded_blocks, BlockType.Discarded, all_discarded_blocks)
'''footnote识别:宽度超过1/3页面宽度的,高度超过10的,处于页面下半50%区域的'''
"""footnote识别:宽度超过1/3页面宽度的,高度超过10的,处于页面下半50%区域的"""
footnote_blocks = []
for discarded in discarded_blocks:
x0, y0, x1, y1 = discarded['bbox']
if (x1 - x0) > (page_w / 3) and (y1 - y0) > 10 and y0 > (page_h / 2):
footnote_blocks.append([x0, y0, x1, y1])
'''移除在footnote下面的任何框'''
"""移除在footnote下面的任何框"""
need_remove_blocks = find_blocks_under_footnote(all_bboxes, footnote_blocks)
if len(need_remove_blocks) > 0:
for block in need_remove_blocks:
all_bboxes.remove(block)
all_discarded_blocks.append(block)
'''经过以上处理后,还存在大框套小框的情况,则删除小框'''
"""经过以上处理后,还存在大框套小框的情况,则删除小框"""
all_bboxes = remove_overlaps_min_blocks(all_bboxes)
all_discarded_blocks = remove_overlaps_min_blocks(all_discarded_blocks)
'''将剩余的bbox做分离处理,防止后面分layout时出错'''
"""将剩余的bbox做分离处理,防止后面分layout时出错"""
all_bboxes, drop_reasons = remove_overlap_between_bbox_for_block(all_bboxes)
return all_bboxes, all_discarded_blocks
......@@ -135,7 +302,13 @@ def find_blocks_under_footnote(all_bboxes, footnote_blocks):
for footnote_bbox in footnote_blocks:
footnote_x0, footnote_y0, footnote_x1, footnote_y1 = footnote_bbox
# 如果footnote的纵向投影覆盖了block的纵向投影的80%且block的y0大于等于footnote的y1
if block_y0 >= footnote_y1 and calculate_vertical_projection_overlap_ratio((block_x0, block_y0, block_x1, block_y1), footnote_bbox) >= 0.8:
if (
block_y0 >= footnote_y1
and calculate_vertical_projection_overlap_ratio(
(block_x0, block_y0, block_x1, block_y1), footnote_bbox
)
>= 0.8
):
if block not in need_remove_blocks:
need_remove_blocks.append(block)
break
......@@ -203,7 +376,12 @@ def remove_need_drop_blocks(all_bboxes, discarded_blocks):
for block in all_bboxes:
for discarded_block in discarded_blocks:
block_bbox = block[:4]
if calculate_overlap_area_in_bbox1_area_ratio(block_bbox, discarded_block['bbox']) > 0.6:
if (
calculate_overlap_area_in_bbox1_area_ratio(
block_bbox, discarded_block['bbox']
)
> 0.6
):
if block not in need_remove:
need_remove.append(block)
break
......@@ -223,10 +401,18 @@ def remove_overlaps_min_blocks(all_bboxes):
if block1 != block2:
block1_bbox = block1[:4]
block2_bbox = block2[:4]
overlap_box = get_minbox_if_overlap_by_ratio(block1_bbox, block2_bbox, 0.8)
overlap_box = get_minbox_if_overlap_by_ratio(
block1_bbox, block2_bbox, 0.8
)
if overlap_box is not None:
block_to_remove = next((block for block in all_bboxes if block[:4] == overlap_box), None)
if block_to_remove is not None and block_to_remove not in need_remove:
block_to_remove = next(
(block for block in all_bboxes if block[:4] == overlap_box),
None,
)
if (
block_to_remove is not None
and block_to_remove not in need_remove
):
large_block = block1 if block1 != block_to_remove else block2
x1, y1, x2, y2 = large_block[:4]
sx1, sy1, sx2, sy2 = block_to_remove[:4]
......
from magic_pdf.config.drop_tag import DropTag
from magic_pdf.config.ocr_content_type import BlockType, ContentType
from magic_pdf.libs.boxbase import (__is_overlaps_y_exceeds_threshold,
_is_in_or_part_overlap_with_area_ratio,
calculate_overlap_area_in_bbox1_area_ratio)
from magic_pdf.libs.drop_tag import DropTag
from magic_pdf.libs.ocr_content_type import BlockType, ContentType
# 将每一个line中的span从左到右排序
......@@ -24,7 +24,7 @@ def line_sort_spans_by_left_to_right(lines):
return line_objects
def merge_spans_to_line(spans):
def merge_spans_to_line(spans, threshold=0.6):
if len(spans) == 0:
return []
else:
......@@ -49,7 +49,7 @@ def merge_spans_to_line(spans):
continue
# 如果当前的span与当前行的最后一个span在y轴上重叠,则添加到当前行
if __is_overlaps_y_exceeds_threshold(span['bbox'], current_line[-1]['bbox'], 0.5):
if __is_overlaps_y_exceeds_threshold(span['bbox'], current_line[-1]['bbox'], threshold):
current_line.append(span)
else:
# 否则,开始新行
......@@ -157,7 +157,7 @@ def fill_spans_in_blocks(blocks, spans, radio):
BlockType.ImageBody, BlockType.ImageCaption, BlockType.ImageFootnote,
BlockType.TableBody, BlockType.TableCaption, BlockType.TableFootnote
]:
block_dict["group_id"] = block[-1]
block_dict['group_id'] = block[-1]
block_spans = []
for span in spans:
span_bbox = span['bbox']
......
from loguru import logger
from magic_pdf.libs.boxbase import calculate_overlap_area_in_bbox1_area_ratio, get_minbox_if_overlap_by_ratio, \
__is_overlaps_y_exceeds_threshold, calculate_iou
from magic_pdf.libs.drop_tag import DropTag
from magic_pdf.libs.ocr_content_type import ContentType, BlockType
from magic_pdf.config.drop_tag import DropTag
from magic_pdf.config.ocr_content_type import BlockType, ContentType
from magic_pdf.libs.boxbase import (__is_overlaps_y_exceeds_threshold,
calculate_iou,
calculate_overlap_area_in_bbox1_area_ratio,
get_minbox_if_overlap_by_ratio)
def remove_overlaps_low_confidence_spans(spans):
......@@ -21,7 +22,10 @@ def remove_overlaps_low_confidence_spans(spans):
span_need_remove = span1
else:
span_need_remove = span2
if span_need_remove is not None and span_need_remove not in dropped_spans:
if (
span_need_remove is not None
and span_need_remove not in dropped_spans
):
dropped_spans.append(span_need_remove)
if len(dropped_spans) > 0:
......@@ -38,12 +42,15 @@ def remove_overlaps_min_spans(spans):
for span1 in spans:
for span2 in spans:
if span1 != span2:
overlap_box = get_minbox_if_overlap_by_ratio(span1['bbox'], span2['bbox'], 0.65)
if overlap_box is not None:
span_need_remove = next((span for span in spans if span['bbox'] == overlap_box), None)
if span_need_remove is not None and span_need_remove not in dropped_spans:
dropped_spans.append(span_need_remove)
# span1 或 span2 任何一个都不应该在 dropped_spans 中
if span1 in dropped_spans or span2 in dropped_spans:
continue
else:
overlap_box = get_minbox_if_overlap_by_ratio(span1['bbox'], span2['bbox'], 0.65)
if overlap_box is not None:
span_need_remove = next((span for span in spans if span['bbox'] == overlap_box), None)
if span_need_remove is not None and span_need_remove not in dropped_spans:
dropped_spans.append(span_need_remove)
if len(dropped_spans) > 0:
for span_need_remove in dropped_spans:
spans.remove(span_need_remove)
......@@ -58,7 +65,10 @@ def remove_spans_by_bboxes(spans, need_remove_spans_bboxes):
need_remove_spans = []
for span in spans:
for removed_bbox in need_remove_spans_bboxes:
if calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], removed_bbox) > 0.5:
if (
calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], removed_bbox)
> 0.5
):
if span not in need_remove_spans:
need_remove_spans.append(span)
break
......@@ -78,12 +88,22 @@ def remove_spans_by_bboxes_dict(spans, need_remove_spans_bboxes_dict):
for span in spans:
# 通过判断span的bbox是否在removed_bboxes中, 判断是否需要删除该span
for removed_bbox in removed_bboxes:
if calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], removed_bbox) > 0.5:
if (
calculate_overlap_area_in_bbox1_area_ratio(
span['bbox'], removed_bbox
)
> 0.5
):
need_remove_spans.append(span)
break
# 当drop_tag为DropTag.FOOTNOTE时, 判断span是否在removed_bboxes中任意一个的下方,如果是,则删除该span
elif drop_tag == DropTag.FOOTNOTE and (span['bbox'][1] + span['bbox'][3]) / 2 > removed_bbox[3] and \
removed_bbox[0] < (span['bbox'][0] + span['bbox'][2]) / 2 < removed_bbox[2]:
elif (
drop_tag == DropTag.FOOTNOTE
and (span['bbox'][1] + span['bbox'][3]) / 2 > removed_bbox[3]
and removed_bbox[0]
< (span['bbox'][0] + span['bbox'][2]) / 2
< removed_bbox[2]
):
need_remove_spans.append(span)
break
......@@ -98,11 +118,18 @@ def remove_spans_by_bboxes_dict(spans, need_remove_spans_bboxes_dict):
def adjust_bbox_for_standalone_block(spans):
# 对tpye=["interline_equation", "image", "table"]进行额外处理,如果左边有字的话,将该span的bbox中y0调整至不高于文字的y0
for sb_span in spans:
if sb_span['type'] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table]:
if sb_span['type'] in [
ContentType.InterlineEquation,
ContentType.Image,
ContentType.Table,
]:
for text_span in spans:
if text_span['type'] in [ContentType.Text, ContentType.InlineEquation]:
# 判断span2的纵向高度是否被span所覆盖
if sb_span['bbox'][1] < text_span['bbox'][1] and sb_span['bbox'][3] > text_span['bbox'][3]:
if (
sb_span['bbox'][1] < text_span['bbox'][1]
and sb_span['bbox'][3] > text_span['bbox'][3]
):
# 判断span2是否在span左边
if text_span['bbox'][0] < sb_span['bbox'][0]:
# 调整span的y0和span2的y0一致
......@@ -120,11 +147,15 @@ def modify_y_axis(spans: list, displayed_list: list, text_inline_lines: list):
lines = []
current_line = [spans[0]]
if spans[0]["type"] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table]:
if spans[0]['type'] in [
ContentType.InterlineEquation,
ContentType.Image,
ContentType.Table,
]:
displayed_list.append(spans[0])
line_first_y0 = spans[0]["bbox"][1]
line_first_y = spans[0]["bbox"][3]
line_first_y0 = spans[0]['bbox'][1]
line_first_y = spans[0]['bbox'][3]
# 用于给行间公式搜索
# text_inline_lines = []
for span in spans[1:]:
......@@ -132,26 +163,43 @@ def modify_y_axis(spans: list, displayed_list: list, text_inline_lines: list):
# print("debug")
# 如果当前的span类型为"interline_equation" 或者 当前行中已经有"interline_equation"
# image和table类型,同上
if span['type'] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table] or any(
s['type'] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table] for s in
current_line):
if span['type'] in [
ContentType.InterlineEquation,
ContentType.Image,
ContentType.Table,
] or any(
s['type']
in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table]
for s in current_line
):
# 传入
if span["type"] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table]:
if span['type'] in [
ContentType.InterlineEquation,
ContentType.Image,
ContentType.Table,
]:
displayed_list.append(span)
# 则开始新行
lines.append(current_line)
if len(current_line) > 1 or current_line[0]["type"] in [ContentType.Text, ContentType.InlineEquation]:
text_inline_lines.append((current_line, (line_first_y0, line_first_y)))
if len(current_line) > 1 or current_line[0]['type'] in [
ContentType.Text,
ContentType.InlineEquation,
]:
text_inline_lines.append(
(current_line, (line_first_y0, line_first_y))
)
current_line = [span]
line_first_y0 = span["bbox"][1]
line_first_y = span["bbox"][3]
line_first_y0 = span['bbox'][1]
line_first_y = span['bbox'][3]
continue
# 如果当前的span与当前行的最后一个span在y轴上重叠,则添加到当前行
if __is_overlaps_y_exceeds_threshold(span['bbox'], current_line[-1]['bbox']):
if span["type"] == "text":
line_first_y0 = span["bbox"][1]
line_first_y = span["bbox"][3]
if __is_overlaps_y_exceeds_threshold(
span['bbox'], current_line[-1]['bbox']
):
if span['type'] == 'text':
line_first_y0 = span['bbox'][1]
line_first_y = span['bbox'][3]
current_line.append(span)
else:
......@@ -159,13 +207,16 @@ def modify_y_axis(spans: list, displayed_list: list, text_inline_lines: list):
lines.append(current_line)
text_inline_lines.append((current_line, (line_first_y0, line_first_y)))
current_line = [span]
line_first_y0 = span["bbox"][1]
line_first_y = span["bbox"][3]
line_first_y0 = span['bbox'][1]
line_first_y = span['bbox'][3]
# 添加最后一行
if current_line:
lines.append(current_line)
if len(current_line) > 1 or current_line[0]["type"] in [ContentType.Text, ContentType.InlineEquation]:
if len(current_line) > 1 or current_line[0]['type'] in [
ContentType.Text,
ContentType.InlineEquation,
]:
text_inline_lines.append((current_line, (line_first_y0, line_first_y)))
for line in text_inline_lines:
# 按照x0坐标排序
......@@ -176,8 +227,8 @@ def modify_y_axis(spans: list, displayed_list: list, text_inline_lines: list):
for line in text_inline_lines:
current_line, (line_first_y0, line_first_y) = line
for span in current_line:
span["bbox"][1] = line_first_y0
span["bbox"][3] = line_first_y
span['bbox'][1] = line_first_y0
span['bbox'][3] = line_first_y
# return spans, displayed_list, text_inline_lines
......@@ -189,34 +240,42 @@ def modify_inline_equation(spans: list, displayed_list: list, text_inline_lines:
# if i == 8:
# print("debug")
span = displayed_list[i]
span_y0, span_y = span["bbox"][1], span["bbox"][3]
span_y0, span_y = span['bbox'][1], span['bbox'][3]
while j < len(text_inline_lines):
text_line = text_inline_lines[j]
y0, y1 = text_line[1]
if (
span_y0 < y0 < span_y or span_y0 < y1 < span_y or span_y0 < y0 and span_y > y1
) and __is_overlaps_y_exceeds_threshold(
span['bbox'], (0, y0, 0, y1)
):
span_y0 < y0 < span_y
or span_y0 < y1 < span_y
or span_y0 < y0
and span_y > y1
) and __is_overlaps_y_exceeds_threshold(span['bbox'], (0, y0, 0, y1)):
# 调整公式类型
if span["type"] == ContentType.InterlineEquation:
if span['type'] == ContentType.InterlineEquation:
# 最后一行是行间公式
if j + 1 >= len(text_inline_lines):
span["type"] = ContentType.InlineEquation
span["bbox"][1] = y0
span["bbox"][3] = y1
span['type'] = ContentType.InlineEquation
span['bbox'][1] = y0
span['bbox'][3] = y1
else:
# 行间公式旁边有多行文字或者行间公式比文字高3倍则不转换
y0_next, y1_next = text_inline_lines[j + 1][1]
if not __is_overlaps_y_exceeds_threshold(span['bbox'], (0, y0_next, 0, y1_next)) and 3 * (
y1 - y0) > span_y - span_y0:
span["type"] = ContentType.InlineEquation
span["bbox"][1] = y0
span["bbox"][3] = y1
if (
not __is_overlaps_y_exceeds_threshold(
span['bbox'], (0, y0_next, 0, y1_next)
)
and 3 * (y1 - y0) > span_y - span_y0
):
span['type'] = ContentType.InlineEquation
span['bbox'][1] = y0
span['bbox'][3] = y1
break
elif span_y < y0 or span_y0 < y0 < span_y and not __is_overlaps_y_exceeds_threshold(span['bbox'],
(0, y0, 0, y1)):
elif (
span_y < y0
or span_y0 < y0 < span_y
and not __is_overlaps_y_exceeds_threshold(span['bbox'], (0, y0, 0, y1))
):
break
else:
j += 1
......@@ -232,15 +291,15 @@ def get_qa_need_list(blocks):
inline_equations = []
for block in blocks:
for line in block["lines"]:
for span in line["spans"]:
if span["type"] == ContentType.Image:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Image:
images.append(span)
elif span["type"] == ContentType.Table:
elif span['type'] == ContentType.Table:
tables.append(span)
elif span["type"] == ContentType.InlineEquation:
elif span['type'] == ContentType.InlineEquation:
inline_equations.append(span)
elif span["type"] == ContentType.InterlineEquation:
elif span['type'] == ContentType.InterlineEquation:
interline_equations.append(span)
else:
continue
......@@ -254,10 +313,10 @@ def get_qa_need_list_v2(blocks):
interline_equations = []
for block in blocks:
if block["type"] == BlockType.Image:
if block['type'] == BlockType.Image:
images.append(block)
elif block["type"] == BlockType.Table:
elif block['type'] == BlockType.Table:
tables.append(block)
elif block["type"] == BlockType.InterlineEquation:
elif block['type'] == BlockType.InterlineEquation:
interline_equations.append(block)
return images, tables, interline_equations
from magic_pdf.libs.commons import fitz
from magic_pdf.config.drop_reason import DropReason
from magic_pdf.libs.boxbase import _is_in, _is_in_or_part_overlap
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.libs.commons import fitz
def __area(box):
return (box[2] - box[0]) * (box[3] - box[1])
def __is_contain_color_background_rect(page:fitz.Page, text_blocks, image_bboxes) -> bool:
"""
检查page是包含有颜色背景的矩形
"""
def __is_contain_color_background_rect(
page: fitz.Page, text_blocks, image_bboxes
) -> bool:
"""检查page是包含有颜色背景的矩形."""
color_bg_rect = []
p_width, p_height = page.rect.width, page.rect.height
# 先找到最大的带背景矩形
blocks = page.get_cdrawings()
for block in blocks:
if 'fill' in block and block['fill']: # 过滤掉透明的
if 'fill' in block and block['fill']: # 过滤掉透明的
fill = list(block['fill'])
fill[0], fill[1], fill[2] = int(fill[0]), int(fill[1]), int(fill[2])
if fill==(1.0,1.0,1.0):
if fill == (1.0, 1.0, 1.0):
continue
rect = block['rect']
# 过滤掉特别小的矩形
if __area(rect) < 10*10:
if __area(rect) < 10 * 10:
continue
# 为了防止是svg图片上的色块,这里过滤掉这类
if any([_is_in_or_part_overlap(rect, img_bbox) for img_bbox in image_bboxes]):
if any(
[_is_in_or_part_overlap(rect, img_bbox) for img_bbox in image_bboxes]
):
continue
color_bg_rect.append(rect)
# 找到最大的背景矩形
if len(color_bg_rect) > 0:
max_rect = max(color_bg_rect, key=lambda x:__area(x))
max_rect_int = (int(max_rect[0]), int(max_rect[1]), int(max_rect[2]), int(max_rect[3]))
max_rect = max(color_bg_rect, key=lambda x: __area(x))
max_rect_int = (
int(max_rect[0]),
int(max_rect[1]),
int(max_rect[2]),
int(max_rect[3]),
)
# 判断最大的背景矩形是否包含超过3行文字,或者50个字 TODO
if max_rect[2]-max_rect[0] > 0.2*p_width and max_rect[3]-max_rect[1] > 0.1*p_height:#宽度符合
#看是否有文本块落入到这个矩形中
if (
max_rect[2] - max_rect[0] > 0.2 * p_width
and max_rect[3] - max_rect[1] > 0.1 * p_height
): # 宽度符合
# 看是否有文本块落入到这个矩形中
for text_block in text_blocks:
box = text_block['bbox']
box_int = (int(box[0]), int(box[1]), int(box[2]), int(box[3]))
if _is_in(box_int, max_rect_int):
return True
return False
def __is_table_overlap_text_block(text_blocks, table_bbox):
"""
检查table_bbox是否覆盖了text_blocks里的文本块
TODO
"""
"""检查table_bbox是否覆盖了text_blocks里的文本块 TODO."""
for text_block in text_blocks:
box = text_block['bbox']
if _is_in_or_part_overlap(table_bbox, box):
......@@ -60,15 +67,12 @@ def __is_table_overlap_text_block(text_blocks, table_bbox):
return False
def pdf_filter(page:fitz.Page, text_blocks, table_bboxes, image_bboxes) -> tuple:
"""
return:(True|False, err_msg)
True, 如果pdf符合要求
False, 如果pdf不符合要求
"""
def pdf_filter(page: fitz.Page, text_blocks, table_bboxes, image_bboxes) -> tuple:
"""return:(True|False, err_msg) True, 如果pdf符合要求 False, 如果pdf不符合要求."""
if __is_contain_color_background_rect(page, text_blocks, image_bboxes):
return False, {"_need_drop": True, "_drop_reason": DropReason.COLOR_BACKGROUND_TEXT_BOX}
return False, {
'_need_drop': True,
'_drop_reason': DropReason.COLOR_BACKGROUND_TEXT_BOX,
}
return True, None
\ No newline at end of file
return True, None
from magic_pdf.libs.boxbase import _is_in_or_part_overlap, _is_in, _is_part_overlap
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.config.drop_reason import DropReason
from magic_pdf.libs.boxbase import _is_in, _is_part_overlap
def _remove_overlap_between_bbox(bbox1, bbox2):
if _is_part_overlap(bbox1, bbox2):
if _is_part_overlap(bbox1, bbox2):
ix0, iy0, ix1, iy1 = bbox1
x0, y0, x1, y1 = bbox2
......@@ -22,10 +23,10 @@ def _remove_overlap_between_bbox(bbox1, bbox2):
if y1 >= iy1:
mid = (y0 + iy1) // 2
y0 = max(mid + 0.25, y0)
iy1 = min(iy1, mid-0.25)
iy1 = min(iy1, mid - 0.25)
else:
mid = (iy0 + y1) // 2
y1 = min(y1, mid-0.25)
y1 = min(y1, mid - 0.25)
iy0 = max(mid + 0.25, iy0)
if ix1 > ix0 and iy1 > iy0 and y1 > y0 and x1 > x0:
......@@ -34,8 +35,8 @@ def _remove_overlap_between_bbox(bbox1, bbox2):
return bbox1, bbox2, None
else:
return bbox1, bbox2, DropReason.NEGATIVE_BBOX_AREA
else:
return bbox1, bbox2, None
else:
return bbox1, bbox2, None
def _remove_overlap_between_bboxes(arr):
......@@ -47,7 +48,7 @@ def _remove_overlap_between_bboxes(arr):
for j in range(N):
if i == j:
continue
if _is_in(arr[i]["bbox"], arr[j]["bbox"]):
if _is_in(arr[i]['bbox'], arr[j]['bbox']):
keeps[i] = False
for idx, v in enumerate(arr):
......@@ -56,13 +57,15 @@ def _remove_overlap_between_bboxes(arr):
for i in range(N):
if res[i] is None:
continue
bbox1, bbox2, drop_reason = _remove_overlap_between_bbox(v["bbox"], res[i]["bbox"])
bbox1, bbox2, drop_reason = _remove_overlap_between_bbox(
v['bbox'], res[i]['bbox']
)
if drop_reason is None:
v["bbox"] = bbox1
res[i]["bbox"] = bbox2
v['bbox'] = bbox1
res[i]['bbox'] = bbox2
else:
if v["score"] > res[i]["score"]:
if v['score'] > res[i]['score']:
keeps[i] = False
res[i] = None
else:
......@@ -74,25 +77,24 @@ def _remove_overlap_between_bboxes(arr):
def remove_overlap_between_bbox_for_span(spans):
arr = [{"bbox": span["bbox"], "score": span.get("score", 0.1)} for span in spans ]
arr = [{'bbox': span['bbox'], 'score': span.get('score', 0.1)} for span in spans]
res, drop_reasons = _remove_overlap_between_bboxes(arr)
ret = []
for i in range(len(res)):
if res[i] is None:
continue
spans[i]["bbox"] = res[i]["bbox"]
spans[i]['bbox'] = res[i]['bbox']
ret.append(spans[i])
return ret, drop_reasons
def remove_overlap_between_bbox_for_block(all_bboxes):
arr = [{"bbox": bbox[:4], "score": bbox[-1]} for bbox in all_bboxes ]
arr = [{'bbox': bbox[:4], 'score': bbox[-1]} for bbox in all_bboxes]
res, drop_reasons = _remove_overlap_between_bboxes(arr)
ret = []
for i in range(len(res)):
if res[i] is None:
continue
all_bboxes[i][:4] = res[i]["bbox"]
all_bboxes[i][:4] = res[i]['bbox']
ret.append(all_bboxes[i])
return ret, drop_reasons
from magic_pdf.libs.boxbase import _is_in, _is_in_or_part_overlap, calculate_overlap_area_2_minbox_area_ratio
from loguru import logger
from magic_pdf.libs.drop_tag import COLOR_BG_HEADER_TXT_BLOCK
from magic_pdf.config.drop_tag import COLOR_BG_HEADER_TXT_BLOCK
from magic_pdf.libs.boxbase import (_is_in, _is_in_or_part_overlap,
calculate_overlap_area_2_minbox_area_ratio)
def __area(box):
......@@ -9,8 +10,7 @@ def __area(box):
def rectangle_position_determination(rect, p_width):
"""
判断矩形是否在页面中轴线附近。
"""判断矩形是否在页面中轴线附近。
Args:
rect (list): 矩形坐标,格式为[x1, y1, x2, y2]。
......@@ -34,9 +34,10 @@ def rectangle_position_determination(rect, p_width):
else:
return False
def remove_colored_strip_textblock(remain_text_blocks, page):
"""
根据页面中特定颜色和大小过滤文本块,将符合条件的文本块从remain_text_blocks中移除,并返回移除的文本块列表colored_strip_textblock。
"""根据页面中特定颜色和大小过滤文本块,将符合条件的文本块从remain_text_blocks中移除,并返回移除的文本块列表colored_str
ip_textblock。
Args:
remain_text_blocks (list): 剩余文本块列表。
......@@ -51,22 +52,44 @@ def remove_colored_strip_textblock(remain_text_blocks, page):
blocks = page.get_cdrawings()
colored_strip_bg_rect = []
for block in blocks:
is_filled = 'fill' in block and block['fill'] and block['fill'] != (1.0, 1.0, 1.0) # 过滤掉透明的
is_filled = (
'fill' in block and block['fill'] and block['fill'] != (1.0, 1.0, 1.0)
) # 过滤掉透明的
rect = block['rect']
area_is_large_enough = __area(rect) > 100 # 过滤掉特别小的矩形
rectangle_position_determination_result = rectangle_position_determination(rect, p_width)
in_upper_half_page = rect[3] < p_height * 0.3 # 找到位于页面上半部分的矩形,下边界小于页面高度的30%
aspect_ratio_exceeds_4 = (rect[2] - rect[0]) > (rect[3] - rect[1]) * 4 # 找到长宽比超过4的矩形
rectangle_position_determination_result = rectangle_position_determination(
rect, p_width
)
in_upper_half_page = (
rect[3] < p_height * 0.3
) # 找到位于页面上半部分的矩形,下边界小于页面高度的30%
aspect_ratio_exceeds_4 = (rect[2] - rect[0]) > (
rect[3] - rect[1]
) * 4 # 找到长宽比超过4的矩形
if is_filled and area_is_large_enough and rectangle_position_determination_result and in_upper_half_page and aspect_ratio_exceeds_4:
if (
is_filled
and area_is_large_enough
and rectangle_position_determination_result
and in_upper_half_page
and aspect_ratio_exceeds_4
):
colored_strip_bg_rect.append(rect)
if len(colored_strip_bg_rect) > 0:
for colored_strip_block_bbox in colored_strip_bg_rect:
for text_block in remain_text_blocks:
text_bbox = text_block['bbox']
if _is_in(text_bbox, colored_strip_block_bbox) or (_is_in_or_part_overlap(text_bbox, colored_strip_block_bbox) and calculate_overlap_area_2_minbox_area_ratio(text_bbox, colored_strip_block_bbox) > 0.6):
logger.info(f'remove_colored_strip_textblock: {text_bbox}, {colored_strip_block_bbox}')
if _is_in(text_bbox, colored_strip_block_bbox) or (
_is_in_or_part_overlap(text_bbox, colored_strip_block_bbox)
and calculate_overlap_area_2_minbox_area_ratio(
text_bbox, colored_strip_block_bbox
)
> 0.6
):
logger.info(
f'remove_colored_strip_textblock: {text_bbox}, {colored_strip_block_bbox}'
)
text_block['tag'] = COLOR_BG_HEADER_TXT_BLOCK
colored_strip_textblocks.append(text_block)
......@@ -76,4 +99,3 @@ def remove_colored_strip_textblock(remain_text_blocks, page):
remain_text_blocks.remove(colored_strip_textblock)
return remain_text_blocks, colored_strip_textblocks
import re
from magic_pdf.config.drop_tag import CONTENT_IN_FOOT_OR_HEADER, PAGE_NO
from magic_pdf.libs.boxbase import _is_in_or_part_overlap
from magic_pdf.libs.drop_tag import CONTENT_IN_FOOT_OR_HEADER, PAGE_NO
def remove_headder_footer_one_page(text_raw_blocks, image_bboxes, table_bboxes, header_bboxs, footer_bboxs,
page_no_bboxs, page_w, page_h):
"""
删除页眉页脚,页码
从line级别进行删除,删除之后观察这个text-block是否是空的,如果是空的,则移动到remove_list中
"""
"""删除页眉页脚,页码 从line级别进行删除,删除之后观察这个text-block是否是空的,如果是空的,则移动到remove_list中."""
header = []
footer = []
if len(header) == 0:
......
import math
import re
from magic_pdf.config.drop_tag import (EMPTY_SIDE_BLOCK, ROTATE_TEXT,
VERTICAL_TEXT)
from magic_pdf.libs.boxbase import is_vbox_on_side
from magic_pdf.libs.drop_tag import EMPTY_SIDE_BLOCK, ROTATE_TEXT, VERTICAL_TEXT
def detect_non_horizontal_texts(result_dict):
"""
This function detects watermarks and vertical margin notes in the document.
"""This function detects watermarks and vertical margin notes in the
document.
Watermarks are identified by finding blocks with the same coordinates and frequently occurring identical texts across multiple pages.
If these conditions are met, the blocks are highly likely to be watermarks, as opposed to headers or footers, which can change from page to page.
If the direction of these blocks is not horizontal, they are definitely considered to be watermarks.
Vertical margin notes are identified by finding blocks with the same coordinates and frequently occurring identical texts across multiple pages.
If these conditions are met, the blocks are highly likely to be vertical margin notes, which typically appear on the left and right sides of the page.
If these conditions are met, the blocks are highly likely to be vertical margin notes, which typically appear on the left and right sides of the page. # noqa: E501
If the direction of these blocks is vertical, they are definitely considered to be vertical margin notes.
......@@ -32,13 +34,16 @@ def detect_non_horizontal_texts(result_dict):
potential_margin_notes = {}
for page_id, page_content in result_dict.items():
if page_id.startswith("page_"):
if page_id.startswith('page_'):
for block_id, block_data in page_content.items():
if block_id.startswith("block_"):
if "dir" in block_data:
coordinates_text = (block_data["bbox"], block_data["text"]) # Tuple of coordinates and text
angle = math.atan2(block_data["dir"][1], block_data["dir"][0])
if block_id.startswith('block_'):
if 'dir' in block_data:
coordinates_text = (
block_data['bbox'],
block_data['text'],
) # Tuple of coordinates and text
angle = math.atan2(block_data['dir'][1], block_data['dir'][0])
angle = abs(math.degrees(angle))
if angle > 5 and angle < 85: # Check if direction is watermarks
......@@ -49,32 +54,40 @@ def detect_non_horizontal_texts(result_dict):
if angle > 85 and angle < 105: # Check if direction is vertical
if coordinates_text in potential_margin_notes:
potential_margin_notes[coordinates_text] += 1 # Increment count
potential_margin_notes[coordinates_text] += (
1 # Increment count
)
else:
potential_margin_notes[coordinates_text] = 1 # Initialize count
potential_margin_notes[coordinates_text] = (
1 # Initialize count
)
# Identify watermarks by finding entries with counts higher than a threshold (e.g., appearing on more than half of the pages)
watermark_threshold = len(result_dict) // 2
watermarks = {k: v for k, v in potential_watermarks.items() if v > watermark_threshold}
watermarks = {
k: v for k, v in potential_watermarks.items() if v > watermark_threshold
}
# Identify margin notes by finding entries with counts higher than a threshold (e.g., appearing on more than half of the pages)
margin_note_threshold = len(result_dict) // 2
margin_notes = {k: v for k, v in potential_margin_notes.items() if v > margin_note_threshold}
margin_notes = {
k: v for k, v in potential_margin_notes.items() if v > margin_note_threshold
}
# Add watermark information to the result dictionary
for page_id, blocks in result_dict.items():
if page_id.startswith("page_"):
if page_id.startswith('page_'):
for block_id, block_data in blocks.items():
coordinates_text = (block_data["bbox"], block_data["text"])
coordinates_text = (block_data['bbox'], block_data['text'])
if coordinates_text in watermarks:
block_data["is_watermark"] = 1
block_data['is_watermark'] = 1
else:
block_data["is_watermark"] = 0
block_data['is_watermark'] = 0
if coordinates_text in margin_notes:
block_data["is_vertical_margin_note"] = 1
block_data['is_vertical_margin_note'] = 1
else:
block_data["is_vertical_margin_note"] = 0
block_data['is_vertical_margin_note'] = 0
return result_dict
......@@ -83,21 +96,21 @@ def detect_non_horizontal_texts(result_dict):
1. 当一个block里全部文字都不是dir=(1,0),这个block整体去掉
2. 当一个block里全部文字都是dir=(1,0),但是每行只有一个字,这个block整体去掉。这个block必须出现在页面的四周,否则不去掉
"""
import re
def __is_a_word(sentence):
# 如果输入是中文并且长度为1,则返回True
if re.fullmatch(r'[\u4e00-\u9fa5]', sentence):
return True
# 判断是否为单个英文单词或字符(包括ASCII标点)
elif re.fullmatch(r'[a-zA-Z0-9]+', sentence) and len(sentence) <=2:
elif re.fullmatch(r'[a-zA-Z0-9]+', sentence) and len(sentence) <= 2:
return True
else:
return False
def __get_text_color(num):
"""获取字体的颜色RGB值"""
"""获取字体的颜色RGB值."""
blue = num & 255
green = (num >> 8) & 255
red = (num >> 16) & 255
......@@ -105,84 +118,119 @@ def __get_text_color(num):
def __is_empty_side_box(text_block):
"""
是否是边缘上的空白没有任何内容的block
"""
"""是否是边缘上的空白没有任何内容的block."""
for line in text_block['lines']:
for span in line['spans']:
font_color = span['color']
r,g,b = __get_text_color(font_color)
if len(span['text'].strip())>0 and (r,g,b)!=(255,255,255):
r, g, b = __get_text_color(font_color)
if len(span['text'].strip()) > 0 and (r, g, b) != (255, 255, 255):
return False
return True
def remove_rotate_side_textblock(pymu_text_block, page_width, page_height):
"""
返回删除了垂直,水印,旋转的textblock
删除的内容打上tag返回
"""
"""返回删除了垂直,水印,旋转的textblock 删除的内容打上tag返回."""
removed_text_block = []
for i, block in enumerate(pymu_text_block): # 格式参考test/assets/papre/pymu_textblocks.json
for i, block in enumerate(
pymu_text_block
): # 格式参考test/assets/papre/pymu_textblocks.json
lines = block['lines']
block_bbox = block['bbox']
if not is_vbox_on_side(block_bbox, page_width, page_height, 0.2): # 保证这些box必须在页面的两边
continue
if all([__is_a_word(line['spans'][0]["text"]) for line in lines if len(line['spans'])>0]) and len(lines)>1 and all([len(line['spans'])==1 for line in lines]):
is_box_valign = (len(set([int(line['spans'][0]['bbox'][0] ) for line in lines if len(line['spans'])>0]))==1) and (len([int(line['spans'][0]['bbox'][0] ) for line in lines if len(line['spans'])>0])>1) # 测试bbox在垂直方向是不是x0都相等,也就是在垂直方向排列.同时必须大于等于2个字
if not is_vbox_on_side(
block_bbox, page_width, page_height, 0.2
): # 保证这些box必须在页面的两边
continue
if (
all(
[
__is_a_word(line['spans'][0]['text'])
for line in lines
if len(line['spans']) > 0
]
)
and len(lines) > 1
and all([len(line['spans']) == 1 for line in lines])
):
is_box_valign = (
(
len(
set(
[
int(line['spans'][0]['bbox'][0])
for line in lines
if len(line['spans']) > 0
]
)
)
== 1
)
and (
len(
[
int(line['spans'][0]['bbox'][0])
for line in lines
if len(line['spans']) > 0
]
)
> 1
)
) # 测试bbox在垂直方向是不是x0都相等,也就是在垂直方向排列.同时必须大于等于2个字
if is_box_valign:
block['tag'] = VERTICAL_TEXT
removed_text_block.append(block)
continue
for line in lines:
if line['dir']!=(1,0):
if line['dir'] != (1, 0):
block['tag'] = ROTATE_TEXT
removed_text_block.append(block) # 只要有一个line不是dir=(1,0),就把整个block都删掉
removed_text_block.append(
block
) # 只要有一个line不是dir=(1,0),就把整个block都删掉
break
for block in removed_text_block:
pymu_text_block.remove(block)
return pymu_text_block, removed_text_block
def get_side_boundry(rotate_bbox, page_width, page_height):
"""
根据rotate_bbox,返回页面的左右正文边界
"""
"""根据rotate_bbox,返回页面的左右正文边界."""
left_x = 0
right_x = page_width
for x in rotate_bbox:
box = x['bbox']
if box[2]<page_width/2:
if box[2] < page_width / 2:
left_x = max(left_x, box[2])
else:
right_x = min(right_x, box[0])
return left_x+1, right_x-1
return left_x + 1, right_x - 1
def remove_side_blank_block(pymu_text_block, page_width, page_height):
"""
删除页面两侧的空白block
"""
"""删除页面两侧的空白block."""
removed_text_block = []
for i, block in enumerate(pymu_text_block): # 格式参考test/assets/papre/pymu_textblocks.json
for i, block in enumerate(
pymu_text_block
): # 格式参考test/assets/papre/pymu_textblocks.json
block_bbox = block['bbox']
if not is_vbox_on_side(block_bbox, page_width, page_height, 0.2): # 保证这些box必须在页面的两边
continue
if not is_vbox_on_side(
block_bbox, page_width, page_height, 0.2
): # 保证这些box必须在页面的两边
continue
if __is_empty_side_box(block):
block['tag'] = EMPTY_SIDE_BLOCK
removed_text_block.append(block)
continue
for block in removed_text_block:
pymu_text_block.remove(block)
return pymu_text_block, removed_text_block
\ No newline at end of file
return pymu_text_block, removed_text_block
......@@ -4,8 +4,9 @@
2. 然后去掉出现在文字blcok上的图片bbox
"""
from magic_pdf.libs.boxbase import _is_in, _is_in_or_part_overlap, _is_left_overlap
from magic_pdf.libs.drop_tag import ON_IMAGE_TEXT, ON_TABLE_TEXT
from magic_pdf.config.drop_tag import ON_IMAGE_TEXT, ON_TABLE_TEXT
from magic_pdf.libs.boxbase import (_is_in, _is_in_or_part_overlap,
_is_left_overlap)
def resolve_bbox_overlap_conflict(images: list, tables: list, interline_equations: list, inline_equations: list,
......@@ -26,14 +27,14 @@ def resolve_bbox_overlap_conflict(images: list, tables: list, interline_equation
# 去掉位于图片上的文字block
for image_box in images:
for text_block in text_raw_blocks:
text_bbox = text_block["bbox"]
text_bbox = text_block['bbox']
if _is_in(text_bbox, image_box):
text_block['tag'] = ON_IMAGE_TEXT
text_block_removed.append(text_block)
# 去掉table上的文字block
for table_box in tables:
for text_block in text_raw_blocks:
text_bbox = text_block["bbox"]
text_bbox = text_block['bbox']
if _is_in(text_bbox, table_box):
text_block['tag'] = ON_TABLE_TEXT
text_block_removed.append(text_block)
......@@ -77,7 +78,7 @@ def resolve_bbox_overlap_conflict(images: list, tables: list, interline_equation
# 图片和文字重叠,丢掉图片
for image_box in images:
for text_block in text_raw_blocks:
text_bbox = text_block["bbox"]
text_bbox = text_block['bbox']
if _is_in_or_part_overlap(image_box, text_bbox):
images_backup.append(image_box)
break
......@@ -122,11 +123,7 @@ def resolve_bbox_overlap_conflict(images: list, tables: list, interline_equation
def check_text_block_horizontal_overlap(text_blocks: list, header, footer) -> bool:
"""
检查文本block之间的水平重叠情况,这种情况如果发生,那么这个pdf就不再继续处理了。
因为这种情况大概率发生了公式没有被检测出来。
"""
"""检查文本block之间的水平重叠情况,这种情况如果发生,那么这个pdf就不再继续处理了。 因为这种情况大概率发生了公式没有被检测出来。"""
if len(text_blocks) == 0:
return False
......@@ -148,7 +145,7 @@ def check_text_block_horizontal_overlap(text_blocks: list, header, footer) -> bo
txt_bboxes = []
for text_block in text_blocks:
bbox = text_block["bbox"]
bbox = text_block['bbox']
if bbox[1] >= clip_y0 and bbox[3] <= clip_y1:
txt_bboxes.append(bbox)
......@@ -161,11 +158,7 @@ def check_text_block_horizontal_overlap(text_blocks: list, header, footer) -> bo
def check_useful_block_horizontal_overlap(useful_blocks: list) -> bool:
"""
检查文本block之间的水平重叠情况,这种情况如果发生,那么这个pdf就不再继续处理了。
因为这种情况大概率发生了公式没有被检测出来。
"""
"""检查文本block之间的水平重叠情况,这种情况如果发生,那么这个pdf就不再继续处理了。 因为这种情况大概率发生了公式没有被检测出来。"""
if len(useful_blocks) == 0:
return False
......@@ -174,7 +167,7 @@ def check_useful_block_horizontal_overlap(useful_blocks: list) -> bool:
useful_bboxes = []
for text_block in useful_blocks:
bbox = text_block["bbox"]
bbox = text_block['bbox']
if bbox[1] >= page_min_y and bbox[3] <= page_max_y:
useful_bboxes.append(bbox)
......
from loguru import logger
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.config.drop_reason import DropReason
def get_data_source(jso: dict):
data_source = jso.get("data_source")
data_source = jso.get('data_source')
if data_source is None:
data_source = jso.get("file_source")
data_source = jso.get('file_source')
return data_source
def get_data_type(jso: dict):
data_type = jso.get("data_type")
data_type = jso.get('data_type')
if data_type is None:
data_type = jso.get("file_type")
data_type = jso.get('file_type')
return data_type
def get_bookid(jso: dict):
book_id = jso.get("bookid")
book_id = jso.get('bookid')
if book_id is None:
book_id = jso.get("original_file_id")
book_id = jso.get('original_file_id')
return book_id
def exception_handler(jso: dict, e):
logger.exception(e)
jso["_need_drop"] = True
jso["_drop_reason"] = DropReason.Exception
jso["_exception"] = f"ERROR: {e}"
jso['_need_drop'] = True
jso['_drop_reason'] = DropReason.Exception
jso['_exception'] = f'ERROR: {e}'
return jso
def get_bookname(jso: dict):
data_source = get_data_source(jso)
file_id = jso.get("file_id")
book_name = f"{data_source}/{file_id}"
file_id = jso.get('file_id')
book_name = f'{data_source}/{file_id}'
return book_name
def spark_json_extractor(jso: dict) -> dict:
"""
从json中提取数据,返回一个dict
"""
"""从json中提取数据,返回一个dict."""
return {
"_pdf_type": jso["_pdf_type"],
"model_list": jso["doc_layout_result"],
'_pdf_type': jso['_pdf_type'],
'model_list': jso['doc_layout_result'],
}
......@@ -5,9 +5,8 @@ import click
from loguru import logger
import magic_pdf.model as model_config
from magic_pdf.data.data_reader_writer import FileBasedDataReader
from magic_pdf.libs.version import __version__
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.tools.common import do_parse, parse_pdf_methods
......@@ -86,8 +85,8 @@ def cli(path, output_dir, method, lang, debug_able, start_page_id, end_page_id):
os.makedirs(output_dir, exist_ok=True)
def read_fn(path):
disk_rw = DiskReaderWriter(os.path.dirname(path))
return disk_rw.read(os.path.basename(path), AbsReaderWriter.MODE_BIN)
disk_rw = FileBasedDataReader(os.path.dirname(path))
return disk_rw.read(os.path.basename(path))
def parse_doc(doc_path: str):
try:
......
......@@ -5,13 +5,11 @@ from pathlib import Path
import click
import magic_pdf.model as model_config
from magic_pdf.data.data_reader_writer import FileBasedDataReader, S3DataReader
from magic_pdf.libs.config_reader import get_s3_config
from magic_pdf.libs.path_utils import (parse_s3_range_params, parse_s3path,
remove_non_official_s3_args)
from magic_pdf.libs.version import __version__
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.rw.S3ReaderWriter import S3ReaderWriter
from magic_pdf.tools.common import do_parse, parse_pdf_methods
......@@ -19,15 +17,14 @@ def read_s3_path(s3path):
bucket, key = parse_s3path(s3path)
s3_ak, s3_sk, s3_endpoint = get_s3_config(bucket)
s3_rw = S3ReaderWriter(s3_ak, s3_sk, s3_endpoint, 'auto',
remove_non_official_s3_args(s3path))
s3_rw = S3DataReader('', bucket, s3_ak, s3_sk, s3_endpoint, 'auto')
may_range_params = parse_s3_range_params(s3path)
if may_range_params is None or 2 != len(may_range_params):
byte_start, byte_end = 0, None
byte_start, byte_end = 0, -1
else:
byte_start, byte_end = int(may_range_params[0]), int(
may_range_params[1])
return s3_rw.read_offset(
return s3_rw.read_at(
remove_non_official_s3_args(s3path),
byte_start,
byte_end,
......@@ -129,8 +126,8 @@ def pdf(pdf, json_data, output_dir, method):
os.makedirs(output_dir, exist_ok=True)
def read_fn(path):
disk_rw = DiskReaderWriter(os.path.dirname(path))
return disk_rw.read(os.path.basename(path), AbsReaderWriter.MODE_BIN)
disk_rw = FileBasedDataReader(os.path.dirname(path))
return disk_rw.read(os.path.basename(path))
model_json_list = json_parse.loads(read_fn(json_data).decode('utf-8'))
......
......@@ -3,18 +3,18 @@ import json as json_parse
import os
import click
import fitz
from loguru import logger
import magic_pdf.model as model_config
from magic_pdf.config.make_content_config import DropMode, MakeMode
from magic_pdf.data.data_reader_writer import FileBasedDataWriter
from magic_pdf.libs.draw_bbox import (draw_layout_bbox, draw_line_sort_bbox,
draw_model_bbox, draw_span_bbox)
from magic_pdf.libs.MakeContentConfig import DropMode, MakeMode
from magic_pdf.pipe.OCRPipe import OCRPipe
from magic_pdf.pipe.TXTPipe import TXTPipe
from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
import fitz
# from io import BytesIO
# from pypdf import PdfReader, PdfWriter
......@@ -54,11 +54,11 @@ def prepare_env(output_dir, pdf_file_name, method):
def convert_pdf_bytes_to_bytes_by_pymupdf(pdf_bytes, start_page_id=0, end_page_id=None):
document = fitz.open("pdf", pdf_bytes)
document = fitz.open('pdf', pdf_bytes)
output_document = fitz.open()
end_page_id = end_page_id if end_page_id is not None and end_page_id >= 0 else len(document) - 1
if end_page_id > len(document) - 1:
logger.warning("end_page_id is out of range, use pdf_docs length")
logger.warning('end_page_id is out of range, use pdf_docs length')
end_page_id = len(document) - 1
output_document.insert_pdf(document, from_page=start_page_id, to_page=end_page_id)
output_bytes = output_document.tobytes()
......@@ -94,14 +94,17 @@ def do_parse(
f_draw_model_bbox = True
f_draw_line_sort_bbox = True
if lang == "":
lang = None
pdf_bytes = convert_pdf_bytes_to_bytes_by_pymupdf(pdf_bytes, start_page_id, end_page_id)
orig_model_list = copy.deepcopy(model_list)
local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name,
parse_method)
image_writer, md_writer = DiskReaderWriter(
local_image_dir), DiskReaderWriter(local_md_dir)
image_writer, md_writer = FileBasedDataWriter(
local_image_dir), FileBasedDataWriter(local_md_dir)
image_dir = str(os.path.basename(local_image_dir))
if parse_method == 'auto':
......@@ -145,49 +148,36 @@ def do_parse(
if f_draw_line_sort_bbox:
draw_line_sort_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name)
md_content = pipe.pipe_mk_markdown(image_dir,
drop_mode=DropMode.NONE,
md_make_mode=f_make_md_mode)
md_content = pipe.pipe_mk_markdown(image_dir, drop_mode=DropMode.NONE, md_make_mode=f_make_md_mode)
if f_dump_md:
md_writer.write(
content=md_content,
path=f'{pdf_file_name}.md',
mode=AbsReaderWriter.MODE_TXT,
md_writer.write_string(
f'{pdf_file_name}.md',
md_content
)
if f_dump_middle_json:
md_writer.write(
content=json_parse.dumps(pipe.pdf_mid_data,
ensure_ascii=False,
indent=4),
path=f'{pdf_file_name}_middle.json',
mode=AbsReaderWriter.MODE_TXT,
md_writer.write_string(
f'{pdf_file_name}_middle.json',
json_parse.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4)
)
if f_dump_model_json:
md_writer.write(
content=json_parse.dumps(orig_model_list,
ensure_ascii=False,
indent=4),
path=f'{pdf_file_name}_model.json',
mode=AbsReaderWriter.MODE_TXT,
md_writer.write_string(
f'{pdf_file_name}_model.json',
json_parse.dumps(orig_model_list, ensure_ascii=False, indent=4)
)
if f_dump_orig_pdf:
md_writer.write(
content=pdf_bytes,
path=f'{pdf_file_name}_origin.pdf',
mode=AbsReaderWriter.MODE_BIN,
f'{pdf_file_name}_origin.pdf',
pdf_bytes,
)
content_list = pipe.pipe_mk_uni_format(image_dir, drop_mode=DropMode.NONE)
if f_dump_content_list:
md_writer.write(
content=json_parse.dumps(content_list,
ensure_ascii=False,
indent=4),
path=f'{pdf_file_name}_content_list.json',
mode=AbsReaderWriter.MODE_TXT,
md_writer.write_string(
f'{pdf_file_name}_content_list.json',
json_parse.dumps(content_list, ensure_ascii=False, indent=4)
)
logger.info(f'local output dir is {local_md_dir}')
......
"""
用户输入:
model数组,每个元素代表一个页面
pdf在s3的路径
截图保存的s3位置
"""用户输入: model数组,每个元素代表一个页面 pdf在s3的路径 截图保存的s3位置.
然后:
1)根据s3路径,调用spark集群的api,拿到ak,sk,endpoint,构造出s3PDFReader
2)根据用户输入的s3地址,调用spark集群的api,拿到ak,sk,endpoint,构造出s3ImageWriter
其余部分至于构造s3cli, 获取ak,sk都在code-clean里写代码完成。不要反向依赖!!!
"""
import re
from loguru import logger
from magic_pdf.data.data_reader_writer import DataWriter
from magic_pdf.libs.version import __version__
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.rw import AbsReaderWriter
from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt
PARSE_TYPE_TXT = "txt"
PARSE_TYPE_OCR = "ocr"
PARSE_TYPE_TXT = 'txt'
PARSE_TYPE_OCR = 'ocr'
def parse_txt_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWriter, is_debug=False,
def parse_txt_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: DataWriter, is_debug=False,
start_page_id=0, end_page_id=None, lang=None,
*args, **kwargs):
"""
解析文本类pdf
"""
"""解析文本类pdf."""
pdf_info_dict = parse_pdf_by_txt(
pdf_bytes,
pdf_models,
......@@ -38,24 +30,23 @@ def parse_txt_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWrit
start_page_id=start_page_id,
end_page_id=end_page_id,
debug_mode=is_debug,
lang=lang,
)
pdf_info_dict["_parse_type"] = PARSE_TYPE_TXT
pdf_info_dict['_parse_type'] = PARSE_TYPE_TXT
pdf_info_dict["_version_name"] = __version__
pdf_info_dict['_version_name'] = __version__
if lang is not None:
pdf_info_dict["_lang"] = lang
pdf_info_dict['_lang'] = lang
return pdf_info_dict
def parse_ocr_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWriter, is_debug=False,
def parse_ocr_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: DataWriter, is_debug=False,
start_page_id=0, end_page_id=None, lang=None,
*args, **kwargs):
"""
解析ocr类pdf
"""
"""解析ocr类pdf."""
pdf_info_dict = parse_pdf_by_ocr(
pdf_bytes,
pdf_models,
......@@ -63,25 +54,24 @@ def parse_ocr_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWrit
start_page_id=start_page_id,
end_page_id=end_page_id,
debug_mode=is_debug,
lang=lang,
)
pdf_info_dict["_parse_type"] = PARSE_TYPE_OCR
pdf_info_dict['_parse_type'] = PARSE_TYPE_OCR
pdf_info_dict["_version_name"] = __version__
pdf_info_dict['_version_name'] = __version__
if lang is not None:
pdf_info_dict["_lang"] = lang
pdf_info_dict['_lang'] = lang
return pdf_info_dict
def parse_union_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWriter, is_debug=False,
def parse_union_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: DataWriter, is_debug=False,
input_model_is_empty: bool = False,
start_page_id=0, end_page_id=None, lang=None,
*args, **kwargs):
"""
ocr和文本混合的pdf,全部解析出来
"""
"""ocr和文本混合的pdf,全部解析出来."""
def parse_pdf(method):
try:
......@@ -92,18 +82,19 @@ def parse_union_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWr
start_page_id=start_page_id,
end_page_id=end_page_id,
debug_mode=is_debug,
lang=lang,
)
except Exception as e:
logger.exception(e)
return None
pdf_info_dict = parse_pdf(parse_pdf_by_txt)
if pdf_info_dict is None or pdf_info_dict.get("_need_drop", False):
logger.warning(f"parse_pdf_by_txt drop or error, switch to parse_pdf_by_ocr")
if pdf_info_dict is None or pdf_info_dict.get('_need_drop', False):
logger.warning('parse_pdf_by_txt drop or error, switch to parse_pdf_by_ocr')
if input_model_is_empty:
layout_model = kwargs.get("layout_model", None)
formula_enable = kwargs.get("formula_enable", None)
table_enable = kwargs.get("table_enable", None)
layout_model = kwargs.get('layout_model', None)
formula_enable = kwargs.get('formula_enable', None)
table_enable = kwargs.get('table_enable', None)
pdf_models = doc_analyze(
pdf_bytes,
ocr=True,
......@@ -116,15 +107,15 @@ def parse_union_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWr
)
pdf_info_dict = parse_pdf(parse_pdf_by_ocr)
if pdf_info_dict is None:
raise Exception("Both parse_pdf_by_txt and parse_pdf_by_ocr failed.")
raise Exception('Both parse_pdf_by_txt and parse_pdf_by_ocr failed.')
else:
pdf_info_dict["_parse_type"] = PARSE_TYPE_OCR
pdf_info_dict['_parse_type'] = PARSE_TYPE_OCR
else:
pdf_info_dict["_parse_type"] = PARSE_TYPE_TXT
pdf_info_dict['_parse_type'] = PARSE_TYPE_TXT
pdf_info_dict["_version_name"] = __version__
pdf_info_dict['_version_name'] = __version__
if lang is not None:
pdf_info_dict["_lang"] = lang
pdf_info_dict['_lang'] = lang
return pdf_info_dict
......@@ -2,39 +2,37 @@
import base64
import os
import re
import time
import uuid
import zipfile
from pathlib import Path
import re
import gradio as gr
import pymupdf
from gradio_pdf import PDF
from loguru import logger
from magic_pdf.data.data_reader_writer import FileBasedDataReader
from magic_pdf.libs.hash_utils import compute_sha256
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.tools.common import do_parse, prepare_env
import gradio as gr
from gradio_pdf import PDF
def read_fn(path):
disk_rw = DiskReaderWriter(os.path.dirname(path))
return disk_rw.read(os.path.basename(path), AbsReaderWriter.MODE_BIN)
disk_rw = FileBasedDataReader(os.path.dirname(path))
return disk_rw.read(os.path.basename(path))
def parse_pdf(doc_path, output_dir, end_page_id, is_ocr, layout_mode, formula_enable, table_enable, language):
os.makedirs(output_dir, exist_ok=True)
try:
file_name = f"{str(Path(doc_path).stem)}_{time.time()}"
file_name = f'{str(Path(doc_path).stem)}_{time.time()}'
pdf_data = read_fn(doc_path)
if is_ocr:
parse_method = "ocr"
parse_method = 'ocr'
else:
parse_method = "auto"
parse_method = 'auto'
local_image_dir, local_md_dir = prepare_env(output_dir, file_name, parse_method)
do_parse(
output_dir,
......@@ -55,8 +53,7 @@ def parse_pdf(doc_path, output_dir, end_page_id, is_ocr, layout_mode, formula_en
def compress_directory_to_zip(directory_path, output_zip_path):
"""
压缩指定目录到一个 ZIP 文件。
"""压缩指定目录到一个 ZIP 文件。
:param directory_path: 要压缩的目录路径
:param output_zip_path: 输出的 ZIP 文件路径
......@@ -80,7 +77,7 @@ def compress_directory_to_zip(directory_path, output_zip_path):
def image_to_base64(image_path):
with open(image_path, "rb") as image_file:
with open(image_path, 'rb') as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
......@@ -93,7 +90,7 @@ def replace_image_with_base64(markdown_text, image_dir_path):
relative_path = match.group(1)
full_path = os.path.join(image_dir_path, relative_path)
base64_image = image_to_base64(full_path)
return f"![{relative_path}](data:image/jpeg;base64,{base64_image})"
return f'![{relative_path}](data:image/jpeg;base64,{base64_image})'
# 应用替换
return re.sub(pattern, replace, markdown_text)
......@@ -103,34 +100,34 @@ def to_markdown(file_path, end_pages, is_ocr, layout_mode, formula_enable, table
# 获取识别的md文件以及压缩包文件路径
local_md_dir, file_name = parse_pdf(file_path, './output', end_pages - 1, is_ocr,
layout_mode, formula_enable, table_enable, language)
archive_zip_path = os.path.join("./output", compute_sha256(local_md_dir) + ".zip")
archive_zip_path = os.path.join('./output', compute_sha256(local_md_dir) + '.zip')
zip_archive_success = compress_directory_to_zip(local_md_dir, archive_zip_path)
if zip_archive_success == 0:
logger.info("压缩成功")
logger.info('压缩成功')
else:
logger.error("压缩失败")
md_path = os.path.join(local_md_dir, file_name + ".md")
logger.error('压缩失败')
md_path = os.path.join(local_md_dir, file_name + '.md')
with open(md_path, 'r', encoding='utf-8') as f:
txt_content = f.read()
md_content = replace_image_with_base64(txt_content, local_md_dir)
# 返回转换后的PDF路径
new_pdf_path = os.path.join(local_md_dir, file_name + "_layout.pdf")
new_pdf_path = os.path.join(local_md_dir, file_name + '_layout.pdf')
return md_content, txt_content, archive_zip_path, new_pdf_path
latex_delimiters = [{"left": "$$", "right": "$$", "display": True},
{"left": '$', "right": '$', "display": False}]
latex_delimiters = [{'left': '$$', 'right': '$$', 'display': True},
{'left': '$', 'right': '$', 'display': False}]
def init_model():
from magic_pdf.model.doc_analyze_by_custom_model import ModelSingleton
try:
model_manager = ModelSingleton()
txt_model = model_manager.get_model(False, False)
logger.info(f"txt_model init final")
ocr_model = model_manager.get_model(True, False)
logger.info(f"ocr_model init final")
txt_model = model_manager.get_model(False, False) # noqa: F841
logger.info('txt_model init final')
ocr_model = model_manager.get_model(True, False) # noqa: F841
logger.info('ocr_model init final')
return 0
except Exception as e:
logger.exception(e)
......@@ -138,31 +135,31 @@ def init_model():
model_init = init_model()
logger.info(f"model_init: {model_init}")
logger.info(f'model_init: {model_init}')
with open("header.html", "r") as file:
with open('header.html', 'r') as file:
header = file.read()
latin_lang = [
'af', 'az', 'bs', 'cs', 'cy', 'da', 'de', 'es', 'et', 'fr', 'ga', 'hr',
'af', 'az', 'bs', 'cs', 'cy', 'da', 'de', 'es', 'et', 'fr', 'ga', 'hr', # noqa: E126
'hu', 'id', 'is', 'it', 'ku', 'la', 'lt', 'lv', 'mi', 'ms', 'mt', 'nl',
'no', 'oc', 'pi', 'pl', 'pt', 'ro', 'rs_latin', 'sk', 'sl', 'sq', 'sv',
'sw', 'tl', 'tr', 'uz', 'vi', 'french', 'german'
]
arabic_lang = ['ar', 'fa', 'ug', 'ur']
cyrillic_lang = [
'ru', 'rs_cyrillic', 'be', 'bg', 'uk', 'mn', 'abq', 'ady', 'kbd', 'ava',
'ru', 'rs_cyrillic', 'be', 'bg', 'uk', 'mn', 'abq', 'ady', 'kbd', 'ava', # noqa: E126
'dar', 'inh', 'che', 'lbe', 'lez', 'tab'
]
devanagari_lang = [
'hi', 'mr', 'ne', 'bh', 'mai', 'ang', 'bho', 'mah', 'sck', 'new', 'gom',
'hi', 'mr', 'ne', 'bh', 'mai', 'ang', 'bho', 'mah', 'sck', 'new', 'gom', # noqa: E126
'sa', 'bgc'
]
other_lang = ['ch', 'en', 'korean', 'japan', 'chinese_cht', 'ta', 'te', 'ka']
all_lang = [""]
all_lang = ['']
all_lang.extend([*other_lang, *latin_lang, *arabic_lang, *cyrillic_lang, *devanagari_lang])
......@@ -174,7 +171,7 @@ def to_pdf(file_path):
pdf_bytes = f.convert_to_pdf()
# 将pdfbytes 写入到uuid.pdf中
# 生成唯一的文件名
unique_filename = f"{uuid.uuid4()}.pdf"
unique_filename = f'{uuid.uuid4()}.pdf'
# 构建完整的文件路径
tmp_file_path = os.path.join(os.path.dirname(file_path), unique_filename)
......@@ -186,43 +183,43 @@ def to_pdf(file_path):
return tmp_file_path
if __name__ == "__main__":
if __name__ == '__main__':
with gr.Blocks() as demo:
gr.HTML(header)
with gr.Row():
with gr.Column(variant='panel', scale=5):
file = gr.File(label="Please upload a PDF or image", file_types=[".pdf", ".png", ".jpeg", ".jpg"])
max_pages = gr.Slider(1, 10, 5, step=1, label="Max convert pages")
file = gr.File(label='Please upload a PDF or image', file_types=['.pdf', '.png', '.jpeg', '.jpg'])
max_pages = gr.Slider(1, 10, 5, step=1, label='Max convert pages')
with gr.Row():
layout_mode = gr.Dropdown(["layoutlmv3", "doclayout_yolo"], label="Layout model", value="layoutlmv3")
language = gr.Dropdown(all_lang, label="Language", value="")
layout_mode = gr.Dropdown(['layoutlmv3', 'doclayout_yolo'], label='Layout model', value='layoutlmv3')
language = gr.Dropdown(all_lang, label='Language', value='')
with gr.Row():
formula_enable = gr.Checkbox(label="Enable formula recognition", value=True)
is_ocr = gr.Checkbox(label="Force enable OCR", value=False)
table_enable = gr.Checkbox(label="Enable table recognition(test)", value=False)
formula_enable = gr.Checkbox(label='Enable formula recognition', value=True)
is_ocr = gr.Checkbox(label='Force enable OCR', value=False)
table_enable = gr.Checkbox(label='Enable table recognition(test)', value=False)
with gr.Row():
change_bu = gr.Button("Convert")
clear_bu = gr.ClearButton(value="Clear")
pdf_show = PDF(label="PDF preview", interactive=True, height=800)
with gr.Accordion("Examples:"):
example_root = os.path.join(os.path.dirname(__file__), "examples")
change_bu = gr.Button('Convert')
clear_bu = gr.ClearButton(value='Clear')
pdf_show = PDF(label='PDF preview', interactive=True, height=800)
with gr.Accordion('Examples:'):
example_root = os.path.join(os.path.dirname(__file__), 'examples')
gr.Examples(
examples=[os.path.join(example_root, _) for _ in os.listdir(example_root) if
_.endswith("pdf")],
_.endswith('pdf')],
inputs=pdf_show
)
with gr.Column(variant='panel', scale=5):
output_file = gr.File(label="convert result", interactive=False)
output_file = gr.File(label='convert result', interactive=False)
with gr.Tabs():
with gr.Tab("Markdown rendering"):
md = gr.Markdown(label="Markdown rendering", height=900, show_copy_button=True,
with gr.Tab('Markdown rendering'):
md = gr.Markdown(label='Markdown rendering', height=900, show_copy_button=True,
latex_delimiters=latex_delimiters, line_breaks=True)
with gr.Tab("Markdown text"):
with gr.Tab('Markdown text'):
md_text = gr.TextArea(lines=45, show_copy_button=True)
file.upload(fn=to_pdf, inputs=file, outputs=pdf_show)
change_bu.click(fn=to_markdown, inputs=[pdf_show, max_pages, is_ocr, layout_mode, formula_enable, table_enable, language],
outputs=[md, md_text, output_file, pdf_show])
clear_bu.add([file, md, pdf_show, md_text, output_file, is_ocr, table_enable, language])
demo.launch(server_name="0.0.0.0")
\ No newline at end of file
demo.launch(server_name='0.0.0.0')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment