Commit 8314d6ee authored by Deshui Yu's avatar Deshui Yu Committed by fishyds
Browse files

Merge from dogfood branch to master

parent 98530fd2
#!/bin/bash
# Install Node.js v10.9.0 and Yarn v1.9.4 both system-wide (/usr/local)
# and per-user (~/.local), put them on PATH, then build and install NNI.
INSTALL_PREFIX=${HOME}/.local
mkdir -p ${INSTALL_PREFIX}

# --- Node.js ---
# -4: force IPv4; -nc: skip the download if the archive is already present.
wget -4 -nc https://nodejs.org/dist/v10.9.0/node-v10.9.0-linux-x64.tar.xz --header "Referer: nodejs.org"
tar -xf 'node-v10.9.0-linux-x64.tar.xz'
# BUGFIX: cp into a non-existent target directory fails; create it first.
sudo mkdir -p /usr/local/node
sudo cp -rf node-v10.9.0-linux-x64/* /usr/local/node/
cp -rT node-v10.9.0-linux-x64 ${INSTALL_PREFIX}/node
rm -rf node-v10.9.0-linux-x64*

# --- Yarn ---
wget -4 -nc https://github.com/yarnpkg/yarn/releases/download/v1.9.4/yarn-v1.9.4.tar.gz
tar -xf 'yarn-v1.9.4.tar.gz'
# BUGFIX: same as above — ensure the target directory exists before copying.
sudo mkdir -p /usr/local/yarn
sudo cp -rf yarn-v1.9.4/* /usr/local/yarn/
cp -rT yarn-v1.9.4 ${INSTALL_PREFIX}/yarn
rm -rf yarn-v1.9.4*

# Make both install locations usable in the current shell...
export PATH=/usr/local/node/bin:/usr/local/yarn/bin:$PATH
NODE_BIN=${INSTALL_PREFIX}/node/bin
YARN_BIN=${INSTALL_PREFIX}/yarn/bin
export PATH=${INSTALL_PREFIX}/node/bin:${INSTALL_PREFIX}/yarn/bin:$PATH
# ...and persist the user-local bin dirs in ~/.bashrc (only if not already on PATH).
echo $PATH|grep -q ${NODE_BIN} || echo "export PATH=${NODE_BIN}:\${PATH}" >> ${HOME}/.bashrc
echo $PATH|grep -q ${YARN_BIN} || echo "export PATH=${YARN_BIN}:\${PATH}" >> ${HOME}/.bashrc
source ${HOME}/.bashrc

# Build and install NNI.
# NOTE(review): both `sudo make install` and `make install` are executed below —
# this looks like merge residue from the dogfood branch; confirm which one is intended.
make
sudo make install
make install
\ No newline at end of file
# Copyright (c) Microsoft Corporation. All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
# associated documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish, distribute,
# sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or
# substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT
# OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ==================================================================================================
import os
from setuptools import setup, find_packages
from setuptools.command.install import install
from subprocess import Popen
def read(fname):
    '''Return the full text of `fname`, resolved relative to this file's directory.

    Used to load the long_description for setup() from the repo docs.
    '''
    # `with` guarantees the handle is closed even if .read() raises;
    # the original left the file object open (resource leak).
    with open(os.path.join(os.path.dirname(__file__), fname)) as source:
        return source.read()
class CustomInstallCommand(install):
    '''Customized pip `install` command: after the stock install it runs the
    repo Makefile and makes the locally installed node/yarn bins visible on PATH.'''

    def makeInstall(self):
        '''Run `make pip-install` in the source tree; abort the install on failure.'''
        cmds = ['make', 'pip-install']
        process = Popen(cmds)
        if process.wait() != 0:
            print('Error: Make Install Failed')
            exit(-1)

    def writeEnvironmentVariables(self, variable_name):
        '''Persist ~/.local/<variable_name>/bin on PATH via ~/.bashrc.

        Only appends when that bin directory is not already on the current PATH,
        so repeated installs do not duplicate the export line.
        '''
        paths = os.getenv("PATH").split(':')
        bin_path = os.path.join(os.getenv('HOME'), '.local/' + variable_name + '/bin')
        if bin_path not in paths:
            bashrc_path = os.path.join(os.getenv('HOME'), '.bashrc')
            try:
                # Append directly instead of shelling out to `echo ... >> ...`:
                # avoids shell interpretation/injection of characters in $HOME.
                with open(bashrc_path, 'a') as bashrc:
                    bashrc.write('export PATH=' + bin_path + ':$PATH\n')
            except OSError:
                print('Error: Write Environment Variables Failed')
                exit(-1)

    def run(self):
        # Standard install first, then the project-specific steps.
        install.run(self)
        self.makeInstall()
        self.writeEnvironmentVariables('node')
        self.writeEnvironmentVariables('yarn')
# Package definition for NNI (Neural Network Intelligence).
# cmdclass below swaps in CustomInstallCommand so `pip install` also runs
# the repo Makefile and sets up PATH entries for node/yarn.
setup(
    name = 'NNI',
    version = '0.0.1',
    author = 'Microsoft NNI Team',
    author_email = 'nni@microsoft.com',
    description = 'Neural Network Intelligence project',
    # Long description is read from the nnictl doc shipped in the repo.
    long_description = read('docs/NNICTLDOC.md'),
    license = 'MIT',
    url = 'https://msrasrg.visualstudio.com/NeuralNetworkIntelligence',
    # SDK packages live under src/sdk/pynni; CLI tool packages under tools/.
    packages = find_packages('src/sdk/pynni', exclude=['tests']) + find_packages('tools'),
    package_dir = {
        'annotation': 'tools/annotation',
        'nni': 'src/sdk/pynni/nni',
        'nnicmd': 'tools/nnicmd'
    },
    python_requires = '>=3.5',
    install_requires = [
        'astor',
        'json_tricks',
        'numpy',
        'psutil',
        'pymc3',
        'pyyaml',
        'requests',
        'scipy'
    ],
    # NOTE(review): dependency_links is deprecated in modern pip — confirm
    # hyperopt actually installs through this path.
    dependency_links = [
        'git+https://github.com/hyperopt/hyperopt.git',
    ],
    # Replace the stock install step with the customized one above.
    cmdclass={
        'install': CustomInstallCommand
    },
    # `nnictl` console entry point maps to the CLI argument parser.
    entry_points={
        'console_scripts': ['nnictl = nnicmd.nnictl:parse_args']
    }
)
......@@ -32,16 +32,22 @@ interface ExperimentParams {
maxTrialNum: number;
searchSpace: string;
tuner: {
tunerCommand: string;
tunerCwd: string;
tunerCheckpointDirectory: string;
tunerGpuNum?: number;
className: string;
builtinTunerName?: string;
codeDir?: string;
classArgs?: any;
classFileName?: string;
checkpointDir: string;
gpuNum?: number;
};
assessor?: {
assessorCommand: string;
assessorCwd: string;
assessorCheckpointDirectory: string;
assessorGpuNum?: number;
className: string;
builtinAssessorName?: string;
codeDir?: string;
classArgs?: any;
classFileName?: string;
checkpointDir: string;
gpuNum?: number;
};
clusterMetaData?: {
key: string;
......
......@@ -105,6 +105,8 @@ abstract class TrainingService {
public abstract addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void;
public abstract removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void;
public abstract submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail>;
public abstract updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail>;
public abstract get isMultiPhaseJobSupported(): boolean;
public abstract cancelTrialJob(trialJobId: string): Promise<void>;
public abstract setClusterMetadata(key: string, value: string): Promise<void>;
public abstract getClusterMetadata(key: string): Promise<string>;
......
......@@ -28,7 +28,7 @@ import { Container } from 'typescript-ioc';
import * as util from 'util';
import { Database, DataStore } from './datastore';
import { ExperimentStartupInfo, setExperimentStartupInfo, getExperimentId } from './experimentStartupInfo';
import { ExperimentStartupInfo, getExperimentId, setExperimentStartupInfo } from './experimentStartupInfo';
import { Manager } from './manager';
import { TrainingService } from './trainingService';
......@@ -127,6 +127,63 @@ function parseArg(names: string[]): string {
return '';
}
/**
* Generate command line to start advisor process which runs tuner and assessor
* @param tuner : For builtin tuner:
* {
* className: 'EvolutionTuner'
* classArgs: {
* optimize_mode: 'maximize',
* population_size: 3
* }
* }
* customized:
* {
* codeDir: '/tmp/mytuner'
* classFile: 'best_tuner.py'
* className: 'BestTuner'
* classArgs: {
* optimize_mode: 'maximize',
* population_size: 3
* }
* }
*
* @param assessor: similar to tuner
*
*/
/**
 * Build the shell command line that launches the message dispatcher process
 * (the Python module that hosts the tuner and, optionally, the assessor).
 * @param tuner tuner configuration: className plus optional classArgs,
 *              codeDir and classFileName
 * @param assessor optional assessor configuration with the same shape
 * @returns the complete `python3 -m nni ...` command string
 */
function getMsgDispatcherCommand(tuner: any, assessor: any): string {
    // Accumulate the argument fragments and join them with single spaces at the end.
    const fragments: string[] = [`python3 -m nni --tuner_class_name ${tuner.className}`];
    if (tuner.classArgs !== undefined) {
        // Double stringify produces a quoted JSON literal that survives the shell.
        fragments.push(`--tuner_args ${JSON.stringify(JSON.stringify(tuner.classArgs))}`);
    }
    if (tuner.codeDir !== undefined && tuner.codeDir.length > 1) {
        fragments.push(`--tuner_directory ${tuner.codeDir}`);
    }
    if (tuner.classFileName !== undefined && tuner.classFileName.length > 1) {
        fragments.push(`--tuner_class_filename ${tuner.classFileName}`);
    }
    const hasAssessor: boolean = assessor !== undefined && assessor.className !== undefined;
    if (hasAssessor) {
        fragments.push(`--assessor_class_name ${assessor.className}`);
        if (assessor.classArgs !== undefined) {
            fragments.push(`--assessor_args ${JSON.stringify(JSON.stringify(assessor.classArgs))}`);
        }
        if (assessor.codeDir !== undefined && assessor.codeDir.length > 1) {
            fragments.push(`--assessor_directory ${assessor.codeDir}`);
        }
        if (assessor.classFileName !== undefined && assessor.classFileName.length > 1) {
            fragments.push(`--assessor_class_filename ${assessor.classFileName}`);
        }
    }
    return fragments.join(' ');
}
/**
* Initialize a pseudo experiment environment for unit test.
* Must be paired with `cleanupUnitTest()`.
......@@ -161,5 +218,5 @@ function cleanupUnitTest(): void {
Container.restore(ExperimentStartupInfo);
}
export { getLogDir, getExperimentRootDir, getDefaultDatabaseDir, mkDirP, delay, prepareUnitTest,
export { getMsgDispatcherCommand, getLogDir, getExperimentRootDir, getDefaultDatabaseDir, mkDirP, delay, prepareUnitTest,
parseArg, cleanupUnitTest, uniqueString };
......@@ -135,16 +135,8 @@ class IpcInterface {
* Create IPC proxy for tuner process
* @param process_ the tuner process
*/
function createTunerInterface(process: ChildProcess): IpcInterface {
return new IpcInterface(process, CommandType.TUNER_COMMANDS);
function createDispatcherInterface(process: ChildProcess): IpcInterface {
return new IpcInterface(process, new Set([...CommandType.TUNER_COMMANDS, ...CommandType.ASSESSOR_COMMANDS]));
}
/**
* Create IPC proxy for assessor process
* @param process_ the assessor process
*/
function createAssessorInterface(process: ChildProcess): IpcInterface {
return new IpcInterface(process, CommandType.ASSESSOR_COMMANDS);
}
export { IpcInterface, createTunerInterface, createAssessorInterface };
export { IpcInterface, createDispatcherInterface };
......@@ -185,6 +185,9 @@ class NNIDataStore implements DataStore {
// assume data is stored by time ASC order
for (const record of trialJobEvents) {
let jobInfo: TrialJobInfo | undefined;
if (record.trialJobId === undefined || record.trialJobId.length < 1) {
continue;
}
if (map.has(record.trialJobId)) {
jobInfo = map.get(record.trialJobId);
} else {
......
......@@ -34,12 +34,12 @@ import {
import {
TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../common/trainingService';
import { delay , getLogDir} from '../common/utils';
import { delay , getLogDir, getMsgDispatcherCommand} from '../common/utils';
import {
ADD_CUSTOMIZED_TRIAL_JOB, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, REPORT_METRIC_DATA,
REQUEST_TRIAL_JOBS, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE
} from './commands';
import { createAssessorInterface, createTunerInterface, IpcInterface } from './ipcInterface';
import { createDispatcherInterface, IpcInterface } from './ipcInterface';
import { TrialJobMaintainerEvent, TrialJobs } from './trialJobs';
/**
......@@ -47,8 +47,7 @@ import { TrialJobMaintainerEvent, TrialJobs } from './trialJobs';
*/
class NNIManager implements Manager {
private trainingService: TrainingService;
private tuner: IpcInterface | undefined;
private assessor: IpcInterface | undefined;
private dispatcher: IpcInterface | undefined;
private trialJobsMaintainer: TrialJobs | undefined;
private currSubmittedTrialNum: number; // need to be recovered
private trialConcurrencyReduction: number;
......@@ -56,9 +55,7 @@ class NNIManager implements Manager {
private log: Logger;
private dataStore: DataStore;
private experimentProfile: ExperimentProfile;
// TO DO: could use struct here
private tunerPid: number;
private assessorPid: number;
private dispatcherPid: number;
constructor() {
this.currSubmittedTrialNum = 0;
......@@ -67,8 +64,7 @@ class NNIManager implements Manager {
const experimentId: string = getExperimentId();
this.trainingService = component.get(TrainingService);
assert(this.trainingService);
this.tunerPid = 0;
this.assessorPid = 0;
this.dispatcherPid = 0;
this.log = getLogger();
this.dataStore = component.get(DataStore);
......@@ -84,9 +80,9 @@ class NNIManager implements Manager {
maxTrialNum: 0, // maxTrialNum includes all the submitted trial jobs
searchSpace: '',
tuner: {
tunerCommand: '',
tunerCwd: '',
tunerCheckpointDirectory: ''
className: '',
classArgs: {},
checkpointDir: ''
}
}
};
......@@ -134,21 +130,15 @@ class NNIManager implements Manager {
this.experimentProfile.params = expParams;
await this.storeExperimentProfile();
this.log.debug('Setup tuner...');
const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor);
console.log(`dispatcher command: ${dispatcherCommand}`);
this.setupTuner(
expParams.tuner.tunerCommand,
expParams.tuner.tunerCwd,
//expParams.tuner.tunerCommand,
dispatcherCommand,
undefined,
'start',
expParams.tuner.tunerCheckpointDirectory);
if (expParams.assessor !== undefined) {
this.log.debug('Setup assessor...');
this.setupAssessor(
expParams.assessor.assessorCommand,
expParams.assessor.assessorCwd,
'start',
expParams.assessor.assessorCheckpointDirectory
);
}
expParams.tuner.checkpointDir);
this.experimentProfile.startTime = new Date();
await this.storeExperimentProfile();
......@@ -164,20 +154,13 @@ class NNIManager implements Manager {
this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
const expParams: ExperimentParams = this.experimentProfile.params;
const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor);
console.log(`dispatcher command: ${dispatcherCommand}`);
this.setupTuner(
expParams.tuner.tunerCommand,
expParams.tuner.tunerCwd,
dispatcherCommand,
undefined,
'resume',
expParams.tuner.tunerCheckpointDirectory);
if (expParams.assessor !== undefined) {
this.setupAssessor(
expParams.assessor.assessorCommand,
expParams.assessor.assessorCwd,
'resume',
expParams.assessor.assessorCheckpointDirectory
);
}
expParams.tuner.checkpointDir);
const allTrialJobs: TrialJobInfo[] = await this.dataStore.listTrialJobs();
......@@ -204,7 +187,7 @@ class NNIManager implements Manager {
// TO DO: move timeout value to constants file
const delay1: Promise<{}> = new Promise((resolve: Function, reject: Function): void => {
timeoutId = setTimeout(
() => { reject(new Error('TrainingService setClusterMetadata timeout.')); },
() => { reject(new Error('TrainingService setClusterMetadata timeout. Please check your config file.')); },
10000);
});
await Promise.race([delay1, this.trainingService.setClusterMetadata(key, value)]).finally(() => {
......@@ -248,8 +231,8 @@ class NNIManager implements Manager {
return this.dataStore.listTrialJobs(status);
}
private setupTuner(command: string, cwd: string, mode: 'start' | 'resume', dataDirectory: string): void {
if (this.tuner !== undefined) {
private setupTuner(command: string, cwd: string | undefined, mode: 'start' | 'resume', dataDirectory: string): void {
if (this.dispatcher !== undefined) {
return;
}
const stdio: (string | NodeJS.WriteStream)[] = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
......@@ -270,36 +253,8 @@ class NNIManager implements Manager {
},
shell: true
});
this.tunerPid = tunerProc.pid;
this.tuner = createTunerInterface(tunerProc);
return;
}
private setupAssessor(command: string, cwd: string, mode: 'start' | 'resume', dataDirectory: string): void {
if (this.assessor !== undefined) {
return;
}
const stdio: (string | NodeJS.WriteStream)[] = ['ignore', process.stdout, process.stderr, 'pipe', 'pipe'];
let newCwd: string;
if (cwd === undefined || cwd === '') {
newCwd = getLogDir();
} else {
newCwd = cwd;
}
// TO DO: add CUDA_VISIBLE_DEVICES
const assessorProc: ChildProcess = spawn(command, [], {
stdio,
cwd: newCwd,
env: {
NNI_MODE: mode,
NNI_CHECKPOINT_DIRECTORY: dataDirectory,
NNI_LOG_DIRECTORY: getLogDir()
},
shell: true
});
this.assessorPid = assessorProc.pid;
this.assessor = createAssessorInterface(assessorProc);
this.dispatcherPid = tunerProc.pid;
this.dispatcher = createDispatcherInterface(tunerProc);
return;
}
......@@ -307,10 +262,10 @@ class NNIManager implements Manager {
private updateTrialConcurrency(trialConcurrency: number): void {
// TO DO: this method can only be called after startExperiment/resumeExperiment
if (trialConcurrency > this.experimentProfile.params.trialConcurrency) {
if (this.tuner === undefined) {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has to be initialized');
}
this.tuner.sendCommand(
this.dispatcher.sendCommand(
REQUEST_TRIAL_JOBS,
String(trialConcurrency - this.experimentProfile.params.trialConcurrency)
);
......@@ -333,45 +288,31 @@ class NNIManager implements Manager {
}
private updateSearchSpace(searchSpace: string): void {
if (this.tuner === undefined) {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
}
this.tuner.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, searchSpace);
this.experimentProfile.params.searchSpace = searchSpace;
return;
}
private async experimentDoneCleanUp(): Promise<void> {
if (this.tuner === undefined) {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
}
this.tuner.sendCommand(TERMINATE);
if (this.assessor !== undefined) {
this.assessor.sendCommand(TERMINATE);
}
this.dispatcher.sendCommand(TERMINATE);
let tunerAlive: boolean = true;
let assessorAlive: boolean = true;
// gracefully terminate tuner and assessor here, wait at most 30 seconds.
for (let i: number = 0; i < 30; i++) {
if (!tunerAlive && !assessorAlive) { break; }
if (!tunerAlive) { break; }
try {
await cpp.exec(`kill -0 ${this.tunerPid}`);
await cpp.exec(`kill -0 ${this.dispatcherPid}`);
} catch (error) { tunerAlive = false; }
if (this.assessor !== undefined) {
try {
await cpp.exec(`kill -0 ${this.assessorPid}`);
} catch (error) { assessorAlive = false; }
} else {
assessorAlive = false;
}
await delay(1000);
}
try {
await cpp.exec(`kill ${this.tunerPid}`);
if (this.assessorPid !== undefined) {
await cpp.exec(`kill ${this.assessorPid}`);
}
await cpp.exec(`kill ${this.dispatcherPid}`);
} catch (error) {
// this.tunerPid does not exist, do nothing here
}
......@@ -408,25 +349,18 @@ class NNIManager implements Manager {
return this.dataStore.storeExperimentProfile(this.experimentProfile);
}
// tslint:disable-next-line:max-func-body-length
private runInternal(): Promise<void> {
// TO DO: cannot run this method more than once in one NNIManager instance
if (this.tuner === undefined) {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
}
this.trainingService.addTrialJobMetricListener(async (metric: TrialJobMetric) => {
await this.dataStore.storeMetricData(metric.id, metric.data);
if (this.tuner === undefined) {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
}
this.tuner.sendCommand(REPORT_METRIC_DATA, metric.data);
if (this.assessor !== undefined) {
try {
this.assessor.sendCommand(REPORT_METRIC_DATA, metric.data);
} catch (error) {
this.log.critical(`ASSESSOR ERROR: ${error.message}`);
this.log.critical(`ASSESSOR ERROR: ${error.stack}`);
}
}
this.dispatcher.sendCommand(REPORT_METRIC_DATA, metric.data);
});
this.trialJobsMaintainer = new TrialJobs(
......@@ -439,7 +373,7 @@ class NNIManager implements Manager {
} else {
this.log.debug(`Job event: ${event}`);
}
if (this.tuner === undefined) {
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
}
switch (event) {
......@@ -453,15 +387,13 @@ class NNIManager implements Manager {
if (this.currSubmittedTrialNum < this.experimentProfile.params.maxTrialNum) {
if (this.customizedTrials.length > 0) {
const hyperParams: string | undefined = this.customizedTrials.shift();
this.tuner.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, hyperParams);
this.dispatcher.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, hyperParams);
} else {
this.tuner.sendCommand(REQUEST_TRIAL_JOBS, '1');
this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, '1');
}
}
}
if (this.assessor !== undefined) {
this.assessor.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: event}));
}
this.dispatcher.sendCommand(TRIAL_END, JSON.stringify({trial_job_id: trialJobDetail.id, event: event}));
await this.dataStore.storeTrialJobEvent(event, trialJobDetail.id, undefined, trialJobDetail.url);
break;
case 'RUNNING':
......@@ -478,15 +410,14 @@ class NNIManager implements Manager {
});
// TO DO: we should send INITIALIZE command to tuner if user's tuner needs to run init method in tuner
// TO DO: we should send INITIALIZE command to assessor if user's tuner needs to run init method in tuner
this.log.debug(`Send tuner command: update search space: ${this.experimentProfile.params.searchSpace}`)
this.tuner.sendCommand(UPDATE_SEARCH_SPACE, this.experimentProfile.params.searchSpace);
this.log.debug(`Send tuner command: update search space: ${this.experimentProfile.params.searchSpace}`);
this.dispatcher.sendCommand(UPDATE_SEARCH_SPACE, this.experimentProfile.params.searchSpace);
if (this.trialConcurrencyReduction !== 0) {
return Promise.reject(new Error('Error: cannot modify trialConcurrency before startExperiment'));
}
this.log.debug(`Send tuner command: ${this.experimentProfile.params.trialConcurrency}`)
this.tuner.sendCommand(REQUEST_TRIAL_JOBS, String(this.experimentProfile.params.trialConcurrency));
this.tuner.onCommand(async (commandType: string, content: string) => {
this.dispatcher.sendCommand(REQUEST_TRIAL_JOBS, String(this.experimentProfile.params.trialConcurrency));
this.dispatcher.onCommand(async (commandType: string, content: string) => {
this.log.info(`Command from tuner: ${commandType}, ${content}`);
if (this.trialJobsMaintainer === undefined) {
throw new Error('Error: trialJobsMaintainer not initialized');
......@@ -501,8 +432,7 @@ class NNIManager implements Manager {
};
const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm);
this.trialJobsMaintainer.setTrialJob(trialJobDetail.id, Object.assign({}, trialJobDetail));
// TO DO: to uncomment
//assert(trialJobDetail.status === 'WAITING');
assert(trialJobDetail.status === 'WAITING');
await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, content, trialJobDetail.url);
if (this.currSubmittedTrialNum === this.experimentProfile.params.maxTrialNum) {
this.trialJobsMaintainer.setNoMoreTrials();
......@@ -512,19 +442,13 @@ class NNIManager implements Manager {
case NO_MORE_TRIAL_JOBS:
this.trialJobsMaintainer.setNoMoreTrials();
break;
case KILL_TRIAL_JOB:
await this.trainingService.cancelTrialJob(JSON.parse(content));
break;
default:
throw new Error('Error: unsupported command type from tuner');
throw new Error(`Error: unsupported command type: [${commandType}]`);
}
});
if (this.assessor !== undefined) {
this.assessor.onCommand(async (commandType: string, content: string) => {
if (commandType === KILL_TRIAL_JOB) {
await this.trainingService.cancelTrialJob(JSON.parse(content));
} else {
throw new Error('Error: unsupported command type from assessor');
}
});
}
return this.trialJobsMaintainer.run();
}
......
......@@ -69,10 +69,9 @@ describe('Unit test for dataStore', () => {
}
}`,
tuner: {
tunerCommand: 'python3 tunner.py',
tunerCwd: '/tmp',
tunerCheckpointDirectory: '/tmp/cp',
tunerGpuNum: 0
className: 'testTuner',
checkpointDir: '/tmp/cp',
gpuNum: 0
}
},
id: 'exp123',
......
......@@ -20,6 +20,4 @@ from nni.assessor import Assessor, AssessResult
class DummyAssessor(Assessor):
def assess_trial(self, trial_job_id, trial_history):
return AssessResult.Good
DummyAssessor().run()
return AssessResult.Good
\ No newline at end of file
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from nni.tuner import Tuner
class DummyTuner(Tuner):
    # Minimal Tuner stub for the IPC/dispatcher unit tests: returns fixed
    # parameter strings and ignores every reported result.

    def generate_parameters(self, parameter_id):
        '''Return a fixed parameter payload for a single trial.'''
        return 'unit-test-parm'

    def generate_multiple_parameters(self, parameter_id_list):
        '''Return fixed payloads for a batch of trials.'''
        return ['unit-test-param1', 'unit-test-param2']

    def receive_trial_result(self, parameter_id, parameters, reward):
        # Trial results are ignored by this stub.
        pass

    def receive_customized_trial_result(self, parameter_id, parameters, reward):
        # Customized-trial results are ignored as well.
        pass

    def update_search_space(self, search_space):
        # Search space updates are ignored.
        pass
......@@ -24,7 +24,7 @@ import { ChildProcess, spawn } from 'child_process';
import { Deferred } from 'ts-deferred';
import { cleanupUnitTest, prepareUnitTest } from '../../common/utils';
import * as CommandType from '../commands';
import { createAssessorInterface, createTunerInterface, IpcInterface } from '../ipcInterface';
import { createDispatcherInterface, IpcInterface } from '../ipcInterface';
let sentCommands: {[key: string]: string}[] = [];
const receivedCommands: {[key: string]: string}[] = [];
......@@ -52,27 +52,27 @@ function runProcess(): Promise<Error | null> {
});
// create IPC interface
const assessor: IpcInterface = createAssessorInterface(proc);
assessor.onCommand((commandType: string, content: string): void => {
const dispatcher: IpcInterface = createDispatcherInterface(proc);
dispatcher.onCommand((commandType: string, content: string): void => {
receivedCommands.push({ commandType, content });
});
// Command #1: ok
assessor.sendCommand('IN');
dispatcher.sendCommand('IN');
// Command #2: ok
assessor.sendCommand('ME', '123');
dispatcher.sendCommand('ME', '123');
// Command #3: too long
try {
assessor.sendCommand('ME', 'x'.repeat(1_000_000));
dispatcher.sendCommand('ME', 'x'.repeat(1_000_000));
} catch (error) {
commandTooLong = error;
}
// Command #4: not assessor command
// Command #4: FE is not tuner/assessor command, test the exception type of send non-valid command
try {
assessor.sendCommand('GE', '1');
dispatcher.sendCommand('FE', '1');
} catch (error) {
rejectCommandType = error;
}
......
......@@ -22,18 +22,34 @@
import * as assert from 'assert';
import { ChildProcess, spawn } from 'child_process';
import { Deferred } from 'ts-deferred';
import { cleanupUnitTest, prepareUnitTest } from '../../common/utils';
import { cleanupUnitTest, prepareUnitTest, getMsgDispatcherCommand } from '../../common/utils';
import * as CommandType from '../commands';
import { createAssessorInterface, IpcInterface } from '../ipcInterface';
import { createDispatcherInterface, IpcInterface } from '../ipcInterface';
let assessor: IpcInterface | undefined;
let dispatcher: IpcInterface | undefined;
let procExit: boolean = false;
let procError: boolean = false;
function startProcess(): void {
// create fake assessor process
const stdio: {}[] = ['ignore', 'pipe', process.stderr, 'pipe', 'pipe'];
const proc: ChildProcess = spawn('python3 dummy_assessor.py', [], { stdio, cwd: 'core/test', shell: true });
const dispatcherCmd : string = getMsgDispatcherCommand(
// Mock tuner config
{
className: 'DummyTuner',
codeDir: './',
classFileName: 'dummy_tuner.py'
},
// Mock assessor config
{
className: 'DummyAssessor',
codeDir: './',
classFileName: 'dummy_assessor.py'
}
);
const proc: ChildProcess = spawn(dispatcherCmd, [], { stdio, cwd: 'core/test', shell: true });
proc.on('error', (error: Error): void => {
procExit = true;
......@@ -43,10 +59,10 @@ function startProcess(): void {
procExit = true;
procError = (code !== 0);
});
// create IPC interface
assessor = createAssessorInterface(proc);
(<IpcInterface>assessor).onCommand((commandType: string, content: string): void => {
dispatcher = createDispatcherInterface(proc);
(<IpcInterface>dispatcher).onCommand((commandType: string, content: string): void => {
console.log(commandType, content); // tslint:disable-line:no-console
});
}
......@@ -62,13 +78,13 @@ describe('core/ipcInterface.terminate', (): void => {
});
it('normal', () => {
(<IpcInterface>assessor).sendCommand(
(<IpcInterface>dispatcher).sendCommand(
CommandType.REPORT_METRIC_DATA,
'{"trial_job_id":"A","type":"periodical","value":1}');
'{"trial_job_id":"A","type":"PERIODICAL","value":1,"sequence":123}');
const deferred: Deferred<void> = new Deferred<void>();
setTimeout(
() => {
() => {
assert.ok(!procExit);
assert.ok(!procError);
deferred.resolve();
......@@ -79,7 +95,7 @@ describe('core/ipcInterface.terminate', (): void => {
});
it('terminate', () => {
(<IpcInterface>assessor).sendCommand(CommandType.TERMINATE);
(<IpcInterface>dispatcher).sendCommand(CommandType.TERMINATE);
const deferred: Deferred<void> = new Deferred<void>();
setTimeout(
......@@ -88,7 +104,7 @@ describe('core/ipcInterface.terminate', (): void => {
assert.ok(!procError);
deferred.resolve();
},
1000);
2000);
return deferred.promise;
});
......
......@@ -30,7 +30,6 @@ const testTrainingServiceProvider: Provider = {
};
class MockedTrainingService extends TrainingService {
public mockedMetaDataValue: string = "default";
public jobDetail1: TrialJobDetail = {
id: '1234',
......@@ -93,6 +92,14 @@ class MockedTrainingService extends TrainingService {
return deferred.promise;
}
public updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
throw new MethodNotImplementedError();
}
public get isMultiPhaseJobSupported(): boolean {
return false;
}
public cancelTrialJob(trialJobId: string): Promise<void> {
const deferred = new Deferred<void>();
if(trialJobId === '1234' || trialJobId === '3456'){
......@@ -125,7 +132,7 @@ class MockedTrainingService extends TrainingService {
}
public cleanUp(): Promise<void> {
throw new MethodNotImplementedError();
return Promise.resolve();
}
}
......
......@@ -56,16 +56,17 @@ describe('Unit test for nnimanager', function () {
maxTrialNum: 2,
searchSpace: '{"x":1}',
tuner: {
tunerCommand: 'python3 hyperopt.py',
tunerCwd: 'core/test',
tunerCheckpointDirectory: '',
tunerGpuNum: 1
className: 'EvolutionTuner',
classArgs: {
optimize_mode: 'maximize'
},
checkpointDir: '',
gpuNum: 1
},
assessor: {
assessorCommand: 'python3 dummy_assessor.py',
assessorCwd: 'core/test',
assessorCheckpointDirectory: '',
assessorGpuNum: 1
className: 'MedianstopAssessor',
checkpointDir: '',
gpuNum: 1
}
}
......
......@@ -38,10 +38,9 @@ const expParams1: ExperimentParams = {
maxTrialNum: 5,
searchSpace: 'SS',
tuner: {
tunerCommand: './tuner.sh',
tunerCwd: '.',
tunerCheckpointDirectory: '/tmp',
tunerGpuNum: 0
className: 'testTuner',
checkpointDir: '/tmp',
gpuNum: 0
}
};
......@@ -53,14 +52,12 @@ const expParams2: ExperimentParams = {
maxTrialNum: 5,
searchSpace: '',
tuner: {
tunerCommand: 'python tuner.py',
tunerCwd: '/tmp',
tunerCheckpointDirectory: '/tmp'
className: 'testTuner',
checkpointDir: '/tmp'
},
assessor: {
assessorCommand: 'python assessor.py',
assessorCwd: '/tmp',
assessorCheckpointDirectory: '/tmp'
className: 'testAssessor',
checkpointDir: '/tmp'
}
};
......
......@@ -37,7 +37,7 @@ export const testManagerProvider: Provider = {
};
export class MockedNNIManager extends Manager {
public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType ): Promise<void> {
public updateExperimentProfile(experimentProfile: ExperimentProfile, updateType: ProfileUpdateType): Promise<void> {
return Promise.resolve();
}
public getTrialJobStatistics(): Promise<TrialJobStatistics[]> {
......@@ -103,23 +103,15 @@ export class MockedNNIManager extends Manager {
return deferred.promise;
}
public getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
const deferred: Deferred<TrialJobDetail> = new Deferred<TrialJobDetail>();
const jobDetail: TrialJobDetail = {
public getTrialJob(trialJobId: string): Promise<TrialJobInfo> {
const deferred: Deferred<TrialJobInfo> = new Deferred<TrialJobInfo>();
const jobInfo: TrialJobInfo = {
id: '1234',
status: 'SUCCEEDED',
submitTime: new Date(),
startTime: new Date(),
endTime: new Date(),
tags: ['test'],
// tslint:disable-next-line:no-http-string
url: 'http://test',
workingDirectory: '/tmp/mocked',
form: {
jobType: 'TRIAL'
}
endTime: new Date()
};
deferred.resolve(jobDetail);
deferred.resolve(jobInfo);
return deferred.promise;
}
......@@ -139,9 +131,8 @@ export class MockedNNIManager extends Manager {
maxTrialNum: 3,
searchSpace: '{lr: 0.01}',
tuner: {
tunerCommand: 'python3 tuner.py',
tunerCwd: '/tmp/tunner',
tunerCheckpointDirectory: ''
className: 'testTuner',
checkpointDir: ''
}
},
id: '2345',
......
......@@ -116,7 +116,7 @@ describe('Unit test for rest server', () => {
}
const req: request.Options = {
uri: `${ROOT_URL}/experiment`,
uri: `${ROOT_URL}/experiment?update_type=TRIAL_CONCURRENCY`,
method: 'PUT',
json: true,
body: profile
......@@ -141,7 +141,7 @@ describe('Unit test for rest server', () => {
body: {
exception_test_key: 'test'
}
}
};
request(req, (err: Error, res: request.Response) => {
if (err) {
assert.fail(err.message);
......@@ -158,7 +158,7 @@ describe('Unit test for rest server', () => {
method: 'PUT',
json: true,
body: {
MACHINE_LIST: [{
machine_list: [{
ip: '10.10.10.101',
port: 22,
username: 'test',
......@@ -170,37 +170,12 @@ describe('Unit test for rest server', () => {
passwd: '1234'
}]
}
}
request(req, (err: Error, res: request.Response) => {
if (err) {
assert.fail(err.message);
} else {
expect(res.statusCode).to.equal(200);
}
done();
});
});
it('Test POST experiment', (done: Mocha.Done) => {
const req: request.Options = {
uri: `${ROOT_URL}/experiment`,
method: 'POST',
json: true,
body: {
author: 'test',
trial: {
entrypoint: 'python',
args: 'mnist.py'
}
}
};
// tslint:disable-next-line:no-any
request(req, (err: Error, res: request.Response, body: any) => {
request(req, (err: Error, res: request.Response) => {
if (err) {
assert.fail(err.message);
} else {
expect(res.statusCode).to.equal(200);
expect(body.experiment_id).to.equal('id-1234');
}
done();
});
......
......@@ -25,7 +25,7 @@ import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
import * as ts from 'tail-stream';
import { NNIError, NNIErrorNames } from '../../common/errors';
import { MethodNotImplementedError, NNIError, NNIErrorNames } from '../../common/errors';
import { getLogger, Logger } from '../../common/log';
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
......@@ -205,6 +205,22 @@ class LocalTrainingService implements TrainingService {
}
}
/**
* Update trial job for multi-phase
* @param trialJobId trial job id
* @param form job application form
*/
public updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
throw new MethodNotImplementedError();
}
/**
* Is multiphase job supported in current training service
*/
public get isMultiPhaseJobSupported(): boolean {
return false;
}
public async cancelTrialJob(trialJobId: string): Promise<void> {
this.log.info(`cancelTrialJob: ${trialJobId}`);
const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
......@@ -309,7 +325,7 @@ class LocalTrainingService implements TrainingService {
runScriptLines.push(`export ${variable.key}=${variable.value}`);
}
runScriptLines.push(
`eval ${this.localTrailConfig.command} 2>${path.join(trialJobDetail.workingDirectory, '.nni', 'stderr')}`,
`eval ${this.localTrailConfig.command} 2>${path.join(trialJobDetail.workingDirectory, 'stderr')}`,
`echo $? \`date +%s%3N\` >${path.join(trialJobDetail.workingDirectory, '.nni', 'state')}`);
await cpp.exec(`mkdir -p ${trialJobDetail.workingDirectory}`);
......
......@@ -82,7 +82,12 @@ export class MetricsCollector {
private getTrialJobIdsGroupByRmMeta(status: TrialJobStatus[]): Map<RemoteMachineMeta, string[]> {
const map: Map<RemoteMachineMeta, string[]> = new Map<RemoteMachineMeta, string[]>();
this.trialJobsMap.forEach((trialJob, id) => {
if (status.includes(trialJob.status)) {
let reservedTrialJobIds : string[] = [];
if(trialJob.rmMeta !== undefined
&& trialJob.rmMeta.gpuReservation !== undefined) {
reservedTrialJobIds = Array.from(trialJob.rmMeta.gpuReservation.values());
}
if (reservedTrialJobIds.includes(id) || status.includes(trialJob.status)) {
if (map.has(trialJob.rmMeta)) {
const ids = map.get(trialJob.rmMeta);
if (ids !== undefined && !ids.includes(id)) {
......@@ -93,7 +98,7 @@ export class MetricsCollector {
// If the remote machine has jobs reserve GPU, also put that jobs into list to get metrics data
if(trialJob.rmMeta.gpuReservation !== undefined) {
const concatJobIds : string[] = initJobIds.concat(Array.from(trialJob.rmMeta.gpuReservation.values()));
const concatJobIds : string[] = initJobIds.concat(reservedTrialJobIds);
initJobIds = concatJobIds.filter((item, pos) => concatJobIds.indexOf(item) === pos);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment