Unverified Commit d3506e34 authored by fishyds's avatar fishyds Committed by GitHub
Browse files

PAI Training Service implementation (#128)

* PAI Training service implementation
**1. Implement PAITrainingService
**2. Add trial-keeper python module, and modify setup.py to install the module
**3. Add PAITrainingService REST server to collect metrics from the PAI container.
parent cc5372e2
......@@ -224,7 +224,7 @@ accepts@~1.3.5:
mime-types "~2.1.18"
negotiator "0.6.1"
ajv@^5.1.0:
ajv@^5.1.0, ajv@^5.3.0:
version "5.5.2"
resolved "https://registry.yarnpkg.com/ajv/-/ajv-5.5.2.tgz#73b5eeca3fab653e3d3f9422b341ad42205dc965"
dependencies:
......@@ -310,6 +310,10 @@ aws4@^1.6.0:
version "1.7.0"
resolved "https://registry.yarnpkg.com/aws4/-/aws4-1.7.0.tgz#d4d0e9b9dbfca77bf08eeb0a8a471550fe39e289"
aws4@^1.8.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/aws4/-/aws4-1.8.0.tgz#f0e003d9ca9e7f59c7a508945d7b2ef9a04a542f"
babel-code-frame@^6.22.0:
version "6.26.0"
resolved "https://registry.yarnpkg.com/babel-code-frame/-/babel-code-frame-6.26.0.tgz#63fd43f7dc1e3bb7ce35947db8fe369a3f58c74b"
......@@ -364,6 +368,10 @@ buffer-from@^1.0.0, buffer-from@^1.1.0:
version "1.1.1"
resolved "https://registry.yarnpkg.com/buffer-from/-/buffer-from-1.1.1.tgz#32713bc028f75c02fdb710d7c7bcec1f2c6070ef"
buffer-stream-reader@^0.1.1:
version "0.1.1"
resolved "https://registry.yarnpkg.com/buffer-stream-reader/-/buffer-stream-reader-0.1.1.tgz#ca8bf93631deedd8b8f8c3bb44991cc30951e259"
builtin-modules@^1.1.1:
version "1.1.1"
resolved "https://registry.yarnpkg.com/builtin-modules/-/builtin-modules-1.1.1.tgz#270f076c5a72c02f5b65a47df94c5fe3a278892f"
......@@ -455,6 +463,12 @@ combined-stream@1.0.6, combined-stream@~1.0.5:
dependencies:
delayed-stream "~1.0.0"
combined-stream@~1.0.6:
version "1.0.7"
resolved "https://registry.yarnpkg.com/combined-stream/-/combined-stream-1.0.7.tgz#2d1d24317afb8abe95d6d2c0b07b57813539d828"
dependencies:
delayed-stream "~1.0.0"
commander@2.15.1:
version "2.15.1"
resolved "https://registry.yarnpkg.com/commander/-/commander-2.15.1.tgz#df46e867d0fc2aec66a34662b406a9ccafff5b0f"
......@@ -635,7 +649,7 @@ extend@2.0.x:
version "2.0.2"
resolved "https://registry.yarnpkg.com/extend/-/extend-2.0.2.tgz#1b74985400171b85554894459c978de6ef453ab7"
extend@~3.0.1:
extend@^3.0.0, extend@~3.0.1, extend@~3.0.2:
version "3.0.2"
resolved "https://registry.yarnpkg.com/extend/-/extend-3.0.2.tgz#f8b1136b4071fbd8eb140aff858b1019ec2915fa"
......@@ -671,7 +685,7 @@ forever-agent@~0.6.1:
version "0.6.1"
resolved "https://registry.yarnpkg.com/forever-agent/-/forever-agent-0.6.1.tgz#fbc71f0c41adeb37f96c577ad1ed42d8fdacca91"
form-data@~2.3.1:
form-data@~2.3.1, form-data@~2.3.2:
version "2.3.2"
resolved "https://registry.yarnpkg.com/form-data/-/form-data-2.3.2.tgz#4970498be604c20c005d4f5c23aecd21d6b49099"
dependencies:
......@@ -763,6 +777,13 @@ har-validator@~5.0.3:
ajv "^5.1.0"
har-schema "^2.0.0"
har-validator@~5.1.0:
version "5.1.0"
resolved "https://registry.yarnpkg.com/har-validator/-/har-validator-5.1.0.tgz#44657f5688a22cfd4b72486e81b3a3fb11742c29"
dependencies:
ajv "^5.3.0"
har-schema "^2.0.0"
has-ansi@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/has-ansi/-/has-ansi-2.0.0.tgz#34f5049ce1ecdf2b0649af3ef24e45ed35416d91"
......@@ -870,6 +891,10 @@ is-typedarray@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/is-typedarray/-/is-typedarray-1.0.0.tgz#e479c80858df0c1b11ddda6940f96011fcda4a9a"
is@~0.2.6:
version "0.2.7"
resolved "http://registry.npmjs.org/is/-/is-0.2.7.tgz#3b34a2c48f359972f35042849193ae7264b63562"
isarray@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/isarray/-/isarray-1.0.0.tgz#bb935d48582cba168c06834957a54a3e07124f11"
......@@ -958,12 +983,22 @@ mime-db@~1.35.0:
version "1.35.0"
resolved "https://registry.yarnpkg.com/mime-db/-/mime-db-1.35.0.tgz#0569d657466491283709663ad379a99b90d9ab47"
mime-db@~1.36.0:
version "1.36.0"
resolved "https://registry.yarnpkg.com/mime-db/-/mime-db-1.36.0.tgz#5020478db3c7fe93aad7bbcc4dcf869c43363397"
mime-types@^2.1.12, mime-types@~2.1.17, mime-types@~2.1.18:
version "2.1.19"
resolved "https://registry.yarnpkg.com/mime-types/-/mime-types-2.1.19.tgz#71e464537a7ef81c15f2db9d97e913fc0ff606f0"
dependencies:
mime-db "~1.35.0"
mime-types@~2.1.19:
version "2.1.20"
resolved "https://registry.yarnpkg.com/mime-types/-/mime-types-2.1.20.tgz#930cb719d571e903738520f8470911548ca2cc19"
dependencies:
mime-db "~1.36.0"
mime@1.4.1:
version "1.4.1"
resolved "https://registry.yarnpkg.com/mime/-/mime-1.4.1.tgz#121f9ebc49e3766f311a76e1fa1c8003c4b03aa6"
......@@ -1066,6 +1101,19 @@ node-version@^1.0.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/node-version/-/node-version-1.2.0.tgz#34fde3ffa8e1149bd323983479dda620e1b5060d"
node.extend@1.0.8:
version "1.0.8"
resolved "https://registry.yarnpkg.com/node.extend/-/node.extend-1.0.8.tgz#bab04379f7383f4587990c9df07b6a7f65db772b"
dependencies:
is "~0.2.6"
object-keys "~0.4.0"
node.flow@1.2.3:
version "1.2.3"
resolved "https://registry.yarnpkg.com/node.flow/-/node.flow-1.2.3.tgz#e1c44a82aeca8d78b458a77fb3dc642f2eba2649"
dependencies:
node.extend "1.0.8"
nopt@^4.0.1:
version "4.0.1"
resolved "https://registry.yarnpkg.com/nopt/-/nopt-4.0.1.tgz#d0d4685afd5415193c8c7505602d0d17cd64474d"
......@@ -1101,10 +1149,18 @@ oauth-sign@~0.8.2:
version "0.8.2"
resolved "https://registry.yarnpkg.com/oauth-sign/-/oauth-sign-0.8.2.tgz#46a6ab7f0aead8deae9ec0565780b7d4efeb9d43"
oauth-sign@~0.9.0:
version "0.9.0"
resolved "https://registry.yarnpkg.com/oauth-sign/-/oauth-sign-0.9.0.tgz#47a7b016baa68b5fa0ecf3dee08a85c679ac6455"
object-assign@^4.0.1, object-assign@^4.1.0:
version "4.1.1"
resolved "https://registry.yarnpkg.com/object-assign/-/object-assign-4.1.1.tgz#2109adc7965887cfc05cbbd442cac8bfbb360863"
object-keys@~0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/object-keys/-/object-keys-0.4.0.tgz#28a6aae7428dd2c3a92f3d95f21335dd204e0336"
on-finished@~2.3.0:
version "2.3.0"
resolved "https://registry.yarnpkg.com/on-finished/-/on-finished-2.3.0.tgz#20f1336481b083cd75337992a16971aa2d906947"
......@@ -1199,6 +1255,10 @@ pseudomap@^1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/pseudomap/-/pseudomap-1.0.2.tgz#f052a28da70e618917ef0a8ac34c1ae5a68286b3"
psl@^1.1.24:
version "1.1.29"
resolved "https://registry.yarnpkg.com/psl/-/psl-1.1.29.tgz#60f580d360170bb722a797cc704411e6da850c67"
punycode@^1.4.1:
version "1.4.1"
resolved "https://registry.yarnpkg.com/punycode/-/punycode-1.4.1.tgz#c0d5a63b2718800ad8e1eb0fa5269c84dd41845e"
......@@ -1207,7 +1267,7 @@ qs@6.5.1:
version "6.5.1"
resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.1.tgz#349cdf6eef89ec45c12d7d5eb3fc0c870343a6d8"
qs@~6.5.1:
qs@~6.5.1, qs@~6.5.2:
version "6.5.2"
resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.2.tgz#cb3ae806e8740444584ef154ce8ee98d403f3e36"
......@@ -1249,6 +1309,31 @@ reflect-metadata@^0.1.10:
version "0.1.12"
resolved "https://registry.yarnpkg.com/reflect-metadata/-/reflect-metadata-0.1.12.tgz#311bf0c6b63cd782f228a81abe146a2bfa9c56f2"
request@^2.74.0:
version "2.88.0"
resolved "https://registry.yarnpkg.com/request/-/request-2.88.0.tgz#9c2fca4f7d35b592efe57c7f0a55e81052124fef"
dependencies:
aws-sign2 "~0.7.0"
aws4 "^1.8.0"
caseless "~0.12.0"
combined-stream "~1.0.6"
extend "~3.0.2"
forever-agent "~0.6.1"
form-data "~2.3.2"
har-validator "~5.1.0"
http-signature "~1.2.0"
is-typedarray "~1.0.0"
isstream "~0.1.2"
json-stringify-safe "~5.0.1"
mime-types "~2.1.19"
oauth-sign "~0.9.0"
performance-now "^2.1.0"
qs "~6.5.2"
safe-buffer "^5.1.2"
tough-cookie "~2.4.3"
tunnel-agent "^0.6.0"
uuid "^3.3.2"
request@^2.87.0:
version "2.87.0"
resolved "https://registry.yarnpkg.com/request/-/request-2.87.0.tgz#32f00235cd08d482b4d0d68db93a829c0ed5756e"
......@@ -1294,6 +1379,12 @@ rimraf@^2.6.1:
dependencies:
glob "^7.0.5"
rmdir@^1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/rmdir/-/rmdir-1.2.0.tgz#4fe0357cb06168c258e73e968093dc4e8a0f3253"
dependencies:
node.flow "1.2.3"
rx@^4.1.0:
version "4.1.0"
resolved "https://registry.yarnpkg.com/rx/-/rx-4.1.0.tgz#a5f13ff79ef3b740fe30aa803fb09f98805d4782"
......@@ -1510,6 +1601,13 @@ tough-cookie@~2.3.3:
dependencies:
punycode "^1.4.1"
tough-cookie@~2.4.3:
version "2.4.3"
resolved "https://registry.yarnpkg.com/tough-cookie/-/tough-cookie-2.4.3.tgz#53f36da3f47783b0925afa06ff9f3b165280f781"
dependencies:
psl "^1.1.24"
punycode "^1.4.1"
tree-kill@^1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/tree-kill/-/tree-kill-1.2.0.tgz#5846786237b4239014f05db156b643212d4c6f36"
......@@ -1612,7 +1710,7 @@ utils-merge@1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/utils-merge/-/utils-merge-1.0.1.tgz#9f95710f50a267947b2ccc124741c1028427e713"
uuid@^3.1.0:
uuid@^3.1.0, uuid@^3.3.2:
version "3.3.2"
resolved "https://registry.yarnpkg.com/uuid/-/uuid-3.3.2.tgz#1b4af4955eb3077c501c23872fc6513811587131"
......@@ -1628,6 +1726,14 @@ verror@1.10.0:
core-util-is "1.0.2"
extsprintf "^1.2.0"
webhdfs@^1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/webhdfs/-/webhdfs-1.2.0.tgz#c41b08ae33944a0220863bfd4b6719b9aaec1d37"
dependencies:
buffer-stream-reader "^0.1.1"
extend "^3.0.0"
request "^2.74.0"
which@^1.2.9:
version "1.3.1"
resolved "https://registry.yarnpkg.com/which/-/which-1.3.1.tgz#a45043d54f5805316da8d62f9f50918d3da70b0a"
......
......@@ -27,7 +27,7 @@ if env_args.platform is None:
from .standalone import *
elif env_args.platform == 'unittest':
from .test import *
elif env_args.platform in ('local', 'remote'):
elif env_args.platform in ('local', 'remote', 'pai'):
from .local import *
else:
raise RuntimeError('Unknown platform %s' % env_args.platform)
......@@ -27,7 +27,7 @@ def read(fname):
setuptools.setup(
name = 'nni',
version = '0.0.1',
version = '0.2.0',
packages = setuptools.find_packages(exclude=['tests']),
python_requires = '>=3.5',
......@@ -44,7 +44,7 @@ setuptools.setup(
author_email = 'nni@microsoft.com',
description = 'Python SDK for Neural Network Intelligence project',
license = 'MIT',
url = 'https://msrasrg.visualstudio.com/NeuralNetworkIntelligence',
url = 'https://github.com/Microsoft/nni',
long_description = read('README.md')
)
......@@ -21,7 +21,7 @@
import os
from schema import Schema, And, Use, Optional, Regex, Or
CONFIG_SCHEMA = Schema({
common_schema = {
'authorName': str,
'experimentName': str,
'trialConcurrency': And(int, lambda n: 1 <=n <= 999999),
......@@ -44,11 +44,6 @@ Optional('searchSpacePath'): os.path.exists,
Optional('classArgs'): dict,
Optional('gpuNum'): And(int, lambda x: 0 <= x <= 99999),
}),
'trial':{
'command': str,
'codeDir': os.path.exists,
'gpuNum': And(int, lambda x: 0 <= x <= 99999)
},
Optional('assessor'): Or({
'builtinAssessorName': lambda x: x in ['Medianstop'],
'classArgs': {
......@@ -58,10 +53,41 @@ Optional('assessor'): Or({
'codeDir': os.path.exists,
'classFileName': str,
'className': str,
'classArgs': {
'optimize_mode': lambda x: x in ['maximize', 'minimize']},
'gpuNum': And(int, lambda x: 0 <= x <= 99999),
Optional('classArgs'): dict,
Optional('gpuNum'): And(int, lambda x: 0 <= x <= 99999),
}),
}
# 'trial' section shared by the local and remote platforms; it is merged into
# LOCAL_CONFIG_SCHEMA and REMOTE_CONFIG_SCHEMA below.
# 'codeDir' is validated with os.path.exists, so the path must exist on the
# machine that validates the config; 'gpuNum' must be an int in [0, 99999].
common_trial_schema = {
'trial':{
'command': str,
'codeDir': os.path.exists,
'gpuNum': And(int, lambda x: 0 <= x <= 99999)
}
}
# HDFS location of the form hdfs://<IPv4 address>[:port][/path].
# The dots between the octets are escaped so they match a literal '.'; an
# unescaped '.' would accept any character there (e.g. 'hdfs://10x0y0z1').
_HDFS_URI_PATTERN = r'hdfs://(([0-9]{1,3}\.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'

# 'trial' section for the pai platform: the common trial keys plus the
# container resources (cpuNum, memoryMB, image) and the HDFS data/output
# locations. Merged into PAI_CONFIG_SCHEMA below.
pai_trial_schema = {
    'trial': {
        'command': str,
        'codeDir': os.path.exists,
        'gpuNum': And(int, lambda x: 0 <= x <= 99999),
        'cpuNum': And(int, lambda x: 0 <= x <= 99999),
        'memoryMB': int,
        'image': str,
        'dataDir': Regex(_HDFS_URI_PATTERN),
        'outputDir': Regex(_HDFS_URI_PATTERN)
    }
}
# 'paiConfig' section for the pai platform; merged into PAI_CONFIG_SCHEMA
# below. All three keys are declared without Optional(), and each value is
# only checked to be a str.
pai_config_schema = {
'paiConfig':{
'userName': str,
'passWord': str,
'host': str
}
}
machine_list_schima = {
Optional('machineList'):[Or({
'ip': str,
'port': And(int, lambda x: 0 < x < 65535),
......@@ -73,37 +99,11 @@ Optional('machineList'):[Or({
'username': str,
'sshKeyPath': os.path.exists,
Optional('passphrase'): str
})],
Optional('pai'):
{
'jobName': str,
"image": str,
"authFile": os.path.exists,
"dataDir": os.path.exists,
"outputDir": os.path.exists,
"codeDir": os.path.exists,
"virtualCluster": str,
"taskRoles": [
{
"name": str,
"taskNumber": And(int, lambda x: 0 <= x <= 99999),
"cpuNumber": And(int, lambda x: 0 <= x <= 99999),
"memoryMB": And(int, lambda x: 0 <= x <= 99999),
"shmMB": And(int, lambda x: 0 <= x <= 99999),
"gpuNumber": And(int, lambda x: 0 <= x <= 99999),
"portList": [
{
"label": str,
"beginAt": str,
"portNumber": And(int, lambda x: 0 < x < 65535)
}
],
"command": str,
"minFailedTaskCount": And(int, lambda x: 0 <= x <= 99999),
"minSucceededTaskCount": And(int, lambda x: 0 <= x <= 99999)
}
],
"gpuType": str,
"retryCount": And(int, lambda x: 0 <= x <= 99999)
})]
}
})
LOCAL_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema})
REMOTE_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema, **machine_list_schima})
PAI_CONFIG_SCHEMA = Schema({**common_schema, **pai_trial_schema, **pai_config_schema})
\ No newline at end of file
......@@ -65,6 +65,16 @@ def set_trial_config(experiment_config, port):
value_dict['command'] = experiment_config['trial']['command']
value_dict['codeDir'] = experiment_config['trial']['codeDir']
value_dict['gpuNum'] = experiment_config['trial']['gpuNum']
if experiment_config['trial'].get('cpuNum'):
value_dict['cpuNum'] = experiment_config['trial']['cpuNum']
if experiment_config['trial'].get('memoryMB'):
value_dict['memoryMB'] = experiment_config['trial']['memoryMB']
if experiment_config['trial'].get('image'):
value_dict['image'] = experiment_config['trial']['image']
if experiment_config['trial'].get('dataDir'):
value_dict['dataDir'] = experiment_config['trial']['dataDir']
if experiment_config['trial'].get('outputDir'):
value_dict['outputDir'] = experiment_config['trial']['outputDir']
request_data['trial_config'] = value_dict
response = rest_put(cluster_metadata_url(port), json.dumps(request_data), 20)
if check_response(response):
......@@ -95,6 +105,20 @@ def set_remote_config(experiment_config, port):
#set trial_config
return set_trial_config(experiment_config, port), err_message
def set_pai_config(experiment_config, port):
    '''Send the paiConfig section to the REST server, then the trial config.

    Returns a (success, error_message) tuple. When the cluster-metadata PUT
    does not come back as HTTP 200, success is False and error_message is the
    response body (or '' when there is no response object at all). Otherwise
    success is whatever set_trial_config returns and error_message is ''.
    '''
    request_body = {'pai_config': experiment_config['paiConfig']}
    response = rest_put(cluster_metadata_url(port), json.dumps(request_body), 20)
    if response and response.status_code == 200:
        # pai_config accepted; the trial_config is pushed as a second step
        return set_trial_config(experiment_config, port), ''
    failure_text = '' if response is None else response.text
    return False, failure_text
def set_experiment(experiment_config, mode, port):
'''Call startExperiment (rest POST /experiment) with yaml file content'''
request_data = dict()
......@@ -114,7 +138,7 @@ def set_experiment(experiment_config, mode, port):
{'key':'codeDir', 'value':experiment_config['trial']['codeDir']})
request_data['clusterMetaData'].append(
{'key': 'command', 'value': experiment_config['trial']['command']})
else:
elif experiment_config['trainingServicePlatform'] == 'remote':
request_data['clusterMetaData'].append(
{'key': 'machine_list', 'value': experiment_config['machineList']})
value_dict = dict()
......@@ -123,6 +147,25 @@ def set_experiment(experiment_config, mode, port):
value_dict['gpuNum'] = experiment_config['trial']['gpuNum']
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': value_dict})
elif experiment_config['trainingServicePlatform'] == 'pai':
request_data['clusterMetaData'].append(
{'key': 'pai_config', 'value': experiment_config['paiConfig']})
value_dict = dict()
value_dict['command'] = experiment_config['trial']['command']
value_dict['codeDir'] = experiment_config['trial']['codeDir']
value_dict['gpuNum'] = experiment_config['trial']['gpuNum']
if experiment_config['trial'].get('cpuNum'):
value_dict['cpuNum'] = experiment_config['trial']['cpuNum']
if experiment_config['trial'].get('memoryMB'):
value_dict['memoryMB'] = experiment_config['trial']['memoryMB']
if experiment_config['trial'].get('image'):
value_dict['image'] = experiment_config['trial']['image']
if experiment_config['trial'].get('dataDir'):
value_dict['dataDir'] = experiment_config['trial']['dataDir']
if experiment_config['trial'].get('outputDir'):
value_dict['outputDir'] = experiment_config['trial']['outputDir']
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': value_dict})
response = rest_post(experiment_url(port), json.dumps(request_data), 20)
if check_response(response):
......@@ -203,6 +246,21 @@ def launch_experiment(args, experiment_config, mode, webuiport, experiment_id=No
except Exception:
raise Exception(ERROR_INFO % 'Rest server stopped!')
exit(0)
#set pai config
if experiment_config['trainingServicePlatform'] == 'pai':
print_normal('Setting pai config...')
config_result, err_msg = set_pai_config(experiment_config, REST_PORT)
if config_result:
print_normal('Success!')
else:
print_error('Failed! Error is: {}'.format(err_msg))
try:
cmds = ['pkill', '-P', str(rest_process.pid)]
call(cmds)
except Exception:
raise Exception(ERROR_INFO % 'Rest server stopped!')
exit(0)
# start a new experiment
print_normal('Starting experiment...')
......@@ -228,9 +286,10 @@ def launch_experiment(args, experiment_config, mode, webuiport, experiment_id=No
else:
print_normal('Starting web ui...')
webui_process = start_web_ui(webuiport)
nni_config.set_config('webuiPid', webui_process.pid)
print_normal('Starting web ui success!')
print_normal('{0} {1}'.format('Web UI url:', ' '.join(nni_config.get_config('webuiUrl'))))
if webui_process:
nni_config.set_config('webuiPid', webui_process.pid)
print_normal('Starting web ui success!')
print_normal('{0} {1}'.format('Web UI url:', ' '.join(nni_config.get_config('webuiUrl'))))
print_normal(EXPERIMENT_SUCCESS_INFO % (experiment_id, REST_PORT))
......
......@@ -20,8 +20,8 @@
import os
import json
from .config_schema import CONFIG_SCHEMA
from .common_utils import get_json_content
from .config_schema import LOCAL_CONFIG_SCHEMA, REMOTE_CONFIG_SCHEMA, PAI_CONFIG_SCHEMA
from .common_utils import get_json_content, print_error
def expand_path(experiment_config, key):
'''Change '~' to user home directory'''
......@@ -81,8 +81,17 @@ def validate_search_space_content(experiment_config):
def validate_common_content(experiment_config):
'''Validate whether the common values in experiment_config is valid'''
if not experiment_config.get('trainingServicePlatform') or \
experiment_config.get('trainingServicePlatform') not in ['local', 'remote', 'pai']:
print_error('Please set correct trainingServicePlatform!')
exit(0)
schema_dict = {
'local': LOCAL_CONFIG_SCHEMA,
'remote': REMOTE_CONFIG_SCHEMA,
'pai': PAI_CONFIG_SCHEMA
}
try:
CONFIG_SCHEMA.validate(experiment_config)
schema_dict.get(experiment_config['trainingServicePlatform']).validate(experiment_config)
#set default value
if experiment_config.get('maxExecDuration') is None:
experiment_config['maxExecDuration'] = '999d'
......
......@@ -2,8 +2,8 @@ import setuptools
setuptools.setup(
name = 'nnictl',
version = '0.0.1',
packages = setuptools.find_packages(),
version = '0.2.0',
packages = setuptools.find_packages(exclude=['*test*']),
python_requires = '>=3.5',
install_requires = [
......@@ -11,12 +11,13 @@ setuptools.setup(
'pyyaml',
'psutil',
'astor',
'schema'
'schema',
'pyhdfs'
],
author = 'Microsoft NNI Team',
author_email = 'nni@microsoft.com',
description = 'NNI control for Neural Network Intelligence project',
license = 'MIT',
url = 'https://msrasrg.visualstudio.com/NeuralNetworkIntelligence',
url = 'https://github.com/Microsoft/nni',
)
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
# REST endpoint layout used when reporting metrics back to the NNI manager.
API_ROOT_URL = '/api/v1/nni-pai'
# Formatted with the host, e.g. BASE_URL.format('10.0.0.1').
BASE_URL = 'http://{}'
DEFAULT_REST_PORT = 51189

# expanduser('~') resolves to $HOME when it is set and falls back to the
# platform's user database otherwise, so importing this module no longer
# raises KeyError in environments that do not export HOME.
HOME_DIR = os.path.join(os.path.expanduser('~'), 'nni')

# Trial-keeper log locations, all under HOME_DIR.
LOG_DIR = os.path.join(HOME_DIR, 'trial-keeper', 'log')
STDOUT_FULL_PATH = os.path.join(LOG_DIR, 'stdout')
STDERR_FULL_PATH = os.path.join(LOG_DIR, 'stderr')

UPDATE_METRICS_API = '/update-metrics'
\ No newline at end of file
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import os
from pyhdfs import HdfsClient
def copyDirectoryToHdfs(localDirectory, hdfsDirectory, hdfsClient):
    '''Recursively copy a local directory tree into an HDFS directory.

    localDirectory: existing local directory to copy from.
    hdfsDirectory: destination directory on HDFS; created via hdfsClient.mkdirs.
    hdfsClient: client object providing the HDFS operations used here and in
        copyFileToHdfs.

    Returns True only when every entry was copied successfully. A failing
    entry is reported on stdout and does not stop the remaining entries from
    being copied.

    Raises Exception when localDirectory does not exist.
    '''
    # HDFS paths always use '/', whatever the local platform separator is.
    import posixpath

    if not os.path.exists(localDirectory):
        raise Exception('Local Directory does not exist!')
    hdfsClient.mkdirs(hdfsDirectory)
    result = True
    for entry in os.listdir(localDirectory):
        local_path = os.path.join(localDirectory, entry)
        hdfs_path = posixpath.join(hdfsDirectory, entry)
        copy = copyDirectoryToHdfs if os.path.isdir(local_path) else copyFileToHdfs
        try:
            copied = copy(local_path, hdfs_path, hdfsClient)
        except Exception as exception:
            print(exception)
            copied = False
        # Evaluate the copy first, then fold it into the overall result, so an
        # earlier failure never short-circuits the later entries.
        result = result and copied
    return result
def copyFileToHdfs(localFilePath, hdfsFilePath, hdfsClient, override=True):
    '''Copy one local file to an HDFS path.

    localFilePath: existing local regular file.
    hdfsFilePath: destination path on HDFS.
    hdfsClient: client object providing exists, delete and copy_from_local.
    override: when the destination already exists, delete it first (True) or
        leave it untouched and report failure (False).

    Returns True when the file was copied, False when the destination exists
    and override is False, or when copy_from_local raised (the error is
    printed).

    Raises FileNotFoundError when localFilePath does not exist and
    IsADirectoryError when it is a directory; both are Exception subclasses,
    so callers catching Exception are unaffected.
    '''
    if not os.path.exists(localFilePath):
        raise FileNotFoundError('Local file Path does not exist!')
    if os.path.isdir(localFilePath):
        raise IsADirectoryError('localFile should not be a directory!')
    if hdfsClient.exists(hdfsFilePath):
        if not override:
            return False
        hdfsClient.delete(hdfsFilePath)
    try:
        hdfsClient.copy_from_local(localFilePath, hdfsFilePath)
    except Exception as exception:
        print(exception)
        return False
    return True
\ No newline at end of file
# ============================================================================================================================== #
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ============================================================================================================================== #
import argparse
import errno
import json
import os
import re
import requests
from .constants import BASE_URL, DEFAULT_REST_PORT
from .rest_utils import rest_get, rest_post, rest_put, rest_delete
from .url_utils import gen_update_metrics_url
# Trial context read from the environment at import time; a missing variable
# raises KeyError as soon as this module is imported.
# NNI_SYS_DIR is the directory whose '.nni' sub-directory holds the metrics
# file and the offset file used by TrialMetricsReader.
NNI_SYS_DIR = os.environ['NNI_SYS_DIR']
NNI_TRIAL_JOB_ID = os.environ['NNI_TRIAL_JOB_ID']
NNI_EXP_ID = os.environ['NNI_EXP_ID']
# Number of characters in the length field that follows each record prefix.
LEN_FIELD_SIZE = 6
# Record prefix; the reader consumes len(MAGIC) characters before each length field.
MAGIC = 'ME'
# Import-time side effect: echoes the resolved directory to stdout.
print('In metrics_reader, NNI_SYS_DIR is {}'.format(NNI_SYS_DIR))
class TrialMetricsReader():
    '''
    Incremental reader for the metrics file written by a trial job.

    Records are laid out back to back as: a prefix of len(MAGIC) characters,
    a LEN_FIELD_SIZE-character decimal length, then that many characters of
    payload. The position reached so far is persisted in a side file so each
    call only returns records that were not returned before.
    '''

    def __init__(self, rest_port = DEFAULT_REST_PORT):
        nni_dir = os.path.join(NNI_SYS_DIR, '.nni')
        self.offset_filename = os.path.join(nni_dir, 'metrics_offset')
        self.metrics_filename = os.path.join(nni_dir, 'metrics')
        self.rest_port = rest_port

    def _metrics_file_is_empty(self):
        '''Return True when the metrics file is missing or has zero size.'''
        path = self.metrics_filename
        return not os.path.isfile(path) or os.stat(path).st_size == 0

    def _get_offset(self):
        '''Return the persisted read offset, or 0 when no offset file exists.'''
        if not os.path.isfile(self.offset_filename):
            return 0
        with open(self.offset_filename, 'r') as offset_file:
            return int(offset_file.readline())

    def _write_offset(self, offset):
        '''Persist offset; it must lie within [0, size of the metrics file].'''
        metrics_size = os.stat(self.metrics_filename).st_size
        if not 0 <= offset <= metrics_size:
            raise ValueError('offset value is invalid: {}'.format(offset))
        with open(self.offset_filename, 'w') as offset_file:
            offset_file.write(str(offset) + '\n')

    def _read_all_available_records(self, offset):
        '''
        Return the payloads of all records stored from offset onwards and
        persist the position after the last one.

        Raises ValueError when a record is truncated or has a non-positive
        length; in that case the persisted offset is left unchanged.
        '''
        format_error = "metric file {} format error after offset: {}."
        records = []
        consumed_upto = offset
        with open(self.metrics_filename, 'r') as metrics_file:
            print('offset is {}'.format(offset))
            metrics_file.seek(offset)
            # An empty read of the prefix means EOF. The prefix itself is only
            # skipped over, it is not compared against MAGIC.
            for _prefix in iter(lambda: metrics_file.read(len(MAGIC)), ''):
                length_field = metrics_file.read(LEN_FIELD_SIZE)
                if not length_field:
                    # EOF right after a prefix: the record is incomplete
                    raise ValueError(format_error.format(self.metrics_filename, consumed_upto))
                expected = int(length_field)
                payload = metrics_file.read(expected)
                if expected <= 0 or len(payload) != expected:
                    raise ValueError(format_error.format(self.metrics_filename, consumed_upto))
                print('data is \'{}\''.format(payload))
                consumed_upto = metrics_file.tell()
                records.append(payload)
        self._write_offset(consumed_upto)
        return records

    def read_trial_metrics(self):
        '''
        Read available metrics data for a trial
        '''
        if self._metrics_file_is_empty():
            print('metrics is empty')
            return []
        return self._read_all_available_records(self._get_offset())
def read_experiment_metrics(nnimanager_ip):
    '''
    Read the new metrics of the current trial job and post them to the NNI
    manager at nnimanager_ip.

    Returns the JSON text of {'jobId': ..., 'metrics': [...]}; the dict may be
    partially filled (or empty) when reading failed. This function is best
    effort: failures are printed and never propagate to the caller.
    '''
    result = {}
    try:
        reader = TrialMetricsReader()
        result['jobId'] = NNI_TRIAL_JOB_ID
        result['metrics'] = reader.read_trial_metrics()
        print('Result metrics is {}'.format(json.dumps(result)))
        if result['metrics']:
            url = gen_update_metrics_url(BASE_URL.format(nnimanager_ip), DEFAULT_REST_PORT, NNI_EXP_ID, NNI_TRIAL_JOB_ID)
            response = rest_post(url, json.dumps(result), 10)
            # rest_post may hand back None instead of a response when the
            # request itself failed; report that instead of tripping over
            # response.status_code.
            if response is None:
                print('Failed to post metrics to {}'.format(url))
            else:
                print('Response code is {}'.format(response.status_code))
    except Exception as exception:
        #TODO error logging to file
        # Still best effort, but no longer silent about what went wrong.
        print('Failed to read or report trial metrics: {}'.format(exception))
    return json.dumps(result)
\ No newline at end of file
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import time
import requests
def rest_get(url, timeout):
    '''Send a GET request; return the response, or None if the call raised.'''
    try:
        return requests.get(url, timeout=timeout)
    except Exception:
        return None
def rest_post(url, data, timeout):
    '''Send a JSON POST request; return the response, or None if the call raised.'''
    json_headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}
    try:
        return requests.post(url, headers=json_headers, data=data, timeout=timeout)
    except Exception:
        return None
def rest_put(url, data, timeout):
    '''Send a JSON PUT request; return the response, or None if the call raised.'''
    json_headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}
    try:
        return requests.put(url, headers=json_headers, data=data, timeout=timeout)
    except Exception:
        return None
def rest_delete(url, timeout):
    '''Send a DELETE request; return the response, or None if the call raised.'''
    try:
        return requests.delete(url, timeout=timeout)
    except Exception:
        return None
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import unittest
import json
import sys
from pyhdfs import HdfsClient
sys.path.append("..")
from trial.hdfsClientUtility import copyFileToHdfs, copyDirectoryToHdfs
import os
import shutil
import random
import string
class HDFSClientUtilityTest(unittest.TestCase):
    '''Unit test for hdfsClientUtility.py

    The tests talk to an HDFS cluster through pyhdfs. Connection settings are
    read from the JSON file at ``../../.vscode/hdfsInfo.json``, which must
    provide the keys ``host`` and ``userName``.
    '''

    def setUp(self):
        '''Load the HDFS settings and create the client used by each test.

        The test is skipped when the settings file cannot be read or parsed,
        instead of failing later with an unrelated error on a missing config.
        '''
        self.hdfs_file_path = '../../.vscode/hdfsInfo.json'
        self.hdfs_config = None
        try:
            with open(self.hdfs_file_path, 'r') as file:
                self.hdfs_config = json.load(file)
        except (OSError, ValueError) as exception:
            # OSError: file missing/unreadable; ValueError: malformed JSON
            print(exception)
            self.skipTest('cannot load HDFS config from {}'.format(self.hdfs_file_path))

        self.hdfs_client = HdfsClient(hosts='{0}:{1}'.format(self.hdfs_config['host'], '50070'), user_name=self.hdfs_config['userName'])

    def get_random_name(self, length):
        '''Return a random string of `length` ASCII letters and digits.

        Characters are drawn with replacement, so any non-negative length works.
        '''
        alphabet = string.ascii_letters + string.digits
        return ''.join(random.choice(alphabet) for _ in range(length))

    def test_copy_file_run(self):
        '''test copyFileToHdfs'''
        user_name = self.hdfs_config['userName']
        file_name = self.get_random_name(8)
        file_content = 'hello world!'
        local_path = './{}'.format(file_name)
        hdfs_path = '/{0}/{1}'.format(user_name, file_name)
        download_path = './{}'.format(self.get_random_name(8))

        with open(local_path, 'w') as file:
            file.write(file_content)

        try:
            result = copyFileToHdfs(local_path, hdfs_path, self.hdfs_client)
            self.assertTrue(result)

            file_list = self.hdfs_client.listdir('/{0}'.format(user_name))
            self.assertIn(file_name, file_list)

            # Round trip: download the uploaded file and compare its content
            self.hdfs_client.copy_to_local(hdfs_path, download_path)
            self.assertTrue(os.path.exists(download_path))
            with open(download_path, 'r') as file:
                content = file.readline()
            self.assertEqual(file_content, content)
        finally:
            # clean up, even when an assertion above failed
            for path in (local_path, download_path):
                if os.path.exists(path):
                    os.remove(path)
            self.hdfs_client.delete(hdfs_path)

    def test_copy_directory_run(self):
        '''test copyDirectoryToHdfs'''
        user_name = self.hdfs_config['userName']
        directory_name = self.get_random_name(8)
        file_name_list = [self.get_random_name(8), self.get_random_name(8)]
        file_content = 'hello world!'
        local_dir = './{}'.format(directory_name)
        hdfs_dir = '/{0}/{1}'.format(user_name, directory_name)

        os.makedirs(local_dir)
        try:
            for file_name in file_name_list:
                with open('{0}/{1}'.format(local_dir, file_name), 'w') as file:
                    file.write(file_content)

            result = copyDirectoryToHdfs(local_dir, hdfs_dir, self.hdfs_client)
            self.assertTrue(result)

            directory_list = self.hdfs_client.listdir('/{0}'.format(user_name))
            self.assertIn(directory_name, directory_list)

            sub_file_list = self.hdfs_client.listdir(hdfs_dir)
            for file_name in file_name_list:
                self.assertIn(file_name, sub_file_list)
        finally:
            # clean up, even when an assertion above failed
            for file_name in file_name_list:
                self.hdfs_client.delete('{0}/{1}'.format(hdfs_dir, file_name))
            self.hdfs_client.delete(hdfs_dir)
            shutil.rmtree(local_dir)
# Run the test cases above when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
\ No newline at end of file
# ============================================================================================================================== #
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# ============================================================================================================================== #
import argparse
import sys
import os
from subprocess import Popen, PIPE
import time
import logging
import shlex
import re
from pyhdfs import HdfsClient
from .hdfsClientUtility import copyDirectoryToHdfs
from .constants import HOME_DIR, LOG_DIR, STDOUT_FULL_PATH, STDERR_FULL_PATH
from .metrics_reader import read_experiment_metrics
logger = logging.getLogger('trial_keeper')
def _copy_output_to_hdfs(args):
    '''Copy the local NNI output directory to HDFS and print the outcome.

    The local directory is taken from the NNI_OUTPUT_DIR environment variable
    (KeyError is raised if it is not set). The HDFS client is built from
    args.pai_hdfs_host and args.pai_user_name, and the destination is
    args.pai_hdfs_output_dir. Exceptions raised by the copy itself are
    printed rather than propagated.
    '''
    nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
    hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name)
    print(nni_local_output_dir, args.pai_hdfs_output_dir)
    try:
        if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client):
            print('copy directory success!')
        else:
            print('copy directory failed!')
    except Exception as exception:
        print(exception)


def main_loop(args):
    '''main loop logic for trial keeper

    Launches args.trial_command as a shell subprocess whose stdout/stderr are
    appended to STDOUT_FULL_PATH/STDERR_FULL_PATH, then polls it every two
    seconds. Experiment metrics are read on each iteration, and once the
    subprocess has terminated the NNI output directory is copied to HDFS.
    '''
    os.makedirs(LOG_DIR, exist_ok=True)

    # The context managers close both log files when the loop ends or an error is raised
    with open(STDOUT_FULL_PATH, 'a+') as stdout_file, open(STDERR_FULL_PATH, 'a+') as stderr_file:
        print(shlex.split(args.trial_command))

        # Notice: We don't appoint env, which means subprocess will inherit current environment and that is expected behavior
        # NOTE(review): the command is executed through the shell (shell=True), so it must come from a trusted source
        process = Popen(args.trial_command, shell=True, stdout=stdout_file, stderr=stderr_file)
        print('Subprocess pid is {}'.format(process.pid))
        print('Current cwd is {}'.format(os.getcwd()))

        while True:
            retCode = process.poll()
            # Read experiment metrics on every iteration, including the one
            # that observes the exit, to avoid missing metrics
            read_experiment_metrics(args.nnimanager_ip)

            if retCode is not None:
                print('subprocess terminated. Exit code is {}. Quit'.format(retCode))
                # copy local directory to hdfs
                _copy_output_to_hdfs(args)
                break

            print('subprocess pid: {} is still alive'.format(process.pid))
            time.sleep(2)
def trial_keeper_help_info(*args):
    '''Print a hint telling the user to run --help; any arguments are ignored.'''
    hint = 'please run --help to see guidance'
    print(hint)
if __name__ == '__main__':
    # NNI Trial Keeper main function
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager IP')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')

    # Unknown arguments are tolerated and ignored
    args, unknown = PARSER.parse_known_args()
    if args.trial_command is None:
        # Nothing to launch: print the hint registered as the default func
        # instead of exiting without any message
        args.func(args)
        sys.exit(1)

    try:
        main_loop(args)
    except KeyboardInterrupt:
        print('Exiting by user request')
        sys.exit(1)
    except Exception:
        # Report real failures with their traceback rather than labelling
        # every error as a user request
        logger.exception('Trial keeper exited with an error')
        sys.exit(1)
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge,
# to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from .constants import API_ROOT_URL, UPDATE_METRICS_API
def gen_update_metrics_url(base_url, port, exp_id, trial_job_id):
    '''Build the URL used to update the metrics of one trial job.

    The result is base_url and port joined by ':', followed by API_ROOT_URL,
    UPDATE_METRICS_API, '/' + exp_id and '/:' + trial_job_id.
    '''
    # NOTE(review): the ':' in front of the trial job id is part of the
    # emitted URL; presumably the receiving server expects it - confirm.
    url_template = '{0}:{1}{2}{3}/{4}/:{5}'
    return url_template.format(
        base_url,
        port,
        API_ROOT_URL,
        UPDATE_METRICS_API,
        exp_id,
        trial_job_id,
    )
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment