Unverified Commit 8afff9ae authored by Xiaomeng Zhao's avatar Xiaomeng Zhao Committed by GitHub
Browse files

Merge pull request #1120 from opendatalab/release-0.10.2

Release 0.10.2
parents 4df1eb74 7fdbb6e5
from loguru import logger
from magic_pdf.config.drop_reason import DropReason
from magic_pdf.layout.layout_sort import get_columns_cnt_of_layout
def __is_pseudo_single_column(page_info) -> bool:
"""判断一个页面是否伪单列。
Args:
page_info (dict): 页面信息字典,包括'_layout_tree'和'preproc_blocks'。
Returns:
Tuple[bool, Optional[str]]: 如果页面伪单列返回(True, extra_info),否则返回(False, None)。
"""
layout_tree = page_info['_layout_tree']
layout_column_width = get_columns_cnt_of_layout(layout_tree)
if layout_column_width == 1:
text_blocks = page_info['preproc_blocks']
# 遍历每一个text_block
for text_block in text_blocks:
lines = text_block['lines']
num_lines = len(lines)
num_satisfying_lines = 0
for i in range(num_lines - 1):
current_line = lines[i]
next_line = lines[i + 1]
# 获取当前line和下一个line的bbox属性
current_bbox = current_line['bbox']
next_bbox = next_line['bbox']
# 检查是否满足条件
if next_bbox[0] > current_bbox[2] or next_bbox[2] < current_bbox[0]:
num_satisfying_lines += 1
# 如果有一半以上的line满足条件,就drop
# print("num_satisfying_lines:", num_satisfying_lines, "num_lines:", num_lines)
if num_lines > 20:
radio = num_satisfying_lines / num_lines
if radio >= 0.5:
extra_info = f'{{num_lines: {num_lines}, num_satisfying_lines: {num_satisfying_lines}}}'
block_text = []
for line in lines:
if line['spans']:
for span in line['spans']:
block_text.append(span['text'])
logger.warning(f'pseudo_single_column block_text: {block_text}')
return True, extra_info
return False, None
def pdf_post_filter(page_info) -> tuple:
"""return:(True|False, err_msg) True, 如果pdf符合要求 False, 如果pdf不符合要求."""
bool_is_pseudo_single_column, extra_info = __is_pseudo_single_column(page_info)
if bool_is_pseudo_single_column:
return False, {'_need_drop': True, '_drop_reason': DropReason.PSEUDO_SINGLE_COLUMN, 'extra_info': extra_info}
return True, None
from magic_pdf.libs.boxbase import _is_in, _is_in_or_part_overlap
import collections # 统计库
def is_below(bbox1, bbox2):
# 如果block1的上边y坐标大于block2的下边y坐标,那么block1在block2下面
return bbox1[1] > bbox2[3]
def merge_bboxes(bboxes):
# 找出所有blocks的最小x0,最大y1,最大x1,最小y0,这就是合并后的bbox
x0 = min(bbox[0] for bbox in bboxes)
y0 = min(bbox[1] for bbox in bboxes)
x1 = max(bbox[2] for bbox in bboxes)
y1 = max(bbox[3] for bbox in bboxes)
return [x0, y0, x1, y1]
def merge_footnote_blocks(page_info, main_text_font):
page_info['merged_bboxes'] = []
for layout in page_info['layout_bboxes']:
# 找出layout中的所有footnote blocks和preproc_blocks
footnote_bboxes = [block for block in page_info['footnote_bboxes_tmp'] if _is_in(block, layout['layout_bbox'])]
# 如果没有footnote_blocks,就跳过这个layout
if not footnote_bboxes:
continue
preproc_blocks = [block for block in page_info['preproc_blocks'] if _is_in(block['bbox'], layout['layout_bbox'])]
# preproc_bboxes = [block['bbox'] for block in preproc_blocks]
font_names = collections.Counter()
if len(preproc_blocks) > 0:
# 存储每一行的文本块大小的列表
line_sizes = []
# 存储每个文本块的平均行大小
block_sizes = []
for block in preproc_blocks:
block_line_sizes = []
block_fonts = collections.Counter()
for line in block['lines']:
# 提取每个span的size属性,并计算行大小
span_sizes = [span['size'] for span in line['spans'] if 'size' in span]
if span_sizes:
line_size = sum(span_sizes) / len(span_sizes)
line_sizes.append(line_size)
block_line_sizes.append(line_size)
span_font = [(span['font'], len(span['text'])) for span in line['spans'] if
'font' in span and len(span['text']) > 0]
if span_font:
# # todo main_text_font应该用基于字数最多的字体而不是span级别的统计
# font_names.append(font_name for font_name in span_font)
# block_fonts.append(font_name for font_name in span_font)
for font, count in span_font:
# font_names.extend([font] * count)
# block_fonts.extend([font] * count)
font_names[font] += count
block_fonts[font] += count
if block_line_sizes:
# 计算文本块的平均行大小
block_size = sum(block_line_sizes) / len(block_line_sizes)
block_font = block_fonts.most_common(1)[0][0]
block_sizes.append((block, block_size, block_font))
# 计算main_text_size
# main_text_font = font_names.most_common(1)[0][0]
main_text_size = collections.Counter(line_sizes).most_common(1)[0][0]
else:
continue
need_merge_bboxes = []
# 任何一个下面有正文block的footnote bbox都是假footnote
for footnote_bbox in footnote_bboxes:
# 检测footnote下面是否有正文block(正文block需满足,block平均size大于等于main_text_size,且block行数大于等于5)
main_text_bboxes_below = [block['bbox'] for block, size, block_font in block_sizes if
is_below(block['bbox'], footnote_bbox) and
sum([size >= main_text_size,
len(block['lines']) >= 5,
block_font == main_text_font])
>= 2]
# 如果main_text_bboxes_below不为空,说明footnote下面有正文block,这个footnote不成立,跳过
if len(main_text_bboxes_below) > 0:
continue
else:
# 否则,说明footnote下面没有正文block,这个footnote成立,添加到待merge的footnote_bboxes中
need_merge_bboxes.append(footnote_bbox)
if len(need_merge_bboxes) == 0:
continue
# 找出最靠上的footnote block
top_footnote_bbox = min(need_merge_bboxes, key=lambda bbox: bbox[1])
# 找出所有在top_footnote_block下面的preproc_blocks,并确保这些preproc_blocks的平均行大小小于main_text_size
bboxes_below = [block['bbox'] for block, size, block_font in block_sizes if is_below(block['bbox'], top_footnote_bbox)]
# # 找出所有在top_footnote_block下面的preproc_blocks
# bboxes_below = [bbox for bbox in preproc_bboxes if is_below(bbox, top_footnote_bbox)]
# 合并top_footnote_block和blocks_below
merged_bbox = merge_bboxes([top_footnote_bbox] + bboxes_below)
# 添加到新的footnote_bboxes_tmp中
page_info['merged_bboxes'].append(merged_bbox)
return page_info
def remove_footnote_blocks(page_info):
if page_info.get('merged_bboxes'):
# 从文字中去掉footnote
remain_text_blocks, removed_footnote_text_blocks = remove_footnote_text(page_info['preproc_blocks'], page_info['merged_bboxes'])
# 从图片中去掉footnote
image_blocks, removed_footnote_imgs_blocks = remove_footnote_image(page_info['images'], page_info['merged_bboxes'])
# 更新page_info
page_info['preproc_blocks'] = remain_text_blocks
page_info['images'] = image_blocks
page_info['droped_text_block'].extend(removed_footnote_text_blocks)
page_info['droped_image_block'].extend(removed_footnote_imgs_blocks)
# 删除footnote_bboxes_tmp和merged_bboxes
del page_info['merged_bboxes']
del page_info['footnote_bboxes_tmp']
return page_info
def remove_footnote_text(raw_text_block, footnote_bboxes):
"""
:param raw_text_block: str类型,是当前页的文本内容
:param footnoteBboxes: list类型,是当前页的脚注bbox
"""
footnote_text_blocks = []
for block in raw_text_block:
text_bbox = block['bbox']
# TODO 更严谨点在line级别做
if any([_is_in_or_part_overlap(text_bbox, footnote_bbox) for footnote_bbox in footnote_bboxes]):
# if any([text_bbox[3]>=footnote_bbox[1] for footnote_bbox in footnote_bboxes]):
block['tag'] = 'footnote'
footnote_text_blocks.append(block)
# raw_text_block.remove(block)
# 移除,不能再内部移除,否则会出错
for block in footnote_text_blocks:
raw_text_block.remove(block)
return raw_text_block, footnote_text_blocks
def remove_footnote_image(image_blocks, footnote_bboxes):
"""
:param image_bboxes: list类型,是当前页的图片bbox(结构体)
:param footnoteBboxes: list类型,是当前页的脚注bbox
"""
footnote_imgs_blocks = []
for image_block in image_blocks:
if any([_is_in(image_block['bbox'], footnote_bbox) for footnote_bbox in footnote_bboxes]):
footnote_imgs_blocks.append(image_block)
for footnote_imgs_block in footnote_imgs_blocks:
image_blocks.remove(footnote_imgs_block)
return image_blocks, footnote_imgs_blocks
\ No newline at end of file
"""
去掉正文的引文引用marker
https://aicarrier.feishu.cn/wiki/YLOPwo1PGiwFRdkwmyhcZmr0n3d
"""
import re
# from magic_pdf.libs.nlp_utils import NLPModels
# __NLP_MODEL = NLPModels()
def check_1(spans, cur_span_i):
"""寻找前一个char,如果是句号,逗号,那么就是角标"""
if cur_span_i==0:
return False # 不是角标
pre_span = spans[cur_span_i-1]
pre_char = pre_span['chars'][-1]['c']
if pre_char in ['。', ',', '.', ',']:
return True
return False
# def check_2(spans, cur_span_i):
# """检查前面一个span的最后一个单词,如果长度大于5,全都是字母,并且不含大写,就是角标"""
# pattern = r'\b[A-Z]\.\s[A-Z][a-z]*\b' # 形如A. Bcde, L. Bcde, 人名的缩写
#
# if cur_span_i==0 and len(spans)>1:
# next_span = spans[cur_span_i+1]
# next_txt = "".join([c['c'] for c in next_span['chars']])
# result = __NLP_MODEL.detect_entity_catgr_using_nlp(next_txt)
# if result in ["PERSON", "GPE", "ORG"]:
# return True
#
# if re.findall(pattern, next_txt):
# return True
#
# return False # 不是角标
# elif cur_span_i==0 and len(spans)==1: # 角标占用了整行?谨慎删除
# return False
#
# # 如果这个span是最后一个span,
# if cur_span_i==len(spans)-1:
# pre_span = spans[cur_span_i-1]
# pre_txt = "".join([c['c'] for c in pre_span['chars']])
# pre_word = pre_txt.split(' ')[-1]
# result = __NLP_MODEL.detect_entity_catgr_using_nlp(pre_txt)
# if result in ["PERSON", "GPE", "ORG"]:
# return True
#
# if re.findall(pattern, pre_txt):
# return True
#
# return len(pre_word) > 5 and pre_word.isalpha() and pre_word.islower()
# else: # 既不是第一个span,也不是最后一个span,那么此时检查一下这个角标距离前后哪个单词更近就属于谁的角标
# pre_span = spans[cur_span_i-1]
# next_span = spans[cur_span_i+1]
# cur_span = spans[cur_span_i]
# # 找到前一个和后一个span里的距离最近的单词
# pre_distance = 10000 # 一个很大的数
# next_distance = 10000 # 一个很大的数
# for c in pre_span['chars'][::-1]:
# if c['c'].isalpha():
# pre_distance = cur_span['bbox'][0] - c['bbox'][2]
# break
# for c in next_span['chars']:
# if c['c'].isalpha():
# next_distance = c['bbox'][0] - cur_span['bbox'][2]
# break
#
# if pre_distance<next_distance:
# belong_to_span = pre_span
# else:
# belong_to_span = next_span
#
# txt = "".join([c['c'] for c in belong_to_span['chars']])
# pre_word = txt.split(' ')[-1]
# result = __NLP_MODEL.detect_entity_catgr_using_nlp(txt)
# if result in ["PERSON", "GPE", "ORG"]:
# return True
#
# if re.findall(pattern, txt):
# return True
#
# return len(pre_word) > 5 and pre_word.isalpha() and pre_word.islower()
def check_3(spans, cur_span_i):
"""上标里有[], 有*, 有-, 有逗号"""
# 如[2-3],[22]
# 如 2,3,4
cur_span_txt = ''.join(c['c'] for c in spans[cur_span_i]['chars']).strip()
bad_char = ['[', ']', '*', ',']
if any([c in cur_span_txt for c in bad_char]) and any(character.isdigit() for character in cur_span_txt):
return True
# 如2-3, a-b
patterns = [r'\d+-\d+', r'[a-zA-Z]-[a-zA-Z]', r'[a-zA-Z],[a-zA-Z]']
for pattern in patterns:
match = re.match(pattern, cur_span_txt)
if match is not None:
return True
return False
def remove_citation_marker(with_char_text_blcoks):
for blk in with_char_text_blcoks:
for line in blk['lines']:
# 如果span里的个数少于2个,那只能忽略,角标不可能自己独占一行
if len(line['spans'])<=1:
continue
# 找到高度最高的span作为位置比较的基准
max_hi_span = line['spans'][0]['bbox']
min_font_sz = 10000 # line里最小的字体
max_font_sz = 0 # line里最大的字体
for s in line['spans']:
if max_hi_span[3]-max_hi_span[1]<s['bbox'][3]-s['bbox'][1]:
max_hi_span = s['bbox']
if min_font_sz>s['size']:
min_font_sz = s['size']
if max_font_sz<s['size']:
max_font_sz = s['size']
base_span_mid_y = (max_hi_span[3]+max_hi_span[1])/2
span_to_del = []
for i, span in enumerate(line['spans']):
span_hi = span['bbox'][3]-span['bbox'][1]
span_mid_y = (span['bbox'][3]+span['bbox'][1])/2
span_font_sz = span['size']
if max_font_sz-span_font_sz<1: # 先以字体过滤正文,如果是正文就不再继续判断了
continue
# 对被除数为0的情况进行过滤
if span_hi==0 or min_font_sz==0:
continue
if (base_span_mid_y-span_mid_y)/span_hi>0.2 or (base_span_mid_y-span_mid_y>0 and abs(span_font_sz-min_font_sz)/min_font_sz<0.1):
"""
1. 它的前一个char如果是句号或者逗号的话,那么肯定是角标而不是公式
2. 如果这个角标的前面是一个单词(长度大于5)而不是任何大写或小写的短字母的话 应该也是角标
3. 上标里有数字和逗号或者数字+星号的组合,方括号,一般肯定就是角标了
4. 这个角标属于前文还是后文要根据距离来判断,如果距离前面的文本太近,那么就是前面的角标,否则就是后面的角标
"""
if (check_1(line['spans'], i) or
# check_2(line['spans'], i) or
check_3(line['spans'], i)
):
"""删除掉这个角标:删除这个span, 同时还要更新line的text"""
span_to_del.append(span)
if len(span_to_del)>0:
for span in span_to_del:
line['spans'].remove(span)
line['text'] = ''.join([c['c'] for s in line['spans'] for c in s['chars']])
return with_char_text_blcoks
def construct_page_component(page_id, image_info, table_info, text_blocks_preproc, layout_bboxes, inline_eq_info,
interline_eq_info, raw_pymu_blocks,
removed_text_blocks, removed_image_blocks, images_backup, droped_table_block, table_backup,
layout_tree,
page_w, page_h, footnote_bboxes_tmp):
"""
"""
return_dict = {}
return_dict['para_blocks'] = {}
return_dict['preproc_blocks'] = text_blocks_preproc
return_dict['images'] = image_info
return_dict['tables'] = table_info
return_dict['interline_equations'] = interline_eq_info
return_dict['inline_equations'] = inline_eq_info
return_dict['layout_bboxes'] = layout_bboxes
return_dict['pymu_raw_blocks'] = raw_pymu_blocks
return_dict['global_statistic'] = {}
return_dict['droped_text_block'] = removed_text_blocks
return_dict['droped_image_block'] = removed_image_blocks
return_dict['droped_table_block'] = []
return_dict['image_backup'] = images_backup
return_dict['table_backup'] = []
return_dict['page_idx'] = page_id
return_dict['page_size'] = [page_w, page_h]
return_dict['_layout_tree'] = layout_tree # 辅助分析layout作用
return_dict['footnote_bboxes_tmp'] = footnote_bboxes_tmp
return return_dict
def ocr_construct_page_component(blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
images, tables, interline_equations, inline_equations,
dropped_text_block, dropped_image_block, dropped_table_block, dropped_equation_block,
need_remove_spans_bboxes_dict):
return_dict = {
'preproc_blocks': blocks,
'layout_bboxes': layout_bboxes,
'page_idx': page_id,
'page_size': [page_w, page_h],
'_layout_tree': layout_tree,
'images': images,
'tables': tables,
'interline_equations': interline_equations,
'inline_equations': inline_equations,
'droped_text_block': dropped_text_block,
'droped_image_block': dropped_image_block,
'droped_table_block': dropped_table_block,
'dropped_equation_block': dropped_equation_block,
'droped_bboxes': need_remove_spans_bboxes_dict,
}
return return_dict
def ocr_construct_page_component_v2(blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
images, tables, interline_equations, discarded_blocks, need_drop, drop_reason):
......
......@@ -25,43 +25,6 @@ def ocr_cut_image_and_table(spans, page, page_id, pdf_bytes_md5, imageWriter):
return spans
def txt_save_images_by_bboxes(page_num: int, page, pdf_bytes_md5: str,
image_bboxes: list, images_overlap_backup: list, table_bboxes: list,
equation_inline_bboxes: list,
equation_interline_bboxes: list, imageWriter) -> dict:
"""返回一个dict, key为bbox, 值是图片地址."""
image_info = []
image_backup_info = []
table_info = []
inline_eq_info = []
interline_eq_info = []
# 图片的保存路径组成是这样的: {s3_or_local_path}/{book_name}/{images|tables|equations}/{page_num}_{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}.jpg
def return_path(type):
return join_path(pdf_bytes_md5, type)
for bbox in image_bboxes:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path('images'), imageWriter)
image_info.append({'bbox': bbox, 'image_path': image_path})
for bbox in images_overlap_backup:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path('images'), imageWriter)
image_backup_info.append({'bbox': bbox, 'image_path': image_path})
for bbox in table_bboxes:
if not check_img_bbox(bbox):
continue
image_path = cut_image(bbox, page_num, page, return_path('tables'), imageWriter)
table_info.append({'bbox': bbox, 'image_path': image_path})
return image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info
def check_img_bbox(bbox) -> bool:
if any([bbox[0] >= bbox[2], bbox[1] >= bbox[3]]):
logger.warning(f'image_bboxes: 错误的box, {bbox}')
......
from magic_pdf.libs.boxbase import _is_in, calculate_overlap_area_2_minbox_area_ratio # 正则
from magic_pdf.libs.commons import fitz # pyMuPDF库
def __solve_contain_bboxs(all_bbox_list: list):
"""将两个公式的bbox做判断是否有包含关系,若有的话则删掉较小的bbox"""
dump_list = []
for i in range(len(all_bbox_list)):
for j in range(i + 1, len(all_bbox_list)):
# 获取当前两个值
bbox1 = all_bbox_list[i][:4]
bbox2 = all_bbox_list[j][:4]
# 删掉较小的框
if _is_in(bbox1, bbox2):
dump_list.append(all_bbox_list[i])
elif _is_in(bbox2, bbox1):
dump_list.append(all_bbox_list[j])
else:
ratio = calculate_overlap_area_2_minbox_area_ratio(bbox1, bbox2)
if ratio > 0.7:
s1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
s2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
if s2 > s1:
dump_list.append(all_bbox_list[i])
else:
dump_list.append(all_bbox_list[i])
# 遍历需要删除的列表中的每个元素
for item in dump_list:
while item in all_bbox_list:
all_bbox_list.remove(item)
return all_bbox_list
def parse_equations(page_ID: int, page: fitz.Page, json_from_DocXchain_obj: dict):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
DPI = 72 # use this resolution
pix = page.get_pixmap(dpi=DPI)
pageL = 0
pageR = int(pix.w)
pageU = 0
pageD = int(pix.h)
#--------- 通过json_from_DocXchain来获取 table ---------#
equationEmbedding_from_DocXChain_bboxs = []
equationIsolated_from_DocXChain_bboxs = []
xf_json = json_from_DocXchain_obj
width_from_json = xf_json['page_info']['width']
height_from_json = xf_json['page_info']['height']
LR_scaleRatio = width_from_json / (pageR - pageL)
UD_scaleRatio = height_from_json / (pageD - pageU)
for xf in xf_json['layout_dets']:
# {0: 'title', 1: 'figure', 2: 'plain text', 3: 'header', 4: 'page number', 5: 'footnote', 6: 'footer', 7: 'table', 8: 'table caption', 9: 'figure caption', 10: 'equation', 11: 'full column', 12: 'sub column'}
L = xf['poly'][0] / LR_scaleRatio
U = xf['poly'][1] / UD_scaleRatio
R = xf['poly'][2] / LR_scaleRatio
D = xf['poly'][5] / UD_scaleRatio
# L += pageL # 有的页面,artBox偏移了。不在(0,0)
# R += pageL
# U += pageU
# D += pageU
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
# equation
img_suffix = f"{page_ID}_{int(L)}_{int(U)}_{int(R)}_{int(D)}"
if xf['category_id'] == 13 and xf['score'] >= 0.3:
latex_text = xf.get("latex", "EmptyInlineEquationResult")
debugable_latex_text = f"{latex_text}|{img_suffix}"
equationEmbedding_from_DocXChain_bboxs.append((L, U, R, D, latex_text))
if xf['category_id'] == 14 and xf['score'] >= 0.3:
latex_text = xf.get("latex", "EmptyInterlineEquationResult")
debugable_latex_text = f"{latex_text}|{img_suffix}"
equationIsolated_from_DocXChain_bboxs.append((L, U, R, D, latex_text))
#---------------------------------------- 排序,编号,保存 -----------------------------------------#
equationIsolated_from_DocXChain_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
equationIsolated_from_DocXChain_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
equationEmbedding_from_DocXChain_names = []
equationEmbedding_ID = 0
equationIsolated_from_DocXChain_names = []
equationIsolated_ID = 0
for L, U, R, D, _ in equationEmbedding_from_DocXChain_bboxs:
if not(L < R and U < D):
continue
try:
# cur_equation = page.get_pixmap(clip=(L,U,R,D))
new_equation_name = "equationEmbedding_{}_{}.png".format(page_ID, equationEmbedding_ID) # 公式name
# cur_equation.save(res_dir_path + '/' + new_equation_name) # 把公式存出在新建的文件夹,并命名
equationEmbedding_from_DocXChain_names.append(new_equation_name) # 把公式的名字存在list中,方便在md中插入引用
equationEmbedding_ID += 1
except:
pass
for L, U, R, D, _ in equationIsolated_from_DocXChain_bboxs:
if not(L < R and U < D):
continue
try:
# cur_equation = page.get_pixmap(clip=(L,U,R,D))
new_equation_name = "equationEmbedding_{}_{}.png".format(page_ID, equationIsolated_ID) # 公式name
# cur_equation.save(res_dir_path + '/' + new_equation_name) # 把公式存出在新建的文件夹,并命名
equationIsolated_from_DocXChain_names.append(new_equation_name) # 把公式的名字存在list中,方便在md中插入引用
equationIsolated_ID += 1
except:
pass
equationEmbedding_from_DocXChain_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
equationIsolated_from_DocXChain_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
"""根据pdf可视区域,调整bbox的坐标"""
cropbox = page.cropbox
if cropbox[0]!=page.rect[0] or cropbox[1]!=page.rect[1]:
for eq_box in equationEmbedding_from_DocXChain_bboxs:
eq_box = [eq_box[0]+cropbox[0], eq_box[1]+cropbox[1], eq_box[2]+cropbox[0], eq_box[3]+cropbox[1], eq_box[4]]
for eq_box in equationIsolated_from_DocXChain_bboxs:
eq_box = [eq_box[0]+cropbox[0], eq_box[1]+cropbox[1], eq_box[2]+cropbox[0], eq_box[3]+cropbox[1], eq_box[4]]
deduped_embedding_eq_bboxes = __solve_contain_bboxs(equationEmbedding_from_DocXChain_bboxs)
return deduped_embedding_eq_bboxes, equationIsolated_from_DocXChain_bboxs
from magic_pdf.libs.commons import fitz # pyMuPDF库
from magic_pdf.libs.coordinate_transform import get_scale_ratio
def parse_footers(page_ID: int, page: fitz.Page, json_from_DocXchain_obj: dict):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
#--------- 通过json_from_DocXchain来获取 footer ---------#
footer_bbox_from_DocXChain = []
xf_json = json_from_DocXchain_obj
horizontal_scale_ratio, vertical_scale_ratio = get_scale_ratio(xf_json, page)
# {0: 'title', # 标题
# 1: 'figure', # 图片
# 2: 'plain text', # 文本
# 3: 'header', # 页眉
# 4: 'page number', # 页码
# 5: 'footnote', # 脚注
# 6: 'footer', # 页脚
# 7: 'table', # 表格
# 8: 'table caption', # 表格描述
# 9: 'figure caption', # 图片描述
# 10: 'equation', # 公式
# 11: 'full column', # 单栏
# 12: 'sub column', # 多栏
# 13: 'embedding', # 嵌入公式
# 14: 'isolated'} # 单行公式
for xf in xf_json['layout_dets']:
L = xf['poly'][0] / horizontal_scale_ratio
U = xf['poly'][1] / vertical_scale_ratio
R = xf['poly'][2] / horizontal_scale_ratio
D = xf['poly'][5] / vertical_scale_ratio
# L += pageL # 有的页面,artBox偏移了。不在(0,0)
# R += pageL
# U += pageU
# D += pageU
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
if xf['category_id'] == 6 and xf['score'] >= 0.3:
footer_bbox_from_DocXChain.append((L, U, R, D))
footer_final_names = []
footer_final_bboxs = []
footer_ID = 0
for L, U, R, D in footer_bbox_from_DocXChain:
# cur_footer = page.get_pixmap(clip=(L,U,R,D))
new_footer_name = "footer_{}_{}.png".format(page_ID, footer_ID) # 脚注name
# cur_footer.save(res_dir_path + '/' + new_footer_name) # 把页脚存储在新建的文件夹,并命名
footer_final_names.append(new_footer_name) # 把脚注的名字存在list中
footer_final_bboxs.append((L, U, R, D))
footer_ID += 1
footer_final_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
curPage_all_footer_bboxs = footer_final_bboxs
return curPage_all_footer_bboxs
from collections import defaultdict
from magic_pdf.libs.boxbase import calculate_iou
def compare_bbox_with_list(bbox, bbox_list, tolerance=1):
return any(all(abs(a - b) < tolerance for a, b in zip(bbox, common_bbox)) for common_bbox in bbox_list)
def is_single_line_block(block):
# Determine based on the width and height of the block
block_width = block["X1"] - block["X0"]
block_height = block["bbox"][3] - block["bbox"][1]
# If the height of the block is close to the average character height and the width is large, it is considered a single line
return block_height <= block["avg_char_height"] * 3 and block_width > block["avg_char_width"] * 3
def get_most_common_bboxes(bboxes, page_height, position="top", threshold=0.25, num_bboxes=3, min_frequency=2):
"""
This function gets the most common bboxes from the bboxes
Parameters
----------
bboxes : list
bboxes
page_height : float
height of the page
position : str, optional
"top" or "bottom", by default "top"
threshold : float, optional
threshold, by default 0.25
num_bboxes : int, optional
number of bboxes to return, by default 3
min_frequency : int, optional
minimum frequency of the bbox, by default 2
Returns
-------
common_bboxes : list
common bboxes
"""
# Filter bbox by position
if position == "top":
filtered_bboxes = [bbox for bbox in bboxes if bbox[1] < page_height * threshold]
else:
filtered_bboxes = [bbox for bbox in bboxes if bbox[3] > page_height * (1 - threshold)]
# Find the most common bbox
bbox_count = defaultdict(int)
for bbox in filtered_bboxes:
bbox_count[tuple(bbox)] += 1
# Get the most frequently occurring bbox, but only consider it when the frequency exceeds min_frequency
common_bboxes = [
bbox for bbox, count in sorted(bbox_count.items(), key=lambda item: item[1], reverse=True) if count >= min_frequency
][:num_bboxes]
return common_bboxes
def detect_footer_header2(result_dict, similarity_threshold=0.5):
"""
This function detects the header and footer of the document.
Parameters
----------
result_dict : dict
result dictionary
Returns
-------
result_dict : dict
result dictionary
"""
# Traverse all blocks in the document
single_line_blocks = 0
total_blocks = 0
single_line_blocks = 0
for page_id, blocks in result_dict.items():
if page_id.startswith("page_"):
for block_key, block in blocks.items():
if block_key.startswith("block_"):
total_blocks += 1
if is_single_line_block(block):
single_line_blocks += 1
# If there are no blocks, skip the header and footer detection
if total_blocks == 0:
print("No blocks found. Skipping header/footer detection.")
return result_dict
# If most of the blocks are single-line, skip the header and footer detection
if single_line_blocks / total_blocks > 0.5: # 50% of the blocks are single-line
# print("Skipping header/footer detection for text-dense document.")
return result_dict
# Collect the bounding boxes of all blocks
all_bboxes = []
all_texts = []
for page_id, blocks in result_dict.items():
if page_id.startswith("page_"):
for block_key, block in blocks.items():
if block_key.startswith("block_"):
all_bboxes.append(block["bbox"])
# Get the height of the page
page_height = max(bbox[3] for bbox in all_bboxes)
# Get the most common bbox lists for headers and footers
common_header_bboxes = get_most_common_bboxes(all_bboxes, page_height, position="top") if all_bboxes else []
common_footer_bboxes = get_most_common_bboxes(all_bboxes, page_height, position="bottom") if all_bboxes else []
# Detect and mark headers and footers
for page_id, blocks in result_dict.items():
if page_id.startswith("page_"):
for block_key, block in blocks.items():
if block_key.startswith("block_"):
bbox = block["bbox"]
text = block["text"]
is_header = compare_bbox_with_list(bbox, common_header_bboxes)
is_footer = compare_bbox_with_list(bbox, common_footer_bboxes)
block["is_header"] = int(is_header)
block["is_footer"] = int(is_footer)
return result_dict
def __get_page_size(page_sizes:list):
"""
页面大小可能不一样
"""
w = sum([w for w,h in page_sizes])/len(page_sizes)
h = sum([h for w,h in page_sizes])/len(page_sizes)
return w, h
def __calculate_iou(bbox1, bbox2):
iou = calculate_iou(bbox1, bbox2)
return iou
def __is_same_pos(box1, box2, iou_threshold):
iou = __calculate_iou(box1, box2)
return iou >= iou_threshold
def get_most_common_bbox(bboxes:list, page_size:list, page_cnt:int, page_range_threshold=0.2, iou_threshold=0.9):
"""
common bbox必须大于page_cnt的1/3
"""
min_occurance_cnt = max(3, page_cnt//4)
header_det_bbox = []
footer_det_bbox = []
hdr_same_pos_group = []
btn_same_pos_group = []
page_w, page_h = __get_page_size(page_size)
top_y, bottom_y = page_w*page_range_threshold, page_h*(1-page_range_threshold)
top_bbox = [b for b in bboxes if b[3]<top_y]
bottom_bbox = [b for b in bboxes if b[1]>bottom_y]
# 然后开始排序,寻找最经常出现的bbox, 寻找的时候如果IOU>iou_threshold就算是一个
for i in range(0, len(top_bbox)):
hdr_same_pos_group.append([top_bbox[i]])
for j in range(i+1, len(top_bbox)):
if __is_same_pos(top_bbox[i], top_bbox[j], iou_threshold):
#header_det_bbox = [min(top_bbox[i][0], top_bbox[j][0]), min(top_bbox[i][1], top_bbox[j][1]), max(top_bbox[i][2], top_bbox[j][2]), max(top_bbox[i][3],top_bbox[j][3])]
hdr_same_pos_group[i].append(top_bbox[j])
for i in range(0, len(bottom_bbox)):
btn_same_pos_group.append([bottom_bbox[i]])
for j in range(i+1, len(bottom_bbox)):
if __is_same_pos(bottom_bbox[i], bottom_bbox[j], iou_threshold):
#footer_det_bbox = [min(bottom_bbox[i][0], bottom_bbox[j][0]), min(bottom_bbox[i][1], bottom_bbox[j][1]), max(bottom_bbox[i][2], bottom_bbox[j][2]), max(bottom_bbox[i][3],bottom_bbox[j][3])]
btn_same_pos_group[i].append(bottom_bbox[j])
# 然后看下每一组的bbox,是否符合大于page_cnt一定比例
hdr_same_pos_group = [g for g in hdr_same_pos_group if len(g)>=min_occurance_cnt]
btn_same_pos_group = [g for g in btn_same_pos_group if len(g)>=min_occurance_cnt]
# 平铺2个list[list]
hdr_same_pos_group = [bbox for g in hdr_same_pos_group for bbox in g]
btn_same_pos_group = [bbox for g in btn_same_pos_group for bbox in g]
# 寻找hdr_same_pos_group中的box[3]最大值,btn_same_pos_group中的box[1]最小值
hdr_same_pos_group.sort(key=lambda b:b[3])
btn_same_pos_group.sort(key=lambda b:b[1])
hdr_y = hdr_same_pos_group[-1][3] if hdr_same_pos_group else 0
btn_y = btn_same_pos_group[0][1] if btn_same_pos_group else page_h
header_det_bbox = [0, 0, page_w, hdr_y]
footer_det_bbox = [0, btn_y, page_w, page_h]
# logger.warning(f"header: {header_det_bbox}, footer: {footer_det_bbox}")
return header_det_bbox, footer_det_bbox, page_w, page_h
def drop_footer_header(pdf_info_dict:dict):
"""
启用规则探测,在全局的视角上通过统计的方法。
"""
header = []
footer = []
all_text_bboxes = [blk['bbox'] for _, val in pdf_info_dict.items() for blk in val['preproc_blocks']]
image_bboxes = [img['bbox'] for _, val in pdf_info_dict.items() for img in val['images']] + [img['bbox'] for _, val in pdf_info_dict.items() for img in val['image_backup']]
page_size = [val['page_size'] for _, val in pdf_info_dict.items()]
page_cnt = len(pdf_info_dict.keys()) # 一共多少页
header, footer, page_w, page_h = get_most_common_bbox(all_text_bboxes+image_bboxes, page_size, page_cnt)
""""
把范围扩展到页面水平的整个方向上
"""
if header:
header = [0, 0, page_w, header[3]+1]
if footer:
footer = [0, footer[1]-1, page_w, page_h]
# 找到footer, header范围之后,针对每一页pdf,从text、图片中删除这些范围内的内容
# 移除text block
for _, page_info in pdf_info_dict.items():
header_text_blk = []
footer_text_blk = []
for blk in page_info['preproc_blocks']:
blk_bbox = blk['bbox']
if header and blk_bbox[3]<=header[3]:
blk['tag'] = "header"
header_text_blk.append(blk)
elif footer and blk_bbox[1]>=footer[1]:
blk['tag'] = "footer"
footer_text_blk.append(blk)
# 放入text_block_droped中
page_info['droped_text_block'].extend(header_text_blk)
page_info['droped_text_block'].extend(footer_text_blk)
for blk in header_text_blk:
page_info['preproc_blocks'].remove(blk)
for blk in footer_text_blk:
page_info['preproc_blocks'].remove(blk)
"""接下来把footer、header上的图片也删除掉。图片包括正常的和backup的"""
header_image = []
footer_image = []
for image_info in page_info['images']:
img_bbox = image_info['bbox']
if header and img_bbox[3]<=header[3]:
image_info['tag'] = "header"
header_image.append(image_info)
elif footer and img_bbox[1]>=footer[1]:
image_info['tag'] = "footer"
footer_image.append(image_info)
page_info['droped_image_block'].extend(header_image)
page_info['droped_image_block'].extend(footer_image)
for img in header_image:
page_info['images'].remove(img)
for img in footer_image:
page_info['images'].remove(img)
"""接下来吧backup的图片也删除掉"""
header_image = []
footer_image = []
for image_info in page_info['image_backup']:
img_bbox = image_info['bbox']
if header and img_bbox[3]<=header[3]:
image_info['tag'] = "header"
header_image.append(image_info)
elif footer and img_bbox[1]>=footer[1]:
image_info['tag'] = "footer"
footer_image.append(image_info)
page_info['droped_image_block'].extend(header_image)
page_info['droped_image_block'].extend(footer_image)
for img in header_image:
page_info['image_backup'].remove(img)
for img in footer_image:
page_info['image_backup'].remove(img)
return header, footer
from collections import Counter
from magic_pdf.libs.commons import fitz # pyMuPDF库
from magic_pdf.libs.coordinate_transform import get_scale_ratio
def parse_footnotes_by_model(page_ID: int, page: fitz.Page, json_from_DocXchain_obj: dict, md_bookname_save_path=None, debug_mode=False):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
#--------- 通过json_from_DocXchain来获取 footnote ---------#
footnote_bbox_from_DocXChain = []
xf_json = json_from_DocXchain_obj
horizontal_scale_ratio, vertical_scale_ratio = get_scale_ratio(xf_json, page)
# {0: 'title', # 标题
# 1: 'figure', # 图片
# 2: 'plain text', # 文本
# 3: 'header', # 页眉
# 4: 'page number', # 页码
# 5: 'footnote', # 脚注
# 6: 'footer', # 页脚
# 7: 'table', # 表格
# 8: 'table caption', # 表格描述
# 9: 'figure caption', # 图片描述
# 10: 'equation', # 公式
# 11: 'full column', # 单栏
# 12: 'sub column', # 多栏
# 13: 'embedding', # 嵌入公式
# 14: 'isolated'} # 单行公式
for xf in xf_json['layout_dets']:
L = xf['poly'][0] / horizontal_scale_ratio
U = xf['poly'][1] / vertical_scale_ratio
R = xf['poly'][2] / horizontal_scale_ratio
D = xf['poly'][5] / vertical_scale_ratio
# L += pageL # 有的页面,artBox偏移了。不在(0,0)
# R += pageL
# U += pageU
# D += pageU
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
# if xf['category_id'] == 5 and xf['score'] >= 0.3:
if xf['category_id'] == 5 and xf['score'] >= 0.43: # 新的footnote阈值
footnote_bbox_from_DocXChain.append((L, U, R, D))
footnote_final_names = []
footnote_final_bboxs = []
footnote_ID = 0
for L, U, R, D in footnote_bbox_from_DocXChain:
if debug_mode:
# cur_footnote = page.get_pixmap(clip=(L,U,R,D))
new_footnote_name = "footnote_{}_{}.png".format(page_ID, footnote_ID) # 脚注name
# cur_footnote.save(md_bookname_save_path + '/' + new_footnote_name) # 把脚注存储在新建的文件夹,并命名
footnote_final_names.append(new_footnote_name) # 把脚注的名字存在list中
footnote_final_bboxs.append((L, U, R, D))
footnote_ID += 1
footnote_final_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
curPage_all_footnote_bboxs = footnote_final_bboxs
return curPage_all_footnote_bboxs
def need_remove(block):
if 'lines' in block and len(block['lines']) > 0:
# block中只有一行,且该行文本全是大写字母,或字体为粗体bold关键词,SB关键词,把这个block捞回来
if len(block['lines']) == 1:
if 'spans' in block['lines'][0] and len(block['lines'][0]['spans']) == 1:
font_keywords = ['SB', 'bold', 'Bold']
if block['lines'][0]['spans'][0]['text'].isupper() or any(keyword in block['lines'][0]['spans'][0]['font'] for keyword in font_keywords):
return True
for line in block['lines']:
if 'spans' in line and len(line['spans']) > 0:
for span in line['spans']:
# 检测"keyword"是否在span中,忽略大小写
if "keyword" in span['text'].lower():
return True
return False
def parse_footnotes_by_rule(remain_text_blocks, page_height, page_id, main_text_font):
"""
根据给定的文本块、页高和页码,解析出符合规则的脚注文本块,并返回其边界框。
Args:
remain_text_blocks (list): 包含所有待处理的文本块的列表。
page_height (float): 页面的高度。
page_id (int): 页面的ID。
Returns:
list: 符合规则的脚注文本块的边界框列表。
"""
# if page_id > 20:
if page_id > 2: # 为保证精确度,先只筛选前3页
return []
else:
# 存储每一行的文本块大小的列表
line_sizes = []
# 存储每个文本块的平均行大小
block_sizes = []
# 存储每一行的字体信息
# font_names = []
font_names = Counter()
if len(remain_text_blocks) > 0:
for block in remain_text_blocks:
block_line_sizes = []
# block_fonts = []
block_fonts = Counter()
for line in block['lines']:
# 提取每个span的size属性,并计算行大小
span_sizes = [span['size'] for span in line['spans'] if 'size' in span]
if span_sizes:
line_size = sum(span_sizes) / len(span_sizes)
line_sizes.append(line_size)
block_line_sizes.append(line_size)
span_font = [(span['font'], len(span['text'])) for span in line['spans'] if 'font' in span and len(span['text']) > 0]
if span_font:
# main_text_font应该用基于字数最多的字体而不是span级别的统计
# font_names.append(font_name for font_name in span_font)
# block_fonts.append(font_name for font_name in span_font)
for font, count in span_font:
# font_names.extend([font] * count)
# block_fonts.extend([font] * count)
font_names[font] += count
block_fonts[font] += count
if block_line_sizes:
# 计算文本块的平均行大小
block_size = sum(block_line_sizes) / len(block_line_sizes)
# block_font = collections.Counter(block_fonts).most_common(1)[0][0]
block_font = block_fonts.most_common(1)[0][0]
block_sizes.append((block, block_size, block_font))
# 计算main_text_size
main_text_size = Counter(line_sizes).most_common(1)[0][0]
# 计算main_text_font
# main_text_font = collections.Counter(font_names).most_common(1)[0][0]
# main_text_font = font_names.most_common(1)[0][0]
# 删除一些可能被误识别为脚注的文本块
block_sizes = [(block, block_size, block_font) for block, block_size, block_font in block_sizes if not need_remove(block)]
# 检测footnote_block 并返回 footnote_bboxes
# footnote_bboxes = [block['bbox'] for block, block_size, block_font in block_sizes if
# block['bbox'][1] > page_height * 0.6 and block_size < main_text_size
# and (len(block['lines']) < 5 or block_font != main_text_font)]
# and len(block['lines']) < 5]
footnote_bboxes = [block['bbox'] for block, block_size, block_font in block_sizes if
block['bbox'][1] > page_height * 0.6 and
# 较为严格的规则
block_size < main_text_size and
(len(block['lines']) < 5 or
block_font != main_text_font)]
# 较为宽松的规则
# sum([block_size < main_text_size,
# len(block['lines']) < 5,
# block_font != main_text_font])
# >= 2]
return footnote_bboxes
else:
return []
from magic_pdf.libs.commons import fitz # pyMuPDF库
from magic_pdf.libs.coordinate_transform import get_scale_ratio
def parse_headers(page_ID: int, page: fitz.Page, json_from_DocXchain_obj: dict):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
#--------- 通过json_from_DocXchain来获取 header ---------#
header_bbox_from_DocXChain = []
xf_json = json_from_DocXchain_obj
horizontal_scale_ratio, vertical_scale_ratio = get_scale_ratio(xf_json, page)
# {0: 'title', # 标题
# 1: 'figure', # 图片
# 2: 'plain text', # 文本
# 3: 'header', # 页眉
# 4: 'page number', # 页码
# 5: 'footnote', # 脚注
# 6: 'footer', # 页脚
# 7: 'table', # 表格
# 8: 'table caption', # 表格描述
# 9: 'figure caption', # 图片描述
# 10: 'equation', # 公式
# 11: 'full column', # 单栏
# 12: 'sub column', # 多栏
# 13: 'embedding', # 嵌入公式
# 14: 'isolated'} # 单行公式
for xf in xf_json['layout_dets']:
L = xf['poly'][0] / horizontal_scale_ratio
U = xf['poly'][1] / vertical_scale_ratio
R = xf['poly'][2] / horizontal_scale_ratio
D = xf['poly'][5] / vertical_scale_ratio
# L += pageL # 有的页面,artBox偏移了。不在(0,0)
# R += pageL
# U += pageU
# D += pageU
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
if xf['category_id'] == 3 and xf['score'] >= 0.3:
header_bbox_from_DocXChain.append((L, U, R, D))
header_final_names = []
header_final_bboxs = []
header_ID = 0
for L, U, R, D in header_bbox_from_DocXChain:
# cur_header = page.get_pixmap(clip=(L,U,R,D))
new_header_name = "header_{}_{}.png".format(page_ID, header_ID) # 页眉name
# cur_header.save(res_dir_path + '/' + new_header_name) # 把页眉存储在新建的文件夹,并命名
header_final_names.append(new_header_name) # 把页面的名字存在list中
header_final_bboxs.append((L, U, R, D))
header_ID += 1
header_final_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
curPage_all_header_bboxs = header_final_bboxs
return curPage_all_header_bboxs
import collections # 统计库
import re
from magic_pdf.libs.commons import fitz # pyMuPDF库
#--------------------------------------- Tool Functions --------------------------------------#
# 正则化,输入文本,输出只保留a-z,A-Z,0-9
def remove_special_chars(s: str) -> str:
pattern = r"[^a-zA-Z0-9]"
res = re.sub(pattern, "", s)
return res
def check_rect1_sameWith_rect2(L1: float, U1: float, R1: float, D1: float, L2: float, U2: float, R2: float, D2: float) -> bool:
# 判断rect1和rect2是否一模一样
return L1 == L2 and U1 == U2 and R1 == R2 and D1 == D2
def check_rect1_contains_rect2(L1: float, U1: float, R1: float, D1: float, L2: float, U2: float, R2: float, D2: float) -> bool:
# 判断rect1包含了rect2
return (L1 <= L2 <= R2 <= R1) and (U1 <= U2 <= D2 <= D1)
def check_rect1_overlaps_rect2(L1: float, U1: float, R1: float, D1: float, L2: float, U2: float, R2: float, D2: float) -> bool:
# 判断rect1与rect2是否存在重叠(只有一条边重叠,也算重叠)
return max(L1, L2) <= min(R1, R2) and max(U1, U2) <= min(D1, D2)
def calculate_overlapRatio_between_rect1_and_rect2(L1: float, U1: float, R1: float, D1: float, L2: float, U2: float, R2: float, D2: float) -> (float, float):
# 计算两个rect,重叠面积各占2个rect面积的比例
if min(R1, R2) < max(L1, L2) or min(D1, D2) < max(U1, U2):
return 0, 0
square_1 = (R1 - L1) * (D1 - U1)
square_2 = (R2 - L2) * (D2 - U2)
if square_1 == 0 or square_2 == 0:
return 0, 0
square_overlap = (min(R1, R2) - max(L1, L2)) * (min(D1, D2) - max(U1, U2))
return square_overlap / square_1, square_overlap / square_2
def calculate_overlapRatio_between_line1_and_line2(L1: float, R1: float, L2: float, R2: float) -> (float, float):
# 计算两个line,重叠区间各占2个line长度的比例
if max(L1, L2) > min(R1, R2):
return 0, 0
if L1 == R1 or L2 == R2:
return 0, 0
overlap_line = min(R1, R2) - max(L1, L2)
return overlap_line / (R1 - L1), overlap_line / (R2 - L2)
# 判断rect其实是一条line
def check_rect_isLine(L: float, U: float, R: float, D: float) -> bool:
width = R - L
height = D - U
if width <= 3 or height <= 3:
return True
if width / height >= 30 or height / width >= 30:
return True
def parse_images(page_ID: int, page: fitz.Page, json_from_DocXchain_obj: dict, junk_img_bojids=[]):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
#### 通过fitz获取page信息
## 超越边界
DPI = 72 # use this resolution
pix = page.get_pixmap(dpi=DPI)
pageL = 0
pageR = int(pix.w)
pageU = 0
pageD = int(pix.h)
#----------------- 保存每一个文本块的LURD ------------------#
textLine_blocks = []
blocks = page.get_text(
"dict",
flags=fitz.TEXTFLAGS_TEXT,
#clip=clip,
)["blocks"]
for i in range(len(blocks)):
bbox = blocks[i]['bbox']
# print(bbox)
for tt in blocks[i]['lines']:
# 当前line
cur_line_bbox = None # 当前line,最右侧的section的bbox
for xf in tt['spans']:
L, U, R, D = xf['bbox']
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
textLine_blocks.append((L, U, R, D))
textLine_blocks.sort(key = lambda LURD: (LURD[1], LURD[0]))
#---------------------------------------------- 保存img --------------------------------------------------#
raw_imgs = page.get_images() # 获取所有的图片
imgs = []
img_names = [] # 保存图片的名字,方便在md中插入引用
img_bboxs = [] # 保存图片的location信息。
img_visited = [] # 记忆化,记录该图片是否在md中已经插入过了
img_ID = 0
## 获取、保存每张img的location信息(x1, y1, x2, y2, UL, DR坐标)
for i in range(len(raw_imgs)):
# 如果图片在junklist中则跳过
if raw_imgs[i][0] in junk_img_bojids:
continue
else:
try:
tt = page.get_image_rects(raw_imgs[i][0], transform = True)
rec = tt[0][0]
L, U, R, D = int(rec[0]), int(rec[1]), int(rec[2]), int(rec[3])
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
if not(pageL <= L < R <= pageR and pageU <= U < D <= pageD):
continue
if pageL == L and R == pageR:
continue
if pageU == U and D == pageD:
continue
# pix1 = page.get_Pixmap(clip=(L,U,R,D))
new_img_name = "{}_{}.png".format(page_ID, i) # 图片name
# pix1.save(res_dir_path + '/' + new_img_name) # 把图片存出在新建的文件夹,并命名
img_names.append(new_img_name)
img_bboxs.append((L, U, R, D))
img_visited.append(False)
imgs.append(raw_imgs[i])
except:
continue
#-------- 如果img之间有重叠。说明获取的img大小有问题,位置也不一定对。就扔掉--------#
imgs_ok = [True for _ in range(len(imgs))]
for i in range(len(imgs)):
L1, U1, R1, D1 = img_bboxs[i]
for j in range(i + 1, len(imgs)):
L2, U2, R2, D2 = img_bboxs[j]
ratio_1, ratio_2 = calculate_overlapRatio_between_rect1_and_rect2(L1, U1, R1, D1, L2, U2, R2, D2)
s1 = abs(R1 - L1) * abs(D1 - U1)
s2 = abs(R2 - L2) * abs(D2 - U2)
if ratio_1 > 0 and ratio_2 > 0:
if ratio_1 == 1 and ratio_2 > 0.8:
imgs_ok[i] = False
elif ratio_1 > 0.8 and ratio_2 == 1:
imgs_ok[j] = False
elif s1 > 20000 and s2 > 20000 and ratio_1 > 0.4 and ratio_2 > 0.4:
imgs_ok[i] = False
imgs_ok[j] = False
elif s1 / s2 > 5 and ratio_2 > 0.5:
imgs_ok[j] = False
elif s2 / s1 > 5 and ratio_1 > 0.5:
imgs_ok[i] = False
imgs = [imgs[i] for i in range(len(imgs)) if imgs_ok[i] == True]
img_names = [img_names[i] for i in range(len(imgs)) if imgs_ok[i] == True]
img_bboxs = [img_bboxs[i] for i in range(len(imgs)) if imgs_ok[i] == True]
img_visited = [img_visited[i] for i in range(len(imgs)) if imgs_ok[i] == True]
#*******************************************************************************#
#---------------------------------------- 通过fitz提取svg的信息 -----------------------------------------#
#
svgs = page.get_drawings()
#------------ preprocess, check一些大框,看是否是合理的 ----------#
## 去重。有时候会遇到rect1和rect2是完全一样的情形。
svg_rect_visited = set()
available_svgIdx = []
for i in range(len(svgs)):
L, U, R, D = svgs[i]['rect'].irect
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
tt = (L, U, R, D)
if tt not in svg_rect_visited:
svg_rect_visited.add(tt)
available_svgIdx.append(i)
svgs = [svgs[i] for i in available_svgIdx] # 去重后,有效的svgs
svg_childs = [[] for _ in range(len(svgs))]
svg_parents = [[] for _ in range(len(svgs))]
svg_overlaps = [[] for _ in range(len(svgs))] #svg_overlaps[i]是一个list,存的是与svg_i有重叠的svg的index。e.g., svg_overlaps[0] = [1, 2, 7, 9]
svg_visited = [False for _ in range(len(svgs))]
svg_exceedPage = [0 for _ in range(len(svgs))] # 是否超越边界(artbox),很大,但一般是一个svg的底。
for i in range(len(svgs)):
L, U, R, D = svgs[i]['rect'].irect
ratio_1, ratio_2 = calculate_overlapRatio_between_rect1_and_rect2(L, U, R, D, pageL, pageU, pageR, pageD)
if (pageL + 20 < L <= R < pageR - 20) and (pageU + 20 < U <= D < pageD - 20):
if ratio_2 >= 0.7:
svg_exceedPage[i] += 4
else:
if L <= pageL:
svg_exceedPage[i] += 1
if pageR <= R:
svg_exceedPage[i] += 1
if U <= pageU:
svg_exceedPage[i] += 1
if pageD <= D:
svg_exceedPage[i] += 1
#### 如果有≥2个的超边界的框,就不要手写规则判断svg了。很难写对。
if len([x for x in svg_exceedPage if x >= 1]) >= 2:
svgs = []
svg_childs = []
svg_parents = []
svg_overlaps = []
svg_visited = []
svg_exceedPage = []
#---------------------------- build graph ----------------------------#
for i, p in enumerate(svgs):
L1, U1, R1, D1 = svgs[i]["rect"].irect
for j in range(len(svgs)):
if i == j:
continue
L2, U2, R2, D2 = svgs[j]["rect"].irect
## 包含
if check_rect1_contains_rect2(L1, U1, R1, D1, L2, U2, R2, D2) == True:
svg_childs[i].append(j)
svg_parents[j].append(i)
else:
## 交叉
if check_rect1_overlaps_rect2(L1, U1, R1, D1, L2, U2, R2, D2) == True:
svg_overlaps[i].append(j)
#---------------- 确定最终的svg。连通块儿的外围 -------------------#
eps_ERROR = 5 # 给识别出的svg,四周留白(为了防止pyMuPDF的rect不准)
svg_ID = 0
svg_final_names = []
svg_final_bboxs = []
svg_final_visited = [] # 为下面,text识别左准备。作用同img_visited
svg_idxs = [i for i in range(len(svgs))]
svg_idxs.sort(key = lambda i: -(svgs[i]['rect'].irect[2] - svgs[i]['rect'].irect[0]) * (svgs[i]['rect'].irect[3] - svgs[i]['rect'].irect[1])) # 按照面积,从大到小排序
for i in svg_idxs:
if svg_visited[i] == True:
continue
svg_visited[i] = True
L, U, R, D = svgs[i]['rect'].irect
width = R - L
height = D - U
if check_rect_isLine(L, U, R, D) == True:
svg_visited[i] = False
continue
# if i == 4:
# print(i, L, U, R, D)
# print(svg_parents[i])
cur_block_element_cnt = 0 # 当前要判定为svg的区域中,有多少elements,最外围的最大svg框除外。
if len(svg_parents[i]) == 0:
## 是个普通框的情形
cur_block_element_cnt += len(svg_childs[i])
if svg_exceedPage[i] == 0:
## 误差。可能已经包含在某个框里面了
neglect_flag = False
for pL, pU, pR, pD in svg_final_bboxs:
if pL <= L <= R <= pR and pU <= U <= D <= pD:
neglect_flag = True
break
if neglect_flag == True:
continue
## 搜索连通域, bfs+记忆化
q = collections.deque()
for j in svg_overlaps[i]:
q.append(j)
while q:
j = q.popleft()
svg_visited[j] = True
L2, U2, R2, D2 = svgs[j]['rect'].irect
# width2 = R2 - L2
# height2 = D2 - U2
# if width2 <= 2 or height2 <= 2 or (height2 / width2) >= 30 or (width2 / height2) >= 30:
# continue
L = min(L, L2)
R = max(R, R2)
U = min(U, U2)
D = max(D, D2)
cur_block_element_cnt += 1
cur_block_element_cnt += len(svg_childs[j])
for k in svg_overlaps[j]:
if svg_visited[k] == False and svg_exceedPage[k] == 0:
svg_visited[k] = True
q.append(k)
elif svg_exceedPage[i] <= 2:
## 误差。可能已经包含在某个svg_final_bbox框里面了
neglect_flag = False
for sL, sU, sR, sD in svg_final_bboxs:
if sL <= L <= R <= sR and sU <= U <= D <= sD:
neglect_flag = True
break
if neglect_flag == True:
continue
L, U, R, D = pageR, pageD, pageL, pageU
## 所有孩子元素的最大边界
for j in svg_childs[i]:
if svg_visited[j] == True:
continue
if svg_exceedPage[j] >= 1:
continue
svg_visited[j] = True #### 这个位置考虑一下
L2, U2, R2, D2 = svgs[j]['rect'].irect
L = min(L, L2)
R = max(R, R2)
U = min(U, U2)
D = max(D, D2)
cur_block_element_cnt += 1
# 如果是条line,就不用保存了
if check_rect_isLine(L, U, R, D) == True:
continue
# 如果当前的svg,连2个elements都没有,就不用保存了
if cur_block_element_cnt < 3:
continue
## 当前svg,框住了多少文本框。如果框多了,可能就是错了
contain_textLineBlock_cnt = 0
for L2, U2, R2, D2 in textLine_blocks:
if check_rect1_contains_rect2(L, U, R, D, L2, U2, R2, D2) == True:
contain_textLineBlock_cnt += 1
if contain_textLineBlock_cnt >= 10:
continue
# L -= eps_ERROR * 2
# U -= eps_ERROR
# R += eps_ERROR * 2
# D += eps_ERROR
# # cur_svg = page.get_pixmap(matrix=fitz.Identity, dpi=None, colorspace=fitz.csRGB, clip=(U,L,R,D), alpha=False, annots=True)
# cur_svg = page.get_pixmap(clip=(L,U,R,D))
new_svg_name = "svg_{}_{}.png".format(page_ID, svg_ID) # 图片name
# cur_svg.save(res_dir_path + '/' + new_svg_name) # 把图片存出在新建的文件夹,并命名
svg_final_names.append(new_svg_name) # 把图片的名字存在list中,方便在md中插入引用
svg_final_bboxs.append((L, U, R, D))
svg_final_visited.append(False)
svg_ID += 1
## 识别出的svg,可能有 包含,相邻的情形。需要进一步合并
svg_idxs = [i for i in range(len(svg_final_bboxs))]
svg_idxs.sort(key = lambda i: (svg_final_bboxs[i][1], svg_final_bboxs[i][0])) # (U, L)
svg_final_names_2 = []
svg_final_bboxs_2 = []
svg_final_visited_2 = [] # 为下面,text识别左准备。作用同img_visited
svg_ID_2 = 0
for i in range(len(svg_final_bboxs)):
L1, U1, R1, D1 = svg_final_bboxs[i]
for j in range(i + 1, len(svg_final_bboxs)):
L2, U2, R2, D2 = svg_final_bboxs[j]
# 如果 rect1包含了rect2
if check_rect1_contains_rect2(L1, U1, R1, D1, L2, U2, R2, D2) == True:
svg_final_visited[j] = True
continue
# 水平并列
ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(U1, D1, U2, D2)
if ratio_1 >= 0.7 and ratio_2 >= 0.7:
if abs(L2 - R1) >= 20:
continue
LL = min(L1, L2)
UU = min(U1, U2)
RR = max(R1, R2)
DD = max(D1, D2)
svg_final_bboxs[i] = (LL, UU, RR, DD)
svg_final_visited[j] = True
continue
# 竖直并列
ratio_1, ratio_2 = calculate_overlapRatio_between_line1_and_line2(L1, R2, L2, R2)
if ratio_1 >= 0.7 and ratio_2 >= 0.7:
if abs(U2 - D1) >= 20:
continue
LL = min(L1, L2)
UU = min(U1, U2)
RR = max(R1, R2)
DD = max(D1, D2)
svg_final_bboxs[i] = (LL, UU, RR, DD)
svg_final_visited[j] = True
for i in range(len(svg_final_bboxs)):
if svg_final_visited[i] == False:
L, U, R, D = svg_final_bboxs[i]
svg_final_bboxs_2.append((L, U, R, D))
L -= eps_ERROR * 2
U -= eps_ERROR
R += eps_ERROR * 2
D += eps_ERROR
# cur_svg = page.get_pixmap(clip=(L,U,R,D))
new_svg_name = "svg_{}_{}.png".format(page_ID, svg_ID_2) # 图片name
# cur_svg.save(res_dir_path + '/' + new_svg_name) # 把图片存出在新建的文件夹,并命名
svg_final_names_2.append(new_svg_name) # 把图片的名字存在list中,方便在md中插入引用
svg_final_bboxs_2.append((L, U, R, D))
svg_final_visited_2.append(False)
svg_ID_2 += 1
## svg收尾。识别为drawing,但是在上面没有拼成一张图的。
# 有收尾才comprehensive
# xxxx
# xxxx
# xxxx
# xxxx
#--------- 通过json_from_DocXchain来获取,figure, table, equation的bbox ---------#
figure_bbox_from_DocXChain = []
figure_from_DocXChain_visited = [] # 记忆化
figure_bbox_from_DocXChain_overlappedRatio = []
figure_only_from_DocXChain_bboxs = [] # 存储
figure_only_from_DocXChain_names = []
figure_only_from_DocXChain_visited = []
figure_only_ID = 0
xf_json = json_from_DocXchain_obj
width_from_json = xf_json['page_info']['width']
height_from_json = xf_json['page_info']['height']
LR_scaleRatio = width_from_json / (pageR - pageL)
UD_scaleRatio = height_from_json / (pageD - pageU)
for xf in xf_json['layout_dets']:
# {0: 'title', 1: 'figure', 2: 'plain text', 3: 'header', 4: 'page number', 5: 'footnote', 6: 'footer', 7: 'table', 8: 'table caption', 9: 'figure caption', 10: 'equation', 11: 'full column', 12: 'sub column'}
L = xf['poly'][0] / LR_scaleRatio
U = xf['poly'][1] / UD_scaleRatio
R = xf['poly'][2] / LR_scaleRatio
D = xf['poly'][5] / UD_scaleRatio
# L += pageL # 有的页面,artBox偏移了。不在(0,0)
# R += pageL
# U += pageU
# D += pageU
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
# figure
if xf["category_id"] == 1 and xf['score'] >= 0.3:
figure_bbox_from_DocXChain.append((L, U, R, D))
figure_from_DocXChain_visited.append(False)
figure_bbox_from_DocXChain_overlappedRatio.append(0.0)
#---------------------- 比对上面识别出来的img,svg 与DocXChain给的figure -----------------------#
## 比对imgs
for i, b1 in enumerate(figure_bbox_from_DocXChain):
# print('--------- DocXChain的图片', b1)
L1, U1, R1, D1 = b1
for b2 in img_bboxs:
# print('-------- igms得到的图', b2)
L2, U2, R2, D2 = b2
s1 = abs(R1 - L1) * abs(D1 - U1)
s2 = abs(R2 - L2) * abs(D2 - U2)
# 相同
if check_rect1_sameWith_rect2(L1, U1, R1, D1, L2, U2, R2, D2) == True:
figure_from_DocXChain_visited[i] = True
# 包含
elif check_rect1_contains_rect2(L1, U1, R1, D1, L2, U2, R2, D2) == True:
if s2 / s1 > 0.8:
figure_from_DocXChain_visited[i] = True
elif check_rect1_contains_rect2(L2, U2, R2, D2, L1, U1, R1, D1) == True:
if s1 / s2 > 0.8:
figure_from_DocXChain_visited[i] = True
else:
# 重叠了相当一部分
# print('进入第3部分')
ratio_1, ratio_2 = calculate_overlapRatio_between_rect1_and_rect2(L1, U1, R1, D1, L2, U2, R2, D2)
if (ratio_1 >= 0.6 and ratio_2 >= 0.6) or (ratio_1 >= 0.8 and s1/s2>0.8) or (ratio_2 >= 0.8 and s2/s1>0.8):
figure_from_DocXChain_visited[i] = True
else:
figure_bbox_from_DocXChain_overlappedRatio[i] += ratio_1
# print('图片的重叠率是{}'.format(ratio_1))
## 比对svgs
svg_final_bboxs_2_badIdxs = []
for i, b1 in enumerate(figure_bbox_from_DocXChain):
L1, U1, R1, D1 = b1
for j, b2 in enumerate(svg_final_bboxs_2):
L2, U2, R2, D2 = b2
s1 = abs(R1 - L1) * abs(D1 - U1)
s2 = abs(R2 - L2) * abs(D2 - U2)
# 相同
if check_rect1_sameWith_rect2(L1, U1, R1, D1, L2, U2, R2, D2) == True:
figure_from_DocXChain_visited[i] = True
# 包含
elif check_rect1_contains_rect2(L1, U1, R1, D1, L2, U2, R2, D2) == True:
figure_from_DocXChain_visited[i] = True
elif check_rect1_contains_rect2(L2, U2, R2, D2, L1, U1, R1, D1) == True:
if s1 / s2 > 0.7:
figure_from_DocXChain_visited[i] = True
else:
svg_final_bboxs_2_badIdxs.append(j) # svg丢弃。用DocXChain的结果。
else:
# 重叠了相当一部分
ratio_1, ratio_2 = calculate_overlapRatio_between_rect1_and_rect2(L1, U1, R1, D1, L2, U2, R2, D2)
if (ratio_1 >= 0.5 and ratio_2 >= 0.5) or (min(ratio_1, ratio_2) >= 0.4 and max(ratio_1, ratio_2) >= 0.6):
figure_from_DocXChain_visited[i] = True
else:
figure_bbox_from_DocXChain_overlappedRatio[i] += ratio_1
# 丢掉错误的svg
svg_final_bboxs_2 = [svg_final_bboxs_2[i] for i in range(len(svg_final_bboxs_2)) if i not in set(svg_final_bboxs_2_badIdxs)]
for i in range(len(figure_from_DocXChain_visited)):
if figure_bbox_from_DocXChain_overlappedRatio[i] >= 0.7:
figure_from_DocXChain_visited[i] = True
# DocXChain识别出来的figure,但是没被保存的。
for i in range(len(figure_from_DocXChain_visited)):
if figure_from_DocXChain_visited[i] == False:
figure_from_DocXChain_visited[i] = True
cur_bbox = figure_bbox_from_DocXChain[i]
# cur_figure = page.get_pixmap(clip=cur_bbox)
new_figure_name = "figure_only_{}_{}.png".format(page_ID, figure_only_ID) # 图片name
# cur_figure.save(res_dir_path + '/' + new_figure_name) # 把图片存出在新建的文件夹,并命名
figure_only_from_DocXChain_names.append(new_figure_name) # 把图片的名字存在list中,方便在md中插入引用
figure_only_from_DocXChain_bboxs.append(cur_bbox)
figure_only_from_DocXChain_visited.append(False)
figure_only_ID += 1
img_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
svg_final_bboxs_2.sort(key = lambda LURD: (LURD[1], LURD[0]))
figure_only_from_DocXChain_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
curPage_all_fig_bboxs = img_bboxs + svg_final_bboxs + figure_only_from_DocXChain_bboxs
#--------------------------- 最后统一去重 -----------------------------------#
curPage_all_fig_bboxs.sort(key = lambda LURD: ( (LURD[2]-LURD[0])*(LURD[3]-LURD[1]) , LURD[0], LURD[1]) )
#### 先考虑包含关系的小块
final_duplicate = set()
for i in range(len(curPage_all_fig_bboxs)):
L1, U1, R1, D1 = curPage_all_fig_bboxs[i]
for j in range(len(curPage_all_fig_bboxs)):
if i == j:
continue
L2, U2, R2, D2 = curPage_all_fig_bboxs[j]
s1 = abs(R1 - L1) * abs(D1 - U1)
s2 = abs(R2 - L2) * abs(D2 - U2)
if check_rect1_contains_rect2(L2, U2, R2, D2, L1, U1, R1, D1) == True:
final_duplicate.add((L1, U1, R1, D1))
else:
ratio_1, ratio_2 = calculate_overlapRatio_between_rect1_and_rect2(L1, U1, R1, D1, L2, U2, R2, D2)
if ratio_1 >= 0.8 and ratio_2 <= 0.6:
final_duplicate.add((L1, U1, R1, D1))
curPage_all_fig_bboxs = [LURD for LURD in curPage_all_fig_bboxs if LURD not in final_duplicate]
#### 再考虑重叠关系的块
final_duplicate = set()
final_synthetic_bboxs = []
for i in range(len(curPage_all_fig_bboxs)):
L1, U1, R1, D1 = curPage_all_fig_bboxs[i]
for j in range(len(curPage_all_fig_bboxs)):
if i == j:
continue
L2, U2, R2, D2 = curPage_all_fig_bboxs[j]
s1 = abs(R1 - L1) * abs(D1 - U1)
s2 = abs(R2 - L2) * abs(D2 - U2)
ratio_1, ratio_2 = calculate_overlapRatio_between_rect1_and_rect2(L1, U1, R1, D1, L2, U2, R2, D2)
union_ok = False
if (ratio_1 >= 0.8 and ratio_2 <= 0.6) or (ratio_1 > 0.6 and ratio_2 > 0.6):
union_ok = True
if (ratio_1 > 0.2 and s2 / s1 > 5):
union_ok = True
if (L1 <= (L2+R2)/2 <= R1) and (U1 <= (U2+D2)/2 <= D1):
union_ok = True
if (L2 <= (L1+R1)/2 <= R2) and (U2 <= (U1+D1)/2 <= D2):
union_ok = True
if union_ok == True:
final_duplicate.add((L1, U1, R1, D1))
final_duplicate.add((L2, U2, R2, D2))
L3, U3, R3, D3 = min(L1, L2), min(U1, U2), max(R1, R2), max(D1, D2)
final_synthetic_bboxs.append((L3, U3, R3, D3))
# print('---------- curPage_all_fig_bboxs ---------')
# print(curPage_all_fig_bboxs)
curPage_all_fig_bboxs = [b for b in curPage_all_fig_bboxs if b not in final_duplicate]
final_synthetic_bboxs = list(set(final_synthetic_bboxs))
## 再再考虑重叠关系。极端情况下会迭代式地2进1
new_images = []
droped_img_idx = []
image_bboxes = [[b[0], b[1], b[2], b[3]] for b in final_synthetic_bboxs]
for i in range(0, len(image_bboxes)):
for j in range(i+1, len(image_bboxes)):
if j not in droped_img_idx:
L2, U2, R2, D2 = image_bboxes[j]
s1 = abs(R1 - L1) * abs(D1 - U1)
s2 = abs(R2 - L2) * abs(D2 - U2)
ratio_1, ratio_2 = calculate_overlapRatio_between_rect1_and_rect2(L1, U1, R1, D1, L2, U2, R2, D2)
union_ok = False
if (ratio_1 >= 0.8 and ratio_2 <= 0.6) or (ratio_1 > 0.6 and ratio_2 > 0.6):
union_ok = True
if (ratio_1 > 0.2 and s2 / s1 > 5):
union_ok = True
if (L1 <= (L2+R2)/2 <= R1) and (U1 <= (U2+D2)/2 <= D1):
union_ok = True
if (L2 <= (L1+R1)/2 <= R2) and (U2 <= (U1+D1)/2 <= D2):
union_ok = True
if union_ok == True:
# 合并
image_bboxes[i][0], image_bboxes[i][1],image_bboxes[i][2],image_bboxes[i][3] = min(image_bboxes[i][0], image_bboxes[j][0]), min(image_bboxes[i][1], image_bboxes[j][1]), max(image_bboxes[i][2], image_bboxes[j][2]), max(image_bboxes[i][3], image_bboxes[j][3])
droped_img_idx.append(j)
for i in range(0, len(image_bboxes)):
if i not in droped_img_idx:
new_images.append(image_bboxes[i])
# find_union_FLAG = True
# while find_union_FLAG == True:
# find_union_FLAG = False
# final_duplicate = set()
# tmp = []
# for i in range(len(final_synthetic_bboxs)):
# L1, U1, R1, D1 = final_synthetic_bboxs[i]
# for j in range(len(final_synthetic_bboxs)):
# if i == j:
# continue
# L2, U2, R2, D2 = final_synthetic_bboxs[j]
# s1 = abs(R1 - L1) * abs(D1 - U1)
# s2 = abs(R2 - L2) * abs(D2 - U2)
# ratio_1, ratio_2 = calculate_overlapRatio_between_rect1_and_rect2(L1, U1, R1, D1, L2, U2, R2, D2)
# union_ok = False
# if (ratio_1 >= 0.8 and ratio_2 <= 0.6) or (ratio_1 > 0.6 and ratio_2 > 0.6):
# union_ok = True
# if (ratio_1 > 0.2 and s2 / s1 > 5):
# union_ok = True
# if (L1 <= (L2+R2)/2 <= R1) and (U1 <= (U2+D2)/2 <= D1):
# union_ok = True
# if (L2 <= (L1+R1)/2 <= R2) and (U2 <= (U1+D1)/2 <= D2):
# union_ok = True
# if union_ok == True:
# find_union_FLAG = True
# final_duplicate.add((L1, U1, R1, D1))
# final_duplicate.add((L2, U2, R2, D2))
# L3, U3, R3, D3 = min(L1, L2), min(U1, U2), max(R1, R2), max(D1, D2)
# tmp.append((L3, U3, R3, D3))
# if find_union_FLAG == True:
# tmp = list(set(tmp))
# final_synthetic_bboxs = tmp[:]
# curPage_all_fig_bboxs += final_synthetic_bboxs
# print('--------- final synthetic')
# print(final_synthetic_bboxs)
#**************************************************************************#
images1 = [[img[0], img[1], img[2], img[3]] for img in curPage_all_fig_bboxs]
images = images1 + new_images
return images
from magic_pdf.libs.commons import fitz # pyMuPDF库
from magic_pdf.libs.coordinate_transform import get_scale_ratio
def parse_pageNos(page_ID: int, page: fitz.Page, json_from_DocXchain_obj: dict):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
#--------- 通过json_from_DocXchain来获取 pageNo ---------#
pageNo_bbox_from_DocXChain = []
xf_json = json_from_DocXchain_obj
horizontal_scale_ratio, vertical_scale_ratio = get_scale_ratio(xf_json, page)
# {0: 'title', # 标题
# 1: 'figure', # 图片
# 2: 'plain text', # 文本
# 3: 'header', # 页眉
# 4: 'page number', # 页码
# 5: 'footnote', # 脚注
# 6: 'footer', # 页脚
# 7: 'table', # 表格
# 8: 'table caption', # 表格描述
# 9: 'figure caption', # 图片描述
# 10: 'equation', # 公式
# 11: 'full column', # 单栏
# 12: 'sub column', # 多栏
# 13: 'embedding', # 嵌入公式
# 14: 'isolated'} # 单行公式
for xf in xf_json['layout_dets']:
L = xf['poly'][0] / horizontal_scale_ratio
U = xf['poly'][1] / vertical_scale_ratio
R = xf['poly'][2] / horizontal_scale_ratio
D = xf['poly'][5] / vertical_scale_ratio
# L += pageL # 有的页面,artBox偏移了。不在(0,0)
# R += pageL
# U += pageU
# D += pageU
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
if xf['category_id'] == 4 and xf['score'] >= 0.3:
pageNo_bbox_from_DocXChain.append((L, U, R, D))
pageNo_final_names = []
pageNo_final_bboxs = []
pageNo_ID = 0
for L, U, R, D in pageNo_bbox_from_DocXChain:
# cur_pageNo = page.get_pixmap(clip=(L,U,R,D))
new_pageNo_name = "pageNo_{}_{}.png".format(page_ID, pageNo_ID) # 页码name
# cur_pageNo.save(res_dir_path + '/' + new_pageNo_name) # 把页码存储在新建的文件夹,并命名
pageNo_final_names.append(new_pageNo_name) # 把页码的名字存在list中
pageNo_final_bboxs.append((L, U, R, D))
pageNo_ID += 1
pageNo_final_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
curPage_all_pageNo_bboxs = pageNo_final_bboxs
return curPage_all_pageNo_bboxs
from magic_pdf.libs.commons import fitz # pyMuPDF库
def parse_tables(page_ID: int, page: fitz.Page, json_from_DocXchain_obj: dict):
"""
:param page_ID: int类型,当前page在当前pdf文档中是第page_D页。
:param page :fitz读取的当前页的内容
:param res_dir_path: str类型,是每一个pdf文档,在当前.py文件的目录下生成一个与pdf文档同名的文件夹,res_dir_path就是文件夹的dir
:param json_from_DocXchain_obj: dict类型,把pdf文档送入DocXChain模型中后,提取bbox,结果保存到pdf文档同名文件夹下的 page_ID.json文件中了。json_from_DocXchain_obj就是打开后的dict
"""
DPI = 72 # use this resolution
pix = page.get_pixmap(dpi=DPI)
pageL = 0
pageR = int(pix.w)
pageU = 0
pageD = int(pix.h)
#--------- 通过json_from_DocXchain来获取 table ---------#
table_bbox_from_DocXChain = []
xf_json = json_from_DocXchain_obj
width_from_json = xf_json['page_info']['width']
height_from_json = xf_json['page_info']['height']
LR_scaleRatio = width_from_json / (pageR - pageL)
UD_scaleRatio = height_from_json / (pageD - pageU)
for xf in xf_json['layout_dets']:
# {0: 'title', 1: 'figure', 2: 'plain text', 3: 'header', 4: 'page number', 5: 'footnote', 6: 'footer', 7: 'table', 8: 'table caption', 9: 'figure caption', 10: 'equation', 11: 'full column', 12: 'sub column'}
# 13: 'embedding', # 嵌入公式
# 14: 'isolated'} # 单行公式
L = xf['poly'][0] / LR_scaleRatio
U = xf['poly'][1] / UD_scaleRatio
R = xf['poly'][2] / LR_scaleRatio
D = xf['poly'][5] / UD_scaleRatio
# L += pageL # 有的页面,artBox偏移了。不在(0,0)
# R += pageL
# U += pageU
# D += pageU
L, R = min(L, R), max(L, R)
U, D = min(U, D), max(U, D)
if xf['category_id'] == 7 and xf['score'] >= 0.3:
table_bbox_from_DocXChain.append((L, U, R, D))
table_final_names = []
table_final_bboxs = []
table_ID = 0
for L, U, R, D in table_bbox_from_DocXChain:
# cur_table = page.get_pixmap(clip=(L,U,R,D))
new_table_name = "table_{}_{}.png".format(page_ID, table_ID) # 表格name
# cur_table.save(res_dir_path + '/' + new_table_name) # 把表格存出在新建的文件夹,并命名
table_final_names.append(new_table_name) # 把表格的名字存在list中,方便在md中插入引用
table_final_bboxs.append((L, U, R, D))
table_ID += 1
table_final_bboxs.sort(key = lambda LURD: (LURD[1], LURD[0]))
curPage_all_table_bboxs = table_final_bboxs
return curPage_all_table_bboxs
"""对pymupdf返回的结构里的公式进行替换,替换为模型识别的公式结果."""
import json
import os
from pathlib import Path
from loguru import logger
from magic_pdf.config.ocr_content_type import ContentType
from magic_pdf.libs.commons import fitz
TYPE_INLINE_EQUATION = ContentType.InlineEquation
TYPE_INTERLINE_EQUATION = ContentType.InterlineEquation
def combine_chars_to_pymudict(block_dict, char_dict):
"""把block级别的pymupdf 结构里加入char结构."""
# 因为block_dict 被裁剪过,因此先把他和char_dict文字块对齐,才能进行补充
char_map = {tuple(item['bbox']): item for item in char_dict}
for i in range(len(block_dict)): # block
block = block_dict[i]
key = block['bbox']
char_dict_item = char_map[tuple(key)]
char_dict_map = {tuple(item['bbox']): item for item in char_dict_item['lines']}
for j in range(len(block['lines'])):
lines = block['lines'][j]
with_char_lines = char_dict_map[lines['bbox']]
for k in range(len(lines['spans'])):
spans = lines['spans'][k]
try:
chars = with_char_lines['spans'][k]['chars']
except Exception:
logger.error(char_dict[i]['lines'][j])
spans['chars'] = chars
return block_dict
def calculate_overlap_area_2_minbox_area_ratio(bbox1, min_bbox):
"""计算box1和box2的重叠面积占最小面积的box的比例."""
# Determine the coordinates of the intersection rectangle
x_left = max(bbox1[0], min_bbox[0])
y_top = max(bbox1[1], min_bbox[1])
x_right = min(bbox1[2], min_bbox[2])
y_bottom = min(bbox1[3], min_bbox[3])
if x_right < x_left or y_bottom < y_top:
return 0.0
# The area of overlap area
intersection_area = (x_right - x_left) * (y_bottom - y_top)
min_box_area = (min_bbox[3] - min_bbox[1]) * (min_bbox[2] - min_bbox[0])
if min_box_area == 0:
return 0
else:
return intersection_area / min_box_area
def _is_xin(bbox1, bbox2):
area1 = abs(bbox1[2] - bbox1[0]) * abs(bbox1[3] - bbox1[1])
area2 = abs(bbox2[2] - bbox2[0]) * abs(bbox2[3] - bbox2[1])
if area1 < area2:
ratio = calculate_overlap_area_2_minbox_area_ratio(bbox2, bbox1)
else:
ratio = calculate_overlap_area_2_minbox_area_ratio(bbox1, bbox2)
return ratio > 0.6
def remove_text_block_in_interline_equation_bbox(interline_bboxes, text_blocks):
"""消除掉整个块都在行间公式块内部的文本块."""
for eq_bbox in interline_bboxes:
removed_txt_blk = []
for text_blk in text_blocks:
text_bbox = text_blk['bbox']
if (
calculate_overlap_area_2_minbox_area_ratio(eq_bbox['bbox'], text_bbox)
>= 0.7
):
removed_txt_blk.append(text_blk)
for blk in removed_txt_blk:
text_blocks.remove(blk)
return text_blocks
def _is_in_or_part_overlap(box1, box2) -> bool:
"""两个bbox是否有部分重叠或者包含."""
if box1 is None or box2 is None:
return False
x0_1, y0_1, x1_1, y1_1 = box1
x0_2, y0_2, x1_2, y1_2 = box2
return not (
x1_1 < x0_2 # box1在box2的左边
or x0_1 > x1_2 # box1在box2的右边
or y1_1 < y0_2 # box1在box2的上边
or y0_1 > y1_2
) # box1在box2的下边
def remove_text_block_overlap_interline_equation_bbox(
interline_eq_bboxes, pymu_block_list
):
"""消除掉行行内公式有部分重叠的文本块的内容。 同时重新计算消除重叠之后文本块的大小."""
deleted_block = []
for text_block in pymu_block_list:
deleted_line = []
for line in text_block['lines']:
deleted_span = []
for span in line['spans']:
deleted_chars = []
for char in span['chars']:
if any(
[
(
calculate_overlap_area_2_minbox_area_ratio(
eq_bbox['bbox'], char['bbox']
)
> 0.5
)
for eq_bbox in interline_eq_bboxes
]
):
deleted_chars.append(char)
# 检查span里没有char则删除这个span
for char in deleted_chars:
span['chars'].remove(char)
# 重新计算这个span的大小
if len(span['chars']) == 0: # 删除这个span
deleted_span.append(span)
else:
span['bbox'] = (
min([b['bbox'][0] for b in span['chars']]),
min([b['bbox'][1] for b in span['chars']]),
max([b['bbox'][2] for b in span['chars']]),
max([b['bbox'][3] for b in span['chars']]),
)
# 检查这个span
for span in deleted_span:
line['spans'].remove(span)
if len(line['spans']) == 0: # 删除这个line
deleted_line.append(line)
else:
line['bbox'] = (
min([b['bbox'][0] for b in line['spans']]),
min([b['bbox'][1] for b in line['spans']]),
max([b['bbox'][2] for b in line['spans']]),
max([b['bbox'][3] for b in line['spans']]),
)
# 检查这个block是否可以删除
for line in deleted_line:
text_block['lines'].remove(line)
if len(text_block['lines']) == 0: # 删除block
deleted_block.append(text_block)
else:
text_block['bbox'] = (
min([b['bbox'][0] for b in text_block['lines']]),
min([b['bbox'][1] for b in text_block['lines']]),
max([b['bbox'][2] for b in text_block['lines']]),
max([b['bbox'][3] for b in text_block['lines']]),
)
# 检查text block删除
for block in deleted_block:
pymu_block_list.remove(block)
if len(pymu_block_list) == 0:
return []
return pymu_block_list
def insert_interline_equations_textblock(interline_eq_bboxes, pymu_block_list):
"""在行间公式对应的地方插上一个伪造的block."""
for eq in interline_eq_bboxes:
bbox = eq['bbox']
latex_content = eq['latex']
text_block = {
'number': len(pymu_block_list),
'type': 0,
'bbox': bbox,
'lines': [
{
'spans': [
{
'size': 9.962599754333496,
'type': TYPE_INTERLINE_EQUATION,
'flags': 4,
'font': TYPE_INTERLINE_EQUATION,
'color': 0,
'ascender': 0.9409999847412109,
'descender': -0.3050000071525574,
'latex': latex_content,
'origin': [bbox[0], bbox[1]],
'bbox': bbox,
}
],
'wmode': 0,
'dir': [1.0, 0.0],
'bbox': bbox,
}
],
}
pymu_block_list.append(text_block)
def x_overlap_ratio(box1, box2):
a, _, c, _ = box1
e, _, g, _ = box2
# 计算重叠宽度
overlap_x = max(min(c, g) - max(a, e), 0)
# 计算box1的宽度
width1 = g - e
# 计算重叠比例
overlap_ratio = overlap_x / width1 if width1 != 0 else 0
return overlap_ratio
def __is_x_dir_overlap(bbox1, bbox2):
return not (bbox1[2] < bbox2[0] or bbox1[0] > bbox2[2])
def __y_overlap_ratio(box1, box2):
""""""
_, b, _, d = box1
_, f, _, h = box2
# 计算重叠高度
overlap_y = max(min(d, h) - max(b, f), 0)
# 计算box1的高度
height1 = d - b
# 计算重叠比例
overlap_ratio = overlap_y / height1 if height1 != 0 else 0
return overlap_ratio
def replace_line_v2(eqinfo, line):
"""扫描这一行所有的和公式框X方向重叠的char,然后计算char的左、右x0, x1,位于这个区间内的span删除掉。
最后与这个x0,x1有相交的span0, span1内部进行分割。"""
first_overlap_span = -1
first_overlap_span_idx = -1
last_overlap_span = -1
delete_chars = []
for i in range(0, len(line['spans'])):
if 'chars' not in line['spans'][i]:
continue
if line['spans'][i].get('_type', None) is not None:
continue # 忽略,因为已经是插入的伪造span公式了
for char in line['spans'][i]['chars']:
if __is_x_dir_overlap(eqinfo['bbox'], char['bbox']):
line_txt = ''
for span in line['spans']:
span_txt = '<span>'
for ch in span['chars']:
span_txt = span_txt + ch['c']
span_txt = span_txt + '</span>'
line_txt = line_txt + span_txt
if first_overlap_span_idx == -1:
first_overlap_span = line['spans'][i]
first_overlap_span_idx = i
last_overlap_span = line['spans'][i]
delete_chars.append(char)
# 第一个和最后一个char要进行检查,到底属于公式多还是属于正常span多
if len(delete_chars) > 0:
ch0_bbox = delete_chars[0]['bbox']
if x_overlap_ratio(eqinfo['bbox'], ch0_bbox) < 0.51:
delete_chars.remove(delete_chars[0])
if len(delete_chars) > 0:
ch0_bbox = delete_chars[-1]['bbox']
if x_overlap_ratio(eqinfo['bbox'], ch0_bbox) < 0.51:
delete_chars.remove(delete_chars[-1])
# 计算x方向上被删除区间内的char的真实x0, x1
if len(delete_chars):
x0, x1 = (
min([b['bbox'][0] for b in delete_chars]),
max([b['bbox'][2] for b in delete_chars]),
)
else:
# logger.debug(f"行内公式替换没有发生,尝试下一行匹配, eqinfo={eqinfo}")
return False
# 删除位于x0, x1这两个中间的span
delete_span = []
for span in line['spans']:
span_box = span['bbox']
if x0 <= span_box[0] and span_box[2] <= x1:
delete_span.append(span)
for span in delete_span:
line['spans'].remove(span)
equation_span = {
'size': 9.962599754333496,
'type': TYPE_INLINE_EQUATION,
'flags': 4,
'font': TYPE_INLINE_EQUATION,
'color': 0,
'ascender': 0.9409999847412109,
'descender': -0.3050000071525574,
'latex': '',
'origin': [337.1410153102337, 216.0205245153934],
'bbox': eqinfo['bbox'],
}
# equation_span = line['spans'][0].copy()
equation_span['latex'] = eqinfo['latex']
equation_span['bbox'] = [x0, equation_span['bbox'][1], x1, equation_span['bbox'][3]]
equation_span['origin'] = [equation_span['bbox'][0], equation_span['bbox'][1]]
equation_span['chars'] = delete_chars
equation_span['type'] = TYPE_INLINE_EQUATION
equation_span['_eq_bbox'] = eqinfo['bbox']
line['spans'].insert(first_overlap_span_idx + 1, equation_span) # 放入公式
# logger.info(f"==>text is 【{line_txt}】, equation is 【{eqinfo['latex_text']}】")
# 第一个、和最后一个有overlap的span进行分割,然后插入对应的位置
first_span_chars = [
char
for char in first_overlap_span['chars']
if (char['bbox'][2] + char['bbox'][0]) / 2 < x0
]
tail_span_chars = [
char
for char in last_overlap_span['chars']
if (char['bbox'][0] + char['bbox'][2]) / 2 > x1
]
if len(first_span_chars) > 0:
first_overlap_span['chars'] = first_span_chars
first_overlap_span['text'] = ''.join([char['c'] for char in first_span_chars])
first_overlap_span['bbox'] = (
first_overlap_span['bbox'][0],
first_overlap_span['bbox'][1],
max([chr['bbox'][2] for chr in first_span_chars]),
first_overlap_span['bbox'][3],
)
# first_overlap_span['_type'] = "first"
else:
# 删掉
if first_overlap_span not in delete_span:
line['spans'].remove(first_overlap_span)
if len(tail_span_chars) > 0:
min_of_tail_span_x0 = min([chr['bbox'][0] for chr in tail_span_chars])
min_of_tail_span_y0 = min([chr['bbox'][1] for chr in tail_span_chars])
max_of_tail_span_x1 = max([chr['bbox'][2] for chr in tail_span_chars])
max_of_tail_span_y1 = max([chr['bbox'][3] for chr in tail_span_chars])
if last_overlap_span == first_overlap_span: # 这个时候应该插入一个新的
tail_span_txt = ''.join([char['c'] for char in tail_span_chars]) # noqa: F841
last_span_to_insert = last_overlap_span.copy()
last_span_to_insert['chars'] = tail_span_chars
last_span_to_insert['text'] = ''.join(
[char['c'] for char in tail_span_chars]
)
if equation_span['bbox'][2] >= last_overlap_span['bbox'][2]:
last_span_to_insert['bbox'] = (
min_of_tail_span_x0,
min_of_tail_span_y0,
max_of_tail_span_x1,
max_of_tail_span_y1,
)
else:
last_span_to_insert['bbox'] = (
min([chr['bbox'][0] for chr in tail_span_chars]),
last_overlap_span['bbox'][1],
last_overlap_span['bbox'][2],
last_overlap_span['bbox'][3],
)
# 插入到公式对象之后
equation_idx = line['spans'].index(equation_span)
line['spans'].insert(equation_idx + 1, last_span_to_insert) # 放入公式
else: # 直接修改原来的span
last_overlap_span['chars'] = tail_span_chars
last_overlap_span['text'] = ''.join([char['c'] for char in tail_span_chars])
last_overlap_span['bbox'] = (
min([chr['bbox'][0] for chr in tail_span_chars]),
last_overlap_span['bbox'][1],
last_overlap_span['bbox'][2],
last_overlap_span['bbox'][3],
)
else:
# 删掉
if (
last_overlap_span not in delete_span
and last_overlap_span != first_overlap_span
):
line['spans'].remove(last_overlap_span)
remain_txt = ''
for span in line['spans']:
span_txt = '<span>'
for char in span['chars']:
span_txt = span_txt + char['c']
span_txt = span_txt + '</span>'
remain_txt = remain_txt + span_txt
# logger.info(f"<== succ replace, text is 【{remain_txt}】, equation is 【{eqinfo['latex_text']}】")
return True
def replace_eq_blk(eqinfo, text_block):
"""替换行内公式."""
for line in text_block['lines']:
line_bbox = line['bbox']
if (
_is_xin(eqinfo['bbox'], line_bbox)
or __y_overlap_ratio(eqinfo['bbox'], line_bbox) > 0.6
): # 定位到行, 使用y方向重合率是因为有的时候,一个行的宽度会小于公式位置宽度:行很高,公式很窄,
replace_succ = replace_line_v2(eqinfo, line)
if not replace_succ: # 有的时候,一个pdf的line高度从API里会计算的有问题,因此在行内span级别会替换不成功,这就需要继续重试下一行
continue
else:
break
else:
return False
return True
def replace_inline_equations(inline_equation_bboxes, raw_text_blocks):
"""替换行内公式."""
for eqinfo in inline_equation_bboxes:
eqbox = eqinfo['bbox']
for blk in raw_text_blocks:
if _is_xin(eqbox, blk['bbox']):
if not replace_eq_blk(eqinfo, blk):
logger.warning(f'行内公式没有替换成功:{eqinfo} ')
else:
break
return raw_text_blocks
def remove_chars_in_text_blocks(text_blocks):
"""删除text_blocks里的char."""
for blk in text_blocks:
for line in blk['lines']:
for span in line['spans']:
_ = span.pop('chars', 'no such key')
return text_blocks
def replace_equations_in_textblock(
raw_text_blocks, inline_equation_bboxes, interline_equation_bboxes
):
"""替换行间和和行内公式为latex."""
raw_text_blocks = remove_text_block_in_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # 消除重叠:第一步,在公式内部的
raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # 消重,第二步,和公式覆盖的
insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
raw_text_blocks = replace_inline_equations(inline_equation_bboxes, raw_text_blocks)
return raw_text_blocks
def draw_block_on_pdf_with_txt_replace_eq_bbox(json_path, pdf_path):
""""""
new_pdf = f'{Path(pdf_path).parent}/{Path(pdf_path).stem}.step3-消除行内公式text_block.pdf'
with open(json_path, 'r', encoding='utf-8') as f:
obj = json.loads(f.read())
if os.path.exists(new_pdf):
os.remove(new_pdf)
new_doc = fitz.open('')
doc = fitz.open(pdf_path) # noqa: F841
new_doc = fitz.open(pdf_path)
for i in range(len(new_doc)):
page = new_doc[i]
inline_equation_bboxes = obj[f'page_{i}']['inline_equations']
interline_equation_bboxes = obj[f'page_{i}']['interline_equations']
raw_text_blocks = obj[f'page_{i}']['preproc_blocks']
raw_text_blocks = remove_text_block_in_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # 消除重叠:第一步,在公式内部的
raw_text_blocks = remove_text_block_overlap_interline_equation_bbox(
interline_equation_bboxes, raw_text_blocks
) # 消重,第二步,和公式覆盖的
insert_interline_equations_textblock(interline_equation_bboxes, raw_text_blocks)
raw_text_blocks = replace_inline_equations(
inline_equation_bboxes, raw_text_blocks
)
# 为了检验公式是否重复,把每一行里,含有公式的span背景改成黄色的
color_map = [fitz.pdfcolor['blue'], fitz.pdfcolor['green']] # noqa: F841
j = 0 # noqa: F841
for blk in raw_text_blocks:
for i, line in enumerate(blk['lines']):
# line_box = line['bbox']
# shape = page.new_shape()
# shape.draw_rect(line_box)
# shape.finish(color=fitz.pdfcolor['red'], fill=color_map[j%2], fill_opacity=0.3)
# shape.commit()
# j = j+1
for i, span in enumerate(line['spans']):
shape_page = page.new_shape()
span_type = span.get('_type')
color = fitz.pdfcolor['blue']
if span_type == 'first':
color = fitz.pdfcolor['blue']
elif span_type == 'tail':
color = fitz.pdfcolor['green']
elif span_type == TYPE_INLINE_EQUATION:
color = fitz.pdfcolor['black']
else:
color = None
b = span['bbox']
shape_page.draw_rect(b)
shape_page.finish(color=None, fill=color, fill_opacity=0.3)
shape_page.commit()
new_doc.save(new_pdf)
logger.info(f'save ok {new_pdf}')
final_json = json.dumps(obj, ensure_ascii=False, indent=2)
with open('equations_test/final_json.json', 'w') as f:
f.write(final_json)
return new_pdf
if __name__ == '__main__':
# draw_block_on_pdf_with_txt_replace_eq_bbox(new_json_path, equation_color_pdf)
pass
import re
from magic_pdf.libs.boxbase import _is_in_or_part_overlap, _is_part_overlap, find_bottom_nearest_text_bbox, find_left_nearest_text_bbox, find_right_nearest_text_bbox, find_top_nearest_text_bbox
from magic_pdf.libs.textbase import get_text_block_base_info
def fix_image_vertical(image_bboxes:list, text_blocks:list):
"""
修正图片的位置
如果图片与文字block发生一定重叠(也就是图片切到了一部分文字),那么减少图片边缘,让文字和图片不再重叠。
只对垂直方向进行。
"""
for image_bbox in image_bboxes:
for text_block in text_blocks:
text_bbox = text_block["bbox"]
if _is_part_overlap(text_bbox, image_bbox) and any([text_bbox[0]>=image_bbox[0] and text_bbox[2]<=image_bbox[2], text_bbox[0]<=image_bbox[0] and text_bbox[2]>=image_bbox[2]]):
if text_bbox[1] < image_bbox[1]:#在图片上方
image_bbox[1] = text_bbox[3]+1
elif text_bbox[3]>image_bbox[3]:#在图片下方
image_bbox[3] = text_bbox[1]-1
return image_bboxes
def __merge_if_common_edge(bbox1, bbox2):
x_min_1, y_min_1, x_max_1, y_max_1 = bbox1
x_min_2, y_min_2, x_max_2, y_max_2 = bbox2
# 检查是否有公共的水平边
if y_min_1 == y_min_2 or y_max_1 == y_max_2:
# 确保一个框的x范围在另一个框的x范围内
if max(x_min_1, x_min_2) <= min(x_max_1, x_max_2):
return [min(x_min_1, x_min_2), min(y_min_1, y_min_2), max(x_max_1, x_max_2), max(y_max_1, y_max_2)]
# 检查是否有公共的垂直边
if x_min_1 == x_min_2 or x_max_1 == x_max_2:
# 确保一个框的y范围在另一个框的y范围内
if max(y_min_1, y_min_2) <= min(y_max_1, y_max_2):
return [min(x_min_1, x_min_2), min(y_min_1, y_min_2), max(x_max_1, x_max_2), max(y_max_1, y_max_2)]
# 如果没有公共边
return None
def fix_seperated_image(image_bboxes:list):
"""
如果2个图片有一个边重叠,那么合并2个图片
"""
new_images = []
droped_img_idx = []
for i in range(0, len(image_bboxes)):
for j in range(i+1, len(image_bboxes)):
new_img = __merge_if_common_edge(image_bboxes[i], image_bboxes[j])
if new_img is not None:
new_images.append(new_img)
droped_img_idx.append(i)
droped_img_idx.append(j)
break
for i in range(0, len(image_bboxes)):
if i not in droped_img_idx:
new_images.append(image_bboxes[i])
return new_images
def __check_img_title_pattern(text):
"""
检查文本段是否是表格的标题
"""
patterns = [r"^(fig|figure).*", r"^(scheme).*"]
text = text.strip()
for pattern in patterns:
match = re.match(pattern, text, re.IGNORECASE)
if match:
return True
return False
def __get_fig_caption_text(text_block):
txt = " ".join(span['text'] for line in text_block['lines'] for span in line['spans'])
line_cnt = len(text_block['lines'])
txt = txt.replace("Ž . ", '')
return txt, line_cnt
def __find_and_extend_bottom_caption(text_block, pymu_blocks, image_box):
"""
继续向下方寻找和图片caption字号,字体,颜色一样的文字框,合并入caption。
text_block是已经找到的图片catpion(这个caption可能不全,多行被划分到多个pymu block里了)
"""
combined_image_caption_text_block = list(text_block.copy()['bbox'])
base_font_color, base_font_size, base_font_type = get_text_block_base_info(text_block)
while True:
tb_add = find_bottom_nearest_text_bbox(pymu_blocks, combined_image_caption_text_block)
if not tb_add:
break
tb_font_color, tb_font_size, tb_font_type = get_text_block_base_info(tb_add)
if tb_font_color==base_font_color and tb_font_size==base_font_size and tb_font_type==base_font_type:
combined_image_caption_text_block[0] = min(combined_image_caption_text_block[0], tb_add['bbox'][0])
combined_image_caption_text_block[2] = max(combined_image_caption_text_block[2], tb_add['bbox'][2])
combined_image_caption_text_block[3] = tb_add['bbox'][3]
else:
break
image_box[0] = min(image_box[0], combined_image_caption_text_block[0])
image_box[1] = min(image_box[1], combined_image_caption_text_block[1])
image_box[2] = max(image_box[2], combined_image_caption_text_block[2])
image_box[3] = max(image_box[3], combined_image_caption_text_block[3])
text_block['_image_caption'] = True
def include_img_title(pymu_blocks, image_bboxes: list):
"""
向上方和下方寻找符合图片title的文本block,合并到图片里
如果图片上下都有fig的情况怎么办?寻找标题距离最近的那个。
---
增加对左侧和右侧图片标题的寻找
"""
for tb in image_bboxes:
# 优先找下方的
max_find_cnt = 3 # 向上,向下最多找3个就停止
temp_box = tb.copy()
while max_find_cnt>0:
text_block_btn = find_bottom_nearest_text_bbox(pymu_blocks, temp_box)
if text_block_btn:
txt, line_cnt = __get_fig_caption_text(text_block_btn)
if len(txt.strip())>0:
if not __check_img_title_pattern(txt) and max_find_cnt>0 and line_cnt<3: # 设置line_cnt<=2目的是为了跳过子标题,或者有时候图片下方文字没有被图片识别模型放入图片里
max_find_cnt = max_find_cnt - 1
temp_box[3] = text_block_btn['bbox'][3]
continue
else:
break
else:
temp_box[3] = text_block_btn['bbox'][3] # 宽度不变,扩大
max_find_cnt = max_find_cnt - 1
else:
break
max_find_cnt = 3 # 向上,向下最多找3个就停止
temp_box = tb.copy()
while max_find_cnt>0:
text_block_top = find_top_nearest_text_bbox(pymu_blocks, temp_box)
if text_block_top:
txt, line_cnt = __get_fig_caption_text(text_block_top)
if len(txt.strip())>0:
if not __check_img_title_pattern(txt) and max_find_cnt>0 and line_cnt <3:
max_find_cnt = max_find_cnt - 1
temp_box[1] = text_block_top['bbox'][1]
continue
else:
break
else:
b = text_block_top['bbox']
temp_box[1] = b[1] # 宽度不变,扩大
max_find_cnt = max_find_cnt - 1
else:
break
if text_block_btn and text_block_top and text_block_btn.get("_image_caption", False) is False and text_block_top.get("_image_caption", False) is False :
btn_text, _ = __get_fig_caption_text(text_block_btn)
top_text, _ = __get_fig_caption_text(text_block_top)
if __check_img_title_pattern(btn_text) and __check_img_title_pattern(top_text):
# 取距离图片最近的
btn_text_distance = text_block_btn['bbox'][1] - tb[3]
top_text_distance = tb[1] - text_block_top['bbox'][3]
if btn_text_distance<top_text_distance: # caption在下方
__find_and_extend_bottom_caption(text_block_btn, pymu_blocks, tb)
else:
text_block = text_block_top
tb[0] = min(tb[0], text_block['bbox'][0])
tb[1] = min(tb[1], text_block['bbox'][1])
tb[2] = max(tb[2], text_block['bbox'][2])
tb[3] = max(tb[3], text_block['bbox'][3])
text_block_btn['_image_caption'] = True
continue
text_block = text_block_btn # find_bottom_nearest_text_bbox(pymu_blocks, tb)
if text_block and text_block.get("_image_caption", False) is False:
first_text_line, _ = __get_fig_caption_text(text_block)
if __check_img_title_pattern(first_text_line):
# 发现特征之后,继续向相同方向寻找(想同颜色,想同大小,想同字体)的textblock
__find_and_extend_bottom_caption(text_block, pymu_blocks, tb)
continue
text_block = text_block_top # find_top_nearest_text_bbox(pymu_blocks, tb)
if text_block and text_block.get("_image_caption", False) is False:
first_text_line, _ = __get_fig_caption_text(text_block)
if __check_img_title_pattern(first_text_line):
tb[0] = min(tb[0], text_block['bbox'][0])
tb[1] = min(tb[1], text_block['bbox'][1])
tb[2] = max(tb[2], text_block['bbox'][2])
tb[3] = max(tb[3], text_block['bbox'][3])
text_block['_image_caption'] = True
continue
"""向左、向右寻找,暂时只寻找一次"""
left_text_block = find_left_nearest_text_bbox(pymu_blocks, tb)
if left_text_block and left_text_block.get("_image_caption", False) is False:
first_text_line, _ = __get_fig_caption_text(left_text_block)
if __check_img_title_pattern(first_text_line):
tb[0] = min(tb[0], left_text_block['bbox'][0])
tb[1] = min(tb[1], left_text_block['bbox'][1])
tb[2] = max(tb[2], left_text_block['bbox'][2])
tb[3] = max(tb[3], left_text_block['bbox'][3])
left_text_block['_image_caption'] = True
continue
right_text_block = find_right_nearest_text_bbox(pymu_blocks, tb)
if right_text_block and right_text_block.get("_image_caption", False) is False:
first_text_line, _ = __get_fig_caption_text(right_text_block)
if __check_img_title_pattern(first_text_line):
tb[0] = min(tb[0], right_text_block['bbox'][0])
tb[1] = min(tb[1], right_text_block['bbox'][1])
tb[2] = max(tb[2], right_text_block['bbox'][2])
tb[3] = max(tb[3], right_text_block['bbox'][3])
right_text_block['_image_caption'] = True
continue
return image_bboxes
def combine_images(image_bboxes:list):
"""
合并图片,如果图片有重叠,那么合并
"""
new_images = []
droped_img_idx = []
for i in range(0, len(image_bboxes)):
for j in range(i+1, len(image_bboxes)):
if j not in droped_img_idx and _is_in_or_part_overlap(image_bboxes[i], image_bboxes[j]):
# 合并
image_bboxes[i][0], image_bboxes[i][1],image_bboxes[i][2],image_bboxes[i][3] = min(image_bboxes[i][0], image_bboxes[j][0]), min(image_bboxes[i][1], image_bboxes[j][1]), max(image_bboxes[i][2], image_bboxes[j][2]), max(image_bboxes[i][3], image_bboxes[j][3])
droped_img_idx.append(j)
for i in range(0, len(image_bboxes)):
if i not in droped_img_idx:
new_images.append(image_bboxes[i])
return new_images
\ No newline at end of file
from magic_pdf.libs.commons import fitz # pyMuPDF库
import re
from magic_pdf.libs.boxbase import _is_in_or_part_overlap, _is_part_overlap, find_bottom_nearest_text_bbox, find_left_nearest_text_bbox, find_right_nearest_text_bbox, find_top_nearest_text_bbox # json
## version 2
def get_merged_line(page):
"""
这个函数是为了从pymuPDF中提取出的矢量里筛出水平的横线,并且将断开的线段进行了合并。
:param page :fitz读取的当前页的内容
"""
drawings_bbox = []
drawings_line = []
drawings = page.get_drawings() # 提取所有的矢量
for p in drawings:
drawings_bbox.append(p["rect"].irect) # (L, U, R, D)
lines = []
for L, U, R, D in drawings_bbox:
if abs(D - U) <= 3: # 筛出水平的横线
lines.append((L, U, R, D))
U_groups = []
visited = [False for _ in range(len(lines))]
for i, (L1, U1, R1, D1) in enumerate(lines):
if visited[i] == True:
continue
tmp_g = [(L1, U1, R1, D1)]
for j, (L2, U2, R2, D2) in enumerate(lines):
if i == j:
continue
if visited[j] == True:
continue
if max(U1, D1, U2, D2) - min(U1, D1, U2, D2) <= 5: # 把高度一致的线放进一个group
tmp_g.append((L2, U2, R2, D2))
visited[j] = True
U_groups.append(tmp_g)
res = []
for group in U_groups:
group.sort(key = lambda LURD: (LURD[0], LURD[2]))
LL, UU, RR, DD = group[0]
for i, (L1, U1, R1, D1) in enumerate(group):
if (L1 - RR) >= 5:
cur_line = (LL, UU, RR, DD)
res.append(cur_line)
LL = L1
else:
RR = max(RR, R1)
cur_line = (LL, UU, RR, DD)
res.append(cur_line)
return res
def fix_tables(page: fitz.Page, table_bboxes: list, include_table_title: bool, scan_line_num: int):
"""
:param page :fitz读取的当前页的内容
:param table_bboxes: list类型,每一个元素是一个元祖 (L, U, R, D)
:param include_table_title: 是否将表格的标题也圈进来
:param scan_line_num: 在与表格框临近的上下几个文本框里扫描搜索标题
"""
drawings_lines = get_merged_line(page)
fix_table_bboxes = []
for table in table_bboxes:
(L, U, R, D) = table
fix_table_L = []
fix_table_U = []
fix_table_R = []
fix_table_D = []
width = R - L
width_range = width * 0.1 # 只看距离表格整体宽度10%之内偏差的线
height = D - U
height_range = height * 0.1 # 只看距离表格整体高度10%之内偏差的线
for line in drawings_lines:
if (L - width_range) <= line[0] <= (L + width_range) and (R - width_range) <= line[2] <= (R + width_range): # 相近的宽度
if (U - height_range) < line[1] < (U + height_range): # 上边界,在一定的高度范围内
fix_table_U.append(line[1])
fix_table_L.append(line[0])
fix_table_R.append(line[2])
elif (D - height_range) < line[1] < (D + height_range): # 下边界,在一定的高度范围内
fix_table_D.append(line[1])
fix_table_L.append(line[0])
fix_table_R.append(line[2])
if fix_table_U:
U = min(fix_table_U)
if fix_table_D:
D = max(fix_table_D)
if fix_table_L:
L = min(fix_table_L)
if fix_table_R:
R = max(fix_table_R)
if include_table_title: # 需要将表格标题包括
text_blocks = page.get_text("dict", flags=fitz.TEXTFLAGS_TEXT)["blocks"] # 所有的text的block
incolumn_text_blocks = [block for block in text_blocks if not ((block['bbox'][0] < L and block['bbox'][2] < L) or (block['bbox'][0] > R and block['bbox'][2] > R))] # 将与表格完全没有任何遮挡的文字筛除掉(比如另一栏的文字)
upper_text_blocks = [block for block in incolumn_text_blocks if (U - block['bbox'][3]) > 0] # 将在表格线以上的text block筛选出来
sorted_filtered_text_blocks = sorted(upper_text_blocks, key=lambda x: (U - x['bbox'][3], x['bbox'][0])) # 按照text block的下边界距离表格上边界的距离升序排序,如果是同一个高度,则先左再右
for idx in range(scan_line_num):
if idx+1 <= len(sorted_filtered_text_blocks):
line_temp = sorted_filtered_text_blocks[idx]['lines']
if line_temp:
text = line_temp[0]['spans'][0]['text'] # 提取出第一个span里的text内容
check_en = re.match('Table', text) # 检查是否有Table开头的(英文)
check_ch = re.match('表', text) # 检查是否有Table开头的(中文)
if check_en or check_ch:
if sorted_filtered_text_blocks[idx]['bbox'][1] < D: # 以防出现负的bbox
U = sorted_filtered_text_blocks[idx]['bbox'][1]
fix_table_bboxes.append([L-2, U-2, R+2, D+2])
return fix_table_bboxes
def __check_table_title_pattern(text):
"""
检查文本段是否是表格的标题
"""
patterns = [r'^table\s\d+']
for pattern in patterns:
match = re.match(pattern, text, re.IGNORECASE)
if match:
return True
else:
return False
def fix_table_text_block(pymu_blocks, table_bboxes: list):
"""
调整table, 如果table和上下的text block有相交区域,则将table的上下边界调整到text block的上下边界
例如 tmp/unittest/unittest_pdf/纯2列_ViLT_6_文字 表格.pdf
"""
for tb in table_bboxes:
(L, U, R, D) = tb
for block in pymu_blocks:
if _is_in_or_part_overlap((L, U, R, D), block['bbox']):
txt = " ".join(span['text'] for line in block['lines'] for span in line['spans'])
if not __check_table_title_pattern(txt) and block.get("_table", False) is False: # 如果是table的title,那么不调整。因为下一步会统一调整,如果这里进行了调整,后面的调整会造成调整到其他table的title上(在连续出现2个table的情况下)。
tb[0] = min(tb[0], block['bbox'][0])
tb[1] = min(tb[1], block['bbox'][1])
tb[2] = max(tb[2], block['bbox'][2])
tb[3] = max(tb[3], block['bbox'][3])
block['_table'] = True # 占位,防止其他table再次占用
"""如果是个table的title,但是有部分重叠,那么修正这个title,使得和table不重叠"""
if _is_part_overlap(tb, block['bbox']) and __check_table_title_pattern(txt):
block['bbox'] = list(block['bbox'])
if block['bbox'][3] > U:
block['bbox'][3] = U-1
if block['bbox'][1] < D:
block['bbox'][1] = D+1
return table_bboxes
def __get_table_caption_text(text_block):
txt = " ".join(span['text'] for line in text_block['lines'] for span in line['spans'])
line_cnt = len(text_block['lines'])
txt = txt.replace("Ž . ", '')
return txt, line_cnt
def include_table_title(pymu_blocks, table_bboxes: list):
"""
把表格的title也包含进来,扩展到table_bbox上
"""
for tb in table_bboxes:
max_find_cnt = 3 # 上上最多找3次
temp_box = tb.copy()
while max_find_cnt>0:
text_block_top = find_top_nearest_text_bbox(pymu_blocks, temp_box)
if text_block_top:
txt, line_cnt = __get_table_caption_text(text_block_top)
if len(txt.strip())>0:
if not __check_table_title_pattern(txt) and max_find_cnt>0 and line_cnt<3:
max_find_cnt = max_find_cnt -1
temp_box[1] = text_block_top['bbox'][1]
continue
else:
break
else:
temp_box[1] = text_block_top['bbox'][1] # 宽度不变,扩大
max_find_cnt = max_find_cnt - 1
else:
break
max_find_cnt = 3 # 向下找
temp_box = tb.copy()
while max_find_cnt>0:
text_block_bottom = find_bottom_nearest_text_bbox(pymu_blocks, temp_box)
if text_block_bottom:
txt, line_cnt = __get_table_caption_text(text_block_bottom)
if len(txt.strip())>0:
if not __check_table_title_pattern(txt) and max_find_cnt>0 and line_cnt<3:
max_find_cnt = max_find_cnt - 1
temp_box[3] = text_block_bottom['bbox'][3]
continue
else:
break
else:
temp_box[3] = text_block_bottom['bbox'][3]
max_find_cnt = max_find_cnt - 1
else:
break
if text_block_top and text_block_bottom and text_block_top.get("_table_caption", False) is False and text_block_bottom.get("_table_caption", False) is False :
btn_text, _ = __get_table_caption_text(text_block_bottom)
top_text, _ = __get_table_caption_text(text_block_top)
if __check_table_title_pattern(btn_text) and __check_table_title_pattern(top_text): # 上下都有一个tbale的caption
# 取距离最近的
btn_text_distance = text_block_bottom['bbox'][1] - tb[3]
top_text_distance = tb[1] - text_block_top['bbox'][3]
text_block = text_block_bottom if btn_text_distance<top_text_distance else text_block_top
tb[0] = min(tb[0], text_block['bbox'][0])
tb[1] = min(tb[1], text_block['bbox'][1])
tb[2] = max(tb[2], text_block['bbox'][2])
tb[3] = max(tb[3], text_block['bbox'][3])
text_block_bottom['_table_caption'] = True
continue
# 如果以上条件都不满足,那么就向下找
text_block = text_block_top
if text_block and text_block.get("_table_caption", False) is False:
first_text_line = " ".join(span['text'] for line in text_block['lines'] for span in line['spans'])
if __check_table_title_pattern(first_text_line) and text_block.get("_table", False) is False:
tb[0] = min(tb[0], text_block['bbox'][0])
tb[1] = min(tb[1], text_block['bbox'][1])
tb[2] = max(tb[2], text_block['bbox'][2])
tb[3] = max(tb[3], text_block['bbox'][3])
text_block['_table_caption'] = True
continue
text_block = text_block_bottom
if text_block and text_block.get("_table_caption", False) is False:
first_text_line, _ = __get_table_caption_text(text_block)
if __check_table_title_pattern(first_text_line) and text_block.get("_table", False) is False:
tb[0] = min(tb[0], text_block['bbox'][0])
tb[1] = min(tb[1], text_block['bbox'][1])
tb[2] = max(tb[2], text_block['bbox'][2])
tb[3] = max(tb[3], text_block['bbox'][3])
text_block['_table_caption'] = True
continue
"""向左、向右寻找,暂时只寻找一次"""
left_text_block = find_left_nearest_text_bbox(pymu_blocks, tb)
if left_text_block and left_text_block.get("_image_caption", False) is False:
first_text_line, _ = __get_table_caption_text(left_text_block)
if __check_table_title_pattern(first_text_line):
tb[0] = min(tb[0], left_text_block['bbox'][0])
tb[1] = min(tb[1], left_text_block['bbox'][1])
tb[2] = max(tb[2], left_text_block['bbox'][2])
tb[3] = max(tb[3], left_text_block['bbox'][3])
left_text_block['_image_caption'] = True
continue
right_text_block = find_right_nearest_text_bbox(pymu_blocks, tb)
if right_text_block and right_text_block.get("_image_caption", False) is False:
first_text_line, _ = __get_table_caption_text(right_text_block)
if __check_table_title_pattern(first_text_line):
tb[0] = min(tb[0], right_text_block['bbox'][0])
tb[1] = min(tb[1], right_text_block['bbox'][1])
tb[2] = max(tb[2], right_text_block['bbox'][2])
tb[3] = max(tb[3], right_text_block['bbox'][3])
right_text_block['_image_caption'] = True
continue
return table_bboxes
\ No newline at end of file
import collections
def get_main_text_font(pdf_docs):
font_names = collections.Counter()
for page in pdf_docs:
blocks = page.get_text('dict')['blocks']
if blocks is not None:
for block in blocks:
lines = block.get('lines')
if lines is not None:
for line in lines:
span_font = [(span['font'], len(span['text'])) for span in line['spans'] if
'font' in span and len(span['text']) > 0]
if span_font:
# main_text_font应该用基于字数最多的字体而不是span级别的统计
# font_names.append(font_name for font_name in span_font)
# block_fonts.append(font_name for font_name in span_font)
for font, count in span_font:
font_names[font] += count
main_text_font = font_names.most_common(1)[0][0]
return main_text_font
from magic_pdf.config.ocr_content_type import BlockType
from magic_pdf.libs.boxbase import (
calculate_iou, calculate_overlap_area_in_bbox1_area_ratio,
calculate_iou,
calculate_overlap_area_in_bbox1_area_ratio,
calculate_vertical_projection_overlap_ratio,
get_minbox_if_overlap_by_ratio)
from magic_pdf.pre_proc.remove_bbox_overlap import \
remove_overlap_between_bbox_for_block
def ocr_prepare_bboxes_for_layout_split(
img_blocks,
table_blocks,
discarded_blocks,
text_blocks,
title_blocks,
interline_equation_blocks,
page_w,
page_h,
):
all_bboxes = []
all_discarded_blocks = []
for image in img_blocks:
x0, y0, x1, y1 = image['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Image,
None,
None,
None,
None,
image['score'],
]
)
for table in table_blocks:
x0, y0, x1, y1 = table['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Table,
None,
None,
None,
None,
table['score'],
]
)
for text in text_blocks:
x0, y0, x1, y1 = text['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Text,
None,
None,
None,
None,
text['score'],
]
)
for title in title_blocks:
x0, y0, x1, y1 = title['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Title,
None,
None,
None,
None,
title['score'],
]
)
for interline_equation in interline_equation_blocks:
x0, y0, x1, y1 = interline_equation['bbox']
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.InterlineEquation,
None,
None,
None,
None,
interline_equation['score'],
]
)
"""block嵌套问题解决"""
"""文本框与标题框重叠,优先信任文本框"""
all_bboxes = fix_text_overlap_title_blocks(all_bboxes)
"""任何框体与舍弃框重叠,优先信任舍弃框"""
all_bboxes = remove_need_drop_blocks(all_bboxes, discarded_blocks)
# interline_equation 与title或text框冲突的情况,分两种情况处理
"""interline_equation框与文本类型框iou比较接近1的时候,信任行间公式框"""
all_bboxes = fix_interline_equation_overlap_text_blocks_with_hi_iou(all_bboxes)
"""interline_equation框被包含在文本类型框内,且interline_equation比文本区块小很多时信任文本框,这时需要舍弃公式框"""
# 通过后续大框套小框逻辑删除
"""discarded_blocks中只保留宽度超过1/3页面宽度的,高度超过10的,处于页面下半50%区域的(限定footnote)"""
for discarded in discarded_blocks:
x0, y0, x1, y1 = discarded['bbox']
all_discarded_blocks.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Discarded,
None,
None,
None,
None,
discarded['score'],
]
)
# 将footnote加入到all_bboxes中,用来计算layout
if (x1 - x0) > (page_w / 3) and (y1 - y0) > 10 and y0 > (page_h / 2):
all_bboxes.append(
[
x0,
y0,
x1,
y1,
None,
None,
None,
BlockType.Footnote,
None,
None,
None,
None,
discarded['score'],
]
)
"""经过以上处理后,还存在大框套小框的情况,则删除小框"""
all_bboxes = remove_overlaps_min_blocks(all_bboxes)
all_discarded_blocks = remove_overlaps_min_blocks(all_discarded_blocks)
"""将剩余的bbox做分离处理,防止后面分layout时出错"""
all_bboxes, drop_reasons = remove_overlap_between_bbox_for_block(all_bboxes)
return all_bboxes, all_discarded_blocks, drop_reasons
get_minbox_if_overlap_by_ratio
)
from magic_pdf.pre_proc.remove_bbox_overlap import remove_overlap_between_bbox_for_block
def add_bboxes(blocks, block_type, bboxes):
......
import fitz
from magic_pdf.layout.layout_sort import get_bboxes_layout
from magic_pdf.libs.boxbase import _is_part_overlap, _is_in
from magic_pdf.libs.coordinate_transform import get_scale_ratio
def get_center_point(bbox):
"""
根据边界框坐标信息,计算出该边界框的中心点坐标。
Args:
bbox (list): 边界框坐标信息,包含四个元素,分别为左上角x坐标、左上角y坐标、右下角x坐标、右下角y坐标。
Returns:
list: 中心点坐标信息,包含两个元素,分别为x坐标和y坐标。
"""
return [(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2]
def get_area(bbox):
"""
根据边界框坐标信息,计算出该边界框的面积。
Args:
bbox (list): 边界框坐标信息,包含四个元素,分别为左上角x坐标、左上角y坐标、右下角x坐标、右下角y坐标。
Returns:
float: 该边界框的面积。
"""
return (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
def adjust_layouts(layout_bboxes, page_boundry, page_id):
# 遍历所有布局框
for i in range(len(layout_bboxes)):
# 遍历当前布局框之后的布局框
for j in range(i + 1, len(layout_bboxes)):
# 判断两个布局框是否重叠
if _is_part_overlap(layout_bboxes[i], layout_bboxes[j]):
# 计算每个布局框的中心点坐标和面积
area_i = get_area(layout_bboxes[i])
area_j = get_area(layout_bboxes[j])
# 较大布局框和较小布局框的赋值
if area_i > area_j:
larger_layout, smaller_layout = layout_bboxes[i], layout_bboxes[j]
else:
larger_layout, smaller_layout = layout_bboxes[j], layout_bboxes[i]
center_large = get_center_point(larger_layout)
center_small = get_center_point(smaller_layout)
# 计算横向和纵向的距离差
distance_x = center_large[0] - center_small[0]
distance_y = center_large[1] - center_small[1]
# 根据距离差判断重叠方向并修正边界
if abs(distance_x) > abs(distance_y): # 左右重叠
if distance_x > 0 and larger_layout[0] < smaller_layout[2]:
larger_layout[0] = smaller_layout[2]+1
if distance_x < 0 and larger_layout[2] > smaller_layout[0]:
larger_layout[2] = smaller_layout[0]-1
else: # 上下重叠
if distance_y > 0 and larger_layout[1] < smaller_layout[3]:
larger_layout[1] = smaller_layout[3]+1
if distance_y < 0 and larger_layout[3] > smaller_layout[1]:
larger_layout[3] = smaller_layout[1]-1
# 排序调整布局边界框列表
new_bboxes = []
for layout_bbox in layout_bboxes:
new_bboxes.append([layout_bbox[0], layout_bbox[1], layout_bbox[2], layout_bbox[3], None, None, None, None, None, None, None, None, None])
layout_bboxes, layout_tree = get_bboxes_layout(new_bboxes, page_boundry, page_id)
# 返回排序调整后的布局边界框列表
return layout_bboxes, layout_tree
def layout_detect(layout_info, page: fitz.Page, ocr_page_info):
"""
对输入的布局信息进行解析,提取出每个子布局的边界框,并对所有子布局进行排序调整。
Args:
layout_info (list): 包含子布局信息的列表,每个子布局信息为字典类型,包含'poly'字段,表示子布局的边界框坐标信息。
Returns:
list: 经过排序调整后的所有子布局边界框信息的列表,每个边界框信息为字典类型,包含'layout_bbox'字段,表示边界框的坐标信息。
"""
page_id = ocr_page_info['page_info']['page_no']-1
horizontal_scale_ratio, vertical_scale_ratio = get_scale_ratio(ocr_page_info, page)
# 初始化布局边界框列表
layout_bboxes = []
# 遍历每个子布局
for sub_layout in layout_info:
# 提取子布局的边界框坐标信息
x0, y0, _, _, x1, y1, _, _ = sub_layout['poly']
bbox = [int(x0 / horizontal_scale_ratio), int(y0 / vertical_scale_ratio),
int(x1 / horizontal_scale_ratio), int(y1 / vertical_scale_ratio)]
# 将子布局的边界框添加到列表中
layout_bboxes.append(bbox)
# 初始化新的布局边界框列表
new_layout_bboxes = []
# 遍历每个布局边界框
for i in range(len(layout_bboxes)):
# 初始化标记变量,用于判断当前边界框是否需要保留
keep = True
# 获取当前边界框的坐标信息
box_i = layout_bboxes[i]
# 遍历其他边界框
for j in range(len(layout_bboxes)):
# 排除当前边界框自身
if i != j:
# 获取其他边界框的坐标信息
box_j = layout_bboxes[j]
# 检测box_i是否被box_j包含
if _is_in(box_i, box_j):
# 如果当前边界框被其他边界框包含,则标记为不需要保留
keep = False
# 跳出内层循环
break
# 如果当前边界框需要保留,则添加到新的布局边界框列表中
if keep:
new_layout_bboxes.append(layout_bboxes[i])
# 对新的布局边界框列表进行排序调整
page_width = page.rect.width
page_height = page.rect.height
page_boundry = [0, 0, page_width, page_height]
layout_bboxes, layout_tree = adjust_layouts(new_layout_bboxes, page_boundry, page_id)
# 返回排序调整后的布局边界框列表
return layout_bboxes, layout_tree
from magic_pdf.config.drop_tag import DropTag
from magic_pdf.config.ocr_content_type import BlockType, ContentType
from magic_pdf.libs.boxbase import (__is_overlaps_y_exceeds_threshold,
_is_in_or_part_overlap_with_area_ratio,
calculate_overlap_area_in_bbox1_area_ratio)
from magic_pdf.libs.boxbase import __is_overlaps_y_exceeds_threshold, calculate_overlap_area_in_bbox1_area_ratio
# 将每一个line中的span从左到右排序
......@@ -63,86 +60,6 @@ def merge_spans_to_line(spans, threshold=0.6):
return lines
def merge_spans_to_line_by_layout(spans, layout_bboxes):
lines = []
new_spans = []
dropped_spans = []
for item in layout_bboxes:
layout_bbox = item['layout_bbox']
# 遍历spans,将每个span放入对应的layout中
layout_sapns = []
for span in spans:
if calculate_overlap_area_in_bbox1_area_ratio(
span['bbox'], layout_bbox) > 0.6:
layout_sapns.append(span)
# 如果layout_sapns不为空,则放入new_spans中
if len(layout_sapns) > 0:
new_spans.append(layout_sapns)
# 从spans删除已经放入layout_sapns中的span
for layout_sapn in layout_sapns:
spans.remove(layout_sapn)
if len(new_spans) > 0:
for layout_sapns in new_spans:
layout_lines = merge_spans_to_line(layout_sapns)
lines.extend(layout_lines)
# 对line中的span进行排序
lines = line_sort_spans_by_left_to_right(lines)
for span in spans:
span['tag'] = DropTag.NOT_IN_LAYOUT
dropped_spans.append(span)
return lines, dropped_spans
def merge_lines_to_block(lines):
# 目前不做block拼接,先做个结构,每个block中只有一个line,block的bbox就是line的bbox
blocks = []
for line in lines:
blocks.append({
'bbox': line['bbox'],
'lines': [line],
})
return blocks
def sort_blocks_by_layout(all_bboxes, layout_bboxes):
new_blocks = []
sort_blocks = []
for item in layout_bboxes:
layout_bbox = item['layout_bbox']
# 遍历blocks,将每个blocks放入对应的layout中
layout_blocks = []
for block in all_bboxes:
# 如果是footnote则跳过
if block[7] == BlockType.Footnote:
continue
block_bbox = block[:4]
if calculate_overlap_area_in_bbox1_area_ratio(
block_bbox, layout_bbox) > 0.8:
layout_blocks.append(block)
# 如果layout_blocks不为空,则放入new_blocks中
if len(layout_blocks) > 0:
new_blocks.append(layout_blocks)
# 从all_bboxes删除已经放入layout_blocks中的block
for layout_block in layout_blocks:
all_bboxes.remove(layout_block)
# 如果new_blocks不为空,则对new_blocks中每个block进行排序
if len(new_blocks) > 0:
for bboxes_in_layout_block in new_blocks:
bboxes_in_layout_block.sort(
key=lambda x: x[1]) # 一个layout内部的box,按照y0自上而下排序
sort_blocks.extend(bboxes_in_layout_block)
# sort_blocks中已经包含了当前页面所有最终留下的block,且已经排好了顺序
return sort_blocks
def fill_spans_in_blocks(blocks, spans, radio):
"""将allspans中的span按位置关系,放入blocks中."""
block_with_spans = []
......@@ -164,14 +81,6 @@ def fill_spans_in_blocks(blocks, spans, radio):
if calculate_overlap_area_in_bbox1_area_ratio(
span_bbox, block_bbox) > radio:
block_spans.append(span)
'''行内公式调整, 高度调整至与同行文字高度一致(优先左侧, 其次右侧)'''
# displayed_list = []
# text_inline_lines = []
# modify_y_axis(block_spans, displayed_list, text_inline_lines)
'''模型识别错误的行间公式, type类型转换成行内公式'''
# block_spans = modify_inline_equation(block_spans, displayed_list, text_inline_lines)
'''bbox去除粘连''' # 去粘连会影响span的bbox,导致后续fill的时候出错
# block_spans = remove_overlap_between_bbox_for_span(block_spans)
block_dict['spans'] = block_spans
block_with_spans.append(block_dict)
......@@ -184,32 +93,7 @@ def fill_spans_in_blocks(blocks, spans, radio):
return block_with_spans, spans
def fix_block_spans(block_with_spans, img_blocks, table_blocks):
"""1、img_block和table_block因为包含caption和footnote的关系,存在block的嵌套关系
需要将caption和footnote的text_span放入相应img_block和table_block内的
caption_block和footnote_block中 2、同时需要删除block中的spans字段."""
fix_blocks = []
for block in block_with_spans:
block_type = block['type']
if block_type == BlockType.Image:
block = fix_image_block(block, img_blocks)
elif block_type == BlockType.Table:
block = fix_table_block(block, table_blocks)
elif block_type in [BlockType.Text, BlockType.Title]:
block = fix_text_block(block)
elif block_type == BlockType.InterlineEquation:
block = fix_interline_block(block)
else:
continue
fix_blocks.append(block)
return fix_blocks
def fix_block_spans_v2(block_with_spans):
"""1、img_block和table_block因为包含caption和footnote的关系,存在block的嵌套关系
需要将caption和footnote的text_span放入相应img_block和table_block内的
caption_block和footnote_block中 2、同时需要删除block中的spans字段."""
fix_blocks = []
for block in block_with_spans:
block_type = block['type']
......@@ -235,113 +119,6 @@ def fix_discarded_block(discarded_block_with_spans):
return fix_discarded_blocks
def merge_spans_to_block(spans: list, block_bbox: list, block_type: str):
block_spans = []
# 如果有img_caption,则将img_block中的text_spans放入img_caption_block中
for span in spans:
if calculate_overlap_area_in_bbox1_area_ratio(span['bbox'],
block_bbox) > 0.6:
block_spans.append(span)
block_lines = merge_spans_to_line(block_spans)
# 对line中的span进行排序
sort_block_lines = line_sort_spans_by_left_to_right(block_lines)
block = {'bbox': block_bbox, 'type': block_type, 'lines': sort_block_lines}
return block, block_spans
def make_body_block(span: dict, block_bbox: list, block_type: str):
# 创建body_block
body_line = {
'bbox': block_bbox,
'spans': [span],
}
body_block = {'bbox': block_bbox, 'type': block_type, 'lines': [body_line]}
return body_block
def fix_image_block(block, img_blocks):
block['blocks'] = []
# 遍历img_blocks,找到与当前block匹配的img_block
for img_block in img_blocks:
if _is_in_or_part_overlap_with_area_ratio(block['bbox'],
img_block['bbox'], 0.95):
# 创建img_body_block
for span in block['spans']:
if span['type'] == ContentType.Image and img_block[
'img_body_bbox'] == span['bbox']:
# 创建img_body_block
img_body_block = make_body_block(
span, img_block['img_body_bbox'], BlockType.ImageBody)
block['blocks'].append(img_body_block)
# 从spans中移除img_body_block中已经放入的span
block['spans'].remove(span)
break
# 根据list长度,判断img_block中是否有img_caption
if img_block['img_caption_bbox'] is not None:
img_caption_block, img_caption_spans = merge_spans_to_block(
block['spans'], img_block['img_caption_bbox'],
BlockType.ImageCaption)
block['blocks'].append(img_caption_block)
if img_block['img_footnote_bbox'] is not None:
img_footnote_block, img_footnote_spans = merge_spans_to_block(
block['spans'], img_block['img_footnote_bbox'],
BlockType.ImageFootnote)
block['blocks'].append(img_footnote_block)
break
del block['spans']
return block
def fix_table_block(block, table_blocks):
block['blocks'] = []
# 遍历table_blocks,找到与当前block匹配的table_block
for table_block in table_blocks:
if _is_in_or_part_overlap_with_area_ratio(block['bbox'],
table_block['bbox'], 0.95):
# 创建table_body_block
for span in block['spans']:
if span['type'] == ContentType.Table and table_block[
'table_body_bbox'] == span['bbox']:
# 创建table_body_block
table_body_block = make_body_block(
span, table_block['table_body_bbox'],
BlockType.TableBody)
block['blocks'].append(table_body_block)
# 从spans中移除img_body_block中已经放入的span
block['spans'].remove(span)
break
# 根据list长度,判断table_block中是否有caption
if table_block['table_caption_bbox'] is not None:
table_caption_block, table_caption_spans = merge_spans_to_block(
block['spans'], table_block['table_caption_bbox'],
BlockType.TableCaption)
block['blocks'].append(table_caption_block)
# 如果table_caption_block_spans不为空
if len(table_caption_spans) > 0:
# 一些span已经放入了caption_block中,需要从block['spans']中删除
for span in table_caption_spans:
block['spans'].remove(span)
# 根据list长度,判断table_block中是否有table_note
if table_block['table_footnote_bbox'] is not None:
table_footnote_block, table_footnote_spans = merge_spans_to_block(
block['spans'], table_block['table_footnote_bbox'],
BlockType.TableFootnote)
block['blocks'].append(table_footnote_block)
break
del block['spans']
return block
def fix_text_block(block):
# 文本block中的公式span都应该转换成行内type
for span in block['spans']:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment