paiJobInfoCollector.ts 6.54 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3

4
import request from 'request';
5
import { Deferred } from 'ts-deferred';
6
7
8
9
import { NNIError, NNIErrorNames } from 'common/errors';
import { getLogger, Logger } from 'common/log';
import { TrialJobStatus } from 'common/trainingService';
import { ExperimentConfig, OpenpaiConfig } from 'common/experimentConfig';
10
11
12
import { PAITrialJobDetail } from './paiConfig';

/**
 * Single config shape that carries the fields of both `ExperimentConfig` and `OpenpaiConfig`.
 * It declares no members of its own; it only merges the two base types.
 */
interface FlattenOpenpaiConfig extends ExperimentConfig, OpenpaiConfig { }
13
14
15
16
17

/**
 * Collects PAI job info from the PAI cluster and updates the locally tracked PAI trial jobs.
 *
 * The collector keeps a reference to the job map passed to the constructor (it is not copied)
 * and mutates the `PAITrialJobDetail` entries of that map in place.
 */
export class PAIJobInfoCollector {
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    private readonly log: Logger = getLogger('PAIJobInfoCollector');
    // Only jobs currently in one of these statuses are queried from the cluster.
    private readonly statusesNeedToCheck: TrialJobStatus[];
    // Statuses after which startTime, endTime and url are finalized.
    private readonly finalStatuses: TrialJobStatus[];

    /**
     * @param jobMap map from trial job id to its PAI trial job detail; shared with the caller
     */
    constructor(jobMap: Map<string, PAITrialJobDetail>) {
        this.trialJobsMap = jobMap;
        this.statusesNeedToCheck = ['RUNNING', 'UNKNOWN', 'WAITING'];
        this.finalStatuses = ['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED', 'EARLY_STOPPED'];
    }

    /**
     * Refreshes the status of every tracked trial job, issuing the queries concurrently.
     *
     * @param protocol forwarded to the per-job query, which currently does not use it
     * @param token bearer token for the PAI REST server; when undefined nothing is done
     * @param config flattened experiment/OpenPAI config; when undefined nothing is done
     * @returns a promise that resolves once every per-job query has finished
     * @throws NNIError (NOT_FOUND) when the map holds an undefined entry for a trial job id
     */
    public async retrieveTrialStatus(protocol: string, token? : string, config?: FlattenOpenpaiConfig): Promise<void> {
        if (config === undefined || token === undefined) {
            return;
        }

        const updatePaiTrialJobs: Promise<void>[] = [];
        for (const [trialJobId, paiTrialJob] of this.trialJobsMap) {
            if (paiTrialJob === undefined) {
                throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
            }
            updatePaiTrialJobs.push(this.getSinglePAITrialJobInfo(protocol, paiTrialJob, token, config));
        }

        await Promise.all(updatePaiTrialJobs);
    }

    /**
     * Queries the PAI REST server for one job and updates `paiTrialJob` in place.
     *
     * The returned promise always resolves, also when the query fails; a failure is only
     * reflected in the job's status.
     *
     * @param _protocol unused
     * @param paiTrialJob job to refresh; skipped unless its status is in `statusesNeedToCheck`
     * @param paiToken bearer token sent in the `Authorization` header
     * @param config supplies `host` and `username` for the request URI
     */
    private getSinglePAITrialJobInfo(_protocol: string, paiTrialJob: PAITrialJobDetail, paiToken: string, config: FlattenOpenpaiConfig): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();
        if (!this.statusesNeedToCheck.includes(paiTrialJob.status)) {
            deferred.resolve();

            return deferred.promise;
        }

        // Rest call to get PAI job info and update status
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const getJobInfoRequest: request.Options = {
            uri: `${config.host}/rest-server/api/v2/jobs/${config.username}~${paiTrialJob.paiJobName}`,
            method: 'GET',
            json: true,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${paiToken}`
            }
        };

        //TODO : pass in request timeout param?
        request(getJobInfoRequest, (error: Error, response: request.Response, _body: any) => {
            const requestFailed: boolean = (error !== undefined && error !== null)
                || response === undefined || response === null
                || response.statusCode >= 400;

            if (requestFailed) {
                // The job refresh time could be earlier than job submission, so it might return 404 error code, need refactor
                this.markUnknownIfActive(paiTrialJob);
            } else {
                // With `json: true` the body can be absent (empty reply) or null (JSON `null`).
                // Guard it so a malformed reply cannot throw here and leave `deferred` pending forever.
                const responseBody: any = response.body;
                const jobStatus: any = (responseBody !== undefined && responseBody !== null) ?
                    responseBody.jobStatus : undefined;
                if (jobStatus && jobStatus.state) {
                    this.applyJobStatus(paiTrialJob, jobStatus);
                }
            }
            deferred.resolve();
        });

        return deferred.promise;
    }

    /** Querying PAI job info failed: downgrade a WAITING or RUNNING job to UNKNOWN. */
    private markUnknownIfActive(paiTrialJob: PAITrialJobDetail): void {
        if (paiTrialJob.status === 'WAITING' || paiTrialJob.status === 'RUNNING') {
            paiTrialJob.status = 'UNKNOWN';
        }
    }

    /** Maps the `jobStatus` object reported by the REST server onto the local trial job. */
    private applyJobStatus(paiTrialJob: PAITrialJobDetail, jobStatus: any): void {
        switch (jobStatus.state) {
            case 'WAITING':
                paiTrialJob.status = 'WAITING';
                break;
            case 'RUNNING':
                paiTrialJob.status = 'RUNNING';
                if (paiTrialJob.startTime === undefined) {
                    paiTrialJob.startTime = jobStatus.appLaunchedTime;
                }
                if (paiTrialJob.url === undefined) {
                    // Prefer the tracking url from the server, fall back to the job detail page.
                    if (jobStatus.appTrackingUrl) {
                        paiTrialJob.url = jobStatus.appTrackingUrl;
                    } else {
                        paiTrialJob.url = paiTrialJob.paiJobDetailUrl;
                    }
                }
                break;
            case 'SUCCEEDED':
                paiTrialJob.status = 'SUCCEEDED';
                break;
            case 'STOPPED':
            case 'STOPPING':
                if (paiTrialJob.isEarlyStopped !== undefined) {
                    paiTrialJob.status = paiTrialJob.isEarlyStopped === true ?
                        'EARLY_STOPPED' : 'USER_CANCELED';
                } else {
                    /* if paiTrialJob's isEarlyStopped is undefined, that means we didn't stop it via cancellation,
                     * mark it as SYS_CANCELED by PAI
                     */
                    paiTrialJob.status = 'SYS_CANCELED';
                }
                break;
            case 'FAILED':
                paiTrialJob.status = 'FAILED';
                break;
            default:
                paiTrialJob.status = 'UNKNOWN';
        }

        // For final job statuses, update startTime, endTime and url
        if (this.finalStatuses.includes(paiTrialJob.status)) {
            this.finalizeTrialJob(paiTrialJob, jobStatus);
        }
    }

    /** Fills in the still-missing start/end times and adds the log path to the job's url. */
    private finalizeTrialJob(paiTrialJob: PAITrialJobDetail, jobStatus: any): void {
        if (paiTrialJob.startTime === undefined) {
            paiTrialJob.startTime = jobStatus.appLaunchedTime;
        }
        if (paiTrialJob.endTime === undefined) {
            paiTrialJob.endTime = jobStatus.completedTime;
        }
        // Set pai trial job's url to WebHDFS output path
        if (paiTrialJob.logPath !== undefined) {
            if (paiTrialJob.url && paiTrialJob.url !== paiTrialJob.logPath) {
                paiTrialJob.url += `,${paiTrialJob.logPath}`;
            } else {
                paiTrialJob.url = `${paiTrialJob.logPath}`;
            }
        }
    }
}