Unverified Commit 392e55f3 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Support remote training service use reuse mode (#2923)

parent 1ae8a0db
......@@ -592,6 +592,16 @@ Specifies the pre-command that will be executed before the remote machine execut
__Note__: Because __preCommand__ is executed before other commands every time, it is strongly recommended not to set a __preCommand__ that makes changes to the system, e.g. `mkdir` or `touch`.
### remoteConfig
Optional field in remote mode. Users can set per-machine information in the `machineList` field, and set the global configuration for remote mode in this field.
#### reuse
Optional. Bool. default: `false`. It's an experimental feature.
If it's true, NNI will reuse remote jobs to run as many trials as possible, which saves the time of creating new jobs. Users need to make sure each trial can run independently in the same job; for example, avoid loading checkpoints from previous trials.
### kubeflowConfig
#### operator
......
......@@ -23,9 +23,6 @@ import { KubeflowTrainingService } from './training_service/kubernetes/kubeflow/
import { LocalTrainingService } from './training_service/local/localTrainingService';
import { RouterTrainingService } from './training_service/reusable/routerTrainingService';
import { PAIYarnTrainingService } from './training_service/pai/paiYarn/paiYarnTrainingService';
import {
RemoteMachineTrainingService
} from './training_service/remote_machine/remoteMachineTrainingService';
import { DLTSTrainingService } from './training_service/dlts/dltsTrainingService';
function initStartupInfo(
......@@ -43,7 +40,7 @@ async function initContainer(foreground: boolean, platformMode: string, logFileN
.scope(Scope.Singleton);
} else if (platformMode === 'remote') {
Container.bind(TrainingService)
.to(RemoteMachineTrainingService)
.to(RouterTrainingService)
.scope(Scope.Singleton);
} else if (platformMode === 'pai') {
Container.bind(TrainingService)
......
......@@ -166,6 +166,9 @@ export namespace ValidationSchemas {
}),
nni_manager_ip: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
nniManagerIp: joi.string().min(1)
}),
remote_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
reuse: joi.boolean()
})
}
};
......
......@@ -10,6 +10,7 @@ export enum TrialConfigMetadataKey {
MACHINE_LIST = 'machine_list',
LOCAL_CONFIG = 'local_config',
TRIAL_CONFIG = 'trial_config',
REMOTE_CONFIG = 'remote_config',
EXPERIMENT_ID = 'experimentId',
MULTI_PHASE = 'multiPhase',
RANDOM_SCHEDULER = 'random_scheduler',
......
......@@ -125,8 +125,14 @@ export abstract class EnvironmentService {
public abstract get hasStorageService(): boolean;
public abstract config(key: string, value: string): Promise<void>;
public abstract refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void>;
public abstract startEnvironment(environment: EnvironmentInformation): Promise<void>;
public abstract stopEnvironment(environment: EnvironmentInformation): Promise<void>;
public abstract startEnvironment(environment: EnvironmentInformation): Promise<void>;
/**
 * Number of environments to request before the dispatcher starts its loops.
 * The base implementation returns 0 (the default for OpenPAI and AML mode);
 * in remote mode it is overridden to the length of the machine list.
 */
public get prefetchedEnvironmentCount(): number {
    const noPrefetch = 0;
    return noPrefetch;
}
// It depends on environment pressure and settings.
// For example, OpenPAI relies on API calls whose frequency is limited, so the value needs to be bigger.
......
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as fs from 'fs';
import * as path from 'path';
import * as component from '../../../common/component';
import { getExperimentId } from '../../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../../common/log';
import { EnvironmentInformation, EnvironmentService } from '../environment';
import {
getExperimentRootDir,
} from '../../../common/utils';
import { TrialConfig } from '../../common/trialConfig';
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey';
import { execMkdir, validateCodeDir } from '../../common/util';
import {
ExecutorManager, RemoteMachineMeta,
} from '../../remote_machine/remoteMachineData';
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor';
import { RemoteMachineEnvironmentInformation } from '../remote/remoteConfig';
/**
 * Environment service that runs trial-runner environments on the remote machines
 * listed in the experiment's machine list.
 *
 * Each machine gets one ExecutorManager when the machine list is configured. An
 * environment is bound to a free machine when it is started, and the machine is
 * marked free again when the environment finishes or is stopped.
 */
@component.Singleton
export class RemoteEnvironmentService extends EnvironmentService {
    // Executor id used for the initial connection opened to every machine.
    private readonly initExecutorId = "initConnection";
    // Executor manager of every machine that has been connected.
    private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>;
    // Executor manager serving each environment, keyed by environment id.
    private readonly environmentExecutorManagerMap: Map<string, ExecutorManager>;
    // Whether a machine is currently assigned to an environment (true = occupied).
    private readonly remoteMachineMetaOccupiedMap: Map<RemoteMachineMeta, boolean>;
    private trialConfig: TrialConfig | undefined;
    private readonly log: Logger;
    // Non-empty until the first startEnvironment call has finished its one-time setup.
    private sshConnectionPromises: unknown[];
    private experimentRootDir: string;
    private experimentId: string;

    constructor() {
        super();
        this.experimentId = getExperimentId();
        this.environmentExecutorManagerMap = new Map<string, ExecutorManager>();
        this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>();
        this.remoteMachineMetaOccupiedMap = new Map<RemoteMachineMeta, boolean>();
        this.sshConnectionPromises = [];
        this.experimentRootDir = getExperimentRootDir();
        this.log = getLogger();
    }

    /** One environment is prefetched per connected machine. */
    public get prefetchedEnvironmentCount(): number {
        return this.machineExecutorManagerMap.size;
    }

    /** Interval value (5000) returned for the environment maintenance loop. */
    public get environmentMaintenceLoopInterval(): number {
        return 5000;
    }

    public get hasMoreEnvironments(): boolean {
        return false;
    }

    public get hasStorageService(): boolean {
        return false;
    }

    /**
     * Applies one piece of cluster metadata.
     * - MACHINE_LIST: connects to and initializes every listed machine.
     * - TRIAL_CONFIG: validates the code directory and stores the trial config.
     * - anything else is only logged at debug level.
     *
     * @param key metadata key
     * @param value JSON-encoded metadata value
     * @throws Error when the trial config is empty, codeDir is not a directory,
     *         or codeDir validation fails
     */
    public async config(key: string, value: string): Promise<void> {
        switch (key) {
            case TrialConfigMetadataKey.MACHINE_LIST:
                await this.setupConnections(value);
                break;
            case TrialConfigMetadataKey.TRIAL_CONFIG: {
                const remoteMachineTrailConfig: TrialConfig = <TrialConfig>JSON.parse(value);
                // JSON.parse never returns undefined; `null` is the value that would
                // otherwise crash on the property access below.
                if (remoteMachineTrailConfig === undefined || remoteMachineTrailConfig === null) {
                    throw new Error('trial config parsed failed');
                }

                // codeDir is not a valid directory, throw Error
                if (!fs.lstatSync(remoteMachineTrailConfig.codeDir).isDirectory()) {
                    throw new Error(`codeDir ${remoteMachineTrailConfig.codeDir} is not a directory`);
                }

                try {
                    // Validate to make sure codeDir doesn't have too many files
                    await validateCodeDir(remoteMachineTrailConfig.codeDir);
                } catch (error) {
                    this.log.error(error);
                    // Rethrow the original error so its stack trace is preserved.
                    throw error instanceof Error ? error : new Error(String(error));
                }

                this.trialConfig = remoteMachineTrailConfig;
                break;
            }
            default:
                this.log.debug(`Remote not support metadata key: '${key}', value: '${value}'`);
        }
    }

    /**
     * Picks the first machine that is not occupied and marks it occupied.
     * @returns the chosen machine, or undefined when every machine is busy
     */
    private scheduleMachine(): RemoteMachineMeta | undefined {
        for (const [rmMeta, occupied] of this.remoteMachineMetaOccupiedMap) {
            if (!occupied) {
                this.remoteMachineMetaOccupiedMap.set(rmMeta, true);
                return rmMeta;
            }
        }
        return undefined;
    }

    /**
     * Connects to every machine of the JSON-encoded machine list, one after another.
     * @param machineList JSON array of RemoteMachineMeta
     */
    private async setupConnections(machineList: string): Promise<void> {
        this.log.debug(`Connecting to remote machines: ${machineList}`);
        //TO DO: verify if value's format is wrong, and json parse failed, how to handle error
        const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList);
        for (const rmMeta of rmMetaList) {
            // Each connection is awaited here, so machineExecutorManagerMap (read by
            // prefetchedEnvironmentCount) is filled when this method resolves. The
            // pushed entry only keeps the list non-empty, which startEnvironment uses
            // as its one-time setup trigger.
            this.sshConnectionPromises.push(await this.initRemoteMachineOnConnected(rmMeta));
        }
    }

    /**
     * Opens the initial executor for a machine, registers its executor manager and
     * creates the experiment folders on it.
     * @param rmMeta machine to connect to
     */
    private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta): Promise<void> {
        const executorManager: ExecutorManager = new ExecutorManager(rmMeta);
        this.log.info(`connecting to ${rmMeta.username}@${rmMeta.ip}:${rmMeta.port}`);
        const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId);
        this.log.debug(`reached ${executor.name}`);
        this.machineExecutorManagerMap.set(rmMeta, executorManager);
        this.log.debug(`initializing ${executor.name}`);

        // Create root working directory after executor is ready
        const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni');
        await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId()));

        // the directory to store temp scripts in remote machine
        const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId());

        // clean up previous result.
        await executor.createFolder(remoteGpuScriptCollectorDir, true);
        await executor.allowPermission(true, nniRootDir);
    }

    /**
     * Updates the status of one environment from the runner's pid file and
     * return-code file, releasing its machine once the runner has finished.
     * @param environment environment to refresh
     */
    private async refreshEnvironment(environment: EnvironmentInformation): Promise<void> {
        const executor = await this.getExecutor(environment.id);
        const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`;
        const runnerReturnCodeFilePath: string = `${environment.runnerWorkingFolder}/code`;

        // NOTE(review): fs.existsSync checks the LOCAL file system, while
        // runnerWorkingFolder is built from the executor's remote experiment root in
        // prepareEnvironment. This (and the check on the return-code file below)
        // looks like it should be a remote existence check - confirm against the
        // ShellExecutor API before changing.
        if (!fs.existsSync(jobpidPath)) {
            return;
        }

        /* eslint-disable require-atomic-updates */
        try {
            const isAlive = await executor.isProcessAlive(jobpidPath);
            if (isAlive) {
                return;
            }

            // The process of jobpid is not alive any more.
            const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation;
            if (remoteEnvironment.rmMachineMeta === undefined) {
                throw new Error(`${remoteEnvironment.id} machine meta not initialized!`);
            }
            this.log.info(`pid in ${remoteEnvironment.rmMachineMeta.ip}:${jobpidPath} is not alive!`);

            if (!fs.existsSync(runnerReturnCodeFilePath)) {
                return;
            }
            const runnerReturnCode: string = await executor.getRemoteFileContent(runnerReturnCodeFilePath);
            // File content is "<exit code> <timestamp>".
            const match: RegExpMatchArray | null = runnerReturnCode.trim()
                .match(/^-?(\d+)\s+(\d+)$/);
            if (match === null) {
                return;
            }
            const { 1: code } = match;
            // Update the environment's status based on result code
            environment.setStatus(parseInt(code, 10) === 0 ? 'SUCCEEDED' : 'FAILED');
            this.releaseEnvironmentResource(environment);
        } catch (error) {
            // Log first: releasing may throw as well and must not hide the root cause.
            const message = error instanceof Error ? error.message : String(error);
            this.log.error(`Update job status exception, error is ${message}`);
            this.releaseEnvironmentResource(environment);
        }
    }

    /**
     * Refreshes the status of all given environments concurrently.
     * @param environments environments to refresh
     */
    public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
        await Promise.all(environments.map((environment) => this.refreshEnvironment(environment)));
    }

    /**
     * If a environment is finished, release the connection resource
     * @param environment remote machine environment job detail
     * @throws Error when the environment has no executor manager or machine assigned
     */
    private releaseEnvironmentResource(environment: EnvironmentInformation): void {
        const executorManager = this.environmentExecutorManagerMap.get(environment.id);
        if (executorManager === undefined) {
            throw new Error(`ExecutorManager is not assigned for environment ${environment.id}`);
        }

        // Note, the reference is kept in environmentExecutorManagerMap, as there may be
        // following requests from nni manager.
        executorManager.releaseExecutor(environment.id);
        const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation;
        if (remoteEnvironment.rmMachineMeta === undefined) {
            throw new Error(`${remoteEnvironment.id} rmMachineMeta not initialized!`);
        }
        this.remoteMachineMetaOccupiedMap.set(remoteEnvironment.rmMachineMeta, false);
    }

    /**
     * Assigns a free machine to the environment and launches the runner on it.
     * The first call also performs the one-time initialization of the machine
     * occupancy table.
     * @param environment environment to start
     * @throws Error when the trial config is missing or no machine is available
     */
    public async startEnvironment(environment: EnvironmentInformation): Promise<void> {
        if (this.sshConnectionPromises.length > 0) {
            // Check before consuming the one-time trigger below; otherwise a failure
            // here would leave the occupancy table uninitialized for all later calls.
            if (this.trialConfig === undefined) {
                throw new Error("trial config not initialized!");
            }
            await Promise.all(this.sshConnectionPromises);
            this.log.info('ssh connection initialized!');
            // set sshConnectionPromises to [] to avoid log information duplicated
            this.sshConnectionPromises = [];
            for (const rmMeta of this.machineExecutorManagerMap.keys()) {
                // initialize remoteMachineMetaOccupiedMap, false means not occupied
                this.remoteMachineMetaOccupiedMap.set(rmMeta, false);
            }
        }

        const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation;
        remoteEnvironment.status = 'WAITING';

        // schedule machine for environment, generate command
        const prepared = await this.prepareEnvironment(remoteEnvironment);
        if (!prepared) {
            // Without a machine there is no executor to launch on; fail with the real
            // reason instead of the executor lookup error launchRunner would raise.
            throw new Error(`No available machine to start environment ${environment.id}`);
        }

        // launch runner process in machine
        await this.launchRunner(remoteEnvironment);
    }

    /**
     * Binds a free machine to the environment and rewrites its command so that it
     * runs in the environment's working folder, writes a pid file and records its
     * exit code.
     * @param environment environment to prepare
     * @returns false when no machine is available, true otherwise
     */
    private async prepareEnvironment(environment: RemoteMachineEnvironmentInformation): Promise<boolean> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }

        // get an executor from scheduler
        const rmMachineMeta: RemoteMachineMeta | undefined = this.scheduleMachine();
        if (rmMachineMeta === undefined) {
            this.log.warning(`No available machine!`);
            return false;
        }

        environment.rmMachineMeta = rmMachineMeta;
        const executorManager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(rmMachineMeta);
        if (executorManager === undefined) {
            throw new Error(`executorManager not initialized`);
        }
        this.environmentExecutorManagerMap.set(environment.id, executorManager);

        const executor = await this.getExecutor(environment.id);
        environment.runnerWorkingFolder =
            executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()),
                'envs', environment.id);

        const workDir = environment.runnerWorkingFolder;
        // The exit code is recorded with `;` rather than `&&`: with `&&` the echo only
        // ran after a successful runner, so a failing runner never produced the code
        // file and could never be reported as FAILED.
        environment.command =
            `cd ${workDir} && ${environment.command} --job_pid_file ${workDir}/pid; ` +
            `echo $? \`date +%s%3N\` >${workDir}/code`;
        return true;
    }

    /**
     * Writes the environment command to a local run script, copies it to the
     * environment's working folder on the machine and executes it there.
     * @param environment prepared environment
     */
    private async launchRunner(environment: RemoteMachineEnvironmentInformation): Promise<void> {
        if (this.trialConfig === undefined) {
            throw new Error('trial config is not initialized');
        }
        // Fail before any side effect instead of after the script has been launched.
        if (environment.rmMachineMeta === undefined) {
            throw new Error(`${environment.id} rmMachineMeta not initialized!`);
        }

        const executor = await this.getExecutor(environment.id);
        const environmentLocalTempFolder: string =
            path.join(this.experimentRootDir, this.experimentId, "environment-temp");
        const runScriptName = executor.getScriptName("run");

        await executor.createFolder(environment.runnerWorkingFolder);
        await execMkdir(environmentLocalTempFolder);
        await fs.promises.writeFile(path.join(environmentLocalTempFolder, runScriptName),
            environment.command, { encoding: 'utf8' });

        // Copy the generated run script folder to the remote working directory
        await executor.copyDirectoryToRemote(environmentLocalTempFolder, environment.runnerWorkingFolder);

        // Execute command in remote machine.
        // NOTE(review): the call is not awaited, presumably because the runner is
        // long-lived - confirm, and consider handling a rejected result.
        executor.executeScript(executor.joinPath(environment.runnerWorkingFolder, runScriptName), true, false);
        environment.status = 'RUNNING';
        environment.trackingUrl = `file://${environment.rmMachineMeta.ip}:${environment.runnerWorkingFolder}`;
    }

    /**
     * Returns the executor dedicated to an environment.
     * @param environmentId id of the environment
     * @throws Error when no executor manager has been assigned to the environment
     */
    private async getExecutor(environmentId: string): Promise<ShellExecutor> {
        const executorManager = this.environmentExecutorManagerMap.get(environmentId);
        if (executorManager === undefined) {
            throw new Error(`ExecutorManager is not assigned for environment ${environmentId}`);
        }
        return await executorManager.getExecutor(environmentId);
    }

    /**
     * Stops an environment by killing the processes recorded in its pid file and
     * releases its machine. An environment in UNKNOWN status is only marked
     * USER_CANCELED and released.
     * @param environment environment to stop
     */
    public async stopEnvironment(environment: EnvironmentInformation): Promise<void> {
        const executor = await this.getExecutor(environment.id);

        if (environment.status === 'UNKNOWN') {
            environment.status = 'USER_CANCELED';
            this.releaseEnvironmentResource(environment);
            return;
        }

        const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`;
        try {
            await executor.killChildProcesses(jobpidPath);
            this.releaseEnvironmentResource(environment);
        } catch (error) {
            // The machine stays marked as occupied when the kill fails.
            this.log.error(`stopEnvironment: ${error}`);
        }
    }
}
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { EnvironmentInformation } from '../environment';
import { RemoteMachineMeta } from '../../remote_machine/remoteMachineData';
/**
 * RemoteMachineEnvironmentInformation
 *
 * Environment information extended with the remote machine that hosts the
 * environment.
 */
export class RemoteMachineEnvironmentInformation extends EnvironmentInformation {
    // Machine assigned to this environment; undefined until a machine has been
    // scheduled for it (RemoteEnvironmentService.prepareEnvironment sets it).
    public rmMachineMeta?: RemoteMachineMeta;
}
/**
 * Global configuration of the remote training service.
 */
export class RemoteConfig {
    /**
     * Constructor
     * @param reuse If job is reusable for multiple trials
     */
    constructor(public readonly reuse: boolean) {}
}
......@@ -12,12 +12,15 @@ import { delay } from '../../common/utils';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { PAIClusterConfig } from '../pai/paiConfig';
import { PAIK8STrainingService } from '../pai/paiK8S/paiK8STrainingService';
import { RemoteMachineTrainingService } from '../remote_machine/remoteMachineTrainingService';
import { EnvironmentService } from './environment';
import { OpenPaiEnvironmentService } from './environments/openPaiEnvironmentService';
import { AMLEnvironmentService } from './environments/amlEnvironmentService';
import { RemoteEnvironmentService } from './environments/remoteEnvironmentService';
import { MountedStorageService } from './storages/mountedStorageService';
import { StorageService } from './storageService';
import { TrialDispatcher } from './trialDispatcher';
import { RemoteConfig } from './remote/remoteConfig';
/**
......@@ -146,6 +149,18 @@ class RouterTrainingService implements TrainingService {
await this.internalTrainingService.setClusterMetadata(key, value);
this.metaDataCache.clear();
} else if (key === TrialConfigMetadataKey.REMOTE_CONFIG) {
const config = <RemoteConfig>JSON.parse(value);
if (config.reuse === true) {
this.log.info(`reuse flag enabled, use EnvironmentManager.`);
this.internalTrainingService = component.get(TrialDispatcher);
Container.bind(EnvironmentService)
.to(RemoteEnvironmentService)
.scope(Scope.Singleton);
} else {
this.log.debug(`caching metadata key:{} value:{}, as training service is not determined.`);
this.internalTrainingService = component.get(RemoteMachineTrainingService);
}
} else {
this.log.debug(`caching metadata key:{} value:{}, as training service is not determined.`);
this.metaDataCache.set(key, value);
......
......@@ -235,7 +235,7 @@ class TrialDispatcher implements TrainingService {
}
await storageService.copyDirectory(trialToolsPath, envDir, true);
}
await this.prefetchEnvironments();
this.log.info(`TrialDispatcher: run loop started.`);
await Promise.all([
this.environmentMaintenanceLoop(),
......@@ -579,6 +579,15 @@ class TrialDispatcher implements TrainingService {
}
}
/**
 * Requests, one after another, as many environments as the bound
 * EnvironmentService reports in prefetchedEnvironmentCount.
 */
private async prefetchEnvironments (): Promise<void> {
    const service = component.get<EnvironmentService>(EnvironmentService);
    const total = service.prefetchedEnvironmentCount;
    this.log.info(`Initialize environments total number: ${total}`);

    let requested = 0;
    while (requested < total) {
        await this.requestEnvironment();
        requested += 1;
    }
}
private async requestEnvironment(): Promise<void> {
if (this.commandChannel === undefined) {
throw new Error(`TrialDispatcher: commandChannel shouldn't be undefined in requestEnvironment.`);
......
......@@ -372,6 +372,12 @@ frameworkcontroller_config_schema = {
})
}
# Schema fragment for the optional `remoteConfig` section of a remote-mode
# experiment config; its `reuse` entry is validated through setType(..., bool).
remote_config_schema = {
    Optional('remoteConfig'): {'reuse': setType('reuse', bool)},
}
machine_list_schema = {
'machineList': [Or(
{
......@@ -399,7 +405,7 @@ machine_list_schema = {
training_service_schema_dict = {
'local': Schema({**common_schema, **common_trial_schema}),
'remote': Schema({**common_schema, **common_trial_schema, **machine_list_schema}),
'remote': Schema({**common_schema, **common_trial_schema, **machine_list_schema, **remote_config_schema}),
'pai': Schema({**common_schema, **pai_trial_schema, **pai_config_schema}),
'paiYarn': Schema({**common_schema, **pai_yarn_trial_schema, **pai_yarn_config_schema}),
'kubeflow': Schema({**common_schema, **kubeflow_trial_schema, **kubeflow_config_schema}),
......
......@@ -138,6 +138,7 @@ def set_remote_config(experiment_config, port, config_file_name):
'''Call setClusterMetadata to pass trial'''
#set machine_list
request_data = dict()
request_data['remote_config'] = experiment_config['remoteConfig']
request_data['machine_list'] = experiment_config['machineList']
if request_data['machine_list']:
for i in range(len(request_data['machine_list'])):
......@@ -301,7 +302,6 @@ def set_experiment(experiment_config, mode, port, config_file_name):
request_data['maxTrialNum'] = experiment_config['maxTrialNum']
request_data['searchSpace'] = experiment_config.get('searchSpace')
request_data['trainingServicePlatform'] = experiment_config.get('trainingServicePlatform')
if experiment_config.get('description'):
request_data['description'] = experiment_config['description']
if experiment_config.get('multiPhase'):
......@@ -332,7 +332,6 @@ def set_experiment(experiment_config, mode, port, config_file_name):
request_data['versionCheck'] = experiment_config.get('versionCheck')
if experiment_config.get('logCollection'):
request_data['logCollection'] = experiment_config.get('logCollection')
request_data['clusterMetaData'] = []
if experiment_config['trainingServicePlatform'] == 'local':
request_data['clusterMetaData'].append(
......@@ -344,6 +343,11 @@ def set_experiment(experiment_config, mode, port, config_file_name):
{'key': 'machine_list', 'value': experiment_config['machineList']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
if not experiment_config.get('remoteConfig'):
# set default value of reuse in remoteConfig to False
experiment_config['remoteConfig'] = {'reuse': False}
request_data['clusterMetaData'].append(
{'key': 'remote_config', 'value': experiment_config['remoteConfig']})
elif experiment_config['trainingServicePlatform'] == 'pai':
request_data['clusterMetaData'].append(
{'key': 'pai_config', 'value': experiment_config['paiConfig']})
......
......@@ -27,6 +27,10 @@ def main_loop(args):
gpu_refresh_last_time = datetime.now() - timedelta(minutes=1)
try:
if args.job_pid_file:
with open(args.job_pid_file, 'w') as job_file:
job_file.write("%d" % os.getpid())
trials = dict()
command_channel = args.command_channel
......@@ -143,6 +147,7 @@ if __name__ == '__main__':
PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager')
PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trial runner')
PARSER.add_argument('--node_count', type=int, help='number of nodes, it determines how to consume command and save code file')
PARSER.add_argument('--job_pid_file', type=str, help='save trial runner process pid')
args, unknown = PARSER.parse_known_args()
setting_file = "settings.json"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment