paiJobInfoCollector.ts 6.48 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

'use strict';

import * as request from 'request';
import { Deferred } from 'ts-deferred';
import { getLogger, Logger } from '../../common/log';
import { NNIError, NNIErrorNames } from '../../common/errors';
import { PAITrialJobDetail } from './paiData';
import { PAIClusterConfig } from './paiConfig';
import { TrialJobStatus } from '../../common/trainingService';

/**
 * Collects PAI job info from the PAI cluster and updates the status of the
 * locally tracked PAI trial jobs accordingly.
 */
export class PAIJobInfoCollector {
    // Trial jobs keyed by trial job id; the entries are mutated in place when their status is refreshed
    private readonly trialJobsMap : Map<string, PAITrialJobDetail>;
    private readonly log: Logger = getLogger();
    // Only jobs currently in one of these statuses are queried from the cluster
    private readonly statusesNeedToCheck : TrialJobStatus[];
    // Statuses for which startTime, endTime and url are filled in after a status update
    private readonly finalStatuses : TrialJobStatus[];

    /**
     * @param jobMap map of trial job id to trial job detail; the collector keeps a reference to this map
     * and updates its entries in place
     */
    constructor(jobMap: Map<string, PAITrialJobDetail>) {
        this.trialJobsMap = jobMap;
        this.statusesNeedToCheck = ['RUNNING', 'UNKNOWN', 'WAITING'];
        this.finalStatuses = ['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED', 'EARLY_STOPPED'];
    }

    /**
     * Refresh the status of every tracked trial job from the PAI cluster.
     * Does nothing when either the token or the cluster config is missing.
     *
     * @param paiToken bearer token sent in the Authorization header of each request
     * @param paiClusterConfig cluster config providing the host and user name used to build the request URI
     * @returns a promise that resolves once every per-job query has completed
     * @throws NNIError (NOT_FOUND) when the job map contains an empty entry for a trial job id
     */
    public async retrieveTrialStatus(paiToken? : string, paiClusterConfig?: PAIClusterConfig) : Promise<void> {
        if (!paiClusterConfig || !paiToken) {
            return;
        }

        const updatePaiTrialJobs : Promise<void>[] = [];
        for (const [trialJobId, paiTrialJob] of this.trialJobsMap) {
            if (!paiTrialJob) {
                throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
            }
            updatePaiTrialJobs.push(this.getSinglePAITrialJobInfo(paiTrialJob, paiToken, paiClusterConfig));
        }

        await Promise.all(updatePaiTrialJobs);
    }

    /**
     * Query the PAI rest server for one trial job and update the job detail in place.
     * Jobs whose current status is not in statusesNeedToCheck are skipped without a request.
     *
     * @param paiTrialJob trial job detail to refresh
     * @param paiToken bearer token for the Authorization header
     * @param paiClusterConfig cluster config providing host and user name
     * @returns a promise that is resolved (never rejected) when the query has been handled
     */
    private getSinglePAITrialJobInfo(paiTrialJob : PAITrialJobDetail, paiToken : string, paiClusterConfig: PAIClusterConfig) : Promise<void> {
        const deferred : Deferred<void> = new Deferred<void>();
        if (!this.statusesNeedToCheck.includes(paiTrialJob.status)) {
            deferred.resolve();

            return deferred.promise;
        }

        // Rest call to get PAI job info and update status
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const getJobInfoRequest: request.Options = {
            uri: `http://${paiClusterConfig.host}/rest-server/api/v1/user/${paiClusterConfig.userName}/jobs/${paiTrialJob.paiJobName}`,
            method: 'GET',
            json: true,
            headers: {
                "Content-Type": "application/json",
                "Authorization": 'Bearer ' + paiToken
            }
        };
        //TODO : pass in request timeout param?
        request(getJobInfoRequest, (error: Error, response: request.Response, body: any) => {
            // A missing response object is treated like a failed query (defensive guard)
            if (error || !response || response.statusCode >= 500) {
                this.log.error(`PAI Training service: get job info for trial ${paiTrialJob.id} from PAI Cluster failed!`);
                // Queried PAI job info failed, set job status to UNKNOWN
                if (paiTrialJob.status === 'WAITING' || paiTrialJob.status === 'RUNNING') {
                    paiTrialJob.status = 'UNKNOWN';
                }
            } else {
                // The body can be empty (for example on a 4xx reply); dereferencing it unguarded would throw
                // inside this callback and leave the deferred unresolved forever.
                const jobStatus: any = response.body ? response.body.jobStatus : undefined;
                if (jobStatus && jobStatus.state) {
                    this.applyPaiJobStatus(paiTrialJob, jobStatus);
                }
            }
            deferred.resolve();
        });

        return deferred.promise;
    }

    /**
     * Map the job state reported by PAI onto the trial job status and fill in time / url fields.
     *
     * @param paiTrialJob trial job detail to update in place
     * @param jobStatus the jobStatus object of the rest-server reply; its shape is not declared in this file,
     * only the fields state, appLaunchedTime, appTrackingUrl and completedTime are read
     */
    private applyPaiJobStatus(paiTrialJob : PAITrialJobDetail, jobStatus: any) : void {
        switch (jobStatus.state) {
            case 'WAITING':
                paiTrialJob.status = 'WAITING';
                break;
            case 'RUNNING':
                paiTrialJob.status = 'RUNNING';
                if (!paiTrialJob.startTime) {
                    paiTrialJob.startTime = jobStatus.appLaunchedTime;
                }
                if (!paiTrialJob.url) {
                    paiTrialJob.url = jobStatus.appTrackingUrl;
                }
                break;
            case 'SUCCEEDED':
                paiTrialJob.status = 'SUCCEEDED';
                break;
            case 'STOPPED':
                // Keep EARLY_STOPPED if it was set while the request was in flight; otherwise a stopped job counts as user canceled
                if (paiTrialJob.status !== 'EARLY_STOPPED') {
                    paiTrialJob.status = 'USER_CANCELED';
                }
                break;
            case 'FAILED':
                paiTrialJob.status = 'FAILED';
                break;
            default:
                paiTrialJob.status = 'UNKNOWN';
                break;
        }

        // For final job statuses, update startTime, endTime and url
        if (this.finalStatuses.includes(paiTrialJob.status)) {
            if (!paiTrialJob.startTime) {
                paiTrialJob.startTime = jobStatus.appLaunchedTime;
            }
            if (!paiTrialJob.endTime) {
                paiTrialJob.endTime = jobStatus.completedTime;
            }
            // Set pai trial job's url to WebHDFS output path
            if (paiTrialJob.hdfsLogPath) {
                paiTrialJob.url = paiTrialJob.hdfsLogPath;
            }
        }
    }
}