hdfsClientUtility.ts 9.14 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

import * as fs from 'fs';
21
import * as path from 'path';
22
import { Deferred } from 'ts-deferred';
23
import { getExperimentId } from '../../common/experimentStartupInfo';
24
import { getLogger } from '../../common/log';
25
import { unixPathJoin } from '../../common/utils';
26
27
28
29
30

/**
 * HDFS client utility: helpers that build NNI experiment paths in HDFS, and
 * promise-based wrappers (copy file/directory, read, exists, mkdir, readdir,
 * delete) around a callback/stream style HDFS client object.
 *
 * The HDFS client is typed as `any`; only the methods invoked in each function
 * (createWriteStream, createReadStream, exists, mkdir, readdir, unlink) are
 * required of it.
 */
export namespace HDFSClientUtility {
    /**
     * Get NNI experiment root directory
     *
     * @param hdfsUserName HDFS user name
     * @returns '/' followed by unixPathJoin(hdfsUserName, 'nni', 'experiments', <current experiment id>)
     */
    function hdfsExpRootDir(hdfsUserName: string): string {
        // tslint:disable-next-line:prefer-template
        return '/' + unixPathJoin(hdfsUserName, 'nni', 'experiments', getExperimentId());
    }

    /**
     * Get NNI experiment code directory
     *
     * @param hdfsUserName HDFS user name
     * @returns the 'codeDir' entry under the experiment root directory
     */
    export function getHdfsExpCodeDir(hdfsUserName: string): string {
        return unixPathJoin(hdfsExpRootDir(hdfsUserName), 'codeDir');
    }

    /**
     * Get NNI trial working directory
     *
     * @param hdfsUserName HDFS user name
     * @param trialId NNI trial ID
     * @returns the 'trials/<trialId>' entry under the experiment root directory
     */
    export function getHdfsTrialWorkDir(hdfsUserName: string, trialId: string): string {
        const root: string = hdfsExpRootDir(hdfsUserName);

        return unixPathJoin(root, 'trials', trialId);
    }

    // tslint:disable: no-unsafe-any non-literal-fs-path no-any

    /**
     * Copy a local file to hdfs directory
     *
     * Resolves once the HDFS write stream emits 'finish'. Rejects with:
     *  - the string 'file not exist!' when the local file is not visible,
     *  - the stream error when either the local read stream or the HDFS write stream fails,
     *  - whatever hdfsClient.createWriteStream throws synchronously.
     *
     * @param localFilePath local file path(source)
     * @param hdfsFilePath hdfs file path(target)
     * @param hdfsClient hdfs client
     */
    export async function copyFileToHdfs(localFilePath: string, hdfsFilePath: string, hdfsClient: any): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

        // fs.exists is deprecated; fs.access with F_OK performs the same visibility check
        fs.access(localFilePath, fs.constants.F_OK, (accessErr: NodeJS.ErrnoException | null) => {
            if (accessErr !== null && accessErr !== undefined) {
                getLogger()
                  .error(`HDFSCientUtility:copyFileToHdfs, ${localFilePath} doesn't exist locally`);
                deferred.reject('file not exist!');

                return;
            }

            try {
                // Open the remote side first: if this throws there is no local stream to clean up
                const hdfsFileStream: any = hdfsClient.createWriteStream(hdfsFilePath);
                const localFileStream: fs.ReadStream = fs.createReadStream(localFilePath);

                // Without this handler a local read failure would leave the promise pending forever
                localFileStream.on('error', (err: Error) => {
                    getLogger()
                      .error(`HDFSCientUtility:copyFileToHdfs, read ${localFilePath} failed, err is ${err.message}`);
                    deferred.reject(err);
                });
                hdfsFileStream.on('finish', () => {
                    deferred.resolve();
                });
                hdfsFileStream.on('error', (err: any) => {
                    getLogger()
                      .error(`HDFSCientUtility:copyFileToHdfs, copy file failed, err is ${err.message}`);
                    // Release the local file descriptor, the upload will not continue
                    localFileStream.destroy();
                    deferred.reject(err);
                });
                localFileStream.pipe(hdfsFileStream);
            } catch (err) {
                // Thrown inside an fs callback, so it must be routed to the promise explicitly
                deferred.reject(err);
            }
        });

        return deferred.promise;
    }

    /**
     * Recursively copy local directory to hdfs directory
     *
     * Entries are copied sequentially. The first failure aborts the copy and is
     * used as the rejection reason; no further entries are uploaded after it.
     * Any entry that is not a regular file (per lstat) is treated as a directory.
     *
     * @param localDirectory local directory
     * @param hdfsDirectory HDFS directory
     * @param hdfsClient   HDFS client
     */
    export async function copyDirectoryToHdfs(localDirectory: string, hdfsDirectory: string, hdfsClient: any): Promise<void> {
        // TODO: fs.readdirSync doesn't support ~($HOME)
        const fileNameArray: string[] = fs.readdirSync(localDirectory);

        for (const fileName of fileNameArray) {
            const localPath: string = path.join(localDirectory, fileName);
            // HDFS paths always use '/', independent of the local OS separator
            const hdfsPath: string = path.posix.join(hdfsDirectory, fileName);

            if (fs.lstatSync(localPath)
                .isFile()) {
                await copyFileToHdfs(localPath, hdfsPath, hdfsClient);
            } else {
                // If filePath is a directory, recursively copy it to remote directory
                await copyDirectoryToHdfs(localPath, hdfsPath, hdfsClient);
            }
        }
    }

    /**
     * Read content from HDFS file
     *
     * Rejects with the string `<hdfsPath> doesn't exists` (without opening a
     * stream) when the path is missing, or with the stream error on failure.
     *
     * @param hdfsPath HDFS file path
     * @param hdfsClient HDFS client
     * @returns the concatenation of all 'data' chunks received from the stream
     */
    export async function readFileFromHDFS(hdfsPath: string, hdfsClient: any): Promise<Buffer> {
        const deferred: Deferred<Buffer> = new Deferred<Buffer>();

        const exist: boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            deferred.reject(`${hdfsPath} doesn't exists`);

            return deferred.promise;
        }

        // Collect chunks and concatenate once at the end instead of re-copying the buffer per chunk
        const chunks: Buffer[] = [];
        const remoteFileStream: any = hdfsClient.createReadStream(hdfsPath);

        remoteFileStream.on('error', (err: any) => {
            // Reject with the error
            deferred.reject(err);
        });

        remoteFileStream.on('data', (chunk: any) => {
            chunks.push(chunk);
        });

        // NOTE(review): completion is signalled by 'finish' here, whereas plain Node readable
        // streams emit 'end' - confirm against the HDFS client implementation in use.
        remoteFileStream.on('finish', () => {
            // Download is done, resolve
            deferred.resolve(Buffer.concat(chunks));
        });

        return deferred.promise;
    }

    /**
     * Check if an HDFS path already exists
     *
     * Rejects with the string `Check HDFS path <hdfsPath> exists timeout` when
     * the client does not answer within the timeout.
     *
     * @param hdfsPath target path need to check in HDFS
     * @param hdfsClient HDFS client
     * @param timeoutMs how long to wait for the client's answer, in milliseconds (default 5000)
     */
    export async function pathExists(hdfsPath: string, hdfsClient: any, timeoutMs: number = 5000): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.exists(hdfsPath, (exist: boolean) => {
            deferred.resolve(exist);
        });

        let timeoutId: ReturnType<typeof setTimeout> | undefined;

        const delayTimeout: Promise<boolean> = new Promise<boolean>(
            (_resolve: (value: boolean) => void, reject: (reason?: any) => void): void => {
                // Set timeout and reject the promise once reach timeout
                timeoutId = setTimeout(() => { reject(`Check HDFS path ${hdfsPath} exists timeout`); }, timeoutMs);
            });

        return Promise.race([deferred.promise, delayTimeout])
          .finally(() => {
              // Always drop the timer so it neither fires late nor keeps the process alive
              if (timeoutId !== undefined) {
                  clearTimeout(timeoutId);
              }
          });
    }

    /**
     * Mkdir in HDFS, use default permission 755
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @returns resolves to true on success; rejects with the client's error message otherwise
     */
    export function mkdir(hdfsPath: string, hdfsClient: any): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();

        hdfsClient.mkdir(hdfsPath, (err: any) => {
            if (!err) {
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });

        return deferred.promise;
    }

    /**
     * Read directory contents
     *
     * Rejects with the string `<hdfsPath> doesn't exists` (without calling the
     * client's readdir) when the path is missing, or with the client's error.
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @returns the entries passed by the client to its readdir callback
     */
    export async function readdir(hdfsPath: string, hdfsClient: any): Promise<string[]> {
        const deferred: Deferred<string[]> = new Deferred<string[]>();

        const exist: boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            deferred.reject(`${hdfsPath} doesn't exists`);

            return deferred.promise;
        }

        hdfsClient.readdir(hdfsPath, (err: any, files: any[]) => {
            if (err) {
                deferred.reject(err);
            } else {
                deferred.resolve(files);
            }
        });

        return deferred.promise;
    }

    /**
     * Delete HDFS path
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @param recursive Mark if need to delete recursively
     * @returns resolves to true on success; rejects with the client's error message otherwise
     */
    export function deletePath(hdfsPath: string, hdfsClient: any, recursive: boolean = true): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();

        hdfsClient.unlink(hdfsPath, recursive, (err: any) => {
            if (!err) {
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });

        return deferred.promise;
    }
    // tslint:enable: no-unsafe-any non-literal-fs-path no-any
}