manager.ts 5.83 KB
Newer Older
1
2
3
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

4
import assert from 'assert/strict';
5
6
7
import fs from 'fs';
import os from 'os';
import path from 'path';
8
import * as timersPromises from 'timers/promises';
9
10
11

import { Deferred } from 'ts-deferred';

12
13
14
15
16
17
18
import { getLogger, Logger } from 'common/log';
import globals from 'common/globals';
import { isAlive } from 'common/utils';
import { withLock, withLockNoWait } from './utils';

// Shared logger for every message emitted by the experiments manager.
const logger: Logger = getLogger(
    'experiments_manager',
);

19
20
21
22
23
24
25
26
27
28
/** Result of probing whether an experiment's recorded process is still alive. */
interface CrashedInfo {
    /** `true` when the recorded process is no longer alive. */
    isCrashed: boolean;
    /** Id of the experiment that was probed. */
    experimentId: string;
}

/** Raw contents of a file together with the modification time observed when it was read. */
interface FileInfo {
    /** Modification time in milliseconds (as reported by `fs.Stats.mtimeMs`). */
    mtime: number;
    /** File contents as read from disk. */
    buffer: Buffer;
}

29
30
/**
 * Maintains the shared experiments-list file (`globals.paths.experimentsList`).
 *
 * Every access to the file is wrapped in `withLock` / `withLockNoWait` from `./utils`.
 */
export class ExperimentsManager {
    /**
     * Pending retry timers of `setExperimentInfo`, keyed by `${experimentId}/${key}`.
     * An entry exists only while a retry is scheduled.
     */
    private profileUpdateTimer: Record<string, NodeJS.Timeout | undefined> = {};

    constructor() {
        globals.shutdown.register('experiments_manager', this.cleanUp.bind(this));
    }

    /**
     * Returns the entries of the experiments list as an array.
     *
     * Experiments whose status is not `STOPPED` but whose recorded `pid` is no
     * longer alive are marked `STOPPED` (and their `port` is removed) in the file
     * before returning. If the file was modified between the initial read and
     * the status update, the whole procedure is retried after 500 ms.
     */
    public async getExperimentsInfo(): Promise<JSON> {
        for (;;) {
            const fileInfo: FileInfo = await withLock(globals.paths.experimentsList, () => this.readExperimentsInfo());
            const experimentsInformation = JSON.parse(fileInfo.buffer.toString());

            const activeIds: string[] = Object.keys(experimentsInformation).filter(
                (expId) => experimentsInformation[expId]['status'] !== 'STOPPED'
            );
            const crashedIds: string[] = (await Promise.all(
                activeIds.map((expId) => this.checkCrashed(expId, experimentsInformation[expId]['pid']))
            ))
                .filter((crashedInfo) => crashedInfo.isCrashed)
                .map((crashedInfo) => crashedInfo.experimentId);

            if (crashedIds.length === 0) {
                return JSON.parse(JSON.stringify(Object.values(experimentsInformation)));
            }

            const result = await withLock(globals.paths.experimentsList, () => {
                return this.updateAllStatus(crashedIds, fileInfo.mtime);
            });
            if (result !== undefined) {
                return JSON.parse(JSON.stringify(Object.values(result)));
            }
            // The file changed after our read (mtime mismatch); back off and start over.
            await timersPromises.setTimeout(500);
        }
    }

    /**
     * Sets `key` to `value` on the entry of `experimentId` in the experiments list,
     * or deletes `key` when `value` is `undefined`.
     *
     * When `withLockNoWait` fails with `EEXIST` or "File has been locked.", the update
     * is retried after 100 ms. A newer call for the same experiment and key cancels a
     * still-pending retry of an older one, so the latest value wins. Other errors
     * (including the not-found assertion) are logged and not retried.
     */
    public setExperimentInfo(experimentId: string, key: string, value: any): void {
        // Key timers per experiment: keying by `key` alone let an update of one
        // experiment cancel the pending retry of another, losing that update.
        const timerId = `${experimentId}/${key}`;
        const pending = this.profileUpdateTimer[timerId];
        if (pending !== undefined) {
            // A new call for the same experiment/key supersedes the unfinished old one.
            clearTimeout(pending);
            delete this.profileUpdateTimer[timerId];
        }

        try {
            withLockNoWait(globals.paths.experimentsList, () => {
                const experimentsInformation = JSON.parse(fs.readFileSync(globals.paths.experimentsList).toString());
                assert(experimentId in experimentsInformation, `Experiment Manager: Experiment Id ${experimentId} not found, this should not happen`);
                if (value !== undefined) {
                    experimentsInformation[experimentId][key] = value;
                } else {
                    delete experimentsInformation[experimentId][key];
                }
                fs.writeFileSync(globals.paths.experimentsList, JSON.stringify(experimentsInformation, null, 4));
            });
        } catch (err) {
            logger.error(err);
            // Guarded view of the thrown value: it is not guaranteed to be an Error object.
            const failure = err as { code?: unknown; message?: unknown } | null | undefined;
            if (failure?.code === 'EEXIST' || failure?.message === 'File has been locked.') {
                logger.debug(`Experiment Manager: Retry set key value: ${experimentId} {${key}: ${value}}`);
                this.profileUpdateTimer[timerId] = setTimeout(() => this.setExperimentInfo(experimentId, key, value), 100);
            }
        }
    }

    /**
     * Reads the experiments list file and its `mtimeMs`.
     * Callers in this class invoke it under `withLock`.
     */
    private readExperimentsInfo(): FileInfo {
        const buffer: Buffer = fs.readFileSync(globals.paths.experimentsList);
        const mtime: number = fs.statSync(globals.paths.experimentsList).mtimeMs;
        return { buffer: buffer, mtime: mtime };
    }

    /** Reports whether the process `pid` of experiment `expId` is no longer alive (via `isAlive`). */
    private async checkCrashed(expId: string, pid: number): Promise<CrashedInfo> {
        const alive: boolean = await isAlive(pid);
        return { experimentId: expId, isCrashed: !alive };
    }

    /**
     * Marks every id in `updateList` as `STOPPED` and removes its `port`.
     *
     * The write happens only if the file's current `mtimeMs` equals `timestamp`,
     * i.e. nobody modified it since it was read; otherwise nothing is written
     * and `undefined` is returned.
     *
     * @returns the updated file contents, or `undefined` on an mtime mismatch.
     */
    private updateAllStatus(updateList: Array<string>, timestamp: number): {[key: string]: any} | undefined {
        if (timestamp !== fs.statSync(globals.paths.experimentsList).mtimeMs) {
            return;
        }
        const experimentsInformation = JSON.parse(fs.readFileSync(globals.paths.experimentsList).toString());
        for (const expId of updateList) {
            if (experimentsInformation[expId]) {
                experimentsInformation[expId]['status'] = 'STOPPED';
                delete experimentsInformation[expId]['port'];
            } else {
                logger.error(`Experiment Manager: Experiment Id ${expId} not found, this should not happen`);
            }
        }
        fs.writeFileSync(globals.paths.experimentsList, JSON.stringify(experimentsInformation, null, 4));
        return experimentsInformation;
    }

    /**
     * Shutdown hook registered in the constructor.
     *
     * Resolves immediately when no retry is pending. Otherwise it waits 5 s, then
     * resolves if the retries have drained, or rejects if any is still pending.
     */
    private async cleanUp(): Promise<void> {
        const deferred = new Deferred<void>();
        if (this.isUndone()) {
            logger.debug('Experiment manager: something undone');
            setTimeout(() => {
                if (this.isUndone()) {
                    deferred.reject(new Error('Still has undone after 5s, forced stop.'));
                } else {
                    deferred.resolve();
                }
            }, 5 * 1000);
        } else {
            logger.debug('Experiment manager: all clean up');
            deferred.resolve();
        }
        return deferred.promise;
    }

    /** Whether any `setExperimentInfo` retry is still scheduled. */
    private isUndone(): boolean {
        return Object.values(this.profileUpdateTimer).some((timer) => timer !== undefined);
    }
}