Unverified Commit ee6b149d authored by QuanluZhang, committed by GitHub

Dev exp stop more (#221)

* Exp stop refactor (#161)

* Update RemoteMachineMode.md (#63)

* Remove unused classes for SQuAD QA example.

* Remove more unused functions for SQuAD QA example.

* Fix default dataset config.

* Add Makefile README (#64)

* update document (#92)

* Edit readme.md

* updated a word

* Update GetStarted.md

* Update GetStarted.md

* refactor readme, getstarted and write your trial md.

* Update README.md

* Update WriteYourTrial.md

* Update WriteYourTrial.md

* Update WriteYourTrial.md

* Update WriteYourTrial.md

* Fix nnictl bugs and add new feature (#75)

* fix nnictl bug

* fix nnictl create bug

* add experiment status logic

* add more information for nnictl

* fix Evolution Tuner bug

* refactor code

* fix code in updater.py

* fix nnictl --help

* fix classArgs bug

* update check response.status_code logic

* remove Buffer warning (#100)

* update readme in ga_squad

* update readme

* fix typo

* Update README.md

* Update README.md

* Update README.md

* Add support for debugging mode

* fix setup.py (#115)

* Add DAG model configuration format for SQuAD example.

* Explain config format for SQuAD QA model.

* Add more detailed introduction about the evolution algorithm.

* Fix install.sh and add trial log path (#109)

* fix nnictl bug

* fix nnictl create bug

* add experiment status logic

* add more information for nnictl

* fix Evolution Tuner bug

* refactor code

* fix code in updater.py

* fix nnictl --help

* fix classArgs bug

* update check response.status_code logic

* show trial log path

* update document

* fix install.sh

* set default value for maxTrialNum and maxExecDuration

* fix nnictl

* Dev smac (#116)

* support package install (#91)

* fix nnictl bug

* support package install

* update

* update package install logic

* Fix package install issue (#95)

* fix nnictl bug

* fix package install

* support SMAC as a tuner on nni (#81)

* update doc

* update doc

* update doc

* update hyperopt installation

* update doc

* update doc

* update description in setup.py

* update setup.py

* modify encoding

* encoding

* add encoding

* remove pymc3

* update doc

* update builtin tuner spec

* support smac in sdk, fix logging issue

* support smac tuner

* add optimize_mode

* update config in nnictl

* add __init__.py

* update smac

* update import path

* update setup.py: remove entry_point

* update rest server validation

* fix bug in nnictl launcher

* support classArgs: optimize_mode

* quick fix bug

* test travis

* add dependency

* add dependency

* add dependency

* add dependency

* create smac python package

* fix trivial points

* optimize import of tuners, modify nnictl accordingly

* fix bug: incorrect algorithm_name

* trivial refactor

* for debug

* support virtual

* update doc of SMAC

* update smac requirements

* update requirements

* change debug mode

* update doc

* update doc

* refactor based on comments

* fix comments

* modify example config path to relative path and increase maxTrialNum (#94)

* modify example config path to relative path and increase maxTrialNum

* add document

* support conda (#90) (#110)

* support install from venv and travis CI

* support install from venv and travis CI

* support install from venv and travis CI

* support conda

* support conda

* modify example config path to relative path and increase maxTrialNum

* undo messy commit

* undo messy commit

* Support pip install as root (#77)

* Typo on #58 (#122)

* PAI Training Service implementation (#128)

* PAI Training service implementation
  1. Implement PAITrainingService
  2. Add trial-keeper python module, and modify setup.py to install the module
  3. Add PAITrainingService rest server to collect metrics from PAI container.

* fix datastore for multiple final result (#129)

* Update NNI v0.2 release notes (#132)

Update NNI v0.2 release notes

* Update setup.py Makefile and documents (#130)

* update makefile and setup.py

* update makefile and setup.py

* update document

* update document

* Update Makefile no travis

* update doc

* update doc

* fix convert from ss to pcs (#133)

* Fix bugs about webui (#131)

* Fix webui bugs

* Fix tslint

* webui logpath and document (#135)

* Add webui document and logpath as a href

* fix tslint

* fix comments by Chengmin

* Pai training service bug fix and enhancement (#136)

* Add NNI installation scripts

* Update pai script, update NNI_out_dir

* Update NNI dir in nni sdk local.py

* Create .nni folder in nni sdk local.py

* Add check before creating .nni folder

* Fix typo for PAI_INSTALL_NNI_SHELL_FORMAT

* Improve annotation (#138)

* Improve annotation

* Minor bugfix

* Selectively install through pip (#139)

Selectively install through pip 
* update setup.py

* fix paiTrainingService bugs (#137)

* fix nnictl bug

* add hdfs host validation

* fix bugs

* fix dockerfile

* fix install.sh

* update install.sh

* fix dockerfile

* Set timeout for HDFSUtility exists function

* remove unused TODO

* fix sdk

* add optional for outputDir and dataDir

* refactor dockerfile.base

* Remove unused import in hdfsclientUtility

* Add documentation for NNI PAI mode experiment (#141)

* Add documentation for NNI PAI mode

* Fix typo based on PR comments

* Exit with subprocess return code of trial keeper

* Remove additional exit code

* Fix typo based on PR comments

* update doc for smac tuner (#140)

* Revert "Selectively install through pip (#139)" due to potential pip install issue (#142)

* Revert "Selectively install through pip (#139)"

This reverts commit 1d174836.

* Add exit code of subprocess for trial_keeper

* Update README, add link to PAImode doc

* Merge branch V0.2 to Master (#143)

* webui logpath and document (#135)

* Add webui document and logpath as a href

* fix tslint

* fix comments by Chengmin

* Pai training service bug fix and enhancement (#136)

* Add NNI installation scripts

* Update pai script, update NNI_out_dir

* Update NNI dir in nni sdk local.py

* Create .nni folder in nni sdk local.py

* Add check before creating .nni folder

* Fix typo for PAI_INSTALL_NNI_SHELL_FORMAT

* Improve annotation (#138)

* Improve annotation

* Minor bugfix

* Selectively install through pip (#139)

Selectively install through pip 
* update setup.py

* fix paiTrainingService bugs (#137)

* fix nnictl bug

* add hdfs host validation

* fix bugs

* fix dockerfile

* fix install.sh

* update install.sh

* fix dockerfile

* Set timeout for HDFSUtility exists function

* remove unused TODO

* fix sdk

* add optional for outputDir and dataDir

* refactor dockerfile.base

* Remove unused import in hdfsclientUtility

* Add documentation for NNI PAI mode experiment (#141)

* Add documentation for NNI PAI mode

* Fix typo based on PR comments

* Exit with subprocess return code of trial keeper

* Remove additional exit code

* Fix typo based on PR comments

* update doc for smac tuner (#140)

* Revert "Selectively install through pip (#139)" due to potential pip install issue (#142)

* Revert "Selectively install through pip (#139)"

This reverts commit 1d174836.

* Add exit code of subprocess for trial_keeper

* Update README, add link to PAImode doc

* fix bug (#147)

* Refactor nnictl and add config_pai.yml (#144)

* fix nnictl bug

* add hdfs host validation

* fix bugs

* fix dockerfile

* fix install.sh

* update install.sh

* fix dockerfile

* Set timeout for HDFSUtility exists function

* remove unused TODO

* fix sdk

* add optional for outputDir and dataDir

* refactor dockerfile.base

* Remove unused import in hdfsclientUtility

* add config_pai.yml

* refactor nnictl create logic and add colorful print

* fix nnictl stop logic

* add annotation for config_pai.yml

* add document for start experiment

* fix config.yml

* fix document

* Fix trial keeper wrongly exit issue (#152)

* Fix trial keeper bug, use actual exitcode to exit rather than 1

* Fix bug of table sort (#145)

* Update doc for PAIMode and v0.2 release notes (#153)

* Update v0.2 documentation regards to release note and PAI training service

* Update document to describe NNI docker image

* fix antd (#159)

* refactor experiment stopping logic

* support change concurrency

* remove trialJobs.ts

* trivial changes

* fix bugs

* fix bug

* support updating maxTrialNum

* Modify IT scripts for supporting multiple experiments

* Update ci (#175)

* Update RemoteMachineMode.md (#63)

* Remove unused classes for SQuAD QA example.

* Remove more unused functions for SQuAD QA example.

* Fix default dataset config.

* Add Makefile README (#64)

* update document (#92)

* Edit readme.md

* updated a word

* Update GetStarted.md

* Update GetStarted.md

* refactor readme, getstarted and write your trial md.

* Update README.md

* Update WriteYourTrial.md

* Update WriteYourTrial.md

* Update WriteYourTrial.md

* Update WriteYourTrial.md

* Fix nnictl bugs and add new feature (#75)

* fix nnictl bug

* fix nnictl create bug

* add experiment status logic

* add more information for nnictl

* fix Evolution Tuner bug

* refactor code

* fix code in updater.py

* fix nnictl --help

* fix classArgs bug

* update check response.status_code logic

* remove Buffer warning (#100)

* update readme in ga_squad

* update readme

* fix typo

* Update README.md

* Update README.md

* Update README.md

* Add support for debugging mode

* modify CI because of refactoring exp stop

* update CI for expstop

* update CI for expstop

* update CI for expstop

* update CI for expstop

* update CI for expstop

* update CI for expstop

* update CI for expstop

* update CI for expstop

* update CI for expstop

* file saving

* fix issues from code merge

* remove $(INSTALL_PREFIX)/nni/nni_manager before install

* fix indent

* fix merge issue

* socket close

* update port

* fix merge error

* modify ci logic in nnimanager

* fix ci

* fix bug

* change suspended to done

* update ci (#229)

* update ci

* update ci

* update ci (#232)

* update ci

* update ci

* update azure-pipelines

* update azure-pipelines

* update ci (#233)

* update ci

* update ci

* update azure-pipelines

* update azure-pipelines

* update azure-pipelines

* run.py (#238)

* Nnupdate ci (#239)

* run.py

* test ci

* Nnupdate ci (#240)

* run.py

* test ci

* test ci

* Udci (#241)

* run.py

* test ci

* test ci

* test ci

* update ci (#242)

* run.py

* test ci

* test ci

* test ci

* update ci

* revert install.sh (#244)

* run.py

* test ci

* test ci

* test ci

* update ci

* revert install.sh

* add comments

* remove assert

* trivial change

* trivial change
parent b183c3d8
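Among the changes merged here, the refactored trial-management loop replaces the old `trialConcurrencyReduction` counter with a signed `trialConcurrencyChange`, which is folded into the number of trials requested from the tuner on each pass. A minimal Python sketch of that accounting (the function name is illustrative, not an NNI API):

```python
def compute_trial_requests(concurrency_change, finished_trials):
    """Fold a pending concurrency change (>0 increase, <0 decrease)
    into the number of new trials to request from the tuner.
    Returns (requests_to_send, remaining_concurrency_change)."""
    request_num = concurrency_change + finished_trials
    if request_num >= 0:
        # the pending change is fully absorbed this iteration
        return request_num, 0
    # negative: request nothing, carry the remainder forward
    return 0, request_num
```

For example, with `concurrency_change = -4` and two finished trials, no new trial is requested and `-2` is carried over, matching the comment in `manageTrials` below.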
......@@ -186,6 +186,7 @@ install-python-modules:
install-node-modules:
mkdir -p $(INSTALL_PREFIX)/nni
rm -rf src/nni_manager/dist/node_modules
rm -rf $(INSTALL_PREFIX)/nni/nni_manager
#$(_INFO) Installing NNI Manager $(_END)
cp -rT src/nni_manager/dist $(INSTALL_PREFIX)/nni/nni_manager
......
......@@ -9,10 +9,10 @@ steps:
- script: python3 -m pip install --upgrade pip setuptools
displayName: 'Install python tools'
- script: |
make easy-install
export PATH=$HOME/.nni/bin:$PATH
source install.sh
displayName: 'Install dependencies'
- script: |
cd test/naive
PATH=$HOME/.local/nni/node/bin:$PATH python3 run.py
export PATH=$HOME/.local/bin:$PATH
python3 run.py
displayName: 'Run tests'
......@@ -22,7 +22,7 @@
import { MetricDataRecord, MetricType, TrialJobInfo } from './datastore';
import { TrialJobStatus } from './trainingService';
type ProfileUpdateType = 'TRIAL_CONCURRENCY' | 'MAX_EXEC_DURATION' | 'SEARCH_SPACE';
type ProfileUpdateType = 'TRIAL_CONCURRENCY' | 'MAX_EXEC_DURATION' | 'SEARCH_SPACE' | 'MAX_TRIAL_NUM';
interface ExperimentParams {
authorName: string;
......@@ -73,7 +73,7 @@ interface TrialJobStatistics {
}
interface NNIManagerStatus {
status: 'INITIALIZED' | 'EXPERIMENT_RUNNING' | 'ERROR' | 'STOPPING' | 'STOPPED';
status: 'INITIALIZED' | 'EXPERIMENT_RUNNING' | 'ERROR' | 'STOPPING' | 'STOPPED' | 'DONE';
errors: string[];
}
......
......@@ -41,7 +41,6 @@ import {
REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE
} from './commands';
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
import { TrialJobMaintainerEvent, TrialJobs } from './trialJobs';
/**
* NNIManager
......@@ -49,23 +48,28 @@ import { TrialJobMaintainerEvent, TrialJobs } from './trialJobs';
class NNIManager implements Manager {
private trainingService: TrainingService;
private dispatcher: IpcInterface | undefined;
private trialJobsMaintainer: TrialJobs | undefined;
private currSubmittedTrialNum: number; // need to be recovered
private trialConcurrencyReduction: number;
private trialConcurrencyChange: number; // >0: increase, <0: decrease
private customizedTrials: string[]; // need to be recovered
private log: Logger;
private dataStore: DataStore;
private experimentProfile: ExperimentProfile;
private dispatcherPid: number;
private status: NNIManagerStatus;
private waitingTrials: string[];
private trialJobs: Map<string, TrialJobDetail>;
private suspendDuration: number;
constructor() {
this.currSubmittedTrialNum = 0;
this.trialConcurrencyReduction = 0;
this.trialConcurrencyChange = 0;
this.customizedTrials = [];
this.trainingService = component.get(TrainingService);
assert(this.trainingService);
this.dispatcherPid = 0;
this.waitingTrials = [];
this.trialJobs = new Map<string, TrialJobDetail>();
this.suspendDuration = 0;
this.log = getLogger();
this.dataStore = component.get(DataStore);
......@@ -87,6 +91,9 @@ class NNIManager implements Manager {
case 'SEARCH_SPACE':
this.updateSearchSpace(experimentProfile.params.searchSpace);
break;
case 'MAX_TRIAL_NUM':
this.updateMaxTrialNum(experimentProfile.params.maxTrialNum);
break;
default:
throw new Error('Error: unrecognized updateType');
}
......@@ -207,13 +214,8 @@ class NNIManager implements Manager {
public stopExperiment(): Promise<void> {
this.status.status = 'STOPPING';
if (this.trialJobsMaintainer !== undefined) {
this.trialJobsMaintainer.setStopLoop();
return Promise.resolve();
} else {
return Promise.reject(new Error('Error: undefined trialJobsMaintainer'));
}
}
public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
......@@ -267,28 +269,14 @@ class NNIManager implements Manager {
}
private updateTrialConcurrency(trialConcurrency: number): void {
// TO DO: this method can only be called after startExperiment/resumeExperiment
if (trialConcurrency > this.experimentProfile.params.trialConcurrency) {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has to be initialized');
}
this.dispatcher.sendCommand(
REQUEST_TRIAL_JOBS,
String(trialConcurrency - this.experimentProfile.params.trialConcurrency)
);
} else {
// we assume trialConcurrency >= 0, which is checked by restserver
this.trialConcurrencyReduction += (this.experimentProfile.params.trialConcurrency - trialConcurrency);
}
this.trialConcurrencyChange += (trialConcurrency - this.experimentProfile.params.trialConcurrency);
this.experimentProfile.params.trialConcurrency = trialConcurrency;
return;
}
private updateMaxExecDuration(duration: number): void {
if (this.trialJobsMaintainer !== undefined) {
this.trialJobsMaintainer.updateMaxExecDuration(duration);
}
this.experimentProfile.params.maxExecDuration = duration;
return;
......@@ -304,6 +292,12 @@ class NNIManager implements Manager {
return;
}
private updateMaxTrialNum(maxTrialNum: number): void {
this.experimentProfile.params.maxTrialNum = maxTrialNum;
return;
}
private async experimentDoneCleanUp(): Promise<void> {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
......@@ -346,11 +340,142 @@ class NNIManager implements Manager {
const execDuration: number = this.experimentProfile.execDuration;
for (; ;) {
await delay(1000 * 60 * 10); // 10 minutes
this.experimentProfile.execDuration = execDuration + (Date.now() - startTime) / 1000;
this.experimentProfile.execDuration = execDuration + (Date.now() - startTime) / 1000 - this.suspendDuration;
await this.storeExperimentProfile();
}
}
private async requestTrialJobsStatus(): Promise<number> {
const deferred: Deferred<number> = new Deferred<number>();
let finishedTrialJobNum: number = 0;
for (const trialJobId of Array.from(this.trialJobs.keys())) {
const trialJobDetail: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
const oldTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
//assert(oldTrialJobDetail);
if (oldTrialJobDetail !== undefined && oldTrialJobDetail.status !== trialJobDetail.status) {
this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail));
await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail.url);
}
switch (trialJobDetail.status) {
case 'SUCCEEDED':
case 'USER_CANCELED':
this.trialJobs.delete(trialJobId);
finishedTrialJobNum++;
break;
case 'FAILED':
case 'SYS_CANCELED':
// In the current version, we do not retry
// TO DO: push this job to queue for retry
this.trialJobs.delete(trialJobId);
finishedTrialJobNum++;
break;
case 'WAITING':
case 'RUNNING':
case 'UNKNOWN':
// Do nothing
break;
default:
// TO DO: add warning in log
}
}
deferred.resolve(finishedTrialJobNum);
return deferred.promise;
}
private async manageTrials(): Promise<void> {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
}
let allFinishedTrialJobNum: number = 0;
const startTime: number = Date.now();
let suspendStartTime: number = 0;
for (; ;) {
if (this.status.status === 'STOPPING') {
break;
}
const finishedTrialJobNum: number = await this.requestTrialJobsStatus();
allFinishedTrialJobNum += finishedTrialJobNum;
if (allFinishedTrialJobNum >= this.experimentProfile.params.maxTrialNum) {
// write this log for travis CI
this.log.info('Experiment done.');
}
// requestTrialNum is the number of trials that will be requested from tuner.
// If trialConcurrency does not change, requestTrialNum equals finishedTrialJobNum.
// If trialConcurrency changes, for example, trialConcurrency increases by 2 (trialConcurrencyChange=2), then
// requestTrialNum equals 2 + finishedTrialJobNum and trialConcurrencyChange becomes 0.
// If trialConcurrency changes, for example, trialConcurrency decreases by 4 (trialConcurrencyChange=-4) and
// finishedTrialJobNum is 2, then requestTrialNum becomes -2. No trial will be requested from tuner,
// and trialConcurrencyChange becomes -2.
const requestTrialNum: number = this.trialConcurrencyChange + finishedTrialJobNum;
if (requestTrialNum >= 0) {
this.trialConcurrencyChange = 0;
} else {
this.trialConcurrencyChange = requestTrialNum;
}
for (let i: number = 0; i < requestTrialNum; i++) {
// ask tuner for more trials
if (this.customizedTrials.length > 0) {
const hyperParams: string | undefined = this.customizedTrials.shift();
this.dispatcher.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, hyperParams);
} else {
this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
}
}
// check maxtrialnum and maxduration here
if ((Date.now() - startTime) / 1000 + this.experimentProfile.execDuration - this.suspendDuration
> this.experimentProfile.params.maxExecDuration ||
this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
assert(this.status.status === 'EXPERIMENT_RUNNING' || this.status.status === 'DONE');
if (this.status.status === 'EXPERIMENT_RUNNING') {
suspendStartTime = Date.now();
}
this.status.status = 'DONE';
} else {
if (this.status.status === 'DONE') {
assert(suspendStartTime !== 0);
this.suspendDuration += (Date.now() - suspendStartTime) / 1000;
}
this.status.status = 'EXPERIMENT_RUNNING';
for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
if (this.waitingTrials.length === 0 ||
this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
break;
}
const hyperParams: string | undefined = this.waitingTrials.shift();
if (hyperParams === undefined) {
throw new Error(`Error: invalid hyper-parameters for job submission: ${hyperParams}`);
}
this.currSubmittedTrialNum++;
const trialJobAppForm: TrialJobApplicationForm = {
jobType: 'TRIAL',
hyperParameters: {
value: hyperParams,
index: 0
}
};
const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm);
this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
const trialJobDetailSnapshot: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
if (trialJobDetailSnapshot !== undefined) {
await this.dataStore.storeTrialJobEvent(
trialJobDetailSnapshot.status, trialJobDetailSnapshot.id, hyperParams, trialJobDetailSnapshot.url);
} else {
assert(false, `undefined trialJobDetail in trialJobs: ${trialJobDetail.id}`);
}
}
}
await delay(1000 * 5); // 5 seconds
}
this.log.info('Experiment done, cleaning up...');
await this.experimentDoneCleanUp();
this.log.info('Experiment done.');
}
private storeExperimentProfile(): Promise<void> {
this.experimentProfile.revision += 1;
......@@ -358,12 +483,7 @@ class NNIManager implements Manager {
}
private async run(): Promise<void> {
this.trialJobsMaintainer = new TrialJobs(
this.trainingService,
this.experimentProfile.execDuration,
this.experimentProfile.params.maxExecDuration);
assert(this.dispatcher !== undefined && this.trialJobsMaintainer !== undefined);
assert(this.dispatcher !== undefined);
this.addEventListeners();
......@@ -374,14 +494,14 @@ class NNIManager implements Manager {
this.trainingService.run().catch((err: Error) => {
throw new NNIError('Training service error', `Training service error: ${err.message}`, err);
}),
this.trialJobsMaintainer.run().catch((err: Error) => {
throw new NNIError('Job maintainer error', `Job maintainer error: ${err.message}`, err);
this.manageTrials().catch((err: Error) => {
throw new NNIError('Job management error', `Job management error: ${err.message}`, err);
})]);
}
private addEventListeners(): void {
// TO DO: cannot run this method more than once in one NNIManager instance
if (this.dispatcher === undefined || this.trialJobsMaintainer === undefined) {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
}
this.trainingService.addTrialJobMetricListener((metric: TrialJobMetric) => {
......@@ -390,12 +510,6 @@ class NNIManager implements Manager {
});
});
this.trialJobsMaintainer.on(async (event: TrialJobMaintainerEvent, trialJobDetail: TrialJobDetail) => {
this.onTrialJobEvent(event, trialJobDetail).catch((err: Error) => {
this.criticalError(new NNIError('Trial job event error', `Trial job event error: ${err.message}`, err));
});
});
this.dispatcher.onCommand((commandType: string, content: string) => {
this.onTunerCommand(commandType, content).catch((err: Error) => {
this.criticalError(new NNIError('Tuner command event error', `Tuner command event error: ${err.message}`, err));
......@@ -410,9 +524,6 @@ class NNIManager implements Manager {
// TO DO: we should send INITIALIZE command to tuner if user's tuner needs to run init method in tuner
this.log.debug(`Send tuner command: update search space: ${this.experimentProfile.params.searchSpace}`);
this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, this.experimentProfile.params.searchSpace);
if (this.trialConcurrencyReduction !== 0) {
throw new Error('Error: cannot modify trialConcurrency before startExperiment');
}
this.log.debug(`Send tuner command: ${this.experimentProfile.params.trialConcurrency}`);
this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(this.experimentProfile.params.trialConcurrency));
}
......@@ -425,77 +536,11 @@ class NNIManager implements Manager {
this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
}
private async onTrialJobEvent(event: TrialJobMaintainerEvent, trialJobDetail: TrialJobDetail): Promise<void> {
if (trialJobDetail !== undefined) {
this.log.debug(`Job event: ${event}, id: ${trialJobDetail.id}`);
} else {
this.log.debug(`Job event: ${event}`);
}
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
}
switch (event) {
case 'SUCCEEDED':
case 'FAILED':
case 'USER_CANCELED':
case 'SYS_CANCELED':
if (this.trialConcurrencyReduction > 0) {
this.trialConcurrencyReduction--;
} else {
if (this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum) {
if (this.customizedTrials.length > 0) {
const hyperParams: string | undefined = this.customizedTrials.shift();
this.dispatcher.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, hyperParams);
} else {
this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
}
}
}
this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: event}));
await this.dataStore.storeTrialJobEvent(event, trialJobDetail.id, undefined, trialJobDetail.url);
break;
case 'RUNNING':
await this.dataStore.storeTrialJobEvent(event, trialJobDetail.id, undefined, trialJobDetail.url);
break;
case 'EXPERIMENT_DONE':
this.log.info('Experiment done, cleaning up...');
await this.experimentDoneCleanUp();
this.log.info('Experiment done.');
break;
default:
throw new Error('Error: unrecognized event from trialJobsMaintainer');
}
}
private async onTunerCommand(commandType: string, content: string): Promise<void> {
this.log.info(`Command from tuner: ${commandType}, ${content}`);
if (this.trialJobsMaintainer === undefined) {
throw new Error('Error: trialJobsMaintainer not initialized');
}
switch (commandType) {
case NEW_TRIAL_JOB:
if (this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum) {
this.currSubmittedTrialNum++;
const trialJobAppForm: TrialJobApplicationForm = {
jobType: 'TRIAL',
hyperParameters: {
value: content,
index: 0
}
};
const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm);
this.trialJobsMaintainer.setTrialJob(trialJobDetail.id, Object.assign({}, trialJobDetail));
const jobDetailSnapshot: TrialJobDetail | undefined = this.trialJobsMaintainer.getTrialJob(trialJobDetail.id);
if (jobDetailSnapshot !== undefined) {
await this.dataStore.storeTrialJobEvent(
jobDetailSnapshot.status, jobDetailSnapshot.id, content, jobDetailSnapshot.url);
} else {
assert(false, `undefined jobdetail in job maintainer: ${trialJobDetail.id}`);
}
if (this.currSubmittedTrialNum === this.experimentProfile.params.maxTrialNum) {
this.trialJobsMaintainer.setNoMoreTrials();
}
}
this.waitingTrials.push(content);
break;
case SEND_TRIAL_JOB_PARAMETER:
const tunerCommand: any = JSON.parse(content);
......@@ -514,7 +559,8 @@ class NNIManager implements Manager {
'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined);
break;
case NO_MORE_TRIAL_JOBS:
this.trialJobsMaintainer.setNoMoreTrials();
//this.trialJobsMaintainer.setNoMoreTrials();
// ignore this event for now
break;
case KILL_TRIAL_JOB:
await this.trainingService.cancelTrialJob(JSON.parse(content));
......
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict';
import * as assert from 'assert';
import { EventEmitter } from 'events';
import { TrainingService, TrialJobDetail, TrialJobStatus } from '../common/trainingService';
import { delay } from '../common/utils';
type TrialJobMaintainerEvent = TrialJobStatus | 'EXPERIMENT_DONE';
/**
* TrialJobs
*/
class TrialJobs {
private eventEmitter: EventEmitter;
private trialJobs: Map<string, TrialJobDetail>;
private noMoreTrials: boolean;
private stopLoop: boolean;
private trainingService: TrainingService;
private pastExecDuration: number; // second
private maxExecDuration: number; // second
constructor(
trainingService: TrainingService,
pastExecDuration: number, // second
maxExecDuration: number // second
) {
this.eventEmitter = new EventEmitter();
this.trialJobs = new Map<string, TrialJobDetail>();
this.noMoreTrials = false;
this.stopLoop = false;
this.trainingService = trainingService;
this.pastExecDuration = pastExecDuration;
this.maxExecDuration = maxExecDuration;
}
public setTrialJob(key: string, value: TrialJobDetail): void {
this.trialJobs.set(key, value);
}
public getTrialJob(key: string): TrialJobDetail | undefined {
return this.trialJobs.get(key);
}
public setNoMoreTrials(): void {
this.noMoreTrials = true;
}
public setStopLoop(): void {
this.stopLoop = true;
}
public updateMaxExecDuration(duration: number): void {
this.maxExecDuration = duration;
}
public on(listener: (event: TrialJobMaintainerEvent, trialJobDetail: TrialJobDetail) => void): void {
this.eventEmitter.addListener('all', listener);
}
public async requestTrialJobsStatus(): Promise<void> {
for (const trialJobId of Array.from(this.trialJobs.keys())) {
const trialJobDetail: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
switch (trialJobDetail.status) {
case 'SUCCEEDED':
case 'USER_CANCELED':
this.eventEmitter.emit('all', trialJobDetail.status, trialJobDetail);
this.trialJobs.delete(trialJobId);
break;
case 'FAILED':
case 'SYS_CANCELED':
// In the current version, we do not retry
// TO DO: push this job to queue for retry
this.eventEmitter.emit('all', trialJobDetail.status, trialJobDetail);
this.trialJobs.delete(trialJobId);
break;
case 'WAITING':
// Do nothing
break;
case 'RUNNING':
const oldTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
assert(oldTrialJobDetail);
if (oldTrialJobDetail !== undefined && oldTrialJobDetail.status === 'WAITING') {
this.trialJobs.set(trialJobId, trialJobDetail);
this.eventEmitter.emit('all', trialJobDetail.status, trialJobDetail);
}
break;
case 'UNKNOWN':
// Do nothing
break;
default:
// TO DO: add warning in log
}
}
return Promise.resolve();
}
public async run(): Promise<void> {
const startTime: number = Date.now();
while ((Date.now() - startTime) / 1000 + this.pastExecDuration < this.maxExecDuration) {
if (this.stopLoop ||
(this.noMoreTrials && this.trialJobs.size === 0)) {
break;
}
await this.requestTrialJobsStatus();
await delay(5000);
}
this.eventEmitter.emit('all', 'EXPERIMENT_DONE');
}
}
export { TrialJobs, TrialJobMaintainerEvent };
......@@ -86,7 +86,7 @@ export namespace ValidationSchemas {
};
export const UPDATEEXPERIMENT = {
query: {
update_type: joi.string().required().valid('TRIAL_CONCURRENCY', 'MAX_EXEC_DURATION', 'SEARCH_SPACE')
update_type: joi.string().required().valid('TRIAL_CONCURRENCY', 'MAX_EXEC_DURATION', 'SEARCH_SPACE', 'MAX_TRIAL_NUM')
},
body: {
id: joi.string().required(),
......
__pycache__
tuner_search_space.json
tuner_result.txt
assessor_result.txt
\ No newline at end of file
## Usage
* To test before installing:
`./run.py --preinstall`
* To test the integrity of installation:
`./run.py`
* It will eventually print `PASS` in green if everything works well.
## Details
* This test case tests the communication between trials and tuner/assessor.
* The naive trials receive an integer `x` as parameter, and report `x`, `x²`, `x³`, ... , `x¹⁰` as metrics.
* The naive tuner simply generates the sequence of natural numbers, and prints the received metrics to `tuner_result.txt`.
* The naive assessor kills trials when `sum(metrics) % 11 == 1`, and prints the killed trials to `assessor_result.txt`.
* If the tuner or assessor exits with an exception, it appends `ERROR` to the corresponding result file.
* When the experiment finishes successfully, `Experiment done` can be found in the `nnimanager.log` file.
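The kill rule above can be sketched directly from the description. One assumption in this sketch: the assessor checks the running sum after each intermediate metric is reported, and the trial is killed at the first step where the condition holds.

```python
# Sketch of the naive assessor's kill rule described above.
# Assumption: the running sum is checked after each intermediate
# metric x, x^2, ..., x^10 is reported.

def killed_at(x):
    """Return the first step (1-10) at which trial x would be killed,
    i.e. where sum(metrics so far) % 11 == 1, or None if it survives."""
    total = 0
    for i in range(1, 11):
        total += x ** i
        if total % 11 == 1:
            return i
    return None

# Trial 1 reports 1 at every step, so its first running sum is 1 and
# 1 % 11 == 1: it is killed at step 1.
```

This is why some trial numbers appear in `assessor_result.txt` while others run all ten steps.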
## Issues
* Private APIs are used to detect whether the tuner and assessor have terminated successfully.
* The output of the REST server is not tested.
* The remote machine training service is not tested.
\ No newline at end of file
......@@ -2,4 +2,3 @@
6 60466176
9 3486784401
10 10000000000
DONE
import logging
import os
from nni.assessor import Assessor, AssessResult
_logger = logging.getLogger('NaiveAssessor')
_logger.info('start')
_result = open('/tmp/nni_assessor_result.txt', 'w')
_pwd = os.path.dirname(__file__)
_result = open(os.path.join(_pwd, 'assessor_result.txt'), 'w')
class NaiveAssessor(Assessor):
def __init__(self, optimize_mode):
......@@ -30,7 +33,6 @@ class NaiveAssessor(Assessor):
return AssessResult.Good
def _on_exit(self):
_result.write('DONE\n')
_result.close()
def _on_error(self):
......
import json
import logging
import os
from nni.tuner import Tuner
_logger = logging.getLogger('NaiveTuner')
_logger.info('start')
_result = open('/tmp/nni_tuner_result.txt', 'w')
_pwd = os.path.dirname(__file__)
_result = open(os.path.join(_pwd, 'tuner_result.txt'), 'w')
class NaiveTuner(Tuner):
def __init__(self, optimize_mode):
......@@ -24,11 +27,10 @@ class NaiveTuner(Tuner):
def update_search_space(self, search_space):
_logger.info('update_search_space: %s' % search_space)
with open('/tmp/nni_tuner_search_space.json', 'w') as file_:
with open(os.path.join(_pwd, 'tuner_search_space.json'), 'w') as file_:
json.dump(search_space, file_)
def _on_exit(self):
_result.write('DONE\n')
_result.close()
def _on_error(self):
......
......@@ -4,6 +4,8 @@ import contextlib
import json
import os
import subprocess
import requests
import sys
import time
import traceback
......@@ -11,75 +13,109 @@ GREEN = '\33[32m'
RED = '\33[31m'
CLEAR = '\33[0m'
def read_last_line(file_name):
class Integration_test():
def __init__(self):
self.experiment_url = 'http://localhost:8080/api/v1/nni/experiment'
self.experiment_id = None
self.experiment_done_signal = '"Experiment done"'
def read_last_line(self, file_name):
try:
*_, last_line = open(file_name)
return last_line.strip()
except (FileNotFoundError, ValueError):
return None
def run():
os.environ['PATH'] = os.environ['PATH'] + ':' + os.environ['PWD']
def fetch_experiment_config(self):
experiment_profile = requests.get(self.experiment_url)
self.experiment_id = json.loads(experiment_profile.text)['id']
self.experiment_path = os.path.join(os.environ['HOME'], 'nni/experiments', self.experiment_id)
self.nnimanager_log_path = os.path.join(self.experiment_path, 'log', 'nnimanager.log')
def check_experiment_status(self):
assert os.path.exists(self.nnimanager_log_path), 'Experiment failed to start'
cmds = ['cat', self.nnimanager_log_path, '|', 'grep', self.experiment_done_signal]
completed_process = subprocess.run(' '.join(cmds), shell = True)
return completed_process.returncode == 0
def remove_files(self, file_list):
for file_path in file_list:
with contextlib.suppress(FileNotFoundError):
os.remove('tuner_search_space.txt')
with contextlib.suppress(FileNotFoundError):
os.remove('tuner_result.txt')
with contextlib.suppress(FileNotFoundError):
os.remove('/tmp/nni_assessor_result.txt')
os.remove(file_path)
def run(self, installed = True):
if not installed:
os.environ['PATH'] = os.environ['PATH'] + ':' + os.environ['PWD']
sdk_path = os.path.abspath('../../src/sdk/pynni')
cmd_path = os.path.abspath('../../tools')
pypath = os.environ.get('PYTHONPATH')
if pypath:
pypath = ':'.join([pypath, sdk_path, cmd_path])
else:
pypath = ':'.join([sdk_path, cmd_path])
os.environ['PYTHONPATH'] = pypath
to_remove = ['tuner_search_space.json', 'tuner_result.txt', 'assessor_result.txt']
self.remove_files(to_remove)
proc = subprocess.run(['nnictl', 'create', '--config', 'local.yml'])
assert proc.returncode == 0, '`nnictl create` failed with code %d' % proc.returncode
print('Spawning trials...')
time.sleep(1)
self.fetch_experiment_config()
current_trial = 0
for _ in range(60):
time.sleep(1)
tuner_status = read_last_line('/tmp/nni_tuner_result.txt')
assessor_status = read_last_line('/tmp/nni_assessor_result.txt')
tuner_status = self.read_last_line('tuner_result.txt')
assessor_status = self.read_last_line('assessor_result.txt')
experiment_status = self.check_experiment_status()
assert tuner_status != 'ERROR', 'Tuner exited with error'
assert assessor_status != 'ERROR', 'Assessor exited with error'
if tuner_status == 'DONE' and assessor_status == 'DONE':
if experiment_status:
break
if tuner_status is not None:
for line in open('/tmp/nni_tuner_result.txt'):
if line.strip() in ('DONE', 'ERROR'):
for line in open('tuner_result.txt'):
if line.strip() == 'ERROR':
break
trial = int(line.split(' ')[0])
if trial > current_trial:
current_trial = trial
print('Trial #%d done' % trial)
subprocess.run(['nnictl', 'log', 'stderr'])
assert tuner_status == 'DONE' and assessor_status == 'DONE', 'Failed to finish in 1 min'
assert experiment_status, 'Failed to finish in 1 min'
ss1 = json.load(open('search_space.json'))
ss2 = json.load(open('/tmp/nni_tuner_search_space.json'))
ss2 = json.load(open('tuner_search_space.json'))
assert ss1 == ss2, 'Tuner got wrong search space'
tuner_result = set(open('/tmp/nni_tuner_result.txt'))
tuner_result = set(open('tuner_result.txt'))
expected = set(open('expected_tuner_result.txt'))
# Trials may complete before NNI gets assessor's result,
# so it is possible to have more final result than expected
assert tuner_result.issuperset(expected), 'Bad tuner result'
assessor_result = set(open('/tmp/nni_assessor_result.txt'))
assessor_result = set(open('assessor_result.txt'))
expected = set(open('expected_assessor_result.txt'))
assert assessor_result == expected, 'Bad assessor result'
if __name__ == '__main__':
installed = (sys.argv[-1] != '--preinstall')
ci = Integration_test()
try:
run()
ci.run(installed)
# TODO: check the output of rest server
print(GREEN + 'PASS' + CLEAR)
except Exception as error:
print(RED + 'FAIL' + CLEAR)
print('%r' % error)
traceback.print_exc()
raise error
sys.exit(1)
finally:
subprocess.run(['nnictl', 'stop'])
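As a side note, the `cat | grep` pipeline in `check_experiment_status` could be replaced with a pure-Python scan, avoiding the extra shell. A minimal sketch (the default signal string mirrors the one used in the test code above):

```python
import os

def log_contains(log_path, signal='"Experiment done"'):
    """Return True if `signal` occurs on any line of the log file."""
    if not os.path.exists(log_path):
        return False
    with open(log_path) as f:
        return any(signal in line for line in f)
```

A `check_experiment_status` built on this would return `False` instead of raising when the log file has not been created yet, so the assert-on-existence step would need to move accordingly.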
......@@ -21,7 +21,7 @@
import argparse
from .launcher import create_experiment, resume_experiment
from .updater import update_searchspace, update_concurrency, update_duration
from .updater import update_searchspace, update_concurrency, update_duration, update_trialnum
from .nnictl_utils import *
from .package_management import *
from .constants import *
......
......@@ -51,6 +51,8 @@ def get_query_type(key):
return '?update_type=MAX_EXEC_DURATION'
if key == 'searchSpace':
return '?update_type=SEARCH_SPACE'
if key == 'maxTrialNum':
return '?update_type=MAX_TRIAL_NUM'
def update_experiment_profile(args, key, value):
'''call restful server to update experiment profile'''
......@@ -91,3 +93,9 @@ def update_duration(args):
else:
print('ERROR: update %s failed!' % 'duration')
def update_trialnum(args):
validate_digit(args.value, 1, 999999999)
if update_experiment_profile('maxTrialNum', int(args.value)):
print('INFO: update %s success!' % 'trialnum')
else:
print('ERROR: update %s failed!' % 'trialnum')
\ No newline at end of file
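Tying this back to the `UPDATEEXPERIMENT` schema earlier in this diff: the new `maxTrialNum` key maps to a `MAX_TRIAL_NUM` query parameter on the experiment endpoint. A hedged sketch of how such a request could be assembled; the endpoint path and payload shape here are assumptions for illustration, not the verified server contract:

```python
def build_update_request(profile, new_max_trial_num):
    """Assemble a hypothetical experiment-profile update request.

    `profile` is a dict-like experiment profile.  The 'maxTrialNum' key
    and the endpoint path below are inferred from this diff, not a
    verified API contract.
    """
    updated = dict(profile)
    updated['maxTrialNum'] = int(new_max_trial_num)
    return {
        'url': '/api/v1/nni/experiment',
        'params': {'update_type': 'MAX_TRIAL_NUM'},
        'json': updated,
    }
```

The actual HTTP call in `update_experiment_profile` is elided from this hunk, so only the query/body split from the validation schema is reflected here.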