Commit 25ccd163 authored by Sugon_ldc

add model lstm

Pipeline #2078 failed with stages in 0 seconds
{
"BENCHMARK_LOG" : [
"net",
"batch_size",
"cards",
"precision",
"DPF_mode",
"batch_time_avg",
"batch_time_var",
"hardware_time_avg",
"hardware_time_var",
"throughput"
],
"AVG_LOG" : [
"net",
"accuracy"
]
}
import pymysql
import pymongo
def GetDBConn():
conn = pymysql.connect(
host="127.0.0.1",
user="root",
password="",
database="",
charset="utf8")
return conn
def Runsql(sqlstr, conn):
ret = []
cur = conn.cursor()
cur.execute(sqlstr)
row = cur.fetchone()
while row is not None:
ret.append(row)
row = cur.fetchone()
cur.close()
return ret
def RunsqlMany(sqlstr, conn):
cur = conn.cursor()
cur.execute(sqlstr)
rows = cur.fetchall()
cur.close()
return rows
# dbname="huawei2021_w1"
# dbname="huawei2021_w1_core4"
# def GetMongoCollection(collectionName, dbname="valence_sp2"):
def GetMongoCollection(collectionName, dbname):
client = pymongo.MongoClient(host='localhost',
port=27017,serverSelectionTimeoutMS=20000,
socketTimeoutMS=20000,connectTimeoutMS=20000)
db = client[dbname]
collection = db[collectionName]
return collection
def CreateIndex(coll, key_order):
coll.create_index(key_order, unique=False, background=False)
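# A minimal usage sketch (added for illustration; the database, collection,
# and index names below are hypothetical):
if __name__ == "__main__":
    conn = GetDBConn()
    print(Runsql("SELECT 1", conn))   # row-by-row fetch
    conn.close()
    coll = GetMongoCollection("demo_coll", "demo_db")
    CreateIndex(coll, [("uid", 1)])   # ascending index on "uid"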
#!/bin/bash
export MY_CONTAINER="lstm-test"
num=`docker ps -a|grep "$MY_CONTAINER"|wc -l`
echo $num
echo $MY_CONTAINER
if [ 0 -eq $num ];then
docker run -it \
--shm-size '64gb' \
--name $MY_CONTAINER \
-v [Your workspace that stores lstm dir]:/workspace \
-v /usr/bin/cnmon:/usr/bin/cnmon \
--net multi_node \
--pid host \
--privileged \
yellow.hub.cambricon.com/pytorch/daily/x86_64/pytorch:v1.16.0-torch1.9-x86_64-ubuntu20.04-catch_1.9_develop_3.7-latest \
/bin/bash
else
docker start $MY_CONTAINER
docker exec -it $MY_CONTAINER /bin/bash
fi
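# NOTE (added): the run command above assumes a Docker network named
# "multi_node" already exists. On a single host it could be created with
#   docker network create multi_node
# (multi-host training would need an overlay network instead).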
from hour_dataset import HourDataset
from lstm import LSTMSimple
import torch
from torch import nn
import numpy as np
import sys
from fitlog import FitLog
from torch.utils.data import DataLoader
import os
import torch.distributed
from sklearn.metrics import accuracy_score
# from metric import MetricCollector
class LSTMDriverHour():
def __init__(self, local_rank):
# self.epochs = 20000
self.epochs = 2000
self.batch_size = 156
self.hidden_size = 2048
self.nlayer = 2
self.n_class = 2
self.lr = 0.0001
self.early_stop_thresh = 0.001
self.early_stop_nreach_limit = 10
self.dataset = None
self.model = None
self.criterion = None
self.optimizer = None
self.loader = None
self.device = None
self.local_rank = local_rank
def __get_description(self):
ret = {'epochs': self.epochs, \
'batch_size': self.batch_size, \
'hidden_size': self.hidden_size, \
'nlayer': self.nlayer, \
'n_class': self.n_class, \
'lr': self.lr, \
'step_size': self.dataset.get_time_step(), \
'input_size': self.dataset.get_input_size()}
return ret
def load(self, path=None):
self.dataset = HourDataset(100, random_seed=1)
        if path is None:
self.dataset.load()
else:
self.dataset.loadfile(path)
def __prepare_for_fit(self):
self.device = torch.device('cuda', self.local_rank)
#self.device = torch.device('cuda', 0)
self.model = LSTMSimple(self.dataset.get_input_size(), self.hidden_size, self.nlayer, self.n_class, self.device)
self.model.to(self.device)
self.model = nn.parallel.DistributedDataParallel(self.model, device_ids=[self.local_rank],find_unused_parameters=True)
self.criterion = nn.CrossEntropyLoss()
self.optimizer = torch.optim.Adam(self.model.parameters(), self.lr)
# self.optimizer = torch.optim.SGD(self.model.parameters(), self.lr)
def get_device(self):
        # torch.device(...) does not raise when the hardware is absent, so
        # availability is checked explicitly instead of via try/except.
        device_mlu = None
        device_gpu = None
        # device_mlu = torch.device('mlu')
        if torch.cuda.is_available():
            device_gpu = torch.device('cuda')
        if device_mlu:
            return device_mlu
        elif device_gpu:
            return device_gpu
        else:
            return torch.device('cpu')
def fit(self):
        if self.dataset is None:
print('no data was loaded')
return None
self.__prepare_for_fit()
if self.local_rank == 0:
fitlog = FitLog(folderpath='logs/')
fitlog.append(str(self.__get_description()), with_time=True)
print(self.__get_description())
self.dataset.set_mode('train')
sampler = torch.utils.data.distributed.DistributedSampler(self.dataset)
self.loader = torch.utils.data.DataLoader(self.dataset, batch_size=self.batch_size, sampler=sampler,
shuffle=False)
# self.loader = DataLoader(self.dataset, batch_size=self.batch_size, shuffle=True)
nbatch = len(self.loader)
print('batch:', nbatch)
        lowest_loss = float('inf')
lowest_loss_at = -1
early_stop_nreach = 0
best_acc = 0
best_acc_at = -1
import time
dt = [0.0, 0.0, 0.0]
ts = time.time()
        # When training only a single epoch, this could also be moved into the inner loop.
for epoch in range(self.epochs):
self.dataset.set_mode('train')
self.model.train()
t0 = time.time()
if self.local_rank == 0:
time_start = time.time()
for i, (feats, labs) in enumerate(self.loader):
self.optimizer.zero_grad()
feats = feats.reshape(-1, self.dataset.get_time_step(), \
self.dataset.get_input_size())
feats = feats.to(self.device)
labs = labs.to(self.device)
outputs = self.model(feats)
loss = self.criterion(outputs, labs)
loss.backward()
self.optimizer.step()
torch.cuda.synchronize()
if self.local_rank == 0:
print('e2e time. {}s'.format((time.time() - time_start) / len(self.loader)))
t1 = time.time()
dt[0] += t1 - t0
                cur_acc = self._validate2()
                if cur_acc > best_acc:
                    best_acc = cur_acc
                    acc_str = 'Best_acc: {:.4f}'.format(best_acc)
                    print(acc_str)
                    if self.local_rank == 0:
                        fitlog.append(acc_str, with_time=True, change_line=True)
# print(f'Pre Epoch. ({ self.local_rank} {t1-t0:.5f}s)')
t2 = time.time()
if lowest_loss > loss.item():
lowest_loss = loss.item()
lowest_loss_at = epoch + 1
# print(f'Loss. ({time.time()-t2:.5f}s)')
if i + 1 == nbatch and self.local_rank == 0:
ptstr = 'Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Best: {:.4f}, Best at: {}'.format( \
epoch + 1, self.epochs, i + 1, nbatch, loss.item(), lowest_loss, lowest_loss_at)
print(ptstr)
fitlog.append(ptstr, with_time=True, change_line=True)
# acc_list.append(acc)
# val_loss_list.append(val_loss.cpu().numpy())
# acc1=np.mean(acc_list)
# val_loss1=np.mean(val_loss_list)
# print("Epoch [{}/{}],acc:{},val_loss:{}".format(epoch + 1, self.epochs,str(acc1), str(val_loss1)),with_time=True, change_line=True)
# fitlog.append("acc:{},val_loss:{}".format(str(acc1), str(val_loss1)),with_time=True, change_line=True)
if loss.item() < self.early_stop_thresh:
early_stop_nreach += 1
else:
early_stop_nreach = 0
# prof.step()
t3 = time.time()
dt[1] += t3 - t2
t4 = time.time()
dt[2] += t4 - t3
# ptstr, acc, val_loss=self.validate()
# fitlog.append(ptstr+",acc:{},val_loss:{}".format(str(acc), str(val_loss)),with_time=True, change_line=True)
# if early_stop_nreach >= self.early_stop_nreach_limit:
# break
if self.local_rank == 0:
fitlog.append(f'Done {self.epochs} Epoch. ({time.time() - ts:.5f}s)', with_time=True, change_line=True)
fitlog.append("Train: {}s, Log: {}s Val: {}s".format(dt[0], dt[1], dt[2]), with_time=True, change_line=True)
fitlog.close()
print(f'Done {self.epochs} Epoch. ({time.time() - ts:.5f}s)')
print("Train: {}s, Log: {}s Val: {}s".format(dt[0], dt[1], dt[2]))
def _validate2(self):
self.model.eval()
self.dataset.set_mode('test')
all_pred = []
all_tar = []
accs = []
all_loss = []
with torch.no_grad():
for i, (ft, labs) in enumerate(self.loader):
ft, labs = ft.to(self.device), labs.to(self.device)
output = self.model(ft)
loss = self.criterion(output, labs)
preds = torch.argmax(output, dim=1).cpu().numpy().tolist()
all_pred.extend(preds)
all_tar.extend(labs.cpu().numpy().tolist())
accs.append(accuracy_score(all_tar, all_pred))
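                # note: accuracy_score runs on the growing lists, so each entry
                # is the cumulative accuracy over all batches seen so far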
all_loss.append(loss.item())
# if i % 100 == 0:
# print('validating @ batch {}'.format(i))
# if g_dubug:
# break
# ptstr = "Validation ACC: %.4f, loss: %.4f" % (np.mean(accs), np.mean(all_loss))
        self.dataset.set_mode('train')
return np.mean(accs)
# return np.mean(accs), np.mean(all_loss),np.std(all_loss), all_pred, all_tar
def init_ddp(visible_devices='0,1,2,3,4,5,6,7'):
    # >= 1 so that the single-card launch script also initializes DDP
    if torch.cuda.device_count() >= 1:
        os.environ['HIP_VISIBLE_DEVICES'] = visible_devices
        local_rank = int(os.environ["LOCAL_RANK"])
        print("local_rank:" + str(local_rank))
        #torch.distributed.init_process_group(backend='nccl', init_method='tcp://localhost:23456', rank=0, world_size=1)
        torch.distributed.init_process_group(backend="nccl")
        # local_rank = torch.distributed.get_rank()
        torch.cuda.set_device(local_rank)
        # device = torch.device("cuda", args.local_rank)
        return local_rank
    else:
        return None
if __name__ == "__main__":
local_rank = init_ddp()
driver = LSTMDriverHour(local_rank=local_rank)
driver.load('dat_3day_pcase')
driver.fit()
import datetime
class FitLog:
def __init__(self, folderpath="", fname=None):
        if fname is None:
            fname = datetime.datetime.now().strftime("%y%m%d%H%M%S") + ".log"
self.fh = open(folderpath + fname, 'w')
def append(self, line, with_time=False, change_line=True):
str2append = ""
if with_time is False:
str2append = line
else:
str2append = str(datetime.datetime.now()) + " " + line
if change_line is True:
str2append += "\r\n"
self.fh.write(str2append)
self.fh.flush()
def close(self):
self.fh.flush()
self.fh.close()
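# A minimal usage sketch (added for illustration):
# log = FitLog(folderpath='logs/')            # writes logs/<yymmddHHMMSS>.log
# log.append('training started', with_time=True)
# log.close()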
from torch.utils.data import Dataset
import sys
import dbop
from sklearn.model_selection import train_test_split
import numpy as np
from tool import load_joblib, save_joblib, print_label_sta, linear_soomth
import torch
# import torch_mlu
import re
from torch.nn.utils.rnn import pad_packed_sequence, pad_sequence, pack_padded_sequence
import datetime
import copy
import random
def check_abnormal_in_mat(xmat, desiresize=[697,3,16,8]):
for x in range(0, len(xmat)):
if len(xmat[x]) != desiresize[1]:
print("%d" % x)
for y in range(0, len(xmat[x])):
if len(xmat[x][y]) != desiresize[2]:
print("%d, %d" % (x, y))
for z in range(0, len(xmat[x][y])):
if len(xmat[x][y][z]) != desiresize[3]:
print("%d, %d, %d" % (x, y, z))
class HourDataset(Dataset):
def __init__(self, least_hourlen, random_seed):
self.mode = None
self.least_hourlen = least_hourlen
self.X_train = None
self.X_test = None
self.y_train = None
self.y_test = None
self.spots_aday = 16
self.days_2_include = 3
self.person_day_mode = False
self.random_seed = random_seed
    def _case_pass(self, rec):
        used_time = int(re.findall(r"\d+", rec['scale_usd_tm'])[0])
        return len(rec['pred_iars']) > self.least_hourlen and used_time > 250
def _putin_timeslots(self, rec, begdate, enddate):
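        """Scatter one person's records into fixed hourly slots between begdate and
        enddate: gap_days * spots_aday slots in total, each filled slot holding
        8 features (3 pred_ival + 3 pred_pval + pred_iars + pred_pars)."""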
gap_days = (enddate - begdate).days
vec = []
for i in range(0, gap_days):
for x in range(0, self.spots_aday):#16 hours in a day
vec.append([])
for i in range(0, len(rec['begs'])):
cur_time = datetime.datetime.fromtimestamp(rec['begs'][i])
# print("beg:%s, this:%s, end:%s" % (str(begdate), str(cur_time), str(enddate)))
if begdate < cur_time < enddate:
idx = ((cur_time - begdate).days * self.spots_aday) + (cur_time.hour - 8)
                if rec['pred_ival'][i] is not None:
                    vec[idx].extend(rec['pred_ival'][i])#3
                    vec[idx].extend(rec['pred_pval'][i])#3
                else:
                    vec[idx].extend([None]*3)
                    vec[idx].extend([None]*3)
vec[idx].append(rec['pred_iars'][i])#1
vec[idx].append(rec['pred_pars'][i])#1
return vec
def _get_lab(self, rec):
score = rec['phq']
if score > 15:
# if score >= 10:
return 1
elif score < 5:
return 0
else:
return -4
# score = rec['dass'][0]
# if score >= 21:
# return 1
# elif score <= 9:
# return 0
# else:
# return -4
def get_type_a_ftlab(self, rec, day_range):
lab = self._get_lab(rec)
if lab != -4 and self._case_pass(rec):
scale_day = datetime.datetime.strptime(rec['rating_daystr'], "%Y%m%d")
scale_day = datetime.datetime(scale_day.year, scale_day.month, scale_day.day, 0,0,0)
beg_day = scale_day + datetime.timedelta(days=(-1 * day_range))
beg_day = datetime.datetime(beg_day.year, beg_day.month, beg_day.day, 0,0,0)
person_vec = self._putin_timeslots(rec, beg_day, scale_day)
return person_vec, lab, rec['uid'], beg_day, scale_day
else:
return None, None, None, None, None
def under_sample(self, X, y):
# npX = np.array(X)
# npy = np.array(y)
# origin_shape = npX.shape
# npX = npX.reshape(origin_shape[0], origin_shape[1]*origin_shape[2]*origin_shape[3])
# feats, labs = RandomUnderSampler().fit_resample(npX, npy)
# feats = feats.reshape(-1, origin_shape[1]*origin_shape[2], origin_shape[3])
sta_dict = {}
indices_dict = {}
for i in range(0, len(y)):
if y[i] not in sta_dict:
sta_dict[y[i]] = 1
indices_dict[y[i]] = [i]
else:
sta_dict[y[i]] += 1
indices_dict[y[i]].append(i)
        n_sample = min(sta_dict.values())
        balanced_indices_collection = []
        random.seed(self.random_seed)
        for one in indices_dict:
            balanced_indices_collection.extend(random.sample(indices_dict[one], n_sample))
        feats = []
        labs = []
        for idx in balanced_indices_collection:
            # idx is already a sampled index into X/y
            feats.append(X[idx])
            labs.append(y[idx])
        return feats, labs
def _check_day_buf(self, daybuf):
n_invalid = 0
for hour in daybuf:
if (len(hour) == 0) or (hour.count(None) > 0):
n_invalid += 1
return n_invalid
def _feats_back_selection(\
self, allfeats, alllabs, alluids, miss_tole=6):
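        """Walk each person's hour slots backwards in time and keep the most recent
        days_2_include days that have fewer than miss_tole invalid hours; persons
        without enough valid days are dropped entirely."""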
ret_feats = []
ret_labs = []
ret_uids = []
#person, hour, feat
for i in range(0, len(allfeats)):#person
person_buf = []
nday = 0
nrec = 0
daybuf = []
for j in range(len(allfeats[i])-1, -1, -1):#days backward
daybuf.append(allfeats[i][j])
nrec += 1
if nrec % self.spots_aday == 0:#went through a day
n_invalid = self._check_day_buf(daybuf)
if n_invalid < miss_tole:
person_buf.append(daybuf)
nday += 1
if nday >= self.days_2_include:
break
daybuf = []
if nday >= self.days_2_include:
ret_feats.append(person_buf)
ret_labs.append(alllabs[i])
ret_uids.append(alluids[i])
return ret_feats, ret_labs, ret_uids
def _fill_empty_with_mask(self, day_data, ftsetlen=8, mask=-1):
for i in range(0, len(day_data)):#hour
if len(day_data[i]) == 0:
day_data[i] = [mask] * ftsetlen
            for j in range(0, len(day_data[i])):#ft
                if day_data[i][j] is None:
                    day_data[i][j] = mask
def load(self, split=True):
print('begins to load!')
day_range = 30
src_coll = dbop.GetMongoCollection('FLP', 'nj_2021_hour')
recs = src_coll.find({})
allfeats = []
alllabs = []
alluids = []
all_dayrange = []
for rec in recs:
feat, lab, uid, begday, endday = self.get_type_a_ftlab(rec, day_range)
            if feat is not None:
allfeats.append(feat)
alllabs.append(lab)
alluids.append(uid)
all_dayrange.append([begday, endday])
print_label_sta(alllabs)
print("data loaded!")
recs.close()
fbs_feats, fbs_labs, fbs_uids =\
self._feats_back_selection(allfeats, alllabs, alluids)
print('After backward selection')
print_label_sta(fbs_labs)
print("feat shape: " + str(np.array(fbs_feats, dtype=object).shape))
#fill empty with mask
for p1 in fbs_feats:
for p1day1 in p1:
self._fill_empty_with_mask(p1day1)
#linear smooth
for i in range(0, len(fbs_feats)):#i=person
for j in range(0, len(fbs_feats[i])):#j=day
buf = copy.deepcopy(fbs_feats[i][j])
buf_t = np.array(buf).T.tolist()
for k in range(0, len(buf_t)):
buf_t[k] = linear_soomth(buf_t[k])
fbs_feats[i][j] = np.array(buf_t).T.tolist()
# check_abnormal_in_mat(fbs_feats)
us_feats, us_labs = self.under_sample(fbs_feats, fbs_labs)
print('After under sample')
print_label_sta(us_labs)
if split:
self.X_train, self.X_test, self.y_train, self.y_test =\
train_test_split(us_feats, us_labs, test_size=0.3, shuffle=False)
print("Dataset: train: %s, test: %s" % (str(np.array(self.X_train).shape), str(np.array(self.X_test).shape)))
if self.person_day_mode:
new_y_train = []
for oneytr in self.y_train:
new_y_train.extend([oneytr] * self.days_2_include)
self.y_train = new_y_train
new_y_test = []
for oneyte in self.y_test:
new_y_test.extend([oneyte] * self.days_2_include)
self.y_test = new_y_test
self.X_train = np.array(self.X_train)
self.X_train = self.X_train.reshape(-1, self.spots_aday, self.get_input_size())
self.X_test = np.array(self.X_test)
self.X_test = self.X_test.reshape(-1, self.spots_aday, self.get_input_size())
print("p_d_mode: train: %d-%d, test: %d-%d" %\
(len(self.X_train), len(self.y_train), len(self.X_test), len(self.y_test)))
def get_input_size(self):
shape = self.X_train.shape
return shape[len(shape)-1]
def get_time_step(self):
shape = self.X_train.shape
return shape[len(shape)-2]
def savefile(self, path):
save_dict = {'X_train':self.X_train, 'X_test':self.X_test,\
'y_train':self.y_train, 'y_test':self.y_test}
save_joblib(save_dict, path)
def loadfile(self, path):
load_dict = load_joblib(path)
x_test = load_dict['X_test']
y_test = load_dict['y_test']
        X_train = load_dict['X_train']
        y_train = load_dict['y_train']
        # repeat the training split 20x (simple oversampling of the saved split)
        self.X_train = np.vstack([X_train] * 20)
        self.y_train = np.hstack([y_train] * 20)
# self.X_train=X_train
# self.y_train=y_train
self.X_test = x_test
        self.y_test = y_test
print ("train shape:" + str(np.array(self.X_train).shape))
def set_mode(self, mode):
if mode == "train":
self.mode = "train"
elif mode == "test":
self.mode = "test"
def __getitem__(self, index):
ftensor = None
ltensor = None
if self.mode == "train":
npfeat = np.array(self.X_train[index], dtype=np.float32)
ftensor = torch.from_numpy(npfeat).to(torch.float32)
nplab = np.array(self.y_train[index])
ltensor = torch.from_numpy(nplab).to(torch.int64)
elif self.mode == "test":
npfeat = np.array(self.X_test[index])
ftensor = torch.from_numpy(npfeat).to(torch.float32)
nplab = np.array(self.y_test[index])
ltensor = torch.from_numpy(nplab).to(torch.int64)
return ftensor, ltensor
def __len__(self):
if self.mode == "train":
return len(self.X_train)
elif self.mode == "test":
return len(self.X_test)
if __name__ == "__main__":
hd = HourDataset(100)
hd.load()
hd.savefile('dat_3day_pcase_miss3_2')
import torch
# import torch_mlu
from torch import nn
# from mlu_device import global_computing_device as g_com
# torch.manual_seed(1)
# Device configuration
class LSTMSimple(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, n_classes, device):
super(LSTMSimple, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, dropout=0.5, batch_first=True)
self.linear = nn.Linear(hidden_size, n_classes)
self.device = device
def forward(self, x):
# x shape (batch, time_step, input_size)
# out shape (batch, time_step, output_size)
# h_n shape (n_layers, batch, hidden_size)
# h_c shape (n_layers, batch, hidden_size)
        # initialize the hidden state and the memory cell
hidden0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(self.device)
cell0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(self.device)
# forward propagate lstm
out, (final_hidden, final_cell) = self.lstm(x, (hidden0, cell0))
        # take the output of the last time step
# out = self.fc(out[:, -1, :])
# print(final_hidden[-1])
return self.linear(out[:, -1, :])
# return out
if __name__ == "__main__":
# FitModel('600182')
# data_form_watch()
pass
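    # A minimal shape check (added for illustration; sizes are arbitrary):
    model = LSTMSimple(input_size=8, hidden_size=32, num_layers=2,
                       n_classes=2, device=torch.device('cpu'))
    x = torch.randn(4, 16, 8)   # (batch, time_step, input_size)
    print(model(x).shape)       # expected: torch.Size([4, 2])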
#!/usr/bin/env python3
# Benchmark tool used to collect metrics.
import time
import os
from collections import OrderedDict
import numpy as np
import json
cur_dir = os.path.dirname(os.path.abspath(__file__))
adaptive_strategy_env = os.getenv('MLU_ADAPTIVE_STRATEGY_COUNT')
adaptive_cnt = int(adaptive_strategy_env) if (adaptive_strategy_env
is not None) else 0
def load_metrics_from_config():
required_metrics = {}
with open(cur_dir + "/configs.json", 'r') as f:
configs = json.load(f)
for key, value in configs.items():
if os.getenv(key) is None:
continue
required_metrics[key] = value
return required_metrics
required_metrics = load_metrics_from_config()
def get_platform():
import glob
import fileinput
files_driver = glob.glob("/proc/driver/*/*/*/information")
if not len(files_driver):
return "Unkonw"
finput = fileinput.input(files_driver[0])
for line in finput:
# MLU device information
if "Device name" in line:
return line.split(":")[-1].strip()
# GPU device information
elif "Model" in line:
return line.split(":")[-1].strip()
finput.close()
return "Unkonw"
cur_platform = get_platform()
def get_dataset():
dataset = os.getenv("DATASET_NAME")
if dataset is None:
return "unknow"
return dataset
cur_dataset = get_dataset()
class AggregatorMeter(object):
    def __init__(self, *args):
        self.meters = list(args)
def update(self, val):
for meter in self.meters:
meter.update(val)
def result(self):
results = [meter.result() for meter in self.meters]
return results
class MaxMeter(object):
def __init__(self, name):
self.name = name
self.max = None
def reset(self):
self.max = None
def update(self, val):
if self.max is None:
self.max = val
else:
self.max = max(self.max, val)
def result(self):
return (self.name, self.max)
    def __str__(self):
        return str(self.result())
class AverageMeter(object):
def __init__(self, name):
self.name = name
self.reset()
def reset(self):
self.val = 0
self.count = 0
def update(self, val):
self.count += 1
self.val += val
def result(self):
if self.count == 0:
return None
return (self.name, round(self.val / self.count, 3))
    def __str__(self):
        return str(self.result())
class VarianceMeter(object):
def __init__(self, name):
self.name = name
self.reset()
def reset(self):
self.vals = []
self.count = 0
def update(self, val):
self.count += 1
self.vals += [val]
def result(self):
if self.count == 0:
return None
return (self.name, round(np.var(self.vals), 6))
    def __str__(self):
        return str(self.result())
class ElapsedTimer(object):
def __init__(self, count_down=0):
self.meter = AggregatorMeter(AverageMeter("batch_time_avg"),
VarianceMeter("batch_time_var"))
self.count_down = count_down
def place(self):
self.last_time_stamp = time.time()
def clock(self):
now = time.time()
elapsed_time = now - self.last_time_stamp
return elapsed_time
def record(self):
elapsed_time = self.clock()
if self.count_down > 0:
self.count_down -= 1
return
self.meter.update(elapsed_time)
def data(self):
return self.meter.result()
class HardwareTimer(object):
def __init__(self, count_down=0):
import torch_mlu
from torch_mlu.core.device.notifier import Notifier
self.meter = AggregatorMeter(AverageMeter("hardware_time_avg"),
VarianceMeter("hardware_time_var"))
self.start_notifier = Notifier()
self.end_notifier = Notifier()
self.count_down = count_down
def place(self):
self.start_notifier.place()
def clock(self):
self.end_notifier.place()
self.end_notifier.synchronize()
# unit of hardware_time should be second.
hardware_time = self.start_notifier.hardware_time(
self.end_notifier) / 1000.0 / 1000.0
return hardware_time
def record(self):
hardware_time = self.clock()
if self.count_down > 0:
self.count_down -= 1
return
self.meter.update(hardware_time)
def data(self):
return self.meter.result()
class MemoryProfiler(object):
    def __init__(self):
        raise NotImplementedError("MemoryProfiler is not implemented yet")
    def record(self):
        raise NotImplementedError("MemoryProfiler is not implemented yet")
class Dumper(object):
def __init__(self, name, save_path, target):
self.name = name
self.save_path = save_path
self.target = target
def exception_handle(self, contents, exception):
"""exception method will be triggered when collected metrics
can not meet the requirements in the config.json"""
if exception == "throughput":
keys = contents.keys()
if "batch_time_avg" not in keys or "cards" not in keys or "batch_size" not in keys:
return "unknow"
batch_time = contents["batch_time_avg"]
cards = contents["cards"]
batch_size = contents["batch_size"]
return round(batch_size / batch_time * cards, 2)
else:
return "unknow"
def dump(self, contents):
from datetime import datetime
date_now = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
output = {"date": date_now}
for target in self.target:
if target not in contents.keys():
output[target] = self.exception_handle(contents, target)
else:
output[target] = contents[target]
output["device"] = cur_platform
output["dataset"] = cur_dataset
with open(self.save_path, 'a') as f:
json.dump(output, f, indent=4)
f.write("\n")
class MetricCollector(object):
def __init__(self,
enable=True,
enable_only_benchmark=False,
enable_only_avglog=False,
record_elapsed_time=False,
record_hardware_time=False,
profile_memory=False):
self._enabled = self.check_enable(enable, enable_only_benchmark,
enable_only_avglog)
if not self._enabled:
return
self.record_elapsed_time = record_elapsed_time
self.record_hardware_time = record_hardware_time
self.profile_memory = profile_memory
self._recorders = []
self._init_recorders()
self._dumpers = []
self._init_dumpers()
self._metrics = OrderedDict()
self._insert_metrics = {}
def check_enable(self, enable, enable_only_benchmark, enable_only_avglog):
"""For that envs like AVG_LOG not appear in user training script, the disablement
of MetricCollecotr should meet the conditions as follows."""
if not enable:
return False
        # At least one env (AVG_LOG or BENCHMARK_LOG) from configs.json must exist.
if len(required_metrics) == 0:
return False
        # If enable_only_benchmark is True when constructing MetricCollector, the
        # BENCHMARK_LOG env must exist; likewise enable_only_avglog requires AVG_LOG
        # (hard-coding BENCHMARK_LOG and AVG_LOG here is not elegant, but it works).
if enable_only_benchmark and os.getenv("BENCHMARK_LOG") is None:
return False
if enable_only_avglog and os.getenv("AVG_LOG") is None:
return False
return True
def _init_recorders(self):
if self.record_elapsed_time:
self._recorders.append(ElapsedTimer(count_down=adaptive_cnt))
if self.record_hardware_time:
self._recorders.append(HardwareTimer(count_down=adaptive_cnt))
if self.profile_memory:
self._recorders.append(MemoryProfiler())
def _init_dumpers(self):
for key, value in required_metrics.items():
file_path = os.getenv(key)
self._dumpers.append(Dumper(key, file_path, value))
def place(self):
"""call all recorder's place method."""
if not self._enabled:
return
for recorder in self._recorders:
recorder.place()
def record(self):
"""call all recorder's record method."""
if not self._enabled:
return
for recorder in self._recorders:
recorder.record()
def __str__(self):
return str(self.get_metrics())
def insert_metrics(self, **kwargs):
if self._enabled:
self._insert_metrics.update(kwargs)
def update_recorder_metrics(self):
for recorder in self._recorders:
result = recorder.data()
            if not result or (isinstance(result, list) and None in result):
                print(
                    "MetricCollector has not recorded any data; make sure place() "
                    "and record() were called, and that the iteration count is not "
                    "lower than adaptive_cnt: {}.".format(adaptive_cnt))
return
if isinstance(result, tuple):
name, val = result
self._metrics[name] = val
elif isinstance(result, list):
for name, val in result:
self._metrics[name] = val
else:
raise "Unknow result type of recorder."
def update_metrics(self):
if not self._enabled:
return
self.update_recorder_metrics()
for key, value in self._insert_metrics.items():
self._metrics[key] = value
def get_metrics(self):
if not self._enabled:
return OrderedDict()
self.update_metrics()
return self._metrics
def dump(self):
if not self._enabled:
return
self.update_metrics()
for dumper in self._dumpers:
dumper.dump(self._metrics)
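# A usage sketch (added for illustration; assumes the BENCHMARK_LOG env points
# at an output file so the collector enables itself, and that `loader` and
# `train_step` are the caller's own objects):
# collector = MetricCollector(enable_only_benchmark=True, record_elapsed_time=True)
# collector.place()
# for batch in loader:
#     train_step(batch)
#     collector.record()       # measures the elapsed time of this batch
#     collector.place()
# collector.insert_metrics(net="lstm", batch_size=156, cards=8,
#                          precision="fp32", DPF_mode="ddp")
# collector.dump()             # appends a JSON record to $BENCHMARK_LOG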
# Cambricon PyTorch Model Migration Report
## Cambricon PyTorch Changes
| No. | File | Description |
| --- | --- | --- |
| 1 | driver.py:3 | add "import torch_mlu" |
| 2 | driver.py:53 | change "self.device = torch.device('cuda', self.local_rank)" to "self.device = torch.device('mlu', self.local_rank) " |
| 3 | driver.py:162 | change "if torch.cuda.device_count() > 1:" to "if torch.mlu.device_count() > 1: " |
| 4 | driver.py:165 | change "torch.distributed.init_process_group(backend="nccl")" to "torch.distributed.init_process_group(backend="cncl") " |
| 5 | driver.py:166 | change "torch.cuda.set_device(local_rank)" to "torch.mlu.set_device(local_rank) " |
| 6 | lstm.py:1 | add "import torch_mlu" |
| 7 | hour_dataset.py:7 | add "import torch_mlu" |
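
A hedged sketch of the portable pattern implied by the table above (`torch_mlu` and the `cncl` backend are Cambricon-specific; the CUDA fallback is the stock PyTorch path):

```python
import torch

try:
    import torch_mlu  # noqa: F401 -- present only in Cambricon builds
    device_type, ddp_backend = "mlu", "cncl"
except ImportError:
    device_type, ddp_backend = "cuda", "nccl"

# e.g. in driver.py:
#   torch.distributed.init_process_group(backend=ddp_backend)
#   self.device = torch.device(device_type, self.local_rank)
```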
#!/bin/bash
# export BENCHMARK_LOG="./benchmark_log"
export MLU_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
python -m torch.distributed.launch \
--nnodes 2 \
--node_rank 0 \
--master_addr="10.0.1.8" \
--master_port="5432" \
--nproc_per_node 8 \
driver.py
#!/bin/bash
# export BENCHMARK_LOG="./benchmark_log"
export MLU_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
python -m torch.distributed.launch \
--nnodes 2 \
--node_rank 1 \
--master_addr="10.0.1.8" \
--master_port="5432" \
--nproc_per_node 8 \
driver.py
#!/bin/bash
# export BENCHMARK_LOG="./benchmark_log"
export MLU_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
python -m torch.distributed.launch --nproc_per_node 1 driver.py
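# (added note) On PyTorch 1.9+, the equivalent elastic launcher can also be used:
#   python -m torch.distributed.run --nproc_per_node 1 driver.py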