Unverified Commit 6d591989 authored by chicm-ms's avatar chicm-ms Committed by GitHub
Browse files

Refactoring nnimanager log (#652)

* Pull code (#22)

* Support distributed job for frameworkcontroller (#612)

support distributed job for frameworkcontroller

* Multiphase doc (#519)

* multiPhase doc

* updates

* updates

* Add time parser for 'nnictl update duration' (#632)

Currently, 'nnictl update duration' only supports seconds as the unit; add a parser so that this command also accepts the units {s, m, h, d}.

* fix experiment state bug (#629)

* update top README.md (#622)

* Update README.md

* update (#634)

* Integration tests refactoring (#625)

* Integration test refactoring (#21) (#616)

* Integration test refactoring (#21)

* Refactoring integration tests

* test metrics

* update azure pipeline

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* updates

* update trigger

* Integration test refactoring (#618)

* updates

* updates

* update pipeline (#619)

* update pipeline

* updates

* updates

* updates

* updates

* updates

* test pipeline (#623)

* test pipeline

* updates

* updates

* updates

* Update integration test (#624)

* Update integration test

* updates

* updates

* updates

* updates

* updates

* updates

* Revert "Pull code (#22)"

This reverts commit 62fc165ad7b2ba724eead3b99f010aa34491e2c7.

* Update nnimanager logs

* updates

* Update README.md

* Revert "Update README.md"

This reverts commit bc67061160e5d57305a6e7fb63d491d12d0e9002.

* updates

* updates
parent d9c83c0c
...@@ -73,7 +73,7 @@ class BufferSerialEmitter { ...@@ -73,7 +73,7 @@ class BufferSerialEmitter {
@component.Singleton @component.Singleton
class Logger { class Logger {
private DEFAULT_LOGFILE: string = path.join(getLogDir(), 'nnimanager.log'); private DEFAULT_LOGFILE: string = path.join(getLogDir(), 'nnimanager.log');
private level: number = DEBUG; private level: number = INFO;
private bufferSerialEmitter: BufferSerialEmitter; private bufferSerialEmitter: BufferSerialEmitter;
private writable: Writable; private writable: Writable;
...@@ -153,4 +153,4 @@ function getLogger(fileName?: string): Logger { ...@@ -153,4 +153,4 @@ function getLogger(fileName?: string): Logger {
return component.get(Logger); return component.get(Logger);
} }
export { Logger, getLogger }; export { Logger, getLogger, logLevelNameMap };
...@@ -23,6 +23,7 @@ import { MetricDataRecord, MetricType, TrialJobInfo } from './datastore'; ...@@ -23,6 +23,7 @@ import { MetricDataRecord, MetricType, TrialJobInfo } from './datastore';
import { TrialJobStatus } from './trainingService'; import { TrialJobStatus } from './trainingService';
type ProfileUpdateType = 'TRIAL_CONCURRENCY' | 'MAX_EXEC_DURATION' | 'SEARCH_SPACE' | 'MAX_TRIAL_NUM'; type ProfileUpdateType = 'TRIAL_CONCURRENCY' | 'MAX_EXEC_DURATION' | 'SEARCH_SPACE' | 'MAX_TRIAL_NUM';
type ExperimentStatus = 'INITIALIZED' | 'RUNNING' | 'ERROR' | 'STOPPING' | 'STOPPED' | 'DONE' | 'NO_MORE_TRIAL' | 'TUNER_NO_MORE_TRIAL';
interface ExperimentParams { interface ExperimentParams {
authorName: string; authorName: string;
...@@ -85,7 +86,7 @@ interface TrialJobStatistics { ...@@ -85,7 +86,7 @@ interface TrialJobStatistics {
} }
interface NNIManagerStatus { interface NNIManagerStatus {
status: 'INITIALIZED' | 'RUNNING' | 'ERROR' | 'STOPPING' | 'STOPPED' | 'DONE' | 'NO_MORE_TRIAL' | 'TUNER_NO_MORE_TRIAL'; status: ExperimentStatus;
errors: string[]; errors: string[];
} }
...@@ -109,4 +110,4 @@ abstract class Manager { ...@@ -109,4 +110,4 @@ abstract class Manager {
public abstract getStatus(): NNIManagerStatus; public abstract getStatus(): NNIManagerStatus;
} }
export { Manager, ExperimentParams, ExperimentProfile, TrialJobStatistics, ProfileUpdateType, NNIManagerStatus }; export { Manager, ExperimentParams, ExperimentProfile, TrialJobStatistics, ProfileUpdateType, NNIManagerStatus, ExperimentStatus };
...@@ -29,7 +29,7 @@ import { NNIError } from '../common/errors'; ...@@ -29,7 +29,7 @@ import { NNIError } from '../common/errors';
import { getExperimentId, setInitTrialSequenceId } from '../common/experimentStartupInfo'; import { getExperimentId, setInitTrialSequenceId } from '../common/experimentStartupInfo';
import { getLogger, Logger } from '../common/log'; import { getLogger, Logger } from '../common/log';
import { import {
ExperimentParams, ExperimentProfile, Manager, ExperimentParams, ExperimentProfile, Manager, ExperimentStatus,
NNIManagerStatus, ProfileUpdateType, TrialJobStatistics NNIManagerStatus, ProfileUpdateType, TrialJobStatistics
} from '../common/manager'; } from '../common/manager';
import { import {
...@@ -112,12 +112,13 @@ class NNIManager implements Manager { ...@@ -112,12 +112,13 @@ class NNIManager implements Manager {
} }
public async cancelTrialJobByUser(trialJobId: string): Promise<void> { public async cancelTrialJobByUser(trialJobId: string): Promise<void> {
this.log.info(`User cancelTrialJob: ${trialJobId}`);
await this.trainingService.cancelTrialJob(trialJobId); await this.trainingService.cancelTrialJob(trialJobId);
await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, ''); await this.dataStore.storeTrialJobEvent('USER_TO_CANCEL', trialJobId, '');
} }
public async startExperiment(expParams: ExperimentParams): Promise<string> { public async startExperiment(expParams: ExperimentParams): Promise<string> {
this.log.debug(`Starting experiment: ${this.experimentProfile.id}`); this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
this.experimentProfile.params = expParams; this.experimentProfile.params = expParams;
await this.storeExperimentProfile(); await this.storeExperimentProfile();
this.log.debug('Setup tuner...'); this.log.debug('Setup tuner...');
...@@ -138,7 +139,7 @@ class NNIManager implements Manager { ...@@ -138,7 +139,7 @@ class NNIManager implements Manager {
checkpointDir); checkpointDir);
this.experimentProfile.startTime = Date.now(); this.experimentProfile.startTime = Date.now();
this.status.status = 'RUNNING'; this.setStatus('RUNNING');
await this.storeExperimentProfile(); await this.storeExperimentProfile();
this.run().catch((err: Error) => { this.run().catch((err: Error) => {
this.criticalError(err); this.criticalError(err);
...@@ -148,6 +149,7 @@ class NNIManager implements Manager { ...@@ -148,6 +149,7 @@ class NNIManager implements Manager {
} }
public async resumeExperiment(): Promise<void> { public async resumeExperiment(): Promise<void> {
this.log.info(`Resuming experiment: ${this.experimentProfile.id}`);
//Fetch back the experiment profile //Fetch back the experiment profile
const experimentId: string = getExperimentId(); const experimentId: string = getExperimentId();
this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId); this.experimentProfile = await this.dataStore.getExperimentProfile(experimentId);
...@@ -185,7 +187,7 @@ class NNIManager implements Manager { ...@@ -185,7 +187,7 @@ class NNIManager implements Manager {
this.experimentProfile.endTime) { this.experimentProfile.endTime) {
delete this.experimentProfile.endTime; delete this.experimentProfile.endTime;
} }
this.status.status = 'RUNNING'; this.setStatus('RUNNING');
// TO DO: update database record for resume event // TO DO: update database record for resume event
this.run().catch((err: Error) => { this.run().catch((err: Error) => {
...@@ -200,6 +202,7 @@ class NNIManager implements Manager { ...@@ -200,6 +202,7 @@ class NNIManager implements Manager {
} }
public async setClusterMetadata(key: string, value: string): Promise<void> { public async setClusterMetadata(key: string, value: string): Promise<void> {
this.log.info(`NNIManager setClusterMetadata, key: ${key}, value: ${value}`);
let timeoutId: NodeJS.Timer; let timeoutId: NodeJS.Timer;
// TO DO: move timeout value to constants file // TO DO: move timeout value to constants file
const delay1: Promise<{}> = new Promise((resolve: Function, reject: Function): void => { const delay1: Promise<{}> = new Promise((resolve: Function, reject: Function): void => {
...@@ -223,10 +226,10 @@ class NNIManager implements Manager { ...@@ -223,10 +226,10 @@ class NNIManager implements Manager {
} }
public async stopExperiment(): Promise<void> { public async stopExperiment(): Promise<void> {
this.status.status = 'STOPPING'; this.setStatus('STOPPING');
this.log.info('Experiment done, cleaning up...'); this.log.info('Stopping experiment, cleaning up ...');
await this.experimentDoneCleanUp(); await this.experimentDoneCleanUp();
this.log.info('Experiment done.'); this.log.info('Experiment stopped.');
} }
public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> { public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
...@@ -334,6 +337,7 @@ class NNIManager implements Manager { ...@@ -334,6 +337,7 @@ class NNIManager implements Manager {
if (trialJob.status === 'RUNNING' || if (trialJob.status === 'RUNNING' ||
trialJob.status === 'WAITING') { trialJob.status === 'WAITING') {
try { try {
this.log.info(`cancelTrialJob: ${trialJob.id}`);
await this.trainingService.cancelTrialJob(trialJob.id); await this.trainingService.cancelTrialJob(trialJob.id);
} catch (error) { } catch (error) {
// pid does not exist, do nothing here // pid does not exist, do nothing here
...@@ -343,7 +347,7 @@ class NNIManager implements Manager { ...@@ -343,7 +347,7 @@ class NNIManager implements Manager {
await this.trainingService.cleanUp(); await this.trainingService.cleanUp();
this.experimentProfile.endTime = Date.now(); this.experimentProfile.endTime = Date.now();
await this.storeExperimentProfile(); await this.storeExperimentProfile();
this.status.status = 'STOPPED'; this.setStatus('STOPPED');
} }
private async periodicallyUpdateExecDuration(): Promise<void> { private async periodicallyUpdateExecDuration(): Promise<void> {
...@@ -365,8 +369,8 @@ class NNIManager implements Manager { ...@@ -365,8 +369,8 @@ class NNIManager implements Manager {
throw new Error('Error: tuner has not been setup'); throw new Error('Error: tuner has not been setup');
} }
while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) { while (!['ERROR', 'STOPPING', 'STOPPED'].includes(this.status.status)) {
await delay(1000 * 5);
this.dispatcher.sendCommand(PING); this.dispatcher.sendCommand(PING);
await delay(1000 * 5);
} }
} }
...@@ -379,6 +383,7 @@ class NNIManager implements Manager { ...@@ -379,6 +383,7 @@ class NNIManager implements Manager {
const trialJobDetail: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId); const trialJobDetail: TrialJobDetail = await this.trainingService.getTrialJob(trialJobId);
const oldTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId); const oldTrialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
if (oldTrialJobDetail !== undefined && oldTrialJobDetail.status !== trialJobDetail.status) { if (oldTrialJobDetail !== undefined && oldTrialJobDetail.status !== trialJobDetail.status) {
this.log.info(`Trial job ${trialJobDetail.id} status changed from ${oldTrialJobDetail.status} to ${trialJobDetail.status}`);
this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail)); this.trialJobs.set(trialJobId, Object.assign({}, trialJobDetail));
await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail); await this.dataStore.storeTrialJobEvent(trialJobDetail.status, trialJobDetail.id, undefined, trialJobDetail);
} }
...@@ -478,12 +483,12 @@ class NNIManager implements Manager { ...@@ -478,12 +483,12 @@ class NNIManager implements Manager {
if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration || if (this.experimentProfile.execDuration > this.experimentProfile.params.maxExecDuration ||
this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) { this.currSubmittedTrialNum >= this.experimentProfile.params.maxTrialNum) {
if (this.status.status !== 'DONE') { if (this.status.status !== 'DONE') {
this.status.status = 'NO_MORE_TRIAL'; this.setStatus('NO_MORE_TRIAL');
waitSubmittedToFinish = this.currSubmittedTrialNum; waitSubmittedToFinish = this.currSubmittedTrialNum;
assert(allFinishedTrialJobNum <= waitSubmittedToFinish); assert(allFinishedTrialJobNum <= waitSubmittedToFinish);
if (allFinishedTrialJobNum >= waitSubmittedToFinish) { if (allFinishedTrialJobNum >= waitSubmittedToFinish) {
this.status.status = 'DONE'; this.setStatus('DONE');
this.experimentProfile.endTime = Date.now(); this.experimentProfile.endTime = Date.now();
await this.storeExperimentProfile(); await this.storeExperimentProfile();
// write this log for travis CI // write this log for travis CI
...@@ -496,7 +501,7 @@ class NNIManager implements Manager { ...@@ -496,7 +501,7 @@ class NNIManager implements Manager {
await this.storeExperimentProfile(); await this.storeExperimentProfile();
} }
if (this.status.status !== 'TUNER_NO_MORE_TRIAL') { if (this.status.status !== 'TUNER_NO_MORE_TRIAL') {
this.status.status = 'RUNNING'; this.setStatus('RUNNING');
} }
for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) { for (let i: number = this.trialJobs.size; i < this.experimentProfile.params.trialConcurrency; i++) {
if (this.waitingTrials.length === 0 || if (this.waitingTrials.length === 0 ||
...@@ -515,6 +520,7 @@ class NNIManager implements Manager { ...@@ -515,6 +520,7 @@ class NNIManager implements Manager {
index: 0 index: 0
} }
}; };
this.log.info(`submitTrialJob: form: ${JSON.stringify(trialJobAppForm)}`);
const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm); const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(trialJobAppForm);
await this.storeMaxSequenceId(trialJobDetail.sequenceId); await this.storeMaxSequenceId(trialJobDetail.sequenceId);
this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail)); this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
...@@ -558,6 +564,7 @@ class NNIManager implements Manager { ...@@ -558,6 +564,7 @@ class NNIManager implements Manager {
} }
private addEventListeners(): void { private addEventListeners(): void {
this.log.info('Add event listeners');
// TO DO: cannot run this method more than once in one NNIManager instance // TO DO: cannot run this method more than once in one NNIManager instance
if (this.dispatcher === undefined) { if (this.dispatcher === undefined) {
throw new Error('Error: tuner or job maintainer have not been setup'); throw new Error('Error: tuner or job maintainer have not been setup');
...@@ -585,6 +592,7 @@ class NNIManager implements Manager { ...@@ -585,6 +592,7 @@ class NNIManager implements Manager {
} }
private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> { private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
this.log.debug(`NNIManager received trial job metrics: ${metric}`);
await this.dataStore.storeMetricData(metric.id, metric.data); await this.dataStore.storeMetricData(metric.id, metric.data);
if (this.dispatcher === undefined) { if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup'); throw new Error('Error: tuner has not been setup');
...@@ -612,7 +620,7 @@ class NNIManager implements Manager { ...@@ -612,7 +620,7 @@ class NNIManager implements Manager {
} }
private async onTunerCommand(commandType: string, content: string): Promise<void> { private async onTunerCommand(commandType: string, content: string): Promise<void> {
this.log.info(`Command from tuner: ${commandType}, ${content}`); this.log.info(`NNIManaer received command from dispatcher: ${commandType}, ${content}`);
switch (commandType) { switch (commandType) {
case INITIALIZED: case INITIALIZED:
// Tuner is initialized, search space is set, request tuner to generate hyper parameters // Tuner is initialized, search space is set, request tuner to generate hyper parameters
...@@ -621,7 +629,7 @@ class NNIManager implements Manager { ...@@ -621,7 +629,7 @@ class NNIManager implements Manager {
case NEW_TRIAL_JOB: case NEW_TRIAL_JOB:
if (this.status.status === 'TUNER_NO_MORE_TRIAL') { if (this.status.status === 'TUNER_NO_MORE_TRIAL') {
this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set'); this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
this.status.status = 'RUNNING'; this.setStatus('RUNNING');
} }
this.waitingTrials.push(content); this.waitingTrials.push(content);
break; break;
...@@ -637,14 +645,16 @@ class NNIManager implements Manager { ...@@ -637,14 +645,16 @@ class NNIManager implements Manager {
index: tunerCommand.parameter_index index: tunerCommand.parameter_index
} }
}; };
this.log.info(`updateTrialJob: job id: ${tunerCommand.trial_job_id}, form: ${JSON.stringify(trialJobForm)}`);
await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm); await this.trainingService.updateTrialJob(tunerCommand.trial_job_id, trialJobForm);
await this.dataStore.storeTrialJobEvent( await this.dataStore.storeTrialJobEvent(
'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined); 'ADD_HYPERPARAMETER', tunerCommand.trial_job_id, content, undefined);
break; break;
case NO_MORE_TRIAL_JOBS: case NO_MORE_TRIAL_JOBS:
this.status.status = 'TUNER_NO_MORE_TRIAL'; this.setStatus('TUNER_NO_MORE_TRIAL');
break; break;
case KILL_TRIAL_JOB: case KILL_TRIAL_JOB:
this.log.info(`cancelTrialJob: ${JSON.parse(content)}`);
await this.trainingService.cancelTrialJob(JSON.parse(content), true); await this.trainingService.cancelTrialJob(JSON.parse(content), true);
break; break;
default: default:
...@@ -662,7 +672,14 @@ class NNIManager implements Manager { ...@@ -662,7 +672,14 @@ class NNIManager implements Manager {
this.log.error(err.stack); this.log.error(err.stack);
} }
this.status.errors.push(err.message); this.status.errors.push(err.message);
this.status.status = 'ERROR'; this.setStatus('ERROR');
}
/**
 * Records a new manager status and logs the transition.
 *
 * When `status` equals the current `this.status.status`, nothing is logged
 * and nothing is written.
 *
 * @param status Status value to store in `this.status.status`.
 */
private setStatus(status: ExperimentStatus): void {
if (status !== this.status.status) {
// Log before assigning so the message still shows the previous status.
this.log.info(`Change NNIManager status from: ${this.status.status} to: ${status}`);
this.status.status = status;
}
} }
private createEmptyExperimentProfile(): ExperimentProfile { private createEmptyExperimentProfile(): ExperimentProfile {
......
...@@ -32,6 +32,7 @@ import { ...@@ -32,6 +32,7 @@ import {
TrialJobEvent, TrialJobEvent,
TrialJobEventRecord TrialJobEventRecord
} from '../common/datastore'; } from '../common/datastore';
import { getLogger, Logger } from '../common/log';
import { ExperimentProfile } from '../common/manager'; import { ExperimentProfile } from '../common/manager';
import { TrialJobDetail } from '../common/trainingService'; import { TrialJobDetail } from '../common/trainingService';
...@@ -95,6 +96,7 @@ function loadMetricData(row: any): MetricDataRecord { ...@@ -95,6 +96,7 @@ function loadMetricData(row: any): MetricDataRecord {
class SqlDB implements Database { class SqlDB implements Database {
private db!: sqlite3.Database; private db!: sqlite3.Database;
private log: Logger = getLogger();
private initTask!: Deferred<void>; private initTask!: Deferred<void>;
public init(createNew: boolean, dbDir: string): Promise<void> { public init(createNew: boolean, dbDir: string): Promise<void> {
...@@ -102,6 +104,7 @@ class SqlDB implements Database { ...@@ -102,6 +104,7 @@ class SqlDB implements Database {
return this.initTask.promise; return this.initTask.promise;
} }
this.initTask = new Deferred<void>(); this.initTask = new Deferred<void>();
this.log.debug(`Database directory: ${dbDir}`);
assert(fs.existsSync(dbDir)); assert(fs.existsSync(dbDir));
// tslint:disable-next-line:no-bitwise // tslint:disable-next-line:no-bitwise
...@@ -144,7 +147,7 @@ class SqlDB implements Database { ...@@ -144,7 +147,7 @@ class SqlDB implements Database {
exp.maxSequenceId, exp.maxSequenceId,
exp.revision exp.revision
]; ];
this.log.trace(`storeExperimentProfile: SQL: ${sql}, args: ${JSON.stringify(args)}`);
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<void> = new Deferred<void>();
this.db.run(sql, args, (err: Error | null) => { this.resolve(deferred, err); }); this.db.run(sql, args, (err: Error | null) => { this.resolve(deferred, err); });
...@@ -161,7 +164,7 @@ class SqlDB implements Database { ...@@ -161,7 +164,7 @@ class SqlDB implements Database {
sql = 'select * from ExperimentProfile where id=? and revision=?'; sql = 'select * from ExperimentProfile where id=? and revision=?';
args = [experimentId, revision]; args = [experimentId, revision];
} }
this.log.trace(`queryExperimentProfile: SQL: ${sql}, args: ${JSON.stringify(args)}`);
const deferred: Deferred<ExperimentProfile[]> = new Deferred<ExperimentProfile[]>(); const deferred: Deferred<ExperimentProfile[]> = new Deferred<ExperimentProfile[]>();
this.db.all(sql, args, (err: Error | null, rows: any[]) => { this.db.all(sql, args, (err: Error | null, rows: any[]) => {
this.resolve(deferred, err, rows, loadExperimentProfile); this.resolve(deferred, err, rows, loadExperimentProfile);
...@@ -183,6 +186,7 @@ class SqlDB implements Database { ...@@ -183,6 +186,7 @@ class SqlDB implements Database {
const sequenceId: number | undefined = jobDetail === undefined ? undefined : jobDetail.sequenceId; const sequenceId: number | undefined = jobDetail === undefined ? undefined : jobDetail.sequenceId;
const args: any[] = [timestamp, trialJobId, event, hyperParameter, logPath, sequenceId]; const args: any[] = [timestamp, trialJobId, event, hyperParameter, logPath, sequenceId];
this.log.trace(`storeTrialJobEvent: SQL: ${sql}, args: ${JSON.stringify(args)}`);
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<void> = new Deferred<void>();
this.db.run(sql, args, (err: Error | null) => { this.resolve(deferred, err); }); this.db.run(sql, args, (err: Error | null) => { this.resolve(deferred, err); });
...@@ -205,6 +209,7 @@ class SqlDB implements Database { ...@@ -205,6 +209,7 @@ class SqlDB implements Database {
args = [trialJobId, event]; args = [trialJobId, event];
} }
this.log.trace(`queryTrialJobEvent: SQL: ${sql}, args: ${JSON.stringify(args)}`);
const deferred: Deferred<TrialJobEventRecord[]> = new Deferred<TrialJobEventRecord[]>(); const deferred: Deferred<TrialJobEventRecord[]> = new Deferred<TrialJobEventRecord[]>();
this.db.all(sql, args, (err: Error | null, rows: any[]) => { this.db.all(sql, args, (err: Error | null, rows: any[]) => {
this.resolve(deferred, err, rows, loadTrialJobEvent); this.resolve(deferred, err, rows, loadTrialJobEvent);
...@@ -218,6 +223,7 @@ class SqlDB implements Database { ...@@ -218,6 +223,7 @@ class SqlDB implements Database {
const json: MetricDataRecord = JSON.parse(data); const json: MetricDataRecord = JSON.parse(data);
const args: any[] = [Date.now(), json.trialJobId, json.parameterId, json.type, json.sequence, JSON.stringify(json.data)]; const args: any[] = [Date.now(), json.trialJobId, json.parameterId, json.type, json.sequence, JSON.stringify(json.data)];
this.log.trace(`storeMetricData: SQL: ${sql}, args: ${JSON.stringify(args)}`);
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<void> = new Deferred<void>();
this.db.run(sql, args, (err: Error | null) => { this.resolve(deferred, err); }); this.db.run(sql, args, (err: Error | null) => { this.resolve(deferred, err); });
...@@ -240,6 +246,7 @@ class SqlDB implements Database { ...@@ -240,6 +246,7 @@ class SqlDB implements Database {
args = [trialJobId, metricType]; args = [trialJobId, metricType];
} }
this.log.trace(`queryMetricData: SQL: ${sql}, args: ${JSON.stringify(args)}`);
const deferred: Deferred<MetricDataRecord[]> = new Deferred<MetricDataRecord[]>(); const deferred: Deferred<MetricDataRecord[]> = new Deferred<MetricDataRecord[]>();
this.db.all(sql, args, (err: Error | null, rows: any[]) => { this.db.all(sql, args, (err: Error | null, rows: any[]) => {
this.resolve(deferred, err, rows, loadMetricData); this.resolve(deferred, err, rows, loadMetricData);
...@@ -268,6 +275,7 @@ class SqlDB implements Database { ...@@ -268,6 +275,7 @@ class SqlDB implements Database {
for (const row of (<any[]>rows)) { for (const row of (<any[]>rows)) {
data.push(rowLoader(row)); data.push(rowLoader(row));
} }
this.log.trace(`sql query result: ${JSON.stringify(data)}`);
(<Deferred<T[]>>deferred).resolve(data); (<Deferred<T[]>>deferred).resolve(data);
} }
} }
......
...@@ -25,7 +25,7 @@ import * as component from './common/component'; ...@@ -25,7 +25,7 @@ import * as component from './common/component';
import * as fs from 'fs'; import * as fs from 'fs';
import { Database, DataStore } from './common/datastore'; import { Database, DataStore } from './common/datastore';
import { setExperimentStartupInfo } from './common/experimentStartupInfo'; import { setExperimentStartupInfo } from './common/experimentStartupInfo';
import { getLogger, Logger } from './common/log'; import { getLogger, Logger, logLevelNameMap } from './common/log';
import { Manager } from './common/manager'; import { Manager } from './common/manager';
import { TrainingService } from './common/trainingService'; import { TrainingService } from './common/trainingService';
import { parseArg, uniqueString, mkDirP, getLogDir } from './common/utils'; import { parseArg, uniqueString, mkDirP, getLogDir } from './common/utils';
...@@ -111,7 +111,7 @@ if (logDir.length > 0) { ...@@ -111,7 +111,7 @@ if (logDir.length > 0) {
} }
const logLevel: string = parseArg(['--log_level', '-ll']); const logLevel: string = parseArg(['--log_level', '-ll']);
if (logLevel.length > 0 && !['debug', 'info', 'error', 'warning', 'critical'].includes(logLevel)) { if (logLevel.length > 0 && !logLevelNameMap.has(logLevel)) {
console.log(`FATAL: invalid log_level: ${logLevel}`); console.log(`FATAL: invalid log_level: ${logLevel}`);
} }
......
...@@ -30,20 +30,17 @@ import { getLogger, Logger } from '../common/log'; ...@@ -30,20 +30,17 @@ import { getLogger, Logger } from '../common/log';
import { ExperimentProfile, Manager, TrialJobStatistics} from '../common/manager'; import { ExperimentProfile, Manager, TrialJobStatistics} from '../common/manager';
import { ValidationSchemas } from './restValidationSchemas'; import { ValidationSchemas } from './restValidationSchemas';
import { NNIRestServer } from './nniRestServer'; import { NNIRestServer } from './nniRestServer';
import { TensorBoard } from './tensorboard';
const expressJoi = require('express-joi-validator'); const expressJoi = require('express-joi-validator');
class NNIRestHandler { class NNIRestHandler {
private restServer: NNIRestServer; private restServer: NNIRestServer;
private nniManager: Manager; private nniManager: Manager;
private tb: TensorBoard;
private log: Logger; private log: Logger;
constructor(rs: NNIRestServer) { constructor(rs: NNIRestServer) {
this.nniManager = component.get(Manager); this.nniManager = component.get(Manager);
this.restServer = rs; this.restServer = rs;
this.tb = new TensorBoard();
this.log = getLogger(); this.log = getLogger();
} }
...@@ -52,10 +49,7 @@ class NNIRestHandler { ...@@ -52,10 +49,7 @@ class NNIRestHandler {
// tslint:disable-next-line:typedef // tslint:disable-next-line:typedef
router.use((req: Request, res: Response, next) => { router.use((req: Request, res: Response, next) => {
// Don't log useless empty body content this.log.debug(`${req.method}: ${req.url}: body:\n${JSON.stringify(req.body, undefined, 4)}`);
if(req.body && Object.keys(req.body).length > 0) {
this.log.info(`${req.method}: ${req.url}: body:\n${JSON.stringify(req.body, undefined, 4)}`);
}
res.header('Access-Control-Allow-Origin', '*'); res.header('Access-Control-Allow-Origin', '*');
res.header('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept'); res.header('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept');
res.header('Access-Control-Allow-Methods', 'PUT,POST,GET,DELETE,OPTIONS'); res.header('Access-Control-Allow-Methods', 'PUT,POST,GET,DELETE,OPTIONS');
...@@ -76,10 +70,6 @@ class NNIRestHandler { ...@@ -76,10 +70,6 @@ class NNIRestHandler {
this.addTrialJob(router); this.addTrialJob(router);
this.cancelTrialJob(router); this.cancelTrialJob(router);
this.getMetricData(router); this.getMetricData(router);
this.getExample(router);
this.getTriedParameters(router);
this.startTensorBoard(router);
this.stopTensorBoard(router);
// Express-joi-validator configuration // Express-joi-validator configuration
router.use((err: any, req: Request, res: Response, next: any) => { router.use((err: any, req: Request, res: Response, next: any) => {
...@@ -104,12 +94,12 @@ class NNIRestHandler { ...@@ -104,12 +94,12 @@ class NNIRestHandler {
}); });
// If it's a fatal error, exit process // If it's a fatal error, exit process
if(isFatal) { if (isFatal) {
this.log.fatal(err); this.log.fatal(err);
process.exit(1); process.exit(1);
} else {
this.log.error(err);
} }
this.log.error(err);
} }
private version(router: Router): void { private version(router: Router): void {
...@@ -125,10 +115,11 @@ class NNIRestHandler { ...@@ -125,10 +115,11 @@ class NNIRestHandler {
const ds: DataStore = component.get<DataStore>(DataStore); const ds: DataStore = component.get<DataStore>(DataStore);
ds.init().then(() => { ds.init().then(() => {
res.send(this.nniManager.getStatus()); res.send(this.nniManager.getStatus());
this.log.info('Datastore initialization done');
}).catch(async (err: Error) => { }).catch(async (err: Error) => {
this.handle_error(err, res); this.handle_error(err, res);
this.log.error(err.message); this.log.error(err.message);
this.log.error(`Database initialize failed, stopping rest server...`); this.log.error(`Datastore initialize failed, stopping rest server...`);
await this.restServer.stop(); await this.restServer.stop();
}); });
}); });
...@@ -259,41 +250,6 @@ class NNIRestHandler { ...@@ -259,41 +250,6 @@ class NNIRestHandler {
}); });
} }
/**
 * Registers the `POST /tensorboard` route on the given router.
 *
 * The route is wrapped with `expressJoi(ValidationSchemas.STARTTENSORBOARD)`.
 * The handler calls `this.tb.startTensorBoard` and replies with
 * `{ endPoint }` when it resolves; a rejection is passed to `handle_error`.
 *
 * @param router Router that receives the route.
 */
private startTensorBoard(router: Router): void {
router.post('/tensorboard', expressJoi(ValidationSchemas.STARTTENSORBOARD), async (req: Request, res: Response) => {
// `job_ids` arrives as one query-string value and is split on commas.
const jobIds: string[] = req.query.job_ids.split(',');
// May be undefined; forwarded unchanged to `startTensorBoard`.
const tensorboardCmd: string | undefined = req.query.tensorboard_cmd;
this.tb.startTensorBoard(jobIds, tensorboardCmd).then((endPoint: string) => {
res.send({endPoint: endPoint});
}).catch((err: Error) => {
this.handle_error(err, res);
});
});
}
/**
 * Registers the `DELETE /tensorboard` route on the given router.
 *
 * The route is wrapped with `expressJoi(ValidationSchemas.STOPTENSORBOARD)`.
 * The handler calls `this.tb.stopTensorBoard` with the `endpoint` query
 * value and sends an empty response when it resolves; a rejection is passed
 * to `handle_error`.
 *
 * @param router Router that receives the route.
 */
private stopTensorBoard(router: Router): void {
router.delete('/tensorboard', expressJoi(ValidationSchemas.STOPTENSORBOARD), async (req: Request, res: Response) => {
const endPoint: string = req.query.endpoint;
this.tb.stopTensorBoard(endPoint).then(() => {
// No payload: an empty response signals success.
res.send();
}).catch((err: Error) => {
this.handle_error(err, res);
});
});
}
/**
 * Registers the `GET /example` route with an empty handler.
 *
 * NOTE(review): the handler body is empty and never writes to `res`, so a
 * request to this route presumably receives no reply from this handler -
 * confirm whether it is a placeholder.
 *
 * @param router Router that receives the route.
 */
private getExample(router: Router): void {
// tslint:disable-next-line:no-empty
router.get('/example', async (req: Request, res: Response) => {
});
}
/**
 * Registers the `GET /tried-parameters` route with an empty handler.
 *
 * NOTE(review): the handler body is empty and never writes to `res`, so a
 * request to this route presumably receives no reply from this handler -
 * confirm whether it is a placeholder.
 *
 * @param router Router that receives the route.
 */
private getTriedParameters(router: Router): void {
// tslint:disable-next-line:no-empty
router.get('/tried-parameters', async (req: Request, res: Response) => {
});
}
private setErrorPathForFailedJob(jobInfo: TrialJobInfo): TrialJobInfo { private setErrorPathForFailedJob(jobInfo: TrialJobInfo): TrialJobInfo {
if (jobInfo === undefined || jobInfo.status !== 'FAILED' || jobInfo.logPath === undefined) { if (jobInfo === undefined || jobInfo.status !== 'FAILED' || jobInfo.logPath === undefined) {
return jobInfo; return jobInfo;
......
...@@ -187,14 +187,4 @@ export namespace ValidationSchemas { ...@@ -187,14 +187,4 @@ export namespace ValidationSchemas {
maxSequenceId: joi.number() maxSequenceId: joi.number()
} }
}; };
/**
 * Validation schema for a request's query string: `job_ids` is a required
 * string whose length must be exactly 5 (min 5, max 5).
 *
 * NOTE(review): the `POST /tensorboard` handler splits `job_ids` on commas,
 * but a 5-character limit looks like it only admits a single id - confirm
 * the intended format. `tensorboard_cmd`, which that handler also reads, is
 * not declared here.
 */
export const STARTTENSORBOARD = {
query: {
job_ids: joi.string().min(5).max(5).required()
}
};
// Query-string schema used by the `DELETE /tensorboard` route.
// `endpoint` is required and must be a string that passes joi's URI validation.
export const STOPTENSORBOARD = {
query: {
endpoint: joi.string().uri().required()
}
};
} }
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict';
import * as component from '../common/component';
import { DataStore, TrialJobInfo } from '../common/datastore';
import { NNIErrorNames } from '../common/errors';
import { getLogger, Logger } from '../common/log';
import { HostJobApplicationForm, TrainingService, TrialJobStatus } from '../common/trainingService';
/**
 * Starts and stops TensorBoard processes by submitting HOST jobs to the
 * training service.
 *
 * At most one TensorBoard job is tracked per host: `tbJobMap` maps a host name
 * to the ID of the HOST job that was submitted for it.
 */
export class TensorBoard {
    private readonly DEFAULT_PORT: number = 6006;
    private readonly TENSORBOARD_COMMAND: string = 'PATH=$PATH:~/.local/bin:/usr/local/bin tensorboard';
    /** host name -> ID of the HOST job running TensorBoard on that host */
    private tbJobMap: Map<string, string>;
    private trainingService: TrainingService;
    private dataStore: DataStore;
    private log: Logger = getLogger();

    constructor() {
        this.tbJobMap = new Map();
        this.trainingService = component.get(TrainingService);
        this.dataStore = component.get(DataStore);
    }

    /**
     * Launches TensorBoard for the given trial jobs and returns its endpoint.
     *
     * The host is taken from the logPath of the FIRST trial job only. A
     * TensorBoard job already tracked for that host is stopped first.
     *
     * @param trialJobIds IDs of the trial jobs whose log directories are shown; must not be empty
     * @param tbCmd optional replacement for the default tensorboard command; ignored when blank
     * @param port optional port, defaults to 6006
     * @returns endpoint of the form `http://<host>:<port>`
     * @throws Error when no trial job is given or a logPath is missing/malformed
     */
    public async startTensorBoard(trialJobIds: string[], tbCmd?: string, port?: number): Promise<string> {
        const tensorBoardPort: number = port !== undefined ? port : this.DEFAULT_PORT;
        const host: string = await this.getJobHost(trialJobIds);
        const tbEndpoint: string = `http://${host}:${tensorBoardPort}`;

        try {
            if (await this.isTensorBoardRunningOnHost(host)) {
                await this.stopHostTensorBoard(host);
            }
        } catch (error) {
            if (error.name !== NNIErrorNames.NOT_FOUND) {
                throw error;
            }
            // The tracked job no longer exists in the training service: forget it.
            this.tbJobMap.delete(host);
        }

        // The lookups are independent, so run them concurrently; Promise.all keeps input order.
        const logDirs: string[] = await Promise.all(trialJobIds.map((id: string) => this.getLogDir(id)));

        const tensorBoardCmd: string =
            tbCmd !== undefined && tbCmd.trim().length > 0 ? tbCmd : this.TENSORBOARD_COMMAND;
        // NOTE(security): tbCmd and the log directories are interpolated into a shell command
        // line without escaping. Callers must only pass trusted values here.
        const cmd: string = `${tensorBoardCmd} --logdir ${logDirs.join(':')} --port ${tensorBoardPort}`;
        const form: HostJobApplicationForm = {
            jobType: 'HOST',
            host: host,
            cmd: cmd
        };

        const jobId: string = (await this.trainingService.submitTrialJob(form)).id;
        this.tbJobMap.set(host, jobId);

        return tbEndpoint;
    }

    /**
     * Stops every tracked TensorBoard job. Failures are logged and do not
     * prevent the remaining jobs from being stopped.
     */
    public async cleanUp(): Promise<void> {
        // Snapshot the hosts first: stopping a job removes its entry from the map.
        const hosts: string[] = Array.from(this.tbJobMap.keys());
        await Promise.all(hosts.map((host: string) =>
            this.stopHostTensorBoard(host).catch((err: Error) => {
                this.log.error(`Error occurred stopping tensorboard service: ${err.message}`);
            })
        ));
    }

    /**
     * Stops the TensorBoard job tracked for the host named in `endPoint`.
     *
     * @param endPoint endpoint as returned by startTensorBoard (`<scheme>://<host>:<port>`)
     * @throws Error when `endPoint` does not have that form
     */
    public stopTensorBoard(endPoint: string): Promise<void> {
        return this.stopHostTensorBoard(this.getEndPointHost(endPoint));
    }

    /**
     * Cancels the HOST job tracked for `host`, if any, and drops it from the
     * map once the cancellation succeeded. Resolves immediately when nothing
     * is tracked for the host.
     */
    private stopHostTensorBoard(host: string): Promise<void> {
        const jobId: string | undefined = this.tbJobMap.get(host);
        if (jobId === undefined) {
            return Promise.resolve();
        }

        return this.trainingService.cancelTrialJob(jobId).then(() => {
            // Only remove the entry if it still refers to the job just cancelled.
            if (this.tbJobMap.get(host) === jobId) {
                this.tbJobMap.delete(host);
            }
        });
    }

    /**
     * Reports whether the job tracked for `host` is in status RUNNING or WAITING.
     * Returns false when no job is tracked for the host.
     */
    private async isTensorBoardRunningOnHost(host: string): Promise<boolean> {
        const jobId: string | undefined = this.tbJobMap.get(host);
        if (jobId === undefined) {
            return false;
        }
        const status: TrialJobStatus = (await this.trainingService.getTrialJob(jobId)).status;

        return ['RUNNING', 'WAITING'].includes(status);
    }

    /**
     * Returns the host part of the first trial job's logPath.
     *
     * @throws Error when `trialJobIds` is undefined/empty or the logPath is missing/malformed
     */
    private async getJobHost(trialJobIds: string[]): Promise<string> {
        if (trialJobIds === undefined || trialJobIds.length < 1) {
            throw new Error('No trial job specified.');
        }

        return this.parseLogPath(await this.getTrialLogPath(trialJobIds[0])).host;
    }

    /**
     * Returns the directory part of the given trial job's logPath.
     *
     * @throws Error when the logPath is missing, malformed or carries no directory part
     */
    private async getLogDir(trialJobId: string): Promise<string> {
        const logPath: string = await this.getTrialLogPath(trialJobId);
        const dir: string | undefined = this.parseLogPath(logPath).dir;
        if (dir === undefined) {
            throw new Error(`Invalid logPath (missing log directory): ${logPath}`);
        }

        return dir;
    }

    /** Reads the trial job from the datastore and returns its logPath, throwing when it is not set. */
    private async getTrialLogPath(trialJobId: string): Promise<string> {
        const jobInfo: TrialJobInfo = await this.dataStore.getTrialJob(trialJobId);
        if (jobInfo.logPath === undefined) {
            throw new Error(`Failed to find job logPath: ${jobInfo.id}`);
        }

        return jobInfo.logPath;
    }

    /**
     * Splits a logPath of the form `<scheme>://<host>:<dir>`.
     *
     * The split happens at the FIRST ':' after the scheme separator, so a
     * directory that itself contains ':' is returned intact. When there is no
     * ':' after the scheme, the whole remainder is the host and `dir` is undefined.
     *
     * @throws Error when `logPath` contains no `://`
     */
    private parseLogPath(logPath: string): { host: string; dir: string | undefined } {
        const schemeEnd: number = logPath.indexOf('://');
        if (schemeEnd < 0) {
            throw new Error(`Invalid logPath: ${logPath}`);
        }
        const rest: string = logPath.slice(schemeEnd + 3);
        const sep: number = rest.indexOf(':');
        if (sep < 0) {
            return { host: rest, dir: undefined };
        }

        return { host: rest.slice(0, sep), dir: rest.slice(sep + 1) };
    }

    /**
     * Extracts the host from an endpoint of the form `<scheme>://<host>:<port>`.
     *
     * @throws Error when `endPoint` does not match that form
     */
    private getEndPointHost(endPoint: string): string {
        const parts: RegExpMatchArray | null = endPoint.match(/.*:\/\/(.*):(.*)/);
        if (parts === null) {
            throw new Error(`Invalid endPoint: ${endPoint}`);
        }

        return parts[1];
    }
}
...@@ -61,9 +61,11 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber ...@@ -61,9 +61,11 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
this.kubeflowJobInfoCollector = new KubeflowJobInfoCollector(this.trialJobsMap); this.kubeflowJobInfoCollector = new KubeflowJobInfoCollector(this.trialJobsMap);
this.experimentId = getExperimentId(); this.experimentId = getExperimentId();
this.nextTrialSequenceId = -1; this.nextTrialSequenceId = -1;
this.log.info('Construct Kubeflow training service.');
} }
public async run(): Promise<void> { public async run(): Promise<void> {
this.log.info('Run Kubeflow training service.');
this.kubernetesJobRestServer = component.get(KubeflowJobRestServer); this.kubernetesJobRestServer = component.get(KubeflowJobRestServer);
if(!this.kubernetesJobRestServer) { if(!this.kubernetesJobRestServer) {
throw new Error('kubernetesJobRestServer not initialized!'); throw new Error('kubernetesJobRestServer not initialized!');
...@@ -75,6 +77,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber ...@@ -75,6 +77,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
await delay(3000); await delay(3000);
await this.kubeflowJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient); await this.kubeflowJobInfoCollector.retrieveTrialStatus(this.kubernetesCRDClient);
} }
this.log.info('Kubeflow training service exit.');
} }
public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> { public async submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
......
...@@ -90,7 +90,7 @@ class LocalTrialJobDetail implements TrialJobDetail { ...@@ -90,7 +90,7 @@ class LocalTrialJobDetail implements TrialJobDetail {
} }
/** /**
* Local training service * Local machine training service
*/ */
class LocalTrainingService implements TrainingService { class LocalTrainingService implements TrainingService {
private eventEmitter: EventEmitter; private eventEmitter: EventEmitter;
...@@ -114,9 +114,11 @@ class LocalTrainingService implements TrainingService { ...@@ -114,9 +114,11 @@ class LocalTrainingService implements TrainingService {
this.log = getLogger(); this.log = getLogger();
this.trialSequenceId = -1; this.trialSequenceId = -1;
this.streams = new Array<ts.Stream>(); this.streams = new Array<ts.Stream>();
this.log.info('Construct local machine training service.');
} }
public async run(): Promise<void> { public async run(): Promise<void> {
this.log.info('Run local machine training service.');
while (!this.stopping) { while (!this.stopping) {
while (this.jobQueue.length !== 0) { while (this.jobQueue.length !== 0) {
const trialJobId: string = this.jobQueue[0]; const trialJobId: string = this.jobQueue[0];
...@@ -133,6 +135,7 @@ class LocalTrainingService implements TrainingService { ...@@ -133,6 +135,7 @@ class LocalTrainingService implements TrainingService {
} }
await delay(5000); await delay(5000);
} }
this.log.info('Local machine training service exit.');
} }
public async listTrialJobs(): Promise<TrialJobDetail[]> { public async listTrialJobs(): Promise<TrialJobDetail[]> {
...@@ -180,7 +183,7 @@ class LocalTrainingService implements TrainingService { ...@@ -180,7 +183,7 @@ class LocalTrainingService implements TrainingService {
} catch (error) { } catch (error) {
//ignore //ignore
} }
this.log.info(`trailJob status update: ${trialJobId}, ${trialJob.status}`); this.log.debug(`trailJob status update: ${trialJobId}, ${trialJob.status}`);
} }
} }
...@@ -196,7 +199,6 @@ class LocalTrainingService implements TrainingService { ...@@ -196,7 +199,6 @@ class LocalTrainingService implements TrainingService {
} }
public submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> { public submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
if (form.jobType === 'HOST') { if (form.jobType === 'HOST') {
return this.runHostJob(<HostJobApplicationForm>form); return this.runHostJob(<HostJobApplicationForm>form);
} else if (form.jobType === 'TRIAL') { } else if (form.jobType === 'TRIAL') {
...@@ -247,7 +249,6 @@ class LocalTrainingService implements TrainingService { ...@@ -247,7 +249,6 @@ class LocalTrainingService implements TrainingService {
} }
public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> { public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
this.log.info(`cancelTrialJob: ${trialJobId}`);
const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId); const trialJob: LocalTrialJobDetail | undefined = this.jobMap.get(trialJobId);
if (trialJob === undefined) { if (trialJob === undefined) {
throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found'); throw new NNIError(NNIErrorNames.NOT_FOUND, 'Trial job not found');
...@@ -303,6 +304,7 @@ class LocalTrainingService implements TrainingService { ...@@ -303,6 +304,7 @@ class LocalTrainingService implements TrainingService {
} }
public cleanUp(): Promise<void> { public cleanUp(): Promise<void> {
this.log.info('Stopping local machine training service...');
this.stopping = true; this.stopping = true;
for (const stream of this.streams) { for (const stream of this.streams) {
stream.destroy(); stream.destroy();
......
...@@ -87,9 +87,11 @@ class PAITrainingService implements TrainingService { ...@@ -87,9 +87,11 @@ class PAITrainingService implements TrainingService {
this.hdfsDirPattern = 'hdfs://(?<host>([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(?<baseDir>/.*)?'; this.hdfsDirPattern = 'hdfs://(?<host>([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(?<baseDir>/.*)?';
this.nextTrialSequenceId = -1; this.nextTrialSequenceId = -1;
this.paiTokenUpdateInterval = 7200000; //2hours this.paiTokenUpdateInterval = 7200000; //2hours
this.log.info('Construct OpenPAI training service.');
} }
public async run(): Promise<void> { public async run(): Promise<void> {
this.log.info('Run PAI training service.');
const restServer: PAIJobRestServer = component.get(PAIJobRestServer); const restServer: PAIJobRestServer = component.get(PAIJobRestServer);
await restServer.start(); await restServer.start();
...@@ -99,6 +101,7 @@ class PAITrainingService implements TrainingService { ...@@ -99,6 +101,7 @@ class PAITrainingService implements TrainingService {
await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig); await this.paiJobCollector.retrieveTrialStatus(this.paiToken, this.paiClusterConfig);
await delay(3000); await delay(3000);
} }
this.log.info('PAI training service exit.');
} }
public async listTrialJobs(): Promise<TrialJobDetail[]> { public async listTrialJobs(): Promise<TrialJobDetail[]> {
...@@ -446,6 +449,7 @@ class PAITrainingService implements TrainingService { ...@@ -446,6 +449,7 @@ class PAITrainingService implements TrainingService {
} }
public async cleanUp(): Promise<void> { public async cleanUp(): Promise<void> {
this.log.info('Stopping PAI training service...');
this.stopping = true; this.stopping = true;
const deferred : Deferred<void> = new Deferred<void>(); const deferred : Deferred<void> = new Deferred<void>();
......
...@@ -64,7 +64,7 @@ export class MetricsCollector { ...@@ -64,7 +64,7 @@ export class MetricsCollector {
if (trialJobDetail.status !== 'EARLY_STOPPED') { if (trialJobDetail.status !== 'EARLY_STOPPED') {
trialJobDetail.status = jobMetrics.jobStatus; trialJobDetail.status = jobMetrics.jobStatus;
} }
this.log.info(`Set trialjob ${trialJobDetail.id} status to ${trialJobDetail.status}`); this.log.debug(`Set trialjob ${trialJobDetail.id} status to ${trialJobDetail.status}`);
runningJobsMap.forEach((jobIds: string[], rmMeta: RemoteMachineMeta) => { runningJobsMap.forEach((jobIds: string[], rmMeta: RemoteMachineMeta) => {
// If remote machine has no GPU, gpuReservcation is not initialized, so check if it's undefined // If remote machine has no GPU, gpuReservcation is not initialized, so check if it's undefined
if(rmMeta.gpuReservation !== undefined) { if(rmMeta.gpuReservation !== undefined) {
......
...@@ -81,12 +81,14 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -81,12 +81,14 @@ class RemoteMachineTrainingService implements TrainingService {
this.timer = timer; this.timer = timer;
this.log = getLogger(); this.log = getLogger();
this.trialSequenceId = -1; this.trialSequenceId = -1;
this.log.info('Construct remote machine training service.');
} }
/** /**
* Loop to launch trial jobs and collect trial metrics * Loop to launch trial jobs and collect trial metrics
*/ */
public async run(): Promise<void> { public async run(): Promise<void> {
this.log.info('Run remote machine training service.');
while (!this.stopping) { while (!this.stopping) {
while (this.jobQueue.length > 0) { while (this.jobQueue.length > 0) {
const trialJobId: string = this.jobQueue[0]; const trialJobId: string = this.jobQueue[0];
...@@ -105,6 +107,7 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -105,6 +107,7 @@ class RemoteMachineTrainingService implements TrainingService {
await metricsCollector.collectMetrics(); await metricsCollector.collectMetrics();
await delay(3000); await delay(3000);
} }
this.log.info('Remote machine training service exit.');
} }
/** /**
...@@ -171,8 +174,6 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -171,8 +174,6 @@ class RemoteMachineTrainingService implements TrainingService {
* @param form trial job description form * @param form trial job description form
*/ */
public submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> { public submitTrialJob(form: JobApplicationForm): Promise<TrialJobDetail> {
this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
if (!this.trialConfig) { if (!this.trialConfig) {
throw new Error('trial config is not initialized'); throw new Error('trial config is not initialized');
} }
...@@ -207,7 +208,6 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -207,7 +208,6 @@ class RemoteMachineTrainingService implements TrainingService {
* @param form job application form * @param form job application form
*/ */
public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> { public async updateTrialJob(trialJobId: string, form: JobApplicationForm): Promise<TrialJobDetail> {
this.log.info(`updateTrialJob: form: ${JSON.stringify(form)}`);
const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId); const trialJobDetail: undefined | TrialJobDetail = this.trialJobsMap.get(trialJobId);
if (trialJobDetail === undefined) { if (trialJobDetail === undefined) {
throw new Error(`updateTrialJob failed: ${trialJobId} not found`); throw new Error(`updateTrialJob failed: ${trialJobId} not found`);
...@@ -238,7 +238,6 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -238,7 +238,6 @@ class RemoteMachineTrainingService implements TrainingService {
* @param trialJobId ID of trial job * @param trialJobId ID of trial job
*/ */
public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> { public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
this.log.info(`cancelTrialJob: jobId: ${trialJobId}`);
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<void> = new Deferred<void>();
const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId); const trialJob: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (!trialJob) { if (!trialJob) {
...@@ -329,12 +328,14 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -329,12 +328,14 @@ class RemoteMachineTrainingService implements TrainingService {
} }
public cleanUp(): Promise<void> { public cleanUp(): Promise<void> {
this.log.info('Stopping remote machine training service...');
this.stopping = true; this.stopping = true;
return Promise.resolve(); return Promise.resolve();
} }
private async setupConnections(machineList: string): Promise<void> { private async setupConnections(machineList: string): Promise<void> {
this.log.debug(`Connecting to remote machines: ${machineList}`);
const deferred: Deferred<void> = new Deferred<void>(); const deferred: Deferred<void> = new Deferred<void>();
//TO DO: verify if value's format is wrong, and json parse failed, how to handle error //TO DO: verify if value's format is wrong, and json parse failed, how to handle error
const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList); const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList);
...@@ -563,7 +564,7 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -563,7 +564,7 @@ class RemoteMachineTrainingService implements TrainingService {
} }
trialJob.endTime = parseInt(timestamp, 10); trialJob.endTime = parseInt(timestamp, 10);
} }
this.log.info(`trailJob status update: ${trialJob.id}, ${trialJob.status}`); this.log.debug(`trailJob status update: ${trialJob.id}, ${trialJob.status}`);
} }
deferred.resolve(trialJob); deferred.resolve(trialJob);
} catch (error) { } catch (error) {
......
...@@ -27,7 +27,7 @@ import { Client, ClientChannel, SFTPWrapper } from 'ssh2'; ...@@ -27,7 +27,7 @@ import { Client, ClientChannel, SFTPWrapper } from 'ssh2';
import * as stream from 'stream'; import * as stream from 'stream';
import { Deferred } from 'ts-deferred'; import { Deferred } from 'ts-deferred';
import { NNIError, NNIErrorNames } from '../../common/errors'; import { NNIError, NNIErrorNames } from '../../common/errors';
import { getLogger } from '../../common/log'; import { getLogger, Logger } from '../../common/log';
import { uniqueString, getRemoteTmpDir } from '../../common/utils'; import { uniqueString, getRemoteTmpDir } from '../../common/utils';
import { RemoteCommandResult } from './remoteMachineData'; import { RemoteCommandResult } from './remoteMachineData';
...@@ -69,11 +69,13 @@ export namespace SSHClientUtility { ...@@ -69,11 +69,13 @@ export namespace SSHClientUtility {
* @param sshClient SSH Client * @param sshClient SSH Client
*/ */
export function copyFileToRemote(localFilePath : string, remoteFilePath : string, sshClient : Client) : Promise<boolean> { export function copyFileToRemote(localFilePath : string, remoteFilePath : string, sshClient : Client) : Promise<boolean> {
const log: Logger = getLogger();
log.debug(`copyFileToRemote: localFilePath: ${localFilePath}, remoteFilePath: ${remoteFilePath}`);
assert(sshClient !== undefined); assert(sshClient !== undefined);
const deferred: Deferred<boolean> = new Deferred<boolean>(); const deferred: Deferred<boolean> = new Deferred<boolean>();
sshClient.sftp((err : Error, sftp : SFTPWrapper) => { sshClient.sftp((err : Error, sftp : SFTPWrapper) => {
if (err) { if (err) {
getLogger().error(`copyFileToRemote: ${err.message}, ${localFilePath}, ${remoteFilePath}`); log.error(`copyFileToRemote: ${err.message}, ${localFilePath}, ${remoteFilePath}`);
deferred.reject(err); deferred.reject(err);
return; return;
...@@ -98,6 +100,8 @@ export namespace SSHClientUtility { ...@@ -98,6 +100,8 @@ export namespace SSHClientUtility {
* @param client SSH Client * @param client SSH Client
*/ */
export function remoteExeCommand(command : string, client : Client): Promise<RemoteCommandResult> { export function remoteExeCommand(command : string, client : Client): Promise<RemoteCommandResult> {
const log: Logger = getLogger();
log.debug(`remoteExeCommand: command: [${command}]`);
const deferred : Deferred<RemoteCommandResult> = new Deferred<RemoteCommandResult>(); const deferred : Deferred<RemoteCommandResult> = new Deferred<RemoteCommandResult>();
let stdout: string = ''; let stdout: string = '';
let stderr: string = ''; let stderr: string = '';
...@@ -105,7 +109,7 @@ export namespace SSHClientUtility { ...@@ -105,7 +109,7 @@ export namespace SSHClientUtility {
client.exec(command, (err : Error, channel : ClientChannel) => { client.exec(command, (err : Error, channel : ClientChannel) => {
if (err) { if (err) {
getLogger().error(`remoteExeCommand: ${err.message}`); log.error(`remoteExeCommand: ${err.message}`);
deferred.reject(err); deferred.reject(err);
return; return;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment