# Tutorial 6: Exporting a Model to ONNX
The Open Neural Network Exchange ([ONNX](https://onnx.ai/)) is an open ecosystem that empowers AI developers to choose the right tools as their projects evolve.
<!-- TOC -->
- [Tutorial 6: Exporting a Model to ONNX](#tutorial-6-exporting-a-model-to-onnx)
- [Supported Models](#supported-models)
- [Usage](#usage)
- [Prerequisites](#prerequisites)
- [Recognizers](#recognizers)
- [Localizers](#localizers)
<!-- TOC -->
## Supported Models
So far, MMAction2 supports exporting trained PyTorch models to ONNX. The supported models are:
- I3D
- TSN
- TIN
- TSM
- R(2+1)D
- SLOWFAST
- SLOWONLY
- BMN
- BSN(tem, pem)
## Usage
For simple exporting, you can use the [script](/tools/deployment/pytorch2onnx.py) here.
Note that the `onnx` and `onnxruntime` packages are required for verification after exporting.
### Prerequisites
First, install onnx:
```shell
pip install onnx onnxruntime
```
MMAction2 provides a Python script to export PyTorch models trained with MMAction2 to ONNX.
```shell
python tools/deployment/pytorch2onnx.py ${CONFIG_FILE} ${CHECKPOINT_FILE} [--shape ${SHAPE}] \
[--verify] [--show] [--output-file ${OUTPUT_FILE}] [--is-localizer] [--opset-version ${VERSION}]
```
Optional arguments:
- `--shape`: The shape of the input tensor to the model. For 2D recognizers (e.g. TSN), the input shape should be `$batch $clip $channel $height $width` (e.g. `1 1 3 224 224`); for 3D recognizers (e.g. I3D), the input shape should be `$batch $clip $channel $time $height $width` (e.g. `1 1 3 32 224 224`); for localizers such as BSN, the input data differs for each module, so please check the corresponding `forward` function. If not specified, it defaults to `1 1 3 224 224`.
- `--verify`: Whether to verify the exported model, e.g. whether it is runnable and whether its numerical outputs match. If not specified, it defaults to `False`. (A manual check with `onnxruntime` is also sketched at the end of this tutorial.)
- `--show`: Whether to print the structure of the exported model. If not specified, it defaults to `False`.
- `--output-file`: The name of the exported ONNX model. If not specified, it defaults to `tmp.onnx`.
- `--is-localizer`: Whether the model to be exported is a localizer. If not specified, it defaults to `False`.
- `--opset-version`: The ONNX opset version. MMAction2 recommends a recent version (e.g. 11) for stability. If not specified, it defaults to `11`.
- `--softmax`: Whether to add a Softmax layer at the end of recognizers. If not specified, it defaults to `False`. Currently this is only supported for recognizers, not localizers.
### Recognizers
For recognizers, run:
```shell
python tools/deployment/pytorch2onnx.py $CONFIG_PATH $CHECKPOINT_PATH --shape $SHAPE --verify
```
### Localizers
For localizers, run:
```shell
python tools/deployment/pytorch2onnx.py $CONFIG_PATH $CHECKPOINT_PATH --is-localizer --shape $SHAPE --verify
```
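Besides `--verify`, you can sanity-check the exported file yourself with `onnxruntime`. The snippet below is a minimal sketch: it assumes the default output name `tmp.onnx` and simply feeds random data of the shape reported by the model's first input.
```python
import numpy as np
import onnxruntime as ort

# Load the exported model and query the shape expected by its first input.
sess = ort.InferenceSession('tmp.onnx', providers=['CPUExecutionProvider'])
inp = sess.get_inputs()[0]
# Dynamic axes may be reported as strings or None; fall back to 1 for those.
shape = [d if isinstance(d, int) else 1 for d in inp.shape]

# Run the model on random data just to confirm it executes end to end.
dummy = np.random.randn(*shape).astype(np.float32)
outputs = sess.run(None, {inp.name: dummy})
print([o.shape for o in outputs])
```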
If you find that a provided checkpoint cannot be exported successfully, or that the exported model loses precision, please open an issue in this repo.
# Tutorial 7: Customizing Runtime Settings
In this tutorial, we introduce some methods to customize optimization methods, learning rate schedules, workflows, and hooks when running your own models.
<!-- TOC -->
- [Tutorial 7: Customizing Runtime Settings](#tutorial-7-customizing-runtime-settings)
- [Customize Optimization Methods](#customize-optimization-methods)
- [Use Optimizers Implemented in PyTorch](#use-optimizers-implemented-in-pytorch)
- [Customize Self-Implemented Optimizers](#customize-self-implemented-optimizers)
- [1. Define a New Optimizer](#1-define-a-new-optimizer)
- [2. Register the Optimizer](#2-register-the-optimizer)
- [3. Specify the Optimizer in the Config File](#3-specify-the-optimizer-in-the-config-file)
- [Customize Optimizer Constructors](#customize-optimizer-constructors)
- [Additional Settings](#additional-settings)
- [Customize Learning Rate Schedules](#customize-learning-rate-schedules)
- [Customize Workflows](#customize-workflows)
- [Customize Hooks](#customize-hooks)
- [Customize Self-Implemented Hooks](#customize-self-implemented-hooks)
- [1. Create a New Hook](#1-create-a-new-hook)
- [2. Register the New Hook](#2-register-the-new-hook)
- [3. Modify the Config](#3-modify-the-config)
- [Use Hooks Implemented in MMCV](#use-hooks-implemented-in-mmcv)
- [Modify Default Runtime Hooks](#modify-default-runtime-hooks)
- [Checkpoint Config](#checkpoint-config)
- [Log Config](#log-config)
- [Evaluation Config](#evaluation-config)
<!-- TOC -->
## Customize Optimization Methods
### Use Optimizers Implemented in PyTorch
MMAction2 supports all optimizers implemented in PyTorch; simply specify the `optimizer` field in the config file.
For example, to use `Adam`, modify the config as follows.
```python
optimizer = dict(type='Adam', lr=0.0003, weight_decay=0.0001)
```
To modify the learning rate of the model, just modify `lr` in the optimizer config.
You can set the other arguments directly according to the [PyTorch API documentation](https://pytorch.org/docs/stable/optim.html?highlight=optim#module-torch.optim).
For example, to use `Adam` with the setting `torch.optim.Adam(params, lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)`,
modify the config as follows.
```python
optimizer = dict(type='Adam', lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
```
### Customize Self-Implemented Optimizers
#### 1. Define a New Optimizer
A customized optimizer can be defined as follows.
Suppose you want to add an optimizer named `MyOptimizer`, which has arguments `a`, `b`, and `c`.
Create a new directory named `mmaction/core/optimizer`, and then implement the new optimizer in a file under it, e.g. `mmaction/core/optimizer/my_optimizer.py`:
```python
from mmcv.runner import OPTIMIZERS
from torch.optim import Optimizer


@OPTIMIZERS.register_module()
class MyOptimizer(Optimizer):

    def __init__(self, a, b, c):
        pass
```
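For reference, here is a complete toy optimizer. `MySGD` is a made-up name used only for illustration; it performs a plain SGD update and, unlike the skeleton above, follows the actual `torch.optim.Optimizer` interface (`params` plus a `defaults` dict).
```python
import torch
from mmcv.runner import OPTIMIZERS
from torch.optim import Optimizer


@OPTIMIZERS.register_module()
class MySGD(Optimizer):
    """Toy optimizer: plain SGD with a constant step size ``lr``."""

    def __init__(self, params, lr=0.01):
        defaults = dict(lr=lr)
        super().__init__(params, defaults)

    @torch.no_grad()
    def step(self, closure=None):
        loss = closure() if closure is not None else None
        for group in self.param_groups:
            for p in group['params']:
                if p.grad is not None:
                    # in-place update: p <- p - lr * grad
                    p.add_(p.grad, alpha=-group['lr'])
        return loss
```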
#### 2. Register the Optimizer
To find the module defined above, it should first be imported into the main namespace. There are two ways to achieve that.
- Modify `mmaction/core/optimizer/__init__.py` to import it.
The newly defined module should be imported in `mmaction/core/optimizer/__init__.py` so that the registry can find the new module and add it:
```python
from .my_optimizer import MyOptimizer
```
- Use `custom_imports` in the config to import it manually.
```python
custom_imports = dict(imports=['mmaction.core.optimizer.my_optimizer'], allow_failed_imports=False)
```
The module `mmaction.core.optimizer.my_optimizer` will be imported at the beginning of the program, and the class `MyOptimizer` will then be registered automatically.
Note that only the package containing the class `MyOptimizer` should be imported; `mmaction.core.optimizer.my_optimizer.MyOptimizer` **cannot** be imported directly.
#### 3. Specify the Optimizer in the Config File
You can then use `MyOptimizer` in the `optimizer` field of config files.
In the configs, the optimizers are defined by the field `optimizer` like the following:
```python
optimizer = dict(type='SGD', lr=0.02, momentum=0.9, weight_decay=0.0001)
```
To use your own optimizer, the field can be changed to:
```python
optimizer = dict(type='MyOptimizer', a=a_value, b=b_value, c=c_value)
```
### Customize Optimizer Constructors
Some models may have parameter-specific settings for optimization, e.g. weight decay for BatchNorm layers.
You can do such fine-grained parameter tuning through a customized optimizer constructor.
```python
from mmcv.runner.optimizer import OPTIMIZER_BUILDERS


@OPTIMIZER_BUILDERS.register_module()
class MyOptimizerConstructor:

    def __init__(self, optimizer_cfg, paramwise_cfg=None):
        pass

    def __call__(self, model):
        return my_optimizer
```
The default optimizer constructor is implemented [here](https://github.com/open-mmlab/mmcv/blob/9ecd6b0d5ff9d2172c49a182eaa669e9f27bb8e7/mmcv/runner/optimizer/default_constructor.py#L11),
and it can also serve as a template for new optimizer constructors.
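As a concrete illustration, below is a minimal sketch of a constructor that disables weight decay for all normalization layers. It is not part of MMAction2 or MMCV; the name `NoDecayBNOptimizerConstructor` is made up, and the optimizer itself (e.g. `SGD`) is assumed to be registered in MMCV's `OPTIMIZERS` registry.
```python
import torch.nn as nn
from mmcv.runner import OPTIMIZERS
from mmcv.runner.optimizer import OPTIMIZER_BUILDERS
from mmcv.utils import build_from_cfg


@OPTIMIZER_BUILDERS.register_module()
class NoDecayBNOptimizerConstructor:
    """Build an optimizer whose normalization parameters use zero weight decay."""

    def __init__(self, optimizer_cfg, paramwise_cfg=None):
        self.optimizer_cfg = optimizer_cfg
        self.paramwise_cfg = paramwise_cfg or {}  # unused in this sketch

    def __call__(self, model):
        cfg = self.optimizer_cfg.copy()
        base_wd = cfg.get('weight_decay', 0.)
        norm_params, other_params = [], []
        for module in model.modules():
            # only direct parameters, so every parameter is collected once
            params = list(module.parameters(recurse=False))
            if isinstance(module, (nn.modules.batchnorm._BatchNorm, nn.GroupNorm)):
                norm_params.extend(params)
            else:
                other_params.extend(params)
        cfg['params'] = [
            dict(params=other_params, weight_decay=base_wd),
            dict(params=norm_params, weight_decay=0.),
        ]
        return build_from_cfg(cfg, OPTIMIZERS)
```
Once registered, such a constructor is typically selected through the `constructor` key of the optimizer config, e.g. `optimizer = dict(type='SGD', lr=0.01, momentum=0.9, weight_decay=1e-4, constructor='NoDecayBNOptimizerConstructor')`.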
### Additional Settings
Tricks that the optimizer does not implement can be realized through optimizer constructors (e.g. parameter-wise learning rates) or hooks.
Some common settings that can stabilize training or accelerate convergence are listed below. Feel free to create PRs for MMAction2 to contribute more settings.
- __Use gradient clipping to stabilize training__
Some models need gradient clipping to stabilize the training process. An example is as follows:
```python
optimizer_config = dict(grad_clip=dict(max_norm=35, norm_type=2))
```
- __Use momentum schedules to accelerate model convergence__
MMAction2 supports momentum schedulers that modify the model's momentum according to the learning rate, which can make the model converge faster.
Momentum schedulers are usually used together with LR schedulers; for example, the following config is used in 3D detection to accelerate convergence.
For more details, please refer to [CyclicLrUpdater](https://github.com/open-mmlab/mmcv/blob/f48241a65aebfe07db122e9db320c31b685dc674/mmcv/runner/hooks/lr_updater.py#L327)
and [CyclicMomentumUpdater](https://github.com/open-mmlab/mmcv/blob/f48241a65aebfe07db122e9db320c31b685dc674/mmcv/runner/hooks/momentum_updater.py#L130).
```python
lr_config = dict(
    policy='cyclic',
    target_ratio=(10, 1e-4),
    cyclic_times=1,
    step_ratio_up=0.4,
)
momentum_config = dict(
    policy='cyclic',
    target_ratio=(0.85 / 0.95, 1),
    cyclic_times=1,
    step_ratio_up=0.4,
)
```
## Customize Learning Rate Schedules
The default config uses a step learning rate schedule, which calls [`StepLRHook`](https://github.com/open-mmlab/mmcv/blob/f48241a65aebfe07db122e9db320c31b685dc674/mmcv/runner/hooks/lr_updater.py#L153) in MMCV.
Other learning rate schedules are also supported, such as `CosineAnnealing` and `Poly`. See [here](https://github.com/open-mmlab/mmcv/blob/master/mmcv/runner/hooks/lr_updater.py) for details.
- Poly:
```python
lr_config = dict(policy='poly', power=0.9, min_lr=1e-4, by_epoch=False)
```
- CosineAnnealing:
```python
lr_config = dict(
    policy='CosineAnnealing',
    warmup='linear',
    warmup_iters=1000,
    warmup_ratio=1.0 / 10,
    min_lr_ratio=1e-5)
```
## Customize Workflows
By default, MMAction2 recommends using `EvalHook` to evaluate the model after each training epoch, but you can also use a `val` workflow as an alternative.
A workflow is a list of (phase, epochs) pairs specifying the running order and the number of epochs. By default it is set to:
```python
workflow = [('train', 1)]
```
which means running 1 epoch of training.
Sometimes you may want to check some metrics (e.g. loss, accuracy) of the model on the validation set.
In such a case, the workflow can be set to
```python
[('train', 1), ('val', 1)]
```
so that 1 epoch of training and 1 epoch of validation will be run iteratively.
**Note**:
1. The parameters of the model will not be updated during the validation epoch.
2. The keyword `total_epochs` in the config only controls the number of training epochs and does not affect the validation workflow.
3. Workflows `[('train', 1), ('val', 1)]` and `[('train', 1)]` do not change the behavior of `EvalHook`,
because `EvalHook` is called by `after_train_epoch`, while the validation workflow only affects hooks called through `after_val_epoch`.
Therefore, the only difference between `[('train', 1), ('val', 1)]` and `[('train', 1)]` is that with the former the runner will compute losses on the validation set after each training epoch.
## Customize Hooks
### Customize Self-Implemented Hooks
#### 1. Create a New Hook
Here is an example of creating a new hook in MMAction2 and using it in training:
```python
from mmcv.runner import HOOKS, Hook


@HOOKS.register_module()
class MyHook(Hook):

    def __init__(self, a, b):
        pass

    def before_run(self, runner):
        pass

    def after_run(self, runner):
        pass

    def before_epoch(self, runner):
        pass

    def after_epoch(self, runner):
        pass

    def before_iter(self, runner):
        pass

    def after_iter(self, runner):
        pass
```
Depending on the functionality of the hook, specify what the hook should do at each stage of training, i.e. `before_run`, `after_run`, `before_epoch`, `after_epoch`, `before_iter`, and `after_iter`. A concrete example follows below.
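As a concrete (hypothetical) illustration, a hook that only needs one stage can implement just that method. For example, a hook that releases cached GPU memory after every epoch:
```python
import torch
from mmcv.runner import HOOKS, Hook


@HOOKS.register_module()
class MyEmptyCacheHook(Hook):
    """Release cached GPU memory at the end of every epoch."""

    def after_epoch(self, runner):
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
```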
#### 2. Register the New Hook
Then you need to import `MyHook`. Assuming the file is `mmaction/core/utils/my_hook.py`, there are two ways to do that:
- Modify `mmaction/core/utils/__init__.py` to import it.
The newly defined module should be imported in `mmaction/core/utils/__init__.py` so that the registry can find the new module and add it:
```python
from .my_hook import MyHook
```
- Use the `custom_imports` variable in the config to import it manually.
```python
custom_imports = dict(imports=['mmaction.core.utils.my_hook'], allow_failed_imports=False)
```
#### 3. Modify the Config
```python
custom_hooks = [
    dict(type='MyHook', a=a_value, b=b_value)
]
```
You can also set the priority of the hook through the `priority` argument (options include `'NORMAL'` and `'HIGHEST'`), as shown below:
```python
custom_hooks = [
    dict(type='MyHook', a=a_value, b=b_value, priority='NORMAL')
]
```
By default, a hook's priority is set to `NORMAL` during registration.
### Use Hooks Implemented in MMCV
If the hook is already implemented in MMCV, you can directly modify the config to use it, as follows:
```python
mmcv_hooks = [
    dict(type='MMCVHook', a=a_value, b=b_value, priority='NORMAL')
]
```
### Modify Default Runtime Hooks
There are some common hooks that are not registered through `custom_hooks` but are registered by default when MMCV is imported. They are:
- log_config
- checkpoint_config
- evaluation
- lr_config
- optimizer_config
- momentum_config
Among them, only the log_config hook has the `VERY_LOW` priority; the others have the `NORMAL` priority.
The tutorials above already cover how to modify `optimizer_config`, `momentum_config`, and `lr_config`.
Below we explain what can be done with `log_config`, `checkpoint_config`, and `evaluation`.
#### Checkpoint Config
The MMCV runner uses `checkpoint_config` to initialize the [`CheckpointHook`](https://github.com/open-mmlab/mmcv/blob/9ecd6b0d5ff9d2172c49a182eaa669e9f27bb8e7/mmcv/runner/hooks/checkpoint.py#L9).
```python
checkpoint_config = dict(interval=1)
```
You can set `max_keep_ckpts` to save only a small number of checkpoints, or decide whether to store the state dict of the optimizer through `save_optimizer`.
More details can be found [here](https://mmcv.readthedocs.io/en/latest/api.html#mmcv.runner.CheckpointHook).
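For example, keeping only the three most recent checkpoints and skipping the optimizer state dict (both `max_keep_ckpts` and `save_optimizer` are documented `CheckpointHook` arguments) would look like:
```python
checkpoint_config = dict(interval=1, max_keep_ckpts=3, save_optimizer=False)
```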
#### Log Config
The `log_config` wraps multiple logger hooks and allows setting the logging interval.
Currently, MMCV supports `WandbLoggerHook`, `MlflowLoggerHook`, and `TensorboardLoggerHook`.
More details can be found [here](https://mmcv.readthedocs.io/en/latest/api.html#mmcv.runner.LoggerHook).
```python
log_config = dict(
    interval=50,
    hooks=[
        dict(type='TextLoggerHook'),
        dict(type='TensorboardLoggerHook')
    ])
```
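For example, to also log to Weights & Biases, a `WandbLoggerHook` entry can be added; the `init_kwargs` shown here are illustrative values forwarded to `wandb.init`.
```python
log_config = dict(
    interval=50,
    hooks=[
        dict(type='TextLoggerHook'),
        dict(type='TensorboardLoggerHook'),
        dict(type='WandbLoggerHook', init_kwargs=dict(project='mmaction2'))
    ])
```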
#### Evaluation Config
The config of `evaluation` will be used to initialize the [`EvalHook`](https://github.com/open-mmlab/mmaction2/blob/master/mmaction/core/evaluation/eval_hooks.py#L12).
Apart from the key `interval`, other arguments such as `metrics` will be passed to `dataset.evaluate()`.
```python
evaluation = dict(interval=1, metrics='bbox')
```
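For the action recognition datasets in MMAction2, the metric names typically used are `top_k_accuracy` and `mean_class_accuracy`, for example:
```python
evaluation = dict(interval=5, metrics=['top_k_accuracy', 'mean_class_accuracy'])
```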
Apart from training/testing scripts, MMAction2 also provides lots of useful tools under the `tools/` directory.
## Contents
<!-- TOC -->
- [Contents](#contents)
- [Log Analysis](#log-analysis)
- [Model Complexity Analysis](#model-complexity-analysis)
- [Model Conversion](#model-conversion)
- [Export an MMAction2 Model to ONNX (experimental)](#export-an-mmaction2-model-to-onnx-experimental)
- [Publish a Model](#publish-a-model)
- [Miscellaneous Scripts](#miscellaneous-scripts)
- [Metric Evaluation](#metric-evaluation)
- [Print the Full Config](#print-the-full-config)
- [Check Videos](#check-videos)
<!-- TOC -->
## Log Analysis
`tools/analysis/analyze_logs.py` plots loss/top-k accuracy curves given a training log file. This function depends on `seaborn`, so please install it first with `pip install seaborn`.
![Accuracy curves](https://github.com/open-mmlab/mmaction2/raw/master/resources/acc_curve.png)
```shell
python tools/analysis/analyze_logs.py plot_curve ${JSON_LOGS} [--keys ${KEYS}] [--title ${TITLE}] [--legend ${LEGEND}] [--backend ${BACKEND}] [--style ${STYLE}] [--out ${OUT_FILE}]
```
Examples:
- Plot the classification loss for a log file.
```shell
python tools/analysis/analyze_logs.py plot_curve log.json --keys loss_cls --legend loss_cls
```
- Plot the top-1 and top-5 accuracy for a log file and save the figure to a PDF file.
```shell
python tools/analysis/analyze_logs.py plot_curve log.json --keys top1_acc top5_acc --out results.pdf
```
- Compare the top-1 accuracy of two log files in the same figure.
```shell
python tools/analysis/analyze_logs.py plot_curve log1.json log2.json --keys top1_acc --legend run1 run2
```
You can also compute the average training speed with this tool.
```shell
python tools/analysis/analyze_logs.py cal_train_time ${JSON_LOGS} [--include-outliers]
```
- Compute the average training speed for a training log file.
```shell
python tools/analysis/analyze_logs.py cal_train_time work_dirs/some_exp/20200422_153324.log.json
```
The expected output looks like:
```text
-----Analyze train time of work_dirs/some_exp/20200422_153324.log.json-----
slowest epoch 60, average time is 0.9736
fastest epoch 18, average time is 0.9001
time std over epochs is 0.0177
average iter time: 0.9330 s/iter
```
## Model Complexity Analysis
`/tools/analysis/get_flops.py` is a script adapted from [flops-counter.pytorch](https://github.com/sovrasov/flops-counter.pytorch) to compute the FLOPs and parameter count of a given model.
```shell
python tools/analysis/get_flops.py ${CONFIG_FILE} [--shape ${INPUT_SHAPE}]
```
The expected output looks like:
```text
==============================
Input shape: (1, 3, 32, 340, 256)
Flops: 37.1 GMac
Params: 28.04 M
==============================
```
**Note**: This tool is still experimental and we do not guarantee that the numbers are absolutely correct.
You may use the results for simple comparisons, but double-check them before adopting them in technical reports or papers.
(1) FLOPs depend on the input shape, while the parameter count does not. The default input shape is (1, 3, 340, 256) for 2D recognizers and (1, 3, 32, 340, 256) for 3D recognizers.
(2) Some operators, such as GN and custom operators, are not counted in FLOPs or parameters. Refer to [`mmcv.cnn.get_model_complexity_info()`](https://github.com/open-mmlab/mmcv/blob/master/mmcv/cnn/utils/flops_counter.py) for details.
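If you prefer to call the underlying function directly from Python rather than through the script, a minimal sketch looks like the following; a torchvision ResNet-18 is used purely as a stand-in model.
```python
import torchvision
from mmcv.cnn import get_model_complexity_info

# Any nn.Module works; ResNet-18 is only a stand-in for this sketch.
model = torchvision.models.resnet18()
flops, params = get_model_complexity_info(
    model, (3, 224, 224), print_per_layer_stat=False, as_strings=True)
print(f'FLOPs: {flops}, Params: {params}')
```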
## Model Conversion
### Export an MMAction2 Model to ONNX (experimental)
`/tools/deployment/pytorch2onnx.py` converts a model to [ONNX](https://github.com/onnx/onnx) format.
It also supports comparing the outputs of the PyTorch model and the ONNX model to verify that they are the same.
This function depends on `onnx` and `onnxruntime`; please install them first with `pip install onnx onnxruntime`.
Note that a Softmax layer can be appended to recognizers with the `--softmax` option, so that the predictions lie in the range `[0, 1]`.
- For recognizers, run:
```shell
python tools/deployment/pytorch2onnx.py $CONFIG_PATH $CHECKPOINT_PATH --shape $SHAPE --verify
```
- For localizers, run:
```shell
python tools/deployment/pytorch2onnx.py $CONFIG_PATH $CHECKPOINT_PATH --is-localizer --shape $SHAPE --verify
```
### Publish a Model
`tools/deployment/publish_model.py` helps you prepare a model for publishing. It mainly does the following (a rough Python sketch of these steps is given at the end of this subsection):
(1) Convert the model weights to CPU tensors.
(2) Delete the optimizer states.
(3) Compute the hash of the checkpoint file and append the hash id to the file name.
```shell
python tools/deployment/publish_model.py ${INPUT_FILENAME} ${OUTPUT_FILENAME}
```
For example,
```shell
python tools/deployment/publish_model.py work_dirs/tsn_r50_1x1x3_100e_kinetics400_rgb/latest.pth tsn_r50_1x1x3_100e_kinetics400_rgb.pth
```
The final output file name will be `tsn_r50_1x1x3_100e_kinetics400_rgb-{hash id}.pth`.
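A rough Python sketch of the three steps above is shown below; it is simplified and may differ from the actual script in details such as the hash length or the exact keys removed.
```python
import hashlib
import os

import torch


def publish(in_file, out_file):
    # (1) load all tensors onto the CPU and (2) drop the optimizer states
    ckpt = torch.load(in_file, map_location='cpu')
    ckpt.pop('optimizer', None)
    torch.save(ckpt, out_file)
    # (3) hash the saved file and append the first 8 hex digits to its name
    with open(out_file, 'rb') as f:
        sha = hashlib.sha256(f.read()).hexdigest()
    final_file = out_file.replace('.pth', f'-{sha[:8]}.pth')
    os.rename(out_file, final_file)
    return final_file
```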
## Miscellaneous Scripts
### Metric Evaluation
`tools/analysis/eval_metric.py` evaluates certain metrics of a saved result file according to a config file.
The result file is generated by `tools/test.py` (specified with the argument `--out ${RESULT_FILE}`) and stores the predictions of the given model on the given dataset.
```shell
python tools/analysis/eval_metric.py ${CONFIG_FILE} ${RESULT_FILE} [--eval ${EVAL_METRICS}] [--cfg-options ${CFG_OPTIONS}] [--eval-options ${EVAL_OPTIONS}]
```
### Print the Full Config
`tools/analysis/print_config.py` resolves all input arguments and prints the complete config.
```shell
python tools/print_config.py ${CONFIG} [-h] [--options ${OPTIONS [OPTIONS...]}]
```
### Check Videos
`tools/analysis/check_videos.py` uses the specified video decoder to iterate over all samples of the video dataset given by a config file, looks for invalid videos (corrupted or missing files), and saves the paths of the invalid files to an output file (a rough sketch of the per-file check follows the command below). Note that after deleting invalid video files, you need to regenerate the video file list.
```shell
python tools/analysis/check_videos.py ${CONFIG} [-h] [--options OPTIONS [OPTIONS ...]] [--cfg-options CFG_OPTIONS [CFG_OPTIONS ...]] [--output-file OUTPUT_FILE] [--split SPLIT] [--decoder DECODER] [--num-processes NUM_PROCESSES] [--remove-corrupted-videos]
```
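For reference, the kind of per-file check the script performs can be sketched with OpenCV as follows; this snippet is illustrative and is not the script's actual implementation.
```python
import cv2


def is_valid_video(path):
    """Return True if OpenCV can open the file and decode its first frame."""
    cap = cv2.VideoCapture(path)
    ok = cap.isOpened()
    if ok:
        ok, _ = cap.read()
    cap.release()
    return bool(ok)
```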
# Copyright (c) OpenMMLab. All rights reserved.
import mmcv
from mmcv import digit_version
from .version import __version__
mmcv_minimum_version = '1.3.6'
mmcv_maximum_version = '1.8.0'
mmcv_version = digit_version(mmcv.__version__)
assert (digit_version(mmcv_minimum_version) <= mmcv_version
<= digit_version(mmcv_maximum_version)), \
f'MMCV=={mmcv.__version__} is used but incompatible. ' \
f'Please install mmcv>={mmcv_minimum_version}, <={mmcv_maximum_version}.'
__all__ = ['__version__']
# Copyright (c) OpenMMLab. All rights reserved.
from .inference import inference_recognizer, init_recognizer
from .test import multi_gpu_test, single_gpu_test
from .train import init_random_seed, train_model
__all__ = [
'train_model', 'init_recognizer', 'inference_recognizer', 'multi_gpu_test',
'single_gpu_test', 'init_random_seed'
]
# Copyright (c) OpenMMLab. All rights reserved.
import os
import os.path as osp
import re
import warnings
from operator import itemgetter
import mmcv
import numpy as np
import torch
from mmcv.parallel import collate, scatter
from mmcv.runner import load_checkpoint
from mmaction.core import OutputHook
from mmaction.datasets.pipelines import Compose
from mmaction.models import build_recognizer
def init_recognizer(config, checkpoint=None, device='cuda:0', **kwargs):
"""Initialize a recognizer from config file.
Args:
config (str | :obj:`mmcv.Config`): Config file path or the config
object.
checkpoint (str | None, optional): Checkpoint path/url. If set to None,
the model will not load any weights. Default: None.
device (str | :obj:`torch.device`): The desired device of returned
tensor. Default: 'cuda:0'.
Returns:
nn.Module: The constructed recognizer.
"""
if 'use_frames' in kwargs:
warnings.warn('The argument `use_frames` is deprecated PR #1191. '
'Now you can use models trained with frames or videos '
'arbitrarily. ')
if isinstance(config, str):
config = mmcv.Config.fromfile(config)
elif not isinstance(config, mmcv.Config):
raise TypeError('config must be a filename or Config object, '
f'but got {type(config)}')
# pretrained model is unnecessary since we directly load checkpoint later
config.model.backbone.pretrained = None
model = build_recognizer(config.model, test_cfg=config.get('test_cfg'))
if checkpoint is not None:
load_checkpoint(model, checkpoint, map_location='cpu')
model.cfg = config
model.to(device)
model.eval()
return model
def inference_recognizer(model, video, outputs=None, as_tensor=True, **kwargs):
"""Inference a video with the recognizer.
Args:
model (nn.Module): The loaded recognizer.
video (str | dict | ndarray): The video file path / url or the
rawframes directory path / results dictionary (the input of
pipeline) / a 4D array T x H x W x 3 (The input video).
outputs (list(str) | tuple(str) | str | None) : Names of layers whose
outputs need to be returned, default: None.
as_tensor (bool): Same as that in ``OutputHook``. Default: True.
Returns:
dict[tuple(str, float)]: Top-5 recognition result dict.
dict[torch.tensor | np.ndarray]:
Output feature maps from layers specified in `outputs`.
"""
if 'use_frames' in kwargs:
warnings.warn('The argument `use_frames` is deprecated PR #1191. '
'Now you can use models trained with frames or videos '
'arbitrarily. ')
if 'label_path' in kwargs:
warnings.warn('The argument `label_path` is deprecated PR #1191. '
'Now the label file is not needed in '
'inference_recognizer. ')
input_flag = None
if isinstance(video, dict):
input_flag = 'dict'
elif isinstance(video, np.ndarray):
assert len(video.shape) == 4, 'The shape should be T x H x W x C'
input_flag = 'array'
elif isinstance(video, str) and video.startswith('http'):
input_flag = 'video'
elif isinstance(video, str) and osp.exists(video):
if osp.isfile(video):
if video.endswith('.npy'):
input_flag = 'audio'
else:
input_flag = 'video'
if osp.isdir(video):
input_flag = 'rawframes'
else:
raise RuntimeError('The type of argument video is not supported: '
f'{type(video)}')
if isinstance(outputs, str):
outputs = (outputs, )
assert outputs is None or isinstance(outputs, (tuple, list))
cfg = model.cfg
device = next(model.parameters()).device # model device
# build the data pipeline
test_pipeline = cfg.data.test.pipeline
# Alter data pipelines & prepare inputs
if input_flag == 'dict':
data = video
if input_flag == 'array':
modality_map = {2: 'Flow', 3: 'RGB'}
modality = modality_map.get(video.shape[-1])
data = dict(
total_frames=video.shape[0],
label=-1,
start_index=0,
array=video,
modality=modality)
for i in range(len(test_pipeline)):
if 'Decode' in test_pipeline[i]['type']:
test_pipeline[i] = dict(type='ArrayDecode')
test_pipeline = [x for x in test_pipeline if 'Init' not in x['type']]
if input_flag == 'video':
data = dict(filename=video, label=-1, start_index=0, modality='RGB')
if 'Init' not in test_pipeline[0]['type']:
test_pipeline = [dict(type='OpenCVInit')] + test_pipeline
else:
test_pipeline[0] = dict(type='OpenCVInit')
for i in range(len(test_pipeline)):
if 'Decode' in test_pipeline[i]['type']:
test_pipeline[i] = dict(type='OpenCVDecode')
if input_flag == 'rawframes':
filename_tmpl = cfg.data.test.get('filename_tmpl', 'img_{:05}.jpg')
modality = cfg.data.test.get('modality', 'RGB')
start_index = cfg.data.test.get('start_index', 1)
# count the number of frames that match the format of `filename_tmpl`
# RGB pattern example: img_{:05}.jpg -> ^img_\d+.jpg$
# Flow pattern example: {}_{:05d}.jpg -> ^x_\d+.jpg$
pattern = f'^{filename_tmpl}$'
if modality == 'Flow':
pattern = pattern.replace('{}', 'x')
pattern = pattern.replace(
pattern[pattern.find('{'):pattern.find('}') + 1], '\\d+')
total_frames = len(
list(
filter(lambda x: re.match(pattern, x) is not None,
os.listdir(video))))
data = dict(
frame_dir=video,
total_frames=total_frames,
label=-1,
start_index=start_index,
filename_tmpl=filename_tmpl,
modality=modality)
if 'Init' in test_pipeline[0]['type']:
test_pipeline = test_pipeline[1:]
for i in range(len(test_pipeline)):
if 'Decode' in test_pipeline[i]['type']:
test_pipeline[i] = dict(type='RawFrameDecode')
if input_flag == 'audio':
data = dict(
audio_path=video,
total_frames=len(np.load(video)),
start_index=cfg.data.test.get('start_index', 1),
label=-1)
test_pipeline = Compose(test_pipeline)
data = test_pipeline(data)
data = collate([data], samples_per_gpu=1)
if next(model.parameters()).is_cuda:
# scatter to specified GPU
data = scatter(data, [device])[0]
# forward the model
with OutputHook(model, outputs=outputs, as_tensor=as_tensor) as h:
with torch.no_grad():
scores = model(return_loss=False, **data)[0]
returned_features = h.layer_outputs if outputs else None
num_classes = scores.shape[-1]
score_tuples = tuple(zip(range(num_classes), scores))
score_sorted = sorted(score_tuples, key=itemgetter(1), reverse=True)
top5_label = score_sorted[:5]
if outputs:
return top5_label, returned_features
return top5_label
# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import pickle
import shutil
import tempfile
# TODO import test functions from mmcv and delete them from mmaction2
import warnings
import mmcv
import torch
import torch.distributed as dist
from mmcv.runner import get_dist_info
try:
from mmcv.engine import (collect_results_cpu, collect_results_gpu,
multi_gpu_test, single_gpu_test)
from_mmcv = True
except (ImportError, ModuleNotFoundError):
warnings.warn(
'DeprecationWarning: single_gpu_test, multi_gpu_test, '
'collect_results_cpu, collect_results_gpu from mmaction2 will be '
'deprecated. Please install mmcv through master branch.')
from_mmcv = False
if not from_mmcv:
def single_gpu_test(model, data_loader): # noqa: F811
"""Test model with a single gpu.
This method tests model with a single gpu and
displays test progress bar.
Args:
model (nn.Module): Model to be tested.
data_loader (nn.Dataloader): Pytorch data loader.
Returns:
list: The prediction results.
"""
model.eval()
results = []
dataset = data_loader.dataset
prog_bar = mmcv.ProgressBar(len(dataset))
for data in data_loader:
with torch.no_grad():
result = model(return_loss=False, **data)
results.extend(result)
# use the first key as main key to calculate the batch size
batch_size = len(next(iter(data.values())))
for _ in range(batch_size):
prog_bar.update()
return results
def multi_gpu_test( # noqa: F811
model, data_loader, tmpdir=None, gpu_collect=True):
"""Test model with multiple gpus.
This method tests model with multiple gpus and collects the results
under two different modes: gpu and cpu modes. By setting
'gpu_collect=True' it encodes results to gpu tensors and use gpu
communication for results collection. On cpu mode it saves the results
on different gpus to 'tmpdir' and collects them by the rank 0 worker.
Args:
model (nn.Module): Model to be tested.
data_loader (nn.Dataloader): Pytorch data loader.
tmpdir (str): Path of directory to save the temporary results from
different gpus under cpu mode. Default: None
gpu_collect (bool): Option to use either gpu or cpu to collect
results. Default: True
Returns:
list: The prediction results.
"""
model.eval()
results = []
dataset = data_loader.dataset
rank, world_size = get_dist_info()
if rank == 0:
prog_bar = mmcv.ProgressBar(len(dataset))
for data in data_loader:
with torch.no_grad():
result = model(return_loss=False, **data)
results.extend(result)
if rank == 0:
# use the first key as main key to calculate the batch size
batch_size = len(next(iter(data.values())))
for _ in range(batch_size * world_size):
prog_bar.update()
# collect results from all ranks
if gpu_collect:
results = collect_results_gpu(results, len(dataset))
else:
results = collect_results_cpu(results, len(dataset), tmpdir)
return results
def collect_results_cpu(result_part, size, tmpdir=None): # noqa: F811
"""Collect results in cpu mode.
It saves the results on different gpus to 'tmpdir' and collects
them by the rank 0 worker.
Args:
result_part (list): Results to be collected
size (int): Result size.
tmpdir (str): Path of directory to save the temporary results from
different gpus under cpu mode. Default: None
Returns:
list: Ordered results.
"""
rank, world_size = get_dist_info()
# create a tmp dir if it is not specified
if tmpdir is None:
MAX_LEN = 512
# 32 is whitespace
dir_tensor = torch.full((MAX_LEN, ),
32,
dtype=torch.uint8,
device='cuda')
if rank == 0:
mmcv.mkdir_or_exist('.dist_test')
tmpdir = tempfile.mkdtemp(dir='.dist_test')
tmpdir = torch.tensor(
bytearray(tmpdir.encode()),
dtype=torch.uint8,
device='cuda')
dir_tensor[:len(tmpdir)] = tmpdir
dist.broadcast(dir_tensor, 0)
tmpdir = dir_tensor.cpu().numpy().tobytes().decode().rstrip()
else:
tmpdir = osp.join(tmpdir, '.dist_test')
mmcv.mkdir_or_exist(tmpdir)
# synchronizes all processes to make sure tmpdir exist
dist.barrier()
# dump the part result to the dir
mmcv.dump(result_part, osp.join(tmpdir, f'part_{rank}.pkl'))
# synchronizes all processes for loading pickle file
dist.barrier()
# collect all parts
if rank != 0:
return None
# load results of all parts from tmp dir
part_list = []
for i in range(world_size):
part_file = osp.join(tmpdir, f'part_{i}.pkl')
part_list.append(mmcv.load(part_file))
# sort the results
ordered_results = []
for res in zip(*part_list):
ordered_results.extend(list(res))
# the dataloader may pad some samples
ordered_results = ordered_results[:size]
# remove tmp dir
shutil.rmtree(tmpdir)
return ordered_results
def collect_results_gpu(result_part, size): # noqa: F811
"""Collect results in gpu mode.
It encodes results to gpu tensors and use gpu communication for results
collection.
Args:
result_part (list): Results to be collected
size (int): Result size.
Returns:
list: Ordered results.
"""
rank, world_size = get_dist_info()
# dump result part to tensor with pickle
part_tensor = torch.tensor(
bytearray(pickle.dumps(result_part)),
dtype=torch.uint8,
device='cuda')
# gather all result part tensor shape
shape_tensor = torch.tensor(part_tensor.shape, device='cuda')
shape_list = [shape_tensor.clone() for _ in range(world_size)]
dist.all_gather(shape_list, shape_tensor)
# padding result part tensor to max length
shape_max = torch.tensor(shape_list).max()
part_send = torch.zeros(shape_max, dtype=torch.uint8, device='cuda')
part_send[:shape_tensor[0]] = part_tensor
part_recv_list = [
part_tensor.new_zeros(shape_max) for _ in range(world_size)
]
# gather all result part
dist.all_gather(part_recv_list, part_send)
if rank == 0:
part_list = []
for recv, shape in zip(part_recv_list, shape_list):
part_list.append(
pickle.loads(recv[:shape[0]].cpu().numpy().tobytes()))
# sort the results
ordered_results = []
for res in zip(*part_list):
ordered_results.extend(list(res))
# the dataloader may pad some samples
ordered_results = ordered_results[:size]
return ordered_results
return None
# Copyright (c) OpenMMLab. All rights reserved.
import copy as cp
import os
import os.path as osp
import time
import numpy as np
import torch
import torch.distributed as dist
from mmcv.runner import (DistSamplerSeedHook, EpochBasedRunner, OptimizerHook,
build_optimizer, get_dist_info)
from mmcv.runner.hooks import Fp16OptimizerHook
from ..core import (DistEvalHook, EvalHook, OmniSourceDistSamplerSeedHook,
OmniSourceRunner)
from ..datasets import build_dataloader, build_dataset
from ..utils import (PreciseBNHook, build_ddp, build_dp, default_device,
get_root_logger)
from .test import multi_gpu_test
def init_random_seed(seed=None, device=default_device, distributed=True):
"""Initialize random seed.
If the seed is not set, the seed will be automatically randomized,
and then broadcast to all processes to prevent some potential bugs.
Args:
seed (int, Optional): The seed. Default to None.
device (str): The device where the seed will be put on.
Default to 'cuda'.
distributed (bool): Whether to use distributed training.
Default: True.
Returns:
int: Seed to be used.
"""
if seed is not None:
return seed
# Make sure all ranks share the same random seed to prevent
# some potential bugs. Please refer to
# https://github.com/open-mmlab/mmdetection/issues/6339
rank, world_size = get_dist_info()
seed = np.random.randint(2**31)
if world_size == 1:
return seed
if rank == 0:
random_num = torch.tensor(seed, dtype=torch.int32, device=device)
else:
random_num = torch.tensor(0, dtype=torch.int32, device=device)
if distributed:
dist.broadcast(random_num, src=0)
return random_num.item()
def train_model(model,
dataset,
cfg,
distributed=False,
validate=False,
test=dict(test_best=False, test_last=False),
timestamp=None,
meta=None):
"""Train model entry function.
Args:
model (nn.Module): The model to be trained.
dataset (:obj:`Dataset`): Train dataset.
cfg (dict): The config dict for training.
distributed (bool): Whether to use distributed training.
Default: False.
validate (bool): Whether to do evaluation. Default: False.
test (dict): The testing option, with two keys: test_last & test_best.
The value is True or False, indicating whether to test the
corresponding checkpoint.
Default: dict(test_best=False, test_last=False).
timestamp (str | None): Local time for runner. Default: None.
meta (dict | None): Meta dict to record some important information.
Default: None
"""
logger = get_root_logger(log_level=cfg.log_level)
# prepare data loaders
dataset = dataset if isinstance(dataset, (list, tuple)) else [dataset]
dataloader_setting = dict(
videos_per_gpu=cfg.data.get('videos_per_gpu', 1),
workers_per_gpu=cfg.data.get('workers_per_gpu', 1),
persistent_workers=cfg.data.get('persistent_workers', False),
num_gpus=len(cfg.gpu_ids),
dist=distributed,
seed=cfg.seed)
dataloader_setting = dict(dataloader_setting,
**cfg.data.get('train_dataloader', {}))
if cfg.omnisource:
# The option can override videos_per_gpu
train_ratio = cfg.data.get('train_ratio', [1] * len(dataset))
omni_videos_per_gpu = cfg.data.get('omni_videos_per_gpu', None)
if omni_videos_per_gpu is None:
dataloader_settings = [dataloader_setting] * len(dataset)
else:
dataloader_settings = []
for videos_per_gpu in omni_videos_per_gpu:
this_setting = cp.deepcopy(dataloader_setting)
this_setting['videos_per_gpu'] = videos_per_gpu
dataloader_settings.append(this_setting)
data_loaders = [
build_dataloader(ds, **setting)
for ds, setting in zip(dataset, dataloader_settings)
]
else:
data_loaders = [
build_dataloader(ds, **dataloader_setting) for ds in dataset
]
# put model on gpus
if distributed:
find_unused_parameters = cfg.get('find_unused_parameters', False)
# Sets the `find_unused_parameters` parameter in
# torch.nn.parallel.DistributedDataParallel
model = build_ddp(
model,
default_device,
default_args=dict(
device_ids=[int(os.environ['LOCAL_RANK'])],
broadcast_buffers=False,
find_unused_parameters=find_unused_parameters))
else:
model = build_dp(
model, default_device, default_args=dict(device_ids=cfg.gpu_ids))
# build runner
optimizer = build_optimizer(model, cfg.optimizer)
Runner = OmniSourceRunner if cfg.omnisource else EpochBasedRunner
runner = Runner(
model,
optimizer=optimizer,
work_dir=cfg.work_dir,
logger=logger,
meta=meta)
# an ugly workaround to make .log and .log.json filenames the same
runner.timestamp = timestamp
# fp16 setting
fp16_cfg = cfg.get('fp16', None)
if fp16_cfg is not None:
optimizer_config = Fp16OptimizerHook(
**cfg.optimizer_config, **fp16_cfg, distributed=distributed)
elif distributed and 'type' not in cfg.optimizer_config:
optimizer_config = OptimizerHook(**cfg.optimizer_config)
else:
optimizer_config = cfg.optimizer_config
# register hooks
runner.register_training_hooks(
cfg.lr_config,
optimizer_config,
cfg.checkpoint_config,
cfg.log_config,
cfg.get('momentum_config', None),
custom_hooks_config=cfg.get('custom_hooks', None))
# multigrid setting
multigrid_cfg = cfg.get('multigrid', None)
if multigrid_cfg is not None:
from mmaction.utils.multigrid import LongShortCycleHook
multigrid_scheduler = LongShortCycleHook(cfg)
runner.register_hook(multigrid_scheduler)
logger.info('Finish register multigrid hook')
# subbn3d aggregation is HIGH, as it should be done before
# saving and evaluation
from mmaction.utils.multigrid import SubBatchNorm3dAggregationHook
subbn3d_aggre_hook = SubBatchNorm3dAggregationHook()
runner.register_hook(subbn3d_aggre_hook, priority='VERY_HIGH')
logger.info('Finish register subbn3daggre hook')
# precise bn setting
if cfg.get('precise_bn', False):
precise_bn_dataset = build_dataset(cfg.data.train)
dataloader_setting = dict(
videos_per_gpu=cfg.data.get('videos_per_gpu', 1),
workers_per_gpu=1, # save memory and time
persistent_workers=cfg.data.get('persistent_workers', False),
num_gpus=len(cfg.gpu_ids),
dist=distributed,
seed=cfg.seed)
data_loader_precise_bn = build_dataloader(precise_bn_dataset,
**dataloader_setting)
precise_bn_hook = PreciseBNHook(data_loader_precise_bn,
**cfg.get('precise_bn'))
runner.register_hook(precise_bn_hook, priority='HIGHEST')
logger.info('Finish register precisebn hook')
if distributed:
if cfg.omnisource:
runner.register_hook(OmniSourceDistSamplerSeedHook())
else:
runner.register_hook(DistSamplerSeedHook())
if validate:
eval_cfg = cfg.get('evaluation', {})
val_dataset = build_dataset(cfg.data.val, dict(test_mode=True))
dataloader_setting = dict(
videos_per_gpu=cfg.data.get('videos_per_gpu', 1),
workers_per_gpu=cfg.data.get('workers_per_gpu', 1),
persistent_workers=cfg.data.get('persistent_workers', False),
# cfg.gpus will be ignored if distributed
num_gpus=len(cfg.gpu_ids),
dist=distributed,
shuffle=False)
dataloader_setting = dict(dataloader_setting,
**cfg.data.get('val_dataloader', {}))
val_dataloader = build_dataloader(val_dataset, **dataloader_setting)
eval_hook = DistEvalHook(val_dataloader, **eval_cfg) if distributed \
else EvalHook(val_dataloader, **eval_cfg)
runner.register_hook(eval_hook, priority='LOW')
if cfg.resume_from:
runner.resume(cfg.resume_from)
elif cfg.load_from:
runner.load_checkpoint(cfg.load_from)
runner_kwargs = dict()
if cfg.omnisource:
runner_kwargs = dict(train_ratio=train_ratio)
training_start = time.time()
runner.run(data_loaders, cfg.workflow, cfg.total_epochs, **runner_kwargs)
if distributed:
dist.barrier()
train_time = time.time() - training_start
num_trained_samples_4ranks = cfg.total_epochs * len(dataset[0])
samples_sec_4ranks = num_trained_samples_4ranks / train_time
train_output = f'[PerfLog] {{"event": "TRAIN_END", "value": {{"train_time":{train_time:.2f},"samples/sec":{samples_sec_4ranks:.2f},"num_trained_samples":{num_trained_samples_4ranks}}}}}'
_, world_size = get_dist_info()
logger.info(f"{world_size} ranks total: {train_output}")
time.sleep(5)
if test['test_last'] or test['test_best']:
best_ckpt_path = None
if test['test_best']:
ckpt_paths = [x for x in os.listdir(cfg.work_dir) if 'best' in x]
ckpt_paths = [x for x in ckpt_paths if x.endswith('.pth')]
if len(ckpt_paths) == 0:
runner.logger.info('Warning: test_best set, but no ckpt found')
test['test_best'] = False
if not test['test_last']:
return
elif len(ckpt_paths) > 1:
epoch_ids = [
int(x.split('epoch_')[-1][:-4]) for x in ckpt_paths
]
best_ckpt_path = ckpt_paths[np.argmax(epoch_ids)]
else:
best_ckpt_path = ckpt_paths[0]
if best_ckpt_path:
best_ckpt_path = osp.join(cfg.work_dir, best_ckpt_path)
test_dataset = build_dataset(cfg.data.test, dict(test_mode=True))
gpu_collect = cfg.get('evaluation', {}).get('gpu_collect', False)
tmpdir = cfg.get('evaluation', {}).get('tmpdir',
osp.join(cfg.work_dir, 'tmp'))
dataloader_setting = dict(
videos_per_gpu=cfg.data.get('videos_per_gpu', 1),
workers_per_gpu=cfg.data.get('workers_per_gpu', 1),
persistent_workers=cfg.data.get('persistent_workers', False),
num_gpus=len(cfg.gpu_ids),
dist=distributed,
shuffle=False)
dataloader_setting = dict(dataloader_setting,
**cfg.data.get('test_dataloader', {}))
test_dataloader = build_dataloader(test_dataset, **dataloader_setting)
names, ckpts = [], []
if test['test_last']:
names.append('last')
ckpts.append(None)
if test['test_best'] and best_ckpt_path is not None:
names.append('best')
ckpts.append(best_ckpt_path)
for name, ckpt in zip(names, ckpts):
if ckpt is not None:
runner.load_checkpoint(ckpt)
outputs = multi_gpu_test(runner.model, test_dataloader, tmpdir,
gpu_collect)
rank, _ = get_dist_info()
if rank == 0:
out = osp.join(cfg.work_dir, f'{name}_pred.pkl')
test_dataset.dump_results(outputs, out)
eval_cfg = cfg.get('evaluation', {})
for key in [
'interval', 'tmpdir', 'start', 'gpu_collect',
'save_best', 'rule', 'by_epoch', 'broadcast_bn_buffers'
]:
eval_cfg.pop(key, None)
eval_res = test_dataset.evaluate(outputs, **eval_cfg)
runner.logger.info(f'Testing results of the {name} checkpoint')
for metric_name, val in eval_res.items():
runner.logger.info(f'{metric_name}: {val:.04f}')
# Copyright (c) OpenMMLab. All rights reserved.
from .bbox import * # noqa: F401, F403
from .dist_utils import * # noqa: F401, F403
from .evaluation import * # noqa: F401, F403
from .hooks import * # noqa: F401, F403
from .lr import * # noqa: F401, F403
from .optimizer import * # noqa: F401, F403
from .runner import * # noqa: F401, F403
from .scheduler import * # noqa: F401, F403
# Copyright (c) OpenMMLab. All rights reserved.
from .assigners import MaxIoUAssignerAVA
from .bbox_target import bbox_target
from .transforms import bbox2result
__all__ = ['MaxIoUAssignerAVA', 'bbox_target', 'bbox2result']
# Copyright (c) OpenMMLab. All rights reserved.
from .max_iou_assigner_ava import MaxIoUAssignerAVA
__all__ = ['MaxIoUAssignerAVA']
# Copyright (c) OpenMMLab. All rights reserved.
import torch
try:
from mmdet.core.bbox import AssignResult, MaxIoUAssigner
from mmdet.core.bbox.builder import BBOX_ASSIGNERS
mmdet_imported = True
except (ImportError, ModuleNotFoundError):
mmdet_imported = False
if mmdet_imported:
@BBOX_ASSIGNERS.register_module()
class MaxIoUAssignerAVA(MaxIoUAssigner):
"""Assign a corresponding gt bbox or background to each bbox.
Each proposals will be assigned with `-1`, `0`, or a positive integer
indicating the ground truth index.
- -1: don't care
- 0: negative sample, no assigned gt
- positive integer: positive sample, index (1-based) of assigned gt
Args:
pos_iou_thr (float): IoU threshold for positive bboxes.
neg_iou_thr (float | tuple): IoU threshold for negative bboxes.
min_pos_iou (float): Minimum iou for a bbox to be considered as a
positive bbox. Positive samples can have smaller IoU than
pos_iou_thr due to the 4th step (assign max IoU sample to each
gt). Default: 0.
gt_max_assign_all (bool): Whether to assign all bboxes with the
same highest overlap with some gt to that gt. Default: True.
"""
# The function is overridden, to handle the case that gt_label is not
# int
def assign_wrt_overlaps(self, overlaps, gt_labels=None):
"""Assign w.r.t. the overlaps of bboxes with gts.
Args:
overlaps (Tensor): Overlaps between k gt_bboxes and n bboxes,
shape(k, n).
gt_labels (Tensor, optional): Labels of k gt_bboxes, shape
(k, ).
Returns:
:obj:`AssignResult`: The assign result.
"""
num_gts, num_bboxes = overlaps.size(0), overlaps.size(1)
# 1. assign -1 by default
assigned_gt_inds = overlaps.new_full((num_bboxes, ),
-1,
dtype=torch.long)
if num_gts == 0 or num_bboxes == 0:
# No ground truth or boxes, return empty assignment
max_overlaps = overlaps.new_zeros((num_bboxes, ))
if num_gts == 0:
# No truth, assign everything to background
assigned_gt_inds[:] = 0
if gt_labels is None:
assigned_labels = None
else:
assigned_labels = overlaps.new_full((num_bboxes, ),
-1,
dtype=torch.long)
return AssignResult(
num_gts,
assigned_gt_inds,
max_overlaps,
labels=assigned_labels)
# for each anchor, which gt best overlaps with it
# for each anchor, the max iou of all gts
max_overlaps, argmax_overlaps = overlaps.max(dim=0)
# for each gt, which anchor best overlaps with it
# for each gt, the max iou of all proposals
gt_max_overlaps, gt_argmax_overlaps = overlaps.max(dim=1)
# 2. assign negative: below
# the negative inds are set to be 0
if isinstance(self.neg_iou_thr, float):
assigned_gt_inds[(max_overlaps >= 0)
& (max_overlaps < self.neg_iou_thr)] = 0
elif isinstance(self.neg_iou_thr, tuple):
assert len(self.neg_iou_thr) == 2
assigned_gt_inds[(max_overlaps >= self.neg_iou_thr[0])
& (max_overlaps < self.neg_iou_thr[1])] = 0
# 3. assign positive: above positive IoU threshold
pos_inds = max_overlaps >= self.pos_iou_thr
assigned_gt_inds[pos_inds] = argmax_overlaps[pos_inds] + 1
if self.match_low_quality:
# Low-quality matching will overwrite the assigned_gt_inds
# assigned in Step 3. Thus, the assigned gt might not be the
# best one for prediction.
# For example, if bbox A has 0.9 and 0.8 iou with GT bbox
# 1 & 2, bbox 1 will be assigned as the best target for bbox A
# in step 3. However, if GT bbox 2's gt_argmax_overlaps = A,
# bbox A's assigned_gt_inds will be overwritten to be bbox B.
# This might be the reason that it is not used in ROI Heads.
for i in range(num_gts):
if gt_max_overlaps[i] >= self.min_pos_iou:
if self.gt_max_assign_all:
max_iou_inds = overlaps[i, :] == gt_max_overlaps[i]
assigned_gt_inds[max_iou_inds] = i + 1
else:
assigned_gt_inds[gt_argmax_overlaps[i]] = i + 1
if gt_labels is not None:
# consider multi-class case (AVA)
assert len(gt_labels[0]) > 1
assigned_labels = assigned_gt_inds.new_zeros(
(num_bboxes, len(gt_labels[0])), dtype=torch.float32)
# If not assigned, labels will be all 0
pos_inds = torch.nonzero(
assigned_gt_inds > 0, as_tuple=False).squeeze()
if pos_inds.numel() > 0:
assigned_labels[pos_inds] = gt_labels[
assigned_gt_inds[pos_inds] - 1]
else:
assigned_labels = None
return AssignResult(
num_gts,
assigned_gt_inds,
max_overlaps,
labels=assigned_labels)
else:
# define an empty class, so that can be imported
class MaxIoUAssignerAVA:
def __init__(self, *args, **kwargs):
raise ImportError(
'Failed to import `AssignResult`, `MaxIoUAssigner` from '
'`mmdet.core.bbox` or failed to import `BBOX_ASSIGNERS` from '
'`mmdet.core.bbox.builder`. The class `MaxIoUAssignerAVA` is '
'invalid. ')
# Copyright (c) OpenMMLab. All rights reserved.
import torch
import torch.nn.functional as F
def bbox_target(pos_bboxes_list, neg_bboxes_list, gt_labels, cfg):
"""Generate classification targets for bboxes.
Args:
pos_bboxes_list (list[Tensor]): Positive bboxes list.
neg_bboxes_list (list[Tensor]): Negative bboxes list.
gt_labels (list[Tensor]): Groundtruth classification label list.
cfg (Config): RCNN config.
Returns:
(Tensor, Tensor): Label and label_weight for bboxes.
"""
labels, label_weights = [], []
pos_weight = 1.0 if cfg.pos_weight <= 0 else cfg.pos_weight
assert len(pos_bboxes_list) == len(neg_bboxes_list) == len(gt_labels)
length = len(pos_bboxes_list)
for i in range(length):
pos_bboxes = pos_bboxes_list[i]
neg_bboxes = neg_bboxes_list[i]
gt_label = gt_labels[i]
num_pos = pos_bboxes.size(0)
num_neg = neg_bboxes.size(0)
num_samples = num_pos + num_neg
label = F.pad(gt_label, (0, 0, 0, num_neg))
label_weight = pos_bboxes.new_zeros(num_samples)
label_weight[:num_pos] = pos_weight
label_weight[-num_neg:] = 1.
labels.append(label)
label_weights.append(label_weight)
labels = torch.cat(labels, 0)
label_weights = torch.cat(label_weights, 0)
return labels, label_weights
# Copyright (c) OpenMMLab. All rights reserved.
import numpy as np
def bbox2result(bboxes, labels, num_classes, thr=0.01):
"""Convert detection results to a list of numpy arrays.
This identifies single-label classification (as opposed to multi-label)
through the thr parameter which is set to a negative value.
Currently, the way to set this is to set
`test_cfg.rcnn.action_thr=-1.0`
ToDo: The ideal way would be for this to be automatically set when the
model cfg uses multilabel=False, however this could be a breaking change
and is left as a future exercise.
NB - this should not interfere with the evaluation in any case.
Args:
bboxes (Tensor): shape (n, 4)
labels (Tensor): shape (n, #num_classes)
num_classes (int): class number, including background class
thr (float): The score threshold used when converting predictions to
detection results. If a single negative value, uses single-label
classification
Returns:
list(ndarray): bbox results of each class
"""
if bboxes.shape[0] == 0:
return list(np.zeros((num_classes - 1, 0, 5), dtype=np.float32))
bboxes = bboxes.cpu().numpy()
scores = labels.cpu().numpy() # rename for clarification
# Although we can handle single-label classification, we still want scores
assert scores.shape[-1] > 1
# Robustly check for multi/single-label:
if not hasattr(thr, '__len__'):
multilabel = thr >= 0
thr = (thr, ) * num_classes
else:
multilabel = True
# Check Shape
assert scores.shape[1] == num_classes
assert len(thr) == num_classes
result = []
for i in range(num_classes - 1):
if multilabel:
where = (scores[:, i + 1] > thr[i + 1])
else:
where = (scores[:, 1:].argmax(axis=1) == i)
result.append(
np.concatenate((bboxes[where, :4], scores[where, i + 1:i + 2]),
axis=1))
return result
# Copyright (c) OpenMMLab. All rights reserved.
import numpy as np
import torch
import torch.distributed as dist
from mmcv.runner import get_dist_info
from ..utils import default_device
def sync_random_seed(seed=None, device=default_device):
"""Make sure different ranks share the same seed. All workers must call
this function, otherwise it will deadlock. This method is generally used in
`DistributedSampler`, because the seed should be identical across all
processes in the distributed group.
In distributed sampling, different ranks should sample non-overlapped
data in the dataset. Therefore, this function is used to make sure that
each rank shuffles the data indices in the same order based
on the same seed. Then different ranks could use different indices
to select non-overlapped data from the same data list.
Args:
seed (int, Optional): The seed. Default to None.
device (str): The device where the seed will be put on.
Default to 'cuda'.
Returns:
int: Seed to be used.
"""
if seed is None:
seed = np.random.randint(2**31)
assert isinstance(seed, int)
rank, world_size = get_dist_info()
if world_size == 1:
return seed
if rank == 0:
random_num = torch.tensor(seed, dtype=torch.int32, device=device)
else:
random_num = torch.tensor(0, dtype=torch.int32, device=device)
dist.broadcast(random_num, src=0)
return random_num.item()
# Copyright (c) OpenMMLab. All rights reserved.
from .accuracy import (average_precision_at_temporal_iou,
average_recall_at_avg_proposals, confusion_matrix,
get_weighted_score, interpolated_precision_recall,
mean_average_precision, mean_class_accuracy,
mmit_mean_average_precision, pairwise_temporal_iou,
softmax, top_k_accuracy, top_k_classes)
from .eval_detection import ActivityNetLocalization
from .eval_hooks import DistEvalHook, EvalHook
__all__ = [
'DistEvalHook', 'EvalHook', 'top_k_accuracy', 'mean_class_accuracy',
'confusion_matrix', 'mean_average_precision', 'get_weighted_score',
'average_recall_at_avg_proposals', 'pairwise_temporal_iou',
'average_precision_at_temporal_iou', 'ActivityNetLocalization', 'softmax',
'interpolated_precision_recall', 'mmit_mean_average_precision',
'top_k_classes'
]
# Copyright (c) OpenMMLab. All rights reserved.
import numpy as np
def confusion_matrix(y_pred, y_real, normalize=None):
"""Compute confusion matrix.
Args:
y_pred (list[int] | np.ndarray[int]): Prediction labels.
y_real (list[int] | np.ndarray[int]): Ground truth labels.
normalize (str | None): Normalizes confusion matrix over the true
(rows), predicted (columns) conditions or all the population.
If None, confusion matrix will not be normalized. Options are
"true", "pred", "all", None. Default: None.
Returns:
np.ndarray: Confusion matrix.
"""
if normalize not in ['true', 'pred', 'all', None]:
raise ValueError("normalize must be one of {'true', 'pred', "
"'all', None}")
if isinstance(y_pred, list):
y_pred = np.array(y_pred)
if y_pred.dtype == np.int32:
y_pred = y_pred.astype(np.int64)
if not isinstance(y_pred, np.ndarray):
raise TypeError(
f'y_pred must be list or np.ndarray, but got {type(y_pred)}')
if not y_pred.dtype == np.int64:
raise TypeError(
f'y_pred dtype must be np.int64, but got {y_pred.dtype}')
if isinstance(y_real, list):
y_real = np.array(y_real)
if y_real.dtype == np.int32:
y_real = y_real.astype(np.int64)
if not isinstance(y_real, np.ndarray):
raise TypeError(
f'y_real must be list or np.ndarray, but got {type(y_real)}')
if not y_real.dtype == np.int64:
raise TypeError(
f'y_real dtype must be np.int64, but got {y_real.dtype}')
label_set = np.unique(np.concatenate((y_pred, y_real)))
num_labels = len(label_set)
max_label = label_set[-1]
label_map = np.zeros(max_label + 1, dtype=np.int64)
for i, label in enumerate(label_set):
label_map[label] = i
y_pred_mapped = label_map[y_pred]
y_real_mapped = label_map[y_real]
confusion_mat = np.bincount(
num_labels * y_real_mapped + y_pred_mapped,
minlength=num_labels**2).reshape(num_labels, num_labels)
with np.errstate(all='ignore'):
if normalize == 'true':
confusion_mat = (
confusion_mat / confusion_mat.sum(axis=1, keepdims=True))
elif normalize == 'pred':
confusion_mat = (
confusion_mat / confusion_mat.sum(axis=0, keepdims=True))
elif normalize == 'all':
confusion_mat = (confusion_mat / confusion_mat.sum())
confusion_mat = np.nan_to_num(confusion_mat)
return confusion_mat
def mean_class_accuracy(scores, labels):
"""Calculate mean class accuracy.
Args:
scores (list[np.ndarray]): Prediction scores for each class.
labels (list[int]): Ground truth labels.
Returns:
np.ndarray: Mean class accuracy.
"""
pred = np.argmax(scores, axis=1)
cf_mat = confusion_matrix(pred, labels).astype(float)
cls_cnt = cf_mat.sum(axis=1)
cls_hit = np.diag(cf_mat)
mean_class_acc = np.mean(
[hit / cnt if cnt else 0.0 for cnt, hit in zip(cls_cnt, cls_hit)])
return mean_class_acc
def top_k_classes(scores, labels, k=10, mode='accurate'):
"""Calculate the most K accurate (inaccurate) classes.
Given the prediction scores, ground truth label and top-k value,
compute the top K accurate (inaccurate) classes.
Args:
scores (list[np.ndarray]): Prediction scores for each class.
labels (list[int] | np.ndarray): Ground truth labels.
k (int): Top-k values. Default: 10.
mode (str): Comparison mode for Top-k. Options are 'accurate'
and 'inaccurate'. Default: 'accurate'.
Return:
list: List of sorted (from high accuracy to low accuracy for
'accurate' mode, and from low accuracy to high accuracy for
inaccurate mode) top K classes in format of (label_id,
acc_ratio).
"""
assert mode in ['accurate', 'inaccurate']
pred = np.argmax(scores, axis=1)
cf_mat = confusion_matrix(pred, labels).astype(float)
cls_cnt = cf_mat.sum(axis=1)
cls_hit = np.diag(cf_mat)
hit_ratio = np.array(
[hit / cnt if cnt else 0.0 for cnt, hit in zip(cls_cnt, cls_hit)])
if mode == 'accurate':
max_index = np.argsort(hit_ratio)[-k:][::-1]
max_value = hit_ratio[max_index]
results = list(zip(max_index, max_value))
else:
min_index = np.argsort(hit_ratio)[:k]
min_value = hit_ratio[min_index]
results = list(zip(min_index, min_value))
return results
def top_k_accuracy(scores, labels, topk=(1, )):
"""Calculate top k accuracy score.
Args:
scores (list[np.ndarray]): Prediction scores for each class.
labels (list[int]): Ground truth labels.
topk (tuple[int]): K value for top_k_accuracy. Default: (1, ).
Returns:
list[float]: Top k accuracy score for each k.
"""
res = []
labels = np.array(labels)[:, np.newaxis]
for k in topk:
max_k_preds = np.argsort(scores, axis=1)[:, -k:][:, ::-1]
match_array = np.logical_or.reduce(max_k_preds == labels, axis=1)
topk_acc_score = match_array.sum() / match_array.shape[0]
res.append(topk_acc_score)
return res
def mmit_mean_average_precision(scores, labels):
"""Mean average precision for multi-label recognition. Used for reporting
MMIT style mAP on Multi-Moments in Times. The difference is that this
method calculates average-precision for each sample and averages them among
samples.
Args:
scores (list[np.ndarray]): Prediction scores of different classes for
each sample.
labels (list[np.ndarray]): Ground truth many-hot vector for each
sample.
Returns:
np.float64: The MMIT style mean average precision.
"""
results = []
for score, label in zip(scores, labels):
precision, recall, _ = binary_precision_recall_curve(score, label)
ap = -np.sum(np.diff(recall) * np.array(precision)[:-1])
results.append(ap)
return np.mean(results)
def mean_average_precision(scores, labels):
"""Mean average precision for multi-label recognition.
Args:
scores (list[np.ndarray]): Prediction scores of different classes for
each sample.
labels (list[np.ndarray]): Ground truth many-hot vector for each
sample.
Returns:
np.float64: The mean average precision.
"""
results = []
scores = np.stack(scores).T
labels = np.stack(labels).T
for score, label in zip(scores, labels):
precision, recall, _ = binary_precision_recall_curve(score, label)
ap = -np.sum(np.diff(recall) * np.array(precision)[:-1])
results.append(ap)
results = [x for x in results if not np.isnan(x)]
if results == []:
return np.nan
return np.mean(results)
def binary_precision_recall_curve(y_score, y_true):
"""Calculate the binary precision recall curve at step thresholds.
Args:
y_score (np.ndarray): Prediction scores for each class.
Shape should be (num_classes, ).
y_true (np.ndarray): Ground truth many-hot vector.
Shape should be (num_classes, ).
Returns:
precision (np.ndarray): The precision of different thresholds.
recall (np.ndarray): The recall of different thresholds.
thresholds (np.ndarray): Different thresholds at which precision and
recall are tested.
"""
assert isinstance(y_score, np.ndarray)
assert isinstance(y_true, np.ndarray)
assert y_score.shape == y_true.shape
# make y_true a boolean vector
y_true = (y_true == 1)
# sort scores and corresponding truth values
desc_score_indices = np.argsort(y_score, kind='mergesort')[::-1]
y_score = y_score[desc_score_indices]
y_true = y_true[desc_score_indices]
# There may be ties in values, therefore find the `distinct_value_inds`
distinct_value_inds = np.where(np.diff(y_score))[0]
threshold_inds = np.r_[distinct_value_inds, y_true.size - 1]
# accumulate the true positives with decreasing threshold
tps = np.cumsum(y_true)[threshold_inds]
fps = 1 + threshold_inds - tps
thresholds = y_score[threshold_inds]
precision = tps / (tps + fps)
precision[np.isnan(precision)] = 0
recall = tps / tps[-1]
# stop when full recall attained
# and reverse the outputs so recall is decreasing
last_ind = tps.searchsorted(tps[-1])
sl = slice(last_ind, None, -1)
return np.r_[precision[sl], 1], np.r_[recall[sl], 0], thresholds[sl]
def pairwise_temporal_iou(candidate_segments,
target_segments,
calculate_overlap_self=False):
"""Compute intersection over union between segments.
Args:
candidate_segments (np.ndarray): 1-dim/2-dim array in format
``[init, end]/[m x 2:=[init, end]]``.
target_segments (np.ndarray): 2-dim array in format
``[n x 2:=[init, end]]``.
calculate_overlap_self (bool): Whether to calculate overlap_self
(union / candidate_length) or not. Default: False.
Returns:
t_iou (np.ndarray): 1-dim array [n] /
2-dim array [n x m] with IoU ratio.
t_overlap_self (np.ndarray, optional): 1-dim array [n] /
2-dim array [n x m] with overlap_self, returns when
calculate_overlap_self is True.
"""
candidate_segments_ndim = candidate_segments.ndim
if target_segments.ndim != 2 or candidate_segments_ndim not in [1, 2]:
raise ValueError('Dimension of arguments is incorrect')
if candidate_segments_ndim == 1:
candidate_segments = candidate_segments[np.newaxis, :]
n, m = target_segments.shape[0], candidate_segments.shape[0]
t_iou = np.empty((n, m), dtype=np.float32)
if calculate_overlap_self:
t_overlap_self = np.empty((n, m), dtype=np.float32)
for i in range(m):
candidate_segment = candidate_segments[i, :]
tt1 = np.maximum(candidate_segment[0], target_segments[:, 0])
tt2 = np.minimum(candidate_segment[1], target_segments[:, 1])
# Intersection including Non-negative overlap score.
segments_intersection = (tt2 - tt1).clip(0)
# Segment union.
segments_union = ((target_segments[:, 1] - target_segments[:, 0]) +
(candidate_segment[1] - candidate_segment[0]) -
segments_intersection)
# Compute overlap as the ratio of the intersection
# over union of two segments.
t_iou[:, i] = (segments_intersection.astype(float) / segments_union)
if calculate_overlap_self:
candidate_length = candidate_segment[1] - candidate_segment[0]
t_overlap_self[:, i] = (
segments_intersection.astype(float) / candidate_length)
if candidate_segments_ndim == 1:
t_iou = np.squeeze(t_iou, axis=1)
if calculate_overlap_self:
if candidate_segments_ndim == 1:
t_overlap_self = np.squeeze(t_overlap_self, axis=1)
return t_iou, t_overlap_self
return t_iou
def average_recall_at_avg_proposals(ground_truth,
proposals,
total_num_proposals,
max_avg_proposals=None,
temporal_iou_thresholds=np.linspace(
0.5, 0.95, 10)):
"""Computes the average recall given an average number (percentile) of
proposals per video.
Args:
ground_truth (dict): Dict containing the ground truth instances.
proposals (dict): Dict containing the proposal instances.
total_num_proposals (int): Total number of proposals in the
proposal dict.
max_avg_proposals (int | None): Max number of proposals for one video.
Default: None.
temporal_iou_thresholds (np.ndarray): 1D array with temporal_iou
thresholds. Default: ``np.linspace(0.5, 0.95, 10)``.
Returns:
tuple([np.ndarray, np.ndarray, np.ndarray, float]):
(recall, average_recall, proposals_per_video, auc)
In recall, ``recall[i,j]`` is the recall at the i-th temporal_iou
threshold and the j-th average number (percentile) of proposals per
video. The average_recall is the recall averaged over the list of
temporal_iou thresholds (1D array), which is equivalent to
``recall.mean(axis=0)``. The ``proposals_per_video``
is the average number of proposals per video. The auc is the area
under ``AR@AN`` curve.
"""
total_num_videos = len(ground_truth)
if not max_avg_proposals:
max_avg_proposals = float(total_num_proposals) / total_num_videos
ratio = (max_avg_proposals * float(total_num_videos) / total_num_proposals)
# For each video, compute temporal_iou scores among the retrieved proposals
score_list = []
total_num_retrieved_proposals = 0
for video_id in ground_truth:
# Get proposals for this video.
proposals_video_id = proposals[video_id]
this_video_proposals = proposals_video_id[:, :2]
# Sort proposals by score.
sort_idx = proposals_video_id[:, 2].argsort()[::-1]
this_video_proposals = this_video_proposals[sort_idx, :].astype(
np.float32)
# Get ground-truth instances associated to this video.
ground_truth_video_id = ground_truth[video_id]
this_video_ground_truth = ground_truth_video_id[:, :2].astype(
np.float32)
if this_video_proposals.shape[0] == 0:
n = this_video_ground_truth.shape[0]
score_list.append(np.zeros((n, 1)))
continue
if this_video_proposals.ndim != 2:
this_video_proposals = np.expand_dims(this_video_proposals, axis=0)
if this_video_ground_truth.ndim != 2:
this_video_ground_truth = np.expand_dims(
this_video_ground_truth, axis=0)
num_retrieved_proposals = np.minimum(
int(this_video_proposals.shape[0] * ratio),
this_video_proposals.shape[0])
total_num_retrieved_proposals += num_retrieved_proposals
this_video_proposals = this_video_proposals[:
num_retrieved_proposals, :]
# Compute temporal_iou scores.
t_iou = pairwise_temporal_iou(this_video_proposals,
this_video_ground_truth)
score_list.append(t_iou)
# Given that the length of the videos varies a lot, we compute the
# number of proposals as a ratio of the total proposals retrieved, i.e.
# average recall at a percentage of proposals retrieved per video.
# Computes average recall.
pcn_list = np.arange(1, 101) / 100.0 * (
max_avg_proposals * float(total_num_videos) /
total_num_retrieved_proposals)
matches = np.empty((total_num_videos, pcn_list.shape[0]))
positives = np.empty(total_num_videos)
recall = np.empty((temporal_iou_thresholds.shape[0], pcn_list.shape[0]))
# Iterates over each temporal_iou threshold.
for ridx, temporal_iou in enumerate(temporal_iou_thresholds):
# Inspect positives retrieved per video at different
# number of proposals (percentage of the total retrieved).
for i, score in enumerate(score_list):
# Total positives per video.
positives[i] = score.shape[0]
# Find proposals that satisfy the minimum temporal_iou threshold.
true_positives_temporal_iou = score >= temporal_iou
# Get number of proposals as a percentage of total retrieved.
pcn_proposals = np.minimum(
(score.shape[1] * pcn_list).astype(np.int64), score.shape[1])
for j, num_retrieved_proposals in enumerate(pcn_proposals):
# Compute the number of matches
# for each percentage of the proposals
matches[i, j] = np.count_nonzero(
(true_positives_temporal_iou[:, :num_retrieved_proposals]
).sum(axis=1))
# Computes recall given the set of matches per video.
recall[ridx, :] = matches.sum(axis=0) / positives.sum()
# Recall is averaged.
avg_recall = recall.mean(axis=0)
# Get the average number of proposals per video.
proposals_per_video = pcn_list * (
float(total_num_retrieved_proposals) / total_num_videos)
# Get AUC
area_under_curve = np.trapz(avg_recall, proposals_per_video)
auc = 100. * float(area_under_curve) / proposals_per_video[-1]
return recall, avg_recall, proposals_per_video, auc
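# Illustrative usage sketch (hypothetical data): both dicts are keyed by
# video id, ground truth rows are [t-start, t-end] and proposal rows are
# [t-start, t-end, score].
#     import numpy as np
#     gt = {'v_test': np.array([[1., 5.]])}
#     prop = {'v_test': np.array([[1., 4., 0.9], [6., 8., 0.5]])}
#     recall, avg_recall, prop_per_video, auc = \
#         average_recall_at_avg_proposals(gt, prop, total_num_proposals=2)
#     # recall has shape (10, 100): 10 temporal IoU thresholds x 100
#     # proposal percentiles.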
def get_weighted_score(score_list, coeff_list):
"""Get weighted score with given scores and coefficients.
Given n predictions by different classifier: [score_1, score_2, ...,
score_n] (score_list) and their coefficients: [coeff_1, coeff_2, ...,
coeff_n] (coeff_list), return weighted score: weighted_score =
score_1 * coeff_1 + score_2 * coeff_2 + ... + score_n * coeff_n
Args:
score_list (list[list[np.ndarray]]): List of list of scores, with shape
n(number of predictions) X num_samples X num_classes
coeff_list (list[float]): List of coefficients, with shape n.
Returns:
list[np.ndarray]: List of weighted scores.
"""
assert len(score_list) == len(coeff_list)
num_samples = len(score_list[0])
for i in range(1, len(score_list)):
assert len(score_list[i]) == num_samples
scores = np.array(score_list) # (num_coeff, num_samples, num_classes)
coeff = np.array(coeff_list) # (num_coeff, )
weighted_scores = list(np.dot(scores.T, coeff).T)
return weighted_scores
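# Illustrative usage sketch (hypothetical scores): fuse the predictions of
# two classifiers on a single two-class sample with weights 1 and 2.
#     import numpy as np
#     score_list = [[np.array([0.5, 0.5])], [np.array([1.0, 0.0])]]
#     get_weighted_score(score_list, [1, 2])  # -> [array([2.5, 0.5])]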
def softmax(x, dim=1):
"""Compute softmax values for each sets of scores in x."""
e_x = np.exp(x - np.max(x, axis=dim, keepdims=True))
return e_x / e_x.sum(axis=dim, keepdims=True)
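# Illustrative usage sketch (hypothetical logits): softmax is applied along
# the class dimension (dim=1 by default).
#     import numpy as np
#     softmax(np.array([[1., 1.], [0., 10.]]))
#     # -> row 0 is [0.5, 0.5]; row 1 is approximately [4.5e-05, 1.0]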
def interpolated_precision_recall(precision, recall):
"""Interpolated AP - VOCdevkit from VOC 2011.
Args:
precision (np.ndarray): The precision of different thresholds.
recall (np.ndarray): The recall of different thresholds.
Returns:
float: Average precision score.
"""
mprecision = np.hstack([[0], precision, [0]])
mrecall = np.hstack([[0], recall, [1]])
for i in range(len(mprecision) - 1)[::-1]:
mprecision[i] = max(mprecision[i], mprecision[i + 1])
idx = np.where(mrecall[1::] != mrecall[0:-1])[0] + 1
ap = np.sum((mrecall[idx] - mrecall[idx - 1]) * mprecision[idx])
return ap
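# Illustrative usage sketch (hypothetical values): two operating points with
# precision [1.0, 0.5] at recall [0.5, 1.0] give an interpolated AP of
# 0.5 * 1.0 + 0.5 * 0.5 = 0.75.
#     import numpy as np
#     interpolated_precision_recall(np.array([1.0, 0.5]),
#                                   np.array([0.5, 1.0]))  # -> 0.75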
def average_precision_at_temporal_iou(ground_truth,
prediction,
temporal_iou_thresholds=(np.linspace(
0.5, 0.95, 10))):
"""Compute average precision (in detection task) between ground truth and
predicted data frames. If multiple predictions match the same predicted
segment, only the one with highest score is matched as true positive. This
code is greatly inspired by Pascal VOC devkit.
Args:
ground_truth (dict): Dict containing the ground truth instances.
Key: 'video_id'
Value (np.ndarray): 2D array of 't-start' and 't-end' pairs, one
row per ground truth instance.
prediction (np.ndarray): 2D array containing the information of
proposal instances, including 'video_id', 'class_id', 't-start',
't-end' and 'score'.
temporal_iou_thresholds (np.ndarray): 1D array with temporal_iou
thresholds. Default: ``np.linspace(0.5, 0.95, 10)``.
Returns:
np.ndarray: 1D array of average precision score.
"""
ap = np.zeros(len(temporal_iou_thresholds), dtype=np.float32)
if len(prediction) < 1:
return ap
num_gts = 0.
lock_gt = dict()
for key in ground_truth:
lock_gt[key] = np.ones(
(len(temporal_iou_thresholds), len(ground_truth[key]))) * -1
num_gts += len(ground_truth[key])
# Sort predictions by decreasing score order.
prediction = np.array(prediction)
scores = prediction[:, 4].astype(float)
sort_idx = np.argsort(scores)[::-1]
prediction = prediction[sort_idx]
# Initialize true positive and false positive vectors.
tp = np.zeros((len(temporal_iou_thresholds), len(prediction)),
dtype=np.int32)
fp = np.zeros((len(temporal_iou_thresholds), len(prediction)),
dtype=np.int32)
# Assign true positives to ground truth instances.
for idx, this_pred in enumerate(prediction):
# Check if there is at least one ground truth in the video.
if this_pred[0] in ground_truth:
this_gt = np.array(ground_truth[this_pred[0]], dtype=float)
else:
fp[:, idx] = 1
continue
t_iou = pairwise_temporal_iou(this_pred[2:4].astype(float), this_gt)
# We would like to retrieve the predictions with highest t_iou score.
t_iou_sorted_idx = t_iou.argsort()[::-1]
for t_idx, t_iou_threshold in enumerate(temporal_iou_thresholds):
for jdx in t_iou_sorted_idx:
if t_iou[jdx] < t_iou_threshold:
fp[t_idx, idx] = 1
break
if lock_gt[this_pred[0]][t_idx, jdx] >= 0:
continue
# Assign as true positive after the filters above.
tp[t_idx, idx] = 1
lock_gt[this_pred[0]][t_idx, jdx] = idx
break
if fp[t_idx, idx] == 0 and tp[t_idx, idx] == 0:
fp[t_idx, idx] = 1
tp_cumsum = np.cumsum(tp, axis=1).astype(np.float32)
fp_cumsum = np.cumsum(fp, axis=1).astype(np.float32)
recall_cumsum = tp_cumsum / num_gts
precision_cumsum = tp_cumsum / (tp_cumsum + fp_cumsum)
for t_idx in range(len(temporal_iou_thresholds)):
ap[t_idx] = interpolated_precision_recall(precision_cumsum[t_idx, :],
recall_cumsum[t_idx, :])
return ap
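# Illustrative usage sketch (hypothetical data): ground truth segments are
# keyed by video id, and each prediction row is
# [video_id, class_id, t-start, t-end, score].
#     import numpy as np
#     gt = {'v_test': np.array([[1., 5.]])}
#     pred = np.array([['v_test', 0, 1., 4., 0.9],
#                      ['v_test', 0, 6., 8., 0.5]], dtype=object)
#     ap = average_precision_at_temporal_iou(gt, pred)  # shape (10,)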
The code under this folder is from the official [ActivityNet repo](https://github.com/activitynet/ActivityNet).
Some unused code has been removed to keep the amount of added code minimal.
# Copyright (c) OpenMMLab. All rights reserved.
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Functions for computing metrics like precision, recall, CorLoc and etc."""
import numpy as np
def compute_precision_recall(scores, labels, num_gt):
"""Compute precision and recall.
Args:
scores: A float numpy array representing detection scores
labels: A boolean numpy array representing true/false positive labels
num_gt: Number of ground truth instances
Raises:
ValueError: if the input is not of the correct format
Returns:
precision: Fraction of positive instances over detected ones. This
value is None if no ground truth labels are present.
recall: Fraction of detected positive instances over all positive
instances. This value is None if no ground truth labels are
present.
"""
if (not isinstance(labels, np.ndarray) or labels.dtype != bool
or len(labels.shape) != 1):
raise ValueError('labels must be single dimension bool numpy array')
if not isinstance(scores, np.ndarray) or len(scores.shape) != 1:
raise ValueError('scores must be single dimension numpy array')
if num_gt < np.sum(labels):
raise ValueError(
'Number of true positives must not exceed num_gt.')
if len(scores) != len(labels):
raise ValueError('scores and labels must be of the same size.')
if num_gt == 0:
return None, None
sorted_indices = np.argsort(scores)
sorted_indices = sorted_indices[::-1]
labels = labels.astype(int)
true_positive_labels = labels[sorted_indices]
false_positive_labels = 1 - true_positive_labels
cum_true_positives = np.cumsum(true_positive_labels)
cum_false_positives = np.cumsum(false_positive_labels)
precision = cum_true_positives.astype(float) / (
cum_true_positives + cum_false_positives)
recall = cum_true_positives.astype(float) / num_gt
return precision, recall
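# Illustrative usage sketch (hypothetical detections): three detections, two
# of which are true positives, against four ground truth instances.
#     import numpy as np
#     scores = np.array([0.9, 0.8, 0.3])
#     labels = np.array([True, False, True])
#     precision, recall = compute_precision_recall(scores, labels, num_gt=4)
#     # precision is approximately [1.0, 0.5, 0.667]; recall is
#     # [0.25, 0.25, 0.5]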
def compute_average_precision(precision, recall):
"""Compute Average Precision according to the definition in VOCdevkit.
Precision is modified to ensure that it does not decrease as recall
decreases.
Args:
precision: A float [N, 1] numpy array of precisions
recall: A float [N, 1] numpy array of recalls
Raises:
ValueError: if the input is not of the correct format
Returns:
average_precision: The area under the precision-recall curve. NaN if
precision and recall are None.
"""
if precision is None:
if recall is not None:
raise ValueError('If precision is None, recall must also be None')
return np.NAN
if not isinstance(precision, np.ndarray) or not isinstance(
recall, np.ndarray):
raise ValueError('precision and recall must be numpy array')
if precision.dtype != np.float64 or recall.dtype != np.float64:
raise ValueError('input must be float numpy array.')
if len(precision) != len(recall):
raise ValueError('precision and recall must be of the same size.')
if not precision.size:
return 0.0
if np.amin(precision) < 0 or np.amax(precision) > 1:
raise ValueError('Precision must be in the range of [0, 1].')
if np.amin(recall) < 0 or np.amax(recall) > 1:
raise ValueError('recall must be in the range of [0, 1].')
if not all(recall[i] <= recall[i + 1] for i in range(len(recall) - 1)):
raise ValueError('recall must be a non-decreasing array')
recall = np.concatenate([[0], recall, [1]])
precision = np.concatenate([[0], precision, [0]])
# Preprocess precision to be a non-decreasing array
for i in range(len(precision) - 2, -1, -1):
precision[i] = np.maximum(precision[i], precision[i + 1])
indices = np.where(recall[1:] != recall[:-1])[0] + 1
average_precision = np.sum(
(recall[indices] - recall[indices - 1]) * precision[indices])
return average_precision
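# Illustrative usage sketch (hypothetical curve): both arrays must be float64
# and recall must be non-decreasing.
#     import numpy as np
#     compute_average_precision(np.array([1.0, 0.5]),
#                               np.array([0.5, 1.0]))  # -> 0.75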
def compute_cor_loc(num_gt_imgs_per_class,
num_images_correctly_detected_per_class):
"""Compute CorLoc according to the definition in the following paper.
https://www.robots.ox.ac.uk/~vgg/rg/papers/deselaers-eccv10.pdf
Returns nans if there are no ground truth images for a class.
Args:
num_gt_imgs_per_class: 1D array, representing number of images
containing at least one object instance of a particular class
num_images_correctly_detected_per_class: 1D array, representing the
number of images in which at least one object instance of a
particular class is correctly detected
Returns:
corloc_per_class: A float numpy array represents the corloc score of
each class
"""
# Divide by zero expected for classes with no gt examples.
with np.errstate(divide='ignore', invalid='ignore'):
return np.where(
num_gt_imgs_per_class == 0, np.nan,
num_images_correctly_detected_per_class / num_gt_imgs_per_class)
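# Illustrative usage sketch (hypothetical counts): classes without ground
# truth images yield NaN.
#     import numpy as np
#     compute_cor_loc(np.array([10, 0, 5]),
#                     np.array([8, 0, 5]))  # -> [0.8, nan, 1.0]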