Unverified Commit ee25377d authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Update pai yaml merge method (#2369)

parent f5dbcd81
......@@ -92,8 +92,18 @@ Compared with [LocalMode](LocalMode.md) and [RemoteMachineMode](RemoteMachineMod
* Required key. Set the mount path in your container used in PAI.
* paiStoragePlugin
* Optional key. Set the storage plugin name used in PAI. If it is not set in trial configuration, it should be set in the config file specified in `paiConfigPath` field.
* command
* Optional key. Set the commands used in PAI container.
* paiConfigPath
* Optional key. Set the file path of the PAI job configuration file; the file is in YAML format.
If users set `paiConfigPath` in NNI's configuration file, there is no need to specify the fields `command`, `paiStoragePlugin`, `virtualCluster`, `image`, `memoryMB`, `cpuNum`, `gpuNum` in the `trial` configuration. These fields will take their values from the config file specified by `paiConfigPath`.
```
Note:
1. The job name in PAI's configuration file will be replaced by a new job name created by NNI. The name format is nni_exp_${this.experimentId}_trial_${trialJobId}.
2. If users set multiple taskRoles in PAI's configuration file, NNI will wrap all of these taskRoles and start multiple tasks in one trial job. Users should ensure that only one taskRole reports metrics to NNI; otherwise there might be conflict errors.
```
Once you have filled in the NNI experiment config file and saved it (for example, as exp_pai.yml), run the following command
......
......@@ -14,7 +14,6 @@
"azure-storage": "^2.10.2",
"chai-as-promised": "^7.1.1",
"child-process-promise": "^2.2.1",
"deepmerge": "^4.2.2",
"express": "^4.16.3",
"express-joi-validator": "^2.0.0",
"js-base64": "^2.4.9",
......
......@@ -44,7 +44,6 @@ import { PAIClusterConfig, PAITrialJobDetail } from '../paiConfig';
import { PAIJobRestServer } from '../paiJobRestServer';
const yaml = require('js-yaml');
const deepmerge = require('deepmerge');
/**
* Training Service implementation for OpenPAI (Open Platform for AI)
......@@ -53,9 +52,11 @@ const deepmerge = require('deepmerge');
@component.Singleton
class PAIK8STrainingService extends PAITrainingService {
protected paiTrialConfig: NNIPAIK8STrialConfig | undefined;
private paiJobConfig: undefined;
private nniVersion: string | undefined;
constructor() {
super();
}
public async setClusterMetadata(key: string, value: string): Promise<void> {
......@@ -84,9 +85,13 @@ class PAIK8STrainingService extends PAITrainingService {
this.paiTrialConfig = <NNIPAIK8STrialConfig>JSON.parse(value);
// Validate to make sure codeDir doesn't have too many files
await validateCodeDir(this.paiTrialConfig.codeDir);
if (this.paiTrialConfig.paiConfigPath) {
this.paiJobConfig = yaml.safeLoad(fs.readFileSync(this.paiTrialConfig.paiConfigPath, 'utf8'));
}
break;
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
this.nniVersion = this.versionCheck ? await getVersion() : '';
break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
......@@ -142,12 +147,52 @@ class PAIK8STrainingService extends PAITrainingService {
return trialJobDetail;
}
public generateJobConfigInYamlFormat(trialJobId: string, command: string) {
private generateNNITrialCommand(trialJobDetail: PAITrialJobDetail, command: string) {
if (this.paiTrialConfig === undefined) {
throw new Error('trial config is not initialized');
}
const jobName = `nni_exp_${this.experimentId}_trial_${trialJobId}`
const paiJobConfig: any = {
const containerWorkingDir: string = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/${trialJobDetail.id}`;
const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
const nniPaiTrialCommand: string = String.Format(
PAI_K8S_TRIAL_COMMAND_FORMAT,
`${containerWorkingDir}`,
`${containerWorkingDir}/nnioutput`,
trialJobDetail.id,
this.experimentId,
trialJobDetail.form.sequenceId,
this.isMultiPhase,
command,
nniManagerIp,
this.paiRestServerPort,
this.nniVersion,
this.logCollection
)
.replace(/\r\n|\n|\r/gm, '');
return nniPaiTrialCommand;
}
private generateJobConfigInYamlFormat(trialJobDetail: PAITrialJobDetail) {
if (this.paiTrialConfig === undefined) {
throw new Error('trial config is not initialized');
}
const jobName = `nni_exp_${this.experimentId}_trial_${trialJobDetail.id}`
let nniJobConfig: any = undefined;
if (this.paiTrialConfig.paiConfigPath) {
nniJobConfig = this.paiJobConfig;
nniJobConfig.name = jobName;
// Each taskRole will generate new command in NNI's command format
// Each command will be formatted to NNI style
for(const taskRoleIndex in nniJobConfig.taskRoles) {
const commands = nniJobConfig.taskRoles[taskRoleIndex].commands
const nniTrialCommand = this.generateNNITrialCommand(trialJobDetail, commands.join(" && ").replace(/(["'$`\\])/g,'\\$1'));
nniJobConfig.taskRoles[taskRoleIndex].commands = [nniTrialCommand]
}
} else {
nniJobConfig = {
protocolVersion: 2,
name: jobName,
type: 'job',
......@@ -174,7 +219,7 @@ class PAIK8STrainingService extends PAITrainingService {
memoryMB: this.paiTrialConfig.memoryMB
},
commands: [
command
this.generateNNITrialCommand(trialJobDetail, this.paiTrialConfig.command)
]
}
},
......@@ -188,24 +233,12 @@ class PAIK8STrainingService extends PAITrainingService {
}
}
if (this.paiTrialConfig.virtualCluster) {
paiJobConfig.defaults= {
nniJobConfig.defaults = {
virtualCluster: this.paiTrialConfig.virtualCluster
}
}
if (this.paiTrialConfig.paiConfigPath) {
try {
const additionalPAIConfig = yaml.safeLoad(fs.readFileSync(this.paiTrialConfig.paiConfigPath, 'utf8'));
//deepmerge(x, y), if an element at the same key is present for both x and y, the value from y will appear in the result.
//refer: https://github.com/TehShrike/deepmerge
const overwriteMerge = (destinationArray: any, sourceArray: any, options: any) => sourceArray;
return yaml.safeDump(deepmerge(additionalPAIConfig, paiJobConfig, { arrayMerge: overwriteMerge }));
} catch (error) {
this.log.error(`Error occurs during loading and merge ${this.paiTrialConfig.paiConfigPath} : ${error}`);
}
} else {
return yaml.safeDump(paiJobConfig);
}
return yaml.safeDump(nniJobConfig);
}
protected async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
......@@ -247,29 +280,8 @@ class PAIK8STrainingService extends PAITrainingService {
//Copy codeDir files to local working folder
await execCopydir(this.paiTrialConfig.codeDir, trialJobDetail.logPath);
const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
const version: string = this.versionCheck ? await getVersion() : '';
const containerWorkingDir: string = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/${trialJobId}`;
const nniPaiTrialCommand: string = String.Format(
PAI_K8S_TRIAL_COMMAND_FORMAT,
`${containerWorkingDir}`,
`${containerWorkingDir}/nnioutput`,
trialJobId,
this.experimentId,
trialJobDetail.form.sequenceId,
this.isMultiPhase,
this.paiTrialConfig.command,
nniManagerIp,
this.paiRestServerPort,
version,
this.logCollection
)
.replace(/\r\n|\n|\r/gm, '');
this.log.info(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
const paiJobConfig = this.generateJobConfigInYamlFormat(trialJobId, nniPaiTrialCommand);
//Generate Job Configuration in yaml format
const paiJobConfig = this.generateJobConfigInYamlFormat(trialJobDetail);
this.log.debug(paiJobConfig);
// Step 3. Submit PAI job via Rest call
// Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
......
......@@ -1332,11 +1332,6 @@ deepmerge@^2.1.1:
version "2.2.1"
resolved "https://registry.yarnpkg.com/deepmerge/-/deepmerge-2.2.1.tgz#5d3ff22a01c00f645405a2fbc17d0778a1801170"
deepmerge@^4.2.2:
version "4.2.2"
resolved "https://registry.yarnpkg.com/deepmerge/-/deepmerge-4.2.2.tgz#44d2ea3679b8f4d4ffba33f03d865fc1e7bf4955"
integrity sha512-FJ3UgI4gIl+PHZm53knsuSFpE+nESMr7M4v9QcgB7S63Kj/6WqMiFQJpBBYz1Pt+66bZpP3Q7Lye0Oo9MPKEdg==
default-require-extensions@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/default-require-extensions/-/default-require-extensions-3.0.0.tgz#e03f93aac9b2b6443fc52e5e4a37b3ad9ad8df96"
......
......@@ -287,7 +287,7 @@ pai_trial_schema = {
'codeDir': setPathCheck('codeDir'),
'nniManagerNFSMountPath': setPathCheck('nniManagerNFSMountPath'),
'containerNFSMountPath': setType('containerNFSMountPath', str),
'command': setType('command', str),
Optional('command'): setType('command', str),
Optional('gpuNum'): setNumberRange('gpuNum', int, 0, 99999),
Optional('cpuNum'): setNumberRange('cpuNum', int, 0, 99999),
Optional('memoryMB'): setType('memoryMB', int),
......
......@@ -266,35 +266,14 @@ def validate_pai_config_path(experiment_config):
'''validate paiConfigPath field'''
if experiment_config.get('trainingServicePlatform') == 'pai':
if experiment_config.get('trial', {}).get('paiConfigPath'):
# validate the file format of paiConfigPath, ensure it is yaml format
# validate commands
pai_config = get_yml_content(experiment_config['trial']['paiConfigPath'])
if experiment_config['trial'].get('image') is None:
if pai_config.get('prerequisites', [{}])[0].get('uri') is None:
print_error('Please set image field, or set image uri in your own paiConfig!')
taskRoles_dict = pai_config.get('taskRoles')
if not taskRoles_dict:
print_error('Please set taskRoles in paiConfigPath config file!')
exit(1)
experiment_config['trial']['image'] = pai_config['prerequisites'][0]['uri']
if experiment_config['trial'].get('gpuNum') is None:
if pai_config.get('taskRoles', {}).get('taskrole', {}).get('resourcePerInstance', {}).get('gpu') is None:
print_error('Please set gpuNum field, or set resourcePerInstance gpu in your own paiConfig!')
exit(1)
experiment_config['trial']['gpuNum'] = pai_config['taskRoles']['taskrole']['resourcePerInstance']['gpu']
if experiment_config['trial'].get('cpuNum') is None:
if pai_config.get('taskRoles', {}).get('taskrole', {}).get('resourcePerInstance', {}).get('cpu') is None:
print_error('Please set cpuNum field, or set resourcePerInstance cpu in your own paiConfig!')
exit(1)
experiment_config['trial']['cpuNum'] = pai_config['taskRoles']['taskrole']['resourcePerInstance']['cpu']
if experiment_config['trial'].get('memoryMB') is None:
if pai_config.get('taskRoles', {}).get('taskrole', {}).get('resourcePerInstance', {}).get('memoryMB', {}) is None:
print_error('Please set memoryMB field, or set resourcePerInstance memoryMB in your own paiConfig!')
exit(1)
experiment_config['trial']['memoryMB'] = pai_config['taskRoles']['taskrole']['resourcePerInstance']['memoryMB']
if experiment_config['trial'].get('paiStoragePlugin') is None:
if pai_config.get('extras', {}).get('com.microsoft.pai.runtimeplugin', [{}])[0].get('plugin') is None:
print_error('Please set paiStoragePlugin field, or set plugin in your own paiConfig!')
exit(1)
experiment_config['trial']['paiStoragePlugin'] = pai_config['extras']['com.microsoft.pai.runtimeplugin'][0]['plugin']
else:
pai_trial_fields_required_list = ['image', 'gpuNum', 'cpuNum', 'memoryMB', 'paiStoragePlugin']
pai_trial_fields_required_list = ['image', 'gpuNum', 'cpuNum', 'memoryMB', 'paiStoragePlugin', 'command']
for trial_field in pai_trial_fields_required_list:
if experiment_config['trial'].get(trial_field) is None:
print_error('Please set {0} in trial configuration,\
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment