Commit 481f872d authored by limm's avatar limm
Browse files

add test_mmpretrain, test_mmrotate and test_mmseg

parent a17c53b8
Pipeline #2820 canceled with stages
# Copyright (c) OpenMMLab. All rights reserved.
import mmengine
import pytest
import torch
from packaging import version
from mmdeploy.codebase import import_codebase
from mmdeploy.utils import Backend, Codebase, Task
from mmdeploy.utils.test import (WrapModel, check_backend, get_model_outputs,
get_rewrite_outputs)
# Skip this whole module at collection time when mmseg is not installed.
try:
    import_codebase(Codebase.MMSEG)
except ImportError:
    pytest.skip(f'{Codebase.MMSEG} is not installed.', allow_module_level=True)

# Deferred past the skip guard on purpose; the helpers import mmseg.
from .utils import generate_datasample  # noqa: E402
from .utils import generate_mmseg_deploy_config  # noqa: E402
from .utils import generate_mmseg_task_processor  # noqa: E402
@pytest.mark.parametrize('backend', [Backend.ONNXRUNTIME])
def test_encoderdecoder_predict(backend):
    """Rewritten EncoderDecoder.predict must match the PyTorch output."""
    check_backend(backend)
    deploy_cfg = generate_mmseg_deploy_config(backend.value)
    processor = generate_mmseg_task_processor(deploy_cfg=deploy_cfg)
    segmentor = processor.build_pytorch_model()
    img_size = 256
    imgs = torch.randn(1, 3, img_size, img_size)
    samples = [generate_datasample(img_size, img_size)]
    wrapped = WrapModel(segmentor, 'predict', data_samples=samples)
    # Reference prediction from the unmodified PyTorch model.
    expected = wrapped(imgs)[0].pred_sem_seg.data
    backend_outputs, _ = get_rewrite_outputs(
        wrapped_model=wrapped,
        model_inputs={'inputs': imgs},
        deploy_cfg=deploy_cfg)
    # Raw backend logits still need the segmentor's own post-processing.
    results = segmentor.postprocess_result(backend_outputs[0], samples)
    actual = results[0].pred_sem_seg.data
    assert torch.allclose(expected, actual)
@pytest.mark.parametrize('backend', [Backend.ONNXRUNTIME])
@pytest.mark.parametrize('with_argmax,use_sigmoid', [(True, False),
                                                     (False, True)])
def test_basesegmentor_forward(backend: Backend, with_argmax: bool,
                               use_sigmoid: bool):
    """BaseSegmentor.forward parity between PyTorch and the rewritten graph."""
    check_backend(backend)
    config_path = 'tests/test_codebase/test_mmseg/data/model.py'
    model_cfg = mmengine.Config.fromfile(config_path)
    if use_sigmoid:
        import mmseg

        # Single-channel sigmoid heads require mmseg newer than 1.0.0.
        if version.parse(mmseg.__version__) <= version.parse('1.0.0'):
            pytest.skip('ignore mmseg<=1.0.0')
        model_cfg.model.decode_head.num_classes = 2
        model_cfg.model.decode_head.out_channels = 1
        model_cfg.model.decode_head.threshold = 0.3
    deploy_cfg = generate_mmseg_deploy_config(backend.value)
    deploy_cfg.codebase_config.with_argmax = with_argmax
    processor = generate_mmseg_task_processor(
        deploy_cfg=deploy_cfg, model_cfg=model_cfg)
    segmentor = processor.build_pytorch_model()
    img_size = 256
    imgs = torch.randn(1, 3, img_size, img_size)
    samples = [generate_datasample(img_size, img_size)]
    wrapped = WrapModel(
        segmentor, 'forward', data_samples=samples, mode='predict')
    # Reference label map from the unmodified PyTorch model.
    expected = wrapped(imgs)[0].pred_sem_seg.data
    backend_outputs, _ = get_rewrite_outputs(
        wrapped_model=wrapped,
        model_inputs={'inputs': imgs},
        deploy_cfg=deploy_cfg)
    actual = backend_outputs[0]
    if actual.shape[1] != 1:
        # Multi-class logits: reduce to a label map like the PyTorch path.
        actual = actual.argmax(dim=1, keepdim=True)
    actual = actual.squeeze(0).to(expected)
    assert torch.allclose(expected, actual)
@pytest.mark.parametrize('backend', [Backend.ONNXRUNTIME])
def test_emamodule_forward(backend):
    """EMAModule rewrite must reproduce the PyTorch forward numerically."""
    check_backend(backend)
    from mmseg.models.decode_heads.ema_head import EMAModule
    head = EMAModule(8, 2, 2, 1.0).eval()
    deploy_cfg = mmengine.Config(
        dict(
            backend_config=dict(type=backend.value),
            onnx_config=dict(
                output_names=['result'], input_shape=(1, 8, 16, 16)),
            codebase_config=dict(type='mmseg', task='Segmentation')))
    model_inputs = {'feats': torch.randn(1, 8, 16, 16)}
    with torch.no_grad():
        expected = get_model_outputs(head, 'forward', model_inputs)
    actual, is_backend_output = get_rewrite_outputs(
        wrapped_model=WrapModel(head, 'forward'),
        model_inputs=model_inputs,
        deploy_cfg=deploy_cfg)
    if is_backend_output:
        # Backend wrappers return a list of tensors; unwrap and realign.
        actual = actual[0]
        actual = actual.to(expected).reshape(expected.shape)
    assert torch.allclose(actual, expected, rtol=1e-03, atol=1e-05)
@pytest.mark.parametrize('is_dynamic_shape', [True, False])
@pytest.mark.parametrize('backend', [Backend.ONNXRUNTIME])
def test_upconvblock_forward(backend, is_dynamic_shape):
    """UpConvBlock rewrite parity for static and dynamic input shapes."""
    check_backend(backend)
    from mmseg.models.backbones.unet import BasicConvBlock
    from mmseg.models.utils import UpConvBlock
    head = UpConvBlock(BasicConvBlock, 16, 8, 8).eval()
    # Mark batch/height/width as dynamic on every tensor when requested.
    if is_dynamic_shape:
        dynamic_axes = {
            name: {
                0: 'b',
                2: 'h',
                3: 'w'
            }
            for name in ('x', 'skip', 'output')
        }
    else:
        dynamic_axes = None
    deploy_cfg = mmengine.Config(
        dict(
            backend_config=dict(type=backend.value),
            onnx_config=dict(
                input_names=['x', 'skip'],
                output_names=['output'],
                dynamic_axes=dynamic_axes),
            codebase_config=dict(
                type=Codebase.MMSEG.value, task=Task.SEGMENTATION.value)))
    model_inputs = {
        'x': torch.randn(1, 16, 16, 16),
        'skip': torch.randn(1, 8, 32, 32),
    }
    with torch.no_grad():
        expected = get_model_outputs(head, 'forward', model_inputs)
    actual, is_backend_output = get_rewrite_outputs(
        wrapped_model=WrapModel(head, 'forward'),
        model_inputs=model_inputs,
        deploy_cfg=deploy_cfg)
    if is_backend_output:
        # Backend wrappers return a list of tensors; unwrap and realign.
        actual = actual[0]
        actual = actual.to(expected).reshape(expected.shape)
    assert torch.allclose(actual, expected, rtol=1e-03, atol=1e-05)
# Copyright (c) OpenMMLab. All rights reserved.
import copy
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import Any
import mmcv
import pytest
import torch
import mmdeploy.backend.onnxruntime as ort_apis
from mmdeploy.apis import build_task_processor
from mmdeploy.codebase import import_codebase
from mmdeploy.utils import Codebase, load_config
from mmdeploy.utils.test import SwitchBackendWrapper
# Skip this whole module at collection time when mmseg is not installed.
try:
    import_codebase(Codebase.MMSEG)
except ImportError:
    pytest.skip(f'{Codebase.MMSEG} is not installed.', allow_module_level=True)

# Deferred past the skip guard on purpose; the helpers import mmseg.
from .utils import generate_datasample  # noqa: E402
from .utils import generate_mmseg_deploy_config  # noqa: E402
# Shared module-level fixtures: configs, a task-processor slot (filled by the
# autouse init_task_processor fixture below) and a small resized test image.
model_cfg_path = 'tests/test_codebase/test_mmseg/data/model.py'
model_cfg = load_config(model_cfg_path)[0]
deploy_cfg = generate_mmseg_deploy_config()
task_processor = None  # rebuilt before every test
img_shape = (32, 32)
tiger_img_path = 'tests/data/tiger.jpeg'
img = mmcv.imread(tiger_img_path)
img = mmcv.imresize(img, img_shape)
@pytest.fixture(autouse=True)
def init_task_processor():
    """Recreate the module-level task processor before every test."""
    global task_processor
    task_processor = build_task_processor(model_cfg, deploy_cfg, 'cpu')
@pytest.mark.parametrize('from_mmrazor', [True, False, '123', 0])
def test_build_pytorch_model(from_mmrazor: Any):
    """Build a PyTorch segmentor, optionally routed through mmrazor."""
    from mmseg.models.segmentors.base import BaseSegmentor
    if from_mmrazor is False:
        processor = task_processor
    else:
        razor_cfg_path = ('tests/test_codebase/test_mmseg/data/'
                          'mmrazor_model.py')
        razor_cfg = load_config(razor_cfg_path)[0]
        razor_cfg.algorithm.architecture.model.type = 'mmseg.EncoderDecoder'
        razor_cfg.algorithm.distiller.teacher.type = 'mmseg.EncoderDecoder'
        razor_deploy_cfg = copy.deepcopy(deploy_cfg)
        razor_deploy_cfg.codebase_config['from_mmrazor'] = from_mmrazor
        processor = build_task_processor(razor_cfg, razor_deploy_cfg, 'cpu')
    if not isinstance(from_mmrazor, bool):
        # Non-bool values must be rejected when the attribute is read.
        with pytest.raises(
                TypeError,
                match=('`from_mmrazor` attribute must be boolean type! '
                       f'but got: {from_mmrazor}')):
            _ = processor.from_mmrazor
        return
    assert from_mmrazor == processor.from_mmrazor
    if from_mmrazor:
        pytest.importorskip('mmrazor', reason='mmrazor is not installed.')
    model = processor.build_pytorch_model(None)
    assert isinstance(model, BaseSegmentor)
@pytest.fixture
def backend_model():
    """Yield a backend segmentor backed by a stubbed ORT wrapper."""
    from mmdeploy.backend.onnxruntime import ORTWrapper
    ort_apis.__dict__.update({'ORTWrapper': ORTWrapper})
    wrapper = SwitchBackendWrapper(ORTWrapper)
    # Canned single-channel output so no real inference engine is needed.
    wrapper.set(outputs={'output': torch.rand(1, 1, *img_shape)})
    yield task_processor.build_backend_model([''])
    wrapper.recover()
def test_build_backend_model(backend_model):
    """The backend model fixture must yield a torch Module."""
    assert isinstance(backend_model, torch.nn.Module)
def test_create_input():
    """create_input must return a 2-tuple for an image path input."""
    preprocessor = task_processor.build_data_preprocessor()
    inputs = task_processor.create_input(
        'tests/data/tiger.jpeg',
        input_shape=img_shape,
        data_preprocessor=preprocessor)
    assert isinstance(inputs, tuple)
    assert len(inputs) == 2
def test_build_data_preprocessor():
    """The processor builds mmseg's SegDataPreProcessor."""
    from mmseg.models import SegDataPreProcessor
    preprocessor = task_processor.build_data_preprocessor()
    assert isinstance(preprocessor, SegDataPreProcessor)
def test_get_visualizer():
    """get_visualizer returns mmseg's SegLocalVisualizer."""
    from mmseg.visualization import SegLocalVisualizer
    # Only the path is used; the temp dir object is discarded immediately.
    save_dir = TemporaryDirectory().name
    visualizer = task_processor.get_visualizer('ort', save_dir)
    assert isinstance(visualizer, SegLocalVisualizer)
def test_get_tensort_from_input():
    """get_tensor_from_input must hand back the 'inputs' tensor untouched."""
    # NOTE(review): 'tensort' in the test name is a typo for 'tensor';
    # kept as-is so the collected test id stays stable.
    tensor = torch.rand(3, 4, 5)
    extracted = task_processor.get_tensor_from_input({'inputs': tensor})
    assert torch.equal(extracted, tensor)
def test_get_partition_cfg():
    """Partitioning is unsupported for mmseg; requesting it must raise.

    The original try/except-pass let the test pass silently even when no
    exception was raised; assert the NotImplementedError explicitly.
    """
    with pytest.raises(NotImplementedError):
        _ = task_processor.get_partition_cfg(partition_type='')
def test_build_dataset_and_dataloader():
    """Dataset and dataloader both build from the model cfg's val settings."""
    from torch.utils.data import DataLoader, Dataset
    val_loader_cfg = model_cfg['val_dataloader']
    dataset = task_processor.build_dataset(
        dataset_cfg=val_loader_cfg['dataset'])
    assert isinstance(dataset, Dataset), 'Failed to build dataset'
    dataloader = task_processor.build_dataloader(val_loader_cfg)
    assert isinstance(dataloader, DataLoader), 'Failed to build dataloader'
def test_build_test_runner(backend_model):
    """A DeployTestRunner is produced for the backend model."""
    from mmdeploy.codebase.base.runner import DeployTestRunner
    # Only the path is used; the temp dir object is discarded immediately.
    work_dir = TemporaryDirectory().name
    runner = task_processor.build_test_runner(backend_model, work_dir)
    assert isinstance(runner, DeployTestRunner)
def test_visualize():
    """visualize() runs end to end and writes the result image to disk."""
    height, width = img.shape[:2]
    sample = generate_datasample(height, width)
    out_path = NamedTemporaryFile(suffix='.jpg').name
    task_processor.visualize(
        img, sample, out_path, show_result=False, window_name='test')
def test_get_preprocess():
    """A preprocess pipeline description is available."""
    assert task_processor.get_preprocess() is not None
def test_get_postprocess():
    """Postprocess metadata is returned as a dict."""
    assert isinstance(task_processor.get_postprocess(), dict)
def test_get_model_name():
    """The model name is a plain string."""
    assert isinstance(task_processor.get_model_name(), str)
# Copyright (c) OpenMMLab. All rights reserved.
import mmengine
import numpy as np
import pytest
import torch
import mmdeploy.backend.onnxruntime as ort_apis
from mmdeploy.codebase import import_codebase
from mmdeploy.utils import Backend, Codebase
from mmdeploy.utils.test import SwitchBackendWrapper, backend_checker
# Skip this whole module at collection time when mmseg is not installed.
try:
    import_codebase(Codebase.MMSEG)
except ImportError:
    pytest.skip(f'{Codebase.MMSEG} is not installed.', allow_module_level=True)

# Deferred past the skip guard on purpose; the helpers import mmseg.
from .utils import generate_datasample  # noqa: E402
from .utils import generate_mmseg_deploy_config  # noqa: E402

# Class count used by the stubbed RKNN model, and the square test image size.
NUM_CLASS = 19
IMAGE_SIZE = 32
@backend_checker(Backend.ONNXRUNTIME)
class TestEnd2EndModel:
    """End2EndModel behavior against a stubbed ONNX Runtime wrapper."""

    @classmethod
    def setup_class(cls):
        # force add backend wrapper regardless of plugins
        from mmdeploy.backend.onnxruntime import ORTWrapper
        ort_apis.__dict__.update({'ORTWrapper': ORTWrapper})
        # simplify backend inference
        cls.wrapper = SwitchBackendWrapper(ORTWrapper)
        cls.outputs = {'output': torch.rand(1, 1, IMAGE_SIZE, IMAGE_SIZE)}
        cls.wrapper.set(outputs=cls.outputs)
        from mmdeploy.codebase.mmseg.deploy.segmentation_model import \
            End2EndModel
        cls.end2end_model = End2EndModel(
            Backend.ONNXRUNTIME, [''],
            device='cpu',
            deploy_cfg=generate_mmseg_deploy_config())

    @classmethod
    def teardown_class(cls):
        # Undo the backend wrapper patch for subsequent tests.
        cls.wrapper.recover()

    def test_forward(self):
        """forward() wraps backend output into SegDataSample results."""
        from mmseg.structures import SegDataSample
        imgs = torch.rand(1, 3, IMAGE_SIZE, IMAGE_SIZE)
        samples = [generate_datasample(IMAGE_SIZE, IMAGE_SIZE)]
        results = self.end2end_model.forward(imgs, samples)
        assert len(results) == 1
        assert isinstance(results[0], SegDataSample)
@backend_checker(Backend.RKNN)
class TestRKNNModel:
    """RKNNModel behavior against a stubbed RKNN wrapper.

    Fix: the original class never recovered the SwitchBackendWrapper patch,
    unlike TestEnd2EndModel, so the stub leaked into later tests; a
    teardown_class now restores it.
    """

    @classmethod
    def setup_class(cls):
        # force add backend wrapper regardless of plugins
        import mmdeploy.backend.rknn as rknn_apis
        from mmdeploy.backend.rknn import RKNNWrapper
        rknn_apis.__dict__.update({'RKNNWrapper': RKNNWrapper})
        # simplify backend inference
        cls.wrapper = SwitchBackendWrapper(RKNNWrapper)
        cls.outputs = [torch.rand(1, 19, IMAGE_SIZE, IMAGE_SIZE)]
        cls.wrapper.set(outputs=cls.outputs)
        deploy_cfg = mmengine.Config({
            'onnx_config': {
                'output_names': ['outputs']
            },
            'backend_config': {
                'common_config': {}
            }
        })
        from mmdeploy.codebase.mmseg.deploy.segmentation_model import RKNNModel
        cls.rknn_model = RKNNModel(
            Backend.RKNN, [''],
            device='cpu',
            class_names=[''] * NUM_CLASS,
            palette=np.random.randint(0, 255, size=(NUM_CLASS, 3)),
            deploy_cfg=deploy_cfg)

    @classmethod
    def teardown_class(cls):
        # Undo the backend wrapper patch for subsequent tests.
        cls.wrapper.recover()

    def test_forward_test(self):
        """forward_test returns numpy segmentation results."""
        batch = torch.rand(2, 3, IMAGE_SIZE, IMAGE_SIZE)
        results = self.rknn_model.forward_test(batch)
        assert isinstance(results[0], np.ndarray)
@backend_checker(Backend.ONNXRUNTIME)
def test_build_segmentation_model():
    """build_segmentation_model yields an End2EndModel for ORT backends."""
    model_cfg = mmengine.Config(
        dict(data=dict(test={'type': 'CityscapesDataset'})))
    deploy_cfg = generate_mmseg_deploy_config()
    from mmdeploy.backend.onnxruntime import ORTWrapper
    ort_apis.__dict__.update({'ORTWrapper': ORTWrapper})
    # simplify backend inference
    with SwitchBackendWrapper(ORTWrapper) as wrapper:
        wrapper.set(model_cfg=model_cfg, deploy_cfg=deploy_cfg)
        from mmdeploy.codebase.mmseg.deploy.segmentation_model import (
            End2EndModel, build_segmentation_model)
        segmentor = build_segmentation_model([''], model_cfg, deploy_cfg,
                                             'cpu')
        assert isinstance(segmentor, End2EndModel)
# Copyright (c) OpenMMLab. All rights reserved.
import mmengine
import torch
from mmengine.structures import PixelData
from mmdeploy.apis import build_task_processor
from mmdeploy.utils import load_config
def generate_datasample(h, w):
    """Create an h-by-w SegDataSample with random prediction and GT masks."""
    from mmseg.structures import SegDataSample
    meta = dict(img_shape=(h, w), ori_shape=(h, w), pad_shape=(h, w))
    sample = SegDataSample()
    sample.set_metainfo(meta)
    # Binary random masks, shaped (1, h, w) like real segmentation outputs.
    pred = torch.randint(0, 2, (1, h, w))
    gt = torch.randint(0, 2, (1, h, w))
    sample.set_data(dict(pred_sem_seg=PixelData(data=pred)))
    sample.set_data(dict(gt_sem_seg=PixelData(data=gt, metainfo=meta)))
    return sample
def generate_mmseg_deploy_config(backend='onnxruntime'):
    """Build a minimal mmseg deployment Config for the given backend name."""
    onnx_cfg = dict(
        type='onnx',
        export_params=True,
        keep_initializers_as_inputs=False,
        opset_version=11,
        input_shape=None,
        input_names=['inputs'],
        output_names=['output'])
    return mmengine.Config(
        dict(
            backend_config=dict(type=backend),
            codebase_config=dict(
                type='mmseg', task='Segmentation', with_argmax=False),
            onnx_config=onnx_cfg))
def generate_mmseg_task_processor(model_cfg=None, deploy_cfg=None):
    """Build a 'cpu' task processor from the given (or default) configs."""
    if model_cfg is None:
        model_cfg = 'tests/test_codebase/test_mmseg/data/model.py'
    if deploy_cfg is None:
        deploy_cfg = generate_mmseg_deploy_config()
    # load_config accepts paths or Config objects and normalizes both.
    model_cfg, deploy_cfg = load_config(model_cfg, deploy_cfg)
    return build_task_processor(model_cfg, deploy_cfg, 'cpu')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment