Commit 37437e80 authored by sunxx1

Merge branch 'sun_22.10' into 'main'

Sun 22.10

See merge request dcutoolkit/deeplearing/dlexamples_new!54
parents 8442f072 701c0060
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import os
import os.path as osp
import cv2
import decord
import numpy as np
import torch
import webcolors
from mmcv import Config, DictAction
from mmaction.apis import inference_recognizer, init_recognizer
def parse_args():
parser = argparse.ArgumentParser(description='MMAction2 demo')
parser.add_argument('config', help='test config file path')
parser.add_argument('checkpoint', help='checkpoint file/url')
parser.add_argument('video', help='video file/url or rawframes directory')
parser.add_argument('label', help='label file')
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
parser.add_argument(
'--use-frames',
default=False,
action='store_true',
help='whether to use rawframes as input')
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
parser.add_argument(
'--fps',
default=30,
type=int,
help='specify the fps of the output video when using rawframes to '
'generate the file')
parser.add_argument(
'--font-scale',
default=0.5,
type=float,
help='font scale of the label in output video')
parser.add_argument(
'--font-color',
default='white',
help='font color of the label in output video')
parser.add_argument(
'--target-resolution',
nargs=2,
default=None,
type=int,
help='Target resolution (w, h) for resizing the frames when using a '
'video as input. If either dimension is set to -1, the frames are '
'resized by keeping the existing aspect ratio')
parser.add_argument(
'--resize-algorithm',
default='bicubic',
help='resize algorithm applied to generate video')
parser.add_argument('--out-filename', default=None, help='output filename')
args = parser.parse_args()
return args
def get_output(video_path,
out_filename,
label,
fps=30,
font_scale=0.5,
font_color='white',
target_resolution=None,
resize_algorithm='bicubic',
use_frames=False):
"""Get demo output using ``moviepy``.
This function generates a video or gif file from the raw video or
frames using ``moviepy``. For more information on some parameters,
you can refer to: https://github.com/Zulko/moviepy.
Args:
video_path (str): The video file path or the rawframes directory path.
If ``use_frames`` is set to True, it should be rawframes directory
path. Otherwise, it should be video file path.
out_filename (str): Output filename for the generated file.
label (str): Predicted label of the generated file.
fps (int): Number of picture frames to read per second. Default: 30.
font_scale (float): Font scale of the label. Default: 0.5.
font_color (str): Font color of the label. Default: 'white'.
target_resolution (None | tuple[int | None]): Set to
(desired_width, desired_height) to have resized frames. If either
dimension is None, the frames are resized by keeping the existing
aspect ratio. Default: None.
resize_algorithm (str): Support "bicubic", "bilinear", "neighbor",
"lanczos", etc. Default: 'bicubic'. For more information,
see https://ffmpeg.org/ffmpeg-scaler.html
use_frames (bool): Whether to use rawframes as input. Default: False.
"""
if video_path.startswith(('http://', 'https://')):
raise NotImplementedError
try:
from moviepy.editor import ImageSequenceClip
except ImportError:
raise ImportError('Please install moviepy to enable output file.')
# Channel Order is BGR
if use_frames:
frame_list = sorted(
[osp.join(video_path, x) for x in os.listdir(video_path)])
frames = [cv2.imread(x) for x in frame_list]
else:
video = decord.VideoReader(video_path)
frames = [x.asnumpy()[..., ::-1] for x in video]
if target_resolution:
w, h = target_resolution
frame_h, frame_w, _ = frames[0].shape
if w == -1:
w = int(h / frame_h * frame_w)
if h == -1:
h = int(w / frame_w * frame_h)
frames = [cv2.resize(f, (w, h)) for f in frames]
textsize = cv2.getTextSize(label, cv2.FONT_HERSHEY_DUPLEX, font_scale,
1)[0]
textheight = textsize[1]
padding = 10
location = (padding, padding + textheight)
if isinstance(font_color, str):
font_color = webcolors.name_to_rgb(font_color)[::-1]
frames = [np.array(frame) for frame in frames]
for frame in frames:
cv2.putText(frame, label, location, cv2.FONT_HERSHEY_DUPLEX,
font_scale, font_color, 1)
# RGB order
frames = [x[..., ::-1] for x in frames]
video_clips = ImageSequenceClip(frames, fps=fps)
out_type = osp.splitext(out_filename)[1][1:]
if out_type == 'gif':
video_clips.write_gif(out_filename)
else:
video_clips.write_videofile(out_filename, remove_temp=True)
def main():
args = parse_args()
# assign the desired device.
device = torch.device(args.device)
cfg = Config.fromfile(args.config)
cfg.merge_from_dict(args.cfg_options)
# build the recognizer from a config file and checkpoint file/url
model = init_recognizer(cfg, args.checkpoint, device=device)
# e.g. use ('backbone', ) to return backbone feature
output_layer_names = None
# test a single video or rawframes of a single video
if output_layer_names:
results, returned_feature = inference_recognizer(
model, args.video, outputs=output_layer_names)
else:
results = inference_recognizer(model, args.video)
labels = open(args.label).readlines()
labels = [x.strip() for x in labels]
results = [(labels[k[0]], k[1]) for k in results]
print('The top-5 labels with corresponding scores are:')
for result in results:
print(f'{result[0]}: ', result[1])
if args.out_filename is not None:
if args.target_resolution is not None:
if args.target_resolution[0] == -1:
assert isinstance(args.target_resolution[1], int)
assert args.target_resolution[1] > 0
if args.target_resolution[1] == -1:
assert isinstance(args.target_resolution[0], int)
assert args.target_resolution[0] > 0
args.target_resolution = tuple(args.target_resolution)
get_output(
args.video,
args.out_filename,
results[0][0],
fps=args.fps,
font_scale=args.font_scale,
font_color=args.font_color,
target_resolution=args.target_resolution,
resize_algorithm=args.resize_algorithm,
use_frames=args.use_frames)
if __name__ == '__main__':
main()
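# A hedged usage sketch for this recognition demo. The script name
# `demo/demo.py` and the checkpoint path are illustrative assumptions; the
# config and label-map paths are the TSN/Kinetics files referenced elsewhere
# in this merge request. Positional arguments follow `parse_args()` above:
#
#   python demo/demo.py \
#       configs/recognition/tsn/tsn_r50_video_inference_1x1x3_100e_kinetics400_rgb.py \
#       checkpoints/tsn_r50_kinetics400.pth \
#       demo/demo.mp4 \
#       tools/data/kinetics/label_map_k400.txt \
#       --out-filename demo/demo_out.mp4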
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import torch
from mmcv import Config, DictAction
from mmaction.apis import inference_recognizer, init_recognizer
def parse_args():
parser = argparse.ArgumentParser(description='MMAction2 demo')
parser.add_argument('config', help='test config file path')
parser.add_argument('checkpoint', help='checkpoint file/url')
parser.add_argument('audio', help='audio file')
parser.add_argument('label', help='label file')
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
args = parser.parse_args()
return args
def main():
args = parse_args()
device = torch.device(args.device)
cfg = Config.fromfile(args.config)
cfg.merge_from_dict(args.cfg_options)
model = init_recognizer(cfg, args.checkpoint, device=device)
if not args.audio.endswith('.npy'):
raise NotImplementedError('Demo works on extracted audio features')
results = inference_recognizer(model, args.audio)
labels = open(args.label).readlines()
labels = [x.strip() for x in labels]
results = [(labels[k[0]], k[1]) for k in results]
print('Scores:')
for result in results:
print(f'{result[0]}: ', result[1])
if __name__ == '__main__':
main()
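# A hedged usage sketch for this audio demo. The script name and the config,
# checkpoint and label-map paths are illustrative placeholders; the input
# must be an extracted audio feature in `.npy` format, as enforced in
# `main()` above:
#
#   python demo/demo_audio.py <audio-config>.py <checkpoint>.pth \
#       audio_feature.npy <label-map>.txt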
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import os
import os.path as osp
import mmcv
import numpy as np
import torch
from mmcv import Config, DictAction
from mmcv.parallel import collate, scatter
from mmaction.apis import init_recognizer
from mmaction.datasets.pipelines import Compose
from mmaction.utils import GradCAM
def parse_args():
parser = argparse.ArgumentParser(description='MMAction2 GradCAM demo')
parser.add_argument('config', help='test config file path')
parser.add_argument('checkpoint', help='checkpoint file/url')
parser.add_argument('video', help='video file/url or rawframes directory')
parser.add_argument(
'--use-frames',
default=False,
action='store_true',
help='whether to use rawframes as input')
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
parser.add_argument(
'--target-layer-name',
type=str,
default='backbone/layer4/1/relu',
help='GradCAM target layer name')
parser.add_argument('--out-filename', default=None, help='output filename')
parser.add_argument('--fps', default=5, type=int)
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
parser.add_argument(
'--target-resolution',
nargs=2,
default=None,
type=int,
help='Target resolution (w, h) for resizing the frames when using a '
'video as input. If either dimension is set to -1, the frames are '
'resized by keeping the existing aspect ratio')
parser.add_argument(
'--resize-algorithm',
default='bilinear',
help='resize algorithm applied to generate video & gif')
args = parser.parse_args()
return args
def build_inputs(model, video_path, use_frames=False):
"""build inputs for GradCAM.
Note that, building inputs for GradCAM is exactly the same as building
inputs for Recognizer test stage. Codes from `inference_recognizer`.
Args:
model (nn.Module): Recognizer model.
video_path (str): video file/url or rawframes directory.
use_frames (bool): whether to use rawframes as input.
Returns:
dict: Both GradCAM inputs and Recognizer test stage inputs,
including two keys, ``imgs`` and ``label``.
"""
if not (osp.exists(video_path) or video_path.startswith('http')):
raise RuntimeError(f"'{video_path}' is missing")
if osp.isfile(video_path) and use_frames:
raise RuntimeError(
f"'{video_path}' is a video file, not a rawframe directory")
if osp.isdir(video_path) and not use_frames:
raise RuntimeError(
f"'{video_path}' is a rawframe directory, not a video file")
cfg = model.cfg
device = next(model.parameters()).device # model device
# build the data pipeline
test_pipeline = cfg.data.test.pipeline
test_pipeline = Compose(test_pipeline)
# prepare data
if use_frames:
filename_tmpl = cfg.data.test.get('filename_tmpl', 'img_{:05}.jpg')
modality = cfg.data.test.get('modality', 'RGB')
start_index = cfg.data.test.get('start_index', 1)
data = dict(
frame_dir=video_path,
total_frames=len(os.listdir(video_path)),
label=-1,
start_index=start_index,
filename_tmpl=filename_tmpl,
modality=modality)
else:
start_index = cfg.data.test.get('start_index', 0)
data = dict(
filename=video_path,
label=-1,
start_index=start_index,
modality='RGB')
data = test_pipeline(data)
data = collate([data], samples_per_gpu=1)
if next(model.parameters()).is_cuda:
# scatter to specified GPU
data = scatter(data, [device])[0]
return data
def _resize_frames(frame_list,
scale,
keep_ratio=True,
interpolation='bilinear'):
"""resize frames according to given scale.
Codes are modified from `mmaction2/datasets/pipelines/augmentation.py`,
`Resize` class.
Args:
frame_list (list[np.ndarray]): frames to be resized.
scale (tuple[int]): If keep_ratio is True, it serves as scaling
factor or maximum size: the image will be rescaled as large
as possible within the scale. Otherwise, it serves as (w, h)
of output size.
keep_ratio (bool): If set to True, Images will be resized without
changing the aspect ratio. Otherwise, it will resize images to a
given size. Default: True.
interpolation (str): Algorithm used for interpolation:
"nearest" | "bilinear". Default: "bilinear".
Returns:
list[np.ndarray]: The resized frames.
"""
if scale is None or (scale[0] == -1 and scale[1] == -1):
return frame_list
scale = tuple(scale)
max_long_edge = max(scale)
max_short_edge = min(scale)
if max_short_edge == -1:
scale = (np.inf, max_long_edge)
img_h, img_w, _ = frame_list[0].shape
if keep_ratio:
new_w, new_h = mmcv.rescale_size((img_w, img_h), scale)
else:
new_w, new_h = scale
frame_list = [
mmcv.imresize(img, (new_w, new_h), interpolation=interpolation)
for img in frame_list
]
return frame_list
def main():
args = parse_args()
# assign the desired device.
device = torch.device(args.device)
cfg = Config.fromfile(args.config)
cfg.merge_from_dict(args.cfg_options)
# build the recognizer from a config file and checkpoint file/url
model = init_recognizer(cfg, args.checkpoint, device=device)
inputs = build_inputs(model, args.video, use_frames=args.use_frames)
gradcam = GradCAM(model, args.target_layer_name)
results = gradcam(inputs)
if args.out_filename is not None:
try:
from moviepy.editor import ImageSequenceClip
except ImportError:
raise ImportError('Please install moviepy to enable output file.')
# frames_batches shape [B, T, H, W, 3], in RGB order
frames_batches = (results[0] * 255.).numpy().astype(np.uint8)
frames = frames_batches.reshape(-1, *frames_batches.shape[-3:])
frame_list = list(frames)
frame_list = _resize_frames(
frame_list,
args.target_resolution,
interpolation=args.resize_algorithm)
video_clips = ImageSequenceClip(frame_list, fps=args.fps)
out_type = osp.splitext(args.out_filename)[1][1:]
if out_type == 'gif':
video_clips.write_gif(args.out_filename)
else:
video_clips.write_videofile(args.out_filename, remove_temp=True)
if __name__ == '__main__':
main()
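# A hedged usage sketch for this GradCAM demo. The script name and the
# config/checkpoint/video paths are illustrative placeholders; the
# `--target-layer-name` value shown is simply the default from
# `parse_args()`:
#
#   python demo/demo_gradcam.py <recognition-config>.py <checkpoint>.pth \
#       demo/demo.mp4 --target-layer-name backbone/layer4/1/relu \
#       --out-filename demo/gradcam_out.gif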
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import os
import os.path as osp
import shutil
import cv2
import mmcv
import numpy as np
import torch
from mmcv import DictAction
from mmaction.apis import inference_recognizer, init_recognizer
try:
from mmdet.apis import inference_detector, init_detector
except (ImportError, ModuleNotFoundError):
raise ImportError('Failed to import `inference_detector` and '
'`init_detector` from `mmdet.apis`. These APIs are '
'required in this demo!')
try:
from mmpose.apis import (inference_top_down_pose_model, init_pose_model,
vis_pose_result)
except (ImportError, ModuleNotFoundError):
raise ImportError('Failed to import `inference_top_down_pose_model`, '
'`init_pose_model`, and `vis_pose_result` from '
'`mmpose.apis`. These APIs are required in this demo!')
try:
import moviepy.editor as mpy
except ImportError:
raise ImportError('Please install moviepy to enable output file')
FONTFACE = cv2.FONT_HERSHEY_DUPLEX
FONTSCALE = 0.75
FONTCOLOR = (255, 255, 255) # BGR, white
THICKNESS = 1
LINETYPE = 1
def parse_args():
parser = argparse.ArgumentParser(description='MMAction2 demo')
parser.add_argument('video', help='video file/url')
parser.add_argument('out_filename', help='output filename')
parser.add_argument(
'--config',
default=('configs/skeleton/posec3d/'
'slowonly_r50_u48_240e_ntu120_xsub_keypoint.py'),
help='skeleton model config file path')
parser.add_argument(
'--checkpoint',
default=('https://download.openmmlab.com/mmaction/skeleton/posec3d/'
'slowonly_r50_u48_240e_ntu120_xsub_keypoint/'
'slowonly_r50_u48_240e_ntu120_xsub_keypoint-6736b03f.pth'),
help='skeleton model checkpoint file/url')
parser.add_argument(
'--det-config',
default='demo/faster_rcnn_r50_fpn_2x_coco.py',
help='human detection config file path (from mmdet)')
parser.add_argument(
'--det-checkpoint',
default=('http://download.openmmlab.com/mmdetection/v2.0/faster_rcnn/'
'faster_rcnn_r50_fpn_2x_coco/'
'faster_rcnn_r50_fpn_2x_coco_'
'bbox_mAP-0.384_20200504_210434-a5d8aa15.pth'),
help='human detection checkpoint file/url')
parser.add_argument(
'--pose-config',
default='demo/hrnet_w32_coco_256x192.py',
help='human pose estimation config file path (from mmpose)')
parser.add_argument(
'--pose-checkpoint',
default=('https://download.openmmlab.com/mmpose/top_down/hrnet/'
'hrnet_w32_coco_256x192-c78dce93_20200708.pth'),
help='human pose estimation checkpoint file/url')
parser.add_argument(
'--det-score-thr',
type=float,
default=0.9,
help='the threshold of human detection score')
parser.add_argument(
'--label-map',
default='tools/data/skeleton/label_map_ntu120.txt',
help='label map file')
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
parser.add_argument(
'--short-side',
type=int,
default=480,
help='specify the short-side length of the image')
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
args = parser.parse_args()
return args
def frame_extraction(video_path, short_side):
"""Extract frames given video_path.
Args:
video_path (str): The video_path.
"""
# Load the video, extract frames into ./tmp/video_name
target_dir = osp.join('./tmp', osp.basename(osp.splitext(video_path)[0]))
os.makedirs(target_dir, exist_ok=True)
# Should be able to handle videos up to several hours
frame_tmpl = osp.join(target_dir, 'img_{:06d}.jpg')
vid = cv2.VideoCapture(video_path)
frames = []
frame_paths = []
flag, frame = vid.read()
cnt = 0
new_h, new_w = None, None
while flag:
if new_h is None:
h, w, _ = frame.shape
new_w, new_h = mmcv.rescale_size((w, h), (short_side, np.Inf))
frame = mmcv.imresize(frame, (new_w, new_h))
frames.append(frame)
frame_path = frame_tmpl.format(cnt + 1)
frame_paths.append(frame_path)
cv2.imwrite(frame_path, frame)
cnt += 1
flag, frame = vid.read()
return frame_paths, frames
def detection_inference(args, frame_paths):
"""Detect human boxes given frame paths.
Args:
args (argparse.Namespace): The arguments.
frame_paths (list[str]): The paths of frames to do detection inference.
Returns:
list[np.ndarray]: The human detection results.
"""
model = init_detector(args.det_config, args.det_checkpoint, args.device)
assert model.CLASSES[0] == 'person', ('We require you to use a detector '
'trained on COCO')
results = []
print('Performing Human Detection for each frame')
prog_bar = mmcv.ProgressBar(len(frame_paths))
for frame_path in frame_paths:
result = inference_detector(model, frame_path)
# We only keep human detections with score larger than det_score_thr
result = result[0][result[0][:, 4] >= args.det_score_thr]
results.append(result)
prog_bar.update()
return results
def pose_inference(args, frame_paths, det_results):
model = init_pose_model(args.pose_config, args.pose_checkpoint,
args.device)
ret = []
print('Performing Human Pose Estimation for each frame')
prog_bar = mmcv.ProgressBar(len(frame_paths))
for f, d in zip(frame_paths, det_results):
# Align input format
d = [dict(bbox=x) for x in list(d)]
pose = inference_top_down_pose_model(model, f, d, format='xyxy')[0]
ret.append(pose)
prog_bar.update()
return ret
def main():
args = parse_args()
frame_paths, original_frames = frame_extraction(args.video,
args.short_side)
num_frame = len(frame_paths)
h, w, _ = original_frames[0].shape
# Get clip_len, frame_interval and calculate center index of each clip
config = mmcv.Config.fromfile(args.config)
config.merge_from_dict(args.cfg_options)
for component in config.data.test.pipeline:
if component['type'] == 'PoseNormalize':
component['mean'] = (w // 2, h // 2, .5)
component['max_value'] = (w, h, 1.)
model = init_recognizer(config, args.checkpoint, args.device)
# Load label_map
label_map = [x.strip() for x in open(args.label_map).readlines()]
# Get Human detection results
det_results = detection_inference(args, frame_paths)
torch.cuda.empty_cache()
pose_results = pose_inference(args, frame_paths, det_results)
torch.cuda.empty_cache()
fake_anno = dict(
frame_dir='',
label=-1,
img_shape=(h, w),
original_shape=(h, w),
start_index=0,
modality='Pose',
total_frames=num_frame)
num_person = max([len(x) for x in pose_results])
num_keypoint = 17
keypoint = np.zeros((num_person, num_frame, num_keypoint, 2),
dtype=np.float16)
keypoint_score = np.zeros((num_person, num_frame, num_keypoint),
dtype=np.float16)
for i, poses in enumerate(pose_results):
for j, pose in enumerate(poses):
pose = pose['keypoints']
keypoint[j, i] = pose[:, :2]
keypoint_score[j, i] = pose[:, 2]
fake_anno['keypoint'] = keypoint
fake_anno['keypoint_score'] = keypoint_score
results = inference_recognizer(model, fake_anno)
action_label = label_map[results[0][0]]
pose_model = init_pose_model(args.pose_config, args.pose_checkpoint,
args.device)
vis_frames = [
vis_pose_result(pose_model, frame_paths[i], pose_results[i])
for i in range(num_frame)
]
for frame in vis_frames:
cv2.putText(frame, action_label, (10, 30), FONTFACE, FONTSCALE,
FONTCOLOR, THICKNESS, LINETYPE)
vid = mpy.ImageSequenceClip([x[:, :, ::-1] for x in vis_frames], fps=24)
vid.write_videofile(args.out_filename, remove_temp=True)
tmp_frame_dir = osp.dirname(frame_paths[0])
shutil.rmtree(tmp_frame_dir)
if __name__ == '__main__':
main()
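# A hedged usage sketch for this skeleton-based recognition demo. The script
# name and the video/output paths are illustrative; the detector, pose and
# skeleton configs/checkpoints all have defaults in `parse_args()`, so only
# the input video and output filename are required:
#
#   python demo/demo_skeleton.py demo/some_video.mp4 demo/skeleton_demo.mp4 \
#       --det-score-thr 0.9 --short-side 480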
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import copy as cp
import os
import os.path as osp
import shutil
import cv2
import mmcv
import numpy as np
import torch
from mmcv import DictAction
from mmcv.runner import load_checkpoint
from mmaction.models import build_detector
try:
from mmdet.apis import inference_detector, init_detector
except (ImportError, ModuleNotFoundError):
raise ImportError('Failed to import `inference_detector` and '
'`init_detector` from `mmdet.apis`. These APIs are '
'required in this demo!')
try:
import moviepy.editor as mpy
except ImportError:
raise ImportError('Please install moviepy to enable output file')
FONTFACE = cv2.FONT_HERSHEY_DUPLEX
FONTSCALE = 0.5
FONTCOLOR = (255, 255, 255) # BGR, white
MSGCOLOR = (128, 128, 128) # BGR, gray
THICKNESS = 1
LINETYPE = 1
def hex2color(h):
"""Convert the 6-digit hex string to tuple of 3 int value (RGB)"""
return (int(h[:2], 16), int(h[2:4], 16), int(h[4:], 16))
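# Worked example: hex2color('03045e') == (3, 4, 94).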
plate_blue = '03045e-023e8a-0077b6-0096c7-00b4d8-48cae4'
plate_blue = plate_blue.split('-')
plate_blue = [hex2color(h) for h in plate_blue]
plate_green = '004b23-006400-007200-008000-38b000-70e000'
plate_green = plate_green.split('-')
plate_green = [hex2color(h) for h in plate_green]
def visualize(frames, annotations, plate=plate_blue, max_num=5):
"""Visualize frames with predicted annotations.
Args:
frames (list[np.ndarray]): Frames for visualization, note that
len(frames) % len(annotations) should be 0.
annotations (list[list[tuple]]): The predicted results.
plate (list[tuple]): The color plate used for visualization.
Default: plate_blue.
max_num (int): Max number of labels to visualize for a person box.
Default: 5.
Returns:
list[np.ndarray]: Visualized frames.
"""
assert max_num + 1 <= len(plate)
plate = [x[::-1] for x in plate]
frames_ = cp.deepcopy(frames)
nf, na = len(frames), len(annotations)
assert nf % na == 0
nfpa = len(frames) // len(annotations)
anno = None
h, w, _ = frames[0].shape
scale_ratio = np.array([w, h, w, h])
for i in range(na):
anno = annotations[i]
if anno is None:
continue
for j in range(nfpa):
ind = i * nfpa + j
frame = frames_[ind]
for ann in anno:
box = ann[0]
label = ann[1]
if not len(label):
continue
score = ann[2]
box = (box * scale_ratio).astype(np.int64)
st, ed = tuple(box[:2]), tuple(box[2:])
cv2.rectangle(frame, st, ed, plate[0], 2)
for k, lb in enumerate(label):
if k >= max_num:
break
text = abbrev(lb)
text = ': '.join([text, str(score[k])])
location = (0 + st[0], 18 + k * 18 + st[1])
textsize = cv2.getTextSize(text, FONTFACE, FONTSCALE,
THICKNESS)[0]
textwidth = textsize[0]
diag0 = (location[0] + textwidth, location[1] - 14)
diag1 = (location[0], location[1] + 2)
cv2.rectangle(frame, diag0, diag1, plate[k + 1], -1)
cv2.putText(frame, text, location, FONTFACE, FONTSCALE,
FONTCOLOR, THICKNESS, LINETYPE)
return frames_
def parse_args():
parser = argparse.ArgumentParser(description='MMAction2 demo')
parser.add_argument(
'--config',
default=('configs/detection/ava/'
'slowonly_omnisource_pretrained_r101_8x8x1_20e_ava_rgb.py'),
help='spatio temporal detection config file path')
parser.add_argument(
'--checkpoint',
default=('https://download.openmmlab.com/mmaction/detection/ava/'
'slowonly_omnisource_pretrained_r101_8x8x1_20e_ava_rgb/'
'slowonly_omnisource_pretrained_r101_8x8x1_20e_ava_rgb'
'_20201217-16378594.pth'),
help='spatio temporal detection checkpoint file/url')
parser.add_argument(
'--det-config',
default='demo/faster_rcnn_r50_fpn_2x_coco.py',
help='human detection config file path (from mmdet)')
parser.add_argument(
'--det-checkpoint',
default=('http://download.openmmlab.com/mmdetection/v2.0/faster_rcnn/'
'faster_rcnn_r50_fpn_2x_coco/'
'faster_rcnn_r50_fpn_2x_coco_'
'bbox_mAP-0.384_20200504_210434-a5d8aa15.pth'),
help='human detection checkpoint file/url')
parser.add_argument(
'--det-score-thr',
type=float,
default=0.9,
help='the threshold of human detection score')
parser.add_argument(
'--action-score-thr',
type=float,
default=0.5,
help='the threshold of human action score')
parser.add_argument('--video', help='video file/url')
parser.add_argument(
'--label-map',
default='tools/data/ava/label_map.txt',
help='label map file')
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
parser.add_argument(
'--out-filename',
default='demo/stdet_demo.mp4',
help='output filename')
parser.add_argument(
'--predict-stepsize',
default=8,
type=int,
help='make a prediction every n frames')
parser.add_argument(
'--output-stepsize',
default=4,
type=int,
help=('show one frame per n frames in the demo, we should have: '
'predict_stepsize % output_stepsize == 0'))
parser.add_argument(
'--output-fps',
default=6,
type=int,
help='the fps of demo video output')
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
args = parser.parse_args()
return args
def frame_extraction(video_path):
"""Extract frames given video_path.
Args:
video_path (str): The video_path.
"""
# Load the video, extract frames into ./tmp/video_name
target_dir = osp.join('./tmp', osp.basename(osp.splitext(video_path)[0]))
os.makedirs(target_dir, exist_ok=True)
# Should be able to handle videos up to several hours
frame_tmpl = osp.join(target_dir, 'img_{:06d}.jpg')
vid = cv2.VideoCapture(video_path)
frames = []
frame_paths = []
flag, frame = vid.read()
cnt = 0
while flag:
frames.append(frame)
frame_path = frame_tmpl.format(cnt + 1)
frame_paths.append(frame_path)
cv2.imwrite(frame_path, frame)
cnt += 1
flag, frame = vid.read()
return frame_paths, frames
def detection_inference(args, frame_paths):
"""Detect human boxes given frame paths.
Args:
args (argparse.Namespace): The arguments.
frame_paths (list[str]): The paths of frames to do detection inference.
Returns:
list[np.ndarray]: The human detection results.
"""
model = init_detector(args.det_config, args.det_checkpoint, args.device)
assert model.CLASSES[0] == 'person', ('We require you to use a detector '
'trained on COCO')
results = []
print('Performing Human Detection for each frame')
prog_bar = mmcv.ProgressBar(len(frame_paths))
for frame_path in frame_paths:
result = inference_detector(model, frame_path)
# We only keep human detections with score larger than det_score_thr
result = result[0][result[0][:, 4] >= args.det_score_thr]
results.append(result)
prog_bar.update()
return results
def load_label_map(file_path):
"""Load Label Map.
Args:
file_path (str): The file path of label map.
Returns:
dict: The label map (int -> label name).
"""
lines = open(file_path).readlines()
lines = [x.strip().split(': ') for x in lines]
return {int(x[0]): x[1] for x in lines}
def abbrev(name):
"""Get the abbreviation of label name:
'take (an object) from (a person)' -> 'take ... from ...'
"""
while name.find('(') != -1:
st, ed = name.find('('), name.find(')')
name = name[:st] + '...' + name[ed + 1:]
return name
def pack_result(human_detection, result, img_h, img_w):
"""Short summary.
Args:
human_detection (np.ndarray): Human detection result.
result (list[list[tuple]]): The predicted (label, score) pairs of
each human proposal.
img_h (int): The image height.
img_w (int): The image width.
Returns:
list[tuple] | None: A list of (proposal, label names, label scores)
tuples, or None when ``result`` is None.
"""
human_detection[:, 0::2] /= img_w
human_detection[:, 1::2] /= img_h
results = []
if result is None:
return None
for prop, res in zip(human_detection, result):
res.sort(key=lambda x: -x[1])
results.append(
(prop.data.cpu().numpy(), [x[0] for x in res], [x[1]
for x in res]))
return results
def main():
args = parse_args()
frame_paths, original_frames = frame_extraction(args.video)
num_frame = len(frame_paths)
h, w, _ = original_frames[0].shape
# resize frames to shortside 256
new_w, new_h = mmcv.rescale_size((w, h), (256, np.Inf))
frames = [mmcv.imresize(img, (new_w, new_h)) for img in original_frames]
w_ratio, h_ratio = new_w / w, new_h / h
# Get clip_len, frame_interval and calculate center index of each clip
config = mmcv.Config.fromfile(args.config)
config.merge_from_dict(args.cfg_options)
val_pipeline = config.data.val.pipeline
sampler = [x for x in val_pipeline if x['type'] == 'SampleAVAFrames'][0]
clip_len, frame_interval = sampler['clip_len'], sampler['frame_interval']
window_size = clip_len * frame_interval
assert clip_len % 2 == 0, 'We would like to have an even clip_len'
# Note that it's 1 based here
timestamps = np.arange(window_size // 2, num_frame + 1 - window_size // 2,
args.predict_stepsize)
# Load label_map
label_map = load_label_map(args.label_map)
try:
if config['data']['train']['custom_classes'] is not None:
label_map = {
id + 1: label_map[cls]
for id, cls in enumerate(config['data']['train']
['custom_classes'])
}
except KeyError:
pass
# Get Human detection results
center_frames = [frame_paths[ind - 1] for ind in timestamps]
human_detections = detection_inference(args, center_frames)
for i in range(len(human_detections)):
det = human_detections[i]
det[:, 0:4:2] *= w_ratio
det[:, 1:4:2] *= h_ratio
human_detections[i] = torch.from_numpy(det[:, :4]).to(args.device)
# Get img_norm_cfg
img_norm_cfg = config['img_norm_cfg']
if 'to_rgb' not in img_norm_cfg and 'to_bgr' in img_norm_cfg:
to_bgr = img_norm_cfg.pop('to_bgr')
img_norm_cfg['to_rgb'] = to_bgr
img_norm_cfg['mean'] = np.array(img_norm_cfg['mean'])
img_norm_cfg['std'] = np.array(img_norm_cfg['std'])
# Build STDET model
try:
# In our spatiotemporal detection demo, different actions should have
# the same number of bboxes.
config['model']['test_cfg']['rcnn']['action_thr'] = .0
except KeyError:
pass
config.model.backbone.pretrained = None
model = build_detector(config.model, test_cfg=config.get('test_cfg'))
load_checkpoint(model, args.checkpoint, map_location='cpu')
model.to(args.device)
model.eval()
predictions = []
print('Performing SpatioTemporal Action Detection for each clip')
assert len(timestamps) == len(human_detections)
prog_bar = mmcv.ProgressBar(len(timestamps))
for timestamp, proposal in zip(timestamps, human_detections):
if proposal.shape[0] == 0:
predictions.append(None)
continue
start_frame = timestamp - (clip_len // 2 - 1) * frame_interval
frame_inds = start_frame + np.arange(0, window_size, frame_interval)
frame_inds = list(frame_inds - 1)
imgs = [frames[ind].astype(np.float32) for ind in frame_inds]
_ = [mmcv.imnormalize_(img, **img_norm_cfg) for img in imgs]
# THWC -> CTHW -> 1CTHW
input_array = np.stack(imgs).transpose((3, 0, 1, 2))[np.newaxis]
input_tensor = torch.from_numpy(input_array).to(args.device)
with torch.no_grad():
result = model(
return_loss=False,
img=[input_tensor],
img_metas=[[dict(img_shape=(new_h, new_w))]],
proposals=[[proposal]])
result = result[0]
prediction = []
# N proposals
for i in range(proposal.shape[0]):
prediction.append([])
# Perform action score thr
for i in range(len(result)):
if i + 1 not in label_map:
continue
for j in range(proposal.shape[0]):
if result[i][j, 4] > args.action_score_thr:
prediction[j].append((label_map[i + 1], result[i][j,
4]))
predictions.append(prediction)
prog_bar.update()
results = []
for human_detection, prediction in zip(human_detections, predictions):
results.append(pack_result(human_detection, prediction, new_h, new_w))
def dense_timestamps(timestamps, n):
"""Make it nx frames."""
old_frame_interval = (timestamps[1] - timestamps[0])
start = timestamps[0] - old_frame_interval / n * (n - 1) / 2
new_frame_inds = np.arange(
len(timestamps) * n) * old_frame_interval / n + start
return new_frame_inds.astype(np.int64)
dense_n = int(args.predict_stepsize / args.output_stepsize)
frames = [
cv2.imread(frame_paths[i - 1])
for i in dense_timestamps(timestamps, dense_n)
]
print('Performing visualization')
vis_frames = visualize(frames, results)
vid = mpy.ImageSequenceClip([x[:, :, ::-1] for x in vis_frames],
fps=args.output_fps)
vid.write_videofile(args.out_filename)
tmp_frame_dir = osp.dirname(frame_paths[0])
shutil.rmtree(tmp_frame_dir)
if __name__ == '__main__':
main()
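# A hedged usage sketch for this spatio-temporal detection demo. The script
# name and input video path are illustrative; every argument has a default in
# `parse_args()`, and the step-size/fps flags below simply restate those
# defaults:
#
#   python demo/demo_spatiotemporal_det.py --video demo/some_video.mp4 \
#       --out-filename demo/stdet_demo.mp4 \
#       --predict-stepsize 8 --output-stepsize 4 --output-fps 6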
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import copy as cp
import os
import os.path as osp
import shutil
import warnings
import cv2
import mmcv
import numpy as np
import torch
from mmcv import DictAction
from mmcv.runner import load_checkpoint
from mmaction.apis import inference_recognizer
from mmaction.datasets.pipelines import Compose
from mmaction.models import build_detector, build_model, build_recognizer
try:
from mmdet.apis import inference_detector, init_detector
except (ImportError, ModuleNotFoundError):
warnings.warn('Failed to import `inference_detector` and `init_detector` '
'from `mmdet.apis`. These APIs are required in '
'skeleton-based applications!')
try:
from mmpose.apis import (inference_top_down_pose_model, init_pose_model,
vis_pose_result)
except (ImportError, ModuleNotFoundError):
warnings.warn('Failed to import `inference_top_down_pose_model`, '
'`init_pose_model`, and `vis_pose_result` from '
'`mmpose.apis`. These APIs are required in skeleton-based '
'applications!')
try:
import moviepy.editor as mpy
except ImportError:
raise ImportError('Please install moviepy to enable output file')
FONTFACE = cv2.FONT_HERSHEY_DUPLEX
FONTSCALE = 0.5
FONTCOLOR = (255, 255, 255) # BGR, white
MSGCOLOR = (128, 128, 128) # BGR, gray
THICKNESS = 1
LINETYPE = 1
def hex2color(h):
"""Convert the 6-digit hex string to tuple of 3 int value (RGB)"""
return (int(h[:2], 16), int(h[2:4], 16), int(h[4:], 16))
PLATEBLUE = '03045e-023e8a-0077b6-0096c7-00b4d8-48cae4'
PLATEBLUE = PLATEBLUE.split('-')
PLATEBLUE = [hex2color(h) for h in PLATEBLUE]
PLATEGREEN = '004b23-006400-007200-008000-38b000-70e000'
PLATEGREEN = PLATEGREEN.split('-')
PLATEGREEN = [hex2color(h) for h in PLATEGREEN]
def visualize(frames,
annotations,
pose_results,
action_result,
pose_model,
plate=PLATEBLUE,
max_num=5):
"""Visualize frames with predicted annotations.
Args:
frames (list[np.ndarray]): Frames for visualization, note that
len(frames) % len(annotations) should be 0.
annotations (list[list[tuple]]): The predicted spatio-temporal
detection results.
pose_results (list[list[tuple]]): The pose results.
action_result (str): The predicted action recognition result.
pose_model (nn.Module): The constructed pose model.
plate (list[tuple]): The color plate used for visualization.
Default: PLATEBLUE.
max_num (int): Max number of labels to visualize for a person box.
Default: 5.
Returns:
list[np.ndarray]: Visualized frames.
"""
assert max_num + 1 <= len(plate)
plate = [x[::-1] for x in plate]
frames_ = cp.deepcopy(frames)
nf, na = len(frames), len(annotations)
assert nf % na == 0
nfpa = len(frames) // len(annotations)
anno = None
h, w, _ = frames[0].shape
scale_ratio = np.array([w, h, w, h])
# add pose results
if pose_results:
for i in range(nf):
frames_[i] = vis_pose_result(pose_model, frames_[i],
pose_results[i])
for i in range(na):
anno = annotations[i]
if anno is None:
continue
for j in range(nfpa):
ind = i * nfpa + j
frame = frames_[ind]
# add action result for whole video
cv2.putText(frame, action_result, (10, 30), FONTFACE, FONTSCALE,
FONTCOLOR, THICKNESS, LINETYPE)
# add spatio-temporal action detection results
for ann in anno:
box = ann[0]
label = ann[1]
if not len(label):
continue
score = ann[2]
box = (box * scale_ratio).astype(np.int64)
st, ed = tuple(box[:2]), tuple(box[2:])
if not pose_results:
cv2.rectangle(frame, st, ed, plate[0], 2)
for k, lb in enumerate(label):
if k >= max_num:
break
text = abbrev(lb)
text = ': '.join([text, str(score[k])])
location = (0 + st[0], 18 + k * 18 + st[1])
textsize = cv2.getTextSize(text, FONTFACE, FONTSCALE,
THICKNESS)[0]
textwidth = textsize[0]
diag0 = (location[0] + textwidth, location[1] - 14)
diag1 = (location[0], location[1] + 2)
cv2.rectangle(frame, diag0, diag1, plate[k + 1], -1)
cv2.putText(frame, text, location, FONTFACE, FONTSCALE,
FONTCOLOR, THICKNESS, LINETYPE)
return frames_
def parse_args():
parser = argparse.ArgumentParser(description='MMAction2 demo')
parser.add_argument(
'--rgb-stdet-config',
default=('configs/detection/ava/'
'slowonly_omnisource_pretrained_r101_8x8x1_20e_ava_rgb.py'),
help='rgb-based spatio temporal detection config file path')
parser.add_argument(
'--rgb-stdet-checkpoint',
default=('https://download.openmmlab.com/mmaction/detection/ava/'
'slowonly_omnisource_pretrained_r101_8x8x1_20e_ava_rgb/'
'slowonly_omnisource_pretrained_r101_8x8x1_20e_ava_rgb'
'_20201217-16378594.pth'),
help='rgb-based spatio temporal detection checkpoint file/url')
parser.add_argument(
'--skeleton-stdet-checkpoint',
default=('https://download.openmmlab.com/mmaction/skeleton/posec3d/'
'posec3d_ava.pth'),
help='skeleton-based spatio temporal detection checkpoint file/url')
parser.add_argument(
'--det-config',
default='demo/faster_rcnn_r50_fpn_2x_coco.py',
help='human detection config file path (from mmdet)')
parser.add_argument(
'--det-checkpoint',
default=('http://download.openmmlab.com/mmdetection/v2.0/'
'faster_rcnn/faster_rcnn_r50_fpn_2x_coco/'
'faster_rcnn_r50_fpn_2x_coco_'
'bbox_mAP-0.384_20200504_210434-a5d8aa15.pth'),
help='human detection checkpoint file/url')
parser.add_argument(
'--pose-config',
default='demo/hrnet_w32_coco_256x192.py',
help='human pose estimation config file path (from mmpose)')
parser.add_argument(
'--pose-checkpoint',
default=('https://download.openmmlab.com/mmpose/top_down/hrnet/'
'hrnet_w32_coco_256x192-c78dce93_20200708.pth'),
help='human pose estimation checkpoint file/url')
parser.add_argument(
'--skeleton-config',
default='configs/skeleton/posec3d/'
'slowonly_r50_u48_240e_ntu120_xsub_keypoint.py',
help='skeleton-based action recognition config file path')
parser.add_argument(
'--skeleton-checkpoint',
default='https://download.openmmlab.com/mmaction/skeleton/posec3d/'
'posec3d_k400.pth',
help='skeleton-based action recognition checkpoint file/url')
parser.add_argument(
'--rgb-config',
default='configs/recognition/tsn/'
'tsn_r50_video_inference_1x1x3_100e_kinetics400_rgb.py',
help='rgb-based action recognition config file path')
parser.add_argument(
'--rgb-checkpoint',
default='https://download.openmmlab.com/mmaction/recognition/'
'tsn/tsn_r50_1x1x3_100e_kinetics400_rgb/'
'tsn_r50_1x1x3_100e_kinetics400_rgb_20200614-e508be42.pth',
help='rgb-based action recognition checkpoint file/url')
parser.add_argument(
'--use-skeleton-stdet',
action='store_true',
help='use skeleton-based spatio temporal detection method')
parser.add_argument(
'--use-skeleton-recog',
action='store_true',
help='use skeleton-based action recognition method')
parser.add_argument(
'--det-score-thr',
type=float,
default=0.9,
help='the threshold of human detection score')
parser.add_argument(
'--action-score-thr',
type=float,
default=0.4,
help='the threshold of action prediction score')
parser.add_argument(
'--video',
default='demo/test_video_structuralize.mp4',
help='video file/url')
parser.add_argument(
'--label-map-stdet',
default='tools/data/ava/label_map.txt',
help='label map file for spatio-temporal action detection')
parser.add_argument(
'--label-map',
default='tools/data/kinetics/label_map_k400.txt',
help='label map file for action recognition')
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
parser.add_argument(
'--out-filename',
default='demo/test_stdet_recognition_output.mp4',
help='output filename')
parser.add_argument(
'--predict-stepsize',
default=8,
type=int,
help='make a spatio-temporal detection prediction every n frames')
parser.add_argument(
'--output-stepsize',
default=1,
type=int,
help=('show one frame per n frames in the demo, we should have: '
'predict_stepsize % output_stepsize == 0'))
parser.add_argument(
'--output-fps',
default=24,
type=int,
help='the fps of demo video output')
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
args = parser.parse_args()
return args
def frame_extraction(video_path):
"""Extract frames given video_path.
Args:
video_path (str): The video_path.
"""
# Load the video, extract frames into ./tmp/video_name
target_dir = osp.join('./tmp', osp.basename(osp.splitext(video_path)[0]))
# target_dir = osp.join('./tmp','spatial_skeleton_dir')
os.makedirs(target_dir, exist_ok=True)
# Should be able to handle videos up to several hours
frame_tmpl = osp.join(target_dir, 'img_{:06d}.jpg')
vid = cv2.VideoCapture(video_path)
frames = []
frame_paths = []
flag, frame = vid.read()
cnt = 0
while flag:
frames.append(frame)
frame_path = frame_tmpl.format(cnt + 1)
frame_paths.append(frame_path)
cv2.imwrite(frame_path, frame)
cnt += 1
flag, frame = vid.read()
return frame_paths, frames
def detection_inference(args, frame_paths):
"""Detect human boxes given frame paths.
Args:
args (argparse.Namespace): The arguments.
frame_paths (list[str]): The paths of frames to do detection inference.
Returns:
list[np.ndarray]: The human detection results.
"""
model = init_detector(args.det_config, args.det_checkpoint, args.device)
assert model.CLASSES[0] == 'person', ('We require you to use a detector '
'trained on COCO')
results = []
print('Performing Human Detection for each frame')
prog_bar = mmcv.ProgressBar(len(frame_paths))
for frame_path in frame_paths:
result = inference_detector(model, frame_path)
# We only keep human detections with score larger than det_score_thr
result = result[0][result[0][:, 4] >= args.det_score_thr]
results.append(result)
prog_bar.update()
return results
def pose_inference(args, frame_paths, det_results):
model = init_pose_model(args.pose_config, args.pose_checkpoint,
args.device)
ret = []
print('Performing Human Pose Estimation for each frame')
prog_bar = mmcv.ProgressBar(len(frame_paths))
for f, d in zip(frame_paths, det_results):
# Align input format
d = [dict(bbox=x) for x in list(d)]
pose = inference_top_down_pose_model(model, f, d, format='xyxy')[0]
ret.append(pose)
prog_bar.update()
return ret
def load_label_map(file_path):
"""Load Label Map.
Args:
file_path (str): The file path of label map.
Returns:
dict: The label map (int -> label name).
"""
lines = open(file_path).readlines()
lines = [x.strip().split(': ') for x in lines]
return {int(x[0]): x[1] for x in lines}
def abbrev(name):
"""Get the abbreviation of label name:
'take (an object) from (a person)' -> 'take ... from ...'
"""
while name.find('(') != -1:
st, ed = name.find('('), name.find(')')
name = name[:st] + '...' + name[ed + 1:]
return name
def pack_result(human_detection, result, img_h, img_w):
"""Short summary.
Args:
human_detection (np.ndarray): Human detection result.
result (list[list[tuple]]): The predicted (label, score) pairs of
each human proposal.
img_h (int): The image height.
img_w (int): The image width.
Returns:
list[tuple] | None: A list of (proposal, label names, label scores)
tuples, or None when ``result`` is None.
"""
human_detection[:, 0::2] /= img_w
human_detection[:, 1::2] /= img_h
results = []
if result is None:
return None
for prop, res in zip(human_detection, result):
res.sort(key=lambda x: -x[1])
results.append(
(prop.data.cpu().numpy(), [x[0] for x in res], [x[1]
for x in res]))
return results
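# The helper below expands a person box to a square region scaled by `ratio`
# and clipped to the image bounds; it is used for pose matching in
# `skeleton_based_stdet`.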
def expand_bbox(bbox, h, w, ratio=1.25):
x1, y1, x2, y2 = bbox
center_x = (x1 + x2) // 2
center_y = (y1 + y2) // 2
width = x2 - x1
height = y2 - y1
square_l = max(width, height)
new_width = new_height = square_l * ratio
new_x1 = max(0, int(center_x - new_width / 2))
new_x2 = min(int(center_x + new_width / 2), w)
new_y1 = max(0, int(center_y - new_height / 2))
new_y2 = min(int(center_y + new_height / 2), h)
return (new_x1, new_y1, new_x2, new_y2)
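# The helper below computes the intersection-over-union of two boxes in
# (x1, y1, x2, y2) format, e.g. cal_iou((0, 0, 2, 2), (1, 1, 3, 3)) == 1 / 7.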
def cal_iou(box1, box2):
xmin1, ymin1, xmax1, ymax1 = box1
xmin2, ymin2, xmax2, ymax2 = box2
s1 = (xmax1 - xmin1) * (ymax1 - ymin1)
s2 = (xmax2 - xmin2) * (ymax2 - ymin2)
xmin = max(xmin1, xmin2)
ymin = max(ymin1, ymin2)
xmax = min(xmax1, xmax2)
ymax = min(ymax1, ymax2)
w = max(0, xmax - xmin)
h = max(0, ymax - ymin)
intersect = w * h
union = s1 + s2 - intersect
iou = intersect / union
return iou
def skeleton_based_action_recognition(args, pose_results, num_frame, h, w):
fake_anno = dict(
frame_dict='',
label=-1,
img_shape=(h, w),
origin_shape=(h, w),
start_index=0,
modality='Pose',
total_frames=num_frame)
num_person = max([len(x) for x in pose_results])
num_keypoint = 17
keypoint = np.zeros((num_person, num_frame, num_keypoint, 2),
dtype=np.float16)
keypoint_score = np.zeros((num_person, num_frame, num_keypoint),
dtype=np.float16)
for i, poses in enumerate(pose_results):
for j, pose in enumerate(poses):
pose = pose['keypoints']
keypoint[j, i] = pose[:, :2]
keypoint_score[j, i] = pose[:, 2]
fake_anno['keypoint'] = keypoint
fake_anno['keypoint_score'] = keypoint_score
label_map = [x.strip() for x in open(args.label_map).readlines()]
num_class = len(label_map)
skeleton_config = mmcv.Config.fromfile(args.skeleton_config)
skeleton_config.model.cls_head.num_classes = num_class # for K400 dataset
skeleton_pipeline = Compose(skeleton_config.test_pipeline)
skeleton_imgs = skeleton_pipeline(fake_anno)['imgs'][None]
skeleton_imgs = skeleton_imgs.to(args.device)
# Build skeleton-based recognition model
skeleton_model = build_model(skeleton_config.model)
load_checkpoint(
skeleton_model, args.skeleton_checkpoint, map_location='cpu')
skeleton_model.to(args.device)
skeleton_model.eval()
with torch.no_grad():
output = skeleton_model(return_loss=False, imgs=skeleton_imgs)
action_idx = np.argmax(output)
skeleton_action_result = label_map[
action_idx] # skeleton-based action result for the whole video
return skeleton_action_result
def rgb_based_action_recognition(args):
rgb_config = mmcv.Config.fromfile(args.rgb_config)
rgb_config.model.backbone.pretrained = None
rgb_model = build_recognizer(
rgb_config.model, test_cfg=rgb_config.get('test_cfg'))
load_checkpoint(rgb_model, args.rgb_checkpoint, map_location='cpu')
rgb_model.cfg = rgb_config
rgb_model.to(args.device)
rgb_model.eval()
action_results = inference_recognizer(
rgb_model, args.video, label_path=args.label_map)
rgb_action_result = action_results[0][0]
label_map = [x.strip() for x in open(args.label_map).readlines()]
return label_map[rgb_action_result]
def skeleton_based_stdet(args, label_map, human_detections, pose_results,
num_frame, clip_len, frame_interval, h, w):
window_size = clip_len * frame_interval
assert clip_len % 2 == 0, 'We would like to have an even clip_len'
timestamps = np.arange(window_size // 2, num_frame + 1 - window_size // 2,
args.predict_stepsize)
skeleton_config = mmcv.Config.fromfile(args.skeleton_config)
num_class = max(label_map.keys()) + 1 # for AVA dataset (81)
skeleton_config.model.cls_head.num_classes = num_class
skeleton_pipeline = Compose(skeleton_config.test_pipeline)
skeleton_stdet_model = build_model(skeleton_config.model)
load_checkpoint(
skeleton_stdet_model,
args.skeleton_stdet_checkpoint,
map_location='cpu')
skeleton_stdet_model.to(args.device)
skeleton_stdet_model.eval()
skeleton_predictions = []
print('Performing SpatioTemporal Action Detection for each clip')
prog_bar = mmcv.ProgressBar(len(timestamps))
for timestamp in timestamps:
proposal = human_detections[timestamp - 1]
if proposal.shape[0] == 0: # no people detected
skeleton_predictions.append(None)
continue
start_frame = timestamp - (clip_len // 2 - 1) * frame_interval
frame_inds = start_frame + np.arange(0, window_size, frame_interval)
frame_inds = list(frame_inds - 1)
num_frame = len(frame_inds) # 30
pose_result = [pose_results[ind] for ind in frame_inds]
skeleton_prediction = []
for i in range(proposal.shape[0]): # num_person
skeleton_prediction.append([])
fake_anno = dict(
frame_dict='',
label=-1,
img_shape=(h, w),
origin_shape=(h, w),
start_index=0,
modality='Pose',
total_frames=num_frame)
num_person = 1
num_keypoint = 17
keypoint = np.zeros(
(num_person, num_frame, num_keypoint, 2)) # M T V 2
keypoint_score = np.zeros(
(num_person, num_frame, num_keypoint)) # M T V
# pose matching
person_bbox = proposal[i][:4]
area = expand_bbox(person_bbox, h, w)
for j, poses in enumerate(pose_result): # num_frame
max_iou = float('-inf')
index = -1
if len(poses) == 0:
continue
for k, per_pose in enumerate(poses):
iou = cal_iou(per_pose['bbox'][:4], area)
if max_iou < iou:
index = k
max_iou = iou
keypoint[0, j] = poses[index]['keypoints'][:, :2]
keypoint_score[0, j] = poses[index]['keypoints'][:, 2]
fake_anno['keypoint'] = keypoint
fake_anno['keypoint_score'] = keypoint_score
skeleton_imgs = skeleton_pipeline(fake_anno)['imgs'][None]
skeleton_imgs = skeleton_imgs.to(args.device)
with torch.no_grad():
output = skeleton_stdet_model(
return_loss=False, imgs=skeleton_imgs)
output = output[0]
for k in range(len(output)): # 81
if k not in label_map:
continue
if output[k] > args.action_score_thr:
skeleton_prediction[i].append(
(label_map[k], output[k]))
skeleton_predictions.append(skeleton_prediction)
prog_bar.update()
return timestamps, skeleton_predictions
def rgb_based_stdet(args, frames, label_map, human_detections, w, h, new_w,
new_h, w_ratio, h_ratio):
rgb_stdet_config = mmcv.Config.fromfile(args.rgb_stdet_config)
rgb_stdet_config.merge_from_dict(args.cfg_options)
val_pipeline = rgb_stdet_config.data.val.pipeline
sampler = [x for x in val_pipeline if x['type'] == 'SampleAVAFrames'][0]
clip_len, frame_interval = sampler['clip_len'], sampler['frame_interval']
assert clip_len % 2 == 0, 'We would like to have an even clip_len'
window_size = clip_len * frame_interval
num_frame = len(frames)
timestamps = np.arange(window_size // 2, num_frame + 1 - window_size // 2,
args.predict_stepsize)
# Get img_norm_cfg
img_norm_cfg = rgb_stdet_config['img_norm_cfg']
if 'to_rgb' not in img_norm_cfg and 'to_bgr' in img_norm_cfg:
to_bgr = img_norm_cfg.pop('to_bgr')
img_norm_cfg['to_rgb'] = to_bgr
img_norm_cfg['mean'] = np.array(img_norm_cfg['mean'])
img_norm_cfg['std'] = np.array(img_norm_cfg['std'])
# Build STDET model
try:
# In our spatiotemporal detection demo, different actions should have
# the same number of bboxes.
rgb_stdet_config['model']['test_cfg']['rcnn']['action_thr'] = .0
except KeyError:
pass
rgb_stdet_config.model.backbone.pretrained = None
rgb_stdet_model = build_detector(
rgb_stdet_config.model, test_cfg=rgb_stdet_config.get('test_cfg'))
load_checkpoint(
rgb_stdet_model, args.rgb_stdet_checkpoint, map_location='cpu')
rgb_stdet_model.to(args.device)
rgb_stdet_model.eval()
predictions = []
print('Performing SpatioTemporal Action Detection for each clip')
prog_bar = mmcv.ProgressBar(len(timestamps))
for timestamp in timestamps:
proposal = human_detections[timestamp - 1]
if proposal.shape[0] == 0:
predictions.append(None)
continue
start_frame = timestamp - (clip_len // 2 - 1) * frame_interval
frame_inds = start_frame + np.arange(0, window_size, frame_interval)
frame_inds = list(frame_inds - 1)
imgs = [frames[ind].astype(np.float32) for ind in frame_inds]
_ = [mmcv.imnormalize_(img, **img_norm_cfg) for img in imgs]
# THWC -> CTHW -> 1CTHW
input_array = np.stack(imgs).transpose((3, 0, 1, 2))[np.newaxis]
input_tensor = torch.from_numpy(input_array).to(args.device)
with torch.no_grad():
result = rgb_stdet_model(
return_loss=False,
img=[input_tensor],
img_metas=[[dict(img_shape=(new_h, new_w))]],
proposals=[[proposal]])
result = result[0]
prediction = []
# N proposals
for i in range(proposal.shape[0]):
prediction.append([])
# Perform action score thr
for i in range(len(result)): # 80
if i + 1 not in label_map:
continue
for j in range(proposal.shape[0]):
if result[i][j, 4] > args.action_score_thr:
prediction[j].append((label_map[i + 1], result[i][j,
4]))
predictions.append(prediction)
prog_bar.update()
return timestamps, predictions
def main():
args = parse_args()
frame_paths, original_frames = frame_extraction(args.video)
num_frame = len(frame_paths)
h, w, _ = original_frames[0].shape
# Get Human detection results and pose results
human_detections = detection_inference(args, frame_paths)
pose_results = None
if args.use_skeleton_recog or args.use_skeleton_stdet:
pose_results = pose_inference(args, frame_paths, human_detections)
# resize frames to shortside 256
new_w, new_h = mmcv.rescale_size((w, h), (256, np.Inf))
frames = [mmcv.imresize(img, (new_w, new_h)) for img in original_frames]
w_ratio, h_ratio = new_w / w, new_h / h
# Load spatio-temporal detection label_map
stdet_label_map = load_label_map(args.label_map_stdet)
rgb_stdet_config = mmcv.Config.fromfile(args.rgb_stdet_config)
rgb_stdet_config.merge_from_dict(args.cfg_options)
try:
if rgb_stdet_config['data']['train']['custom_classes'] is not None:
stdet_label_map = {
id + 1: stdet_label_map[cls]
for id, cls in enumerate(rgb_stdet_config['data']['train']
['custom_classes'])
}
except KeyError:
pass
action_result = None
if args.use_skeleton_recog:
print('Use skeleton-based recognition')
action_result = skeleton_based_action_recognition(
args, pose_results, num_frame, h, w)
else:
print('Use rgb-based recognition')
action_result = rgb_based_action_recognition(args)
stdet_preds = None
if args.use_skeleton_stdet:
print('Use skeleton-based SpatioTemporal Action Detection')
clip_len, frame_interval = 30, 1
timestamps, stdet_preds = skeleton_based_stdet(args, stdet_label_map,
human_detections,
pose_results, num_frame,
clip_len,
frame_interval, h, w)
for i in range(len(human_detections)):
det = human_detections[i]
det[:, 0:4:2] *= w_ratio
det[:, 1:4:2] *= h_ratio
human_detections[i] = torch.from_numpy(det[:, :4]).to(args.device)
else:
print('Use rgb-based SpatioTemporal Action Detection')
for i in range(len(human_detections)):
det = human_detections[i]
det[:, 0:4:2] *= w_ratio
det[:, 1:4:2] *= h_ratio
human_detections[i] = torch.from_numpy(det[:, :4]).to(args.device)
timestamps, stdet_preds = rgb_based_stdet(args, frames,
stdet_label_map,
human_detections, w, h,
new_w, new_h, w_ratio,
h_ratio)
stdet_results = []
for timestamp, prediction in zip(timestamps, stdet_preds):
human_detection = human_detections[timestamp - 1]
stdet_results.append(
pack_result(human_detection, prediction, new_h, new_w))
def dense_timestamps(timestamps, n):
"""Make it nx frames."""
old_frame_interval = (timestamps[1] - timestamps[0])
start = timestamps[0] - old_frame_interval / n * (n - 1) / 2
new_frame_inds = np.arange(
len(timestamps) * n) * old_frame_interval / n + start
return new_frame_inds.astype(np.int64)
dense_n = int(args.predict_stepsize / args.output_stepsize)
output_timestamps = dense_timestamps(timestamps, dense_n)
frames = [
cv2.imread(frame_paths[timestamp - 1])
for timestamp in output_timestamps
]
print('Performing visualization')
pose_model = init_pose_model(args.pose_config, args.pose_checkpoint,
args.device)
if args.use_skeleton_recog or args.use_skeleton_stdet:
pose_results = [
pose_results[timestamp - 1] for timestamp in output_timestamps
]
vis_frames = visualize(frames, stdet_results, pose_results, action_result,
pose_model)
vid = mpy.ImageSequenceClip([x[:, :, ::-1] for x in vis_frames],
fps=args.output_fps)
vid.write_videofile(args.out_filename)
tmp_frame_dir = osp.dirname(frame_paths[0])
shutil.rmtree(tmp_frame_dir)
if __name__ == '__main__':
main()
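# A hedged usage sketch for this video structuralize demo. The script name is
# illustrative; every argument has a default in `parse_args()` (including the
# video and output paths shown below), so the flags mainly select the
# skeleton-based branches:
#
#   python demo/demo_video_structuralize.py \
#       --use-skeleton-stdet --use-skeleton-recog \
#       --video demo/test_video_structuralize.mp4 \
#       --out-filename demo/test_stdet_recognition_output.mp4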
# Copyright (c) OpenMMLab. All rights reserved.
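# This file appears to be the human-detection config that the demos above
# reference as 'demo/faster_rcnn_r50_fpn_2x_coco.py': a Faster R-CNN detector
# with a ResNet-50 FPN backbone trained on COCO, used only to produce person
# proposals.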
# model config
model = dict(
type='FasterRCNN',
pretrained='torchvision://resnet50',
backbone=dict(
type='ResNet',
depth=50,
num_stages=4,
out_indices=(0, 1, 2, 3),
frozen_stages=1,
norm_cfg=dict(type='BN', requires_grad=True),
norm_eval=True,
style='pytorch'),
neck=dict(
type='FPN',
in_channels=[256, 512, 1024, 2048],
out_channels=256,
num_outs=5),
rpn_head=dict(
type='RPNHead',
in_channels=256,
feat_channels=256,
anchor_generator=dict(
type='AnchorGenerator',
scales=[8],
ratios=[0.5, 1.0, 2.0],
strides=[4, 8, 16, 32, 64]),
bbox_coder=dict(
type='DeltaXYWHBBoxCoder',
target_means=[.0, .0, .0, .0],
target_stds=[1.0, 1.0, 1.0, 1.0]),
loss_cls=dict(
type='CrossEntropyLoss', use_sigmoid=True, loss_weight=1.0),
loss_bbox=dict(type='L1Loss', loss_weight=1.0)),
roi_head=dict(
type='StandardRoIHead',
bbox_roi_extractor=dict(
type='SingleRoIExtractor',
roi_layer=dict(type='RoIAlign', output_size=7, sampling_ratio=0),
out_channels=256,
featmap_strides=[4, 8, 16, 32]),
bbox_head=dict(
type='Shared2FCBBoxHead',
in_channels=256,
fc_out_channels=1024,
roi_feat_size=7,
num_classes=80,
bbox_coder=dict(
type='DeltaXYWHBBoxCoder',
target_means=[0., 0., 0., 0.],
target_stds=[0.1, 0.1, 0.2, 0.2]),
reg_class_agnostic=False,
loss_cls=dict(
type='CrossEntropyLoss', use_sigmoid=False, loss_weight=1.0),
loss_bbox=dict(type='L1Loss', loss_weight=1.0))),
# model training and testing settings
train_cfg=dict(
rpn=dict(
assigner=dict(
type='MaxIoUAssigner',
pos_iou_thr=0.7,
neg_iou_thr=0.3,
min_pos_iou=0.3,
match_low_quality=True,
ignore_iof_thr=-1),
sampler=dict(
type='RandomSampler',
num=256,
pos_fraction=0.5,
neg_pos_ub=-1,
add_gt_as_proposals=False),
allowed_border=-1,
pos_weight=-1,
debug=False),
rpn_proposal=dict(
nms_pre=2000,
max_per_img=1000,
nms=dict(type='nms', iou_threshold=0.7),
min_bbox_size=0),
rcnn=dict(
assigner=dict(
type='MaxIoUAssigner',
pos_iou_thr=0.5,
neg_iou_thr=0.5,
min_pos_iou=0.5,
match_low_quality=False,
ignore_iof_thr=-1),
sampler=dict(
type='RandomSampler',
num=512,
pos_fraction=0.25,
neg_pos_ub=-1,
add_gt_as_proposals=True),
pos_weight=-1,
debug=False)),
test_cfg=dict(
rpn=dict(
nms_pre=1000,
max_per_img=1000,
nms=dict(type='nms', iou_threshold=0.7),
min_bbox_size=0),
rcnn=dict(
score_thr=0.05,
nms=dict(type='nms', iou_threshold=0.5),
max_per_img=100)))
# dataset config
dataset_type = 'CocoDataset'
data_root = 'data/coco/'
img_norm_cfg = dict(
mean=[123.675, 116.28, 103.53], std=[58.395, 57.12, 57.375], to_rgb=True)
train_pipeline = [
dict(type='LoadImageFromFile'),
dict(type='LoadAnnotations', with_bbox=True),
dict(type='Resize', img_scale=(1333, 800), keep_ratio=True),
dict(type='RandomFlip', flip_ratio=0.5),
dict(type='Normalize', **img_norm_cfg),
dict(type='Pad', size_divisor=32),
dict(type='DefaultFormatBundle'),
dict(type='Collect', keys=['img', 'gt_bboxes', 'gt_labels']),
]
test_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='MultiScaleFlipAug',
img_scale=(1333, 800),
flip=False,
transforms=[
dict(type='Resize', keep_ratio=True),
dict(type='RandomFlip'),
dict(type='Normalize', **img_norm_cfg),
dict(type='Pad', size_divisor=32),
dict(type='ImageToTensor', keys=['img']),
dict(type='Collect', keys=['img']),
])
]
data = dict(
samples_per_gpu=2,
workers_per_gpu=2,
train=dict(
type=dataset_type,
ann_file=data_root + 'annotations/instances_train2017.json',
img_prefix=data_root + 'train2017/',
pipeline=train_pipeline),
val=dict(
type=dataset_type,
ann_file=data_root + 'annotations/instances_val2017.json',
img_prefix=data_root + 'val2017/',
pipeline=test_pipeline),
test=dict(
type=dataset_type,
ann_file=data_root + 'annotations/instances_val2017.json',
img_prefix=data_root + 'val2017/',
pipeline=test_pipeline))
evaluation = dict(interval=1, metric='bbox')
# Schedule
# optimizer
optimizer = dict(type='SGD', lr=0.02, momentum=0.9, weight_decay=0.0001)
optimizer_config = dict(grad_clip=None)
# learning policy
lr_config = dict(
policy='step',
warmup='linear',
warmup_iters=500,
warmup_ratio=0.001,
step=[16, 22])
total_epochs = 24
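# step policy (default gamma 0.1 in mmcv): lr drops by 10x at epochs 16 and 22
# in this 24-epoch ("2x") schedule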
# runtime
checkpoint_config = dict(interval=1)
# yapf:disable
log_config = dict(
interval=50,
hooks=[
dict(type='TextLoggerHook'),
])
# yapf:enable
dist_params = dict(backend='nccl')
log_level = 'INFO'
load_from = None
resume_from = None
workflow = [('train', 1)]
jf7RDuUTrsQ 300 325
JTlatknwOrY 301 233
8UxlDNur-Z0 300 262
y9r115bgfNk 300 320
ZnIDviwA8CE 300 244
c8ln_nWYMyM 300 333
9GFfKVeoGm0 300 98
F5Y_gGsg4x8 300 193
AuqIu3x_lhY 300 36
1Hi5GMotrjs 300 26
czhL0iDbNT8 300 46
DYpTE_n-Wvk 177 208
R-xmgefs-M4 300 101
KPP2qRzMdos 300 131
PmgfU9ocx5A 300 193
GI7nIyMEQi4 300 173
A8TIWMvJVDU 300 72
ustVqWMM56c 300 289
03dk7mneDU0 300 254
jqkyelS4GJk 300 279
a58tBGuDIg0 231 382
5l1ajLjqaPo 300 226
-5wLopwbGX0 300 132
NUG7kwJ-614 300 103
wHUvw_R2iv8 300 97
44Mak5_s6Fk 300 256
y5vsk8Mj-3w 300 77
TEj_A_BC-aU 300 393
fUdu6hpMt_c 299 40
C5Z1sRArUR0 300 254
-orecnYvpNw 300 284
Urmbp1ulIXI 300 319
bLgdi4w7OAk 299 36
cVv_XMw4W2U 300 27
dV8JmKwDUzM 300 312
yZ9hIqW4bRc 300 239
9ykbMdR9Jss 213 257
G8fEnqIOkiA 300 158
6P2eVJ-Qp1g 300 131
Y-acp_jXG1Q 302 315
xthWPdx21r8 301 62
LExCUx4STW0 300 9
p2UMwzWsY0U 300 248
c0UI7f3Plro 300 383
1MmjE51PeIE 300 93
OU5dJpNHATk 300 342
38Uv6dbQkWc 281 44
5ZNdkbmv274 300 59
DrSL3Uddj6s 300 283
aNJ1-bvRox8 175 384
b5U7A_crvE0 194 377
xeWO9Bl9aWA 300 86
Zy8Ta83mrXo 300 223
AXnDRH7o2DQ 300 146
fTPDXmcygjw 300 11
EhRxb8-cNzQ 164 325
iO8RYYQzNiE 299 191
XbCncZcXuTI 300 55
pSCunaRn45A 300 265
UqI--TBQRgg 300 165
yD42KW6cm-A 300 186
VseX7hoxhbM 300 61
1FEcfy-moBM 300 8
BUT8oefH9Nw 300 120
-49tMSUTnZg 300 227
cZKPTt_FcFs 300 85
fiKJm0eavfw 300 323
gJcVljRRxGE 302 87
de1rSoht9t4 300 253
UAIJnI7fQYo 300 284
c4eIDxmVmCw 300 95
3LGce3efz7M 300 332
EC8iyn_q-NM 300 92
eo15donXwmY 300 351
NsG31u7Pd2Q 300 87
ILkPWpZYlPE 300 137
n5ZHSJRZl1U 300 338
UoQE44FEqLQ 300 260
5I-4meP_5wY 300 185
udLMOf77S3U 300 209
a4Ye18Mnblk 262 172
QbDMgHWwt_s 236 395
S6iAYBBMnwk 300 267
DNMfmNV8Uug 300 131
AJdp07pp43c 300 293
tVuop87KbDY 300 103
o79s5eOAF-c 300 246
dMt_nuBNdeY 300 168
RJU9NV1R4Fw 300 128
Zhux7Vy-hHc 300 82
47Cj6jwQKjo 300 228
a7Mc-0lwAuE 300 129
taZtEzvkg3M 300 264
bVDZohQJhBI 240 129
sBJk5li0O5o 216 154
DQUNZmbQI_g 300 29
-zpKHNrNsn4 300 244
Dcz0r8q-sx0 300 249
hfRKTH9pOMA 165 116
8CdUbOHDtes 300 222
# Copyright (c) OpenMMLab. All rights reserved.
log_level = 'INFO'
load_from = None
resume_from = None
dist_params = dict(backend='nccl')
workflow = [('train', 1)]
checkpoint_config = dict(interval=10)
evaluation = dict(interval=10, metric='mAP', key_indicator='AP')
optimizer = dict(
type='Adam',
lr=5e-4,
)
optimizer_config = dict(grad_clip=None)
# learning policy
lr_config = dict(
policy='step',
warmup='linear',
warmup_iters=500,
warmup_ratio=0.001,
step=[170, 200])
total_epochs = 210
log_config = dict(
interval=50,
hooks=[
dict(type='TextLoggerHook'),
# dict(type='TensorboardLoggerHook')
])
channel_cfg = dict(
num_output_channels=17,
dataset_joints=17,
dataset_channel=[
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
],
inference_channel=[
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16
])
# model settings
model = dict(
type='TopDown',
pretrained='https://download.openmmlab.com/mmpose/'
'pretrain_models/hrnet_w32-36af842e.pth',
backbone=dict(
type='HRNet',
in_channels=3,
extra=dict(
stage1=dict(
num_modules=1,
num_branches=1,
block='BOTTLENECK',
num_blocks=(4, ),
num_channels=(64, )),
stage2=dict(
num_modules=1,
num_branches=2,
block='BASIC',
num_blocks=(4, 4),
num_channels=(32, 64)),
stage3=dict(
num_modules=4,
num_branches=3,
block='BASIC',
num_blocks=(4, 4, 4),
num_channels=(32, 64, 128)),
stage4=dict(
num_modules=3,
num_branches=4,
block='BASIC',
num_blocks=(4, 4, 4, 4),
num_channels=(32, 64, 128, 256))),
),
keypoint_head=dict(
type='TopdownHeatmapSimpleHead',
in_channels=32,
out_channels=channel_cfg['num_output_channels'],
num_deconv_layers=0,
extra=dict(final_conv_kernel=1, ),
loss_keypoint=dict(type='JointsMSELoss', use_target_weight=True)),
train_cfg=dict(),
test_cfg=dict(
flip_test=True,
post_process='default',
shift_heatmap=True,
modulate_kernel=11))
data_cfg = dict(
image_size=[192, 256],
heatmap_size=[48, 64],
num_output_channels=channel_cfg['num_output_channels'],
num_joints=channel_cfg['dataset_joints'],
dataset_channel=channel_cfg['dataset_channel'],
inference_channel=channel_cfg['inference_channel'],
soft_nms=False,
nms_thr=1.0,
oks_thr=0.9,
vis_thr=0.2,
use_gt_bbox=False,
det_bbox_thr=0.0,
bbox_file='data/coco/person_detection_results/'
'COCO_val2017_detections_AP_H_56_person.json',
)
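# Note: heatmap_size (48, 64) is image_size (192, 256) divided by 4, the usual
# output stride of top-down heatmap heads.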
train_pipeline = [
dict(type='LoadImageFromFile'),
dict(type='TopDownRandomFlip', flip_prob=0.5),
dict(
type='TopDownHalfBodyTransform',
num_joints_half_body=8,
prob_half_body=0.3),
dict(
type='TopDownGetRandomScaleRotation', rot_factor=40, scale_factor=0.5),
dict(type='TopDownAffine'),
dict(type='ToTensor'),
dict(
type='NormalizeTensor',
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
dict(type='TopDownGenerateTarget', sigma=2),
dict(
type='Collect',
keys=['img', 'target', 'target_weight'],
meta_keys=[
'image_file', 'joints_3d', 'joints_3d_visible', 'center', 'scale',
'rotation', 'bbox_score', 'flip_pairs'
]),
]
val_pipeline = [
dict(type='LoadImageFromFile'),
dict(type='TopDownGetBboxCenterScale', padding=1.25),
dict(type='TopDownAffine'),
dict(type='ToTensor'),
dict(
type='NormalizeTensor',
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
dict(
type='Collect',
keys=['img'],
meta_keys=[
'image_file', 'center', 'scale', 'rotation', 'bbox_score',
'flip_pairs'
]),
]
test_pipeline = val_pipeline
data_root = 'data/coco'
data = dict(
samples_per_gpu=64,
workers_per_gpu=2,
val_dataloader=dict(samples_per_gpu=32),
test_dataloader=dict(samples_per_gpu=32),
train=dict(
type='TopDownCocoDataset',
ann_file=f'{data_root}/annotations/person_keypoints_train2017.json',
img_prefix=f'{data_root}/train2017/',
data_cfg=data_cfg,
pipeline=train_pipeline),
val=dict(
type='TopDownCocoDataset',
ann_file=f'{data_root}/annotations/person_keypoints_val2017.json',
img_prefix=f'{data_root}/val2017/',
data_cfg=data_cfg,
pipeline=val_pipeline),
test=dict(
type='TopDownCocoDataset',
ann_file=f'{data_root}/annotations/person_keypoints_val2017.json',
img_prefix=f'{data_root}/val2017/',
data_cfg=data_cfg,
pipeline=val_pipeline),
)
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import json
import random
from collections import deque
from operator import itemgetter
import cv2
import mmcv
import numpy as np
import torch
from mmcv import Config, DictAction
from mmcv.parallel import collate, scatter
from mmaction.apis import init_recognizer
from mmaction.datasets.pipelines import Compose
FONTFACE = cv2.FONT_HERSHEY_COMPLEX_SMALL
FONTSCALE = 1
THICKNESS = 1
LINETYPE = 1
EXCLUED_STEPS = [
'OpenCVInit', 'OpenCVDecode', 'DecordInit', 'DecordDecode', 'PyAVInit',
'PyAVDecode', 'RawFrameDecode'
]
def parse_args():
parser = argparse.ArgumentParser(
description='MMAction2 predict different labels in a long video demo')
parser.add_argument('config', help='test config file path')
parser.add_argument('checkpoint', help='checkpoint file/url')
parser.add_argument('video_path', help='video file/url')
parser.add_argument('label', help='label file')
parser.add_argument('out_file', help='output result file in video/json')
parser.add_argument(
'--input-step',
type=int,
default=1,
help='input step for sampling frames')
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
parser.add_argument(
'--threshold',
type=float,
default=0.01,
help='recognition score threshold')
parser.add_argument(
'--stride',
type=float,
default=0,
help=('the prediction stride equals to stride * sample_length '
'(sample_length indicates the size of temporal window from '
'which you sample frames, which equals to '
'clip_len x frame_interval), if set as 0, the '
'prediction stride is 1'))
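    # For example (hypothetical numbers): with clip_len=8 and frame_interval=8,
    # sample_length is 64 frames; ``--stride 0.5`` then shifts the temporal
    # window by int(64 * 0.5) = 32 frames between consecutive predictions.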
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
parser.add_argument(
'--label-color',
nargs='+',
type=int,
default=(255, 255, 255),
help='font color (B, G, R) of the labels in output video')
parser.add_argument(
'--msg-color',
nargs='+',
type=int,
default=(128, 128, 128),
help='font color (B, G, R) of the messages in output video')
args = parser.parse_args()
return args
def show_results_video(result_queue,
text_info,
thr,
msg,
frame,
video_writer,
label_color=(255, 255, 255),
msg_color=(128, 128, 128)):
if len(result_queue) != 0:
text_info = {}
results = result_queue.popleft()
for i, result in enumerate(results):
selected_label, score = result
if score < thr:
break
location = (0, 40 + i * 20)
text = selected_label + ': ' + str(round(score, 2))
text_info[location] = text
cv2.putText(frame, text, location, FONTFACE, FONTSCALE,
label_color, THICKNESS, LINETYPE)
elif len(text_info):
for location, text in text_info.items():
cv2.putText(frame, text, location, FONTFACE, FONTSCALE,
label_color, THICKNESS, LINETYPE)
else:
cv2.putText(frame, msg, (0, 40), FONTFACE, FONTSCALE, msg_color,
THICKNESS, LINETYPE)
video_writer.write(frame)
return text_info
def get_results_json(result_queue, text_info, thr, msg, ind, out_json):
if len(result_queue) != 0:
text_info = {}
results = result_queue.popleft()
for i, result in enumerate(results):
selected_label, score = result
if score < thr:
break
text_info[i + 1] = selected_label + ': ' + str(round(score, 2))
out_json[ind] = text_info
elif len(text_info):
out_json[ind] = text_info
else:
out_json[ind] = msg
return text_info, out_json
def show_results(model, data, label, args):
frame_queue = deque(maxlen=args.sample_length)
result_queue = deque(maxlen=1)
cap = cv2.VideoCapture(args.video_path)
num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
msg = 'Preparing action recognition ...'
text_info = {}
out_json = {}
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
frame_size = (frame_width, frame_height)
ind = 0
video_writer = None if args.out_file.endswith('.json') \
else cv2.VideoWriter(args.out_file, fourcc, fps, frame_size)
prog_bar = mmcv.ProgressBar(num_frames)
backup_frames = []
while ind < num_frames:
ind += 1
prog_bar.update()
ret, frame = cap.read()
if frame is None:
# drop the frame when encountering None
continue
backup_frames.append(np.array(frame)[:, :, ::-1])
if ind == args.sample_length:
# provide a quick show at the beginning
frame_queue.extend(backup_frames)
backup_frames = []
elif ((len(backup_frames) == args.input_step
and ind > args.sample_length) or ind == num_frames):
# pick a frame from the backup
# when the backup is full or we reach the last frame
chosen_frame = random.choice(backup_frames)
backup_frames = []
frame_queue.append(chosen_frame)
ret, scores = inference(model, data, args, frame_queue)
if ret:
num_selected_labels = min(len(label), 5)
scores_tuples = tuple(zip(label, scores))
scores_sorted = sorted(
scores_tuples, key=itemgetter(1), reverse=True)
results = scores_sorted[:num_selected_labels]
result_queue.append(results)
if args.out_file.endswith('.json'):
text_info, out_json = get_results_json(result_queue, text_info,
args.threshold, msg, ind,
out_json)
else:
text_info = show_results_video(result_queue, text_info,
args.threshold, msg, frame,
video_writer, args.label_color,
args.msg_color)
cap.release()
cv2.destroyAllWindows()
if args.out_file.endswith('.json'):
with open(args.out_file, 'w') as js:
json.dump(out_json, js)
def inference(model, data, args, frame_queue):
if len(frame_queue) != args.sample_length:
# Do not run inference until there are enough frames
return False, None
cur_windows = list(np.array(frame_queue))
if data['img_shape'] is None:
data['img_shape'] = frame_queue[0].shape[:2]
cur_data = data.copy()
cur_data['imgs'] = cur_windows
cur_data = args.test_pipeline(cur_data)
cur_data = collate([cur_data], samples_per_gpu=1)
if next(model.parameters()).is_cuda:
cur_data = scatter(cur_data, [args.device])[0]
with torch.no_grad():
scores = model(return_loss=False, **cur_data)[0]
if args.stride > 0:
pred_stride = int(args.sample_length * args.stride)
for _ in range(pred_stride):
frame_queue.popleft()
# for case ``args.stride=0``
# deque will automatically popleft one element
return True, scores
def main():
args = parse_args()
args.device = torch.device(args.device)
cfg = Config.fromfile(args.config)
cfg.merge_from_dict(args.cfg_options)
model = init_recognizer(cfg, args.checkpoint, device=args.device)
data = dict(img_shape=None, modality='RGB', label=-1)
with open(args.label, 'r') as f:
label = [line.strip() for line in f]
# prepare test pipeline from non-camera pipeline
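# Frames are fed directly from the video capture, so the frame-sampling and
# decoding steps of the offline test pipeline are removed below; only their
# clip_len / num_clips parameters are kept to size the temporal window.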
cfg = model.cfg
sample_length = 0
pipeline = cfg.data.test.pipeline
pipeline_ = pipeline.copy()
for step in pipeline:
if 'SampleFrames' in step['type']:
sample_length = step['clip_len'] * step['num_clips']
data['num_clips'] = step['num_clips']
data['clip_len'] = step['clip_len']
pipeline_.remove(step)
if step['type'] in EXCLUED_STEPS:
# remove steps that decode frames
pipeline_.remove(step)
test_pipeline = Compose(pipeline_)
assert sample_length > 0
args.sample_length = sample_length
args.test_pipeline = test_pipeline
show_results(model, data, label, args)
if __name__ == '__main__':
main()
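# A hypothetical invocation (config/checkpoint/video paths are placeholders):
#   python long_video_demo.py CONFIG.py CHECKPOINT.pth long_video.mp4 \
#       label_map.txt out.mp4 --input-step 3 --threshold 0.2 --stride 0.5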
{
"cells": [
{
"cell_type": "code",
"execution_count": 6,
"id": "speaking-algebra",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import cv2\n",
"import os.path as osp\n",
"import decord\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"import urllib\n",
"import moviepy.editor as mpy\n",
"import random as rd\n",
"from mmpose.apis import vis_pose_result\n",
"from mmpose.models import TopDown\n",
"from mmcv import load, dump\n",
"\n",
"# We assume the annotation is already prepared\n",
"gym_train_ann_file = '../data/skeleton/gym_train.pkl'\n",
"gym_val_ann_file = '../data/skeleton/gym_val.pkl'\n",
"ntu60_xsub_train_ann_file = '../data/skeleton/ntu60_xsub_train.pkl'\n",
"ntu60_xsub_val_ann_file = '../data/skeleton/ntu60_xsub_val.pkl'"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "alive-consolidation",
"metadata": {},
"outputs": [],
"source": [
"FONTFACE = cv2.FONT_HERSHEY_DUPLEX\n",
"FONTSCALE = 0.6\n",
"FONTCOLOR = (255, 255, 255)\n",
"BGBLUE = (0, 119, 182)\n",
"THICKNESS = 1\n",
"LINETYPE = 1"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "ranging-conjunction",
"metadata": {},
"outputs": [],
"source": [
"def add_label(frame, label, BGCOLOR=BGBLUE):\n",
" threshold = 30\n",
" def split_label(label):\n",
" label = label.split()\n",
" lines, cline = [], ''\n",
" for word in label:\n",
" if len(cline) + len(word) < threshold:\n",
" cline = cline + ' ' + word\n",
" else:\n",
" lines.append(cline)\n",
" cline = word\n",
" if cline != '':\n",
" lines += [cline]\n",
" return lines\n",
" \n",
" if len(label) > 30:\n",
" label = split_label(label)\n",
" else:\n",
" label = [label]\n",
" label = ['Action: '] + label\n",
" \n",
" sizes = []\n",
" for line in label:\n",
" sizes.append(cv2.getTextSize(line, FONTFACE, FONTSCALE, THICKNESS)[0])\n",
" box_width = max([x[0] for x in sizes]) + 10\n",
" text_height = sizes[0][1]\n",
" box_height = len(sizes) * (text_height + 6)\n",
" \n",
" cv2.rectangle(frame, (0, 0), (box_width, box_height), BGCOLOR, -1)\n",
" for i, line in enumerate(label):\n",
" location = (5, (text_height + 6) * i + text_height + 3)\n",
" cv2.putText(frame, line, location, FONTFACE, FONTSCALE, FONTCOLOR, THICKNESS, LINETYPE)\n",
" return frame\n",
" \n",
"\n",
"def vis_skeleton(vid_path, anno, category_name=None, ratio=0.5):\n",
" vid = decord.VideoReader(vid_path)\n",
" frames = [x.asnumpy() for x in vid]\n",
" \n",
" h, w, _ = frames[0].shape\n",
" new_shape = (int(w * ratio), int(h * ratio))\n",
" frames = [cv2.resize(f, new_shape) for f in frames]\n",
" \n",
" assert len(frames) == anno['total_frames']\n",
" # The shape is N x T x K x 3\n",
" kps = np.concatenate([anno['keypoint'], anno['keypoint_score'][..., None]], axis=-1)\n",
" kps[..., :2] *= ratio\n",
" # Convert to T x N x K x 3\n",
" kps = kps.transpose([1, 0, 2, 3])\n",
" vis_frames = []\n",
"\n",
" # we need an instance of TopDown model, so build a minimal one\n",
" model = TopDown(backbone=dict(type='ShuffleNetV1'))\n",
"\n",
" for f, kp in zip(frames, kps):\n",
" bbox = np.zeros([0, 4], dtype=np.float32)\n",
" result = [dict(bbox=bbox, keypoints=k) for k in kp]\n",
" vis_frame = vis_pose_result(model, f, result)\n",
" \n",
" if category_name is not None:\n",
" vis_frame = add_label(vis_frame, category_name)\n",
" \n",
" vis_frames.append(vis_frame)\n",
" return vis_frames"
]
},
{
"cell_type": "code",
"execution_count": 55,
"id": "applied-humanity",
"metadata": {},
"outputs": [],
"source": [
"keypoint_pipeline = [\n",
" dict(type='PoseDecode'),\n",
" dict(type='PoseCompact', hw_ratio=1., allow_imgpad=True),\n",
" dict(type='Resize', scale=(-1, 64)),\n",
" dict(type='CenterCrop', crop_size=64),\n",
" dict(type='GeneratePoseTarget', sigma=0.6, use_score=True, with_kp=True, with_limb=False)\n",
"]\n",
"\n",
"limb_pipeline = [\n",
" dict(type='PoseDecode'),\n",
" dict(type='PoseCompact', hw_ratio=1., allow_imgpad=True),\n",
" dict(type='Resize', scale=(-1, 64)),\n",
" dict(type='CenterCrop', crop_size=64),\n",
" dict(type='GeneratePoseTarget', sigma=0.6, use_score=True, with_kp=False, with_limb=True)\n",
"]\n",
"\n",
"from mmaction.datasets.pipelines import Compose\n",
"def get_pseudo_heatmap(anno, flag='keypoint'):\n",
" assert flag in ['keypoint', 'limb']\n",
" pipeline = Compose(keypoint_pipeline if flag == 'keypoint' else limb_pipeline)\n",
" return pipeline(anno)['imgs']\n",
"\n",
"def vis_heatmaps(heatmaps, channel=-1, ratio=8):\n",
" # if channel is -1, draw all keypoints / limbs on the same map\n",
" import matplotlib.cm as cm\n",
" h, w, _ = heatmaps[0].shape\n",
" newh, neww = int(h * ratio), int(w * ratio)\n",
" \n",
" if channel == -1:\n",
" heatmaps = [np.max(x, axis=-1) for x in heatmaps]\n",
" cmap = cm.viridis\n",
" heatmaps = [(cmap(x)[..., :3] * 255).astype(np.uint8) for x in heatmaps]\n",
" heatmaps = [cv2.resize(x, (neww, newh)) for x in heatmaps]\n",
" return heatmaps"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "automatic-commons",
"metadata": {},
"outputs": [],
"source": [
"# Load GYM annotations\n",
"lines = list(urllib.request.urlopen('https://sdolivia.github.io/FineGym/resources/dataset/gym99_categories.txt'))\n",
"gym_categories = [x.decode().strip().split('; ')[-1] for x in lines]\n",
"gym_annos = load(gym_train_ann_file) + load(gym_val_ann_file)"
]
},
{
"cell_type": "code",
"execution_count": 74,
"id": "numerous-bristol",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--2021-04-25 22:18:53-- https://download.openmmlab.com/mmaction/posec3d/gym_samples.tar\n",
"Resolving download.openmmlab.com (download.openmmlab.com)... 124.160.145.22\n",
"Connecting to download.openmmlab.com (download.openmmlab.com)|124.160.145.22|:443... connected.\n",
"HTTP request sent, awaiting response... 200 OK\n",
"Length: 36300800 (35M) [application/x-tar]\n",
"Saving to: ‘gym_samples.tar’\n",
"\n",
"100%[======================================>] 36,300,800 11.5MB/s in 3.0s \n",
"\n",
"2021-04-25 22:18:58 (11.5 MB/s) - ‘gym_samples.tar’ saved [36300800/36300800]\n",
"\n"
]
}
],
"source": [
"# download sample videos of GYM\n",
"!wget https://download.openmmlab.com/mmaction/posec3d/gym_samples.tar\n",
"!tar -xf gym_samples.tar\n",
"!rm gym_samples.tar"
]
},
{
"cell_type": "code",
"execution_count": 76,
"id": "ranging-harrison",
"metadata": {},
"outputs": [],
"source": [
"gym_root = 'gym_samples/'\n",
"gym_vids = os.listdir(gym_root)\n",
"# visualize pose of which video? index in 0 - 50.\n",
"idx = 1\n",
"vid = gym_vids[idx]\n",
"\n",
"frame_dir = vid.split('.')[0]\n",
"vid_path = osp.join(gym_root, vid)\n",
"anno = [x for x in gym_annos if x['frame_dir'] == frame_dir][0]"
]
},
{
"cell_type": "code",
"execution_count": 86,
"id": "fitting-courage",
"metadata": {},
"outputs": [],
"source": [
"# Visualize Skeleton\n",
"vis_frames = vis_skeleton(vid_path, anno, gym_categories[anno['label']])\n",
"vid = mpy.ImageSequenceClip(vis_frames, fps=24)\n",
"vid.ipython_display()"
]
},
{
"cell_type": "code",
"execution_count": 87,
"id": "orange-logging",
"metadata": {},
"outputs": [],
"source": [
"keypoint_heatmap = get_pseudo_heatmap(anno)\n",
"keypoint_mapvis = vis_heatmaps(keypoint_heatmap)\n",
"keypoint_mapvis = [add_label(f, gym_categories[anno['label']]) for f in keypoint_mapvis]\n",
"vid = mpy.ImageSequenceClip(keypoint_mapvis, fps=24)\n",
"vid.ipython_display()"
]
},
{
"cell_type": "code",
"execution_count": 88,
"id": "residential-conjunction",
"metadata": {},
"outputs": [],
"source": [
"limb_heatmap = get_pseudo_heatmap(anno, 'limb')\n",
"limb_mapvis = vis_heatmaps(limb_heatmap)\n",
"limb_mapvis = [add_label(f, gym_categories[anno['label']]) for f in limb_mapvis]\n",
"vid = mpy.ImageSequenceClip(limb_mapvis, fps=24)\n",
"vid.ipython_display()"
]
},
{
"cell_type": "code",
"execution_count": 66,
"id": "coupled-stranger",
"metadata": {},
"outputs": [],
"source": [
"# The name list of \n",
"ntu_categories = ['drink water', 'eat meal/snack', 'brushing teeth', 'brushing hair', 'drop', 'pickup', \n",
" 'throw', 'sitting down', 'standing up (from sitting position)', 'clapping', 'reading', \n",
" 'writing', 'tear up paper', 'wear jacket', 'take off jacket', 'wear a shoe', \n",
" 'take off a shoe', 'wear on glasses', 'take off glasses', 'put on a hat/cap', \n",
" 'take off a hat/cap', 'cheer up', 'hand waving', 'kicking something', \n",
" 'reach into pocket', 'hopping (one foot jumping)', 'jump up', \n",
" 'make a phone call/answer phone', 'playing with phone/tablet', 'typing on a keyboard', \n",
" 'pointing to something with finger', 'taking a selfie', 'check time (from watch)', \n",
" 'rub two hands together', 'nod head/bow', 'shake head', 'wipe face', 'salute', \n",
" 'put the palms together', 'cross hands in front (say stop)', 'sneeze/cough', \n",
" 'staggering', 'falling', 'touch head (headache)', 'touch chest (stomachache/heart pain)', \n",
" 'touch back (backache)', 'touch neck (neckache)', 'nausea or vomiting condition', \n",
" 'use a fan (with hand or paper)/feeling warm', 'punching/slapping other person', \n",
" 'kicking other person', 'pushing other person', 'pat on back of other person', \n",
" 'point finger at the other person', 'hugging other person', \n",
" 'giving something to other person', \"touch other person's pocket\", 'handshaking', \n",
" 'walking towards each other', 'walking apart from each other']\n",
"ntu_annos = load(ntu60_xsub_train_ann_file) + load(ntu60_xsub_val_ann_file)"
]
},
{
"cell_type": "code",
"execution_count": 80,
"id": "critical-review",
"metadata": {},
"outputs": [],
"source": [
"ntu_root = 'ntu_samples/'\n",
"ntu_vids = os.listdir(ntu_root)\n",
"# visualize pose of which video? index in 0 - 50.\n",
"idx = 20\n",
"vid = ntu_vids[idx]\n",
"\n",
"frame_dir = vid.split('.')[0]\n",
"vid_path = osp.join(ntu_root, vid)\n",
"anno = [x for x in ntu_annos if x['frame_dir'] == frame_dir.split('_')[0]][0]\n"
]
},
{
"cell_type": "code",
"execution_count": 81,
"id": "seasonal-palmer",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--2021-04-25 22:21:16-- https://download.openmmlab.com/mmaction/posec3d/ntu_samples.tar\n",
"Resolving download.openmmlab.com (download.openmmlab.com)... 124.160.145.22\n",
"Connecting to download.openmmlab.com (download.openmmlab.com)|124.160.145.22|:443... connected.\n",
"HTTP request sent, awaiting response... 200 OK\n",
"Length: 121753600 (116M) [application/x-tar]\n",
"Saving to: ‘ntu_samples.tar’\n",
"\n",
"100%[======================================>] 121,753,600 14.4MB/s in 9.2s \n",
"\n",
"2021-04-25 22:21:26 (12.6 MB/s) - ‘ntu_samples.tar’ saved [121753600/121753600]\n",
"\n"
]
}
],
"source": [
"# download sample videos of NTU-60\n",
"!wget https://download.openmmlab.com/mmaction/posec3d/ntu_samples.tar\n",
"!tar -xf ntu_samples.tar\n",
"!rm ntu_samples.tar"
]
},
{
"cell_type": "code",
"execution_count": 89,
"id": "accompanied-invitation",
"metadata": {},
"outputs": [],
"source": [
"vis_frames = vis_skeleton(vid_path, anno, ntu_categories[anno['label']])\n",
"vid = mpy.ImageSequenceClip(vis_frames, fps=24)\n",
"vid.ipython_display()"
]
},
{
"cell_type": "code",
"execution_count": 90,
"id": "respiratory-conclusion",
"metadata": {},
"outputs": [],
"source": [
"keypoint_heatmap = get_pseudo_heatmap(anno)\n",
"keypoint_mapvis = vis_heatmaps(keypoint_heatmap)\n",
"keypoint_mapvis = [add_label(f, gym_categories[anno['label']]) for f in keypoint_mapvis]\n",
"vid = mpy.ImageSequenceClip(keypoint_mapvis, fps=24)\n",
"vid.ipython_display()"
]
},
{
"cell_type": "code",
"execution_count": 91,
"id": "thirty-vancouver",
"metadata": {},
"outputs": [],
"source": [
"limb_heatmap = get_pseudo_heatmap(anno, 'limb')\n",
"limb_mapvis = vis_heatmaps(limb_heatmap)\n",
"limb_mapvis = [add_label(f, gym_categories[anno['label']]) for f in limb_mapvis]\n",
"vid = mpy.ImageSequenceClip(limb_mapvis, fps=24)\n",
"vid.ipython_display()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
# Copyright (c) OpenMMLab. All rights reserved.
import argparse
import time
from collections import deque
from operator import itemgetter
from threading import Thread
import cv2
import numpy as np
import torch
from mmcv import Config, DictAction
from mmcv.parallel import collate, scatter
from mmaction.apis import init_recognizer
from mmaction.datasets.pipelines import Compose
FONTFACE = cv2.FONT_HERSHEY_COMPLEX_SMALL
FONTSCALE = 1
FONTCOLOR = (255, 255, 255) # BGR, white
MSGCOLOR = (128, 128, 128) # BGR, gray
THICKNESS = 1
LINETYPE = 1
EXCLUED_STEPS = [
'OpenCVInit', 'OpenCVDecode', 'DecordInit', 'DecordDecode', 'PyAVInit',
'PyAVDecode', 'RawFrameDecode'
]
def parse_args():
parser = argparse.ArgumentParser(description='MMAction2 webcam demo')
parser.add_argument('config', help='test config file path')
parser.add_argument('checkpoint', help='checkpoint file')
parser.add_argument('label', help='label file')
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
parser.add_argument(
'--camera-id', type=int, default=0, help='camera device id')
parser.add_argument(
'--threshold',
type=float,
default=0.01,
help='recognition score threshold')
parser.add_argument(
'--average-size',
type=int,
default=1,
help='number of latest clips to be averaged for prediction')
parser.add_argument(
'--drawing-fps',
type=int,
default=20,
help='Set upper bound FPS value of the output drawing')
parser.add_argument(
'--inference-fps',
type=int,
default=4,
help='Set upper bound FPS value of model inference')
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
args = parser.parse_args()
assert args.drawing_fps >= 0 and args.inference_fps >= 0, \
'upper bound FPS value of drawing and inference should be set as ' \
'a positive number, or zero for no limit'
return args
def show_results():
print('Press "Esc", "q" or "Q" to exit')
text_info = {}
cur_time = time.time()
while True:
msg = 'Waiting for action ...'
_, frame = camera.read()
frame_queue.append(np.array(frame[:, :, ::-1]))
if len(result_queue) != 0:
text_info = {}
results = result_queue.popleft()
for i, result in enumerate(results):
selected_label, score = result
if score < threshold:
break
location = (0, 40 + i * 20)
text = selected_label + ': ' + str(round(score, 2))
text_info[location] = text
cv2.putText(frame, text, location, FONTFACE, FONTSCALE,
FONTCOLOR, THICKNESS, LINETYPE)
elif len(text_info) != 0:
for location, text in text_info.items():
cv2.putText(frame, text, location, FONTFACE, FONTSCALE,
FONTCOLOR, THICKNESS, LINETYPE)
else:
cv2.putText(frame, msg, (0, 40), FONTFACE, FONTSCALE, MSGCOLOR,
THICKNESS, LINETYPE)
cv2.imshow('camera', frame)
ch = cv2.waitKey(1)
if ch == 27 or ch == ord('q') or ch == ord('Q'):
break
if drawing_fps > 0:
# add a limiter for actual drawing fps <= drawing_fps
sleep_time = 1 / drawing_fps - (time.time() - cur_time)
if sleep_time > 0:
time.sleep(sleep_time)
cur_time = time.time()
def inference():
score_cache = deque()
scores_sum = 0
cur_time = time.time()
while True:
cur_windows = []
while len(cur_windows) == 0:
if len(frame_queue) == sample_length:
cur_windows = list(np.array(frame_queue))
if data['img_shape'] is None:
data['img_shape'] = frame_queue.popleft().shape[:2]
cur_data = data.copy()
cur_data['imgs'] = cur_windows
cur_data = test_pipeline(cur_data)
cur_data = collate([cur_data], samples_per_gpu=1)
if next(model.parameters()).is_cuda:
cur_data = scatter(cur_data, [device])[0]
with torch.no_grad():
scores = model(return_loss=False, **cur_data)[0]
score_cache.append(scores)
scores_sum += scores
if len(score_cache) == average_size:
scores_avg = scores_sum / average_size
num_selected_labels = min(len(label), 5)
scores_tuples = tuple(zip(label, scores_avg))
scores_sorted = sorted(
scores_tuples, key=itemgetter(1), reverse=True)
results = scores_sorted[:num_selected_labels]
result_queue.append(results)
scores_sum -= score_cache.popleft()
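# `score_cache` together with `scores_sum` keeps a running sum over the
# last `average_size` clips, i.e. a sliding-window average without re-summing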
if inference_fps > 0:
# add a limiter for actual inference fps <= inference_fps
sleep_time = 1 / inference_fps - (time.time() - cur_time)
if sleep_time > 0:
time.sleep(sleep_time)
cur_time = time.time()
camera.release()
cv2.destroyAllWindows()
def main():
global frame_queue, camera, frame, results, threshold, sample_length, \
data, test_pipeline, model, device, average_size, label, \
result_queue, drawing_fps, inference_fps
args = parse_args()
average_size = args.average_size
threshold = args.threshold
drawing_fps = args.drawing_fps
inference_fps = args.inference_fps
device = torch.device(args.device)
cfg = Config.fromfile(args.config)
cfg.merge_from_dict(args.cfg_options)
model = init_recognizer(cfg, args.checkpoint, device=device)
camera = cv2.VideoCapture(args.camera_id)
data = dict(img_shape=None, modality='RGB', label=-1)
with open(args.label, 'r') as f:
label = [line.strip() for line in f]
# prepare test pipeline from non-camera pipeline
cfg = model.cfg
sample_length = 0
pipeline = cfg.data.test.pipeline
pipeline_ = pipeline.copy()
for step in pipeline:
if 'SampleFrames' in step['type']:
sample_length = step['clip_len'] * step['num_clips']
data['num_clips'] = step['num_clips']
data['clip_len'] = step['clip_len']
pipeline_.remove(step)
if step['type'] in EXCLUED_STEPS:
# remove steps that decode frames
pipeline_.remove(step)
test_pipeline = Compose(pipeline_)
assert sample_length > 0
try:
frame_queue = deque(maxlen=sample_length)
result_queue = deque(maxlen=1)
pw = Thread(target=show_results, args=(), daemon=True)
pr = Thread(target=inference, args=(), daemon=True)
pw.start()
pr.start()
pw.join()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
# Copyright (c) OpenMMLab. All rights reserved.
"""Webcam Spatio-Temporal Action Detection Demo.
Some code is based on https://github.com/facebookresearch/SlowFast
"""
import argparse
import atexit
import copy
import logging
import queue
import threading
import time
from abc import ABCMeta, abstractmethod
import cv2
import mmcv
import numpy as np
import torch
from mmcv import Config, DictAction
from mmcv.runner import load_checkpoint
from mmaction.models import build_detector
try:
from mmdet.apis import inference_detector, init_detector
except (ImportError, ModuleNotFoundError):
raise ImportError('Failed to import `inference_detector` and '
'`init_detector` from `mmdet.apis`. These APIs are '
'required in this demo! ')
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def parse_args():
parser = argparse.ArgumentParser(
description='MMAction2 webcam spatio-temporal detection demo')
parser.add_argument(
'--config',
default=('configs/detection/ava/'
'slowonly_omnisource_pretrained_r101_8x8x1_20e_ava_rgb.py'),
help='spatio temporal detection config file path')
parser.add_argument(
'--checkpoint',
default=('https://download.openmmlab.com/mmaction/detection/ava/'
'slowonly_omnisource_pretrained_r101_8x8x1_20e_ava_rgb/'
'slowonly_omnisource_pretrained_r101_8x8x1_20e_ava_rgb'
'_20201217-16378594.pth'),
help='spatio temporal detection checkpoint file/url')
parser.add_argument(
'--action-score-thr',
type=float,
default=0.4,
help='the threshold of human action score')
parser.add_argument(
'--det-config',
default='demo/faster_rcnn_r50_fpn_2x_coco.py',
help='human detection config file path (from mmdet)')
parser.add_argument(
'--det-checkpoint',
default=('http://download.openmmlab.com/mmdetection/v2.0/faster_rcnn/'
'faster_rcnn_r50_fpn_2x_coco/'
'faster_rcnn_r50_fpn_2x_coco_'
'bbox_mAP-0.384_20200504_210434-a5d8aa15.pth'),
help='human detection checkpoint file/url')
parser.add_argument(
'--det-score-thr',
type=float,
default=0.9,
help='the threshold of human detection score')
parser.add_argument(
'--input-video',
default='0',
type=str,
help='webcam id or input video file/url')
parser.add_argument(
'--label-map',
default='tools/data/ava/label_map.txt',
help='label map file')
parser.add_argument(
'--device', type=str, default='cuda:0', help='CPU/CUDA device option')
parser.add_argument(
'--output-fps',
default=15,
type=int,
help='the fps of demo video output')
parser.add_argument(
'--out-filename',
default=None,
type=str,
help='the filename of output video')
parser.add_argument(
'--show',
action='store_true',
help='Whether to show results with cv2.imshow')
parser.add_argument(
'--display-height',
type=int,
default=0,
help='Image height for human detector and draw frames.')
parser.add_argument(
'--display-width',
type=int,
default=0,
help='Image width for human detector and draw frames.')
parser.add_argument(
'--predict-stepsize',
default=8,
type=int,
help='give out a prediction per n frames')
parser.add_argument(
'--clip-vis-length',
default=8,
type=int,
help='Number of draw frames per clip.')
parser.add_argument(
'--cfg-options',
nargs='+',
action=DictAction,
default={},
help='override some settings in the used config, the key-value pair '
'in xxx=yyy format will be merged into config file. For example, '
"'--cfg-options model.backbone.depth=18 model.backbone.with_cp=True'")
args = parser.parse_args()
return args
class TaskInfo:
"""Wapper for a clip.
Transmit data around three threads.
1) Read Thread: Create task and put task into read queue. Init `frames`,
`processed_frames`, `img_shape`, `ratio`, `clip_vis_length`.
2) Main Thread: Get data from read queue, predict human bboxes and stdet
action labels, draw predictions and put task into display queue. Init
`display_bboxes`, `stdet_bboxes` and `action_preds`, update `frames`.
3) Display Thread: Get data from display queue, show/write frames and
delete task.
"""
def __init__(self):
self.id = -1
# raw frames, used as human detector input, draw predictions input
# and output, display input
self.frames = None
# stdet params
self.processed_frames = None # model inputs
self.frames_inds = None # select frames from processed frames
self.img_shape = None # model inputs, processed frame shape
# `action_preds` is `list[list[tuple]]`. The outer brackets indicate
# different bboxes and the inner brackets indicate different action
# results for the same bbox. tuple contains `class_name` and `score`.
self.action_preds = None # stdet results
# human bboxes with the format (xmin, ymin, xmax, ymax)
self.display_bboxes = None # bboxes coords for self.frames
self.stdet_bboxes = None # bboxes coords for self.processed_frames
self.ratio = None # processed_frames.shape[1::-1]/frames.shape[1::-1]
# for each clip, draw predictions on clip_vis_length frames
self.clip_vis_length = -1
def add_frames(self, idx, frames, processed_frames):
"""Add the clip and corresponding id.
Args:
idx (int): the current index of the clip.
frames (list[ndarray]): list of images in "BGR" format.
processed_frames (list[ndarray]): list of resized and normalized images
in "BGR" format.
"""
self.frames = frames
self.processed_frames = processed_frames
self.id = idx
self.img_shape = processed_frames[0].shape[:2]
def add_bboxes(self, display_bboxes):
"""Add correspondding bounding boxes."""
self.display_bboxes = display_bboxes
self.stdet_bboxes = display_bboxes.clone()
self.stdet_bboxes[:, ::2] = self.stdet_bboxes[:, ::2] * self.ratio[0]
self.stdet_bboxes[:, 1::2] = self.stdet_bboxes[:, 1::2] * self.ratio[1]
def add_action_preds(self, preds):
"""Add the corresponding action predictions."""
self.action_preds = preds
def get_model_inputs(self, device):
"""Convert preprocessed images to MMAction2 STDet model inputs."""
cur_frames = [self.processed_frames[idx] for idx in self.frames_inds]
input_array = np.stack(cur_frames).transpose((3, 0, 1, 2))[np.newaxis]
input_tensor = torch.from_numpy(input_array).to(device)
return dict(
return_loss=False,
img=[input_tensor],
proposals=[[self.stdet_bboxes]],
img_metas=[[dict(img_shape=self.img_shape)]])
class BaseHumanDetector(metaclass=ABCMeta):
"""Base class for Human Dector.
Args:
device (str): CPU/CUDA device option.
"""
def __init__(self, device):
self.device = torch.device(device)
@abstractmethod
def _do_detect(self, image):
"""Get human bboxes with shape [n, 4].
The format of bboxes is (xmin, ymin, xmax, ymax) in pixels.
"""
def predict(self, task):
"""Add keyframe bboxes to task."""
# keyframe idx == (clip_len * frame_interval) // 2
keyframe = task.frames[len(task.frames) // 2]
# call detector
bboxes = self._do_detect(keyframe)
# convert bboxes to torch.Tensor and move to target device
if isinstance(bboxes, np.ndarray):
bboxes = torch.from_numpy(bboxes).to(self.device)
elif isinstance(bboxes, torch.Tensor) and bboxes.device != self.device:
bboxes = bboxes.to(self.device)
# update task
task.add_bboxes(bboxes)
return task
class MmdetHumanDetector(BaseHumanDetector):
"""Wrapper for mmdetection human detector.
Args:
config (str): Path to mmdetection config.
ckpt (str): Path to mmdetection checkpoint.
device (str): CPU/CUDA device option.
score_thr (float): The threshold of human detection score.
person_classid (int): Choose class from detection results.
Default: 0. Suitable for COCO pretrained models.
"""
def __init__(self, config, ckpt, device, score_thr, person_classid=0):
super().__init__(device)
self.model = init_detector(config, ckpt, device)
self.person_classid = person_classid
self.score_thr = score_thr
def _do_detect(self, image):
"""Get bboxes in shape [n, 4] and values in pixels."""
result = inference_detector(self.model, image)[self.person_classid]
result = result[result[:, 4] >= self.score_thr][:, :4]
return result
class StdetPredictor:
"""Wrapper for MMAction2 spatio-temporal action models.
Args:
config (str): Path to stdet config.
ckpt (str): Path to stdet checkpoint.
device (str): CPU/CUDA device option.
score_thr (float): The threshold of human action score.
label_map_path (str): Path to label map file. The format for each line
is `{class_id}: {class_name}`.
"""
def __init__(self, config, checkpoint, device, score_thr, label_map_path):
self.score_thr = score_thr
# load model
config.model.backbone.pretrained = None
model = build_detector(config.model, test_cfg=config.get('test_cfg'))
load_checkpoint(model, checkpoint, map_location='cpu')
model.to(device)
model.eval()
self.model = model
self.device = device
# init label map, aka class_id to class_name dict
with open(label_map_path) as f:
lines = f.readlines()
lines = [x.strip().split(': ') for x in lines]
self.label_map = {int(x[0]): x[1] for x in lines}
try:
if config['data']['train']['custom_classes'] is not None:
self.label_map = {
id + 1: self.label_map[cls]
for id, cls in enumerate(config['data']['train']
['custom_classes'])
}
except KeyError:
pass
def predict(self, task):
"""Spatio-temporval Action Detection model inference."""
# No need to do inference if no one in keyframe
if len(task.stdet_bboxes) == 0:
return task
with torch.no_grad():
result = self.model(**task.get_model_inputs(self.device))[0]
# pack results of human detector and stdet
preds = []
for _ in range(task.stdet_bboxes.shape[0]):
preds.append([])
for class_id in range(len(result)):
if class_id + 1 not in self.label_map:
continue
for bbox_id in range(task.stdet_bboxes.shape[0]):
if result[class_id][bbox_id, 4] > self.score_thr:
preds[bbox_id].append((self.label_map[class_id + 1],
result[class_id][bbox_id, 4]))
# update task
# `preds` is `list[list[tuple]]`. The outer brackets indicate
# different bboxes and the inner brackets indicate different action
# results for the same bbox. tuple contains `class_name` and `score`.
task.add_action_preds(preds)
return task
class ClipHelper:
"""Multithrading utils to manage the lifecycle of task."""
def __init__(self,
config,
display_height=0,
display_width=0,
input_video=0,
predict_stepsize=40,
output_fps=25,
clip_vis_length=8,
out_filename=None,
show=True,
stdet_input_shortside=256):
# stdet sampling strategy
val_pipeline = config.data.val.pipeline
sampler = [x for x in val_pipeline
if x['type'] == 'SampleAVAFrames'][0]
clip_len, frame_interval = sampler['clip_len'], sampler[
'frame_interval']
self.window_size = clip_len * frame_interval
# asserts
assert (out_filename or show), \
'out_filename and show cannot both be None'
assert clip_len % 2 == 0, 'We would like to have an even clip_len'
assert clip_vis_length <= predict_stepsize
assert 0 < predict_stepsize <= self.window_size
# source params
try:
self.cap = cv2.VideoCapture(int(input_video))
self.webcam = True
except ValueError:
self.cap = cv2.VideoCapture(input_video)
self.webcam = False
assert self.cap.isOpened()
# stdet input preprocessing params
h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.stdet_input_size = mmcv.rescale_size(
(w, h), (stdet_input_shortside, np.Inf))
img_norm_cfg = config['img_norm_cfg']
if 'to_rgb' not in img_norm_cfg and 'to_bgr' in img_norm_cfg:
to_bgr = img_norm_cfg.pop('to_bgr')
img_norm_cfg['to_rgb'] = to_bgr
img_norm_cfg['mean'] = np.array(img_norm_cfg['mean'])
img_norm_cfg['std'] = np.array(img_norm_cfg['std'])
self.img_norm_cfg = img_norm_cfg
# task init params
self.clip_vis_length = clip_vis_length
self.predict_stepsize = predict_stepsize
self.buffer_size = self.window_size - self.predict_stepsize
frame_start = self.window_size // 2 - (clip_len // 2) * frame_interval
self.frames_inds = [
frame_start + frame_interval * i for i in range(clip_len)
]
self.buffer = []
self.processed_buffer = []
# output/display params
if display_height > 0 and display_width > 0:
self.display_size = (display_width, display_height)
elif display_height > 0 or display_width > 0:
self.display_size = mmcv.rescale_size(
(w, h), (np.Inf, max(display_height, display_width)))
else:
self.display_size = (w, h)
self.ratio = tuple(
n / o for n, o in zip(self.stdet_input_size, self.display_size))
if output_fps <= 0:
self.output_fps = int(self.cap.get(cv2.CAP_PROP_FPS))
else:
self.output_fps = output_fps
self.show = show
self.video_writer = None
if out_filename is not None:
self.video_writer = self.get_output_video_writer(out_filename)
display_start_idx = self.window_size // 2 - self.predict_stepsize // 2
self.display_inds = [
display_start_idx + i for i in range(self.predict_stepsize)
]
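        # e.g. (hypothetical numbers) with clip_len=8, frame_interval=8 and
        # predict_stepsize=8: window_size=64, display_start_idx=28 and
        # display_inds=[28, ..., 35], i.e. the middle chunk of each clip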
# display multi-threading params
self.display_id = -1 # task.id for display queue
self.display_queue = {}
self.display_lock = threading.Lock()
self.output_lock = threading.Lock()
# read multi-threading params
self.read_id = -1 # task.id for read queue
self.read_id_lock = threading.Lock()
self.read_queue = queue.Queue()
self.read_lock = threading.Lock()
self.not_end = True # cap.read() flag
# program state
self.stopped = False
atexit.register(self.clean)
def read_fn(self):
"""Main function for read thread.
Contains three steps:
1) Read and preprocess (resize + norm) frames from source.
2) Create task by frames from previous step and buffer.
3) Put task into read queue.
"""
was_read = True
start_time = time.time()
while was_read and not self.stopped:
# init task
task = TaskInfo()
task.clip_vis_length = self.clip_vis_length
task.frames_inds = self.frames_inds
task.ratio = self.ratio
# read buffer
frames = []
processed_frames = []
if len(self.buffer) != 0:
frames = self.buffer
if len(self.processed_buffer) != 0:
processed_frames = self.processed_buffer
# read and preprocess frames from source and update task
with self.read_lock:
before_read = time.time()
read_frame_cnt = self.window_size - len(frames)
while was_read and len(frames) < self.window_size:
was_read, frame = self.cap.read()
if not self.webcam:
# Reading frames too fast may lead to unexpected
# performance degradation. If you have enough
# resources, this sleep can be commented out.
time.sleep(1 / self.output_fps)
if was_read:
frames.append(mmcv.imresize(frame, self.display_size))
processed_frame = mmcv.imresize(
frame, self.stdet_input_size).astype(np.float32)
_ = mmcv.imnormalize_(processed_frame,
**self.img_norm_cfg)
processed_frames.append(processed_frame)
task.add_frames(self.read_id + 1, frames, processed_frames)
# update buffer
if was_read:
self.buffer = frames[-self.buffer_size:]
self.processed_buffer = processed_frames[-self.buffer_size:]
# update read state
with self.read_id_lock:
self.read_id += 1
self.not_end = was_read
self.read_queue.put((was_read, copy.deepcopy(task)))
cur_time = time.time()
logger.debug(
f'Read thread: {1000*(cur_time - start_time):.0f} ms, '
f'{read_frame_cnt / (cur_time - before_read):.0f} fps')
start_time = cur_time
def display_fn(self):
"""Main function for display thread.
Read data from display queue and display predictions.
"""
start_time = time.time()
while not self.stopped:
# get the state of the read thread
with self.read_id_lock:
read_id = self.read_id
not_end = self.not_end
with self.display_lock:
# If the video has ended and all frames have been displayed.
if not not_end and self.display_id == read_id:
break
# If the next task is not available, wait.
if (len(self.display_queue) == 0 or
self.display_queue.get(self.display_id + 1) is None):
time.sleep(0.02)
continue
# get display data and update state
self.display_id += 1
was_read, task = self.display_queue[self.display_id]
del self.display_queue[self.display_id]
display_id = self.display_id
# do display predictions
with self.output_lock:
if was_read and task.id == 0:
# the first task
cur_display_inds = range(self.display_inds[-1] + 1)
elif not was_read:
# the last task
cur_display_inds = range(self.display_inds[0],
len(task.frames))
else:
cur_display_inds = self.display_inds
for frame_id in cur_display_inds:
frame = task.frames[frame_id]
if self.show:
cv2.imshow('Demo', frame)
cv2.waitKey(int(1000 / self.output_fps))
if self.video_writer:
self.video_writer.write(frame)
cur_time = time.time()
logger.debug(
f'Display thread: {1000*(cur_time - start_time):.0f} ms, '
f'read id {read_id}, display id {display_id}')
start_time = cur_time
def __iter__(self):
return self
def __next__(self):
"""Get data from read queue.
This function is part of the main thread.
"""
if self.read_queue.qsize() == 0:
time.sleep(0.02)
return not self.stopped, None
was_read, task = self.read_queue.get()
if not was_read:
# If we reach the end of the video, there aren't enough frames
# in task.processed_frames, so there is no need to run model
# inference or draw predictions. Put the task into the display queue.
with self.read_id_lock:
read_id = self.read_id
with self.display_lock:
self.display_queue[read_id] = was_read, copy.deepcopy(task)
# main thread doesn't need to handle this task again
task = None
return was_read, task
def start(self):
"""Start read thread and display thread."""
self.read_thread = threading.Thread(
target=self.read_fn, args=(), name='VidRead-Thread', daemon=True)
self.read_thread.start()
self.display_thread = threading.Thread(
target=self.display_fn,
args=(),
name='VidDisplay-Thread',
daemon=True)
self.display_thread.start()
return self
def clean(self):
"""Close all threads and release all resources."""
self.stopped = True
self.read_lock.acquire()
self.cap.release()
self.read_lock.release()
self.output_lock.acquire()
cv2.destroyAllWindows()
if self.video_writer:
self.video_writer.release()
self.output_lock.release()
def join(self):
"""Waiting for the finalization of read and display thread."""
self.read_thread.join()
self.display_thread.join()
def display(self, task):
"""Add the visualized task to the display queue.
Args:
task (TaskInfo object): task object that contains the necessary
information for prediction visualization.
"""
with self.display_lock:
self.display_queue[task.id] = (True, task)
def get_output_video_writer(self, path):
"""Return a video writer object.
Args:
path (str): path to the output video file.
"""
return cv2.VideoWriter(
filename=path,
fourcc=cv2.VideoWriter_fourcc(*'mp4v'),
fps=float(self.output_fps),
frameSize=self.display_size,
isColor=True)
class BaseVisualizer(metaclass=ABCMeta):
"""Base class for visualization tools."""
def __init__(self, max_labels_per_bbox):
self.max_labels_per_bbox = max_labels_per_bbox
def draw_predictions(self, task):
"""Visualize stdet predictions on raw frames."""
# read bboxes from task
bboxes = task.display_bboxes.cpu().numpy()
# draw predictions and update task
keyframe_idx = len(task.frames) // 2
draw_range = [
keyframe_idx - task.clip_vis_length // 2,
keyframe_idx + (task.clip_vis_length - 1) // 2
]
assert draw_range[0] >= 0 and draw_range[1] < len(task.frames)
task.frames = self.draw_clip_range(task.frames, task.action_preds,
bboxes, draw_range)
return task
def draw_clip_range(self, frames, preds, bboxes, draw_range):
"""Draw a range of frames with the same bboxes and predictions."""
# no predictions to be drawn
if bboxes is None or len(bboxes) == 0:
return frames
# draw frames in `draw_range`
left_frames = frames[:draw_range[0]]
right_frames = frames[draw_range[1] + 1:]
draw_frames = frames[draw_range[0]:draw_range[1] + 1]
# get labels(texts) and draw predictions
draw_frames = [
self.draw_one_image(frame, bboxes, preds) for frame in draw_frames
]
return list(left_frames) + draw_frames + list(right_frames)
@abstractmethod
def draw_one_image(self, frame, bboxes, preds):
"""Draw bboxes and corresponding texts on one frame."""
@staticmethod
def abbrev(name):
"""Get the abbreviation of label name:
'take (an object) from (a person)' -> 'take ... from ...'
"""
while name.find('(') != -1:
st, ed = name.find('('), name.find(')')
name = name[:st] + '...' + name[ed + 1:]
return name
class DefaultVisualizer(BaseVisualizer):
"""Tools to visualize predictions.
Args:
max_labels_per_bbox (int): Max number of labels to visualize for a
person box. Default: 5.
plate (str): The color plate used for visualization. Two recommended
plates are blue plate `03045e-023e8a-0077b6-0096c7-00b4d8-48cae4`
and green plate `004b23-006400-007200-008000-38b000-70e000`. These
plates are generated by https://coolors.co/.
Default: '03045e-023e8a-0077b6-0096c7-00b4d8-48cae4'.
text_fontface (int): Fontface from OpenCV for texts.
Default: cv2.FONT_HERSHEY_DUPLEX.
text_fontscale (float): Fontscale from OpenCV for texts.
Default: 0.5.
text_fontcolor (tuple): Font color from OpenCV for texts.
Default: (255, 255, 255).
text_thickness (int): Thickness from OpenCV for texts.
Default: 1.
text_linetype (int): Linetype from OpenCV for texts.
Default: 1.
"""
def __init__(
self,
max_labels_per_bbox=5,
plate='03045e-023e8a-0077b6-0096c7-00b4d8-48cae4',
text_fontface=cv2.FONT_HERSHEY_DUPLEX,
text_fontscale=0.5,
text_fontcolor=(255, 255, 255), # white
text_thickness=1,
text_linetype=1):
super().__init__(max_labels_per_bbox=max_labels_per_bbox)
self.text_fontface = text_fontface
self.text_fontscale = text_fontscale
self.text_fontcolor = text_fontcolor
self.text_thickness = text_thickness
self.text_linetype = text_linetype
def hex2color(h):
"""Convert the 6-digit hex string to tuple of 3 int value (RGB)"""
return (int(h[:2], 16), int(h[2:4], 16), int(h[4:], 16))
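            # e.g. hex2color('03045e') -> (3, 4, 94)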
plate = plate.split('-')
self.plate = [hex2color(h) for h in plate]
def draw_one_image(self, frame, bboxes, preds):
"""Draw predictions on one image."""
for bbox, pred in zip(bboxes, preds):
# draw bbox
box = bbox.astype(np.int64)
st, ed = tuple(box[:2]), tuple(box[2:])
cv2.rectangle(frame, st, ed, (0, 0, 255), 2)
# draw texts
for k, (label, score) in enumerate(pred):
if k >= self.max_labels_per_bbox:
break
text = f'{self.abbrev(label)}: {score:.4f}'
location = (0 + st[0], 18 + k * 18 + st[1])
textsize = cv2.getTextSize(text, self.text_fontface,
self.text_fontscale,
self.text_thickness)[0]
textwidth = textsize[0]
diag0 = (location[0] + textwidth, location[1] - 14)
diag1 = (location[0], location[1] + 2)
cv2.rectangle(frame, diag0, diag1, self.plate[k + 1], -1)
cv2.putText(frame, text, location, self.text_fontface,
self.text_fontscale, self.text_fontcolor,
self.text_thickness, self.text_linetype)
return frame
def main(args):
# init human detector
human_detector = MmdetHumanDetector(args.det_config, args.det_checkpoint,
args.device, args.det_score_thr)
# init action detector
config = Config.fromfile(args.config)
config.merge_from_dict(args.cfg_options)
try:
# In our spatiotemporal detection demo, different actions should have
# the same number of bboxes.
config['model']['test_cfg']['rcnn']['action_thr'] = .0
except KeyError:
pass
stdet_predictor = StdetPredictor(
config=config,
checkpoint=args.checkpoint,
device=args.device,
score_thr=args.action_score_thr,
label_map_path=args.label_map)
# init clip helper
clip_helper = ClipHelper(
config=config,
display_height=args.display_height,
display_width=args.display_width,
input_video=args.input_video,
predict_stepsize=args.predict_stepsize,
output_fps=args.output_fps,
clip_vis_length=args.clip_vis_length,
out_filename=args.out_filename,
show=args.show)
# init visualizer
vis = DefaultVisualizer()
# start read and display thread
clip_helper.start()
try:
# Main thread main function contains:
# 1) get data from read queue
# 2) get human bboxes and stdet predictions
# 3) draw stdet predictions and update task
# 4) put task into display queue
for able_to_read, task in clip_helper:
# get data from read queue
if not able_to_read:
# read thread is dead and all tasks are processed
break
if task is None:
# when no data in read queue, wait
time.sleep(0.01)
continue
inference_start = time.time()
# get human bboxes
human_detector.predict(task)
# get stdet predictions
stdet_predictor.predict(task)
# draw stdet predictions in raw frames
vis.draw_predictions(task)
logger.info(f'Stdet Results: {task.action_preds}')
# add draw frames to display queue
clip_helper.display(task)
logger.debug('Main thread inference time '
f'{1000*(time.time() - inference_start):.0f} ms')
# wait for display thread
clip_helper.join()
except KeyboardInterrupt:
pass
finally:
# close read & display thread, release all resources
clip_helper.clean()
if __name__ == '__main__':
main(parse_args())
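# A hypothetical invocation (the default config/checkpoint URLs above are used;
# other paths are placeholders):
#   python webcam_demo_spatiotemporal_det.py --input-video 0 --show \
#       --out-filename stdet_webcam.mp4 --output-fps 15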