Unverified Commit d7456c16 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Refactor code storage logic for trial (#2403)

parent bd7edf36
......@@ -3,6 +3,7 @@
'use strict';
import * as assert from 'assert';
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
......@@ -72,6 +73,11 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
this.kubernetesRestServerPort = restServer.clusterRestServerPort;
}
// wait upload of code Dir to finish
if (this.copyExpCodeDirPromise !== undefined) {
await this.copyExpCodeDirPromise;
}
const trialJobId: string = uniqueString(5);
// Set trial's NFS working folder
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
......@@ -81,8 +87,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
this.generateContainerPort();
await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, form);
//upload code files
const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
//wait upload of script files to finish
const trialJobOutputUrl: string = await this.uploadFolder(trialLocalTempFolder, `nni/${getExperimentId()}/${trialJobId}`);
let initStatus: TrialJobStatus = 'WAITING';
if (!trialJobOutputUrl) {
initStatus = 'FAILED';
......@@ -151,6 +157,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
// Validate to make sure codeDir doesn't have too many files
try {
await validateCodeDir(this.fcTrialConfig.codeDir);
//upload codeDir to storage
this.copyExpCodeDirPromise = this.uploadFolder(this.fcTrialConfig.codeDir, `nni/${getExperimentId()}/nni-code`);
} catch (error) {
this.log.error(error);
......@@ -171,41 +179,31 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
}
/**
* upload code files to nfs or azureStroage
* @param trialJobId
* @param trialLocalTempFolder
* return: trialJobOutputUrl
* upload local folder to nfs or azureStroage
*/
private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
private async uploadFolder(srcDirectory: string, destDirectory: string): Promise<string> {
if (this.fcClusterConfig === undefined) {
throw new Error('Kubeflow Cluster config is not initialized');
}
if (this.fcTrialConfig === undefined) {
throw new Error('Kubeflow trial config is not initialized');
}
let trialJobOutputUrl: string = '';
assert(this.fcClusterConfig.storage === undefined
|| this.fcClusterConfig.storage === 'azureStorage'
|| this.fcClusterConfig.storage === 'nfs');
if (this.fcClusterConfig.storageType === 'azureStorage') {
const azureFrameworkControllerClusterConfig: FrameworkControllerClusterConfigAzure =
<FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.fcTrialConfig.codeDir,
azureFrameworkControllerClusterConfig.uploadRetryCount);
} else if (this.fcClusterConfig.storageType === 'nfs') {
const nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS =
<FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
// Creat work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
// Copy code files from local dir to NFS mounted dir
await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
// Copy codeDir to NFS mounted dir
await cpp.exec(`cp -r ${this.fcTrialConfig.codeDir}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
const nfsConfig: NFSConfig = nfsFrameworkControllerClusterConfig.nfs;
trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
if (this.fcClusterConfig.storage === 'azureStorage') {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
const fcClusterConfigAzure: FrameworkControllerClusterConfigAzure = <FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, fcClusterConfigAzure.uploadRetryCount);
} else if (this.fcClusterConfig.storage === 'nfs' || this.fcClusterConfig.storage === undefined) {
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/${destDirectory}`);
await cpp.exec(`cp -r ${srcDirectory}/* ${this.trialLocalNFSTempFolder}/${destDirectory}/.`);
const fcClusterConfigNFS: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
const nfsConfig: NFSConfig = fcClusterConfigNFS.nfs;
return `nfs://${nfsConfig.server}:${destDirectory}`;
}
return Promise.resolve(trialJobOutputUrl);
return '';
}
/**
......
......@@ -74,14 +74,20 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
this.kubernetesRestServerPort = restServer.clusterRestServerPort;
}
// upload code Dir to storage
if (this.copyExpCodeDirPromise !== undefined) {
await this.copyExpCodeDirPromise;
}
const trialJobId: string = uniqueString(5);
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
//prepare the runscript
await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, form);
//upload files to sotrage
const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
//upload script files to sotrage
const trialJobOutputUrl: string = await this.uploadFolder(trialLocalTempFolder, `nni/${getExperimentId()}/${trialJobId}`);
let initStatus: TrialJobStatus = 'WAITING';
if (!trialJobOutputUrl) {
initStatus = 'FAILED';
......@@ -152,6 +158,8 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
// Validate to make sure codeDir doesn't have too many files
try {
await validateCodeDir(this.kubeflowTrialConfig.codeDir);
//upload codeDir to storage
this.copyExpCodeDirPromise = this.uploadFolder(this.kubeflowTrialConfig.codeDir, `nni/${getExperimentId()}/nni-code`);
} catch (error) {
this.log.error(error);
......@@ -172,12 +180,9 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
}
/**
* upload code files to nfs or azureStroage
* @param trialJobId
* @param trialLocalTempFolder
* return: trialJobOutputUrl
* upload local folder to nfs or azureStroage
*/
private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
private async uploadFolder(srcDirectory: string, destDirectory: string): Promise<string> {
if (this.kubeflowClusterConfig === undefined) {
throw new Error('Kubeflow Cluster config is not initialized');
}
......@@ -186,8 +191,6 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
throw new Error('Kubeflow Trial config is not initialized');
}
let trialJobOutputUrl: string = '';
assert(this.kubeflowClusterConfig.storage === undefined
|| this.kubeflowClusterConfig.storage === 'azureStorage'
|| this.kubeflowClusterConfig.storage === 'nfs');
......@@ -197,20 +200,15 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
throw new Error('azureStorageClient is not initialized');
}
const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.kubeflowTrialConfig.codeDir, azureKubeflowClusterConfig.uploadRetryCount);
return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, azureKubeflowClusterConfig.uploadRetryCount);
} else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/${destDirectory}`);
await cpp.exec(`cp -r ${srcDirectory}/* ${this.trialLocalNFSTempFolder}/${destDirectory}/.`);
const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
// Creat work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
// Copy script files from local dir to NFS mounted dir
await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
// Copy codeDir to NFS mounted dir
await cpp.exec(`cp -r ${this.kubeflowTrialConfig.codeDir}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;
trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
return `nfs://${nfsConfig.server}:${destDirectory}`;
}
return Promise.resolve(trialJobOutputUrl);
return '';
}
private async prepareRunScript(trialLocalTempFolder: string, trialJobId: string, trialWorkingFolder: string,
......
......@@ -39,7 +39,7 @@ export class KubernetesTrialJobDetail implements TrialJobDetail {
export const kubernetesScriptFormat: string =
`#!/bin/bash
export NNI_PLATFORM={0}
export NNI_SYS_DIR=$PWD/nni/{1}
export NNI_SYS_DIR={1}
export NNI_OUTPUT_DIR={2}
export MULTI_PHASE=false
export NNI_TRIAL_JOB_ID={3}
......@@ -49,7 +49,7 @@ export NNI_TRIAL_SEQ_ID={6}
{7}
mkdir -p $NNI_SYS_DIR
mkdir -p $NNI_OUTPUT_DIR
cp -rT $NNI_CODE_DIR $NNI_SYS_DIR
cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR
cd $NNI_SYS_DIR
sh install_nni.sh
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} \
......
......@@ -49,6 +49,8 @@ abstract class KubernetesTrainingService {
protected kubernetesClusterConfig?: KubernetesClusterConfig;
protected versionCheck: boolean = true;
protected logCollection: string;
protected copyExpCodeDirPromise?: Promise<string>;
protected expContainerCodeFolder: string;
constructor() {
this.log = getLogger();
......@@ -57,6 +59,7 @@ abstract class KubernetesTrainingService {
this.trialLocalNFSTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp');
this.experimentId = getExperimentId();
this.CONTAINER_MOUNT_PATH = '/tmp/mount';
this.expContainerCodeFolder = path.join(this.CONTAINER_MOUNT_PATH, 'nni', this.experimentId, 'nni-code');
this.genericK8sClient = new GeneralK8sClient();
this.logCollection = 'none';
}
......@@ -272,11 +275,11 @@ abstract class KubernetesTrainingService {
const runScript: string = String.Format(
kubernetesScriptFormat,
platform,
trialJobId,
trialWorkingFolder,
path.join(trialWorkingFolder, 'output', `${roleName}_output`),
trialJobId,
getExperimentId(),
trialWorkingFolder,
this.expContainerCodeFolder,
trialSequenceId,
nvidiaScript,
command,
......@@ -329,51 +332,45 @@ abstract class KubernetesTrainingService {
);
return registrySecretName;
}
protected async uploadFilesToAzureStorage(trialJobId: string, trialLocalTempFolder: string, codeDir: string, uploadRetryCount: number | undefined): Promise<string> {
/**
* upload local directory to azureStorage
* @param srcDirectory the source directory of local folder
* @param destDirectory the target directory in azure
* @param uploadRetryCount the retry time when upload failed
*/
protected async uploadFolderToAzureStorage(srcDirectory: string, destDirectory: string, uploadRetryCount: number | undefined): Promise<string> {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
let trialJobOutputUrl: string = '';
let retryCount: number = 1;
if(uploadRetryCount) {
retryCount = uploadRetryCount;
}
let resultUploadNNIScript: boolean = false;
let resultUploadCodeFile: boolean = false;
let uploadSuccess: boolean = false;
let folderUriInAzure = '';
try {
do {
//upload local files, including scripts for running the trial and configuration (e.g., hyperparameters) for the trial, to azure storage
if(!resultUploadNNIScript) {
resultUploadNNIScript = await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${trialLocalTempFolder}`);
}
//upload code files to azure storage
if(!resultUploadCodeFile) {
resultUploadCodeFile = await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${codeDir}`);
}
if (resultUploadNNIScript && resultUploadCodeFile) {
trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}` +
`/${path.join('nni', getExperimentId(), trialJobId, 'output')}`;
break;
} else {
uploadSuccess = await AzureStorageClientUtility.uploadDirectory(
this.azureStorageClient,
`${destDirectory}`,
this.azureStorageShare,
`${srcDirectory}`);
if (!uploadSuccess) {
//wait for 5 seconds to re-upload files
await delay(5000);
this.log.info('Upload failed, Retry: upload files to azure-storage');
} else {
folderUriInAzure = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${destDirectory}`;
break;
}
} while (retryCount-- >= 0)
} catch (error) {
this.log.error(error);
//return a empty url when got error
return Promise.resolve("");
}
if(!trialJobOutputUrl) {
this.log.info(`Retry-count is used up, upload files to azureStorage for trial ${trialJobId} failed!`);
return Promise.resolve('');
}
return Promise.resolve(trialJobOutputUrl);
return Promise.resolve(folderUriInAzure);
}
}
......
......@@ -361,21 +361,25 @@ class LocalTrainingService implements TrainingService {
trialJobDetail: TrialJobDetail,
resource: { gpuIndices: number[] },
gpuNum: number | undefined): { key: string; value: string }[] {
const envVariables: { key: string; value: string }[] = [
{ key: 'NNI_PLATFORM', value: 'local' },
{ key: 'NNI_EXP_ID', value: this.experimentId },
{ key: 'NNI_SYS_DIR', value: trialJobDetail.workingDirectory },
{ key: 'NNI_TRIAL_JOB_ID', value: trialJobDetail.id },
{ key: 'NNI_OUTPUT_DIR', value: trialJobDetail.workingDirectory },
{ key: 'NNI_TRIAL_SEQ_ID', value: trialJobDetail.form.sequenceId.toString() },
{ key: 'MULTI_PHASE', value: this.isMultiPhase.toString() }
];
if (gpuNum !== undefined) {
envVariables.push({
key: 'CUDA_VISIBLE_DEVICES',
value: this.gpuScheduler === undefined ? '-1' : resource.gpuIndices.join(',')
});
}
if (this.localTrialConfig === undefined) {
throw new Error('localTrialConfig is not initialized!');
}
const envVariables: { key: string; value: string }[] = [
{ key: 'NNI_PLATFORM', value: 'local' },
{ key: 'NNI_EXP_ID', value: this.experimentId },
{ key: 'NNI_SYS_DIR', value: trialJobDetail.workingDirectory },
{ key: 'NNI_TRIAL_JOB_ID', value: trialJobDetail.id },
{ key: 'NNI_OUTPUT_DIR', value: trialJobDetail.workingDirectory },
{ key: 'NNI_TRIAL_SEQ_ID', value: trialJobDetail.form.sequenceId.toString() },
{ key: 'MULTI_PHASE', value: this.isMultiPhase.toString() },
{ key: 'NNI_CODE_DIR', value: this.localTrialConfig.codeDir}
];
if (gpuNum !== undefined) {
envVariables.push({
key: 'CUDA_VISIBLE_DEVICES',
value: this.gpuScheduler === undefined ? '-1' : resource.gpuIndices.join(',')
});
}
return envVariables;
}
......@@ -473,12 +477,16 @@ class LocalTrainingService implements TrainingService {
private getScript(localTrialConfig: TrialConfig, workingDirectory: string): string[] {
const script: string[] = [];
if (process.platform === 'win32') {
script.push(`Copy-Item $env:NNI_CODE_DIR\\* -Destination $env:NNI_SYS_DIR -Recurse`);
script.push(`cd $env:NNI_SYS_DIR`);
script.push(
`cmd.exe /c ${localTrialConfig.command} 2>"${path.join(workingDirectory, 'stderr')}"`,
`$NOW_DATE = [int64](([datetime]::UtcNow)-(get-date "1/1/1970")).TotalSeconds`,
`$NOW_DATE = "$NOW_DATE" + (Get-Date -Format fff).ToString()`,
`Write $LASTEXITCODE " " $NOW_DATE | Out-File "${path.join(workingDirectory, '.nni', 'state')}" -NoNewline -encoding utf8`);
} else {
script.push(`cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR`);
script.push(`cd $NNI_SYS_DIR`);
script.push(`eval ${localTrialConfig.command} 2>"${path.join(workingDirectory, 'stderr')}"`);
if (process.platform === 'darwin') {
// https://superuser.com/questions/599072/how-to-get-bash-execution-time-in-milliseconds-under-mac-os-x
......@@ -506,7 +514,6 @@ class LocalTrainingService implements TrainingService {
if (process.platform !== 'win32') {
runScriptContent.push('#!/bin/bash');
}
runScriptContent.push(`cd '${this.localTrialConfig.codeDir}'`);
for (const variable of variables) {
runScriptContent.push(setEnvironmentVariable(variable));
}
......
......@@ -31,7 +31,6 @@ fi`;
export const PAI_K8S_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} \
&& ls $NNI_SYS_DIR \
&& cd $NNI_SYS_DIR && sh install_nni.sh \
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{6}' --nnimanager_ip '{7}' --nnimanager_port '{8}' \
--nni_manager_version '{9}' --log_collection '{10}'`;
&& NNI_CODE_DIR={6} && cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR && cd $NNI_SYS_DIR && sh install_nni.sh \
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' \
--nni_manager_version '{10}' --log_collection '{11}'`;
......@@ -53,6 +53,7 @@ const yaml = require('js-yaml');
@component.Singleton
class PAIK8STrainingService extends PAITrainingService {
protected paiTrialConfig: NNIPAIK8STrialConfig | undefined;
private copyExpCodeDirPromise?: Promise<void>;
private paiJobConfig: undefined;
private nniVersion: string | undefined;
constructor() {
......@@ -78,7 +79,7 @@ class PAIK8STrainingService extends PAITrainingService {
}
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
case TrialConfigMetadataKey.TRIAL_CONFIG: {
if (this.paiClusterConfig === undefined) {
this.log.error('pai cluster config is not initialized');
break;
......@@ -86,10 +87,15 @@ class PAIK8STrainingService extends PAITrainingService {
this.paiTrialConfig = <NNIPAIK8STrialConfig>JSON.parse(value);
// Validate to make sure codeDir doesn't have too many files
await validateCodeDir(this.paiTrialConfig.codeDir);
const nniManagerNFSExpCodeDir = path.join(this.paiTrialConfig.nniManagerNFSMountPath, this.experimentId, 'nni-code');
await execMkdir(nniManagerNFSExpCodeDir);
//Copy codeDir files to local working folder
this.copyExpCodeDirPromise = execCopydir(this.paiTrialConfig.codeDir, nniManagerNFSExpCodeDir);
if (this.paiTrialConfig.paiConfigPath) {
this.paiJobConfig = yaml.safeLoad(fs.readFileSync(this.paiTrialConfig.paiConfigPath, 'utf8'));
}
break;
}
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
this.nniVersion = this.versionCheck ? await getVersion() : '';
......@@ -152,6 +158,7 @@ class PAIK8STrainingService extends PAITrainingService {
if (this.paiTrialConfig === undefined) {
throw new Error('trial config is not initialized');
}
const containerNFSExpCodeDir = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/'nni-code`;
const containerWorkingDir: string = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}/${trialJobDetail.id}`;
const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
const nniPaiTrialCommand: string = String.Format(
......@@ -162,6 +169,7 @@ class PAIK8STrainingService extends PAITrainingService {
this.experimentId,
trialJobDetail.form.sequenceId,
this.isMultiPhase,
containerNFSExpCodeDir,
command,
nniManagerIp,
this.paiRestServerPort,
......@@ -264,15 +272,18 @@ class PAIK8STrainingService extends PAITrainingService {
throw new Error('paiJobRestServer is not initialized');
}
// Make sure experiment code files is copied from local to NFS
if (this.copyExpCodeDirPromise !== undefined) {
await this.copyExpCodeDirPromise;
}
this.paiRestServerPort = this.paiJobRestServer.clusterRestServerPort;
// Step 1. Prepare PAI job configuration
//create trial local working folder locally.
await execMkdir(trialJobDetail.logPath);
const runScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local files
await fs.promises.writeFile(path.join(trialJobDetail.logPath, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
await fs.promises.writeFile(path.join(trialJobDetail.logPath, 'install_nni.sh'), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
// Write file content ( parameter.cfg ) to local working folders
if (trialJobDetail.form !== undefined) {
......@@ -284,7 +295,7 @@ class PAIK8STrainingService extends PAITrainingService {
//Generate Job Configuration in yaml format
const paiJobConfig = this.generateJobConfigInYamlFormat(trialJobDetail);
this.log.debug(paiJobConfig);
// Step 3. Submit PAI job via Rest call
// Step 2. Submit PAI job via Rest call
// Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
const submitJobRequest: request.Options = {
uri: `${this.protocol}://${this.paiClusterConfig.host}/rest-server/api/v2/jobs`,
......
......@@ -179,13 +179,14 @@ export enum ScheduleResultType {
export const REMOTEMACHINE_TRIAL_COMMAND_FORMAT: string =
`#!/bin/bash
export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} \
NNI_TRIAL_SEQ_ID={4} export MULTI_PHASE={5}
NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} NNI_CODE_DIR={6}
cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR
cd $NNI_SYS_DIR
sh install_nni.sh
echo $$ >{6}
python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' \
--nni_manager_version '{10}' --log_collection '{11}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
echo $? \`date +%s%3N\` >{12}`;
echo $$ >{7}
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip '{9}' --nnimanager_port '{10}' \
--nni_manager_version '{11}' --log_collection '{12}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
echo $? \`date +%s%3N\` >{13}`;
export const HOST_JOB_SHELL_FORMAT: string =
`#!/bin/bash
......
......@@ -26,7 +26,7 @@ import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { GPUSummary } from '../common/gpuData';
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { execCopydir, execMkdir, validateCodeDir, getGpuMetricsCollectorBashScriptContent } from '../common/util';
import { execMkdir, validateCodeDir, getGpuMetricsCollectorBashScriptContent } from '../common/util';
import { GPUScheduler } from './gpuScheduler';
import {
REMOTEMACHINE_TRIAL_COMMAND_FORMAT, RemoteMachineMeta,
......@@ -42,11 +42,13 @@ import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
@component.Singleton
class RemoteMachineTrainingService implements TrainingService {
private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>; //machine excutor map
private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>;
private readonly trialExecutorMap: Map<string, ShellExecutor>; //trial excutor map
private readonly trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
private readonly MAX_TRIAL_NUMBER_PER_EXECUTOR: number = 5; // every excutor has a max trial concurrency number
private readonly expRootDir: string;
private readonly remoteExpRootDir: string;
private readonly remoteExpCodeDir: string;
private trialConfig: TrialConfig | undefined;
private gpuScheduler?: GPUScheduler;
private readonly jobQueue: string[];
......@@ -68,9 +70,11 @@ class RemoteMachineTrainingService implements TrainingService {
this.trialJobsMap = new Map<string, RemoteMachineTrialJobDetail>();
this.trialExecutorMap = new Map<string, ShellExecutor>();
this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>();
this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>();
this.jobQueue = [];
this.expRootDir = getExperimentRootDir();
this.remoteExpRootDir = this.getRemoteExperimentRootDir();
this.remoteExpCodeDir = unixPathJoin(this.remoteExpRootDir, 'nni-code');
this.timer = timer;
this.log = getLogger();
this.trialSequenceId = -1;
......@@ -320,9 +324,20 @@ class RemoteMachineTrainingService implements TrainingService {
throw new Error(`codeDir ${remoteMachineTrailConfig.codeDir} is not a directory`);
}
// Validate to make sure codeDir doesn't have too many files
try {
// Validate to make sure codeDir doesn't have too many files
await validateCodeDir(remoteMachineTrailConfig.codeDir);
// Copy codeDir to remote machine
for (const [rmMeta, executorManager] of this.machineExecutorManagerMap.entries()) {
const executor: ShellExecutor = await executorManager.getAvailableExecutor();
if (executor !== undefined) {
this.machineCopyExpCodeDirPromiseMap.set(
rmMeta,
executor.copyDirectoryToRemote(remoteMachineTrailConfig.codeDir, this.remoteExpCodeDir, this.remoteOS)
);
}
}
} catch (error) {
this.log.error(error);
......@@ -480,6 +495,10 @@ class RemoteMachineTrainingService implements TrainingService {
const trialWorkingFolder: string = unixPathJoin(this.remoteExpRootDir, 'trials', trialJobId);
trialJobDetail.rmMeta = rmScheduleInfo.rmMeta;
const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get(trialJobDetail.rmMeta);
if (copyExpCodeDirPromise !== undefined) {
await copyExpCodeDirPromise;
}
await this.allocateExecutorForTrial(trialJobDetail);
await this.launchTrialOnScheduledMachine(
......@@ -554,6 +573,7 @@ class RemoteMachineTrainingService implements TrainingService {
getExperimentId(),
trialJobDetail.form.sequenceId.toString(),
this.isMultiPhase,
this.remoteExpCodeDir,
unixPathJoin(trialWorkingFolder, '.nni', 'jobpid'),
command,
nniManagerIp,
......@@ -565,12 +585,8 @@ class RemoteMachineTrainingService implements TrainingService {
//create tmp trial working folder locally.
await execMkdir(path.join(trialLocalTempFolder, '.nni'));
//create tmp trial working folder locally.
await execCopydir(this.trialConfig.codeDir, trialLocalTempFolder);
const installScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), installScriptContent, { encoding: 'utf8' });
// Write install_nni.sh
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), CONTAINER_INSTALL_NNI_SHELL_FORMAT, { encoding: 'utf8' });
// Write file content ( run.sh and parameter.cfg ) to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), runScriptTrialContent, { encoding: 'utf8' });
await this.writeParameterFile(trialJobId, form.hyperParameters);
......
......@@ -183,13 +183,14 @@ class ShellExecutor {
* Copy files and directories in local directory recursively to remote directory
* @param localDirectory local diretory
* @param remoteDirectory remote directory
* @param sshClient SSH client
* @param remoteOS the OS of remote machine
*/
public async copyDirectoryToRemote(localDirectory: string, remoteDirectory: string, remoteOS: string): Promise<void> {
const tmpSuffix: string = uniqueString(5);
const localTarPath: string = path.join(os.tmpdir(), `nni_tmp_local_${tmpSuffix}.tar.gz`);
const remoteTarPath: string = unixPathJoin(getRemoteTmpDir(remoteOS), `nni_tmp_remote_${tmpSuffix}.tar.gz`);
// Create remote directory
await this.createFolder(remoteDirectory);
// Compress files in local directory to experiment root directory
await tarAdd(localTarPath, localDirectory);
// Copy the compressed file to remoteDirectory and delete it
......
......@@ -168,6 +168,7 @@ def launch_test(config_file, training_service, test_case_config):
trial_stats = get_trial_stats(TRIAL_JOBS_URL)
print(json.dumps(trial_stats, indent=4), flush=True)
if status != 'DONE' or trial_stats['SUCCEEDED'] + trial_stats['EARLY_STOPPED'] < max_trial_num:
print_experiment_log(experiment_id=experiment_id)
print_trial_job_log(training_service, TRIAL_JOBS_URL)
raise AssertionError('Failed to finish in maxExecDuration')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment