"...composable_kernel_onnxruntime.git" did not exist on "b53e9d08ed5a9f80d8c13cf8bedd86155cc7c244"
Unverified Commit 9397b6f6 authored by fishyds's avatar fishyds Committed by GitHub
Browse files

[PAITrainingService] Improve uploading codeDir efficiency (#479)

* [PAI training service] codeDir files upload improvement

* Create full local temp folder

* Organize the folder structure for experiment and trial files
parent 2c601151
...@@ -20,12 +20,38 @@ ...@@ -20,12 +20,38 @@
import * as path from 'path'; import * as path from 'path';
import * as fs from 'fs'; import * as fs from 'fs';
import { Deferred } from 'ts-deferred'; import { Deferred } from 'ts-deferred';
import { getExperimentId } from '../../common/experimentStartupInfo';
import { getLogger } from '../../common/log'; import { getLogger } from '../../common/log';
/** /**
* HDFS client utility, including copy file/directory * HDFS client utility, including copy file/directory
*/ */
export namespace HDFSClientUtility { export namespace HDFSClientUtility {
/**
* Get NNI experiment root directory
* @param hdfsUserName HDFS user name
*/
function hdfsExpRootDir(hdfsUserName: string): string {
return path.join('/', hdfsUserName, 'nni', 'experiments', getExperimentId());
}
/**
* Get NNI experiment code directory
* @param hdfsUserName HDFS user name
*/
export function getHdfsExpCodeDir(hdfsUserName: string): string {
return path.join(hdfsExpRootDir(hdfsUserName), 'codeDir');
}
/**
* Get NNI trial working directory
* @param hdfsUserName HDFS user name
* @param trialId NNI trial ID
*/
export function getHdfsTrialWorkDir(hdfsUserName: string, trialId: string): string {
return path.join(hdfsExpRootDir(hdfsUserName), 'trials', trialId);
}
/** /**
* Copy a local file to hdfs directory * Copy a local file to hdfs directory
* *
......
...@@ -63,7 +63,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string = ...@@ -63,7 +63,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} `export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4}
&& cd $NNI_SYS_DIR && sh install_nni.sh && cd $NNI_SYS_DIR && sh install_nni.sh
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}' && python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}'
--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10}`; --pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}'`;
export const PAI_OUTPUT_DIR_FORMAT: string = export const PAI_OUTPUT_DIR_FORMAT: string =
`hdfs://{0}:9000/`; `hdfs://{0}:9000/`;
......
...@@ -30,7 +30,7 @@ import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData'; ...@@ -30,7 +30,7 @@ import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData';
import { Deferred } from 'ts-deferred'; import { Deferred } from 'ts-deferred';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo'; import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { HDFSClientUtility } from './hdfsClientUtility' import { HDFSClientUtility } from './hdfsClientUtility';
import { MethodNotImplementedError } from '../../common/errors'; import { MethodNotImplementedError } from '../../common/errors';
import { getLogger, Logger } from '../../common/log'; import { getLogger, Logger } from '../../common/log';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
...@@ -38,7 +38,7 @@ import { ...@@ -38,7 +38,7 @@ import {
JobApplicationForm, TrainingService, TrialJobApplicationForm, JobApplicationForm, TrainingService, TrialJobApplicationForm,
TrialJobDetail, TrialJobMetric, NNIManagerIpConfig TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
} from '../../common/trainingService'; } from '../../common/trainingService';
import { countFilesRecursively, delay, generateParamFileName, import { delay, generateParamFileName,
getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils'; getExperimentRootDir, getIPV4Address, uniqueString } from '../../common/utils';
import { PAIJobRestServer } from './paiJobRestServer' import { PAIJobRestServer } from './paiJobRestServer'
import { PAITrialJobDetail, PAI_TRIAL_COMMAND_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_LOG_PATH_FORMAT } from './paiData'; import { PAITrialJobDetail, PAI_TRIAL_COMMAND_FORMAT, PAI_OUTPUT_DIR_FORMAT, PAI_LOG_PATH_FORMAT } from './paiData';
...@@ -74,6 +74,7 @@ class PAITrainingService implements TrainingService { ...@@ -74,6 +74,7 @@ class PAITrainingService implements TrainingService {
private nextTrialSequenceId: number; private nextTrialSequenceId: number;
private paiRestServerPort?: number; private paiRestServerPort?: number;
private nniManagerIpConfig?: NNIManagerIpConfig; private nniManagerIpConfig?: NNIManagerIpConfig;
private copyExpCodeDirPromise?: Promise<void>;
constructor() { constructor() {
this.log = getLogger(); this.log = getLogger();
...@@ -145,11 +146,11 @@ class PAITrainingService implements TrainingService { ...@@ -145,11 +146,11 @@ class PAITrainingService implements TrainingService {
throw new Error('PAI token is not initialized'); throw new Error('PAI token is not initialized');
} }
if(!this.hdfsBaseDir){ if(!this.hdfsBaseDir) {
throw new Error('hdfsBaseDir is not initialized'); throw new Error('hdfsBaseDir is not initialized');
} }
if(!this.hdfsOutputHost){ if(!this.hdfsOutputHost) {
throw new Error('hdfsOutputHost is not initialized'); throw new Error('hdfsOutputHost is not initialized');
} }
...@@ -160,6 +161,11 @@ class PAITrainingService implements TrainingService { ...@@ -160,6 +161,11 @@ class PAITrainingService implements TrainingService {
this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`); this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
// Make sure experiment code files is copied from local to HDFS
if(this.copyExpCodeDirPromise) {
await this.copyExpCodeDirPromise;
}
const trialJobId: string = uniqueString(5); const trialJobId: string = uniqueString(5);
const trialSequenceId: number = this.generateSequenceId(); const trialSequenceId: number = this.generateSequenceId();
//TODO: use HDFS working folder instead //TODO: use HDFS working folder instead
...@@ -167,8 +173,7 @@ class PAITrainingService implements TrainingService { ...@@ -167,8 +173,7 @@ class PAITrainingService implements TrainingService {
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId); const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
//create tmp trial working folder locally. //create tmp trial working folder locally.
await cpp.exec(`mkdir -p ${path.dirname(trialLocalTempFolder)}`); await cpp.exec(`mkdir -p ${trialLocalTempFolder}`);
await cpp.exec(`cp -r ${this.paiTrialConfig.codeDir} ${trialLocalTempFolder}`);
const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT; const runScriptContent : string = CONTAINER_INSTALL_NNI_SHELL_FORMAT;
// Write NNI installation file to local tmp files // Write NNI installation file to local tmp files
...@@ -182,8 +187,8 @@ class PAITrainingService implements TrainingService { ...@@ -182,8 +187,8 @@ class PAITrainingService implements TrainingService {
} }
// Step 1. Prepare PAI job configuration // Step 1. Prepare PAI job configuration
const paiJobName : string = `nni_exp_${this.experimentId}_trial_${trialJobId}`; const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;
const hdfsCodeDir : string = path.join(this.expRootDir, trialJobId); const hdfsCodeDir: string = HDFSClientUtility.getHdfsTrialWorkDir(this.paiClusterConfig.userName, trialJobId);
const hdfsOutputDir : string = path.join(this.hdfsBaseDir, this.experimentId, trialJobId); const hdfsOutputDir : string = path.join(this.hdfsBaseDir, this.experimentId, trialJobId);
const hdfsLogPath : string = String.Format( const hdfsLogPath : string = String.Format(
...@@ -215,7 +220,8 @@ class PAITrainingService implements TrainingService { ...@@ -215,7 +220,8 @@ class PAITrainingService implements TrainingService {
this.paiRestServerPort, this.paiRestServerPort,
hdfsOutputDir, hdfsOutputDir,
this.hdfsOutputHost, this.hdfsOutputHost,
this.paiClusterConfig.userName this.paiClusterConfig.userName,
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName)
).replace(/\r\n|\n|\r/gm, ''); ).replace(/\r\n|\n|\r/gm, '');
console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`); console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
...@@ -390,6 +396,7 @@ class PAITrainingService implements TrainingService { ...@@ -390,6 +396,7 @@ class PAITrainingService implements TrainingService {
} }
this.hdfsOutputHost = groups['host']; this.hdfsOutputHost = groups['host'];
//TODO: choose to use /${username} as baseDir
this.hdfsBaseDir = groups['baseDir']; this.hdfsBaseDir = groups['baseDir'];
if(this.hdfsBaseDir === undefined) { if(this.hdfsBaseDir === undefined) {
this.hdfsBaseDir = "/"; this.hdfsBaseDir = "/";
...@@ -415,6 +422,11 @@ class PAITrainingService implements TrainingService { ...@@ -415,6 +422,11 @@ class PAITrainingService implements TrainingService {
deferred.reject(new Error(`HDFS encounters problem, error is ${error}. Please check hdfsOutputDir host!`)); deferred.reject(new Error(`HDFS encounters problem, error is ${error}. Please check hdfsOutputDir host!`));
} }
// Copy experiment files from local folder to HDFS
this.copyExpCodeDirPromise = HDFSClientUtility.copyDirectoryToHdfs(this.paiTrialConfig.codeDir,
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
this.hdfsClient);
deferred.resolve(); deferred.resolve();
break; break;
default: default:
......
...@@ -19,10 +19,55 @@ ...@@ -19,10 +19,55 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os import os
import posixpath
from pyhdfs import HdfsClient from pyhdfs import HdfsClient
from .log_utils import LogType, nni_log
def copyHdfsDirectoryToLocal(hdfsDirectory, localDirectory, hdfsClient):
'''Copy directory from HDFS to local'''
if not os.path.exists(localDirectory):
os.makedirs(localDirectory)
try:
listing = hdfsClient.list_status(hdfsDirectory)
except Exception as exception:
nni_log(LogType.Error, 'List hdfs directory {0} error: {1}'.format(hdfsDirectory, str(exception)))
raise exception
for f in listing:
if f.type == 'DIRECTORY':
subHdfsDirectory = posixpath.join(hdfsDirectory, f.pathSuffix)
subLocalDirectory = os.path.join(localDirectory, f.pathSuffix)
copyHdfsDirectoryToLocal(subHdfsDirectory, subLocalDirectory, hdfsClient)
elif f.type == 'FILE':
hdfsFilePath = posixpath.join(hdfsDirectory, f.pathSuffix)
localFilePath = os.path.join(localDirectory, f.pathSuffix)
copyHdfsFileToLocal(hdfsFilePath, localFilePath, hdfsClient)
else:
raise AssertionError('unexpected type {}'.format(f.type))
def copyHdfsFileToLocal(hdfsFilePath, localFilePath, hdfsClient, override=True):
'''Copy file from HDFS to local'''
if not hdfsClient.exists(hdfsFilePath):
raise Exception('HDFS file {} does not exist!'.format(hdfsFilePath))
try:
file_status = hdfsClient.get_file_status(hdfsFilePath)
if file_status.type != 'FILE':
raise Exception('HDFS file path {} is not a file'.format(hdfsFilePath))
except Exception as exception:
nni_log(LogType.Error, 'Get hdfs file {0} status error: {1}'.format(hdfsFilePath, str(exception)))
raise exception
if os.path.exists(localFilePath) and override:
os.remove(localFilePath)
try:
hdfsClient.copy_to_local(hdfsFilePath, localFilePath)
except Exception as exception:
nni_log(LogType.Error, 'Copy hdfs file {0} to {1} error: {2}'.format(hdfsFilePath, localFilePath, str(exception)))
raise exception
nni_log(LogType.Info, 'Successfully copied hdfs file {0} to {1}, {2} bytes'.format(hdfsFilePath, localFilePath, file_status.length))
def copyDirectoryToHdfs(localDirectory, hdfsDirectory, hdfsClient): def copyDirectoryToHdfs(localDirectory, hdfsDirectory, hdfsClient):
'''Copy directory from local to hdfs''' '''Copy directory from local to HDFS'''
if not os.path.exists(localDirectory): if not os.path.exists(localDirectory):
raise Exception('Local Directory does not exist!') raise Exception('Local Directory does not exist!')
hdfsClient.mkdirs(hdfsDirectory) hdfsClient.mkdirs(hdfsDirectory)
...@@ -34,19 +79,19 @@ def copyDirectoryToHdfs(localDirectory, hdfsDirectory, hdfsClient): ...@@ -34,19 +79,19 @@ def copyDirectoryToHdfs(localDirectory, hdfsDirectory, hdfsClient):
try: try:
result = result and copyDirectoryToHdfs(file_path, hdfs_directory, hdfsClient) result = result and copyDirectoryToHdfs(file_path, hdfs_directory, hdfsClient)
except Exception as exception: except Exception as exception:
print(exception) nni_log(LogType.Error, 'Copy local directory {0} to hdfs directory {1} error: {2}'.format(file_path, hdfs_directory, str(exception)))
result = False result = False
else: else:
hdfs_file_path = os.path.join(hdfsDirectory, file) hdfs_file_path = os.path.join(hdfsDirectory, file)
try: try:
result = result and copyFileToHdfs(file_path, hdfs_file_path, hdfsClient) result = result and copyFileToHdfs(file_path, hdfs_file_path, hdfsClient)
except Exception as exception: except Exception as exception:
print(exception) nni_log(LogType.Error, 'Copy local file {0} to hdfs {1} error: {2}'.format(file_path, hdfs_file_path, str(exception)))
result = False result = False
return result return result
def copyFileToHdfs(localFilePath, hdfsFilePath, hdfsClient, override=True): def copyFileToHdfs(localFilePath, hdfsFilePath, hdfsClient, override=True):
'''Copy a local file to hdfs directory''' '''Copy a local file to HDFS directory'''
if not os.path.exists(localFilePath): if not os.path.exists(localFilePath):
raise Exception('Local file Path does not exist!') raise Exception('Local file Path does not exist!')
if os.path.isdir(localFilePath): if os.path.isdir(localFilePath):
...@@ -60,5 +105,5 @@ def copyFileToHdfs(localFilePath, hdfsFilePath, hdfsClient, override=True): ...@@ -60,5 +105,5 @@ def copyFileToHdfs(localFilePath, hdfsFilePath, hdfsClient, override=True):
hdfsClient.copy_from_local(localFilePath, hdfsFilePath) hdfsClient.copy_from_local(localFilePath, hdfsFilePath)
return True return True
except Exception as exception: except Exception as exception:
print(exception) nni_log(LogType.Error, 'Copy local file {0} to hdfs file {1} error: {2}'.format(localFilePath, hdfsFilePath, str(exception)))
return False return False
\ No newline at end of file
...@@ -28,7 +28,7 @@ import re ...@@ -28,7 +28,7 @@ import re
from pyhdfs import HdfsClient from pyhdfs import HdfsClient
from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .hdfsClientUtility import copyDirectoryToHdfs from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal
from .log_utils import LogType, nni_log from .log_utils import LogType, nni_log
from .metrics_reader import read_experiment_metrics from .metrics_reader import read_experiment_metrics
...@@ -42,6 +42,15 @@ def main_loop(args): ...@@ -42,6 +42,15 @@ def main_loop(args):
stdout_file = open(STDOUT_FULL_PATH, 'a+') stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+') stderr_file = open(STDERR_FULL_PATH, 'a+')
try:
hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
except Exception as e:
nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
raise e
copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)
# Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior # Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior
process = Popen(args.trial_command, shell = True, stdout = stdout_file, stderr = stderr_file) process = Popen(args.trial_command, shell = True, stdout = stdout_file, stderr = stderr_file)
nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command))) nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command)))
...@@ -57,7 +66,6 @@ def main_loop(args): ...@@ -57,7 +66,6 @@ def main_loop(args):
# Copy local directory to hdfs for OpenPAI # Copy local directory to hdfs for OpenPAI
nni_local_output_dir = os.environ['NNI_OUTPUT_DIR'] nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
try: try:
hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client): if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client):
nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, args.pai_hdfs_output_dir)) nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, args.pai_hdfs_output_dir))
else: else:
...@@ -85,6 +93,7 @@ if __name__ == '__main__': ...@@ -85,6 +93,7 @@ if __name__ == '__main__':
PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of hdfs') PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of hdfs')
PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of hdfs') PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of hdfs')
PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs') PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
args, unknown = PARSER.parse_known_args() args, unknown = PARSER.parse_known_args()
if args.trial_command is None: if args.trial_command is None:
exit(1) exit(1)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment