# Copyright (c) OpenMMLab. All rights reserved.
import os
import os.path as osp
import re
import warnings
from operator import itemgetter

import mmcv
import numpy as np
import torch
from mmcv.parallel import collate, scatter
from mmcv.runner import load_checkpoint

from mmaction.core import OutputHook
from mmaction.datasets.pipelines import Compose
from mmaction.models import build_recognizer


def init_recognizer(config, checkpoint=None, device='cuda:0', **kwargs):
    """Initialize a recognizer from config file.

    Args:
        config (str | :obj:`mmcv.Config`): Config file path or the config
            object.
        checkpoint (str | None, optional): Checkpoint path/url. If set to
            None, the model will not load any weights. Default: None.
        device (str | :obj:`torch.device`): The desired device of returned
            tensor. Default: 'cuda:0'.

    Returns:
        nn.Module: The constructed recognizer.
    """
    if 'use_frames' in kwargs:
        warnings.warn('The argument `use_frames` is deprecated since '
                      'PR #1191. Now you can use models trained with frames '
                      'or videos arbitrarily.')

    if isinstance(config, str):
        config = mmcv.Config.fromfile(config)
    elif not isinstance(config, mmcv.Config):
        raise TypeError('config must be a filename or Config object, '
                        f'but got {type(config)}')

    # pretrained model is unnecessary since we directly load checkpoint later
    config.model.backbone.pretrained = None
    model = build_recognizer(config.model, test_cfg=config.get('test_cfg'))

    if checkpoint is not None:
        load_checkpoint(model, checkpoint, map_location='cpu')
    model.cfg = config  # save the config in the model for convenience
    model.to(device)
    model.eval()
    return model


def inference_recognizer(model, video, outputs=None, as_tensor=True,
                         **kwargs):
    """Inference a video with the recognizer.

    Args:
        model (nn.Module): The loaded recognizer.
        video (str | dict | ndarray): The video file path / url or the
            rawframes directory path / results dictionary (the input of
            pipeline) / a 4D array T x H x W x 3 (the input video).
        outputs (list(str) | tuple(str) | str | None): Names of layers whose
            outputs need to be returned. Default: None.
        as_tensor (bool): Same as that in ``OutputHook``. Default: True.

    Returns:
        list[tuple(int, float)]: Top-5 recognition results, each a
            ``(label_id, score)`` tuple, sorted by score in descending order.
        dict[str, torch.Tensor | np.ndarray]: Output feature maps from layers
            specified in ``outputs``, only returned when ``outputs`` is set.
    """
    if 'use_frames' in kwargs:
        warnings.warn('The argument `use_frames` is deprecated since '
                      'PR #1191. Now you can use models trained with frames '
                      'or videos arbitrarily.')
    if 'label_path' in kwargs:
        warnings.warn('The argument `label_path` is deprecated since '
                      'PR #1191. Now the label file is not needed in '
                      'inference_recognizer.')
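
    # Decide how `video` should be interpreted and prepare the inputs.
    # Each branch below also rewrites `test_pipeline` so that its
    # Init/Decode steps match the actual input source: a results dict is
    # passed through as-is, an ndarray uses `ArrayDecode`, a video file or
    # URL is decoded with OpenCV, a rawframe directory uses
    # `RawFrameDecode`, and a `.npy` file is treated as precomputed audio
    # features.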
    input_flag = None
    if isinstance(video, dict):
        input_flag = 'dict'
    elif isinstance(video, np.ndarray):
        assert len(video.shape) == 4, 'The shape should be T x H x W x C'
        input_flag = 'array'
    elif isinstance(video, str) and video.startswith('http'):
        input_flag = 'video'
    elif isinstance(video, str) and osp.exists(video):
        if osp.isfile(video):
            if video.endswith('.npy'):
                input_flag = 'audio'
            else:
                input_flag = 'video'
        if osp.isdir(video):
            input_flag = 'rawframes'
    else:
        raise RuntimeError('The type of argument `video` is not supported: '
                           f'{type(video)}')

    if isinstance(outputs, str):
        outputs = (outputs, )
    assert outputs is None or isinstance(outputs, (tuple, list))

    cfg = model.cfg
    device = next(model.parameters()).device  # model device
    # build the data pipeline
    test_pipeline = cfg.data.test.pipeline
    # Alter data pipelines & prepare inputs
    if input_flag == 'dict':
        data = video
    if input_flag == 'array':
        modality_map = {2: 'Flow', 3: 'RGB'}
        modality = modality_map.get(video.shape[-1])
        data = dict(
            total_frames=video.shape[0],
            label=-1,
            start_index=0,
            array=video,
            modality=modality)
        for i in range(len(test_pipeline)):
            if 'Decode' in test_pipeline[i]['type']:
                test_pipeline[i] = dict(type='ArrayDecode')
        test_pipeline = [x for x in test_pipeline if 'Init' not in x['type']]
    if input_flag == 'video':
        data = dict(filename=video, label=-1, start_index=0, modality='RGB')
        if 'Init' not in test_pipeline[0]['type']:
            test_pipeline = [dict(type='OpenCVInit')] + test_pipeline
        else:
            test_pipeline[0] = dict(type='OpenCVInit')
        for i in range(len(test_pipeline)):
            if 'Decode' in test_pipeline[i]['type']:
                test_pipeline[i] = dict(type='OpenCVDecode')
    if input_flag == 'rawframes':
        filename_tmpl = cfg.data.test.get('filename_tmpl', 'img_{:05}.jpg')
        modality = cfg.data.test.get('modality', 'RGB')
        start_index = cfg.data.test.get('start_index', 1)

        # count the number of frames that match the format of `filename_tmpl`
        # RGB pattern example: img_{:05}.jpg -> ^img_\d+.jpg$
        # Flow pattern example: {}_{:05d}.jpg -> ^x_\d+.jpg$
        pattern = f'^{filename_tmpl}$'
        if modality == 'Flow':
            pattern = pattern.replace('{}', 'x')
        pattern = pattern.replace(
            pattern[pattern.find('{'):pattern.find('}') + 1], '\\d+')
        total_frames = len(
            list(
                filter(lambda x: re.match(pattern, x) is not None,
                       os.listdir(video))))
        data = dict(
            frame_dir=video,
            total_frames=total_frames,
            label=-1,
            start_index=start_index,
            filename_tmpl=filename_tmpl,
            modality=modality)
        if 'Init' in test_pipeline[0]['type']:
            test_pipeline = test_pipeline[1:]
        for i in range(len(test_pipeline)):
            if 'Decode' in test_pipeline[i]['type']:
                test_pipeline[i] = dict(type='RawFrameDecode')
    if input_flag == 'audio':
        data = dict(
            audio_path=video,
            total_frames=len(np.load(video)),
            start_index=cfg.data.test.get('start_index', 1),
            label=-1)

    test_pipeline = Compose(test_pipeline)
    data = test_pipeline(data)
    data = collate([data], samples_per_gpu=1)
    if next(model.parameters()).is_cuda:
        # scatter to specified GPU
        data = scatter(data, [device])[0]

    # forward the model
    with OutputHook(model, outputs=outputs, as_tensor=as_tensor) as h:
        with torch.no_grad():
            scores = model(return_loss=False, **data)[0]
        returned_features = h.layer_outputs if outputs else None

    num_classes = scores.shape[-1]
    score_tuples = tuple(zip(range(num_classes), scores))
    score_sorted = sorted(score_tuples, key=itemgetter(1), reverse=True)
    top5_label = score_sorted[:5]

    if outputs:
        return top5_label, returned_features
    return top5_label
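

# A minimal usage sketch of the two APIs above. The config and checkpoint
# paths are placeholders, not files guaranteed to exist in a checkout;
# substitute any recognizer config and its matching checkpoint.
if __name__ == '__main__':
    # Hypothetical paths -- replace with a real config/checkpoint pair.
    config_file = ('configs/recognition/tsn/'
                   'tsn_r50_1x1x3_100e_kinetics400_rgb.py')
    checkpoint_file = 'checkpoints/tsn_r50_1x1x3_100e_kinetics400_rgb.pth'

    model = init_recognizer(config_file, checkpoint_file, device='cuda:0')

    # `video` may be a video file/URL, a rawframe directory, a results dict,
    # a T x H x W x C array, or a .npy audio feature file (see above).
    top5 = inference_recognizer(model, 'demo/demo.mp4')
    for label_id, score in top5:
        print(f'label {label_id}: {score:.4f}')

    # Optionally also fetch intermediate feature maps by submodule name
    # ('backbone' is assumed here to be a registered submodule).
    top5, features = inference_recognizer(
        model, 'demo/demo.mp4', outputs=('backbone', ))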