Commit 2b8cf2ee authored by SparkSnail, committed by fishyds

fix paiTrainingService bugs (#137)

* fix nnictl bug

* add hdfs host validation

* fix bugs

* fix dockerfile

* fix install.sh

* update install.sh

* fix dockerfile

* Set timeout for HDFSUtility exists function

* remove unused TODO

* fix sdk

* add optional for outputDir and dataDir

* refactor dockerfile.base

* Remove unused import in hdfsclientUtility
parent 1d174836
@@ -10,4 +10,6 @@ RUN pip3 --no-cache-dir install tensorflow-gpu==1.10.0
 #
 #Keras 2.1.6
 #
-RUN pip3 --no-cache-dir install Keras==2.1.6
\ No newline at end of file
+RUN pip3 --no-cache-dir install Keras==2.1.6
+
+WORKDIR /root
\ No newline at end of file
@@ -22,27 +22,62 @@ FROM nvidia/cuda:9.0-cudnn7-devel-ubuntu16.04
 LABEL maintainer='Microsoft NNI Team<nni@microsoft.com>'
 
-RUN apt-get update && apt-get install -y --no-install-recommends \
-    sudo apt-utils git curl vim unzip openssh-client wget \
-    build-essential cmake \
-    libopenblas-dev
-
-#
-# Python 3.5
-#
-RUN apt-get install -y --no-install-recommends python3.5 python3.5-dev python3-pip python3-tk && \
-    pip3 install --no-cache-dir --upgrade pip setuptools && \
-    echo "alias python='python3'" >> /root/.bash_aliases && \
-    echo "alias pip='pip3'" >> /root/.bash_aliases
+ENV HADOOP_VERSION=2.7.2
+LABEL HADOOP_VERSION=2.7.2
+
+RUN DEBIAN_FRONTEND=noninteractive && \
+    apt-get -y update && \
+    apt-get -y install sudo \
+    apt-utils \
+    git \
+    curl \
+    vim \
+    unzip \
+    wget \
+    build-essential \
+    cmake \
+    libopenblas-dev \
+    automake \
+    openjdk-8-jdk \
+    openssh-client \
+    openssh-server \
+    lsof \
+    python3.5 \
+    python3-dev \
+    python3-pip \
+    python3-tk \
+    libcupti-dev && \
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/*
 
 # numpy 1.14.3 scipy 1.1.0
 RUN pip3 --no-cache-dir install \
     numpy==1.14.3 scipy==1.1.0
 
 #
-#Install node 10.10.0, yarn 1.9.4, NNI v0.1
-#
-RUN git clone -b v0.1 https://github.com/Microsoft/nni.git
-RUN cd nni && sh install.sh
-RUN echo 'PATH=~/.local/node/bin:~/.local/yarn/bin:~/.local/bin:$PATH' >> ~/.bashrc
-RUN cd .. && rm -rf nni
+#Install hadoop
+#
+RUN wget -qO- http://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz | \
+    tar xz -C /usr/local && \
+    mv /usr/local/hadoop-${HADOOP_VERSION} /usr/local/hadoop
+
+#
+#Install NNI
+#
+RUN pip3 install -v --user git+https://github.com/Microsoft/nni.git@v0.2
+
+ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 \
+    HADOOP_INSTALL=/usr/local/hadoop \
+    NVIDIA_VISIBLE_DEVICES=all
+
+ENV HADOOP_PREFIX=${HADOOP_INSTALL} \
+    HADOOP_BIN_DIR=${HADOOP_INSTALL}/bin \
+    HADOOP_SBIN_DIR=${HADOOP_INSTALL}/sbin \
+    HADOOP_HDFS_HOME=${HADOOP_INSTALL} \
+    HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_INSTALL}/lib/native \
+    HADOOP_OPTS="-Djava.library.path=${HADOOP_INSTALL}/lib/native"
+
+ENV PATH=/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/root/.local/bin:/usr/bin:/sbin:/bin:${HADOOP_BIN_DIR}:${HADOOP_SBIN_DIR} \
+    LD_LIBRARY_PATH=/usr/local/nvidia/lib:/usr/local/nvidia/lib64:/usr/local/cuda/lib64:/usr/local/cuda/targets/x86_64-linux/lib/stubs:${JAVA_HOME}/jre/lib/amd64/server
+
+WORKDIR /root
 #!/bin/bash
-make build
 make install-dependencies
+make build
 make dev-install
+make install-examples
+make update-bash-config
 source ~/.bashrc
@@ -131,7 +131,10 @@ export namespace HDFSClientUtility {
         const deferred : Deferred<boolean> = new Deferred<boolean>();
         hdfsClient.exists(hdfsPath, (exist : boolean ) => {
             deferred.resolve(exist);
-        })
+        });
+
+        // Set timeout and reject the promise once reach timeout (5 seconds)
+        setTimeout(() => deferred.reject(`Check HDFS path ${hdfsPath} exists timeout`), 5000);
 
         return deferred.promise;
     }
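Note: the pattern this hunk introduces, racing a callback-based check against a timer, generalizes to a small helper. A minimal TypeScript sketch; the `withTimeout` wrapper is illustrative and not part of NNI:

function withTimeout<T>(promise: Promise<T>, ms: number, message: string): Promise<T> {
    // Reject after `ms` milliseconds unless the wrapped promise settles first.
    const timer: Promise<T> = new Promise<T>((_, reject) =>
        setTimeout(() => reject(new Error(message)), ms));
    return Promise.race([promise, timer]);
}

As in the inline version above, the timer keeps running after the promise settles; calling clearTimeout in a finally block would release it sooner.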
@@ -64,6 +64,8 @@ class PAITrainingService implements TrainingService {
     private experimentId! : string;
     private readonly paiJobCollector : PAIJobInfoCollector;
     private readonly hdfsDirPattern: string;
+    private hdfsBaseDir: string | undefined;
+    private hdfsOutputHost: string | undefined;
 
     constructor() {
         this.log = getLogger();
@@ -131,6 +133,14 @@ class PAITrainingService implements TrainingService {
         if (!this.paiToken) {
             throw new Error('PAI token is not initialized');
         }
+
+        if(!this.hdfsBaseDir){
+            throw new Error('hdfsBaseDir is not initialized');
+        }
+
+        if(!this.hdfsOutputHost){
+            throw new Error('hdfsOutputHost is not initialized');
+        }
 
         this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
@@ -156,26 +166,11 @@ class PAITrainingService implements TrainingService {
         // Step 1. Prepare PAI job configuration
         const paiJobName : string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;
         const hdfsCodeDir : string = path.join(this.expRootDir, trialJobId);
-        const hdfsDirContent = this.paiTrialConfig.outputDir.match(this.hdfsDirPattern);
-        if(hdfsDirContent === null) {
-            throw new Error('Trial outputDir format Error');
-        }
-        const groups = hdfsDirContent.groups;
-        if(groups === undefined) {
-            throw new Error('Trial outputDir format Error');
-        }
-        const hdfsHost = groups['host'];
-        let hdfsBaseDirectory = groups['baseDir'];
-        if(hdfsBaseDirectory === undefined) {
-            hdfsBaseDirectory = "/";
-        }
-        const hdfsOutputDir : string = path.join(hdfsBaseDirectory, this.experimentId, trialJobId);
+        const hdfsOutputDir : string = path.join(this.hdfsBaseDir, this.experimentId, trialJobId);
         const hdfsLogPath : string = String.Format(
             PAI_LOG_PATH_FORMAT,
-            hdfsHost,
+            this.hdfsOutputHost,
             hdfsOutputDir);
 
         const trialJobDetail: PAITrialJobDetail = new PAITrialJobDetail(
@@ -198,7 +193,7 @@ class PAITrainingService implements TrainingService {
             this.paiTrialConfig.command,
             getIPV4Address(),
             hdfsOutputDir,
-            hdfsHost,
+            this.hdfsOutputHost,
             this.paiClusterConfig.userName
         ).replace(/\r\n|\n|\r/gm, '');
@@ -309,7 +304,7 @@ class PAITrainingService implements TrainingService {
         return deferred.promise;
     }
 
-    public setClusterMetadata(key: string, value: string): Promise<void> {
+    public async setClusterMetadata(key: string, value: string): Promise<void> {
         const deferred : Deferred<void> = new Deferred<void>();
         switch (key) {
@@ -336,13 +331,12 @@ class PAITrainingService implements TrainingService {
                 request(authentication_req, (error: Error, response: request.Response, body: any) => {
                     if (error) {
-                        //TODO: should me make the setClusterMetadata's return type to Promise<string>?
                         this.log.error(`Get PAI token failed: ${error.message}`);
-                        deferred.reject();
+                        deferred.reject(new Error(`Get PAI token failed: ${error.message}`));
                     } else {
                         if(response.statusCode !== 200){
                             this.log.error(`Get PAI token failed: get PAI Rest return code ${response.statusCode}`);
-                            deferred.reject();
+                            deferred.reject(new Error(`Get PAI token failed, please check paiConfig username or password`));
                         }
                         this.paiToken = body.token;
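Note: the switch from a bare `deferred.reject()` to `deferred.reject(new Error(...))` matters for callers, since a rejection value of `undefined` gives them nothing to report. A minimal sketch of the difference, not NNI code:

// With a bare reject, the catch block receives `undefined`; rejecting with an
// Error carries a message and stack trace the caller can surface to the user.
async function demo(): Promise<void> {
    try {
        await Promise.reject(new Error('Get PAI token failed: connection refused'));
    } catch (err) {
        console.log(err instanceof Error ? err.message : String(err));
    }
}
demo();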
@@ -353,7 +347,7 @@ class PAITrainingService implements TrainingService {
             case TrialConfigMetadataKey.TRIAL_CONFIG:
                 if (!this.paiClusterConfig){
                     this.log.error('pai cluster config is not initialized');
-                    deferred.reject();
+                    deferred.reject(new Error('pai cluster config is not initialized'));
                     break;
                 }
                 this.paiTrialConfig = <NNIPAITrialConfig>JSON.parse(value);
@@ -364,6 +358,38 @@ class PAITrainingService implements TrainingService {
                         this.paiClusterConfig.host
                     ).replace(/\r\n|\n|\r/gm, '');
                 }
+
+                const hdfsDirContent = this.paiTrialConfig.outputDir.match(this.hdfsDirPattern);
+                if(hdfsDirContent === null) {
+                    throw new Error('Trial outputDir format Error');
+                }
+                const groups = hdfsDirContent.groups;
+                if(groups === undefined) {
+                    throw new Error('Trial outputDir format Error');
+                }
+
+                this.hdfsOutputHost = groups['host'];
+                this.hdfsBaseDir = groups['baseDir'];
+                if(this.hdfsBaseDir === undefined) {
+                    this.hdfsBaseDir = "/";
+                }
+
+                const hdfsClient = WebHDFS.createClient({
+                    user: this.paiClusterConfig.userName,
+                    port: 50070,
+                    host: this.hdfsOutputHost
+                });
+
+                try {
+                    const exist : boolean = await HDFSClientUtility.pathExists("/", hdfsClient);
+                    if(!exist) {
+                        deferred.reject(new Error(`Please check hdfsOutputDir host!`));
+                    }
+                } catch(error) {
+                    deferred.reject(new Error(`HDFS encounters problem, error is ${error}. Please check hdfsOutputDir host!`));
+                }
+
                 deferred.resolve();
                 break;
             default:
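Note: the match above relies on named capture groups `host` and `baseDir`. The actual `hdfsDirPattern` is assigned in the constructor and is not shown in this diff; the following TypeScript sketch uses a hypothetical stand-in, modeled on the hdfs:// regex in the schema change below, to illustrate how the groups are extracted:

// Hypothetical stand-in for this.hdfsDirPattern -- the real pattern lives in
// the PAITrainingService constructor and may differ in detail.
const hdfsDirPattern: RegExp =
    /hdfs:\/\/(?<host>([0-9]{1,3}\.){3}[0-9]{1,3})(:[0-9]{2,5})?(?<baseDir>\/.*)?/;

const match = 'hdfs://10.10.10.10/users/nni'.match(hdfsDirPattern);
if (match !== null && match.groups !== undefined) {
    const host: string = match.groups['host'];               // '10.10.10.10'
    const baseDir: string = match.groups['baseDir'] ?? '/';  // '/users/nni'
    console.log(host, baseDir);
}

The `?? '/'` fallback mirrors the committed behavior of defaulting `hdfsBaseDir` to the HDFS root when the URI carries no path.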
@@ -30,6 +30,8 @@ if not os.path.exists(os.path.join(_sysdir, '.nni')):
     _metric_file = open(os.path.join(_sysdir, '.nni', 'metrics'), 'wb')
 
 _outputdir = os.environ['NNI_OUTPUT_DIR']
+if not os.path.exists(_outputdir):
+    os.makedirs(_outputdir)
 _log_file_path = os.path.join(_outputdir, 'trial.log')
 init_logger(_log_file_path)
@@ -74,8 +74,8 @@ pai_trial_schema = {
         'cpuNum': And(int, lambda x: 0 <= x <= 99999),
         'memoryMB': int,
         'image': str,
-        'dataDir': Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'),
-        'outputDir': Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?')
+        Optional('dataDir'): Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'),
+        Optional('outputDir'): Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?')
     }
 }
@@ -79,6 +79,7 @@ def set_trial_config(experiment_config, port):
     if check_response(response):
         return True
     else:
+        print('Error message is {}'.format(response.text))
         with open(STDERR_FULL_PATH, 'a+') as fout:
             fout.write(json.dumps(json.loads(response.text), indent=4, sort_keys=True, separators=(',', ':')))
         return False
@@ -109,7 +110,7 @@ def set_pai_config(experiment_config, port):
     pai_config_data = dict()
     pai_config_data['pai_config'] = experiment_config['paiConfig']
     response = rest_put(cluster_metadata_url(port), json.dumps(pai_config_data), 20)
-    err_message = ''
+    err_message = None
     if not response or not response.status_code == 200:
         if response is not None:
             err_message = response.text
@@ -172,6 +173,7 @@ def set_experiment(experiment_config, mode, port):
     else:
         with open(STDERR_FULL_PATH, 'a+') as fout:
             fout.write(json.dumps(json.loads(response.text), indent=4, sort_keys=True, separators=(',', ':')))
+        print_error('Setting experiment error, error message is {}'.format(response.text))
         return None
 
 def launch_experiment(args, experiment_config, mode, webuiport, experiment_id=None):
@@ -251,7 +253,8 @@ def launch_experiment(args, experiment_config, mode, webuiport, experiment_id=None):
     if config_result:
         print_normal('Success!')
     else:
-        print_error('Failed! Error is: {}'.format(err_msg))
+        if err_msg:
+            print_error('Failed! Error is: {}'.format(err_msg))
         try:
             cmds = ['pkill', '-P', str(rest_process.pid)]
             call(cmds)