Commit fb54db0f authored by limm

add projects code

parent 1ac2e802
# Directly inherit the entire recipe you want to use.
_base_ = 'mmpretrain::resnet/resnet50_8xb32_in1k.py'
# This line is to import your own modules.
custom_imports = dict(imports='models')
# Modify the backbone to use your own backbone.
_base_['model']['backbone'] = dict(type='ExampleNet', depth=18)
# Modify the in_channels of classifier head to fit your backbone.
_base_['model']['head']['in_channels'] = 512
from .example_net import ExampleNet
__all__ = ['ExampleNet']
from mmpretrain.models import ResNet
from mmpretrain.registry import MODELS
# Register your model to the `MODELS`.
@MODELS.register_module()
class ExampleNet(ResNet):
"""Implements an example backbone.
Implement the backbone network just like a normal pytorch network.
"""
def __init__(self, **kwargs) -> None:
print('#############################\n'
'# Hello MMPretrain! #\n'
'#############################')
super().__init__(**kwargs)
def forward(self, x):
"""The forward method of the network.
Args:
x (torch.Tensor): A tensor of image batch with shape
``(batch_size, num_channels, height, width)``.
Returns:
Tuple[torch.Tensor]: Please return a tuple of tensors and every
tensor is a feature map of specified scale. If you only want the
final feature map, simply return a tuple with one item.
"""
return super().forward(x)
# Solution of FGIA ACCV 2022 (1st Place)
This is the fine-tuning part of the 1st place solution for Webly-supervised Fine-grained Recognition; see the ACCV workshop competition at https://www.cvmart.net/race/10412/base.
## Result
<details>
<summary>Show the result</summary>
<br>
**Leaderboard A**
![LB-A](https://user-images.githubusercontent.com/18586273/205498131-5728e470-b4f6-43b7-82a5-5f8e3bd5168e.png)
**Leaderboard B**
![LB-B](https://user-images.githubusercontent.com/18586273/205498171-5a3a3055-370a-4a8b-9779-b686254ebc94.png)
</br>
</details>
## Reproduce
For the detailed self-supervised pre-training code, please refer to [Self-supervised Pre-training](#self-supervised-pre-training).
For the detailed fine-tuning and inference code, please refer to [this repo](https://github.com/Ezra-Yu/ACCV2022_FGIA_1st).
## Description
### Overview of Our Solution
![image](https://user-images.githubusercontent.com/18586273/205498371-31dbc1f4-5814-44bc-904a-f0d32515c7dd.png)
### Our Model
- ViT (MAE pre-trained), see the [MAE config](https://github.com/open-mmlab/mmpretrain/tree/main/projects/fgia_accv2022_1st/config/mae_vit-large-p16_8xb512-amp-coslr-1600e_in1k.py)
- Swin-v2 (SimMIM pre-trained), from [MMPretrain swin_transformer_v2](https://github.com/open-mmlab/mmpretrain/tree/main/configs/swin_transformer_v2)
**The architectures we use:**
- ViT + CE-loss + post-LongTail-Adjustment
- ViT + SubCenterArcFaceWithAdvMargin(CE)
- Swin-B + SubCenterArcFaceWithAdvMargin(SoftMax-EQL)
- Swin-L + SubCenterArcFaceWithAdvMargin(SoftMax-EQL)
## Self-supervised Pre-training
### Requirements
```shell
PyTorch 1.11.0
torchvision 0.12.0
CUDA 11.3
MMEngine >= 0.1.0
MMCV >= 2.0.0rc0
```
### Preparing the dataset
First, you should reorganize your dataset folder into the following structure:
```text
mmpretrain
|
|── data
| |── WebiNat5000
| | |── meta
| | | |── train.txt
| | |── train
| | |── testa
| | |── testb
```
The `train`, `testa`, and `testb` folders contain the same content as that provided
on the official website of the competition.
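In case `meta/train.txt` is not shipped with your copy of the data, the sketch below (an assumption on our part, not part of the official kit) generates it in the MMPretrain `ImageNet` annotation format of `relative/path label` per line; MAE pre-training ignores labels, so a dummy `0` is written for every image.
```python
# Hypothetical helper to build meta/train.txt for the layout above; not part of
# the competition release. Each line follows the MMPretrain `ImageNet` annotation
# format "relative/path.jpg label". MAE pre-training ignores the label, so a
# dummy 0 is used.
from pathlib import Path

data_root = Path('data/WebiNat5000')
train_dir = data_root / 'train'
meta_dir = data_root / 'meta'
meta_dir.mkdir(parents=True, exist_ok=True)

with open(meta_dir / 'train.txt', 'w') as f:
    for img in sorted(train_dir.rglob('*')):
        if img.suffix.lower() in {'.jpg', '.jpeg', '.png'}:
            # Paths are stored relative to the `train/` prefix set in the dataloader config.
            f.write(f'{img.relative_to(train_dir)} 0\n')
```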
### Start pre-training
First, you should install all these requirements, following this [page](https://mmpretrain.readthedocs.io/en/latest/get_started.html).
Then change your current directory to the root of MMPretrain:
```shell
cd $MMPretrain
```
Then you have the following two choices to start pre-training.
#### Slurm
If you have a cluster managed by Slurm, you can use the following command:
```shell
# We use 16 NVIDIA A100 80GB GPUs for pre-training
GPUS_PER_NODE=8 GPUS=16 SRUN_ARGS=${SRUN_ARGS} bash tools/slurm_train.sh ${PARTITION} ${JOB_NAME} projects/fgia_accv2022_1st/config/mae_vit-large-p16_8xb512-amp-coslr-1600e_in1k.py [optional arguments]
```
#### Pytorch
Or you can use the following two commands to start distributed training on two separate nodes:
```shell
# node 1
NNODES=2 NODE_RANK=0 PORT=${MASTER_PORT} MASTER_ADDR=${MASTER_ADDR} bash tools/dist_train.sh projects/fgia_accv2022_1st/config/mae_vit-large-p16_8xb512-amp-coslr-1600e_in1k.py 8
```
```shell
# node 2
NNODES=2 NODE_RANK=1 PORT=${MASTER_PORT} MASTER_ADDR=${MASTER_ADDR} bash tools/dist_train.sh projects/fgia_accv2022_1st/config/mae_vit-large-p16_8xb512-amp-coslr-1600e_in1k.py 8
```
All the logs and checkpoints will be saved under the `work_dirs` folder in the root directory.
## Fine-tuning with bag of tricks
- [MAE](https://github.com/open-mmlab/mmpretrain/tree/main/configs/mae) | [Config](https://github.com/Ezra-Yu/ACCV_workshop/tree/master/configs/vit)
- [Swinv2](https://github.com/open-mmlab/mmpretrain/tree/main/configs/swin_transformer_v2) | [Config](https://github.com/Ezra-Yu/ACCV_workshop/tree/master/configs/swin)
- [ArcFace](https://arxiv.org/abs/1801.07698) | [Code](https://github.com/Ezra-Yu/ACCV_workshop/blob/master/src/models/arcface_head.py)
- [SubCenterArcFaceWithAdvMargin](https://paperswithcode.com/paper/sub-center-arcface-boosting-face-recognition) | [Code](https://github.com/Ezra-Yu/ACCV_workshop/blob/master/src/models/arcface_head.py)
- [Post-LT-adjusment](https://paperswithcode.com/paper/long-tail-learning-via-logit-adjustment) | [Code](https://github.com/Ezra-Yu/ACCV_workshop/blob/master/src/models/linear_head_lt.py)
- [SoftMaxEQL](https://paperswithcode.com/paper/the-equalization-losses-gradient-driven) | [Code](https://github.com/Ezra-Yu/ACCV_workshop/blob/master/src/models/eql.py)
- Flip TTA | [Code](https://github.com/Ezra-Yu/ACCV_workshop/blob/master/src/models/tta_classifier.py)
- Dataset cleaning
- Self-ensemble: [Uniform model soup](https://arxiv.org/abs/2203.05482) | [Code](https://github.com/Ezra-Yu/ACCV_workshop/blob/master/tools/model_soup.py) (a minimal sketch follows below)
- [Pseudo-labeling](https://lilianweng.github.io/posts/2021-12-05-semi-supervised/) | [Code](https://github.com/Ezra-Yu/ACCV_workshop/blob/master/tools/creat_pseudo.py)
- Bagging ensemble | [Code](https://github.com/Ezra-Yu/ACCV_workshop/blob/master/tools/emsemble.py)
- Post-processing: [re-distribute-label](https://github.com/Ezra-Yu/ACCV_workshop/blob/master/tools/re-distribute-label.py)
![Overview](https://user-images.githubusercontent.com/18586273/205498258-e5720d83-7006-4aea-86b5-aab1a8998c6c.png)
![image](https://user-images.githubusercontent.com/18586273/205498027-def99b0d-a99a-470b-b292-8d5fc83111fc.png)
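As a rough illustration of the uniform model soup item above (not the competition's own `model_soup.py`), the sketch below averages several fine-tuned checkpoints element-wise; the checkpoint paths are placeholders and all checkpoints are assumed to come from the same initialization.
```python
# Minimal uniform model soup sketch; checkpoint paths are placeholders.
# Checkpoints are assumed to be in the usual MMEngine format with a 'state_dict' key.
import torch

ckpt_paths = ['finetune_a.pth', 'finetune_b.pth', 'finetune_c.pth']  # placeholders
state_dicts = [torch.load(p, map_location='cpu')['state_dict'] for p in ckpt_paths]

soup = {}
for key in state_dicts[0]:
    # Equal-weight average of every parameter and buffer.
    soup[key] = torch.stack([sd[key].float() for sd in state_dicts]).mean(dim=0)

torch.save({'state_dict': soup}, 'uniform_soup.pth')
```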
#### Used but without improvement
1. Using a retrieval paradigm to solve this classification task;
2. Using the EfficientNetV2 backbone.
#### Not used but worth trying
1. Try the [DiVE](https://arxiv.org/abs/2103.15042) algorithm to improve performance on the long-tailed dataset;
2. Use SimMIM to pre-train Swin-v2 on the competition dataset;
3. Refine the re-distribute-label tool.
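# Presumably the MAE pre-training config referenced above
# (config/mae_vit-large-p16_8xb512-amp-coslr-1600e_in1k.py).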
model = dict(
type='MAE',
backbone=dict(type='MAEViT', arch='l', patch_size=16, mask_ratio=0.75),
neck=dict(
type='MAEPretrainDecoder',
patch_size=16,
in_chans=3,
embed_dim=1024,
decoder_embed_dim=512,
decoder_depth=8,
decoder_num_heads=16,
mlp_ratio=4.0),
head=dict(
type='MAEPretrainHead',
norm_pix=True,
patch_size=16,
loss=dict(type='MAEReconstructionLoss')),
init_cfg=dict(
type='Pretrained',
checkpoint= # noqa: E251
'https://download.openmmlab.com/mmselfsup/1.x/mae/mae_vit-large-p16_8xb512-fp16-coslr-1600e_in1k/mae_vit-large-p16_8xb512-fp16-coslr-1600e_in1k_20220825-cc7e98c9.pth' # noqa
))
custom_imports = dict(
imports='mmpretrain.datasets', allow_failed_imports=False)
data_preprocessor = dict(
mean=[123.675, 116.28, 103.53],
std=[58.395, 57.12, 57.375],
bgr_to_rgb=True)
train_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='RandomResizedCrop',
scale=224,
crop_ratio_range=(0.2, 1.0),
backend='pillow',
interpolation='bicubic'),
dict(type='RandomFlip', prob=0.5),
dict(type='PackInputs')
]
train_dataloader = dict(
batch_size=256,
num_workers=16,
persistent_workers=True,
sampler=dict(type='DefaultSampler', shuffle=True),
collate_fn=dict(type='default_collate'),
pin_memory=True,
dataset=dict(
type='ImageNet',
data_root='data/WebiNat5000/',
ann_file='data/WebiNat5000/meta/train.txt',
data_prefix=dict(img_path='train/'),
pipeline=train_pipeline))
optim_wrapper = dict(
type='AmpOptimWrapper',
optimizer=dict(
type='AdamW', lr=0.0024, betas=(0.9, 0.95), weight_decay=0.05),
paramwise_cfg=dict(
custom_keys=dict(
ln=dict(decay_mult=0.0),
bias=dict(decay_mult=0.0),
pos_embed=dict(decay_mult=0.0),
mask_token=dict(decay_mult=0.0),
cls_token=dict(decay_mult=0.0))),
loss_scale='dynamic')
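# Learning-rate schedule: 40 epochs of linear warm-up, then cosine annealing until epoch 1600.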
param_scheduler = [
dict(
type='LinearLR',
start_factor=0.0001,
by_epoch=True,
begin=0,
end=40,
convert_to_iter_based=True),
dict(
type='CosineAnnealingLR',
T_max=1560,
by_epoch=True,
begin=40,
end=1600,
convert_to_iter_based=True)
]
train_cfg = dict(type='EpochBasedTrainLoop', max_epochs=1600)
default_scope = 'mmpretrain'
default_hooks = dict(
runtime_info=dict(type='RuntimeInfoHook'),
timer=dict(type='IterTimerHook'),
logger=dict(type='LoggerHook', interval=100),
param_scheduler=dict(type='ParamSchedulerHook'),
checkpoint=dict(type='CheckpointHook', interval=1, max_keep_ckpts=1),
sampler_seed=dict(type='DistSamplerSeedHook'))
env_cfg = dict(
cudnn_benchmark=False,
mp_cfg=dict(mp_start_method='fork', opencv_num_threads=0),
dist_cfg=dict(backend='nccl'))
log_processor = dict(
window_size=10,
custom_cfg=[dict(data_src='', method='mean', windows_size='global')])
vis_backends = [dict(type='LocalVisBackend')]
visualizer = dict(
type='UniversalVisualizer',
vis_backends=[dict(type='LocalVisBackend')],
name='visualizer')
log_level = 'INFO'
load_from = None
resume = False
randomness = dict(seed=0, diff_rank_seed=True)
launcher = 'slurm'
work_dir = './work_dirs/mae_vit-large-p16_8xb512-amp-coslr-1600e_in1k'
# MMPretrain Gradio Demo
Here is a Gradio demo for the inference tasks supported by MMPretrain.
Currently supported tasks:
- Image Classification
- Image-To-Image Retrieval
- Text-To-Image Retrieval (requires multi-modality support)
- Image Caption (requires multi-modality support)
- Visual Question Answering (requires multi-modality support)
- Visual Grounding (requires multi-modality support)
## Preview
<img src="https://user-images.githubusercontent.com/26739999/236147750-90ccb517-92c0-44e9-905e-1473677023b1.jpg" width="100%"/>
## Requirements
To run the demo, you need to install MMPretrain first. Please install it with the extra multi-modality
dependencies to enable the multi-modality tasks.
```shell
# At the MMPretrain root folder
pip install -e ".[multimodal]"
```
Then install the latest Gradio package.
```shell
pip install "gradio>=3.31.0"
```
## Start
Then, you can start the Gradio server on the local machine with:
```shell
# At the project folder
python launch.py
```
The demo will start a local server at `http://127.0.0.1:7860`, which you can open in your browser.
To share it with others, set `share=True` in `demo.launch()`.
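For example, the final line of `launch.py` would become:
```python
# Create a temporary public link in addition to the local server.
demo.launch(share=True)
```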
# Modified from
# https://github.com/Vision-CAIR/MiniGPT-4/blob/main/minigpt4/conversation/conversation.py
import dataclasses
from typing import List
import torch
@dataclasses.dataclass
class Conversation:
system: str
roles: List[str]
messages: List[List[str]]
sep: str = '###'
def get_prompt(self):
ret = self.system + self.sep
for role, message in self.messages:
if message:
ret += role + ': ' + message + self.sep
else:
ret += role + ':'
return ret
def append_message(self, role, message):
self.messages.append([role, message])
def copy(self):
return Conversation(
system=self.system,
roles=[role for role in self.roles],
messages=[[y for y in x] for x in self.messages],
sep=self.sep,
)
def dict(self):
return {
'system': self.system,
'roles': self.roles,
'messages': self.messages,
'sep': self.sep,
}
EN_CONV_VISION = Conversation(
system='Give the following image. '
'You will be able to see the image once I provide it to you. '
'Please answer my questions in detail.',
roles=['Ask', 'Answer'],
messages=[],
sep='###',
)
ZH_CONV_VISION = Conversation(
system='给定一张图片,请仔细观察这张图片,并回答我的问题。',
roles=['问', '答'],
messages=[],
sep='###',
)
class Chat:
def __init__(self, inferencer, device, is_half=False):
self.device = device
self.inferencer = inferencer
self.model = inferencer.model
self.is_half = is_half
if is_half:
self.model = self.model.half()
self.model = self.model.to(device)
self.max_length = 2000
def upload_img(self, image, conv, img_list):
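# Run the inferencer's preprocessing pipeline on the raw image, encode it with the
# vision encoder, and append an <Img><ImageHere></Img> placeholder turn to the conversation.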
img = next(self.inferencer.preprocess([image]))
img = self.model.data_preprocessor(img, False)['images']
img = img.to(self.device)
image_emb, _ = self.model.encode_img(img)
img_list.append(image_emb)
conv.append_message(conv.roles[0], '<Img><ImageHere></Img>')
def get_context_emb(self, conv, img_list):
prompt = conv.get_prompt()
prompt_segs = prompt.split('<ImageHere>')
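# Tokenize and embed each text segment, then interleave the segment embeddings with the
# image embeddings so that every <ImageHere> placeholder is replaced by its image.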
seg_tokens = [
self.model.llama_tokenizer(
seg, return_tensors='pt',
add_special_tokens=(i == 0)).to(self.device).input_ids
for i, seg in enumerate(prompt_segs)
]
seg_embs = [
self.model.llama_model.model.embed_tokens(seg_token)
for seg_token in seg_tokens
]
mixed_embs = [
emb for pair in zip(seg_embs[:-1], img_list) for emb in pair
] + [seg_embs[-1]]
mixed_embs = torch.cat(mixed_embs, dim=1)
return mixed_embs
def ask(self, text, conv):
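# If the previous turn is the freshly uploaded image placeholder (it ends with '</Img>'),
# merge the question into that turn instead of starting a new one.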
if len(conv.messages) > 0 and conv.messages[-1][0] == conv.roles[
0] and conv.messages[-1][1][-6:] == '</Img>':
conv.messages[-1][1] = ' '.join([conv.messages[-1][1], text])
else:
conv.append_message(conv.roles[0], text)
def answer(self, conv, img_list, generation_cfg):
conv.append_message(conv.roles[1], None)
embs = self.get_context_emb(conv, img_list)
cur_max_len = generation_cfg['max_new_tokens'] + embs.shape[1]
if cur_max_len > self.max_length:
print('Warning: The number of tokens in the current conversation '
'exceeds the max length. '
'The model will not see the contexts outside the range.')
begin_idx = max(0, cur_max_len - self.max_length)
embs = embs[:, begin_idx:]
if self.is_half:
embs = embs.half()
outputs = self.model.llama_model.generate(
inputs_embeds=embs,
eos_token_id=self.model.end_token_id,
**generation_cfg)
output_token = outputs[0]
if output_token[0] == 0:
output_token = output_token[1:]
elif output_token[0] == 1:
output_token = output_token[1:]
output_text = self.model.llama_tokenizer.decode(
output_token,
add_special_tokens=False,
skip_special_tokens=True)
output_text = output_text.split('###')[0]
conv.messages[-1][1] = output_text
return output_text
from functools import partial
from pathlib import Path
from typing import Callable
import gradio as gr
import torch
from mmengine.logging import MMLogger
import mmpretrain
from mmpretrain.apis import (ImageCaptionInferencer,
ImageClassificationInferencer,
ImageRetrievalInferencer,
TextToImageRetrievalInferencer,
VisualGroundingInferencer,
VisualQuestionAnsweringInferencer)
from mmpretrain.utils.dependency import WITH_MULTIMODAL
from mmpretrain.visualization import UniversalVisualizer
mmpretrain.utils.progress.disable_progress_bar = True
logger = MMLogger('mmpretrain', logger_name='mmpre')
if torch.cuda.is_available():
devices = [
torch.device(f'cuda:{i}') for i in range(torch.cuda.device_count())
]
logger.info(f'Available GPUs: {len(devices)}')
elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
devices = [torch.device('mps')]
logger.info('Available MPS.')
else:
devices = [torch.device('cpu')]
logger.info('Available CPU.')
def get_free_device():
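# Choose the device with the most free memory; fall back to a random pick when
# torch.cuda.mem_get_info is unavailable (e.g. CPU/MPS or older PyTorch).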
if hasattr(torch.cuda, 'mem_get_info'):
free = [torch.cuda.mem_get_info(gpu)[0] for gpu in devices]
select = max(zip(free, range(len(free))))[1]
else:
import random
select = random.randint(0, len(devices) - 1)
return devices[select]
class InferencerCache:
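# A small most-recently-used cache that keeps at most `max_size` inferencer
# instances alive to bound GPU memory usage.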
max_size = 2
_cache = []
@classmethod
def get_instance(cls, instance_name, callback: Callable):
if len(cls._cache) > 0:
for i, cache in enumerate(cls._cache):
if cache[0] == instance_name:
# Re-insert to the head of list.
cls._cache.insert(0, cls._cache.pop(i))
logger.info(f'Use cached {instance_name}.')
return cache[1]
if len(cls._cache) == cls.max_size:
cls._cache.pop(cls.max_size - 1)
torch.cuda.empty_cache()
device = get_free_device()
instance = callback(device=device)
logger.info(f'New instance {instance_name} on {device}.')
cls._cache.insert(0, (instance_name, instance))
return instance
class ImageCaptionTab:
def __init__(self) -> None:
self.model_list = ImageCaptionInferencer.list_models()
self.tab = self.create_ui()
def create_ui(self):
with gr.Row():
with gr.Column():
select_model = gr.Dropdown(
label='Choose a model',
elem_id='image_caption_models',
elem_classes='select_model',
choices=self.model_list,
value='blip-base_3rdparty_coco-caption',
)
with gr.Column():
image_input = gr.Image(
label='Input',
source='upload',
elem_classes='input_image',
interactive=True,
tool='editor',
)
caption_output = gr.Textbox(
label='Result',
lines=2,
elem_classes='caption_result',
interactive=False,
)
run_button = gr.Button(
'Run',
elem_classes='run_button',
)
run_button.click(
self.inference,
inputs=[select_model, image_input],
outputs=caption_output,
)
def inference(self, model, image):
image = image[:, :, ::-1]
inferencer_name = self.__class__.__name__ + model
inferencer = InferencerCache.get_instance(
inferencer_name, partial(ImageCaptionInferencer, model))
result = inferencer(image)[0]
return result['pred_caption']
class ImageClassificationTab:
def __init__(self) -> None:
self.short_list = [
'resnet50_8xb32_in1k',
'resnet50_8xb256-rsb-a1-600e_in1k',
'swin-base_16xb64_in1k',
'convnext-base_32xb128_in1k',
'vit-base-p16_32xb128-mae_in1k',
]
self.long_list = ImageClassificationInferencer.list_models()
self.tab = self.create_ui()
def create_ui(self):
with gr.Row():
with gr.Column():
select_model = gr.Dropdown(
label='Choose a model',
elem_id='image_classification_models',
elem_classes='select_model',
choices=self.short_list,
value='swin-base_16xb64_in1k',
)
expand = gr.Checkbox(label='Browse all models')
def browse_all_model(value):
models = self.long_list if value else self.short_list
return gr.update(choices=models)
expand.select(
fn=browse_all_model, inputs=expand, outputs=select_model)
with gr.Column():
in_image = gr.Image(
label='Input',
source='upload',
elem_classes='input_image',
interactive=True,
tool='editor',
)
out_cls = gr.Label(
label='Result',
num_top_classes=5,
elem_classes='cls_result',
)
run_button = gr.Button(
'Run',
elem_classes='run_button',
)
run_button.click(
self.inference,
inputs=[select_model, in_image],
outputs=out_cls,
)
def inference(self, model, image):
image = image[:, :, ::-1]
inferencer_name = self.__class__.__name__ + model
inferencer = InferencerCache.get_instance(
inferencer_name, partial(ImageClassificationInferencer, model))
result = inferencer(image)[0]['pred_scores'].tolist()
if inferencer.classes is not None:
classes = inferencer.classes
else:
classes = list(range(len(result)))
return dict(zip(classes, result))
class ImageRetrievalTab:
def __init__(self) -> None:
self.model_list = ImageRetrievalInferencer.list_models()
self.tab = self.create_ui()
def create_ui(self):
with gr.Row():
with gr.Column():
select_model = gr.Dropdown(
label='Choose a model',
elem_id='image_retri_models',
elem_classes='select_model',
choices=self.model_list,
value='resnet50-arcface_inshop',
)
topk = gr.Slider(minimum=1, maximum=6, value=3, step=1)
with gr.Column():
prototype = gr.File(
label='Retrieve from',
file_count='multiple',
file_types=['image'])
image_input = gr.Image(
label='Query',
source='upload',
elem_classes='input_image',
interactive=True,
tool='editor',
)
retri_output = gr.Gallery(
label='Result',
elem_classes='img_retri_result',
).style(
columns=[3], object_fit='contain', height='auto')
run_button = gr.Button(
'Run',
elem_classes='run_button',
)
run_button.click(
self.inference,
inputs=[select_model, prototype, image_input, topk],
outputs=retri_output,
)
def inference(self, model, prototype, image, topk):
image = image[:, :, ::-1]
import hashlib
proto_signature = ''.join(file.name for file in prototype).encode()
proto_signature = hashlib.sha256(proto_signature).hexdigest()
inferencer_name = self.__class__.__name__ + model + proto_signature
tmp_dir = Path(prototype[0].name).parent
cache_file = tmp_dir / f'{inferencer_name}.pth'
inferencer = InferencerCache.get_instance(
inferencer_name,
partial(
ImageRetrievalInferencer,
model,
prototype=[file.name for file in prototype],
prototype_cache=str(cache_file),
),
)
result = inferencer(image, topk=min(topk, len(prototype)))[0]
return [(str(item['sample']['img_path']),
str(item['match_score'].cpu().item())) for item in result]
class TextToImageRetrievalTab:
def __init__(self) -> None:
self.model_list = TextToImageRetrievalInferencer.list_models()
self.tab = self.create_ui()
def create_ui(self):
with gr.Row():
with gr.Column():
select_model = gr.Dropdown(
label='Choose a model',
elem_id='t2i_retri_models',
elem_classes='select_model',
choices=self.model_list,
value='blip-base_3rdparty_coco-retrieval',
)
topk = gr.Slider(minimum=1, maximum=6, value=3, step=1)
with gr.Column():
prototype = gr.File(
file_count='multiple', file_types=['image'])
text_input = gr.Textbox(
label='Query',
elem_classes='input_text',
interactive=True,
)
retri_output = gr.Gallery(
label='Result',
elem_classes='img_retri_result',
).style(
columns=[3], object_fit='contain', height='auto')
run_button = gr.Button(
'Run',
elem_classes='run_button',
)
run_button.click(
self.inference,
inputs=[select_model, prototype, text_input, topk],
outputs=retri_output,
)
def inference(self, model, prototype, text, topk):
import hashlib
proto_signature = ''.join(file.name for file in prototype).encode()
proto_signature = hashlib.sha256(proto_signature).hexdigest()
inferencer_name = self.__class__.__name__ + model + proto_signature
tmp_dir = Path(prototype[0].name).parent
cache_file = tmp_dir / f'{inferencer_name}.pth'
inferencer = InferencerCache.get_instance(
inferencer_name,
partial(
TextToImageRetrievalInferencer,
model,
prototype=[file.name for file in prototype],
prototype_cache=str(cache_file),
),
)
result = inferencer(text, topk=min(topk, len(prototype)))[0]
return [(str(item['sample']['img_path']),
str(item['match_score'].cpu().item())) for item in result]
class VisualGroundingTab:
def __init__(self) -> None:
self.model_list = VisualGroundingInferencer.list_models()
self.tab = self.create_ui()
self.visualizer = UniversalVisualizer(
fig_save_cfg=dict(figsize=(16, 9)))
def create_ui(self):
with gr.Row():
with gr.Column():
select_model = gr.Dropdown(
label='Choose a model',
elem_id='vg_models',
elem_classes='select_model',
choices=self.model_list,
value='ofa-base_3rdparty_refcoco',
)
with gr.Column():
image_input = gr.Image(
label='Image',
source='upload',
elem_classes='input_image',
interactive=True,
tool='editor',
)
text_input = gr.Textbox(
label='The object to search',
elem_classes='input_text',
interactive=True,
)
vg_output = gr.Image(
label='Result',
source='upload',
interactive=False,
elem_classes='vg_result',
)
run_button = gr.Button(
'Run',
elem_classes='run_button',
)
run_button.click(
self.inference,
inputs=[select_model, image_input, text_input],
outputs=vg_output,
)
def inference(self, model, image, text):
inferencer_name = self.__class__.__name__ + model
inferencer = InferencerCache.get_instance(
inferencer_name,
partial(VisualGroundingInferencer, model),
)
result = inferencer(
image[:, :, ::-1], text, return_datasamples=True)[0]
vis = self.visualizer.visualize_visual_grounding(
image, result, resize=512)
return vis
class VisualQuestionAnsweringTab:
def __init__(self) -> None:
self.model_list = VisualQuestionAnsweringInferencer.list_models()
# The fine-tuned OFA VQA model requires an extra object description.
self.model_list.remove('ofa-base_3rdparty-finetuned_vqa')
self.tab = self.create_ui()
def create_ui(self):
with gr.Row():
with gr.Column():
select_model = gr.Dropdown(
label='Choose a model',
elem_id='vqa_models',
elem_classes='select_model',
choices=self.model_list,
value='ofa-base_3rdparty-zeroshot_coco-vqa',
)
with gr.Column():
image_input = gr.Image(
label='Input',
source='upload',
elem_classes='input_image',
interactive=True,
tool='editor',
)
question_input = gr.Textbox(
label='Question',
elem_classes='question_input',
)
answer_output = gr.Textbox(
label='Answer',
elem_classes='answer_result',
)
run_button = gr.Button(
'Run',
elem_classes='run_button',
)
run_button.click(
self.inference,
inputs=[select_model, image_input, question_input],
outputs=answer_output,
)
def inference(self, model, image, question):
image = image[:, :, ::-1]
inferencer_name = self.__class__.__name__ + model
inferencer = InferencerCache.get_instance(
inferencer_name, partial(VisualQuestionAnsweringInferencer, model))
result = inferencer(image, question)[0]
return result['pred_answer']
if __name__ == '__main__':
title = 'MMPretrain Inference Demo'
with gr.Blocks(analytics_enabled=False, title=title) as demo:
gr.Markdown(f'# {title}')
with gr.Tabs():
with gr.TabItem('Image Classification'):
ImageClassificationTab()
with gr.TabItem('Image-To-Image Retrieval'):
ImageRetrievalTab()
if WITH_MULTIMODAL:
with gr.TabItem('Image Caption'):
ImageCaptionTab()
with gr.TabItem('Text-To-Image Retrieval'):
TextToImageRetrievalTab()
with gr.TabItem('Visual Grounding'):
VisualGroundingTab()
with gr.TabItem('Visual Question Answering'):
VisualQuestionAnsweringTab()
else:
with gr.TabItem('Multi-modal tasks'):
gr.Markdown(
'To run inference with multi-modal models, please '
'install the extra multi-modal dependencies. Please refer '
'to https://mmpretrain.readthedocs.io/en/latest/'
'get_started.html#installation')
demo.launch()
import argparse
import gradio as gr
import numpy as np
import torch
from conversation import EN_CONV_VISION, ZH_CONV_VISION, Chat
from mmpretrain import ImageCaptionInferencer
parser = argparse.ArgumentParser(description='MiniGPT4 demo')
parser.add_argument(
'cfg', type=str, help='config file for minigpt4 (absolute path)')
parser.add_argument(
'ckpt', type=str, help='pretrained file for minigpt4 (absolute path)')
args = parser.parse_args()
if torch.cuda.is_available():
devices = [
torch.device(f'cuda:{i}') for i in range(torch.cuda.device_count())
]
elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
devices = [torch.device('mps')]
else:
devices = [torch.device('cpu')]
def get_free_device():
if hasattr(torch.cuda, 'mem_get_info'):
free = [torch.cuda.mem_get_info(gpu)[0] for gpu in devices]
select = max(zip(free, range(len(free))))[1]
else:
import random
select = random.randint(0, len(devices) - 1)
return devices[select]
device = get_free_device()
inferencer = ImageCaptionInferencer(model=args.cfg, pretrained=args.ckpt)
model = inferencer.model
chat = Chat(inferencer, device=device, is_half=(device.type != 'cpu'))
def reset(chat_state, img_list):
if chat_state is not None:
chat_state.messages = []
if img_list is not None:
img_list = []
return (None, gr.update(value=None, interactive=True),
gr.update(
value=None,
placeholder='Please upload your image first',
interactive=False),
gr.update(value='Upload & Start Chat',
interactive=True), chat_state, img_list,
gr.update(value='Restart', interactive=False),
gr.update(value='English', interactive=True))
def upload_img(gr_img, language, chat_state):
if gr_img is None:
return (None,
gr.update(
placeholder='Please upload your image first',
interactive=False),
gr.update(value='Upload & Start Chat',
interactive=True), chat_state, None,
gr.update(value='Restart', interactive=False),
gr.update(value='English', interactive=True))
if (language == 'English'):
chat_state = EN_CONV_VISION.copy()
else:
chat_state = ZH_CONV_VISION.copy()
img_list = []
gr_img_array = np.asarray(gr_img)
chat.upload_img(gr_img_array, chat_state, img_list)
return (gr.update(interactive=False),
gr.update(placeholder='Type and press Enter', interactive=True),
gr.update(value='Start Chatting',
interactive=False), chat_state, img_list,
gr.update(value='Restart',
interactive=True), gr.update(interactive=False))
def ask(user_message, chatbot, chat_state):
if (len(user_message) == 0):
return gr.update(
value=None,
placeholder='Input should not be empty!',
interactive=True), chatbot, chat_state
chat.ask(user_message, chat_state)
chatbot = chatbot + [[user_message, None]]
return '', chatbot, chat_state
def answer(chatbot, chat_state, img_list):
llm_message = chat.answer(
conv=chat_state,
img_list=img_list,
generation_cfg=model.generation_cfg)
chatbot[-1][1] = llm_message
return chatbot, chat_state, img_list
if __name__ == '__main__':
title = 'MMPretrain MiniGPT-4 Inference Demo'
with gr.Blocks(analytics_enabled=False, title=title) as demo:
gr.Markdown(f'# {title}')
with gr.Row():
with gr.Column():
image = gr.Image(type='pil')
language = gr.Dropdown(['English', 'Chinese'],
label='Language',
info='Select chatbot\'s language',
value='English',
interactive=True)
upload_button = gr.Button(
value='Upload & Start Chat', interactive=True)
clear = gr.Button(value='Restart', interactive=False)
with gr.Column():
chat_state = gr.State()
img_list = gr.State()
chatbot = gr.Chatbot(
label='MiniGPT-4', min_width=320, height=600)
text_input = gr.Textbox(
label='User',
placeholder='Please upload your image first',
interactive=False)
upload_button.click(upload_img, [image, language, chat_state], [
image, text_input, upload_button, chat_state, img_list, clear,
language
])
text_input.submit(ask, [text_input, chatbot, chat_state],
[text_input, chatbot, chat_state]).then(
answer, [chatbot, chat_state, img_list],
[chatbot, chat_state, img_list])
clear.click(reset, [chat_state, img_list], [
chatbot, image, text_input, upload_button, chat_state, img_list,
clear, language
])
demo.launch(share=True)
# InternImage Classification
## Description
This is the implementation of [InternImage](https://arxiv.org/abs/2211.05778) for image classification.
## Usage
### Setup Environment
Please refer to the [Get Started](https://mmpretrain.readthedocs.io/en/latest/get_started.html) documentation of MMPretrain to finish the installation.
Then install DCNv3 by running the commands below, following the [InternImage official installation instructions](https://github.com/OpenGVLab/InternImage/blob/master/classification/README.md).
```shell
cd ops_dcnv3
sh ./make.sh
```
### Training and Test Commands
First, you need to add the current folder to `PYTHONPATH` so that Python can find your model files. In the `projects/internimage_classification/` root directory, run the command below to add it.
```shell
export PYTHONPATH=`pwd`:$PYTHONPATH
```
#### Training
##### On Local Single GPU
```bash
# train with mim
mim train mmpretrain ${CONFIG} --work-dir ${WORK_DIR}
# a specific command example
mim train mmpretrain configs/internimage-tiny_8xb128_in1k-224.py \
--work-dir work_dirs/internimage-tiny_8xb128_in1k-224/
```
##### On Multiple GPUs
```bash
# train with mim
mim train mmpretrain ${CONFIG} \
--work-dir ${WORK_DIR} \
--launcher pytorch --gpus 8
```
##### On Multiple GPUs with Slurm
```bash
# train with mim
mim train mmpretrain ${CONFIG} \
--work-dir ${WORK_DIR} \
--launcher slurm --gpus 16 --gpus-per-node 8 \
--partition ${PARTITION}
```
#### Test
Please download the pre-trained weights provided by [OpenGVLab](https://github.com/OpenGVLab/) from [here](https://huggingface.co/OpenGVLab/InternImage/tree/main).
##### On Local Single GPU
```bash
# test with mim
mim test mmpretrain ${CONFIG} -C ${CHECKPOINT}
# a specific command example
mim test mmpretrain configs/internimage-tiny_8xb128_in1k-224.py -C /PATH/TO/internimage_t_1k_224.pth
```
##### On Multiple GPUs
```bash
# test with mim
# a specific command example with 8 GPUs
mim test mmpretrain configs/internimage-tiny_8xb128_in1k-224.py \
-C /PATH/TO/internimage_t_1k_224.pth \
--launcher pytorch --gpus 8
```
##### On Multiple GPUs with Slurm
```bash
# test with mim
mim test mmpretrain ${CONFIG} \
-C ${CHECKPOINT} \
--work-dir ${WORK_DIR} \
--launcher slurm --gpus 8 --gpus-per-node 8 \
--partition ${PARTITION} \
$PY_ARGS
```
Note: `PY_ARGS` stands for other optional arguments.
## Results on ImageNet1K
The accuracy of different models on ImageNet1K:
| name | resolution | acc@1 | acc@5 | config | weight |
| :------------: | :--------: | :-----: | :-----: | :-------------------------------------------------------: | :-----------------------------------------------------------------------------------------------: |
| InternImage-T | 224 | 83.4700 | 96.5340 | [config](./configs/internimage-tiny_8xb128_in1k-224.py) | [model](https://huggingface.co/OpenGVLab/InternImage/resolve/main/internimage_t_1k_224.pth) |
| InternImage-S | 224 | 84.1640 | 96.9320 | [config](./configs/internimage-small_8xb128_in1k-224.py) | [model](https://huggingface.co/OpenGVLab/InternImage/resolve/main/internimage_s_1k_224.pth) |
| InternImage-B | 224 | 84.8660 | 97.1820 | [config](./configs/internimage-base_8xb128_in1k-224.py) | [model](https://huggingface.co/OpenGVLab/InternImage/resolve/main/internimage_b_1k_224.pth) |
| InternImage-L | 384 | 87.7060 | 98.3820 | [config](./configs/internimage-large_8xb128_in1k-384.py) | [model](https://huggingface.co/OpenGVLab/InternImage/resolve/main/internimage_l_22kto1k_384.pth) |
| InternImage-XL | 384 | 88.0460 | 98.5620 | [config](./configs/internimage-xlagre_8xb128_in1k-384.py) | [model](https://huggingface.co/OpenGVLab/InternImage/resolve/main/internimage_xl_22kto1k_384.pth) |
| InternImage-H | 640 | 89.5500 | 98.8500 | [config](./configs/internimage-huge_8xb128_in1k-640.py) | [model](https://huggingface.co/OpenGVLab/InternImage/resolve/main/internimage_h_22kto1k_640.pth) |
| InternImage-G | 512 | 90.0580 | 98.9700 | [config](./configs/internimage-giant_8xb128_in1k-512.py) | [model](https://huggingface.co/OpenGVLab/InternImage/resolve/main/internimage_g_22kto1k_512.pth) |
## Citation
```bibtex
@article{wang2022internimage,
title={InternImage: Exploring Large-Scale Vision Foundation Models with Deformable Convolutions},
author={Wang, Wenhai and Dai, Jifeng and Chen, Zhe and Huang, Zhenhang and Li, Zhiqi and Zhu, Xizhou and Hu, Xiaowei and Lu, Tong and Lu, Lewei and Li, Hongsheng and others},
journal={arXiv preprint arXiv:2211.05778},
year={2022}
}
```
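# Shared base config for the InternImage variants that follow
# (presumably configs/_base_.py, since the variant configs below inherit './_base_.py').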
_base_ = 'mmpretrain::_base_/default_runtime.py'
# dataset settings
dataset_type = 'ImageNet'
data_preprocessor = dict(
num_classes=1000,
# RGB format normalization parameters
mean=[123.675, 116.28, 103.53],
std=[58.395, 57.12, 57.375],
# convert image from BGR to RGB
to_rgb=True,
)
train_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='RandomResizedCrop',
scale=224,
backend='pillow',
interpolation='bicubic'),
dict(type='RandomFlip', prob=0.5, direction='horizontal'),
dict(type='PackInputs'),
]
test_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='ResizeEdge',
scale=224,
edge='short',
backend='pillow',
interpolation='bicubic'),
dict(type='CenterCrop', crop_size=224),
dict(type='PackInputs'),
]
train_dataloader = dict(
batch_size=128,
num_workers=8,
dataset=dict(
type=dataset_type,
data_root='../../data/imagenet',
data_prefix='train',
pipeline=train_pipeline),
sampler=dict(type='DefaultSampler', shuffle=True),
)
val_dataloader = dict(
batch_size=128,
num_workers=8,
dataset=dict(
type=dataset_type,
data_root='../../data/imagenet',
data_prefix='val',
pipeline=test_pipeline),
sampler=dict(type='DefaultSampler', shuffle=False),
)
val_evaluator = dict(type='Accuracy', topk=(1, 5))
test_dataloader = val_dataloader
test_evaluator = val_evaluator
# model setting
custom_imports = dict(imports='models')
model = dict(
type='ImageClassifier',
backbone=dict(
type='InternImage',
stem_channels=64,
drop_path_rate=0.1,
stage_blocks=[4, 4, 18, 4],
groups=[4, 8, 16, 32]),
neck=dict(type='GlobalAveragePooling'),
head=dict(
type='LinearClsHead',
num_classes=1000,
in_channels=768,
loss=dict(type='CrossEntropyLoss', loss_weight=1.0),
topk=(1, 5)))
# optimizer
optim_wrapper = dict(
optimizer=dict(
type='AdamW', lr=1.25e-04, eps=1e-8, betas=(0.9, 0.999),
weight_decay=0.05))
# learning policy
param_scheduler = [
# warm up learning rate scheduler
dict(
type='LinearLR',
by_epoch=True,
begin=0,
end=20,
convert_to_iter_based=True),
# main learning rate scheduler
dict(
type='CosineAnnealingLR',
T_max=280,
by_epoch=True,
begin=20,
end=300,
eta_min=1.25e-06)
]
# train, val, test setting
train_cfg = dict(by_epoch=True, max_epochs=300, val_interval=1)
val_cfg = dict()
test_cfg = dict()
# NOTE: `auto_scale_lr` is for automatically scaling LR,
# based on the actual training batch size.
auto_scale_lr = dict(base_batch_size=128 * 8)
_base_ = './_base_.py'
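# Presumably the InternImage-B config (stem 112, head 1344), judging by the README's config list.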
model = dict(
backbone=dict(
stem_channels=112,
drop_path_rate=0.5,
stage_blocks=[4, 4, 21, 4],
groups=[7, 14, 28, 56],
layer_scale=1e-5,
post_norm=True),
head=dict(in_channels=1344))
optim_wrapper = dict(optimizer=dict(lr=0.0005))
_base_ = './_base_.py'
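# Presumably the InternImage-G config (stem 512, CLIP projector head, 512-pixel input).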
model = dict(
backbone=dict(
stem_channels=512,
drop_path_rate=0.4,
stage_blocks=[2, 2, 48, 4],
groups=[16, 32, 64, 128],
dw_kernel_size=5,
level2_post_norm=True,
level2_post_norm_block_ids=[5, 11, 17, 23, 29, 35, 41, 47],
center_feature_scale=True,
use_clip_projector=True,
),
neck=None,
head=dict(in_channels=768))
train_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='RandomResizedCrop',
scale=512,
backend='pillow',
interpolation='bicubic'),
dict(type='RandomFlip', prob=0.5, direction='horizontal'),
dict(type='PackInputs'),
]
test_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='ResizeEdge',
scale=512,
edge='short',
backend='pillow',
interpolation='bicubic'),
dict(type='CenterCrop', crop_size=512),
dict(type='PackInputs'),
]
train_dataloader = dict(dataset=dict(pipeline=train_pipeline))
val_dataloader = dict(dataset=dict(pipeline=test_pipeline))
test_dataloader = val_dataloader
optim_wrapper = dict(optimizer=dict(lr=5e-6))
param_scheduler = [
dict(
type='LinearLR',
by_epoch=True,
begin=0,
end=2,
convert_to_iter_based=True),
dict(type='CosineAnnealingLR', T_max=18, by_epoch=True, begin=2, end=20)
]
train_cfg = dict(by_epoch=True, max_epochs=20, val_interval=1)
_base_ = './_base_.py'
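# Presumably the InternImage-H config (stem 320, CLIP projector head, 640-pixel input).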
model = dict(
backbone=dict(
stem_channels=320,
drop_path_rate=0.1,
stage_blocks=[6, 6, 32, 6],
groups=[10, 20, 40, 80],
dw_kernel_size=5,
res_post_norm=True,
level2_post_norm=True,
level2_post_norm_block_ids=[5, 11, 17, 23, 29],
center_feature_scale=True,
use_clip_projector=True,
),
neck=None,
head=dict(in_channels=768))
train_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='RandomResizedCrop',
scale=640,
backend='pillow',
interpolation='bicubic'),
dict(type='RandomFlip', prob=0.5, direction='horizontal'),
dict(type='PackInputs')
]
test_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='ResizeEdge',
scale=640,
edge='short',
backend='pillow',
interpolation='bicubic'),
dict(type='CenterCrop', crop_size=640),
dict(type='PackInputs')
]
train_dataloader = dict(dataset=dict(pipeline=train_pipeline))
val_dataloader = dict(dataset=dict(pipeline=test_pipeline))
test_dataloader = val_dataloader
optim_wrapper = dict(optimizer=dict(lr=5e-6))
param_scheduler = [
dict(
type='LinearLR',
by_epoch=True,
begin=0,
end=2,
convert_to_iter_based=True),
dict(type='CosineAnnealingLR', T_max=18, by_epoch=True, begin=2, end=20)
]
train_cfg = dict(by_epoch=True, max_epochs=20, val_interval=1)
_base_ = './_base_.py'
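# Presumably the InternImage-L config (stem 160, head 1920, 384-pixel input).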
model = dict(
backbone=dict(
stem_channels=160,
drop_path_rate=0.1,
stage_blocks=[5, 5, 22, 5],
groups=[10, 20, 40, 80],
layer_scale=1e-5,
offset_scale=2.0,
post_norm=True),
head=dict(in_channels=1920))
train_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='RandomResizedCrop',
scale=384,
backend='pillow',
interpolation='bicubic'),
dict(type='RandomFlip', prob=0.5, direction='horizontal'),
dict(type='PackInputs')
]
test_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='ResizeEdge',
scale=384,
edge='short',
backend='pillow',
interpolation='bicubic'),
dict(type='CenterCrop', crop_size=384),
dict(type='PackInputs')
]
train_dataloader = dict(dataset=dict(pipeline=train_pipeline))
val_dataloader = dict(dataset=dict(pipeline=test_pipeline))
test_dataloader = val_dataloader
optim_wrapper = dict(optimizer=dict(lr=5e-6))
param_scheduler = [
dict(
type='LinearLR',
by_epoch=True,
begin=0,
end=2,
convert_to_iter_based=True),
dict(type='CosineAnnealingLR', T_max=18, by_epoch=True, begin=2, end=20)
]
train_cfg = dict(by_epoch=True, max_epochs=20, val_interval=1)
_base_ = './_base_.py'
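# Presumably the InternImage-S config (stem 80, head 960).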
model = dict(
backbone=dict(
stem_channels=80,
drop_path_rate=0.4,
stage_blocks=[4, 4, 21, 4],
groups=[5, 10, 20, 40],
layer_scale=1e-5,
post_norm=True),
head=dict(in_channels=960))
_base_ = './_base_.py'
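# Presumably the InternImage-T config (stem 64, same widths as the base config defaults).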
model = dict(
backbone=dict(
stem_channels=64,
drop_path_rate=0.1,
stage_blocks=[4, 4, 18, 4],
groups=[4, 8, 16, 32]))
_base_ = './_base_.py'
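# Presumably the InternImage-XL config (stem 192, head 2304, 384-pixel input).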
model = dict(
backbone=dict(
stem_channels=192,
drop_path_rate=0.2,
stage_blocks=[5, 5, 24, 5],
groups=[12, 24, 48, 96],
layer_scale=1e-5,
offset_scale=2.0,
post_norm=True),
head=dict(in_channels=2304))
train_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='RandomResizedCrop',
scale=384,
backend='pillow',
interpolation='bicubic'),
dict(type='RandomFlip', prob=0.5, direction='horizontal'),
dict(type='PackInputs')
]
test_pipeline = [
dict(type='LoadImageFromFile'),
dict(
type='ResizeEdge',
scale=384,
edge='short',
backend='pillow',
interpolation='bicubic'),
dict(type='CenterCrop', crop_size=384),
dict(type='PackInputs')
]
train_dataloader = dict(dataset=dict(pipeline=train_pipeline))
val_dataloader = dict(dataset=dict(pipeline=test_pipeline))
test_dataloader = val_dataloader
optim_wrapper = dict(optimizer=dict(lr=5e-6))
param_scheduler = [
dict(
type='LinearLR',
by_epoch=True,
begin=0,
end=2,
convert_to_iter_based=True),
dict(type='CosineAnnealingLR', T_max=18, by_epoch=True, begin=2, end=20)
]
train_cfg = dict(by_epoch=True, max_epochs=20, val_interval=1)
# Copyright (c) OpenMMLab. All rights reserved.
from .intern_image import InternImage
__all__ = ['InternImage']
# Copyright (c) 2022 OpenGVLab
# Copyright (c) OpenMMLab. All rights reserved.
# modified from
# https://github.com/OpenGVLab/InternImage/blob/master/classification/models/intern_image.py
import torch
import torch.nn as nn
import torch.utils.checkpoint as cp
from mmcv.cnn.bricks import DropPath, build_activation_layer
from mmcv.cnn.bricks.transformer import FFN
from mmengine.model.weight_init import trunc_normal_
from ops_dcnv3 import modules as opsm
from mmpretrain.models.backbones.base_backbone import BaseBackbone
from mmpretrain.models.utils import CrossMultiheadAttention
from mmpretrain.registry import MODELS
class to_channels_first(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
return x.permute(0, 3, 1, 2)
class to_channels_last(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
return x.permute(0, 2, 3, 1)
def build_norm_layer(dim,
norm_layer,
in_format='channels_last',
out_format='channels_last',
eps=1e-6):
layers = []
if norm_layer == 'BN':
if in_format == 'channels_last':
layers.append(to_channels_first())
layers.append(nn.BatchNorm2d(dim))
if out_format == 'channels_last':
layers.append(to_channels_last())
elif norm_layer == 'LN':
if in_format == 'channels_first':
layers.append(to_channels_last())
layers.append(nn.LayerNorm(dim, eps=eps))
if out_format == 'channels_first':
layers.append(to_channels_first())
else:
raise NotImplementedError(
f'build_norm_layer does not support {norm_layer}')
return nn.Sequential(*layers)
class AttentiveBlock(nn.Module):
"""Attentive Block.
Args:
dim (int): Number of input channels.
num_heads (int): Number of attention heads.
qkv_bias (bool, optional): If True, add a learnable bias to q, k, v.
Default: False.
qk_scale (float, optional): Override default qk scale of
head_dim ** -0.5 if set. Default: None.
drop (float, optional): Dropout rate. Default: 0.0.
attn_drop (float, optional): Attention dropout rate. Default: 0.0.
drop_path (float, optional): Stochastic depth rate. Default: 0.0.
norm_cfg (dict, optional): Normalization layer.
Default: dict(type='LN')
out_dim (int, optional): Dimension of output. Default: None.
"""
def __init__(self,
dim,
num_heads,
qkv_bias=False,
qk_scale=None,
drop=0.,
attn_drop=0.,
drop_path=0.,
norm_cfg=dict(type='LN'),
out_dim=None):
super().__init__()
norm_layer = norm_cfg['type']
self.norm1_q = build_norm_layer(dim, norm_layer, eps=1e-6)
self.norm1_k = build_norm_layer(dim, norm_layer, eps=1e-6)
self.norm1_v = build_norm_layer(dim, norm_layer, eps=1e-6)
self.cross_dcn = CrossMultiheadAttention(
embed_dims=dim,
num_heads=num_heads,
qkv_bias=qkv_bias,
qk_scale=qk_scale,
attn_drop=attn_drop,
proj_drop=drop,
)
if out_dim and out_dim != dim:
self.cross_dcn.proj = nn.Linear(dim, out_dim)
self.drop_path = DropPath(
drop_path) if drop_path > 0. else nn.Identity()
def forward(self, x_q, x_kv, pos_q, pos_k):
x_q = self.norm1_q(x_q + pos_q)
x_k = self.norm1_k(x_kv + pos_k)
x_v = self.norm1_v(x_kv)
x = self.cross_dcn(x_q, k=x_k, v=x_v)
return x
class AttentionPoolingBlock(AttentiveBlock):
def forward(self, x):
x_q = x.mean(1, keepdim=True)
x_kv = x
pos_q, pos_k = 0, 0
x = super().forward(x_q, x_kv, pos_q, pos_k)
x = x.squeeze(1)
return x
class DownsampleLayer(nn.Module):
"""Downsample layer of InternImage.
Args:
channels (int): number of input channels
norm_layer (str): normalization layer
"""
def __init__(self, channels, norm_layer='LN'):
super().__init__()
self.conv = nn.Conv2d(
channels,
2 * channels,
kernel_size=3,
stride=2,
padding=1,
bias=False)
self.norm = build_norm_layer(2 * channels, norm_layer,
'channels_first', 'channels_last')
def forward(self, x):
x = self.conv(x.permute(0, 3, 1, 2))
x = self.norm(x)
return x
class InternImageLayer(nn.Module):
"""Basic layer of InternImage.
Args:
core_op (nn.Module): core operation of InternImage
channels (int): number of input channels
groups (list): Groups of each block.
mlp_ratio (float): ratio of mlp hidden features to input channels
drop (float): dropout rate
drop_path (float): drop path rate
act_cfg (dict): activation layer
norm_cfg (dict): normalization layer
post_norm (bool): whether to use post normalization
layer_scale (float): layer scale
offset_scale (float): offset scale
with_cp (bool): whether to use checkpoint
"""
def __init__(
self,
core_op,
channels,
groups,
mlp_ratio=4.,
drop=0.,
drop_path=0.,
act_cfg=dict(type='GELU'),
norm_cfg=dict(type='LN'),
post_norm=False,
layer_scale=None,
offset_scale=1.0,
with_cp=False,
dw_kernel_size=None,
res_post_norm=False,
center_feature_scale=False,
remove_center=False,
):
super().__init__()
self.channels = channels
self.groups = groups
self.mlp_ratio = mlp_ratio
self.with_cp = with_cp
self.norm1 = build_norm_layer(channels, 'LN')
self.post_norm = post_norm
self.dcn = core_op(
channels=channels,
kernel_size=3,
stride=1,
pad=1,
dilation=1,
group=groups,
offset_scale=offset_scale,
act_layer=act_cfg['type'],
norm_layer=norm_cfg['type'],
dw_kernel_size=dw_kernel_size,
center_feature_scale=center_feature_scale,
remove_center=remove_center,
)
self.drop_path = DropPath(drop_path) if drop_path > 0. \
else nn.Identity()
self.norm2 = build_norm_layer(channels, 'LN')
self.mlp = FFN(
embed_dims=channels,
feedforward_channels=int(channels * mlp_ratio),
act_cfg=act_cfg,
ffn_drop=drop,
add_identity=False)
self.layer_scale = layer_scale is not None
if self.layer_scale:
self.gamma1 = nn.Parameter(
layer_scale * torch.ones(channels), requires_grad=True)
self.gamma2 = nn.Parameter(
layer_scale * torch.ones(channels), requires_grad=True)
self.res_post_norm = res_post_norm
if res_post_norm:
self.res_post_norm1 = build_norm_layer(channels, 'LN')
self.res_post_norm2 = build_norm_layer(channels, 'LN')
def forward(self, x):
def _inner_forward(x):
if not self.layer_scale:
if self.post_norm:
x = x + self.drop_path(self.norm1(self.dcn(x)))
x = x + self.drop_path(self.norm2(self.mlp(x)))
elif self.res_post_norm:
x = x + self.drop_path(
self.res_post_norm1(self.dcn(self.norm1(x))))
x = x + self.drop_path(
self.res_post_norm2(self.mlp(self.norm2(x))))
else:
x = x + self.drop_path(self.dcn(self.norm1(x)))
x = x + self.drop_path(self.mlp(self.norm2(x)))
return x
if self.post_norm:
x = x + self.drop_path(self.gamma1 * self.norm1(self.dcn(x)))
x = x + self.drop_path(self.gamma2 * self.norm2(self.mlp(x)))
else:
x = x + self.drop_path(self.gamma1 * self.dcn(self.norm1(x)))
x = x + self.drop_path(self.gamma2 * self.mlp(self.norm2(x)))
return x
if self.with_cp and x.requires_grad:
x = cp.checkpoint(_inner_forward, x)
else:
x = _inner_forward(x)
return x
class InternImageBlock(nn.Module):
"""Block of InternImage.
Args:
core_op (nn.Module): core operation of InternImage
channels (int): number of input channels
depths (list): Depth of each block.
groups (list): Groups of each block.
mlp_ratio (float): ratio of mlp hidden features to input channels
drop (float): dropout rate
drop_path (float): drop path rate
act_cfg (dict): activation layer
norm_cfg (dict): normalization layer
post_norm (bool): whether to use post normalization
layer_scale (float): layer scale
offset_scale (float): offset scale
with_cp (bool): whether to use checkpoint
"""
def __init__(
self,
core_op,
channels,
depth,
groups,
downsample=True,
mlp_ratio=4.,
drop=0.,
drop_path=0.,
act_cfg=dict(type='GELU'),
norm_cfg=dict(type='LN'),
post_norm=False,
offset_scale=1.0,
layer_scale=None,
with_cp=False,
dw_kernel_size=None,
post_norm_block_ids=None,
res_post_norm=False,
center_feature_scale=False,
remove_center=False,
):
super().__init__()
self.channels = channels
self.depth = depth
self.post_norm = post_norm
self.center_feature_scale = center_feature_scale
self.blocks = nn.ModuleList([
InternImageLayer(
core_op=core_op,
channels=channels,
groups=groups,
mlp_ratio=mlp_ratio,
drop=drop,
drop_path=drop_path[i]
if isinstance(drop_path, list) else drop_path,
act_cfg=act_cfg,
norm_cfg=norm_cfg,
post_norm=post_norm,
layer_scale=layer_scale,
offset_scale=offset_scale,
with_cp=with_cp,
dw_kernel_size=dw_kernel_size,
res_post_norm=res_post_norm,
center_feature_scale=center_feature_scale,
remove_center=remove_center,
) for i in range(depth)
])
if not self.post_norm or center_feature_scale:
self.norm = build_norm_layer(channels, 'LN')
self.post_norm_block_ids = post_norm_block_ids
if post_norm_block_ids is not None:
self.post_norms = nn.ModuleList([
build_norm_layer(channels, 'LN', eps=1e-6)
for _ in post_norm_block_ids
])
self.downsample = DownsampleLayer(
channels=channels,
norm_layer=norm_cfg['type']) if downsample else None
def forward(self, x, return_wo_downsample=False):
for i, blk in enumerate(self.blocks):
x = blk(x)
if (self.post_norm_block_ids
is not None) and (i in self.post_norm_block_ids):
index = self.post_norm_block_ids.index(i)
x = self.post_norms[index](x)
if not self.post_norm or self.center_feature_scale:
x = self.norm(x)
if return_wo_downsample:
x_ = x
if self.downsample is not None:
x = self.downsample(x)
if return_wo_downsample:
return x, x_
return x
@MODELS.register_module()
class InternImage(BaseBackbone):
""" InternImage
A PyTorch impl of : `InternImage: Exploring Large-Scale Vision Foundation Models with Deformable Convolutions` -
https://arxiv.org/pdf/2103.14030
Args:
core_op (str): Core operator. Default: 'DCNv3'
stem_channels (int): Number of channels of the first stage. Default: 64
stage_blocks (list): Depth of each block. Default: [3, 4, 18, 5]
groups (list): Groups of each block. Default: [3, 6, 12, 24]
num_classes (int): Number of classes. Default: 1000
mlp_ratio (float): Ratio of mlp hidden dim to embedding dim. Default: 4.
drop_rate (float): Probability of an element to be zeroed. Default: 0.
drop_path_rate (float): Stochastic depth rate. Default: 0.
act_cfg (dict): Activation layer. Default: dict(type='GELU')
norm_cfg (dict): Normalization layer. Default: dict(type='LN')
layer_scale (bool): Whether to use layer scale. Default: False
cls_scale (bool): Whether to use class scale. Default: False
with_cp (bool): Use checkpoint or not. Using checkpoint will save some memory while slowing down the training speed. Default: False
dw_kernel_size (int): Size of the dwconv. Default: None
use_clip_projector (bool): Whether to use clip projector. Default: False
level2_post_norm (bool): Whether to use level2 post norm. Default: False
level2_post_norm_block_ids (list): Indexes of post norm blocks. Default: None
res_post_norm (bool): Whether to use res post norm. Default: False
center_feature_scale (bool): Whether to use center feature scale. Default: False
""" # noqa: E501
def __init__(self,
stem_channels=64,
stage_blocks=[3, 4, 18, 5],
groups=[3, 6, 12, 24],
mlp_ratio=4.,
drop_rate=0.,
drop_path_rate=0.2,
drop_path_type='linear',
act_cfg=dict(type='GELU'),
norm_cfg=dict(type='LN'),
layer_scale=None,
offset_scale=1.0,
post_norm=False,
cls_scale=1.5,
with_cp=False,
dw_kernel_size=None,
use_clip_projector=False,
level2_post_norm=False,
level2_post_norm_block_ids=None,
res_post_norm=False,
center_feature_scale=False,
remove_center=False,
init_cfg=None):
super(InternImage, self).__init__(init_cfg)
self.core_op = 'DCNv3'
self.num_stages = len(stage_blocks)
self.num_features = int(stem_channels * 2**(self.num_stages - 1))
self.post_norm = post_norm
self.mlp_ratio = mlp_ratio
self.use_clip_projector = use_clip_projector
self.level2_post_norm_block_ids = level2_post_norm_block_ids
self.remove_center = remove_center
self.act_cfg = act_cfg
self.norm_cfg = norm_cfg
# stem layer
self._make_stem_layer(in_channels=3, stem_channels=stem_channels)
self.pos_drop = nn.Dropout(p=drop_rate)
# stochastic depth decay rule
total_depth = sum(stage_blocks)
dpr = [
x.item() for x in torch.linspace(0, drop_path_rate, total_depth)
]
if drop_path_type == 'uniform':
for i in range(len(dpr)):
dpr[i] = drop_path_rate
# InternImage Layers
self.layers = nn.ModuleList()
for i in range(self.num_stages):
if level2_post_norm and i == 2:
post_norm_block_ids = level2_post_norm_block_ids
else:
post_norm_block_ids = None
layer = InternImageBlock(
core_op=getattr(opsm, self.core_op),
channels=int(stem_channels * 2**i),
depth=stage_blocks[i],
groups=groups[i],
mlp_ratio=self.mlp_ratio,
drop=drop_rate,
drop_path=dpr[sum(stage_blocks[:i]):sum(stage_blocks[:i + 1])],
act_cfg=act_cfg,
norm_cfg=norm_cfg,
post_norm=post_norm,
downsample=(i < self.num_stages - 1),
layer_scale=layer_scale,
offset_scale=offset_scale,
with_cp=with_cp,
dw_kernel_size=dw_kernel_size,
post_norm_block_ids=post_norm_block_ids,
res_post_norm=res_post_norm,
center_feature_scale=center_feature_scale,
remove_center=remove_center,
)
self.layers.append(layer)
# Conv Head
if not use_clip_projector:
self.conv_head = nn.Sequential(
nn.Conv2d(
self.num_features,
int(self.num_features * cls_scale),
kernel_size=1,
bias=False),
build_norm_layer(
int(self.num_features * cls_scale), 'BN', 'channels_first',
'channels_first'), build_activation_layer(act_cfg))
else:
pretrain_embed_dim, _stride, attnpool_num_heads, clip_embed_dim \
= 1024, 2, 16, 768
self.dcnv3_head_x4 = nn.Sequential(
nn.Conv2d(
in_channels=self.num_features,
out_channels=pretrain_embed_dim * (_stride**2),
kernel_size=1), nn.PixelShuffle(_stride))
self.dcnv3_head_x3 = nn.Conv2d(
in_channels=self.num_features // 2,
out_channels=pretrain_embed_dim,
kernel_size=1)
self.clip_projector = AttentionPoolingBlock(
dim=pretrain_embed_dim,
num_heads=attnpool_num_heads,
qkv_bias=True,
qk_scale=None,
drop=0.,
attn_drop=0.,
norm_cfg=norm_cfg,
out_dim=clip_embed_dim)
norm_layer = norm_cfg['type']
self.fc_norm = build_norm_layer(
clip_embed_dim, norm_layer, eps=1e-6)
def init_weights(self):
super(InternImage, self).init_weights()
for m in self.modules():
if isinstance(m, nn.Linear):
trunc_normal_(m.weight, std=.02)
if isinstance(m, nn.Linear) and m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.LayerNorm):
nn.init.constant_(m.bias, 0)
nn.init.constant_(m.weight, 1.0)
elif isinstance(m, getattr(opsm, self.core_op)):
m._reset_parameters()
def _make_stem_layer(self, in_channels, stem_channels):
norm_layer = self.norm_cfg['type']
self.patch_embed = nn.Sequential(
nn.Conv2d(
in_channels,
stem_channels // 2,
kernel_size=3,
stride=2,
padding=1),
build_norm_layer(stem_channels // 2, norm_layer, 'channels_first',
'channels_first'),
build_activation_layer(self.act_cfg),
nn.Conv2d(
stem_channels // 2,
stem_channels,
kernel_size=3,
stride=2,
padding=1),
build_norm_layer(stem_channels, norm_layer, 'channels_first',
'channels_last'),
)
def forward_features(self, x):
x = self.patch_embed(x)
x = self.pos_drop(x)
for layer in self.layers:
x = layer(x)
x = self.conv_head(x.permute(0, 3, 1, 2))
return (x, )
def forward_features_seq_out(self, x):
x = self.patch_embed(x)
x = self.pos_drop(x)
seq_out = []
for layer in self.layers:
x, x_ = layer(x, return_wo_downsample=True)
seq_out.append(x_)
return seq_out
def forward_clip_projector(self, x): # for InternImage-H/G
xs = self.forward_features_seq_out(x)
x1, x2, x3, x4 = xs
x1 = x1.permute(0, 3, 1, 2) # NHWC -> NCHW
x2 = x2.permute(0, 3, 1, 2) # NHWC -> NCHW
x3 = x3.permute(0, 3, 1, 2) # NHWC -> NCHW
x4 = x4.permute(0, 3, 1, 2) # NHWC -> NCHW
x4 = self.dcnv3_head_x4(x4)
x = x4
x3 = self.dcnv3_head_x3(x3)
x = x + x3
x = x.flatten(-2).transpose(1, 2).contiguous()
x = self.clip_projector(x)
x = self.fc_norm(x)
return (x, )
def forward(self, x):
if not self.use_clip_projector:
# for InternImage-T/S/B/L/XL
return self.forward_features(x)
else:
# for InternImage-H/G
return self.forward_clip_projector(x)
@staticmethod
def _checkpoint_filter(state_dict, prefix, local_metadata, strict,
missing_keys, unexpected_keys, error_msgs):
def internimage_to_mmpretrain():
for k, v in state_dict['model'].items():
if 'head.' in k and 'conv_head' not in k:
if 'weight' in k:
new_k = 'head.fc.weight'
else:
new_k = 'head.fc.bias'
elif 'patch_embed' in k:
map_fun = {
'conv1': '0',
'norm1': '1',
'conv2': '3',
'norm2': '4'
}
new_k = k
for old, new in map_fun.items():
new_k = new_k.replace(old, new)
new_k = 'backbone.' + new_k
elif 'levels' in k:
new_k = k.replace('levels', 'layers')
if 'mlp' in new_k:
new_k = new_k.replace('fc1', 'layers.0.0')
new_k = new_k.replace('fc2', 'layers.1')
new_k = 'backbone.' + new_k
elif 'clip_projector.cross_dcn.k_bias' in k:
continue
else:
new_k = 'backbone.' + k
state_dict[new_k] = state_dict['model'][k]
del state_dict['model']
# The original weights need to be converted to the mmpretrain format.
# Some module names in the original weights start with 'levels',
# and in this implementation they are replaced with 'layers'.
if 'model' in state_dict and 'levels.0.blocks.0.norm1.0.weight'\
in state_dict['model']:
internimage_to_mmpretrain()