Commit 22993e5d authored by demianzhang's avatar demianzhang Committed by chicm-ms
Browse files

Pass tslint for training service (#1177)

* fix local and remote training services tslint
parent ae7a72bc
......@@ -19,44 +19,46 @@
'use strict';
import * as os from 'os'
import * as path from 'path';
import { Client1_10, config } from 'kubernetes-client';
import { getLogger, Logger } from '../../common/log';
var K8SClient = require('kubernetes-client').Client;
var K8SConfig = require('kubernetes-client').config;
/**
* Generic Kubernetes client, target version >= 1.9
*/
// tslint:disable: no-any no-unsafe-any
class GeneralK8sClient {
// Untyped kubernetes-client handle, assigned in the constructor.
protected readonly client: any;
// Logger obtained from getLogger().
protected readonly log: Logger = getLogger();
// Builds the client from a kubeconfig-derived config with version '1.9', then calls loadSpec() on it.
// NOTE(review): the two assignments below look like the before/after pair of a single diff line
// (require()-based K8SClient vs. imported Client1_10) — confirm only one is meant to remain.
constructor() {
this.client = new K8SClient({ config: K8SConfig.fromKubeconfig(), version: '1.9'});
this.client = new Client1_10({ config: config.fromKubeconfig(), version: '1.9'});
this.client.loadSpec();
}
/**
 * Posts `secretManifest` to the secrets endpoint of the 'default' namespace.
 * @param secretManifest request body, passed through unchanged
 * @returns a promise that resolves to true when the response statusCode is within 200-299;
 * otherwise it rejects with a string message that includes the statusCode
 */
public async createSecret(secretManifest: any): Promise<boolean> {
let result: Promise<boolean>;
// NOTE(review): `response` is declared twice and the status check appears twice below;
// these look like old/new diff lines rendered together — confirm which pair is current.
const response : any = await this.client.api.v1.namespaces('default').secrets.post({body: secretManifest});
if(response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
const response : any = await this.client.api.v1.namespaces('default').secrets
.post({body: secretManifest});
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(true);
} else {
// A missing statusCode also lands here, because the condition above requires it to be truthy.
result = Promise.reject(`Create secrets failed, statusCode is ${response.statusCode}`);
}
return result;
}
}
/**
* Kubernetes CRD client
*/
abstract class KubernetesCRDClient {
protected readonly client: any;
protected readonly log: Logger = getLogger();
protected crdSchema: any;
constructor() {
this.client = new K8SClient({ config: K8SConfig.fromKubeconfig() });
this.client = new Client1_10({ config: config.fromKubeconfig() });
this.client.loadSpec();
}
......@@ -65,7 +67,7 @@ abstract class KubernetesCRDClient {
public abstract get containerName(): string;
public get jobKind(): string {
if(this.crdSchema
if (this.crdSchema
&& this.crdSchema.spec
&& this.crdSchema.spec.names
&& this.crdSchema.spec.names.kind) {
......@@ -76,7 +78,7 @@ abstract class KubernetesCRDClient {
}
public get apiVersion(): string {
if(this.crdSchema
if (this.crdSchema
&& this.crdSchema.spec
&& this.crdSchema.spec.version) {
return this.crdSchema.spec.version;
......@@ -88,43 +90,50 @@ abstract class KubernetesCRDClient {
/**
 * Submits a job manifest by POSTing it through `this.operator`.
 * @param jobManifest request body, passed through unchanged
 * @returns a promise that resolves to true when the response statusCode is within 200-299;
 * otherwise it rejects with a string message that includes the statusCode
 */
public async createKubernetesJob(jobManifest: any): Promise<boolean> {
let result: Promise<boolean>;
const response : any = await this.operator.post({body: jobManifest});
// NOTE(review): the two identical checks below look like the old/new versions of one diff line.
if(response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(true);
} else {
result = Promise.reject(`Create kubernetes job failed, statusCode is ${response.statusCode}`);
}
return result;
}
//TODO : replace any
/**
 * Fetches a single job by name through `this.operator(name).get()`.
 * @param kubeflowJobName name passed to the operator accessor
 * @returns a promise that resolves to the response body when statusCode is within 200-299;
 * otherwise it rejects with a string message that includes the statusCode
 */
public async getKubernetesJob(kubeflowJobName: string): Promise<any> {
let result: Promise<any>;
// NOTE(review): `response` is declared twice and checked twice below; these look like
// old/new diff lines rendered together — confirm which pair is current.
const response : any = await this.operator(kubeflowJobName).get();
if(response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
const response : any = await this.operator(kubeflowJobName)
.get();
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(response.body);
} else {
result = Promise.reject(`KubeflowOperatorClient get tfjobs failed, statusCode is ${response.statusCode}`);
}
return result;
}
public async deleteKubernetesJob(labels: Map<string, string>): Promise<boolean> {
let result: Promise<boolean>;
// construct match query from labels for deleting tfjob
const matchQuery: string = Array.from(labels.keys()).map(labelKey => `${labelKey}=${labels.get(labelKey)}`).join(',');
const matchQuery: string = Array.from(labels.keys())
.map((labelKey: string) => `${labelKey}=${labels.get(labelKey)}`)
.join(',');
try {
const deleteResult : any = await this.operator().delete({
const deleteResult : any = await this.operator()
.delete({
qs: {
labelSelector: matchQuery,
propagationPolicy: "Background"
propagationPolicy: 'Background'
}
});
if(deleteResult.statusCode && deleteResult.statusCode >= 200 && deleteResult.statusCode <= 299) {
if (deleteResult.statusCode && deleteResult.statusCode >= 200 && deleteResult.statusCode <= 299) {
result = Promise.resolve(true);
} else {
result = Promise.reject(`KubeflowOperatorClient, delete labels ${matchQuery} get wrong statusCode ${deleteResult.statusCode}`);
result = Promise.reject(
`KubeflowOperatorClient, delete labels ${matchQuery} get wrong statusCode ${deleteResult.statusCode}`);
}
} catch(err) {
} catch (err) {
result = Promise.reject(err);
}
......
......@@ -22,6 +22,7 @@
export type KubernetesStorageKind = 'nfs' | 'azureStorage';
import { MethodNotImplementedError } from '../../common/errors';
// tslint:disable: completed-docs function-name
export abstract class KubernetesClusterConfig {
public readonly storage?: KubernetesStorageKind;
public readonly apiVersion: string;
......@@ -31,7 +32,7 @@ export abstract class KubernetesClusterConfig {
this.apiVersion = apiVersion;
}
public get storageType(): KubernetesStorageKind{
public get storageType(): KubernetesStorageKind {
throw new MethodNotImplementedError();
}
}
......@@ -56,12 +57,13 @@ export class KubernetesClusterConfigNFS extends KubernetesClusterConfig {
this.nfs = nfs;
}
public get storageType(): KubernetesStorageKind{
public get storageType(): KubernetesStorageKind {
return 'nfs';
}
public static getInstance(jsonObject: object): KubernetesClusterConfigNFS {
let kubernetesClusterConfigObjectNFS = <KubernetesClusterConfigNFS>jsonObject;
const kubernetesClusterConfigObjectNFS: KubernetesClusterConfigNFS = <KubernetesClusterConfigNFS>jsonObject;
return new KubernetesClusterConfigNFS(
kubernetesClusterConfigObjectNFS.apiVersion,
kubernetesClusterConfigObjectNFS.nfs,
......@@ -71,12 +73,12 @@ export class KubernetesClusterConfigNFS extends KubernetesClusterConfig {
}
export class KubernetesClusterConfigAzure extends KubernetesClusterConfig {
public readonly keyVault: keyVaultConfig;
public readonly keyVault: KeyVaultConfig;
public readonly azureStorage: AzureStorage;
constructor(
apiVersion: string,
keyVault: keyVaultConfig,
keyVault: KeyVaultConfig,
azureStorage: AzureStorage,
storage?: KubernetesStorageKind
) {
......@@ -85,12 +87,13 @@ export class KubernetesClusterConfigAzure extends KubernetesClusterConfig {
this.azureStorage = azureStorage;
}
public get storageType(): KubernetesStorageKind{
public get storageType(): KubernetesStorageKind {
return 'azureStorage';
}
public static getInstance(jsonObject: object): KubernetesClusterConfigAzure {
let kubernetesClusterConfigObjectAzure = <KubernetesClusterConfigAzure>jsonObject;
const kubernetesClusterConfigObjectAzure: KubernetesClusterConfigAzure = <KubernetesClusterConfigAzure>jsonObject;
return new KubernetesClusterConfigAzure(
kubernetesClusterConfigObjectAzure.apiVersion,
kubernetesClusterConfigObjectAzure.keyVault,
......@@ -100,27 +103,30 @@ export class KubernetesClusterConfigAzure extends KubernetesClusterConfig {
}
}
/**
 * Static factory that picks the concrete cluster config class from the `storage`
 * field of a parsed JSON object.
 */
// tslint:disable-next-line:no-unnecessary-class
export class KubernetesClusterConfigFactory {
/**
 * @param jsonObject parsed configuration; it is cast to StorageConfig to read `storage`
 * @returns the Azure config for 'azureStorage', the NFS config for 'nfs' or a missing `storage`
 * @throws Error for any other `storage` value (the `default` branch)
 */
public static generateKubernetesClusterConfig(jsonObject: object): KubernetesClusterConfig {
// NOTE(review): the paired declarations and `switch` headers below look like old/new diff
// lines rendered together — confirm which of each pair is current.
let storageConfig = <StorageConfig>jsonObject;
switch(storageConfig.storage) {
const storageConfig: StorageConfig = <StorageConfig>jsonObject;
switch (storageConfig.storage) {
case 'azureStorage':
return KubernetesClusterConfigAzure.getInstance(jsonObject);
// `'nfs' || undefined` evaluates to 'nfs', so this label never matches an undefined storage;
// the two separate labels that follow cover both values.
case 'nfs' || undefined :
case 'nfs':
case undefined:
return KubernetesClusterConfigNFS.getInstance(jsonObject);
}
default:
// Interpolating an object here renders via its default string conversion.
throw new Error(`Invalid json object ${jsonObject}`);
}
}
}
/**
* NFS configuration to store Kubeflow job related files
*/
export class NFSConfig {
/** IP Address of NFS server */
// IP Address of NFS server
public readonly server : string;
/** exported NFS path on NFS server */
// exported NFS path on NFS server
public readonly path : string;
constructor(server : string, path : string) {
......@@ -133,13 +139,13 @@ export class NFSConfig {
* KeyVault configuration to store the key of Azure Storage Service
* Refer https://docs.microsoft.com/en-us/azure/key-vault/key-vault-manage-with-cli2
*/
export class keyVaultConfig {
/**The vault-name to specify vault */
export class KeyVaultConfig {
// The vault-name to specify vault
public readonly vaultName : string;
/**The name to specify private key */
// The name to specify private key
public readonly name : string;
constructor(vaultName : string, name : string){
constructor(vaultName : string, name : string) {
this.vaultName = vaultName;
this.name = name;
}
......@@ -149,12 +155,12 @@ export class keyVaultConfig {
* Azure Storage Service
*/
export class AzureStorage {
/**The azure share to storage files */
// The azure share to storage files
public readonly azureShare : string;
/**The account name of storage service */
// The account name of storage service
public readonly accountName: string;
constructor(azureShare : string, accountName: string){
constructor(azureShare : string, accountName: string) {
this.azureShare = azureShare;
this.accountName = accountName;
}
......@@ -164,19 +170,19 @@ export class AzureStorage {
* Trial job configuration for Kubernetes
*/
export class KubernetesTrialConfigTemplate {
/** CPU number */
// CPU number
public readonly cpuNum: number;
/** Memory */
// Memory
public readonly memoryMB: number;
/** Docker image */
// Docker image
public readonly image: string;
/** Trial command */
// Trial command
public readonly command : string;
/** Required GPU number for trial job. The number should be in [0,100] */
// Required GPU number for trial job. The number should be in [0,100]
public readonly gpuNum : number;
constructor(command : string, gpuNum : number,
......
......@@ -24,7 +24,6 @@ import { JobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../commo
/**
* KubeflowTrialJobDetail
*/
// tslint:disable-next-line:max-classes-per-file
export class KubernetesTrialJobDetail implements TrialJobDetail {
public id: string;
public status: TrialJobStatus;
......@@ -55,7 +54,7 @@ export class KubernetesTrialJobDetail implements TrialJobDetail {
}
}
export const KubernetesScriptFormat =
export const kubernetesScriptFormat: string =
`#!/bin/bash
export NNI_PLATFORM={0}
export NNI_SYS_DIR=$PWD/nni/{1}
......@@ -71,5 +70,5 @@ mkdir -p $NNI_OUTPUT_DIR
cp -rT $NNI_CODE_DIR $NNI_SYS_DIR
cd $NNI_SYS_DIR
sh install_nni.sh
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} --nni_manager_version '{11}' --log_collection '{12}'`
+ `1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr`
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} \
--nni_manager_version '{11}' --log_collection '{12}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr`;
......@@ -20,11 +20,10 @@
'use strict';
import * as assert from 'assert';
import { MethodNotImplementedError, NNIError, NNIErrorNames } from '../../common/errors';
import { getLogger, Logger } from '../../common/log';
import { NNIError, NNIErrorNames } from '../../common/errors';
import { TrialJobStatus } from '../../common/trainingService';
import { KubernetesCRDClient } from './kubernetesApiClient';
import { MethodNotImplementedError } from '../../common/errors';
import { KubernetesTrialJobDetail } from './kubernetesData';
/**
......@@ -43,15 +42,15 @@ export class KubernetesJobInfoCollector {
public async retrieveTrialStatus(kubernetesCRDClient: KubernetesCRDClient | undefined) : Promise<void> {
assert(kubernetesCRDClient !== undefined);
const updateKubernetesTrialJobs : Promise<void>[] = [];
for(let [trialJobId, kubernetesTrialJob] of this.trialJobsMap) {
if (!kubernetesTrialJob) {
for (const [trialJobId, kubernetesTrialJob] of this.trialJobsMap) {
if (kubernetesTrialJob === undefined) {
throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
}
// Since Kubeflow needs some delay to schedule jobs, we provide 20 seconds buffer time to check kubeflow job's status
if( Date.now() - kubernetesTrialJob.submitTime < 20 * 1000) {
if (Date.now() - kubernetesTrialJob.submitTime < 20 * 1000) {
return Promise.resolve();
}
updateKubernetesTrialJobs.push(this.retrieveSingleTrialJobInfo(kubernetesCRDClient, kubernetesTrialJob))
updateKubernetesTrialJobs.push(this.retrieveSingleTrialJobInfo(kubernetesCRDClient, kubernetesTrialJob));
}
await Promise.all(updateKubernetesTrialJobs);
......
......@@ -19,19 +19,19 @@
'use strict';
import * as component from '../../common/component';
import { Inject } from 'typescript-ioc';
import * as component from '../../common/component';
import { ClusterJobRestServer } from '../common/clusterJobRestServer';
import { KubernetesTrainingService } from './kubernetesTrainingService';
import { ClusterJobRestServer } from '../common/clusterJobRestServer'
/**
* Kubeflow Training service Rest server, provides rest API to support kubeflow job metrics update
*
*/
@component.Singleton
export class KubernetesJobRestServer extends ClusterJobRestServer{
export class KubernetesJobRestServer extends ClusterJobRestServer {
@Inject
private kubernetesTrainingService? : KubernetesTrainingService;
private readonly kubernetesTrainingService? : KubernetesTrainingService;
/**
* constructor to provide NNIRestServer's own rest property, e.g. port
......@@ -41,8 +41,9 @@ export class KubernetesJobRestServer extends ClusterJobRestServer{
this.kubernetesTrainingService = kubernetesTrainingService;
}
// tslint:disable-next-line:no-any
protected handleTrialMetrics(jobId : string, metrics : any[]) : void {
if(!this.kubernetesTrainingService) {
if (this.kubernetesTrainingService === undefined) {
throw Error('kubernetesTrainingService not initialized!');
}
// Split metrics array into single metric, then emit
......
......@@ -17,35 +17,36 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict'
'use strict';
import * as cpp from 'child-process-promise';
import * as path from 'path';
import * as azureStorage from 'azure-storage';
import { EventEmitter } from 'events';
import { Base64 } from 'js-base64';
import { String } from 'typescript-string-operations';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import { getExperimentRootDir, uniqueString, getJobCancelStatus, getIPV4Address, getVersion } from '../../common/utils';
import {
TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
NNIManagerIpConfig, TrialJobDetail, TrialJobMetric
} from '../../common/trainingService';
import { KubernetesTrialJobDetail, KubernetesScriptFormat } from './kubernetesData';
import { KubernetesClusterConfig } from './kubernetesConfig';
import { GeneralK8sClient, KubernetesCRDClient } from './kubernetesApiClient';
import { getExperimentRootDir, getIPV4Address, getJobCancelStatus, getVersion, uniqueString } from '../../common/utils';
import { AzureStorageClientUtility } from './azureStorageClientUtils';
import { GeneralK8sClient, KubernetesCRDClient } from './kubernetesApiClient';
import { KubernetesClusterConfig } from './kubernetesConfig';
import { kubernetesScriptFormat, KubernetesTrialJobDetail } from './kubernetesData';
import { KubernetesJobRestServer } from './kubernetesJobRestServer';
import { String } from 'typescript-string-operations';
import * as azureStorage from 'azure-storage';
var azure = require('azure-storage');
var base64 = require('js-base64').Base64;
/**
* Training Service implementation for Kubernetes
*/
abstract class KubernetesTrainingService {
protected readonly NNI_KUBERNETES_TRIAL_LABEL: string = 'nni-kubernetes-trial';
protected readonly log!: Logger;
protected readonly metricsEmitter: EventEmitter;
protected readonly trialJobsMap: Map<string, KubernetesTrialJobDetail>;
/** experiment root dir in NFS */
// experiment root dir in NFS
protected readonly trialLocalNFSTempFolder: string;
protected stopping: boolean = false;
protected experimentId! : string;
......@@ -76,13 +77,14 @@ abstract class KubernetesTrainingService {
this.logCollection = 'none';
}
/**
 * Builds a resource map with three string-valued entries: `memory` (the number suffixed
 * with "Mi"), `cpu` and `nvidia.com/gpu`.
 * NOTE(review): the duplicated header, keys and closing lines below look like old/new diff
 * lines rendered together — confirm which of each pair is current.
 */
public generatePodResource(memory: number, cpuNum: number, gpuNum: number) {
// tslint:disable:no-any
public generatePodResource(memory: number, cpuNum: number, gpuNum: number): any {
return {
'memory': `${memory}Mi`,
'cpu': `${cpuNum}`,
memory: `${memory}Mi`,
cpu: `${cpuNum}`,
'nvidia.com/gpu': `${gpuNum}`
}
}
};
} // tslint:enable:no-any
public async listTrialJobs(): Promise<TrialJobDetail[]> {
const jobs: TrialJobDetail[] = [];
......@@ -91,7 +93,7 @@ abstract class KubernetesTrainingService {
if (value.form.jobType === 'TRIAL') {
jobs.push(await this.getTrialJob(key));
}
};
}
return Promise.resolve(jobs);
}
......@@ -100,18 +102,18 @@ abstract class KubernetesTrainingService {
const kubernetesTrialJob: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (!kubernetesTrialJob) {
return Promise.reject(`trial job ${trialJobId} not found`)
if (kubernetesTrialJob === undefined) {
return Promise.reject(`trial job ${trialJobId} not found`);
}
return Promise.resolve(kubernetesTrialJob);
}
/**
 * Registers `listener` for 'metric' events on `metricsEmitter`.
 * NOTE(review): the two headers below differ only in the `: void` return annotation and
 * look like the old/new versions of one diff line.
 */
public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) {
public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
this.metricsEmitter.on('metric', listener);
}
/**
 * Unregisters `listener` from 'metric' events on `metricsEmitter`.
 * NOTE(review): the two headers below differ only in the `: void` return annotation and
 * look like the old/new versions of one diff line.
 */
public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) {
public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
this.metricsEmitter.off('metric', listener);
}
......@@ -127,6 +129,96 @@ abstract class KubernetesTrainingService {
return this.metricsEmitter;
}
/**
 * Cancel one trial by deleting the Kubernetes job(s) that carry its labels.
 *
 * Every failure path logs the message through `this.log.error` and then rejects
 * with that same string. On success the trial's `endTime` is stamped and its
 * status is set from `getJobCancelStatus(isEarlyStopped)`.
 *
 * @param trialJobId id of the trial to cancel; must be present in `trialJobsMap`
 * @param isEarlyStopped forwarded to `getJobCancelStatus` to choose the final status
 */
public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
    // Single place that logs a failure reason and turns it into the rejection.
    const fail = (reason: string): Promise<void> => {
        this.log.error(reason);

        return Promise.reject(reason);
    };

    const job: KubernetesTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
    if (job === undefined) {
        return fail(`CancelTrialJob: trial job id ${trialJobId} not found`);
    }
    if (this.kubernetesCRDClient === undefined) {
        return fail(`CancelTrialJob: trial job id ${trialJobId} failed because operatorClient is undefined`);
    }

    try {
        // Label selector identifying exactly this trial of the current experiment.
        const selector: Map<string, string> = new Map<string, string>();
        selector.set('app', this.NNI_KUBERNETES_TRIAL_LABEL);
        selector.set('expId', getExperimentId());
        selector.set('trialId', trialJobId);
        await this.kubernetesCRDClient.deleteKubernetesJob(selector);
    } catch (err) {
        return fail(`Delete trial ${trialJobId} failed: ${err}`);
    }

    job.endTime = Date.now();
    job.status = getJobCancelStatus(isEarlyStopped);
}
/**
 * Shut the training service down in four steps:
 *  1. cancel every trial whose status is RUNNING, WAITING or UNKNOWN and mark it SYS_CANCELED;
 *  2. delete all jobs labelled with the current experiment id (if a CRD client exists);
 *  3. unmount the local NFS temp folder;
 *  4. stop the REST server.
 * Failures in steps 1-3 never abort the cleanup (steps 2 and 3 log them). Step 4 throws when
 * the REST server was never initialized and rejects with the underlying error when stopping fails.
 */
public async cleanUp(): Promise<void> {
    this.stopping = true;

    // Step 1: cancel whatever may still be alive.
    const liveStates: string[] = ['RUNNING', 'WAITING', 'UNKNOWN'];
    for (const [jobId, job] of this.trialJobsMap) {
        if (!liveStates.includes(job.status)) {
            continue;
        }
        try {
            await this.cancelTrialJob(jobId);
        } catch (error) {
            // Intentionally ignored: a failed cancel must not stop the cleanup.
        }
        job.status = 'SYS_CANCELED';
    }

    // Step 2: sweep every job that carries this experiment's labels.
    try {
        if (this.kubernetesCRDClient !== undefined) {
            const experimentSelector: Map<string, string> = new Map<string, string>();
            experimentSelector.set('app', this.NNI_KUBERNETES_TRIAL_LABEL);
            experimentSelector.set('expId', getExperimentId());
            await this.kubernetesCRDClient.deleteKubernetesJob(experimentSelector);
        }
    } catch (error) {
        const labelText: string = `app=${this.NNI_KUBERNETES_TRIAL_LABEL},expId=${getExperimentId()}`;
        this.log.error(`Delete kubernetes job with label: ${labelText} failed, error is ${error}`);
    }

    // Step 3: release the NFS mount.
    try {
        await cpp.exec(`sudo umount ${this.trialLocalNFSTempFolder}`);
    } catch (error) {
        this.log.error(`Unmount ${this.trialLocalNFSTempFolder} failed, error is ${error}`);
    }

    // Step 4: stop the REST server; this is the only step allowed to fail the cleanup.
    if (this.kubernetesJobRestServer === undefined) {
        throw new Error('kubernetesJobRestServer not initialized!');
    }
    try {
        await this.kubernetesJobRestServer.stop();
        this.log.info('Kubernetes Training service rest server stopped successfully.');
    } catch (error) {
        // tslint:disable-next-line: no-unsafe-any
        this.log.error(`Kubernetes Training service rest server stopped failed, error: ${error.message}`);

        return Promise.reject(error);
    }
}
protected generateSequenceId(): number {
if (this.nextTrialSequenceId === -1) {
this.nextTrialSequenceId = getInitTrialSequenceId();
......@@ -135,20 +227,26 @@ abstract class KubernetesTrainingService {
return this.nextTrialSequenceId++;
}
// tslint:disable: no-unsafe-any no-any
protected async createAzureStorage(vaultName: string, valutKeyName: string, accountName: string, azureShare: string): Promise<void> {
try {
const result = await cpp.exec(`az keyvault secret show --name ${valutKeyName} --vault-name ${vaultName}`);
if(result.stderr) {
const result: any = await cpp.exec(`az keyvault secret show --name ${valutKeyName} --vault-name ${vaultName}`);
if (result.stderr) {
const errorMessage: string = result.stderr;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
const storageAccountKey =JSON.parse(result.stdout).value;
const storageAccountKey: any = JSON.parse(result.stdout).value;
if (this.azureStorageAccountName === undefined) {
throw new Error('azureStorageAccountName not initialized!');
}
//create storage client
this.azureStorageClient = azure.createFileService(this.azureStorageAccountName, storageAccountKey);
this.azureStorageClient = azureStorage.createFileService(this.azureStorageAccountName, storageAccountKey);
await AzureStorageClientUtility.createShare(this.azureStorageClient, this.azureStorageShare);
//create storage secret
this.azureStorageSecretName = 'nni-secret-' + uniqueString(8).toLowerCase();
this.azureStorageSecretName = String.Format('nni-secret-{0}', uniqueString(8)
.toLowerCase());
await this.genericK8sClient.createSecret(
{
apiVersion: 'v1',
......@@ -163,38 +261,42 @@ abstract class KubernetesTrainingService {
},
type: 'Opaque',
data: {
azurestorageaccountname: base64.encode(this.azureStorageAccountName),
azurestorageaccountkey: base64.encode(storageAccountKey)
azurestorageaccountname: Base64.encode(this.azureStorageAccountName),
azurestorageaccountkey: Base64.encode(storageAccountKey)
}
}
);
} catch(error) {
} catch (error) {
this.log.error(error);
return Promise.reject(error);
}
return Promise.resolve();
}
// tslint:enable: no-unsafe-any no-any
/**
* Generate run script for different roles (like worker or ps)
* @param trialJobId trial job id
* @param trialWorkingFolder working folder
* @param command
* @param command command
* @param trialSequenceId sequence id
*/
protected async generateRunScript(platform: string, trialJobId: string, trialWorkingFolder: string,
command: string, trialSequenceId: string, roleName: string, gpuNum: number): Promise<string> {
let nvidia_script: string = '';
let nvidiaScript: string = '';
// Nvidia device plugin for K8S has a known issue that requesting zero GPUs allocates all GPUs
// Refer https://github.com/NVIDIA/k8s-device-plugin/issues/61
// So we have to explicitly set CUDA_VISIBLE_DEVICES to empty if user sets gpuNum to 0 in NNI config file
// NOTE(review): the assignment below exports CUDA_VISIBLE_DEVICES='0', not an empty value; '0' selects GPU 0 — confirm intent
if(gpuNum === 0) {
nvidia_script = `export CUDA_VISIBLE_DEVICES='0'`;
if (gpuNum === 0) {
nvidiaScript = `export CUDA_VISIBLE_DEVICES='0'`;
}
const nniManagerIp = this.nniManagerIpConfig?this.nniManagerIpConfig.nniManagerIp:getIPV4Address();
const version = this.versionCheck? await getVersion(): '';
// tslint:disable-next-line: strict-boolean-expressions
const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
const version: string = this.versionCheck ? await getVersion() : '';
const runScript: string = String.Format(
KubernetesScriptFormat,
kubernetesScriptFormat,
platform,
trialJobId,
path.join(trialWorkingFolder, 'output', `${roleName}_output`),
......@@ -202,108 +304,28 @@ abstract class KubernetesTrainingService {
getExperimentId(),
trialWorkingFolder,
trialSequenceId,
nvidia_script,
nvidiaScript,
command,
nniManagerIp,
this.kubernetesRestServerPort,
version,
this.logCollection
);
return Promise.resolve(runScript);
}
/**
 * Creates the local temp folder (`mkdir -p`) and mounts `nfsServer:nfsPath` onto it with `sudo mount`.
 * A failed mount is logged and the promise rejects with the composed message string.
 * A failure of the initial `mkdir` is not caught here and propagates as-is.
 * NOTE(review): nfsServer, nfsPath and the folder path are interpolated unescaped into shell
 * commands — presumably they come from trusted configuration; verify against callers.
 * @param nfsServer host part of the mount source
 * @param nfsPath exported path part of the mount source
 */
protected async createNFSStorage(nfsServer: string, nfsPath: string): Promise<void> {
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
try {
await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalNFSTempFolder}`);
// NOTE(review): the two `catch` lines below look like the old/new versions of one diff line.
} catch(error) {
} catch (error) {
const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalNFSTempFolder} failed, error is ${error}`;
this.log.error(mountError);
return Promise.reject(mountError);
}
return Promise.resolve();
}
public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
const trialJobDetail : KubernetesTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if(!trialJobDetail) {
const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} not found`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
if(!this.kubernetesCRDClient) {
const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} failed because operatorClient is undefined`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
try {
await this.kubernetesCRDClient.deleteKubernetesJob(new Map(
[
['app', this.NNI_KUBERNETES_TRIAL_LABEL],
['expId', getExperimentId()],
['trialId', trialJobId]
]
));
} catch(err) {
const errorMessage: string = `Delete trial ${trialJobId} failed: ${err}`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
trialJobDetail.endTime = Date.now();
trialJobDetail.status = getJobCancelStatus(isEarlyStopped);
return Promise.resolve();
}
public async cleanUp(): Promise<void> {
this.stopping = true;
// First, cancel all running kubernetes jobs
for(let [trialJobId, kubernetesTrialJob] of this.trialJobsMap) {
if(['RUNNING', 'WAITING', 'UNKNOWN'].includes(kubernetesTrialJob.status)) {
try {
await this.cancelTrialJob(trialJobId);
} catch(error) {} // DONT throw error during cleanup
kubernetesTrialJob.status = 'SYS_CANCELED';
}
}
// Delete all kubernetes jobs whose expId label is current experiment id
try {
if(this.kubernetesCRDClient) {
await this.kubernetesCRDClient.deleteKubernetesJob(new Map(
[
['app', this.NNI_KUBERNETES_TRIAL_LABEL],
['expId', getExperimentId()]
]
));
}
} catch(error) {
this.log.error(`Delete kubernetes job with label: app=${this.NNI_KUBERNETES_TRIAL_LABEL},expId=${getExperimentId()} failed, error is ${error}`);
}
// Unmount NFS
try {
await cpp.exec(`sudo umount ${this.trialLocalNFSTempFolder}`);
} catch(error) {
this.log.error(`Unmount ${this.trialLocalNFSTempFolder} failed, error is ${error}`);
}
// Stop kubernetes rest server
if(!this.kubernetesJobRestServer) {
throw new Error('kubernetesJobRestServer not initialized!');
}
try {
await this.kubernetesJobRestServer.stop();
this.log.info('Kubernetes Training service rest server stopped successfully.');
} catch (error) {
this.log.error(`Kubernetes Training service rest server stopped failed, error: ${error.message}`);
Promise.reject(error);
return Promise.reject(mountError);
}
return Promise.resolve();
}
}
export { KubernetesTrainingService }
export { KubernetesTrainingService };
......@@ -25,10 +25,10 @@ import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import { String } from 'typescript-string-operations';
import { execMkdir, getScriptName, getgpuMetricsCollectorScriptContent, execScript, execTail, execRemove, execKill } from '../common/util'
import { getLogger, Logger } from '../../common/log';
import { delay } from '../../common/utils';
import { GPUInfo, GPUSummary } from '../common/gpuData';
import { execKill, execMkdir, execRemove, execTail, getgpuMetricsCollectorScriptContent, getScriptName, runScript } from '../common/util';
/**
* GPUScheduler for local training service
......@@ -37,8 +37,8 @@ class GPUScheduler {
private gpuSummary!: GPUSummary;
private stopping: boolean;
private log: Logger;
private gpuMetricCollectorScriptFolder: string;
private readonly log: Logger;
private readonly gpuMetricCollectorScriptFolder: string;
constructor() {
this.stopping = false;
......@@ -58,28 +58,15 @@ class GPUScheduler {
}
}
/**
* Generate gpu metric collector shell script in local machine,
* used to run in remote machine, and will be deleted after uploaded from local.
*/
private async runGpuMetricsCollectorScript(): Promise<void> {
await execMkdir(this.gpuMetricCollectorScriptFolder);
//generate gpu_metrics_collector script
let gpuMetricsCollectorScriptPath: string = path.join(this.gpuMetricCollectorScriptFolder, getScriptName('gpu_metrics_collector'));
const gpuMetricsCollectorScriptContent: string = getgpuMetricsCollectorScriptContent(this.gpuMetricCollectorScriptFolder);
await fs.promises.writeFile(gpuMetricsCollectorScriptPath, gpuMetricsCollectorScriptContent, { encoding: 'utf8' });
execScript(gpuMetricsCollectorScriptPath)
}
public getAvailableGPUIndices(useActiveGpu: boolean, occupiedGpuIndexNumMap: Map<number, number>): number[] {
if (this.gpuSummary !== undefined) {
if(process.platform === 'win32' || useActiveGpu) {
if (process.platform === 'win32' || useActiveGpu) {
return this.gpuSummary.gpuInfos.map((info: GPUInfo) => info.index);
}
else{
} else {
return this.gpuSummary.gpuInfos.filter((info: GPUInfo) =>
occupiedGpuIndexNumMap.get(info.index) === undefined && info.activeProcessNum === 0 ||
occupiedGpuIndexNumMap.get(info.index) !== undefined).map((info: GPUInfo) => info.index);
occupiedGpuIndexNumMap.get(info.index) !== undefined)
.map((info: GPUInfo) => info.index);
}
}
......@@ -105,17 +92,32 @@ class GPUScheduler {
}
}
/**
 * Write the gpu_metrics_collector script into the collector folder and start it.
 * The folder is created first, the script content is written as utf8, and
 * `runScript` is then invoked on the written file; its result is not awaited or used here.
 */
private async runGpuMetricsCollectorScript(): Promise<void> {
    const collectorFolder: string = this.gpuMetricCollectorScriptFolder;
    await execMkdir(collectorFolder);

    // The file name comes from getScriptName; the content is generated for this folder.
    const scriptFile: string = path.join(collectorFolder, getScriptName('gpu_metrics_collector'));
    const scriptBody: string = getgpuMetricsCollectorScriptContent(collectorFolder);
    await fs.promises.writeFile(scriptFile, scriptBody, { encoding: 'utf8' });

    runScript(scriptFile);
}
// tslint:disable:non-literal-fs-path
/**
 * Refreshes `gpuSummary` from the `gpu_metrics` file inside the collector script folder.
 * When the file exists, its tail is read via `execTail` and the stdout is parsed as JSON;
 * missing output is logged as an error, and a missing file is logged as a warning.
 * NOTE(review): the paired declarations, conditions and `else` branches below look like
 * old/new diff lines rendered together — confirm which of each pair is current.
 */
private async updateGPUSummary(): Promise<void> {
let gpuMetricPath = path.join(this.gpuMetricCollectorScriptFolder, 'gpu_metrics');
const gpuMetricPath: string = path.join(this.gpuMetricCollectorScriptFolder, 'gpu_metrics');
if (fs.existsSync(gpuMetricPath)) {
const cmdresult: cpp.childProcessPromise.Result = await execTail(gpuMetricPath);
// NOTE(review): the `!== undefined` form lets an empty stdout string through to JSON.parse,
// whereas the truthiness form does not — confirm an empty tail cannot occur.
if (cmdresult && cmdresult.stdout) {
if (cmdresult !== undefined && cmdresult.stdout !== undefined) {
this.gpuSummary = <GPUSummary>JSON.parse(cmdresult.stdout);
} else {
this.log.error('Could not get gpu metrics information!');
}
} else{
this.log.warning('gpu_metrics file does not exist!')
} else {
this.log.warning('gpu_metrics file does not exist!');
}
}
}
......
......@@ -24,6 +24,7 @@ import { EventEmitter } from 'events';
import * as fs from 'fs';
import * as path from 'path';
import * as ts from 'tail-stream';
import * as tkill from 'tree-kill';
import { NNIError, NNIErrorNames } from '../../common/errors';
import { getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
......@@ -31,14 +32,14 @@ import {
HostJobApplicationForm, HyperParameters, JobApplicationForm, TrainingService, TrialJobApplicationForm,
TrialJobDetail, TrialJobMetric, TrialJobStatus
} from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, getJobCancelStatus, uniqueString, isAlive, getNewLine } from '../../common/utils';
import { execMkdir, getScriptName, execScript, setEnvironmentVariable, execNewFile } from '../common/util'
import {
delay, generateParamFileName, getExperimentRootDir, getJobCancelStatus, getNewLine, isAlive, uniqueString
} from '../../common/utils';
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { execMkdir, execNewFile, getScriptName, runScript, setEnvironmentVariable } from '../common/util';
import { GPUScheduler } from './gpuScheduler';
const tkill = require('tree-kill');
/**
* Decode a command
* @param Buffer binary incoming data
......@@ -46,7 +47,7 @@ const tkill = require('tree-kill');
* success: true if the buffer contains at least one complete command; otherwise false
* remain: remaining data after the first command
*/
// tslint:disable-next-line:informative-docs
// tslint:disable:newline-per-chained-call informative-docs
function decodeCommand(data: Buffer): [boolean, string, string, Buffer] {
if (data.length < 8) {
return [false, '', '', data];
......@@ -61,6 +62,7 @@ function decodeCommand(data: Buffer): [boolean, string, string, Buffer] {
return [true, commandType, content, remain];
}
// tslint:enable:newline-per-chained-call informative-docs
/**
* LocalTrialJobDetail
......@@ -117,21 +119,21 @@ class LocalConfig {
* Local machine training service
*/
class LocalTrainingService implements TrainingService {
private eventEmitter: EventEmitter;
private jobMap: Map<string, LocalTrialJobDetail>;
private jobQueue: string[];
private readonly eventEmitter: EventEmitter;
private readonly jobMap: Map<string, LocalTrialJobDetail>;
private readonly jobQueue: string[];
private initialized: boolean;
private stopping: boolean;
private rootDir!: string;
private trialSequenceId: number;
private gpuScheduler!: GPUScheduler;
private occupiedGpuIndexNumMap: Map<number, number>;
private readonly occupiedGpuIndexNumMap: Map<number, number>;
private designatedGpuIndices!: Set<number>;
private log: Logger;
private readonly log: Logger;
private localTrailConfig?: TrialConfig;
private localConfig?: LocalConfig;
private isMultiPhase: boolean;
private jobStreamMap: Map<string, ts.Stream>;
private readonly jobStreamMap: Map<string, ts.Stream>;
private maxTrialNumPerGpu: number;
private useActiveGpu: boolean;
......@@ -182,7 +184,7 @@ class LocalTrainingService implements TrainingService {
return this.getHostJob(trialJobId);
}
if (trialJob.status === 'RUNNING') {
let alive: boolean = await isAlive(trialJob.pid);
const alive: boolean = await isAlive(trialJob.pid);
if (!alive) {
trialJob.endTime = Date.now();
this.setTrialJobStatus(trialJob, 'FAILED');
......@@ -276,7 +278,7 @@ class LocalTrainingService implements TrainingService {
return Promise.resolve();
}
if (trialJob.form.jobType === 'TRIAL') {
await tkill(trialJob.pid, 'SIGKILL');
tkill(trialJob.pid, 'SIGKILL');
} else if (trialJob.form.jobType === 'HOST') {
await cpp.exec(`pkill -9 -P ${trialJob.pid}`);
} else {
......@@ -290,7 +292,8 @@ class LocalTrainingService implements TrainingService {
public async setClusterMetadata(key: string, value: string): Promise<void> {
if (!this.initialized) {
this.rootDir = getExperimentRootDir();
if(!fs.existsSync(this.rootDir)){
// tslint:disable-next-line:non-literal-fs-path
if (!fs.existsSync(this.rootDir)) {
await cpp.exec(`powershell.exe mkdir ${this.rootDir}`);
}
this.initialized = true;
......@@ -299,7 +302,7 @@ class LocalTrainingService implements TrainingService {
case TrialConfigMetadataKey.TRIAL_CONFIG:
this.localTrailConfig = <TrialConfig>JSON.parse(value);
// Parse trial config failed, throw Error
if (!this.localTrailConfig) {
if (this.localTrailConfig === undefined) {
throw new Error('trial config parsed failed');
}
this.log.info(`required GPU number is ${this.localTrailConfig.gpuNum}`);
......@@ -336,10 +339,10 @@ class LocalTrainingService implements TrainingService {
switch (key) {
case TrialConfigMetadataKey.TRIAL_CONFIG:
let getResult: Promise<string>;
if (!this.localTrailConfig) {
if (this.localTrailConfig === undefined) {
getResult = Promise.reject(new NNIError(NNIErrorNames.NOT_FOUND, `${key} is never set yet`));
} else {
getResult = Promise.resolve(!this.localTrailConfig ? '' : JSON.stringify(this.localTrailConfig));
getResult = Promise.resolve(JSON.stringify(this.localTrailConfig));
}
return getResult;
......@@ -366,7 +369,7 @@ class LocalTrainingService implements TrainingService {
if (['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED', 'EARLY_STOPPED'].includes(trialJob.status)) {
if (this.jobStreamMap.has(trialJob.id)) {
const stream: ts.Stream | undefined = this.jobStreamMap.get(trialJob.id);
if (!stream) {
if (stream === undefined) {
throw new Error(`Could not find stream in trial ${trialJob.id}`);
}
stream.destroy();
......@@ -376,13 +379,13 @@ class LocalTrainingService implements TrainingService {
if (trialJob.gpuIndices !== undefined && trialJob.gpuIndices.length > 0 && this.gpuScheduler !== undefined) {
if (oldStatus === 'RUNNING' && trialJob.status !== 'RUNNING') {
for (const index of trialJob.gpuIndices) {
let num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if(num === undefined) {
const num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if (num === undefined) {
throw new Error(`gpu resource schedule error`);
} else if(num === 1) {
} else if (num === 1) {
this.occupiedGpuIndexNumMap.delete(index);
} else {
this.occupiedGpuIndexNumMap.set(index, num - 1)
this.occupiedGpuIndexNumMap.set(index, num - 1);
}
}
}
......@@ -424,10 +427,10 @@ class LocalTrainingService implements TrainingService {
}
let selectedGPUIndices: number[] = [];
let availableGpuIndices: number[] = this.gpuScheduler.getAvailableGPUIndices(this.useActiveGpu, this.occupiedGpuIndexNumMap);
for(let index of availableGpuIndices) {
let num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if(num === undefined || num < this.maxTrialNumPerGpu) {
const availableGpuIndices: number[] = this.gpuScheduler.getAvailableGPUIndices(this.useActiveGpu, this.occupiedGpuIndexNumMap);
for (const index of availableGpuIndices) {
const num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if (num === undefined || num < this.maxTrialNumPerGpu) {
selectedGPUIndices.push(index);
}
}
......@@ -461,11 +464,11 @@ class LocalTrainingService implements TrainingService {
private occupyResource(resource: {gpuIndices: number[]}): void {
if (this.gpuScheduler !== undefined) {
for (const index of resource.gpuIndices) {
let num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if(num === undefined) {
this.occupiedGpuIndexNumMap.set(index, 1)
const num: number | undefined = this.occupiedGpuIndexNumMap.get(index);
if (num === undefined) {
this.occupiedGpuIndexNumMap.set(index, 1);
} else {
this.occupiedGpuIndexNumMap.set(index, num + 1)
this.occupiedGpuIndexNumMap.set(index, num + 1);
}
}
}
......@@ -498,20 +501,20 @@ class LocalTrainingService implements TrainingService {
}
}
/**
 * Build the platform-specific script lines that run the trial command.
 *
 * In both variants the command's stderr is redirected to
 * `<workingDirectory>/stderr`, and a line made of the exit code followed by a
 * millisecond-style timestamp is written to `<workingDirectory>/.nni/state`.
 *
 * @param localTrailConfig trial configuration providing the command to run
 * @param workingDirectory working directory of the trial job
 * @returns script lines, PowerShell flavoured on win32 and shell flavoured otherwise
 */
private getScript(localTrailConfig: TrialConfig, workingDirectory: string): string[] {
    const stderrPath: string = path.join(workingDirectory, 'stderr');
    const statePath: string = path.join(workingDirectory, '.nni', 'state');

    if (process.platform === 'win32') {
        return [
            `cmd /c ${localTrailConfig.command} 2>${stderrPath}`,
            `$NOW_DATE = [int64](([datetime]::UtcNow)-(get-date "1/1/1970")).TotalSeconds`,
            `$NOW_DATE = "$NOW_DATE" + "000"`,
            `Write $LASTEXITCODE " " $NOW_DATE | Out-File ${statePath} -NoNewline -encoding utf8`
        ];
    }

    return [
        `eval ${localTrailConfig.command} 2>${stderrPath}`,
        `echo $? \`date +%s000\` >${statePath}`
    ];
}
......@@ -519,28 +522,29 @@ class LocalTrainingService implements TrainingService {
const trialJobDetail: LocalTrialJobDetail = <LocalTrialJobDetail>this.jobMap.get(trialJobId);
const variables: { key: string; value: string }[] = this.getEnvironmentVariables(trialJobDetail, resource);
if (!this.localTrailConfig) {
if (this.localTrailConfig === undefined) {
throw new Error('trial config is not initialized');
}
const runScriptLines: string[] = [];
if (process.platform !== "win32"){
runScriptLines.push('#!/bin/bash');
const runScriptContent: string[] = [];
if (process.platform !== 'win32') {
runScriptContent.push('#!/bin/bash');
}
runScriptLines.push(`cd ${this.localTrailConfig.codeDir}`);
runScriptContent.push(`cd ${this.localTrailConfig.codeDir}`);
for (const variable of variables) {
runScriptLines.push(setEnvironmentVariable(variable));
runScriptContent.push(setEnvironmentVariable(variable));
}
const scripts: string[] = this.getScript(this.localTrailConfig, trialJobDetail.workingDirectory);
scripts.forEach(script => {
runScriptLines.push(script);
scripts.forEach((script: string) => {
runScriptContent.push(script);
});
await execMkdir(trialJobDetail.workingDirectory);
await execMkdir(path.join(trialJobDetail.workingDirectory, '.nni'));
await execNewFile(path.join(trialJobDetail.workingDirectory, '.nni', 'metrics'));
const scriptName: string = getScriptName('run');
await fs.promises.writeFile(path.join(trialJobDetail.workingDirectory, scriptName), runScriptLines.join(getNewLine()), { encoding: 'utf8', mode: 0o777 });
await fs.promises.writeFile(path.join(trialJobDetail.workingDirectory, scriptName),
runScriptContent.join(getNewLine()), { encoding: 'utf8', mode: 0o777 });
await this.writeParameterFile(trialJobDetail.workingDirectory, (<TrialJobApplicationForm>trialJobDetail.form).hyperParameters);
const trialJobProcess: cp.ChildProcess = execScript(path.join(trialJobDetail.workingDirectory, scriptName));
const trialJobProcess: cp.ChildProcess = runScript(path.join(trialJobDetail.workingDirectory, scriptName));
this.setTrialJobStatus(trialJobDetail, 'RUNNING');
trialJobDetail.startTime = Date.now();
trialJobDetail.pid = trialJobProcess.pid;
......
......@@ -17,12 +17,12 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
import * as path from 'path';
import * as fs from 'fs';
import * as path from 'path';
import { Deferred } from 'ts-deferred';
import { getExperimentId } from '../../common/experimentStartupInfo';
import { getLogger } from '../../common/log';
import { unixPathJoin } from '../../common/utils'
import { unixPathJoin } from '../../common/utils';
/**
* HDFS client utility, including copy file/directory
......@@ -33,6 +33,7 @@ export namespace HDFSClientUtility {
* @param hdfsUserName HDFS user name
*/
function hdfsExpRootDir(hdfsUserName: string): string {
    // <user>/nni/experiments/<experiment id>, made absolute with a leading slash
    const relativeDir: string = unixPathJoin(hdfsUserName, 'nni', 'experiments', getExperimentId());

    return `/${relativeDir}`;
}
......@@ -50,8 +51,8 @@ export namespace HDFSClientUtility {
* @param trialId NNI trial ID
*/
export function getHdfsTrialWorkDir(hdfsUserName: string, trialId: string): string {
    // Trial directories live under <experiment root>/trials/<trialId>.
    const root: string = hdfsExpRootDir(hdfsUserName);

    return unixPathJoin(root, 'trials', trialId);
}
......@@ -62,26 +63,31 @@ export namespace HDFSClientUtility {
* @param hdfsFilePath hdfs file path(target)
* @param hdfsClient hdfs client
*/
// tslint:disable: no-unsafe-any non-literal-fs-path no-any
export async function copyFileToHdfs(localFilePath : string, hdfsFilePath : string, hdfsClient : any) : Promise<void> {
    const deferred: Deferred<void> = new Deferred<void>();
    fs.exists(localFilePath, (exists : boolean) => {
        // Nothing to copy when the local file is missing
        if (!exists) {
            getLogger()
              .error(`HDFSCientUtility:copyFileToHdfs, ${localFilePath} doesn't exist locally`);
            deferred.reject('file not exist!');

            return;
        }

        const localFileStream: fs.ReadStream = fs.createReadStream(localFilePath);
        const hdfsFileStream: any = hdfsClient.createWriteStream(hdfsFilePath);

        // pipe() does not forward source errors to the destination, so a failing
        // local read must be handled here; otherwise the promise never settles and
        // the 'error' event is left without a listener.
        localFileStream.on('error', (err : Error) => {
            getLogger()
              .error(`HDFSClientUtility:copyFileToHdfs, read local file failed, err is ${err.message}`);
            deferred.reject(err);
        });
        hdfsFileStream.on('finish', () => {
            deferred.resolve();
        });
        hdfsFileStream.on('error', (err : any) => {
            getLogger()
              .error(`HDFSCientUtility:copyFileToHdfs, copy file failed, err is ${err.message}`);
            deferred.reject(err);
        });
        localFileStream.pipe(hdfsFileStream);
    });

    return deferred.promise;
}
......@@ -92,21 +98,23 @@ export namespace HDFSClientUtility {
* @param hdfsDirectory HDFS directory
* @param hdfsClient HDFS client
*/
export async function copyDirectoryToHdfs(localDirectory : string, hdfsDirectory : string, hdfsClient : any) : Promise<void>{
export async function copyDirectoryToHdfs(localDirectory : string, hdfsDirectory : string, hdfsClient : any) : Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
// TODO: fs.readdirSync doesn't support ~($HOME)
const fileNameArray: string[] = fs.readdirSync(localDirectory);
for(var fileName of fileNameArray){
for (const fileName of fileNameArray) {
const fullFilePath: string = path.join(localDirectory, fileName);
try {
if (fs.lstatSync(fullFilePath).isFile()) {
// tslint:disable-next-line:non-literal-fs-path
if (fs.lstatSync(fullFilePath)
.isFile()) {
await copyFileToHdfs(fullFilePath, path.join(hdfsDirectory, fileName), hdfsClient);
} else {
// If filePath is a directory, recursively copy it to the remote directory
await copyDirectoryToHdfs(fullFilePath, path.join(hdfsDirectory, fileName), hdfsClient);
}
} catch(error) {
} catch (error) {
deferred.reject(error);
}
}
......@@ -122,16 +130,16 @@ export namespace HDFSClientUtility {
* @param hdfsPath HDFS file path
* @param hdfsClient HDFS client
*/
export async function readFileFromHDFS(hdfsPath : string, hdfsClient :any) : Promise<Buffer> {
export async function readFileFromHDFS(hdfsPath : string, hdfsClient : any) : Promise<Buffer> {
const deferred: Deferred<Buffer> = new Deferred<Buffer>();
let buffer : Buffer = Buffer.alloc(0);
const exist : boolean = await pathExists(hdfsPath, hdfsClient);
if(!exist) {
if (!exist) {
deferred.reject(`${hdfsPath} doesn't exists`);
}
const remoteFileStream = hdfsClient.createReadStream(hdfsPath);
const remoteFileStream: any = hdfsClient.createReadStream(hdfsPath);
remoteFileStream.on('error', (err : any) => {
// Reject with the error
deferred.reject(err);
......@@ -142,7 +150,7 @@ export namespace HDFSClientUtility {
buffer = Buffer.concat([buffer, chunk]);
});
remoteFileStream.on('finish', function onFinish () {
remoteFileStream.on('finish', () => {
// Upload is done, resolve
deferred.resolve(buffer);
});
......@@ -158,30 +166,32 @@ export namespace HDFSClientUtility {
*/
export async function pathExists(hdfsPath : string, hdfsClient : any) : Promise<boolean> {
    let timeoutId : ReturnType<typeof setTimeout> | undefined;

    // Answer reported by the HDFS client callback
    const existsResult : Promise<boolean> = new Promise<boolean>((resolve : (exist : boolean) => void) : void => {
        hdfsClient.exists(hdfsPath, (exist : boolean) => {
            resolve(exist);
        });
    });

    // Rejects once the 5 second timeout is reached without an answer
    const delayTimeout : Promise<boolean> = new Promise<boolean>(
        (_resolve : (exist : boolean) => void, reject : (reason : string) => void) : void => {
            timeoutId = setTimeout(() => { reject(`Check HDFS path ${hdfsPath} exists timeout`); }, 5000);
        });

    // Whichever settles first wins; the timer is always cleared afterwards so it
    // cannot keep the event loop alive.
    return Promise.race([existsResult, delayTimeout])
        .finally(() => {
            if (timeoutId !== undefined) {
                clearTimeout(timeoutId);
            }
        });
}
/**
* Mkdir in HDFS, use default permission 755
*
* @param hdfsPath the path in HDFS. It could be either file or directory
* @param hdfsClient
* @param hdfsClient HDFS client
*/
export function mkdir(hdfsPath : string, hdfsClient : any) : Promise<boolean> {
const deferred : Deferred<boolean> = new Deferred<boolean>();
hdfsClient.mkdir(hdfsPath, (err : any)=> {
if(!err) {
hdfsClient.mkdir(hdfsPath, (err : any) => {
if (!err) {
deferred.resolve(true);
} else {
deferred.reject(err.message);
......@@ -195,17 +205,17 @@ export namespace HDFSClientUtility {
* Read directory contents
*
* @param hdfsPath the path in HDFS. It could be either file or directory
* @param hdfsClient
* @param hdfsClient HDFS client
*/
export async function readdir(hdfsPath : string, hdfsClient : any) : Promise<string[]> {
const deferred : Deferred<string[]> = new Deferred<string[]>();
const exist : boolean = await pathExists(hdfsPath, hdfsClient);
if(!exist) {
if (!exist) {
deferred.reject(`${hdfsPath} doesn't exists`);
}
hdfsClient.readdir(hdfsPath, (err : any, files : any[] ) => {
if(err) {
hdfsClient.readdir(hdfsPath, (err : any, files : any[]) => {
if (err) {
deferred.reject(err);
}
......@@ -218,18 +228,20 @@ export namespace HDFSClientUtility {
/**
* Delete HDFS path
* @param hdfsPath the path in HDFS. It could be either file or directory
* @param hdfsClient
* @param hdfsClient HDFS client
* @param recursive Mark if need to delete recursively
*/
export function deletePath(hdfsPath : string, hdfsClient : any, recursive : boolean = true) : Promise<boolean> {
    // Wrap the callback-style unlink in a promise: resolves with true on success,
    // rejects with the error message otherwise. A synchronous throw from the
    // client is surfaced as a rejection as well.
    return new Promise<boolean>((resolve : (done : boolean) => void, reject : (reason : string) => void) : void => {
        hdfsClient.unlink(hdfsPath, recursive, (err : any) => {
            if (!err) {
                resolve(true);
            } else {
                reject(err.message);
            }
        });
    });
}
// tslint:enable: no-unsafe-any non-literal-fs-path no-any
}
......@@ -19,8 +19,11 @@
'use strict';
import {TrialConfig} from '../common/trialConfig'
import {TrialConfig} from '../common/trialConfig';
/**
* Task role for PAI
*/
export class PAITaskRole {
// Name for the task role
public readonly name: string;
......@@ -46,7 +49,8 @@ export class PAITaskRole {
* @param gpuNumber GPU number for one task in the task role, no less than 0
* @param command Executable command for tasks in the task role, can not be empty
*/
constructor(name : string, taskNumber : number, cpuNumber : number, memoryMB : number, gpuNumber : number, command : string, shmMB?: number) {
constructor(name : string, taskNumber : number, cpuNumber : number, memoryMB : number, gpuNumber : number,
command : string, shmMB?: number) {
this.name = name;
this.taskNumber = taskNumber;
this.cpuNumber = cpuNumber;
......@@ -57,7 +61,10 @@ export class PAITaskRole {
}
}
export class PAIJobConfig{
/**
* Trial job configuration submitted to PAI
*/
export class PAIJobConfig {
// Name for the job, need to be unique
public readonly jobName: string;
// URL pointing to the Docker image for all tasks in the job
......@@ -95,6 +102,9 @@ export class PAIJobConfig{
}
}
/**
* PAI cluster configuration
*/
export class PAIClusterConfig {
public readonly userName: string;
public readonly passWord: string;
......@@ -106,14 +116,17 @@ export class PAIClusterConfig {
* @param passWord password of PAI Cluster
* @param host Host IP of PAI Cluster
*/
constructor(userName: string, passWord : string, host : string) {
    // Plain value holder: each argument is stored on the matching readonly field
    this.userName = userName;
    this.passWord = passWord;
    this.host = host;
}
}
export class NNIPAITrialConfig extends TrialConfig{
/**
* PAI trial configuration
*/
export class NNIPAITrialConfig extends TrialConfig {
public readonly cpuNum: number;
public readonly memoryMB: number;
public readonly image: string;
......@@ -137,4 +150,3 @@ export class NNIPAITrialConfig extends TrialConfig{
this.shmMB = shmMB;
}
}
......@@ -19,8 +19,11 @@
'use strict';
import { JobApplicationForm, TrialJobDetail, TrialJobStatus } from 'common/trainingService';
import { JobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService';
/**
* PAI trial job detail
*/
export class PAITrialJobDetail implements TrialJobDetail {
public id: string;
public status: TrialJobStatus;
......@@ -61,13 +64,15 @@ else
fi`;
/**
 * Single-line shell command template for a PAI trial.
 * `{0}`..`{13}` are positional placeholders to be substituted by the caller.
 * Each trailing backslash is a template-literal line continuation, so the
 * resulting string contains no newline characters.
 */
export const PAI_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} \
&& cd $NNI_SYS_DIR && sh install_nni.sh \
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}' \
--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1' \
--nni_manager_version '{12}' --log_collection '{13}'`;
// HDFS output directory template; {0} is a positional placeholder for the host
export const PAI_OUTPUT_DIR_FORMAT: string = 'hdfs://{0}:9000/';
// tslint:disable:no-http-string
// WebHDFS explorer URL template; {0} and {1} are positional placeholders.
// Must be a single literal: two adjacent template literals would parse as a
// tagged-template call and fail at runtime.
export const PAI_LOG_PATH_FORMAT: string =
    `http://{0}/webhdfs/explorer.html#{1}`;
......@@ -19,13 +19,14 @@
'use strict';
// tslint:disable-next-line:no-implicit-dependencies
import * as request from 'request';
import { Deferred } from 'ts-deferred';
import { getLogger, Logger } from '../../common/log';
import { NNIError, NNIErrorNames } from '../../common/errors';
import { PAITrialJobDetail } from './paiData';
import { PAIClusterConfig } from './paiConfig';
import { getLogger, Logger } from '../../common/log';
import { TrialJobStatus } from '../../common/trainingService';
import { PAIClusterConfig } from './paiConfig';
import { PAITrialJobDetail } from './paiData';
/**
* Collects PAI job info from the PAI cluster and updates the PAI job status locally
......@@ -43,59 +44,64 @@ export class PAIJobInfoCollector {
}
/**
 * Query the status of every trial job in `this.trialJobsMap`, waiting for all
 * queries to complete.
 *
 * Does nothing when either the token or the cluster config is undefined.
 *
 * @param paiToken PAI access token
 * @param paiClusterConfig PAI cluster configuration
 * @throws NNIError (NOT_FOUND) when the map holds an undefined entry
 */
public async retrieveTrialStatus(paiToken? : string, paiClusterConfig?: PAIClusterConfig) : Promise<void> {
    if (paiClusterConfig === undefined || paiToken === undefined) {
        return;
    }

    const updatePaiTrialJobs : Promise<void>[] = [];
    for (const [trialJobId, paiTrialJob] of this.trialJobsMap) {
        if (paiTrialJob === undefined) {
            throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
        }
        updatePaiTrialJobs.push(this.getSinglePAITrialJobInfo(paiTrialJob, paiToken, paiClusterConfig));
    }

    // The per-job queries were all started above; wait for them together
    await Promise.all(updatePaiTrialJobs);
}
private getSinglePAITrialJobInfo(paiTrialJob : PAITrialJobDetail, paiToken : string, paiClusterConfig: PAIClusterConfig) : Promise<void> {
private getSinglePAITrialJobInfo(paiTrialJob : PAITrialJobDetail, paiToken : string, paiClusterConfig: PAIClusterConfig)
: Promise<void> {
const deferred : Deferred<void> = new Deferred<void>();
if (!this.statusesNeedToCheck.includes(paiTrialJob.status)) {
deferred.resolve();
return deferred.promise;
}
// Rest call to get PAI job info and update status
// Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
const getJobInfoRequest: request.Options = {
// tslint:disable-next-line:no-http-string
uri: `http://${paiClusterConfig.host}/rest-server/api/v1/user/${paiClusterConfig.userName}/jobs/${paiTrialJob.paiJobName}`,
method: 'GET',
json: true,
headers: {
"Content-Type": "application/json",
"Authorization": 'Bearer ' + paiToken
'Content-Type': 'application/json',
Authorization: `Bearer ${paiToken}`
}
};
// tslint:disable: no-unsafe-any no-any cyclomatic-complexity
//TODO : pass in request timeout param?
request(getJobInfoRequest, (error: Error, response: request.Response, body: any) => {
if (error || response.statusCode >= 500) {
if ((error !== undefined && error !== null) || response.statusCode >= 500) {
this.log.error(`PAI Training service: get job info for trial ${paiTrialJob.id} from PAI Cluster failed!`);
// Queried PAI job info failed, set job status to UNKNOWN
if(paiTrialJob.status === 'WAITING' || paiTrialJob.status === 'RUNNING') {
if (paiTrialJob.status === 'WAITING' || paiTrialJob.status === 'RUNNING') {
paiTrialJob.status = 'UNKNOWN';
}
} else {
if(response.body.jobStatus && response.body.jobStatus.state) {
switch(response.body.jobStatus.state) {
if (response.body.jobStatus && response.body.jobStatus.state) {
switch (response.body.jobStatus.state) {
case 'WAITING':
paiTrialJob.status = 'WAITING';
break;
case 'RUNNING':
paiTrialJob.status = 'RUNNING';
if(!paiTrialJob.startTime) {
if (paiTrialJob.startTime === undefined) {
paiTrialJob.startTime = response.body.jobStatus.appLaunchedTime;
}
if(!paiTrialJob.url) {
if (paiTrialJob.url === undefined) {
paiTrialJob.url = response.body.jobStatus.appTrackingUrl;
}
break;
......@@ -107,7 +113,9 @@ export class PAIJobInfoCollector {
paiTrialJob.status = paiTrialJob.isEarlyStopped === true ?
'EARLY_STOPPED' : 'USER_CANCELED';
} else {
// if paiTrialJob's isEarlyStopped is undefined, that mean we didn't stop it via cancellation, mark it as SYS_CANCELLED by PAI
/* if paiTrialJob's isEarlyStopped is undefined, that mean we didn't stop it via cancellation,
* mark it as SYS_CANCELLED by PAI
*/
paiTrialJob.status = 'SYS_CANCELED';
}
break;
......@@ -116,18 +124,17 @@ export class PAIJobInfoCollector {
break;
default:
paiTrialJob.status = 'UNKNOWN';
break;
}
// For final job statuses, update startTime, endTime and url
if(this.finalStatuses.includes(paiTrialJob.status)) {
if(!paiTrialJob.startTime) {
if (this.finalStatuses.includes(paiTrialJob.status)) {
if (paiTrialJob.startTime === undefined) {
paiTrialJob.startTime = response.body.jobStatus.appLaunchedTime;
}
if(!paiTrialJob.endTime) {
if (paiTrialJob.endTime === undefined) {
paiTrialJob.endTime = response.body.jobStatus.completedTime;
}
// Set pai trial job's url to WebHDFS output path
if(paiTrialJob.hdfsLogPath) {
if (paiTrialJob.hdfsLogPath !== undefined) {
paiTrialJob.url += `,${paiTrialJob.hdfsLogPath}`;
}
}
......@@ -138,4 +145,5 @@ export class PAIJobInfoCollector {
return deferred.promise;
}
// tslint:enable: no-unsafe-any no-any
}
......@@ -19,17 +19,17 @@
'use strict';
import * as component from '../../common/component';
import { Inject } from 'typescript-ioc';
import * as component from '../../common/component';
import { ClusterJobRestServer } from '../common/clusterJobRestServer';
import { PAITrainingService } from './paiTrainingService';
import { ClusterJobRestServer } from '../common/clusterJobRestServer'
/**
* PAI Training service Rest server, provides rest API to support pai job metrics update
*
*/
@component.Singleton
export class PAIJobRestServer extends ClusterJobRestServer{
export class PAIJobRestServer extends ClusterJobRestServer {
@Inject
private readonly paiTrainingService : PAITrainingService;
......@@ -41,6 +41,7 @@ export class PAIJobRestServer extends ClusterJobRestServer{
this.paiTrainingService = component.get(PAITrainingService);
}
// tslint:disable-next-line:no-any
protected handleTrialMetrics(jobId : string, metrics : any[]) : void {
// Split metrics array into single metric, then emit
// Warning: If not split metrics into single ones, the behavior will be UNKNOWN
......
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
......@@ -23,6 +22,7 @@
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
// tslint:disable-next-line:no-implicit-dependencies
import * as request from 'request';
import * as component from '../../common/component';
......@@ -37,18 +37,17 @@ import {
TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
} from '../../common/trainingService';
import { delay, generateParamFileName,
getExperimentRootDir, getIPV4Address, getVersion, uniqueString } from '../../common/utils';
getExperimentRootDir, getIPV4Address, getVersion, uniqueString, unixPathJoin } from '../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { validateCodeDir, execMkdir } from '../common/util';
import { unixPathJoin } from '../../common/utils'
import { execMkdir, validateCodeDir } from '../common/util';
import { HDFSClientUtility } from './hdfsClientUtility';
import { NNIPAITrialConfig, PAIClusterConfig, PAIJobConfig, PAITaskRole } from './paiConfig';
import { PAI_LOG_PATH_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_TRIAL_COMMAND_FORMAT, PAITrialJobDetail } from './paiData';
import { PAIJobInfoCollector } from './paiJobInfoCollector';
import { PAIJobRestServer } from './paiJobRestServer';
const WebHDFS = require('webhdfs');
import * as WebHDFS from 'webhdfs';
/**
* Training Service implementation for OpenPAI (Open Platform for AI)
......@@ -62,13 +61,14 @@ class PAITrainingService implements TrainingService {
private readonly expRootDir: string;
private paiTrialConfig: NNIPAITrialConfig | undefined;
private paiClusterConfig?: PAIClusterConfig;
private jobQueue: string[];
private readonly jobQueue: string[];
private stopping: boolean = false;
// tslint:disable-next-line:no-any
private hdfsClient: any;
private paiToken? : string;
private paiTokenUpdateTime?: number;
private paiTokenUpdateInterval: number;
private experimentId! : string;
private readonly paiTokenUpdateInterval: number;
private readonly experimentId! : string;
private readonly paiJobCollector : PAIJobInfoCollector;
private readonly hdfsDirPattern: string;
private hdfsBaseDir: string | undefined;
......@@ -121,13 +121,13 @@ class PAITrainingService implements TrainingService {
}
public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
if (!this.paiClusterConfig) {
if (this.paiClusterConfig === undefined) {
throw new Error('PAI Cluster config is not initialized');
}
const paiTrialJob: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (!paiTrialJob) {
if (paiTrialJob === undefined) {
return Promise.reject(`trial job ${trialJobId} not found`);
}
......@@ -144,7 +144,7 @@ class PAITrainingService implements TrainingService {
public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
const deferred : Deferred<PAITrialJobDetail> = new Deferred<PAITrialJobDetail>();
if (!this.hdfsBaseDir) {
if (this.hdfsBaseDir === undefined) {
throw new Error('hdfsBaseDir is not initialized');
}
......@@ -187,24 +187,26 @@ class PAITrainingService implements TrainingService {
return false;
}
// tslint:disable:no-http-string
public cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
const trialJobDetail : PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
const deferred : Deferred<void> = new Deferred<void>();
if (!trialJobDetail) {
if (trialJobDetail === undefined) {
this.log.error(`cancelTrialJob: trial job id ${trialJobId} not found`);
return Promise.reject();
}
if (!this.paiClusterConfig) {
if (this.paiClusterConfig === undefined) {
throw new Error('PAI Cluster config is not initialized');
}
if (!this.paiToken) {
if (this.paiToken === undefined) {
throw new Error('PAI token is not initialized');
}
const stopJobRequest: request.Options = {
uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}/jobs/${trialJobDetail.paiJobName}/executionType`,
uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/user/${this.paiClusterConfig.userName}\
/jobs/${trialJobDetail.paiJobName}/executionType`,
method: 'PUT',
json: true,
body: {value: 'STOP'},
......@@ -217,10 +219,12 @@ class PAITrainingService implements TrainingService {
// Set trialjobDetail's early stopped field, to mark the job's cancellation source
trialJobDetail.isEarlyStopped = isEarlyStopped;
// tslint:disable-next-line:no-any
request(stopJobRequest, (error: Error, response: request.Response, body: any) => {
if (error || response.statusCode >= 400) {
if ((error !== undefined && error !== null) || response.statusCode >= 400) {
this.log.error(`PAI Training service: stop trial ${trialJobId} to PAI Cluster failed!`);
deferred.reject(error ? error.message : `Stop trial failed, http code: ${response.statusCode}`);
deferred.reject((error !== undefined && error !== null) ? error.message :
`Stop trial failed, http code: ${response.statusCode}`);
} else {
deferred.resolve();
}
......@@ -229,6 +233,7 @@ class PAITrainingService implements TrainingService {
return deferred.promise;
}
// tslint:disable: no-unsafe-any no-any
// tslint:disable-next-line:max-func-body-length
public async setClusterMetadata(key: string, value: string): Promise<void> {
const deferred : Deferred<void> = new Deferred<void>();
......@@ -256,47 +261,47 @@ class PAITrainingService implements TrainingService {
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
if (!this.paiClusterConfig) {
if (this.paiClusterConfig === undefined) {
this.log.error('pai cluster config is not initialized');
deferred.reject(new Error('pai cluster config is not initialized'));
break;
}
this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);
//paiTrialConfig.outputDir could be null if it is not set in nnictl
if (this.paiTrialConfig.outputDir === undefined || this.paiTrialConfig.outputDir === null){
if (this.paiTrialConfig.outputDir === undefined || this.paiTrialConfig.outputDir === null) {
this.paiTrialConfig.outputDir = String.Format(
PAI_OUTPUT_DIR_FORMAT,
this.paiClusterConfig.host
).replace(/\r\n|\n|\r/gm, '');
)
.replace(/\r\n|\n|\r/gm, '');
}
// Validate to make sure codeDir doesn't have too many files
try {
await validateCodeDir(this.paiTrialConfig.codeDir);
} catch(error) {
} catch (error) {
this.log.error(error);
deferred.reject(new Error(error));
break;
}
const hdfsDirContent = this.paiTrialConfig.outputDir.match(this.hdfsDirPattern);
const hdfsDirContent: any = this.paiTrialConfig.outputDir.match(this.hdfsDirPattern);
if (hdfsDirContent === null) {
throw new Error('Trial outputDir format Error');
}
const groups = hdfsDirContent.groups;
const groups: any = hdfsDirContent.groups;
if (groups === undefined) {
throw new Error('Trial outputDir format Error');
}
this.hdfsOutputHost = groups['host'];
this.hdfsOutputHost = groups.host;
//TODO: choose to use /${username} as baseDir
this.hdfsBaseDir = groups['baseDir'];
if(this.hdfsBaseDir === undefined) {
this.hdfsBaseDir = groups.baseDir;
if (this.hdfsBaseDir === undefined) {
this.hdfsBaseDir = '/';
}
let dataOutputHdfsClient;
let dataOutputHdfsClient: any;
if (this.paiClusterConfig.host === this.hdfsOutputHost && this.hdfsClient) {
dataOutputHdfsClient = this.hdfsClient;
} else {
......@@ -338,6 +343,7 @@ class PAITrainingService implements TrainingService {
return deferred.promise;
}
// tslint:enable: no-unsafe-any
public getClusterMetadata(key: string): Promise<string> {
const deferred : Deferred<string> = new Deferred<string>();
......@@ -358,6 +364,7 @@ class PAITrainingService implements TrainingService {
deferred.resolve();
this.log.info('PAI Training service rest server stopped successfully.');
} catch (error) {
// tslint:disable-next-line: no-unsafe-any
this.log.error(`PAI Training service rest server stopped failed, error: ${error.message}`);
deferred.reject(error);
}
......@@ -374,35 +381,35 @@ class PAITrainingService implements TrainingService {
const deferred : Deferred<boolean> = new Deferred<boolean>();
const trialJobDetail: PAITrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (!trialJobDetail) {
if (trialJobDetail === undefined) {
throw new Error(`Failed to find PAITrialJobDetail for job ${trialJobId}`);
}
if (!this.paiClusterConfig) {
if (this.paiClusterConfig === undefined) {
throw new Error('PAI Cluster config is not initialized');
}
if (!this.paiTrialConfig) {
if (this.paiTrialConfig === undefined) {
throw new Error('trial config is not initialized');
}
if (!this.paiToken) {
if (this.paiToken === undefined) {
throw new Error('PAI token is not initialized');
}
if (!this.hdfsBaseDir) {
if (this.hdfsBaseDir === undefined) {
throw new Error('hdfsBaseDir is not initialized');
}
if (!this.hdfsOutputHost) {
if (this.hdfsOutputHost === undefined) {
throw new Error('hdfsOutputHost is not initialized');
}
if (!this.paiRestServerPort) {
if (this.paiRestServerPort === undefined) {
const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
this.paiRestServerPort = restServer.clusterRestServerPort;
}
// Make sure experiment code files is copied from local to HDFS
if (this.copyExpCodeDirPromise) {
if (this.copyExpCodeDirPromise !== undefined) {
await this.copyExpCodeDirPromise;
}
......@@ -420,13 +427,14 @@ class PAITrainingService implements TrainingService {
// Write file content ( parameter.cfg ) to local tmp folders
const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>trialJobDetail.form);
if (trialForm) {
if (trialForm !== undefined) {
await fs.promises.writeFile(
path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
trialForm.hyperParameters.value, { encoding: 'utf8' }
);
}
// tslint:disable-next-line: strict-boolean-expressions
const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
const version: string = this.versionCheck ? await getVersion() : '';
const nniPaiTrialCommand : string = String.Format(
......@@ -446,8 +454,10 @@ class PAITrainingService implements TrainingService {
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
version,
this.logCollection
).replace(/\r\n|\n|\r/gm, '');
)
.replace(/\r\n|\n|\r/gm, '');
// tslint:disable-next-line:no-console
console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
const paiTaskRoles : PAITaskRole[] = [
new PAITaskRole(
......@@ -507,9 +517,10 @@ class PAITrainingService implements TrainingService {
Authorization: `Bearer ${this.paiToken}`
}
};
// tslint:disable:no-any no-unsafe-any
request(submitJobRequest, (error: Error, response: request.Response, body: any) => {
if (error || response.statusCode >= 400) {
const errorMessage : string = error ? error.message :
if ((error !== undefined && error !== null) || response.statusCode >= 400) {
const errorMessage : string = (error !== undefined && error !== null) ? error.message :
`Submit trial ${trialJobId} failed, http code:${response.statusCode}, http body: ${response.body}`;
this.log.error(errorMessage);
trialJobDetail.status = 'FAILED';
......@@ -533,18 +544,18 @@ class PAITrainingService implements TrainingService {
private async statusCheckingLoop(): Promise<void> {
while (!this.stopping) {
try{
try {
await this.updatePaiToken();
}catch(error){
} catch (error) {
this.log.error(`${error}`);
//only throw error when initlize paiToken first time
if(!this.paiToken) {
if (this.paiToken === undefined) {
throw new Error(error);
}
}
await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);
const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
if (restServer.getErrorMessage) {
if (restServer.getErrorMessage !== undefined) {
throw new Error(restServer.getErrorMessage);
}
await delay(3000);
......@@ -575,17 +586,17 @@ class PAITrainingService implements TrainingService {
const currentTime: number = new Date().getTime();
//If pai token initialized and not reach the interval time, do not update
if (this.paiTokenUpdateTime && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval){
if (this.paiTokenUpdateTime !== undefined && (currentTime - this.paiTokenUpdateTime) < this.paiTokenUpdateInterval) {
return Promise.resolve();
}
if (!this.paiClusterConfig) {
if (this.paiClusterConfig === undefined) {
const paiClusterConfigError: string = `pai cluster config not initialized!`;
this.log.error(`${paiClusterConfigError}`);
throw Error(`${paiClusterConfigError}`);
}
const authentication_req: request.Options = {
const authenticationReq: request.Options = {
uri: `http://${this.paiClusterConfig.host}/rest-server/api/v1/token`,
method: 'POST',
json: true,
......@@ -595,12 +606,12 @@ class PAITrainingService implements TrainingService {
}
};
request(authentication_req, (error: Error, response: request.Response, body: any) => {
if (error) {
request(authenticationReq, (error: Error, response: request.Response, body: any) => {
if (error !== undefined && error !== null) {
this.log.error(`Get PAI token failed: ${error.message}`);
deferred.reject(new Error(`Get PAI token failed: ${error.message}`));
} else {
if (response.statusCode !== 200){
if (response.statusCode !== 200) {
this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
deferred.reject(new Error(`Get PAI token failed: ${response.body}, please check paiConfig username or password`));
}
......@@ -619,8 +630,9 @@ class PAITrainingService implements TrainingService {
});
return Promise.race([timeoutDelay, deferred.promise])
.finally(() => clearTimeout(timeoutId));
.finally(() => { clearTimeout(timeoutId); });
}
// tslint:enable:no-any no-unsafe-any no-http-string
}
export { PAITrainingService };
......@@ -19,16 +19,20 @@
'use strict';
import {TrialConfig} from '../common/trialConfig'
import {TrialConfig} from '../common/trialConfig';
export class PAITrialConfig extends TrialConfig{
/**
* PAI configuration to run trials
*/
export class PAITrialConfig extends TrialConfig {
public readonly cpuNum: number;
public readonly memoryMB: number;
public readonly image: string;
public readonly dataDir: string;
public readonly outputDir: string;
constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number, image: string, dataDir: string, outputDir: string) {
constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number,
image: string, dataDir: string, outputDir: string) {
super(command, codeDir, gpuNum);
this.cpuNum = cpuNum;
this.memoryMB = memoryMB;
......
......@@ -21,10 +21,12 @@
import * as assert from 'assert';
import { getLogger, Logger } from '../../common/log';
import { TrialJobDetail } from '../../common/trainingService';
import { randomSelect } from '../../common/utils';
import { GPUInfo } from '../common/gpuData';
import { RemoteMachineTrialJobDetail, parseGpuIndices, RemoteMachineMeta, RemoteMachineScheduleResult, ScheduleResultType, SSHClientManager } from './remoteMachineData';
import { TrialJobDetail } from 'common/trainingService';
import {
parseGpuIndices, RemoteMachineMeta, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail, ScheduleResultType, SSHClientManager
} from './remoteMachineData';
/**
* A simple GPU scheduler implementation
......@@ -32,7 +34,7 @@ import { TrialJobDetail } from 'common/trainingService';
export class GPUScheduler {
private readonly machineSSHClientMap : Map<RemoteMachineMeta, SSHClientManager>;
private log: Logger = getLogger();
private readonly log: Logger = getLogger();
/**
* Constructor
......@@ -89,8 +91,8 @@ export class GPUScheduler {
* remove the job's gpu reversion
*/
public removeGpuReservation(trialJobId: string, trialJobMap: Map<string, RemoteMachineTrialJobDetail>): void {
let trialJobDetail: RemoteMachineTrialJobDetail | undefined = trialJobMap.get(trialJobId);
if(trialJobDetail === undefined) {
const trialJobDetail: RemoteMachineTrialJobDetail | undefined = trialJobMap.get(trialJobId);
if (trialJobDetail === undefined) {
throw new Error(`could not get trialJobDetail by id ${trialJobId}`);
}
if (trialJobDetail.rmMeta !== undefined &&
......@@ -98,12 +100,12 @@ export class GPUScheduler {
trialJobDetail.gpuIndices !== undefined &&
trialJobDetail.gpuIndices.length > 0) {
for (const gpuInfo of trialJobDetail.gpuIndices) {
let num: number | undefined = trialJobDetail.rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
if(num !== undefined) {
if(num === 1) {
const num: number | undefined = trialJobDetail.rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
if (num !== undefined) {
if (num === 1) {
trialJobDetail.rmMeta.occupiedGpuIndexMap.delete(gpuInfo.index);
} else {
trialJobDetail.rmMeta.occupiedGpuIndexMap.set(gpuInfo.index, num - 1)
trialJobDetail.rmMeta.occupiedGpuIndexMap.set(gpuInfo.index, num - 1);
}
}
}
......@@ -116,7 +118,6 @@ export class GPUScheduler {
const totalResourceMap: Map<RemoteMachineMeta, GPUInfo[]> = this.gpuResourceDetection();
const qualifiedRMs: RemoteMachineMeta[] = [];
totalResourceMap.forEach((gpuInfos: GPUInfo[], rmMeta: RemoteMachineMeta) => {
if (gpuInfos !== undefined && gpuInfos.length >= requiredGPUNum) {
qualifiedRMs.push(rmMeta);
}
......@@ -154,6 +155,7 @@ export class GPUScheduler {
}
}
this.log.debug(`designated gpu indices: ${designatedGpuIndices}`);
// tslint:disable: strict-boolean-expressions
rmMeta.gpuSummary.gpuInfos.forEach((gpuInfo: GPUInfo) => {
// if the GPU has active process, OR be reserved by a job,
// or index not in gpuIndices configuration in machineList,
......@@ -161,10 +163,10 @@ export class GPUScheduler {
// We should NOT allocate this GPU
// if users set useActiveGpu, use the gpu whether there is another activeProcess
if (designatedGpuIndices === undefined || designatedGpuIndices.has(gpuInfo.index)) {
if(rmMeta.occupiedGpuIndexMap !== undefined) {
let num = rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
let maxTrialNumPerGpu: number = rmMeta.maxTrialNumPerGpu? rmMeta.maxTrialNumPerGpu: 1;
if((num === undefined && (!rmMeta.useActiveGpu && gpuInfo.activeProcessNum === 0 || rmMeta.useActiveGpu)) ||
if (rmMeta.occupiedGpuIndexMap !== undefined) {
const num: number | undefined = rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
const maxTrialNumPerGpu: number = rmMeta.maxTrialNumPerGpu ? rmMeta.maxTrialNumPerGpu : 1;
if ((num === undefined && (!rmMeta.useActiveGpu && gpuInfo.activeProcessNum === 0 || rmMeta.useActiveGpu)) ||
(num !== undefined && num < maxTrialNumPerGpu)) {
availableGPUs.push(gpuInfo);
}
......@@ -179,6 +181,7 @@ export class GPUScheduler {
return totalResourceMap;
}
// tslint:enable: strict-boolean-expressions
private selectMachine(rmMetas: RemoteMachineMeta[]): RemoteMachineMeta {
assert(rmMetas !== undefined && rmMetas.length > 0);
......@@ -196,23 +199,28 @@ export class GPUScheduler {
assert(gpuInfos.length >= requiredGPUNum);
const allocatedGPUs: GPUInfo[] = this.selectGPUsForTrial(gpuInfos, requiredGPUNum);
allocatedGPUs.forEach((gpuInfo: GPUInfo) => {
if(rmMeta.occupiedGpuIndexMap !== undefined) {
let num = rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
if(num === undefined) {
if (rmMeta.occupiedGpuIndexMap !== undefined) {
let num: number | undefined = rmMeta.occupiedGpuIndexMap.get(gpuInfo.index);
if (num === undefined) {
num = 0;
}
rmMeta.occupiedGpuIndexMap.set(gpuInfo.index, num + 1);
}else {
} else {
throw new Error(`Machine ${rmMeta.ip} occupiedGpuIndexMap initialize error!`);
}
});
trialJobDetail.gpuIndices = allocatedGPUs;
trialJobDetail.rmMeta = rmMeta;
return {
resultType: ScheduleResultType.SUCCEED,
scheduleInfo: {
rmMeta: rmMeta,
cuda_visible_device: allocatedGPUs.map((gpuInfo: GPUInfo) => { return gpuInfo.index; }).join(',')
cuda_visible_device: allocatedGPUs
.map((gpuInfo: GPUInfo) => {
return gpuInfo.index;
})
.join(',')
}
};
}
......
......@@ -23,7 +23,7 @@ import * as fs from 'fs';
import { Client, ConnectConfig } from 'ssh2';
import { Deferred } from 'ts-deferred';
import { JobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService';
import { GPUSummary, GPUInfo } from '../common/gpuData';
import { GPUInfo, GPUSummary } from '../common/gpuData';
/**
* Metadata of remote machine for configuration and statuc query
......@@ -73,7 +73,6 @@ export class RemoteCommandResult {
/**
* RemoteMachineTrialJobDetail
*/
// tslint:disable-next-line:max-classes-per-file
export class RemoteMachineTrialJobDetail implements TrialJobDetail {
public id: string;
public status: TrialJobStatus;
......@@ -98,7 +97,7 @@ export class RemoteMachineTrialJobDetail implements TrialJobDetail {
this.form = form;
this.sequenceId = sequenceId;
this.tags = [];
this.gpuIndices = []
this.gpuIndices = [];
}
}
......@@ -121,17 +120,20 @@ export class SSHClient {
return this.usedConnectionNumber;
}
/**
 * Record that one more user shares this SSH connection.
 */
public addUsedConnectionNumber(): void {
    this.usedConnectionNumber += 1;
}
/**
 * Record that one user has stopped using this SSH connection.
 */
public minusUsedConnectionNumber(): void {
    this.usedConnectionNumber -= 1;
}
}
/**
* The remote machine ssh client manager
*/
export class SSHClientManager {
private sshClientArray: SSHClient[];
private readonly sshClientArray: SSHClient[];
private readonly maxTrialNumberPerConnection: number;
private readonly rmMeta: RemoteMachineMeta;
constructor(sshClientArray: SSHClient[], maxTrialNumberPerConnection: number, rmMeta: RemoteMachineMeta) {
......@@ -140,122 +142,128 @@ export class SSHClientManager {
this.maxTrialNumberPerConnection = maxTrialNumberPerConnection;
}
/**
 * Create a new ssh connection client and initialize it.
 *
 * The machine's password is used when one is set, otherwise the private key
 * stored at sshKeyPath. The returned promise resolves with the connected
 * client once it has been added to sshClientArray; it rejects when no usable
 * credential is configured or when the connection reports an error.
 */
private initNewSSHClient(): Promise<Client> {
    const deferred: Deferred<Client> = new Deferred<Client>();
    const conn: Client = new Client();
    const connectConfig: ConnectConfig = {
        host: this.rmMeta.ip,
        port: this.rmMeta.port,
        username: this.rmMeta.username };
    if (this.rmMeta.passwd) {
        connectConfig.password = this.rmMeta.passwd;
    } else if (this.rmMeta.sshKeyPath) {
        if (!fs.existsSync(this.rmMeta.sshKeyPath)) {
            // SSH key path is not a valid file: reject and stop here, otherwise
            // readFileSync below would throw synchronously on the missing file
            deferred.reject(new Error(`${this.rmMeta.sshKeyPath} does not exist.`));

            return deferred.promise;
        }
        const privateKey: string = fs.readFileSync(this.rmMeta.sshKeyPath, 'utf8');

        connectConfig.privateKey = privateKey;
        connectConfig.passphrase = this.rmMeta.passphrase;
    } else {
        // No credential at all: do not attempt a connection that cannot authenticate
        deferred.reject(new Error(`No valid passwd or sshKeyPath is configed.`));

        return deferred.promise;
    }
    conn.on('ready', () => {
        this.addNewSSHClient(conn);
        deferred.resolve(conn);
    }).on('error', (err: Error) => {
        // SSH connection error, reject with error message
        deferred.reject(new Error(err.message));
    }).connect(connectConfig);

    return deferred.promise;
}
/**
 * Find an ssh client that still has spare capacity, i.e. whose used connection
 * number is below maxTrialNumberPerConnection. For such a pooled client the
 * usage counter is incremented before it is returned.
 * If every pooled client is full, a new ssh connection is opened instead.
 *
 * @returns promise of the ssh2 client to use
 */
public async getAvailableSSHClient(): Promise<Client> {
    for (const sshClient of this.sshClientArray) {
        if (sshClient.getUsedConnectionNumber < this.maxTrialNumberPerConnection) {
            sshClient.addUsedConnectionNumber();

            return sshClient.getSSHClientInstance;
        }
    }

    //init a new ssh client if could not get an available one
    return this.initNewSSHClient();
}
/**
 * add a new ssh client to sshClientArray
 * @param client SSH Client
 */
public addNewSSHClient(client: Client): void {
    // NOTE(review): the second argument is presumably the initial used-connection count - confirm against SSHClient's constructor
    this.sshClientArray.push(new SSHClient(client, 1));
}
/**
 * first ssh client instance is used for gpu collector and host job
 *
 * @returns the ssh2 client stored at index 0 of sshClientArray
 * @throws Error when sshClientArray is empty
 */
public getFirstSSHClient(): Client {
    if (this.sshClientArray.length === 0) {
        // Fail with a readable message instead of a TypeError on undefined
        throw new Error(`No ssh client is available for machine ${this.rmMeta.ip}`);
    }

    return this.sshClientArray[0].getSSHClientInstance;
}
/**
 * close all of ssh client: calls end() on every client held in sshClientArray
 */
public closeAllSSHClient(): void {
    for (const sshClient of this.sshClientArray) {
        sshClient.getSSHClientInstance.end();
    }
}
/**
 * retrieve resource, minus a number for given ssh client
 *
 * Only the first entry of sshClientArray wrapping the given client is updated;
 * a client that is not in the array is ignored.
 *
 * @param client SSH Client
 * @throws Error when client is undefined
 */
public releaseConnection(client: Client | undefined): void {
    if (client === undefined) {
        throw new Error(`could not release a undefined ssh client`);
    }
    const owner: SSHClient | undefined = this.sshClientArray.find(
        (sshClient: SSHClient) => sshClient.getSSHClientInstance === client
    );
    if (owner !== undefined) {
        owner.minusUsedConnectionNumber();
    }
}
}
/**
 * Create a new ssh connection client and initialize it.
 *
 * Credentials come from rmMeta: the password when it is defined, otherwise the
 * private key read from sshKeyPath (with the optional passphrase). The promise
 * resolves with the connected client after it has been registered through
 * addNewSSHClient; it rejects when the key file is missing, when neither
 * credential is configured, or when the connection emits an error.
 */
// tslint:disable:non-literal-fs-path
private initNewSSHClient(): Promise<Client> {
    const deferred: Deferred<Client> = new Deferred<Client>();
    const conn: Client = new Client();
    const connectConfig: ConnectConfig = {
        host: this.rmMeta.ip,
        port: this.rmMeta.port,
        username: this.rmMeta.username };
    if (this.rmMeta.passwd !== undefined) {
        connectConfig.password = this.rmMeta.passwd;
    } else if (this.rmMeta.sshKeyPath !== undefined) {
        if (!fs.existsSync(this.rmMeta.sshKeyPath)) {
            // SSH key path is not a valid file: reject and return, so that
            // readFileSync below is never reached for a missing file
            deferred.reject(new Error(`${this.rmMeta.sshKeyPath} does not exist.`));

            return deferred.promise;
        }
        const privateKey: string = fs.readFileSync(this.rmMeta.sshKeyPath, 'utf8');

        connectConfig.privateKey = privateKey;
        connectConfig.passphrase = this.rmMeta.passphrase;
    } else {
        // Nothing to authenticate with: skip the connection attempt entirely
        deferred.reject(new Error(`No valid passwd or sshKeyPath is configed.`));

        return deferred.promise;
    }
    conn.on('ready', () => {
        this.addNewSSHClient(conn);
        deferred.resolve(conn);
    })
      .on('error', (err: Error) => {
        // SSH connection error, reject with error message
        deferred.reject(new Error(err.message));
    })
      .connect(connectConfig);

    return deferred.promise;
}
}
/**
 * Result of one scheduling attempt: the result category plus the
 * machine/GPU assignment, which may be undefined.
 */
export type RemoteMachineScheduleResult = {
    scheduleInfo: RemoteMachineScheduleInfo | undefined;
    resultType: ScheduleResultType;
};
/**
 * Machine chosen for a trial together with its GPU assignment.
 * cuda_visible_device carries the GPU indices as a string.
 */
export type RemoteMachineScheduleInfo = {
    rmMeta: RemoteMachineMeta;
    cuda_visible_device: string;
};
/**
 * Result category of a scheduling attempt
 */
export enum ScheduleResultType {
// Schedule succeeded
SUCCEED,
// Temporarily, not enough GPUs are available right now
TMP_NO_AVAILABLE_GPU,
// Cannot match requirement even if all GPUs are available
REQUIRE_EXCEED_TOTAL
}
/**
 * Bash script template used to launch a trial on a remote machine.
 * Positional placeholders are substituted by the caller:
 * {0} NNI_SYS_DIR, {1} NNI_OUTPUT_DIR, {2} trial job id, {3} experiment id,
 * {4} trial sequence id, {5} MULTI_PHASE, {6} file receiving the shell PID,
 * {7} trial command, {8} NNI manager ip, {9} NNI manager port,
 * {10} NNI manager version, {11} log collection setting,
 * {12} file receiving the exit code and a millisecond timestamp.
 *
 * A backslash at the end of a line inside the template literal is a line
 * continuation: it only wraps the source and adds nothing to the script, so
 * the continued line must start at column 0.
 */
export const REMOTEMACHINE_TRIAL_COMMAND_FORMAT: string =
`#!/bin/bash
export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} \
NNI_TRIAL_SEQ_ID={4} export MULTI_PHASE={5}
cd $NNI_SYS_DIR
sh install_nni.sh
echo $$ >{6}
python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' \
--nni_manager_version '{10}' --log_collection '{11}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
echo $? \`date +%s%3N\` >{12}`;
export const HOST_JOB_SHELL_FORMAT: string =
......
......@@ -19,17 +19,17 @@
'use strict';
import * as component from '../../common/component';
import { Inject } from 'typescript-ioc';
import * as component from '../../common/component';
import { ClusterJobRestServer } from '../common/clusterJobRestServer';
import { RemoteMachineTrainingService } from './remoteMachineTrainingService';
import { ClusterJobRestServer } from '../common/clusterJobRestServer'
/**
* RemoteMachine Training service Rest server, provides rest RemoteMachine to support remotemachine job metrics update
*
*/
@component.Singleton
export class RemoteMachineJobRestServer extends ClusterJobRestServer{
export class RemoteMachineJobRestServer extends ClusterJobRestServer {
@Inject
private readonly remoteMachineTrainingService : RemoteMachineTrainingService;
......@@ -41,6 +41,7 @@ export class RemoteMachineJobRestServer extends ClusterJobRestServer{
this.remoteMachineTrainingService = component.get(RemoteMachineTrainingService);
}
// tslint:disable-next-line:no-any
protected handleTrialMetrics(jobId : string, metrics : any[]) : void {
// Split metrics array into single metric, then emit
// Warning: If not split metrics into single ones, the behavior will be UNKNOWNls
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment