Unverified commit 817ec68b authored by liuzhe-lz, committed by GitHub

Add native support for v2 config (#3466)

parent 6aaca5f7
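As context for the diff below: this commit teaches nnictl and the NNI manager to consume the v2 experiment config schema directly. A minimal, hypothetical sketch of the two shapes this commit bridges (field names taken from the conversion code in this commit; values invented):

# v1: platform at top level, trial settings nested under 'trial'
v1_config = {
    'authorName': 'N/A',                      # dropped entirely in v2
    'trainingServicePlatform': 'local',
    'maxExecDuration': '1h',
    'maxTrialNum': 10,
    'trial': {'command': 'python3 train.py', 'codeDir': '.', 'gpuNum': 0},
}

# v2: trial settings at top level, training service nested
v2_config = {
    'trialCommand': 'python3 train.py',
    'trialCodeDirectory': '.',
    'trialGpuNumber': 0,
    'maxExperimentDuration': '3600s',
    'maxTrialNumber': 10,
    'trainingService': {'platform': 'local'},
}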
......@@ -7,7 +7,7 @@ import logging
import json
import base64
from .runtime.common import enable_multi_thread, enable_multi_phase
from .runtime.common import enable_multi_thread
from .runtime.msg_dispatcher import MsgDispatcher
from .tools.package_utils import create_builtin_class_instance, create_customized_class_instance
......@@ -29,10 +29,8 @@ def main():
exp_params = json.loads(exp_params_decode)
logger.debug('exp_params json obj: [%s]', json.dumps(exp_params, indent=4))
if exp_params.get('multiThread'):
if exp_params.get('deprecated', {}).get('multiThread'):
enable_multi_thread()
if exp_params.get('multiPhase'):
enable_multi_phase()
if exp_params.get('advisor') is not None:
# advisor is enabled and starts to run
......@@ -61,10 +59,10 @@ def main():
def _run_advisor(exp_params):
if exp_params.get('advisor').get('builtinAdvisorName'):
if exp_params.get('advisor').get('name'):
dispatcher = create_builtin_class_instance(
exp_params.get('advisor').get('builtinAdvisorName'),
exp_params.get('advisor').get('classArgs'),
exp_params['advisor']['name'],
exp_params['advisor'].get('classArgs'),
'advisors')
else:
dispatcher = create_customized_class_instance(exp_params.get('advisor'))
......@@ -78,26 +76,26 @@ def _run_advisor(exp_params):
def _create_tuner(exp_params):
if exp_params.get('tuner').get('builtinTunerName'):
if exp_params['tuner'].get('name'):
tuner = create_builtin_class_instance(
exp_params.get('tuner').get('builtinTunerName'),
exp_params.get('tuner').get('classArgs'),
exp_params['tuner']['name'],
exp_params['tuner'].get('classArgs'),
'tuners')
else:
tuner = create_customized_class_instance(exp_params.get('tuner'))
tuner = create_customized_class_instance(exp_params['tuner'])
if tuner is None:
raise AssertionError('Failed to create Tuner instance')
return tuner
def _create_assessor(exp_params):
if exp_params.get('assessor').get('builtinAssessorName'):
if exp_params['assessor'].get('name'):
assessor = create_builtin_class_instance(
exp_params.get('assessor').get('builtinAssessorName'),
exp_params.get('assessor').get('classArgs'),
exp_params['assessor']['name'],
exp_params['assessor'].get('classArgs'),
'assessors')
else:
assessor = create_customized_class_instance(exp_params.get('assessor'))
assessor = create_customized_class_instance(exp_params['assessor'])
if assessor is None:
raise AssertionError('Failed to create Assessor instance')
return assessor
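For reference, a hypothetical v2-style exp_params payload as the rewritten dispatcher entry points above expect it (key names from the code above; values invented):

exp_params = {
    'tuner': {'name': 'TPE', 'classArgs': {'optimize_mode': 'maximize'}},
    'deprecated': {'multiThread': False},  # the multi-thread flag now lives under 'deprecated'
}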
......
......@@ -9,3 +9,4 @@ from .aml import *
from .kubeflow import *
from .frameworkcontroller import *
from .adl import *
from .shared_storage import *
......@@ -101,6 +101,8 @@ class ConfigBase:
elif isinstance(value, ConfigBase):
setattr(ret, key, value.canonical())
# value will be copied twice, should not be a performance issue anyway
elif isinstance(value, Path):
setattr(ret, key, str(value))
return ret
def validate(self) -> None:
......
......@@ -5,6 +5,8 @@ from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from ruamel.yaml import YAML
from .base import ConfigBase, PathLike
from . import util
......@@ -27,23 +29,27 @@ class _AlgorithmConfig(ConfigBase):
super().validate()
_validate_algo(self)
@dataclass(init=False)
class AlgorithmConfig(_AlgorithmConfig):
name: str
class_args: Optional[Dict[str, Any]] = None
@dataclass(init=False)
class CustomAlgorithmConfig(_AlgorithmConfig):
class_name: str
class_directory: Optional[PathLike] = None
class_directory: Optional[PathLike] = '.'
class_args: Optional[Dict[str, Any]] = None
class TrainingServiceConfig(ConfigBase):
platform: str
class SharedStorageConfig(ConfigBase):
storage_type: str
local_mount_point: str
remote_mount_point: str
local_mounted: str
@dataclass(init=False)
class ExperimentConfig(ConfigBase):
......@@ -53,19 +59,21 @@ class ExperimentConfig(ConfigBase):
trial_command: str
trial_code_directory: PathLike = '.'
trial_concurrency: int
trial_gpu_number: Optional[int] = None
trial_gpu_number: Optional[int] = None # TODO: in openpai cannot be None
max_experiment_duration: Optional[str] = None
max_trial_number: Optional[int] = None
nni_manager_ip: Optional[str] = None
use_annotation: bool = False
debug: bool = False
log_level: Optional[str] = None
experiment_working_directory: Optional[PathLike] = None
experiment_working_directory: PathLike = '~/nni-experiments'
tuner_gpu_indices: Optional[Union[List[int], str]] = None
tuner: Optional[_AlgorithmConfig] = None
assessor: Optional[_AlgorithmConfig] = None
advisor: Optional[_AlgorithmConfig] = None
training_service: Union[TrainingServiceConfig, List[TrainingServiceConfig]]
shared_storage: Optional[SharedStorageConfig] = None
_deprecated: Optional[Dict[str, Any]] = None
def __init__(self, training_service_platform: Optional[Union[str, List[str]]] = None, **kwargs):
base_path = kwargs.pop('_base_path', None)
......@@ -100,6 +108,12 @@ class ExperimentConfig(ConfigBase):
if self.training_service.use_active_gpu is None:
raise ValueError('Please set "use_active_gpu"')
def json(self) -> Dict[str, Any]:
obj = super().json()
if obj.get('searchSpaceFile'):
obj['searchSpace'] = YAML().load(open(obj.pop('searchSpaceFile')))
return obj
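A standalone sketch of the YAML inlining done by json() above, using an in-memory stream instead of a real searchSpaceFile (the search space content is invented):

from io import StringIO
from ruamel.yaml import YAML

ss = YAML(typ='safe').load(StringIO('lr: {_type: loguniform, _value: [0.0001, 0.1]}'))
assert ss['lr']['_type'] == 'loguniform'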
## End of public API ##
@property
......@@ -117,9 +131,9 @@ _canonical_rules = {
'max_experiment_duration': lambda value: f'{util.parse_time(value)}s' if value is not None else None,
'experiment_working_directory': util.canonical_path,
'tuner_gpu_indices': lambda value: [int(idx) for idx in value.split(',')] if isinstance(value, str) else value,
'tuner': lambda config: None if config is None or config.name == '_none_' else config,
'assessor': lambda config: None if config is None or config.name == '_none_' else config,
'advisor': lambda config: None if config is None or config.name == '_none_' else config,
'tuner': lambda config: None if config is None or config.name == '_none_' else config.canonical(),
'assessor': lambda config: None if config is None or config.name == '_none_' else config.canonical(),
'advisor': lambda config: None if config is None or config.name == '_none_' else config.canonical(),
}
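A minimal sketch of what these canonical rules do, assuming ExperimentConfig's keyword constructor accepts the snake_case fields shown in this diff (values invented):

config = ExperimentConfig('local', trial_command='python3 train.py', trial_concurrency=1,
                          max_experiment_duration='1h', tuner_gpu_indices='0,1')
canon = config.canonical()
assert canon.max_experiment_duration == '3600s'  # '1h' parsed into seconds
assert canon.tuner_gpu_indices == [0, 1]         # comma-separated string split into ints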
_validation_rules = {
......
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import json
import copy
import logging
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, List
from .common import ExperimentConfig
from .common import ExperimentConfig, AlgorithmConfig, CustomAlgorithmConfig
from .remote import RemoteMachineConfig
from .kubeflow import KubeflowRoleConfig, KubeflowNfsConfig, KubeflowAzureStorageConfig
from .frameworkcontroller import FrameworkControllerRoleConfig
from .shared_storage import NfsConfig, AzureBlobConfig
from . import util
_logger = logging.getLogger(__name__)
def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str, Any]:
config.validate(False)
data = config.json()
ts = data.pop('trainingService')
data['trial'] = {
'command': data.pop('trialCommand'),
'codeDir': data.pop('trialCodeDirectory'),
}
if 'trialGpuNumber' in data:
data['trial']['gpuNum'] = data.pop('trialGpuNumber')
if isinstance(ts, list):
hybrid_names = []
for conf in ts:
if conf['platform'] == 'openpai':
conf['platform'] = 'pai'
hybrid_names.append(conf['platform'])
_handle_training_service(conf, data)
data['trainingServicePlatform'] = 'hybrid'
data['hybridConfig'] = {'trainingServicePlatforms': hybrid_names}
else:
if ts['platform'] == 'openpai':
ts['platform'] = 'pai'
data['trainingServicePlatform'] = ts['platform']
_handle_training_service(ts, data)
data['authorName'] = 'N/A'
data['experimentName'] = data.get('experimentName', 'N/A')
data['maxExecDuration'] = data.pop('maxExperimentDuration', '999d')
if data['debug']:
data['versionCheck'] = False
data['maxTrialNum'] = data.pop('maxTrialNumber', 99999)
ss = data.pop('searchSpace', None)
ss_file = data.pop('searchSpaceFile', None)
if ss is not None:
ss_file = NamedTemporaryFile('w', delete=False)
json.dump(ss, ss_file, indent=4)
data['searchSpacePath'] = ss_file.name
elif ss_file is not None:
data['searchSpacePath'] = ss_file
if 'experimentWorkingDirectory' in data:
data['logDir'] = data.pop('experimentWorkingDirectory')
    for algo_type in ['tuner', 'assessor', 'advisor']:
        algo = data.get(algo_type)
        if algo is None:
            continue
        if algo['name'] is not None:  # builtin
            algo['builtin' + algo_type.title() + 'Name'] = algo.pop('name')
            algo.pop('className', None)
            algo.pop('codeDirectory', None)
        else:
            algo.pop('name', None)
            class_name_parts = algo.pop('className').split('.')
            algo['codeDir'] = algo.pop('codeDirectory', '') + '/'.join(class_name_parts[:-2])
            algo['classFileName'] = class_name_parts[-2] + '.py'
            algo['className'] = class_name_parts[-1]
    tuner_gpu_indices = _convert_gpu_indices(data.pop('tunerGpuIndices', None))
    if tuner_gpu_indices is not None:
        data['tuner']['gpuIndices'] = tuner_gpu_indices
    return data
def _handle_training_service(ts, data):
if ts['platform'] == 'local':
data['localConfig'] = {
'useActiveGpu': ts.get('useActiveGpu', False),
'maxTrialNumPerGpu': ts['maxTrialNumberPerGpu']
}
if 'gpuIndices' in ts:
data['localConfig']['gpuIndices'] = _convert_gpu_indices(ts['gpuIndices'])
elif ts['platform'] == 'remote':
data['remoteConfig'] = {'reuse': ts['reuseMode']}
data['machineList'] = []
for machine in ts['machineList']:
machine_v1 = {
'ip': machine.get('host'),
'port': machine.get('port'),
'username': machine.get('user'),
'passwd': machine.get('password'),
'sshKeyPath': machine.get('sshKeyFile'),
'passphrase': machine.get('sshPassphrase'),
'gpuIndices': _convert_gpu_indices(machine.get('gpuIndices')),
'maxTrialNumPerGpu': machine.get('maxTrialNumPerGpu'),
'useActiveGpu': machine.get('useActiveGpu'),
'pythonPath': machine.get('pythonPath')
}
machine_v1 = {k: v for k, v in machine_v1.items() if v is not None}
data['machineList'].append(machine_v1)
elif ts['platform'] == 'pai':
data['trial']['image'] = ts['dockerImage']
data['trial']['nniManagerNFSMountPath'] = ts['localStorageMountPoint']
data['trial']['containerNFSMountPath'] = ts['containerStorageMountPoint']
data['trial']['paiStorageConfigName'] = ts['storageConfigName']
data['trial']['cpuNum'] = ts['trialCpuNumber']
data['trial']['memoryMB'] = ts['trialMemorySize']
data['paiConfig'] = {
'userName': ts['username'],
'token': ts['token'],
'host': ts['host'],
'reuse': ts['reuseMode']
}
if 'openpaiConfigFile' in ts:
data['paiConfig']['paiConfigPath'] = ts['openpaiConfigFile']
elif 'openpaiConfig' in ts:
conf_file = NamedTemporaryFile('w', delete=False)
json.dump(ts['openpaiConfig'], conf_file, indent=4)
data['paiConfig']['paiConfigPath'] = conf_file.name
elif ts['platform'] == 'aml':
data['trial']['image'] = ts['dockerImage']
data['amlConfig'] = dict(ts)
data['amlConfig'].pop('platform')
data['amlConfig'].pop('dockerImage')
elif ts['platform'] == 'kubeflow':
data['trial'].pop('command')
data['trial'].pop('gpuNum')
data['kubeflowConfig'] = dict(ts['storage'])
data['kubeflowConfig']['operator'] = ts['operator']
data['kubeflowConfig']['apiVersion'] = ts['apiVersion']
data['trial']['worker'] = _convert_kubeflow_role(ts['worker'])
if ts.get('parameterServer') is not None:
if ts['operator'] == 'tf-operator':
data['trial']['ps'] = _convert_kubeflow_role(ts['parameterServer'])
else:
data['trial']['master'] = _convert_kubeflow_role(ts['parameterServer'])
elif ts['platform'] == 'frameworkcontroller':
data['trial'].pop('command')
data['trial'].pop('gpuNum')
data['frameworkcontrollerConfig'] = dict(ts['storage'])
data['frameworkcontrollerConfig']['serviceAccountName'] = ts['serviceAccountName']
data['trial']['taskRoles'] = [_convert_fxctl_role(r) for r in ts['taskRoles']]
elif ts['platform'] == 'adl':
data['trial']['image'] = ts['dockerImage']
def _convert_gpu_indices(indices):
return ','.join(str(idx) for idx in indices) if indices is not None else None
def _convert_kubeflow_role(data):
return {
'replicas': data['replicas'],
'command': data['command'],
'gpuNum': data['gpuNumber'],
'cpuNum': data['cpuNumber'],
'memoryMB': util.parse_size(data['memorySize']),
'image': data['dockerImage']
}
def _convert_fxctl_role(data):
return {
'name': data['name'],
'taskNum': data['taskNumber'],
'command': data['command'],
'gpuNum': data['gpuNumber'],
'cpuNum': data['cpuNumber'],
'memoryMB': util.parse_size(data['memorySize']),
'image': data['dockerImage'],
'frameworkAttemptCompletionPolicy': {
'minFailedTaskCount': data['attemptCompletionMinFailedTasks'],
'minSucceededTaskCount': data['attemptCompletionMinSucceededTasks']
}
}
# The next three functions are removed by this commit: the REST server now
# accepts v2 configs natively (see set_experiment_v2 below), so the
# v2-to-v1 cluster-metadata / REST-JSON round-trips are no longer needed.

def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
    experiment_config = to_v1_yaml(config, skip_nnictl=True)
    ret = []
    if isinstance(config.training_service, list):
        hybrid_conf = dict()
        hybrid_conf['hybrid_config'] = experiment_config['hybridConfig']
        for conf in config.training_service:
            metadata = _get_cluster_metadata(conf.platform, experiment_config)
            if metadata is not None:
                hybrid_conf.update(metadata)
        ret.append(hybrid_conf)
    else:
        metadata = _get_cluster_metadata(config.training_service.platform, experiment_config)
        if metadata is not None:
            ret.append(metadata)
    if experiment_config.get('nniManagerIp') is not None:
        ret.append({'nni_manager_ip': {'nniManagerIp': experiment_config['nniManagerIp']}})
    ret.append({'trial_config': experiment_config['trial']})
    return ret

def _get_cluster_metadata(platform: str, experiment_config) -> Dict:
    if platform == 'local':
        request_data = dict()
        request_data['local_config'] = experiment_config['localConfig']
        if request_data['local_config']:
            if request_data['local_config'].get('gpuIndices') and isinstance(request_data['local_config'].get('gpuIndices'), int):
                request_data['local_config']['gpuIndices'] = str(request_data['local_config'].get('gpuIndices'))
        return request_data
    elif platform == 'remote':
        request_data = dict()
        if experiment_config.get('remoteConfig'):
            request_data['remote_config'] = experiment_config['remoteConfig']
        else:
            request_data['remote_config'] = {'reuse': False}
        request_data['machine_list'] = experiment_config['machineList']
        if request_data['machine_list']:
            for i in range(len(request_data['machine_list'])):
                if isinstance(request_data['machine_list'][i].get('gpuIndices'), int):
                    request_data['machine_list'][i]['gpuIndices'] = str(request_data['machine_list'][i].get('gpuIndices'))
        return request_data
    elif platform == 'openpai':
        return {'pai_config': experiment_config['paiConfig']}
    elif platform == 'aml':
        return {'aml_config': experiment_config['amlConfig']}
    elif platform == 'kubeflow':
        return {'kubeflow_config': experiment_config['kubeflowConfig']}
    elif platform == 'frameworkcontroller':
        return {'frameworkcontroller_config': experiment_config['frameworkcontrollerConfig']}
    elif platform == 'adl':
        return None
    else:
        raise RuntimeError('Unsupported training service ' + platform)

def to_rest_json(config: ExperimentConfig) -> Dict[str, Any]:
    experiment_config = to_v1_yaml(config, skip_nnictl=True)
    request_data = dict()
    request_data['authorName'] = experiment_config['authorName']
    request_data['experimentName'] = experiment_config['experimentName']
    request_data['trialConcurrency'] = experiment_config['trialConcurrency']
    request_data['maxExecDuration'] = util.parse_time(experiment_config['maxExecDuration'])
    request_data['maxTrialNum'] = experiment_config['maxTrialNum']
    if config.search_space is not None:
        request_data['searchSpace'] = json.dumps(config.search_space)
    elif config.search_space_file is not None:
        request_data['searchSpace'] = Path(config.search_space_file).read_text()
    request_data['trainingServicePlatform'] = experiment_config.get('trainingServicePlatform')
    if experiment_config.get('advisor'):
        request_data['advisor'] = experiment_config['advisor']
        if request_data['advisor'].get('gpuNum'):
            _logger.warning('gpuNum is deprecated, please use gpuIndices instead.')
        if request_data['advisor'].get('gpuIndices') and isinstance(request_data['advisor'].get('gpuIndices'), int):
            request_data['advisor']['gpuIndices'] = str(request_data['advisor'].get('gpuIndices'))
    elif experiment_config.get('tuner'):
        request_data['tuner'] = experiment_config['tuner']
        if request_data['tuner'].get('gpuNum'):
            _logger.warning('gpuNum is deprecated, please use gpuIndices instead.')
        if request_data['tuner'].get('gpuIndices') and isinstance(request_data['tuner'].get('gpuIndices'), int):
            request_data['tuner']['gpuIndices'] = str(request_data['tuner'].get('gpuIndices'))
        if 'assessor' in experiment_config:
            request_data['assessor'] = experiment_config['assessor']
            if request_data['assessor'].get('gpuNum'):
                _logger.warning('gpuNum is deprecated, please remove it from your config file.')
    else:
        request_data['tuner'] = {'builtinTunerName': '_user_created_'}
    # debug mode should disable version check
    if experiment_config.get('debug') is not None:
        request_data['versionCheck'] = not experiment_config.get('debug')
    # validate version check
    if experiment_config.get('versionCheck') is not None:
        request_data['versionCheck'] = experiment_config.get('versionCheck')
    if experiment_config.get('logCollection'):
        request_data['logCollection'] = experiment_config.get('logCollection')
    request_data['clusterMetaData'] = []
    if experiment_config['trainingServicePlatform'] == 'local':
        if experiment_config.get('localConfig'):
            request_data['clusterMetaData'].append(
                {'key': 'local_config', 'value': experiment_config['localConfig']})
        request_data['clusterMetaData'].append(
            {'key': 'trial_config', 'value': experiment_config['trial']})
    elif experiment_config['trainingServicePlatform'] == 'remote':
        request_data['clusterMetaData'].append(
            {'key': 'machine_list', 'value': experiment_config['machineList']})
        request_data['clusterMetaData'].append(
            {'key': 'trial_config', 'value': experiment_config['trial']})
        if not experiment_config.get('remoteConfig'):
            # set default value of reuse in remoteConfig to False
            experiment_config['remoteConfig'] = {'reuse': False}
        request_data['clusterMetaData'].append(
            {'key': 'remote_config', 'value': experiment_config['remoteConfig']})
    elif experiment_config['trainingServicePlatform'] == 'pai':
        request_data['clusterMetaData'].append(
            {'key': 'pai_config', 'value': experiment_config['paiConfig']})
        request_data['clusterMetaData'].append(
            {'key': 'trial_config', 'value': experiment_config['trial']})
    elif experiment_config['trainingServicePlatform'] == 'kubeflow':
        request_data['clusterMetaData'].append(
            {'key': 'kubeflow_config', 'value': experiment_config['kubeflowConfig']})
        request_data['clusterMetaData'].append(
            {'key': 'trial_config', 'value': experiment_config['trial']})
    elif experiment_config['trainingServicePlatform'] == 'frameworkcontroller':
        request_data['clusterMetaData'].append(
            {'key': 'frameworkcontroller_config', 'value': experiment_config['frameworkcontrollerConfig']})
        request_data['clusterMetaData'].append(
            {'key': 'trial_config', 'value': experiment_config['trial']})
    elif experiment_config['trainingServicePlatform'] == 'aml':
        request_data['clusterMetaData'].append(
            {'key': 'aml_config', 'value': experiment_config['amlConfig']})
        request_data['clusterMetaData'].append(
            {'key': 'trial_config', 'value': experiment_config['trial']})
    return request_data

# Added by this commit: convert a v1 config (plain YAML dict) into a v2
# ExperimentConfig, so nnictl can keep accepting v1 config files.

def to_v2(v1) -> ExperimentConfig:
    v1 = copy.deepcopy(v1)
    platform = v1.pop('trainingServicePlatform')
    assert platform in ['local', 'remote', 'openpai', 'aml', 'kubeflow', 'frameworkcontroller']
    v2 = ExperimentConfig(platform)
    _drop_field(v1, 'authorName')
    _move_field(v1, v2, 'experimentName', 'experiment_name')
    _drop_field(v1, 'description')
    _move_field(v1, v2, 'trialConcurrency', 'trial_concurrency')
    _move_field(v1, v2, 'maxExecDuration', 'max_experiment_duration')
    if isinstance(v2.max_experiment_duration, (int, float)):
        v2.max_experiment_duration = str(v2.max_experiment_duration) + 's'
    _move_field(v1, v2, 'maxTrialNum', 'max_trial_number')
    _move_field(v1, v2, 'searchSpacePath', 'search_space_file')
    assert not v1.pop('multiPhase', None), 'Multi-phase is no longer supported'
    _deprecate(v1, v2, 'multiThread')
    _move_field(v1, v2, 'nniManagerIp', 'nni_manager_ip')
    _move_field(v1, v2, 'logDir', 'experiment_working_directory')
    _move_field(v1, v2, 'debug', 'debug')
    _deprecate(v1, v2, 'versionCheck')
    _move_field(v1, v2, 'logLevel', 'log_level')
    _deprecate(v1, v2, 'logCollection')
    v1.pop('useAnnotation', None)  # TODO: how to handle annotation in nni.Experiment?
    if 'trial' in v1:
        v1_trial = v1.pop('trial')
        _move_field(v1_trial, v2, 'command', 'trial_command')
        _move_field(v1_trial, v2, 'codeDir', 'trial_code_directory')
        _move_field(v1_trial, v2, 'gpuNum', 'trial_gpu_number')
    for algo_type in ['tuner', 'assessor', 'advisor']:
        if algo_type not in v1:
            continue
        v1_algo = v1.pop(algo_type)
        builtin_name = v1_algo.pop(f'builtin{algo_type.title()}Name', None)
        class_args = v1_algo.pop('classArgs', None)
        if builtin_name is not None:
            v2_algo = AlgorithmConfig(name=builtin_name, class_args=class_args)
        else:
            class_directory = util.canonical_path(v1_algo.pop('codeDir'))
            class_file_name = v1_algo.pop('classFileName')
            assert class_file_name.endswith('.py')
            class_name = class_file_name[:-3] + '.' + v1_algo.pop('className')
            v2_algo = CustomAlgorithmConfig(
                class_name=class_name,
                class_directory=class_directory,
                class_args=class_args
            )
        setattr(v2, algo_type, v2_algo)
        _deprecate(v1_algo, v2, 'includeIntermediateResults')
        _move_field(v1_algo, v2, 'gpuIndices', 'tuner_gpu_indices')
        assert not v1_algo, v1_algo
    ts = v2.training_service
    if platform == 'local':
        local_config = v1.pop('localConfig', {})
        _move_field(local_config, ts, 'gpuIndices', 'gpu_indices')
        _move_field(local_config, ts, 'maxTrialNumPerGpu', 'max_trial_number_per_gpu')
        _move_field(local_config, ts, 'useActiveGpu', 'use_active_gpu')
        assert not local_config, local_config
    if platform == 'remote':
        remote_config = v1.pop('remoteConfig', {})
        _move_field(remote_config, ts, 'reuse', 'reuse_mode')
        assert not remote_config, remote_config
        ts.machine_list = []
        for v1_machine in v1.pop('machineList'):
            v2_machine = RemoteMachineConfig()
            ts.machine_list.append(v2_machine)
            _move_field(v1_machine, v2_machine, 'ip', 'host')
            _move_field(v1_machine, v2_machine, 'port', 'port')
            _move_field(v1_machine, v2_machine, 'username', 'user')
            _move_field(v1_machine, v2_machine, 'sshKeyPath', 'ssh_key_file')
            _move_field(v1_machine, v2_machine, 'passphrase', 'ssh_passphrase')
            _move_field(v1_machine, v2_machine, 'gpuIndices', 'gpu_indices')
            _move_field(v1_machine, v2_machine, 'maxTrialNumPerGpu', 'max_trial_number_per_gpu')
            _move_field(v1_machine, v2_machine, 'useActiveGpu', 'use_active_gpu')
            _move_field(v1_machine, v2_machine, 'pythonPath', 'python_path')
            _move_field(v1_machine, v2_machine, 'passwd', 'password')
            assert not v1_machine, v1_machine
    if platform == 'openpai':
        _move_field(v1_trial, ts, 'nniManagerNFSMountPath', 'local_storage_mount_point')
        _move_field(v1_trial, ts, 'containerNFSMountPath', 'container_storage_mount_point')
        _move_field(v1_trial, ts, 'cpuNum', 'trial_cpu_number')
        if 'memoryMB' in v1_trial:
            ts.trial_memory_size = str(v1_trial.pop('memoryMB')) + 'mb'
        _move_field(v1_trial, ts, 'image', 'docker_image')
        _deprecate(v1_trial, v2, 'virtualCluster')
        _move_field(v1_trial, ts, 'paiStorageConfigName', 'storage_config_name')
        _move_field(v1_trial, ts, 'paiConfigPath', 'openpai_config_file')
        pai_config = v1.pop('paiConfig')
        _move_field(pai_config, ts, 'userName', 'username')
        _deprecate(pai_config, v2, 'password')
        _move_field(pai_config, ts, 'token', 'token')
        _move_field(pai_config, ts, 'host', 'host')
        _move_field(pai_config, ts, 'reuse', 'reuse_mode')
        _move_field(pai_config, ts, 'gpuNum', 'trial_gpu_number')
        _move_field(pai_config, ts, 'cpuNum', 'trial_cpu_number')
        if 'memoryMB' in pai_config:
            ts.trial_memory_size = str(pai_config.pop('memoryMB')) + 'mb'
        _deprecate(pai_config, v2, 'maxTrialNumPerGpu')
        _deprecate(pai_config, v2, 'useActiveGpu')
        assert not pai_config, pai_config
    if platform == 'aml':
        _move_field(v1_trial, ts, 'image', 'docker_image')
        aml_config = v1.pop('amlConfig', {})
        _move_field(aml_config, ts, 'subscriptionId', 'subscription_id')
        _move_field(aml_config, ts, 'resourceGroup', 'resource_group')
        _move_field(aml_config, ts, 'workspaceName', 'workspace_name')
        _move_field(aml_config, ts, 'computeTarget', 'compute_target')
        _deprecate(aml_config, v2, 'maxTrialNumPerGpu')
        _deprecate(aml_config, v2, 'useActiveGpu')
        assert not aml_config, aml_config
    if platform == 'kubeflow':
        kf_config = v1.pop('kubeflowConfig')
        _move_field(kf_config, ts, 'operator', 'operator')
        ps_name = 'ps' if ts.operator != 'pytorch-operator' else 'master'
        _move_field(kf_config, ts, 'apiVersion', 'api_version')
        # FIXME: use storage service
        storage_name = kf_config.pop('storage', None)
        if storage_name is None:
            storage_name = 'nfs' if 'nfs' in kf_config else 'azureStorage'
        if storage_name == 'nfs':
            nfs = kf_config.pop('nfs')
            ts.storage = KubeflowNfsConfig(server=nfs['server'], path=nfs['path'])
        if storage_name == 'azureStorage':
            key_vault = kf_config.pop('keyVault')
            azure_storage = kf_config.pop('azureStorage')
            ts.storage = KubeflowAzureStorageConfig(
                azure_account=azure_storage['accountName'],
                azure_share=azure_storage['azureShare'],
                key_vault=key_vault['vaultName'],
                key_vault_secret=key_vault['name']
            )
        _deprecate(kf_config, v2, 'uploadRetryCount')
        assert not kf_config, kf_config
        _drop_field(v1_trial, 'nasMode')
        for role_name in [ps_name, 'worker']:
            if role_name not in v1_trial:
                continue
            v1_role = v1_trial.pop(role_name)
            v2_role = KubeflowRoleConfig()
            if role_name == 'worker':
                ts.worker = v2_role
            else:
                ts.parameter_server = v2_role
            _move_field(v1_role, v2_role, 'replicas', 'replicas')
            _move_field(v1_role, v2_role, 'command', 'command')
            _move_field(v1_role, v2_role, 'gpuNum', 'gpu_number')
            _move_field(v1_role, v2_role, 'cpuNum', 'cpu_number')
            v2_role.memory_size = str(v1_role.pop('memoryMB')) + 'mb'
            _move_field(v1_role, v2_role, 'image', 'docker_image')
            _deprecate(v1_role, v2, 'privateRegistryAuthPath')
            assert not v1_role, v1_role
    if platform == 'frameworkcontroller':
        fc_config = v1.pop('frameworkcontroller')
        _deprecate(fc_config, v2, 'serviceAccountName')
        storage_name = fc_config.pop('storage', None)
        if storage_name is None:
            storage_name = 'nfs' if 'nfs' in fc_config else 'azureStorage'
        if storage_name == 'nfs':
            nfs = fc_config.pop('nfs')
            ts.storage = KubeflowNfsConfig(server=nfs['server'], path=nfs['path'])
        if storage_name == 'azureStorage':
            key_vault = fc_config.pop('keyVault')
            azure_storage = fc_config.pop('azureStorage')
            ts.storage = KubeflowAzureStorageConfig(
                azure_account=azure_storage['accountName'],
                azure_share=azure_storage['azureShare'],
                key_vault=key_vault['vaultName'],
                key_vault_secret=key_vault['name']
            )
        _deprecate(fc_config, v2, 'uploadRetryCount')
        assert not fc_config, fc_config
        _drop_field(v1_trial, 'nasMode')
        ts.task_roles = []
        for v1_role in v1_trial.pop('taskRoles', []):
            v2_role = FrameworkControllerRoleConfig()
            ts.task_roles.append(v2_role)
            _move_field(v1_role, v2_role, 'name', 'name')
            _move_field(v1_role, v2_role, 'taskNum', 'task_number')
            policy = v1_role.pop('frameworkControllerCompletionPolicy', {})
            _move_field(policy, v2_role, 'minFailedTaskCount', 'attempt_completion_min_failed_tasks')
            _move_field(policy, v2_role, 'minSucceededTaskCount', 'attempt_completion_min_succeeded_tasks')
            _move_field(v1_role, v2_role, 'command', 'command')
            _move_field(v1_role, v2_role, 'gpuNum', 'gpu_number')
            _move_field(v1_role, v2_role, 'cpuNum', 'cpu_number')
            v2_role.memory_size = str(v1_role.pop('memoryMB')) + 'mb'
            _move_field(v1_role, v2_role, 'image', 'docker_image')
            _deprecate(v1_role, v2, 'privateRegistryAuthPath')
            assert not v1_role, v1_role
    # hybrid mode should always use v2 schema, so no need to handle here
    v1_storage = v1.pop('sharedStorage', None)
    if v1_storage:
        type_ = v1_storage.pop('storageType')
        if type_ == 'NFS':
            v2.shared_storage = NfsConfig(**v1_storage)
        elif type_ == 'AzureBlob':
            v2.shared_storage = AzureBlobConfig(**v1_storage)
        else:
            raise ValueError(f'bad storage type: {type_}')
    assert not v1_trial, v1_trial
    assert not v1, v1
    return v2.canonical()
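A minimal usage sketch of to_v2 above, with an invented v1 config; fields not listed fall back to v2 defaults:

v1 = {
    'trainingServicePlatform': 'local',
    'trialConcurrency': 1,
    'trial': {'command': 'python3 train.py', 'codeDir': '.', 'gpuNum': 0},
    'tuner': {'builtinTunerName': 'TPE', 'classArgs': {'optimize_mode': 'maximize'}},
}
v2 = to_v2(v1)
# v2.trial_command == 'python3 train.py'; v2.tuner.name == 'TPE'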
def _move_field(v1, v2, v1_key, v2_key):
if v1_key in v1:
value = v1.pop(v1_key, None)
if value is not None:
setattr(v2, v2_key, value)
def _drop_field(v1, key):
if key in v1:
logging.warning(f'Configuration field {key} is no longer supported and has been ignored')
v1.pop(key)
# NOTE: fields not yet supported by v2 are also (temporarily) placed here
def _deprecate(v1, v2, key):
if key in v1:
if v2._deprecated is None:
v2._deprecated = {}
v2._deprecated[key] = v1.pop(key)
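Sketch of how these three helpers cooperate during conversion (invented field values):

v1 = {'experimentName': 'demo', 'authorName': 'me', 'versionCheck': False}
v2 = ExperimentConfig('local')
_move_field(v1, v2, 'experimentName', 'experiment_name')  # renamed into v2
_drop_field(v1, 'authorName')        # unsupported: warned about and discarded
_deprecate(v1, v2, 'versionCheck')   # stashed under v2._deprecated for the runtime
assert not v1                        # every v1 key must be consumed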
......@@ -56,7 +56,7 @@ class KubeflowConfig(TrainingServiceConfig):
parameter_server: Optional[KubeflowRoleConfig] = None
def __init__(self, **kwargs):
kwargs = util.case_insensitve(kwargs)
kwargs = util.case_insensitive(kwargs)
kwargs['storage'] = util.load_config(_KubeflowStorageConfig, kwargs.get('storage'))
kwargs['worker'] = util.load_config(KubeflowRoleConfig, kwargs.get('worker'))
kwargs['parameterserver'] = util.load_config(KubeflowRoleConfig, kwargs.get('parameterserver'))
......
......@@ -23,7 +23,7 @@ class OpenpaiConfig(TrainingServiceConfig):
docker_image: str = 'msranni/nni:latest'
local_storage_mount_point: PathLike
container_storage_mount_point: str
reuse_mode: bool = False
reuse_mode: bool = True
openpai_config: Optional[Dict[str, Any]] = None
openpai_config_file: Optional[PathLike] = None
......
......@@ -46,7 +46,7 @@ class RemoteMachineConfig(ConfigBase):
@dataclass(init=False)
class RemoteConfig(TrainingServiceConfig):
platform: str = 'remote'
reuse_mode: bool = False
reuse_mode: bool = True
machine_list: List[RemoteMachineConfig]
def __init__(self, **kwargs):
......
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from dataclasses import dataclass
from typing import Optional
from .common import SharedStorageConfig
__all__ = ['NfsConfig', 'AzureBlobConfig']
@dataclass(init=False)
class NfsConfig(SharedStorageConfig):
storage_type: str = 'NFS'
nfs_server: str
exported_directory: str
@dataclass(init=False)
class AzureBlobConfig(SharedStorageConfig):
storage_type: str = 'AzureBlob'
storage_account_name: str
storage_account_key: Optional[str] = None
resource_group_name: Optional[str] = None
container_name: str
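A hypothetical construction of the new NFS shared storage config, assuming ConfigBase's keyword constructor (all values invented):

storage = NfsConfig(
    local_mount_point='/mnt/nfs/nni',
    remote_mount_point='/mnt/nni',
    local_mounted='usermount',        # assumed value: the user mounted the path themselves
    nfs_server='10.0.0.10',
    exported_directory='/exports/nni',
)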
......@@ -19,7 +19,7 @@ def case_insensitive(key_or_kwargs: Union[str, Dict[str, Any]]) -> Union[str, Di
return {key.lower().replace('_', ''): value for key, value in key_or_kwargs.items()}
def camel_case(key: str) -> str:
words = key.split('_')
words = key.strip('_').split('_')
return words[0] + ''.join(word.title() for word in words[1:])
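Why the added strip('_') matters: private fields such as _deprecated now serialize without a leading-underscore artifact. Quick illustrative checks:

assert camel_case('trial_gpu_number') == 'trialGpuNumber'
assert camel_case('_deprecated') == 'deprecated'   # previously produced 'Deprecated'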
def canonical_path(path: Optional[PathLike]) -> Optional[str]:
......
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import contextlib
import logging
from pathlib import Path
......@@ -13,7 +16,6 @@ import nni_node # pylint: disable=import-error
import nni.runtime.protocol
from .config import ExperimentConfig
from .config import convert
from .pipe import Pipe
from . import rest
from ..tools.nnictl.config_utils import Experiments
......@@ -40,7 +42,7 @@ def start_experiment(exp_id: str, config: ExperimentConfig, port: int, debug: bo
_save_experiment_information(exp_id, port, start_time, platform,
config.experiment_name, proc.pid, config.experiment_working_directory)
_logger.info('Setting up...')
_init_experiment(config, port, debug)
rest.post(port, '/experiment', config.json())
return proc
except Exception as e:
......@@ -75,7 +77,7 @@ def start_experiment_retiarii(exp_id: str, config: ExperimentConfig, port: int,
_save_experiment_information(exp_id, port, start_time, platform,
config.experiment_name, proc.pid, config.experiment_working_directory)
_logger.info('Setting up...')
_init_experiment(config, port, debug)
rest.post(port, '/experiment', config.json())
return proc, pipe
except Exception as e:
......@@ -145,12 +147,6 @@ def _check_rest_server(port: int, retry: int = 3) -> None:
rest.get(port, '/check-status')
def _init_experiment(config: ExperimentConfig, port: int, debug: bool) -> None:
for cluster_metadata in convert.to_cluster_metadata(config):
rest.put(port, '/experiment/cluster-metadata', cluster_metadata)
rest.post(port, '/experiment', convert.to_rest_json(config))
def _save_experiment_information(experiment_id: str, port: int, start_time: int, platform: str, name: str, pid: int, logDir: str) -> None:
experiments_config = Experiments()
experiments_config.add_experiment(experiment_id, port, start_time, platform, name, pid=pid, logDir=logDir)
......@@ -35,11 +35,17 @@ def verify_algo_import(meta):
def algo_reg(args):
meta_list = read_reg_meta_list(args.meta_path)
for meta in meta_list:
if get_registered_algo_meta(meta['builtinName']) is not None:
print_error('builtinName {} already registered'.format(meta['builtinName']))
return
old = get_registered_algo_meta(meta['builtinName'])
if old is None:
verify_algo_import(meta)
save_algo_meta_data(meta)
elif old['source'] != 'nni':
verify_algo_import(meta)
print_green('Updating existing algorithm')
remove_algo_meta_data(meta['builtinName'])
save_algo_meta_data(meta)
else:
print_error('Cannot overwrite builtin algorithm')
print_green('{} registered successfully!'.format(meta['builtinName']))
def algo_unreg(args):
......
......@@ -25,6 +25,8 @@ from .constants import NNI_HOME_DIR, ERROR_INFO, REST_TIME_OUT, EXPERIMENT_SUCCE
from .command_utils import check_output_command, kill_command
from .nnictl_utils import update_experiment
k8s_training_services = ['kubeflow', 'frameworkcontroller', 'adl']
def get_log_path(experiment_id):
'''generate stdout and stderr log path'''
os.makedirs(os.path.join(NNI_HOME_DIR, experiment_id, 'log'), exist_ok=True)
......@@ -115,23 +117,6 @@ def set_trial_config(experiment_config, port, config_file_name):
fout.write(json.dumps(json.loads(response.text), indent=4, sort_keys=True, separators=(',', ':')))
return False
def set_local_config(experiment_config, port, config_file_name):
'''set local configuration'''
request_data = dict()
if experiment_config.get('localConfig'):
request_data['local_config'] = experiment_config['localConfig']
response = rest_put(cluster_metadata_url(port), json.dumps(request_data), REST_TIME_OUT)
err_message = ''
if not response or not check_response(response):
if response is not None:
err_message = response.text
_, stderr_full_path = get_log_path(config_file_name)
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
return False, err_message
return set_trial_config(experiment_config, port, config_file_name), None
def set_adl_config(experiment_config, port, config_file_name):
'''set adl configuration'''
result, message = setNNIManagerIp(experiment_config, port, config_file_name)
......@@ -140,36 +125,6 @@ def set_adl_config(experiment_config, port, config_file_name):
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), None
def set_remote_config(experiment_config, port, config_file_name):
'''Call setClusterMetadata to pass trial'''
#set machine_list
request_data = dict()
if experiment_config.get('remoteConfig'):
request_data['remote_config'] = experiment_config['remoteConfig']
else:
request_data['remote_config'] = {'reuse': False}
request_data['machine_list'] = experiment_config['machineList']
if request_data['machine_list']:
for i in range(len(request_data['machine_list'])):
if isinstance(request_data['machine_list'][i].get('gpuIndices'), int):
request_data['machine_list'][i]['gpuIndices'] = str(request_data['machine_list'][i].get('gpuIndices'))
# It needs to connect all remote machines, the time out of connection is 30 seconds.
# So timeout of this place should be longer.
response = rest_put(cluster_metadata_url(port), json.dumps(request_data), 60, True)
err_message = ''
if not response or not check_response(response):
if response is not None:
err_message = response.text
_, stderr_full_path = get_log_path(config_file_name)
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
return False, err_message
result, message = setNNIManagerIp(experiment_config, port, config_file_name)
if not result:
return result, message
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def setNNIManagerIp(experiment_config, port, config_file_name):
'''set nniManagerIp'''
if experiment_config.get('nniManagerIp') is None:
......@@ -187,25 +142,6 @@ def setNNIManagerIp(experiment_config, port, config_file_name):
return False, err_message
return True, None
def set_pai_config(experiment_config, port, config_file_name):
'''set pai configuration'''
pai_config_data = dict()
pai_config_data['pai_config'] = experiment_config['paiConfig']
response = rest_put(cluster_metadata_url(port), json.dumps(pai_config_data), REST_TIME_OUT)
err_message = None
if not response or not response.status_code == 200:
if response is not None:
err_message = response.text
_, stderr_full_path = get_log_path(config_file_name)
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
return False, err_message
result, message = setNNIManagerIp(experiment_config, port, config_file_name)
if not result:
return result, message
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def set_kubeflow_config(experiment_config, port, config_file_name):
'''set kubeflow configuration'''
kubeflow_config_data = dict()
......@@ -244,77 +180,6 @@ def set_frameworkcontroller_config(experiment_config, port, config_file_name):
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def set_dlts_config(experiment_config, port, config_file_name):
'''set dlts configuration'''
dlts_config_data = dict()
dlts_config_data['dlts_config'] = experiment_config['dltsConfig']
response = rest_put(cluster_metadata_url(port), json.dumps(dlts_config_data), REST_TIME_OUT)
err_message = None
if not response or not response.status_code == 200:
if response is not None:
err_message = response.text
_, stderr_full_path = get_log_path(config_file_name)
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
return False, err_message
result, message = setNNIManagerIp(experiment_config, port, config_file_name)
if not result:
return result, message
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def set_aml_config(experiment_config, port, config_file_name):
'''set aml configuration'''
aml_config_data = dict()
aml_config_data['aml_config'] = experiment_config['amlConfig']
response = rest_put(cluster_metadata_url(port), json.dumps(aml_config_data), REST_TIME_OUT)
err_message = None
if not response or not response.status_code == 200:
if response is not None:
err_message = response.text
_, stderr_full_path = get_log_path(config_file_name)
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
return False, err_message
result, message = setNNIManagerIp(experiment_config, port, config_file_name)
if not result:
return result, message
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def set_hybrid_config(experiment_config, port, config_file_name):
'''set hybrid configuration'''
hybrid_config_data = dict()
hybrid_config_data['hybrid_config'] = experiment_config['hybridConfig']
platform_list = experiment_config['hybridConfig']['trainingServicePlatforms']
for platform in platform_list:
if platform == 'aml':
hybrid_config_data['aml_config'] = experiment_config['amlConfig']
elif platform == 'remote':
if experiment_config.get('remoteConfig'):
hybrid_config_data['remote_config'] = experiment_config['remoteConfig']
hybrid_config_data['machine_list'] = experiment_config['machineList']
elif platform == 'local' and experiment_config.get('localConfig'):
hybrid_config_data['local_config'] = experiment_config['localConfig']
elif platform == 'pai':
hybrid_config_data['pai_config'] = experiment_config['paiConfig']
# It needs to connect all remote machines, set longer timeout here to wait for restful server connection response.
time_out = 60 if 'remote' in platform_list else REST_TIME_OUT
response = rest_put(cluster_metadata_url(port), json.dumps(hybrid_config_data), time_out)
err_message = None
if not response or not response.status_code == 200:
if response is not None:
err_message = response.text
_, stderr_full_path = get_log_path(config_file_name)
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
return False, err_message
result, message = setNNIManagerIp(experiment_config, port, config_file_name)
if not result:
return result, message
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def set_shared_storage(experiment_config, port, config_file_name):
if 'sharedStorage' in experiment_config:
response = rest_put(cluster_metadata_url(port), json.dumps({'shared_storage_config': experiment_config['sharedStorage']}), REST_TIME_OUT)
......@@ -328,7 +193,7 @@ def set_shared_storage(experiment_config, port, config_file_name):
return False, err_message
return True, None
def set_experiment(experiment_config, mode, port, config_file_name):
def set_experiment_v1(experiment_config, mode, port, config_file_name):
'''Call startExperiment (rest POST /experiment) with yaml file content'''
request_data = dict()
request_data['authorName'] = experiment_config['authorName']
......@@ -371,28 +236,7 @@ def set_experiment(experiment_config, mode, port, config_file_name):
if experiment_config.get('logCollection'):
request_data['logCollection'] = experiment_config.get('logCollection')
request_data['clusterMetaData'] = []
if experiment_config['trainingServicePlatform'] == 'local':
if experiment_config.get('localConfig'):
request_data['clusterMetaData'].append(
{'key': 'local_config', 'value': experiment_config['localConfig']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'remote':
request_data['clusterMetaData'].append(
{'key': 'machine_list', 'value': experiment_config['machineList']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
if not experiment_config.get('remoteConfig'):
# set default value of reuse in remoteConfig to False
experiment_config['remoteConfig'] = {'reuse': False}
request_data['clusterMetaData'].append(
{'key': 'remote_config', 'value': experiment_config['remoteConfig']})
elif experiment_config['trainingServicePlatform'] == 'pai':
request_data['clusterMetaData'].append(
{'key': 'pai_config', 'value': experiment_config['paiConfig']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'kubeflow':
if experiment_config['trainingServicePlatform'] == 'kubeflow':
request_data['clusterMetaData'].append(
{'key': 'kubeflow_config', 'value': experiment_config['kubeflowConfig']})
request_data['clusterMetaData'].append(
......@@ -402,26 +246,6 @@ def set_experiment(experiment_config, mode, port, config_file_name):
{'key': 'frameworkcontroller_config', 'value': experiment_config['frameworkcontrollerConfig']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'aml':
request_data['clusterMetaData'].append(
{'key': 'aml_config', 'value': experiment_config['amlConfig']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'hybrid':
request_data['clusterMetaData'].append(
{'key': 'hybrid_config', 'value': experiment_config['hybridConfig']})
platform_list = experiment_config['hybridConfig']['trainingServicePlatforms']
request_dict = {
'aml': {'key': 'aml_config', 'value': experiment_config.get('amlConfig')},
'remote': {'key': 'machine_list', 'value': experiment_config.get('machineList')},
'pai': {'key': 'pai_config', 'value': experiment_config.get('paiConfig')},
'local': {'key': 'local_config', 'value': experiment_config.get('localConfig')}
}
for platform in platform_list:
if request_dict.get(platform):
request_data['clusterMetaData'].append(request_dict[platform])
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'adl':
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
......@@ -436,28 +260,29 @@ def set_experiment(experiment_config, mode, port, config_file_name):
print_error('Setting experiment error, error message is {}'.format(response.text))
return None
def set_experiment_v2(experiment_config, mode, port, config_file_name):
'''Call startExperiment (rest POST /experiment) with yaml file content'''
response = rest_post(experiment_url(port), json.dumps(experiment_config), REST_TIME_OUT, show_error=True)
if check_response(response):
return response
else:
_, stderr_full_path = get_log_path(config_file_name)
if response is not None:
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(response.text), indent=4, sort_keys=True, separators=(',', ':')))
print_error('Setting experiment error, error message is {}'.format(response.text))
return None
def set_platform_config(platform, experiment_config, port, config_file_name, rest_process):
'''call set_cluster_metadata for specific platform'''
print_normal('Setting {0} config...'.format(platform))
config_result, err_msg = None, None
if platform == 'adl':
config_result, err_msg = set_adl_config(experiment_config, port, config_file_name)
elif platform == 'local':
config_result, err_msg = set_local_config(experiment_config, port, config_file_name)
elif platform == 'remote':
config_result, err_msg = set_remote_config(experiment_config, port, config_file_name)
elif platform == 'pai':
config_result, err_msg = set_pai_config(experiment_config, port, config_file_name)
elif platform == 'kubeflow':
config_result, err_msg = set_kubeflow_config(experiment_config, port, config_file_name)
elif platform == 'frameworkcontroller':
config_result, err_msg = set_frameworkcontroller_config(experiment_config, port, config_file_name)
elif platform == 'dlts':
config_result, err_msg = set_dlts_config(experiment_config, port, config_file_name)
elif platform == 'aml':
config_result, err_msg = set_aml_config(experiment_config, port, config_file_name)
elif platform == 'hybrid':
config_result, err_msg = set_hybrid_config(experiment_config, port, config_file_name)
else:
raise Exception(ERROR_INFO % 'Unsupported platform!')
exit(1)
......@@ -473,7 +298,7 @@ def set_platform_config(platform, experiment_config, port, config_file_name, res
raise Exception(ERROR_INFO % 'Rest server stopped!')
exit(1)
def launch_experiment(args, experiment_config, mode, experiment_id):
def launch_experiment(args, experiment_config, mode, experiment_id, config_version):
'''follow steps to start rest server and start experiment'''
# check packages for tuner
package_name, module_name = None, None
......@@ -503,12 +328,17 @@ def launch_experiment(args, experiment_config, mode, experiment_id):
if log_level not in ['trace', 'debug'] and (args.debug or experiment_config.get('debug') is True):
log_level = 'debug'
# start rest server
rest_process, start_time = start_rest_server(args.port, experiment_config['trainingServicePlatform'], \
if config_version == 1:
platform = experiment_config['trainingServicePlatform']
else:
platform = experiment_config['trainingService']['platform']
rest_process, start_time = start_rest_server(args.port, platform, \
mode, experiment_id, foreground, log_dir, log_level)
# save experiment information
Experiments().add_experiment(experiment_id, args.port, start_time,
experiment_config['trainingServicePlatform'],
experiment_config['experimentName'], pid=rest_process.pid, logDir=log_dir)
platform,
experiment_config.get('experimentName', 'N/A'), pid=rest_process.pid, logDir=log_dir)
# Deal with annotation
if experiment_config.get('useAnnotation'):
path = os.path.join(tempfile.gettempdir(), get_user(), 'nni', 'annotation')
......@@ -521,7 +351,8 @@ def launch_experiment(args, experiment_config, mode, experiment_id):
search_space = generate_search_space(code_dir)
experiment_config['searchSpace'] = json.dumps(search_space)
assert search_space, ERROR_INFO % 'Generated search space is empty'
elif experiment_config.get('searchSpacePath'):
elif config_version == 1:
if experiment_config.get('searchSpacePath'):
search_space = get_json_content(experiment_config.get('searchSpacePath'))
experiment_config['searchSpace'] = json.dumps(search_space)
else:
......@@ -539,7 +370,7 @@ def launch_experiment(args, experiment_config, mode, experiment_id):
except Exception:
raise Exception(ERROR_INFO % 'Rest server stopped!')
exit(1)
if mode != 'view':
if config_version == 1 and mode != 'view':
# set platform configuration
set_platform_config(experiment_config['trainingServicePlatform'], experiment_config, args.port,\
experiment_id, rest_process)
......@@ -549,7 +380,10 @@ def launch_experiment(args, experiment_config, mode, experiment_id):
# set debug configuration
if mode != 'view' and experiment_config.get('debug') is None:
experiment_config['debug'] = args.debug
response = set_experiment(experiment_config, mode, args.port, experiment_id)
if config_version == 1:
response = set_experiment_v1(experiment_config, mode, args.port, experiment_id)
else:
response = set_experiment_v2(experiment_config, mode, args.port, experiment_id)
if response:
if experiment_id is None:
experiment_id = json.loads(response.text).get('experiment_id')
......@@ -584,25 +418,27 @@ def create_experiment(args):
if not os.path.exists(config_path):
print_error('Please set correct config path!')
exit(1)
    experiment_config = get_yml_content(config_path)
    try:
        validate_all_content(experiment_config, config_path)
    except Exception:
        print_warning('Validation with V1 schema failed. Trying to convert from V2 format...')
        try:
            config = ExperimentConfig(_base_path=Path(config_path).parent, **experiment_config)
            experiment_config = convert.to_v1_yaml(config)
        except Exception as e:
            print_error(f'Config in v2 format validation failed, the config error in v2 format is: {repr(e)}')
            try:
                validate_all_content(experiment_config, config_path)
            except Exception as e:
                print_error(f'Config in v1 format validation failed, the config error in v1 format is: {repr(e)}')
                exit(1)
    try:
        launch_experiment(args, experiment_config, 'new', experiment_id)
    config_yml = get_yml_content(config_path)
    try:
        config = ExperimentConfig(_base_path=Path(config_path).parent, **config_yml)
        config_v2 = config.json()
    except Exception as error_v2:
        print_warning('Validation with V2 schema failed. Trying to convert from V1 format...')
        try:
            validate_all_content(config_yml, config_path)
        except Exception as error_v1:
            print_error(f'Convert from v1 format failed: {repr(error_v1)}')
            print_error(f'Config in v2 format validation failed: {repr(error_v2)}')
            exit(1)
        from nni.experiment.config import convert
        config_v2 = convert.to_v2(config_yml).json()
    try:
        ts_config = config_v2['trainingService']
        if isinstance(ts_config, dict) and ts_config.get('platform') in k8s_training_services:
            launch_experiment(args, config_yml, 'new', experiment_id, 1)
        else:
            launch_experiment(args, config_v2, 'new', experiment_id, 2)
except Exception as exception:
restServerPid = Experiments().get_all_experiments().get(experiment_id, {}).get('pid')
if restServerPid:
......@@ -632,8 +468,12 @@ def manage_stopped_experiment(args, mode):
print_normal('{0} experiment {1}...'.format(mode, experiment_id))
experiment_config = Config(experiment_id, experiments_dict[args.id]['logDir']).get_config()
experiments_config.update_experiment(args.id, 'port', args.port)
assert 'trainingService' in experiment_config or 'trainingServicePlatform' in experiment_config
try:
launch_experiment(args, experiment_config, mode, experiment_id)
if 'trainingService' in experiment_config:
launch_experiment(args, experiment_config, mode, experiment_id, 2)
else:
launch_experiment(args, experiment_config, mode, experiment_id, 1)
except Exception as exception:
restServerPid = Experiments().get_all_experiments().get(experiment_id, {}).get('pid')
if restServerPid:
......
......@@ -124,4 +124,5 @@ def validate_all_content(experiment_config, config_path):
NNIConfigSchema().validate(experiment_config)
if 'maxExecDuration' in experiment_config:
experiment_config['maxExecDuration'] = parse_time(experiment_config['maxExecDuration'])
......@@ -178,25 +178,24 @@ def create_customized_class_instance(class_params):
----------
class_params: dict
class_params should contain the following keys:
codeDir: code directory
classFileName: python file name of the class
className: class name
codeDirectory: code directory
className: qualified class name
classArgs (optional): kwargs passed to class constructor

Returns
-------
object
    Customized class instance.
"""
code_dir = class_params.get('codeDir')
class_filename = class_params.get('classFileName')
class_name = class_params.get('className')
code_dir = class_params.get('classDirectory')
qualified_class_name = class_params.get('className')
class_args = class_params.get('classArgs')
if not os.path.isfile(os.path.join(code_dir, class_filename)):
raise ValueError('Class file not found: {}'.format(
os.path.join(code_dir, class_filename)))
if code_dir and not os.path.isdir(code_dir):
raise ValueError(f'Directory not found: {code_dir}')
sys.path.append(code_dir)
module_name = os.path.splitext(class_filename)[0]
module_name, class_name = qualified_class_name.rsplit('.', 1)
class_module = importlib.import_module(module_name)
class_constructor = getattr(class_module, class_name)
......
......@@ -45,13 +45,6 @@ testCases:
- name: multi-thread
configFile: test/config/multi_thread/config.yml
- name: multi-phase-batch
configFile: test/config/multi_phase/batch.yml
config:
# for batch tuner, maxTrialNum can not exceed length of search space
maxTrialNum: 2
trialConcurrency: 2
#########################################################################
# nni assessor test
#########################################################################
......
......@@ -30,7 +30,8 @@
"argsIgnorePattern": "^_"
}
],
"@typescript-eslint/no-var-requires": 0
"@typescript-eslint/no-var-requires": 0,
"@typescript-eslint/no-non-null-assertion": 0
},
"ignorePatterns": [
"node_modules/",
......
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
'use strict';
import * as assert from 'assert';
export interface TrainingServiceConfig {
platform: string;
}
/* Local */
export interface LocalConfig extends TrainingServiceConfig {
platform: 'local';
useActiveGpu?: boolean;
maxTrialNumberPerGpu: number;
gpuIndices?: number[];
}
/* Remote */
export interface RemoteMachineConfig {
host: string;
port: number;
user: string;
password?: string;
sshKeyFile: string;
sshPassphrase?: string;
useActiveGpu: boolean;
maxTrialNumberPerGpu: number;
gpuIndices?: number[];
pythonPath?: string;
}
export interface RemoteConfig extends TrainingServiceConfig {
platform: 'remote';
reuseMode: boolean;
machineList: RemoteMachineConfig[];
}
/* OpenPAI */
export interface OpenpaiConfig extends TrainingServiceConfig {
platform: 'openpai';
host: string;
username: string;
token: string;
trialCpuNumber: number;
trialMemorySize: string;
storageConfigName: string;
dockerImage: string;
localStorageMountPoint: string;
containerStorageMountPoint: string;
reuseMode: boolean;
openpaiConfig?: object;
}
/* AML */
export interface AmlConfig extends TrainingServiceConfig {
platform: 'aml';
subscriptionId: string;
resourceGroup: string;
workspaceName: string;
computeTarget: string;
dockerImage: string;
}
/* Kubeflow */
// FIXME: merge with shared storage config
export interface KubeflowStorageConfig {
storage: string;
server?: string;
path?: string;
azureAccount?: string;
azureShare?: string;
keyVault?: string;
keyVaultSecret?: string;
}
export interface KubeflowRoleConfig {
replicas: number;
command: string;
gpuNumber: number;
cpuNumber: number;
memorySize: string;
dockerImage: string;
}
export interface KubeflowConfig extends TrainingServiceConfig {
platform: 'kubeflow';
operator: string;
apiVersion: string;
storage: KubeflowStorageConfig;
worker: KubeflowRoleConfig;
parameterServer?: KubeflowRoleConfig;
}
/* FrameworkController */
type FrameworkControllerStorageConfig = KubeflowStorageConfig;
export interface FrameworkControllerRoleConfig {
name: string;
dockerImage: string;
taskNumber: number;
command: string;
gpuNumber: number;
cpuNumber: number;
memorySize: string;
attemptCompletionMinFailedTasks: number;
attemptCompletionMinSucceededTasks: number;
}
export interface FrameworkControllerConfig extends TrainingServiceConfig {
platform: 'frameworkcontroller';
serviceAccountName: string;
storage: FrameworkControllerStorageConfig;
taskRoles: FrameworkControllerRoleConfig[];
}
/* shared storage */
export interface SharedStorageConfig {
storageType: string;
localMountPoint: string;
remoteMountPoint: string;
localMounted: string;
}
export interface NfsConfig extends SharedStorageConfig {
storageType: 'NFS';
nfsServer: string;
exportedDirectory: string;
}
export interface AzureBlobConfig extends SharedStorageConfig {
storageAccountName: string;
storageAccountKey?: string;
resourceGroupName?: string;
containerName: string;
}
/* common */
export interface AlgorithmConfig {
name?: string;
className?: string;
codeDirectory?: string;
classArgs?: object;
}
export interface ExperimentConfig {
experimentName?: string;
searchSpace: any;
trialCommand: string;
trialCodeDirectory: string;
trialConcurrency: number;
trialGpuNumber?: number;
maxExperimentDuration?: string;
maxTrialNumber?: number;
nniManagerIp?: string;
//useAnnotation: boolean; // handled inside nnictl
debug: boolean;
logLevel?: string;
experimentWorkingDirectory?: string;
tunerGpuIndices?: number[];
tuner?: AlgorithmConfig;
assessor?: AlgorithmConfig;
advisor?: AlgorithmConfig;
trainingService: TrainingServiceConfig | TrainingServiceConfig[];
sharedStorage?: SharedStorageConfig;
deprecated?: any; // configs that are not yet natively supported by v2 (workaround)
}
/* util functions */
const timeUnits = { d: 24 * 3600, h: 3600, m: 60, s: 1 };
export function toSeconds(time: string): number {
for (const [unit, factor] of Object.entries(timeUnits)) {
if (time.toLowerCase().endsWith(unit)) {
const digits = time.slice(0, -1);
return Number(digits) * factor;
}
}
throw new Error(`Bad time string "${time}"`);
}
const sizeUnits = { tb: 1024 * 1024, gb: 1024, mb: 1, kb: 1 / 1024 };
export function toMegaBytes(size: string): number {
for (const [unit, factor] of Object.entries(sizeUnits)) {
if (size.toLowerCase().endsWith(unit)) {
const digits = size.slice(0, -2);
return Math.floor(Number(digits) * factor);
}
}
throw new Error(`Bad size string "${size}"`);
}
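Quick sanity examples for the two parsers above (illustrative only):

toSeconds('1h');       // 3600
toSeconds('30m');      // 1800
toMegaBytes('8gb');    // 8192
toMegaBytes('512mb');  // 512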
export function toCudaVisibleDevices(gpuIndices?: number[]): string {
return gpuIndices === undefined ? '' : gpuIndices.join(',');
}
export function flattenConfig<T>(config: ExperimentConfig, platform: string): T {
const flattened = { };
Object.assign(flattened, config);
if (Array.isArray(config.trainingService)) {
for (const trainingService of config.trainingService) {
if (trainingService.platform === platform) {
Object.assign(flattened, trainingService);
}
}
} else {
assert(config.trainingService.platform === platform);
Object.assign(flattened, config.trainingService);
}
return <T>flattened;
}
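An illustrative use of flattenConfig with an invented local experiment config; the matching platform's fields are merged into the top level:

const exampleConfig: ExperimentConfig = {
    searchSpace: { },
    trialCommand: 'python3 train.py',
    trialCodeDirectory: '.',
    trialConcurrency: 1,
    debug: false,
    trainingService: { platform: 'local', maxTrialNumberPerGpu: 1 } as LocalConfig
};
const local = flattenConfig<LocalConfig>(exampleConfig, 'local');
// local.maxTrialNumberPerGpu === 1, while trialCommand etc. remain accessible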
......@@ -17,8 +17,14 @@ const INFO: number = 4;
const DEBUG: number = 5;
const TRACE: number = 6;
const logLevelNameMap: Map<string, number> = new Map([['fatal', FATAL],
['error', ERROR], ['warning', WARNING], ['info', INFO], ['debug', DEBUG], ['trace', TRACE]]);
const logLevelNameMap: Map<string, number> = new Map([
['fatal', FATAL],
['error', ERROR],
['warning', WARNING],
['info', INFO],
['debug', DEBUG],
['trace', TRACE]
]);
class BufferSerialEmitter {
private buffer: Buffer;
......
......@@ -5,6 +5,7 @@
import { MetricDataRecord, MetricType, TrialJobInfo } from './datastore';
import { TrialJobStatus, LogType } from './trainingService';
import { ExperimentConfig } from './experimentConfig';
type ProfileUpdateType = 'TRIAL_CONCURRENCY' | 'MAX_EXEC_DURATION' | 'SEARCH_SPACE' | 'MAX_TRIAL_NUM';
type ExperimentStatus = 'INITIALIZED' | 'RUNNING' | 'ERROR' | 'STOPPING' | 'STOPPED' | 'DONE' | 'NO_MORE_TRIAL' | 'TUNER_NO_MORE_TRIAL';
......@@ -13,58 +14,12 @@ namespace ExperimentStartUpMode {
export const RESUME = 'resume';
}
interface ExperimentParams {
authorName: string;
experimentName: string;
description?: string;
trialConcurrency: number;
maxExecDuration: number; //seconds
maxTrialNum: number;
searchSpace: string;
trainingServicePlatform: string;
multiPhase?: boolean;
multiThread?: boolean;
versionCheck?: boolean;
logCollection?: string;
tuner?: {
className?: string;
builtinTunerName?: string;
codeDir?: string;
classArgs?: any;
classFileName?: string;
checkpointDir: string;
includeIntermediateResults?: boolean;
gpuIndices?: string;
};
assessor?: {
className?: string;
builtinAssessorName?: string;
codeDir?: string;
classArgs?: any;
classFileName?: string;
checkpointDir: string;
};
advisor?: {
className?: string;
builtinAdvisorName?: string;
codeDir?: string;
classArgs?: any;
classFileName?: string;
checkpointDir: string;
gpuIndices?: string;
};
clusterMetaData?: {
key: string;
value: string;
}[];
}
interface ExperimentProfile {
params: ExperimentParams;
params: ExperimentConfig;
id: string;
execDuration: number;
logDir?: string;
startTime?: number;
logDir: string;
startTime: number;
endTime?: number;
nextSequenceId: number;
revision: number;
......@@ -81,7 +36,7 @@ interface NNIManagerStatus {
}
abstract class Manager {
public abstract startExperiment(experimentParams: ExperimentParams): Promise<string>;
public abstract startExperiment(experimentConfig: ExperimentConfig): Promise<string>;
public abstract resumeExperiment(readonly: boolean): Promise<void>;
public abstract stopExperiment(): Promise<void>;
public abstract stopExperimentTopHalf(): Promise<void>;
......@@ -113,4 +68,4 @@ abstract class Manager {
public abstract fetchTrialOutput(trialJobId: string, subpath: string): Promise<void>;
}
export { Manager, ExperimentParams, ExperimentProfile, TrialJobStatistics, ProfileUpdateType, NNIManagerStatus, ExperimentStatus, ExperimentStartUpMode };
export { Manager, ExperimentConfig, ExperimentProfile, TrialJobStatistics, ProfileUpdateType, NNIManagerStatus, ExperimentStatus, ExperimentStartUpMode };