Commit 75d93027 authored by bailuo

init

import math
from lib.models.artrack_seq import build_artrack_seq
from lib.test.tracker.basetracker import BaseTracker
import torch
from lib.test.tracker.vis_utils import gen_visualization
from lib.test.utils.hann import hann2d
from lib.train.data.processing_utils import sample_target, transform_image_to_crop
# for debug
import cv2
import os
from lib.test.tracker.data_utils import Preprocessor
from lib.utils.box_ops import clip_box
from lib.utils.ce_utils import generate_mask_cond
class ARTrackSeq(BaseTracker):
def __init__(self, params, dataset_name):
super(ARTrackSeq, self).__init__(params)
network = build_artrack_seq(params.cfg, training=False)
print(self.params.checkpoint)
network.load_state_dict(torch.load(self.params.checkpoint, map_location='cpu')['net'], strict=True)
self.cfg = params.cfg
self.bins = self.cfg.MODEL.BINS
self.network = network.cuda()
self.network.eval()
self.preprocessor = Preprocessor()
self.state = None
self.feat_sz = self.cfg.TEST.SEARCH_SIZE // self.cfg.MODEL.BACKBONE.STRIDE
# motion constraint
self.output_window = hann2d(torch.tensor([self.feat_sz, self.feat_sz]).long(), centered=True).cuda()
# for debug
self.debug = params.debug
self.use_visdom = params.debug
self.frame_id = 0
if self.debug:
if not self.use_visdom:
self.save_dir = "debug"
if not os.path.exists(self.save_dir):
os.makedirs(self.save_dir)
else:
# self.add_hook()
self._init_visdom(None, 1)
# for save boxes from all queries
self.save_all_boxes = params.save_all_boxes
self.z_dict1 = {}
self.store_result = None
self.save_all = 7
self.x_feat = None
self.update = None
self.update_threshold = 5.0
self.update_intervals = 1
def initialize(self, image, info: dict):
# forward the template once
self.x_feat = None
z_patch_arr, resize_factor, z_amask_arr = sample_target(image, info['init_bbox'], self.params.template_factor,
output_sz=self.params.template_size) # output_sz=self.params.template_size
self.z_patch_arr = z_patch_arr
template = self.preprocessor.process(z_patch_arr, z_amask_arr)
with torch.no_grad():
self.z_dict1 = template
self.box_mask_z = None
# if self.cfg.MODEL.BACKBONE.CE_LOC:
# template_bbox = self.transform_bbox_to_crop(info['init_bbox'], resize_factor,
# template.tensors.device).squeeze(1)
# self.box_mask_z = generate_mask_cond(self.cfg, 1, template.tensors.device, template_bbox)
# save states
self.state = info['init_bbox']
self.store_result = [info['init_bbox'].copy()]
for i in range(self.save_all - 1):
self.store_result.append(info['init_bbox'].copy())
self.frame_id = 0
self.update = None
if self.save_all_boxes:
'''save all predicted boxes'''
all_boxes_save = info['init_bbox'] * self.cfg.MODEL.NUM_OBJECT_QUERIES
return {"all_boxes": all_boxes_save}
def track(self, image, info: dict = None):
H, W, _ = image.shape
self.frame_id += 1
x_patch_arr, resize_factor, x_amask_arr = sample_target(image, self.state, self.params.search_factor,
output_sz=self.params.search_size) # (x1, y1, w, h)
for i in range(len(self.store_result)):
box_temp = self.store_result[i].copy()
box_out_i = transform_image_to_crop(torch.Tensor(self.store_result[i]), torch.Tensor(self.state),
resize_factor,
torch.Tensor([self.cfg.TEST.SEARCH_SIZE, self.cfg.TEST.SEARCH_SIZE]),
normalize=True)
box_out_i[2] = box_out_i[2] + box_out_i[0]
box_out_i[3] = box_out_i[3] + box_out_i[1]
box_out_i = box_out_i.clamp(min=-0.5, max=1.5)
box_out_i = (box_out_i + 0.5) * (self.bins - 1)
if i == 0:
seqs_out = box_out_i
else:
seqs_out = torch.cat((seqs_out, box_out_i), dim=-1)
seqs_out = seqs_out.unsqueeze(0)
search = self.preprocessor.process(x_patch_arr, x_amask_arr)
with torch.no_grad():
x_dict = search
# merge the template and the search
# run the transformer
out_dict = self.network.forward(
template=self.z_dict1.tensors, search=x_dict.tensors,
seq_input=seqs_out, stage="sequence", search_feature=self.x_feat, update=None)
self.x_feat = out_dict['x_feat']
pred_boxes = out_dict['seqs'][:, 0:4] / (self.bins - 1) - 0.5
pred_boxes = pred_boxes.view(-1, 4).mean(dim=0)
pred_new = pred_boxes
pred_new[2] = pred_boxes[2] - pred_boxes[0]
pred_new[3] = pred_boxes[3] - pred_boxes[1]
pred_new[0] = pred_boxes[0] + pred_new[2] / 2
pred_new[1] = pred_boxes[1] + pred_new[3] / 2
pred_boxes = (pred_new * self.params.search_size / resize_factor).tolist()
# Baseline: Take the mean of all pred boxes as the final result
# pred_box = (pred_boxes.mean(
# dim=0) * self.params.search_size / resize_factor).tolist() # (cx, cy, w, h) [0,1]
# get the final box result
self.state = clip_box(self.map_box_back(pred_boxes, resize_factor), H, W, margin=10)
if len(self.store_result) < self.save_all:
self.store_result.append(self.state.copy())
else:
for i in range(self.save_all):
if i != self.save_all - 1:
self.store_result[i] = self.store_result[i + 1]
else:
self.store_result[i] = self.state.copy()
# for debug
if self.debug:
if not self.use_visdom:
x1, y1, w, h = self.state
image_BGR = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
cv2.rectangle(image_BGR, (int(x1), int(y1)), (int(x1 + w), int(y1 + h)), color=(0, 0, 255), thickness=2)
save_path = os.path.join(self.save_dir, "%04d.jpg" % self.frame_id)
cv2.imwrite(save_path, image_BGR)
else:
self.visdom.register((image, info['gt_bbox'].tolist(), self.state), 'Tracking', 1, 'Tracking')
self.visdom.register(torch.from_numpy(x_patch_arr).permute(2, 0, 1), 'image', 1, 'search_region')
self.visdom.register(torch.from_numpy(self.z_patch_arr).permute(2, 0, 1), 'image', 1, 'template')
# NOTE: `pred_score_map` is not produced by ARTrackSeq (it predicts a coordinate sequence,
# not a score map), so the score-map visualizations below would raise a NameError; they are
# kept only as commented-out references.
# self.visdom.register(pred_score_map.view(self.feat_sz, self.feat_sz), 'heatmap', 1, 'score_map')
# self.visdom.register((pred_score_map * self.output_window).view(self.feat_sz, self.feat_sz), 'heatmap',
# 1, 'score_map_hann')
if 'removed_indexes_s' in out_dict and out_dict['removed_indexes_s']:
removed_indexes_s = out_dict['removed_indexes_s']
removed_indexes_s = [removed_indexes_s_i.cpu().numpy() for removed_indexes_s_i in removed_indexes_s]
masked_search = gen_visualization(x_patch_arr, removed_indexes_s)
self.visdom.register(torch.from_numpy(masked_search).permute(2, 0, 1), 'image', 1, 'masked_search')
while self.pause_mode:
if self.step:
self.step = False
break
if self.save_all_boxes:
'''save all predictions'''
all_boxes = self.map_box_back_batch(pred_boxes * self.params.search_size / resize_factor, resize_factor)
all_boxes_save = all_boxes.view(-1).tolist() # (4N, )
return {"target_bbox": self.state,
"all_boxes": all_boxes_save}
else:
return {"target_bbox": self.state}
def map_box_back(self, pred_box: list, resize_factor: float):
cx_prev, cy_prev = self.state[0] + 0.5 * self.state[2], self.state[1] + 0.5 * self.state[3]
cx, cy, w, h = pred_box
half_side = 0.5 * self.params.search_size / resize_factor
cx_real = cx + (cx_prev - half_side)
cy_real = cy + (cy_prev - half_side)
# cx_real = cx + cx_prev
# cy_real = cy + cy_prev
return [cx_real - 0.5 * w, cy_real - 0.5 * h, w, h]
def map_box_back_batch(self, pred_box: torch.Tensor, resize_factor: float):
cx_prev, cy_prev = self.state[0] + 0.5 * self.state[2], self.state[1] + 0.5 * self.state[3]
cx, cy, w, h = pred_box.unbind(-1) # (N,4) --> (N,)
half_side = 0.5 * self.params.search_size / resize_factor
cx_real = cx + (cx_prev - half_side)
cy_real = cy + (cy_prev - half_side)
return torch.stack([cx_real - 0.5 * w, cy_real - 0.5 * h, w, h], dim=-1)
def add_hook(self):
conv_features, enc_attn_weights, dec_attn_weights = [], [], []
for i in range(12):
self.network.backbone.blocks[i].attn.register_forward_hook(
# lambda self, input, output: enc_attn_weights.append(output[1])
lambda self, input, output: enc_attn_weights.append(output[1])
)
self.enc_attn_weights = enc_attn_weights
def get_tracker_class():
return ARTrackSeq
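# Minimal usage sketch (not part of the original file): `params` is assumed to be built by the
# test parameter scripts (it must carry cfg, checkpoint, template/search factors and sizes), and
# the frame loop below is a hypothetical driver, not this repo's evaluation pipeline.
#
#   tracker = get_tracker_class()(params, dataset_name="lasot")
#   tracker.initialize(first_frame_rgb, {"init_bbox": [x, y, w, h]})
#   for frame_rgb in remaining_frames:
#       out = tracker.track(frame_rgb)
#       x, y, w, h = out["target_bbox"]  # box in image coordinates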
import time
import torch
from _collections import OrderedDict
from lib.train.data.processing_utils import transform_image_to_crop
from lib.vis.visdom_cus import Visdom
class BaseTracker:
"""Base class for all trackers."""
def __init__(self, params):
self.params = params
self.visdom = None
def predicts_segmentation_mask(self):
return False
def initialize(self, image, info: dict) -> dict:
"""Overload this function in your tracker. This should initialize the model."""
raise NotImplementedError
def track(self, image, info: dict = None) -> dict:
"""Overload this function in your tracker. This should track in the frame and update the model."""
raise NotImplementedError
def visdom_draw_tracking(self, image, box, segmentation=None):
if isinstance(box, OrderedDict):
box = [v for k, v in box.items()]
else:
box = (box,)
if segmentation is None:
self.visdom.register((image, *box), 'Tracking', 1, 'Tracking')
else:
self.visdom.register((image, *box, segmentation), 'Tracking', 1, 'Tracking')
def transform_bbox_to_crop(self, box_in, resize_factor, device, box_extract=None, crop_type='template'):
# box_in: list [x1, y1, w, h], not normalized
# box_extract: same as box_in
# out bbox: Torch.tensor [1, 1, 4], x1y1wh, normalized
if crop_type == 'template':
crop_sz = torch.Tensor([self.params.template_size, self.params.template_size])
elif crop_type == 'search':
crop_sz = torch.Tensor([self.params.search_size, self.params.search_size])
else:
raise NotImplementedError
box_in = torch.tensor(box_in)
if box_extract is None:
box_extract = box_in
else:
box_extract = torch.tensor(box_extract)
template_bbox = transform_image_to_crop(box_in, box_extract, resize_factor, crop_sz, normalize=True)
template_bbox = template_bbox.view(1, 1, 4).to(device)
return template_bbox
def _init_visdom(self, visdom_info, debug):
visdom_info = {} if visdom_info is None else visdom_info
self.pause_mode = False
self.step = False
self.next_seq = False
if debug > 0 and visdom_info.get('use_visdom', True):
try:
self.visdom = Visdom(debug, {'handler': self._visdom_ui_handler, 'win_id': 'Tracking'},
visdom_info=visdom_info)
# # Show help
# help_text = 'You can pause/unpause the tracker by pressing ''space'' with the ''Tracking'' window ' \
# 'selected. During paused mode, you can track for one frame by pressing the right arrow key.' \
# 'To enable/disable plotting of a data block, tick/untick the corresponding entry in ' \
# 'block list.'
# self.visdom.register(help_text, 'text', 1, 'Help')
except:
time.sleep(0.5)
print('!!! WARNING: Visdom could not start, so using matplotlib visualization instead !!!\n'
'!!! Start Visdom in a separate terminal window by typing \'visdom\' !!!')
def _visdom_ui_handler(self, data):
if data['event_type'] == 'KeyPress':
if data['key'] == ' ':
self.pause_mode = not self.pause_mode
elif data['key'] == 'ArrowRight' and self.pause_mode:
self.step = True
elif data['key'] == 'n':
self.next_seq = True
import torch
import numpy as np
from lib.utils.misc import NestedTensor
class Preprocessor(object):
def __init__(self):
self.mean = torch.tensor([0.485, 0.456, 0.406]).view((1, 3, 1, 1)).cuda()
self.std = torch.tensor([0.229, 0.224, 0.225]).view((1, 3, 1, 1)).cuda()
def process(self, img_arr: np.ndarray, amask_arr: np.ndarray):
# Deal with the image patch
img_tensor = torch.tensor(img_arr).cuda().float().permute((2,0,1)).unsqueeze(dim=0)
img_tensor_norm = ((img_tensor / 255.0) - self.mean) / self.std # (1,3,H,W)
# Deal with the attention mask
amask_tensor = torch.from_numpy(amask_arr).to(torch.bool).cuda().unsqueeze(dim=0) # (1,H,W)
return NestedTensor(img_tensor_norm, amask_tensor)
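# Usage sketch (assumes a CUDA device; array shapes are illustrative):
#
#   import numpy as np
#   pre = Preprocessor()
#   crop = np.zeros((256, 256, 3), dtype=np.uint8)   # (H, W, 3) RGB patch
#   mask = np.zeros((256, 256), dtype=bool)          # (H, W) attention mask
#   nested = pre.process(crop, mask)                 # NestedTensor; .tensors is the normalized (1, 3, H, W) image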
class PreprocessorX(object):
def __init__(self):
self.mean = torch.tensor([0.485, 0.456, 0.406]).view((1, 3, 1, 1)).cuda()
self.std = torch.tensor([0.229, 0.224, 0.225]).view((1, 3, 1, 1)).cuda()
def process(self, img_arr: np.ndarray, amask_arr: np.ndarray):
# Deal with the image patch
img_tensor = torch.tensor(img_arr).cuda().float().permute((2,0,1)).unsqueeze(dim=0)
img_tensor_norm = ((img_tensor / 255.0) - self.mean) / self.std # (1,3,H,W)
# Deal with the attention mask
amask_tensor = torch.from_numpy(amask_arr).to(torch.bool).cuda().unsqueeze(dim=0) # (1,H,W)
return img_tensor_norm, amask_tensor
class PreprocessorX_onnx(object):
def __init__(self):
self.mean = np.array([0.485, 0.456, 0.406]).reshape((1, 3, 1, 1))
self.std = np.array([0.229, 0.224, 0.225]).reshape((1, 3, 1, 1))
def process(self, img_arr: np.ndarray, amask_arr: np.ndarray):
"""img_arr: (H,W,3), amask_arr: (H,W)"""
# Deal with the image patch
img_arr_4d = img_arr[np.newaxis, :, :, :].transpose(0, 3, 1, 2)
img_arr_4d = (img_arr_4d / 255.0 - self.mean) / self.std # (1, 3, H, W)
# Deal with the attention mask
amask_arr_3d = amask_arr[np.newaxis, :, :] # (1,H,W)
return img_arr_4d.astype(np.float32), amask_arr_3d.astype(bool)  # np.bool is removed in recent NumPy
import numpy as np
############## used for visualizing eliminated tokens #################
def get_keep_indices(decisions):
keep_indices = []
for i in range(3):
if i == 0:
keep_indices.append(decisions[i])
else:
keep_indices.append(keep_indices[-1][decisions[i]])
return keep_indices
def gen_masked_tokens(tokens, indices, alpha=0.2):
# indices = [i for i in range(196) if i not in indices]
indices = indices[0].astype(int)
tokens = tokens.copy()
tokens[indices] = alpha * tokens[indices] + (1 - alpha) * 255
return tokens
def recover_image(tokens, H, W, Hp, Wp, patch_size):
# image: (C, 196, 16, 16)
image = tokens.reshape(Hp, Wp, patch_size, patch_size, 3).swapaxes(1, 2).reshape(H, W, 3)
return image
def pad_img(img):
height, width, channels = img.shape
im_bg = np.ones((height, width + 8, channels)) * 255
im_bg[0:height, 0:width, :] = img
return im_bg
def gen_visualization(image, mask_indices, patch_size=16):
# image [224, 224, 3]
# mask_indices, list of masked token indices
# mask mask_indices need to cat
# mask_indices = mask_indices[::-1]
num_stages = len(mask_indices)
for i in range(1, num_stages):
mask_indices[i] = np.concatenate([mask_indices[i-1], mask_indices[i]], axis=1)
# keep_indices = get_keep_indices(decisions)
image = np.asarray(image)
H, W, C = image.shape
Hp, Wp = H // patch_size, W // patch_size
image_tokens = image.reshape(Hp, patch_size, Wp, patch_size, 3).swapaxes(1, 2).reshape(Hp * Wp, patch_size, patch_size, 3)
stages = [
recover_image(gen_masked_tokens(image_tokens, mask_indices[i]), H, W, Hp, Wp, patch_size)
for i in range(num_stages)
]
imgs = [image] + stages
imgs = [pad_img(img) for img in imgs]
viz = np.concatenate(imgs, axis=1)
return viz
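# Usage sketch (hypothetical inputs): given a 224x224 search crop and one (1, k) index array of
# removed tokens per elimination stage, the output stitches the original crop and one masked
# copy per stage side by side, each padded by 8 pixels.
#
#   crop = np.zeros((224, 224, 3), dtype=np.uint8)
#   removed = [np.arange(10).reshape(1, -1), np.arange(10, 30).reshape(1, -1)]
#   viz = gen_visualization(crop, removed)   # shape (224, 3 * (224 + 8), 3)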
from .params import TrackerParams, FeatureParams, Choice
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os.path as osp
import sys
def add_path(path):
if path not in sys.path:
sys.path.insert(0, path)
this_dir = osp.dirname(__file__)
prj_path = osp.join(this_dir, '..', '..', '..')
add_path(prj_path)
import torch
import math
import torch.nn.functional as F
def hann1d(sz: int, centered = True) -> torch.Tensor:
"""1D cosine window."""
if centered:
return 0.5 * (1 - torch.cos((2 * math.pi / (sz + 1)) * torch.arange(1, sz + 1).float()))
w = 0.5 * (1 + torch.cos((2 * math.pi / (sz + 2)) * torch.arange(0, sz//2 + 1).float()))
return torch.cat([w, w[1:sz-sz//2].flip((0,))])
def hann2d(sz: torch.Tensor, centered = True) -> torch.Tensor:
"""2D cosine window."""
return hann1d(sz[0].item(), centered).reshape(1, 1, -1, 1) * hann1d(sz[1].item(), centered).reshape(1, 1, 1, -1)
def hann2d_bias(sz: torch.Tensor, ctr_point: torch.Tensor, centered = True) -> torch.Tensor:
"""2D cosine window."""
distance = torch.stack([ctr_point, sz-ctr_point], dim=0)
max_distance, _ = distance.max(dim=0)
hann1d_x = hann1d(max_distance[0].item() * 2, centered)
hann1d_x = hann1d_x[max_distance[0] - distance[0, 0]: max_distance[0] + distance[1, 0]]
hann1d_y = hann1d(max_distance[1].item() * 2, centered)
hann1d_y = hann1d_y[max_distance[1] - distance[0, 1]: max_distance[1] + distance[1, 1]]
return hann1d_y.reshape(1, 1, -1, 1) * hann1d_x.reshape(1, 1, 1, -1)
def hann2d_clipped(sz: torch.Tensor, effective_sz: torch.Tensor, centered = True) -> torch.Tensor:
"""1D clipped cosine window."""
# Ensure that the difference is even
effective_sz += (effective_sz - sz) % 2
effective_window = hann1d(effective_sz[0].item(), True).reshape(1, 1, -1, 1) * hann1d(effective_sz[1].item(), True).reshape(1, 1, 1, -1)
pad = (sz - effective_sz) // 2
window = F.pad(effective_window, (pad[1].item(), pad[1].item(), pad[0].item(), pad[0].item()), 'replicate')
if centered:
return window
else:
mid = (sz / 2).int()
window_shift_lr = torch.cat((window[:, :, :, mid[1]:], window[:, :, :, :mid[1]]), 3)
return torch.cat((window_shift_lr[:, :, mid[0]:, :], window_shift_lr[:, :, :mid[0], :]), 2)
def gauss_fourier(sz: int, sigma: float, half: bool = False) -> torch.Tensor:
if half:
k = torch.arange(0, int(sz/2+1))
else:
k = torch.arange(-int((sz-1)/2), int(sz/2+1))
return (math.sqrt(2*math.pi) * sigma / sz) * torch.exp(-2 * (math.pi * sigma * k.float() / sz)**2)
def gauss_spatial(sz, sigma, center=0, end_pad=0):
k = torch.arange(-(sz-1)/2, (sz+1)/2+end_pad)
return torch.exp(-1.0/(2*sigma**2) * (k - center)**2)
def label_function(sz: torch.Tensor, sigma: torch.Tensor):
return gauss_fourier(sz[0].item(), sigma[0].item()).reshape(1, 1, -1, 1) * gauss_fourier(sz[1].item(), sigma[1].item(), True).reshape(1, 1, 1, -1)
def label_function_spatial(sz: torch.Tensor, sigma: torch.Tensor, center: torch.Tensor = torch.zeros(2), end_pad: torch.Tensor = torch.zeros(2)):
"""The origin is in the middle of the image."""
return gauss_spatial(sz[0].item(), sigma[0].item(), center[0], end_pad[0].item()).reshape(1, 1, -1, 1) * \
gauss_spatial(sz[1].item(), sigma[1].item(), center[1], end_pad[1].item()).reshape(1, 1, 1, -1)
def cubic_spline_fourier(f, a):
"""The continuous Fourier transform of a cubic spline kernel."""
bf = (6*(1 - torch.cos(2 * math.pi * f)) + 3*a*(1 - torch.cos(4 * math.pi * f))
- (6 + 8*a)*math.pi*f*torch.sin(2 * math.pi * f) - 2*a*math.pi*f*torch.sin(4 * math.pi * f)) \
/ (4 * math.pi**4 * f**4)
bf[f == 0] = 1
return bf
def max2d(a: torch.Tensor) -> (torch.Tensor, torch.Tensor):
"""Computes maximum and argmax in the last two dimensions."""
max_val_row, argmax_row = torch.max(a, dim=-2)
max_val, argmax_col = torch.max(max_val_row, dim=-1)
argmax_row = argmax_row.view(argmax_col.numel(),-1)[torch.arange(argmax_col.numel()), argmax_col.view(-1)]
argmax_row = argmax_row.reshape(argmax_col.shape)
argmax = torch.cat((argmax_row.unsqueeze(-1), argmax_col.unsqueeze(-1)), -1)
return max_val, argmax
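# Usage sketch: the tracker builds its motion prior from hann2d over the search-feature grid,
# with feat_sz = SEARCH_SIZE // STRIDE (256 // 16 = 16 is an assumed example configuration).
#
#   feat_sz = 16
#   window = hann2d(torch.tensor([feat_sz, feat_sz]).long(), centered=True)  # shape (1, 1, 16, 16)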
import numpy as np
import pandas as pd
def load_text_numpy(path, delimiter, dtype):
if isinstance(delimiter, (tuple, list)):
for d in delimiter:
try:
ground_truth_rect = np.loadtxt(path, delimiter=d, dtype=dtype)
return ground_truth_rect
except:
pass
raise Exception('Could not read file {}'.format(path))
else:
ground_truth_rect = np.loadtxt(path, delimiter=delimiter, dtype=dtype)
return ground_truth_rect
def load_text_pandas(path, delimiter, dtype):
if isinstance(delimiter, (tuple, list)):
for d in delimiter:
try:
ground_truth_rect = pd.read_csv(path, delimiter=d, header=None, dtype=dtype, na_filter=False,
low_memory=False).values
return ground_truth_rect
except Exception as e:
pass
raise Exception('Could not read file {}'.format(path))
else:
ground_truth_rect = pd.read_csv(path, delimiter=delimiter, header=None, dtype=dtype, na_filter=False,
low_memory=False).values
return ground_truth_rect
def load_text(path, delimiter=' ', dtype=np.float32, backend='numpy'):
if backend == 'numpy':
return load_text_numpy(path, delimiter, dtype)
elif backend == 'pandas':
return load_text_pandas(path, delimiter, dtype)
def load_str(path):
with open(path, "r") as f:
text_str = f.readline().strip().lower()
return text_str
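# Usage sketch (hypothetical annotation file): ground-truth files in different benchmarks mix
# comma, whitespace and tab delimiters, so a tuple of candidates can be passed and the first
# delimiter that parses successfully is used.
#
#   gt = load_text("groundtruth.txt", delimiter=(",", None, "\t"), dtype=np.float64, backend="numpy")
#   # gt is an (N, 4) array of per-frame [x, y, w, h] boxes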
from lib.utils import TensorList
import random
class TrackerParams:
"""Class for tracker parameters."""
def set_default_values(self, default_vals: dict):
for name, val in default_vals.items():
if not hasattr(self, name):
setattr(self, name, val)
def get(self, name: str, *default):
"""Get a parameter value with the given name. If it does not exists, it return the default value given as a
second argument or returns an error if no default value is given."""
if len(default) > 1:
raise ValueError('Can only give one default value.')
if not default:
return getattr(self, name)
return getattr(self, name, default[0])
def has(self, name: str):
"""Check if there exist a parameter with the given name."""
return hasattr(self, name)
class FeatureParams:
"""Class for feature specific parameters"""
def __init__(self, *args, **kwargs):
if len(args) > 0:
raise ValueError
for name, val in kwargs.items():
if isinstance(val, list):
setattr(self, name, TensorList(val))
else:
setattr(self, name, val)
def Choice(*args):
"""Can be used to sample random parameter values."""
return random.choice(args)
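# Usage sketch for the parameter helpers (values are illustrative):
#
#   params = TrackerParams()
#   params.set_default_values({"search_size": 256, "debug": 0})
#   params.get("search_size")          # -> 256
#   params.get("template_size", 128)   # -> 128, falls back to the supplied default
#   params.has("debug")                # -> True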
import numpy as np
import os
import shutil
import argparse
import _init_paths
from lib.test.evaluation.environment import env_settings
def transform_got10k(tracker_name, cfg_name):
env = env_settings()
result_dir = env.results_path
src_dir = os.path.join(result_dir, "%s/%s/got10k/" % (tracker_name, cfg_name))
dest_dir = os.path.join(result_dir, "%s/%s/got10k_submit/" % (tracker_name, cfg_name))
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
items = os.listdir(src_dir)
for item in items:
if "all" in item:
continue
src_path = os.path.join(src_dir, item)
if "time" not in item:
seq_name = item.replace(".txt", '')
seq_dir = os.path.join(dest_dir, seq_name)
if not os.path.exists(seq_dir):
os.makedirs(seq_dir)
new_item = item.replace(".txt", '_001.txt')
dest_path = os.path.join(seq_dir, new_item)
bbox_arr = np.loadtxt(src_path, dtype=int, delimiter='\t')  # np.int is removed in recent NumPy
np.savetxt(dest_path, bbox_arr, fmt='%d', delimiter=',')
else:
seq_name = item.replace("_time.txt", '')
seq_dir = os.path.join(dest_dir, seq_name)
if not os.path.exists(seq_dir):
os.makedirs(seq_dir)
dest_path = os.path.join(seq_dir, item)
os.system("cp %s %s" % (src_path, dest_path))
# make zip archive
shutil.make_archive(src_dir, "zip", src_dir)
shutil.make_archive(dest_dir, "zip", dest_dir)
# Remove the original files
shutil.rmtree(src_dir)
shutil.rmtree(dest_dir)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='transform got10k results.')
parser.add_argument('--tracker_name', type=str, help='Name of tracking method.')
parser.add_argument('--cfg_name', type=str, help='Name of config file.')
args = parser.parse_args()
transform_got10k(args.tracker_name, args.cfg_name)
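# Invocation sketch (the script path is an assumption; result_dir comes from
# lib/test/evaluation/local.py via env_settings()):
#
#   python tracking/transform_got10k.py --tracker_name artrack_seq --cfg_name <config_name>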
import numpy as np
import os
import shutil
import argparse
import _init_paths
from lib.test.evaluation.environment import env_settings
def transform_trackingnet(tracker_name, cfg_name):
env = env_settings()
result_dir = env.results_path
src_dir = os.path.join(result_dir, "%s/%s/trackingnet/" % (tracker_name, cfg_name))
dest_dir = os.path.join(result_dir, "%s/%s/trackingnet_submit/" % (tracker_name, cfg_name))
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
items = os.listdir(src_dir)
for item in items:
if "all" in item:
continue
if "time" not in item:
src_path = os.path.join(src_dir, item)
dest_path = os.path.join(dest_dir, item)
bbox_arr = np.loadtxt(src_path, dtype=int, delimiter='\t')  # np.int is removed in recent NumPy
np.savetxt(dest_path, bbox_arr, fmt='%d', delimiter=',')
# make zip archive
shutil.make_archive(src_dir, "zip", src_dir)
shutil.make_archive(dest_dir, "zip", dest_dir)
# Remove the original files
shutil.rmtree(src_dir)
shutil.rmtree(dest_dir)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='transform trackingnet results.')
parser.add_argument('--tracker_name', type=str, help='Name of tracking method.')
parser.add_argument('--cfg_name', type=str, help='Name of config file.')
args = parser.parse_args()
transform_trackingnet(args.tracker_name, args.cfg_name)
from .admin.multigpu import MultiGPU
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os.path as osp
import sys
def add_path(path):
if path not in sys.path:
sys.path.insert(0, path)
this_dir = osp.dirname(__file__)
prj_path = osp.join(this_dir, '../..')
add_path(prj_path)
from .base_actor import BaseActor
from .artrack import ARTrackActor
from .artrack_seq import ARTrackSeqActor
from . import BaseActor
from lib.utils.misc import NestedTensor
from lib.utils.box_ops import box_cxcywh_to_xyxy, box_xywh_to_xyxy
import torch
import math
import numpy as np
from lib.utils.merge import merge_template_search
from ...utils.heapmap_utils import generate_heatmap
from ...utils.ce_utils import generate_mask_cond, adjust_keep_rate
def fp16_clamp(x, min=None, max=None):
if not x.is_cuda and x.dtype == torch.float16:
# clamp for cpu float16, tensor fp16 has no clamp implementation
return x.float().clamp(min, max).half()
return x.clamp(min, max)
def generate_sa_simdr(joints):
'''
:param joints: [num_joints, 3]
:param joints_vis: [num_joints, 3]
:return: target, target_weight(1: visible, 0: invisible)
'''
num_joints = 48
image_size = [256, 256]
simdr_split_ratio = 1.5625
sigma = 6
target_x1 = np.zeros((num_joints,
int(image_size[0] * simdr_split_ratio)),
dtype=np.float32)
target_y1 = np.zeros((num_joints,
int(image_size[1] * simdr_split_ratio)),
dtype=np.float32)
target_x2 = np.zeros((num_joints,
int(image_size[0] * simdr_split_ratio)),
dtype=np.float32)
target_y2 = np.zeros((num_joints,
int(image_size[1] * simdr_split_ratio)),
dtype=np.float32)
zero_4_begin = np.zeros((num_joints, 1), dtype=np.float32)
tmp_size = sigma * 3
for joint_id in range(num_joints):
mu_x1 = joints[joint_id][0]
mu_y1 = joints[joint_id][1]
mu_x2 = joints[joint_id][2]
mu_y2 = joints[joint_id][3]
x1 = np.arange(0, int(image_size[0] * simdr_split_ratio), 1, np.float32)
y1 = np.arange(0, int(image_size[1] * simdr_split_ratio), 1, np.float32)
x2 = np.arange(0, int(image_size[0] * simdr_split_ratio), 1, np.float32)
y2 = np.arange(0, int(image_size[1] * simdr_split_ratio), 1, np.float32)
target_x1[joint_id] = (np.exp(- ((x1 - mu_x1) ** 2) / (2 * sigma ** 2))) / (
sigma * np.sqrt(np.pi * 2))
target_y1[joint_id] = (np.exp(- ((y1 - mu_y1) ** 2) / (2 * sigma ** 2))) / (
sigma * np.sqrt(np.pi * 2))
target_x2[joint_id] = (np.exp(- ((x2 - mu_x2) ** 2) / (2 * sigma ** 2))) / (
sigma * np.sqrt(np.pi * 2))
target_y2[joint_id] = (np.exp(- ((y2 - mu_y2) ** 2) / (2 * sigma ** 2))) / (
sigma * np.sqrt(np.pi * 2))
return target_x1, target_y1, target_x2, target_y2
# angle cost
def SIoU_loss(test1, test2, theta=4):
eps = 1e-7
cx_pred = (test1[:, 0] + test1[:, 2]) / 2
cy_pred = (test1[:, 1] + test1[:, 3]) / 2
cx_gt = (test2[:, 0] + test2[:, 2]) / 2
cy_gt = (test2[:, 1] + test2[:, 3]) / 2
dist = ((cx_pred - cx_gt)**2 + (cy_pred - cy_gt)**2) ** 0.5
ch = torch.max(cy_gt, cy_pred) - torch.min(cy_gt, cy_pred)
x = ch / (dist + eps)
angle = 1 - 2*torch.sin(torch.arcsin(x)-torch.pi/4)**2
# distance cost
xmin = torch.min(test1[:, 0], test2[:, 0])
xmax = torch.max(test1[:, 2], test2[:, 2])
ymin = torch.min(test1[:, 1], test2[:, 1])
ymax = torch.max(test1[:, 3], test2[:, 3])
cw = xmax - xmin
ch = ymax - ymin
px = ((cx_gt - cx_pred) / (cw+eps))**2
py = ((cy_gt - cy_pred) / (ch+eps))**2
gama = 2 - angle
dis = (1 - torch.exp(-1 * gama * px)) + (1 - torch.exp(-1 * gama * py))
#shape cost
w_pred = test1[:, 2] - test1[:, 0]
h_pred = test1[:, 3] - test1[:, 1]
w_gt = test2[:, 2] - test2[:, 0]
h_gt = test2[:, 3] - test2[:, 1]
ww = torch.abs(w_pred - w_gt) / (torch.max(w_pred, w_gt) + eps)
wh = torch.abs(h_gt - h_pred) / (torch.max(h_gt, h_pred) + eps)
omega = (1 - torch.exp(-1 * wh)) ** theta + (1 - torch.exp(-1 * ww)) ** theta
#IoU loss
lt = torch.max(test1[..., :2], test2[..., :2]) # [B, rows, 2]
rb = torch.min(test1[..., 2:], test2[..., 2:]) # [B, rows, 2]
wh = fp16_clamp(rb - lt, min=0)
overlap = wh[..., 0] * wh[..., 1]
area1 = (test1[..., 2] - test1[..., 0]) * (
test1[..., 3] - test1[..., 1])
area2 = (test2[..., 2] - test2[..., 0]) * (
test2[..., 3] - test2[..., 1])
iou = overlap / (area1 + area2 - overlap)
SIoU = 1 - iou + (omega + dis) / 2
return SIoU, iou
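# Quick sanity-check sketch (hypothetical boxes in (x1, y1, x2, y2) format, already normalized):
#
#   a = torch.tensor([[0.10, 0.10, 0.50, 0.50]])
#   b = torch.tensor([[0.12, 0.12, 0.52, 0.52]])
#   siou, iou = SIoU_loss(a, b, theta=4)   # siou: per-box SIoU loss, iou: plain IoU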
def ciou(pred, target, eps=1e-7):
# overlap
lt = torch.max(pred[:, :2], target[:, :2])
rb = torch.min(pred[:, 2:], target[:, 2:])
wh = (rb - lt).clamp(min=0)
overlap = wh[:, 0] * wh[:, 1]
# union
ap = (pred[:, 2] - pred[:, 0]) * (pred[:, 3] - pred[:, 1])
ag = (target[:, 2] - target[:, 0]) * (target[:, 3] - target[:, 1])
union = ap + ag - overlap + eps
# IoU
ious = overlap / union
# enclose area
enclose_x1y1 = torch.min(pred[:, :2], target[:, :2])
enclose_x2y2 = torch.max(pred[:, 2:], target[:, 2:])
enclose_wh = (enclose_x2y2 - enclose_x1y1).clamp(min=0)
cw = enclose_wh[:, 0]
ch = enclose_wh[:, 1]
c2 = cw**2 + ch**2 + eps
b1_x1, b1_y1 = pred[:, 0], pred[:, 1]
b1_x2, b1_y2 = pred[:, 2], pred[:, 3]
b2_x1, b2_y1 = target[:, 0], target[:, 1]
b2_x2, b2_y2 = target[:, 2], target[:, 3]
w1, h1 = b1_x2 - b1_x1, b1_y2 - b1_y1 + eps
w2, h2 = b2_x2 - b2_x1, b2_y2 - b2_y1 + eps
left = ((b2_x1 + b2_x2) - (b1_x1 + b1_x2))**2 / 4
right = ((b2_y1 + b2_y2) - (b1_y1 + b1_y2))**2 / 4
rho2 = left + right
factor = 4 / math.pi**2
v = factor * torch.pow(torch.atan(w2 / h2) - torch.atan(w1 / h1), 2)
# CIoU
cious = ious - (rho2 / c2 + v**2 / (1 - ious + v))
return cious, ious
class ARTrackActor(BaseActor):
""" Actor for training ARTrack models """
def __init__(self, net, objective, loss_weight, settings, bins, search_size, cfg=None):
super().__init__(net, objective)
self.loss_weight = loss_weight
self.settings = settings
self.bs = self.settings.batchsize # batch size
self.cfg = cfg
self.bins = bins
self.range = self.cfg.MODEL.RANGE
self.search_size = search_size
self.logsoftmax = torch.nn.LogSoftmax(dim=1)
self.focal = None
self.loss_weight['KL'] = 100
self.loss_weight['focal'] = 2
def __call__(self, data):
"""
args:
data - The input data, should contain the fields 'template', 'search', 'gt_bbox'.
template_images: (N_t, batch, 3, H, W)
search_images: (N_s, batch, 3, H, W)
returns:
loss - the training loss
status - dict containing detailed losses
"""
# forward pass
out_dict = self.forward_pass(data)
# compute losses
loss, status = self.compute_losses(out_dict, data)
return loss, status
def forward_pass(self, data):
# currently only support 1 template and 1 search region
assert len(data['template_images']) == 1
assert len(data['search_images']) == 1
template_list = []
for i in range(self.settings.num_template):
template_img_i = data['template_images'][i].view(-1,
*data['template_images'].shape[2:]) # (batch, 3, 128, 128)
template_list.append(template_img_i)
search_img = data['search_images'][0].view(-1, *data['search_images'].shape[2:]) # (batch, 3, 320, 320)
if len(template_list) == 1:
template_list = template_list[0]
gt_bbox = data['search_anno'][-1]
begin = self.bins * self.range
end = self.bins * self.range + 1
magic_num = (self.range - 1) * 0.5
gt_bbox[:, 2] = gt_bbox[:, 0] + gt_bbox[:, 2]
gt_bbox[:, 3] = gt_bbox[:, 1] + gt_bbox[:, 3]
gt_bbox = gt_bbox.clamp(min=(-1*magic_num), max=(1+magic_num))
data['real_bbox'] = gt_bbox
seq_ori = (gt_bbox + magic_num) * (self.bins - 1)
seq_ori = seq_ori.int().to(search_img)
B = seq_ori.shape[0]
seq_input = torch.cat([torch.ones((B, 1)).to(search_img) * begin, seq_ori], dim=1)
seq_output = torch.cat([seq_ori, torch.ones((B, 1)).to(search_img) * end], dim=1)
data['seq_input'] = seq_input
data['seq_output'] = seq_output
out_dict = self.net(template=template_list,
search=search_img,
seq_input=seq_input)
return out_dict
def compute_losses(self, pred_dict, gt_dict, return_status=True):
bins = self.bins
magic_num = (self.range - 1) * 0.5
seq_output = gt_dict['seq_output']
pred_feat = pred_dict["feat"]
if self.focal is None:
weight = torch.ones(bins * self.range + 2)
weight[bins * self.range + 1] = 0.1
weight[bins * self.range] = 0.1
weight = weight.to(pred_feat)  # move the class weights to the prediction's device/dtype
self.klloss = torch.nn.KLDivLoss(reduction='none').to(pred_feat)
self.focal = torch.nn.CrossEntropyLoss(weight=weight, size_average=True).to(pred_feat)
# compute varifocal loss
pred = pred_feat.permute(1, 0, 2).reshape(-1, bins * self.range + 2)  # vocabulary size matches the class-weight vector above
target = seq_output.reshape(-1).to(torch.int64)
varifocal_loss = self.focal(pred, target)
# compute giou and L1 loss
beta = 1
pred = pred_feat[0:4, :, 0:bins*self.range] * beta
target = seq_output[:, 0:4].to(pred_feat)
out = pred.softmax(-1).to(pred)
mul = torch.range((-1*magic_num+1/(self.bins*self.range)), (1+magic_num-1/(self.bins*self.range)), 2/(self.bins*self.range)).to(pred)
ans = out * mul
ans = ans.sum(dim=-1)
ans = ans.permute(1, 0).to(pred)
target = target / (bins - 1) - magic_num
extra_seq = ans
extra_seq = extra_seq.to(pred)
sious, iou = SIoU_loss(extra_seq, target, 4)
sious = sious.mean()
siou_loss = sious
l1_loss = self.objective['l1'](extra_seq, target)
loss = self.loss_weight['giou'] * siou_loss + self.loss_weight['l1'] * l1_loss + self.loss_weight['focal'] * varifocal_loss
if return_status:
# status for log
mean_iou = iou.detach().mean()
status = {"Loss/total": loss.item(),
"Loss/giou": siou_loss.item(),
"Loss/l1": l1_loss.item(),
"Loss/location": varifocal_loss.item(),
"IoU": mean_iou.item()}
return loss, status
else:
return loss
from . import BaseActor
from lib.utils.misc import NestedTensor
from lib.utils.box_ops import box_cxcywh_to_xyxy, box_xywh_to_xyxy
import torch
import math
import numpy as np
import numpy
import cv2
import torch.nn.functional as F
import torchvision.transforms.functional as tvisf
import lib.train.data.bounding_box_utils as bbutils
from lib.utils.merge import merge_template_search
from torch.distributions.categorical import Categorical
from ...utils.heapmap_utils import generate_heatmap
from ...utils.ce_utils import generate_mask_cond, adjust_keep_rate
def IoU(rect1, rect2):
""" caculate interection over union
Args:
rect1: (x1, y1, x2, y2)
rect2: (x1, y1, x2, y2)
Returns:
iou
"""
# overlap
x1, y1, x2, y2 = rect1[0], rect1[1], rect1[2], rect1[3]
tx1, ty1, tx2, ty2 = rect2[0], rect2[1], rect2[2], rect2[3]
xx1 = np.maximum(tx1, x1)
yy1 = np.maximum(ty1, y1)
xx2 = np.minimum(tx2, x2)
yy2 = np.minimum(ty2, y2)
ww = np.maximum(0, xx2 - xx1)
hh = np.maximum(0, yy2 - yy1)
area = (x2 - x1) * (y2 - y1)
target_a = (tx2 - tx1) * (ty2 - ty1)
inter = ww * hh
iou = inter / (area + target_a - inter)
return iou
def fp16_clamp(x, min=None, max=None):
if not x.is_cuda and x.dtype == torch.float16:
# clamp for cpu float16, tensor fp16 has no clamp implementation
return x.float().clamp(min, max).half()
return x.clamp(min, max)
def generate_sa_simdr(joints):
'''
:param joints: [num_joints, 3]
:param joints_vis: [num_joints, 3]
:return: target, target_weight(1: visible, 0: invisible)
'''
num_joints = 48
image_size = [256, 256]
simdr_split_ratio = 1.5625
sigma = 6
target_x1 = np.zeros((num_joints,
int(image_size[0] * simdr_split_ratio)),
dtype=np.float32)
target_y1 = np.zeros((num_joints,
int(image_size[1] * simdr_split_ratio)),
dtype=np.float32)
target_x2 = np.zeros((num_joints,
int(image_size[0] * simdr_split_ratio)),
dtype=np.float32)
target_y2 = np.zeros((num_joints,
int(image_size[1] * simdr_split_ratio)),
dtype=np.float32)
zero_4_begin = np.zeros((num_joints, 1), dtype=np.float32)
tmp_size = sigma * 3
for joint_id in range(num_joints):
mu_x1 = joints[joint_id][0]
mu_y1 = joints[joint_id][1]
mu_x2 = joints[joint_id][2]
mu_y2 = joints[joint_id][3]
x1 = np.arange(0, int(image_size[0] * simdr_split_ratio), 1, np.float32)
y1 = np.arange(0, int(image_size[1] * simdr_split_ratio), 1, np.float32)
x2 = np.arange(0, int(image_size[0] * simdr_split_ratio), 1, np.float32)
y2 = np.arange(0, int(image_size[1] * simdr_split_ratio), 1, np.float32)
target_x1[joint_id] = (np.exp(- ((x1 - mu_x1) ** 2) / (2 * sigma ** 2))) / (
sigma * np.sqrt(np.pi * 2))
target_y1[joint_id] = (np.exp(- ((y1 - mu_y1) ** 2) / (2 * sigma ** 2))) / (
sigma * np.sqrt(np.pi * 2))
target_x2[joint_id] = (np.exp(- ((x2 - mu_x2) ** 2) / (2 * sigma ** 2))) / (
sigma * np.sqrt(np.pi * 2))
target_y2[joint_id] = (np.exp(- ((y2 - mu_y2) ** 2) / (2 * sigma ** 2))) / (
sigma * np.sqrt(np.pi * 2))
return target_x1, target_y1, target_x2, target_y2
# angle cost
def SIoU_loss(test1, test2, theta=4):
eps = 1e-7
cx_pred = (test1[:, 0] + test1[:, 2]) / 2
cy_pred = (test1[:, 1] + test1[:, 3]) / 2
cx_gt = (test2[:, 0] + test2[:, 2]) / 2
cy_gt = (test2[:, 1] + test2[:, 3]) / 2
dist = ((cx_pred - cx_gt) ** 2 + (cy_pred - cy_gt) ** 2) ** 0.5
ch = torch.max(cy_gt, cy_pred) - torch.min(cy_gt, cy_pred)
x = ch / (dist + eps)
angle = 1 - 2 * torch.sin(torch.arcsin(x) - torch.pi / 4) ** 2
# distance cost
xmin = torch.min(test1[:, 0], test2[:, 0])
xmax = torch.max(test1[:, 2], test2[:, 2])
ymin = torch.min(test1[:, 1], test2[:, 1])
ymax = torch.max(test1[:, 3], test2[:, 3])
cw = xmax - xmin
ch = ymax - ymin
px = ((cx_gt - cx_pred) / (cw + eps)) ** 2
py = ((cy_gt - cy_pred) / (ch + eps)) ** 2
gama = 2 - angle
dis = (1 - torch.exp(-1 * gama * px)) + (1 - torch.exp(-1 * gama * py))
# shape cost
w_pred = test1[:, 2] - test1[:, 0]
h_pred = test1[:, 3] - test1[:, 1]
w_gt = test2[:, 2] - test2[:, 0]
h_gt = test2[:, 3] - test2[:, 1]
ww = torch.abs(w_pred - w_gt) / (torch.max(w_pred, w_gt) + eps)
wh = torch.abs(h_gt - h_pred) / (torch.max(h_gt, h_pred) + eps)
omega = (1 - torch.exp(-1 * wh)) ** theta + (1 - torch.exp(-1 * ww)) ** theta
# IoU loss
lt = torch.max(test1[..., :2], test2[..., :2]) # [B, rows, 2]
rb = torch.min(test1[..., 2:], test2[..., 2:]) # [B, rows, 2]
wh = fp16_clamp(rb - lt, min=0)
overlap = wh[..., 0] * wh[..., 1]
area1 = (test1[..., 2] - test1[..., 0]) * (
test1[..., 3] - test1[..., 1])
area2 = (test2[..., 2] - test2[..., 0]) * (
test2[..., 3] - test2[..., 1])
iou = overlap / (area1 + area2 - overlap)
SIoU = 1 - iou + (omega + dis) / 2
return SIoU, iou
def ciou(pred, target, eps=1e-7):
# overlap
lt = torch.max(pred[:, :2], target[:, :2])
rb = torch.min(pred[:, 2:], target[:, 2:])
wh = (rb - lt).clamp(min=0)
overlap = wh[:, 0] * wh[:, 1]
# union
ap = (pred[:, 2] - pred[:, 0]) * (pred[:, 3] - pred[:, 1])
ag = (target[:, 2] - target[:, 0]) * (target[:, 3] - target[:, 1])
union = ap + ag - overlap + eps
# IoU
ious = overlap / union
# enclose area
enclose_x1y1 = torch.min(pred[:, :2], target[:, :2])
enclose_x2y2 = torch.max(pred[:, 2:], target[:, 2:])
enclose_wh = (enclose_x2y2 - enclose_x1y1).clamp(min=0)
cw = enclose_wh[:, 0]
ch = enclose_wh[:, 1]
c2 = cw ** 2 + ch ** 2 + eps
b1_x1, b1_y1 = pred[:, 0], pred[:, 1]
b1_x2, b1_y2 = pred[:, 2], pred[:, 3]
b2_x1, b2_y1 = target[:, 0], target[:, 1]
b2_x2, b2_y2 = target[:, 2], target[:, 3]
w1, h1 = b1_x2 - b1_x1, b1_y2 - b1_y1 + eps
w2, h2 = b2_x2 - b2_x1, b2_y2 - b2_y1 + eps
left = ((b2_x1 + b2_x2) - (b1_x1 + b1_x2)) ** 2 / 4
right = ((b2_y1 + b2_y2) - (b1_y1 + b1_y2)) ** 2 / 4
rho2 = left + right
factor = 4 / math.pi ** 2
v = factor * torch.pow(torch.atan(w2 / h2) - torch.atan(w1 / h1), 2)
# CIoU
cious = ious - (rho2 / c2 + v ** 2 / (1 - ious + v))
return cious, ious
class ARTrackSeqActor(BaseActor):
""" Actor for training OSTrack models """
def __init__(self, net, objective, loss_weight, settings, bins, search_size, cfg=None):
super().__init__(net, objective)
self.loss_weight = loss_weight
self.settings = settings
self.bs = self.settings.batchsize # batch size
self.cfg = cfg
self.bins = bins
self.search_size = search_size
self.logsoftmax = torch.nn.LogSoftmax(dim=1)
self.focal = None
self.range = cfg.MODEL.RANGE
self.pre_num = cfg.MODEL.PRENUM
self.loss_weight['KL'] = 0
self.loss_weight['focal'] = 0
self.pre_bbox = None
self.x_feat_rem = None
self.update_rem = None
def __call__(self, data):
"""
args:
data - The input data, should contain the fields 'template', 'search', 'gt_bbox'.
template_images: (N_t, batch, 3, H, W)
search_images: (N_s, batch, 3, H, W)
returns:
loss - the training loss
status - dict containing detailed losses
"""
# forward pass
out_dict = self.forward_pass(data)
# compute losses
loss, status = self.compute_losses(out_dict, data)
return loss, status
def _bbox_clip(self, cx, cy, width, height, boundary):
cx = max(0, min(cx, boundary[1]))
cy = max(0, min(cy, boundary[0]))
width = max(10, min(width, boundary[1]))
height = max(10, min(height, boundary[0]))
return cx, cy, width, height
def get_subwindow(self, im, pos, model_sz, original_sz, avg_chans):
"""
args:
im: bgr based image
pos: center position
model_sz: exemplar size
s_z: original size
avg_chans: channel average
"""
if isinstance(pos, float):
pos = [pos, pos]
sz = original_sz
im_sz = im.shape
c = (original_sz + 1) / 2
# context_xmin = round(pos[0] - c) # py2 and py3 round
context_xmin = np.floor(pos[0] - c + 0.5)
context_xmax = context_xmin + sz - 1
# context_ymin = round(pos[1] - c)
context_ymin = np.floor(pos[1] - c + 0.5)
context_ymax = context_ymin + sz - 1
left_pad = int(max(0., -context_xmin))
top_pad = int(max(0., -context_ymin))
right_pad = int(max(0., context_xmax - im_sz[1] + 1))
bottom_pad = int(max(0., context_ymax - im_sz[0] + 1))
context_xmin = context_xmin + left_pad
context_xmax = context_xmax + left_pad
context_ymin = context_ymin + top_pad
context_ymax = context_ymax + top_pad
r, c, k = im.shape
if any([top_pad, bottom_pad, left_pad, right_pad]):
size = (r + top_pad + bottom_pad, c + left_pad + right_pad, k)
te_im = np.zeros(size, np.uint8)
te_im[top_pad:top_pad + r, left_pad:left_pad + c, :] = im
if top_pad:
te_im[0:top_pad, left_pad:left_pad + c, :] = avg_chans
if bottom_pad:
te_im[r + top_pad:, left_pad:left_pad + c, :] = avg_chans
if left_pad:
te_im[:, 0:left_pad, :] = avg_chans
if right_pad:
te_im[:, c + left_pad:, :] = avg_chans
im_patch = te_im[int(context_ymin):int(context_ymax + 1),
int(context_xmin):int(context_xmax + 1), :]
else:
im_patch = im[int(context_ymin):int(context_ymax + 1),
int(context_xmin):int(context_xmax + 1), :]
if not np.array_equal(model_sz, original_sz):
try:
im_patch = cv2.resize(im_patch, (model_sz, model_sz))
except:
return None
im_patch = im_patch.transpose(2, 0, 1)
im_patch = im_patch[np.newaxis, :, :, :]
im_patch = im_patch.astype(np.float32)
im_patch = torch.from_numpy(im_patch)
im_patch = im_patch.cuda()
return im_patch
def batch_init(self, images, template_bbox, initial_bbox) -> dict:
self.frame_num = 1
self.device = 'cuda'
# Convert bbox (x1, y1, w, h) -> (cx, cy, w, h)
template_bbox = bbutils.batch_xywh2center2(template_bbox) # ndarray:(2*num_seq,4)
initial_bbox = bbutils.batch_xywh2center2(initial_bbox) # ndarray:(2*num_seq,4)
self.center_pos = initial_bbox[:, :2] # ndarray:(2*num_seq,2)
self.size = initial_bbox[:, 2:] # ndarray:(2*num_seq,2)
self.pre_bbox = initial_bbox
for i in range(self.pre_num - 1):
self.pre_bbox = numpy.concatenate((self.pre_bbox, initial_bbox), axis=1)
# print(self.pre_bbox.shape)
template_factor = self.cfg.DATA.TEMPLATE.FACTOR
w_z = template_bbox[:, 2] * template_factor # ndarray:(2*num_seq)
h_z = template_bbox[:, 3] * template_factor # ndarray:(2*num_seq)
s_z = np.ceil(np.sqrt(w_z * h_z)) # ndarray:(2*num_seq)
self.channel_average = []
for img in images:
self.channel_average.append(np.mean(img, axis=(0, 1)))
self.channel_average = np.array(self.channel_average) # ndarray:(2*num_seq,3)
# get crop
z_crop_list = []
for i in range(len(images)):
here_crop = self.get_subwindow(images[i], template_bbox[i, :2],
self.cfg.DATA.TEMPLATE.SIZE, s_z[i], self.channel_average[i])
z_crop = here_crop.float().mul(1.0 / 255.0).clamp(0.0, 1.0)
self.mean = [0.485, 0.456, 0.406]
self.std = [0.229, 0.224, 0.225]
self.inplace = False
z_crop[0] = tvisf.normalize(z_crop[0], self.mean, self.std, self.inplace)
z_crop_list.append(z_crop.clone())
z_crop = torch.cat(z_crop_list, dim=0) # Tensor(2*num_seq,3,128,128)
self.update_rem = None
out = {'template_images': z_crop}
return out
def batch_track(self, img, gt_boxes, template, action_mode='max') -> dict:
search_factor = self.cfg.DATA.SEARCH.FACTOR
w_x = self.size[:, 0] * search_factor
h_x = self.size[:, 1] * search_factor
s_x = np.ceil(np.sqrt(w_x * h_x))
gt_boxes_corner = bbutils.batch_xywh2corner(gt_boxes) # ndarray:(2*num_seq,4)
x_crop_list = []
gt_in_crop_list = []
pre_seq_list = []
pre_seq_in_list = []
x_feat_list = []
magic_num = (self.range - 1) * 0.5
for i in range(len(img)):
channel_avg = np.mean(img[i], axis=(0, 1))
x_crop = self.get_subwindow(img[i], self.center_pos[i], self.cfg.DATA.SEARCH.SIZE,
round(s_x[i]), channel_avg)
if x_crop is None:
return None
for q in range(self.pre_num):
pre_seq_temp = bbutils.batch_center2corner(self.pre_bbox[:, 0 + 4 * q:4 + 4 * q])
if q == 0:
pre_seq = pre_seq_temp
else:
pre_seq = numpy.concatenate((pre_seq, pre_seq_temp), axis=1)
if gt_boxes_corner is not None and np.sum(np.abs(gt_boxes_corner[i] - np.zeros(4))) > 10:
pre_in = np.zeros(4 * self.pre_num)
for w in range(self.pre_num):
pre_in[0 + w * 4:2 + w * 4] = pre_seq[i, 0 + w * 4:2 + w * 4] - self.center_pos[i]
pre_in[2 + w * 4:4 + w * 4] = pre_seq[i, 2 + w * 4:4 + w * 4] - self.center_pos[i]
pre_in[0 + w * 4:4 + w * 4] = pre_in[0 + w * 4:4 + w * 4] * (
self.cfg.DATA.SEARCH.SIZE / s_x[i]) + self.cfg.DATA.SEARCH.SIZE / 2
pre_in[0 + w * 4:4 + w * 4] = pre_in[0 + w * 4:4 + w * 4] / self.cfg.DATA.SEARCH.SIZE
pre_seq_list.append(pre_in)
gt_in_crop = np.zeros(4)
gt_in_crop[:2] = gt_boxes_corner[i, :2] - self.center_pos[i]
gt_in_crop[2:] = gt_boxes_corner[i, 2:] - self.center_pos[i]
gt_in_crop = gt_in_crop * (self.cfg.DATA.SEARCH.SIZE / s_x[i]) + self.cfg.DATA.SEARCH.SIZE / 2
gt_in_crop[2:] = gt_in_crop[2:] - gt_in_crop[:2] # (x1,y1,x2,y2) to (x1,y1,w,h)
gt_in_crop_list.append(gt_in_crop)
else:
pre_in = np.zeros(4 * self.pre_num)
pre_seq_list.append(pre_in)
gt_in_crop_list.append(np.zeros(4))
pre_seq_input = torch.from_numpy(pre_in).clamp(-1 * magic_num, 1 + magic_num)
pre_seq_input = (pre_seq_input + 0.5) * (self.bins - 1)
pre_seq_in_list.append(pre_seq_input.clone())
x_crop = x_crop.float().mul(1.0 / 255.0).clamp(0.0, 1.0)
x_crop[0] = tvisf.normalize(x_crop[0], self.mean, self.std, self.inplace)
x_crop_list.append(x_crop.clone())
x_crop = torch.cat(x_crop_list, dim=0)
pre_seq_output = torch.cat(pre_seq_in_list, dim=0).reshape(-1, 4 * self.pre_num)
outputs = self.net(template, x_crop, seq_input=pre_seq_output, head_type=None, stage="batch_track",
search_feature=self.x_feat_rem, update=None)
selected_indices = outputs['seqs'].detach()
x_feat = outputs['x_feat'].detach().cpu()
self.x_feat_rem = x_feat.clone()
x_feat_list.append(x_feat.clone())
pred_bbox = selected_indices[:, 0:4].data.cpu().numpy()
bbox = (pred_bbox / (self.bins - 1) - magic_num) * s_x.reshape(-1, 1)
cx = bbox[:, 0] + self.center_pos[:, 0] - s_x / 2
cy = bbox[:, 1] + self.center_pos[:, 1] - s_x / 2
width = bbox[:, 2] - bbox[:, 0]
height = bbox[:, 3] - bbox[:, 1]
cx = cx + width / 2
cy = cy + height / 2
for i in range(len(img)):
cx[i], cy[i], width[i], height[i] = self._bbox_clip(cx[i], cy[i], width[i],
height[i], img[i].shape[:2])
self.center_pos = np.stack([cx, cy], 1)
self.size = np.stack([width, height], 1)
for e in range(self.pre_num):
if e != self.pre_num - 1:
self.pre_bbox[:, 0 + e * 4:4 + e * 4] = self.pre_bbox[:, 4 + e * 4:8 + e * 4]
else:
self.pre_bbox[:, 0 + e * 4:4 + e * 4] = numpy.stack([cx, cy, width, height], 1)
bbox = np.stack([cx - width / 2, cy - height / 2, width, height], 1)
out = {
'search_images': x_crop,
'pred_bboxes': bbox,
'selected_indices': selected_indices.cpu(),
'gt_in_crop': torch.tensor(np.stack(gt_in_crop_list, axis=0), dtype=torch.float),
'pre_seq': torch.tensor(np.stack(pre_seq_list, axis=0), dtype=torch.float),
'x_feat': torch.tensor([item.cpu().detach().numpy() for item in x_feat_list], dtype=torch.float),
}
return out
def explore(self, data):
results = {}
search_images_list = []
search_anno_list = []
iou_list = []
pre_seq_list = []
x_feat_list = []
num_frames = data['num_frames']
images = data['search_images']
gt_bbox = data['search_annos']
template = data['template_images']
template_bbox = data['template_annos']
template = template
template_bbox = template_bbox
template_bbox = np.array(template_bbox)
num_seq = len(num_frames)
for idx in range(np.max(num_frames)):
here_images = [img[idx] for img in images] # S, N
here_gt_bbox = np.array([gt[idx] for gt in gt_bbox])
here_images = here_images
here_gt_bbox = np.concatenate([here_gt_bbox], 0)
if idx == 0:
outputs_template = self.batch_init(template, template_bbox, here_gt_bbox)
results['template_images'] = outputs_template['template_images']
else:
outputs = self.batch_track(here_images, here_gt_bbox, outputs_template['template_images'],
action_mode='half')
if outputs is None:
return None
x_feat = outputs['x_feat']
pred_bbox = outputs['pred_bboxes']
search_images_list.append(outputs['search_images'])
search_anno_list.append(outputs['gt_in_crop'])
if len(outputs['pre_seq']) != 8:
print(outputs['pre_seq'])
print(len(outputs['pre_seq']))
print(idx)
print(data['num_frames'])
print(data['search_annos'])
return None
pre_seq_list.append(outputs['pre_seq'])
pred_bbox_corner = bbutils.batch_xywh2corner(pred_bbox)
gt_bbox_corner = bbutils.batch_xywh2corner(here_gt_bbox)
here_iou = []
for i in range(num_seq):
bbox_iou = IoU(pred_bbox_corner[i], gt_bbox_corner[i])
here_iou.append(bbox_iou)
iou_list.append(here_iou)
x_feat_list.append(x_feat.clone())
results['x_feat'] = torch.cat([torch.stack(x_feat_list)], dim=2)
results['search_images'] = torch.cat([torch.stack(search_images_list)],
dim=1)
results['search_anno'] = torch.cat([torch.stack(search_anno_list)],
dim=1)
results['pre_seq'] = torch.cat([torch.stack(pre_seq_list)], dim=1)
iou_tensor = torch.tensor(iou_list, dtype=torch.float)
results['baseline_iou'] = torch.cat([iou_tensor[:, :num_seq]], dim=1)
return results
def forward_pass(self, data):
# currently only support 1 template and 1 search region
assert len(data['template_images']) == 1
assert len(data['search_images']) == 1
template_list = []
for i in range(self.settings.num_template):
template_img_i = data['template_images'][i].view(-1,
*data['template_images'].shape[2:]) # (batch, 3, 128, 128)
template_list.append(template_img_i)
search_img = data['search_images'][0].view(-1, *data['search_images'].shape[2:]) # (batch, 3, 320, 320)
box_mask_z = None
ce_keep_rate = None
if self.cfg.MODEL.BACKBONE.CE_LOC:
box_mask_z = generate_mask_cond(self.cfg, template_list[0].shape[0], template_list[0].device,
data['template_anno'][0])
ce_start_epoch = self.cfg.TRAIN.CE_START_EPOCH
ce_warm_epoch = self.cfg.TRAIN.CE_WARM_EPOCH
ce_keep_rate = adjust_keep_rate(data['epoch'], warmup_epochs=ce_start_epoch,
total_epochs=ce_start_epoch + ce_warm_epoch,
ITERS_PER_EPOCH=1,
base_keep_rate=self.cfg.MODEL.BACKBONE.CE_KEEP_RATIO[0])
if len(template_list) == 1:
template_list = template_list[0]
gt_bbox = data['search_anno'][-1]
begin = self.bins
end = self.bins + 1
gt_bbox[:, 2] = gt_bbox[:, 0] + gt_bbox[:, 2]
gt_bbox[:, 3] = gt_bbox[:, 1] + gt_bbox[:, 3]
gt_bbox = gt_bbox.clamp(min=0.5, max=1.5)
data['real_bbox'] = gt_bbox
seq_ori = gt_bbox * (self.bins - 1)
seq_ori = seq_ori.int().to(search_img)
B = seq_ori.shape[0]
seq_input = torch.cat([torch.ones((B, 1)).to(search_img) * begin, seq_ori], dim=1)
seq_output = torch.cat([seq_ori, torch.ones((B, 1)).to(search_img) * end], dim=1)
data['seq_input'] = seq_input
data['seq_output'] = seq_output
out_dict = self.net(template=template_list,
search=search_img,
ce_template_mask=box_mask_z,
ce_keep_rate=ce_keep_rate,
return_last_attn=False,
seq_input=seq_input)
return out_dict
def compute_sequence_losses(self, data):
num_frames = data['search_images'].shape[0]
template_images = data['template_images'].repeat(num_frames, 1, 1, 1, 1)
template_images = template_images.view(-1, *template_images.size()[2:])
search_images = data['search_images'].reshape(-1, *data['search_images'].size()[2:])
search_anno = data['search_anno'].reshape(-1, *data['search_anno'].size()[2:])
magic_num = (self.range - 1) * 0.5
self.loss_weight['focal'] = 0
pre_seq = data['pre_seq'].reshape(-1, 4 * self.pre_num)
x_feat = data['x_feat'].reshape(-1, *data['x_feat'].size()[2:])
pre_seq = pre_seq.clamp(-1 * magic_num, 1 + magic_num)
pre_seq = (pre_seq + magic_num) * (self.bins - 1)
outputs = self.net(template_images, search_images, seq_input=pre_seq, stage="forward_pass",
search_feature=x_feat, update=None)
pred_feat = outputs["feat"]
# generate labels
if self.focal is None:
weight = torch.ones(self.bins * self.range + 2)
weight[self.bins * self.range + 1] = 0.1
weight[self.bins * self.range] = 0.1
weight = weight.to(pred_feat)  # move the class weights to the prediction's device/dtype
self.focal = torch.nn.CrossEntropyLoss(weight=weight, size_average=True).to(pred_feat)
search_anno[:, 2] = search_anno[:, 2] + search_anno[:, 0]
search_anno[:, 3] = search_anno[:, 3] + search_anno[:, 1]
target = (search_anno / self.cfg.DATA.SEARCH.SIZE + 0.5) * (self.bins - 1)
target = target.clamp(min=0.0, max=(self.bins * self.range - 0.0001))
target_iou = target
target = torch.cat([target], dim=1)
target = target.reshape(-1).to(torch.int64)
pred = pred_feat.permute(1, 0, 2).reshape(-1, self.bins * self.range + 2)
varifocal_loss = self.focal(pred, target)
pred = pred_feat[0:4, :, 0:self.bins * self.range]
target = target_iou[:, 0:4].to(pred_feat) / (self.bins - 1) - magic_num
out = pred.softmax(-1).to(pred)
mul = torch.range(-1 * magic_num + 1 / (self.bins * self.range), 1 + magic_num - 1 / (self.bins * self.range), 2 / (self.bins * self.range)).to(pred)
ans = out * mul
ans = ans.sum(dim=-1)
ans = ans.permute(1, 0).to(pred)
extra_seq = ans
extra_seq = extra_seq.to(pred)
cious, iou = SIoU_loss(extra_seq, target, 4)
cious = cious.mean()
giou_loss = cious
loss_bb = self.loss_weight['giou'] * giou_loss + self.loss_weight[
'focal'] * varifocal_loss
total_losses = loss_bb
mean_iou = iou.detach().mean()
status = {"Loss/total": total_losses.item(),
"Loss/giou": giou_loss.item(),
"Loss/location": varifocal_loss.item(),
"IoU": mean_iou.item()}
return total_losses, status
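# Sequence-training sketch (hypothetical outer loop): explore() rolls the frozen-template tracker
# over a clip to collect crops, previous-box sequences and cached search features, and
# compute_sequence_losses() then backpropagates the SIoU objective through those rollouts.
#
#   rollout = actor.explore(seq_data)          # seq_data holds per-sequence frames and annotations
#   if rollout is not None:
#       loss, stats = actor.compute_sequence_losses(rollout)
#       loss.backward()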
from lib.utils import TensorDict
class BaseActor:
""" Base class for actor. The actor class handles the passing of the data through the network
and calculating the loss."""
def __init__(self, net, objective):
"""
args:
net - The network to train
objective - The loss function
"""
self.net = net
self.objective = objective
def __call__(self, data: TensorDict):
""" Called in each training iteration. Should pass in input data through the network, calculate the loss, and
return the training stats for the input data
args:
data - A TensorDict containing all the necessary data blocks.
returns:
loss - loss for the input data
stats - a dict containing detailed losses
"""
raise NotImplementedError
def to(self, device):
""" Move the network to device
args:
device - device to use. 'cpu' or 'cuda'
"""
self.net.to(device)
def train(self, mode=True):
""" Set whether the network is in train mode.
args:
mode (True) - Bool specifying whether in training mode.
"""
self.net.train(mode)
def eval(self):
""" Set network to eval mode"""
self.train(False)
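# Training-iteration sketch (hypothetical trainer): the trainer moves the actor to the target
# device once, then calls it per batch and backpropagates the returned loss.
#
#   actor.to("cuda")
#   loss, stats = actor(data)   # data is a TensorDict with template/search images and annotations
#   loss.backward()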
from .environment import env_settings, create_default_local_file_ITP_train
from .stats import AverageMeter, StatValue
#from .tensorboard import TensorboardWriter
import importlib
import os
from collections import OrderedDict
def create_default_local_file():
path = os.path.join(os.path.dirname(__file__), 'local.py')
empty_str = '\'\''
default_settings = OrderedDict({
'workspace_dir': empty_str,
'tensorboard_dir': 'self.workspace_dir + \'/tensorboard/\'',
'pretrained_networks': 'self.workspace_dir + \'/pretrained_networks/\'',
'lasot_dir': empty_str,
'got10k_dir': empty_str,
'trackingnet_dir': empty_str,
'coco_dir': empty_str,
'lvis_dir': empty_str,
'sbd_dir': empty_str,
'imagenet_dir': empty_str,
'imagenetdet_dir': empty_str,
'ecssd_dir': empty_str,
'hkuis_dir': empty_str,
'msra10k_dir': empty_str,
'davis_dir': empty_str,
'youtubevos_dir': empty_str})
comment = {'workspace_dir': 'Base directory for saving network checkpoints.',
'tensorboard_dir': 'Directory for tensorboard files.'}
with open(path, 'w') as f:
f.write('class EnvironmentSettings:\n')
f.write(' def __init__(self):\n')
for attr, attr_val in default_settings.items():
comment_str = None
if attr in comment:
comment_str = comment[attr]
if comment_str is None:
f.write(' self.{} = {}\n'.format(attr, attr_val))
else:
f.write(' self.{} = {} # {}\n'.format(attr, attr_val, comment_str))
def create_default_local_file_ITP_train(workspace_dir, data_dir):
path = os.path.join(os.path.dirname(__file__), 'local.py')
empty_str = '\'\''
default_settings = OrderedDict({
'workspace_dir': workspace_dir,
'tensorboard_dir': os.path.join(workspace_dir, 'tensorboard'), # Directory for tensorboard files.
'pretrained_networks': os.path.join(workspace_dir, 'pretrained_networks'),
'lasot_dir': os.path.join(data_dir, 'lasot'),
'got10k_dir': os.path.join(data_dir, 'got10k/train'),
'got10k_val_dir': os.path.join(data_dir, 'got10k/val'),
'lasot_lmdb_dir': os.path.join(data_dir, 'lasot_lmdb'),
'got10k_lmdb_dir': os.path.join(data_dir, 'got10k_lmdb'),
'trackingnet_dir': os.path.join(data_dir, 'trackingnet'),
'trackingnet_lmdb_dir': os.path.join(data_dir, 'trackingnet_lmdb'),
'coco_dir': os.path.join(data_dir, 'coco'),
'coco_lmdb_dir': os.path.join(data_dir, 'coco_lmdb'),
'lvis_dir': empty_str,
'sbd_dir': empty_str,
'imagenet_dir': os.path.join(data_dir, 'vid'),
'imagenet_lmdb_dir': os.path.join(data_dir, 'vid_lmdb'),
'imagenetdet_dir': empty_str,
'ecssd_dir': empty_str,
'hkuis_dir': empty_str,
'msra10k_dir': empty_str,
'davis_dir': empty_str,
'youtubevos_dir': empty_str})
comment = {'workspace_dir': 'Base directory for saving network checkpoints.',
'tensorboard_dir': 'Directory for tensorboard files.'}
with open(path, 'w') as f:
f.write('class EnvironmentSettings:\n')
f.write(' def __init__(self):\n')
for attr, attr_val in default_settings.items():
comment_str = None
if attr in comment:
comment_str = comment[attr]
if comment_str is None:
if attr_val == empty_str:
f.write(' self.{} = {}\n'.format(attr, attr_val))
else:
f.write(' self.{} = \'{}\'\n'.format(attr, attr_val))
else:
f.write(' self.{} = \'{}\' # {}\n'.format(attr, attr_val, comment_str))
def env_settings():
env_module_name = 'lib.train.admin.local'
try:
env_module = importlib.import_module(env_module_name)
return env_module.EnvironmentSettings()
except:
env_file = os.path.join(os.path.dirname(__file__), 'local.py')
create_default_local_file()
raise RuntimeError('YOU HAVE NOT SETUP YOUR local.py!!!\n Go to "{}" and set all the paths you need. Then try to run again.'.format(env_file))
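# Setup sketch (paths are assumptions): generate lib/train/admin/local.py once with your own
# workspace and data roots; env_settings() then imports it on subsequent runs.
#
#   create_default_local_file_ITP_train("/path/to/workspace", "/path/to/datasets")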