Unverified Commit 36dbc0fe authored by SparkSnail, committed by GitHub

support frameworkcontroller training service (#484)

Add frameworkcontroller training service based on the kubeflow training service.
Refactor the code structure: add a kubernetes training service as the parent class, and make the kubeflow training service and frameworkcontroller training service its child classes.
parent 416b8b53
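The refactor can be sketched roughly as follows (a minimal illustration only, not code from this commit; the FrameworkControllerTrainingService class name and the comments are assumptions based on the commit message, and all method bodies are omitted):

abstract class KubernetesTrainingService {
    // shared logic: trial bookkeeping, NFS/Azure storage setup,
    // run-script generation, trial cancellation and cleanup
}

class KubeflowTrainingService extends KubernetesTrainingService {
    // kubeflow-specific CRD handling and job submission
}

class FrameworkControllerTrainingService extends KubernetesTrainingService {
    // frameworkcontroller-specific CRD handling and job submission
}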
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
'use strict';
import * as cpp from 'child-process-promise';
import * as path from 'path';
import { EventEmitter } from 'events';
import { getExperimentId, getInitTrialSequenceId } from '../../common/experimentStartupInfo';
import { getLogger, Logger } from '../../common/log';
import { getExperimentRootDir, uniqueString, getJobCancelStatus, getIPV4Address } from '../../common/utils';
import {
TrialJobDetail, TrialJobMetric, NNIManagerIpConfig
} from '../../common/trainingService';
import { KubernetesTrialJobDetail, KubernetesScriptFormat } from './kubernetesData';
import { KubernetesClusterConfig } from './kubernetesConfig';
import { GeneralK8sClient, KubernetesCRDClient } from './kubernetesApiClient';
import { AzureStorageClientUtility } from './azureStorageClientUtils';
import { KubernetesJobRestServer } from './kubernetesJobRestServer';
import { String } from 'typescript-string-operations';
import * as azureStorage from 'azure-storage';
const azure = require('azure-storage');
const base64 = require('js-base64').Base64;
abstract class KubernetesTrainingService {
protected readonly NNI_KUBERNETES_TRIAL_LABEL: string = 'nni-kubernetes-trial';
protected readonly log!: Logger;
protected readonly metricsEmitter: EventEmitter;
protected readonly trialJobsMap: Map<string, KubernetesTrialJobDetail>;
/** experiment root dir in NFS */
protected readonly trialLocalNFSTempFolder: string;
protected stopping: boolean = false;
protected experimentId! : string;
protected nextTrialSequenceId: number;
protected kubernetesRestServerPort?: number;
protected readonly CONTAINER_MOUNT_PATH: string;
protected azureStorageClient?: azureStorage.FileService;
protected azureStorageShare?: string;
protected azureStorageSecretName?: string;
protected azureStorageAccountName?: string;
protected nniManagerIpConfig?: NNIManagerIpConfig;
protected readonly genericK8sClient: GeneralK8sClient;
protected kubernetesCRDClient?: KubernetesCRDClient;
protected kubernetesJobRestServer?: KubernetesJobRestServer;
protected kubernetesClusterConfig?: KubernetesClusterConfig;
constructor() {
this.log = getLogger();
this.metricsEmitter = new EventEmitter();
this.trialJobsMap = new Map<string, KubernetesTrialJobDetail>();
this.trialLocalNFSTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp');
this.experimentId = getExperimentId();
this.nextTrialSequenceId = -1;
this.CONTAINER_MOUNT_PATH = '/tmp/mount';
this.genericK8sClient = new GeneralK8sClient();
}
public generatePodResource(memory: number, cpuNum: number, gpuNum: number) {
return {
'memory': `${memory}Mi`,
'cpu': `${cpuNum}`,
'nvidia.com/gpu': `${gpuNum}`
}
}
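// Illustrative example (not part of the original source): generatePodResource(8192, 4, 1)
// returns { memory: '8192Mi', cpu: '4', 'nvidia.com/gpu': '1' }, which concrete training
// services can use as the resource requests/limits section of a trial's pod spec.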
public listTrialJobs(): Promise<TrialJobDetail[]> {
const jobs: TrialJobDetail[] = [];
this.trialJobsMap.forEach(async (value: KubernetesTrialJobDetail, key: string) => {
if (value.form.jobType === 'TRIAL') {
jobs.push(await this.getTrialJob(key));
}
});
return Promise.resolve(jobs);
}
public getTrialJob(trialJobId: string): Promise<TrialJobDetail> {
const kubernetesTrialJob: TrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if (!kubernetesTrialJob) {
return Promise.reject(`trial job ${trialJobId} not found`)
}
return Promise.resolve(kubernetesTrialJob);
}
public addTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) {
this.metricsEmitter.on('metric', listener);
}
public removeTrialJobMetricListener(listener: (metric: TrialJobMetric) => void) {
this.metricsEmitter.off('metric', listener);
}
public get isMultiPhaseJobSupported(): boolean {
return false;
}
public getClusterMetadata(key: string): Promise<string> {
return Promise.resolve('');
}
public get MetricsEmitter() : EventEmitter {
return this.metricsEmitter;
}
protected generateSequenceId(): number {
if (this.nextTrialSequenceId === -1) {
this.nextTrialSequenceId = getInitTrialSequenceId();
}
return this.nextTrialSequenceId++;
}
protected async createAzureStorage(vaultName: string, vaultKeyName: string, accountName: string, azureShare: string): Promise<void> {
try {
const result = await cpp.exec(`az keyvault secret show --name ${vaultKeyName} --vault-name ${vaultName}`);
if(result.stderr) {
const errorMessage: string = result.stderr;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
const storageAccountKey = JSON.parse(result.stdout).value;
// create storage client
this.azureStorageClient = azure.createFileService(this.azureStorageAccountName, storageAccountKey);
await AzureStorageClientUtility.createShare(this.azureStorageClient, this.azureStorageShare);
// create storage secret
this.azureStorageSecretName = 'nni-secret-' + uniqueString(8).toLowerCase();
await this.genericK8sClient.createSecret(
{
apiVersion: 'v1',
kind: 'Secret',
metadata: {
name: this.azureStorageSecretName,
namespace: 'default',
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: getExperimentId()
}
},
type: 'Opaque',
data: {
azurestorageaccountname: base64.encode(this.azureStorageAccountName),
azurestorageaccountkey: base64.encode(storageAccountKey)
}
}
);
} catch(error) {
this.log.error(error);
return Promise.reject(error);
}
return Promise.resolve();
}
/**
* Generate run script for different roles (e.g. worker or ps)
* @param platform kubernetes platform name (e.g. kubeflow or frameworkcontroller)
* @param trialJobId trial job id
* @param trialWorkingFolder working folder
* @param command command to be executed in the container
* @param trialSequenceId sequence id
* @param roleName name of the role this script is generated for
* @param gpuNum number of GPUs requested for this role
*/
protected generateRunScript(platform: string, trialJobId: string, trialWorkingFolder: string,
command: string, trialSequenceId: string, roleName: string, gpuNum: number): string {
let nvidia_script: string = '';
// The NVIDIA device plugin for K8S has a known issue that requesting zero GPUs allocates all GPUs
// Refer https://github.com/NVIDIA/k8s-device-plugin/issues/61
// So we have to explicitly set CUDA_VISIBLE_DEVICES to empty if user sets gpuNum to 0 in NNI config file
if (gpuNum === 0) {
nvidia_script = `export CUDA_VISIBLE_DEVICES=`;
}
const nniManagerIp = this.nniManagerIpConfig ? this.nniManagerIpConfig.nniManagerIp : getIPV4Address();
const runScript: string = String.Format(
KubernetesScriptFormat,
platform,
trialJobId,
path.join(trialWorkingFolder, 'output', `${roleName}_output`),
trialJobId,
getExperimentId(),
trialWorkingFolder,
trialSequenceId,
nvidia_script,
command,
nniManagerIp,
this.kubernetesRestServerPort
);
return runScript;
}
protected async createNFSStorage(nfsServer: string, nfsPath: string): Promise<void> {
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
try {
await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalNFSTempFolder}`);
} catch(error) {
const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalNFSTempFolder} failed, error is ${error}`;
this.log.error(mountError);
return Promise.reject(mountError);
}
return Promise.resolve();
}
public async cancelTrialJob(trialJobId: string, isEarlyStopped: boolean = false): Promise<void> {
const trialJobDetail : KubernetesTrialJobDetail | undefined = this.trialJobsMap.get(trialJobId);
if(!trialJobDetail) {
const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} not found`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
if(!this.kubernetesCRDClient) {
const errorMessage: string = `CancelTrialJob: trial job id ${trialJobId} failed because operatorClient is undefined`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
try {
await this.kubernetesCRDClient.deleteKubernetesJob(new Map(
[
['app', this.NNI_KUBERNETES_TRIAL_LABEL],
['expId', getExperimentId()],
['trialId', trialJobId]
]
));
} catch(err) {
const errorMessage: string = `Delete trial ${trialJobId} failed: ${err}`;
this.log.error(errorMessage);
return Promise.reject(errorMessage);
}
trialJobDetail.endTime = Date.now();
trialJobDetail.status = getJobCancelStatus(isEarlyStopped);
return Promise.resolve();
}
public async cleanUp(): Promise<void> {
this.stopping = true;
// First, cancel all running kubernetes jobs
for(let [trialJobId, kubernetesTrialJob] of this.trialJobsMap) {
if(['RUNNING', 'WAITING', 'UNKNOWN'].includes(kubernetesTrialJob.status)) {
try {
await this.cancelTrialJob(trialJobId);
} catch (error) {} // DON'T throw errors during cleanup
kubernetesTrialJob.status = 'SYS_CANCELED';
}
}
// Delete all kubernetes jobs whose expId label is current experiment id
try {
if(this.kubernetesCRDClient) {
await this.kubernetesCRDClient.deleteKubernetesJob(new Map(
[
['app', this.NNI_KUBERNETES_TRIAL_LABEL],
['expId', getExperimentId()]
]
));
}
} catch(error) {
this.log.error(`Delete kubernetes job with label: app=${this.NNI_KUBERNETES_TRIAL_LABEL},expId=${getExperimentId()} failed, error is ${error}`);
}
// Unmount NFS
try {
await cpp.exec(`sudo umount ${this.trialLocalNFSTempFolder}`);
} catch(error) {
this.log.error(`Unmount ${this.trialLocalNFSTempFolder} failed, error is ${error}`);
}
// Stop kubernetes rest server
if(!this.kubernetesJobRestServer) {
throw new Error('kubernetesJobRestServer not initialized!');
}
try {
await this.kubernetesJobRestServer.stop();
this.log.info('Kubernetes Training service rest server stopped successfully.');
} catch (error) {
this.log.error(`Kubernetes Training service rest server failed to stop, error: ${error.message}`);
return Promise.reject(error);
}
return Promise.resolve();
}
}
export { KubernetesTrainingService }
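A hedged sketch of how a concrete subclass might reuse the protected storage helpers above (the subclass name, method name, and config shape are illustrative, not taken from this commit):

class ExampleKubernetesTrainingService extends KubernetesTrainingService {
    // Pick a storage backend based on the cluster config, mirroring the
    // 'nfs' / 'azureStorage' options in the config schema further below.
    public async setUpStorage(storageType: string, config: any): Promise<void> {
        if (storageType === 'azureStorage') {
            await this.createAzureStorage(
                config.keyVault.vaultName,
                config.keyVault.name,
                config.azureStorage.accountName,
                config.azureStorage.azureShare
            );
        } else {
            await this.createNFSStorage(config.nfs.server, config.nfs.path);
        }
    }
}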
......@@ -26,7 +26,7 @@ import * as tmp from 'tmp';
import * as component from '../../common/component';
import { cleanupUnitTest, prepareUnitTest } from '../../common/utils';
import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey';
import { KubeflowTrainingService } from '../kubeflow/kubeflowTrainingService';
import { KubeflowTrainingService } from '../kubernetes/kubeflow/kubeflowTrainingService';
// TODO: copy mockedTrail.py to local folder
const localCodeDir: string = tmp.dirSync().name
......
......@@ -27,7 +27,7 @@ if env_args.platform is None:
from .standalone import *
elif env_args.platform == 'unittest':
from .test import *
elif env_args.platform in ('local', 'remote', 'pai', 'kubeflow'):
elif env_args.platform in ('local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller'):
from .local import *
else:
raise RuntimeError('Unknown platform %s' % env_args.platform)
#!/bin/bash
ip="nni@104.210.63.241"
key="id_rsa"
chmod 600 $key
echo "Initializing remote machine..."
yes | ssh -i $key $ip "rm -rf pynni"
echo "Copy nni sdk to remote machine..."
scp -i $key -r ../src/sdk/pynni $ip:~
echo "Install nni sdk in remote machine..."
ssh -i $key $ip "cd pynni && python3 -m pip install --user ."
\ No newline at end of file
......@@ -28,7 +28,7 @@ Optional('description'): str,
'trialConcurrency': And(int, lambda n: 1 <=n <= 999999),
Optional('maxExecDuration'): Regex(r'^[1-9][0-9]*[s|m|h|d]$'),
Optional('maxTrialNum'): And(int, lambda x: 1 <= x <= 99999),
'trainingServicePlatform': And(str, lambda x: x in ['remote', 'local', 'pai', 'kubeflow']),
'trainingServicePlatform': And(str, lambda x: x in ['remote', 'local', 'pai', 'kubeflow', 'frameworkcontroller']),
Optional('searchSpacePath'): os.path.exists,
Optional('multiPhase'): bool,
Optional('multiThread'): bool,
......@@ -184,6 +184,46 @@ kubeflow_config_schema = {
})
}
frameworkcontroller_trial_schema = {
'trial': {
'codeDir': os.path.exists,
'taskRoles': [{
'name': str,
'taskNum': int,
'frameworkAttemptCompletionPolicy': {
'minFailedTaskCount': int,
'minSucceededTaskCount': int
},
'command': str,
'gpuNum': And(int, lambda x: 0 <= x <= 99999),
'cpuNum': And(int, lambda x: 0 <= x <= 99999),
'memoryMB': int,
'image': str
}]
}
}
frameworkcontroller_config_schema = {
'frameworkcontrollerConfig': Or({
Optional('storage'): Or('nfs', 'azureStorage'),
'nfs': {
'server': str,
'path': str
}
},{
Optional('storage'): Or('nfs', 'azureStorage'),
'keyVault': {
'vaultName': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
'name': Regex('([0-9]|[a-z]|[A-Z]|-){1,127}')
},
'azureStorage': {
'accountName': Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),
'azureShare': Regex('([0-9]|[a-z]|[A-Z]|-){3,63}')
}
})
}
machine_list_schima = {
Optional('machineList'):[Or({
'ip': str,
......@@ -206,3 +246,5 @@ REMOTE_CONFIG_SCHEMA = Schema({**common_schema, **common_trial_schema, **machine
PAI_CONFIG_SCHEMA = Schema({**common_schema, **pai_trial_schema, **pai_config_schema})
KUBEFLOW_CONFIG_SCHEMA = Schema({**common_schema, **kubeflow_trial_schema, **kubeflow_config_schema})
FRAMEWORKCONTROLLER_CONFIG_SCHEMA = Schema({**common_schema, **frameworkcontroller_trial_schema, **frameworkcontroller_config_schema})
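For illustration, the frameworkcontroller-specific portion of an experiment config that this schema is intended to accept could look like the following fragment (all values are assumptions; the common fields required by common_schema are omitted):

# Illustrative fragment only; not part of the commit.
example_frameworkcontroller_fragment = {
    'trial': {
        'codeDir': '.',
        'taskRoles': [{
            'name': 'worker',
            'taskNum': 1,
            'frameworkAttemptCompletionPolicy': {
                'minFailedTaskCount': 1,
                'minSucceededTaskCount': 1
            },
            'command': 'python3 mnist.py',
            'gpuNum': 0,
            'cpuNum': 1,
            'memoryMB': 8192,
            'image': 'msranni/nni:latest'
        }]
    },
    'frameworkcontrollerConfig': {
        'storage': 'nfs',
        'nfs': {'server': '10.0.0.1', 'path': '/exported/nni'}
    }
}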
......@@ -188,6 +188,25 @@ def set_kubeflow_config(experiment_config, port, config_file_name):
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
def set_frameworkcontroller_config(experiment_config, port, config_file_name):
'''set frameworkcontroller configuration'''
frameworkcontroller_config_data = dict()
frameworkcontroller_config_data['frameworkcontroller_config'] = experiment_config['frameworkcontrollerConfig']
response = rest_put(cluster_metadata_url(port), json.dumps(frameworkcontroller_config_data), 20)
err_message = None
if not response or not response.status_code == 200:
if response is not None:
err_message = response.text
_, stderr_full_path = get_log_path(config_file_name)
with open(stderr_full_path, 'a+') as fout:
fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
return False, err_message
result, message = setNNIManagerIp(experiment_config, port, config_file_name)
if not result:
return result, message
#set trial_config
return set_trial_config(experiment_config, port, config_file_name), err_message
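For reference, the body PUT to the cluster metadata endpoint by set_frameworkcontroller_config above is just the user's frameworkcontrollerConfig wrapped under one key; an illustrative payload (field values are assumptions, following the NFS branch of the config schema) is:

# Illustrative only: shape of the dict serialized by json.dumps() above.
frameworkcontroller_config_data = {
    'frameworkcontroller_config': {
        'storage': 'nfs',
        'nfs': {'server': '10.0.0.1', 'path': '/exported/nni'}
    }
}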
def set_experiment(experiment_config, mode, port, config_file_name):
'''Call startExperiment (rest POST /experiment) with yaml file content'''
request_data = dict()
......@@ -233,6 +252,11 @@ def set_experiment(experiment_config, mode, port, config_file_name):
{'key': 'kubeflow_config', 'value': experiment_config['kubeflowConfig']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
elif experiment_config['trainingServicePlatform'] == 'frameworkcontroller':
request_data['clusterMetaData'].append(
{'key': 'frameworkcontroller_config', 'value': experiment_config['frameworkcontrollerConfig']})
request_data['clusterMetaData'].append(
{'key': 'trial_config', 'value': experiment_config['trial']})
response = rest_post(experiment_url(port), json.dumps(request_data), 20)
if check_response(response):
......@@ -337,7 +361,23 @@ def launch_experiment(args, experiment_config, mode, config_file_name, experimen
if err_msg:
print_error('Failed! Error is: {}'.format(err_msg))
try:
cmds = ['pkill', '-P', str(rest_process.pid)]
cmds = ['pkill', str(rest_process.pid)]
call(cmds)
except Exception:
raise Exception(ERROR_INFO % 'Restful server stopped!')
exit(1)
#set frameworkcontroller config
if experiment_config['trainingServicePlatform'] == 'frameworkcontroller':
print_normal('Setting frameworkcontroller config...')
config_result, err_msg = set_frameworkcontroller_config(experiment_config, args.port, config_file_name)
if config_result:
print_normal('Successfully set frameworkcontroller config!')
else:
if err_msg:
print_error('Failed! Error is: {}'.format(err_msg))
try:
cmds = ['pkill', str(rest_process.pid)]
call(cmds)
except Exception:
raise Exception(ERROR_INFO % 'Restful server stopped!')
......
......@@ -20,7 +20,7 @@
import os
import json
from .config_schema import LOCAL_CONFIG_SCHEMA, REMOTE_CONFIG_SCHEMA, PAI_CONFIG_SCHEMA, KUBEFLOW_CONFIG_SCHEMA
from .config_schema import LOCAL_CONFIG_SCHEMA, REMOTE_CONFIG_SCHEMA, PAI_CONFIG_SCHEMA, KUBEFLOW_CONFIG_SCHEMA, FRAMEWORKCONTROLLER_CONFIG_SCHEMA
from .common_utils import get_json_content, print_error, print_warning
def expand_path(experiment_config, key):
......@@ -121,14 +121,15 @@ def validate_kubeflow_operators(experiment_config):
def validate_common_content(experiment_config):
'''Validate whether the common values in experiment_config is valid'''
if not experiment_config.get('trainingServicePlatform') or \
experiment_config.get('trainingServicePlatform') not in ['local', 'remote', 'pai', 'kubeflow']:
experiment_config.get('trainingServicePlatform') not in ['local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller']:
print_error('Please set correct trainingServicePlatform!')
exit(1)
schema_dict = {
'local': LOCAL_CONFIG_SCHEMA,
'remote': REMOTE_CONFIG_SCHEMA,
'pai': PAI_CONFIG_SCHEMA,
'kubeflow': KUBEFLOW_CONFIG_SCHEMA
'kubeflow': KUBEFLOW_CONFIG_SCHEMA,
'frameworkcontroller': FRAMEWORKCONTROLLER_CONFIG_SCHEMA
}
try:
schema_dict.get(experiment_config['trainingServicePlatform']).validate(experiment_config)
......