Commit 4824c25b authored by wangsen's avatar wangsen
Browse files

Initial commit

parents
# -*- coding: utf-8 -*-
# @Time : 2019/12/8 13:14
# @Author : zhoujun
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2018/6/11 15:54
# @Author : zhoujun
import os
import sys
import pathlib
__dir__ = pathlib.Path(os.path.abspath(__file__))
sys.path.append(str(__dir__))
sys.path.append(str(__dir__.parent.parent))
import argparse
import time
import paddle
from tqdm.auto import tqdm
class EVAL():
    """Standalone evaluation driver: loads a training checkpoint, runs the
    validation dataset through the model and reports recall/precision/F1.
    """

    def __init__(self, model_path, gpu_id=0):
        # Deferred imports so this module can be imported without pulling
        # in the whole project dependency tree.
        from models import build_model
        from data_loader import get_dataloader
        from post_processing import get_post_processing
        from utils import get_metric
        self.gpu_id = gpu_id
        # Use the requested GPU only when CUDA support is compiled in;
        # otherwise fall back to CPU.
        if self.gpu_id is not None and isinstance(
                self.gpu_id, int) and paddle.device.is_compiled_with_cuda():
            paddle.device.set_device("gpu:{}".format(self.gpu_id))
        else:
            paddle.device.set_device("cpu")
        checkpoint = paddle.load(model_path)
        config = checkpoint['config']
        # Weights are restored from the checkpoint below, so skip
        # downloading pretrained backbone weights.
        config['arch']['backbone']['pretrained'] = False
        self.validate_loader = get_dataloader(config['dataset']['validate'],
                                              config['distributed'])
        self.model = build_model(config['arch'])
        self.model.set_state_dict(checkpoint['state_dict'])
        self.post_process = get_post_processing(config['post_processing'])
        self.metric_cls = get_metric(config['metric'])

    def eval(self):
        """Run the full validation loop.

        Returns:
            dict with averaged 'recall', 'precision' and 'fmeasure'.
        """
        self.model.eval()
        raw_metrics = []
        total_frame = 0.0
        total_time = 0.0
        for i, batch in tqdm(
                enumerate(self.validate_loader),
                total=len(self.validate_loader),
                desc='test model'):
            with paddle.no_grad():
                start = time.time()
                preds = self.model(batch['img'])
                boxes, scores = self.post_process(
                    batch,
                    preds,
                    is_output_polygon=self.metric_cls.is_output_polygon)
                total_frame += batch['img'].shape[0]
                total_time += time.time() - start
                raw_metric = self.metric_cls.validate_measure(batch,
                                                              (boxes, scores))
                raw_metrics.append(raw_metric)
        metrics = self.metric_cls.gather_measure(raw_metrics)
        print('FPS:{}'.format(total_frame / total_time))
        return {
            'recall': metrics['recall'].avg,
            'precision': metrics['precision'].avg,
            'fmeasure': metrics['fmeasure'].avg
        }
def init_args():
    """Parse command-line options for the evaluation script."""
    parser = argparse.ArgumentParser(description='DBNet.paddle')
    parser.add_argument(
        '--model_path',
        type=str,
        required=False,
        default='output/DBNet_resnet18_FPN_DBHead/checkpoint/1.pth')
    return parser.parse_args()
if __name__ == '__main__':
    args = init_args()
    # Renamed from `eval`: that name shadows the Python builtin.
    evaluator = EVAL(args.model_path)
    result = evaluator.eval()
    print(result)
import os
import sys
__dir__ = os.path.dirname(os.path.abspath(__file__))
sys.path.append(__dir__)
sys.path.insert(0, os.path.abspath(os.path.join(__dir__, "..")))
import argparse
import paddle
from paddle.jit import to_static
from models import build_model
from utils import Config, ArgsParser
def init_args():
    """Build the project ArgsParser and return the parsed CLI options."""
    return ArgsParser().parse_args()
def load_checkpoint(model, checkpoint_path):
    """
    Restore model weights from a training checkpoint.

    :param model: network whose parameters are overwritten in place
    :param checkpoint_path: Checkpoint path to be loaded
    """
    state = paddle.load(checkpoint_path)
    model.set_state_dict(state['state_dict'])
    print('load checkpoint from {}'.format(checkpoint_path))
def main(config):
    """Export a trained checkpoint to a static-graph inference model.

    Reads the checkpoint path and output directory from the training
    config, converts the model with @to_static and saves it so it can be
    loaded by Paddle Inference.
    """
    model = build_model(config['arch'])
    load_checkpoint(model, config['trainer']['resume_checkpoint'])
    model.eval()

    save_path = config["trainer"]["output_dir"]
    save_path = os.path.join(save_path, "inference")
    # Channels fixed at 3; height/width stay dynamic (-1) so the exported
    # model accepts variable image sizes.
    infer_shape = [3, -1, -1]
    model = to_static(
        model,
        input_spec=[
            paddle.static.InputSpec(
                shape=[None] + infer_shape, dtype="float32")
        ])

    paddle.jit.save(model, save_path)
    print("inference model is saved to {}".format(save_path))
if __name__ == "__main__":
    cli_args = init_args()
    assert os.path.exists(cli_args.config_file)
    cfg = Config(cli_args.config_file)
    cfg.merge_dict(cli_args.opt)
    main(cfg.cfg)
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import pathlib
__dir__ = pathlib.Path(os.path.abspath(__file__))
sys.path.append(str(__dir__))
sys.path.append(str(__dir__.parent.parent))
import cv2
import paddle
from paddle import inference
import numpy as np
from PIL import Image
from paddle.vision import transforms
from tools.predict import resize_image
from post_processing import get_post_processing
from utils.util import draw_bbox, save_result
class InferenceEngine(object):
    """InferenceEngine

    Paddle Inference wrapper for the exported DB text-detection model,
    bundling predictor creation, preprocess, run and postprocess.
    """

    def __init__(self, args):
        """
        Args:
            args: Parameters generated using argparser.

        Returns: None
        """
        super().__init__()
        self.args = args

        # init inference engine
        self.predictor, self.config, self.input_tensor, self.output_tensor = self.load_predictor(
            os.path.join(args.model_dir, "inference.pdmodel"),
            os.path.join(args.model_dir, "inference.pdiparams"))

        # build transforms (ImageNet mean/std normalisation)
        self.transforms = transforms.Compose([
            transforms.ToTensor(), transforms.Normalize(
                mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        # warmup: push dummy batches through so later timings are stable
        if self.args.warmup > 0:
            # NOTE(review): get_args in this file defines no --crop_size,
            # so reading args.crop_size raised AttributeError; fall back
            # to short_size when crop_size is absent.
            size = getattr(self.args, "crop_size", self.args.short_size)
            for idx in range(args.warmup):
                print(idx)
                x = np.random.rand(1, 3, size, size).astype("float32")
                self.input_tensor.copy_from_cpu(x)
                self.predictor.run()
                self.output_tensor.copy_to_cpu()

        self.post_process = get_post_processing({
            'type': 'SegDetectorRepresenter',
            'args': {
                'thresh': 0.3,
                'box_thresh': 0.7,
                'max_candidates': 1000,
                'unclip_ratio': 1.5
            }
        })

    def load_predictor(self, model_file_path, params_file_path):
        """load_predictor
        initialize the inference engine

        Args:
            model_file_path: inference model path (*.pdmodel)
            params_file_path: inference parameter path (*.pdiparams)
        Return:
            predictor: Predictor created using Paddle Inference.
            config: Configuration of the predictor.
            input_tensor: Input tensor of the predictor.
            output_tensor: Output tensor of the predictor.
        """
        args = self.args
        config = inference.Config(model_file_path, params_file_path)
        if args.use_gpu:
            config.enable_use_gpu(1000, 0)
            if args.use_tensorrt:
                # Fix: `precision` was an undefined name here (NameError);
                # map the CLI precision string onto the Paddle enum.
                precision_map = {
                    "fp16": inference.PrecisionType.Half,
                    "int8": inference.PrecisionType.Int8,
                }
                precision = precision_map.get(args.precision,
                                              inference.PrecisionType.Float32)
                # NOTE(review): get_args defines neither --max_batch_size
                # nor --min_subgraph_size; confirm callers provide them
                # before enabling TensorRT.
                config.enable_tensorrt_engine(
                    workspace_size=1 << 30,
                    precision_mode=precision,
                    max_batch_size=args.max_batch_size,
                    min_subgraph_size=args.
                    min_subgraph_size,  # skip the minmum trt subgraph
                    use_calib_mode=False)

                # collect shape (fix: `model_dir` and `logger` were
                # undefined names; use args.model_dir and print).
                trt_shape_f = os.path.join(args.model_dir,
                                           "_trt_dynamic_shape.txt")
                if not os.path.exists(trt_shape_f):
                    config.collect_shape_range_info(trt_shape_f)
                    print(f"collect dynamic shape info into : {trt_shape_f}")
                try:
                    config.enable_tuned_tensorrt_dynamic_shape(trt_shape_f,
                                                               True)
                except Exception as E:
                    print(E)
                    print("Please keep your paddlepaddle-gpu >= 2.3.0!")
        else:
            config.disable_gpu()
            # The thread num should not be greater than the number of
            # cores in the CPU.
            if args.enable_mkldnn:
                # cache 10 different shapes for mkldnn to avoid memory leak
                config.set_mkldnn_cache_capacity(10)
                config.enable_mkldnn()
                if args.precision == "fp16":
                    config.enable_mkldnn_bfloat16()
            if hasattr(args, "cpu_threads"):
                config.set_cpu_math_library_num_threads(args.cpu_threads)
            else:
                # default cpu threads as 10
                config.set_cpu_math_library_num_threads(10)

        # enable memory optim
        config.enable_memory_optim()
        config.disable_glog_info()

        config.switch_use_feed_fetch_ops(False)
        config.switch_ir_optim(True)

        # create predictor
        predictor = inference.create_predictor(config)

        # get input and output tensor property
        input_names = predictor.get_input_names()
        input_tensor = predictor.get_input_handle(input_names[0])

        output_names = predictor.get_output_names()
        output_tensor = predictor.get_output_handle(output_names[0])

        return predictor, config, input_tensor, output_tensor

    def preprocess(self, img_path, short_size):
        """preprocess
        Read the image, resize so the short side is `short_size`, and
        normalise into a (1, C, H, W) float batch.

        Args:
            img_path: Image path.
            short_size: target length of the shorter image side.
        Returns: (input batch, shape_info dict holding the original (h, w)).
        """
        img = cv2.imread(img_path, 1)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h, w = img.shape[:2]
        img = resize_image(img, short_size)
        img = self.transforms(img)
        img = np.expand_dims(img, axis=0)
        shape_info = {'shape': [(h, w)]}
        return img, shape_info

    def postprocess(self, x, shape_info, is_output_polygon):
        """postprocess
        Convert the raw probability map into boxes/polygons and scores,
        dropping empty (all-zero) detections.

        Args:
            x: Inference engine output.
            shape_info: original image size info from preprocess.
            is_output_polygon: return polygons instead of quad boxes.
        Returns: (box_list, score_list) for the single input image.
        """
        box_list, score_list = self.post_process(
            shape_info, x, is_output_polygon=is_output_polygon)
        box_list, score_list = box_list[0], score_list[0]
        if len(box_list) > 0:
            if is_output_polygon:
                idx = [x.sum() > 0 for x in box_list]
                box_list = [box_list[i] for i, v in enumerate(idx) if v]
                score_list = [score_list[i] for i, v in enumerate(idx) if v]
            else:
                # drop all-zero boxes
                idx = box_list.reshape(box_list.shape[0], -1).sum(
                    axis=1) > 0
                box_list, score_list = box_list[idx], score_list[idx]
        else:
            box_list, score_list = [], []
        return box_list, score_list

    def run(self, x):
        """run
        Inference process using inference engine.

        Args:
            x: Input data after preprocess.
        Returns: Inference engine output
        """
        self.input_tensor.copy_from_cpu(x)
        self.predictor.run()
        output = self.output_tensor.copy_to_cpu()
        return output
def get_args(add_help=True):
    """Build and parse the CLI options for the inference script."""
    import argparse

    def str2bool(v):
        return v.lower() in ("true", "t", "1")

    parser = argparse.ArgumentParser(
        description="PaddlePaddle Classification Training", add_help=add_help)
    parser.add_argument("--model_dir", default=None, help="inference model dir")
    parser.add_argument("--batch_size", type=int, default=1)
    parser.add_argument("--short_size", default=1024, type=int,
                        help="short size")
    parser.add_argument("--img_path", default="./images/demo.jpg")
    parser.add_argument("--benchmark", default=False, type=str2bool,
                        help="benchmark")
    parser.add_argument("--warmup", default=0, type=int, help="warmup iter")
    parser.add_argument("--polygon", action="store_true",
                        help="output polygon or box")
    parser.add_argument("--use_gpu", type=str2bool, default=True)
    parser.add_argument("--use_tensorrt", type=str2bool, default=False)
    parser.add_argument("--precision", type=str, default="fp32")
    parser.add_argument("--gpu_mem", type=int, default=500)
    parser.add_argument("--gpu_id", type=int, default=0)
    parser.add_argument("--enable_mkldnn", type=str2bool, default=False)
    parser.add_argument("--cpu_threads", type=int, default=10)
    return parser.parse_args()
def main(args):
    """
    Main inference function.

    Args:
        args: Parameters generated using argparser.

    Returns:
        None. Writes the visualised detections and a box/score text file
        into ./output.
    """
    inference_engine = InferenceEngine(args)

    # init benchmark
    if args.benchmark:
        import auto_log
        autolog = auto_log.AutoLogger(
            model_name="db",
            batch_size=args.batch_size,
            inference_config=inference_engine.config,
            gpu_ids="auto" if args.use_gpu else None)

    # enable benchmark
    if args.benchmark:
        autolog.times.start()

    # preprocess
    img, shape_info = inference_engine.preprocess(args.img_path,
                                                  args.short_size)

    if args.benchmark:
        autolog.times.stamp()

    output = inference_engine.run(img)

    if args.benchmark:
        autolog.times.stamp()

    # postprocess
    box_list, score_list = inference_engine.postprocess(output, shape_info,
                                                        args.polygon)

    if args.benchmark:
        autolog.times.stamp()
        autolog.times.end(stamp=True)
        autolog.report()

    # draw detections on an RGB copy of the input (cv2 reads BGR)
    img = draw_bbox(cv2.imread(args.img_path)[:, :, ::-1], box_list)
    # save results under ./output
    os.makedirs('output', exist_ok=True)
    img_path = pathlib.Path(args.img_path)
    output_path = os.path.join('output', img_path.stem + '_infer_result.jpg')
    cv2.imwrite(output_path, img[:, :, ::-1])
    save_result(
        output_path.replace('_infer_result.jpg', '.txt'), box_list, score_list,
        args.polygon)
if __name__ == "__main__":
    main(get_args())
# -*- coding: utf-8 -*-
# @Time : 2019/8/24 12:06
# @Author : zhoujun
import os
import sys
import pathlib
__dir__ = pathlib.Path(os.path.abspath(__file__))
sys.path.append(str(__dir__))
sys.path.append(str(__dir__.parent.parent))
import time
import cv2
import paddle
from data_loader import get_transforms
from models import build_model
from post_processing import get_post_processing
def resize_image(img, short_size):
    """Resize `img` so its shorter edge equals `short_size` (aspect ratio
    preserved), then snap both output edges to multiples of 32."""
    h, w, _ = img.shape
    if h < w:
        out_h = short_size
        out_w = out_h / h * w
    else:
        out_w = short_size
        out_h = out_w / w * h
    out_h = int(round(out_h / 32) * 32)
    out_w = int(round(out_w / 32) * 32)
    return cv2.resize(img, (out_w, out_h))
class PaddleModel:
    """Checkpoint-based DB text detector for single-image prediction."""

    def __init__(self, model_path, post_p_thre=0.7, gpu_id=None):
        '''
        Initialise the model.

        :param model_path: checkpoint path (parameters only, or parameters
            saved together with the program)
        :param post_p_thre: box threshold applied during post-processing
        :param gpu_id: which GPU to run on (None or non-int -> CPU)
        '''
        self.gpu_id = gpu_id
        # Use the requested GPU only when CUDA support is compiled in.
        if self.gpu_id is not None and isinstance(
                self.gpu_id, int) and paddle.device.is_compiled_with_cuda():
            paddle.device.set_device("gpu:{}".format(self.gpu_id))
        else:
            paddle.device.set_device("cpu")
        checkpoint = paddle.load(model_path)

        config = checkpoint['config']
        # Weights are restored from the checkpoint below, so skip
        # downloading pretrained backbone weights.
        config['arch']['backbone']['pretrained'] = False
        self.model = build_model(config['arch'])
        self.post_process = get_post_processing(config['post_processing'])
        self.post_process.box_thresh = post_p_thre
        self.img_mode = config['dataset']['train']['dataset']['args'][
            'img_mode']
        self.model.set_state_dict(checkpoint['state_dict'])
        self.model.eval()

        # Reuse only the tensor conversion / normalisation transforms
        # from the training pipeline.
        self.transform = []
        for t in config['dataset']['train']['dataset']['args']['transforms']:
            if t['type'] in ['ToTensor', 'Normalize']:
                self.transform.append(t)
        self.transform = get_transforms(self.transform)

    def predict(self,
                img_path: str,
                is_output_polygon=False,
                short_size: int=1024):
        '''
        Run detection on a single image read from disk with OpenCV
        (this path is comparatively slow).

        :param img_path: image path
        :param is_output_polygon: return polygons instead of quad boxes
        :param short_size: target length of the image's shorter side
        :return: (probability map, box list, score list, elapsed seconds)
        '''
        assert os.path.exists(img_path), 'file is not exists'
        img = cv2.imread(img_path, 1 if self.img_mode != 'GRAY' else 0)
        if self.img_mode == 'RGB':
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h, w = img.shape[:2]
        img = resize_image(img, short_size)
        # (w, h) image -> (1, img_channel, h, w) tensor
        tensor = self.transform(img)
        tensor = tensor.unsqueeze_(0)
        batch = {'shape': [(h, w)]}
        with paddle.no_grad():
            start = time.time()
            preds = self.model(tensor)
            box_list, score_list = self.post_process(
                batch, preds, is_output_polygon=is_output_polygon)
            box_list, score_list = box_list[0], score_list[0]
            if len(box_list) > 0:
                if is_output_polygon:
                    idx = [x.sum() > 0 for x in box_list]
                    box_list = [box_list[i] for i, v in enumerate(idx) if v]
                    score_list = [score_list[i] for i, v in enumerate(idx) if v]
                else:
                    # drop all-zero boxes
                    idx = box_list.reshape(box_list.shape[0], -1).sum(
                        axis=1) > 0
                    box_list, score_list = box_list[idx], score_list[idx]
            else:
                box_list, score_list = [], []
            t = time.time() - start
        return preds[0, 0, :, :].detach().cpu().numpy(), box_list, score_list, t
def save_depoly(net, input, save_path):
    """Convert `net` to a static graph and save it for direct inference.

    The spatial dimensions and batch size stay dynamic; channels are
    fixed at 3.
    """
    spec = [
        paddle.static.InputSpec(
            shape=[None, 3, None, None], dtype="float32")
    ]
    static_net = paddle.jit.to_static(net, input_spec=spec)
    # save static model for inference directly
    paddle.jit.save(static_net, save_path)
def init_args():
    """Command-line options for the batch prediction script."""
    import argparse
    p = argparse.ArgumentParser(description='DBNet.paddle')
    p.add_argument('--model_path', default=r'model_best.pth', type=str)
    p.add_argument('--input_folder', default='./test/input', type=str,
                   help='img path for predict')
    p.add_argument('--output_folder', default='./test/output', type=str,
                   help='img path for output')
    p.add_argument('--gpu', default=0, type=int, help='gpu for inference')
    p.add_argument('--thre', default=0.3, type=float,
                   help='the thresh of post_processing')
    p.add_argument('--polygon', action='store_true',
                   help='output polygon or box')
    p.add_argument('--show', action='store_true', help='show result')
    p.add_argument('--save_result', action='store_true',
                   help='save box and score to txt file')
    return p.parse_args()
if __name__ == '__main__':
    import pathlib
    from tqdm import tqdm
    import matplotlib.pyplot as plt
    from utils.util import show_img, draw_bbox, save_result, get_image_file_list
    args = init_args()
    print(args)
    # initialise the network
    model = PaddleModel(args.model_path, post_p_thre=args.thre, gpu_id=args.gpu)
    img_folder = pathlib.Path(args.input_folder)
    for img_path in tqdm(get_image_file_list(args.input_folder)):
        preds, boxes_list, score_list, t = model.predict(
            img_path, is_output_polygon=args.polygon)
        # BGR -> RGB before drawing the detections
        img = draw_bbox(cv2.imread(img_path)[:, :, ::-1], boxes_list)
        if args.show:
            show_img(preds)
            show_img(img, title=os.path.basename(img_path))
            plt.show()
        # save results to the output folder
        os.makedirs(args.output_folder, exist_ok=True)
        img_path = pathlib.Path(img_path)
        output_path = os.path.join(args.output_folder,
                                   img_path.stem + '_result.jpg')
        pred_path = os.path.join(args.output_folder,
                                 img_path.stem + '_pred.jpg')
        cv2.imwrite(output_path, img[:, :, ::-1])
        cv2.imwrite(pred_path, preds * 255)
        save_result(
            output_path.replace('_result.jpg', '.txt'), boxes_list, score_list,
            args.polygon)
import os
import sys
import pathlib
__dir__ = pathlib.Path(os.path.abspath(__file__))
sys.path.append(str(__dir__))
sys.path.append(str(__dir__.parent.parent))
import paddle
import paddle.distributed as dist
from utils import Config, ArgsParser
def init_args():
    """Parse training CLI options via the project ArgsParser."""
    return ArgsParser().parse_args()
def main(config, profiler_options):
    """Training entry point: build data/model/loss/metric from the config
    and hand everything to Trainer.

    :param config: parsed configuration dict
    :param profiler_options: profiler option string forwarded to Trainer
    """
    from models import build_model, build_loss
    from data_loader import get_dataloader
    from trainer import Trainer
    from post_processing import get_post_processing
    from utils import get_metric
    # Enable distributed data parallel when more than one GPU is visible.
    if paddle.device.cuda.device_count() > 1:
        dist.init_parallel_env()
        config['distributed'] = True
    else:
        config['distributed'] = False
    train_loader = get_dataloader(config['dataset']['train'],
                                  config['distributed'])
    assert train_loader is not None
    if 'validate' in config['dataset']:
        validate_loader = get_dataloader(config['dataset']['validate'], False)
    else:
        validate_loader = None
    criterion = build_loss(config['loss'])
    # Grayscale input uses a single channel, otherwise 3.
    config['arch']['backbone']['in_channels'] = 3 if config['dataset']['train'][
        'dataset']['args']['img_mode'] != 'GRAY' else 1
    model = build_model(config['arch'])
    # set @to_static for benchmark, skip this by default.
    post_p = get_post_processing(config['post_processing'])
    metric = get_metric(config['metric'])
    trainer = Trainer(
        config=config,
        model=model,
        criterion=criterion,
        train_loader=train_loader,
        post_process=post_p,
        metric_cls=metric,
        validate_loader=validate_loader,
        profiler_options=profiler_options)
    trainer.train()
if __name__ == '__main__':
    cli_args = init_args()
    assert os.path.exists(cli_args.config_file)
    cfg = Config(cli_args.config_file)
    cfg.merge_dict(cli_args.opt)
    main(cfg.cfg, cli_args.profiler_options)
# -*- coding: utf-8 -*-
# @Time : 2019/8/23 21:58
# @Author : zhoujun
from .trainer import Trainer
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2019/8/23 21:58
# @Author : zhoujun
import time
import paddle
from tqdm import tqdm
from base import BaseTrainer
from utils import runningScore, cal_text_score, Polynomial, profiler
class Trainer(BaseTrainer):
    """Training loop for DB text detection with optional AMP and VisualDL
    logging. Extends BaseTrainer with the per-epoch train/eval logic."""

    def __init__(self,
                 config,
                 model,
                 criterion,
                 train_loader,
                 validate_loader,
                 metric_cls,
                 post_process=None,
                 profiler_options=None):
        super(Trainer, self).__init__(config, model, criterion, train_loader,
                                      validate_loader, metric_cls, post_process)
        self.profiler_options = profiler_options
        # In-training evaluation can be disabled via the config.
        self.enable_eval = config['trainer'].get('enable_eval', True)

    def _train_epoch(self, epoch):
        """Run one training epoch and return summary statistics."""
        self.model.train()
        total_samples = 0
        train_reader_cost = 0.0
        train_batch_cost = 0.0
        reader_start = time.time()
        epoch_start = time.time()
        train_loss = 0.
        running_metric_text = runningScore(2)

        for i, batch in enumerate(self.train_loader):
            profiler.add_profiler_step(self.profiler_options)
            if i >= self.train_loader_len:
                break
            self.global_step += 1
            lr = self.optimizer.get_lr()

            cur_batch_size = batch['img'].shape[0]

            train_reader_cost += time.time() - reader_start
            if self.amp:
                # Mixed-precision forward; loss is computed in fp32 and
                # backward goes through the loss scaler.
                with paddle.amp.auto_cast(
                        enable='gpu' in paddle.device.get_device(),
                        custom_white_list=self.amp.get('custom_white_list', []),
                        custom_black_list=self.amp.get('custom_black_list', []),
                        level=self.amp.get('level', 'O2')):
                    preds = self.model(batch['img'])
                loss_dict = self.criterion(preds.astype(paddle.float32), batch)
                scaled_loss = self.amp['scaler'].scale(loss_dict['loss'])
                scaled_loss.backward()
                # scaler.minimize also performs the optimizer step.
                self.amp['scaler'].minimize(self.optimizer, scaled_loss)
            else:
                preds = self.model(batch['img'])
                loss_dict = self.criterion(preds, batch)
                # backward
                loss_dict['loss'].backward()
                self.optimizer.step()
            self.lr_scheduler.step()
            self.optimizer.clear_grad()
            train_batch_time = time.time() - reader_start
            train_batch_cost += train_batch_time
            total_samples += cur_batch_size

            # acc iou (on the shrink map channel)
            score_shrink_map = cal_text_score(
                preds[:, 0, :, :],
                batch['shrink_map'],
                batch['shrink_mask'],
                running_metric_text,
                thred=self.config['post_processing']['args']['thresh'])

            # record loss and acc in the log; .item() also converts the
            # stored tensors to Python floats for VisualDL below.
            loss_str = 'loss: {:.4f}, '.format(loss_dict['loss'].item())
            for idx, (key, value) in enumerate(loss_dict.items()):
                loss_dict[key] = value.item()
                if key == 'loss':
                    continue
                loss_str += '{}: {:.4f}'.format(key, loss_dict[key])
                if idx < len(loss_dict) - 1:
                    loss_str += ', '

            train_loss += loss_dict['loss']
            acc = score_shrink_map['Mean Acc']
            iou_shrink_map = score_shrink_map['Mean IoU']
            if self.global_step % self.log_iter == 0:
                self.logger_info(
                    '[{}/{}], [{}/{}], global_step: {}, ips: {:.1f} samples/sec, avg_reader_cost: {:.5f} s, avg_batch_cost: {:.5f} s, avg_samples: {}, acc: {:.4f}, iou_shrink_map: {:.4f}, {}lr:{:.6}, time:{:.2f}'.
                    format(epoch, self.epochs, i + 1, self.train_loader_len,
                           self.global_step, total_samples / train_batch_cost,
                           train_reader_cost / self.log_iter, train_batch_cost /
                           self.log_iter, total_samples / self.log_iter, acc,
                           iou_shrink_map, loss_str, lr, train_batch_cost))
                # reset the windowed counters after each log line
                total_samples = 0
                train_reader_cost = 0.0
                train_batch_cost = 0.0

            if self.visualdl_enable and paddle.distributed.get_rank() == 0:
                # write tensorboard
                for key, value in loss_dict.items():
                    self.writer.add_scalar('TRAIN/LOSS/{}'.format(key), value,
                                           self.global_step)
                self.writer.add_scalar('TRAIN/ACC_IOU/acc', acc,
                                       self.global_step)
                self.writer.add_scalar('TRAIN/ACC_IOU/iou_shrink_map',
                                       iou_shrink_map, self.global_step)
                self.writer.add_scalar('TRAIN/lr', lr, self.global_step)
            reader_start = time.time()
        return {
            'train_loss': train_loss / self.train_loader_len,
            'lr': lr,
            'time': time.time() - epoch_start,
            'epoch': epoch
        }

    def _eval(self, epoch):
        """Evaluate on the validation loader.

        Returns:
            (recall, precision, fmeasure) averages.
        """
        self.model.eval()
        raw_metrics = []
        total_frame = 0.0
        total_time = 0.0
        for i, batch in tqdm(
                enumerate(self.validate_loader),
                total=len(self.validate_loader),
                desc='test model'):
            with paddle.no_grad():
                start = time.time()
                if self.amp:
                    with paddle.amp.auto_cast(
                            enable='gpu' in paddle.device.get_device(),
                            custom_white_list=self.amp.get('custom_white_list',
                                                           []),
                            custom_black_list=self.amp.get('custom_black_list',
                                                           []),
                            level=self.amp.get('level', 'O2')):
                        preds = self.model(batch['img'])
                    preds = preds.astype(paddle.float32)
                else:
                    preds = self.model(batch['img'])
                boxes, scores = self.post_process(
                    batch,
                    preds,
                    is_output_polygon=self.metric_cls.is_output_polygon)
                total_frame += batch['img'].shape[0]
                total_time += time.time() - start
                raw_metric = self.metric_cls.validate_measure(batch,
                                                              (boxes, scores))
                raw_metrics.append(raw_metric)
        metrics = self.metric_cls.gather_measure(raw_metrics)
        self.logger_info('FPS:{}'.format(total_frame / total_time))
        return metrics['recall'].avg, metrics['precision'].avg, metrics[
            'fmeasure'].avg

    def _on_epoch_finish(self):
        """Log the epoch, save the latest checkpoint and, on rank 0,
        evaluate and keep a copy of the best model (F1-based when a
        validation set is available, otherwise lowest train loss)."""
        self.logger_info('[{}/{}], train_loss: {:.4f}, time: {:.4f}, lr: {}'.
                         format(self.epoch_result['epoch'], self.epochs, self.
                                epoch_result['train_loss'], self.epoch_result[
                                    'time'], self.epoch_result['lr']))
        net_save_path = '{}/model_latest.pth'.format(self.checkpoint_dir)
        net_save_path_best = '{}/model_best.pth'.format(self.checkpoint_dir)

        if paddle.distributed.get_rank() == 0:
            self._save_checkpoint(self.epoch_result['epoch'], net_save_path)
            save_best = False
            # use F1 as the best-model criterion
            if self.validate_loader is not None and self.metric_cls is not None and self.enable_eval:
                recall, precision, hmean = self._eval(self.epoch_result[
                    'epoch'])

                if self.visualdl_enable:
                    self.writer.add_scalar('EVAL/recall', recall,
                                           self.global_step)
                    self.writer.add_scalar('EVAL/precision', precision,
                                           self.global_step)
                    self.writer.add_scalar('EVAL/hmean', hmean,
                                           self.global_step)
                self.logger_info(
                    'test: recall: {:.6f}, precision: {:.6f}, hmean: {:.6f}'.
                    format(recall, precision, hmean))

                if hmean >= self.metrics['hmean']:
                    save_best = True
                    self.metrics['train_loss'] = self.epoch_result['train_loss']
                    self.metrics['hmean'] = hmean
                    self.metrics['precision'] = precision
                    self.metrics['recall'] = recall
                    self.metrics['best_model_epoch'] = self.epoch_result[
                        'epoch']
            else:
                # no validation set: fall back to lowest training loss
                if self.epoch_result['train_loss'] <= self.metrics[
                        'train_loss']:
                    save_best = True
                    self.metrics['train_loss'] = self.epoch_result['train_loss']
                    self.metrics['best_model_epoch'] = self.epoch_result[
                        'epoch']
            best_str = 'current best, '
            for k, v in self.metrics.items():
                best_str += '{}: {:.6f}, '.format(k, v)
            self.logger_info(best_str)
            if save_best:
                import shutil
                shutil.copy(net_save_path, net_save_path_best)
                self.logger_info("Saving current best: {}".format(
                    net_save_path_best))
            else:
                self.logger_info("Saving checkpoint: {}".format(net_save_path))

    def _on_train_finish(self):
        # Dump the final best metrics once training completes.
        if self.enable_eval:
            for k, v in self.metrics.items():
                self.logger_info('{}:{}'.format(k, v))
        self.logger_info('finish train')

    def _initialize_scheduler(self):
        """Build the LR scheduler; the project's Polynomial schedule needs
        epoch/step counts injected from the trainer config."""
        if self.config['lr_scheduler']['type'] == 'Polynomial':
            self.config['lr_scheduler']['args']['epochs'] = self.config[
                'trainer']['epochs']
            self.config['lr_scheduler']['args']['step_each_epoch'] = len(
                self.train_loader)
            self.lr_scheduler = Polynomial(
                **self.config['lr_scheduler']['args'])()
        else:
            self.lr_scheduler = self._initialize('lr_scheduler',
                                                 paddle.optimizer.lr)
# -*- coding: utf-8 -*-
# @Time : 2019/8/23 21:58
# @Author : zhoujun
from .util import *
from .metrics import *
from .schedulers import *
from .cal_recall.script import cal_recall_precison_f1
from .ocr_metric import get_metric
# -*- coding: utf-8 -*-
# @Time : 1/16/19 6:40 AM
# @Author : zhoujun
from .script import cal_recall_precison_f1
__all__ = ['cal_recall_precison_f1']
#!/usr/bin/env python2
#encoding: UTF-8
import json
import sys
sys.path.append('./')
import zipfile
import re
import sys
import os
import codecs
import traceback
import numpy as np
from utils import order_points_clockwise
def print_help():
    """Print CLI usage to stdout and terminate with exit status 2."""
    usage = (
        'Usage: python %s.py -g=<gtFile> -s=<submFile> [-o=<outputFolder> -p=<jsonParams>]'
        % sys.argv[0])
    sys.stdout.write(usage)
    sys.exit(2)
def load_zip_file_keys(file, fileNameRegExp=''):
    """
    Returns an array with the entries of the ZIP file that match with the regular expression.
    The key's are the names or the file or the capturing group definied in the fileNameRegExp
    """
    try:
        archive = zipfile.ZipFile(file, mode='r', allowZip64=True)
    except Exception as e:
        # Chain the original error instead of silently discarding it
        # (was a bare `except:`).
        raise Exception('Error loading the ZIP archive.') from e

    pairs = []
    for name in archive.namelist():
        addFile = True
        keyName = name
        if fileNameRegExp != "":
            m = re.match(fileNameRegExp, name)
            # `is None`, not `== None` (PEP 8).
            if m is None:
                addFile = False
            elif len(m.groups()) > 0:
                keyName = m.group(1)

        if addFile:
            pairs.append(keyName)

    return pairs
def load_zip_file(file, fileNameRegExp='', allEntries=False):
    """
    Returns an array with the contents (filtered by fileNameRegExp) of a ZIP file.
    The key's are the names or the file or the capturing group definied in the fileNameRegExp
    allEntries validates that all entries in the ZIP file pass the fileNameRegExp
    """
    try:
        archive = zipfile.ZipFile(file, mode='r', allowZip64=True)
    except Exception as e:
        # Chain the original error instead of discarding it (was bare except).
        raise Exception('Error loading the ZIP archive') from e

    pairs = []
    for name in archive.namelist():
        addFile = True
        keyName = name
        if fileNameRegExp != "":
            m = re.match(fileNameRegExp, name)
            # `is None`, not `== None` (PEP 8).
            if m is None:
                addFile = False
            elif len(m.groups()) > 0:
                keyName = m.group(1)

        if addFile:
            pairs.append([keyName, archive.read(name)])
        elif allEntries:
            raise Exception('ZIP entry not valid: %s' % name)

    return dict(pairs)
def load_folder_file(file, fileNameRegExp='', allEntries=False):
    """
    Returns an array with the contents (filtered by fileNameRegExp) of a folder.
    The key's are the names or the file or the capturing group definied in the fileNameRegExp
    allEntries validates that all entries in the folder pass the fileNameRegExp
    """
    pairs = []
    for name in os.listdir(file):
        addFile = True
        keyName = name
        if fileNameRegExp != "":
            m = re.match(fileNameRegExp, name)
            # `is None`, not `== None` (PEP 8).
            if m is None:
                addFile = False
            elif len(m.groups()) > 0:
                keyName = m.group(1)

        if addFile:
            # Close the handle deterministically (was `open(...).read()`,
            # which leaked the file object).
            with open(os.path.join(file, name)) as fh:
                pairs.append([keyName, fh.read()])
        elif allEntries:
            raise Exception('ZIP entry not valid: %s' % name)

    return dict(pairs)
def decode_utf8(raw):
    """
    Returns a Unicode object on success, or None on failure
    """
    try:
        raw = codecs.decode(raw, 'utf-8', 'replace')
        # extracts BOM if exists. The replacement must be bytes (b''):
        # the previous str '' raised TypeError on bytes.replace, so any
        # BOM-prefixed file was wrongly reported as non-UTF-8 (None).
        raw = raw.encode('utf8')
        if raw.startswith(codecs.BOM_UTF8):
            raw = raw.replace(codecs.BOM_UTF8, b'', 1)
        return raw.decode('utf-8')
    except Exception:
        return None
def validate_lines_in_file(fileName,
                           file_contents,
                           CRLF=True,
                           LTRB=True,
                           withTranscription=False,
                           withConfidence=False,
                           imWidth=0,
                           imHeight=0):
    """
    This function validates that all lines of the file calling the Line validation function for each line
    """
    utf8File = decode_utf8(file_contents)
    if (utf8File is None):
        raise Exception("The file %s is not UTF-8" % fileName)

    # Split on the expected terminator, then strip any stray CR/LF so
    # mixed line endings do not yield phantom line content.
    lines = utf8File.split("\r\n" if CRLF else "\n")
    for line in lines:
        line = line.replace("\r", "").replace("\n", "")
        if (line != ""):
            try:
                validate_tl_line(line, LTRB, withTranscription, withConfidence,
                                 imWidth, imHeight)
            except Exception as e:
                # Re-raise with file/line context for the caller.
                raise Exception(
                    ("Line in sample not valid. Sample: %s Line: %s Error: %s" %
                     (fileName, line, str(e))).encode('utf-8', 'replace'))
def validate_tl_line(line,
                     LTRB=True,
                     withTranscription=True,
                     withConfidence=True,
                     imWidth=0,
                     imHeight=0):
    """
    Validate the format of the line. If the line is not valid an exception will be raised.
    If maxWidth and maxHeight are specified, all points must be inside the image bounds.
    Possible values are:
    LTRB=True: xmin,ymin,xmax,ymax[,confidence][,transcription]
    LTRB=False: x1,y1,x2,y2,x3,y3,x4,y4[,confidence][,transcription]
    """
    # Delegates to get_tl_line_values purely for its validation side
    # effects; the parsed values are discarded here.
    get_tl_line_values(line, LTRB, withTranscription, withConfidence, imWidth,
                       imHeight)
def get_tl_line_values(line,
                       LTRB=True,
                       withTranscription=False,
                       withConfidence=False,
                       imWidth=0,
                       imHeight=0):
    """Parse and validate one textline; raise if the format is invalid.

    If imWidth and imHeight are > 0, all points must be inside the image
    bounds. Accepted formats:
        LTRB=True:  xmin,ymin,xmax,ymax[,confidence][,transcription]
        LTRB=False: x1,y1,x2,y2,x3,y3,x4,y4[,confidence][,transcription]
    Returns (points, confidence, transcription); confidence defaults to 0.0
    and transcription to "" when not requested.
    """
    confidence = 0.0
    transcription = ""
    points = []
    if LTRB:
        numPoints = 4
        if withTranscription and withConfidence:
            m = re.match(
                r'^\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-1].?[0-9]*)\s*,(.*)$',
                line)
            if m is None:
                # Bug fix: removed a dead re-match of the identical pattern
                # that ran immediately before this unconditional raise.
                raise Exception(
                    "Format incorrect. Should be: xmin,ymin,xmax,ymax,confidence,transcription"
                )
        elif withConfidence:
            m = re.match(
                r'^\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-1].?[0-9]*)\s*$',
                line)
            if m is None:
                raise Exception(
                    "Format incorrect. Should be: xmin,ymin,xmax,ymax,confidence"
                )
        elif withTranscription:
            m = re.match(
                r'^\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*,(.*)$',
                line)
            if m is None:
                raise Exception(
                    "Format incorrect. Should be: xmin,ymin,xmax,ymax,transcription"
                )
        else:
            m = re.match(
                r'^\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*,?\s*$',
                line)
            if m is None:
                raise Exception(
                    "Format incorrect. Should be: xmin,ymin,xmax,ymax")
        xmin = int(m.group(1))
        ymin = int(m.group(2))
        xmax = int(m.group(3))
        ymax = int(m.group(4))
        if (xmax < xmin):
            raise Exception("Xmax value (%s) not valid (Xmax < Xmin)." % (xmax))
        if (ymax < ymin):
            raise Exception("Ymax value (%s) not valid (Ymax < Ymin)." %
                            (ymax))
        points = [float(m.group(i)) for i in range(1, (numPoints + 1))]
        if (imWidth > 0 and imHeight > 0):
            validate_point_inside_bounds(xmin, ymin, imWidth, imHeight)
            validate_point_inside_bounds(xmax, ymax, imWidth, imHeight)
    else:
        numPoints = 8
        if withTranscription and withConfidence:
            m = re.match(
                r'^\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*([0-1].?[0-9]*)\s*,(.*)$',
                line)
            if m is None:
                raise Exception(
                    "Format incorrect. Should be: x1,y1,x2,y2,x3,y3,x4,y4,confidence,transcription"
                )
        elif withConfidence:
            m = re.match(
                r'^\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*([0-1].?[0-9]*)\s*$',
                line)
            if m is None:
                raise Exception(
                    "Format incorrect. Should be: x1,y1,x2,y2,x3,y3,x4,y4,confidence"
                )
        elif withTranscription:
            m = re.match(
                r'^\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,(.*)$',
                line)
            if m is None:
                raise Exception(
                    "Format incorrect. Should be: x1,y1,x2,y2,x3,y3,x4,y4,transcription"
                )
        else:
            m = re.match(
                r'^\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*,\s*(-?[0-9]+)\s*$',
                line)
            if m is None:
                raise Exception(
                    "Format incorrect. Should be: x1,y1,x2,y2,x3,y3,x4,y4")
        points = [float(m.group(i)) for i in range(1, (numPoints + 1))]
        # Normalize corner order before the clockwise check.
        points = order_points_clockwise(np.array(points).reshape(-1,
                                                                 2)).reshape(-1)
        validate_clockwise_points(points)
        if (imWidth > 0 and imHeight > 0):
            validate_point_inside_bounds(points[0], points[1], imWidth,
                                         imHeight)
            validate_point_inside_bounds(points[2], points[3], imWidth,
                                         imHeight)
            validate_point_inside_bounds(points[4], points[5], imWidth,
                                         imHeight)
            validate_point_inside_bounds(points[6], points[7], imWidth,
                                         imHeight)
    if withConfidence:
        try:
            confidence = float(m.group(numPoints + 1))
        except ValueError:
            raise Exception("Confidence value must be a float")
    if withTranscription:
        # Transcription is the group right after the points (and after the
        # confidence group, when present).
        posTranscription = numPoints + (2 if withConfidence else 1)
        transcription = m.group(posTranscription)
        m2 = re.match(r'^\s*\"(.*)\"\s*$', transcription)
        if m2 is not None:  #Transcription with double quotes, we extract the value and replace escaped characters
            transcription = m2.group(1).replace("\\\\", "\\").replace("\\\"",
                                                                      "\"")
    return points, confidence, transcription
def validate_point_inside_bounds(x, y, imWidth, imHeight):
    """Raise an Exception if point (x, y) lies outside [0, imWidth] x [0, imHeight]."""
    if (x < 0 or x > imWidth):
        # Bug fix: the message referenced the undefined name 'xmin' (NameError).
        raise Exception("X value (%s) not valid. Image dimensions: (%s,%s)" %
                        (x, imWidth, imHeight))
    if (y < 0 or y > imHeight):
        # Bug fix: referenced undefined 'ymin' and had more %s placeholders
        # than supplied arguments.
        raise Exception("Y value (%s) not valid. Image dimensions: (%s,%s)" %
                        (y, imWidth, imHeight))
def validate_clockwise_points(points):
    """
    Validates that the 4 points that delimit a polygon are in clockwise order.
    """
    if len(points) != 8:
        raise Exception("Points list not valid." + str(len(points)))
    corners = [(int(points[2 * i]), int(points[2 * i + 1])) for i in range(4)]
    # Shoelace-style signed sum; a positive total means counter-clockwise in
    # image coordinates (origin top-left, y axis pointing down).
    total = 0
    for idx in range(4):
        x0, y0 = corners[idx]
        x1, y1 = corners[(idx + 1) % 4]
        total += (x1 - x0) * (y1 + y0)
    if total > 0:
        raise Exception(
            "Points are not clockwise. The coordinates of bounding quadrilaterals have to be given in clockwise order. Regarding the correct interpretation of 'clockwise' remember that the image coordinate system used is the standard one, with the image origin at the upper left, the X axis extending to the right and Y axis extending downwards."
        )
def get_tl_line_values_from_file_contents(content,
                                          CRLF=True,
                                          LTRB=True,
                                          withTranscription=False,
                                          withConfidence=False,
                                          imWidth=0,
                                          imHeight=0,
                                          sort_by_confidences=True):
    """Parse every non-empty line of *content* into parallel lists.

    Valid line formats:
        xmin,ymin,xmax,ymax,[confidence],[transcription]
        x1,y1,x2,y2,x3,y3,x4,y4,[confidence],[transcription]
    Returns (pointsList, confidencesList, transcriptionsList); when
    confidences are requested, the three lists are sorted together by
    descending confidence.
    """
    pointsList = []
    transcriptionsList = []
    confidencesList = []
    newline = "\r\n" if CRLF else "\n"
    for raw in content.split(newline):
        cleaned = raw.replace("\r", "").replace("\n", "")
        if cleaned == "":
            continue
        points, confidence, transcription = get_tl_line_values(
            cleaned, LTRB, withTranscription, withConfidence, imWidth,
            imHeight)
        pointsList.append(points)
        transcriptionsList.append(transcription)
        confidencesList.append(confidence)
    if withConfidence and len(confidencesList) > 0 and sort_by_confidences:
        import numpy as np
        order = np.argsort(-np.array(confidencesList))
        confidencesList = [confidencesList[i] for i in order]
        pointsList = [pointsList[i] for i in order]
        transcriptionsList = [transcriptionsList[i] for i in order]
    return pointsList, confidencesList, transcriptionsList
def main_evaluation(p,
                    default_evaluation_params_fn,
                    validate_data_fn,
                    evaluate_method_fn,
                    show_result=True,
                    per_sample=True):
    """
    This process validates a method, evaluates it and if it succeeds generates a ZIP file with a JSON entry for each sample.
    Params:
    p: Dictionary of parameters with the GT/submission locations (keys 'g'/'s', optional 'o' output folder, optional 'p' extra params).
    default_evaluation_params_fn: points to a function that returns a dictionary with the default parameters used for the evaluation
    validate_data_fn: points to a method that validates the correct format of the submission
    evaluate_method_fn: points to a function that evaluates the submission and returns a Dictionary with the results
    """
    evalParams = default_evaluation_params_fn()
    if 'p' in p.keys():
        # 'p' may be a dict or a JSON string wrapped in one extra character
        # on each side (the [1:-1] slice strips that wrapper).
        evalParams.update(p['p'] if isinstance(p['p'], dict) else json.loads(p[
            'p'][1:-1]))
    resDict = {
        'calculated': True,
        'Message': '',
        'method': '{}',
        'per_sample': '{}'
    }
    try:
        # validate_data_fn(p['g'], p['s'], evalParams)
        evalData = evaluate_method_fn(p['g'], p['s'], evalParams)
        resDict.update(evalData)
    except Exception as e:
        # Evaluation failures are reported in the result, not re-raised.
        traceback.print_exc()
        resDict['Message'] = str(e)
        resDict['calculated'] = False
    if 'o' in p:
        if not os.path.exists(p['o']):
            os.makedirs(p['o'])
        resultsOutputname = p['o'] + '/results.zip'
        outZip = zipfile.ZipFile(resultsOutputname, mode='w', allowZip64=True)
        # Per-sample data is written as individual ZIP entries below, so it
        # is dropped from the global method.json.
        del resDict['per_sample']
        if 'output_items' in resDict.keys():
            del resDict['output_items']
        outZip.writestr('method.json', json.dumps(resDict))
    if not resDict['calculated']:
        if show_result:
            sys.stderr.write('Error!\n' + resDict['Message'] + '\n\n')
        if 'o' in p:
            outZip.close()
        return resDict
    if 'o' in p:
        if per_sample == True:
            # Bug fix: dict.iteritems() is Python 2 only; use items().
            for k, v in evalData['per_sample'].items():
                outZip.writestr(k + '.json', json.dumps(v))
            if 'output_items' in evalData.keys():
                for k, v in evalData['output_items'].items():
                    outZip.writestr(k, v)
        outZip.close()
    if show_result:
        sys.stdout.write("Calculated!")
        sys.stdout.write(json.dumps(resDict['method']))
    return resDict
def main_validation(default_evaluation_params_fn, validate_data_fn):
    """
    This process validates a method
    Params:
    default_evaluation_params_fn: points to a function that returns a dictionary with the default parameters used for the evaluation
    validate_data_fn: points to a method that validates the correct format of the submission
    """
    try:
        # Command-line arguments arrive as "-k=value"; drop the leading dash
        # and split on '=' to build the parameter dictionary.
        params = dict(arg[1:].split('=') for arg in sys.argv[1:])
        evalParams = default_evaluation_params_fn()
        if 'p' in params:
            extra = params['p']
            evalParams.update(extra if isinstance(extra, dict) else
                              json.loads(extra[1:-1]))
        validate_data_fn(params['g'], params['s'], evalParams)
        print('SUCCESS')
        sys.exit(0)
    except Exception as e:
        print(str(e))
        sys.exit(101)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from collections import namedtuple
from . import rrc_evaluation_funcs
import Polygon as plg
import numpy as np
def default_evaluation_params():
    """
    default_evaluation_params: Default parameters to use for the validation and evaluation.
    """
    params = {}
    params['IOU_CONSTRAINT'] = 0.5
    params['AREA_PRECISION_CONSTRAINT'] = 0.5
    params['GT_SAMPLE_NAME_2_ID'] = 'gt_img_([0-9]+).txt'
    params['DET_SAMPLE_NAME_2_ID'] = 'res_img_([0-9]+).txt'
    # LTRB: 2 points (left,top,right,bottom) or 4 points (x1,y1,x2,y2,x3,y3,x4,y4)
    params['LTRB'] = False
    # Lines are delimited by Windows CRLF format
    params['CRLF'] = False
    # Detections must include confidence value. AP will be calculated
    params['CONFIDENCES'] = False
    # Generate per sample results and produce data for visualization
    params['PER_SAMPLE_RESULTS'] = True
    return params
def validate_data(gtFilePath, submFilePath, evaluationParams):
    """
    Method validate_data: validates that all files in the results folder are correct (have the correct name contents).
    Validates also that there are no missing files in the folder.
    If some error detected, the method raises the error
    """
    gt = rrc_evaluation_funcs.load_folder_file(
        gtFilePath, evaluationParams['GT_SAMPLE_NAME_2_ID'])
    subm = rrc_evaluation_funcs.load_folder_file(
        submFilePath, evaluationParams['DET_SAMPLE_NAME_2_ID'], True)
    # Ground-truth files must themselves be well formed.
    for sample in gt:
        rrc_evaluation_funcs.validate_lines_in_file(
            sample, gt[sample], evaluationParams['CRLF'],
            evaluationParams['LTRB'], True)
    # Every submitted sample must exist in the GT and be well formed.
    for sample in subm:
        if sample not in gt:
            raise Exception("The sample %s not present in GT" % sample)
        rrc_evaluation_funcs.validate_lines_in_file(
            sample, subm[sample], evaluationParams['CRLF'],
            evaluationParams['LTRB'], False, evaluationParams['CONFIDENCES'])
def evaluate_method(gtFilePath, submFilePath, evaluationParams):
    """
    Method evaluate_method: evaluate method and returns the results
    Results. Dictionary with the following values:
    - method (required) Global method metrics. Ex: { 'Precision':0.8,'Recall':0.9 }
    - samples (optional) Per sample metrics. Ex: {'sample1' : { 'Precision':0.8,'Recall':0.9 } , 'sample2' : { 'Precision':0.8,'Recall':0.9 }
    """

    def polygon_from_points(points):
        """
        Returns a Polygon object to use with the Polygon2 class from a list of 8 points: x1,y1,x2,y2,x3,y3,x4,y4
        """
        # First matrix row holds the 4 x coordinates, second row the 4 y
        # coordinates; the transpose yields the 4x2 (x, y) corner matrix.
        resBoxes = np.empty([1, 8], dtype='int32')
        resBoxes[0, 0] = int(points[0])
        resBoxes[0, 4] = int(points[1])
        resBoxes[0, 1] = int(points[2])
        resBoxes[0, 5] = int(points[3])
        resBoxes[0, 2] = int(points[4])
        resBoxes[0, 6] = int(points[5])
        resBoxes[0, 3] = int(points[6])
        resBoxes[0, 7] = int(points[7])
        pointMat = resBoxes[0].reshape([2, 4]).T
        return plg.Polygon(pointMat)

    def rectangle_to_polygon(rect):
        # Convert an axis-aligned Rectangle namedtuple into a 4-corner polygon.
        resBoxes = np.empty([1, 8], dtype='int32')
        resBoxes[0, 0] = int(rect.xmin)
        resBoxes[0, 4] = int(rect.ymax)
        resBoxes[0, 1] = int(rect.xmin)
        resBoxes[0, 5] = int(rect.ymin)
        resBoxes[0, 2] = int(rect.xmax)
        resBoxes[0, 6] = int(rect.ymin)
        resBoxes[0, 3] = int(rect.xmax)
        resBoxes[0, 7] = int(rect.ymax)
        pointMat = resBoxes[0].reshape([2, 4]).T
        return plg.Polygon(pointMat)

    def rectangle_to_points(rect):
        # Rectangle corners as a flat x,y list.
        points = [
            int(rect.xmin), int(rect.ymax), int(rect.xmax), int(rect.ymax),
            int(rect.xmax), int(rect.ymin), int(rect.xmin), int(rect.ymin)
        ]
        return points

    def get_union(pD, pG):
        # |A| + |B| - |A intersect B|
        areaA = pD.area()
        areaB = pG.area()
        return areaA + areaB - get_intersection(pD, pG)

    def get_intersection_over_union(pD, pG):
        try:
            return get_intersection(pD, pG) / get_union(pD, pG)
        except:
            # Degenerate polygons can make the union area zero; treat IoU as 0.
            return 0

    def get_intersection(pD, pG):
        # Polygon '&' yields the intersection; an empty result has no contours.
        pInt = pD & pG
        if len(pInt) == 0:
            return 0
        return pInt.area()

    def compute_ap(confList, matchList, numGtCare):
        """Average precision over detections sorted by descending confidence."""
        correct = 0
        AP = 0
        if len(confList) > 0:
            confList = np.array(confList)
            matchList = np.array(matchList)
            sorted_ind = np.argsort(-confList)
            confList = confList[sorted_ind]
            matchList = matchList[sorted_ind]
            for n in range(len(confList)):
                match = matchList[n]
                if match:
                    correct += 1
                    AP += float(correct) / (n + 1)
            if numGtCare > 0:
                AP /= numGtCare
        return AP

    perSampleMetrics = {}
    matchedSum = 0
    Rectangle = namedtuple('Rectangle', 'xmin ymin xmax ymax')
    gt = rrc_evaluation_funcs.load_folder_file(
        gtFilePath, evaluationParams['GT_SAMPLE_NAME_2_ID'])
    subm = rrc_evaluation_funcs.load_folder_file(
        submFilePath, evaluationParams['DET_SAMPLE_NAME_2_ID'], True)
    numGlobalCareGt = 0
    numGlobalCareDet = 0
    arrGlobalConfidences = []
    arrGlobalMatches = []
    for resFile in gt:
        gtFile = gt[resFile]  # rrc_evaluation_funcs.decode_utf8(gt[resFile])
        recall = 0
        precision = 0
        hmean = 0
        detMatched = 0
        iouMat = np.empty([1, 1])
        gtPols = []
        detPols = []
        gtPolPoints = []
        detPolPoints = []
        # Array of Ground Truth Polygons' keys marked as don't Care
        gtDontCarePolsNum = []
        # Array of Detected Polygons' matched with a don't Care GT
        detDontCarePolsNum = []
        pairs = []
        detMatchedNums = []
        arrSampleConfidences = []
        arrSampleMatch = []
        sampleAP = 0
        evaluationLog = ""
        # Parse GT lines; a transcription of "###" marks a don't-care box.
        pointsList, _, transcriptionsList = rrc_evaluation_funcs.get_tl_line_values_from_file_contents(
            gtFile, evaluationParams['CRLF'], evaluationParams['LTRB'], True,
            False)
        for n in range(len(pointsList)):
            points = pointsList[n]
            transcription = transcriptionsList[n]
            dontCare = transcription == "###"
            if evaluationParams['LTRB']:
                gtRect = Rectangle(*points)
                gtPol = rectangle_to_polygon(gtRect)
            else:
                gtPol = polygon_from_points(points)
            gtPols.append(gtPol)
            gtPolPoints.append(points)
            if dontCare:
                gtDontCarePolsNum.append(len(gtPols) - 1)
        evaluationLog += "GT polygons: " + str(len(gtPols)) + (
            " (" + str(len(gtDontCarePolsNum)) + " don't care)\n"
            if len(gtDontCarePolsNum) > 0 else "\n")
        if resFile in subm:
            detFile = subm[
                resFile]  # rrc_evaluation_funcs.decode_utf8(subm[resFile])
            pointsList, confidencesList, _ = rrc_evaluation_funcs.get_tl_line_values_from_file_contents(
                detFile, evaluationParams['CRLF'], evaluationParams['LTRB'],
                False, evaluationParams['CONFIDENCES'])
            for n in range(len(pointsList)):
                points = pointsList[n]
                if evaluationParams['LTRB']:
                    detRect = Rectangle(*points)
                    detPol = rectangle_to_polygon(detRect)
                else:
                    detPol = polygon_from_points(points)
                detPols.append(detPol)
                detPolPoints.append(points)
                # A detection overlapping a don't-care GT beyond the area
                # precision threshold is itself marked as don't-care.
                if len(gtDontCarePolsNum) > 0:
                    for dontCarePol in gtDontCarePolsNum:
                        dontCarePol = gtPols[dontCarePol]
                        intersected_area = get_intersection(dontCarePol, detPol)
                        pdDimensions = detPol.area()
                        precision = 0 if pdDimensions == 0 else intersected_area / pdDimensions
                        if (precision >
                                evaluationParams['AREA_PRECISION_CONSTRAINT']):
                            detDontCarePolsNum.append(len(detPols) - 1)
                            break
            evaluationLog += "DET polygons: " + str(len(detPols)) + (
                " (" + str(len(detDontCarePolsNum)) + " don't care)\n"
                if len(detDontCarePolsNum) > 0 else "\n")
            if len(gtPols) > 0 and len(detPols) > 0:
                # Calculate IoU and precision matrixs
                outputShape = [len(gtPols), len(detPols)]
                iouMat = np.empty(outputShape)
                gtRectMat = np.zeros(len(gtPols), np.int8)
                detRectMat = np.zeros(len(detPols), np.int8)
                for gtNum in range(len(gtPols)):
                    for detNum in range(len(detPols)):
                        pG = gtPols[gtNum]
                        pD = detPols[detNum]
                        iouMat[gtNum, detNum] = get_intersection_over_union(pD,
                                                                            pG)
                # Greedy one-to-one matching: first unmatched pair above the
                # IoU constraint wins; don't-care entries are skipped.
                for gtNum in range(len(gtPols)):
                    for detNum in range(len(detPols)):
                        if gtRectMat[gtNum] == 0 and detRectMat[
                                detNum] == 0 and gtNum not in gtDontCarePolsNum and detNum not in detDontCarePolsNum:
                            if iouMat[gtNum, detNum] > evaluationParams[
                                    'IOU_CONSTRAINT']:
                                gtRectMat[gtNum] = 1
                                detRectMat[detNum] = 1
                                detMatched += 1
                                pairs.append({'gt': gtNum, 'det': detNum})
                                detMatchedNums.append(detNum)
                                evaluationLog += "Match GT #" + str(
                                    gtNum) + " with Det #" + str(detNum) + "\n"
                if evaluationParams['CONFIDENCES']:
                    for detNum in range(len(detPols)):
                        if detNum not in detDontCarePolsNum:
                            # we exclude the don't care detections
                            match = detNum in detMatchedNums
                            arrSampleConfidences.append(confidencesList[detNum])
                            arrSampleMatch.append(match)
                            arrGlobalConfidences.append(confidencesList[detNum])
                            arrGlobalMatches.append(match)
        numGtCare = (len(gtPols) - len(gtDontCarePolsNum))
        numDetCare = (len(detPols) - len(detDontCarePolsNum))
        if numGtCare == 0:
            # No care GT: recall is trivially 1; precision is 1 only if there
            # are no care detections either.
            recall = float(1)
            precision = float(0) if numDetCare > 0 else float(1)
            sampleAP = precision
        else:
            recall = float(detMatched) / numGtCare
            precision = 0 if numDetCare == 0 else float(detMatched) / numDetCare
            if evaluationParams['CONFIDENCES'] and evaluationParams[
                    'PER_SAMPLE_RESULTS']:
                sampleAP = compute_ap(arrSampleConfidences, arrSampleMatch,
                                      numGtCare)
        hmean = 0 if (precision + recall) == 0 else 2.0 * precision * recall / (
            precision + recall)
        matchedSum += detMatched
        numGlobalCareGt += numGtCare
        numGlobalCareDet += numDetCare
        if evaluationParams['PER_SAMPLE_RESULTS']:
            perSampleMetrics[resFile] = {
                'precision': precision,
                'recall': recall,
                'hmean': hmean,
                'pairs': pairs,
                'AP': sampleAP,
                # Large IoU matrices are omitted to keep output size bounded.
                'iouMat': [] if len(detPols) > 100 else iouMat.tolist(),
                'gtPolPoints': gtPolPoints,
                'detPolPoints': detPolPoints,
                'gtDontCare': gtDontCarePolsNum,
                'detDontCare': detDontCarePolsNum,
                'evaluationParams': evaluationParams,
                'evaluationLog': evaluationLog
            }
    # Compute MAP and MAR
    AP = 0
    if evaluationParams['CONFIDENCES']:
        AP = compute_ap(arrGlobalConfidences, arrGlobalMatches, numGlobalCareGt)
    methodRecall = 0 if numGlobalCareGt == 0 else float(
        matchedSum) / numGlobalCareGt
    methodPrecision = 0 if numGlobalCareDet == 0 else float(
        matchedSum) / numGlobalCareDet
    methodHmean = 0 if methodRecall + methodPrecision == 0 else 2 * methodRecall * methodPrecision / (
        methodRecall + methodPrecision)
    methodMetrics = {
        'precision': methodPrecision,
        'recall': methodRecall,
        'hmean': methodHmean,
        'AP': AP
    }
    resDict = {
        'calculated': True,
        'Message': '',
        'method': methodMetrics,
        'per_sample': perSampleMetrics
    }
    return resDict
def cal_recall_precison_f1(gt_path, result_path, show_result=False):
    """Run the ICDAR-style evaluation and return the global method metrics.

    Returns a dict with 'precision', 'recall', 'hmean' and 'AP'.
    """
    eval_args = {'g': gt_path, 's': result_path}
    outcome = rrc_evaluation_funcs.main_evaluation(
        eval_args, default_evaluation_params, validate_data, evaluate_method,
        show_result)
    return outcome['method']
# -*- coding: utf-8 -*-
# @Time : 2019/12/7 14:46
# @Author : zhoujun
import numpy as np
import cv2
import os
import random
from tqdm import tqdm
# calculate means and std
train_txt_path = './train_val_list.txt'
CNum = 10000  # number of images to sample for the statistics
img_h, img_w = 640, 640
means, stdevs = [], []
with open(train_txt_path, 'r') as f:
    lines = f.readlines()
    random.shuffle(lines)  # shuffle so the sampled images are random
    img_list = []
    for i in tqdm(range(CNum)):
        img_path = lines[i].split('\t')[0]
        img = cv2.imread(img_path)
        img = cv2.resize(img, (img_h, img_w))
        img_list.append(img[:, :, :, np.newaxis])
# Bug fix: the original seeded the stack with an all-zeros image (biasing the
# mean/std towards 0) and called np.concatenate inside the loop, which copies
# the growing array every iteration (O(n^2)); stack all images once instead.
imgs = np.concatenate(img_list, axis=3).astype(np.float32) / 255.
for i in tqdm(range(3)):
    pixels = imgs[:, :, i, :].ravel()  # flatten one channel
    means.append(np.mean(pixels))
    stdevs.append(np.std(pixels))
# cv2 loads images as BGR; PIL/Skimage load RGB and would need no reversal.
means.reverse()  # BGR --> RGB
stdevs.reverse()
print("normMean = {}".format(means))
print("normStd = {}".format(stdevs))
print('transforms.Normalize(normMean = {}, normStd = {})'.format(means, stdevs))
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2019/8/24 12:06
# @Author : zhoujun
import os
import glob
import pathlib
data_path = r'test'
# data_path/img holds the images
# data_path/gt  holds the label files
# Write a "<img_path>\t<label_path>" line for every image whose label exists.
with open(os.path.join(data_path, 'test.txt'), 'w', encoding='utf8') as f_w:
    for img_path in glob.glob(data_path + '/img/*.jpg', recursive=True):
        d = pathlib.Path(img_path)
        label_path = os.path.join(data_path, 'gt', ('gt_' + str(d.stem) + '.txt'))
        if os.path.exists(img_path) and os.path.exists(label_path):
            print(img_path, label_path)
            # Bug fix: the pair was written in the missing-file branch, so the
            # list contained exactly the pairs that do NOT exist on disk.
            f_w.write('{}\t{}\n'.format(img_path, label_path))
        else:
            print('不存在', img_path, label_path)
\ No newline at end of file
# Adapted from score written by wkentaro
# https://github.com/wkentaro/pytorch-fcn/blob/master/torchfcn/utils.py
import numpy as np
class runningScore(object):
    """Accumulates a confusion matrix over batches of segmentation label maps
    and derives accuracy/IoU metrics from it."""

    def __init__(self, n_classes):
        self.n_classes = n_classes
        # confusion_matrix[i, j] counts pixels with true class i predicted as j.
        self.confusion_matrix = np.zeros((n_classes, n_classes))

    def _fast_hist(self, label_true, label_pred, n_class):
        """Confusion matrix (n_class x n_class) for one pair of flat label arrays."""
        # Ignore true labels outside [0, n_class); negative predictions are
        # only reported, not filtered (they would corrupt the bincount).
        mask = (label_true >= 0) & (label_true < n_class)
        if np.sum((label_pred[mask] < 0)) > 0:
            print(label_pred[label_pred < 0])
        hist = np.bincount(
            n_class * label_true[mask].astype(int) + label_pred[mask],
            minlength=n_class**2).reshape(n_class, n_class)
        return hist

    def update(self, label_trues, label_preds):
        """Accumulate statistics for paired iterables of label arrays."""
        for lt, lp in zip(label_trues, label_preds):
            try:
                self.confusion_matrix += self._fast_hist(lt.flatten(),
                                                         lp.flatten(),
                                                         self.n_classes)
            except Exception:
                # Bug fix: narrowed the bare 'except' (which also swallowed
                # KeyboardInterrupt/SystemExit). Malformed pairs are still
                # skipped best-effort, as before.
                pass

    def get_scores(self):
        """Returns accuracy score evaluation result.
            - overall accuracy
            - mean accuracy
            - mean IU
            - fwavacc
        """
        hist = self.confusion_matrix
        # The 0.0001 terms guard against division by zero on empty rows/cols.
        acc = np.diag(hist).sum() / (hist.sum() + 0.0001)
        acc_cls = np.diag(hist) / (hist.sum(axis=1) + 0.0001)
        acc_cls = np.nanmean(acc_cls)
        iu = np.diag(hist) / (
            hist.sum(axis=1) + hist.sum(axis=0) - np.diag(hist) + 0.0001)
        mean_iu = np.nanmean(iu)
        freq = hist.sum(axis=1) / (hist.sum() + 0.0001)
        fwavacc = (freq[freq > 0] * iu[freq > 0]).sum()
        cls_iu = dict(zip(range(self.n_classes), iu))
        return {
            'Overall Acc': acc,
            'Mean Acc': acc_cls,
            'FreqW Acc': fwavacc,
            'Mean IoU': mean_iu,
        }, cls_iu

    def reset(self):
        """Clear all accumulated statistics."""
        self.confusion_matrix = np.zeros((self.n_classes, self.n_classes))
# -*- coding: utf-8 -*-
# @Time : 2019/12/5 15:36
# @Author : zhoujun
from .icdar2015 import QuadMetric
def get_metric(config):
    """Instantiate the metric class named by config['type'].

    config['args'] (optional) is a dict of keyword arguments, or a single
    positional argument, forwarded to the constructor. Returns None when
    construction fails for any reason.
    """
    try:
        if 'args' not in config:
            args = {}
        else:
            args = config['args']
        # SECURITY NOTE: eval() resolves the class name in this module's
        # scope; config must come from trusted sources only.
        if isinstance(args, dict):
            cls = eval(config['type'])(**args)
        else:
            cls = eval(config['type'])(args)
        return cls
    except Exception:
        # Bug fix: narrowed the bare 'except' so KeyboardInterrupt/SystemExit
        # are no longer swallowed; failures still yield None as before.
        return None
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2019/12/5 15:36
# @Author : zhoujun
from .quad_metric import QuadMetric
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import math
from collections import namedtuple
import numpy as np
from shapely.geometry import Polygon
class DetectionDetEvalEvaluator(object):
    """DetEval-style detection evaluator.

    Scores detections against ground truth using area recall/precision
    thresholds and supports one-to-one (OO), one-to-many (OM) and
    many-to-one (MO) matches, each weighted by the mtype_* constants.
    """

    def __init__(self,
                 area_recall_constraint=0.8,
                 area_precision_constraint=0.4,
                 ev_param_ind_center_diff_thr=1,
                 mtype_oo_o=1.0,
                 mtype_om_o=0.8,
                 mtype_om_m=1.0):
        # Minimum area recall / precision for a GT-det overlap to count.
        self.area_recall_constraint = area_recall_constraint
        self.area_precision_constraint = area_precision_constraint
        # Normalized center-distance threshold for accepting OO matches.
        self.ev_param_ind_center_diff_thr = ev_param_ind_center_diff_thr
        # Score contributions for OO / OM / MO match types.
        self.mtype_oo_o = mtype_oo_o
        self.mtype_om_o = mtype_om_o
        self.mtype_om_m = mtype_om_m

    def evaluate_image(self, gt, pred):
        """Evaluate one image.

        gt/pred are lists of dicts with a 'points' polygon; gt entries also
        carry an 'ignore' (don't-care) flag. Returns a per-sample metrics
        dict (precision, recall, hmean, pairs, accumulators, log, ...).
        """

        def get_union(pD, pG):
            return Polygon(pD).union(Polygon(pG)).area

        def get_intersection_over_union(pD, pG):
            return get_intersection(pD, pG) / get_union(pD, pG)

        def get_intersection(pD, pG):
            return Polygon(pD).intersection(Polygon(pG)).area

        def one_to_one_match(row, col):
            # True only if (row, col) is each other's unique qualified overlap.
            cont = 0
            for j in range(len(recallMat[0])):
                if recallMat[row,
                             j] >= self.area_recall_constraint and precisionMat[
                                 row, j] >= self.area_precision_constraint:
                    cont = cont + 1
            if (cont != 1):
                return False
            cont = 0
            for i in range(len(recallMat)):
                if recallMat[
                        i, col] >= self.area_recall_constraint and precisionMat[
                            i, col] >= self.area_precision_constraint:
                    cont = cont + 1
            if (cont != 1):
                return False
            if recallMat[row,
                         col] >= self.area_recall_constraint and precisionMat[
                             row, col] >= self.area_precision_constraint:
                return True
            return False

        def num_overlaps_gt(gtNum):
            # Count care detections with any overlap against this GT.
            cont = 0
            for detNum in range(len(detRects)):
                if detNum not in detDontCareRectsNum:
                    if recallMat[gtNum, detNum] > 0:
                        cont = cont + 1
            return cont

        def num_overlaps_det(detNum):
            # Count care GTs with any overlap against this detection.
            cont = 0
            for gtNum in range(len(recallMat)):
                if gtNum not in gtDontCareRectsNum:
                    if recallMat[gtNum, detNum] > 0:
                        cont = cont + 1
            return cont

        def is_single_overlap(row, col):
            if num_overlaps_gt(row) == 1 and num_overlaps_det(col) == 1:
                return True
            else:
                return False

        def one_to_many_match(gtNum):
            # One GT covered by several detections: their summed recall must
            # reach the recall constraint. NOTE(review): the local 'detRects'
            # (matched det indices) shadows the outer detRects list of points.
            many_sum = 0
            detRects = []
            for detNum in range(len(recallMat[0])):
                if gtRectMat[gtNum] == 0 and detRectMat[
                        detNum] == 0 and detNum not in detDontCareRectsNum:
                    if precisionMat[gtNum,
                                    detNum] >= self.area_precision_constraint:
                        many_sum += recallMat[gtNum, detNum]
                        detRects.append(detNum)
            if round(many_sum, 4) >= self.area_recall_constraint:
                return True, detRects
            else:
                return False, []

        def many_to_one_match(detNum):
            # One detection covering several GTs: their summed precision must
            # reach the precision constraint.
            many_sum = 0
            gtRects = []
            for gtNum in range(len(recallMat)):
                if gtRectMat[gtNum] == 0 and detRectMat[
                        detNum] == 0 and gtNum not in gtDontCareRectsNum:
                    if recallMat[gtNum, detNum] >= self.area_recall_constraint:
                        many_sum += precisionMat[gtNum, detNum]
                        gtRects.append(gtNum)
            if round(many_sum, 4) >= self.area_precision_constraint:
                return True, gtRects
            else:
                return False, []

        def center_distance(r1, r2):
            # Euclidean distance between polygon centroids (mean of corners).
            return ((np.mean(r1, axis=0) - np.mean(r2, axis=0))**2).sum()**0.5

        def diag(r):
            # Diagonal length of the polygon's bounding box.
            r = np.array(r)
            return ((r[:, 0].max() - r[:, 0].min())**2 +
                    (r[:, 1].max() - r[:, 1].min())**2)**0.5

        perSampleMetrics = {}
        recall = 0
        precision = 0
        hmean = 0
        recallAccum = 0.
        precisionAccum = 0.
        gtRects = []
        detRects = []
        gtPolPoints = []
        detPolPoints = []
        gtDontCareRectsNum = [
        ]  #Array of Ground Truth Rectangles' keys marked as don't Care
        detDontCareRectsNum = [
        ]  #Array of Detected Rectangles' matched with a don't Care GT
        pairs = []
        evaluationLog = ""
        recallMat = np.empty([1, 1])
        precisionMat = np.empty([1, 1])
        # Collect valid GT polygons; invalid/self-intersecting ones are skipped.
        for n in range(len(gt)):
            points = gt[n]['points']
            # transcription = gt[n]['text']
            dontCare = gt[n]['ignore']
            if not Polygon(points).is_valid or not Polygon(points).is_simple:
                continue
            gtRects.append(points)
            gtPolPoints.append(points)
            if dontCare:
                gtDontCareRectsNum.append(len(gtRects) - 1)
        evaluationLog += "GT rectangles: " + str(len(gtRects)) + (
            " (" + str(len(gtDontCareRectsNum)) + " don't care)\n"
            if len(gtDontCareRectsNum) > 0 else "\n")
        # Collect valid detections; a detection mostly inside a don't-care GT
        # becomes don't-care itself.
        for n in range(len(pred)):
            points = pred[n]['points']
            if not Polygon(points).is_valid or not Polygon(points).is_simple:
                continue
            detRect = points
            detRects.append(detRect)
            detPolPoints.append(points)
            if len(gtDontCareRectsNum) > 0:
                for dontCareRectNum in gtDontCareRectsNum:
                    dontCareRect = gtRects[dontCareRectNum]
                    intersected_area = get_intersection(dontCareRect, detRect)
                    rdDimensions = Polygon(detRect).area
                    if (rdDimensions == 0):
                        precision = 0
                    else:
                        precision = intersected_area / rdDimensions
                    if (precision > self.area_precision_constraint):
                        detDontCareRectsNum.append(len(detRects) - 1)
                        break
        evaluationLog += "DET rectangles: " + str(len(detRects)) + (
            " (" + str(len(detDontCareRectsNum)) + " don't care)\n"
            if len(detDontCareRectsNum) > 0 else "\n")
        if len(gtRects) == 0:
            recall = 1
            precision = 0 if len(detRects) > 0 else 1
        if len(detRects) > 0:
            #Calculate recall and precision matrixs
            outputShape = [len(gtRects), len(detRects)]
            recallMat = np.empty(outputShape)
            precisionMat = np.empty(outputShape)
            gtRectMat = np.zeros(len(gtRects), np.int8)
            detRectMat = np.zeros(len(detRects), np.int8)
            for gtNum in range(len(gtRects)):
                for detNum in range(len(detRects)):
                    rG = gtRects[gtNum]
                    rD = detRects[detNum]
                    intersected_area = get_intersection(rG, rD)
                    rgDimensions = Polygon(rG).area
                    rdDimensions = Polygon(rD).area
                    recallMat[
                        gtNum,
                        detNum] = 0 if rgDimensions == 0 else intersected_area / rgDimensions
                    precisionMat[
                        gtNum,
                        detNum] = 0 if rdDimensions == 0 else intersected_area / rdDimensions
            # Find one-to-one matches
            evaluationLog += "Find one-to-one matches\n"
            for gtNum in range(len(gtRects)):
                for detNum in range(len(detRects)):
                    if gtRectMat[gtNum] == 0 and detRectMat[
                            detNum] == 0 and gtNum not in gtDontCareRectsNum and detNum not in detDontCareRectsNum:
                        match = one_to_one_match(gtNum, detNum)
                        if match is True:
                            #in deteval we have to make other validation before mark as one-to-one
                            if is_single_overlap(gtNum, detNum) is True:
                                rG = gtRects[gtNum]
                                rD = detRects[detNum]
                                # Center distance normalized by the two
                                # bounding-box diagonals must stay below the
                                # configured threshold.
                                normDist = center_distance(rG, rD)
                                normDist /= diag(rG) + diag(rD)
                                normDist *= 2.0
                                if normDist < self.ev_param_ind_center_diff_thr:
                                    gtRectMat[gtNum] = 1
                                    detRectMat[detNum] = 1
                                    recallAccum += self.mtype_oo_o
                                    precisionAccum += self.mtype_oo_o
                                    pairs.append({
                                        'gt': gtNum,
                                        'det': detNum,
                                        'type': 'OO'
                                    })
                                    evaluationLog += "Match GT #" + str(
                                        gtNum) + " with Det #" + str(
                                            detNum) + "\n"
                                else:
                                    evaluationLog += "Match Discarded GT #" + str(
                                        gtNum) + " with Det #" + str(
                                            detNum) + " normDist: " + str(
                                                normDist) + " \n"
                        else:
                            evaluationLog += "Match Discarded GT #" + str(
                                gtNum) + " with Det #" + str(
                                    detNum) + " not single overlap\n"
            # Find one-to-many matches
            evaluationLog += "Find one-to-many matches\n"
            for gtNum in range(len(gtRects)):
                if gtNum not in gtDontCareRectsNum:
                    match, matchesDet = one_to_many_match(gtNum)
                    if match is True:
                        evaluationLog += "num_overlaps_gt=" + str(
                            num_overlaps_gt(gtNum))
                        #in deteval we have to make other validation before mark as one-to-one
                        if num_overlaps_gt(gtNum) >= 2:
                            gtRectMat[gtNum] = 1
                            recallAccum += (self.mtype_oo_o
                                            if len(matchesDet) == 1 else
                                            self.mtype_om_o)
                            precisionAccum += (self.mtype_oo_o
                                               if len(matchesDet) == 1 else
                                               self.mtype_om_o *
                                               len(matchesDet))
                            pairs.append({
                                'gt': gtNum,
                                'det': matchesDet,
                                'type': 'OO' if len(matchesDet) == 1 else 'OM'
                            })
                            for detNum in matchesDet:
                                detRectMat[detNum] = 1
                            evaluationLog += "Match GT #" + str(
                                gtNum) + " with Det #" + str(matchesDet) + "\n"
                        else:
                            evaluationLog += "Match Discarded GT #" + str(
                                gtNum) + " with Det #" + str(
                                    matchesDet) + " not single overlap\n"
            # Find many-to-one matches
            evaluationLog += "Find many-to-one matches\n"
            for detNum in range(len(detRects)):
                if detNum not in detDontCareRectsNum:
                    match, matchesGt = many_to_one_match(detNum)
                    if match is True:
                        #in deteval we have to make other validation before mark as one-to-one
                        if num_overlaps_det(detNum) >= 2:
                            detRectMat[detNum] = 1
                            recallAccum += (self.mtype_oo_o
                                            if len(matchesGt) == 1 else
                                            self.mtype_om_m * len(matchesGt))
                            precisionAccum += (self.mtype_oo_o
                                               if len(matchesGt) == 1 else
                                               self.mtype_om_m)
                            pairs.append({
                                'gt': matchesGt,
                                'det': detNum,
                                'type': 'OO' if len(matchesGt) == 1 else 'MO'
                            })
                            for gtNum in matchesGt:
                                gtRectMat[gtNum] = 1
                            evaluationLog += "Match GT #" + str(
                                matchesGt) + " with Det #" + str(detNum) + "\n"
                        else:
                            evaluationLog += "Match Discarded GT #" + str(
                                matchesGt) + " with Det #" + str(
                                    detNum) + " not single overlap\n"
            numGtCare = (len(gtRects) - len(gtDontCareRectsNum))
            if numGtCare == 0:
                recall = float(1)
                precision = float(0) if len(detRects) > 0 else float(1)
            else:
                recall = float(recallAccum) / numGtCare
                precision = float(0) if (
                    len(detRects) - len(detDontCareRectsNum)
                ) == 0 else float(precisionAccum) / (
                    len(detRects) - len(detDontCareRectsNum))
            hmean = 0 if (precision + recall
                          ) == 0 else 2.0 * precision * recall / (
                              precision + recall)
        numGtCare = len(gtRects) - len(gtDontCareRectsNum)
        numDetCare = len(detRects) - len(detDontCareRectsNum)
        perSampleMetrics = {
            'precision': precision,
            'recall': recall,
            'hmean': hmean,
            'pairs': pairs,
            # Large matrices are omitted to keep the output size bounded.
            'recallMat': [] if len(detRects) > 100 else recallMat.tolist(),
            'precisionMat': []
            if len(detRects) > 100 else precisionMat.tolist(),
            'gtPolPoints': gtPolPoints,
            'detPolPoints': detPolPoints,
            'gtCare': numGtCare,
            'detCare': numDetCare,
            'gtDontCare': gtDontCareRectsNum,
            'detDontCare': detDontCareRectsNum,
            'recallAccum': recallAccum,
            'precisionAccum': precisionAccum,
            'evaluationLog': evaluationLog
        }
        return perSampleMetrics

    def combine_results(self, results):
        """Aggregate per-sample dicts into global precision/recall/hmean."""
        numGt = 0
        numDet = 0
        methodRecallSum = 0
        methodPrecisionSum = 0
        for result in results:
            numGt += result['gtCare']
            numDet += result['detCare']
            methodRecallSum += result['recallAccum']
            methodPrecisionSum += result['precisionAccum']
        methodRecall = 0 if numGt == 0 else methodRecallSum / numGt
        methodPrecision = 0 if numDet == 0 else methodPrecisionSum / numDet
        methodHmean = 0 if methodRecall + methodPrecision == 0 else 2 * methodRecall * methodPrecision / (
            methodRecall + methodPrecision)
        methodMetrics = {
            'precision': methodPrecision,
            'recall': methodRecall,
            'hmean': methodHmean
        }
        return methodMetrics
if __name__ == '__main__':
    # Smoke test: one image containing a cared-for GT box, an ignored GT
    # box, and a single slightly-perturbed detection.
    sample_gts = [[
        {'points': [(0, 0), (1, 0), (1, 1), (0, 1)],
         'text': 1234,
         'ignore': False},
        {'points': [(2, 2), (3, 2), (3, 3), (2, 3)],
         'text': 5678,
         'ignore': True},
    ]]
    sample_preds = [[
        {'points': [(0.1, 0.1), (1, 0), (1, 1), (0, 1)],
         'text': 123,
         'ignore': False},
    ]]
    evaluator = DetectionDetEvalEvaluator()
    per_image = [
        evaluator.evaluate_image(image_gt, image_pred)
        for image_gt, image_pred in zip(sample_gts, sample_preds)
    ]
    print(evaluator.combine_results(per_image))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import math
from collections import namedtuple
import numpy as np
from shapely.geometry import Polygon
class DetectionICDAR2013Evaluator(object):
    """DetEval-style detector evaluator used for the ICDAR2013 protocol.

    Detections are matched against ground-truth polygons with three
    strategies (one-to-one, one-to-many, many-to-one); each match type
    contributes a configurable weight (``mtype_*``) to the recall and
    precision accumulators.
    """

    def __init__(self,
                 area_recall_constraint=0.8,
                 area_precision_constraint=0.4,
                 ev_param_ind_center_diff_thr=1,
                 mtype_oo_o=1.0,
                 mtype_om_o=0.8,
                 mtype_om_m=1.0):
        # Minimum area recall / area precision for a (gt, det) pair to be
        # considered a qualified overlap.
        self.area_recall_constraint = area_recall_constraint
        self.area_precision_constraint = area_precision_constraint
        # Upper bound on the normalized center distance that validates a
        # candidate one-to-one match.
        self.ev_param_ind_center_diff_thr = ev_param_ind_center_diff_thr
        # Score weights for one-to-one, one-to-many and many-to-one matches.
        self.mtype_oo_o = mtype_oo_o
        self.mtype_om_o = mtype_om_o
        self.mtype_om_m = mtype_om_m

    def evaluate_image(self, gt, pred):
        """Evaluate detections against ground truth for a single image.

        Args:
            gt: list of dicts with keys ``points`` (polygon vertices),
                ``text`` and ``ignore`` (don't-care flag).
            pred: list of dicts with key ``points``.

        Returns:
            dict of per-sample metrics: precision/recall/hmean, matched
            pairs, the accumulators used by ``combine_results`` and a
            textual evaluation log.
        """

        def get_union(pD, pG):
            return Polygon(pD).union(Polygon(pG)).area

        def get_intersection_over_union(pD, pG):
            return get_intersection(pD, pG) / get_union(pD, pG)

        def get_intersection(pD, pG):
            return Polygon(pD).intersection(Polygon(pG)).area

        def one_to_one_match(row, col):
            # (row, col) is one-to-one iff it is the only qualified overlap
            # in its row and in its column, and qualifies itself.
            cont = 0
            for j in range(len(recallMat[0])):
                if recallMat[row, j] >= self.area_recall_constraint and \
                        precisionMat[row, j] >= self.area_precision_constraint:
                    cont = cont + 1
            if cont != 1:
                return False
            cont = 0
            for i in range(len(recallMat)):
                if recallMat[i, col] >= self.area_recall_constraint and \
                        precisionMat[i, col] >= self.area_precision_constraint:
                    cont = cont + 1
            if cont != 1:
                return False
            if recallMat[row, col] >= self.area_recall_constraint and \
                    precisionMat[row, col] >= self.area_precision_constraint:
                return True
            return False

        def num_overlaps_gt(gtNum):
            # Number of (non don't-care) detections overlapping this GT at
            # all; used only for the evaluation log.  BUGFIX: the original
            # file called this helper without defining it, raising a
            # NameError whenever a one-to-many match was found; this is the
            # canonical DetEval implementation.
            cont = 0
            for detNum in range(len(detRects)):
                if detNum not in detDontCareRectsNum:
                    if recallMat[gtNum, detNum] > 0:
                        cont = cont + 1
            return cont

        def one_to_many_match(gtNum):
            # A GT matches many detections when the summed recall over the
            # precision-qualified detections reaches the recall constraint.
            many_sum = 0
            detRects = []  # NOTE: intentionally shadows the enclosing detRects
            for detNum in range(len(recallMat[0])):
                if gtRectMat[gtNum] == 0 and detRectMat[detNum] == 0 \
                        and detNum not in detDontCareRectsNum:
                    if precisionMat[gtNum,
                                    detNum] >= self.area_precision_constraint:
                        many_sum += recallMat[gtNum, detNum]
                        detRects.append(detNum)
            if round(many_sum, 4) >= self.area_recall_constraint:
                return True, detRects
            else:
                return False, []

        def many_to_one_match(detNum):
            # A detection matches many GTs when the summed precision over
            # the recall-qualified GTs reaches the precision constraint.
            many_sum = 0
            gtRects = []  # NOTE: intentionally shadows the enclosing gtRects
            for gtNum in range(len(recallMat)):
                if gtRectMat[gtNum] == 0 and detRectMat[detNum] == 0 \
                        and gtNum not in gtDontCareRectsNum:
                    if recallMat[gtNum, detNum] >= self.area_recall_constraint:
                        many_sum += precisionMat[gtNum, detNum]
                        gtRects.append(gtNum)
            if round(many_sum, 4) >= self.area_precision_constraint:
                return True, gtRects
            else:
                return False, []

        def center_distance(r1, r2):
            # Euclidean distance between the vertex centroids.
            return ((np.mean(r1, axis=0) - np.mean(r2, axis=0))**2).sum()**0.5

        def diag(r):
            # Diagonal length of the axis-aligned bounding box of r.
            r = np.array(r)
            return ((r[:, 0].max() - r[:, 0].min())**2 +
                    (r[:, 1].max() - r[:, 1].min())**2)**0.5

        perSampleMetrics = {}

        recall = 0
        precision = 0
        hmean = 0
        recallAccum = 0.
        precisionAccum = 0.
        gtRects = []
        detRects = []
        gtPolPoints = []
        detPolPoints = []
        gtDontCareRectsNum = [
        ]  #Array of Ground Truth Rectangles' keys marked as don't Care
        detDontCareRectsNum = [
        ]  #Array of Detected Rectangles' matched with a don't Care GT
        pairs = []
        evaluationLog = ""

        recallMat = np.empty([1, 1])
        precisionMat = np.empty([1, 1])

        # Collect GT polygons, skipping invalid/self-intersecting ones.
        for n in range(len(gt)):
            points = gt[n]['points']
            # transcription = gt[n]['text']
            dontCare = gt[n]['ignore']
            if not Polygon(points).is_valid or not Polygon(points).is_simple:
                continue
            gtRects.append(points)
            gtPolPoints.append(points)
            if dontCare:
                gtDontCareRectsNum.append(len(gtRects) - 1)

        evaluationLog += "GT rectangles: " + str(len(gtRects)) + (
            " (" + str(len(gtDontCareRectsNum)) + " don't care)\n"
            if len(gtDontCareRectsNum) > 0 else "\n")

        # Collect detections; a detection mostly covered by a don't-care GT
        # is itself marked don't-care and excluded from scoring.
        for n in range(len(pred)):
            points = pred[n]['points']
            if not Polygon(points).is_valid or not Polygon(points).is_simple:
                continue
            detRect = points
            detRects.append(detRect)
            detPolPoints.append(points)
            if len(gtDontCareRectsNum) > 0:
                for dontCareRectNum in gtDontCareRectsNum:
                    dontCareRect = gtRects[dontCareRectNum]
                    intersected_area = get_intersection(dontCareRect, detRect)
                    rdDimensions = Polygon(detRect).area
                    if rdDimensions == 0:
                        precision = 0
                    else:
                        precision = intersected_area / rdDimensions
                    if precision > self.area_precision_constraint:
                        detDontCareRectsNum.append(len(detRects) - 1)
                        break

        evaluationLog += "DET rectangles: " + str(len(detRects)) + (
            " (" + str(len(detDontCareRectsNum)) + " don't care)\n"
            if len(detDontCareRectsNum) > 0 else "\n")

        if len(gtRects) == 0:
            recall = 1
            precision = 0 if len(detRects) > 0 else 1

        if len(detRects) > 0:
            #Calculate recall and precision matrixs
            outputShape = [len(gtRects), len(detRects)]
            recallMat = np.empty(outputShape)
            precisionMat = np.empty(outputShape)
            gtRectMat = np.zeros(len(gtRects), np.int8)
            detRectMat = np.zeros(len(detRects), np.int8)
            for gtNum in range(len(gtRects)):
                for detNum in range(len(detRects)):
                    rG = gtRects[gtNum]
                    rD = detRects[detNum]
                    intersected_area = get_intersection(rG, rD)
                    rgDimensions = Polygon(rG).area
                    rdDimensions = Polygon(rD).area
                    recallMat[gtNum, detNum] = 0 if rgDimensions == 0 \
                        else intersected_area / rgDimensions
                    precisionMat[gtNum, detNum] = 0 if rdDimensions == 0 \
                        else intersected_area / rdDimensions

            # Find one-to-one matches
            evaluationLog += "Find one-to-one matches\n"
            for gtNum in range(len(gtRects)):
                for detNum in range(len(detRects)):
                    if gtRectMat[gtNum] == 0 and detRectMat[detNum] == 0 \
                            and gtNum not in gtDontCareRectsNum \
                            and detNum not in detDontCareRectsNum:
                        match = one_to_one_match(gtNum, detNum)
                        if match is True:
                            #in deteval we have to make other validation before mark as one-to-one
                            rG = gtRects[gtNum]
                            rD = detRects[detNum]
                            normDist = center_distance(rG, rD)
                            normDist /= diag(rG) + diag(rD)
                            normDist *= 2.0
                            if normDist < self.ev_param_ind_center_diff_thr:
                                gtRectMat[gtNum] = 1
                                detRectMat[detNum] = 1
                                recallAccum += self.mtype_oo_o
                                precisionAccum += self.mtype_oo_o
                                pairs.append({
                                    'gt': gtNum,
                                    'det': detNum,
                                    'type': 'OO'
                                })
                                evaluationLog += "Match GT #" + str(
                                    gtNum) + " with Det #" + str(detNum) + "\n"
                            else:
                                evaluationLog += "Match Discarded GT #" + str(
                                    gtNum) + " with Det #" + str(
                                        detNum) + " normDist: " + str(
                                            normDist) + " \n"
            # Find one-to-many matches
            evaluationLog += "Find one-to-many matches\n"
            for gtNum in range(len(gtRects)):
                if gtNum not in gtDontCareRectsNum:
                    match, matchesDet = one_to_many_match(gtNum)
                    if match is True:
                        evaluationLog += "num_overlaps_gt=" + str(
                            num_overlaps_gt(gtNum))
                        gtRectMat[gtNum] = 1
                        # A single matched det still scores as one-to-one.
                        recallAccum += (self.mtype_oo_o if len(matchesDet) == 1
                                        else self.mtype_om_o)
                        precisionAccum += (self.mtype_oo_o
                                           if len(matchesDet) == 1 else
                                           self.mtype_om_o * len(matchesDet))
                        pairs.append({
                            'gt': gtNum,
                            'det': matchesDet,
                            'type': 'OO' if len(matchesDet) == 1 else 'OM'
                        })
                        for detNum in matchesDet:
                            detRectMat[detNum] = 1
                        evaluationLog += "Match GT #" + str(
                            gtNum) + " with Det #" + str(matchesDet) + "\n"
            # Find many-to-one matches
            evaluationLog += "Find many-to-one matches\n"
            for detNum in range(len(detRects)):
                if detNum not in detDontCareRectsNum:
                    match, matchesGt = many_to_one_match(detNum)
                    if match is True:
                        detRectMat[detNum] = 1
                        recallAccum += (self.mtype_oo_o if len(matchesGt) == 1
                                        else self.mtype_om_m * len(matchesGt))
                        precisionAccum += (self.mtype_oo_o
                                           if len(matchesGt) == 1 else
                                           self.mtype_om_m)
                        pairs.append({
                            'gt': matchesGt,
                            'det': detNum,
                            'type': 'OO' if len(matchesGt) == 1 else 'MO'
                        })
                        for gtNum in matchesGt:
                            gtRectMat[gtNum] = 1
                        evaluationLog += "Match GT #" + str(
                            matchesGt) + " with Det #" + str(detNum) + "\n"

        numGtCare = (len(gtRects) - len(gtDontCareRectsNum))
        if numGtCare == 0:
            recall = float(1)
            precision = float(0) if len(detRects) > 0 else float(1)
        else:
            recall = float(recallAccum) / numGtCare
            precision = float(0) if (
                len(detRects) - len(detDontCareRectsNum)
            ) == 0 else float(precisionAccum) / (
                len(detRects) - len(detDontCareRectsNum))
        hmean = 0 if (precision + recall) == 0 \
            else 2.0 * precision * recall / (precision + recall)

        numGtCare = len(gtRects) - len(gtDontCareRectsNum)
        numDetCare = len(detRects) - len(detDontCareRectsNum)

        perSampleMetrics = {
            'precision': precision,
            'recall': recall,
            'hmean': hmean,
            'pairs': pairs,
            # Matrices are dropped for crowded images to keep the dict small.
            'recallMat': [] if len(detRects) > 100 else recallMat.tolist(),
            'precisionMat': []
            if len(detRects) > 100 else precisionMat.tolist(),
            'gtPolPoints': gtPolPoints,
            'detPolPoints': detPolPoints,
            'gtCare': numGtCare,
            'detCare': numDetCare,
            'gtDontCare': gtDontCareRectsNum,
            'detDontCare': detDontCareRectsNum,
            'recallAccum': recallAccum,
            'precisionAccum': precisionAccum,
            'evaluationLog': evaluationLog
        }

        return perSampleMetrics

    def combine_results(self, results):
        """Aggregate per-image results into dataset precision/recall/hmean."""
        numGt = 0
        numDet = 0
        methodRecallSum = 0
        methodPrecisionSum = 0
        for result in results:
            numGt += result['gtCare']
            numDet += result['detCare']
            methodRecallSum += result['recallAccum']
            methodPrecisionSum += result['precisionAccum']
        methodRecall = 0 if numGt == 0 else methodRecallSum / numGt
        methodPrecision = 0 if numDet == 0 else methodPrecisionSum / numDet
        methodHmean = 0 if methodRecall + methodPrecision == 0 \
            else 2 * methodRecall * methodPrecision / (
                methodRecall + methodPrecision)

        methodMetrics = {
            'precision': methodPrecision,
            'recall': methodRecall,
            'hmean': methodHmean
        }
        return methodMetrics
if __name__ == '__main__':
    # Smoke test: one image with a cared-for GT box, an ignored GT box,
    # and a single slightly shifted detection.
    image_gts = [[
        {'points': [(0, 0), (1, 0), (1, 1), (0, 1)],
         'text': 1234,
         'ignore': False},
        {'points': [(2, 2), (3, 2), (3, 3), (2, 3)],
         'text': 5678,
         'ignore': True},
    ]]
    image_preds = [[
        {'points': [(0.1, 0.1), (1, 0), (1, 1), (0, 1)],
         'text': 123,
         'ignore': False},
    ]]
    evaluator = DetectionICDAR2013Evaluator()
    per_image_results = [
        evaluator.evaluate_image(image_gt, image_pred)
        for image_gt, image_pred in zip(image_gts, image_preds)
    ]
    print(evaluator.combine_results(per_image_results))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment