"examples/git@developer.sourcefind.cn:OpenDAS/nni.git" did not exist on "bb8114a4a65145a1489436f98392007e12935ae9"
Unverified Commit 8c4c0ef2 authored by SparkSnail's avatar SparkSnail Committed by GitHub
Browse files

Support webhdfs path in python hdfs client (#722)

trial_keeper uses port 50070 to connect to the webhdfs server, and PAI uses a mapping method to map port 50070 to port 5070 in order to reach the RESTful server. This approach carries some risk, because PAI may not support this kind of mapping in later releases. The webhdfs client of trial_keeper now uses the Pylon path (/webhdfs/api/v1) instead of port 50070; the path is passed in by trainingService.
This PR makes the following changes:

1. Use the webhdfs path instead of port 50070 in the hdfs client.
2. Switch to the new hdfs package "PythonWebHDFS", which I built to support Pylon. You can try the new functionality with the "sparksnail/nni:dev-pai" image when testing the PAI trainingService.
3. Update some variable names according to comments.
parent 6d495c42
...@@ -63,7 +63,7 @@ setuptools.setup( ...@@ -63,7 +63,7 @@ setuptools.setup(
'psutil', 'psutil',
'requests', 'requests',
'astor', 'astor',
'pyhdfs', 'PythonWebHDFS',
'hyperopt', 'hyperopt',
'json_tricks', 'json_tricks',
'numpy', 'numpy',
......
...@@ -63,7 +63,7 @@ setup( ...@@ -63,7 +63,7 @@ setup(
'requests', 'requests',
'scipy', 'scipy',
'schema', 'schema',
'pyhdfs' 'PythonWebHDFS'
], ],
cmdclass={ cmdclass={
......
...@@ -64,7 +64,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string = ...@@ -64,7 +64,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4} `export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4}
&& cd $NNI_SYS_DIR && sh install_nni.sh && cd $NNI_SYS_DIR && sh install_nni.sh
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}' && python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}'
--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}'`; --pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1'`;
export const PAI_OUTPUT_DIR_FORMAT: string = export const PAI_OUTPUT_DIR_FORMAT: string =
`hdfs://{0}:9000/`; `hdfs://{0}:9000/`;
......
...@@ -48,10 +48,25 @@ def main_loop(args): ...@@ -48,10 +48,25 @@ def main_loop(args):
# redirect trial keeper's stdout and stderr to syslog # redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout) trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout)
sys.stdout = sys.stderr = trial_keeper_syslogger sys.stdout = sys.stderr = trial_keeper_syslogger
# backward compatibility
hdfs_host = None
hdfs_output_dir = None
if args.hdfs_host:
hdfs_host = args.hdfs_host
elif args.pai_hdfs_host:
hdfs_host = args.pai_hdfs_host
if args.hdfs_output_dir:
hdfs_output_dir = args.hdfs_output_dir
elif args.pai_hdfs_output_dir:
hdfs_output_dir = args.pai_hdfs_output_dir
if args.pai_hdfs_host is not None and args.nni_hdfs_exp_dir is not None: if hdfs_host is not None and args.nni_hdfs_exp_dir is not None:
try: try:
hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5) if args.webhdfs_path:
hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5)
else:
# backward compatibility
hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
except Exception as e: except Exception as e:
nni_log(LogType.Error, 'Create HDFS client error: ' + str(e)) nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
raise e raise e
...@@ -67,14 +82,14 @@ def main_loop(args): ...@@ -67,14 +82,14 @@ def main_loop(args):
# child worker process exits and all stdout data is read # child worker process exits and all stdout data is read
if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True: if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode)) nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
if args.pai_hdfs_output_dir is not None: if hdfs_output_dir is not None:
# Copy local directory to hdfs for OpenPAI # Copy local directory to hdfs for OpenPAI
nni_local_output_dir = os.environ['NNI_OUTPUT_DIR'] nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
try: try:
if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client): if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client):
nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, args.pai_hdfs_output_dir)) nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
else: else:
nni_log(LogType.Info, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, args.pai_hdfs_output_dir)) nni_log(LogType.Info, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))
except Exception as e: except Exception as e:
nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e)) nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
raise e raise e
...@@ -95,10 +110,13 @@ if __name__ == '__main__': ...@@ -95,10 +110,13 @@ if __name__ == '__main__':
PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process') PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP') PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port') PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of hdfs') PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs') # backward compatibility
PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of hdfs') PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs') # backward compatibility
PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs') PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs') PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
args, unknown = PARSER.parse_known_args() args, unknown = PARSER.parse_known_args()
if args.trial_command is None: if args.trial_command is None:
exit(1) exit(1)
......
...@@ -12,7 +12,7 @@ setuptools.setup( ...@@ -12,7 +12,7 @@ setuptools.setup(
'psutil', 'psutil',
'astor', 'astor',
'schema', 'schema',
'pyhdfs' 'PythonWebHDFS'
], ],
author = 'Microsoft NNI Team', author = 'Microsoft NNI Team',
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment