commandChannel.ts 5.02 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

'use strict';

import { EventEmitter } from "events";
import { getLogger, Logger } from "../../common/log";
import { TRIAL_COMMANDS } from "../../core/commands";
import { encodeCommand } from "../../core/ipcInterface";
import { Channel, EnvironmentInformation } from "./environment";

// Command types that a Command instance may carry; built once from TRIAL_COMMANDS.
const acceptedCommands = new Set<string>(TRIAL_COMMANDS);

/**
 * Immutable record tying a command type and its payload to the environment
 * it is associated with.
 *
 * Construction fails for any command type that is not in `acceptedCommands`.
 */
export class Command {
    /**
     * @param environment environment the command is associated with
     * @param command command type; must be a member of `acceptedCommands`
     * @param data payload carried by the command
     * @throws Error when `command` is not an accepted command type
     */
    constructor(
        public readonly environment: EnvironmentInformation,
        public readonly command: string,
        public readonly data: any
    ) {
        const isAccepted = acceptedCommands.has(command);
        if (!isAccepted) {
            throw new Error(`unaccepted command ${command}`);
        }
    }
}

/**
 * Connection handle for a single environment.
 *
 * This base implementation only remembers the environment; `open` and
 * `close` resolve immediately without doing any work.
 */
export class RunnerConnection {
    /**
     * @param environment environment this connection belongs to
     */
    constructor(public readonly environment: EnvironmentInformation) {}

    /** Opens the connection. No-op in this class. */
    public async open(): Promise<void> {
        return;
    }

    /** Closes the connection. No-op in this class. */
    public async close(): Promise<void> {
        return;
    }
}

/**
 * Base class for a channel that exchanges commands with environments.
 *
 * Outbound: `sendCommand` encodes a command with `encodeCommand` and hands the
 * resulting text to the subclass through `sendCommandInternal`.
 *
 * Inbound: subclasses pass received text to `handleCommand`, which parses every
 * frame in it and emits one "command" event (carrying a `Command`) per frame on
 * the emitter supplied to the constructor.
 *
 * The class also tracks one `RunnerConnection` per environment id between
 * `open` and `close`.
 */
export abstract class CommandChannel {
    protected readonly log: Logger;
    // Open connections, keyed by environment id.
    protected runnerConnections: Map<string, RunnerConnection> = new Map<string, RunnerConnection>();
    protected readonly commandEmitter: EventEmitter;

    // One frame: 2-character type, 14-digit length, then the payload up to the end of the line.
    // The `g` flag makes this regex stateful (`lastIndex`); parseCommands resets it before each scan.
    private readonly commandPattern: RegExp = /(?<type>[\w]{2})(?<length>[\d]{14})(?<data>.*)\n?/gm;

    /**
     * @param commandEmitter emitter on which parsed commands are published as "command" events
     */
    public constructor(commandEmitter: EventEmitter) {
        this.log = getLogger();
        this.commandEmitter = commandEmitter;
    }

    public abstract get channelName(): Channel;
    public abstract config(key: string, value: any): Promise<void>;
    public abstract start(): Promise<void>;
    public abstract stop(): Promise<void>;

    // Pull-based command channels need loop to check messages, the loop should be started with await here.
    public abstract run(): Promise<void>;

    protected abstract sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void>;
    protected abstract createRunnerConnection(environment: EnvironmentInformation): RunnerConnection;

    /**
     * Encodes a command and sends it to the given environment.
     *
     * @param environment target environment
     * @param commandType command type passed to `encodeCommand`
     * @param data payload; serialized with `JSON.stringify` before encoding
     */
    public async sendCommand(environment: EnvironmentInformation, commandType: string, data: any): Promise<void> {
        const command = encodeCommand(commandType, JSON.stringify(data));
        this.log.debug(`CommandChannel: env ${environment.id} sending command: ${command}`);
        await this.sendCommandInternal(environment, command.toString("utf8"));
    }

    /**
     * Creates, registers and opens the connection for an environment.
     *
     * The connection is registered before it is opened, so it is already
     * visible in `runnerConnections` while `connection.open()` is pending.
     *
     * @throws Error when a connection for `environment.id` is already registered
     */
    public async open(environment: EnvironmentInformation): Promise<void> {
        if (this.runnerConnections.has(environment.id)) {
            throw new Error(`CommandChannel: env ${environment.id} is opened already, shouldn't be opened again.`);
        }
        const connection = this.createRunnerConnection(environment);
        this.runnerConnections.set(environment.id, connection);
        await connection.open();
    }

    /**
     * Unregisters and closes the connection of an environment.
     * Does nothing when no connection is registered for `environment.id`.
     */
    public async close(environment: EnvironmentInformation): Promise<void> {
        const connection = this.runnerConnections.get(environment.id);
        if (connection === undefined) {
            return;
        }
        // Unregister first so the entry is gone even if close() rejects.
        this.runnerConnections.delete(environment.id);
        await connection.close();
    }

    /**
     * Splits received text into `[commandType, payload]` pairs.
     *
     * Every frame must match `commandPattern`; its payload is parsed as JSON.
     *
     * @param content raw text containing zero or more frames
     * @returns the parsed commands in the order they appear in `content`
     * @throws Error when a frame's declared length does not match its payload
     * @throws SyntaxError when a payload is not valid JSON (logged, then rethrown)
     */
    protected parseCommands(content: string): [string, any][] {
        const commands: [string, any][] = [];
        const pattern = this.commandPattern;

        // A previous call that threw mid-scan leaves lastIndex pointing into the
        // old string; always start from the beginning of the new content.
        pattern.lastIndex = 0;

        for (let match = pattern.exec(content); match !== null; match = pattern.exec(content)) {
            const groups = match.groups;
            if (groups === undefined) {
                continue;
            }
            const commandType = groups["type"];
            const dataLength = parseInt(groups["length"], 10);
            const data = groups["data"];

            // NOTE(review): the peer appears to encode with UTF-8 (Python encode('utf8')),
            // so the header may count bytes rather than UTF-16 code units. Accept either
            // count so non-ASCII payloads are not rejected — confirm against encodeCommand.
            const lengthMatches = dataLength === data.length || dataLength === Buffer.byteLength(data, "utf8");
            if (!lengthMatches) {
                throw new Error(`dataLength ${dataLength} not equal to actual length ${data.length}: ${data}`);
            }

            try {
                commands.push([commandType, JSON.parse(data)]);
            } catch (error) {
                this.log.error(`CommandChannel: error on parseCommands ${error}, original: ${data}`);
                throw error;
            }
        }

        return commands;
    }

    /**
     * Parses received text and emits one "command" event per frame.
     *
     * @param environment environment the content was received from
     * @param content raw text containing zero or more frames
     * @throws Error propagated from `parseCommands` or from the `Command` constructor
     */
    protected handleCommand(environment: EnvironmentInformation, content: string): void {
        for (const [commandType, data] of this.parseCommands(content)) {
            const command = new Command(environment, commandType, data);
            this.commandEmitter.emit("command", command);
            // Serialize the payload: interpolating an object directly would log "[object Object]".
            this.log.trace(`CommandChannel: env ${environment.id} emit command: ${commandType}, ${JSON.stringify(data)}.`);
        }
    }
}