Unverified commit bee8f84e authored by SparkSnail, committed by GitHub

Merge pull request #174 from microsoft/master

merge master from Microsoft
parents c5acd8c2 252d35e0
......@@ -70,6 +70,18 @@ interface TrialJobInfo {
stderrPath?: string;
}
interface HyperParameterFormat {
parameter_source: string;
parameters: Object;
parameter_id: number;
}
interface ExportedDataFormat {
parameter: Object;
value: Object;
id: string;
}
abstract class DataStore {
public abstract init(): Promise<void>;
public abstract close(): Promise<void>;
......@@ -82,6 +94,8 @@ abstract class DataStore {
public abstract getTrialJob(trialJobId: string): Promise<TrialJobInfo>;
public abstract storeMetricData(trialJobId: string, data: string): Promise<void>;
public abstract getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]>;
public abstract exportTrialHpConfigs(): Promise<string>;
public abstract getImportedData(): Promise<string[]>;
}
abstract class Database {
......@@ -99,5 +113,5 @@ abstract class Database {
export {
DataStore, Database, TrialJobEvent, MetricType, MetricData, TrialJobInfo,
ExperimentProfileRecord, TrialJobEventRecord, MetricDataRecord
ExperimentProfileRecord, TrialJobEventRecord, MetricDataRecord, HyperParameterFormat, ExportedDataFormat
};
......@@ -100,6 +100,7 @@ abstract class Manager {
public abstract getExperimentProfile(): Promise<ExperimentProfile>;
public abstract updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void>;
public abstract importData(data: string): Promise<void>;
public abstract exportData(): Promise<string>;
public abstract addCustomizedTrialJob(hyperParams: string): Promise<void>;
public abstract cancelTrialJobByUser(trialJobId: string): Promise<void>;
......
......@@ -24,7 +24,8 @@ import { Deferred } from 'ts-deferred';
import * as component from '../common/component';
import { Database, DataStore, MetricData, MetricDataRecord, MetricType,
TrialJobEvent, TrialJobEventRecord, TrialJobInfo } from '../common/datastore';
TrialJobEvent, TrialJobEventRecord, TrialJobInfo, HyperParameterFormat,
ExportedDataFormat } from '../common/datastore';
import { NNIError } from '../common/errors';
import { getExperimentId, isNewExperiment } from '../common/experimentStartupInfo';
import { getLogger, Logger } from '../common/log';
......@@ -171,6 +172,61 @@ class NNIDataStore implements DataStore {
return this.db.queryMetricData(trialJobId, metricType);
}
public async exportTrialHpConfigs(): Promise<string> {
const jobs: TrialJobInfo[] = await this.listTrialJobs();
let exportedData: ExportedDataFormat[] = [];
for (const job of jobs) {
if (job.hyperParameters && job.finalMetricData) {
if (job.hyperParameters.length === 1 && job.finalMetricData.length === 1) {
// optimization for non-multi-phase case
const parameters: HyperParameterFormat = <HyperParameterFormat>JSON.parse(job.hyperParameters[0]);
const oneEntry: ExportedDataFormat = {
parameter: parameters.parameters,
value: JSON.parse(job.finalMetricData[0].data),
id: job.id
};
exportedData.push(oneEntry);
} else {
let paraMap: Map<number, Object> = new Map();
let metricMap: Map<number, Object> = new Map();
for (const eachPara of job.hyperParameters) {
const parameters: HyperParameterFormat = <HyperParameterFormat>JSON.parse(eachPara);
paraMap.set(parameters.parameter_id, parameters.parameters);
}
for (const eachMetric of job.finalMetricData) {
const value: Object = JSON.parse(eachMetric.data);
metricMap.set(Number(eachMetric.parameterId), value);
}
paraMap.forEach((value: Object, key: number) => {
const metricValue: Object | undefined = metricMap.get(key);
if (metricValue) {
const oneEntry: ExportedDataFormat = {
parameter: value,
value: metricValue,
id: job.id
};
exportedData.push(oneEntry);
}
});
}
}
}
return JSON.stringify(exportedData);
}
public async getImportedData(): Promise<string[]> {
let importedData: string[] = [];
const importDataEvents: TrialJobEventRecord[] = await this.db.queryTrialJobEvent(undefined, 'IMPORT_DATA');
for (const event of importDataEvents) {
if (event.data) {
importedData.push(event.data);
}
}
return importedData;
}
private async queryTrialJobs(status?: TrialJobStatus, trialJobId?: string): Promise<TrialJobInfo[]> {
const result: TrialJobInfo[] = [];
const trialJobEvents: TrialJobEventRecord[] = await this.db.queryTrialJobEvent(trialJobId);
......
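For reference, the string returned by `exportTrialHpConfigs` above is a JSON array with one `ExportedDataFormat` entry per finished trial. A hypothetical two-trial export might look like the following sketch (the hyperparameter names and metric values are made up for illustration):

```typescript
// Illustrative only: the shape follows the ExportedDataFormat interface,
// but the parameter names and metric values below are invented.
const exportedExample: { parameter: object; value: object; id: string }[] = [
    { parameter: { learning_rate: 0.01, batch_size: 32 }, value: { default: 0.92 }, id: 'Abc1D' },
    { parameter: { learning_rate: 0.1, batch_size: 64 }, value: { default: 0.88 }, id: 'Xyz2E' }
];
console.log(JSON.stringify(exportedExample)); // comparable in shape to the exportTrialHpConfigs() output
```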
......@@ -58,6 +58,8 @@ class NNIManager implements Manager {
private status: NNIManagerStatus;
private waitingTrials: string[];
private trialJobs: Map<string, TrialJobDetail>;
private trialDataForTuner: string;
private trialJobMetricListener: (metric: TrialJobMetric) => void;
constructor() {
......@@ -69,6 +71,7 @@ class NNIManager implements Manager {
this.dispatcherPid = 0;
this.waitingTrials = [];
this.trialJobs = new Map<string, TrialJobDetail>();
this.trialDataForTuner = '';
this.log = getLogger();
this.dataStore = component.get(DataStore);
......@@ -116,6 +119,10 @@ class NNIManager implements Manager {
return this.dataStore.storeTrialJobEvent('IMPORT_DATA', '', data);
}
public async exportData(): Promise<string> {
return this.dataStore.exportTrialHpConfigs();
}
public addCustomizedTrialJob(hyperParams: string): Promise<void> {
if (this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
return Promise.reject(
......@@ -212,6 +219,16 @@ class NNIManager implements Manager {
.filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
.map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.id)));
// Collect generated trials and imported trials
const finishedTrialData: string = await this.exportData();
const importedData: string[] = await this.dataStore.getImportedData();
let trialData: Object[] = JSON.parse(finishedTrialData);
for (const oneImportedData of importedData) {
// do not deduplicate
trialData = trialData.concat(<Object[]>JSON.parse(oneImportedData));
}
this.trialDataForTuner = JSON.stringify(trialData);
if (this.experimentProfile.execDuration < this.experimentProfile.params.maxExecDuration &&
this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum &&
this.experimentProfile.endTime) {
......@@ -647,6 +664,12 @@ class NNIManager implements Manager {
switch (commandType) {
case INITIALIZED:
// Tuner is initialized, search space is set, request tuner to generate hyper parameters
if (this.trialDataForTuner.length > 0) {
if (this.dispatcher === undefined) {
throw new Error('Dispatcher error: tuner has not been setup');
}
this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
}
this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
break;
case NEW_TRIAL_JOB:
......
......@@ -210,6 +210,16 @@ class MockedDataStore implements DataStore {
return result;
}
async exportTrialHpConfigs(): Promise<string> {
const ret: string = '';
return Promise.resolve(ret);
}
async getImportedData(): Promise<string[]> {
const ret: string[] = [];
return Promise.resolve(ret);
}
public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
throw new Error("Method not implemented.");
}
......
......@@ -72,6 +72,7 @@ class NNIRestHandler {
this.addTrialJob(router);
this.cancelTrialJob(router);
this.getMetricData(router);
this.exportData(router);
// Express-joi-validator configuration
router.use((err: any, req: Request, res: Response, next: any) => {
......@@ -261,6 +262,16 @@ class NNIRestHandler {
});
}
private exportData(router: Router): void {
router.get('/export-data', (req: Request, res: Response) => {
this.nniManager.exportData().then((exportedData: string) => {
res.send(exportedData);
}).catch((err: Error) => {
this.handle_error(err, res);
});
});
}
private setErrorPathForFailedJob(jobInfo: TrialJobInfo): TrialJobInfo {
if (jobInfo === undefined || jobInfo.status !== 'FAILED' || jobInfo.logPath === undefined) {
return jobInfo;
......
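A client could consume the new route roughly as follows. This is a sketch under assumptions: the router is mounted under NNI's REST prefix (shown here as `/api/v1/nni`, which is not visible in this diff) and the REST server listens on port 8080.

```typescript
// Hypothetical client call for the export-data route added above.
// The URL prefix and port are assumptions; adjust them to your deployment.
async function fetchExportedTrials(): Promise<object[]> {
    const response = await fetch('http://localhost:8080/api/v1/nni/export-data');
    if (!response.ok) {
        throw new Error(`export-data request failed with status ${response.status}`);
    }
    // The handler sends the JSON string produced by exportTrialHpConfigs().
    return await response.json() as object[];
}
```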
......@@ -31,10 +31,14 @@ export namespace ValidationSchemas {
passwd: joi.string(),
sshKeyPath: joi.string(),
passphrase: joi.string(),
gpuIndices: joi.string()
gpuIndices: joi.string(),
maxTrialNumPerGpu: joi.number(),
useActiveGpu: joi.boolean()
})),
local_config: joi.object({
gpuIndices: joi.string()
gpuIndices: joi.string(),
maxTrialNumPerGpu: joi.number(),
useActiveGpu: joi.boolean()
}),
trial_config: joi.object({
image: joi.string().min(1),
......
......@@ -49,6 +49,10 @@ export class MockedNNIManager extends Manager {
public importData(data: string): Promise<void> {
return Promise.resolve();
}
public async exportData(): Promise<string> {
const ret: string = '';
return Promise.resolve(ret);
}
public getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
const deferred: Deferred<TrialJobStatistics[]> = new Deferred<TrialJobStatistics[]>();
deferred.resolve([{
......
......@@ -71,14 +71,15 @@ class GPUScheduler {
execScript(gpuMetricsCollectorScriptPath)
}
public getAvailableGPUIndices(): number[] {
public getAvailableGPUIndices(useActiveGpu: boolean, occupiedGpuIndexNumMap: Map<number, number>): number[] {
if (this.gpuSummary !== undefined) {
if(process.platform === 'win32') {
if(process.platform === 'win32' || useActiveGpu) {
return this.gpuSummary.gpuInfos.map((info: GPUInfo) => info.index);
}
else{
return this.gpuSummary.gpuInfos.filter((info: GPUInfo) => info.activeProcessNum === 0)
.map((info: GPUInfo) => info.index);
return this.gpuSummary.gpuInfos.filter((info: GPUInfo) =>
occupiedGpuIndexNumMap.get(info.index) === undefined && info.activeProcessNum === 0 ||
occupiedGpuIndexNumMap.get(info.index) !== undefined).map((info: GPUInfo) => info.index);
}
}
......
......@@ -97,11 +97,19 @@ class LocalTrialJobDetail implements TrialJobDetail {
* Local training service config
*/
class LocalConfig {
public maxTrialNumPerGpu?: number;
public gpuIndices?: string;
constructor(gpuIndices?: string) {
public useActiveGpu?: boolean;
constructor(gpuIndices?: string, maxTrialNumPerGpu?: number, useActiveGpu?: boolean) {
if (gpuIndices !== undefined) {
this.gpuIndices = gpuIndices;
}
if (maxTrialNumPerGpu !== undefined) {
this.maxTrialNumPerGpu = maxTrialNumPerGpu;
}
if (useActiveGpu !== undefined) {
this.useActiveGpu = useActiveGpu;
}
}
}
......@@ -117,13 +125,15 @@ class LocalTrainingService implements TrainingService {
private rootDir!: string;
private trialSequenceId: number;
private gpuScheduler!: GPUScheduler;
private occupiedGpuIndices: Set<number>;
private occupiedGpuIndexNumMap: Map<number, number>;
private designatedGpuIndices!: Set<number>;
private log: Logger;
private localTrailConfig?: TrialConfig;
private localConfig?: LocalConfig;
private isMultiPhase: boolean = false;
private isMultiPhase: boolean;
private jobStreamMap: Map<string, ts.Stream>;
private maxTrialNumPerGpu: number;
private useActiveGpu: boolean;
constructor() {
this.eventEmitter = new EventEmitter();
......@@ -135,7 +145,10 @@ class LocalTrainingService implements TrainingService {
this.trialSequenceId = -1;
this.jobStreamMap = new Map<string, ts.Stream>();
this.log.info('Construct local machine training service.');
this.occupiedGpuIndices = new Set<number>();
this.occupiedGpuIndexNumMap = new Map<number, number>();
this.maxTrialNumPerGpu = 1;
this.useActiveGpu = false;
this.isMultiPhase = false;
}
public async run(): Promise<void> {
......@@ -304,6 +317,13 @@ class LocalTrainingService implements TrainingService {
throw new Error('gpuIndices can not be empty if specified.');
}
}
if (this.localConfig.maxTrialNumPerGpu !== undefined) {
this.maxTrialNumPerGpu = this.localConfig.maxTrialNumPerGpu;
}
if (this.localConfig.useActiveGpu !== undefined) {
this.useActiveGpu = this.localConfig.useActiveGpu;
}
break;
case TrialConfigMetadataKey.MULTI_PHASE:
this.isMultiPhase = (value === 'true' || value === 'True');
......@@ -356,7 +376,14 @@ class LocalTrainingService implements TrainingService {
if (trialJob.gpuIndices !== undefined && trialJob.gpuIndices.length > 0 && this.gpuScheduler !== undefined) {
if (oldStatus === 'RUNNING' && trialJob.status !== 'RUNNING') {
for (const index of trialJob.gpuIndices) {
this.occupiedGpuIndices.delete(index);
let num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if(num === undefined) {
throw new Error(`gpu resource schedule error`);
} else if(num === 1) {
this.occupiedGpuIndexNumMap.delete(index);
} else {
this.occupiedGpuIndexNumMap.set(index, num - 1)
}
}
}
}
......@@ -396,8 +423,14 @@ class LocalTrainingService implements TrainingService {
return [true, resource];
}
let selectedGPUIndices: number[] = this.gpuScheduler.getAvailableGPUIndices()
.filter((index: number) => !this.occupiedGpuIndices.has(index));
let selectedGPUIndices: number[] = [];
let availableGpuIndices: number[] = this.gpuScheduler.getAvailableGPUIndices(this.useActiveGpu, this.occupiedGpuIndexNumMap);
for(let index of availableGpuIndices) {
let num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if(num === undefined || num < this.maxTrialNumPerGpu) {
selectedGPUIndices.push(index);
}
}
if (this.designatedGpuIndices !== undefined) {
this.checkSpecifiedGpuIndices();
......@@ -428,7 +461,12 @@ class LocalTrainingService implements TrainingService {
private occupyResource(resource: {gpuIndices: number[]}): void {
if (this.gpuScheduler !== undefined) {
for (const index of resource.gpuIndices) {
this.occupiedGpuIndices.add(index);
let num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if(num === undefined) {
this.occupiedGpuIndexNumMap.set(index, 1)
} else {
this.occupiedGpuIndexNumMap.set(index, num + 1)
}
}
}
}
......
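To make the bookkeeping change above easier to follow: the old `occupiedGpuIndices` set only recorded whether a GPU was taken, while the new `occupiedGpuIndexNumMap` counts how many trials sit on each GPU and keeps the GPU schedulable until the count reaches `maxTrialNumPerGpu`. Below is a minimal standalone sketch of that counting scheme (an illustrative class, not the actual `LocalTrainingService`):

```typescript
// Reference-counted GPU occupancy with a per-GPU trial cap.
class GpuOccupancy {
    private readonly counts: Map<number, number> = new Map();

    constructor(private readonly maxTrialNumPerGpu: number = 1) { }

    // A GPU is schedulable until maxTrialNumPerGpu trials are placed on it.
    public canSchedule(gpuIndex: number): boolean {
        const num = this.counts.get(gpuIndex);
        return num === undefined || num < this.maxTrialNumPerGpu;
    }

    public occupy(gpuIndex: number): void {
        this.counts.set(gpuIndex, (this.counts.get(gpuIndex) ?? 0) + 1);
    }

    public release(gpuIndex: number): void {
        const num = this.counts.get(gpuIndex);
        if (num === undefined) {
            throw new Error('gpu resource schedule error');
        } else if (num === 1) {
            this.counts.delete(gpuIndex);
        } else {
            this.counts.set(gpuIndex, num - 1);
        }
    }
}
```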
......@@ -23,7 +23,8 @@ import * as assert from 'assert';
import { getLogger, Logger } from '../../common/log';
import { randomSelect } from '../../common/utils';
import { GPUInfo } from '../common/gpuData';
import { parseGpuIndices, RemoteMachineMeta, RemoteMachineScheduleResult, ScheduleResultType, SSHClientManager } from './remoteMachineData';
import { RemoteMachineTrialJobDetail, parseGpuIndices, RemoteMachineMeta, RemoteMachineScheduleResult, ScheduleResultType, SSHClientManager } from './remoteMachineData';
import { TrialJobDetail } from 'common/trainingService';
/**
* A simple GPU scheduler implementation
......@@ -45,7 +46,7 @@ export class GPUScheduler {
* Schedule a machine according to the constraints (requiredGPUNum)
* @param requiredGPUNum required GPU number
*/
public scheduleMachine(requiredGPUNum: number, trialJobId : string) : RemoteMachineScheduleResult {
public scheduleMachine(requiredGPUNum: number, trialJobDetail : RemoteMachineTrialJobDetail) : RemoteMachineScheduleResult {
assert(requiredGPUNum >= 0);
const allRMs: RemoteMachineMeta[] = Array.from(this.machineSSHClientMap.keys());
assert(allRMs.length > 0);
......@@ -66,7 +67,7 @@ export class GPUScheduler {
// Currently the requiredGPUNum parameter for all trial jobs is identical.
if (requiredGPUNum > 0) {
// Trial job requires GPU
const result: RemoteMachineScheduleResult | undefined = this.scheduleGPUHost(requiredGPUNum, trialJobId);
const result: RemoteMachineScheduleResult | undefined = this.scheduleGPUHost(requiredGPUNum, trialJobDetail);
if (result !== undefined) {
return result;
}
......@@ -74,9 +75,9 @@ export class GPUScheduler {
// Trial job does not need GPU
const allocatedRm: RemoteMachineMeta = this.selectMachine(allRMs);
return this.allocateHost(requiredGPUNum, allocatedRm, [], trialJobId);
return this.allocateHost(requiredGPUNum, allocatedRm, [], trialJobDetail);
}
this.log.warning(`Scheduler: trialJob id ${trialJobId}, no machine can be scheduled, return TMP_NO_AVAILABLE_GPU `);
this.log.warning(`Scheduler: trialJob id ${trialJobDetail.id}, no machine can be scheduled, return TMP_NO_AVAILABLE_GPU `);
return {
resultType : ScheduleResultType.TMP_NO_AVAILABLE_GPU,
......@@ -87,21 +88,35 @@ export class GPUScheduler {
/**
* remove the job's GPU reservation
*/
public removeGpuReservation(trialJobId: string, rmMeta?: RemoteMachineMeta): void {
// If remote machine has no GPU, gpuReservcation is not initialized, so check if it's undefined
if (rmMeta !== undefined && rmMeta.gpuReservation !== undefined) {
rmMeta.gpuReservation.forEach((reserveTrialJobId : string, gpuIndex : number) => {
if (reserveTrialJobId === trialJobId) {
rmMeta.gpuReservation.delete(gpuIndex);
public removeGpuReservation(trialJobId: string, trialJobMap: Map<string, RemoteMachineTrialJobDetail>): void {
let trialJobDetail: RemoteMachineTrialJobDetail | undefined = trialJobMap.get(trialJobId);
if(trialJobDetail === undefined) {
throw new Error(`could not get trialJobDetail by id ${trialJobId}`);
}
if (trialJobDetail.rmMeta !== undefined &&
trialJobDetail.rmMeta.occupiedGpuIndexMap !== undefined &&
trialJobDetail.gpuIndices !== undefined &&
trialJobDetail.gpuIndices.length > 0) {
for (const gpuInfo of trialJobDetail.gpuIndices) {
let num: number | undefined = trialJobDetail.rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
if(num !== undefined) {
if(num === 1) {
trialJobDetail.rmMeta.occupiedGpuIndexMap.delete(gpuInfo.index);
} else {
trialJobDetail.rmMeta.occupiedGpuIndexMap.set(gpuInfo.index, num - 1)
}
}
}
});
}
trialJobDetail.gpuIndices = [];
trialJobMap.set(trialJobId, trialJobDetail);
}
private scheduleGPUHost(requiredGPUNum: number, trialJobId: string): RemoteMachineScheduleResult | undefined {
private scheduleGPUHost(requiredGPUNum: number, trialJobDetail: RemoteMachineTrialJobDetail): RemoteMachineScheduleResult | undefined {
const totalResourceMap: Map<RemoteMachineMeta, GPUInfo[]> = this.gpuResourceDetection();
const qualifiedRMs: RemoteMachineMeta[] = [];
totalResourceMap.forEach((gpuInfos: GPUInfo[], rmMeta: RemoteMachineMeta) => {
if (gpuInfos !== undefined && gpuInfos.length >= requiredGPUNum) {
qualifiedRMs.push(rmMeta);
}
......@@ -110,7 +125,7 @@ export class GPUScheduler {
const allocatedRm: RemoteMachineMeta = this.selectMachine(qualifiedRMs);
const gpuInfos: GPUInfo[] | undefined = totalResourceMap.get(allocatedRm);
if (gpuInfos !== undefined) { // should always true
return this.allocateHost(requiredGPUNum, allocatedRm, gpuInfos, trialJobId);
return this.allocateHost(requiredGPUNum, allocatedRm, gpuInfos, trialJobDetail);
} else {
assert(false, 'gpuInfos is undefined');
}
......@@ -130,9 +145,6 @@ export class GPUScheduler {
// Assign total GPU count as initial available GPU number
if (rmMeta.gpuSummary !== undefined) {
const availableGPUs: GPUInfo[] = [];
if (rmMeta.gpuReservation === undefined) {
rmMeta.gpuReservation = new Map<number, string>();
}
const designatedGpuIndices: Set<number> | undefined = parseGpuIndices(rmMeta.gpuIndices);
if (designatedGpuIndices !== undefined) {
for (const gpuIndex of designatedGpuIndices) {
......@@ -145,11 +157,21 @@ export class GPUScheduler {
rmMeta.gpuSummary.gpuInfos.forEach((gpuInfo: GPUInfo) => {
// if the GPU has an active process, OR is reserved by a job,
// or its index is not in the gpuIndices configuration in machineList,
// or the number of trials on the GPU has reached the max number,
// we should NOT allocate this GPU
if (gpuInfo.activeProcessNum === 0 && !rmMeta.gpuReservation.has(gpuInfo.index)
&& (designatedGpuIndices === undefined || designatedGpuIndices.has(gpuInfo.index))) {
// if users set useActiveGpu, use the GPU regardless of whether another active process exists
if (designatedGpuIndices === undefined || designatedGpuIndices.has(gpuInfo.index)) {
if(rmMeta.occupiedGpuIndexMap !== undefined) {
let num = rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
let maxTrialNumPerGpu: number = rmMeta.maxTrialNumPerGpu? rmMeta.maxTrialNumPerGpu: 1;
if((num === undefined && (!rmMeta.useActiveGpu && gpuInfo.activeProcessNum === 0 || rmMeta.useActiveGpu)) ||
(num !== undefined && num < maxTrialNumPerGpu)) {
availableGPUs.push(gpuInfo);
}
} else {
throw new Error(`occupiedGpuIndexMap initialize error!`);
}
}
});
totalResourceMap.set(rmMeta, availableGPUs);
}
......@@ -170,14 +192,22 @@ export class GPUScheduler {
}
private allocateHost(requiredGPUNum: number, rmMeta: RemoteMachineMeta,
gpuInfos: GPUInfo[], trialJobId: string): RemoteMachineScheduleResult {
gpuInfos: GPUInfo[], trialJobDetail: RemoteMachineTrialJobDetail): RemoteMachineScheduleResult {
assert(gpuInfos.length >= requiredGPUNum);
const allocatedGPUs: GPUInfo[] = this.selectGPUsForTrial(gpuInfos, requiredGPUNum);
allocatedGPUs.forEach((gpuInfo: GPUInfo) => {
rmMeta.gpuReservation.set(gpuInfo.index, trialJobId);
if(rmMeta.occupiedGpuIndexMap !== undefined) {
let num = rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
if(num === undefined) {
num = 0;
}
rmMeta.occupiedGpuIndexMap.set(gpuInfo.index, num + 1);
}else {
throw new Error(`Machine ${rmMeta.ip} occupiedGpuIndexMap initialize error!`);
}
});
trialJobDetail.gpuIndices = allocatedGPUs;
trialJobDetail.rmMeta = rmMeta;
return {
resultType: ScheduleResultType.SUCCEED,
scheduleInfo: {
......
......@@ -23,7 +23,7 @@ import * as fs from 'fs';
import { Client, ConnectConfig } from 'ssh2';
import { Deferred } from 'ts-deferred';
import { JobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService';
import { GPUSummary } from '../common/gpuData';
import { GPUSummary, GPUInfo } from '../common/gpuData';
/**
* Metadata of remote machine for configuration and status query
......@@ -36,20 +36,23 @@ export class RemoteMachineMeta {
public readonly sshKeyPath?: string;
public readonly passphrase?: string;
public gpuSummary : GPUSummary | undefined;
// GPU Reservation info, the key is GPU index, the value is the job id which reserves this GPU
public gpuReservation : Map<number, string>;
public readonly gpuIndices?: string;
public readonly maxTrialNumPerGpu?: number;
public occupiedGpuIndexMap: Map<number, number>;
public readonly useActiveGpu?: boolean = false;
constructor(ip : string, port : number, username : string, passwd : string,
sshKeyPath: string, passphrase : string, gpuIndices?: string) {
sshKeyPath: string, passphrase : string, gpuIndices?: string, maxTrialNumPerGpu?: number, useActiveGpu?: boolean) {
this.ip = ip;
this.port = port;
this.username = username;
this.passwd = passwd;
this.sshKeyPath = sshKeyPath;
this.passphrase = passphrase;
this.gpuReservation = new Map<number, string>();
this.gpuIndices = gpuIndices;
this.maxTrialNumPerGpu = maxTrialNumPerGpu;
this.occupiedGpuIndexMap = new Map<number, number>();
this.useActiveGpu = useActiveGpu;
}
}
......@@ -97,6 +100,7 @@ export class RemoteMachineTrialJobDetail implements TrialJobDetail {
public sequenceId: number;
public rmMeta?: RemoteMachineMeta;
public isEarlyStopped?: boolean;
public gpuIndices: GPUInfo[];
constructor(id: string, status: TrialJobStatus, submitTime: number,
workingDirectory: string, form: JobApplicationForm, sequenceId: number) {
......@@ -107,6 +111,7 @@ export class RemoteMachineTrialJobDetail implements TrialJobDetail {
this.form = form;
this.sequenceId = sequenceId;
this.tags = [];
this.gpuIndices = []
}
}
......
......@@ -282,7 +282,7 @@ class RemoteMachineTrainingService implements TrainingService {
private updateGpuReservation() {
for (const [key, value] of this.trialJobsMap) {
if(!['WAITING', 'RUNNING'].includes(value.status)) {
this.gpuScheduler.removeGpuReservation(value.id, value.rmMeta);
this.gpuScheduler.removeGpuReservation(key, this.trialJobsMap);
}
};
}
......@@ -521,7 +521,7 @@ class RemoteMachineTrainingService implements TrainingService {
return deferred.promise;
}
// get an ssh client from scheduler
const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobId);
const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobDetail);
if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
const errorMessage : string = `Required GPU number ${this.trialConfig.gpuNum} is too large, no machine can meet`;
this.log.error(errorMessage);
......@@ -542,6 +542,7 @@ class RemoteMachineTrainingService implements TrainingService {
trialJobDetail.url = `file://${rmScheduleInfo.rmMeta.ip}:${trialWorkingFolder}`;
trialJobDetail.startTime = Date.now();
this.trialJobsMap.set(trialJobId, trialJobDetail);
deferred.resolve(true);
} else if (rmScheduleResult.resultType === ScheduleResultType.TMP_NO_AVAILABLE_GPU) {
this.log.info(`Right now no available GPU can be allocated for trial ${trialJobId}, will try to schedule later`);
......
......@@ -22,11 +22,7 @@ batch_tuner.py including:
class BatchTuner
"""
import copy
from enum import Enum, unique
import random
import numpy as np
import logging
import nni
from nni.tuner import Tuner
......@@ -35,6 +31,7 @@ TYPE = '_type'
CHOICE = 'choice'
VALUE = '_value'
logger = logging.getLogger('batch_tuner_AutoML')
class BatchTuner(Tuner):
"""
......@@ -88,8 +85,8 @@ class BatchTuner(Tuner):
----------
parameter_id : int
"""
self.count +=1
if self.count>len(self.values)-1:
self.count += 1
if self.count > len(self.values) - 1:
raise nni.NoMoreTrialError('no more parameters now.')
return self.values[self.count]
......@@ -97,4 +94,31 @@ class BatchTuner(Tuner):
pass
def import_data(self, data):
pass
"""Import additional data for tuning
Parameters
----------
data:
a list of dictionaries, each of which has at least two keys: 'parameter' and 'value'
"""
if len(self.values) == 0:
logger.info("Search space has not been initialized, skip this data import")
return
self.values = self.values[(self.count+1):]
self.count = -1
_completed_num = 0
for trial_info in data:
logger.info("Importing data, current processing progress %s / %s", _completed_num, len(data))
# simply validate data format
assert "parameter" in trial_info
_params = trial_info["parameter"]
assert "value" in trial_info
_value = trial_info['value']
if not _value:
logger.info("Useless trial data, value is %s, skip this trial data.", _value)
continue
_completed_num += 1
if _params in self.values:
self.values.remove(_params)
logger.info("Successfully import data to batch tuner, total data: %d, imported data: %d.", len(data), _completed_num)
# Naive Evolution Tuner
## Naive Evolution
Naive Evolution comes from [Large-Scale Evolution of Image Classifiers](https://arxiv.org/pdf/1703.01041.pdf). It randomly initializes a population based on the search space. In each generation, it picks the better-performing results and applies some mutations (e.g., changing a hyperparameter, adding or removing a layer) to produce the next generation. Naive Evolution requires many trials to be effective, but it is very simple and easy to extend with new features.
\ No newline at end of file
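A minimal TypeScript sketch of the idea (illustrative only; real NNI tuners mutate structured search spaces, while this toy version mutates a plain numeric vector):

```typescript
// Toy naive evolution over a numeric hyperparameter vector (higher fitness is better).
type Individual = { params: number[]; fitness: number };

function evolve(
    evaluate: (params: number[]) => number,
    dim: number,
    populationSize: number,
    generations: number
): Individual {
    let population: Individual[] = Array.from({ length: populationSize }, () => {
        const params = Array.from({ length: dim }, () => Math.random());
        return { params, fitness: evaluate(params) };
    });
    for (let g = 0; g < generations; g += 1) {
        // Keep the better half, then mutate the survivors to refill the population.
        population.sort((a, b) => b.fitness - a.fitness);
        const survivors = population.slice(0, Math.ceil(populationSize / 2));
        const children = survivors.map((parent) => {
            const params = parent.params.map((v) => v + (Math.random() - 0.5) * 0.1);
            return { params, fitness: evaluate(params) };
        });
        population = survivors.concat(children).slice(0, populationSize);
    }
    return population.sort((a, b) => b.fitness - a.fitness)[0];
}
```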
# Grid Search
## Grid Search
Grid Search performs an exhaustive search over all hyperparameter combinations defined in the search space file. Note that the search space only supports `choice`, `quniform`, and `qloguniform`. The number `q` in `quniform` and `qloguniform` has a special meaning here (different from the description in the [search space](../../../../../docs/zh_CN/SearchSpaceSpec.md) document): it is the number of values sampled evenly between `low` and `high`.
\ No newline at end of file
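A small sketch of how `q` is read here, i.e. `q` evenly spaced values between `low` and `high` (this follows the description above, not the actual NNI grid-search code):

```typescript
// Enumerate q evenly spaced values in [low, high].
function gridQuniform(low: number, high: number, q: number): number[] {
    if (q < 2) {
        return [low];
    }
    const step = (high - low) / (q - 1);
    return Array.from({ length: q }, (_, i) => low + i * step);
}

// Example: gridQuniform(0, 1, 5) -> [0, 0.25, 0.5, 0.75, 1]
```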
# Hyperband on NNI
## 1. Introduction
[Hyperband](https://arxiv.org/pdf/1603.06560.pdf) is a popular AutoML algorithm. The basic idea of Hyperband is to create several brackets, each of which has `n` randomly generated hyperparameter configurations, and each configuration runs with `r` resources (e.g., the number of epochs or mini-batches). After the `n` configurations finish, it picks the best `n/eta` of them and increases the allocated resources to `r*eta`. In the end, the best configuration found is selected.
## 2. Implementing parallelism
First, this is an example of implementing an AutoML algorithm on top of MsgDispatcherBase rather than on Tuner and Assessor. With this approach, Hyperband integrates the functionality of both Tuner and Assessor, which is why it is called an Advisor.
Second, this implementation fully exploits Hyperband's internal parallelism. Specifically, the next bracket does not strictly wait for the current bracket to finish; a new bracket starts running as long as resources are available.
## 3. Usage
To use Hyperband, make the following changes in the experiment's YAML configuration file.
advisor:
  #choice: Hyperband
  builtinAdvisorName: Hyperband
  classArgs:
    #R: the maximum STEPS
    R: 100
    #eta: the proportion of discarded trials
    eta: 3
    #choice: maximize, minimize
    optimize_mode: maximize
Note that once an Advisor is used, you cannot add a Tuner or Assessor in the configuration file. When Hyperband is used, the hyperparameters (key-value pairs) received by the trial code contain one extra key, `STEPS`, in addition to the user-defined hyperparameters. **With `STEPS`, the trial can control how long it runs.**
For `report_intermediate_result(metric)` and `report_final_result(metric)` in the trial code, **the `metric` should be a number, or a dict that contains a key `default` whose value is a number**. This is the value to maximize or minimize, such as accuracy or loss.
`R` and `eta` are the Hyperband parameters you can adjust. `R` is the maximum number of STEPS that can be allocated to a configuration; here, a STEP can stand for epochs or mini-batches. The trial code should use `STEPS` to control how long it runs. Refer to the example `examples/trials/mnist-hyperband/` for details.
`eta` means that `n/eta` of the `n` configurations survive and run with more STEPS.
Here is an example with `R=81` and `eta=3`:
| | s=4 | s=3 | s=2 | s=1 | s=0 |
| - | ---- | ---- | ---- | ---- | ---- |
| i | n r | n r | n r | n r | n r |
| 0 | 81 1 | 27 3 | 9 9 | 6 27 | 5 81 |
| 1 | 27 3 | 9 9 | 3 27 | 2 81 | |
| 2 | 9 9 | 3 27 | 1 81 | | |
| 3 | 3 27 | 1 81 | | | |
| 4 | 1 81 | | | | |
`s` is the bracket index, `n` is the number of generated configurations, and the corresponding `r` is how many STEPS those configurations run. `i` is the round index; for example, bracket 4 has 5 rounds and bracket 3 has 4 rounds.
For how to implement the trial code, refer to the instructions in `examples/trials/mnist-hyperband/`.
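Each column of the table above is one run of successive halving. Below is a minimal TypeScript sketch of that inner loop (illustrative only, not NNI's Advisor code; `run` is a hypothetical evaluation callback):

```typescript
// Successive halving inside one Hyperband bracket: start with n configurations and
// r STEPS each, keep the best 1/eta after every round, and multiply the budget by eta.
function successiveHalving(
    configs: object[],
    initialSteps: number,
    eta: number,
    run: (config: object, steps: number) => number // returns the metric, higher is better
): object {
    let survivors = configs.slice();
    let steps = initialSteps;
    while (true) {
        const scored = survivors.map((config) => ({ config, metric: run(config, steps) }));
        scored.sort((a, b) => b.metric - a.metric);
        if (survivors.length === 1) {
            return scored[0].config;
        }
        survivors = scored.slice(0, Math.max(1, Math.floor(survivors.length / eta)))
            .map((entry) => entry.config);
        steps *= eta;
    }
}

// For the s=4 column of the table: successiveHalving(eightyOneConfigs, 1, 3, run)
// evaluates 81 configs with 1 STEP, then 27 with 3, 9 with 9, 3 with 27, and 1 with 81.
```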
## 4. Future improvements
The current Hyperband implementation could be improved by supporting better early-stopping algorithms, because the best `n/eta` configurations do not necessarily all perform well; the poor ones could be terminated even earlier.
In the current implementation, configurations are generated randomly, following the design of [the paper](https://arxiv.org/pdf/1603.06560.pdf). As a further improvement, the configuration generation could leverage more advanced algorithms.
\ No newline at end of file
# TPE, Random Search, Anneal Tuners
## TPE
Tree-structured Parzen Estimator (TPE) is a sequential model-based optimization (SMBO) approach. SMBO methods sequentially construct models to approximate the performance of hyperparameters based on historical measurements, and then choose new hyperparameters to test based on this model. TPE models P(x|y) and P(y), where x denotes hyperparameters and y the associated evaluation metric. P(x|y) is modeled by transforming the generative process of hyperparameters, replacing the distributions of the configuration prior with non-parametric densities. Details can be found in [Algorithms for Hyper-Parameter Optimization](https://papers.nips.cc/paper/4443-algorithms-for-hyper-parameter-optimization.pdf).
## Random Search
[Random Search for Hyper-Parameter Optimization](http://www.jmlr.org/papers/volume13/bergstra12a/bergstra12a.pdf) shows that random search is surprisingly simple and effective. It is suggested as a baseline when the prior distribution of the hyperparameters is unknown.
## Anneal
This simple annealing algorithm begins by sampling from the prior, but tends over time to sample from points closer and closer to the best ones observed. It is a simple variation of random search that leverages the smoothness of the response surface. The annealing rate is not adaptive.
\ No newline at end of file
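For the Random Search baseline described above, uniformly sampling one configuration from a simple search space is all there is to it. A minimal sketch (the search-space shape below is illustrative, not NNI's search space format):

```typescript
// Draw one random configuration from a toy search space where each entry is
// either a list of choices or a [low, high] range.
type SearchSpace = Record<string, { choice: unknown[] } | { uniform: [number, number] }>;

function sampleRandom(space: SearchSpace): Record<string, unknown> {
    const config: Record<string, unknown> = {};
    for (const [name, spec] of Object.entries(space)) {
        if ('choice' in spec) {
            config[name] = spec.choice[Math.floor(Math.random() * spec.choice.length)];
        } else {
            const [low, high] = spec.uniform;
            config[name] = low + Math.random() * (high - low);
        }
    }
    return config;
}
```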
......@@ -153,14 +153,14 @@ def _add_index(in_x, parameter):
Will change to format in hyperopt, like:
{'dropout_rate': 0.8, 'conv_size': {'_index': 1, '_value': 3}, 'hidden_size': {'_index': 1, '_value': 512}}
"""
if TYPE not in in_x: # if at the top level
if NodeType.TYPE not in in_x: # if at the top level
out_y = dict()
for key, value in parameter.items():
out_y[key] = _add_index(in_x[key], value)
return out_y
elif isinstance(in_x, dict):
value_type = in_x[TYPE]
value_format = in_x[VALUE]
value_type = in_x[NodeType.TYPE]
value_format = in_x[NodeType.VALUE]
if value_type == "choice":
choice_name = parameter[0] if isinstance(parameter,
list) else parameter
......@@ -173,15 +173,14 @@ def _add_index(in_x, parameter):
choice_value_format = item[1]
if choice_key == choice_name:
return {
INDEX:
pos,
VALUE: [
NodeType.INDEX: pos,
NodeType.VALUE: [
choice_name,
_add_index(choice_value_format, parameter[1])
]
}
elif choice_name == item:
return {INDEX: pos, VALUE: item}
return {NodeType.INDEX: pos, NodeType.VALUE: item}
else:
return parameter
......
# Medianstop Assessor
## Median Stop
Medianstop is a simple early-stopping rule described in this [paper](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/46180.pdf). Trial X is stopped early at step S if its best objective value up to step S is clearly worse than the median of the objective values that all completed trials reported at step S.
\ No newline at end of file
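A minimal sketch of that stopping rule as described above (illustrative only, not the Assessor's actual code; it assumes a higher metric is better):

```typescript
// Stop a running trial at step S when its best metric so far is below the median
// of the metrics that completed trials reported at step S.
function shouldStopEarly(trialHistory: number[], completedTrialsAtStep: number[]): boolean {
    if (trialHistory.length === 0 || completedTrialsAtStep.length === 0) {
        return false; // nothing to compare against yet
    }
    const bestSoFar = Math.max(...trialHistory);
    const sorted = [...completedTrialsAtStep].sort((a, b) => a - b);
    const mid = Math.floor(sorted.length / 2);
    const median = sorted.length % 2 === 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
    return bestSoFar < median;
}
```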