Commit 2df265c8 authored by zhougaofeng's avatar zhougaofeng
Browse files

Update magic_pdf/__init__.py, magic_pdf/config.ini, magic_pdf/tmp.py,...

Update magic_pdf/__init__.py, magic_pdf/config.ini, magic_pdf/tmp.py, magic_pdf/pdf_parse_by_ocr.py, magic_pdf/pdf_parse_by_txt.py, magic_pdf/user_api.py, magic_pdf/pdf_parse_union_core.py, magic_pdf/pdf_parse_union_core_v2.py, magic_pdf/config/__init__.py, magic_pdf/config/enums.py, magic_pdf/config/exceptions.py, magic_pdf/data/__init__.py, magic_pdf/data/schemas.py, magic_pdf/data/dataset.py, magic_pdf/data/utils.py, magic_pdf/data/read_api.py, magic_pdf/data/data_reader_writer/__init__.py, magic_pdf/data/data_reader_writer/base.py, magic_pdf/data/data_reader_writer/filebase.py, magic_pdf/data/data_reader_writer/s3.py, magic_pdf/data/data_reader_writer/multi_bucket_s3.py, magic_pdf/data/io/__init__.py, magic_pdf/data/io/base.py, magic_pdf/data/io/s3.py, magic_pdf/data/io/http.py, magic_pdf/dict2md/__init__.py, magic_pdf/dict2md/ocr_vllm_client.py, magic_pdf/dict2md/ocr_vllm_server.py, magic_pdf/dict2md/ocr_mkcontent.py, magic_pdf/dict2md/mkcontent.py, magic_pdf/dict2md/ocr_server.py, magic_pdf/dict2md/ocr_client.py, magic_pdf/filter/__init__.py, magic_pdf/filter/pdf_classify_by_type.py, magic_pdf/filter/pdf_meta_scan.py, magic_pdf/integrations/__init__.py, magic_pdf/integrations/rag/__init__.py, magic_pdf/integrations/rag/api.py, magic_pdf/integrations/rag/utils.py, magic_pdf/integrations/rag/type.py, magic_pdf/layout/__init__.py, magic_pdf/layout/layout_det_utils.py, magic_pdf/layout/layout_spiler_recog.py, magic_pdf/layout/mcol_sort.py, magic_pdf/layout/layout_sort.py, magic_pdf/layout/bbox_sort.py, magic_pdf/libs/__init__.py, magic_pdf/libs/boxbase.py, magic_pdf/libs/calc_span_stats.py, magic_pdf/libs/clean_memory.py, magic_pdf/libs/convert_utils.py, magic_pdf/libs/detect_language_from_model.py, magic_pdf/libs/coordinate_transform.py, magic_pdf/libs/drop_tag.py, magic_pdf/libs/nlp_utils.py, magic_pdf/libs/ocr_content_type.py, magic_pdf/libs/hash_utils.py, magic_pdf/libs/path_utils.py, magic_pdf/libs/pdf_check.py, magic_pdf/libs/language.py, magic_pdf/libs/markdown_utils.py, magic_pdf/libs/drop_reason.py, magic_pdf/libs/pdf_image_tools.py, magic_pdf/libs/config_reader.py, magic_pdf/libs/Constants.py, magic_pdf/libs/local_math.py, magic_pdf/libs/ModelBlockTypeEnum.py, magic_pdf/libs/draw_bbox.py, magic_pdf/libs/vis_utils.py, magic_pdf/libs/textbase.py, magic_pdf/libs/safe_filename.py, magic_pdf/libs/MakeContentConfig.py, magic_pdf/libs/version.py, magic_pdf/libs/json_compressor.py, magic_pdf/libs/commons.py, magic_pdf/model/__init__.py, magic_pdf/model/doc_analyze_by_custom_model.py, magic_pdf/model/magic_model.py, magic_pdf/model/model_list.py, magic_pdf/model/pp_structure_v2.py, magic_pdf/model/ppTableModel.py, magic_pdf/model/pdf_extract_kit.py, magic_pdf/model/pek_sub_modules/__init__.py, magic_pdf/model/pek_sub_modules/self_modify.py, magic_pdf/model/pek_sub_modules/post_process.py, magic_pdf/model/pek_sub_modules/layoutlmv3/__init__.py, magic_pdf/model/pek_sub_modules/layoutlmv3/backbone.py, magic_pdf/model/pek_sub_modules/layoutlmv3/beit.py, magic_pdf/model/pek_sub_modules/layoutlmv3/deit.py, magic_pdf/model/pek_sub_modules/layoutlmv3/model_init.py, magic_pdf/model/pek_sub_modules/layoutlmv3/visualizer.py, magic_pdf/model/pek_sub_modules/layoutlmv3/rcnn_vl.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/__init__.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/data/__init__.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/data/funsd.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/data/data_collator.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/data/cord.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/data/image_utils.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/data/xfund.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/models/__init__.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/models/layoutlmv3/__init__.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/models/layoutlmv3/tokenization_layoutlmv3.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/models/layoutlmv3/tokenization_layoutlmv3_fast.py, magic_pdf/model/pek_sub_modules/layoutlmv3/layoutlmft/models/layoutlmv3/configuration_layoutlmv3.py, magic_pdf/model/pek_sub_modules/structeqtable/__init__.py, magic_pdf/model/pek_sub_modules/structeqtable/StructTableModel.py, magic_pdf/para/__init__.py, magic_pdf/para/commons.py, magic_pdf/para/draw.py, magic_pdf/para/layout_match_processor.py, magic_pdf/para/raw_processor.py, magic_pdf/para/title_processor.py, magic_pdf/para/para_split.py, magic_pdf/para/denoise.py, magic_pdf/para/block_continuation_processor.py, magic_pdf/para/block_termination_processor.py, magic_pdf/para/para_pipeline.py, magic_pdf/para/para_split_v2.py, magic_pdf/para/para_split_v3.py, magic_pdf/para/stats.py, magic_pdf/para/exceptions.py, magic_pdf/parse/__init__.py, magic_pdf/parse/excel_parse.py, magic_pdf/parse/common_parse.py, magic_pdf/parse/ofd_parse.py, magic_pdf/parse/pdf_client.py, magic_pdf/pipe/__init__.py, magic_pdf/pipe/AbsPipe.py, magic_pdf/pipe/OCRPipe.py, magic_pdf/pipe/TXTPipe.py, magic_pdf/pipe/UNIPipe.py, magic_pdf/post_proc/__init__.py, magic_pdf/post_proc/pdf_post_filter.py, magic_pdf/post_proc/remove_spaces_html.py, magic_pdf/post_proc/remove_footnote.py, magic_pdf/post_proc/detect_para.py, magic_pdf/pre_proc/__init__.py, magic_pdf/pre_proc/post_layout_split.py, magic_pdf/pre_proc/citationmarker_remove.py, magic_pdf/pre_proc/detect_equation.py, magic_pdf/pre_proc/detect_footer_by_model.py, magic_pdf/pre_proc/cut_image.py, magic_pdf/pre_proc/construct_page_dict.py, magic_pdf/pre_proc/ocr_detect_layout.py, magic_pdf/pre_proc/pdf_pre_filter.py, magic_pdf/pre_proc/ocr_dict_merge.py, magic_pdf/pre_proc/ocr_span_list_modify.py, magic_pdf/pre_proc/remove_bbox_overlap.py, magic_pdf/pre_proc/remove_colored_strip_bbox.py, magic_pdf/pre_proc/remove_footer_header.py, magic_pdf/pre_proc/detect_header.py, magic_pdf/pre_proc/detect_page_number.py, magic_pdf/pre_proc/detect_tables.py, magic_pdf/pre_proc/detect_footer_header_by_statistics.py, magic_pdf/pre_proc/detect_footnote.py, magic_pdf/pre_proc/remove_rotate_bbox.py, magic_pdf/pre_proc/resolve_bbox_conflict.py, magic_pdf/pre_proc/solve_line_alien.py, magic_pdf/pre_proc/statistics.py, magic_pdf/pre_proc/detect_images.py, magic_pdf/pre_proc/equations_replace.py, magic_pdf/pre_proc/fix_image.py, magic_pdf/pre_proc/ocr_detect_all_bboxes.py, magic_pdf/pre_proc/fix_table.py, magic_pdf/pre_proc/main_text_font.py, magic_pdf/resources/fasttext-langdetect/lid.176.ftz, magic_pdf/resources/model_config/model_configs.yaml, magic_pdf/resources/model_config/layoutlmv3/layoutlmv3_base_inference.yaml, magic_pdf/resources/model_config/UniMERNet/demo.yaml, magic_pdf/rw/__init__.py, magic_pdf/rw/AbsReaderWriter.py, magic_pdf/rw/DiskReaderWriter.py, magic_pdf/rw/draw_ofd.py, magic_pdf/rw/ofdtemplate.py, magic_pdf/rw/pdf_parse.py, magic_pdf/rw/draw_pdf.py, magic_pdf/rw/S3ReaderWriter.py, magic_pdf/spark/__init__.py, magic_pdf/spark/spark_api.py, magic_pdf/tools/__init__.py, magic_pdf/tools/cli.py, magic_pdf/tools/cli_dev.py, magic_pdf/tools/common.py, magic_pdf/tools/file_deal.py, magic_pdf/tools/img_deal.py, magic_pdf/tools/find_seal_img.py, magic_pdf/tools/font_tools.py, magic_pdf/tools/file_parser.py, magic_pdf/tools/parameter_parser.py, magic_pdf/tools/ofd.py, magic_pdf/tools/pdf_server.py, magic_pdf/tools/ofd_parser.py, magic_pdf/utils/__init__.py, magic_pdf/utils/annotations.py files
parent 826086d2
import math
from loguru import logger
from magic_pdf.libs.boxbase import find_bottom_nearest_text_bbox, find_top_nearest_text_bbox
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.ocr_content_type import ContentType
TYPE_INLINE_EQUATION = ContentType.InlineEquation
TYPE_INTERLINE_EQUATION = ContentType.InterlineEquation
UNI_FORMAT_TEXT_TYPE = ['text', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']
@DeprecationWarning
def mk_nlp_markdown_1(para_dict: dict):
"""
对排序后的bboxes拼接内容
"""
content_lst = []
for _, page_info in para_dict.items():
para_blocks = page_info.get("para_blocks")
if not para_blocks:
continue
for block in para_blocks:
item = block["paras"]
for _, p in item.items():
para_text = p["para_text"]
is_title = p["is_para_title"]
title_level = p['para_title_level']
md_title_prefix = "#"*title_level
if is_title:
content_lst.append(f"{md_title_prefix} {para_text}")
else:
content_lst.append(para_text)
content_text = "\n\n".join(content_lst)
return content_text
# 找到目标字符串在段落中的索引
def __find_index(paragraph, target):
index = paragraph.find(target)
if index != -1:
return index
else:
return None
def __insert_string(paragraph, target, postion):
new_paragraph = paragraph[:postion] + target + paragraph[postion:]
return new_paragraph
def __insert_after(content, image_content, target):
"""
在content中找到target,将image_content插入到target后面
"""
index = content.find(target)
if index != -1:
content = content[:index+len(target)] + "\n\n" + image_content + "\n\n" + content[index+len(target):]
else:
logger.error(f"Can't find the location of image {image_content} in the markdown file, search target is {target}")
return content
def __insert_before(content, image_content, target):
"""
在content中找到target,将image_content插入到target前面
"""
index = content.find(target)
if index != -1:
content = content[:index] + "\n\n" + image_content + "\n\n" + content[index:]
else:
logger.error(f"Can't find the location of image {image_content} in the markdown file, search target is {target}")
return content
@DeprecationWarning
def mk_mm_markdown_1(para_dict: dict):
"""拼装多模态markdown"""
content_lst = []
for _, page_info in para_dict.items():
page_lst = [] # 一个page内的段落列表
para_blocks = page_info.get("para_blocks")
pymu_raw_blocks = page_info.get("preproc_blocks")
all_page_images = []
all_page_images.extend(page_info.get("images",[]))
all_page_images.extend(page_info.get("image_backup", []) )
all_page_images.extend(page_info.get("tables",[]))
all_page_images.extend(page_info.get("table_backup",[]) )
if not para_blocks or not pymu_raw_blocks: # 只有图片的拼接的场景
for img in all_page_images:
page_lst.append(f"![]({img['image_path']})") # TODO 图片顺序
page_md = "\n\n".join(page_lst)
else:
for block in para_blocks:
item = block["paras"]
for _, p in item.items():
para_text = p["para_text"]
is_title = p["is_para_title"]
title_level = p['para_title_level']
md_title_prefix = "#"*title_level
if is_title:
page_lst.append(f"{md_title_prefix} {para_text}")
else:
page_lst.append(para_text)
"""拼装成一个页面的文本"""
page_md = "\n\n".join(page_lst)
"""插入图片"""
for img in all_page_images:
imgbox = img['bbox']
img_content = f"![]({img['image_path']})"
# 先看在哪个block内
for block in pymu_raw_blocks:
bbox = block['bbox']
if bbox[0]-1 <= imgbox[0] < bbox[2]+1 and bbox[1]-1 <= imgbox[1] < bbox[3]+1:# 确定在block内
for l in block['lines']:
line_box = l['bbox']
if line_box[0]-1 <= imgbox[0] < line_box[2]+1 and line_box[1]-1 <= imgbox[1] < line_box[3]+1: # 在line内的,插入line前面
line_txt = "".join([s['text'] for s in l['spans']])
page_md = __insert_before(page_md, img_content, line_txt)
break
break
else:# 在行与行之间
# 找到图片x0,y0与line的x0,y0最近的line
min_distance = 100000
min_line = None
for l in block['lines']:
line_box = l['bbox']
distance = math.sqrt((line_box[0] - imgbox[0])**2 + (line_box[1] - imgbox[1])**2)
if distance < min_distance:
min_distance = distance
min_line = l
if min_line:
line_txt = "".join([s['text'] for s in min_line['spans']])
img_h = imgbox[3] - imgbox[1]
if min_distance<img_h: # 文字在图片前面
page_md = __insert_after(page_md, img_content, line_txt)
else:
page_md = __insert_before(page_md, img_content, line_txt)
else:
logger.error(f"Can't find the location of image {img['image_path']} in the markdown file #1")
else:# 应当在两个block之间
# 找到上方最近的block,如果上方没有就找大下方最近的block
top_txt_block = find_top_nearest_text_bbox(pymu_raw_blocks, imgbox)
if top_txt_block:
line_txt = "".join([s['text'] for s in top_txt_block['lines'][-1]['spans']])
page_md = __insert_after(page_md, img_content, line_txt)
else:
bottom_txt_block = find_bottom_nearest_text_bbox(pymu_raw_blocks, imgbox)
if bottom_txt_block:
line_txt = "".join([s['text'] for s in bottom_txt_block['lines'][0]['spans']])
page_md = __insert_before(page_md, img_content, line_txt)
else:
logger.error(f"Can't find the location of image {img['image_path']} in the markdown file #2")
content_lst.append(page_md)
"""拼装成全部页面的文本"""
content_text = "\n\n".join(content_lst)
return content_text
def __insert_after_para(text, type, element, content_list):
"""
在content_list中找到text,将image_path作为一个新的node插入到text后面
"""
for i, c in enumerate(content_list):
content_type = c.get("type")
if content_type in UNI_FORMAT_TEXT_TYPE and text in c.get("text", ''):
if type == "image":
content_node = {
"type": "image",
"img_path": element.get("image_path"),
"img_alt": "",
"img_title": "",
"img_caption": "",
}
elif type == "table":
content_node = {
"type": "table",
"img_path": element.get("image_path"),
"table_latex": element.get("text"),
"table_title": "",
"table_caption": "",
"table_quality": element.get("quality"),
}
content_list.insert(i+1, content_node)
break
else:
logger.error(f"Can't find the location of image {element.get('image_path')} in the markdown file, search target is {text}")
def __insert_before_para(text, type, element, content_list):
"""
在content_list中找到text,将image_path作为一个新的node插入到text前面
"""
for i, c in enumerate(content_list):
content_type = c.get("type")
if content_type in UNI_FORMAT_TEXT_TYPE and text in c.get("text", ''):
if type == "image":
content_node = {
"type": "image",
"img_path": element.get("image_path"),
"img_alt": "",
"img_title": "",
"img_caption": "",
}
elif type == "table":
content_node = {
"type": "table",
"img_path": element.get("image_path"),
"table_latex": element.get("text"),
"table_title": "",
"table_caption": "",
"table_quality": element.get("quality"),
}
content_list.insert(i, content_node)
break
else:
logger.error(f"Can't find the location of image {element.get('image_path')} in the markdown file, search target is {text}")
def mk_universal_format(pdf_info_list: list, img_buket_path):
"""
构造统一格式 https://aicarrier.feishu.cn/wiki/FqmMwcH69iIdCWkkyjvcDwNUnTY
"""
content_lst = []
for page_info in pdf_info_list:
page_lst = [] # 一个page内的段落列表
para_blocks = page_info.get("para_blocks")
pymu_raw_blocks = page_info.get("preproc_blocks")
all_page_images = []
all_page_images.extend(page_info.get("images",[]))
all_page_images.extend(page_info.get("image_backup", []) )
# all_page_images.extend(page_info.get("tables",[]))
# all_page_images.extend(page_info.get("table_backup",[]) )
all_page_tables = []
all_page_tables.extend(page_info.get("tables", []))
if not para_blocks or not pymu_raw_blocks: # 只有图片的拼接的场景
for img in all_page_images:
content_node = {
"type": "image",
"img_path": join_path(img_buket_path, img['image_path']),
"img_alt":"",
"img_title":"",
"img_caption":""
}
page_lst.append(content_node) # TODO 图片顺序
for table in all_page_tables:
content_node = {
"type": "table",
"img_path": join_path(img_buket_path, table['image_path']),
"table_latex": table.get("text"),
"table_title": "",
"table_caption": "",
"table_quality": table.get("quality"),
}
page_lst.append(content_node) # TODO 图片顺序
else:
for block in para_blocks:
item = block["paras"]
for _, p in item.items():
font_type = p['para_font_type']# 对于文本来说,要么是普通文本,要么是个行间公式
if font_type == TYPE_INTERLINE_EQUATION:
content_node = {
"type": "equation",
"latex": p["para_text"]
}
page_lst.append(content_node)
else:
para_text = p["para_text"]
is_title = p["is_para_title"]
title_level = p['para_title_level']
if is_title:
content_node = {
"type": f"h{title_level}",
"text": para_text
}
page_lst.append(content_node)
else:
content_node = {
"type": "text",
"text": para_text
}
page_lst.append(content_node)
content_lst.extend(page_lst)
"""插入图片"""
for img in all_page_images:
insert_img_or_table("image", img, pymu_raw_blocks, content_lst)
"""插入表格"""
for table in all_page_tables:
insert_img_or_table("table", table, pymu_raw_blocks, content_lst)
# end for
return content_lst
def insert_img_or_table(type, element, pymu_raw_blocks, content_lst):
element_bbox = element['bbox']
# 先看在哪个block内
for block in pymu_raw_blocks:
bbox = block['bbox']
if bbox[0] - 1 <= element_bbox[0] < bbox[2] + 1 and bbox[1] - 1 <= element_bbox[1] < bbox[
3] + 1: # 确定在这个大的block内,然后进入逐行比较距离
for l in block['lines']:
line_box = l['bbox']
if line_box[0] - 1 <= element_bbox[0] < line_box[2] + 1 and line_box[1] - 1 <= element_bbox[1] < line_box[
3] + 1: # 在line内的,插入line前面
line_txt = "".join([s['text'] for s in l['spans']])
__insert_before_para(line_txt, type, element, content_lst)
break
break
else: # 在行与行之间
# 找到图片x0,y0与line的x0,y0最近的line
min_distance = 100000
min_line = None
for l in block['lines']:
line_box = l['bbox']
distance = math.sqrt((line_box[0] - element_bbox[0]) ** 2 + (line_box[1] - element_bbox[1]) ** 2)
if distance < min_distance:
min_distance = distance
min_line = l
if min_line:
line_txt = "".join([s['text'] for s in min_line['spans']])
img_h = element_bbox[3] - element_bbox[1]
if min_distance < img_h: # 文字在图片前面
__insert_after_para(line_txt, type, element, content_lst)
else:
__insert_before_para(line_txt, type, element, content_lst)
break
else:
logger.error(f"Can't find the location of image {element.get('image_path')} in the markdown file #1")
else: # 应当在两个block之间
# 找到上方最近的block,如果上方没有就找大下方最近的block
top_txt_block = find_top_nearest_text_bbox(pymu_raw_blocks, element_bbox)
if top_txt_block:
line_txt = "".join([s['text'] for s in top_txt_block['lines'][-1]['spans']])
__insert_after_para(line_txt, type, element, content_lst)
else:
bottom_txt_block = find_bottom_nearest_text_bbox(pymu_raw_blocks, element_bbox)
if bottom_txt_block:
line_txt = "".join([s['text'] for s in bottom_txt_block['lines'][0]['spans']])
__insert_before_para(line_txt, type, element, content_lst)
else: # TODO ,图片可能独占一列,这种情况上下是没有图片的
logger.error(f"Can't find the location of image {element.get('image_path')} in the markdown file #2")
def mk_mm_markdown(content_list):
"""
基于同一格式的内容列表,构造markdown,含图片
"""
content_md = []
for c in content_list:
content_type = c.get("type")
if content_type == "text":
content_md.append(c.get("text"))
elif content_type == "equation":
content = c.get("latex")
if content.startswith("$$") and content.endswith("$$"):
content_md.append(content)
else:
content_md.append(f"\n$$\n{c.get('latex')}\n$$\n")
elif content_type in UNI_FORMAT_TEXT_TYPE:
content_md.append(f"{'#'*int(content_type[1])} {c.get('text')}")
elif content_type == "image":
content_md.append(f"![]({c.get('img_path')})")
return "\n\n".join(content_md)
def mk_nlp_markdown(content_list):
"""
基于同一格式的内容列表,构造markdown,不含图片
"""
content_md = []
for c in content_list:
content_type = c.get("type")
if content_type == "text":
content_md.append(c.get("text"))
elif content_type == "equation":
content_md.append(f"$$\n{c.get('latex')}\n$$")
elif content_type == "table":
content_md.append(f"$$$\n{c.get('table_latex')}\n$$$")
elif content_type in UNI_FORMAT_TEXT_TYPE:
content_md.append(f"{'#'*int(content_type[1])} {c.get('text')}")
return "\n\n".join(content_md)
\ No newline at end of file
import configparser
import os
import json
import requests
from loguru import logger
import argparse
import time
from PIL import Image
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
'--config_path',
default='/home/practice/magic_pdf-main/magic_pdf/config.ini',
)
parser.add_argument(
'--image_path',
default='/home/wanglch/projects/Qwen2-VL/20240920-163701.png',
)
parser.add_argument(
'--text',
default="描述你在图片中看到的内容",
)
args = parser.parse_args()
return args
def parse_text(text):
lines = text.split("\n")
lines = [line for line in lines if line.strip() != ""] # 去除空行
count = 0
parsed_lines = []
for i, line in enumerate(lines):
if "```" in line:
count += 1
items = line.split("`")
if count % 2 == 1:
# 开始代码块
parsed_lines.append(f'<pre><code class="language-{items[-1]}">')
else:
# 结束代码块
parsed_lines.append(f"</code></pre>")
else:
if i > 0 and count % 2 == 1:
# 转义代码块内的特殊字符
line = line.replace("`", r"\`")
line = line.replace("<", "&lt;")
line = line.replace(">", "&gt;")
line = line.replace(" ", "&nbsp;")
line = line.replace("*", "&ast;")
line = line.replace("_", "&lowbar;")
line = line.replace("-", "&#45;")
line = line.replace(".", "&#46;")
line = line.replace("!", "&#33;")
line = line.replace("(", "&#40;")
line = line.replace(")", "&#41;")
line = line.replace("$", "&#36;")
# 使用空格连接行
if parsed_lines:
parsed_lines[-1] += " " + line
else:
parsed_lines.append(line)
text = "".join(parsed_lines)
return text
def unparse_text(parsed_text):
in_code_block = False
lines = parsed_text.split("\n")
unparsed_lines = []
for line in lines:
if "<pre><code" in line:
in_code_block = True
# 移除开始标签
line = line.split(">", 1)[1]
elif "</code></pre>" in line:
in_code_block = False
# 移除结束标签
line = line.rsplit("<", 1)[0]
# 反转 HTML 实体
line = line.replace("&lt;", "<")
line = line.replace("&gt;", ">")
line = line.replace("&nbsp;", " ")
line = line.replace("&ast;", "*")
line = line.replace("&lowbar;", "_")
line = line.replace("&#45;", "-")
line = line.replace("&#46;", ".")
line = line.replace("&#33;", "!")
line = line.replace("&#40;", "(")
line = line.replace("&#41;", ")")
line = line.replace("&#36;", "$")
# 如果在代码块内,还原反斜杠转义
if in_code_block:
line = line.replace(r"\`", "`")
unparsed_lines.append(line)
# 合并所有行
unparsed_text = "\n".join(unparsed_lines)
return unparsed_text
def compress_image(image_path, max_size=(1024, 1024)):
img = Image.open(image_path)
width, height = img.size
aspect_ratio = width / height
if width > max_size[0] or height > max_size[1]:
if width > height:
new_width = max_size[0]
new_height = int(new_width / aspect_ratio)
else:
new_height = max_size[1]
new_width = int(new_height * aspect_ratio)
img = img.resize((new_width, new_height), Image.LANCZOS)
img.save(image_path, optimize=True, quality=80)
class PredictClient:
def __init__(self, api_url):
self.api_url = api_url
def check_health(self):
health_check_url = f'{self.api_url}/health'
try:
response = requests.get(health_check_url)
if response.status_code == 200:
logger.info("Server is healthy and ready to process requests.")
return True
else:
logger.error(f'Server health check failed with status code:{response.status_code}')
return False
except requests.exceptions.RequestException as e:
logger.error(f'Health check request failed:{e}')
return False
def predict(self, image_path: str, text: str):
payload = {
"image_path": image_path,
"text": text
}
headers = {'Content-Type': 'application/json'}
response = requests.post(f"{self.api_url}/predict", json=payload, headers=headers)
if response.status_code == 200:
result = response.json()
return result.get('Generated Text', '')
else:
raise Exception(f"Predict API request failed with status code {response.status_code}")
def main():
args = parse_args()
config = configparser.ConfigParser()
config.read(args.config_path)
ocr_server = config.get('server', 'ocr_server')
client = PredictClient(ocr_server)
try:
start_time = time.time() # 记录开始时间
# 压缩图片
#compress_image(args.image_path)
generated_text = client.predict(args.image_path, parse_text(args.text))
end_time = time.time() # 记录结束时间
elapsed_time = end_time - start_time # 计算运行时间
if generated_text:
clean_text = unparse_text(generated_text) # 解析生成的文本
logger.info(f"Image Path: {args.image_path}")
logger.info(f"Generated Text: {clean_text}")
logger.info(f"耗时为: {elapsed_time}秒") # 打印运行时间
else:
logger.warning("Received empty generated text.")
except requests.exceptions.RequestException as e:
logger.error(f"Error while making request to predict service: {e}")
except Exception as e:
logger.error(f"Unexpected error occurred: {e}")
if __name__ == "__main__":
main()
import re
from loguru import logger
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.language import detect_lang
from magic_pdf.libs.MakeContentConfig import DropMode, MakeMode
from magic_pdf.libs.markdown_utils import ocr_escape_special_markdown_char
from magic_pdf.libs.ocr_content_type import BlockType, ContentType
from magic_pdf.para.para_split_v3 import ListLineTag
def __is_hyphen_at_line_end(line):
"""
Check if a line ends with one or more letters followed by a hyphen.
Args:
line (str): The line of text to check.
Returns:
bool: True if the line ends with one or more letters followed by a hyphen, False otherwise.
"""
# Use regex to check if the line ends with one or more letters followed by a hyphen
return bool(re.search(r'[A-Za-z]+-\s*$', line))
def ocr_mk_mm_markdown_with_para_and_pagination(pdf_info_dict: list,
img_buket_path):
markdown_with_para_and_pagination = []
page_no = 0
for page_info in pdf_info_dict:
paras_of_layout = page_info.get('para_blocks')
if not paras_of_layout:
continue
page_markdown = ocr_mk_markdown_with_para_core_v2(
paras_of_layout, 'mm', img_buket_path)
markdown_with_para_and_pagination.append({
'page_no':
page_no,
'md_content':
'\n\n'.join(page_markdown)
})
page_no += 1
return markdown_with_para_and_pagination
def ocr_mk_markdown_with_para_core_v2(paras_of_layout,
mode,
img_buket_path='',
):
page_markdown = []
for para_block in paras_of_layout:
para_text = ''
para_type = para_block['type']
if para_type in [BlockType.Text, BlockType.List, BlockType.Index]:
para_text = merge_para_with_text(para_block)
elif para_type == BlockType.Title:
para_text = f'# {merge_para_with_text(para_block)}'
elif para_type == BlockType.InterlineEquation:
para_text = merge_para_with_text(para_block)
elif para_type == BlockType.Image:
if mode == 'nlp':
continue
elif mode == 'mm':
for block in para_block['blocks']: # 1st.拼image_body
if block['type'] == BlockType.ImageBody:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Image:
if span.get('image_path', ''):
para_text += f"\n"
for block in para_block['blocks']: # 2nd.拼image_caption
if block['type'] == BlockType.ImageCaption:
para_text += merge_para_with_text(block) + ' \n'
for block in para_block['blocks']: # 3rd.拼image_footnote
if block['type'] == BlockType.ImageFootnote:
para_text += merge_para_with_text(block) + ' \n'
elif para_type == BlockType.Table:
if mode == 'nlp':
continue
elif mode == 'mm':
for block in para_block['blocks']: # 1st.拼table_caption
if block['type'] == BlockType.TableCaption:
para_text += merge_para_with_text(block) + ' \n'
for block in para_block['blocks']: # 2nd.拼table_body
if block['type'] == BlockType.TableBody:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Table:
# if processed by table model
if span.get('latex', ''):
para_text += f"\n\n$\n {span['latex']}\n$\n\n"
elif span.get('html', ''):
para_text += f"\n\n{span['html']}\n\n"
elif span.get('image_path', ''):
para_text += span['image_path']
for block in para_block['blocks']: # 3rd.拼table_footnote
if block['type'] == BlockType.TableFootnote:
para_text += merge_para_with_text(block) + ' \n'
if para_text.strip() == '':
continue
else:
page_markdown.append(para_text.strip() + ' ')
return page_markdown
def detect_language(text):
en_pattern = r'[a-zA-Z]+'
en_matches = re.findall(en_pattern, text)
en_length = sum(len(match) for match in en_matches)
if len(text) > 0:
if en_length / len(text) >= 0.5:
return 'en'
else:
return 'unknown'
else:
return 'empty'
def merge_para_with_text(para_block):
para_text = ''
for i, line in enumerate(para_block['lines']):
if i >= 1 and line.get(ListLineTag.IS_LIST_START_LINE, False):
para_text += ' \n'
line_text = ''
line_lang = ''
for span in line['spans']:
span_type = span['type']
if span_type == ContentType.Text:
line_text += span['content'].strip()
if line_text != '':
line_lang = detect_lang(line_text)
for span in line['spans']:
span_type = span['type']
content = ''
if span_type == ContentType.Text:
content = ocr_escape_special_markdown_char(span['content'])
elif span_type == ContentType.InlineEquation:
content = f" ${span['content']}$ "
elif span_type == ContentType.InterlineEquation:
content = f"\n$$\n{span['content']}\n$$\n"
if content != '':
langs = ['zh', 'ja', 'ko']
if line_lang in langs: # 遇到一些一个字一个span的文档,这种单字语言判断不准,需要用整行文本判断
para_text += content # 中文/日语/韩文语境下,content间不需要空格分隔
elif line_lang == 'en':
# 如果是前一行带有-连字符,那么末尾不应该加空格
if __is_hyphen_at_line_end(content):
para_text += content[:-1]
else:
para_text += content + ' '
else:
para_text += content + ' ' # 西方文本语境下 content间需要空格分隔
return para_text
def para_to_standard_format_v2(para_block, img_buket_path, page_idx, drop_reason=None):
para_type = para_block['type']
para_content = {}
if para_type in [BlockType.Text, BlockType.List, BlockType.Index]:
para_content = {
'type': 'text',
'text': merge_para_with_text(para_block),
}
elif para_type == BlockType.Title:
para_content = {
'type': 'text',
'text': merge_para_with_text(para_block),
'text_level': 1,
}
elif para_type == BlockType.InterlineEquation:
para_content = {
'type': 'equation',
'text': merge_para_with_text(para_block),
'text_format': 'latex',
}
elif para_type == BlockType.Image:
para_content = {'type': 'image', 'img_path': '', 'img_caption': [], 'img_footnote': []}
for block in para_block['blocks']:
if block['type'] == BlockType.ImageBody:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Image:
if span.get('image_path', ''):
para_content['img_path'] = join_path(img_buket_path, span['image_path'])
if block['type'] == BlockType.ImageCaption:
para_content['img_caption'].append(merge_para_with_text(block))
if block['type'] == BlockType.ImageFootnote:
para_content['img_footnote'].append(merge_para_with_text(block))
elif para_type == BlockType.Table:
para_content = {'type': 'table', 'img_path': '', 'table_caption': [], 'table_footnote': []}
for block in para_block['blocks']:
if block['type'] == BlockType.TableBody:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Table:
if span.get('latex', ''):
para_content['table_body'] = f"\n\n$\n {span['latex']}\n$\n\n"
elif span.get('html', ''):
para_content['table_body'] = f"\n\n{span['html']}\n\n"
if span.get('image_path', ''):
para_content['img_path'] = join_path(img_buket_path, span['image_path'])
if block['type'] == BlockType.TableCaption:
para_content['table_caption'].append(merge_para_with_text(block))
if block['type'] == BlockType.TableFootnote:
para_content['table_footnote'].append(merge_para_with_text(block))
para_content['page_idx'] = page_idx
if drop_reason is not None:
para_content['drop_reason'] = drop_reason
return para_content
def union_make(pdf_info_dict: list,
make_mode: str,
drop_mode: str,
img_buket_path: str = '',
):
output_content = []
for page_info in pdf_info_dict:
drop_reason_flag = False
drop_reason = None
if page_info.get('need_drop', False):
drop_reason = page_info.get('drop_reason')
if drop_mode == DropMode.NONE:
pass
elif drop_mode == DropMode.NONE_WITH_REASON:
drop_reason_flag = True
elif drop_mode == DropMode.WHOLE_PDF:
raise Exception((f'drop_mode is {DropMode.WHOLE_PDF} ,'
f'drop_reason is {drop_reason}'))
elif drop_mode == DropMode.SINGLE_PAGE:
logger.warning((f'drop_mode is {DropMode.SINGLE_PAGE} ,'
f'drop_reason is {drop_reason}'))
continue
else:
raise Exception('drop_mode can not be null')
paras_of_layout = page_info.get('para_blocks')
page_idx = page_info.get('page_idx')
if not paras_of_layout:
continue
if make_mode == MakeMode.MM_MD:
page_markdown = ocr_mk_markdown_with_para_core_v2(
paras_of_layout, 'mm', img_buket_path)
output_content.extend(page_markdown)
elif make_mode == MakeMode.NLP_MD:
page_markdown = ocr_mk_markdown_with_para_core_v2(
paras_of_layout, 'nlp')
output_content.extend(page_markdown)
elif make_mode == MakeMode.STANDARD_FORMAT:
for para_block in paras_of_layout:
if drop_reason_flag:
para_content = para_to_standard_format_v2(
para_block, img_buket_path, page_idx)
else:
para_content = para_to_standard_format_v2(
para_block, img_buket_path, page_idx)
output_content.append(para_content)
if make_mode in [MakeMode.MM_MD, MakeMode.NLP_MD]:
return '\n\n'.join(output_content)
elif make_mode == MakeMode.STANDARD_FORMAT:
return output_content
# Copyright (c) Alibaba Cloud.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree
import configparser
import copy
import re
import gc
import time
import torch
from argparse import ArgumentParser
from threading import Thread
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration, TextIteratorStreamer
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
from loguru import logger
app = FastAPI()
DEFAULT_CKPT_PATH = '/home/practice/model/Qwen2-VL-7B-Instruct'
REVISION = 'v1.0.4'
BOX_TAG_PATTERN = r"<box>([\s\S]*?)</box>"
PUNCTUATION = "!?。"#$%&'()*+,-/:;<=>@[\]^_`{|}~⦅⦆「」、、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏."
logger.add("parse.log", rotation="10 MB", level="INFO",
format="{time} {level} {message}", encoding='utf-8', enqueue=True)
def _get_args():
parser = ArgumentParser()
parser.add_argument('-c', '--checkpoint_path', type=str, default=DEFAULT_CKPT_PATH,
help='Checkpoint name or path, default to %(default)r')
parser.add_argument('--cpu_only', action='store_true', help='Run demo with CPU only')
parser.add_argument('--flash_attn2', action='store_true', default=False,
help='Enable flash_attention_2 when loading the model.')
parser.add_argument('--share', action='store_true', default=False,
help='Create a publicly shareable link for the interface.')
parser.add_argument('--inbrowser', action='store_true', default=False,
help='Automatically launch the interface in a new tab on the default browser.')
parser.add_argument('--dcu_id', type=str, default='0', help='Specify the GPU ID to load the model onto.')
parser.add_argument(
'--config_path',
default='/home/practice/magic_pdf-main/magic_pdf/config.ini',
)
args = parser.parse_args()
return args
def _load_model_processor(args):
if args.cpu_only:
device_map = 'cpu'
else:
if args.dcu_id is not None:
device_map = {'': f'cuda:{args.dcu_id}'}
print('使用DCU推理:', f'cuda:{args.dcu_id}')
else:
device_map = 'auto'
if args.flash_attn2:
model = Qwen2VLForConditionalGeneration.from_pretrained(
args.checkpoint_path,
torch_dtype=torch.float16,
attn_implementation='flash_attention_2',
device_map=device_map
)
else:
model = Qwen2VLForConditionalGeneration.from_pretrained(
args.checkpoint_path,
torch_dtype=torch.float16,
device_map=device_map
)
processor = AutoProcessor.from_pretrained(args.checkpoint_path)
return model, processor
def _parse_text(text):
lines = text.split("\n")
lines = [line for line in lines if line.strip() != ""] # 去除空行
count = 0
parsed_lines = []
for i, line in enumerate(lines):
if "```" in line:
count += 1
items = line.split("`")
if count % 2 == 1:
# 开始代码块
parsed_lines.append(f'<pre><code class="language-{items[-1]}">')
else:
# 结束代码块
parsed_lines.append(f"</code></pre>")
else:
if i > 0 and count % 2 == 1:
# 转义代码块内的特殊字符
line = line.replace("`", r"\`")
line = line.replace("<", "&lt;")
line = line.replace(">", "&gt;")
line = line.replace(" ", "&nbsp;")
line = line.replace("*", "&ast;")
line = line.replace("_", "&lowbar;")
line = line.replace("-", "&#45;")
line = line.replace(".", "&#46;")
line = line.replace("!", "&#33;")
line = line.replace("(", "&#40;")
line = line.replace(")", "&#41;")
line = line.replace("$", "&#36;")
# 使用空格连接行
if parsed_lines:
parsed_lines[-1] += " " + line
else:
parsed_lines.append(line)
text = "".join(parsed_lines)
return text
def _remove_image_special(text):
text = text.replace('<ref>', '').replace('</ref>', '')
return re.sub(r'<box>.*?(</box>|$)', '', text)
def _is_video_file(filename):
video_extensions = ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.mpeg']
return any(filename.lower().endswith(ext) for ext in video_extensions)
def _transform_messages(original_messages):
transformed_messages = []
for message in original_messages:
new_content = []
for item in message['content']:
if 'image' in item:
new_item = {'type': 'image', 'image': item['image']}
elif 'text' in item:
new_item = {'type': 'text', 'text': item['text']}
elif 'video' in item:
new_item = {'type': 'video', 'video': item['video']}
else:
continue
new_content.append(new_item)
new_message = {'role': message['role'], 'content': new_content}
transformed_messages.append(new_message)
return transformed_messages
def _gc():
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
def call_local_model(model, processor, messages):
messages = _transform_messages(messages)
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors='pt')
inputs = inputs.to(model.device)
tokenizer = processor.tokenizer
streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)
gen_kwargs = {'max_new_tokens': 512, 'streamer': streamer, **inputs}
thread = Thread(target=model.generate, kwargs=gen_kwargs)
thread.start()
generated_text = ''
for new_text in streamer:
generated_text += new_text
yield _parse_text(generated_text)
def create_predict_fn(model, processor):
def predict(_chatbot, task_history):
chat_query = _chatbot[-1][0]
query = task_history[-1][0]
if len(chat_query) == 0:
_chatbot.pop()
task_history.pop()
return _chatbot
print('User: ' + _parse_text(query))
history_cp = copy.deepcopy(task_history)
full_response = ''
messages = []
content = []
for q, a in history_cp:
if isinstance(q, (tuple, list)):
if _is_video_file(q[0]):
content.append({'video': f'file://{q[0]}'})
else:
content.append({'image': f'file://{q[0]}'})
else:
content.append({'text': q})
messages.append({'role': 'user', 'content': content})
messages.append({'role': 'assistant', 'content': [{'text': a}]})
content = []
messages.pop()
for response in call_local_model(model, processor, messages):
_chatbot[-1] = (_parse_text(chat_query), _remove_image_special(_parse_text(response)))
yield _chatbot
full_response = _parse_text(response)
task_history[-1] = (query, full_response)
print('Qwen-VL-Chat: ' + _parse_text(full_response))
yield _chatbot
return predict
# 启用加载模型
args = _get_args()
model, processor = _load_model_processor(args)
class Item(BaseModel):
image_path: str
text: str
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.post("/predict")
async def predict(item: Item):
messages = [
{
'role': 'user',
'content': [
{'image': item.image_path},
{'text': item.text}
]
}
]
start = time.time()
generated_text = ''
for response in call_local_model(model, processor, messages):
generated_text = _parse_text(response)
_gc()
end = time.time()
logger.info(f'【{item.image_path}】解析的结果是:{generated_text},耗时为:{end-start}')
return {"Generated Text": generated_text}
if __name__ == "__main__":
import uvicorn
args = _get_args()
config = configparser.ConfigParser()
config.read(args.config_path)
# host = config.get('server', 'ocr_host')
host, port = config.get('server', 'ocr_server').split('://')[1].split(':')[0], int(
config.get('server', 'ocr_server').split('://')[1].split(':')[1])
# port = int(config.get('server', 'ocr_port'))
uvicorn.run(app, host=host, port=port)
import os
import json
import requests
from loguru import logger
import argparse
import time
from PIL import Image
import configparser
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
'--config_path',
default='/home/practice/magic_pdf-main/magic_pdf/config.ini',
)
parser.add_argument(
'--image_path',
default='/path/to/your/image.png',
help='Path to the image file'
)
parser.add_argument(
'--text',
default="描述你在图片中看到的内容",
help='Text input for the model'
)
args = parser.parse_args()
return args
def parse_text(text):
lines = text.split("\n")
lines = [line for line in lines if line.strip() != ""] # 去除空行
count = 0
parsed_lines = []
for i, line in enumerate(lines):
if "```" in line:
count += 1
items = line.split("`")
if count % 2 == 1:
# 开始代码块
parsed_lines.append(f'<pre><code class="language-{items[-1]}">')
else:
# 结束代码块
parsed_lines.append(f"</code></pre>")
else:
if i > 0 and count % 2 == 1:
# 转义代码块内的特殊字符
line = line.replace("`", r"\`")
line = line.replace("<", "&lt;")
line = line.replace(">", "&gt;")
line = line.replace(" ", "&nbsp;")
line = line.replace("*", "&ast;")
line = line.replace("_", "&lowbar;")
line = line.replace("-", "&#45;")
line = line.replace(".", "&#46;")
line = line.replace("!", "&#33;")
line = line.replace("(", "&#40;")
line = line.replace(")", "&#41;")
line = line.replace("$", "&#36;")
# 使用空格连接行
if parsed_lines:
parsed_lines[-1] += " " + line
else:
parsed_lines.append(line)
text = "".join(parsed_lines)
return text
def unparse_text(parsed_text):
in_code_block = False
lines = parsed_text.split("\n")
unparsed_lines = []
for line in lines:
if "<pre><code" in line:
in_code_block = True
# 移除开始标签
line = line.split(">", 1)[1]
elif "</code></pre>" in line:
in_code_block = False
# 移除结束标签
line = line.rsplit("<", 1)[0]
# 反转 HTML 实体
line = line.replace("&lt;", "<")
line = line.replace("&gt;", ">")
line = line.replace("&nbsp;", " ")
line = line.replace("&ast;", "*")
line = line.replace("&lowbar;", "_")
line = line.replace("&#45;", "-")
line = line.replace("&#46;", ".")
line = line.replace("&#33;", "!")
line = line.replace("&#40;", "(")
line = line.replace("&#41;", ")")
line = line.replace("&#36;", "$")
# 如果在代码块内,还原反斜杠转义
if in_code_block:
line = line.replace(r"\`", "`")
unparsed_lines.append(line)
# 合并所有行
unparsed_text = "\n".join(unparsed_lines)
return unparsed_text
def compress_image(image_path, max_size=(512, 512)):
img = Image.open(image_path)
width, height = img.size
aspect_ratio = width / height
if width > max_size[0] or height > max_size[1]:
if width > height:
new_width = max_size[0]
new_height = int(new_width / aspect_ratio)
else:
new_height = max_size[1]
new_width = int(new_height * aspect_ratio)
img = img.resize((new_width, new_height), Image.LANCZOS)
img.save(image_path, optimize=True, quality=80)
class PredictClient:
def __init__(self, api_url):
self.api_url = api_url
def check_health(self):
health_check_url = f'{self.api_url}/health'
try:
response = requests.get(health_check_url)
if response.status_code == 200:
logger.info("Server is healthy and ready to process requests.")
return True
else:
logger.error(f'Server health check failed with status code:{response.status_code}')
return False
except requests.exceptions.RequestException as e:
logger.error(f'Health check request failed:{e}')
return False
def predict(self, image_path: str, text: str):
payload = {
"image_path": image_path,
"text": text
}
headers = {'Content-Type': 'application/json'}
response = requests.post(f"{self.api_url}/predict", json=payload, headers=headers)
if response.status_code == 200:
result = response.json()
return result.get('Generated Text', '')
else:
raise Exception(f"Predict API request failed with status code {response.status_code}")
def main():
args = parse_args()
config = configparser.ConfigParser()
config.read(args.config_path)
ocr_server = config.get('server', 'ocr_server')
client = PredictClient(ocr_server)
try:
start_time = time.time() # 记录开始时间
# 压缩图片
compress_image(args.image_path)
generated_text = client.predict(args.image_path, parse_text(args.text))
end_time = time.time() # 记录结束时间
elapsed_time = end_time - start_time # 计算运行时间
if generated_text:
clean_text = unparse_text(generated_text) # 解析生成的文本
logger.info(f"Image Path: {args.image_path}")
logger.info(f"Generated Text: {clean_text}")
logger.info(f"耗时为: {elapsed_time}秒") # 打印运行时间
else:
logger.warning("Received empty generated text.")
except requests.exceptions.RequestException as e:
logger.error(f"Error while making request to predict service: {e}")
except Exception as e:
logger.error(f"Unexpected error occurred: {e}")
if __name__ == "__main__":
main()
\ No newline at end of file
# Copyright (c) Alibaba Cloud.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree
import copy
import re
import gc
import torch
from argparse import ArgumentParser
from threading import Thread
from qwen_vl_utils import process_vision_info
from transformers import AutoProcessor
from vllm import LLM, SamplingParams
import os
import configparser
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
DEFAULT_CKPT_PATH = '/home/practice/model/Qwen2-VL-7B-Instruct'
REVISION = 'v1.0.4'
BOX_TAG_PATTERN = r"<box>([\s\S]*?)</box>"
PUNCTUATION = "!?。"#$%&'()*+,-/:;<=>@[\]^_`{|}~⦅⦆「」、、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏."
def get_args():
parser = ArgumentParser()
parser.add_argument('-c', '--checkpoint_path', type=str, default=DEFAULT_CKPT_PATH,
help='Checkpoint name or path, default to %(default)r')
parser.add_argument('--cpu_only', default=False, action='store_true', help='Run demo with CPU only')
parser.add_argument('--flash_attn2', action='store_true', default=False,
help='Enable flash_attention_2 when loading the model.')
parser.add_argument('--share', action='store_true', default=False,
help='Create a publicly shareable link for the interface.')
parser.add_argument('--inbrowser', action='store_true', default=False,
help='Automatically launch the interface in a new tab on the default browser.')
parser.add_argument('--gpu_nums', type=int, default=1, help='Number of GPUs to use for tensor parallelism.')
parser.add_argument('--dcu_id', type=str, default='0', help='Specify the GPU ID to load the model onto.')
parser.add_argument(
'--config_path',
default='/home/practice/magic_pdf-main/magic_pdf/config.ini',
)
args = parser.parse_args()
return args
def load_model_processor(args):
if args.cpu_only:
device = 'cpu'
else:
os.environ['CUDA_VISIBLE_DEVICES'] = args.dcu_id
print(f"Visible CUDA devices: {os.environ.get('CUDA_VISIBLE_DEVICES')}")
llm = LLM(
model=args.checkpoint_path,
limit_mm_per_prompt={"image": 10, "video": 10},
trust_remote_code=True,
tensor_parallel_size=args.gpu_nums, # 用args.gpu_nums根据实际情况调整
gpu_memory_utilization=0.99,
dtype='float16', # 或者 'bfloat16'
)
processor = AutoProcessor.from_pretrained(args.checkpoint_path)
return llm, processor
def parse_text(text):
lines = text.split("\n")
lines = [line for line in lines if line.strip() != ""] # 去除空行
count = 0
parsed_lines = []
for i, line in enumerate(lines):
if "```" in line:
count += 1
items = line.split("`")
if count % 2 == 1:
# 开始代码块
parsed_lines.append(f'<pre><code class="language-{items[-1]}">')
else:
# 结束代码块
parsed_lines.append(f"</code></pre>")
else:
if i > 0 and count % 2 == 1:
# 转义代码块内的特殊字符
line = line.replace("`", r"\`")
line = line.replace("<", "&lt;")
line = line.replace(">", "&gt;")
line = line.replace(" ", "&nbsp;")
line = line.replace("*", "&ast;")
line = line.replace("_", "&lowbar;")
line = line.replace("-", "&#45;")
line = line.replace(".", "&#46;")
line = line.replace("!", "&#33;")
line = line.replace("(", "&#40;")
line = line.replace(")", "&#41;")
line = line.replace("$", "&#36;")
# 使用空格连接行
if parsed_lines:
parsed_lines[-1] += " " + line
else:
parsed_lines.append(line)
text = "".join(parsed_lines)
return text
def unparse_text(parsed_text):
in_code_block = False
lines = parsed_text.split("\n")
unparsed_lines = []
for line in lines:
if "<pre><code" in line:
in_code_block = True
# 移除开始标签
line = line.split(">", 1)[1]
elif "</code></pre>" in line:
in_code_block = False
# 移除结束标签
line = line.rsplit("<", 1)[0]
# 反转 HTML 实体
line = line.replace("&lt;", "<")
line = line.replace("&gt;", ">")
line = line.replace("&nbsp;", " ")
line = line.replace("&ast;", "*")
line = line.replace("&lowbar;", "_")
line = line.replace("&#45;", "-")
line = line.replace("&#46;", ".")
line = line.replace("&#33;", "!")
line = line.replace("&#40;", "(")
line = line.replace("&#41;", ")")
line = line.replace("&#36;", "$")
# 如果在代码块内,还原反斜杠转义
if in_code_block:
line = line.replace(r"\`", "`")
unparsed_lines.append(line)
# 合并所有行
unparsed_text = "\n".join(unparsed_lines)
return unparsed_text
def remove_image_special(text):
text = text.replace('<ref>', '').replace('</ref>', '')
return re.sub(r'<box>.*?(</box>|$)', '', text)
def is_video_file(filename):
video_extensions = ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.mpeg']
return any(filename.lower().endswith(ext) for ext in video_extensions)
def transform_messages(original_messages):
transformed_messages = []
for message in original_messages:
new_content = []
for item in message['content']:
if 'image' in item:
new_item = {'type': 'image', 'image': item['image']}
elif 'text' in item:
new_item = {'type': 'text', 'text': item['text']}
elif 'video' in item:
new_item = {'type': 'video', 'video': item['video']}
else:
continue
new_content.append(new_item)
new_message = {'role': message['role'], 'content': new_content}
transformed_messages.append(new_message)
return transformed_messages
def _gc():
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
def call_local_model(llm, processor, messages):
messages = transform_messages(messages)
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(messages)
mm_data = {}
if image_inputs is not None:
mm_data["image"] = image_inputs
if video_inputs is not None:
mm_data["video"] = video_inputs
llm_inputs = {
"prompt": text,
"multi_modal_data": mm_data,
}
sampling_params = SamplingParams(
temperature=0.1,
top_p=0.001,
repetition_penalty=1.05,
max_tokens=256,
stop_token_ids=[],
)
outputs = llm.generate([llm_inputs], sampling_params=sampling_params)
generated_text = outputs[0].outputs[0].text
yield parse_text(generated_text)
def create_predict_fn(llm, processor):
def predict(_chatbot, task_history):
chat_query = _chatbot[-1][0]
query = task_history[-1][0]
if len(chat_query) == 0:
_chatbot.pop()
task_history.pop()
return _chatbot
print('User: ' + parse_text(query))
history_cp = copy.deepcopy(task_history)
full_response = ''
messages = []
content = []
for q, a in history_cp:
if isinstance(q, (tuple, list)):
if is_video_file(q[0]):
content.append({'video': f'file://{q[0]}'})
else:
content.append({'image': f'file://{q[0]}'})
else:
content.append({'text': q})
messages.append({'role': 'user', 'content': content})
messages.append({'role': 'assistant', 'content': [{'text': a}]})
content = []
messages.pop()
for response in call_local_model(llm, processor, messages):
_chatbot[-1] = (parse_text(chat_query), remove_image_special(parse_text(response)))
yield _chatbot
full_response = parse_text(response)
task_history[-1] = (query, full_response)
print('Qwen-VL-Chat: ' + unparse_text(full_response))
yield _chatbot
return predict
# 启用加载模型
args = get_args()
llm, processor = load_model_processor(args)
class Item(BaseModel):
image_path: str
text: str
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.post("/predict")
async def predict(item: Item):
messages = [
{
'role': 'user',
'content': [
{'image': item.image_path},
{'text': item.text}
]
}
]
generated_text = ''
for response in call_local_model(llm, processor, messages):
generated_text = unparse_text(response)
_gc()
return {"Generated Text": generated_text}
if __name__ == "__main__":
import uvicorn
args = get_args()
config = configparser.ConfigParser()
config.read(args.config_path)
# host = config.get('server', 'ocr_host')
host, port = config.get('server', 'ocr_server').split('://')[1].split(':')[0], int(
config.get('server', 'ocr_server').split('://')[1].split(':')[1])
# port = int(config.get('server', 'ocr_port'))
uvicorn.run(app, host=host, port=port)
\ No newline at end of file
This diff is collapsed.
"""
输入: s3路径,每行一个
输出: pdf文件元信息,包括每一页上的所有图片的长宽高,bbox位置
"""
import sys
import click
from magic_pdf.libs.commons import read_file, mymax, get_top_percent_list
from magic_pdf.libs.commons import fitz
from loguru import logger
from collections import Counter
from magic_pdf.libs.drop_reason import DropReason
from magic_pdf.libs.language import detect_lang
from magic_pdf.libs.pdf_check import detect_invalid_chars
scan_max_page = 50
junk_limit_min = 10
def calculate_max_image_area_per_page(result: list, page_width_pts, page_height_pts):
max_image_area_per_page = [mymax([(x1 - x0) * (y1 - y0) for x0, y0, x1, y1, _ in page_img_sz]) for page_img_sz in
result]
page_area = int(page_width_pts) * int(page_height_pts)
max_image_area_per_page = [area / page_area for area in max_image_area_per_page]
max_image_area_per_page = [area for area in max_image_area_per_page if area > 0.6]
return max_image_area_per_page
def process_image(page, junk_img_bojids=[]):
page_result = [] # 存每个页面里的多张图四元组信息
items = page.get_images()
dedup = set()
for img in items:
# 这里返回的是图片在page上的实际展示的大小。返回一个数组,每个元素第一部分是
img_bojid = img[0] # 在pdf文件中是全局唯一的,如果这个图反复出现在pdf里那么就可能是垃圾信息,例如水印、页眉页脚等
if img_bojid in junk_img_bojids: # 如果是垃圾图像,就跳过
continue
recs = page.get_image_rects(img, transform=True)
if recs:
rec = recs[0][0]
x0, y0, x1, y1 = map(int, rec)
width = x1 - x0
height = y1 - y0
if (x0, y0, x1, y1, img_bojid) in dedup: # 这里面会出现一些重复的bbox,无需重复出现,需要去掉
continue
if not all([width, height]): # 长和宽任何一个都不能是0,否则这个图片不可见,没有实际意义
continue
dedup.add((x0, y0, x1, y1, img_bojid))
page_result.append([x0, y0, x1, y1, img_bojid])
return page_result
def get_image_info(doc: fitz.Document, page_width_pts, page_height_pts) -> list:
"""
返回每个页面里的图片的四元组,每个页面多个图片。
:param doc:
:return:
"""
# 使用 Counter 计数 img_bojid 的出现次数
img_bojid_counter = Counter(img[0] for page in doc for img in page.get_images())
# 找出出现次数超过 len(doc) 半数的 img_bojid
junk_limit = max(len(doc) * 0.5, junk_limit_min) # 对一些页数比较少的进行豁免
junk_img_bojids = [img_bojid for img_bojid, count in img_bojid_counter.items() if count >= junk_limit]
#todo 加个判断,用前十页就行,这些垃圾图片需要满足两个条件,不止出现的次数要足够多,而且图片占书页面积的比例要足够大,且图与图大小都差不多
#有两种扫描版,一种文字版,这里可能会有误判
#扫描版1:每页都有所有扫描页图片,特点是图占比大,每页展示1张
#扫描版2,每页存储的扫描页图片数量递增,特点是图占比大,每页展示1张,需要清空junklist跑前50页图片信息用于分类判断
#文字版1.每页存储所有图片,特点是图片占页面比例不大,每页展示可能为0也可能不止1张 这种pdf需要拿前10页抽样检测img大小和个数,如果符合需要清空junklist
imgs_len_list = [len(page.get_images()) for page in doc]
special_limit_pages = 10
# 统一用前十页结果做判断
result = []
break_loop = False
for i, page in enumerate(doc):
if break_loop:
break
if i >= special_limit_pages:
break
page_result = process_image(page) # 这里不传junk_img_bojids,拿前十页所有图片信息用于后续分析
result.append(page_result)
for item in result:
if not any(item): # 如果任何一页没有图片,说明是个文字版,需要判断是否为特殊文字版
if max(imgs_len_list) == min(imgs_len_list) and max(
imgs_len_list) >= junk_limit_min: # 如果是特殊文字版,就把junklist置空并break
junk_img_bojids = []
else: # 不是特殊文字版,是个普通文字版,但是存在垃圾图片,不置空junklist
pass
break_loop = True
break
if not break_loop:
# 获取前80%的元素
top_eighty_percent = get_top_percent_list(imgs_len_list, 0.8)
# 检查前80%的元素是否都相等
if len(set(top_eighty_percent)) == 1 and max(imgs_len_list) >= junk_limit_min:
# # 如果前10页跑完都有图,根据每页图片数量是否相等判断是否需要清除junklist
# if max(imgs_len_list) == min(imgs_len_list) and max(imgs_len_list) >= junk_limit_min:
#前10页都有图,且每页数量一致,需要检测图片大小占页面的比例判断是否需要清除junklist
max_image_area_per_page = calculate_max_image_area_per_page(result, page_width_pts, page_height_pts)
if len(max_image_area_per_page) < 0.8 * special_limit_pages: # 前10页不全是大图,说明可能是个文字版pdf,把垃圾图片list置空
junk_img_bojids = []
else: # 前10页都有图,而且80%都是大图,且每页图片数量一致并都很多,说明是扫描版1,不需要清空junklist
pass
else: # 每页图片数量不一致,需要清掉junklist全量跑前50页图片
junk_img_bojids = []
#正式进入取前50页图片的信息流程
result = []
for i, page in enumerate(doc):
if i >= scan_max_page:
break
page_result = process_image(page, junk_img_bojids)
# logger.info(f"page {i} img_len: {len(page_result)}")
result.append(page_result)
return result, junk_img_bojids
def get_pdf_page_size_pts(doc: fitz.Document):
page_cnt = len(doc)
l: int = min(page_cnt, 50)
#把所有宽度和高度塞到两个list 分别取中位数(中间遇到了个在纵页里塞横页的pdf,导致宽高互换了)
page_width_list = []
page_height_list = []
for i in range(l):
page = doc[i]
page_rect = page.rect
page_width_list.append(page_rect.width)
page_height_list.append(page_rect.height)
page_width_list.sort()
page_height_list.sort()
median_width = page_width_list[len(page_width_list) // 2]
median_height = page_height_list[len(page_height_list) // 2]
return median_width, median_height
def get_pdf_textlen_per_page(doc: fitz.Document):
text_len_lst = []
for page in doc:
# 拿包含img和text的所有blocks
# text_block = page.get_text("blocks")
# 拿所有text的blocks
# text_block = page.get_text("words")
# text_block_len = sum([len(t[4]) for t in text_block])
#拿所有text的str
text_block = page.get_text("text")
text_block_len = len(text_block)
# logger.info(f"page {page.number} text_block_len: {text_block_len}")
text_len_lst.append(text_block_len)
return text_len_lst
def get_pdf_text_layout_per_page(doc: fitz.Document):
"""
根据PDF文档的每一页文本布局,判断该页的文本布局是横向、纵向还是未知。
Args:
doc (fitz.Document): PDF文档对象。
Returns:
List[str]: 每一页的文本布局(横向、纵向、未知)。
"""
text_layout_list = []
for page_id, page in enumerate(doc):
if page_id >= scan_max_page:
break
# 创建每一页的纵向和横向的文本行数计数器
vertical_count = 0
horizontal_count = 0
text_dict = page.get_text("dict")
if "blocks" in text_dict:
for block in text_dict["blocks"]:
if 'lines' in block:
for line in block["lines"]:
# 获取line的bbox顶点坐标
x0, y0, x1, y1 = line['bbox']
# 计算bbox的宽高
width = x1 - x0
height = y1 - y0
# 计算bbox的面积
area = width * height
font_sizes = []
for span in line['spans']:
if 'size' in span:
font_sizes.append(span['size'])
if len(font_sizes) > 0:
average_font_size = sum(font_sizes) / len(font_sizes)
else:
average_font_size = 10 # 有的line拿不到font_size,先定一个阈值100
if area <= average_font_size ** 2: # 判断bbox的面积是否小于平均字体大小的平方,单字无法计算是横向还是纵向
continue
else:
if 'wmode' in line: # 通过wmode判断文本方向
if line['wmode'] == 1: # 判断是否为竖向文本
vertical_count += 1
elif line['wmode'] == 0: # 判断是否为横向文本
horizontal_count += 1
# if 'dir' in line: # 通过旋转角度计算判断文本方向
# # 获取行的 "dir" 值
# dir_value = line['dir']
# cosine, sine = dir_value
# # 计算角度
# angle = math.degrees(math.acos(cosine))
#
# # 判断是否为横向文本
# if abs(angle - 0) < 0.01 or abs(angle - 180) < 0.01:
# # line_text = ' '.join(span['text'] for span in line['spans'])
# # print('This line is horizontal:', line_text)
# horizontal_count += 1
# # 判断是否为纵向文本
# elif abs(angle - 90) < 0.01 or abs(angle - 270) < 0.01:
# # line_text = ' '.join(span['text'] for span in line['spans'])
# # print('This line is vertical:', line_text)
# vertical_count += 1
# print(f"page_id: {page_id}, vertical_count: {vertical_count}, horizontal_count: {horizontal_count}")
# 判断每一页的文本布局
if vertical_count == 0 and horizontal_count == 0: # 该页没有文本,无法判断
text_layout_list.append("unknow")
continue
else:
if vertical_count > horizontal_count: # 该页的文本纵向行数大于横向的
text_layout_list.append("vertical")
else: # 该页的文本横向行数大于纵向的
text_layout_list.append("horizontal")
# logger.info(f"page_id: {page_id}, vertical_count: {vertical_count}, horizontal_count: {horizontal_count}")
return text_layout_list
'''定义一个自定义异常用来抛出单页svg太多的pdf'''
class PageSvgsTooManyError(Exception):
def __init__(self, message="Page SVGs are too many"):
self.message = message
super().__init__(self.message)
def get_svgs_per_page(doc: fitz.Document):
svgs_len_list = []
for page_id, page in enumerate(doc):
# svgs = page.get_drawings()
svgs = page.get_cdrawings() # 切换成get_cdrawings,效率更高
len_svgs = len(svgs)
if len_svgs >= 3000:
raise PageSvgsTooManyError()
else:
svgs_len_list.append(len_svgs)
# logger.info(f"page_id: {page_id}, svgs_len: {len(svgs)}")
return svgs_len_list
def get_imgs_per_page(doc: fitz.Document):
imgs_len_list = []
for page_id, page in enumerate(doc):
imgs = page.get_images()
imgs_len_list.append(len(imgs))
# logger.info(f"page_id: {page}, imgs_len: {len(imgs)}")
return imgs_len_list
def get_language(doc: fitz.Document):
"""
获取PDF文档的语言。
Args:
doc (fitz.Document): PDF文档对象。
Returns:
str: 文档语言,如 "en-US"。
"""
language_lst = []
for page_id, page in enumerate(doc):
if page_id >= scan_max_page:
break
# 拿所有text的str
text_block = page.get_text("text")
page_language = detect_lang(text_block)
language_lst.append(page_language)
# logger.info(f"page_id: {page_id}, page_language: {page_language}")
# 统计text_language_list中每种语言的个数
count_dict = Counter(language_lst)
# 输出text_language_list中出现的次数最多的语言
language = max(count_dict, key=count_dict.get)
return language
def check_invalid_chars(pdf_bytes):
"""
乱码检测
"""
return detect_invalid_chars(pdf_bytes)
def pdf_meta_scan(pdf_bytes: bytes):
"""
:param s3_pdf_path:
:param pdf_bytes: pdf文件的二进制数据
几个维度来评价:是否加密,是否需要密码,纸张大小,总页数,是否文字可提取
"""
doc = fitz.open("pdf", pdf_bytes)
is_needs_password = doc.needs_pass
is_encrypted = doc.is_encrypted
total_page = len(doc)
if total_page == 0:
logger.warning(f"drop this pdf, drop_reason: {DropReason.EMPTY_PDF}")
result = {"_need_drop": True, "_drop_reason": DropReason.EMPTY_PDF}
return result
else:
page_width_pts, page_height_pts = get_pdf_page_size_pts(doc)
# logger.info(f"page_width_pts: {page_width_pts}, page_height_pts: {page_height_pts}")
# svgs_per_page = get_svgs_per_page(doc)
# logger.info(f"svgs_per_page: {svgs_per_page}")
imgs_per_page = get_imgs_per_page(doc)
# logger.info(f"imgs_per_page: {imgs_per_page}")
image_info_per_page, junk_img_bojids = get_image_info(doc, page_width_pts, page_height_pts)
# logger.info(f"image_info_per_page: {image_info_per_page}, junk_img_bojids: {junk_img_bojids}")
text_len_per_page = get_pdf_textlen_per_page(doc)
# logger.info(f"text_len_per_page: {text_len_per_page}")
text_layout_per_page = get_pdf_text_layout_per_page(doc)
# logger.info(f"text_layout_per_page: {text_layout_per_page}")
text_language = get_language(doc)
# logger.info(f"text_language: {text_language}")
invalid_chars = check_invalid_chars(pdf_bytes)
# logger.info(f"invalid_chars: {invalid_chars}")
# 最后输出一条json
res = {
"is_needs_password": is_needs_password,
"is_encrypted": is_encrypted,
"total_page": total_page,
"page_width_pts": int(page_width_pts),
"page_height_pts": int(page_height_pts),
"image_info_per_page": image_info_per_page,
"text_len_per_page": text_len_per_page,
"text_layout_per_page": text_layout_per_page,
"text_language": text_language,
# "svgs_per_page": svgs_per_page,
"imgs_per_page": imgs_per_page, # 增加每页img数量list
"junk_img_bojids": junk_img_bojids, # 增加垃圾图片的bojid list
"invalid_chars": invalid_chars,
"metadata": doc.metadata
}
# logger.info(json.dumps(res, ensure_ascii=False))
return res
@click.command()
@click.option('--s3-pdf-path', help='s3上pdf文件的路径')
@click.option('--s3-profile', help='s3上的profile')
def main(s3_pdf_path: str, s3_profile: str):
"""
"""
try:
file_content = read_file(s3_pdf_path, s3_profile)
pdf_meta_scan(file_content)
except Exception as e:
print(f"ERROR: {s3_pdf_path}, {e}", file=sys.stderr)
logger.exception(e)
if __name__ == '__main__':
main()
# "D:\project/20231108code-clean\pdf_cost_time\竖排例子\净空法师-大乘无量寿.pdf"
# "D:\project/20231108code-clean\pdf_cost_time\竖排例子\三国演义_繁体竖排版.pdf"
# "D:\project/20231108code-clean\pdf_cost_time\scihub\scihub_86800000\libgen.scimag86880000-86880999.zip_10.1021/acsami.1c03109.s002.pdf"
# "D:/project/20231108code-clean/pdf_cost_time/scihub/scihub_18600000/libgen.scimag18645000-18645999.zip_10.1021/om3006239.pdf"
# file_content = read_file("D:/project/20231108code-clean/pdf_cost_time/scihub/scihub_31000000/libgen.scimag31098000-31098999.zip_10.1109/isit.2006.261791.pdf","")
# file_content = read_file("D:\project/20231108code-clean\pdf_cost_time\竖排例子\净空法师_大乘无量寿.pdf","")
# doc = fitz.open("pdf", file_content)
# text_layout_lst = get_pdf_text_layout_per_page(doc)
# print(text_layout_lst)
import os
from pathlib import Path
from loguru import logger
from magic_pdf.integrations.rag.type import (ElementRelation, LayoutElements,
Node)
from magic_pdf.integrations.rag.utils import inference
class RagPageReader:
def __init__(self, pagedata: LayoutElements):
self.o = [
Node(
category_type=v.category_type,
text=v.text,
image_path=v.image_path,
anno_id=v.anno_id,
latex=v.latex,
html=v.html,
) for v in pagedata.layout_dets
]
self.pagedata = pagedata
def __iter__(self):
return iter(self.o)
def get_rel_map(self) -> list[ElementRelation]:
return self.pagedata.extra.element_relation
class RagDocumentReader:
def __init__(self, ragdata: list[LayoutElements]):
self.o = [RagPageReader(v) for v in ragdata]
def __iter__(self):
return iter(self.o)
class DataReader:
def __init__(self, path_or_directory: str, method: str, output_dir: str):
self.path_or_directory = path_or_directory
self.method = method
self.output_dir = output_dir
self.pdfs = []
if os.path.isdir(path_or_directory):
for doc_path in Path(path_or_directory).glob('*.pdf'):
self.pdfs.append(doc_path)
else:
assert path_or_directory.endswith('.pdf')
self.pdfs.append(Path(path_or_directory))
def get_documents_count(self) -> int:
"""Returns the number of documents in the directory."""
return len(self.pdfs)
def get_document_result(self, idx: int) -> RagDocumentReader | None:
"""
Args:
idx (int): the index of documents under the
directory path_or_directory
Returns:
RagDocumentReader | None: RagDocumentReader is an iterable object,
more details @RagDocumentReader
"""
if idx >= self.get_documents_count() or idx < 0:
logger.error(f'invalid idx: {idx}')
return None
res = inference(str(self.pdfs[idx]), self.output_dir, self.method)
if res is None:
logger.warning(f'failed to inference pdf {self.pdfs[idx]}')
return None
return RagDocumentReader(res)
def get_document_filename(self, idx: int) -> Path:
"""get the filename of the document."""
return self.pdfs[idx]
from enum import Enum
from pydantic import BaseModel, Field
# rag
class CategoryType(Enum): # py310 not support StrEnum
text = 'text'
title = 'title'
interline_equation = 'interline_equation'
image = 'image'
image_body = 'image_body'
image_caption = 'image_caption'
table = 'table'
table_body = 'table_body'
table_caption = 'table_caption'
table_footnote = 'table_footnote'
class ElementRelType(Enum):
sibling = 'sibling'
class PageInfo(BaseModel):
page_no: int = Field(description='the index of page, start from zero',
ge=0)
height: int = Field(description='the height of page', gt=0)
width: int = Field(description='the width of page', ge=0)
image_path: str | None = Field(description='the image of this page',
default=None)
class ContentObject(BaseModel):
category_type: CategoryType = Field(description='类别')
poly: list[float] = Field(
description=('Coordinates, need to convert back to PDF coordinates,'
' order is top-left, top-right, bottom-right, bottom-left'
' x,y coordinates'))
ignore: bool = Field(description='whether ignore this object',
default=False)
text: str | None = Field(description='text content of the object',
default=None)
image_path: str | None = Field(description='path of embedded image',
default=None)
order: int = Field(description='the order of this object within a page',
default=-1)
anno_id: int = Field(description='unique id', default=-1)
latex: str | None = Field(description='latex result', default=None)
html: str | None = Field(description='html result', default=None)
class ElementRelation(BaseModel):
source_anno_id: int = Field(description='unique id of the source object',
default=-1)
target_anno_id: int = Field(description='unique id of the target object',
default=-1)
relation: ElementRelType = Field(
description='the relation between source and target element')
class LayoutElementsExtra(BaseModel):
element_relation: list[ElementRelation] = Field(
description='the relation between source and target element')
class LayoutElements(BaseModel):
layout_dets: list[ContentObject] = Field(
description='layout element details')
page_info: PageInfo = Field(description='page info')
extra: LayoutElementsExtra = Field(description='extra information')
# iter data format
class Node(BaseModel):
category_type: CategoryType = Field(description='类别')
text: str | None = Field(description='text content of the object',
default=None)
image_path: str | None = Field(description='path of embedded image',
default=None)
anno_id: int = Field(description='unique id', default=-1)
latex: str | None = Field(description='latex result', default=None)
html: str | None = Field(description='html result', default=None)
import json
import os
from pathlib import Path
from loguru import logger
import magic_pdf.model as model_config
from magic_pdf.dict2md.ocr_mkcontent import merge_para_with_text
from magic_pdf.integrations.rag.type import (CategoryType, ContentObject,
ElementRelation, ElementRelType,
LayoutElements,
LayoutElementsExtra, PageInfo)
from magic_pdf.libs.ocr_content_type import BlockType, ContentType
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.tools.common import do_parse, prepare_env
def convert_middle_json_to_layout_elements(
json_data: dict,
output_dir: str,
) -> list[LayoutElements]:
uniq_anno_id = 0
res: list[LayoutElements] = []
for page_no, page_data in enumerate(json_data['pdf_info']):
order_id = 0
page_info = PageInfo(
height=int(page_data['page_size'][1]),
width=int(page_data['page_size'][0]),
page_no=page_no,
)
layout_dets: list[ContentObject] = []
extra_element_relation: list[ElementRelation] = []
for para_block in page_data['para_blocks']:
para_text = ''
para_type = para_block['type']
if para_type == BlockType.Text:
para_text = merge_para_with_text(para_block)
x0, y0, x1, y1 = para_block['bbox']
content = ContentObject(
anno_id=uniq_anno_id,
category_type=CategoryType.text,
text=para_text,
order=order_id,
poly=[x0, y0, x1, y0, x1, y1, x0, y1],
)
uniq_anno_id += 1
order_id += 1
layout_dets.append(content)
elif para_type == BlockType.Title:
para_text = merge_para_with_text(para_block)
x0, y0, x1, y1 = para_block['bbox']
content = ContentObject(
anno_id=uniq_anno_id,
category_type=CategoryType.title,
text=para_text,
order=order_id,
poly=[x0, y0, x1, y0, x1, y1, x0, y1],
)
uniq_anno_id += 1
order_id += 1
layout_dets.append(content)
elif para_type == BlockType.InterlineEquation:
para_text = merge_para_with_text(para_block)
x0, y0, x1, y1 = para_block['bbox']
content = ContentObject(
anno_id=uniq_anno_id,
category_type=CategoryType.interline_equation,
text=para_text,
order=order_id,
poly=[x0, y0, x1, y0, x1, y1, x0, y1],
)
uniq_anno_id += 1
order_id += 1
layout_dets.append(content)
elif para_type == BlockType.Image:
body_anno_id = -1
caption_anno_id = -1
for block in para_block['blocks']:
if block['type'] == BlockType.ImageBody:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Image:
x0, y0, x1, y1 = block['bbox']
content = ContentObject(
anno_id=uniq_anno_id,
category_type=CategoryType.image_body,
image_path=os.path.join(
output_dir, span['image_path']),
order=order_id,
poly=[x0, y0, x1, y0, x1, y1, x0, y1],
)
body_anno_id = uniq_anno_id
uniq_anno_id += 1
order_id += 1
layout_dets.append(content)
for block in para_block['blocks']:
if block['type'] == BlockType.ImageCaption:
para_text += merge_para_with_text(block)
x0, y0, x1, y1 = block['bbox']
content = ContentObject(
anno_id=uniq_anno_id,
category_type=CategoryType.image_caption,
text=para_text,
order=order_id,
poly=[x0, y0, x1, y0, x1, y1, x0, y1],
)
caption_anno_id = uniq_anno_id
uniq_anno_id += 1
order_id += 1
layout_dets.append(content)
if body_anno_id > 0 and caption_anno_id > 0:
element_relation = ElementRelation(
relation=ElementRelType.sibling,
source_anno_id=body_anno_id,
target_anno_id=caption_anno_id,
)
extra_element_relation.append(element_relation)
elif para_type == BlockType.Table:
body_anno_id, caption_anno_id, footnote_anno_id = -1, -1, -1
for block in para_block['blocks']:
if block['type'] == BlockType.TableCaption:
para_text += merge_para_with_text(block)
x0, y0, x1, y1 = block['bbox']
content = ContentObject(
anno_id=uniq_anno_id,
category_type=CategoryType.table_caption,
text=para_text,
order=order_id,
poly=[x0, y0, x1, y0, x1, y1, x0, y1],
)
caption_anno_id = uniq_anno_id
uniq_anno_id += 1
order_id += 1
layout_dets.append(content)
for block in para_block['blocks']:
if block['type'] == BlockType.TableBody:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.Table:
x0, y0, x1, y1 = para_block['bbox']
content = ContentObject(
anno_id=uniq_anno_id,
category_type=CategoryType.table_body,
order=order_id,
poly=[x0, y0, x1, y0, x1, y1, x0, y1],
)
body_anno_id = uniq_anno_id
uniq_anno_id += 1
order_id += 1
# if processed by table model
if span.get('latex', ''):
content.latex = span['latex']
else:
content.image_path = os.path.join(
output_dir, span['image_path'])
layout_dets.append(content)
for block in para_block['blocks']:
if block['type'] == BlockType.TableFootnote:
para_text += merge_para_with_text(block)
x0, y0, x1, y1 = block['bbox']
content = ContentObject(
anno_id=uniq_anno_id,
category_type=CategoryType.table_footnote,
text=para_text,
order=order_id,
poly=[x0, y0, x1, y0, x1, y1, x0, y1],
)
footnote_anno_id = uniq_anno_id
uniq_anno_id += 1
order_id += 1
layout_dets.append(content)
if caption_anno_id != -1 and body_anno_id != -1:
element_relation = ElementRelation(
relation=ElementRelType.sibling,
source_anno_id=body_anno_id,
target_anno_id=caption_anno_id,
)
extra_element_relation.append(element_relation)
if footnote_anno_id != -1 and body_anno_id != -1:
element_relation = ElementRelation(
relation=ElementRelType.sibling,
source_anno_id=body_anno_id,
target_anno_id=footnote_anno_id,
)
extra_element_relation.append(element_relation)
res.append(
LayoutElements(
page_info=page_info,
layout_dets=layout_dets,
extra=LayoutElementsExtra(
element_relation=extra_element_relation),
))
return res
def inference(path, output_dir, method):
model_config.__use_inside_model__ = True
model_config.__model_mode__ = 'full'
if output_dir == '':
if os.path.isdir(path):
output_dir = os.path.join(path, 'output')
else:
output_dir = os.path.join(os.path.dirname(path), 'output')
local_image_dir, local_md_dir = prepare_env(output_dir,
str(Path(path).stem), method)
def read_fn(path):
disk_rw = DiskReaderWriter(os.path.dirname(path))
return disk_rw.read(os.path.basename(path), AbsReaderWriter.MODE_BIN)
def parse_doc(doc_path: str):
try:
file_name = str(Path(doc_path).stem)
pdf_data = read_fn(doc_path)
do_parse(
output_dir,
file_name,
pdf_data,
[],
method,
False,
f_draw_span_bbox=False,
f_draw_layout_bbox=False,
f_dump_md=False,
f_dump_middle_json=True,
f_dump_model_json=False,
f_dump_orig_pdf=False,
f_dump_content_list=False,
f_draw_model_bbox=False,
)
middle_json_fn = os.path.join(local_md_dir,
f'{file_name}_middle.json')
with open(middle_json_fn) as fd:
jso = json.load(fd)
os.remove(middle_json_fn)
return convert_middle_json_to_layout_elements(jso, local_image_dir)
except Exception as e:
logger.exception(e)
return parse_doc(path)
if __name__ == '__main__':
import pprint
base_dir = '/opt/data/pdf/resources/samples/'
if 0:
with open(base_dir + 'json_outputs/middle.json') as f:
d = json.load(f)
result = convert_middle_json_to_layout_elements(d, '/tmp')
pprint.pp(result)
if 0:
with open(base_dir + 'json_outputs/middle.3.json') as f:
d = json.load(f)
result = convert_middle_json_to_layout_elements(d, '/tmp')
pprint.pp(result)
if 1:
res = inference(
base_dir + 'samples/pdf/one_page_with_table_image.pdf',
'/tmp/output',
'ocr',
)
pprint.pp(res)
This diff is collapsed.
from magic_pdf.layout.bbox_sort import X0_EXT_IDX, X0_IDX, X1_EXT_IDX, X1_IDX, Y0_IDX, Y1_EXT_IDX, Y1_IDX
from magic_pdf.libs.boxbase import _is_bottom_full_overlap, _left_intersect, _right_intersect
def find_all_left_bbox_direct(this_bbox, all_bboxes) -> list:
"""
在all_bboxes里找到所有右侧垂直方向上和this_bbox有重叠的bbox, 不用延长线
并且要考虑两个box左右相交的情况,如果相交了,那么右侧的box就不算最左侧。
"""
left_boxes = [box for box in all_bboxes if box[X1_IDX] <= this_bbox[X0_IDX]
and any([
box[Y0_IDX] < this_bbox[Y0_IDX] < box[Y1_IDX], box[Y0_IDX] < this_bbox[Y1_IDX] < box[Y1_IDX],
this_bbox[Y0_IDX] < box[Y0_IDX] < this_bbox[Y1_IDX], this_bbox[Y0_IDX] < box[Y1_IDX] < this_bbox[Y1_IDX],
box[Y0_IDX]==this_bbox[Y0_IDX] and box[Y1_IDX]==this_bbox[Y1_IDX]]) or _left_intersect(box[:4], this_bbox[:4])]
# 然后再过滤一下,找到水平上距离this_bbox最近的那个——x1最大的那个
if len(left_boxes) > 0:
left_boxes.sort(key=lambda x: x[X1_EXT_IDX] if x[X1_EXT_IDX] else x[X1_IDX], reverse=True)
left_boxes = left_boxes[0]
else:
left_boxes = None
return left_boxes
def find_all_right_bbox_direct(this_bbox, all_bboxes) -> list:
"""
找到在this_bbox右侧且距离this_bbox距离最近的bbox.必须是直接遮挡的那种
"""
right_bboxes = [box for box in all_bboxes if box[X0_IDX] >= this_bbox[X1_IDX]
and any([
this_bbox[Y0_IDX] < box[Y0_IDX] < this_bbox[Y1_IDX], this_bbox[Y0_IDX] < box[Y1_IDX] < this_bbox[Y1_IDX],
box[Y0_IDX] < this_bbox[Y0_IDX] < box[Y1_IDX], box[Y0_IDX] < this_bbox[Y1_IDX] < box[Y1_IDX],
box[Y0_IDX]==this_bbox[Y0_IDX] and box[Y1_IDX]==this_bbox[Y1_IDX]]) or _right_intersect(this_bbox[:4], box[:4])]
if len(right_bboxes)>0:
right_bboxes.sort(key=lambda x: x[X0_EXT_IDX] if x[X0_EXT_IDX] else x[X0_IDX])
right_bboxes = right_bboxes[0]
else:
right_bboxes = None
return right_bboxes
def find_all_top_bbox_direct(this_bbox, all_bboxes) -> list:
"""
找到在this_bbox上侧且距离this_bbox距离最近的bbox.必须是直接遮挡的那种
"""
top_bboxes = [box for box in all_bboxes if box[Y1_IDX] <= this_bbox[Y0_IDX] and any([
box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX], box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX],
this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX], this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX],
box[X0_IDX]==this_bbox[X0_IDX] and box[X1_IDX]==this_bbox[X1_IDX]])]
if len(top_bboxes)>0:
top_bboxes.sort(key=lambda x: x[Y1_EXT_IDX] if x[Y1_EXT_IDX] else x[Y1_IDX], reverse=True)
top_bboxes = top_bboxes[0]
else:
top_bboxes = None
return top_bboxes
def find_all_bottom_bbox_direct(this_bbox, all_bboxes) -> list:
"""
找到在this_bbox下侧且距离this_bbox距离最近的bbox.必须是直接遮挡的那种
"""
bottom_bboxes = [box for box in all_bboxes if box[Y0_IDX] >= this_bbox[Y1_IDX] and any([
this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX], this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX],
box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX], box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX],
box[X0_IDX]==this_bbox[X0_IDX] and box[X1_IDX]==this_bbox[X1_IDX]])]
if len(bottom_bboxes)>0:
bottom_bboxes.sort(key=lambda x: x[Y0_IDX])
bottom_bboxes = bottom_bboxes[0]
else:
bottom_bboxes = None
return bottom_bboxes
# ===================================================================================================================
def find_bottom_bbox_direct_from_right_edge(this_bbox, all_bboxes) -> list:
"""
找到在this_bbox下侧且距离this_bbox距离最近的bbox.必须是直接遮挡的那种
"""
bottom_bboxes = [box for box in all_bboxes if box[Y0_IDX] >= this_bbox[Y1_IDX] and any([
this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX], this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX],
box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX], box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX],
box[X0_IDX]==this_bbox[X0_IDX] and box[X1_IDX]==this_bbox[X1_IDX]])]
if len(bottom_bboxes)>0:
# y0最小, X1最大的那个,也就是box上边缘最靠近this_bbox的那个,并且还最靠右
bottom_bboxes.sort(key=lambda x: x[Y0_IDX])
bottom_bboxes = [box for box in bottom_bboxes if box[Y0_IDX]==bottom_bboxes[0][Y0_IDX]]
# 然后再y1相同的情况下,找到x1最大的那个
bottom_bboxes.sort(key=lambda x: x[X1_IDX], reverse=True)
bottom_bboxes = bottom_bboxes[0]
else:
bottom_bboxes = None
return bottom_bboxes
def find_bottom_bbox_direct_from_left_edge(this_bbox, all_bboxes) -> list:
"""
找到在this_bbox下侧且距离this_bbox距离最近的bbox.必须是直接遮挡的那种
"""
bottom_bboxes = [box for box in all_bboxes if box[Y0_IDX] >= this_bbox[Y1_IDX] and any([
this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX], this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX],
box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX], box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX],
box[X0_IDX]==this_bbox[X0_IDX] and box[X1_IDX]==this_bbox[X1_IDX]])]
if len(bottom_bboxes)>0:
# y0最小, X0最小的那个
bottom_bboxes.sort(key=lambda x: x[Y0_IDX])
bottom_bboxes = [box for box in bottom_bboxes if box[Y0_IDX]==bottom_bboxes[0][Y0_IDX]]
# 然后再y0相同的情况下,找到x0最小的那个
bottom_bboxes.sort(key=lambda x: x[X0_IDX])
bottom_bboxes = bottom_bboxes[0]
else:
bottom_bboxes = None
return bottom_bboxes
def find_top_bbox_direct_from_left_edge(this_bbox, all_bboxes) -> list:
"""
找到在this_bbox上侧且距离this_bbox距离最近的bbox.必须是直接遮挡的那种
"""
top_bboxes = [box for box in all_bboxes if box[Y1_IDX] <= this_bbox[Y0_IDX] and any([
box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX], box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX],
this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX], this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX],
box[X0_IDX]==this_bbox[X0_IDX] and box[X1_IDX]==this_bbox[X1_IDX]])]
if len(top_bboxes)>0:
# y1最大, X0最小的那个
top_bboxes.sort(key=lambda x: x[Y1_IDX], reverse=True)
top_bboxes = [box for box in top_bboxes if box[Y1_IDX]==top_bboxes[0][Y1_IDX]]
# 然后再y1相同的情况下,找到x0最小的那个
top_bboxes.sort(key=lambda x: x[X0_IDX])
top_bboxes = top_bboxes[0]
else:
top_bboxes = None
return top_bboxes
def find_top_bbox_direct_from_right_edge(this_bbox, all_bboxes) -> list:
"""
找到在this_bbox上侧且距离this_bbox距离最近的bbox.必须是直接遮挡的那种
"""
top_bboxes = [box for box in all_bboxes if box[Y1_IDX] <= this_bbox[Y0_IDX] and any([
box[X0_IDX] < this_bbox[X0_IDX] < box[X1_IDX], box[X0_IDX] < this_bbox[X1_IDX] < box[X1_IDX],
this_bbox[X0_IDX] < box[X0_IDX] < this_bbox[X1_IDX], this_bbox[X0_IDX] < box[X1_IDX] < this_bbox[X1_IDX],
box[X0_IDX]==this_bbox[X0_IDX] and box[X1_IDX]==this_bbox[X1_IDX]])]
if len(top_bboxes)>0:
# y1最大, X1最大的那个
top_bboxes.sort(key=lambda x: x[Y1_IDX], reverse=True)
top_bboxes = [box for box in top_bboxes if box[Y1_IDX]==top_bboxes[0][Y1_IDX]]
# 然后再y1相同的情况下,找到x1最大的那个
top_bboxes.sort(key=lambda x: x[X1_IDX], reverse=True)
top_bboxes = top_bboxes[0]
else:
top_bboxes = None
return top_bboxes
# ===================================================================================================================
def get_left_edge_bboxes(all_bboxes) -> list:
"""
返回最左边的bbox
"""
left_bboxes = [box for box in all_bboxes if find_all_left_bbox_direct(box, all_bboxes) is None]
return left_bboxes
def get_right_edge_bboxes(all_bboxes) -> list:
"""
返回最右边的bbox
"""
right_bboxes = [box for box in all_bboxes if find_all_right_bbox_direct(box, all_bboxes) is None]
return right_bboxes
def fix_vertical_bbox_pos(bboxes:list):
"""
检查这批bbox在垂直方向是否有轻微的重叠,如果重叠了,就把重叠的bbox往下移动一点
在x方向上必须一个包含或者被包含,或者完全重叠,不能只有部分重叠
"""
bboxes.sort(key=lambda x: x[Y0_IDX]) # 从上向下排列
for i in range(0, len(bboxes)):
for j in range(i+1, len(bboxes)):
if _is_bottom_full_overlap(bboxes[i][:4], bboxes[j][:4]):
# 如果两个bbox有部分重叠,那么就把下面的bbox往下移动一点
bboxes[j][Y0_IDX] = bboxes[i][Y1_IDX] + 2 # 2是个经验值
break
return bboxes
This diff is collapsed.
"""
找到能分割布局的水平的横线、色块
"""
import os
from magic_pdf.libs.commons import fitz
from magic_pdf.libs.boxbase import _is_in_or_part_overlap
def __rect_filter_by_width(rect, page_w, page_h):
mid_x = page_w/2
if rect[0]< mid_x < rect[2]:
return True
return False
def __rect_filter_by_pos(rect, image_bboxes, table_bboxes):
"""
不能出现在table和image的位置
"""
for box in image_bboxes:
if _is_in_or_part_overlap(rect, box):
return False
for box in table_bboxes:
if _is_in_or_part_overlap(rect, box):
return False
return True
def __debug_show_page(page, bboxes1: list,bboxes2: list,bboxes3: list,):
save_path = "./tmp/debug.pdf"
if os.path.exists(save_path):
# 删除已经存在的文件
os.remove(save_path)
# 创建一个新的空白 PDF 文件
doc = fitz.open('')
width = page.rect.width
height = page.rect.height
new_page = doc.new_page(width=width, height=height)
shape = new_page.new_shape()
for bbox in bboxes1:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor['red'], fill=fitz.pdfcolor['blue'], fill_opacity=0.2)
shape.finish()
shape.commit()
for bbox in bboxes2:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=None, fill=fitz.pdfcolor['yellow'], fill_opacity=0.2)
shape.finish()
shape.commit()
for bbox in bboxes3:
# 原始box画上去
rect = fitz.Rect(*bbox[0:4])
shape = new_page.new_shape()
shape.draw_rect(rect)
shape.finish(color=fitz.pdfcolor['red'], fill=None)
shape.finish()
shape.commit()
parent_dir = os.path.dirname(save_path)
if not os.path.exists(parent_dir):
os.makedirs(parent_dir)
doc.save(save_path)
doc.close()
def get_spilter_of_page(page, image_bboxes, table_bboxes):
"""
获取到色块和横线
"""
cdrawings = page.get_cdrawings()
spilter_bbox = []
for block in cdrawings:
if 'fill' in block:
fill = block['fill']
if 'fill' in block and block['fill'] and block['fill']!=(1.0,1.0,1.0):
rect = block['rect']
if __rect_filter_by_width(rect, page.rect.width, page.rect.height) and __rect_filter_by_pos(rect, image_bboxes, table_bboxes):
spilter_bbox.append(list(rect))
"""过滤、修正一下这些box。因为有时候会有一些矩形,高度为0或者为负数,造成layout计算无限循环。如果是负高度或者0高度,统一修正为高度为1"""
for box in spilter_bbox:
if box[3]-box[1] <= 0:
box[3] = box[1] + 1
#__debug_show_page(page, spilter_bbox, [], [])
return spilter_bbox
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment