Unverified Commit 055885d9 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Merge dev-adl2 into Master (#3117)

parent 2c5d89a7
......@@ -42,6 +42,7 @@ interface TrialJobDetail {
readonly workingDirectory: string;
readonly form: TrialJobApplicationForm;
isEarlyStopped?: boolean;
message?: string;
}
/**
......
{
"apiVersion": "apiextensions.k8s.io/v1beta1",
"kind": "CustomResourceDefinition",
"metadata": {
"name": "adaptdljobs.adaptdl.petuum.com"
},
"spec": {
"group": "adaptdl.petuum.com",
"version": "v1",
"scope": "Namespaced",
"names": {
"plural": "adaptdljobs",
"singular": "adaptdljob",
"kind": "AdaptDLJob"
}
}
}
{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "<name>",
"ownerReferences": [
{
"apiVersion": "adaptdl.petuum.com/v1",
"kind": "AdaptDLJob",
"name": "<adaptdljob_name>",
"uid": "<adaptdljob_uid>"
}
]
},
"data": {
"run.sh": "<run_script>",
"cleanup.sh": "<clean_script>"
}
}
{
"apiVersion": "v1",
"kind": "PersistentVolumeClaim",
"metadata": {
"name": "<name>",
"ownerReferences": [
{
"apiVersion": "adaptdl.petuum.com/v1",
"kind": "AdaptDLJob",
"name": "<adaptdljob_name>",
"uid": "<adaptdljob_uid>"
}
]
},
"spec": {
"accessModes": [
"ReadWriteMany"
],
"resources": {
"requests": {
"storage": "<storage_size>"
}
},
"storageClassName": "<storage_class>",
"volumeMode": "Filesystem"
}
}
{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": "<name>",
"labels": {
"expId": "<exp_id>"
}
},
"spec": {
"selector": {
"matchLabels": {
"app": "<name>"
}
},
"replicas": 1,
"template": {
"metadata": {
"labels": {
"app": "<name>"
}
},
"spec": {
"containers": [
{
"command": ["tensorboard"],
"args": ["--host=0.0.0.0", "--logdir=/adaptdl/tensorboard", "--port=6006"],
"image": "tensorflow/tensorflow",
"name": "tensorboard",
"ports": [
{
"containerPort": 6006
}
],
"volumeMounts": [
{
"mountPath": "/adaptdl/tensorboard",
"name": "adaptdl-tensorboard-pvc",
"subPath": "adaptdl/tensorboard"
}
]
}
],
"volumes": [
{
"name": "adaptdl-tensorboard-pvc",
"persistentVolumeClaim": {
"claimName": "<adaptdl_tensorflow_pvc_name>"
}
}
]
}
}
}
}
\ No newline at end of file
{
"apiVersion": "v1",
"kind": "PersistentVolumeClaim",
"metadata": {
"name": "<name>",
"ownerReferences": [
{
"apiVersion": "apps/v1",
"kind": "Deployment",
"name": "<adaptdl_tensorboard_name>",
"uid": "<adaptdl_tensorboard_uid>"
}
]
},
"spec": {
"accessModes": [
"ReadWriteMany"
],
"resources": {
"requests": {
"storage": "<storage_size>"
}
},
"storageClassName": "<storage_class>",
"volumeMode": "Filesystem"
}
}
{
"apiVersion": "adaptdl.petuum.com/v1",
"kind": "AdaptDLJob",
"metadata": {
"name": "<name>",
"labels": {
"app": "<app_name>",
"expId": "<exp_id>",
"trialId": "<trial_id>"
}
},
"spec": {
"preemptible": false,
"template": {
"spec": {
"containers": [
{
"lifecycle":
{
"preStop":
{
"exec":
{
"command": ["/cleanup.sh"]
}
}
},
"command": ["/run.sh"],
"env": [
{
"name": "ADAPTDL_CHECKPOINT_PATH",
"value": "/adaptdl/checkpoint"
},
{
"name": "ADAPTDL_TENSORBOARD_LOGDIR",
"value": "/adaptdl/tensorboard"
},
{
"name": "ADAPTDL_SHARE_PATH",
"value": "/adaptdl/share"
}
],
"image": "<image>",
"imagePullPolicy": "Always",
"name": "main",
"resources": {
"requests": {
"memory": "<memorySize>",
"cpu": "<cpuNum>"
},
"limits": {
"nvidia.com/gpu": 1
}
},
"volumeMounts": [
{
"mountPath": "/adaptdl/checkpoint",
"name": "adaptdl-pvc",
"subPath": "adaptdl/checkpoint"
},
{
"mountPath": "/adaptdl/share",
"name": "adaptdl-pvc",
"subPath": "adaptdl/share"
},
{
"mountPath": "/adaptdl/tensorboard",
"name": "adaptdl-tensorboard-pvc",
"subPath": "adaptdl/tensorboard"
},
{
"mountPath": "/cleanup.sh",
"name": "adaptdl-nni-configmap",
"subPath": "cleanup.sh"
},
{
"mountPath": "/run.sh",
"name": "adaptdl-nni-configmap",
"subPath": "run.sh"
}
]
}
],
"imagePullSecrets": [],
"volumes": [
{
"name": "adaptdl-pvc",
"persistentVolumeClaim": {
"claimName": "<adaptdl_pvc_name>"
}
},
{
"name": "adaptdl-tensorboard-pvc",
"persistentVolumeClaim": {
"claimName": "<adaptdl_tensorflow_pvc_name>"
}
},
{
"name": "adaptdl-nni-configmap",
"configMap": {
"name": "<adaptdl_nni_configmap_name>",
"defaultMode": 511
}
}
]
}
}
}
}
......@@ -345,6 +345,14 @@ class NNIManager implements Manager {
return this.status;
}
/**
 * Look up the latest status message recorded for a trial job.
 *
 * @param trialJobId id of the trial job to look up
 * @returns the message stored on the tracked trial job, or undefined when the
 *          job is not tracked or has no message
 */
public getTrialJobMessage(trialJobId: string): string | undefined {
    const trackedJob = this.trialJobs.get(trialJobId);
    return trackedJob === undefined ? undefined : trackedJob.message;
}
public async listTrialJobs(status?: TrialJobStatus): Promise<TrialJobInfo[]> {
return this.dataStore.listTrialJobs(status);
}
......@@ -501,6 +509,10 @@ class NNIManager implements Manager {
this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail));
await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail);
}
const newTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
if (newTrialJobDetail !== undefined) {
newTrialJobDetail.message = trialJobDetail.message;
}
let hyperParams: string | undefined = undefined;
switch (trialJobDetail.status) {
case 'SUCCEEDED':
......
......@@ -19,6 +19,7 @@ import { NNIManager } from './core/nnimanager';
import { SqlDB } from './core/sqlDatabase';
import { NNIRestServer } from './rest_server/nniRestServer';
import { FrameworkControllerTrainingService } from './training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService';
import { AdlTrainingService } from './training_service/kubernetes/adl/adlTrainingService';
import { KubeflowTrainingService } from './training_service/kubernetes/kubeflow/kubeflowTrainingService';
import { LocalTrainingService } from './training_service/local/localTrainingService';
import { RouterTrainingService } from './training_service/reusable/routerTrainingService';
......@@ -34,7 +35,11 @@ function initStartupInfo(
}
async function initContainer(foreground: boolean, platformMode: string, logFileName?: string): Promise<void> {
if (platformMode === 'local') {
if (platformMode === 'adl') {
Container.bind(TrainingService)
.to(AdlTrainingService)
.scope(Scope.Singleton);
} else if (platformMode === 'local') {
Container.bind(TrainingService)
.to(LocalTrainingService)
.scope(Scope.Singleton);
......@@ -94,7 +99,7 @@ async function initContainer(foreground: boolean, platformMode: string, logFileN
/**
 * Print the command-line usage of the NNI manager entry point.
 *
 * The mode list mirrors the values accepted by the `--mode` validation in this
 * file (including `adl` and `dlts`). The message is built by concatenation so
 * that source indentation can never leak into the printed text.
 */
function usage(): void {
    console.info(
        'usage: node main.js --port <port> --mode ' +
        '<adl/local/remote/pai/kubeflow/frameworkcontroller/paiYarn/dlts/aml> ' +
        '--start_mode <new/resume> --experiment_id <id> --foreground <true/false>'
    );
}
const strPort: string = parseArg(['--port', '-p']);
......@@ -114,7 +119,7 @@ const foreground: boolean = foregroundArg.toLowerCase() === 'true' ? true : fals
const port: number = parseInt(strPort, 10);
const mode: string = parseArg(['--mode', '-m']);
if (!['local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts', 'aml'].includes(mode)) {
if (!['adl', 'local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts', 'aml'].includes(mode)) {
console.log(`FATAL: unknown mode: ${mode}`);
usage();
process.exit(1);
......
......@@ -15,12 +15,13 @@ import { ExperimentProfile, Manager, TrialJobStatistics } from '../common/manage
import { ValidationSchemas } from './restValidationSchemas';
import { NNIRestServer } from './nniRestServer';
import { getVersion } from '../common/utils';
import { NNIManager } from "../core/nnimanager";
const expressJoi = require('express-joi-validator');
class NNIRestHandler {
private restServer: NNIRestServer;
private nniManager: Manager;
private nniManager: NNIManager;
private log: Logger;
constructor(rs: NNIRestServer) {
......@@ -209,6 +210,7 @@ class NNIRestHandler {
this.nniManager.listTrialJobs(req.query.status).then((jobInfos: TrialJobInfo[]) => {
jobInfos.forEach((trialJob: TrialJobInfo) => {
this.setErrorPathForFailedJob(trialJob);
this.setMessageforJob(trialJob);
});
res.send(jobInfos);
}).catch((err: Error) => {
......@@ -221,6 +223,7 @@ class NNIRestHandler {
router.get('/trial-jobs/:id', (req: Request, res: Response) => {
this.nniManager.getTrialJob(req.params.id).then((jobDetail: TrialJobInfo) => {
const jobInfo: TrialJobInfo = this.setErrorPathForFailedJob(jobDetail);
this.setMessageforJob(jobInfo);
res.send(jobInfo);
}).catch((err: Error) => {
this.handleError(err, res);
......@@ -311,6 +314,14 @@ class NNIRestHandler {
return jobInfo;
}
/**
 * Attach the manager's current status message to a trial job info object.
 *
 * The object is mutated in place and also returned. An undefined input is
 * passed through untouched.
 *
 * @param jobInfo trial job info about to be sent to the client
 * @returns the same object that was passed in
 */
private setMessageforJob(jobInfo: TrialJobInfo): TrialJobInfo {
    if (jobInfo !== undefined) {
        jobInfo.message = this.nniManager.getTrialJobMessage(jobInfo.trialJobId);
    }
    return jobInfo;
}
}
export function createRestHandler(rs: NNIRestServer): Router {
......
......@@ -32,6 +32,9 @@ export namespace ValidationSchemas {
outputDir: joi.string(),
cpuNum: joi.number().min(1),
memoryMB: joi.number().min(100),
// ############## adl cpu and memory config ###############
memorySize: joi.string(),
// ########################################################
gpuNum: joi.number().min(0),
command: joi.string().min(1),
virtualCluster: joi.string(),
......@@ -93,6 +96,20 @@ export namespace ValidationSchemas {
minFailedTaskCount: joi.number(),
minSucceededTaskCount: joi.number()
})
}),
imagePullSecrets: joi.array({
name: joi.string().min(1).required()
}),
// ############## adl ###############
adaptive: joi.boolean(),
checkpoint: joi.object({
storageClass: joi.string().min(1).required(),
storageSize: joi.string().min(1).required()
}),
nfs: joi.object({
server: joi.string().min(1).required(),
path: joi.string().min(1).required(),
containerMountPath: joi.string().min(1).required()
})
}),
pai_yarn_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
......
......@@ -110,6 +110,11 @@ export class MockedNNIManager extends Manager {
return deferred.promise;
}
/**
 * Mocked message lookup: every trial job id yields the same canned message.
 *
 * @param trialJobId ignored by the mock
 * @returns the fixed string "TEST-MESSAGE"
 */
public getTrialJobMessage(trialJobId: string): string | undefined {
    const cannedMessage: string = "TEST-MESSAGE";
    return cannedMessage;
}
public stopExperiment(): Promise<void> {
throw new MethodNotImplementedError();
}
......
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as fs from 'fs';
import { GeneralK8sClient, KubernetesCRDClient } from '../kubernetesApiClient';
/**
 * Kubernetes CRD client for AdaptDL jobs (API group `adaptdl.petuum.com`, version `v1`).
 */
class AdlClientV1 extends KubernetesCRDClient {
    /**
     * Load the AdaptDL CRD definition from disk and register it with the
     * underlying Kubernetes client.
     */
    public constructor() {
        super();
        const crdText: string = fs.readFileSync('./config/adl/adaptdl-crd-v1.json', 'utf8');
        this.crdSchema = JSON.parse(crdText);
        this.client.addCustomResourceDefinition(this.crdSchema);
    }

    /** Accessor for the `adaptdljobs` resource in the `default` namespace. */
    protected get operator(): any {
        return this.client
            .apis['adaptdl.petuum.com']
            .v1
            .namespaces('default')
            .adaptdljobs;
    }

    /** Name of the trial container inside the job's pod template. */
    public get containerName(): string {
        return 'main';
    }

    /**
     * List the pods in the `default` namespace carrying the label
     * `adaptdl/job=<jobName>`.
     *
     * @param jobName name of the AdaptDL job whose pods are wanted
     * @returns the response body on a 2xx status; otherwise the promise is
     *          rejected with a message string containing the status code
     */
    public async getKubernetesPods(jobName: string): Promise<any> {
        const podQuery = { qs: { labelSelector: `adaptdl/job=${jobName}` } };
        const response = await this.client.api.v1.namespaces('default').pods
            .get(podQuery);
        const succeeded = response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299);
        if (!succeeded) {
            return Promise.reject(`AdlClient getKubernetesPods failed, statusCode is ${response.statusCode}`);
        }
        return Promise.resolve(response.body);
    }
}
/**
 * Factory for the Adl CRD client.
 */
class AdlClientFactory {
    /**
     * Build the CRD client used to operate AdaptDL jobs.
     *
     * @returns a new AdlClientV1 instance
     */
    public static createClient(): KubernetesCRDClient {
        const adlClient: KubernetesCRDClient = new AdlClientV1();
        return adlClient;
    }
}
export { AdlClientFactory, GeneralK8sClient };
export { AdlClientV1 }
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import {KubernetesTrialConfig} from "../kubernetesConfig";
/**
 * Checkpoint storage settings: the storage class and size used for the
 * persistent volume claims.
 */
export class CheckpointConfig {
    constructor(
        public readonly storageClass: string,
        public readonly storageSize: string
    ) {}
}
/**
 * Reference, by name, to an image pull secret.
 */
export class ImagePullSecretConfig {
    constructor(public readonly name: string) {}
}
/**
 * NFS share settings: the server, the exported path and where to mount it
 * inside the container.
 */
export class NFSConfig {
    constructor(
        public readonly server: string,
        public readonly path: string,
        public readonly containerMountPath: string
    ) {}
}
/**
 * Trial job configuration for Adl.
 *
 * Every field except the code directory (handled by the base class) is
 * declared as a constructor parameter property and assigned in parameter order.
 */
export class AdlTrialConfig extends KubernetesTrialConfig {
    constructor(
        codeDir: string,
        public readonly command: string,
        public readonly gpuNum: number,
        public readonly image: string,
        public readonly imagePullSecrets?: ImagePullSecretConfig[],
        public readonly nfs?: NFSConfig,
        public readonly checkpoint?: CheckpointConfig,
        public readonly cpuNum?: number,
        public readonly memorySize?: string,
        // adaptive == preemptible
        public readonly adaptive?: boolean
    ) {
        super(codeDir);
    }
}
/** Phase values of an AdaptDL job that the job info collector distinguishes. */
export type AdlJobStatus =
    | "Pending"
    | "Running"
    | "Starting"
    | "Stopping"
    | "Failed"
    | "Succeeded";
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import { AdlClientV1 } from './adlApiClient';
import { KubernetesTrialJobDetail} from '../kubernetesData';
import { KubernetesJobInfoCollector } from '../kubernetesJobInfoCollector';
import { AdlJobStatus } from './adlConfig';
/**
 * Collects Adl job info from the Kubernetes cluster and updates the locally
 * tracked trial job (status, message, start/end time) accordingly.
 */
export class AdlJobInfoCollector extends KubernetesJobInfoCollector {
    constructor(jobMap: Map<string, KubernetesTrialJobDetail>) {
        super(jobMap);
    }

    /**
     * Refresh a single trial job from the state of its AdaptDL job and pods.
     *
     * @param kubernetesCRDClient client used to query the job and its pods
     * @param kubernetesTrialJob locally tracked trial job, mutated in place
     * @returns a promise that resolves once the local record is updated; it is
     *          rejected (with a string) only when no client is supplied.
     *          Failures while querying the cluster are logged and swallowed.
     */
    protected async retrieveSingleTrialJobInfo(kubernetesCRDClient: AdlClientV1 | undefined,
                                               kubernetesTrialJob: KubernetesTrialJobDetail): Promise<void> {
        if (!this.statusesNeedToCheck.includes(kubernetesTrialJob.status)) {
            return Promise.resolve();
        }

        if (kubernetesCRDClient === undefined) {
            return Promise.reject('kubernetesCRDClient is undefined');
        }

        let kubernetesJobInfo: any;
        let kubernetesPodsInfo: any;
        try {
            kubernetesJobInfo = await kubernetesCRDClient.getKubernetesJob(kubernetesTrialJob.kubernetesJobName);
            kubernetesPodsInfo = await kubernetesCRDClient.getKubernetesPods(kubernetesTrialJob.kubernetesJobName);
        } catch (error) {
            // Notice: it may not be a 'real' error since cancelling a trial job can also make getKubernetesJob fail.
            this.log.error(`Get job ${kubernetesTrialJob.kubernetesJobName} info failed, error is ${error}`);
            // This is not treated as an error status
            return Promise.resolve();
        }

        if (!kubernetesJobInfo.status) {
            return Promise.resolve();
        }

        // Tolerate a pod list response without an `items` array.
        const pods: any[] = (kubernetesPodsInfo !== undefined && kubernetesPodsInfo !== null
            && Array.isArray(kubernetesPodsInfo.items)) ? kubernetesPodsInfo.items : [];
        const logHint: string = `Use 'nnictl log trial --trial_id ${kubernetesTrialJob.id}'`;

        /* eslint-disable require-atomic-updates */
        const phase: AdlJobStatus = kubernetesJobInfo.status.phase as AdlJobStatus;
        switch (phase) {
            case 'Pending':
            case 'Starting':
                kubernetesTrialJob.status = 'WAITING';
                if (pods.length > 0) {
                    // An image that cannot be pulled will never start: fail the trial right away.
                    if (this.isImagePullFailure(pods[0])) {
                        kubernetesTrialJob.status = 'FAILED';
                    }
                    kubernetesTrialJob.message = pods
                        .map((pod: any) => JSON.stringify(pod.status.containerStatuses))
                        .join('\n');
                }
                kubernetesTrialJob.startTime = Date.parse(<string>kubernetesJobInfo.metadata.creationTimestamp);
                break;
            case 'Running':
            case 'Stopping':
                kubernetesTrialJob.status = 'RUNNING';
                kubernetesTrialJob.message = `${logHint} to check the log stream.`;
                if (kubernetesTrialJob.startTime === undefined) {
                    kubernetesTrialJob.startTime = Date.parse(<string>kubernetesJobInfo.metadata.creationTimestamp);
                }
                break;
            case 'Failed': {
                kubernetesTrialJob.status = 'FAILED';
                const jobMessage: string | undefined = kubernetesJobInfo.status.message;
                if (pods.length > 0) {
                    const collectedLogsHint: string = `${logHint} for the path of the collected logs.`;
                    // Only prefix the job's own message when there is one, so the
                    // user never sees a literal "undefined ; ..." text.
                    kubernetesTrialJob.message = jobMessage
                        ? `${jobMessage} ; ${collectedLogsHint}`
                        : collectedLogsHint;
                } else {
                    kubernetesTrialJob.message = jobMessage;
                }
                // undefined => NaN as endTime here
                kubernetesTrialJob.endTime = Date.parse(<string>kubernetesJobInfo.status.completionTimestamp);
                break;
            }
            case 'Succeeded':
                kubernetesTrialJob.status = 'SUCCEEDED';
                kubernetesTrialJob.endTime = Date.parse(<string>kubernetesJobInfo.status.completionTimestamp);
                kubernetesTrialJob.message = `Succeeded at ${kubernetesJobInfo.status.completionTimestamp}`;
                break;
            default:
        }
        /* eslint-enable require-atomic-updates */

        return Promise.resolve();
    }

    /**
     * Tell whether the first container of a pod is waiting because its image
     * cannot be pulled (reason `ImagePullBackOff` or `ErrImagePull`).
     * Missing or empty container statuses count as "no failure".
     */
    private isImagePullFailure(pod: any): boolean {
        const containerStatuses: any = pod.status.containerStatuses;
        if (!Array.isArray(containerStatuses) || containerStatuses.length === 0) {
            return false;
        }
        const currState: any = containerStatuses[0].state;
        if (currState === undefined || currState === null
            || currState.waiting === undefined || currState.waiting === null) {
            return false;
        }
        const reason: string = currState.waiting.reason;
        return reason === "ImagePullBackOff" || reason === "ErrImagePull";
    }
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as component from '../../../common/component';
import { KubernetesJobRestServer } from '../kubernetesJobRestServer';
import { AdlTrainingService } from './adlTrainingService';
/**
* Adl Training service Rest server, provides rest API to support adl job metrics update.
*
* The class adds no behaviour of its own: everything comes from KubernetesJobRestServer.
*/
@component.Singleton
export class AdlJobRestServer extends KubernetesJobRestServer {
/**
* Construct the rest server, handing the AdlTrainingService instance obtained
* from the component container to the base class.
*/
constructor() {
super(component.get(AdlTrainingService));
}
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as fs from 'fs';
import * as component from '../../../common/component';
import { String } from 'typescript-string-operations';
import { getExperimentId } from '../../../common/experimentStartupInfo';
import {
NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail, TrialJobStatus
} from '../../../common/trainingService';
import { delay, generateParamFileName, getVersion, uniqueString } from '../../../common/utils';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { KubernetesTrialJobDetail } from '../kubernetesData';
import { KubernetesTrainingService } from '../kubernetesTrainingService';
import { AdlClientFactory } from './adlApiClient'
import { AdlJobInfoCollector } from './adlJobInfoCollector';
import { AdlJobRestServer } from './adlJobRestServer';
import { AdlTrialConfig } from './adlConfig'
/**
* Training Service implementation for Adl.
*
* Loads the Kubernetes manifest templates from ./config/adl/, launches one
* tensorboard deployment per experiment in run(), and for every submitted trial
* creates an AdaptDL job together with a PVC and a ConfigMap (holding run.sh and
* cleanup.sh) that are owned by that job.
*/
@component.Singleton
class AdlTrainingService extends KubernetesTrainingService implements KubernetesTrainingService {
// Trial configuration; assigned by setClusterMetadata when the TRIAL_CONFIG key is received.
private adlTrialConfig?: AdlTrialConfig;
private readonly adlJobInfoCollector: AdlJobInfoCollector;
// Raw JSON text of the per-trial manifests; parsed afresh for every trial in submitTrialJob.
private configmapTemplateStr: string;
private jobTemplateStr: string;
private pvcTemplateStr: string;
// Tensorboard manifests; parsed once in the constructor and filled in by launchTensorboard.
private tensorboardPvcTemplate: any;
private tensorboardDeploymentTemplate: any;
//TODO: change the logic here when we want to support multiple tensorboard
private tensorboardName: string = "adaptdl-tensorboard-" + getExperimentId().toLowerCase();
/**
* Create the job info collector and the CRD client, and synchronously read the
* five manifest templates from ./config/adl/ (paths are relative to the
* process working directory).
*/
constructor() {
super();
this.adlJobInfoCollector = new AdlJobInfoCollector(this.trialJobsMap);
this.experimentId = getExperimentId();
this.kubernetesCRDClient = AdlClientFactory.createClient();
this.configmapTemplateStr = fs.readFileSync(
'./config/adl/adaptdl-nni-configmap-template.json', 'utf8');
this.jobTemplateStr = fs.readFileSync('./config/adl/adaptdljob-template.json', 'utf8');
this.pvcTemplateStr = fs.readFileSync('./config/adl/adaptdl-pvc-template.json', 'utf8');
this.tensorboardPvcTemplate = JSON.parse(
fs.readFileSync('./config/adl/adaptdl-tensorboard-pvc-template.json', 'utf8'));
this.tensorboardDeploymentTemplate = JSON.parse(
fs.readFileSync('./config/adl/adaptdl-tensorboard-deployment-template.json', 'utf8'));
this.log.info('Construct Adl training service.');
}
/**
* Main loop of the training service: launch tensorboard, start the job rest
* server, then refresh trial statuses every 3 seconds until `this.stopping`
* becomes true.
*
* Throws when the rest server cannot be obtained, when launchTensorboard
* throws, or when the rest server reports an error message.
*/
public async run(): Promise<void> {
this.log.info(this.tensorboardName);
this.log.info('Start tensorboard deployment.');
await this.launchTensorboard()
this.log.info('Run Adl training service.');
this.kubernetesJobRestServer = component.get(AdlJobRestServer);
if (this.kubernetesJobRestServer === undefined) {
throw new Error('kubernetesJobRestServer not initialized!');
}
await this.kubernetesJobRestServer.start();
this.kubernetesJobRestServer.setEnableVersionCheck = this.versionCheck;
this.log.info(`Adl Training service rest server listening on: ${this.kubernetesJobRestServer.endPoint}`);
while (!this.stopping) {
// collect metrics for Adl jobs by interacting with Kubernetes API server
await delay(3000);
await this.adlJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient);
if (this.kubernetesJobRestServer.getErrorMessage !== undefined) {
throw new Error(this.kubernetesJobRestServer.getErrorMessage);
}
}
this.log.info('Adl training service exit.');
}
/**
* Create the experiment-wide tensorboard Deployment and the PVC it mounts.
*
* The Deployment is created first because the PVC's ownerReference needs the
* Deployment's uid. Both resources are named `this.tensorboardName`.
* Requires the trial config to have been set; throws otherwise.
*/
private async launchTensorboard(): Promise<void> {
// Start the tensorboard at the beginning of the experiment.
if (this.adlTrialConfig === undefined) {
throw new Error('Adl trial config is undefined');
}
// Create tensorboard deployment
this.tensorboardDeploymentTemplate.metadata.name = this.tensorboardName
this.tensorboardDeploymentTemplate.metadata.labels.expId = this.experimentId
this.tensorboardDeploymentTemplate.spec.selector.matchLabels.app = this.tensorboardName
this.tensorboardDeploymentTemplate.spec.template.metadata.labels.app = this.tensorboardName
this.tensorboardDeploymentTemplate.spec.template.spec.volumes[0]
.persistentVolumeClaim.claimName = this.tensorboardName
const deploymentUid: string = await this.genericK8sClient.createDeployment(this.tensorboardDeploymentTemplate);
// Create pvc
this.tensorboardPvcTemplate.metadata.name = this.tensorboardName;
this.tensorboardPvcTemplate.metadata.ownerReferences[0].name = this.tensorboardName;
this.tensorboardPvcTemplate.metadata.ownerReferences[0].uid = deploymentUid
// Use the configured checkpoint storage when present; otherwise fall back to
// 1Gi on a storage class chosen by genericK8sClient.getStorageClass().
if (this.adlTrialConfig.checkpoint != undefined) {
this.tensorboardPvcTemplate.spec.resources.requests.storage = this.adlTrialConfig.checkpoint.storageSize;
this.tensorboardPvcTemplate.spec.storageClassName = this.adlTrialConfig.checkpoint.storageClass;
}
else {
this.tensorboardPvcTemplate.spec.resources.requests.storage = "1Gi"
this.tensorboardPvcTemplate.spec.storageClassName = await this.genericK8sClient.getStorageClass();
}
await this.genericK8sClient.createPersistentVolumeClaim(this.tensorboardPvcTemplate);
return Promise.resolve()
}
/**
* Submit one trial as an AdaptDL job.
*
* Order of operations: build the job manifest from the template, create the
* job, read it back to learn its uid, then create the PVC and the ConfigMap
* (run.sh / cleanup.sh) with ownerReferences pointing at that job. The PVC and
* ConfigMap share the job's name, which is what volumes[0] and volumes[2] of
* the job manifest refer to. The trial is registered in trialJobsMap only
* after every resource has been created.
*
* @param form trial application form (sequence id and hyper parameters)
* @returns the detail record of the newly created trial job
*/
public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
if (this.kubernetesCRDClient === undefined) {
throw new Error('Adl job operator client is undefined');
}
if (this.adlTrialConfig === undefined) {
throw new Error('Adl trial config is undefined');
}
if (this.kubernetesRestServerPort === undefined) {
const restServer: AdlJobRestServer = component.get(AdlJobRestServer);
this.kubernetesRestServerPort = restServer.clusterRestServerPort;
}
const trialJobId: string = uniqueString(5);
const adlJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const initStatus: TrialJobStatus = 'WAITING';
const codeDir = this.adlTrialConfig.codeDir;
const outputDir = "output"
const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
trialJobId,
initStatus,
Date.now(),
codeDir,
form,
adlJobName,
outputDir
);
// Create adljob
const job: any = JSON.parse(this.jobTemplateStr);
job.metadata.name = adlJobName
job.metadata.labels.app = this.NNI_KUBERNETES_TRIAL_LABEL
job.metadata.labels.expId = this.experimentId
job.metadata.labels.trialId = trialJobId
// `adaptive` in the trial config maps onto the job's `preemptible` flag.
if (this.adlTrialConfig.adaptive !== undefined){
job.spec.preemptible = this.adlTrialConfig.adaptive
}
job.spec.template.spec.containers[0]
.image = this.adlTrialConfig.image;
job.spec.template.spec.volumes[0]
.persistentVolumeClaim.claimName = adlJobName
job.spec.template.spec.volumes[1]
.persistentVolumeClaim.claimName = this.tensorboardName
job.spec.template.spec.volumes[2]
.configMap.name = adlJobName
// Handle Pod Resource
// Defaults used when the trial config gives no cpuNum / memorySize.
let cpu: number = 1;
let memory: string = "1Gi";
if (this.adlTrialConfig.cpuNum !== undefined) {
cpu = this.adlTrialConfig.cpuNum;
}
if (this.adlTrialConfig.memorySize !== undefined) {
memory = this.adlTrialConfig.memorySize;
}
job.spec.template.spec.containers[0]
.resources.requests.memory = memory;
job.spec.template.spec.containers[0]
.resources.requests.cpu = cpu;
job.spec.template.spec.containers[0]
.resources.limits["nvidia.com/gpu"] = this.adlTrialConfig.gpuNum;
// Handle imagePullSecrets
if (this.adlTrialConfig.imagePullSecrets !== undefined) {
job.spec.template.spec.imagePullSecrets = job.spec.template.spec
.imagePullSecrets.concat(this.adlTrialConfig.imagePullSecrets);
}
// Handle NFS
if (this.adlTrialConfig.nfs !== undefined) {
job.spec.template.spec.volumes.push({
"name": "nfs",
"nfs": {
"server": this.adlTrialConfig.nfs.server,
"path": this.adlTrialConfig.nfs.path,
"readOnly": false
}
});
job.spec.template.spec.containers[0].volumeMounts.push({
"name": "nfs",
"mountPath": this.adlTrialConfig.nfs.containerMountPath
});
}
await this.kubernetesCRDClient.createKubernetesJob(job);
// Read the job back: its uid is needed for the ownerReferences below.
const k8sadlJob: any = await this.kubernetesCRDClient.getKubernetesJob(adlJobName);
// Create pvc
const pvc: any = JSON.parse(this.pvcTemplateStr);
pvc.metadata.name = adlJobName;
pvc.metadata.ownerReferences[0].name = adlJobName;
pvc.metadata.ownerReferences[0].uid = k8sadlJob.metadata.uid;
if (this.adlTrialConfig.checkpoint != undefined) {
pvc.spec.resources.requests.storage = this.adlTrialConfig
.checkpoint.storageSize;
pvc.spec.storageClassName = this.adlTrialConfig.checkpoint.storageClass;
}
else {
pvc.spec.resources.requests.storage = "1Gi"
pvc.spec.storageClassName = await this.genericK8sClient.getStorageClass();
}
await this.genericK8sClient.createPersistentVolumeClaim(pvc);
// prepare the runscript and convert it to configmap and mount it
const configmap: any = JSON.parse(this.configmapTemplateStr);
configmap.metadata.name = adlJobName;
configmap.metadata.ownerReferences[0].name = adlJobName;
configmap.metadata.ownerReferences[0].uid = k8sadlJob.metadata.uid;
configmap.data["run.sh"] = await this.prepareRunScript(
trialJobId, form, codeDir, outputDir)
// cleanup.sh sends `kill -2` to the processes matching the trial_keeper command
// line and then polls once per second, exiting when the match count is 1.
// NOTE(review): presumably the single remaining match is the grep itself - confirm.
const cleanupScriptTemplate: string =
`#!/bin/bash
ps aux | grep "python3 -m nni_trial_tool.trial_keeper" | awk '{print $2}' | xargs kill -2
while true;
do
proc=\`ps aux | grep "python3 -m nni_trial_tool.trial_keeper" | awk '{print $2}' | grep "" -c\`
if (( $proc == 1 )); then
exit 0
else
echo "waiting"
fi
sleep 1
done
`;
configmap.data["cleanup.sh"] = cleanupScriptTemplate
await this.genericK8sClient.createConfigMap(configmap)
// Set trial job detail until create Adl job successfully
this.trialJobsMap.set(trialJobId, trialJobDetail);
return Promise.resolve(trialJobDetail);
}
/**
* Render the run.sh script for one trial.
*
* Placeholders passed to String.Format: {0} codeDir (used for both NNI_SYS_DIR
* and NNI_CODE_DIR), {1} outputDir, {2} jobId, {3} experiment id, {4} sequence
* id, {5} optional CUDA_VISIBLE_DEVICES line, {6} hyper parameter value,
* {7} hyper parameter file name, {8} trial command, {9} NNI manager ip,
* {10} rest server port, {11} NNI manager version, {12} log collection mode.
*
* Throws when the trial config, the rest server port or the NNI manager ip
* config has not been set.
*/
private async prepareRunScript(jobId: string,
form: TrialJobApplicationForm,
codeDir: string,
outputDir: string): Promise<string> {
if (this.adlTrialConfig === undefined) {
throw new Error('Adl trial config is undefined');
}
if (this.kubernetesRestServerPort === undefined) {
throw new Error('Adl rest server port is undefined');
}
if (this.nniManagerIpConfig === undefined) {
throw new Error('Adl nniManager ip config is undefined');
}
const expId: string = this.experimentId;
const seqId: string = form.sequenceId.toString();
const command: string = this.adlTrialConfig.command;
const hyperParameters: string = form.hyperParameters.value;
const hyperParametersFile: string = generateParamFileName(form.hyperParameters);
const nniManagerPort: string = this.kubernetesRestServerPort.toString();
const nniManagerIp: string = this.nniManagerIpConfig.nniManagerIp;
// The version is only looked up (and passed to the trial keeper) when version check is enabled.
let nniManagerVersion: string = '';
if (this.versionCheck) {
nniManagerVersion = await getVersion();
}
// With zero GPUs requested, export an empty CUDA_VISIBLE_DEVICES in the script.
let nvidiaScript: string = '';
if (this.adlTrialConfig.gpuNum == 0) {
nvidiaScript = 'export CUDA_VISIBLE_DEVICES=';
}
// Inside a template literal a backslash followed by a line break is a line
// continuation, so the trial_keeper command below ends up on one script line.
const runScriptTemplate: string =
`#!/bin/bash
export NNI_PLATFORM=adl
export MULTI_PHASE=false
export NNI_SYS_DIR={0}
export NNI_CODE_DIR={0}
export NNI_OUTPUT_DIR={1}
export NNI_TRIAL_JOB_ID={2}
export NNI_EXP_ID={3}
export NNI_TRIAL_SEQ_ID={4}
mkdir -p $NNI_OUTPUT_DIR
{5}
echo '{6}' > $NNI_CODE_DIR/{7}
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' \
--nnimanager_ip {9} --nnimanager_port {10} \
--nni_manager_version '{11}' --log_collection '{12}'
`;
const runScript = String.Format(
runScriptTemplate, codeDir, outputDir,
jobId, expId, seqId, nvidiaScript,
hyperParameters, hyperParametersFile, command,
nniManagerIp, nniManagerPort, nniManagerVersion,
this.logCollection);
return Promise.resolve(runScript);
}
/**
* Store one piece of cluster metadata. Handles the NNI manager ip, the trial
* config (both JSON-decoded), the version-check flag and the log collection
* mode; any other key is ignored. Key and raw value are written to the log.
*/
public async setClusterMetadata(key: string, value: string): Promise<void> {
this.log.info('SetCluster ' + key + ', ' +value);
switch (key) {
case TrialConfigMetadataKey.NNI_MANAGER_IP:
this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
// The parsed JSON is only cast to AdlTrialConfig; it is not validated here.
this.adlTrialConfig = <AdlTrialConfig>JSON.parse(value);
break;
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default:
}
return Promise.resolve();
}
/**
* Return previously stored cluster metadata as a JSON string. Only the trial
* config and the NNI manager ip can be read back; the promise is rejected
* (with a string) for a value that is not set yet and for any other key.
*/
public getClusterMetadata(key: string): Promise<string> {
let result: string;
switch (key) {
case TrialConfigMetadataKey.TRIAL_CONFIG:
if (this.adlTrialConfig === undefined) {
return Promise.reject(`${key} is not set yet`);
}
result = JSON.stringify(this.adlTrialConfig);
break;
case TrialConfigMetadataKey.NNI_MANAGER_IP:
if (this.nniManagerIpConfig === undefined) {
return Promise.reject(`${key} is not set yet`);
}
result = JSON.stringify(this.nniManagerIpConfig);
break;
default:
return Promise.reject(`${key} not set`);
}
return Promise.resolve(result);
}
}
export { AdlTrainingService };
......@@ -19,6 +19,94 @@ class GeneralK8sClient {
this.client.loadSpec();
}
/**
 * Pick the first StorageClass whose provisioner is one that ADL supports.
 *
 * Every StorageClass in the list is compared against every supported
 * provisioner pattern. The previous index-based loops never reset the inner
 * index, so only the first StorageClass was ever examined.
 *
 * @param response body of a "list storageclasses" call; expected to carry an
 *                 `items` array whose entries have `provisioner` and
 *                 `metadata.name`
 * @returns the name of the first supported StorageClass, or the sentinel
 *          string "Not Found!" when there is none (or no `items` array)
 */
private matchStorageClass(response: any): string {
    const adlSupportedProvisioners: RegExp[] = [
        new RegExp("microk8s.io/hostpath"),
        new RegExp(".*cephfs.csi.ceph.com"),
        new RegExp(".*azure.*"),
        new RegExp("\\b" + "efs" + "\\b")
    ];
    const storageClasses: any[] = (response !== undefined && response !== null
        && Array.isArray(response.items)) ? response.items : [];
    for (const storageClass of storageClasses) {
        const provisioner: unknown = storageClass.provisioner;
        // Entries without a string provisioner cannot match anything.
        if (typeof provisioner !== 'string') {
            continue;
        }
        if (adlSupportedProvisioners.some((pattern: RegExp) => pattern.test(provisioner))) {
            return storageClass.metadata.name;
        }
    }
    return "Not Found!";
}
/**
 * Find a StorageClass in the cluster that ADL can use.
 *
 * The HTTP status is checked before the body is inspected, so a failed list
 * call is reported as such instead of surfacing as a TypeError while reading
 * a body that has no `items`.
 *
 * @returns the name of the first supported StorageClass; the promise is
 *          rejected with a message string when the list call fails or when no
 *          supported StorageClass exists
 */
public async getStorageClass(): Promise<string> {
    const response: any = await this.client.apis["storage.k8s.io"].v1beta1.storageclasses.get();
    if (!(response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299))) {
        return Promise.reject(`List storageclasses failed, statusCode is ${response.statusCode}`);
    }
    const storageClassType: string = this.matchStorageClass(response.body);
    if (storageClassType === "Not Found!") {
        return Promise.reject("No StorageClasses are supported!");
    }
    return Promise.resolve(storageClassType);
}
/**
 * Create a Deployment in the `default` namespace.
 *
 * @param deploymentManifest the Deployment manifest to post
 * @returns the uid of the created Deployment; the promise is rejected with a
 *          message string when the status code is not 2xx
 */
public async createDeployment(deploymentManifest: any): Promise<string> {
    const deployments: any = this.client.apis.apps.v1.namespaces('default').deployments;
    const response: any = await deployments.post({ body: deploymentManifest });
    const succeeded: boolean = Boolean(response.statusCode)
        && response.statusCode >= 200 && response.statusCode <= 299;
    if (!succeeded) {
        return Promise.reject(`Create deployment failed, statusCode is ${response.statusCode}`);
    }
    return Promise.resolve(response.body.metadata.uid);
}
/**
 * Delete a Deployment from the `default` namespace.
 *
 * @param deploymentName name of the Deployment to delete
 * @returns true on a 2xx status; otherwise the promise is rejected with a
 *          message string containing the status code
 */
public async deleteDeployment(deploymentName: string): Promise<boolean> {
    const defaultNamespace: any = this.client.apis.apps.v1.namespaces('default');
    const response: any = await defaultNamespace
        .deployment(deploymentName).delete();
    const succeeded: boolean = Boolean(response.statusCode)
        && response.statusCode >= 200 && response.statusCode <= 299;
    if (!succeeded) {
        return Promise.reject(`Delete deployment failed, statusCode is ${response.statusCode}`);
    }
    return Promise.resolve(true);
}
/**
 * Create a ConfigMap in the `default` namespace.
 *
 * @param configMapManifest the ConfigMap manifest to post
 * @returns true on a 2xx status; otherwise the promise is rejected with a
 *          message string containing the status code
 */
public async createConfigMap(configMapManifest: any): Promise<boolean> {
    const defaultNamespace: any = this.client.api.v1.namespaces('default');
    const response: any = await defaultNamespace
        .configmaps.post({body: configMapManifest});
    const succeeded: boolean = Boolean(response.statusCode)
        && response.statusCode >= 200 && response.statusCode <= 299;
    if (!succeeded) {
        return Promise.reject(`Create configMap failed, statusCode is ${response.statusCode}`);
    }
    return Promise.resolve(true);
}
/**
 * Post a PersistentVolumeClaim manifest to the core v1 API in the 'default' namespace.
 *
 * @param pvcManifest object sent as the request body
 * @returns `true` when the API answers with a 2xx status code
 * @throws rejects with a string message when the status code is missing or outside 200-299
 */
public async createPersistentVolumeClaim(pvcManifest: any): Promise<boolean> {
    const namespaceApi = this.client.api.v1.namespaces('default');
    const reply: any = await namespaceApi.persistentvolumeclaims.post({body: pvcManifest});
    const isSuccess = reply.statusCode && (reply.statusCode >= 200 && reply.statusCode <= 299);
    if (!isSuccess) {
        return Promise.reject(`Create pvc failed, statusCode is ${reply.statusCode}`);
    }
    return Promise.resolve(true);
}
public async createSecret(secretManifest: any): Promise<boolean> {
let result: Promise<boolean>;
const response: any = await this.client.api.v1.namespaces('default').secrets
......@@ -77,7 +165,7 @@ abstract class KubernetesCRDClient {
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(true);
} else {
result = Promise.reject(`Create kubernetes job failed, statusCode is ${response.statusCode}`);
result = Promise.reject(`KubernetesApiClient createKubernetesJob failed, statusCode is ${response.statusCode}`);
}
return result;
......@@ -91,7 +179,7 @@ abstract class KubernetesCRDClient {
if (response.statusCode && (response.statusCode >= 200 && response.statusCode <= 299)) {
result = Promise.resolve(response.body);
} else {
result = Promise.reject(`KubeflowOperatorClient get tfjobs failed, statusCode is ${response.statusCode}`);
result = Promise.reject(`KubernetesApiClient getKubernetesJob failed, statusCode is ${response.statusCode}`);
}
return result;
......@@ -115,7 +203,7 @@ abstract class KubernetesCRDClient {
result = Promise.resolve(true);
} else {
result = Promise.reject(
`KubeflowOperatorClient, delete labels ${matchQuery} get wrong statusCode ${deleteResult.statusCode}`);
`KubernetesApiClient, delete labels ${matchQuery} get wrong statusCode ${deleteResult.statusCode}`);
}
} catch (err) {
result = Promise.reject(err);
......
......@@ -11,6 +11,7 @@ import { TrialJobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../
export class KubernetesTrialJobDetail implements TrialJobDetail {
public id: string;
public status: TrialJobStatus;
public message?: string;
public submitTime: number;
public startTime?: number;
public endTime?: number;
......@@ -26,6 +27,7 @@ export class KubernetesTrialJobDetail implements TrialJobDetail {
kubernetesJobName: string, url: string) {
this.id = id;
this.status = status;
this.message = 'Pending for creating the trial job.';
this.submitTime = submitTime;
this.workingDirectory = workingDirectory;
this.form = form;
......
......@@ -23,21 +23,16 @@ export class KubernetesJobInfoCollector {
this.statusesNeedToCheck = ['RUNNING', 'WAITING'];
}
public async retrieveTrialStatus(kubernetesCRDClient: KubernetesCRDClient | undefined): Promise<void> {
public async retrieveTrialStatus(kubernetesCRDClient: KubernetesCRDClient | undefined): Promise<void[]> {
assert(kubernetesCRDClient !== undefined);
const updateKubernetesTrialJobs: Promise<void>[] = [];
for (const [trialJobId, kubernetesTrialJob] of this.trialJobsMap) {
if (kubernetesTrialJob === undefined) {
throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
}
// Since Kubeflow needs some delay to schedule jobs, we provide 20 seconds buffer time to check kubeflow job's status
if (Date.now() - kubernetesTrialJob.submitTime < 20 * 1000) {
return Promise.resolve();
}
updateKubernetesTrialJobs.push(this.retrieveSingleTrialJobInfo(kubernetesCRDClient, kubernetesTrialJob));
}
await Promise.all(updateKubernetesTrialJobs);
return Promise.all(updateKubernetesTrialJobs);
}
protected async retrieveSingleTrialJobInfo(_kubernetesCRDClient: KubernetesCRDClient | undefined,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment