Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
wangsen
MinerU
Commits
d8823885
Unverified
Commit
d8823885
authored
Nov 26, 2024
by
Xiaomeng Zhao
Committed by
GitHub
Nov 26, 2024
Browse files
Merge pull request #1100 from myhloli/dev
refactor(pre_proc): remove unused functions and simplify code
parents
e6da37dd
21fa7819
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
5 additions
and
1077 deletions
+5
-1077
magic_pdf/pdf_parse_union_core.py
magic_pdf/pdf_parse_union_core.py
+0
-345
magic_pdf/pdf_parse_union_core_v2.py
magic_pdf/pdf_parse_union_core_v2.py
+1
-2
magic_pdf/pre_proc/construct_page_dict.py
magic_pdf/pre_proc/construct_page_dict.py
+0
-55
magic_pdf/pre_proc/cut_image.py
magic_pdf/pre_proc/cut_image.py
+0
-37
magic_pdf/pre_proc/ocr_detect_all_bboxes.py
magic_pdf/pre_proc/ocr_detect_all_bboxes.py
+0
-173
magic_pdf/pre_proc/ocr_dict_merge.py
magic_pdf/pre_proc/ocr_dict_merge.py
+2
-213
magic_pdf/pre_proc/ocr_span_list_modify.py
magic_pdf/pre_proc/ocr_span_list_modify.py
+2
-252
No files found.
magic_pdf/pdf_parse_union_core.py
deleted
100644 → 0
View file @
e6da37dd
import
time
from
loguru
import
logger
from
magic_pdf.config.drop_reason
import
DropReason
from
magic_pdf.config.ocr_content_type
import
ContentType
from
magic_pdf.layout.layout_sort
import
(
LAYOUT_UNPROC
,
get_bboxes_layout
,
get_columns_cnt_of_layout
)
from
magic_pdf.libs.commons
import
fitz
,
get_delta_time
from
magic_pdf.libs.convert_utils
import
dict_to_list
from
magic_pdf.libs.hash_utils
import
compute_md5
from
magic_pdf.libs.local_math
import
float_equal
from
magic_pdf.model.magic_model
import
MagicModel
from
magic_pdf.para.para_split_v2
import
para_split
from
magic_pdf.pre_proc.citationmarker_remove
import
remove_citation_marker
from
magic_pdf.pre_proc.construct_page_dict
import
\
ocr_construct_page_component_v2
from
magic_pdf.pre_proc.cut_image
import
ocr_cut_image_and_table
from
magic_pdf.pre_proc.equations_replace
import
(
combine_chars_to_pymudict
,
remove_chars_in_text_blocks
,
replace_equations_in_textblock
)
from
magic_pdf.pre_proc.ocr_detect_all_bboxes
import
\
ocr_prepare_bboxes_for_layout_split
from
magic_pdf.pre_proc.ocr_dict_merge
import
(
fill_spans_in_blocks
,
fix_block_spans
,
fix_discarded_block
,
sort_blocks_by_layout
)
from
magic_pdf.pre_proc.ocr_span_list_modify
import
(
get_qa_need_list_v2
,
remove_overlaps_low_confidence_spans
,
remove_overlaps_min_spans
)
from
magic_pdf.pre_proc.resolve_bbox_conflict
import
\
check_useful_block_horizontal_overlap
def remove_horizontal_overlap_block_which_smaller(all_bboxes):
    """Drop the smaller of two horizontally overlapping blocks, if any.

    Each entry's first four items are passed as its bbox to
    ``check_useful_block_horizontal_overlap``. When an overlap is reported,
    a warning is logged and every entry whose bbox equals the reported
    smaller bbox is removed from ``all_bboxes`` in place.

    Args:
        all_bboxes: list of block entries; only ``entry[:4]`` is inspected.

    Returns:
        A ``(overlap_found, all_bboxes)`` tuple; the list is the same object
        that was passed in.
    """
    candidates = [{'bbox': entry[:4]} for entry in all_bboxes]
    overlap_found, smaller_bbox, bigger_bbox = check_useful_block_horizontal_overlap(candidates)

    if not overlap_found:
        return overlap_found, all_bboxes

    logger.warning(
        f'skip this page, reason: {DropReason.USEFUL_BLOCK_HOR_OVERLAP}, smaller bbox is {smaller_bbox}, bigger bbox is {bigger_bbox}'
    )
    # Walk a snapshot so removing from the live list is safe.
    for entry in list(all_bboxes):
        if smaller_bbox == entry[:4]:
            all_bboxes.remove(entry)

    return overlap_found, all_bboxes
def __replace_STX_ETX(text_str: str):
    """Replace the control characters U+0002 and U+0003 with an apostrophe.

    These characters show up garbled when text is extracted with pymupdf;
    they were originally quotation marks. So far this has only been observed
    in English text, not in Chinese text.

    Args:
        text_str (str): raw text.

    Returns:
        The text with both characters replaced by ``'``; a falsy input
        (empty string, ``None``) is returned unchanged.
    """
    if not text_str:
        return text_str
    # One translation pass instead of two chained replace() calls.
    return text_str.translate({0x02: "'", 0x03: "'"})
def txt_spans_extract(pdf_page, inline_equations, interline_equations):
    """Extract text spans from a PDF page via its text layer.

    The page's ``dict`` and ``rawdict`` text blocks are combined, equations
    are substituted in, citation markers and leftover chars are removed, and
    the remaining spans are flattened into a list.

    Args:
        pdf_page: page object exposing ``get_text``.
        inline_equations: inline equations passed to
            ``replace_equations_in_textblock``.
        interline_equations: interline equations passed to the same helper.

    Returns:
        A list of dicts with keys ``bbox``, ``content``, ``type`` and
        ``score``. Spans with a zero-width or zero-height bbox, and spans
        already typed as an equation, are left out.
    """
    dict_blocks = pdf_page.get_text('dict', flags=fitz.TEXTFLAGS_TEXT)['blocks']
    rawdict_blocks = pdf_page.get_text('rawdict', flags=fitz.TEXTFLAGS_TEXT)['blocks']

    blocks = combine_chars_to_pymudict(dict_blocks, rawdict_blocks)
    blocks = replace_equations_in_textblock(blocks, inline_equations, interline_equations)
    blocks = remove_citation_marker(blocks)
    blocks = remove_chars_in_text_blocks(blocks)

    equation_types = (
        ContentType.InlineEquation,
        ContentType.InterlineEquation,
    )
    collected = []
    for text_block in blocks:
        for text_line in text_block['lines']:
            for span in text_line['spans']:
                x0, y0, x1, y1 = (span['bbox'][i] for i in range(4))
                # Degenerate boxes (no width or no height) carry nothing useful.
                if float_equal(x0, x1) or float_equal(y0, y1):
                    continue
                if span.get('type') in equation_types:
                    continue
                collected.append(
                    {
                        'bbox': list(span['bbox']),
                        'content': __replace_STX_ETX(span['text']),
                        'type': ContentType.Text,
                        'score': 1.0,
                    }
                )
    return collected
def replace_text_span(pymu_spans, ocr_spans):
    """Swap the text spans in ``ocr_spans`` for ``pymu_spans``.

    Returns:
        A new list holding every OCR span whose type is not
        ``ContentType.Text``, followed by all of ``pymu_spans``.
    """
    non_text_spans = [span for span in ocr_spans if span['type'] != ContentType.Text]
    return non_text_spans + pymu_spans
def parse_page_core(pdf_docs, magic_model, page_id, pdf_bytes_md5, imageWriter, parse_mode):
    """Parse one page into the page-info dict built by
    ``ocr_construct_page_component_v2``.

    Args:
        pdf_docs: indexable document; ``pdf_docs[page_id]`` is the page.
        magic_model: model wrapper queried for blocks, equations, page size
            and spans of ``page_id``.
        page_id: index of the page to parse.
        pdf_bytes_md5: md5 of the PDF bytes, forwarded to the image cutter.
        imageWriter: writer forwarded to ``ocr_cut_image_and_table``.
        parse_mode: ``'txt'`` (text spans come from the PDF text layer) or
            ``'ocr'`` (model spans are used as-is).

    Returns:
        The page-info object returned by ``ocr_construct_page_component_v2``.

    Raises:
        ValueError: if ``parse_mode`` is neither ``'txt'`` nor ``'ocr'``.
            ``ValueError`` subclasses ``Exception``, so callers that caught
            the previous generic ``Exception`` keep working.
    """
    need_drop = False
    drop_reason = []

    # Fetch the block information used later on from the magic_model object.
    img_blocks = magic_model.get_imgs(page_id)
    table_blocks = magic_model.get_tables(page_id)
    discarded_blocks = magic_model.get_discarded(page_id)
    text_blocks = magic_model.get_text_blocks(page_id)
    title_blocks = magic_model.get_title_blocks(page_id)
    # The third element (interline_equation_blocks) is not accurate enough;
    # interline_equations is used for layout instead, so it is discarded.
    inline_equations, interline_equations, _ = magic_model.get_equations(page_id)
    page_w, page_h = magic_model.get_page_size(page_id)
    spans = magic_model.get_all_spans(page_id)

    # Build the spans according to parse_mode.
    if parse_mode == 'txt':
        # Replace the text-type OCR spans with spans from the PDF text layer.
        pymu_spans = txt_spans_extract(pdf_docs[page_id], inline_equations, interline_equations)
        spans = replace_text_span(pymu_spans, spans)
    elif parse_mode != 'ocr':
        raise ValueError('parse_mode must be txt or ocr')

    # Among overlapping spans, drop the ones with lower confidence.
    spans, _ = remove_overlaps_low_confidence_spans(spans)
    # Among overlapping spans, drop the smaller ones.
    spans, _ = remove_overlaps_min_spans(spans)
    # Crop images and tables.
    spans = ocr_cut_image_and_table(spans, pdf_docs[page_id], page_id, pdf_bytes_md5, imageWriter)

    # Gather the bboxes of all blocks. The original code had a second,
    # unreachable branch that passed interline_equation_blocks; it was
    # disabled by resetting that list to [] and has been removed.
    all_bboxes, all_discarded_blocks, drop_reasons = ocr_prepare_bboxes_for_layout_split(
        img_blocks,
        table_blocks,
        discarded_blocks,
        text_blocks,
        title_blocks,
        interline_equations,
        page_w,
        page_h,
    )

    if len(drop_reasons) > 0:
        need_drop = True
        drop_reason.append(DropReason.OVERLAP_BLOCKS_CAN_NOT_SEPARATION)

    # Handle the discarded blocks first; they need no layout.
    discarded_block_with_spans, spans = fill_spans_in_blocks(all_discarded_blocks, spans, 0.4)
    fix_discarded_blocks = fix_discarded_block(discarded_block_with_spans)

    # Skip the page if it has no bbox at all.
    if len(all_bboxes) == 0:
        logger.warning(f'skip this page, not found useful bbox, page_id: {page_id}')
        return ocr_construct_page_component_v2(
            [],
            [],
            page_id,
            page_w,
            page_h,
            [],
            [],
            [],
            interline_equations,
            fix_discarded_blocks,
            need_drop,
            drop_reason,
        )

    # Before splitting, look for bboxes that overlap left/right. Such
    # overlaps are most likely caused by interline equations or tables that
    # were not recognised correctly. Keep removing the smaller bbox until no
    # overlap is left; each round records the drop reason once more.
    while True:
        is_useful_block_horz_overlap, all_bboxes = remove_horizontal_overlap_block_which_smaller(
            all_bboxes
        )
        if not is_useful_block_horz_overlap:
            break
        need_drop = True
        drop_reason.append(DropReason.USEFUL_BLOCK_HOR_OVERLAP)

    # Compute the layout from the block information.
    page_boundry = [0, 0, page_w, page_h]
    layout_bboxes, layout_tree = get_bboxes_layout(all_bboxes, page_boundry, page_id)

    if len(text_blocks) > 0 and len(all_bboxes) > 0 and len(layout_bboxes) == 0:
        logger.warning(
            f'skip this page, page_id: {page_id}, reason: {DropReason.CAN_NOT_DETECT_PAGE_LAYOUT}'
        )
        need_drop = True
        drop_reason.append(DropReason.CAN_NOT_DETECT_PAGE_LAYOUT)

    # Flag complicated layouts and layouts with more than two columns.
    if any(lay['layout_label'] == LAYOUT_UNPROC for lay in layout_bboxes):
        # Complicated layout.
        logger.warning(
            f'skip this page, page_id: {page_id}, reason: {DropReason.COMPLICATED_LAYOUT}'
        )
        need_drop = True
        drop_reason.append(DropReason.COMPLICATED_LAYOUT)

    layout_column_width = get_columns_cnt_of_layout(layout_tree)
    if layout_column_width > 2:
        # More than two columns.
        logger.warning(
            f'skip this page, page_id: {page_id}, reason: {DropReason.TOO_MANY_LAYOUT_COLUMNS}'
        )
        need_drop = True
        drop_reason.append(DropReason.TOO_MANY_LAYOUT_COLUMNS)

    # Sort the remaining blocks by layout order.
    sorted_blocks = sort_blocks_by_layout(all_bboxes, layout_bboxes)
    # Fill the spans into the sorted blocks.
    block_with_spans, spans = fill_spans_in_blocks(sorted_blocks, spans, 0.3)
    # Fix up the blocks.
    fix_blocks = fix_block_spans(block_with_spans, img_blocks, table_blocks)
    # Collect the lists that QA needs as separate outputs.
    images, tables, interline_equations = get_qa_need_list_v2(fix_blocks)

    # Build the page info.
    return ocr_construct_page_component_v2(
        fix_blocks,
        layout_bboxes,
        page_id,
        page_w,
        page_h,
        layout_tree,
        images,
        tables,
        interline_equations,
        fix_discarded_blocks,
        need_drop,
        drop_reason,
    )
def pdf_parse_union(
    pdf_bytes,
    model_list,
    imageWriter,
    parse_mode,
    start_page_id=0,
    end_page_id=None,
    debug_mode=False,
):
    """Parse a PDF page by page and return ``{'pdf_info': [...]}``.

    Pages with an index in ``[start_page_id, end_page_id]`` are parsed with
    ``parse_page_core``; every other page gets an empty, dropped page
    component. The per-page results then go through ``para_split`` and are
    converted to a list with ``dict_to_list``.

    Args:
        pdf_bytes: raw PDF content.
        model_list: model output used to build the ``MagicModel``.
        imageWriter: writer forwarded to ``parse_page_core``.
        parse_mode: forwarded to ``parse_page_core``.
        start_page_id: first page index to parse.
        end_page_id: last page index to parse; ``None`` or a negative value
            means the last page, and a value past the end is clamped to the
            last page with a warning.
        debug_mode: when true, log per-page timing and pass the flag on to
            ``para_split``.
    """
    pdf_bytes_md5 = compute_md5(pdf_bytes)
    pdf_docs = fitz.open('pdf', pdf_bytes)

    # Per-page results, keyed 'page_<index>'.
    pdf_info_dict = {}

    # The model wrapper is built from the model output and the document.
    magic_model = MagicModel(model_list, pdf_docs)

    # Work out the last page index to parse.
    if end_page_id is None or end_page_id < 0:
        end_page_id = len(pdf_docs) - 1
    if end_page_id > len(pdf_docs) - 1:
        logger.warning('end_page_id is out of range, use pdf_docs length')
        end_page_id = len(pdf_docs) - 1

    start_time = time.time()

    for page_id, page in enumerate(pdf_docs):
        # In debug mode, log how long the previous page took.
        if debug_mode:
            time_now = time.time()
            logger.info(
                f'page_id: {page_id}, last_page_cost_time: {get_delta_time(start_time)}'
            )
            start_time = time_now

        if not (start_page_id <= page_id <= end_page_id):
            # Outside the requested range: emit an empty, dropped page.
            page_w = page.rect.width
            page_h = page.rect.height
            page_info = ocr_construct_page_component_v2(
                [], [], page_id, page_w, page_h, [], [], [], [], [], True, 'skip page'
            )
        else:
            page_info = parse_page_core(
                pdf_docs, magic_model, page_id, pdf_bytes_md5, imageWriter, parse_mode
            )
        pdf_info_dict[f'page_{page_id}'] = page_info

    # Paragraph splitting.
    para_split(pdf_info_dict, debug_mode=debug_mode)

    # Convert the dict to a list and wrap it.
    return {
        'pdf_info': dict_to_list(pdf_info_dict),
    }
# Script entry point: running this module directly does nothing.
if __name__ == '__main__':
    pass
magic_pdf/pdf_parse_union_core_v2.py
View file @
d8823885
...
...
@@ -7,7 +7,6 @@ from typing import List
import
torch
from
loguru
import
logger
from
magic_pdf.config.drop_reason
import
DropReason
from
magic_pdf.config.enums
import
SupportedPdfParseMethod
from
magic_pdf.config.ocr_content_type
import
BlockType
,
ContentType
from
magic_pdf.data.dataset
import
Dataset
,
PageableData
...
...
@@ -17,7 +16,7 @@ from magic_pdf.libs.commons import fitz, get_delta_time
from
magic_pdf.libs.config_reader
import
get_local_layoutreader_model_dir
from
magic_pdf.libs.convert_utils
import
dict_to_list
from
magic_pdf.libs.hash_utils
import
compute_md5
from
magic_pdf.libs.local_math
import
float_equal
from
magic_pdf.libs.pdf_image_tools
import
cut_image_to_pil_image
from
magic_pdf.model.magic_model
import
MagicModel
...
...
magic_pdf/pre_proc/construct_page_dict.py
View file @
d8823885
def construct_page_component(page_id, image_info, table_info, text_blocks_preproc, layout_bboxes, inline_eq_info,
                             interline_eq_info, raw_pymu_blocks,
                             removed_text_blocks, removed_image_blocks, images_backup, droped_table_block, table_backup,
                             layout_tree,
                             page_w, page_h, footnote_bboxes_tmp):
    """Assemble the per-page result dict from the pre-processing outputs.

    ``droped_table_block`` and ``table_backup`` are accepted but not stored:
    the ``'droped_table_block'`` and ``'table_backup'`` entries are always
    empty lists. ``'para_blocks'`` and ``'global_statistic'`` start out as
    empty dicts.

    Returns:
        dict: the page component.
    """
    return {
        'para_blocks': {},
        'preproc_blocks': text_blocks_preproc,
        'images': image_info,
        'tables': table_info,
        'interline_equations': interline_eq_info,
        'inline_equations': inline_eq_info,
        'layout_bboxes': layout_bboxes,
        'pymu_raw_blocks': raw_pymu_blocks,
        'global_statistic': {},
        'droped_text_block': removed_text_blocks,
        'droped_image_block': removed_image_blocks,
        'droped_table_block': [],
        'image_backup': images_backup,
        'table_backup': [],
        'page_idx': page_id,
        'page_size': [page_w, page_h],
        '_layout_tree': layout_tree,  # helps with layout analysis
        'footnote_bboxes_tmp': footnote_bboxes_tmp,
    }
def ocr_construct_page_component(blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
                                 images, tables, interline_equations, inline_equations,
                                 dropped_text_block, dropped_image_block, dropped_table_block, dropped_equation_block,
                                 need_remove_spans_bboxes_dict):
    """Assemble the per-page result dict for the OCR pipeline.

    Each argument is stored under its output key without copying; note that
    several keys use the legacy ``droped_`` spelling.

    Returns:
        dict: the page component.
    """
    page_component = {}
    page_component['preproc_blocks'] = blocks
    page_component['layout_bboxes'] = layout_bboxes
    page_component['page_idx'] = page_id
    page_component['page_size'] = [page_w, page_h]
    page_component['_layout_tree'] = layout_tree
    page_component['images'] = images
    page_component['tables'] = tables
    page_component['interline_equations'] = interline_equations
    page_component['inline_equations'] = inline_equations
    page_component['droped_text_block'] = dropped_text_block
    page_component['droped_image_block'] = dropped_image_block
    page_component['droped_table_block'] = dropped_table_block
    page_component['dropped_equation_block'] = dropped_equation_block
    page_component['droped_bboxes'] = need_remove_spans_bboxes_dict
    return page_component
def
ocr_construct_page_component_v2
(
blocks
,
layout_bboxes
,
page_id
,
page_w
,
page_h
,
layout_tree
,
images
,
tables
,
interline_equations
,
discarded_blocks
,
need_drop
,
drop_reason
):
...
...
magic_pdf/pre_proc/cut_image.py
View file @
d8823885
...
...
@@ -25,43 +25,6 @@ def ocr_cut_image_and_table(spans, page, page_id, pdf_bytes_md5, imageWriter):
return
spans
def txt_save_images_by_bboxes(page_num: int, page, pdf_bytes_md5: str,
                              image_bboxes: list, images_overlap_backup: list, table_bboxes: list,
                              equation_inline_bboxes: list,
                              equation_interline_bboxes: list, imageWriter) -> tuple:
    """Crop the image and table regions of a page and record where they went.

    Every bbox that passes ``check_img_bbox`` is cropped with ``cut_image``
    under ``join_path(pdf_bytes_md5, <category>)``, where the category is
    ``'images'`` or ``'tables'``.

    Args:
        page_num: page number forwarded to ``cut_image``.
        page: page object forwarded to ``cut_image``.
        pdf_bytes_md5: md5 of the PDF, used as the first path component.
        image_bboxes: image bboxes to crop.
        images_overlap_backup: backup image bboxes to crop (also stored under
            ``'images'``).
        table_bboxes: table bboxes to crop.
        equation_inline_bboxes: accepted for interface compatibility; unused.
        equation_interline_bboxes: accepted for interface compatibility;
            unused.
        imageWriter: writer forwarded to ``cut_image``.

    Returns:
        A 5-tuple ``(image_info, image_backup_info, table_info,
        inline_eq_info, interline_eq_info)``. The first three are lists of
        ``{'bbox': ..., 'image_path': ...}`` dicts; the last two are always
        empty lists. (The previous ``-> dict`` annotation and docstring did
        not match this.)
    """

    def _cut_valid_bboxes(bboxes, category):
        # Crop every valid bbox into the given category directory.
        cropped = []
        for bbox in bboxes:
            if not check_img_bbox(bbox):
                continue
            image_path = cut_image(
                bbox, page_num, page, join_path(pdf_bytes_md5, category), imageWriter
            )
            cropped.append({'bbox': bbox, 'image_path': image_path})
        return cropped

    image_info = _cut_valid_bboxes(image_bboxes, 'images')
    image_backup_info = _cut_valid_bboxes(images_overlap_backup, 'images')
    table_info = _cut_valid_bboxes(table_bboxes, 'tables')

    # Equations are not cropped here; the slots are kept so the shape of the
    # return value stays the same for callers.
    inline_eq_info = []
    interline_eq_info = []

    return image_info, image_backup_info, table_info, inline_eq_info, interline_eq_info
def
check_img_bbox
(
bbox
)
->
bool
:
if
any
([
bbox
[
0
]
>=
bbox
[
2
],
bbox
[
1
]
>=
bbox
[
3
]]):
logger
.
warning
(
f
'image_bboxes: 错误的box,
{
bbox
}
'
)
...
...
magic_pdf/pre_proc/ocr_detect_all_bboxes.py
View file @
d8823885
...
...
@@ -8,179 +8,6 @@ from magic_pdf.pre_proc.remove_bbox_overlap import \
remove_overlap_between_bbox_for_block
def ocr_prepare_bboxes_for_layout_split(
    img_blocks,
    table_blocks,
    discarded_blocks,
    text_blocks,
    title_blocks,
    interline_equation_blocks,
    page_w,
    page_h,
):
    """Collect all block bboxes of a page and resolve their overlaps so the
    result can be fed to layout splitting.

    Every block becomes a 13-item row
    ``[x0, y0, x1, y1, None, None, None, block_type, None, None, None, None, score]``.

    Returns:
        ``(all_bboxes, all_discarded_blocks, drop_reasons)`` where
        ``drop_reasons`` is the second value returned by
        ``remove_overlap_between_bbox_for_block``.
    """

    def _row(block, block_type):
        x0, y0, x1, y1 = block['bbox']
        return [
            x0, y0, x1, y1,
            None, None, None,
            block_type,
            None, None, None, None,
            block['score'],
        ]

    all_bboxes = []
    all_discarded_blocks = []

    # Same order as before: images, tables, texts, titles, interline equations.
    typed_sources = (
        (img_blocks, BlockType.Image),
        (table_blocks, BlockType.Table),
        (text_blocks, BlockType.Text),
        (title_blocks, BlockType.Title),
        (interline_equation_blocks, BlockType.InterlineEquation),
    )
    for source_blocks, block_type in typed_sources:
        for source_block in source_blocks:
            all_bboxes.append(_row(source_block, block_type))

    # Resolve nested blocks.
    # Text box overlapping a title box: trust the text box.
    all_bboxes = fix_text_overlap_title_blocks(all_bboxes)
    # Any box overlapping a discarded box: trust the discarded box.
    all_bboxes = remove_need_drop_blocks(all_bboxes, discarded_blocks)

    # An interline equation can conflict with a title/text box in two ways.
    # When its IoU with a text-type box is close to 1, trust the equation box.
    all_bboxes = fix_interline_equation_overlap_text_blocks_with_hi_iou(all_bboxes)
    # When it lies inside a much larger text-type box, the text box wins; the
    # equation box is removed by the big-box-contains-small-box pass below.

    # Of the discarded blocks, only those wider than 1/3 of the page, taller
    # than 10 and starting in the lower half of the page are also added to
    # all_bboxes (as footnotes).
    for discarded in discarded_blocks:
        discarded_row = _row(discarded, BlockType.Discarded)
        all_discarded_blocks.append(discarded_row)
        x0, y0, x1, y1 = discarded_row[:4]
        # Add the footnote to all_bboxes so it takes part in layout.
        if (x1 - x0) > (page_w / 3) and (y1 - y0) > 10 and y0 > (page_h / 2):
            all_bboxes.append(_row(discarded, BlockType.Footnote))

    # If a big box still contains a small one after all this, drop the small one.
    all_bboxes = remove_overlaps_min_blocks(all_bboxes)
    all_discarded_blocks = remove_overlaps_min_blocks(all_discarded_blocks)

    # Separate the remaining bboxes so layout splitting does not fail later.
    all_bboxes, drop_reasons = remove_overlap_between_bbox_for_block(all_bboxes)

    return all_bboxes, all_discarded_blocks, drop_reasons
def
add_bboxes
(
blocks
,
block_type
,
bboxes
):
for
block
in
blocks
:
x0
,
y0
,
x1
,
y1
=
block
[
'bbox'
]
...
...
magic_pdf/pre_proc/ocr_dict_merge.py
View file @
d8823885
from
magic_pdf.config.drop_tag
import
DropTag
from
magic_pdf.config.ocr_content_type
import
BlockType
,
ContentType
from
magic_pdf.libs.boxbase
import
(
__is_overlaps_y_exceeds_threshold
,
_is_in_or_part_overlap_with_area_ratio
,
calculate_overlap_area_in_bbox1_area_ratio
)
from
magic_pdf.libs.boxbase
import
__is_overlaps_y_exceeds_threshold
,
calculate_overlap_area_in_bbox1_area_ratio
# 将每一个line中的span从左到右排序
...
...
@@ -63,86 +61,6 @@ def merge_spans_to_line(spans, threshold=0.6):
return
lines
def merge_spans_to_line_by_layout(spans, layout_bboxes):
    """Group spans by layout box, merge each group into lines, and tag the
    spans that fall outside every layout.

    A span belongs to a layout when more than 0.6 of its area overlaps the
    layout bbox. Assigned spans are removed from ``spans`` in place; whatever
    is left is tagged ``DropTag.NOT_IN_LAYOUT``.

    Returns:
        ``(lines, dropped_spans)``.
    """
    spans_per_layout = []
    for layout in layout_bboxes:
        layout_bbox = layout['layout_bbox']
        # Put each span that sits in this layout into its group.
        claimed = [
            span
            for span in spans
            if calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], layout_bbox) > 0.6
        ]
        if not claimed:
            continue
        spans_per_layout.append(claimed)
        # A claimed span must not be considered for later layouts.
        for span in claimed:
            spans.remove(span)

    lines = []
    for claimed in spans_per_layout:
        lines.extend(merge_spans_to_line(claimed))

    # Sort the spans within each line.
    lines = line_sort_spans_by_left_to_right(lines)

    dropped_spans = []
    for leftover in spans:
        leftover['tag'] = DropTag.NOT_IN_LAYOUT
        dropped_spans.append(leftover)

    return lines, dropped_spans
def merge_lines_to_block(lines):
    """Wrap every line in a block of its own.

    No block merging is done yet: each block holds exactly one line and
    reuses that line's bbox.
    """
    return [{'bbox': line['bbox'], 'lines': [line]} for line in lines]
def sort_blocks_by_layout(all_bboxes, layout_bboxes):
    """Order the blocks by layout, and top to bottom within each layout.

    A block belongs to a layout when more than 0.8 of its area overlaps the
    layout bbox. Footnote blocks are skipped. Assigned blocks are removed
    from ``all_bboxes`` in place.

    Returns:
        The assigned blocks as one flat, ordered list.
    """
    ordered_blocks = []
    for layout in layout_bboxes:
        layout_bbox = layout['layout_bbox']
        members = []
        for block in all_bboxes:
            # Footnotes are not sorted into any layout.
            if block[7] == BlockType.Footnote:
                continue
            if calculate_overlap_area_in_bbox1_area_ratio(block[:4], layout_bbox) > 0.8:
                members.append(block)

        if not members:
            continue
        # An assigned block must not be matched by a later layout.
        for block in members:
            all_bboxes.remove(block)
        # Within one layout, sort top to bottom by y0 (stable sort).
        ordered_blocks.extend(sorted(members, key=lambda entry: entry[1]))

    # ordered_blocks now holds every block kept on this page, in order.
    return ordered_blocks
def
fill_spans_in_blocks
(
blocks
,
spans
,
radio
):
"""将allspans中的span按位置关系,放入blocks中."""
block_with_spans
=
[]
...
...
@@ -184,28 +102,6 @@ def fill_spans_in_blocks(blocks, spans, radio):
return
block_with_spans
,
spans
def fix_block_spans(block_with_spans, img_blocks, table_blocks):
    """Turn span-filled blocks into their final nested form.

    1. Image and table blocks contain captions and footnotes, so they are
       nested: the caption/footnote text spans are moved into caption and
       footnote sub-blocks of the matching image or table block.
    2. The ``spans`` field of each block is removed along the way.

    Blocks of any other type than image, table, text, title or interline
    equation are left out of the result.
    """
    fixed_blocks = []
    for block in block_with_spans:
        kind = block['type']
        if kind == BlockType.Image:
            fixed = fix_image_block(block, img_blocks)
        elif kind == BlockType.Table:
            fixed = fix_table_block(block, table_blocks)
        elif kind in [BlockType.Text, BlockType.Title]:
            fixed = fix_text_block(block)
        elif kind == BlockType.InterlineEquation:
            fixed = fix_interline_block(block)
        else:
            # Unknown block types are not kept.
            continue
        fixed_blocks.append(fixed)
    return fixed_blocks
def
fix_block_spans_v2
(
block_with_spans
):
"""1、img_block和table_block因为包含caption和footnote的关系,存在block的嵌套关系
需要将caption和footnote的text_span放入相应img_block和table_block内的
...
...
@@ -235,113 +131,6 @@ def fix_discarded_block(discarded_block_with_spans):
return
fix_discarded_blocks
def merge_spans_to_block(spans: list, block_bbox: list, block_type: str):
    """Build a block from the spans that lie inside ``block_bbox``.

    A span is taken when more than 0.6 of its area overlaps ``block_bbox``.
    The taken spans are merged into lines and each line is sorted left to
    right.

    Returns:
        ``(block, block_spans)``: the new block dict and the spans it used.
        ``spans`` itself is not modified.
    """
    block_spans = [
        span
        for span in spans
        if calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], block_bbox) > 0.6
    ]
    # Merge into lines, then sort the spans within each line.
    sorted_lines = line_sort_spans_by_left_to_right(merge_spans_to_line(block_spans))
    return {'bbox': block_bbox, 'type': block_type, 'lines': sorted_lines}, block_spans
def make_body_block(span: dict, block_bbox: list, block_type: str):
    """Create a body block holding one line that holds the single ``span``.

    The block and its line share the same ``block_bbox`` object.
    """
    only_line = {'bbox': block_bbox, 'spans': [span]}
    return {'bbox': block_bbox, 'type': block_type, 'lines': [only_line]}
def fix_image_block(block, img_blocks):
    """Split an image block into body, caption and footnote sub-blocks.

    Only the first entry of ``img_blocks`` that matches ``block`` (overlap
    check with ratio 0.95) is used. ``block['blocks']`` is filled and
    ``block['spans']`` is deleted; ``block`` is modified in place and
    returned.
    """
    block['blocks'] = []
    # Look for the img_block that matches the current block.
    for img_block in img_blocks:
        if not _is_in_or_part_overlap_with_area_ratio(block['bbox'], img_block['bbox'], 0.95):
            continue

        # Image body: the image span whose bbox equals the body bbox.
        for span in block['spans']:
            if span['type'] == ContentType.Image and img_block['img_body_bbox'] == span['bbox']:
                block['blocks'].append(
                    make_body_block(span, img_block['img_body_bbox'], BlockType.ImageBody)
                )
                # The span now lives in the body block; leaving the loop
                # right away makes removing it while iterating safe.
                block['spans'].remove(span)
                break

        # Caption, if the image has one.
        if img_block['img_caption_bbox'] is not None:
            caption_block, _ = merge_spans_to_block(
                block['spans'], img_block['img_caption_bbox'], BlockType.ImageCaption
            )
            block['blocks'].append(caption_block)

        # Footnote, if the image has one.
        if img_block['img_footnote_bbox'] is not None:
            footnote_block, _ = merge_spans_to_block(
                block['spans'], img_block['img_footnote_bbox'], BlockType.ImageFootnote
            )
            block['blocks'].append(footnote_block)

        # Only the first matching img_block is used.
        break

    del block['spans']
    return block
def fix_table_block(block, table_blocks):
    """Split a table block into body, caption and footnote sub-blocks.

    Only the first entry of ``table_blocks`` that matches ``block`` (overlap
    check with ratio 0.95) is used. ``block['blocks']`` is filled and
    ``block['spans']`` is deleted; ``block`` is modified in place and
    returned.
    """
    block['blocks'] = []
    # Look for the table_block that matches the current block.
    for table_block in table_blocks:
        if not _is_in_or_part_overlap_with_area_ratio(block['bbox'], table_block['bbox'], 0.95):
            continue

        # Table body: the table span whose bbox equals the body bbox.
        for span in block['spans']:
            if span['type'] == ContentType.Table and table_block['table_body_bbox'] == span['bbox']:
                block['blocks'].append(
                    make_body_block(span, table_block['table_body_bbox'], BlockType.TableBody)
                )
                # The span now lives in the body block; leaving the loop
                # right away makes removing it while iterating safe.
                block['spans'].remove(span)
                break

        # Caption, if the table has one.
        if table_block['table_caption_bbox'] is not None:
            caption_block, caption_spans = merge_spans_to_block(
                block['spans'], table_block['table_caption_bbox'], BlockType.TableCaption
            )
            block['blocks'].append(caption_block)
            # Spans used by the caption must not be reused for the footnote.
            for span in caption_spans:
                block['spans'].remove(span)

        # Footnote, if the table has one.
        if table_block['table_footnote_bbox'] is not None:
            footnote_block, _ = merge_spans_to_block(
                block['spans'], table_block['table_footnote_bbox'], BlockType.TableFootnote
            )
            block['blocks'].append(footnote_block)

        # Only the first matching table_block is used.
        break

    del block['spans']
    return block
def
fix_text_block
(
block
):
# 文本block中的公式span都应该转换成行内type
for
span
in
block
[
'spans'
]:
...
...
magic_pdf/pre_proc/ocr_span_list_modify.py
View file @
d8823885
from
magic_pdf.config.drop_tag
import
DropTag
from
magic_pdf.config.ocr_content_type
import
BlockType
,
ContentType
from
magic_pdf.libs.boxbase
import
(
__is_overlaps_y_exceeds_threshold
,
calculate_iou
,
calculate_overlap_area_in_bbox1_area_ratio
,
get_minbox_if_overlap_by_ratio
)
from
magic_pdf.config.ocr_content_type
import
BlockType
from
magic_pdf.libs.boxbase
import
calculate_iou
,
get_minbox_if_overlap_by_ratio
def
remove_overlaps_low_confidence_spans
(
spans
):
...
...
@@ -59,253 +56,6 @@ def remove_overlaps_min_spans(spans):
return
spans
,
dropped_spans
def remove_spans_by_bboxes(spans, need_remove_spans_bboxes):
    """Remove, in place, the spans that lie inside any of the given bboxes.

    A span is removed when more than 0.5 of its area overlaps one of
    ``need_remove_spans_bboxes``; all other spans are kept.

    Returns:
        The same ``spans`` list.
    """
    doomed = []
    for span in spans:
        for removed_bbox in need_remove_spans_bboxes:
            inside = calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], removed_bbox) > 0.5
            if inside and span not in doomed:
                doomed.append(span)
                break

    # Removal happens after the scan so the list is not changed mid-iteration.
    for span in doomed:
        spans.remove(span)

    return spans
def remove_spans_by_bboxes_dict(spans, need_remove_spans_bboxes_dict):
    """Remove spans by tagged bboxes and tag each removed span.

    For every ``drop_tag -> bboxes`` entry, a span is removed when more than
    0.5 of its area overlaps one of the bboxes. For ``DropTag.FOOTNOTE`` a
    span is also removed when its vertical centre lies below the bbox bottom
    and its horizontal centre lies strictly between the bbox's left and
    right edges. Removed spans get ``span['tag'] = drop_tag``.

    Returns:
        ``(spans, dropped_spans)``; ``spans`` is the input list, modified in
        place.
    """

    def _hits(span, removed_bbox, drop_tag):
        # True when this bbox is a reason to drop the span.
        if calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], removed_bbox) > 0.5:
            return True
        # For footnotes, also drop spans that sit below the bbox.
        return (
            drop_tag == DropTag.FOOTNOTE
            and (span['bbox'][1] + span['bbox'][3]) / 2 > removed_bbox[3]
            and removed_bbox[0] < (span['bbox'][0] + span['bbox'][2]) / 2 < removed_bbox[2]
        )

    dropped_spans = []
    for drop_tag, removed_bboxes in need_remove_spans_bboxes_dict.items():
        # any() stops at the first bbox that hits, like the original break.
        to_remove = [
            span
            for span in spans
            if any(_hits(span, removed_bbox, drop_tag) for removed_bbox in removed_bboxes)
        ]
        for span in to_remove:
            spans.remove(span)
            span['tag'] = drop_tag
            dropped_spans.append(span)

    return spans, dropped_spans
def
adjust_bbox_for_standalone_block
(
spans
):
# 对tpye=["interline_equation", "image", "table"]进行额外处理,如果左边有字的话,将该span的bbox中y0调整至不高于文字的y0
for
sb_span
in
spans
:
if
sb_span
[
'type'
]
in
[
ContentType
.
InterlineEquation
,
ContentType
.
Image
,
ContentType
.
Table
,
]:
for
text_span
in
spans
:
if
text_span
[
'type'
]
in
[
ContentType
.
Text
,
ContentType
.
InlineEquation
]:
# 判断span2的纵向高度是否被span所覆盖
if
(
sb_span
[
'bbox'
][
1
]
<
text_span
[
'bbox'
][
1
]
and
sb_span
[
'bbox'
][
3
]
>
text_span
[
'bbox'
][
3
]
):
# 判断span2是否在span左边
if
text_span
[
'bbox'
][
0
]
<
sb_span
[
'bbox'
][
0
]:
# 调整span的y0和span2的y0一致
sb_span
[
'bbox'
][
1
]
=
text_span
[
'bbox'
][
1
]
return
spans
def modify_y_axis(spans: list, displayed_list: list, text_inline_lines: list):
    """Group spans into lines by vertical position and unify the y-range of
    the spans within each text line.

    ``spans`` is sorted in place by ``bbox[1]``. Interline-equation, image
    and table spans are appended to ``displayed_list``. Text lines are
    appended to ``text_inline_lines`` as ``(line_spans, (y0, y1))`` tuples;
    each such line is sorted by ``bbox[0]`` and every span in it gets its
    ``bbox[1]`` / ``bbox[3]`` overwritten with the line's ``(y0, y1)``.

    Nothing is returned; the results are delivered through the two output
    lists and the in-place bbox changes. An empty ``spans`` is a no-op.
    """
    # displayed_list = []

    # Nothing to do when spans is empty.
    if len(spans) == 0:
        pass
    else:
        spans.sort(key=lambda span: span['bbox'][1])

        # NOTE(review): `lines` is filled below but never read afterwards.
        lines = []
        current_line = [spans[0]]
        if spans[0]['type'] in [
            ContentType.InterlineEquation,
            ContentType.Image,
            ContentType.Table,
        ]:
            displayed_list.append(spans[0])

        # The y-range that the current line will be normalised to.
        line_first_y0 = spans[0]['bbox'][1]
        line_first_y = spans[0]['bbox'][3]
        # Used later when searching on behalf of interline equations.
        # text_inline_lines = []
        for span in spans[1:]:
            # if span.get("content","") == "78.":
            #     print("debug")
            # If the current span is an interline equation, or the current
            # line already contains one, start a new line.
            # Image and table spans are treated the same way.
            if span['type'] in [
                ContentType.InterlineEquation,
                ContentType.Image,
                ContentType.Table,
            ] or any(
                s['type'] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table]
                for s in current_line
            ):
                # Record the standalone span.
                if span['type'] in [
                    ContentType.InterlineEquation,
                    ContentType.Image,
                    ContentType.Table,
                ]:
                    displayed_list.append(span)
                # Start a new line.
                lines.append(current_line)
                # Only lines with more than one span, or a single text/inline
                # equation span, count as text lines.
                if len(current_line) > 1 or current_line[0]['type'] in [
                    ContentType.Text,
                    ContentType.InlineEquation,
                ]:
                    text_inline_lines.append(
                        (current_line, (line_first_y0, line_first_y))
                    )
                current_line = [span]
                line_first_y0 = span['bbox'][1]
                line_first_y = span['bbox'][3]
                continue

            # If the current span overlaps the last span of the current line
            # on the y axis, add it to the current line.
            if __is_overlaps_y_exceeds_threshold(
                span['bbox'], current_line[-1]['bbox']
            ):
                # A plain text span resets the line's reference y-range.
                if span['type'] == 'text':
                    line_first_y0 = span['bbox'][1]
                    line_first_y = span['bbox'][3]
                current_line.append(span)
            else:
                # Otherwise start a new line.
                lines.append(current_line)
                text_inline_lines.append((current_line, (line_first_y0, line_first_y)))
                current_line = [span]
                line_first_y0 = span['bbox'][1]
                line_first_y = span['bbox'][3]

        # Add the last line.
        if current_line:
            lines.append(current_line)
            if len(current_line) > 1 or current_line[0]['type'] in [
                ContentType.Text,
                ContentType.InlineEquation,
            ]:
                text_inline_lines.append((current_line, (line_first_y0, line_first_y)))
        for line in text_inline_lines:
            # Sort by x0.
            current_line = line[0]
            current_line.sort(key=lambda span: span['bbox'][0])

        # Give every span in a text line the same vertical extent.
        for line in text_inline_lines:
            current_line, (line_first_y0, line_first_y) = line
            for span in current_line:
                span['bbox'][1] = line_first_y0
                span['bbox'][3] = line_first_y

        # return spans, displayed_list, text_inline_lines
def modify_inline_equation(spans: list, displayed_list: list, text_inline_lines: list):
    """Convert wrongly detected interline equations into inline equations.

    Each span in ``displayed_list`` is compared with the text lines in
    ``text_inline_lines`` (``(line_spans, (y0, y1))`` tuples). An
    interline-equation span that overlaps a text line is retyped as an
    inline equation and given that line's ``(y0, y1)`` when the line is the
    last one, or when the span does not also overlap the next line and is
    less than three times as tall as the line.

    The cursor ``j`` into ``text_inline_lines`` is shared across all spans
    and never moves backwards.
    NOTE(review): this presumably relies on both lists being ordered top to
    bottom — confirm against the caller.

    Returns:
        ``spans``, unchanged as a list object; the spans in
        ``displayed_list`` are the ones modified.
    """
    # Convert mis-detected interline equations to inline equations.
    j = 0
    for i in range(len(displayed_list)):
        # if i == 8:
        #     print("debug")
        span = displayed_list[i]
        span_y0, span_y = span['bbox'][1], span['bbox'][3]

        while j < len(text_inline_lines):
            text_line = text_inline_lines[j]
            y0, y1 = text_line[1]
            if (
                span_y0 < y0 < span_y
                or span_y0 < y1 < span_y
                or span_y0 < y0
                and span_y > y1
            ) and __is_overlaps_y_exceeds_threshold(span['bbox'], (0, y0, 0, y1)):
                # Adjust the equation type.
                if span['type'] == ContentType.InterlineEquation:
                    # The matching text line is the last line.
                    if j + 1 >= len(text_inline_lines):
                        span['type'] = ContentType.InlineEquation
                        span['bbox'][1] = y0
                        span['bbox'][3] = y1
                    else:
                        # Do not convert when the equation has several text
                        # lines beside it, or is three times taller than the
                        # text line.
                        y0_next, y1_next = text_inline_lines[j + 1][1]
                        if (
                            not __is_overlaps_y_exceeds_threshold(
                                span['bbox'], (0, y0_next, 0, y1_next)
                            )
                            and 3 * (y1 - y0) > span_y - span_y0
                        ):
                            span['type'] = ContentType.InlineEquation
                            span['bbox'][1] = y0
                            span['bbox'][3] = y1
                # Done with this span; j stays on the matched line.
                break
            elif (
                span_y < y0
                or span_y0 < y0 < span_y
                and not __is_overlaps_y_exceeds_threshold(span['bbox'], (0, y0, 0, y1))
            ):
                # The text line lies below the span, or starts inside it
                # without enough overlap: stop scanning for this span.
                break
            else:
                j += 1

    return spans
def get_qa_need_list(blocks):
    """Collect the image, table and equation spans found in ``blocks``.

    Returns:
        ``(images, tables, interline_equations, inline_equations)``: four
        new lists holding the spans themselves (not copies), in document
        order.
    """
    images, tables = [], []
    interline_equations, inline_equations = [], []

    for block in blocks:
        for line in block['lines']:
            for span in line['spans']:
                span_type = span['type']
                if span_type == ContentType.Image:
                    images.append(span)
                elif span_type == ContentType.Table:
                    tables.append(span)
                elif span_type == ContentType.InlineEquation:
                    inline_equations.append(span)
                elif span_type == ContentType.InterlineEquation:
                    interline_equations.append(span)
                # Every other span type is not needed here.

    return images, tables, interline_equations, inline_equations
def
get_qa_need_list_v2
(
blocks
):
# 创建 images, tables, interline_equations, inline_equations 的副本
images
=
[]
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment