Unverified Commit 69b2e9aa authored by chicm-ms's avatar chicm-ms Committed by GitHub
Browse files

Pai training service uses job queue for submitting jobs (#973)

* Use job queue for PAI training service
parent 58b259a5
...@@ -18,36 +18,36 @@ ...@@ -18,36 +18,36 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/ */
'use strict' 'use strict';
import * as component from '../../common/component';
import * as cpp from 'child-process-promise'; import * as cpp from 'child-process-promise';
import * as fs from 'fs'; import * as fs from 'fs';
import * as path from 'path'; import * as path from 'path';
import * as request from 'request'; import * as request from 'request';
import * as component from '../../common/component';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { Deferred } from 'ts-deferred';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo'; import { Deferred } from 'ts-deferred';
import { HDFSClientUtility } from './hdfsClientUtility'; import { String } from 'typescript-string-operations';
import { MethodNotImplementedError } from '../../common/errors'; import { MethodNotImplementedError } from '../../common/errors';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log'; import { getLogger, Logger } from '../../common/log';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { import {
JobApplicationForm, TrainingService, TrialJobApplicationForm, JobApplicationForm, NNIManagerIpConfig, TrainingService,
TrialJobDetail, TrialJobMetric, NNIManagerIpConfig TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
} from '../../common/trainingService'; } from '../../common/trainingService';
import { delay, generateParamFileName, import { delay, generateParamFileName,
getExperimentRootDir, getIPV4Address, uniqueString, getVersion } from '../../common/utils'; getExperimentRootDir, getIPV4Address, getVersion, uniqueString } from '../../common/utils';
import { PAIJobRestServer } from './paiJobRestServer' import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { PAITrialJobDetail, PAI_TRIAL_COMMAND_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_LOG_PATH_FORMAT } from './paiData'; import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { PAIJobInfoCollector } from './paiJobInfoCollector';
import { String } from 'typescript-string-operations';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
import { validateCodeDir } from '../common/util'; import { validateCodeDir } from '../common/util';
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
import { PAI_LOG_PATH_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
import { PAIJobInfoCollector } from './paiJobInfoCollector';
import { PAIJobRestServer } from './paiJobRestServer';
var WebHDFS = require('webhdfs'); const WebHDFS = require('webhdfs');
/** /**
* Training Service implementation for OpenPAI (Open Platform for AI) * Training Service implementation for OpenPAI (Open Platform for AI)
...@@ -61,6 +61,7 @@ class PAITrainingService implements TrainingService { ...@@ -61,6 +61,7 @@ class PAITrainingService implements TrainingService {
private readonly expRootDir: string; private readonly expRootDir: string;
private paiTrialConfig: NNIPAITrialConfig | undefined; private paiTrialConfig: NNIPAITrialConfig | undefined;
private paiClusterConfig?: PAIClusterConfig; private paiClusterConfig?: PAIClusterConfig;
private jobQueue: string[];
private stopping: boolean = false; private stopping: boolean = false;
private hdfsClient: any; private hdfsClient: any;
private paiToken? : string; private paiToken? : string;
...@@ -82,6 +83,7 @@ class PAITrainingService implements TrainingService { ...@@ -82,6 +83,7 @@ class PAITrainingService implements TrainingService {
this.log = getLogger(); this.log = getLogger();
this.metricsEmitter = new EventEmitter(); this.metricsEmitter = new EventEmitter();
this.trialJobsMap = new Map<string, PAITrialJobDetail>(); this.trialJobsMap = new Map<string, PAITrialJobDetail>();
this.jobQueue = [];
// Root dir on HDFS // Root dir on HDFS
this.expRootDir = path.join('/nni', 'experiments', getExperimentId()); this.expRootDir = path.join('/nni', 'experiments', getExperimentId());
this.experimentId = getExperimentId(); this.experimentId = getExperimentId();
...@@ -99,15 +101,9 @@ class PAITrainingService implements TrainingService { ...@@ -99,15 +101,9 @@ class PAITrainingService implements TrainingService {
await restServer.start(); await restServer.start();
restServer.setEnableVersionCheck = this.versionCheck; restServer.setEnableVersionCheck = this.versionCheck;
this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`); this.log.info(`PAI Training service rest server listening on: ${restServer.endPoint}`);
while (!this.stopping) { await Promise.all([
await this.updatePaiToken(); this.statusCheckingLoop(),
await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig); this.submitJobLoop()]);
if (restServer.getErrorMessage) {
throw new Error(restServer.getErrorMessage)
this.stopping = true;
}
await delay(3000);
}
this.log.info('PAI training service exit.'); this.log.info('PAI training service exit.');
} }
...@@ -118,88 +114,46 @@ class PAITrainingService implements TrainingService { ...@@ -118,88 +114,46 @@ class PAITrainingService implements TrainingService {
if (value.form.jobType === 'TRIAL') { if (value.form.jobType === 'TRIAL') {
jobs.push(await this.getTrialJob(key)); jobs.push(await this.getTrialJob(key));
} }
}; }
return Promise.resolve(jobs); return Promise.resolve(jobs);
} }
public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> { public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
if(!this.paiClusterConfig) { if (!this.paiClusterConfig) {
throw new Error('PAI Cluster config is not initialized'); throw new Error('PAI Cluster config is not initialized');
} }
const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (!paiTrialJob) { if (!paiTrialJob) {
return Promise.reject(`trial job ${trialJobId} not found`) return Promise.reject(`trial job ${trialJobId} not found`);
} }
return Promise.resolve(paiTrialJob); return Promise.resolve(paiTrialJob);
} }
public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) { public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
this.metricsEmitter.on('metric', listener); this.metricsEmitter.on('metric', listener);
} }
public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) { public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
this.metricsEmitter.off('metric', listener); this.metricsEmitter.off('metric', listener);
} }
public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> { public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
const deferred : Deferred<PAITrialJobDetail> = new Deferred<PAITrialJobDetail>(); const deferred : Deferred<PAITrialJobDetail> = new Deferred<PAITrialJobDetail>();
if(!this.paiClusterConfig) { if (!this.hdfsBaseDir) {
throw new Error('PAI Cluster config is not initialized');
}
if (!this.paiTrialConfig) {
throw new Error('trial config is not initialized');
}
if (!this.paiToken) {
throw new Error('PAI token is not initialized');
}
if(!this.hdfsBaseDir) {
throw new Error('hdfsBaseDir is not initialized'); throw new Error('hdfsBaseDir is not initialized');
} }
if(!this.hdfsOutputHost) {
throw new Error('hdfsOutputHost is not initialized');
}
if(!this.paiRestServerPort) {
const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
this.paiRestServerPort = restServer.clusterRestServerPort;
}
this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`); this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
// Make sure experiment code files is copied from local to HDFS
if(this.copyExpCodeDirPromise) {
await this.copyExpCodeDirPromise;
}
const trialJobId: string = uniqueString(5); const trialJobId: string = uniqueString(5);
const trialSequenceId: number = this.generateSequenceId(); const trialSequenceId: number = this.generateSequenceId();
//TODO: use HDFS working folder instead //TODO: use HDFS working folder instead
const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId); const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
//create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);
const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
// Write file content ( parameter.cfg ) to local tmp folders
const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>form)
if(trialForm) {
await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
trialForm.hyperParameters.value, { encoding: 'utf8' });
}
// Step 1. Prepare PAI job configuration
const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`; const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;
const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
const hdfsOutputDir : string = path.join(this.hdfsBaseDir, this.experimentId, trialJobId); const hdfsOutputDir : string = path.join(this.hdfsBaseDir, this.experimentId, trialJobId);
const hdfsLogPath : string = String.Format( const hdfsLogPath : string = String.Format(
...@@ -216,91 +170,10 @@ class PAITrainingService implements TrainingService { ...@@ -216,91 +170,10 @@ class PAITrainingService implements TrainingService {
form, form,
trialSequenceId, trialSequenceId,
hdfsLogPath); hdfsLogPath);
this.trialJobsMap.set(trialJobId, trialJobDetail);
const nniManagerIp = this.nniManagerIpConfig?this.nniManagerIpConfig.nniManagerIp:getIPV4Address();
const version = this.versionCheck? await getVersion(): '';
const nniPaiTrialCommand : string = String.Format(
PAI_TRIAL_COMMAND_FORMAT,
// PAI will copy job's codeDir into /root directory
`$PWD/${trialJobId}`,
`$PWD/${trialJobId}/nnioutput`,
trialJobId,
this.experimentId,
trialSequenceId,
this.paiTrialConfig.command,
nniManagerIp,
this.paiRestServerPort,
hdfsOutputDir,
this.hdfsOutputHost,
this.paiClusterConfig.userName,
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
version,
this.logCollection
).replace(/\r\n|\n|\r/gm, '');
console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
const paiTaskRoles : PAITaskRole[] = [new PAITaskRole('nni_trail_' + trialJobId,
// Task role number
1,
// Task CPU number
this.paiTrialConfig.cpuNum,
// Task memory
this.paiTrialConfig.memoryMB,
// Task GPU number
this.paiTrialConfig.gpuNum,
// Task command
nniPaiTrialCommand,
// Task shared memory
this.paiTrialConfig.shmMB)];
const paiJobConfig : PAIJobConfig = new PAIJobConfig(
// Job name
paiJobName,
// Docker image
this.paiTrialConfig.image,
// dataDir
this.paiTrialConfig.dataDir,
// outputDir
this.paiTrialConfig.outputDir,
// codeDir
`$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
// PAI Task roles
paiTaskRoles,
// Add Virutal Cluster
this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString());
// Step 2. Upload code files in codeDir onto HDFS
try {
await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
} catch (error) {
this.log.error(`PAI Training service: copy ${this.paiTrialConfig.codeDir} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
throw new Error(error.message);
}
// Step 3. Submit PAI job via Rest call this.trialJobsMap.set(trialJobId, trialJobDetail);
// Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API this.jobQueue.push(trialJobId);
const submitJobRequest: request.Options = {
uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
method: 'POST',
json: true,
body: paiJobConfig,
headers: {
"Content-Type": "application/json",
"Authorization": 'Bearer ' + this.paiToken
}
};
request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
if (error || response.statusCode >= 400) {
const errorMessage : string = error ? error.message :
`Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${response.body}`;
this.log.error(errorMessage);
trialJobDetail.status = 'FAILED';
deferred.reject(new Error(errorMessage));
} else {
trialJobDetail.submitTime = Date.now();
deferred.resolve(trialJobDetail); deferred.resolve(trialJobDetail);
}
});
return deferred.promise; return deferred.promise;
} }
...@@ -316,12 +189,13 @@ class PAITrainingService implements TrainingService { ...@@ -316,12 +189,13 @@ class PAITrainingService implements TrainingService {
public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> { public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
const trialJobDetail : PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); const trialJobDetail : PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
const deferred : Deferred<void> = new Deferred<void>(); const deferred : Deferred<void> = new Deferred<void>();
if(!trialJobDetail) { if (!trialJobDetail) {
this.log.error(`cancelTrialJob: trial job id ${trialJobId} not found`); this.log.error(`cancelTrialJob: trial job id ${trialJobId} not found`);
return Promise.reject(); return Promise.reject();
} }
if(!this.paiClusterConfig) { if (!this.paiClusterConfig) {
throw new Error('PAI Cluster config is not initialized'); throw new Error('PAI Cluster config is not initialized');
} }
if (!this.paiToken) { if (!this.paiToken) {
...@@ -332,10 +206,10 @@ class PAITrainingService implements TrainingService { ...@@ -332,10 +206,10 @@ class PAITrainingService implements TrainingService {
uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs/${trialJobDetail.paiJobName}/executionType`, uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs/${trialJobDetail.paiJobName}/executionType`,
method: 'PUT', method: 'PUT',
json: true, json: true,
body: {'value' : 'STOP'}, body: {value: 'STOP'},
headers: { headers: {
"Content-Type": "application/json", 'Content-Type': 'application/json',
"Authorization": 'Bearer ' + this.paiToken Authorization: `Bearer ${this.paiToken}`
} }
}; };
...@@ -345,7 +219,7 @@ class PAITrainingService implements TrainingService { ...@@ -345,7 +219,7 @@ class PAITrainingService implements TrainingService {
request(stopJobRequest, (error: Error, response: request.Response, body: any) => { request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
if (error || response.statusCode >= 400) { if (error || response.statusCode >= 400) {
this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`); this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
deferred.reject(error ? error.message : 'Stop trial failed, http code: ' + response.statusCode); deferred.reject(error ? error.message : `Stop trial failed, http code: ${response.statusCode}`);
} else { } else {
deferred.resolve(); deferred.resolve();
} }
...@@ -354,6 +228,7 @@ class PAITrainingService implements TrainingService { ...@@ -354,6 +228,7 @@ class PAITrainingService implements TrainingService {
return deferred.promise; return deferred.promise;
} }
// tslint:disable-next-line:max-func-body-length
public async setClusterMetadata(key: string, value: string): Promise<void> { public async setClusterMetadata(key: string, value: string): Promise<void> {
const deferred : Deferred<void> = new Deferred<void>(); const deferred : Deferred<void> = new Deferred<void>();
...@@ -364,7 +239,6 @@ class PAITrainingService implements TrainingService { ...@@ -364,7 +239,6 @@ class PAITrainingService implements TrainingService {
break; break;
case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG: case TrialConfigMetadataKey.PAI_CLUSTER_CONFIG:
//TODO: try catch exception when setting up HDFS client and get PAI token
this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value); this.paiClusterConfig = <PAIClusterConfig>JSON.parse(value);
this.hdfsClient = WebHDFS.createClient({ this.hdfsClient = WebHDFS.createClient({
...@@ -381,14 +255,14 @@ class PAITrainingService implements TrainingService { ...@@ -381,14 +255,14 @@ class PAITrainingService implements TrainingService {
break; break;
case TrialConfigMetadataKey.TRIAL_CONFIG: case TrialConfigMetadataKey.TRIAL_CONFIG:
if (!this.paiClusterConfig){ if (!this.paiClusterConfig) {
this.log.error('pai cluster config is not initialized'); this.log.error('pai cluster config is not initialized');
deferred.reject(new Error('pai cluster config is not initialized')); deferred.reject(new Error('pai cluster config is not initialized'));
break; break;
} }
this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value); this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);
//paiTrialConfig.outputDir could be null if it is not set in nnictl //paiTrialConfig.outputDir could be null if it is not set in nnictl
if(this.paiTrialConfig.outputDir === undefined || this.paiTrialConfig.outputDir === null){ if (this.paiTrialConfig.outputDir === undefined || this.paiTrialConfig.outputDir === null){
this.paiTrialConfig.outputDir = String.Format( this.paiTrialConfig.outputDir = String.Format(
PAI_OUTPUT_DIR_FORMAT, PAI_OUTPUT_DIR_FORMAT,
this.paiClusterConfig.host this.paiClusterConfig.host
...@@ -406,11 +280,11 @@ class PAITrainingService implements TrainingService { ...@@ -406,11 +280,11 @@ class PAITrainingService implements TrainingService {
const hdfsDirContent = this.paiTrialConfig.outputDir.match(this.hdfsDirPattern); const hdfsDirContent = this.paiTrialConfig.outputDir.match(this.hdfsDirPattern);
if(hdfsDirContent === null) { if (hdfsDirContent === null) {
throw new Error('Trial outputDir format Error'); throw new Error('Trial outputDir format Error');
} }
const groups = hdfsDirContent.groups; const groups = hdfsDirContent.groups;
if(groups === undefined) { if (groups === undefined) {
throw new Error('Trial outputDir format Error'); throw new Error('Trial outputDir format Error');
} }
...@@ -418,12 +292,12 @@ class PAITrainingService implements TrainingService { ...@@ -418,12 +292,12 @@ class PAITrainingService implements TrainingService {
//TODO: choose to use /${username} as baseDir //TODO: choose to use /${username} as baseDir
this.hdfsBaseDir = groups['baseDir']; this.hdfsBaseDir = groups['baseDir'];
if(this.hdfsBaseDir === undefined) { if(this.hdfsBaseDir === undefined) {
this.hdfsBaseDir = "/"; this.hdfsBaseDir = '/';
} }
let dataOutputHdfsClient; let dataOutputHdfsClient;
if (this.paiClusterConfig.host === this.hdfsOutputHost && this.hdfsClient) { if (this.paiClusterConfig.host === this.hdfsOutputHost && this.hdfsClient) {
dataOutputHdfsClient = this.hdfsClient dataOutputHdfsClient = this.hdfsClient;
} else { } else {
dataOutputHdfsClient = WebHDFS.createClient({ dataOutputHdfsClient = WebHDFS.createClient({
user: this.paiClusterConfig.userName, user: this.paiClusterConfig.userName,
...@@ -433,18 +307,20 @@ class PAITrainingService implements TrainingService { ...@@ -433,18 +307,20 @@ class PAITrainingService implements TrainingService {
} }
try { try {
const exist : boolean = await HDFSClientUtility.pathExists("/", dataOutputHdfsClient); const exist : boolean = await HDFSClientUtility.pathExists('/', dataOutputHdfsClient);
if(!exist) { if (!exist) {
deferred.reject(new Error(`Please check hdfsOutputDir host!`)); deferred.reject(new Error(`Please check hdfsOutputDir host!`));
} }
} catch(error) { } catch (error) {
deferred.reject(new Error(`HDFS encounters problem, error is ${error}. Please check hdfsOutputDir host!`)); deferred.reject(new Error(`HDFS encounters problem, error is ${error}. Please check hdfsOutputDir host!`));
} }
// Copy experiment files from local folder to HDFS // Copy experiment files from local folder to HDFS
this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(this.paiTrialConfig.codeDir, this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(
this.paiTrialConfig.codeDir,
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName), HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
this.hdfsClient); this.hdfsClient
);
deferred.resolve(); deferred.resolve();
break; break;
...@@ -466,6 +342,7 @@ class PAITrainingService implements TrainingService { ...@@ -466,6 +342,7 @@ class PAITrainingService implements TrainingService {
const deferred : Deferred<string> = new Deferred<string>(); const deferred : Deferred<string> = new Deferred<string>();
deferred.resolve(); deferred.resolve();
return deferred.promise; return deferred.promise;
} }
...@@ -491,6 +368,157 @@ class PAITrainingService implements TrainingService { ...@@ -491,6 +368,157 @@ class PAITrainingService implements TrainingService {
return this.metricsEmitter; return this.metricsEmitter;
} }
// tslint:disable-next-line:max-func-body-length
private async submitTrialJobToPAI(trialJobId: string): Promise<boolean> {
const deferred : Deferred<boolean> = new Deferred<boolean>();
const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (!trialJobDetail) {
throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
}
if (!this.paiClusterConfig) {
throw new Error('PAI Cluster config is not initialized');
}
if (!this.paiTrialConfig) {
throw new Error('trial config is not initialized');
}
if (!this.paiToken) {
throw new Error('PAI token is not initialized');
}
if (!this.hdfsBaseDir) {
throw new Error('hdfsBaseDir is not initialized');
}
if (!this.hdfsOutputHost) {
throw new Error('hdfsOutputHost is not initialized');
}
if (!this.paiRestServerPort) {
const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
this.paiRestServerPort = restServer.clusterRestServerPort;
}
// Make sure experiment code files is copied from local to HDFS
if (this.copyExpCodeDirPromise) {
await this.copyExpCodeDirPromise;
}
// Step 1. Prepare PAI job configuration
const hdfsOutputDir : string = path.join(this.hdfsBaseDir, this.experimentId, trialJobId);
const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
//create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);
const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
// Write file content ( parameter.cfg ) to local tmp folders
const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>trialJobDetail.form);
if (trialForm) {
await fs.promises.writeFile(
path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
trialForm.hyperParameters.value, { encoding: 'utf8' }
);
}
const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
const version: string = this.versionCheck ? await getVersion() : '';
const nniPaiTrialCommand : string = String.Format(
PAI_TRIAL_COMMAND_FORMAT,
// PAI will copy job's codeDir into /root directory
`$PWD/${trialJobId}`,
`$PWD/${trialJobId}/nnioutput`,
trialJobId,
this.experimentId,
trialJobDetail.sequenceId,
this.paiTrialConfig.command,
nniManagerIp,
this.paiRestServerPort,
hdfsOutputDir,
this.hdfsOutputHost,
this.paiClusterConfig.userName,
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
version,
this.logCollection
).replace(/\r\n|\n|\r/gm, '');
console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
const paiTaskRoles : PAITaskRole[] = [
new PAITaskRole(
`nni_trail_${trialJobId}`,
// Task role number
1,
// Task CPU number
this.paiTrialConfig.cpuNum,
// Task memory
this.paiTrialConfig.memoryMB,
// Task GPU number
this.paiTrialConfig.gpuNum,
// Task command
nniPaiTrialCommand,
// Task shared memory
this.paiTrialConfig.shmMB
)
];
const paiJobConfig : PAIJobConfig = new PAIJobConfig(
// Job name
trialJobDetail.paiJobName,
// Docker image
this.paiTrialConfig.image,
// dataDir
this.paiTrialConfig.dataDir,
// outputDir
this.paiTrialConfig.outputDir,
// codeDir
`$PAI_DEFAULT_FS_URI${hdfsCodeDir}`,
// PAI Task roles
paiTaskRoles,
// Add Virutal Cluster
this.paiTrialConfig.virtualCluster === undefined ? 'default' : this.paiTrialConfig.virtualCluster.toString()
);
// Step 2. Upload code files in codeDir onto HDFS
try {
await HDFSClientUtility.copyDirectoryToHdfs(trialLocalTempFolder, hdfsCodeDir, this.hdfsClient);
} catch (error) {
this.log.error(`PAI Training service: copy ${this.paiTrialConfig.codeDir} to HDFS ${hdfsCodeDir} failed, error is ${error}`);
throw new Error(error.message);
}
// Step 3. Submit PAI job via Rest call
// Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
const submitJobRequest: request.Options = {
uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs`,
method: 'POST',
json: true,
body: paiJobConfig,
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${this.paiToken}`
}
};
request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
if (error || response.statusCode >= 400) {
const errorMessage : string = error ? error.message :
`Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${response.body}`;
this.log.error(errorMessage);
trialJobDetail.status = 'FAILED';
deferred.reject(new Error(errorMessage));
} else {
trialJobDetail.submitTime = Date.now();
deferred.resolve(true);
}
});
return deferred.promise;
}
private generateSequenceId(): number { private generateSequenceId(): number {
if (this.nextTrialSequenceId === -1) { if (this.nextTrialSequenceId === -1) {
this.nextTrialSequenceId = getInitTrialSequenceId(); this.nextTrialSequenceId = getInitTrialSequenceId();
...@@ -499,22 +527,50 @@ class PAITrainingService implements TrainingService { ...@@ -499,22 +527,50 @@ class PAITrainingService implements TrainingService {
return this.nextTrialSequenceId++; return this.nextTrialSequenceId++;
} }
private async statusCheckingLoop(): Promise<void> {
while (!this.stopping) {
await this.updatePaiToken();
await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);
const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
if (restServer.getErrorMessage) {
throw new Error(restServer.getErrorMessage);
}
await delay(3000);
}
}
private async submitJobLoop(): Promise<void> {
while (!this.stopping) {
while (!this.stopping && this.jobQueue.length > 0) {
const trialJobId: string = this.jobQueue[0];
if (await this.submitTrialJobToPAI(trialJobId)) {
// Remove trial job with trialJobId from job queue
this.jobQueue.shift();
} else {
// Break the while loop since failed to submitJob
break;
}
}
await delay(3000);
}
}
/** /**
* Update pai token by the interval time or initialize the pai token * Update pai token by the interval time or initialize the pai token
*/ */
private async updatePaiToken(): Promise<void> { private async updatePaiToken(): Promise<void> {
const deferred : Deferred<void> = new Deferred<void>(); const deferred : Deferred<void> = new Deferred<void>();
let currentTime: number = new Date().getTime(); const currentTime: number = new Date().getTime();
//If pai token initialized and not reach the interval time, do not update //If pai token initialized and not reach the interval time, do not update
if(this.paiTokenUpdateTime && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval){ if (this.paiTokenUpdateTime && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval){
return Promise.resolve(); return Promise.resolve();
} }
if(!this.paiClusterConfig){ if (!this.paiClusterConfig) {
const paiClusterConfigError = `pai cluster config not initialized!` const paiClusterConfigError: string = `pai cluster config not initialized!`;
this.log.error(`${paiClusterConfigError}`); this.log.error(`${paiClusterConfigError}`);
throw Error(`${paiClusterConfigError}`) throw Error(`${paiClusterConfigError}`);
} }
const authentication_req: request.Options = { const authentication_req: request.Options = {
...@@ -532,7 +588,7 @@ class PAITrainingService implements TrainingService { ...@@ -532,7 +588,7 @@ class PAITrainingService implements TrainingService {
this.log.error(`Get PAI token failed: ${error.message}`); this.log.error(`Get PAI token failed: ${error.message}`);
deferred.reject(new Error(`Get PAI token failed: ${error.message}`)); deferred.reject(new Error(`Get PAI token failed: ${error.message}`));
} else { } else {
if(response.statusCode !== 200){ if (response.statusCode !== 200){
this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`); this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
deferred.reject(new Error(`Get PAI token failed: ${response.body}, please check paiConfig username or password`)); deferred.reject(new Error(`Get PAI token failed: ${response.body}, please check paiConfig username or password`));
} }
...@@ -550,8 +606,9 @@ class PAITrainingService implements TrainingService { ...@@ -550,8 +606,9 @@ class PAITrainingService implements TrainingService {
5000); 5000);
}); });
return Promise.race([timeoutDelay, deferred.promise]).finally(() => clearTimeout(timeoutId)); return Promise.race([timeoutDelay, deferred.promise])
.finally(() => clearTimeout(timeoutId));
} }
} }
export { PAITrainingService } export { PAITrainingService };
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment