Unverified commit 298eeffb, authored by liuzhe-lz and committed by GitHub
Browse files

Fix bugs for hybrid and k8s modes (#4409)

parent 2a722cf7
......@@ -176,12 +176,16 @@ def to_v2(v1):
_move_field(v1_role, v2_role, 'memoryMB', 'memorySize')
_move_field(v1_role, v2_role, 'image', 'dockerImage')
_deprecate(v1_role, v2, 'privateRegistryAuthPath')
v2_role['codeDirectory'] = v2['trialCodeDirectory']
if v1_role:
_logger.error('kubeflow role not fully converted: %s', v1_role)
if platform == 'frameworkcontroller':
fc_config = v1.pop('frameworkcontroller')
_deprecate(fc_config, v2, 'serviceAccountName')
fc_config = v1.pop('frameworkcontrollerConfig')
_move_field(fc_config, ts, 'serviceAccountName')
_move_field(fc_config, ts, 'reuse', 'reuseMode')
storage_name = fc_config.pop('storage', None)
if storage_name is None:
......@@ -219,9 +223,22 @@ def to_v2(v1):
_move_field(v1_role, v2_role, 'memoryMB', 'memorySize')
_move_field(v1_role, v2_role, 'image', 'dockerImage')
_deprecate(v1_role, v2, 'privateRegistryAuthPath')
policy = 'frameworkAttemptCompletionPolicy'
if v1_role[policy]:
v2_role[policy] = {}
_move_field(v1_role[policy], v2_role[policy], 'minFailedTaskCount')
_move_field(v1_role[policy], v2_role[policy], 'minSucceededTaskCount', 'minSucceedTaskCount')
if not v1_role[policy]:
v1_role.pop(policy)
if v1_role:
_logger.error('frameworkcontroller role not fully converted: %s', v1_role)
# this is required, seems a bug in nni manager
if not v2.get('trialCommand'):
v2['trialCommand'] = v2_role['command']
# hybrid mode should always use v2 schema, so no need to handle here
v1_storage = v1.pop('sharedStorage', None)
......
......@@ -46,3 +46,9 @@ class FrameworkControllerConfig(TrainingServiceConfig):
service_account_name: Optional[str]
task_roles: List[FrameworkControllerRoleConfig]
reuse_mode: Optional[bool] = True
def _canonicalize(self, parents):
    """Canonicalize the config, then default an unset ``trial_command``.

    The parent class canonicalization runs first. Afterwards, a
    ``trial_command`` that is still ``None`` is replaced by an empty string.
    """
    super()._canonicalize(parents)
    if self.trial_command is not None:
        return
    # Framework controller does not need this field; the empty string is
    # only there to satisfy the type check.
    self.trial_command = ''
......@@ -44,6 +44,14 @@ class KubeflowConfig(TrainingServiceConfig):
master: Optional[KubeflowRoleConfig] = None
reuse_mode: Optional[bool] = True #set reuse mode as true for v2 config
def _canonicalize(self, parents):
    """Canonicalize the config, then default the unset trial fields.

    The parent class canonicalization runs first. Afterwards,
    ``trial_command`` and ``trial_code_directory`` are each replaced by an
    empty string if they are still ``None``.
    """
    super()._canonicalize(parents)
    # Kubeflow does not need these fields; the empty strings are only there
    # to satisfy the type check.
    for unused_field in ('trial_command', 'trial_code_directory'):
        if getattr(self, unused_field) is None:
            setattr(self, unused_field, '')
def _validate_canonical(self):
super()._validate_canonical()
assert self.operator in ['tf-operator', 'pytorch-operator']
......@@ -253,14 +253,17 @@ def _save_experiment_information(experiment_id: str, port: int, start_time: int,
def get_stopped_experiment_config(exp_id, exp_dir=None):
    """Load the config of a stopped experiment as an ``ExperimentConfig``.

    The raw config is fetched with ``get_stopped_experiment_config_json``.
    When ``exp_dir`` is given and does not refer to the same location as the
    working directory recorded in the config, a warning is logged and the
    config's working directory is overridden with ``exp_dir``.

    Returns ``None`` when no raw config is available; the JSON helper logs an
    error before returning ``None`` for an unknown or non-stopped experiment.
    """
    config_json = get_stopped_experiment_config_json(exp_id, exp_dir)
    if config_json is None:
        # ``ExperimentConfig(**None)`` would raise a TypeError; hand the
        # failure back to the caller as ``None`` instead.
        return None
    config = ExperimentConfig(**config_json)
    if exp_dir and not os.path.samefile(exp_dir, config.experiment_working_directory):
        msg = 'Experiment working directory provided in command line (%s) is different from experiment config (%s)'
        _logger.warning(msg, exp_dir, config.experiment_working_directory)
        # The directory given on the command line takes precedence.
        config.experiment_working_directory = exp_dir
    return config
def get_stopped_experiment_config_json(exp_id, exp_dir=None):
if exp_dir:
return Config(exp_id, exp_dir).get_config()
else:
update_experiment()
experiments_config = Experiments()
......@@ -268,9 +271,8 @@ def get_stopped_experiment_config(exp_id, exp_dir=None):
experiment_metadata = experiments_dict.get(exp_id)
if experiment_metadata is None:
_logger.error('Id %s not exist!', exp_id)
return
return None
if experiment_metadata['status'] != 'STOPPED':
_logger.error('Only stopped experiments can be resumed or viewed!')
return
experiment_config = Config(exp_id, experiment_metadata['logDir']).get_config()
return ExperimentConfig(**experiment_config)
return None
return Config(exp_id, experiment_metadata['logDir']).get_config()
......@@ -357,7 +357,8 @@ kubeflow_config_schema = {
'nfs': {
'server': setType('server', str),
'path': setType('path', str)
}
},
Optional('reuse'): setType('reuse', bool),
}, {
'operator': setChoice('operator', 'tf-operator', 'pytorch-operator'),
'apiVersion': setType('apiVersion', str),
......@@ -374,7 +375,8 @@ kubeflow_config_schema = {
'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),
error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
},
Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999)
Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999),
Optional('reuse'): setType('reuse', bool),
})
}
......@@ -408,12 +410,14 @@ frameworkcontroller_config_schema = {
},
Optional('namespace'): setType('namespace', str),
Optional('configPath'): setType('configPath', str),
Optional('reuse'): setType('reuse', bool),
}, {
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage', 'pvc'),
Optional('serviceAccountName'): setType('serviceAccountName', str),
'configPath': setType('configPath', str),
'pvc': {'path': setType('server', str)},
Optional('namespace'): setType('namespace', str),
Optional('reuse'): setType('reuse', bool),
}, {
Optional('storage'): setChoice('storage', 'nfs', 'azureStorage', 'pvc'),
Optional('serviceAccountName'): setType('serviceAccountName', str),
......@@ -432,6 +436,7 @@ frameworkcontroller_config_schema = {
Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999),
Optional('namespace'): setType('namespace', str),
Optional('configPath'): setType('configPath', str),
Optional('reuse'): setType('reuse', bool),
})
}
......
......@@ -12,6 +12,10 @@ from nni.experiment import Experiment, RunMode
from nni.experiment.config import ExperimentConfig, convert, utils
from nni.tools.annotation import expand_annotations, generate_search_space
# used for v1-only legacy setup, remove them later
from nni.experiment.launcher import get_stopped_experiment_config_json
from . import legacy_launcher
def create_experiment(args):
# to make it clear what are inside args
config_file = Path(args.config)
......@@ -28,12 +32,18 @@ def create_experiment(args):
config_content = yaml.safe_load(config)
v1_platform = config_content.get('trainingServicePlatform')
if v1_platform:
can_convert = True
if v1_platform == 'adl':
from . import legacy_launcher
can_convert = False
if v1_platform in ['kubeflow', 'frameworkcontroller']:
reuse = config_content.get(v1_platform + 'Config', {}).get('reuse')
can_convert = (reuse != False) # if user does not explicitly specify it, convert to reuse mode
if not can_convert:
legacy_launcher.create_experiment(args)
exit()
if v1_platform:
try:
v2_config = convert.to_v2(config_content)
except Exception:
......@@ -42,12 +52,14 @@ def create_experiment(args):
exit(1)
print(Fore.YELLOW + f'WARNING: You are using legacy config file, please update it to latest format:' + Fore.RESET)
print(Fore.YELLOW + '=' * 80 + Fore.RESET)
print(yaml.dump(v2_config).strip())
print(yaml.dump(v2_config, sort_keys=False).strip())
print(Fore.YELLOW + '=' * 80 + Fore.RESET)
print(Fore.YELLOW + 'Reference: https://nni.readthedocs.io/en/stable/reference/experiment_config.html' + Fore.RESET)
utils.set_base_path(config_file.parent)
config = ExperimentConfig(**v2_config)
utils.unset_base_path()
else:
config = ExperimentConfig.load(config_file)
......@@ -73,6 +85,11 @@ def resume_experiment(args):
foreground = args.foreground
exp_dir = args.experiment_dir
config_json = get_stopped_experiment_config_json(exp_id, exp_dir)
if config_json.get('trainingServicePlatform'):
legacy_launcher.resume_experiment(args)
exit()
exp = Experiment._resume(exp_id, exp_dir)
run_mode = RunMode.Foreground if foreground else RunMode.Detach
exp.start(port, debug, run_mode)
......@@ -82,5 +99,10 @@ def view_experiment(args):
port = args.port
exp_dir = args.experiment_dir
config_json = get_stopped_experiment_config_json(exp_id, exp_dir)
if config_json.get('trainingServicePlatform'):
legacy_launcher.view_experiment(args)
exit()
exp = Experiment._view(exp_id, exp_dir)
exp.start(port, run_mode=RunMode.Detach)
......@@ -50,6 +50,7 @@ def update_training_service_config(args):
config[args.ts]['kubeflowConfig']['azureStorage']['azureShare'] = args.azs_share
if args.nni_docker_image is not None:
config[args.ts]['trial']['worker']['image'] = args.nni_docker_image
config[args.ts]['kubeflowConfig']['reuse'] = False
elif args.ts == 'kubeflow' and args.reuse_mode == 'True':
config = get_yml_content(TRAINING_SERVICE_FILE_V2)
config[args.ts]['trainingService']['worker']['dockerImage'] = args.nni_docker_image
......@@ -74,6 +75,7 @@ def update_training_service_config(args):
config[args.ts]['frameworkcontrollerConfig']['azureStorage']['azureShare'] = args.azs_share
if args.nni_docker_image is not None:
config[args.ts]['trial']['taskRoles'][0]['image'] = args.nni_docker_image
config[args.ts]['frameworkcontrollerConfig']['reuse'] = False
elif args.ts == 'frameworkcontroller' and args.reuse_mode == 'True':
config = get_yml_content(TRAINING_SERVICE_FILE_V2)
config[args.ts]['trainingService']['taskRoles'][0]['dockerImage'] = args.nni_docker_image
......
......@@ -5,38 +5,38 @@ import { RemoteEnvironmentService } from './remoteEnvironmentService';
import { KubeflowEnvironmentService } from './kubernetes/kubeflowEnvironmentService';
import { FrameworkControllerEnvironmentService } from './kubernetes/frameworkcontrollerEnvironmentService';
import { EnvironmentService } from '../environment';
import { ExperimentConfig } from 'common/experimentConfig';
import { TrainingServiceConfig } from 'common/experimentConfig';
import { ExperimentStartupInfo } from 'common/experimentStartupInfo';
import { getCustomEnvironmentServiceConfig } from 'common/nniConfig';
import { importModule } from 'common/utils';
import { DlcEnvironmentService } from './dlcEnvironmentService';
/**
 * Create the environment service for a single training service config.
 *
 * Built-in platforms are selected by `config.platform`. Any other platform is
 * looked up with `getCustomEnvironmentServiceConfig` and its class is loaded
 * through `importModule`.
 *
 * @param config training service config whose `platform` selects the service
 * @returns the constructed environment service
 * @throws Error when the platform is not built in and the custom lookup yields null
 */
export async function createEnvironmentService(config: TrainingServiceConfig): Promise<EnvironmentService> {
    const startupInfo = ExperimentStartupInfo.getInstance();
    // environment services have different config types, skip type check
    const untypedConfig: any = config;
    const platform = config.platform;

    if (platform === 'local') {
        return new LocalEnvironmentService(untypedConfig, startupInfo);
    } else if (platform === 'remote') {
        return new RemoteEnvironmentService(untypedConfig, startupInfo);
    } else if (platform === 'aml') {
        return new AMLEnvironmentService(untypedConfig, startupInfo);
    } else if (platform === 'openpai') {
        return new OpenPaiEnvironmentService(untypedConfig, startupInfo);
    } else if (platform === 'kubeflow') {
        return new KubeflowEnvironmentService(untypedConfig, startupInfo);
    } else if (platform === 'frameworkcontroller') {
        return new FrameworkControllerEnvironmentService(untypedConfig, startupInfo);
    } else if (platform === 'dlc') {
        return new DlcEnvironmentService(untypedConfig, startupInfo);
    }

    // Not a built-in platform: fall back to a custom environment service.
    const customConfig = await getCustomEnvironmentServiceConfig(platform);
    if (customConfig === null) {
        throw new Error(`${platform} is not a supported training service!`);
    }
    const customModule = importModule(customConfig.nodeModulePath);
    const CustomService = customModule[customConfig.nodeClassName] as any;
    return new CustomService(untypedConfig, startupInfo);
}
......@@ -125,7 +125,7 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment
return `${portScript} . /mnt/frameworkbarrier/injector.sh && ${command}`;
}
private async prepareFrameworkControllerConfig(trialJobId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string):
private async prepareFrameworkControllerConfig(envId: string, trialWorkingFolder: string, frameworkcontrollerJobName: string):
Promise<any> {
const podResources: any = [];
for (const taskRole of this.config.taskRoles) {
......@@ -136,7 +136,7 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment
}
// Generate frameworkcontroller job resource config object
const frameworkcontrollerJobConfig: any =
await this.generateFrameworkControllerJobConfig(trialJobId, trialWorkingFolder, frameworkcontrollerJobName, podResources);
await this.generateFrameworkControllerJobConfig(envId, trialWorkingFolder, frameworkcontrollerJobName, podResources);
return Promise.resolve(frameworkcontrollerJobConfig);
}
......@@ -160,7 +160,7 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment
* @param frameworkcontrollerJobName job name
* @param podResources pod template
*/
private async generateFrameworkControllerJobConfig(trialJobId: string, trialWorkingFolder: string,
private async generateFrameworkControllerJobConfig(envId: string, trialWorkingFolder: string,
frameworkcontrollerJobName: string, podResources: any): Promise<any> {
const taskRoles: any = [];
......@@ -198,7 +198,7 @@ export class FrameworkControllerEnvironmentService extends KubernetesEnvironment
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: this.experimentId,
trialId: trialJobId
envId: envId
}
},
spec: {
......
......@@ -119,7 +119,7 @@ class TrialDispatcher implements TrainingService {
await validateCodeDir(config.trialCodeDirectory);
const serviceConfigs = Array.isArray(config.trainingService) ? config.trainingService : [ config.trainingService ];
const servicePromises = serviceConfigs.map(serviceConfig => createEnvironmentService(serviceConfig.platform, config));
const servicePromises = serviceConfigs.map(serviceConfig => createEnvironmentService(serviceConfig));
this.environmentServiceList = await Promise.all(servicePromises);
this.environmentMaintenceLoopInterval = Math.max(
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment