Unverified Commit fbffbc7c authored by Markus Bauer's avatar Markus Bauer Committed by GitHub
Browse files

[WIP] Enable optional Pod Spec for FrameworkController platform (#3379)

parent 38c9a734
......@@ -11,7 +11,6 @@
/ts/nni_manager/metrics.json
/ts/nni_manager/trial_jobs.json
# Logs
logs
*.log
......
......@@ -28,6 +28,16 @@ Prerequisite for Azure Kubernetes Service
#. Follow the `guideline <https://docs.microsoft.com/en-us/azure/storage/common/storage-quickstart-create-account?tabs=portal>`__ to create azure file storage account. If you use Azure Kubernetes Service, NNI need Azure Storage Service to store code files and the output files.
#. To access Azure storage service, NNI need the access key of the storage account, and NNI uses `Azure Key Vault <https://azure.microsoft.com/en-us/services/key-vault/>`__ Service to protect your private key. Set up Azure Key Vault Service, add a secret to Key Vault to store the access key of Azure storage account. Follow this `guideline <https://docs.microsoft.com/en-us/azure/key-vault/quick-create-cli>`__ to store the access key.
Prerequisite for PVC storage mode
-----------------------------------------
In order to use persistent volume claims instead of NFS or Azure storage, related storage must
be created manually, in the namespace your trials will run later. This restriction is due to the
fact, that persistent volume claims are hard to recycle and thus can quickly mess with a cluster's
storage management. Persistent volume claims can be created by e.g. using kubectl. Please refer
to the official Kubernetes documentation for `further information <https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistentvolumeclaims>`__.
Setup FrameworkController
-------------------------
......@@ -116,6 +126,37 @@ Trial configuration in frameworkcontroller mode have the following configuration
* image: the docker image used to create pod and run the program.
* frameworkAttemptCompletionPolicy: the policy to run framework, please refer the `user-manual <https://github.com/Microsoft/frameworkcontroller/blob/master/doc/user-manual.md#frameworkattemptcompletionpolicy>`__ to get the specific information. Users could use the policy to control the pod, for example, if ps does not stop, only worker stops, The completion policy could helps stop ps.
NNI also offers the possibility to include a customized frameworkcontroller template similar
to the aforementioned tensorflow example. A valid configuration the may look like:
.. code-block:: yaml
experimentName: example_mnist_pytorch
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 2
logLevel: trace
trainingServicePlatform: frameworkcontroller
searchSpacePath: search_space.json
tuner:
builtinTunerName: TPE
classArgs:
optimize_mode: maximize
assessor:
builtinAssessorName: Medianstop
classArgs:
optimize_mode: maximize
trial:
codeDir: .
frameworkcontrollerConfig:
configPath: fc_template.yml
storage: pvc
namespace: twin-pipelines
pvc:
path: /mnt/data
Note that in this example a persistent volume claim has been used, that must be created manually in the specified namespace beforehand. Stick to the mnist-pytorch example (:githublink: `<examples/trials/mnist-pytorch>`__) for a more detailed config (:githublink: `<examples/trials/mnist-pytorch/config_frameworkcontroller_custom.yml>`__) and frameworkcontroller template (:githublink: `<examples/trials/fc_template.yml>`__).
How to run example
------------------
......
authorName: default
experimentName: example_mnist_pytorch
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
logLevel: trace
#choice: local, remote, pai, kubeflow
trainingServicePlatform: frameworkcontroller
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
#choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
builtinTunerName: TPE
classArgs:
#choice: maximize, minimize
optimize_mode: maximize
assessor:
builtinAssessorName: Medianstop
classArgs:
optimize_mode: maximize
trial:
codeDir: .
frameworkcontrollerConfig:
configPath: fc_template.yml
storage: pvc
namespace: "default"
pvc:
path: "/tmp/mount"
apiVersion: frameworkcontroller.microsoft.com/v1
kind: Framework
metadata:
name: pytorchcpu
namespace: default
spec:
executionType: Start
retryPolicy:
fancyRetryPolicy: true
maxRetryCount: 2
taskRoles:
- name: worker
taskNumber: 1
frameworkAttemptCompletionPolicy:
minFailedTaskCount: 1
minSucceededTaskCount: 3
task:
retryPolicy:
fancyRetryPolicy: false
maxRetryCount: 0
podGracefulDeletionTimeoutSec: 1800
pod:
spec:
restartPolicy: Never
hostNetwork: false
containers:
- name: mnist-pytorch
image: msranni/nni:latest
command: ["python", "mnist.py"]
ports:
- containerPort: 5001
volumeMounts:
- name: frameworkbarrier-volume
mountPath: /mnt/frameworkbarrier
- name: data-volume
mountPath: /tmp/mount
serviceAccountName: frameworkbarrier
initContainers:
- name: frameworkbarrier
image: frameworkcontroller/frameworkbarrier
volumeMounts:
- name: frameworkbarrier-volume
mountPath: /mnt/frameworkbarrier
volumes:
- name: frameworkbarrier-volume
emptyDir: {}
- name: data-volume
persistentVolumeClaim:
claimName: nni-storage
......@@ -4,11 +4,17 @@
import json
import logging
import os
import netifaces
from schema import Schema, And, Optional, Regex, Or, SchemaError
from nni.tools.package_utils import create_validator_instance, get_all_builtin_names, get_registered_algo_meta
from .constants import SCHEMA_TYPE_ERROR, SCHEMA_RANGE_ERROR, SCHEMA_PATH_ERROR
from nni.tools.package_utils import (
create_validator_instance,
get_all_builtin_names,
get_registered_algo_meta,
)
from schema import And, Optional, Or, Regex, Schema, SchemaError
from .common_utils import get_yml_content, print_warning
from .constants import SCHEMA_PATH_ERROR, SCHEMA_RANGE_ERROR, SCHEMA_TYPE_ERROR
def setType(key, valueType):
......@@ -183,9 +189,9 @@ pai_yarn_trial_schema = {
Optional('virtualCluster'): setType('virtualCluster', str),
Optional('nasMode'): setChoice('nasMode', 'classic_mode', 'enas_mode', 'oneshot_mode', 'darts_mode'),
Optional('portList'): [{
"label": setType('label', str),
"beginAt": setType('beginAt', int),
"portNumber": setType('portNumber', int)
'label': setType('label', str),
'beginAt': setType('beginAt', int),
'portNumber': setType('portNumber', int)
}]
}
}
......@@ -376,7 +382,7 @@ kubeflow_config_schema = {
frameworkcontroller_trial_schema = {
'trial': {
'codeDir': setPathCheck('codeDir'),
'taskRoles': [{
Optional('taskRoles'): [{
'name': setType('name', str),
'taskNum': setType('taskNum', int),
'frameworkAttemptCompletionPolicy': {
......@@ -395,14 +401,22 @@ frameworkcontroller_trial_schema = {
frameworkcontroller_config_schema = {
'frameworkcontrollerConfig': Or({
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage', 'pvc'),
Optional('serviceAccountName'): setType('serviceAccountName', str),
'nfs': {
'server': setType('server', str),
'path': setType('path', str)
}
},
Optional('namespace'): setType('namespace', str),
Optional('configPath'): setType('configPath', str),
}, {
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage', 'pvc'),
Optional('serviceAccountName'): setType('serviceAccountName', str),
'configPath': setType('configPath', str),
'pvc': {'path': setType('server', str)},
Optional('namespace'): setType('namespace', str),
}, {
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage', 'pvc'),
Optional('serviceAccountName'): setType('serviceAccountName', str),
'keyVault': {
'vaultName': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
......@@ -416,7 +430,9 @@ frameworkcontroller_config_schema = {
'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),
error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
},
Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999)
Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999),
Optional('namespace'): setType('namespace', str),
Optional('configPath'): setType('configPath', str),
})
}
......@@ -479,6 +495,7 @@ class NNIConfigSchema:
self.validate_kubeflow_operators(experiment_config)
self.validate_eth0_device(experiment_config)
self.validate_hybrid_platforms(experiment_config)
self.validate_frameworkcontroller_trial_config(experiment_config)
def validate_tuner_adivosr_assessor(self, experiment_config):
if experiment_config.get('advisor'):
......@@ -588,7 +605,7 @@ class NNIConfigSchema:
and not experiment_config.get('nniManagerIp') \
and 'eth0' not in netifaces.interfaces():
raise SchemaError('This machine does not contain eth0 network device, please set nniManagerIp in config file!')
def validate_hybrid_platforms(self, experiment_config):
required_config_name_map = {
'remote': 'machineList',
......@@ -600,4 +617,25 @@ class NNIConfigSchema:
config_name = required_config_name_map.get(platform)
if config_name and not experiment_config.get(config_name):
raise SchemaError('Need to set {0} for {1} in hybrid mode!'.format(config_name, platform))
\ No newline at end of file
def validate_frameworkcontroller_trial_config(self, experiment_config):
if experiment_config.get('trainingServicePlatform') == 'frameworkcontroller':
if not experiment_config.get('trial').get('taskRoles'):
if not experiment_config.get('frameworkcontrollerConfig').get('configPath'):
raise SchemaError("""If no taskRoles are specified a valid custom frameworkcontroller config should
be set using the configPath attribute in frameworkcontrollerConfig!""")
config_content = get_yml_content(experiment_config.get('frameworkcontrollerConfig').get('configPath'))
if not config_content.get('spec').get('taskRoles') or not len(config_content.get('spec').get('taskRoles')):
raise SchemaError('Invalid frameworkcontroller config! No taskRoles were specified!')
if not config_content.get('spec').get('taskRoles')[0].get('task'):
raise SchemaError('Invalid frameworkcontroller config! No task was specified for taskRole!')
names = []
for taskRole in config_content.get('spec').get('taskRoles'):
if not "name" in taskRole:
raise SchemaError('Invalid frameworkcontroller config! Name is missing for taskRole!')
names.append(taskRole.get("name"))
if len(names) > len(set(names)):
raise SchemaError('Invalid frameworkcontroller config! Duplicate taskrole names!')
if not config_content.get('metadata').get('name'):
raise SchemaError('Invalid frameworkcontroller config! No experiment name was specified!')
......@@ -100,6 +100,11 @@ def parse_path(experiment_config, config_path):
if experiment_config['trial'].get('paiConfigPath'):
parse_relative_path(root_path, experiment_config['trial'], 'paiConfigPath')
# For frameworkcontroller a custom configuration path may be specified
if experiment_config.get('frameworkcontrollerConfig'):
if experiment_config['frameworkcontrollerConfig'].get('configPath'):
parse_relative_path(root_path, experiment_config['frameworkcontrollerConfig'], 'configPath')
def set_default_values(experiment_config):
if experiment_config.get('maxExecDuration') is None:
experiment_config['maxExecDuration'] = '999d'
......
......@@ -63,14 +63,14 @@ export namespace ValidationSchemas {
command: joi.string().min(1).required()
}),
ps: joi.object({
replicas: joi.number().min(1).required(),
image: joi.string().min(1),
privateRegistryAuthPath: joi.string().min(1),
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
replicas: joi.number().min(1).required(),
image: joi.string().min(1),
privateRegistryAuthPath: joi.string().min(1),
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
gpuNum: joi.number().min(0).required(),
command: joi.string().min(1).required()
}),
master: joi.object({
replicas: joi.number().min(1).required(),
......@@ -152,6 +152,10 @@ export namespace ValidationSchemas {
frameworkcontroller_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
storage: joi.string().min(1),
serviceAccountName: joi.string().min(1),
pvc: joi.object({
path: joi.string().min(1).required()
}),
configPath: joi.string().min(1),
nfs: joi.object({
server: joi.string().min(1).required(),
path: joi.string().min(1).required()
......@@ -164,14 +168,15 @@ export namespace ValidationSchemas {
accountName: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,31}$/),
azureShare: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,63}$/)
}),
uploadRetryCount: joi.number().min(1)
uploadRetryCount: joi.number().min(1),
namespace: joi.string().min(1)
}),
dlts_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
dashboard: joi.string().min(1),
cluster: joi.string().min(1),
team: joi.string().min(1),
email: joi.string().min(1),
password: joi.string().min(1)
}),
......
......@@ -4,7 +4,7 @@
'use strict';
import * as fs from 'fs';
import { GeneralK8sClient, KubernetesCRDClient } from '../kubernetesApiClient';
import {GeneralK8sClient, KubernetesCRDClient} from '../kubernetesApiClient';
/**
* FrameworkController ClientV1
......@@ -13,14 +13,16 @@ class FrameworkControllerClientV1 extends KubernetesCRDClient {
/**
* constructor, to initialize frameworkcontroller CRD definition
*/
public constructor() {
public namespace: string;
public constructor(namespace?: string) {
super();
this.namespace = namespace ? namespace : "default"
this.crdSchema = JSON.parse(fs.readFileSync('./config/frameworkcontroller/frameworkcontrollerjob-crd-v1.json', 'utf8'));
this.client.addCustomResourceDefinition(this.crdSchema);
}
protected get operator(): any {
return this.client.apis['frameworkcontroller.microsoft.com'].v1.namespaces('default').frameworks;
return this.client.apis['frameworkcontroller.microsoft.com'].v1.namespaces(this.namespace).frameworks;
}
public get containerName(): string {
......@@ -35,9 +37,9 @@ class FrameworkControllerClientFactory {
/**
* Factory method to generate operator client
*/
public static createClient(): KubernetesCRDClient {
return new FrameworkControllerClientV1();
public static createClient(namespace?: string): KubernetesCRDClient {
return new FrameworkControllerClientV1(namespace);
}
}
export { FrameworkControllerClientFactory, GeneralK8sClient };
export {FrameworkControllerClientFactory, GeneralK8sClient};
......@@ -5,8 +5,10 @@
import * as assert from 'assert';
import { AzureStorage, KeyVaultConfig, KubernetesClusterConfig, KubernetesClusterConfigAzure, KubernetesClusterConfigNFS,
KubernetesStorageKind, KubernetesTrialConfig, KubernetesTrialConfigTemplate, NFSConfig, StorageConfig
import {
AzureStorage, KeyVaultConfig, KubernetesClusterConfig, KubernetesClusterConfigAzure, KubernetesClusterConfigNFS,
KubernetesStorageKind, KubernetesTrialConfig, KubernetesTrialConfigTemplate, NFSConfig, StorageConfig, KubernetesClusterConfigPVC,
PVCConfig,
} from '../kubernetesConfig';
export class FrameworkAttemptCompletionPolicy {
......@@ -26,8 +28,8 @@ export class FrameworkControllerTrialConfigTemplate extends KubernetesTrialConfi
public readonly name: string;
public readonly taskNum: number;
constructor(taskNum: number, command: string, gpuNum: number,
cpuNum: number, memoryMB: number, image: string,
frameworkAttemptCompletionPolicy: FrameworkAttemptCompletionPolicy, privateRegistryFilePath?: string | undefined) {
cpuNum: number, memoryMB: number, image: string,
frameworkAttemptCompletionPolicy: FrameworkAttemptCompletionPolicy, privateRegistryFilePath?: string | undefined) {
super(command, gpuNum, cpuNum, memoryMB, image, privateRegistryFilePath);
this.frameworkAttemptCompletionPolicy = frameworkAttemptCompletionPolicy;
this.name = name;
......@@ -47,60 +49,97 @@ export class FrameworkControllerTrialConfig extends KubernetesTrialConfig {
export class FrameworkControllerClusterConfig extends KubernetesClusterConfig {
public readonly serviceAccountName: string;
constructor(apiVersion: string, serviceAccountName: string) {
super(apiVersion);
constructor(apiVersion: string, serviceAccountName: string, configPath?: string, namespace?: string) {
super(apiVersion, undefined, namespace);
this.serviceAccountName = serviceAccountName;
}
}
export class FrameworkControllerClusterConfigPVC extends KubernetesClusterConfigPVC {
public readonly serviceAccountName: string;
public readonly configPath: string;
constructor(serviceAccountName: string, apiVersion: string, pvc: PVCConfig, configPath: string,
storage?: KubernetesStorageKind, namespace?: string) {
super(apiVersion, pvc, storage, namespace);
this.serviceAccountName = serviceAccountName;
this.configPath = configPath
}
public static getInstance(jsonObject: object): FrameworkControllerClusterConfigPVC {
const kubernetesClusterConfigObjectPVC: FrameworkControllerClusterConfigPVC = <FrameworkControllerClusterConfigPVC>jsonObject;
assert(kubernetesClusterConfigObjectPVC !== undefined);
return new FrameworkControllerClusterConfigPVC(
kubernetesClusterConfigObjectPVC.serviceAccountName,
kubernetesClusterConfigObjectPVC.apiVersion,
kubernetesClusterConfigObjectPVC.pvc,
kubernetesClusterConfigObjectPVC.configPath,
kubernetesClusterConfigObjectPVC.storage,
kubernetesClusterConfigObjectPVC.namespace
);
}
}
export class FrameworkControllerClusterConfigNFS extends KubernetesClusterConfigNFS {
public readonly serviceAccountName: string;
public readonly configPath?: string;
constructor(
serviceAccountName: string,
apiVersion: string,
nfs: NFSConfig,
storage?: KubernetesStorageKind
) {
super(apiVersion, nfs, storage);
serviceAccountName: string,
apiVersion: string,
nfs: NFSConfig,
storage?: KubernetesStorageKind,
namespace?: string,
configPath?: string
) {
super(apiVersion, nfs, storage, namespace);
this.serviceAccountName = serviceAccountName;
this.configPath = configPath
}
public static getInstance(jsonObject: object): FrameworkControllerClusterConfigNFS {
const kubeflowClusterConfigObjectNFS: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS>jsonObject;
assert (kubeflowClusterConfigObjectNFS !== undefined);
const kubernetesClusterConfigObjectNFS: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS>jsonObject;
assert(kubernetesClusterConfigObjectNFS !== undefined);
return new FrameworkControllerClusterConfigNFS(
kubeflowClusterConfigObjectNFS.serviceAccountName,
kubeflowClusterConfigObjectNFS.apiVersion,
kubeflowClusterConfigObjectNFS.nfs,
kubeflowClusterConfigObjectNFS.storage
kubernetesClusterConfigObjectNFS.serviceAccountName,
kubernetesClusterConfigObjectNFS.apiVersion,
kubernetesClusterConfigObjectNFS.nfs,
kubernetesClusterConfigObjectNFS.storage,
kubernetesClusterConfigObjectNFS.namespace
);
}
}
export class FrameworkControllerClusterConfigAzure extends KubernetesClusterConfigAzure {
public readonly serviceAccountName: string;
public readonly configPath?: string;
constructor(
serviceAccountName: string,
apiVersion: string,
keyVault: KeyVaultConfig,
azureStorage: AzureStorage,
storage?: KubernetesStorageKind
) {
super(apiVersion, keyVault, azureStorage, storage);
serviceAccountName: string,
apiVersion: string,
keyVault: KeyVaultConfig,
azureStorage: AzureStorage,
storage?: KubernetesStorageKind,
uploadRetryCount?: number,
namespace?: string,
configPath?: string
) {
super(apiVersion, keyVault, azureStorage, storage, uploadRetryCount, namespace);
this.serviceAccountName = serviceAccountName;
this.configPath = configPath
}
public static getInstance(jsonObject: object): FrameworkControllerClusterConfigAzure {
const kubeflowClusterConfigObjectAzure: FrameworkControllerClusterConfigAzure = <FrameworkControllerClusterConfigAzure>jsonObject;
const kubernetesClusterConfigObjectAzure: FrameworkControllerClusterConfigAzure = <FrameworkControllerClusterConfigAzure>jsonObject;
return new FrameworkControllerClusterConfigAzure(
kubeflowClusterConfigObjectAzure.serviceAccountName,
kubeflowClusterConfigObjectAzure.apiVersion,
kubeflowClusterConfigObjectAzure.keyVault,
kubeflowClusterConfigObjectAzure.azureStorage,
kubeflowClusterConfigObjectAzure.storage
kubernetesClusterConfigObjectAzure.serviceAccountName,
kubernetesClusterConfigObjectAzure.apiVersion,
kubernetesClusterConfigObjectAzure.keyVault,
kubernetesClusterConfigObjectAzure.azureStorage,
kubernetesClusterConfigObjectAzure.storage,
kubernetesClusterConfigObjectAzure.uploadRetryCount,
kubernetesClusterConfigObjectAzure.namespace
);
}
}
......@@ -108,20 +147,22 @@ export class FrameworkControllerClusterConfigAzure extends KubernetesClusterConf
export class FrameworkControllerClusterConfigFactory {
public static generateFrameworkControllerClusterConfig(jsonObject: object): FrameworkControllerClusterConfig {
const storageConfig: StorageConfig = <StorageConfig>jsonObject;
if (storageConfig === undefined) {
const storageConfig: StorageConfig = <StorageConfig>jsonObject;
if (storageConfig === undefined) {
throw new Error('Invalid json object as a StorageConfig instance');
}
if (storageConfig.storage !== undefined && storageConfig.storage === 'azureStorage') {
if (storageConfig.storage !== undefined && storageConfig.storage === 'azureStorage') {
return FrameworkControllerClusterConfigAzure.getInstance(jsonObject);
} else if (storageConfig.storage === undefined || storageConfig.storage === 'nfs') {
} else if (storageConfig.storage === undefined || storageConfig.storage === 'nfs') {
return FrameworkControllerClusterConfigNFS.getInstance(jsonObject);
}
throw new Error(`Invalid json object ${jsonObject}`);
} else if (storageConfig.storage !== undefined && storageConfig.storage === 'pvc') {
return FrameworkControllerClusterConfigPVC.getInstance(jsonObject);
}
throw new Error(`Invalid json object ${jsonObject}`);
}
}
export type FrameworkControllerJobStatus =
'AttemptRunning' | 'Completed' | 'AttemptCreationPending' | 'AttemptCreationRequested' | 'AttemptPreparing' | 'AttemptCompleted';
'AttemptRunning' | 'Completed' | 'AttemptCreationPending' | 'AttemptCreationRequested' | 'AttemptPreparing' | 'AttemptCompleted';
export type FrameworkControllerJobCompleteStatus = 'Succeeded' | 'Failed';
......@@ -202,8 +202,8 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, azureKubeflowClusterConfig.uploadRetryCount);
} else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/${destDirectory}`);
await cpp.exec(`cp -r ${srcDirectory}/* ${this.trialLocalNFSTempFolder}/${destDirectory}/.`);
await cpp.exec(`mkdir -p ${this.trialLocalTempFolder}/${destDirectory}`);
await cpp.exec(`cp -r ${srcDirectory}/* ${this.trialLocalTempFolder}/${destDirectory}/.`);
const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;
return `nfs://${nfsConfig.server}:${destDirectory}`;
......@@ -426,7 +426,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
}]);
}
// The config spec for container field
const containersSpecMap: Map<string, object> = new Map<string, object>();
const containersSpecMap: Map<string, object> = new Map<string, object>();
containersSpecMap.set('containers', [
{
// Kubeflow tensorflow operator requires that containers' name must be tensorflow
......
......@@ -4,8 +4,8 @@
'use strict';
// eslint-disable-next-line @typescript-eslint/camelcase
import { Client1_10, config } from 'kubernetes-client';
import { getLogger, Logger } from '../../common/log';
import {Client1_10, config} from 'kubernetes-client';
import {getLogger, Logger} from '../../common/log';
/**
* Generic Kubernetes client, target version >= 1.9
......@@ -16,13 +16,16 @@ class GeneralK8sClient {
protected namespace: string = 'default';
constructor() {
this.client = new Client1_10({ config: config.fromKubeconfig(), version: '1.9'});
this.client = new Client1_10({config: config.fromKubeconfig(), version: '1.9'});
this.client.loadSpec();
}
public set setNamespace(namespace: string) {
this.namespace = namespace;
}
public get getNamespace(): string {
return this.namespace;
}
private matchStorageClass(response: any): string {
const adlSupportedProvisioners: RegExp[] = [
......@@ -32,7 +35,7 @@ class GeneralK8sClient {
new RegExp("\\b" + "efs" + "\\b")
]
const templateLen = adlSupportedProvisioners.length,
responseLen = response.items.length
responseLen = response.items.length
let i = 0,
j = 0;
for (; i < responseLen; i++) {
......@@ -66,7 +69,7 @@ class GeneralK8sClient {
public async createDeployment(deploymentManifest: any): Promise<string> {
let result: Promise<string>;
const response: any = await this.client.apis.apps.v1.namespaces(this.namespace)
.deployments.post({ body: deploymentManifest })
.deployments.post({body: deploymentManifest})
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(response.body.metadata.uid);
} else {
......@@ -79,7 +82,7 @@ class GeneralK8sClient {
let result: Promise<boolean>;
// TODO: change this hard coded deployment name after demo
const response: any = await this.client.apis.apps.v1.namespaces(this.namespace)
.deployment(deploymentName).delete();
.deployment(deploymentName).delete();
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(true);
} else {
......@@ -91,7 +94,7 @@ class GeneralK8sClient {
public async createConfigMap(configMapManifest: any): Promise<boolean> {
let result: Promise<boolean>;
const response: any = await this.client.api.v1.namespaces(this.namespace)
.configmaps.post({body: configMapManifest});
.configmaps.post({body: configMapManifest});
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(true);
} else {
......@@ -104,7 +107,7 @@ class GeneralK8sClient {
public async createPersistentVolumeClaim(pvcManifest: any): Promise<boolean> {
let result: Promise<boolean>;
const response: any = await this.client.api.v1.namespaces(this.namespace)
.persistentvolumeclaims.post({body: pvcManifest});
.persistentvolumeclaims.post({body: pvcManifest});
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(true);
} else {
......@@ -116,7 +119,7 @@ class GeneralK8sClient {
public async createSecret(secretManifest: any): Promise<boolean> {
let result: Promise<boolean>;
const response: any = await this.client.api.v1.namespaces(this.namespace)
.secrets.post({body: secretManifest});
.secrets.post({body: secretManifest});
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(true);
} else {
......@@ -136,7 +139,7 @@ abstract class KubernetesCRDClient {
protected crdSchema: any;
constructor() {
this.client = new Client1_10({ config: config.fromKubeconfig() });
this.client = new Client1_10({config: config.fromKubeconfig()});
this.client.loadSpec();
}
......@@ -181,7 +184,7 @@ abstract class KubernetesCRDClient {
public async getKubernetesJob(kubeflowJobName: string): Promise<any> {
let result: Promise<any>;
const response: any = await this.operator(kubeflowJobName)
.get();
.get();
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(response.body);
} else {
......@@ -195,16 +198,16 @@ abstract class KubernetesCRDClient {
let result: Promise<boolean>;
// construct match query from labels for deleting tfjob
const matchQuery: string = Array.from(labels.keys())
.map((labelKey: string) => `${labelKey}=${labels.get(labelKey)}`)
.join(',');
.map((labelKey: string) => `${labelKey}=${labels.get(labelKey)}`)
.join(',');
try {
const deleteResult: any = await this.operator()
.delete({
qs: {
labelSelector: matchQuery,
propagationPolicy: 'Background'
}
});
.delete({
qs: {
labelSelector: matchQuery,
propagationPolicy: 'Background'
}
});
if (deleteResult.statusCode && deleteResult.statusCode >= 200 && deleteResult.statusCode <= 299) {
result = Promise.resolve(true);
} else {
......@@ -219,4 +222,4 @@ abstract class KubernetesCRDClient {
}
}
export { KubernetesCRDClient, GeneralK8sClient };
export {KubernetesCRDClient, GeneralK8sClient};
......@@ -3,16 +3,18 @@
'use strict';
export type KubernetesStorageKind = 'nfs' | 'azureStorage';
import { MethodNotImplementedError } from '../../common/errors';
export type KubernetesStorageKind = 'nfs' | 'azureStorage' | 'pvc';
import {MethodNotImplementedError} from '../../common/errors';
export abstract class KubernetesClusterConfig {
public readonly storage?: KubernetesStorageKind;
public readonly apiVersion: string;
public readonly namespace?: string;
constructor(apiVersion: string, storage?: KubernetesStorageKind) {
constructor(apiVersion: string, storage?: KubernetesStorageKind, namespace?: string) {
this.storage = storage;
this.apiVersion = apiVersion;
this.namespace = namespace
}
public get storageType(): KubernetesStorageKind {
......@@ -32,11 +34,12 @@ export class KubernetesClusterConfigNFS extends KubernetesClusterConfig {
public readonly nfs: NFSConfig;
constructor(
apiVersion: string,
nfs: NFSConfig,
storage?: KubernetesStorageKind
) {
super(apiVersion, storage);
apiVersion: string,
nfs: NFSConfig,
storage?: KubernetesStorageKind,
namespace?: string
) {
super(apiVersion, storage, namespace);
this.nfs = nfs;
}
......@@ -50,7 +53,8 @@ export class KubernetesClusterConfigNFS extends KubernetesClusterConfig {
return new KubernetesClusterConfigNFS(
kubernetesClusterConfigObjectNFS.apiVersion,
kubernetesClusterConfigObjectNFS.nfs,
kubernetesClusterConfigObjectNFS.storage
kubernetesClusterConfigObjectNFS.storage,
kubernetesClusterConfigObjectNFS.namespace
);
}
}
......@@ -61,13 +65,15 @@ export class KubernetesClusterConfigAzure extends KubernetesClusterConfig {
public readonly uploadRetryCount: number | undefined;
constructor(
apiVersion: string,
keyVault: KeyVaultConfig,
azureStorage: AzureStorage,
storage?: KubernetesStorageKind,
uploadRetryCount?: number
) {
super(apiVersion, storage);
apiVersion: string,
keyVault: KeyVaultConfig,
azureStorage: AzureStorage,
storage?: KubernetesStorageKind,
uploadRetryCount?: number,
namespace?: string,
) {
super(apiVersion, storage, namespace);
this.keyVault = keyVault;
this.azureStorage = azureStorage;
this.uploadRetryCount = uploadRetryCount;
......@@ -85,24 +91,54 @@ export class KubernetesClusterConfigAzure extends KubernetesClusterConfig {
kubernetesClusterConfigObjectAzure.keyVault,
kubernetesClusterConfigObjectAzure.azureStorage,
kubernetesClusterConfigObjectAzure.storage,
kubernetesClusterConfigObjectAzure.uploadRetryCount
kubernetesClusterConfigObjectAzure.uploadRetryCount,
kubernetesClusterConfigObjectAzure.namespace
);
}
}
export class KubernetesClusterConfigFactory {
export class KubernetesClusterConfigPVC extends KubernetesClusterConfig {
public readonly pvc: PVCConfig;
constructor(
apiVersion: string,
pvc: PVCConfig,
storage?: KubernetesStorageKind,
namespace?: string,
) {
super(apiVersion, storage, namespace);
this.pvc = pvc;
}
public get storageType(): KubernetesStorageKind {
return 'pvc';
}
public static getInstance(jsonObject: object): KubernetesClusterConfigPVC {
const kubernetesClusterConfigObjectPVC: KubernetesClusterConfigPVC =
<KubernetesClusterConfigPVC>jsonObject;
return new KubernetesClusterConfigPVC(
kubernetesClusterConfigObjectPVC.apiVersion,
kubernetesClusterConfigObjectPVC.pvc,
kubernetesClusterConfigObjectPVC.storage,
kubernetesClusterConfigObjectPVC.namespace
);
}
}
export class KubernetesClusterConfigFactory {
public static generateKubernetesClusterConfig(jsonObject: object): KubernetesClusterConfig {
const storageConfig: StorageConfig = <StorageConfig>jsonObject;
switch (storageConfig.storage) {
const storageConfig: StorageConfig = <StorageConfig>jsonObject;
switch (storageConfig.storage) {
case 'azureStorage':
return KubernetesClusterConfigAzure.getInstance(jsonObject);
case 'pvc':
return KubernetesClusterConfigPVC.getInstance(jsonObject);
case 'nfs':
case undefined:
return KubernetesClusterConfigNFS.getInstance(jsonObject);
default:
throw new Error(`Invalid json object ${jsonObject}`);
}
}
}
}
......@@ -121,6 +157,18 @@ export class NFSConfig {
}
}
/**
* PVC configuration to store Kubernetes job related files
*/
export class PVCConfig {
// Path of the mounted pvc
public readonly path: string;
constructor(path: string) {
this.path = path;
}
}
/**
* KeyVault configuration to store the key of Azure Storage Service
* Refer https://docs.microsoft.com/en-us/azure/key-vault/key-vault-manage-with-cli2
......@@ -175,7 +223,7 @@ export class KubernetesTrialConfigTemplate {
public readonly gpuNum: number;
constructor(command: string, gpuNum: number,
cpuNum: number, memoryMB: number, image: string, privateRegistryAuthPath?: string) {
cpuNum: number, memoryMB: number, image: string, privateRegistryAuthPath?: string) {
this.command = command;
this.gpuNum = gpuNum;
this.cpuNum = cpuNum;
......
......@@ -7,21 +7,21 @@ import * as cpp from 'child-process-promise';
import * as path from 'path';
import * as azureStorage from 'azure-storage';
import { EventEmitter } from 'events';
import { Base64 } from 'js-base64';
import { String } from 'typescript-string-operations';
import { getExperimentId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import { MethodNotImplementedError } from '../../common/errors';
import {EventEmitter} from 'events';
import {Base64} from 'js-base64';
import {String} from 'typescript-string-operations';
import {getExperimentId} from '../../common/experimentStartupInfo';
import {getLogger, Logger} from '../../common/log';
import {MethodNotImplementedError} from '../../common/errors';
import {
NNIManagerIpConfig, TrialJobDetail, TrialJobMetric, LogType
} from '../../common/trainingService';
import { delay, getExperimentRootDir, getIPV4Address, getJobCancelStatus, getVersion, uniqueString } from '../../common/utils';
import { AzureStorageClientUtility } from './azureStorageClientUtils';
import { GeneralK8sClient, KubernetesCRDClient } from './kubernetesApiClient';
import { KubernetesClusterConfig } from './kubernetesConfig';
import { kubernetesScriptFormat, KubernetesTrialJobDetail } from './kubernetesData';
import { KubernetesJobRestServer } from './kubernetesJobRestServer';
import {delay, getExperimentRootDir, getIPV4Address, getJobCancelStatus, getVersion, uniqueString} from '../../common/utils';
import {AzureStorageClientUtility} from './azureStorageClientUtils';
import {GeneralK8sClient, KubernetesCRDClient} from './kubernetesApiClient';
import {KubernetesClusterConfig} from './kubernetesConfig';
import {kubernetesScriptFormat, KubernetesTrialJobDetail} from './kubernetesData';
import {KubernetesJobRestServer} from './kubernetesJobRestServer';
const fs = require('fs');
......@@ -34,7 +34,7 @@ abstract class KubernetesTrainingService {
protected readonly metricsEmitter: EventEmitter;
protected readonly trialJobsMap: Map<string, KubernetesTrialJobDetail>;
// experiment root dir in NFS
protected readonly trialLocalNFSTempFolder: string;
protected readonly trialLocalTempFolder: string;
protected stopping: boolean = false;
protected experimentId!: string;
protected kubernetesRestServerPort?: number;
......@@ -57,7 +57,7 @@ abstract class KubernetesTrainingService {
this.log = getLogger();
this.metricsEmitter = new EventEmitter();
this.trialJobsMap = new Map<string, KubernetesTrialJobDetail>();
this.trialLocalNFSTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp');
this.trialLocalTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp');
this.experimentId = getExperimentId();
this.CONTAINER_MOUNT_PATH = '/tmp/mount';
this.expContainerCodeFolder = path.join(this.CONTAINER_MOUNT_PATH, 'nni', this.experimentId, 'nni-code');
......@@ -124,7 +124,7 @@ abstract class KubernetesTrainingService {
}
public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
const trialJobDetail: KubernetesTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
const trialJobDetail: KubernetesTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (trialJobDetail === undefined) {
const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} not found`;
this.log.error(errorMessage);
......@@ -168,7 +168,7 @@ abstract class KubernetesTrainingService {
try {
await this.cancelTrialJob(trialJobId);
} catch (error) {
// DONT throw error during cleanup
// DONT throw error during cleanup
}
kubernetesTrialJob.status = 'SYS_CANCELED';
}
......@@ -191,9 +191,9 @@ abstract class KubernetesTrainingService {
// Unmount NFS
try {
await cpp.exec(`sudo umount ${this.trialLocalNFSTempFolder}`);
await cpp.exec(`sudo umount ${this.trialLocalTempFolder}`);
} catch (error) {
this.log.error(`Unmount ${this.trialLocalNFSTempFolder} failed, error is ${error}`);
this.log.error(`Unmount ${this.trialLocalTempFolder} failed, error is ${error}`);
}
// Stop kubernetes rest server
......@@ -230,14 +230,16 @@ abstract class KubernetesTrainingService {
await AzureStorageClientUtility.createShare(this.azureStorageClient, this.azureStorageShare);
//create sotrage secret
this.azureStorageSecretName = String.Format('nni-secret-{0}', uniqueString(8)
.toLowerCase());
.toLowerCase());
const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : "default"
await this.genericK8sClient.createSecret(
{
apiVersion: 'v1',
kind: 'Secret',
metadata: {
name: this.azureStorageSecretName,
namespace: 'default',
namespace: namespace,
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: getExperimentId()
......@@ -267,7 +269,7 @@ abstract class KubernetesTrainingService {
* @param trialSequenceId sequence id
*/
protected async generateRunScript(platform: string, trialJobId: string, trialWorkingFolder: string,
command: string, trialSequenceId: string, roleName: string, gpuNum: number): Promise<string> {
command: string, trialSequenceId: string, roleName: string, gpuNum: number): Promise<string> {
let nvidiaScript: string = '';
// Nvidia devcie plugin for K8S has a known issue that requesting zero GPUs allocates all GPUs
// Refer https://github.com/NVIDIA/k8s-device-plugin/issues/61
......@@ -297,11 +299,11 @@ abstract class KubernetesTrainingService {
return Promise.resolve(runScript);
}
protected async createNFSStorage(nfsServer: string, nfsPath: string): Promise<void> {
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
await cpp.exec(`mkdir -p ${this.trialLocalTempFolder}`);
try {
await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalNFSTempFolder}`);
await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalTempFolder}`);
} catch (error) {
const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalNFSTempFolder} failed, error is ${error}`;
const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalTempFolder} failed, error is ${error}`;
this.log.error(mountError);
return Promise.reject(mountError);
......@@ -309,21 +311,35 @@ abstract class KubernetesTrainingService {
return Promise.resolve();
}
protected async createPVCStorage(pvcPath: string): Promise<void> {
try {
await cpp.exec(`mkdir -p ${pvcPath}`);
await cpp.exec(`sudo ln -s ${pvcPath} ${this.trialLocalTempFolder}`);
} catch (error) {
const linkError: string = `Linking ${pvcPath} to ${this.trialLocalTempFolder} failed, error is ${error}`;
this.log.error(linkError);
return Promise.reject(linkError);
}
return Promise.resolve();
}
protected async createRegistrySecret(filePath: string | undefined): Promise<string | undefined> {
if(filePath === undefined || filePath === '') {
if (filePath === undefined || filePath === '') {
return undefined;
}
const body = fs.readFileSync(filePath).toString('base64');
const registrySecretName = String.Format('nni-secret-{0}', uniqueString(8)
.toLowerCase());
.toLowerCase());
const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : "default"
await this.genericK8sClient.createSecret(
{
apiVersion: 'v1',
kind: 'Secret',
metadata: {
name: registrySecretName,
namespace: 'default',
namespace: namespace,
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: getExperimentId()
......@@ -337,7 +353,7 @@ abstract class KubernetesTrainingService {
);
return registrySecretName;
}
/**
* upload local directory to azureStorage
* @param srcDirectory the source directory of local folder
......@@ -349,7 +365,7 @@ abstract class KubernetesTrainingService {
throw new Error('azureStorageClient is not initialized');
}
let retryCount: number = 1;
if(uploadRetryCount) {
if (uploadRetryCount) {
retryCount = uploadRetryCount;
}
let uploadSuccess: boolean = false;
......@@ -358,7 +374,7 @@ abstract class KubernetesTrainingService {
do {
uploadSuccess = await AzureStorageClientUtility.uploadDirectory(
this.azureStorageClient,
`${destDirectory}`,
`${destDirectory}`,
this.azureStorageShare,
`${srcDirectory}`);
if (!uploadSuccess) {
......@@ -378,4 +394,4 @@ abstract class KubernetesTrainingService {
return Promise.resolve(folderUriInAzure);
}
}
export { KubernetesTrainingService };
export {KubernetesTrainingService};
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment