Commit 5f2e58c5 authored by Kai Chen's avatar Kai Chen
Browse files

add optical flow module

parent 43ebfd9a
...@@ -15,3 +15,4 @@ else: ...@@ -15,3 +15,4 @@ else:
from .io import * from .io import *
from .processing import * from .processing import *
from .optflow import *
import numpy as np
from mmcv.image import read_img, write_img
from mmcv.utils import is_str
def _pair_name(filename, suffix=('_dx', '_dy')):
parts = filename.split('.')
path_wo_ext = parts[-2]
parts[-2] = path_wo_ext + suffix[0]
dx_filename = '.'.join(parts)
parts[-2] = path_wo_ext + suffix[1]
dy_filename = '.'.join(parts)
return dx_filename, dy_filename
def read_flow(flow_or_path, quantize=False, *args, **kwargs):
"""Read an optical flow map.
Args:
flow_or_path (ndarray or str): A flow map or filepath.
quantize (bool): whether to read quantized pair, if set to True,
remaining args will be passed to :func:`dequantize_flow`.
Returns:
ndarray: Optical flow represented as a (h, w, 2) numpy array
"""
if isinstance(flow_or_path, np.ndarray):
if (flow_or_path.ndim != 3) or (flow_or_path.shape[-1] != 2):
raise ValueError('Invalid flow with shape {}'.format(
flow_or_path.shape))
return flow_or_path
elif not is_str(flow_or_path):
raise TypeError(
'"flow_or_path" must be a filename or numpy array, not {}'.format(
type(flow_or_path)))
if not quantize:
with open(flow_or_path, 'rb') as f:
try:
header = f.read(4).decode('utf-8')
except Exception:
raise IOError('Invalid flow file: {}'.format(flow_or_path))
else:
if header != 'PIEH':
raise IOError(
'Invalid flow file: {}, header does not contain PIEH'.
format(flow_or_path))
w = np.fromfile(f, np.int32, 1).squeeze()
h = np.fromfile(f, np.int32, 1).squeeze()
flow = np.fromfile(f, np.float32, w * h * 2).reshape((h, w, 2))
else:
dx_filename, dy_filename = _pair_name(flow_or_path)
dx = read_img(dx_filename, flag='unchanged')
dy = read_img(dy_filename, flag='unchanged')
flow = dequantize_flow(dx, dy, *args, **kwargs)
return flow.astype(np.float32)
def write_flow(flow, filename, quantize=False, *args, **kwargs):
"""Write optical flow to file.
Args:
flow (ndarray): (h, w, 2) array of optical flow.
filename (str): Output filepath.
quantize (bool): Whether to quantize the flow and save it to 2 jpeg
images. If set to True, remaining args will be passed to
:func:`quantize_flow`.
"""
if not quantize:
with open(filename, 'wb') as f:
f.write('PIEH'.encode('utf-8'))
np.array([flow.shape[1], flow.shape[0]], dtype=np.int32).tofile(f)
flow = flow.astype(np.float32)
flow.tofile(f)
f.flush()
else:
dx, dy = quantize_flow(flow, *args, **kwargs)
dx_filename, dy_filename = _pair_name(filename)
write_img(dx, dx_filename)
write_img(dy, dy_filename)
def quantize_flow(flow, max_val=0.02, norm=True):
"""Quantize flow to [0, 255].
After this step, the size of flow will be much smaller, and can be
dumped as jpeg images.
Args:
flow (ndarray): (h, w, 2) array of optical flow.
max_val (float): Maximum value of flow, values beyond
[-max_val, max_val] will be truncated.
norm (bool): Whether to divide flow values by image width/height.
Returns:
tuple: Quantized dx and dy.
"""
h, w, _ = flow.shape
dx = flow[..., 0]
dy = flow[..., 1]
if norm:
dx = dx / w # avoid inplace operations
dy = dy / h
dx = np.maximum(0, np.minimum(dx + max_val, 2 * max_val))
dy = np.maximum(0, np.minimum(dy + max_val, 2 * max_val))
dx = np.round(dx * 255 / (max_val * 2)).astype(np.uint8)
dy = np.round(dy * 255 / (max_val * 2)).astype(np.uint8)
return dx, dy
def dequantize_flow(dx, dy, max_val=0.02, denorm=True):
"""Recover from quantized flow.
Args:
dx (ndarray): Quantized dx.
dy (ndarray): Quantized dy.
max_val (float): Maximum value used when quantizing.
denorm (bool): Whether to multiply flow values with width/height.
Returns:
tuple: Dequantized dx and dy
"""
assert dx.shape == dy.shape
assert dx.ndim == 2 or (dx.ndim == 3 and dx.shape[-1] == 1)
dx = dx.astype(np.float32) * max_val * 2 / 255 - max_val
dy = dy.astype(np.float32) * max_val * 2 / 255 - max_val
if denorm:
dx *= dx.shape[1]
dy *= dx.shape[0]
flow = np.dstack((dx, dy))
return flow
from .color import * from .color import *
\ No newline at end of file from .image import *
from .optflow import *
from __future__ import division
import numpy as np
from .image import show_img
from mmcv.image import rgb2bgr
from mmcv.video import read_flow
def show_flow(flow, win_name='', wait_time=0):
"""Show optical flow.
Args:
flow (ndarray or str): The optical flow to be displayed.
win_name (str): The window name.
wait_time (int): Value of waitKey param.
"""
flow = read_flow(flow)
flow_img = flow2rgb(flow)
show_img(rgb2bgr(flow_img), win_name, wait_time)
def flow2rgb(flow, color_wheel=None, unknown_thr=1e6):
"""Convert flow map to RGB image.
Args:
flow (ndarray): Array of optical flow.
color_wheel (ndarray or None): Color wheel used to map flow field to
RGB colorspace. Default color wheel will be used if not specified.
unknown_thr (str): Values above this threshold will be marked as
unknown and thus ignored.
Returns:
ndarray: RGB image that can be visualized.
"""
assert flow.ndim == 3 and flow.shape[-1] == 2
if color_wheel is None:
color_wheel = make_color_wheel()
assert color_wheel.ndim == 2 and color_wheel.shape[1] == 3
num_bins = color_wheel.shape[0]
dx = flow[:, :, 0].copy()
dy = flow[:, :, 1].copy()
ignore_inds = (np.isnan(dx) | np.isnan(dy) | (np.abs(dx) > unknown_thr) |
(np.abs(dy) > unknown_thr))
dx[ignore_inds] = 0
dy[ignore_inds] = 0
rad = np.sqrt(dx**2 + dy**2)
if np.any(rad > np.finfo(float).eps):
max_rad = np.max(rad)
dx /= max_rad
dy /= max_rad
[h, w] = dx.shape
rad = np.sqrt(dx**2 + dy**2)
angle = np.arctan2(-dy, -dx) / np.pi
bin_real = (angle + 1) / 2 * (num_bins - 1)
bin_left = np.floor(bin_real).astype(int)
bin_right = (bin_left + 1) % num_bins
w = (bin_real - bin_left.astype(np.float32))[..., None]
flow_img = (
1 - w) * color_wheel[bin_left, :] + w * color_wheel[bin_right, :]
small_ind = rad <= 1
flow_img[small_ind] = 1 - rad[small_ind, None] * (1 - flow_img[small_ind])
flow_img[np.logical_not(small_ind)] *= 0.75
flow_img[ignore_inds, :] = 0
return flow_img
def make_color_wheel(bins=None):
"""Build a color wheel.
Args:
bins(list or tuple, optional): Specify the number of bins for each
color range, corresponding to six ranges: red -> yellow,
yellow -> green, green -> cyan, cyan -> blue, blue -> magenta,
magenta -> red. [15, 6, 4, 11, 13, 6] is used for default
(see Middlebury).
Returns:
ndarray: Color wheel of shape (total_bins, 3).
"""
if bins is None:
bins = [15, 6, 4, 11, 13, 6]
assert len(bins) == 6
RY, YG, GC, CB, BM, MR = tuple(bins)
ry = [1, np.arange(RY) / RY, 0]
yg = [1 - np.arange(YG) / YG, 1, 0]
gc = [0, 1, np.arange(GC) / GC]
cb = [0, 1 - np.arange(CB) / CB, 1]
bm = [np.arange(BM) / BM, 0, 1]
mr = [1, 0, 1 - np.arange(MR) / MR]
num_bins = RY + YG + GC + CB + BM + MR
color_wheel = np.zeros((3, num_bins), dtype=np.float32)
col = 0
for i, color in enumerate([ry, yg, gc, cb, bm, mr]):
for j in range(3):
color_wheel[j, col:col + bins[i]] = color[j]
col += bins[i]
return color_wheel.T
import os
import os.path as osp
import tempfile
import mmcv
import numpy as np
import pytest
from numpy.testing import assert_array_equal, assert_array_almost_equal
def test_read_flow():
flow = mmcv.read_flow(osp.join(osp.dirname(__file__), 'data/optflow.flo'))
assert flow.ndim == 3 and flow.shape[-1] == 2
flow_same = mmcv.read_flow(flow)
assert_array_equal(flow, flow_same)
flow = mmcv.read_flow(
osp.join(osp.dirname(__file__), 'data/optflow.jpg'),
quantize=True,
denorm=True)
assert flow.ndim == 3 and flow.shape[-1] == 2
with pytest.raises(IOError):
mmcv.read_flow(osp.join(osp.dirname(__file__), 'data/color.jpg'))
with pytest.raises(ValueError):
mmcv.read_flow(np.zeros((100, 100, 1)))
with pytest.raises(TypeError):
mmcv.read_flow(1)
def test_write_flow():
flow = np.random.rand(100, 100, 2).astype(np.float32)
# write to a .flo file
_, filename = tempfile.mkstemp()
mmcv.write_flow(flow, filename)
flow_from_file = mmcv.read_flow(filename)
assert_array_equal(flow, flow_from_file)
os.remove(filename)
# write to two .jpg files
tmp_dir = tempfile.gettempdir()
mmcv.write_flow(flow, osp.join(tmp_dir, 'test_flow.jpg'), quantize=True)
assert osp.isfile(osp.join(tmp_dir, 'test_flow_dx.jpg'))
assert osp.isfile(osp.join(tmp_dir, 'test_flow_dy.jpg'))
os.remove(osp.join(tmp_dir, 'test_flow_dx.jpg'))
os.remove(osp.join(tmp_dir, 'test_flow_dy.jpg'))
def test_quantize_flow():
flow = (np.random.rand(10, 8, 2).astype(np.float32) - 0.5) * 15
max_val = 5.0
dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=False)
ref = np.zeros_like(flow, dtype=np.uint8)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
for k in range(ref.shape[2]):
val = flow[i, j, k] + max_val
val = min(max(val, 0), 2 * max_val)
ref[i, j, k] = np.round(255 * val / (2 * max_val))
assert_array_equal(dx, ref[..., 0])
assert_array_equal(dy, ref[..., 1])
max_val = 0.5
dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=True)
ref = np.zeros_like(flow, dtype=np.uint8)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
for k in range(ref.shape[2]):
scale = flow.shape[1] if k == 0 else flow.shape[0]
val = flow[i, j, k] / scale + max_val
val = min(max(val, 0), 2 * max_val)
ref[i, j, k] = np.round(255 * val / (2 * max_val))
assert_array_equal(dx, ref[..., 0])
assert_array_equal(dy, ref[..., 1])
def test_dequantize_flow():
dx = np.random.randint(256, size=(10, 8), dtype=np.uint8)
dy = np.random.randint(256, size=(10, 8), dtype=np.uint8)
max_val = 5.0
flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=False)
ref = np.zeros_like(flow, dtype=np.float32)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
ref[i, j, 0] = float(dx[i, j]) * 2 * max_val / 255 - max_val
ref[i, j, 1] = float(dy[i, j]) * 2 * max_val / 255 - max_val
assert_array_almost_equal(flow, ref)
max_val = 0.5
flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=True)
h, w = dx.shape
ref = np.zeros_like(flow, dtype=np.float32)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
ref[i, j, 0] = (float(dx[i, j]) * 2 * max_val / 255 - max_val) * w
ref[i, j, 1] = (float(dy[i, j]) * 2 * max_val / 255 - max_val) * h
assert_array_almost_equal(flow, ref)
def test_flow2rgb():
flow = np.array(
[[[0, 0], [0.5, 0.5], [1, 1], [2, 1], [3, np.inf]]], dtype=np.float32)
flow_img = mmcv.flow2rgb(flow)
# yapf: disable
assert_array_almost_equal(
flow_img,
np.array([[[1., 1., 1.],
[1., 0.826074731, 0.683772236],
[1., 0.652149462, 0.367544472],
[1., 0.265650552, 5.96046448e-08],
[0., 0., 0.]]],
dtype=np.float32))
# yapf: enable
def test_make_color_wheel():
default_color_wheel = mmcv.make_color_wheel()
color_wheel = mmcv.make_color_wheel([2, 2, 2, 2, 2, 2])
# yapf: disable
assert_array_equal(default_color_wheel, np.array(
[[1. , 0. , 0. ],
[1. , 0.06666667, 0. ],
[1. , 0.13333334, 0. ],
[1. , 0.2 , 0. ],
[1. , 0.26666668, 0. ],
[1. , 0.33333334, 0. ],
[1. , 0.4 , 0. ],
[1. , 0.46666667, 0. ],
[1. , 0.53333336, 0. ],
[1. , 0.6 , 0. ],
[1. , 0.6666667 , 0. ],
[1. , 0.73333335, 0. ],
[1. , 0.8 , 0. ],
[1. , 0.8666667 , 0. ],
[1. , 0.93333334, 0. ],
[1. , 1. , 0. ],
[0.8333333 , 1. , 0. ],
[0.6666667 , 1. , 0. ],
[0.5 , 1. , 0. ],
[0.33333334, 1. , 0. ],
[0.16666667, 1. , 0. ],
[0. , 1. , 0. ],
[0. , 1. , 0.25 ],
[0. , 1. , 0.5 ],
[0. , 1. , 0.75 ],
[0. , 1. , 1. ],
[0. , 0.90909094, 1. ],
[0. , 0.8181818 , 1. ],
[0. , 0.72727275, 1. ],
[0. , 0.6363636 , 1. ],
[0. , 0.54545456, 1. ],
[0. , 0.45454547, 1. ],
[0. , 0.36363637, 1. ],
[0. , 0.27272728, 1. ],
[0. , 0.18181819, 1. ],
[0. , 0.09090909, 1. ],
[0. , 0. , 1. ],
[0.07692308, 0. , 1. ],
[0.15384616, 0. , 1. ],
[0.23076923, 0. , 1. ],
[0.30769232, 0. , 1. ],
[0.3846154 , 0. , 1. ],
[0.46153846, 0. , 1. ],
[0.53846157, 0. , 1. ],
[0.61538464, 0. , 1. ],
[0.6923077 , 0. , 1. ],
[0.7692308 , 0. , 1. ],
[0.84615386, 0. , 1. ],
[0.9230769 , 0. , 1. ],
[1. , 0. , 1. ],
[1. , 0. , 0.8333333 ],
[1. , 0. , 0.6666667 ],
[1. , 0. , 0.5 ],
[1. , 0. , 0.33333334],
[1. , 0. , 0.16666667]], dtype=np.float32))
assert_array_equal(
color_wheel,
np.array([[1., 0. , 0. ],
[1. , 0.5, 0. ],
[1. , 1. , 0. ],
[0.5, 1. , 0. ],
[0. , 1. , 0. ],
[0. , 1. , 0.5],
[0. , 1. , 1. ],
[0. , 0.5, 1. ],
[0. , 0. , 1. ],
[0.5, 0. , 1. ],
[1. , 0. , 1. ],
[1. , 0. , 0.5]], dtype=np.float32))
# yapf: enable
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment