Unverified Commit 12410686 authored by chicm-ms's avatar chicm-ms Committed by GitHub
Browse files

Merge pull request #20 from microsoft/master

pull code
parents 611a45fc 61fec446
......@@ -33,6 +33,7 @@
"@types/chai-as-promised": "^7.1.0",
"@types/express": "^4.16.0",
"@types/glob": "^7.1.1",
"@types/js-base64": "^2.3.1",
"@types/mocha": "^5.2.5",
"@types/node": "10.12.18",
"@types/request": "^2.47.1",
......
......@@ -31,7 +31,7 @@ import { createRestHandler } from './restHandler';
* NNI Main rest server, provides rest API to support
* # nnictl CLI tool
* # NNI WebUI
*
*
*/
@component.Singleton
export class NNIRestServer extends RestServer {
......
......@@ -146,7 +146,7 @@ class NNIRestHandler {
});
});
}
private importData(router: Router): void {
router.post('/experiment/import-data', (req: Request, res: Response) => {
this.nniManager.importData(JSON.stringify(req.body)).then(() => {
......@@ -203,7 +203,7 @@ class NNIRestHandler {
res.send();
} catch (err) {
// setClusterMetata is a step of initialization, so any exception thrown is a fatal
this.handle_error(err, res, true);
this.handle_error(NNIError.FromError(err), res, true);
}
});
}
......
......@@ -133,7 +133,7 @@ export namespace ValidationSchemas {
})
}),
nni_manager_ip: joi.object({
nniManagerIp: joi.string().min(1)
nniManagerIp: joi.string().min(1)
})
}
};
......
......@@ -20,22 +20,24 @@
'use strict';
import * as assert from 'assert';
import { Request, Response, Router } from 'express';
// tslint:disable-next-line:no-implicit-dependencies
import * as bodyParser from 'body-parser';
import { Request, Response, Router } from 'express';
import * as fs from 'fs';
import * as path from 'path';
import { Writable } from 'stream';
import { String } from 'typescript-string-operations';
import * as component from '../../common/component';
import * as fs from 'fs'
import * as path from 'path'
import { getBasePort, getExperimentId } from '../../common/experimentStartupInfo';
import { RestServer } from '../../common/restServer'
import { RestServer } from '../../common/restServer';
import { getLogDir } from '../../common/utils';
import { Writable } from 'stream';
/**
* Cluster Job Training service Rest server, provides rest API to support Cluster job metrics update
*
*
*/
@component.Singleton
export abstract class ClusterJobRestServer extends RestServer{
export abstract class ClusterJobRestServer extends RestServer {
private readonly API_ROOT_URL: string = '/api/v1/nni-pai';
private readonly NNI_METRICS_PATTERN: string = `NNISDK_MEb'(?<metrics>.*?)'`;
......@@ -51,22 +53,23 @@ export abstract class ClusterJobRestServer extends RestServer{
constructor() {
super();
const basePort: number = getBasePort();
assert(basePort && basePort > 1024);
this.port = basePort + 1;
assert(basePort !== undefined && basePort > 1024);
this.port = basePort + 1;
}
public get clusterRestServerPort(): number {
if(!this.port) {
if (this.port === undefined) {
throw new Error('PAI Rest server port is undefined');
}
return this.port;
}
public get getErrorMessage(): string | undefined{
public get getErrorMessage(): string | undefined {
return this.errorMessage;
}
public set setEnableVersionCheck(versionCheck: boolean) {
this.enableVersionCheck = versionCheck;
}
......@@ -79,11 +82,15 @@ export abstract class ClusterJobRestServer extends RestServer{
this.app.use(this.API_ROOT_URL, this.createRestHandler());
}
// Abstract method to handle trial metrics data
// tslint:disable-next-line:no-any
protected abstract handleTrialMetrics(jobId : string, trialMetrics : any[]) : void;
// tslint:disable: no-unsafe-any no-any
private createRestHandler() : Router {
const router: Router = Router();
// tslint:disable-next-line:typedef
router.use((req: Request, res: Response, next) => {
router.use((req: Request, res: Response, next: any) => {
this.log.info(`${req.method}: ${req.url}: body:\n${JSON.stringify(req.body, undefined, 4)}`);
res.setHeader('Content-Type', 'application/json');
next();
......@@ -92,7 +99,7 @@ export abstract class ClusterJobRestServer extends RestServer{
router.post(`/version/${this.expId}/:trialId`, (req: Request, res: Response) => {
if (this.enableVersionCheck) {
try {
const checkResultSuccess: boolean = req.body.tag === 'VCSuccess'? true: false;
const checkResultSuccess: boolean = req.body.tag === 'VCSuccess' ? true : false;
if (this.versionCheckSuccess !== undefined && this.versionCheckSuccess !== checkResultSuccess) {
this.errorMessage = 'Version check error, version check result is inconsistent!';
this.log.error(this.errorMessage);
......@@ -103,7 +110,7 @@ export abstract class ClusterJobRestServer extends RestServer{
this.versionCheckSuccess = false;
this.errorMessage = req.body.msg;
}
} catch(err) {
} catch (err) {
this.log.error(`json parse metrics error: ${err}`);
res.status(500);
res.send(err.message);
......@@ -122,8 +129,7 @@ export abstract class ClusterJobRestServer extends RestServer{
this.handleTrialMetrics(req.body.jobId, req.body.metrics);
res.send();
}
catch(err) {
} catch (err) {
this.log.error(`json parse metrics error: ${err}`);
res.status(500);
res.send(err.message);
......@@ -131,35 +137,37 @@ export abstract class ClusterJobRestServer extends RestServer{
});
router.post(`/stdout/${this.expId}/:trialId`, (req: Request, res: Response) => {
if(this.enableVersionCheck && !this.versionCheckSuccess && !this.errorMessage) {
this.errorMessage = `Version check failed, didn't get version check response from trialKeeper, please check your NNI version in `
+ `NNIManager and TrialKeeper!`
if (this.enableVersionCheck && (this.versionCheckSuccess === undefined || !this.versionCheckSuccess)
&& this.errorMessage === undefined) {
this.errorMessage = `Version check failed, didn't get version check response from trialKeeper,`
+ ` please check your NNI version in NNIManager and TrialKeeper!`;
}
const trialLogPath: string = path.join(getLogDir(), `trial_${req.params.trialId}.log`);
try {
let skipLogging: boolean = false;
if(req.body.tag === 'trial' && req.body.msg !== undefined) {
const metricsContent = req.body.msg.match(this.NNI_METRICS_PATTERN);
if(metricsContent && metricsContent.groups) {
this.handleTrialMetrics(req.params.trialId, [metricsContent.groups['metrics']]);
if (req.body.tag === 'trial' && req.body.msg !== undefined) {
const metricsContent: any = req.body.msg.match(this.NNI_METRICS_PATTERN);
if (metricsContent && metricsContent.groups) {
const key: string = 'metrics';
this.handleTrialMetrics(req.params.trialId, [metricsContent.groups[key]]);
skipLogging = true;
}
}
if(!skipLogging){
if (!skipLogging) {
// Construct write stream to write remote trial's log into local file
// tslint:disable-next-line:non-literal-fs-path
const writeStream: Writable = fs.createWriteStream(trialLogPath, {
flags: 'a+',
encoding: 'utf8',
autoClose: true
});
writeStream.write(req.body.msg + '\n');
writeStream.write(String.Format('{0}\n', req.body.msg));
writeStream.end();
}
res.send();
}
catch(err) {
} catch (err) {
this.log.error(`json parse stdout data error: ${err}`);
res.status(500);
res.send(err.message);
......@@ -168,7 +176,5 @@ export abstract class ClusterJobRestServer extends RestServer{
return router;
}
/** Abstract method to handle trial metrics data */
protected abstract handleTrialMetrics(jobId : string, trialMetrics : any[]) : void;
}
\ No newline at end of file
// tslint:enable: no-unsafe-any no-any
}
......@@ -19,12 +19,12 @@
'use strict';
export const CONTAINER_INSTALL_NNI_SHELL_FORMAT: string =
export const CONTAINER_INSTALL_NNI_SHELL_FORMAT: string =
`#!/bin/bash
if python3 -c 'import nni' > /dev/null 2>&1; then
# nni module is already installed, skip
return
else
# Install nni
python3 -m pip install --user --upgrade nni
fi`;
\ No newline at end of file
python3 -m pip install --user --upgrade nni
fi`;
......@@ -59,17 +59,17 @@ export class GPUSummary {
}
}
export const GPU_INFO_COLLECTOR_FORMAT_LINUX: string =
export const GPU_INFO_COLLECTOR_FORMAT_LINUX: string =
`
#!/bin/bash
export METRIC_OUTPUT_DIR={0}
echo $$ >{1}
python3 -m nni_gpu_tool.gpu_metrics_collector
`
`;
export const GPU_INFO_COLLECTOR_FORMAT_WINDOWS: string =
export const GPU_INFO_COLLECTOR_FORMAT_WINDOWS: string =
`
$env:METRIC_OUTPUT_DIR="{0}"
$app = Start-Process "python" -ArgumentList "-m nni_gpu_tool.gpu_metrics_collector" -passthru -NoNewWindow
Write $app.ID | Out-File {1} -NoNewline -encoding utf8
`
\ No newline at end of file
`;
......@@ -21,7 +21,10 @@
import { TrialJobStatus } from '../../common/trainingService';
// tslint:disable-next-line:max-classes-per-file
/**
* Trial job metrics class
* Representing trial job metrics properties
*/
export class JobMetrics {
public readonly jobId: string;
public readonly metrics: string[];
......
......@@ -24,13 +24,13 @@
* Representing trial job configurable properties
*/
export class TrialConfig {
/** Trail command */
// Trail command
public readonly command : string;
/** Code directory */
// Code directory
public readonly codeDir : string;
/** Required GPU number for trial job. The number should be in [0,100] */
// Required GPU number for trial job. The number should be in [0,100]
public readonly gpuNum : number;
/**
......@@ -44,4 +44,4 @@ export class TrialConfig {
this.codeDir = codeDir;
this.gpuNum = gpuNum;
}
}
\ No newline at end of file
}
import { getLogger } from "common/log";
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
......@@ -21,44 +19,55 @@ import { getLogger } from "common/log";
'use strict';
import { countFilesRecursively } from '../../common/utils'
import * as cpp from 'child-process-promise';
import * as cp from 'child_process';
import * as os from 'os';
import * as fs from 'fs';
import { getNewLine } from '../../common/utils';
import { GPU_INFO_COLLECTOR_FORMAT_LINUX, GPU_INFO_COLLECTOR_FORMAT_WINDOWS } from './gpuData';
import * as os from 'os';
import * as path from 'path';
import { String } from 'typescript-string-operations';
import { file } from "../../node_modules/@types/tmp";
import { countFilesRecursively, getNewLine, validateFileNameRecursively } from '../../common/utils';
import { file } from '../../node_modules/@types/tmp';
import { GPU_INFO_COLLECTOR_FORMAT_LINUX, GPU_INFO_COLLECTOR_FORMAT_WINDOWS } from './gpuData';
/**
* Validate codeDir, calculate file count recursively under codeDir, and throw error if any rule is broken
*
*
* @param codeDir codeDir in nni config file
* @returns file number under codeDir
*/
// tslint:disable: no-redundant-jsdoc
export async function validateCodeDir(codeDir: string) : Promise<number> {
let fileCount: number | undefined;
let fileNameValid: boolean = true;
try {
fileCount = await countFilesRecursively(codeDir);
} catch(error) {
} catch (error) {
throw new Error(`Call count file error: ${error}`);
}
try {
fileNameValid = await validateFileNameRecursively(codeDir);
} catch(error) {
throw new Error(`Validate file name error: ${error}`);
}
if(fileCount && fileCount > 1000) {
const errMessage: string = `Too many files(${fileCount} found}) in ${codeDir},`
if (fileCount !== undefined && fileCount > 1000) {
const errMessage: string = `Too many files(${fileCount} found}) in ${codeDir},`
+ ` please check if it's a valid code dir`;
throw new Error(errMessage);
throw new Error(errMessage);
}
if(!fileNameValid) {
const errMessage: string = `File name in ${codeDir} is not valid, please check file names, only support digit number、alphabet and (.-_) in file name.`;
throw new Error(errMessage);
}
return fileCount;
}
/**
* crete a new directory
* @param directory
* @param directory
*/
export async function execMkdir(directory: string): Promise<void> {
if (process.platform === 'win32') {
......@@ -66,6 +75,7 @@ export async function execMkdir(directory: string): Promise<void> {
} else {
await cpp.exec(`mkdir -p ${directory}`);
}
return Promise.resolve();
}
......@@ -80,12 +90,13 @@ export async function execCopydir(source: string, destination: string): Promise<
} else {
await cpp.exec(`cp -r ${source} ${destination}`);
}
return Promise.resolve();
}
/**
* crete a new file
* @param filename
* @param filename
*/
export async function execNewFile(filename: string): Promise<void> {
if (process.platform === 'win32') {
......@@ -93,16 +104,17 @@ export async function execNewFile(filename: string): Promise<void> {
} else {
await cpp.exec(`touch ${filename}`);
}
return Promise.resolve();
}
/**
* run script
* run script using powershell or bash
* @param filePath
*/
export function execScript(filePath: string): cp.ChildProcess {
export function runScript(filePath: string): cp.ChildProcess {
if (process.platform === 'win32') {
return cp.exec(`powershell.exe -file ${filePath}`);
return cp.exec(`powershell.exe -ExecutionPolicy Bypass -file ${filePath}`);
} else {
return cp.exec(`bash ${filePath}`);
}
......@@ -110,7 +122,7 @@ export function execScript(filePath: string): cp.ChildProcess {
/**
* output the last line of a file
* @param filePath
* @param filePath
*/
export async function execTail(filePath: string): Promise<cpp.childProcessPromise.Result> {
let cmdresult: cpp.childProcessPromise.Result;
......@@ -119,12 +131,13 @@ export async function execTail(filePath: string): Promise<cpp.childProcessPromis
} else {
cmdresult = await cpp.exec(`tail -n 1 ${filePath}`);
}
return Promise.resolve(cmdresult);
}
/**
* delete a directory
* @param directory
* @param directory
*/
export async function execRemove(directory: string): Promise<void> {
if (process.platform === 'win32') {
......@@ -132,12 +145,13 @@ export async function execRemove(directory: string): Promise<void> {
} else {
await cpp.exec(`rm -rf ${directory}`);
}
return Promise.resolve();
}
/**
* kill a process
* @param directory
* @param directory
*/
export async function execKill(pid: string): Promise<void> {
if (process.platform === 'win32') {
......@@ -145,37 +159,39 @@ export async function execKill(pid: string): Promise<void> {
} else {
await cpp.exec(`pkill -P ${pid}`);
}
return Promise.resolve();
}
/**
* set environment variable
* get command of setting environment variable
* @param variable
* @returns command string
* @returns command string
*/
export function setEnvironmentVariable(variable: { key: string; value: string }): string {
if (process.platform === 'win32') {
return `$env:${variable.key}="${variable.value}"`;
}
else{
} else {
return `export ${variable.key}=${variable.value}`;
}
}
/**
* Compress files in directory to tar file
* @param source_path
* @param tar_path
* @param sourcePath
* @param tarPath
*/
export async function tarAdd(tar_path: string, source_path: string): Promise<void> {
export async function tarAdd(tarPath: string, sourcePath: string): Promise<void> {
if (process.platform === 'win32') {
tar_path = tar_path.split('\\').join('\\\\');
source_path = source_path.split('\\').join('\\\\');
let script: string[] = [];
const tarFilePath: string = tarPath.split('\\')
.join('\\\\');
const sourceFilePath: string = sourcePath.split('\\')
.join('\\\\');
const script: string[] = [];
script.push(
`import os`,
`import tarfile`,
String.Format(`tar = tarfile.open("{0}","w:gz")\r\nfor root,dir,files in os.walk("{1}"):`, tar_path, source_path),
String.Format(`tar = tarfile.open("{0}","w:gz")\r\nfor root,dir,files in os.walk("{1}"):`, tarFilePath, sourceFilePath),
` for file in files:`,
` fullpath = os.path.join(root,file)`,
` tar.add(fullpath, arcname=file)`,
......@@ -184,39 +200,40 @@ export async function tarAdd(tar_path: string, source_path: string): Promise<voi
const tarScript: string = path.join(os.tmpdir(), 'tar.py');
await cpp.exec(`python ${tarScript}`);
} else {
await cpp.exec(`tar -czf ${tar_path} -C ${source_path} .`);
await cpp.exec(`tar -czf ${tarPath} -C ${sourcePath} .`);
}
return Promise.resolve();
}
/**
* generate script file name
* @param fileNamePrefix
* @param fileNamePrefix
*/
export function getScriptName(fileNamePrefix: string): string {
if (process.platform === 'win32') {
return fileNamePrefix + '.ps1';
return String.Format('{0}.ps1', fileNamePrefix);
} else {
return fileNamePrefix + '.sh';
return String.Format('{0}.sh', fileNamePrefix);
}
}
/**
* generate script file
* @param gpuMetricCollectorScriptFolder
* @param gpuMetricCollectorScriptFolder
*/
export function getgpuMetricsCollectorScriptContent(gpuMetricCollectorScriptFolder: string): string {
if(process.platform === 'win32') {
if (process.platform === 'win32') {
return String.Format(
GPU_INFO_COLLECTOR_FORMAT_WINDOWS,
gpuMetricCollectorScriptFolder,
path.join(gpuMetricCollectorScriptFolder, 'pid'),
path.join(gpuMetricCollectorScriptFolder, 'pid')
);
} else {
return String.Format(
GPU_INFO_COLLECTOR_FORMAT_LINUX,
gpuMetricCollectorScriptFolder,
path.join(gpuMetricCollectorScriptFolder, 'pid'),
path.join(gpuMetricCollectorScriptFolder, 'pid')
);
}
}
......@@ -19,108 +19,126 @@
'use strict';
import * as fs from 'fs'
import * as azureStorage from 'azure-storage';
import * as fs from 'fs';
import * as path from 'path';
import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations';
import { getLogger } from '../../common/log';
import { mkDirP } from '../../common/utils';
// tslint:disable: no-redundant-jsdoc no-any no-unsafe-any
export namespace AzureStorageClientUtility {
/**
* create azure share
* @param fileServerClient
* @param azureShare
* @param fileServerClient
* @param azureShare
*/
export async function createShare(fileServerClient: any, azureShare: any): Promise<void>{
export async function createShare(fileServerClient: any, azureShare: any): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
fileServerClient.createShareIfNotExists(azureShare, function(error: any, result: any, response: any) {
if(error){
getLogger().error(`Create share failed:, ${error}`);
deferred.reject(error)
}else{
deferred.resolve()
fileServerClient.createShareIfNotExists(azureShare, (error: any, result: any, response: any) => {
if (error) {
getLogger()
.error(`Create share failed:, ${error}`);
deferred.reject(error);
} else {
deferred.resolve();
}
})
});
return deferred.promise;
}
/**
* Create a new directory (NOT recursively) in azure file storage.
* @param fileServerClient
* @param azureFoler
* @param azureShare
* @param fileServerClient
* @param azureFoler
* @param azureShare
*/
export async function createDirectory(fileServerClient: any, azureFoler: any, azureShare: any): Promise<void>{
export async function createDirectory(fileServerClient: azureStorage.FileService, azureFoler: any, azureShare: any): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
fileServerClient.createDirectoryIfNotExists(azureShare, azureFoler, function(error: any, result: any, response: any) {
if(error){
getLogger().error(`Create directory failed:, ${error}`);
fileServerClient.createDirectoryIfNotExists(azureShare, azureFoler, (error: any, result: any, response: any) => {
if (error) {
getLogger()
.error(`Create directory failed:, ${error}`);
deferred.reject(error);
}else{
} else {
deferred.resolve();
}
})
});
return deferred.promise;
}
/**
* Create a new directory recursively in azure file storage
* @param fileServerClient
* @param azureDirectory
* @param azureDirectory
*/
export async function createDirectoryRecursive(fileServerClient: any, azureDirectory: any, azureShare: any): Promise<void>{
export async function createDirectoryRecursive(fileServerClient: azureStorage.FileService, azureDirectory: string,
azureShare: any): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
let directories = azureDirectory.split("/");
let rootDirectory = ""
for(let directory of directories){
const directories: string[] = azureDirectory.split('/');
let rootDirectory: string = '';
for (const directory of directories) {
rootDirectory += directory;
await createDirectory(fileServerClient, rootDirectory, azureShare);
rootDirectory += '/';
}
deferred.resolve();
return deferred.promise;
}
/**
* upload a file to azure storage
* @param fileServerClient
* @param azureDirectory
* @param azureFileName
* @param azureShare
* @param localFilePath
* @param fileServerClient
* @param azureDirectory
* @param azureFileName
* @param azureShare
* @param localFilePath
*/
async function uploadFileToAzure(fileServerClient: any, azureDirectory: any, azureFileName: any, azureShare: any, localFilePath: any): Promise<void>{
async function uploadFileToAzure(fileServerClient: any, azureDirectory: string, azureFileName: any, azureShare: any,
localFilePath: string): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
await fileServerClient.createFileFromLocalFile(azureShare, azureDirectory, azureFileName, localFilePath, function(error: any, result: any, response: any) {
if(error){
getLogger().error(`Upload file failed:, ${error}`);
await fileServerClient.createFileFromLocalFile(azureShare, azureDirectory, azureFileName, localFilePath,
(error: any, result: any, response: any) => {
if (error) {
getLogger()
.error(`Upload file failed:, ${error}`);
deferred.reject(error);
}else{
} else {
deferred.resolve();
}
})
});
return deferred.promise;
}
/**
* download a file from azure storage
* @param fileServerClient
* @param azureDirectory
* @param azureFileName
* @param azureShare
* @param localFilePath
* @param fileServerClient
* @param azureDirectory
* @param azureFileName
* @param azureShare
* @param localFilePath
*/
async function downloadFile(fileServerClient: any, azureDirectory: any, azureFileName: any, azureShare: any, localFilePath: any): Promise<void>{
async function downloadFile(fileServerClient: any, azureDirectory: string, azureFileName: any, azureShare: any,
localFilePath: string): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
await fileServerClient.getFileToStream(azureShare, azureDirectory, azureFileName, fs.createWriteStream(localFilePath), function(error: any, result: any, response: any) {
if(error){
getLogger().error(`Download file failed:, ${error}`);
// tslint:disable-next-line:non-literal-fs-path
await fileServerClient.getFileToStream(azureShare, azureDirectory, azureFileName, fs.createWriteStream(localFilePath),
(error: any, result: any, response: any) => {
if (error) {
getLogger()
.error(`Download file failed:, ${error}`);
deferred.reject(error);
}else{
deferred.resolve();
} else {
deferred.resolve();
}
})
});
return deferred.promise;
}
......@@ -131,67 +149,79 @@ export namespace AzureStorageClientUtility {
* @param azureShare : the azure share used
* @param localDirectory : local directory to be uploaded
*/
export async function uploadDirectory(fileServerClient: any, azureDirectory: any, azureShare: any, localDirectory: any): Promise<void>{
// tslint:disable:non-literal-fs-path
export async function uploadDirectory(fileServerClient: azureStorage.FileService, azureDirectory: string, azureShare: any,
localDirectory: string): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
const fileNameArray: string[] = fs.readdirSync(localDirectory);
await createDirectoryRecursive(fileServerClient, azureDirectory, azureShare);
for(let fileName of fileNameArray){
for (const fileName of fileNameArray) {
const fullFilePath: string = path.join(localDirectory, fileName);
try {
if (fs.lstatSync(fullFilePath).isFile()) {
if (fs.lstatSync(fullFilePath)
.isFile()) {
await uploadFileToAzure(fileServerClient, azureDirectory, fileName, azureShare, fullFilePath);
} else {
// If filePath is a directory, recuisively copy it to azure
await uploadDirectory(fileServerClient, azureDirectory + '/' + fileName, azureShare, fullFilePath);
await uploadDirectory(fileServerClient, String.Format('{0}/{1}', azureDirectory, fileName), azureShare, fullFilePath);
}
} catch(error) {
} catch (error) {
deferred.reject(error);
return deferred.promise;
}
}
// All files/directories are copied successfully, resolve
deferred.resolve();
return deferred.promise;
}
/**
* downlod a directory from azure
* @param fileServerClient
* @param azureDirectory
* @param azureShare
* @param localDirectory
* @param fileServerClient
* @param azureDirectory
* @param azureShare
* @param localDirectory
*/
export async function downloadDirectory(fileServerClient: any, azureDirectory:any, azureShare: any, localDirectory: any): Promise<void>{
export async function downloadDirectory(fileServerClient: any, azureDirectory: string, azureShare: any, localDirectory: string):
Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
mkDirP(localDirectory);
fileServerClient.listFilesAndDirectoriesSegmented(azureShare, azureDirectory, 'null', function(error: any, result: any, response: any) {
if(('entries' in result) === false){
getLogger().error(`list files failed, can't get entries in result`);
await mkDirP(localDirectory);
fileServerClient.listFilesAndDirectoriesSegmented(azureShare, azureDirectory, 'null',
async (error: any, result: any, response: any) => {
if (('entries' in result) === false) {
getLogger()
.error(`list files failed, can't get entries in result`);
throw new Error(`list files failed, can't get entries in result`);
}
if(('files' in result['entries']) === false){
getLogger().error(`list files failed, can't get files in result['entries']`);
if (('files' in result.entries) === false) {
getLogger()
.error(`list files failed, can't get files in result['entries']`);
throw new Error(`list files failed, can't get files in result['entries']`);
}
if(('directories' in result['directories']) === false){
getLogger().error(`list files failed, can't get directories in result['entries']`);
if (('directories' in result.directories) === false) {
getLogger()
.error(`list files failed, can't get directories in result['entries']`);
throw new Error(`list files failed, can't get directories in result['entries']`);
}
for(var fileName of result['entries']['files']){
for (const fileName of result.entries.files) {
const fullFilePath: string = path.join(localDirectory, fileName.name);
downloadFile(fileServerClient, azureDirectory, fileName.name, azureShare, fullFilePath)
await downloadFile(fileServerClient, azureDirectory, fileName.name, azureShare, fullFilePath);
}
for(var directoryName of result['entries']['directories']){
const fullDirectoryPath: string = path.join(localDirectory, directoryName.name)
const fullAzureDirectory: string = path.join(azureDirectory, directoryName.name)
downloadDirectory(fileServerClient, fullAzureDirectory, azureShare, fullDirectoryPath)
for (const directoryName of result.entries.directories) {
const fullDirectoryPath: string = path.join(localDirectory, directoryName.name);
const fullAzureDirectory: string = path.join(azureDirectory, directoryName.name);
await downloadDirectory(fileServerClient, fullAzureDirectory, azureShare, fullDirectoryPath);
}
deferred.resolve();
})
});
return deferred.promise;
}
}
// tslint:enable: no-redundant-jsdoc no-any no-unsafe-any
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
......@@ -20,21 +21,29 @@
'use strict';
import * as fs from 'fs';
import { KubernetesCRDClient, GeneralK8sClient } from '../kubernetesApiClient';
import { GeneralK8sClient, KubernetesCRDClient } from '../kubernetesApiClient';
abstract class FrameworkControllerClient extends KubernetesCRDClient{
/**
* FrameworkController Client
*/
abstract class FrameworkControllerClient extends KubernetesCRDClient {
/**
* Factory method to generate operator cliet
* Factory method to generate operator client
*/
// tslint:disable-next-line:function-name
public static generateFrameworkControllerClient(): KubernetesCRDClient {
return new FrameworkControllerClientV1();
}
}
/**
* FrameworkController ClientV1
*/
class FrameworkControllerClientV1 extends FrameworkControllerClient {
/**
* constructor, to initialize frameworkcontroller CRD definition
*/
// tslint:disable: no-unsafe-any no-any
public constructor() {
super();
this.crdSchema = JSON.parse(fs.readFileSync('./config/frameworkcontroller/frameworkcontrollerjob-crd-v1.json', 'utf8'));
......@@ -42,13 +51,13 @@ class FrameworkControllerClientV1 extends FrameworkControllerClient {
}
protected get operator(): any {
return this.client.apis["frameworkcontroller.microsoft.com"].v1.namespaces('default').frameworks;
return this.client.apis['frameworkcontroller.microsoft.com'].v1.namespaces('default').frameworks;
}
// tslint:enable: no-unsafe-any no-any
public get containerName(): string {
return 'framework';
}
}
}
export { FrameworkControllerClient, GeneralK8sClient };
......@@ -20,10 +20,11 @@
'use strict';
import * as assert from 'assert';
import { KubernetesTrialConfig, KubernetesTrialConfigTemplate, KubernetesClusterConfigAzure,
KubernetesClusterConfigNFS, NFSConfig, KubernetesStorageKind, keyVaultConfig, AzureStorage, KubernetesClusterConfig,
StorageConfig } from '../kubernetesConfig'
import { AzureStorage, KeyVaultConfig, KubernetesClusterConfig, KubernetesClusterConfigAzure, KubernetesClusterConfigNFS,
KubernetesStorageKind, KubernetesTrialConfig, KubernetesTrialConfigTemplate, NFSConfig, StorageConfig
} from '../kubernetesConfig';
// tslint:disable:completed-docs
export class FrameworkAttemptCompletionPolicy {
public readonly minFailedTaskCount: number;
public readonly minSucceededTaskCount: number;
......@@ -36,13 +37,13 @@ export class FrameworkAttemptCompletionPolicy {
/**
* Trial job configuration for FrameworkController
*/
export class FrameworkControllerTrialConfigTemplate extends KubernetesTrialConfigTemplate{
export class FrameworkControllerTrialConfigTemplate extends KubernetesTrialConfigTemplate {
public readonly frameworkAttemptCompletionPolicy: FrameworkAttemptCompletionPolicy;
public readonly name: string;
public readonly taskNum: number;
constructor(taskNum: number, command : string, gpuNum : number,
cpuNum: number, memoryMB: number, image: string,
frameworkAttemptCompletionPolicy: FrameworkAttemptCompletionPolicy) {
constructor(taskNum: number, command : string, gpuNum : number,
cpuNum: number, memoryMB: number, image: string,
frameworkAttemptCompletionPolicy: FrameworkAttemptCompletionPolicy) {
super(command, gpuNum, cpuNum, memoryMB, image);
this.frameworkAttemptCompletionPolicy = frameworkAttemptCompletionPolicy;
this.name = name;
......@@ -50,7 +51,7 @@ export class FrameworkControllerTrialConfigTemplate extends KubernetesTrialConfi
}
}
export class FrameworkControllerTrialConfig extends KubernetesTrialConfig{
export class FrameworkControllerTrialConfig extends KubernetesTrialConfig {
public readonly taskRoles: FrameworkControllerTrialConfigTemplate[];
public readonly codeDir: string;
constructor(codeDir: string, taskRoles: FrameworkControllerTrialConfigTemplate[]) {
......@@ -68,11 +69,12 @@ export class FrameworkControllerClusterConfig extends KubernetesClusterConfig {
}
}
// tslint:disable:function-name
export class FrameworkControllerClusterConfigNFS extends KubernetesClusterConfigNFS {
public readonly serviceAccountName: string;
constructor(
serviceAccountName: string,
apiVersion: string,
serviceAccountName: string,
apiVersion: string,
nfs: NFSConfig,
storage?: KubernetesStorageKind
) {
......@@ -81,8 +83,9 @@ export class FrameworkControllerClusterConfigNFS extends KubernetesClusterConfig
}
public static getInstance(jsonObject: object): FrameworkControllerClusterConfigNFS {
let kubeflowClusterConfigObjectNFS = <FrameworkControllerClusterConfigNFS>jsonObject;
assert (kubeflowClusterConfigObjectNFS !== undefined)
const kubeflowClusterConfigObjectNFS: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS>jsonObject;
assert (kubeflowClusterConfigObjectNFS !== undefined);
return new FrameworkControllerClusterConfigNFS(
kubeflowClusterConfigObjectNFS.serviceAccountName,
kubeflowClusterConfigObjectNFS.apiVersion,
......@@ -94,20 +97,21 @@ export class FrameworkControllerClusterConfigNFS extends KubernetesClusterConfig
export class FrameworkControllerClusterConfigAzure extends KubernetesClusterConfigAzure {
public readonly serviceAccountName: string;
constructor(
serviceAccountName: string,
apiVersion: string,
keyVault: keyVaultConfig,
azureStorage: AzureStorage,
serviceAccountName: string,
apiVersion: string,
keyVault: KeyVaultConfig,
azureStorage: AzureStorage,
storage?: KubernetesStorageKind
) {
super(apiVersion, keyVault, azureStorage,storage);
super(apiVersion, keyVault, azureStorage, storage);
this.serviceAccountName = serviceAccountName;
}
public static getInstance(jsonObject: object): FrameworkControllerClusterConfigAzure {
let kubeflowClusterConfigObjectAzure = <FrameworkControllerClusterConfigAzure>jsonObject;
const kubeflowClusterConfigObjectAzure: FrameworkControllerClusterConfigAzure = <FrameworkControllerClusterConfigAzure>jsonObject;
return new FrameworkControllerClusterConfigAzure(
kubeflowClusterConfigObjectAzure.serviceAccountName,
kubeflowClusterConfigObjectAzure.apiVersion,
......@@ -121,11 +125,11 @@ export class FrameworkControllerClusterConfigAzure extends KubernetesClusterConf
export class FrameworkControllerClusterConfigFactory {
public static generateFrameworkControllerClusterConfig(jsonObject: object): FrameworkControllerClusterConfig {
let storageConfig = <StorageConfig>jsonObject;
if(!storageConfig) {
throw new Error("Invalid json object as a StorageConfig instance");
const storageConfig: StorageConfig = <StorageConfig>jsonObject;
if (storageConfig === undefined) {
throw new Error('Invalid json object as a StorageConfig instance');
}
if(storageConfig.storage && storageConfig.storage === 'azureStorage') {
if (storageConfig.storage !== undefined && storageConfig.storage === 'azureStorage') {
return FrameworkControllerClusterConfigAzure.getInstance(jsonObject);
} else if (storageConfig.storage === undefined || storageConfig.storage === 'nfs') {
return FrameworkControllerClusterConfigNFS.getInstance(jsonObject);
......@@ -134,6 +138,7 @@ export class FrameworkControllerClusterConfigFactory {
}
}
export type FrameworkControllerJobStatus = 'AttemptRunning' | 'Completed' | 'AttemptCreationPending' | 'AttemptCreationRequested' | 'AttemptPreparing' | 'AttemptCompleted';
export type FrameworkControllerJobStatus =
'AttemptRunning' | 'Completed' | 'AttemptCreationPending' | 'AttemptCreationRequested' | 'AttemptPreparing' | 'AttemptCompleted';
export type FrameworkControllerJobCompleteStatus = 'Succeeded' | 'Failed';
\ No newline at end of file
export type FrameworkControllerJobCompleteStatus = 'Succeeded' | 'Failed';
......@@ -19,66 +19,74 @@
'use strict';
import { KubernetesTrialJobDetail} from '../kubernetesData';
import { KubernetesCRDClient } from '../kubernetesApiClient';
import { KubernetesTrialJobDetail} from '../kubernetesData';
import { KubernetesJobInfoCollector } from '../kubernetesJobInfoCollector';
import { FrameworkControllerJobStatus, FrameworkControllerJobCompleteStatus } from './frameworkcontrollerConfig';
import { FrameworkControllerJobCompleteStatus, FrameworkControllerJobStatus } from './frameworkcontrollerConfig';
/**
* Collector frameworkcontroller jobs info from Kubernetes cluster, and update frameworkcontroller job status locally
*/
export class FrameworkControllerJobInfoCollector extends KubernetesJobInfoCollector{
export class FrameworkControllerJobInfoCollector extends KubernetesJobInfoCollector {
constructor(jobMap: Map<string, KubernetesTrialJobDetail>) {
super(jobMap);
}
protected async retrieveSingleTrialJobInfo(kubernetesCRDClient: KubernetesCRDClient | undefined,
kubernetesTrialJob : KubernetesTrialJobDetail) : Promise<void> {
protected async retrieveSingleTrialJobInfo(kubernetesCRDClient: KubernetesCRDClient | undefined,
kubernetesTrialJob : KubernetesTrialJobDetail) : Promise<void> {
if (!this.statusesNeedToCheck.includes(kubernetesTrialJob.status)) {
return Promise.resolve();
}
if(kubernetesCRDClient === undefined) {
if (kubernetesCRDClient === undefined) {
return Promise.reject('kubernetesCRDClient is undefined');
}
// tslint:disable-next-line:no-any
let kubernetesJobInfo: any;
try {
kubernetesJobInfo = await kubernetesCRDClient.getKubernetesJob(kubernetesTrialJob.kubernetesJobName);
} catch(error) {
kubernetesJobInfo = await kubernetesCRDClient.getKubernetesJob(kubernetesTrialJob.kubernetesJobName);
} catch (error) {
this.log.error(`Get job ${kubernetesTrialJob.kubernetesJobName} info failed, error is ${error}`);
//This is not treat as a error status
return Promise.resolve();
}
if(kubernetesJobInfo.status && kubernetesJobInfo.status.state) {
// tslint:disable: no-unsafe-any
if (kubernetesJobInfo.status && kubernetesJobInfo.status.state) {
const frameworkJobType: FrameworkControllerJobStatus = <FrameworkControllerJobStatus>kubernetesJobInfo.status.state;
switch(frameworkJobType) {
case 'AttemptCreationPending' || 'AttemptCreationRequested' || 'AttemptPreparing':
switch (frameworkJobType) {
case 'AttemptCreationPending':
case 'AttemptCreationRequested':
case 'AttemptPreparing':
kubernetesTrialJob.status = 'WAITING';
break;
case 'AttemptRunning':
kubernetesTrialJob.status = 'RUNNING';
if(!kubernetesTrialJob.startTime) {
if (kubernetesTrialJob.startTime === undefined) {
kubernetesTrialJob.startTime = Date.parse(<string>kubernetesJobInfo.status.startTime);
}
break;
case 'Completed':
const completedJobType : FrameworkControllerJobCompleteStatus = <FrameworkControllerJobCompleteStatus>kubernetesJobInfo.status.attemptStatus.completionStatus.type.name;
switch(completedJobType) {
const completedJobType : FrameworkControllerJobCompleteStatus =
<FrameworkControllerJobCompleteStatus>kubernetesJobInfo.status.attemptStatus.completionStatus.type.name;
switch (completedJobType) {
case 'Succeeded':
kubernetesTrialJob.status = 'SUCCEEDED';
break;
case 'Failed':
kubernetesTrialJob.status = 'FAILED';
break;
break;
default:
}
kubernetesTrialJob.endTime = Date.parse(<string>kubernetesJobInfo.status.completionTime);
kubernetesTrialJob.endTime = Date.parse(<string>kubernetesJobInfo.status.completionTime);
break;
default:
break;
}
}
return Promise.resolve();
}
}
\ No newline at end of file
// tslint:enable: no-unsafe-any
}
......@@ -20,16 +20,16 @@
'use strict';
import * as component from '../../../common/component';
import { KubernetesJobRestServer } from '../kubernetesJobRestServer';
import { FrameworkControllerTrainingService } from './frameworkcontrollerTrainingService';
import { KubernetesJobRestServer } from '../kubernetesJobRestServer'
/**
* frameworkcontroller Training service Rest server, provides rest API to support frameworkcontroller job metrics update
*
*
*/
@component.Singleton
export class FrameworkControllerJobRestServer extends KubernetesJobRestServer{
export class FrameworkControllerJobRestServer extends KubernetesJobRestServer {
constructor() {
super(component.get(FrameworkControllerTrainingService));
}
}
\ No newline at end of file
}
}
......@@ -17,31 +17,29 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict'
'use strict';
import * as component from '../../../common/component';
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
import * as component from '../../../common/component';
import { getExperimentId } from '../../../common/experimentStartupInfo';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import {
JobApplicationForm, TrialJobApplicationForm,
TrialJobDetail, NNIManagerIpConfig
JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail
} from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
import { NFSConfig } from '../kubernetesConfig'
import { KubernetesTrialJobDetail } from '../kubernetesData';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { validateCodeDir } from '../../common/util';
import { AzureStorageClientUtility } from '../azureStorageClientUtils';
import { NFSConfig } from '../kubernetesConfig';
import { KubernetesTrialJobDetail } from '../kubernetesData';
import { KubernetesTrainingService } from '../kubernetesTrainingService';
import { FrameworkControllerTrialConfig, FrameworkControllerClusterConfig, FrameworkControllerClusterConfigAzure, FrameworkControllerClusterConfigNFS,
FrameworkControllerClusterConfigFactory} from './frameworkcontrollerConfig';
import { FrameworkControllerJobRestServer } from './frameworkcontrollerJobRestServer';
import { FrameworkControllerClient } from './frameworkcontrollerApiClient';
import { FrameworkControllerClusterConfig, FrameworkControllerClusterConfigAzure, FrameworkControllerClusterConfigFactory,
FrameworkControllerClusterConfigNFS, FrameworkControllerTrialConfig} from './frameworkcontrollerConfig';
import { FrameworkControllerJobInfoCollector } from './frameworkcontrollerJobInfoCollector';
import { FrameworkControllerJobRestServer } from './frameworkcontrollerJobRestServer';
/**
* Training Service implementation for frameworkcontroller
......@@ -49,30 +47,30 @@ import { FrameworkControllerJobInfoCollector } from './frameworkcontrollerJobInf
@component.Singleton
class FrameworkControllerTrainingService extends KubernetesTrainingService implements KubernetesTrainingService {
private fcTrialConfig?: FrameworkControllerTrialConfig; // frameworkcontroller trial configuration
private fcJobInfoCollector: FrameworkControllerJobInfoCollector; // frameworkcontroller job info collector
private fcContainerPortMap = new Map<string, number>(); // store frameworkcontroller container port
private readonly fcJobInfoCollector: FrameworkControllerJobInfoCollector; // frameworkcontroller job info collector
private readonly fcContainerPortMap: Map<string, number> = new Map<string, number>(); // store frameworkcontroller container port
private fcClusterConfig?: FrameworkControllerClusterConfig;
constructor() {
super();
this.fcJobInfoCollector = new FrameworkControllerJobInfoCollector(this.trialJobsMap);
this.experimentId = getExperimentId();
this.experimentId = getExperimentId();
this.nextTrialSequenceId = -1;
}
public async run(): Promise<void> {
this.kubernetesJobRestServer = component.get(FrameworkControllerJobRestServer);
if(!this.kubernetesJobRestServer) {
if (this.kubernetesJobRestServer === undefined) {
throw new Error('kubernetesJobRestServer not initialized!');
}
await this.kubernetesJobRestServer.start();
this.kubernetesJobRestServer.setEnableVersionCheck = this.versionCheck;
this.log.info(`frameworkcontroller Training service rest server listening on: ${this.kubernetesJobRestServer.endPoint}`);
while (!this.stopping) {
// collect metrics for frameworkcontroller jobs by interacting with Kubernetes API server
// collect metrics for frameworkcontroller jobs by interacting with Kubernetes API server
await delay(3000);
await this.fcJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient);
if(this.kubernetesJobRestServer.getErrorMessage) {
if (this.kubernetesJobRestServer.getErrorMessage !== undefined) {
throw new Error(this.kubernetesJobRestServer.getErrorMessage);
this.stopping = true;
}
......@@ -80,14 +78,14 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
}
public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
if(!this.fcClusterConfig) {
if (this.fcClusterConfig === undefined) {
throw new Error('frameworkcontrollerClusterConfig is not initialized');
}
if(!this.kubernetesCRDClient) {
if (this.kubernetesCRDClient === undefined) {
throw new Error('kubernetesCRDClient is undefined');
}
if(!this.kubernetesRestServerPort) {
if (this.kubernetesRestServerPort === undefined) {
const restServer: FrameworkControllerJobRestServer = component.get(FrameworkControllerJobRestServer);
this.kubernetesRestServerPort = restServer.clusterRestServerPort;
}
......@@ -97,14 +95,14 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
// Set trial's NFS working folder
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
const frameworkcontrollerJobName = `nniexp${this.experimentId}trial${trialJobId}`.toLowerCase();
const frameworkcontrollerJobName: string = `nniexp${this.experimentId}trial${trialJobId}`.toLowerCase();
//Generate the port used for taskRole
this.generateContainerPort();
await this.prepareRunScript(trialLocalTempFolder, curTrialSequenceId, trialJobId, trialWorkingFolder, form);
//upload code files
let trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
trialJobId,
'WAITING',
......@@ -116,182 +114,202 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
trialJobOutputUrl
);
// Set trial job detail until create frameworkcontroller job successfully
// Set trial job detail until create frameworkcontroller job successfully
this.trialJobsMap.set(trialJobId, trialJobDetail);
// Create frameworkcontroller job based on generated frameworkcontroller job resource config
const frameworkcontrollerJobConfig = await this.prepareFrameworkControllerConfig(trialJobId, trialWorkingFolder, frameworkcontrollerJobName);
// tslint:disable-next-line:no-any
const frameworkcontrollerJobConfig: any = await this.prepareFrameworkControllerConfig(
trialJobId, trialWorkingFolder, frameworkcontrollerJobName);
await this.kubernetesCRDClient.createKubernetesJob(frameworkcontrollerJobConfig);
// Set trial job detail until create frameworkcontroller job successfully
// Set trial job detail until create frameworkcontroller job successfully
this.trialJobsMap.set(trialJobId, trialJobDetail);
return Promise.resolve(trialJobDetail);
}
// tslint:disable:no-redundant-jsdoc no-any no-unsafe-any
public async setClusterMetadata(key: string, value: string): Promise<void> {
switch (key) {
case TrialConfigMetadataKey.NNI_MANAGER_IP:
this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
break;
case TrialConfigMetadataKey.FRAMEWORKCONTROLLER_CLUSTER_CONFIG:
const frameworkcontrollerClusterJsonObject: any = JSON.parse(value);
this.fcClusterConfig = FrameworkControllerClusterConfigFactory
.generateFrameworkControllerClusterConfig(frameworkcontrollerClusterJsonObject);
if (this.fcClusterConfig.storageType === 'azureStorage') {
const azureFrameworkControllerClusterConfig: FrameworkControllerClusterConfigAzure =
<FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
this.azureStorageAccountName = azureFrameworkControllerClusterConfig.azureStorage.accountName;
this.azureStorageShare = azureFrameworkControllerClusterConfig.azureStorage.azureShare;
await this.createAzureStorage(
azureFrameworkControllerClusterConfig.keyVault.vaultName,
azureFrameworkControllerClusterConfig.keyVault.name,
azureFrameworkControllerClusterConfig.azureStorage.accountName,
azureFrameworkControllerClusterConfig.azureStorage.azureShare
);
} else if (this.fcClusterConfig.storageType === 'nfs') {
const nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS =
<FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
await this.createNFSStorage(
nfsFrameworkControllerClusterConfig.nfs.server,
nfsFrameworkControllerClusterConfig.nfs.path
);
}
this.kubernetesCRDClient = FrameworkControllerClient.generateFrameworkControllerClient();
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
const frameworkcontrollerTrialJsonObjsect: any = JSON.parse(value);
this.fcTrialConfig = new FrameworkControllerTrialConfig(
frameworkcontrollerTrialJsonObjsect.codeDir,
frameworkcontrollerTrialJsonObjsect.taskRoles
);
// Validate to make sure codeDir doesn't have too many files
try {
await validateCodeDir(this.fcTrialConfig.codeDir);
} catch (error) {
this.log.error(error);
return Promise.reject(new Error(error));
}
break;
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default:
}
return Promise.resolve();
}
// tslint:enable: no-any no-unsafe-any
/**
* upload code files to nfs or azureStroage
* @param trialJobId
* @param trialLocalTempFolder
* @param trialJobId
* @param trialLocalTempFolder
* return: trialJobOutputUrl
*/
private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
if(!this.fcClusterConfig) {
if (this.fcClusterConfig === undefined) {
throw new Error('Kubeflow Cluster config is not initialized');
}
let trialJobOutputUrl: string = '';
if(this.fcClusterConfig.storageType === 'azureStorage') {
try{
if (this.fcClusterConfig.storageType === 'azureStorage') {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
try {
//upload local files to azure storage
await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare, `${trialLocalTempFolder}`);
await AzureStorageClientUtility.uploadDirectory(
this.azureStorageClient, `nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare, `${trialLocalTempFolder}`);
trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${path.join('nni', getExperimentId(), trialJobId, 'output')}`
}catch(error){
trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/\
${this.azureStorageShare}/${path.join('nni', getExperimentId(), trialJobId, 'output')}`;
} catch (error) {
this.log.error(error);
return Promise.reject(error);
}
} else if(this.fcClusterConfig.storageType === 'nfs') {
let nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
// Creat work dir for current trial in NFS directory
} else if (this.fcClusterConfig.storageType === 'nfs') {
const nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS =
<FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
// Creat work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
// Copy code files from local dir to NFS mounted dir
await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
const nfsConfig: NFSConfig = nfsFrameworkControllerClusterConfig.nfs;
trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`
trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
}
return Promise.resolve(trialJobOutputUrl);
}
/**
* generate trial's command for frameworkcontroller
* expose port and execute injector.sh before executing user's command
* @param command
* @param command
*/
private generateCommandScript(command: string): string {
let portScript = '';
if(!this.fcTrialConfig) {
let portScript: string = '';
if (this.fcTrialConfig === undefined) {
throw new Error('frameworkcontroller trial config is not initialized');
}
for(let taskRole of this.fcTrialConfig.taskRoles) {
for (const taskRole of this.fcTrialConfig.taskRoles) {
portScript += `FB_${taskRole.name.toUpperCase()}_PORT=${this.fcContainerPortMap.get(taskRole.name)} `;
}
return `${portScript} . /mnt/frameworkbarrier/injector.sh && ${command}`;
}
private async prepareRunScript(trialLocalTempFolder: string, curTrialSequenceId: number, trialJobId: string, trialWorkingFolder: string, form: JobApplicationForm): Promise<void> {
if(!this.fcTrialConfig) {
private async prepareRunScript(trialLocalTempFolder: string, curTrialSequenceId: number, trialJobId: string,
trialWorkingFolder: string, form: JobApplicationForm): Promise<void> {
if (this.fcTrialConfig === undefined) {
throw new Error('frameworkcontroller trial config is not initialized');
}
await cpp.exec(`mkdir -p ${path.dirname(trialLocalTempFolder)}`);
await cpp.exec(`cp -r ${this.fcTrialConfig.codeDir} ${trialLocalTempFolder}`);
const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
const installScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), installScriptContent, { encoding: 'utf8' });
// Create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);
for(let taskRole of this.fcTrialConfig.taskRoles) {
const runScriptContent: string = await this.generateRunScript('frameworkcontroller', trialJobId, trialWorkingFolder,
this.generateCommandScript(taskRole.command), curTrialSequenceId.toString(), taskRole.name, taskRole.gpuNum);
for (const taskRole of this.fcTrialConfig.taskRoles) {
const runScriptContent: string =
await this.generateRunScript('frameworkcontroller', trialJobId, trialWorkingFolder,
this.generateCommandScript(taskRole.command), curTrialSequenceId.toString(),
taskRole.name, taskRole.gpuNum);
await fs.promises.writeFile(path.join(trialLocalTempFolder, `run_${taskRole.name}.sh`), runScriptContent, { encoding: 'utf8' });
}
// Write file content ( parameter.cfg ) to local tmp folders
const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>form)
if(trialForm && trialForm.hyperParameters) {
await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
trialForm.hyperParameters.value, { encoding: 'utf8' });
const trialForm : TrialJobApplicationForm = (<TrialJobApplicationForm>form);
if (trialForm !== undefined && trialForm.hyperParameters !== undefined) {
await fs.promises.writeFile(path.join(trialLocalTempFolder, generateParamFileName(trialForm.hyperParameters)),
trialForm.hyperParameters.value, { encoding: 'utf8' });
}
}
private async prepareFrameworkControllerConfig(trialJobId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string): Promise<any> {
if(!this.fcTrialConfig) {
// tslint:disable: no-any no-unsafe-any
private async prepareFrameworkControllerConfig(trialJobId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string):
Promise<any> {
if (this.fcTrialConfig === undefined) {
throw new Error('frameworkcontroller trial config is not initialized');
}
const podResources : any = [];
for(let taskRole of this.fcTrialConfig.taskRoles) {
let resource: any = {};
for (const taskRole of this.fcTrialConfig.taskRoles) {
const resource: any = {};
resource.requests = this.generatePodResource(taskRole.memoryMB, taskRole.cpuNum, taskRole.gpuNum);
resource.limits = Object.assign({}, resource.requests);
resource.limits = {...resource.requests};
podResources.push(resource);
}
// Generate frameworkcontroller job resource config object
const frameworkcontrollerJobConfig: any = this.generateFrameworkControllerJobConfig(trialJobId, trialWorkingFolder, frameworkcontrollerJobName, podResources);
// Generate frameworkcontroller job resource config object
const frameworkcontrollerJobConfig: any =
this.generateFrameworkControllerJobConfig(trialJobId, trialWorkingFolder, frameworkcontrollerJobName, podResources);
return Promise.resolve(frameworkcontrollerJobConfig);
}
public async setClusterMetadata(key: string, value: string): Promise<void> {
switch (key) {
case TrialConfigMetadataKey.NNI_MANAGER_IP:
this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
break;
case TrialConfigMetadataKey.FRAMEWORKCONTROLLER_CLUSTER_CONFIG:
let frameworkcontrollerClusterJsonObject = JSON.parse(value);
this.fcClusterConfig = FrameworkControllerClusterConfigFactory.generateFrameworkControllerClusterConfig(frameworkcontrollerClusterJsonObject);
if(this.fcClusterConfig.storageType === 'azureStorage') {
let azureFrameworkControllerClusterConfig = <FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
this.azureStorageAccountName = azureFrameworkControllerClusterConfig.azureStorage.accountName;
this.azureStorageShare = azureFrameworkControllerClusterConfig.azureStorage.azureShare;
await this.createAzureStorage(
azureFrameworkControllerClusterConfig.keyVault.vaultName,
azureFrameworkControllerClusterConfig.keyVault.name,
azureFrameworkControllerClusterConfig.azureStorage.accountName,
azureFrameworkControllerClusterConfig.azureStorage.azureShare
);
} else if(this.fcClusterConfig.storageType === 'nfs') {
let nfsFrameworkControllerClusterConfig = <FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
await this.createNFSStorage(
nfsFrameworkControllerClusterConfig.nfs.server,
nfsFrameworkControllerClusterConfig.nfs.path
);
}
this.kubernetesCRDClient = FrameworkControllerClient.generateFrameworkControllerClient();
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
let frameworkcontrollerTrialJsonObjsect = JSON.parse(value);
this.fcTrialConfig = new FrameworkControllerTrialConfig(
frameworkcontrollerTrialJsonObjsect.codeDir,
frameworkcontrollerTrialJsonObjsect.taskRoles
);
// Validate to make sure codeDir doesn't have too many files
try {
await validateCodeDir(this.fcTrialConfig.codeDir);
} catch(error) {
this.log.error(error);
return Promise.reject(new Error(error));
}
break;
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default:
break;
}
return Promise.resolve();
}
private generateContainerPort() {
if(!this.fcTrialConfig) {
private generateContainerPort(): void {
if (this.fcTrialConfig === undefined) {
throw new Error('frameworkcontroller trial config is not initialized');
}
let port = 4000; //The default port used in container
for(let index in this.fcTrialConfig.taskRoles) {
let port: number = 4000; //The default port used in container
for (const index of this.fcTrialConfig.taskRoles.keys()) {
this.fcContainerPortMap.set(this.fcTrialConfig.taskRoles[index].name, port);
port += 1;
}
......@@ -304,24 +322,25 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
* @param frameworkcontrollerJobName job name
* @param podResources pod template
*/
private generateFrameworkControllerJobConfig(trialJobId: string, trialWorkingFolder: string, frameworkcontrollerJobName : string, podResources : any) : any {
if(!this.fcClusterConfig) {
private generateFrameworkControllerJobConfig(trialJobId: string, trialWorkingFolder: string,
frameworkcontrollerJobName : string, podResources : any) : any {
if (this.fcClusterConfig === undefined) {
throw new Error('frameworkcontroller Cluster config is not initialized');
}
if(!this.fcTrialConfig) {
if (this.fcTrialConfig === undefined) {
throw new Error('frameworkcontroller trial config is not initialized');
}
let taskRoles = [];
for(let index in this.fcTrialConfig.taskRoles) {
let containerPort = this.fcContainerPortMap.get(this.fcTrialConfig.taskRoles[index].name);
if(!containerPort) {
const taskRoles: any = [];
for (const index of this.fcTrialConfig.taskRoles.keys()) {
const containerPort: number | undefined = this.fcContainerPortMap.get(this.fcTrialConfig.taskRoles[index].name);
if (containerPort === undefined) {
throw new Error('Container port is not initialized');
}
let taskRole = this.generateTaskRoleConfig(
trialWorkingFolder,
this.fcTrialConfig.taskRoles[index].image,
const taskRole: any = this.generateTaskRoleConfig(
trialWorkingFolder,
this.fcTrialConfig.taskRoles[index].image,
`run_${this.fcTrialConfig.taskRoles[index].name}.sh`,
podResources[index],
containerPort
......@@ -330,17 +349,17 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
name: this.fcTrialConfig.taskRoles[index].name,
taskNumber: this.fcTrialConfig.taskRoles[index].taskNum,
frameworkAttemptCompletionPolicy: {
minFailedTaskCount: this.fcTrialConfig.taskRoles[index].frameworkAttemptCompletionPolicy.minFailedTaskCount,
minFailedTaskCount: this.fcTrialConfig.taskRoles[index].frameworkAttemptCompletionPolicy.minFailedTaskCount,
minSucceededTaskCount: this.fcTrialConfig.taskRoles[index].frameworkAttemptCompletionPolicy.minSucceededTaskCount
},
task: taskRole
});
}
return {
apiVersion: `frameworkcontroller.microsoft.com/v1`,
kind: 'Framework',
metadata: {
metadata: {
name: frameworkcontrollerJobName,
namespace: 'default',
labels: {
......@@ -356,19 +375,18 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
};
}
private generateTaskRoleConfig(trialWorkingFolder: string, replicaImage: string, runScriptFile: string, podResources: any, containerPort: number): any {
if(!this.fcClusterConfig) {
private generateTaskRoleConfig(trialWorkingFolder: string, replicaImage: string, runScriptFile: string,
podResources: any, containerPort: number): any {
if (this.fcClusterConfig === undefined) {
throw new Error('frameworkcontroller Cluster config is not initialized');
}
if(!this.fcTrialConfig) {
if (this.fcTrialConfig === undefined) {
throw new Error('frameworkcontroller trial config is not initialized');
}
let volumeSpecMap = new Map<string, object>();
if(this.fcClusterConfig.storageType === 'azureStorage'){
const volumeSpecMap: Map<string, object> = new Map<string, object>();
if (this.fcClusterConfig.storageType === 'azureStorage') {
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
......@@ -380,9 +398,10 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
}, {
name: 'frameworkbarrier-volume',
emptyDir: {}
}])
}else {
let frameworkcontrollerClusterConfigNFS: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS> this.fcClusterConfig;
}]);
} else {
const frameworkcontrollerClusterConfigNFS: FrameworkControllerClusterConfigNFS =
<FrameworkControllerClusterConfigNFS> this.fcClusterConfig;
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
......@@ -393,19 +412,19 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
}, {
name: 'frameworkbarrier-volume',
emptyDir: {}
}])
}]);
}
let containers = [
const containers: any = [
{
name: 'framework',
image: replicaImage,
command: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
command: ['sh', `${path.join(trialWorkingFolder, runScriptFile)}`],
volumeMounts: [
{
name: 'nni-vol',
mountPath: this.CONTAINER_MOUNT_PATH
},{
}, {
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}],
......@@ -413,35 +432,36 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
ports: [{
containerPort: containerPort
}]
}]
}];
let initContainers = [
const initContainers: any = [
{
name: 'frameworkbarrier',
image: 'frameworkcontroller/frameworkbarrier',
volumeMounts: [
{
{
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}]
}]
let spec: any = {
}];
const spec: any = {
containers: containers,
initContainers: initContainers,
restartPolicy: 'OnFailure',
volumes: volumeSpecMap.get('nniVolumes'),
hostNetwork: false
};
if(this.fcClusterConfig.serviceAccountName) {
spec.serviceAccountName = this.fcClusterConfig.serviceAccountName;
if (this.fcClusterConfig.serviceAccountName !== undefined) {
spec.serviceAccountName = this.fcClusterConfig.serviceAccountName;
}
let taskRole = {
return {
pod: {
spec: spec
}
}
return taskRole;
};
}
// tslint:enable: no-any no-unsafe-any
}
export { FrameworkControllerTrainingService }
export { FrameworkControllerTrainingService };
......@@ -20,18 +20,22 @@
'use strict';
import * as fs from 'fs';
import { GeneralK8sClient, KubernetesCRDClient } from '../kubernetesApiClient';
import { KubeflowOperator } from './kubeflowConfig';
import { KubernetesCRDClient, GeneralK8sClient } from '../kubernetesApiClient';
abstract class KubeflowOperatorClient extends KubernetesCRDClient{
/**
* KubeflowOperator Client
*/
abstract class KubeflowOperatorClient extends KubernetesCRDClient {
/**
* Factory method to generate operator cliet
* Factory method to generate operator client
*/
public static generateOperatorClient(kubeflowOperator: KubeflowOperator,
operatorApiVersion: string): KubernetesCRDClient {
switch(kubeflowOperator) {
// tslint:disable-next-line:function-name
public static generateOperatorClient(kubeflowOperator: KubeflowOperator,
operatorApiVersion: string): KubernetesCRDClient {
switch (kubeflowOperator) {
case 'tf-operator': {
switch(operatorApiVersion) {
switch (operatorApiVersion) {
case 'v1alpha2': {
return new TFOperatorClientV1Alpha2();
}
......@@ -41,11 +45,12 @@ abstract class KubeflowOperatorClient extends KubernetesCRDClient{
case 'v1beta2': {
return new TFOperatorClientV1Beta2();
}
default:
throw new Error(`Invalid tf-operator apiVersion ${operatorApiVersion}`);
}
break;
}
case 'pytorch-operator': {
switch(operatorApiVersion) {
switch (operatorApiVersion) {
case 'v1alpha2': {
return new PyTorchOperatorClientV1Alpha2();
}
......@@ -55,13 +60,17 @@ abstract class KubeflowOperatorClient extends KubernetesCRDClient{
case 'v1beta2': {
return new PyTorchOperatorClientV1Beta2();
}
default:
throw new Error(`Invalid pytorch-operator apiVersion ${operatorApiVersion}`);
}
}
}
default:
throw new Error(`Invalid operator ${kubeflowOperator}`);
}
throw new Error(`Invalid operator ${kubeflowOperator} or apiVersion ${operatorApiVersion}`);
}
}
// tslint:disable: no-unsafe-any no-any completed-docs
class TFOperatorClientV1Alpha2 extends KubeflowOperatorClient {
/**
* constructor, to initialize tfjob CRD definition
......@@ -73,12 +82,12 @@ class TFOperatorClientV1Alpha2 extends KubeflowOperatorClient {
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1alpha2.namespaces('default').tfjobs;
return this.client.apis['kubeflow.org'].v1alpha2.namespaces('default').tfjobs;
}
public get containerName(): string {
return 'tensorflow';
}
}
}
class TFOperatorClientV1Beta1 extends KubernetesCRDClient {
......@@ -92,12 +101,12 @@ class TFOperatorClientV1Beta1 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1beta1.namespaces('default').tfjobs;
return this.client.apis['kubeflow.org'].v1beta1.namespaces('default').tfjobs;
}
public get containerName(): string {
return 'tensorflow';
}
}
}
class TFOperatorClientV1Beta2 extends KubernetesCRDClient {
......@@ -111,12 +120,12 @@ class TFOperatorClientV1Beta2 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1beta2.namespaces('default').tfjobs;
return this.client.apis['kubeflow.org'].v1beta2.namespaces('default').tfjobs;
}
public get containerName(): string {
return 'tensorflow';
}
}
}
class PyTorchOperatorClientV1Alpha2 extends KubeflowOperatorClient {
......@@ -130,7 +139,7 @@ class PyTorchOperatorClientV1Alpha2 extends KubeflowOperatorClient {
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1alpha2.namespaces('default').pytorchjobs;
return this.client.apis['kubeflow.org'].v1alpha2.namespaces('default').pytorchjobs;
}
public get containerName(): string {
......@@ -149,7 +158,7 @@ class PyTorchOperatorClientV1Beta1 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1beta1.namespaces('default').pytorchjobs;
return this.client.apis['kubeflow.org'].v1beta1.namespaces('default').pytorchjobs;
}
public get containerName(): string {
......@@ -168,7 +177,7 @@ class PyTorchOperatorClientV1Beta2 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis["kubeflow.org"].v1beta2.namespaces('default').pytorchjobs;
return this.client.apis['kubeflow.org'].v1beta2.namespaces('default').pytorchjobs;
}
public get containerName(): string {
......@@ -176,5 +185,5 @@ class PyTorchOperatorClientV1Beta2 extends KubernetesCRDClient {
}
}
// tslint:enable: no-unsafe-any
export { KubeflowOperatorClient, GeneralK8sClient };
......@@ -20,16 +20,20 @@
'use strict';
import * as assert from 'assert';
import { KubernetesClusterConfigAzure, KubernetesClusterConfigNFS, KubernetesStorageKind, NFSConfig, AzureStorage, keyVaultConfig,
KubernetesTrialConfig, KubernetesTrialConfigTemplate, StorageConfig, KubernetesClusterConfig } from '../kubernetesConfig'
import { MethodNotImplementedError } from '../../../common/errors';
import { AzureStorage, KeyVaultConfig, KubernetesClusterConfig, KubernetesClusterConfigAzure, KubernetesClusterConfigNFS,
KubernetesStorageKind, KubernetesTrialConfig, KubernetesTrialConfigTemplate, NFSConfig, StorageConfig
} from '../kubernetesConfig';
/** operator types that kubeflow supported */
// operator types that kubeflow supported
export type KubeflowOperator = 'tf-operator' | 'pytorch-operator' ;
export type DistTrainRole = 'worker' | 'ps' | 'master';
export type KubeflowJobStatus = 'Created' | 'Running' | 'Failed' | 'Succeeded';
export type OperatorApiVersion = 'v1alpha2' | 'v1beta1' | 'v1beta2';
/**
* Kubeflow Cluster Configuration
*/
export class KubeflowClusterConfig extends KubernetesClusterConfig {
public readonly operator: KubeflowOperator;
constructor(apiVersion: string, operator: KubeflowOperator) {
......@@ -38,11 +42,12 @@ export class KubeflowClusterConfig extends KubernetesClusterConfig {
}
}
// tslint:disable:completed-docs
export class KubeflowClusterConfigNFS extends KubernetesClusterConfigNFS {
public readonly operator: KubeflowOperator;
constructor(
operator: KubeflowOperator,
apiVersion: string,
operator: KubeflowOperator,
apiVersion: string,
nfs: NFSConfig,
storage?: KubernetesStorageKind
) {
......@@ -54,9 +59,11 @@ export class KubeflowClusterConfigNFS extends KubernetesClusterConfigNFS {
return 'nfs';
}
// tslint:disable-next-line:function-name
public static getInstance(jsonObject: object): KubeflowClusterConfigNFS {
let kubeflowClusterConfigObjectNFS = <KubeflowClusterConfigNFS>jsonObject;
assert (kubeflowClusterConfigObjectNFS !== undefined)
const kubeflowClusterConfigObjectNFS: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>jsonObject;
assert (kubeflowClusterConfigObjectNFS !== undefined);
return new KubeflowClusterConfigNFS(
kubeflowClusterConfigObjectNFS.operator,
kubeflowClusterConfigObjectNFS.apiVersion,
......@@ -66,26 +73,28 @@ export class KubeflowClusterConfigNFS extends KubernetesClusterConfigNFS {
}
}
export class KubeflowClusterConfigAzure extends KubernetesClusterConfigAzure{
export class KubeflowClusterConfigAzure extends KubernetesClusterConfigAzure {
public readonly operator: KubeflowOperator;
constructor(
operator: KubeflowOperator,
apiVersion: string,
keyVault: keyVaultConfig,
azureStorage: AzureStorage,
operator: KubeflowOperator,
apiVersion: string,
keyVault: KeyVaultConfig,
azureStorage: AzureStorage,
storage?: KubernetesStorageKind
) {
super(apiVersion, keyVault, azureStorage,storage);
super(apiVersion, keyVault, azureStorage, storage);
this.operator = operator;
}
public get storageType(): KubernetesStorageKind{
public get storageType(): KubernetesStorageKind {
return 'azureStorage';
}
// tslint:disable-next-line:function-name
public static getInstance(jsonObject: object): KubeflowClusterConfigAzure {
let kubeflowClusterConfigObjectAzure = <KubeflowClusterConfigAzure>jsonObject;
const kubeflowClusterConfigObjectAzure: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>jsonObject;
return new KubeflowClusterConfigAzure(
kubeflowClusterConfigObjectAzure.operator,
kubeflowClusterConfigObjectAzure.apiVersion,
......@@ -98,12 +107,13 @@ export class KubeflowClusterConfigAzure extends KubernetesClusterConfigAzure{
export class KubeflowClusterConfigFactory {
// tslint:disable-next-line:function-name
public static generateKubeflowClusterConfig(jsonObject: object): KubeflowClusterConfig {
let storageConfig = <StorageConfig>jsonObject;
if(!storageConfig) {
throw new Error("Invalid json object as a StorageConfig instance");
const storageConfig: StorageConfig = <StorageConfig>jsonObject;
if (storageConfig === undefined) {
throw new Error('Invalid json object as a StorageConfig instance');
}
if(storageConfig.storage && storageConfig.storage === 'azureStorage') {
if (storageConfig.storage !== undefined && storageConfig.storage === 'azureStorage') {
return KubeflowClusterConfigAzure.getInstance(jsonObject);
} else if (storageConfig.storage === undefined || storageConfig.storage === 'nfs') {
return KubeflowClusterConfigNFS.getInstance(jsonObject);
......@@ -122,10 +132,10 @@ export class KubeflowTrialConfig extends KubernetesTrialConfig {
}
}
export class KubeflowTrialConfigTemplate extends KubernetesTrialConfigTemplate{
export class KubeflowTrialConfigTemplate extends KubernetesTrialConfigTemplate {
public readonly replicas: number;
constructor(replicas: number, command : string, gpuNum : number,
cpuNum: number, memoryMB: number, image: string) {
constructor(replicas: number, command : string, gpuNum : number,
cpuNum: number, memoryMB: number, image: string) {
super(command, gpuNum, cpuNum, memoryMB, image);
this.replicas = replicas;
}
......@@ -163,22 +173,25 @@ export class KubeflowTrialConfigPytorch extends KubeflowTrialConfig {
export class KubeflowTrialConfigFactory {
// tslint:disable-next-line:function-name
public static generateKubeflowTrialConfig(jsonObject: object, operator: KubeflowOperator): KubeflowTrialConfig {
if(operator === 'tf-operator'){
let kubeflowTrialConfigObject = <KubeflowTrialConfigTensorflow>jsonObject;
if (operator === 'tf-operator') {
const kubeflowTrialConfigObject: KubeflowTrialConfigTensorflow = <KubeflowTrialConfigTensorflow>jsonObject;
return new KubeflowTrialConfigTensorflow(
kubeflowTrialConfigObject.codeDir,
kubeflowTrialConfigObject.worker,
kubeflowTrialConfigObject.ps
);
}else if(operator === 'pytorch-operator'){
let kubeflowTrialConfigObject = <KubeflowTrialConfigPytorch>jsonObject;
} else if (operator === 'pytorch-operator') {
const kubeflowTrialConfigObject: KubeflowTrialConfigPytorch = <KubeflowTrialConfigPytorch>jsonObject;
return new KubeflowTrialConfigPytorch(
kubeflowTrialConfigObject.codeDir,
kubeflowTrialConfigObject.master,
kubeflowTrialConfigObject.worker
);
}
throw new Error(`Invalid json object ${jsonObject}`);
throw new Error(`Invalid json object ${jsonObject}`);
}
}
......@@ -19,65 +19,68 @@
'use strict';
import { KubernetesTrialJobDetail} from '../kubernetesData';
import { KubernetesCRDClient } from '../kubernetesApiClient';
import { KubernetesTrialJobDetail} from '../kubernetesData';
import { KubernetesJobInfoCollector } from '../kubernetesJobInfoCollector';
import { KubeflowJobStatus } from './kubeflowConfig';
/**
* Collector Kubeflow jobs info from Kubernetes cluster, and update kubeflow job status locally
*/
export class KubeflowJobInfoCollector extends KubernetesJobInfoCollector{
export class KubeflowJobInfoCollector extends KubernetesJobInfoCollector {
constructor(jobMap: Map<string, KubernetesTrialJobDetail>) {
super(jobMap);
}
protected async retrieveSingleTrialJobInfo(kubernetesCRDClient: KubernetesCRDClient | undefined,
kubernetesTrialJob : KubernetesTrialJobDetail) : Promise<void> {
protected async retrieveSingleTrialJobInfo(kubernetesCRDClient: KubernetesCRDClient | undefined,
kubernetesTrialJob : KubernetesTrialJobDetail) : Promise<void> {
if (!this.statusesNeedToCheck.includes(kubernetesTrialJob.status)) {
return Promise.resolve();
}
if(kubernetesCRDClient === undefined) {
if (kubernetesCRDClient === undefined) {
return Promise.reject('kubernetesCRDClient is undefined');
}
// tslint:disable:no-any no-unsafe-any
let kubernetesJobInfo: any;
try {
kubernetesJobInfo = await kubernetesCRDClient.getKubernetesJob(kubernetesTrialJob.kubernetesJobName);
} catch(error) {
// Notice: it maynot be a 'real' error since cancel trial job can also cause getKubernetesJob failed.
kubernetesJobInfo = await kubernetesCRDClient.getKubernetesJob(kubernetesTrialJob.kubernetesJobName);
} catch (error) {
// Notice: it maynot be a 'real' error since cancel trial job can also cause getKubernetesJob failed.
this.log.error(`Get job ${kubernetesTrialJob.kubernetesJobName} info failed, error is ${error}`);
//This is not treat as a error status
return Promise.resolve();
}
if(kubernetesJobInfo.status && kubernetesJobInfo.status.conditions) {
const latestCondition = kubernetesJobInfo.status.conditions[kubernetesJobInfo.status.conditions.length - 1];
if (kubernetesJobInfo.status && kubernetesJobInfo.status.conditions) {
const latestCondition: any = kubernetesJobInfo.status.conditions[kubernetesJobInfo.status.conditions.length - 1];
const tfJobType : KubeflowJobStatus = <KubeflowJobStatus>latestCondition.type;
switch(tfJobType) {
switch (tfJobType) {
case 'Created':
kubernetesTrialJob.status = 'WAITING';
kubernetesTrialJob.startTime = Date.parse(<string>latestCondition.lastUpdateTime);
break;
kubernetesTrialJob.startTime = Date.parse(<string>latestCondition.lastUpdateTime);
break;
case 'Running':
kubernetesTrialJob.status = 'RUNNING';
if(!kubernetesTrialJob.startTime) {
if (kubernetesTrialJob.startTime === undefined) {
kubernetesTrialJob.startTime = Date.parse(<string>latestCondition.lastUpdateTime);
}
break;
case 'Failed':
kubernetesTrialJob.status = 'FAILED';
kubernetesTrialJob.endTime = Date.parse(<string>latestCondition.lastUpdateTime);
kubernetesTrialJob.endTime = Date.parse(<string>latestCondition.lastUpdateTime);
break;
case 'Succeeded':
kubernetesTrialJob.status = 'SUCCEEDED';
kubernetesTrialJob.endTime = Date.parse(<string>latestCondition.lastUpdateTime);
kubernetesTrialJob.endTime = Date.parse(<string>latestCondition.lastUpdateTime);
break;
default:
break;
}
}
// tslint:enable:no-any no-unsafe-any
return Promise.resolve();
}
}
\ No newline at end of file
}
......@@ -20,19 +20,19 @@
'use strict';
import * as component from '../../../common/component';
import { KubernetesJobRestServer } from '../kubernetesJobRestServer';
import { KubeflowTrainingService } from './kubeflowTrainingService';
import { KubernetesJobRestServer } from '../kubernetesJobRestServer'
/**
* Kubeflow Training service Rest server, provides rest API to support kubeflow job metrics update
*
*
*/
@component.Singleton
export class KubeflowJobRestServer extends KubernetesJobRestServer{
export class KubeflowJobRestServer extends KubernetesJobRestServer {
/**
* constructor to provide NNIRestServer's own rest property, e.g. port
*/
constructor() {
super(component.get(KubeflowTrainingService));
}
}
\ No newline at end of file
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment