Unverified Commit 6ab12348 authored by Xiaomeng Zhao's avatar Xiaomeng Zhao Committed by GitHub
Browse files

Merge pull request #2625 from opendatalab/release-2.0.0

Release 2.0.0
parents 9487d33d 4fbec469
from .visualizer import Visualizer
from .rcnn_vl import *
from .backbone import *
from detectron2.config import get_cfg
from detectron2.config import CfgNode as CN
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.data.datasets import register_coco_instances
from detectron2.engine import DefaultTrainer, default_argument_parser, default_setup, launch, DefaultPredictor
def add_vit_config(cfg):
"""
Add config for VIT.
"""
_C = cfg
_C.MODEL.VIT = CN()
# CoaT model name.
_C.MODEL.VIT.NAME = ""
# Output features from CoaT backbone.
_C.MODEL.VIT.OUT_FEATURES = ["layer3", "layer5", "layer7", "layer11"]
_C.MODEL.VIT.IMG_SIZE = [224, 224]
_C.MODEL.VIT.POS_TYPE = "shared_rel"
_C.MODEL.VIT.DROP_PATH = 0.
_C.MODEL.VIT.MODEL_KWARGS = "{}"
_C.SOLVER.OPTIMIZER = "ADAMW"
_C.SOLVER.BACKBONE_MULTIPLIER = 1.0
_C.AUG = CN()
_C.AUG.DETR = False
_C.MODEL.IMAGE_ONLY = True
_C.PUBLAYNET_DATA_DIR_TRAIN = ""
_C.PUBLAYNET_DATA_DIR_TEST = ""
_C.FOOTNOTE_DATA_DIR_TRAIN = ""
_C.FOOTNOTE_DATA_DIR_VAL = ""
_C.SCIHUB_DATA_DIR_TRAIN = ""
_C.SCIHUB_DATA_DIR_TEST = ""
_C.JIAOCAI_DATA_DIR_TRAIN = ""
_C.JIAOCAI_DATA_DIR_TEST = ""
_C.ICDAR_DATA_DIR_TRAIN = ""
_C.ICDAR_DATA_DIR_TEST = ""
_C.M6DOC_DATA_DIR_TEST = ""
_C.DOCSTRUCTBENCH_DATA_DIR_TEST = ""
_C.DOCSTRUCTBENCHv2_DATA_DIR_TEST = ""
_C.CACHE_DIR = ""
_C.MODEL.CONFIG_PATH = ""
# effective update steps would be MAX_ITER/GRADIENT_ACCUMULATION_STEPS
# maybe need to set MAX_ITER *= GRADIENT_ACCUMULATION_STEPS
_C.SOLVER.GRADIENT_ACCUMULATION_STEPS = 1
def setup(args, device):
"""
Create configs and perform basic setups.
"""
cfg = get_cfg()
# add_coat_config(cfg)
add_vit_config(cfg)
cfg.merge_from_file(args.config_file)
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.2 # set threshold for this model
cfg.merge_from_list(args.opts)
# 使用统一的device配置
cfg.MODEL.DEVICE = device
cfg.freeze()
default_setup(cfg, args)
#@todo 可以删掉这块?
# register_coco_instances(
# "scihub_train",
# {},
# cfg.SCIHUB_DATA_DIR_TRAIN + ".json",
# cfg.SCIHUB_DATA_DIR_TRAIN
# )
return cfg
class DotDict(dict):
def __init__(self, *args, **kwargs):
super(DotDict, self).__init__(*args, **kwargs)
def __getattr__(self, key):
if key not in self.keys():
return None
value = self[key]
if isinstance(value, dict):
value = DotDict(value)
return value
def __setattr__(self, key, value):
self[key] = value
class Layoutlmv3_Predictor(object):
def __init__(self, weights, config_file, device):
layout_args = {
"config_file": config_file,
"resume": False,
"eval_only": False,
"num_gpus": 1,
"num_machines": 1,
"machine_rank": 0,
"dist_url": "tcp://127.0.0.1:57823",
"opts": ["MODEL.WEIGHTS", weights],
}
layout_args = DotDict(layout_args)
cfg = setup(layout_args, device)
self.mapping = ["title", "plain text", "abandon", "figure", "figure_caption", "table", "table_caption",
"table_footnote", "isolate_formula", "formula_caption"]
MetadataCatalog.get(cfg.DATASETS.TRAIN[0]).thing_classes = self.mapping
self.predictor = DefaultPredictor(cfg)
def __call__(self, image, ignore_catids=[]):
# page_layout_result = {
# "layout_dets": []
# }
layout_dets = []
outputs = self.predictor(image)
boxes = outputs["instances"].to("cpu")._fields["pred_boxes"].tensor.tolist()
labels = outputs["instances"].to("cpu")._fields["pred_classes"].tolist()
scores = outputs["instances"].to("cpu")._fields["scores"].tolist()
for bbox_idx in range(len(boxes)):
if labels[bbox_idx] in ignore_catids:
continue
layout_dets.append({
"category_id": labels[bbox_idx],
"poly": [
boxes[bbox_idx][0], boxes[bbox_idx][1],
boxes[bbox_idx][2], boxes[bbox_idx][1],
boxes[bbox_idx][2], boxes[bbox_idx][3],
boxes[bbox_idx][0], boxes[bbox_idx][3],
],
"score": scores[bbox_idx]
})
return layout_dets
# Copyright (c) Facebook, Inc. and its affiliates.
import logging
import numpy as np
from typing import Dict, List, Optional, Tuple
import torch
from torch import nn
from detectron2.config import configurable
from detectron2.structures import ImageList, Instances
from detectron2.utils.events import get_event_storage
from detectron2.modeling.backbone import Backbone, build_backbone
from detectron2.modeling.meta_arch.build import META_ARCH_REGISTRY
from detectron2.modeling.meta_arch import GeneralizedRCNN
from detectron2.modeling.postprocessing import detector_postprocess
from detectron2.modeling.roi_heads.fast_rcnn import fast_rcnn_inference_single_image
from contextlib import contextmanager
from itertools import count
@META_ARCH_REGISTRY.register()
class VLGeneralizedRCNN(GeneralizedRCNN):
"""
Generalized R-CNN. Any models that contains the following three components:
1. Per-image feature extraction (aka backbone)
2. Region proposal generation
3. Per-region feature extraction and prediction
"""
def forward(self, batched_inputs: List[Dict[str, torch.Tensor]]):
"""
Args:
batched_inputs: a list, batched outputs of :class:`DatasetMapper` .
Each item in the list contains the inputs for one image.
For now, each item in the list is a dict that contains:
* image: Tensor, image in (C, H, W) format.
* instances (optional): groundtruth :class:`Instances`
* proposals (optional): :class:`Instances`, precomputed proposals.
Other information that's included in the original dicts, such as:
* "height", "width" (int): the output resolution of the model, used in inference.
See :meth:`postprocess` for details.
Returns:
list[dict]:
Each dict is the output for one input image.
The dict contains one key "instances" whose value is a :class:`Instances`.
The :class:`Instances` object has the following keys:
"pred_boxes", "pred_classes", "scores", "pred_masks", "pred_keypoints"
"""
if not self.training:
return self.inference(batched_inputs)
images = self.preprocess_image(batched_inputs)
if "instances" in batched_inputs[0]:
gt_instances = [x["instances"].to(self.device) for x in batched_inputs]
else:
gt_instances = None
# features = self.backbone(images.tensor)
input = self.get_batch(batched_inputs, images)
features = self.backbone(input)
if self.proposal_generator is not None:
proposals, proposal_losses = self.proposal_generator(images, features, gt_instances)
else:
assert "proposals" in batched_inputs[0]
proposals = [x["proposals"].to(self.device) for x in batched_inputs]
proposal_losses = {}
_, detector_losses = self.roi_heads(images, features, proposals, gt_instances)
if self.vis_period > 0:
storage = get_event_storage()
if storage.iter % self.vis_period == 0:
self.visualize_training(batched_inputs, proposals)
losses = {}
losses.update(detector_losses)
losses.update(proposal_losses)
return losses
def inference(
self,
batched_inputs: List[Dict[str, torch.Tensor]],
detected_instances: Optional[List[Instances]] = None,
do_postprocess: bool = True,
):
"""
Run inference on the given inputs.
Args:
batched_inputs (list[dict]): same as in :meth:`forward`
detected_instances (None or list[Instances]): if not None, it
contains an `Instances` object per image. The `Instances`
object contains "pred_boxes" and "pred_classes" which are
known boxes in the image.
The inference will then skip the detection of bounding boxes,
and only predict other per-ROI outputs.
do_postprocess (bool): whether to apply post-processing on the outputs.
Returns:
When do_postprocess=True, same as in :meth:`forward`.
Otherwise, a list[Instances] containing raw network outputs.
"""
assert not self.training
images = self.preprocess_image(batched_inputs)
# features = self.backbone(images.tensor)
input = self.get_batch(batched_inputs, images)
features = self.backbone(input)
if detected_instances is None:
if self.proposal_generator is not None:
proposals, _ = self.proposal_generator(images, features, None)
else:
assert "proposals" in batched_inputs[0]
proposals = [x["proposals"].to(self.device) for x in batched_inputs]
results, _ = self.roi_heads(images, features, proposals, None)
else:
detected_instances = [x.to(self.device) for x in detected_instances]
results = self.roi_heads.forward_with_given_boxes(features, detected_instances)
if do_postprocess:
assert not torch.jit.is_scripting(), "Scripting is not supported for postprocess."
return GeneralizedRCNN._postprocess(results, batched_inputs, images.image_sizes)
else:
return results
def get_batch(self, examples, images):
if len(examples) >= 1 and "bbox" not in examples[0]: # image_only
return {"images": images.tensor}
return input
def _batch_inference(self, batched_inputs, detected_instances=None):
"""
Execute inference on a list of inputs,
using batch size = self.batch_size (e.g., 2), instead of the length of the list.
Inputs & outputs have the same format as :meth:`GeneralizedRCNN.inference`
"""
if detected_instances is None:
detected_instances = [None] * len(batched_inputs)
outputs = []
inputs, instances = [], []
for idx, input, instance in zip(count(), batched_inputs, detected_instances):
inputs.append(input)
instances.append(instance)
if len(inputs) == 2 or idx == len(batched_inputs) - 1:
outputs.extend(
self.inference(
inputs,
instances if instances[0] is not None else None,
do_postprocess=True, # False
)
)
inputs, instances = [], []
return outputs
# Copyright (c) Facebook, Inc. and its affiliates.
import colorsys
import logging
import math
import numpy as np
from enum import Enum, unique
import cv2
import matplotlib as mpl
import matplotlib.colors as mplc
import matplotlib.figure as mplfigure
import pycocotools.mask as mask_util
import torch
from matplotlib.backends.backend_agg import FigureCanvasAgg
from PIL import Image
from detectron2.data import MetadataCatalog
from detectron2.structures import BitMasks, Boxes, BoxMode, Keypoints, PolygonMasks, RotatedBoxes
from detectron2.utils.file_io import PathManager
from detectron2.utils.colormap import random_color
import pdb
logger = logging.getLogger(__name__)
__all__ = ["ColorMode", "VisImage", "Visualizer"]
_SMALL_OBJECT_AREA_THRESH = 1000
_LARGE_MASK_AREA_THRESH = 120000
_OFF_WHITE = (1.0, 1.0, 240.0 / 255)
_BLACK = (0, 0, 0)
_RED = (1.0, 0, 0)
_KEYPOINT_THRESHOLD = 0.05
#CLASS_NAMES = ["footnote", "footer", "header"]
@unique
class ColorMode(Enum):
"""
Enum of different color modes to use for instance visualizations.
"""
IMAGE = 0
"""
Picks a random color for every instance and overlay segmentations with low opacity.
"""
SEGMENTATION = 1
"""
Let instances of the same category have similar colors
(from metadata.thing_colors), and overlay them with
high opacity. This provides more attention on the quality of segmentation.
"""
IMAGE_BW = 2
"""
Same as IMAGE, but convert all areas without masks to gray-scale.
Only available for drawing per-instance mask predictions.
"""
class GenericMask:
"""
Attribute:
polygons (list[ndarray]): list[ndarray]: polygons for this mask.
Each ndarray has format [x, y, x, y, ...]
mask (ndarray): a binary mask
"""
def __init__(self, mask_or_polygons, height, width):
self._mask = self._polygons = self._has_holes = None
self.height = height
self.width = width
m = mask_or_polygons
if isinstance(m, dict):
# RLEs
assert "counts" in m and "size" in m
if isinstance(m["counts"], list): # uncompressed RLEs
h, w = m["size"]
assert h == height and w == width
m = mask_util.frPyObjects(m, h, w)
self._mask = mask_util.decode(m)[:, :]
return
if isinstance(m, list): # list[ndarray]
self._polygons = [np.asarray(x).reshape(-1) for x in m]
return
if isinstance(m, np.ndarray): # assumed to be a binary mask
assert m.shape[1] != 2, m.shape
assert m.shape == (
height,
width,
), f"mask shape: {m.shape}, target dims: {height}, {width}"
self._mask = m.astype("uint8")
return
raise ValueError("GenericMask cannot handle object {} of type '{}'".format(m, type(m)))
@property
def mask(self):
if self._mask is None:
self._mask = self.polygons_to_mask(self._polygons)
return self._mask
@property
def polygons(self):
if self._polygons is None:
self._polygons, self._has_holes = self.mask_to_polygons(self._mask)
return self._polygons
@property
def has_holes(self):
if self._has_holes is None:
if self._mask is not None:
self._polygons, self._has_holes = self.mask_to_polygons(self._mask)
else:
self._has_holes = False # if original format is polygon, does not have holes
return self._has_holes
def mask_to_polygons(self, mask):
# cv2.RETR_CCOMP flag retrieves all the contours and arranges them to a 2-level
# hierarchy. External contours (boundary) of the object are placed in hierarchy-1.
# Internal contours (holes) are placed in hierarchy-2.
# cv2.CHAIN_APPROX_NONE flag gets vertices of polygons from contours.
mask = np.ascontiguousarray(mask) # some versions of cv2 does not support incontiguous arr
res = cv2.findContours(mask.astype("uint8"), cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE)
hierarchy = res[-1]
if hierarchy is None: # empty mask
return [], False
has_holes = (hierarchy.reshape(-1, 4)[:, 3] >= 0).sum() > 0
res = res[-2]
res = [x.flatten() for x in res]
# These coordinates from OpenCV are integers in range [0, W-1 or H-1].
# We add 0.5 to turn them into real-value coordinate space. A better solution
# would be to first +0.5 and then dilate the returned polygon by 0.5.
res = [x + 0.5 for x in res if len(x) >= 6]
return res, has_holes
def polygons_to_mask(self, polygons):
rle = mask_util.frPyObjects(polygons, self.height, self.width)
rle = mask_util.merge(rle)
return mask_util.decode(rle)[:, :]
def area(self):
return self.mask.sum()
def bbox(self):
p = mask_util.frPyObjects(self.polygons, self.height, self.width)
p = mask_util.merge(p)
bbox = mask_util.toBbox(p)
bbox[2] += bbox[0]
bbox[3] += bbox[1]
return bbox
class _PanopticPrediction:
"""
Unify different panoptic annotation/prediction formats
"""
def __init__(self, panoptic_seg, segments_info, metadata=None):
if segments_info is None:
assert metadata is not None
# If "segments_info" is None, we assume "panoptic_img" is a
# H*W int32 image storing the panoptic_id in the format of
# category_id * label_divisor + instance_id. We reserve -1 for
# VOID label.
label_divisor = metadata.label_divisor
segments_info = []
for panoptic_label in np.unique(panoptic_seg.numpy()):
if panoptic_label == -1:
# VOID region.
continue
pred_class = panoptic_label // label_divisor
isthing = pred_class in metadata.thing_dataset_id_to_contiguous_id.values()
segments_info.append(
{
"id": int(panoptic_label),
"category_id": int(pred_class),
"isthing": bool(isthing),
}
)
del metadata
self._seg = panoptic_seg
self._sinfo = {s["id"]: s for s in segments_info} # seg id -> seg info
segment_ids, areas = torch.unique(panoptic_seg, sorted=True, return_counts=True)
areas = areas.numpy()
sorted_idxs = np.argsort(-areas)
self._seg_ids, self._seg_areas = segment_ids[sorted_idxs], areas[sorted_idxs]
self._seg_ids = self._seg_ids.tolist()
for sid, area in zip(self._seg_ids, self._seg_areas):
if sid in self._sinfo:
self._sinfo[sid]["area"] = float(area)
def non_empty_mask(self):
"""
Returns:
(H, W) array, a mask for all pixels that have a prediction
"""
empty_ids = []
for id in self._seg_ids:
if id not in self._sinfo:
empty_ids.append(id)
if len(empty_ids) == 0:
return np.zeros(self._seg.shape, dtype=np.uint8)
assert (
len(empty_ids) == 1
), ">1 ids corresponds to no labels. This is currently not supported"
return (self._seg != empty_ids[0]).numpy().astype(np.bool)
def semantic_masks(self):
for sid in self._seg_ids:
sinfo = self._sinfo.get(sid)
if sinfo is None or sinfo["isthing"]:
# Some pixels (e.g. id 0 in PanopticFPN) have no instance or semantic predictions.
continue
yield (self._seg == sid).numpy().astype(np.bool), sinfo
def instance_masks(self):
for sid in self._seg_ids:
sinfo = self._sinfo.get(sid)
if sinfo is None or not sinfo["isthing"]:
continue
mask = (self._seg == sid).numpy().astype(np.bool)
if mask.sum() > 0:
yield mask, sinfo
def _create_text_labels(classes, scores, class_names, is_crowd=None):
"""
Args:
classes (list[int] or None):
scores (list[float] or None):
class_names (list[str] or None):
is_crowd (list[bool] or None):
Returns:
list[str] or None
"""
#class_names = CLASS_NAMES
labels = None
if classes is not None:
if class_names is not None and len(class_names) > 0:
labels = [class_names[i] for i in classes]
else:
labels = [str(i) for i in classes]
if scores is not None:
if labels is None:
labels = ["{:.0f}%".format(s * 100) for s in scores]
else:
labels = ["{} {:.0f}%".format(l, s * 100) for l, s in zip(labels, scores)]
if labels is not None and is_crowd is not None:
labels = [l + ("|crowd" if crowd else "") for l, crowd in zip(labels, is_crowd)]
return labels
class VisImage:
def __init__(self, img, scale=1.0):
"""
Args:
img (ndarray): an RGB image of shape (H, W, 3) in range [0, 255].
scale (float): scale the input image
"""
self.img = img
self.scale = scale
self.width, self.height = img.shape[1], img.shape[0]
self._setup_figure(img)
def _setup_figure(self, img):
"""
Args:
Same as in :meth:`__init__()`.
Returns:
fig (matplotlib.pyplot.figure): top level container for all the image plot elements.
ax (matplotlib.pyplot.Axes): contains figure elements and sets the coordinate system.
"""
fig = mplfigure.Figure(frameon=False)
self.dpi = fig.get_dpi()
# add a small 1e-2 to avoid precision lost due to matplotlib's truncation
# (https://github.com/matplotlib/matplotlib/issues/15363)
fig.set_size_inches(
(self.width * self.scale + 1e-2) / self.dpi,
(self.height * self.scale + 1e-2) / self.dpi,
)
self.canvas = FigureCanvasAgg(fig)
# self.canvas = mpl.backends.backend_cairo.FigureCanvasCairo(fig)
ax = fig.add_axes([0.0, 0.0, 1.0, 1.0])
ax.axis("off")
self.fig = fig
self.ax = ax
self.reset_image(img)
def reset_image(self, img):
"""
Args:
img: same as in __init__
"""
img = img.astype("uint8")
self.ax.imshow(img, extent=(0, self.width, self.height, 0), interpolation="nearest")
def save(self, filepath):
"""
Args:
filepath (str): a string that contains the absolute path, including the file name, where
the visualized image will be saved.
"""
self.fig.savefig(filepath)
def get_image(self):
"""
Returns:
ndarray:
the visualized image of shape (H, W, 3) (RGB) in uint8 type.
The shape is scaled w.r.t the input image using the given `scale` argument.
"""
canvas = self.canvas
s, (width, height) = canvas.print_to_buffer()
# buf = io.BytesIO() # works for cairo backend
# canvas.print_rgba(buf)
# width, height = self.width, self.height
# s = buf.getvalue()
buffer = np.frombuffer(s, dtype="uint8")
img_rgba = buffer.reshape(height, width, 4)
rgb, alpha = np.split(img_rgba, [3], axis=2)
return rgb.astype("uint8")
class Visualizer:
"""
Visualizer that draws data about detection/segmentation on images.
It contains methods like `draw_{text,box,circle,line,binary_mask,polygon}`
that draw primitive objects to images, as well as high-level wrappers like
`draw_{instance_predictions,sem_seg,panoptic_seg_predictions,dataset_dict}`
that draw composite data in some pre-defined style.
Note that the exact visualization style for the high-level wrappers are subject to change.
Style such as color, opacity, label contents, visibility of labels, or even the visibility
of objects themselves (e.g. when the object is too small) may change according
to different heuristics, as long as the results still look visually reasonable.
To obtain a consistent style, you can implement custom drawing functions with the
abovementioned primitive methods instead. If you need more customized visualization
styles, you can process the data yourself following their format documented in
tutorials (:doc:`/tutorials/models`, :doc:`/tutorials/datasets`). This class does not
intend to satisfy everyone's preference on drawing styles.
This visualizer focuses on high rendering quality rather than performance. It is not
designed to be used for real-time applications.
"""
# TODO implement a fast, rasterized version using OpenCV
def __init__(self, img_rgb, metadata=None, scale=1.0, instance_mode=ColorMode.IMAGE):
"""
Args:
img_rgb: a numpy array of shape (H, W, C), where H and W correspond to
the height and width of the image respectively. C is the number of
color channels. The image is required to be in RGB format since that
is a requirement of the Matplotlib library. The image is also expected
to be in the range [0, 255].
metadata (Metadata): dataset metadata (e.g. class names and colors)
instance_mode (ColorMode): defines one of the pre-defined style for drawing
instances on an image.
"""
self.img = np.asarray(img_rgb).clip(0, 255).astype(np.uint8)
if metadata is None:
metadata = MetadataCatalog.get("__nonexist__")
self.metadata = metadata
self.output = VisImage(self.img, scale=scale)
self.cpu_device = torch.device("cpu")
# too small texts are useless, therefore clamp to 9
self._default_font_size = max(
np.sqrt(self.output.height * self.output.width) // 90, 10 // scale
)
self._instance_mode = instance_mode
self.keypoint_threshold = _KEYPOINT_THRESHOLD
def draw_instance_predictions(self, predictions):
"""
Draw instance-level prediction results on an image.
Args:
predictions (Instances): the output of an instance detection/segmentation
model. Following fields will be used to draw:
"pred_boxes", "pred_classes", "scores", "pred_masks" (or "pred_masks_rle").
Returns:
output (VisImage): image object with visualizations.
"""
boxes = predictions.pred_boxes if predictions.has("pred_boxes") else None
scores = predictions.scores if predictions.has("scores") else None
classes = predictions.pred_classes.tolist() if predictions.has("pred_classes") else None
labels = _create_text_labels(classes, scores, self.metadata.get("thing_classes", None))
keypoints = predictions.pred_keypoints if predictions.has("pred_keypoints") else None
if predictions.has("pred_masks"):
masks = np.asarray(predictions.pred_masks)
masks = [GenericMask(x, self.output.height, self.output.width) for x in masks]
else:
masks = None
if self._instance_mode == ColorMode.SEGMENTATION and self.metadata.get("thing_colors"):
colors = [
self._jitter([x / 255 for x in self.metadata.thing_colors[c]]) for c in classes
]
alpha = 0.8
else:
colors = None
alpha = 0.5
if self._instance_mode == ColorMode.IMAGE_BW:
self.output.reset_image(
self._create_grayscale_image(
(predictions.pred_masks.any(dim=0) > 0).numpy()
if predictions.has("pred_masks")
else None
)
)
alpha = 0.3
self.overlay_instances(
masks=masks,
boxes=boxes,
labels=labels,
keypoints=keypoints,
assigned_colors=colors,
alpha=alpha,
)
return self.output
def draw_sem_seg(self, sem_seg, area_threshold=None, alpha=0.8):
"""
Draw semantic segmentation predictions/labels.
Args:
sem_seg (Tensor or ndarray): the segmentation of shape (H, W).
Each value is the integer label of the pixel.
area_threshold (int): segments with less than `area_threshold` are not drawn.
alpha (float): the larger it is, the more opaque the segmentations are.
Returns:
output (VisImage): image object with visualizations.
"""
if isinstance(sem_seg, torch.Tensor):
sem_seg = sem_seg.numpy()
labels, areas = np.unique(sem_seg, return_counts=True)
sorted_idxs = np.argsort(-areas).tolist()
labels = labels[sorted_idxs]
for label in filter(lambda l: l < len(self.metadata.stuff_classes), labels):
try:
mask_color = [x / 255 for x in self.metadata.stuff_colors[label]]
except (AttributeError, IndexError):
mask_color = None
binary_mask = (sem_seg == label).astype(np.uint8)
text = self.metadata.stuff_classes[label]
self.draw_binary_mask(
binary_mask,
color=mask_color,
edge_color=_OFF_WHITE,
text=text,
alpha=alpha,
area_threshold=area_threshold,
)
return self.output
def draw_panoptic_seg(self, panoptic_seg, segments_info, area_threshold=None, alpha=0.7):
"""
Draw panoptic prediction annotations or results.
Args:
panoptic_seg (Tensor): of shape (height, width) where the values are ids for each
segment.
segments_info (list[dict] or None): Describe each segment in `panoptic_seg`.
If it is a ``list[dict]``, each dict contains keys "id", "category_id".
If None, category id of each pixel is computed by
``pixel // metadata.label_divisor``.
area_threshold (int): stuff segments with less than `area_threshold` are not drawn.
Returns:
output (VisImage): image object with visualizations.
"""
pred = _PanopticPrediction(panoptic_seg, segments_info, self.metadata)
if self._instance_mode == ColorMode.IMAGE_BW:
self.output.reset_image(self._create_grayscale_image(pred.non_empty_mask()))
# draw mask for all semantic segments first i.e. "stuff"
for mask, sinfo in pred.semantic_masks():
category_idx = sinfo["category_id"]
try:
mask_color = [x / 255 for x in self.metadata.stuff_colors[category_idx]]
except AttributeError:
mask_color = None
text = self.metadata.stuff_classes[category_idx]
self.draw_binary_mask(
mask,
color=mask_color,
edge_color=_OFF_WHITE,
text=text,
alpha=alpha,
area_threshold=area_threshold,
)
# draw mask for all instances second
all_instances = list(pred.instance_masks())
if len(all_instances) == 0:
return self.output
masks, sinfo = list(zip(*all_instances))
category_ids = [x["category_id"] for x in sinfo]
try:
scores = [x["score"] for x in sinfo]
except KeyError:
scores = None
labels = _create_text_labels(
category_ids, scores, self.metadata.thing_classes, [x.get("iscrowd", 0) for x in sinfo]
)
try:
colors = [
self._jitter([x / 255 for x in self.metadata.thing_colors[c]]) for c in category_ids
]
except AttributeError:
colors = None
self.overlay_instances(masks=masks, labels=labels, assigned_colors=colors, alpha=alpha)
return self.output
draw_panoptic_seg_predictions = draw_panoptic_seg # backward compatibility
def draw_dataset_dict(self, dic):
"""
Draw annotations/segmentaions in Detectron2 Dataset format.
Args:
dic (dict): annotation/segmentation data of one image, in Detectron2 Dataset format.
Returns:
output (VisImage): image object with visualizations.
"""
annos = dic.get("annotations", None)
if annos:
if "segmentation" in annos[0]:
masks = [x["segmentation"] for x in annos]
else:
masks = None
if "keypoints" in annos[0]:
keypts = [x["keypoints"] for x in annos]
keypts = np.array(keypts).reshape(len(annos), -1, 3)
else:
keypts = None
boxes = [
BoxMode.convert(x["bbox"], x["bbox_mode"], BoxMode.XYXY_ABS)
if len(x["bbox"]) == 4
else x["bbox"]
for x in annos
]
colors = None
category_ids = [x["category_id"] for x in annos]
if self._instance_mode == ColorMode.SEGMENTATION and self.metadata.get("thing_colors"):
colors = [
self._jitter([x / 255 for x in self.metadata.thing_colors[c]])
for c in category_ids
]
names = self.metadata.get("thing_classes", None)
labels = _create_text_labels(
category_ids,
scores=None,
class_names=names,
is_crowd=[x.get("iscrowd", 0) for x in annos],
)
self.overlay_instances(
labels=labels, boxes=boxes, masks=masks, keypoints=keypts, assigned_colors=colors
)
sem_seg = dic.get("sem_seg", None)
if sem_seg is None and "sem_seg_file_name" in dic:
with PathManager.open(dic["sem_seg_file_name"], "rb") as f:
sem_seg = Image.open(f)
sem_seg = np.asarray(sem_seg, dtype="uint8")
if sem_seg is not None:
self.draw_sem_seg(sem_seg, area_threshold=0, alpha=0.5)
pan_seg = dic.get("pan_seg", None)
if pan_seg is None and "pan_seg_file_name" in dic:
with PathManager.open(dic["pan_seg_file_name"], "rb") as f:
pan_seg = Image.open(f)
pan_seg = np.asarray(pan_seg)
from panopticapi.utils import rgb2id
pan_seg = rgb2id(pan_seg)
if pan_seg is not None:
segments_info = dic["segments_info"]
pan_seg = torch.tensor(pan_seg)
self.draw_panoptic_seg(pan_seg, segments_info, area_threshold=0, alpha=0.5)
return self.output
def overlay_instances(
self,
*,
boxes=None,
labels=None,
masks=None,
keypoints=None,
assigned_colors=None,
alpha=0.5,
):
"""
Args:
boxes (Boxes, RotatedBoxes or ndarray): either a :class:`Boxes`,
or an Nx4 numpy array of XYXY_ABS format for the N objects in a single image,
or a :class:`RotatedBoxes`,
or an Nx5 numpy array of (x_center, y_center, width, height, angle_degrees) format
for the N objects in a single image,
labels (list[str]): the text to be displayed for each instance.
masks (masks-like object): Supported types are:
* :class:`detectron2.structures.PolygonMasks`,
:class:`detectron2.structures.BitMasks`.
* list[list[ndarray]]: contains the segmentation masks for all objects in one image.
The first level of the list corresponds to individual instances. The second
level to all the polygon that compose the instance, and the third level
to the polygon coordinates. The third level should have the format of
[x0, y0, x1, y1, ..., xn, yn] (n >= 3).
* list[ndarray]: each ndarray is a binary mask of shape (H, W).
* list[dict]: each dict is a COCO-style RLE.
keypoints (Keypoint or array like): an array-like object of shape (N, K, 3),
where the N is the number of instances and K is the number of keypoints.
The last dimension corresponds to (x, y, visibility or score).
assigned_colors (list[matplotlib.colors]): a list of colors, where each color
corresponds to each mask or box in the image. Refer to 'matplotlib.colors'
for full list of formats that the colors are accepted in.
Returns:
output (VisImage): image object with visualizations.
"""
num_instances = 0
if boxes is not None:
boxes = self._convert_boxes(boxes)
num_instances = len(boxes)
if masks is not None:
masks = self._convert_masks(masks)
if num_instances:
assert len(masks) == num_instances
else:
num_instances = len(masks)
if keypoints is not None:
if num_instances:
assert len(keypoints) == num_instances
else:
num_instances = len(keypoints)
keypoints = self._convert_keypoints(keypoints)
if labels is not None:
assert len(labels) == num_instances
if assigned_colors is None:
assigned_colors = [random_color(rgb=True, maximum=1) for _ in range(num_instances)]
if num_instances == 0:
return self.output
if boxes is not None and boxes.shape[1] == 5:
return self.overlay_rotated_instances(
boxes=boxes, labels=labels, assigned_colors=assigned_colors
)
# Display in largest to smallest order to reduce occlusion.
areas = None
if boxes is not None:
areas = np.prod(boxes[:, 2:] - boxes[:, :2], axis=1)
elif masks is not None:
areas = np.asarray([x.area() for x in masks])
if areas is not None:
sorted_idxs = np.argsort(-areas).tolist()
# Re-order overlapped instances in descending order.
boxes = boxes[sorted_idxs] if boxes is not None else None
labels = [labels[k] for k in sorted_idxs] if labels is not None else None
masks = [masks[idx] for idx in sorted_idxs] if masks is not None else None
assigned_colors = [assigned_colors[idx] for idx in sorted_idxs]
keypoints = keypoints[sorted_idxs] if keypoints is not None else None
for i in range(num_instances):
color = assigned_colors[i]
if boxes is not None:
self.draw_box(boxes[i], edge_color=color)
if masks is not None:
for segment in masks[i].polygons:
self.draw_polygon(segment.reshape(-1, 2), color, alpha=alpha)
if labels is not None:
# first get a box
if boxes is not None:
x0, y0, x1, y1 = boxes[i]
text_pos = (x0, y0) # if drawing boxes, put text on the box corner.
horiz_align = "left"
elif masks is not None:
# skip small mask without polygon
if len(masks[i].polygons) == 0:
continue
x0, y0, x1, y1 = masks[i].bbox()
# draw text in the center (defined by median) when box is not drawn
# median is less sensitive to outliers.
text_pos = np.median(masks[i].mask.nonzero(), axis=1)[::-1]
horiz_align = "center"
else:
continue # drawing the box confidence for keypoints isn't very useful.
# for small objects, draw text at the side to avoid occlusion
instance_area = (y1 - y0) * (x1 - x0)
if (
instance_area < _SMALL_OBJECT_AREA_THRESH * self.output.scale
or y1 - y0 < 40 * self.output.scale
):
if y1 >= self.output.height - 5:
text_pos = (x1, y0)
else:
text_pos = (x0, y1)
height_ratio = (y1 - y0) / np.sqrt(self.output.height * self.output.width)
lighter_color = self._change_color_brightness(color, brightness_factor=0.7)
font_size = (
np.clip((height_ratio - 0.02) / 0.08 + 1, 1.2, 2)
* 0.5
* self._default_font_size
)
self.draw_text(
labels[i],
text_pos,
color=lighter_color,
horizontal_alignment=horiz_align,
font_size=font_size,
)
# draw keypoints
if keypoints is not None:
for keypoints_per_instance in keypoints:
self.draw_and_connect_keypoints(keypoints_per_instance)
return self.output
def overlay_rotated_instances(self, boxes=None, labels=None, assigned_colors=None):
"""
Args:
boxes (ndarray): an Nx5 numpy array of
(x_center, y_center, width, height, angle_degrees) format
for the N objects in a single image.
labels (list[str]): the text to be displayed for each instance.
assigned_colors (list[matplotlib.colors]): a list of colors, where each color
corresponds to each mask or box in the image. Refer to 'matplotlib.colors'
for full list of formats that the colors are accepted in.
Returns:
output (VisImage): image object with visualizations.
"""
num_instances = len(boxes)
if assigned_colors is None:
assigned_colors = [random_color(rgb=True, maximum=1) for _ in range(num_instances)]
if num_instances == 0:
return self.output
# Display in largest to smallest order to reduce occlusion.
if boxes is not None:
areas = boxes[:, 2] * boxes[:, 3]
sorted_idxs = np.argsort(-areas).tolist()
# Re-order overlapped instances in descending order.
boxes = boxes[sorted_idxs]
labels = [labels[k] for k in sorted_idxs] if labels is not None else None
colors = [assigned_colors[idx] for idx in sorted_idxs]
for i in range(num_instances):
self.draw_rotated_box_with_label(
boxes[i], edge_color=colors[i], label=labels[i] if labels is not None else None
)
return self.output
def draw_and_connect_keypoints(self, keypoints):
"""
Draws keypoints of an instance and follows the rules for keypoint connections
to draw lines between appropriate keypoints. This follows color heuristics for
line color.
Args:
keypoints (Tensor): a tensor of shape (K, 3), where K is the number of keypoints
and the last dimension corresponds to (x, y, probability).
Returns:
output (VisImage): image object with visualizations.
"""
visible = {}
keypoint_names = self.metadata.get("keypoint_names")
for idx, keypoint in enumerate(keypoints):
# draw keypoint
x, y, prob = keypoint
if prob > self.keypoint_threshold:
self.draw_circle((x, y), color=_RED)
if keypoint_names:
keypoint_name = keypoint_names[idx]
visible[keypoint_name] = (x, y)
if self.metadata.get("keypoint_connection_rules"):
for kp0, kp1, color in self.metadata.keypoint_connection_rules:
if kp0 in visible and kp1 in visible:
x0, y0 = visible[kp0]
x1, y1 = visible[kp1]
color = tuple(x / 255.0 for x in color)
self.draw_line([x0, x1], [y0, y1], color=color)
# draw lines from nose to mid-shoulder and mid-shoulder to mid-hip
# Note that this strategy is specific to person keypoints.
# For other keypoints, it should just do nothing
try:
ls_x, ls_y = visible["left_shoulder"]
rs_x, rs_y = visible["right_shoulder"]
mid_shoulder_x, mid_shoulder_y = (ls_x + rs_x) / 2, (ls_y + rs_y) / 2
except KeyError:
pass
else:
# draw line from nose to mid-shoulder
nose_x, nose_y = visible.get("nose", (None, None))
if nose_x is not None:
self.draw_line([nose_x, mid_shoulder_x], [nose_y, mid_shoulder_y], color=_RED)
try:
# draw line from mid-shoulder to mid-hip
lh_x, lh_y = visible["left_hip"]
rh_x, rh_y = visible["right_hip"]
except KeyError:
pass
else:
mid_hip_x, mid_hip_y = (lh_x + rh_x) / 2, (lh_y + rh_y) / 2
self.draw_line([mid_hip_x, mid_shoulder_x], [mid_hip_y, mid_shoulder_y], color=_RED)
return self.output
"""
Primitive drawing functions:
"""
def draw_text(
self,
text,
position,
*,
font_size=None,
color="g",
horizontal_alignment="center",
rotation=0,
):
"""
Args:
text (str): class label
position (tuple): a tuple of the x and y coordinates to place text on image.
font_size (int, optional): font of the text. If not provided, a font size
proportional to the image width is calculated and used.
color: color of the text. Refer to `matplotlib.colors` for full list
of formats that are accepted.
horizontal_alignment (str): see `matplotlib.text.Text`
rotation: rotation angle in degrees CCW
Returns:
output (VisImage): image object with text drawn.
"""
if not font_size:
font_size = self._default_font_size
# since the text background is dark, we don't want the text to be dark
color = np.maximum(list(mplc.to_rgb(color)), 0.2)
color[np.argmax(color)] = max(0.8, np.max(color))
x, y = position
self.output.ax.text(
x,
y,
text,
size=font_size * self.output.scale,
family="sans-serif",
bbox={"facecolor": "black", "alpha": 0.8, "pad": 0.7, "edgecolor": "none"},
verticalalignment="top",
horizontalalignment=horizontal_alignment,
color=color,
zorder=10,
rotation=rotation,
)
return self.output
def draw_box(self, box_coord, alpha=0.5, edge_color="g", line_style="-"):
"""
Args:
box_coord (tuple): a tuple containing x0, y0, x1, y1 coordinates, where x0 and y0
are the coordinates of the image's top left corner. x1 and y1 are the
coordinates of the image's bottom right corner.
alpha (float): blending efficient. Smaller values lead to more transparent masks.
edge_color: color of the outline of the box. Refer to `matplotlib.colors`
for full list of formats that are accepted.
line_style (string): the string to use to create the outline of the boxes.
Returns:
output (VisImage): image object with box drawn.
"""
x0, y0, x1, y1 = box_coord
width = x1 - x0
height = y1 - y0
linewidth = max(self._default_font_size / 4, 1)
self.output.ax.add_patch(
mpl.patches.Rectangle(
(x0, y0),
width,
height,
fill=False,
edgecolor=edge_color,
linewidth=linewidth * self.output.scale,
alpha=alpha,
linestyle=line_style,
)
)
return self.output
def draw_rotated_box_with_label(
self, rotated_box, alpha=0.5, edge_color="g", line_style="-", label=None
):
"""
Draw a rotated box with label on its top-left corner.
Args:
rotated_box (tuple): a tuple containing (cnt_x, cnt_y, w, h, angle),
where cnt_x and cnt_y are the center coordinates of the box.
w and h are the width and height of the box. angle represents how
many degrees the box is rotated CCW with regard to the 0-degree box.
alpha (float): blending efficient. Smaller values lead to more transparent masks.
edge_color: color of the outline of the box. Refer to `matplotlib.colors`
for full list of formats that are accepted.
line_style (string): the string to use to create the outline of the boxes.
label (string): label for rotated box. It will not be rendered when set to None.
Returns:
output (VisImage): image object with box drawn.
"""
cnt_x, cnt_y, w, h, angle = rotated_box
area = w * h
# use thinner lines when the box is small
linewidth = self._default_font_size / (
6 if area < _SMALL_OBJECT_AREA_THRESH * self.output.scale else 3
)
theta = angle * math.pi / 180.0
c = math.cos(theta)
s = math.sin(theta)
rect = [(-w / 2, h / 2), (-w / 2, -h / 2), (w / 2, -h / 2), (w / 2, h / 2)]
# x: left->right ; y: top->down
rotated_rect = [(s * yy + c * xx + cnt_x, c * yy - s * xx + cnt_y) for (xx, yy) in rect]
for k in range(4):
j = (k + 1) % 4
self.draw_line(
[rotated_rect[k][0], rotated_rect[j][0]],
[rotated_rect[k][1], rotated_rect[j][1]],
color=edge_color,
linestyle="--" if k == 1 else line_style,
linewidth=linewidth,
)
if label is not None:
text_pos = rotated_rect[1] # topleft corner
height_ratio = h / np.sqrt(self.output.height * self.output.width)
label_color = self._change_color_brightness(edge_color, brightness_factor=0.7)
font_size = (
np.clip((height_ratio - 0.02) / 0.08 + 1, 1.2, 2) * 0.5 * self._default_font_size
)
self.draw_text(label, text_pos, color=label_color, font_size=font_size, rotation=angle)
return self.output
def draw_circle(self, circle_coord, color, radius=3):
"""
Args:
circle_coord (list(int) or tuple(int)): contains the x and y coordinates
of the center of the circle.
color: color of the polygon. Refer to `matplotlib.colors` for a full list of
formats that are accepted.
radius (int): radius of the circle.
Returns:
output (VisImage): image object with box drawn.
"""
x, y = circle_coord
self.output.ax.add_patch(
mpl.patches.Circle(circle_coord, radius=radius, fill=True, color=color)
)
return self.output
def draw_line(self, x_data, y_data, color, linestyle="-", linewidth=None):
"""
Args:
x_data (list[int]): a list containing x values of all the points being drawn.
Length of list should match the length of y_data.
y_data (list[int]): a list containing y values of all the points being drawn.
Length of list should match the length of x_data.
color: color of the line. Refer to `matplotlib.colors` for a full list of
formats that are accepted.
linestyle: style of the line. Refer to `matplotlib.lines.Line2D`
for a full list of formats that are accepted.
linewidth (float or None): width of the line. When it's None,
a default value will be computed and used.
Returns:
output (VisImage): image object with line drawn.
"""
if linewidth is None:
linewidth = self._default_font_size / 3
linewidth = max(linewidth, 1)
self.output.ax.add_line(
mpl.lines.Line2D(
x_data,
y_data,
linewidth=linewidth * self.output.scale,
color=color,
linestyle=linestyle,
)
)
return self.output
def draw_binary_mask(
self, binary_mask, color=None, *, edge_color=None, text=None, alpha=0.5, area_threshold=0
):
"""
Args:
binary_mask (ndarray): numpy array of shape (H, W), where H is the image height and
W is the image width. Each value in the array is either a 0 or 1 value of uint8
type.
color: color of the mask. Refer to `matplotlib.colors` for a full list of
formats that are accepted. If None, will pick a random color.
edge_color: color of the polygon edges. Refer to `matplotlib.colors` for a
full list of formats that are accepted.
text (str): if None, will be drawn in the object's center of mass.
alpha (float): blending efficient. Smaller values lead to more transparent masks.
area_threshold (float): a connected component small than this will not be shown.
Returns:
output (VisImage): image object with mask drawn.
"""
if color is None:
color = random_color(rgb=True, maximum=1)
color = mplc.to_rgb(color)
has_valid_segment = False
binary_mask = binary_mask.astype("uint8") # opencv needs uint8
mask = GenericMask(binary_mask, self.output.height, self.output.width)
shape2d = (binary_mask.shape[0], binary_mask.shape[1])
if not mask.has_holes:
# draw polygons for regular masks
for segment in mask.polygons:
area = mask_util.area(mask_util.frPyObjects([segment], shape2d[0], shape2d[1]))
if area < (area_threshold or 0):
continue
has_valid_segment = True
segment = segment.reshape(-1, 2)
self.draw_polygon(segment, color=color, edge_color=edge_color, alpha=alpha)
else:
# TODO: Use Path/PathPatch to draw vector graphics:
# https://stackoverflow.com/questions/8919719/how-to-plot-a-complex-polygon
rgba = np.zeros(shape2d + (4,), dtype="float32")
rgba[:, :, :3] = color
rgba[:, :, 3] = (mask.mask == 1).astype("float32") * alpha
has_valid_segment = True
self.output.ax.imshow(rgba, extent=(0, self.output.width, self.output.height, 0))
if text is not None and has_valid_segment:
# TODO sometimes drawn on wrong objects. the heuristics here can improve.
lighter_color = self._change_color_brightness(color, brightness_factor=0.7)
_num_cc, cc_labels, stats, centroids = cv2.connectedComponentsWithStats(binary_mask, 8)
largest_component_id = np.argmax(stats[1:, -1]) + 1
# draw text on the largest component, as well as other very large components.
for cid in range(1, _num_cc):
if cid == largest_component_id or stats[cid, -1] > _LARGE_MASK_AREA_THRESH:
# median is more stable than centroid
# center = centroids[largest_component_id]
center = np.median((cc_labels == cid).nonzero(), axis=1)[::-1]
self.draw_text(text, center, color=lighter_color)
return self.output
def draw_polygon(self, segment, color, edge_color=None, alpha=0.5):
"""
Args:
segment: numpy array of shape Nx2, containing all the points in the polygon.
color: color of the polygon. Refer to `matplotlib.colors` for a full list of
formats that are accepted.
edge_color: color of the polygon edges. Refer to `matplotlib.colors` for a
full list of formats that are accepted. If not provided, a darker shade
of the polygon color will be used instead.
alpha (float): blending efficient. Smaller values lead to more transparent masks.
Returns:
output (VisImage): image object with polygon drawn.
"""
if edge_color is None:
# make edge color darker than the polygon color
if alpha > 0.8:
edge_color = self._change_color_brightness(color, brightness_factor=-0.7)
else:
edge_color = color
edge_color = mplc.to_rgb(edge_color) + (1,)
polygon = mpl.patches.Polygon(
segment,
fill=True,
facecolor=mplc.to_rgb(color) + (alpha,),
edgecolor=edge_color,
linewidth=max(self._default_font_size // 15 * self.output.scale, 1),
)
self.output.ax.add_patch(polygon)
return self.output
"""
Internal methods:
"""
def _jitter(self, color):
"""
Randomly modifies given color to produce a slightly different color than the color given.
Args:
color (tuple[double]): a tuple of 3 elements, containing the RGB values of the color
picked. The values in the list are in the [0.0, 1.0] range.
Returns:
jittered_color (tuple[double]): a tuple of 3 elements, containing the RGB values of the
color after being jittered. The values in the list are in the [0.0, 1.0] range.
"""
color = mplc.to_rgb(color)
vec = np.random.rand(3)
# better to do it in another color space
vec = vec / np.linalg.norm(vec) * 0.5
res = np.clip(vec + color, 0, 1)
return tuple(res)
def _create_grayscale_image(self, mask=None):
"""
Create a grayscale version of the original image.
The colors in masked area, if given, will be kept.
"""
img_bw = self.img.astype("f4").mean(axis=2)
img_bw = np.stack([img_bw] * 3, axis=2)
if mask is not None:
img_bw[mask] = self.img[mask]
return img_bw
def _change_color_brightness(self, color, brightness_factor):
"""
Depending on the brightness_factor, gives a lighter or darker color i.e. a color with
less or more saturation than the original color.
Args:
color: color of the polygon. Refer to `matplotlib.colors` for a full list of
formats that are accepted.
brightness_factor (float): a value in [-1.0, 1.0] range. A lightness factor of
0 will correspond to no change, a factor in [-1.0, 0) range will result in
a darker color and a factor in (0, 1.0] range will result in a lighter color.
Returns:
modified_color (tuple[double]): a tuple containing the RGB values of the
modified color. Each value in the tuple is in the [0.0, 1.0] range.
"""
assert brightness_factor >= -1.0 and brightness_factor <= 1.0
color = mplc.to_rgb(color)
polygon_color = colorsys.rgb_to_hls(*mplc.to_rgb(color))
modified_lightness = polygon_color[1] + (brightness_factor * polygon_color[1])
modified_lightness = 0.0 if modified_lightness < 0.0 else modified_lightness
modified_lightness = 1.0 if modified_lightness > 1.0 else modified_lightness
modified_color = colorsys.hls_to_rgb(polygon_color[0], modified_lightness, polygon_color[2])
return modified_color
def _convert_boxes(self, boxes):
"""
Convert different format of boxes to an NxB array, where B = 4 or 5 is the box dimension.
"""
if isinstance(boxes, Boxes) or isinstance(boxes, RotatedBoxes):
return boxes.tensor.detach().numpy()
else:
return np.asarray(boxes)
def _convert_masks(self, masks_or_polygons):
"""
Convert different format of masks or polygons to a tuple of masks and polygons.
Returns:
list[GenericMask]:
"""
m = masks_or_polygons
if isinstance(m, PolygonMasks):
m = m.polygons
if isinstance(m, BitMasks):
m = m.tensor.numpy()
if isinstance(m, torch.Tensor):
m = m.numpy()
ret = []
for x in m:
if isinstance(x, GenericMask):
ret.append(x)
else:
ret.append(GenericMask(x, self.output.height, self.output.width))
return ret
def _convert_keypoints(self, keypoints):
if isinstance(keypoints, Keypoints):
keypoints = keypoints.tensor
keypoints = np.asarray(keypoints)
return keypoints
def get_output(self):
"""
Returns:
output (VisImage): the image output containing the visualizations added
to the image.
"""
return self.output
import re
def minify_html(html):
# 移除多余的空白字符
html = re.sub(r'\s+', ' ', html)
# 移除行尾的空白字符
html = re.sub(r'\s*>\s*', '>', html)
# 移除标签前的空白字符
html = re.sub(r'\s*<\s*', '<', html)
return html.strip()
\ No newline at end of file
from abc import ABC, abstractmethod
from typing import Callable
from magic_pdf.data.data_reader_writer import DataWriter
from magic_pdf.data.dataset import Dataset
from magic_pdf.operators.pipes import PipeResult
class InferenceResultBase(ABC):
@abstractmethod
def __init__(self, inference_results: list, dataset: Dataset):
"""Initialized method.
Args:
inference_results (list): the inference result generated by model
dataset (Dataset): the dataset related with model inference result
"""
pass
@abstractmethod
def draw_model(self, file_path: str) -> None:
"""Draw model inference result.
Args:
file_path (str): the output file path
"""
pass
@abstractmethod
def dump_model(self, writer: DataWriter, file_path: str):
"""Dump model inference result to file.
Args:
writer (DataWriter): writer handle
file_path (str): the location of target file
"""
pass
@abstractmethod
def get_infer_res(self):
"""Get the inference result.
Returns:
list: the inference result generated by model
"""
pass
@abstractmethod
def apply(self, proc: Callable, *args, **kwargs):
"""Apply callable method which.
Args:
proc (Callable): invoke proc as follows:
proc(inference_result, *args, **kwargs)
Returns:
Any: return the result generated by proc
"""
pass
def pipe_txt_mode(
self,
imageWriter: DataWriter,
start_page_id=0,
end_page_id=None,
debug_mode=False,
lang=None,
) -> PipeResult:
"""Post-proc the model inference result, Extract the text using the
third library, such as `pymupdf`
Args:
imageWriter (DataWriter): the image writer handle
start_page_id (int, optional): Defaults to 0. Let user select some pages He/She want to process
end_page_id (int, optional): Defaults to the last page index of dataset. Let user select some pages He/She want to process
debug_mode (bool, optional): Defaults to False. will dump more log if enabled
lang (str, optional): Defaults to None.
Returns:
PipeResult: the result
"""
pass
@abstractmethod
def pipe_ocr_mode(
self,
imageWriter: DataWriter,
start_page_id=0,
end_page_id=None,
debug_mode=False,
lang=None,
) -> PipeResult:
pass
import copy
import json
import os
from typing import Callable
from magic_pdf.config.constants import PARSE_TYPE_OCR, PARSE_TYPE_TXT
from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.data.data_reader_writer import DataWriter
from magic_pdf.data.dataset import Dataset
from magic_pdf.libs.draw_bbox import draw_model_bbox
from magic_pdf.libs.version import __version__
from magic_pdf.operators.pipes import PipeResult
from magic_pdf.pdf_parse_union_core_v2 import pdf_parse_union
from magic_pdf.operators import InferenceResultBase
class InferenceResult(InferenceResultBase):
def __init__(self, inference_results: list, dataset: Dataset):
"""Initialized method.
Args:
inference_results (list): the inference result generated by model
dataset (Dataset): the dataset related with model inference result
"""
self._infer_res = inference_results
self._dataset = dataset
def draw_model(self, file_path: str) -> None:
"""Draw model inference result.
Args:
file_path (str): the output file path
"""
dir_name = os.path.dirname(file_path)
base_name = os.path.basename(file_path)
if not os.path.exists(dir_name):
os.makedirs(dir_name, exist_ok=True)
draw_model_bbox(
copy.deepcopy(self._infer_res), self._dataset, dir_name, base_name
)
def dump_model(self, writer: DataWriter, file_path: str):
"""Dump model inference result to file.
Args:
writer (DataWriter): writer handle
file_path (str): the location of target file
"""
writer.write_string(
file_path, json.dumps(self._infer_res, ensure_ascii=False, indent=4)
)
def get_infer_res(self):
"""Get the inference result.
Returns:
list: the inference result generated by model
"""
return self._infer_res
def apply(self, proc: Callable, *args, **kwargs):
"""Apply callable method which.
Args:
proc (Callable): invoke proc as follows:
proc(inference_result, *args, **kwargs)
Returns:
Any: return the result generated by proc
"""
return proc(copy.deepcopy(self._infer_res), *args, **kwargs)
def pipe_txt_mode(
self,
imageWriter: DataWriter,
start_page_id=0,
end_page_id=None,
debug_mode=False,
lang=None,
) -> PipeResult:
"""Post-proc the model inference result, Extract the text using the
third library, such as `pymupdf`
Args:
imageWriter (DataWriter): the image writer handle
start_page_id (int, optional): Defaults to 0. Let user select some pages He/She want to process
end_page_id (int, optional): Defaults to the last page index of dataset. Let user select some pages He/She want to process
debug_mode (bool, optional): Defaults to False. will dump more log if enabled
lang (str, optional): Defaults to None.
Returns:
PipeResult: the result
"""
def proc(*args, **kwargs) -> PipeResult:
res = pdf_parse_union(*args, **kwargs)
res['_parse_type'] = PARSE_TYPE_TXT
res['_version_name'] = __version__
if 'lang' in kwargs and kwargs['lang'] is not None:
res['lang'] = kwargs['lang']
return PipeResult(res, self._dataset)
res = self.apply(
proc,
self._dataset,
imageWriter,
SupportedPdfParseMethod.TXT,
start_page_id=start_page_id,
end_page_id=end_page_id,
debug_mode=debug_mode,
lang=lang,
)
return res
def pipe_ocr_mode(
self,
imageWriter: DataWriter,
start_page_id=0,
end_page_id=None,
debug_mode=False,
lang=None,
) -> PipeResult:
"""Post-proc the model inference result, Extract the text using `OCR`
technical.
Args:
imageWriter (DataWriter): the image writer handle
start_page_id (int, optional): Defaults to 0. Let user select some pages He/She want to process
end_page_id (int, optional): Defaults to the last page index of dataset. Let user select some pages He/She want to process
debug_mode (bool, optional): Defaults to False. will dump more log if enabled
lang (str, optional): Defaults to None.
Returns:
PipeResult: the result
"""
def proc(*args, **kwargs) -> PipeResult:
res = pdf_parse_union(*args, **kwargs)
res['_parse_type'] = PARSE_TYPE_OCR
res['_version_name'] = __version__
if 'lang' in kwargs and kwargs['lang'] is not None:
res['lang'] = kwargs['lang']
return PipeResult(res, self._dataset)
res = self.apply(
proc,
self._dataset,
imageWriter,
SupportedPdfParseMethod.OCR,
start_page_id=start_page_id,
end_page_id=end_page_id,
debug_mode=debug_mode,
lang=lang,
)
return res
import copy
import json
import os
from typing import Callable
from magic_pdf.config.make_content_config import DropMode, MakeMode
from magic_pdf.data.data_reader_writer import DataWriter
from magic_pdf.data.dataset import Dataset
from magic_pdf.dict2md.ocr_mkcontent import union_make
from magic_pdf.libs.draw_bbox import (draw_layout_bbox, draw_line_sort_bbox,
draw_span_bbox)
from magic_pdf.libs.json_compressor import JsonCompressor
class PipeResult:
def __init__(self, pipe_res, dataset: Dataset):
"""Initialized.
Args:
pipe_res (list[dict]): the pipeline processed result of model inference result
dataset (Dataset): the dataset associated with pipe_res
"""
self._pipe_res = pipe_res
self._dataset = dataset
def get_markdown(
self,
img_dir_or_bucket_prefix: str,
drop_mode=DropMode.NONE,
md_make_mode=MakeMode.MM_MD,
) -> str:
"""Get markdown content.
Args:
img_dir_or_bucket_prefix (str): The s3 bucket prefix or local file directory which used to store the figure
drop_mode (str, optional): Drop strategy when some page which is corrupted or inappropriate. Defaults to DropMode.NONE.
md_make_mode (str, optional): The content Type of Markdown be made. Defaults to MakeMode.MM_MD.
Returns:
str: return markdown content
"""
pdf_info_list = self._pipe_res['pdf_info']
md_content = union_make(
pdf_info_list, md_make_mode, drop_mode, img_dir_or_bucket_prefix
)
return md_content
def dump_md(
self,
writer: DataWriter,
file_path: str,
img_dir_or_bucket_prefix: str,
drop_mode=DropMode.NONE,
md_make_mode=MakeMode.MM_MD,
):
"""Dump The Markdown.
Args:
writer (DataWriter): File writer handle
file_path (str): The file location of markdown
img_dir_or_bucket_prefix (str): The s3 bucket prefix or local file directory which used to store the figure
drop_mode (str, optional): Drop strategy when some page which is corrupted or inappropriate. Defaults to DropMode.NONE.
md_make_mode (str, optional): The content Type of Markdown be made. Defaults to MakeMode.MM_MD.
"""
md_content = self.get_markdown(
img_dir_or_bucket_prefix, drop_mode=drop_mode, md_make_mode=md_make_mode
)
writer.write_string(file_path, md_content)
def get_content_list(
self,
image_dir_or_bucket_prefix: str,
drop_mode=DropMode.NONE,
) -> str:
"""Get Content List.
Args:
image_dir_or_bucket_prefix (str): The s3 bucket prefix or local file directory which used to store the figure
drop_mode (str, optional): Drop strategy when some page which is corrupted or inappropriate. Defaults to DropMode.NONE.
Returns:
str: content list content
"""
pdf_info_list = self._pipe_res['pdf_info']
content_list = union_make(
pdf_info_list,
MakeMode.STANDARD_FORMAT,
drop_mode,
image_dir_or_bucket_prefix,
)
return content_list
def dump_content_list(
self,
writer: DataWriter,
file_path: str,
image_dir_or_bucket_prefix: str,
drop_mode=DropMode.NONE,
):
"""Dump Content List.
Args:
writer (DataWriter): File writer handle
file_path (str): The file location of content list
image_dir_or_bucket_prefix (str): The s3 bucket prefix or local file directory which used to store the figure
drop_mode (str, optional): Drop strategy when some page which is corrupted or inappropriate. Defaults to DropMode.NONE.
"""
content_list = self.get_content_list(
image_dir_or_bucket_prefix, drop_mode=drop_mode,
)
writer.write_string(
file_path, json.dumps(content_list, ensure_ascii=False, indent=4)
)
def get_middle_json(self) -> str:
"""Get middle json.
Returns:
str: The content of middle json
"""
return json.dumps(self._pipe_res, ensure_ascii=False, indent=4)
def dump_middle_json(self, writer: DataWriter, file_path: str):
"""Dump the result of pipeline.
Args:
writer (DataWriter): File writer handler
file_path (str): The file location of middle json
"""
middle_json = self.get_middle_json()
writer.write_string(file_path, middle_json)
def draw_layout(self, file_path: str) -> None:
"""Draw the layout.
Args:
file_path (str): The file location of layout result file
"""
dir_name = os.path.dirname(file_path)
base_name = os.path.basename(file_path)
if not os.path.exists(dir_name):
os.makedirs(dir_name, exist_ok=True)
pdf_info = self._pipe_res['pdf_info']
draw_layout_bbox(pdf_info, self._dataset.data_bits(), dir_name, base_name)
def draw_span(self, file_path: str):
"""Draw the Span.
Args:
file_path (str): The file location of span result file
"""
dir_name = os.path.dirname(file_path)
base_name = os.path.basename(file_path)
if not os.path.exists(dir_name):
os.makedirs(dir_name, exist_ok=True)
pdf_info = self._pipe_res['pdf_info']
draw_span_bbox(pdf_info, self._dataset.data_bits(), dir_name, base_name)
def draw_line_sort(self, file_path: str):
"""Draw line sort.
Args:
file_path (str): The file location of line sort result file
"""
dir_name = os.path.dirname(file_path)
base_name = os.path.basename(file_path)
if not os.path.exists(dir_name):
os.makedirs(dir_name, exist_ok=True)
pdf_info = self._pipe_res['pdf_info']
draw_line_sort_bbox(pdf_info, self._dataset.data_bits(), dir_name, base_name)
def get_compress_pdf_mid_data(self):
"""Compress the pipeline result.
Returns:
str: compress the pipeline result and return
"""
return JsonCompressor.compress_json(self._pipe_res)
def apply(self, proc: Callable, *args, **kwargs):
"""Apply callable method which.
Args:
proc (Callable): invoke proc as follows:
proc(pipeline_result, *args, **kwargs)
Returns:
Any: return the result generated by proc
"""
return proc(copy.deepcopy(self._pipe_res), *args, **kwargs)
import copy
import math
import os
import re
import statistics
import time
import warnings
from typing import List
import cv2
import fitz
import torch
import numpy as np
from loguru import logger
from tqdm import tqdm
from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.config.ocr_content_type import BlockType, ContentType
from magic_pdf.data.dataset import Dataset, PageableData
from magic_pdf.libs.boxbase import calculate_overlap_area_in_bbox1_area_ratio, __is_overlaps_y_exceeds_threshold
from magic_pdf.libs.clean_memory import clean_memory
from magic_pdf.libs.config_reader import get_local_layoutreader_model_dir, get_llm_aided_config, get_device
from magic_pdf.libs.convert_utils import dict_to_list
from magic_pdf.libs.hash_utils import compute_md5
from magic_pdf.libs.pdf_image_tools import cut_image_to_pil_image
from magic_pdf.model.magic_model import MagicModel
from magic_pdf.post_proc.llm_aided import llm_aided_formula, llm_aided_text, llm_aided_title
from magic_pdf.model.sub_modules.model_init import AtomModelSingleton
from magic_pdf.post_proc.para_split_v3 import para_split
from magic_pdf.pre_proc.construct_page_dict import ocr_construct_page_component_v2
from magic_pdf.pre_proc.cut_image import ocr_cut_image_and_table
from magic_pdf.pre_proc.ocr_detect_all_bboxes import ocr_prepare_bboxes_for_layout_split_v2
from magic_pdf.pre_proc.ocr_dict_merge import fill_spans_in_blocks, fix_block_spans_v2, fix_discarded_block
from magic_pdf.pre_proc.ocr_span_list_modify import get_qa_need_list_v2, remove_overlaps_low_confidence_spans, \
remove_overlaps_min_spans, remove_x_overlapping_chars
os.environ['NO_ALBUMENTATIONS_UPDATE'] = '1' # 禁止albumentations检查更新
def __replace_STX_ETX(text_str: str):
"""Replace \u0002 and \u0003, as these characters become garbled when extracted using pymupdf. In fact, they were originally quotation marks.
Drawback: This issue is only observed in English text; it has not been found in Chinese text so far.
Args:
text_str (str): raw text
Returns:
_type_: replaced text
""" # noqa: E501
if text_str:
s = text_str.replace('\u0002', "'")
s = s.replace('\u0003', "'")
return s
return text_str
# 连写字符拆分
def __replace_ligatures(text: str):
ligatures = {
'fi': 'fi', 'fl': 'fl', 'ff': 'ff', 'ffi': 'ffi', 'ffl': 'ffl', 'ſt': 'ft', 'st': 'st'
}
return re.sub('|'.join(map(re.escape, ligatures.keys())), lambda m: ligatures[m.group()], text)
def chars_to_content(span):
# 检查span中的char是否为空
if len(span['chars']) == 0:
pass
else:
# 先给chars按char['bbox']的中心点的x坐标排序
span['chars'] = sorted(span['chars'], key=lambda x: (x['bbox'][0] + x['bbox'][2]) / 2)
# Calculate the width of each character
char_widths = [char['bbox'][2] - char['bbox'][0] for char in span['chars']]
# Calculate the median width
median_width = statistics.median(char_widths)
# 通过x轴重叠比率移除一部分char
span = remove_x_overlapping_chars(span, median_width)
content = ''
for char in span['chars']:
# 如果下一个char的x0和上一个char的x1距离超过0.25个字符宽度,则需要在中间插入一个空格
char1 = char
char2 = span['chars'][span['chars'].index(char) + 1] if span['chars'].index(char) + 1 < len(span['chars']) else None
if char2 and char2['bbox'][0] - char1['bbox'][2] > median_width * 0.25 and char['c'] != ' ' and char2['c'] != ' ':
content += f"{char['c']} "
else:
content += char['c']
span['content'] = __replace_ligatures(content)
del span['chars']
LINE_STOP_FLAG = ('.', '!', '?', '。', '!', '?', ')', ')', '"', '”', ':', ':', ';', ';', ']', '】', '}', '}', '>', '》', '、', ',', ',', '-', '—', '–',)
LINE_START_FLAG = ('(', '(', '"', '“', '【', '{', '《', '<', '「', '『', '【', '[',)
def fill_char_in_spans(spans, all_chars):
# 简单从上到下排一下序
spans = sorted(spans, key=lambda x: x['bbox'][1])
for char in all_chars:
for span in spans:
if calculate_char_in_span(char['bbox'], span['bbox'], char['c']):
span['chars'].append(char)
break
need_ocr_spans = []
for span in spans:
chars_to_content(span)
# 有的span中虽然没有字但有一两个空的占位符,用宽高和content长度过滤
if len(span['content']) * span['height'] < span['width'] * 0.5:
# logger.info(f"maybe empty span: {len(span['content'])}, {span['height']}, {span['width']}")
need_ocr_spans.append(span)
del span['height'], span['width']
return need_ocr_spans
# 使用鲁棒性更强的中心点坐标判断
def calculate_char_in_span(char_bbox, span_bbox, char, span_height_radio=0.33):
char_center_x = (char_bbox[0] + char_bbox[2]) / 2
char_center_y = (char_bbox[1] + char_bbox[3]) / 2
span_center_y = (span_bbox[1] + span_bbox[3]) / 2
span_height = span_bbox[3] - span_bbox[1]
if (
span_bbox[0] < char_center_x < span_bbox[2]
and span_bbox[1] < char_center_y < span_bbox[3]
and abs(char_center_y - span_center_y) < span_height * span_height_radio # 字符的中轴和span的中轴高度差不能超过1/4span高度
):
return True
else:
# 如果char是LINE_STOP_FLAG,就不用中心点判定,换一种方案(左边界在span区域内,高度判定和之前逻辑一致)
# 主要是给结尾符号一个进入span的机会,这个char还应该离span右边界较近
if char in LINE_STOP_FLAG:
if (
(span_bbox[2] - span_height) < char_bbox[0] < span_bbox[2]
and char_center_x > span_bbox[0]
and span_bbox[1] < char_center_y < span_bbox[3]
and abs(char_center_y - span_center_y) < span_height * span_height_radio
):
return True
elif char in LINE_START_FLAG:
if (
span_bbox[0] < char_bbox[2] < (span_bbox[0] + span_height)
and char_center_x < span_bbox[2]
and span_bbox[1] < char_center_y < span_bbox[3]
and abs(char_center_y - span_center_y) < span_height * span_height_radio
):
return True
else:
return False
def remove_tilted_line(text_blocks):
for block in text_blocks:
remove_lines = []
for line in block['lines']:
cosine, sine = line['dir']
# 计算弧度值
angle_radians = math.atan2(sine, cosine)
# 将弧度值转换为角度值
angle_degrees = math.degrees(angle_radians)
if 2 < abs(angle_degrees) < 88:
remove_lines.append(line)
for line in remove_lines:
block['lines'].remove(line)
def calculate_contrast(img, img_mode) -> float:
"""
计算给定图像的对比度。
:param img: 图像,类型为numpy.ndarray
:Param img_mode = 图像的色彩通道,'rgb' 或 'bgr'
:return: 图像的对比度值
"""
if img_mode == 'rgb':
# 将RGB图像转换为灰度图
gray_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
elif img_mode == 'bgr':
# 将BGR图像转换为灰度图
gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
else:
raise ValueError("Invalid image mode. Please provide 'rgb' or 'bgr'.")
# 计算均值和标准差
mean_value = np.mean(gray_img)
std_dev = np.std(gray_img)
# 对比度定义为标准差除以平均值(加上小常数避免除零错误)
contrast = std_dev / (mean_value + 1e-6)
# logger.debug(f"contrast: {contrast}")
return round(contrast, 2)
# @measure_time
def txt_spans_extract_v2(pdf_page, spans, all_bboxes, all_discarded_blocks, lang):
# cid用0xfffd表示,连字符拆开
# text_blocks_raw = pdf_page.get_text('rawdict', flags=fitz.TEXT_PRESERVE_WHITESPACE | fitz.TEXT_MEDIABOX_CLIP)['blocks']
# cid用0xfffd表示,连字符不拆开
#text_blocks_raw = pdf_page.get_text('rawdict', flags=fitz.TEXT_PRESERVE_LIGATURES | fitz.TEXT_PRESERVE_WHITESPACE | fitz.TEXT_MEDIABOX_CLIP)['blocks']
# 自定义flags出现较多0xfffd,可能是pymupdf可以自行处理内置字典的pdf,不再使用
text_blocks_raw = pdf_page.get_text('rawdict', flags=fitz.TEXTFLAGS_TEXT)['blocks']
# text_blocks = pdf_page.get_text('dict', flags=fitz.TEXTFLAGS_TEXT)['blocks']
# 移除所有角度不为0或90的line
remove_tilted_line(text_blocks_raw)
all_pymu_chars = []
for block in text_blocks_raw:
for line in block['lines']:
cosine, sine = line['dir']
if abs(cosine) < 0.9 or abs(sine) > 0.1:
continue
for span in line['spans']:
all_pymu_chars.extend(span['chars'])
# 计算所有sapn的高度的中位数
span_height_list = []
for span in spans:
if span['type'] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table]:
continue
span_height = span['bbox'][3] - span['bbox'][1]
span['height'] = span_height
span['width'] = span['bbox'][2] - span['bbox'][0]
span_height_list.append(span_height)
if len(span_height_list) == 0:
return spans
else:
median_span_height = statistics.median(span_height_list)
useful_spans = []
unuseful_spans = []
# 纵向span的两个特征:1. 高度超过多个line 2. 高宽比超过某个值
vertical_spans = []
for span in spans:
if span['type'] in [ContentType.InterlineEquation, ContentType.Image, ContentType.Table]:
continue
for block in all_bboxes + all_discarded_blocks:
if block[7] in [BlockType.ImageBody, BlockType.TableBody, BlockType.InterlineEquation]:
continue
if calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], block[0:4]) > 0.5:
if span['height'] > median_span_height * 3 and span['height'] > span['width'] * 3:
vertical_spans.append(span)
elif block in all_bboxes:
useful_spans.append(span)
else:
unuseful_spans.append(span)
break
"""垂直的span框直接用pymu的line进行填充"""
if len(vertical_spans) > 0:
text_blocks = pdf_page.get_text('dict', flags=fitz.TEXTFLAGS_TEXT)['blocks']
all_pymu_lines = []
for block in text_blocks:
for line in block['lines']:
all_pymu_lines.append(line)
for pymu_line in all_pymu_lines:
for span in vertical_spans:
if calculate_overlap_area_in_bbox1_area_ratio(pymu_line['bbox'], span['bbox']) > 0.5:
for pymu_span in pymu_line['spans']:
span['content'] += pymu_span['text']
break
for span in vertical_spans:
if len(span['content']) == 0:
spans.remove(span)
"""水平的span框如果没有char则用ocr进行填充"""
new_spans = []
for span in useful_spans + unuseful_spans:
if span['type'] in [ContentType.Text]:
span['chars'] = []
new_spans.append(span)
need_ocr_spans = fill_char_in_spans(new_spans, all_pymu_chars)
if len(need_ocr_spans) > 0:
# 初始化ocr模型
# atom_model_manager = AtomModelSingleton()
# ocr_model = atom_model_manager.get_atom_model(
# atom_model_name='ocr',
# ocr_show_log=False,
# det_db_box_thresh=0.3,
# lang=lang
# )
for span in need_ocr_spans:
# 对span的bbox截图再ocr
span_img = cut_image_to_pil_image(span['bbox'], pdf_page, mode='cv2')
# 计算span的对比度,低于0.20的span不进行ocr
if calculate_contrast(span_img, img_mode='bgr') <= 0.17:
spans.remove(span)
continue
# pass
span['content'] = ''
span['score'] = 1
span['np_img'] = span_img
# ocr_res = ocr_model.ocr(span_img, det=False)
# if ocr_res and len(ocr_res) > 0:
# if len(ocr_res[0]) > 0:
# ocr_text, ocr_score = ocr_res[0][0]
# # logger.info(f"ocr_text: {ocr_text}, ocr_score: {ocr_score}")
# if ocr_score > 0.5 and len(ocr_text) > 0:
# span['content'] = ocr_text
# span['score'] = float(round(ocr_score, 2))
# else:
# spans.remove(span)
return spans
def model_init(model_name: str):
from transformers import LayoutLMv3ForTokenClassification
device_name = get_device()
bf_16_support = False
if device_name.startswith("cuda"):
bf_16_support = torch.cuda.is_bf16_supported()
elif device_name.startswith("mps"):
bf_16_support = True
device = torch.device(device_name)
if model_name == 'layoutreader':
# 检测modelscope的缓存目录是否存在
layoutreader_model_dir = get_local_layoutreader_model_dir()
if os.path.exists(layoutreader_model_dir):
model = LayoutLMv3ForTokenClassification.from_pretrained(
layoutreader_model_dir
)
else:
logger.warning(
'local layoutreader model not exists, use online model from huggingface'
)
model = LayoutLMv3ForTokenClassification.from_pretrained(
'hantian/layoutreader'
)
if bf_16_support:
model.to(device).eval().bfloat16()
else:
model.to(device).eval()
else:
logger.error('model name not allow')
exit(1)
return model
class ModelSingleton:
_instance = None
_models = {}
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def get_model(self, model_name: str):
if model_name not in self._models:
self._models[model_name] = model_init(model_name=model_name)
return self._models[model_name]
def do_predict(boxes: List[List[int]], model) -> List[int]:
from magic_pdf.model.sub_modules.reading_oreder.layoutreader.helpers import (
boxes2inputs, parse_logits, prepare_inputs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning, module="transformers")
inputs = boxes2inputs(boxes)
inputs = prepare_inputs(inputs, model)
logits = model(**inputs).logits.cpu().squeeze(0)
return parse_logits(logits, len(boxes))
def cal_block_index(fix_blocks, sorted_bboxes):
if sorted_bboxes is not None:
# 使用layoutreader排序
for block in fix_blocks:
line_index_list = []
if len(block['lines']) == 0:
block['index'] = sorted_bboxes.index(block['bbox'])
else:
for line in block['lines']:
line['index'] = sorted_bboxes.index(line['bbox'])
line_index_list.append(line['index'])
median_value = statistics.median(line_index_list)
block['index'] = median_value
# 删除图表body block中的虚拟line信息, 并用real_lines信息回填
if block['type'] in [BlockType.ImageBody, BlockType.TableBody, BlockType.Title, BlockType.InterlineEquation]:
if 'real_lines' in block:
block['virtual_lines'] = copy.deepcopy(block['lines'])
block['lines'] = copy.deepcopy(block['real_lines'])
del block['real_lines']
else:
# 使用xycut排序
block_bboxes = []
for block in fix_blocks:
# 如果block['bbox']任意值小于0,将其置为0
block['bbox'] = [max(0, x) for x in block['bbox']]
block_bboxes.append(block['bbox'])
# 删除图表body block中的虚拟line信息, 并用real_lines信息回填
if block['type'] in [BlockType.ImageBody, BlockType.TableBody, BlockType.Title, BlockType.InterlineEquation]:
if 'real_lines' in block:
block['virtual_lines'] = copy.deepcopy(block['lines'])
block['lines'] = copy.deepcopy(block['real_lines'])
del block['real_lines']
import numpy as np
from magic_pdf.model.sub_modules.reading_oreder.layoutreader.xycut import \
recursive_xy_cut
random_boxes = np.array(block_bboxes)
np.random.shuffle(random_boxes)
res = []
recursive_xy_cut(np.asarray(random_boxes).astype(int), np.arange(len(block_bboxes)), res)
assert len(res) == len(block_bboxes)
sorted_boxes = random_boxes[np.array(res)].tolist()
for i, block in enumerate(fix_blocks):
block['index'] = sorted_boxes.index(block['bbox'])
# 生成line index
sorted_blocks = sorted(fix_blocks, key=lambda b: b['index'])
line_inedx = 1
for block in sorted_blocks:
for line in block['lines']:
line['index'] = line_inedx
line_inedx += 1
return fix_blocks
def insert_lines_into_block(block_bbox, line_height, page_w, page_h):
# block_bbox是一个元组(x0, y0, x1, y1),其中(x0, y0)是左下角坐标,(x1, y1)是右上角坐标
x0, y0, x1, y1 = block_bbox
block_height = y1 - y0
block_weight = x1 - x0
# 如果block高度小于n行正文,则直接返回block的bbox
if line_height * 2 < block_height:
if (
block_height > page_h * 0.25 and page_w * 0.5 > block_weight > page_w * 0.25
): # 可能是双列结构,可以切细点
lines = int(block_height / line_height)
else:
# 如果block的宽度超过0.4页面宽度,则将block分成3行(是一种复杂布局,图不能切的太细)
if block_weight > page_w * 0.4:
lines = 3
elif block_weight > page_w * 0.25: # (可能是三列结构,也切细点)
lines = int(block_height / line_height)
else: # 判断长宽比
if block_height / block_weight > 1.2: # 细长的不分
return [[x0, y0, x1, y1]]
else: # 不细长的还是分成两行
lines = 2
line_height = (y1 - y0) / lines
# 确定从哪个y位置开始绘制线条
current_y = y0
# 用于存储线条的位置信息[(x0, y), ...]
lines_positions = []
for i in range(lines):
lines_positions.append([x0, current_y, x1, current_y + line_height])
current_y += line_height
return lines_positions
else:
return [[x0, y0, x1, y1]]
def sort_lines_by_model(fix_blocks, page_w, page_h, line_height, footnote_blocks):
page_line_list = []
def add_lines_to_block(b):
line_bboxes = insert_lines_into_block(b['bbox'], line_height, page_w, page_h)
b['lines'] = []
for line_bbox in line_bboxes:
b['lines'].append({'bbox': line_bbox, 'spans': []})
page_line_list.extend(line_bboxes)
for block in fix_blocks:
if block['type'] in [
BlockType.Text, BlockType.Title,
BlockType.ImageCaption, BlockType.ImageFootnote,
BlockType.TableCaption, BlockType.TableFootnote
]:
if len(block['lines']) == 0:
add_lines_to_block(block)
elif block['type'] in [BlockType.Title] and len(block['lines']) == 1 and (block['bbox'][3] - block['bbox'][1]) > line_height * 2:
block['real_lines'] = copy.deepcopy(block['lines'])
add_lines_to_block(block)
else:
for line in block['lines']:
bbox = line['bbox']
page_line_list.append(bbox)
elif block['type'] in [BlockType.ImageBody, BlockType.TableBody, BlockType.InterlineEquation]:
block['real_lines'] = copy.deepcopy(block['lines'])
add_lines_to_block(block)
for block in footnote_blocks:
footnote_block = {'bbox': block[:4]}
add_lines_to_block(footnote_block)
if len(page_line_list) > 200: # layoutreader最高支持512line
return None
# 使用layoutreader排序
x_scale = 1000.0 / page_w
y_scale = 1000.0 / page_h
boxes = []
# logger.info(f"Scale: {x_scale}, {y_scale}, Boxes len: {len(page_line_list)}")
for left, top, right, bottom in page_line_list:
if left < 0:
logger.warning(
f'left < 0, left: {left}, right: {right}, top: {top}, bottom: {bottom}, page_w: {page_w}, page_h: {page_h}'
) # noqa: E501
left = 0
if right > page_w:
logger.warning(
f'right > page_w, left: {left}, right: {right}, top: {top}, bottom: {bottom}, page_w: {page_w}, page_h: {page_h}'
) # noqa: E501
right = page_w
if top < 0:
logger.warning(
f'top < 0, left: {left}, right: {right}, top: {top}, bottom: {bottom}, page_w: {page_w}, page_h: {page_h}'
) # noqa: E501
top = 0
if bottom > page_h:
logger.warning(
f'bottom > page_h, left: {left}, right: {right}, top: {top}, bottom: {bottom}, page_w: {page_w}, page_h: {page_h}'
) # noqa: E501
bottom = page_h
left = round(left * x_scale)
top = round(top * y_scale)
right = round(right * x_scale)
bottom = round(bottom * y_scale)
assert (
1000 >= right >= left >= 0 and 1000 >= bottom >= top >= 0
), f'Invalid box. right: {right}, left: {left}, bottom: {bottom}, top: {top}' # noqa: E126, E121
boxes.append([left, top, right, bottom])
model_manager = ModelSingleton()
model = model_manager.get_model('layoutreader')
with torch.no_grad():
orders = do_predict(boxes, model)
sorted_bboxes = [page_line_list[i] for i in orders]
return sorted_bboxes
def get_line_height(blocks):
page_line_height_list = []
for block in blocks:
if block['type'] in [
BlockType.Text, BlockType.Title,
BlockType.ImageCaption, BlockType.ImageFootnote,
BlockType.TableCaption, BlockType.TableFootnote
]:
for line in block['lines']:
bbox = line['bbox']
page_line_height_list.append(int(bbox[3] - bbox[1]))
if len(page_line_height_list) > 0:
return statistics.median(page_line_height_list)
else:
return 10
def process_groups(groups, body_key, caption_key, footnote_key):
body_blocks = []
caption_blocks = []
footnote_blocks = []
for i, group in enumerate(groups):
group[body_key]['group_id'] = i
body_blocks.append(group[body_key])
for caption_block in group[caption_key]:
caption_block['group_id'] = i
caption_blocks.append(caption_block)
for footnote_block in group[footnote_key]:
footnote_block['group_id'] = i
footnote_blocks.append(footnote_block)
return body_blocks, caption_blocks, footnote_blocks
def process_block_list(blocks, body_type, block_type):
indices = [block['index'] for block in blocks]
median_index = statistics.median(indices)
body_bbox = next((block['bbox'] for block in blocks if block.get('type') == body_type), [])
return {
'type': block_type,
'bbox': body_bbox,
'blocks': blocks,
'index': median_index,
}
def revert_group_blocks(blocks):
image_groups = {}
table_groups = {}
new_blocks = []
for block in blocks:
if block['type'] in [BlockType.ImageBody, BlockType.ImageCaption, BlockType.ImageFootnote]:
group_id = block['group_id']
if group_id not in image_groups:
image_groups[group_id] = []
image_groups[group_id].append(block)
elif block['type'] in [BlockType.TableBody, BlockType.TableCaption, BlockType.TableFootnote]:
group_id = block['group_id']
if group_id not in table_groups:
table_groups[group_id] = []
table_groups[group_id].append(block)
else:
new_blocks.append(block)
for group_id, blocks in image_groups.items():
new_blocks.append(process_block_list(blocks, BlockType.ImageBody, BlockType.Image))
for group_id, blocks in table_groups.items():
new_blocks.append(process_block_list(blocks, BlockType.TableBody, BlockType.Table))
return new_blocks
def remove_outside_spans(spans, all_bboxes, all_discarded_blocks):
def get_block_bboxes(blocks, block_type_list):
return [block[0:4] for block in blocks if block[7] in block_type_list]
image_bboxes = get_block_bboxes(all_bboxes, [BlockType.ImageBody])
table_bboxes = get_block_bboxes(all_bboxes, [BlockType.TableBody])
other_block_type = []
for block_type in BlockType.__dict__.values():
if not isinstance(block_type, str):
continue
if block_type not in [BlockType.ImageBody, BlockType.TableBody]:
other_block_type.append(block_type)
other_block_bboxes = get_block_bboxes(all_bboxes, other_block_type)
discarded_block_bboxes = get_block_bboxes(all_discarded_blocks, [BlockType.Discarded])
new_spans = []
for span in spans:
span_bbox = span['bbox']
span_type = span['type']
if any(calculate_overlap_area_in_bbox1_area_ratio(span_bbox, block_bbox) > 0.4 for block_bbox in
discarded_block_bboxes):
new_spans.append(span)
continue
if span_type == ContentType.Image:
if any(calculate_overlap_area_in_bbox1_area_ratio(span_bbox, block_bbox) > 0.5 for block_bbox in
image_bboxes):
new_spans.append(span)
elif span_type == ContentType.Table:
if any(calculate_overlap_area_in_bbox1_area_ratio(span_bbox, block_bbox) > 0.5 for block_bbox in
table_bboxes):
new_spans.append(span)
else:
if any(calculate_overlap_area_in_bbox1_area_ratio(span_bbox, block_bbox) > 0.5 for block_bbox in
other_block_bboxes):
new_spans.append(span)
return new_spans
def parse_page_core(
page_doc: PageableData, magic_model, page_id, pdf_bytes_md5, imageWriter, parse_mode, lang
):
need_drop = False
drop_reason = []
"""从magic_model对象中获取后面会用到的区块信息"""
img_groups = magic_model.get_imgs_v2(page_id)
table_groups = magic_model.get_tables_v2(page_id)
"""对image和table的区块分组"""
img_body_blocks, img_caption_blocks, img_footnote_blocks = process_groups(
img_groups, 'image_body', 'image_caption_list', 'image_footnote_list'
)
table_body_blocks, table_caption_blocks, table_footnote_blocks = process_groups(
table_groups, 'table_body', 'table_caption_list', 'table_footnote_list'
)
discarded_blocks = magic_model.get_discarded(page_id)
text_blocks = magic_model.get_text_blocks(page_id)
title_blocks = magic_model.get_title_blocks(page_id)
inline_equations, interline_equations, interline_equation_blocks = magic_model.get_equations(page_id)
page_w, page_h = magic_model.get_page_size(page_id)
def merge_title_blocks(blocks, x_distance_threshold=0.1*page_w):
def merge_two_bbox(b1, b2):
x_min = min(b1['bbox'][0], b2['bbox'][0])
y_min = min(b1['bbox'][1], b2['bbox'][1])
x_max = max(b1['bbox'][2], b2['bbox'][2])
y_max = max(b1['bbox'][3], b2['bbox'][3])
return x_min, y_min, x_max, y_max
def merge_two_blocks(b1, b2):
# 合并两个标题块的边界框
b1['bbox'] = merge_two_bbox(b1, b2)
# 合并两个标题块的文本内容
line1 = b1['lines'][0]
line2 = b2['lines'][0]
line1['bbox'] = merge_two_bbox(line1, line2)
line1['spans'].extend(line2['spans'])
return b1, b2
# 按 y 轴重叠度聚集标题块
y_overlapping_blocks = []
title_bs = [b for b in blocks if b['type'] == BlockType.Title]
while title_bs:
block1 = title_bs.pop(0)
current_row = [block1]
to_remove = []
for block2 in title_bs:
if (
__is_overlaps_y_exceeds_threshold(block1['bbox'], block2['bbox'], 0.9)
and len(block1['lines']) == 1
and len(block2['lines']) == 1
):
current_row.append(block2)
to_remove.append(block2)
for b in to_remove:
title_bs.remove(b)
y_overlapping_blocks.append(current_row)
# 按x轴坐标排序并合并标题块
to_remove_blocks = []
for row in y_overlapping_blocks:
if len(row) == 1:
continue
# 按x轴坐标排序
row.sort(key=lambda x: x['bbox'][0])
merged_block = row[0]
for i in range(1, len(row)):
left_block = merged_block
right_block = row[i]
left_height = left_block['bbox'][3] - left_block['bbox'][1]
right_height = right_block['bbox'][3] - right_block['bbox'][1]
if (
right_block['bbox'][0] - left_block['bbox'][2] < x_distance_threshold
and left_height * 0.95 < right_height < left_height * 1.05
):
merged_block, to_remove_block = merge_two_blocks(merged_block, right_block)
to_remove_blocks.append(to_remove_block)
else:
merged_block = right_block
for b in to_remove_blocks:
blocks.remove(b)
"""将所有区块的bbox整理到一起"""
# interline_equation_blocks参数不够准,后面切换到interline_equations上
interline_equation_blocks = []
if len(interline_equation_blocks) > 0:
all_bboxes, all_discarded_blocks, footnote_blocks = ocr_prepare_bboxes_for_layout_split_v2(
img_body_blocks, img_caption_blocks, img_footnote_blocks,
table_body_blocks, table_caption_blocks, table_footnote_blocks,
discarded_blocks,
text_blocks,
title_blocks,
interline_equation_blocks,
page_w,
page_h,
)
else:
all_bboxes, all_discarded_blocks, footnote_blocks = ocr_prepare_bboxes_for_layout_split_v2(
img_body_blocks, img_caption_blocks, img_footnote_blocks,
table_body_blocks, table_caption_blocks, table_footnote_blocks,
discarded_blocks,
text_blocks,
title_blocks,
interline_equations,
page_w,
page_h,
)
"""获取所有的spans信息"""
spans = magic_model.get_all_spans(page_id)
"""在删除重复span之前,应该通过image_body和table_body的block过滤一下image和table的span"""
"""顺便删除大水印并保留abandon的span"""
spans = remove_outside_spans(spans, all_bboxes, all_discarded_blocks)
"""删除重叠spans中置信度较低的那些"""
spans, dropped_spans_by_confidence = remove_overlaps_low_confidence_spans(spans)
"""删除重叠spans中较小的那些"""
spans, dropped_spans_by_span_overlap = remove_overlaps_min_spans(spans)
"""根据parse_mode,构造spans,主要是文本类的字符填充"""
if parse_mode == SupportedPdfParseMethod.TXT:
"""使用新版本的混合ocr方案."""
spans = txt_spans_extract_v2(page_doc, spans, all_bboxes, all_discarded_blocks, lang)
elif parse_mode == SupportedPdfParseMethod.OCR:
pass
else:
raise Exception('parse_mode must be txt or ocr')
"""先处理不需要排版的discarded_blocks"""
discarded_block_with_spans, spans = fill_spans_in_blocks(
all_discarded_blocks, spans, 0.4
)
fix_discarded_blocks = fix_discarded_block(discarded_block_with_spans)
"""如果当前页面没有有效的bbox则跳过"""
if len(all_bboxes) == 0:
logger.warning(f'skip this page, not found useful bbox, page_id: {page_id}')
return ocr_construct_page_component_v2(
[],
[],
page_id,
page_w,
page_h,
[],
[],
[],
interline_equations,
fix_discarded_blocks,
need_drop,
drop_reason,
)
"""对image和table截图"""
spans = ocr_cut_image_and_table(
spans, page_doc, page_id, pdf_bytes_md5, imageWriter
)
"""span填充进block"""
block_with_spans, spans = fill_spans_in_blocks(all_bboxes, spans, 0.5)
"""对block进行fix操作"""
fix_blocks = fix_block_spans_v2(block_with_spans)
"""同一行被断开的titile合并"""
merge_title_blocks(fix_blocks)
"""获取所有line并计算正文line的高度"""
line_height = get_line_height(fix_blocks)
"""获取所有line并对line排序"""
sorted_bboxes = sort_lines_by_model(fix_blocks, page_w, page_h, line_height, footnote_blocks)
"""根据line的中位数算block的序列关系"""
fix_blocks = cal_block_index(fix_blocks, sorted_bboxes)
"""将image和table的block还原回group形式参与后续流程"""
fix_blocks = revert_group_blocks(fix_blocks)
"""重排block"""
sorted_blocks = sorted(fix_blocks, key=lambda b: b['index'])
"""block内重排(img和table的block内多个caption或footnote的排序)"""
for block in sorted_blocks:
if block['type'] in [BlockType.Image, BlockType.Table]:
block['blocks'] = sorted(block['blocks'], key=lambda b: b['index'])
"""获取QA需要外置的list"""
images, tables, interline_equations = get_qa_need_list_v2(sorted_blocks)
"""构造pdf_info_dict"""
page_info = ocr_construct_page_component_v2(
sorted_blocks,
[],
page_id,
page_w,
page_h,
[],
images,
tables,
interline_equations,
fix_discarded_blocks,
need_drop,
drop_reason,
)
return page_info
def pdf_parse_union(
model_list,
dataset: Dataset,
imageWriter,
parse_mode,
start_page_id=0,
end_page_id=None,
debug_mode=False,
lang=None,
):
pdf_bytes_md5 = compute_md5(dataset.data_bits())
"""初始化空的pdf_info_dict"""
pdf_info_dict = {}
"""用model_list和docs对象初始化magic_model"""
magic_model = MagicModel(model_list, dataset)
"""根据输入的起始范围解析pdf"""
end_page_id = (
end_page_id
if end_page_id is not None and end_page_id >= 0
else len(dataset) - 1
)
if end_page_id > len(dataset) - 1:
logger.warning('end_page_id is out of range, use pdf_docs length')
end_page_id = len(dataset) - 1
# """初始化启动时间"""
# start_time = time.time()
# for page_id, page in enumerate(dataset):
for page_id, page in tqdm(enumerate(dataset), total=len(dataset), desc="Processing pages"):
# """debug时输出每页解析的耗时."""
# if debug_mode:
# time_now = time.time()
# logger.info(
# f'page_id: {page_id}, last_page_cost_time: {round(time.time() - start_time, 2)}'
# )
# start_time = time_now
"""解析pdf中的每一页"""
if start_page_id <= page_id <= end_page_id:
page_info = parse_page_core(
page, magic_model, page_id, pdf_bytes_md5, imageWriter, parse_mode, lang
)
else:
page_info = page.get_page_info()
page_w = page_info.w
page_h = page_info.h
page_info = ocr_construct_page_component_v2(
[], [], page_id, page_w, page_h, [], [], [], [], [], True, 'skip page'
)
pdf_info_dict[f'page_{page_id}'] = page_info
need_ocr_list = []
img_crop_list = []
text_block_list = []
for pange_id, page_info in pdf_info_dict.items():
for block in page_info['preproc_blocks']:
if block['type'] in ['table', 'image']:
for sub_block in block['blocks']:
if sub_block['type'] in ['image_caption', 'image_footnote', 'table_caption', 'table_footnote']:
text_block_list.append(sub_block)
elif block['type'] in ['text', 'title']:
text_block_list.append(block)
for block in page_info['discarded_blocks']:
text_block_list.append(block)
for block in text_block_list:
for line in block['lines']:
for span in line['spans']:
if 'np_img' in span:
need_ocr_list.append(span)
img_crop_list.append(span['np_img'])
span.pop('np_img')
if len(img_crop_list) > 0:
# Get OCR results for this language's images
atom_model_manager = AtomModelSingleton()
ocr_model = atom_model_manager.get_atom_model(
atom_model_name='ocr',
ocr_show_log=False,
det_db_box_thresh=0.3,
lang=lang
)
# rec_start = time.time()
ocr_res_list = ocr_model.ocr(img_crop_list, det=False, tqdm_enable=True)[0]
# Verify we have matching counts
assert len(ocr_res_list) == len(need_ocr_list), f'ocr_res_list: {len(ocr_res_list)}, need_ocr_list: {len(need_ocr_list)}'
# Process OCR results for this language
for index, span in enumerate(need_ocr_list):
ocr_text, ocr_score = ocr_res_list[index]
span['content'] = ocr_text
span['score'] = float(f"{ocr_score:.3f}")
# rec_time = time.time() - rec_start
# logger.info(f'ocr-dynamic-rec time: {round(rec_time, 2)}, total images processed: {len(img_crop_list)}')
"""分段"""
para_split(pdf_info_dict)
"""llm优化"""
llm_aided_config = get_llm_aided_config()
if llm_aided_config is not None:
"""公式优化"""
formula_aided_config = llm_aided_config.get('formula_aided', None)
if formula_aided_config is not None:
if formula_aided_config.get('enable', False):
llm_aided_formula_start_time = time.time()
llm_aided_formula(pdf_info_dict, formula_aided_config)
logger.info(f'llm aided formula time: {round(time.time() - llm_aided_formula_start_time, 2)}')
"""文本优化"""
text_aided_config = llm_aided_config.get('text_aided', None)
if text_aided_config is not None:
if text_aided_config.get('enable', False):
llm_aided_text_start_time = time.time()
llm_aided_text(pdf_info_dict, text_aided_config)
logger.info(f'llm aided text time: {round(time.time() - llm_aided_text_start_time, 2)}')
"""标题优化"""
title_aided_config = llm_aided_config.get('title_aided', None)
if title_aided_config is not None:
if title_aided_config.get('enable', False):
llm_aided_title_start_time = time.time()
llm_aided_title(pdf_info_dict, title_aided_config)
logger.info(f'llm aided title time: {round(time.time() - llm_aided_title_start_time, 2)}')
"""dict转list"""
pdf_info_list = dict_to_list(pdf_info_dict)
new_pdf_info_dict = {
'pdf_info': pdf_info_list,
}
clean_memory(get_device())
return new_pdf_info_dict
if __name__ == '__main__':
pass
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment