Commit 23bacc60 authored by Shuimo's avatar Shuimo
Browse files

add an option to freely output 'badcase.json'

parents d1457937 4191fa96
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.libs.pdf_image_tools import cut_image
def cut_image_and_table(spans, page, page_id, pdf_bytes_md5, imageWriter):
    """Crop image and table spans out of the page and record the crop location.

    For every span whose ``type`` is ``ContentType.Image`` or
    ``ContentType.Table``, the span's ``bbox`` is passed to ``cut_image`` and
    the value it returns is stored under ``span['image_path']``.  Image crops
    use the path prefix ``join_path(pdf_bytes_md5, 'images')`` and table crops
    use ``join_path(pdf_bytes_md5, 'tables')``.  Spans of any other type are
    left untouched.

    Returns:
        The same ``spans`` list, updated in place.
    """
    for item in spans:
        kind = item['type']
        if kind == ContentType.Image:
            folder = 'images'
        elif kind == ContentType.Table:
            folder = 'tables'
        else:
            # Only images and tables get cropped.
            continue
        item['image_path'] = cut_image(
            item['bbox'], page_id, page,
            return_path=join_path(pdf_bytes_md5, folder),
            imageWriter=imageWriter,
        )
    return spans
from magic_pdf.libs.boxbase import get_minbox_if_overlap_by_ratio, calculate_overlap_area_in_bbox1_area_ratio, \
calculate_iou
from magic_pdf.libs.drop_tag import DropTag
from magic_pdf.libs.ocr_content_type import BlockType
def ocr_prepare_bboxes_for_layout_split(img_blocks, table_blocks, discarded_blocks, text_blocks,
                                        title_blocks, interline_equation_blocks, page_w, page_h):
    """Collect every block of a page into one flat list of 12-element rows.

    Each row is ``[x0, y0, x1, y1, None, None, None, block_type, None, None,
    None, None]``: the coordinates come from the source block's ``bbox`` and
    index 7 carries the ``BlockType``.  Rows are added in the order image,
    table, text, title, interline equation.

    The list then goes through three clean-up passes, in this order:
      1. ``fix_text_overlap_title_blocks`` (text/title overlap: text wins),
      2. ``remove_need_drop_blocks`` (overlap with a discarded block: the
         discarded block wins),
      3. ``remove_overlaps_min_blocks`` (remaining nested boxes: the small
         one is removed).

    Finally, discarded blocks that look like footnotes are appended as
    ``BlockType.Footnote`` rows.  A discarded block qualifies when it is wider
    than a third of the page, taller than 10 and starts in the lower half of
    the page.

    Returns:
        The resulting list of rows.
    """
    def _row(source, block_type):
        left, top, right, bottom = source['bbox']
        return [left, top, right, bottom, None, None, None, block_type, None, None, None, None]

    all_bboxes = []
    all_bboxes.extend(_row(entry, BlockType.Image) for entry in img_blocks)
    all_bboxes.extend(_row(entry, BlockType.Table) for entry in table_blocks)
    all_bboxes.extend(_row(entry, BlockType.Text) for entry in text_blocks)
    all_bboxes.extend(_row(entry, BlockType.Title) for entry in title_blocks)
    all_bboxes.extend(_row(entry, BlockType.InterlineEquation) for entry in interline_equation_blocks)

    # Resolve nested / overlapping blocks.
    # Text box overlapping a title box: trust the text box.
    all_bboxes = fix_text_overlap_title_blocks(all_bboxes)
    # Any box overlapping a discarded box: trust the discarded box.
    all_bboxes = remove_need_drop_blocks(all_bboxes, discarded_blocks)
    # Big boxes may still contain small ones: delete the small ones.
    all_bboxes = remove_overlaps_min_blocks(all_bboxes)

    # Keep only the discarded blocks that look like footnotes.
    for dropped in discarded_blocks:
        left, top, right, bottom = dropped['bbox']
        if not ((right - left) > (page_w / 3) and (bottom - top) > 10 and top > (page_h / 2)):
            continue
        all_bboxes.append([left, top, right, bottom, None, None, None, BlockType.Footnote, None, None, None, None])

    return all_bboxes
def fix_text_overlap_title_blocks(all_bboxes):
    """Drop title rows that nearly coincide with a text row (text wins).

    A row is a sequence whose first four items are ``x0, y0, x1, y1`` and whose
    item 7 is the ``BlockType``.  A title row is removed when its IoU with at
    least one text row, as computed by ``calculate_iou``, is greater than 0.8.

    Each title is decided exactly once.  The previous implementation called
    ``list.remove`` once per overlapping text row, so a title overlapping two
    text rows raised ``ValueError`` on the second removal.

    Args:
        all_bboxes: list of rows; it is updated in place.

    Returns:
        The same ``all_bboxes`` list, without the dropped title rows.
    """
    text_bboxes = [
        (row[0], row[1], row[2], row[3])
        for row in all_bboxes
        if row[7] == BlockType.Text
    ]

    kept = []
    for row in all_bboxes:
        if row[7] == BlockType.Title:
            title_bbox = (row[0], row[1], row[2], row[3])
            # any() stops at the first hit, so a title is dropped only once.
            if any(calculate_iou(text_bbox, title_bbox) > 0.8 for text_bbox in text_bboxes):
                continue
        kept.append(row)

    # Slice assignment keeps the caller's list object, as before.
    all_bboxes[:] = kept
    return all_bboxes
def remove_need_drop_blocks(all_bboxes, discarded_blocks):
    """Remove every row that lies mostly inside a discarded block.

    A row (first four items ``x0, y0, x1, y1``) is removed when
    ``calculate_overlap_area_in_bbox1_area_ratio(row_bbox, discarded['bbox'])``
    is greater than 0.6 for at least one discarded block.

    Each row is decided exactly once.  The previous implementation kept
    scanning the discarded blocks after removing a row, so a row overlapping
    two discarded blocks was passed to ``list.remove`` twice and raised
    ``ValueError``.

    Args:
        all_bboxes: list of rows; it is updated in place.
        discarded_blocks: iterable of dicts with a ``'bbox'`` entry.

    Returns:
        The same ``all_bboxes`` list, without the removed rows.
    """
    kept = [
        row for row in all_bboxes
        if not any(
            calculate_overlap_area_in_bbox1_area_ratio(
                (row[0], row[1], row[2], row[3]), dropped['bbox']
            ) > 0.6
            for dropped in discarded_blocks
        )
    ]
    # Slice assignment keeps the caller's list object, as before.
    all_bboxes[:] = kept
    return all_bboxes
def remove_overlaps_min_blocks(all_bboxes):
    """Remove rows from ``all_bboxes`` that heavily overlap another row.

    Every ordered pair of non-equal rows is passed to
    ``get_minbox_if_overlap_by_ratio`` with a ratio of 0.8.  When that helper
    returns a box, the first row in ``all_bboxes`` whose coordinates equal the
    returned box is removed.  The list is modified in place and also returned.
    """
    # Delete the smaller ones among overlapping blocks.
    # Both loops walk snapshots, so rows removed below still take part in
    # later comparisons of the same pass.
    for block1 in all_bboxes.copy():
        for block2 in all_bboxes.copy():
            # Rows are compared by value, so two identical rows are never
            # compared against each other.
            if block1 != block2:
                block1_bbox = [block1[0], block1[1], block1[2], block1[3]]
                block2_bbox = [block2[0], block2[1], block2[2], block2[3]]
                # NOTE(review): presumably returns the smaller of the two boxes
                # when they overlap by more than the ratio, else None -- confirm
                # against magic_pdf.libs.boxbase.
                overlap_box = get_minbox_if_overlap_by_ratio(block1_bbox, block2_bbox, 0.8)
                if overlap_box is not None:
                    # Look the row up in the live list; it may already be gone.
                    bbox_to_remove = next(
                        (block for block in all_bboxes if [block[0], block[1], block[2], block[3]] == overlap_box),
                        None)
                    if bbox_to_remove is not None:
                        all_bboxes.remove(bbox_to_remove)
    return all_bboxes
......@@ -3,7 +3,9 @@ from loguru import logger
from magic_pdf.libs.boxbase import __is_overlaps_y_exceeds_threshold, get_minbox_if_overlap_by_ratio, \
calculate_overlap_area_in_bbox1_area_ratio
from magic_pdf.libs.drop_tag import DropTag
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.libs.ocr_content_type import ContentType, BlockType
from magic_pdf.pre_proc.ocr_span_list_modify import modify_y_axis, modify_inline_equation
from magic_pdf.pre_proc.remove_bbox_overlap import remove_overlap_between_bbox
# 将每一个line中的span从左到右排序
......@@ -24,6 +26,7 @@ def line_sort_spans_by_left_to_right(lines):
})
return line_objects
def merge_spans_to_line(spans):
if len(spans) == 0:
return []
......@@ -37,7 +40,8 @@ def merge_spans_to_line(spans):
# 如果当前的span类型为"interline_equation" 或者 当前行中已经有"interline_equation"
# image和table类型,同上
if span['type'] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table] or any(
s['type'] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table] for s in current_line):
s['type'] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table] for s in
current_line):
# 则开始新行
lines.append(current_line)
current_line = [span]
......@@ -57,6 +61,7 @@ def merge_spans_to_line(spans):
return lines
def merge_spans_to_line_by_layout(spans, layout_bboxes):
lines = []
new_spans = []
......@@ -103,7 +108,205 @@ def merge_lines_to_block(lines):
return blocks
def sort_blocks_by_layout(all_bboxes, layout_bboxes):
    """Order block rows by layout region, then top to bottom inside a region.

    Layout regions are visited in the order given.  For each region, every
    non-footnote row still present in ``all_bboxes`` whose overlap ratio with
    the region's ``layout_bbox`` (``calculate_overlap_area_in_bbox1_area_ratio``)
    is greater than 0.8 is claimed by that region and removed from
    ``all_bboxes``, so a row is assigned to at most one region.  Rows of a
    region are sorted by ``y0`` (index 1).

    Args:
        all_bboxes: list of rows; claimed rows are removed from it in place.
        layout_bboxes: iterable of dicts with a ``'layout_bbox'`` entry.

    Returns:
        A new list with the claimed rows in reading order.
    """
    grouped = []
    for layout in layout_bboxes:
        region = layout['layout_bbox']
        members = [
            candidate for candidate in all_bboxes
            # Footnote rows are never assigned to a layout region.
            if not (candidate[7] == BlockType.Footnote)
            and calculate_overlap_area_in_bbox1_area_ratio(
                [candidate[0], candidate[1], candidate[2], candidate[3]], region
            ) > 0.8
        ]
        if not members:
            continue
        grouped.append(members)
        # A claimed row must not be offered to the following regions.
        for member in members:
            all_bboxes.remove(member)

    ordered = []
    for members in grouped:
        members.sort(key=lambda row: row[1])  # top to bottom by y0
        ordered.extend(members)
    return ordered
def fill_spans_in_blocks(blocks, spans):
    """Distribute spans over blocks according to their position.

    Blocks are processed in order.  A span is assigned to a block when
    ``calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], block_bbox)``
    is greater than 0.7.  The assigned spans are then passed through
    ``modify_y_axis``, ``modify_inline_equation`` and
    ``remove_overlap_between_bbox``, and the resulting spans are removed from
    ``spans`` so later blocks cannot claim them.

    Args:
        blocks: rows whose first four items are the bbox and whose item 7 is
            the block type.
        spans: list of span dicts; it is modified in place.

    Returns:
        A list of ``{'type', 'bbox', 'spans'}`` dicts, one per block.
    """
    filled = []
    for row in blocks:
        kind = row[7]
        bbox = row[0:4]
        entry = {
            'type': kind,
            'bbox': bbox,
        }
        claimed = [
            candidate for candidate in spans
            if calculate_overlap_area_in_bbox1_area_ratio(candidate['bbox'], bbox) > 0.7
        ]

        # Inline formulas: align their height with the text of the same line
        # (left neighbour first, then right neighbour).
        displayed_list = []
        text_inline_lines = []
        modify_y_axis(claimed, displayed_list, text_inline_lines)

        # Interline formulas the model misdetected are converted to inline ones.
        claimed = modify_inline_equation(claimed, displayed_list, text_inline_lines)

        # Separate bboxes that stick together.
        claimed = remove_overlap_between_bbox(claimed)

        entry['spans'] = claimed
        filled.append(entry)

        # Spans placed in this block are no longer available to other blocks.
        for used in claimed:
            spans.remove(used)

    return filled
def fix_block_spans(block_with_spans, img_blocks, table_blocks):
    """Turn the flat ``spans`` of each block into its final nested structure.

    1. Image and table blocks contain captions and footnotes, so they are
       nested: their text spans are moved into the matching caption/footnote
       sub-blocks (``fix_image_block`` / ``fix_table_block``).
    2. Text, title and interline-equation blocks have their spans merged into
       lines (``fix_text_block``).

    In all three cases the ``spans`` field of the block is removed by the
    helper.  Blocks of any other type are dropped from the result.

    Returns:
        A new list with the converted blocks, in input order.
    """
    repaired = []
    for entry in block_with_spans:
        kind = entry['type']
        if kind == BlockType.Image:
            repaired.append(fix_image_block(entry, img_blocks))
        elif kind == BlockType.Table:
            repaired.append(fix_table_block(entry, table_blocks))
        elif kind in [BlockType.Text, BlockType.Title, BlockType.InterlineEquation]:
            repaired.append(fix_text_block(entry))
    return repaired
def merge_spans_to_block(spans: list, block_bbox: list, block_type: str):
    """Build a block from the spans that lie inside ``block_bbox``.

    A span belongs to the block when
    ``calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], block_bbox)``
    is greater than 0.8.  The selected spans are merged into lines with
    ``merge_spans_to_line`` and each line is ordered with
    ``line_sort_spans_by_left_to_right``.

    Returns:
        A tuple ``(block, block_spans)``: the new ``{'bbox', 'type', 'lines'}``
        dict and the list of spans that were selected for it.  ``spans`` itself
        is not modified here.
    """
    selected = [
        candidate for candidate in spans
        if calculate_overlap_area_in_bbox1_area_ratio(candidate['bbox'], block_bbox) > 0.8
    ]
    # Group into lines, then order the spans within each line.
    ordered_lines = line_sort_spans_by_left_to_right(merge_spans_to_line(selected))
    return {'bbox': block_bbox, 'type': block_type, 'lines': ordered_lines}, selected
def make_body_block(span: dict, block_bbox: list, block_type: str):
    """Wrap a single span into a one-line body block.

    The returned block and its only line both reference ``block_bbox``; the
    line's ``spans`` list contains just ``span``.
    """
    return {
        'bbox': block_bbox,
        'type': block_type,
        'lines': [
            {
                'bbox': block_bbox,
                'spans': [span],
            },
        ],
    }
def fix_image_block(block, img_blocks):
    """Replace the flat ``spans`` of an image block with nested sub-blocks.

    The first entry of ``img_blocks`` whose ``bbox`` equals ``block['bbox']``
    drives the conversion:

    * the first image span whose bbox equals ``img_body_bbox`` becomes an
      ``ImageBody`` sub-block and is taken out of the span list;
    * when ``img_caption_bbox`` is not ``None``, the remaining spans inside it
      are merged into an ``ImageCaption`` sub-block.

    Sub-blocks are stored under ``block['blocks']`` and ``block['spans']`` is
    deleted in every case.

    Returns:
        The same ``block`` dict, modified in place.
    """
    block['blocks'] = []
    nested = block['blocks']
    for candidate in img_blocks:
        if candidate['bbox'] != block['bbox']:
            continue

        pending = block['spans']
        # Image body: the image span that matches the body bbox exactly.
        for span in pending:
            if span['type'] == ContentType.Image and span['bbox'] == candidate['img_body_bbox']:
                nested.append(make_body_block(span, candidate['img_body_bbox'], BlockType.ImageBody))
                # The span now lives in the body block.
                pending.remove(span)
                break

        # Caption: present only when the model reported a caption bbox.
        caption_bbox = candidate['img_caption_bbox']
        if caption_bbox is not None:
            caption_block, _ = merge_spans_to_block(pending, caption_bbox, BlockType.ImageCaption)
            nested.append(caption_block)

        # Only the first matching image block is used.
        break
    del block['spans']
    return block
def fix_table_block(block, table_blocks):
    """Replace the flat ``spans`` of a table block with nested sub-blocks.

    The first entry of ``table_blocks`` whose ``bbox`` equals ``block['bbox']``
    drives the conversion:

    * the first table span whose bbox equals ``table_body_bbox`` becomes a
      ``TableBody`` sub-block and is taken out of the span list;
    * when ``table_caption_bbox`` is not ``None``, the remaining spans inside
      it are merged into a ``TableCaption`` sub-block and taken out of the
      span list;
    * when ``table_footnote_bbox`` is not ``None``, the spans still left
      inside it are merged into a ``TableFootnote`` sub-block.

    Sub-blocks are stored under ``block['blocks']`` and ``block['spans']`` is
    deleted in every case.

    Returns:
        The same ``block`` dict, modified in place.
    """
    block['blocks'] = []
    nested = block['blocks']
    for candidate in table_blocks:
        if candidate['bbox'] != block['bbox']:
            continue

        pending = block['spans']
        # Table body: the table span that matches the body bbox exactly.
        for span in pending:
            if span['type'] == ContentType.Table and span['bbox'] == candidate['table_body_bbox']:
                nested.append(make_body_block(span, candidate['table_body_bbox'], BlockType.TableBody))
                # The span now lives in the body block.
                pending.remove(span)
                break

        # Caption: present only when the model reported a caption bbox.
        caption_bbox = candidate['table_caption_bbox']
        if caption_bbox is not None:
            caption_block, caption_spans = merge_spans_to_block(pending, caption_bbox, BlockType.TableCaption)
            nested.append(caption_block)
            # Caption spans must not be picked up again by the footnote.
            for used in caption_spans:
                pending.remove(used)

        # Footnote: present only when the model reported a footnote bbox.
        footnote_bbox = candidate['table_footnote_bbox']
        if footnote_bbox is not None:
            footnote_block, _ = merge_spans_to_block(pending, footnote_bbox, BlockType.TableFootnote)
            nested.append(footnote_block)

        # Only the first matching table block is used.
        break
    del block['spans']
    return block
def fix_text_block(block):
    """Replace ``block['spans']`` with ``block['lines']``.

    The spans are grouped into lines by ``merge_spans_to_line`` and each line
    is ordered by ``line_sort_spans_by_left_to_right``.  The ``spans`` field is
    deleted afterwards.

    Returns:
        The same ``block`` dict, modified in place.
    """
    grouped = merge_spans_to_line(block['spans'])
    block['lines'] = line_sort_spans_by_left_to_right(grouped)
    del block['spans']
    return block
......@@ -3,7 +3,7 @@ from loguru import logger
from magic_pdf.libs.boxbase import calculate_overlap_area_in_bbox1_area_ratio, get_minbox_if_overlap_by_ratio, \
__is_overlaps_y_exceeds_threshold
from magic_pdf.libs.drop_tag import DropTag
from magic_pdf.libs.ocr_content_type import ContentType
from magic_pdf.libs.ocr_content_type import ContentType, BlockType
def remove_overlaps_min_spans(spans):
......@@ -50,7 +50,8 @@ def remove_spans_by_bboxes_dict(spans, need_remove_spans_bboxes_dict):
need_remove_spans.append(span)
break
# 当drop_tag为DropTag.FOOTNOTE时, 判断span是否在removed_bboxes中任意一个的下方,如果是,则删除该span
elif drop_tag == DropTag.FOOTNOTE and (span['bbox'][1]+span['bbox'][3])/2 > removed_bbox[3] and removed_bbox[0] < (span['bbox'][0]+span['bbox'][2])/2 < removed_bbox[2]:
elif drop_tag == DropTag.FOOTNOTE and (span['bbox'][1] + span['bbox'][3]) / 2 > removed_bbox[3] and \
removed_bbox[0] < (span['bbox'][0] + span['bbox'][2]) / 2 < removed_bbox[2]:
need_remove_spans.append(span)
break
......@@ -162,9 +163,10 @@ def modify_inline_equation(spans: list, displayed_list: list, text_inline_lines:
text_line = text_inline_lines[j]
y0, y1 = text_line[1]
if (
span_y0 < y0 and span_y > y0 or span_y0 < y1 and span_y > y1 or span_y0 < y0 and span_y > y1) and __is_overlaps_y_exceeds_threshold(
span['bbox'], (0, y0, 0, y1)):
span_y0 < y0 < span_y or span_y0 < y1 < span_y or span_y0 < y0 and span_y > y1
) and __is_overlaps_y_exceeds_threshold(
span['bbox'], (0, y0, 0, y1)
):
# 调整公式类型
if span["type"] == ContentType.InterlineEquation:
# 最后一行是行间公式
......@@ -181,8 +183,8 @@ def modify_inline_equation(spans: list, displayed_list: list, text_inline_lines:
span["bbox"][1] = y0
span["bbox"][3] = y1
break
elif span_y < y0 or span_y0 < y0 and span_y > y0 and not __is_overlaps_y_exceeds_threshold(span['bbox'],
(0, y0, 0, y1)):
elif span_y < y0 or span_y0 < y0 < span_y and not __is_overlaps_y_exceeds_threshold(span['bbox'],
(0, y0, 0, y1)):
break
else:
j += 1
......@@ -211,3 +213,19 @@ def get_qa_need_list(blocks):
else:
continue
return images, tables, interline_equations, inline_equations
def get_qa_need_list_v2(blocks):
    """Split ``blocks`` into image, table and interline-equation blocks.

    Blocks are classified by their ``"type"`` entry; blocks of any other type
    are ignored.  The block dicts themselves are not copied.

    Returns:
        A tuple ``(images, tables, interline_equations)`` of three new lists.
    """
    images, tables, interline_equations = [], [], []
    for item in blocks:
        kind = item["type"]
        if kind == BlockType.Image:
            images.append(item)
        elif kind == BlockType.Table:
            tables.append(item)
        elif kind == BlockType.InterlineEquation:
            interline_equations.append(item)
    return images, tables, interline_equations
......@@ -68,7 +68,7 @@ def pdf_filter(page:fitz.Page, text_blocks, table_bboxes, image_bboxes) -> tuple
"""
if __is_contain_color_background_rect(page, text_blocks, image_bboxes):
return False, {"need_drop": True, "drop_reason": DropReason.COLOR_BACKGROUND_TEXT_BOX}
return False, {"_need_drop": True, "_drop_reason": DropReason.COLOR_BACKGROUND_TEXT_BOX}
return True, None
\ No newline at end of file
......@@ -5,7 +5,7 @@ def _remove_overlap_between_bbox(spans):
res = []
for v in spans:
for i in range(len(res)):
if _is_in(res[i]["bbox"], v["bbox"]):
if _is_in(res[i]["bbox"], v["bbox"]) or _is_in(v["bbox"], res[i]["bbox"]):
continue
if _is_in_or_part_overlap(res[i]["bbox"], v["bbox"]):
ix0, iy0, ix1, iy1 = res[i]["bbox"]
......@@ -17,21 +17,21 @@ def _remove_overlap_between_bbox(spans):
if diff_y > diff_x:
if x1 >= ix1:
mid = (x0 + ix1) // 2
ix1 = min(mid, ix1)
x0 = max(mid + 1, x0)
ix1 = min(mid - 0.25, ix1)
x0 = max(mid + 0.25, x0)
else:
mid = (ix0 + x1) // 2
ix0 = max(mid + 1, ix0)
x1 = min(mid, x1)
ix0 = max(mid + 0.25, ix0)
x1 = min(mid -0.25, x1)
else:
if y1 >= iy1:
mid = (y0 + iy1) // 2
y0 = max(mid + 1, y0)
iy1 = min(iy1, mid)
y0 = max(mid + 0.25, y0)
iy1 = min(iy1, mid-0.25)
else:
mid = (iy0 + y1) // 2
y1 = min(y1, mid)
iy0 = max(mid + 1, iy0)
y1 = min(y1, mid-0.25)
iy0 = max(mid + 0.25, iy0)
res[i]["bbox"] = [ix0, iy0, ix1, iy1]
v["bbox"] = [x0, y0, x1, y1]
......
import os
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from loguru import logger
MODE_TXT = "text"
MODE_BIN = "binary"
class DiskReaderWriter(AbsReaderWriter):
def __init__(self, parent_path, encoding='utf-8'):
def __init__(self, parent_path, encoding="utf-8"):
self.path = parent_path
self.encoding = encoding
......@@ -20,10 +22,10 @@ class DiskReaderWriter(AbsReaderWriter):
logger.error(f"文件 {abspath} 不存在")
raise Exception(f"文件 {abspath} 不存在")
if mode == MODE_TXT:
with open(abspath, 'r', encoding = self.encoding) as f:
with open(abspath, "r", encoding=self.encoding) as f:
return f.read()
elif mode == MODE_BIN:
with open(abspath, 'rb') as f:
with open(abspath, "rb") as f:
return f.read()
else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.")
......@@ -33,32 +35,32 @@ class DiskReaderWriter(AbsReaderWriter):
abspath = path
else:
abspath = os.path.join(self.path, path)
directory_path = os.path.dirname(abspath)
if not os.path.exists(directory_path):
os.makedirs(directory_path)
if mode == MODE_TXT:
with open(abspath, 'w', encoding=self.encoding) as f:
with open(abspath, "w", encoding=self.encoding) as f:
f.write(content)
logger.info(f"内容已成功写入 {abspath}")
elif mode == MODE_BIN:
with open(abspath, 'wb') as f:
with open(abspath, "wb") as f:
f.write(content)
logger.info(f"内容已成功写入 {abspath}")
else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.")
def read_jsonl(self, path: str, byte_start=0, byte_end=None, encoding='utf-8'):
def read_jsonl(self, path: str, byte_start=0, byte_end=None, encoding="utf-8"):
return self.read(path)
# 使用示例
if __name__ == "__main__":
file_path = "io/example.txt"
file_path = "io/test/example.txt"
drw = DiskReaderWriter("D:\projects\papayfork\Magic-PDF\magic_pdf")
# 写入内容到文件
drw.write(b"Hello, World!", path="io/example.txt", mode="binary")
drw.write(b"Hello, World!", path="io/test/example.txt", mode="binary")
# 从文件读取内容
content = drw.read(path=file_path)
if content:
logger.info(f"从 {file_path} 读取的内容: {content}")
from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key
import boto3
from loguru import logger
......@@ -11,7 +11,7 @@ MODE_BIN = "binary"
class S3ReaderWriter(AbsReaderWriter):
def __init__(self, ak: str, sk: str, endpoint_url: str, addressing_style: str, parent_path: str):
def __init__(self, ak: str, sk: str, endpoint_url: str, addressing_style: str = 'auto', parent_path: str = ''):
self.client = self._get_client(ak, sk, endpoint_url, addressing_style)
self.path = parent_path
......
from loguru import logger
from magic_pdf.libs.drop_reason import DropReason
from loguru import logger
from magic_pdf.libs.drop_reason import DropReason
def get_data_source(jso: dict):
    """Return ``jso["data_source"]``, falling back to ``jso["file_source"]``.

    The fallback is used only when ``"data_source"`` is missing or ``None``;
    the result is ``None`` when neither key yields a value.
    """
    source = jso.get("data_source")
    return jso.get("file_source") if source is None else source
def get_data_type(jso: dict):
    """Return ``jso["data_type"]``, falling back to ``jso["file_type"]``.

    The fallback is used only when ``"data_type"`` is missing or ``None``;
    the result is ``None`` when neither key yields a value.
    """
    kind = jso.get("data_type")
    return jso.get("file_type") if kind is None else kind
def get_bookid(jso: dict):
    """Return ``jso["bookid"]``, falling back to ``jso["original_file_id"]``.

    The fallback is used only when ``"bookid"`` is missing or ``None``; the
    result is ``None`` when neither key yields a value.
    """
    identifier = jso.get("bookid")
    return jso.get("original_file_id") if identifier is None else identifier
def exception_handler(jso: dict, e):
    """Log ``e`` and mark ``jso`` as dropped because of an exception.

    Sets ``"need_drop"`` to ``True``, ``"drop_reason"`` to
    ``DropReason.Exception`` and ``"exception"`` to ``"ERROR: <e>"``.

    Returns:
        The same ``jso`` dict, modified in place.
    """
    logger.exception(e)
    jso.update({
        "need_drop": True,
        "drop_reason": DropReason.Exception,
        "exception": f"ERROR: {e}",
    })
    return jso
def get_bookname(jso: dict):
    """Return ``"<data source>/<file_id>"`` for the record ``jso``.

    The data source comes from ``get_data_source``; the file id is read from
    ``jso["file_id"]``.  Missing values are rendered as ``"None"`` by the
    string formatting.
    """
    source = get_data_source(jso)
    return f"{source}/{jso.get('file_id')}"
from loguru import logger
"""
用户输入:
model数组,每个元素代表一个页面
pdf在s3的路径
截图保存的s3位置
from magic_pdf.libs.drop_reason import DropReason
然后:
1)根据s3路径,调用spark集群的api,拿到ak,sk,endpoint,构造出s3PDFReader
2)根据用户输入的s3地址,调用spark集群的api,拿到ak,sk,endpoint,构造出s3ImageWriter
其余部分至于构造s3cli, 获取ak,sk都在code-clean里写代码完成。不要反向依赖!!!
def get_data_source(jso: dict):
data_source = jso.get("data_source")
if data_source is None:
data_source = jso.get("file_source")
return data_source
"""
from loguru import logger
from magic_pdf.io import AbsReaderWriter
from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
from magic_pdf.pdf_parse_by_txt import parse_pdf_by_txt
def get_data_type(jso: dict):
data_type = jso.get("data_type")
if data_type is None:
data_type = jso.get("file_type")
return data_type
def parse_txt_pdf(pdf_bytes:bytes, pdf_models:list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0, *args, **kwargs):
"""
解析文本类pdf
"""
pdf_info_dict = parse_pdf_by_txt(
pdf_bytes,
pdf_models,
imageWriter,
start_page_id=start_page,
debug_mode=is_debug,
)
def get_bookid(jso: dict):
book_id = jso.get("bookid")
if book_id is None:
book_id = jso.get("original_file_id")
return book_id
pdf_info_dict["parse_type"] = "txt"
return pdf_info_dict
def exception_handler(jso: dict, e):
logger.exception(e)
jso["_need_drop"] = True
jso["_drop_reason"] = DropReason.Exception
jso["_exception"] = f"ERROR: {e}"
return jso
def parse_ocr_pdf(pdf_bytes:bytes, pdf_models:list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0, *args, **kwargs):
"""
解析ocr类pdf
"""
pdf_info_dict = parse_pdf_by_ocr(
pdf_bytes,
pdf_models,
imageWriter,
start_page_id=start_page,
debug_mode=is_debug,
)
def get_bookname(jso: dict):
data_source = get_data_source(jso)
file_id = jso.get("file_id")
book_name = f"{data_source}/{file_id}"
return book_name
pdf_info_dict["parse_type"] = "ocr"
return pdf_info_dict
def spark_json_extractor(jso: dict) -> dict:
def parse_union_pdf(pdf_bytes:bytes, pdf_models:list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0, *args, **kwargs):
"""
ocr和文本混合的pdf,全部解析出来
从json中提取数据,返回一个dict
"""
def parse_pdf(method):
try:
return method(
pdf_bytes,
pdf_models,
imageWriter,
start_page_id=start_page,
debug_mode=is_debug,
)
except Exception as e:
logger.error(f"{method.__name__} error: {e}")
return None
pdf_info_dict = parse_pdf(parse_pdf_by_txt)
if pdf_info_dict is None or pdf_info_dict.get("need_drop", False):
logger.warning(f"parse_pdf_by_txt drop or error, switch to parse_pdf_by_ocr")
pdf_info_dict = parse_pdf(parse_pdf_by_ocr)
if pdf_info_dict is None:
raise Exception("Both parse_pdf_by_txt and parse_pdf_by_ocr failed.")
else:
pdf_info_dict["parse_type"] = "ocr"
else:
pdf_info_dict["parse_type"] = "txt"
return pdf_info_dict
def spark_json_extractor(jso:dict):
pass
return {
"_pdf_type": jso["_pdf_type"],
"model_list": jso["doc_layout_result"],
}
"""
用户输入:
model数组,每个元素代表一个页面
pdf在s3的路径
截图保存的s3位置
然后:
1)根据s3路径,调用spark集群的api,拿到ak,sk,endpoint,构造出s3PDFReader
2)根据用户输入的s3地址,调用spark集群的api,拿到ak,sk,endpoint,构造出s3ImageWriter
其余部分至于构造s3cli, 获取ak,sk都在code-clean里写代码完成。不要反向依赖!!!
"""
from loguru import logger
from magic_pdf.rw import AbsReaderWriter
from magic_pdf.pdf_parse_by_ocr_v2 import parse_pdf_by_ocr
from magic_pdf.pdf_parse_by_txt_v2 import parse_pdf_by_txt
# Values stored under the "_parse_type" key of a parse result to record which
# parser produced it.
PARSE_TYPE_TXT = "txt"
PARSE_TYPE_OCR = "ocr"
def parse_txt_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0, *args,
                  **kwargs):
    """Parse a text-based PDF.

    Delegates to ``parse_pdf_by_txt`` (``start_page`` is forwarded as
    ``start_page_id`` and ``is_debug`` as ``debug_mode``) and tags the result
    with ``"_parse_type" = PARSE_TYPE_TXT``.  Extra positional and keyword
    arguments are accepted but not used.

    Returns:
        The dict returned by ``parse_pdf_by_txt``, with ``"_parse_type"`` set.
    """
    result = parse_pdf_by_txt(
        pdf_bytes, pdf_models, imageWriter,
        start_page_id=start_page, debug_mode=is_debug,
    )
    result["_parse_type"] = PARSE_TYPE_TXT
    return result
def parse_ocr_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0, *args,
                  **kwargs):
    """Parse an OCR (scanned) PDF.

    Delegates to ``parse_pdf_by_ocr`` (``start_page`` is forwarded as
    ``start_page_id`` and ``is_debug`` as ``debug_mode``) and tags the result
    with ``"_parse_type" = PARSE_TYPE_OCR``.  Extra positional and keyword
    arguments are accepted but not used.

    Returns:
        The dict returned by ``parse_pdf_by_ocr``, with ``"_parse_type"`` set.
    """
    result = parse_pdf_by_ocr(
        pdf_bytes, pdf_models, imageWriter,
        start_page_id=start_page, debug_mode=is_debug,
    )
    result["_parse_type"] = PARSE_TYPE_OCR
    return result
def parse_union_pdf(pdf_bytes: bytes, pdf_models: list, imageWriter: AbsReaderWriter, is_debug=False, start_page=0,
                    *args, **kwargs):
    """Parse a PDF that may be text-based or scanned.

    The text parser (``parse_pdf_by_txt``) is tried first.  When it raises, or
    its result has a truthy ``"_need_drop"`` entry, the OCR parser
    (``parse_pdf_by_ocr``) is tried instead.  The result is tagged with
    ``"_parse_type"`` naming the parser that produced it.  Extra positional
    and keyword arguments are accepted but not used.

    Returns:
        The dict produced by the parser that succeeded.

    Raises:
        Exception: when the OCR fallback is attempted and it raises as well.
    """
    def parse_pdf(method):
        # Run one parser; a failure is logged and reported as None.
        try:
            return method(
                pdf_bytes,
                pdf_models,
                imageWriter,
                start_page_id=start_page,
                debug_mode=is_debug,
            )
        except Exception as e:
            logger.error(f"{method.__name__} error: {e}")
            return None

    pdf_info_dict = parse_pdf(parse_pdf_by_txt)
    if pdf_info_dict is not None and not pdf_info_dict.get("_need_drop", False):
        pdf_info_dict["_parse_type"] = PARSE_TYPE_TXT
        return pdf_info_dict

    # The text parser failed or asked for the document to be dropped.
    logger.warning(f"parse_pdf_by_txt drop or error, switch to parse_pdf_by_ocr")
    pdf_info_dict = parse_pdf(parse_pdf_by_ocr)
    if pdf_info_dict is None:
        raise Exception("Both parse_pdf_by_txt and parse_pdf_by_ocr failed.")
    pdf_info_dict["_parse_type"] = PARSE_TYPE_OCR
    return pdf_info_dict
......@@ -14,5 +14,7 @@ termcolor>=2.4.0
wordninja>=2.0.0
en_core_web_sm @ https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl
zh_core_web_sm @ https://github.com/explosion/spacy-models/releases/download/zh_core_web_sm-3.7.0/zh_core_web_sm-3.7.0-py3-none-any.whl
scikit-learn==1.4.1.post1
nltk==3.8.1
\ No newline at end of file
scikit-learn>=1.0.2
nltk==3.8.1
s3pathlib>=2.1.1
# 工具脚本使用说明
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment