Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
OpenDAS
nni
Commits
d48ad027
Unverified
Commit
d48ad027
authored
Jun 20, 2019
by
SparkSnail
Committed by
GitHub
Jun 20, 2019
Browse files
Merge pull request #184 from microsoft/master
merge master
parents
9352cc88
22993e5d
Changes
187
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
578 additions
and
465 deletions
+578
-465
src/nni_manager/package.json
src/nni_manager/package.json
+1
-0
src/nni_manager/rest_server/nniRestServer.ts
src/nni_manager/rest_server/nniRestServer.ts
+1
-1
src/nni_manager/rest_server/restHandler.ts
src/nni_manager/rest_server/restHandler.ts
+1
-1
src/nni_manager/rest_server/restValidationSchemas.ts
src/nni_manager/rest_server/restValidationSchemas.ts
+1
-1
src/nni_manager/training_service/common/clusterJobRestServer.ts
...i_manager/training_service/common/clusterJobRestServer.ts
+41
-35
src/nni_manager/training_service/common/containerJobData.ts
src/nni_manager/training_service/common/containerJobData.ts
+3
-3
src/nni_manager/training_service/common/gpuData.ts
src/nni_manager/training_service/common/gpuData.ts
+4
-4
src/nni_manager/training_service/common/jobMetrics.ts
src/nni_manager/training_service/common/jobMetrics.ts
+4
-1
src/nni_manager/training_service/common/trialConfig.ts
src/nni_manager/training_service/common/trialConfig.ts
+4
-4
src/nni_manager/training_service/common/util.ts
src/nni_manager/training_service/common/util.ts
+45
-39
src/nni_manager/training_service/kubernetes/azureStorageClientUtils.ts
...er/training_service/kubernetes/azureStorageClientUtils.ts
+109
-79
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerApiClient.ts
...netes/frameworkcontroller/frameworkcontrollerApiClient.ts
+15
-6
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerConfig.ts
...bernetes/frameworkcontroller/frameworkcontrollerConfig.ts
+30
-25
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerJobInfoCollector.ts
...rameworkcontroller/frameworkcontrollerJobInfoCollector.ts
+26
-18
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerJobRestServer.ts
...s/frameworkcontroller/frameworkcontrollerJobRestServer.ts
+5
-5
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts
...frameworkcontroller/frameworkcontrollerTrainingService.ts
+191
-171
src/nni_manager/training_service/kubernetes/kubeflow/kubeflowApiClient.ts
...training_service/kubernetes/kubeflow/kubeflowApiClient.ts
+30
-21
src/nni_manager/training_service/kubernetes/kubeflow/kubeflowConfig.ts
...er/training_service/kubernetes/kubeflow/kubeflowConfig.ts
+41
-28
src/nni_manager/training_service/kubernetes/kubeflow/kubeflowJobInfoCollector.ts
...g_service/kubernetes/kubeflow/kubeflowJobInfoCollector.ts
+21
-18
src/nni_manager/training_service/kubernetes/kubeflow/kubeflowJobRestServer.ts
...ning_service/kubernetes/kubeflow/kubeflowJobRestServer.ts
+5
-5
No files found.
src/nni_manager/package.json
View file @
d48ad027
...
...
@@ -33,6 +33,7 @@
"@types/chai-as-promised"
:
"^7.1.0"
,
"@types/express"
:
"^4.16.0"
,
"@types/glob"
:
"^7.1.1"
,
"@types/js-base64"
:
"^2.3.1"
,
"@types/mocha"
:
"^5.2.5"
,
"@types/node"
:
"10.12.18"
,
"@types/request"
:
"^2.47.1"
,
...
...
src/nni_manager/rest_server/nniRestServer.ts
View file @
d48ad027
...
...
@@ -31,7 +31,7 @@ import { createRestHandler } from './restHandler';
* NNI Main rest server, provides rest API to support
* # nnictl CLI tool
* # NNI WebUI
*
*
*/
@
component
.
Singleton
export
class
NNIRestServer
extends
RestServer
{
...
...
src/nni_manager/rest_server/restHandler.ts
View file @
d48ad027
...
...
@@ -146,7 +146,7 @@ class NNIRestHandler {
});
});
}
private
importData
(
router
:
Router
):
void
{
router
.
post
(
'
/experiment/import-data
'
,
(
req
:
Request
,
res
:
Response
)
=>
{
this
.
nniManager
.
importData
(
JSON
.
stringify
(
req
.
body
)).
then
(()
=>
{
...
...
src/nni_manager/rest_server/restValidationSchemas.ts
View file @
d48ad027
...
...
@@ -133,7 +133,7 @@ export namespace ValidationSchemas {
})
}),
nni_manager_ip
:
joi
.
object
({
nniManagerIp
:
joi
.
string
().
min
(
1
)
nniManagerIp
:
joi
.
string
().
min
(
1
)
})
}
};
...
...
src/nni_manager/training_service/common/clusterJobRestServer.ts
View file @
d48ad027
...
...
@@ -20,22 +20,24 @@
'
use strict
'
;
import
*
as
assert
from
'
assert
'
;
import
{
Request
,
Response
,
Router
}
from
'
express
'
;
// tslint:disable-next-line:no-implicit-dependencies
import
*
as
bodyParser
from
'
body-parser
'
;
import
{
Request
,
Response
,
Router
}
from
'
express
'
;
import
*
as
fs
from
'
fs
'
;
import
*
as
path
from
'
path
'
;
import
{
Writable
}
from
'
stream
'
;
import
{
String
}
from
'
typescript-string-operations
'
;
import
*
as
component
from
'
../../common/component
'
;
import
*
as
fs
from
'
fs
'
import
*
as
path
from
'
path
'
import
{
getBasePort
,
getExperimentId
}
from
'
../../common/experimentStartupInfo
'
;
import
{
RestServer
}
from
'
../../common/restServer
'
import
{
RestServer
}
from
'
../../common/restServer
'
;
import
{
getLogDir
}
from
'
../../common/utils
'
;
import
{
Writable
}
from
'
stream
'
;
/**
* Cluster Job Training service Rest server, provides rest API to support Cluster job metrics update
*
*
*/
@
component
.
Singleton
export
abstract
class
ClusterJobRestServer
extends
RestServer
{
export
abstract
class
ClusterJobRestServer
extends
RestServer
{
private
readonly
API_ROOT_URL
:
string
=
'
/api/v1/nni-pai
'
;
private
readonly
NNI_METRICS_PATTERN
:
string
=
`NNISDK_MEb'(?<metrics>.*?)'`
;
...
...
@@ -51,22 +53,23 @@ export abstract class ClusterJobRestServer extends RestServer{
constructor
()
{
super
();
const
basePort
:
number
=
getBasePort
();
assert
(
basePort
&&
basePort
>
1024
);
this
.
port
=
basePort
+
1
;
assert
(
basePort
!==
undefined
&&
basePort
>
1024
);
this
.
port
=
basePort
+
1
;
}
public
get
clusterRestServerPort
():
number
{
if
(
!
this
.
port
)
{
if
(
this
.
port
===
undefined
)
{
throw
new
Error
(
'
PAI Rest server port is undefined
'
);
}
return
this
.
port
;
}
public
get
getErrorMessage
():
string
|
undefined
{
public
get
getErrorMessage
():
string
|
undefined
{
return
this
.
errorMessage
;
}
public
set
setEnableVersionCheck
(
versionCheck
:
boolean
)
{
this
.
enableVersionCheck
=
versionCheck
;
}
...
...
@@ -79,11 +82,15 @@ export abstract class ClusterJobRestServer extends RestServer{
this
.
app
.
use
(
this
.
API_ROOT_URL
,
this
.
createRestHandler
());
}
// Abstract method to handle trial metrics data
// tslint:disable-next-line:no-any
protected
abstract
handleTrialMetrics
(
jobId
:
string
,
trialMetrics
:
any
[])
:
void
;
// tslint:disable: no-unsafe-any no-any
private
createRestHandler
()
:
Router
{
const
router
:
Router
=
Router
();
// tslint:disable-next-line:typedef
router
.
use
((
req
:
Request
,
res
:
Response
,
next
)
=>
{
router
.
use
((
req
:
Request
,
res
:
Response
,
next
:
any
)
=>
{
this
.
log
.
info
(
`
${
req
.
method
}
:
${
req
.
url
}
: body:\n
${
JSON
.
stringify
(
req
.
body
,
undefined
,
4
)}
`
);
res
.
setHeader
(
'
Content-Type
'
,
'
application/json
'
);
next
();
...
...
@@ -92,7 +99,7 @@ export abstract class ClusterJobRestServer extends RestServer{
router
.
post
(
`/version/
${
this
.
expId
}
/:trialId`
,
(
req
:
Request
,
res
:
Response
)
=>
{
if
(
this
.
enableVersionCheck
)
{
try
{
const
checkResultSuccess
:
boolean
=
req
.
body
.
tag
===
'
VCSuccess
'
?
true
:
false
;
const
checkResultSuccess
:
boolean
=
req
.
body
.
tag
===
'
VCSuccess
'
?
true
:
false
;
if
(
this
.
versionCheckSuccess
!==
undefined
&&
this
.
versionCheckSuccess
!==
checkResultSuccess
)
{
this
.
errorMessage
=
'
Version check error, version check result is inconsistent!
'
;
this
.
log
.
error
(
this
.
errorMessage
);
...
...
@@ -103,7 +110,7 @@ export abstract class ClusterJobRestServer extends RestServer{
this
.
versionCheckSuccess
=
false
;
this
.
errorMessage
=
req
.
body
.
msg
;
}
}
catch
(
err
)
{
}
catch
(
err
)
{
this
.
log
.
error
(
`json parse metrics error:
${
err
}
`
);
res
.
status
(
500
);
res
.
send
(
err
.
message
);
...
...
@@ -122,8 +129,7 @@ export abstract class ClusterJobRestServer extends RestServer{
this
.
handleTrialMetrics
(
req
.
body
.
jobId
,
req
.
body
.
metrics
);
res
.
send
();
}
catch
(
err
)
{
}
catch
(
err
)
{
this
.
log
.
error
(
`json parse metrics error:
${
err
}
`
);
res
.
status
(
500
);
res
.
send
(
err
.
message
);
...
...
@@ -131,35 +137,37 @@ export abstract class ClusterJobRestServer extends RestServer{
});
router
.
post
(
`/stdout/
${
this
.
expId
}
/:trialId`
,
(
req
:
Request
,
res
:
Response
)
=>
{
if
(
this
.
enableVersionCheck
&&
!
this
.
versionCheckSuccess
&&
!
this
.
errorMessage
)
{
this
.
errorMessage
=
`Version check failed, didn't get version check response from trialKeeper, please check your NNI version in `
+
`NNIManager and TrialKeeper!`
if
(
this
.
enableVersionCheck
&&
(
this
.
versionCheckSuccess
===
undefined
||
!
this
.
versionCheckSuccess
)
&&
this
.
errorMessage
===
undefined
)
{
this
.
errorMessage
=
`Version check failed, didn't get version check response from trialKeeper,`
+
` please check your NNI version in NNIManager and TrialKeeper!`
;
}
const
trialLogPath
:
string
=
path
.
join
(
getLogDir
(),
`trial_
${
req
.
params
.
trialId
}
.log`
);
try
{
let
skipLogging
:
boolean
=
false
;
if
(
req
.
body
.
tag
===
'
trial
'
&&
req
.
body
.
msg
!==
undefined
)
{
const
metricsContent
=
req
.
body
.
msg
.
match
(
this
.
NNI_METRICS_PATTERN
);
if
(
metricsContent
&&
metricsContent
.
groups
)
{
this
.
handleTrialMetrics
(
req
.
params
.
trialId
,
[
metricsContent
.
groups
[
'
metrics
'
]]);
if
(
req
.
body
.
tag
===
'
trial
'
&&
req
.
body
.
msg
!==
undefined
)
{
const
metricsContent
:
any
=
req
.
body
.
msg
.
match
(
this
.
NNI_METRICS_PATTERN
);
if
(
metricsContent
&&
metricsContent
.
groups
)
{
const
key
:
string
=
'
metrics
'
;
this
.
handleTrialMetrics
(
req
.
params
.
trialId
,
[
metricsContent
.
groups
[
key
]]);
skipLogging
=
true
;
}
}
if
(
!
skipLogging
){
if
(
!
skipLogging
)
{
// Construct write stream to write remote trial's log into local file
// tslint:disable-next-line:non-literal-fs-path
const
writeStream
:
Writable
=
fs
.
createWriteStream
(
trialLogPath
,
{
flags
:
'
a+
'
,
encoding
:
'
utf8
'
,
autoClose
:
true
});
writeStream
.
write
(
req
.
body
.
msg
+
'
\n
'
);
writeStream
.
write
(
String
.
Format
(
'
{0}
\n
'
,
req
.
body
.
msg
)
);
writeStream
.
end
();
}
res
.
send
();
}
catch
(
err
)
{
}
catch
(
err
)
{
this
.
log
.
error
(
`json parse stdout data error:
${
err
}
`
);
res
.
status
(
500
);
res
.
send
(
err
.
message
);
...
...
@@ -168,7 +176,5 @@ export abstract class ClusterJobRestServer extends RestServer{
return
router
;
}
/** Abstract method to handle trial metrics data */
protected
abstract
handleTrialMetrics
(
jobId
:
string
,
trialMetrics
:
any
[])
:
void
;
}
\ No newline at end of file
// tslint:enable: no-unsafe-any no-any
}
src/nni_manager/training_service/common/containerJobData.ts
View file @
d48ad027
...
...
@@ -19,12 +19,12 @@
'
use strict
'
;
export
const
CONTAINER_INSTALL_NNI_SHELL_FORMAT
:
string
=
export
const
CONTAINER_INSTALL_NNI_SHELL_FORMAT
:
string
=
`#!/bin/bash
if python3 -c 'import nni' > /dev/null 2>&1; then
# nni module is already installed, skip
return
else
# Install nni
python3 -m pip install --user --upgrade nni
fi`
;
\ No newline at end of file
python3 -m pip install --user --upgrade nni
fi`
;
src/nni_manager/training_service/common/gpuData.ts
View file @
d48ad027
...
...
@@ -59,17 +59,17 @@ export class GPUSummary {
}
}
export
const
GPU_INFO_COLLECTOR_FORMAT_LINUX
:
string
=
export
const
GPU_INFO_COLLECTOR_FORMAT_LINUX
:
string
=
`
#!/bin/bash
export METRIC_OUTPUT_DIR={0}
echo $$ >{1}
python3 -m nni_gpu_tool.gpu_metrics_collector
`
`
;
export
const
GPU_INFO_COLLECTOR_FORMAT_WINDOWS
:
string
=
export
const
GPU_INFO_COLLECTOR_FORMAT_WINDOWS
:
string
=
`
$env:METRIC_OUTPUT_DIR="{0}"
$app = Start-Process "python" -ArgumentList "-m nni_gpu_tool.gpu_metrics_collector" -passthru -NoNewWindow
Write $app.ID | Out-File {1} -NoNewline -encoding utf8
`
\ No newline at end of file
`
;
src/nni_manager/training_service/common/jobMetrics.ts
View file @
d48ad027
...
...
@@ -21,7 +21,10 @@
import
{
TrialJobStatus
}
from
'
../../common/trainingService
'
;
// tslint:disable-next-line:max-classes-per-file
/**
* Trial job metrics class
* Representing trial job metrics properties
*/
export
class
JobMetrics
{
public
readonly
jobId
:
string
;
public
readonly
metrics
:
string
[];
...
...
src/nni_manager/training_service/common/trialConfig.ts
View file @
d48ad027
...
...
@@ -24,13 +24,13 @@
* Representing trial job configurable properties
*/
export
class
TrialConfig
{
/
**
Trail command
*/
/
/
Trail command
public
readonly
command
:
string
;
/
**
Code directory
*/
/
/
Code directory
public
readonly
codeDir
:
string
;
/
**
Required GPU number for trial job. The number should be in [0,100]
*/
/
/
Required GPU number for trial job. The number should be in [0,100]
public
readonly
gpuNum
:
number
;
/**
...
...
@@ -44,4 +44,4 @@ export class TrialConfig {
this
.
codeDir
=
codeDir
;
this
.
gpuNum
=
gpuNum
;
}
}
\ No newline at end of file
}
src/nni_manager/training_service/common/util.ts
View file @
d48ad027
import
{
getLogger
}
from
"
common/log
"
;
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
...
...
@@ -21,36 +19,36 @@ import { getLogger } from "common/log";
'
use strict
'
;
import
{
countFilesRecursively
}
from
'
../../common/utils
'
import
*
as
cpp
from
'
child-process-promise
'
;
import
*
as
cp
from
'
child_process
'
;
import
*
as
os
from
'
os
'
;
import
*
as
fs
from
'
fs
'
;
import
{
getNewLine
}
from
'
../../common/utils
'
;
import
{
GPU_INFO_COLLECTOR_FORMAT_LINUX
,
GPU_INFO_COLLECTOR_FORMAT_WINDOWS
}
from
'
./gpuData
'
;
import
*
as
os
from
'
os
'
;
import
*
as
path
from
'
path
'
;
import
{
String
}
from
'
typescript-string-operations
'
;
import
{
file
}
from
"
../../node_modules/@types/tmp
"
;
import
{
countFilesRecursively
,
getNewLine
}
from
'
../../common/utils
'
;
import
{
file
}
from
'
../../node_modules/@types/tmp
'
;
import
{
GPU_INFO_COLLECTOR_FORMAT_LINUX
,
GPU_INFO_COLLECTOR_FORMAT_WINDOWS
}
from
'
./gpuData
'
;
/**
* Validate codeDir, calculate file count recursively under codeDir, and throw error if any rule is broken
*
*
* @param codeDir codeDir in nni config file
* @returns file number under codeDir
*/
// tslint:disable: no-redundant-jsdoc
export
async
function
validateCodeDir
(
codeDir
:
string
)
:
Promise
<
number
>
{
let
fileCount
:
number
|
undefined
;
try
{
fileCount
=
await
countFilesRecursively
(
codeDir
);
}
catch
(
error
)
{
}
catch
(
error
)
{
throw
new
Error
(
`Call count file error:
${
error
}
`
);
}
if
(
fileCount
&&
fileCount
>
1000
)
{
const
errMessage
:
string
=
`Too many files(
${
fileCount
}
found}) in
${
codeDir
}
,`
if
(
fileCount
!==
undefined
&&
fileCount
>
1000
)
{
const
errMessage
:
string
=
`Too many files(
${
fileCount
}
found}) in
${
codeDir
}
,`
+
` please check if it's a valid code dir`
;
throw
new
Error
(
errMessage
);
throw
new
Error
(
errMessage
);
}
return
fileCount
;
...
...
@@ -58,7 +56,7 @@ export async function validateCodeDir(codeDir: string) : Promise<number> {
/**
* crete a new directory
* @param directory
* @param directory
*/
export
async
function
execMkdir
(
directory
:
string
):
Promise
<
void
>
{
if
(
process
.
platform
===
'
win32
'
)
{
...
...
@@ -66,6 +64,7 @@ export async function execMkdir(directory: string): Promise<void> {
}
else
{
await
cpp
.
exec
(
`mkdir -p
${
directory
}
`
);
}
return
Promise
.
resolve
();
}
...
...
@@ -80,12 +79,13 @@ export async function execCopydir(source: string, destination: string): Promise<
}
else
{
await
cpp
.
exec
(
`cp -r
${
source
}
${
destination
}
`
);
}
return
Promise
.
resolve
();
}
/**
* crete a new file
* @param filename
* @param filename
*/
export
async
function
execNewFile
(
filename
:
string
):
Promise
<
void
>
{
if
(
process
.
platform
===
'
win32
'
)
{
...
...
@@ -93,16 +93,17 @@ export async function execNewFile(filename: string): Promise<void> {
}
else
{
await
cpp
.
exec
(
`touch
${
filename
}
`
);
}
return
Promise
.
resolve
();
}
/**
* run script
* run script
using powershell or bash
* @param filePath
*/
export
function
exec
Script
(
filePath
:
string
):
cp
.
ChildProcess
{
export
function
run
Script
(
filePath
:
string
):
cp
.
ChildProcess
{
if
(
process
.
platform
===
'
win32
'
)
{
return
cp
.
exec
(
`powershell.exe -file
${
filePath
}
`
);
return
cp
.
exec
(
`powershell.exe
-ExecutionPolicy Bypass
-file
${
filePath
}
`
);
}
else
{
return
cp
.
exec
(
`bash
${
filePath
}
`
);
}
...
...
@@ -110,7 +111,7 @@ export function execScript(filePath: string): cp.ChildProcess {
/**
* output the last line of a file
* @param filePath
* @param filePath
*/
export
async
function
execTail
(
filePath
:
string
):
Promise
<
cpp
.
childProcessPromise
.
Result
>
{
let
cmdresult
:
cpp
.
childProcessPromise
.
Result
;
...
...
@@ -119,12 +120,13 @@ export async function execTail(filePath: string): Promise<cpp.childProcessPromis
}
else
{
cmdresult
=
await
cpp
.
exec
(
`tail -n 1
${
filePath
}
`
);
}
return
Promise
.
resolve
(
cmdresult
);
}
/**
* delete a directory
* @param directory
* @param directory
*/
export
async
function
execRemove
(
directory
:
string
):
Promise
<
void
>
{
if
(
process
.
platform
===
'
win32
'
)
{
...
...
@@ -132,12 +134,13 @@ export async function execRemove(directory: string): Promise<void> {
}
else
{
await
cpp
.
exec
(
`rm -rf
${
directory
}
`
);
}
return
Promise
.
resolve
();
}
/**
* kill a process
* @param directory
* @param directory
*/
export
async
function
execKill
(
pid
:
string
):
Promise
<
void
>
{
if
(
process
.
platform
===
'
win32
'
)
{
...
...
@@ -145,37 +148,39 @@ export async function execKill(pid: string): Promise<void> {
}
else
{
await
cpp
.
exec
(
`pkill -P
${
pid
}
`
);
}
return
Promise
.
resolve
();
}
/**
*
set
environment variable
*
get command of setting
environment variable
* @param variable
* @returns command string
* @returns command string
*/
export
function
setEnvironmentVariable
(
variable
:
{
key
:
string
;
value
:
string
}):
string
{
if
(
process
.
platform
===
'
win32
'
)
{
return
`$env:
${
variable
.
key
}
="
${
variable
.
value
}
"`
;
}
else
{
}
else
{
return
`export
${
variable
.
key
}
=
${
variable
.
value
}
`
;
}
}
/**
* Compress files in directory to tar file
* @param source
_p
ath
* @param tar
_p
ath
* @param source
P
ath
* @param tar
P
ath
*/
export
async
function
tarAdd
(
tar
_p
ath
:
string
,
source
_p
ath
:
string
):
Promise
<
void
>
{
export
async
function
tarAdd
(
tar
P
ath
:
string
,
source
P
ath
:
string
):
Promise
<
void
>
{
if
(
process
.
platform
===
'
win32
'
)
{
tar_path
=
tar_path
.
split
(
'
\\
'
).
join
(
'
\\\\
'
);
source_path
=
source_path
.
split
(
'
\\
'
).
join
(
'
\\\\
'
);
let
script
:
string
[]
=
[];
const
tarFilePath
:
string
=
tarPath
.
split
(
'
\\
'
)
.
join
(
'
\\\\
'
);
const
sourceFilePath
:
string
=
sourcePath
.
split
(
'
\\
'
)
.
join
(
'
\\\\
'
);
const
script
:
string
[]
=
[];
script
.
push
(
`import os`
,
`import tarfile`
,
String
.
Format
(
`tar = tarfile.open("{0}","w:gz")\r\nfor root,dir,files in os.walk("{1}"):`
,
tar
_p
ath
,
source
_p
ath
),
String
.
Format
(
`tar = tarfile.open("{0}","w:gz")\r\nfor root,dir,files in os.walk("{1}"):`
,
tar
FileP
ath
,
source
FileP
ath
),
` for file in files:`
,
` fullpath = os.path.join(root,file)`
,
` tar.add(fullpath, arcname=file)`
,
...
...
@@ -184,39 +189,40 @@ export async function tarAdd(tar_path: string, source_path: string): Promise<voi
const
tarScript
:
string
=
path
.
join
(
os
.
tmpdir
(),
'
tar.py
'
);
await
cpp
.
exec
(
`python
${
tarScript
}
`
);
}
else
{
await
cpp
.
exec
(
`tar -czf
${
tar
_p
ath
}
-C
${
source
_p
ath
}
.`
);
await
cpp
.
exec
(
`tar -czf
${
tar
P
ath
}
-C
${
source
P
ath
}
.`
);
}
return
Promise
.
resolve
();
}
/**
* generate script file name
* @param fileNamePrefix
* @param fileNamePrefix
*/
export
function
getScriptName
(
fileNamePrefix
:
string
):
string
{
if
(
process
.
platform
===
'
win32
'
)
{
return
fileNamePrefix
+
'
.ps1
'
;
return
String
.
Format
(
'
{0}.ps1
'
,
fileNamePrefix
)
;
}
else
{
return
fileNamePrefix
+
'
.sh
'
;
return
String
.
Format
(
'
{0}.sh
'
,
fileNamePrefix
)
;
}
}
/**
* generate script file
* @param gpuMetricCollectorScriptFolder
* @param gpuMetricCollectorScriptFolder
*/
export
function
getgpuMetricsCollectorScriptContent
(
gpuMetricCollectorScriptFolder
:
string
):
string
{
if
(
process
.
platform
===
'
win32
'
)
{
if
(
process
.
platform
===
'
win32
'
)
{
return
String
.
Format
(
GPU_INFO_COLLECTOR_FORMAT_WINDOWS
,
gpuMetricCollectorScriptFolder
,
path
.
join
(
gpuMetricCollectorScriptFolder
,
'
pid
'
)
,
path
.
join
(
gpuMetricCollectorScriptFolder
,
'
pid
'
)
);
}
else
{
return
String
.
Format
(
GPU_INFO_COLLECTOR_FORMAT_LINUX
,
gpuMetricCollectorScriptFolder
,
path
.
join
(
gpuMetricCollectorScriptFolder
,
'
pid
'
)
,
path
.
join
(
gpuMetricCollectorScriptFolder
,
'
pid
'
)
);
}
}
src/nni_manager/training_service/kubernetes/azureStorageClientUtils.ts
View file @
d48ad027
...
...
@@ -19,108 +19,126 @@
'
use strict
'
;
import
*
as
fs
from
'
fs
'
import
*
as
azureStorage
from
'
azure-storage
'
;
import
*
as
fs
from
'
fs
'
;
import
*
as
path
from
'
path
'
;
import
{
Deferred
}
from
'
ts-deferred
'
;
import
{
String
}
from
'
typescript-string-operations
'
;
import
{
getLogger
}
from
'
../../common/log
'
;
import
{
mkDirP
}
from
'
../../common/utils
'
;
// tslint:disable: no-redundant-jsdoc no-any no-unsafe-any
export
namespace
AzureStorageClientUtility
{
/**
* create azure share
* @param fileServerClient
* @param azureShare
* @param fileServerClient
* @param azureShare
*/
export
async
function
createShare
(
fileServerClient
:
any
,
azureShare
:
any
):
Promise
<
void
>
{
export
async
function
createShare
(
fileServerClient
:
any
,
azureShare
:
any
):
Promise
<
void
>
{
const
deferred
:
Deferred
<
void
>
=
new
Deferred
<
void
>
();
fileServerClient
.
createShareIfNotExists
(
azureShare
,
function
(
error
:
any
,
result
:
any
,
response
:
any
)
{
if
(
error
){
getLogger
().
error
(
`Create share failed:,
${
error
}
`
);
deferred
.
reject
(
error
)
}
else
{
deferred
.
resolve
()
fileServerClient
.
createShareIfNotExists
(
azureShare
,
(
error
:
any
,
result
:
any
,
response
:
any
)
=>
{
if
(
error
)
{
getLogger
()
.
error
(
`Create share failed:,
${
error
}
`
);
deferred
.
reject
(
error
);
}
else
{
deferred
.
resolve
();
}
})
});
return
deferred
.
promise
;
}
/**
* Create a new directory (NOT recursively) in azure file storage.
* @param fileServerClient
* @param azureFoler
* @param azureShare
* @param fileServerClient
* @param azureFoler
* @param azureShare
*/
export
async
function
createDirectory
(
fileServerClient
:
a
ny
,
azureFoler
:
any
,
azureShare
:
any
):
Promise
<
void
>
{
export
async
function
createDirectory
(
fileServerClient
:
a
zureStorage
.
FileService
,
azureFoler
:
any
,
azureShare
:
any
):
Promise
<
void
>
{
const
deferred
:
Deferred
<
void
>
=
new
Deferred
<
void
>
();
fileServerClient
.
createDirectoryIfNotExists
(
azureShare
,
azureFoler
,
function
(
error
:
any
,
result
:
any
,
response
:
any
)
{
if
(
error
){
getLogger
().
error
(
`Create directory failed:,
${
error
}
`
);
fileServerClient
.
createDirectoryIfNotExists
(
azureShare
,
azureFoler
,
(
error
:
any
,
result
:
any
,
response
:
any
)
=>
{
if
(
error
)
{
getLogger
()
.
error
(
`Create directory failed:,
${
error
}
`
);
deferred
.
reject
(
error
);
}
else
{
}
else
{
deferred
.
resolve
();
}
})
});
return
deferred
.
promise
;
}
/**
* Create a new directory recursively in azure file storage
* @param fileServerClient
* @param azureDirectory
* @param azureDirectory
*/
export
async
function
createDirectoryRecursive
(
fileServerClient
:
any
,
azureDirectory
:
any
,
azureShare
:
any
):
Promise
<
void
>
{
export
async
function
createDirectoryRecursive
(
fileServerClient
:
azureStorage
.
FileService
,
azureDirectory
:
string
,
azureShare
:
any
):
Promise
<
void
>
{
const
deferred
:
Deferred
<
void
>
=
new
Deferred
<
void
>
();
le
t
directories
=
azureDirectory
.
split
(
"
/
"
);
let
rootDirectory
=
""
for
(
le
t
directory
of
directories
){
cons
t
directories
:
string
[]
=
azureDirectory
.
split
(
'
/
'
);
let
rootDirectory
:
string
=
''
;
for
(
cons
t
directory
of
directories
)
{
rootDirectory
+=
directory
;
await
createDirectory
(
fileServerClient
,
rootDirectory
,
azureShare
);
rootDirectory
+=
'
/
'
;
}
deferred
.
resolve
();
return
deferred
.
promise
;
}
/**
* upload a file to azure storage
* @param fileServerClient
* @param azureDirectory
* @param azureFileName
* @param azureShare
* @param localFilePath
* @param fileServerClient
* @param azureDirectory
* @param azureFileName
* @param azureShare
* @param localFilePath
*/
async
function
uploadFileToAzure
(
fileServerClient
:
any
,
azureDirectory
:
any
,
azureFileName
:
any
,
azureShare
:
any
,
localFilePath
:
any
):
Promise
<
void
>
{
async
function
uploadFileToAzure
(
fileServerClient
:
any
,
azureDirectory
:
string
,
azureFileName
:
any
,
azureShare
:
any
,
localFilePath
:
string
):
Promise
<
void
>
{
const
deferred
:
Deferred
<
void
>
=
new
Deferred
<
void
>
();
await
fileServerClient
.
createFileFromLocalFile
(
azureShare
,
azureDirectory
,
azureFileName
,
localFilePath
,
function
(
error
:
any
,
result
:
any
,
response
:
any
)
{
if
(
error
){
getLogger
().
error
(
`Upload file failed:,
${
error
}
`
);
await
fileServerClient
.
createFileFromLocalFile
(
azureShare
,
azureDirectory
,
azureFileName
,
localFilePath
,
(
error
:
any
,
result
:
any
,
response
:
any
)
=>
{
if
(
error
)
{
getLogger
()
.
error
(
`Upload file failed:,
${
error
}
`
);
deferred
.
reject
(
error
);
}
else
{
}
else
{
deferred
.
resolve
();
}
})
});
return
deferred
.
promise
;
}
/**
* download a file from azure storage
* @param fileServerClient
* @param azureDirectory
* @param azureFileName
* @param azureShare
* @param localFilePath
* @param fileServerClient
* @param azureDirectory
* @param azureFileName
* @param azureShare
* @param localFilePath
*/
async
function
downloadFile
(
fileServerClient
:
any
,
azureDirectory
:
any
,
azureFileName
:
any
,
azureShare
:
any
,
localFilePath
:
any
):
Promise
<
void
>
{
async
function
downloadFile
(
fileServerClient
:
any
,
azureDirectory
:
string
,
azureFileName
:
any
,
azureShare
:
any
,
localFilePath
:
string
):
Promise
<
void
>
{
const
deferred
:
Deferred
<
void
>
=
new
Deferred
<
void
>
();
await
fileServerClient
.
getFileToStream
(
azureShare
,
azureDirectory
,
azureFileName
,
fs
.
createWriteStream
(
localFilePath
),
function
(
error
:
any
,
result
:
any
,
response
:
any
)
{
if
(
error
){
getLogger
().
error
(
`Download file failed:,
${
error
}
`
);
// tslint:disable-next-line:non-literal-fs-path
await
fileServerClient
.
getFileToStream
(
azureShare
,
azureDirectory
,
azureFileName
,
fs
.
createWriteStream
(
localFilePath
),
(
error
:
any
,
result
:
any
,
response
:
any
)
=>
{
if
(
error
)
{
getLogger
()
.
error
(
`Download file failed:,
${
error
}
`
);
deferred
.
reject
(
error
);
}
else
{
deferred
.
resolve
();
}
else
{
deferred
.
resolve
();
}
})
});
return
deferred
.
promise
;
}
...
...
@@ -131,67 +149,79 @@ export namespace AzureStorageClientUtility {
* @param azureShare : the azure share used
* @param localDirectory : local directory to be uploaded
*/
export
async
function
uploadDirectory
(
fileServerClient
:
any
,
azureDirectory
:
any
,
azureShare
:
any
,
localDirectory
:
any
):
Promise
<
void
>
{
// tslint:disable:non-literal-fs-path
export
async
function
uploadDirectory
(
fileServerClient
:
azureStorage
.
FileService
,
azureDirectory
:
string
,
azureShare
:
any
,
localDirectory
:
string
):
Promise
<
void
>
{
const
deferred
:
Deferred
<
void
>
=
new
Deferred
<
void
>
();
const
fileNameArray
:
string
[]
=
fs
.
readdirSync
(
localDirectory
);
await
createDirectoryRecursive
(
fileServerClient
,
azureDirectory
,
azureShare
);
for
(
le
t
fileName
of
fileNameArray
){
for
(
cons
t
fileName
of
fileNameArray
)
{
const
fullFilePath
:
string
=
path
.
join
(
localDirectory
,
fileName
);
try
{
if
(
fs
.
lstatSync
(
fullFilePath
).
isFile
())
{
if
(
fs
.
lstatSync
(
fullFilePath
)
.
isFile
())
{
await
uploadFileToAzure
(
fileServerClient
,
azureDirectory
,
fileName
,
azureShare
,
fullFilePath
);
}
else
{
// If filePath is a directory, recuisively copy it to azure
await
uploadDirectory
(
fileServerClient
,
azureDirectory
+
'
/
'
+
fileName
,
azureShare
,
fullFilePath
);
await
uploadDirectory
(
fileServerClient
,
String
.
Format
(
'
{0}/{1}
'
,
azureDirectory
,
fileName
)
,
azureShare
,
fullFilePath
);
}
}
catch
(
error
)
{
}
catch
(
error
)
{
deferred
.
reject
(
error
);
return
deferred
.
promise
;
}
}
// All files/directories are copied successfully, resolve
deferred
.
resolve
();
return
deferred
.
promise
;
}
/**
* downlod a directory from azure
* @param fileServerClient
* @param azureDirectory
* @param azureShare
* @param localDirectory
* @param fileServerClient
* @param azureDirectory
* @param azureShare
* @param localDirectory
*/
export
async
function
downloadDirectory
(
fileServerClient
:
any
,
azureDirectory
:
any
,
azureShare
:
any
,
localDirectory
:
any
):
Promise
<
void
>
{
export
async
function
downloadDirectory
(
fileServerClient
:
any
,
azureDirectory
:
string
,
azureShare
:
any
,
localDirectory
:
string
):
Promise
<
void
>
{
const
deferred
:
Deferred
<
void
>
=
new
Deferred
<
void
>
();
mkDirP
(
localDirectory
);
fileServerClient
.
listFilesAndDirectoriesSegmented
(
azureShare
,
azureDirectory
,
'
null
'
,
function
(
error
:
any
,
result
:
any
,
response
:
any
)
{
if
((
'
entries
'
in
result
)
===
false
){
getLogger
().
error
(
`list files failed, can't get entries in result`
);
await
mkDirP
(
localDirectory
);
fileServerClient
.
listFilesAndDirectoriesSegmented
(
azureShare
,
azureDirectory
,
'
null
'
,
async
(
error
:
any
,
result
:
any
,
response
:
any
)
=>
{
if
((
'
entries
'
in
result
)
===
false
)
{
getLogger
()
.
error
(
`list files failed, can't get entries in result`
);
throw
new
Error
(
`list files failed, can't get entries in result`
);
}
if
((
'
files
'
in
result
[
'
entries
'
])
===
false
){
getLogger
().
error
(
`list files failed, can't get files in result['entries']`
);
if
((
'
files
'
in
result
.
entries
)
===
false
)
{
getLogger
()
.
error
(
`list files failed, can't get files in result['entries']`
);
throw
new
Error
(
`list files failed, can't get files in result['entries']`
);
}
if
((
'
directories
'
in
result
[
'
directories
'
])
===
false
){
getLogger
().
error
(
`list files failed, can't get directories in result['entries']`
);
if
((
'
directories
'
in
result
.
directories
)
===
false
)
{
getLogger
()
.
error
(
`list files failed, can't get directories in result['entries']`
);
throw
new
Error
(
`list files failed, can't get directories in result['entries']`
);
}
for
(
var
fileName
of
result
[
'
entries
'
][
'
files
'
]
){
for
(
const
fileName
of
result
.
entries
.
files
)
{
const
fullFilePath
:
string
=
path
.
join
(
localDirectory
,
fileName
.
name
);
downloadFile
(
fileServerClient
,
azureDirectory
,
fileName
.
name
,
azureShare
,
fullFilePath
)
await
downloadFile
(
fileServerClient
,
azureDirectory
,
fileName
.
name
,
azureShare
,
fullFilePath
)
;
}
for
(
var
directoryName
of
result
[
'
entries
'
][
'
directories
'
]
){
const
fullDirectoryPath
:
string
=
path
.
join
(
localDirectory
,
directoryName
.
name
)
const
fullAzureDirectory
:
string
=
path
.
join
(
azureDirectory
,
directoryName
.
name
)
downloadDirectory
(
fileServerClient
,
fullAzureDirectory
,
azureShare
,
fullDirectoryPath
)
for
(
const
directoryName
of
result
.
entries
.
directories
)
{
const
fullDirectoryPath
:
string
=
path
.
join
(
localDirectory
,
directoryName
.
name
)
;
const
fullAzureDirectory
:
string
=
path
.
join
(
azureDirectory
,
directoryName
.
name
)
;
await
downloadDirectory
(
fileServerClient
,
fullAzureDirectory
,
azureShare
,
fullDirectoryPath
)
;
}
deferred
.
resolve
();
})
});
return
deferred
.
promise
;
}
}
// tslint:enable: no-redundant-jsdoc no-any no-unsafe-any
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerApiClient.ts
View file @
d48ad027
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
...
...
@@ -20,21 +21,29 @@
'
use strict
'
;
import
*
as
fs
from
'
fs
'
;
import
{
KubernetesCRDClient
,
GeneralK8s
Client
}
from
'
../kubernetesApiClient
'
;
import
{
GeneralK8sClient
,
KubernetesCRD
Client
}
from
'
../kubernetesApiClient
'
;
abstract
class
FrameworkControllerClient
extends
KubernetesCRDClient
{
/**
* FrameworkController Client
*/
abstract
class
FrameworkControllerClient
extends
KubernetesCRDClient
{
/**
* Factory method to generate operator cliet
* Factory method to generate operator clie
n
t
*/
// tslint:disable-next-line:function-name
public
static
generateFrameworkControllerClient
():
KubernetesCRDClient
{
return
new
FrameworkControllerClientV1
();
}
}
/**
* FrameworkController ClientV1
*/
class
FrameworkControllerClientV1
extends
FrameworkControllerClient
{
/**
* constructor, to initialize frameworkcontroller CRD definition
*/
// tslint:disable: no-unsafe-any no-any
public
constructor
()
{
super
();
this
.
crdSchema
=
JSON
.
parse
(
fs
.
readFileSync
(
'
./config/frameworkcontroller/frameworkcontrollerjob-crd-v1.json
'
,
'
utf8
'
));
...
...
@@ -42,13 +51,13 @@ class FrameworkControllerClientV1 extends FrameworkControllerClient {
}
protected
get
operator
():
any
{
return
this
.
client
.
apis
[
"
frameworkcontroller.microsoft.com
"
].
v1
.
namespaces
(
'
default
'
).
frameworks
;
return
this
.
client
.
apis
[
'
frameworkcontroller.microsoft.com
'
].
v1
.
namespaces
(
'
default
'
).
frameworks
;
}
// tslint:enable: no-unsafe-any no-any
public
get
containerName
():
string
{
return
'
framework
'
;
}
}
}
export
{
FrameworkControllerClient
,
GeneralK8sClient
};
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerConfig.ts
View file @
d48ad027
...
...
@@ -20,10 +20,11 @@
'
use strict
'
;
import
*
as
assert
from
'
assert
'
;
import
{
Kubernetes
Trial
Config
,
Kubernetes
TrialConfigTemplat
e
,
KubernetesClusterConfig
Azure
,
Kubernetes
ClusterConfigNFS
,
NFSConfig
,
KubernetesStorageKind
,
keyVault
Config
,
Azure
Storage
,
KubernetesCluster
Config
,
StorageConfig
}
from
'
../kubernetesConfig
'
import
{
AzureStorage
,
KeyVaultConfig
,
Kubernetes
Cluster
Config
,
Kubernetes
ClusterConfigAzur
e
,
KubernetesClusterConfig
NFS
,
Kubernetes
StorageKind
,
KubernetesTrialConfig
,
KubernetesTrialConfigTemplate
,
NFS
Config
,
StorageConfig
}
from
'
../kubernetesConfig
'
;
// tslint:disable:completed-docs
export
class
FrameworkAttemptCompletionPolicy
{
public
readonly
minFailedTaskCount
:
number
;
public
readonly
minSucceededTaskCount
:
number
;
...
...
@@ -36,13 +37,13 @@ export class FrameworkAttemptCompletionPolicy {
/**
* Trial job configuration for FrameworkController
*/
export
class
FrameworkControllerTrialConfigTemplate
extends
KubernetesTrialConfigTemplate
{
export
class
FrameworkControllerTrialConfigTemplate
extends
KubernetesTrialConfigTemplate
{
public
readonly
frameworkAttemptCompletionPolicy
:
FrameworkAttemptCompletionPolicy
;
public
readonly
name
:
string
;
public
readonly
taskNum
:
number
;
constructor
(
taskNum
:
number
,
command
:
string
,
gpuNum
:
number
,
cpuNum
:
number
,
memoryMB
:
number
,
image
:
string
,
frameworkAttemptCompletionPolicy
:
FrameworkAttemptCompletionPolicy
)
{
constructor
(
taskNum
:
number
,
command
:
string
,
gpuNum
:
number
,
cpuNum
:
number
,
memoryMB
:
number
,
image
:
string
,
frameworkAttemptCompletionPolicy
:
FrameworkAttemptCompletionPolicy
)
{
super
(
command
,
gpuNum
,
cpuNum
,
memoryMB
,
image
);
this
.
frameworkAttemptCompletionPolicy
=
frameworkAttemptCompletionPolicy
;
this
.
name
=
name
;
...
...
@@ -50,7 +51,7 @@ export class FrameworkControllerTrialConfigTemplate extends KubernetesTrialConfi
}
}
export
class
FrameworkControllerTrialConfig
extends
KubernetesTrialConfig
{
export
class
FrameworkControllerTrialConfig
extends
KubernetesTrialConfig
{
public
readonly
taskRoles
:
FrameworkControllerTrialConfigTemplate
[];
public
readonly
codeDir
:
string
;
constructor
(
codeDir
:
string
,
taskRoles
:
FrameworkControllerTrialConfigTemplate
[])
{
...
...
@@ -68,11 +69,12 @@ export class FrameworkControllerClusterConfig extends KubernetesClusterConfig {
}
}
// tslint:disable:function-name
export
class
FrameworkControllerClusterConfigNFS
extends
KubernetesClusterConfigNFS
{
public
readonly
serviceAccountName
:
string
;
constructor
(
serviceAccountName
:
string
,
apiVersion
:
string
,
serviceAccountName
:
string
,
apiVersion
:
string
,
nfs
:
NFSConfig
,
storage
?:
KubernetesStorageKind
)
{
...
...
@@ -81,8 +83,9 @@ export class FrameworkControllerClusterConfigNFS extends KubernetesClusterConfig
}
public
static
getInstance
(
jsonObject
:
object
):
FrameworkControllerClusterConfigNFS
{
let
kubeflowClusterConfigObjectNFS
=
<
FrameworkControllerClusterConfigNFS
>
jsonObject
;
assert
(
kubeflowClusterConfigObjectNFS
!==
undefined
)
const
kubeflowClusterConfigObjectNFS
:
FrameworkControllerClusterConfigNFS
=
<
FrameworkControllerClusterConfigNFS
>
jsonObject
;
assert
(
kubeflowClusterConfigObjectNFS
!==
undefined
);
return
new
FrameworkControllerClusterConfigNFS
(
kubeflowClusterConfigObjectNFS
.
serviceAccountName
,
kubeflowClusterConfigObjectNFS
.
apiVersion
,
...
...
@@ -94,20 +97,21 @@ export class FrameworkControllerClusterConfigNFS extends KubernetesClusterConfig
export
class
FrameworkControllerClusterConfigAzure
extends
KubernetesClusterConfigAzure
{
public
readonly
serviceAccountName
:
string
;
constructor
(
serviceAccountName
:
string
,
apiVersion
:
string
,
keyVault
:
k
eyVaultConfig
,
azureStorage
:
AzureStorage
,
serviceAccountName
:
string
,
apiVersion
:
string
,
keyVault
:
K
eyVaultConfig
,
azureStorage
:
AzureStorage
,
storage
?:
KubernetesStorageKind
)
{
super
(
apiVersion
,
keyVault
,
azureStorage
,
storage
);
super
(
apiVersion
,
keyVault
,
azureStorage
,
storage
);
this
.
serviceAccountName
=
serviceAccountName
;
}
public
static
getInstance
(
jsonObject
:
object
):
FrameworkControllerClusterConfigAzure
{
let
kubeflowClusterConfigObjectAzure
=
<
FrameworkControllerClusterConfigAzure
>
jsonObject
;
const
kubeflowClusterConfigObjectAzure
:
FrameworkControllerClusterConfigAzure
=
<
FrameworkControllerClusterConfigAzure
>
jsonObject
;
return
new
FrameworkControllerClusterConfigAzure
(
kubeflowClusterConfigObjectAzure
.
serviceAccountName
,
kubeflowClusterConfigObjectAzure
.
apiVersion
,
...
...
@@ -121,11 +125,11 @@ export class FrameworkControllerClusterConfigAzure extends KubernetesClusterConf
export
class
FrameworkControllerClusterConfigFactory
{
public
static
generateFrameworkControllerClusterConfig
(
jsonObject
:
object
):
FrameworkControllerClusterConfig
{
let
s
torageConfig
=
<
StorageConfig
>
jsonObject
;
if
(
!
storageConfig
)
{
throw
new
Error
(
"
Invalid json object as a StorageConfig instance
"
);
const
storageConfig
:
S
torageConfig
=
<
StorageConfig
>
jsonObject
;
if
(
storageConfig
===
undefined
)
{
throw
new
Error
(
'
Invalid json object as a StorageConfig instance
'
);
}
if
(
storageConfig
.
storage
&&
storageConfig
.
storage
===
'
azureStorage
'
)
{
if
(
storageConfig
.
storage
!==
undefined
&&
storageConfig
.
storage
===
'
azureStorage
'
)
{
return
FrameworkControllerClusterConfigAzure
.
getInstance
(
jsonObject
);
}
else
if
(
storageConfig
.
storage
===
undefined
||
storageConfig
.
storage
===
'
nfs
'
)
{
return
FrameworkControllerClusterConfigNFS
.
getInstance
(
jsonObject
);
...
...
@@ -134,6 +138,7 @@ export class FrameworkControllerClusterConfigFactory {
}
}
export
type
FrameworkControllerJobStatus
=
'
AttemptRunning
'
|
'
Completed
'
|
'
AttemptCreationPending
'
|
'
AttemptCreationRequested
'
|
'
AttemptPreparing
'
|
'
AttemptCompleted
'
;
export
type
FrameworkControllerJobStatus
=
'
AttemptRunning
'
|
'
Completed
'
|
'
AttemptCreationPending
'
|
'
AttemptCreationRequested
'
|
'
AttemptPreparing
'
|
'
AttemptCompleted
'
;
export
type
FrameworkControllerJobCompleteStatus
=
'
Succeeded
'
|
'
Failed
'
;
\ No newline at end of file
export
type
FrameworkControllerJobCompleteStatus
=
'
Succeeded
'
|
'
Failed
'
;
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerJobInfoCollector.ts
View file @
d48ad027
...
...
@@ -19,66 +19,74 @@
'
use strict
'
;
import
{
KubernetesTrialJobDetail
}
from
'
../kubernetesData
'
;
import
{
KubernetesCRDClient
}
from
'
../kubernetesApiClient
'
;
import
{
KubernetesTrialJobDetail
}
from
'
../kubernetesData
'
;
import
{
KubernetesJobInfoCollector
}
from
'
../kubernetesJobInfoCollector
'
;
import
{
FrameworkControllerJobStatus
,
FrameworkControllerJob
Complete
Status
}
from
'
./frameworkcontrollerConfig
'
;
import
{
FrameworkControllerJob
Complete
Status
,
FrameworkControllerJobStatus
}
from
'
./frameworkcontrollerConfig
'
;
/**
* Collector frameworkcontroller jobs info from Kubernetes cluster, and update frameworkcontroller job status locally
*/
export
class
FrameworkControllerJobInfoCollector
extends
KubernetesJobInfoCollector
{
export
class
FrameworkControllerJobInfoCollector
extends
KubernetesJobInfoCollector
{
constructor
(
jobMap
:
Map
<
string
,
KubernetesTrialJobDetail
>
)
{
super
(
jobMap
);
}
protected
async
retrieveSingleTrialJobInfo
(
kubernetesCRDClient
:
KubernetesCRDClient
|
undefined
,
kubernetesTrialJob
:
KubernetesTrialJobDetail
)
:
Promise
<
void
>
{
protected
async
retrieveSingleTrialJobInfo
(
kubernetesCRDClient
:
KubernetesCRDClient
|
undefined
,
kubernetesTrialJob
:
KubernetesTrialJobDetail
)
:
Promise
<
void
>
{
if
(
!
this
.
statusesNeedToCheck
.
includes
(
kubernetesTrialJob
.
status
))
{
return
Promise
.
resolve
();
}
if
(
kubernetesCRDClient
===
undefined
)
{
if
(
kubernetesCRDClient
===
undefined
)
{
return
Promise
.
reject
(
'
kubernetesCRDClient is undefined
'
);
}
// tslint:disable-next-line:no-any
let
kubernetesJobInfo
:
any
;
try
{
kubernetesJobInfo
=
await
kubernetesCRDClient
.
getKubernetesJob
(
kubernetesTrialJob
.
kubernetesJobName
);
}
catch
(
error
)
{
kubernetesJobInfo
=
await
kubernetesCRDClient
.
getKubernetesJob
(
kubernetesTrialJob
.
kubernetesJobName
);
}
catch
(
error
)
{
this
.
log
.
error
(
`Get job
${
kubernetesTrialJob
.
kubernetesJobName
}
info failed, error is
${
error
}
`
);
//This is not treat as a error status
return
Promise
.
resolve
();
}
if
(
kubernetesJobInfo
.
status
&&
kubernetesJobInfo
.
status
.
state
)
{
// tslint:disable: no-unsafe-any
if
(
kubernetesJobInfo
.
status
&&
kubernetesJobInfo
.
status
.
state
)
{
const
frameworkJobType
:
FrameworkControllerJobStatus
=
<
FrameworkControllerJobStatus
>
kubernetesJobInfo
.
status
.
state
;
switch
(
frameworkJobType
)
{
case
'
AttemptCreationPending
'
||
'
AttemptCreationRequested
'
||
'
AttemptPreparing
'
:
switch
(
frameworkJobType
)
{
case
'
AttemptCreationPending
'
:
case
'
AttemptCreationRequested
'
:
case
'
AttemptPreparing
'
:
kubernetesTrialJob
.
status
=
'
WAITING
'
;
break
;
case
'
AttemptRunning
'
:
kubernetesTrialJob
.
status
=
'
RUNNING
'
;
if
(
!
kubernetesTrialJob
.
startTime
)
{
if
(
kubernetesTrialJob
.
startTime
===
undefined
)
{
kubernetesTrialJob
.
startTime
=
Date
.
parse
(
<
string
>
kubernetesJobInfo
.
status
.
startTime
);
}
break
;
case
'
Completed
'
:
const
completedJobType
:
FrameworkControllerJobCompleteStatus
=
<
FrameworkControllerJobCompleteStatus
>
kubernetesJobInfo
.
status
.
attemptStatus
.
completionStatus
.
type
.
name
;
switch
(
completedJobType
)
{
const
completedJobType
:
FrameworkControllerJobCompleteStatus
=
<
FrameworkControllerJobCompleteStatus
>
kubernetesJobInfo
.
status
.
attemptStatus
.
completionStatus
.
type
.
name
;
switch
(
completedJobType
)
{
case
'
Succeeded
'
:
kubernetesTrialJob
.
status
=
'
SUCCEEDED
'
;
break
;
case
'
Failed
'
:
kubernetesTrialJob
.
status
=
'
FAILED
'
;
break
;
break
;
default
:
}
kubernetesTrialJob
.
endTime
=
Date
.
parse
(
<
string
>
kubernetesJobInfo
.
status
.
completionTime
);
kubernetesTrialJob
.
endTime
=
Date
.
parse
(
<
string
>
kubernetesJobInfo
.
status
.
completionTime
);
break
;
default
:
break
;
}
}
return
Promise
.
resolve
();
}
}
\ No newline at end of file
// tslint:enable: no-unsafe-any
}
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerJobRestServer.ts
View file @
d48ad027
...
...
@@ -20,16 +20,16 @@
'
use strict
'
;
import
*
as
component
from
'
../../../common/component
'
;
import
{
KubernetesJobRestServer
}
from
'
../kubernetesJobRestServer
'
;
import
{
FrameworkControllerTrainingService
}
from
'
./frameworkcontrollerTrainingService
'
;
import
{
KubernetesJobRestServer
}
from
'
../kubernetesJobRestServer
'
/**
* frameworkcontroller Training service Rest server, provides rest API to support frameworkcontroller job metrics update
*
*
*/
@
component
.
Singleton
export
class
FrameworkControllerJobRestServer
extends
KubernetesJobRestServer
{
export
class
FrameworkControllerJobRestServer
extends
KubernetesJobRestServer
{
constructor
()
{
super
(
component
.
get
(
FrameworkControllerTrainingService
));
}
}
\ No newline at end of file
}
}
src/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts
View file @
d48ad027
This diff is collapsed.
Click to expand it.
src/nni_manager/training_service/kubernetes/kubeflow/kubeflowApiClient.ts
View file @
d48ad027
...
...
@@ -20,18 +20,22 @@
'
use strict
'
;
import
*
as
fs
from
'
fs
'
;
import
{
GeneralK8sClient
,
KubernetesCRDClient
}
from
'
../kubernetesApiClient
'
;
import
{
KubeflowOperator
}
from
'
./kubeflowConfig
'
;
import
{
KubernetesCRDClient
,
GeneralK8sClient
}
from
'
../kubernetesApiClient
'
;
abstract
class
KubeflowOperatorClient
extends
KubernetesCRDClient
{
/**
* KubeflowOperator Client
*/
abstract
class
KubeflowOperatorClient
extends
KubernetesCRDClient
{
/**
* Factory method to generate operator cliet
* Factory method to generate operator clie
n
t
*/
public
static
generateOperatorClient
(
kubeflowOperator
:
KubeflowOperator
,
operatorApiVersion
:
string
):
KubernetesCRDClient
{
switch
(
kubeflowOperator
)
{
// tslint:disable-next-line:function-name
public
static
generateOperatorClient
(
kubeflowOperator
:
KubeflowOperator
,
operatorApiVersion
:
string
):
KubernetesCRDClient
{
switch
(
kubeflowOperator
)
{
case
'
tf-operator
'
:
{
switch
(
operatorApiVersion
)
{
switch
(
operatorApiVersion
)
{
case
'
v1alpha2
'
:
{
return
new
TFOperatorClientV1Alpha2
();
}
...
...
@@ -41,11 +45,12 @@ abstract class KubeflowOperatorClient extends KubernetesCRDClient{
case
'
v1beta2
'
:
{
return
new
TFOperatorClientV1Beta2
();
}
default
:
throw
new
Error
(
`Invalid tf-operator apiVersion
${
operatorApiVersion
}
`
);
}
break
;
}
case
'
pytorch-operator
'
:
{
switch
(
operatorApiVersion
)
{
switch
(
operatorApiVersion
)
{
case
'
v1alpha2
'
:
{
return
new
PyTorchOperatorClientV1Alpha2
();
}
...
...
@@ -55,13 +60,17 @@ abstract class KubeflowOperatorClient extends KubernetesCRDClient{
case
'
v1beta2
'
:
{
return
new
PyTorchOperatorClientV1Beta2
();
}
default
:
throw
new
Error
(
`Invalid pytorch-operator apiVersion
${
operatorApiVersion
}
`
);
}
}
}
default
:
throw
new
Error
(
`Invalid operator
${
kubeflowOperator
}
`
);
}
throw
new
Error
(
`Invalid operator
${
kubeflowOperator
}
or apiVersion
${
operatorApiVersion
}
`
);
}
}
// tslint:disable: no-unsafe-any no-any completed-docs
class
TFOperatorClientV1Alpha2
extends
KubeflowOperatorClient
{
/**
* constructor, to initialize tfjob CRD definition
...
...
@@ -73,12 +82,12 @@ class TFOperatorClientV1Alpha2 extends KubeflowOperatorClient {
}
protected
get
operator
():
any
{
return
this
.
client
.
apis
[
"
kubeflow.org
"
].
v1alpha2
.
namespaces
(
'
default
'
).
tfjobs
;
return
this
.
client
.
apis
[
'
kubeflow.org
'
].
v1alpha2
.
namespaces
(
'
default
'
).
tfjobs
;
}
public
get
containerName
():
string
{
return
'
tensorflow
'
;
}
}
}
class
TFOperatorClientV1Beta1
extends
KubernetesCRDClient
{
...
...
@@ -92,12 +101,12 @@ class TFOperatorClientV1Beta1 extends KubernetesCRDClient {
}
protected
get
operator
():
any
{
return
this
.
client
.
apis
[
"
kubeflow.org
"
].
v1beta1
.
namespaces
(
'
default
'
).
tfjobs
;
return
this
.
client
.
apis
[
'
kubeflow.org
'
].
v1beta1
.
namespaces
(
'
default
'
).
tfjobs
;
}
public
get
containerName
():
string
{
return
'
tensorflow
'
;
}
}
}
class
TFOperatorClientV1Beta2
extends
KubernetesCRDClient
{
...
...
@@ -111,12 +120,12 @@ class TFOperatorClientV1Beta2 extends KubernetesCRDClient {
}
protected
get
operator
():
any
{
return
this
.
client
.
apis
[
"
kubeflow.org
"
].
v1beta2
.
namespaces
(
'
default
'
).
tfjobs
;
return
this
.
client
.
apis
[
'
kubeflow.org
'
].
v1beta2
.
namespaces
(
'
default
'
).
tfjobs
;
}
public
get
containerName
():
string
{
return
'
tensorflow
'
;
}
}
}
class
PyTorchOperatorClientV1Alpha2
extends
KubeflowOperatorClient
{
...
...
@@ -130,7 +139,7 @@ class PyTorchOperatorClientV1Alpha2 extends KubeflowOperatorClient {
}
protected
get
operator
():
any
{
return
this
.
client
.
apis
[
"
kubeflow.org
"
].
v1alpha2
.
namespaces
(
'
default
'
).
pytorchjobs
;
return
this
.
client
.
apis
[
'
kubeflow.org
'
].
v1alpha2
.
namespaces
(
'
default
'
).
pytorchjobs
;
}
public
get
containerName
():
string
{
...
...
@@ -149,7 +158,7 @@ class PyTorchOperatorClientV1Beta1 extends KubernetesCRDClient {
}
protected
get
operator
():
any
{
return
this
.
client
.
apis
[
"
kubeflow.org
"
].
v1beta1
.
namespaces
(
'
default
'
).
pytorchjobs
;
return
this
.
client
.
apis
[
'
kubeflow.org
'
].
v1beta1
.
namespaces
(
'
default
'
).
pytorchjobs
;
}
public
get
containerName
():
string
{
...
...
@@ -168,7 +177,7 @@ class PyTorchOperatorClientV1Beta2 extends KubernetesCRDClient {
}
protected
get
operator
():
any
{
return
this
.
client
.
apis
[
"
kubeflow.org
"
].
v1beta2
.
namespaces
(
'
default
'
).
pytorchjobs
;
return
this
.
client
.
apis
[
'
kubeflow.org
'
].
v1beta2
.
namespaces
(
'
default
'
).
pytorchjobs
;
}
public
get
containerName
():
string
{
...
...
@@ -176,5 +185,5 @@ class PyTorchOperatorClientV1Beta2 extends KubernetesCRDClient {
}
}
// tslint:enable: no-unsafe-any
export
{
KubeflowOperatorClient
,
GeneralK8sClient
};
src/nni_manager/training_service/kubernetes/kubeflow/kubeflowConfig.ts
View file @
d48ad027
...
...
@@ -20,16 +20,20 @@
'
use strict
'
;
import
*
as
assert
from
'
assert
'
;
import
{
KubernetesClusterConfigAzure
,
KubernetesClusterConfigNFS
,
KubernetesStorageKind
,
NFSConfig
,
AzureStorage
,
keyVaultConfig
,
KubernetesTrialConfig
,
KubernetesTrialConfigTemplate
,
StorageConfig
,
KubernetesClusterConfig
}
from
'
../kubernetesConfig
'
import
{
MethodNotImplementedError
}
from
'
../../../common/errors
'
;
import
{
AzureStorage
,
KeyVaultConfig
,
KubernetesClusterConfig
,
KubernetesClusterConfigAzure
,
KubernetesClusterConfigNFS
,
KubernetesStorageKind
,
KubernetesTrialConfig
,
KubernetesTrialConfigTemplate
,
NFSConfig
,
StorageConfig
}
from
'
../kubernetesConfig
'
;
/
**
operator types that kubeflow supported
*/
/
/
operator types that kubeflow supported
export
type
KubeflowOperator
=
'
tf-operator
'
|
'
pytorch-operator
'
;
export
type
DistTrainRole
=
'
worker
'
|
'
ps
'
|
'
master
'
;
export
type
KubeflowJobStatus
=
'
Created
'
|
'
Running
'
|
'
Failed
'
|
'
Succeeded
'
;
export
type
OperatorApiVersion
=
'
v1alpha2
'
|
'
v1beta1
'
|
'
v1beta2
'
;
/**
* Kubeflow Cluster Configuration
*/
export
class
KubeflowClusterConfig
extends
KubernetesClusterConfig
{
public
readonly
operator
:
KubeflowOperator
;
constructor
(
apiVersion
:
string
,
operator
:
KubeflowOperator
)
{
...
...
@@ -38,11 +42,12 @@ export class KubeflowClusterConfig extends KubernetesClusterConfig {
}
}
// tslint:disable:completed-docs
export
class
KubeflowClusterConfigNFS
extends
KubernetesClusterConfigNFS
{
public
readonly
operator
:
KubeflowOperator
;
constructor
(
operator
:
KubeflowOperator
,
apiVersion
:
string
,
operator
:
KubeflowOperator
,
apiVersion
:
string
,
nfs
:
NFSConfig
,
storage
?:
KubernetesStorageKind
)
{
...
...
@@ -54,9 +59,11 @@ export class KubeflowClusterConfigNFS extends KubernetesClusterConfigNFS {
return
'
nfs
'
;
}
// tslint:disable-next-line:function-name
public
static
getInstance
(
jsonObject
:
object
):
KubeflowClusterConfigNFS
{
let
kubeflowClusterConfigObjectNFS
=
<
KubeflowClusterConfigNFS
>
jsonObject
;
assert
(
kubeflowClusterConfigObjectNFS
!==
undefined
)
const
kubeflowClusterConfigObjectNFS
:
KubeflowClusterConfigNFS
=
<
KubeflowClusterConfigNFS
>
jsonObject
;
assert
(
kubeflowClusterConfigObjectNFS
!==
undefined
);
return
new
KubeflowClusterConfigNFS
(
kubeflowClusterConfigObjectNFS
.
operator
,
kubeflowClusterConfigObjectNFS
.
apiVersion
,
...
...
@@ -66,26 +73,28 @@ export class KubeflowClusterConfigNFS extends KubernetesClusterConfigNFS {
}
}
export
class
KubeflowClusterConfigAzure
extends
KubernetesClusterConfigAzure
{
export
class
KubeflowClusterConfigAzure
extends
KubernetesClusterConfigAzure
{
public
readonly
operator
:
KubeflowOperator
;
constructor
(
operator
:
KubeflowOperator
,
apiVersion
:
string
,
keyVault
:
k
eyVaultConfig
,
azureStorage
:
AzureStorage
,
operator
:
KubeflowOperator
,
apiVersion
:
string
,
keyVault
:
K
eyVaultConfig
,
azureStorage
:
AzureStorage
,
storage
?:
KubernetesStorageKind
)
{
super
(
apiVersion
,
keyVault
,
azureStorage
,
storage
);
super
(
apiVersion
,
keyVault
,
azureStorage
,
storage
);
this
.
operator
=
operator
;
}
public
get
storageType
():
KubernetesStorageKind
{
public
get
storageType
():
KubernetesStorageKind
{
return
'
azureStorage
'
;
}
// tslint:disable-next-line:function-name
public
static
getInstance
(
jsonObject
:
object
):
KubeflowClusterConfigAzure
{
let
kubeflowClusterConfigObjectAzure
=
<
KubeflowClusterConfigAzure
>
jsonObject
;
const
kubeflowClusterConfigObjectAzure
:
KubeflowClusterConfigAzure
=
<
KubeflowClusterConfigAzure
>
jsonObject
;
return
new
KubeflowClusterConfigAzure
(
kubeflowClusterConfigObjectAzure
.
operator
,
kubeflowClusterConfigObjectAzure
.
apiVersion
,
...
...
@@ -98,12 +107,13 @@ export class KubeflowClusterConfigAzure extends KubernetesClusterConfigAzure{
export
class
KubeflowClusterConfigFactory
{
// tslint:disable-next-line:function-name
public
static
generateKubeflowClusterConfig
(
jsonObject
:
object
):
KubeflowClusterConfig
{
let
s
torageConfig
=
<
StorageConfig
>
jsonObject
;
if
(
!
storageConfig
)
{
throw
new
Error
(
"
Invalid json object as a StorageConfig instance
"
);
const
storageConfig
:
S
torageConfig
=
<
StorageConfig
>
jsonObject
;
if
(
storageConfig
===
undefined
)
{
throw
new
Error
(
'
Invalid json object as a StorageConfig instance
'
);
}
if
(
storageConfig
.
storage
&&
storageConfig
.
storage
===
'
azureStorage
'
)
{
if
(
storageConfig
.
storage
!==
undefined
&&
storageConfig
.
storage
===
'
azureStorage
'
)
{
return
KubeflowClusterConfigAzure
.
getInstance
(
jsonObject
);
}
else
if
(
storageConfig
.
storage
===
undefined
||
storageConfig
.
storage
===
'
nfs
'
)
{
return
KubeflowClusterConfigNFS
.
getInstance
(
jsonObject
);
...
...
@@ -122,10 +132,10 @@ export class KubeflowTrialConfig extends KubernetesTrialConfig {
}
}
export
class
KubeflowTrialConfigTemplate
extends
KubernetesTrialConfigTemplate
{
export
class
KubeflowTrialConfigTemplate
extends
KubernetesTrialConfigTemplate
{
public
readonly
replicas
:
number
;
constructor
(
replicas
:
number
,
command
:
string
,
gpuNum
:
number
,
cpuNum
:
number
,
memoryMB
:
number
,
image
:
string
)
{
constructor
(
replicas
:
number
,
command
:
string
,
gpuNum
:
number
,
cpuNum
:
number
,
memoryMB
:
number
,
image
:
string
)
{
super
(
command
,
gpuNum
,
cpuNum
,
memoryMB
,
image
);
this
.
replicas
=
replicas
;
}
...
...
@@ -163,22 +173,25 @@ export class KubeflowTrialConfigPytorch extends KubeflowTrialConfig {
export
class
KubeflowTrialConfigFactory
{
// tslint:disable-next-line:function-name
public
static
generateKubeflowTrialConfig
(
jsonObject
:
object
,
operator
:
KubeflowOperator
):
KubeflowTrialConfig
{
if
(
operator
===
'
tf-operator
'
){
let
kubeflowTrialConfigObject
=
<
KubeflowTrialConfigTensorflow
>
jsonObject
;
if
(
operator
===
'
tf-operator
'
)
{
const
kubeflowTrialConfigObject
:
KubeflowTrialConfigTensorflow
=
<
KubeflowTrialConfigTensorflow
>
jsonObject
;
return
new
KubeflowTrialConfigTensorflow
(
kubeflowTrialConfigObject
.
codeDir
,
kubeflowTrialConfigObject
.
worker
,
kubeflowTrialConfigObject
.
ps
);
}
else
if
(
operator
===
'
pytorch-operator
'
){
let
kubeflowTrialConfigObject
=
<
KubeflowTrialConfigPytorch
>
jsonObject
;
}
else
if
(
operator
===
'
pytorch-operator
'
)
{
const
kubeflowTrialConfigObject
:
KubeflowTrialConfigPytorch
=
<
KubeflowTrialConfigPytorch
>
jsonObject
;
return
new
KubeflowTrialConfigPytorch
(
kubeflowTrialConfigObject
.
codeDir
,
kubeflowTrialConfigObject
.
master
,
kubeflowTrialConfigObject
.
worker
);
}
throw
new
Error
(
`Invalid json object
${
jsonObject
}
`
);
throw
new
Error
(
`Invalid json object
${
jsonObject
}
`
);
}
}
src/nni_manager/training_service/kubernetes/kubeflow/kubeflowJobInfoCollector.ts
View file @
d48ad027
...
...
@@ -19,65 +19,68 @@
'
use strict
'
;
import
{
KubernetesTrialJobDetail
}
from
'
../kubernetesData
'
;
import
{
KubernetesCRDClient
}
from
'
../kubernetesApiClient
'
;
import
{
KubernetesTrialJobDetail
}
from
'
../kubernetesData
'
;
import
{
KubernetesJobInfoCollector
}
from
'
../kubernetesJobInfoCollector
'
;
import
{
KubeflowJobStatus
}
from
'
./kubeflowConfig
'
;
/**
* Collector Kubeflow jobs info from Kubernetes cluster, and update kubeflow job status locally
*/
export
class
KubeflowJobInfoCollector
extends
KubernetesJobInfoCollector
{
export
class
KubeflowJobInfoCollector
extends
KubernetesJobInfoCollector
{
constructor
(
jobMap
:
Map
<
string
,
KubernetesTrialJobDetail
>
)
{
super
(
jobMap
);
}
protected
async
retrieveSingleTrialJobInfo
(
kubernetesCRDClient
:
KubernetesCRDClient
|
undefined
,
kubernetesTrialJob
:
KubernetesTrialJobDetail
)
:
Promise
<
void
>
{
protected
async
retrieveSingleTrialJobInfo
(
kubernetesCRDClient
:
KubernetesCRDClient
|
undefined
,
kubernetesTrialJob
:
KubernetesTrialJobDetail
)
:
Promise
<
void
>
{
if
(
!
this
.
statusesNeedToCheck
.
includes
(
kubernetesTrialJob
.
status
))
{
return
Promise
.
resolve
();
}
if
(
kubernetesCRDClient
===
undefined
)
{
if
(
kubernetesCRDClient
===
undefined
)
{
return
Promise
.
reject
(
'
kubernetesCRDClient is undefined
'
);
}
// tslint:disable:no-any no-unsafe-any
let
kubernetesJobInfo
:
any
;
try
{
kubernetesJobInfo
=
await
kubernetesCRDClient
.
getKubernetesJob
(
kubernetesTrialJob
.
kubernetesJobName
);
}
catch
(
error
)
{
// Notice: it maynot be a 'real' error since cancel trial job can also cause getKubernetesJob failed.
kubernetesJobInfo
=
await
kubernetesCRDClient
.
getKubernetesJob
(
kubernetesTrialJob
.
kubernetesJobName
);
}
catch
(
error
)
{
// Notice: it maynot be a 'real' error since cancel trial job can also cause getKubernetesJob failed.
this
.
log
.
error
(
`Get job
${
kubernetesTrialJob
.
kubernetesJobName
}
info failed, error is
${
error
}
`
);
//This is not treat as a error status
return
Promise
.
resolve
();
}
if
(
kubernetesJobInfo
.
status
&&
kubernetesJobInfo
.
status
.
conditions
)
{
const
latestCondition
=
kubernetesJobInfo
.
status
.
conditions
[
kubernetesJobInfo
.
status
.
conditions
.
length
-
1
];
if
(
kubernetesJobInfo
.
status
&&
kubernetesJobInfo
.
status
.
conditions
)
{
const
latestCondition
:
any
=
kubernetesJobInfo
.
status
.
conditions
[
kubernetesJobInfo
.
status
.
conditions
.
length
-
1
];
const
tfJobType
:
KubeflowJobStatus
=
<
KubeflowJobStatus
>
latestCondition
.
type
;
switch
(
tfJobType
)
{
switch
(
tfJobType
)
{
case
'
Created
'
:
kubernetesTrialJob
.
status
=
'
WAITING
'
;
kubernetesTrialJob
.
startTime
=
Date
.
parse
(
<
string
>
latestCondition
.
lastUpdateTime
);
break
;
kubernetesTrialJob
.
startTime
=
Date
.
parse
(
<
string
>
latestCondition
.
lastUpdateTime
);
break
;
case
'
Running
'
:
kubernetesTrialJob
.
status
=
'
RUNNING
'
;
if
(
!
kubernetesTrialJob
.
startTime
)
{
if
(
kubernetesTrialJob
.
startTime
===
undefined
)
{
kubernetesTrialJob
.
startTime
=
Date
.
parse
(
<
string
>
latestCondition
.
lastUpdateTime
);
}
break
;
case
'
Failed
'
:
kubernetesTrialJob
.
status
=
'
FAILED
'
;
kubernetesTrialJob
.
endTime
=
Date
.
parse
(
<
string
>
latestCondition
.
lastUpdateTime
);
kubernetesTrialJob
.
endTime
=
Date
.
parse
(
<
string
>
latestCondition
.
lastUpdateTime
);
break
;
case
'
Succeeded
'
:
kubernetesTrialJob
.
status
=
'
SUCCEEDED
'
;
kubernetesTrialJob
.
endTime
=
Date
.
parse
(
<
string
>
latestCondition
.
lastUpdateTime
);
kubernetesTrialJob
.
endTime
=
Date
.
parse
(
<
string
>
latestCondition
.
lastUpdateTime
);
break
;
default
:
break
;
}
}
// tslint:enable:no-any no-unsafe-any
return
Promise
.
resolve
();
}
}
\ No newline at end of file
}
src/nni_manager/training_service/kubernetes/kubeflow/kubeflowJobRestServer.ts
View file @
d48ad027
...
...
@@ -20,19 +20,19 @@
'
use strict
'
;
import
*
as
component
from
'
../../../common/component
'
;
import
{
KubernetesJobRestServer
}
from
'
../kubernetesJobRestServer
'
;
import
{
KubeflowTrainingService
}
from
'
./kubeflowTrainingService
'
;
import
{
KubernetesJobRestServer
}
from
'
../kubernetesJobRestServer
'
/**
* Kubeflow Training service Rest server, provides rest API to support kubeflow job metrics update
*
*
*/
@
component
.
Singleton
export
class
KubeflowJobRestServer
extends
KubernetesJobRestServer
{
export
class
KubeflowJobRestServer
extends
KubernetesJobRestServer
{
/**
* constructor to provide NNIRestServer's own rest property, e.g. port
*/
constructor
()
{
super
(
component
.
get
(
KubeflowTrainingService
));
}
}
\ No newline at end of file
}
}
Prev
1
2
3
4
5
6
7
8
9
10
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment