Commit b44aeb9e authored by liuhy

Upload LPR-related code

parent 9ce4dc82
import argparse
import cv2
import torch
import numpy as np
from lprnet import build_lprnet
from load_data import CHARS
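# Single-image LPRNet inference: normalize a 94x24 BGR crop, run the net,
# greedy-decode the CTC output, and optionally export the model to ONNX.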
def validation(args):
model = build_lprnet(len(CHARS))
model.load_state_dict(torch.load(args.model, map_location=args.device))
model.to(args.device)
model.eval() # disable dropout for inference
img = cv2.imread(args.img)
height, width, _ = img.shape
if height != 24 or width != 94:
img = cv2.resize(img, (94, 24))
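# Normalize pixels to roughly [-1, 1]: subtract the mean 127.5, then scale by 0.0078125 (= 1/128)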
img = img.astype('float32')
img -= 127.5
img *= 0.0078125
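# HWC -> CHW for PyTorch; the batch dimension is added just before inference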
img = np.transpose(img, (2, 0, 1))
with torch.no_grad():
img = torch.from_numpy(img).unsqueeze(0).to(args.device)
preb = model(img)
preb = preb.detach().cpu().numpy().squeeze()
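# preb has shape (num_classes, T); take the best class at each of the T time steps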
preb_label = []
for j in range(preb.shape[1]):
preb_label.append(np.argmax(preb[:, j], axis=0))
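# CTC greedy decode: collapse consecutive repeats, then drop blanks (blank index = len(CHARS) - 1)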
no_repeat_blank_label = []
pre_c = preb_label[0]
if pre_c != len(CHARS) - 1:
no_repeat_blank_label.append(pre_c)
for c in preb_label:
if (pre_c == c) or (c == len(CHARS) - 1):
if c == len(CHARS) - 1:
pre_c = c
continue
no_repeat_blank_label.append(c)
pre_c = c
if args.export_onnx:
print('exporting PyTorch model to ONNX...')
onnx_input = torch.randn(1, 3, 24, 94, device=args.device)
torch.onnx.export(
model,
onnx_input,
'LPRNet.onnx',
input_names=['input'],
output_names=['output'],
dynamic_axes={'input': {0: 'batch'}, 'output': {0: 'batch'}} if args.dynamic else None,
opset_version=12,
)
return ''.join(list(map(lambda x: CHARS[x], no_repeat_blank_label)))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='parameters to validate the net')
parser.add_argument('--model', default='model/lprnet.pth', help='model path to validate')
parser.add_argument('--img', default='imgs/川JK0707.jpg', help='the image path')
parser.add_argument('--device', default='cuda', help="device to run validation on ('cuda' or 'cpu')")
parser.add_argument('--export_onnx', action='store_true', help='export model to onnx')
parser.add_argument('--dynamic', action='store_true', help='use a dynamic batch size for the onnx export')
args = parser.parse_args()
result = validation(args)
print('recognition result:', result)
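# Example invocation (the script name is hypothetical):
#   python validate.py --model model/lprnet.pth --img imgs/川JK0707.jpg --device cpu
#   python validate.py --export_onnx --dynamic   # also writes LPRNet.onnx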
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
from load_data import CHARS, CHARS_DICT, LPRDataLoader
from lprnet import build_lprnet
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch import optim
import torch.nn as nn
import numpy as np
import argparse
import torch
import time
import os
print('CUDA available:', torch.cuda.is_available())
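# nn.CTCLoss wants one input length (the model's T output time steps) and one
# target length (characters in the plate) per sample; build both as tuples.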
def sparse_tuple_for_ctc(T_length, lengths):
input_lengths = []
target_lengths = []
for ch in lengths:
input_lengths.append(T_length)
target_lengths.append(ch)
return tuple(input_lengths), tuple(target_lengths)
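# Step decay: within the i-th interval of lr_schedule the lr is base_lr * 0.1**i,
# e.g. base_lr=0.1 with schedule [4, 8, 12, 14, 16] gives 0.1 for epochs below 4,
# 0.01 for epochs 4-7, 0.001 for 8-11, and so on.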
def adjust_learning_rate(optimizer, cur_epoch, base_lr, lr_schedule):
"""
Sets the learning rate
"""
lr = 0
for i, e in enumerate(lr_schedule):
if cur_epoch < e:
lr = base_lr * (0.1 ** i)
break
if lr == 0:
# past the last milestone: keep the smallest lr instead of resetting to base_lr
lr = base_lr * (0.1 ** len(lr_schedule))
for param_group in optimizer.param_groups:
param_group['lr'] = lr
return lr
def get_parser():
parser = argparse.ArgumentParser(description='parameters to train net')
parser.add_argument('--max_epoch', default=15, type=int, help='epochs to train the network')
parser.add_argument('--img_size', default=[94, 24], help='the image size')
parser.add_argument('--train_img_dirs', default="data/train", help='the train images path')
parser.add_argument('--test_img_dirs', default="data/test", help='the test images path')
parser.add_argument('--dropout_rate', default=0.5, type=float, help='dropout rate.')
parser.add_argument('--learning_rate', default=0.1, type=float, help='base value of learning rate.')
parser.add_argument('--lpr_max_len', default=8, type=int, help='license plate number max length.')
parser.add_argument('--train_batch_size', default=64, type=int, help='training batch size.')
parser.add_argument('--test_batch_size', default=10, type=int, help='testing batch size.')
parser.add_argument('--phase_train', default=True, type=lambda v: str(v).lower() in ('true', '1'), help='train or test phase flag.')
parser.add_argument('--num_workers', default=8, type=int, help='Number of workers used in dataloading')
parser.add_argument('--cuda', default=True, type=lambda v: str(v).lower() in ('true', '1'), help='use cuda to train the model.')
parser.add_argument('--resume_epoch', default=10, type=int, help='resume iter for retraining')
parser.add_argument('--save_interval', default=2000, type=int, help='interval for save model state dict')
parser.add_argument('--test_interval', default=2000, type=int, help='interval for evaluate')
parser.add_argument('--momentum', default=0.9, type=float, help='momentum')
parser.add_argument('--weight_decay', default=2e-5, type=float, help='Weight decay for SGD')
parser.add_argument('--lr_schedule', default=[4, 8, 12, 14, 16], help='schedule for learning rate.')
parser.add_argument('--save_folder', default='./weights/', help='Location to save checkpoint models')
# parser.add_argument('--pretrained_model', default='./weights/Final_LPRNet_model.pth', help='pretrained base model')
parser.add_argument('--pretrained_model', default='', help='pretrained base model')
args = parser.parse_args()
return args
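# Plates vary in length, so per-image labels are concatenated into one flat 1-D
# tensor while `lengths` records how many characters belong to each image --
# the layout nn.CTCLoss expects for its targets.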
def collate_fn(batch):
imgs = []
labels = []
lengths = []
for _, sample in enumerate(batch):
img, label, length = sample
imgs.append(torch.from_numpy(img))
labels.extend(label)
lengths.append(length)
labels = np.asarray(labels).flatten().astype(np.int32) # CTCLoss requires int32/int64 targets
return (torch.stack(imgs, 0), torch.from_numpy(labels), lengths)
def train():
args = get_parser()
T_length = 18 # output time steps of LPRNet (not args.lpr_max_len)
epoch = 0 + args.resume_epoch
loss_val = 0
if not os.path.exists(args.save_folder):
os.mkdir(args.save_folder)
lprnet = build_lprnet(class_num=len(CHARS), phase=args.phase_train)
device = torch.device("cuda:0" if args.cuda else "cpu")
lprnet.to(device)
print("Successful to build network!")
# load pretrained model
if args.pretrained_model:
lprnet.load_state_dict(torch.load(args.pretrained_model, map_location=device))
print("Loaded pretrained model successfully!")
else:
def weights_init(m):
for key in m.state_dict():
if key.split('.')[-1] == 'weight':
if 'conv' in key:
nn.init.kaiming_normal_(m.state_dict()[key], mode='fan_out')
if 'bn' in key:
m.state_dict()[key][...] = 1 # standard BatchNorm scale init
elif key.split('.')[-1] == 'bias':
m.state_dict()[key][...] = 0.01
lprnet.backbone.apply(weights_init)
lprnet.container.apply(weights_init)
print("initial net weights successful!")
# define optimizer
# optimizer = optim.SGD(lprnet.parameters(), lr=args.learning_rate,
# momentum=args.momentum, weight_decay=args.weight_decay)
optimizer = optim.RMSprop(lprnet.parameters(), lr=args.learning_rate, alpha=0.9, eps=1e-08,
momentum=args.momentum, weight_decay=args.weight_decay)
train_img_dirs = os.path.expanduser(args.train_img_dirs)
test_img_dirs = os.path.expanduser(args.test_img_dirs)
train_dataset = LPRDataLoader(train_img_dirs.split(','), args.img_size)
test_dataset = LPRDataLoader(test_img_dirs.split(','), args.img_size)
epoch_size = len(train_dataset) // args.train_batch_size
max_iter = args.max_epoch * epoch_size
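# The blank index must match the one the greedy decoder skips: len(CHARS) - 1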
ctc_loss = nn.CTCLoss(blank=len(CHARS)-1, reduction='mean') # reduction: 'none' | 'mean' | 'sum'
if args.resume_epoch > 0:
start_iter = args.resume_epoch * epoch_size
else:
start_iter = 0
for iteration in range(start_iter, max_iter):
if iteration % epoch_size == 0:
# create batch iterator
batch_iterator = iter(DataLoader(train_dataset, args.train_batch_size, shuffle=True, num_workers=args.num_workers, collate_fn=collate_fn))
loss_val = 0
epoch += 1
if iteration !=0 and iteration % args.save_interval == 0:
torch.save(lprnet.state_dict(), args.save_folder + 'LPRNet_iteration_' + repr(iteration) + '.pth')
if (iteration + 1) % args.test_interval == 0:
Greedy_Decode_Eval(lprnet, test_dataset, args)
lprnet.train() # switch back to train mode (Greedy_Decode_Eval sets eval mode)
start_time = time.time()
# load train data
images, labels, lengths = next(batch_iterator)
# get ctc parameters
input_lengths, target_lengths = sparse_tuple_for_ctc(T_length, lengths)
# update lr
lr = adjust_learning_rate(optimizer, epoch, args.learning_rate, args.lr_schedule)
if args.cuda:
images = images.cuda()
labels = labels.cuda()
# forward
logits = lprnet(images)
log_probs = logits.permute(2, 0, 1) # for ctc loss: T x N x C
log_probs = log_probs.log_softmax(2) # CTC expects log-probabilities
# backprop
optimizer.zero_grad()
loss = ctc_loss(log_probs, labels, input_lengths=input_lengths, target_lengths=target_lengths)
if not np.isfinite(loss.item()):
continue # skip diverged batches (inf or nan loss)
loss.backward()
optimizer.step()
loss_val += loss.item()
end_time = time.time()
if iteration % 20 == 0:
print('Epoch: ' + repr(epoch) + ' || epoch iter: ' + repr(iteration % epoch_size) + '/' + repr(epoch_size)
+ ' || Total iter: ' + repr(iteration) + ' || Loss: %.4f || ' % (loss.item()) +
'Batch time: %.4f sec. || ' % (end_time - start_time) + 'LR: %.8f' % (lr))
# final test
print("Final test Accuracy:")
Greedy_Decode_Eval(lprnet, test_dataset, args)
# save final parameters
torch.save(lprnet.state_dict(), args.save_folder + 'Final_LPRNet_model.pth')
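# Greedy CTC evaluation: per-step argmax, collapse repeats, drop blanks, then
# compare with ground truth. Tp = exact match, Tn_1 = wrong length,
# Tn_2 = right length but wrong characters.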
def Greedy_Decode_Eval(Net, datasets, args):
Net.eval() # disable dropout for evaluation
epoch_size = len(datasets) // args.test_batch_size
batch_iterator = iter(DataLoader(datasets, args.test_batch_size, shuffle=True, num_workers=args.num_workers, collate_fn=collate_fn))
Tp = 0
Tn_1 = 0
Tn_2 = 0
t1 = time.time()
for i in range(epoch_size):
# load test data
images, labels, lengths = next(batch_iterator)
start = 0
targets = []
for length in lengths:
label = labels[start:start+length]
targets.append(label)
start += length
targets = [el.numpy() for el in targets] # plates vary in length; a ragged np.array() would fail on modern numpy
if args.cuda:
images = images.cuda()
# forward
prebs = Net(images)
# greedy decode
prebs = prebs.cpu().detach().numpy()
preb_labels = list()
for i in range(prebs.shape[0]):
preb = prebs[i, :, :]
preb_label = list()
for j in range(preb.shape[1]):
preb_label.append(np.argmax(preb[:, j], axis=0))
no_repeat_blank_label = list()
pre_c = preb_label[0]
if pre_c != len(CHARS) - 1:
no_repeat_blank_label.append(pre_c)
for c in preb_label: # drop repeated labels and blank labels
if (pre_c == c) or (c == len(CHARS) - 1):
if c == len(CHARS) - 1:
pre_c = c
continue
no_repeat_blank_label.append(c)
pre_c = c
preb_labels.append(no_repeat_blank_label)
for i, label in enumerate(preb_labels):
if len(label) != len(targets[i]):
Tn_1 += 1
continue
if (np.asarray(targets[i]) == np.asarray(label)).all():
Tp += 1
else:
Tn_2 += 1
Acc = Tp * 1.0 / (Tp + Tn_1 + Tn_2)
print("[Info] Test Accuracy: {} [{}:{}:{}:{}]".format(Acc, Tp, Tn_1, Tn_2, (Tp+Tn_1+Tn_2)))
t2 = time.time()
print("[Info] Test Speed: {}s 1/{}]".format((t2 - t1) / len(datasets), len(datasets)))
if __name__ == "__main__":
train()
import torch
from itertools import product as product
import numpy as np
from math import ceil
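# Detector-side utilities (RetinaFace-style, MobileNet-0.25 backbone): anchor/prior
# generation, NMS, and box/landmark decoding, presumably used by the plate detector.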
cfg_mnet = {
'name': 'mobilenet0.25',
'min_sizes': [[24, 48], [96, 192], [384, 768]],
'steps': [8, 16, 32],
'variance': [0.1, 0.2],
'clip': False,
'loc_weight': 2.0,
'gpu_train': True,
'batch_size': 48,
'ngpu': 1,
'epoch': 50,
'decay1': 190,
'decay2': 220,
'image_size': 640,
'pretrain': False,
'return_layers': {'stage1': 1, 'stage2': 2, 'stage3': 3},
'in_channel': 32,
'out_channel': 64
}
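# Greedy NMS: keep the highest-scoring box, suppress every remaining box whose IoU
# with it exceeds `thresh`, and repeat. The +1 terms follow the inclusive-pixel
# convention inherited from the original Fast R-CNN code.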
def py_cpu_nms(dets, thresh):
"""Pure Python NMS baseline."""
x1 = dets[:, 0]
y1 = dets[:, 1]
x2 = dets[:, 2]
y2 = dets[:, 3]
scores = dets[:, 4]
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
order = scores.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0.0, xx2 - xx1 + 1)
h = np.maximum(0.0, yy2 - yy1 + 1)
inter = w * h
ovr = inter / (areas[i] + areas[order[1:]] - inter)
inds = np.where(ovr <= thresh)[0]
order = order[inds + 1]
return keep
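# Illustrative usage: dets is an (N, 5) float array of [x1, y1, x2, y2, score];
# keep = py_cpu_nms(dets, thresh=0.4) returns the indices of the retained boxes.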
def decode(loc, priors, variances):
"""Decode locations from predictions using priors to undo
the encoding we did for offset regression at train time.
Args:
loc (tensor): location predictions for loc layers,
Shape: [num_priors,4]
priors (tensor): Prior boxes in center-offset form.
Shape: [num_priors,4].
variances: (list[float]) Variances of priorboxes
Return:
decoded bounding box predictions
"""
boxes = torch.cat((
priors[:, :2] + loc[:, :2] * variances[0] * priors[:, 2:],
priors[:, 2:] * torch.exp(loc[:, 2:] * variances[1])), 1)
boxes[:, :2] -= boxes[:, 2:] / 2
boxes[:, 2:] += boxes[:, :2]
return boxes
def decode_landm(pre, priors, variances):
"""Decode landm from predictions using priors to undo
the encoding we did for offset regression at train time.
Args:
pre (tensor): landmark predictions for loc layers,
Shape: [num_priors,8] (four 2-D points)
priors (tensor): Prior boxes in center-offset form.
Shape: [num_priors,4].
variances: (list[float]) Variances of priorboxes
Return:
decoded landm predictions
"""
landms = torch.cat((priors[:, :2] + pre[:, :2] * variances[0] * priors[:, 2:],
priors[:, :2] + pre[:, 2:4] * variances[0] * priors[:, 2:],
priors[:, :2] + pre[:, 4:6] * variances[0] * priors[:, 2:],
priors[:, :2] + pre[:, 6:8] * variances[0] * priors[:, 2:],
), dim=1)
return landms
class PriorBox(object):
def __init__(self, cfg):
super(PriorBox, self).__init__()
self.min_sizes = cfg['min_sizes']
self.steps = cfg['steps']
self.clip = cfg['clip']
def __call__(self, image_size):
feature_maps = [[ceil(image_size[0] / step), ceil(image_size[1] / step)] for step in self.steps]
anchors = []
for k, f in enumerate(feature_maps):
min_sizes = self.min_sizes[k]
for i, j in product(range(f[0]), range(f[1])):
for min_size in min_sizes:
s_kx = min_size / image_size[1]
s_ky = min_size / image_size[0]
dense_cx = [x * self.steps[k] / image_size[1] for x in [j + 0.5]]
dense_cy = [y * self.steps[k] / image_size[0] for y in [i + 0.5]]
for cy, cx in product(dense_cy, dense_cx):
anchors += [cx, cy, s_kx, s_ky]
# back to torch land
output = torch.Tensor(anchors).view(-1, 4)
if self.clip:
output.clamp_(max=1, min=0)
return output
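# Numpy twin of PriorBox, presumably for pipelines (e.g. ONNX deployment) that
# avoid torch at inference time.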
class PriorBoxPy(object):
def __init__(self, cfg):
super(PriorBoxPy, self).__init__()
self.min_sizes = cfg['min_sizes']
self.steps = cfg['steps']
self.clip = cfg['clip']
def __call__(self, image_size):
feature_maps = [[ceil(image_size[0] / step), ceil(image_size[1] / step)] for step in self.steps]
anchors = []
for k, f in enumerate(feature_maps):
min_sizes = self.min_sizes[k]
for i, j in product(range(f[0]), range(f[1])):
for min_size in min_sizes:
s_kx = min_size / image_size[1]
s_ky = min_size / image_size[0]
dense_cx = [x * self.steps[k] / image_size[1] for x in [j + 0.5]]
dense_cy = [y * self.steps[k] / image_size[0] for y in [i + 0.5]]
for cy, cx in product(dense_cy, dense_cx):
anchors += [cx, cy, s_kx, s_ky]
# back to torch land
output = np.array(anchors).reshape((-1, 4))
if self.clip:
output = output.clip(min=0, max=1) # ndarray has no clamp_; use clip
return output