Unverified Commit b8e4918b authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Support distributed job for frameworkcontroller (#612)

support distributed job for frameworkcontroller
parent 043fb758
...@@ -47,12 +47,13 @@ import { FrameworkControllerJobInfoCollector } from './frameworkcontrollerJobInf ...@@ -47,12 +47,13 @@ import { FrameworkControllerJobInfoCollector } from './frameworkcontrollerJobInf
*/ */
@component.Singleton @component.Singleton
class FrameworkControllerTrainingService extends KubernetesTrainingService implements KubernetesTrainingService { class FrameworkControllerTrainingService extends KubernetesTrainingService implements KubernetesTrainingService {
private frameworkcontrollerTrialConfig?: FrameworkControllerTrialConfig; private fcTrialConfig?: FrameworkControllerTrialConfig; // frameworkcontroller trial configuration
private frameworkcontrollerJobInfoCollector: FrameworkControllerJobInfoCollector; private fcJobInfoCollector: FrameworkControllerJobInfoCollector; // frameworkcontroller job info collector
private fcContainerPortMap = new Map<string, number>(); // store frameworkcontroller container port
constructor() { constructor() {
super(); super();
this.frameworkcontrollerJobInfoCollector = new FrameworkControllerJobInfoCollector(this.trialJobsMap); this.fcJobInfoCollector = new FrameworkControllerJobInfoCollector(this.trialJobsMap);
this.experimentId = getExperimentId(); this.experimentId = getExperimentId();
this.nextTrialSequenceId = -1; this.nextTrialSequenceId = -1;
} }
...@@ -67,7 +68,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -67,7 +68,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
while (!this.stopping) { while (!this.stopping) {
// collect metrics for frameworkcontroller jobs by interacting with Kubernetes API server // collect metrics for frameworkcontroller jobs by interacting with Kubernetes API server
await delay(3000); await delay(3000);
await this.frameworkcontrollerJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient); await this.fcJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient);
} }
} }
...@@ -90,7 +91,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -90,7 +91,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId); const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId); const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
const frameworkcontrollerJobName = `nniexp${this.experimentId}trial${trialJobId}`.toLowerCase(); const frameworkcontrollerJobName = `nniexp${this.experimentId}trial${trialJobId}`.toLowerCase();
//Generate the port used for taskRole
this.generateContainerPort();
await this.prepareRunScript(trialLocalTempFolder, curTrialSequenceId, trialJobId, trialWorkingFolder, form); await this.prepareRunScript(trialLocalTempFolder, curTrialSequenceId, trialJobId, trialWorkingFolder, form);
//upload code files //upload code files
...@@ -157,22 +159,38 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -157,22 +159,38 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
return Promise.resolve(trialJobOutputUrl); return Promise.resolve(trialJobOutputUrl);
} }
/**
 * Generate a trial's command line for frameworkcontroller.
 *
 * The returned shell command consists of, in order:
 *   1. one `<taskRole.name>_port=<port>` variable assignment per task role of the trial
 *      config, with the port looked up in fcContainerPortMap;
 *   2. `. /mnt/frameworkbarrier/injector.sh`, i.e. the injector script is sourced;
 *   3. the user's command, chained with `&&` so it only runs if the previous step succeeds.
 *
 * @param command the user's command to execute after injector.sh
 * @returns the complete shell command line
 * @throws Error if the trial config is not initialized, or if a task role has no port
 *         recorded in fcContainerPortMap
 */
private generateCommandScript(command: string): string {
    if(!this.fcTrialConfig) {
        throw new Error('frameworkcontroller trial config is not initialized');
    }
    let portScript = '';
    for(const taskRole of this.fcTrialConfig.taskRoles) {
        const containerPort = this.fcContainerPortMap.get(taskRole.name);
        // Fail fast: interpolating a missing entry would silently emit "<name>_port=undefined"
        // into the generated shell script.
        if(containerPort === undefined) {
            throw new Error('Container port is not initialized');
        }
        portScript += `${taskRole.name}_port=${containerPort} `;
    }
    return `${portScript} . /mnt/frameworkbarrier/injector.sh && ${command}`;
}
private async prepareRunScript(trialLocalTempFolder: string, curTrialSequenceId: number, trialJobId: string, trialWorkingFolder: string, form: JobApplicationForm): Promise<void> { private async prepareRunScript(trialLocalTempFolder: string, curTrialSequenceId: number, trialJobId: string, trialWorkingFolder: string, form: JobApplicationForm): Promise<void> {
if(!this.frameworkcontrollerTrialConfig) { if(!this.fcTrialConfig) {
throw new Error('frameworkcontroller trial config is not initialized'); throw new Error('frameworkcontroller trial config is not initialized');
} }
await cpp.exec(`mkdir -p ${path.dirname(trialLocalTempFolder)}`); await cpp.exec(`mkdir -p ${path.dirname(trialLocalTempFolder)}`);
await cpp.exec(`cp -r ${this.frameworkcontrollerTrialConfig.codeDir} ${trialLocalTempFolder}`); await cpp.exec(`cp -r ${this.fcTrialConfig.codeDir} ${trialLocalTempFolder}`);
const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT; const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files // Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' }); await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
// Create tmp trial working folder locally. // Create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${trialLocalTempFolder}`); await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);
for(let taskRole of this.frameworkcontrollerTrialConfig.taskRoles) { for(let taskRole of this.fcTrialConfig.taskRoles) {
const runScriptContent: string = this.generateRunScript('frameworkcontroller', trialJobId, trialWorkingFolder, const runScriptContent: string = this.generateRunScript('frameworkcontroller', trialJobId, trialWorkingFolder,
taskRole.command, curTrialSequenceId.toString(), taskRole.name, taskRole.gpuNum); this.generateCommandScript(taskRole.command), curTrialSequenceId.toString(), taskRole.name, taskRole.gpuNum);
await fs.promises.writeFile(path.join(trialLocalTempFolder, `run_${taskRole.name}.sh`), runScriptContent, { encoding: 'utf8' }); await fs.promises.writeFile(path.join(trialLocalTempFolder, `run_${taskRole.name}.sh`), runScriptContent, { encoding: 'utf8' });
} }
...@@ -186,12 +204,12 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -186,12 +204,12 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
private async prepareFrameworkControllerConfig(trialJobId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string): Promise<any> { private async prepareFrameworkControllerConfig(trialJobId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string): Promise<any> {
if(!this.frameworkcontrollerTrialConfig) { if(!this.fcTrialConfig) {
throw new Error('frameworkcontroller trial config is not initialized'); throw new Error('frameworkcontroller trial config is not initialized');
} }
const podResources : any = []; const podResources : any = [];
for(let taskRole of this.frameworkcontrollerTrialConfig.taskRoles) { for(let taskRole of this.fcTrialConfig.taskRoles) {
let resource: any = {}; let resource: any = {};
resource.requests = this.generatePodResource(taskRole.memoryMB, taskRole.cpuNum, taskRole.gpuNum); resource.requests = this.generatePodResource(taskRole.memoryMB, taskRole.cpuNum, taskRole.gpuNum);
resource.limits = Object.assign({}, resource.requests); resource.limits = Object.assign({}, resource.requests);
...@@ -234,14 +252,14 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -234,14 +252,14 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
case TrialConfigMetadataKey.TRIAL_CONFIG: case TrialConfigMetadataKey.TRIAL_CONFIG:
let frameworkcontrollerTrialJsonObjsect = JSON.parse(value); let frameworkcontrollerTrialJsonObjsect = JSON.parse(value);
this.frameworkcontrollerTrialConfig = new FrameworkControllerTrialConfig( this.fcTrialConfig = new FrameworkControllerTrialConfig(
frameworkcontrollerTrialJsonObjsect.codeDir, frameworkcontrollerTrialJsonObjsect.codeDir,
frameworkcontrollerTrialJsonObjsect.taskRoles frameworkcontrollerTrialJsonObjsect.taskRoles
); );
// Validate to make sure codeDir doesn't have too many files // Validate to make sure codeDir doesn't have too many files
try { try {
await validateCodeDir(this.frameworkcontrollerTrialConfig.codeDir); await validateCodeDir(this.fcTrialConfig.codeDir);
} catch(error) { } catch(error) {
this.log.error(error); this.log.error(error);
return Promise.reject(new Error(error)); return Promise.reject(new Error(error));
...@@ -253,6 +271,18 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -253,6 +271,18 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
return Promise.resolve(); return Promise.resolve();
} }
/**
 * Assign a container port to every task role of the trial config.
 *
 * Ports are handed out sequentially, starting at 4000, in the order the task roles appear
 * in the trial config. Each port is stored in fcContainerPortMap under the task role's name,
 * overwriting any previous entry for that name.
 *
 * @throws Error if the trial config is not initialized
 */
private generateContainerPort(): void {
    if(!this.fcTrialConfig) {
        throw new Error('frameworkcontroller trial config is not initialized');
    }
    let port = 4000; //The default port used in container
    // for..of visits only the array elements; for..in would yield string indices and any
    // enumerable properties inherited by the array.
    for(const taskRole of this.fcTrialConfig.taskRoles) {
        this.fcContainerPortMap.set(taskRole.name, port);
        port += 1;
    }
}
/** /**
* Generate frameworkcontroller resource config file * Generate frameworkcontroller resource config file
...@@ -266,24 +296,29 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -266,24 +296,29 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
throw new Error('frameworkcontroller Cluster config is not initialized'); throw new Error('frameworkcontroller Cluster config is not initialized');
} }
if(!this.frameworkcontrollerTrialConfig) { if(!this.fcTrialConfig) {
throw new Error('frameworkcontroller trial config is not initialized'); throw new Error('frameworkcontroller trial config is not initialized');
} }
let taskRoles = []; let taskRoles = [];
for(let index in this.frameworkcontrollerTrialConfig.taskRoles) { for(let index in this.fcTrialConfig.taskRoles) {
let containerPort = this.fcContainerPortMap.get(this.fcTrialConfig.taskRoles[index].name);
if(!containerPort) {
throw new Error('Container port is not initialized');
}
let taskRole = this.generateTaskRoleConfig( let taskRole = this.generateTaskRoleConfig(
trialWorkingFolder, trialWorkingFolder,
this.frameworkcontrollerTrialConfig.taskRoles[index].image, this.fcTrialConfig.taskRoles[index].image,
`run_${this.frameworkcontrollerTrialConfig.taskRoles[index].name}.sh`, `run_${this.fcTrialConfig.taskRoles[index].name}.sh`,
podResources[index] podResources[index],
containerPort
); );
taskRoles.push({ taskRoles.push({
name: this.frameworkcontrollerTrialConfig.taskRoles[index].name, name: this.fcTrialConfig.taskRoles[index].name,
taskNumber: this.frameworkcontrollerTrialConfig.taskRoles[index].taskNum, taskNumber: this.fcTrialConfig.taskRoles[index].taskNum,
frameworkAttemptCompletionPolicy: { frameworkAttemptCompletionPolicy: {
minFailedTaskCount: this.frameworkcontrollerTrialConfig.taskRoles[index].frameworkAttemptCompletionPolicy.minFailedTaskCount, minFailedTaskCount: this.fcTrialConfig.taskRoles[index].frameworkAttemptCompletionPolicy.minFailedTaskCount,
minSucceededTaskCount: this.frameworkcontrollerTrialConfig.taskRoles[index].frameworkAttemptCompletionPolicy.minSucceededTaskCount minSucceededTaskCount: this.fcTrialConfig.taskRoles[index].frameworkAttemptCompletionPolicy.minSucceededTaskCount
}, },
task: taskRole task: taskRole
}); });
...@@ -308,12 +343,14 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -308,12 +343,14 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
}; };
} }
private generateTaskRoleConfig(trialWorkingFolder: string, replicaImage: string, runScriptFile: string, podResources: any): any {
private generateTaskRoleConfig(trialWorkingFolder: string, replicaImage: string, runScriptFile: string, podResources: any, containerPort: number): any {
if(!this.kubernetesClusterConfig) { if(!this.kubernetesClusterConfig) {
throw new Error('frameworkcontroller Cluster config is not initialized'); throw new Error('frameworkcontroller Cluster config is not initialized');
} }
if(!this.frameworkcontrollerTrialConfig) { if(!this.fcTrialConfig) {
throw new Error('frameworkcontroller trial config is not initialized'); throw new Error('frameworkcontroller trial config is not initialized');
} }
...@@ -327,6 +364,9 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -327,6 +364,9 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
shareName: `${this.azureStorageShare}`, shareName: `${this.azureStorageShare}`,
readonly: false readonly: false
} }
}, {
name: 'frameworkbarrier-volume',
emptyDir: {}
}]) }])
}else { }else {
let frameworkcontrollerClusterConfigNFS: KubernetesClusterConfigNFS = <KubernetesClusterConfigNFS> this.kubernetesClusterConfig; let frameworkcontrollerClusterConfigNFS: KubernetesClusterConfigNFS = <KubernetesClusterConfigNFS> this.kubernetesClusterConfig;
...@@ -337,9 +377,11 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -337,9 +377,11 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
server: `${frameworkcontrollerClusterConfigNFS.nfs.server}`, server: `${frameworkcontrollerClusterConfigNFS.nfs.server}`,
path: `${frameworkcontrollerClusterConfigNFS.nfs.path}` path: `${frameworkcontrollerClusterConfigNFS.nfs.path}`
} }
}, {
name: 'frameworkbarrier-volume',
emptyDir: {}
}]) }])
} }
let taskRole = { let taskRole = {
pod: { pod: {
spec: { spec: {
...@@ -347,16 +389,33 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple ...@@ -347,16 +389,33 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
{ {
name: 'framework', name: 'framework',
image: replicaImage, image: replicaImage,
args: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`], command: ["sh", `${path.join(trialWorkingFolder, runScriptFile)}`],
volumeMounts: [ volumeMounts: [
{ {
name: 'nni-vol', name: 'nni-vol',
mountPath: this.CONTAINER_MOUNT_PATH mountPath: this.CONTAINER_MOUNT_PATH
},{
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}], }],
resources: podResources resources: podResources,
ports: [{
containerPort: containerPort
}]
}],
initContainers: [
{
name: 'frameworkbarrier',
image: 'frameworkcontroller/frameworkbarrier',
volumeMounts: [
{
name: 'frameworkbarrier-volume',
mountPath: '/mnt/frameworkbarrier'
}]
}], }],
restartPolicy: 'OnFailure', restartPolicy: 'OnFailure',
volumes: volumeSpecMap.get('nniVolumes') volumes: volumeSpecMap.get('nniVolumes'),
hostNetwork: false
} }
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment