Commit b8adb630 authored by liukaiwen's avatar liukaiwen
Browse files

Merge branch 'master' of github.com:papayalove/Magic-PDF

# Conflicts:
#	docs/how_to_download_models_zh_cn.md
parents 6de68f06 52069612
...@@ -3,34 +3,29 @@ from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter ...@@ -3,34 +3,29 @@ from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from loguru import logger from loguru import logger
MODE_TXT = "text"
MODE_BIN = "binary"
class DiskReaderWriter(AbsReaderWriter): class DiskReaderWriter(AbsReaderWriter):
def __init__(self, parent_path, encoding="utf-8"): def __init__(self, parent_path, encoding="utf-8"):
self.path = parent_path self.path = parent_path
self.encoding = encoding self.encoding = encoding
def read(self, path, mode=MODE_TXT): def read(self, path, mode=AbsReaderWriter.MODE_TXT):
if os.path.isabs(path): if os.path.isabs(path):
abspath = path abspath = path
else: else:
abspath = os.path.join(self.path, path) abspath = os.path.join(self.path, path)
if not os.path.exists(abspath): if not os.path.exists(abspath):
logger.error(f"文件 {abspath} 不存在") logger.error(f"file {abspath} not exists")
raise Exception(f"文件 {abspath} 不存在") raise Exception(f"file {abspath} no exists")
if mode == MODE_TXT: if mode == AbsReaderWriter.MODE_TXT:
with open(abspath, "r", encoding=self.encoding) as f: with open(abspath, "r", encoding=self.encoding) as f:
return f.read() return f.read()
elif mode == MODE_BIN: elif mode == AbsReaderWriter.MODE_BIN:
with open(abspath, "rb") as f: with open(abspath, "rb") as f:
return f.read() return f.read()
else: else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.") raise ValueError("Invalid mode. Use 'text' or 'binary'.")
def write(self, content, path, mode=MODE_TXT): def write(self, content, path, mode=AbsReaderWriter.MODE_TXT):
if os.path.isabs(path): if os.path.isabs(path):
abspath = path abspath = path
else: else:
...@@ -38,29 +33,42 @@ class DiskReaderWriter(AbsReaderWriter): ...@@ -38,29 +33,42 @@ class DiskReaderWriter(AbsReaderWriter):
directory_path = os.path.dirname(abspath) directory_path = os.path.dirname(abspath)
if not os.path.exists(directory_path): if not os.path.exists(directory_path):
os.makedirs(directory_path) os.makedirs(directory_path)
if mode == MODE_TXT: if mode == AbsReaderWriter.MODE_TXT:
with open(abspath, "w", encoding=self.encoding, errors="replace") as f: with open(abspath, "w", encoding=self.encoding, errors="replace") as f:
f.write(content) f.write(content)
elif mode == MODE_BIN: elif mode == AbsReaderWriter.MODE_BIN:
with open(abspath, "wb") as f: with open(abspath, "wb") as f:
f.write(content) f.write(content)
else: else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.") raise ValueError("Invalid mode. Use 'text' or 'binary'.")
def read_jsonl(self, path: str, byte_start=0, byte_end=None, encoding="utf-8"): def read_offset(self, path: str, offset=0, limit=None):
return self.read(path) abspath = path
if not os.path.isabs(path):
abspath = os.path.join(self.path, path)
with open(abspath, "rb") as f:
f.seek(offset)
return f.read(limit)
# 使用示例
if __name__ == "__main__": if __name__ == "__main__":
file_path = "io/test/example.txt" if 0:
drw = DiskReaderWriter("D:\projects\papayfork\Magic-PDF\magic_pdf") file_path = "io/test/example.txt"
drw = DiskReaderWriter("D:\projects\papayfork\Magic-PDF\magic_pdf")
# 写入内容到文件
drw.write(b"Hello, World!", path="io/test/example.txt", mode="binary")
# 从文件读取内容
content = drw.read(path=file_path)
if content:
logger.info(f"从 {file_path} 读取的内容: {content}")
if 1:
drw = DiskReaderWriter("/opt/data/pdf/resources/test/io/")
content_bin = drw.read_offset("1.txt")
assert content_bin == b"ABCD!"
# 写入内容到文件 content_bin = drw.read_offset("1.txt", offset=1, limit=2)
drw.write(b"Hello, World!", path="io/test/example.txt", mode="binary") assert content_bin == b"BC"
# 从文件读取内容
content = drw.read(path=file_path)
if content:
logger.info(f"从 {file_path} 读取的内容: {content}")
...@@ -2,16 +2,18 @@ from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter ...@@ -2,16 +2,18 @@ from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key, join_path from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key, join_path
import boto3 import boto3
from loguru import logger from loguru import logger
from boto3.s3.transfer import TransferConfig
from botocore.config import Config from botocore.config import Config
import os
MODE_TXT = "text"
MODE_BIN = "binary"
class S3ReaderWriter(AbsReaderWriter): class S3ReaderWriter(AbsReaderWriter):
def __init__(self, ak: str, sk: str, endpoint_url: str, addressing_style: str = 'auto', parent_path: str = ''): def __init__(
self,
ak: str,
sk: str,
endpoint_url: str,
addressing_style: str = "auto",
parent_path: str = "",
):
self.client = self._get_client(ak, sk, endpoint_url, addressing_style) self.client = self._get_client(ak, sk, endpoint_url, addressing_style)
self.path = parent_path self.path = parent_path
...@@ -21,12 +23,14 @@ class S3ReaderWriter(AbsReaderWriter): ...@@ -21,12 +23,14 @@ class S3ReaderWriter(AbsReaderWriter):
aws_access_key_id=ak, aws_access_key_id=ak,
aws_secret_access_key=sk, aws_secret_access_key=sk,
endpoint_url=endpoint_url, endpoint_url=endpoint_url,
config=Config(s3={"addressing_style": addressing_style}, config=Config(
retries={'max_attempts': 5, 'mode': 'standard'}), s3={"addressing_style": addressing_style},
retries={"max_attempts": 5, "mode": "standard"},
),
) )
return s3_client return s3_client
def read(self, s3_relative_path, mode=MODE_TXT, encoding="utf-8"): def read(self, s3_relative_path, mode=AbsReaderWriter.MODE_TXT, encoding="utf-8"):
if s3_relative_path.startswith("s3://"): if s3_relative_path.startswith("s3://"):
s3_path = s3_relative_path s3_path = s3_relative_path
else: else:
...@@ -34,22 +38,22 @@ class S3ReaderWriter(AbsReaderWriter): ...@@ -34,22 +38,22 @@ class S3ReaderWriter(AbsReaderWriter):
bucket_name, key = parse_bucket_key(s3_path) bucket_name, key = parse_bucket_key(s3_path)
res = self.client.get_object(Bucket=bucket_name, Key=key) res = self.client.get_object(Bucket=bucket_name, Key=key)
body = res["Body"].read() body = res["Body"].read()
if mode == MODE_TXT: if mode == AbsReaderWriter.MODE_TXT:
data = body.decode(encoding) # Decode bytes to text data = body.decode(encoding) # Decode bytes to text
elif mode == MODE_BIN: elif mode == AbsReaderWriter.MODE_BIN:
data = body data = body
else: else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.") raise ValueError("Invalid mode. Use 'text' or 'binary'.")
return data return data
def write(self, content, s3_relative_path, mode=MODE_TXT, encoding="utf-8"): def write(self, content, s3_relative_path, mode=AbsReaderWriter.MODE_TXT, encoding="utf-8"):
if s3_relative_path.startswith("s3://"): if s3_relative_path.startswith("s3://"):
s3_path = s3_relative_path s3_path = s3_relative_path
else: else:
s3_path = join_path(self.path, s3_relative_path) s3_path = join_path(self.path, s3_relative_path)
if mode == MODE_TXT: if mode == AbsReaderWriter.MODE_TXT:
body = content.encode(encoding) # Encode text data as bytes body = content.encode(encoding) # Encode text data as bytes
elif mode == MODE_BIN: elif mode == AbsReaderWriter.MODE_BIN:
body = content body = content
else: else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.") raise ValueError("Invalid mode. Use 'text' or 'binary'.")
...@@ -57,51 +61,82 @@ class S3ReaderWriter(AbsReaderWriter): ...@@ -57,51 +61,82 @@ class S3ReaderWriter(AbsReaderWriter):
self.client.put_object(Body=body, Bucket=bucket_name, Key=key) self.client.put_object(Body=body, Bucket=bucket_name, Key=key)
logger.info(f"内容已写入 {s3_path} ") logger.info(f"内容已写入 {s3_path} ")
def read_jsonl(self, path: str, byte_start=0, byte_end=None, mode=MODE_TXT, encoding='utf-8'): def read_offset(self, path: str, offset=0, limit=None) -> bytes:
if path.startswith("s3://"): if path.startswith("s3://"):
s3_path = path s3_path = path
else: else:
s3_path = join_path(self.path, path) s3_path = join_path(self.path, path)
bucket_name, key = parse_bucket_key(s3_path) bucket_name, key = parse_bucket_key(s3_path)
range_header = f'bytes={byte_start}-{byte_end}' if byte_end else f'bytes={byte_start}-' range_header = (
f"bytes={offset}-{offset+limit-1}" if limit else f"bytes={offset}-"
)
res = self.client.get_object(Bucket=bucket_name, Key=key, Range=range_header) res = self.client.get_object(Bucket=bucket_name, Key=key, Range=range_header)
body = res["Body"].read() return res["Body"].read()
if mode == MODE_TXT:
data = body.decode(encoding) # Decode bytes to text
elif mode == MODE_BIN:
data = body
else:
raise ValueError("Invalid mode. Use 'text' or 'binary'.")
return data
if __name__ == "__main__": if __name__ == "__main__":
# Config the connection info if 0:
ak = "" # Config the connection info
sk = "" ak = ""
endpoint_url = "" sk = ""
addressing_style = "auto" endpoint_url = ""
bucket_name = "" addressing_style = "auto"
# Create an S3ReaderWriter object bucket_name = ""
s3_reader_writer = S3ReaderWriter(ak, sk, endpoint_url, addressing_style, "s3://bucket_name/") # Create an S3ReaderWriter object
s3_reader_writer = S3ReaderWriter(
ak, sk, endpoint_url, addressing_style, "s3://bucket_name/"
)
# Write text data to S3 # Write text data to S3
text_data = "This is some text data" text_data = "This is some text data"
s3_reader_writer.write(data=text_data, s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_TXT) s3_reader_writer.write(
text_data,
s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json",
mode=AbsReaderWriter.MODE_TXT,
)
# Read text data from S3
text_data_read = s3_reader_writer.read(
s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=AbsReaderWriter.MODE_TXT
)
logger.info(f"Read text data from S3: {text_data_read}")
# Write binary data to S3
binary_data = b"This is some binary data"
s3_reader_writer.write(
text_data,
s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json",
mode=AbsReaderWriter.MODE_BIN,
)
# Read text data from S3 # Read binary data from S3
text_data_read = s3_reader_writer.read(s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_TXT) binary_data_read = s3_reader_writer.read(
logger.info(f"Read text data from S3: {text_data_read}") s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=AbsReaderWriter.MODE_BIN
# Write binary data to S3 )
binary_data = b"This is some binary data" logger.info(f"Read binary data from S3: {binary_data_read}")
s3_reader_writer.write(data=text_data, s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_BIN)
# Range Read text data from S3
binary_data_read = s3_reader_writer.read_offset(
path=f"s3://{bucket_name}/ebook/test/test.json", offset=0, limit=10
)
logger.info(f"Read binary data from S3: {binary_data_read}")
if 1:
import os
import json
# Read binary data from S3 ak = os.getenv("AK", "")
binary_data_read = s3_reader_writer.read(s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_BIN) sk = os.getenv("SK", "")
logger.info(f"Read binary data from S3: {binary_data_read}") endpoint_url = os.getenv("ENDPOINT", "")
bucket = os.getenv("S3_BUCKET", "")
prefix = os.getenv("S3_PREFIX", "")
key_basename = os.getenv("S3_KEY_BASENAME", "")
s3_reader_writer = S3ReaderWriter(
ak, sk, endpoint_url, "auto", f"s3://{bucket}/{prefix}"
)
content_bin = s3_reader_writer.read_offset(key_basename)
assert content_bin[:10] == b'{"track_id'
assert content_bin[-10:] == b'r":null}}\n'
# Range Read text data from S3 content_bin = s3_reader_writer.read_offset(key_basename, offset=424, limit=426)
binary_data_read = s3_reader_writer.read_jsonl(path=f"s3://{bucket_name}/ebook/test/test.json", jso = json.dumps(content_bin.decode("utf-8"))
byte_start=0, byte_end=10, mode=MODE_BIN) print(jso)
logger.info(f"Read binary data from S3: {binary_data_read}")
import os
import click
from loguru import logger
from pathlib import Path
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
import magic_pdf.model as model_config
from magic_pdf.tools.common import parse_pdf_methods, do_parse
from magic_pdf.libs.version import __version__
# Stand-alone command line entry point: parses one local PDF, or every
# "*.pdf" found directly inside a local directory, using the built-in models.
# NOTE: intentionally no docstring -- click would show it as the help text.
@click.command()
@click.version_option(
    __version__, "--version", "-v", help="display the version and exit"
)
@click.option(
    "-p",
    "--path",
    "path",
    required=True,
    type=click.Path(exists=True),
    help="local pdf filepath or directory",
)
@click.option(
    "-o",
    "--output-dir",
    "output_dir",
    default="",
    type=str,
    help="output local directory",
)
@click.option(
    "-m",
    "--method",
    "method",
    default="auto",
    type=parse_pdf_methods,
    help="""the method for parsing pdf.
ocr: using ocr technique to extract information from pdf.
txt: suitable for the text-based pdf only and outperform ocr.
auto: automatically choose the best method for parsing pdf from ocr and txt.
without method specified, auto will be used by default.""",
)
def cli(path, output_dir, method):
    # This tool always runs the bundled models in "full" mode.
    model_config.__use_inside_model__ = True
    model_config.__model_mode__ = "full"

    # Default output location: an "output" folder next to the input
    # (inside the directory when a directory was given).
    if output_dir == "":
        base_dir = path if os.path.isdir(path) else os.path.dirname(path)
        output_dir = os.path.join(base_dir, "output")

    def load_pdf_bytes(pdf_path):
        # Read the raw bytes of one PDF through the disk reader/writer.
        reader = DiskReaderWriter(os.path.dirname(pdf_path))
        return reader.read(os.path.basename(pdf_path), AbsReaderWriter.MODE_BIN)

    def parse_one(doc_path):
        # A failure on one document is logged and must not stop the others.
        try:
            do_parse(
                output_dir,
                str(Path(doc_path).stem),
                load_pdf_bytes(doc_path),
                [],
                method,
            )
        except Exception as e:
            logger.exception(e)

    targets = Path(path).glob("*.pdf") if os.path.isdir(path) else [path]
    for target in targets:
        parse_one(target)
if __name__ == "__main__":
cli()
import os
import json as json_parse
import click
from pathlib import Path
from magic_pdf.libs.path_utils import (
parse_s3path,
parse_s3_range_params,
remove_non_official_s3_args,
)
from magic_pdf.libs.config_reader import (
get_s3_config,
)
from magic_pdf.rw.S3ReaderWriter import S3ReaderWriter
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
import magic_pdf.model as model_config
from magic_pdf.tools.common import parse_pdf_methods, do_parse
from magic_pdf.libs.version import __version__
def read_s3_path(s3path):
    """Read the bytes of an S3 object, honouring an optional byte range.

    Args:
        s3path: S3 path of the object. It may carry extra range arguments,
            which ``parse_s3_range_params`` extracts and
            ``remove_non_official_s3_args`` strips.

    Returns:
        The bytes returned by ``S3ReaderWriter.read_offset``: the whole
        object when no usable range is present, otherwise the requested
        slice.
    """
    bucket, _ = parse_s3path(s3path)
    s3_ak, s3_sk, s3_endpoint = get_s3_config(bucket)

    clean_path = remove_non_official_s3_args(s3path)
    s3_rw = S3ReaderWriter(s3_ak, s3_sk, s3_endpoint, "auto", clean_path)

    may_range_params = parse_s3_range_params(s3path)
    if may_range_params is None or len(may_range_params) != 2:
        offset, limit = 0, None
    else:
        # The two range parameters are (start offset, length): the previous
        # code derived an inclusive end as start + length - 1, which is the
        # same arithmetic read_offset() applies to (offset, limit).
        offset, limit = int(may_range_params[0]), int(may_range_params[1])

    # read_offset() replaced read_jsonl() on the reader/writer classes; it
    # takes (path, offset, limit) and always returns bytes, so no mode
    # argument is passed any more.
    return s3_rw.read_offset(clean_path, offset, limit)
# Root click command group of this developer CLI; the sub-commands below
# attach themselves with @cli.command().
# The version option's help text is Chinese for "show version information".
@click.group()
@click.version_option(__version__, "--version", "-v", help="显示版本信息")
def cli():
    # Nothing to do at group level; all work happens in the sub-commands.
    pass
# Sub-command: parse the PDF referenced by the first record of a jsonl file.
# The jsonl may live on local disk or on S3; the PDF itself is always read
# through read_s3_path(). Model inference is not run here: the record must
# provide "doc_layout_result".
# NOTE: intentionally no docstring -- click would show it as the help text.
@cli.command()
@click.option(
    "-j",
    "--jsonl",
    "jsonl",
    type=str,
    help="输入 jsonl 路径,本地或者 s3 上的文件",
    required=True,
)
@click.option(
    "-m",
    "--method",
    "method",
    type=parse_pdf_methods,
    help="指定解析方法。txt: 文本型 pdf 解析方法, ocr: 光学识别解析 pdf, auto: 程序智能选择解析方法",
    default="auto",
)
@click.option(
    "-o",
    "--output-dir",
    "output_dir",
    type=str,
    help="输出到本地目录",
    default="",
)
def jsonl(jsonl, method, output_dir):
    model_config.__use_inside_model__ = False

    # Default output location: an "output" folder next to the jsonl file.
    if output_dir == "":
        full_jsonl_path = os.path.realpath(jsonl)
        output_dir = os.path.join(os.path.dirname(full_jsonl_path), "output")

    if jsonl.startswith("s3://"):
        jso = json_parse.loads(read_s3_path(jsonl).decode("utf-8"))
    else:
        # Decode explicitly as UTF-8 so the local branch matches the S3
        # branch instead of depending on the platform's default encoding.
        with open(jsonl, encoding="utf-8") as f:
            # Only the first record of the file is processed.
            jso = json_parse.loads(f.readline())

    # The PDF location is stored under "file_location", or "path" as fallback.
    s3_file_path = jso.get("file_location")
    if s3_file_path is None:
        s3_file_path = jso.get("path")
    if s3_file_path is None:
        raise ValueError(
            "jsonl record has neither a 'file_location' nor a 'path' field"
        )

    pdf_file_name = Path(s3_file_path).stem
    pdf_data = read_s3_path(s3_file_path)

    print(pdf_file_name, jso, method)
    do_parse(
        output_dir,
        pdf_file_name,
        pdf_data,
        jso["doc_layout_result"],
        method,
        f_dump_content_list=True,
    )
# Sub-command: parse a local PDF using model output that was already
# computed and stored in a local json file (no model inference here).
# NOTE: intentionally no docstring -- click would show it as the help text.
@cli.command()
@click.option(
    "-p",
    "--pdf",
    "pdf",
    required=True,
    type=click.Path(exists=True),
    help="本地 PDF 文件",
)
@click.option(
    "-j",
    "--json",
    "json_data",
    required=True,
    type=click.Path(exists=True),
    help="本地模型推理出的 json 数据",
)
@click.option(
    "-o",
    "--output-dir",
    "output_dir",
    default="",
    type=str,
    help="本地输出目录",
)
@click.option(
    "-m",
    "--method",
    "method",
    default="auto",
    type=parse_pdf_methods,
    help="指定解析方法。txt: 文本型 pdf 解析方法, ocr: 光学识别解析 pdf, auto: 程序智能选择解析方法",
)
def pdf(pdf, json_data, output_dir, method):
    model_config.__use_inside_model__ = False

    full_pdf_path = os.path.realpath(pdf)
    # Default output location: an "output" folder next to the PDF.
    if output_dir == "":
        output_dir = os.path.join(os.path.dirname(full_pdf_path), "output")

    def load_bytes(file_path):
        # Read a local file as raw bytes through the disk reader/writer.
        folder, base_name = os.path.dirname(file_path), os.path.basename(file_path)
        return DiskReaderWriter(folder).read(base_name, AbsReaderWriter.MODE_BIN)

    # The model output is read before the PDF, as a UTF-8 encoded json document.
    model_json_list = json_parse.loads(load_bytes(json_data).decode("utf-8"))

    do_parse(
        output_dir,
        str(Path(full_pdf_path).stem),
        load_bytes(full_pdf_path),
        model_json_list,
        method,
        f_dump_content_list=True,
    )
if __name__ == "__main__":
cli()
import os
import json as json_parse
import copy
import click
from loguru import logger
from magic_pdf.libs.MakeContentConfig import DropMode, MakeMode
from magic_pdf.libs.draw_bbox import draw_layout_bbox, draw_span_bbox
from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.pipe.OCRPipe import OCRPipe
from magic_pdf.pipe.TXTPipe import TXTPipe
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
import magic_pdf.model as model_config
def prepare_env(output_dir, pdf_file_name, method):
    """Create the output folders for one document and return their paths.

    The markdown folder is ``<output_dir>/<pdf_file_name>/<method>`` and the
    image folder is its ``images`` sub-folder. Both are created if missing.

    Returns:
        A ``(image_dir, md_dir)`` tuple of path strings.
    """
    md_dir = os.path.join(output_dir, pdf_file_name, method)
    image_dir = os.path.join(str(md_dir), "images")
    for directory in (image_dir, md_dir):
        os.makedirs(directory, exist_ok=True)
    return image_dir, md_dir
def do_parse(
    output_dir,
    pdf_file_name,
    pdf_bytes,
    model_list,
    parse_method,
    f_draw_span_bbox=True,
    f_draw_layout_bbox=True,
    f_dump_md=True,
    f_dump_middle_json=True,
    f_dump_model_json=True,
    f_dump_orig_pdf=True,
    f_dump_content_list=False,
    f_make_md_mode=MakeMode.MM_MD,
):
    """Run the parsing pipeline on one PDF and dump the requested artefacts.

    Args:
        output_dir: Root output directory; results are written below
            ``<output_dir>/<pdf_file_name>/<parse_method>``.
        pdf_file_name: Name used for the output folder and the markdown file.
        pdf_bytes: Raw bytes of the PDF.
        model_list: Model output for the document. When empty, the pipe's
            own analysis is run, but only if
            ``model_config.__use_inside_model__`` is truthy.
        parse_method: ``"auto"``, ``"txt"`` or ``"ocr"``.
        f_draw_span_bbox: Call ``draw_span_bbox`` on the parsed result.
        f_draw_layout_bbox: Call ``draw_layout_bbox`` on the parsed result.
        f_dump_md: Write ``<pdf_file_name>.md``.
        f_dump_middle_json: Write ``middle.json`` (the pipe's ``pdf_mid_data``).
        f_dump_model_json: Write ``model.json`` (the model list that was used).
        f_dump_orig_pdf: Write ``origin.pdf`` (a copy of ``pdf_bytes``).
        f_dump_content_list: Write ``content_list.json``.
        f_make_md_mode: Forwarded to ``pipe_mk_markdown`` as ``md_make_mode``.

    Raises:
        SystemExit: With code 1 for an unknown ``parse_method``; with code 2
            when ``model_list`` is empty and the built-in model is disabled.
    """
    # Keep an untouched copy for model.json, taken before the pipe sees the list.
    orig_model_list = copy.deepcopy(model_list)
    local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method)
    image_writer = DiskReaderWriter(local_image_dir)
    md_writer = DiskReaderWriter(local_md_dir)
    image_dir = str(os.path.basename(local_image_dir))

    if parse_method == "auto":
        jso_useful_key = {"_pdf_type": "", "model_list": model_list}
        pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer, is_debug=True)
    elif parse_method == "txt":
        pipe = TXTPipe(pdf_bytes, model_list, image_writer, is_debug=True)
    elif parse_method == "ocr":
        pipe = OCRPipe(pdf_bytes, model_list, image_writer, is_debug=True)
    else:
        logger.error("unknown parse method")
        # Raise SystemExit directly: the bare exit() helper is injected by
        # the `site` module for interactive use and is not always available.
        raise SystemExit(1)

    pipe.pipe_classify()

    if len(model_list) == 0:
        if model_config.__use_inside_model__:
            pipe.pipe_analyze()
            orig_model_list = copy.deepcopy(pipe.model_list)
        else:
            logger.error("need model list input")
            raise SystemExit(2)

    pipe.pipe_parse()
    pdf_info = pipe.pdf_mid_data["pdf_info"]
    if f_draw_layout_bbox:
        draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir)
    if f_draw_span_bbox:
        draw_span_bbox(pdf_info, pdf_bytes, local_md_dir)

    md_content = pipe.pipe_mk_markdown(
        image_dir, drop_mode=DropMode.NONE, md_make_mode=f_make_md_mode
    )
    if f_dump_md:
        md_writer.write(
            content=md_content,
            path=f"{pdf_file_name}.md",
            mode=AbsReaderWriter.MODE_TXT,
        )

    if f_dump_middle_json:
        md_writer.write(
            content=json_parse.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4),
            path="middle.json",
            mode=AbsReaderWriter.MODE_TXT,
        )

    if f_dump_model_json:
        md_writer.write(
            content=json_parse.dumps(orig_model_list, ensure_ascii=False, indent=4),
            path="model.json",
            mode=AbsReaderWriter.MODE_TXT,
        )

    if f_dump_orig_pdf:
        md_writer.write(
            content=pdf_bytes,
            path="origin.pdf",
            mode=AbsReaderWriter.MODE_BIN,
        )

    # Built unconditionally, exactly as before, even when it is not dumped.
    content_list = pipe.pipe_mk_uni_format(image_dir, drop_mode=DropMode.NONE)
    if f_dump_content_list:
        md_writer.write(
            content=json_parse.dumps(content_list, ensure_ascii=False, indent=4),
            path="content_list.json",
            mode=AbsReaderWriter.MODE_TXT,
        )

    logger.info(f"local output dir is {local_md_dir}")
# click parameter type that restricts the parsing method to "ocr", "txt" or "auto".
parse_pdf_methods = click.Choice(["ocr", "txt", "auto"])
def convert_to_train_format(jso: dict) -> list:
    """Convert per-page parse results into a flat list of labelled boxes.

    Only entries of ``jso`` whose key starts with ``"page_"`` are used.

    Category ids emitted: 0 paragraph title, 1 image, 2 paragraph text,
    3 header, 4 page number, 5 footnote, 6 footer, 7 table,
    10 interline equation, 13 inline equation.

    Args:
        jso: Mapping of ``"page_*"`` keys to page dicts (other keys ignored).

    Returns:
        One dict per page, in iteration order of ``jso``, with keys
        ``"page_info"`` (``page_no``, ``height``, ``width``), ``"bboxes"``
        (list of ``{"category_id", "bbox"}``, plus ``"caption_bbox"`` for
        images/tables that have a caption) and ``"layout_tree"``.

    Raises:
        KeyError: If a page dict lacks one of the fields read below.
    """
    pages = []
    for key, page in jso.items():
        if not key.startswith("page_"):
            continue

        page_idx = page["page_idx"]
        width, height = page["page_size"]
        info = {"page_info": {"page_no": page_idx, "height": height, "width": width}}

        bboxes: list[dict] = []

        # Images and tables may carry the bbox of their caption.
        for category_id, field in (
            (1, "image_bboxes_with_caption"),
            (7, "table_bboxes_with_caption"),
        ):
            for item in page[field]:
                entry = {"category_id": category_id, "bbox": item["bbox"]}
                if "caption" in item:
                    entry["caption_bbox"] = item["caption"]
                bboxes.append(entry)

        # Page numbers, headers and footers are stored as plain bboxes.
        for category_id, field in (
            (4, "bak_page_no_bboxes"),
            (3, "bak_header_bboxes"),
            (6, "bak_footer_bboxes"),
        ):
            for bbox in page[field]:
                bboxes.append({"category_id": category_id, "bbox": bbox})

        # Paragraphs: titles become category 0, everything else category 2.
        for para in page["para_blocks"]:
            if "paras" not in para:
                continue
            for para_content in para["paras"].values():
                para_bbox = para_content["para_bbox"]
                category_id = 0 if para_content["is_para_title"] else 2
                bboxes.append({"category_id": category_id, "bbox": para_bbox})

        for inline_equation in page["inline_equations"]:
            bboxes.append({"category_id": 13, "bbox": inline_equation["bbox"]})

        for inter_equation in page["interline_equations"]:
            bboxes.append({"category_id": 10, "bbox": inter_equation["bbox"]})

        # Footnote boxes are copied into a fresh list.
        for footnote_bbox in page["bak_footer_note_bboxes"]:
            bboxes.append({"category_id": 5, "bbox": list(footnote_bbox)})

        info["bboxes"] = bboxes
        info["layout_tree"] = page["layout_bboxes"]
        pages.append(info)

    return pages
from magic_pdf.libs.boxbase import _is_in
def extract_caption_bbox(outer: list, inner: list) -> list:
    """Pair inner boxes with the outer box containing them and derive captions.

    For every inner box, the first still-unused outer box that contains it
    (per ``_is_in``) without being approximately the same box is selected.
    The caption is the largest of the four strips of the outer box lying
    left of, above, below and right of the inner box. Each outer box is
    used for at most one inner box.

    Returns:
        list of {
            "bbox": [1,2,3,4],
            "caption": [5,6,7,8]  # only present when an outer box matched
        }
    """
    found_count = 0  # for debug
    print(outer, inner)

    def is_float_equal(a, b):
        # Non-strict float comparison with a 0.01 tolerance.
        return 0.01 > abs(a - b)

    def area(rect):
        return abs(rect[0] - rect[2]) * abs(rect[1] - rect[3])

    # Outer boxes keyed by position so that used ones can be discarded.
    available = dict(enumerate(outer))
    ret = []

    for v in inner:
        ix0, iy0, ix1, iy1 = v
        d = {"bbox": v[:4]}

        found_idx = None
        for k, candidate in available.items():
            ox0, oy0, ox1, oy1 = candidate
            same_box = all(
                [
                    is_float_equal(ix0, ox0),
                    is_float_equal(iy0, oy0),
                    is_float_equal(ix1, ox1),
                    is_float_equal(iy1, oy1),
                ]
            )
            if _is_in(v, candidate) and not same_box:
                found_idx = k
                break

        if found_idx is not None:
            found_count += 1
            ox0, oy0, ox1, oy1 = available[found_idx]
            captions = [
                [ox0, oy0, ix0, oy1],
                [ox0, oy0, ox1, iy0],
                [ox0, iy1, ox1, oy1],
                [ix1, oy0, ox1, oy1],
            ]
            # Stable ascending sort: the last element is the largest strip,
            # which is taken to be the caption.
            captions.sort(key=area)
            d["caption"] = captions[-1]
            # An outer box may determine the caption of one inner box only.
            available.pop(found_idx)

        ret.append(d)

    print("found_count: ", found_count)
    return ret
import re
from magic_pdf.libs.boxbase import _is_in_or_part_overlap
from magic_pdf.libs.drop_tag import CONTENT_IN_FOOT_OR_HEADER, PAGE_NO
"""
copy from pre_proc/remove_footer_header.py
"""
def remove_headder_footer_one_page(
    text_raw_blocks,
    image_bboxes,
    table_bboxes,
    header_bboxs,
    footer_bboxs,
    page_no_bboxs,
    page_w,
    page_h,
):
    """
    Remove headers, footers and page numbers from one page.

    Removal is meant to work at line level; a text block that ends up empty
    is moved to the removed list.

    The function is currently short-circuited: the ``if 1:`` guard below
    returns the inputs unchanged together with three empty lists, so
    everything after it is unreachable.

    Returns a 6-tuple: (remaining image bboxes, remaining table bboxes,
    remaining text blocks, removed text blocks, removed image bboxes,
    removed table bboxes).

    NOTE(review): the nesting below was restored from a listing whose
    indentation was lost -- confirm it against the upstream file.
    """
    # Removal is disabled: pass the inputs through untouched.
    if 1:
        return image_bboxes, table_bboxes, text_raw_blocks, [], [], []

    # ---- unreachable while the guard above is in place ----
    header = []
    footer = []
    # Union of all model-provided header boxes.
    if len(header) == 0:
        model_header = header_bboxs
        if model_header:
            x0 = min([x for x, _, _, _ in model_header])
            y0 = min([y for _, y, _, _ in model_header])
            x1 = max([x1 for _, _, x1, _ in model_header])
            y1 = max([y1 for _, _, _, y1 in model_header])
            header = [x0, y0, x1, y1]
    # Union of all model-provided footer boxes.
    if len(footer) == 0:
        model_footer = footer_bboxs
        if model_footer:
            x0 = min([x for x, _, _, _ in model_footer])
            y0 = min([y for _, y, _, _ in model_footer])
            x1 = max([x1 for _, _, x1, _ in model_footer])
            y1 = max([y1 for _, _, _, y1 in model_footer])
            footer = [x0, y0, x1, y1]

    # Content starts below the header's bottom edge and ends at the footer's top edge.
    header_y0 = 0 if len(header) == 0 else header[3]
    footer_y0 = page_h if len(footer) == 0 else footer[1]
    if page_no_bboxs:
        # Page-number boxes in the top / bottom half of the page also bound the content.
        top_part = [b for b in page_no_bboxs if b[3] < page_h / 2]
        btn_part = [b for b in page_no_bboxs if b[1] > page_h / 2]

        top_max_y0 = max([b[1] for b in top_part]) if top_part else 0
        btn_min_y1 = min([b[3] for b in btn_part]) if btn_part else page_h

        header_y0 = max(header_y0, top_max_y0)
        footer_y0 = min(footer_y0, btn_min_y1)

    content_boundry = [0, header_y0, page_w, footer_y0]

    # From here on header/footer are full-width bands rather than model boxes.
    header = [0, 0, page_w, header_y0]
    footer = [0, footer_y0, page_w, page_h]

    # (The header/footer boundaries are computed above; removal starts below.)
    """以上计算出来了页眉页脚的边界,下面开始进行删除"""
    text_block_to_remove = []
    # First check every text block.
    for blk in text_raw_blocks:
        if len(blk["lines"]) > 0:
            for line in blk["lines"]:
                line_del = []
                for span in line["spans"]:
                    span_del = []
                    if span["bbox"][3] < header_y0:
                        span_del.append(span)
                    elif _is_in_or_part_overlap(
                        span["bbox"], header
                    ) or _is_in_or_part_overlap(span["bbox"], footer):
                        span_del.append(span)
                for span in span_del:
                    line["spans"].remove(span)
                if not line["spans"]:
                    line_del.append(line)

            for line in line_del:
                blk["lines"].remove(line)
        else:
            # if not blk['lines']:
            blk["tag"] = CONTENT_IN_FOOT_OR_HEADER
            text_block_to_remove.append(blk)

    # (A tiny page number often overlaps content_boundry slightly and would be
    # kept as body text, so page numbers are handled at span granularity.)
    """有的时候由于pageNo太小了,总是会有一点和content_boundry重叠一点,被放入正文,因此对于pageNo,进行span粒度的删除"""
    page_no_block_2_remove = []
    if page_no_bboxs:
        for pagenobox in page_no_bboxs:
            for block in text_raw_blocks:
                if _is_in_or_part_overlap(
                    pagenobox, block["bbox"]
                ):  # remove page numbers at span level
                    for line in block["lines"]:
                        for span in line["spans"]:
                            if _is_in_or_part_overlap(pagenobox, span["bbox"]):
                                # span['text'] = ''
                                span["tag"] = PAGE_NO
                                # If this is the block's only span, the whole block is removed as well.
                                if len(line["spans"]) == 1 and len(block["lines"]) == 1:
                                    page_no_block_2_remove.append(block)
    else:
        # Check whether the last block is a page number. Rule: the last block has
        # exactly one line with one span whose text consists of digits, spaces
        # and symbols -- no letters, and at least one digit.
        if len(text_raw_blocks) > 0:
            # Sorts the caller's list in place, largest top coordinate first.
            text_raw_blocks.sort(key=lambda x: x["bbox"][1], reverse=True)
            last_block = text_raw_blocks[0]
            if len(last_block["lines"]) == 1:
                last_line = last_block["lines"][0]
                if len(last_line["spans"]) == 1:
                    last_span = last_line["spans"][0]
                    if (
                        last_span["text"].strip()
                        and not re.search("[a-zA-Z]", last_span["text"])
                        and re.search("[0-9]", last_span["text"])
                    ):
                        last_span["tag"] = PAGE_NO
                        page_no_block_2_remove.append(last_block)

    for b in page_no_block_2_remove:
        text_block_to_remove.append(b)

    for blk in text_block_to_remove:
        if blk in text_raw_blocks:
            text_raw_blocks.remove(blk)

    text_block_remain = text_raw_blocks
    # Images and tables are kept only when they touch the content area.
    image_bbox_to_remove = [
        bbox
        for bbox in image_bboxes
        if not _is_in_or_part_overlap(bbox, content_boundry)
    ]

    image_bbox_remain = [
        bbox for bbox in image_bboxes if _is_in_or_part_overlap(bbox, content_boundry)
    ]
    table_bbox_to_remove = [
        bbox
        for bbox in table_bboxes
        if not _is_in_or_part_overlap(bbox, content_boundry)
    ]
    table_bbox_remain = [
        bbox for bbox in table_bboxes if _is_in_or_part_overlap(bbox, content_boundry)
    ]

    # 1, 2, 3
    return (
        image_bbox_remain,
        table_bbox_remain,
        text_block_remain,
        text_block_to_remove,
        image_bbox_to_remove,
        table_bbox_to_remove,
    )
from magic_pdf.libs.commons import fitz
import os
from magic_pdf.libs.coordinate_transform import get_scale_ratio
def draw_model_output(
    raw_pdf_doc: fitz.Document, paras_dict_arr: list[dict], save_path: str
):
    """Draw the model's layout boxes on every page and save the document.

    Args:
        raw_pdf_doc: Document to draw on; it is modified and then saved.
        paras_dict_arr: One model result per page, indexed like the pages,
            shaped like ``{"layout_dets": [...], "subfield_dets": [...],
            "page_info": {"page_no": 22, "height": 1650, "width": 1275}}``.
            Each ``layout_dets`` entry provides ``"poly"`` and
            ``"category_id"``.
        save_path: Destination file; missing parent directories are created.

    Category ids used by the model:
        0 title, 1 figure, 2 plain text, 3 header, 4 page number,
        5 footnote, 6 footer, 7 table, 8 table caption, 9 figure caption,
        10 equation, 11 full column, 12 sub column, 13 embedding (inline
        formula), 14 isolated (single-line formula).
    """
    color_map = {
        "body": fitz.pdfcolor["green"],
        "non_body": fitz.pdfcolor["red"],
    }
    # Categories drawn in the "non_body" colour: header, page number,
    # footnote, footer and title.
    non_body_categories = (3, 4, 5, 6, 0)

    for i, page in enumerate(raw_pdf_doc):
        page_result = paras_dict_arr[i]
        horizontal_scale_ratio, vertical_scale_ratio = get_scale_ratio(
            page_result, page
        )

        for block in page_result["layout_dets"]:
            poly = block["poly"]
            # poly[0], poly[1] and poly[2], poly[5] are used as two opposite
            # corners, scaled from model coordinates to page coordinates.
            left = poly[0] / horizontal_scale_ratio
            top = poly[1] / vertical_scale_ratio
            right = poly[2] / horizontal_scale_ratio
            bottom = poly[5] / vertical_scale_ratio

            # Normalise so the rectangle is valid whatever the corner order.
            left, right = min(left, right), max(left, right)
            top, bottom = min(top, bottom), max(top, bottom)

            if block["category_id"] in non_body_categories:
                color = color_map["non_body"]
            else:
                color = color_map["body"]

            rect = fitz.Rect([left, top, right, bottom])
            page.draw_rect(rect, fill=None, width=0.5, overlay=True, color=color)

    # A bare file name has an empty parent; os.makedirs("") would raise.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    raw_pdf_doc.save(save_path)
def _draw_debug_rects(page, bboxes, **finish_kwargs):
    """Draw the first four coordinates of each bbox as a rectangle on *page*."""
    for bbox in bboxes:
        rect = fitz.Rect(*bbox[0:4])
        shape = page.new_shape()
        shape.draw_rect(rect)
        shape.finish(**finish_kwargs)
        shape.finish()
        shape.commit()


def _write_debug_pdf(width, height, layers, save_path):
    """Write a fresh one-page PDF at *save_path* showing the given box layers.

    *layers* is an iterable of ``(bboxes, finish_kwargs)`` pairs, drawn in order.
    """
    if os.path.exists(save_path):
        # Always start from scratch: drop the file from a previous run.
        os.remove(save_path)

    # Create a new blank PDF.
    doc = fitz.open("")
    try:
        new_page = doc.new_page(width=width, height=height)
        for bboxes, finish_kwargs in layers:
            _draw_debug_rects(new_page, bboxes, **finish_kwargs)

        # A bare file name has an empty parent; os.makedirs("") would raise.
        parent_dir = os.path.dirname(save_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        doc.save(save_path)
    finally:
        # Release the document even when drawing or saving fails.
        doc.close()


def _debug_layers(filled, dropped, outlined):
    """Return the three box layers with the styles shared by the debug views."""
    return (
        # Red outline with a translucent blue fill.
        (
            filled,
            {
                "color": fitz.pdfcolor["red"],
                "fill": fitz.pdfcolor["blue"],
                "fill_opacity": 0.2,
            },
        ),
        # No outline, translucent yellow fill.
        (dropped, {"color": None, "fill": fitz.pdfcolor["yellow"], "fill_opacity": 0.2}),
        # Red outline only.
        (outlined, {"color": fitz.pdfcolor["red"], "fill": None}),
    )


def debug_show_bbox(
    raw_pdf_doc: fitz.Document,
    page_idx: int,
    bboxes: list,
    droped_bboxes: list,
    expect_drop_bboxes: list,
    save_path: str,
    expected_page_id: int,
):
    """Write a temporary one-page PDF for debugging, overwriting *save_path*.

    Nothing happens unless ``page_idx == expected_page_id``. The new page
    has the size of ``raw_pdf_doc[page_idx]`` and shows ``bboxes`` (red
    outline, blue fill), ``droped_bboxes`` (yellow fill) and
    ``expect_drop_bboxes`` (red outline only).
    """
    if page_idx != expected_page_id:
        return

    page_rect = raw_pdf_doc[page_idx].rect
    _write_debug_pdf(
        page_rect.width,
        page_rect.height,
        _debug_layers(bboxes, droped_bboxes, expect_drop_bboxes),
        save_path,
    )


def debug_show_page(
    page,
    bboxes1: list,
    bboxes2: list,
    bboxes3: list,
):
    """Write ``./tmp/debug.pdf`` showing three groups of boxes for *page*.

    The output page has the size of *page*. ``bboxes1`` get a red outline
    with blue fill, ``bboxes2`` a yellow fill and ``bboxes3`` a red outline
    only. An existing file is replaced.
    """
    save_path = "./tmp/debug.pdf"
    _write_debug_pdf(
        page.rect.width,
        page.rect.height,
        _debug_layers(bboxes1, bboxes2, bboxes3),
        save_path,
    )
def draw_layout_bbox_on_page(
    raw_pdf_doc: fitz.Document, paras_dict: dict, header, footer, pdf_path: str
):
    """Draw the layout boxes of every page entry and save the PDF at ``pdf_path``.

    Each value of ``paras_dict`` must provide ``"page_idx"`` and
    ``"layout_bboxes"``; every layout in turn provides ``"layout_bbox"`` and
    ``"layout_label"``.  Boxes get a red outline (pink fill when the label is
    ``"U"``) plus their running index as text.  ``header`` and ``footer``, when
    truthy, are drawn on each visited page as translucent black rectangles.

    ``raw_pdf_doc`` is accepted but not used by this function.

    NOTE(review): when ``pdf_path`` does not exist the document is created
    empty, so ``doc[page_idx]`` presumably fails for lack of pages — confirm
    whether that branch is ever exercised.
    """
    # Work on the PDF already on disk if there is one, else on a blank document.
    pdf_exists = os.path.exists(pdf_path)
    doc = fitz.open(pdf_path) if pdf_exists else fitz.open("")

    for page_info in paras_dict.values():
        page_idx = page_info["page_idx"]
        layouts = page_info["layout_bboxes"]
        shape = doc[page_idx].new_shape()

        for order, layout in enumerate(layouts):
            border_offset = 1
            raw_box = layout["layout_bbox"]
            has_u_label = layout["layout_label"] == "U"
            fill_color = fitz.pdfcolor["pink"] if has_u_label else None

            # Narrow the box by one unit on each side and widen it vertically
            # by the border offset.
            left = raw_box[0] + 1
            top = raw_box[1] - border_offset
            right = raw_box[2] - 1
            bottom = raw_box[3] + border_offset
            shape.draw_rect(fitz.Rect(left, top, right, bottom))
            shape.finish(color=fitz.pdfcolor["red"], fill=fill_color, fill_opacity=0.4)

            # Draw the order index as text on the layout box.
            font_size = 10
            shape.insert_text(
                (left + 1, top + font_size),
                f"{order}",
                fontsize=font_size,
                color=(0, 0, 0),
            )

        # Draw the header and footer areas.
        for band in (header, footer):
            if band:
                shape.draw_rect(fitz.Rect(band))
                shape.finish(color=None, fill=fitz.pdfcolor["black"], fill_opacity=0.2)

        shape.commit()

    if pdf_exists:
        doc.saveIncr()
    else:
        doc.save(pdf_path)
    doc.close()
def draw_layout_on_page(
    raw_pdf_doc: fitz.Document, page_idx: int, page_layout: list, pdf_path: str
):
    """Deprecated: outline the layout boxes of one page of ``pdf_path`` in red.

    Emits a ``DeprecationWarning`` on every call.  (The previous
    ``@DeprecationWarning`` decorator replaced the function with an exception
    instance, which made it impossible to call at all.)

    Args:
        raw_pdf_doc: Accepted but not used by this function.
        page_idx: Index of the page inside the document at ``pdf_path``.
        page_layout: Layout dicts with ``"layout_bbox"``, ``"layout_label"``
            and ``"sub_layout"`` (a list of nested layout dicts).
        pdf_path: PDF to draw on; saved incrementally when it already exists.

    NOTE(review): when ``pdf_path`` does not exist the document is created
    empty, so ``doc[page_idx]`` presumably fails for lack of pages — confirm.
    """
    import warnings

    warnings.warn(
        "draw_layout_on_page is deprecated",
        DeprecationWarning,
        stacklevel=2,
    )

    def draw(shape, layout, fill_color=fitz.pdfcolor["pink"]):
        """Draw ``layout`` if it has no children, then recurse into its children.

        Nested layouts are drawn with the default ``fill_color`` because the
        recursive call does not forward the caller's value.
        """
        border_offset = 1
        rect_box = layout["layout_bbox"]
        layout_label = layout["layout_label"]
        sub_layouts = layout["sub_layout"]

        if not sub_layouts:
            # Only layouts labelled "U" are filled; all others are outline-only.
            leaf_fill = fill_color if layout_label == "U" else None
            rect = fitz.Rect(
                rect_box[0] + 1,
                rect_box[1] - border_offset,
                rect_box[2] - 1,
                rect_box[3] + border_offset,
            )
            shape.draw_rect(rect)
            shape.finish(color=fitz.pdfcolor["red"], fill=leaf_fill, fill_opacity=0.2)

        for child in sub_layouts:
            draw(shape, child)
        shape.commit()

    # Open the existing PDF, or create a new empty one when it is missing.
    is_new_pdf = not os.path.exists(pdf_path)
    doc = fitz.open("") if is_new_pdf else fitz.open(pdf_path)
    try:
        page = doc[page_idx]
        shape = page.new_shape()
        for layout in page_layout:
            draw(shape, layout, fitz.pdfcolor["yellow"])

        # A bare file name has an empty dirname, which makedirs rejects.
        parent_dir = os.path.dirname(pdf_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        if is_new_pdf:
            doc.save(pdf_path)
        else:
            doc.saveIncr()
    finally:
        # Release the document even when drawing or saving fails.
        doc.close()
...@@ -36,12 +36,15 @@ if __name__ == '__main__': ...@@ -36,12 +36,15 @@ if __name__ == '__main__':
"paddlepaddle==3.0.0b1;platform_system=='Linux'", "paddlepaddle==3.0.0b1;platform_system=='Linux'",
"paddlepaddle==2.6.1;platform_system=='Windows' or platform_system=='Darwin'", "paddlepaddle==2.6.1;platform_system=='Windows' or platform_system=='Darwin'",
], ],
"full": ["unimernet==0.1.6", "full": ["unimernet==0.1.6", # 0.1.6版本大幅裁剪依赖包范围,推荐使用此版本
"matplotlib", "matplotlib<=3.9.0;platform_system=='Windows'", # 3.9.1及之后不提供windows的预编译包,避免一些没有编译环境的windows设备安装失败
"ultralytics", "matplotlib;platform_system=='Linux' or platform_system=='Darwin'", # linux 和 macos 不应限制matplotlib的最高版本,以避免无法更新导致的一些bug
"paddleocr==2.7.3", "ultralytics", # yolov8,公式检测
"paddlepaddle==3.0.0b1;platform_system=='Linux'", "paddleocr==2.7.3", # 2.8.0及2.8.1版本与detectron2有冲突,需锁定2.7.3
"paddlepaddle==2.6.1;platform_system=='Windows' or platform_system=='Darwin'", "paddlepaddle==3.0.0b1;platform_system=='Linux'", # 解决linux的段异常问题
"paddlepaddle==2.6.1;platform_system=='Windows' or platform_system=='Darwin'", # windows版本3.0.0b1效率下降,需锁定2.6.1
"pypandoc", # 表格解析latex转html
"struct-eqtable==0.1.0", # 表格解析
"detectron2" "detectron2"
], ],
}, },
...@@ -52,7 +55,8 @@ if __name__ == '__main__': ...@@ -52,7 +55,8 @@ if __name__ == '__main__':
python_requires=">=3.9", # 项目依赖的 Python 版本 python_requires=">=3.9", # 项目依赖的 Python 版本
entry_points={ entry_points={
"console_scripts": [ "console_scripts": [
"magic-pdf = magic_pdf.cli.magicpdf:cli" "magic-pdf = magic_pdf.tools.cli:cli",
"magic-pdf-dev = magic_pdf.tools.cli_dev:cli"
], ],
}, # 项目提供的可执行命令 }, # 项目提供的可执行命令
include_package_data=True, # 是否包含非代码文件,如数据文件、配置文件等 include_package_data=True, # 是否包含非代码文件,如数据文件、配置文件等
......
{"file_location":"tests/test_tools/assets/cli_dev/cli_test_01.pdf","doc_layout_result":[{"layout_dets":[{"category_id":1,"poly":[882.4013061523438,169.93817138671875,1552.350341796875,169.93817138671875,1552.350341796875,625.8263549804688,882.4013061523438,625.8263549804688],"score":0.999992311000824},{"category_id":1,"poly":[882.474853515625,1450.92822265625,1551.4490966796875,1450.92822265625,1551.4490966796875,1877.5712890625,882.474853515625,1877.5712890625],"score":0.9999903440475464},{"category_id":1,"poly":[881.6513061523438,626.2058715820312,1552.1400146484375,626.2058715820312,1552.1400146484375,1450.604736328125,881.6513061523438,1450.604736328125],"score":0.9999856352806091},{"category_id":1,"poly":[149.41075134277344,232.1595001220703,819.0465087890625,232.1595001220703,819.0465087890625,625.8865356445312,149.41075134277344,625.8865356445312],"score":0.99998539686203},{"category_id":1,"poly":[149.3945770263672,1215.5172119140625,817.8850708007812,1215.5172119140625,817.8850708007812,1304.873291015625,149.3945770263672,1304.873291015625],"score":0.9999765157699585},{"category_id":1,"poly":[882.6979370117188,1880.13916015625,1552.15185546875,1880.13916015625,1552.15185546875,2031.339599609375,882.6979370117188,2031.339599609375],"score":0.9999744892120361},{"category_id":1,"poly":[148.96054077148438,743.3055419921875,818.6231689453125,743.3055419921875,818.6231689453125,1074.2369384765625,148.96054077148438,1074.2369384765625],"score":0.9999669790267944},{"category_id":1,"poly":[148.8435516357422,1791.14306640625,818.6885375976562,1791.14306640625,818.6885375976562,2030.794189453125,148.8435516357422,2030.794189453125],"score":0.9999618530273438},{"category_id":0,"poly":[150.7009735107422,684.0087890625,623.5106201171875,684.0087890625,623.5106201171875,717.03662109375,150.7009735107422,717.03662109375],"score":0.9999415278434753},{"category_id":8,"poly":[146.48068237304688,1331.6737060546875,317.2640075683594,1331.6737060546875,317.2640075683594,1400.1722
412109375,146.48068237304688,1400.1722412109375],"score":0.9998958110809326},{"category_id":1,"poly":[149.42420959472656,1430.8782958984375,818.9042358398438,1430.8782958984375,818.9042358398438,1672.7386474609375,149.42420959472656,1672.7386474609375],"score":0.9998599290847778},{"category_id":1,"poly":[149.18746948242188,172.10252380371094,818.5662231445312,172.10252380371094,818.5662231445312,230.4594268798828,149.18746948242188,230.4594268798828],"score":0.9997718334197998},{"category_id":0,"poly":[149.0175018310547,1732.1090087890625,702.1005859375,1732.1090087890625,702.1005859375,1763.6046142578125,149.0175018310547,1763.6046142578125],"score":0.9997085928916931},{"category_id":2,"poly":[1519.802490234375,98.59099578857422,1551.985107421875,98.59099578857422,1551.985107421875,119.48420715332031,1519.802490234375,119.48420715332031],"score":0.9995552897453308},{"category_id":8,"poly":[146.9109649658203,1100.156494140625,544.2803344726562,1100.156494140625,544.2803344726562,1184.929443359375,146.9109649658203,1184.929443359375],"score":0.9995207786560059},{"category_id":2,"poly":[148.11611938476562,99.87767791748047,318.926025390625,99.87767791748047,318.926025390625,120.70393371582031,148.11611938476562,120.70393371582031],"score":0.999351441860199},{"category_id":9,"poly":[791.7642211914062,1130.056396484375,818.6940307617188,1130.056396484375,818.6940307617188,1161.1080322265625,791.7642211914062,1161.1080322265625],"score":0.9908884763717651},{"category_id":9,"poly":[788.37060546875,1346.8450927734375,818.5010986328125,1346.8450927734375,818.5010986328125,1377.370361328125,788.37060546875,1377.370361328125],"score":0.9873985052108765},{"category_id":14,"poly":[146,1103,543,1103,543,1184,146,1184],"score":0.94,"latex":"E\\!\\left(W\\right)\\!=\\!\\frac{E\\!\\left[H^{2}\\right]}{2E\\!\\left[H\\right]}\\!=\\!\\frac{E\\!\\left[H\\right]}{2}\\!\\!\\left(1\\!+\\!\\operatorname{CV}\\!\\left(H\\right)^{2}\\right)"},{"category_id":13,"poly":[1196,354,1278,354,1278,3
84,1196,384],"score":0.91,"latex":"p(1-q)"},{"category_id":13,"poly":[881,415,1020,415,1020,444,881,444],"score":0.91,"latex":"(1-p)(1-q)"},{"category_id":14,"poly":[147,1333,318,1333,318,1400,147,1400],"score":0.91,"latex":"\\mathbf{CV}\\big(H\\big)\\!=\\!\\frac{\\boldsymbol{\\upsigma}_{H}}{E\\big[H\\big]}"},{"category_id":13,"poly":[1197,657,1263,657,1263,686,1197,686],"score":0.9,"latex":"(1-p)"},{"category_id":13,"poly":[213,1217,263,1217,263,1244,213,1244],"score":0.88,"latex":"E[X]"},{"category_id":13,"poly":[214,1434,245,1434,245,1459,214,1459],"score":0.87,"latex":"\\upsigma_{H}"},{"category_id":13,"poly":[324,2002,373,2002,373,2028,324,2028],"score":0.84,"latex":"30\\%"},{"category_id":13,"poly":[1209,693,1225,693,1225,717,1209,717],"score":0.83,"latex":"p"},{"category_id":13,"poly":[990,449,1007,449,1007,474,990,474],"score":0.81,"latex":"p"},{"category_id":13,"poly":[346,1277,369,1277,369,1301,346,1301],"score":0.81,"latex":"H"},{"category_id":13,"poly":[1137,661,1154,661,1154,686,1137,686],"score":0.81,"latex":"p"},{"category_id":13,"poly":[522,1432,579,1432,579,1459,522,1459],"score":0.81,"latex":"H\\left(4\\right)"},{"category_id":13,"poly":[944,540,962,540,962,565,944,565],"score":0.8,"latex":"p"},{"category_id":13,"poly":[1444,936,1461,936,1461,961,1444,961],"score":0.79,"latex":"p"},{"category_id":13,"poly":[602,1247,624,1247,624,1270,602,1270],"score":0.78,"latex":"H"},{"category_id":13,"poly":[147,1247,167,1247,167,1271,147,1271],"score":0.77,"latex":"X"},{"category_id":13,"poly":[210,1246,282,1246,282,1274,210,1274],"score":0.77,"latex":"\\operatorname{CV}(H)"},{"category_id":13,"poly":[1346,268,1361,268,1361,292,1346,292],"score":0.76,"latex":"q"},{"category_id":13,"poly":[215,957,238,957,238,981,215,981],"score":0.74,"latex":"H"},{"category_id":13,"poly":[149,956,173,956,173,981,149,981],"score":0.63,"latex":"W"},{"category_id":13,"poly":[924,841,1016,841,1016,868,924,868],"score":0.56,"latex":"8{\\cdot}00\\;\\mathrm{a.m}"},{"category_id":13,"p
oly":[956,871,1032,871,1032,898,956,898],"score":0.43,"latex":"20~\\mathrm{min}"},{"category_id":13,"poly":[1082,781,1112,781,1112,808,1082,808],"score":0.41,"latex":"(l)"},{"category_id":13,"poly":[697,1821,734,1821,734,1847,697,1847],"score":0.3,"latex":"^{1\\mathrm{~h~}}"}],"page_info":{"page_no":0,"height":2200,"width":1700}}]}
\ No newline at end of file
[
{
"layout_dets": [
{
"category_id": 1,
"poly": [
882.4013061523438,
169.93817138671875,
1552.350341796875,
169.93817138671875,
1552.350341796875,
625.8263549804688,
882.4013061523438,
625.8263549804688
],
"score": 0.999992311000824
},
{
"category_id": 1,
"poly": [
882.474853515625,
1450.92822265625,
1551.4490966796875,
1450.92822265625,
1551.4490966796875,
1877.5712890625,
882.474853515625,
1877.5712890625
],
"score": 0.9999903440475464
},
{
"category_id": 1,
"poly": [
881.6513061523438,
626.2058715820312,
1552.1400146484375,
626.2058715820312,
1552.1400146484375,
1450.604736328125,
881.6513061523438,
1450.604736328125
],
"score": 0.9999856352806091
},
{
"category_id": 1,
"poly": [
149.41075134277344,
232.1595001220703,
819.0465087890625,
232.1595001220703,
819.0465087890625,
625.8865356445312,
149.41075134277344,
625.8865356445312
],
"score": 0.99998539686203
},
{
"category_id": 1,
"poly": [
149.3945770263672,
1215.5172119140625,
817.8850708007812,
1215.5172119140625,
817.8850708007812,
1304.873291015625,
149.3945770263672,
1304.873291015625
],
"score": 0.9999765157699585
},
{
"category_id": 1,
"poly": [
882.6979370117188,
1880.13916015625,
1552.15185546875,
1880.13916015625,
1552.15185546875,
2031.339599609375,
882.6979370117188,
2031.339599609375
],
"score": 0.9999744892120361
},
{
"category_id": 1,
"poly": [
148.96054077148438,
743.3055419921875,
818.6231689453125,
743.3055419921875,
818.6231689453125,
1074.2369384765625,
148.96054077148438,
1074.2369384765625
],
"score": 0.9999669790267944
},
{
"category_id": 1,
"poly": [
148.8435516357422,
1791.14306640625,
818.6885375976562,
1791.14306640625,
818.6885375976562,
2030.794189453125,
148.8435516357422,
2030.794189453125
],
"score": 0.9999618530273438
},
{
"category_id": 0,
"poly": [
150.7009735107422,
684.0087890625,
623.5106201171875,
684.0087890625,
623.5106201171875,
717.03662109375,
150.7009735107422,
717.03662109375
],
"score": 0.9999415278434753
},
{
"category_id": 8,
"poly": [
146.48068237304688,
1331.6737060546875,
317.2640075683594,
1331.6737060546875,
317.2640075683594,
1400.1722412109375,
146.48068237304688,
1400.1722412109375
],
"score": 0.9998958110809326
},
{
"category_id": 1,
"poly": [
149.42420959472656,
1430.8782958984375,
818.9042358398438,
1430.8782958984375,
818.9042358398438,
1672.7386474609375,
149.42420959472656,
1672.7386474609375
],
"score": 0.9998599290847778
},
{
"category_id": 1,
"poly": [
149.18746948242188,
172.10252380371094,
818.5662231445312,
172.10252380371094,
818.5662231445312,
230.4594268798828,
149.18746948242188,
230.4594268798828
],
"score": 0.9997718334197998
},
{
"category_id": 0,
"poly": [
149.0175018310547,
1732.1090087890625,
702.1005859375,
1732.1090087890625,
702.1005859375,
1763.6046142578125,
149.0175018310547,
1763.6046142578125
],
"score": 0.9997085928916931
},
{
"category_id": 2,
"poly": [
1519.802490234375,
98.59099578857422,
1551.985107421875,
98.59099578857422,
1551.985107421875,
119.48420715332031,
1519.802490234375,
119.48420715332031
],
"score": 0.9995552897453308
},
{
"category_id": 8,
"poly": [
146.9109649658203,
1100.156494140625,
544.2803344726562,
1100.156494140625,
544.2803344726562,
1184.929443359375,
146.9109649658203,
1184.929443359375
],
"score": 0.9995207786560059
},
{
"category_id": 2,
"poly": [
148.11611938476562,
99.87767791748047,
318.926025390625,
99.87767791748047,
318.926025390625,
120.70393371582031,
148.11611938476562,
120.70393371582031
],
"score": 0.999351441860199
},
{
"category_id": 9,
"poly": [
791.7642211914062,
1130.056396484375,
818.6940307617188,
1130.056396484375,
818.6940307617188,
1161.1080322265625,
791.7642211914062,
1161.1080322265625
],
"score": 0.9908884763717651
},
{
"category_id": 9,
"poly": [
788.37060546875,
1346.8450927734375,
818.5010986328125,
1346.8450927734375,
818.5010986328125,
1377.370361328125,
788.37060546875,
1377.370361328125
],
"score": 0.9873985052108765
},
{
"category_id": 14,
"poly": [
146,
1103,
543,
1103,
543,
1184,
146,
1184
],
"score": 0.94,
"latex": "E\\!\\left(W\\right)\\!=\\!\\frac{E\\!\\left[H^{2}\\right]}{2E\\!\\left[H\\right]}\\!=\\!\\frac{E\\!\\left[H\\right]}{2}\\!\\!\\left(1\\!+\\!\\operatorname{CV}\\!\\left(H\\right)^{2}\\right)"
},
{
"category_id": 13,
"poly": [
1196,
354,
1278,
354,
1278,
384,
1196,
384
],
"score": 0.91,
"latex": "p(1-q)"
},
{
"category_id": 13,
"poly": [
881,
415,
1020,
415,
1020,
444,
881,
444
],
"score": 0.91,
"latex": "(1-p)(1-q)"
},
{
"category_id": 14,
"poly": [
147,
1333,
318,
1333,
318,
1400,
147,
1400
],
"score": 0.91,
"latex": "\\mathbf{CV}\\big(H\\big)\\!=\\!\\frac{\\boldsymbol{\\upsigma}_{H}}{E\\big[H\\big]}"
},
{
"category_id": 13,
"poly": [
1197,
657,
1263,
657,
1263,
686,
1197,
686
],
"score": 0.9,
"latex": "(1-p)"
},
{
"category_id": 13,
"poly": [
213,
1217,
263,
1217,
263,
1244,
213,
1244
],
"score": 0.88,
"latex": "E[X]"
},
{
"category_id": 13,
"poly": [
214,
1434,
245,
1434,
245,
1459,
214,
1459
],
"score": 0.87,
"latex": "\\upsigma_{H}"
},
{
"category_id": 13,
"poly": [
324,
2002,
373,
2002,
373,
2028,
324,
2028
],
"score": 0.84,
"latex": "30\\%"
},
{
"category_id": 13,
"poly": [
1209,
693,
1225,
693,
1225,
717,
1209,
717
],
"score": 0.83,
"latex": "p"
},
{
"category_id": 13,
"poly": [
990,
449,
1007,
449,
1007,
474,
990,
474
],
"score": 0.81,
"latex": "p"
},
{
"category_id": 13,
"poly": [
346,
1277,
369,
1277,
369,
1301,
346,
1301
],
"score": 0.81,
"latex": "H"
},
{
"category_id": 13,
"poly": [
1137,
661,
1154,
661,
1154,
686,
1137,
686
],
"score": 0.81,
"latex": "p"
},
{
"category_id": 13,
"poly": [
522,
1432,
579,
1432,
579,
1459,
522,
1459
],
"score": 0.81,
"latex": "H\\left(4\\right)"
},
{
"category_id": 13,
"poly": [
944,
540,
962,
540,
962,
565,
944,
565
],
"score": 0.8,
"latex": "p"
},
{
"category_id": 13,
"poly": [
1444,
936,
1461,
936,
1461,
961,
1444,
961
],
"score": 0.79,
"latex": "p"
},
{
"category_id": 13,
"poly": [
602,
1247,
624,
1247,
624,
1270,
602,
1270
],
"score": 0.78,
"latex": "H"
},
{
"category_id": 13,
"poly": [
147,
1247,
167,
1247,
167,
1271,
147,
1271
],
"score": 0.77,
"latex": "X"
},
{
"category_id": 13,
"poly": [
210,
1246,
282,
1246,
282,
1274,
210,
1274
],
"score": 0.77,
"latex": "\\operatorname{CV}(H)"
},
{
"category_id": 13,
"poly": [
1346,
268,
1361,
268,
1361,
292,
1346,
292
],
"score": 0.76,
"latex": "q"
},
{
"category_id": 13,
"poly": [
215,
957,
238,
957,
238,
981,
215,
981
],
"score": 0.74,
"latex": "H"
},
{
"category_id": 13,
"poly": [
149,
956,
173,
956,
173,
981,
149,
981
],
"score": 0.63,
"latex": "W"
},
{
"category_id": 13,
"poly": [
924,
841,
1016,
841,
1016,
868,
924,
868
],
"score": 0.56,
"latex": "8{\\cdot}00\\;\\mathrm{a.m}"
},
{
"category_id": 13,
"poly": [
956,
871,
1032,
871,
1032,
898,
956,
898
],
"score": 0.43,
"latex": "20~\\mathrm{min}"
},
{
"category_id": 13,
"poly": [
1082,
781,
1112,
781,
1112,
808,
1082,
808
],
"score": 0.41,
"latex": "(l)"
},
{
"category_id": 13,
"poly": [
697,
1821,
734,
1821,
734,
1847,
697,
1847
],
"score": 0.3,
"latex": "^{1\\mathrm{~h~}}"
}
],
"page_info": {
"page_no": 0,
"height": 2200,
"width": 1700
}
}
]
\ No newline at end of file
import tempfile
import os
import shutil
from click.testing import CliRunner
from magic_pdf.tools.cli import cli
def test_cli_pdf():
    """Run the CLI on a single PDF and check the artifacts it writes.

    The output directory is removed in a ``finally`` clause so a failing
    assertion no longer leaves temporary files behind.
    """
    # setup
    unitest_dir = "/tmp/magic_pdf/unittest/tools"
    filename = "cli_test_01"
    os.makedirs(unitest_dir, exist_ok=True)
    temp_output_dir = tempfile.mkdtemp(dir=unitest_dir)

    try:
        # run
        runner = CliRunner()
        result = runner.invoke(
            cli,
            [
                "-p",
                "tests/test_tools/assets/cli/pdf/cli_test_01.pdf",
                "-o",
                temp_output_dir,
            ],
        )

        # check
        assert result.exit_code == 0

        base_output_dir = os.path.join(temp_output_dir, filename, "auto")
        # (file name, size in bytes the file must exceed)
        min_sizes = (
            (f"{filename}.md", 7000),
            ("middle.json", 200000),
            ("model.json", 15000),
            ("origin.pdf", 500000),
            ("layout.pdf", 500000),
            ("spans.pdf", 500000),
        )
        for name, min_size in min_sizes:
            size = os.stat(os.path.join(base_output_dir, name)).st_size
            assert size > min_size, name

        assert os.path.isdir(os.path.join(base_output_dir, "images"))
        assert not os.path.exists(os.path.join(base_output_dir, "content_list.json"))
    finally:
        # teardown
        shutil.rmtree(temp_output_dir)
def test_cli_path():
    """Run the CLI on a directory of PDFs and check the artifacts per document.

    The output directory is removed in a ``finally`` clause so a failing
    assertion no longer leaves temporary files behind.
    """
    # setup
    unitest_dir = "/tmp/magic_pdf/unittest/tools"
    os.makedirs(unitest_dir, exist_ok=True)
    temp_output_dir = tempfile.mkdtemp(dir=unitest_dir)

    try:
        # run
        runner = CliRunner()
        result = runner.invoke(
            cli, ["-p", "tests/test_tools/assets/cli/path", "-o", temp_output_dir]
        )

        # check
        assert result.exit_code == 0

        # The two documents differ only in the size their markdown must exceed.
        for filename, md_min_size in (("cli_test_01", 7000), ("cli_test_02", 5000)):
            base_output_dir = os.path.join(temp_output_dir, filename, "auto")
            # (file name, size in bytes the file must exceed)
            min_sizes = (
                (f"{filename}.md", md_min_size),
                ("middle.json", 200000),
                ("model.json", 15000),
                ("origin.pdf", 500000),
                ("layout.pdf", 500000),
                ("spans.pdf", 500000),
            )
            for name, min_size in min_sizes:
                size = os.stat(os.path.join(base_output_dir, name)).st_size
                assert size > min_size, f"{filename}: {name}"

            assert os.path.isdir(os.path.join(base_output_dir, "images"))
            assert not os.path.exists(
                os.path.join(base_output_dir, "content_list.json")
            )
    finally:
        # teardown
        shutil.rmtree(temp_output_dir)
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment