commandChannel.ts 4.87 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

'use strict';

import { EventEmitter } from "events";
import { getLogger, Logger } from "../../common/log";
import { TRIAL_COMMANDS } from "../../core/commands";
import { encodeCommand } from "../../core/ipcInterface";
import { Channel, EnvironmentInformation } from "./environment";

const acceptedCommands: Set<string> = new Set<string>(TRIAL_COMMANDS);

export class Command {
    public readonly environment: EnvironmentInformation;
    public readonly command: string;
    public readonly data: any;

    constructor(environment: EnvironmentInformation, command: string, data: any) {
        if (!acceptedCommands.has(command)) {
            throw new Error(`unaccepted command ${command}`);
        }
        this.environment = environment;
        this.command = command;
        this.data = data;
    }
}

export class RunnerConnection {
    public readonly environment: EnvironmentInformation;

    constructor(environment: EnvironmentInformation) {
        this.environment = environment;
    }

    public async open(): Promise<void> {
        // do nothing
    }

    public async close(): Promise<void> {
        // do nothing
    }
}

export abstract class CommandChannel {
    protected readonly log: Logger;
    protected runnerConnections: Map<string, RunnerConnection> = new Map<string, RunnerConnection>();
    protected readonly commandEmitter: EventEmitter;

    private readonly commandPattern: RegExp = /(?<type>[\w]{2})(?<length>[\d]{14})(?<data>.*)\n?/gm;

    public constructor(commandEmitter: EventEmitter) {
        this.log = getLogger();
        this.commandEmitter = commandEmitter;
    }

    public abstract get channelName(): Channel;
    public abstract config(key: string, value: any): Promise<void>;
    public abstract start(): Promise<void>;
    public abstract stop(): Promise<void>;

    protected abstract sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void>;
    protected abstract createRunnerConnection(environment: EnvironmentInformation): RunnerConnection;

    public async sendCommand(environment: EnvironmentInformation, commantType: string, data: any): Promise<void> {
        const command = encodeCommand(commantType, JSON.stringify(data));
        this.log.debug(`CommandChannel: env ${environment.id} sending command: ${command}`);
        await this.sendCommandInternal(environment, command.toString("utf8"));
    }

    public async open(environment: EnvironmentInformation): Promise<void> {
        if (this.runnerConnections.has(environment.id)) {
            throw new Error(`CommandChannel: env ${environment.id} is opened already, shouldn't be opened again.`);
        }
        const connection = this.createRunnerConnection(environment);
        this.runnerConnections.set(environment.id, connection);
        await connection.open();
    }

    public async close(environment: EnvironmentInformation): Promise<void> {
        if (this.runnerConnections.has(environment.id)) {
            const connection = this.runnerConnections.get(environment.id);
            this.runnerConnections.delete(environment.id);
            if (connection !== undefined) {
                await connection.close();
            }
        }
    }

    protected parseCommands(content: string): [string, any][] {
        const commands: [string, any][] = [];

        let matches = this.commandPattern.exec(content);

        while (matches) {
            if (undefined !== matches.groups) {
                const commandType = matches.groups["type"];
                const dataLength = parseInt(matches.groups["length"]);
                const data: any = matches.groups["data"];
                if (dataLength !== data.length) {
                    throw new Error(`dataLength ${dataLength} not equal to actual length ${data.length}: ${data}`);
                }
                try {
                    const finalData = JSON.parse(data);
                    // to handle encode('utf8') of Python
                    commands.push([commandType, finalData]);
                } catch (error) {
                    this.log.error(`CommandChannel: error on parseCommands ${error}, original: ${matches.groups["data"]}`);
                    throw error;
                }
            }
            matches = this.commandPattern.exec(content);
        }

        return commands;
    }

    protected handleCommand(environment: EnvironmentInformation, content: string): void {
        const parsedResults = this.parseCommands(content);

        for (const parsedResult of parsedResults) {
            const commandType = parsedResult[0];
            const data = parsedResult[1];
            const command = new Command(environment, commandType, data);
            this.commandEmitter.emit("command", command);
            this.log.trace(`CommandChannel: env ${environment.id} emit command: ${commandType}, ${data}.`);
        }
    }
}