webCommandChannel.ts 5.37 KB
Newer Older
1
2
3
4
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { Server as SocketServer } from "ws";
5
6
import { getBasePort, getExperimentId } from "common/experimentStartupInfo";
import { INITIALIZED } from 'core/commands';
7
8
import { CommandChannel, RunnerConnection } from "../commandChannel";
import { Channel, EnvironmentInformation } from "../environment";
9
import { EventEmitter } from "events";
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

/**
 * Runner connection that also tracks the web sockets attached to it.
 */
class WebRunnerConnection extends RunnerConnection {
    /** Sockets attached to this runner connection, in the order they were added. */
    public readonly clients: WebSocket[] = [];

    /**
     * Runs the base-class close first, then drains `clients` from the front,
     * closing every socket that is removed.
     */
    public async close(): Promise<void> {
        await super.close();
        for (let pending = this.clients.length; pending > 0; pending = this.clients.length) {
            const socket = this.clients.shift();
            if (socket === undefined) {
                continue;
            }
            socket.close();
        }
    }

    /**
     * Appends a socket to the end of `clients`.
     * @param client socket to attach to this runner connection
     */
    public AddClient(client: WebSocket): void {
        this.clients.push(client);
    }
}

export class WebCommandChannel extends CommandChannel {
    /** Experiment id read once via getExperimentId(); compared with the expId carried by a client's init message. */
    private readonly expId: string = getExperimentId();
    /** Singleton instance, created lazily by getInstance(). */
    private static commandChannel: WebCommandChannel;
    /** Listening socket server; stays undefined until start() creates it. */
    private webSocketServer: SocketServer | undefined;
    /**
     * Every accepted socket mapped to its runner connection.
     * The value is undefined while the socket has not yet sent a valid init message.
     */
    private clients: Map<WebSocket, WebRunnerConnection | undefined> = new Map<WebSocket, WebRunnerConnection | undefined>();

    /** Identifies this channel implementation; always returns the literal "web". */
    public get channelName(): Channel {
        return "web";
    }

    /**
     * Configuration hook; this channel ignores every key/value pair it is given.
     * @param _key configuration key (unused)
     * @param _value configuration value (unused)
     */
    public async config(_key: string, _value: any): Promise<void> {
        // Intentionally empty: nothing is configured through this method.
    }
42
43
44
45
46
47
48
49
50
51
52
53
    
    /**
     * Private so that WebCommandChannel is a singleton: one experiment can only
     * start one WebCommandChannel instance. Obtain the instance through getInstance().
     * @param commandEmitter forwarded unchanged to the CommandChannel constructor
     */
    private constructor(commandEmitter: EventEmitter) {
        super(commandEmitter);
    }

    /**
     * Returns the shared channel, constructing it on the first call.
     * Once the instance exists, the `commandEmitter` passed to later calls is not used.
     * @param commandEmitter emitter handed to the constructor when the instance is created
     * @returns the single WebCommandChannel instance
     */
    public static getInstance(commandEmitter: EventEmitter): CommandChannel {
        const existing = this.commandChannel;
        if (existing) {
            return existing;
        }
        const created = new WebCommandChannel(commandEmitter);
        this.commandChannel = created;
        return created;
    }
54
55
56
57
58
59
60
61

    /**
     * Opens the web socket server on `getBasePort() + 1` and wires up every
     * incoming socket: errors are logged, messages are routed to
     * receivedWebSocketMessage(), and a socket that closes is removed from all
     * bookkeeping so it is neither leaked nor written to afterwards.
     */
    public async start(): Promise<void> {
        const port = getBasePort() + 1;
        this.webSocketServer = new SocketServer({ port });

        this.webSocketServer.on('connection', (client: WebSocket) => {
            this.log.debug(`WebCommandChannel: received connection`);
            client.onerror = (event): void => {
                this.log.error('error on client', event);
            };

            // undefined marks the socket as connected but not yet initialized.
            this.clients.set(client, undefined);
            client.onmessage = (message): void => {
                this.receivedWebSocketMessage(client, message);
            };
            client.onclose = (): void => {
                // Forget the socket everywhere it is tracked; without this the map
                // and the runner connection's client list keep closed sockets forever.
                const connection = this.clients.get(client);
                this.clients.delete(client);
                if (connection !== undefined) {
                    const index = connection.clients.indexOf(client);
                    if (index >= 0) {
                        connection.clients.splice(index, 1);
                    }
                }
            };
        }).on('error', (error) => {
            this.log.error(`error on websocket server ${error}`);
        });
    }

    /** Closes the web socket server if start() has created one; otherwise does nothing. */
    public async stop(): Promise<void> {
        const server = this.webSocketServer;
        if (server === undefined) {
            return;
        }
        server.close();
    }

SparkSnail's avatar
SparkSnail committed
80
81
82
83
    /** This channel has no work to perform in run(); the returned promise resolves without side effects. */
    public async run(): Promise<void> {
        return;
    }

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
    /**
     * Sends `message` to every socket attached to the runner connection of `environment`.
     * If no runner connection is registered under `environment.id`, a warning is logged
     * and the message is dropped.
     * @param environment environment whose `id` selects the runner connection
     * @param message text passed unchanged to each socket's send()
     * @throws Error when the socket server has not been created yet
     */
    protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void> {
        if (this.webSocketServer === undefined) {
            throw new Error(`WebCommandChannel: uninitialized!`);
        }

        const target = this.runnerConnections.get(environment.id) as WebRunnerConnection | undefined;
        if (target === undefined) {
            this.log.warning(`WebCommandChannel: cannot find client for env ${environment.id}, message is ignored.`);
            return;
        }

        for (const socket of target.clients) {
            socket.send(message);
        }
    }

    /**
     * Builds the runner connection used by this channel.
     * @param environment environment passed to the WebRunnerConnection constructor
     * @returns a new WebRunnerConnection for `environment`
     */
    protected createRunnerConnection(environment: EnvironmentInformation): RunnerConnection {
        const connection: RunnerConnection = new WebRunnerConnection(environment);
        return connection;
    }

    /**
     * Handles one message from a connected socket.
     *
     * A socket that has no runner connection yet must send an INITIALIZED command whose
     * expId equals this experiment's id and whose runnerId names a known runner
     * connection; otherwise the socket is closed and removed from `clients`.
     * Messages from an accepted socket (including the init message itself) are
     * forwarded to handleCommand().
     *
     * @param client socket the message arrived on
     * @param message web socket message event; its `data` is converted with toString()
     */
    private receivedWebSocketMessage(client: WebSocket, message: MessageEvent): void {
        let connection = this.clients.get(client) as WebRunnerConnection | undefined;
        const rawCommands = message.data.toString();

        if (connection === undefined) {
            // undefined means the socket is not initialized yet, so this must be its init message.
            let isValid = false;
            this.log.debug('WebCommandChannel: received initialize message:', rawCommands);

            try {
                const commands = this.parseCommands(rawCommands);
                if (commands.length > 0) {
                    const commandType = commands[0][0];
                    const result = commands[0][1];
                    // The payload comes from a remote socket: a null/undefined payload must
                    // not raise a TypeError on property access.
                    const hasPayload = result !== null && result !== undefined;
                    const runnerId = hasPayload ? result.runnerId : undefined;
                    const expId = hasPayload ? result.expId : undefined;
                    const exists = this.runnerConnections.has(runnerId);

                    if (commandType === INITIALIZED && expId === this.expId && exists) {
                        const runnerConnection = this.runnerConnections.get(runnerId) as WebRunnerConnection;
                        this.clients.set(client, runnerConnection);
                        runnerConnection.AddClient(client);
                        connection = runnerConnection;
                        isValid = true;
                        this.log.debug(`WebCommandChannel: client of env ${runnerConnection.environment.id} initialized`);
                    } else {
                        this.log.warning(`WebCommandChannel: client is not initialized, runnerId: ${runnerId}, command: ${commandType}, expId: ${this.expId}, exists: ${exists}`);
                    }
                }
            } catch (error) {
                // parseCommands is defined outside this block; should it throw on malformed
                // text, reject this client rather than let the exception escape the
                // socket's message handler.
                this.log.warning(`WebCommandChannel: failed to process init message: ${error}`);
            }

            if (!isValid) {
                this.log.warning(`WebCommandChannel: rejected client with invalid init message ${rawCommands}`);
                client.close();
                this.clients.delete(client);
            }
        }

        if (connection !== undefined) {
            this.handleCommand(connection.environment, rawCommands);
        }
    }
}