Commit 4553de75 authored by SparkSnail, committed by fishyds
Browse files

Fix FrameworkController broken change (#628)

* add serviceAccountName

* add serviceAccountName for frameworkcontroller
parent df39b5ea
...@@ -108,6 +108,7 @@ export namespace ValidationSchemas { ...@@ -108,6 +108,7 @@ export namespace ValidationSchemas {
}), }),
frameworkcontroller_config: joi.object({ frameworkcontroller_config: joi.object({
storage: joi.string().min(1), storage: joi.string().min(1),
serviceAccountName: joi.string().min(1),
nfs: joi.object({ nfs: joi.object({
server: joi.string().min(1).required(), server: joi.string().min(1).required(),
path: joi.string().min(1).required() path: joi.string().min(1).required()
......
...@@ -18,8 +18,11 @@ ...@@ -18,8 +18,11 @@
*/ */
'use strict'; 'use strict';
import * as assert from 'assert';
import { KubernetesTrialConfig, KubernetesTrialConfigTemplate } from '../kubernetesConfig' import { KubernetesTrialConfig, KubernetesTrialConfigTemplate, KubernetesClusterConfigAzure,
KubernetesClusterConfigNFS, NFSConfig, KubernetesStorageKind, keyVaultConfig, AzureStorage, KubernetesClusterConfig,
StorageConfig } from '../kubernetesConfig'
export class FrameworkAttemptCompletionPolicy { export class FrameworkAttemptCompletionPolicy {
public readonly minFailedTaskCount: number; public readonly minFailedTaskCount: number;
...@@ -57,6 +60,80 @@ export class FrameworkControllerTrialConfig extends KubernetesTrialConfig{ ...@@ -57,6 +60,80 @@ export class FrameworkControllerTrialConfig extends KubernetesTrialConfig{
} }
} }
/**
 * Cluster-level configuration for the FrameworkController training service.
 *
 * Extends the generic Kubernetes cluster configuration with a
 * `serviceAccountName` setting.
 */
export class FrameworkControllerClusterConfig extends KubernetesClusterConfig {
    /**
     * @param apiVersion passed through unchanged to the KubernetesClusterConfig constructor
     * @param serviceAccountName stored verbatim as the public read-only `serviceAccountName` property
     */
    constructor(apiVersion: string, public readonly serviceAccountName: string) {
        super(apiVersion);
    }
}
/**
 * FrameworkController cluster configuration for NFS-backed storage.
 *
 * Extends the generic NFS cluster configuration with a `serviceAccountName`
 * setting.
 */
export class FrameworkControllerClusterConfigNFS extends KubernetesClusterConfigNFS {
    /** Service account name exactly as supplied to the constructor. */
    public readonly serviceAccountName: string;

    /**
     * @param serviceAccountName stored verbatim on the instance
     * @param apiVersion forwarded to the KubernetesClusterConfigNFS constructor
     * @param nfs forwarded to the KubernetesClusterConfigNFS constructor
     * @param storage optional storage kind, forwarded to the KubernetesClusterConfigNFS constructor
     */
    constructor(serviceAccountName: string, apiVersion: string, nfs: NFSConfig, storage?: KubernetesStorageKind) {
        super(apiVersion, nfs, storage);
        this.serviceAccountName = serviceAccountName;
    }

    /**
     * Builds a real class instance from a plain object.
     *
     * The object is only cast, not validated: its fields are read and handed
     * to the constructor as they are.
     *
     * @param jsonObject plain object carrying `serviceAccountName`, `apiVersion`, `nfs` and optionally `storage`
     * @returns a new FrameworkControllerClusterConfigNFS populated from those fields
     * @throws AssertionError when `jsonObject` is `undefined`
     */
    public static getInstance(jsonObject: object): FrameworkControllerClusterConfigNFS {
        const rawConfig = jsonObject as FrameworkControllerClusterConfigNFS;
        assert(rawConfig !== undefined);

        const { serviceAccountName, apiVersion, nfs, storage } = rawConfig;
        return new FrameworkControllerClusterConfigNFS(serviceAccountName, apiVersion, nfs, storage);
    }
}
/**
 * FrameworkController cluster configuration for Azure-storage-backed setups.
 *
 * Extends the generic Azure cluster configuration with a `serviceAccountName`
 * setting.
 */
export class FrameworkControllerClusterConfigAzure extends KubernetesClusterConfigAzure {
    /** Service account name exactly as supplied to the constructor. */
    public readonly serviceAccountName: string;

    /**
     * @param serviceAccountName stored verbatim on the instance
     * @param apiVersion forwarded to the KubernetesClusterConfigAzure constructor
     * @param keyVault forwarded to the KubernetesClusterConfigAzure constructor
     * @param azureStorage forwarded to the KubernetesClusterConfigAzure constructor
     * @param storage optional storage kind, forwarded to the KubernetesClusterConfigAzure constructor
     */
    constructor(
        serviceAccountName: string,
        apiVersion: string,
        keyVault: keyVaultConfig,
        azureStorage: AzureStorage,
        storage?: KubernetesStorageKind
    ) {
        super(apiVersion, keyVault, azureStorage, storage);
        this.serviceAccountName = serviceAccountName;
    }

    /**
     * Builds a real class instance from a plain object.
     *
     * The object is only cast, not validated: its fields are read and handed
     * to the constructor as they are.
     *
     * @param jsonObject plain object carrying `serviceAccountName`, `apiVersion`, `keyVault`,
     *                   `azureStorage` and optionally `storage`
     * @returns a new FrameworkControllerClusterConfigAzure populated from those fields
     * @throws AssertionError when `jsonObject` is `undefined`
     */
    public static getInstance(jsonObject: object): FrameworkControllerClusterConfigAzure {
        const azureClusterConfigObject = <FrameworkControllerClusterConfigAzure>jsonObject;
        // Same guard as FrameworkControllerClusterConfigNFS.getInstance, so both
        // storage flavours fail the same way on a missing object instead of
        // surfacing a bare TypeError from the first property read.
        assert(azureClusterConfigObject !== undefined);
        return new FrameworkControllerClusterConfigAzure(
            azureClusterConfigObject.serviceAccountName,
            azureClusterConfigObject.apiVersion,
            azureClusterConfigObject.keyVault,
            azureClusterConfigObject.azureStorage,
            azureClusterConfigObject.storage
        );
    }
}
/**
 * Chooses the concrete FrameworkController cluster configuration class for a
 * parsed configuration object, based on its `storage` field.
 */
export class FrameworkControllerClusterConfigFactory {
    /**
     * Creates the cluster configuration matching `jsonObject.storage`.
     *
     * - `'azureStorage'` -> FrameworkControllerClusterConfigAzure
     * - `'nfs'` or no `storage` field -> FrameworkControllerClusterConfigNFS
     *
     * @param jsonObject parsed cluster configuration
     * @returns the configuration instance built by the selected class's `getInstance`
     * @throws Error when `jsonObject` is falsy or `storage` holds an unsupported value
     */
    public static generateFrameworkControllerClusterConfig(jsonObject: object): FrameworkControllerClusterConfig {
        const storageConfig = <StorageConfig>jsonObject;
        if (!storageConfig) {
            throw new Error("Invalid json object as a StorageConfig instance");
        }

        if (storageConfig.storage === 'azureStorage') {
            return FrameworkControllerClusterConfigAzure.getInstance(jsonObject);
        }
        // NFS is the default when no storage kind is given.
        if (storageConfig.storage === undefined || storageConfig.storage === 'nfs') {
            return FrameworkControllerClusterConfigNFS.getInstance(jsonObject);
        }

        // Interpolating the object itself would always print "[object Object]";
        // report the offending value so the message is actionable.
        throw new Error(`Invalid json object: unsupported storage kind ${JSON.stringify(storageConfig.storage)}`);
    }
}
export type FrameworkControllerJobStatus = 'AttemptRunning' | 'Completed' | 'AttemptCreationPending' | 'AttemptCreationRequested' | 'AttemptPreparing' | 'AttemptCompleted'; export type FrameworkControllerJobStatus = 'AttemptRunning' | 'Completed' | 'AttemptCreationPending' | 'AttemptCreationRequested' | 'AttemptPreparing' | 'AttemptCompleted';
export type FrameworkControllerJobCompleteStatus = 'Succeeded' | 'Failed'; export type FrameworkControllerJobCompleteStatus = 'Succeeded' | 'Failed';
\ No newline at end of file
...@@ -32,12 +32,13 @@ import { ...@@ -32,12 +32,13 @@ import {
TrialJobDetail, NNIManagerIpConfig TrialJobDetail, NNIManagerIpConfig
} from '../../../common/trainingService'; } from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils'; import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
import { NFSConfig, KubernetesClusterConfigNFS, KubernetesClusterConfigAzure, KubernetesClusterConfigFactory } from '../kubernetesConfig' import { NFSConfig } from '../kubernetesConfig'
import { KubernetesTrialJobDetail } from '../kubernetesData'; import { KubernetesTrialJobDetail } from '../kubernetesData';
import { validateCodeDir } from '../../common/util'; import { validateCodeDir } from '../../common/util';
import { AzureStorageClientUtility } from '../azureStorageClientUtils'; import { AzureStorageClientUtility } from '../azureStorageClientUtils';
import { KubernetesTrainingService } from '../kubernetesTrainingService'; import { KubernetesTrainingService } from '../kubernetesTrainingService';
import { FrameworkControllerTrialConfig } from './frameworkcontrollerConfig'; import { FrameworkControllerTrialConfig, FrameworkControllerClusterConfig, FrameworkControllerClusterConfigAzure, FrameworkControllerClusterConfigNFS,
FrameworkControllerClusterConfigFactory} from './frameworkcontrollerConfig';
import { FrameworkControllerJobRestServer } from './frameworkcontrollerJobRestServer'; import { FrameworkControllerJobRestServer } from './frameworkcontrollerJobRestServer';
import { FrameworkControllerClient } from './frameworkcontrollerApiClient'; import { FrameworkControllerClient } from './frameworkcontrollerApiClient';
import { FrameworkControllerJobInfoCollector } from './frameworkcontrollerJobInfoCollector'; import { FrameworkControllerJobInfoCollector } from './frameworkcontrollerJobInfoCollector';
...@@ -50,6 +51,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -50,6 +51,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
private fcTrialConfig?: FrameworkControllerTrialConfig; // frameworkcontroller trial configuration private fcTrialConfig?: FrameworkControllerTrialConfig; // frameworkcontroller trial configuration
private fcJobInfoCollector: FrameworkControllerJobInfoCollector; // frameworkcontroller job info collector private fcJobInfoCollector: FrameworkControllerJobInfoCollector; // frameworkcontroller job info collector
private fcContainerPortMap = new Map<string, number>(); // store frameworkcontroller container port private fcContainerPortMap = new Map<string, number>(); // store frameworkcontroller container port
private fcClusterConfig?: FrameworkControllerClusterConfig;
constructor() { constructor() {
super(); super();
...@@ -73,7 +75,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -73,7 +75,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
} }
public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> { public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
if(!this.kubernetesClusterConfig) { if(!this.fcClusterConfig) {
throw new Error('frameworkcontrollerClusterConfig is not initialized'); throw new Error('frameworkcontrollerClusterConfig is not initialized');
} }
if(!this.kubernetesCRDClient) { if(!this.kubernetesCRDClient) {
...@@ -129,13 +131,13 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -129,13 +131,13 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
* return: trialJobOutputUrl * return: trialJobOutputUrl
*/ */
private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> { private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
if(!this.kubernetesClusterConfig) { if(!this.fcClusterConfig) {
throw new Error('Kubeflow Cluster config is not initialized'); throw new Error('Kubeflow Cluster config is not initialized');
} }
let trialJobOutputUrl: string = ''; let trialJobOutputUrl: string = '';
if(this.kubernetesClusterConfig.storageType === 'azureStorage') { if(this.fcClusterConfig.storageType === 'azureStorage') {
try{ try{
//upload local files to azure storage //upload local files to azure storage
await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient, await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
...@@ -146,8 +148,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -146,8 +148,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
this.log.error(error); this.log.error(error);
return Promise.reject(error); return Promise.reject(error);
} }
} else if(this.kubernetesClusterConfig.storageType === 'nfs') { } else if(this.fcClusterConfig.storageType === 'nfs') {
let nfsFrameworkControllerClusterConfig: KubernetesClusterConfigNFS = <KubernetesClusterConfigNFS>this.kubernetesClusterConfig; let nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
// Create work dir for current trial in NFS directory // Create work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`); await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
// Copy code files from local dir to NFS mounted dir // Copy code files from local dir to NFS mounted dir
...@@ -170,7 +172,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -170,7 +172,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
throw new Error('frameworkcontroller trial config is not initialized'); throw new Error('frameworkcontroller trial config is not initialized');
} }
for(let taskRole of this.fcTrialConfig.taskRoles) { for(let taskRole of this.fcTrialConfig.taskRoles) {
portScript += `${taskRole.name}_port=${this.fcContainerPortMap.get(taskRole.name)} `; portScript += `FB_${taskRole.name.toUpperCase()}_PORT=${this.fcContainerPortMap.get(taskRole.name)} `;
} }
return `${portScript} . /mnt/frameworkbarrier/injector.sh && ${command}`; return `${portScript} . /mnt/frameworkbarrier/injector.sh && ${command}`;
} }
...@@ -229,9 +231,9 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -229,9 +231,9 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
case TrialConfigMetadataKey.FRAMEWORKCONTROLLER_CLUSTER_CONFIG: case TrialConfigMetadataKey.FRAMEWORKCONTROLLER_CLUSTER_CONFIG:
let frameworkcontrollerClusterJsonObject = JSON.parse(value); let frameworkcontrollerClusterJsonObject = JSON.parse(value);
this.kubernetesClusterConfig = KubernetesClusterConfigFactory.generateKubernetesClusterConfig(frameworkcontrollerClusterJsonObject); this.fcClusterConfig = FrameworkControllerClusterConfigFactory.generateFrameworkControllerClusterConfig(frameworkcontrollerClusterJsonObject);
if(this.kubernetesClusterConfig.storageType === 'azureStorage') { if(this.fcClusterConfig.storageType === 'azureStorage') {
let azureFrameworkControllerClusterConfig = <KubernetesClusterConfigAzure>this.kubernetesClusterConfig; let azureFrameworkControllerClusterConfig = <FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
this.azureStorageAccountName = azureFrameworkControllerClusterConfig.azureStorage.accountName; this.azureStorageAccountName = azureFrameworkControllerClusterConfig.azureStorage.accountName;
this.azureStorageShare = azureFrameworkControllerClusterConfig.azureStorage.azureShare; this.azureStorageShare = azureFrameworkControllerClusterConfig.azureStorage.azureShare;
await this.createAzureStorage( await this.createAzureStorage(
...@@ -240,8 +242,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -240,8 +242,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
azureFrameworkControllerClusterConfig.azureStorage.accountName, azureFrameworkControllerClusterConfig.azureStorage.accountName,
azureFrameworkControllerClusterConfig.azureStorage.azureShare azureFrameworkControllerClusterConfig.azureStorage.azureShare
); );
} else if(this.kubernetesClusterConfig.storageType === 'nfs') { } else if(this.fcClusterConfig.storageType === 'nfs') {
let nfsFrameworkControllerClusterConfig = <KubernetesClusterConfigNFS>this.kubernetesClusterConfig; let nfsFrameworkControllerClusterConfig = <FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
await this.createNFSStorage( await this.createNFSStorage(
nfsFrameworkControllerClusterConfig.nfs.server, nfsFrameworkControllerClusterConfig.nfs.server,
nfsFrameworkControllerClusterConfig.nfs.path nfsFrameworkControllerClusterConfig.nfs.path
...@@ -292,7 +294,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -292,7 +294,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
* @param podResources pod template * @param podResources pod template
*/ */
private generateFrameworkControllerJobConfig(trialJobId: string, trialWorkingFolder: string, frameworkcontrollerJobName : string, podResources : any) : any { private generateFrameworkControllerJobConfig(trialJobId: string, trialWorkingFolder: string, frameworkcontrollerJobName : string, podResources : any) : any {
if(!this.kubernetesClusterConfig) { if(!this.fcClusterConfig) {
throw new Error('frameworkcontroller Cluster config is not initialized'); throw new Error('frameworkcontroller Cluster config is not initialized');
} }
...@@ -346,16 +348,16 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -346,16 +348,16 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
private generateTaskRoleConfig(trialWorkingFolder: string, replicaImage: string, runScriptFile: string, podResources: any, containerPort: number): any { private generateTaskRoleConfig(trialWorkingFolder: string, replicaImage: string, runScriptFile: string, podResources: any, containerPort: number): any {
if(!this.kubernetesClusterConfig) { if(!this.fcClusterConfig) {
throw new Error('frameworkcontroller Cluster config is not initialized'); throw new Error('frameworkcontroller Cluster config is not initialized');
} }
if(!this.fcTrialConfig) { if(!this.fcTrialConfig) {
throw new Error('frameworkcontroller trial config is not initialized'); throw new Error('frameworkcontroller trial config is not initialized');
} }
let volumeSpecMap = new Map<string, object>(); let volumeSpecMap = new Map<string, object>();
if(this.kubernetesClusterConfig.storageType === 'azureStorage'){ if(this.fcClusterConfig.storageType === 'azureStorage'){
volumeSpecMap.set('nniVolumes', [ volumeSpecMap.set('nniVolumes', [
{ {
name: 'nni-vol', name: 'nni-vol',
...@@ -369,7 +371,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -369,7 +371,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
emptyDir: {} emptyDir: {}
}]) }])
}else { }else {
let frameworkcontrollerClusterConfigNFS: KubernetesClusterConfigNFS = <KubernetesClusterConfigNFS> this.kubernetesClusterConfig; let frameworkcontrollerClusterConfigNFS: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS> this.fcClusterConfig;
volumeSpecMap.set('nniVolumes', [ volumeSpecMap.set('nniVolumes', [
{ {
name: 'nni-vol', name: 'nni-vol',
...@@ -382,41 +384,49 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -382,41 +384,49 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
emptyDir: {} emptyDir: {}
}]) }])
} }
let containers = [
{
name: 'framework',
image: replicaImage,
command: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
volumeMounts: [
{
name: 'nni-vol',
mountPath: this.CONTAINER_MOUNT_PATH
},{
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}],
resources: podResources,
ports: [{
containerPort: containerPort
}]
}]
let initContainers = [
{
name: 'frameworkbarrier',
image: 'frameworkcontroller/frameworkbarrier',
volumeMounts: [
{
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}]
}]
let spec: any = {
containers: containers,
initContainers: initContainers,
restartPolicy: 'OnFailure',
volumes: volumeSpecMap.get('nniVolumes'),
hostNetwork: false
};
if(this.fcClusterConfig.serviceAccountName) {
spec.serviceAccountName = this.fcClusterConfig.serviceAccountName;
}
let taskRole = { let taskRole = {
pod: { pod: {
spec: { spec: spec
containers: [
{
name: 'framework',
image: replicaImage,
command: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
volumeMounts: [
{
name: 'nni-vol',
mountPath: this.CONTAINER_MOUNT_PATH
},{
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}],
resources: podResources,
ports: [{
containerPort: containerPort
}]
}],
initContainers: [
{
name: 'frameworkbarrier',
image: 'frameworkcontroller/frameworkbarrier',
volumeMounts: [
{
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}]
}],
restartPolicy: 'OnFailure',
volumes: volumeSpecMap.get('nniVolumes'),
hostNetwork: false
}
} }
} }
return taskRole; return taskRole;
......
...@@ -32,8 +32,8 @@ export type OperatorApiVersion = 'v1alpha2' | 'v1beta1'; ...@@ -32,8 +32,8 @@ export type OperatorApiVersion = 'v1alpha2' | 'v1beta1';
export class KubeflowClusterConfig extends KubernetesClusterConfig { export class KubeflowClusterConfig extends KubernetesClusterConfig {
public readonly operator: KubeflowOperator; public readonly operator: KubeflowOperator;
constructor(codeDir: string, operator: KubeflowOperator) { constructor(apiVersion: string, operator: KubeflowOperator) {
super(codeDir); super(apiVersion);
this.operator = operator; this.operator = operator;
} }
} }
......
...@@ -216,12 +216,14 @@ frameworkcontroller_trial_schema = { ...@@ -216,12 +216,14 @@ frameworkcontroller_trial_schema = {
frameworkcontroller_config_schema = { frameworkcontroller_config_schema = {
'frameworkcontrollerConfig':Or({ 'frameworkcontrollerConfig':Or({
Optional('storage'): Or('nfs', 'azureStorage'), Optional('storage'): Or('nfs', 'azureStorage'),
Optional('serviceAccountName'): str,
'nfs': { 'nfs': {
'server': str, 'server': str,
'path': str 'path': str
} }
},{ },{
Optional('storage'): Or('nfs', 'azureStorage'), Optional('storage'): Or('nfs', 'azureStorage'),
Optional('serviceAccountName'): str,
'keyVault': { 'keyVault': {
'vaultName': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'), 'vaultName': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
'name': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}') 'name': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment