from hour_dataset import HourDataset
from lstm import LSTMSimple
import torch
from torch import nn
import numpy as np
import os
import time
import torch.distributed
from torch.utils.data import DataLoader
from fitlog import FitLog
from sklearn.metrics import accuracy_score


class LSTMDriverHour():
    def __init__(self, local_rank):
        self.epochs = 2000
        self.batch_size = 156
        self.hidden_size = 2048
        self.nlayer = 2
        self.n_class = 2
        self.lr = 0.0001
        self.early_stop_thresh = 0.001
        self.early_stop_nreach_limit = 10
        self.dataset = None
        self.model = None
        self.criterion = None
        self.optimizer = None
        self.loader = None
        self.device = None
        self.fitlog = None
        self.local_rank = local_rank

    def __get_description(self):
        # Snapshot of the run configuration, written to the log at startup.
        return {'epochs': self.epochs,
                'batch_size': self.batch_size,
                'hidden_size': self.hidden_size,
                'nlayer': self.nlayer,
                'n_class': self.n_class,
                'lr': self.lr,
                'step_size': self.dataset.get_time_step(),
                'input_size': self.dataset.get_input_size()}

    def load(self, path=None):
        self.dataset = HourDataset(100, random_seed=1)
        if path is None:
            self.dataset.load()
        else:
            self.dataset.loadfile(path)

    def __prepare_for_fit(self):
        self.device = torch.device('cuda', self.local_rank)
        self.model = LSTMSimple(self.dataset.get_input_size(), self.hidden_size,
                                self.nlayer, self.n_class, self.device)
        self.model.to(self.device)
        self.model = nn.parallel.DistributedDataParallel(
            self.model, device_ids=[self.local_rank], find_unused_parameters=True)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), self.lr)
        # self.optimizer = torch.optim.SGD(self.model.parameters(), self.lr)

    def get_device(self):
        # Fallback device probe; unused by fit(), which binds directly to
        # cuda:<local_rank> in __prepare_for_fit().
        device_mlu = None  # MLU probing disabled; re-enable via torch.device('mlu')
        device_gpu = None
        try:
            device_gpu = torch.device('cuda')
        except Exception as err:
            print(err)
        if device_mlu:
            if self.fitlog:
                self.fitlog.append('mlu', True, True)
            return device_mlu
        elif device_gpu:
            if self.fitlog:
                self.fitlog.append('cuda', True, True)
            return device_gpu
        else:
            if self.fitlog:
                self.fitlog.append('cpu', True, True)
            return torch.device('cpu')

    def fit(self):
        if self.dataset is None:
            print('no data was loaded')
            return None
        self.__prepare_for_fit()
        if self.local_rank == 0:
            self.fitlog = FitLog(folderpath='logs/')
            self.fitlog.append(str(self.__get_description()), with_time=True)
            print(self.__get_description())

        self.dataset.set_mode('train')
        # Each DDP rank reads a disjoint shard; shuffling is delegated to the
        # DistributedSampler, so the loader itself must not shuffle.
        sampler = torch.utils.data.distributed.DistributedSampler(self.dataset)
        self.loader = DataLoader(self.dataset, batch_size=self.batch_size,
                                 sampler=sampler, shuffle=False)
        nbatch = len(self.loader)
        print('batch:', nbatch)

        lowest_loss = float('inf')
        lowest_loss_at = -1
        early_stop_nreach = 0
        best_acc = 0
        best_acc_at = -1
        dt = [0.0, 0.0, 0.0]  # accumulated train / log / validation time
        ts = time.time()
        # If only a single epoch is trained, this bookkeeping could also be
        # moved into the inner (batch) loop.
        for epoch in range(self.epochs):
            sampler.set_epoch(epoch)  # reshuffle shards each epoch
            self.dataset.set_mode('train')
            self.model.train()
            t0 = time.time()
            if self.local_rank == 0:
                time_start = time.time()
            for i, (feats, labs) in enumerate(self.loader):
                self.optimizer.zero_grad()
                feats = feats.reshape(-1, self.dataset.get_time_step(),
                                      self.dataset.get_input_size())
                feats = feats.to(self.device)
                labs = labs.to(self.device)
                outputs = self.model(feats)
                loss = self.criterion(outputs, labs)
                loss.backward()
                self.optimizer.step()
            torch.cuda.synchronize()
            if self.local_rank == 0:
                print('e2e time. {}s'.format((time.time() - time_start) / nbatch))
            t1 = time.time()
            dt[0] += t1 - t0

            cur_acc = self._validate2()
            t2 = time.time()
            dt[2] += t2 - t1
            if cur_acc > best_acc:
                best_acc = cur_acc
                best_acc_at = epoch + 1
                acc_str = 'Best_acc: {:.4f}'.format(best_acc)
                print(acc_str)
                if self.local_rank == 0:
                    self.fitlog.append(acc_str, with_time=True, change_line=True)

            # loss here is the last batch of the epoch.
            if lowest_loss > loss.item():
                lowest_loss = loss.item()
                lowest_loss_at = epoch + 1
            if self.local_rank == 0:
                ptstr = ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, '
                         'Best: {:.4f}, Best at: {}').format(
                             epoch + 1, self.epochs, i + 1, nbatch, loss.item(),
                             lowest_loss, lowest_loss_at)
                print(ptstr)
                self.fitlog.append(ptstr, with_time=True, change_line=True)
            if loss.item() < self.early_stop_thresh:
                early_stop_nreach += 1
            else:
                early_stop_nreach = 0
            t3 = time.time()
            dt[1] += t3 - t2
            # if early_stop_nreach >= self.early_stop_nreach_limit:
            #     break

        if self.local_rank == 0:
            self.fitlog.append(f'Done {self.epochs} Epoch. ({time.time() - ts:.5f}s)',
                               with_time=True, change_line=True)
            self.fitlog.append('Train: {}s, Log: {}s Val: {}s'.format(dt[0], dt[1], dt[2]),
                               with_time=True, change_line=True)
            self.fitlog.close()
            print(f'Done {self.epochs} Epoch. ({time.time() - ts:.5f}s)')
            print('Train: {}s, Log: {}s Val: {}s'.format(dt[0], dt[1], dt[2]))

    def _validate2(self):
        # Evaluates on self.loader with the dataset switched to its test split;
        # fit() flips the mode back to 'train' at the top of each epoch.
        self.model.eval()
        self.dataset.set_mode('test')
        all_pred = []
        all_tar = []
        all_loss = []
        with torch.no_grad():
            for ft, labs in self.loader:
                # Reshape to (batch, time_step, input_size), matching the
                # training loop; a no-op if the data is already shaped.
                ft = ft.reshape(-1, self.dataset.get_time_step(),
                                self.dataset.get_input_size())
                ft, labs = ft.to(self.device), labs.to(self.device)
                output = self.model(ft)
                loss = self.criterion(output, labs)
                all_pred.extend(torch.argmax(output, dim=1).cpu().numpy().tolist())
                all_tar.extend(labs.cpu().numpy().tolist())
                all_loss.append(loss.item())
        # Accuracy over every prediction of this validation pass.
        return accuracy_score(all_tar, all_pred)
        # return accuracy_score(all_tar, all_pred), np.mean(all_loss)


def init_ddp(visible_devices='0,1,2,3,4,5,6,7,8'):
    if torch.cuda.device_count() > 1:
        os.environ['HIP_VISIBLE_DEVICES'] = visible_devices
        # The launcher (torchrun) exports LOCAL_RANK for every worker it spawns.
        local_rank = int(os.environ['LOCAL_RANK'])
        print('local_rank:' + str(local_rank))
        torch.distributed.init_process_group(backend='nccl')
        torch.cuda.set_device(local_rank)
        return local_rank
    else:
        # Single-device runs are not handled: fit() assumes a valid rank.
        return None


if __name__ == '__main__':
    local_rank = init_ddp()
    driver = LSTMDriverHour(local_rank=local_rank)
    driver.load('dat_3day_pcase')
    driver.fit()
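# Launch sketch (assumption: this file is saved as lstm_driver_hour.py; adjust
# the name to match the repo). torchrun starts one process per GPU and sets the
# LOCAL_RANK environment variable that init_ddp() reads, e.g. on a 4-GPU node:
#
#   torchrun --nproc_per_node=4 lstm_driver_hour.py
#
# Running plain `python lstm_driver_hour.py` on a multi-GPU machine would fail
# inside init_ddp(), since LOCAL_RANK is only set by the launcher.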