Unverified Commit 30643638 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Support frameworkcontroller reuse mode (#4104)

parent afe42cea
......@@ -175,3 +175,42 @@ version check
-------------
NNI support version check feature in since version 0.6, `refer <PaiMode.rst>`__
FrameworkController reuse mode
------------------------------
NNI support setting reuse mode for trial jobs. In reuse mode, NNI will submit a long-running trial runner process to occupy the container, and start trial jobs as the subprocess of the trial runner process, it means k8s do not need to schedule new container again, it just reuse old container.
Currently, frameworkcontroller reuse mode only support V2 config.
Here is the example:
.. code-block:: yaml
searchSpaceFile: search_space.json
trialCommand: python3 mnist.py
trialGpuNumber: 0
trialConcurrency: 4
maxTrialNumber: 20
tuner:
name: TPE
classArgs:
optimize_mode: maximize
trainingService:
reuseMode: true
platform: frameworkcontroller
taskRoles:
- name:
dockerImage: 'msranni/nni:latest'
taskNumber: 1
command:
gpuNumber:
cpuNumber:
memorySize:
frameworkAttemptCompletionPolicy:
minFailedTaskCount: 1
minSucceedTaskCount: 1
storage:
storageType: azureStorage
azureAccount: {your_account}
azureShare: {your_share}
keyVaultName: {your_valut_name}
keyVaultKey: {your_valut_key}
......@@ -11,35 +11,24 @@ from . import util
__all__ = [
'FrameworkControllerConfig',
'FrameworkControllerRoleConfig',
'FrameworkControllerNfsConfig',
'FrameworkControllerAzureStorageConfig'
'_FrameworkControllerStorageConfig'
]
@dataclass(init=False)
class _FrameworkControllerStorageConfig(ConfigBase):
storage: str
storage_type: str
server: Optional[str] = None
path: Optional[str] = None
azure_account: Optional[str] = None
azure_share: Optional[str] = None
key_vault: Optional[str] = None
key_vault_secret: Optional[str] = None
key_vault_name: Optional[str] = None
key_vault_key: Optional[str] = None
@dataclass(init=False)
class FrameworkControllerNfsConfig(ConfigBase):
storage: str = 'nfs'
server: str
path: str
@dataclass(init=False)
class FrameworkControllerAzureStorageConfig(ConfigBase):
storage: str = 'azureStorage'
azure_account: str
azure_share: str
key_vault: str
key_vault_secret: str
class FrameworkAttemptCompletionPolicy(ConfigBase):
min_failed_task_count: int
min_succeed_task_count: int
@dataclass(init=False)
class FrameworkControllerRoleConfig(ConfigBase):
......@@ -50,8 +39,7 @@ class FrameworkControllerRoleConfig(ConfigBase):
gpu_number: int
cpu_number: int
memory_size: str
attempt_completion_min_failed_tasks: int
attempt_completion_min_succeeded_tasks: int
framework_attempt_completion_policy: FrameworkAttemptCompletionPolicy
@dataclass(init=False)
......@@ -60,6 +48,8 @@ class FrameworkControllerConfig(TrainingServiceConfig):
service_account_name: str
storage: _FrameworkControllerStorageConfig
task_roles: List[FrameworkControllerRoleConfig]
reuse_mode: Optional[bool] = True #set reuse mode as true for v2 config
service_account_name: Optional[str]
def __init__(self, **kwargs):
kwargs = util.case_insensitive(kwargs)
......
......@@ -90,7 +90,7 @@ export interface DlcConfig extends TrainingServiceConfig {
/* Kubeflow */
// FIXME: merge with shared storage config
export interface KubeflowStorageConfig {
export interface KubernetesStorageConfig {
storageType: string;
maxTrialNumberPerGpu?: number;
server?: string;
......@@ -120,31 +120,34 @@ export interface KubeflowConfig extends TrainingServiceConfig {
maxTrialNumberPerGpu: number;
operator: KubeflowOperator;
apiVersion: OperatorApiVersion;
storage: KubeflowStorageConfig;
storage: KubernetesStorageConfig;
reuseMode: boolean;
}
/* FrameworkController */
type FrameworkControllerStorageConfig = KubeflowStorageConfig;
export interface FrameworkControllerRoleConfig {
export interface FrameworkControllerTaskRoleConfig {
name: string;
dockerImage: string;
taskNumber: number;
command: string;
gpuNumber: number;
cpuNumber: number;
memorySize: string;
attemptCompletionMinFailedTasks: number;
attemptCompletionMinSucceededTasks: number;
memorySize: number;
dockerImage: string;
privateRegistryAuthPath?: string;
frameworkAttemptCompletionPolicy: {
minFailedTaskCount: number;
minSucceedTaskCount: number;
};
}
export interface FrameworkControllerConfig extends TrainingServiceConfig {
platform: 'frameworkcontroller';
taskRoles: FrameworkControllerTaskRoleConfig[];
maxTrialNumberPerGpu: number;
storage: KubernetesStorageConfig;
reuseMode: boolean;
namespace: 'default';
apiVersion: string;
serviceAccountName: string;
storage: FrameworkControllerStorageConfig;
taskRoles: FrameworkControllerRoleConfig[];
}
/* shared storage */
......
......@@ -3,6 +3,7 @@ import { OpenPaiEnvironmentService } from './openPaiEnvironmentService';
import { LocalEnvironmentService } from './localEnvironmentService';
import { RemoteEnvironmentService } from './remoteEnvironmentService';
import { KubeflowEnvironmentService } from './kubernetes/kubeflowEnvironmentService';
import { FrameworkControllerEnvironmentService } from './kubernetes/frameworkcontrollerEnvironmentService';
import { EnvironmentService } from '../environment';
import { ExperimentConfig } from 'common/experimentConfig';
import { ExperimentStartupInfo } from 'common/experimentStartupInfo';
......@@ -24,6 +25,8 @@ export async function createEnvironmentService(name: string, config: ExperimentC
return new OpenPaiEnvironmentService(config, info);
case 'kubeflow':
return new KubeflowEnvironmentService(config, info);
case 'frameworkcontroller':
return new FrameworkControllerEnvironmentService(config, info);
case 'dlc':
return new DlcEnvironmentService(config, info);
}
......
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as fs from 'fs';
import * as path from 'path';
import * as component from '../../../../common/component';
import { ExperimentConfig, FrameworkControllerConfig, flattenConfig, FrameworkControllerTaskRoleConfig } from '../../../../common/experimentConfig';
import { ExperimentStartupInfo } from '../../../../common/experimentStartupInfo';
import { EnvironmentInformation } from '../../environment';
import { KubernetesEnvironmentService } from './kubernetesEnvironmentService';
import { FrameworkControllerClientFactory } from '../../../kubernetes/frameworkcontroller/frameworkcontrollerApiClient';
import { FrameworkControllerClusterConfigAzure, FrameworkControllerJobStatus, FrameworkControllerTrialConfigTemplate,
FrameworkControllerJobCompleteStatus } from '../../../kubernetes/frameworkcontroller/frameworkcontrollerConfig';
import { KeyVaultConfig, AzureStorage } from '../../../kubernetes/kubernetesConfig';
interface FlattenKubeflowConfig extends ExperimentConfig, FrameworkControllerConfig { }
@component.Singleton
export class FrameworkControllerEnvironmentService extends KubernetesEnvironmentService {
private config: FlattenKubeflowConfig;
private createStoragePromise?: Promise<void>;
private readonly fcContainerPortMap: Map<string, number> = new Map<string, number>(); // store frameworkcontroller container port
constructor(config: ExperimentConfig, info: ExperimentStartupInfo) {
super(config, info);
this.experimentId = info.experimentId;
this.config = flattenConfig(config, 'frameworkcontroller');
// Create kubernetesCRDClient
this.kubernetesCRDClient = FrameworkControllerClientFactory.createClient(
this.config.namespace);
// Create storage
if (this.config.storage.storageType === 'azureStorage') {
if (this.config.storage.azureShare === undefined ||
this.config.storage.azureAccount === undefined ||
this.config.storage.keyVaultName === undefined ||
this.config.storage.keyVaultKey === undefined) {
throw new Error("Azure storage configuration error!");
}
const azureStorage: AzureStorage = new AzureStorage(this.config.storage.azureShare, this.config.storage.azureAccount);
const keyValutConfig: KeyVaultConfig = new KeyVaultConfig(this.config.storage.keyVaultName, this.config.storage.keyVaultKey);
const azureKubeflowClusterConfig: FrameworkControllerClusterConfigAzure = new FrameworkControllerClusterConfigAzure(
this.config.namespace, this.config.apiVersion, keyValutConfig, azureStorage);
this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare;
this.createStoragePromise = this.createAzureStorage(
azureKubeflowClusterConfig.keyVault.vaultName,
azureKubeflowClusterConfig.keyVault.name
);
} else if (this.config.storage.storageType === 'nfs') {
if (this.config.storage.server === undefined ||
this.config.storage.path === undefined) {
throw new Error("NFS storage configuration error!");
}
this.createStoragePromise = this.createNFSStorage(
this.config.storage.server,
this.config.storage.path
);
}
}
public get environmentMaintenceLoopInterval(): number {
return 5000;
}
public get hasStorageService(): boolean {
return false;
}
public get getName(): string {
return 'frameworkcontroller';
}
public async startEnvironment(environment: EnvironmentInformation): Promise<void> {
if (this.kubernetesCRDClient === undefined) {
throw new Error("kubernetesCRDClient not initialized!");
}
if (this.createStoragePromise) {
await this.createStoragePromise;
}
let configTaskRoles: any = undefined;
configTaskRoles = this.config.taskRoles;
//Generate the port used for taskRole
this.generateContainerPort(configTaskRoles);
const expFolder = `${this.CONTAINER_MOUNT_PATH}/nni/${this.experimentId}`;
environment.command = `cd ${expFolder} && ${environment.command} \
1>${expFolder}/envs/${environment.id}/trialrunner_stdout 2>${expFolder}/envs/${environment.id}/trialrunner_stderr`;
if (this.config.deprecated && this.config.deprecated.useActiveGpu !== undefined) {
environment.useActiveGpu = this.config.deprecated.useActiveGpu;
}
environment.maxTrialNumberPerGpu = this.config.maxTrialNumberPerGpu;
const frameworkcontrollerJobName: string = `nniexp${this.experimentId}env${environment.id}`.toLowerCase();
const command = this.generateCommandScript(this.config.taskRoles, environment.command);
await fs.promises.writeFile(path.join(this.environmentLocalTempFolder, "run.sh"), command, { encoding: 'utf8' });
//upload script files to sotrage
const trialJobOutputUrl: string = await this.uploadFolder(this.environmentLocalTempFolder, `nni/${this.experimentId}`);
environment.trackingUrl = trialJobOutputUrl;
// Generate kubeflow job resource config object
const frameworkcontrollerJobConfig: any = await this.prepareFrameworkControllerConfig(
environment.id,
this.environmentWorkingFolder,
frameworkcontrollerJobName
);
// Create kubeflow job based on generated kubeflow job resource config
await this.kubernetesCRDClient.createKubernetesJob(frameworkcontrollerJobConfig);
}
/**
* upload local folder to nfs or azureStroage
*/
private async uploadFolder(srcDirectory: string, destDirectory: string): Promise<string> {
if (this.config.storage.storageType === 'azureStorage') {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, 2);
} else {
// do not need to upload files to nfs server, temp folder already mounted to nfs
return `nfs://${this.config.storage.server}:${destDirectory}`;
}
}
/**
* generate trial's command for frameworkcontroller
* expose port and execute injector.sh before executing user's command
* @param command
*/
private generateCommandScript(taskRoles: FrameworkControllerTaskRoleConfig[], command: string): string {
let portScript: string = '';
for (const taskRole of taskRoles) {
portScript += `FB_${taskRole.name.toUpperCase()}_PORT=${this.fcContainerPortMap.get(
taskRole.name
)} `;
}
return `${portScript} . /mnt/frameworkbarrier/injector.sh && ${command}`;
}
private async prepareFrameworkControllerConfig(trialJobId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string):
Promise<any> {
const podResources: any = [];
for (const taskRole of this.config.taskRoles) {
const resource: any = {};
resource.requests = this.generatePodResource(taskRole.memorySize, taskRole.cpuNumber, taskRole.gpuNumber);
resource.limits = {...resource.requests};
podResources.push(resource);
}
// Generate frameworkcontroller job resource config object
const frameworkcontrollerJobConfig: any =
await this.generateFrameworkControllerJobConfig(trialJobId, trialWorkingFolder, frameworkcontrollerJobName, podResources);
return Promise.resolve(frameworkcontrollerJobConfig);
}
private generateContainerPort(taskRoles: FrameworkControllerTrialConfigTemplate[]): void {
if (taskRoles === undefined) {
throw new Error('frameworkcontroller trial config is not initialized');
}
let port: number = 4000; //The default port used in container
for (const index of taskRoles.keys()) {
this.fcContainerPortMap.set(taskRoles[index].name, port);
port += 1;
}
}
/**
* Generate frameworkcontroller resource config file
* @param trialJobId trial job id
* @param trialWorkingFolder working folder
* @param frameworkcontrollerJobName job name
* @param podResources pod template
*/
private async generateFrameworkControllerJobConfig(trialJobId: string, trialWorkingFolder: string,
frameworkcontrollerJobName: string, podResources: any): Promise<any> {
const taskRoles: any = [];
for (const index of this.config.taskRoles.keys()) {
const containerPort: number | undefined = this.fcContainerPortMap.get(this.config.taskRoles[index].name);
if (containerPort === undefined) {
throw new Error('Container port is not initialized');
}
const taskRole: any = this.generateTaskRoleConfig(
trialWorkingFolder,
this.config.taskRoles[index].dockerImage,
`run.sh`,
podResources[index],
containerPort,
await this.createRegistrySecret(this.config.taskRoles[index].privateRegistryAuthPath)
);
taskRoles.push({
name: this.config.taskRoles[index].name,
taskNumber: this.config.taskRoles[index].taskNumber,
frameworkAttemptCompletionPolicy: {
minFailedTaskCount: this.config.taskRoles[index].frameworkAttemptCompletionPolicy.minFailedTaskCount,
minSucceededTaskCount: this.config.taskRoles[index].frameworkAttemptCompletionPolicy.minSucceedTaskCount
},
task: taskRole
});
}
return Promise.resolve({
apiVersion: `frameworkcontroller.microsoft.com/v1`,
kind: 'Framework',
metadata: {
name: frameworkcontrollerJobName,
namespace: this.config.namespace ? this.config.namespace : "default",
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: this.experimentId,
trialId: trialJobId
}
},
spec: {
executionType: 'Start',
taskRoles: taskRoles
}
});
}
private generateTaskRoleConfig(trialWorkingFolder: string, replicaImage: string, runScriptFile: string,
podResources: any, containerPort: number, privateRegistrySecretName: string | undefined): any {
const volumeSpecMap: Map<string, object> = new Map<string, object>();
if (this.config.storage.storageType === 'azureStorage') {
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
azureFile: {
secretName: `${this.azureStorageSecretName}`,
shareName: `${this.azureStorageShare}`,
readonly: false
}
}, {
name: 'frameworkbarrier-volume',
emptyDir: {}
}]);
} else {
volumeSpecMap.set('nniVolumes', [
{
name: 'nni-vol',
nfs: {
server: `${this.config.storage.server}`,
path: `${this.config.storage.path}`
}
}, {
name: 'frameworkbarrier-volume',
emptyDir: {}
}]);
}
const containers: any = [
{
name: 'framework',
image: replicaImage,
command: ['sh', `${path.join(trialWorkingFolder, runScriptFile)}`],
volumeMounts: [
{
name: 'nni-vol',
mountPath: this.CONTAINER_MOUNT_PATH
}, {
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}],
resources: podResources,
ports: [{
containerPort: containerPort
}]
}];
const initContainers: any = [
{
name: 'frameworkbarrier',
image: 'frameworkcontroller/frameworkbarrier',
volumeMounts: [
{
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}]
}];
const spec: any = {
containers: containers,
initContainers: initContainers,
restartPolicy: 'OnFailure',
volumes: volumeSpecMap.get('nniVolumes'),
hostNetwork: false
};
if (privateRegistrySecretName) {
spec.imagePullSecrets = [
{
name: privateRegistrySecretName
}
]
}
if (this.config.serviceAccountName !== undefined) {
spec.serviceAccountName = this.config.serviceAccountName;
}
return {
pod: {
spec: spec
}
};
}
public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
environments.forEach(async (environment) => {
if (this.kubernetesCRDClient === undefined) {
throw new Error("kubernetesCRDClient undefined")
}
const kubeflowJobName: string = `nniexp${this.experimentId}env${environment.id}`.toLowerCase();
const kubernetesJobInfo = await this.kubernetesCRDClient.getKubernetesJob(kubeflowJobName);
if (kubernetesJobInfo.status && kubernetesJobInfo.status.state) {
const frameworkJobType: FrameworkControllerJobStatus = <FrameworkControllerJobStatus>kubernetesJobInfo.status.state;
/* eslint-disable require-atomic-updates */
switch (frameworkJobType) {
case 'AttemptCreationPending':
case 'AttemptCreationRequested':
case 'AttemptPreparing':
environment.setStatus('WAITING');
break;
case 'AttemptRunning':
environment.setStatus('RUNNING');
break;
case 'Completed': {
const completedJobType: FrameworkControllerJobCompleteStatus =
<FrameworkControllerJobCompleteStatus>kubernetesJobInfo.status.attemptStatus.completionStatus.type.name;
switch (completedJobType) {
case 'Succeeded':
environment.setStatus('SUCCEEDED');
break;
case 'Failed':
environment.setStatus('FAILED');
break;
default:
}
break;
}
default:
}
/* eslint-enable require-atomic-updates */
}
});
}
}
......@@ -81,13 +81,13 @@ export class KubeflowEnvironmentService extends KubernetesEnvironmentService {
}
const expFolder = `${this.CONTAINER_MOUNT_PATH}/nni/${this.experimentId}`;
environment.command = `cd ${expFolder} && ${environment.command} \
1>${expFolder}/envs/${environment.id}/trialrunner_stdout 2>${expFolder}/envs/${environment.id}/trialrunner_stderr`;
1>${expFolder}/envs/${environment.id}/trialrunner_stdout 2>${expFolder}/envs/${environment.id}/trialrunner_stderr`;
if (this.config.deprecated && this.config.deprecated.useActiveGpu !== undefined) {
environment.useActiveGpu = this.config.deprecated.useActiveGpu;
}
environment.maxTrialNumberPerGpu = this.config.maxTrialNumberPerGpu;
const kubeflowJobName: string = `nni-exp-${this.experimentId}-env-${environment.id}`.toLowerCase();
const kubeflowJobName: string = `nniexp${this.experimentId}env${environment.id}`.toLowerCase();
await fs.promises.writeFile(path.join(this.environmentLocalTempFolder, "run.sh"), environment.command, { encoding: 'utf8' });
......@@ -144,19 +144,6 @@ export class KubeflowEnvironmentService extends KubernetesEnvironmentService {
return Promise.resolve(kubeflowJobConfig);
}
public generatePodResource(memory: number, cpuNum: number, gpuNum: number): any {
const resources: any = {
memory: `${memory}Mi`,
cpu: `${cpuNum}`
};
if (gpuNum !== 0) {
resources['nvidia.com/gpu'] = `${gpuNum}`;
}
return resources;
}
/**
* Generate kubeflow resource config file
* @param kubeflowJobName job name
......
......@@ -208,7 +208,7 @@ export class KubernetesEnvironmentService extends EnvironmentService {
if (this.kubernetesCRDClient === undefined) {
throw new Error("kubernetesCRDClient undefined")
}
const kubeflowJobName: string = `nni-exp-${this.experimentId}-env-${environment.id}`.toLowerCase();
const kubeflowJobName: string = `nniexp${this.experimentId}env${environment.id}`.toLowerCase();
const kubernetesJobInfo = await this.kubernetesCRDClient.getKubernetesJob(kubeflowJobName);
if (kubernetesJobInfo.status && kubernetesJobInfo.status.conditions) {
const latestCondition: any = kubernetesJobInfo.status.conditions[kubernetesJobInfo.status.conditions.length - 1];
......@@ -255,4 +255,17 @@ export class KubernetesEnvironmentService extends EnvironmentService {
return Promise.reject(errorMessage);
}
}
public generatePodResource(memory: number, cpuNum: number, gpuNum: number): any {
const resources: any = {
memory: `${memory}Mi`,
cpu: `${cpuNum}`
};
if (gpuNum !== 0) {
resources['nvidia.com/gpu'] = `${gpuNum}`;
}
return resources;
}
}
......@@ -3,12 +3,13 @@
import { getLogger, Logger } from 'common/log';
import { MethodNotImplementedError } from 'common/errors';
import { ExperimentConfig, RemoteConfig, OpenpaiConfig, KubeflowConfig } from 'common/experimentConfig';
import { ExperimentConfig, RemoteConfig, OpenpaiConfig, KubeflowConfig, FrameworkControllerConfig } from 'common/experimentConfig';
import { TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric } from 'common/trainingService';
import { delay } from 'common/utils';
import { PAITrainingService } from '../pai/paiTrainingService';
import { RemoteMachineTrainingService } from '../remote_machine/remoteMachineTrainingService';
import { KubeflowTrainingService } from '../kubernetes/kubeflow/kubeflowTrainingService';
import { FrameworkControllerTrainingService } from '../kubernetes/frameworkcontroller/frameworkcontrollerTrainingService';
import { TrialDispatcher } from './trialDispatcher';
......@@ -30,6 +31,8 @@ class RouterTrainingService implements TrainingService {
instance.internalTrainingService = new PAITrainingService(config);
} else if (platform === 'kubeflow' && (<KubeflowConfig>config.trainingService).reuseMode === false) {
instance.internalTrainingService = new KubeflowTrainingService();
} else if (platform === 'frameworkcontroller' && (<FrameworkControllerConfig>config.trainingService).reuseMode === false) {
instance.internalTrainingService = new FrameworkControllerTrainingService();
} else {
instance.internalTrainingService = await TrialDispatcher.construct(config);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment