Unverified Commit fa339ca3 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Fix hybrid pipeline (#4287)

parent fde9e1a0
...@@ -209,31 +209,9 @@ class TrialDispatcher implements TrainingService { ...@@ -209,31 +209,9 @@ class TrialDispatcher implements TrainingService {
} }
} }
public async run(): Promise<void> { private getStorageService(environmentService: EnvironmentService): StorageService {
await Promise.all(this.environmentServiceList.map(env => env.init()));
for(const environmentService of this.environmentServiceList) {
const runnerSettings: RunnerSettings = new RunnerSettings();
runnerSettings.nniManagerIP = this.config.nniManagerIp === undefined? await getIPV4Address() : this.config.nniManagerIp;
runnerSettings.nniManagerPort = getBasePort() + 1;
runnerSettings.commandChannel = environmentService.getCommandChannel.channelName;
runnerSettings.enableGpuCollector = this.enableGpuScheduler;
runnerSettings.command = this.config.trialCommand;
runnerSettings.nniManagerVersion = this.enableVersionCheck ? await getVersion() : '';
runnerSettings.logCollection = this.logCollection;
runnerSettings.platform = environmentService.getName;
runnerSettings.experimentId = this.experimentId;
await environmentService.getCommandChannel.start();
this.log.info(`TrialDispatcher: started channel: ${environmentService.getCommandChannel.constructor.name}`);
this.log.info(`TrialDispatcher: copying code and settings.`);
let storageService: StorageService; let storageService: StorageService;
if (this.useSharedStorage) { if (this.useSharedStorage) {
if (this.fileCopyCompleted) {
this.log.debug(`TrialDispatcher: file already copy to shared storage.`);
continue;
}
this.log.debug(`TrialDispatcher: use shared storage service.`); this.log.debug(`TrialDispatcher: use shared storage service.`);
storageService = component.get<SharedStorageService>(SharedStorageService).storageService; storageService = component.get<SharedStorageService>(SharedStorageService).storageService;
} else if (environmentService.hasStorageService) { } else if (environmentService.hasStorageService) {
...@@ -245,6 +223,25 @@ class TrialDispatcher implements TrainingService { ...@@ -245,6 +223,25 @@ class TrialDispatcher implements TrainingService {
const environmentLocalTempFolder = path.join(this.experimentRootDir, "environment-temp"); const environmentLocalTempFolder = path.join(this.experimentRootDir, "environment-temp");
storageService.initialize(this.config.trialCodeDirectory, environmentLocalTempFolder); storageService.initialize(this.config.trialCodeDirectory, environmentLocalTempFolder);
} }
return storageService;
}
public async run(): Promise<void> {
await Promise.all(this.environmentServiceList.map(env => env.init()));
for(const environmentService of this.environmentServiceList) {
await environmentService.getCommandChannel.start();
this.log.info(`TrialDispatcher: started channel: ${environmentService.getCommandChannel.constructor.name}`);
this.log.info(`TrialDispatcher: copying code.`);
if (this.useSharedStorage) {
if (this.fileCopyCompleted) {
continue;
}
}
const storageService: StorageService = this.getStorageService(environmentService);
// Copy the compressed file to remoteDirectory and delete it // Copy the compressed file to remoteDirectory and delete it
const codeDir = path.resolve(this.config.trialCodeDirectory); const codeDir = path.resolve(this.config.trialCodeDirectory);
const envDir = storageService.joinPath("envs"); const envDir = storageService.joinPath("envs");
...@@ -256,9 +253,6 @@ class TrialDispatcher implements TrainingService { ...@@ -256,9 +253,6 @@ class TrialDispatcher implements TrainingService {
await storageService.save(CONTAINER_INSTALL_NNI_SHELL_FORMAT, installFileName); await storageService.save(CONTAINER_INSTALL_NNI_SHELL_FORMAT, installFileName);
await storageService.save(CONTAINER_INSTALL_NNI_SHELL_FORMAT_FOR_WIN, installFileNameForWin); await storageService.save(CONTAINER_INSTALL_NNI_SHELL_FORMAT_FOR_WIN, installFileNameForWin);
const runnerSettingsConfig = storageService.joinPath(envDir, "settings.json");
await storageService.save(JSON.stringify(runnerSettings), runnerSettingsConfig);
if (this.isDeveloping) { if (this.isDeveloping) {
let trialToolsPath = path.join(__dirname, "../../../../../tools/nni_trial_tool"); let trialToolsPath = path.join(__dirname, "../../../../../tools/nni_trial_tool");
if (false === fs.existsSync(trialToolsPath)) { if (false === fs.existsSync(trialToolsPath)) {
...@@ -655,6 +649,27 @@ class TrialDispatcher implements TrainingService { ...@@ -655,6 +649,27 @@ class TrialDispatcher implements TrainingService {
} }
} }
private async setEnvironmentSetting(environment: EnvironmentInformation): Promise<void> {
if (environment.environmentService === undefined) {
throw new Error(`Environmentservice for ${environment.id} not initialized!`);
}
const environmentService = environment.environmentService;
const runnerSettings: RunnerSettings = new RunnerSettings();
runnerSettings.nniManagerIP = this.config.nniManagerIp === undefined? await getIPV4Address() : this.config.nniManagerIp;
runnerSettings.nniManagerPort = getBasePort() + 1;
runnerSettings.commandChannel = environmentService.getCommandChannel.channelName;
runnerSettings.enableGpuCollector = this.enableGpuScheduler;
runnerSettings.command = this.config.trialCommand;
runnerSettings.nniManagerVersion = this.enableVersionCheck ? await getVersion() : '';
runnerSettings.logCollection = this.logCollection;
runnerSettings.platform = environmentService.getName;
runnerSettings.experimentId = this.experimentId;
const storageService: StorageService = this.getStorageService(environmentService);
const envDir = storageService.joinPath("envs");
const runnerSettingsConfig = storageService.joinPath(envDir, environment.id, "settings.json");
await storageService.save(JSON.stringify(runnerSettings), runnerSettingsConfig);
}
private async requestEnvironment(environmentService: EnvironmentService): Promise<void> { private async requestEnvironment(environmentService: EnvironmentService): Promise<void> {
if (this.stopping) { if (this.stopping) {
this.log.info(`Experiment is stopping, stop creating new environment`); this.log.info(`Experiment is stopping, stop creating new environment`);
...@@ -674,6 +689,8 @@ class TrialDispatcher implements TrainingService { ...@@ -674,6 +689,8 @@ class TrialDispatcher implements TrainingService {
environment.command = `mkdir -p envs/${envId} && cd envs/${envId} && ${environment.command}`; environment.command = `mkdir -p envs/${envId} && cd envs/${envId} && ${environment.command}`;
environment.useSharedStorage = this.useSharedStorage; environment.useSharedStorage = this.useSharedStorage;
// Generate setting.json file per environment to avoid conflict
await this.setEnvironmentSetting(environment);
await environmentService.startEnvironment(environment); await environmentService.startEnvironment(environment);
this.environments.set(environment.id, environment); this.environments.set(environment.id, environment);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment