Unverified Commit 21a2bb0b authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Support Azure k8s (#383)

Support AKS for the Kubeflow training service
Support nnictl set nniManagerIp
parent 56b46003
......@@ -25,7 +25,8 @@
"ts-deferred": "^1.0.4",
"typescript-ioc": "^1.2.4",
"typescript-string-operations": "^1.3.1",
"webhdfs":"^1.2.0"
"webhdfs":"^1.2.0",
"azure-storage": "^2.10.2"
},
"devDependencies": {
"@types/chai": "^4.1.4",
......
......@@ -70,8 +70,16 @@ export namespace ValidationSchemas {
nfs: joi.object({
server: joi.string().min(1).required(),
path: joi.string().min(1).required()
}).required(),
kubernetesServer: joi.string().min(1).required()
}),
kubernetesServer: joi.string().min(1),
keyVault: joi.object({
vaultName: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){1,127}$/),
name: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){1,127}$/)
}),
azureStorage: joi.object({
accountName: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,31}$/),
azureShare: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,63}$/)
})
}),
nni_manager_ip: joi.object({
nniManagerIp: joi.string().min(1)
......
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict';
import * as fs from 'fs'
import * as path from 'path';
import { Deferred } from 'ts-deferred';
import { getLogger } from '../../common/log';
import { mkDirP } from '../../common/utils';
export namespace AzureStorageClientUtility {
    /**
     * Create an azure file share if it does not already exist.
     * @param fileServerClient azure-storage FileService client
     * @param azureShare name of the azure file share to create
     * @returns promise that resolves on success and rejects with the SDK error on failure
     */
    export async function createShare(fileServerClient: any, azureShare: any): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            fileServerClient.createShareIfNotExists(azureShare, (error: any, result: any, response: any) => {
                if (error) {
                    getLogger().error(`Create share failed:, ${error}`);
                    reject(error);
                } else {
                    resolve();
                }
            });
        });
    }
    /**
     * Create a new directory (NOT recursively) in azure file storage.
     * @param fileServerClient azure-storage FileService client
     * @param azureFolder directory path (relative to the share root) to create
     * @param azureShare name of the azure file share
     */
    export async function createDirectory(fileServerClient: any, azureFolder: any, azureShare: any): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            fileServerClient.createDirectoryIfNotExists(azureShare, azureFolder, (error: any, result: any, response: any) => {
                if (error) {
                    getLogger().error(`Create directory failed:, ${error}`);
                    reject(error);
                } else {
                    resolve();
                }
            });
        });
    }
    /**
     * Create a new directory recursively in azure file storage.
     * The SDK call is not recursive, so every path component is created in turn.
     * @param fileServerClient azure-storage FileService client
     * @param azureDirectory '/'-separated directory path to create
     * @param azureShare name of the azure file share
     */
    export async function createDirectoryRecursive(fileServerClient: any, azureDirectory: any, azureShare: any): Promise<void> {
        let pathSoFar: string = '';
        for (const component of azureDirectory.split('/')) {
            pathSoFar += component;
            await createDirectory(fileServerClient, pathSoFar, azureShare);
            pathSoFar += '/';
        }
    }
    /**
     * Upload a single local file to azure file storage.
     * @param fileServerClient azure-storage FileService client
     * @param azureDirectory target directory in the share (must already exist)
     * @param azureFileName target file name inside azureDirectory
     * @param azureShare name of the azure file share
     * @param localFilePath path of the local file to upload
     */
    async function uploadFileToAzure(fileServerClient: any, azureDirectory: any, azureFileName: any, azureShare: any, localFilePath: any): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            // NOTE: callback-style SDK call; the original `await` on it had no effect and was removed.
            fileServerClient.createFileFromLocalFile(azureShare, azureDirectory, azureFileName, localFilePath,
                (error: any, result: any, response: any) => {
                    if (error) {
                        getLogger().error(`Upload file failed:, ${error}`);
                        reject(error);
                    } else {
                        resolve();
                    }
                });
        });
    }
    /**
     * Download a single file from azure file storage to a local path.
     * @param fileServerClient azure-storage FileService client
     * @param azureDirectory source directory in the share
     * @param azureFileName source file name inside azureDirectory
     * @param azureShare name of the azure file share
     * @param localFilePath local path the file content is streamed to
     */
    async function downloadFile(fileServerClient: any, azureDirectory: any, azureFileName: any, azureShare: any, localFilePath: any): Promise<void> {
        return new Promise<void>((resolve, reject) => {
            fileServerClient.getFileToStream(azureShare, azureDirectory, azureFileName, fs.createWriteStream(localFilePath),
                (error: any, result: any, response: any) => {
                    if (error) {
                        getLogger().error(`Download file failed:, ${error}`);
                        reject(error);
                    } else {
                        resolve();
                    }
                });
        });
    }
    /**
     * Upload a local directory (recursively) to azure file storage.
     * @param fileServerClient : the client of file server
     * @param azureDirectory : the directory in azure file storage
     * @param azureShare : the azure share used
     * @param localDirectory : local directory to be uploaded
     */
    export async function uploadDirectory(fileServerClient: any, azureDirectory: any, azureShare: any, localDirectory: any): Promise<void> {
        const fileNameArray: string[] = fs.readdirSync(localDirectory);
        await createDirectoryRecursive(fileServerClient, azureDirectory, azureShare);
        // Any failure below rejects the returned promise automatically (async function).
        for (const fileName of fileNameArray) {
            const fullFilePath: string = path.join(localDirectory, fileName);
            if (fs.lstatSync(fullFilePath).isFile()) {
                await uploadFileToAzure(fileServerClient, azureDirectory, fileName, azureShare, fullFilePath);
            } else {
                // If filePath is a directory, recursively copy it to azure
                await uploadDirectory(fileServerClient, `${azureDirectory}/${fileName}`, azureShare, fullFilePath);
            }
        }
    }
    /**
     * Download a directory (recursively) from azure file storage.
     * @param fileServerClient azure-storage FileService client
     * @param azureDirectory source directory in the share
     * @param azureShare name of the azure file share
     * @param localDirectory local directory the content is written into (created if missing)
     */
    export async function downloadDirectory(fileServerClient: any, azureDirectory: any, azureShare: any, localDirectory: any): Promise<void> {
        mkDirP(localDirectory);
        return new Promise<void>((resolve, reject) => {
            // NOTE(review): the azure-storage SDK expects `null` as the initial continuation token;
            // the string 'null' is kept from the original code — TODO confirm against the SDK docs.
            fileServerClient.listFilesAndDirectoriesSegmented(azureShare, azureDirectory, 'null',
                async (error: any, result: any, response: any) => {
                    // BUG FIX: the original ignored `error`, then crashed reading `result`,
                    // and used `throw` inside this callback, which never rejected the promise.
                    if (error) {
                        getLogger().error(`list files failed, ${error}`);
                        reject(error);
                        return;
                    }
                    if (('entries' in result) === false) {
                        getLogger().error(`list files failed, can't get entries in result`);
                        reject(new Error(`list files failed, can't get entries in result`));
                        return;
                    }
                    if (('files' in result['entries']) === false) {
                        getLogger().error(`list files failed, can't get files in result['entries']`);
                        reject(new Error(`list files failed, can't get files in result['entries']`));
                        return;
                    }
                    // BUG FIX: the original checked result['directories'] here instead of result['entries'].
                    if (('directories' in result['entries']) === false) {
                        getLogger().error(`list files failed, can't get directories in result['entries']`);
                        reject(new Error(`list files failed, can't get directories in result['entries']`));
                        return;
                    }
                    try {
                        const transfers: Promise<void>[] = [];
                        for (const fileEntry of result['entries']['files']) {
                            const fullFilePath: string = path.join(localDirectory, fileEntry.name);
                            transfers.push(downloadFile(fileServerClient, azureDirectory, fileEntry.name, azureShare, fullFilePath));
                        }
                        for (const directoryEntry of result['entries']['directories']) {
                            const fullDirectoryPath: string = path.join(localDirectory, directoryEntry.name);
                            const fullAzureDirectory: string = path.join(azureDirectory, directoryEntry.name);
                            transfers.push(downloadDirectory(fileServerClient, fullAzureDirectory, azureShare, fullDirectoryPath));
                        }
                        // BUG FIX: the original resolved immediately without awaiting the
                        // in-flight downloads, reporting success before transfers finished.
                        await Promise.all(transfers);
                        resolve();
                    } catch (transferError) {
                        reject(transferError);
                    }
                });
        });
    }
}
......@@ -45,8 +45,10 @@ export const kubeflowOperatorMap : Map<KubeflowOperator, KubeflowOperatorPlural>
export class KubeflowClusterConfig {
/** Name of Kubeflow operator, like tf-operator */
public readonly operator: KubeflowOperator;
public readonly nfs: NFSConfig;
public readonly nfs?: NFSConfig;
public readonly kubernetesServer: string;
public readonly keyVault?: keyVaultConfig;
public readonly azureStorage?: AzureStorage;
/**
* Constructor
......@@ -54,10 +56,12 @@ export class KubeflowClusterConfig {
* @param passWord password of Kubeflow Cluster
* @param host Host IP of Kubeflow Cluster
*/
constructor(operator: KubeflowOperator, nfs : NFSConfig, kubernetesServer : string) {
constructor(operator: KubeflowOperator, kubernetesServer : string, nfs?: NFSConfig, keyVault?: keyVaultConfig, azureStorage ?: AzureStorage) {
this.operator = operator;
this.nfs = nfs;
this.kubernetesServer = kubernetesServer;
this.keyVault = keyVault;
this.azureStorage = azureStorage;
}
}
......@@ -76,6 +80,37 @@ export class NFSConfig {
}
}
/**
 * KeyVault configuration to store the key of Azure Storage Service
 * Refer https://docs.microsoft.com/en-us/azure/key-vault/key-vault-manage-with-cli2
 */
export class keyVaultConfig {
    constructor(
        /** The vault-name to specify vault */
        public readonly vaultName: string,
        /** The name to specify private key */
        public readonly name: string) {
    }
}
/**
 * Azure Storage Service
 */
export class AzureStorage {
    constructor(
        /** The azure file share used to store files */
        public readonly azureShare: string,
        /** The account name of the storage service */
        public readonly accountName: string) {
    }
}
/**
* Trial job configuration for Kubeflow
*/
......
......@@ -40,8 +40,11 @@ import { KubeflowClusterConfig, kubeflowOperatorMap, KubeflowTrialConfig, NFSCon
import { KubeflowTrialJobDetail } from './kubeflowData';
import { KubeflowJobRestServer } from './kubeflowJobRestServer';
import { KubeflowJobInfoCollector } from './kubeflowJobInfoCollector';
import { AzureStorageClientUtility } from './azureStorageClientUtils';
import * as azureStorage from 'azure-storage';
var yaml = require('node-yaml');
var azure = require('azure-storage');
type DistTrainRole = 'worker' | 'ps';
......@@ -66,6 +69,10 @@ class KubeflowTrainingService implements TrainingService {
private kubeflowRestServerPort?: number;
private kubeflowJobPlural?: string;
private readonly CONTAINER_MOUNT_PATH: string;
private azureStorageClient?: azureStorage.FileService;
private azureStorageShare?: string;
private azureStorageSecretName?: string;
private azureStorageAccountName?: string;
private nniManagerIpConfig?: NNIManagerIpConfig;
constructor() {
......@@ -76,7 +83,7 @@ class KubeflowTrainingService implements TrainingService {
this.trialLocalNFSTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp');
this.experimentId = getExperimentId();
this.nextTrialSequenceId = -1;
this.CONTAINER_MOUNT_PATH = '/tmp/nfs';
this.CONTAINER_MOUNT_PATH = '/tmp/mount';
}
public async run(): Promise<void> {
......@@ -175,13 +182,31 @@ class KubeflowTrainingService implements TrainingService {
'utf-8'
);
let trialJobDetail: KubeflowTrialJobDetail;
//The url used in trialJobDetail
let trialJobDetailUrl: string;
if(this.kubeflowClusterConfig.nfs) {
// Creat work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
// Copy code files from local dir to NFS mounted dir
await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
const nfsConfig: NFSConfig = this.kubeflowClusterConfig.nfs;
const trialJobDetail: KubeflowTrialJobDetail = new KubeflowTrialJobDetail(
trialJobDetailUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`
} else {
try{
//upload local files to azure storage
await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare, `${trialLocalTempFolder}`);
trialJobDetailUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${path.join('nni', getExperimentId(), trialJobId, 'output')}`
}catch(error){
this.log.error(error);
return Promise.reject(error);
}
}
trialJobDetail = new KubeflowTrialJobDetail(
trialJobId,
'WAITING',
Date.now(),
......@@ -189,13 +214,12 @@ class KubeflowTrainingService implements TrainingService {
form,
kubeflowJobName,
curTrialSequenceId,
`nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`,
trialJobDetailUrl,
this.kubeflowJobPlural
);
// Create kubeflow training jobs
await cpp.exec(`kubectl create -f ${kubeflowJobYamlPath}`);
// Set trial job detail until kubectl create resource successfully
this.trialJobsMap.set(trialJobId, trialJobDetail);
......@@ -257,7 +281,8 @@ class KubeflowTrainingService implements TrainingService {
return Promise.reject(errorMessage);
}
const result: cpp.childProcessPromise.Result = await cpp.exec(`kubectl delete ${this.kubeflowJobPlural} -l app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()},trialId=${trialJobId}`);
const result: cpp.childProcessPromise.Result = await cpp.exec(`kubectl delete
${this.kubeflowJobPlural} -l app=${this.NNI_KUBEFLOW_TRIAL_LABEL},expId=${getExperimentId()},trialId=${trialJobId}`);
if(result.stderr) {
const errorMessage: string = `kubectl delete ${this.kubeflowJobPlural} for trial ${trialJobId} failed: ${result.stderr}`;
this.log.error(errorMessage);
......@@ -278,7 +303,6 @@ class KubeflowTrainingService implements TrainingService {
case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG:
this.kubeflowClusterConfig = <KubeflowClusterConfig>JSON.parse(value);
// If NFS config section is valid in config file, proceed to mount and config NFS
if(this.kubeflowClusterConfig.nfs) {
//Check and mount NFS mount point here
......@@ -293,6 +317,36 @@ class KubeflowTrainingService implements TrainingService {
this.log.error(mountError);
throw new Error(mountError);
}
}else if(this.kubeflowClusterConfig.keyVault && this.kubeflowClusterConfig.azureStorage){
const vaultName = this.kubeflowClusterConfig.keyVault.vaultName;
const valutKeyName = this.kubeflowClusterConfig.keyVault.name;
this.azureStorageAccountName = this.kubeflowClusterConfig.azureStorage.accountName;
this.azureStorageShare = this.kubeflowClusterConfig.azureStorage.azureShare;
try{
const result = await cpp.exec(`az keyvault secret show --name ${valutKeyName} --vault-name ${vaultName}`);
if(result.stderr) {
const errorMessage: string = result.stderr;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
const storageAccountKey =JSON.parse(result.stdout).value;
//create storage client
this.azureStorageClient = azure.createFileService(this.azureStorageAccountName, storageAccountKey);
await AzureStorageClientUtility.createShare(this.azureStorageClient, this.azureStorageShare);
//create sotrage secret
this.azureStorageSecretName = 'nni-secret-' + uniqueString(8).toLowerCase();
await cpp.exec(`kubectl create secret generic ${this.azureStorageSecretName} `
+ `--from-literal=azurestorageaccountname=${this.azureStorageAccountName} `
+ `--from-literal=azurestorageaccountkey=${storageAccountKey}`)
}catch(error){
this.log.error(`command error: ${error}`);
throw new Error(error);
}
}else{
const clusterConfigError: string = 'kubeflow cluster config format error!';
this.log.error(clusterConfigError);
throw new Error(clusterConfigError);
}
this.kubeflowJobPlural = kubeflowOperatorMap.get(this.kubeflowClusterConfig.operator);
......@@ -425,6 +479,32 @@ class KubeflowTrainingService implements TrainingService {
throw new Error('Kubeflow trial config is not initialized');
}
let volumeSpecMap = new Map<string, object>();
if(this.kubeflowClusterConfig.nfs){
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
nfs: {
server: `${this.kubeflowClusterConfig.nfs.server}`,
path: `${this.kubeflowClusterConfig.nfs.path}`
}
}])
}else if(this.kubeflowClusterConfig.azureStorage && this.kubeflowClusterConfig.keyVault){
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
azureFile: {
secretName: `${this.azureStorageSecretName}`,
shareName: `${this.azureStorageShare}`,
readonly: false
}
}])
}else{
const clusterConfigError: string = 'kubeflow cluster config format error!';
this.log.error(clusterConfigError);
throw new Error(clusterConfigError);
}
return {
replicas: replicaNumber,
template: {
......@@ -439,20 +519,15 @@ class KubeflowTrainingService implements TrainingService {
name: 'tensorflow',
image: replicaImage,
args: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
volumeMounts: [{
name: 'nni-nfs-vol',
volumeMounts: [
{
name: 'nni-vol',
mountPath: this.CONTAINER_MOUNT_PATH
}],
resources: podResources
}],
restartPolicy: 'ExitCode',
volumes: [{
name: 'nni-nfs-vol',
nfs: {
server: `${this.kubeflowClusterConfig.nfs.server}`,
path: `${this.kubeflowClusterConfig.nfs.path}`
}
}]
volumes: volumeSpecMap.get('nniVolumes')
}
}
};
......@@ -504,7 +579,9 @@ class KubeflowTrainingService implements TrainingService {
runScriptLines.push('cp -rT $NNI_CODE_DIR $NNI_SYS_DIR');
runScriptLines.push('cd $NNI_SYS_DIR');
runScriptLines.push('sh install_nni.sh # Check and install NNI pkg');
runScriptLines.push(`python3 -m nni_trial_tool.trial_keeper --trial_command '${command}' --nnimanager_ip '${nniManagerIp}' --nnimanager_port '${this.kubeflowRestServerPort}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr`);
runScriptLines.push(`python3 -m nni_trial_tool.trial_keeper --trial_command '${command}' `
+ `--nnimanager_ip '${nniManagerIp}' --nnimanager_port '${this.kubeflowRestServerPort}' `
+ `1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr`);
return runScriptLines.join('\n');
}
......
......@@ -118,14 +118,24 @@ kubeflow_trial_schema = {
}
kubeflow_config_schema = {
'kubeflowConfig':{
'operator': Or('tf-operator', 'mxnet-operator', 'pytorch-operato'),
'kubeflowConfig':Or({
'operator': Or('tf-operator', 'mxnet-operator', 'pytorch-operator'),
'nfs': {
'server': str,
'path': str
},
'kubernetesServer': str
},{
'operator': Or('tf-operator', 'mxnet-operator', 'pytorch-operator'),
'keyVault': {
'vaultName': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
'name': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}')
},
'azureStorage': {
'accountName': Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),
'azureShare': Regex('([0-9]|[a-z]|[A-Z]|-){3,63}')
}
})
}
machine_list_schima = {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment