Commit a8562a56 authored by luopl's avatar luopl
Browse files

Initial commit

parents
Pipeline #1564 canceled with stages
# Copyright (c) OpenMMLab. All rights reserved.
import mmcv
import mmengine
from mmengine.utils import digit_version
from .version import __version__, version_info
mmcv_minimum_version = '2.0.0rc4'
mmcv_maximum_version = '2.2.0'
mmcv_version = digit_version(mmcv.__version__)
mmengine_minimum_version = '0.7.1'
mmengine_maximum_version = '1.0.0'
mmengine_version = digit_version(mmengine.__version__)
assert (mmcv_version >= digit_version(mmcv_minimum_version)
and mmcv_version < digit_version(mmcv_maximum_version)), \
f'MMCV=={mmcv.__version__} is used but incompatible. ' \
f'Please install mmcv>={mmcv_minimum_version}, <{mmcv_maximum_version}.'
assert (mmengine_version >= digit_version(mmengine_minimum_version)
and mmengine_version < digit_version(mmengine_maximum_version)), \
f'MMEngine=={mmengine.__version__} is used but incompatible. ' \
f'Please install mmengine>={mmengine_minimum_version}, ' \
f'<{mmengine_maximum_version}.'
__all__ = ['__version__', 'version_info', 'digit_version']
# Copyright (c) OpenMMLab. All rights reserved.
from .det_inferencer import DetInferencer
from .inference import (async_inference_detector, inference_detector,
inference_mot, init_detector, init_track_model)
__all__ = [
'init_detector', 'async_inference_detector', 'inference_detector',
'DetInferencer', 'inference_mot', 'init_track_model'
]
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import os.path as osp
import warnings
from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
import mmcv
import mmengine
import numpy as np
import torch.nn as nn
from mmcv.transforms import LoadImageFromFile
from mmengine.dataset import Compose
from mmengine.fileio import (get_file_backend, isdir, join_path,
list_dir_or_file)
from mmengine.infer.infer import BaseInferencer, ModelType
from mmengine.model.utils import revert_sync_batchnorm
from mmengine.registry import init_default_scope
from mmengine.runner.checkpoint import _load_checkpoint_to_model
from mmengine.visualization import Visualizer
from rich.progress import track
from mmdet.evaluation import INSTANCE_OFFSET
from mmdet.registry import DATASETS
from mmdet.structures import DetDataSample
from mmdet.structures.mask import encode_mask_results, mask2bbox
from mmdet.utils import ConfigType
from ..evaluation import get_classes
try:
from panopticapi.evaluation import VOID
from panopticapi.utils import id2rgb
except ImportError:
id2rgb = None
VOID = None
InputType = Union[str, np.ndarray]
InputsType = Union[InputType, Sequence[InputType]]
PredType = List[DetDataSample]
ImgType = Union[np.ndarray, Sequence[np.ndarray]]
IMG_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.ppm', '.bmp', '.pgm', '.tif',
'.tiff', '.webp')
class DetInferencer(BaseInferencer):
"""Object Detection Inferencer.
Args:
model (str, optional): Path to the config file or the model name
defined in metafile. For example, it could be
"rtmdet-s" or 'rtmdet_s_8xb32-300e_coco' or
"configs/rtmdet/rtmdet_s_8xb32-300e_coco.py".
If model is not specified, user must provide the
`weights` saved by MMEngine which contains the config string.
Defaults to None.
weights (str, optional): Path to the checkpoint. If it is not specified
and model is a model name of metafile, the weights will be loaded
from metafile. Defaults to None.
device (str, optional): Device to run inference. If None, the available
device will be automatically used. Defaults to None.
scope (str, optional): The scope of the model. Defaults to mmdet.
palette (str): Color palette used for visualization. The order of
priority is palette -> config -> checkpoint. Defaults to 'none'.
show_progress (bool): Control whether to display the progress
bar during the inference process. Defaults to True.
"""
preprocess_kwargs: set = set()
forward_kwargs: set = set()
visualize_kwargs: set = {
'return_vis',
'show',
'wait_time',
'draw_pred',
'pred_score_thr',
'img_out_dir',
'no_save_vis',
}
postprocess_kwargs: set = {
'print_result',
'pred_out_dir',
'return_datasamples',
'no_save_pred',
}
def __init__(self,
model: Optional[Union[ModelType, str]] = None,
weights: Optional[str] = None,
device: Optional[str] = None,
scope: Optional[str] = 'mmdet',
palette: str = 'none',
show_progress: bool = True) -> None:
# A global counter tracking the number of images processed, for
# naming of the output images
self.num_visualized_imgs = 0
self.num_predicted_imgs = 0
self.palette = palette
init_default_scope(scope)
super().__init__(
model=model, weights=weights, device=device, scope=scope)
self.model = revert_sync_batchnorm(self.model)
self.show_progress = show_progress
def _load_weights_to_model(self, model: nn.Module,
checkpoint: Optional[dict],
cfg: Optional[ConfigType]) -> None:
"""Loading model weights and meta information from cfg and checkpoint.
Args:
model (nn.Module): Model to load weights and meta information.
checkpoint (dict, optional): The loaded checkpoint.
cfg (Config or ConfigDict, optional): The loaded config.
"""
if checkpoint is not None:
_load_checkpoint_to_model(model, checkpoint)
checkpoint_meta = checkpoint.get('meta', {})
# save the dataset_meta in the model for convenience
if 'dataset_meta' in checkpoint_meta:
# mmdet 3.x, all keys should be lowercase
model.dataset_meta = {
k.lower(): v
for k, v in checkpoint_meta['dataset_meta'].items()
}
elif 'CLASSES' in checkpoint_meta:
# < mmdet 3.x
classes = checkpoint_meta['CLASSES']
model.dataset_meta = {'classes': classes}
else:
warnings.warn(
'dataset_meta or class names are not saved in the '
'checkpoint\'s meta data, use COCO classes by default.')
model.dataset_meta = {'classes': get_classes('coco')}
else:
warnings.warn('Checkpoint is not loaded, and the inference '
'result is calculated by the randomly initialized '
'model!')
warnings.warn('weights is None, use COCO classes by default.')
model.dataset_meta = {'classes': get_classes('coco')}
# Priority: args.palette -> config -> checkpoint
if self.palette != 'none':
model.dataset_meta['palette'] = self.palette
else:
test_dataset_cfg = copy.deepcopy(cfg.test_dataloader.dataset)
# lazy init. We only need the metainfo.
test_dataset_cfg['lazy_init'] = True
metainfo = DATASETS.build(test_dataset_cfg).metainfo
cfg_palette = metainfo.get('palette', None)
if cfg_palette is not None:
model.dataset_meta['palette'] = cfg_palette
else:
if 'palette' not in model.dataset_meta:
warnings.warn(
'palette does not exist, random is used by default. '
'You can also set the palette to customize.')
model.dataset_meta['palette'] = 'random'
def _init_pipeline(self, cfg: ConfigType) -> Compose:
"""Initialize the test pipeline."""
pipeline_cfg = cfg.test_dataloader.dataset.pipeline
# For inference, the key of ``img_id`` is not used.
if 'meta_keys' in pipeline_cfg[-1]:
pipeline_cfg[-1]['meta_keys'] = tuple(
meta_key for meta_key in pipeline_cfg[-1]['meta_keys']
if meta_key != 'img_id')
load_img_idx = self._get_transform_idx(
pipeline_cfg, ('LoadImageFromFile', LoadImageFromFile))
if load_img_idx == -1:
raise ValueError(
'LoadImageFromFile is not found in the test pipeline')
pipeline_cfg[load_img_idx]['type'] = 'mmdet.InferencerLoader'
return Compose(pipeline_cfg)
def _get_transform_idx(self, pipeline_cfg: ConfigType,
name: Union[str, Tuple[str, type]]) -> int:
"""Returns the index of the transform in a pipeline.
If the transform is not found, returns -1.
"""
for i, transform in enumerate(pipeline_cfg):
if transform['type'] in name:
return i
return -1
def _init_visualizer(self, cfg: ConfigType) -> Optional[Visualizer]:
"""Initialize visualizers.
Args:
cfg (ConfigType): Config containing the visualizer information.
Returns:
Visualizer or None: Visualizer initialized with config.
"""
visualizer = super()._init_visualizer(cfg)
visualizer.dataset_meta = self.model.dataset_meta
return visualizer
def _inputs_to_list(self, inputs: InputsType) -> list:
"""Preprocess the inputs to a list.
Preprocess inputs to a list according to its type:
- list or tuple: return inputs
- str:
- Directory path: return all files in the directory
- other cases: return a list containing the string. The string
could be a path to file, a url or other types of string according
to the task.
Args:
inputs (InputsType): Inputs for the inferencer.
Returns:
list: List of input for the :meth:`preprocess`.
"""
if isinstance(inputs, str):
backend = get_file_backend(inputs)
if hasattr(backend, 'isdir') and isdir(inputs):
# Backends like HttpsBackend do not implement `isdir`, so only
# those backends that implement `isdir` could accept the inputs
# as a directory
filename_list = list_dir_or_file(
inputs, list_dir=False, suffix=IMG_EXTENSIONS)
inputs = [
join_path(inputs, filename) for filename in filename_list
]
if not isinstance(inputs, (list, tuple)):
inputs = [inputs]
return list(inputs)
def preprocess(self, inputs: InputsType, batch_size: int = 1, **kwargs):
"""Process the inputs into a model-feedable format.
Customize your preprocess by overriding this method. Preprocess should
return an iterable object, of which each item will be used as the
input of ``model.test_step``.
``BaseInferencer.preprocess`` will return an iterable chunked data,
which will be used in __call__ like this:
.. code-block:: python
def __call__(self, inputs, batch_size=1, **kwargs):
chunked_data = self.preprocess(inputs, batch_size, **kwargs)
for batch in chunked_data:
preds = self.forward(batch, **kwargs)
Args:
inputs (InputsType): Inputs given by user.
batch_size (int): batch size. Defaults to 1.
Yields:
Any: Data processed by the ``pipeline`` and ``collate_fn``.
"""
chunked_data = self._get_chunk_data(inputs, batch_size)
yield from map(self.collate_fn, chunked_data)
def _get_chunk_data(self, inputs: Iterable, chunk_size: int):
"""Get batch data from inputs.
Args:
inputs (Iterable): An iterable dataset.
chunk_size (int): Equivalent to batch size.
Yields:
list: batch data.
"""
inputs_iter = iter(inputs)
while True:
try:
chunk_data = []
for _ in range(chunk_size):
inputs_ = next(inputs_iter)
if isinstance(inputs_, dict):
if 'img' in inputs_:
ori_inputs_ = inputs_['img']
else:
ori_inputs_ = inputs_['img_path']
chunk_data.append(
(ori_inputs_,
self.pipeline(copy.deepcopy(inputs_))))
else:
chunk_data.append((inputs_, self.pipeline(inputs_)))
yield chunk_data
except StopIteration:
if chunk_data:
yield chunk_data
break
# TODO: Video and Webcam are currently not supported and
# may consume too much memory if your input folder has a lot of images.
# We will be optimized later.
def __call__(
self,
inputs: InputsType,
batch_size: int = 1,
return_vis: bool = False,
show: bool = False,
wait_time: int = 0,
no_save_vis: bool = False,
draw_pred: bool = True,
pred_score_thr: float = 0.3,
return_datasamples: bool = False,
print_result: bool = False,
no_save_pred: bool = True,
out_dir: str = '',
# by open image task
texts: Optional[Union[str, list]] = None,
# by open panoptic task
stuff_texts: Optional[Union[str, list]] = None,
# by GLIP and Grounding DINO
custom_entities: bool = False,
# by Grounding DINO
tokens_positive: Optional[Union[int, list]] = None,
**kwargs) -> dict:
"""Call the inferencer.
Args:
inputs (InputsType): Inputs for the inferencer.
batch_size (int): Inference batch size. Defaults to 1.
show (bool): Whether to display the visualization results in a
popup window. Defaults to False.
wait_time (float): The interval of show (s). Defaults to 0.
no_save_vis (bool): Whether to force not to save prediction
vis results. Defaults to False.
draw_pred (bool): Whether to draw predicted bounding boxes.
Defaults to True.
pred_score_thr (float): Minimum score of bboxes to draw.
Defaults to 0.3.
return_datasamples (bool): Whether to return results as
:obj:`DetDataSample`. Defaults to False.
print_result (bool): Whether to print the inference result w/o
visualization to the console. Defaults to False.
no_save_pred (bool): Whether to force not to save prediction
results. Defaults to True.
out_dir: Dir to save the inference results or
visualization. If left as empty, no file will be saved.
Defaults to ''.
texts (str | list[str]): Text prompts. Defaults to None.
stuff_texts (str | list[str]): Stuff text prompts of open
panoptic task. Defaults to None.
custom_entities (bool): Whether to use custom entities.
Defaults to False. Only used in GLIP and Grounding DINO.
**kwargs: Other keyword arguments passed to :meth:`preprocess`,
:meth:`forward`, :meth:`visualize` and :meth:`postprocess`.
Each key in kwargs should be in the corresponding set of
``preprocess_kwargs``, ``forward_kwargs``, ``visualize_kwargs``
and ``postprocess_kwargs``.
Returns:
dict: Inference and visualization results.
"""
(
preprocess_kwargs,
forward_kwargs,
visualize_kwargs,
postprocess_kwargs,
) = self._dispatch_kwargs(**kwargs)
ori_inputs = self._inputs_to_list(inputs)
if texts is not None and isinstance(texts, str):
texts = [texts] * len(ori_inputs)
if stuff_texts is not None and isinstance(stuff_texts, str):
stuff_texts = [stuff_texts] * len(ori_inputs)
# Currently only supports bs=1
tokens_positive = [tokens_positive] * len(ori_inputs)
if texts is not None:
assert len(texts) == len(ori_inputs)
for i in range(len(texts)):
if isinstance(ori_inputs[i], str):
ori_inputs[i] = {
'text': texts[i],
'img_path': ori_inputs[i],
'custom_entities': custom_entities,
'tokens_positive': tokens_positive[i]
}
else:
ori_inputs[i] = {
'text': texts[i],
'img': ori_inputs[i],
'custom_entities': custom_entities,
'tokens_positive': tokens_positive[i]
}
if stuff_texts is not None:
assert len(stuff_texts) == len(ori_inputs)
for i in range(len(stuff_texts)):
ori_inputs[i]['stuff_text'] = stuff_texts[i]
inputs = self.preprocess(
ori_inputs, batch_size=batch_size, **preprocess_kwargs)
results_dict = {'predictions': [], 'visualization': []}
for ori_imgs, data in (track(inputs, description='Inference')
if self.show_progress else inputs):
preds = self.forward(data, **forward_kwargs)
visualization = self.visualize(
ori_imgs,
preds,
return_vis=return_vis,
show=show,
wait_time=wait_time,
draw_pred=draw_pred,
pred_score_thr=pred_score_thr,
no_save_vis=no_save_vis,
img_out_dir=out_dir,
**visualize_kwargs)
results = self.postprocess(
preds,
visualization,
return_datasamples=return_datasamples,
print_result=print_result,
no_save_pred=no_save_pred,
pred_out_dir=out_dir,
**postprocess_kwargs)
results_dict['predictions'].extend(results['predictions'])
if results['visualization'] is not None:
results_dict['visualization'].extend(results['visualization'])
return results_dict
def visualize(self,
inputs: InputsType,
preds: PredType,
return_vis: bool = False,
show: bool = False,
wait_time: int = 0,
draw_pred: bool = True,
pred_score_thr: float = 0.3,
no_save_vis: bool = False,
img_out_dir: str = '',
**kwargs) -> Union[List[np.ndarray], None]:
"""Visualize predictions.
Args:
inputs (List[Union[str, np.ndarray]]): Inputs for the inferencer.
preds (List[:obj:`DetDataSample`]): Predictions of the model.
return_vis (bool): Whether to return the visualization result.
Defaults to False.
show (bool): Whether to display the image in a popup window.
Defaults to False.
wait_time (float): The interval of show (s). Defaults to 0.
draw_pred (bool): Whether to draw predicted bounding boxes.
Defaults to True.
pred_score_thr (float): Minimum score of bboxes to draw.
Defaults to 0.3.
no_save_vis (bool): Whether to force not to save prediction
vis results. Defaults to False.
img_out_dir (str): Output directory of visualization results.
If left as empty, no file will be saved. Defaults to ''.
Returns:
List[np.ndarray] or None: Returns visualization results only if
applicable.
"""
if no_save_vis is True:
img_out_dir = ''
if not show and img_out_dir == '' and not return_vis:
return None
if self.visualizer is None:
raise ValueError('Visualization needs the "visualizer" term'
'defined in the config, but got None.')
results = []
for single_input, pred in zip(inputs, preds):
if isinstance(single_input, str):
img_bytes = mmengine.fileio.get(single_input)
img = mmcv.imfrombytes(img_bytes)
img = img[:, :, ::-1]
img_name = osp.basename(single_input)
elif isinstance(single_input, np.ndarray):
img = single_input.copy()
img_num = str(self.num_visualized_imgs).zfill(8)
img_name = f'{img_num}.jpg'
else:
raise ValueError('Unsupported input type: '
f'{type(single_input)}')
out_file = osp.join(img_out_dir, 'vis',
img_name) if img_out_dir != '' else None
self.visualizer.add_datasample(
img_name,
img,
pred,
show=show,
wait_time=wait_time,
draw_gt=False,
draw_pred=draw_pred,
pred_score_thr=pred_score_thr,
out_file=out_file,
)
results.append(self.visualizer.get_image())
self.num_visualized_imgs += 1
return results
def postprocess(
self,
preds: PredType,
visualization: Optional[List[np.ndarray]] = None,
return_datasamples: bool = False,
print_result: bool = False,
no_save_pred: bool = False,
pred_out_dir: str = '',
**kwargs,
) -> Dict:
"""Process the predictions and visualization results from ``forward``
and ``visualize``.
This method should be responsible for the following tasks:
1. Convert datasamples into a json-serializable dict if needed.
2. Pack the predictions and visualization results and return them.
3. Dump or log the predictions.
Args:
preds (List[:obj:`DetDataSample`]): Predictions of the model.
visualization (Optional[np.ndarray]): Visualized predictions.
return_datasamples (bool): Whether to use Datasample to store
inference results. If False, dict will be used.
print_result (bool): Whether to print the inference result w/o
visualization to the console. Defaults to False.
no_save_pred (bool): Whether to force not to save prediction
results. Defaults to False.
pred_out_dir: Dir to save the inference results w/o
visualization. If left as empty, no file will be saved.
Defaults to ''.
Returns:
dict: Inference and visualization results with key ``predictions``
and ``visualization``.
- ``visualization`` (Any): Returned by :meth:`visualize`.
- ``predictions`` (dict or DataSample): Returned by
:meth:`forward` and processed in :meth:`postprocess`.
If ``return_datasamples=False``, it usually should be a
json-serializable dict containing only basic data elements such
as strings and numbers.
"""
if no_save_pred is True:
pred_out_dir = ''
result_dict = {}
results = preds
if not return_datasamples:
results = []
for pred in preds:
result = self.pred2dict(pred, pred_out_dir)
results.append(result)
elif pred_out_dir != '':
warnings.warn('Currently does not support saving datasample '
'when return_datasamples is set to True. '
'Prediction results are not saved!')
# Add img to the results after printing and dumping
result_dict['predictions'] = results
if print_result:
print(result_dict)
result_dict['visualization'] = visualization
return result_dict
# TODO: The data format and fields saved in json need further discussion.
# Maybe should include model name, timestamp, filename, image info etc.
def pred2dict(self,
data_sample: DetDataSample,
pred_out_dir: str = '') -> Dict:
"""Extract elements necessary to represent a prediction into a
dictionary.
It's better to contain only basic data elements such as strings and
numbers in order to guarantee it's json-serializable.
Args:
data_sample (:obj:`DetDataSample`): Predictions of the model.
pred_out_dir: Dir to save the inference results w/o
visualization. If left as empty, no file will be saved.
Defaults to ''.
Returns:
dict: Prediction results.
"""
is_save_pred = True
if pred_out_dir == '':
is_save_pred = False
if is_save_pred and 'img_path' in data_sample:
img_path = osp.basename(data_sample.img_path)
img_path = osp.splitext(img_path)[0]
out_img_path = osp.join(pred_out_dir, 'preds',
img_path + '_panoptic_seg.png')
out_json_path = osp.join(pred_out_dir, 'preds', img_path + '.json')
elif is_save_pred:
out_img_path = osp.join(
pred_out_dir, 'preds',
f'{self.num_predicted_imgs}_panoptic_seg.png')
out_json_path = osp.join(pred_out_dir, 'preds',
f'{self.num_predicted_imgs}.json')
self.num_predicted_imgs += 1
result = {}
if 'pred_instances' in data_sample:
masks = data_sample.pred_instances.get('masks')
pred_instances = data_sample.pred_instances.numpy()
result = {
'labels': pred_instances.labels.tolist(),
'scores': pred_instances.scores.tolist()
}
if 'bboxes' in pred_instances:
result['bboxes'] = pred_instances.bboxes.tolist()
if masks is not None:
if 'bboxes' not in pred_instances or pred_instances.bboxes.sum(
) == 0:
# Fake bbox, such as the SOLO.
bboxes = mask2bbox(masks.cpu()).numpy().tolist()
result['bboxes'] = bboxes
encode_masks = encode_mask_results(pred_instances.masks)
for encode_mask in encode_masks:
if isinstance(encode_mask['counts'], bytes):
encode_mask['counts'] = encode_mask['counts'].decode()
result['masks'] = encode_masks
if 'pred_panoptic_seg' in data_sample:
if VOID is None:
raise RuntimeError(
'panopticapi is not installed, please install it by: '
'pip install git+https://github.com/cocodataset/'
'panopticapi.git.')
pan = data_sample.pred_panoptic_seg.sem_seg.cpu().numpy()[0]
pan[pan % INSTANCE_OFFSET == len(
self.model.dataset_meta['classes'])] = VOID
pan = id2rgb(pan).astype(np.uint8)
if is_save_pred:
mmcv.imwrite(pan[:, :, ::-1], out_img_path)
result['panoptic_seg_path'] = out_img_path
else:
result['panoptic_seg'] = pan
if is_save_pred:
mmengine.dump(result, out_json_path)
return result
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import warnings
from pathlib import Path
from typing import Optional, Sequence, Union
import numpy as np
import torch
import torch.nn as nn
from mmcv.ops import RoIPool
from mmcv.transforms import Compose
from mmengine.config import Config
from mmengine.dataset import default_collate
from mmengine.model.utils import revert_sync_batchnorm
from mmengine.registry import init_default_scope
from mmengine.runner import load_checkpoint
from mmdet.registry import DATASETS
from mmdet.utils import ConfigType
from ..evaluation import get_classes
from ..registry import MODELS
from ..structures import DetDataSample, SampleList
from ..utils import get_test_pipeline_cfg
def init_detector(
config: Union[str, Path, Config],
checkpoint: Optional[str] = None,
palette: str = 'none',
device: str = 'cuda:0',
cfg_options: Optional[dict] = None,
) -> nn.Module:
"""Initialize a detector from config file.
Args:
config (str, :obj:`Path`, or :obj:`mmengine.Config`): Config file path,
:obj:`Path`, or the config object.
checkpoint (str, optional): Checkpoint path. If left as None, the model
will not load any weights.
palette (str): Color palette used for visualization. If palette
is stored in checkpoint, use checkpoint's palette first, otherwise
use externally passed palette. Currently, supports 'coco', 'voc',
'citys' and 'random'. Defaults to none.
device (str): The device where the anchors will be put on.
Defaults to cuda:0.
cfg_options (dict, optional): Options to override some settings in
the used config.
Returns:
nn.Module: The constructed detector.
"""
if isinstance(config, (str, Path)):
config = Config.fromfile(config)
elif not isinstance(config, Config):
raise TypeError('config must be a filename or Config object, '
f'but got {type(config)}')
if cfg_options is not None:
config.merge_from_dict(cfg_options)
elif 'init_cfg' in config.model.backbone:
config.model.backbone.init_cfg = None
scope = config.get('default_scope', 'mmdet')
if scope is not None:
init_default_scope(config.get('default_scope', 'mmdet'))
model = MODELS.build(config.model)
model = revert_sync_batchnorm(model)
if checkpoint is None:
warnings.simplefilter('once')
warnings.warn('checkpoint is None, use COCO classes by default.')
model.dataset_meta = {'classes': get_classes('coco')}
else:
checkpoint = load_checkpoint(model, checkpoint, map_location='cpu')
# Weights converted from elsewhere may not have meta fields.
checkpoint_meta = checkpoint.get('meta', {})
# save the dataset_meta in the model for convenience
if 'dataset_meta' in checkpoint_meta:
# mmdet 3.x, all keys should be lowercase
model.dataset_meta = {
k.lower(): v
for k, v in checkpoint_meta['dataset_meta'].items()
}
elif 'CLASSES' in checkpoint_meta:
# < mmdet 3.x
classes = checkpoint_meta['CLASSES']
model.dataset_meta = {'classes': classes}
else:
warnings.simplefilter('once')
warnings.warn(
'dataset_meta or class names are not saved in the '
'checkpoint\'s meta data, use COCO classes by default.')
model.dataset_meta = {'classes': get_classes('coco')}
# Priority: args.palette -> config -> checkpoint
if palette != 'none':
model.dataset_meta['palette'] = palette
else:
test_dataset_cfg = copy.deepcopy(config.test_dataloader.dataset)
# lazy init. We only need the metainfo.
test_dataset_cfg['lazy_init'] = True
metainfo = DATASETS.build(test_dataset_cfg).metainfo
cfg_palette = metainfo.get('palette', None)
if cfg_palette is not None:
model.dataset_meta['palette'] = cfg_palette
else:
if 'palette' not in model.dataset_meta:
warnings.warn(
'palette does not exist, random is used by default. '
'You can also set the palette to customize.')
model.dataset_meta['palette'] = 'random'
model.cfg = config # save the config in the model for convenience
model.to(device)
model.eval()
return model
ImagesType = Union[str, np.ndarray, Sequence[str], Sequence[np.ndarray]]
def inference_detector(
model: nn.Module,
imgs: ImagesType,
test_pipeline: Optional[Compose] = None,
text_prompt: Optional[str] = None,
custom_entities: bool = False,
) -> Union[DetDataSample, SampleList]:
"""Inference image(s) with the detector.
Args:
model (nn.Module): The loaded detector.
imgs (str, ndarray, Sequence[str/ndarray]):
Either image files or loaded images.
test_pipeline (:obj:`Compose`): Test pipeline.
Returns:
:obj:`DetDataSample` or list[:obj:`DetDataSample`]:
If imgs is a list or tuple, the same length list type results
will be returned, otherwise return the detection results directly.
"""
if isinstance(imgs, (list, tuple)):
is_batch = True
else:
imgs = [imgs]
is_batch = False
cfg = model.cfg
if test_pipeline is None:
cfg = cfg.copy()
test_pipeline = get_test_pipeline_cfg(cfg)
if isinstance(imgs[0], np.ndarray):
# Calling this method across libraries will result
# in module unregistered error if not prefixed with mmdet.
test_pipeline[0].type = 'mmdet.LoadImageFromNDArray'
test_pipeline = Compose(test_pipeline)
if model.data_preprocessor.device.type == 'cpu':
for m in model.modules():
assert not isinstance(
m, RoIPool
), 'CPU inference with RoIPool is not supported currently.'
result_list = []
for i, img in enumerate(imgs):
# prepare data
if isinstance(img, np.ndarray):
# TODO: remove img_id.
data_ = dict(img=img, img_id=0)
else:
# TODO: remove img_id.
data_ = dict(img_path=img, img_id=0)
if text_prompt:
data_['text'] = text_prompt
data_['custom_entities'] = custom_entities
# build the data pipeline
data_ = test_pipeline(data_)
data_['inputs'] = [data_['inputs']]
data_['data_samples'] = [data_['data_samples']]
# forward the model
with torch.no_grad():
results = model.test_step(data_)[0]
result_list.append(results)
if not is_batch:
return result_list[0]
else:
return result_list
# TODO: Awaiting refactoring
async def async_inference_detector(model, imgs):
"""Async inference image(s) with the detector.
Args:
model (nn.Module): The loaded detector.
img (str | ndarray): Either image files or loaded images.
Returns:
Awaitable detection results.
"""
if not isinstance(imgs, (list, tuple)):
imgs = [imgs]
cfg = model.cfg
if isinstance(imgs[0], np.ndarray):
cfg = cfg.copy()
# set loading pipeline type
cfg.data.test.pipeline[0].type = 'LoadImageFromNDArray'
# cfg.data.test.pipeline = replace_ImageToTensor(cfg.data.test.pipeline)
test_pipeline = Compose(cfg.data.test.pipeline)
datas = []
for img in imgs:
# prepare data
if isinstance(img, np.ndarray):
# directly add img
data = dict(img=img)
else:
# add information into dict
data = dict(img_info=dict(filename=img), img_prefix=None)
# build the data pipeline
data = test_pipeline(data)
datas.append(data)
for m in model.modules():
assert not isinstance(
m,
RoIPool), 'CPU inference with RoIPool is not supported currently.'
# We don't restore `torch.is_grad_enabled()` value during concurrent
# inference since execution can overlap
torch.set_grad_enabled(False)
results = await model.aforward_test(data, rescale=True)
return results
def build_test_pipeline(cfg: ConfigType) -> ConfigType:
"""Build test_pipeline for mot/vis demo. In mot/vis infer, original
test_pipeline should remove the "LoadImageFromFile" and
"LoadTrackAnnotations".
Args:
cfg (ConfigDict): The loaded config.
Returns:
ConfigType: new test_pipeline
"""
# remove the "LoadImageFromFile" and "LoadTrackAnnotations" in pipeline
transform_broadcaster = cfg.test_dataloader.dataset.pipeline[0].copy()
for transform in transform_broadcaster['transforms']:
if transform['type'] == 'Resize':
transform_broadcaster['transforms'] = transform
pack_track_inputs = cfg.test_dataloader.dataset.pipeline[-1].copy()
test_pipeline = Compose([transform_broadcaster, pack_track_inputs])
return test_pipeline
def inference_mot(model: nn.Module, img: np.ndarray, frame_id: int,
video_len: int) -> SampleList:
"""Inference image(s) with the mot model.
Args:
model (nn.Module): The loaded mot model.
img (np.ndarray): Loaded image.
frame_id (int): frame id.
video_len (int): demo video length
Returns:
SampleList: The tracking data samples.
"""
cfg = model.cfg
data = dict(
img=[img.astype(np.float32)],
frame_id=[frame_id],
ori_shape=[img.shape[:2]],
img_id=[frame_id + 1],
ori_video_length=[video_len])
test_pipeline = build_test_pipeline(cfg)
data = test_pipeline(data)
if not next(model.parameters()).is_cuda:
for m in model.modules():
assert not isinstance(
m, RoIPool
), 'CPU inference with RoIPool is not supported currently.'
# forward the model
with torch.no_grad():
data = default_collate([data])
result = model.test_step(data)[0]
return result
def init_track_model(config: Union[str, Config],
checkpoint: Optional[str] = None,
detector: Optional[str] = None,
reid: Optional[str] = None,
device: str = 'cuda:0',
cfg_options: Optional[dict] = None) -> nn.Module:
"""Initialize a model from config file.
Args:
config (str or :obj:`mmengine.Config`): Config file path or the config
object.
checkpoint (Optional[str], optional): Checkpoint path. Defaults to
None.
detector (Optional[str], optional): Detector Checkpoint path, use in
some tracking algorithms like sort. Defaults to None.
reid (Optional[str], optional): Reid checkpoint path. use in
some tracking algorithms like sort. Defaults to None.
device (str, optional): The device that the model inferences on.
Defaults to `cuda:0`.
cfg_options (Optional[dict], optional): Options to override some
settings in the used config. Defaults to None.
Returns:
nn.Module: The constructed model.
"""
if isinstance(config, str):
config = Config.fromfile(config)
elif not isinstance(config, Config):
raise TypeError('config must be a filename or Config object, '
f'but got {type(config)}')
if cfg_options is not None:
config.merge_from_dict(cfg_options)
model = MODELS.build(config.model)
if checkpoint is not None:
checkpoint = load_checkpoint(model, checkpoint, map_location='cpu')
# Weights converted from elsewhere may not have meta fields.
checkpoint_meta = checkpoint.get('meta', {})
# save the dataset_meta in the model for convenience
if 'dataset_meta' in checkpoint_meta:
if 'CLASSES' in checkpoint_meta['dataset_meta']:
value = checkpoint_meta['dataset_meta'].pop('CLASSES')
checkpoint_meta['dataset_meta']['classes'] = value
model.dataset_meta = checkpoint_meta['dataset_meta']
if detector is not None:
assert not (checkpoint and detector), \
'Error: checkpoint and detector checkpoint cannot both exist'
load_checkpoint(model.detector, detector, map_location='cpu')
if reid is not None:
assert not (checkpoint and reid), \
'Error: checkpoint and reid checkpoint cannot both exist'
load_checkpoint(model.reid, reid, map_location='cpu')
# Some methods don't load checkpoints or checkpoints don't contain
# 'dataset_meta'
# VIS need dataset_meta, MOT don't need dataset_meta
if not hasattr(model, 'dataset_meta'):
warnings.warn('dataset_meta or class names are missed, '
'use None by default.')
model.dataset_meta = {'classes': None}
model.cfg = config # save the config in the model for convenience
model.to(device)
model.eval()
return model
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.transforms import LoadImageFromFile
from mmengine.dataset.sampler import DefaultSampler
from mmdet.datasets import AspectRatioBatchSampler, CocoDataset
from mmdet.datasets.transforms import (LoadAnnotations, PackDetInputs,
RandomFlip, Resize)
from mmdet.evaluation import CocoMetric
# dataset settings
dataset_type = CocoDataset
data_root = 'data/coco/'
# Example to use different file client
# Method 1: simply set the data root and let the file I/O module
# automatically infer from prefix (not support LMDB and Memcache yet)
# data_root = 's3://openmmlab/datasets/detection/coco/'
# Method 2: Use `backend_args`, `file_client_args` in versions before 3.0.0rc6
# backend_args = dict(
# backend='petrel',
# path_mapping=dict({
# './data/': 's3://openmmlab/datasets/detection/',
# 'data/': 's3://openmmlab/datasets/detection/'
# }))
backend_args = None
train_pipeline = [
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=LoadAnnotations, with_bbox=True),
dict(type=Resize, scale=(1333, 800), keep_ratio=True),
dict(type=RandomFlip, prob=0.5),
dict(type=PackDetInputs)
]
test_pipeline = [
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=Resize, scale=(1333, 800), keep_ratio=True),
# If you don't have a gt annotation, delete the pipeline
dict(type=LoadAnnotations, with_bbox=True),
dict(
type=PackDetInputs,
meta_keys=('img_id', 'img_path', 'ori_shape', 'img_shape',
'scale_factor'))
]
train_dataloader = dict(
batch_size=2,
num_workers=2,
persistent_workers=True,
sampler=dict(type=DefaultSampler, shuffle=True),
batch_sampler=dict(type=AspectRatioBatchSampler),
dataset=dict(
type=dataset_type,
data_root=data_root,
ann_file='annotations/instances_train2017.json',
data_prefix=dict(img='train2017/'),
filter_cfg=dict(filter_empty_gt=True, min_size=32),
pipeline=train_pipeline,
backend_args=backend_args))
val_dataloader = dict(
batch_size=1,
num_workers=2,
persistent_workers=True,
drop_last=False,
sampler=dict(type=DefaultSampler, shuffle=False),
dataset=dict(
type=dataset_type,
data_root=data_root,
ann_file='annotations/instances_val2017.json',
data_prefix=dict(img='val2017/'),
test_mode=True,
pipeline=test_pipeline,
backend_args=backend_args))
test_dataloader = val_dataloader
val_evaluator = dict(
type=CocoMetric,
ann_file=data_root + 'annotations/instances_val2017.json',
metric='bbox',
format_only=False,
backend_args=backend_args)
test_evaluator = val_evaluator
# inference on test dataset and
# format the output results for submission.
# test_dataloader = dict(
# batch_size=1,
# num_workers=2,
# persistent_workers=True,
# drop_last=False,
# sampler=dict(type=DefaultSampler, shuffle=False),
# dataset=dict(
# type=dataset_type,
# data_root=data_root,
# ann_file=data_root + 'annotations/image_info_test-dev2017.json',
# data_prefix=dict(img='test2017/'),
# test_mode=True,
# pipeline=test_pipeline))
# test_evaluator = dict(
# type=CocoMetric,
# metric='bbox',
# format_only=True,
# ann_file=data_root + 'annotations/image_info_test-dev2017.json',
# outfile_prefix='./work_dirs/coco_detection/test')
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.transforms.loading import LoadImageFromFile
from mmengine.dataset.sampler import DefaultSampler
from mmdet.datasets.coco import CocoDataset
from mmdet.datasets.samplers.batch_sampler import AspectRatioBatchSampler
from mmdet.datasets.transforms.formatting import PackDetInputs
from mmdet.datasets.transforms.loading import LoadAnnotations
from mmdet.datasets.transforms.transforms import RandomFlip, Resize
from mmdet.evaluation.metrics.coco_metric import CocoMetric
# dataset settings
dataset_type = 'CocoDataset'
data_root = 'data/coco/'
# Example to use different file client
# Method 1: simply set the data root and let the file I/O module
# automatically infer from prefix (not support LMDB and Memcache yet)
# data_root = 's3://openmmlab/datasets/detection/coco/'
# Method 2: Use `backend_args`, `file_client_args` in versions before 3.0.0rc6
# backend_args = dict(
# backend='petrel',
# path_mapping=dict({
# './data/': 's3://openmmlab/datasets/detection/',
# 'data/': 's3://openmmlab/datasets/detection/'
# }))
backend_args = None
train_pipeline = [
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=LoadAnnotations, with_bbox=True, with_mask=True),
dict(type=Resize, scale=(1333, 800), keep_ratio=True),
dict(type=RandomFlip, prob=0.5),
dict(type=PackDetInputs)
]
test_pipeline = [
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=Resize, scale=(1333, 800), keep_ratio=True),
# If you don't have a gt annotation, delete the pipeline
dict(type=LoadAnnotations, with_bbox=True, with_mask=True),
dict(
type=PackDetInputs,
meta_keys=('img_id', 'img_path', 'ori_shape', 'img_shape',
'scale_factor'))
]
train_dataloader = dict(
batch_size=2,
num_workers=2,
persistent_workers=True,
sampler=dict(type=DefaultSampler, shuffle=True),
batch_sampler=dict(type=AspectRatioBatchSampler),
dataset=dict(
type=CocoDataset,
data_root=data_root,
ann_file='annotations/instances_train2017.json',
data_prefix=dict(img='train2017/'),
filter_cfg=dict(filter_empty_gt=True, min_size=32),
pipeline=train_pipeline,
backend_args=backend_args))
val_dataloader = dict(
batch_size=1,
num_workers=2,
persistent_workers=True,
drop_last=False,
sampler=dict(type=DefaultSampler, shuffle=False),
dataset=dict(
type=CocoDataset,
data_root=data_root,
ann_file='annotations/instances_val2017.json',
data_prefix=dict(img='val2017/'),
test_mode=True,
pipeline=test_pipeline,
backend_args=backend_args))
test_dataloader = val_dataloader
val_evaluator = dict(
type=CocoMetric,
ann_file=data_root + 'annotations/instances_val2017.json',
metric=['bbox', 'segm'],
format_only=False,
backend_args=backend_args)
test_evaluator = val_evaluator
# inference on test dataset and
# format the output results for submission.
# test_dataloader = dict(
# batch_size=1,
# num_workers=2,
# persistent_workers=True,
# drop_last=False,
# sampler=dict(type=DefaultSampler, shuffle=False),
# dataset=dict(
# type=CocoDataset,
# data_root=data_root,
# ann_file=data_root + 'annotations/image_info_test-dev2017.json',
# data_prefix=dict(img='test2017/'),
# test_mode=True,
# pipeline=test_pipeline))
# test_evaluator = dict(
# type=CocoMetric,
# metric=['bbox', 'segm'],
# format_only=True,
# ann_file=data_root + 'annotations/image_info_test-dev2017.json',
# outfile_prefix='./work_dirs/coco_instance/test')
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.transforms.loading import LoadImageFromFile
from mmengine.dataset.sampler import DefaultSampler
from mmdet.datasets.coco import CocoDataset
from mmdet.datasets.samplers.batch_sampler import AspectRatioBatchSampler
from mmdet.datasets.transforms.formatting import PackDetInputs
from mmdet.datasets.transforms.loading import LoadAnnotations
from mmdet.datasets.transforms.transforms import RandomFlip, Resize
from mmdet.evaluation.metrics.coco_metric import CocoMetric
# dataset settings
dataset_type = 'CocoDataset'
data_root = 'data/coco/'
# Example to use different file client
# Method 1: simply set the data root and let the file I/O module
# automatically infer from prefix (not support LMDB and Memcache yet)
# data_root = 's3://openmmlab/datasets/detection/coco/'
# Method 2: Use `backend_args`, `file_client_args` in versions before 3.0.0rc6
# backend_args = dict(
# backend='petrel',
# path_mapping=dict({
# './data/': 's3://openmmlab/datasets/detection/',
# 'data/': 's3://openmmlab/datasets/detection/'
# }))
backend_args = None
train_pipeline = [
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=LoadAnnotations, with_bbox=True, with_mask=True, with_seg=True),
dict(type=Resize, scale=(1333, 800), keep_ratio=True),
dict(type=RandomFlip, prob=0.5),
dict(type=PackDetInputs)
]
test_pipeline = [
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=Resize, scale=(1333, 800), keep_ratio=True),
# If you don't have a gt annotation, delete the pipeline
dict(type=LoadAnnotations, with_bbox=True, with_mask=True, with_seg=True),
dict(
type=PackDetInputs,
meta_keys=('img_id', 'img_path', 'ori_shape', 'img_shape',
'scale_factor'))
]
train_dataloader = dict(
batch_size=2,
num_workers=2,
persistent_workers=True,
sampler=dict(type=DefaultSampler, shuffle=True),
batch_sampler=dict(type=AspectRatioBatchSampler),
dataset=dict(
type=CocoDataset,
data_root=data_root,
ann_file='annotations/instances_train2017.json',
data_prefix=dict(img='train2017/', seg='stuffthingmaps/train2017/'),
filter_cfg=dict(filter_empty_gt=True, min_size=32),
pipeline=train_pipeline,
backend_args=backend_args))
val_dataloader = dict(
batch_size=1,
num_workers=2,
persistent_workers=True,
drop_last=False,
sampler=dict(type=DefaultSampler, shuffle=False),
dataset=dict(
type=CocoDataset,
data_root=data_root,
ann_file='annotations/instances_val2017.json',
data_prefix=dict(img='val2017/'),
test_mode=True,
pipeline=test_pipeline,
backend_args=backend_args))
test_dataloader = val_dataloader
val_evaluator = dict(
type=CocoMetric,
ann_file=data_root + 'annotations/instances_val2017.json',
metric=['bbox', 'segm'],
format_only=False,
backend_args=backend_args)
test_evaluator = val_evaluator
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.transforms.loading import LoadImageFromFile
from mmengine.dataset.sampler import DefaultSampler
from mmdet.datasets.coco_panoptic import CocoPanopticDataset
from mmdet.datasets.samplers.batch_sampler import AspectRatioBatchSampler
from mmdet.datasets.transforms.formatting import PackDetInputs
from mmdet.datasets.transforms.loading import LoadPanopticAnnotations
from mmdet.datasets.transforms.transforms import RandomFlip, Resize
from mmdet.evaluation.metrics.coco_panoptic_metric import CocoPanopticMetric
# dataset settings
dataset_type = 'CocoPanopticDataset'
data_root = 'data/coco/'
# Example to use different file client
# Method 1: simply set the data root and let the file I/O module
# automatically infer from prefix (not support LMDB and Memcache yet)
# data_root = 's3://openmmlab/datasets/detection/coco/'
# Method 2: Use `backend_args`, `file_client_args` in versions before 3.0.0rc6
# backend_args = dict(
# backend='petrel',
# path_mapping=dict({
# './data/': 's3://openmmlab/datasets/detection/',
# 'data/': 's3://openmmlab/datasets/detection/'
# }))
backend_args = None
train_pipeline = [
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=LoadPanopticAnnotations, backend_args=backend_args),
dict(type=Resize, scale=(1333, 800), keep_ratio=True),
dict(type=RandomFlip, prob=0.5),
dict(type=PackDetInputs)
]
test_pipeline = [
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=Resize, scale=(1333, 800), keep_ratio=True),
dict(type=LoadPanopticAnnotations, backend_args=backend_args),
dict(
type=PackDetInputs,
meta_keys=('img_id', 'img_path', 'ori_shape', 'img_shape',
'scale_factor'))
]
train_dataloader = dict(
batch_size=2,
num_workers=2,
persistent_workers=True,
sampler=dict(type=DefaultSampler, shuffle=True),
batch_sampler=dict(type=AspectRatioBatchSampler),
dataset=dict(
type=CocoPanopticDataset,
data_root=data_root,
ann_file='annotations/panoptic_train2017.json',
data_prefix=dict(
img='train2017/', seg='annotations/panoptic_train2017/'),
filter_cfg=dict(filter_empty_gt=True, min_size=32),
pipeline=train_pipeline,
backend_args=backend_args))
val_dataloader = dict(
batch_size=1,
num_workers=2,
persistent_workers=True,
drop_last=False,
sampler=dict(type=DefaultSampler, shuffle=False),
dataset=dict(
type=CocoPanopticDataset,
data_root=data_root,
ann_file='annotations/panoptic_val2017.json',
data_prefix=dict(img='val2017/', seg='annotations/panoptic_val2017/'),
test_mode=True,
pipeline=test_pipeline,
backend_args=backend_args))
test_dataloader = val_dataloader
val_evaluator = dict(
type=CocoPanopticMetric,
ann_file=data_root + 'annotations/panoptic_val2017.json',
seg_prefix=data_root + 'annotations/panoptic_val2017/',
backend_args=backend_args)
test_evaluator = val_evaluator
# inference on test dataset and
# format the output results for submission.
# test_dataloader = dict(
# batch_size=1,
# num_workers=1,
# persistent_workers=True,
# drop_last=False,
# sampler=dict(type=DefaultSampler, shuffle=False),
# dataset=dict(
# type=CocoPanopticDataset,
# data_root=data_root,
# ann_file='annotations/panoptic_image_info_test-dev2017.json',
# data_prefix=dict(img='test2017/'),
# test_mode=True,
# pipeline=test_pipeline))
# test_evaluator = dict(
# type=CocoPanopticMetric,
# format_only=True,
# ann_file=data_root + 'annotations/panoptic_image_info_test-dev2017.json',
# outfile_prefix='./work_dirs/coco_panoptic/test')
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.transforms import (LoadImageFromFile, RandomResize,
TransformBroadcaster)
from mmdet.datasets import MOTChallengeDataset
from mmdet.datasets.samplers import TrackImgSampler
from mmdet.datasets.transforms import (LoadTrackAnnotations, PackTrackInputs,
PhotoMetricDistortion, RandomCrop,
RandomFlip, Resize,
UniformRefFrameSample)
from mmdet.evaluation import MOTChallengeMetric
# dataset settings
dataset_type = MOTChallengeDataset
data_root = 'data/MOT17/'
img_scale = (1088, 1088)
backend_args = None
# data pipeline
train_pipeline = [
dict(
type=UniformRefFrameSample,
num_ref_imgs=1,
frame_range=10,
filter_key_img=True),
dict(
type=TransformBroadcaster,
share_random_params=True,
transforms=[
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=LoadTrackAnnotations),
dict(
type=RandomResize,
scale=img_scale,
ratio_range=(0.8, 1.2),
keep_ratio=True,
clip_object_border=False),
dict(type=PhotoMetricDistortion)
]),
dict(
type=TransformBroadcaster,
# different cropped positions for different frames
share_random_params=False,
transforms=[
dict(type=RandomCrop, crop_size=img_scale, bbox_clip_border=False)
]),
dict(
type=TransformBroadcaster,
share_random_params=True,
transforms=[
dict(type=RandomFlip, prob=0.5),
]),
dict(type=PackTrackInputs)
]
test_pipeline = [
dict(
type=TransformBroadcaster,
transforms=[
dict(type=LoadImageFromFile, backend_args=backend_args),
dict(type=Resize, scale=img_scale, keep_ratio=True),
dict(type=LoadTrackAnnotations)
]),
dict(type=PackTrackInputs)
]
# dataloader
train_dataloader = dict(
batch_size=2,
num_workers=2,
persistent_workers=True,
sampler=dict(type=TrackImgSampler), # image-based sampling
dataset=dict(
type=dataset_type,
data_root=data_root,
visibility_thr=-1,
ann_file='annotations/half-train_cocoformat.json',
data_prefix=dict(img_path='train'),
metainfo=dict(classes=('pedestrian', )),
pipeline=train_pipeline))
val_dataloader = dict(
batch_size=1,
num_workers=2,
persistent_workers=True,
# Now we support two ways to test, image_based and video_based
# if you want to use video_based sampling, you can use as follows
# sampler=dict(type='DefaultSampler', shuffle=False, round_up=False),
sampler=dict(type=TrackImgSampler), # image-based sampling
dataset=dict(
type=dataset_type,
data_root=data_root,
ann_file='annotations/half-val_cocoformat.json',
data_prefix=dict(img_path='train'),
test_mode=True,
pipeline=test_pipeline))
test_dataloader = val_dataloader
# evaluator
val_evaluator = dict(
type=MOTChallengeMetric, metric=['HOTA', 'CLEAR', 'Identity'])
test_evaluator = val_evaluator
# Copyright (c) OpenMMLab. All rights reserved.
from mmengine.hooks import (CheckpointHook, DistSamplerSeedHook, IterTimerHook,
LoggerHook, ParamSchedulerHook)
from mmengine.runner import LogProcessor
from mmengine.visualization import LocalVisBackend
from mmdet.engine.hooks import DetVisualizationHook
from mmdet.visualization import DetLocalVisualizer
default_scope = None
default_hooks = dict(
timer=dict(type=IterTimerHook),
logger=dict(type=LoggerHook, interval=50),
param_scheduler=dict(type=ParamSchedulerHook),
checkpoint=dict(type=CheckpointHook, interval=1),
sampler_seed=dict(type=DistSamplerSeedHook),
visualization=dict(type=DetVisualizationHook))
env_cfg = dict(
cudnn_benchmark=False,
mp_cfg=dict(mp_start_method='fork', opencv_num_threads=0),
dist_cfg=dict(backend='nccl'),
)
vis_backends = [dict(type=LocalVisBackend)]
visualizer = dict(
type=DetLocalVisualizer, vis_backends=vis_backends, name='visualizer')
log_processor = dict(type=LogProcessor, window_size=50, by_epoch=True)
log_level = 'INFO'
load_from = None
resume = False
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.ops import RoIAlign, nms
from torch.nn import BatchNorm2d
from mmdet.models.backbones.resnet import ResNet
from mmdet.models.data_preprocessors.data_preprocessor import \
DetDataPreprocessor
from mmdet.models.dense_heads.rpn_head import RPNHead
from mmdet.models.detectors.cascade_rcnn import CascadeRCNN
from mmdet.models.losses.cross_entropy_loss import CrossEntropyLoss
from mmdet.models.losses.smooth_l1_loss import SmoothL1Loss
from mmdet.models.necks.fpn import FPN
from mmdet.models.roi_heads.bbox_heads.convfc_bbox_head import \
Shared2FCBBoxHead
from mmdet.models.roi_heads.cascade_roi_head import CascadeRoIHead
from mmdet.models.roi_heads.mask_heads.fcn_mask_head import FCNMaskHead
from mmdet.models.roi_heads.roi_extractors.single_level_roi_extractor import \
SingleRoIExtractor
from mmdet.models.task_modules.assigners.max_iou_assigner import MaxIoUAssigner
from mmdet.models.task_modules.coders.delta_xywh_bbox_coder import \
DeltaXYWHBBoxCoder
from mmdet.models.task_modules.prior_generators.anchor_generator import \
AnchorGenerator
from mmdet.models.task_modules.samplers.random_sampler import RandomSampler
# model settings
model = dict(
type=CascadeRCNN,
data_preprocessor=dict(
type=DetDataPreprocessor,
mean=[123.675, 116.28, 103.53],
std=[58.395, 57.12, 57.375],
bgr_to_rgb=True,
pad_mask=True,
pad_size_divisor=32),
backbone=dict(
type=ResNet,
depth=50,
num_stages=4,
out_indices=(0, 1, 2, 3),
frozen_stages=1,
norm_cfg=dict(type=BatchNorm2d, requires_grad=True),
norm_eval=True,
style='pytorch',
init_cfg=dict(type='Pretrained', checkpoint='torchvision://resnet50')),
neck=dict(
type=FPN,
in_channels=[256, 512, 1024, 2048],
out_channels=256,
num_outs=5),
rpn_head=dict(
type=RPNHead,
in_channels=256,
feat_channels=256,
anchor_generator=dict(
type=AnchorGenerator,
scales=[8],
ratios=[0.5, 1.0, 2.0],
strides=[4, 8, 16, 32, 64]),
bbox_coder=dict(
type=DeltaXYWHBBoxCoder,
target_means=[.0, .0, .0, .0],
target_stds=[1.0, 1.0, 1.0, 1.0]),
loss_cls=dict(
type=CrossEntropyLoss, use_sigmoid=True, loss_weight=1.0),
loss_bbox=dict(type=SmoothL1Loss, beta=1.0 / 9.0, loss_weight=1.0)),
roi_head=dict(
type=CascadeRoIHead,
num_stages=3,
stage_loss_weights=[1, 0.5, 0.25],
bbox_roi_extractor=dict(
type=SingleRoIExtractor,
roi_layer=dict(type=RoIAlign, output_size=7, sampling_ratio=0),
out_channels=256,
featmap_strides=[4, 8, 16, 32]),
bbox_head=[
dict(
type=Shared2FCBBoxHead,
in_channels=256,
fc_out_channels=1024,
roi_feat_size=7,
num_classes=80,
bbox_coder=dict(
type=DeltaXYWHBBoxCoder,
target_means=[0., 0., 0., 0.],
target_stds=[0.1, 0.1, 0.2, 0.2]),
reg_class_agnostic=True,
loss_cls=dict(
type=CrossEntropyLoss, use_sigmoid=False, loss_weight=1.0),
loss_bbox=dict(type=SmoothL1Loss, beta=1.0, loss_weight=1.0)),
dict(
type=Shared2FCBBoxHead,
in_channels=256,
fc_out_channels=1024,
roi_feat_size=7,
num_classes=80,
bbox_coder=dict(
type=DeltaXYWHBBoxCoder,
target_means=[0., 0., 0., 0.],
target_stds=[0.05, 0.05, 0.1, 0.1]),
reg_class_agnostic=True,
loss_cls=dict(
type=CrossEntropyLoss, use_sigmoid=False, loss_weight=1.0),
loss_bbox=dict(type=SmoothL1Loss, beta=1.0, loss_weight=1.0)),
dict(
type=Shared2FCBBoxHead,
in_channels=256,
fc_out_channels=1024,
roi_feat_size=7,
num_classes=80,
bbox_coder=dict(
type=DeltaXYWHBBoxCoder,
target_means=[0., 0., 0., 0.],
target_stds=[0.033, 0.033, 0.067, 0.067]),
reg_class_agnostic=True,
loss_cls=dict(
type=CrossEntropyLoss, use_sigmoid=False, loss_weight=1.0),
loss_bbox=dict(type=SmoothL1Loss, beta=1.0, loss_weight=1.0))
],
mask_roi_extractor=dict(
type=SingleRoIExtractor,
roi_layer=dict(type=RoIAlign, output_size=14, sampling_ratio=0),
out_channels=256,
featmap_strides=[4, 8, 16, 32]),
mask_head=dict(
type=FCNMaskHead,
num_convs=4,
in_channels=256,
conv_out_channels=256,
num_classes=80,
loss_mask=dict(
type=CrossEntropyLoss, use_mask=True, loss_weight=1.0))),
# model training and testing settings
train_cfg=dict(
rpn=dict(
assigner=dict(
type=MaxIoUAssigner,
pos_iou_thr=0.7,
neg_iou_thr=0.3,
min_pos_iou=0.3,
match_low_quality=True,
ignore_iof_thr=-1),
sampler=dict(
type=RandomSampler,
num=256,
pos_fraction=0.5,
neg_pos_ub=-1,
add_gt_as_proposals=False),
allowed_border=0,
pos_weight=-1,
debug=False),
rpn_proposal=dict(
nms_pre=2000,
max_per_img=2000,
nms=dict(type=nms, iou_threshold=0.7),
min_bbox_size=0),
rcnn=[
dict(
assigner=dict(
type=MaxIoUAssigner,
pos_iou_thr=0.5,
neg_iou_thr=0.5,
min_pos_iou=0.5,
match_low_quality=False,
ignore_iof_thr=-1),
sampler=dict(
type=RandomSampler,
num=512,
pos_fraction=0.25,
neg_pos_ub=-1,
add_gt_as_proposals=True),
mask_size=28,
pos_weight=-1,
debug=False),
dict(
assigner=dict(
type=MaxIoUAssigner,
pos_iou_thr=0.6,
neg_iou_thr=0.6,
min_pos_iou=0.6,
match_low_quality=False,
ignore_iof_thr=-1),
sampler=dict(
type=RandomSampler,
num=512,
pos_fraction=0.25,
neg_pos_ub=-1,
add_gt_as_proposals=True),
mask_size=28,
pos_weight=-1,
debug=False),
dict(
assigner=dict(
type=MaxIoUAssigner,
pos_iou_thr=0.7,
neg_iou_thr=0.7,
min_pos_iou=0.7,
match_low_quality=False,
ignore_iof_thr=-1),
sampler=dict(
type=RandomSampler,
num=512,
pos_fraction=0.25,
neg_pos_ub=-1,
add_gt_as_proposals=True),
mask_size=28,
pos_weight=-1,
debug=False)
]),
test_cfg=dict(
rpn=dict(
nms_pre=1000,
max_per_img=1000,
nms=dict(type=nms, iou_threshold=0.7),
min_bbox_size=0),
rcnn=dict(
score_thr=0.05,
nms=dict(type=nms, iou_threshold=0.5),
max_per_img=100,
mask_thr_binary=0.5)))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment