Unverified Commit ef9e27b9 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Support kubeflow reuse mode (#3919)

parent 322424ab
...@@ -253,3 +253,41 @@ version check ...@@ -253,3 +253,41 @@ version check
NNI support version check feature in since version 0.6, `refer <PaiMode.rst>`__ NNI support version check feature in since version 0.6, `refer <PaiMode.rst>`__
Any problems when using NNI in Kubeflow mode, please create issues on `NNI Github repo <https://github.com/Microsoft/nni>`__. Any problems when using NNI in Kubeflow mode, please create issues on `NNI Github repo <https://github.com/Microsoft/nni>`__.
Kubeflow reuse mode
----------------------
NNI support setting reuse mode for trial jobs. In reuse mode, NNI will submit a long-running trial runner process to occupy the container, and start trial jobs as the subprocess of the trial runner process, it means k8s do not need to schedule new container again, it just reuse old container.
Currently, kubeflow reuse mode only support V2 config.
Here is the example:
.. code-block:: yaml
searchSpaceFile: search_space.json
trialCommand: python3 mnist.py
trialGpuNumber: 0
trialConcurrency: 4
maxTrialNumber: 20
tuner:
name: TPE
classArgs:
optimize_mode: maximize
trainingService:
reuseMode: true
platform: kubeflow
worker:
command: python3 mnist.py
code_directory: .
dockerImage: msranni/nni
cpuNumber: 1
gpuNumber: 0
memorySize: 8192
replicas: 1
operator: tf-operator
storage:
storageType: azureStorage
azureAccount: {your_account}
azureShare: {your_share}
keyVaultName: {your_valut_name}
keyVaultKey: {your_valut_key}
apiVersion: v1
...@@ -8,21 +8,21 @@ from .base import ConfigBase ...@@ -8,21 +8,21 @@ from .base import ConfigBase
from .common import TrainingServiceConfig from .common import TrainingServiceConfig
from . import util from . import util
__all__ = ['KubeflowConfig', 'KubeflowRoleConfig', 'KubeflowNfsConfig', 'KubeflowAzureStorageConfig'] __all__ = ['KubeflowConfig', 'KubeflowRoleConfig', 'KubeflowStorageConfig', 'KubeflowNfsConfig', 'KubeflowAzureStorageConfig']
@dataclass(init=False) @dataclass(init=False)
class _KubeflowStorageConfig(ConfigBase): class KubeflowStorageConfig(ConfigBase):
storage: str storage_type: str
server: Optional[str] = None server: Optional[str] = None
path: Optional[str] = None path: Optional[str] = None
azure_account: Optional[str] = None azure_account: Optional[str] = None
azure_share: Optional[str] = None azure_share: Optional[str] = None
key_vault: Optional[str] = None key_vault_name: Optional[str] = None
key_vault_secret: Optional[str] = None key_vault_key: Optional[str] = None
@dataclass(init=False) @dataclass(init=False)
class KubeflowNfsConfig(_KubeflowStorageConfig): class KubeflowNfsConfig(KubeflowStorageConfig):
storage: str = 'nfs' storage: str = 'nfs'
server: str server: str
path: str path: str
...@@ -32,18 +32,19 @@ class KubeflowAzureStorageConfig(ConfigBase): ...@@ -32,18 +32,19 @@ class KubeflowAzureStorageConfig(ConfigBase):
storage: str = 'azureStorage' storage: str = 'azureStorage'
azure_account: str azure_account: str
azure_share: str azure_share: str
key_vault: str key_vault_name: str
key_vault_secret: str key_vault_key: str
@dataclass(init=False) @dataclass(init=False)
class KubeflowRoleConfig(ConfigBase): class KubeflowRoleConfig(ConfigBase):
replicas: int replicas: int
command: str command: str
gpu_number: int gpu_number: Optional[int] = 0
cpu_number: int cpu_number: int
memory_size: str memory_size: str
docker_image: str = 'msranni/nni:latest' docker_image: str = 'msranni/nni:latest'
code_directory: str
@dataclass(init=False) @dataclass(init=False)
...@@ -51,18 +52,21 @@ class KubeflowConfig(TrainingServiceConfig): ...@@ -51,18 +52,21 @@ class KubeflowConfig(TrainingServiceConfig):
platform: str = 'kubeflow' platform: str = 'kubeflow'
operator: str operator: str
api_version: str api_version: str
storage: _KubeflowStorageConfig storage: KubeflowStorageConfig
worker: KubeflowRoleConfig worker: Optional[KubeflowRoleConfig] = None
parameter_server: Optional[KubeflowRoleConfig] = None ps: Optional[KubeflowRoleConfig] = None
master: Optional[KubeflowRoleConfig] = None
reuse_mode: Optional[bool] = True #set reuse mode as true for v2 config
def __init__(self, **kwargs): def __init__(self, **kwargs):
kwargs = util.case_insensitive(kwargs) kwargs = util.case_insensitive(kwargs)
kwargs['storage'] = util.load_config(_KubeflowStorageConfig, kwargs.get('storage')) kwargs['storage'] = util.load_config(KubeflowStorageConfig, kwargs.get('storage'))
kwargs['worker'] = util.load_config(KubeflowRoleConfig, kwargs.get('worker')) kwargs['worker'] = util.load_config(KubeflowRoleConfig, kwargs.get('worker'))
kwargs['parameterserver'] = util.load_config(KubeflowRoleConfig, kwargs.get('parameterserver')) kwargs['ps'] = util.load_config(KubeflowRoleConfig, kwargs.get('ps'))
kwargs['master'] = util.load_config(KubeflowRoleConfig, kwargs.get('master'))
super().__init__(**kwargs) super().__init__(**kwargs)
_validation_rules = { _validation_rules = {
'platform': lambda value: (value == 'kubeflow', 'cannot be modified'), 'platform': lambda value: (value == 'kubeflow', 'cannot be modified'),
'operator': lambda value: value in ['tf-operator', 'pytorch-operator'] 'operator': lambda value: value in ['tf-operator', 'pytorch-operator']
} }
\ No newline at end of file
...@@ -5,6 +5,9 @@ ...@@ -5,6 +5,9 @@
import * as assert from 'assert'; import * as assert from 'assert';
import { KubeflowOperator, OperatorApiVersion } from '../training_service/kubernetes/kubeflow/kubeflowConfig'
import { KubernetesStorageKind } from '../training_service/kubernetes/kubernetesConfig';
export interface TrainingServiceConfig { export interface TrainingServiceConfig {
platform: string; platform: string;
} }
...@@ -68,35 +71,38 @@ export interface AmlConfig extends TrainingServiceConfig { ...@@ -68,35 +71,38 @@ export interface AmlConfig extends TrainingServiceConfig {
maxTrialNumberPerGpu: number; maxTrialNumberPerGpu: number;
} }
/* Kubeflow */
// FIXME: merge with shared storage config
export interface KubeflowStorageConfig { export interface KubeflowStorageConfig {
storage: string; storageType: string;
maxTrialNumberPerGpu?: number;
server?: string; server?: string;
path?: string; path?: string;
azureAccount?: string; azureAccount?: string;
azureShare?: string; azureShare?: string;
keyVault?: string; keyVaultName?: string;
keyVaultSecret?: string; keyVaultKey?: string;
} }
export interface KubeflowRoleConfig { export interface KubeflowRoleConfig {
replicas: number; replicas: number;
codeDirectory: string;
command: string; command: string;
gpuNumber: number; gpuNumber: number;
cpuNumber: number; cpuNumber: number;
memorySize: string; memorySize: number;
dockerImage: string; dockerImage: string;
privateRegistryAuthPath?: string;
} }
export interface KubeflowConfig extends TrainingServiceConfig { export interface KubeflowConfig extends TrainingServiceConfig {
platform: 'kubeflow'; platform: 'kubeflow';
operator: string; ps?: KubeflowRoleConfig;
apiVersion: string; master?: KubeflowRoleConfig;
worker?: KubeflowRoleConfig;
maxTrialNumberPerGpu: number;
operator: KubeflowOperator;
apiVersion: OperatorApiVersion;
storage: KubeflowStorageConfig; storage: KubeflowStorageConfig;
worker: KubeflowRoleConfig; reuseMode: boolean;
parameterServer?: KubeflowRoleConfig;
} }
/* FrameworkController */ /* FrameworkController */
...@@ -221,4 +227,4 @@ export function flattenConfig<T>(config: ExperimentConfig, platform: string): T ...@@ -221,4 +227,4 @@ export function flattenConfig<T>(config: ExperimentConfig, platform: string): T
Object.assign(flattened, config.trainingService); Object.assign(flattened, config.trainingService);
} }
return <T>flattened; return <T>flattened;
} }
\ No newline at end of file
...@@ -453,9 +453,6 @@ class NNIManager implements Manager { ...@@ -453,9 +453,6 @@ class NNIManager implements Manager {
if (platform === 'local') { if (platform === 'local') {
const module_ = await import('../training_service/local/localTrainingService'); const module_ = await import('../training_service/local/localTrainingService');
return new module_.LocalTrainingService(config); return new module_.LocalTrainingService(config);
} else if (platform === 'kubeflow') {
const module_ = await import('../training_service/kubernetes/kubeflow/kubeflowTrainingService');
return new module_.KubeflowTrainingService();
} else if (platform === 'frameworkcontroller') { } else if (platform === 'frameworkcontroller') {
const module_ = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService'); const module_ = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService');
return new module_.FrameworkControllerTrainingService(); return new module_.FrameworkControllerTrainingService();
......
...@@ -2,6 +2,7 @@ import { AMLEnvironmentService } from './amlEnvironmentService'; ...@@ -2,6 +2,7 @@ import { AMLEnvironmentService } from './amlEnvironmentService';
import { OpenPaiEnvironmentService } from './openPaiEnvironmentService'; import { OpenPaiEnvironmentService } from './openPaiEnvironmentService';
import { LocalEnvironmentService } from './localEnvironmentService'; import { LocalEnvironmentService } from './localEnvironmentService';
import { RemoteEnvironmentService } from './remoteEnvironmentService'; import { RemoteEnvironmentService } from './remoteEnvironmentService';
import { KubeflowEnvironmentService } from './kubernetes/kubeflowEnvironmentService';
import { EnvironmentService } from '../environment'; import { EnvironmentService } from '../environment';
import { ExperimentConfig } from '../../../common/experimentConfig'; import { ExperimentConfig } from '../../../common/experimentConfig';
import { ExperimentStartupInfo } from '../../../common/experimentStartupInfo'; import { ExperimentStartupInfo } from '../../../common/experimentStartupInfo';
...@@ -20,6 +21,8 @@ export async function createEnvironmentService(name: string, config: ExperimentC ...@@ -20,6 +21,8 @@ export async function createEnvironmentService(name: string, config: ExperimentC
return new AMLEnvironmentService(config, info); return new AMLEnvironmentService(config, info);
case 'openpai': case 'openpai':
return new OpenPaiEnvironmentService(config, info); return new OpenPaiEnvironmentService(config, info);
case 'kubeflow':
return new KubeflowEnvironmentService(config, info);
} }
const esConfig = await getCustomEnvironmentServiceConfig(name); const esConfig = await getCustomEnvironmentServiceConfig(name);
...@@ -29,4 +32,4 @@ export async function createEnvironmentService(name: string, config: ExperimentC ...@@ -29,4 +32,4 @@ export async function createEnvironmentService(name: string, config: ExperimentC
const esModule = importModule(esConfig.nodeModulePath); const esModule = importModule(esConfig.nodeModulePath);
const esClass = esModule[esConfig.nodeClassName] as any; const esClass = esModule[esConfig.nodeClassName] as any;
return new esClass(config, info); return new esClass(config, info);
} }
\ No newline at end of file
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as fs from 'fs';
import * as path from 'path';
import * as component from '../../../../common/component';
import { ExperimentConfig, KubeflowConfig, flattenConfig } from '../../../../common/experimentConfig';
import { ExperimentStartupInfo } from '../../../../common/experimentStartupInfo';
import { EnvironmentInformation } from '../../environment';
import { KubernetesEnvironmentService } from './kubernetesEnvironmentService';
import { KubeflowOperatorClientFactory } from '../../../kubernetes/kubeflow/kubeflowApiClient';
import { KubeflowClusterConfigAzure } from '../../../kubernetes/kubeflow/kubeflowConfig';
import { KeyVaultConfig, AzureStorage } from '../../../kubernetes/kubernetesConfig';
interface FlattenKubeflowConfig extends ExperimentConfig, KubeflowConfig { }
@component.Singleton
export class KubeflowEnvironmentService extends KubernetesEnvironmentService {
private config: FlattenKubeflowConfig;
private createStoragePromise?: Promise<void>;
constructor(config: ExperimentConfig, info: ExperimentStartupInfo) {
super(config, info);
this.experimentId = info.experimentId;
this.config = flattenConfig(config, 'kubeflow');
// Create kubernetesCRDClient
this.kubernetesCRDClient = KubeflowOperatorClientFactory.createClient(
this.config.operator, this.config.apiVersion);
// Create storage
if (this.config.storage.storageType === 'azureStorage') {
if (this.config.storage.azureShare === undefined ||
this.config.storage.azureAccount === undefined ||
this.config.storage.keyVaultName === undefined ||
this.config.storage.keyVaultKey === undefined) {
throw new Error("Azure storage configuration error!");
}
const azureStorage: AzureStorage = new AzureStorage(this.config.storage.azureShare, this.config.storage.azureAccount);
const keyValutConfig: KeyVaultConfig = new KeyVaultConfig(this.config.storage.keyVaultName, this.config.storage.keyVaultKey);
const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = new KubeflowClusterConfigAzure(
this.config.operator, this.config.apiVersion, keyValutConfig, azureStorage);
this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare;
this.createStoragePromise = this.createAzureStorage(
azureKubeflowClusterConfig.keyVault.vaultName,
azureKubeflowClusterConfig.keyVault.name
);
} else if (this.config.storage.storageType === 'nfs') {
if (this.config.storage.server === undefined ||
this.config.storage.path === undefined) {
throw new Error("NFS storage configuration error!");
}
this.createStoragePromise = this.createNFSStorage(
this.config.storage.server,
this.config.storage.path
);
}
}
public get environmentMaintenceLoopInterval(): number {
return 5000;
}
public get hasStorageService(): boolean {
return false;
}
public get getName(): string {
return 'kubeflow';
}
public async startEnvironment(environment: EnvironmentInformation): Promise<void> {
if (this.kubernetesCRDClient === undefined) {
throw new Error("kubernetesCRDClient not initialized!");
}
if (this.createStoragePromise) {
await this.createStoragePromise;
}
environment.command = `mv envs outputs/envs && cd outputs && ${environment.command}`;
if (this.config.deprecated && this.config.deprecated.useActiveGpu !== undefined) {
environment.useActiveGpu = this.config.deprecated.useActiveGpu;
}
environment.maxTrialNumberPerGpu = this.config.maxTrialNumberPerGpu;
const kubeflowJobName: string = `nni-exp-${this.experimentId}-env-${environment.id}`.toLowerCase();
await fs.promises.writeFile(path.join(this.environmentLocalTempFolder, "run.sh"), environment.command, { encoding: 'utf8' });
//upload script files to sotrage
const trialJobOutputUrl: string = await this.uploadFolder(this.environmentLocalTempFolder, `nni/${this.experimentId}`);
environment.trackingUrl = trialJobOutputUrl;
// Generate kubeflow job resource config object
const kubeflowJobConfig: any = await this.prepareKubeflowConfig(environment.id, kubeflowJobName);
// Create kubeflow job based on generated kubeflow job resource config
await this.kubernetesCRDClient.createKubernetesJob(kubeflowJobConfig);
}
/**
* upload local folder to nfs or azureStroage
*/
private async uploadFolder(srcDirectory: string, destDirectory: string): Promise<string> {
if (this.config.storage.storageType === 'azureStorage') {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, 2);
} else {
// do not need to upload files to nfs server, temp folder already mounted to nfs
return `nfs://${this.config.storage.server}:${destDirectory}`;
}
}
private async prepareKubeflowConfig(envId: string, kubeflowJobName: string): Promise<any> {
const workerPodResources: any = {};
if (this.config.worker !== undefined) {
workerPodResources.requests = this.generatePodResource(this.config.worker.memorySize, this.config.worker.cpuNumber,
this.config.worker.gpuNumber);
}
workerPodResources.limits = {...workerPodResources.requests};
const nonWorkerResources: any = {};
if (this.config.operator === 'tf-operator') {
if (this.config.ps !== undefined) {
nonWorkerResources.requests = this.generatePodResource(this.config.ps.memorySize, this.config.ps.cpuNumber,
this.config.ps.gpuNumber);
nonWorkerResources.limits = {...nonWorkerResources.requests};
}
} else if (this.config.operator === 'pytorch-operator') {
if (this.config.master !== undefined) {
nonWorkerResources.requests = this.generatePodResource(this.config.master.memorySize, this.config.master.cpuNumber,
this.config.master.gpuNumber);
nonWorkerResources.limits = {...nonWorkerResources.requests};
}
}
// Generate kubeflow job resource config object
const kubeflowJobConfig: any = await this.generateKubeflowJobConfig(envId, kubeflowJobName, workerPodResources, nonWorkerResources);
return Promise.resolve(kubeflowJobConfig);
}
public generatePodResource(memory: number, cpuNum: number, gpuNum: number): any {
const resources: any = {
memory: `${memory}Mi`,
cpu: `${cpuNum}`
};
if (gpuNum !== 0) {
resources['nvidia.com/gpu'] = `${gpuNum}`;
}
return resources;
}
/**
* Generate kubeflow resource config file
* @param kubeflowJobName job name
* @param workerPodResources worker pod template
* @param nonWorkerPodResources non-worker pod template, like ps or master
*/
private async generateKubeflowJobConfig(envId: string, kubeflowJobName: string, workerPodResources: any,
nonWorkerPodResources?: any): Promise<any> {
if (this.kubernetesCRDClient === undefined) {
throw new Error('Kubeflow operator client is not initialized');
}
const replicaSpecsObj: any = {};
const replicaSpecsObjMap: Map<string, object> = new Map<string, object>();
if (this.config.operator === 'tf-operator') {
if (this.config.worker) {
const privateRegistrySecretName = await this.createRegistrySecret(this.config.worker.privateRegistryAuthPath);
replicaSpecsObj.Worker = this.generateReplicaConfig(this.config.worker.replicas,
this.config.worker.dockerImage, 'run.sh', workerPodResources, privateRegistrySecretName);
}
if (this.config.ps !== undefined) {
const privateRegistrySecretName: string | undefined = await this.createRegistrySecret(this.config.ps.privateRegistryAuthPath);
replicaSpecsObj.Ps = this.generateReplicaConfig(this.config.ps.replicas,
this.config.ps.dockerImage, 'run.sh', nonWorkerPodResources, privateRegistrySecretName);
}
replicaSpecsObjMap.set(this.kubernetesCRDClient.jobKind, {tfReplicaSpecs: replicaSpecsObj});
} else if (this.config.operator === 'pytorch-operator') {
if (this.config.worker !== undefined) {
const privateRegistrySecretName: string | undefined = await this.createRegistrySecret(this.config.worker.privateRegistryAuthPath);
replicaSpecsObj.Worker = this.generateReplicaConfig(this.config.worker.replicas,
this.config.worker.dockerImage, 'run.sh', workerPodResources, privateRegistrySecretName);
}
if (this.config.master !== undefined) {
const privateRegistrySecretName: string | undefined = await this.createRegistrySecret(this.config.master.privateRegistryAuthPath);
replicaSpecsObj.Master = this.generateReplicaConfig(this.config.master.replicas,
this.config.master.dockerImage, 'run.sh', nonWorkerPodResources, privateRegistrySecretName);
}
replicaSpecsObjMap.set(this.kubernetesCRDClient.jobKind, {pytorchReplicaSpecs: replicaSpecsObj});
}
return Promise.resolve({
apiVersion: `kubeflow.org/${this.kubernetesCRDClient.apiVersion}`,
kind: this.kubernetesCRDClient.jobKind,
metadata: {
name: kubeflowJobName,
namespace: 'default',
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: this.experimentId,
envId: envId
}
},
spec: replicaSpecsObjMap.get(this.kubernetesCRDClient.jobKind)
});
}
/**
* Generate tf-operator's tfjobs replica config section
* @param replicaNumber replica number
* @param replicaImage image
* @param runScriptFile script file name
* @param podResources pod resource config section
*/
private generateReplicaConfig(replicaNumber: number, replicaImage: string, runScriptFile: string,
podResources: any, privateRegistrySecretName: string | undefined): any {
if (this.kubernetesCRDClient === undefined) {
throw new Error('Kubeflow operator client is not initialized');
}
// The config spec for volume field
const volumeSpecMap: Map<string, object> = new Map<string, object>();
if (this.config.storage.storageType === 'azureStorage') {
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
azureFile: {
secretName: `${this.azureStorageSecretName}`,
shareName: `${this.azureStorageShare}`,
readonly: false
}
}]);
} else {
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
nfs: {
server: `${this.config.storage.server}`,
path: `${this.config.storage.path}`
}
}]);
}
// The config spec for container field
const containersSpecMap: Map<string, object> = new Map<string, object>();
containersSpecMap.set('containers', [
{
// Kubeflow tensorflow operator requires that containers' name must be tensorflow
// TODO: change the name based on operator's type
name: this.kubernetesCRDClient.containerName,
image: replicaImage,
args: ['sh', `${path.join(this.environmentWorkingFolder, runScriptFile)}`],
volumeMounts: [
{
name: 'nni-vol',
mountPath: this.CONTAINER_MOUNT_PATH
}],
resources: podResources
}
]);
const spec: any = {
containers: containersSpecMap.get('containers'),
restartPolicy: 'ExitCode',
volumes: volumeSpecMap.get('nniVolumes')
}
if (privateRegistrySecretName) {
spec.imagePullSecrets = [
{
name: privateRegistrySecretName
}]
}
return {
replicas: replicaNumber,
template: {
metadata: {
creationTimestamp: null
},
spec: spec
}
}
}
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as cpp from 'child-process-promise';
import * as path from 'path';
import * as azureStorage from 'azure-storage';
import {Base64} from 'js-base64';
import {String} from 'typescript-string-operations';
import { ExperimentConfig } from '../../../../common/experimentConfig';
import { ExperimentStartupInfo } from '../../../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../../../common/log';
import { EnvironmentInformation, EnvironmentService } from '../../environment';
import {GeneralK8sClient, KubernetesCRDClient} from '../../../kubernetes/kubernetesApiClient';
import {AzureStorageClientUtility} from '../../../kubernetes/azureStorageClientUtils';
import { KubeflowJobStatus } from '../../../kubernetes/kubeflow/kubeflowConfig';
import {delay, uniqueString} from '../../../../common/utils';
const fs = require('fs');
export class KubernetesEnvironmentService extends EnvironmentService {
protected azureStorageClient?: azureStorage.FileService;
protected azureStorageShare?: string;
protected azureStorageSecretName?: string;
protected azureStorageAccountName?: string;
protected genericK8sClient: GeneralK8sClient;
protected kubernetesCRDClient?: KubernetesCRDClient;
protected experimentRootDir: string;
protected experimentId: string;
// experiment root dir in NFS
protected environmentLocalTempFolder: string;
protected NNI_KUBERNETES_TRIAL_LABEL: string = 'nni-kubernetes-trial';
protected CONTAINER_MOUNT_PATH: string;
protected log: Logger = getLogger('KubernetesEnvironmentService');
protected environmentWorkingFolder: string;
constructor(config: ExperimentConfig, info: ExperimentStartupInfo) {
super();
this.CONTAINER_MOUNT_PATH = '/tmp/mount';
this.genericK8sClient = new GeneralK8sClient();
this.experimentRootDir = info.logDir;
this.environmentLocalTempFolder = path.join(this.experimentRootDir, "environment-temp");
this.experimentId = info.experimentId;
this.environmentWorkingFolder = path.join(this.CONTAINER_MOUNT_PATH, 'nni', this.experimentId);
}
public get environmentMaintenceLoopInterval(): number {
return 5000;
}
public get hasStorageService(): boolean {
return false;
}
public get getName(): string {
return 'kubernetes';
}
protected async createAzureStorage(vaultName: string, valutKeyName: string): Promise<void> {
try {
const result: any = await cpp.exec(`az keyvault secret show --name ${valutKeyName} --vault-name ${vaultName}`);
if (result.stderr) {
const errorMessage: string = result.stderr;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
const storageAccountKey: any = JSON.parse(result.stdout).value;
if (this.azureStorageAccountName === undefined) {
throw new Error('azureStorageAccountName not initialized!');
}
//create storage client
this.azureStorageClient = azureStorage.createFileService(this.azureStorageAccountName, storageAccountKey);
await AzureStorageClientUtility.createShare(this.azureStorageClient, this.azureStorageShare);
//create sotrage secret
this.azureStorageSecretName = String.Format('nni-secret-{0}', uniqueString(8)
.toLowerCase());
if (this.genericK8sClient === undefined) {
throw new Error("genericK8sClient undefined!");
}
const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : "default"
await this.genericK8sClient.createSecret(
{
apiVersion: 'v1',
kind: 'Secret',
metadata: {
name: this.azureStorageSecretName,
namespace: namespace,
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: this.experimentId
}
},
type: 'Opaque',
data: {
azurestorageaccountname: Base64.encode(this.azureStorageAccountName),
azurestorageaccountkey: Base64.encode(storageAccountKey)
}
}
);
} catch (error) {
this.log.error(error);
return Promise.reject(error);
}
return Promise.resolve();
}
/**
* upload local directory to azureStorage
* @param srcDirectory the source directory of local folder
* @param destDirectory the target directory in azure
* @param uploadRetryCount the retry time when upload failed
*/
protected async uploadFolderToAzureStorage(srcDirectory: string, destDirectory: string, uploadRetryCount: number | undefined): Promise<string> {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
let retryCount: number = 1;
if (uploadRetryCount) {
retryCount = uploadRetryCount;
}
let uploadSuccess: boolean = false;
let folderUriInAzure = '';
try {
do {
uploadSuccess = await AzureStorageClientUtility.uploadDirectory(
this.azureStorageClient,
`${destDirectory}`,
this.azureStorageShare,
`${srcDirectory}`);
if (!uploadSuccess) {
//wait for 5 seconds to re-upload files
await delay(5000);
this.log.info('Upload failed, Retry: upload files to azure-storage');
} else {
folderUriInAzure = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${destDirectory}`;
break;
}
} while (retryCount-- >= 0)
} catch (error) {
this.log.error(error);
//return a empty url when got error
return Promise.resolve('');
}
return Promise.resolve(folderUriInAzure);
}
protected async createNFSStorage(nfsServer: string, nfsPath: string): Promise<void> {
await cpp.exec(`mkdir -p ${this.environmentLocalTempFolder}`);
try {
await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.environmentLocalTempFolder}`);
} catch (error) {
const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.environmentLocalTempFolder} failed, error is ${error}`;
this.log.error(mountError);
return Promise.reject(mountError);
}
return Promise.resolve();
}
protected async createPVCStorage(pvcPath: string): Promise<void> {
try {
await cpp.exec(`mkdir -p ${pvcPath}`);
await cpp.exec(`sudo ln -s ${pvcPath} ${this.environmentLocalTempFolder}`);
} catch (error) {
const linkError: string = `Linking ${pvcPath} to ${this.environmentLocalTempFolder} failed, error is ${error}`;
this.log.error(linkError);
return Promise.reject(linkError);
}
return Promise.resolve();
}
protected async createRegistrySecret(filePath: string | undefined): Promise<string | undefined> {
if (filePath === undefined || filePath === '') {
return undefined;
}
const body = fs.readFileSync(filePath).toString('base64');
const registrySecretName = String.Format('nni-secret-{0}', uniqueString(8)
.toLowerCase());
const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : "default"
await this.genericK8sClient.createSecret(
{
apiVersion: 'v1',
kind: 'Secret',
metadata: {
name: registrySecretName,
namespace: namespace,
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: this.experimentId
}
},
type: 'kubernetes.io/dockerconfigjson',
data: {
'.dockerconfigjson': body
}
}
);
return registrySecretName;
}
public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
environments.forEach(async (environment) => {
if (this.kubernetesCRDClient === undefined) {
throw new Error("kubernetesCRDClient undefined")
}
const kubeflowJobName: string = `nni-exp-${this.experimentId}-env-${environment.id}`.toLowerCase();
const kubernetesJobInfo = await this.kubernetesCRDClient.getKubernetesJob(kubeflowJobName);
if (kubernetesJobInfo.status && kubernetesJobInfo.status.conditions) {
const latestCondition: any = kubernetesJobInfo.status.conditions[kubernetesJobInfo.status.conditions.length - 1];
const tfJobType: KubeflowJobStatus = <KubeflowJobStatus>latestCondition.type;
switch (tfJobType) {
case 'Created':
environment.setStatus('WAITING');
break;
case 'Running':
environment.setStatus('RUNNING');
break;
case 'Failed':
environment.setStatus('FAILED');
break;
case 'Succeeded':
environment.setStatus('SUCCEEDED');
break;
default:
}
}
});
}
public async startEnvironment(environment: EnvironmentInformation): Promise<void> {
throw new Error("Not implemented");
}
public async stopEnvironment(environment: EnvironmentInformation): Promise<void> {
if (this.kubernetesCRDClient === undefined) {
throw new Error('kubernetesCRDClient not initialized!');
}
try {
await this.kubernetesCRDClient.deleteKubernetesJob(new Map(
[
['app', this.NNI_KUBERNETES_TRIAL_LABEL],
['expId', this.experimentId],
['envId', environment.id]
]
));
} catch (err) {
const errorMessage: string = `Delete env ${environment.id} failed: ${err}`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
}
}
...@@ -5,11 +5,12 @@ ...@@ -5,11 +5,12 @@
import { getLogger, Logger } from '../../common/log'; import { getLogger, Logger } from '../../common/log';
import { MethodNotImplementedError } from '../../common/errors'; import { MethodNotImplementedError } from '../../common/errors';
import { ExperimentConfig, RemoteConfig, OpenpaiConfig } from '../../common/experimentConfig'; import { ExperimentConfig, RemoteConfig, OpenpaiConfig, KubeflowConfig } from '../../common/experimentConfig';
import { TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric } from '../../common/trainingService'; import { TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric } from '../../common/trainingService';
import { delay } from '../../common/utils'; import { delay } from '../../common/utils';
import { PAITrainingService } from '../pai/paiTrainingService'; import { PAITrainingService } from '../pai/paiTrainingService';
import { RemoteMachineTrainingService } from '../remote_machine/remoteMachineTrainingService'; import { RemoteMachineTrainingService } from '../remote_machine/remoteMachineTrainingService';
import { KubeflowTrainingService } from '../kubernetes/kubeflow/kubeflowTrainingService';
import { TrialDispatcher } from './trialDispatcher'; import { TrialDispatcher } from './trialDispatcher';
...@@ -29,6 +30,8 @@ class RouterTrainingService implements TrainingService { ...@@ -29,6 +30,8 @@ class RouterTrainingService implements TrainingService {
instance.internalTrainingService = new RemoteMachineTrainingService(config); instance.internalTrainingService = new RemoteMachineTrainingService(config);
} else if (platform === 'openpai' && !(<OpenpaiConfig>config.trainingService).reuseMode) { } else if (platform === 'openpai' && !(<OpenpaiConfig>config.trainingService).reuseMode) {
instance.internalTrainingService = new PAITrainingService(config); instance.internalTrainingService = new PAITrainingService(config);
} else if (platform === 'kubeflow' && !(<KubeflowConfig>config.trainingService).reuseMode) {
instance.internalTrainingService = new KubeflowTrainingService();
} else { } else {
instance.internalTrainingService = await TrialDispatcher.construct(config); instance.internalTrainingService = await TrialDispatcher.construct(config);
} }
...@@ -125,4 +128,4 @@ class RouterTrainingService implements TrainingService { ...@@ -125,4 +128,4 @@ class RouterTrainingService implements TrainingService {
} }
} }
export { RouterTrainingService }; export { RouterTrainingService };
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment