Commit 9d3d926b authored by SparkSnail's avatar SparkSnail Committed by chicm-ms
Browse files

Migrate remote log (#655)

* fix remote bug

* add document

* add document

* update

* update

* update

* update

* fix remote issue

* fix forEach

* update doc according to comments

* update

* update

* update

* remove 'any more'

* add base version for remote-log

* change launcher.py

* test

* basic version

* debug

* debug

* basic work version

* fix code

* update disable_log

* remove unused line

* add disable log in kubernetesTrainingService

* add detect frameworkcontroller

* fix comment

* update

* update

* fix kubernetesData

* debug

* debug

* debug

* fix comment

* fix conflict

* remove local temp files

* revert launcher.py

* update code by comments

* remove disableLog

* remove disable Log

* set timeout for cleanup

* fix code by comments

* update variable names

* add comments

* add delay function

* update

* update

* update by comments

* add  in remote script path

* rename variables

* update variable name

* add mkdir -p for subfolder
parent 83684388
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
"version": "999.0.0-developing", "version": "999.0.0-developing",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {
"postbuild": "cp -rf scripts ./dist/ && cp -rf config ./dist/", "postbuild": "cp -rf config ./dist/",
"build": "tsc", "build": "tsc",
"test": "nyc mocha -r ts-node/register -t 15000 --recursive **/*.test.ts --exclude node_modules/**/**/*.test.ts --exclude core/test/nnimanager.test.ts --colors", "test": "nyc mocha -r ts-node/register -t 15000 --recursive **/*.test.ts --exclude node_modules/**/**/*.test.ts --exclude core/test/nnimanager.test.ts --colors",
"start": "node dist/main.js", "start": "node dist/main.js",
......
# ============================================================================================================================== #
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ============================================================================================================================== #
import argparse
import errno
import json
import os
import re
import sys
# Paths of the bookkeeping files, relative to a trial job directory.
METRICS_FILENAME = '.nni/metrics'
OFFSET_FILENAME = '.nni/metrics_offset'
JOB_CODE_FILENAME = '.nni/code'
JOB_PID_FILENAME = '.nni/jobpid'
# Job code file content: "<return code> <timestamp>".
# Raw string so that \d and \s reach the regex engine instead of being
# treated as (invalid) string escapes.
JOB_CODE_PATTERN = re.compile(r'^(\d+)\s+(\d+)$')
# A metrics record is MAGIC, then a LEN_FIELD_SIZE-digit decimal payload
# length, then the payload itself.
LEN_FIELD_SIZE = 6
MAGIC = 'ME'


class TrialMetricsReader():
    '''
    Read metrics data and job status from a trial job directory
    '''

    def __init__(self, trial_job_dir):
        '''
        trial_job_dir: directory of the trial job; all bookkeeping files
        are resolved relative to it
        '''
        self.trial_job_dir = trial_job_dir
        self.offset_filename = os.path.join(trial_job_dir, OFFSET_FILENAME)
        self.metrics_filename = os.path.join(trial_job_dir, METRICS_FILENAME)
        self.jobcode_filename = os.path.join(trial_job_dir, JOB_CODE_FILENAME)
        self.jobpid_filename = os.path.join(trial_job_dir, JOB_PID_FILENAME)
        # Misspelled name of the attribute above, kept for backward compatibility.
        self.jobpid_filemame = self.jobpid_filename

    def _metrics_file_is_empty(self):
        '''
        Return True when the metrics file is missing or has no content
        '''
        if not os.path.isfile(self.metrics_filename):
            return True
        return os.stat(self.metrics_filename).st_size == 0

    def _get_offset(self):
        '''
        Return the offset stored by the previous read, or 0 when no offset
        file exists yet
        '''
        if not os.path.isfile(self.offset_filename):
            return 0
        with open(self.offset_filename, 'r') as f:
            return int(f.readline())

    def _write_offset(self, offset):
        '''
        Persist the byte offset up to which the metrics file has been consumed

        Raises ValueError when the offset lies outside the metrics file.
        '''
        statinfo = os.stat(self.metrics_filename)
        if offset < 0 or offset > statinfo.st_size:
            raise ValueError('offset value is invalid: {}'.format(offset))
        with open(self.offset_filename, 'w') as f:
            f.write(str(offset) + '\n')

    def _read_all_available_records(self, offset):
        '''
        Read every complete record stored after the given byte offset

        Returns the list of record payloads as strings and stores the new
        offset. Raises ValueError, without advancing the stored offset, when
        a record is malformed or truncated.
        '''
        new_offset = offset
        metrics = []
        magic = MAGIC.encode('ascii')
        # Binary mode: the stored offset is validated against the file size in
        # bytes, and text-mode tell()/read() work on opaque positions and
        # character counts, which drift apart as soon as a payload is not ASCII.
        with open(self.metrics_filename, 'rb') as f:
            f.seek(offset)
            while True:
                magic_bytes = f.read(len(magic))
                # empty data means EOF
                if not magic_bytes:
                    break
                header = f.read(LEN_FIELD_SIZE)
                if magic_bytes != magic or len(header) != LEN_FIELD_SIZE:
                    raise ValueError("metric file {} format error after offset: {}.".format(self.metrics_filename, new_offset))
                datalen = int(header)
                data = f.read(datalen) if datalen > 0 else b''
                if datalen <= 0 or len(data) != datalen:
                    raise ValueError("metric file {} format error after offset: {}.".format(self.metrics_filename, new_offset))
                metrics.append(data.decode('utf-8'))
                new_offset = f.tell()
        self._write_offset(new_offset)
        return metrics

    def _pid_exists(self, pid):
        '''
        Return True when a process with the given pid exists

        Raises ValueError for pid 0.
        '''
        if pid < 0:
            return False
        if pid == 0:
            # According to "man 2 kill" PID 0 refers to every process
            # in the process group of the calling process.
            # On certain systems 0 is a valid PID but we have no way
            # to know that in a portable fashion.
            raise ValueError('invalid PID 0')
        try:
            # Signal 0 performs the existence/permission check only.
            os.kill(pid, 0)
        except OSError as err:
            if err.errno == errno.ESRCH:
                # ESRCH == No such process
                return False
            if err.errno == errno.EPERM:
                # EPERM clearly means there's a process to deny access to
                return True
            # According to "man 2 kill" possible error values are
            # (EINVAL, EPERM, ESRCH)
            raise
        return True

    def read_trial_metrics(self):
        '''
        Read available metrics data for a trial
        '''
        if self._metrics_file_is_empty():
            return []
        return self._read_all_available_records(self._get_offset())

    def read_trial_status(self):
        '''
        Return a (status, end timestamp) tuple for the trial

        The timestamp is -1 unless the job has finished.
        '''
        if not os.path.isfile(self.jobpid_filename):
            # Without a pid file there is nothing to probe.
            return 'UNKNOWN', -1
        with open(self.jobpid_filename, 'r') as f:
            jobpid = int(f.readline())
        if self._pid_exists(jobpid):
            return 'RUNNING', -1
        return self._read_job_return_code()

    def _read_job_return_code(self):
        '''
        Translate the job code file into a (status, end timestamp) tuple

        Raises ValueError when the file is missing or malformed.
        '''
        if not os.path.isfile(self.jobcode_filename):
            raise ValueError('job return code file doesnt exist: {}'.format(self.jobcode_filename))
        with open(self.jobcode_filename, 'r') as f:
            job_return_code = f.readline()
        match = JOB_CODE_PATTERN.match(job_return_code)
        if not match:
            raise ValueError('Job code file format incorrect')
        return_code = int(match.group(1))
        timestamp = int(match.group(2))
        if return_code == 0:
            status = 'SUCCEEDED'
        elif return_code > 128:
            status = 'USER_CANCELED'
        else:
            status = 'FAILED'
        return status, timestamp
def read_experiment_metrics(args):
    '''
    Read metrics data for specified trial jobs

    args.experiment_dir is the experiment root directory and
    args.trial_job_ids a comma separated list of trial job ids. One JSON
    array is printed to stdout, with an entry (jobId, metrics, jobStatus,
    endTimestamp) for every trial that could be read.
    '''
    trial_job_ids = [job_id.strip() for job_id in args.trial_job_ids.split(',')]
    results = []
    for trial_job_id in trial_job_ids:
        if not trial_job_id:
            # An empty id (e.g. from a trailing comma) would resolve to the
            # 'trials' directory itself and yield a bogus entry.
            continue
        try:
            trial_job_dir = os.path.join(args.experiment_dir, 'trials', trial_job_id)
            reader = TrialMetricsReader(trial_job_dir)
            metrics = reader.read_trial_metrics()
            job_status, end_timestamp = reader.read_trial_status()
        except Exception as err:  # one unreadable trial must not hide the others
            # stdout carries the JSON result only, so report on stderr.
            print('Failed to read trial job {}: {}'.format(trial_job_id, err), file=sys.stderr)
            continue
        results.append({
            'jobId': trial_job_id,
            'metrics': metrics,
            'jobStatus': job_status,
            'endTimestamp': end_timestamp,
        })
    print(json.dumps(results))
if __name__ == '__main__':
    # Script entry point: both options are mandatory, unrecognized
    # command line arguments are tolerated and ignored.
    PARSER = argparse.ArgumentParser()
    PARSER.add_argument(
        '--experiment_dir',
        required=True,
        type=str,
        help='Root directory of experiment',
    )
    PARSER.add_argument(
        '--trial_job_ids',
        required=True,
        type=str,
        help="Trial job ids splited with ','",
    )
    ARGS, UNKNOWN = PARSER.parse_known_args()
    read_experiment_metrics(ARGS)
...@@ -163,4 +163,20 @@ export class GPUScheduler { ...@@ -163,4 +163,20 @@ export class GPUScheduler {
} }
}; };
} }
/**
 * Remove every GPU reservation that the given trial job holds on a remote machine
 * @param trialJobId id of the trial job whose reservations are dropped
 * @param rmMeta remote machine to update; nothing is done when it is undefined
 */
public removeGpuReservation(trialJobId: string, rmMeta?: RemoteMachineMeta): void {
    // gpuReservation is not initialized for a remote machine without GPU, so it may be undefined
    if (rmMeta === undefined || rmMeta.gpuReservation === undefined) {
        return;
    }
    const reservation = rmMeta.gpuReservation;
    reservation.forEach((ownerTrialJobId : string, gpuIndex : number) => {
        if (ownerTrialJobId === trialJobId) {
            reservation.delete(gpuIndex);
        }
    });
}
} }
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict';
import * as assert from 'assert';
import { EventEmitter } from 'events';
import * as path from 'path';
import { Client } from 'ssh2';
import { getLogger, Logger } from '../../common/log';
import { TrialJobStatus, TrialJobDetail } from '../../common/trainingService';
import { JobMetrics } from '../common/jobMetrics';
import { RemoteCommandResult, RemoteMachineMeta, RemoteMachineTrialJobDetail } from './remoteMachineData';
import { SSHClientUtility } from './sshClientUtility';
/**
 * Reads trial metrics and job status from remote machines over SSH,
 * updates the trial job records and emits the metrics to listeners.
 */
export class MetricsCollector {
    private machineSSHClientMap : Map<RemoteMachineMeta, Client>;
    private trialJobsMap : Map<string, any>;
    private expRootDir: string;
    private metricsEmitter: EventEmitter;
    private log: Logger = getLogger();

    /**
     * @param clientMap SSH client of each remote machine
     * @param jobMap trial job details, keyed by trial job id
     * @param expDir experiment root directory, used to build the remote command
     * @param eventEmitter emitter on which 'metric' events are raised
     */
    constructor(clientMap: Map<RemoteMachineMeta, Client>,
                jobMap: Map<string, any>,
                expDir: string, eventEmitter: EventEmitter) {
        this.machineSSHClientMap = clientMap;
        this.trialJobsMap = jobMap;
        this.expRootDir = expDir;
        this.metricsEmitter = eventEmitter;
    }

    /**
     * Read metrics from every remote machine that hosts relevant trial jobs,
     * update job status, release GPU reservations of jobs that are no longer
     * running and emit the metrics.
     */
    public async collectMetrics(): Promise<void> {
        const aliveJobStatus : TrialJobStatus[] = ['RUNNING', 'SUCCEEDED'];
        const runningJobsMap: Map<RemoteMachineMeta, string[]> = this.getTrialJobIdsGroupByRmMeta(aliveJobStatus);
        const readMetricsTasks: Promise<JobMetrics[]>[] = [];
        runningJobsMap.forEach((jobIds: string[], rmMeta: RemoteMachineMeta) => {
            readMetricsTasks.push(this.readRmMetrics(rmMeta, jobIds));
        });
        // A failing machine is logged and contributes undefined, so the other machines are still processed
        const allMetrics = await Promise.all(readMetricsTasks.map(task => { return task.catch(err => { this.log.error(err.message); }); }));
        allMetrics.forEach((rmMetrics) => {
            if (rmMetrics !== undefined && rmMetrics.length > 0) {
                rmMetrics.forEach((jobMetrics) => {
                    const trialJobId : string = jobMetrics.jobId;
                    const trialJobDetail : RemoteMachineTrialJobDetail = <RemoteMachineTrialJobDetail>this.trialJobsMap.get(trialJobId);
                    assert(trialJobDetail);
                    // If job status is not alive again, remove its GPU reservation
                    if (jobMetrics.jobStatus !== 'RUNNING') {
                        // An early stopped job keeps its status
                        if (trialJobDetail.status !== 'EARLY_STOPPED') {
                            trialJobDetail.status = jobMetrics.jobStatus;
                        }
                        this.log.debug(`Set trialjob ${trialJobDetail.id} status to ${trialJobDetail.status}`);
                        this.releaseGpuReservation(trialJobId, runningJobsMap);
                    }
                    this.sendMetricsToListeners(jobMetrics);
                });
            }
        });
    }

    /**
     * Delete the GPU reservations held by a trial job on every machine of the given map
     */
    private releaseGpuReservation(trialJobId: string, machines: Map<RemoteMachineMeta, string[]>): void {
        machines.forEach((jobIds: string[], rmMeta: RemoteMachineMeta) => {
            // gpuReservation is not initialized for a remote machine without GPU, so it may be undefined
            if (rmMeta.gpuReservation === undefined) {
                return;
            }
            const reservation = rmMeta.gpuReservation;
            reservation.forEach((reserveTrialJobId : string, gpuIndex : number) => {
                if (reserveTrialJobId === trialJobId) {
                    reservation.delete(gpuIndex);
                }
            });
        });
    }

    /**
     * Group trial job ids by remote machine. A job is selected when its status
     * is one of the given ones or when it still holds a GPU reservation.
     */
    private getTrialJobIdsGroupByRmMeta(status: TrialJobStatus[]): Map<RemoteMachineMeta, string[]> {
        const map: Map<RemoteMachineMeta, string[]> = new Map<RemoteMachineMeta, string[]>();
        this.trialJobsMap.forEach((trialJob, id) => {
            const rmMeta: RemoteMachineMeta | undefined = trialJob.rmMeta;
            if (rmMeta === undefined) {
                // No machine is attached to this job, so there is nothing to read from
                // (and rmMeta.gpuReservation below would throw)
                return;
            }
            const reservedTrialJobIds : string[] = rmMeta.gpuReservation !== undefined
                ? Array.from(rmMeta.gpuReservation.values())
                : [];
            if (!reservedTrialJobIds.includes(id) && !status.includes(trialJob.status)) {
                return;
            }
            const ids: string[] | undefined = map.get(rmMeta);
            if (ids !== undefined) {
                if (!ids.includes(id)) {
                    ids.push(id);
                }

                return;
            }
            // If the remote machine has jobs reserving GPU, also put those jobs into the list to get metrics data
            const concatJobIds : string[] = [id].concat(reservedTrialJobIds);
            map.set(rmMeta, concatJobIds.filter((item, pos) => concatJobIds.indexOf(item) === pos));
        });

        return map;
    }

    /**
     * Emit one 'metric' event per non-empty metric string of the job
     */
    private sendMetricsToListeners(jobMetrics: JobMetrics): void {
        if (jobMetrics === undefined) {
            return;
        }
        const jobId: string = jobMetrics.jobId;
        jobMetrics.metrics.forEach((metric: string) => {
            if (metric.length > 0) {
                this.metricsEmitter.emit('metric', {
                    id : jobId,
                    data : metric
                });
            }
        });
    }

    /**
     * Run metrics_reader.py on a remote machine for the given trial jobs
     * and parse the JSON it prints.
     * Throws when no SSH client is known for the machine or the command exits with a non-zero code.
     */
    private async readRmMetrics(rmMeta: RemoteMachineMeta, trialJobIds: string[]): Promise<JobMetrics[]> {
        if (trialJobIds === undefined || trialJobIds.length < 1) {
            return [];
        }
        const scriptFile: string = path.join(path.dirname(path.dirname(this.expRootDir)), 'scripts', 'metrics_reader.py');
        const cmdStr: string = `python3 ${scriptFile} --experiment_dir ${this.expRootDir} --trial_job_ids ${trialJobIds.join(',')}`;

        // Every requested job must belong to the machine being queried
        trialJobIds.forEach((id: string) => {
            const trialJob: RemoteMachineTrialJobDetail = this.trialJobsMap.get(id);
            assert(trialJob.rmMeta === rmMeta);
        });
        const sshClient: Client | undefined = this.machineSSHClientMap.get(rmMeta);
        if (sshClient === undefined) {
            throw new Error('SSHClient not found!');
        }

        const result: RemoteCommandResult = await SSHClientUtility.remoteExeCommand(cmdStr, sshClient);
        if (result.exitCode !== 0) {
            throw new Error(`Failed to read metrics data: ${result.stderr}`);
        }
        if (result.stdout !== undefined && result.stdout.length > 0) {
            return <JobMetrics[]>JSON.parse(result.stdout);
        }

        return [];
    }
}
...@@ -108,15 +108,14 @@ export enum ScheduleResultType { ...@@ -108,15 +108,14 @@ export enum ScheduleResultType {
REQUIRE_EXCEED_TOTAL REQUIRE_EXCEED_TOTAL
} }
export const REMOTEMACHINE_RUN_SHELL_FORMAT: string = export const REMOTEMACHINE_TRIAL_COMMAND_FORMAT: string =
`#!/bin/bash `#!/bin/bash
export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_TRIAL_JOB_ID={1} NNI_OUTPUT_DIR={0} export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} export MULTI_PHASE={5}
export MULTI_PHASE={7}
export NNI_TRIAL_SEQ_ID={8}
cd $NNI_SYS_DIR cd $NNI_SYS_DIR
echo $$ >{2} sh install_nni.sh
eval {3}{4} 2>{5} echo $$ >{6}
echo $? \`date +%s%3N\` >{6}`; python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}'
echo $? \`date +%s%3N\` >{10}`;
export const HOST_JOB_SHELL_FORMAT: string = export const HOST_JOB_SHELL_FORMAT: string =
`#!/bin/bash `#!/bin/bash
...@@ -124,3 +123,11 @@ cd {0} ...@@ -124,3 +123,11 @@ cd {0}
echo $$ >{1} echo $$ >{1}
eval {2} >stdout 2>stderr eval {2} >stdout 2>stderr
echo $? \`date +%s%3N\` >{3}`; echo $? \`date +%s%3N\` >{3}`;
/**
 * Shell script template that starts the GPU metrics collector.
 * {0}: value exported as METRIC_OUTPUT_DIR
 * {1}: file that receives the PID of the shell running the script
 * The shebang must be the very first characters of the generated script,
 * so the template starts right after the opening backtick.
 */
export const GPU_COLLECTOR_FORMAT: string =
`#!/bin/bash
export METRIC_OUTPUT_DIR={0}
echo $$ >{1}
python3 -m nni_gpu_tool.gpu_metrics_collector
`;
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict';
import * as component from '../../common/component';
import { Inject } from 'typescript-ioc';
import { RemoteMachineTrainingService } from './remoteMachineTrainingService';
import { ClusterJobRestServer } from '../common/clusterJobRestServer'
/**
 * Rest server of the RemoteMachine training service.
 * Trial metrics handed to it are re-emitted, one by one, on the metrics emitter
 * of RemoteMachineTrainingService.
 */
@component.Singleton
export class RemoteMachineJobRestServer extends ClusterJobRestServer {
    @Inject
    private readonly remoteMachineTrainingService : RemoteMachineTrainingService;

    /**
     * Resolve the training service singleton whose emitter receives the metrics
     */
    constructor() {
        super();
        this.remoteMachineTrainingService = component.get(RemoteMachineTrainingService);
    }

    /**
     * Emit a 'metric' event for each element of the metrics array
     * @param jobId id of the trial job the metrics belong to
     * @param metrics metrics reported for that job
     */
    protected handleTrialMetrics(jobId : string, metrics : any[]) : void {
        // Metrics are emitted individually; the behavior of emitting the
        // whole array as one event is unknown.
        for (let index : number = 0; index < metrics.length; index++) {
            const payload = {
                id : jobId,
                data : metrics[index]
            };
            this.remoteMachineTrainingService.MetricsEmitter.emit('metric', payload);
        }
    }
}
\ No newline at end of file
...@@ -29,30 +29,34 @@ import { Client, ConnectConfig } from 'ssh2'; ...@@ -29,30 +29,34 @@ import { Client, ConnectConfig } from 'ssh2';
import { Deferred } from 'ts-deferred'; import { Deferred } from 'ts-deferred';
import { String } from 'typescript-string-operations'; import { String } from 'typescript-string-operations';
import * as component from '../../common/component'; import * as component from '../../common/component';
import { MethodNotImplementedError, NNIError, NNIErrorNames } from '../../common/errors'; import { NNIError, NNIErrorNames } from '../../common/errors';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo'; import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log'; import { getLogger, Logger } from '../../common/log';
import { ObservableTimer } from '../../common/observableTimer'; import { ObservableTimer } from '../../common/observableTimer';
import { import {
HostJobApplicationForm, HyperParameters, JobApplicationForm, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric HostJobApplicationForm, HyperParameters, JobApplicationForm, TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
} from '../../common/trainingService'; } from '../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString, getJobCancelStatus, getRemoteTmpDir } from '../../common/utils'; import { delay, generateParamFileName, getExperimentRootDir, uniqueString, getJobCancelStatus, getRemoteTmpDir,getIPV4Address } from '../../common/utils';
import { GPUSummary } from '../common/gpuData'; import { GPUSummary } from '../common/gpuData';
import { TrialConfig } from '../common/trialConfig'; import { TrialConfig } from '../common/trialConfig';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { GPUScheduler } from './gpuScheduler'; import { GPUScheduler } from './gpuScheduler';
import { MetricsCollector } from './metricsCollector';
import { import {
HOST_JOB_SHELL_FORMAT, RemoteCommandResult, RemoteMachineMeta, HOST_JOB_SHELL_FORMAT, RemoteCommandResult, RemoteMachineMeta,
REMOTEMACHINE_RUN_SHELL_FORMAT, RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineScheduleInfo, RemoteMachineScheduleResult,
RemoteMachineTrialJobDetail, ScheduleResultType RemoteMachineTrialJobDetail, ScheduleResultType, REMOTEMACHINE_TRIAL_COMMAND_FORMAT,
GPU_COLLECTOR_FORMAT
} from './remoteMachineData'; } from './remoteMachineData';
import { SSHClientUtility } from './sshClientUtility'; import { SSHClientUtility } from './sshClientUtility';
import { validateCodeDir} from '../common/util'; import { validateCodeDir} from '../common/util';
import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { mkDirP } from '../../common/utils';
/** /**
* Training Service implementation for Remote Machine (Linux) * Training Service implementation for Remote Machine (Linux)
*/ */
@component.Singleton
class RemoteMachineTrainingService implements TrainingService { class RemoteMachineTrainingService implements TrainingService {
private machineSSHClientMap: Map<RemoteMachineMeta, Client>; private machineSSHClientMap: Map<RemoteMachineMeta, Client>;
private trialJobsMap: Map<string, RemoteMachineTrialJobDetail>; private trialJobsMap: Map<string, RemoteMachineTrialJobDetail>;
...@@ -67,7 +71,9 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -67,7 +71,9 @@ class RemoteMachineTrainingService implements TrainingService {
private log: Logger; private log: Logger;
private isMultiPhase: boolean = false; private isMultiPhase: boolean = false;
private trialSequenceId: number; private trialSequenceId: number;
private remoteRestServerPort?: number;
private readonly remoteOS: string; private readonly remoteOS: string;
private nniManagerIpConfig?: NNIManagerIpConfig;
constructor(@component.Inject timer: ObservableTimer) { constructor(@component.Inject timer: ObservableTimer) {
this.remoteOS = 'linux'; this.remoteOS = 'linux';
...@@ -88,9 +94,12 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -88,9 +94,12 @@ class RemoteMachineTrainingService implements TrainingService {
* Loop to launch trial jobs and collect trial metrics * Loop to launch trial jobs and collect trial metrics
*/ */
public async run(): Promise<void> { public async run(): Promise<void> {
const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
await restServer.start();
this.log.info('Run remote machine training service.'); this.log.info('Run remote machine training service.');
while (!this.stopping) { while (!this.stopping) {
while (this.jobQueue.length > 0) { while (this.jobQueue.length > 0) {
this.updateGpuReservation();
const trialJobId: string = this.jobQueue[0]; const trialJobId: string = this.jobQueue[0];
const prepareResult : boolean = await this.prepareTrialJob(trialJobId); const prepareResult : boolean = await this.prepareTrialJob(trialJobId);
if (prepareResult) { if (prepareResult) {
...@@ -102,9 +111,6 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -102,9 +111,6 @@ class RemoteMachineTrainingService implements TrainingService {
break; break;
} }
} }
const metricsCollector: MetricsCollector = new MetricsCollector(
this.machineSSHClientMap, this.trialJobsMap, this.remoteExpRootDir, this.metricsEmitter);
await metricsCollector.collectMetrics();
await delay(3000); await delay(3000);
} }
this.log.info('Remote machine training service exit.'); this.log.info('Remote machine training service exit.');
...@@ -225,6 +231,17 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -225,6 +231,17 @@ class RemoteMachineTrainingService implements TrainingService {
return trialJobDetail; return trialJobDetail;
} }
/**
 * Release the GPU reservation of every trial job that is neither waiting nor running
 */
private updateGpuReservation() {
    const activeStatuses: string[] = ['WAITING', 'RUNNING'];
    this.trialJobsMap.forEach((trialJob: RemoteMachineTrialJobDetail) => {
        if (activeStatuses.includes(trialJob.status)) {
            // Waiting and running jobs keep their reservation
            return;
        }
        this.gpuScheduler.removeGpuReservation(trialJob.id, trialJob.rmMeta);
    });
}
/** /**
* Is multiphase job supported in current training service * Is multiphase job supported in current training service
...@@ -284,8 +301,13 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -284,8 +301,13 @@ class RemoteMachineTrainingService implements TrainingService {
*/ */
public async setClusterMetadata(key: string, value: string): Promise<void> { public async setClusterMetadata(key: string, value: string): Promise<void> {
switch (key) { switch (key) {
case TrialConfigMetadataKey.NNI_MANAGER_IP:
this.nniManagerIpConfig = <NNIManagerIpConfig>JSON.parse(value);
break;
case TrialConfigMetadataKey.MACHINE_LIST: case TrialConfigMetadataKey.MACHINE_LIST:
await this.setupConnections(value); await this.setupConnections(value);
//remove local temp files
await cpp.exec(`rm -rf ${this.getLocalGpuMetricCollectorDir()}`);
break; break;
case TrialConfigMetadataKey.TRIAL_CONFIG: case TrialConfigMetadataKey.TRIAL_CONFIG:
const remoteMachineTrailConfig: TrialConfig = <TrialConfig>JSON.parse(value); const remoteMachineTrailConfig: TrialConfig = <TrialConfig>JSON.parse(value);
...@@ -326,12 +348,58 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -326,12 +348,58 @@ class RemoteMachineTrainingService implements TrainingService {
return deferred.promise; return deferred.promise;
} }
public cleanUp(): Promise<void> { /**
* cleanup() has a time out of 10s to clean remote connections
*/
public async cleanUp(): Promise<void> {
this.log.info('Stopping remote machine training service...'); this.log.info('Stopping remote machine training service...');
this.stopping = true; this.stopping = true;
await Promise.race([delay(10000), this.cleanupConnections()]);
}
/**
* stop gpu_metric_collector process in remote machine and remove unused scripts
*/
private async cleanupConnections(): Promise<void> {
try{
for (const [rmMeta, client] of this.machineSSHClientMap.entries()) {
let jobpidPath: string = path.join(this.getRemoteScriptsPath(rmMeta.username), 'pid');
await SSHClientUtility.remoteExeCommand(`pkill -P \`cat ${jobpidPath}\``, client);
await SSHClientUtility.remoteExeCommand(`rm -rf ${this.getRemoteScriptsPath(rmMeta.username)}`, client);
}
}catch (error) {
//ignore error, this function is called to cleanup remote connections when experiment is stopping
this.log.error(`Cleanup connection exception, error is ${error.message}`);
}
return Promise.resolve(); return Promise.resolve();
}
/**
 * Build the path of the local temp directory that holds the generated
 * gpu metric collector script files
 */
private getLocalGpuMetricCollectorDir(): string {
    // The last segment of the home directory is used as the current OS user name
    const currentUser: string = path.basename(os.homedir());

    // The trailing empty segment keeps the final '/' of the path
    return [os.tmpdir(), currentUser, 'nni', 'scripts', ''].join('/');
}
/**
* Generate gpu metric collector shell script in local machine,
* used to run in remote machine, and will be deleted after uploaded from local.
*/
private async generateGpuMetricsCollectorScript(userName: string): Promise<void> {
let gpuMetricCollectorScriptFolder : string = this.getLocalGpuMetricCollectorDir();
await cpp.exec(`mkdir -p ${path.join(gpuMetricCollectorScriptFolder, userName)}`);
//generate gpu_metrics_collector.sh
let gpuMetricsCollectorScriptPath: string = path.join(gpuMetricCollectorScriptFolder, userName, 'gpu_metrics_collector.sh');
const remoteGPUScriptsDir: string = this.getRemoteScriptsPath(userName); // This directory is used to store gpu_metrics and pid created by script
const gpuMetricsCollectorScriptContent: string = String.Format(
GPU_COLLECTOR_FORMAT,
remoteGPUScriptsDir,
path.join(remoteGPUScriptsDir, 'pid'),
);
await fs.promises.writeFile(gpuMetricsCollectorScriptPath, gpuMetricsCollectorScriptContent, { encoding: 'utf8' });
} }
private async setupConnections(machineList: string): Promise<void> { private async setupConnections(machineList: string): Promise<void> {
...@@ -340,6 +408,7 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -340,6 +408,7 @@ class RemoteMachineTrainingService implements TrainingService {
//TO DO: verify if value's format is wrong, and json parse failed, how to handle error //TO DO: verify if value's format is wrong, and json parse failed, how to handle error
const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList); const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList);
let connectedRMNum: number = 0; let connectedRMNum: number = 0;
rmMetaList.forEach((rmMeta: RemoteMachineMeta) => { rmMetaList.forEach((rmMeta: RemoteMachineMeta) => {
const conn: Client = new Client(); const conn: Client = new Client();
let connectConfig: ConnectConfig = { let connectConfig: ConnectConfig = {
...@@ -372,29 +441,30 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -372,29 +441,30 @@ class RemoteMachineTrainingService implements TrainingService {
deferred.reject(new Error(err.message)); deferred.reject(new Error(err.message));
}).connect(connectConfig); }).connect(connectConfig);
}); });
return deferred.promise; return deferred.promise;
} }
private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta, conn: Client): Promise<void> { private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta, conn: Client): Promise<void> {
// Create root working directory after ssh connection is ready // Create root working directory after ssh connection is ready
//TO DO: Should we mk experiments rootDir here? await this.generateGpuMetricsCollectorScript(rmMeta.username); //generate gpu script in local machine first, will copy to remote machine later
const nniRootDir: string = '/tmp/nni'; const nniRootDir: string = `${os.tmpdir()}/nni`;
await SSHClientUtility.remoteExeCommand(`mkdir -p ${this.remoteExpRootDir}`, conn); await SSHClientUtility.remoteExeCommand(`mkdir -p ${this.remoteExpRootDir}`, conn);
// Copy NNI scripts to remote experiment working directory // Copy NNI scripts to remote experiment working directory
const remoteScriptsDir: string = this.getRemoteScriptsPath(); const localGpuScriptCollectorDir: string = this.getLocalGpuMetricCollectorDir();
await SSHClientUtility.remoteExeCommand(`mkdir -p ${remoteScriptsDir}`, conn); const remoteGpuScriptCollectorDir: string = this.getRemoteScriptsPath(rmMeta.username); //the directory to store temp scripts in remote machine
await SSHClientUtility.copyDirectoryToRemote('./scripts', remoteScriptsDir, conn, this.remoteOS); await SSHClientUtility.remoteExeCommand(`mkdir -p ${remoteGpuScriptCollectorDir}`, conn);
await SSHClientUtility.remoteExeCommand(`chmod 777 ${nniRootDir} ${nniRootDir}/* ${nniRootDir}/scripts/*`, conn); await SSHClientUtility.remoteExeCommand(`chmod 777 ${nniRootDir} ${nniRootDir}/* ${nniRootDir}/scripts/*`, conn);
//copy gpu_metrics_collector.sh to remote
await SSHClientUtility.copyFileToRemote(path.join(localGpuScriptCollectorDir, rmMeta.username, 'gpu_metrics_collector.sh'), path.join(remoteGpuScriptCollectorDir, 'gpu_metrics_collector.sh'), conn);
//Begin to execute gpu_metrics_collection scripts //Begin to execute gpu_metrics_collection scripts
SSHClientUtility.remoteExeCommand(`cd ${remoteScriptsDir} && python3 gpu_metrics_collector.py`, conn); SSHClientUtility.remoteExeCommand(`bash ${path.join(remoteGpuScriptCollectorDir, 'gpu_metrics_collector.sh')}`, conn);
this.timer.subscribe( this.timer.subscribe(
async (tick: number) => { async (tick: number) => {
const cmdresult: RemoteCommandResult = await SSHClientUtility.remoteExeCommand( const cmdresult: RemoteCommandResult = await SSHClientUtility.remoteExeCommand(
`tail -n 1 ${path.join(remoteScriptsDir, 'gpu_metrics')}`, conn); `tail -n 1 ${path.join(remoteGpuScriptCollectorDir, 'gpu_metrics')}`, conn);
if (cmdresult && cmdresult.stdout) { if (cmdresult && cmdresult.stdout) {
rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult.stdout); rmMeta.gpuSummary = <GPUSummary>JSON.parse(cmdresult.stdout);
} }
...@@ -412,7 +482,6 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -412,7 +482,6 @@ class RemoteMachineTrainingService implements TrainingService {
if (trialJobDetail === undefined) { if (trialJobDetail === undefined) {
throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`); throw new NNIError(NNIErrorNames.INVALID_JOB_DETAIL, `Invalid job detail information for trial job ${trialJobId}`);
} }
// get an ssh client from scheduler // get an ssh client from scheduler
const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobId); const rmScheduleResult: RemoteMachineScheduleResult = this.gpuScheduler.scheduleMachine(this.trialConfig.gpuNum, trialJobId);
if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) { if (rmScheduleResult.resultType === ScheduleResultType.REQUIRE_EXCEED_TOTAL) {
...@@ -466,39 +535,51 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -466,39 +535,51 @@ class RemoteMachineTrainingService implements TrainingService {
await SSHClientUtility.remoteExeCommand(`mkdir -p ${trialWorkingFolder}`, sshClient); await SSHClientUtility.remoteExeCommand(`mkdir -p ${trialWorkingFolder}`, sshClient);
await SSHClientUtility.remoteExeCommand(`mkdir -p ${path.join(trialWorkingFolder, '.nni')}`, sshClient); await SSHClientUtility.remoteExeCommand(`mkdir -p ${path.join(trialWorkingFolder, '.nni')}`, sshClient);
await SSHClientUtility.remoteExeCommand(`touch ${path.join(trialWorkingFolder, '.nni', 'metrics')}`, sshClient);
// RemoteMachineRunShellFormat is the run shell format string, // RemoteMachineRunShellFormat is the run shell format string,
// See definition in remoteMachineData.ts // See definition in remoteMachineData.ts
const runScriptContent: string = String.Format(
REMOTEMACHINE_RUN_SHELL_FORMAT, let command: string;
// Set CUDA_VISIBLE_DEVICES environment variable based on cuda_visible_device
// If no valid cuda_visible_device is defined, set CUDA_VISIBLE_DEVICES to empty string to hide GPU device
if(typeof cuda_visible_device === 'string' && cuda_visible_device.length > 0) {
command = `CUDA_VISIBLE_DEVICES=${cuda_visible_device} ${this.trialConfig.command}`;
} else {
command = `CUDA_VISIBLE_DEVICES=" " ${this.trialConfig.command}`;
}
const nniManagerIp = this.nniManagerIpConfig?this.nniManagerIpConfig.nniManagerIp:getIPV4Address();
if(!this.remoteRestServerPort) {
const restServer: RemoteMachineJobRestServer = component.get(RemoteMachineJobRestServer);
this.remoteRestServerPort = restServer.clusterRestServerPort;
}
const runScriptTrialContent: string = String.Format(
REMOTEMACHINE_TRIAL_COMMAND_FORMAT,
trialWorkingFolder,
trialWorkingFolder, trialWorkingFolder,
trialJobId, trialJobId,
path.join(trialWorkingFolder, '.nni', 'jobpid'), getExperimentId(),
// Set CUDA_VISIBLE_DEVICES environment variable based on cuda_visible_device trialJobDetail.sequenceId.toString(),
// If no valid cuda_visible_device is defined, set CUDA_VISIBLE_DEVICES to empty string to hide GPU device
(typeof cuda_visible_device === 'string' && cuda_visible_device.length > 0) ?
`CUDA_VISIBLE_DEVICES=${cuda_visible_device} ` : `CUDA_VISIBLE_DEVICES=" " `,
this.trialConfig.command,
path.join(trialWorkingFolder, 'stderr'),
path.join(trialWorkingFolder, '.nni', 'code'),
/** Mark if the trial is multi-phase job */
this.isMultiPhase, this.isMultiPhase,
trialJobDetail.sequenceId.toString() path.join(trialWorkingFolder, '.nni', 'jobpid'),
); command,
nniManagerIp,
this.remoteRestServerPort,
path.join(trialWorkingFolder, '.nni', 'code')
)
//create tmp trial working folder locally. //create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${path.join(trialLocalTempFolder, '.nni')}`); await cpp.exec(`mkdir -p ${path.join(trialLocalTempFolder, '.nni')}`);
//copy trial code from codeDir into the local tmp trial working folder.
await cpp.exec(`cp -r ${this.trialConfig.codeDir}/* ${trialLocalTempFolder}`);
const installScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'install_nni.sh'), installScriptContent, { encoding: 'utf8' });
// Write file content ( run.sh and parameter.cfg ) to local tmp files // Write file content ( run.sh and parameter.cfg ) to local tmp files
await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), runScriptContent, { encoding: 'utf8' }); await fs.promises.writeFile(path.join(trialLocalTempFolder, 'run.sh'), runScriptTrialContent, { encoding: 'utf8' });
// Copy local tmp files to remote machine
await SSHClientUtility.copyFileToRemote(
path.join(trialLocalTempFolder, 'run.sh'), path.join(trialWorkingFolder, 'run.sh'), sshClient);
await this.writeParameterFile(trialJobId, form.hyperParameters, rmScheduleInfo.rmMeta); await this.writeParameterFile(trialJobId, form.hyperParameters, rmScheduleInfo.rmMeta);
// Copy files in codeDir to remote working directory // Copy files in codeDir to remote working directory
await SSHClientUtility.copyDirectoryToRemote(this.trialConfig.codeDir, trialWorkingFolder, sshClient, this.remoteOS); await SSHClientUtility.copyDirectoryToRemote(trialLocalTempFolder, trialWorkingFolder, sshClient, this.remoteOS);
// Execute command in remote machine // Execute command in remote machine
SSHClientUtility.remoteExeCommand(`bash ${path.join(trialWorkingFolder, 'run.sh')}`, sshClient); SSHClientUtility.remoteExeCommand(`bash ${path.join(trialWorkingFolder, 'run.sh')}`, sshClient);
} }
...@@ -546,7 +627,6 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -546,7 +627,6 @@ class RemoteMachineTrainingService implements TrainingService {
const deferred: Deferred<TrialJobDetail> = new Deferred<TrialJobDetail>(); const deferred: Deferred<TrialJobDetail> = new Deferred<TrialJobDetail>();
const jobpidPath: string = this.getJobPidPath(trialJob.id); const jobpidPath: string = this.getJobPidPath(trialJob.id);
const trialReturnCodeFilePath: string = path.join(this.remoteExpRootDir, 'trials', trialJob.id, '.nni', 'code'); const trialReturnCodeFilePath: string = path.join(this.remoteExpRootDir, 'trials', trialJob.id, '.nni', 'code');
try { try {
const killResult: number = (await SSHClientUtility.remoteExeCommand(`kill -0 \`cat ${jobpidPath}\``, sshClient)).exitCode; const killResult: number = (await SSHClientUtility.remoteExeCommand(`kill -0 \`cat ${jobpidPath}\``, sshClient)).exitCode;
// if the process of jobpid is not alive any more // if the process of jobpid is not alive any more
...@@ -576,12 +656,11 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -576,12 +656,11 @@ class RemoteMachineTrainingService implements TrainingService {
deferred.resolve(trialJob); deferred.resolve(trialJob);
} }
} }
return deferred.promise; return deferred.promise;
} }
private getRemoteScriptsPath(): string { private getRemoteScriptsPath(userName: string): string {
return path.join(path.dirname(path.dirname(this.remoteExpRootDir)), 'scripts'); return path.join(getRemoteTmpDir(this.remoteOS), userName, 'nni', 'scripts');
} }
private getHostJobRemoteDir(jobId: string): string { private getHostJobRemoteDir(jobId: string): string {
...@@ -592,6 +671,10 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -592,6 +671,10 @@ class RemoteMachineTrainingService implements TrainingService {
return path.join(getRemoteTmpDir(this.remoteOS), 'nni', 'experiments', getExperimentId()); return path.join(getRemoteTmpDir(this.remoteOS), 'nni', 'experiments', getExperimentId());
} }
public get MetricsEmitter() : EventEmitter {
return this.metricsEmitter;
}
private getJobPidPath(jobId: string): string { private getJobPidPath(jobId: string): string {
const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId); const trialJobDetail: RemoteMachineTrialJobDetail | undefined = this.trialJobsMap.get(jobId);
if (trialJobDetail === undefined) { if (trialJobDetail === undefined) {
...@@ -633,22 +716,6 @@ class RemoteMachineTrainingService implements TrainingService { ...@@ -633,22 +716,6 @@ class RemoteMachineTrainingService implements TrainingService {
return this.trialSequenceId++; return this.trialSequenceId++;
} }
private async writeRemoteTrialFile(trialJobId: string, fileContent: string,
rmMeta: RemoteMachineMeta, fileName: string): Promise<void> {
const sshClient: Client | undefined = this.machineSSHClientMap.get(rmMeta);
if (sshClient === undefined) {
throw new Error('sshClient is undefined.');
}
const trialWorkingFolder: string = path.join(this.remoteExpRootDir, 'trials', trialJobId);
const trialLocalTempFolder: string = path.join(this.expRootDir, 'trials-local', trialJobId);
const localFilepath: string = path.join(trialLocalTempFolder, fileName);
await fs.promises.writeFile(localFilepath, fileContent, { encoding: 'utf8' });
await SSHClientUtility.copyFileToRemote(localFilepath, path.join(trialWorkingFolder, fileName), sshClient);
}
} }
export { RemoteMachineTrainingService }; export { RemoteMachineTrainingService };
...@@ -36,7 +36,7 @@ if not os.path.exists(_outputdir): ...@@ -36,7 +36,7 @@ if not os.path.exists(_outputdir):
os.makedirs(_outputdir) os.makedirs(_outputdir)
_nni_platform = os.environ['NNI_PLATFORM'] _nni_platform = os.environ['NNI_PLATFORM']
if _nni_platform not in ['pai', 'kubeflow', 'frameworkcontroller']: if _nni_platform == 'local':
_log_file_path = os.path.join(_outputdir, 'trial.log') _log_file_path = os.path.join(_outputdir, 'trial.log')
init_logger(_log_file_path) init_logger(_log_file_path)
...@@ -77,7 +77,7 @@ def get_next_parameter(): ...@@ -77,7 +77,7 @@ def get_next_parameter():
return params return params
def send_metric(string): def send_metric(string):
if _nni_platform in ['pai', 'kubeflow', 'frameworkcontroller']: if _nni_platform != 'local':
data = (string).encode('utf8') data = (string).encode('utf8')
assert len(data) < 1000000, 'Metric too long' assert len(data) < 1000000, 'Metric too long'
print('NNISDK_ME%s' % (data)) print('NNISDK_ME%s' % (data))
......
...@@ -57,8 +57,7 @@ class OpenRow extends React.Component<OpenRowProps, {}> { ...@@ -57,8 +57,7 @@ class OpenRow extends React.Component<OpenRowProps, {}> {
</TabPane> </TabPane>
<TabPane tab="Log" key="2"> <TabPane tab="Log" key="2">
{ {
trainingPlatform === 'pai' || trainingPlatform === 'kubeflow' || trainingPlatform !== 'local'
trainingPlatform === 'frameworkcontroller'
? ?
<PaiTrialLog <PaiTrialLog
logStr={logPathRow} logStr={logPathRow}
......
...@@ -106,7 +106,7 @@ def start_rest_server(port, platform, mode, config_file_name, experiment_id=None ...@@ -106,7 +106,7 @@ def start_rest_server(port, platform, mode, config_file_name, experiment_id=None
'You could use \'nnictl create --help\' to get help information' % port) 'You could use \'nnictl create --help\' to get help information' % port)
exit(1) exit(1)
if (platform == 'pai' or platform == 'kubeflow') and detect_port(int(port) + 1): if (platform != 'local') and detect_port(int(port) + 1):
print_error('PAI mode need an additional adjacent port %d, and the port %d is used by another process!\n' \ print_error('PAI mode need an additional adjacent port %d, and the port %d is used by another process!\n' \
'You could set another port to start experiment!\n' \ 'You could set another port to start experiment!\n' \
'You could use \'nnictl create --help\' to get help information' % ((int(port) + 1), (int(port) + 1))) 'You could use \'nnictl create --help\' to get help information' % ((int(port) + 1), (int(port) + 1)))
......
...@@ -25,7 +25,7 @@ import time ...@@ -25,7 +25,7 @@ import time
from xml.dom import minidom from xml.dom import minidom
def check_ready_to_run(): def check_ready_to_run():
pgrep_output =subprocess.check_output('pgrep -fx \'python3 gpu_metrics_collector.py\'', shell=True) pgrep_output =subprocess.check_output('pgrep -fx \'python3 -m nni_gpu_tool.gpu_metrics_collector\'', shell=True)
pidList = [] pidList = []
for pid in pgrep_output.splitlines(): for pid in pgrep_output.splitlines():
pidList.append(int(pid)) pidList.append(int(pid))
...@@ -33,17 +33,18 @@ def check_ready_to_run(): ...@@ -33,17 +33,18 @@ def check_ready_to_run():
return len(pidList) == 0 return len(pidList) == 0
def main(argv): def main(argv):
metrics_output_dir = os.environ['METRIC_OUTPUT_DIR']
if check_ready_to_run() == False: if check_ready_to_run() == False:
# GPU metrics collector is already running. Exit # GPU metrics collector is already running. Exit
exit(2) exit(2)
with open("./gpu_metrics", "w") as outputFile: with open(os.path.join(metrics_output_dir, "gpu_metrics"), "w") as outputFile:
pass pass
os.chmod("./gpu_metrics", 0o777) os.chmod(os.path.join(metrics_output_dir, "gpu_metrics"), 0o777)
cmd = 'nvidia-smi -q -x' cmd = 'nvidia-smi -q -x'
while(True): while(True):
try: try:
smi_output = subprocess.check_output(cmd, shell=True) smi_output = subprocess.check_output(cmd, shell=True)
parse_nvidia_smi_result(smi_output, '.') parse_nvidia_smi_result(smi_output, metrics_output_dir)
except: except:
exception = sys.exc_info() exception = sys.exc_info()
for e in exception: for e in exception:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment