Commit 21fa7819 authored by myhloli's avatar myhloli
Browse files

refactor(pre_proc): remove unused functions and simplify code

- Remove unused imports and functions across multiple files
- Simplify code by deleting unnecessary comments and empty lines
- Update function signatures to match actual usage
- Replace redundant code with more efficient alternatives
parent 6a22b5ab
import time
from loguru import logger
from magic_pdf.config.drop_reason import DropReason
from magic_pdf.config.ocr_content_type import ContentType
from magic_pdf.layout.layout_sort import (LAYOUT_UNPROC, get_bboxes_layout,
get_columns_cnt_of_layout)
from magic_pdf.libs.commons import fitz, get_delta_time
from magic_pdf.libs.convert_utils import dict_to_list
from magic_pdf.libs.hash_utils import compute_md5
from magic_pdf.libs.local_math import float_equal
from magic_pdf.model.magic_model import MagicModel
from magic_pdf.para.para_split_v2 import para_split
from magic_pdf.pre_proc.citationmarker_remove import remove_citation_marker
from magic_pdf.pre_proc.construct_page_dict import \
ocr_construct_page_component_v2
from magic_pdf.pre_proc.cut_image import ocr_cut_image_and_table
from magic_pdf.pre_proc.equations_replace import (
combine_chars_to_pymudict, remove_chars_in_text_blocks,
replace_equations_in_textblock)
from magic_pdf.pre_proc.ocr_detect_all_bboxes import \
ocr_prepare_bboxes_for_layout_split
from magic_pdf.pre_proc.ocr_dict_merge import (fill_spans_in_blocks,
fix_block_spans,
fix_discarded_block,
sort_blocks_by_layout)
from magic_pdf.pre_proc.ocr_span_list_modify import (
get_qa_need_list_v2, remove_overlaps_low_confidence_spans,
remove_overlaps_min_spans)
from magic_pdf.pre_proc.resolve_bbox_conflict import \
check_useful_block_horizontal_overlap
def remove_horizontal_overlap_block_which_smaller(all_bboxes):
useful_blocks = []
for bbox in all_bboxes:
useful_blocks.append({'bbox': bbox[:4]})
is_useful_block_horz_overlap, smaller_bbox, bigger_bbox = (
check_useful_block_horizontal_overlap(useful_blocks)
)
if is_useful_block_horz_overlap:
logger.warning(
f'skip this page, reason: {DropReason.USEFUL_BLOCK_HOR_OVERLAP}, smaller bbox is {smaller_bbox}, bigger bbox is {bigger_bbox}'
)
for bbox in all_bboxes.copy():
if smaller_bbox == bbox[:4]:
all_bboxes.remove(bbox)
return is_useful_block_horz_overlap, all_bboxes
def __replace_STX_ETX(text_str: str):
"""Replace \u0002 and \u0003, as these characters become garbled when extracted using pymupdf. In fact, they were originally quotation marks.
Drawback: This issue is only observed in English text; it has not been found in Chinese text so far.
Args:
text_str (str): raw text
Returns:
_type_: replaced text
"""
if text_str:
s = text_str.replace('\u0002', "'")
s = s.replace('\u0003', "'")
return s
return text_str
def txt_spans_extract(pdf_page, inline_equations, interline_equations):
text_raw_blocks = pdf_page.get_text('dict', flags=fitz.TEXTFLAGS_TEXT)['blocks']
char_level_text_blocks = pdf_page.get_text('rawdict', flags=fitz.TEXTFLAGS_TEXT)[
'blocks'
]
text_blocks = combine_chars_to_pymudict(text_raw_blocks, char_level_text_blocks)
text_blocks = replace_equations_in_textblock(
text_blocks, inline_equations, interline_equations
)
text_blocks = remove_citation_marker(text_blocks)
text_blocks = remove_chars_in_text_blocks(text_blocks)
spans = []
for v in text_blocks:
for line in v['lines']:
for span in line['spans']:
bbox = span['bbox']
if float_equal(bbox[0], bbox[2]) or float_equal(bbox[1], bbox[3]):
continue
if span.get('type') not in (
ContentType.InlineEquation,
ContentType.InterlineEquation,
):
spans.append(
{
'bbox': list(span['bbox']),
'content': __replace_STX_ETX(span['text']),
'type': ContentType.Text,
'score': 1.0,
}
)
return spans
def replace_text_span(pymu_spans, ocr_spans):
return list(filter(lambda x: x['type'] != ContentType.Text, ocr_spans)) + pymu_spans
def parse_page_core(
pdf_docs, magic_model, page_id, pdf_bytes_md5, imageWriter, parse_mode
):
need_drop = False
drop_reason = []
"""从magic_model对象中获取后面会用到的区块信息"""
img_blocks = magic_model.get_imgs(page_id)
table_blocks = magic_model.get_tables(page_id)
discarded_blocks = magic_model.get_discarded(page_id)
text_blocks = magic_model.get_text_blocks(page_id)
title_blocks = magic_model.get_title_blocks(page_id)
inline_equations, interline_equations, interline_equation_blocks = (
magic_model.get_equations(page_id)
)
page_w, page_h = magic_model.get_page_size(page_id)
spans = magic_model.get_all_spans(page_id)
"""根据parse_mode,构造spans"""
if parse_mode == 'txt':
"""ocr 中文本类的 span 用 pymu spans 替换!"""
pymu_spans = txt_spans_extract(
pdf_docs[page_id], inline_equations, interline_equations
)
spans = replace_text_span(pymu_spans, spans)
elif parse_mode == 'ocr':
pass
else:
raise Exception('parse_mode must be txt or ocr')
"""删除重叠spans中置信度较低的那些"""
spans, dropped_spans_by_confidence = remove_overlaps_low_confidence_spans(spans)
"""删除重叠spans中较小的那些"""
spans, dropped_spans_by_span_overlap = remove_overlaps_min_spans(spans)
"""对image和table截图"""
spans = ocr_cut_image_and_table(
spans, pdf_docs[page_id], page_id, pdf_bytes_md5, imageWriter
)
"""将所有区块的bbox整理到一起"""
# interline_equation_blocks参数不够准,后面切换到interline_equations上
interline_equation_blocks = []
if len(interline_equation_blocks) > 0:
all_bboxes, all_discarded_blocks, drop_reasons = (
ocr_prepare_bboxes_for_layout_split(
img_blocks,
table_blocks,
discarded_blocks,
text_blocks,
title_blocks,
interline_equation_blocks,
page_w,
page_h,
)
)
else:
all_bboxes, all_discarded_blocks, drop_reasons = (
ocr_prepare_bboxes_for_layout_split(
img_blocks,
table_blocks,
discarded_blocks,
text_blocks,
title_blocks,
interline_equations,
page_w,
page_h,
)
)
if len(drop_reasons) > 0:
need_drop = True
drop_reason.append(DropReason.OVERLAP_BLOCKS_CAN_NOT_SEPARATION)
"""先处理不需要排版的discarded_blocks"""
discarded_block_with_spans, spans = fill_spans_in_blocks(
all_discarded_blocks, spans, 0.4
)
fix_discarded_blocks = fix_discarded_block(discarded_block_with_spans)
"""如果当前页面没有bbox则跳过"""
if len(all_bboxes) == 0:
logger.warning(f'skip this page, not found useful bbox, page_id: {page_id}')
return ocr_construct_page_component_v2(
[],
[],
page_id,
page_w,
page_h,
[],
[],
[],
interline_equations,
fix_discarded_blocks,
need_drop,
drop_reason,
)
"""在切分之前,先检查一下bbox是否有左右重叠的情况,如果有,那么就认为这个pdf暂时没有能力处理好,这种左右重叠的情况大概率是由于pdf里的行间公式、表格没有被正确识别出来造成的 """
while True: # 循环检查左右重叠的情况,如果存在就删除掉较小的那个bbox,直到不存在左右重叠的情况
is_useful_block_horz_overlap, all_bboxes = (
remove_horizontal_overlap_block_which_smaller(all_bboxes)
)
if is_useful_block_horz_overlap:
need_drop = True
drop_reason.append(DropReason.USEFUL_BLOCK_HOR_OVERLAP)
else:
break
"""根据区块信息计算layout"""
page_boundry = [0, 0, page_w, page_h]
layout_bboxes, layout_tree = get_bboxes_layout(all_bboxes, page_boundry, page_id)
if len(text_blocks) > 0 and len(all_bboxes) > 0 and len(layout_bboxes) == 0:
logger.warning(
f'skip this page, page_id: {page_id}, reason: {DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}'
)
need_drop = True
drop_reason.append(DropReason.CAN_NOT_DETECT_PAGE_LAYOUT)
"""以下去掉复杂的布局和超过2列的布局"""
if any(
[lay['layout_label'] == LAYOUT_UNPROC for lay in layout_bboxes]
): # 复杂的布局
logger.warning(
f'skip this page, page_id: {page_id}, reason: {DropReason.COMPLICATED_LAYOUT}'
)
need_drop = True
drop_reason.append(DropReason.COMPLICATED_LAYOUT)
layout_column_width = get_columns_cnt_of_layout(layout_tree)
if layout_column_width > 2: # 去掉超过2列的布局pdf
logger.warning(
f'skip this page, page_id: {page_id}, reason: {DropReason.TOO_MANY_LAYOUT_COLUMNS}'
)
need_drop = True
drop_reason.append(DropReason.TOO_MANY_LAYOUT_COLUMNS)
"""根据layout顺序,对当前页面所有需要留下的block进行排序"""
sorted_blocks = sort_blocks_by_layout(all_bboxes, layout_bboxes)
"""将span填入排好序的blocks中"""
block_with_spans, spans = fill_spans_in_blocks(sorted_blocks, spans, 0.3)
"""对block进行fix操作"""
fix_blocks = fix_block_spans(block_with_spans, img_blocks, table_blocks)
"""获取QA需要外置的list"""
images, tables, interline_equations = get_qa_need_list_v2(fix_blocks)
"""构造pdf_info_dict"""
page_info = ocr_construct_page_component_v2(
fix_blocks,
layout_bboxes,
page_id,
page_w,
page_h,
layout_tree,
images,
tables,
interline_equations,
fix_discarded_blocks,
need_drop,
drop_reason,
)
return page_info
def pdf_parse_union(
pdf_bytes,
model_list,
imageWriter,
parse_mode,
start_page_id=0,
end_page_id=None,
debug_mode=False,
):
pdf_bytes_md5 = compute_md5(pdf_bytes)
pdf_docs = fitz.open('pdf', pdf_bytes)
"""初始化空的pdf_info_dict"""
pdf_info_dict = {}
"""用model_list和docs对象初始化magic_model"""
magic_model = MagicModel(model_list, pdf_docs)
"""根据输入的起始范围解析pdf"""
# end_page_id = end_page_id if end_page_id else len(pdf_docs) - 1
end_page_id = (
end_page_id
if end_page_id is not None and end_page_id >= 0
else len(pdf_docs) - 1
)
if end_page_id > len(pdf_docs) - 1:
logger.warning('end_page_id is out of range, use pdf_docs length')
end_page_id = len(pdf_docs) - 1
"""初始化启动时间"""
start_time = time.time()
for page_id, page in enumerate(pdf_docs):
"""debug时输出每页解析的耗时."""
if debug_mode:
time_now = time.time()
logger.info(
f'page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}'
)
start_time = time_now
"""解析pdf中的每一页"""
if start_page_id <= page_id <= end_page_id:
page_info = parse_page_core(
pdf_docs, magic_model, page_id, pdf_bytes_md5, imageWriter, parse_mode
)
else:
page_w = page.rect.width
page_h = page.rect.height
page_info = ocr_construct_page_component_v2(
[], [], page_id, page_w, page_h, [], [], [], [], [], True, 'skip page'
)
pdf_info_dict[f'page_{page_id}'] = page_info
"""分段"""
para_split(pdf_info_dict, debug_mode=debug_mode)
"""dict转list"""
pdf_info_list = dict_to_list(pdf_info_dict)
new_pdf_info_dict = {
'pdf_info': pdf_info_list,
}
return new_pdf_info_dict
if __name__ == '__main__':
pass
...@@ -7,7 +7,6 @@ from typing import List ...@@ -7,7 +7,6 @@ from typing import List
import torch import torch
from loguru import logger from loguru import logger
from magic_pdf.config.drop_reason import DropReason
from magic_pdf.config.enums import SupportedPdfParseMethod from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.config.ocr_content_type import BlockType, ContentType from magic_pdf.config.ocr_content_type import BlockType, ContentType
from magic_pdf.data.dataset import Dataset, PageableData from magic_pdf.data.dataset import Dataset, PageableData
...@@ -17,7 +16,7 @@ from magic_pdf.libs.commons import fitz, get_delta_time ...@@ -17,7 +16,7 @@ from magic_pdf.libs.commons import fitz, get_delta_time
from magic_pdf.libs.config_reader import get_local_layoutreader_model_dir from magic_pdf.libs.config_reader import get_local_layoutreader_model_dir
from magic_pdf.libs.convert_utils import dict_to_list from magic_pdf.libs.convert_utils import dict_to_list
from magic_pdf.libs.hash_utils import compute_md5 from magic_pdf.libs.hash_utils import compute_md5
from magic_pdf.libs.local_math import float_equal
from magic_pdf.libs.pdf_image_tools import cut_image_to_pil_image from magic_pdf.libs.pdf_image_tools import cut_image_to_pil_image
from magic_pdf.model.magic_model import MagicModel from magic_pdf.model.magic_model import MagicModel
......
def construct_page_component(page_id, image_info, table_info, text_blocks_preproc, layout_bboxes, inline_eq_info,
interline_eq_info, raw_pymu_blocks,
removed_text_blocks, removed_image_blocks, images_backup, droped_table_block, table_backup,
layout_tree,
page_w, page_h, footnote_bboxes_tmp):
"""
"""
return_dict = {}
return_dict['para_blocks'] = {}
return_dict['preproc_blocks'] = text_blocks_preproc
return_dict['images'] = image_info
return_dict['tables'] = table_info
return_dict['interline_equations'] = interline_eq_info
return_dict['inline_equations'] = inline_eq_info
return_dict['layout_bboxes'] = layout_bboxes
return_dict['pymu_raw_blocks'] = raw_pymu_blocks
return_dict['global_statistic'] = {}
return_dict['droped_text_block'] = removed_text_blocks
return_dict['droped_image_block'] = removed_image_blocks
return_dict['droped_table_block'] = []
return_dict['image_backup'] = images_backup
return_dict['table_backup'] = []
return_dict['page_idx'] = page_id
return_dict['page_size'] = [page_w, page_h]
return_dict['_layout_tree'] = layout_tree # 辅助分析layout作用
return_dict['footnote_bboxes_tmp'] = footnote_bboxes_tmp
return return_dict
def ocr_construct_page_component(blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
images, tables, interline_equations, inline_equations,
dropped_text_block, dropped_image_block, dropped_table_block, dropped_equation_block,
need_remove_spans_bboxes_dict):
return_dict = {
'preproc_blocks': blocks,
'layout_bboxes': layout_bboxes,
'page_idx': page_id,
'page_size': [page_w, page_h],
'_layout_tree': layout_tree,
'images': images,
'tables': tables,
'interline_equations': interline_equations,
'inline_equations': inline_equations,
'droped_text_block': dropped_text_block,
'droped_image_block': dropped_image_block,
'droped_table_block': dropped_table_block,
'dropped_equation_block': dropped_equation_block,
'droped_bboxes': need_remove_spans_bboxes_dict,
}
return return_dict
def ocr_construct_page_component_v2(blocks, layout_bboxes, page_id, page_w, page_h, layout_tree, def ocr_construct_page_component_v2(blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
images, tables, interline_equations, discarded_blocks, need_drop, drop_reason): images, tables, interline_equations, discarded_blocks, need_drop, drop_reason):
......
...@@ -25,43 +25,6 @@ def ocr_cut_image_and_table(spans, page, page_id, pdf_bytes_md5, imageWriter): ...@@ -25,43 +25,6 @@ def ocr_cut_image_and_table(spans, page, page_id, pdf_bytes_md5, imageWriter):
return spans return spans
def txt_save_images_by_bboxes(page_num: int, page, pdf_bytes_md5: str,
image_bboxes: list, images_overlap_backup: list, table_bboxes: list,
equation_inline_bboxes: list,
equation_interline_bboxes: list, imageWriter) -> dict:
"""返回一个dict, key为bbox, 值是图片地址."""
image_info = []
image_backup_info = []
table_info = []
inline_eq_info = []
interline_eq_info = []
# 图片的保存路径组成是这样的: {s3_or_local_path}/{book_name}/{images|tables|equations}/{page_num}_{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}.jpg
def return_path(type):
return join_path(pdf_bytes_md5, type)
for bbox in image_bboxes:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path('images'), imageWriter)
image_info.append({'bbox': bbox, 'image_path': image_path})
for bbox in images_overlap_backup:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path('images'), imageWriter)
image_backup_info.append({'bbox': bbox, 'image_path': image_path})
for bbox in table_bboxes:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path('tables'), imageWriter)
table_info.append({'bbox': bbox, 'image_path': image_path})
return image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info
def check_img_bbox(bbox) -> bool: def check_img_bbox(bbox) -> bool:
if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]): if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
logger.warning(f'image_bboxes: 错误的box, {bbox}') logger.warning(f'image_bboxes: 错误的box, {bbox}')
......
...@@ -8,179 +8,6 @@ from magic_pdf.pre_proc.remove_bbox_overlap import \ ...@@ -8,179 +8,6 @@ from magic_pdf.pre_proc.remove_bbox_overlap import \
remove_overlap_between_bbox_for_block remove_overlap_between_bbox_for_block
def ocr_prepare_bboxes_for_layout_split(
img_blocks,
table_blocks,
discarded_blocks,
text_blocks,
title_blocks,
interline_equation_blocks,
page_w,
page_h,
):
all_bboxes = []
all_discarded_blocks = []
for image in img_blocks:
x0, y0, x1, y1 = image['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Image,
None,
None,
None,
None,
image['score'],
]
)
for table in table_blocks:
x0, y0, x1, y1 = table['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Table,
None,
None,
None,
None,
table['score'],
]
)
for text in text_blocks:
x0, y0, x1, y1 = text['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Text,
None,
None,
None,
None,
text['score'],
]
)
for title in title_blocks:
x0, y0, x1, y1 = title['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Title,
None,
None,
None,
None,
title['score'],
]
)
for interline_equation in interline_equation_blocks:
x0, y0, x1, y1 = interline_equation['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.InterlineEquation,
None,
None,
None,
None,
interline_equation['score'],
]
)
"""block嵌套问题解决"""
"""文本框与标题框重叠,优先信任文本框"""
all_bboxes = fix_text_overlap_title_blocks(all_bboxes)
"""任何框体与舍弃框重叠,优先信任舍弃框"""
all_bboxes = remove_need_drop_blocks(all_bboxes, discarded_blocks)
# interline_equation 与title或text框冲突的情况,分两种情况处理
"""interline_equation框与文本类型框iou比较接近1的时候,信任行间公式框"""
all_bboxes = fix_interline_equation_overlap_text_blocks_with_hi_iou(all_bboxes)
"""interline_equation框被包含在文本类型框内,且interline_equation比文本区块小很多时信任文本框,这时需要舍弃公式框"""
# 通过后续大框套小框逻辑删除
"""discarded_blocks中只保留宽度超过1/3页面宽度的,高度超过10的,处于页面下半50%区域的(限定footnote)"""
for discarded in discarded_blocks:
x0, y0, x1, y1 = discarded['bbox']
all_discarded_blocks.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Discarded,
None,
None,
None,
None,
discarded['score'],
]
)
# 将footnote加入到all_bboxes中,用来计算layout
if (x1 - x0) > (page_w / 3) and (y1 - y0) > 10 and y0 > (page_h / 2):
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Footnote,
None,
None,
None,
None,
discarded['score'],
]
)
"""经过以上处理后,还存在大框套小框的情况,则删除小框"""
all_bboxes = remove_overlaps_min_blocks(all_bboxes)
all_discarded_blocks = remove_overlaps_min_blocks(all_discarded_blocks)
"""将剩余的bbox做分离处理,防止后面分layout时出错"""
all_bboxes, drop_reasons = remove_overlap_between_bbox_for_block(all_bboxes)
return all_bboxes, all_discarded_blocks, drop_reasons
def add_bboxes(blocks, block_type, bboxes): def add_bboxes(blocks, block_type, bboxes):
for block in blocks: for block in blocks:
x0, y0, x1, y1 = block['bbox'] x0, y0, x1, y1 = block['bbox']
......
from magic_pdf.config.drop_tag import DropTag
from magic_pdf.config.ocr_content_type import BlockType, ContentType from magic_pdf.config.ocr_content_type import BlockType, ContentType
from magic_pdf.libs.boxbase import (__is_overlaps_y_exceeds_threshold, from magic_pdf.libs.boxbase import __is_overlaps_y_exceeds_threshold, calculate_overlap_area_in_bbox1_area_ratio
_is_in_or_part_overlap_with_area_ratio,
calculate_overlap_area_in_bbox1_area_ratio)
# 将每一个line中的span从左到右排序 # 将每一个line中的span从左到右排序
...@@ -63,86 +61,6 @@ def merge_spans_to_line(spans, threshold=0.6): ...@@ -63,86 +61,6 @@ def merge_spans_to_line(spans, threshold=0.6):
return lines return lines
def merge_spans_to_line_by_layout(spans, layout_bboxes):
lines = []
new_spans = []
dropped_spans = []
for item in layout_bboxes:
layout_bbox = item['layout_bbox']
# 遍历spans,将每个span放入对应的layout中
layout_sapns = []
for span in spans:
if calculate_overlap_area_in_bbox1_area_ratio(
span['bbox'], layout_bbox) > 0.6:
layout_sapns.append(span)
# 如果layout_sapns不为空,则放入new_spans中
if len(layout_sapns) > 0:
new_spans.append(layout_sapns)
# 从spans删除已经放入layout_sapns中的span
for layout_sapn in layout_sapns:
spans.remove(layout_sapn)
if len(new_spans) > 0:
for layout_sapns in new_spans:
layout_lines = merge_spans_to_line(layout_sapns)
lines.extend(layout_lines)
# 对line中的span进行排序
lines = line_sort_spans_by_left_to_right(lines)
for span in spans:
span['tag'] = DropTag.NOT_IN_LAYOUT
dropped_spans.append(span)
return lines, dropped_spans
def merge_lines_to_block(lines):
# 目前不做block拼接,先做个结构,每个block中只有一个line,block的bbox就是line的bbox
blocks = []
for line in lines:
blocks.append({
'bbox': line['bbox'],
'lines': [line],
})
return blocks
def sort_blocks_by_layout(all_bboxes, layout_bboxes):
new_blocks = []
sort_blocks = []
for item in layout_bboxes:
layout_bbox = item['layout_bbox']
# 遍历blocks,将每个blocks放入对应的layout中
layout_blocks = []
for block in all_bboxes:
# 如果是footnote则跳过
if block[7] == BlockType.Footnote:
continue
block_bbox = block[:4]
if calculate_overlap_area_in_bbox1_area_ratio(
block_bbox, layout_bbox) > 0.8:
layout_blocks.append(block)
# 如果layout_blocks不为空,则放入new_blocks中
if len(layout_blocks) > 0:
new_blocks.append(layout_blocks)
# 从all_bboxes删除已经放入layout_blocks中的block
for layout_block in layout_blocks:
all_bboxes.remove(layout_block)
# 如果new_blocks不为空,则对new_blocks中每个block进行排序
if len(new_blocks) > 0:
for bboxes_in_layout_block in new_blocks:
bboxes_in_layout_block.sort(
key=lambda x: x[1]) # 一个layout内部的box,按照y0自上而下排序
sort_blocks.extend(bboxes_in_layout_block)
# sort_blocks中已经包含了当前页面所有最终留下的block,且已经排好了顺序
return sort_blocks
def fill_spans_in_blocks(blocks, spans, radio): def fill_spans_in_blocks(blocks, spans, radio):
"""将allspans中的span按位置关系,放入blocks中.""" """将allspans中的span按位置关系,放入blocks中."""
block_with_spans = [] block_with_spans = []
...@@ -184,28 +102,6 @@ def fill_spans_in_blocks(blocks, spans, radio): ...@@ -184,28 +102,6 @@ def fill_spans_in_blocks(blocks, spans, radio):
return block_with_spans, spans return block_with_spans, spans
def fix_block_spans(block_with_spans, img_blocks, table_blocks):
"""1、img_block和table_block因为包含caption和footnote的关系,存在block的嵌套关系
需要将caption和footnote的text_span放入相应img_block和table_block内的
caption_block和footnote_block中 2、同时需要删除block中的spans字段."""
fix_blocks = []
for block in block_with_spans:
block_type = block['type']
if block_type == BlockType.Image:
block = fix_image_block(block, img_blocks)
elif block_type == BlockType.Table:
block = fix_table_block(block, table_blocks)
elif block_type in [BlockType.Text, BlockType.Title]:
block = fix_text_block(block)
elif block_type == BlockType.InterlineEquation:
block = fix_interline_block(block)
else:
continue
fix_blocks.append(block)
return fix_blocks
def fix_block_spans_v2(block_with_spans): def fix_block_spans_v2(block_with_spans):
"""1、img_block和table_block因为包含caption和footnote的关系,存在block的嵌套关系 """1、img_block和table_block因为包含caption和footnote的关系,存在block的嵌套关系
需要将caption和footnote的text_span放入相应img_block和table_block内的 需要将caption和footnote的text_span放入相应img_block和table_block内的
...@@ -235,113 +131,6 @@ def fix_discarded_block(discarded_block_with_spans): ...@@ -235,113 +131,6 @@ def fix_discarded_block(discarded_block_with_spans):
return fix_discarded_blocks return fix_discarded_blocks
def merge_spans_to_block(spans: list, block_bbox: list, block_type: str):
block_spans = []
# 如果有img_caption,则将img_block中的text_spans放入img_caption_block中
for span in spans:
if calculate_overlap_area_in_bbox1_area_ratio(span['bbox'],
block_bbox) > 0.6:
block_spans.append(span)
block_lines = merge_spans_to_line(block_spans)
# 对line中的span进行排序
sort_block_lines = line_sort_spans_by_left_to_right(block_lines)
block = {'bbox': block_bbox, 'type': block_type, 'lines': sort_block_lines}
return block, block_spans
def make_body_block(span: dict, block_bbox: list, block_type: str):
# 创建body_block
body_line = {
'bbox': block_bbox,
'spans': [span],
}
body_block = {'bbox': block_bbox, 'type': block_type, 'lines': [body_line]}
return body_block
def fix_image_block(block, img_blocks):
block['blocks'] = []
# 遍历img_blocks,找到与当前block匹配的img_block
for img_block in img_blocks:
if _is_in_or_part_overlap_with_area_ratio(block['bbox'],
img_block['bbox'], 0.95):
# 创建img_body_block
for span in block['spans']:
if span['type'] == ContentType.Image and img_block[
'img_body_bbox'] == span['bbox']:
# 创建img_body_block
img_body_block = make_body_block(
span, img_block['img_body_bbox'], BlockType.ImageBody)
block['blocks'].append(img_body_block)
# 从spans中移除img_body_block中已经放入的span
block['spans'].remove(span)
break
# 根据list长度,判断img_block中是否有img_caption
if img_block['img_caption_bbox'] is not None:
img_caption_block, img_caption_spans = merge_spans_to_block(
block['spans'], img_block['img_caption_bbox'],
BlockType.ImageCaption)
block['blocks'].append(img_caption_block)
if img_block['img_footnote_bbox'] is not None:
img_footnote_block, img_footnote_spans = merge_spans_to_block(
block['spans'], img_block['img_footnote_bbox'],
BlockType.ImageFootnote)
block['blocks'].append(img_footnote_block)
break
del block['spans']
return block
def fix_table_block(block, table_blocks):
block['blocks'] = []
# 遍历table_blocks,找到与当前block匹配的table_block
for table_block in table_blocks:
if _is_in_or_part_overlap_with_area_ratio(block['bbox'],
table_block['bbox'], 0.95):
# 创建table_body_block
for span in block['spans']:
if span['type'] == ContentType.Table and table_block[
'table_body_bbox'] == span['bbox']:
# 创建table_body_block
table_body_block = make_body_block(
span, table_block['table_body_bbox'],
BlockType.TableBody)
block['blocks'].append(table_body_block)
# 从spans中移除img_body_block中已经放入的span
block['spans'].remove(span)
break
# 根据list长度,判断table_block中是否有caption
if table_block['table_caption_bbox'] is not None:
table_caption_block, table_caption_spans = merge_spans_to_block(
block['spans'], table_block['table_caption_bbox'],
BlockType.TableCaption)
block['blocks'].append(table_caption_block)
# 如果table_caption_block_spans不为空
if len(table_caption_spans) > 0:
# 一些span已经放入了caption_block中,需要从block['spans']中删除
for span in table_caption_spans:
block['spans'].remove(span)
# 根据list长度,判断table_block中是否有table_note
if table_block['table_footnote_bbox'] is not None:
table_footnote_block, table_footnote_spans = merge_spans_to_block(
block['spans'], table_block['table_footnote_bbox'],
BlockType.TableFootnote)
block['blocks'].append(table_footnote_block)
break
del block['spans']
return block
def fix_text_block(block): def fix_text_block(block):
# 文本block中的公式span都应该转换成行内type # 文本block中的公式span都应该转换成行内type
for span in block['spans']: for span in block['spans']:
......
from magic_pdf.config.drop_tag import DropTag from magic_pdf.config.drop_tag import DropTag
from magic_pdf.config.ocr_content_type import BlockType, ContentType from magic_pdf.config.ocr_content_type import BlockType
from magic_pdf.libs.boxbase import (__is_overlaps_y_exceeds_threshold, from magic_pdf.libs.boxbase import calculate_iou, get_minbox_if_overlap_by_ratio
calculate_iou,
calculate_overlap_area_in_bbox1_area_ratio,
get_minbox_if_overlap_by_ratio)
def remove_overlaps_low_confidence_spans(spans): def remove_overlaps_low_confidence_spans(spans):
...@@ -59,253 +56,6 @@ def remove_overlaps_min_spans(spans): ...@@ -59,253 +56,6 @@ def remove_overlaps_min_spans(spans):
return spans, dropped_spans return spans, dropped_spans
def remove_spans_by_bboxes(spans, need_remove_spans_bboxes):
# 遍历spans, 判断是否在removed_span_block_bboxes中
# 如果是, 则删除该span 否则, 保留该span
need_remove_spans = []
for span in spans:
for removed_bbox in need_remove_spans_bboxes:
if (
calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], removed_bbox)
> 0.5
):
if span not in need_remove_spans:
need_remove_spans.append(span)
break
if len(need_remove_spans) > 0:
for span in need_remove_spans:
spans.remove(span)
return spans
def remove_spans_by_bboxes_dict(spans, need_remove_spans_bboxes_dict):
dropped_spans = []
for drop_tag, removed_bboxes in need_remove_spans_bboxes_dict.items():
# logger.info(f"remove spans by bbox dict, drop_tag: {drop_tag}, removed_bboxes: {removed_bboxes}")
need_remove_spans = []
for span in spans:
# 通过判断span的bbox是否在removed_bboxes中, 判断是否需要删除该span
for removed_bbox in removed_bboxes:
if (
calculate_overlap_area_in_bbox1_area_ratio(
span['bbox'], removed_bbox
)
> 0.5
):
need_remove_spans.append(span)
break
# 当drop_tag为DropTag.FOOTNOTE时, 判断span是否在removed_bboxes中任意一个的下方,如果是,则删除该span
elif (
drop_tag == DropTag.FOOTNOTE
and (span['bbox'][1] + span['bbox'][3]) / 2 > removed_bbox[3]
and removed_bbox[0]
< (span['bbox'][0] + span['bbox'][2]) / 2
< removed_bbox[2]
):
need_remove_spans.append(span)
break
for span in need_remove_spans:
spans.remove(span)
span['tag'] = drop_tag
dropped_spans.append(span)
return spans, dropped_spans
def adjust_bbox_for_standalone_block(spans):
# 对tpye=["interline_equation", "image", "table"]进行额外处理,如果左边有字的话,将该span的bbox中y0调整至不高于文字的y0
for sb_span in spans:
if sb_span['type'] in [
ContentType.InterlineEquation,
ContentType.Image,
ContentType.Table,
]:
for text_span in spans:
if text_span['type'] in [ContentType.Text, ContentType.InlineEquation]:
# 判断span2的纵向高度是否被span所覆盖
if (
sb_span['bbox'][1] < text_span['bbox'][1]
and sb_span['bbox'][3] > text_span['bbox'][3]
):
# 判断span2是否在span左边
if text_span['bbox'][0] < sb_span['bbox'][0]:
# 调整span的y0和span2的y0一致
sb_span['bbox'][1] = text_span['bbox'][1]
return spans
def modify_y_axis(spans: list, displayed_list: list, text_inline_lines: list):
# displayed_list = []
# 如果spans为空,则不处理
if len(spans) == 0:
pass
else:
spans.sort(key=lambda span: span['bbox'][1])
lines = []
current_line = [spans[0]]
if spans[0]['type'] in [
ContentType.InterlineEquation,
ContentType.Image,
ContentType.Table,
]:
displayed_list.append(spans[0])
line_first_y0 = spans[0]['bbox'][1]
line_first_y = spans[0]['bbox'][3]
# 用于给行间公式搜索
# text_inline_lines = []
for span in spans[1:]:
# if span.get("content","") == "78.":
# print("debug")
# 如果当前的span类型为"interline_equation" 或者 当前行中已经有"interline_equation"
# image和table类型,同上
if span['type'] in [
ContentType.InterlineEquation,
ContentType.Image,
ContentType.Table,
] or any(
s['type']
in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table]
for s in current_line
):
# 传入
if span['type'] in [
ContentType.InterlineEquation,
ContentType.Image,
ContentType.Table,
]:
displayed_list.append(span)
# 则开始新行
lines.append(current_line)
if len(current_line) > 1 or current_line[0]['type'] in [
ContentType.Text,
ContentType.InlineEquation,
]:
text_inline_lines.append(
(current_line, (line_first_y0, line_first_y))
)
current_line = [span]
line_first_y0 = span['bbox'][1]
line_first_y = span['bbox'][3]
continue
# 如果当前的span与当前行的最后一个span在y轴上重叠,则添加到当前行
if __is_overlaps_y_exceeds_threshold(
span['bbox'], current_line[-1]['bbox']
):
if span['type'] == 'text':
line_first_y0 = span['bbox'][1]
line_first_y = span['bbox'][3]
current_line.append(span)
else:
# 否则,开始新行
lines.append(current_line)
text_inline_lines.append((current_line, (line_first_y0, line_first_y)))
current_line = [span]
line_first_y0 = span['bbox'][1]
line_first_y = span['bbox'][3]
# 添加最后一行
if current_line:
lines.append(current_line)
if len(current_line) > 1 or current_line[0]['type'] in [
ContentType.Text,
ContentType.InlineEquation,
]:
text_inline_lines.append((current_line, (line_first_y0, line_first_y)))
for line in text_inline_lines:
# 按照x0坐标排序
current_line = line[0]
current_line.sort(key=lambda span: span['bbox'][0])
# 调整每一个文字行内bbox统一
for line in text_inline_lines:
current_line, (line_first_y0, line_first_y) = line
for span in current_line:
span['bbox'][1] = line_first_y0
span['bbox'][3] = line_first_y
# return spans, displayed_list, text_inline_lines
def modify_inline_equation(spans: list, displayed_list: list, text_inline_lines: list):
# 错误行间公式转行内公式
j = 0
for i in range(len(displayed_list)):
# if i == 8:
# print("debug")
span = displayed_list[i]
span_y0, span_y = span['bbox'][1], span['bbox'][3]
while j < len(text_inline_lines):
text_line = text_inline_lines[j]
y0, y1 = text_line[1]
if (
span_y0 < y0 < span_y
or span_y0 < y1 < span_y
or span_y0 < y0
and span_y > y1
) and __is_overlaps_y_exceeds_threshold(span['bbox'], (0, y0, 0, y1)):
# 调整公式类型
if span['type'] == ContentType.InterlineEquation:
# 最后一行是行间公式
if j + 1 >= len(text_inline_lines):
span['type'] = ContentType.InlineEquation
span['bbox'][1] = y0
span['bbox'][3] = y1
else:
# 行间公式旁边有多行文字或者行间公式比文字高3倍则不转换
y0_next, y1_next = text_inline_lines[j + 1][1]
if (
not __is_overlaps_y_exceeds_threshold(
span['bbox'], (0, y0_next, 0, y1_next)
)
and 3 * (y1 - y0) > span_y - span_y0
):
span['type'] = ContentType.InlineEquation
span['bbox'][1] = y0
span['bbox'][3] = y1
break
elif (
span_y < y0
or span_y0 < y0 < span_y
and not __is_overlaps_y_exceeds_threshold(span['bbox'], (0, y0, 0, y1))
):
break
else:
j += 1
return spans
def get_qa_need_list(blocks):
# 创建 images, tables, interline_equations, inline_equations 的副本
images = []
tables = []
interline_equations = []
inline_equations = []
for block in blocks:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Image:
images.append(span)
elif span['type'] == ContentType.Table:
tables.append(span)
elif span['type'] == ContentType.InlineEquation:
inline_equations.append(span)
elif span['type'] == ContentType.InterlineEquation:
interline_equations.append(span)
else:
continue
return images, tables, interline_equations, inline_equations
def get_qa_need_list_v2(blocks): def get_qa_need_list_v2(blocks):
# 创建 images, tables, interline_equations, inline_equations 的副本 # 创建 images, tables, interline_equations, inline_equations 的副本
images = [] images = []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment