ipcInterface.ts 5.45 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
Deshui Yu's avatar
Deshui Yu committed
3
4
5
6
7
8

'use strict';

import * as assert from 'assert';
import { ChildProcess } from 'child_process';
import { EventEmitter } from 'events';
QuanluZhang's avatar
QuanluZhang committed
9
import * as net from 'net';
Deshui Yu's avatar
Deshui Yu committed
10
import { Readable, Writable } from 'stream';
11
import { NNIError } from '../common/errors';
Deshui Yu's avatar
Deshui Yu committed
12
import { getLogger, Logger } from '../common/log';
13
import { getLogDir } from '../common/utils';
Deshui Yu's avatar
Deshui Yu committed
14
15
16
17
18
19
20
21
22
23
24
25
26
import * as CommandType from './commands';

const ipcOutgoingFd: number = 3;
const ipcIncomingFd: number = 4;

/**
 * Encode a command
 * @param commandType a command type defined in 'core/commands'
 * @param content payload of the command
 * @returns binary command data
 */
function encodeCommand(commandType: string, content: string): Buffer {
    const contentBuffer: Buffer = Buffer.from(content);
27
    const contentLengthBuffer: Buffer = Buffer.from(contentBuffer.length.toString().padStart(14, '0'));
Deshui Yu's avatar
Deshui Yu committed
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    return Buffer.concat([Buffer.from(commandType), contentLengthBuffer, contentBuffer]);
}

/**
 * Decode a command
 * @param Buffer binary incoming data
 * @returns a tuple of (success, commandType, content, remain)
 *          success: true if the buffer contains at least one complete command; otherwise false
 *          remain: remaining data after the first command
 */
function decodeCommand(data: Buffer): [boolean, string, string, Buffer] {
    if (data.length < 8) {
        return [false, '', '', data];
    }
    const commandType: string = data.slice(0, 2).toString();
43
44
    const contentLength: number = parseInt(data.slice(2, 16).toString(), 10);
    if (data.length < contentLength + 16) {
Deshui Yu's avatar
Deshui Yu committed
45
46
        return [false, '', '', data];
    }
47
48
    const content: string = data.slice(16, contentLength + 16).toString();
    const remain: Buffer = data.slice(contentLength + 16);
Deshui Yu's avatar
Deshui Yu committed
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

    return [true, commandType, content, remain];
}

class IpcInterface {
    private acceptCommandTypes: Set<string>;
    private outgoingStream: Writable;
    private incomingStream: Readable;
    private eventEmitter: EventEmitter;
    private readBuffer: Buffer;
    private logger: Logger = getLogger();

    /**
     * Construct a IPC proxy
     * @param proc the process to wrap
     * @param acceptCommandTypes set of accepted commands for this process
     */
QuanluZhang's avatar
QuanluZhang committed
66
    constructor(outStream: Writable, inStream: Readable, acceptCommandTypes: Set<string>) {
Deshui Yu's avatar
Deshui Yu committed
67
        this.acceptCommandTypes = acceptCommandTypes;
QuanluZhang's avatar
QuanluZhang committed
68
69
        this.outgoingStream = outStream;
        this.incomingStream = inStream;
Deshui Yu's avatar
Deshui Yu committed
70
71
72
73
        this.eventEmitter = new EventEmitter();
        this.readBuffer = Buffer.alloc(0);

        this.incomingStream.on('data', (data: Buffer) => { this.receive(data); });
74
75
        this.incomingStream.on('error', (error: Error) => { this.eventEmitter.emit('error', error); });
        this.outgoingStream.on('error', (error: Error) => { this.eventEmitter.emit('error', error); });
Deshui Yu's avatar
Deshui Yu committed
76
77
78
79
80
81
82
83
    }

    /**
     * Send a command to process
     * @param commandType: a command type defined in 'core/commands'
     * @param content: payload of command
     */
    public sendCommand(commandType: string, content: string = ''): void {
84
        this.logger.debug(`ipcInterface command type: [${commandType}], content:[${content}]`);
Deshui Yu's avatar
Deshui Yu committed
85
        assert.ok(this.acceptCommandTypes.has(commandType));
86
87
88
89

        try {
            const data: Buffer = encodeCommand(commandType, content);
            if (!this.outgoingStream.write(data)) {
chicm-ms's avatar
chicm-ms committed
90
                this.logger.warning('Commands jammed in buffer!');
91
92
            }
        } catch (err) {
93
94
95
96
            throw NNIError.FromError(
                err,
                `Dispatcher Error, please check this dispatcher log file for more detailed information: ${getLogDir()}/dispatcher.log . `
            );
Deshui Yu's avatar
Deshui Yu committed
97
98
99
100
101
102
103
104
105
106
        }
    }

    /**
     * Add a command listener
     * @param listener the listener callback
     */
    public onCommand(listener: (commandType: string, content: string) => void): void {
        this.eventEmitter.on('command', listener);
    }
107
108
109
110

    public onError(listener: (error: Error) => void): void {
        this.eventEmitter.on('error', listener);
    }
Deshui Yu's avatar
Deshui Yu committed
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134

    /**
     * Deal with incoming data from process
     * Invoke listeners for each complete command received, save incomplete command to buffer
     * @param data binary incoming data
     */
    private receive(data: Buffer): void {
        this.readBuffer = Buffer.concat([this.readBuffer, data]);
        while (this.readBuffer.length > 0) {
            const [success, commandType, content, remain] = decodeCommand(this.readBuffer);
            if (!success) {
                break;
            }
            assert.ok(this.acceptCommandTypes.has(commandType));
            this.eventEmitter.emit('command', commandType, content);
            this.readBuffer = remain;
        }
    }
}

/**
 * Create IPC proxy for tuner process
 * @param process_ the tuner process
 */
135
function createDispatcherInterface(process: ChildProcess): IpcInterface {
QuanluZhang's avatar
QuanluZhang committed
136
137
138
    const outStream = <Writable>process.stdio[ipcOutgoingFd];
    const inStream = <Readable>process.stdio[ipcIncomingFd];
    return new IpcInterface(outStream, inStream, new Set([...CommandType.TUNER_COMMANDS, ...CommandType.ASSESSOR_COMMANDS]));
Deshui Yu's avatar
Deshui Yu committed
139
140
}

QuanluZhang's avatar
QuanluZhang committed
141
142
143
144
145
146
function createDispatcherPipeInterface(pipePath: string): IpcInterface {
    const client = net.createConnection(pipePath);
    return new IpcInterface(client, client, new Set([...CommandType.TUNER_COMMANDS, ...CommandType.ASSESSOR_COMMANDS]));
}

export { IpcInterface, createDispatcherInterface, createDispatcherPipeInterface, encodeCommand, decodeCommand };