Unverified Commit 134368fa authored by George Cheng's avatar George Cheng Committed by GitHub
Browse files

DLTS integration (#1945)



* skeleton of dlts training service (#1844)

* Hello, DLTS!

* Revert version

* Remove fs-extra

* Add some default cluster config

* schema

* fix

* Optional cluster (default to `.default`)

Depends on DLWorkspace#837

* fix

* fix

* optimize gpu type

* No more copy

* Format

* Code clean up

* Issue fix

* Add optional fields in config

* Issue fix

* Lint

* Lint

* Validate email, password and team

* Doc

* Doc fix

* Set TMPDIR

* Use metadata instead of gpu_capacity

* Cancel paused DLTS job

* workaround lint rules

* pylint

* doc
Co-authored-by: default avatarQuanluZhang <z.quanluzhang@gmail.com>
parent 03cea2b4
**Run an Experiment on DLTS**
===
NNI supports running an experiment on [DLTS](https://github.com/microsoft/DLWorkspace.git), called dlts mode. Before starting to use NNI dlts mode, you should have an account to access DLTS dashboard.
## Setup Environment
Step 1. Choose a cluster from DLTS dashboard, ask administrator for the cluster dashboard URL.
![Choose Cluster](../../img/dlts-step1.png)
Step 2. Prepare a NNI config YAML like the following:
```yaml
# Set this field to "dlts"
trainingServicePlatform: dlts
authorName: your_name
experimentName: auto_mnist
trialConcurrency: 2
maxExecDuration: 3h
maxTrialNum: 100
searchSpacePath: search_space.json
useAnnotation: false
tuner:
builtinTunerName: TPE
classArgs:
optimize_mode: maximize
trial:
command: python3 mnist.py
codeDir: .
gpuNum: 1
image: msranni/nni
# Configuration to access DLTS
dltsConfig:
dashboard: # Ask administrator for the cluster dashboard URL
```
Remember to fill the cluster dashboard URL to the last line.
Step 3. Open your working directory of the cluster, paste the NNI config as well as related code to a directory.
![Copy Config](../../img/dlts-step3.png)
Step 4. Submit a NNI manager job to the specified cluster.
![Submit Job](../../img/dlts-step4.png)
Step 5. Go to Endpoints tab of the newly created job, click the Port 40000 link to check trial's information.
![View NNI WebUI](../../img/dlts-step5.png)
...@@ -9,3 +9,4 @@ Introduction to NNI Training Services ...@@ -9,3 +9,4 @@ Introduction to NNI Training Services
OpenPAI Yarn Mode<./TrainingService/PaiYarnMode> OpenPAI Yarn Mode<./TrainingService/PaiYarnMode>
Kubeflow<./TrainingService/KubeflowMode> Kubeflow<./TrainingService/KubeflowMode>
FrameworkController<./TrainingService/FrameworkControllerMode> FrameworkController<./TrainingService/FrameworkControllerMode>
OpenPAI<./TrainingService/DLTSMode>
debug: true
authorName: default
experimentName: example_mnist
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
#choice: local, remote, pai
trainingServicePlatform: dlts
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
#choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
#SMAC (SMAC should be installed through nnictl)
builtinTunerName: TPE
classArgs:
#choice: maximize, minimize
optimize_mode: maximize
trial:
command: python3 mnist.py
codeDir: .
gpuNum: 1
#The docker image to run nni job on dlts
image: msranni/nni:latest
dltsConfig:
dashboard: http://azure-eastus-p40-dev1-infra01.eastus.cloudapp.azure.com/
# The following fields are all optional and could be retrieved from environment
# variables if running in DLTS job container.
# cluster: .default
# team: platform
# email: example@microsoft.com
# password: # Paste from DLTS dashboard
...@@ -26,6 +26,7 @@ import { PAIYarnTrainingService } from './training_service/pai/paiYarn/paiYarnTr ...@@ -26,6 +26,7 @@ import { PAIYarnTrainingService } from './training_service/pai/paiYarn/paiYarnTr
import { import {
RemoteMachineTrainingService RemoteMachineTrainingService
} from './training_service/remote_machine/remoteMachineTrainingService'; } from './training_service/remote_machine/remoteMachineTrainingService';
import { DLTSTrainingService } from './training_service/dlts/dltsTrainingService';
function initStartupInfo( function initStartupInfo(
startExpMode: string, resumeExperimentId: string, basePort: number, startExpMode: string, resumeExperimentId: string, basePort: number,
...@@ -60,6 +61,10 @@ async function initContainer(foreground: boolean, platformMode: string, logFileN ...@@ -60,6 +61,10 @@ async function initContainer(foreground: boolean, platformMode: string, logFileN
Container.bind(TrainingService) Container.bind(TrainingService)
.to(FrameworkControllerTrainingService) .to(FrameworkControllerTrainingService)
.scope(Scope.Singleton); .scope(Scope.Singleton);
} else if (platformMode === 'dlts') {
Container.bind(TrainingService)
.to(DLTSTrainingService)
.scope(Scope.Singleton);
} else { } else {
throw new Error(`Error: unsupported mode: ${platformMode}`); throw new Error(`Error: unsupported mode: ${platformMode}`);
} }
...@@ -108,7 +113,7 @@ const foreground: boolean = foregroundArg.toLowerCase() === 'true' ? true : fals ...@@ -108,7 +113,7 @@ const foreground: boolean = foregroundArg.toLowerCase() === 'true' ? true : fals
const port: number = parseInt(strPort, 10); const port: number = parseInt(strPort, 10);
const mode: string = parseArg(['--mode', '-m']); const mode: string = parseArg(['--mode', '-m']);
if (!['local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn'].includes(mode)) { if (!['local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts'].includes(mode)) {
console.log(`FATAL: unknown mode: ${mode}`); console.log(`FATAL: unknown mode: ${mode}`);
usage(); usage();
process.exit(1); process.exit(1);
......
...@@ -140,6 +140,15 @@ export namespace ValidationSchemas { ...@@ -140,6 +140,15 @@ export namespace ValidationSchemas {
}), }),
uploadRetryCount: joi.number().min(1) uploadRetryCount: joi.number().min(1)
}), }),
dlts_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
dashboard: joi.string().min(1),
cluster: joi.string().min(1),
team: joi.string().min(1),
email: joi.string().min(1),
password: joi.string().min(1)
}),
nni_manager_ip: joi.object({ // eslint-disable-line @typescript-eslint/camelcase nni_manager_ip: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
nniManagerIp: joi.string().min(1) nniManagerIp: joi.string().min(1)
}) })
......
...@@ -18,6 +18,7 @@ export enum TrialConfigMetadataKey { ...@@ -18,6 +18,7 @@ export enum TrialConfigMetadataKey {
KUBEFLOW_CLUSTER_CONFIG = 'kubeflow_config', KUBEFLOW_CLUSTER_CONFIG = 'kubeflow_config',
NNI_MANAGER_IP = 'nni_manager_ip', NNI_MANAGER_IP = 'nni_manager_ip',
FRAMEWORKCONTROLLER_CLUSTER_CONFIG = 'frameworkcontroller_config', FRAMEWORKCONTROLLER_CLUSTER_CONFIG = 'frameworkcontroller_config',
DLTS_CLUSTER_CONFIG = 'dlts_config',
VERSION_CHECK = 'version_check', VERSION_CHECK = 'version_check',
LOG_COLLECTION = 'log_collection' LOG_COLLECTION = 'log_collection'
} }
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
export interface DLTSClusterConfig {
dashboard: string;
cluster: string;
team: string;
email: string;
password: string;
gpuType?: string;
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
export const DLTS_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=dlts NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} MULTI_PHASE={5} \
&& cd $NNI_SYS_DIR && sh install_nni.sh \
&& cd '{6}' && python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' \
--nnimanager_ip '{8}' --nnimanager_port '{9}' --nni_manager_version '{10}' --log_collection '{11}'`;
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { DLTSClusterConfig } from "./dltsClusterConfig";
export class DLTSJobConfig {
public readonly team: string;
public readonly userName: string;
public readonly vcName: string;
public readonly gpuType: string;
public readonly jobType = "training";
public readonly jobtrainingtype = "RegularJob";
public readonly ssh = false;
public readonly ipython = false;
public readonly tensorboard = false;
public readonly workPath = '';
public readonly enableworkpath = true;
public readonly dataPath = '';
public readonly enabledatapath = false;
public readonly jobPath = '';
public readonly enablejobpath = true;
public readonly mountpoints = [];
public readonly env = [{ name: 'TMPDIR', value: '$HOME/tmp' }]
public readonly hostNetwork = false;
public readonly useGPUTopology = false;
public readonly isPrivileged = false;
public readonly hostIPC = false;
public readonly preemptionAllowed = "False"
public constructor(
clusterConfig: DLTSClusterConfig,
public readonly jobName: string,
public readonly resourcegpu: number,
public readonly image: string,
public readonly cmd: string,
public readonly interactivePorts: number[],
) {
if (clusterConfig.gpuType === undefined) {
throw Error('GPU type not fetched')
}
this.vcName = this.team = clusterConfig.team
this.gpuType = clusterConfig.gpuType
this.userName = clusterConfig.email
}
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import { Request, Response, Router } from 'express';
import { Inject } from 'typescript-ioc';
import * as component from '../../common/component';
import { ClusterJobRestServer } from '../common/clusterJobRestServer';
import { DLTSTrainingService } from './dltsTrainingService';
export interface ParameterFileMeta {
readonly experimentId: string;
readonly trialId: string;
readonly filePath: string;
}
/**
* DLTS Training service Rest server, provides rest API to support DLTS job metrics update
*
*/
@component.Singleton
export class DLTSJobRestServer extends ClusterJobRestServer {
private parameterFileMetaList: ParameterFileMeta[] = [];
@Inject
private readonly dltsTrainingService: DLTSTrainingService;
/**
* constructor to provide NNIRestServer's own rest property, e.g. port
*/
constructor() {
super();
this.dltsTrainingService = component.get(DLTSTrainingService);
}
// tslint:disable-next-line:no-any
protected handleTrialMetrics(jobId: string, metrics: any[]): void {
// Split metrics array into single metric, then emit
// Warning: If not split metrics into single ones, the behavior will be UNKNOWN
for (const singleMetric of metrics) {
this.dltsTrainingService.MetricsEmitter.emit('metric', {
id : jobId,
data : singleMetric
});
}
}
protected createRestHandler(): Router {
const router: Router = super.createRestHandler();
router.post(`/parameter-file-meta`, (req: Request, res: Response) => {
try {
this.log.info(`POST /parameter-file-meta, body is ${JSON.stringify(req.body)}`);
this.parameterFileMetaList.push(req.body);
res.send();
} catch (err) {
this.log.error(`POST parameter-file-meta error: ${err}`);
res.status(500);
res.send(err.message);
}
});
router.get(`/parameter-file-meta`, (req: Request, res: Response) => {
try {
this.log.info(`GET /parameter-file-meta`);
res.send(this.parameterFileMetaList);
} catch (err) {
this.log.error(`GET parameter-file-meta error: ${err}`);
res.status(500);
res.send(err.message);
}
});
return router;
}
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as fs from 'fs';
import * as path from 'path';
import * as request from 'request';
import * as component from '../../common/component';
import { EventEmitter } from 'events';
import { String } from 'typescript-string-operations';
import { getExperimentId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import {
NNIManagerIpConfig, TrainingService,
TrialJobApplicationForm, TrialJobDetail, TrialJobMetric
} from '../../common/trainingService';
import { DLTS_TRIAL_COMMAND_FORMAT } from './dltsData';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { execMkdir, validateCodeDir } from '../common/util';
import { delay, uniqueString, getIPV4Address, getExperimentRootDir, getVersion, generateParamFileName } from '../../common/utils';
import { DLTSJobRestServer } from './dltsJobRestServer';
import { TrialConfigMetadataKey } from '../../training_service/common/trialConfigMetadataKey';
import { DLTSJobConfig } from './dltsJobConfig';
import { DLTSClusterConfig } from './dltsClusterConfig';
import { DLTSTrialConfig } from './dltsTrialConfig';
import { DLTSTrialJobDetail } from './dltsTrialJobDetail';
@component.Singleton
class DLTSTrainingService implements TrainingService {
private readonly log!: Logger;
private readonly metricsEmitter: EventEmitter;
//private readonly expRootDir: string;
private readonly jobQueue: string[];
private stopping: boolean = false;
private readonly experimentId!: string;
private versionCheck: boolean = true;
private logCollection: string = 'none';
private isMultiPhase: boolean = false;
private dltsRestServerPort?: number;
private readonly trialJobsMap: Map<string, DLTSTrialJobDetail>;
private nniManagerIpConfig?: NNIManagerIpConfig;
private dltsClusterConfig?: DLTSClusterConfig;
private dltsTrialConfig?: DLTSTrialConfig;
constructor() {
this.log = getLogger();
this.metricsEmitter = new EventEmitter();
this.trialJobsMap = new Map();
this.jobQueue = [];
this.experimentId = getExperimentId();
this.log.info('Construct DLTS training service.');
}
public async run(): Promise<void> {
this.log.info('Run DLTS training service.');
const restServer: DLTSJobRestServer = component.get(DLTSJobRestServer);
await restServer.start();
restServer.setEnableVersionCheck = this.versionCheck;
this.log.info(`DLTS Training service rest server listening on: ${restServer.endPoint}`);
await Promise.all([
this.statusCheckingLoop(),
this.submitJobLoop()]);
this.log.info('DLTS training service exit.');
}
private async statusCheckingLoop(): Promise<void> {
while (!this.stopping) {
const updateDLTSTrialJobs: Promise<void>[] = [];
for (const [trialJobId, dltsTrialJob] of this.trialJobsMap) {
updateDLTSTrialJobs.push(this.getDLTSTrialJobInfo(dltsTrialJob));
}
await Promise.all(updateDLTSTrialJobs);
// Calcel paused dlts job
const cancelPausedJobPromises: Promise<void>[] = [];
for (const [trialJobId, dltsTrialJob] of this.trialJobsMap) {
if (dltsTrialJob.dltsPaused && dltsTrialJob.status === 'RUNNING') {
cancelPausedJobPromises.push(this.cancelTrialJob(trialJobId));
}
}
await Promise.all(cancelPausedJobPromises);
const restServer: DLTSJobRestServer = component.get(DLTSJobRestServer);
if (restServer.getErrorMessage !== undefined) {
throw new Error(restServer.getErrorMessage);
}
await delay(3000);
}
}
private async getDLTSTrialJobInfo(dltsTrialJob: DLTSTrialJobDetail): Promise<void> {
if (this.dltsClusterConfig == null) {
throw Error('Cluster config is not set');
}
const requestOptions: request.Options = {
uri: `${this.dltsClusterConfig.dashboard}api/v2/clusters/${this.dltsClusterConfig.cluster}/jobs/${dltsTrialJob.dltsJobId}`,
qs: {
email: this.dltsClusterConfig.email,
password: this.dltsClusterConfig.password
},
json: true
};
const body = await new Promise((resolve, reject) => {
request(requestOptions, (error, response, body) => {
if (error != null) {
reject(error)
} else {
resolve(body)
}
})
}) as any;
void ((): void => {
switch (body['jobStatus']) {
case 'unapproved':
case 'queued':
case 'scheduling':
dltsTrialJob.status = "WAITING";
break;
case 'running':
dltsTrialJob.status = "RUNNING";
if (dltsTrialJob.startTime === undefined) {
dltsTrialJob.startTime = Date.parse(body['jobStatusDetail'][0]['startedAt'])
}
if (dltsTrialJob.url === undefined) {
dltsTrialJob.url = `${this.dltsClusterConfig.dashboard}job/${this.dltsClusterConfig.team}/${this.dltsClusterConfig.cluster}/${dltsTrialJob.dltsJobId}`
}
break;
case 'finished':
dltsTrialJob.status = "SUCCEEDED";
break;
case 'failed':
dltsTrialJob.status = "FAILED";
break;
case 'pausing':
case 'paused':
dltsTrialJob.status = "RUNNING";
dltsTrialJob.dltsPaused = true;
break;
case 'killing':
case 'killed':
if (dltsTrialJob.isEarlyStopped !== undefined) {
dltsTrialJob.status = dltsTrialJob.isEarlyStopped === true
? 'EARLY_STOPPED' : 'USER_CANCELED';
} else {
dltsTrialJob.status = 'SYS_CANCELED';
}
break;
default:
dltsTrialJob.status = "UNKNOWN";
}
}) ();
}
private async submitJobLoop(): Promise<void> {
while (!this.stopping) {
while (!this.stopping && this.jobQueue.length > 0) {
const trialJobId: string = this.jobQueue[0];
this.log.info(`Got job ${trialJobId}`);
if (await this.submitTrialJobToDLTS(trialJobId)) {
// Remove trial job with trialJobId from job queue
this.jobQueue.shift();
} else {
// Break the while loop since failed to submitJob
break;
}
}
await delay(3000);
}
}
public async listTrialJobs(): Promise<TrialJobDetail[]> {
return Array.from(this.trialJobsMap.values());
}
public async getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
const trialJob = this.trialJobsMap.get(trialJobId);
if (trialJob === undefined) {
throw Error(`Trial job ${trialJobId} not found.`)
}
return trialJob
}
public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
this.metricsEmitter.on('metric', listener);
}
public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void): void {
this.metricsEmitter.off('metric', listener);
}
public get MetricsEmitter(): EventEmitter {
return this.metricsEmitter;
}
public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
const trialJobId: string = uniqueString(5);
const trialWorkingFolder: string = path.join(
'/nni/experiments', getExperimentId(),
'/trials/', trialJobId);
const trialJobDetail = new DLTSTrialJobDetail(
trialJobId, // id
'WAITING', // status
Date.now(), // submitTime
trialWorkingFolder, // workingDirectory
form,
`nni_exp_${this.experimentId}_trial_${trialJobId}`
);
this.trialJobsMap.set(trialJobId, trialJobDetail);
this.jobQueue.push(trialJobId);
return trialJobDetail;
}
public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
const trialJobDetail: DLTSTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (trialJobDetail === undefined) {
throw Error(`cancelTrialJob: trial job id ${trialJobId} not found`);
}
if (this.dltsClusterConfig === undefined) {
throw Error('DLTS Cluster config is not initialized');
}
const options: request.Options = {
method: 'PUT',
uri: `${this.dltsClusterConfig.dashboard}api/clusters/${this.dltsClusterConfig.cluster}/jobs/${trialJobDetail.dltsJobId}/status`,
qs: {
email: this.dltsClusterConfig.email,
password: this.dltsClusterConfig.password
},
body: {
status: 'killing'
},
json: true
};
// Set trialjobDetail's early stopped field, to mark the job's cancellation source
trialJobDetail.isEarlyStopped = isEarlyStopped;
await new Promise((resolve, reject) => {
request(options, (error: Error, response: request.Response, body: any) => {
if (error) {
reject(error);
} else {
resolve(body);
}
});
});
}
private async getGpuType(): Promise<string> {
if (this.dltsClusterConfig === undefined) {
throw new Error('DLTS Cluster config is not initialized');
}
const gpuRequestOptions: request.Options = {
method: 'GET',
qs: {
email: this.dltsClusterConfig.email,
password: this.dltsClusterConfig.password
},
uri: `${this.dltsClusterConfig.dashboard}api/teams/${this.dltsClusterConfig.team}/clusters/${this.dltsClusterConfig.cluster}`,
json: true
};
return new Promise<string>((resolve, reject) => {
request(gpuRequestOptions, (error, response, data) => {
if (error) {
return reject(error)
}
try {
const metadata = JSON.parse(data['metadata'])
resolve(Object.keys(metadata)[0])
} catch (error) {
reject(error)
}
})
});
}
public async setClusterMetadata(key: string, value: string): Promise<void> {
switch (key) {
case TrialConfigMetadataKey.NNI_MANAGER_IP:
this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
break;
case TrialConfigMetadataKey.DLTS_CLUSTER_CONFIG:
this.dltsClusterConfig = <DLTSClusterConfig>JSON.parse(value);
if (!this.dltsClusterConfig.cluster) {
this.dltsClusterConfig.cluster = '.default'
}
if (!this.dltsClusterConfig.email) {
if (process.env['DLWS_USER_EMAIL']) {
this.dltsClusterConfig.email = process.env['DLWS_USER_EMAIL'] as string
} else {
throw Error('`email` field in `dltsConfig` is not configured.')
}
}
if (!this.dltsClusterConfig.password) {
if (process.env['DLTS_JOB_TOKEN']) {
this.dltsClusterConfig.password = process.env['DLTS_JOB_TOKEN'] as string
} else {
throw Error('`password` field in `dltsConfig` is not configured.')
}
}
if (!this.dltsClusterConfig.team) {
if (process.env['DLWS_VC_NAME']) {
this.dltsClusterConfig.team = process.env['DLWS_VC_NAME'] as string
} else {
throw Error('`team` field in `dltsConfig` is not configured.')
}
}
this.dltsClusterConfig.gpuType = await this.getGpuType();
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
this.dltsTrialConfig = <DLTSTrialConfig>JSON.parse(value);
// Validate to make sure codeDir doesn't have too many files
try {
await validateCodeDir(this.dltsTrialConfig.codeDir);
} catch (error) {
this.log.error(error);
throw error;
}
break;
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
case TrialConfigMetadataKey.MULTI_PHASE:
this.isMultiPhase = (value === 'true' || value === 'True');
break;
default:
//Reject for unknown keys
throw new Error(`Uknown key: ${key}`);
}
}
public async getClusterMetadata(key: string): Promise<string> {
return '';
}
public async cleanUp(): Promise<void> {
this.log.info('Stopping DLTS training service...');
this.stopping = true;
const restServer: DLTSJobRestServer = component.get(DLTSJobRestServer);
try {
await restServer.stop();
this.log.info('DLTS Training service rest server stopped successfully.');
return;
} catch (error) {
// tslint:disable-next-line: no-unsafe-any
this.log.error(`DLTS Training service rest server stopped failed, error: ${error.message}`);
throw error;
}
}
private async submitTrialJobToDLTS(trialJobId: string): Promise<boolean> {
const trialJobDetail: DLTSTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (trialJobDetail === undefined) {
throw new Error(`Failed to find DLTSTrialJobDetail for job ${trialJobId}`);
}
if (this.dltsClusterConfig === undefined) {
throw new Error('DLTS Cluster config is not initialized');
}
if (this.dltsTrialConfig === undefined) {
throw new Error('trial config is not initialized');
}
if (this.dltsRestServerPort === undefined) {
const restServer: DLTSJobRestServer = component.get(DLTSJobRestServer);
this.dltsRestServerPort = restServer.clusterRestServerPort;
}
// Step 1. Prepare DLTS job configuration
const trialLocalFolder = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
//create tmp trial working folder locally.
await execMkdir(trialLocalFolder);
const runScriptContent: string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalFolder, 'install_nni.sh'), runScriptContent, { encoding: 'utf8' });
// Write file content ( parameter.cfg ) to local tmp folders
if (trialJobDetail.form !== undefined) {
await fs.promises.writeFile(
path.join(trialLocalFolder, generateParamFileName(trialJobDetail.form.hyperParameters)),
trialJobDetail.form.hyperParameters.value, { encoding: 'utf8' }
);
}
// tslint:disable-next-line: strict-boolean-expressions
const nniManagerIp: string = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
const version: string = this.versionCheck ? await getVersion() : '';
const nniDLTSTrialCommand: string = String.Format(
DLTS_TRIAL_COMMAND_FORMAT,
trialLocalFolder,
path.join(trialLocalFolder, 'nnioutput'),
trialJobId,
this.experimentId,
trialJobDetail.form.sequenceId,
false,
this.dltsTrialConfig.codeDir,
this.dltsTrialConfig.command,
nniManagerIp,
this.dltsRestServerPort,
version,
this.logCollection
)
.replace(/\r\n|\n|\r/gm, '');
// Step 2. Submit DLTS job via Rest call
const dltsJobConfig: DLTSJobConfig = new DLTSJobConfig(
this.dltsClusterConfig,
trialJobDetail.dltsJobName,
this.dltsTrialConfig.gpuNum,
this.dltsTrialConfig.image,
nniDLTSTrialCommand,
[]
);
const submitJobRequest: request.Options = {
method: 'POST',
uri: `${this.dltsClusterConfig.dashboard}api/clusters/${this.dltsClusterConfig.cluster}/jobs`,
qs: {
email: this.dltsClusterConfig.email,
password: this.dltsClusterConfig.password
},
body: dltsJobConfig,
json: true
}
const responseData = await new Promise<any>((resolve, reject) => {
request(submitJobRequest, function (error, response, data) {
if (error) {
return reject(error)
} else {
return resolve(data)
}
})
});
trialJobDetail.dltsJobId = responseData['jobId']
return true;
}
public async updateTrialJob(trialJobId: string, form: TrialJobApplicationForm): Promise<TrialJobDetail> {
const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
if (trialJobDetail === undefined) {
throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
}
if (this.dltsClusterConfig === undefined) {
throw new Error('DLTS Cluster config is not initialized');
}
if (this.dltsTrialConfig === undefined) {
throw new Error('DLTS trial config is not initialized');
}
const hyperParameters = form.hyperParameters;
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
const hpFileName: string = generateParamFileName(hyperParameters);
const localFilepath: string = path.join(trialLocalTempFolder, hpFileName);
await fs.promises.writeFile(localFilepath, hyperParameters.value, { encoding: 'utf8' });
const parameterFileMeta = {
experimentId: this.experimentId,
trialId: trialJobId
};
const restServer: DLTSJobRestServer = component.get(DLTSJobRestServer);
const req: request.Options = {
uri: `${restServer.endPoint}${restServer.apiRootUrl}/parameter-file-meta`,
method: 'POST',
json: true,
body: parameterFileMeta
};
await new Promise((resolve, reject) => {
request(req, (err: Error, res: request.Response) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
return trialJobDetail;
}
public get isMultiPhaseJobSupported(): boolean {
return false;
}
}
export { DLTSTrainingService };
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { TrialConfig } from "training_service/common/trialConfig";
export class DLTSTrialConfig extends TrialConfig {
public constructor(
command: string,
codeDir: string,
gpuNum: number,
public readonly image: string
) {
super(command, codeDir, gpuNum);
}
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import {
TrialJobDetail,
TrialJobStatus,
TrialJobApplicationForm
} from "../../common/trainingService";
export class DLTSTrialJobDetail implements TrialJobDetail {
public startTime?: number;
public endTime?: number;
public tags?: string[];
public url?: string;
public isEarlyStopped?: boolean;
// DLTS staff
public dltsJobId?: string;
public dltsPaused: boolean = false;
public constructor (
public id: string,
public status: TrialJobStatus,
public submitTime: number,
public workingDirectory: string,
public form: TrialJobApplicationForm,
// DLTS staff
public dltsJobName: string,
) {}
}
...@@ -9,7 +9,7 @@ if trial_env_vars.NNI_PLATFORM is None: ...@@ -9,7 +9,7 @@ if trial_env_vars.NNI_PLATFORM is None:
from .standalone import * from .standalone import *
elif trial_env_vars.NNI_PLATFORM == 'unittest': elif trial_env_vars.NNI_PLATFORM == 'unittest':
from .test import * from .test import *
elif trial_env_vars.NNI_PLATFORM in ('local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn'): elif trial_env_vars.NNI_PLATFORM in ('local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts'):
from .local import * from .local import *
else: else:
raise RuntimeError('Unknown platform %s' % trial_env_vars.NNI_PLATFORM) raise RuntimeError('Unknown platform %s' % trial_env_vars.NNI_PLATFORM)
...@@ -32,7 +32,8 @@ common_schema = { ...@@ -32,7 +32,8 @@ common_schema = {
'trialConcurrency': setNumberRange('trialConcurrency', int, 1, 99999), 'trialConcurrency': setNumberRange('trialConcurrency', int, 1, 99999),
Optional('maxExecDuration'): And(Regex(r'^[1-9][0-9]*[s|m|h|d]$', error='ERROR: maxExecDuration format is [digit]{s,m,h,d}')), Optional('maxExecDuration'): And(Regex(r'^[1-9][0-9]*[s|m|h|d]$', error='ERROR: maxExecDuration format is [digit]{s,m,h,d}')),
Optional('maxTrialNum'): setNumberRange('maxTrialNum', int, 1, 99999), Optional('maxTrialNum'): setNumberRange('maxTrialNum', int, 1, 99999),
'trainingServicePlatform': setChoice('trainingServicePlatform', 'remote', 'local', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn'), 'trainingServicePlatform': setChoice(
'trainingServicePlatform', 'remote', 'local', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts'),
Optional('searchSpacePath'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'searchSpacePath'), Optional('searchSpacePath'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'searchSpacePath'),
Optional('multiPhase'): setType('multiPhase', bool), Optional('multiPhase'): setType('multiPhase', bool),
Optional('multiThread'): setType('multiThread', bool), Optional('multiThread'): setType('multiThread', bool),
...@@ -297,6 +298,27 @@ pai_config_schema = { ...@@ -297,6 +298,27 @@ pai_config_schema = {
}) })
} }
dlts_trial_schema = {
'trial':{
'command': setType('command', str),
'codeDir': setPathCheck('codeDir'),
'gpuNum': setNumberRange('gpuNum', int, 0, 99999),
'image': setType('image', str),
}
}
dlts_config_schema = {
'dltsConfig': {
'dashboard': setType('dashboard', str),
Optional('cluster'): setType('cluster', str),
Optional('team'): setType('team', str),
Optional('email'): setType('email', str),
Optional('password'): setType('password', str),
}
}
kubeflow_trial_schema = { kubeflow_trial_schema = {
'trial':{ 'trial':{
'codeDir': setPathCheck('codeDir'), 'codeDir': setPathCheck('codeDir'),
...@@ -438,6 +460,8 @@ PAI_CONFIG_SCHEMA = Schema({**common_schema, **pai_trial_schema, **pai_config_sc ...@@ -438,6 +460,8 @@ PAI_CONFIG_SCHEMA = Schema({**common_schema, **pai_trial_schema, **pai_config_sc
PAI_YARN_CONFIG_SCHEMA = Schema({**common_schema, **pai_yarn_trial_schema, **pai_yarn_config_schema}) PAI_YARN_CONFIG_SCHEMA = Schema({**common_schema, **pai_yarn_trial_schema, **pai_yarn_config_schema})
DLTS_CONFIG_SCHEMA = Schema({**common_schema, **dlts_trial_schema, **dlts_config_schema})
KUBEFLOW_CONFIG_SCHEMA = Schema({**common_schema, **kubeflow_trial_schema, **kubeflow_config_schema}) KUBEFLOW_CONFIG_SCHEMA = Schema({**common_schema, **kubeflow_trial_schema, **kubeflow_config_schema})
FRAMEWORKCONTROLLER_CONFIG_SCHEMA = Schema({**common_schema, **frameworkcontroller_trial_schema, **frameworkcontroller_config_schema}) FRAMEWORKCONTROLLER_CONFIG_SCHEMA = Schema({**common_schema, **frameworkcontroller_trial_schema, **frameworkcontroller_config_schema})
...@@ -289,6 +289,25 @@ def set_frameworkcontroller_config(experiment_config, port, config_file_name): ...@@ -289,6 +289,25 @@ def set_frameworkcontroller_config(experiment_config, port, config_file_name):
#set trial_config #set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message return set_trial_config(experiment_config, port, config_file_name), err_message
def set_dlts_config(experiment_config, port, config_file_name):
'''set dlts configuration'''
dlts_config_data = dict()
dlts_config_data['dlts_config'] = experiment_config['dltsConfig']
response = rest_put(cluster_metadata_url(port), json.dumps(dlts_config_data), REST_TIME_OUT)
err_message = None
if not response or not response.status_code == 200:
if response is not None:
err_message = response.text
_, stderr_full_path = get_log_path(config_file_name)
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
return False, err_message
result, message = setNNIManagerIp(experiment_config, port, config_file_name)
if not result:
return result, message
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def set_experiment(experiment_config, mode, port, config_file_name): def set_experiment(experiment_config, mode, port, config_file_name):
'''Call startExperiment (rest POST /experiment) with yaml file content''' '''Call startExperiment (rest POST /experiment) with yaml file content'''
request_data = dict() request_data = dict()
...@@ -389,6 +408,8 @@ def set_platform_config(platform, experiment_config, port, config_file_name, res ...@@ -389,6 +408,8 @@ def set_platform_config(platform, experiment_config, port, config_file_name, res
config_result, err_msg = set_kubeflow_config(experiment_config, port, config_file_name) config_result, err_msg = set_kubeflow_config(experiment_config, port, config_file_name)
elif platform == 'frameworkcontroller': elif platform == 'frameworkcontroller':
config_result, err_msg = set_frameworkcontroller_config(experiment_config, port, config_file_name) config_result, err_msg = set_frameworkcontroller_config(experiment_config, port, config_file_name)
elif platform == 'dlts':
config_result, err_msg = set_dlts_config(experiment_config, port, config_file_name)
else: else:
raise Exception(ERROR_INFO % 'Unsupported platform!') raise Exception(ERROR_INFO % 'Unsupported platform!')
exit(1) exit(1)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment