// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3
4
5

'use strict';

6
// tslint:disable-next-line:no-implicit-dependencies
7
8
9
import * as request from 'request';
import { Deferred } from 'ts-deferred';
import { NNIError, NNIErrorNames } from '../../common/errors';
10
import { getLogger, Logger } from '../../common/log';
11
import { TrialJobStatus } from '../../common/trainingService';
12
13
import { PAIClusterConfig } from './paiConfig';
import { PAITrialJobDetail } from './paiData';
14
15
16
17
18

/**
 * Collects PAI job info from the PAI cluster and updates the status of the
 * locally tracked PAI trial jobs.
 */
export class PAIJobInfoCollector {
    private readonly trialJobsMap: Map<string, PAITrialJobDetail>;
    private readonly log: Logger = getLogger();
    // Only trial jobs currently in one of these statuses are queried from the cluster.
    private readonly statusesNeedToCheck: TrialJobStatus[];
    // Once a trial job reaches one of these statuses, its startTime, endTime and url are filled in.
    private readonly finalStatuses: TrialJobStatus[];

    /**
     * @param jobMap map from trial job id to trial job detail; the details are updated in place
     */
    constructor(jobMap: Map<string, PAITrialJobDetail>) {
        this.trialJobsMap = jobMap;
        this.statusesNeedToCheck = ['RUNNING', 'UNKNOWN', 'WAITING'];
        this.finalStatuses = ['SUCCEEDED', 'FAILED', 'USER_CANCELED', 'SYS_CANCELED', 'EARLY_STOPPED'];
    }

    /**
     * Refreshes the status of every tracked trial job, issuing the queries concurrently.
     * Does nothing when either the token or the cluster config is missing.
     *
     * @param paiToken bearer token sent in the Authorization header
     * @param paiClusterConfig cluster config providing the host and user name used in the request URI
     * @throws NNIError (NOT_FOUND) when the map holds an undefined entry for a trial job id
     */
    public async retrieveTrialStatus(paiToken? : string, paiClusterConfig?: PAIClusterConfig): Promise<void> {
        if (paiClusterConfig === undefined || paiToken === undefined) {
            return;
        }

        const updatePaiTrialJobs: Promise<void>[] = [];
        for (const [trialJobId, paiTrialJob] of this.trialJobsMap) {
            if (paiTrialJob === undefined) {
                throw new NNIError(NNIErrorNames.NOT_FOUND, `trial job id ${trialJobId} not found`);
            }
            updatePaiTrialJobs.push(this.getSinglePAITrialJobInfo(paiTrialJob, paiToken, paiClusterConfig));
        }

        await Promise.all(updatePaiTrialJobs);
    }

    /**
     * Queries the PAI rest server for one trial job and updates `paiTrialJob` in place.
     * Jobs whose status is not in `statusesNeedToCheck` are skipped without a request.
     *
     * @param paiTrialJob trial job detail to refresh
     * @param paiToken bearer token sent in the Authorization header
     * @param paiClusterConfig cluster config providing the host and user name used in the request URI
     * @returns a promise resolved once the request callback has finished, whether or not the query succeeded
     */
    private getSinglePAITrialJobInfo(paiTrialJob: PAITrialJobDetail, paiToken: string, paiClusterConfig: PAIClusterConfig): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();
        if (!this.statusesNeedToCheck.includes(paiTrialJob.status)) {
            deferred.resolve();

            return deferred.promise;
        }

        // Rest call to get PAI job info and update status
        // Refer https://github.com/Microsoft/pai/blob/master/docs/rest-server/API.md for more detail about PAI Rest API
        const getJobInfoRequest: request.Options = {
            // tslint:disable-next-line:no-http-string
            uri: `http://${paiClusterConfig.host}/rest-server/api/v1/user/${paiClusterConfig.userName}/jobs/${paiTrialJob.paiJobName}`,
            method: 'GET',
            json: true,
            headers: {
                'Content-Type': 'application/json',
                Authorization: `Bearer ${paiToken}`
            }
        };

        // tslint:disable: no-unsafe-any no-any cyclomatic-complexity
        //TODO : pass in request timeout param?
        request(getJobInfoRequest, (error: Error, response: request.Response, body: any) => {
            if ((error !== undefined && error !== null) || response.statusCode >= 500) {
                this.log.error(`PAI Training service: get job info for trial ${paiTrialJob.id} from PAI Cluster failed!`);
                // Queried PAI job info failed, set job status to UNKNOWN
                if (paiTrialJob.status === 'WAITING' || paiTrialJob.status === 'RUNNING') {
                    paiTrialJob.status = 'UNKNOWN';
                }
            } else {
                // Guard against a missing body so that an empty reply cannot throw inside
                // this callback and leave the deferred unresolved.
                const jobStatus: any = (response.body !== undefined && response.body !== null) ?
                    response.body.jobStatus : undefined;
                if (jobStatus && jobStatus.state) {
                    switch (jobStatus.state) {
                        case 'WAITING':
                            paiTrialJob.status = 'WAITING';
                            break;
                        case 'RUNNING':
                            paiTrialJob.status = 'RUNNING';
                            if (paiTrialJob.startTime === undefined) {
                                paiTrialJob.startTime = jobStatus.appLaunchedTime;
                            }
                            if (paiTrialJob.url === undefined) {
                                paiTrialJob.url = jobStatus.appTrackingUrl;
                            }
                            break;
                        case 'SUCCEEDED':
                            paiTrialJob.status = 'SUCCEEDED';
                            break;
                        case 'STOPPED':
                            if (paiTrialJob.isEarlyStopped !== undefined) {
                                paiTrialJob.status = paiTrialJob.isEarlyStopped === true ?
                                        'EARLY_STOPPED' : 'USER_CANCELED';
                            } else {
                                /* if paiTrialJob's isEarlyStopped is undefined, that means we didn't stop it via cancellation,
                                 * mark it as SYS_CANCELED by PAI
                                 */
                                paiTrialJob.status = 'SYS_CANCELED';
                            }
                            break;
                        case 'FAILED':
                            paiTrialJob.status = 'FAILED';
                            break;
                        default:
                            paiTrialJob.status = 'UNKNOWN';
                    }
                    // For final job statuses, update startTime, endTime and url
                    if (this.finalStatuses.includes(paiTrialJob.status)) {
                        if (paiTrialJob.startTime === undefined) {
                            paiTrialJob.startTime = jobStatus.appLaunchedTime;
                        }
                        if (paiTrialJob.endTime === undefined) {
                            paiTrialJob.endTime = jobStatus.completedTime;
                        }
                        // Set pai trial job's url to WebHDFS output path
                        if (paiTrialJob.hdfsLogPath !== undefined) {
                            // A job can reach a final status without ever having been observed as RUNNING,
                            // in which case url is still unset; use the log path alone instead of
                            // producing the literal text "undefined,<path>".
                            if (paiTrialJob.url === undefined || paiTrialJob.url === '') {
                                paiTrialJob.url = paiTrialJob.hdfsLogPath;
                            } else {
                                paiTrialJob.url = `${paiTrialJob.url},${paiTrialJob.hdfsLogPath}`;
                            }
                        }
                    }
                }
            }
            deferred.resolve();
        });

        return deferred.promise;
    }
    // tslint:enable: no-unsafe-any no-any
}