"launch/README.md" did not exist on "d752a1a217713e76b3cf2a307e97279855870715"
Commit 7ae93d70 authored by limm's avatar limm
Browse files

add tests part code

parent abaad570
Pipeline #2815 canceled with stages
// Copyright (c) OpenMMLab. All rights reserved.
#include "catch.hpp"
#include "mmdeploy/core/tensor.h"
#include "mmdeploy/core/utils/device_utils.h"
#include "mmdeploy/preprocess/transform/transform.h"
#include "opencv2/imgcodecs/imgcodecs.hpp"
#include "opencv_utils.h"
#include "test_resource.h"
#include "test_utils.h"
using namespace mmdeploy;
using namespace framework;
using namespace mmdeploy::test;
using namespace std;
// Runs the DefaultFormatBundle transform on `mat` for every available device
// and checks the output tensor: it must live on that device, have shape
// {1, c, h, w}, and each float plane must match the corresponding channel of
// the input image.
void TestDefaultFormatBundle(const Value& cfg, const cv::Mat& mat) {
  auto& resources = MMDeployTestResources::Get();
  for (const auto& name : resources.device_names()) {
    Device device{name.c_str()};
    Stream stream{device};
    auto transform = CreateTransform(cfg, device, stream);
    REQUIRE(transform != nullptr);

    // Split the input into per-channel planes for later comparison.
    vector<cv::Mat> planes(mat.channels());
    for (int c = 0; c < mat.channels(); ++c) {
      cv::extractChannel(mat, planes[c], c);
    }

    auto res = transform->Process({{"img", cpu::CVMat2Tensor(mat)}});
    REQUIRE(!res.has_error());
    auto res_tensor = res.value()["img"].get<Tensor>();
    REQUIRE(res_tensor.device() == device);
    auto shape = res_tensor.desc().shape;
    REQUIRE(shape == std::vector<int64_t>{1, mat.channels(), mat.rows, mat.cols});

    const Device kHost{"cpu"};
    auto host_tensor = MakeAvailableOnDevice(res_tensor, kHost, stream);
    REQUIRE(stream.Wait());

    // mat's layout is {h, w, c}; res_tensor's is {1, c, h, w} with float
    // dtype. Walk the tensor plane by plane and compare to the input channels.
    const auto plane_bytes = shape[2] * shape[3] * sizeof(float);
    auto ptr = host_tensor.value().data<uint8_t>();
    for (int c = 0; c < mat.channels(); ++c, ptr += plane_bytes) {
      cv::Mat plane{mat.rows, mat.cols, CV_32FC1, ptr};
      REQUIRE(::mmdeploy::cpu::Compare(planes[c], plane));
    }
  }
}
// Exercises DefaultFormatBundle on a color and a grayscale test image.
TEST_CASE("transform DefaultFormatBundle", "[bundle]") {
  auto& resources = MMDeployTestResources::Get();
  auto images = resources.LocateImageResources("transform");
  REQUIRE(!images.empty());

  const auto& path = images.front();
  cv::Mat color = cv::imread(path, cv::IMREAD_COLOR);
  cv::Mat gray = cv::imread(path, cv::IMREAD_GRAYSCALE);

  Value cfg{{"type", "DefaultFormatBundle"}, {"keys", {"img"}}};
  for (const auto& mat : vector<cv::Mat>{color, gray}) {
    TestDefaultFormatBundle(cfg, mat);
  }
}
// Copyright (c) OpenMMLab. All rights reserved.
#include "catch.hpp"
#include "mmdeploy/core/tensor.h"
#include "mmdeploy/core/utils/device_utils.h"
#include "mmdeploy/preprocess/transform/transform.h"
#include "opencv2/imgcodecs/imgcodecs.hpp"
#include "opencv_utils.h"
#include "test_resource.h"
#include "test_utils.h"
using namespace mmdeploy;
using namespace framework;
using namespace mmdeploy::test;
using namespace std;
// Applies the ImageToTensor transform to `mat` on every available device and
// verifies the result: shape {1, c, h, w}, device placement, and per-channel
// equality with the source image (the element type is preserved here, unlike
// DefaultFormatBundle which converts to float).
void TestImage2Tensor(const Value& cfg, const cv::Mat& mat) {
  auto& resources = MMDeployTestResources::Get();
  for (const auto& name : resources.device_names()) {
    Device device{name.c_str()};
    Stream stream{device};
    auto transform = CreateTransform(cfg, device, stream);
    REQUIRE(transform != nullptr);

    // Per-channel planes of the input, used as the reference.
    vector<cv::Mat> planes(mat.channels());
    for (int c = 0; c < mat.channels(); ++c) {
      cv::extractChannel(mat, planes[c], c);
    }

    auto res = transform->Process({{"img", cpu::CVMat2Tensor(mat)}});
    REQUIRE(!res.has_error());
    auto res_tensor = res.value()["img"].get<Tensor>();
    REQUIRE(res_tensor.device() == device);
    auto shape = res_tensor.desc().shape;
    REQUIRE(shape == std::vector<int64_t>{1, mat.channels(), mat.rows, mat.cols});

    const Device kHost{"cpu"};
    auto host_tensor = MakeAvailableOnDevice(res_tensor, kHost, stream);
    REQUIRE(stream.Wait());

    // mat is {h, w, c}; res_tensor is {1, c, h, w} with the same element size.
    const auto plane_bytes = shape[2] * shape[3] * mat.elemSize1();
    auto ptr = host_tensor.value().data<uint8_t>();
    for (int c = 0; c < mat.channels(); ++c, ptr += plane_bytes) {
      cv::Mat plane{mat.rows, mat.cols, CV_MAKETYPE(mat.depth(), 1), ptr};
      REQUIRE(::mmdeploy::cpu::Compare(planes[c], plane));
    }
  }
}
// Exercises ImageToTensor on u8 and float variants of a color and a
// grayscale test image.
TEST_CASE("transform ImageToTensor", "[img2tensor]") {
  auto& resources = MMDeployTestResources::Get();
  auto images = resources.LocateImageResources("transform");
  REQUIRE(!images.empty());

  const auto& path = images.front();
  cv::Mat color = cv::imread(path, cv::IMREAD_COLOR);
  cv::Mat gray = cv::imread(path, cv::IMREAD_GRAYSCALE);
  cv::Mat color_f32;
  cv::Mat gray_f32;
  color.convertTo(color_f32, CV_32FC3);
  gray.convertTo(gray_f32, CV_32FC1);

  Value cfg{{"type", "ImageToTensor"}, {"keys", {"img"}}};
  for (const auto& mat : vector<cv::Mat>{color, gray, color_f32, gray_f32}) {
    TestImage2Tensor(cfg, mat);
  }
}
// Copyright (c) OpenMMLab. All rights reserved.
#include "catch.hpp"
#include "mmdeploy/core/mat.h"
#include "mmdeploy/core/tensor.h"
#include "mmdeploy/core/utils/device_utils.h"
#include "mmdeploy/preprocess/transform/transform.h"
#include "opencv2/imgcodecs/imgcodecs.hpp"
#include "opencv2/imgproc/imgproc.hpp"
#include "opencv_utils.h"
#include "test_resource.h"
#include "test_utils.h"
using namespace mmdeploy;
using namespace framework;
using namespace std;
using namespace mmdeploy::test;
// Feeds `mat` (in pixel format `src_format`) through a LoadImageFromFile
// transform on every available device and checks that the produced "img"
// tensor equals a CPU color-converted reference, and that the metadata keys
// ("img_shape", "ori_shape", "img_fields") are populated correctly.
void TestLoad(const Value& cfg, const cv::Mat& mat, PixelFormat src_format,
              PixelFormat dst_format) {
  auto& resources = MMDeployTestResources::Get();
  for (const auto& name : resources.device_names()) {
    Device device{name.c_str()};
    Stream stream{device};
    auto transform = CreateTransform(cfg, device, stream);
    REQUIRE(transform != nullptr);

    // CPU reference result of the expected color conversion.
    auto ref_mat = mmdeploy::cpu::CvtColor(mat, src_format, dst_format);

    auto res = transform->Process({{"ori_img", cpu::CVMat2Mat(mat, PixelFormat(src_format))}});
    REQUIRE(!res.has_error());
    auto res_tensor = res.value()["img"].get<Tensor>();
    REQUIRE(res_tensor.device() == device);
    REQUIRE(Shape(res.value(), "img_shape") ==
            vector<int64_t>{1, ref_mat.rows, ref_mat.cols, ref_mat.channels()});
    REQUIRE(Shape(res.value(), "ori_shape") ==
            vector<int64_t>{1, mat.rows, mat.cols, mat.channels()});
    // The transform must report exactly one image field named "img".
    REQUIRE(res.value().contains("img_fields"));
    REQUIRE(res.value()["img_fields"].is_array());
    REQUIRE(res.value()["img_fields"].size() == 1);
    REQUIRE(res.value()["img_fields"][0].get<string>() == "img");

    const Device kHost{"cpu"};
    auto host_tensor = MakeAvailableOnDevice(res_tensor, kHost, stream);
    REQUIRE(stream.Wait());
    auto res_mat = mmdeploy::cpu::Tensor2CVMat(host_tensor.value());
    REQUIRE(mmdeploy::cpu::Compare(ref_mat, res_mat));
  }
}
// Exercises LoadImageFromFile over the cross product of four input pixel
// formats and four {color_type, to_float32} configurations.
TEST_CASE("prepare image, that is LoadImageFromFile transform", "[.load]") {
  auto& resources = MMDeployTestResources::Get();
  auto images = resources.LocateImageResources("transform");
  REQUIRE(!images.empty());

  const auto& path = images.front();
  cv::Mat bgr_mat = cv::imread(path, cv::IMREAD_COLOR);
  cv::Mat gray_mat = cv::imread(path, cv::IMREAD_GRAYSCALE);
  cv::Mat rgb_mat;
  cv::Mat bgra_mat;
  // TODO: make up yuv nv12/nv21 mat
  cv::cvtColor(bgr_mat, rgb_mat, cv::COLOR_BGR2RGB);
  cv::cvtColor(bgr_mat, bgra_mat, cv::COLOR_BGR2BGRA);

  const vector<pair<cv::Mat, PixelFormat>> inputs{{bgr_mat, PixelFormat::kBGR},
                                                  {rgb_mat, PixelFormat::kRGB},
                                                  {gray_mat, PixelFormat::kGRAYSCALE},
                                                  {bgra_mat, PixelFormat::kBGRA}};
  // Each condition is a <color_type, to_float32> pair.
  const vector<pair<std::string, bool>> conditions{
      {"color", true}, {"color", false}, {"grayscale", true}, {"grayscale", false}};
  for (const auto& [color_type, to_float32] : conditions) {
    Value cfg{{"type", "LoadImageFromFile"},
              {"to_float32", to_float32},
              {"color_type", color_type}};
    const auto expected_format =
        color_type == "color" ? PixelFormat::kBGR : PixelFormat::kGRAYSCALE;
    for (const auto& [img, format] : inputs) {
      TestLoad(cfg, img, format, expected_format);
    }
  }
}
// Copyright (c) OpenMMLab. All rights reserved.
#include "catch.hpp"
#include "mmdeploy/core/mat.h"
#include "mmdeploy/core/utils/device_utils.h"
#include "mmdeploy/preprocess/transform/transform.h"
#include "opencv2/imgcodecs/imgcodecs.hpp"
#include "opencv2/imgproc/imgproc.hpp"
#include "opencv_utils.h"
#include "test_resource.h"
#include "test_utils.h"
using namespace mmdeploy;
using namespace framework;
using namespace mmdeploy::test;
using namespace std;
// Runs the Normalize transform on `mat` for every available device and checks
// the result against a CPU reference computed with the same mean/std/to_rgb
// settings taken from `cfg`. Also verifies the exported "img_norm_cfg" entry
// and that the output data type is float.
void TestNormalize(const Value &cfg, const cv::Mat &mat) {
  auto &resources = MMDeployTestResources::Get();
  for (const auto &name : resources.device_names()) {
    Device device{name.c_str()};
    Stream stream{device};
    auto transform = CreateTransform(cfg, device, stream);
    REQUIRE(transform != nullptr);

    // Pull mean/std out of the config so the reference uses identical values.
    vector<float> mean;
    vector<float> std;
    for (auto &v : cfg["mean"]) {
      mean.push_back(v.get<float>());
    }
    for (auto &v : cfg["std"]) {
      std.push_back(v.get<float>());
    }
    const bool to_rgb = cfg.value("to_rgb", false);

    auto cloned = mat.clone();
    auto ref_mat = mmdeploy::cpu::Normalize(cloned, mean, std, to_rgb);

    auto res = transform->Process({{"img", cpu::CVMat2Tensor(mat)}});
    REQUIRE(!res.has_error());
    auto res_tensor = res.value()["img"].get<Tensor>();
    REQUIRE(res_tensor.device() == device);
    REQUIRE(res_tensor.desc().data_type == DataType::kFLOAT);
    REQUIRE(ImageNormCfg(res.value(), "mean") == mean);
    REQUIRE(ImageNormCfg(res.value(), "std") == std);

    Device kHost{"cpu"};
    auto host_tensor = MakeAvailableOnDevice(res_tensor, kHost, stream);
    REQUIRE(stream.Wait());
    auto res_mat = mmdeploy::cpu::Tensor2CVMat(host_tensor.value());
    REQUIRE(mmdeploy::cpu::Compare(ref_mat, res_mat));
  }
}
// End-to-end tests for the 'Normalize' transform on 3-channel (with and
// without the BGR->RGB swap) and 1-channel inputs, each in u8 and float form.
// Catch2 re-runs the shared setup above the SECTIONs for every SECTION.
TEST_CASE("transform Normalize", "[normalize]") {
auto gResource = MMDeployTestResources::Get();
auto img_list = gResource.LocateImageResources("transform");
REQUIRE(!img_list.empty());
auto img_path = img_list.front();
// Build u8 and float variants of the test image in 3- and 1-channel form.
cv::Mat bgr_mat = cv::imread(img_path);
cv::Mat gray_mat;
cv::Mat float_bgr_mat;
cv::Mat float_gray_mat;
cv::cvtColor(bgr_mat, gray_mat, cv::COLOR_BGR2GRAY);
bgr_mat.convertTo(float_bgr_mat, CV_32FC3);
gray_mat.convertTo(float_gray_mat, CV_32FC1);
SECTION("cpu vs gpu: 3 channel mat") {
bool to_rgb = true;
// Standard ImageNet mean/std normalization constants.
Value cfg{{"type", "Normalize"},
{"mean", {123.675, 116.28, 103.53}},
{"std", {58.395, 57.12, 57.375}},
{"to_rgb", to_rgb}};
vector<cv::Mat> mats{bgr_mat, float_bgr_mat};
for (auto &mat : mats) {
TestNormalize(cfg, mat);
}
}
SECTION("cpu vs gpu: 3 channel mat, to_rgb false") {
bool to_rgb = false;
Value cfg{{"type", "Normalize"},
{"mean", {123.675, 116.28, 103.53}},
{"std", {58.395, 57.12, 57.375}},
{"to_rgb", to_rgb}};
vector<cv::Mat> mats{bgr_mat, float_bgr_mat};
for (auto &mat : mats) {
TestNormalize(cfg, mat);
}
}
SECTION("cpu vs gpu: 1 channel mat") {
bool to_rgb = true;
Value cfg{{"type", "Normalize"}, {"mean", {123.675}}, {"std", {58.395}}, {"to_rgb", to_rgb}};
vector<cv::Mat> mats{gray_mat, float_gray_mat};
for (auto &mat : mats) {
TestNormalize(cfg, mat);
}
}
}
// Copyright (c) OpenMMLab. All rights reserved.
#include "catch.hpp"
#include "mmdeploy/core/mat.h"
#include "mmdeploy/core/utils/device_utils.h"
#include "mmdeploy/preprocess/transform/transform.h"
#include "opencv2/imgcodecs/imgcodecs.hpp"
#include "opencv2/imgproc/imgproc.hpp"
#include "opencv_utils.h"
#include "test_resource.h"
#include "test_utils.h"
using namespace mmdeploy;
using namespace framework;
using namespace std;
using namespace mmdeploy::test;
// All overloads return padding amounts as {left, top, right, bottom}.
tuple<int, int, int, int> GetPadSize(const cv::Mat& mat, int dst_height, int dst_width) {
  // Pad only on the right/bottom edges up to the requested size.
  return {0, 0, dst_width - mat.cols, dst_height - mat.rows};
}
// Pad to a square whose side is the longer of the two image dimensions.
tuple<int, int, int, int> GetPadSize(const cv::Mat& mat, bool square = true) {
  const int side = std::max(mat.rows, mat.cols);
  return GetPadSize(mat, side, side);
}
// Pad so that both dimensions become multiples of `divisor`.
tuple<int, int, int, int> GetPadSize(const cv::Mat& mat, int divisor) {
  const auto target_h = int(ceil(mat.rows * 1.0 / divisor)) * divisor;
  const auto target_w = int(ceil(mat.cols * 1.0 / divisor)) * divisor;
  return GetPadSize(mat, target_h, target_w);
}
// Runs the Pad transform on `mat` for every available device and compares the
// result against a CPU reference padded by the same amounts / border type /
// fill value. Also validates the "pad_shape" and "pad_fixed_size" metadata.
void TestPad(const Value& cfg, const cv::Mat& mat, int top, int left, int bottom, int right,
             int border_type, float val) {
  auto& resources = MMDeployTestResources::Get();
  for (const auto& name : resources.device_names()) {
    Device device{name.c_str()};
    Stream stream{device};
    auto transform = CreateTransform(cfg, device, stream);
    REQUIRE(transform != nullptr);

    auto ref_mat = mmdeploy::cpu::Pad(mat, top, left, bottom, right, border_type, val);

    auto res = transform->Process({{"img", cpu::CVMat2Tensor(mat)}});
    REQUIRE(!res.has_error());
    auto res_tensor = res.value()["img"].get<Tensor>();
    REQUIRE(res_tensor.device() == device);
    REQUIRE(Shape(res.value(), "pad_shape") ==
            vector<int64_t>{1, ref_mat.rows, ref_mat.cols, ref_mat.channels()});
    REQUIRE(Shape(res.value(), "pad_fixed_size") ==
            std::vector<int64_t>{ref_mat.rows, ref_mat.cols});

    const Device kHost{"cpu"};
    auto host_tensor = MakeAvailableOnDevice(res_tensor, kHost, stream);
    REQUIRE(stream.Wait());
    auto res_mat = mmdeploy::cpu::Tensor2CVMat(host_tensor.value());
    REQUIRE(mmdeploy::cpu::Compare(ref_mat, res_mat));
  }
}
// End-to-end tests for the 'Pad' transform: pad-to-square, pad-to-multiple
// (size_divisor) and pad-to-fixed-size, each combined with the four padding
// modes on 1/3-channel u8 and float inputs.
TEST_CASE("transform 'Pad'", "[pad]") {
auto gResource = MMDeployTestResources::Get();
auto img_list = gResource.LocateImageResources("transform");
REQUIRE(!img_list.empty());
auto img_path = img_list.front();
cv::Mat bgr_mat = cv::imread(img_path, cv::IMREAD_COLOR);
cv::Mat gray_mat;
cv::Mat float_bgr_mat;
cv::Mat float_gray_mat;
cv::cvtColor(bgr_mat, gray_mat, cv::COLOR_BGR2GRAY);
bgr_mat.convertTo(float_bgr_mat, CV_32FC3);
gray_mat.convertTo(float_gray_mat, CV_32FC1);
vector<cv::Mat> mats{bgr_mat, gray_mat, float_bgr_mat, float_gray_mat};
vector<string> modes{"constant", "edge", "reflect", "symmetric"};
// Maps the transform's padding_mode names onto OpenCV border types.
map<string, int> border_map{{"constant", cv::BORDER_CONSTANT},
{"edge", cv::BORDER_REPLICATE},
{"reflect", cv::BORDER_REFLECT_101},
{"symmetric", cv::BORDER_REFLECT}};
SECTION("pad to square") {
bool square{true};
float val = 255.0f;
for (auto& mat : mats) {
for (auto& mode : modes) {
Value cfg{
{"type", "Pad"}, {"pad_to_square", square}, {"padding_mode", mode}, {"pad_val", val}};
// GetPadSize returns {left, top, right, bottom}.
auto [pad_left, pad_top, pad_right, pad_bottom] = GetPadSize(mat, square);
TestPad(cfg, mat, pad_top, pad_left, pad_bottom, pad_right, border_map[mode], 255);
}
}
}
SECTION("pad with size_divisor") {
constexpr int divisor = 32;
float val = 255.0f;
for (auto& mat : mats) {
for (auto& mode : modes) {
Value cfg{
{"type", "Pad"}, {"size_divisor", divisor}, {"padding_mode", mode}, {"pad_val", val}};
auto [pad_left, pad_top, pad_right, pad_bottom] = GetPadSize(mat, divisor);
TestPad(cfg, mat, pad_top, pad_left, pad_bottom, pad_right, border_map[mode], 255);
}
}
}
SECTION("pad with size") {
constexpr int height = 600;
constexpr int width = 800;
for (auto& mat : mats) {
for (auto& mode : modes) {
// No pad_val in cfg here; the reference uses 0 — presumably the
// transform's default fill value. TODO(review): confirm against Pad impl.
Value cfg{{"type", "Pad"}, {"size", {width, height}}, {"padding_mode", mode}};
auto [pad_left, pad_top, pad_right, pad_bottom] = GetPadSize(mat, height, width);
TestPad(cfg, mat, pad_top, pad_left, pad_bottom, pad_right, border_map[mode], 0);
}
}
}
}
// Copyright (c) OpenMMLab. All rights reserved.
#include <numeric>
#include "catch.hpp"
#include "mmdeploy/core/mat.h"
#include "mmdeploy/core/tensor.h"
#include "mmdeploy/core/utils/device_utils.h"
#include "mmdeploy/operation/managed.h"
#include "mmdeploy/operation/vision.h"
#include "mmdeploy/preprocess/transform/transform.h"
#include "test_resource.h"
#include "test_utils.h"
using namespace mmdeploy;
using namespace framework;
using namespace std;
using namespace mmdeploy::test;
// Element-wise equality between the first expected.size() elements of `res`
// (interpreted as T on host-visible memory) and `expected`.
// The caller must guarantee that `res` holds at least expected.size()
// elements of type T.
template <typename T>
bool CheckEqual(const Tensor& res, const vector<T>& expected) {
  const T* actual = res.data<T>();
  // size_t index fixes the signed/unsigned comparison of the original loop.
  for (size_t i = 0; i < expected.size(); ++i) {
    if (actual[i] != expected[i]) {
      return false;
    }
  }
  return true;
}
// Applies Permute with `axes` to `src` on every available device and checks
// the result against `expected` (the permuted tensor flattened row-major).
template <typename T>
void TestPermute(const Tensor& src, const vector<int>& axes, const vector<T>& expected) {
  auto gResource = MMDeployTestResources::Get();
  for (auto const& device_name : gResource.device_names()) {
    Device device{device_name.c_str()};
    Stream stream{device};
    ::mmdeploy::operation::Context ctx(device, stream);
    auto permute = ::mmdeploy::operation::Managed<::mmdeploy::operation::Permute>::Create();
    Tensor dst;
    auto ret = permute.Apply(src, dst, axes);
    REQUIRE(!ret.has_error());
    const Device kHost{"cpu"};
    auto host_tensor = MakeAvailableOnDevice(dst, kHost, stream);
    // Fix: wait for the (possibly asynchronous) device-to-host copy to finish
    // before reading the data; every other test in this suite waits on the
    // stream after MakeAvailableOnDevice, but this one did not.
    REQUIRE(stream.Wait());
    REQUIRE(CheckEqual(host_tensor.value(), expected));
  }
}
void TestPermuteWrongArgs(const Tensor& src) {
int sz = src.shape().size();
vector<int> oaxes(sz);
std::iota(oaxes.begin(), oaxes.end(), 0);
auto gResource = MMDeployTestResources::Get();
for (auto const& device_name : gResource.device_names()) {
Device device{device_name.c_str()};
Stream stream{device};
::mmdeploy::operation::Context ctx(device, stream);
auto permute = ::mmdeploy::operation::Managed<::mmdeploy::operation::Permute>::Create();
Tensor dst;
{
auto axes = oaxes;
axes[0]--;
auto ret = permute.Apply(src, dst, axes);
REQUIRE(ret.has_error());
}
{
auto axes = oaxes;
axes.back()++;
auto ret = permute.Apply(src, dst, axes);
REQUIRE(ret.has_error());
}
{
auto axes = oaxes;
axes[0] = axes[1];
auto ret = permute.Apply(src, dst, axes);
REQUIRE(ret.has_error());
}
}
}
// Tests the Permute operation on a 48-element iota tensor: invalid-axes
// rejection plus 4-D and 5-D permutations with precomputed expected layouts.
TEST_CASE("operation Permute", "[permute]") {
  const Device kHost{"cpu"};
  const int kSize = 2 * 3 * 2 * 4;
  vector<uint8_t> data(kSize);
  std::iota(data.begin(), data.end(), 0);  // [0, 48)
  TensorDesc desc = {kHost, DataType::kINT8, {kSize}};
  Tensor tensor(desc);
  memcpy(tensor.data(), data.data(), data.size() * sizeof(uint8_t));
  SECTION("permute: wrong axes") {
    Tensor src = tensor;
    src.Reshape({6, 8});
    TestPermuteWrongArgs(src);
  }
  // Fix: the two sections below previously built a `Tensor dst` and memcpy'd
  // `expected` into it, but that tensor was never read — TestPermute allocates
  // its own output. The dead tensor and copies are removed.
  SECTION("permute: dims 4") {
    Tensor src = tensor;
    src.Reshape({2, 3, 2, 4});
    vector<int> axes = {1, 0, 3, 2};
    // Expected flattened data for shape {2,3,2,4} permuted with {1,0,3,2}.
    vector<uint8_t> expected = {0, 4, 1, 5, 2, 6, 3, 7, 24, 28, 25, 29, 26, 30, 27, 31,
                                8, 12, 9, 13, 10, 14, 11, 15, 32, 36, 33, 37, 34, 38, 35, 39,
                                16, 20, 17, 21, 18, 22, 19, 23, 40, 44, 41, 45, 42, 46, 43, 47};
    TestPermute(src, axes, expected);
  }
  SECTION("permute: dims 5") {
    Tensor src = tensor;
    src.Reshape({2, 3, 1, 2, 4});
    vector<int> axes = {2, 0, 1, 4, 3};
    // Expected flattened data for shape {2,3,1,2,4} permuted with {2,0,1,4,3}.
    vector<uint8_t> expected = {0, 4, 1, 5, 2, 6, 3, 7, 8, 12, 9, 13, 10, 14, 11, 15,
                                16, 20, 17, 21, 18, 22, 19, 23, 24, 28, 25, 29, 26, 30, 27, 31,
                                32, 36, 33, 37, 34, 38, 35, 39, 40, 44, 41, 45, 42, 46, 43, 47};
    TestPermute(src, axes, expected);
  }
}
// Copyright (c) OpenMMLab. All rights reserved.
#include "catch.hpp"
#include "mmdeploy/core/mat.h"
#include "mmdeploy/core/utils/device_utils.h"
#include "mmdeploy/preprocess/transform/transform.h"
#include "opencv2/imgcodecs/imgcodecs.hpp"
#include "opencv2/imgproc/imgproc.hpp"
#include "opencv_utils.h"
#include "test_resource.h"
#include "test_utils.h"
using namespace mmdeploy;
using namespace framework;
using namespace std;
using namespace mmdeploy::test;
// Computes {target_height, target_width} for a fixed-size resize.
// When size1 > 0 the pair {size0, size1} is used verbatim; otherwise size0
// rescales the shorter edge, preserving the aspect ratio.
tuple<int, int> GetTargetSize(const cv::Mat& src, int size0, int size1) {
  assert(size0 > 0);
  if (size1 > 0) {
    return {size0, size1};
  }
  // size1 <= 0: shorter edge becomes size0, the other edge scales with it.
  if (src.rows < src.cols) {
    return {size0, size0 * src.cols / src.rows};
  }
  return {size0 * src.rows / src.cols, size0};
}
// Computes {target_height, target_width} for a scale-based resize. With
// keep_ratio the image is scaled by the largest factor that fits the long
// edge into max(scale0, scale1) and the short edge into min(scale0, scale1);
// without it, {scale0, scale1} is used directly.
tuple<int, int> GetTargetSize(const cv::Mat& src, int scale0, int scale1, bool keep_ratio) {
  const auto width = src.cols;
  const auto height = src.rows;
  if (!keep_ratio) {
    return {scale0, scale1};
  }
  const auto long_edge = max(scale0, scale1);
  const auto short_edge = min(scale0, scale1);
  const auto factor = std::min(long_edge * 1.0 / std::max(height, width),
                               short_edge * 1.0 / std::min(height, width));
  return {int(height * factor + 0.5f), int(width * factor + 0.5f)};
}
// Runs a Resize transform configured by `cfg` on `device_name` (no-op when
// that device is unavailable in this build) and compares the output against a
// CPU reference resized to dst_height x dst_width.
void TestResize(const Value& cfg, const std::string& device_name, const cv::Mat& mat,
                int dst_height, int dst_width) {
  if (!MMDeployTestResources::Get().HasDevice(device_name)) {
    return;
  }
  Device device{device_name.c_str()};
  Stream stream{device};
  auto transform = CreateTransform(cfg, device, stream);
  REQUIRE(transform != nullptr);

  auto interpolation = cfg["interpolation"].get<string>();
  auto ref_mat = mmdeploy::cpu::Resize(mat, dst_height, dst_width, interpolation);

  auto res = transform->Process({{"img", cpu::CVMat2Tensor(mat)}});
  REQUIRE(!res.has_error());
  auto res_tensor = res.value()["img"].get<Tensor>();
  REQUIRE(res_tensor.device().device_id() == device.device_id());
  REQUIRE(res_tensor.device().platform_id() == device.platform_id());
  REQUIRE(res_tensor.device() == device);
  REQUIRE(Shape(res.value(), "img_shape") ==
          vector<int64_t>{1, ref_mat.rows, ref_mat.cols, ref_mat.channels()});
  REQUIRE(Shape(res.value(), "img_shape") == res_tensor.desc().shape);

  const Device kHost{"cpu"};
  auto host_tensor = MakeAvailableOnDevice(res_tensor, kHost, stream);
  REQUIRE(stream.Wait());
  auto res_mat = mmdeploy::cpu::Tensor2CVMat(host_tensor.value());
  REQUIRE(mmdeploy::cpu::Compare(ref_mat, res_mat));
  // cv::imwrite("ref.bmp", ref_mat);
  // cv::imwrite("res.bmp", res_mat);
}
// Like TestResize, but passes the target size through the "scale" input field
// instead of the transform config; the expected size is derived with
// GetTargetSize using the same keep_ratio policy.
void TestResizeWithScale(const Value& cfg, const std::string& device_name, const cv::Mat& mat,
                         int scale0, int scale1, bool keep_ratio) {
  if (!MMDeployTestResources::Get().HasDevice(device_name)) {
    return;
  }
  Device device{device_name.c_str()};
  Stream stream{device};
  auto transform = CreateTransform(cfg, device, stream);
  REQUIRE(transform != nullptr);

  auto [dst_height, dst_width] = GetTargetSize(mat, scale0, scale1, keep_ratio);
  auto interpolation = cfg["interpolation"].get<string>();
  auto ref_mat = mmdeploy::cpu::Resize(mat, dst_height, dst_width, interpolation);

  Value input{{"img", cpu::CVMat2Tensor(mat)}, {"scale", {scale0, scale1}}};
  auto res = transform->Process(input);
  REQUIRE(!res.has_error());
  auto res_tensor = res.value()["img"].get<Tensor>();
  REQUIRE(res_tensor.device() == device);
  REQUIRE(Shape(res.value(), "img_shape") ==
          vector<int64_t>{1, ref_mat.rows, ref_mat.cols, ref_mat.channels()});
  REQUIRE(Shape(res.value(), "img_shape") == res_tensor.desc().shape);

  const Device kHost{"cpu"};
  auto host_tensor = MakeAvailableOnDevice(res_tensor, kHost, stream);
  REQUIRE(stream.Wait());
  auto res_mat = mmdeploy::cpu::Tensor2CVMat(host_tensor.value());
  REQUIRE(mmdeploy::cpu::Compare(ref_mat, res_mat));
  // cv::imwrite("ref.bmp", ref_mat);
  // cv::imwrite("res.bmp", res_mat);
}
// Like TestResize, but drives the transform through the "scale_factor" input
// field; the expected size follows resize.cpp's rounding policy.
void TestResizeWithScaleFactor(const Value& cfg, const std::string& device_name, const cv::Mat& mat,
                               float scale_factor) {
  if (MMDeployTestResources::Get().HasDevice(device_name)) {
    Device device{device_name.c_str()};
    Stream stream{device};
    auto transform = CreateTransform(cfg, device, stream);
    REQUIRE(transform != nullptr);
    // keep round policy with resize.cpp
    const int dst_height = static_cast<int>(mat.rows * scale_factor + 0.5);
    const int dst_width = static_cast<int>(mat.cols * scale_factor + 0.5);
    auto interpolation = cfg["interpolation"].get<string>();
    auto ref_mat = mmdeploy::cpu::Resize(mat, dst_height, dst_width, interpolation);
    Value input{{"img", cpu::CVMat2Tensor(mat)}, {"scale_factor", scale_factor}};
    auto res = transform->Process(input);
    REQUIRE(!res.has_error());
    auto res_tensor = res.value()["img"].get<Tensor>();
    REQUIRE(res_tensor.device() == device);
    REQUIRE(Shape(res.value(), "img_shape") ==
            vector<int64_t>{1, ref_mat.rows, ref_mat.cols, ref_mat.channels()});
    REQUIRE(Shape(res.value(), "img_shape") == res_tensor.desc().shape);
    const Device kHost{"cpu"};
    auto host_tensor = MakeAvailableOnDevice(res_tensor, kHost, stream);
    // Fix: wait for the device-to-host copy before reading the tensor data.
    // TestResize and TestResizeWithScale both wait on the stream here, but
    // this function was missing the synchronization.
    REQUIRE(stream.Wait());
    auto res_mat = mmdeploy::cpu::Tensor2CVMat(host_tensor.value());
    REQUIRE(mmdeploy::cpu::Compare(ref_mat, res_mat));
    // cv::imwrite("ref.bmp", ref_mat);
    // cv::imwrite("res.bmp", res_mat);
  }
}
// End-to-end tests for the 'Resize' transform, covering size / scale /
// scale_factor configurations, keep_ratio on/off, several interpolation
// methods, and 1/3/4-channel u8 and float inputs on cpu (plus cuda for the
// interpolation methods the cuda kernel supports).
TEST_CASE("resize transform: size", "[resize]") {
  auto gResource = MMDeployTestResources::Get();
  auto img_list = gResource.LocateImageResources("transform");
  REQUIRE(!img_list.empty());
  auto img_path = img_list.front();
  cv::Mat bgr_mat = cv::imread(img_path, cv::IMREAD_COLOR);
  cv::Mat gray_mat = cv::imread(img_path, cv::IMREAD_GRAYSCALE);
  cv::Mat bgr_float_mat;
  cv::Mat gray_float_mat;
  bgr_mat.convertTo(bgr_float_mat, CV_32FC3);
  gray_mat.convertTo(gray_float_mat, CV_32FC1);
  vector<cv::Mat> mats{bgr_mat, gray_mat, bgr_float_mat, gray_float_mat};
  vector<string> interpolations{"bilinear", "nearest", "area", "bicubic", "lanczos"};
  // Only this subset is exercised on cuda (see the conditional calls below).
  set<string> cuda_interpolations{"bilinear", "nearest", "area"};
  constexpr const char* kHost = "cpu";
  SECTION("tuple size with -1") {
    for (auto& mat : mats) {
      auto size = std::max(mat.rows, mat.cols) + 10;
      for (auto& interp : interpolations) {
        Value cfg{{"type", "Resize"},
                  {"size", {size, -1}},
                  {"keep_ratio", false},
                  {"interpolation", interp}};
        auto [dst_height, dst_width] = GetTargetSize(mat, size, -1);
        TestResize(cfg, kHost, mat, dst_height, dst_width);
        if (cuda_interpolations.find(interp) != cuda_interpolations.end()) {
          TestResize(cfg, "cuda", mat, dst_height, dst_width);
        }
      }
    }
  }
  SECTION("no need to resize") {
    // Target equals the shorter edge, so the transform should pass through.
    for (auto& mat : mats) {
      auto size = std::min(mat.rows, mat.cols);
      for (auto& interp : interpolations) {
        Value cfg{{"type", "Resize"},
                  {"size", {size, -1}},
                  {"keep_ratio", false},
                  {"interpolation", interp}};
        auto [dst_height, dst_width] = GetTargetSize(mat, size, -1);
        TestResize(cfg, kHost, mat, dst_height, dst_width);
      }
    }
  }
  SECTION("fixed integer size") {
    for (auto& mat : mats) {
      constexpr int size = 224;
      for (auto& interp : interpolations) {
        Value cfg{
            {"type", "Resize"}, {"size", size}, {"keep_ratio", false}, {"interpolation", interp}};
        TestResize(cfg, kHost, mat, size, size);
        if (cuda_interpolations.find(interp) != cuda_interpolations.end()) {
          TestResize(cfg, "cuda", mat, size, size);
        }
      }
    }
  }
  SECTION("fixed size: [1333, 800]. keep_ratio: true") {
    constexpr int max_long_edge = 1333;
    constexpr int max_short_edge = 800;
    bool keep_ratio = true;
    for (auto& mat : mats) {
      for (auto& interp : interpolations) {
        Value cfg{{"type", "Resize"},
                  {"size", {max_long_edge, max_short_edge}},
                  {"keep_ratio", keep_ratio},
                  {"interpolation", interp}};
        auto [dst_height, dst_width] =
            GetTargetSize(mat, max_long_edge, max_short_edge, keep_ratio);
        TestResize(cfg, kHost, mat, dst_height, dst_width);
        if (cuda_interpolations.find(interp) != cuda_interpolations.end()) {
          TestResize(cfg, "cuda", mat, dst_height, dst_width);
        }
      }
    }
  }
  SECTION("fixed size: [1333, 800]. keep_ratio: false") {
    constexpr int dst_height = 800;
    constexpr int dst_width = 1333;
    bool keep_ratio = false;
    for (auto& mat : mats) {
      for (auto& interp : interpolations) {
        Value cfg{{"type", "Resize"},
                  {"size", {dst_width, dst_height}},
                  {"keep_ratio", keep_ratio},
                  {"interpolation", interp}};
        TestResize(cfg, kHost, mat, dst_height, dst_width);
        if (cuda_interpolations.find(interp) != cuda_interpolations.end()) {
          TestResize(cfg, "cuda", mat, dst_height, dst_width);
        }
      }
    }
  }
  SECTION("fixed size: [800, 1333]. keep_ratio: true") {
    constexpr int dst_height = 800;
    constexpr int dst_width = 1333;
    bool keep_ratio = true;
    for (auto& mat : mats) {
      for (auto& interp : interpolations) {
        Value cfg{{"type", "Resize"},
                  {"size", {dst_height, dst_width}},
                  {"keep_ratio", keep_ratio},
                  {"interpolation", interp}};
        TestResizeWithScale(cfg, kHost, mat, dst_height, dst_width, keep_ratio);
      }
    }
  }
  SECTION("img_scale: [800, 1333]. keep_ratio: false") {
    constexpr int dst_height = 800;
    constexpr int dst_width = 1333;
    bool keep_ratio = false;
    for (auto& mat : mats) {
      for (auto& interp : interpolations) {
        Value cfg{{"type", "Resize"},
                  {"size", {dst_height, dst_width}},
                  {"keep_ratio", keep_ratio},
                  {"interpolation", interp}};
        TestResizeWithScale(cfg, kHost, mat, dst_height, dst_width, keep_ratio);
      }
    }
  }
  SECTION("scale_factor: 0.5") {
    float scale_factor = 0.5;
    bool keep_ratio = true;
    for (auto& mat : mats) {
      for (auto& interp : interpolations) {
        Value cfg{{"type", "Resize"},
                  {"size", {600, 800}},
                  {"keep_ratio", keep_ratio},
                  {"interpolation", interp}};
        TestResizeWithScaleFactor(cfg, kHost, mat, scale_factor);
      }
    }
  }
  SECTION("resize 4 channel image") {
    // Fix: dropped an unused `cv::Mat mat = cv::imread(...)` local that
    // re-read the image; the BGRA input is derived from `bgr_mat` above.
    cv::Mat bgra_mat;
    cv::cvtColor(bgr_mat, bgra_mat, cv::COLOR_BGR2BGRA);
    assert(bgra_mat.channels() == 4);
    constexpr int size = 256;
    auto [dst_height, dst_width] = GetTargetSize(bgra_mat, size, -1);
    for (auto& device_name : gResource.device_names()) {
      for (auto& interp : cuda_interpolations) {
        Value cfg{{"type", "Resize"},
                  {"size", {size, -1}},
                  {"keep_ratio", false},
                  {"interpolation", interp}};
        TestResize(cfg, device_name, bgra_mat, dst_height, dst_width);
      }
    }
  }
}
// Copyright (c) OpenMMLab. All rights reserved.
#include "test_utils.h"
using namespace std;
namespace mmdeploy::test {
// Builds a test::Transform wrapper from `cfg` for the given device/stream.
// Returns nullptr when the transform type is not registered or when
// construction throws.
unique_ptr<Transform> CreateTransform(const Value& cfg, Device device, Stream stream) {
  const auto op_type = cfg.value<string>("type", "");
  const auto op_version = cfg.value<int>("version", -1);
  try {
    auto creator = gRegistry<transform::Transform>().Get(op_type, op_version);
    if (!creator) {
      return nullptr;
    }
    // Inject the execution context into a copy of the config.
    auto patched = cfg;
    patched["context"]["device"] = device;
    patched["context"]["stream"] = stream;
    operation::Context context(device, stream);
    return std::make_unique<Transform>(creator->Create(patched));
  } catch (std::exception& e) {
    cout << "exception: " << e.what() << endl;
    return nullptr;
  } catch (...) {
    cout << "unexpected exception" << endl;
    return nullptr;
  }
}
// Reads value[shape_key] — an array of integers — into a vector<int64_t>.
vector<int64_t> Shape(const Value& value, const string& shape_key) {
  vector<int64_t> dims;
  for (auto& entry : value[shape_key]) {
    dims.push_back(entry.get<int>());
  }
  return dims;
}
// Reads value["img_norm_cfg"][key] — an array of floats — into a
// vector<float> (used to check the "mean"/"std" the transform exported).
vector<float> ImageNormCfg(const Value& value, const std::string& key) {
  vector<float> values;
  for (auto& entry : value["img_norm_cfg"][key]) {
    values.push_back(entry.get<float>());
  }
  return values;
}
// Captures the device/stream from the currently active operation context
// (established by CreateTransform) so Process() can re-enter the same
// context later, and takes ownership of the wrapped transform.
Transform::Transform(std::unique_ptr<transform::Transform> transform)
: device_(operation::gContext().device()),
stream_(operation::gContext().stream()),
transform_(std::move(transform)) {}
// Applies the wrapped transform to a copy of `input` inside the device/stream
// context captured at construction time, returning the transformed value.
Result<Value> Transform::Process(const Value& input) {
  Value output = input;
  {
    // Re-enter the context the transform was created with.
    operation::Context context(device_, stream_);
    OUTCOME_TRY(transform_->Apply(output));
  }
  return output;
}
} // namespace mmdeploy::test
// Copyright (c) OpenMMLab. All rights reserved.
#ifndef MMDEPLOY_TEST_TRANSFORM_UTILS_H
#define MMDEPLOY_TEST_TRANSFORM_UTILS_H
#include "mmdeploy/core/tensor.h"
#include "mmdeploy/core/value.h"
#include "mmdeploy/preprocess/transform/transform.h"
namespace mmdeploy::test {
// Test-side wrapper around transform::Transform that records the active
// operation context (device + stream) at construction and re-applies it for
// every Process() call. See test_utils.cpp for the implementation.
class Transform {
public:
// Captures device/stream from the current operation::gContext().
explicit Transform(std::unique_ptr<transform::Transform> transform);
// Applies the wrapped transform to a copy of `input`.
Result<Value> Process(const Value& input);
private:
Device device_;
Stream stream_;
std::unique_ptr<transform::Transform> transform_;
};
std::unique_ptr<Transform> CreateTransform(const Value& cfg, Device device, Stream stream);
std::vector<int64_t> Shape(const Value& value, const std::string& shape_key);
std::vector<float> ImageNormCfg(const Value& value, const std::string& key);
} // namespace mmdeploy::test
#endif // MMDEPLOY_TEST_TRANSFORM_UTILS_H
// Copyright (c) OpenMMLab. All rights reserved.
#define CATCH_CONFIG_MAIN
#include "catch.hpp"
// Copyright (c) OpenMMLab. All rights reserved.
#ifndef MMDEPLOY_TEST_RESOURCE_H
#define MMDEPLOY_TEST_RESOURCE_H
#include <algorithm>
#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include "mmdeploy/core/utils/filesystem.h"
#include "test_define.h"
using namespace std;
class MMDeployTestResources {
public:
// Meyers singleton: one shared test-resource registry per test binary.
static MMDeployTestResources &Get() {
static MMDeployTestResources resource;
return resource;
}
// Device names configured for this build (parsed from kDevices).
const std::vector<std::string> &device_names() const { return devices_; }
// Devices a given inference backend can run on; throws std::out_of_range
// for a backend not present in the map built by the constructor.
const std::vector<std::string> &device_names(const std::string &backend) const {
return backend_devices_.at(backend);
}
// Backend names configured for this build (parsed from kBackends).
const std::vector<std::string> &backends() const { return backends_; }
// Codebase names configured for this build (parsed from kCodebases).
const std::vector<std::string> &codebases() const { return codebases_; }
// Root directory of the test resources (may be empty when not located).
const fs::path &resource_root_path() const { return resource_root_path_; }
// True when `name` matches one of the configured device names.
bool HasDevice(const std::string &name) const {
return std::any_of(devices_.begin(), devices_.end(),
[&](const std::string &device_name) { return device_name == name; });
}
// True when <resource_root>/<dir_name> exists and is a directory.
bool IsDir(const fs::path &dir_name) const {
auto path = resource_root_path_ / dir_name;
return fs::is_directory(path);
}
// True when <resource_root>/<file_name> exists and is a regular file.
bool IsFile(const fs::path &file_name) const {
auto path = resource_root_path_ / file_name;
return fs::is_regular_file(path);
}
public:
std::vector<std::string> LocateModelResources(const fs::path &sdk_model_zoo_dir) {
std::vector<std::string> sdk_model_list;
if (resource_root_path_.empty()) {
return sdk_model_list;
}
auto path = resource_root_path_ / sdk_model_zoo_dir;
if (!fs::is_directory(path)) {
return sdk_model_list;
}
for (auto const &dir_entry : fs::directory_iterator{path}) {
fs::directory_entry entry{dir_entry.path()};
if (auto const &_path = dir_entry.path(); fs::is_directory(_path)) {
sdk_model_list.push_back(dir_entry.path().string());
}
}
return sdk_model_list;
}
std::vector<std::string> LocateImageResources(const fs::path &img_dir) {
std::vector<std::string> img_list;
if (resource_root_path_.empty()) {
return img_list;
}
auto path = resource_root_path_ / img_dir;
if (!fs::is_directory(path)) {
return img_list;
}
set<string> extensions{".png", ".jpg", ".jpeg", ".bmp"};
for (auto const &dir_entry : fs::directory_iterator{path}) {
if (!fs::is_regular_file(dir_entry.path())) {
std::cout << dir_entry.path().string() << std::endl;
continue;
}
auto const &_path = dir_entry.path();
auto ext = _path.extension().string();
std::transform(ext.begin(), ext.end(), ext.begin(), ::tolower);
if (extensions.find(ext) != extensions.end()) {
img_list.push_back(_path.string());
}
}
return img_list;
}
private:
MMDeployTestResources() {
devices_ = Split(kDevices);
backends_ = Split(kBackends);
codebases_ = Split(kCodebases);
backend_devices_["pplnn"] = {"cpu", "cuda"};
backend_devices_["trt"] = {"cuda"};
backend_devices_["ort"] = {"cpu"};
backend_devices_["ncnn"] = {"cpu"};
backend_devices_["openvino"] = {"cpu"};
resource_root_path_ = LocateResourceRootPath(fs::current_path(), 8);
}
static std::vector<std::string> Split(const std::string &text, char delimiter = ';') {
std::vector<std::string> result;
std::istringstream ss(text);
for (std::string word; std::getline(ss, word, delimiter);) {
result.emplace_back(word);
}
return result;
}
fs::path LocateResourceRootPath(const fs::path &cur_path, int max_depth) {
if (max_depth < 0) {
return "";
}
for (auto const &dir_entry : fs::directory_iterator{cur_path}) {
fs::directory_entry entry{dir_entry.path()};
auto const &_path = dir_entry.path();
// filename must be checked before fs::is_directory, the latter will throw
// when _path points to a system file on Windows
if (_path.filename() == "mmdeploy_test_resources" && fs::is_directory(_path)) {
return _path;
}
}
// Didn't find 'mmdeploy_test_resources' in current directory.
// Move to its parent directory and keep looking for it
if (cur_path.has_parent_path()) {
return LocateResourceRootPath(cur_path.parent_path(), max_depth - 1);
} else {
return "";
}
}
private:
std::vector<std::string> devices_;
std::vector<std::string> backends_;
std::vector<std::string> codebases_;
std::map<std::string, std::vector<std::string>> backend_devices_;
fs::path resource_root_path_;
// std::string resource_root_path_;
};
#endif // MMDEPLOY_TEST_RESOURCE_H
# Copyright (c) OpenMMLab. All rights reserved.
import torch
from mmengine import Config
from mmdeploy.utils import Backend
from mmdeploy.utils.test import check_backend, get_rewrite_outputs
def test_multiheadattention_ncnn():
    """Rewritten MultiheadAttention should match PyTorch output on ncnn."""
    check_backend(Backend.NCNN)
    from mmcv.cnn.bricks.transformer import MultiheadAttention
    embed_dims, num_heads = 12, 2
    model = MultiheadAttention(embed_dims, num_heads, batch_first=True)
    # (batch, seq_len, embed_dims) query for self-attention
    query = torch.rand(1, 3, embed_dims)
    deploy_cfg = Config(
        dict(
            onnx_config=dict(input_shape=None),
            backend_config=dict(type=Backend.NCNN.value),
        ))
    model_outputs = model(query)
    rewrite_inputs = dict(query=query)
    rewrite_outputs, is_backend_output = get_rewrite_outputs(
        wrapped_model=model,
        model_inputs=rewrite_inputs,
        deploy_cfg=deploy_cfg,
        run_with_backend=True)
    if is_backend_output is None:
        # backend inference was skipped; only check a rewrite result exists
        assert rewrite_outputs is not None
    else:
        assert torch.allclose(
            model_outputs, rewrite_outputs[0], rtol=1e-03, atol=1e-05)
# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import tempfile
import onnx
import pytest
import torch
from mmengine import Config
from mmdeploy.apis.onnx import export
from mmdeploy.core import RewriterContext
from mmdeploy.utils import Backend
from mmdeploy.utils.test import (WrapFunction, backend_checker, check_backend,
get_rewrite_outputs)
@pytest.mark.parametrize(
    'iou_threshold, score_threshold,max_output_boxes_per_class',
    [(0.6, 0.2, 3)])
def test_ONNXNMSop(iou_threshold, score_threshold, max_output_boxes_per_class):
    """ONNXNMSop should export to ONNX as a standard NonMaxSuppression node."""
    # one batch of 10 candidate boxes in xyxy format
    boxes = torch.tensor([[[291.1746, 316.2263, 343.5029, 347.7312],
                           [288.4846, 315.0447, 343.7267, 346.5630],
                           [288.5307, 318.1989, 341.6425, 349.7222],
                           [918.9102, 83.7463, 933.3920, 164.9041],
                           [895.5786, 78.2361, 907.8049, 172.0883],
                           [292.5816, 316.5563, 340.3462, 352.9989],
                           [609.4592, 83.5447, 631.2532, 144.0749],
                           [917.7308, 85.5870, 933.2839, 168.4530],
                           [895.5138, 79.3596, 908.2865, 171.0418],
                           [291.4747, 318.6987, 347.1208, 349.5754]]])
    # random scores: (batch, num_classes, num_boxes)
    scores = torch.rand(1, 5, 10)
    from mmdeploy.mmcv.ops import ONNXNMSop

    def wrapped_function(torch_bboxes, torch_scores):
        return ONNXNMSop.apply(torch_bboxes, torch_scores,
                               max_output_boxes_per_class, iou_threshold,
                               score_threshold)

    wrapped_model = WrapFunction(wrapped_function).eval()
    result = wrapped_model(boxes, scores)
    assert result is not None
    onnx_file_path = tempfile.NamedTemporaryFile(suffix='.onnx').name
    onnx_file_prefix = osp.splitext(onnx_file_path)[0]
    export(
        wrapped_model, (boxes, scores),
        onnx_file_prefix,
        keep_initializers_as_inputs=False,
        input_names=['boxes', 'scores'],
        output_names=['result'],
        opset_version=11)
    model = onnx.load(onnx_file_path)
    # NOTE(review): relies on a fixed node order in the exported graph
    assert model.graph.node[3].op_type == 'NonMaxSuppression'
def test_deform_conv_openvino():
    """DeformConv2d should export as OpenVINO's custom DeformableConv2D node
    and produce the expected eager-mode value."""
    check_backend(Backend.OPENVINO)
    input = torch.Tensor([[[[1., 2., 3.], [0., 1., 2.], [3., 5., 2.]]]])
    # offsets: 2 * kernel_h * kernel_w = 8 channels for a 2x2 kernel
    offset = torch.Tensor([[[[1.7000, 2.9000], [3.4000, 4.8000]],
                            [[1.1000, 2.0000], [2.1000, 1.9000]],
                            [[3.1000, 5.1000], [5.9000, 4.9000]],
                            [[2.0000, 4.1000], [4.0000, 6.6000]],
                            [[1.6000, 2.7000], [3.8000, 3.1000]],
                            [[2.5000, 4.3000], [4.2000, 5.3000]],
                            [[1.7000, 3.3000], [3.6000, 4.5000]],
                            [[1.7000, 3.4000], [5.2000, 6.1000]]]])
    expected_output = torch.Tensor([[[[1.6500, 0.0000], [0.0000, 0.0000]]]])
    from mmcv.ops.deform_conv import DeformConv2dFunction

    def wrapped_function(input, offset):
        # fixed weight and hyper-parameters for a deterministic result
        weight = torch.Tensor([[[[0.4000, 0.2000], [0.1000, 0.9000]]]])
        stride = (1, 1)
        padding = (0, 0)
        dilation = (1, 1)
        groups = 1
        deform_groups = 1
        return DeformConv2dFunction.apply(input, offset, weight, stride,
                                          padding, dilation, groups,
                                          deform_groups)

    wrapped_model = WrapFunction(wrapped_function).eval()
    model_output = wrapped_model(input, offset)
    assert torch.allclose(expected_output, model_output)
    onnx_file_path = tempfile.NamedTemporaryFile().name
    with RewriterContext({}, backend='openvino'), torch.no_grad():
        torch.onnx.export(
            wrapped_model, (input, offset),
            onnx_file_path,
            export_params=True,
            keep_initializers_as_inputs=True,
            input_names=['input', 'offset'],
            output_names=['result'],
            opset_version=11)
    model = onnx.load(onnx_file_path)
    # the rewriter should emit OpenVINO's custom deformable-conv node
    assert model.graph.node[1].op_type == 'DeformableConv2D'
    assert model.graph.node[1].domain == 'org.openvinotoolkit'
def test_patch_embed_ncnn():
    """PatchEmbed rewritten for ncnn should report the expected patch grid."""
    check_backend(Backend.NCNN)
    from mmcv.cnn.bricks.transformer import PatchEmbed
    input = torch.ones((1, 3, 384, 384))
    patch_cfg = {
        'in_channels': 3,
        'input_size': 384,
        'embed_dims': 768,
        'conv_type': 'Conv2d',
        'kernel_size': 32,
        'stride': 32
    }
    wrapped_model = PatchEmbed(**patch_cfg)
    wrapped_model.eval()
    with RewriterContext({}, backend='ncnn'), torch.no_grad():
        _, shape = wrapped_model(input)
    # 384 / 32 = 12 patches along each spatial dimension
    assert shape[0] == patch_cfg['input_size'] / patch_cfg['stride']
@backend_checker(Backend.TENSORRT)
def test_multiclass_nms_static():
    """multiclass_nms should be rewritable for TensorRT with static shapes."""
    from mmdeploy.mmcv.ops import multiclass_nms
    # static-shape TensorRT config: min/opt/max shapes are all identical
    deploy_cfg = Config(
        dict(
            onnx_config=dict(output_names=None, input_shape=None),
            backend_config=dict(
                type='tensorrt',
                common_config=dict(
                    fp16_mode=False, max_workspace_size=1 << 20),
                model_inputs=[
                    dict(
                        input_shapes=dict(
                            boxes=dict(
                                min_shape=[1, 5, 4],
                                opt_shape=[1, 5, 4],
                                max_shape=[1, 5, 4]),
                            scores=dict(
                                min_shape=[1, 5, 8],
                                opt_shape=[1, 5, 8],
                                max_shape=[1, 5, 8])))
                ]),
            codebase_config=dict(
                type='mmdet',
                task='ObjectDetection',
                post_processing=dict(
                    score_threshold=0.05,
                    iou_threshold=0.5,
                    max_output_boxes_per_class=20,
                    pre_top_k=-1,
                    keep_top_k=10,
                    background_label_id=-1,
                ))))
    boxes = torch.rand(1, 5, 4).cuda()
    scores = torch.rand(1, 5, 8).cuda()
    max_output_boxes_per_class = 20
    keep_top_k = 5
    nms_type = 'nms'
    wrapped_func = WrapFunction(
        multiclass_nms,
        max_output_boxes_per_class=max_output_boxes_per_class,
        nms_type=nms_type,
        keep_top_k=keep_top_k)
    rewrite_outputs, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={
            'boxes': boxes,
            'scores': scores
        },
        deploy_cfg=deploy_cfg)
    assert rewrite_outputs is not None, 'Got unexpected rewrite '\
        'outputs: {}'.format(rewrite_outputs)
@backend_checker(Backend.ASCEND)
def test_multiclass_nms__ascend():
    """multiclass_nms should be rewritable for the Ascend backend."""
    from mmdeploy.mmcv.ops import multiclass_nms
    deploy_cfg = Config(
        dict(
            onnx_config=dict(
                input_names=['boxes', 'scores'],
                output_names=['dets', 'labels'],
                input_shape=None),
            backend_config=dict(
                type='ascend',
                model_inputs=[
                    dict(input_shapes=dict(boxes=[1, 5, 4], scores=[1, 5, 8]))
                ]),
            codebase_config=dict(
                type='mmdet',
                task='ObjectDetection',
                post_processing=dict(
                    score_threshold=0.05,
                    iou_threshold=0.5,
                    max_output_boxes_per_class=20,
                    pre_top_k=-1,
                    keep_top_k=10,
                    background_label_id=-1,
                ))))
    boxes = torch.rand(1, 5, 4)
    scores = torch.rand(1, 5, 8)
    max_output_boxes_per_class = 20
    keep_top_k = 10
    nms_type = 'nms'
    wrapped_func = WrapFunction(
        multiclass_nms,
        max_output_boxes_per_class=max_output_boxes_per_class,
        nms_type=nms_type,
        keep_top_k=keep_top_k)
    rewrite_outputs, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={
            'boxes': boxes,
            'scores': scores
        },
        deploy_cfg=deploy_cfg)
    assert rewrite_outputs is not None, 'Got unexpected rewrite '\
        'outputs: {}'.format(rewrite_outputs)
def test_modulated_deform_conv():
    """Traced ModulatedDeformConv2dPack should match the eager output."""
    check_backend(Backend.TORCHSCRIPT)
    from mmdeploy.backend.torchscript import ops_available
    if not ops_available():
        pytest.skip('torchscript custom ops is required.')
    from mmcv.ops import ModulatedDeformConv2dPack
    from mmdeploy.apis.torch_jit import trace
    model = ModulatedDeformConv2dPack(3, 1, 1).eval()
    x = torch.rand(1, 3, 16, 16)
    jit_model = trace(model, x, None, backend='torchscript')
    out = model(x)
    jit_out = jit_model(x)
    # NOTE(review): torch.testing.assert_allclose is deprecated upstream;
    # consider torch.testing.assert_close in a future cleanup.
    torch.testing.assert_allclose(out, jit_out)
# Copyright (c) OpenMMLab. All rights reserved.
from .utils import TestNCNNExporter, TestOnnxRTExporter, TestTensorRTExporter
__all__ = ['TestTensorRTExporter', 'TestOnnxRTExporter', 'TestNCNNExporter']
# Copyright (c) OpenMMLab. All rights reserved.
import os
import tempfile
import numpy
import onnxruntime
import pytest
import torch
from mmdeploy.backend.onnxruntime.init_plugins import get_ops_path
from mmdeploy.mmcv.ops import ONNXNMSMatchOp
cur_dir = os.path.dirname(os.path.abspath(__file__))

# Module-level fixtures for test_nms_match below.
# `boxes`: (2, 10, 4) xyxy boxes — the same 10 boxes repeated for 2 batches.
boxes = torch.tensor([
    [
        [291.1746, 316.2263, 343.5029, 347.7312],
        [288.4846, 315.0447, 343.7267, 346.5630],
        [288.5307, 318.1989, 341.6425, 349.7222],
        [918.9102, 83.7463, 933.3920, 164.9041],
        [895.5786, 78.2361, 907.8049, 172.0883],
        [292.5816, 316.5563, 340.3462, 352.9989],
        [609.4592, 83.5447, 631.2532, 144.0749],
        [917.7308, 85.5870, 933.2839, 168.4530],
        [895.5138, 79.3596, 908.2865, 171.0418],
        [291.4747, 318.6987, 347.1208, 349.5754],
    ],
    [
        [291.1746, 316.2263, 343.5029, 347.7312],
        [288.4846, 315.0447, 343.7267, 346.5630],
        [288.5307, 318.1989, 341.6425, 349.7222],
        [918.9102, 83.7463, 933.3920, 164.9041],
        [895.5786, 78.2361, 907.8049, 172.0883],
        [292.5816, 316.5563, 340.3462, 352.9989],
        [609.4592, 83.5447, 631.2532, 144.0749],
        [917.7308, 85.5870, 933.2839, 168.4530],
        [895.5138, 79.3596, 908.2865, 171.0418],
        [291.4747, 318.6987, 347.1208, 349.5754],
    ],
])
# `scores`: built as (2, 10, 5), then permuted below to (2, 5, 10), i.e.
# (batch, num_classes, num_boxes).
scores = torch.tensor([
    [
        [0.9577, 0.9745, 0.3030, 0.6589, 0.2742],
        [0.1618, 0.7963, 0.5124, 0.6964, 0.6850],
        [0.8425, 0.4843, 0.9489, 0.8068, 0.7340],
        [0.7337, 0.4340, 0.9923, 0.0704, 0.4506],
        [0.3090, 0.5606, 0.6939, 0.3764, 0.6920],
        [0.0044, 0.7986, 0.2221, 0.2782, 0.4378],
        [0.7293, 0.2735, 0.8381, 0.0264, 0.6278],
        [0.7144, 0.1066, 0.4125, 0.4041, 0.8819],
        [0.4963, 0.7891, 0.6908, 0.1499, 0.5584],
        [0.4385, 0.6035, 0.0508, 0.0662, 0.5938],
    ],
    [
        [0.9577, 0.9745, 0.3030, 0.6589, 0.2742],
        [0.1618, 0.7963, 0.5124, 0.6964, 0.6850],
        [0.8425, 0.4843, 0.9489, 0.8068, 0.7340],
        [0.7337, 0.4340, 0.9923, 0.0704, 0.4506],
        [0.3090, 0.5606, 0.6939, 0.3764, 0.6920],
        [0.0044, 0.7986, 0.2221, 0.2782, 0.4378],
        [0.7293, 0.2735, 0.8381, 0.0264, 0.6278],
        [0.7144, 0.1066, 0.4125, 0.4041, 0.8819],
        [0.4963, 0.7891, 0.6908, 0.1499, 0.5584],
        [0.4385, 0.6035, 0.0508, 0.0662, 0.5938],
    ],
])
scores = scores.permute(0, 2, 1)
iou_threshold = torch.tensor([0.1])
score_threshold = torch.tensor([0.1])
match_op = ONNXNMSMatchOp.apply
class test_ONNX_Match(torch.nn.Module):
    """Minimal nn.Module wrapping ONNXNMSMatchOp so it can be exported."""

    def __init__(self):
        super().__init__()

    def forward(self, boxes, scores, iou_threshold, score_threshold):
        return match_op(boxes, scores, iou_threshold, score_threshold)
@pytest.mark.skipif(
    reason='Need to build onnxruntime custom op',
    condition=get_ops_path() == '')
def test_nms_match():
    """ONNXNMSMatchOp must give identical results when run through PyTorch
    and through an exported ONNX model in onnxruntime with the custom-op
    library loaded."""
    print('Running compilation...')
    # reference result from the PyTorch implementation
    model = test_ONNX_Match()
    torch_output = model(boxes, scores, iou_threshold,
                         score_threshold).detach().numpy()
    # export the onnx file with a tempfile (delete=False so onnxruntime can
    # reopen it by name; we remove it ourselves below)
    temp_onnx = tempfile.NamedTemporaryFile(
        suffix='.onnx', delete=False, mode='wb', dir=cur_dir)
    try:
        input_name = ['boxes', 'scores', 'iou_thr', 'score_thr']
        torch.onnx.export(
            model,
            (boxes, scores, iou_threshold, score_threshold),
            temp_onnx.name,
            input_names=input_name,
        )
        temp_onnx.close()
        options = onnxruntime.SessionOptions()
        # the Match op lives in mmdeploy's onnxruntime custom-op library
        options.register_custom_ops_library(get_ops_path())
        sess = onnxruntime.InferenceSession(
            temp_onnx.name, options, providers=['CPUExecutionProvider'])
        ort_output = sess.run(
            None,
            {
                'boxes': boxes.numpy(),
                'scores': scores.numpy(),
                'iou_thr': iou_threshold.numpy(),
                'score_thr': score_threshold.numpy(),
            },
        )
        assert numpy.array_equal(
            numpy.array(torch_output),
            numpy.array(ort_output[0])), 'list are not equal'
    finally:
        # fix: previously the temp model leaked whenever export/inference or
        # the assertion failed; always clean it up
        os.remove(temp_onnx.name)
# Copyright (c) OpenMMLab. All rights reserved.
import tempfile
import onnx
import pytest
import torch
import torch.nn as nn
from mmengine import Config
from onnx.helper import (make_graph, make_model, make_node,
make_tensor_value_info)
from mmdeploy.core import RewriterContext
from mmdeploy.utils.test import WrapFunction, assert_allclose
from .utils import TestNCNNExporter, TestOnnxRTExporter, TestTensorRTExporter
# Shared exporter helpers, one per backend; each is used via check_env() /
# run_and_validate() in the tests below.
TEST_ONNXRT = TestOnnxRTExporter()
TEST_TENSORRT = TestTensorRTExporter()
TEST_NCNN = TestNCNNExporter()
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
@pytest.mark.parametrize('pool_h,pool_w,spatial_scale,sampling_ratio',
                         [(2, 2, 1.0, 2), (4, 4, 2.0, 4)])
def test_roi_align(backend,
                   pool_h,
                   pool_w,
                   spatial_scale,
                   sampling_ratio,
                   input_list=None,
                   save_dir=None):
    """mmcv roi_align should export to the backend and validate numerically."""
    backend.check_env()
    if input_list is None:
        input = torch.rand(1, 1, 16, 16, dtype=torch.float32)
        # single RoI: (batch_index, x1, y1, x2, y2)
        single_roi = torch.tensor([[0, 0, 0, 4, 4]], dtype=torch.float32)
    else:
        input = torch.tensor(input_list[0], dtype=torch.float32)
        single_roi = torch.tensor(input_list[1], dtype=torch.float32)
    from mmcv.ops import roi_align

    def wrapped_function(torch_input, torch_rois):
        return roi_align(torch_input, torch_rois, (pool_w, pool_h),
                         spatial_scale, sampling_ratio, 'avg', True)

    wrapped_model = WrapFunction(wrapped_function).eval()
    with RewriterContext(
            Config({'backend_config': {
                'type': backend.backend_name
            }}),
            backend=backend.backend_name,
            opset=11):
        backend.run_and_validate(
            wrapped_model, [input, single_roi],
            'roi_align',
            input_names=['input', 'rois'],
            output_names=['roi_feat'],
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_TENSORRT, TEST_ONNXRT])
@pytest.mark.parametrize('mode', ['bilinear', 'nearest'])
@pytest.mark.parametrize('padding_mode', ['zeros', 'border', 'reflection'])
@pytest.mark.parametrize('align_corners', [True, False])
def test_grid_sample(backend,
                     mode,
                     padding_mode,
                     align_corners,
                     input_list=None,
                     save_dir=None):
    """grid_sample should export and validate for every mode/padding combo."""
    backend.check_env()
    if input_list is None:
        input = torch.rand(1, 1, 10, 10)
    else:
        input = torch.tensor(input_list[0])
    # identity affine transform sampled on a 2x-upscaled grid
    grid = torch.Tensor([[[1, 0, 0], [0, 1, 0]]])
    grid = nn.functional.affine_grid(
        grid, (1, 1, input.shape[2] * 2, input.shape[3] * 2)).type_as(input)

    def wrapped_function(inputs, grid):
        return nn.functional.grid_sample(
            inputs,
            grid,
            mode=mode,
            padding_mode=padding_mode,
            align_corners=align_corners)

    wrapped_model = WrapFunction(wrapped_function).eval()
    with RewriterContext(
            Config({'backend_config': {
                'type': backend.backend_name
            }}),
            backend=backend.backend_name,
            opset=11):
        backend.run_and_validate(
            wrapped_model, [input, grid],
            'grid_sampler',
            input_names=['input', 'grid'],
            output_names=['output'],
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
@pytest.mark.parametrize('dynamic_export', [True, False])
@pytest.mark.parametrize('mode', ['bicubic', 'nearest'])
@pytest.mark.parametrize('align_corners', [True, False])
@pytest.mark.parametrize('output_size', [[10, 20], None])
@pytest.mark.parametrize('scale_factor', [2])
@pytest.mark.parametrize('n, c, h, w', [(2, 3, 5, 10)])
def test_bicubic_interpolate(backend,
                             dynamic_export,
                             mode,
                             align_corners,
                             output_size,
                             scale_factor,
                             n,
                             c,
                             h,
                             w,
                             input_list=None,
                             save_dir=None):
    """Upsample (bicubic/nearest) should export and validate on TensorRT,
    with and without dynamic axes."""
    backend.check_env()
    if input_list is None:
        input = torch.randn(n, c, h, w)
    else:
        # fix: the original had no else branch, so a caller-supplied
        # input_list raised NameError on `input` below
        input = torch.tensor(input_list[0])
    if dynamic_export:
        dynamic_axes = {
            'input': {
                0: 'n',
                2: 'h',
                3: 'w',
            },
            'output': {
                0: 'n',
                2: 'h',
                3: 'w',
            },
        }
    else:
        dynamic_axes = None
    if mode == 'nearest':
        # nearest interpolation does not accept align_corners
        align_corners = None
    if output_size is None:
        resize = nn.Upsample(
            scale_factor=scale_factor, mode=mode, align_corners=align_corners)
    else:
        resize = nn.Upsample(
            size=output_size, mode=mode, align_corners=align_corners)
    expected_result = resize(input).cuda()
    wrapped_model = WrapFunction(resize).eval()
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        backend.run_and_validate(
            wrapped_model, [input],
            'bicubic_interpolate',
            input_names=['input'],
            dynamic_axes=dynamic_axes,
            output_names=['output'],
            save_dir=save_dir,
            expected_result=expected_result)
@pytest.mark.parametrize('backend', [TEST_TENSORRT, TEST_ONNXRT])
@pytest.mark.parametrize('in_channels,out_channels,stride,padding,'
                         'dilation,groups,deform_groups,kernel_size',
                         [(3, 64, 1, 0, 1, 1, 1, 3),
                          (1, 32, 3, 2, 1, 1, 1, 3)])
@pytest.mark.parametrize('bias', [True, False])
def test_modulated_deform_conv(backend,
                               in_channels,
                               out_channels,
                               stride,
                               padding,
                               dilation,
                               groups,
                               deform_groups,
                               kernel_size,
                               bias,
                               input_list=None,
                               save_dir=None):
    """ModulatedDeformConv2d should export and validate on the backend."""
    backend.check_env()
    if input_list is None:
        input = torch.rand(
            1, in_channels, 28, 28, requires_grad=False)  # (n, c, h, w)
    else:
        input = torch.tensor(input_list[0])
    # a plain conv produces 3*k*k channels: 2*k*k offsets + k*k mask logits
    conv_offset = nn.Conv2d(
        in_channels=in_channels,
        out_channels=deform_groups * 3 * kernel_size * kernel_size,
        kernel_size=kernel_size,
        stride=stride,
        padding=padding,
        dilation=dilation,
        bias=True)
    out = conv_offset(input)
    o1, o2, mask = torch.chunk(out, 3, dim=1)
    offset = torch.cat((o1, o2), dim=1)
    mask = torch.sigmoid(mask)
    from mmcv.ops import ModulatedDeformConv2d
    model = ModulatedDeformConv2d(in_channels, out_channels, kernel_size,
                                  stride, padding, dilation, groups,
                                  deform_groups, bias).eval()
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        backend.run_and_validate(
            model, [input, offset, mask],
            'modulated_deform_conv',
            input_names=['input', 'offset', 'mask'],
            output_names=['output'],
            tolerate_small_mismatch=True,
            save_dir=save_dir)
@pytest.mark.parametrize('in_channels,out_channels,stride,padding,'
                         'dilation,groups,deform_groups,kernel_size',
                         [(1, 32, 3, 2, 1, 1, 1, 3)])
def test_deform_conv(in_channels, out_channels, stride, padding, dilation,
                     groups, deform_groups, kernel_size):
    """DeformConv2d should export as a MMCVDeformConv2d node in the
    'mmdeploy' ONNX domain under the tensorrt rewriter."""
    inputs = torch.rand(
        1, in_channels, 28, 28, requires_grad=False)  # (n, c, h, w)
    # offsets need 2*k*k channels per deform group (x and y per tap)
    conv_offset = nn.Conv2d(
        in_channels=in_channels,
        out_channels=deform_groups * 2 * kernel_size * kernel_size,
        kernel_size=kernel_size,
        stride=stride,
        padding=padding,
        dilation=dilation,
        bias=True)
    offsets = conv_offset(inputs)
    from mmcv.ops import DeformConv2d
    model = DeformConv2d(in_channels, out_channels, kernel_size, stride,
                         padding, dilation, groups, deform_groups).eval()
    onnx_file = tempfile.NamedTemporaryFile(suffix='.onnx').name
    with RewriterContext(cfg={}, backend='tensorrt', opset=11):
        with torch.no_grad():
            torch.onnx.export(
                model, (inputs, offsets),
                onnx_file,
                export_params=True,
                keep_initializers_as_inputs=True,
                opset_version=11)
    model = onnx.load(onnx_file)
    node = list(model.graph.node)[0]
    assert node.domain == 'mmdeploy'
    assert node.op_type == 'MMCVDeformConv2d'
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
@pytest.mark.parametrize('dynamic_export', [True, False])
@pytest.mark.parametrize('fp16_mode', [True, False])
@pytest.mark.parametrize('n, c, h, w', [(2, 3, 10, 10)])
def test_instance_norm(backend,
                       dynamic_export,
                       fp16_mode,
                       n,
                       c,
                       h,
                       w,
                       input_list=None,
                       save_dir=None):
    """InstanceNorm2d should export and validate on TensorRT.

    cudnn is disabled during export/validation; the previous flag is
    restored afterwards even on failure.
    """
    backend.check_env()
    if input_list is None:
        input = torch.randn(n, c, h, w)
    else:
        # fix: the original had no else branch, so a caller-supplied
        # input_list raised NameError on `input` below
        input = torch.tensor(input_list[0])
    if dynamic_export:
        dynamic_axes = {
            'input': {
                0: 'n',
                2: 'h',
                3: 'w',
            },
            'output': {
                0: 'n',
                2: 'h',
                3: 'w',
            },
        }
    else:
        dynamic_axes = None
    wrapped_model = nn.InstanceNorm2d(c, affine=True).eval().cuda()
    cudnn_enable = torch.backends.cudnn.enabled
    torch.backends.cudnn.enabled = False
    try:
        with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
            backend.run_and_validate(
                wrapped_model, [input],
                'instance_norm',
                input_names=['input'],
                dynamic_axes=dynamic_axes,
                output_names=['output'],
                save_dir=save_dir)
    finally:
        # fix: restore the global cudnn flag even when validation raises,
        # so later tests are not affected
        torch.backends.cudnn.enabled = cudnn_enable
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
@pytest.mark.parametrize('num_classes,pre_topk,after_topk,iou_threshold,'
                         'score_threshold,background_label_id',
                         [(5, 6, 3, 0.7, 0.1, -1)])
def test_batched_nms(backend,
                     num_classes,
                     pre_topk,
                     after_topk,
                     iou_threshold,
                     score_threshold,
                     background_label_id,
                     input_list=None,
                     save_dir=None):
    """TRTBatchedNMSop output should match the reference _multiclass_nms."""
    backend.check_env()
    if input_list is None:
        # (1, 10, 4) candidate boxes and (1, 10, 5) per-class scores
        nms_boxes = torch.tensor([[[291.1746, 316.2263, 343.5029, 347.7312],
                                   [288.4846, 315.0447, 343.7267, 346.5630],
                                   [288.5307, 318.1989, 341.6425, 349.7222],
                                   [918.9102, 83.7463, 933.3920, 164.9041],
                                   [895.5786, 78.2361, 907.8049, 172.0883],
                                   [292.5816, 316.5563, 340.3462, 352.9989],
                                   [609.4592, 83.5447, 631.2532, 144.0749],
                                   [917.7308, 85.5870, 933.2839, 168.4530],
                                   [895.5138, 79.3596, 908.2865, 171.0418],
                                   [291.4747, 318.6987, 347.1208, 349.5754]]])
        scores = torch.tensor([[[0.9577, 0.9745, 0.3030, 0.6589, 0.2742],
                                [0.1618, 0.7963, 0.5124, 0.6964, 0.6850],
                                [0.8425, 0.4843, 0.9489, 0.8068, 0.7340],
                                [0.7337, 0.4340, 0.9923, 0.0704, 0.4506],
                                [0.3090, 0.5606, 0.6939, 0.3764, 0.6920],
                                [0.0044, 0.7986, 0.2221, 0.2782, 0.4378],
                                [0.7293, 0.2735, 0.8381, 0.0264, 0.6278],
                                [0.7144, 0.1066, 0.4125, 0.4041, 0.8819],
                                [0.4963, 0.7891, 0.6908, 0.1499, 0.5584],
                                [0.4385, 0.6035, 0.0508, 0.0662, 0.5938]]])
    else:
        nms_boxes = torch.tensor(input_list[0], dtype=torch.float32)
        scores = torch.tensor(input_list[1], dtype=torch.float32)
    from mmdeploy.mmcv.ops.nms import _multiclass_nms
    # reference is computed with one extra kept box, then the last one is
    # sliced off below to align with the plugin's output
    expected_result = _multiclass_nms(
        nms_boxes,
        scores,
        iou_threshold=iou_threshold,
        score_threshold=score_threshold,
        pre_top_k=pre_topk + 1,
        keep_top_k=after_topk + 1)
    expected_result = (expected_result[0][:,
                                          0:-1, :], expected_result[1][:,
                                                                       0:-1])
    boxes = nms_boxes.unsqueeze(2).tile(num_classes, 1)
    from mmdeploy.mmcv.ops.nms import TRTBatchedNMSop
    batched_nms = TRTBatchedNMSop.apply

    def wrapped_function(boxes, scores):
        return batched_nms(boxes, scores, num_classes, pre_topk, after_topk,
                           iou_threshold, score_threshold, background_label_id)

    wrapped_model = WrapFunction(wrapped_function)
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        backend.run_and_validate(
            wrapped_model, [boxes, scores],
            'batched_nms',
            input_names=['boxes', 'scores'],
            output_names=['batched_nms_bboxes', 'inds'],
            expected_result=expected_result,
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
@pytest.mark.parametrize('num_classes,pre_topk,after_topk,iou_threshold,'
                         'score_threshold,background_label_id',
                         [(5, 6, 3, 0.7, 0.1, -1)])
def test_batched_rotated_nms(backend,
                             num_classes,
                             pre_topk,
                             after_topk,
                             iou_threshold,
                             score_threshold,
                             background_label_id,
                             input_list=None,
                             save_dir=None):
    """TRTBatchedRotatedNMSop output should match _multiclass_nms_rotated."""
    backend.check_env()
    # fix: typo in the skip reason ('mmrorate' -> 'mmrotate')
    pytest.importorskip('mmrotate', reason='mmrotate is not installed.')
    if input_list is None:
        # (1, 10, 5) rotated boxes: (cx?, cy?, w?, h?, angle) per row —
        # NOTE(review): layout assumed from the trailing angle column;
        # confirm against mmrotate's box convention
        nms_boxes = torch.tensor(
            [[[291.1746, 316.2263, 343.5029, 347.7312, 1.],
              [288.4846, 315.0447, 343.7267, 346.5630, 2.],
              [288.5307, 318.1989, 341.6425, 349.7222, 3.],
              [918.9102, 83.7463, 933.3920, 164.9041, 4.],
              [895.5786, 78.2361, 907.8049, 172.0883, 5.],
              [292.5816, 316.5563, 340.3462, 352.9989, 6.],
              [609.4592, 83.5447, 631.2532, 144.0749, 7.],
              [917.7308, 85.5870, 933.2839, 168.4530, 8.],
              [895.5138, 79.3596, 908.2865, 171.0418, 9.],
              [291.4747, 318.6987, 347.1208, 349.5754, 10.]]])
        scores = torch.tensor([[[0.9577, 0.9745, 0.3030, 0.6589, 0.2742],
                                [0.1618, 0.7963, 0.5124, 0.6964, 0.6850],
                                [0.8425, 0.4843, 0.9489, 0.8068, 0.7340],
                                [0.7337, 0.4340, 0.9923, 0.0704, 0.4506],
                                [0.3090, 0.5606, 0.6939, 0.3764, 0.6920],
                                [0.0044, 0.7986, 0.2221, 0.2782, 0.4378],
                                [0.7293, 0.2735, 0.8381, 0.0264, 0.6278],
                                [0.7144, 0.1066, 0.4125, 0.4041, 0.8819],
                                [0.4963, 0.7891, 0.6908, 0.1499, 0.5584],
                                [0.4385, 0.6035, 0.0508, 0.0662, 0.5938]]])
    else:
        nms_boxes = torch.tensor(input_list[0], dtype=torch.float32)
        scores = torch.tensor(input_list[1], dtype=torch.float32)
    from mmdeploy.mmcv.ops.nms_rotated import _multiclass_nms_rotated
    # reference keeps one extra box; the last one is sliced off below to
    # align with the plugin's output
    expected_result = _multiclass_nms_rotated(
        nms_boxes,
        scores,
        iou_threshold=iou_threshold,
        score_threshold=score_threshold,
        pre_top_k=pre_topk + 1,
        keep_top_k=after_topk + 1)
    expected_result = (expected_result[0][:,
                                          0:-1, :], expected_result[1][:,
                                                                       0:-1])
    boxes = nms_boxes.unsqueeze(2).tile(num_classes, 1)
    from mmdeploy.mmcv.ops.nms_rotated import TRTBatchedRotatedNMSop
    batched_rotated_nms = TRTBatchedRotatedNMSop.apply

    def wrapped_function(boxes, scores):
        return batched_rotated_nms(boxes, scores, num_classes, pre_topk,
                                   after_topk, iou_threshold, score_threshold,
                                   background_label_id)

    wrapped_model = WrapFunction(wrapped_function)
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        backend.run_and_validate(
            wrapped_model, [boxes, scores],
            'batched_rotated_nms',
            input_names=['boxes', 'scores'],
            output_names=['batched_rotated_nms_bboxes', 'inds'],
            expected_result=expected_result,
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
@pytest.mark.parametrize(
    'out_size, pool_mode, sampling_ratio,roi_scale_factor,'
    ' finest_scale,featmap_strides, aligned',
    [(tuple([2, 2]), 0, 2, 1.0, 2, list([2.0, 4.0]), 1),
     (tuple([2, 2]), 1, 2, 1.0, 2, list([2.0, 4.0]), 1)])
def test_multi_level_roi_align(backend,
                               out_size,
                               pool_mode,
                               sampling_ratio,
                               roi_scale_factor,
                               finest_scale,
                               featmap_strides,
                               aligned,
                               input_list=None,
                               save_dir=None):
    """A hand-built MMCVMultiLevelRoiAlign ONNX graph should validate against
    precomputed expected outputs on the backend."""
    backend.check_env()
    if input_list is None:
        # two 1x3x4x4 feature maps, one per featmap stride
        input = [
            torch.tensor([[[[0.3014, 0.7334, 0.6502, 0.1689],
                            [0.3031, 0.3735, 0.6032, 0.1644],
                            [0.0393, 0.4415, 0.3858, 0.2657],
                            [0.5766, 0.0211, 0.6384, 0.0016]],
                           [[0.0811, 0.6255, 0.0247, 0.3471],
                            [0.1390, 0.9298, 0.6178, 0.6636],
                            [0.2243, 0.2024, 0.2366, 0.3660],
                            [0.1050, 0.2301, 0.7489, 0.7506]],
                           [[0.3868, 0.1706, 0.2390, 0.8494],
                            [0.2643, 0.9347, 0.0412, 0.5790],
                            [0.6202, 0.0682, 0.0390, 0.5296],
                            [0.5383, 0.1221, 0.6344, 0.1514]]]]),
            torch.tensor([[[[0.1939, 0.9983, 0.4031, 0.2712],
                            [0.7929, 0.1504, 0.0946, 0.5030],
                            [0.1421, 0.7908, 0.9595, 0.4198],
                            [0.6880, 0.4722, 0.9896, 0.2266]],
                           [[0.0778, 0.4232, 0.0736, 0.0168],
                            [0.2887, 0.8461, 0.1140, 0.9582],
                            [0.5169, 0.4924, 0.8275, 0.5530],
                            [0.8961, 0.7466, 0.5976, 0.3760]],
                           [[0.1542, 0.5028, 0.8412, 0.6617],
                            [0.3751, 0.2798, 0.3835, 0.8640],
                            [0.5821, 0.6588, 0.1324, 0.7619],
                            [0.9178, 0.7282, 0.0291, 0.3028]]]])
        ]
        # one RoI: (batch_index, x1, y1, x2, y2)
        rois = torch.tensor([[0., 0., 0., 4., 4.]])
        # expected values differ between pooling modes (1 = avg? 0 = max? —
        # NOTE(review): mode semantics assumed from the plugin; confirm)
        if pool_mode == 1:
            expected_result = torch.tensor([[[[0.1939, 0.3950],
                                              [0.3437, 0.4543]],
                                             [[0.0778, 0.1641],
                                              [0.1305, 0.2301]],
                                             [[0.1542, 0.2413],
                                              [0.2094, 0.2688]]]])
        else:
            expected_result = torch.tensor([[[[0.1939, 0.4956],
                                              [0.4185, 0.5167]],
                                             [[0.0778, 0.2073],
                                              [0.1569, 0.3162]],
                                             [[0.1542, 0.2849],
                                              [0.2370, 0.3053]]]])
    else:
        input = input_list[0]
        rois = input_list[1]
        expected_result = input_list[2]
    # graph inputs: 'rois' first, then one 'input_<i>' per feature level
    input_name = [('input_' + str(i)) for i in range(len(featmap_strides))]
    input_name.insert(0, 'rois')
    inputs = [
        onnx.helper.make_tensor_value_info(
            input_name[i + 1], onnx.TensorProto.FLOAT, shape=input[i].shape)
        for i in range(len(input_name) - 1)
    ]
    inputs.append(
        onnx.helper.make_tensor_value_info(
            'rois', onnx.TensorProto.FLOAT, shape=rois.shape))
    outputs = [
        onnx.helper.make_tensor_value_info(
            'bbox_feats', onnx.TensorProto.FLOAT, shape=expected_result.shape)
    ]
    # build the custom node manually — there is no PyTorch export path for it
    node = onnx.helper.make_node(
        'MMCVMultiLevelRoiAlign',
        input_name, ['bbox_feats'],
        'MMCVMultiLevelRoiAlign_0',
        None,
        'mmdeploy',
        pool_mode=pool_mode,
        aligned=aligned,
        featmap_strides=featmap_strides,
        finest_scale=finest_scale,
        output_height=out_size[0],
        output_width=out_size[1],
        roi_scale_factor=roi_scale_factor,
        sampling_ratio=sampling_ratio)
    graph = onnx.helper.make_graph([node], 'torch-jit-export', inputs, outputs)
    onnx_model = onnx.helper.make_model(
        graph, producer_name='pytorch', producer_version='1.8')
    onnx_model.opset_import[0].version = 11
    onnx_model.opset_import.append(
        onnx.onnx_ml_pb2.OperatorSetIdProto(domain='mmdeploy', version=1))
    backend.run_and_validate(
        onnx_model, [rois, *input],
        'multi_level_roi_align',
        input_names=input_name,
        output_names=['bbox_feats'],
        expected_result=expected_result,
        save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_NCNN])
@pytest.mark.parametrize('k', [1, 3, 5])
@pytest.mark.parametrize('dim', [1, 2, 3])
@pytest.mark.parametrize('largest', [True, False])
@pytest.mark.parametrize('sorted', [True, False])
def test_topk(backend,
              k,
              dim,
              largest,
              sorted,
              input_list=None,
              save_dir=None):
    """torch.topk should export and validate on ncnn for all k/dim combos."""
    backend.check_env()
    if input_list is None:
        input = torch.rand(1, 8, 12, 17)
    else:
        input = input_list[0]
        assert input.shape[0] == 1, (f'ncnn batch must be 1, \
            but got {input.shape[0]}')

    def topk_function(inputs):
        return torch.Tensor.topk(inputs, k, dim, largest, sorted)

    wrapped_model = WrapFunction(topk_function)
    # when the 'sorted' attribute is False, pytorch will return
    # a hard to expect result, which only features that the topk
    # number is right. So the Topk unittest only check whether the
    # topk elements are right, all the possible order will be accepted.
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        if not sorted:
            backend.run_and_validate(
                wrapped_model, [input.float()],
                'topk' + f'_no_sorted_dim_{dim}',
                input_names=['inputs'],
                output_names=['data', 'index'],
                save_dir=save_dir)
        else:
            backend.run_and_validate(
                wrapped_model, [input.float()],
                'topk',
                input_names=['inputs'],
                output_names=['data', 'index'],
                save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_NCNN])
@pytest.mark.parametrize('dim, n, c, h, w', [(1, 1, 1, 1, 8), (2, 1, 1, 5, 7),
                                             (3, 1, 3, 10, 15)])
def test_shape(backend,
               dim,
               n,
               c,
               h,
               w,
               input_names=['input'],
               output_names=['output'],
               tolerate_small_mismatch=False,
               input_list=None,
               save_dir=None):
    """A hand-built ONNX Shape node should convert to ncnn and return the
    input's shape."""
    backend.check_env()
    # keep only the trailing (dim+1) axes of (n, c, h, w)
    orig_shape = (n, c, h, w)[-dim - 1:]
    if input_list is None:
        input = torch.rand(orig_shape)
    else:
        input = input_list[0]
        assert input.dim() == dim + 1, 'input.dim() must equal to dim + 1'
        assert tuple(input.shape) == orig_shape, 'input.shape must the \
            same as orig_shape'
        assert input.shape[0] == 1, (f'ncnn batch must be 1, \
            but got {input.shape[0]}')
    shape_node = make_node('Shape', input_names, output_names)
    assert len(input_names) == 1, 'length of input_names must be 1'
    assert len(output_names) == 1, 'length of output_names must be 1'
    shape_graph = make_graph([shape_node], 'shape_graph', [
        make_tensor_value_info(input_names[0], onnx.TensorProto.FLOAT,
                               orig_shape)
    ], [
        make_tensor_value_info(output_names[0], onnx.TensorProto.FLOAT,
                               (dim + 1, ))
    ])
    shape_model = make_model(shape_graph)
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        ncnn_model = backend.onnx2ncnn(shape_model, 'shape', output_names,
                                       save_dir)
    # ncnn mat has implicit batch for mat, the ncnn_output is a mat,
    # so the ncnn_outputs has 2 dimensions, not 1.
    model_outputs = [torch.tensor(orig_shape).unsqueeze(0).float()]
    ncnn_outputs = ncnn_model(dict(zip(input_names, [input])))
    ncnn_outputs = [ncnn_outputs[name] for name in output_names]
    assert_allclose(model_outputs, ncnn_outputs, tolerate_small_mismatch)
@pytest.mark.parametrize('backend', [TEST_NCNN])
@pytest.mark.parametrize('dim, n, c, h, w', [(1, 1, 1, 1, 8), (2, 1, 1, 5, 7),
                                             (3, 1, 3, 10, 15)])
@pytest.mark.parametrize('val', [0., 1., -3, 4.25])
def test_constantofshape(backend,
                         dim,
                         n,
                         c,
                         h,
                         w,
                         val,
                         input_names=['input'],
                         output_names=['output'],
                         tolerate_small_mismatch=False,
                         input_list=None,
                         save_dir=None):
    """Check ncnn conversion of an ONNX `ConstantOfShape` node.

    The input tensor carries the target output shape; the result must be
    a tensor of that shape filled with `val`.
    """
    backend.check_env()
    if input_list is None:
        # Shape tensor with an implicit leading batch of 1, e.g.
        # [[1, 5, 7]] for dim == 2; its rank is always 2.
        input = torch.tensor((n, c, h, w)[-dim - 1:]).unsqueeze(0)
    else:
        input = input_list[0]
        # FIX: these checks describe a caller-supplied tensor; for the
        # generated shape tensor above they would fail whenever dim >= 2
        # (its rank is always 2), so they belong in this branch only.
        assert input.dim() == dim + 1, 'input.dim() must equal to dim + 1'
        assert tuple(input.shape) == (n, c, h,
                                      w)[-dim - 1:], 'input.shape must the \
same as orig_shape'
    assert input.shape[0] == 1, (f'ncnn input batch must be 1, \
got {input.shape[0]}')
    assert input[0][0] == 1, (f'ncnn output mat batch must be 1, \
got {input[0][0]}')
    constantofshape_node = make_node(
        'ConstantOfShape', input_names, output_names, value=float(val))
    assert len(input_names) == 1, 'length of input_names must be 1'
    assert len(output_names) == 1, 'length of output_names must be 1'
    constantofshape_graph = make_graph(
        [constantofshape_node], 'constantofshape_graph', [
            make_tensor_value_info(input_names[0], onnx.TensorProto.FLOAT,
                                   input.shape)
        ], [
            make_tensor_value_info(output_names[0], onnx.TensorProto.FLOAT,
                                   torch.Size(input[0]))
        ])
    constantofshape_model = make_model(constantofshape_graph)
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        ncnn_model = backend.onnx2ncnn(constantofshape_model,
                                       'constantofshape', output_names,
                                       save_dir)
    # ncnn mat has implicit batch for mat, the ncnn_output is a mat,
    # so the ncnn_outputs has 2 dimensions, not 1.
    model_outputs = [torch.fill_(torch.rand(tuple(input[0])), val)]
    ncnn_outputs = ncnn_model(dict(zip(input_names, [input.float()])))
    ncnn_outputs = [ncnn_outputs[name] for name in output_names]
    assert_allclose(model_outputs, ncnn_outputs, tolerate_small_mismatch)
@pytest.mark.parametrize('backend', [TEST_NCNN])
@pytest.mark.parametrize('axis, data_dims, indice_dims', [(0, 1, 1), (0, 2, 1),
                                                          (1, 2, 1), (0, 3, 1),
                                                          (1, 3, 1),
                                                          (2, 3, 1)])
def test_gather(backend,
                axis,
                data_dims,
                indice_dims,
                input_names=['input', 'indices'],
                output_names=['output'],
                tolerate_small_mismatch=False,
                input_list=None,
                save_dir=None):
    """Compare an ONNX `Gather` node run by ncnn against onnxruntime.

    A one-node Gather graph is built by hand, converted to ncnn, and its
    output is validated against an onnxruntime run of the same model.
    """
    backend.check_env()
    if input_list is None:
        # the real data dims is data_dims + 1
        data = torch.rand((8, 12, 17)[-data_dims:]).unsqueeze(0)
        indice = torch.randint(0, 8, (3, 4, 5)[-indice_dims:]).unsqueeze(0)
    else:
        data = input_list[0]
        indice = input_list[1]
    assert data.shape[0] == 1, ('ncnn batch must be 1,'
                                f'but got {data.shape[0]}')
    assert indice.shape[0] == 1, ('ncnn batch must be 1,'
                                  f'but got {indice.shape[0]}')
    # `axis + 1` skips over ncnn's implicit batch dimension.
    gather_node = make_node('Gather', input_names, output_names, axis=axis + 1)
    gather_graph = make_graph([gather_node], 'gather_graph', [
        make_tensor_value_info(input_names[0], onnx.TensorProto.FLOAT, None),
        make_tensor_value_info(input_names[1], onnx.TensorProto.INT64, None)
    ], [make_tensor_value_info(output_names[0], onnx.TensorProto.FLOAT, None)])
    opset_imports = [onnx.helper.make_operatorsetid('', 11)]
    gather_model = make_model(gather_graph, opset_imports=opset_imports)
    gather_model.ir_version = 7
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        ncnn_model = backend.onnx2ncnn(gather_model, 'gather', output_names,
                                       save_dir)
    # ncnn mat has implicit batch for mat, the ncnn_output is a mat,
    # so the ncnn_outputs has 2 dimensions, not 1.
    import importlib
    assert importlib.util.find_spec('onnxruntime') is not None, 'onnxruntime \
not installed.'
    from mmdeploy.backend.onnxruntime import ORTWrapper
    ort_model = ORTWrapper(
        gather_model.SerializeToString(),
        device='cpu',
        output_names=output_names)
    # NOTE(review): onnxruntime is fed `indice[0]` (batch axis stripped)
    # while ncnn gets the batched `indice` — presumably to line up with
    # ncnn's implicit batch; confirm against the wrapper semantics.
    model_outputs = ort_model(dict(zip(input_names, [data, indice[0]])))
    model_outputs = ort_model.output_to_list(model_outputs)
    ncnn_outputs = ncnn_model(
        dict(zip(input_names, [data.float(), indice.float()])))
    ncnn_outputs = [ncnn_outputs[name] for name in output_names]
    assert_allclose(model_outputs, ncnn_outputs, tolerate_small_mismatch)
@pytest.mark.parametrize('backend', [TEST_NCNN])
@pytest.mark.parametrize('dim', [1, 2, 3])
def test_tensorslice(backend, dim, input_list=None, save_dir=None):
    """Export a strided-slice function to ncnn and validate its output."""
    backend.check_env()
    input = (torch.rand((8, 12, 17)[-dim:]).unsqueeze(0)
             if input_list is None else input_list[0])
    assert input.dim() == dim + 1, f'input.dim() must equal to \
dim + 1, expected: {dim + 1}, got: {input.dim()}'
    assert input.shape[0] == 1, (f'ncnn batch must be 1, \
but got {input.shape[0]}')
    def tensorslice_function(inputs):
        # One slicing recipe per spatial rank; `dim` picks the traced one.
        recipes = {
            1: (slice(None), slice(2, 17, 7)),
            2: (slice(None), slice(3, 12, 4), slice(2, 15, 3)),
            3: (slice(None), slice(0, 8, 2), slice(2, 12, 4),
                slice(2, 17, 7)),
        }
        return inputs[recipes[dim]] if dim in recipes else None
    wrapped_model = WrapFunction(tensorslice_function)
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        backend.run_and_validate(
            wrapped_model, [input.float()],
            'tensorslice',
            input_names=['inputs'],
            output_names=['outputs'],
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_NCNN])
@pytest.mark.parametrize('input_dim, output_dim', [(1, 1), (1, 2), (1, 3),
                                                   (2, 2), (2, 3), (3, 3)])
def test_expand(backend,
                input_dim,
                output_dim,
                input_list=None,
                save_dir=None):
    """Export `Tensor.expand_as` to ncnn and validate the result."""
    backend.check_env()
    if input_list is not None:
        input, target = input_list[0], input_list[1]
    else:
        input = torch.rand((1, 12, 1)[-input_dim:]).unsqueeze(0)
        target = torch.rand((8, 12, 17)[-output_dim:]).unsqueeze(0)
    # Both tensors must carry ncnn's implicit batch dimension of 1.
    for tensor in (input, target):
        assert tensor.shape[0] == 1, (f'ncnn batch must be 1, \
but not {tensor.shape[0]}')
    def expand_function(input, target):
        return input.expand_as(target)
    wrapped_model = WrapFunction(expand_function)
    with RewriterContext(cfg={}, backend=backend.backend_name, opset=11):
        backend.run_and_validate(
            wrapped_model, [input.float(), target.float()],
            'expand',
            input_names=['input', 'shape'],
            output_names=['output'],
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_ONNXRT])
@pytest.mark.parametrize('iou_threshold', [0.1, 0.3])
@pytest.mark.parametrize('score_threshold', [0., 0.1])
def test_nms_rotated(backend, iou_threshold, score_threshold, save_dir=None):
    """Export the custom ONNXNMSRotatedOp and validate it on onnxruntime."""
    backend.check_env()
    # Two batches of three rotated boxes: (cx, cy, w, h, angle).
    boxes = torch.tensor(
        [[[60, 75, 20, 50, 0], [65, 80, 10, 40, 0], [30, 30, 40, 40, 0]],
         [[60, 75, 20, 50, 0], [65, 80, 10, 40, 0], [30, 30, 40, 40, 0]]],
        dtype=torch.float32)
    # NOTE(review): there are 4 score rows per batch for 3 boxes —
    # presumably the layout is (batch, num_classes, num_boxes); confirm
    # against ONNXNMSRotatedOp's expected input layout.
    scores = torch.tensor(
        [[[0.5, 0.1, 0.1], [0.1, 0.6, 0.1], [0.1, 0.1, 0.7], [0.1, 0.1, 0.1]],
         [[0.1, 0.1, 0.1], [0.7, 0.1, 0.1], [0.1, 0.6, 0.1], [0.1, 0.1, 0.5]]],
        dtype=torch.float32)
    from mmdeploy.mmcv.ops import ONNXNMSRotatedOp
    def wrapped_function(torch_boxes, torch_scores):
        # Thresholds are captured from the closure, not traced as inputs.
        return ONNXNMSRotatedOp.apply(torch_boxes, torch_scores, iou_threshold,
                                      score_threshold)
    wrapped_model = WrapFunction(wrapped_function).eval()
    with RewriterContext(
            Config({'backend_config': {
                'type': backend.backend_name
            }}),
            backend=backend.backend_name,
            opset=11):
        backend.run_and_validate(
            wrapped_model, [boxes, scores],
            'nms_rotated',
            input_names=['boxes', 'scores'],
            output_names=['keep_inds'],
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_ONNXRT])
@pytest.mark.parametrize('pool_h,pool_w,spatial_scale,sampling_ratio',
                         [(2, 2, 1.0, 2), (4, 4, 2.0, 4)])
def test_roi_align_rotated(backend,
                           pool_h,
                           pool_w,
                           spatial_scale,
                           sampling_ratio,
                           input_list=None,
                           save_dir=None):
    """Export mmcv's `roi_align_rotated` and validate it on onnxruntime."""
    backend.check_env()
    if input_list is None:
        # input = torch.rand(1, 1, 16, 16, dtype=torch.float32)
        # Tiny fixed 1x1x2x2 feature map plus one rotated RoI:
        # (batch_idx, cx, cy, w, h, angle).
        input = torch.tensor([[[[1., 2.], [3., 4.]]]], dtype=torch.float32)
        single_roi = torch.tensor([[0., 0.5, 0.5, 1., 1., 0]],
                                  dtype=torch.float32)
    else:
        input = torch.tensor(input_list[0], dtype=torch.float32)
        single_roi = torch.tensor(input_list[1], dtype=torch.float32)
    from mmcv.ops import roi_align_rotated
    def wrapped_function(torch_input, torch_rois):
        # Trailing flags: aligned=True, clockwise=False.
        return roi_align_rotated(torch_input, torch_rois, (pool_w, pool_h),
                                 spatial_scale, sampling_ratio, True, False)
    wrapped_model = WrapFunction(wrapped_function).eval()
    with RewriterContext(
            Config({'backend_config': {
                'type': backend.backend_name
            }}),
            backend=backend.backend_name,
            opset=11):
        backend.run_and_validate(
            wrapped_model, [input, single_roi],
            'roi_align_rotated',
            input_names=['input', 'rois'],
            output_names=['roi_feat'],
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
@pytest.mark.parametrize(
    'out_size, clockwise, sampling_ratio, roi_scale_factor,'
    ' finest_scale, featmap_strides, aligned',
    [(tuple([2, 2]), False, 2, 1.0, 2, list([1.0]), 1)])
def test_multi_level_rotated_roi_align(backend,
                                       out_size,
                                       clockwise,
                                       sampling_ratio,
                                       roi_scale_factor,
                                       finest_scale,
                                       featmap_strides,
                                       aligned,
                                       input_list=None,
                                       save_dir=None):
    """Validate the custom TensorRT `MMCVMultiLevelRotatedRoiAlign` plugin.

    The ONNX model is assembled manually (one custom node in the
    `mmdeploy` domain) and compared against a precomputed expected
    result, since the op has no pure-torch reference here.
    """
    backend.check_env()
    if input_list is None:
        import numpy as np
        # One 4x4 single-channel feature level and a single rotated RoI:
        # (batch_idx, cx, cy, w, h, angle).
        input = [
            torch.tensor([[[[1., 2., 5., 6.], [3., 4., 7., 8.],
                            [9., 10., 13., 14.], [11., 12., 15., 16.]]]])
        ]
        rois = torch.tensor([[0., 1.5, 1.5, 3., 3., np.pi / 2]])
        # Precomputed reference output for the fixed inputs above.
        expected_result = torch.tensor([[[[7.5625, 1.9375], [10.375, 4.75]]]])
    else:
        input = input_list[0]
        rois = input_list[1]
        expected_result = input_list[2]
    # Node inputs are [rois, input_0, input_1, ...] in that order.
    input_name = [('input_' + str(i)) for i in range(len(featmap_strides))]
    input_name.insert(0, 'rois')
    inputs = [
        onnx.helper.make_tensor_value_info(
            input_name[i + 1], onnx.TensorProto.FLOAT, shape=input[i].shape)
        for i in range(len(input_name) - 1)
    ]
    inputs.append(
        onnx.helper.make_tensor_value_info(
            'rois', onnx.TensorProto.FLOAT, shape=rois.shape))
    outputs = [
        onnx.helper.make_tensor_value_info(
            'bbox_feats', onnx.TensorProto.FLOAT, shape=expected_result.shape)
    ]
    node = onnx.helper.make_node(
        'MMCVMultiLevelRotatedRoiAlign',
        input_name, ['bbox_feats'],
        'MMCVMultiLevelRotatedRoiAlign_0',
        None,
        'mmdeploy',
        featmap_strides=featmap_strides,
        finest_scale=finest_scale,
        output_height=out_size[0],
        output_width=out_size[1],
        clockwise=clockwise,
        roi_scale_factor=roi_scale_factor,
        sampling_ratio=sampling_ratio,
        aligned=aligned)
    graph = onnx.helper.make_graph([node], 'torch-jit-export', inputs, outputs)
    onnx_model = onnx.helper.make_model(
        graph, producer_name='pytorch', producer_version='1.8')
    # Declare both the default opset and the custom `mmdeploy` domain.
    onnx_model.opset_import[0].version = 11
    onnx_model.opset_import.append(
        onnx.onnx_ml_pb2.OperatorSetIdProto(domain='mmdeploy', version=1))
    backend.run_and_validate(
        onnx_model, [rois, *input],
        'multi_level_rotated_roi_align',
        input_names=input_name,
        output_names=['bbox_feats'],
        expected_result=expected_result,
        save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
@pytest.mark.parametrize('strides', [(4, 4)])
def test_trt_grid_priors(backend, strides, input_list=None, save_dir=None):
    """Validate the custom `mmdeploy::GridPriorsTRT` TensorRT op.

    A small autograd Function exports the op via a hand-written ONNX
    symbolic; the engine output is compared against a precomputed
    anchor grid for a 2x2 feature map.
    """
    backend.check_env()
    if input_list is None:
        input = torch.rand(1, 3, 2, 2)
        # Three base anchors (x1, y1, x2, y2) ...
        base_anchors = torch.tensor([[-22.6274, -11.3137, 22.6274, 11.3137],
                                     [-16.0000, -16.0000, 16.0000, 16.0000],
                                     [-11.3137, -22.6274, 11.3137, 22.6274]])
        # ... tiled over the 2x2 grid with stride 4: 12 anchors total.
        expected_result = torch.tensor([[-22.6274, -11.3137, 22.6274, 11.3137],
                                        [-16.0000, -16.0000, 16.0000, 16.0000],
                                        [-11.3137, -22.6274, 11.3137, 22.6274],
                                        [-18.6274, -11.3137, 26.6274, 11.3137],
                                        [-12.0000, -16.0000, 20.0000, 16.0000],
                                        [-7.3137, -22.6274, 15.3137, 22.6274],
                                        [-22.6274, -7.3137, 22.6274, 15.3137],
                                        [-16.0000, -12.0000, 16.0000, 20.0000],
                                        [-11.3137, -18.6274, 11.3137, 26.6274],
                                        [-18.6274, -7.3137, 26.6274, 15.3137],
                                        [-12.0000, -12.0000, 20.0000, 20.0000],
                                        [-7.3137, -18.6274, 15.3137, 26.6274]])
    else:
        input = input_list[0]
        base_anchors = input_list[1]
        expected_result = input_list[2]
    input_name = ['input']
    output_name = ['output']
    class GridPriorsTestOps(torch.autograd.Function):
        """Stand-in op: real values come from the TensorRT plugin."""
        @staticmethod
        def forward(ctx, base_anchor, feat_h, feat_w, stride_h: int,
                    stride_w: int):
            # Shape-only placeholder; contents are irrelevant for export.
            a = base_anchor.shape[0]
            return base_anchor.new_empty(feat_h * feat_w * a, 4)
        @staticmethod
        def symbolic(g, base_anchor, feat_h, feat_w, stride_h: int,
                     stride_w: int):
            from torch.onnx import symbolic_helper
            feat_h = symbolic_helper._unsqueeze_helper(g, feat_h, [0])
            feat_w = symbolic_helper._unsqueeze_helper(g, feat_w, [0])
            # Zero-filled tensors whose *shapes* carry feat_h / feat_w
            # into the graph as dynamic values.
            zero_h = g.op(
                'ConstantOfShape',
                feat_h,
                value_t=torch.tensor([0], dtype=torch.long),
            )
            zero_w = g.op(
                'ConstantOfShape',
                feat_w,
                value_t=torch.tensor([0], dtype=torch.long),
            )
            return g.op(
                'mmdeploy::GridPriorsTRT',
                base_anchor,
                zero_h,
                zero_w,
                stride_h_i=stride_h,
                stride_w_i=stride_w)
    class GridPriorsTestModel(torch.nn.Module):
        """Wraps GridPriorsTestOps so it can be exported from a Module."""
        def __init__(self, strides, base_anchors=base_anchors) -> None:
            super().__init__()
            self.strides = strides
            self.base_anchors = base_anchors
        def forward(self, x):
            base_anchors = self.base_anchors
            # Feature-map height/width drive the anchor grid size.
            h, w = x.shape[2:]
            strides = self.strides
            return GridPriorsTestOps.apply(base_anchors, h, w, strides[0],
                                           strides[1])
    model = GridPriorsTestModel(strides=strides)
    backend.run_and_validate(
        model, [input],
        'trt_grid_priors',
        input_names=input_name,
        output_names=output_name,
        expected_result=expected_result,
        dynamic_axes=dict(input={
            2: 'h',
            3: 'w'
        }),
        save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
def test_dot_product_attention(backend, save_dir=None):
    """Export `torch.nn.MultiheadAttention` to TensorRT and validate it."""
    backend.check_env()
    # batch, target length, source length, embedding dim
    batch, num_tgt, num_src, embed_dim = 2, 4, 4, 2
    query = torch.rand(batch, num_tgt, embed_dim).cuda()
    key = torch.rand(batch, num_src, embed_dim).cuda()
    value = torch.rand(batch, num_src, embed_dim).cuda()
    model = torch.nn.MultiheadAttention(embed_dim, 2).cuda()
    cfg = Config({'backend_config': {'type': backend.backend_name}})
    with RewriterContext(cfg, backend=backend.backend_name, opset=11):
        backend.run_and_validate(
            model, [query, key, value],
            'dot_product_attention',
            input_names=['query', 'key', 'value'],
            output_names=['out', 'attn'],
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
def test_gather_topk(backend, save_dir=None):
    """Export mmdet's `gather_topk` helper and validate it on TensorRT."""
    backend.check_env()
    from mmdeploy.codebase.mmdet.deploy.utils import gather_topk
    x = torch.rand(2, 10, 4).cuda()
    class TestModel(torch.nn.Module):
        """Selects the top-4 rows of x by their per-row maximum."""
        def __init__(self) -> None:
            super().__init__()
        def forward(self, x):
            batch_size = x.size(0)
            # Rank rows by their max over the last axis, keep the top 4.
            max_x, _ = x.max(-1)
            _, inds = max_x.topk(4)
            new_x = gather_topk(x, inds=inds, batch_size=batch_size)
            return new_x
    model = TestModel().cuda()
    with RewriterContext(
            Config({'backend_config': {
                'type': backend.backend_name
            }}),
            backend=backend.backend_name,
            opset=11):
        backend.run_and_validate(
            model, [x],
            'gather_topk',
            input_names=['x'],
            output_names=['out'],
            save_dir=save_dir)
@pytest.mark.parametrize('backend', [TEST_ONNXRT])
@pytest.mark.parametrize('pre_top_k', [-1, 1000])
def test_multiclass_nms_rotated_with_keep_top_k(backend, pre_top_k):
    """`keep_top_k` must cap the detections returned by rotated NMS.

    The model is exported with few boxes but dynamic axes, then run with
    many boxes to check the post-processing honours `keep_top_k`.
    """
    backend.check_env()
    from mmdeploy.mmcv.ops import multiclass_nms
    from mmdeploy.utils.test import get_onnx_model
    keep_top_k = 15
    deploy_cfg = Config(
        dict(
            onnx_config=dict(
                output_names=None,
                input_shape=None,
                # Dynamic axes let the exported model accept more boxes
                # at test time than were used at export time.
                dynamic_axes=dict(
                    boxes={
                        0: 'batch_size',
                        1: 'num_boxes'
                    },
                    scores={
                        0: 'batch_size',
                        1: 'num_boxes',
                        2: 'num_classes'
                    },
                ),
            ),
            backend_config=dict(type=backend.backend_name),
            codebase_config=dict(
                type='mmrotate',
                task='RotatedDetection',
                post_processing=dict(
                    score_threshold=0.05,
                    iou_threshold=0.5,
                    pre_top_k=pre_top_k,
                    keep_top_k=keep_top_k,
                ))))
    num_classes = 5
    num_boxes = 2
    batch_size = 1
    # Export with a tiny input; dynamic axes allow larger ones later.
    export_boxes = torch.rand(batch_size, num_boxes, 5)
    export_scores = torch.ones(batch_size, num_boxes, num_classes)
    model_inputs = {'boxes': export_boxes, 'scores': export_scores}
    wrapped_func = WrapFunction(
        multiclass_nms, nms_type='nms_rotated', keep_top_k=keep_top_k)
    onnx_model_path = get_onnx_model(
        wrapped_func, model_inputs=model_inputs, deploy_cfg=deploy_cfg)
    # Run with far more boxes than keep_top_k to exercise the cap.
    num_boxes = 100
    test_boxes = torch.rand(batch_size, num_boxes, 5)
    test_scores = torch.ones(batch_size, num_boxes, num_classes)
    model_inputs = {'boxes': test_boxes, 'scores': test_scores}
    import mmdeploy.backend.onnxruntime as ort_apis
    backend_model = ort_apis.ORTWrapper(onnx_model_path, 'cpu', None)
    output = backend_model.forward(model_inputs)
    output = backend_model.output_to_list(output)
    dets = output[0]
    # Subtract 1 dim since we pad the tensors
    assert dets.shape[1] - 1 < keep_top_k, \
        'multiclass_nms_rotated returned more values than "keep_top_k"\n' \
        f'dets.shape: {dets.shape}\n' \
        f'keep_top_k: {keep_top_k}'
@pytest.mark.parametrize('backend', [TEST_TENSORRT])
def test_multi_scale_deformable_attn(backend, save_dir=None):
    """Export mmcv's MultiScaleDeformableAttn and validate it on TensorRT."""
    backend.check_env()
    from mmcv.ops.multi_scale_deform_attn import \
        MultiScaleDeformableAttnFunction
    # Bs=batch, Nh=heads, Nc=channels per head, Nq=queries,
    # Np=sampling points per level.
    Bs = 2
    Nh = 8
    Nc = 32
    Nq = 32
    Np = 32
    # Two feature levels; Nk is the total number of keys across levels.
    spatial_shapes = [[68, 120], [34, 60]]
    value_spatial_shapes = torch.LongTensor(spatial_shapes).cuda()
    Nl = value_spatial_shapes.shape[0]
    Nk = sum([spatial_shapes[i][0] * spatial_shapes[i][1] for i in range(Nl)])
    value = torch.rand(Bs, Nk, Nh, Nc).cuda()
    # Offset of each level inside the flattened key axis: [0, h0*w0, ...].
    level_start_index = torch.cat((
        value_spatial_shapes.new_zeros((1, )),
        value_spatial_shapes.prod(1).cumsum(0)[:-1].to(torch.int64),
    ))
    sampling_locations = torch.rand(Bs, Nq, Nh, Nl, Np, 2).cuda()
    attention_weights = torch.rand(Bs, Nq, Nh, Nl, Np).cuda()
    class TestModel(torch.nn.Module):
        """Thin Module wrapper so the autograd Function can be exported."""
        def __init__(self) -> None:
            super().__init__()
            self.im2col_step = 32
        def forward(self, value, value_spatial_shapes, level_start_index,
                    sampling_locations, attention_weights):
            new_x = MultiScaleDeformableAttnFunction.apply(
                value, value_spatial_shapes, level_start_index,
                sampling_locations, attention_weights, self.im2col_step)
            return new_x
    model = TestModel().cuda()
    with RewriterContext(
            Config({'backend_config': {
                'type': backend.backend_name
            }}),
            backend=backend.backend_name,
            opset=11):
        backend.run_and_validate(
            model, [
                value, value_spatial_shapes, level_start_index,
                sampling_locations, attention_weights
            ],
            'multi_scale_deformable_attn',
            input_names=[
                'value', 'value_spatial_shapes', 'level_start_index',
                'sampling_locations', 'attention_weights'
            ],
            output_names=['output'],
            save_dir=save_dir)
# Copyright (c) OpenMMLab. All rights reserved.
import os
import subprocess
import tempfile
import onnx
import pytest
import torch
from mmengine import Config
import mmdeploy.apis.tensorrt as trt_apis
from mmdeploy.utils import Backend
from mmdeploy.utils.test import assert_allclose, check_backend
@pytest.mark.skip(reason='This a not test class but a utility class.')
class TestOnnxRTExporter:
    """Helper that exports a torch model to ONNX and checks it with ORT."""

    def __init__(self):
        self.backend_name = 'onnxruntime'
    def check_env(self):
        """Skip the calling test if onnxruntime is unavailable."""
        check_backend(Backend.ONNXRUNTIME, True)
    def run_and_validate(self,
                         model,
                         input_list,
                         model_name='tmp',
                         tolerate_small_mismatch=False,
                         do_constant_folding=True,
                         dynamic_axes=None,
                         output_names=None,
                         input_names=None,
                         expected_result=None,
                         save_dir=None):
        """Export `model` to ONNX, run it with onnxruntime and compare.

        The reference is `model(*input_list)` unless `expected_result`
        is given. `save_dir=None` exports to a throwaway temp file.
        """
        if save_dir is None:
            onnx_file_path = tempfile.NamedTemporaryFile().name
        else:
            onnx_file_path = os.path.join(save_dir, model_name + '.onnx')
        with torch.no_grad():
            torch.onnx.export(
                model,
                tuple(input_list),
                onnx_file_path,
                export_params=True,
                keep_initializers_as_inputs=True,
                input_names=input_names,
                output_names=output_names,
                do_constant_folding=do_constant_folding,
                dynamic_axes=dynamic_axes,
                opset_version=11)
        if expected_result is None:
            with torch.no_grad():
                model_outputs = model(*input_list)
        else:
            model_outputs = expected_result
        # Normalize to a list so single-tensor outputs compare uniformly.
        if isinstance(model_outputs, torch.Tensor):
            model_outputs = [model_outputs]
        else:
            model_outputs = list(model_outputs)
        from mmdeploy.backend.onnxruntime import ORTWrapper
        onnx_model = ORTWrapper(onnx_file_path, 'cpu', output_names)
        with torch.no_grad():
            onnx_outputs = onnx_model.forward(
                dict(zip(input_names, input_list)))
        onnx_outputs = [onnx_outputs[i] for i in output_names]
        assert_allclose(model_outputs, onnx_outputs, tolerate_small_mismatch)
@pytest.mark.skip(reason='This a not test class but a utility class.')
class TestTensorRTExporter:
    """Helper that exports a model to ONNX, builds a TRT engine, runs it."""

    def __init__(self):
        self.backend_name = 'tensorrt'
    def check_env(self):
        """Skip the calling test if TensorRT is unavailable."""
        check_backend(Backend.TENSORRT, True)
    def run_and_validate(self,
                         model,
                         input_list,
                         model_name='tmp',
                         tolerate_small_mismatch=False,
                         do_constant_folding=True,
                         dynamic_axes=None,
                         output_names=None,
                         input_names=None,
                         expected_result=None,
                         save_dir=None):
        """Build a TensorRT engine from `model` and compare its outputs.

        `model` may be a torch Module (exported here) or an already-built
        onnx ModelProto (saved as-is). The reference is the torch forward
        pass unless `expected_result` is given; all inputs are moved to
        CUDA and the engine is built with static shapes taken from them.
        """
        if save_dir is None:
            onnx_file_path = tempfile.NamedTemporaryFile(suffix='.onnx').name
            trt_file_path = tempfile.NamedTemporaryFile(suffix='.engine').name
        else:
            os.makedirs(save_dir, exist_ok=True)
            onnx_file_path = os.path.join(save_dir, model_name + '.onnx')
            trt_file_path = os.path.join(save_dir, model_name + '.engine')
        input_list = [data.cuda() for data in input_list]
        if isinstance(model, onnx.onnx_ml_pb2.ModelProto):
            onnx.save(model, onnx_file_path)
        else:
            with torch.no_grad():
                torch.onnx.export(
                    model,
                    tuple(input_list),
                    onnx_file_path,
                    export_params=True,
                    keep_initializers_as_inputs=True,
                    input_names=input_names,
                    output_names=output_names,
                    do_constant_folding=do_constant_folding,
                    dynamic_axes=dynamic_axes,
                    opset_version=11)
        # Static profile: min == opt == max == the actual input shapes.
        deploy_cfg = Config(
            dict(
                backend_config=dict(
                    type='tensorrt',
                    common_config=dict(
                        fp16_mode=False, max_workspace_size=1 << 20),
                    model_inputs=[
                        dict(
                            input_shapes=dict(
                                zip(input_names, [
                                    dict(
                                        min_shape=data.shape,
                                        opt_shape=data.shape,
                                        max_shape=data.shape)
                                    for data in input_list
                                ])))
                    ])))
        work_dir, filename = os.path.split(trt_file_path)
        trt_apis.onnx2tensorrt(
            work_dir,
            filename,
            0,
            deploy_cfg=deploy_cfg,
            onnx_model=onnx_file_path)
        # A raw ModelProto has no torch forward to compare against, so an
        # expected_result is mandatory in that case.
        if expected_result is None and not isinstance(
                model, onnx.onnx_ml_pb2.ModelProto):
            with torch.no_grad():
                model_outputs = model(*input_list)
        else:
            model_outputs = expected_result
        if isinstance(model_outputs, torch.Tensor):
            model_outputs = [model_outputs.cpu().float()]
        else:
            model_outputs = [data.cpu().float() for data in model_outputs]
        from mmdeploy.backend.tensorrt import TRTWrapper
        trt_model = TRTWrapper(trt_file_path, output_names)
        trt_outputs = trt_model(dict(zip(input_names, input_list)))
        trt_outputs = [trt_outputs[i].float().cpu() for i in output_names]
        assert_allclose(model_outputs, trt_outputs, tolerate_small_mismatch)
@pytest.mark.skip(reason='This a not test class but a utility class.')
class TestNCNNExporter:
    """Helper that exports a model to ncnn (.param/.bin) and validates it."""

    def __init__(self):
        self.backend_name = 'ncnn'
    def check_env(self):
        """Skip the calling test if ncnn is unavailable."""
        check_backend(Backend.NCNN, True)
    def run_and_validate(self,
                         model,
                         inputs_list,
                         model_name='tmp',
                         tolerate_small_mismatch=False,
                         do_constant_folding=True,
                         dynamic_axes=None,
                         output_names=None,
                         input_names=None,
                         save_dir=None):
        """Export `model` via ONNX to ncnn and compare against torch.

        Models named 'topk_no_sorted_<dim>' get special treatment: both
        outputs are stacked and sorted along <dim> before comparison,
        since unsorted topk order is not guaranteed to match.
        """
        if save_dir is None:
            onnx_file_path = tempfile.NamedTemporaryFile().name
            ncnn_param_path = tempfile.NamedTemporaryFile().name
            ncnn_bin_path = tempfile.NamedTemporaryFile().name
        else:
            onnx_file_path = os.path.join(save_dir, model_name + '.onnx')
            ncnn_param_path = os.path.join(save_dir, model_name + '.param')
            ncnn_bin_path = os.path.join(save_dir, model_name + '.bin')
        with torch.no_grad():
            torch.onnx.export(
                model,
                tuple(inputs_list),
                onnx_file_path,
                export_params=True,
                keep_initializers_as_inputs=True,
                input_names=input_names,
                output_names=output_names,
                do_constant_folding=do_constant_folding,
                dynamic_axes=dynamic_axes,
                opset_version=11)
        # Convert the exported ONNX file with the bundled onnx2ncnn tool.
        from mmdeploy.backend.ncnn.init_plugins import get_onnx2ncnn_path
        onnx2ncnn_path = get_onnx2ncnn_path()
        subprocess.call(
            [onnx2ncnn_path, onnx_file_path, ncnn_param_path, ncnn_bin_path])
        with torch.no_grad():
            model_outputs = model(*inputs_list)
        if isinstance(model_outputs, torch.Tensor):
            model_outputs = [model_outputs]
        else:
            model_outputs = list(model_outputs)
        model_outputs = [
            model_output.float() for model_output in model_outputs
        ]
        from mmdeploy.backend.ncnn import NCNNWrapper
        ncnn_model = NCNNWrapper(ncnn_param_path, ncnn_bin_path, output_names)
        ncnn_outputs = ncnn_model(dict(zip(input_names, inputs_list)))
        ncnn_outputs = [ncnn_outputs[name] for name in output_names]
        if model_name.startswith('topk_no_sorted'):
            # Order-insensitive comparison: sort along the topk dim.
            dim = int(model_name.split('_')[-1])
            model_outputs = torch.stack(model_outputs, dim=-1).\
                sort(dim=dim).values
            ncnn_outputs = torch.stack(ncnn_outputs, dim=-1).\
                sort(dim=dim).values
            assert_allclose([model_outputs], [ncnn_outputs],
                            tolerate_small_mismatch)
        else:
            assert_allclose(model_outputs, ncnn_outputs,
                            tolerate_small_mismatch)
    def onnx2ncnn(self, model, model_name, output_names, save_dir=None):
        """Convert an in-memory onnx ModelProto to a loaded NCNNWrapper.

        With `save_dir=None` the intermediate files live in a temporary
        directory. NOTE(review): that directory is removed when the
        `with` block exits, so this relies on NCNNWrapper having fully
        loaded the .param/.bin files at construction — confirm.
        """
        def _from_onnx(self, model, model_name, output_names, save_dir=None):
            onnx_file_path = os.path.join(save_dir, model_name + '.onnx')
            ncnn_param_path = os.path.join(save_dir, model_name + '.param')
            ncnn_bin_path = os.path.join(save_dir, model_name + '.bin')
            onnx.save_model(model, onnx_file_path)
            from mmdeploy.backend.ncnn import from_onnx
            from_onnx(onnx_file_path, os.path.join(save_dir, model_name))
            from mmdeploy.backend.ncnn import NCNNWrapper
            ncnn_model = NCNNWrapper(ncnn_param_path, ncnn_bin_path,
                                     output_names)
            return ncnn_model
        if save_dir is None:
            with tempfile.TemporaryDirectory() as save_dir:
                return _from_onnx(
                    self, model, model_name, output_names, save_dir=save_dir)
        else:
            return _from_onnx(
                self, model, model_name, output_names, save_dir=save_dir)
# Copyright (c) OpenMMLab. All rights reserved.
import os.path as osp
import numpy as np
import pytest
import torch
import torch.nn.functional as F
from mmengine import Config
from packaging.version import parse
from mmdeploy.utils import Backend
from mmdeploy.utils.test import (WrapFunction, backend_checker,
get_rewrite_outputs)
# Shared ncnn deploy config reused by the ncnn rewrite tests below.
deploy_cfg_ncnn = Config(
    dict(
        onnx_config=dict(input_shape=None),
        backend_config=dict(type='ncnn', model_inputs=None, use_vulkan=False),
        codebase_config=dict(type='mmdet', task='ObjectDetection')))
def get_trt_config(output_names, shape, dynamic_axes=None):
    """Build a TensorRT deploy config with a static input profile.

    Args:
        output_names (list[str]): ONNX output names.
        shape (Sequence[int]): shape used for min/opt/max alike.
        dynamic_axes (dict | None): optional ONNX dynamic axes.

    Returns:
        Config: deploy config for the mmdet ObjectDetection task.
    """
    # min == opt == max: the engine is built for exactly this shape.
    static_profile = dict(min_shape=shape, opt_shape=shape, max_shape=shape)
    return Config(
        dict(
            onnx_config=dict(
                input_shape=None,
                output_names=output_names,
                dynamic_axes=dynamic_axes),
            backend_config=dict(
                type='tensorrt',
                common_config=dict(
                    fp16_mode=False, max_workspace_size=1 << 20),
                model_inputs=[
                    dict(input_shapes=dict(input=static_profile))
                ]),
            codebase_config=dict(type='mmdet', task='ObjectDetection')))
@backend_checker(Backend.NCNN)
def test_get_attribute():
    """`Tensor.size()` must be rewritten to yield plain ints for ncnn."""
    def model_func(tensor):
        x = tensor.size()
        # The rewriter should return python ints, not 0-dim tensors.
        assert isinstance(x[0], int) and not isinstance(x[0], torch.Tensor)
        return torch.tensor(x)
    input = torch.zeros([1, 2, 3, 4])
    wrapped_func = WrapFunction(model_func)
    rewrite_outputs, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={'tensor': input},
        deploy_cfg=deploy_cfg_ncnn,
        run_with_backend=True)
    # FIX: the message used to be split across two statements, leaving the
    # second string literal as a dead expression outside the assert.
    assert rewrite_outputs is not None, (
        'Got unexpected rewrite outputs: {}'.format(rewrite_outputs))
@backend_checker(Backend.NCNN)
def test_group_norm_ncnn():
    """group_norm rewritten for ncnn must match `F.group_norm` on torch."""
    x = torch.rand([1, 2, 2, 2])
    w = torch.rand([2])
    b = torch.rand([2])
    expected = F.group_norm(x, 1, w, b, 1e-05)
    def group_norm_caller(input):
        # weight/bias are captured from the closure, not traced inputs.
        return F.group_norm(input, 1, w, b)
    wrapped_func = WrapFunction(group_norm_caller)
    rewrite_output, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={'input': x},
        deploy_cfg=deploy_cfg_ncnn,
        run_with_backend=True)
    assert np.allclose(expected, rewrite_output[0], rtol=1e-03, atol=1e-05)
@backend_checker(Backend.NCNN)
def test_chunk_ncnn():
    """`Tensor.chunk` rewritten for ncnn must split identically to torch."""
    x = torch.rand(1, 16, 16, 16)
    expected = x.chunk(2, dim=1)
    def chunk_caller(input):
        return input.chunk(2, dim=1)
    wrapped_func = WrapFunction(chunk_caller)
    rewrite_output, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={'input': x},
        deploy_cfg=deploy_cfg_ncnn,
        run_with_backend=True)
    assert len(expected) == len(rewrite_output)
    for ref, out in zip(expected, rewrite_output):
        assert np.allclose(ref, out, rtol=1e-03, atol=1e-05)
@backend_checker(Backend.NCNN)
def test_interpolate_static():
    """interpolate with a static target size must match torch on ncnn."""
    x = torch.rand([1, 2, 2, 2])
    # scale_factor [2, 2] on a 2x2 map equals a fixed size of [4, 4].
    expected = F.interpolate(x, scale_factor=[2, 2])
    def interpolate_caller(*arg, **kwargs):
        return F.interpolate(*arg, **kwargs)
    wrapped_func = WrapFunction(interpolate_caller, size=[4, 4])
    rewrite_output, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={'input': x},
        deploy_cfg=deploy_cfg_ncnn,
        run_with_backend=True)
    assert np.allclose(expected, rewrite_output[0], rtol=1e-03, atol=1e-05)
@backend_checker(Backend.RKNN)
def test_interpolate__rknn():
    """interpolate with a static target size must rewrite for rknn."""
    x = torch.rand([1, 2, 2, 2])
    expected = F.interpolate(x, scale_factor=[2, 2])
    def interpolate_caller(*arg, **kwargs):
        return F.interpolate(*arg, **kwargs)
    deploy_cfg = Config(
        dict(
            onnx_config=dict(input_shape=None),
            backend_config=dict(type='rknn', model_inputs=None),
            codebase_config=dict(type='mmdet', task='ObjectDetection')))
    wrapped_func = WrapFunction(interpolate_caller, size=[4, 4])
    # rknn is not runnable here, so only the rewritten graph is executed.
    rewrite_output, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={'input': x},
        deploy_cfg=deploy_cfg,
        run_with_backend=False)
    assert np.allclose(expected, rewrite_output[0], rtol=1e-03, atol=1e-05)
@backend_checker(Backend.NCNN)
def test_linear_ncnn():
    """`F.linear` rewritten for ncnn must match torch within tolerance."""
    x = torch.rand([1, 2, 2])
    weight = torch.rand([2, 2])
    bias = torch.rand([2])
    expected = F.linear(x, weight=weight, bias=bias)
    def linear_caller(*arg, **kwargs):
        return F.linear(*arg, **kwargs)
    wrapped_func = WrapFunction(linear_caller, weight=weight, bias=bias)
    rewrite_output, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={'input': x},
        deploy_cfg=deploy_cfg_ncnn,
        run_with_backend=True)
    assert np.allclose(expected, rewrite_output[0], rtol=1e-03, atol=1e-05)
@backend_checker(Backend.NCNN)
def test_norm_ncnn():
    """`torch.norm` (Frobenius) must export as ReduceL2 and convert to ncnn."""
    import onnx
    import mmdeploy.apis.ncnn as ncnn_apis
    from mmdeploy.utils.test import get_onnx_model
    input = torch.rand(1, 17, 24)
    wrapped_func = WrapFunction(torch.norm, p='fro', dim=2, keepdim=True)
    model_inputs = {'input': input}
    ir_file_path = get_onnx_model(wrapped_func, model_inputs, deploy_cfg_ncnn)
    assert osp.exists(ir_file_path)
    onnx_model = onnx.load(ir_file_path)
    nodes = onnx_model.graph.node
    # The rewrite must lower the Frobenius norm to a ReduceL2 node.
    assert nodes[-1].name.startswith('ReduceL2')
    # Conversion succeeds if both the .param and .bin files are produced.
    ncnn_files_prefix = osp.splitext(ir_file_path)[0]
    ncnn_apis.from_onnx(ir_file_path, ncnn_files_prefix)
    param_path, bin_path = ncnn_apis.get_output_model_file(ir_file_path)
    assert osp.exists(param_path)
    assert osp.exists(bin_path)
@backend_checker(Backend.TENSORRT)
def test_repeat_static():
    """`Tensor.repeat` with a static input must match torch on TensorRT."""
    x = torch.rand([1])
    def model_func(input):
        return torch.Tensor.repeat(input, 4)
    wrapped_func = WrapFunction(model_func)
    expected = model_func(x)
    rewrite_output, is_backend_output = get_rewrite_outputs(
        wrapped_func,
        model_inputs={'input': x},
        deploy_cfg=get_trt_config(['output'], [1]))
    if not is_backend_output:
        # Backend unavailable: only check that rewriting produced output.
        assert rewrite_output is not None
        return
    assert np.allclose(
        expected, rewrite_output[0].detach().cpu(), rtol=1e-03, atol=1e-05)
@backend_checker(Backend.NCNN)
def test_size_of_tensor_static():
    """`torch.Tensor.size` must be rewritten to yield plain ints for ncnn."""
    def model_func(input):
        x = torch.Tensor.size(input)
        # The rewriter should return python ints, not 0-dim tensors.
        assert isinstance(x[0], int) and not isinstance(x[0], torch.Tensor)
        return torch.tensor(x)
    input = torch.zeros([1, 2, 3, 4])
    wrapped_func = WrapFunction(model_func)
    rewrite_outputs, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={'input': input},
        deploy_cfg=deploy_cfg_ncnn,
        run_with_backend=True)
    # FIX: the message used to be split across two statements, leaving the
    # second string literal as a dead expression outside the assert.
    assert rewrite_outputs is not None, (
        'Got unexpected rewrite outputs: {}'.format(rewrite_outputs))
@backend_checker(Backend.ASCEND)
def test_size__ascend():
    """`Tensor.size(dim)` must rewrite cleanly for the ascend backend."""
    def model_func(input):
        x = torch.Tensor.size(input, -1)
        return torch.tensor(x)
    input = torch.zeros([1, 2, 3, 4])
    # ascend requires explicit static input shapes in the deploy config.
    deploy_cfg_ascend = Config(
        dict(
            onnx_config=dict(input_shape=None),
            backend_config=dict(
                type='ascend',
                model_inputs=[dict(input_shapes=dict(input=input.shape))]),
            codebase_config=dict(type='mmdet', task='ObjectDetection')))
    wrapped_func = WrapFunction(model_func)
    rewrite_outputs, _ = get_rewrite_outputs(
        wrapped_func,
        model_inputs={'input': input},
        deploy_cfg=deploy_cfg_ascend,
        run_with_backend=True)
    # FIX: the message used to be split across two statements, leaving the
    # second string literal as a dead expression outside the assert.
    assert rewrite_outputs is not None, (
        'Got unexpected rewrite outputs: {}'.format(rewrite_outputs))
class TestTopk:
    """Grouped topk rewrite tests sharing one random input tensor."""

    # Shared 4-D input reused by every parametrized case.
    input = torch.rand(1, 5, 5, 5)
    @backend_checker(Backend.NCNN)
    @pytest.mark.parametrize('k', [1, 3, 4])
    @pytest.mark.parametrize('dim', [1, 2, 3])
    def test_topk_ncnn(self, dim, k):
        """topk values rewritten for ncnn must match torch."""
        model_output = torch.Tensor.topk(TestTopk.input, k, dim).values
        def model_func(input):
            x = input.topk(k, dim)
            return x.indices, x.values
        wrapped_func = WrapFunction(model_func)
        # mmdeploy.pytorch.functions.topk.topk_dynamic
        output, _ = get_rewrite_outputs(
            wrapped_func,
            model_inputs={'input': TestTopk.input},
            deploy_cfg=deploy_cfg_ncnn,
            run_with_backend=True)
        # NOTE(review): output[0] is compared against .values here while
        # model_func returns (indices, values) — confirm the backend
        # output ordering matches this indexing.
        assert np.allclose(model_output, output[0], rtol=1e-03, atol=1e-05)
    @backend_checker(Backend.TENSORRT)
    @pytest.mark.parametrize('k', [1, 3, 4])
    @pytest.mark.parametrize('dim', [1, 2, 3])
    def test_topk_tensorrt(self, dim, k):
        """topk values rewritten for TensorRT must match torch."""
        model_output = torch.Tensor.topk(TestTopk.input, k, dim).values
        def model_func(input):
            x = input.topk(k, dim)
            return x.indices, x.values
        wrapped_func = WrapFunction(model_func)
        # mmdeploy.pytorch.functions.topk.topk_static
        deploy_cfg_tensorrt = get_trt_config(['indices', 'values'],
                                             [1, 5, 5, 5])
        output, is_backend_output = get_rewrite_outputs(
            wrapped_func,
            model_inputs={'input': TestTopk.input},
            deploy_cfg=deploy_cfg_tensorrt)
        if is_backend_output:
            # output[1] corresponds to the 'values' output name.
            output = output[1].detach().cpu()
            assert np.allclose(model_output, output, rtol=1e-03, atol=1e-05)
        else:
            assert output is not None
@backend_checker(Backend.TENSORRT)
@pytest.mark.parametrize('shape', [[2, 2], [4, 2], [2, 4], [2, 4, 2]])
@pytest.mark.parametrize('diagonal', [0, 1, -1])
def test_triu_trt(shape, diagonal):
    """``torch.triu`` rewrite must match eager results on TensorRT."""
    data = torch.rand(shape)
    expected = torch.triu(input=data, diagonal=diagonal)

    def triu_caller(*arg, **kwargs):
        return torch.triu(*arg, **kwargs)

    wrapped = WrapFunction(triu_caller, diagonal=diagonal)
    backend_out, from_backend = get_rewrite_outputs(
        wrapped,
        model_inputs={'input': data},
        deploy_cfg=get_trt_config(['output'], shape=shape),
        run_with_backend=True)
    if not from_backend:
        # Fell back to a non-backend run; only check that something came out.
        assert backend_out is not None
    else:
        result = backend_out[0].detach().cpu()
        assert np.allclose(expected, result, rtol=1e-03, atol=1e-05)
@backend_checker(Backend.NCNN)
@pytest.mark.parametrize(
    'input',
    [torch.rand(1, 16, 16), torch.rand(1, 3, 16, 16)])
@pytest.mark.parametrize('dim', [1, 2])
def test_normalize_ncnn(input, dim):
    """``F.normalize`` should export to ONNX and convert to ncnn files."""
    import mmdeploy.apis.ncnn as ncnn_apis
    from mmdeploy.utils.test import get_onnx_model

    def norm_func(input, dim):
        return F.normalize(input, p=2, dim=dim)

    wrapped = WrapFunction(norm_func, dim=dim)
    onnx_path = get_onnx_model(wrapped, {'input': input}, deploy_cfg_ncnn)
    assert osp.exists(onnx_path)
    # Convert the exported ONNX model and verify both ncnn artifacts exist.
    prefix = osp.splitext(onnx_path)[0]
    ncnn_apis.from_onnx(onnx_path, prefix)
    param_path, bin_path = ncnn_apis.get_output_model_file(onnx_path)
    assert osp.exists(param_path)
    assert osp.exists(bin_path)
@backend_checker(Backend.ASCEND)
def test_getitem__ascend():
    """Exporting ``x[..., -1]`` under the Ascend rewriter should yield a
    loadable ONNX graph."""
    input = torch.rand(1, 2, 3)

    def tensor_getitem(x):
        return x[..., -1]

    wrapped = WrapFunction(tensor_getitem)

    import tempfile

    import onnx

    from mmdeploy.core import RewriterContext
    onnx_path = tempfile.NamedTemporaryFile(suffix='onnx').name
    # Export with the Ascend rewriter active.
    with RewriterContext(
            cfg={}, backend=Backend.ASCEND.value, opset=11), torch.no_grad():
        torch.onnx.export(wrapped, input, onnx_path, opset_version=11)
    graph_nodes = onnx.load(onnx_path).graph.node
    assert graph_nodes is not None
@backend_checker(Backend.ONNXRUNTIME)
@pytest.mark.parametrize(
    'input',
    [torch.rand(1, 16, 16), torch.rand(1, 3, 16, 16)])
def test_masked_fill_onnxruntime(input):
    """``masked_fill`` with ``-inf`` should survive the onnxruntime rewrite."""
    mask = input > 0
    fill_value = float('-inf')

    def masked_fill_caller(*arg, **kwargs):
        return torch.masked_fill(*arg, **kwargs)

    ort_cfg = Config(
        dict(
            onnx_config=dict(input_shape=None),
            backend_config=dict(type='onnxruntime'),
            codebase_config=dict(type='mmdet', task='ObjectDetection')))
    wrapped = WrapFunction(masked_fill_caller, mask=mask, value=fill_value)
    outputs, _ = get_rewrite_outputs(
        wrapped,
        model_inputs={'input': input},
        deploy_cfg=ort_cfg,
        run_with_backend=True)
    assert outputs is not None
@backend_checker(Backend.ONNXRUNTIME)
@pytest.mark.skipif(
    # Bug fix: the reason string claimed torch>1.8.0 while the condition
    # actually requires torch>=1.9.0; make the message match the check.
    parse(torch.__version__) < parse('1.9.0'), reason='requires torch>=1.9.0')
@pytest.mark.parametrize('x', [torch.rand(1, 3, 16, 16)])
@pytest.mark.parametrize('y', [torch.rand(1, 3, 4, 4)])
def test_tensor_setitem(x, y):
    """Slice assignment should be rewritten so the exported graph does not
    fall back to a ScatterND node."""
    import onnx
    from mmdeploy.utils.test import get_onnx_model

    def setitem_slice(x, y):
        H, W = y.shape[2:]
        x[:, :, 2:H + 2, 2:W + 2] = y
        return x

    wrapped_func = WrapFunction(setitem_slice)
    model_inputs = {'x': x, 'y': y}
    deploy_cfg = Config(
        dict(
            onnx_config=dict(input_shape=None),
            backend_config=dict(type='onnxruntime'),
            codebase_config=dict(type='mmdet', task='ObjectDetection')))
    ir_file_path = get_onnx_model(wrapped_func, model_inputs, deploy_cfg)
    onnx_model = onnx.load(ir_file_path)
    nodes = onnx_model.graph.node
    for node in nodes:
        assert node.op_type != 'ScatterND'
@backend_checker(Backend.ONNXRUNTIME)
@pytest.mark.skipif(
    # Bug fix: the reason string claimed torch>1.8.0 while the condition
    # actually requires torch>=1.9.0; make the message match the check.
    parse(torch.__version__) < parse('1.9.0'), reason='requires torch>=1.9.0')
@pytest.mark.parametrize('x', [torch.rand(1, 3, 16, 16)])
def test_tensor_setitem_scalar(x):
    """Scalar slice assignment should likewise avoid ScatterND in the
    exported graph."""
    import onnx
    from mmdeploy.utils.test import get_onnx_model

    def setitem_slice(x):
        H, W = x.shape[-2:]
        x[:, 1:3] = 1
        x[:, :, 4:H - 4, 4:W - 4] = x.new_tensor(2)
        return x

    wrapped_func = WrapFunction(setitem_slice)
    model_inputs = {'x': x}
    deploy_cfg = Config(
        dict(
            onnx_config=dict(input_shape=None),
            backend_config=dict(type='onnxruntime'),
            codebase_config=dict(type='mmdet', task='ObjectDetection')))
    ir_file_path = get_onnx_model(wrapped_func, model_inputs, deploy_cfg)
    onnx_model = onnx.load(ir_file_path)
    nodes = onnx_model.graph.node
    for node in nodes:
        assert node.op_type != 'ScatterND'
@pytest.mark.parametrize('output_size', [1, 3])
def test_adaptive_avg_pool2d(output_size):
    """Rewritten ``adaptive_avg_pool2d`` must match eager results."""
    data = torch.rand(1, 3, 6, 6)
    model = WrapFunction(F.adaptive_avg_pool2d, output_size=output_size)
    expected = model(data)
    ort_cfg = Config(
        dict(
            onnx_config=dict(input_shape=None),
            backend_config=dict(type='onnxruntime'),
            codebase_config=dict(type='mmdet', task='ObjectDetection')))
    actual, _ = get_rewrite_outputs(
        model,
        model_inputs={'input': data},
        deploy_cfg=ort_cfg,
        run_with_backend=True)
    assert torch.allclose(expected, actual[0])
@backend_checker(Backend.TENSORRT)
def test_scaled_dot_product_attention():
    """Scaled dot-product attention rewrite must match eager results on
    TensorRT for a fixed (B, L, E) problem size."""
    L = 10
    B = 1
    E = 4
    q = k = v = torch.rand(B, L, E)
    attn_mask = torch.rand(B, L, L)
    from torch.nn.functional import _scaled_dot_product_attention
    model = WrapFunction(_scaled_dot_product_attention)
    pytorch_output = model(q, k, v, attn_mask)
    # Bug fix: the local was named ``deploy_cfg_ort`` although this is a
    # TensorRT deployment config; rename to avoid confusion.
    deploy_cfg_trt = Config(
        dict(
            onnx_config=dict(
                input_shape=None,
                input_names=['q', 'k', 'v', 'attn_mask'],
                output_names=['output', 'attn']),
            backend_config=dict(
                type='tensorrt',
                model_inputs=[
                    dict(
                        input_shapes=dict(
                            q=dict(
                                min_shape=q.shape,
                                opt_shape=q.shape,
                                max_shape=q.shape),
                            k=dict(
                                min_shape=k.shape,
                                opt_shape=k.shape,
                                max_shape=k.shape),
                            v=dict(
                                min_shape=v.shape,
                                opt_shape=v.shape,
                                max_shape=v.shape),
                            attn_mask=dict(
                                min_shape=attn_mask.shape,
                                opt_shape=attn_mask.shape,
                                max_shape=attn_mask.shape)))
                ]),
            codebase_config=dict(type='mmdet', task='ObjectDetection')))
    rewrite_output, _ = get_rewrite_outputs(
        model,
        model_inputs={
            'q': q,
            'k': k,
            'v': v,
            'attn_mask': attn_mask
        },
        deploy_cfg=deploy_cfg_trt,
        run_with_backend=True)
    # Compare only the attention output (index 0); move it to the same
    # device as the eager result before comparing.
    assert torch.allclose(pytorch_output[0],
                          rewrite_output[0].to(pytorch_output[0].device))
@backend_checker(Backend.TENSORRT)
@pytest.mark.parametrize('num', [5, 7])
def test_mod__tensorrt(num):
    """The ``%`` operator rewrite must match eager results on TensorRT."""
    data = torch.rand(1, 3, 6, 6).cuda()
    model = WrapFunction(lambda input: input % num)
    expected = model(data)
    actual, _ = get_rewrite_outputs(
        model,
        model_inputs={'input': data},
        deploy_cfg=get_trt_config(['output'], shape=[1, 3, 6, 6]),
        run_with_backend=True)
    assert torch.allclose(expected, actual[0], rtol=1e-3, atol=1e-5)
@backend_checker(Backend.TENSORRT)
def test_prepare_onnx_paddings__tensorrt():
    """``ZeroPad2d`` fed tensor paddings should export and run on TensorRT."""
    data = torch.rand(1, 3, 6, 6).cuda()

    def _pad_(x):
        # Paddings are tensors on purpose to exercise the rewriter path.
        a, b = [torch.tensor(2)] * 2
        return torch.nn.ZeroPad2d((0, a, 0, b))(x)

    model = WrapFunction(_pad_)
    expected = model(data)
    actual, _ = get_rewrite_outputs(
        model,
        model_inputs={'x': data},
        deploy_cfg=get_trt_config(['output'], shape=[1, 3, 6, 6]),
        run_with_backend=True)
    assert torch.allclose(expected, actual[0], rtol=1e-3, atol=1e-5)
@backend_checker(Backend.ONNXRUNTIME)
@pytest.mark.parametrize('dim', [0, -1])
@pytest.mark.parametrize('keepdim', [True, False])
def test_any__default(dim, keepdim):
    """``Tensor.any`` rewrite must preserve both dtype and values."""
    data = torch.rand(2, 4)
    model = WrapFunction(lambda input: input.any(dim, keepdim=keepdim))
    expected = model(data)
    ort_cfg = Config(
        dict(
            onnx_config=dict(input_shape=None),
            backend_config=dict(type='onnxruntime'),
            codebase_config=dict(type='mmdet', task='ObjectDetection')))
    actual, _ = get_rewrite_outputs(
        model,
        model_inputs={'input': data},
        deploy_cfg=ort_cfg,
        run_with_backend=True)
    assert expected.dtype == actual[0].dtype
    # Compare as floats since the results are boolean tensors.
    assert torch.allclose(
        expected.float(), actual[0].float(), rtol=1e-3, atol=1e-5)
@backend_checker(Backend.ONNXRUNTIME)
def test_linspace__default():
    """``torch.linspace`` rewrite must match eager results for random
    start/end ranges, including the single-step edge case."""
    import random
    ort_cfg = Config(
        dict(
            onnx_config=dict(input_shape=None),
            backend_config=dict(type='onnxruntime')))

    def linspace_caller(*arg, **kwargs):
        return torch.linspace(*arg, **kwargs)

    # steps == 1 is the degenerate case; the second value is arbitrary.
    for steps in (1, random.randint(1, 1000)):
        start = random.random() * 100
        end = random.random() * 100 + start
        expected = linspace_caller(start=start, end=end, steps=steps)
        wrapped = WrapFunction(
            linspace_caller, start=start, end=end, steps=steps)
        outputs, from_backend = get_rewrite_outputs(
            wrapped,
            model_inputs={},
            deploy_cfg=ort_cfg,
            run_with_backend=True)
        if from_backend:
            outputs = outputs[0]
        assert np.allclose(expected, outputs, rtol=1e-03, atol=1e-05)
@backend_checker(Backend.TENSORRT)
@pytest.mark.parametrize('dtype', [torch.bool, torch.float32])
@pytest.mark.parametrize('dynamic_axes',
                         [None, dict(input=dict({
                             0: 'dim0',
                             1: 'dim1'
                         }))])
def test_cat__tensorrt(dtype, dynamic_axes):
    """``torch.cat`` rewrite must keep dtype and values for both static
    and dynamic input shapes."""
    data = torch.rand(2, 4)
    model = WrapFunction(lambda input: torch.cat(
        [input.to(dtype), input.to(dtype)], -1))
    expected = model(data)
    actual, _ = get_rewrite_outputs(
        model,
        model_inputs={'input': data},
        deploy_cfg=get_trt_config(['output'],
                                  shape=[2, 4],
                                  dynamic_axes=dynamic_axes),
        run_with_backend=True)
    assert expected.dtype == actual[0].dtype
    # Compare as floats so boolean outputs are handled uniformly.
    assert torch.allclose(
        expected.cpu().float(), actual[0].cpu().float(), rtol=1e-3, atol=1e-5)
@backend_checker(Backend.TENSORRT)
def test_copy__default():
    """``copy.deepcopy`` of a tensor must be rewritten without altering
    any of the copied values."""
    import copy
    data = torch.rand(2, 4)
    model = WrapFunction(
        lambda input: [copy.deepcopy(input) for i in range(3)])
    expected = model(data)
    actual, _ = get_rewrite_outputs(
        model,
        model_inputs={'input': data},
        deploy_cfg=get_trt_config(['output'], shape=[2, 4], dynamic_axes=None),
        run_with_backend=True)
    # Every deep copy must round-trip unchanged.
    for ref, out in zip(expected, actual):
        assert torch.allclose(
            ref.cpu().float(), out.cpu().float(), rtol=1e-3, atol=1e-5)
# Copyright (c) OpenMMLab. All rights reserved.
import tempfile
import onnx
import pytest
import torch
from mmengine import Config
from mmdeploy.core import RewriterContext
# Shared scratch path for every ONNX export in this test module.
# NOTE(review): suffix is 'onnx' without a leading dot, so the file ends in
# '...onnx' rather than '....onnx' — works for load/export, but confirm no
# tooling relies on the extension.
onnx_file = tempfile.NamedTemporaryFile(suffix='onnx').name
@pytest.fixture(autouse=False, scope='function')
def prepare_symbolics():
    """Activate a TensorRT ``RewriterContext`` for the duration of one test.

    Tests decorated with ``@pytest.mark.usefixtures('prepare_symbolics')``
    export ONNX with the TensorRT symbolic rewrites applied.
    """
    context = RewriterContext(
        Config(
            dict(
                onnx_config=dict(
                    type='onnx',
                    export_params=True,
                    keep_initializers_as_inputs=False,
                    opset_version=11,
                    save_file='end2end.onnx',
                    input_names=['input'],
                    output_names=['output'],
                    input_shape=None),
                backend_config=dict(type='tensorrt'))),
        'tensorrt',
        opset=11)
    context.enter()
    yield
    # NOTE(review): if the test body raises, ``exit()`` is skipped and the
    # rewriter context leaks into later tests — confirm whether that matters.
    context.exit()
@pytest.fixture(autouse=False, scope='function')
def prepare_symbolics_ncnn():
    """Activate an ncnn ``RewriterContext`` for the duration of one test."""
    context = RewriterContext(
        Config({'backend_config': {
            'type': 'ncnn'
        }}), 'ncnn', opset=11)
    context.enter()
    yield
    # NOTE(review): same caveat as ``prepare_symbolics`` — a raising test
    # skips ``exit()``.
    context.exit()
class OpModel(torch.nn.Module):
    """Minimal module wrapping a free function for ONNX export.

    Positional arguments captured at construction time are appended after
    the input tensor on every forward call, i.e. ``forward(x)`` returns
    ``func(x, *args)``.
    """

    def __init__(self, func, *args):
        super().__init__()
        self._arg_tuple = args
        self._func = func

    def forward(self, x):
        return self._func(x, *self._arg_tuple)
def get_model_onnx_nodes(model, x, onnx_file=onnx_file):
    """Export ``model`` with input ``x`` to ONNX (opset 11) and return the
    node list of the resulting graph."""
    torch.onnx.export(model, x, onnx_file, opset_version=11)
    return onnx.load(onnx_file).graph.node
@pytest.mark.usefixtures('prepare_symbolics')
class TestAdaptivePool:
    """Symbolic export tests for ``adaptive_avg_pool2d`` under the
    TensorRT rewriter."""

    def test_adaptive_pool_2d_global(self):
        """Output size [1, 1] should export as a GlobalAveragePool node."""
        x = torch.rand(2, 2, 2)
        model = OpModel(torch.nn.functional.adaptive_avg_pool2d, [1, 1]).eval()
        nodes = get_model_onnx_nodes(model, x)
        assert nodes[0].op_type == 'GlobalAveragePool'

    def test_adaptive_pool_2d(self):
        """A non-global output size should export as a plain AveragePool."""
        x = torch.rand(2, 2, 2)
        model = OpModel(torch.nn.functional.adaptive_avg_pool2d, [2, 2]).eval()
        nodes = get_model_onnx_nodes(model, x)
        assert nodes[-1].op_type == 'AveragePool'
@pytest.mark.usefixtures('prepare_symbolics_ncnn')
def test_adaptive_pool_2d_ncnn():
    """Under ncnn, adaptive pooling should export as the custom
    ``AdaptiveAvgPool2d`` op in the ``mmdeploy`` domain."""
    x = torch.rand(2, 2, 2)
    # Target size given as a tensor — presumably forces the dynamic-size
    # path through the ncnn symbolic; confirm against the rewriter impl.
    model = OpModel(torch.nn.functional.adaptive_avg_pool2d,
                    torch.tensor([2, 2], dtype=torch.int64)).eval()
    nodes = get_model_onnx_nodes(model, x)
    assert nodes[1].op_type == 'AdaptiveAvgPool2d'
    assert nodes[1].domain == 'mmdeploy'
@pytest.mark.usefixtures('prepare_symbolics')
def test_grid_sampler():
    """``grid_sampler`` should export as a custom mmdeploy-domain op."""
    image = torch.rand(1, 1, 2, 2)
    grid = torch.zeros([1, 2, 2, 2])
    model = OpModel(torch.grid_sampler, grid, 0, 0, False).eval()
    nodes = get_model_onnx_nodes(model, image)
    assert nodes[1].op_type == 'grid_sampler'
    assert nodes[1].domain == 'mmdeploy'
@pytest.mark.usefixtures('prepare_symbolics')
def test_roll():
    """``torch.roll`` should lower to Slice followed by Concat nodes."""
    sample = torch.rand(1, 4, 4, 4)
    model = OpModel(torch.roll, [1, 1, 1], [3, 3, 3]).eval()
    nodes = get_model_onnx_nodes(model, sample)
    assert nodes[-2].op_type == 'Slice'
    assert nodes[-1].op_type == 'Concat'
@pytest.mark.usefixtures('prepare_symbolics')
def test_instance_norm():
    """Export of ``torch.group_norm`` under the TensorRT rewriter.

    NOTE(review): the test name says instance norm but the op is
    ``group_norm`` with ``num_groups=1``; presumably the rewriter lowers
    that case to the ``TRTInstanceNormalization`` plugin — confirm the
    mapping in the symbolic implementation.
    """
    x = torch.rand(1, 2, 2, 2)
    model = OpModel(torch.group_norm, 1, torch.rand([2]), torch.rand([2]),
                    1e-05).eval()
    nodes = get_model_onnx_nodes(model, x)
    assert nodes[4].op_type == 'TRTInstanceNormalization'
    assert nodes[4].domain == 'mmdeploy'
@pytest.mark.usefixtures('prepare_symbolics_ncnn')
class TestLinear:
    """``F.linear`` should export as a Gemm or MatMul node under the ncnn
    rewriter, with and without a bias."""

    def check(self, nodes):
        """Assert at least one Gemm/MatMul node exists in ``nodes``.

        Bug fix: dropped a leftover debug ``print(nodes)`` and replaced
        the manual flag-and-break loop with an ``any`` expression.
        """
        assert any(node.op_type in ('Gemm', 'MatMul') for node in nodes)

    def test_normal(self):
        """linear with a bias term."""
        x = torch.rand(1, 2, 3)
        w = torch.rand(2, 3)
        bias = torch.rand(2)
        model = OpModel(torch.nn.functional.linear, w, bias).eval()
        nodes = get_model_onnx_nodes(model, x)
        self.check(nodes)

    def test_no_bias(self):
        """linear without a bias term."""
        x = torch.rand(1, 2, 3)
        w = torch.rand(2, 3)
        model = OpModel(torch.nn.functional.linear, w).eval()
        nodes = get_model_onnx_nodes(model, x)
        self.check(nodes)
@pytest.mark.usefixtures('prepare_symbolics')
class TestSqueeze:
    """Symbolic export tests for ``torch.squeeze``."""

    def test_squeeze_default(self):
        """With no dim argument, all size-1 axes (0 and 1) are squeezed."""
        x = torch.rand(1, 1, 2, 2)
        # NOTE(review): unlike the other tests, no ``.eval()`` here —
        # presumably harmless for a stateless op; confirm.
        model = OpModel(torch.squeeze)
        nodes = get_model_onnx_nodes(model, x)
        assert nodes[0].attribute[0].ints == [0, 1]
        assert nodes[0].op_type == 'Squeeze'

    def test_squeeze(self):
        """With an explicit dim, only that axis is squeezed."""
        x = torch.rand(1, 1, 2, 2)
        model = OpModel(torch.squeeze, 0)
        nodes = get_model_onnx_nodes(model, x)
        assert nodes[0].attribute[0].ints == [0]
        assert nodes[0].op_type == 'Squeeze'
@pytest.mark.usefixtures('prepare_symbolics')
def test_hardsigmoid():
    """``nn.Hardsigmoid`` should export directly as a HardSigmoid node."""
    sample = torch.rand(1, 2, 3, 4)
    exported = get_model_onnx_nodes(torch.nn.Hardsigmoid().eval(), sample)
    assert exported[0].op_type == 'HardSigmoid'
@pytest.mark.usefixtures('prepare_symbolics')
def test_layer_norm():
    """``nn.LayerNorm`` export should keep the static output shape (2, 1, 4)."""
    x = torch.rand(2, 1, 4)
    model = torch.nn.LayerNorm(4).eval()
    torch.onnx.export(model, x, onnx_file, opset_version=11)
    graph = onnx.load(onnx_file).graph
    dims = graph.output[0].type.tensor_type.shape.dim
    for axis, size in enumerate((2, 1, 4)):
        assert dims[axis].dim_value == size
# Copyright (c) OpenMMLab. All rights reserved.
from mmdeploy.utils.dataset import is_can_sort_dataset, sort_dataset
class DummyDataset:
    """Tiny stand-in dataset exposing an optional ``data_infos`` list.

    Args:
        data_infos (list | None): Per-sample metadata dicts. When ``None``
            (the default) the attribute is left unset entirely, so
            ``is_can_sort_dataset`` treats the dataset as unsortable.
    """

    def __init__(self, data_infos=None):
        # Bug fix: use an explicit ``is not None`` check so a deliberately
        # passed empty list still sets the attribute (the old truthiness
        # test silently dropped it).
        if data_infos is not None:
            self.data_infos = data_infos
# A dataset with no ``data_infos`` attribute at all — sorting must be
# reported as unsupported for it. (The 'emtpy' typo is kept: the name is
# referenced by the tests below.)
emtpy_dataset = DummyDataset()
# Four samples in deliberately unsorted (height, width) order; sorting by
# (height, width) should yield ids [0, 3, 2, 1].
dataset = DummyDataset([{
    'id': 0,
    'height': 0,
    'width': 0
}, {
    'id': 1,
    'height': 1,
    'width': 1
}, {
    'id': 2,
    'height': 1,
    'width': 0
}, {
    'id': 3,
    'height': 0,
    'width': 1
}])
class TestIsCanSortDataset:
    """Tests for ``is_can_sort_dataset``."""

    def test_is_can_sort_dataset_false(self):
        """A dataset without ``data_infos`` cannot be sorted."""
        assert not is_can_sort_dataset(emtpy_dataset)

    def test_is_can_sort_dataset_True(self):
        """A dataset carrying ``data_infos`` can be sorted."""
        assert is_can_sort_dataset(dataset)
def test_sort_dataset():
    """``sort_dataset`` must order samples by (height, width) ascending and
    expose the resulting id order via ``img_ids``."""
    result_dataset = sort_dataset(dataset)
    # Expected order: (0,0) -> (0,1) -> (1,0) -> (1,1), i.e. ids 0,3,2,1.
    assert result_dataset.data_infos == [{
        'id': 0,
        'height': 0,
        'width': 0
    }, {
        'id': 3,
        'height': 0,
        'width': 1
    }, {
        'id': 2,
        'height': 1,
        'width': 0
    }, {
        'id': 1,
        'height': 1,
        'width': 1
    }]
    assert result_dataset.img_ids == [0, 3, 2, 1]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment