Commit c732df65 authored by limm's avatar limm
Browse files

push v0.1.3 version commit bd2ea47

parent 5b3792fc
Pipeline #706 failed with stages
in 0 seconds
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import datetime
import logging
import time
from collections import OrderedDict
from contextlib import contextmanager
import torch
from detectron2.utils.comm import get_world_size, is_main_process
from detectron2.utils.logger import log_every_n_seconds
class DatasetEvaluator:
"""
Base class for a dataset evaluator.
The function :func:`inference_on_dataset` runs the model over
all samples in the dataset, and have a DatasetEvaluator to process the inputs/outputs.
This class will accumulate information of the inputs/outputs (by :meth:`process`),
and produce evaluation results in the end (by :meth:`evaluate`).
"""
def reset(self):
"""
Preparation for a new round of evaluation.
Should be called before starting a round of evaluation.
"""
pass
def process(self, inputs, outputs):
"""
Process the pair of inputs and outputs.
If they contain batches, the pairs can be consumed one-by-one using `zip`:
.. code-block:: python
for input_, output in zip(inputs, outputs):
# do evaluation on single input/output pair
...
Args:
inputs (list): the inputs that's used to call the model.
outputs (list): the return value of `model(inputs)`
"""
pass
def evaluate(self):
"""
Evaluate/summarize the performance, after processing all input/output pairs.
Returns:
dict:
A new evaluator class can return a dict of arbitrary format
as long as the user can process the results.
In our train_net.py, we expect the following format:
* key: the name of the task (e.g., bbox)
* value: a dict of {metric name: score}, e.g.: {"AP50": 80}
"""
pass
class DatasetEvaluators(DatasetEvaluator):
"""
Wrapper class to combine multiple :class:`DatasetEvaluator` instances.
This class dispatches every evaluation call to
all of its :class:`DatasetEvaluator`.
"""
def __init__(self, evaluators):
"""
Args:
evaluators (list): the evaluators to combine.
"""
super().__init__()
self._evaluators = evaluators
def reset(self):
for evaluator in self._evaluators:
evaluator.reset()
def process(self, inputs, outputs):
for evaluator in self._evaluators:
evaluator.process(inputs, outputs)
def evaluate(self):
results = OrderedDict()
for evaluator in self._evaluators:
result = evaluator.evaluate()
if is_main_process() and result is not None:
for k, v in result.items():
assert (
k not in results
), "Different evaluators produce results with the same key {}".format(k)
results[k] = v
return results
def inference_on_dataset(model, data_loader, evaluator):
"""
Run model on the data_loader and evaluate the metrics with evaluator.
Also benchmark the inference speed of `model.forward` accurately.
The model will be used in eval mode.
Args:
model (nn.Module): a module which accepts an object from
`data_loader` and returns some outputs. It will be temporarily set to `eval` mode.
If you wish to evaluate a model in `training` mode instead, you can
wrap the given model and override its behavior of `.eval()` and `.train()`.
data_loader: an iterable object with a length.
The elements it generates will be the inputs to the model.
evaluator (DatasetEvaluator): the evaluator to run. Use `None` if you only want
to benchmark, but don't want to do any evaluation.
Returns:
The return value of `evaluator.evaluate()`
"""
num_devices = get_world_size()
logger = logging.getLogger(__name__)
logger.info("Start inference on {} images".format(len(data_loader)))
total = len(data_loader) # inference data loader must have a fixed length
if evaluator is None:
# create a no-op evaluator
evaluator = DatasetEvaluators([])
evaluator.reset()
num_warmup = min(5, total - 1)
start_time = time.perf_counter()
total_compute_time = 0
with inference_context(model), torch.no_grad():
for idx, inputs in enumerate(data_loader):
if idx == num_warmup:
start_time = time.perf_counter()
total_compute_time = 0
start_compute_time = time.perf_counter()
outputs = model(inputs)
if torch.cuda.is_available():
torch.cuda.synchronize()
total_compute_time += time.perf_counter() - start_compute_time
evaluator.process(inputs, outputs)
iters_after_start = idx + 1 - num_warmup * int(idx >= num_warmup)
seconds_per_img = total_compute_time / iters_after_start
if idx >= num_warmup * 2 or seconds_per_img > 5:
total_seconds_per_img = (time.perf_counter() - start_time) / iters_after_start
eta = datetime.timedelta(seconds=int(total_seconds_per_img * (total - idx - 1)))
log_every_n_seconds(
logging.INFO,
"Inference done {}/{}. {:.4f} s / img. ETA={}".format(
idx + 1, total, seconds_per_img, str(eta)
),
n=5,
)
# Measure the time only for this worker (before the synchronization barrier)
total_time = time.perf_counter() - start_time
total_time_str = str(datetime.timedelta(seconds=total_time))
# NOTE this format is parsed by grep
logger.info(
"Total inference time: {} ({:.6f} s / img per device, on {} devices)".format(
total_time_str, total_time / (total - num_warmup), num_devices
)
)
total_compute_time_str = str(datetime.timedelta(seconds=int(total_compute_time)))
logger.info(
"Total inference pure compute time: {} ({:.6f} s / img per device, on {} devices)".format(
total_compute_time_str, total_compute_time / (total - num_warmup), num_devices
)
)
results = evaluator.evaluate()
# An evaluator may return None when not in main process.
# Replace it by an empty dict instead to make it easier for downstream code to handle
if results is None:
results = {}
return results
@contextmanager
def inference_context(model):
"""
A context where the model is temporarily changed to eval mode,
and restored to previous mode afterwards.
Args:
model: a torch Module
"""
training_mode = model.training
model.eval()
yield
model.train(training_mode)
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import copy
import itertools
import json
import logging
import os
import pickle
from collections import OrderedDict
import torch
from fvcore.common.file_io import PathManager
import detectron2.utils.comm as comm
from detectron2.data import MetadataCatalog
from detectron2.structures import Boxes, BoxMode, pairwise_iou
from detectron2.utils.logger import create_small_table
from .coco_evaluation import instances_to_coco_json
from .evaluator import DatasetEvaluator
class LVISEvaluator(DatasetEvaluator):
"""
Evaluate object proposal and instance detection/segmentation outputs using
LVIS's metrics and evaluation API.
"""
def __init__(self, dataset_name, cfg, distributed, output_dir=None):
"""
Args:
dataset_name (str): name of the dataset to be evaluated.
It must have the following corresponding metadata:
"json_file": the path to the LVIS format annotation
cfg (CfgNode): config instance
distributed (True): if True, will collect results from all ranks for evaluation.
Otherwise, will evaluate the results in the current process.
output_dir (str): optional, an output directory to dump results.
"""
from lvis import LVIS
self._tasks = self._tasks_from_config(cfg)
self._distributed = distributed
self._output_dir = output_dir
self._cpu_device = torch.device("cpu")
self._logger = logging.getLogger(__name__)
self._metadata = MetadataCatalog.get(dataset_name)
json_file = PathManager.get_local_path(self._metadata.json_file)
self._lvis_api = LVIS(json_file)
# Test set json files do not contain annotations (evaluation must be
# performed using the LVIS evaluation server).
self._do_evaluation = len(self._lvis_api.get_ann_ids()) > 0
def reset(self):
self._predictions = []
def _tasks_from_config(self, cfg):
"""
Returns:
tuple[str]: tasks that can be evaluated under the given configuration.
"""
tasks = ("bbox",)
if cfg.MODEL.MASK_ON:
tasks = tasks + ("segm",)
return tasks
def process(self, inputs, outputs):
"""
Args:
inputs: the inputs to a LVIS model (e.g., GeneralizedRCNN).
It is a list of dict. Each dict corresponds to an image and
contains keys like "height", "width", "file_name", "image_id".
outputs: the outputs of a LVIS model. It is a list of dicts with key
"instances" that contains :class:`Instances`.
"""
for input, output in zip(inputs, outputs):
prediction = {"image_id": input["image_id"]}
if "instances" in output:
instances = output["instances"].to(self._cpu_device)
prediction["instances"] = instances_to_coco_json(instances, input["image_id"])
if "proposals" in output:
prediction["proposals"] = output["proposals"].to(self._cpu_device)
self._predictions.append(prediction)
def evaluate(self):
if self._distributed:
comm.synchronize()
predictions = comm.gather(self._predictions, dst=0)
predictions = list(itertools.chain(*predictions))
if not comm.is_main_process():
return
else:
predictions = self._predictions
if len(predictions) == 0:
self._logger.warning("[LVISEvaluator] Did not receive valid predictions.")
return {}
if self._output_dir:
PathManager.mkdirs(self._output_dir)
file_path = os.path.join(self._output_dir, "instances_predictions.pth")
with PathManager.open(file_path, "wb") as f:
torch.save(predictions, f)
self._results = OrderedDict()
if "proposals" in predictions[0]:
self._eval_box_proposals(predictions)
if "instances" in predictions[0]:
self._eval_predictions(set(self._tasks), predictions)
# Copy so the caller can do whatever with results
return copy.deepcopy(self._results)
def _eval_predictions(self, tasks, predictions):
"""
Evaluate predictions on the given tasks.
Fill self._results with the metrics of the tasks.
Args:
predictions (list[dict]): list of outputs from the model
"""
self._logger.info("Preparing results in the LVIS format ...")
lvis_results = list(itertools.chain(*[x["instances"] for x in predictions]))
# LVIS evaluator can be used to evaluate results for COCO dataset categories.
# In this case `_metadata` variable will have a field with COCO-specific category mapping.
if hasattr(self._metadata, "thing_dataset_id_to_contiguous_id"):
reverse_id_mapping = {
v: k for k, v in self._metadata.thing_dataset_id_to_contiguous_id.items()
}
for result in lvis_results:
result["category_id"] = reverse_id_mapping[result["category_id"]]
else:
# unmap the category ids for LVIS (from 0-indexed to 1-indexed)
for result in lvis_results:
result["category_id"] += 1
if self._output_dir:
file_path = os.path.join(self._output_dir, "lvis_instances_results.json")
self._logger.info("Saving results to {}".format(file_path))
with PathManager.open(file_path, "w") as f:
f.write(json.dumps(lvis_results))
f.flush()
if not self._do_evaluation:
self._logger.info("Annotations are not available for evaluation.")
return
self._logger.info("Evaluating predictions ...")
for task in sorted(tasks):
res = _evaluate_predictions_on_lvis(
self._lvis_api, lvis_results, task, class_names=self._metadata.get("thing_classes")
)
self._results[task] = res
def _eval_box_proposals(self, predictions):
"""
Evaluate the box proposals in predictions.
Fill self._results with the metrics for "box_proposals" task.
"""
if self._output_dir:
# Saving generated box proposals to file.
# Predicted box_proposals are in XYXY_ABS mode.
bbox_mode = BoxMode.XYXY_ABS.value
ids, boxes, objectness_logits = [], [], []
for prediction in predictions:
ids.append(prediction["image_id"])
boxes.append(prediction["proposals"].proposal_boxes.tensor.numpy())
objectness_logits.append(prediction["proposals"].objectness_logits.numpy())
proposal_data = {
"boxes": boxes,
"objectness_logits": objectness_logits,
"ids": ids,
"bbox_mode": bbox_mode,
}
with PathManager.open(os.path.join(self._output_dir, "box_proposals.pkl"), "wb") as f:
pickle.dump(proposal_data, f)
if not self._do_evaluation:
self._logger.info("Annotations are not available for evaluation.")
return
self._logger.info("Evaluating bbox proposals ...")
res = {}
areas = {"all": "", "small": "s", "medium": "m", "large": "l"}
for limit in [100, 1000]:
for area, suffix in areas.items():
stats = _evaluate_box_proposals(predictions, self._lvis_api, area=area, limit=limit)
key = "AR{}@{:d}".format(suffix, limit)
res[key] = float(stats["ar"].item() * 100)
self._logger.info("Proposal metrics: \n" + create_small_table(res))
self._results["box_proposals"] = res
# inspired from Detectron:
# https://github.com/facebookresearch/Detectron/blob/a6a835f5b8208c45d0dce217ce9bbda915f44df7/detectron/datasets/json_dataset_evaluator.py#L255 # noqa
def _evaluate_box_proposals(dataset_predictions, lvis_api, thresholds=None, area="all", limit=None):
"""
Evaluate detection proposal recall metrics. This function is a much
faster alternative to the official LVIS API recall evaluation code. However,
it produces slightly different results.
"""
# Record max overlap value for each gt box
# Return vector of overlap values
areas = {
"all": 0,
"small": 1,
"medium": 2,
"large": 3,
"96-128": 4,
"128-256": 5,
"256-512": 6,
"512-inf": 7,
}
area_ranges = [
[0 ** 2, 1e5 ** 2], # all
[0 ** 2, 32 ** 2], # small
[32 ** 2, 96 ** 2], # medium
[96 ** 2, 1e5 ** 2], # large
[96 ** 2, 128 ** 2], # 96-128
[128 ** 2, 256 ** 2], # 128-256
[256 ** 2, 512 ** 2], # 256-512
[512 ** 2, 1e5 ** 2],
] # 512-inf
assert area in areas, "Unknown area range: {}".format(area)
area_range = area_ranges[areas[area]]
gt_overlaps = []
num_pos = 0
for prediction_dict in dataset_predictions:
predictions = prediction_dict["proposals"]
# sort predictions in descending order
# TODO maybe remove this and make it explicit in the documentation
inds = predictions.objectness_logits.sort(descending=True)[1]
predictions = predictions[inds]
ann_ids = lvis_api.get_ann_ids(img_ids=[prediction_dict["image_id"]])
anno = lvis_api.load_anns(ann_ids)
gt_boxes = [
BoxMode.convert(obj["bbox"], BoxMode.XYWH_ABS, BoxMode.XYXY_ABS) for obj in anno
]
gt_boxes = torch.as_tensor(gt_boxes).reshape(-1, 4) # guard against no boxes
gt_boxes = Boxes(gt_boxes)
gt_areas = torch.as_tensor([obj["area"] for obj in anno])
if len(gt_boxes) == 0 or len(predictions) == 0:
continue
valid_gt_inds = (gt_areas >= area_range[0]) & (gt_areas <= area_range[1])
gt_boxes = gt_boxes[valid_gt_inds]
num_pos += len(gt_boxes)
if len(gt_boxes) == 0:
continue
if limit is not None and len(predictions) > limit:
predictions = predictions[:limit]
overlaps = pairwise_iou(predictions.proposal_boxes, gt_boxes)
_gt_overlaps = torch.zeros(len(gt_boxes))
for j in range(min(len(predictions), len(gt_boxes))):
# find which proposal box maximally covers each gt box
# and get the iou amount of coverage for each gt box
max_overlaps, argmax_overlaps = overlaps.max(dim=0)
# find which gt box is 'best' covered (i.e. 'best' = most iou)
gt_ovr, gt_ind = max_overlaps.max(dim=0)
assert gt_ovr >= 0
# find the proposal box that covers the best covered gt box
box_ind = argmax_overlaps[gt_ind]
# record the iou coverage of this gt box
_gt_overlaps[j] = overlaps[box_ind, gt_ind]
assert _gt_overlaps[j] == gt_ovr
# mark the proposal box and the gt box as used
overlaps[box_ind, :] = -1
overlaps[:, gt_ind] = -1
# append recorded iou coverage level
gt_overlaps.append(_gt_overlaps)
gt_overlaps = (
torch.cat(gt_overlaps, dim=0) if len(gt_overlaps) else torch.zeros(0, dtype=torch.float32)
)
gt_overlaps, _ = torch.sort(gt_overlaps)
if thresholds is None:
step = 0.05
thresholds = torch.arange(0.5, 0.95 + 1e-5, step, dtype=torch.float32)
recalls = torch.zeros_like(thresholds)
# compute recall for each iou threshold
for i, t in enumerate(thresholds):
recalls[i] = (gt_overlaps >= t).float().sum() / float(num_pos)
# ar = 2 * np.trapz(recalls, thresholds)
ar = recalls.mean()
return {
"ar": ar,
"recalls": recalls,
"thresholds": thresholds,
"gt_overlaps": gt_overlaps,
"num_pos": num_pos,
}
def _evaluate_predictions_on_lvis(lvis_gt, lvis_results, iou_type, class_names=None):
"""
Args:
iou_type (str):
kpt_oks_sigmas (list[float]):
class_names (None or list[str]): if provided, will use it to predict
per-category AP.
Returns:
a dict of {metric name: score}
"""
metrics = {
"bbox": ["AP", "AP50", "AP75", "APs", "APm", "APl", "APr", "APc", "APf"],
"segm": ["AP", "AP50", "AP75", "APs", "APm", "APl", "APr", "APc", "APf"],
}[iou_type]
logger = logging.getLogger(__name__)
if len(lvis_results) == 0: # TODO: check if needed
logger.warn("No predictions from the model!")
return {metric: float("nan") for metric in metrics}
if iou_type == "segm":
lvis_results = copy.deepcopy(lvis_results)
# When evaluating mask AP, if the results contain bbox, LVIS API will
# use the box area as the area of the instance, instead of the mask area.
# This leads to a different definition of small/medium/large.
# We remove the bbox field to let mask AP use mask area.
for c in lvis_results:
c.pop("bbox", None)
from lvis import LVISEval, LVISResults
lvis_results = LVISResults(lvis_gt, lvis_results)
lvis_eval = LVISEval(lvis_gt, lvis_results, iou_type)
lvis_eval.run()
lvis_eval.print_results()
# Pull the standard metrics from the LVIS results
results = lvis_eval.get_results()
results = {metric: float(results[metric] * 100) for metric in metrics}
logger.info("Evaluation results for {}: \n".format(iou_type) + create_small_table(results))
return results
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import contextlib
import io
import itertools
import json
import logging
import os
import tempfile
from collections import OrderedDict
from fvcore.common.file_io import PathManager
from PIL import Image
from tabulate import tabulate
from detectron2.data import MetadataCatalog
from detectron2.utils import comm
from .evaluator import DatasetEvaluator
logger = logging.getLogger(__name__)
class COCOPanopticEvaluator(DatasetEvaluator):
"""
Evaluate Panoptic Quality metrics on COCO using PanopticAPI.
It saves panoptic segmentation prediction in `output_dir`
It contains a synchronize call and has to be called from all workers.
"""
def __init__(self, dataset_name, output_dir):
"""
Args:
dataset_name (str): name of the dataset
output_dir (str): output directory to save results for evaluation
"""
self._metadata = MetadataCatalog.get(dataset_name)
self._thing_contiguous_id_to_dataset_id = {
v: k for k, v in self._metadata.thing_dataset_id_to_contiguous_id.items()
}
self._stuff_contiguous_id_to_dataset_id = {
v: k for k, v in self._metadata.stuff_dataset_id_to_contiguous_id.items()
}
self._predictions_json = os.path.join(output_dir, "predictions.json")
def reset(self):
self._predictions = []
def _convert_category_id(self, segment_info):
isthing = segment_info.pop("isthing", None)
if isthing is None:
# the model produces panoptic category id directly. No more conversion needed
return segment_info
if isthing is True:
segment_info["category_id"] = self._thing_contiguous_id_to_dataset_id[
segment_info["category_id"]
]
else:
segment_info["category_id"] = self._stuff_contiguous_id_to_dataset_id[
segment_info["category_id"]
]
return segment_info
def process(self, inputs, outputs):
from panopticapi.utils import id2rgb
for input, output in zip(inputs, outputs):
panoptic_img, segments_info = output["panoptic_seg"]
panoptic_img = panoptic_img.cpu().numpy()
file_name = os.path.basename(input["file_name"])
file_name_png = os.path.splitext(file_name)[0] + ".png"
with io.BytesIO() as out:
Image.fromarray(id2rgb(panoptic_img)).save(out, format="PNG")
segments_info = [self._convert_category_id(x) for x in segments_info]
self._predictions.append(
{
"image_id": input["image_id"],
"file_name": file_name_png,
"png_string": out.getvalue(),
"segments_info": segments_info,
}
)
def evaluate(self):
comm.synchronize()
self._predictions = comm.gather(self._predictions)
self._predictions = list(itertools.chain(*self._predictions))
if not comm.is_main_process():
return
# PanopticApi requires local files
gt_json = PathManager.get_local_path(self._metadata.panoptic_json)
gt_folder = PathManager.get_local_path(self._metadata.panoptic_root)
with tempfile.TemporaryDirectory(prefix="panoptic_eval") as pred_dir:
logger.info("Writing all panoptic predictions to {} ...".format(pred_dir))
for p in self._predictions:
with open(os.path.join(pred_dir, p["file_name"]), "wb") as f:
f.write(p.pop("png_string"))
with open(gt_json, "r") as f:
json_data = json.load(f)
json_data["annotations"] = self._predictions
with PathManager.open(self._predictions_json, "w") as f:
f.write(json.dumps(json_data))
from panopticapi.evaluation import pq_compute
with contextlib.redirect_stdout(io.StringIO()):
pq_res = pq_compute(
gt_json,
PathManager.get_local_path(self._predictions_json),
gt_folder=gt_folder,
pred_folder=pred_dir,
)
res = {}
res["PQ"] = 100 * pq_res["All"]["pq"]
res["SQ"] = 100 * pq_res["All"]["sq"]
res["RQ"] = 100 * pq_res["All"]["rq"]
res["PQ_th"] = 100 * pq_res["Things"]["pq"]
res["SQ_th"] = 100 * pq_res["Things"]["sq"]
res["RQ_th"] = 100 * pq_res["Things"]["rq"]
res["PQ_st"] = 100 * pq_res["Stuff"]["pq"]
res["SQ_st"] = 100 * pq_res["Stuff"]["sq"]
res["RQ_st"] = 100 * pq_res["Stuff"]["rq"]
results = OrderedDict({"panoptic_seg": res})
_print_panoptic_results(pq_res)
return results
def _print_panoptic_results(pq_res):
headers = ["", "PQ", "SQ", "RQ", "#categories"]
data = []
for name in ["All", "Things", "Stuff"]:
row = [name] + [pq_res[name][k] * 100 for k in ["pq", "sq", "rq"]] + [pq_res[name]["n"]]
data.append(row)
table = tabulate(
data, headers=headers, tablefmt="pipe", floatfmt=".3f", stralign="center", numalign="center"
)
logger.info("Panoptic Evaluation Results:\n" + table)
if __name__ == "__main__":
from detectron2.utils.logger import setup_logger
logger = setup_logger()
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--gt-json")
parser.add_argument("--gt-dir")
parser.add_argument("--pred-json")
parser.add_argument("--pred-dir")
args = parser.parse_args()
from panopticapi.evaluation import pq_compute
with contextlib.redirect_stdout(io.StringIO()):
pq_res = pq_compute(
args.gt_json, args.pred_json, gt_folder=args.gt_dir, pred_folder=args.pred_dir
)
_print_panoptic_results(pq_res)
# -*- coding: utf-8 -*-
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging
import numpy as np
import os
import tempfile
import xml.etree.ElementTree as ET
from collections import OrderedDict, defaultdict
from functools import lru_cache
import torch
from fvcore.common.file_io import PathManager
from detectron2.data import MetadataCatalog
from detectron2.utils import comm
from .evaluator import DatasetEvaluator
class PascalVOCDetectionEvaluator(DatasetEvaluator):
"""
Evaluate Pascal VOC AP.
It contains a synchronization, therefore has to be called from all ranks.
Note that this is a rewrite of the official Matlab API.
The results should be similar, but not identical to the one produced by
the official API.
"""
def __init__(self, dataset_name):
"""
Args:
dataset_name (str): name of the dataset, e.g., "voc_2007_test"
"""
self._dataset_name = dataset_name
meta = MetadataCatalog.get(dataset_name)
self._anno_file_template = os.path.join(meta.dirname, "Annotations", "{}.xml")
self._image_set_path = os.path.join(meta.dirname, "ImageSets", "Main", meta.split + ".txt")
self._class_names = meta.thing_classes
assert meta.year in [2007, 2012], meta.year
self._is_2007 = meta.year == 2007
self._cpu_device = torch.device("cpu")
self._logger = logging.getLogger(__name__)
def reset(self):
self._predictions = defaultdict(list) # class name -> list of prediction strings
def process(self, inputs, outputs):
for input, output in zip(inputs, outputs):
image_id = input["image_id"]
instances = output["instances"].to(self._cpu_device)
boxes = instances.pred_boxes.tensor.numpy()
scores = instances.scores.tolist()
classes = instances.pred_classes.tolist()
for box, score, cls in zip(boxes, scores, classes):
xmin, ymin, xmax, ymax = box
# The inverse of data loading logic in `datasets/pascal_voc.py`
xmin += 1
ymin += 1
self._predictions[cls].append(
f"{image_id} {score:.3f} {xmin:.1f} {ymin:.1f} {xmax:.1f} {ymax:.1f}"
)
def evaluate(self):
"""
Returns:
dict: has a key "segm", whose value is a dict of "AP", "AP50", and "AP75".
"""
all_predictions = comm.gather(self._predictions, dst=0)
if not comm.is_main_process():
return
predictions = defaultdict(list)
for predictions_per_rank in all_predictions:
for clsid, lines in predictions_per_rank.items():
predictions[clsid].extend(lines)
del all_predictions
self._logger.info(
"Evaluating {} using {} metric. "
"Note that results do not use the official Matlab API.".format(
self._dataset_name, 2007 if self._is_2007 else 2012
)
)
with tempfile.TemporaryDirectory(prefix="pascal_voc_eval_") as dirname:
res_file_template = os.path.join(dirname, "{}.txt")
aps = defaultdict(list) # iou -> ap per class
for cls_id, cls_name in enumerate(self._class_names):
lines = predictions.get(cls_id, [""])
with open(res_file_template.format(cls_name), "w") as f:
f.write("\n".join(lines))
for thresh in range(50, 100, 5):
rec, prec, ap = voc_eval(
res_file_template,
self._anno_file_template,
self._image_set_path,
cls_name,
ovthresh=thresh / 100.0,
use_07_metric=self._is_2007,
)
aps[thresh].append(ap * 100)
ret = OrderedDict()
mAP = {iou: np.mean(x) for iou, x in aps.items()}
ret["bbox"] = {"AP": np.mean(list(mAP.values())), "AP50": mAP[50], "AP75": mAP[75]}
return ret
##############################################################################
#
# Below code is modified from
# https://github.com/rbgirshick/py-faster-rcnn/blob/master/lib/datasets/voc_eval.py
# --------------------------------------------------------
# Fast/er R-CNN
# Licensed under The MIT License [see LICENSE for details]
# Written by Bharath Hariharan
# --------------------------------------------------------
"""Python implementation of the PASCAL VOC devkit's AP evaluation code."""
@lru_cache(maxsize=None)
def parse_rec(filename):
"""Parse a PASCAL VOC xml file."""
with PathManager.open(filename) as f:
tree = ET.parse(f)
objects = []
for obj in tree.findall("object"):
obj_struct = {}
obj_struct["name"] = obj.find("name").text
obj_struct["pose"] = obj.find("pose").text
obj_struct["truncated"] = int(obj.find("truncated").text)
obj_struct["difficult"] = int(obj.find("difficult").text)
bbox = obj.find("bndbox")
obj_struct["bbox"] = [
int(bbox.find("xmin").text),
int(bbox.find("ymin").text),
int(bbox.find("xmax").text),
int(bbox.find("ymax").text),
]
objects.append(obj_struct)
return objects
def voc_ap(rec, prec, use_07_metric=False):
"""Compute VOC AP given precision and recall. If use_07_metric is true, uses
the VOC 07 11-point method (default:False).
"""
if use_07_metric:
# 11 point metric
ap = 0.0
for t in np.arange(0.0, 1.1, 0.1):
if np.sum(rec >= t) == 0:
p = 0
else:
p = np.max(prec[rec >= t])
ap = ap + p / 11.0
else:
# correct AP calculation
# first append sentinel values at the end
mrec = np.concatenate(([0.0], rec, [1.0]))
mpre = np.concatenate(([0.0], prec, [0.0]))
# compute the precision envelope
for i in range(mpre.size - 1, 0, -1):
mpre[i - 1] = np.maximum(mpre[i - 1], mpre[i])
# to calculate area under PR curve, look for points
# where X axis (recall) changes value
i = np.where(mrec[1:] != mrec[:-1])[0]
# and sum (\Delta recall) * prec
ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1])
return ap
def voc_eval(detpath, annopath, imagesetfile, classname, ovthresh=0.5, use_07_metric=False):
"""rec, prec, ap = voc_eval(detpath,
annopath,
imagesetfile,
classname,
[ovthresh],
[use_07_metric])
Top level function that does the PASCAL VOC evaluation.
detpath: Path to detections
detpath.format(classname) should produce the detection results file.
annopath: Path to annotations
annopath.format(imagename) should be the xml annotations file.
imagesetfile: Text file containing the list of images, one image per line.
classname: Category name (duh)
[ovthresh]: Overlap threshold (default = 0.5)
[use_07_metric]: Whether to use VOC07's 11 point AP computation
(default False)
"""
# assumes detections are in detpath.format(classname)
# assumes annotations are in annopath.format(imagename)
# assumes imagesetfile is a text file with each line an image name
# first load gt
# read list of images
with PathManager.open(imagesetfile, "r") as f:
lines = f.readlines()
imagenames = [x.strip() for x in lines]
# load annots
recs = {}
for imagename in imagenames:
recs[imagename] = parse_rec(annopath.format(imagename))
# extract gt objects for this class
class_recs = {}
npos = 0
for imagename in imagenames:
R = [obj for obj in recs[imagename] if obj["name"] == classname]
bbox = np.array([x["bbox"] for x in R])
difficult = np.array([x["difficult"] for x in R]).astype(np.bool)
# difficult = np.array([False for x in R]).astype(np.bool) # treat all "difficult" as GT
det = [False] * len(R)
npos = npos + sum(~difficult)
class_recs[imagename] = {"bbox": bbox, "difficult": difficult, "det": det}
# read dets
detfile = detpath.format(classname)
with open(detfile, "r") as f:
lines = f.readlines()
splitlines = [x.strip().split(" ") for x in lines]
image_ids = [x[0] for x in splitlines]
confidence = np.array([float(x[1]) for x in splitlines])
BB = np.array([[float(z) for z in x[2:]] for x in splitlines]).reshape(-1, 4)
# sort by confidence
sorted_ind = np.argsort(-confidence)
BB = BB[sorted_ind, :]
image_ids = [image_ids[x] for x in sorted_ind]
# go down dets and mark TPs and FPs
nd = len(image_ids)
tp = np.zeros(nd)
fp = np.zeros(nd)
for d in range(nd):
R = class_recs[image_ids[d]]
bb = BB[d, :].astype(float)
ovmax = -np.inf
BBGT = R["bbox"].astype(float)
if BBGT.size > 0:
# compute overlaps
# intersection
ixmin = np.maximum(BBGT[:, 0], bb[0])
iymin = np.maximum(BBGT[:, 1], bb[1])
ixmax = np.minimum(BBGT[:, 2], bb[2])
iymax = np.minimum(BBGT[:, 3], bb[3])
iw = np.maximum(ixmax - ixmin + 1.0, 0.0)
ih = np.maximum(iymax - iymin + 1.0, 0.0)
inters = iw * ih
# union
uni = (
(bb[2] - bb[0] + 1.0) * (bb[3] - bb[1] + 1.0)
+ (BBGT[:, 2] - BBGT[:, 0] + 1.0) * (BBGT[:, 3] - BBGT[:, 1] + 1.0)
- inters
)
overlaps = inters / uni
ovmax = np.max(overlaps)
jmax = np.argmax(overlaps)
if ovmax > ovthresh:
if not R["difficult"][jmax]:
if not R["det"][jmax]:
tp[d] = 1.0
R["det"][jmax] = 1
else:
fp[d] = 1.0
else:
fp[d] = 1.0
# compute precision recall
fp = np.cumsum(fp)
tp = np.cumsum(tp)
rec = tp / float(npos)
# avoid divide by zero in case the first detection matches a difficult
# ground truth
prec = tp / np.maximum(tp + fp, np.finfo(np.float64).eps)
ap = voc_ap(rec, prec, use_07_metric)
return rec, prec, ap
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import itertools
import json
import numpy as np
import os
import torch
from fvcore.common.file_io import PathManager
from pycocotools.cocoeval import COCOeval, maskUtils
from detectron2.structures import BoxMode, RotatedBoxes, pairwise_iou_rotated
from .coco_evaluation import COCOEvaluator
class RotatedCOCOeval(COCOeval):
@staticmethod
def is_rotated(box_list):
if type(box_list) == np.ndarray:
return box_list.shape[1] == 5
elif type(box_list) == list:
if box_list == []: # cannot decide the box_dim
return False
return np.all(
np.array(
[
(len(obj) == 5) and ((type(obj) == list) or (type(obj) == np.ndarray))
for obj in box_list
]
)
)
return False
@staticmethod
def boxlist_to_tensor(boxlist, output_box_dim):
if type(boxlist) == np.ndarray:
box_tensor = torch.from_numpy(boxlist)
elif type(boxlist) == list:
if boxlist == []:
return torch.zeros((0, output_box_dim), dtype=torch.float32)
else:
box_tensor = torch.FloatTensor(boxlist)
else:
raise Exception("Unrecognized boxlist type")
input_box_dim = box_tensor.shape[1]
if input_box_dim != output_box_dim:
if input_box_dim == 4 and output_box_dim == 5:
box_tensor = BoxMode.convert(box_tensor, BoxMode.XYWH_ABS, BoxMode.XYWHA_ABS)
else:
raise Exception(
"Unable to convert from {}-dim box to {}-dim box".format(
input_box_dim, output_box_dim
)
)
return box_tensor
def compute_iou_dt_gt(self, dt, gt, is_crowd):
if self.is_rotated(dt) or self.is_rotated(gt):
# TODO: take is_crowd into consideration
assert all(c == 0 for c in is_crowd)
dt = RotatedBoxes(self.boxlist_to_tensor(dt, output_box_dim=5))
gt = RotatedBoxes(self.boxlist_to_tensor(gt, output_box_dim=5))
return pairwise_iou_rotated(dt, gt)
else:
# This is the same as the classical COCO evaluation
return maskUtils.iou(dt, gt, is_crowd)
def computeIoU(self, imgId, catId):
p = self.params
if p.useCats:
gt = self._gts[imgId, catId]
dt = self._dts[imgId, catId]
else:
gt = [_ for cId in p.catIds for _ in self._gts[imgId, cId]]
dt = [_ for cId in p.catIds for _ in self._dts[imgId, cId]]
if len(gt) == 0 and len(dt) == 0:
return []
inds = np.argsort([-d["score"] for d in dt], kind="mergesort")
dt = [dt[i] for i in inds]
if len(dt) > p.maxDets[-1]:
dt = dt[0 : p.maxDets[-1]]
assert p.iouType == "bbox", "unsupported iouType for iou computation"
g = [g["bbox"] for g in gt]
d = [d["bbox"] for d in dt]
# compute iou between each dt and gt region
iscrowd = [int(o["iscrowd"]) for o in gt]
# Note: this function is copied from cocoeval.py in cocoapi
# and the major difference is here.
ious = self.compute_iou_dt_gt(d, g, iscrowd)
return ious
class RotatedCOCOEvaluator(COCOEvaluator):
"""
Evaluate object proposal/instance detection outputs using COCO-like metrics and APIs,
with rotated boxes support.
Note: this uses IOU only and does not consider angle differences.
"""
def process(self, inputs, outputs):
"""
Args:
inputs: the inputs to a COCO model (e.g., GeneralizedRCNN).
It is a list of dict. Each dict corresponds to an image and
contains keys like "height", "width", "file_name", "image_id".
outputs: the outputs of a COCO model. It is a list of dicts with key
"instances" that contains :class:`Instances`.
"""
for input, output in zip(inputs, outputs):
prediction = {"image_id": input["image_id"]}
if "instances" in output:
instances = output["instances"].to(self._cpu_device)
prediction["instances"] = self.instances_to_json(instances, input["image_id"])
if "proposals" in output:
prediction["proposals"] = output["proposals"].to(self._cpu_device)
self._predictions.append(prediction)
def instances_to_json(self, instances, img_id):
num_instance = len(instances)
if num_instance == 0:
return []
boxes = instances.pred_boxes.tensor.numpy()
if boxes.shape[1] == 4:
boxes = BoxMode.convert(boxes, BoxMode.XYXY_ABS, BoxMode.XYWH_ABS)
boxes = boxes.tolist()
scores = instances.scores.tolist()
classes = instances.pred_classes.tolist()
results = []
for k in range(num_instance):
result = {
"image_id": img_id,
"category_id": classes[k],
"bbox": boxes[k],
"score": scores[k],
}
results.append(result)
return results
def _eval_predictions(self, tasks, predictions):
"""
Evaluate predictions on the given tasks.
Fill self._results with the metrics of the tasks.
"""
self._logger.info("Preparing results for COCO format ...")
coco_results = list(itertools.chain(*[x["instances"] for x in predictions]))
# unmap the category ids for COCO
if hasattr(self._metadata, "thing_dataset_id_to_contiguous_id"):
reverse_id_mapping = {
v: k for k, v in self._metadata.thing_dataset_id_to_contiguous_id.items()
}
for result in coco_results:
result["category_id"] = reverse_id_mapping[result["category_id"]]
if self._output_dir:
file_path = os.path.join(self._output_dir, "coco_instances_results.json")
self._logger.info("Saving results to {}".format(file_path))
with PathManager.open(file_path, "w") as f:
f.write(json.dumps(coco_results))
f.flush()
if not self._do_evaluation:
self._logger.info("Annotations are not available for evaluation.")
return
self._logger.info("Evaluating predictions ...")
for task in sorted(tasks):
assert task == "bbox", "Task {} is not supported".format(task)
coco_eval = (
self._evaluate_predictions_on_coco(self._coco_api, coco_results)
if len(coco_results) > 0
else None # cocoapi does not handle empty results very well
)
res = self._derive_coco_results(
coco_eval, task, class_names=self._metadata.get("thing_classes")
)
self._results[task] = res
def _evaluate_predictions_on_coco(self, coco_gt, coco_results):
"""
Evaluate the coco results using COCOEval API.
"""
assert len(coco_results) > 0
coco_dt = coco_gt.loadRes(coco_results)
# Only bbox is supported for now
coco_eval = RotatedCOCOeval(coco_gt, coco_dt, iouType="bbox")
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()
return coco_eval
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import itertools
import json
import logging
import numpy as np
import os
from collections import OrderedDict
import PIL.Image as Image
import pycocotools.mask as mask_util
import torch
from fvcore.common.file_io import PathManager
from detectron2.data import DatasetCatalog, MetadataCatalog
from detectron2.utils.comm import all_gather, is_main_process, synchronize
from .evaluator import DatasetEvaluator
class SemSegEvaluator(DatasetEvaluator):
"""
Evaluate semantic segmentation
"""
def __init__(self, dataset_name, distributed, num_classes, ignore_label=255, output_dir=None):
"""
Args:
dataset_name (str): name of the dataset to be evaluated.
distributed (True): if True, will collect results from all ranks for evaluation.
Otherwise, will evaluate the results in the current process.
num_classes (int): number of classes
ignore_label (int): value in semantic segmentation ground truth. Predictions for the
corresponding pixels should be ignored.
output_dir (str): an output directory to dump results.
"""
self._dataset_name = dataset_name
self._distributed = distributed
self._output_dir = output_dir
self._num_classes = num_classes
self._ignore_label = ignore_label
self._N = num_classes + 1
self._cpu_device = torch.device("cpu")
self._logger = logging.getLogger(__name__)
self.input_file_to_gt_file = {
dataset_record["file_name"]: dataset_record["sem_seg_file_name"]
for dataset_record in DatasetCatalog.get(dataset_name)
}
meta = MetadataCatalog.get(dataset_name)
# Dict that maps contiguous training ids to COCO category ids
try:
c2d = meta.stuff_dataset_id_to_contiguous_id
self._contiguous_id_to_dataset_id = {v: k for k, v in c2d.items()}
except AttributeError:
self._contiguous_id_to_dataset_id = None
self._class_names = meta.stuff_classes
def reset(self):
self._conf_matrix = np.zeros((self._N, self._N), dtype=np.int64)
self._predictions = []
def process(self, inputs, outputs):
"""
Args:
inputs: the inputs to a model.
It is a list of dicts. Each dict corresponds to an image and
contains keys like "height", "width", "file_name".
outputs: the outputs of a model. It is either list of semantic segmentation predictions
(Tensor [H, W]) or list of dicts with key "sem_seg" that contains semantic
segmentation prediction in the same format.
"""
for input, output in zip(inputs, outputs):
output = output["sem_seg"].argmax(dim=0).to(self._cpu_device)
pred = np.array(output, dtype=np.int)
with PathManager.open(self.input_file_to_gt_file[input["file_name"]], "rb") as f:
gt = np.array(Image.open(f), dtype=np.int)
gt[gt == self._ignore_label] = self._num_classes
self._conf_matrix += np.bincount(
self._N * pred.reshape(-1) + gt.reshape(-1), minlength=self._N ** 2
).reshape(self._N, self._N)
self._predictions.extend(self.encode_json_sem_seg(pred, input["file_name"]))
def evaluate(self):
"""
Evaluates standard semantic segmentation metrics (http://cocodataset.org/#stuff-eval):
* Mean intersection-over-union averaged across classes (mIoU)
* Frequency Weighted IoU (fwIoU)
* Mean pixel accuracy averaged across classes (mACC)
* Pixel Accuracy (pACC)
"""
if self._distributed:
synchronize()
conf_matrix_list = all_gather(self._conf_matrix)
self._predictions = all_gather(self._predictions)
self._predictions = list(itertools.chain(*self._predictions))
if not is_main_process():
return
self._conf_matrix = np.zeros_like(self._conf_matrix)
for conf_matrix in conf_matrix_list:
self._conf_matrix += conf_matrix
if self._output_dir:
PathManager.mkdirs(self._output_dir)
file_path = os.path.join(self._output_dir, "sem_seg_predictions.json")
with PathManager.open(file_path, "w") as f:
f.write(json.dumps(self._predictions))
acc = np.full(self._num_classes, np.nan, dtype=np.float)
iou = np.full(self._num_classes, np.nan, dtype=np.float)
tp = self._conf_matrix.diagonal()[:-1].astype(np.float)
pos_gt = np.sum(self._conf_matrix[:-1, :-1], axis=0).astype(np.float)
class_weights = pos_gt / np.sum(pos_gt)
pos_pred = np.sum(self._conf_matrix[:-1, :-1], axis=1).astype(np.float)
acc_valid = pos_gt > 0
acc[acc_valid] = tp[acc_valid] / pos_gt[acc_valid]
iou_valid = (pos_gt + pos_pred) > 0
union = pos_gt + pos_pred - tp
iou[acc_valid] = tp[acc_valid] / union[acc_valid]
macc = np.sum(acc[acc_valid]) / np.sum(acc_valid)
miou = np.sum(iou[acc_valid]) / np.sum(iou_valid)
fiou = np.sum(iou[acc_valid] * class_weights[acc_valid])
pacc = np.sum(tp) / np.sum(pos_gt)
res = {}
res["mIoU"] = 100 * miou
res["fwIoU"] = 100 * fiou
for i, name in enumerate(self._class_names):
res["IoU-{}".format(name)] = 100 * iou[i]
res["mACC"] = 100 * macc
res["pACC"] = 100 * pacc
for i, name in enumerate(self._class_names):
res["ACC-{}".format(name)] = 100 * acc[i]
if self._output_dir:
file_path = os.path.join(self._output_dir, "sem_seg_evaluation.pth")
with PathManager.open(file_path, "wb") as f:
torch.save(res, f)
results = OrderedDict({"sem_seg": res})
self._logger.info(results)
return results
def encode_json_sem_seg(self, sem_seg, input_file_name):
"""
Convert semantic segmentation to COCO stuff format with segments encoded as RLEs.
See http://cocodataset.org/#format-results
"""
json_list = []
for label in np.unique(sem_seg):
if self._contiguous_id_to_dataset_id is not None:
assert (
label in self._contiguous_id_to_dataset_id
), "Label {} is not in the metadata info for {}".format(label, self._dataset_name)
dataset_id = self._contiguous_id_to_dataset_id[label]
else:
dataset_id = int(label)
mask = (sem_seg == label).astype(np.uint8)
mask_rle = mask_util.encode(np.array(mask[:, :, None], order="F"))[0]
mask_rle["counts"] = mask_rle["counts"].decode("utf-8")
json_list.append(
{"file_name": input_file_name, "category_id": dataset_id, "segmentation": mask_rle}
)
return json_list
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging
import numpy as np
import pprint
import sys
from collections import OrderedDict
from collections.abc import Mapping
def print_csv_format(results):
"""
Print main metrics in a format similar to Detectron,
so that they are easy to copypaste into a spreadsheet.
Args:
results (OrderedDict[dict]): task_name -> {metric -> score}
"""
assert isinstance(results, OrderedDict), results # unordered results cannot be properly printed
logger = logging.getLogger(__name__)
for task, res in results.items():
# Don't print "AP-category" metrics since they are usually not tracked.
important_res = [(k, v) for k, v in res.items() if "-" not in k]
logger.info("copypaste: Task: {}".format(task))
logger.info("copypaste: " + ",".join([k[0] for k in important_res]))
logger.info("copypaste: " + ",".join(["{0:.4f}".format(k[1]) for k in important_res]))
def verify_results(cfg, results):
"""
Args:
results (OrderedDict[dict]): task_name -> {metric -> score}
Returns:
bool: whether the verification succeeds or not
"""
expected_results = cfg.TEST.EXPECTED_RESULTS
if not len(expected_results):
return True
ok = True
for task, metric, expected, tolerance in expected_results:
actual = results[task][metric]
if not np.isfinite(actual):
ok = False
diff = abs(actual - expected)
if diff > tolerance:
ok = False
logger = logging.getLogger(__name__)
if not ok:
logger.error("Result verification failed!")
logger.error("Expected Results: " + str(expected_results))
logger.error("Actual Results: " + pprint.pformat(results))
sys.exit(1)
else:
logger.info("Results verification passed.")
return ok
def flatten_results_dict(results):
"""
Expand a hierarchical dict of scalars into a flat dict of scalars.
If results[k1][k2][k3] = v, the returned dict will have the entry
{"k1/k2/k3": v}.
Args:
results (dict):
"""
r = {}
for k, v in results.items():
if isinstance(v, Mapping):
v = flatten_results_dict(v)
for kk, vv in v.items():
r[k + "/" + kk] = vv
else:
r[k] = v
return r
This directory contains code to prepare a detectron2 model for deployment.
Currently it supports exporting a detectron2 model to Caffe2 format through ONNX.
Please see [documentation](https://detectron2.readthedocs.io/tutorials/deployment.html) for its usage.
### Acknowledgements
Thanks to Mobile Vision team at Facebook for developing the conversion tools.
# -*- coding: utf-8 -*-
from .api import *
__all__ = [k for k in globals().keys() if not k.startswith("_")]
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
import copy
import logging
import os
import torch
from caffe2.proto import caffe2_pb2
from torch import nn
from detectron2.config import CfgNode as CN
from .caffe2_export import export_caffe2_detection_model
from .caffe2_export import export_onnx_model as export_onnx_model_impl
from .caffe2_export import run_and_save_graph
from .caffe2_inference import ProtobufDetectionModel
from .caffe2_modeling import META_ARCH_CAFFE2_EXPORT_TYPE_MAP, convert_batched_inputs_to_c2_format
from .shared import get_pb_arg_vali, get_pb_arg_vals, save_graph
__all__ = [
"add_export_config",
"export_caffe2_model",
"Caffe2Model",
"export_onnx_model",
"Caffe2Tracer",
]
def add_export_config(cfg):
"""
Args:
cfg (CfgNode): a detectron2 config
Returns:
CfgNode: an updated config with new options that will be used
by :class:`Caffe2Tracer`.
"""
is_frozen = cfg.is_frozen()
cfg.defrost()
cfg.EXPORT_CAFFE2 = CN()
cfg.EXPORT_CAFFE2.USE_HEATMAP_MAX_KEYPOINT = False
if is_frozen:
cfg.freeze()
return cfg
class Caffe2Tracer:
"""
Make a detectron2 model traceable with caffe2 style.
An original detectron2 model may not be traceable, or
cannot be deployed directly after being traced, due to some reasons:
1. control flow in some ops
2. custom ops
3. complicated pre/post processing
This class provides a traceable version of a detectron2 model by:
1. Rewrite parts of the model using ops in caffe2. Note that some ops do
not have GPU implementation.
2. Define the inputs "after pre-processing" as inputs to the model
3. Remove post-processing and produce raw layer outputs
More specifically about inputs: all builtin models take two input tensors.
(1) NCHW float "data" which is an image (usually in [0, 255])
(2) Nx3 float "im_info", each row of which is (height, width, 1.0)
After making a traceable model, the class provide methods to export such a
model to different deployment formats.
The class currently only supports models using builtin meta architectures.
"""
def __init__(self, cfg, model, inputs):
"""
Args:
cfg (CfgNode): a detectron2 config, with extra export-related options
added by :func:`add_export_config`.
model (nn.Module): a model built by
:func:`detectron2.modeling.build_model`.
inputs: sample inputs that the given model takes for inference.
Will be used to trace the model.
"""
assert isinstance(cfg, CN), cfg
assert isinstance(model, torch.nn.Module), type(model)
if "EXPORT_CAFFE2" not in cfg:
cfg = add_export_config(cfg) # will just the defaults
self.cfg = cfg
self.model = model
self.inputs = inputs
def _get_traceable(self):
# TODO how to make it extensible to support custom models
C2MetaArch = META_ARCH_CAFFE2_EXPORT_TYPE_MAP[self.cfg.MODEL.META_ARCHITECTURE]
traceable_model = C2MetaArch(self.cfg, copy.deepcopy(self.model))
traceable_inputs = traceable_model.get_caffe2_inputs(self.inputs)
return traceable_model, traceable_inputs
def export_caffe2(self):
"""
Export the model to Caffe2's protobuf format.
The returned object can be saved with `.save_protobuf()` method.
The result can be loaded and executed using Caffe2 runtime.
Returns:
Caffe2Model
"""
model, inputs = self._get_traceable()
predict_net, init_net = export_caffe2_detection_model(model, inputs)
return Caffe2Model(predict_net, init_net)
def export_onnx(self):
"""
Export the model to ONNX format.
Note that the exported model contains custom ops only available in caffe2, therefore it
cannot be directly executed by other runtime. Post-processing or transformation passes
may be applied on the model to accommodate different runtimes.
Returns:
onnx.ModelProto: an onnx model.
"""
model, inputs = self._get_traceable()
return export_onnx_model_impl(model, (inputs,))
def export_torchscript(self):
"""
Export the model to a `torch.jit.TracedModule` by tracing.
The returned object can be saved to a file by ".save()".
Returns:
torch.jit.TracedModule: a torch TracedModule
"""
model, inputs = self._get_traceable()
logger = logging.getLogger(__name__)
logger.info("Tracing the model with torch.jit.trace ...")
with torch.no_grad():
return torch.jit.trace(model, (inputs,), optimize=True)
def export_caffe2_model(cfg, model, inputs):
"""
Export a detectron2 model to caffe2 format.
Args:
cfg (CfgNode): a detectron2 config, with extra export-related options
added by :func:`add_export_config`.
model (nn.Module): a model built by
:func:`detectron2.modeling.build_model`.
It will be modified by this function.
inputs: sample inputs that the given model takes for inference.
Will be used to trace the model.
Returns:
Caffe2Model
"""
return Caffe2Tracer(cfg, model, inputs).export_caffe2()
def export_onnx_model(cfg, model, inputs):
"""
Export a detectron2 model to ONNX format.
Note that the exported model contains custom ops only available in caffe2, therefore it
cannot be directly executed by other runtime. Post-processing or transformation passes
may be applied on the model to accommodate different runtimes.
Args:
cfg (CfgNode): a detectron2 config, with extra export-related options
added by :func:`add_export_config`.
model (nn.Module): a model built by
:func:`detectron2.modeling.build_model`.
It will be modified by this function.
inputs: sample inputs that the given model takes for inference.
Will be used to trace the model.
Returns:
onnx.ModelProto: an onnx model.
"""
return Caffe2Tracer(cfg, model, inputs).export_onnx()
class Caffe2Model(nn.Module):
"""
A wrapper around the traced model in caffe2's pb format.
"""
def __init__(self, predict_net, init_net):
super().__init__()
self.eval() # always in eval mode
self._predict_net = predict_net
self._init_net = init_net
self._predictor = None
@property
def predict_net(self):
"""
Returns:
core.Net: the underlying caffe2 predict net
"""
return self._predict_net
@property
def init_net(self):
"""
Returns:
core.Net: the underlying caffe2 init net
"""
return self._init_net
__init__.__HIDE_SPHINX_DOC__ = True
def save_protobuf(self, output_dir):
"""
Save the model as caffe2's protobuf format.
Args:
output_dir (str): the output directory to save protobuf files.
"""
logger = logging.getLogger(__name__)
logger.info("Saving model to {} ...".format(output_dir))
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "model.pb"), "wb") as f:
f.write(self._predict_net.SerializeToString())
with open(os.path.join(output_dir, "model.pbtxt"), "w") as f:
f.write(str(self._predict_net))
with open(os.path.join(output_dir, "model_init.pb"), "wb") as f:
f.write(self._init_net.SerializeToString())
def save_graph(self, output_file, inputs=None):
"""
Save the graph as SVG format.
Args:
output_file (str): a SVG file
inputs: optional inputs given to the model.
If given, the inputs will be used to run the graph to record
shape of every tensor. The shape information will be
saved together with the graph.
"""
if inputs is None:
save_graph(self._predict_net, output_file, op_only=False)
else:
size_divisibility = get_pb_arg_vali(self._predict_net, "size_divisibility", 0)
device = get_pb_arg_vals(self._predict_net, "device", b"cpu").decode("ascii")
inputs = convert_batched_inputs_to_c2_format(inputs, size_divisibility, device)
inputs = [x.cpu().numpy() for x in inputs]
run_and_save_graph(self._predict_net, self._init_net, inputs, output_file)
@staticmethod
def load_protobuf(dir):
"""
Args:
dir (str): a directory used to save Caffe2Model with
:meth:`save_protobuf`.
The files "model.pb" and "model_init.pb" are needed.
Returns:
Caffe2Model: the caffe2 model loaded from this directory.
"""
predict_net = caffe2_pb2.NetDef()
with open(os.path.join(dir, "model.pb"), "rb") as f:
predict_net.ParseFromString(f.read())
init_net = caffe2_pb2.NetDef()
with open(os.path.join(dir, "model_init.pb"), "rb") as f:
init_net.ParseFromString(f.read())
return Caffe2Model(predict_net, init_net)
def __call__(self, inputs):
"""
An interface that wraps around a caffe2 model and mimics detectron2's models'
input & output format. This is used to compare the outputs of caffe2 model
with its original torch model.
Due to the extra conversion between torch/caffe2,
this method is not meant for benchmark.
"""
if self._predictor is None:
self._predictor = ProtobufDetectionModel(self._predict_net, self._init_net)
return self._predictor(inputs)
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
import math
import torch
import torch.nn.functional as F
from detectron2.layers import cat
from detectron2.layers.roi_align_rotated import ROIAlignRotated
from detectron2.modeling import poolers
from detectron2.modeling.proposal_generator import rpn
from detectron2.modeling.roi_heads.mask_head import mask_rcnn_inference
from detectron2.structures import Boxes, ImageList, Instances, Keypoints
from .shared import alias, to_device
"""
This file contains caffe2-compatible implementation of several detectrno2 components.
"""
class Caffe2Boxes(Boxes):
"""
Representing a list of detectron2.structures.Boxes from minibatch, each box
is represented by a 5d vector (batch index + 4 coordinates), or a 6d vector
(batch index + 5 coordinates) for RotatedBoxes.
"""
def __init__(self, tensor):
assert isinstance(tensor, torch.Tensor)
assert tensor.dim() == 2 and tensor.size(-1) in [4, 5, 6], tensor.size()
# TODO: make tensor immutable when dim is Nx5 for Boxes,
# and Nx6 for RotatedBoxes?
self.tensor = tensor
# TODO clean up this class, maybe just extend Instances
class InstancesList(object):
"""
Tensor representation of a list of Instances object for a batch of images.
When dealing with a batch of images with Caffe2 ops, a list of bboxes
(instances) are usually represented by single Tensor with size
(sigma(Ni), 5) or (sigma(Ni), 4) plus a batch split Tensor. This class is
for providing common functions to convert between these two representations.
"""
def __init__(self, im_info, indices, extra_fields=None):
# [N, 3] -> (H, W, Scale)
self.im_info = im_info
# [N,] -> indice of batch to which the instance belongs
self.indices = indices
# [N, ...]
self.batch_extra_fields = extra_fields or {}
self.image_size = self.im_info
def get_fields(self):
""" like `get_fields` in the Instances object,
but return each field in tensor representations """
ret = {}
for k, v in self.batch_extra_fields.items():
# if isinstance(v, torch.Tensor):
# tensor_rep = v
# elif isinstance(v, (Boxes, Keypoints)):
# tensor_rep = v.tensor
# else:
# raise ValueError("Can't find tensor representation for: {}".format())
ret[k] = v
return ret
def has(self, name):
return name in self.batch_extra_fields
def set(self, name, value):
data_len = len(value)
if len(self.batch_extra_fields):
assert (
len(self) == data_len
), "Adding a field of length {} to a Instances of length {}".format(data_len, len(self))
self.batch_extra_fields[name] = value
def __setattr__(self, name, val):
if name in ["im_info", "indices", "batch_extra_fields", "image_size"]:
super().__setattr__(name, val)
else:
self.set(name, val)
def __getattr__(self, name):
if name not in self.batch_extra_fields:
raise AttributeError("Cannot find field '{}' in the given Instances!".format(name))
return self.batch_extra_fields[name]
def __len__(self):
return len(self.indices)
def flatten(self):
ret = []
for _, v in self.batch_extra_fields.items():
if isinstance(v, (Boxes, Keypoints)):
ret.append(v.tensor)
else:
ret.append(v)
return ret
@staticmethod
def to_d2_instances_list(instances_list):
"""
Convert InstancesList to List[Instances]. The input `instances_list` can
also be a List[Instances], in this case this method is a non-op.
"""
if not isinstance(instances_list, InstancesList):
assert all(isinstance(x, Instances) for x in instances_list)
return instances_list
ret = []
for i, info in enumerate(instances_list.im_info):
instances = Instances(torch.Size([int(info[0].item()), int(info[1].item())]))
ids = instances_list.indices == i
for k, v in instances_list.batch_extra_fields.items():
if isinstance(v, torch.Tensor):
instances.set(k, v[ids])
continue
elif isinstance(v, Boxes):
instances.set(k, v[ids, -4:])
continue
target_type, tensor_source = v
assert isinstance(tensor_source, torch.Tensor)
assert tensor_source.shape[0] == instances_list.indices.shape[0]
tensor_source = tensor_source[ids]
if issubclass(target_type, Boxes):
instances.set(k, Boxes(tensor_source[:, -4:]))
elif issubclass(target_type, Keypoints):
instances.set(k, Keypoints(tensor_source))
elif issubclass(target_type, torch.Tensor):
instances.set(k, tensor_source)
else:
raise ValueError("Can't handle targe type: {}".format(target_type))
ret.append(instances)
return ret
class Caffe2Compatible(object):
def _get_tensor_mode(self):
return self._tensor_mode
def _set_tensor_mode(self, v):
self._tensor_mode = v
tensor_mode = property(_get_tensor_mode, _set_tensor_mode)
"""
If true, the model expects C2-style tensor only inputs/outputs format.
"""
class Caffe2RPN(Caffe2Compatible, rpn.RPN):
def forward(self, images, features, gt_instances=None):
assert not self.training
features = [features[f] for f in self.in_features]
objectness_logits_pred, anchor_deltas_pred = self.rpn_head(features)
assert isinstance(images, ImageList)
if self.tensor_mode:
im_info = images.image_sizes
else:
im_info = torch.Tensor(
[[im_sz[0], im_sz[1], torch.Tensor([1.0])] for im_sz in images.image_sizes]
).to(images.tensor.device)
assert isinstance(im_info, torch.Tensor)
rpn_rois_list = []
rpn_roi_probs_list = []
for scores, bbox_deltas, cell_anchors_tensor, feat_stride in zip(
objectness_logits_pred,
anchor_deltas_pred,
iter(self.anchor_generator.cell_anchors),
self.anchor_generator.strides,
):
scores = scores.detach()
bbox_deltas = bbox_deltas.detach()
rpn_rois, rpn_roi_probs = torch.ops._caffe2.GenerateProposals(
scores,
bbox_deltas,
im_info,
cell_anchors_tensor,
spatial_scale=1.0 / feat_stride,
pre_nms_topN=self.pre_nms_topk[self.training],
post_nms_topN=self.post_nms_topk[self.training],
nms_thresh=self.nms_thresh,
min_size=self.min_box_side_len,
# correct_transform_coords=True, # deprecated argument
angle_bound_on=True, # Default
angle_bound_lo=-180,
angle_bound_hi=180,
clip_angle_thresh=1.0, # Default
legacy_plus_one=False,
)
rpn_rois_list.append(rpn_rois)
rpn_roi_probs_list.append(rpn_roi_probs)
# For FPN in D2, in RPN all proposals from different levels are concated
# together, ranked and picked by top post_nms_topk. Then in ROIPooler
# it calculates level_assignments and calls the RoIAlign from
# the corresponding level.
if len(objectness_logits_pred) == 1:
rpn_rois = rpn_rois_list[0]
rpn_roi_probs = rpn_roi_probs_list[0]
else:
assert len(rpn_rois_list) == len(rpn_roi_probs_list)
rpn_post_nms_topN = self.post_nms_topk[self.training]
device = rpn_rois_list[0].device
input_list = [to_device(x, "cpu") for x in (rpn_rois_list + rpn_roi_probs_list)]
# TODO remove this after confirming rpn_max_level/rpn_min_level
# is not needed in CollectRpnProposals.
feature_strides = list(self.anchor_generator.strides)
rpn_min_level = int(math.log2(feature_strides[0]))
rpn_max_level = int(math.log2(feature_strides[-1]))
assert (rpn_max_level - rpn_min_level + 1) == len(
rpn_rois_list
), "CollectRpnProposals requires continuous levels"
rpn_rois = torch.ops._caffe2.CollectRpnProposals(
input_list,
# NOTE: in current implementation, rpn_max_level and rpn_min_level
# are not needed, only the subtraction of two matters and it
# can be infer from the number of inputs. Keep them now for
# consistency.
rpn_max_level=2 + len(rpn_rois_list) - 1,
rpn_min_level=2,
rpn_post_nms_topN=rpn_post_nms_topN,
)
rpn_rois = to_device(rpn_rois, device)
rpn_roi_probs = []
proposals = self.c2_postprocess(im_info, rpn_rois, rpn_roi_probs, self.tensor_mode)
return proposals, {}
@staticmethod
def c2_postprocess(im_info, rpn_rois, rpn_roi_probs, tensor_mode):
proposals = InstancesList(
im_info=im_info,
indices=rpn_rois[:, 0],
extra_fields={
"proposal_boxes": Caffe2Boxes(rpn_rois),
"objectness_logits": (torch.Tensor, rpn_roi_probs),
},
)
if not tensor_mode:
proposals = InstancesList.to_d2_instances_list(proposals)
else:
proposals = [proposals]
return proposals
class Caffe2ROIPooler(Caffe2Compatible, poolers.ROIPooler):
@staticmethod
def c2_preprocess(box_lists):
assert all(isinstance(x, Boxes) for x in box_lists)
if all(isinstance(x, Caffe2Boxes) for x in box_lists):
# input is pure-tensor based
assert len(box_lists) == 1
pooler_fmt_boxes = box_lists[0].tensor
else:
pooler_fmt_boxes = poolers.convert_boxes_to_pooler_format(box_lists)
return pooler_fmt_boxes
def forward(self, x, box_lists):
assert not self.training
pooler_fmt_boxes = self.c2_preprocess(box_lists)
num_level_assignments = len(self.level_poolers)
if num_level_assignments == 1:
if isinstance(self.level_poolers[0], ROIAlignRotated):
c2_roi_align = torch.ops._caffe2.RoIAlignRotated
aligned = True
else:
c2_roi_align = torch.ops._caffe2.RoIAlign
aligned = self.level_poolers[0].aligned
out = c2_roi_align(
x[0],
pooler_fmt_boxes,
order="NCHW",
spatial_scale=float(self.level_poolers[0].spatial_scale),
pooled_h=int(self.output_size[0]),
pooled_w=int(self.output_size[1]),
sampling_ratio=int(self.level_poolers[0].sampling_ratio),
aligned=aligned,
)
return out
device = pooler_fmt_boxes.device
assert (
self.max_level - self.min_level + 1 == 4
), "Currently DistributeFpnProposals only support 4 levels"
fpn_outputs = torch.ops._caffe2.DistributeFpnProposals(
to_device(pooler_fmt_boxes, "cpu"),
roi_canonical_scale=self.canonical_box_size,
roi_canonical_level=self.canonical_level,
roi_max_level=self.max_level,
roi_min_level=self.min_level,
legacy_plus_one=False,
)
fpn_outputs = [to_device(x, device) for x in fpn_outputs]
rois_fpn_list = fpn_outputs[:-1]
rois_idx_restore_int32 = fpn_outputs[-1]
roi_feat_fpn_list = []
for roi_fpn, x_level, pooler in zip(rois_fpn_list, x, self.level_poolers):
if isinstance(pooler, ROIAlignRotated):
c2_roi_align = torch.ops._caffe2.RoIAlignRotated
aligned = True
else:
c2_roi_align = torch.ops._caffe2.RoIAlign
aligned = bool(pooler.aligned)
roi_feat_fpn = c2_roi_align(
x_level,
roi_fpn,
order="NCHW",
spatial_scale=float(pooler.spatial_scale),
pooled_h=int(self.output_size[0]),
pooled_w=int(self.output_size[1]),
sampling_ratio=int(pooler.sampling_ratio),
aligned=aligned,
)
roi_feat_fpn_list.append(roi_feat_fpn)
roi_feat_shuffled = cat(roi_feat_fpn_list, dim=0)
roi_feat = torch.ops._caffe2.BatchPermutation(roi_feat_shuffled, rois_idx_restore_int32)
return roi_feat
class Caffe2FastRCNNOutputsInference:
def __init__(self, tensor_mode):
self.tensor_mode = tensor_mode # whether the output is caffe2 tensor mode
def __call__(self, box_predictor, predictions, proposals):
""" equivalent to FastRCNNOutputLayers.inference """
score_thresh = box_predictor.test_score_thresh
nms_thresh = box_predictor.test_nms_thresh
topk_per_image = box_predictor.test_topk_per_image
is_rotated = len(box_predictor.box2box_transform.weights) == 5
if is_rotated:
box_dim = 5
assert box_predictor.box2box_transform.weights[4] == 1, (
"The weights for Rotated BBoxTransform in C2 have only 4 dimensions,"
+ " thus enforcing the angle weight to be 1 for now"
)
box2box_transform_weights = box_predictor.box2box_transform.weights[:4]
else:
box_dim = 4
box2box_transform_weights = box_predictor.box2box_transform.weights
class_logits, box_regression = predictions
class_prob = F.softmax(class_logits, -1)
assert box_regression.shape[1] % box_dim == 0
cls_agnostic_bbox_reg = box_regression.shape[1] // box_dim == 1
input_tensor_mode = proposals[0].proposal_boxes.tensor.shape[1] == box_dim + 1
rois = type(proposals[0].proposal_boxes).cat([p.proposal_boxes for p in proposals])
device, dtype = rois.tensor.device, rois.tensor.dtype
if input_tensor_mode:
im_info = proposals[0].image_size
rois = rois.tensor
else:
im_info = torch.Tensor(
[[sz[0], sz[1], 1.0] for sz in [x.image_size for x in proposals]]
)
batch_ids = cat(
[
torch.full((b, 1), i, dtype=dtype, device=device)
for i, b in enumerate(len(p) for p in proposals)
],
dim=0,
)
rois = torch.cat([batch_ids, rois.tensor], dim=1)
roi_pred_bbox, roi_batch_splits = torch.ops._caffe2.BBoxTransform(
to_device(rois, "cpu"),
to_device(box_regression, "cpu"),
to_device(im_info, "cpu"),
weights=box2box_transform_weights,
apply_scale=True,
rotated=is_rotated,
angle_bound_on=True,
angle_bound_lo=-180,
angle_bound_hi=180,
clip_angle_thresh=1.0,
legacy_plus_one=False,
)
roi_pred_bbox = to_device(roi_pred_bbox, device)
roi_batch_splits = to_device(roi_batch_splits, device)
nms_outputs = torch.ops._caffe2.BoxWithNMSLimit(
to_device(class_prob, "cpu"),
to_device(roi_pred_bbox, "cpu"),
to_device(roi_batch_splits, "cpu"),
score_thresh=float(score_thresh),
nms=float(nms_thresh),
detections_per_im=int(topk_per_image),
soft_nms_enabled=False,
soft_nms_method="linear",
soft_nms_sigma=0.5,
soft_nms_min_score_thres=0.001,
rotated=is_rotated,
cls_agnostic_bbox_reg=cls_agnostic_bbox_reg,
input_boxes_include_bg_cls=False,
output_classes_include_bg_cls=False,
legacy_plus_one=False,
)
roi_score_nms = to_device(nms_outputs[0], device)
roi_bbox_nms = to_device(nms_outputs[1], device)
roi_class_nms = to_device(nms_outputs[2], device)
roi_batch_splits_nms = to_device(nms_outputs[3], device)
roi_keeps_nms = to_device(nms_outputs[4], device)
roi_keeps_size_nms = to_device(nms_outputs[5], device)
if not self.tensor_mode:
roi_class_nms = roi_class_nms.to(torch.int64)
roi_batch_ids = cat(
[
torch.full((b, 1), i, dtype=dtype, device=device)
for i, b in enumerate(int(x.item()) for x in roi_batch_splits_nms)
],
dim=0,
)
roi_class_nms = alias(roi_class_nms, "class_nms")
roi_score_nms = alias(roi_score_nms, "score_nms")
roi_bbox_nms = alias(roi_bbox_nms, "bbox_nms")
roi_batch_splits_nms = alias(roi_batch_splits_nms, "batch_splits_nms")
roi_keeps_nms = alias(roi_keeps_nms, "keeps_nms")
roi_keeps_size_nms = alias(roi_keeps_size_nms, "keeps_size_nms")
results = InstancesList(
im_info=im_info,
indices=roi_batch_ids[:, 0],
extra_fields={
"pred_boxes": Caffe2Boxes(roi_bbox_nms),
"scores": roi_score_nms,
"pred_classes": roi_class_nms,
},
)
if not self.tensor_mode:
results = InstancesList.to_d2_instances_list(results)
batch_splits = roi_batch_splits_nms.int().tolist()
kept_indices = list(roi_keeps_nms.to(torch.int64).split(batch_splits))
else:
results = [results]
kept_indices = [roi_keeps_nms]
return results, kept_indices
class Caffe2MaskRCNNInference:
def __call__(self, pred_mask_logits, pred_instances):
""" equivalent to mask_head.mask_rcnn_inference """
if all(isinstance(x, InstancesList) for x in pred_instances):
assert len(pred_instances) == 1
mask_probs_pred = pred_mask_logits.sigmoid()
mask_probs_pred = alias(mask_probs_pred, "mask_fcn_probs")
pred_instances[0].pred_masks = mask_probs_pred
else:
mask_rcnn_inference(pred_mask_logits, pred_instances)
class Caffe2KeypointRCNNInference:
def __init__(self, use_heatmap_max_keypoint):
self.use_heatmap_max_keypoint = use_heatmap_max_keypoint
def __call__(self, pred_keypoint_logits, pred_instances):
# just return the keypoint heatmap for now,
# there will be option to call HeatmapMaxKeypointOp
output = alias(pred_keypoint_logits, "kps_score")
if all(isinstance(x, InstancesList) for x in pred_instances):
assert len(pred_instances) == 1
if self.use_heatmap_max_keypoint:
device = output.device
output = torch.ops._caffe2.HeatmapMaxKeypoint(
to_device(output, "cpu"),
pred_instances[0].pred_boxes.tensor,
should_output_softmax=True, # worth make it configerable?
)
output = to_device(output, device)
output = alias(output, "keypoints_out")
pred_instances[0].pred_keypoints = output
return pred_keypoint_logits
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import copy
import io
import logging
import numpy as np
from typing import List
import onnx
import torch
from caffe2.proto import caffe2_pb2
from caffe2.python import core
from caffe2.python.onnx.backend import Caffe2Backend
from tabulate import tabulate
from termcolor import colored
from torch.onnx import OperatorExportTypes
from .shared import (
ScopedWS,
construct_init_net_from_params,
fuse_alias_placeholder,
fuse_copy_between_cpu_and_gpu,
get_params_from_init_net,
group_norm_replace_aten_with_caffe2,
infer_device_type,
remove_dead_end_ops,
remove_reshape_for_fc,
save_graph,
)
logger = logging.getLogger(__name__)
def export_onnx_model(model, inputs):
"""
Trace and export a model to onnx format.
Args:
model (nn.Module):
inputs (tuple[args]): the model will be called by `model(*inputs)`
Returns:
an onnx model
"""
assert isinstance(model, torch.nn.Module)
# make sure all modules are in eval mode, onnx may change the training state
# of the module if the states are not consistent
def _check_eval(module):
assert not module.training
model.apply(_check_eval)
# Export the model to ONNX
with torch.no_grad():
with io.BytesIO() as f:
torch.onnx.export(
model,
inputs,
f,
operator_export_type=OperatorExportTypes.ONNX_ATEN_FALLBACK,
# verbose=True, # NOTE: uncomment this for debugging
# export_params=True,
)
onnx_model = onnx.load_from_string(f.getvalue())
# Apply ONNX's Optimization
all_passes = onnx.optimizer.get_available_passes()
passes = ["fuse_bn_into_conv"]
assert all(p in all_passes for p in passes)
onnx_model = onnx.optimizer.optimize(onnx_model, passes)
return onnx_model
def _op_stats(net_def):
type_count = {}
for t in [op.type for op in net_def.op]:
type_count[t] = type_count.get(t, 0) + 1
type_count_list = sorted(type_count.items(), key=lambda kv: kv[0]) # alphabet
type_count_list = sorted(type_count_list, key=lambda kv: -kv[1]) # count
return "\n".join("{:>4}x {}".format(count, name) for name, count in type_count_list)
def _assign_device_option(
predict_net: caffe2_pb2.NetDef, init_net: caffe2_pb2.NetDef, tensor_inputs: List[torch.Tensor]
):
"""
ONNX exported network doesn't have concept of device, assign necessary
device option for each op in order to make it runable on GPU runtime.
"""
def _get_device_type(torch_tensor):
assert torch_tensor.device.type in ["cpu", "cuda"]
assert torch_tensor.device.index == 0
return torch_tensor.device.type
def _assign_op_device_option(net_proto, net_ssa, blob_device_types):
for op, ssa_i in zip(net_proto.op, net_ssa):
if op.type in ["CopyCPUToGPU", "CopyGPUToCPU"]:
op.device_option.CopyFrom(core.DeviceOption(caffe2_pb2.CUDA, 0))
else:
devices = [blob_device_types[b] for b in ssa_i[0] + ssa_i[1]]
assert all(d == devices[0] for d in devices)
if devices[0] == "cuda":
op.device_option.CopyFrom(core.DeviceOption(caffe2_pb2.CUDA, 0))
# update ops in predict_net
predict_net_input_device_types = {
(name, 0): _get_device_type(tensor)
for name, tensor in zip(predict_net.external_input, tensor_inputs)
}
predict_net_device_types = infer_device_type(
predict_net, known_status=predict_net_input_device_types, device_name_style="pytorch"
)
predict_net_ssa, _ = core.get_ssa(predict_net)
_assign_op_device_option(predict_net, predict_net_ssa, predict_net_device_types)
# update ops in init_net
init_net_ssa, versions = core.get_ssa(init_net)
init_net_output_device_types = {
(name, versions[name]): predict_net_device_types[(name, 0)]
for name in init_net.external_output
}
init_net_device_types = infer_device_type(
init_net, known_status=init_net_output_device_types, device_name_style="pytorch"
)
_assign_op_device_option(init_net, init_net_ssa, init_net_device_types)
def export_caffe2_detection_model(model: torch.nn.Module, tensor_inputs: List[torch.Tensor]):
"""
Export a caffe2-compatible Detectron2 model to caffe2 format via ONNX.
Arg:
model: a caffe2-compatible version of detectron2 model, defined in caffe2_modeling.py
tensor_inputs: a list of tensors that caffe2 model takes as input.
"""
model = copy.deepcopy(model)
assert isinstance(model, torch.nn.Module)
assert hasattr(model, "encode_additional_info")
# Export via ONNX
logger.info("Exporting a {} model via ONNX ...".format(type(model).__name__))
onnx_model = export_onnx_model(model, (tensor_inputs,))
# Convert ONNX model to Caffe2 protobuf
init_net, predict_net = Caffe2Backend.onnx_graph_to_caffe2_net(onnx_model)
ops_table = [[op.type, op.input, op.output] for op in predict_net.op]
table = tabulate(ops_table, headers=["type", "input", "output"], tablefmt="pipe")
logger.info(
"ONNX export Done. Exported predict_net (before optimizations):\n" + colored(table, "cyan")
)
# Apply protobuf optimization
fuse_alias_placeholder(predict_net, init_net)
if any(t.device.type != "cpu" for t in tensor_inputs):
fuse_copy_between_cpu_and_gpu(predict_net)
remove_dead_end_ops(init_net)
_assign_device_option(predict_net, init_net, tensor_inputs)
params, device_options = get_params_from_init_net(init_net)
predict_net, params = remove_reshape_for_fc(predict_net, params)
init_net = construct_init_net_from_params(params, device_options)
group_norm_replace_aten_with_caffe2(predict_net)
# Record necessary information for running the pb model in Detectron2 system.
model.encode_additional_info(predict_net, init_net)
logger.info("Operators used in predict_net: \n{}".format(_op_stats(predict_net)))
logger.info("Operators used in init_net: \n{}".format(_op_stats(init_net)))
return predict_net, init_net
def run_and_save_graph(predict_net, init_net, tensor_inputs, graph_save_path):
"""
Run the caffe2 model on given inputs, recording the shape and draw the graph.
predict_net/init_net: caffe2 model.
tensor_inputs: a list of tensors that caffe2 model takes as input.
graph_save_path: path for saving graph of exported model.
"""
logger.info("Saving graph of ONNX exported model to {} ...".format(graph_save_path))
save_graph(predict_net, graph_save_path, op_only=False)
# Run the exported Caffe2 net
logger.info("Running ONNX exported model ...")
with ScopedWS("__ws_tmp__", True) as ws:
ws.RunNetOnce(init_net)
initialized_blobs = set(ws.Blobs())
uninitialized = [inp for inp in predict_net.external_input if inp not in initialized_blobs]
for name, blob in zip(uninitialized, tensor_inputs):
ws.FeedBlob(name, blob)
try:
ws.RunNetOnce(predict_net)
except RuntimeError as e:
logger.warning("Encountered RuntimeError: \n{}".format(str(e)))
ws_blobs = {b: ws.FetchBlob(b) for b in ws.Blobs()}
blob_sizes = {b: ws_blobs[b].shape for b in ws_blobs if isinstance(ws_blobs[b], np.ndarray)}
logger.info("Saving graph with blob shapes to {} ...".format(graph_save_path))
save_graph(predict_net, graph_save_path, op_only=False, blob_sizes=blob_sizes)
return ws_blobs
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import collections
import logging
import numpy as np
import torch
from caffe2.proto import caffe2_pb2
from caffe2.python import core
from .caffe2_modeling import META_ARCH_CAFFE2_EXPORT_TYPE_MAP, convert_batched_inputs_to_c2_format
from .shared import ScopedWS, get_pb_arg_vali, get_pb_arg_vals, infer_device_type
logger = logging.getLogger(__name__)
class ProtobufModel(torch.nn.Module):
"""
A class works just like nn.Module in terms of inference, but running
caffe2 model under the hood. Input/Output are Dict[str, tensor] whose keys
are in external_input/output.
"""
def __init__(self, predict_net, init_net):
logger.info("Initializing ProtobufModel ...")
super().__init__()
assert isinstance(predict_net, caffe2_pb2.NetDef)
assert isinstance(init_net, caffe2_pb2.NetDef)
self.ws_name = "__ws_tmp__"
self.net = core.Net(predict_net)
with ScopedWS(self.ws_name, is_reset=True, is_cleanup=False) as ws:
ws.RunNetOnce(init_net)
for blob in self.net.Proto().external_input:
if blob not in ws.Blobs():
ws.CreateBlob(blob)
ws.CreateNet(self.net)
self._error_msgs = set()
def forward(self, inputs_dict):
assert all(inp in self.net.Proto().external_input for inp in inputs_dict)
with ScopedWS(self.ws_name, is_reset=False, is_cleanup=False) as ws:
for b, tensor in inputs_dict.items():
ws.FeedBlob(b, tensor)
try:
ws.RunNet(self.net.Proto().name)
except RuntimeError as e:
if not str(e) in self._error_msgs:
self._error_msgs.add(str(e))
logger.warning("Encountered new RuntimeError: \n{}".format(str(e)))
logger.warning("Catch the error and use partial results.")
outputs_dict = collections.OrderedDict(
[(b, ws.FetchBlob(b)) for b in self.net.Proto().external_output]
)
# Remove outputs of current run, this is necessary in order to
# prevent fetching the result from previous run if the model fails
# in the middle.
for b in self.net.Proto().external_output:
# Needs to create uninitialized blob to make the net runable.
# This is "equivalent" to: ws.RemoveBlob(b) then ws.CreateBlob(b),
# but there'no such API.
ws.FeedBlob(b, "{}, a C++ native class of type nullptr (uninitialized).".format(b))
return outputs_dict
class ProtobufDetectionModel(torch.nn.Module):
"""
A class works just like a pytorch meta arch in terms of inference, but running
caffe2 model under the hood.
"""
def __init__(self, predict_net, init_net, *, convert_outputs=None):
"""
Args:
predict_net, init_net (core.Net): caffe2 nets
convert_outptus (callable): a function that converts caffe2
outputs to the same format of the original pytorch model.
By default, use the one defined in the caffe2 meta_arch.
"""
super().__init__()
self.protobuf_model = ProtobufModel(predict_net, init_net)
self.size_divisibility = get_pb_arg_vali(predict_net, "size_divisibility", 0)
self.device = get_pb_arg_vals(predict_net, "device", b"cpu").decode("ascii")
if convert_outputs is None:
meta_arch = get_pb_arg_vals(predict_net, "meta_architecture", b"GeneralizedRCNN")
meta_arch = META_ARCH_CAFFE2_EXPORT_TYPE_MAP[meta_arch.decode("ascii")]
self._convert_outputs = meta_arch.get_outputs_converter(predict_net, init_net)
else:
self._convert_outputs = convert_outputs
def _infer_output_devices(self, inputs_dict):
def _get_device_type(torch_tensor):
assert torch_tensor.device.type in ["cpu", "cuda"]
assert torch_tensor.device.index == 0
return torch_tensor.device.type
predict_net = self.protobuf_model.net.Proto()
input_device_types = {
(name, 0): _get_device_type(tensor) for name, tensor in inputs_dict.items()
}
device_type_map = infer_device_type(
predict_net, known_status=input_device_types, device_name_style="pytorch"
)
ssa, versions = core.get_ssa(predict_net)
versioned_outputs = [(name, versions[name]) for name in predict_net.external_output]
output_devices = [device_type_map[outp] for outp in versioned_outputs]
return output_devices
def _convert_inputs(self, batched_inputs):
# currently all models convert inputs in the same way
data, im_info = convert_batched_inputs_to_c2_format(
batched_inputs, self.size_divisibility, self.device
)
return {"data": data, "im_info": im_info}
def forward(self, batched_inputs):
c2_inputs = self._convert_inputs(batched_inputs)
c2_results = self.protobuf_model(c2_inputs)
if any(t.device.type != "cpu" for _, t in c2_inputs.items()):
output_devices = self._infer_output_devices(c2_inputs)
else:
output_devices = ["cpu" for _ in self.protobuf_model.net.Proto().external_output]
def _cast_caffe2_blob_to_torch_tensor(blob, device):
return torch.Tensor(blob).to(device) if isinstance(blob, np.ndarray) else None
c2_results = {
name: _cast_caffe2_blob_to_torch_tensor(c2_results[name], device)
for name, device in zip(self.protobuf_model.net.Proto().external_output, output_devices)
}
return self._convert_outputs(batched_inputs, c2_inputs, c2_results)
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import functools
import io
import struct
import types
import torch
from detectron2.modeling import meta_arch
from detectron2.modeling.box_regression import Box2BoxTransform
from detectron2.modeling.meta_arch.panoptic_fpn import combine_semantic_and_instance_outputs
from detectron2.modeling.postprocessing import detector_postprocess, sem_seg_postprocess
from detectron2.modeling.roi_heads import keypoint_head
from detectron2.structures import Boxes, ImageList, Instances, RotatedBoxes
from .c10 import Caffe2Compatible
from .patcher import ROIHeadsPatcher, patch_generalized_rcnn
from .shared import (
alias,
check_set_pb_arg,
get_pb_arg_floats,
get_pb_arg_valf,
get_pb_arg_vali,
get_pb_arg_vals,
mock_torch_nn_functional_interpolate,
)
def assemble_rcnn_outputs_by_name(image_sizes, tensor_outputs, force_mask_on=False):
"""
A function to assemble caffe2 model's outputs (i.e. Dict[str, Tensor])
to detectron2's format (i.e. list of Instances instance).
This only works when the model follows the Caffe2 detectron's naming convention.
Args:
image_sizes (List[List[int, int]]): [H, W] of every image.
tensor_outputs (Dict[str, Tensor]): external_output to its tensor.
force_mask_on (Bool): if true, the it make sure there'll be pred_masks even
if the mask is not found from tensor_outputs (usually due to model crash)
"""
results = [Instances(image_size) for image_size in image_sizes]
batch_splits = tensor_outputs.get("batch_splits", None)
if batch_splits:
raise NotImplementedError()
assert len(image_sizes) == 1
result = results[0]
bbox_nms = tensor_outputs["bbox_nms"]
score_nms = tensor_outputs["score_nms"]
class_nms = tensor_outputs["class_nms"]
# Detection will always success because Conv support 0-batch
assert bbox_nms is not None
assert score_nms is not None
assert class_nms is not None
if bbox_nms.shape[1] == 5:
result.pred_boxes = RotatedBoxes(bbox_nms)
else:
result.pred_boxes = Boxes(bbox_nms)
result.scores = score_nms
result.pred_classes = class_nms.to(torch.int64)
mask_fcn_probs = tensor_outputs.get("mask_fcn_probs", None)
if mask_fcn_probs is not None:
# finish the mask pred
mask_probs_pred = mask_fcn_probs
num_masks = mask_probs_pred.shape[0]
class_pred = result.pred_classes
indices = torch.arange(num_masks, device=class_pred.device)
mask_probs_pred = mask_probs_pred[indices, class_pred][:, None]
result.pred_masks = mask_probs_pred
elif force_mask_on:
# NOTE: there's no way to know the height/width of mask here, it won't be
# used anyway when batch size is 0, so just set them to 0.
result.pred_masks = torch.zeros([0, 1, 0, 0], dtype=torch.uint8)
keypoints_out = tensor_outputs.get("keypoints_out", None)
kps_score = tensor_outputs.get("kps_score", None)
if keypoints_out is not None:
# keypoints_out: [N, 4, #kypoints], where 4 is in order of (x, y, score, prob)
keypoints_tensor = keypoints_out
# NOTE: it's possible that prob is not calculated if "should_output_softmax"
# is set to False in HeatmapMaxKeypoint, so just using raw score, seems
# it doesn't affect mAP. TODO: check more carefully.
keypoint_xyp = keypoints_tensor.transpose(1, 2)[:, :, [0, 1, 2]]
result.pred_keypoints = keypoint_xyp
elif kps_score is not None:
# keypoint heatmap to sparse data structure
pred_keypoint_logits = kps_score
keypoint_head.keypoint_rcnn_inference(pred_keypoint_logits, [result])
return results
def _cast_to_f32(f64):
return struct.unpack("f", struct.pack("f", f64))[0]
def set_caffe2_compatible_tensor_mode(model, enable=True):
def _fn(m):
if isinstance(m, Caffe2Compatible):
m.tensor_mode = enable
model.apply(_fn)
def convert_batched_inputs_to_c2_format(batched_inputs, size_divisibility, device):
"""
See get_caffe2_inputs() below.
"""
assert all(isinstance(x, dict) for x in batched_inputs)
assert all(x["image"].dim() == 3 for x in batched_inputs)
images = [x["image"] for x in batched_inputs]
images = ImageList.from_tensors(images, size_divisibility)
im_info = []
for input_per_image, image_size in zip(batched_inputs, images.image_sizes):
target_height = input_per_image.get("height", image_size[0])
target_width = input_per_image.get("width", image_size[1]) # noqa
# NOTE: The scale inside im_info is kept as convention and for providing
# post-processing information if further processing is needed. For
# current Caffe2 model definitions that don't include post-processing inside
# the model, this number is not used.
# NOTE: There can be a slight difference between width and height
# scales, using a single number can results in numerical difference
# compared with D2's post-processing.
scale = target_height / image_size[0]
im_info.append([image_size[0], image_size[1], scale])
im_info = torch.Tensor(im_info)
return images.tensor.to(device), im_info.to(device)
class Caffe2MetaArch(Caffe2Compatible, torch.nn.Module):
"""
Base class for caffe2-compatible implementation of a meta architecture.
The forward is traceable and its traced graph can be converted to caffe2
graph through ONNX.
"""
def __init__(self, cfg, torch_model):
"""
Args:
cfg (CfgNode):
torch_model (nn.Module): the detectron2 model (meta_arch) to be
converted.
"""
super().__init__()
self._wrapped_model = torch_model
self.eval()
set_caffe2_compatible_tensor_mode(self, True)
def get_caffe2_inputs(self, batched_inputs):
"""
Convert pytorch-style structured inputs to caffe2-style inputs that
are tuples of tensors.
Args:
batched_inputs (list[dict]): inputs to a detectron2 model
in its standard format. Each dict has "image" (CHW tensor), and optionally
"height" and "width".
Returns:
tuple[Tensor]:
tuple of tensors that will be the inputs to the
:meth:`forward` method. For existing models, the first
is an NCHW tensor (padded and batched); the second is
a im_info Nx3 tensor, where the rows are
(height, width, unused legacy parameter)
"""
return convert_batched_inputs_to_c2_format(
batched_inputs,
self._wrapped_model.backbone.size_divisibility,
self._wrapped_model.device,
)
def encode_additional_info(self, predict_net, init_net):
"""
Save extra metadata that will be used by inference in the output protobuf.
"""
pass
def forward(self, inputs):
"""
Run the forward in caffe2-style. It has to use caffe2-compatible ops
and the method will be used for tracing.
Args:
inputs (tuple[Tensor]): inputs defined by :meth:`get_caffe2_input`.
They will be the inputs of the converted caffe2 graph.
Returns:
tuple[Tensor]: output tensors. They will be the outputs of the
converted caffe2 graph.
"""
raise NotImplementedError
def _caffe2_preprocess_image(self, inputs):
"""
Caffe2 implementation of preprocess_image, which is called inside each MetaArch's forward.
It normalizes the input images, and the final caffe2 graph assumes the
inputs have been batched already.
"""
data, im_info = inputs
data = alias(data, "data")
im_info = alias(im_info, "im_info")
mean, std = self._wrapped_model.pixel_mean, self._wrapped_model.pixel_std
normalized_data = (data - mean) / std
normalized_data = alias(normalized_data, "normalized_data")
# Pack (data, im_info) into ImageList which is recognized by self.inference.
images = ImageList(tensor=normalized_data, image_sizes=im_info)
return images
@staticmethod
def get_outputs_converter(predict_net, init_net):
"""
Creates a function that converts outputs of the caffe2 model to
detectron2's standard format.
The function uses information in `predict_net` and `init_net` that are
available at inferene time. Therefore the function logic can be used in inference.
The returned function has the following signature:
def convert(batched_inputs, c2_inputs, c2_results) -> detectron2_outputs
Where
* batched_inputs (list[dict]): the original input format of the meta arch
* c2_inputs (dict[str, Tensor]): the caffe2 inputs.
* c2_results (dict[str, Tensor]): the caffe2 output format,
corresponding to the outputs of the :meth:`forward` function.
* detectron2_outputs: the original output format of the meta arch.
This function can be used to compare the outputs of the original meta arch and
the converted caffe2 graph.
Returns:
callable: a callable of the above signature.
"""
raise NotImplementedError
class Caffe2GeneralizedRCNN(Caffe2MetaArch):
def __init__(self, cfg, torch_model):
assert isinstance(torch_model, meta_arch.GeneralizedRCNN)
torch_model = patch_generalized_rcnn(torch_model)
super().__init__(cfg, torch_model)
self.roi_heads_patcher = ROIHeadsPatcher(cfg, self._wrapped_model.roi_heads)
def encode_additional_info(self, predict_net, init_net):
size_divisibility = self._wrapped_model.backbone.size_divisibility
check_set_pb_arg(predict_net, "size_divisibility", "i", size_divisibility)
check_set_pb_arg(
predict_net, "device", "s", str.encode(str(self._wrapped_model.device), "ascii")
)
check_set_pb_arg(predict_net, "meta_architecture", "s", b"GeneralizedRCNN")
@mock_torch_nn_functional_interpolate()
def forward(self, inputs):
if not self.tensor_mode:
return self._wrapped_model.inference(inputs)
images = self._caffe2_preprocess_image(inputs)
features = self._wrapped_model.backbone(images.tensor)
proposals, _ = self._wrapped_model.proposal_generator(images, features)
with self.roi_heads_patcher.mock_roi_heads():
detector_results, _ = self._wrapped_model.roi_heads(images, features, proposals)
return tuple(detector_results[0].flatten())
@staticmethod
def get_outputs_converter(predict_net, init_net):
def f(batched_inputs, c2_inputs, c2_results):
image_sizes = [[int(im[0]), int(im[1])] for im in c2_inputs["im_info"]]
results = assemble_rcnn_outputs_by_name(image_sizes, c2_results)
return meta_arch.GeneralizedRCNN._postprocess(results, batched_inputs, image_sizes)
return f
class Caffe2PanopticFPN(Caffe2MetaArch):
def __init__(self, cfg, torch_model):
assert isinstance(torch_model, meta_arch.PanopticFPN)
torch_model = patch_generalized_rcnn(torch_model)
super().__init__(cfg, torch_model)
self.roi_heads_patcher = ROIHeadsPatcher(cfg, self._wrapped_model.roi_heads)
@mock_torch_nn_functional_interpolate()
def forward(self, inputs):
assert self.tensor_mode
images = self._caffe2_preprocess_image(inputs)
features = self._wrapped_model.backbone(images.tensor)
sem_seg_results, _ = self._wrapped_model.sem_seg_head(features)
sem_seg_results = alias(sem_seg_results, "sem_seg")
proposals, _ = self._wrapped_model.proposal_generator(images, features)
with self.roi_heads_patcher.mock_roi_heads(self.tensor_mode):
detector_results, _ = self._wrapped_model.roi_heads(images, features, proposals)
return tuple(detector_results[0].flatten()) + (sem_seg_results,)
def encode_additional_info(self, predict_net, init_net):
size_divisibility = self._wrapped_model.backbone.size_divisibility
check_set_pb_arg(predict_net, "size_divisibility", "i", size_divisibility)
check_set_pb_arg(
predict_net, "device", "s", str.encode(str(self._wrapped_model.device), "ascii")
)
check_set_pb_arg(predict_net, "meta_architecture", "s", b"PanopticFPN")
# Inference parameters:
check_set_pb_arg(predict_net, "combine_on", "i", self._wrapped_model.combine_on)
check_set_pb_arg(
predict_net,
"combine_overlap_threshold",
"f",
_cast_to_f32(self._wrapped_model.combine_overlap_threshold),
)
check_set_pb_arg(
predict_net,
"combine_stuff_area_limit",
"i",
self._wrapped_model.combine_stuff_area_limit,
)
check_set_pb_arg(
predict_net,
"combine_instances_confidence_threshold",
"f",
_cast_to_f32(self._wrapped_model.combine_instances_confidence_threshold),
)
@staticmethod
def get_outputs_converter(predict_net, init_net):
combine_on = get_pb_arg_vali(predict_net, "combine_on", None)
combine_overlap_threshold = get_pb_arg_valf(predict_net, "combine_overlap_threshold", None)
combine_stuff_area_limit = get_pb_arg_vali(predict_net, "combine_stuff_area_limit", None)
combine_instances_confidence_threshold = get_pb_arg_valf(
predict_net, "combine_instances_confidence_threshold", None
)
def f(batched_inputs, c2_inputs, c2_results):
image_sizes = [[int(im[0]), int(im[1])] for im in c2_inputs["im_info"]]
detector_results = assemble_rcnn_outputs_by_name(
image_sizes, c2_results, force_mask_on=True
)
sem_seg_results = c2_results["sem_seg"]
# copied from meta_arch/panoptic_fpn.py ...
processed_results = []
for sem_seg_result, detector_result, input_per_image, image_size in zip(
sem_seg_results, detector_results, batched_inputs, image_sizes
):
height = input_per_image.get("height", image_size[0])
width = input_per_image.get("width", image_size[1])
sem_seg_r = sem_seg_postprocess(sem_seg_result, image_size, height, width)
detector_r = detector_postprocess(detector_result, height, width)
processed_results.append({"sem_seg": sem_seg_r, "instances": detector_r})
if combine_on:
panoptic_r = combine_semantic_and_instance_outputs(
detector_r,
sem_seg_r.argmax(dim=0),
combine_overlap_threshold,
combine_stuff_area_limit,
combine_instances_confidence_threshold,
)
processed_results[-1]["panoptic_seg"] = panoptic_r
return processed_results
return f
class Caffe2RetinaNet(Caffe2MetaArch):
def __init__(self, cfg, torch_model):
assert isinstance(torch_model, meta_arch.RetinaNet)
super().__init__(cfg, torch_model)
@mock_torch_nn_functional_interpolate()
def forward(self, inputs):
assert self.tensor_mode
images = self._caffe2_preprocess_image(inputs)
# explicitly return the images sizes to avoid removing "im_info" by ONNX
# since it's not used in the forward path
return_tensors = [images.image_sizes]
features = self._wrapped_model.backbone(images.tensor)
features = [features[f] for f in self._wrapped_model.in_features]
for i, feature_i in enumerate(features):
features[i] = alias(feature_i, "feature_{}".format(i), is_backward=True)
return_tensors.append(features[i])
box_cls, box_delta = self._wrapped_model.head(features)
for i, (box_cls_i, box_delta_i) in enumerate(zip(box_cls, box_delta)):
return_tensors.append(alias(box_cls_i, "box_cls_{}".format(i)))
return_tensors.append(alias(box_delta_i, "box_delta_{}".format(i)))
return tuple(return_tensors)
def encode_additional_info(self, predict_net, init_net):
size_divisibility = self._wrapped_model.backbone.size_divisibility
check_set_pb_arg(predict_net, "size_divisibility", "i", size_divisibility)
check_set_pb_arg(
predict_net, "device", "s", str.encode(str(self._wrapped_model.device), "ascii")
)
check_set_pb_arg(predict_net, "meta_architecture", "s", b"RetinaNet")
# Inference parameters:
check_set_pb_arg(
predict_net, "score_threshold", "f", _cast_to_f32(self._wrapped_model.score_threshold)
)
check_set_pb_arg(predict_net, "topk_candidates", "i", self._wrapped_model.topk_candidates)
check_set_pb_arg(
predict_net, "nms_threshold", "f", _cast_to_f32(self._wrapped_model.nms_threshold)
)
check_set_pb_arg(
predict_net,
"max_detections_per_image",
"i",
self._wrapped_model.max_detections_per_image,
)
check_set_pb_arg(
predict_net,
"bbox_reg_weights",
"floats",
[_cast_to_f32(w) for w in self._wrapped_model.box2box_transform.weights],
)
self._encode_anchor_generator_cfg(predict_net)
def _encode_anchor_generator_cfg(self, predict_net):
# serialize anchor_generator for future use
serialized_anchor_generator = io.BytesIO()
torch.save(self._wrapped_model.anchor_generator, serialized_anchor_generator)
# Ideally we can put anchor generating inside the model, then we don't
# need to store this information.
bytes = serialized_anchor_generator.getvalue()
check_set_pb_arg(predict_net, "serialized_anchor_generator", "s", bytes)
@staticmethod
def get_outputs_converter(predict_net, init_net):
self = types.SimpleNamespace()
serialized_anchor_generator = io.BytesIO(
get_pb_arg_vals(predict_net, "serialized_anchor_generator", None)
)
self.anchor_generator = torch.load(serialized_anchor_generator)
bbox_reg_weights = get_pb_arg_floats(predict_net, "bbox_reg_weights", None)
self.box2box_transform = Box2BoxTransform(weights=tuple(bbox_reg_weights))
self.score_threshold = get_pb_arg_valf(predict_net, "score_threshold", None)
self.topk_candidates = get_pb_arg_vali(predict_net, "topk_candidates", None)
self.nms_threshold = get_pb_arg_valf(predict_net, "nms_threshold", None)
self.max_detections_per_image = get_pb_arg_vali(
predict_net, "max_detections_per_image", None
)
# hack to reuse inference code from RetinaNet
self.inference = functools.partial(meta_arch.RetinaNet.inference, self)
self.inference_single_image = functools.partial(
meta_arch.RetinaNet.inference_single_image, self
)
def f(batched_inputs, c2_inputs, c2_results):
image_sizes = [[int(im[0]), int(im[1])] for im in c2_inputs["im_info"]]
num_features = len([x for x in c2_results.keys() if x.startswith("box_cls_")])
box_cls = [c2_results["box_cls_{}".format(i)] for i in range(num_features)]
box_delta = [c2_results["box_delta_{}".format(i)] for i in range(num_features)]
# For each feature level, feature should have the same batch size and
# spatial dimension as the box_cls and box_delta.
dummy_features = [box_delta[i].clone()[:, 0:0, :, :] for i in range(num_features)]
anchors = self.anchor_generator(dummy_features)
# self.num_classess can be inferred
self.num_classes = box_cls[0].shape[1] // (box_delta[0].shape[1] // 4)
results = self.inference(box_cls, box_delta, anchors, image_sizes)
return meta_arch.GeneralizedRCNN._postprocess(results, batched_inputs, image_sizes)
return f
META_ARCH_CAFFE2_EXPORT_TYPE_MAP = {
"GeneralizedRCNN": Caffe2GeneralizedRCNN,
"PanopticFPN": Caffe2PanopticFPN,
"RetinaNet": Caffe2RetinaNet,
}
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import contextlib
import mock
import torch
from detectron2.modeling import poolers
from detectron2.modeling.proposal_generator import rpn
from detectron2.modeling.roi_heads import keypoint_head, mask_head
from detectron2.modeling.roi_heads.fast_rcnn import FastRCNNOutputLayers
from .c10 import (
Caffe2Compatible,
Caffe2FastRCNNOutputsInference,
Caffe2KeypointRCNNInference,
Caffe2MaskRCNNInference,
Caffe2ROIPooler,
Caffe2RPN,
)
class GenericMixin(object):
pass
class Caffe2CompatibleConverter(object):
"""
A GenericUpdater which implements the `create_from` interface, by modifying
module object and assign it with another class replaceCls.
"""
def __init__(self, replaceCls):
self.replaceCls = replaceCls
def create_from(self, module):
# update module's class to the new class
assert isinstance(module, torch.nn.Module)
if issubclass(self.replaceCls, GenericMixin):
# replaceCls should act as mixin, create a new class on-the-fly
new_class = type(
"{}MixedWith{}".format(self.replaceCls.__name__, module.__class__.__name__),
(self.replaceCls, module.__class__),
{}, # {"new_method": lambda self: ...},
)
module.__class__ = new_class
else:
# replaceCls is complete class, this allow arbitrary class swap
module.__class__ = self.replaceCls
# initialize Caffe2Compatible
if isinstance(module, Caffe2Compatible):
module.tensor_mode = False
return module
def patch(model, target, updater, *args, **kwargs):
"""
recursively (post-order) update all modules with the target type and its
subclasses, make a initialization/composition/inheritance/... via the
updater.create_from.
"""
for name, module in model.named_children():
model._modules[name] = patch(module, target, updater, *args, **kwargs)
if isinstance(model, target):
return updater.create_from(model, *args, **kwargs)
return model
def patch_generalized_rcnn(model):
ccc = Caffe2CompatibleConverter
model = patch(model, rpn.RPN, ccc(Caffe2RPN))
model = patch(model, poolers.ROIPooler, ccc(Caffe2ROIPooler))
return model
@contextlib.contextmanager
def mock_fastrcnn_outputs_inference(
tensor_mode, check=True, box_predictor_type=FastRCNNOutputLayers
):
with mock.patch.object(
box_predictor_type,
"inference",
autospec=True,
side_effect=Caffe2FastRCNNOutputsInference(tensor_mode),
) as mocked_func:
yield
if check:
assert mocked_func.call_count > 0
@contextlib.contextmanager
def mock_mask_rcnn_inference(tensor_mode, patched_module, check=True):
with mock.patch(
"{}.mask_rcnn_inference".format(patched_module), side_effect=Caffe2MaskRCNNInference()
) as mocked_func:
yield
if check:
assert mocked_func.call_count > 0
@contextlib.contextmanager
def mock_keypoint_rcnn_inference(tensor_mode, patched_module, use_heatmap_max_keypoint, check=True):
with mock.patch(
"{}.keypoint_rcnn_inference".format(patched_module),
side_effect=Caffe2KeypointRCNNInference(use_heatmap_max_keypoint),
) as mocked_func:
yield
if check:
assert mocked_func.call_count > 0
class ROIHeadsPatcher:
def __init__(self, cfg, heads):
self.heads = heads
self.use_heatmap_max_keypoint = cfg.EXPORT_CAFFE2.USE_HEATMAP_MAX_KEYPOINT
@contextlib.contextmanager
def mock_roi_heads(self, tensor_mode=True):
"""
Patching several inference functions inside ROIHeads and its subclasses
Args:
tensor_mode (bool): whether the inputs/outputs are caffe2's tensor
format or not. Default to True.
"""
# NOTE: this requries the `keypoint_rcnn_inference` and `mask_rcnn_inference`
# are called inside the same file as BaseXxxHead due to using mock.patch.
kpt_heads_mod = keypoint_head.BaseKeypointRCNNHead.__module__
mask_head_mod = mask_head.BaseMaskRCNNHead.__module__
mock_ctx_managers = [
mock_fastrcnn_outputs_inference(
tensor_mode=tensor_mode,
check=True,
box_predictor_type=type(self.heads.box_predictor),
)
]
if getattr(self.heads, "keypoint_on", False):
mock_ctx_managers += [
mock_keypoint_rcnn_inference(
tensor_mode, kpt_heads_mod, self.use_heatmap_max_keypoint
)
]
if getattr(self.heads, "mask_on", False):
mock_ctx_managers += [mock_mask_rcnn_inference(tensor_mode, mask_head_mod)]
with contextlib.ExitStack() as stack: # python 3.3+
for mgr in mock_ctx_managers:
stack.enter_context(mgr)
yield
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import collections
import contextlib
import copy
import functools
import logging
import mock
import numpy as np
import os
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import caffe2.python.utils as putils
import torch
import torch.nn.functional as F
from caffe2.proto import caffe2_pb2
from caffe2.python import core, net_drawer, workspace
from torch.nn.functional import interpolate as interp
logger = logging.getLogger(__name__)
# ==== torch/utils_toffee/cast.py =======================================
def to_device(t, device_str):
"""
This function is a replacement of .to(another_device) such that it allows the
casting to be traced properly by explicitly calling the underlying copy ops.
It also avoids introducing unncessary op when casting to the same device.
"""
src = t.device
dst = torch.device(device_str)
if src == dst:
return t
elif src.type == "cuda" and dst.type == "cpu":
return torch.ops._caffe2.CopyGPUToCPU(t)
elif src.type == "cpu" and dst.type == "cuda":
return torch.ops._caffe2.CopyCPUToGPU(t)
else:
raise RuntimeError("Can't cast tensor from device {} to device {}".format(src, dst))
# ==== torch/utils_toffee/interpolate.py =======================================
# Note: borrowed from vision/detection/fair/detectron/detectron/modeling/detector.py
def BilinearInterpolation(tensor_in, up_scale):
assert up_scale % 2 == 0, "Scale should be even"
def upsample_filt(size):
factor = (size + 1) // 2
if size % 2 == 1:
center = factor - 1
else:
center = factor - 0.5
og = np.ogrid[:size, :size]
return (1 - abs(og[0] - center) / factor) * (1 - abs(og[1] - center) / factor)
kernel_size = int(up_scale) * 2
bil_filt = upsample_filt(kernel_size)
dim = int(tensor_in.shape[1])
kernel = np.zeros((dim, dim, kernel_size, kernel_size), dtype=np.float32)
kernel[range(dim), range(dim), :, :] = bil_filt
tensor_out = F.conv_transpose2d(
tensor_in,
weight=to_device(torch.Tensor(kernel), tensor_in.device),
bias=None,
stride=int(up_scale),
padding=int(up_scale / 2),
)
return tensor_out
# NOTE: ONNX is incompatible with traced torch.nn.functional.interpolate if
# using dynamic `scale_factor` rather than static `size`. (T43166860)
# NOTE: Caffe2 Int8 conversion might not be able to quantize `size` properly.
def onnx_compatibale_interpolate(
input, size=None, scale_factor=None, mode="nearest", align_corners=None
):
# NOTE: The input dimensions are interpreted in the form:
# `mini-batch x channels x [optional depth] x [optional height] x width`.
if size is None and scale_factor is not None:
if input.dim() == 4:
if isinstance(scale_factor, (int, float)):
height_scale, width_scale = (scale_factor, scale_factor)
else:
assert isinstance(scale_factor, (tuple, list))
assert len(scale_factor) == 2
height_scale, width_scale = scale_factor
assert not align_corners, "No matching C2 op for align_corners == True"
if mode == "nearest":
return torch.ops._caffe2.ResizeNearest(
input, order="NCHW", width_scale=width_scale, height_scale=height_scale
)
elif mode == "bilinear":
logger.warning(
"Use F.conv_transpose2d for bilinear interpolate"
" because there's no such C2 op, this may cause significant"
" slowdown and the boundary pixels won't be as same as"
" using F.interpolate due to padding."
)
assert height_scale == width_scale
return BilinearInterpolation(input, up_scale=height_scale)
logger.warning("Output size is not static, it might cause ONNX conversion issue")
return interp(input, size, scale_factor, mode, align_corners)
@contextlib.contextmanager
def mock_torch_nn_functional_interpolate():
if torch.onnx.is_in_onnx_export():
with mock.patch(
"torch.nn.functional.interpolate", side_effect=onnx_compatibale_interpolate
):
yield
else:
yield
# ==== torch/utils_caffe2/ws_utils.py ==========================================
class ScopedWS(object):
def __init__(self, ws_name, is_reset, is_cleanup=False):
self.ws_name = ws_name
self.is_reset = is_reset
self.is_cleanup = is_cleanup
self.org_ws = ""
def __enter__(self):
self.org_ws = workspace.CurrentWorkspace()
if self.ws_name is not None:
workspace.SwitchWorkspace(self.ws_name, True)
if self.is_reset:
workspace.ResetWorkspace()
return workspace
def __exit__(self, *args):
if self.is_cleanup:
workspace.ResetWorkspace()
if self.ws_name is not None:
workspace.SwitchWorkspace(self.org_ws)
def fetch_any_blob(name):
bb = None
try:
bb = workspace.FetchBlob(name)
except TypeError:
bb = workspace.FetchInt8Blob(name)
except Exception as e:
logger.error("Get blob {} error: {}".format(name, e))
return bb
# ==== torch/utils_caffe2/protobuf.py ==========================================
def get_pb_arg(pb, arg_name):
for x in pb.arg:
if x.name == arg_name:
return x
return None
def get_pb_arg_valf(pb, arg_name, default_val):
arg = get_pb_arg(pb, arg_name)
return arg.f if arg is not None else default_val
def get_pb_arg_floats(pb, arg_name, default_val):
arg = get_pb_arg(pb, arg_name)
return list(map(float, arg.floats)) if arg is not None else default_val
def get_pb_arg_ints(pb, arg_name, default_val):
arg = get_pb_arg(pb, arg_name)
return list(map(int, arg.ints)) if arg is not None else default_val
def get_pb_arg_vali(pb, arg_name, default_val):
arg = get_pb_arg(pb, arg_name)
return arg.i if arg is not None else default_val
def get_pb_arg_vals(pb, arg_name, default_val):
arg = get_pb_arg(pb, arg_name)
return arg.s if arg is not None else default_val
def get_pb_arg_valstrings(pb, arg_name, default_val):
arg = get_pb_arg(pb, arg_name)
return list(arg.strings) if arg is not None else default_val
def check_set_pb_arg(pb, arg_name, arg_attr, arg_value, allow_override=False):
arg = get_pb_arg(pb, arg_name)
if arg is None:
arg = putils.MakeArgument(arg_name, arg_value)
assert hasattr(arg, arg_attr)
pb.arg.extend([arg])
if allow_override and getattr(arg, arg_attr) != arg_value:
logger.warning(
"Override argument {}: {} -> {}".format(arg_name, getattr(arg, arg_attr), arg_value)
)
setattr(arg, arg_attr, arg_value)
else:
assert arg is not None
assert getattr(arg, arg_attr) == arg_value, "Existing value {}, new value {}".format(
getattr(arg, arg_attr), arg_value
)
def _create_const_fill_op_from_numpy(name, tensor, device_option=None):
assert type(tensor) == np.ndarray
kTypeNameMapper = {
np.dtype("float32"): "GivenTensorFill",
np.dtype("int32"): "GivenTensorIntFill",
np.dtype("int64"): "GivenTensorInt64Fill",
np.dtype("uint8"): "GivenTensorStringFill",
}
args_dict = {}
if tensor.dtype == np.dtype("uint8"):
args_dict.update({"values": [str(tensor.data)], "shape": [1]})
else:
args_dict.update({"values": tensor, "shape": tensor.shape})
if device_option is not None:
args_dict["device_option"] = device_option
return core.CreateOperator(kTypeNameMapper[tensor.dtype], [], [name], **args_dict)
def _create_const_fill_op_from_c2_int8_tensor(name, int8_tensor):
assert type(int8_tensor) == workspace.Int8Tensor
kTypeNameMapper = {
np.dtype("int32"): "Int8GivenIntTensorFill",
np.dtype("uint8"): "Int8GivenTensorFill",
}
tensor = int8_tensor.data
assert tensor.dtype in [np.dtype("uint8"), np.dtype("int32")]
values = tensor.tobytes() if tensor.dtype == np.dtype("uint8") else tensor
return core.CreateOperator(
kTypeNameMapper[tensor.dtype],
[],
[name],
values=values,
shape=tensor.shape,
Y_scale=int8_tensor.scale,
Y_zero_point=int8_tensor.zero_point,
)
def create_const_fill_op(
name: str,
blob: Union[np.ndarray, workspace.Int8Tensor],
device_option: Optional[caffe2_pb2.DeviceOption] = None,
) -> caffe2_pb2.OperatorDef:
"""
Given a blob object, return the Caffe2 operator that creates this blob
as constant. Currently support NumPy tensor and Caffe2 Int8Tensor.
"""
tensor_type = type(blob)
assert tensor_type in [
np.ndarray,
workspace.Int8Tensor,
], 'Error when creating const fill op for "{}", unsupported blob type: {}'.format(
name, type(blob)
)
if tensor_type == np.ndarray:
return _create_const_fill_op_from_numpy(name, blob, device_option)
elif tensor_type == workspace.Int8Tensor:
assert device_option is None
return _create_const_fill_op_from_c2_int8_tensor(name, blob)
def construct_init_net_from_params(
params: Dict[str, Any], device_options: Optional[Dict[str, caffe2_pb2.DeviceOption]] = None
) -> caffe2_pb2.NetDef:
"""
Construct the init_net from params dictionary
"""
init_net = caffe2_pb2.NetDef()
device_options = device_options or {}
for name, blob in params.items():
if isinstance(blob, str):
logger.warning(
(
"Blob {} with type {} is not supported in generating init net,"
" skipped.".format(name, type(blob))
)
)
continue
init_net.op.extend(
[create_const_fill_op(name, blob, device_option=device_options.get(name, None))]
)
init_net.external_output.append(name)
return init_net
def get_producer_map(ssa):
"""
Return dict from versioned blob to (i, j),
where i is index of producer op, j is the index of output of that op.
"""
producer_map = {}
for i in range(len(ssa)):
outputs = ssa[i][1]
for j, outp in enumerate(outputs):
producer_map[outp] = (i, j)
return producer_map
def get_consumer_map(ssa):
"""
Return dict from versioned blob to list of (i, j),
where i is index of consumer op, j is the index of input of that op.
"""
consumer_map = collections.defaultdict(list)
for i in range(len(ssa)):
inputs = ssa[i][0]
for j, inp in enumerate(inputs):
consumer_map[inp].append((i, j))
return consumer_map
def get_params_from_init_net(
init_net: caffe2_pb2.NetDef,
) -> [Dict[str, Any], Dict[str, caffe2_pb2.DeviceOption]]:
"""
Take the output blobs from init_net by running it.
Outputs:
params: dict from blob name to numpy array
device_options: dict from blob name to the device option of its creating op
"""
# NOTE: this assumes that the params is determined by producer op with the
# only exception be CopyGPUToCPU which is CUDA op but returns CPU tensor.
def _get_device_option(producer_op):
if producer_op.type == "CopyGPUToCPU":
return caffe2_pb2.DeviceOption()
else:
return producer_op.device_option
with ScopedWS("__get_params_from_init_net__", is_reset=True, is_cleanup=True) as ws:
ws.RunNetOnce(init_net)
params = {b: fetch_any_blob(b) for b in init_net.external_output}
ssa, versions = core.get_ssa(init_net)
producer_map = get_producer_map(ssa)
device_options = {
b: _get_device_option(init_net.op[producer_map[(b, versions[b])][0]])
for b in init_net.external_output
}
return params, device_options
def _updater_raise(op, input_types, output_types):
raise RuntimeError(
"Failed to apply updater for op {} given input_types {} and"
" output_types {}".format(op, input_types, output_types)
)
def _generic_status_identifier(
predict_net: caffe2_pb2.NetDef,
status_updater: Callable,
known_status: Dict[Tuple[str, int], Any],
) -> Dict[Tuple[str, int], Any]:
"""
Statically infer the status of each blob, the status can be such as device type
(CPU/GPU), layout (NCHW/NHWC), data type (float32/int8), etc. "Blob" here
is versioned blob (Tuple[str, int]) in the format compatible with ssa.
Inputs:
predict_net: the caffe2 network
status_updater: a callable, given an op and the status of its input/output,
it returns the updated status of input/output. `None` is used for
representing unknown status.
known_status: a dict containing known status, used as initialization.
Outputs:
A dict mapping from versioned blob to its status
"""
ssa, versions = core.get_ssa(predict_net)
versioned_ext_input = [(b, 0) for b in predict_net.external_input]
versioned_ext_output = [(b, versions[b]) for b in predict_net.external_output]
all_versioned_blobs = set().union(*[set(x[0] + x[1]) for x in ssa])
allowed_vbs = all_versioned_blobs.union(versioned_ext_input).union(versioned_ext_output)
assert all(k in allowed_vbs for k in known_status)
assert all(v is not None for v in known_status.values())
_known_status = copy.deepcopy(known_status)
def _check_and_update(key, value):
assert value is not None
if key in _known_status:
if not _known_status[key] == value:
raise RuntimeError(
"Confilict status for {}, existing status {}, new status {}".format(
key, _known_status[key], value
)
)
_known_status[key] = value
def _update_i(op, ssa_i):
versioned_inputs = ssa_i[0]
versioned_outputs = ssa_i[1]
inputs_status = [_known_status.get(b, None) for b in versioned_inputs]
outputs_status = [_known_status.get(b, None) for b in versioned_outputs]
new_inputs_status, new_outputs_status = status_updater(op, inputs_status, outputs_status)
for versioned_blob, status in zip(
versioned_inputs + versioned_outputs, new_inputs_status + new_outputs_status
):
if status is not None:
_check_and_update(versioned_blob, status)
for op, ssa_i in zip(predict_net.op, ssa):
_update_i(op, ssa_i)
for op, ssa_i in zip(reversed(predict_net.op), reversed(ssa)):
_update_i(op, ssa_i)
# NOTE: This strictly checks all the blob from predict_net must be assgined
# a known status. However sometimes it's impossible (eg. having deadend op),
# we may relax this constraint if
for k in all_versioned_blobs:
if k not in _known_status:
raise NotImplementedError(
"Can not infer the status for {}. Currently only support the case where"
" a single forward and backward pass can identify status for all blobs.".format(k)
)
return _known_status
def infer_device_type(
predict_net: caffe2_pb2.NetDef,
known_status: Dict[Tuple[str, int], Any],
device_name_style: str = "caffe2",
) -> Dict[Tuple[str, int], str]:
""" Return the device type ("cpu" or "gpu"/"cuda") of each (versioned) blob """
assert device_name_style in ["caffe2", "pytorch"]
_CPU_STR = "cpu"
_GPU_STR = "gpu" if device_name_style == "caffe2" else "cuda"
def _copy_cpu_to_gpu_updater(op, input_types, output_types):
if input_types[0] == _GPU_STR or output_types[0] == _CPU_STR:
_updater_raise(op, input_types, output_types)
return ([_CPU_STR], [_GPU_STR])
def _copy_gpu_to_cpu_updater(op, input_types, output_types):
if input_types[0] == _CPU_STR or output_types[0] == _GPU_STR:
_updater_raise(op, input_types, output_types)
return ([_GPU_STR], [_CPU_STR])
def _other_ops_updater(op, input_types, output_types):
non_none_types = [x for x in input_types + output_types if x is not None]
if len(non_none_types) > 0:
the_type = non_none_types[0]
if not all(x == the_type for x in non_none_types):
_updater_raise(op, input_types, output_types)
else:
the_type = None
return ([the_type for _ in op.input], [the_type for _ in op.output])
def _device_updater(op, *args, **kwargs):
return {
"CopyCPUToGPU": _copy_cpu_to_gpu_updater,
"CopyGPUToCPU": _copy_gpu_to_cpu_updater,
}.get(op.type, _other_ops_updater)(op, *args, **kwargs)
return _generic_status_identifier(predict_net, _device_updater, known_status)
# ==== torch/utils_caffe2/vis.py ===============================================
def _modify_blob_names(ops, blob_rename_f):
ret = []
def _replace_list(blob_list, replaced_list):
del blob_list[:]
blob_list.extend(replaced_list)
for x in ops:
cur = copy.deepcopy(x)
_replace_list(cur.input, list(map(blob_rename_f, cur.input)))
_replace_list(cur.output, list(map(blob_rename_f, cur.output)))
ret.append(cur)
return ret
def _rename_blob(name, blob_sizes, blob_ranges):
def _list_to_str(bsize):
ret = ", ".join([str(x) for x in bsize])
ret = "[" + ret + "]"
return ret
ret = name
if blob_sizes is not None and name in blob_sizes:
ret += "\n" + _list_to_str(blob_sizes[name])
if blob_ranges is not None and name in blob_ranges:
ret += "\n" + _list_to_str(blob_ranges[name])
return ret
# graph_name could not contain word 'graph'
def save_graph(net, file_name, graph_name="net", op_only=True, blob_sizes=None, blob_ranges=None):
blob_rename_f = functools.partial(_rename_blob, blob_sizes=blob_sizes, blob_ranges=blob_ranges)
return save_graph_base(net, file_name, graph_name, op_only, blob_rename_f)
def save_graph_base(net, file_name, graph_name="net", op_only=True, blob_rename_func=None):
graph = None
ops = net.op
if blob_rename_func is not None:
ops = _modify_blob_names(ops, blob_rename_func)
if not op_only:
graph = net_drawer.GetPydotGraph(ops, graph_name, rankdir="TB")
else:
graph = net_drawer.GetPydotGraphMinimal(
ops, graph_name, rankdir="TB", minimal_dependency=True
)
try:
par_dir = os.path.dirname(file_name)
if not os.path.exists(par_dir):
os.makedirs(par_dir)
format = os.path.splitext(os.path.basename(file_name))[-1]
if format == ".png":
graph.write_png(file_name)
elif format == ".pdf":
graph.write_pdf(file_name)
elif format == ".svg":
graph.write_svg(file_name)
else:
print("Incorrect format {}".format(format))
except Exception as e:
print("Error when writing graph to image {}".format(e))
return graph
# ==== torch/utils_toffee/aten_to_caffe2.py ====================================
def group_norm_replace_aten_with_caffe2(predict_net: caffe2_pb2.NetDef):
"""
For ONNX exported model, GroupNorm will be represented as ATen op,
this can be a drop in replacement from ATen to GroupNorm
"""
count = 0
for op in predict_net.op:
if op.type == "ATen":
op_name = get_pb_arg_vals(op, "operator", None) # return byte in py3
if op_name and op_name.decode() == "group_norm":
op.arg.remove(get_pb_arg(op, "operator"))
if get_pb_arg_vali(op, "cudnn_enabled", None):
op.arg.remove(get_pb_arg(op, "cudnn_enabled"))
num_groups = get_pb_arg_vali(op, "num_groups", None)
if num_groups is not None:
op.arg.remove(get_pb_arg(op, "num_groups"))
check_set_pb_arg(op, "group", "i", num_groups)
op.type = "GroupNorm"
count += 1
if count > 1:
logger.info("Replaced {} ATen operator to GroupNormOp".format(count))
# ==== torch/utils_toffee/alias.py =============================================
def alias(x, name, is_backward=False):
if not torch.onnx.is_in_onnx_export():
return x
assert isinstance(x, torch.Tensor)
return torch.ops._caffe2.AliasWithName(x, name, is_backward=is_backward)
def fuse_alias_placeholder(predict_net, init_net):
""" Remove AliasWithName placeholder and rename the input/output of it """
# First we finish all the re-naming
for i, op in enumerate(predict_net.op):
if op.type == "AliasWithName":
assert len(op.input) == 1
assert len(op.output) == 1
name = get_pb_arg_vals(op, "name", None).decode()
is_backward = bool(get_pb_arg_vali(op, "is_backward", 0))
rename_op_input(predict_net, init_net, i, 0, name, from_producer=is_backward)
rename_op_output(predict_net, i, 0, name)
# Remove AliasWithName, should be very safe since it's a non-op
new_ops = []
for op in predict_net.op:
if op.type != "AliasWithName":
new_ops.append(op)
else:
# safety check
assert op.input == op.output
assert op.input[0] == op.arg[0].s.decode()
del predict_net.op[:]
predict_net.op.extend(new_ops)
# ==== torch/utils_caffe2/graph_transform.py ===================================
class IllegalGraphTransformError(ValueError):
""" When a graph transform function call can't be executed. """
def _rename_versioned_blob_in_proto(
proto: caffe2_pb2.NetDef,
old_name: str,
new_name: str,
version: int,
ssa: List[Tuple[List[Tuple[str, int]], List[Tuple[str, int]]]],
start_versions: Dict[str, int],
end_versions: Dict[str, int],
):
""" In given proto, rename all blobs with matched version """
# Operater list
for op, i_th_ssa in zip(proto.op, ssa):
versioned_inputs, versioned_outputs = i_th_ssa
for i in range(len(op.input)):
if versioned_inputs[i] == (old_name, version):
op.input[i] = new_name
for i in range(len(op.output)):
if versioned_outputs[i] == (old_name, version):
op.output[i] = new_name
# external_input
if start_versions.get(old_name, 0) == version:
for i in range(len(proto.external_input)):
if proto.external_input[i] == old_name:
proto.external_input[i] = new_name
# external_output
if end_versions.get(old_name, 0) == version:
for i in range(len(proto.external_output)):
if proto.external_output[i] == old_name:
proto.external_output[i] = new_name
def rename_op_input(
predict_net: caffe2_pb2.NetDef,
init_net: caffe2_pb2.NetDef,
op_id: int,
input_id: int,
new_name: str,
from_producer: bool = False,
):
"""
Rename the op_id-th operator in predict_net, change it's input_id-th input's
name to the new_name. It also does automatic re-route and change
external_input and init_net if necessary.
- It requires the input is only consumed by this op.
- This function modifies predict_net and init_net in-place.
- When from_producer is enable, this also updates other operators that consumes
the same input. Be cautious because may trigger unintended behavior.
"""
assert isinstance(predict_net, caffe2_pb2.NetDef)
assert isinstance(init_net, caffe2_pb2.NetDef)
init_net_ssa, init_net_versions = core.get_ssa(init_net)
predict_net_ssa, predict_net_versions = core.get_ssa(
predict_net, copy.deepcopy(init_net_versions)
)
versioned_inputs, versioned_outputs = predict_net_ssa[op_id]
old_name, version = versioned_inputs[input_id]
if from_producer:
producer_map = get_producer_map(predict_net_ssa)
if not (old_name, version) in producer_map:
raise NotImplementedError(
"Can't find producer, the input {} is probably from"
" init_net, this is not supported yet.".format(old_name)
)
producer = producer_map[(old_name, version)]
rename_op_output(predict_net, producer[0], producer[1], new_name)
return
def contain_targets(op_ssa):
return (old_name, version) in op_ssa[0]
is_consumer = [contain_targets(op_ssa) for op_ssa in predict_net_ssa]
if sum(is_consumer) > 1:
raise IllegalGraphTransformError(
(
"Input '{}' of operator(#{}) are consumed by other ops, please use"
+ " rename_op_output on the producer instead. Offending op: \n{}"
).format(old_name, op_id, predict_net.op[op_id])
)
# update init_net
_rename_versioned_blob_in_proto(
init_net, old_name, new_name, version, init_net_ssa, {}, init_net_versions
)
# update predict_net
_rename_versioned_blob_in_proto(
predict_net,
old_name,
new_name,
version,
predict_net_ssa,
init_net_versions,
predict_net_versions,
)
def rename_op_output(predict_net: caffe2_pb2.NetDef, op_id: int, output_id: int, new_name: str):
"""
Rename the op_id-th operator in predict_net, change it's output_id-th input's
name to the new_name. It also does automatic re-route and change
external_output and if necessary.
- It allows multiple consumers of its output.
- This function modifies predict_net in-place, doesn't need init_net.
"""
assert isinstance(predict_net, caffe2_pb2.NetDef)
ssa, blob_versions = core.get_ssa(predict_net)
versioned_inputs, versioned_outputs = ssa[op_id]
old_name, version = versioned_outputs[output_id]
# update predict_net
_rename_versioned_blob_in_proto(
predict_net, old_name, new_name, version, ssa, {}, blob_versions
)
def get_sub_graph_external_input_output(
predict_net: caffe2_pb2.NetDef, sub_graph_op_indices: List[int]
) -> Tuple[List[Tuple[str, int]], List[Tuple[str, int]]]:
"""
Return the list of external input/output of sub-graph,
each element is tuple of the name and corresponding version in predict_net.
external input/output is defined the same way as caffe2 NetDef.
"""
ssa, versions = core.get_ssa(predict_net)
all_inputs = []
all_outputs = []
for op_id in sub_graph_op_indices:
all_inputs += [inp for inp in ssa[op_id][0] if inp not in all_inputs]
all_outputs += list(ssa[op_id][1]) # ssa output won't repeat
# for versioned blobs, external inputs are just those blob in all_inputs
# but not in all_outputs
ext_inputs = [inp for inp in all_inputs if inp not in all_outputs]
# external outputs are essentially outputs of this subgraph that are used
# outside of this sub-graph (including predict_net.external_output)
all_other_inputs = sum(
(ssa[i][0] for i in range(len(ssa)) if i not in sub_graph_op_indices),
[(outp, versions[outp]) for outp in predict_net.external_output],
)
ext_outputs = [outp for outp in all_outputs if outp in set(all_other_inputs)]
return ext_inputs, ext_outputs
class DiGraph:
""" A DAG representation of caffe2 graph, each vertice is a versioned blob. """
def __init__(self):
self.vertices = set()
self.graph = collections.defaultdict(list)
def add_edge(self, u, v):
self.graph[u].append(v)
self.vertices.add(u)
self.vertices.add(v)
# grab from https://www.geeksforgeeks.org/find-paths-given-source-destination/
def get_all_paths(self, s, d):
visited = {k: False for k in self.vertices}
path = []
all_paths = []
def _get_all_paths_util(graph, u, d, visited, path):
visited[u] = True
path.append(u)
if u == d:
all_paths.append(copy.deepcopy(path))
else:
for i in graph[u]:
if not visited[i]:
_get_all_paths_util(graph, i, d, visited, path)
path.pop()
visited[u] = False
_get_all_paths_util(self.graph, s, d, visited, path)
return all_paths
@staticmethod
def from_ssa(ssa):
graph = DiGraph()
for op_id in range(len(ssa)):
for inp in ssa[op_id][0]:
for outp in ssa[op_id][1]:
graph.add_edge(inp, outp)
return graph
def _get_dependency_chain(ssa, versioned_target, versioned_source):
"""
Return the index list of relevant operator to produce target blob from source blob,
if there's no dependency, return empty list.
"""
# finding all paths between nodes can be O(N!), thus we can only search
# in the subgraph using the op starting from the first consumer of source blob
# to the producer of the target blob.
consumer_map = get_consumer_map(ssa)
producer_map = get_producer_map(ssa)
start_op = min(x[0] for x in consumer_map[versioned_source]) - 15
end_op = (
producer_map[versioned_target][0] + 15 if versioned_target in producer_map else start_op
)
sub_graph_ssa = ssa[start_op : end_op + 1]
if len(sub_graph_ssa) > 30:
logger.warning(
"Subgraph bebetween {} and {} is large (from op#{} to op#{}), it"
" might take non-trival time to find all paths between them.".format(
versioned_source, versioned_target, start_op, end_op
)
)
dag = DiGraph.from_ssa(sub_graph_ssa)
paths = dag.get_all_paths(versioned_source, versioned_target) # include two ends
ops_in_paths = [[producer_map[blob][0] for blob in path[1:]] for path in paths]
return sorted(set().union(*[set(ops) for ops in ops_in_paths]))
def identify_reshape_sub_graph(predict_net: caffe2_pb2.NetDef) -> List[List[int]]:
"""
Idenfity the reshape sub-graph in a protobuf.
The reshape sub-graph is defined as matching the following pattern:
(input_blob) -> Op_1 -> ... -> Op_N -> (new_shape) -─┐
└-------------------------------------------> Reshape -> (output_blob)
Return:
List of sub-graphs, each sub-graph is represented as a list of indices
of the relavent ops, [Op_1, Op_2, ..., Op_N, Reshape]
"""
ssa, _ = core.get_ssa(predict_net)
ret = []
for i, op in enumerate(predict_net.op):
if op.type == "Reshape":
assert len(op.input) == 2
input_ssa = ssa[i][0]
data_source = input_ssa[0]
shape_source = input_ssa[1]
op_indices = _get_dependency_chain(ssa, shape_source, data_source)
ret.append(op_indices + [i])
return ret
def remove_reshape_for_fc(predict_net, params):
"""
In PyTorch nn.Linear has to take 2D tensor, this often leads to reshape
a 4D tensor to 2D by calling .view(). However this (dynamic) reshaping
doesn't work well with ONNX and Int8 tools, and cause using extra
ops (eg. ExpandDims) that might not be available on mobile.
Luckily Caffe2 supports 4D tensor for FC, so we can remove those reshape
after exporting ONNX model.
"""
from caffe2.python import core
# find all reshape sub-graph that can be removed, which is now all Reshape
# sub-graph whose output is only consumed by FC.
# TODO: to make it safer, we may need the actually value to better determine
# if a Reshape before FC is removable.
reshape_sub_graphs = identify_reshape_sub_graph(predict_net)
sub_graphs_to_remove = []
for reshape_sub_graph in reshape_sub_graphs:
reshape_op_id = reshape_sub_graph[-1]
assert predict_net.op[reshape_op_id].type == "Reshape"
ssa, _ = core.get_ssa(predict_net)
reshape_output = ssa[reshape_op_id][1][0]
consumers = [i for i in range(len(ssa)) if reshape_output in ssa[i][0]]
if all(predict_net.op[consumer].type == "FC" for consumer in consumers):
# safety check if the sub-graph is isolated, for this reshape sub-graph,
# it means it has one non-param external input and one external output.
ext_inputs, ext_outputs = get_sub_graph_external_input_output(
predict_net, reshape_sub_graph
)
non_params_ext_inputs = [inp for inp in ext_inputs if inp[1] != 0]
if len(non_params_ext_inputs) == 1 and len(ext_outputs) == 1:
sub_graphs_to_remove.append(reshape_sub_graph)
# perform removing subgraph by:
# 1: rename the Reshape's output to its input, then the graph can be
# seen as in-place itentify, meaning whose external input/output are the same.
# 2: simply remove those ops.
remove_op_ids = []
params_to_remove = []
for sub_graph in sub_graphs_to_remove:
logger.info(
"Remove Reshape sub-graph:\n{}".format(
"".join(["(#{:>4})\n{}".format(i, predict_net.op[i]) for i in sub_graph])
)
)
reshape_op_id = sub_graph[-1]
new_reshap_output = predict_net.op[reshape_op_id].input[0]
rename_op_output(predict_net, reshape_op_id, 0, new_reshap_output)
ext_inputs, ext_outputs = get_sub_graph_external_input_output(predict_net, sub_graph)
non_params_ext_inputs = [inp for inp in ext_inputs if inp[1] != 0]
params_ext_inputs = [inp for inp in ext_inputs if inp[1] == 0]
assert len(non_params_ext_inputs) == 1 and len(ext_outputs) == 1
assert ext_outputs[0][0] == non_params_ext_inputs[0][0]
assert ext_outputs[0][1] == non_params_ext_inputs[0][1] + 1
remove_op_ids.extend(sub_graph)
params_to_remove.extend(params_ext_inputs)
predict_net = copy.deepcopy(predict_net)
new_ops = [op for i, op in enumerate(predict_net.op) if i not in remove_op_ids]
del predict_net.op[:]
predict_net.op.extend(new_ops)
for versioned_params in params_to_remove:
name = versioned_params[0]
logger.info("Remove params: {} from init_net and predict_net.external_input".format(name))
del params[name]
predict_net.external_input.remove(name)
return predict_net, params
def fuse_copy_between_cpu_and_gpu(predict_net: caffe2_pb2.NetDef):
"""
In-place fuse extra copy ops between cpu/gpu for the following case:
a -CopyAToB-> b -CopyBToA> c1 -NextOp1-> d1
-CopyBToA> c2 -NextOp2-> d2
The fused network will look like:
a -NextOp1-> d1
-NextOp2-> d2
"""
_COPY_OPS = ["CopyCPUToGPU", "CopyGPUToCPU"]
def _fuse_once(predict_net):
ssa, blob_versions = core.get_ssa(predict_net)
consumer_map = get_consumer_map(ssa)
versioned_external_output = [
(name, blob_versions[name]) for name in predict_net.external_output
]
for op_id, op in enumerate(predict_net.op):
if op.type in _COPY_OPS:
fw_copy_versioned_output = ssa[op_id][1][0]
consumer_ids = [x[0] for x in consumer_map[fw_copy_versioned_output]]
reverse_op_type = _COPY_OPS[1 - _COPY_OPS.index(op.type)]
is_fusable = (
len(consumer_ids) > 0
and fw_copy_versioned_output not in versioned_external_output
and all(
predict_net.op[_op_id].type == reverse_op_type
and ssa[_op_id][1][0] not in versioned_external_output
for _op_id in consumer_ids
)
)
if is_fusable:
for rv_copy_op_id in consumer_ids:
# making each NextOp uses "a" directly and removing Copy ops
rs_copy_versioned_output = ssa[rv_copy_op_id][1][0]
next_op_id, inp_id = consumer_map[rs_copy_versioned_output][0]
predict_net.op[next_op_id].input[inp_id] = op.input[0]
# remove CopyOps
new_ops = [
op
for i, op in enumerate(predict_net.op)
if i != op_id and i not in consumer_ids
]
del predict_net.op[:]
predict_net.op.extend(new_ops)
return True
return False
# _fuse_once returns False is nothing can be fused
while _fuse_once(predict_net):
pass
def remove_dead_end_ops(net_def: caffe2_pb2.NetDef):
""" remove ops if its output is not used or not in external_output """
ssa, versions = core.get_ssa(net_def)
versioned_external_output = [(name, versions[name]) for name in net_def.external_output]
consumer_map = get_consumer_map(ssa)
removed_op_ids = set()
def _is_dead_end(versioned_blob):
return not (
versioned_blob in versioned_external_output
or (
len(consumer_map[versioned_blob]) > 0
and all(x[0] not in removed_op_ids for x in consumer_map[versioned_blob])
)
)
for i, ssa_i in reversed(list(enumerate(ssa))):
versioned_outputs = ssa_i[1]
if all(_is_dead_end(outp) for outp in versioned_outputs):
removed_op_ids.add(i)
# simply removing those deadend ops should have no effect to external_output
new_ops = [op for i, op in enumerate(net_def.op) if i not in removed_op_ids]
del net_def.op[:]
net_def.op.extend(new_ops)
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
from .batch_norm import FrozenBatchNorm2d, get_norm, NaiveSyncBatchNorm
from .deform_conv import DeformConv, ModulatedDeformConv
from .mask_ops import paste_masks_in_image
from .nms import batched_nms, batched_nms_rotated, nms, nms_rotated
from .roi_align import ROIAlign, roi_align
from .roi_align_rotated import ROIAlignRotated, roi_align_rotated
from .shape_spec import ShapeSpec
from .wrappers import BatchNorm2d, Conv2d, ConvTranspose2d, cat, interpolate, Linear
from .blocks import CNNBlockBase
__all__ = [k for k in globals().keys() if not k.startswith("_")]
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging
import torch
import torch.distributed as dist
from torch import nn
from torch.autograd.function import Function
from torch.nn import functional as F
from detectron2.utils import comm
from .wrappers import BatchNorm2d
TORCH_VERSION = tuple(int(x) for x in torch.__version__.split(".")[:2])
class FrozenBatchNorm2d(nn.Module):
"""
BatchNorm2d where the batch statistics and the affine parameters are fixed.
It contains non-trainable buffers called
"weight" and "bias", "running_mean", "running_var",
initialized to perform identity transformation.
The pre-trained backbone models from Caffe2 only contain "weight" and "bias",
which are computed from the original four parameters of BN.
The affine transform `x * weight + bias` will perform the equivalent
computation of `(x - running_mean) / sqrt(running_var) * weight + bias`.
When loading a backbone model from Caffe2, "running_mean" and "running_var"
will be left unchanged as identity transformation.
Other pre-trained backbone models may contain all 4 parameters.
The forward is implemented by `F.batch_norm(..., training=False)`.
"""
_version = 3
def __init__(self, num_features, eps=1e-5):
super().__init__()
self.num_features = num_features
self.eps = eps
self.register_buffer("weight", torch.ones(num_features))
self.register_buffer("bias", torch.zeros(num_features))
self.register_buffer("running_mean", torch.zeros(num_features))
self.register_buffer("running_var", torch.ones(num_features) - eps)
def forward(self, x):
if x.requires_grad:
# When gradients are needed, F.batch_norm will use extra memory
# because its backward op computes gradients for weight/bias as well.
scale = self.weight * (self.running_var + self.eps).rsqrt()
bias = self.bias - self.running_mean * scale
scale = scale.reshape(1, -1, 1, 1)
bias = bias.reshape(1, -1, 1, 1)
return x * scale + bias
else:
# When gradients are not needed, F.batch_norm is a single fused op
# and provide more optimization opportunities.
return F.batch_norm(
x,
self.running_mean,
self.running_var,
self.weight,
self.bias,
training=False,
eps=self.eps,
)
def _load_from_state_dict(
self, state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs
):
version = local_metadata.get("version", None)
if version is None or version < 2:
# No running_mean/var in early versions
# This will silent the warnings
if prefix + "running_mean" not in state_dict:
state_dict[prefix + "running_mean"] = torch.zeros_like(self.running_mean)
if prefix + "running_var" not in state_dict:
state_dict[prefix + "running_var"] = torch.ones_like(self.running_var)
if version is not None and version < 3:
logger = logging.getLogger(__name__)
logger.info("FrozenBatchNorm {} is upgraded to version 3.".format(prefix.rstrip(".")))
# In version < 3, running_var are used without +eps.
state_dict[prefix + "running_var"] -= self.eps
super()._load_from_state_dict(
state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs
)
def __repr__(self):
return "FrozenBatchNorm2d(num_features={}, eps={})".format(self.num_features, self.eps)
@classmethod
def convert_frozen_batchnorm(cls, module):
"""
Convert BatchNorm/SyncBatchNorm in module into FrozenBatchNorm.
Args:
module (torch.nn.Module):
Returns:
If module is BatchNorm/SyncBatchNorm, returns a new module.
Otherwise, in-place convert module and return it.
Similar to convert_sync_batchnorm in
https://github.com/pytorch/pytorch/blob/master/torch/nn/modules/batchnorm.py
"""
bn_module = nn.modules.batchnorm
bn_module = (bn_module.BatchNorm2d, bn_module.SyncBatchNorm)
res = module
if isinstance(module, bn_module):
res = cls(module.num_features)
if module.affine:
res.weight.data = module.weight.data.clone().detach()
res.bias.data = module.bias.data.clone().detach()
res.running_mean.data = module.running_mean.data
res.running_var.data = module.running_var.data
res.eps = module.eps
else:
for name, child in module.named_children():
new_child = cls.convert_frozen_batchnorm(child)
if new_child is not child:
res.add_module(name, new_child)
return res
def get_norm(norm, out_channels):
"""
Args:
norm (str or callable): either one of BN, SyncBN, FrozenBN, GN;
or a callable that takes a channel number and returns
the normalization layer as a nn.Module.
Returns:
nn.Module or None: the normalization layer
"""
if isinstance(norm, str):
if len(norm) == 0:
return None
norm = {
"BN": BatchNorm2d,
# Fixed in https://github.com/pytorch/pytorch/pull/36382
"SyncBN": NaiveSyncBatchNorm if TORCH_VERSION <= (1, 5) else nn.SyncBatchNorm,
"FrozenBN": FrozenBatchNorm2d,
"GN": lambda channels: nn.GroupNorm(32, channels),
# for debugging:
"nnSyncBN": nn.SyncBatchNorm,
"naiveSyncBN": NaiveSyncBatchNorm,
}[norm]
return norm(out_channels)
class AllReduce(Function):
@staticmethod
def forward(ctx, input):
input_list = [torch.zeros_like(input) for k in range(dist.get_world_size())]
# Use allgather instead of allreduce since I don't trust in-place operations ..
dist.all_gather(input_list, input, async_op=False)
inputs = torch.stack(input_list, dim=0)
return torch.sum(inputs, dim=0)
@staticmethod
def backward(ctx, grad_output):
dist.all_reduce(grad_output, async_op=False)
return grad_output
class NaiveSyncBatchNorm(BatchNorm2d):
"""
In PyTorch<=1.5, `nn.SyncBatchNorm` has incorrect gradient
when the batch size on each worker is different.
(e.g., when scale augmentation is used, or when it is applied to mask head).
This is a slower but correct alternative to `nn.SyncBatchNorm`.
Note:
There isn't a single definition of Sync BatchNorm.
When ``stats_mode==""``, this module computes overall statistics by using
statistics of each worker with equal weight. The result is true statistics
of all samples (as if they are all on one worker) only when all workers
have the same (N, H, W). This mode does not support inputs with zero batch size.
When ``stats_mode=="N"``, this module computes overall statistics by weighting
the statistics of each worker by their ``N``. The result is true statistics
of all samples (as if they are all on one worker) only when all workers
have the same (H, W). It is slower than ``stats_mode==""``.
Even though the result of this module may not be the true statistics of all samples,
it may still be reasonable because it might be preferrable to assign equal weights
to all workers, regardless of their (H, W) dimension, instead of putting larger weight
on larger images. From preliminary experiments, little difference is found between such
a simplified implementation and an accurate computation of overall mean & variance.
"""
def __init__(self, *args, stats_mode="", **kwargs):
super().__init__(*args, **kwargs)
assert stats_mode in ["", "N"]
self._stats_mode = stats_mode
def forward(self, input):
if comm.get_world_size() == 1 or not self.training:
return super().forward(input)
B, C = input.shape[0], input.shape[1]
mean = torch.mean(input, dim=[0, 2, 3])
meansqr = torch.mean(input * input, dim=[0, 2, 3])
if self._stats_mode == "":
assert B > 0, 'SyncBatchNorm(stats_mode="") does not support zero batch size.'
vec = torch.cat([mean, meansqr], dim=0)
vec = AllReduce.apply(vec) * (1.0 / dist.get_world_size())
mean, meansqr = torch.split(vec, C)
momentum = self.momentum
else:
if B == 0:
vec = torch.zeros([2 * C + 1], device=mean.device, dtype=mean.dtype)
vec = vec + input.sum() # make sure there is gradient w.r.t input
else:
vec = torch.cat(
[mean, meansqr, torch.ones([1], device=mean.device, dtype=mean.dtype)], dim=0
)
vec = AllReduce.apply(vec * B)
total_batch = vec[-1].detach()
momentum = total_batch.clamp(max=1) * self.momentum # no update if total_batch is 0
total_batch = torch.max(total_batch, torch.ones_like(total_batch)) # avoid div-by-zero
mean, meansqr, _ = torch.split(vec / total_batch, C)
var = meansqr - mean * mean
invstd = torch.rsqrt(var + self.eps)
scale = self.weight * invstd
bias = self.bias - mean * scale
scale = scale.reshape(1, -1, 1, 1)
bias = bias.reshape(1, -1, 1, 1)
self.running_mean += momentum * (mean.detach() - self.running_mean)
self.running_var += momentum * (var.detach() - self.running_var)
return input * scale + bias
# -*- coding: utf-8 -*-
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
from torch import nn
from .batch_norm import FrozenBatchNorm2d
class CNNBlockBase(nn.Module):
"""
A CNN block is assumed to have input channels, output channels and a stride.
The input and output of `forward()` method must be NCHW tensors.
The method can perform arbitrary computation but must match the given
channels and stride specification.
Attribute:
in_channels (int):
out_channels (int):
stride (int):
"""
def __init__(self, in_channels, out_channels, stride):
"""
The `__init__` method of any subclass should also contain these arguments.
Args:
in_channels (int):
out_channels (int):
stride (int):
"""
super().__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.stride = stride
def freeze(self):
"""
Make this block not trainable.
This method sets all parameters to `requires_grad=False`,
and convert all BatchNorm layers to FrozenBatchNorm
Returns:
the block itself
"""
for p in self.parameters():
p.requires_grad = False
FrozenBatchNorm2d.convert_frozen_batchnorm(self)
return self
To add a new Op:
1. Create a new directory
2. Implement new ops there
3. Delcare its Python interface in `vision.cpp`.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment