Unverified Commit 6f272049 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Remove paiYarn mode (#3327)

parent f11aea0d
...@@ -9,7 +9,7 @@ if trial_env_vars.NNI_PLATFORM is None: ...@@ -9,7 +9,7 @@ if trial_env_vars.NNI_PLATFORM is None:
from .standalone import * from .standalone import *
elif trial_env_vars.NNI_PLATFORM == 'unittest': elif trial_env_vars.NNI_PLATFORM == 'unittest':
from .test import * from .test import *
elif trial_env_vars.NNI_PLATFORM in ('local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts', 'aml', 'adl', 'hybrid'): elif trial_env_vars.NNI_PLATFORM in ('local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'dlts', 'aml', 'adl', 'hybrid'):
from .local import * from .local import *
else: else:
raise RuntimeError('Unknown platform %s' % trial_env_vars.NNI_PLATFORM) raise RuntimeError('Unknown platform %s' % trial_env_vars.NNI_PLATFORM)
...@@ -124,7 +124,7 @@ common_schema = { ...@@ -124,7 +124,7 @@ common_schema = {
Optional('maxExecDuration'): And(Regex(r'^[1-9][0-9]*[s|m|h|d]$', error='ERROR: maxExecDuration format is [digit]{s,m,h,d}')), Optional('maxExecDuration'): And(Regex(r'^[1-9][0-9]*[s|m|h|d]$', error='ERROR: maxExecDuration format is [digit]{s,m,h,d}')),
Optional('maxTrialNum'): setNumberRange('maxTrialNum', int, 1, 99999), Optional('maxTrialNum'): setNumberRange('maxTrialNum', int, 1, 99999),
'trainingServicePlatform': setChoice( 'trainingServicePlatform': setChoice(
'trainingServicePlatform', 'remote', 'local', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts', 'aml', 'adl', 'hybrid'), 'trainingServicePlatform', 'remote', 'local', 'pai', 'kubeflow', 'frameworkcontroller', 'dlts', 'aml', 'adl', 'hybrid'),
Optional('searchSpacePath'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'searchSpacePath'), Optional('searchSpacePath'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'searchSpacePath'),
Optional('multiPhase'): setType('multiPhase', bool), Optional('multiPhase'): setType('multiPhase', bool),
Optional('multiThread'): setType('multiThread', bool), Optional('multiThread'): setType('multiThread', bool),
...@@ -178,18 +178,6 @@ pai_yarn_trial_schema = { ...@@ -178,18 +178,6 @@ pai_yarn_trial_schema = {
} }
} }
pai_yarn_config_schema = {
'paiYarnConfig': Or({
'userName': setType('userName', str),
'passWord': setType('passWord', str),
'host': setType('host', str)
}, {
'userName': setType('userName', str),
'token': setType('token', str),
'host': setType('host', str)
})
}
pai_trial_schema = { pai_trial_schema = {
'trial': { 'trial': {
...@@ -456,7 +444,6 @@ training_service_schema_dict = { ...@@ -456,7 +444,6 @@ training_service_schema_dict = {
'local': Schema({**common_schema, **common_trial_schema}), 'local': Schema({**common_schema, **common_trial_schema}),
'remote': Schema({**common_schema, **common_trial_schema, **machine_list_schema, **remote_config_schema}), 'remote': Schema({**common_schema, **common_trial_schema, **machine_list_schema, **remote_config_schema}),
'pai': Schema({**common_schema, **pai_trial_schema, **pai_config_schema}), 'pai': Schema({**common_schema, **pai_trial_schema, **pai_config_schema}),
'paiYarn': Schema({**common_schema, **pai_yarn_trial_schema, **pai_yarn_config_schema}),
'kubeflow': Schema({**common_schema, **kubeflow_trial_schema, **kubeflow_config_schema}), 'kubeflow': Schema({**common_schema, **kubeflow_trial_schema, **kubeflow_config_schema}),
'frameworkcontroller': Schema({**common_schema, **frameworkcontroller_trial_schema, **frameworkcontroller_config_schema}), 'frameworkcontroller': Schema({**common_schema, **frameworkcontroller_trial_schema, **frameworkcontroller_config_schema}),
'aml': Schema({**common_schema, **aml_trial_schema, **aml_config_schema}), 'aml': Schema({**common_schema, **aml_trial_schema, **aml_config_schema}),
...@@ -569,7 +556,7 @@ class NNIConfigSchema: ...@@ -569,7 +556,7 @@ class NNIConfigSchema:
def validate_pai_trial_conifg(self, experiment_config): def validate_pai_trial_conifg(self, experiment_config):
'''validate the trial config in pai platform''' '''validate the trial config in pai platform'''
if experiment_config.get('trainingServicePlatform') in ['pai', 'paiYarn']: if experiment_config.get('trainingServicePlatform') in ['pai']:
if experiment_config.get('trial').get('shmMB') and \ if experiment_config.get('trial').get('shmMB') and \
experiment_config['trial']['shmMB'] > experiment_config['trial']['memoryMB']: experiment_config['trial']['shmMB'] > experiment_config['trial']['memoryMB']:
raise SchemaError('shmMB should be no more than memoryMB!') raise SchemaError('shmMB should be no more than memoryMB!')
......
...@@ -205,25 +205,6 @@ def set_pai_config(experiment_config, port, config_file_name): ...@@ -205,25 +205,6 @@ def set_pai_config(experiment_config, port, config_file_name):
#set trial_config #set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message return set_trial_config(experiment_config, port, config_file_name), err_message
def set_pai_yarn_config(experiment_config, port, config_file_name):
'''set paiYarn configuration'''
pai_yarn_config_data = dict()
pai_yarn_config_data['pai_yarn_config'] = experiment_config['paiYarnConfig']
response = rest_put(cluster_metadata_url(port), json.dumps(pai_yarn_config_data), REST_TIME_OUT)
err_message = None
if not response or not response.status_code == 200:
if response is not None:
err_message = response.text
_, stderr_full_path = get_log_path(config_file_name)
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
return False, err_message
result, message = setNNIManagerIp(experiment_config, port, config_file_name)
if not result:
return result, message
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def set_kubeflow_config(experiment_config, port, config_file_name): def set_kubeflow_config(experiment_config, port, config_file_name):
'''set kubeflow configuration''' '''set kubeflow configuration'''
kubeflow_config_data = dict() kubeflow_config_data = dict()
...@@ -395,11 +376,6 @@ def set_experiment(experiment_config, mode, port, config_file_name): ...@@ -395,11 +376,6 @@ def set_experiment(experiment_config, mode, port, config_file_name):
{'key': 'pai_config', 'value': experiment_config['paiConfig']}) {'key': 'pai_config', 'value': experiment_config['paiConfig']})
request_data['clusterMetaData'].append( request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']}) {'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'paiYarn':
request_data['clusterMetaData'].append(
{'key': 'pai_yarn_config', 'value': experiment_config['paiYarnConfig']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'kubeflow': elif experiment_config['trainingServicePlatform'] == 'kubeflow':
request_data['clusterMetaData'].append( request_data['clusterMetaData'].append(
{'key': 'kubeflow_config', 'value': experiment_config['kubeflowConfig']}) {'key': 'kubeflow_config', 'value': experiment_config['kubeflowConfig']})
...@@ -453,8 +429,6 @@ def set_platform_config(platform, experiment_config, port, config_file_name, res ...@@ -453,8 +429,6 @@ def set_platform_config(platform, experiment_config, port, config_file_name, res
config_result, err_msg = set_remote_config(experiment_config, port, config_file_name) config_result, err_msg = set_remote_config(experiment_config, port, config_file_name)
elif platform == 'pai': elif platform == 'pai':
config_result, err_msg = set_pai_config(experiment_config, port, config_file_name) config_result, err_msg = set_pai_config(experiment_config, port, config_file_name)
elif platform == 'paiYarn':
config_result, err_msg = set_pai_yarn_config(experiment_config, port, config_file_name)
elif platform == 'kubeflow': elif platform == 'kubeflow':
config_result, err_msg = set_kubeflow_config(experiment_config, port, config_file_name) config_result, err_msg = set_kubeflow_config(experiment_config, port, config_file_name)
elif platform == 'frameworkcontroller': elif platform == 'frameworkcontroller':
......
...@@ -254,8 +254,6 @@ if __name__ == '__main__': ...@@ -254,8 +254,6 @@ if __name__ == '__main__':
exit(1) exit(1)
check_version(args) check_version(args)
try: try:
if NNI_PLATFORM == 'paiYarn' and is_multi_phase():
fetch_parameter_file(args)
if NNI_PLATFORM == 'adl': if NNI_PLATFORM == 'adl':
_set_adaptdl_signal_handler() _set_adaptdl_signal_handler()
main_loop(args) main_loop(args)
......
...@@ -59,22 +59,6 @@ frameworkcontroller: ...@@ -59,22 +59,6 @@ frameworkcontroller:
local: local:
trainingServicePlatform: local trainingServicePlatform: local
paiYarn:
nniManagerIp:
maxExecDuration: 15m
paiYarnConfig:
host:
passWord:
userName:
trainingServicePlatform: paiYarn
trial:
gpuNum: 1
cpuNum: 1
dataDir:
image:
memoryMB: 8192
outputDir:
virtualCluster:
pai: pai:
nniManagerIp: nniManagerIp:
maxExecDuration: 15m maxExecDuration: 15m
......
...@@ -13,21 +13,6 @@ def update_training_service_config(args): ...@@ -13,21 +13,6 @@ def update_training_service_config(args):
config = get_yml_content(TRAINING_SERVICE_FILE) config = get_yml_content(TRAINING_SERVICE_FILE)
if args.nni_manager_ip is not None: if args.nni_manager_ip is not None:
config[args.ts]['nniManagerIp'] = args.nni_manager_ip config[args.ts]['nniManagerIp'] = args.nni_manager_ip
if args.ts == 'paiYarn':
if args.pai_user is not None:
config[args.ts]['paiYarnConfig']['userName'] = args.pai_user
if args.pai_pwd is not None:
config[args.ts]['paiYarnConfig']['passWord'] = args.pai_pwd
if args.pai_host is not None:
config[args.ts]['paiYarnConfig']['host'] = args.pai_host
if args.nni_docker_image is not None:
config[args.ts]['trial']['image'] = args.nni_docker_image
if args.data_dir is not None:
config[args.ts]['trial']['dataDir'] = args.data_dir
if args.output_dir is not None:
config[args.ts]['trial']['outputDir'] = args.output_dir
if args.vc is not None:
config[args.ts]['trial']['virtualCluster'] = args.vc
if args.ts == 'pai': if args.ts == 'pai':
if args.pai_user is not None: if args.pai_user is not None:
config[args.ts]['paiConfig']['userName'] = args.pai_user config[args.ts]['paiConfig']['userName'] = args.pai_user
......
...@@ -25,7 +25,6 @@ import { AdlTrainingService } from './training_service/kubernetes/adl/adlTrainin ...@@ -25,7 +25,6 @@ import { AdlTrainingService } from './training_service/kubernetes/adl/adlTrainin
import { KubeflowTrainingService } from './training_service/kubernetes/kubeflow/kubeflowTrainingService'; import { KubeflowTrainingService } from './training_service/kubernetes/kubeflow/kubeflowTrainingService';
import { LocalTrainingService } from './training_service/local/localTrainingService'; import { LocalTrainingService } from './training_service/local/localTrainingService';
import { RouterTrainingService } from './training_service/reusable/routerTrainingService'; import { RouterTrainingService } from './training_service/reusable/routerTrainingService';
import { PAIYarnTrainingService } from './training_service/pai/paiYarn/paiYarnTrainingService';
import { DLTSTrainingService } from './training_service/dlts/dltsTrainingService'; import { DLTSTrainingService } from './training_service/dlts/dltsTrainingService';
...@@ -46,10 +45,6 @@ async function initContainer(foreground: boolean, platformMode: string, logFileN ...@@ -46,10 +45,6 @@ async function initContainer(foreground: boolean, platformMode: string, logFileN
Container.bind(TrainingService) Container.bind(TrainingService)
.to(LocalTrainingService) .to(LocalTrainingService)
.scope(Scope.Singleton); .scope(Scope.Singleton);
} else if (platformMode === 'paiYarn') {
Container.bind(TrainingService)
.to(PAIYarnTrainingService)
.scope(Scope.Singleton);
} else if (platformMode === 'kubeflow') { } else if (platformMode === 'kubeflow') {
Container.bind(TrainingService) Container.bind(TrainingService)
.to(KubeflowTrainingService) .to(KubeflowTrainingService)
...@@ -97,7 +92,7 @@ async function initContainer(foreground: boolean, platformMode: string, logFileN ...@@ -97,7 +92,7 @@ async function initContainer(foreground: boolean, platformMode: string, logFileN
function usage(): void { function usage(): void {
console.info('usage: node main.js --port <port> --mode \ console.info('usage: node main.js --port <port> --mode \
<local/remote/pai/kubeflow/frameworkcontroller/paiYarn/aml/adl/hybrid> --start_mode <new/resume> --experiment_id <id> --foreground <true/false>'); <local/remote/pai/kubeflow/frameworkcontroller/aml/adl/hybrid> --start_mode <new/resume> --experiment_id <id> --foreground <true/false>');
} }
const strPort: string = parseArg(['--port', '-p']); const strPort: string = parseArg(['--port', '-p']);
...@@ -117,7 +112,7 @@ const foreground: boolean = foregroundArg.toLowerCase() === 'true' ? true : fals ...@@ -117,7 +112,7 @@ const foreground: boolean = foregroundArg.toLowerCase() === 'true' ? true : fals
const port: number = parseInt(strPort, 10); const port: number = parseInt(strPort, 10);
const mode: string = parseArg(['--mode', '-m']); const mode: string = parseArg(['--mode', '-m']);
if (!['local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts', 'aml', 'adl', 'hybrid'].includes(mode)) { if (!['local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'dlts', 'aml', 'adl', 'hybrid'].includes(mode)) {
console.log(`FATAL: unknown mode: ${mode}`); console.log(`FATAL: unknown mode: ${mode}`);
usage(); usage();
process.exit(1); process.exit(1);
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
'use strict'; 'use strict';
import { TrialJobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService'; import { TrialJobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService';
import {TrialConfig} from '../common/trialConfig';
export class PAIClusterConfig { export class PAIClusterConfig {
public readonly userName: string; public readonly userName: string;
...@@ -71,3 +72,37 @@ export class PAITrialJobDetail implements TrialJobDetail { ...@@ -71,3 +72,37 @@ export class PAITrialJobDetail implements TrialJobDetail {
this.paiJobDetailUrl = paiJobDetailUrl; this.paiJobDetailUrl = paiJobDetailUrl;
} }
} }
// Shell command template used to launch one NNI trial inside a PAI container.
// Placeholders (filled via String.Format): {0} sys dir, {1} output dir,
// {2} trial job id, {3} experiment id, {4} trial sequence id, {5} multi-phase flag,
// {6} code dir, {7} user trial command, {8} NNI manager ip, {9} NNI manager port,
// {10} NNI manager version, {11} log-collection setting.
export const PAI_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} \
&& NNI_CODE_DIR={6} && mkdir -p $NNI_SYS_DIR/code && cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR/code && sh $NNI_SYS_DIR/install_nni.sh \
&& cd $NNI_SYS_DIR/code && python3 -m nni.tools.trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' \
--nni_manager_version '{10}' --log_collection '{11}' | tee $NNI_OUTPUT_DIR/trial.log`;
/**
 * PAI trial configuration.
 *
 * Extends the base TrialConfig (command, codeDir, gpuNum) with the
 * OpenPAI-specific resource and NFS-mount settings a trial needs.
 */
export class NNIPAITrialConfig extends TrialConfig {
    // CPU cores requested per trial container.
    public readonly cpuNum: number;
    // Memory in MB requested per trial container.
    public readonly memoryMB: number;
    // Docker image the trial runs in.
    public readonly image: string;
    // Optional PAI virtual cluster to submit the job into.
    public virtualCluster?: string;
    // Where the shared NFS storage is mounted on the NNI manager machine.
    public readonly nniManagerNFSMountPath: string;
    // Where the same NFS storage is mounted inside the trial container.
    public readonly containerNFSMountPath: string;
    // Name of the PAI storage config attached to the job.
    public readonly paiStorageConfigName: string;
    // Optional path to a user-provided PAI job config YAML used as a template.
    public readonly paiConfigPath?: string;

    constructor(command: string, codeDir: string, gpuNum: number, cpuNum: number, memoryMB: number,
        image: string, nniManagerNFSMountPath: string, containerNFSMountPath: string,
        paiStorageConfigName: string, virtualCluster?: string, paiConfigPath?: string) {
        super(command, codeDir, gpuNum);
        this.cpuNum = cpuNum;
        this.memoryMB = memoryMB;
        this.image = image;
        this.virtualCluster = virtualCluster;
        this.nniManagerNFSMountPath = nniManagerNFSMountPath;
        this.containerNFSMountPath = containerNFSMountPath;
        this.paiStorageConfigName = paiStorageConfigName;
        this.paiConfigPath = paiConfigPath;
    }
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import {TrialConfig} from '../../common/trialConfig';
/**
 * PAI trial configuration for the Kubernetes-based OpenPAI deployment.
 *
 * Extends the base TrialConfig (command, codeDir, gpuNum) with resource and
 * NFS-mount settings. NOTE(review): field-for-field identical to
 * NNIPAITrialConfig elsewhere in this commit — presumably kept separate per
 * training-service module; confirm before attempting to merge them.
 */
export class NNIPAIK8STrialConfig extends TrialConfig {
    // CPU cores requested per trial container.
    public readonly cpuNum: number;
    // Memory in MB requested per trial container.
    public readonly memoryMB: number;
    // Docker image the trial runs in.
    public readonly image: string;
    // Optional PAI virtual cluster to submit the job into.
    public virtualCluster?: string;
    // Where the shared NFS storage is mounted on the NNI manager machine.
    public readonly nniManagerNFSMountPath: string;
    // Where the same NFS storage is mounted inside the trial container.
    public readonly containerNFSMountPath: string;
    // Name of the PAI storage config attached to the job.
    public readonly paiStorageConfigName: string;
    // Optional path to a user-provided PAI job config YAML used as a template.
    public readonly paiConfigPath?: string;

    constructor(command: string, codeDir: string, gpuNum: number, cpuNum: number, memoryMB: number,
        image: string, nniManagerNFSMountPath: string, containerNFSMountPath: string,
        paiStorageConfigName: string, virtualCluster?: string, paiConfigPath?: string) {
        super(command, codeDir, gpuNum);
        this.cpuNum = cpuNum;
        this.memoryMB = memoryMB;
        this.image = image;
        this.virtualCluster = virtualCluster;
        this.nniManagerNFSMountPath = nniManagerNFSMountPath;
        this.containerNFSMountPath = containerNFSMountPath;
        this.paiStorageConfigName = paiStorageConfigName;
        this.paiConfigPath = paiConfigPath;
    }
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
// Bootstrap script written into each trial's working folder as install_nni.sh
// and executed by the trial command; installs nni via pip only when the module
// is not already importable.
// NOTE(review): `return` outside a function is only valid in a sourced script —
// here the file is run with `sh install_nni.sh`, so the shell may report an
// error on that line (though the install is still skipped); confirm intended.
export const PAI_INSTALL_NNI_SHELL_FORMAT: string =
`#!/bin/bash
if python3 -c 'import nni' > /dev/null 2>&1; then
# nni module is already installed, skip
return
else
# Install nni
python3 -m pip install --user nni
fi`;
// Shell command template launching one NNI trial in a PAI (Kubernetes) container.
// Placeholders (filled via String.Format in generateNNITrialCommand):
// {0} sys dir, {1} output dir, {2} trial job id, {3} experiment id,
// {4} trial sequence id, {5} multi-phase flag, {6} code dir, {7} user trial
// command, {8} NNI manager ip, {9} NNI manager port, {10} NNI manager version,
// {11} log-collection setting.
export const PAI_K8S_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} \
&& NNI_CODE_DIR={6} && mkdir -p $NNI_SYS_DIR/code && cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR/code && sh $NNI_SYS_DIR/install_nni.sh \
&& cd $NNI_SYS_DIR/code && python3 -m nni.tools.trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' \
--nni_manager_version '{10}' --log_collection '{11}' | tee $NNI_OUTPUT_DIR/trial.log`;
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as fs from 'fs';
import * as path from 'path';
// tslint:disable-next-line:no-implicit-dependencies
import * as request from 'request';
import * as component from '../../../common/component';
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
import {
HyperParameters, NNIManagerIpConfig,
TrialJobApplicationForm, TrialJobDetail
} from '../../../common/trainingService';
import {
generateParamFileName,
getIPV4Address, getVersion, uniqueString
} from '../../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { execMkdir, validateCodeDir, execCopydir } from '../../common/util';
import { PAI_K8S_TRIAL_COMMAND_FORMAT } from './paiK8SData';
import { NNIPAIK8STrialConfig } from './paiK8SConfig';
import { PAITrainingService } from '../paiTrainingService';
import { PAIClusterConfig, PAITrialJobDetail } from '../paiConfig';
import { PAIJobRestServer } from '../paiJobRestServer';
const yaml = require('js-yaml');
/**
 * Training Service implementation for OpenPAI (Open Platform for AI)
 * Refer https://github.com/Microsoft/pai for more info about OpenPAI
 *
 * This subclass targets OpenPAI's v2 (Kubernetes-based) REST API: it builds a
 * protocolVersion-2 job config as YAML and submits it to
 * /rest-server/api/v2/jobs with a bearer token.
 */
@component.Singleton
class PAIK8STrainingService extends PAITrainingService {
    protected paiTrialConfig: NNIPAIK8STrialConfig | undefined;
    // Pending copy of the experiment codeDir to the NFS mount; started in
    // setClusterMetadata, awaited once in submitTrialJobToPAI, then cleared.
    private copyExpCodeDirPromise?: Promise<void>;
    // Parsed content of the user-supplied paiConfigPath YAML file, if any.
    private paiJobConfig: any;
    // NNI manager version forwarded to the trial keeper ('' when version check is off).
    private nniVersion: string | undefined;

    constructor() {
        super();
    }

    /**
     * Cache one piece of cluster/trial metadata pushed by the NNI manager.
     *
     * @param key - a TrialConfigMetadataKey value selecting what `value` holds.
     * @param value - the serialized payload (JSON or plain string, key-dependent).
     */
    public async setClusterMetadata(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.NNI_MANAGER_IP:
                this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
                break;
            case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
                // The REST server is created as soon as the cluster config arrives,
                // before any trial is submitted.
                this.paiJobRestServer = new PAIJobRestServer(component.get(PAIK8STrainingService));
                this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);
                this.paiClusterConfig.host = this.formatPAIHost(this.paiClusterConfig.host);
                this.paiToken = this.paiClusterConfig.token;
                break;
            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                // Trial config depends on cluster config having been set first.
                if (this.paiClusterConfig === undefined) {
                    this.log.error('pai cluster config is not initialized');
                    break;
                }
                this.paiTrialConfig = <NNIPAIK8STrialConfig>JSON.parse(value);
                // Validate to make sure codeDir doesn't have too many files
                await validateCodeDir(this.paiTrialConfig.codeDir);
                const nniManagerNFSExpCodeDir = path.join(this.paiTrialConfig.nniManagerNFSMountPath, this.experimentId, 'nni-code');
                await execMkdir(nniManagerNFSExpCodeDir);
                //Copy codeDir files to local working folder
                this.log.info(`Starting copy codeDir data from ${this.paiTrialConfig.codeDir} to ${nniManagerNFSExpCodeDir}`);
                // Deliberately not awaited here: submitTrialJobToPAI waits for this
                // promise before the first submission.
                this.copyExpCodeDirPromise = execCopydir(this.paiTrialConfig.codeDir, nniManagerNFSExpCodeDir);
                if (this.paiTrialConfig.paiConfigPath) {
                    this.paiJobConfig = yaml.safeLoad(fs.readFileSync(this.paiTrialConfig.paiConfigPath, 'utf8'));
                }
                break;
            }
            case TrialConfigMetadataKey.VERSION_CHECK:
                this.versionCheck = (value === 'true' || value === 'True');
                this.nniVersion = this.versionCheck ? await getVersion() : '';
                break;
            case TrialConfigMetadataKey.LOG_COLLECTION:
                this.logCollection = value;
                break;
            case TrialConfigMetadataKey.MULTI_PHASE:
                this.isMultiPhase = (value === 'true' || value === 'True');
                break;
            default:
                //Reject for unknown keys
                // NOTE(review): "Uknown" typo lives in the runtime log string and is
                // intentionally left unchanged here.
                this.log.error(`Uknown key: ${key}`);
        }
    }

    // update trial parameters for multi-phase
    /**
     * Write the new hyper-parameters for an existing trial to its NFS log folder,
     * where the trial keeper picks them up.
     * @throws Error when the trial job id is unknown.
     */
    public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
        }
        // Write file content ( parameter.cfg ) to working folders
        await this.writeParameterFile(trialJobDetail.logPath, form.hyperParameters);

        return trialJobDetail;
    }

    /**
     * Register a new trial: build its detail record and push it onto the job
     * queue for the base class's submit loop. No REST call happens here —
     * actual submission is done later by submitTrialJobToPAI.
     */
    public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
        if (this.paiClusterConfig === undefined) {
            throw new Error(`paiClusterConfig not initialized!`);
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error(`paiTrialConfig not initialized!`);
        }
        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
        const trialJobId: string = uniqueString(5);
        //TODO: use HDFS working folder instead
        const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
        const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;
        // Trial logs live on the NFS share so both NNI manager and container see them.
        const logPath: string = path.join(this.paiTrialConfig.nniManagerNFSMountPath, this.experimentId, trialJobId);
        const paiJobDetailUrl: string = `${this.protocol}://${this.paiClusterConfig.host}/job-detail.html?username=${this.paiClusterConfig.userName}&jobName=${paiJobName}`;
        const trialJobDetail: PAITrialJobDetail = new PAITrialJobDetail(
            trialJobId,
            'WAITING',
            paiJobName,
            Date.now(),
            trialWorkingFolder,
            form,
            logPath,
            paiJobDetailUrl);

        this.trialJobsMap.set(trialJobId, trialJobDetail);
        this.jobQueue.push(trialJobId);

        return trialJobDetail;
    }

    /**
     * Expand PAI_K8S_TRIAL_COMMAND_FORMAT into the single-line shell command
     * that runs the trial keeper inside the container (newlines are stripped
     * so the command survives YAML serialization).
     */
    private generateNNITrialCommand(trialJobDetail: PAITrialJobDetail, command: string): string {
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        const containerNFSExpCodeDir = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/nni-code`;
        const containerWorkingDir: string = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/${trialJobDetail.id}`;
        const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
        const nniPaiTrialCommand: string = String.Format(
            PAI_K8S_TRIAL_COMMAND_FORMAT,
            `${containerWorkingDir}`,
            `${containerWorkingDir}/nnioutput`,
            trialJobDetail.id,
            this.experimentId,
            trialJobDetail.form.sequenceId,
            this.isMultiPhase,
            containerNFSExpCodeDir,
            command,
            nniManagerIp,
            this.paiRestServerPort,
            this.nniVersion,
            this.logCollection
        )
            .replace(/\r\n|\n|\r/gm, '');

        return nniPaiTrialCommand;
    }

    /**
     * Build the OpenPAI v2 job config for one trial and return it as YAML.
     * If the user supplied paiConfigPath, their config is deep-cloned and each
     * task role's commands are wrapped in the NNI trial command; otherwise a
     * default single-taskrole config is generated from the trial config.
     */
    private generateJobConfigInYamlFormat(trialJobDetail: PAITrialJobDetail): any {
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        const jobName = `nni_exp_${this.experimentId}_trial_${trialJobDetail.id}`
        let nniJobConfig: any = undefined;
        if (this.paiTrialConfig.paiConfigPath) {
            nniJobConfig = JSON.parse(JSON.stringify(this.paiJobConfig)); //Trick for deep clone in Typescript
            nniJobConfig.name = jobName;
            // Each taskRole will generate new command in NNI's command format
            // Each command will be formatted to NNI style
            for (const taskRoleIndex in nniJobConfig.taskRoles) {
                const commands = nniJobConfig.taskRoles[taskRoleIndex].commands
                // Escape shell metacharacters so the joined command survives re-quoting.
                const nniTrialCommand = this.generateNNITrialCommand(trialJobDetail, commands.join(" && ").replace(/(["'$`\\])/g, '\\$1'));
                nniJobConfig.taskRoles[taskRoleIndex].commands = [nniTrialCommand]
            }

        } else {
            nniJobConfig = {
                protocolVersion: 2,
                name: jobName,
                type: 'job',
                jobRetryCount: 0,
                prerequisites: [
                    {
                        type: 'dockerimage',
                        uri: this.paiTrialConfig.image,
                        name: 'docker_image_0'
                    }
                ],
                taskRoles: {
                    taskrole: {
                        instances: 1,
                        completion: {
                            minFailedInstances: 1,
                            minSucceededInstances: -1
                        },
                        taskRetryCount: 0,
                        dockerImage: 'docker_image_0',
                        resourcePerInstance: {
                            gpu: this.paiTrialConfig.gpuNum,
                            cpu: this.paiTrialConfig.cpuNum,
                            memoryMB: this.paiTrialConfig.memoryMB
                        },
                        commands: [
                            this.generateNNITrialCommand(trialJobDetail, this.paiTrialConfig.command)
                        ]
                    }
                },
                extras: {
                    'storages': [
                        {
                            name: this.paiTrialConfig.paiStorageConfigName
                        }
                    ],
                    submitFrom: 'submit-job-v2'
                }
            }
            // NOTE(review): this virtualCluster default is applied only in the
            // generated-config branch; when paiConfigPath is supplied the trial
            // config's virtualCluster appears to be ignored — confirm intended.
            if (this.paiTrialConfig.virtualCluster) {
                nniJobConfig.defaults = {
                    virtualCluster: this.paiTrialConfig.virtualCluster
                }
            }
        }
        return yaml.safeDump(nniJobConfig);
    }

    /**
     * Submit one queued trial to the OpenPAI cluster.
     *
     * Waits for the one-time codeDir copy, prepares the trial's local working
     * folder (install script + parameter file), then POSTs the YAML job config
     * to the PAI v2 jobs endpoint with the cached bearer token.
     *
     * @returns a promise resolving to true on success, rejecting with the
     *          error message when PAI refuses the job.
     */
    protected async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
        if (trialJobDetail === undefined) {
            throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
        }
        if (this.paiClusterConfig === undefined) {
            throw new Error('PAI Cluster config is not initialized');
        }
        if (this.paiTrialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        if (this.paiToken === undefined) {
            throw new Error('PAI token is not initialized');
        }

        if (this.paiJobRestServer === undefined) {
            throw new Error('paiJobRestServer is not initialized');
        }

        // Make sure experiment code files is copied from local to NFS
        if (this.copyExpCodeDirPromise !== undefined) {
            await this.copyExpCodeDirPromise;
            this.log.info(`Copy codeDir data finished.`);
            // All trials share same destination NFS code folder, only copy codeDir once for an experiment.
            // After copy data finished, set copyExpCodeDirPromise be undefined to avoid log content duplicated.
            this.copyExpCodeDirPromise = undefined;
        }

        this.paiRestServerPort = this.paiJobRestServer.clusterRestServerPort;

        // Step 1. Prepare PAI job configuration
        //create trial local working folder locally.
        await execMkdir(trialJobDetail.logPath);
        // Write NNI installation file to local files
        await fs.promises.writeFile(path.join(trialJobDetail.logPath, 'install_nni.sh'), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });

        // Write file content ( parameter.cfg ) to local working folders
        if (trialJobDetail.form !== undefined) {
            await this.writeParameterFile(trialJobDetail.logPath, trialJobDetail.form.hyperParameters);
        }

        //Generate Job Configuration in yaml format
        const paiJobConfig = this.generateJobConfigInYamlFormat(trialJobDetail);
        this.log.debug(paiJobConfig);

        // Step 2. Submit PAI job via Rest call
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const submitJobRequest: request.Options = {
            uri: `${this.protocol}://${this.paiClusterConfig.host}/rest-server/api/v2/jobs`,
            method: 'POST',
            body: paiJobConfig,
            followAllRedirects: true,
            headers: {
                'Content-Type': 'text/yaml',
                Authorization: `Bearer ${this.paiToken}`
            }
        };
        request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
            // If submit success, will get status code 202. refer: https://github.com/microsoft/pai/blob/master/src/rest-server/docs/swagger.yaml
            if ((error !== undefined && error !== null) || response.statusCode >= 400) {
                const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                    `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${body}`;

                this.log.error(errorMessage);
                trialJobDetail.status = 'FAILED';
                deferred.reject(errorMessage);
            } else {
                trialJobDetail.submitTime = Date.now();
            }
            // A promise settles only once, so this resolve is a no-op when the
            // error branch above already rejected.
            deferred.resolve(true);
        });

        return deferred.promise;
    }

    // Persist one hyper-parameter payload as parameter[_N].cfg in the trial's NFS folder.
    private async writeParameterFile(directory: string, hyperParameters: HyperParameters): Promise<void> {
        const filepath: string = path.join(directory, generateParamFileName(hyperParameters));
        await fs.promises.writeFile(filepath, hyperParameters.value, { encoding: 'utf8' });
    }
}

export { PAIK8STrainingService };
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
'use strict'; 'use strict';
import * as fs from 'fs';
import * as path from 'path'; import * as path from 'path';
import * as request from 'request'; import * as request from 'request';
import * as component from '../../common/component'; import * as component from '../../common/component';
...@@ -13,41 +14,53 @@ import { getExperimentId } from '../../common/experimentStartupInfo'; ...@@ -13,41 +14,53 @@ import { getExperimentId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log'; import { getLogger, Logger } from '../../common/log';
import { MethodNotImplementedError } from '../../common/errors'; import { MethodNotImplementedError } from '../../common/errors';
import { import {
NNIManagerIpConfig, TrainingService, HyperParameters, NNIManagerIpConfig, TrainingService,
TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, LogType TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, LogType
} from '../../common/trainingService'; } from '../../common/trainingService';
import { delay } from '../../common/utils'; import { delay } from '../../common/utils';
import { PAIJobInfoCollector } from './paiJobInfoCollector'; import { PAIJobInfoCollector } from './paiJobInfoCollector';
import { PAIJobRestServer } from './paiJobRestServer'; import { PAIJobRestServer } from './paiJobRestServer';
import { PAIClusterConfig, PAITrialJobDetail } from './paiConfig'; import { PAIClusterConfig, PAITrialJobDetail, PAI_TRIAL_COMMAND_FORMAT, NNIPAITrialConfig } from './paiConfig';
import { String } from 'typescript-string-operations';
import {
generateParamFileName,
getIPV4Address, getVersion, uniqueString
} from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { execMkdir, validateCodeDir, execCopydir } from '../common/util';
const yaml = require('js-yaml');
/** /**
* Training Service implementation for OpenPAI (Open Platform for AI) * Training Service implementation for OpenPAI (Open Platform for AI)
* Refer https://github.com/Microsoft/pai for more info about OpenPAI * Refer https://github.com/Microsoft/pai for more info about OpenPAI
*/ */
@component.Singleton @component.Singleton
abstract class PAITrainingService implements TrainingService { class PAITrainingService implements TrainingService {
protected readonly log!: Logger; private readonly log!: Logger;
protected readonly metricsEmitter: EventEmitter; private readonly metricsEmitter: EventEmitter;
protected readonly trialJobsMap: Map<string, PAITrialJobDetail>; private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
protected readonly expRootDir: string; private readonly expRootDir: string;
protected paiClusterConfig?: PAIClusterConfig; private paiClusterConfig?: PAIClusterConfig;
protected readonly jobQueue: string[]; private readonly jobQueue: string[];
protected stopping: boolean = false; private stopping: boolean = false;
protected paiToken?: string; private paiToken?: string;
protected paiTokenUpdateTime?: number; private paiTokenUpdateTime?: number;
protected readonly paiTokenUpdateInterval: number; private readonly paiTokenUpdateInterval: number;
protected readonly experimentId!: string; private readonly experimentId!: string;
protected readonly paiJobCollector: PAIJobInfoCollector; private readonly paiJobCollector: PAIJobInfoCollector;
protected paiRestServerPort?: number; private paiRestServerPort?: number;
protected nniManagerIpConfig?: NNIManagerIpConfig; private nniManagerIpConfig?: NNIManagerIpConfig;
protected versionCheck: boolean = true; private versionCheck: boolean = true;
protected logCollection: string; private logCollection: string;
protected isMultiPhase: boolean = false; private isMultiPhase: boolean = false;
protected authFileHdfsPath: string | undefined = undefined; private paiJobRestServer?: PAIJobRestServer;
protected portList?: string | undefined; private protocol: string = 'http';
protected paiJobRestServer?: PAIJobRestServer; private copyExpCodeDirPromise?: Promise<void>;
protected protocol: string = 'http'; private paiJobConfig: any;
private nniVersion: string | undefined;
private paiTrialConfig: NNIPAITrialConfig | undefined;
constructor() { constructor() {
this.log = getLogger(); this.log = getLogger();
...@@ -76,18 +89,6 @@ abstract class PAITrainingService implements TrainingService { ...@@ -76,18 +89,6 @@ abstract class PAITrainingService implements TrainingService {
this.log.info('PAI training service exit.'); this.log.info('PAI training service exit.');
} }
public async submitTrialJob(_form: TrialJobApplicationForm): Promise<any> {
throw new Error('Not implemented!');
}
public async updateTrialJob(_trialJobId: string, _form: TrialJobApplicationForm): Promise<TrialJobDetail> {
throw new Error('Not implemented!');
}
protected async submitTrialJobToPAI(_trialJobId: string): Promise<boolean> {
throw new Error('Not implemented!');
}
protected async submitJobLoop(): Promise<void> { protected async submitJobLoop(): Promise<void> {
while (!this.stopping) { while (!this.stopping) {
while (!this.stopping && this.jobQueue.length > 0) { while (!this.stopping && this.jobQueue.length > 0) {
...@@ -104,10 +105,6 @@ abstract class PAITrainingService implements TrainingService { ...@@ -104,10 +105,6 @@ abstract class PAITrainingService implements TrainingService {
} }
} }
public async setClusterMetadata(_key: string, _value: string): Promise<void> {
throw new Error('Not implemented!');
}
public async listTrialJobs(): Promise<TrialJobDetail[]> { public async listTrialJobs(): Promise<TrialJobDetail[]> {
const jobs: TrialJobDetail[] = []; const jobs: TrialJobDetail[] = [];
...@@ -311,6 +308,274 @@ abstract class PAITrainingService implements TrainingService { ...@@ -311,6 +308,274 @@ abstract class PAITrainingService implements TrainingService {
return Promise.race([timeoutDelay, deferred.promise]) return Promise.race([timeoutDelay, deferred.promise])
.finally(() => { clearTimeout(timeoutId); }); .finally(() => { clearTimeout(timeoutId); });
} }
/**
 * Receive cluster-level metadata pushed down from the NNI manager and
 * initialize the corresponding service state.
 *
 * Handled keys:
 *  - NNI_MANAGER_IP: address the trial keeper reports metrics back to.
 *  - PAI_CLUSTER_CONFIG: OpenPAI host/user/token; also starts the REST server.
 *  - TRIAL_CONFIG: trial command/resources; validates codeDir and kicks off
 *    the asynchronous copy of codeDir to the NFS folder shared with trials.
 *  - VERSION_CHECK / LOG_COLLECTION / MULTI_PHASE: feature toggles.
 *
 * @param key one of TrialConfigMetadataKey
 * @param value JSON or plain-string payload for that key
 * @throws Error when the key is not recognized
 */
public async setClusterMetadata(key: string, value: string): Promise<void> {
    switch (key) {
        case TrialConfigMetadataKey.NNI_MANAGER_IP:
            this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
            break;
        case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
            this.paiJobRestServer = new PAIJobRestServer(component.get(PAITrainingService));
            this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);
            this.paiClusterConfig.host = this.formatPAIHost(this.paiClusterConfig.host);
            this.paiToken = this.paiClusterConfig.token;
            break;
        case TrialConfigMetadataKey.TRIAL_CONFIG: {
            if (this.paiClusterConfig === undefined) {
                this.log.error('pai cluster config is not initialized');
                break;
            }
            this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);
            // Validate to make sure codeDir doesn't have too many files
            await validateCodeDir(this.paiTrialConfig.codeDir);
            const nniManagerNFSExpCodeDir = path.join(this.paiTrialConfig.nniManagerNFSMountPath, this.experimentId, 'nni-code');
            await execMkdir(nniManagerNFSExpCodeDir);
            // Copy codeDir asynchronously; submitTrialJobToPAI awaits this
            // promise before the first trial is submitted.
            this.log.info(`Starting copy codeDir data from ${this.paiTrialConfig.codeDir} to ${nniManagerNFSExpCodeDir}`);
            this.copyExpCodeDirPromise = execCopydir(this.paiTrialConfig.codeDir, nniManagerNFSExpCodeDir);
            if (this.paiTrialConfig.paiConfigPath) {
                // User supplied a full OpenPAI job config; load it as the template.
                this.paiJobConfig = yaml.safeLoad(fs.readFileSync(this.paiTrialConfig.paiConfigPath, 'utf8'));
            }
            break;
        }
        case TrialConfigMetadataKey.VERSION_CHECK:
            this.versionCheck = (value === 'true' || value === 'True');
            this.nniVersion = this.versionCheck ? await getVersion() : '';
            break;
        case TrialConfigMetadataKey.LOG_COLLECTION:
            this.logCollection = value;
            break;
        case TrialConfigMetadataKey.MULTI_PHASE:
            this.isMultiPhase = (value === 'true' || value === 'True');
            break;
        default:
            // Reject unknown keys. Previously this only logged the error,
            // silently ignoring misconfiguration; the comment already said
            // "Reject" and the sibling yarn service throws here.
            throw new Error(`Unknown key: ${key}`);
    }
}
/**
 * Multi-phase support: persist the updated hyper-parameters so the running
 * trial can pick them up from its working folder.
 */
public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
    const detail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
    if (detail === undefined) {
        throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
    }
    // Drop the new parameter file ( parameter.cfg ) into the trial's log folder.
    await this.writeParameterFile(detail.logPath, form.hyperParameters);
    return detail;
}
/**
 * Register a new trial: build its detail record and put it on the queue;
 * the background submitJobLoop performs the actual submission to OpenPAI.
 */
public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
    if (this.paiClusterConfig === undefined) {
        throw new Error(`paiClusterConfig not initialized!`);
    }
    if (this.paiTrialConfig === undefined) {
        throw new Error(`paiTrialConfig not initialized!`);
    }
    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

    const jobId: string = uniqueString(5);
    const jobName: string = `nni_exp_${this.experimentId}_trial_${jobId}`;
    //TODO: use HDFS working folder instead
    const workingFolder: string = path.join(this.expRootDir, 'trials', jobId);
    // Trial output lives on the NFS share mounted into the container.
    const nfsLogPath: string = path.join(this.paiTrialConfig.nniManagerNFSMountPath, this.experimentId, jobId);
    const detailUrl: string = `${this.protocol}://${this.paiClusterConfig.host}/job-detail.html?username=${this.paiClusterConfig.userName}&jobName=${jobName}`;

    const detail: PAITrialJobDetail = new PAITrialJobDetail(
        jobId,
        'WAITING',
        jobName,
        Date.now(),
        workingFolder,
        form,
        nfsLogPath,
        detailUrl);
    this.trialJobsMap.set(jobId, detail);
    this.jobQueue.push(jobId);
    return detail;
}
/**
 * Wrap the user's trial command with the NNI trial-keeper bootstrap
 * (environment variables, install script, trial_keeper invocation).
 */
private generateNNITrialCommand(trialJobDetail: PAITrialJobDetail, command: string): string {
    if (this.paiTrialConfig === undefined) {
        throw new Error('trial config is not initialized');
    }
    // Paths as seen from inside the trial container (NFS mount).
    const expCodeDir: string = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/nni-code`;
    const workingDir: string = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/${trialJobDetail.id}`;
    const outputDir: string = `${workingDir}/nnioutput`;
    const managerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
    // Newlines are stripped because the command is embedded into a single
    // YAML scalar in the job config.
    return String.Format(
        PAI_TRIAL_COMMAND_FORMAT,
        workingDir,
        outputDir,
        trialJobDetail.id,
        this.experimentId,
        trialJobDetail.form.sequenceId,
        this.isMultiPhase,
        expCodeDir,
        command,
        managerIp,
        this.paiRestServerPort,
        this.nniVersion,
        this.logCollection
    ).replace(/\r\n|\n|\r/gm, '');
}
/**
 * Build the OpenPAI v2 job config (as a YAML string) for one trial.
 *
 * Two modes:
 *  - paiConfigPath set: deep-clone the user's YAML job config and rewrite
 *    every task role's commands into a single NNI trial-keeper command.
 *  - otherwise: synthesize a minimal single-task-role config from the
 *    trial config (image, cpu/gpu/memory, storage, virtual cluster).
 */
private generateJobConfigInYamlFormat(trialJobDetail: PAITrialJobDetail): any {
    if (this.paiTrialConfig === undefined) {
        throw new Error('trial config is not initialized');
    }
    const jobName = `nni_exp_${this.experimentId}_trial_${trialJobDetail.id}`
    let nniJobConfig: any = undefined;
    if (this.paiTrialConfig.paiConfigPath) {
        nniJobConfig = JSON.parse(JSON.stringify(this.paiJobConfig)); //Trick for deep clone in Typescript
        nniJobConfig.name = jobName;
        // Each taskRole will generate new command in NNI's command format
        // Each command will be formatted to NNI style
        for (const taskRoleIndex in nniJobConfig.taskRoles) {
            // Join the role's commands and escape shell-sensitive characters
            // before embedding them into the trial-keeper invocation.
            const commands = nniJobConfig.taskRoles[taskRoleIndex].commands
            const nniTrialCommand = this.generateNNITrialCommand(trialJobDetail, commands.join(" && ").replace(/(["'$`\\])/g, '\\$1'));
            nniJobConfig.taskRoles[taskRoleIndex].commands = [nniTrialCommand]
        }
    } else {
        nniJobConfig = {
            protocolVersion: 2,
            name: jobName,
            type: 'job',
            jobRetryCount: 0,
            prerequisites: [
                {
                    type: 'dockerimage',
                    uri: this.paiTrialConfig.image,
                    name: 'docker_image_0'
                }
            ],
            taskRoles: {
                taskrole: {
                    instances: 1,
                    // Fail the job as soon as the single instance fails.
                    completion: {
                        minFailedInstances: 1,
                        minSucceededInstances: -1
                    },
                    taskRetryCount: 0,
                    dockerImage: 'docker_image_0',
                    resourcePerInstance: {
                        gpu: this.paiTrialConfig.gpuNum,
                        cpu: this.paiTrialConfig.cpuNum,
                        memoryMB: this.paiTrialConfig.memoryMB
                    },
                    commands: [
                        this.generateNNITrialCommand(trialJobDetail, this.paiTrialConfig.command)
                    ]
                }
            },
            extras: {
                'storages': [
                    {
                        name: this.paiTrialConfig.paiStorageConfigName
                    }
                ],
                submitFrom: 'submit-job-v2'
            }
        }
        // Virtual cluster default only applies to the synthesized config;
        // a user-supplied paiConfigPath carries its own defaults.
        if (this.paiTrialConfig.virtualCluster) {
            nniJobConfig.defaults = {
                virtualCluster: this.paiTrialConfig.virtualCluster
            }
        }
    }
    return yaml.safeDump(nniJobConfig);
}
/**
 * Submit one queued trial to OpenPAI through the v2 REST API.
 * Waits for the one-time codeDir upload, stages the install script and
 * parameter file into the trial's NFS folder, then POSTs the YAML job
 * config. On HTTP/network failure the trial is marked FAILED and the
 * returned promise rejects.
 */
protected async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
    const deferred: Deferred<boolean> = new Deferred<boolean>();
    const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
    if (trialJobDetail === undefined) {
        throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
    }
    if (this.paiClusterConfig === undefined) {
        throw new Error('PAI Cluster config is not initialized');
    }
    if (this.paiTrialConfig === undefined) {
        throw new Error('trial config is not initialized');
    }
    if (this.paiToken === undefined) {
        throw new Error('PAI token is not initialized');
    }
    if (this.paiJobRestServer === undefined) {
        throw new Error('paiJobRestServer is not initialized');
    }
    // Make sure experiment code files is copied from local to NFS
    if (this.copyExpCodeDirPromise !== undefined) {
        await this.copyExpCodeDirPromise;
        this.log.info(`Copy codeDir data finished.`);
        // All trials share same destination NFS code folder, only copy codeDir once for an experiment.
        // After copy data finished, set copyExpCodeDirPromise be undefined to avoid log content duplicated.
        this.copyExpCodeDirPromise = undefined;
    }
    this.paiRestServerPort = this.paiJobRestServer.clusterRestServerPort;
    // Step 1. Prepare PAI job configuration
    //create trial local working folder locally.
    await execMkdir(trialJobDetail.logPath);
    // Write NNI installation file to local files
    await fs.promises.writeFile(path.join(trialJobDetail.logPath, 'install_nni.sh'), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
    // Write file content ( parameter.cfg ) to local working folders
    if (trialJobDetail.form !== undefined) {
        await this.writeParameterFile(trialJobDetail.logPath, trialJobDetail.form.hyperParameters);
    }
    //Generate Job Configuration in yaml format
    const paiJobConfig = this.generateJobConfigInYamlFormat(trialJobDetail);
    this.log.debug(paiJobConfig);
    // Step 2. Submit PAI job via Rest call
    // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
    const submitJobRequest: request.Options = {
        uri: `${this.protocol}://${this.paiClusterConfig.host}/rest-server/api/v2/jobs`,
        method: 'POST',
        body: paiJobConfig,
        followAllRedirects: true,
        headers: {
            'Content-Type': 'text/yaml',
            Authorization: `Bearer ${this.paiToken}`
        }
    };
    request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
        // If submit success, will get status code 202. refer: https://github.com/microsoft/pai/blob/master/src/rest-server/docs/swagger.yaml
        if ((error !== undefined && error !== null) || response.statusCode >= 400) {
            const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${body}`;
            this.log.error(errorMessage);
            trialJobDetail.status = 'FAILED';
            deferred.reject(errorMessage);
        } else {
            trialJobDetail.submitTime = Date.now();
        }
        // The deferred is already rejected on the failure path, so this
        // resolve is a no-op there; it only settles the success path.
        deferred.resolve(true);
    });
    return deferred.promise;
}
/**
 * Serialize one set of hyper-parameters into the conventionally named
 * parameter file inside the given directory.
 */
private async writeParameterFile(directory: string, hyperParameters: HyperParameters): Promise<void> {
    const fileName: string = generateParamFileName(hyperParameters);
    await fs.promises.writeFile(path.join(directory, fileName), hyperParameters.value, { encoding: 'utf8' });
}
} }
export { PAITrainingService }; export { PAITrainingService };
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import * as fs from 'fs';
import * as path from 'path';
import { Deferred } from 'ts-deferred';
import { getExperimentId } from '../../../common/experimentStartupInfo';
import { getLogger } from '../../../common/log';
import { unixPathJoin } from '../../../common/utils';
/**
* HDFS client utility, including copy file/directory
*/
/**
 * HDFS client utility, including copy file/directory.
 * All functions take a `webhdfs` client instance (untyped: the package
 * ships no typings) and return promises wrapping its callback API.
 */
export namespace HDFSClientUtility {
    /**
     * Get NNI experiment root directory
     * @param hdfsUserName HDFS user name
     */
    export function hdfsExpRootDir(hdfsUserName: string): string {
        // Layout: /<user>/nni/experiments/<experimentId>
        return '/' + unixPathJoin(hdfsUserName, 'nni', 'experiments', getExperimentId());
    }

    /**
     * Get NNI experiment code directory
     * @param hdfsUserName HDFS user name
     */
    export function getHdfsExpCodeDir(hdfsUserName: string): string {
        return unixPathJoin(hdfsExpRootDir(hdfsUserName), 'codeDir');
    }

    /**
     * Get NNI trial working directory
     * @param hdfsUserName HDFS user name
     * @param trialId NNI trial ID
     */
    export function getHdfsTrialWorkDir(hdfsUserName: string, trialId: string): string {
        const root: string = hdfsExpRootDir(hdfsUserName);
        return unixPathJoin(root, 'trials', trialId);
    }

    /**
     * Copy a local file to hdfs directory
     *
     * @param localFilePath local file path(source)
     * @param hdfsFilePath hdfs file path(target)
     * @param hdfsClient hdfs client
     */
    export async function copyFileToHdfs(localFilePath: string, hdfsFilePath: string, hdfsClient: any): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();
        // fs.access replaces the deprecated fs.exists for the existence
        // probe; the callback error is null exactly when the file exists.
        fs.access(localFilePath, (accessError: NodeJS.ErrnoException | null) => {
            if (!accessError) {
                const localFileStream: fs.ReadStream = fs.createReadStream(localFilePath);
                const hdfsFileStream: any = hdfsClient.createWriteStream(hdfsFilePath);
                localFileStream.pipe(hdfsFileStream);
                hdfsFileStream.on('finish', () => {
                    deferred.resolve();
                });
                hdfsFileStream.on('error', (err: any) => {
                    getLogger()
                        .error(`HDFSCientUtility:copyFileToHdfs, copy file failed, err is ${err.message}`);
                    deferred.reject(err);
                });
            } else {
                getLogger()
                    .error(`HDFSCientUtility:copyFileToHdfs, ${localFilePath} doesn't exist locally`);
                deferred.reject('file not exist!');
            }
        });
        return deferred.promise;
    }

    /**
     * Recursively copy local directory to hdfs directory
     *
     * @param localDirectory local directory
     * @param hdfsDirectory HDFS directory
     * @param hdfsClient HDFS client
     */
    export async function copyDirectoryToHdfs(localDirectory: string, hdfsDirectory: string, hdfsClient: any): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();
        // TODO: fs.readdirSync doesn't support ~($HOME)
        const fileNameArray: string[] = fs.readdirSync(localDirectory);
        for (const fileName of fileNameArray) {
            const fullFilePath: string = path.join(localDirectory, fileName);
            try {
                if (fs.lstatSync(fullFilePath).isFile()) {
                    await copyFileToHdfs(fullFilePath, path.join(hdfsDirectory, fileName), hdfsClient);
                } else {
                    // If filePath is a directory, recursively copy it to the remote directory
                    await copyDirectoryToHdfs(fullFilePath, path.join(hdfsDirectory, fileName), hdfsClient);
                }
            } catch (error) {
                // Bug fix: stop at the first failure. Previously the loop
                // kept copying after rejecting and then called resolve()
                // on the already-settled deferred.
                deferred.reject(error);
                return deferred.promise;
            }
        }
        // All files/directories are copied successfully, resolve
        deferred.resolve();
        return deferred.promise;
    }

    /**
     * Check if an HDFS path already exists
     *
     * @param hdfsPath target path need to check in HDFS
     * @param hdfsClient HDFS client
     */
    export async function pathExists(hdfsPath: string, hdfsClient: any): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.exists(hdfsPath, (exist: boolean) => {
            deferred.resolve(exist);
        });
        let timeoutId: NodeJS.Timer;
        const delayTimeout: Promise<boolean> = new Promise<boolean>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(() => { reject(`Check HDFS path ${hdfsPath} exists timeout`); }, 5000);
        });
        return Promise.race([deferred.promise, delayTimeout])
            .finally(() => { clearTimeout(timeoutId); });
    }

    /**
     * Read content from HDFS file
     *
     * @param hdfsPath HDFS file path
     * @param hdfsClient HDFS client
     */
    export async function readFileFromHDFS(hdfsPath: string, hdfsClient: any): Promise<Buffer> {
        const deferred: Deferred<Buffer> = new Deferred<Buffer>();
        let buffer: Buffer = Buffer.alloc(0);
        const exist: boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            // Bug fix: return immediately after rejecting. Previously
            // execution fell through and opened a read stream on the
            // missing path.
            deferred.reject(`${hdfsPath} doesn't exists`);
            return deferred.promise;
        }
        const remoteFileStream: any = hdfsClient.createReadStream(hdfsPath);
        remoteFileStream.on('error', (err: any) => {
            // Reject with the error
            deferred.reject(err);
        });
        remoteFileStream.on('data', (chunk: any) => {
            // Concat the data chunk to buffer
            buffer = Buffer.concat([buffer, chunk]);
        });
        remoteFileStream.on('finish', () => {
            // Download is done, resolve
            deferred.resolve(buffer);
        });
        return deferred.promise;
    }

    /**
     * Mkdir in HDFS, use default permission 755
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     */
    export function mkdir(hdfsPath: string, hdfsClient: any): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.mkdir(hdfsPath, (err: any) => {
            if (!err) {
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });
        return deferred.promise;
    }

    /**
     * Read directory contents
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     */
    export async function readdir(hdfsPath: string, hdfsClient: any): Promise<string[]> {
        const deferred: Deferred<string[]> = new Deferred<string[]>();
        const exist: boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            // Bug fix: return after rejecting. Previously hdfsClient.readdir
            // was still invoked on the missing path.
            deferred.reject(`${hdfsPath} doesn't exists`);
            return deferred.promise;
        }
        hdfsClient.readdir(hdfsPath, (err: any, files: any[]) => {
            if (err) {
                deferred.reject(err);
                return;
            }
            deferred.resolve(files);
        });
        return deferred.promise;
    }

    /**
     * Delete HDFS path
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @param recursive Mark if need to delete recursively
     */
    export function deletePath(hdfsPath: string, hdfsClient: any, recursive: boolean = true): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.unlink(hdfsPath, recursive, (err: any) => {
            if (!err) {
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });
        return deferred.promise;
    }
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import {TrialConfig} from '../../common/trialConfig';
/**
 * Task role for PAI
 */
export class PAITaskRole {
    /**
     * @param name Name for the task role
     * @param taskNumber Number of tasks for the task role, no less than 1
     * @param cpuNumber CPU number for one task in the task role, no less than 1
     * @param memoryMB Memory for one task in the task role, no less than 100
     * @param gpuNumber GPU number for one task in the task role, no less than 0
     * @param command Executable command for tasks in the task role, can not be empty
     * @param shmMB Shared memory for one task in the task role
     * @param portList Ports used inside the task container
     */
    constructor(
        public readonly name: string,
        public readonly taskNumber: number,
        public readonly cpuNumber: number,
        public readonly memoryMB: number,
        public readonly gpuNumber: number,
        public readonly command: string,
        public readonly shmMB?: number,
        public portList?: PortListMetaData[]) {
    }
}
/**
 * Trial job configuration submitted to PAI
 */
export class PAIJobConfig {
    /**
     * @param jobName Name for the job, need to be unique
     * @param image URL pointing to the Docker image for all tasks in the job
     * @param codeDir Code directory on HDFS
     * @param taskRoles List of taskRole, one task role at least
     * @param virtualCluster The virtual cluster the job runs on
     * @param authFile Authentication file for a private Docker registry
     */
    constructor(
        public readonly jobName: string,
        public readonly image: string,
        public readonly codeDir: string,
        public taskRoles: PAITaskRole[],
        public readonly virtualCluster: string,
        public readonly authFile?: string) {
    }
}
/**
 * portList data structure used in PAI taskRole
 */
export class PortListMetaData {
    // Display label identifying this port entry.
    public readonly label: string = '';
    // First port number of the range.
    public readonly beginAt: number = 0;
    // How many consecutive ports the range covers.
    public readonly portNumber: number = 0;
}
/**
 * PAI trial configuration
 */
export class NNIPAITrialConfig extends TrialConfig {
    /**
     * @param command Trial command (forwarded to TrialConfig)
     * @param codeDir Local code directory (forwarded to TrialConfig)
     * @param gpuNum GPU number for one task (forwarded to TrialConfig)
     * @param cpuNum CPU number for one task
     * @param memoryMB Memory for one task
     * @param image Docker image for the trial
     * @param virtualCluster The virtual cluster the job runs on; if omitted, the job runs on the default virtual cluster
     * @param shmMB Shared memory for one task in the task role
     * @param authFile Authentication file used for a private Docker registry
     * @param portList Ports used inside the task container
     */
    constructor(
        command: string,
        codeDir: string,
        gpuNum: number,
        public readonly cpuNum: number,
        public readonly memoryMB: number,
        public readonly image: string,
        public virtualCluster?: string,
        public shmMB?: number,
        public authFile?: string,
        public portList?: PortListMetaData[]) {
        super(command, codeDir, gpuNum);
    }
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
// Shell snippet written as install_nni.sh and sourced before the trial
// starts: installs the `nni` package for the current user unless it is
// already importable.
export const PAI_INSTALL_NNI_SHELL_FORMAT: string =
`#!/bin/bash
if python3 -c 'import nni' > /dev/null 2>&1; then
# nni module is already installed, skip
return
else
# Install nni
python3 -m pip install --user nni
fi`;
// String.Format template for the full trial launch command (paiYarn mode).
// Placeholders: {0} NNI_SYS_DIR, {1} NNI_OUTPUT_DIR, {2} trial id,
// {3} experiment id, {4} trial sequence id, {5} multi-phase flag,
// {6} user trial command, {7} NNI manager IP, {8} NNI manager REST port,
// {9} HDFS output dir, {10} HDFS host, {11} PAI user name,
// {12} HDFS experiment code dir, {13} NNI manager version,
// {14} log collection setting.
export const PAI_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=paiYarn NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} \
&& cd $NNI_SYS_DIR && sh install_nni.sh \
&& python3 -m nni.tools.trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' \
--pai_hdfs_output_dir '{9}' --pai_hdfs_host '{10}' --pai_user_name {11} --nni_hdfs_exp_dir '{12}' --webhdfs_path '/webhdfs/api/v1' \
--nni_manager_version '{13}' --log_collection '{14}'`;
// WebHDFS explorer URL template: {0} PAI host, {1} HDFS path.
export const PAI_LOG_PATH_FORMAT: string =
`http://{0}/webhdfs/explorer.html#{1}`;
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as fs from 'fs';
import * as path from 'path';
import * as request from 'request';
import * as component from '../../../common/component';
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
import {
HyperParameters, NNIManagerIpConfig,
TrialJobApplicationForm, TrialJobDetail
} from '../../../common/trainingService';
import {
generateParamFileName,
getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin
} from '../../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { execMkdir, validateCodeDir } from '../../common/util';
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIJobConfig, PAITaskRole } from './paiYarnConfig';
import { PAI_LOG_PATH_FORMAT, PAI_TRIAL_COMMAND_FORMAT } from './paiYarnData';
import { PAITrainingService } from '../paiTrainingService';
import { PAIClusterConfig, PAITrialJobDetail } from '../paiConfig';
import * as WebHDFS from 'webhdfs';
import { PAIJobRestServer, ParameterFileMeta } from '../paiJobRestServer';
/**
* Training Service implementation for OpenPAI (Open Platform for AI)
* Refer https://github.com/Microsoft/pai for more info about OpenPAI
*/
@component.Singleton
class PAIYarnTrainingService extends PAITrainingService {
// WebHDFS client used to upload code and auth files to the cluster
// (typed `any` because the `webhdfs` package ships no typings).
private hdfsClient: any;
// Pending upload of the experiment codeDir to HDFS; awaited before the
// first trial submission.
private copyExpCodeDirPromise?: Promise<void>;
// Pending upload of the Docker-registry auth file, if one is configured.
private copyAuthFilePromise?: Promise<void>;
// Trial configuration received via the TRIAL_CONFIG metadata key.
private paiTrialConfig?: NNIPAITrialConfig;
constructor() {
    super();
}
/**
 * Queue a new trial for submission to OpenPAI (yarn flavor). HDFS paths
 * are derived here; the background loop performs the REST submission.
 */
public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
    if (this.paiClusterConfig === undefined) {
        throw new Error(`paiBaseClusterConfig not initialized!`);
    }
    this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);

    const jobId: string = uniqueString(5);
    const jobName: string = `nni_exp_${this.experimentId}_trial_${jobId}`;
    //TODO: use HDFS working folder instead
    const workingFolder: string = path.join(this.expRootDir, 'trials', jobId);
    const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, jobId);
    const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
    // Browsable WebHDFS URL for the trial's output directory.
    const hdfsLogPath: string = String.Format(
        PAI_LOG_PATH_FORMAT,
        this.paiClusterConfig.host,
        hdfsOutputDir
    );

    const detail: PAITrialJobDetail = new PAITrialJobDetail(
        jobId,
        'WAITING',
        jobName,
        Date.now(),
        workingFolder,
        form,
        hdfsLogPath);
    this.trialJobsMap.set(jobId, detail);
    this.jobQueue.push(jobId);
    return detail;
}
/**
 * Store cluster metadata for the yarn-based PAI service: records manager IP,
 * builds the WebHDFS client and acquires a PAI token, and on TRIAL_CONFIG
 * starts the asynchronous codeDir / auth-file uploads to HDFS.
 *
 * @param key one of TrialConfigMetadataKey
 * @param value JSON or plain-string payload for that key
 * @throws Error for unknown keys or an incomplete cluster config
 */
public async setClusterMetadata(key: string, value: string): Promise<void> {
    switch (key) {
        case TrialConfigMetadataKey.NNI_MANAGER_IP:
            this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
            break;
        case TrialConfigMetadataKey.PAI_YARN_CLUSTER_CONFIG:
            this.paiJobRestServer = new PAIJobRestServer(component.get(PAIYarnTrainingService));
            this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);
            this.paiClusterConfig.host = this.formatPAIHost(this.paiClusterConfig.host);
            this.hdfsClient = WebHDFS.createClient({
                user: this.paiClusterConfig.userName,
                // Refer PAI document for Pylon mapping https://github.com/Microsoft/pai/tree/master/docs/pylon
                port: 80,
                path: '/webhdfs/api/v1',
                host: this.paiClusterConfig.host
            });
            // NOTE(review): host was already normalized a few lines above;
            // this second formatPAIHost call looks redundant — confirm
            // formatPAIHost is idempotent before removing it.
            this.paiClusterConfig.host = this.formatPAIHost(this.paiClusterConfig.host);
            if (this.paiClusterConfig.passWord) {
                // Get PAI authentication token
                await this.updatePaiToken();
            } else if (this.paiClusterConfig.token) {
                this.paiToken = this.paiClusterConfig.token;
            } else {
                throw new Error('pai cluster config format error, please set password or token!');
            }
            break;
        case TrialConfigMetadataKey.TRIAL_CONFIG:
            if (this.paiClusterConfig === undefined) {
                this.log.error('pai cluster config is not initialized');
                break;
            }
            this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);
            // Validate to make sure codeDir doesn't have too many files
            await validateCodeDir(this.paiTrialConfig.codeDir);
            // Copy experiment files from local folder to HDFS
            this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
                this.paiTrialConfig.codeDir,
                HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
                this.hdfsClient
            );
            // Upload authFile to hdfs
            if (this.paiTrialConfig.authFile) {
                this.authFileHdfsPath = unixPathJoin(HDFSClientUtility.hdfsExpRootDir(this.paiClusterConfig.userName), 'authFile');
                this.copyAuthFilePromise = HDFSClientUtility.copyFileToHdfs(this.paiTrialConfig.authFile, this.authFileHdfsPath, this.hdfsClient);
            }
            break;
        case TrialConfigMetadataKey.VERSION_CHECK:
            this.versionCheck = (value === 'true' || value === 'True');
            break;
        case TrialConfigMetadataKey.LOG_COLLECTION:
            this.logCollection = value;
            break;
        case TrialConfigMetadataKey.MULTI_PHASE:
            this.isMultiPhase = (value === 'true' || value === 'True');
            break;
        default:
            //Reject for unknown keys
            throw new Error(`Uknown key: ${key}`);
    }
}
/**
 * Submit one queued trial to OpenPAI via the legacy v1 (yarn) REST API:
 * wait for pending HDFS uploads, stage the install script and parameter
 * file locally, upload them to the trial's HDFS folder, then POST the
 * PAIJobConfig. Always resolves true; failures mark the trial FAILED.
 */
protected async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
    const deferred: Deferred<boolean> = new Deferred<boolean>();
    const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
    if (trialJobDetail === undefined) {
        throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
    }
    if (this.paiClusterConfig === undefined) {
        throw new Error('PAI Cluster config is not initialized');
    }
    if (this.paiTrialConfig === undefined) {
        throw new Error('trial config is not initialized');
    }
    if (this.paiToken === undefined) {
        throw new Error('PAI token is not initialized');
    }
    if (this.paiJobRestServer === undefined) {
        throw new Error('paiJobRestServer is not initialized');
    }
    this.paiRestServerPort = this.paiJobRestServer.clusterRestServerPort;
    // Make sure experiment code files is copied from local to HDFS
    if (this.copyExpCodeDirPromise !== undefined) {
        await this.copyExpCodeDirPromise;
    }
    //Make sure authFile is copied from local to HDFS
    if (this.paiTrialConfig.authFile) {
        await this.copyAuthFilePromise;
    }
    // Step 1. Prepare PAI job configuration
    const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
    //create tmp trial working folder locally.
    await execMkdir(trialLocalTempFolder);
    const runScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
    // Write NNI installation file to local tmp files
    await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
    // Write file content ( parameter.cfg ) to local tmp folders
    if (trialJobDetail.form !== undefined) {
        await fs.promises.writeFile(
            path.join(trialLocalTempFolder, generateParamFileName(trialJobDetail.form.hyperParameters)),
            trialJobDetail.form.hyperParameters.value, { encoding: 'utf8' }
        );
    }
    const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
    const hdfsOutputDir: string = unixPathJoin(hdfsCodeDir, 'nnioutput');
    const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
    const version: string = this.versionCheck ? await getVersion() : '';
    // Fill the 15-placeholder launch command template; newlines are stripped
    // because PAI expects a single command string.
    const nniPaiTrialCommand: string = String.Format(
        PAI_TRIAL_COMMAND_FORMAT,
        // PAI will copy job's codeDir into /root directory
        `$PWD/${trialJobId}`,
        `$PWD/${trialJobId}/nnioutput`,
        trialJobId,
        this.experimentId,
        trialJobDetail.form.sequenceId,
        this.isMultiPhase,
        this.paiTrialConfig.command,
        nniManagerIp,
        this.paiRestServerPort,
        hdfsOutputDir,
        this.paiClusterConfig.host,
        this.paiClusterConfig.userName,
        HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
        version,
        this.logCollection
    )
        .replace(/\r\n|\n|\r/gm, '');
    this.log.info(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
    const paiTaskRoles: PAITaskRole[] = [
        new PAITaskRole(
            `nni_trail_${trialJobId}`,
            // Task role number
            1,
            // Task CPU number
            this.paiTrialConfig.cpuNum,
            // Task memory
            this.paiTrialConfig.memoryMB,
            // Task GPU number
            this.paiTrialConfig.gpuNum,
            // Task command
            nniPaiTrialCommand,
            // Task shared memory
            this.paiTrialConfig.shmMB,
            // Task portList
            this.paiTrialConfig.portList
        )
    ];
    const paiJobConfig: PAIJobConfig = new PAIJobConfig(
        // Job name
        trialJobDetail.paiJobName,
        // Docker image
        this.paiTrialConfig.image,
        // codeDir
        `$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
        // PAI Task roles
        paiTaskRoles,
        // Add Virutal Cluster
        this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString(),
        //Task auth File
        this.authFileHdfsPath
    );
    // Step 2. Upload code files in codeDir onto HDFS
    try {
        await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
    } catch (error) {
        this.log.error(`PAI Training service: copy ${this.paiTrialConfig.codeDir} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
        trialJobDetail.status = 'FAILED'; // eslint-disable-line require-atomic-updates
        return true;
    }
    // Step 3. Submit PAI job via Rest call
    // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
    const submitJobRequest: request.Options = {
        uri: `${this.protocol}://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
        method: 'POST',
        json: true,
        body: paiJobConfig,
        headers: {
            'Content-Type': 'application/json',
            Authorization: `Bearer ${this.paiToken}`
        }
    };
    request(submitJobRequest, (error: Error, response: request.Response, _body: any) => {
        if ((error !== undefined && error !== null) || response.statusCode >= 400) {
            const errorMessage: string = (error !== undefined && error !== null) ? error.message :
                `Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${response.body.message}`;
            this.log.error(errorMessage);
            // Submission failure is reported through the trial status rather
            // than a rejected promise; the job loop keeps running.
            trialJobDetail.status = 'FAILED';
            deferred.resolve(true);
        } else {
            trialJobDetail.submitTime = Date.now();
            deferred.resolve(true);
        }
    });
    return deferred.promise;
}
public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
if (trialJobDetail === undefined) {
throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
}
await this.writeParameterFile(trialJobId, form.hyperParameters);
return trialJobDetail;
}
protected async writeParameterFile(trialJobId: string, hyperParameters: HyperParameters): Promise<void> {
if (this.paiClusterConfig === undefined) {
throw new Error('PAI Cluster config is not initialized');
}
if (this.paiTrialConfig === undefined) {
throw new Error('PAI trial config is not initialized');
}
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
const hpFileName: string = generateParamFileName(hyperParameters);
const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });
const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
const hdfsHpFilePath: string = path.join(hdfsCodeDir, hpFileName);
await HDFSClientUtility.copyFileToHdfs(localFilepath, hdfsHpFilePath, this.hdfsClient);
await this.postParameterFileMeta({
experimentId: this.experimentId,
trialId: trialJobId,
filePath: hdfsHpFilePath
});
}
protected postParameterFileMeta(parameterFileMeta: ParameterFileMeta): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
if (this.paiJobRestServer === undefined) {
throw new Error('paiJobRestServer not implemented!');
}
const req: request.Options = {
uri: `${this.paiJobRestServer.endPoint}${this.paiJobRestServer.apiRootUrl}/parameter-file-meta`,
method: 'POST',
json: true,
body: parameterFileMeta
};
request(req, (err: Error, _res: request.Response) => {
if (err) {
deferred.reject(err);
} else {
deferred.resolve();
}
});
return deferred.promise;
}
}
export { PAIYarnTrainingService };
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import {TrialConfig} from '../../common/trialConfig';
/**
* PAI configuration to run trials
*/
export class PAIYarnTrialConfig extends TrialConfig {
    /**
     * Trial configuration for PAI (Yarn) mode.
     * TypeScript parameter properties declare and assign the PAI-specific
     * readonly fields in one place; the common fields go to TrialConfig.
     * @param command trial command line (handled by TrialConfig)
     * @param codeDir local code directory (handled by TrialConfig)
     * @param gpuNum GPUs per trial (handled by TrialConfig)
     * @param cpuNum CPUs per trial container
     * @param memoryMB memory per trial container, in MB
     * @param image docker image used to run the trial
     * @param dataDir HDFS data directory
     * @param outputDir HDFS output directory
     */
    constructor(command: string, codeDir: string, gpuNum: number,
                public readonly cpuNum: number,
                public readonly memoryMB: number,
                public readonly image: string,
                public readonly dataDir: string,
                public readonly outputDir: string) {
        super(command, codeDir, gpuNum);
    }
}
...@@ -12,7 +12,7 @@ import { getExperimentId } from '../../../common/experimentStartupInfo'; ...@@ -12,7 +12,7 @@ import { getExperimentId } from '../../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../../common/log'; import { getLogger, Logger } from '../../../common/log';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey'; import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { PAIClusterConfig } from '../../pai/paiConfig'; import { PAIClusterConfig } from '../../pai/paiConfig';
import { NNIPAIK8STrialConfig } from '../../pai/paiK8S/paiK8SConfig'; import { NNIPAITrialConfig } from '../../pai/paiConfig';
import { EnvironmentInformation, EnvironmentService } from '../environment'; import { EnvironmentInformation, EnvironmentService } from '../environment';
import { StorageService } from '../storageService'; import { StorageService } from '../storageService';
...@@ -25,7 +25,7 @@ export class OpenPaiEnvironmentService extends EnvironmentService { ...@@ -25,7 +25,7 @@ export class OpenPaiEnvironmentService extends EnvironmentService {
private readonly log: Logger = getLogger(); private readonly log: Logger = getLogger();
private paiClusterConfig: PAIClusterConfig | undefined; private paiClusterConfig: PAIClusterConfig | undefined;
private paiTrialConfig: NNIPAIK8STrialConfig | undefined; private paiTrialConfig: NNIPAITrialConfig | undefined;
private paiJobConfig: any; private paiJobConfig: any;
private paiToken?: string; private paiToken?: string;
private protocol: string = 'http'; private protocol: string = 'http';
...@@ -62,7 +62,7 @@ export class OpenPaiEnvironmentService extends EnvironmentService { ...@@ -62,7 +62,7 @@ export class OpenPaiEnvironmentService extends EnvironmentService {
this.log.error('pai cluster config is not initialized'); this.log.error('pai cluster config is not initialized');
break; break;
} }
this.paiTrialConfig = <NNIPAIK8STrialConfig>JSON.parse(value); this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);
// Validate to make sure codeDir doesn't have too many files // Validate to make sure codeDir doesn't have too many files
const storageService = component.get<StorageService>(StorageService); const storageService = component.get<StorageService>(StorageService);
......
...@@ -11,7 +11,7 @@ import { TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetri ...@@ -11,7 +11,7 @@ import { TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetri
import { delay } from '../../common/utils'; import { delay } from '../../common/utils';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { PAIClusterConfig } from '../pai/paiConfig'; import { PAIClusterConfig } from '../pai/paiConfig';
import { PAIK8STrainingService } from '../pai/paiK8S/paiK8STrainingService'; import { PAITrainingService } from '../pai/paiTrainingService';
import { RemoteMachineTrainingService } from '../remote_machine/remoteMachineTrainingService'; import { RemoteMachineTrainingService } from '../remote_machine/remoteMachineTrainingService';
import { MountedStorageService } from './storages/mountedStorageService'; import { MountedStorageService } from './storages/mountedStorageService';
import { StorageService } from './storageService'; import { StorageService } from './storageService';
...@@ -131,7 +131,7 @@ class RouterTrainingService implements TrainingService { ...@@ -131,7 +131,7 @@ class RouterTrainingService implements TrainingService {
await this.internalTrainingService.setClusterMetadata('platform_list', 'pai'); await this.internalTrainingService.setClusterMetadata('platform_list', 'pai');
} else { } else {
this.log.debug(`caching metadata key:{} value:{}, as training service is not determined.`); this.log.debug(`caching metadata key:{} value:{}, as training service is not determined.`);
this.internalTrainingService = component.get(PAIK8STrainingService); this.internalTrainingService = component.get(PAITrainingService);
} }
} else if (key === TrialConfigMetadataKey.AML_CLUSTER_CONFIG) { } else if (key === TrialConfigMetadataKey.AML_CLUSTER_CONFIG) {
this.internalTrainingService = component.get(TrialDispatcher); this.internalTrainingService = component.get(TrialDispatcher);
......
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as chai from 'chai';
import * as chaiAsPromised from 'chai-as-promised';
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import * as tmp from 'tmp';
import { cleanupUnitTest, prepareUnitTest, uniqueString } from '../../common/utils';
import { HDFSClientUtility } from '../pai/paiYarn/hdfsClientUtility';
var WebHDFS = require('webhdfs');
var rmdir = require('rmdir');
describe('WebHDFS', function () {
    /*
    To enable web HDFS client unit test, HDFS information needs to be configured in:
    Default/.vscode/hdfsInfo.json, whose content looks like:
    {
        "user": "user1",
        "port": 50070,
        "host": "10.0.0.0"
    }
    */
    // When no HDFS cluster info is available the whole suite is skipped.
    let skip: boolean = false;
    let testHDFSInfo: any;
    let hdfsClient: any;
    try {
        testHDFSInfo = JSON.parse(fs.readFileSync('../../.vscode/hdfsInfo.json', 'utf8'));
        console.log(testHDFSInfo);
        hdfsClient = WebHDFS.createClient({
            user: testHDFSInfo.user,
            port: testHDFSInfo.port,
            host: testHDFSInfo.host
        });
    } catch (err) {
        // FIX: message previously referenced rminfo.json / "remote machine" - this
        // suite reads hdfsInfo.json and exercises the web HDFS client.
        console.log('Please configure hdfsInfo.json to enable web HDFS unit test.');
        skip = true;
    }

    before(() => {
        chai.should();
        chai.use(chaiAsPromised);
        tmp.setGracefulCleanup();
        prepareUnitTest();
    });

    after(() => {
        cleanupUnitTest();
    });

    it('Test HDFS utility path functions', async () => {
        if (skip) {
            return;
        }
        const testPath: string = '/nni_unittest_' + uniqueString(6);
        let exists: boolean = await HDFSClientUtility.pathExists(testPath, hdfsClient);
        // The new random named path is expected to not exist
        chai.expect(exists).to.be.equals(false);

        const mkdirResult: boolean = await HDFSClientUtility.mkdir(testPath, hdfsClient);
        // Mkdir is expected to be successful
        chai.expect(mkdirResult).to.be.equals(true);

        exists = await HDFSClientUtility.pathExists(testPath, hdfsClient);
        // The newly created path is expected to exist
        chai.expect(exists).to.be.equals(true);

        const deleteResult: boolean = await HDFSClientUtility.deletePath(testPath, hdfsClient);
        // Delete path is expected to be successful
        chai.expect(deleteResult).to.be.equals(true);

        exists = await HDFSClientUtility.pathExists(testPath, hdfsClient);
        // The deleted path is not expected to exist
        chai.expect(exists).to.be.equals(false);
    });

    it('Test HDFS utility copyFileToHdfs', async () => {
        if (skip) {
            return;
        }
        // Prepare local directory and files
        const tmpLocalDirectoryPath: string = path.join(os.tmpdir(), 'nni_unittest_dir_' + uniqueString(6));
        const tmpDataFilePath: string = path.join(tmpLocalDirectoryPath, 'file_' + uniqueString(6));
        const testFileData: string = 'TestContent123';
        fs.mkdirSync(tmpLocalDirectoryPath);
        fs.writeFileSync(tmpDataFilePath, testFileData);

        const testHDFSFilePath: string = '/nni_unittest_' + uniqueString(6);
        let exists: boolean = await HDFSClientUtility.pathExists(testHDFSFilePath, hdfsClient);
        // The new random named path is expected to not exist
        chai.expect(exists).to.be.equals(false);

        await HDFSClientUtility.copyFileToHdfs(tmpDataFilePath, testHDFSFilePath, hdfsClient);
        exists = await HDFSClientUtility.pathExists(testHDFSFilePath, hdfsClient);
        // After copy local file to HDFS, the target file path in HDFS is expected to exist
        chai.expect(exists).to.be.equals(true);

        const buffer: Buffer = await HDFSClientUtility.readFileFromHDFS(testHDFSFilePath, hdfsClient);
        const actualFileData: string = buffer.toString('utf8');
        // The file content read from HDFS is expected to equal to the content of local file
        chai.expect(actualFileData).to.be.equals(testFileData);

        const testHDFSDirPath: string = path.join('/nni_unittest_' + uniqueString(6) + '_dir');
        await HDFSClientUtility.copyDirectoryToHdfs(tmpLocalDirectoryPath, testHDFSDirPath, hdfsClient);
        const files: any[] = await HDFSClientUtility.readdir(testHDFSDirPath, hdfsClient);
        // Expected file count under HDFS target directory is 1
        chai.expect(files.length).to.be.equals(1);
        // Expected file name under HDFS target directory is equal to local file name
        chai.expect(files[0].pathSuffix).to.be.equals(path.parse(tmpDataFilePath).base);

        // Cleanup (FIX: renamed misspelled local 'deleteRestult')
        rmdir(tmpLocalDirectoryPath);
        let deleteResult: boolean = await HDFSClientUtility.deletePath(testHDFSFilePath, hdfsClient);
        chai.expect(deleteResult).to.be.equals(true);
        deleteResult = await HDFSClientUtility.deletePath(testHDFSDirPath, hdfsClient);
        chai.expect(deleteResult).to.be.equals(true);
    });
});
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment