Commit cbcb90dc authored by Kai Chen's avatar Kai Chen
Browse files

port from cvbase and do some refactoring

parent b3a84c7f
from .utils import *
from .io import *
from .opencv_info import *
from .image import *
from .video import *
from .visualization import *
from .version import *
from .io import *
from .processing import *
import os.path as osp
import cv2
import numpy as np
from mmcv.utils import is_str, check_file_exist, mkdir_or_exist
from mmcv.opencv_info import USE_OPENCV2
if not USE_OPENCV2:
from cv2 import IMREAD_COLOR, IMREAD_GRAYSCALE, IMREAD_UNCHANGED
else:
from cv2 import CV_LOAD_IMAGE_COLOR as IMREAD_COLOR
from cv2 import CV_LOAD_IMAGE_GRAYSCALE as IMREAD_GRAYSCALE
from cv2 import CV_LOAD_IMAGE_UNCHANGED as IMREAD_UNCHANGED
imread_flags = {
'color': IMREAD_COLOR,
'grayscale': IMREAD_GRAYSCALE,
'unchanged': IMREAD_UNCHANGED
}
def read_img(img_or_path, flag='color'):
"""Read an image.
Args:
img_or_path (ndarray or str): Either a numpy array or image path.
If it is a numpy array (loaded image), then it will be returned
as is.
flag (str): Flags specifying the color type of a loaded image,
candidates are `color`, `grayscale` and `unchanged`.
Returns:
ndarray: Loaded image array.
"""
if isinstance(img_or_path, np.ndarray):
return img_or_path
elif is_str(img_or_path):
flag = imread_flags[flag] if is_str(flag) else flag
check_file_exist(img_or_path,
'img file does not exist: {}'.format(img_or_path))
return cv2.imread(img_or_path, flag)
else:
raise TypeError('"img" must be a numpy array or a filename')
def img_from_bytes(content, flag='color'):
"""Read an image from bytes.
Args:
content (bytes): Image bytes got from files or other streams.
flag (str): Same as :func:`read_img`.
Returns:
ndarray: Loaded image array.
"""
img_np = np.fromstring(content, np.uint8)
flag = imread_flags[flag] if is_str(flag) else flag
img = cv2.imdecode(img_np, flag)
return img
def write_img(img, file_path, params=None, auto_mkdir=True):
"""Write image to file
Args:
img (ndarray): Image to be written to file.
file_path (str): Image file path.
params (None or list): Same as opencv's :func:`imwrite` interface.
auto_mkdir (bool): If the parrent folder of `file_path` does not exist,
whether to create it automatically.
Returns:
bool: Successful or not.
"""
if auto_mkdir:
dir_name = osp.abspath(osp.dirname(file_path))
mkdir_or_exist(dir_name)
return cv2.imwrite(file_path, img, params)
from __future__ import division
import cv2
import numpy as np
from .io import read_img
def bgr2gray(img, keepdim=False):
"""Convert a BGR image to grayscale image.
Args:
img (ndarray or str): The input image or image path.
keepdim (bool): If False (by default), then return the grayscale image
with 2 dims, otherwise 3 dims.
Returns:
ndarray: The converted grayscale image.
"""
in_img = read_img(img)
out_img = cv2.cvtColor(in_img, cv2.COLOR_BGR2GRAY)
if keepdim:
out_img = out_img[..., None]
return out_img
def gray2bgr(img):
"""Convert a grayscale image to BGR image.
Args:
img (ndarray or str): The input image or image path.
Returns:
ndarray: The converted BGR image.
"""
in_img = read_img(img)
in_img = in_img[..., None] if in_img.ndim == 2 else in_img
out_img = cv2.cvtColor(in_img, cv2.COLOR_GRAY2BGR)
return out_img
def convert_color_factory(src, dst):
code = getattr(cv2, 'COLOR_{}2{}'.format(src.upper(), dst.upper()))
def convert_color(img):
in_img = read_img(img)
out_img = cv2.cvtColor(in_img, code)
return out_img
convert_color.__doc__ = """Convert a {0} image to {1} image
Args:
img (ndarray or str): The input image or image path.
Returns:
ndarray: The converted {1} image
""".format(src.upper(), dst.upper())
return convert_color
bgr2rgb = convert_color_factory('bgr', 'rgb')
rgb2bgr = convert_color_factory('rgb', 'bgr')
bgr2hsv = convert_color_factory('bgr', 'hsv')
hsv2bgr = convert_color_factory('hsv', 'bgr')
def scale_size(size, scale):
"""Rescale a size by a ratio.
Args:
size (tuple): w, h.
scale (float): Scaling factor.
Returns:
tuple[int]: scaled size.
"""
w, h = size
return int(w * float(scale) + 0.5), int(h * float(scale) + 0.5)
interp_codes = {
'nearest': cv2.INTER_NEAREST,
'bilinear': cv2.INTER_LINEAR,
'bicubic': cv2.INTER_CUBIC,
'area': cv2.INTER_AREA,
'lanczos': cv2.INTER_LANCZOS4
}
def resize(img, size, return_scale=False, interpolation='bilinear'):
"""Resize image to a given size.
Args:
img (ndarray): The input image or image path.
size (tuple): Target (w, h).
return_scale (bool): Whether to return `w_scale` and `h_scale`.
interpolation (str): Interpolation method, accepted values are
"nearest", "bilinear", "bicubic", "area", "lanczos".
Returns:
tuple or ndarray: (`resized_img`, `w_scale`, `h_scale`) or
`resized_img`.
"""
img = read_img(img)
h, w = img.shape[:2]
resized_img = cv2.resize(
img, size, interpolation=interp_codes[interpolation])
if not return_scale:
return resized_img
else:
w_scale = size[0] / float(w)
h_scale = size[1] / float(h)
return resized_img, w_scale, h_scale
def resize_like(img, dst_img, return_scale=False, interpolation='bilinear'):
"""Resize image to the same size of a given image.
Args:
img (ndarray): The input image or image path.
dst_img (ndarray): The target image.
return_scale (bool): Whether to return `w_scale` and `h_scale`.
interpolation (str): Same as :func:`resize`.
Returns:
tuple or ndarray: (`resized_img`, `w_scale`, `h_scale`) or
`resized_img`.
"""
h, w = dst_img.shape[:2]
return resize(img, (w, h), return_scale, interpolation)
def resize_by_ratio(img, ratio, interpolation='bilinear'):
"""Resize image by a ratio.
Args:
img (ndarray): The input image or image path.
ratio (float): Scaling factor.
interpolation (str): Same as :func:`resize`.
Returns:
ndarray: The resized image
"""
assert isinstance(ratio, (float, int)) and ratio > 0
img = read_img(img)
h, w = img.shape[:2]
new_size = scale_size((w, h), ratio)
return resize(img, new_size, interpolation=interpolation)
def resize_keep_ar(img,
max_long_edge,
max_short_edge,
return_scale=False,
interpolation='bilinear'):
"""Resize image with aspect ratio unchanged.
The long edge of resized image will be no greater than `max_long_edge`,
and the short edge of resized image is no greater than `max_short_edge`.
Args:
img (ndarray): The input image or image path.
max_long_edge (int): Max value of the long edge of resized image.
max_short_edge (int): Max value of the short edge of resized image.
return_scale (bool): Whether to return scale besides the resized image.
interpolation (str): Same as :func:`resize`.
Returns:
tuple or ndarray: (resized_img, scale_factor) or resized_img.
"""
if max_long_edge < max_short_edge:
raise ValueError(
'"max_long_edge" should not be less than "max_short_edge"')
img = read_img(img)
h, w = img.shape[:2]
ratio = min(
float(max_long_edge) / max(h, w),
float(max_short_edge) / min(h, w))
new_size = scale_size((w, h), ratio)
resized_img = resize(img, new_size, interpolation=interpolation)
if return_scale:
return resized_img, ratio
else:
return resized_img
def limit_size(img, max_edge, return_scale=False, interpolation='bilinear'):
"""Limit the size of an image.
If the long edge of the image is greater than max_edge, resize the image.
Args:
img (ndarray): The input image or image path.
max_edge (int): Maximum value of the long edge.
return_scale (bool): Whether to return scale besides the resized image.
interpolation (str): Same as :func:`resize`.
Returns:
tuple or ndarray: (resized_img, scale_factor) or resized_img.
"""
img = read_img(img)
h, w = img.shape[:2]
if max(h, w) > max_edge:
scale = float(max_edge) / max(h, w)
new_size = scale_size((w, h), scale)
resized_img = resize(img, new_size, interpolation=interpolation)
else:
scale = 1.0
resized_img = img
if return_scale:
return resized_img, scale
else:
return resized_img
def bbox_clip(bboxes, img_shape):
"""Clip bboxes to fit the image shape.
Args:
bboxes (ndarray): Shape (..., 4*k)
img_shape (tuple): (height, width) of the image.
Returns:
ndarray: Clipped bboxes.
"""
assert bboxes.shape[-1] % 4 == 0
cliped_bboxes = np.empty_like(bboxes, dtype=bboxes.dtype)
cliped_bboxes[..., 0::4] = np.maximum(
np.minimum(bboxes[..., 0::4], img_shape[1] - 1), 0)
cliped_bboxes[..., 1::4] = np.maximum(
np.minimum(bboxes[..., 1::4], img_shape[0] - 1), 0)
cliped_bboxes[..., 2::4] = np.maximum(
np.minimum(bboxes[..., 2::4], img_shape[1] - 1), 0)
cliped_bboxes[..., 3::4] = np.maximum(
np.minimum(bboxes[..., 3::4], img_shape[0] - 1), 0)
return cliped_bboxes
def bbox_scaling(bboxes, scale, clip_shape=None):
"""Scaling bboxes w.r.t the box center.
Args:
bboxes (ndarray): Shape(..., 4).
scale (float): Scaling factor.
clip_shape (tuple, optional): If specified, bboxes that exceed the
boundary will be clipped according to the given shape (h, w).
Returns:
ndarray: Scaled bboxes.
"""
if float(scale) == 1.0:
scaled_bboxes = bboxes.copy()
else:
w = bboxes[..., 2] - bboxes[..., 0] + 1
h = bboxes[..., 3] - bboxes[..., 1] + 1
dw = (w * (scale - 1)) * 0.5
dh = (h * (scale - 1)) * 0.5
scaled_bboxes = bboxes + np.stack((-dw, -dh, dw, dh), axis=-1)
if clip_shape is not None:
return bbox_clip(scaled_bboxes, clip_shape)
else:
return scaled_bboxes
def crop_img(img, bboxes, scale_ratio=1.0, pad_fill=None):
"""Crop image patches.
3 steps: scale the bboxes -> clip bboxes -> crop and pad.
Args:
img (ndarray): Image to be cropped.
bboxes (ndarray): Shape (k, 4) or (4, ), location of cropped bboxes.
scale_ratio (float, optional): Scale ratio of bboxes, the default value
1.0 means no padding.
pad_fill (number or list): Value to be filled for padding, None for
no padding.
Returns:
list or ndarray: The cropped image patches.
"""
chn = 1 if img.ndim == 2 else img.shape[2]
if pad_fill is not None:
if isinstance(pad_fill, (int, float)):
pad_fill = [pad_fill for _ in range(chn)]
assert len(pad_fill) == chn
img = read_img(img)
_bboxes = bboxes[None, ...] if bboxes.ndim == 1 else bboxes
scaled_bboxes = bbox_scaling(_bboxes, scale_ratio).astype(np.int32)
clipped_bbox = bbox_clip(scaled_bboxes, img.shape)
patches = []
for i in range(clipped_bbox.shape[0]):
x1, y1, x2, y2 = tuple(clipped_bbox[i, :].tolist())
if pad_fill is None:
patch = img[y1:y2 + 1, x1:x2 + 1, ...]
else:
_x1, _y1, _x2, _y2 = tuple(scaled_bboxes[i, :].tolist())
if chn == 2:
patch_shape = (_y2 - _y1 + 1, _x2 - _x1 + 1)
else:
patch_shape = (_y2 - _y1 + 1, _x2 - _x1 + 1, chn)
patch = np.array(
pad_fill, dtype=img.dtype) * np.ones(
patch_shape, dtype=img.dtype)
x_start = 0 if _x1 >= 0 else -_x1
y_start = 0 if _y1 >= 0 else -_y1
w = x2 - x1 + 1
h = y2 - y1 + 1
patch[y_start:y_start + h, x_start:x_start +
w, ...] = img[y1:y1 + h, x1:x1 + w, ...]
patches.append(patch)
if bboxes.ndim == 1:
return patches[0]
else:
return patches
def pad_img(img, shape, pad_val):
"""Pad an image to a certain shape.
Args:
img (ndarray): Image to be padded.
shape (tuple): Expected padding shape.
pad_val (number or list): Values to be filled in padding areas.
Returns:
ndarray: The padded image.
"""
if not isinstance(pad_val, (int, float)):
assert len(pad_val) == img.shape[-1]
if len(shape) < len(img.shape):
shape = shape + (img.shape[-1], )
assert len(shape) == len(img.shape)
for i in range(len(shape) - 1):
assert shape[i] >= img.shape[i]
pad = np.empty(shape, dtype=img.dtype)
pad[...] = pad_val
pad[:img.shape[0], :img.shape[1], ...] = img
return pad
def rotate_img(img,
angle,
center=None,
scale=1.0,
border_value=0,
auto_bound=False):
"""Rotate an image.
Args:
img (ndarray or str): Image to be rotated.
angle (float): Rotation angle in degrees, positive values mean
clockwise rotation.
center (tuple): Center of the rotation in the source image, by default
it is the center of the image.
scale (float): Isotropic scale factor.
border_value (int): Border value.
auto_bound (bool): Whether to adjust the image size to cover the whole
rotated image.
Returns:
ndarray: The rotated image.
"""
if center is not None and auto_bound:
raise ValueError('`auto_bound` conflicts with `center`')
img = read_img(img)
h, w = img.shape[:2]
if center is None:
center = ((w - 1) / 2, (h - 1) / 2)
assert isinstance(center, tuple)
matrix = cv2.getRotationMatrix2D(center, -angle, scale)
if auto_bound:
cos = np.abs(matrix[0, 0])
sin = np.abs(matrix[0, 1])
new_w = h * sin + w * cos
new_h = h * cos + w * sin
matrix[0, 2] += (new_w - w) / 2
matrix[1, 2] += (new_h - h) / 2
w = int(np.round(new_w))
h = int(np.round(new_h))
rotated = cv2.warpAffine(img, matrix, (w, h), borderValue=border_value)
return rotated
from abc import ABCMeta, abstractmethod
import json
import yaml
from six.moves import cPickle as pickle
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
from mmcv.utils import is_str
__all__ = ['load', 'dump', 'list_from_file', 'dict_from_file']
class BaseFileProcessor(object):
__metaclass__ = ABCMeta
@staticmethod
@abstractmethod
def load_from_path(filepath, **kwargs):
pass
@staticmethod
@abstractmethod
def load_from_fileobj(file, **kwargs):
pass
@staticmethod
@abstractmethod
def dump_to_str(obj, **kwargs):
pass
@staticmethod
@abstractmethod
def dump_to_path(obj, filepath, **kwargs):
pass
@staticmethod
@abstractmethod
def dump_to_fileobj(obj, file, **kwargs):
pass
class JsonProcessor(BaseFileProcessor):
@staticmethod
def load_from_path(filepath):
with open(filepath, 'r') as f:
obj = json.load(f)
return obj
@staticmethod
def load_from_fileobj(file):
return json.load(file)
@staticmethod
def dump_to_str(obj, **kwargs):
return json.dumps(obj, **kwargs)
@staticmethod
def dump_to_path(obj, filepath, **kwargs):
with open(filepath, 'w') as f:
json.dump(obj, f, **kwargs)
@staticmethod
def dump_to_fileobj(obj, file, **kwargs):
json.dump(obj, file, **kwargs)
class YamlProcessor(BaseFileProcessor):
@staticmethod
def load_from_path(filepath, **kwargs):
kwargs.setdefault('Loader', Loader)
with open(filepath, 'r') as f:
obj = yaml.load(f, **kwargs)
return obj
@staticmethod
def load_from_fileobj(file, **kwargs):
kwargs.setdefault('Loader', Loader)
return yaml.load(file, **kwargs)
@staticmethod
def dump_to_str(obj, **kwargs):
kwargs.setdefault('Dumper', Dumper)
return yaml.dump(obj, **kwargs)
@staticmethod
def dump_to_path(obj, filepath, **kwargs):
kwargs.setdefault('Dumper', Dumper)
with open(filepath, 'w') as f:
yaml.dump(obj, f, **kwargs)
@staticmethod
def dump_to_fileobj(obj, file, **kwargs):
kwargs.setdefault('Dumper', Dumper)
yaml.dump(obj, file, **kwargs)
class PickleProcessor(BaseFileProcessor):
@staticmethod
def load_from_path(filepath, **kwargs):
with open(filepath, 'rb') as f:
obj = pickle.load(f, **kwargs)
return obj
@staticmethod
def load_from_fileobj(file, **kwargs):
return pickle.load(file, **kwargs)
@staticmethod
def dump_to_str(obj, **kwargs):
kwargs.setdefault('protocol', 2)
return pickle.dumps(obj, **kwargs)
@staticmethod
def dump_to_path(obj, filepath, **kwargs):
kwargs.setdefault('protocol', 2)
with open(filepath, 'wb') as f:
pickle.dump(obj, f, **kwargs)
@staticmethod
def dump_to_fileobj(obj, file, **kwargs):
kwargs.setdefault('protocol', 2)
pickle.dump(obj, file, **kwargs)
file_processors = {
'json': JsonProcessor,
'yaml': YamlProcessor,
'yml': YamlProcessor,
'pickle': PickleProcessor,
'pkl': PickleProcessor
}
def load(file, file_format=None, **kwargs):
"""Load data from json/yaml/pickle files.
This method provides a unified api for loading data from serialized files.
Args:
file (str or file-like object): Filename or a file-like object.
file_format (str, optional): If not specified, the file format will be
inferred from the file extension, otherwise use the specified one.
Currently supported formats include "json", "yaml/yml" and
"pickle/pkl".
Returns:
The content from the file.
"""
if file_format is None and isinstance(file, str):
file_format = file.split('.')[-1]
if file_format not in file_processors:
raise TypeError('Unsupported format: {}'.format(file_format))
processor = file_processors[file_format]
if is_str(file):
obj = processor.load_from_path(file, **kwargs)
elif hasattr(file, 'read'):
obj = processor.load_from_fileobj(file, **kwargs)
else:
raise TypeError('"file" must be a filepath str or a file-object')
return obj
def dump(obj, file=None, file_format=None, **kwargs):
"""Dump data to json/yaml/pickle strings or files.
This method provides a unified api for dumping data as strings or to files,
and also supports custom arguments for each file format.
Args:
obj (any): The python object to be dumped.
file (str or file-like object, optional): If not specified, then the
object is dump to a str, otherwise to a file specified by the
filename or file-like object.
file_format (str, optional): Same as :func:`load`.
Returns:
bool: True for success, False otherwise
"""
if file_format is None:
if is_str(file):
file_format = file.split('.')[-1]
elif file is None:
raise ValueError(
'file_format must be specified since file is None')
if file_format not in file_processors:
raise TypeError('Unsupported format: {}'.format(file_format))
processor = file_processors[file_format]
if file is None:
return processor.dump_to_str(obj, **kwargs)
elif is_str(file):
processor.dump_to_path(obj, file, **kwargs)
elif hasattr(file, 'write'):
processor.dump_to_fileobj(obj, file, **kwargs)
else:
raise TypeError('"file" must be a filename str or a file-object')
def list_from_file(filename, prefix='', offset=0, max_num=0):
"""Load a text file and parse the content as a list of strings.
Args:
filename (str): Filename.
prefix (str): The prefix to be inserted to the begining of each item.
offset (int): The offset of lines.
max_num (int): The maximum number of lines to be read,
zeros and negatives mean no limitation.
Returns:
list[str]: A list of strings.
"""
cnt = 0
item_list = []
with open(filename, 'r') as f:
for _ in range(offset):
f.readline()
for line in f:
if max_num > 0 and cnt >= max_num:
break
item_list.append(prefix + line.rstrip('\n'))
cnt += 1
return item_list
def dict_from_file(filename, key_type=str):
"""Load a text file and parse the content as a dict.
Each line of the text file will be two or more columns splited by
whitespaces or tabs. The first column will be parsed as dict keys, and
the following columns will be parsed as dict values.
Args:
filename(str): Filename.
key_type(type): Type of the dict's keys. str is user by default and
type conversion will be performed if specified.
Returns:
dict: The parsed contents.
"""
mapping = {}
with open(filename, 'r') as f:
for line in f:
items = line.rstrip('\n').split()
assert len(items) >= 2
key = key_type(items[0])
val = items[1:] if len(items) > 2 else items[1]
mapping[key] = val
return mapping
import cv2
def use_opencv2():
return cv2.__version__.split('.')[0] == '2'
USE_OPENCV2 = use_opencv2()
from .misc import *
from .path import *
from .progressbar import *
from .timer import *
import collections
import functools
import itertools
import subprocess
from importlib import import_module
import six
def is_str(x):
"""Whether the input is an string instance."""
return isinstance(x, six.string_types)
def iter_cast(inputs, dst_type, return_type=None):
"""Cast elements of an iterable object into some type.
Args:
inputs (Iterable): The input object.
dst_type (type): Destination type.
return_type (type, optional): If specified, the output object will be
converted to this type, otherwise an iterator.
Returns:
iterator or specified type: The converted object.
"""
if not isinstance(inputs, collections.Iterable):
raise TypeError('inputs must be an iterable object')
if not isinstance(dst_type, type):
raise TypeError('"dst_type" must be a valid type')
out_iterable = six.moves.map(dst_type, inputs)
if return_type is None:
return out_iterable
else:
return return_type(out_iterable)
list_cast = functools.partial(iter_cast, return_type=list)
tuple_cast = functools.partial(iter_cast, return_type=tuple)
def is_seq_of(seq, expected_type, seq_type=None):
"""Check whether it is a sequence of some type.
Args:
seq (Sequence): The sequence to be checked.
expected_type (type): Expected type of sequence items.
seq_type (type, optional): Expected sequence type.
Returns:
bool: Whether the sequence is valid.
"""
if seq_type is None:
exp_seq_type = collections.Sequence
else:
assert isinstance(seq_type, type)
exp_seq_type = seq_type
if not isinstance(seq, exp_seq_type):
return False
for item in seq:
if not isinstance(item, expected_type):
return False
return True
is_list_of = functools.partial(is_seq_of, seq_type=list)
is_tuple_of = functools.partial(is_seq_of, seq_type=tuple)
def slice_list(in_list, lens):
"""Slice a list into several sub lists by a list of given length.
Args:
in_list (list): The list to be sliced.
lens(int or list): The expected length of each out list.
Returns:
list: A list of sliced list.
"""
if not isinstance(lens, list):
raise TypeError('"indices" must be a list of integers')
elif sum(lens) != len(in_list):
raise ValueError(
'sum of lens and list length does not match: {} != {}'.format(
sum(lens), len(in_list)))
out_list = []
idx = 0
for i in range(len(lens)):
out_list.append(in_list[idx:idx + lens[i]])
idx += lens[i]
return out_list
def concat_list(in_list):
"""Concatenate a list of list into a single list.
Args:
in_list (list): The list of list to be merged.
Returns:
list: The concatenated flat list.
"""
# if len(in_list) == 1:
# return in_list[0]
# else:
# return list(itertools.chain(*in_list))
return list(itertools.chain(*in_list))
# out_list = []
# for sub_list in in_list:
# out_list.extend(sub_list)
# return out_list
def check_prerequisites(
prerequisites,
checker,
msg_tmpl='Prerequisites "{}" are required in method "{}" but not '
'found, please install them first.'):
"""A decorator factory to check if prerequisites are satisfied.
Args:
prerequisites (str of list[str]): Prerequisites to be checked.
checker (callable): The checker method that returns True if a
prerequisite is meet, False otherwise.
msg_tmpl (str): The message template with two variables.
Returns:
decorator: A specific decorator.
"""
def wrap(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
requirements = [prerequisites] if isinstance(
prerequisites, str) else prerequisites
missing = []
for item in requirements:
if not checker(item):
missing.append(item)
if missing:
print(msg_tmpl.format(', '.join(missing), func.__name__))
raise RuntimeError('Prerequisites not meet.')
else:
return func(*args, **kwargs)
return wrapped_func
return wrap
def _check_py_package(package):
try:
import_module(package)
except ImportError:
return False
else:
return True
def _check_executable(cmd):
if subprocess.call('which {}'.format(cmd), shell=True) != 0:
return False
else:
return True
requires_package = functools.partial(
check_prerequisites, checker=_check_py_package)
requires_package.__doc__ = """A decorator to check if some python packages are installed.
Example:
>>> @requires_package('numpy')
>>> func(arg1, args):
>>> return numpy.zeros(1)
array([0.])
>>> @requires_package(['numpy', 'non_package'])
>>> func(arg1, args):
>>> return numpy.zeros(1)
ImportError
"""
requires_executable = functools.partial(
check_prerequisites, checker=_check_executable)
requires_executable.__doc__ = """A decorator to check if some executable files are installed.
Example:
>>> @requires_executable('ffmpeg')
>>> func(arg1, args):
>>> print(1)
1
"""
import os
import os.path as osp
import sys
from pathlib import Path
import six
from .misc import is_str
if sys.version_info <= (3, 3):
FileNotFoundError = IOError
else:
FileNotFoundError = FileNotFoundError
def is_filepath(x):
if is_str(x) or isinstance(x, Path):
return True
else:
return False
def fopen(filepath, *args, **kwargs):
if is_str(filepath):
return open(filepath, *args, **kwargs)
elif isinstance(filepath, Path):
return filepath.open(*args, **kwargs)
def check_file_exist(filename, msg_tmpl='file "{}" does not exist'):
if not osp.isfile(filename):
raise FileNotFoundError(msg_tmpl.format(filename))
def mkdir_or_exist(dir_name, mode=0o777):
if six.PY3:
os.makedirs(dir_name, mode=mode, exist_ok=True)
else:
if not osp.isdir(dir_name):
os.makedirs(dir_name, mode=0o777)
def _scandir_py35(dir_path, suffix=None):
for entry in os.scandir(dir_path):
if not entry.is_file():
continue
filename = entry.name
if suffix is None:
yield filename
elif filename.endswith(suffix):
yield filename
def _scandir_py(dir_path, suffix=None):
for filename in os.listdir(dir_path):
if not osp.isfile(osp.join(dir_path, filename)):
continue
if suffix is None:
yield filename
elif filename.endswith(suffix):
yield filename
def scandir(dir_path, suffix=None):
if suffix is not None and not isinstance(suffix, (str, tuple)):
raise TypeError('"suffix" must be a string or tuple of strings')
if sys.version_info >= (3, 5):
return _scandir_py35(dir_path, suffix)
else:
return _scandir_py(dir_path, suffix)
import collections
import sys
from multiprocessing import Pool
from .timer import Timer
class ProgressBar(object):
"""A progress bar which can print the progress"""
def __init__(self, task_num=0, bar_width=50, start=True):
self.task_num = task_num
max_bar_width = self._get_max_bar_width()
self.bar_width = (bar_width
if bar_width <= max_bar_width else max_bar_width)
self.completed = 0
if start:
self.start()
def _get_max_bar_width(self):
if sys.version_info > (3, 3):
from shutil import get_terminal_size
else:
from backports.shutil_get_terminal_size import get_terminal_size
terminal_width, _ = get_terminal_size()
max_bar_width = min(int(terminal_width * 0.6), terminal_width - 50)
if max_bar_width < 10:
print('terminal width is too small ({}), please consider '
'widen the terminal for better progressbar '
'visualization'.format(terminal_width))
max_bar_width = 10
return max_bar_width
def start(self):
if self.task_num > 0:
sys.stdout.write('[{}] 0/{}, elapsed: 0s, ETA:'.format(
' ' * self.bar_width, self.task_num))
else:
sys.stdout.write('completed: 0, elapsed: 0s')
sys.stdout.flush()
self.timer = Timer()
def update(self):
self.completed += 1
elapsed = self.timer.since_start()
fps = self.completed / elapsed
if self.task_num > 0:
percentage = self.completed / float(self.task_num)
eta = int(elapsed * (1 - percentage) / percentage + 0.5)
mark_width = int(self.bar_width * percentage)
bar_chars = '>' * mark_width + ' ' * (self.bar_width - mark_width)
sys.stdout.write(
'\r[{}] {}/{}, {:.1f} task/s, elapsed: {}s, ETA: {:5}s'.format(
bar_chars, self.completed, self.task_num, fps,
int(elapsed + 0.5), eta))
else:
sys.stdout.write(
'completed: {}, elapsed: {}s, {:.1f} tasks/s'.format(
self.completed, int(elapsed + 0.5), fps))
sys.stdout.flush()
def track_progress(func, tasks, bar_width=50, **kwargs):
"""Track the progress of tasks execution with a progress bar.
Tasks are done with a simple for-loop.
Args:
func (callable): The function to be applied to each task.
tasks (list or tuple[Iterable, int]): A list of tasks or
(tasks, total num).
bar_width (int): Width of progress bar.
Returns:
list: The task results.
"""
if isinstance(tasks, tuple):
assert len(tasks) == 2
assert isinstance(tasks[0], collections.Iterable)
assert isinstance(tasks[1], int)
task_num = tasks[1]
tasks = tasks[0]
elif isinstance(tasks, collections.Iterable):
task_num = len(tasks)
else:
raise TypeError(
'"tasks" must be an iterable object or a (iterator, int) tuple')
prog_bar = ProgressBar(task_num, bar_width)
results = []
for task in tasks:
results.append(func(task, **kwargs))
prog_bar.update()
sys.stdout.write('\n')
return results
def init_pool(process_num, initializer=None, initargs=None):
if initializer is None:
return Pool(process_num)
elif initargs is None:
return Pool(process_num, initializer)
else:
if not isinstance(initargs, tuple):
raise TypeError('"initargs" must be a tuple')
return Pool(process_num, initializer, initargs)
def track_parallel_progress(func,
tasks,
nproc,
initializer=None,
initargs=None,
bar_width=50,
chunksize=1,
skip_first=False,
keep_order=True):
"""Track the progress of parallel task execution with a progress bar.
The built-in :mod:`multiprocessing` module is used for process pools and
tasks are done with :func:`Pool.map` or :func:`Pool.imap_unordered`.
Args:
func (callable): The function to be applied to each task.
tasks (list or tuple[Iterable, int]): A list of tasks or
(tasks, total num).
nproc (int): Process (worker) number.
initializer (None or callable): Refer to :class:`multiprocessing.Pool`
for details.
initargs (None or tuple): Refer to :class:`multiprocessing.Pool` for
details.
chunksize (int): Refer to :class:`multiprocessing.Pool` for details.
bar_width (int): Width of progress bar.
skip_first (bool): Whether to skip the first sample for each worker
when estimating fps, since the initialization step may takes
longer.
keep_order (bool): If True, :func:`Pool.imap` is used, otherwise
:func:`Pool.imap_unordered` is used.
Returns:
list: The task results.
"""
if isinstance(tasks, tuple):
assert len(tasks) == 2
assert isinstance(tasks[0], collections.Iterable)
assert isinstance(tasks[1], int)
task_num = tasks[1]
tasks = tasks[0]
elif isinstance(tasks, collections.Iterable):
task_num = len(tasks)
else:
raise TypeError(
'"tasks" must be an iterable object or a (iterator, int) tuple')
pool = init_pool(nproc, initializer, initargs)
start = not skip_first
task_num -= nproc * chunksize * int(skip_first)
prog_bar = ProgressBar(task_num, bar_width, start)
results = []
if keep_order:
gen = pool.imap(func, tasks, chunksize)
else:
gen = pool.imap_unordered(func, tasks, chunksize)
for result in gen:
results.append(result)
if skip_first:
if len(results) < nproc * chunksize:
continue
elif len(results) == nproc * chunksize:
prog_bar.start()
continue
prog_bar.update()
sys.stdout.write('\n')
pool.close()
pool.join()
return results
from time import time
class TimerError(Exception):
def __init__(self, message):
self.message = message
super(TimerError, self).__init__(message)
class Timer(object):
"""A flexible Timer class.
:Example:
>>> import time
>>> import cvbase as cvb
>>> with cvb.Timer():
>>> # simulate a code block that will run for 1s
>>> time.sleep(1)
1.000
>>> with cvb.Timer(print_tmpl='it takes {:.1f} seconds'):
>>> # simulate a code block that will run for 1s
>>> time.sleep(1)
it takes 1.0 seconds
>>> timer = cvb.Timer()
>>> time.sleep(0.5)
>>> print(timer.since_start())
0.500
>>> time.sleep(0.5)
>>> print(timer.since_last_check())
0.500
>>> print(timer.since_start())
1.000
"""
def __init__(self, start=True, print_tmpl=None):
self._is_running = False
self.print_tmpl = print_tmpl if print_tmpl else '{:.3f}'
if start:
self.start()
@property
def is_running(self):
"""bool: indicate whether the timer is running"""
return self._is_running
def __enter__(self):
self.start()
return self
def __exit__(self, type, value, traceback):
print(self.print_tmpl.format(self.since_last_check()))
self._is_running = False
def start(self):
"""Start the timer."""
if not self._is_running:
self._t_start = time()
self._is_running = True
self._t_last = time()
def since_start(self):
"""Total time since the timer is started.
Returns (float): Time in seconds.
"""
if not self._is_running:
raise TimerError('timer is not running')
self._t_last = time()
return self._t_last - self._t_start
def since_last_check(self):
"""Time since the last checking.
Either :func:`since_start` or :func:`since_last_check` is a checking
operation.
Returns (float): Time in seconds.
"""
if not self._is_running:
raise TimerError('timer is not running')
dur = time() - self._t_last
self._t_last = time()
return dur
_g_timers = {} # global timers
def check_time(timer_id):
"""Add check points in a single line.
This method is suitable for running a task on a list of items. A timer will
be registered when the method is called for the first time.
:Example:
>>> import time
>>> import cvbase as cvb
>>> for i in range(1, 6):
>>> # simulate a code block
>>> time.sleep(i)
>>> cvb.check_time('task1')
2.000
3.000
4.000
5.000
Args:
timer_id (str): Timer identifier.
"""
if timer_id not in _g_timers:
_g_timers[timer_id] = Timer()
return 0
else:
return _g_timers[timer_id].since_last_check()
__version__ = '0.9.0'
from mmcv.opencv_info import USE_OPENCV2
if not USE_OPENCV2:
from cv2 import (CAP_PROP_FRAME_WIDTH, CAP_PROP_FRAME_HEIGHT, CAP_PROP_FPS,
CAP_PROP_FRAME_COUNT, CAP_PROP_FOURCC,
CAP_PROP_POS_FRAMES, VideoWriter_fourcc)
else:
from cv2.cv import CV_CAP_PROP_FRAME_WIDTH as CAP_PROP_FRAME_WIDTH
from cv2.cv import CV_CAP_PROP_FRAME_HEIGHT as CAP_PROP_FRAME_HEIGHT
from cv2.cv import CV_CAP_PROP_FPS as CAP_PROP_FPS
from cv2.cv import CV_CAP_PROP_FRAME_COUNT as CAP_PROP_FRAME_COUNT
from cv2.cv import CV_CAP_PROP_FOURCC as CAP_PROP_FOURCC
from cv2.cv import CV_CAP_PROP_POS_FRAMES as CAP_PROP_POS_FRAMES
from cv2.cv import CV_FOURCC as VideoWriter_fourcc
from .io import *
from .processing import *
from collections import OrderedDict
import os.path as osp
import cv2
from mmcv.utils import (scandir, check_file_exist, mkdir_or_exist,
track_progress)
from . import (CAP_PROP_FRAME_WIDTH, CAP_PROP_FRAME_HEIGHT, CAP_PROP_FPS,
CAP_PROP_FRAME_COUNT, CAP_PROP_FOURCC, CAP_PROP_POS_FRAMES,
VideoWriter_fourcc)
class Cache(object):
def __init__(self, capacity):
self._cache = OrderedDict()
self._capacity = int(capacity)
if capacity <= 0:
raise ValueError('capacity must be a positive integer')
@property
def capacity(self):
return self._capacity
@property
def size(self):
return len(self._cache)
def put(self, key, val):
if key in self._cache:
return
if len(self._cache) >= self.capacity:
self._cache.popitem(last=False)
self._cache[key] = val
def get(self, key, default=None):
val = self._cache[key] if key in self._cache else default
return val
class VideoReader(object):
"""Video class with similar usage to a list object.
This video warpper class provides convenient apis to access frames.
There exists an issue of OpenCV's VideoCapture class that jumping to a
certain frame may be inaccurate. It is fixed in this class by checking
the position after jumping each time.
Cache is used when decoding videos. So if the same frame is visited for
the second time, there is no need to decode again if it is stored in the
cache.
:Example:
>>> import cvbase as cvb
>>> v = cvb.VideoReader('sample.mp4')
>>> len(v) # get the total frame number with `len()`
120
>>> for img in v: # v is iterable
>>> cvb.show_img(img)
>>> v[5] # get the 6th frame
"""
def __init__(self, filename, cache_capacity=10):
check_file_exist(filename, 'Video file not found: ' + filename)
self._vcap = cv2.VideoCapture(filename)
assert cache_capacity > 0
self._cache = Cache(cache_capacity)
self._position = 0
# get basic info
self._width = int(self._vcap.get(CAP_PROP_FRAME_WIDTH))
self._height = int(self._vcap.get(CAP_PROP_FRAME_HEIGHT))
self._fps = int(round(self._vcap.get(CAP_PROP_FPS)))
self._frame_cnt = int(self._vcap.get(CAP_PROP_FRAME_COUNT))
self._fourcc = self._vcap.get(CAP_PROP_FOURCC)
@property
def vcap(self):
""":obj:`cv2.VideoCapture`: The raw VideoCapture object."""
return self._vcap
@property
def opened(self):
"""bool: Indicate whether the video is opened."""
return self._vcap.isOpened()
@property
def width(self):
"""int: Width of video frames."""
return self._width
@property
def height(self):
"""int: Height of video frames."""
return self._height
@property
def resolution(self):
"""tuple: Video resolution (width, height)."""
return (self._width, self._height)
@property
def fps(self):
"""int: FPS of the video."""
return self._fps
@property
def frame_cnt(self):
"""int: Total frames of the video."""
return self._frame_cnt
@property
def fourcc(self):
"""str: "Four character code" of the video."""
return self._fourcc
@property
def position(self):
"""int: Current cursor position, indicating frame decoded."""
return self._position
def _get_real_position(self):
return int(round(self._vcap.get(CAP_PROP_POS_FRAMES)))
def _set_real_position(self, frame_id):
self._vcap.set(CAP_PROP_POS_FRAMES, frame_id)
pos = self._get_real_position()
for _ in range(frame_id - pos):
self._vcap.read()
self._position = frame_id
def read(self):
"""Read the next frame.
If the next frame have been decoded before and in the cache, then
return it directly, otherwise decode and return it, put it in the cache.
Returns:
ndarray or None: Return the frame if successful, otherwise None.
"""
pos = self._position + 1
if self._cache:
img = self._cache.get(pos)
if img is not None:
ret = True
else:
if self._position != self._get_real_position():
self._set_real_position(self._position)
ret, img = self._vcap.read()
if ret:
self._cache.put(pos, img)
else:
ret, img = self._vcap.read()
if ret:
self._position = pos
return img
def get_frame(self, frame_id):
"""Get frame by index.
Args:
frame_id (int): Index of the expected frame, 1-based.
Returns:
ndarray or None: Return the frame if successful, otherwise None.
"""
if frame_id <= 0 or frame_id > self._frame_cnt:
raise ValueError('"frame_id" must be between 1 and {}'.format(
self._frame_cnt))
if frame_id == self._position + 1:
return self.read()
if self._cache:
img = self._cache.get(frame_id)
if img is not None:
self._position = frame_id
return img
self._set_real_position(frame_id - 1)
ret, img = self._vcap.read()
if ret:
self._position += 1
if self._cache:
self._cache.put(self._position, img)
return img
def current_frame(self):
"""Get the current frame (frame that is just visited).
Returns:
ndarray or None: If the video is fresh, return None, otherwise
return the frame.
"""
if self._position == 0:
return None
return self._cache.get(self._position)
def cvt2frames(self,
frame_dir,
file_start=0,
filename_tmpl='{:06d}.jpg',
start=0,
max_num=0,
show_progress=True):
"""Convert a video to frame images
Args:
frame_dir (str): Output directory to store all the frame images.
file_start (int): Filenames will start from the specified number.
filename_tmpl (str): Filename template with the index as the variable.
start (int): The starting frame index.
max_num (int): Maximum number of frames to be written.
show_progress (bool): Whether to show a progress bar.
"""
mkdir_or_exist(frame_dir)
if max_num == 0:
task_num = self.frame_cnt - start
else:
task_num = min(self.frame_cnt - start, max_num)
if task_num <= 0:
raise ValueError('start must be less than total frame number')
if start > 0:
self._set_real_position(start)
def write_frame(file_idx):
img = self.read()
filename = osp.join(frame_dir, filename_tmpl.format(file_idx))
cv2.imwrite(filename, img)
if show_progress:
track_progress(write_frame, range(file_start,
file_start + task_num))
else:
for i in range(task_num):
img = self.read()
if img is None:
break
filename = osp.join(frame_dir,
filename_tmpl.format(i + file_start))
cv2.imwrite(filename, img)
def __len__(self):
return self.frame_cnt
def __getitem__(self, index):
if isinstance(index, slice):
raise RuntimeError('slice has not been supported yet')
return self.get_frame(index)
def __iter__(self):
self._set_real_position(0)
return self
def __next__(self):
img = self.read()
if img is not None:
return img
else:
raise StopIteration
next = __next__
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self._vcap.release()
def frames2video(frame_dir,
video_file,
fps=30,
fourcc='XVID',
filename_tmpl='{:06d}.jpg',
start=0,
end=0,
show_progress=True):
"""Read the frame images from a directory and join them as a video
Args:
frame_dir (str): The directory containing video frames.
video_file (str): Output filename.
fps (int): FPS of the output video.
fourcc (str): Fourcc of the output video, this should be compatible
with the output file type.
filename_tmpl (str): Filename template with the index as the variable.
start (int): Starting frame index.
end (int): Ending frame index.
show_progress (bool): Whether to show a progress bar.
"""
if end == 0:
ext = filename_tmpl.split('.')[-1]
end = len([name for name in scandir(frame_dir, ext)])
first_file = osp.join(frame_dir, filename_tmpl.format(start))
check_file_exist(first_file, 'The start frame not found: ' + first_file)
img = cv2.imread(first_file)
height, width = img.shape[:2]
resolution = (width, height)
vwriter = cv2.VideoWriter(video_file, VideoWriter_fourcc(*fourcc), fps,
resolution)
def write_frame(file_idx):
filename = osp.join(frame_dir, filename_tmpl.format(file_idx))
img = cv2.imread(filename)
vwriter.write(img)
if show_progress:
track_progress(write_frame, range(start, end))
else:
for i in range(start, end):
filename = osp.join(frame_dir, filename_tmpl.format(i))
img = cv2.imread(filename)
vwriter.write(img)
vwriter.release()
import os
import os.path as osp
import subprocess
import tempfile
from mmcv.utils import requires_executable
@requires_executable('ffmpeg')
def convert_video(in_file, out_file, print_cmd=False, pre_options='',
**kwargs):
"""Convert a video with ffmpeg.
This provides a general api to ffmpeg, the executed command is::
ffmpeg -y <pre_options> -i <in_file> <options> <out_file>
Options(kwargs) are mapped to ffmpeg commands with the following rules:
- key=val: "-key val"
- key=True: "-key"
- key=False: ""
Args:
in_file (str): Input video filename.
out_file (str): Output video filename.
pre_options (str): Options appears before "-i <in_file>".
print_cmd (bool): Whether to print the final ffmpeg command.
"""
options = []
for k, v in kwargs.items():
if isinstance(v, bool):
if v:
options.append('-{}'.format(k))
elif k == 'log_level':
assert v in [
'quiet', 'panic', 'fatal', 'error', 'warning', 'info',
'verbose', 'debug', 'trace'
]
options.append('-loglevel {}'.format(v))
else:
options.append('-{} {}'.format(k, v))
cmd = 'ffmpeg -y {} -i {} {} {}'.format(pre_options, in_file,
' '.join(options), out_file)
if print_cmd:
print(cmd)
subprocess.call(cmd, shell=True)
@requires_executable('ffmpeg')
def resize_video(in_file,
out_file,
size=None,
ratio=None,
keep_ar=False,
log_level='info',
print_cmd=False,
**kwargs):
"""Resize a video.
Args:
in_file (str): Input video filename.
out_file (str): Output video filename.
size (tuple): Expected size (w, h), eg, (320, 240) or (320, -1).
ratio (tuple or float): Expected resize ratio, (2, 0.5) means
(w*2, h*0.5).
keep_ar (bool): Whether to keep original aspect ratio.
log_level (str): Logging level of ffmpeg.
print_cmd (bool): Whether to print the final ffmpeg command.
"""
if size is None and ratio is None:
raise ValueError('expected size or ratio must be specified')
elif size is not None and ratio is not None:
raise ValueError('size and ratio cannot be specified at the same time')
options = {'log_level': log_level}
if size:
if not keep_ar:
options['vf'] = 'scale={}:{}'.format(size[0], size[1])
else:
options['vf'] = ('scale=w={}:h={}:force_original_aspect_ratio'
'=decrease'.format(size[0], size[1]))
else:
if not isinstance(ratio, tuple):
ratio = (ratio, ratio)
options['vf'] = 'scale="trunc(iw*{}):trunc(ih*{})"'.format(
ratio[0], ratio[1])
convert_video(in_file, out_file, print_cmd, **options)
@requires_executable('ffmpeg')
def cut_video(in_file,
out_file,
start=None,
end=None,
vcodec=None,
acodec=None,
log_level='info',
print_cmd=False,
**kwargs):
"""Cut a clip from a video.
Args:
in_file (str): Input video filename.
out_file (str): Output video filename.
start (None or float): Start time (in seconds).
end (None or float): End time (in seconds).
vcodec (None or str): Output video codec, None for unchanged.
acodec (None or str): Output audio codec, None for unchanged.
log_level (str): Logging level of ffmpeg.
print_cmd (bool): Whether to print the final ffmpeg command.
"""
options = {'log_level': log_level}
if vcodec is None:
options['vcodec'] = 'copy'
if acodec is None:
options['acodec'] = 'copy'
if start:
options['ss'] = start
else:
start = 0
if end:
options['t'] = end - start
convert_video(in_file, out_file, print_cmd, **options)
@requires_executable('ffmpeg')
def concat_video(video_list,
out_file,
vcodec=None,
acodec=None,
log_level='info',
print_cmd=False,
**kwargs):
"""Concatenate multiple videos into a single one.
Args:
video_list (list): A list of video filenames
out_file (str): Output video filename
vcodec (None or str): Output video codec, None for unchanged
acodec (None or str): Output audio codec, None for unchanged
log_level (str): Logging level of ffmpeg.
print_cmd (bool): Whether to print the final ffmpeg command.
"""
_, tmp_filename = tempfile.mkstemp(suffix='.txt', text=True)
with open(tmp_filename, 'w') as f:
for filename in video_list:
f.write('file {}\n'.format(osp.abspath(filename)))
options = {'log_level': log_level}
if vcodec is None:
options['vcodec'] = 'copy'
if acodec is None:
options['acodec'] = 'copy'
convert_video(
tmp_filename,
out_file,
print_cmd,
pre_options='-f concat -safe 0',
**options)
os.remove(tmp_filename)
from .color import *
\ No newline at end of file
from enum import Enum
import numpy as np
from mmcv.utils import is_str
class Color(Enum):
"""An enum that defines common colors.
Contains red, green, blue, cyan, yellow, magenta, white and black.
"""
red = (0, 0, 255)
green = (0, 255, 0)
blue = (255, 0, 0)
cyan = (255, 255, 0)
yellow = (0, 255, 255)
magenta = (255, 0, 255)
white = (255, 255, 255)
black = (0, 0, 0)
def color_val(color):
"""Convert various input to color tuples.
Args:
color (:obj:`Color`/str/tuple/int/ndarray): Color inputs
Returns:
tuple: A tuple of 3 integers indicating BGR channels.
"""
if is_str(color):
return Color[color].value
elif isinstance(color, Color):
return color.value
elif isinstance(color, tuple):
assert len(color) == 3
for channel in color:
assert channel >= 0 and channel <= 255
return color
elif isinstance(color, int):
assert color >= 0 and color <= 255
return color, color, color
elif isinstance(color, np.ndarray):
assert color.ndim == 1 and color.size == 3
assert np.all((color >= 0) & (color <= 255))
color = color.astype(np.uint8)
return tuple(color)
else:
raise TypeError('Invalid type for color: {}'.format(type(color)))
import cv2
import numpy as np
from mmcv.image import read_img, write_img
from .color import color_val
def show_img(img, win_name='', wait_time=0):
"""Show an image.
Args:
img (str or ndarray): The image to be displayed.
win_name (str): The window name.
wait_time (int): Value of waitKey param.
"""
cv2.imshow(win_name, read_img(img))
cv2.waitKey(wait_time)
def draw_bboxes(img,
bboxes,
colors='green',
top_k=-1,
thickness=1,
show=True,
win_name='',
wait_time=0,
out_file=None):
"""Draw bboxes on an image.
Args:
img (str or ndarray): The image to be displayed.
bboxes (list or ndarray): A list of ndarray of shape (k, 4).
colors (list[str or tuple or Color]): A list of colors.
top_k (int): Draw the first k bboxes only if set positive.
thickness (int): Thickness of lines.
show (bool): Whether to show the image.
win_name (str): The window name.
wait_time (int): Value of waitKey param.
out_file (str, optional): The filename to write the image.
"""
img = read_img(img)
if isinstance(bboxes, np.ndarray):
bboxes = [bboxes]
if not isinstance(colors, list):
colors = [colors for _ in range(len(bboxes))]
colors = [color_val(c) for c in colors]
assert len(bboxes) == len(colors)
for i, _bboxes in enumerate(bboxes):
_bboxes = _bboxes.astype(np.int32)
if top_k <= 0:
_top_k = _bboxes.shape[0]
else:
_top_k = min(top_k, _bboxes.shape[0])
for j in range(_top_k):
left_top = (_bboxes[j, 0], _bboxes[j, 1])
right_bottom = (_bboxes[j, 2], _bboxes[j, 3])
cv2.rectangle(
img, left_top, right_bottom, colors[i], thickness=thickness)
if show:
show_img(img, win_name, wait_time)
if out_file is not None:
write_img(img, out_file)
[bdist_wheel]
universal=1
[aliases]
test=pytest
\ No newline at end of file
import sys
from setuptools import find_packages, setup
install_requires = ['numpy>=1.11.1', 'pyyaml', 'six']
if sys.version_info < (3, 3):
install_requires.append('backports.shutil_get_terminal_size')
if sys.version_info < (3, 4):
install_requires.extend(['enum34', 'pathlib'])
def readme():
with open('README.md') as f:
content = f.read()
return content
def get_version():
version_file = 'mmcv/version.py'
with open(version_file, 'r') as f:
exec(compile(f.read(), version_file, 'exec'))
return locals()['__version__']
setup(
name='mmcv',
version=get_version(),
description='Open MMLab Computer Vision Foundation',
long_description=readme(),
keywords='computer vision',
packages=find_packages(),
classifiers=[
'Development Status :: 4 - Beta',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Operating System :: OS Independent',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Topic :: Utilities',
],
url='https://github.com/open-mmlab/mmcv',
author='Kai Chen',
author_email='chenkaidev@gmail.com',
license='GPLv3',
setup_requires=['pytest-runner'],
tests_require=['pytest'],
install_requires=install_requires,
zip_safe=False)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment