hdfsClientUtility.ts 8.74 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Copyright (c) Microsoft Corporation
 * All rights reserved.
 *
 * MIT License
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
 * documentation files (the "Software"), to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
 * to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
 * BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */

import * as path from 'path';
import * as fs from 'fs';
import { Deferred } from 'ts-deferred';
23
import { getExperimentId } from '../../common/experimentStartupInfo';
24
import { getLogger } from '../../common/log';
25
import { unixPathJoin } from '../../common/utils'
26
27
28
29
30

/**
 * HDFS client utility, including copy file/directory
 */
export namespace HDFSClientUtility {
    /**
     * Get NNI experiment root directory: /<hdfsUserName>/nni/experiments/<experimentId>
     * @param hdfsUserName HDFS user name
     */
    function hdfsExpRootDir(hdfsUserName: string): string {
        return '/' + unixPathJoin(hdfsUserName, 'nni', 'experiments', getExperimentId());
    }

    /**
     * Get NNI experiment code directory (<experiment root>/codeDir)
     * @param hdfsUserName HDFS user name
     */
    export function getHdfsExpCodeDir(hdfsUserName: string): string {
        return unixPathJoin(hdfsExpRootDir(hdfsUserName), 'codeDir');
    }

    /**
     * Get NNI trial working directory (<experiment root>/trials/<trialId>)
     * @param hdfsUserName HDFS user name
     * @param trialId NNI trial ID
     */
    export function getHdfsTrialWorkDir(hdfsUserName: string, trialId: string): string {
        // No debug printing here: this is called on regular code paths and must not spam stdout.
        const root: string = hdfsExpRootDir(hdfsUserName);
        return unixPathJoin(root, 'trials', trialId);
    }

    /**
     * Copy a local file to hdfs directory
     *
     * @param localFilePath local file path(source)
     * @param hdfsFilePath hdfs file path(target)
     * @param hdfsClient hdfs client
     * @returns resolves once the HDFS write stream emits 'finish'; rejects with the string
     *          'file not exist!' when the local file is missing, or with the stream error otherwise
     */
    export async function copyFileToHdfs(localFilePath : string, hdfsFilePath : string, hdfsClient : any) : Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();
        // fs.exists is deprecated; fs.access (default mode F_OK) performs the same existence check.
        fs.access(localFilePath, (accessError) => {
            if (accessError) {
                getLogger().error(`HDFSCientUtility:copyFileToHdfs, ${localFilePath} doesn't exist locally`);
                deferred.reject('file not exist!');
                return;
            }

            const localFileStream = fs.createReadStream(localFilePath);
            const hdfsFileStream = hdfsClient.createWriteStream(hdfsFilePath);

            // pipe() does not forward errors from the source stream, so a failing local read
            // must be handled here; otherwise the returned promise would never settle.
            localFileStream.on('error', (err : Error) => {
                getLogger().error(`HDFSCientUtility:copyFileToHdfs, read local file failed, err is ${err.message}`);
                deferred.reject(err);
            });
            hdfsFileStream.on('finish', () => {
                deferred.resolve();
            });
            hdfsFileStream.on('error', (err : any) => {
                getLogger().error(`HDFSCientUtility:copyFileToHdfs, copy file failed, err is ${err.message}`);
                deferred.reject(err);
            });

            localFileStream.pipe(hdfsFileStream);
        });

        return deferred.promise;
    }

    /**
     * Recursively copy local directory to hdfs directory
     *
     * Entries are copied one at a time; the first failure stops the copy and rejects
     * with that error (no further entries are attempted).
     *
     * @param localDirectory local directory
     * @param hdfsDirectory HDFS directory
     * @param hdfsClient   HDFS client
     */
    export async function copyDirectoryToHdfs(localDirectory : string, hdfsDirectory : string, hdfsClient : any) : Promise<void> {
        // TODO: fs.readdirSync doesn't support ~($HOME)
        const fileNameArray: string[] = fs.readdirSync(localDirectory);

        for (const fileName of fileNameArray) {
            const fullFilePath: string = path.join(localDirectory, fileName);
            // HDFS paths are always '/'-separated, independent of the local platform.
            const hdfsFilePath: string = path.posix.join(hdfsDirectory, fileName);
            if (fs.lstatSync(fullFilePath).isFile()) {
                await copyFileToHdfs(fullFilePath, hdfsFilePath, hdfsClient);
            } else {
                // Anything that is not a regular file is treated as a directory and copied recursively
                await copyDirectoryToHdfs(fullFilePath, hdfsFilePath, hdfsClient);
            }
        }
    }

    /**
     * Read content from HDFS file
     *
     * @param hdfsPath HDFS file path
     * @param hdfsClient HDFS client
     * @returns the whole file content; rejects with a message string if the path does not exist,
     *          or with the stream error if reading fails
     */
    export async function readFileFromHDFS(hdfsPath : string, hdfsClient : any) : Promise<Buffer> {
        const exist : boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            // Stop here instead of opening a read stream on a path known to be missing.
            return Promise.reject(`${hdfsPath} doesn't exists`);
        }

        const deferred: Deferred<Buffer> = new Deferred<Buffer>();
        // Collect chunks and concatenate once at the end (avoids O(n^2) re-copying per chunk).
        const chunks: Buffer[] = [];

        const remoteFileStream = hdfsClient.createReadStream(hdfsPath);
        remoteFileStream.on('error', (err : any) => {
            deferred.reject(err);
        });
        remoteFileStream.on('data', (chunk : any) => {
            chunks.push(chunk);
        });
        // Completion is detected via 'finish', as in the original implementation.
        // NOTE(review): presumably the HDFS client's read stream emits 'finish'; a plain Node
        // Readable would emit 'end' instead — confirm against the client library.
        remoteFileStream.on('finish', () => {
            deferred.resolve(Buffer.concat(chunks));
        });

        return deferred.promise;
    }

    /**
     * Check if an HDFS path already exists
     *
     * @param hdfsPath target path need to check in HDFS
     * @param hdfsClient HDFS client
     * @param timeoutMs how long to wait for the HDFS answer before rejecting (default 5000 ms)
     * @returns whether the path exists; rejects with a timeout message if HDFS doesn't answer in time
     */
    export async function pathExists(hdfsPath : string, hdfsClient : any, timeoutMs : number = 5000) : Promise<boolean> {
        const deferred : Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.exists(hdfsPath, (exist : boolean) => {
            deferred.resolve(exist);
        });

        // Reject the pending check once the timeout elapses; the timer is always cleared afterwards.
        const timeoutId: ReturnType<typeof setTimeout> = setTimeout(
            () => deferred.reject(`Check HDFS path ${hdfsPath} exists timeout`), timeoutMs);
        try {
            return await deferred.promise;
        } finally {
            clearTimeout(timeoutId);
        }
    }

    /**
     * Mkdir in HDFS, use default permission 755
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @returns resolves to true on success; rejects with the client's error message otherwise
     */
    export function mkdir(hdfsPath : string, hdfsClient : any) : Promise<boolean> {
        const deferred : Deferred<boolean> = new Deferred<boolean>();

        hdfsClient.mkdir(hdfsPath, (err : any) => {
            if (!err) {
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });

        return deferred.promise;
    }

    /**
     * Read directory contents
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @returns the entries reported by the client; rejects with a message string if the path
     *          does not exist, or with the client error if listing fails
     */
    export async function readdir(hdfsPath : string, hdfsClient : any) : Promise<string[]> {
        const exist : boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            // Stop here instead of issuing a readdir request for a path known to be missing.
            return Promise.reject(`${hdfsPath} doesn't exists`);
        }

        const deferred : Deferred<string[]> = new Deferred<string[]>();
        hdfsClient.readdir(hdfsPath, (err : any, files : any[]) => {
            if (err) {
                deferred.reject(err);
            } else {
                deferred.resolve(files);
            }
        });

        return deferred.promise;
    }

    /**
     * Delete HDFS path
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @param recursive Mark if need to delete recursively
     * @returns resolves to true on success; rejects with the client's error message otherwise
     */
    export function deletePath(hdfsPath : string, hdfsClient : any, recursive : boolean = true) : Promise<boolean> {
        const deferred : Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.unlink(hdfsPath, recursive, (err : any) => {
            if (!err) {
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });
        return deferred.promise;
    }
}