Unverified Commit d0b22fc7 authored by SparkSnail, committed by GitHub

Support version check of nni (#807)

Check the nni version in trialKeeper to make sure the trialKeeper version is consistent with the trainingService version.
Add a debug mode to the config file.
parent 82791fc1
......@@ -149,6 +149,11 @@ machineList:
Note: The maxExecDuration spec sets the duration of an experiment, not of a trial job. If the experiment reaches the max duration, it will not stop, but it can no longer submit new trial jobs.
* __debug__
* Description
NNI will check that the version of the nniManager process matches the version of trialKeeper on the remote, pai and kubernetes platforms. If you want to disable the version check, you can set debug to true (see the example configuration below).
* __maxTrialNum__
* Description
......
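A minimal sketch of where the new `debug` field sits in an experiment configuration; every value other than `debug` is an illustrative placeholder, and platform-specific sections such as `machineList` are omitted:
```
authorName: default
experimentName: example_remote
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
trainingServicePlatform: remote
searchSpacePath: search_space.json
useAnnotation: false
# setting debug to true disables the trialKeeper version check
debug: true
tuner:
  builtinTunerName: TPE
trial:
  command: python3 mnist.py
  codeDir: .
  gpuNum: 0
# machineList: ... (required for the remote platform, omitted here)
```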
......@@ -45,6 +45,12 @@ nnictl support commands:
|------|------|------|------|
|--config, -c| True| |YAML configure file of the experiment|
|--port, -p|False| |the port of restful server|
|--debug, -d|False||set debug mode|
Note:
```
Debug mode will disable the version check function in trialKeeper.
```
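For example, a sketch of launching an experiment with the version check disabled (the config file name below is a placeholder):
```
nnictl create --config config_remote.yml --debug
```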
<a name="resume"></a>
* __nnictl resume__
......@@ -65,6 +71,7 @@ nnictl support commands:
|------|------|------ |------|
|id| False| |The id of the experiment you want to resume|
|--port, -p| False| |Rest port of the experiment you want to resume|
|--debug, -d|False||set debug mode|
<a name="stop"></a>
* __nnictl stop__
......
......@@ -36,6 +36,7 @@ interface ExperimentParams {
trainingServicePlatform: string;
multiPhase?: boolean;
multiThread?: boolean;
versionCheck?: boolean;
tuner?: {
className: string;
builtinTunerName?: string;
......
......@@ -345,6 +345,19 @@ function countFilesRecursively(directory: string, timeoutMilliSeconds?: number):
});
}
/**
 * Get the version of the current package.
 */
async function getVersion(): Promise<string> {
    const deferred: Deferred<string> = new Deferred<string>();
    import(path.join(__dirname, '..', 'package.json')).then((pkg) => {
        deferred.resolve(pkg.version);
    }).catch((error) => {
        deferred.reject(error);
    });
    return deferred.promise;
}
export {countFilesRecursively, getRemoteTmpDir, generateParamFileName, getMsgDispatcherCommand, getCheckpointDir,
getLogDir, getExperimentRootDir, getJobCancelStatus, getDefaultDatabaseDir, getIPV4Address,
mkDirP, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect };
mkDirP, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomSelect, getVersion };
......@@ -127,7 +127,11 @@ class NNIManager implements Manager {
if (expParams.multiPhase && this.trainingService.isMultiPhaseJobSupported) {
this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
}
// Set up versionCheck config
if (expParams.versionCheck !== undefined) {
this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
}
const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
expParams.multiPhase, expParams.multiThread);
this.log.debug(`dispatcher command: ${dispatcherCommand}`);
......@@ -162,6 +166,11 @@ class NNIManager implements Manager {
this.trainingService.setClusterMetadata('multiPhase', expParams.multiPhase.toString());
}
// Set up versionCheck config
if (expParams.versionCheck !== undefined) {
this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
}
const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
expParams.multiPhase, expParams.multiThread);
this.log.debug(`dispatcher command: ${dispatcherCommand}`);
......
......@@ -30,6 +30,7 @@ import { getLogger, Logger } from '../common/log';
import { ExperimentProfile, Manager, TrialJobStatistics} from '../common/manager';
import { ValidationSchemas } from './restValidationSchemas';
import { NNIRestServer } from './nniRestServer';
import { getVersion } from '../common/utils';
const expressJoi = require('express-joi-validator');
......@@ -104,8 +105,8 @@ class NNIRestHandler {
private version(router: Router): void {
router.get('/version', async (req: Request, res: Response) => {
const pkg = await import(path.join(__dirname, '..', 'package.json'));
res.send(pkg.version);
const version = await getVersion();
res.send(version);
});
}
......
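To see what the nniManager reports, the endpoint can be queried directly; the URL below assumes the default REST prefix `/api/v1/nni` and port 8080, adjust both to your deployment:
```
curl http://localhost:8080/api/v1/nni/version
```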
......@@ -139,6 +139,7 @@ export namespace ValidationSchemas {
maxExecDuration: joi.number().min(0).required(),
multiPhase: joi.boolean(),
multiThread: joi.boolean(),
versionCheck: joi.boolean(),
advisor: joi.object({
builtinAdvisorName: joi.string().valid('Hyperband'),
codeDir: joi.string(),
......
......@@ -31,5 +31,6 @@ export enum TrialConfigMetadataKey {
PAI_CLUSTER_CONFIG = 'pai_config',
KUBEFLOW_CLUSTER_CONFIG = 'kubeflow_config',
NNI_MANAGER_IP = 'nni_manager_ip',
FRAMEWORKCONTROLLER_CLUSTER_CONFIG = 'frameworkcontroller_config'
FRAMEWORKCONTROLLER_CLUSTER_CONFIG = 'frameworkcontroller_config',
VERSION_CHECK = 'version_check'
}
......@@ -191,7 +191,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);
for(let taskRole of this.fcTrialConfig.taskRoles) {
const runScriptContent: string = this.generateRunScript('frameworkcontroller', trialJobId, trialWorkingFolder,
const runScriptContent: string = await this.generateRunScript('frameworkcontroller', trialJobId, trialWorkingFolder,
this.generateCommandScript(taskRole.command), curTrialSequenceId.toString(), taskRole.name, taskRole.gpuNum);
await fs.promises.writeFile(path.join(trialLocalTempFolder, `run_${taskRole.name}.sh`), runScriptContent, { encoding: 'utf8' });
}
......@@ -267,6 +267,9 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
return Promise.reject(new Error(error));
}
break;
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
default:
break;
}
......
......@@ -188,7 +188,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
// Write worker file content run_worker.sh to local tmp folders
if(kubeflowTrialConfig.worker) {
const workerRunScriptContent: string = this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
const workerRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
kubeflowTrialConfig.worker.command, curTrialSequenceId.toString(), 'worker', kubeflowTrialConfig.worker.gpuNum);
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_worker.sh'), workerRunScriptContent, { encoding: 'utf8' });
......@@ -197,7 +197,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
if(this.kubeflowClusterConfig.operator === 'tf-operator') {
let tensorflowTrialConfig: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>this.kubeflowTrialConfig;
if(tensorflowTrialConfig.ps){
const psRunScriptContent: string = this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
const psRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
tensorflowTrialConfig.ps.command, curTrialSequenceId.toString(), 'ps', tensorflowTrialConfig.ps.gpuNum);
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_ps.sh'), psRunScriptContent, { encoding: 'utf8' });
}
......@@ -205,7 +205,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
else if(this.kubeflowClusterConfig.operator === 'pytorch-operator') {
let pytorchTrialConfig: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>this.kubeflowTrialConfig;
if(pytorchTrialConfig.master){
const masterRunScriptContent: string = this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
const masterRunScriptContent: string = await this.generateRunScript('kubeflow', trialJobId, trialWorkingFolder,
pytorchTrialConfig.master.command, curTrialSequenceId.toString(), 'master', pytorchTrialConfig.master.gpuNum);
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run_master.sh'), masterRunScriptContent, { encoding: 'utf8' });
}
......@@ -317,6 +317,9 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
return Promise.reject(new Error(error));
}
break;
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
default:
break;
}
......
......@@ -71,5 +71,5 @@ mkdir -p $NNI_OUTPUT_DIR
cp -rT $NNI_CODE_DIR $NNI_SYS_DIR
cd $NNI_SYS_DIR
sh install_nni.sh
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} `
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} --version '{11}'`
+ `1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr`
......@@ -25,7 +25,7 @@ import * as path from 'path';
import { EventEmitter } from 'events';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import { getExperimentRootDir, uniqueString, getJobCancelStatus, getIPV4Address } from '../../common/utils';
import { getExperimentRootDir, uniqueString, getJobCancelStatus, getIPV4Address, getVersion } from '../../common/utils';
import {
TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
} from '../../common/trainingService';
......@@ -61,6 +61,7 @@ abstract class KubernetesTrainingService {
protected kubernetesCRDClient?: KubernetesCRDClient;
protected kubernetesJobRestServer?: KubernetesJobRestServer;
protected kubernetesClusterConfig?: KubernetesClusterConfig;
protected versionCheck?: boolean = true;
constructor() {
this.log = getLogger();
......@@ -179,8 +180,8 @@ abstract class KubernetesTrainingService {
* @param command
* @param trialSequenceId sequence id
*/
protected generateRunScript(platform: string, trialJobId: string, trialWorkingFolder: string,
command: string, trialSequenceId: string, roleName: string, gpuNum: number): string {
protected async generateRunScript(platform: string, trialJobId: string, trialWorkingFolder: string,
command: string, trialSequenceId: string, roleName: string, gpuNum: number): Promise<string> {
let nvidia_script: string = '';
// The Nvidia device plugin for K8S has a known issue where requesting zero GPUs allocates all GPUs
// Refer https://github.com/NVIDIA/k8s-device-plugin/issues/61
......@@ -189,6 +190,7 @@ abstract class KubernetesTrainingService {
nvidia_script = `export CUDA_VISIBLE_DEVICES='0'`;
}
const nniManagerIp = this.nniManagerIpConfig?this.nniManagerIpConfig.nniManagerIp:getIPV4Address();
const version = this.versionCheck? await getVersion(): '';
const runScript: string = String.Format(
KubernetesScriptFormat,
platform,
......@@ -201,9 +203,10 @@ abstract class KubernetesTrainingService {
nvidia_script,
command,
nniManagerIp,
this.kubernetesRestServerPort
this.kubernetesRestServerPort,
version
);
return runScript;
return Promise.resolve(runScript);
}
protected async createNFSStorage(nfsServer: string, nfsPath: string): Promise<void> {
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
......
......@@ -64,7 +64,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4}
&& cd $NNI_SYS_DIR && sh install_nni.sh
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}'
--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1'`;
--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1' --version '{12}'`;
export const PAI_OUTPUT_DIR_FORMAT: string =
`hdfs://{0}:9000/`;
......
......@@ -39,7 +39,7 @@ import {
TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
} from '../../common/trainingService';
import { delay, generateParamFileName,
getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils';
getExperimentRootDir, getIPV4Address, uniqueString, getVersion } from '../../common/utils';
import { PAIJobRestServer } from './paiJobRestServer'
import { PAITrialJobDetail, PAI_TRIAL_COMMAND_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_LOG_PATH_FORMAT } from './paiData';
import { PAIJobInfoCollector } from './paiJobInfoCollector';
......@@ -75,6 +75,7 @@ class PAITrainingService implements TrainingService {
private paiRestServerPort?: number;
private nniManagerIpConfig?: NNIManagerIpConfig;
private copyExpCodeDirPromise?: Promise<void>;
private versionCheck?: boolean = true;
constructor() {
this.log = getLogger();
......@@ -211,6 +212,7 @@ class PAITrainingService implements TrainingService {
hdfsLogPath);
this.trialJobsMap.set(trialJobId, trialJobDetail);
const nniManagerIp = this.nniManagerIpConfig?this.nniManagerIpConfig.nniManagerIp:getIPV4Address();
const version = this.versionCheck? await getVersion(): '';
const nniPaiTrialCommand : string = String.Format(
PAI_TRIAL_COMMAND_FORMAT,
// PAI will copy job's codeDir into /root directory
......@@ -225,7 +227,8 @@ class PAITrainingService implements TrainingService {
hdfsOutputDir,
this.hdfsOutputHost,
this.paiClusterConfig.userName,
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName)
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
version
).replace(/\r\n|\n|\r/gm, '');
console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
......@@ -434,6 +437,9 @@ class PAITrainingService implements TrainingService {
deferred.resolve();
break;
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
default:
//Reject for unknown keys
throw new Error(`Unknown key: ${key}`);
......
......@@ -250,8 +250,8 @@ export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={
cd $NNI_SYS_DIR
sh install_nni.sh
echo $$ >{6}
python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
echo $? \`date +%s%3N\` >{10}`;
python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' --version '{10}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
echo $? \`date +%s%3N\` >{11}`;
export const HOST_JOB_SHELL_FORMAT: string =
`#!/bin/bash
......
......@@ -51,7 +51,7 @@ import { SSHClientUtility } from './sshClientUtility';
import { validateCodeDir } from '../common/util';
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { mkDirP } from '../../common/utils';
import { mkDirP, getVersion } from '../../common/utils';
/**
* Training Service implementation for Remote Machine (Linux)
......@@ -76,6 +76,7 @@ class RemoteMachineTrainingService implements TrainingService {
private remoteRestServerPort?: number;
private readonly remoteOS: string;
private nniManagerIpConfig?: NNIManagerIpConfig;
private versionCheck: boolean = true;
constructor(@component.Inject timer: ObservableTimer) {
this.remoteOS = 'linux';
......@@ -372,6 +373,9 @@ class RemoteMachineTrainingService implements TrainingService {
case TrialConfigMetadataKey.MULTI_PHASE:
this.isMultiPhase = (value === 'true' || value === 'True');
break;
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
default:
//Reject for unknown keys
throw new Error(`Unknown key: ${key}`);
......@@ -580,6 +584,7 @@ class RemoteMachineTrainingService implements TrainingService {
const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
this.remoteRestServerPort = restServer.clusterRestServerPort;
}
const version = this.versionCheck? await getVersion(): '';
const runScriptTrialContent: string = String.Format(
REMOTEMACHINE_TRIAL_COMMAND_FORMAT,
trialWorkingFolder,
......@@ -592,6 +597,7 @@ class RemoteMachineTrainingService implements TrainingService {
command,
nniManagerIp,
this.remoteRestServerPort,
version,
path.join(trialWorkingFolder, '.nni', 'code')
)
......
......@@ -34,6 +34,7 @@ Optional('multiPhase'): bool,
Optional('multiThread'): bool,
Optional('nniManagerIp'): str,
Optional('logDir'): os.path.isdir,
Optional('debug'): bool,
Optional('logLevel'): Or('trace', 'debug', 'info', 'warning', 'error', 'fatal'),
'useAnnotation': bool,
Optional('advisor'): Or({
......
......@@ -271,6 +271,8 @@ def set_experiment(experiment_config, mode, port, config_file_name):
request_data['tuner'] = experiment_config['tuner']
if 'assessor' in experiment_config:
request_data['assessor'] = experiment_config['assessor']
if experiment_config.get('debug') is not None:
request_data['versionCheck'] = not experiment_config.get('debug')
request_data['clusterMetaData'] = []
if experiment_config['trainingServicePlatform'] == 'local':
......@@ -313,7 +315,6 @@ def set_experiment(experiment_config, mode, port, config_file_name):
def launch_experiment(args, experiment_config, mode, config_file_name, experiment_id=None):
'''follow steps to start rest server and start experiment'''
nni_config = Config(config_file_name)
# check packages for tuner
if experiment_config.get('tuner') and experiment_config['tuner'].get('builtinTunerName'):
tuner_name = experiment_config['tuner']['builtinTunerName']
......@@ -440,6 +441,9 @@ def launch_experiment(args, experiment_config, mode, config_file_name, experimen
# start a new experiment
print_normal('Starting experiment...')
# set debug configuration; the --debug flag only turns debug mode on, it does not override a config file setting
if args.debug:
experiment_config['debug'] = True
response = set_experiment(experiment_config, mode, args.port, config_file_name)
if response:
if experiment_id is None:
......
......@@ -34,7 +34,10 @@ if os.environ.get('COVERAGE_PROCESS_START'):
def nni_info(*args):
if args[0].version:
print(pkg_resources.get_distribution('nni').version)
try:
print(pkg_resources.get_distribution('nni').version)
except pkg_resources.ResolutionError:
print_error('Failed to get the nni version, please run `pip3 list | grep nni` to check whether nni is installed!')
else:
print('please run "nnictl {positional argument} --help" to see nnictl guidance')
......@@ -51,14 +54,14 @@ def parse_args():
parser_start = subparsers.add_parser('create', help='create a new experiment')
parser_start.add_argument('--config', '-c', required=True, dest='config', help='the path of yaml config file')
parser_start.add_argument('--port', '-p', default=DEFAULT_REST_PORT, dest='port', help='the port of restful server')
parser_start.add_argument('--debug', '-d', action='store_true', help=' set log level to debug')
parser_start.add_argument('--debug', '-d', action='store_true', help='set debug mode')
parser_start.set_defaults(func=create_experiment)
# parse resume command
parser_resume = subparsers.add_parser('resume', help='resume a new experiment')
parser_resume.add_argument('id', nargs='?', help='The id of the experiment you want to resume')
parser_resume.add_argument('--port', '-p', default=DEFAULT_REST_PORT, dest='port', help='the port of restful server')
parser_resume.add_argument('--debug', '-d', action='store_true', help=' set log level to debug')
parser_resume.add_argument('--debug', '-d', action='store_true', help='set debug mode')
parser_resume.set_defaults(func=resume_experiment)
# parse update command
......
......@@ -28,6 +28,7 @@ import re
import sys
import select
from pyhdfs import HdfsClient
import pkg_resources
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal
......@@ -103,6 +104,28 @@ def main_loop(args):
def trial_keeper_help_info(*args):
print('please run --help to see guidance')
def check_version(args):
    try:
        trial_keeper_version = pkg_resources.get_distribution('nni').version
    except pkg_resources.ResolutionError:
        # the nni package is not installed, fall back to the nni-tool package
        nni_log(LogType.Warning, 'Package nni does not exist!')
        try:
            trial_keeper_version = pkg_resources.get_distribution('nni-tool').version
        except pkg_resources.ResolutionError:
            # neither nni nor nni-tool is installed
            nni_log(LogType.Error, 'Package nni-tool does not exist!')
            os._exit(1)
    if not args.version:
        # no version was passed from the training service, skip the check
        nni_log(LogType.Warning, 'Skipping version check!')
    elif trial_keeper_version != args.version:
        nni_log(LogType.Error, 'Exit trial keeper, trial keeper version is {} and trainingService version is {}, '
                'the versions do not match, please check your code and image versions!'.format(trial_keeper_version, args.version))
        os._exit(1)
    else:
        nni_log(LogType.Info, 'NNI version is {}'.format(args.version))
if __name__ == '__main__':
'''NNI Trial Keeper main function'''
PARSER = argparse.ArgumentParser()
......@@ -117,10 +140,11 @@ if __name__ == '__main__':
PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
PARSER.add_argument('--version', type=str, help='the nni version transmitted from trainingService')
args, unknown = PARSER.parse_known_args()
if args.trial_command is None:
exit(1)
check_version(args)
try:
main_loop(args)
except SystemExit as se:
......
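For reference, a sketch of the trialKeeper invocation that the training services now generate with the `--version` argument; all values below are illustrative placeholders:
```
python3 -m nni_trial_tool.trial_keeper --trial_command 'python3 mnist.py' --nnimanager_ip '10.0.0.1' --nnimanager_port '8081' --version '0.5.1'
```
If the version of the installed nni (or nni-tool) package does not match the value passed via `--version`, trial_keeper logs an error and exits.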