# Adapted from https://github.com/facebookresearch/detectron2/
# Copyright (c) Facebook, Inc. and its affiliates.
"""Command-line demo that runs a d2go model-zoo model on images, a video, or a webcam.

Visualizations are either written to ``--output`` or shown in an OpenCV window.
"""
import argparse
import glob
import multiprocessing as mp
import os
import time

import cv2
import tqdm
from d2go.model_zoo import model_zoo
from d2go.utils.demo_predictor import VisualizationDemo
from detectron2.data.detection_utils import read_image
from detectron2.utils.logger import setup_logger

# constants
WINDOW_NAME = "COCO detections"

# Key code returned by cv2.waitKey for the escape key.
_ESC_KEY = 27


def setup_cfg(cfg, args):
    """Apply command-line settings to ``cfg`` and freeze it.

    Args:
        cfg: mutable config node, as returned by ``model_zoo.get_config``.
        args: parsed arguments; ``confidence_threshold`` is required and
            ``opts`` (a flat ``KEY VALUE ...`` list) is applied when present.

    Returns:
        The same ``cfg`` object, frozen.
    """
    # Merge generic overrides first so the dedicated --confidence-threshold
    # flag below always has the final say on the score thresholds.
    opts = getattr(args, "opts", None)
    if opts:
        cfg.merge_from_list(opts)
    # Set score_threshold for builtin models
    cfg.MODEL.RETINANET.SCORE_THRESH_TEST = args.confidence_threshold
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = args.confidence_threshold
    cfg.MODEL.PANOPTIC_FPN.COMBINE.INSTANCES_CONFIDENCE_THRESH = (
        args.confidence_threshold
    )
    cfg.freeze()
    return cfg


def get_parser():
    """Build the argument parser for the demo script."""
    parser = argparse.ArgumentParser(description="Detectron2 demo for builtin configs")
    parser.add_argument(
        "--config-file",
        default="keypoint_rcnn_fbnetv3a_dsmask_C4.yaml",
        metavar="FILE",
        help="path to config file",
    )
    parser.add_argument(
        "--webcam", action="store_true", help="Take inputs from webcam."
    )
    parser.add_argument("--video-input", help="Path to video file.")
    parser.add_argument(
        "--input",
        nargs="+",
        help="A list of space separated input images; "
        "or a single glob pattern such as 'directory/*.jpg'",
    )
    parser.add_argument(
        "--output",
        help="A file or directory to save output visualizations. "
        "If not given, will show output in an OpenCV window.",
    )
    parser.add_argument(
        "--confidence-threshold",
        type=float,
        default=0.5,
        help="Minimum score for instance predictions to be shown",
    )
    parser.add_argument(
        "--opts",
        help="Modify config options using the command-line 'KEY VALUE' pairs",
        default=[],
        nargs=argparse.REMAINDER,
    )
    return parser


def _run_on_images(demo, args, logger):
    """Run the demo on every image in ``args.input`` and save or show each result."""
    paths = args.input
    if len(paths) == 1:
        # A single argument may be a glob pattern; expand it.
        paths = glob.glob(os.path.expanduser(paths[0]))
        assert paths, "The input path(s) was not found"

    save_to_dir = bool(args.output) and os.path.isdir(args.output)
    if args.output and not save_to_dir:
        # A single output file can only hold one image; fail before any
        # inference is run rather than after the first prediction.
        assert len(paths) == 1, "Please specify a directory with args.output"

    for path in tqdm.tqdm(paths, disable=not args.output):
        # use PIL, to be consistent with evaluation
        img = read_image(path, format="BGR")
        start_time = time.time()
        predictions, visualized_output = demo.run_on_image(img)
        logger.info(
            "{}: {} in {:.2f}s".format(
                path,
                (
                    "detected {} instances".format(len(predictions["instances"]))
                    if "instances" in predictions
                    else "finished"
                ),
                time.time() - start_time,
            )
        )

        if args.output:
            if save_to_dir:
                out_filename = os.path.join(args.output, os.path.basename(path))
            else:
                out_filename = args.output
            visualized_output.save(out_filename)
        else:
            cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
            # Reverse the channel axis before handing the image to OpenCV.
            cv2.imshow(WINDOW_NAME, visualized_output.get_image()[:, :, ::-1])
            if cv2.waitKey(0) == _ESC_KEY:
                break  # esc to quit


def _run_on_webcam(demo, args):
    """Run the demo on the default webcam and show frames until esc is pressed."""
    assert args.output is None, "output not yet supported with --webcam!"
    cam = cv2.VideoCapture(0)
    try:
        for vis_frame in tqdm.tqdm(demo.run_on_video(cam)):
            cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
            cv2.imshow(WINDOW_NAME, vis_frame)
            if cv2.waitKey(1) == _ESC_KEY:
                break  # esc to quit
    finally:
        cam.release()
        cv2.destroyAllWindows()


def _run_on_video(demo, args):
    """Run the demo on ``args.video_input`` and write or show the visualized frames."""
    # Validate the input before any capture or writer is opened.
    assert os.path.isfile(args.video_input), args.video_input

    video = cv2.VideoCapture(args.video_input)
    output_file = None
    try:
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frames_per_second = video.get(cv2.CAP_PROP_FPS)
        num_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        basename = os.path.basename(args.video_input)

        if args.output:
            if os.path.isdir(args.output):
                output_fname = os.path.join(args.output, basename)
                output_fname = os.path.splitext(output_fname)[0] + ".mkv"
            else:
                output_fname = args.output
            # Refuse to overwrite an existing file.
            assert not os.path.isfile(output_fname), output_fname
            output_file = cv2.VideoWriter(
                filename=output_fname,
                # some installation of opencv may not support x264 (due to its license),
                # you can try other format (e.g. MPEG)
                fourcc=cv2.VideoWriter_fourcc(*"x264"),
                fps=float(frames_per_second),
                frameSize=(width, height),
                isColor=True,
            )

        for vis_frame in tqdm.tqdm(demo.run_on_video(video), total=num_frames):
            if output_file is not None:
                output_file.write(vis_frame)
            else:
                cv2.namedWindow(basename, cv2.WINDOW_NORMAL)
                cv2.imshow(basename, vis_frame)
                if cv2.waitKey(1) == _ESC_KEY:
                    break  # esc to quit
    finally:
        # Release handles even when inference or writing fails midway.
        video.release()
        if args.output:
            if output_file is not None:
                output_file.release()
        else:
            cv2.destroyAllWindows()


def main():
    """Parse arguments, build the demo predictor, and dispatch on the input mode.

    Input modes are checked in the order ``--input``, ``--webcam``,
    ``--video-input``; the first one given is used.
    """
    mp.set_start_method("spawn", force=True)
    args = get_parser().parse_args()
    setup_logger(name="fvcore")
    logger = setup_logger()
    logger.info("Arguments: " + str(args))

    cfg = model_zoo.get_config(args.config_file)
    cfg = setup_cfg(cfg, args)
    demo = VisualizationDemo(cfg, args.config_file)

    if args.input:
        _run_on_images(demo, args, logger)
    elif args.webcam:
        _run_on_webcam(demo, args)
    elif args.video_input:
        _run_on_video(demo, args)
    else:
        logger.warning(
            "No input given; pass one of --input, --webcam or --video-input."
        )


if __name__ == "__main__":
    main()