Unverified Commit 143c6615 authored by Chi Song's avatar Chi Song Committed by GitHub
Browse files

Reusable environment support GPU scheduler, add test cases and refactoring. (#2627)

parent 8a20c348
......@@ -25,7 +25,7 @@ export class TrialDetail implements TrialJobDetail {
// used to aggregate node status for multi-node trials
public nodes: Map<string, NodeInfomation>;
// assigned GPUs for multi-trial scheduled.
public assignedGpus: GPUInfo[] = [];
public assignedGpus: GPUInfo[] | undefined;
public readonly TRIAL_METADATA_DIR = ".nni";
......
......@@ -9,18 +9,20 @@ import * as path from 'path';
import { Writable } from 'stream';
import { String } from 'typescript-string-operations';
import * as component from '../../common/component';
import { NNIError, NNIErrorNames } from '../../common/errors';
import { getBasePort, getExperimentId, getPlatform } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import { NNIManagerIpConfig, TrainingService, TrialJobApplicationForm, TrialJobMetric, TrialJobStatus } from '../../common/trainingService';
import { delay, getExperimentRootDir, getLogLevel, getVersion, mkDirPSync, uniqueString, getIPV4Address } from '../../common/utils';
import { delay, getExperimentRootDir, getIPV4Address, getLogLevel, getVersion, mkDirPSync, uniqueString } from '../../common/utils';
import { GPU_INFO, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, REPORT_METRIC_DATA, SEND_TRIAL_JOB_PARAMETER, STDOUT, TRIAL_END, VERSION_CHECK } from '../../core/commands';
import { GPUSummary } from '../../training_service/common/gpuData';
import { ScheduleResultType } from '../../training_service/common/gpuData';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { validateCodeDir } from '../common/util';
import { Command, CommandChannel } from './commandChannel';
import { EnvironmentInformation, EnvironmentService, NodeInfomation, RunnerSettings } from './environment';
import { EnvironmentInformation, EnvironmentService, NodeInfomation, RunnerSettings, TrialGpuSummary } from './environment';
import { GpuScheduler } from './gpuScheduler';
import { MountedStorageService } from './storages/mountedStorageService';
import { StorageService } from './storageService';
import { TrialDetail } from './trial';
......@@ -32,8 +34,6 @@ import { TrialDetail } from './trial';
**/
@component.Singleton
class TrialDispatcher implements TrainingService {
private readonly NNI_METRICS_PATTERN: string = `NNISDK_MEb'(?<metrics>.*?)'`;
private readonly log: Logger;
private readonly isDeveloping: boolean = false;
private stopping: boolean = false;
......@@ -53,6 +53,22 @@ class TrialDispatcher implements TrainingService {
private readonly trials: Map<string, TrialDetail>;
private readonly environments: Map<string, EnvironmentInformation>;
// used to accelerate the trial manager loop
// true means there are updates, and the trial loop should run a cycle immediately.
private shouldUpdateTrials: boolean = true;
// used to decide the environment assignment strategy.
// true means the GPU scheduler decides whether there is free resource for a new trial.
// false means one environment runs only one trial at a time.
private enableGpuScheduler: boolean = false;
// used to record whether the user wants to reuse environments
private reuseEnvironment: boolean = true;
private gpuScheduler: GpuScheduler;
// used to reduce duplicated log messages.
private isLoggedNoMoreEnvironment: boolean = false;
private isLoggedNoGpuAvailable: boolean = false;
constructor() {
this.log = getLogger();
this.trials = new Map<string, TrialDetail>();
......@@ -71,8 +87,9 @@ class TrialDispatcher implements TrainingService {
if (logLevel == "debug" && (fs.existsSync("../../../src/nni_manager") || __dirname.endsWith("src\\nni_manager\\dist\\training_service\\reusable"))) {
this.log.debug("log level is debug, and exist code folder, so set to developing mode.");
this.isDeveloping = true;
this.runnerSettings.enableGpuCollector = true;
}
this.gpuScheduler = new GpuScheduler();
}
public async listTrialJobs(): Promise<TrialDetail[]> {
......@@ -161,7 +178,7 @@ class TrialDispatcher implements TrainingService {
const environmentService = component.get<EnvironmentService>(EnvironmentService);
this.commandEmitter = new EventEmitter();
this.commandChannel = environmentService.getCommandChannel(this.commandEmitter);
this.commandChannel = environmentService.createCommandChannel(this.commandEmitter);
// TODO it's a hard code of web channel, it needs to be improved.
if (this.runnerSettings.nniManagerIP === "" || this.runnerSettings.nniManagerIP === null) {
......@@ -170,9 +187,6 @@ class TrialDispatcher implements TrainingService {
this.runnerSettings.nniManagerPort = getBasePort() + 1;
this.runnerSettings.commandChannel = this.commandChannel.channelName;
// for AML channel, other channels can ignore this.
await this.commandChannel.config("MetricEmitter", this.metricsEmitter);
// start channel
this.commandEmitter.on("command", (command: Command): void => {
this.handleCommand(command).catch((err: Error) => {
......@@ -251,9 +265,17 @@ class TrialDispatcher implements TrainingService {
this.runnerSettings.logCollection = value;
break;
case TrialConfigMetadataKey.TRIAL_CONFIG:
// TODO to support more storage types by better parameters.
this.trialConfig = <TrialConfig>JSON.parse(value);
if (this.trialConfig.reuseEnvironment !== undefined) {
this.reuseEnvironment = this.trialConfig.reuseEnvironment;
}
if (this.trialConfig.gpuNum !== undefined && this.trialConfig.gpuNum > 0) {
this.log.info(`TrialDispatcher: GPU scheduler is enabled.`)
this.enableGpuScheduler = true;
}
this.runnerSettings.enableGpuCollector = this.enableGpuScheduler;
this.runnerSettings.command = this.trialConfig.command;
// Validate to make sure codeDir doesn't have too many files
await validateCodeDir(this.trialConfig.codeDir);
......@@ -275,6 +297,7 @@ class TrialDispatcher implements TrainingService {
throw new Error(`TrialDispatcher: commandEmitter shouldn't be undefined in cleanUp.`);
}
this.stopping = true;
this.shouldUpdateTrials = true;
const environmentService = component.get<EnvironmentService>(EnvironmentService);
const environments = [...this.environments.values()];
......@@ -324,7 +347,8 @@ class TrialDispatcher implements TrainingService {
this.log.debug(`set environment ${environment.id} isAlive from ${oldIsAlive} to ${environment.isAlive} due to status is ${environment.status}.`);
}
});
await delay(5000);
this.shouldUpdateTrials = true;
await delay(environmentService.environmentMaintenceLoopInterval);
}
}
......@@ -332,9 +356,18 @@ class TrialDispatcher implements TrainingService {
if (this.commandChannel === undefined) {
throw new Error(`TrialDispatcher: commandChannel shouldn't be undefined in trialManagementLoop.`);
}
const interval = 1;
while (!this.stopping) {
await delay(2000);
let totalInterval = 1000;
while (totalInterval > 0) {
if (this.shouldUpdateTrials) {
this.shouldUpdateTrials = false;
break;
}
totalInterval -= interval;
await delay(interval);
}
const toRefreshedTrials: TrialDetail[] = [];
for (const trial of this.trials.values()) {
......@@ -347,7 +380,7 @@ class TrialDispatcher implements TrainingService {
continue;
}
const waitingTrials: TrialDetail[] = [];
let waitingTrials: TrialDetail[] = [];
let liveTrialsCount = 0;
for (const trial of toRefreshedTrials) {
const currentStatus = trial.status;
......@@ -396,7 +429,7 @@ class TrialDispatcher implements TrainingService {
}
this.releaseEnvironment(trial);
} else if (environmentStatus !== "RUNNING") {
this.log.error(`found running trial ${trial.id} on '${environment.jobId}' with '${environmentStatus}', set trial to environment status.`);
this.log.error(`found running trial ${trial.id} on '${environment.envId}' with '${environmentStatus}', set trial to environment status.`);
this.releaseEnvironment(trial);
trial.status = environmentStatus;
} else {
......@@ -412,31 +445,133 @@ class TrialDispatcher implements TrainingService {
break;
}
}
let liveEnvironmentsCount = 0;
const idleEnvironments: EnvironmentInformation[] = [];
this.environments.forEach((environment) => {
const reusableEnvironments: EnvironmentInformation[] = [];
for (const environment of this.environments.values()) {
if (environment.isAlive === true) {
liveEnvironmentsCount++;
if (environment.status === "RUNNING" && environment.isIdle) {
idleEnvironments.push(environment);
if (environment.status === "RUNNING" && environment.isRunnerReady) {
// if the environment is not reusable and has already been used, stop it and do not count it as idle;
if (
0 === environment.runningTrialCount &&
false === this.reuseEnvironment &&
environment.assignedTrialCount > 0
) {
const environmentService = component.get<EnvironmentService>(EnvironmentService);
await environmentService.stopEnvironment(environment);
continue;
}
// if gpu scheduler is not enabled, and there is running trial, skip it.
if (false === this.enableGpuScheduler && environment.runningTrialCount > 0) {
continue;
}
reusableEnvironments.push(environment);
}
}
});
while (idleEnvironments.length > 0 && waitingTrials.length > 0) {
const trial = waitingTrials.shift();
const idleEnvironment = idleEnvironments.shift();
if (trial !== undefined && idleEnvironment != undefined) {
await this.assignEnvironment(trial, idleEnvironment);
}
let neededEnvironmentCount = 0;
if (true === this.enableGpuScheduler) {
let noGpuAvailable: boolean = false;
while (waitingTrials.length > 0) {
// skip following trials, if first trial doesn't find available GPU.
if (true === noGpuAvailable) {
// break loop to try next time.
break;
}
const trial = waitingTrials.shift();
if (undefined === trial) {
throw new Error(`TrialDispatcher: waiting trial shouldn't be undefined!`);
}
const gpuNum = this.trialConfig ? this.trialConfig.gpuNum : undefined;
const result = this.gpuScheduler.scheduleMachine(reusableEnvironments, gpuNum, trial);
switch (result.resultType) {
case ScheduleResultType.REQUIRE_EXCEED_TOTAL:
{
if (liveEnvironmentsCount == 0) {
this.log.debug(`TrialDispatcher: no live environment, so request one.`);
neededEnvironmentCount = 1;
waitingTrials = [];
this.isLoggedNoGpuAvailable = false;
} else if (reusableEnvironments.length > 0) {
const errorMessage: string = `TrialDispatcher: REQUIRE_EXCEED_TOTAL Required GPU number ${gpuNum} is too large, no machine can meet`;
this.log.error(errorMessage);
throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage);
} else {
if (false === this.isLoggedNoGpuAvailable) {
this.log.debug(`TrialDispatcher: wait GPU, live environment ${liveEnvironmentsCount}, no reusable, REQUIRE_EXCEED_TOTAL.`)
this.isLoggedNoGpuAvailable = true;
}
}
break;
}
case ScheduleResultType.TMP_NO_AVAILABLE_GPU:
{
if (false === this.isLoggedNoGpuAvailable) {
this.log.debug(`TrialDispatcher: wait GPU, live environment ${liveEnvironmentsCount}, reusable ${reusableEnvironments.length}, TMP_NO_AVAILABLE_GPU.`)
this.isLoggedNoGpuAvailable = true;
}
// if some environment is alive, but not ready, no need to create more.
if (liveEnvironmentsCount <= reusableEnvironments.length) {
neededEnvironmentCount = 1;
this.isLoggedNoGpuAvailable = false;
this.log.info(`TrialDispatcher: ${liveEnvironmentsCount} live env, and ${reusableEnvironments.length} reusable, but no GPU available so request a new one.`);
}
noGpuAvailable = true;
}
break
case ScheduleResultType.SUCCEED:
{
const environment = result.environment;
if (undefined === environment) {
throw new Error(`TrialDispatcher: scheduled env shouldn't be undefined!`);
}
trial.assignedGpus = result.gpuIndices;
await this.allocateEnvironment(trial, environment);
this.isLoggedNoGpuAvailable = false;
}
break
default:
throw new Error(`TrialDispatcher: Unknown gpu schecduler type: ${result.resultType}`);
}
}
} else {
while (reusableEnvironments.length > 0 && waitingTrials.length > 0) {
const trial = waitingTrials.shift();
const idleEnvironment = reusableEnvironments.shift();
if (trial !== undefined && idleEnvironment != undefined) {
await this.allocateEnvironment(trial, idleEnvironment);
}
}
neededEnvironmentCount = liveTrialsCount - liveEnvironmentsCount;
}
if (liveEnvironmentsCount < liveTrialsCount) {
this.log.info(`request new environment, since live trials ${liveTrialsCount} ` +
`is more than live environments ${liveEnvironmentsCount}`);
for (let index = 0; index < liveTrialsCount - liveEnvironmentsCount; index++) {
await this.requestEnvironment();
if (neededEnvironmentCount > 0) {
const environmentService = component.get<EnvironmentService>(EnvironmentService);
let requestedCount = 0;
for (let index = 0; index < neededEnvironmentCount; index++) {
if (true === environmentService.hasMoreEnvironments) {
await this.requestEnvironment();
requestedCount++;
this.isLoggedNoMoreEnvironment = false;
} else {
if (this.isLoggedNoMoreEnvironment === false) {
this.isLoggedNoMoreEnvironment = true;
this.log.info(`no more environment so far, so skip to request environment.`)
}
}
}
if (environmentService.hasMoreEnvironments === true || requestedCount > 0) {
this.log.info(`requested new environment, live trials: ${liveTrialsCount}, ` +
`live environments: ${liveEnvironmentsCount}, neededEnvironmentCount: ${neededEnvironmentCount}, ` +
`requestedCount: ${requestedCount}`);
}
}
}
}
......@@ -462,35 +597,51 @@ class TrialDispatcher implements TrainingService {
this.environments.set(environment.id, environment);
if (environment.status === "FAILED") {
environment.isIdle = false;
environment.isAlive = false;
throw new Error(`error on request environment ${environment.jobId}, please check log for more details.`);
throw new Error(`error on request environment ${environment.envId}, please check log for more details.`);
} else {
environment.isIdle = true;
environment.isAlive = true;
}
await this.commandChannel.open(environment);
this.log.info(`requested environment ${environment.id} and job id is ${environment.jobId}.`);
this.log.info(`requested environment ${environment.id} and job id is ${environment.envId}.`);
}
private async assignEnvironment(trial: TrialDetail, environment: EnvironmentInformation): Promise<void> {
private async allocateEnvironment(trial: TrialDetail, environment: EnvironmentInformation): Promise<void> {
if (this.commandChannel === undefined) {
throw new Error(`TrialDispatcher: commandChannel shouldn't be undefined in assignEnvironment.`);
throw new Error(`TrialDispatcher: commandChannel shouldn't be undefined in allocateEnvironment.`);
}
if (this.trialConfig === undefined) {
throw new Error(`TrialDispatcher: trialConfig shouldn't be undefined in allocateEnvironment.`);
}
if (trial.environment) {
throw new Error(`trial ${trial.id} has assigned environment ${trial.environment.id} already, not assign to ${environment.id}!`);
throw new Error(`TrialDispatcher: trial ${trial.id} has assigned environment ${trial.environment.id} already, not assign to ${environment.id}!`);
}
if (environment.isIdle == false) {
throw new Error(`environment ${environment.id} is not idle, and cannot be assigned again!`);
if (environment.runningTrialCount > 0 && false === this.enableGpuScheduler) {
throw new Error(`TrialDispatcher: environment ${environment.id} has running trial, and gpu scheduler is not enabled, it cannot be assigned again!`);
}
this.log.info(`assigning environment ${environment.id} to trial ${trial.id}.`);
environment.isIdle = false;
// convert assigned gpus to string for nvidia visible settings
// undefined means no constraint, [] means no gpu visible.
let gpuIndices: string | undefined = undefined;
if (undefined !== this.trialConfig.gpuNum) {
const gpuArray: number[] = [];
if (undefined !== trial.assignedGpus) {
trial.assignedGpus.map((value) => {
gpuArray.push(value.index);
});
}
gpuIndices = gpuArray.join(',');
}
environment.runningTrialCount++;
environment.assignedTrialCount++;
trial.environment = environment;
trial.settings = {
trialId: trial.id,
gpuIndices: gpuIndices,
sequenceId: trial.form.sequenceId,
parameter: trial.form.hyperParameters,
}
......@@ -500,13 +651,16 @@ class TrialDispatcher implements TrainingService {
}
private releaseEnvironment(trial: TrialDetail): void {
if (!trial.environment) {
throw new Error(`environment is not assigned to trial ${trial.id}, and cannot be released!`);
if (undefined === trial.environment) {
throw new Error(`TrialDispatcher: environment is not assigned to trial ${trial.id}, and cannot be released!`);
}
if (trial.environment.runningTrialCount <= 0) {
throw new Error(`TrialDispatcher: environment ${trial.environment.id} has no counted running trial!`);
}
if (trial.environment.isIdle) {
throw new Error(`environment ${trial.environment.id} is idle already!`);
if (true === this.enableGpuScheduler) {
this.gpuScheduler.removeGpuReservation(trial);
}
trial.environment.isIdle = true;
trial.environment.runningTrialCount--;
trial.environment = undefined;
}
......@@ -527,19 +681,20 @@ class TrialDispatcher implements TrainingService {
}
private async handleStdout(commandData: any): Promise<void> {
const metricPattern: RegExp = /NNISDK_MEb'(?<metrics>.*a?)'$/gm;
const trialLogDir: string = path.join(getExperimentRootDir(), 'trials', commandData["trial"]);
mkDirPSync(trialLogDir);
const trialLogPath: string = path.join(trialLogDir, 'stdout_log_collection.log');
try {
let skipLogging: boolean = false;
if (commandData["tag"] === 'trial' && commandData["msg"] !== undefined) {
const message = commandData["msg"];
const metricsContent: any = message.match(this.NNI_METRICS_PATTERN);
if (metricsContent && metricsContent.groups) {
const message: string = commandData["msg"];
let metricsContent = metricPattern.exec(message);
while (metricsContent && metricsContent.groups) {
const key: string = 'metrics';
const data = metricsContent.groups[key];
const metricData = JSON.parse('"' + data.split('"').join('\\"') + '"');
await this.handleMetricData(commandData["trial"], metricData);
await this.handleMetricData(commandData["trial"], data);
metricsContent = metricPattern.exec(message);
skipLogging = true;
}
}
......@@ -561,7 +716,7 @@ class TrialDispatcher implements TrainingService {
}
private async handleCommand(command: Command): Promise<void> {
this.log.debug(`TrialDispatcher: env ${command.environment.id} received command ${command.command}, data: ${command.data}`);
this.log.debug(`TrialDispatcher: env ${command.environment.id} received command ${command.command}.`);
const environment = command.environment;
const data = command.data;
const nodeId = data["node"];
......@@ -574,9 +729,7 @@ class TrialDispatcher implements TrainingService {
break;
case INITIALIZED:
{
const oldStatus = environment.status;
let isAllReady = true;
if (environment.nodeCount > 1) {
let node = environment.nodes.get(nodeId);
if (node === undefined) {
......@@ -601,9 +754,9 @@ class TrialDispatcher implements TrainingService {
}
// single node is always ready to set env status
if (isAllReady && oldStatus === "UNKNOWN") {
environment.status = "RUNNING";
this.log.info(`TrialDispatcher: env ${environment.id} received initialized message, old status: ${oldStatus}, new status: ${environment.status}.`);
if (isAllReady) {
environment.isRunnerReady = true;
this.log.info(`TrialDispatcher: env ${environment.id} received initialized message and runner is ready, env status: ${environment.status}.`);
}
}
break;
......@@ -621,7 +774,10 @@ class TrialDispatcher implements TrainingService {
}
break;
case GPU_INFO:
environment.gpuSummary.set(nodeId, <GPUSummary>(data));
{
const gpuData = <TrialGpuSummary>(data);
environment.setGpuSummary(nodeId, gpuData);
}
break;
case TRIAL_END:
{
......@@ -647,6 +803,7 @@ class TrialDispatcher implements TrainingService {
}
break;
}
this.shouldUpdateTrials = true;
}
}
......
......@@ -262,6 +262,10 @@
version "2.3.1"
resolved "https://registry.yarnpkg.com/@types/js-base64/-/js-base64-2.3.1.tgz#c39f14f129408a3d96a1105a650d8b2b6eeb4168"
"@types/js-yaml@^3.12.5":
version "3.12.5"
resolved "https://registry.yarnpkg.com/@types/js-yaml/-/js-yaml-3.12.5.tgz#136d5e6a57a931e1cce6f9d8126aa98a9c92a6bb"
"@types/json-schema@^7.0.3":
version "7.0.3"
resolved "https://registry.yarnpkg.com/@types/json-schema/-/json-schema-7.0.3.tgz#bdfd69d61e464dcc81b25159c270d75a73c1a636"
......@@ -277,7 +281,6 @@
"@types/minipass@*":
version "2.2.0"
resolved "https://registry.yarnpkg.com/@types/minipass/-/minipass-2.2.0.tgz#51ad404e8eb1fa961f75ec61205796807b6f9651"
integrity sha512-wuzZksN4w4kyfoOv/dlpov4NOunwutLA/q7uc00xU02ZyUY+aoM5PWIXEKBMnm0NHd4a+N71BMjq+x7+2Af1fg==
dependencies:
"@types/node" "*"
......@@ -430,7 +433,6 @@
"@types/tar@^4.0.3":
version "4.0.3"
resolved "https://registry.yarnpkg.com/@types/tar/-/tar-4.0.3.tgz#e2cce0b8ff4f285293243f5971bd7199176ac489"
integrity sha512-Z7AVMMlkI8NTWF0qGhC4QIX0zkV/+y0J8x7b/RsHrN0310+YNjoJd8UrApCiGBCWtKjxS9QhNqLi2UJNToh5hA==
dependencies:
"@types/minipass" "*"
"@types/node" "*"
......@@ -1017,7 +1019,6 @@ chownr@^1.1.2, chownr@^1.1.3:
chownr@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/chownr/-/chownr-2.0.0.tgz#15bfbe53d2eab4cf70f18a8cd68ebe5b3cb1dece"
integrity sha512-bIomtDF5KGpdogkLd9VspvFzk9KfpyyGlS8YFVZl7TGPBHL5snIOnxeshwVgPteQ9b4Eydl+pVbIyE1DcvCWgQ==
ci-info@^1.5.0:
version "1.6.0"
......@@ -1912,7 +1913,6 @@ fs-minipass@^1.2.5:
fs-minipass@^2.0.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/fs-minipass/-/fs-minipass-2.1.0.tgz#7f5036fdbf12c63c169190cbe4199c852271f9fb"
integrity sha512-V/JgOLFCS+R6Vcq0slCuaeWEdNC3ouDlJMNIsacH2VtALiu9mV4LPrHc5cDl8k5aw6J8jwgWWpiTo5RYhmIzvg==
dependencies:
minipass "^3.0.0"
......@@ -2331,7 +2331,6 @@ ignore@^4.0.6:
ignore@^5.1.4:
version "5.1.4"
resolved "https://registry.yarnpkg.com/ignore/-/ignore-5.1.4.tgz#84b7b3dbe64552b6ef0eca99f6743dbec6d97adf"
integrity sha512-MzbUSahkTW1u7JpKKjY7LCARd1fU5W2rLdxlM4kdkayuCwZImjkpluF9CM1aLewYJguPDqewLam18Y6AU69A8A==
import-fresh@^3.0.0:
version "3.2.1"
......@@ -2650,7 +2649,6 @@ istanbul-lib-source-maps@^4.0.0:
istanbul-reports@^3.0.2:
version "3.0.2"
resolved "https://registry.yarnpkg.com/istanbul-reports/-/istanbul-reports-3.0.2.tgz#d593210e5000683750cb09fc0644e4b6e27fd53b"
integrity sha512-9tZvz7AiR3PEDNGiV9vIouQ/EAcqMXFmkcA1CDFTwOB98OZVDL0PH9glHotf5Ugp6GCOTypfzGWI/OqjWNCRUw==
dependencies:
html-escaper "^2.0.0"
istanbul-lib-report "^3.0.0"
......@@ -3193,7 +3191,6 @@ minipass@^2.3.5, minipass@^2.8.6, minipass@^2.9.0:
minipass@^3.0.0:
version "3.1.3"
resolved "https://registry.yarnpkg.com/minipass/-/minipass-3.1.3.tgz#7d42ff1f39635482e15f9cdb53184deebd5815fd"
integrity sha512-Mgd2GdMVzY+x3IJ+oHnVM+KG3lA5c8tnabyJKmHSaG2kAGpudxuOf8ToDkhumF7UzME7DecbQE9uOZhNm7PuJg==
dependencies:
yallist "^4.0.0"
......@@ -3212,7 +3209,6 @@ minizlib@^1.2.1:
minizlib@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/minizlib/-/minizlib-2.1.0.tgz#fd52c645301ef09a63a2c209697c294c6ce02cf3"
integrity sha512-EzTZN/fjSvifSX0SlqUERCN39o6T40AMarPbv0MrarSFtIITCBh7bi+dU8nxGFHuqs9jdIAeoYoKuQAAASsPPA==
dependencies:
minipass "^3.0.0"
yallist "^4.0.0"
......@@ -3249,7 +3245,6 @@ mkdirp@^0.5.1:
mkdirp@^1.0.3:
version "1.0.4"
resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-1.0.4.tgz#3eb5ed62622756d79a5f0e2a221dfebad75c2f7e"
integrity sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==
mocha@^7.1.1:
version "7.1.1"
......@@ -3707,7 +3702,6 @@ number-is-nan@^1.0.0:
nyc@^15.0.0:
version "15.0.1"
resolved "https://registry.yarnpkg.com/nyc/-/nyc-15.0.1.tgz#bd4d5c2b17f2ec04370365a5ca1fc0ed26f9f93d"
integrity sha512-n0MBXYBYRqa67IVt62qW1r/d9UH/Qtr7SF1w/nQLJ9KxvWF6b2xCHImRAixHN9tnMMYHC2P14uo6KddNGwMgGg==
dependencies:
"@istanbuljs/load-nyc-config" "^1.0.0"
"@istanbuljs/schema" "^0.1.2"
......@@ -5065,7 +5059,6 @@ tar@^4.4.10, tar@^4.4.12, tar@^4.4.13:
tar@^6.0.2:
version "6.0.2"
resolved "https://registry.yarnpkg.com/tar/-/tar-6.0.2.tgz#5df17813468a6264ff14f766886c622b84ae2f39"
integrity sha512-Glo3jkRtPcvpDlAs/0+hozav78yoXKFr+c4wgw62NNMO3oo4AaJdCo21Uu7lcwr55h39W2XD1LMERc64wtbItg==
dependencies:
chownr "^2.0.0"
fs-minipass "^2.0.0"
......@@ -5541,7 +5534,6 @@ yallist@^3.0.2, yallist@^3.0.3:
yallist@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72"
integrity sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==
yargs-parser@13.1.2, yargs-parser@^13.1.2:
version "13.1.2"
......
......@@ -35,6 +35,8 @@ def update_training_service_config(args):
config[args.ts]['paiConfig']['host'] = args.pai_host
if args.pai_token is not None:
config[args.ts]['paiConfig']['token'] = args.pai_token
if args.pai_reuse is not None:
config[args.ts]['paiConfig']['reuse'] = args.pai_reuse.lower() == 'true'
if args.nni_docker_image is not None:
config[args.ts]['trial']['image'] = args.nni_docker_image
if args.nni_manager_nfs_mount_path is not None:
......@@ -101,6 +103,7 @@ if __name__ == '__main__':
parser.add_argument("--output_dir", type=str)
parser.add_argument("--vc", type=str)
parser.add_argument("--pai_token", type=str)
parser.add_argument("--pai_reuse", type=str)
parser.add_argument("--pai_storage_config_name", type=str)
parser.add_argument("--nni_manager_nfs_mount_path", type=str)
parser.add_argument("--container_nfs_mount_path", type=str)
......
......@@ -57,7 +57,7 @@ jobs:
echo "TEST_IMG:$TEST_IMG"
cd test
python3 nni_test/nnitest/generate_ts_config.py --ts pai --pai_host $(pai_host) --pai_user $(pai_user) --nni_docker_image $TEST_IMG --pai_storage_config_name $(pai_storage_config_name)\
python3 nni_test/nnitest/generate_ts_config.py --ts pai --pai_reuse $(pai_reuse) --pai_host $(pai_host) --pai_user $(pai_user) --nni_docker_image $TEST_IMG --pai_storage_config_name $(pai_storage_config_name)\
--pai_token $(pai_token) --nni_manager_nfs_mount_path $(nni_manager_nfs_mount_path) --container_nfs_mount_path $(container_nfs_mount_path) --nni_manager_ip $(nni_manager_ip) --vc $(virtual_cluster)
PATH=$HOME/.local/bin:$PATH python3 nni_test/nnitest/run_tests.py --config config/integration_tests.yml --ts pai
displayName: 'integration test'
......@@ -14,10 +14,12 @@ def setType(key, valueType):
'''check key type'''
return And(valueType, error=SCHEMA_TYPE_ERROR % (key, valueType.__name__))
def setChoice(key, *args):
    '''Build a validator checking that the value of ``key`` is one of the allowed choices.'''
    allowed = args

    def _is_allowed(value):
        # membership test against the fixed set of permitted values
        return value in allowed

    return And(_is_allowed, error=SCHEMA_RANGE_ERROR % (key, str(allowed)))
def setNumberRange(key, keyType, start, end):
'''check number range'''
return And(
......@@ -25,16 +27,19 @@ def setNumberRange(key, keyType, start, end):
And(lambda n: start <= n <= end, error=SCHEMA_RANGE_ERROR % (key, '(%s,%s)' % (start, end))),
)
def setPathCheck(key):
    '''Build a validator checking that the value of ``key`` is an existing filesystem path.'''
    # pre-format the error message once; os.path.exists does the actual check
    error_message = SCHEMA_PATH_ERROR % key
    return And(os.path.exists, error=error_message)
class AlgoSchema:
"""
This class is the schema of 'tuner', 'assessor' and 'advisor' sections of experiment configuraion file.
For example:
AlgoSchema('tuner') creates the schema of tuner section.
"""
def __init__(self, algo_type):
"""
Parameters:
......@@ -108,6 +113,7 @@ class AlgoSchema:
Schema(self.algo_schema).validate(data)
self.validate_extras(data, self.algo_type)
common_schema = {
'authorName': setType('authorName', str),
'experimentName': setType('experimentName', str),
......@@ -138,7 +144,7 @@ common_schema = {
}
common_trial_schema = {
'trial':{
'trial': {
'command': setType('command', str),
'codeDir': setPathCheck('codeDir'),
Optional('gpuNum'): setNumberRange('gpuNum', int, 0, 99999),
......@@ -147,7 +153,7 @@ common_trial_schema = {
}
pai_yarn_trial_schema = {
'trial':{
'trial': {
'command': setType('command', str),
'codeDir': setPathCheck('codeDir'),
'gpuNum': setNumberRange('gpuNum', int, 0, 99999),
......@@ -156,10 +162,10 @@ pai_yarn_trial_schema = {
'image': setType('image', str),
Optional('authFile'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'authFile'),
Optional('shmMB'): setType('shmMB', int),
Optional('dataDir'): And(Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'),\
error='ERROR: dataDir format error, dataDir format is hdfs://xxx.xxx.xxx.xxx:xxx'),
Optional('outputDir'): And(Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'),\
error='ERROR: outputDir format error, outputDir format is hdfs://xxx.xxx.xxx.xxx:xxx'),
Optional('dataDir'): And(Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'),
error='ERROR: dataDir format error, dataDir format is hdfs://xxx.xxx.xxx.xxx:xxx'),
Optional('outputDir'): And(Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'),
error='ERROR: outputDir format error, outputDir format is hdfs://xxx.xxx.xxx.xxx:xxx'),
Optional('virtualCluster'): setType('virtualCluster', str),
Optional('nasMode'): setChoice('nasMode', 'classic_mode', 'enas_mode', 'oneshot_mode', 'darts_mode'),
Optional('portList'): [{
......@@ -184,7 +190,7 @@ pai_yarn_config_schema = {
pai_trial_schema = {
'trial':{
'trial': {
'codeDir': setPathCheck('codeDir'),
'nniManagerNFSMountPath': setPathCheck('nniManagerNFSMountPath'),
'containerNFSMountPath': setType('containerNFSMountPath', str),
......@@ -200,21 +206,21 @@ pai_trial_schema = {
}
pai_config_schema = {
'paiConfig': Or({
'userName': setType('userName', str),
'passWord': setType('passWord', str),
'host': setType('host', str),
Optional('reuse'): setType('reuse', bool)
}, {
'paiConfig': {
'userName': setType('userName', str),
'token': setType('token', str),
Or('passWord', 'token', only_one=True): str,
'host': setType('host', str),
Optional('reuse'): setType('reuse', bool)
})
Optional('reuse'): setType('reuse', bool),
Optional('gpuNum'): setNumberRange('gpuNum', int, 0, 99999),
Optional('cpuNum'): setNumberRange('cpuNum', int, 0, 99999),
Optional('memoryMB'): setType('memoryMB', int),
Optional('maxTrialNumPerGpu'): setType('maxTrialNumPerGpu', int),
Optional('useActiveGpu'): setType('useActiveGpu', bool),
}
}
dlts_trial_schema = {
'trial':{
'trial': {
'command': setType('command', str),
'codeDir': setPathCheck('codeDir'),
'gpuNum': setNumberRange('gpuNum', int, 0, 99999),
......@@ -235,7 +241,7 @@ dlts_config_schema = {
}
aml_trial_schema = {
'trial':{
'trial': {
'codeDir': setPathCheck('codeDir'),
'command': setType('command', str),
'image': setType('image', str),
......@@ -252,7 +258,7 @@ aml_config_schema = {
}
kubeflow_trial_schema = {
'trial':{
'trial': {
'codeDir': setPathCheck('codeDir'),
Optional('nasMode'): setChoice('nasMode', 'classic_mode', 'enas_mode', 'oneshot_mode', 'darts_mode'),
Optional('ps'): {
......@@ -273,7 +279,7 @@ kubeflow_trial_schema = {
'image': setType('image', str),
Optional('privateRegistryAuthPath'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'privateRegistryAuthPath')
},
Optional('worker'):{
Optional('worker'): {
'replicas': setType('replicas', int),
'command': setType('command', str),
'gpuNum': setNumberRange('gpuNum', int, 0, 99999),
......@@ -286,7 +292,7 @@ kubeflow_trial_schema = {
}
kubeflow_config_schema = {
'kubeflowConfig':Or({
'kubeflowConfig': Or({
'operator': setChoice('operator', 'tf-operator', 'pytorch-operator'),
'apiVersion': setType('apiVersion', str),
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
......@@ -299,23 +305,23 @@ kubeflow_config_schema = {
'apiVersion': setType('apiVersion', str),
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
'keyVault': {
'vaultName': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),\
error='ERROR: vaultName format error, vaultName support using (0-9|a-z|A-Z|-)'),
'name': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),\
error='ERROR: name format error, name support using (0-9|a-z|A-Z|-)')
'vaultName': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
error='ERROR: vaultName format error, vaultName support using (0-9|a-z|A-Z|-)'),
'name': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
error='ERROR: name format error, name support using (0-9|a-z|A-Z|-)')
},
'azureStorage': {
'accountName': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),\
error='ERROR: accountName format error, accountName support using (0-9|a-z|A-Z|-)'),
'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),\
error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
'accountName': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),
error='ERROR: accountName format error, accountName support using (0-9|a-z|A-Z|-)'),
'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),
error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
},
Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999)
})
}
frameworkcontroller_trial_schema = {
'trial':{
'trial': {
'codeDir': setPathCheck('codeDir'),
'taskRoles': [{
'name': setType('name', str),
......@@ -335,7 +341,7 @@ frameworkcontroller_trial_schema = {
}
frameworkcontroller_config_schema = {
'frameworkcontrollerConfig':Or({
'frameworkcontrollerConfig': Or({
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
Optional('serviceAccountName'): setType('serviceAccountName', str),
'nfs': {
......@@ -346,23 +352,23 @@ frameworkcontroller_config_schema = {
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
Optional('serviceAccountName'): setType('serviceAccountName', str),
'keyVault': {
'vaultName': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),\
error='ERROR: vaultName format error, vaultName support using (0-9|a-z|A-Z|-)'),
'name': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),\
error='ERROR: name format error, name support using (0-9|a-z|A-Z|-)')
'vaultName': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
error='ERROR: vaultName format error, vaultName support using (0-9|a-z|A-Z|-)'),
'name': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
error='ERROR: name format error, name support using (0-9|a-z|A-Z|-)')
},
'azureStorage': {
'accountName': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),\
error='ERROR: accountName format error, accountName support using (0-9|a-z|A-Z|-)'),
'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),\
error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
'accountName': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),
error='ERROR: accountName format error, accountName support using (0-9|a-z|A-Z|-)'),
'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),
error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
},
Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999)
})
}
machine_list_schema = {
'machineList':[Or(
'machineList': [Or(
{
'ip': setType('ip', str),
Optional('port'): setNumberRange('port', int, 1, 65535),
......@@ -395,6 +401,7 @@ training_service_schema_dict = {
'dlts': Schema({**common_schema, **dlts_trial_schema, **dlts_config_schema}),
}
class NNIConfigSchema:
def validate(self, data):
train_service = data['trainingServicePlatform']
......@@ -483,19 +490,25 @@ class NNIConfigSchema:
if not taskRoles_dict:
raise SchemaError('Please set taskRoles in paiConfigPath config file!')
else:
pai_trial_fields_required_list = ['image', 'gpuNum', 'cpuNum', 'memoryMB', 'paiStorageConfigName', 'command']
pai_trial_fields_required_list = ['image', 'paiStorageConfigName', 'command']
for trial_field in pai_trial_fields_required_list:
if experiment_config['trial'].get(trial_field) is None:
raise SchemaError('Please set {0} in trial configuration,\
or set additional pai configuration file path in paiConfigPath!'.format(trial_field))
pai_resource_fields_required_list = ['gpuNum', 'cpuNum', 'memoryMB']
for required_field in pai_resource_fields_required_list:
if experiment_config['trial'].get(required_field) is None and \
experiment_config['paiConfig'].get(required_field) is None:
raise SchemaError('Please set {0} in trial or paiConfig configuration,\
or set additional pai configuration file path in paiConfigPath!'.format(required_field))
def validate_pai_trial_conifg(self, experiment_config):
'''validate the trial config in pai platform'''
if experiment_config.get('trainingServicePlatform') in ['pai', 'paiYarn']:
if experiment_config.get('trial').get('shmMB') and \
experiment_config['trial']['shmMB'] > experiment_config['trial']['memoryMB']:
experiment_config['trial']['shmMB'] > experiment_config['trial']['memoryMB']:
raise SchemaError('shmMB should be no more than memoryMB!')
#backward compatibility
# backward compatibility
warning_information = '{0} is not supported in NNI anymore, please remove the field in config file!\
please refer https://github.com/microsoft/nni/blob/master/docs/en_US/TrainingService/PaiMode.md#run-an-experiment\
for the practices of how to get data and output model in trial code'
......@@ -508,6 +521,6 @@ class NNIConfigSchema:
def validate_eth0_device(self, experiment_config):
    '''Validate that this machine can advertise a reachable IP to trials.

    For any non-local training platform, trial containers must reach the
    NNI manager over the network.  If the user did not set `nniManagerIp`
    explicitly, we fall back to the `eth0` interface; when that interface
    does not exist either, there is no usable address, so fail fast with
    a SchemaError instead of letting trials hang later.

    Raises:
        SchemaError: when platform is remote, `nniManagerIp` is unset,
            and no `eth0` device is present on this machine.
    '''
    # The diff scrape duplicated the continuation lines of this condition;
    # the single, correct condition is reconstructed here.
    if experiment_config.get('trainingServicePlatform') not in ['local'] \
            and not experiment_config.get('nniManagerIp') \
            and 'eth0' not in netifaces.interfaces():
        raise SchemaError('This machine does not contain eth0 network device, please set nniManagerIp in config file!')
......@@ -57,7 +57,11 @@ class BaseChannel(ABC):
def close(self):
    '''Shut down the channel.

    Marks the channel as stopped first so any polling loops exit, then
    performs the implementation-specific teardown.  Errors during
    teardown are deliberately swallowed (best-effort close): at shutdown
    time there is nothing useful the caller can do about them, so they
    are only reported to stdout.
    '''
    self.is_running = False
    # The scraped diff showed both an unguarded _inner_close() call and
    # the try-wrapped one; only the guarded call belongs in the code.
    try:
        self._inner_close()
    except Exception as err:
        # ignore any error on closing
        print("error on closing channel: %s" % err)
def send(self, command, data):
"""Send command to Training Service.
......
......@@ -82,7 +82,11 @@ class RemoteLogger(object):
'''
constructor
'''
self.logger = logging.getLogger('nni_syslog_{}'.format(tag))
logger_name = 'nni_syslog_{}'.format(tag)
# to prevent multiple trial logged in same logger
if trial_id is not None:
logger_name = '{}_{}'.format(logger_name, trial_id)
self.logger = logging.getLogger(logger_name)
self.log_level = log_level
self.logger.setLevel(self.log_level)
self.pipeReader = None
......
......@@ -86,11 +86,17 @@ class Trial:
break
time.sleep(0.1)
trial_command = self.args.trial_command
gpuIndices = self.data.get('gpuIndices')
if (gpuIndices is not None):
trial_command = 'CUDA_VISIBLE_DEVICES="%s " %s' % (gpuIndices, trial_command)
self.log_pipe_stdout = self.trial_syslogger_stdout.get_pipelog_reader()
self.process = Popen(self.args.trial_command, shell=True, stdout=self.log_pipe_stdout,
self.process = Popen(trial_command, shell=True, stdout=self.log_pipe_stdout,
stderr=self.log_pipe_stdout, cwd=trial_code_dir, env=dict(environ))
nni_log(LogType.Info, '{0}: spawns a subprocess (pid {1}) to run command: {2}'.
format(self.name, self.process.pid, shlex.split(self.args.trial_command)))
format(self.name, self.process.pid, shlex.split(trial_command)))
def save_parameter_file(self, command_data):
parameters = command_data["parameters"]
......
......@@ -37,9 +37,9 @@ class WebChannel(BaseChannel):
def _inner_close(self):
if self.client is not None:
self.client.close()
if self._event_loop.is_running():
self._event_loop.close()
self.client = None
if self._event_loop.is_running():
self._event_loop.stop()
self._event_loop = None
def _inner_send(self, message):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment