// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

// eslint-disable-next-line @typescript-eslint/camelcase
import {Client1_10, config} from 'kubernetes-client';
import {getLogger, Logger} from 'common/log';

/**
 * Value returned by GeneralK8sClient.matchStorageClass when no storage class
 * has a supported provisioner.
 */
const STORAGE_CLASS_NOT_FOUND = "Not Found!";

/**
 * Provisioner patterns accepted by GeneralK8sClient.matchStorageClass.
 * Built once at module load instead of on every call.
 */
const adlSupportedProvisioners: RegExp[] = [
    new RegExp("microk8s.io/hostpath"),
    new RegExp(".*cephfs.csi.ceph.com"),
    new RegExp(".*azure.*"),
    new RegExp("\\b" + "efs" + "\\b")
];

/**
 * Tells whether a response status code is in the 2xx range.
 * A missing (or zero) status code counts as a failure.
 *
 * @param statusCode the `statusCode` field of a client response
 * @returns true when the code is between 200 and 299 inclusive
 */
function isSuccessStatus(statusCode: number | undefined): boolean {
    return Boolean(statusCode) && (statusCode as number) >= 200 && (statusCode as number) <= 299;
}

/**
 * This function uses the environment variable
 * 'KUBERNETES_SERVICE_HOST' to determine whether
 * the code is running from within a kubernetes container.
 * If it is, it returns the in-cluster config
 * instead of the kubeconfig.
 */
function getKubernetesConfig(): any {
    if ('KUBERNETES_SERVICE_HOST' in process.env) {
        return config.getInCluster();
    } else {
        return config.fromKubeconfig();
    }
}

/**
 * Generic Kubernetes client, target version >= 1.9
 *
 * Every request method resolves on a 2xx status code and otherwise rejects
 * with a plain string message that contains the status code.
 */
class GeneralK8sClient {
    protected readonly client: any;
    protected readonly log: Logger = getLogger('GeneralK8sClient');
    protected namespace: string = 'default';

    constructor() {
        this.client = new Client1_10({config: getKubernetesConfig(), version: '1.9'});
        // NOTE(review): the result of loadSpec() is not awaited or checked here;
        // confirm whether callers need the spec to be loaded before the first request.
        this.client.loadSpec();
    }

    /** Sets the namespace used by the namespaced requests of this client. */
    public set setNamespace(namespace: string) {
        this.namespace = namespace;
    }

    /** Namespace used by the namespaced requests of this client. */
    public get getNamespace(): string {
        return this.namespace;
    }

    /**
     * Finds the first storage class whose provisioner matches one of the
     * supported provisioner patterns.
     *
     * Every storage class is tested against every pattern. A storage class
     * without a string `provisioner` is skipped, and a body without an
     * `items` array is treated as an empty list.
     *
     * @param response body of the storage class list response
     * @returns `metadata.name` of the first matching storage class,
     *          or "Not Found!" when none matches
     */
    private matchStorageClass(response: any): string {
        const items: any[] = (response && Array.isArray(response.items)) ? response.items : [];
        for (const item of items) {
            const provisioner: any = item.provisioner;
            if (typeof provisioner !== 'string') {
                continue;
            }
            if (adlSupportedProvisioners.some((pattern: RegExp) => pattern.test(provisioner))) {
                return item.metadata.name;
            }
        }
        return STORAGE_CLASS_NOT_FOUND;
    }

    /**
     * Lists the storage classes and picks the first one with a supported provisioner.
     *
     * @returns the name of the matching storage class
     */
    public async getStorageClass(): Promise<string> {
        const response: any = await this.client.apis["storage.k8s.io"].v1beta1.storageclasses.get();
        // Check the status first: the body of a failed response is not inspected.
        if (!isSuccessStatus(response.statusCode)) {
            return Promise.reject(`List storageclasses failed, statusCode is ${response.statusCode}`);
        }
        const storageClassType: string = this.matchStorageClass(response.body);
        if (storageClassType === STORAGE_CLASS_NOT_FOUND) {
            return Promise.reject("No StorageClasses are supported!");
        }
        return storageClassType;
    }

    /**
     * Posts a deployment manifest to the current namespace.
     *
     * @param deploymentManifest manifest sent as the request body
     * @returns `metadata.uid` of the response body
     */
    public async createDeployment(deploymentManifest: any): Promise<string> {
        const response: any = await this.client.apis.apps.v1.namespaces(this.namespace)
            .deployments.post({body: deploymentManifest});
        if (isSuccessStatus(response.statusCode)) {
            return response.body.metadata.uid;
        }
        return Promise.reject(`Create deployment failed, statusCode is ${response.statusCode}`);
    }

    /**
     * Deletes a deployment from the current namespace.
     *
     * @param deploymentName name of the deployment to delete
     * @returns true on success
     */
    public async deleteDeployment(deploymentName: string): Promise<boolean> {
        // TODO: change this hard coded deployment name after demo
        const response: any = await this.client.apis.apps.v1.namespaces(this.namespace)
            .deployment(deploymentName).delete();
        if (isSuccessStatus(response.statusCode)) {
            return true;
        }
        return Promise.reject(`Delete deployment failed, statusCode is ${response.statusCode}`);
    }

    /**
     * Posts a config map manifest to the current namespace.
     *
     * @param configMapManifest manifest sent as the request body
     * @returns true on success
     */
    public async createConfigMap(configMapManifest: any): Promise<boolean> {
        const response: any = await this.client.api.v1.namespaces(this.namespace)
            .configmaps.post({body: configMapManifest});
        if (isSuccessStatus(response.statusCode)) {
            return true;
        }
        return Promise.reject(`Create configMap failed, statusCode is ${response.statusCode}`);
    }

    /**
     * Posts a persistent volume claim manifest to the current namespace.
     *
     * @param pvcManifest manifest sent as the request body
     * @returns true on success
     */
    public async createPersistentVolumeClaim(pvcManifest: any): Promise<boolean> {
        const response: any = await this.client.api.v1.namespaces(this.namespace)
            .persistentvolumeclaims.post({body: pvcManifest});
        if (isSuccessStatus(response.statusCode)) {
            return true;
        }
        return Promise.reject(`Create pvc failed, statusCode is ${response.statusCode}`);
    }

    /**
     * Posts a secret manifest to the current namespace.
     *
     * @param secretManifest manifest sent as the request body
     * @returns true on success
     */
    public async createSecret(secretManifest: any): Promise<boolean> {
        const response: any = await this.client.api.v1.namespaces(this.namespace)
            .secrets.post({body: secretManifest});
        if (isSuccessStatus(response.statusCode)) {
            return true;
        }
        return Promise.reject(`Create secrets failed, statusCode is ${response.statusCode}`);
    }
}

/**
 * Kubernetes CRD client
 *
 * Subclasses supply the `operator` endpoint and the `containerName`;
 * `crdSchema` is read by the `jobKind` and `apiVersion` accessors.
 */
abstract class KubernetesCRDClient {
    protected readonly client: any;
    protected readonly log: Logger = getLogger('KubernetesCRDClient');
    protected crdSchema: any;
    public namespace: string = 'default';

    constructor() {
        this.client = new Client1_10({config: getKubernetesConfig()});
        // NOTE(review): the result of loadSpec() is not awaited or checked here;
        // confirm whether callers need the spec to be loaded before the first request.
        this.client.loadSpec();
    }

    /** Endpoint object the job requests are sent through. */
    protected abstract get operator(): any;

    public abstract get containerName(): string;

    /**
     * Kind read from `crdSchema.spec.names.kind`.
     *
     * @throws Error when the kind is missing from the crd schema
     */
    public get jobKind(): string {
        if (this.crdSchema && this.crdSchema.spec && this.crdSchema.spec.names && this.crdSchema.spec.names.kind) {
            return this.crdSchema.spec.names.kind;
        } else {
            throw new Error('KubeflowOperatorClient: getJobKind failed, kind is undefined in crd schema!');
        }
    }

    /**
     * Version read from `crdSchema.spec.version`.
     *
     * @throws Error when the version is missing from the crd schema
     */
    public get apiVersion(): string {
        if (this.crdSchema && this.crdSchema.spec && this.crdSchema.spec.version) {
            return this.crdSchema.spec.version;
        } else {
            throw new Error('KubeflowOperatorClient: get apiVersion failed, version is undefined in crd schema!');
        }
    }

    /**
     * Posts a job manifest through the operator endpoint.
     *
     * @param jobManifest manifest sent as the request body
     * @returns true on a 2xx status code; otherwise rejects with a string message
     */
    public async createKubernetesJob(jobManifest: any): Promise<boolean> {
        const response: any = await this.operator.post({body: jobManifest});
        if (isSuccessStatus(response.statusCode)) {
            return true;
        }
        return Promise.reject(`KubernetesApiClient createKubernetesJob failed, statusCode is ${response.statusCode}`);
    }

    /**
     * Gets a job by name through the operator endpoint.
     *
     * @param kubeflowJobName name of the job to get
     * @returns the response body on a 2xx status code; otherwise rejects with a string message
     */
    //TODO : replace any
    public async getKubernetesJob(kubeflowJobName: string): Promise<any> {
        const response: any = await this.operator(kubeflowJobName)
            .get();
        if (isSuccessStatus(response.statusCode)) {
            return response.body;
        }
        return Promise.reject(`KubernetesApiClient getKubernetesJob failed, statusCode is ${response.statusCode}`);
    }

    /**
     * Deletes the jobs selected by the given labels.
     *
     * The labels are joined into a `key=value,key=value` label selector and
     * the delete request uses the 'Background' propagation policy.
     * An error thrown by the request itself rejects the returned promise unchanged.
     *
     * @param labels label keys and values the jobs must carry
     * @returns true on a 2xx status code; otherwise rejects with a string message
     */
    public async deleteKubernetesJob(labels: Map<string, string>): Promise<boolean> {
        // construct match query from labels for deleting tfjob
        const matchQuery: string = Array.from(labels.keys())
            .map((labelKey: string) => `${labelKey}=${labels.get(labelKey)}`)
            .join(',');
        const deleteResult: any = await this.operator()
            .delete({
                qs: {
                    labelSelector: matchQuery,
                    propagationPolicy: 'Background'
                }
            });
        if (isSuccessStatus(deleteResult.statusCode)) {
            return true;
        }
        return Promise.reject(
            `KubernetesApiClient, delete labels ${matchQuery} get wrong statusCode ${deleteResult.statusCode}`);
    }
}

export {KubernetesCRDClient, GeneralK8sClient};