Commit 6a22b5ab authored by myhloli's avatar myhloli
Browse files

refactor(magic_pdf): remove unused functions and simplify code

parent ecdaa49a
import math
from loguru import logger
from magic_pdf.config.ocr_content_type import ContentType
from magic_pdf.libs.boxbase import (find_bottom_nearest_text_bbox,
find_top_nearest_text_bbox)
from magic_pdf.libs.commons import join_path
# Short aliases for the two equation members of ContentType.
TYPE_INLINE_EQUATION = ContentType.InlineEquation
TYPE_INTERLINE_EQUATION = ContentType.InterlineEquation
# Node 'type' values that the universal-format helpers in this module treat as
# text-bearing nodes: plain 'text' plus the heading levels 'h1'..'h6'.
UNI_FORMAT_TEXT_TYPE = ['text', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']
def mk_nlp_markdown_1(para_dict: dict):
    """Join the paragraph texts of all pages into one markdown string.

    Deprecated: a ``DeprecationWarning`` is emitted on every call.

    Args:
        para_dict: Mapping whose values are per-page dicts. A page without a
            truthy ``'para_blocks'`` entry is skipped. Each block must carry a
            ``'paras'`` mapping whose values provide ``'para_text'`` and
            ``'is_para_title'`` (and ``'para_title_level'`` for titles).

    Returns:
        The paragraphs joined with blank lines; title paragraphs are prefixed
        with ``'#'`` repeated ``para_title_level`` times and a space.
    """
    import warnings

    # ``@DeprecationWarning`` used as a decorator rebinds the function name to
    # an exception instance, which makes the function impossible to call.
    # Emitting the warning from inside the body keeps it callable.
    warnings.warn(
        'mk_nlp_markdown_1 is deprecated',
        DeprecationWarning,
        stacklevel=2,
    )

    content_lst = []
    for page_info in para_dict.values():
        para_blocks = page_info.get('para_blocks')
        if not para_blocks:
            continue
        for block in para_blocks:
            for p in block['paras'].values():
                para_text = p['para_text']
                if p['is_para_title']:
                    # Only titles need the heading prefix, so the level is
                    # read only here.
                    md_title_prefix = '#' * p['para_title_level']
                    content_lst.append(f'{md_title_prefix} {para_text}')
                else:
                    content_lst.append(para_text)
    return '\n\n'.join(content_lst)
# Locate the target string inside a paragraph.
def __find_index(paragraph, target):
    """Return the index of the first occurrence of ``target``, or ``None``."""
    position = paragraph.find(target)
    return None if position == -1 else position
def __insert_string(paragraph, target, position):
    """Return ``paragraph`` with ``target`` spliced in at index ``position``."""
    head, tail = paragraph[:position], paragraph[position:]
    return head + target + tail
def __insert_after(content, image_content, target):
    """Insert ``image_content`` right after the first ``target`` in ``content``.

    The inserted text is surrounded by blank lines. When ``target`` does not
    occur, an error is logged and ``content`` is returned unchanged.
    """
    start = content.find(target)
    if start == -1:
        logger.error(
            f"Can't find the location of image {image_content} in the markdown file, search target is {target}"
        )
        return content

    cut = start + len(target)
    return content[:cut] + '\n\n' + image_content + '\n\n' + content[cut:]
def __insert_before(content, image_content, target):
    """Insert ``image_content`` right before the first ``target`` in ``content``.

    The inserted text is surrounded by blank lines. When ``target`` does not
    occur, an error is logged and ``content`` is returned unchanged.
    """
    start = content.find(target)
    if start == -1:
        logger.error(
            f"Can't find the location of image {image_content} in the markdown file, search target is {target}"
        )
        return content

    before, rest = content[:start], content[start:]
    return before + '\n\n' + image_content + '\n\n' + rest
# NOTE(review): DeprecationWarning is an exception class; used as a decorator it
# rebinds the name below to DeprecationWarning(<function>), which is not
# callable. Confirm whether a real deprecation decorator was intended.
@DeprecationWarning
def mk_mm_markdown_1(para_dict: dict):
"""Assemble multimodal markdown (paragraph text plus image links).

For every page in para_dict the paragraph texts are joined into page
markdown, then each image/table entry is inserted next to the text of a
nearby line from the page's 'preproc_blocks'. The page strings are joined
with blank lines and returned.

NOTE(review): leading indentation is missing in this copy of the file, so the
nesting of the break / for-else statements below cannot be read off the text;
confirm it against a properly indented copy.
"""
content_lst = []
for _, page_info in para_dict.items():
page_lst = []  # paragraphs of the current page
para_blocks = page_info.get('para_blocks')
pymu_raw_blocks = page_info.get('preproc_blocks')
# Images, tables and their backups are all handled as image links here.
all_page_images = []
all_page_images.extend(page_info.get('images', []))
all_page_images.extend(page_info.get('image_backup', []))
all_page_images.extend(page_info.get('tables', []))
all_page_images.extend(page_info.get('table_backup', []))
if not para_blocks or not pymu_raw_blocks:  # image-only page: just concatenate the image links
for img in all_page_images:
page_lst.append(f"![]({img['image_path']})")  # TODO image order
page_md = '\n\n'.join(page_lst)
else:
for block in para_blocks:
item = block['paras']
for _, p in item.items():
para_text = p['para_text']
is_title = p['is_para_title']
title_level = p['para_title_level']
md_title_prefix = '#' * title_level
if is_title:
page_lst.append(f'{md_title_prefix} {para_text}')
else:
page_lst.append(para_text)
# Join the paragraphs into the text of one page.
"""拼装成一个页面的文本"""
page_md = '\n\n'.join(page_lst)
# Insert the images.
"""插入图片"""
for img in all_page_images:
imgbox = img['bbox']
img_content = f"![]({img['image_path']})"
# First look for the block that contains (imgbox[0], imgbox[1]), with a tolerance of 1.
for block in pymu_raw_blocks:
bbox = block['bbox']
if (
bbox[0] - 1 <= imgbox[0] < bbox[2] + 1
and bbox[1] - 1 <= imgbox[1] < bbox[3] + 1
):  # the point lies inside this block
for l in block['lines']:  # noqa: E741
line_box = l['bbox']
if (
line_box[0] - 1 <= imgbox[0] < line_box[2] + 1
and line_box[1] - 1 <= imgbox[1] < line_box[3] + 1
):  # inside this line: insert before the line's text
line_txt = ''.join([s['text'] for s in l['spans']])
page_md = __insert_before(
page_md, img_content, line_txt
)
break
break
else:  # between two lines
# Find the line whose (x0, y0) is closest to the image's (x0, y0).
min_distance = 100000
min_line = None
for l in block['lines']:  # noqa: E741
line_box = l['bbox']
distance = math.sqrt(
(line_box[0] - imgbox[0]) ** 2
+ (line_box[1] - imgbox[1]) ** 2
)
if distance < min_distance:
min_distance = distance
min_line = l
if min_line:
line_txt = ''.join(
[s['text'] for s in min_line['spans']]
)
img_h = imgbox[3] - imgbox[1]
if min_distance < img_h:  # the text comes before the image
page_md = __insert_after(
page_md, img_content, line_txt
)
else:
page_md = __insert_before(
page_md, img_content, line_txt
)
else:
logger.error(
f"Can't find the location of image {img['image_path']} in the markdown file #1"
)
else:  # the image should lie between two blocks
# Use the nearest block above; if there is none, use the nearest block below.
top_txt_block = find_top_nearest_text_bbox(pymu_raw_blocks, imgbox)
if top_txt_block:
line_txt = ''.join(
[s['text'] for s in top_txt_block['lines'][-1]['spans']]
)
page_md = __insert_after(page_md, img_content, line_txt)
else:
bottom_txt_block = find_bottom_nearest_text_bbox(
pymu_raw_blocks, imgbox
)
if bottom_txt_block:
line_txt = ''.join(
[
s['text']
for s in bottom_txt_block['lines'][0]['spans']
]
)
page_md = __insert_before(page_md, img_content, line_txt)
else:
logger.error(
f"Can't find the location of image {img['image_path']} in the markdown file #2"
)
content_lst.append(page_md)
# Join the text of all pages.
"""拼装成全部页面的文本"""
content_text = '\n\n'.join(content_lst)
return content_text
def __insert_after_para(text, type, element, content_list):
    """Insert a new image/table node after the first text node containing ``text``.

    ``content_list`` is modified in place. Only nodes whose ``'type'`` is in
    ``UNI_FORMAT_TEXT_TYPE`` are searched. When no node matches, an error is
    logged and the list is left untouched.
    """
    for position, node in enumerate(content_list):
        if node.get('type') not in UNI_FORMAT_TEXT_TYPE:
            continue
        if text not in node.get('text', ''):
            continue

        if type == 'image':
            new_node = {
                'type': 'image',
                'img_path': element.get('image_path'),
                'img_alt': '',
                'img_title': '',
                'img_caption': '',
            }
        elif type == 'table':
            new_node = {
                'type': 'table',
                'img_path': element.get('image_path'),
                'table_latex': element.get('text'),
                'table_title': '',
                'table_caption': '',
                'table_quality': element.get('quality'),
            }
        content_list.insert(position + 1, new_node)
        return

    # No text node contained the anchor text.
    logger.error(
        f"Can't find the location of image {element.get('image_path')} in the markdown file, search target is {text}"
    )
def __insert_before_para(text, type, element, content_list):
    """Insert a new image/table node before the first text node containing ``text``.

    ``content_list`` is modified in place. Only nodes whose ``'type'`` is in
    ``UNI_FORMAT_TEXT_TYPE`` are searched. When no node matches, an error is
    logged and the list is left untouched.
    """
    for position, node in enumerate(content_list):
        if node.get('type') not in UNI_FORMAT_TEXT_TYPE:
            continue
        if text not in node.get('text', ''):
            continue

        if type == 'image':
            new_node = {
                'type': 'image',
                'img_path': element.get('image_path'),
                'img_alt': '',
                'img_title': '',
                'img_caption': '',
            }
        elif type == 'table':
            new_node = {
                'type': 'table',
                'img_path': element.get('image_path'),
                'table_latex': element.get('text'),
                'table_title': '',
                'table_caption': '',
                'table_quality': element.get('quality'),
            }
        content_list.insert(position, new_node)
        return

    # No text node contained the anchor text.
    logger.error(
        f"Can't find the location of image {element.get('image_path')} in the markdown file, search target is {text}"
    )
def mk_universal_format(pdf_info_list: list, img_buket_path):
"""Build the unified-format content list; see https://aicarrier.feishu.cn/wiki/FqmMwcH69iIdCWkkyjvcDwNUnTY.

Args:
pdf_info_list: list of per-page dicts; the keys read here are
'para_blocks', 'preproc_blocks', 'images', 'image_backup' and 'tables'.
img_buket_path: prefix joined (via join_path) with each 'image_path' on
pages that have no paragraph or raw blocks.

Returns:
A flat list of node dicts with 'type' in: 'image', 'table', 'equation',
'text', or 'h<level>'.

NOTE(review): leading indentation is missing in this copy of the file; in
particular, confirm whether the extend / insert_img_or_table statements near
the end run for every page or only in the else branch.
"""
content_lst = []
for page_info in pdf_info_list:
page_lst = []  # nodes of the current page
para_blocks = page_info.get('para_blocks')
pymu_raw_blocks = page_info.get('preproc_blocks')
all_page_images = []
all_page_images.extend(page_info.get('images', []))
all_page_images.extend(page_info.get('image_backup', []))
# all_page_images.extend(page_info.get("tables",[]))
# all_page_images.extend(page_info.get("table_backup",[]) )
all_page_tables = []
all_page_tables.extend(page_info.get('tables', []))
if not para_blocks or not pymu_raw_blocks:  # image-only page: just emit image/table nodes
for img in all_page_images:
content_node = {
'type': 'image',
'img_path': join_path(img_buket_path, img['image_path']),
'img_alt': '',
'img_title': '',
'img_caption': '',
}
page_lst.append(content_node)  # TODO image order
for table in all_page_tables:
content_node = {
'type': 'table',
'img_path': join_path(img_buket_path, table['image_path']),
'table_latex': table.get('text'),
'table_title': '',
'table_caption': '',
'table_quality': table.get('quality'),
}
page_lst.append(content_node)  # TODO image order
else:
for block in para_blocks:
item = block['paras']
for _, p in item.items():
font_type = p[
'para_font_type'
]  # a paragraph is either ordinary text or an interline equation
if font_type == TYPE_INTERLINE_EQUATION:
content_node = {'type': 'equation', 'latex': p['para_text']}
page_lst.append(content_node)
else:
para_text = p['para_text']
is_title = p['is_para_title']
title_level = p['para_title_level']
if is_title:
content_node = {
'type': f'h{title_level}',
'text': para_text,
}
page_lst.append(content_node)
else:
content_node = {'type': 'text', 'text': para_text}
page_lst.append(content_node)
content_lst.extend(page_lst)
# Insert the images.
"""插入图片"""
for img in all_page_images:
insert_img_or_table('image', img, pymu_raw_blocks, content_lst)
# Insert the tables.
"""插入表格"""
for table in all_page_tables:
insert_img_or_table('table', table, pymu_raw_blocks, content_lst)
# end for
return content_lst
def insert_img_or_table(type, element, pymu_raw_blocks, content_lst):
"""Insert an image/table node for `element` into content_lst near matching text.

The top-left corner of element['bbox'] is compared with the raw text blocks
and their lines (tolerance of 1). The text of the chosen line is then handed
to __insert_before_para / __insert_after_para, which modify content_lst in
place. Nothing is returned.

Args:
type: 'image' or 'table'; forwarded to the insert helpers.
element: dict with a 'bbox' entry (and 'image_path', read for log messages).
pymu_raw_blocks: iterable of block dicts with 'bbox' and 'lines'; each line
has 'bbox' and 'spans', each span has 'text'.
content_lst: list of node dicts to insert into.

NOTE(review): leading indentation is missing in this copy of the file, so the
nesting of the break / for-else statements below cannot be read off the text;
confirm it against a properly indented copy.
"""
element_bbox = element['bbox']
# First look for the block that contains the element's top-left corner.
for block in pymu_raw_blocks:
bbox = block['bbox']
if (
bbox[0] - 1 <= element_bbox[0] < bbox[2] + 1
and bbox[1] - 1 <= element_bbox[1] < bbox[3] + 1
):  # inside this block: go on to compare against its lines
for l in block['lines']:  # noqa: E741
line_box = l['bbox']
if (
line_box[0] - 1 <= element_bbox[0] < line_box[2] + 1
and line_box[1] - 1 <= element_bbox[1] < line_box[3] + 1
):  # inside this line: insert before the line's text
line_txt = ''.join([s['text'] for s in l['spans']])
__insert_before_para(line_txt, type, element, content_lst)
break
break
else:  # between two lines
# Find the line whose (x0, y0) is closest to the element's (x0, y0).
min_distance = 100000
min_line = None
for l in block['lines']:  # noqa: E741
line_box = l['bbox']
distance = math.sqrt(
(line_box[0] - element_bbox[0]) ** 2
+ (line_box[1] - element_bbox[1]) ** 2
)
if distance < min_distance:
min_distance = distance
min_line = l
if min_line:
line_txt = ''.join([s['text'] for s in min_line['spans']])
img_h = element_bbox[3] - element_bbox[1]
if min_distance < img_h:  # the text comes before the image
__insert_after_para(line_txt, type, element, content_lst)
else:
__insert_before_para(line_txt, type, element, content_lst)
break
else:
logger.error(
f"Can't find the location of image {element.get('image_path')} in the markdown file #1"
)
else:  # the element should lie between two blocks
# Use the nearest block above; if there is none, use the nearest block below.
top_txt_block = find_top_nearest_text_bbox(pymu_raw_blocks, element_bbox)
if top_txt_block:
line_txt = ''.join([s['text'] for s in top_txt_block['lines'][-1]['spans']])
__insert_after_para(line_txt, type, element, content_lst)
else:
bottom_txt_block = find_bottom_nearest_text_bbox(
pymu_raw_blocks, element_bbox
)
if bottom_txt_block:
line_txt = ''.join(
[s['text'] for s in bottom_txt_block['lines'][0]['spans']]
)
__insert_before_para(line_txt, type, element, content_lst)
else:  # TODO: the image may occupy a column by itself, in which case there are no images above or below it
logger.error(
f"Can't find the location of image {element.get('image_path')} in the markdown file #2"
)
def mk_mm_markdown(content_list):
    """Render a unified-format content list as markdown, images included.

    Text nodes are emitted verbatim, equations are wrapped in ``$$`` fences
    unless they already start and end with ``$$``, heading nodes (``h1``..)
    become ``#`` prefixed lines and image nodes become ``![](path)`` links.
    Nodes of any other type are ignored. Pieces are joined with blank lines.
    """
    pieces = []
    for node in content_list:
        kind = node.get('type')
        if kind == 'text':
            pieces.append(node.get('text'))
            continue
        if kind == 'equation':
            latex = node.get('latex')
            fenced = latex.startswith('$$') and latex.endswith('$$')
            pieces.append(latex if fenced else f"\n$$\n{latex}\n$$\n")
            continue
        if kind in UNI_FORMAT_TEXT_TYPE:
            # kind is 'h<digit>' here; the digit is the heading depth.
            hashes = '#' * int(kind[1])
            pieces.append(f"{hashes} {node.get('text')}")
            continue
        if kind == 'image':
            pieces.append(f"![]({node.get('img_path')})")
    return '\n\n'.join(pieces)
def mk_nlp_markdown(content_list):
    """Render a unified-format content list as markdown, images excluded.

    Text nodes are emitted verbatim, equations are wrapped in ``$$`` fences,
    tables in ``$$$`` fences and heading nodes (``h1``..) become ``#``
    prefixed lines. Nodes of any other type are ignored. Pieces are joined
    with blank lines.
    """
    pieces = []
    for node in content_list:
        kind = node.get('type')
        if kind == 'text':
            pieces.append(node.get('text'))
            continue
        if kind == 'equation':
            pieces.append(f"$$\n{node.get('latex')}\n$$")
            continue
        if kind == 'table':
            pieces.append(f"$$$\n{node.get('table_latex')}\n$$$")
            continue
        if kind in UNI_FORMAT_TEXT_TYPE:
            # kind is 'h<digit>' here; the digit is the heading depth.
            hashes = '#' * int(kind[1])
            pieces.append(f"{hashes} {node.get('text')}")
    return '\n\n'.join(pieces)
This diff is collapsed.
from magic_pdf.layout.bbox_sort import X0_EXT_IDX, X0_IDX, X1_EXT_IDX, X1_IDX, Y0_IDX, Y1_EXT_IDX, Y1_IDX
from magic_pdf.libs.boxbase import _is_bottom_full_overlap, _left_intersect, _right_intersect
def find_all_left_bbox_direct(this_bbox, all_bboxes) -> list:
    """Return the nearest box lying directly to the left of ``this_bbox``.

    A box qualifies when its right edge is at or left of ``this_bbox``'s left
    edge and the two overlap vertically (no extension lines are used), or when
    ``_left_intersect(box, this_bbox)`` holds. Among the qualifying boxes the
    one with the largest right edge (``X1_EXT`` when truthy, else ``X1``) is
    returned; ``None`` when nothing qualifies.
    """

    def overlaps_vertically(box):
        return (
            box[Y0_IDX] < this_bbox[Y0_IDX] < box[Y1_IDX]
            or box[Y0_IDX] < this_bbox[Y1_IDX] < box[Y1_IDX]
            or this_bbox[Y0_IDX] < box[Y0_IDX] < this_bbox[Y1_IDX]
            or this_bbox[Y0_IDX] < box[Y1_IDX] < this_bbox[Y1_IDX]
            or (box[Y0_IDX] == this_bbox[Y0_IDX] and box[Y1_IDX] == this_bbox[Y1_IDX])
        )

    def right_edge(box):
        return box[X1_EXT_IDX] if box[X1_EXT_IDX] else box[X1_IDX]

    candidates = []
    for box in all_bboxes:
        fully_left = box[X1_IDX] <= this_bbox[X0_IDX] and overlaps_vertically(box)
        if fully_left or _left_intersect(box[:4], this_bbox[:4]):
            candidates.append(box)

    if not candidates:
        return None
    # Horizontally closest neighbour: the one whose right edge is largest.
    return max(candidates, key=right_edge)
def find_all_right_bbox_direct(this_bbox, all_bboxes) -> list:
    """Return the nearest box lying directly to the right of ``this_bbox``.

    A box qualifies when its left edge is at or right of ``this_bbox``'s right
    edge and the two overlap vertically, or when
    ``_right_intersect(this_bbox, box)`` holds. Among the qualifying boxes the
    one with the smallest left edge (``X0_EXT`` when truthy, else ``X0``) is
    returned; ``None`` when nothing qualifies.
    """

    def overlaps_vertically(box):
        return (
            this_bbox[Y0_IDX] < box[Y0_IDX] < this_bbox[Y1_IDX]
            or this_bbox[Y0_IDX] < box[Y1_IDX] < this_bbox[Y1_IDX]
            or box[Y0_IDX] < this_bbox[Y0_IDX] < box[Y1_IDX]
            or box[Y0_IDX] < this_bbox[Y1_IDX] < box[Y1_IDX]
            or (box[Y0_IDX] == this_bbox[Y0_IDX] and box[Y1_IDX] == this_bbox[Y1_IDX])
        )

    def left_edge(box):
        return box[X0_EXT_IDX] if box[X0_EXT_IDX] else box[X0_IDX]

    candidates = []
    for box in all_bboxes:
        fully_right = box[X0_IDX] >= this_bbox[X1_IDX] and overlaps_vertically(box)
        if fully_right or _right_intersect(this_bbox[:4], box[:4]):
            candidates.append(box)

    if not candidates:
        return None
    return min(candidates, key=left_edge)
def find_all_top_bbox_direct(this_bbox, all_bboxes) -> list:
    """Return the nearest box lying directly above ``this_bbox``.

    A box qualifies when its bottom edge is at or above ``this_bbox``'s top
    edge and the two overlap horizontally. Among the qualifying boxes the one
    with the largest bottom edge (``Y1_EXT`` when truthy, else ``Y1``) is
    returned; ``None`` when nothing qualifies.
    """

    def overlaps_horizontally(box):
        return (
            box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX]
            or box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX]
            or this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX]
            or this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX]
            or (box[X0_IDX] == this_bbox[X0_IDX] and box[X1_IDX] == this_bbox[X1_IDX])
        )

    def bottom_edge(box):
        return box[Y1_EXT_IDX] if box[Y1_EXT_IDX] else box[Y1_IDX]

    candidates = [
        box
        for box in all_bboxes
        if box[Y1_IDX] <= this_bbox[Y0_IDX] and overlaps_horizontally(box)
    ]
    if not candidates:
        return None
    return max(candidates, key=bottom_edge)
def find_all_bottom_bbox_direct(this_bbox, all_bboxes) -> list:
    """Return the nearest box lying directly below ``this_bbox``.

    A box qualifies when its top edge is at or below ``this_bbox``'s bottom
    edge and the two overlap horizontally. Among the qualifying boxes the one
    with the smallest ``Y0`` is returned; ``None`` when nothing qualifies.
    """

    def overlaps_horizontally(box):
        return (
            this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX]
            or this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX]
            or box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX]
            or box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX]
            or (box[X0_IDX] == this_bbox[X0_IDX] and box[X1_IDX] == this_bbox[X1_IDX])
        )

    candidates = [
        box
        for box in all_bboxes
        if box[Y0_IDX] >= this_bbox[Y1_IDX] and overlaps_horizontally(box)
    ]
    if not candidates:
        return None
    return min(candidates, key=lambda box: box[Y0_IDX])
# ===================================================================================================================
def find_bottom_bbox_direct_from_right_edge(this_bbox, all_bboxes) -> list:
    """Return the nearest box directly below ``this_bbox``, preferring the rightmost.

    A box qualifies when its top edge is at or below ``this_bbox``'s bottom
    edge and the two overlap horizontally. Of the boxes sharing the smallest
    ``Y0`` the one with the largest ``X1`` is returned; ``None`` when nothing
    qualifies.
    """

    def overlaps_horizontally(box):
        return (
            this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX]
            or this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX]
            or box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX]
            or box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX]
            or (box[X0_IDX] == this_bbox[X0_IDX] and box[X1_IDX] == this_bbox[X1_IDX])
        )

    candidates = [
        box
        for box in all_bboxes
        if box[Y0_IDX] >= this_bbox[Y1_IDX] and overlaps_horizontally(box)
    ]
    if not candidates:
        return None

    # Keep only the boxes whose top edge is closest to this_bbox ...
    nearest_y0 = min(box[Y0_IDX] for box in candidates)
    closest = [box for box in candidates if box[Y0_IDX] == nearest_y0]
    # ... and among those pick the one reaching furthest to the right.
    return max(closest, key=lambda box: box[X1_IDX])
def find_bottom_bbox_direct_from_left_edge(this_bbox, all_bboxes) -> list:
    """Return the nearest box directly below ``this_bbox``, preferring the leftmost.

    A box qualifies when its top edge is at or below ``this_bbox``'s bottom
    edge and the two overlap horizontally. Of the boxes sharing the smallest
    ``Y0`` the one with the smallest ``X0`` is returned; ``None`` when nothing
    qualifies.
    """

    def overlaps_horizontally(box):
        return (
            this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX]
            or this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX]
            or box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX]
            or box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX]
            or (box[X0_IDX] == this_bbox[X0_IDX] and box[X1_IDX] == this_bbox[X1_IDX])
        )

    candidates = [
        box
        for box in all_bboxes
        if box[Y0_IDX] >= this_bbox[Y1_IDX] and overlaps_horizontally(box)
    ]
    if not candidates:
        return None

    # Keep only the boxes whose top edge is closest to this_bbox ...
    nearest_y0 = min(box[Y0_IDX] for box in candidates)
    closest = [box for box in candidates if box[Y0_IDX] == nearest_y0]
    # ... and among those pick the one starting furthest to the left.
    return min(closest, key=lambda box: box[X0_IDX])
def find_top_bbox_direct_from_left_edge(this_bbox, all_bboxes) -> list:
    """Return the nearest box directly above ``this_bbox``, preferring the leftmost.

    A box qualifies when its bottom edge is at or above ``this_bbox``'s top
    edge and the two overlap horizontally. Of the boxes sharing the largest
    ``Y1`` the one with the smallest ``X0`` is returned; ``None`` when nothing
    qualifies.
    """

    def overlaps_horizontally(box):
        return (
            box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX]
            or box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX]
            or this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX]
            or this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX]
            or (box[X0_IDX] == this_bbox[X0_IDX] and box[X1_IDX] == this_bbox[X1_IDX])
        )

    candidates = [
        box
        for box in all_bboxes
        if box[Y1_IDX] <= this_bbox[Y0_IDX] and overlaps_horizontally(box)
    ]
    if not candidates:
        return None

    # Keep only the boxes whose bottom edge is closest to this_bbox ...
    nearest_y1 = max(box[Y1_IDX] for box in candidates)
    closest = [box for box in candidates if box[Y1_IDX] == nearest_y1]
    # ... and among those pick the one starting furthest to the left.
    return min(closest, key=lambda box: box[X0_IDX])
def find_top_bbox_direct_from_right_edge(this_bbox, all_bboxes) -> list:
    """Return the nearest box directly above ``this_bbox``, preferring the rightmost.

    A box qualifies when its bottom edge is at or above ``this_bbox``'s top
    edge and the two overlap horizontally. Of the boxes sharing the largest
    ``Y1`` the one with the largest ``X1`` is returned; ``None`` when nothing
    qualifies.
    """

    def overlaps_horizontally(box):
        return (
            box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX]
            or box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX]
            or this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX]
            or this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX]
            or (box[X0_IDX] == this_bbox[X0_IDX] and box[X1_IDX] == this_bbox[X1_IDX])
        )

    candidates = [
        box
        for box in all_bboxes
        if box[Y1_IDX] <= this_bbox[Y0_IDX] and overlaps_horizontally(box)
    ]
    if not candidates:
        return None

    # Keep only the boxes whose bottom edge is closest to this_bbox ...
    nearest_y1 = max(box[Y1_IDX] for box in candidates)
    closest = [box for box in candidates if box[Y1_IDX] == nearest_y1]
    # ... and among those pick the one reaching furthest to the right.
    return max(closest, key=lambda box: box[X1_IDX])
# ===================================================================================================================
def get_left_edge_bboxes(all_bboxes) -> list:
    """Return the boxes for which ``find_all_left_bbox_direct`` finds no left neighbour."""
    leftmost = []
    for candidate in all_bboxes:
        if find_all_left_bbox_direct(candidate, all_bboxes) is None:
            leftmost.append(candidate)
    return leftmost
def get_right_edge_bboxes(all_bboxes) -> list:
    """Return the boxes for which ``find_all_right_bbox_direct`` finds no right neighbour."""
    rightmost = []
    for candidate in all_bboxes:
        if find_all_right_bbox_direct(candidate, all_bboxes) is None:
            rightmost.append(candidate)
    return rightmost
def fix_vertical_bbox_pos(bboxes:list):
"""
Check whether the given bboxes overlap slightly in the vertical direction and,
if so, move the top edge of the lower bbox down a little.
In the x direction one box must contain the other, or they must coincide
completely; a merely partial overlap does not count (as decided by
_is_bottom_full_overlap).

The list is sorted in place by Y0 and the same list object is returned; the
Y0 entries of overlapping boxes are modified in place.

NOTE(review): leading indentation is missing in this copy of the file; confirm
whether the `break` belongs to the `if` or to the inner loop body.
"""
bboxes.sort(key=lambda x: x[Y0_IDX])  # order from top to bottom
for i in range(0, len(bboxes)):
for j in range(i+1, len(bboxes)):
if _is_bottom_full_overlap(bboxes[i][:4], bboxes[j][:4]):
# The two bboxes partially overlap, so move the lower bbox down a little.
bboxes[j][Y0_IDX] = bboxes[i][Y1_IDX] + 2  # 2 is an empirical value
break
return bboxes
This diff is collapsed.
"""
找到能分割布局的水平的横线、色块
"""
import os
from magic_pdf.libs.commons import fitz
from magic_pdf.libs.boxbase import _is_in_or_part_overlap
def __rect_filter_by_width(rect, page_w, page_h):
    """Return True when ``rect`` strictly spans the vertical centre line of the page.

    ``page_h`` is accepted but not used.
    """
    centre_x = page_w / 2
    return rect[0] < centre_x < rect[2]
def __rect_filter_by_pos(rect, image_bboxes, table_bboxes):
    """Return False when ``rect`` lies in, or partly overlaps, any image or table bbox.

    Image bboxes are checked first, then table bboxes; True is returned only
    when ``_is_in_or_part_overlap`` reports no hit for any of them.
    """
    if any(_is_in_or_part_overlap(rect, box) for box in image_bboxes):
        return False
    return not any(_is_in_or_part_overlap(rect, box) for box in table_bboxes)
def __debug_show_page(page, bboxes1: list, bboxes2: list, bboxes3: list):
    """Dump three groups of bboxes onto a blank page saved as ./tmp/debug.pdf.

    A new single-page document with the same size as ``page`` is created.
    ``bboxes1`` are drawn with a red outline and a translucent blue fill,
    ``bboxes2`` with a translucent yellow fill and no outline, ``bboxes3``
    with a red outline only. An existing output file is removed first and the
    parent directory is created when missing.
    """
    save_path = "./tmp/debug.pdf"
    if os.path.exists(save_path):
        # Remove the dump left by a previous run.
        os.remove(save_path)

    # Fresh, empty PDF with one page matching the source page size.
    doc = fitz.open('')
    page_width = page.rect.width
    page_height = page.rect.height
    new_page = doc.new_page(width=page_width, height=page_height)
    shape = new_page.new_shape()

    layers = (
        (bboxes1, {'color': fitz.pdfcolor['red'], 'fill': fitz.pdfcolor['blue'], 'fill_opacity': 0.2}),
        (bboxes2, {'color': None, 'fill': fitz.pdfcolor['yellow'], 'fill_opacity': 0.2}),
        (bboxes3, {'color': fitz.pdfcolor['red'], 'fill': None}),
    )
    for group, finish_options in layers:
        for bbox in group:
            # Draw the box as given (first four entries are the coordinates).
            rect = fitz.Rect(*bbox[0:4])
            shape = new_page.new_shape()
            shape.draw_rect(rect)
            shape.finish(**finish_options)
            shape.finish()
            shape.commit()

    parent_dir = os.path.dirname(save_path)
    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir)
    doc.save(save_path)
    doc.close()
def get_spilter_of_page(page, image_bboxes, table_bboxes):
    """Collect the filled drawings of ``page`` that can act as layout splitters.

    A drawing from ``page.get_cdrawings()`` is kept when it has a truthy
    ``'fill'`` other than ``(1.0, 1.0, 1.0)``, passes
    ``__rect_filter_by_width`` and passes ``__rect_filter_by_pos`` against the
    image and table bboxes. Each kept rect is returned as a list; rects whose
    height is zero or negative are given a height of 1.
    """
    splitters = []
    for drawing in page.get_cdrawings():
        if 'fill' not in drawing:
            continue
        fill = drawing['fill']
        if not fill or fill == (1.0, 1.0, 1.0):
            continue
        rect = drawing['rect']
        if not __rect_filter_by_width(rect, page.rect.width, page.rect.height):
            continue
        if __rect_filter_by_pos(rect, image_bboxes, table_bboxes):
            splitters.append(list(rect))

    # Some rectangles have zero or negative height, which makes the layout
    # computation loop forever; normalise those to a height of 1.
    for box in splitters:
        if box[3] - box[1] <= 0:
            box[3] = box[1] + 1

    # For debugging: __debug_show_page(page, splitters, [], [])
    return splitters
"""
This is an advanced PyMuPDF utility for detecting multi-column pages.
It can be used in a shell script, or its main function can be imported and
invoked as described below.
Features
---------
- Identify text belonging to (a variable number of) columns on the page.
- Text with different background color is handled separately, allowing for
easier treatment of side remarks, comment boxes, etc.
- Uses text block detection capability to identify text blocks and
uses the block bboxes as primary structuring principle.
- Supports ignoring footers via a footer margin parameter.
- Returns re-created text boundary boxes (integer coordinates), sorted ascending
by the top, then by the left coordinates.
Restrictions
-------------
- Only supporting horizontal, left-to-right text
- Returns a list of text boundary boxes - not the text itself. The caller is
expected to extract text from within the returned boxes.
- Text written above images is ignored altogether (option).
- This utility works as expected in most cases. The following situation cannot
be handled correctly:
* overlapping (non-disjoint) text blocks
* image captions are not recognized and are handled like normal text
Usage
------
- As a CLI shell command use
python multi_column.py input.pdf footer_margin
Where footer margin is the height of the bottom stripe to ignore on each page.
This code is intended to be modified according to your need.
- Use in a Python script as follows:
----------------------------------------------------------------------------------
from multi_column import column_boxes
# for each page execute
bboxes = column_boxes(page, footer_margin=50, no_image_text=True)
# bboxes is a list of fitz.IRect objects that are sorted ascending by their y0,
# then x0 coordinates. Their text content can be extracted by all PyMuPDF
# get_text() variants, like for instance the following:
for rect in bboxes:
print(page.get_text(clip=rect, sort=True))
----------------------------------------------------------------------------------
"""
import sys
from magic_pdf.libs.commons import fitz
def column_boxes(page, footer_margin=50, header_margin=50, no_image_text=True):
"""Determine bboxes which wrap a column.

Args:
page: page object; this function calls its get_drawings(), get_images(),
get_image_rects() and get_text() methods and reads page.rect.
footer_margin: amount subtracted from the bottom (y1) of the clip area.
header_margin: amount added to the top (y0) of the clip area.
no_image_text: when truthy, text blocks contained in an image bbox are
skipped.

Returns:
An empty list when no text bboxes were collected; otherwise the list of
joined block bboxes after clean_nblocks().

NOTE(review): leading indentation is missing in this copy of the file, so the
nesting of the statements below cannot be read off the text; confirm it
against a properly indented copy.
"""
paths = page.get_drawings()
bboxes = []
# path rectangles
path_rects = []
# image bboxes
img_bboxes = []
# bboxes of non-horizontal text
# avoid when expanding horizontal text boxes
vert_bboxes = []
# compute relevant page area
clip = +page.rect
clip.y1 -= footer_margin # Remove footer area
clip.y0 += header_margin # Remove header area
def can_extend(temp, bb, bboxlist):
"""Determines whether rectangle 'temp' can be extended by 'bb'
without intersecting any of the rectangles contained in 'bboxlist'.
Items of bboxlist may be None if they have been removed.
Returns:
True if 'temp' has no intersections with items of 'bboxlist'.
"""
for b in bboxlist:
# vert_bboxes is read from the enclosing column_boxes() scope.
if not intersects_bboxes(temp, vert_bboxes) and (
b == None or b == bb or (temp & b).is_empty
):
continue
return False
return True
def in_bbox(bb, bboxes):
"""Return 1-based number if a bbox contains bb, else return 0."""
for i, bbox in enumerate(bboxes):
if bb in bbox:
return i + 1
return 0
def intersects_bboxes(bb, bboxes):
"""Return True if a bbox intersects bb, else return False."""
for bbox in bboxes:
if not (bb & bbox).is_empty:
return True
return False
def extend_right(bboxes, width, path_bboxes, vert_bboxes, img_bboxes):
"""Extend a bbox to the right page border.
Whenever there is no text to the right of a bbox, enlarge it up
to the right page border.
Args:
bboxes: (list[IRect]) bboxes to check
width: (int) page width
path_bboxes: (list[IRect]) bboxes with a background color
vert_bboxes: (list[IRect]) bboxes with vertical text
img_bboxes: (list[IRect]) bboxes of images
Returns:
Potentially modified bboxes.
"""
for i, bb in enumerate(bboxes):
# do not extend text with background color
if in_bbox(bb, path_bboxes):
continue
# do not extend text in images
if in_bbox(bb, img_bboxes):
continue
# temp extends bb to the right page border
temp = +bb
temp.x1 = width
# do not cut through colored background or images
if intersects_bboxes(temp, path_bboxes + vert_bboxes + img_bboxes):
continue
# also, do not intersect other text bboxes
check = can_extend(temp, bb, bboxes)
if check:
bboxes[i] = temp # replace with enlarged bbox
return [b for b in bboxes if b != None]
def clean_nblocks(nblocks):
"""Do some elementary cleaning."""
# 1. remove any duplicate blocks.
blen = len(nblocks)
if blen < 2:
return nblocks
start = blen - 1
# NOTE(review): when i reaches 0, nblocks[i - 1] is nblocks[-1], i.e. the
# last element, so the first block is compared with the last one.
for i in range(start, -1, -1):
bb1 = nblocks[i]
bb0 = nblocks[i - 1]
if bb0 == bb1:
del nblocks[i]
# 2. repair sequence in special cases:
# consecutive bboxes with almost same bottom value are sorted ascending
# by x-coordinate.
y1 = nblocks[0].y1 # first bottom coordinate
i0 = 0 # its index
i1 = -1 # index of last bbox with same bottom
# Iterate over bboxes, identifying segments with approx. same bottom value.
# Replace every segment by its sorted version.
for i in range(1, len(nblocks)):
b1 = nblocks[i]
if abs(b1.y1 - y1) > 10: # different bottom
if i1 > i0: # segment length > 1? Sort it!
nblocks[i0 : i1 + 1] = sorted(
nblocks[i0 : i1 + 1], key=lambda b: b.x0
)
y1 = b1.y1 # store new bottom value
i0 = i # store its start index
i1 = i # store current index
if i1 > i0: # segment waiting to be sorted
nblocks[i0 : i1 + 1] = sorted(nblocks[i0 : i1 + 1], key=lambda b: b.x0)
return nblocks
# extract vector graphics
for p in paths:
path_rects.append(p["rect"].irect)
path_bboxes = path_rects
# sort path bboxes by ascending top, then left coordinates
path_bboxes.sort(key=lambda b: (b.y0, b.x0))
# bboxes of images on page, no need to sort them
for item in page.get_images():
img_bboxes.extend(page.get_image_rects(item[0]))
# blocks of text on page
blocks = page.get_text(
"dict",
flags=fitz.TEXTFLAGS_TEXT,
clip=clip,
)["blocks"]
# Make block rectangles, ignoring non-horizontal text
for b in blocks:
bbox = fitz.IRect(b["bbox"]) # bbox of the block
# ignore text written upon images
if no_image_text and in_bbox(bbox, img_bboxes):
continue
# confirm first line to be horizontal
line0 = b["lines"][0] # get first line
if line0["dir"] != (1, 0): # only accept horizontal text
vert_bboxes.append(bbox)
continue
# Rebuild the block bbox as the union of the bboxes of its lines whose
# stripped text is longer than one character.
srect = fitz.EMPTY_IRECT()
for line in b["lines"]:
lbbox = fitz.IRect(line["bbox"])
text = "".join([s["text"].strip() for s in line["spans"]])
if len(text) > 1:
srect |= lbbox
bbox = +srect
if not bbox.is_empty:
bboxes.append(bbox)
# Sort text bboxes by ascending background, top, then left coordinates
bboxes.sort(key=lambda k: (in_bbox(k, path_bboxes), k.y0, k.x0))
# Extend bboxes to the right where possible
bboxes = extend_right(
bboxes, int(page.rect.width), path_bboxes, vert_bboxes, img_bboxes
)
# immediately return of no text found
if bboxes == []:
return []
# --------------------------------------------------------------------
# Join bboxes to establish some column structure
# --------------------------------------------------------------------
# the final block bboxes on page
nblocks = [bboxes[0]] # pre-fill with first bbox
bboxes = bboxes[1:] # remaining old bboxes
for i, bb in enumerate(bboxes): # iterate old bboxes
check = False # indicates unwanted joins
# check if bb can extend one of the new blocks
for j in range(len(nblocks)):
nbb = nblocks[j] # a new block
# never join across columns
if bb == None or nbb.x1 < bb.x0 or bb.x1 < nbb.x0:
continue
# never join across different background colors
if in_bbox(nbb, path_bboxes) != in_bbox(bb, path_bboxes):
continue
temp = bb | nbb # temporary extension of new block
check = can_extend(temp, nbb, nblocks)
if check == True:
break
if not check: # bb cannot be used to extend any of the new bboxes
nblocks.append(bb) # so add it to the list
j = len(nblocks) - 1 # index of it
temp = nblocks[j] # new bbox added
# check if some remaining bbox is contained in temp
check = can_extend(temp, bb, bboxes)
if check == False:
nblocks.append(bb)
else:
nblocks[j] = temp
# mark this old bbox as consumed
bboxes[i] = None
# do some elementary cleaning
nblocks = clean_nblocks(nblocks)
# return identified text bboxes
return nblocks
# Script entry point. Usage, as read from sys.argv below:
#   <script> input.pdf [footer_margin [header_margin]]
if __name__ == "__main__":
"""Only for debugging purposes, currently.
Draw red borders around the returned text bboxes and insert
the bbox number.
Then save the file under the name "input-blocks.pdf".
"""
# get the file name
filename = sys.argv[1]
# check if footer margin is given
if len(sys.argv) > 2:
footer_margin = int(sys.argv[2])
else: # use default value
footer_margin = 50
# check if header margin is given
if len(sys.argv) > 3:
header_margin = int(sys.argv[3])
else: # use default value
header_margin = 50
# open document
doc = fitz.open(filename)
# iterate over the pages
for page in doc:
# remove any geometry issues
page.wrap_contents()
# get the text bboxes
bboxes = column_boxes(page, footer_margin=footer_margin, header_margin=header_margin)
# prepare a canvas to draw rectangles and text
shape = page.new_shape()
# iterate over the bboxes
for i, rect in enumerate(bboxes):
shape.draw_rect(rect) # draw a border
# write sequence number
shape.insert_text(rect.tl + (5, 15), str(i), color=fitz.pdfcolor["red"])
# finish drawing / text with color red
shape.finish(color=fitz.pdfcolor["red"])
shape.commit() # store to the page
# save document with text bboxes; the output name replaces ".pdf" with "-blocks.pdf"
doc.ez_save(filename.replace(".pdf", "-blocks.pdf"))
\ No newline at end of file
import os
import csv
import json
import pandas as pd
from pandas import DataFrame as df
from matplotlib import pyplot as plt
from termcolor import cprint
"""
Execute this script in the following way:
1. Make sure there are pdf_dic.json files under the directory code-clean/tmp/unittest/md/, such as the following:
code-clean/tmp/unittest/md/scihub/scihub_00500000/libgen.scimag00527000-00527999.zip_10.1002/app.25178/pdf_dic.json
2. Under the directory code-clean, execute the following command:
$ python -m libs.calc_span_stats
"""
def print_green_on_red(text):
    """Print ``text`` in bold green on a red background, followed by a blank line."""
    cprint(text, color="green", on_color="on_red", attrs=["bold"], end="\n\n")
def print_green(text):
    """Print an empty line, then ``text`` in bold green followed by a blank line."""
    print()
    cprint(text, color="green", attrs=["bold"], end="\n\n")
def print_red(text):
    """Print an empty line, then ``text`` in bold red followed by a blank line."""
    print()
    cprint(text, color="red", attrs=["bold"], end="\n\n")
def safe_get(dict_obj, key, default):
    """Return ``dict_obj[key]``, or ``default`` when the key is missing or maps to None."""
    value = dict_obj.get(key)
    return default if value is None else value
class SpanStatsCalc:
    """Calculate statistics of span."""

    # Column order of the frame built by ``calc_stats_per_dict``. Passing it
    # explicitly keeps the columns present even when no span was collected.
    _SPAN_COLUMNS = (
        "span_id",
        "page_id",
        "span_text",
        "span_font_name",
        "span_font_size",
        "span_font_color",
        "span_font_flags",
        "span_is_superscript",
        "span_is_italic",
        "span_is_serifed",
        "span_is_sans_serifed",
        "span_is_monospaced",
        "span_is_proportional",
        "span_is_bold",
    )

    def draw_charts(self, span_stats: pd.DataFrame, fig_num: int, save_path: str):
        """Draw multiple figures in one figure.

        Only the canvas is created so far; ``span_stats`` and ``save_path``
        are not used yet.
        """
        # make a canvas
        plt.figure(fig_num, figsize=(20, 20))

    def calc_stats_per_dict(self, pdf_dict) -> pd.DataFrame:
        """Calculate statistics per pdf_dict.

        Walks every entry whose key starts with ``"page_"`` and that has a
        ``"para_blocks"`` list, and emits one row per span found under
        ``para_block["lines"][...]["spans"]``.

        Parameters
        ----------
        pdf_dict : dict
            Mapping of page ids to page dictionaries.

        Returns
        -------
        pd.DataFrame
            One row per span with the columns listed in ``_SPAN_COLUMNS``.
            The frame is empty, but still carries those columns, when no
            span was found.
        """
        span_rows = []
        span_id = 0

        for page_id, blocks in pdf_dict.items():
            if not page_id.startswith("page_"):
                continue
            if "para_blocks" not in blocks.keys():
                continue

            for para_block in blocks["para_blocks"]:
                for line in para_block["lines"]:
                    for span in line["spans"]:
                        decoded_flags = safe_get(span, "decomposed_flags", {})
                        span_rows.append(
                            {
                                "span_id": span_id,  # id of span
                                "page_id": page_id,  # page number of pdf
                                "span_text": safe_get(span, "text", ""),
                                "span_font_name": safe_get(span, "font", ""),
                                "span_font_size": safe_get(span, "size", 0),
                                "span_font_color": safe_get(span, "color", ""),
                                "span_font_flags": safe_get(span, "flags", 0),
                                # The boolean flags are stored as 0/1 integers.
                                "span_is_superscript": int(safe_get(decoded_flags, "is_superscript", False)),
                                "span_is_italic": int(safe_get(decoded_flags, "is_italic", False)),
                                "span_is_serifed": int(safe_get(decoded_flags, "is_serifed", False)),
                                "span_is_sans_serifed": int(safe_get(decoded_flags, "is_sans_serifed", False)),
                                "span_is_monospaced": int(safe_get(decoded_flags, "is_monospaced", False)),
                                "span_is_proportional": int(safe_get(decoded_flags, "is_proportional", False)),
                                "span_is_bold": int(safe_get(decoded_flags, "is_bold", False)),
                            }
                        )
                        span_id += 1

        return pd.DataFrame(span_rows, columns=list(self._SPAN_COLUMNS))
def __find_pdf_dic_files(
    jf_name="pdf_dic.json",
    base_code_name="code-clean",
    tgt_base_dir_name="tmp",
    unittest_dir_name="unittest",
    md_dir_name="md",
    book_names=("scihub",),  # other possible values: "zlib", "arxiv" and so on
):
    """Collect the paths of all ``jf_name`` files below the unittest markdown tree.

    The base directory is the prefix of this module's directory that ends
    with the first occurrence of ``base_code_name``. For every entry of
    ``book_names`` the tree
    ``<base>/<tgt_base_dir_name>/<unittest_dir_name>/<md_dir_name>/<book_name>``
    is walked recursively.

    Parameters
    ----------
    jf_name : str
        File name to look for.
    base_code_name : str
        Marker that identifies the code base directory inside this module's path.
    tgt_base_dir_name, unittest_dir_name, md_dir_name : str
        Path components below the code base directory.
    book_names : iterable of str
        Sub-directories to search. The default is an immutable tuple so it
        cannot be shared and mutated between calls.

    Returns
    -------
    list of str
        Paths of the matching files. Empty when the marker does not occur
        in the module path or the base directory does not exist.
    """
    curr_dir = os.path.dirname(__file__)

    marker_pos = curr_dir.find(base_code_name)
    if marker_pos == -1:
        return []

    base_code_dir_name = curr_dir[: marker_pos + len(base_code_name)]
    if not os.path.exists(base_code_dir_name):
        return []

    pdf_dict_files = []
    for book_name in book_names:
        search_dir_name = os.path.join(
            base_code_dir_name, tgt_base_dir_name, unittest_dir_name, md_dir_name, book_name
        )
        for root, _dirs, files in os.walk(search_dir_name):
            pdf_dict_files.extend(os.path.join(root, file) for file in files if file == jf_name)

    return pdf_dict_files
def combine_span_texts(group_df, span_stats):
    """Render every span of ``group_df`` together with its neighbouring spans.

    For each row the texts of the spans labelled ``id - 1``, ``id`` and
    ``id + 1`` in ``span_stats`` are put on three lines, each prefixed with
    the same arrow marker. A neighbour whose label is not in the index of
    ``span_stats`` contributes an empty text. The per-row snippets are
    separated by a blank line.
    """
    arrow = "→ → → "
    known_ids = span_stats.index

    def text_of(span_id):
        # Neighbours outside the frame are rendered as empty text.
        return span_stats.at[span_id, "span_text"] if span_id in known_ids else ""

    snippets = []
    for _, row in group_df.iterrows():
        span_id = row.name
        context = (text_of(span_id - 1), row["span_text"], text_of(span_id + 1))
        snippets.append("\n".join(arrow + piece for piece in context))

    return "\n\n".join(snippets)
# pd.set_option("display.max_colwidth", None)  # set to None to display the full text
pd.set_option("display.max_rows", None)  # set to None to display more rows
def main():
    """Compute span statistics for every discovered ``pdf_dic.json`` file.

    For each file this writes, next to it:

    * ``span_stats_raw.csv``: one row per span,
    * ``span_stats_final.csv``: superscript spans grouped by font name,
      size and color, with a count and the surrounding span texts,
    * ``span_stats_combined.png``: a 2x2 grid of bar charts of those counts.

    Files without any superscript span only get the raw CSV.
    """
    pdf_dict_files = __find_pdf_dic_files()
    # print(pdf_dict_files)

    span_stats_calc = SpanStatsCalc()

    for pdf_dict_file in pdf_dict_files:
        print("-" * 100)
        print_green_on_red(f"Processing {pdf_dict_file}")

        with open(pdf_dict_file, "r", encoding="utf-8") as f:
            pdf_dict = json.load(f)

        raw_df = span_stats_calc.calc_stats_per_dict(pdf_dict)
        save_path = pdf_dict_file.replace("pdf_dic.json", "span_stats_raw.csv")
        raw_df.to_csv(save_path, index=False)

        # A frame without rows may also lack the column filtered on below.
        if raw_df.empty:
            print("No superscript span found!")
            continue

        filtered_df = raw_df[raw_df["span_is_superscript"] == 1]
        if filtered_df.empty:
            print("No superscript span found!")
            continue

        filtered_grouped_df = filtered_df.groupby(["span_font_name", "span_font_size", "span_font_color"])
        combined_span_texts = filtered_grouped_df.apply(combine_span_texts, span_stats=raw_df)  # type: ignore
        final_df = filtered_grouped_df.size().reset_index(name="count")
        final_df["span_texts"] = combined_span_texts.reset_index(level=[0, 1, 2], drop=True)
        print(final_df)

        final_df["span_texts"] = final_df["span_texts"].apply(lambda x: x.replace("\n", "\r\n"))
        save_path = pdf_dict_file.replace("pdf_dic.json", "span_stats_final.csv")
        # Use UTF-8 with a BOM and wrap every field in double quotes.
        final_df.to_csv(save_path, index=False, encoding="utf-8-sig", quoting=csv.QUOTE_ALL)

        # Create a 2x2 chart layout.
        fig, axs = plt.subplots(2, 2, figsize=(15, 10))
        try:
            # Chart grouped by span_font_name.
            final_df.groupby("span_font_name")["count"].sum().plot(kind="bar", ax=axs[0, 0], title="By Font Name")
            # Chart grouped by span_font_size.
            final_df.groupby("span_font_size")["count"].sum().plot(kind="bar", ax=axs[0, 1], title="By Font Size")
            # Chart grouped by span_font_color.
            final_df.groupby("span_font_color")["count"].sum().plot(kind="bar", ax=axs[1, 0], title="By Font Color")
            # Chart grouped by span_font_name, span_font_size and span_font_color together.
            grouped = final_df.groupby(["span_font_name", "span_font_size", "span_font_color"])
            grouped["count"].sum().unstack().plot(kind="bar", ax=axs[1, 1], title="Combined Grouping")

            # Adjust the layout.
            plt.tight_layout()

            # Show the chart.
            # plt.show()

            # Save the chart to a PNG file.
            save_path = pdf_dict_file.replace("pdf_dic.json", "span_stats_combined.png")
            plt.savefig(save_path)
        finally:
            # Close the figure instead of only clearing it; a new figure is
            # created for every file and would otherwise stay open.
            plt.close(fig)


if __name__ == "__main__":
    main()
from collections import Counter
from magic_pdf.libs.language import detect_lang
def get_language_from_model(model_list: list):
    """Return the language detected on the largest number of pages.

    For every page in ``model_list`` the ``"text"`` of all ``"layout_dets"``
    entries whose ``"category_id"`` is 15 is concatenated and passed to
    ``detect_lang``. The language that occurs most often across the pages is
    returned. An empty ``model_list`` raises ``ValueError`` from ``max``.
    """
    text_category_ids = [15]
    page_languages = []

    for ocr_page_info in model_list:
        page_text = "".join(
            layout_det["text"]
            for layout_det in ocr_page_info["layout_dets"]
            if layout_det["category_id"] in text_category_ids
        )
        page_languages.append(detect_lang(page_text))

    # Count how many pages were assigned each language.
    language_counts = Counter(page_languages)
    # Pick the language with the highest page count.
    return max(language_counts, key=language_counts.get)
import re
from os import path
from collections import Counter
from loguru import logger
# from langdetect import detect
import spacy
import en_core_web_sm
import zh_core_web_sm
from magic_pdf.libs.language import detect_lang
class NLPModels:
    r"""
    Holder for the spaCy models used for language and entity detection.

    How to upload local models to s3:
        - config aws cli:
            doc\SETUP-CLI.md
            doc\setup_cli.sh
            app\config\__init__.py
        - $ cd {local_dir_storing_models}
        - $ ls models
            en_core_web_sm-3.7.1/
            zh_core_web_sm-3.7.0/
        - $ aws s3 sync models/ s3://llm-infra/models --profile=p_project_norm
        - $ aws s3 --profile=p_project_norm ls s3://llm-infra/models/
            PRE en_core_web_sm-3.7.1/
            PRE zh_core_web_sm-3.7.0/
    """

    def __init__(self):
        """Set the default model locations and load the small English and Chinese models."""
        # if OS is windows, set "TMP_DIR" to "D:/tmp"

        home_dir = path.expanduser("~")
        self.default_local_path = path.join(home_dir, ".nlp_models")
        self.default_shared_path = "/share/pdf_processor/nlp_models"
        self.default_hdfs_path = "hdfs://pdf_processor/nlp_models"
        self.default_s3_path = "s3://llm-infra/models"

        # Registry of known models: name -> type and version.
        self.nlp_models = {
            "en_core_web_sm": {
                "type": "spacy",
                "version": "3.7.1",
            },
            "en_core_web_md": {
                "type": "spacy",
                "version": "3.7.1",
            },
            "en_core_web_lg": {
                "type": "spacy",
                "version": "3.7.1",
            },
            "zh_core_web_sm": {
                "type": "spacy",
                "version": "3.7.0",
            },
            "zh_core_web_md": {
                "type": "spacy",
                "version": "3.7.0",
            },
            "zh_core_web_lg": {
                "type": "spacy",
                "version": "3.7.0",
            },
        }
        self.en_core_web_sm_model = en_core_web_sm.load()
        self.zh_core_web_sm_model = zh_core_web_sm.load()

    def load_model(self, model_name, model_type, model_version):
        """Load a registered spaCy model.

        Returns the loaded model when ``model_name`` is registered with the
        given type and version and is installed as a package. Returns
        ``None`` when it is registered but not installed, or (after logging
        an error) when the name/type/version combination is not registered.
        """
        if (
            model_name in self.nlp_models
            and self.nlp_models[model_name]["type"] == model_type
            and self.nlp_models[model_name]["version"] == model_version
        ):
            return spacy.load(model_name) if spacy.util.is_package(model_name) else None

        else:
            logger.error(f"Unsupported model name or version: {model_name} {model_version}")
            return None

    def detect_language(self, text, use_langdetect=False):
        """Classify ``text`` as ``"en"`` or ``"zh"``.

        Parameters
        ----------
        text : str
            Text to classify.
        use_langdetect : bool
            When true, ``detect_lang`` decides: ``"zh"`` if it reports
            ``"zh"``, otherwise ``"en"``. When false, ASCII letters are
            counted against characters in the range U+4E00..U+9FFF.

        Returns
        -------
        str or None
            ``"en"`` or ``"zh"``. ``None`` for empty text, or when the
            character counts are equal in counting mode.
        """
        if len(text) == 0:
            return None

        if use_langdetect:
            return "zh" if detect_lang(text) == "zh" else "en"

        en_count = len(re.findall(r"[a-zA-Z]", text))
        cn_count = len(re.findall(r"[\u4e00-\u9fff]", text))

        if en_count > cn_count:
            return "en"
        if cn_count > en_count:
            return "zh"
        return None

    def detect_entity_catgr_using_nlp(self, text, threshold=0.5):
        """
        Detect entity categories using NLP models and return the most frequent entity types.

        Parameters
        ----------
        text : str
            Text to be processed.
        threshold : float
            Minimum share of the entity words that the most common entity
            label must cover for it to be returned.

        Returns
        -------
        str or None or dict
            The most frequent entity label. ``None`` when no label
            qualifies or the model raised. An empty dict when no language
            could be determined (e.g. empty text).
        """
        lang = self.detect_language(text, use_langdetect=True)

        if lang == "en":
            nlp_model = self.en_core_web_sm_model
        elif lang == "zh":
            nlp_model = self.zh_core_web_sm_model
        else:
            # logger.error(f"Unsupported language: {lang}")
            return {}

        # Splitting text into smaller parts
        text_parts = re.split(r"[,;,;、\s & |]+", text)

        # Drop every part that begins with a digit or a non-word character.
        text_parts = [part for part in text_parts if not re.match(r"[\d\W]+", part)]
        text_combined = " ".join(text_parts)

        # Best effort: any failure inside the model is logged and reported as None.
        try:
            doc = nlp_model(text_combined)

            # Number of whitespace-separated words per entity label.
            word_counts_in_entities = Counter()
            for ent in doc.ents:
                word_counts_in_entities[ent.label_] += len(ent.text.split())

            total_words_in_entities = sum(word_counts_in_entities.values())
            total_words = len([token for token in doc if not token.is_punct])

            if total_words_in_entities == 0 or total_words == 0:
                return None

            # Fewer than half of the tokens belong to entities: give up.
            entity_percentage = total_words_in_entities / total_words
            if entity_percentage < 0.5:
                return None

            most_common_entity, word_count = word_counts_in_entities.most_common(1)[0]
            entity_percentage = word_count / total_words_in_entities

            if entity_percentage >= threshold:
                return most_common_entity
            else:
                return None
        except Exception as e:
            logger.error(f"Error in entity detection: {e}")
            return None
def __main__():
    """Run the entity-category detector over a fixed list of sample strings and print each result."""
    models = NLPModels()

    samples = [
        "张三",
        "张三, 李四,王五; 赵六",
        "John Doe",
        "Jane Smith",
        "Lee, John",
        "John Doe, Jane Smith; Alice Johnson,Bob Lee",
        "孙七, Michael Jordan;赵八",
        "David Smith Michael O'Connor; Kevin ßáçøñ",
        "李雷·韩梅梅, 张三·李四",
        "Charles Robert Darwin, Isaac Newton",
        "莱昂纳多·迪卡普里奥, 杰克·吉伦哈尔",
        "John Doe, Jane Smith; Alice Johnson",
        "张三, 李四,王五; 赵六",
        "Lei Wang, Jia Li, and Xiaojun Chen, LINKE YANG OU, and YUAN ZHANG",
        "Rachel Mills & William Barry & Susanne B. Haga",
        "Claire Chabut* and Jean-François Bussières",
        "1 Department of Chemistry, Northeastern University, Shenyang 110004, China 2 State Key Laboratory of Polymer Physics and Chemistry, Changchun Institute of Applied Chemistry, Chinese Academy of Sciences, Changchun 130022, China",
        "Changchun",
        "china",
        "Rongjun Song, 1,2 Baoyan Zhang, 1 Baotong Huang, 2 Tao Tang 2",
        "Synergistic Effect of Supported Nickel Catalyst with Intumescent Flame-Retardants on Flame Retardancy and Thermal Stability of Polypropylene",
        "Synergistic Effect of Supported Nickel Catalyst with",
        "Intumescent Flame-Retardants on Flame Retardancy",
        "and Thermal Stability of Polypropylene",
    ]

    for sample in samples:
        print()
        print(f"Original String: {sample}")

        detected = models.detect_entity_catgr_using_nlp(sample)
        print(f"Detected entities: {detected}")


if __name__ == "__main__":
    __main__()
import math
def __inc_dict_val(mp, key, val_inc:int):
    """Add ``val_inc`` to ``mp[key]`` in place; a missing or falsy entry is set to ``val_inc``."""
    current = mp.get(key)
    mp[key] = current + val_inc if current else val_inc
def get_text_block_base_info(block):
    """
    Return the (color, size, font) combination that covers the most text in the block.

    Every span contributes the length of its text to the total of its
    (color, size rounded to 2 decimals, font) key; the key with the largest
    total is returned. On a tie the key that was seen first is returned.
    """
    char_totals = {}
    for line in block['lines']:
        for span in line['spans']:
            style_key = (span['color'], round(span['size'], 2), span['font'])
            span_len = len(span['text'])
            previous = char_totals.get(style_key)
            char_totals[style_key] = previous + span_len if previous else span_len

    color, size, font = max(char_totals, key=char_totals.get)
    return color, size, font
\ No newline at end of file
from magic_pdf.libs.commons import fitz
import os
def draw_bbox_on_page(raw_pdf_doc: fitz.Document, paras_dict:dict, save_path: str):
    """
    Draw the bboxes of every page in ``paras_dict`` on new pages and save them to ``save_path``.

    For each entry of ``paras_dict`` a page with the size of
    ``raw_pdf_doc[page_idx]`` is appended to the output document and the
    boxes listed under ``preproc_blocks``, ``images``, ``image_backup``,
    ``droped_text_block`` and ``tables`` are drawn on it, in that order.

    If ``save_path`` already exists the pages are appended to that file and
    saved incrementally; otherwise a new PDF is written. The output document
    is closed even when drawing or saving fails.
    """
    # Check whether the output file already exists.
    is_new_pdf = not os.path.exists(save_path)
    # Open the existing PDF, or create a new empty one.
    doc = fitz.open('') if is_new_pdf else fitz.open(save_path)

    color_map = {
        'image': fitz.pdfcolor["yellow"],
        'text': fitz.pdfcolor['blue'],
        "table": fitz.pdfcolor['green']
    }

    try:
        for k, v in paras_dict.items():
            page_idx = v['page_idx']
            width = raw_pdf_doc[page_idx].rect.width
            height = raw_pdf_doc[page_idx].rect.height
            new_page = doc.new_page(width=width, height=height)

            # (key in the page dict, keyword arguments for Shape.finish), in drawing order.
            layers = (
                ('preproc_blocks', dict(color=None, fill=color_map['text'], fill_opacity=0.2)),
                ('images', dict(color=None, fill=color_map['image'])),
                ('image_backup', dict(color=color_map['image'], fill=None)),
                ('droped_text_block', dict(color=None, fill=fitz.pdfcolor['black'], fill_opacity=0.4)),
                # TODO table
                ('tables', dict(color=None, fill=color_map['table'], fill_opacity=0.2)),
            )
            for key, finish_kwargs in layers:
                for item in v[key]:
                    # Draw the original box.
                    shape = new_page.new_shape()
                    shape.draw_rect(fitz.Rect(item['bbox']))
                    shape.finish(**finish_kwargs)
                    # Kept from the original code.
                    # NOTE(review): looks redundant after the styled finish() - confirm before removing.
                    shape.finish()
                    shape.commit()

        # dirname() is '' for a bare file name; os.makedirs('') would raise.
        parent_dir = os.path.dirname(save_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        if is_new_pdf:
            doc.save(save_path)
        else:
            doc.saveIncr()
    finally:
        doc.close()
def debug_show_bbox(raw_pdf_doc: fitz.Document, page_idx: int, bboxes: list, droped_bboxes:list, expect_drop_bboxes:list, save_path: str, expected_page_id:int):
    """
    Write a temporary single-page PDF for debugging, overwriting any existing file.

    Nothing happens unless ``page_idx`` equals ``expected_page_id``.
    Otherwise a page with the size of ``raw_pdf_doc[page_idx]`` is created
    and three groups of boxes (the first four items of each bbox) are drawn:

    * ``bboxes``: red border, blue fill at 0.2 opacity,
    * ``droped_bboxes``: no border, yellow fill at 0.2 opacity,
    * ``expect_drop_bboxes``: red border, no fill.

    The document is closed even when drawing or saving fails.
    """
    if page_idx!=expected_page_id:
        return

    if os.path.exists(save_path):
        # Delete the file that already exists.
        os.remove(save_path)
    # Create a new, empty PDF.
    doc = fitz.open('')

    try:
        width = raw_pdf_doc[page_idx].rect.width
        height = raw_pdf_doc[page_idx].rect.height
        new_page = doc.new_page(width=width, height=height)

        # (boxes, keyword arguments for Shape.finish), in drawing order.
        layers = (
            (bboxes, dict(color=fitz.pdfcolor['red'], fill=fitz.pdfcolor['blue'], fill_opacity=0.2)),
            (droped_bboxes, dict(color=None, fill=fitz.pdfcolor['yellow'], fill_opacity=0.2)),
            (expect_drop_bboxes, dict(color=fitz.pdfcolor['red'], fill=None)),
        )
        for boxes, finish_kwargs in layers:
            for bbox in boxes:
                # Draw the original box.
                shape = new_page.new_shape()
                shape.draw_rect(fitz.Rect(*bbox[0:4]))
                shape.finish(**finish_kwargs)
                # Kept from the original code.
                # NOTE(review): looks redundant after the styled finish() - confirm before removing.
                shape.finish()
                shape.commit()

        # dirname() is '' for a bare file name; os.makedirs('') would raise.
        parent_dir = os.path.dirname(save_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        doc.save(save_path)
    finally:
        doc.close()
def debug_show_page(page, bboxes1: list,bboxes2: list,bboxes3: list,):
    """
    Write ``./tmp/debug.pdf`` with three groups of boxes drawn on a blank page.

    The page has the size of ``page``. ``bboxes1`` is drawn with a red
    border and a blue fill at 0.2 opacity, ``bboxes2`` with no border and a
    yellow fill at 0.2 opacity, ``bboxes3`` with a red border and no fill.
    Only the first four items of each bbox are used. An existing output
    file is deleted first.
    """
    save_path = "./tmp/debug.pdf"
    if os.path.exists(save_path):
        # Delete the file that already exists.
        os.remove(save_path)
    # Create a new, empty PDF.
    doc = fitz.open('')

    page_rect = page.rect
    new_page = doc.new_page(width=page_rect.width, height=page_rect.height)

    shape = new_page.new_shape()
    groups = [
        (bboxes1, {'color': fitz.pdfcolor['red'], 'fill': fitz.pdfcolor['blue'], 'fill_opacity': 0.2}),
        (bboxes2, {'color': None, 'fill': fitz.pdfcolor['yellow'], 'fill_opacity': 0.2}),
        (bboxes3, {'color': fitz.pdfcolor['red'], 'fill': None}),
    ]
    for boxes, finish_kwargs in groups:
        for bbox in boxes:
            # Draw the original box.
            shape = new_page.new_shape()
            shape.draw_rect(fitz.Rect(*bbox[0:4]))
            shape.finish(**finish_kwargs)
            shape.finish()
            shape.commit()

    parent_dir = os.path.dirname(save_path)
    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir)

    doc.save(save_path)
    doc.close()
def draw_layout_bbox_on_page(raw_pdf_doc: fitz.Document, paras_dict:dict, header, footer, pdf_path: str):
    """
    Draw the layout boxes of every page in ``paras_dict`` onto the PDF at ``pdf_path``.

    Each layout box is outlined in red (filled pink when its label is 'U')
    and annotated with its order number. ``header`` and ``footer``, when
    truthy, are drawn as grey-filled rectangles on every listed page. An
    existing file is updated incrementally; otherwise a new document is
    saved to ``pdf_path``.
    """
    # Check whether the file exists.
    pdf_exists = os.path.exists(pdf_path)
    # Open the existing PDF, or create a new empty one.
    doc = fitz.open(pdf_path) if pdf_exists else fitz.open('')
    is_new_pdf = not pdf_exists

    border_offset = 1
    font_size = 10

    for page_info in paras_dict.values():
        page_idx = page_info['page_idx']
        layouts = page_info['layout_bboxes']
        shape = doc[page_idx].new_shape()

        for order, layout in enumerate(layouts):
            raw_box = layout['layout_bbox']
            layout_label = layout['layout_label']
            fill_color = fitz.pdfcolor['pink'] if layout_label=='U' else None
            # Shrink the box by 1 horizontally and grow it by the border offset vertically.
            rect_box = [
                raw_box[0]+1,
                raw_box[1]-border_offset,
                raw_box[2]-1,
                raw_box[3]+border_offset,
            ]
            shape.draw_rect(fitz.Rect(*rect_box))
            shape.finish(color=fitz.pdfcolor['red'], fill=fill_color, fill_opacity=0.4)
            # Draw the order text on the layout box.
            text_origin = (rect_box[0] + 1, rect_box[1] + font_size)
            shape.insert_text(text_origin, f"{order}", fontsize=font_size, color=(0, 0, 0))

        # Draw the header and footer.
        for region in (header, footer):
            if region:
                shape.draw_rect(fitz.Rect(region))
                shape.finish(color=None, fill=fitz.pdfcolor['black'], fill_opacity=0.2)

        shape.commit()

    if is_new_pdf:
        doc.save(pdf_path)
    else:
        doc.saveIncr()
    doc.close()
def draw_layout_on_page(raw_pdf_doc: fitz.Document, page_idx: int, page_layout: list, pdf_path: str):
    """
    Draw the layout boxes with a red border on page ``page_idx`` of ``pdf_path``.

    Deprecated: calling this function emits a ``DeprecationWarning``. The
    class ``DeprecationWarning`` was previously applied as a decorator,
    which replaced the function with an exception instance and made it
    impossible to call.

    Only layouts without sub-layouts are drawn; sub-layouts are visited
    recursively. Leaf layouts labelled 'U' are filled: yellow for top-level
    entries, pink for nested ones.

    NOTE(review): when ``pdf_path`` does not exist a new empty document is
    opened and ``doc[page_idx]`` is then read from it - confirm this path
    is ever expected to work.
    """
    import warnings

    warnings.warn("draw_layout_on_page is deprecated", DeprecationWarning, stacklevel=2)

    def draw(shape, layout, fill_color=fitz.pdfcolor['pink']):
        border_offset = 1
        rect_box = layout['layout_bbox']
        layout_label = layout['layout_label']
        sub_layout = layout['sub_layout']
        if len(sub_layout)==0:
            fill_color = fill_color if layout_label=='U' else None
            rect_box = [rect_box[0]+1, rect_box[1]-border_offset, rect_box[2]-1, rect_box[3]+border_offset]
            rect = fitz.Rect(*rect_box)
            shape.draw_rect(rect)
            shape.finish(color=fitz.pdfcolor['red'], fill=fill_color, fill_opacity=0.2)

        # Nested layouts are drawn with the default (pink) fill color.
        for child_layout in sub_layout:
            draw(shape, child_layout)
        shape.commit()

    # Check whether the file exists.
    is_new_pdf = not os.path.exists(pdf_path)
    # Open the existing PDF, or create a new empty one.
    doc = fitz.open('') if is_new_pdf else fitz.open(pdf_path)

    page = doc[page_idx]
    shape = page.new_shape()
    for order, layout in enumerate(page_layout):
        draw(shape, layout, fitz.pdfcolor['yellow'])

    # dirname() is '' for a bare file name; os.makedirs('') would raise.
    parent_dir = os.path.dirname(pdf_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    if is_new_pdf:
        doc.save(pdf_path)
    else:
        doc.saveIncr()
    doc.close()
\ No newline at end of file
This diff is collapsed.
from magic_pdf.para.commons import *
# On Python 3, switch the text encoding of stdout to UTF-8 at import time.
# NOTE(review): `sys` is not imported explicitly here; presumably it arrives
# through the star import above - confirm.
if sys.version_info[0] >= 3:
    sys.stdout.reconfigure(encoding="utf-8")  # type: ignore
class BlockTerminationProcessor:
def __init__(self) -> None:
pass
def _is_consistent_lines(
self,
curr_line,
prev_line,
next_line,
consistent_direction, # 0 for prev, 1 for next, 2 for both
):
"""
This function checks if the line is consistent with its neighbors
Parameters
----------
curr_line : dict
current line
prev_line : dict
previous line
next_line : dict
next line
consistent_direction : int
0 for prev, 1 for next, 2 for both
Returns
-------
bool
True if the line is consistent with its neighbors, False otherwise.
"""
curr_line_font_size = curr_line["spans"][0]["size"]
curr_line_font_type = curr_line["spans"][0]["font"].lower()
if consistent_direction == 0:
if prev_line:
prev_line_font_size = prev_line["spans"][0]["size"]
prev_line_font_type = prev_line["spans"][0]["font"].lower()
return curr_line_font_size == prev_line_font_size and curr_line_font_type == prev_line_font_type
else:
return False
elif consistent_direction == 1:
if next_line:
next_line_font_size = next_line["spans"][0]["size"]
next_line_font_type = next_line["spans"][0]["font"].lower()
return curr_line_font_size == next_line_font_size and curr_line_font_type == next_line_font_type
else:
return False
elif consistent_direction == 2:
if prev_line and next_line:
prev_line_font_size = prev_line["spans"][0]["size"]
prev_line_font_type = prev_line["spans"][0]["font"].lower()
next_line_font_size = next_line["spans"][0]["size"]
next_line_font_type = next_line["spans"][0]["font"].lower()
return (curr_line_font_size == prev_line_font_size and curr_line_font_type == prev_line_font_type) and (
curr_line_font_size == next_line_font_size and curr_line_font_type == next_line_font_type
)
else:
return False
else:
return False
def _is_regular_line(self, curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, X0, X1, avg_line_height):
"""
This function checks if the line is a regular line
Parameters
----------
curr_line_bbox : list
bbox of the current line
prev_line_bbox : list
bbox of the previous line
next_line_bbox : list
bbox of the next line
avg_char_width : float
average of char widths
X0 : float
median of x0 values, which represents the left average boundary of the page
X1 : float
median of x1 values, which represents the right average boundary of the page
avg_line_height : float
average of line heights
Returns
-------
bool
True if the line is a regular line, False otherwise.
"""
horizontal_ratio = 0.5
vertical_ratio = 0.5
horizontal_thres = horizontal_ratio * avg_char_width
vertical_thres = vertical_ratio * avg_line_height
x0, y0, x1, y1 = curr_line_bbox
x0_near_X0 = abs(x0 - X0) < horizontal_thres
x1_near_X1 = abs(x1 - X1) < horizontal_thres
prev_line_is_end_of_para = prev_line_bbox and (abs(prev_line_bbox[2] - X1) > avg_char_width)
sufficient_spacing_above = False
if prev_line_bbox:
vertical_spacing_above = y1 - prev_line_bbox[3]
sufficient_spacing_above = vertical_spacing_above > vertical_thres
sufficient_spacing_below = False
if next_line_bbox:
vertical_spacing_below = next_line_bbox[1] - y0
sufficient_spacing_below = vertical_spacing_below > vertical_thres
return (
(sufficient_spacing_above or sufficient_spacing_below)
or (not x0_near_X0 and not x1_near_X1)
or prev_line_is_end_of_para
)
def _is_possible_start_of_para(self, curr_line, prev_line, next_line, X0, X1, avg_char_width, avg_font_size):
"""
This function checks if the line is a possible start of a paragraph
Parameters
----------
curr_line : dict
current line
prev_line : dict
previous line
next_line : dict
next line
X0 : float
median of x0 values, which represents the left average boundary of the page
X1 : float
median of x1 values, which represents the right average boundary of the page
avg_char_width : float
average of char widths
avg_line_height : float
average of line heights
Returns
-------
bool
True if the line is a possible start of a paragraph, False otherwise.
"""
start_confidence = 0.5 # Initial confidence of the line being a start of a paragraph
decision_path = [] # Record the decision path
curr_line_bbox = curr_line["bbox"]
prev_line_bbox = prev_line["bbox"] if prev_line else None
next_line_bbox = next_line["bbox"] if next_line else None
indent_ratio = 1
vertical_ratio = 1.5
vertical_thres = vertical_ratio * avg_font_size
left_horizontal_ratio = 0.5
left_horizontal_thres = left_horizontal_ratio * avg_char_width
right_horizontal_ratio = 2.5
right_horizontal_thres = right_horizontal_ratio * avg_char_width
x0, y0, x1, y1 = curr_line_bbox
indent_condition = x0 > X0 + indent_ratio * avg_char_width
if indent_condition:
start_confidence += 0.2
decision_path.append("indent_condition_met")
x0_near_X0 = abs(x0 - X0) < left_horizontal_thres
if x0_near_X0:
start_confidence += 0.1
decision_path.append("x0_near_X0")
x1_near_X1 = abs(x1 - X1) < right_horizontal_thres
if x1_near_X1:
start_confidence += 0.1
decision_path.append("x1_near_X1")
if prev_line is None:
prev_line_is_end_of_para = True
start_confidence += 0.2
decision_path.append("no_prev_line")
else:
prev_line_is_end_of_para, _, _ = self._is_possible_end_of_para(prev_line, next_line, X0, X1, avg_char_width)
if prev_line_is_end_of_para:
start_confidence += 0.1
decision_path.append("prev_line_is_end_of_para")
sufficient_spacing_above = False
if prev_line_bbox:
vertical_spacing_above = y1 - prev_line_bbox[3]
sufficient_spacing_above = vertical_spacing_above > vertical_thres
if sufficient_spacing_above:
start_confidence += 0.2
decision_path.append("sufficient_spacing_above")
sufficient_spacing_below = False
if next_line_bbox:
vertical_spacing_below = next_line_bbox[1] - y0
sufficient_spacing_below = vertical_spacing_below > vertical_thres
if sufficient_spacing_below:
start_confidence += 0.2
decision_path.append("sufficient_spacing_below")
is_regular_line = self._is_regular_line(
curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, X0, X1, avg_font_size
)
if is_regular_line:
start_confidence += 0.1
decision_path.append("is_regular_line")
is_start_of_para = (
(sufficient_spacing_above or sufficient_spacing_below)
or (indent_condition)
or (not indent_condition and x0_near_X0 and x1_near_X1 and not is_regular_line)
or prev_line_is_end_of_para
)
return (is_start_of_para, start_confidence, decision_path)
def _is_possible_end_of_para(self, curr_line, next_line, X0, X1, avg_char_width):
"""
This function checks if the line is a possible end of a paragraph
Parameters
----------
curr_line : dict
current line
next_line : dict
next line
X0 : float
median of x0 values, which represents the left average boundary of the page
X1 : float
median of x1 values, which represents the right average boundary of the page
avg_char_width : float
average of char widths
Returns
-------
bool
True if the line is a possible end of a paragraph, False otherwise.
"""
end_confidence = 0.5 # Initial confidence of the line being a end of a paragraph
decision_path = [] # Record the decision path
curr_line_bbox = curr_line["bbox"]
next_line_bbox = next_line["bbox"] if next_line else None
left_horizontal_ratio = 0.5
right_horizontal_ratio = 0.5
x0, _, x1, y1 = curr_line_bbox
next_x0, next_y0, _, _ = next_line_bbox if next_line_bbox else (0, 0, 0, 0)
x0_near_X0 = abs(x0 - X0) < left_horizontal_ratio * avg_char_width
if x0_near_X0:
end_confidence += 0.1
decision_path.append("x0_near_X0")
x1_smaller_than_X1 = x1 < X1 - right_horizontal_ratio * avg_char_width
if x1_smaller_than_X1:
end_confidence += 0.1
decision_path.append("x1_smaller_than_X1")
next_line_is_start_of_para = (
next_line_bbox
and (next_x0 > X0 + left_horizontal_ratio * avg_char_width)
and (not is_line_left_aligned_from_neighbors(curr_line_bbox, None, next_line_bbox, avg_char_width, direction=1))
)
if next_line_is_start_of_para:
end_confidence += 0.2
decision_path.append("next_line_is_start_of_para")
is_line_left_aligned_from_neighbors_bool = is_line_left_aligned_from_neighbors(
curr_line_bbox, None, next_line_bbox, avg_char_width
)
if is_line_left_aligned_from_neighbors_bool:
end_confidence += 0.1
decision_path.append("line_is_left_aligned_from_neighbors")
is_line_right_aligned_from_neighbors_bool = is_line_right_aligned_from_neighbors(
curr_line_bbox, None, next_line_bbox, avg_char_width
)
if not is_line_right_aligned_from_neighbors_bool:
end_confidence += 0.1
decision_path.append("line_is_not_right_aligned_from_neighbors")
is_end_of_para = end_with_punctuation(curr_line["text"]) and (
(x0_near_X0 and x1_smaller_than_X1)
or (is_line_left_aligned_from_neighbors_bool and not is_line_right_aligned_from_neighbors_bool)
)
return (is_end_of_para, end_confidence, decision_path)
def _cut_paras_per_block(
self,
block,
):
"""
Processes a raw block from PyMuPDF and returns the processed block.
Parameters
----------
raw_block : dict
A raw block from pymupdf.
Returns
-------
processed_block : dict
"""
def _construct_para(lines, is_block_title, para_title_level):
"""
Construct a paragraph from given lines.
"""
font_sizes = [span["size"] for line in lines for span in line["spans"]]
avg_font_size = sum(font_sizes) / len(font_sizes) if font_sizes else 0
font_colors = [span["color"] for line in lines for span in line["spans"]]
most_common_font_color = max(set(font_colors), key=font_colors.count) if font_colors else None
# font_types = [span["font"] for line in lines for span in line["spans"]]
# most_common_font_type = max(set(font_types), key=font_types.count) if font_types else None
font_type_lengths = {}
for line in lines:
for span in line["spans"]:
font_type = span["font"]
bbox_width = span["bbox"][2] - span["bbox"][0]
if font_type in font_type_lengths:
font_type_lengths[font_type] += bbox_width
else:
font_type_lengths[font_type] = bbox_width
# get the font type with the longest bbox width
most_common_font_type = max(font_type_lengths, key=font_type_lengths.get) if font_type_lengths else None # type: ignore
para_bbox = calculate_para_bbox(lines)
para_text = " ".join(line["text"] for line in lines)
return {
"para_bbox": para_bbox,
"para_text": para_text,
"para_font_type": most_common_font_type,
"para_font_size": avg_font_size,
"para_font_color": most_common_font_color,
"is_para_title": is_block_title,
"para_title_level": para_title_level,
}
block_bbox = block["bbox"]
block_text = block["text"]
block_lines = block["lines"]
X0 = safe_get(block, "X0", 0)
X1 = safe_get(block, "X1", 0)
avg_char_width = safe_get(block, "avg_char_width", 0)
avg_char_height = safe_get(block, "avg_char_height", 0)
avg_font_size = safe_get(block, "avg_font_size", 0)
is_block_title = safe_get(block, "is_block_title", False)
para_title_level = safe_get(block, "block_title_level", 0)
# Segment into paragraphs
para_ranges = []
in_paragraph = False
start_idx_of_para = None
# Create the processed paragraphs
processed_paras = {}
para_bboxes = []
end_idx_of_para = 0
for line_index, line in enumerate(block_lines):
curr_line = line
prev_line = block_lines[line_index - 1] if line_index > 0 else None
next_line = block_lines[line_index + 1] if line_index < len(block_lines) - 1 else None
"""
Start processing paragraphs.
"""
# Check if the line is the start of a paragraph
is_start_of_para, start_confidence, decision_path = self._is_possible_start_of_para(
curr_line, prev_line, next_line, X0, X1, avg_char_width, avg_font_size
)
if not in_paragraph and is_start_of_para:
in_paragraph = True
start_idx_of_para = line_index
# print_green(">>> Start of a paragraph")
# print(" curr_line_text: ", curr_line["text"])
# print(" start_confidence: ", start_confidence)
# print(" decision_path: ", decision_path)
# Check if the line is the end of a paragraph
is_end_of_para, end_confidence, decision_path = self._is_possible_end_of_para(
curr_line, next_line, X0, X1, avg_char_width
)
if in_paragraph and (is_end_of_para or not next_line):
para_ranges.append((start_idx_of_para, line_index))
start_idx_of_para = None
in_paragraph = False
# print_red(">>> End of a paragraph")
# print(" curr_line_text: ", curr_line["text"])
# print(" end_confidence: ", end_confidence)
# print(" decision_path: ", decision_path)
# Add the last paragraph if it is not added
if in_paragraph and start_idx_of_para is not None:
para_ranges.append((start_idx_of_para, len(block_lines) - 1))
# Process the matched paragraphs
for para_index, (start_idx, end_idx) in enumerate(para_ranges):
matched_lines = block_lines[start_idx : end_idx + 1]
para_properties = _construct_para(matched_lines, is_block_title, para_title_level)
para_key = f"para_{len(processed_paras)}"
processed_paras[para_key] = para_properties
para_bboxes.append(para_properties["para_bbox"])
end_idx_of_para = end_idx + 1
# Deal with the remaining lines
if end_idx_of_para < len(block_lines):
unmatched_lines = block_lines[end_idx_of_para:]
unmatched_properties = _construct_para(unmatched_lines, is_block_title, para_title_level)
unmatched_key = f"para_{len(processed_paras)}"
processed_paras[unmatched_key] = unmatched_properties
para_bboxes.append(unmatched_properties["para_bbox"])
block["paras"] = processed_paras
return block
def batch_process_blocks(self, pdf_dict):
    """
    Cut every block of every page into paragraphs and record the paragraph total.

    Parameters
    ----------
    pdf_dict : dict
        PDF dictionary. Only entries whose key starts with "page_" are processed;
        it must also contain a "statistics" dict.

    Returns
    -------
    pdf_dict : dict
        The same dictionary, updated in place: each page's "para_blocks" holds the
        blocks returned by `_cut_paras_per_block` (an empty list when the page had
        none) and pdf_dict["statistics"]["num_paras"] holds the paragraph count.
    """
    total_paras = 0

    for page_id, page in pdf_dict.items():
        if not page_id.startswith("page_"):
            continue

        if "para_blocks" in page:
            cut_blocks = [self._cut_paras_per_block(raw_block) for raw_block in page["para_blocks"]]
        else:
            cut_blocks = []

        total_paras += sum(len(cut_block["paras"]) for cut_block in cut_blocks)
        page["para_blocks"] = cut_blocks

    pdf_dict["statistics"]["num_paras"] = total_paras
    return pdf_dict
import sys
from magic_pdf.libs.commons import fitz
from termcolor import cprint
# Switch stdout to UTF-8, but only when the current stream supports it: sys.stdout
# may have been replaced by an object without `reconfigure` (or be None), and merely
# importing this module must not fail in that case.
if sys.version_info[0] >= 3 and hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8")  # type: ignore
def open_pdf(pdf_path):
    """
    Open a PDF with `fitz.open` and return the resulting document.

    Parameters
    ----------
    pdf_path : str
        Path handed to `fitz.open`.

    Returns
    -------
    pdf_document
        Whatever `fitz.open` returns for the path.

    Raises
    ------
    Exception
        Any exception raised while opening is printed and then re-raised.
    """
    try:
        return fitz.open(pdf_path)  # type: ignore
    except Exception as e:
        # Report the failure on stdout, then let the caller deal with it.
        print(f"无法打开PDF文件:{pdf_path}。原因是:{e}")
        raise e
def print_green_on_red(text):
    """Print `text` in bold green on a red background, followed by a blank line."""
    cprint(text, color="green", on_color="on_red", attrs=["bold"], end="\n\n")
def print_green(text):
    """Print an empty line, then `text` in bold green followed by a blank line."""
    print()
    cprint(text, color="green", attrs=["bold"], end="\n\n")
def print_red(text):
    """Print an empty line, then `text` in bold red followed by a blank line."""
    print()
    cprint(text, color="red", attrs=["bold"], end="\n\n")
def print_yellow(text):
    """Print an empty line, then `text` in bold yellow followed by a blank line."""
    print()
    cprint(text, color="yellow", attrs=["bold"], end="\n\n")
def safe_get(dict_obj, key, default):
    """
    Look up `key` in `dict_obj`, falling back to `default`.

    The fallback is used both when the key is absent and when the stored value
    is None; other falsy values (0, "", []) are returned unchanged.
    """
    found = dict_obj.get(key)
    return default if found is None else found
def is_bbox_overlap(bbox1, bbox2):
    """
    Check whether two bounding boxes overlap. Boxes that merely touch count as overlapping.

    Parameters
    ----------
    bbox1 : list
        First box as [x0, y0, x1, y1].
    bbox2 : list
        Second box as [x0, y0, x1, y1].

    Returns
    -------
    bool
        True if the boxes overlap, else False.
    """
    left_a, top_a, right_a, bottom_a = bbox1
    left_b, top_b, right_b, bottom_b = bbox2

    # The boxes are disjoint as soon as one lies strictly beyond the other on either axis.
    separated_on_x = left_a > right_b or left_b > right_a
    separated_on_y = top_a > bottom_b or top_b > bottom_a
    return not (separated_on_x or separated_on_y)
def is_in_bbox(bbox1, bbox2):
    """
    Check whether bbox1 lies inside bbox2. Shared edges still count as inside.

    Parameters
    ----------
    bbox1 : list
        Candidate inner box as [x0, y0, x1, y1].
    bbox2 : list
        Candidate outer box as [x0, y0, x1, y1].

    Returns
    -------
    bool
        True if bbox1 is in bbox2, else False.
    """
    inner_x0, inner_y0, inner_x1, inner_y1 = bbox1
    outer_x0, outer_y0, outer_x1, outer_y1 = bbox2

    return bool(
        inner_x0 >= outer_x0
        and inner_y0 >= outer_y0
        and inner_x1 <= outer_x1
        and inner_y1 <= outer_y1
    )
def calculate_para_bbox(lines):
    """
    Compute the smallest bounding box that encloses all lines of a paragraph.

    Parameters
    ----------
    lines : list
        Line dicts, each with a "bbox" entry laid out as [x0, y0, x1, y1].

    Returns
    -------
    para_bbox : list
        [x0, y0, x1, y1] of the enclosing box.
    """
    line_bboxes = [line["bbox"] for line in lines]

    left = min(bbox[0] for bbox in line_bboxes)
    top = min(bbox[1] for bbox in line_bboxes)
    right = max(bbox[2] for bbox in line_bboxes)
    bottom = max(bbox[3] for bbox in line_bboxes)

    return [left, top, right, bottom]
def is_line_right_aligned_from_neighbors(curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, direction=2):
    """
    Tell whether the right edge of a line coincides with the right edge of its neighbours.

    Two right edges coincide when they differ by less than half of `avg_char_width`.
    A missing (falsy) neighbour bbox is compared as if its right edge were at 0.

    Parameters
    ----------
    curr_line_bbox : list
        bbox of the current line
    prev_line_bbox : list
        bbox of the previous line
    next_line_bbox : list
        bbox of the next line
    avg_char_width : float
        average of char widths
    direction : int
        0 to compare with prev only, 1 with next only, 2 with both

    Returns
    -------
    bool
        True if the line is right aligned with the selected neighbours, False otherwise
        (including for any other `direction` value).
    """
    tolerance = 0.5 * avg_char_width
    placeholder_bbox = (0, 0, 0, 0)

    _, _, curr_right, _ = curr_line_bbox
    _, _, prev_right, _ = prev_line_bbox or placeholder_bbox
    _, _, next_right, _ = next_line_bbox or placeholder_bbox

    def lines_up_with(other_right):
        return abs(curr_right - other_right) < tolerance

    if direction == 0:
        return lines_up_with(prev_right)
    if direction == 1:
        return lines_up_with(next_right)
    if direction == 2:
        return lines_up_with(prev_right) and lines_up_with(next_right)
    return False
def is_line_left_aligned_from_neighbors(curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, direction=2):
    """
    Tell whether the left edge of a line coincides with the left edge of its neighbours.

    Two left edges coincide when they differ by less than half of `avg_char_width`.
    A missing (falsy) neighbour bbox is compared as if its left edge were at 0.

    Parameters
    ----------
    curr_line_bbox : list
        bbox of the current line
    prev_line_bbox : list
        bbox of the previous line
    next_line_bbox : list
        bbox of the next line
    avg_char_width : float
        average of char widths
    direction : int
        0 to compare with prev only, 1 with next only, 2 with both

    Returns
    -------
    bool
        True if the line is left aligned with the selected neighbours, False otherwise
        (including for any other `direction` value).
    """
    tolerance = 0.5 * avg_char_width
    placeholder_bbox = (0, 0, 0, 0)

    curr_left, _, _, _ = curr_line_bbox
    prev_left, _, _, _ = prev_line_bbox or placeholder_bbox
    next_left, _, _, _ = next_line_bbox or placeholder_bbox

    def lines_up_with(other_left):
        return abs(curr_left - other_left) < tolerance

    if direction == 0:
        return lines_up_with(prev_left)
    if direction == 1:
        return lines_up_with(next_left)
    if direction == 2:
        return lines_up_with(prev_left) and lines_up_with(next_left)
    return False
def end_with_punctuation(line_text):
    """
    Check whether a line ends with a sentence-terminating punctuation mark.

    Trailing whitespace is ignored. Both the ASCII marks and their CJK
    counterparts are recognised.

    Parameters
    ----------
    line_text : str
        Text of the line.

    Returns
    -------
    bool
        True if the last non-space character is an end punctuation mark,
        False otherwise (including for empty or whitespace-only text).
    """
    english_end_puncs = (".", "?", "!")
    # Written as escapes so the fullwidth forms cannot silently collapse into
    # their ASCII look-alikes (which would just duplicate the English marks).
    chinese_end_puncs = (
        "\u3002",  # ideographic full stop
        "\uff1f",  # fullwidth question mark
        "\uff01",  # fullwidth exclamation mark
    )
    end_puncs = english_end_puncs + chinese_end_puncs

    stripped = line_text.rstrip()
    return bool(stripped) and stripped[-1] in end_puncs
def is_nested_list(lst):
    """Return True when `lst` is a list that directly contains at least one list, else False."""
    if not isinstance(lst, list):
        return False
    for element in lst:
        if isinstance(element, list):
            return True
    return False
import math
from collections import defaultdict
from magic_pdf.para.commons import *
# Switch stdout to UTF-8, but only when the current stream supports it: sys.stdout
# may have been replaced by an object without `reconfigure` (or be None), and merely
# importing this module must not fail in that case.
if sys.version_info[0] >= 3 and hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8")  # type: ignore
class HeaderFooterProcessor:
    """Flags blocks whose bbox repeats near the top or bottom of the pages as headers / footers."""

    def __init__(self) -> None:
        pass

    def get_most_common_bboxes(self, bboxes, page_height, position="top", threshold=0.25, num_bboxes=3, min_frequency=2):
        """
        Get the most frequently repeated bboxes inside the top or bottom band of the page.

        Parameters
        ----------
        bboxes : list
            bboxes, each laid out as [x0, y0, x1, y1]
        page_height : float
            height of the page
        position : str, optional
            "top" selects the top band; any other value selects the bottom band, by default "top"
        threshold : float, optional
            fraction of the page height that makes up the band, by default 0.25
        num_bboxes : int, optional
            maximum number of bboxes to return, by default 3
        min_frequency : int, optional
            minimum number of occurrences for a bbox to be returned, by default 2

        Returns
        -------
        common_bboxes : list
            bboxes (as tuples) ordered by decreasing frequency; ties keep first-seen order
        """
        # Keep only the bboxes that reach into the requested band.
        if position == "top":
            band_bboxes = [bbox for bbox in bboxes if bbox[1] < page_height * threshold]
        else:
            band_bboxes = [bbox for bbox in bboxes if bbox[3] > page_height * (1 - threshold)]

        # Count identical bboxes; tuples are used because lists are not hashable.
        bbox_count = defaultdict(int)
        for bbox in band_bboxes:
            bbox_count[tuple(bbox)] += 1

        # Drop rare bboxes, then order by frequency. The sort is stable, so the
        # result matches sorting everything first and filtering afterwards.
        frequent_bboxes = [bbox for bbox, count in bbox_count.items() if count >= min_frequency]
        frequent_bboxes.sort(key=lambda bbox: bbox_count[bbox], reverse=True)
        return frequent_bboxes[:num_bboxes]

    def detect_footer_header(self, result_dict, similarity_threshold=0.5):
        """
        Detect the headers and footers of the document.

        Every block found under a "page_*" / "block_*" key receives integer
        "is_header" and "is_footer" flags. Detection is skipped (no flags are
        written) when there are no blocks or when more than half of the blocks
        look like single lines.

        Parameters
        ----------
        result_dict : dict
            result dictionary; blocks need "bbox", "X0", "X1", "avg_char_height"
            and "avg_char_width" entries
        similarity_threshold : float, optional
            not used by the detection; kept so existing callers keep working

        Returns
        -------
        result_dict : dict
            the same dictionary, updated in place
        """

        def compare_bbox_with_list(bbox, bbox_list, tolerance=1):
            # A bbox matches when all four coordinates are within `tolerance` of a listed bbox.
            return any(all(abs(a - b) < tolerance for a, b in zip(bbox, common_bbox)) for common_bbox in bbox_list)

        def is_single_line_block(block):
            # Determine based on the width and height of the block
            block_width = block["X1"] - block["X0"]
            block_height = block["bbox"][3] - block["bbox"][1]
            # A low block (at most 3 average char heights) that is wider than 3 average
            # char widths is considered a single line
            return block_height <= block["avg_char_height"] * 3 and block_width > block["avg_char_width"] * 3

        # Collect all blocks once; the steps below all work on this same set.
        all_blocks = [
            block
            for page_id, blocks in result_dict.items()
            if page_id.startswith("page_")
            for block_key, block in blocks.items()
            if block_key.startswith("block_")
        ]

        # If there are no blocks, skip the header and footer detection
        if not all_blocks:
            print("No blocks found. Skipping header/footer detection.")
            return result_dict

        # If most of the blocks are single-line, skip the header and footer detection
        single_line_blocks = sum(1 for block in all_blocks if is_single_line_block(block))
        if single_line_blocks / len(all_blocks) > 0.5:  # 50% of the blocks are single-line
            return result_dict

        all_bboxes = [block["bbox"] for block in all_blocks]

        # The lowest bottom edge among all blocks stands in for the page height
        page_height = max(bbox[3] for bbox in all_bboxes)

        # Get the most common bbox lists for headers and footers
        common_header_bboxes = self.get_most_common_bboxes(all_bboxes, page_height, position="top")
        common_footer_bboxes = self.get_most_common_bboxes(all_bboxes, page_height, position="bottom")

        # Detect and mark headers and footers
        for block in all_blocks:
            bbox = block["bbox"]
            block["is_header"] = int(compare_bbox_with_list(bbox, common_header_bboxes))
            block["is_footer"] = int(compare_bbox_with_list(bbox, common_footer_bboxes))

        return result_dict
class NonHorizontalTextProcessor:
    """Flags slanted repeated blocks as watermarks and vertical repeated blocks as margin notes."""

    def __init__(self) -> None:
        pass

    def detect_non_horizontal_texts(self, result_dict):
        """
        Detect watermarks and vertical margin notes in the document.

        A block is a candidate when it carries a "dir" vector whose absolute angle
        is strictly between 5 and 85 degrees (watermark) or strictly between 85 and
        105 degrees (vertical margin note). Candidates are grouped by identical
        (bbox, text); a group is confirmed when it occurs on more than half of the
        pages (count > number_of_pages // 2).

        Every block under a "page_*" / "block_*" key receives "is_watermark" and
        "is_vertical_margin_note" flags (1 or 0).

        Parameters
        ----------
        result_dict : dict
            The result dictionary; blocks need "bbox" and "text" entries.

        Returns
        -------
        result_dict : dict
            The same dictionary, updated in place.
        """

        def signature_of(block_data):
            # bbox is converted to a tuple so the signature is hashable even when
            # the bbox is stored as a list.
            return (tuple(block_data["bbox"]), block_data["text"])

        # Occurrence counts of candidate signatures
        potential_watermarks = {}
        potential_margin_notes = {}
        num_pages = 0

        for page_id, page_content in result_dict.items():
            if not page_id.startswith("page_"):
                continue
            num_pages += 1

            for block_id, block_data in page_content.items():
                if not block_id.startswith("block_") or "dir" not in block_data:
                    continue

                signature = signature_of(block_data)
                direction = block_data["dir"]
                angle = abs(math.degrees(math.atan2(direction[1], direction[0])))

                if 5 < angle < 85:  # slanted text: watermark candidate
                    potential_watermarks[signature] = potential_watermarks.get(signature, 0) + 1

                if 85 < angle < 105:  # vertical text: margin note candidate
                    potential_margin_notes[signature] = potential_margin_notes.get(signature, 0) + 1

        # Only real pages count towards the threshold; other top-level entries
        # (anything not keyed "page_*") must not raise it.
        occurrence_threshold = num_pages // 2
        watermarks = {sig for sig, count in potential_watermarks.items() if count > occurrence_threshold}
        margin_notes = {sig for sig, count in potential_margin_notes.items() if count > occurrence_threshold}

        # Add the flags to the result dictionary
        for page_id, blocks in result_dict.items():
            if not page_id.startswith("page_"):
                continue

            for block_id, block_data in blocks.items():
                # Same filter as the counting pass, so non-block page entries are left alone.
                if not block_id.startswith("block_"):
                    continue

                signature = signature_of(block_data)
                block_data["is_watermark"] = 1 if signature in watermarks else 0
                block_data["is_vertical_margin_note"] = 1 if signature in margin_notes else 0

        return result_dict
class NoiseRemover:
    """Builds a copy of the result dictionary without the blocks flagged as noise."""

    def __init__(self) -> None:
        pass

    def skip_data_noises(self, result_dict):
        """
        Drop noisy blocks: overlap blocks, headers, footers, watermarks,
        vertical margin notes and block titles.

        Only "page_*" entries and, inside them, "block_*" entries are considered;
        a page left without any block is omitted from the output.

        Parameters
        ----------
        result_dict : dict
            The result dictionary.

        Returns
        -------
        filtered_result_dict : dict
            New dictionary mapping page ids to their remaining blocks.
        """
        noise_flags = (
            "is_overlap",
            "is_header",
            "is_footer",
            "is_watermark",
            "is_vertical_margin_note",
            "is_block_title",
        )

        def is_noise(block):
            # A block is noise as soon as one of the flags is present and truthy.
            return any(block.get(flag, 0) for flag in noise_flags)

        cleaned_pages = {}
        for page_id, blocks in result_dict.items():
            if not page_id.startswith("page_"):
                continue

            kept_blocks = {
                block_id: block
                for block_id, block in blocks.items()
                if block_id.startswith("block_") and not is_noise(block)
            }
            if kept_blocks:
                cleaned_pages[page_id] = kept_blocks

        return cleaned_pages
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment