"src/include/amd_inline_asm.hip.hpp" did not exist on "dabfa77fc68bdbcfc2d10cdd515605c053cf1ed0"
Unverified Commit b177bdc8 authored by QuanluZhang, committed by GitHub

support hybrid training service v2.0 config (#3251)

parent 6330df2f
@@ -26,7 +26,6 @@ remoteConfig:
   reuse: true
 machineList:
   - ip: 10.1.1.1
-    username: bob
-    passwd: bob123
-    #port can be skip if using default ssh port 22
-    #port: 22
+    username: xxx
+    passwd: xxx
+    port: 22
\ No newline at end of file
experimentName: example_mnist
trialConcurrency: 3
maxExperimentDuration: 1h
maxTrialNumber: 10
searchSpaceFile: search_space.json
trialCodeDirectory: .
trialCommand: python3 mnist.py
trialGpuNumber: 0
tuner:
name: TPE
classArgs:
optimize_mode: maximize
trainingService:
- platform: local
- platform: remote
reuseMode: true
machineList:
- host: 10.1.1.1
user: xxx
password: xxx
    #port can be skipped if using default ssh port 22
port: 22
# FIXME: For demonstration only. It should not be here
from pathlib import Path
from nni.experiment import Experiment
from nni.experiment import RemoteMachineConfig
from nni.algorithms.hpo.hyperopt_tuner import HyperoptTuner
tuner = HyperoptTuner('tpe')
search_space = {
"dropout_rate": { "_type": "uniform", "_value": [0.5, 0.9] },
"conv_size": { "_type": "choice", "_value": [2, 3, 5, 7] },
"hidden_size": { "_type": "choice", "_value": [124, 512, 1024] },
"batch_size": { "_type": "choice", "_value": [16, 32] },
"learning_rate": { "_type": "choice", "_value": [0.0001, 0.001, 0.01, 0.1] }
}
experiment = Experiment(tuner, ['local', 'remote'])
experiment.config.experiment_name = 'test'
experiment.config.trial_concurrency = 3
experiment.config.max_trial_number = 10
experiment.config.search_space = search_space
experiment.config.trial_command = 'python3 mnist.py'
experiment.config.trial_code_directory = Path(__file__).parent
experiment.config.training_service[0].use_active_gpu = True
experiment.config.training_service[1].reuse_mode = True
rm_conf = RemoteMachineConfig()
rm_conf.host = '10.1.1.1'
rm_conf.user = 'xxx'
rm_conf.password = 'xxx'
rm_conf.port = 22
experiment.config.training_service[1].machine_list = [rm_conf]
experiment.run(26780, debug=True)
@@ -65,15 +65,19 @@ class ExperimentConfig(ConfigBase):
     tuner: Optional[_AlgorithmConfig] = None
     accessor: Optional[_AlgorithmConfig] = None
     advisor: Optional[_AlgorithmConfig] = None
-    training_service: TrainingServiceConfig
+    training_service: Union[TrainingServiceConfig, List[TrainingServiceConfig]]

-    def __init__(self, training_service_platform: Optional[str] = None, **kwargs):
+    def __init__(self, training_service_platform: Optional[Union[str, List[str]]] = None, **kwargs):
         kwargs = util.case_insensitive(kwargs)
         if training_service_platform is not None:
             assert 'trainingservice' not in kwargs
-            kwargs['trainingservice'] = util.training_service_config_factory(training_service_platform)
-        elif isinstance(kwargs.get('trainingservice'), dict):
-            kwargs['trainingservice'] = util.training_service_config_factory(**kwargs['trainingservice'])
+            kwargs['trainingservice'] = util.training_service_config_factory(platform = training_service_platform)
+        elif isinstance(kwargs.get('trainingservice'), (dict, list)):
+            # dict means a single training service
+            # list means hybrid training service
+            kwargs['trainingservice'] = util.training_service_config_factory(config = kwargs['trainingservice'])
+        else:
+            raise RuntimeError('Unsupported Training service configuration!')
         super().__init__(**kwargs)

     def validate(self, initialized_tuner: bool = False) -> None:
......
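Note: a minimal sketch of how the widened constructor can now be driven, for orientation only (the import path and attribute access are assumptions inferred from this hunk, not part of the patch):

from nni.experiment.config import ExperimentConfig  # assumed import path

# A single platform behaves as before and yields one TrainingServiceConfig.
single = ExperimentConfig('local')

# A list of platforms is treated as a hybrid training service; the
# training_service field then holds a list of TrainingServiceConfig objects.
hybrid = ExperimentConfig(['local', 'remote'])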
@@ -18,8 +18,20 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
     data = config.json()

     ts = data.pop('trainingService')
-    if ts['platform'] == 'openpai':
-        ts['platform'] = 'pai'
+    if isinstance(ts, list):
+        hybrid_names = []
+        for conf in ts:
+            if conf['platform'] == 'openpai':
+                conf['platform'] = 'pai'
+            hybrid_names.append(conf['platform'])
+            _handle_training_service(conf, data)
+        data['trainingServicePlatform'] = 'hybrid'
+        data['hybridConfig'] = {'trainingServicePlatforms': hybrid_names}
+    else:
+        if ts['platform'] == 'openpai':
+            ts['platform'] = 'pai'
+        data['trainingServicePlatform'] = ts['platform']
+        _handle_training_service(ts, data)

     data['authorName'] = 'N/A'
     data['experimentName'] = data.get('experimentName', 'N/A')
@@ -27,7 +39,7 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
     if data['debug']:
         data['versionCheck'] = False
     data['maxTrialNum'] = data.pop('maxTrialNumber', 99999)
-    data['trainingServicePlatform'] = ts['platform']
     ss = data.pop('searchSpace', None)
     ss_file = data.pop('searchSpaceFile', None)
     if ss is not None:
@@ -66,6 +78,9 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
     if 'trialGpuNumber' in data:
         data['trial']['gpuNum'] = data.pop('trialGpuNumber')

+    return data
+
+def _handle_training_service(ts, data):
     if ts['platform'] == 'local':
         data['localConfig'] = {
             'useActiveGpu': ts.get('useActiveGpu', False),
@@ -140,8 +155,6 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
     elif ts['platform'] == 'adl':
         data['trial']['image'] = ts['dockerImage']

-    return data
-
 def _convert_gpu_indices(indices):
     return ','.join(str(idx) for idx in indices) if indices is not None else None
@@ -175,19 +188,34 @@ def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
     experiment_config = to_v1_yaml(config, skip_nnictl=True)
     ret = []
-    if config.training_service.platform == 'local':
+    if isinstance(config.training_service, list):
+        hybrid_conf = dict()
+        hybrid_conf['hybrid_config'] = experiment_config['hybridConfig']
+        for conf in config.training_service:
+            metadata = _get_cluster_metadata(conf.platform, experiment_config)
+            if metadata is not None:
+                hybrid_conf.update(metadata)
+        ret.append(hybrid_conf)
+    else:
+        metadata = _get_cluster_metadata(config.training_service.platform, experiment_config)
+        if metadata is not None:
+            ret.append(metadata)
+    if experiment_config.get('nniManagerIp') is not None:
+        ret.append({'nni_manager_ip': {'nniManagerIp': experiment_config['nniManagerIp']}})
+    ret.append({'trial_config': experiment_config['trial']})
+    return ret
+
+def _get_cluster_metadata(platform: str, experiment_config) -> Dict:
+    if platform == 'local':
         request_data = dict()
         request_data['local_config'] = experiment_config['localConfig']
         if request_data['local_config']:
             if request_data['local_config'].get('gpuIndices') and isinstance(request_data['local_config'].get('gpuIndices'), int):
                 request_data['local_config']['gpuIndices'] = str(request_data['local_config'].get('gpuIndices'))
-            if request_data['local_config'].get('maxTrialNumOnEachGpu'):
-                request_data['local_config']['maxTrialNumOnEachGpu'] = request_data['local_config'].get('maxTrialNumOnEachGpu')
-            if request_data['local_config'].get('useActiveGpu'):
-                request_data['local_config']['useActiveGpu'] = request_data['local_config'].get('useActiveGpu')
-        ret.append(request_data)
-    elif config.training_service.platform == 'remote':
+        return request_data
+    elif platform == 'remote':
         request_data = dict()
         if experiment_config.get('remoteConfig'):
             request_data['remote_config'] = experiment_config['remoteConfig']
@@ -198,31 +226,25 @@ def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
             for i in range(len(request_data['machine_list'])):
                 if isinstance(request_data['machine_list'][i].get('gpuIndices'), int):
                     request_data['machine_list'][i]['gpuIndices'] = str(request_data['machine_list'][i].get('gpuIndices'))
-        ret.append(request_data)
-    elif config.training_service.platform == 'openpai':
-        ret.append({'pai_config': experiment_config['paiConfig']})
-    elif config.training_service.platform == 'aml':
-        ret.append({'aml_config': experiment_config['amlConfig']})
-    elif config.training_service.platform == 'kubeflow':
-        ret.append({'kubeflow_config': experiment_config['kubeflowConfig']})
-    elif config.training_service.platform == 'frameworkcontroller':
-        ret.append({'frameworkcontroller_config': experiment_config['frameworkcontrollerConfig']})
-    elif config.training_service.platform == 'adl':
-        pass
+        return request_data
+    elif platform == 'openpai':
+        return {'pai_config': experiment_config['paiConfig']}
+    elif platform == 'aml':
+        return {'aml_config': experiment_config['amlConfig']}
+    elif platform == 'kubeflow':
+        return {'kubeflow_config': experiment_config['kubeflowConfig']}
+    elif platform == 'frameworkcontroller':
+        return {'frameworkcontroller_config': experiment_config['frameworkcontrollerConfig']}
+    elif platform == 'adl':
+        return None
     else:
-        raise RuntimeError('Unsupported training service ' + config.training_service.platform)
-
-    if experiment_config.get('nniManagerIp') is not None:
-        ret.append({'nni_manager_ip': {'nniManagerIp': experiment_config['nniManagerIp']}})
-    ret.append({'trial_config': experiment_config['trial']})
-    return ret
+        raise RuntimeError('Unsupported training service ' + platform)

 def to_rest_json(config: ExperimentConfig) -> Dict[str, Any]:
     experiment_config = to_v1_yaml(config, skip_nnictl=True)
......
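Note: as a rough illustration of the hybrid branch above, a v2 config listing ['local', 'remote'] converts to a v1-style dict of approximately the following shape (inferred from the hunks, not part of the patch; per-platform keys abbreviated):

# Illustrative only: approximate output of to_v1_yaml for a hybrid config.
data = {
    'trainingServicePlatform': 'hybrid',
    'hybridConfig': {'trainingServicePlatforms': ['local', 'remote']},
    # _handle_training_service fills the per-platform sections, e.g.
    # 'localConfig' for local and 'remoteConfig'/'machineList' for remote;
    # the usual v1 fields (authorName, experimentName, trial, ...) follow.
}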
@@ -18,7 +18,7 @@ class RemoteMachineConfig(ConfigBase):
     port: int = 22
     user: str
     password: Optional[str] = None
-    ssh_key_file: PathLike = '~/.ssh/id_rsa'
+    ssh_key_file: PathLike = None #'~/.ssh/id_rsa'
     ssh_passphrase: Optional[str] = None
     use_active_gpu: bool = False
     max_trial_number_per_gpu: int = 1
......
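Note: the practical effect of the new ssh_key_file default, sketched with the same helper class used in the launch example above (the usage pattern is an assumption, not part of the patch):

from nni.experiment import RemoteMachineConfig

rm = RemoteMachineConfig()
rm.host = '10.1.1.1'
rm.user = 'xxx'
# ssh_key_file now defaults to None, so password authentication is used
# unless a key file is set explicitly.
rm.password = 'xxx'
# rm.ssh_key_file = '~/.ssh/id_rsa'  # opt back in to key-based login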
@@ -8,7 +8,7 @@ Miscellaneous utility functions.
 import math
 import os.path
 from pathlib import Path
-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict, Optional, Union, List

 PathLike = Union[Path, str]
@@ -29,12 +29,26 @@ def canonical_path(path: Optional[PathLike]) -> Optional[str]:
 def count(*values) -> int:
     return sum(value is not None and value is not False for value in values)

-def training_service_config_factory(platform: str, **kwargs): # -> TrainingServiceConfig
+def training_service_config_factory(platform: Union[str, List[str]] = None, config: Union[List, Dict] = None): # -> TrainingServiceConfig
     from .common import TrainingServiceConfig
-    for cls in TrainingServiceConfig.__subclasses__():
-        if cls.platform == platform:
-            return cls(**kwargs)
-    raise ValueError(f'Unrecognized platform {platform}')
+    ts_configs = []
+    if platform is not None:
+        assert config is None
+        platforms = platform if isinstance(platform, list) else [platform]
+        for cls in TrainingServiceConfig.__subclasses__():
+            if cls.platform in platforms:
+                ts_configs.append(cls())
+        if len(ts_configs) < len(platforms):
+            raise RuntimeError('There is unrecognized platform!')
+    else:
+        assert config is not None
+        supported_platforms = {cls.platform: cls for cls in TrainingServiceConfig.__subclasses__()}
+        configs = config if isinstance(config, list) else [config]
+        for conf in configs:
+            if conf['platform'] not in supported_platforms:
+                raise RuntimeError(f'Unrecognized platform {conf["platform"]}')
+            ts_configs.append(supported_platforms[conf['platform']](**conf))
+    return ts_configs if len(ts_configs) > 1 else ts_configs[0]

 def load_config(Type, value):
     if isinstance(value, list):
......
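Note: a hedged sketch of how the reworked factory is expected to be called from the two call sites shown above (module path and dict contents are assumptions inferred from the hunk):

from nni.experiment.config import util  # assumed module path

# Platform-driven: one name returns a single config, a list returns a list.
local_only = util.training_service_config_factory(platform='local')
hybrid = util.training_service_config_factory(platform=['local', 'remote'])

# Config-driven: dicts already parsed from YAML/JSON are dispatched on
# their 'platform' key; a list of dicts yields a hybrid (list) result.
single = util.training_service_config_factory(config={'platform': 'local'})
both = util.training_service_config_factory(
    config=[{'platform': 'local'}, {'platform': 'remote'}])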
@@ -5,7 +5,7 @@ import socket
 from subprocess import Popen
 from threading import Thread
 import time
-from typing import Optional, overload
+from typing import Optional, Union, List, overload

 import colorama
 import psutil
@@ -54,7 +54,7 @@ class Experiment:
         ...

     @overload
-    def __init__(self, tuner: Tuner, training_service: str) -> None:
+    def __init__(self, tuner: Tuner, training_service: Union[str, List[str]]) -> None:
         """
         Prepare an experiment, leaving configuration fields to be set later.
@@ -86,7 +86,7 @@ class Experiment:
         self._dispatcher: Optional[MsgDispatcher] = None
         self._dispatcher_thread: Optional[Thread] = None

-        if isinstance(config, str):
+        if isinstance(config, (str, list)):
             config, training_service = None, config
         if config is None:
......
@@ -27,11 +27,13 @@ def start_experiment(exp_id: str, config: ExperimentConfig, port: int, debug: bo
     config.validate(initialized_tuner=True)
     _ensure_port_idle(port)
-    if config.training_service.platform == 'openpai':
-        _ensure_port_idle(port + 1, 'OpenPAI requires an additional port')
+    if isinstance(config.training_service, list): # hybrid training service
+        _ensure_port_idle(port + 1, 'Hybrid training service requires an additional port')
+    elif config.training_service.platform in ['remote', 'openpai', 'kubeflow', 'frameworkcontroller', 'adl']:
+        _ensure_port_idle(port + 1, f'{config.training_service.platform} requires an additional port')
     try:
-        _logger.info('Creating experiment %s', colorama.Fore.CYAN + exp_id + colorama.Style.RESET_ALL)
+        _logger.info('Creating experiment, Experiment ID: %s', colorama.Fore.CYAN + exp_id + colorama.Style.RESET_ALL)
         pipe = Pipe(exp_id)
         start_time, proc = _start_rest_server(config, port, debug, exp_id, pipe.path)
         _logger.info('Connecting IPC pipe...')
@@ -40,7 +42,8 @@ def start_experiment(exp_id: str, config: ExperimentConfig, port: int, debug: bo
         nni.runtime.protocol._out_file = pipe_file
         _logger.info('Statring web server...')
         _check_rest_server(port)
-        _save_experiment_information(exp_id, port, start_time, config.training_service.platform,
+        platform = 'hybrid' if isinstance(config.training_service, list) else config.training_service.platform
+        _save_experiment_information(exp_id, port, start_time, platform,
                                      config.experiment_name, proc.pid, config.experiment_working_directory)
         _logger.info('Setting up...')
         _init_experiment(config, port, debug)
@@ -66,9 +69,12 @@ def _ensure_port_idle(port: int, message: Optional[str] = None) -> None:

 def _start_rest_server(config: ExperimentConfig, port: int, debug: bool, experiment_id: str, pipe_path: str) -> Tuple[int, Popen]:
-    ts = config.training_service.platform
-    if ts == 'openpai':
-        ts = 'pai'
+    if isinstance(config.training_service, list):
+        ts = 'hybrid'
+    else:
+        ts = config.training_service.platform
+        if ts == 'openpai':
+            ts = 'pai'

     args = {
         'port': port,
......
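Note: the platform label handed to the REST server after this change can be summarized by the following purely illustrative helper, which mirrors the branch added in _start_rest_server (the function itself is hypothetical and not part of the patch):

def _platform_label(training_service):
    # Hypothetical helper, for illustration only: a hybrid (list) config is
    # labelled 'hybrid'; otherwise the single platform name is used, with
    # 'openpai' mapped to 'pai' as before.
    if isinstance(training_service, list):
        return 'hybrid'
    name = training_service.platform
    return 'pai' if name == 'openpai' else name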
@@ -46,7 +46,7 @@ class RetiariiExeConfig(ConfigBase):
         super().__init__(**kwargs)
         if training_service_platform is not None:
             assert 'training_service' not in kwargs
-            self.training_service = util.training_service_config_factory(training_service_platform)
+            self.training_service = util.training_service_config_factory(platform = training_service_platform)

     def validate(self, initialized_tuner: bool = False) -> None:
         super().validate()
......
@@ -607,7 +607,7 @@ def create_experiment(args):
     try:
         validate_all_content(experiment_config, config_path)
     except Exception as e:
-        print_error(f'Config validation failed. {repr(e)}')
+        print_error(f'Config in v1 format validation failed. {repr(e)}')
         exit(1)
     nni_config.set_config('experimentConfig', experiment_config)
......