# ---------------------------------------------
# Copyright (c) OpenMMLab. All rights reserved.
# ---------------------------------------------
# Modified by Zhiqi Li
# ---------------------------------------------
import random
import warnings

import numpy as np
import torch
import torch.distributed as dist
from mmcv.parallel import MMDataParallel, MMDistributedDataParallel
from mmcv.runner import (HOOKS, DistSamplerSeedHook, EpochBasedRunner,
                         Fp16OptimizerHook, OptimizerHook, build_optimizer,
                         build_runner, get_dist_info)
from mmcv.utils import build_from_cfg

from mmdet.core import EvalHook
from mmdet.datasets import (build_dataset, replace_ImageToTensor)
from mmdet.utils import get_root_logger
import time
import os.path as osp
from projects.mmdet3d_plugin.datasets.builder import build_dataloader
from projects.mmdet3d_plugin.core.evaluation.eval_hooks import CustomDistEvalHook
from projects.mmdet3d_plugin.datasets import custom_build_dataset

from mmcv.runner import Hook


class ProfilerHook(Hook):
    """Runner hook that drives a ``torch.profiler.profile`` instance.

    Steps the profiler once per training iteration and stops it after
    ``total_steps`` iterations. On stop, rank 0 dumps the aggregated
    kernel table (sorted by total CUDA time) to a text file.
    """

    def __init__(self, profiler, total_steps):
        self.profiler = profiler
        # Total number of iterations covered by the profiler schedule:
        # (wait + warmup + active) * repeat.
        self.total_steps = total_steps
        # Set once the profiler has been stopped, so it is never
        # stepped or stopped twice.
        self.stopped = False

    def after_train_iter(self, runner):
        # Stop once the schedule has completed all of its steps.
        if self.profiler.step_num == self.total_steps - 1 and not self.stopped:
            self.profiler.stop()
            self.stopped = True
            # Only rank 0 writes the summary table.
            rank, _ = get_dist_info()
            if rank == 0:
                results = self.profiler.key_averages().table(sort_by="cuda_time_total")
                log_file = "./BW_log_step{}.txt".format(self.total_steps)
                with open(log_file, mode='w') as file:
                    file.write(str(results))
        if not self.stopped:
            self.profiler.step()


def custom_train_detector(model,
                          dataset,
                          cfg,
                          distributed=False,
                          validate=False,
                          timestamp=None,
                          eval_model=None,
                          meta=None):
    """Train a detector, optionally distributed, with evaluation and
    torch-profiler support.

    Args:
        model: Detector to train.
        dataset: Dataset or list of datasets used for training.
        cfg: Full training config.
        distributed (bool): Whether to use distributed training.
        validate (bool): Whether to register an evaluation hook.
        timestamp (str | None): Timestamp shared by .log/.log.json files.
        eval_model: Optional separate model instance used only for
            evaluation (registered on the runner when given).
        meta (dict | None): Meta info recorded by the runner.
    """
    logger = get_root_logger(cfg.log_level)

    # prepare data loaders
    dataset = dataset if isinstance(dataset, (list, tuple)) else [dataset]
    if 'imgs_per_gpu' in cfg.data:
        logger.warning('"imgs_per_gpu" is deprecated in MMDet V2.0. '
                       'Please use "samples_per_gpu" instead')
        if 'samples_per_gpu' in cfg.data:
            logger.warning(
                f'Got "imgs_per_gpu"={cfg.data.imgs_per_gpu} and '
                f'"samples_per_gpu"={cfg.data.samples_per_gpu}, "imgs_per_gpu"'
                f'={cfg.data.imgs_per_gpu} is used in this experiments')
        else:
            logger.warning(
                'Automatically set "samples_per_gpu"="imgs_per_gpu"='
                f'{cfg.data.imgs_per_gpu} in this experiments')
            cfg.data.samples_per_gpu = cfg.data.imgs_per_gpu

    data_loaders = [
        build_dataloader(
            ds,
            cfg.data.samples_per_gpu,
            cfg.data.workers_per_gpu,
            # cfg.gpus will be ignored if distributed
            len(cfg.gpu_ids),
            dist=distributed,
            seed=cfg.seed,
            shuffler_sampler=cfg.data.shuffler_sampler,  # dict(type='DistributedGroupSampler'),
            nonshuffler_sampler=cfg.data.nonshuffler_sampler,  # dict(type='DistributedSampler'),
        ) for ds in dataset
    ]

    # put model on gpus
    if distributed:
        print("============================distributed yes=================================================")
        find_unused_parameters = cfg.get('find_unused_parameters', False)
        # Sets the `find_unused_parameters` parameter in
        # torch.nn.parallel.DistributedDataParallel.
        # NOTE: the model is moved to channels_last memory format for
        # conv throughput — presumably intentional; verify against the
        # model's layer types.
        model = MMDistributedDataParallel(
            model.to(device='cuda', memory_format=torch.channels_last),
            device_ids=[torch.cuda.current_device()],
            broadcast_buffers=False,
            find_unused_parameters=find_unused_parameters)
        if eval_model is not None:
            eval_model = MMDistributedDataParallel(
                eval_model.to(device='cuda', memory_format=torch.channels_last),
                device_ids=[torch.cuda.current_device()],
                broadcast_buffers=False,
                find_unused_parameters=find_unused_parameters)
    else:
        print("============================distributed no=================================================")
        model = MMDataParallel(
            model.cuda(cfg.gpu_ids[0]), device_ids=cfg.gpu_ids)
        if eval_model is not None:
            eval_model = MMDataParallel(
                eval_model.cuda(cfg.gpu_ids[0]), device_ids=cfg.gpu_ids)

    # build runner
    optimizer = build_optimizer(model, cfg.optimizer)

    if 'runner' not in cfg:
        cfg.runner = {
            'type': 'EpochBasedRunner',
            'max_epochs': cfg.total_epochs
        }
        warnings.warn(
            'config is now expected to have a `runner` section, '
            'please set `runner` in your config.', UserWarning)
    else:
        if 'total_epochs' in cfg:
            assert cfg.total_epochs == cfg.runner.max_epochs

    if eval_model is not None:
        runner = build_runner(
            cfg.runner,
            default_args=dict(
                model=model,
                eval_model=eval_model,
                optimizer=optimizer,
                work_dir=cfg.work_dir,
                logger=logger,
                meta=meta))
    else:
        runner = build_runner(
            cfg.runner,
            default_args=dict(
                model=model,
                optimizer=optimizer,
                work_dir=cfg.work_dir,
                logger=logger,
                meta=meta))

    # an ugly workaround to make .log and .log.json filenames the same
    runner.timestamp = timestamp

    # fp16 setting
    fp16_cfg = cfg.get('fp16', None)
    if fp16_cfg is not None:
        optimizer_config = Fp16OptimizerHook(
            **cfg.optimizer_config, **fp16_cfg, distributed=distributed)
    elif distributed and 'type' not in cfg.optimizer_config:
        optimizer_config = OptimizerHook(**cfg.optimizer_config)
    else:
        optimizer_config = cfg.optimizer_config

    # register hooks
    runner.register_training_hooks(cfg.lr_config, optimizer_config,
                                   cfg.checkpoint_config, cfg.log_config,
                                   cfg.get('momentum_config', None))

    if distributed:
        if isinstance(runner, EpochBasedRunner):
            runner.register_hook(DistSamplerSeedHook())

    # register eval hooks
    if validate:
        # Support batch_size > 1 in validation
        val_samples_per_gpu = cfg.data.val.pop('samples_per_gpu', 1)
        if val_samples_per_gpu > 1:
            # batch validation is not supported by this plugin yet
            assert False
            # Replace 'ImageToTensor' to 'DefaultFormatBundle'
            cfg.data.val.pipeline = replace_ImageToTensor(
                cfg.data.val.pipeline)
        val_dataset = custom_build_dataset(cfg.data.val, dict(test_mode=True))
        val_dataloader = build_dataloader(
            val_dataset,
            samples_per_gpu=val_samples_per_gpu,
            workers_per_gpu=cfg.data.workers_per_gpu,
            dist=distributed,
            shuffle=False,
            shuffler_sampler=cfg.data.shuffler_sampler,  # dict(type='DistributedGroupSampler'),
            nonshuffler_sampler=cfg.data.nonshuffler_sampler,  # dict(type='DistributedSampler'),
        )
        eval_cfg = cfg.get('evaluation', {})
        eval_cfg['by_epoch'] = cfg.runner['type'] != 'IterBasedRunner'
        # NOTE(review): 'val' before cfg.work_dir yields a relative
        # 'val/<work_dir>/<time>' prefix — confirm this path is intended.
        eval_cfg['jsonfile_prefix'] = osp.join(
            'val', cfg.work_dir,
            time.ctime().replace(' ', '_').replace(':', '_'))
        eval_hook = CustomDistEvalHook if distributed else EvalHook
        runner.register_hook(eval_hook(val_dataloader, **eval_cfg))

    # user-defined hooks
    if cfg.get('custom_hooks', None):
        custom_hooks = cfg.custom_hooks
        assert isinstance(custom_hooks, list), \
            f'custom_hooks expect list type, but got {type(custom_hooks)}'
        for hook_cfg in cfg.custom_hooks:
            assert isinstance(hook_cfg, dict), \
                'Each item in custom_hooks expects dict type, but got ' \
                f'{type(hook_cfg)}'
            hook_cfg = hook_cfg.copy()
            priority = hook_cfg.pop('priority', 'NORMAL')
            hook = build_from_cfg(hook_cfg, HOOKS)
            runner.register_hook(hook, priority=priority)

    if cfg.resume_from:
        runner.resume(cfg.resume_from)
    elif cfg.load_from:
        runner.load_checkpoint(cfg.load_from)

    if cfg.get('enable_profiler', False):
        # Profiler schedule: skip `wait` steps, warm up for `warmup`
        # steps (not recorded), then record `active` steps, repeated
        # `repeat` times.
        wait, warmup, active, repeat = 1, 20, 1, 1
        # BUGFIX: total_steps was hard-coded as (1 + 20 + 15) * 1 = 36,
        # which disagreed with the schedule below (and with its own
        # "22 steps" comment), so ProfilerHook stopped far too late.
        # Derive it from the schedule instead.
        total_steps = (wait + warmup + active) * repeat  # 22 steps
        profiler = torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA
            ],
            schedule=torch.profiler.schedule(
                wait=wait, warmup=warmup, active=active, repeat=repeat),
            # Write the TensorBoard trace next to the other run
            # artifacts instead of a machine-specific hard-coded path.
            on_trace_ready=torch.profiler.tensorboard_trace_handler(
                osp.join(cfg.work_dir, 'profiler_logs')),
            with_stack=True,       # collect call stacks
            profile_memory=False,  # do not profile memory usage
            record_shapes=False    # do not record tensor shapes
        )
        # Register the hook that steps/stops the profiler per iteration.
        profiler_hook = ProfilerHook(profiler, total_steps)
        runner.register_hook(profiler_hook)
        profiler.start()
        print("==================================== profiler.start()===================================================================")
        try:
            runner.run(data_loaders, cfg.workflow)
        finally:
            # The hook usually stops the profiler itself; guard against
            # a double stop(), which raises in torch.profiler.
            if not profiler_hook.stopped:
                profiler.stop()
    else:
        runner.run(data_loaders, cfg.workflow)