Unverified Commit 9dee8e92 authored by J-shang's avatar J-shang Committed by GitHub
Browse files

fix kubeflow pipeline (#4767)

parent 689a9780
......@@ -128,7 +128,7 @@ NNI makes AutoML techniques plug-and-play
.. codesnippetcard::
:icon: ../img/thumbnails/quantization-small.svg
:title: Quantization
:link: tutorials/quantization_speedup
:link: tutorials/quantization_quick_start_mnist
.. code-block::
......
.. f2a86f83def6c4b2e35ba50ce2487deb
.. dbd41cab307bcd76cc747b3d478709b8
NNI 文档
=================
......
......@@ -5,7 +5,7 @@
Change Log
==========
Release 2.7 - 4/14/2022
Release 2.7 - 4/18/2022
-----------------------
Documentation
......
......@@ -46,6 +46,7 @@ class FrameworkControllerConfig(TrainingServiceConfig):
service_account_name: Optional[str]
task_roles: List[FrameworkControllerRoleConfig]
reuse_mode: Optional[bool] = True
namespace: str = 'default'
def _canonicalize(self, parents):
super()._canonicalize(parents)
......
......@@ -43,6 +43,7 @@ class KubeflowConfig(TrainingServiceConfig):
ps: Optional[KubeflowRoleConfig] = None
master: Optional[KubeflowRoleConfig] = None
reuse_mode: Optional[bool] = True #set reuse mode as true for v2 config
namespace: str = 'default'
def _canonicalize(self, parents):
super()._canonicalize(parents)
......
......@@ -359,6 +359,7 @@ kubeflow_config_schema = {
'path': setType('path', str)
},
Optional('reuse'): setType('reuse', bool),
Optional('namespace'): setType('namespace', str),
}, {
'operator': setChoice('operator', 'tf-operator', 'pytorch-operator'),
'apiVersion': setType('apiVersion', str),
......@@ -377,6 +378,7 @@ kubeflow_config_schema = {
},
Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999),
Optional('reuse'): setType('reuse', bool),
Optional('namespace'): setType('namespace', str),
})
}
......
{
"lr":{"_type":"choice", "_value":[0.1, 0.01, 0.001, 0.0001]},
"optimizer":{"_type":"choice", "_value":["SGD", "Adadelta", "Adagrad", "Adam", "Adamax"]},
"model":{"_type":"choice", "_value":["vgg", "resnet18"]}
"model":{"_type":"choice", "_value":["vgg"]}
}
......@@ -18,6 +18,7 @@ kubeflow:
azureStorage:
accountName:
azureShare:
namespace: kubeflow
trial:
worker:
replicas: 1
......@@ -35,7 +36,7 @@ frameworkcontroller:
maxTrialNum: 2
trialConcurrency: 2
frameworkcontrollerConfig:
serviceAccountName: frameworkbarrier
serviceAccountName: frameworkcontroller
storage: azureStorage
keyVault:
vaultName:
......@@ -43,6 +44,7 @@ frameworkcontroller:
azureStorage:
accountName:
azureShare:
namespace: kubeflow
trial:
taskRoles:
- name: worker
......
......@@ -20,6 +20,7 @@ kubeflow:
trainingService:
reuseMode: true
platform: kubeflow
namespace: kubeflow
worker:
command:
code_directory:
......@@ -44,6 +45,7 @@ frameworkcontroller:
trainingService:
reuseMode: true
platform: frameworkcontroller
namespace: kubeflow
serviceAccountName: frameworkcontroller
taskRoles:
- name: worker
......
......@@ -122,12 +122,18 @@ def print_file_content(filepath):
print(content, flush=True)
def print_trial_job_log(training_service, trial_jobs_url):
trial_jobs = get_trial_jobs(trial_jobs_url)
for trial_job in trial_jobs:
trial_log_dir = os.path.join(get_experiment_dir(EXPERIMENT_URL), 'trials', trial_job['trialJobId'])
trial_log_root = os.path.join(get_experiment_dir(EXPERIMENT_URL), 'trials')
if not os.path.exists(trial_log_root):
print('trial log folder does not exist: {}'.format(trial_log_root), flush=True)
return
folders = os.listdir(trial_log_root)
for name in folders:
trial_log_dir = os.path.join(trial_log_root, name)
log_files = ['stderr', 'trial.log'] if training_service == 'local' else ['stdout_log_collection.log']
for log_file in log_files:
print_file_content(os.path.join(trial_log_dir, log_file))
log_file_path = os.path.join(trial_log_dir, log_file)
if os.path.exists(log_file_path):
print_file_content(log_file_path)
def print_experiment_log(experiment_id):
log_dir = get_nni_log_dir(experiment_id=experiment_id)
......
......@@ -132,6 +132,7 @@ export interface KubeflowConfig extends TrainingServiceConfig {
master?: KubeflowRoleConfig;
reuseMode: boolean;
maxTrialNumberPerGpu?: number;
namespace?: string;
}
export interface FrameworkControllerTaskRoleConfig {
......@@ -156,7 +157,7 @@ export interface FrameworkControllerConfig extends TrainingServiceConfig {
taskRoles: FrameworkControllerTaskRoleConfig[];
reuseMode: boolean;
maxTrialNumberPerGpu?: number;
namespace?: 'default';
namespace?: string;
apiVersion?: string;
}
......
......@@ -52,7 +52,8 @@ if __name__ == "__main__":
print('stop_result:failed')
exit(0)
loop_count += 1
time.sleep(500)
time.sleep(5)
status = run.get_status()
print('stop_result:success')
exit(0)
elif line == 'receive':
......
......@@ -11,7 +11,7 @@ class AdlClientV1 extends KubernetesCRDClient {
/**
* constructor, to initialize adl CRD definition
*/
protected readonly namespace: string;
public readonly namespace: string;
public constructor(namespace: string) {
super();
......
......@@ -118,7 +118,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
} else {
configTaskRoles = this.parseCustomTaskRoles(this.fcTemplate.spec.taskRoles)
}
const namespace = this.fcClusterConfig.namespace ? this.fcClusterConfig.namespace : "default";
const namespace = this.fcClusterConfig.namespace ?? "default";
this.genericK8sClient.setNamespace = namespace;
if (this.kubernetesRestServerPort === undefined) {
......@@ -134,7 +134,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
const trialJobId: string = uniqueString(5);
// Set trial's NFS working folder
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials', trialJobId);
let frameworkcontrollerJobName: string = `nniexp${this.experimentId}trial${trialJobId}`.toLowerCase();
let frameworkcontrollerJobConfig: any;
......@@ -204,6 +204,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
let namespace: string | undefined;
this.fcClusterConfig = FrameworkControllerClusterConfigFactory
.generateFrameworkControllerClusterConfig(frameworkcontrollerClusterJsonObject);
this.genericK8sClient.setNamespace = this.fcClusterConfig.namespace ?? "default";
if (this.fcClusterConfig.storageType === 'azureStorage') {
const azureFrameworkControllerClusterConfig: FrameworkControllerClusterConfigAzure =
<FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
......@@ -346,8 +347,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
for (const taskRole of configTaskRoles) {
const runScriptContent: string =
await this.generateRunScript('frameworkcontroller', trialJobId, trialWorkingFolder,
this.generateCommandScript(configTaskRoles, taskRole.command), form.sequenceId.toString(),
taskRole.name, taskRole.gpuNum ? taskRole.gpuNum : 0);
this.generateCommandScript(configTaskRoles, taskRole.command),
form.sequenceId.toString(), taskRole.name, taskRole.gpuNum ? taskRole.gpuNum : 0);
await fs.promises.writeFile(path.join(trialLocalTempFolder, `run_${taskRole.name}.sh`), runScriptContent, {encoding: 'utf8'});
}
......@@ -439,7 +440,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
kind: 'Framework',
metadata: {
name: frameworkcontrollerJobName,
namespace: this.fcClusterConfig.namespace ? this.fcClusterConfig.namespace : "default",
namespace: this.fcClusterConfig.namespace ?? "default",
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: getExperimentId(),
......
......@@ -17,7 +17,7 @@ class TFOperatorClientV1Alpha2 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis['kubeflow.org'].v1alpha2.namespaces('default').tfjobs;
return this.client.apis['kubeflow.org'].v1alpha2.namespaces(this.namespace).tfjobs;
}
public get containerName(): string {
......@@ -36,7 +36,7 @@ class TFOperatorClientV1Beta1 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis['kubeflow.org'].v1beta1.namespaces('default').tfjobs;
return this.client.apis['kubeflow.org'].v1beta1.namespaces(this.namespace).tfjobs;
}
public get containerName(): string {
......@@ -55,7 +55,7 @@ class TFOperatorClientV1Beta2 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis['kubeflow.org'].v1beta2.namespaces('default').tfjobs;
return this.client.apis['kubeflow.org'].v1beta2.namespaces(this.namespace).tfjobs;
}
public get containerName(): string {
......@@ -74,7 +74,7 @@ class TFOperatorClientV1 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis['kubeflow.org'].v1.namespaces('default').tfjobs;
return this.client.apis['kubeflow.org'].v1.namespaces(this.namespace).tfjobs;
}
public get containerName(): string {
......@@ -92,7 +92,7 @@ class PyTorchOperatorClientV1 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis['kubeflow.org'].v1.namespaces('default').pytorchjobs;
return this.client.apis['kubeflow.org'].v1.namespaces(this.namespace).pytorchjobs;
}
public get containerName(): string {
......@@ -110,7 +110,7 @@ class PyTorchOperatorClientV1Alpha2 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis['kubeflow.org'].v1alpha2.namespaces('default').pytorchjobs;
return this.client.apis['kubeflow.org'].v1alpha2.namespaces(this.namespace).pytorchjobs;
}
public get containerName(): string {
......@@ -129,7 +129,7 @@ class PyTorchOperatorClientV1Beta1 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis['kubeflow.org'].v1beta1.namespaces('default').pytorchjobs;
return this.client.apis['kubeflow.org'].v1beta1.namespaces(this.namespace).pytorchjobs;
}
public get containerName(): string {
......@@ -148,7 +148,7 @@ class PyTorchOperatorClientV1Beta2 extends KubernetesCRDClient {
}
protected get operator(): any {
return this.client.apis['kubeflow.org'].v1beta2.namespaces('default').pytorchjobs;
return this.client.apis['kubeflow.org'].v1beta2.namespaces(this.namespace).pytorchjobs;
}
public get containerName(): string {
......
......@@ -18,8 +18,8 @@ export type OperatorApiVersion = 'v1alpha2' | 'v1beta1' | 'v1beta2' | 'v1';
*/
export class KubeflowClusterConfig extends KubernetesClusterConfig {
public readonly operator: KubeflowOperator;
constructor(apiVersion: string, operator: KubeflowOperator) {
super(apiVersion);
constructor(apiVersion: string, operator: KubeflowOperator, namespace?: string) {
super(apiVersion, undefined, namespace);
this.operator = operator;
}
}
......@@ -30,9 +30,10 @@ export class KubeflowClusterConfigNFS extends KubernetesClusterConfigNFS {
operator: KubeflowOperator,
apiVersion: string,
nfs: NFSConfig,
storage?: KubernetesStorageKind
storage?: KubernetesStorageKind,
namespace?: string
) {
super(apiVersion, nfs, storage);
super(apiVersion, nfs, storage, namespace);
this.operator = operator;
}
......@@ -48,7 +49,8 @@ export class KubeflowClusterConfigNFS extends KubernetesClusterConfigNFS {
kubeflowClusterConfigObjectNFS.operator,
kubeflowClusterConfigObjectNFS.apiVersion,
kubeflowClusterConfigObjectNFS.nfs,
kubeflowClusterConfigObjectNFS.storage
kubeflowClusterConfigObjectNFS.storage,
kubeflowClusterConfigObjectNFS.namespace
);
}
}
......@@ -61,9 +63,10 @@ export class KubeflowClusterConfigAzure extends KubernetesClusterConfigAzure {
apiVersion: string,
keyVault: KeyVaultConfig,
azureStorage: AzureStorage,
storage?: KubernetesStorageKind
storage?: KubernetesStorageKind,
namespace?: string
) {
super(apiVersion, keyVault, azureStorage, storage);
super(apiVersion, keyVault, azureStorage, storage, undefined, namespace);
this.operator = operator;
}
......@@ -79,7 +82,8 @@ export class KubeflowClusterConfigAzure extends KubernetesClusterConfigAzure {
kubeflowClusterConfigObjectAzure.apiVersion,
kubeflowClusterConfigObjectAzure.keyVault,
kubeflowClusterConfigObjectAzure.azureStorage,
kubeflowClusterConfigObjectAzure.storage
kubeflowClusterConfigObjectAzure.storage,
kubeflowClusterConfigObjectAzure.namespace
);
}
}
......
......@@ -14,7 +14,7 @@ export class KubeflowJobRestServer extends KubernetesJobRestServer {
/**
* constructor to provide NNIRestServer's own rest property, e.g. port
*/
constructor() {
super(component.get(KubeflowTrainingService));
constructor(kubeflowTrainingService: KubeflowTrainingService) {
super(kubeflowTrainingService);
}
}
......@@ -69,7 +69,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
}
if (this.kubernetesRestServerPort === undefined) {
const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
const restServer: KubeflowJobRestServer = new KubeflowJobRestServer(this);
this.kubernetesRestServerPort = restServer.clusterRestServerPort;
}
......@@ -81,7 +81,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
const trialJobId: string = uniqueString(5);
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials', trialJobId);
//prepare the runscript
await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, form);
//upload script files to sotrage
......@@ -120,6 +120,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG: {
const kubeflowClusterJsonObject: object = JSON.parse(value);
this.kubeflowClusterConfig = KubeflowClusterConfigFactory.generateKubeflowClusterConfig(kubeflowClusterJsonObject);
this.genericK8sClient.setNamespace = this.kubeflowClusterConfig.namespace ?? "default";
if (this.kubeflowClusterConfig.storageType === 'azureStorage') {
const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName;
......@@ -137,6 +138,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
}
this.kubernetesCRDClient = KubeflowOperatorClientFactory.createClient(
this.kubeflowClusterConfig.operator, this.kubeflowClusterConfig.apiVersion);
this.kubernetesCRDClient.namespace = this.kubeflowClusterConfig.namespace ?? "default";
break;
}
case TrialConfigMetadataKey.TRIAL_CONFIG: {
......@@ -310,7 +312,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
// Generate kubeflow job resource config object
const kubeflowJobConfig: any = await this.generateKubeflowJobConfig(trialJobId, trialWorkingFolder, kubeflowJobName, workerPodResources,
nonWorkerResources);
this.log.info('kubeflowJobConfig:', kubeflowJobConfig);
return Promise.resolve(kubeflowJobConfig);
}
......@@ -368,7 +370,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
kind: this.kubernetesCRDClient.jobKind,
metadata: {
name: kubeflowJobName,
namespace: 'default',
namespace: this.kubernetesCRDClient.namespace,
labels: {
app: this.NNI_KUBERNETES_TRIAL_LABEL,
expId: getExperimentId(),
......
......@@ -150,6 +150,7 @@ abstract class KubernetesCRDClient {
protected readonly client: any;
protected readonly log: Logger = getLogger('KubernetesCRDClient');
protected crdSchema: any;
public namespace: string = 'default';
constructor() {
this.client = new Client1_10({config: getKubernetesConfig()});
......
......@@ -230,7 +230,7 @@ abstract class KubernetesTrainingService {
this.azureStorageSecretName = String.Format('nni-secret-{0}', uniqueString(8)
.toLowerCase());
const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : "default"
const namespace = this.genericK8sClient.getNamespace ?? "default";
await this.genericK8sClient.createSecret(
{
apiVersion: 'v1',
......@@ -330,7 +330,7 @@ abstract class KubernetesTrainingService {
const body = fs.readFileSync(filePath).toString('base64');
const registrySecretName = String.Format('nni-secret-{0}', uniqueString(8)
.toLowerCase());
const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : "default"
const namespace = this.genericK8sClient.getNamespace ?? "default";
await this.genericK8sClient.createSecret(
{
apiVersion: 'v1',
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment