fileCommandChannel.ts 5.29 KB
Newer Older
1
2
3
4
5
6
7
8
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

'use strict';

import * as component from "../../../common/component";
import { delay } from "../../../common/utils";
import { CommandChannel, RunnerConnection } from "../commandChannel";
SparkSnail's avatar
SparkSnail committed
9
import { Channel, EnvironmentInformation } from "../environment";
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import { StorageService } from "../storageService";

class FileHandler {
    public fileName: string;
    public offset: number = 0;

    constructor(fileName: string) {
        this.fileName = fileName;
    }
}


class FileRunnerConnection extends RunnerConnection {
    public handlers: Map<string, FileHandler> = new Map<string, FileHandler>();
}

export class FileCommandChannel extends CommandChannel {
    private readonly commandPath = "commands";
    private stopping: boolean = false;
    // make sure no concurrent issue when sending commands.
    private sendQueues: [EnvironmentInformation, string][] = [];

    public get channelName(): Channel {
        return "file";
    }

    public async config(_key: string, _value: any): Promise<void> {
        // do nothing
    }

    public async start(): Promise<void> {
SparkSnail's avatar
SparkSnail committed
41
        // do nothing
42
43
44
45
46
47
    }

    public async stop(): Promise<void> {
        this.stopping = true;
    }

SparkSnail's avatar
SparkSnail committed
48
49
50
51
52
53
54
55
    public async run(): Promise<void> {
        // start command loops
        await Promise.all([
            this.receiveLoop(),
            this.sendLoop()
        ]);
    }

56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
    protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise<void> {
        this.sendQueues.push([environment, message]);
    }

    protected createRunnerConnection(environment: EnvironmentInformation): RunnerConnection {
        return new FileRunnerConnection(environment);
    }

    private async sendLoop(): Promise<void> {
        const intervalSeconds = 0.5;
        while (!this.stopping) {
            const start = new Date();

            if (this.sendQueues.length > 0) {
                const storageService = component.get<StorageService>(StorageService);

                while (this.sendQueues.length > 0) {
                    const item = this.sendQueues.shift();
                    if (item === undefined) {
                        break;
                    }
                    const environment = item[0];
                    const message = `${item[1]}\n`;

                    const fileName = storageService.joinPath(environment.workingFolder, this.commandPath, `manager_commands.txt`);
                    await storageService.save(message, fileName, true);
                }
            }

            const end = new Date();
            const delayMs = intervalSeconds * 1000 - (end.valueOf() - start.valueOf());
            if (delayMs > 0) {
                await delay(delayMs);
            }
        }
    }

    private async receiveLoop(): Promise<void> {
        const intervalSeconds = 2;
        const storageService = component.get<StorageService>(StorageService);

        while (!this.stopping) {
            const start = new Date();

            const runnerConnections = [...this.runnerConnections.values()] as FileRunnerConnection[];
            for (const runnerConnection of runnerConnections) {
                const envCommandFolder = storageService.joinPath(runnerConnection.environment.workingFolder, this.commandPath);
                // open new command files
                if (runnerConnection.handlers.size < runnerConnection.environment.nodeCount) {
                    // to find all node commands file
                    const commandFileNames = await storageService.listDirectory(envCommandFolder);
                    const toAddedFileNames = [];
                    for (const commandFileName of commandFileNames) {
                        if (commandFileName.startsWith("runner_commands") && !runnerConnection.handlers.has(commandFileName)) {
                            toAddedFileNames.push(commandFileName);
                        }
                    }

                    for (const toAddedFileName of toAddedFileNames) {
                        const fullPath = storageService.joinPath(envCommandFolder, toAddedFileName);
                        const fileHandler: FileHandler = new FileHandler(fullPath);
                        runnerConnection.handlers.set(toAddedFileName, fileHandler);
                        this.log.debug(`FileCommandChannel: added fileHandler env ${runnerConnection.environment.id} ${toAddedFileName}`);
                    }
                }

                // to loop all commands
                for (const fileHandler of runnerConnection.handlers.values()) {
                    const newContent = await storageService.readFileContent(fileHandler.fileName, fileHandler.offset, undefined);
                    if (newContent.length > 0) {
                        const commands = newContent.split('\n');
                        for (const command of commands) {
                            this.handleCommand(runnerConnection.environment, command);
                        }
                        fileHandler.offset += newContent.length;
                    }
                }
            }

            const end = new Date();
            const delayMs = intervalSeconds * 1000 - (end.valueOf() - start.valueOf());
            if (delayMs > 0) {
                await delay(delayMs);
            }
        }
    }
}