hdfsClientUtility.ts 8.1 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3
4

import * as fs from 'fs';
5
import * as path from 'path';
6
import { Deferred } from 'ts-deferred';
7
import { getExperimentId } from '../../common/experimentStartupInfo';
8
import { getLogger } from '../../common/log';
9
import { unixPathJoin } from '../../common/utils';
10
11
12
13
14

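/**
 * HDFS client utilities: helpers that build NNI experiment paths in HDFS, plus
 * operations to copy local files/directories to HDFS and to read, list,
 * create and delete HDFS paths.
 */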
export namespace HDFSClientUtility {
15
16
17
18
    /**
     * Build the HDFS root directory of the current NNI experiment:
     * `/<hdfsUserName>/nni/experiments/<experimentId>`.
     *
     * @param hdfsUserName HDFS user name, used as the first path segment
     * @returns absolute HDFS path (always starts with '/')
     */
    export function hdfsExpRootDir(hdfsUserName: string): string {
        const relativeRoot: string = unixPathJoin(hdfsUserName, 'nni', 'experiments', getExperimentId());

        return `/${relativeRoot}`;
    }

    /**
     * Build the HDFS code directory of the current NNI experiment, i.e. the
     * `codeDir` folder directly under the experiment root directory.
     *
     * @param hdfsUserName HDFS user name
     * @returns HDFS path of the experiment's code directory
     */
    export function getHdfsExpCodeDir(hdfsUserName: string): string {
        const experimentRoot: string = hdfsExpRootDir(hdfsUserName);

        return unixPathJoin(experimentRoot, 'codeDir');
    }

    /**
     * Build the HDFS working directory of one trial, i.e.
     * `<experiment root>/trials/<trialId>`.
     *
     * @param hdfsUserName HDFS user name
     * @param trialId NNI trial ID, used as the last path segment
     * @returns HDFS path of the trial's working directory
     */
    export function getHdfsTrialWorkDir(hdfsUserName: string, trialId: string): string {
        return unixPathJoin(hdfsExpRootDir(hdfsUserName), 'trials', trialId);
    }

43
44
    // tslint:disable: no-unsafe-any non-literal-fs-path no-any
    /**
     * Copy a local file to an HDFS file path by piping a local read stream
     * into the write stream created by the HDFS client.
     *
     * The returned promise
     *  - resolves when the HDFS write stream emits 'finish';
     *  - rejects with the string 'file not exist!' when the local file is not accessible;
     *  - rejects with the underlying error when either stream emits 'error' or
     *    when creating the streams throws.
     *
     * @param localFilePath local file path(source)
     * @param hdfsFilePath hdfs file path(target)
     * @param hdfsClient hdfs client; must provide `createWriteStream(path)`
     */
    export async function copyFileToHdfs(localFilePath : string, hdfsFilePath : string, hdfsClient : any) : Promise<void> {
        return new Promise<void>((resolve, reject) => {
            // fs.exists is deprecated; fs.access with F_OK is the equivalent existence probe.
            fs.access(localFilePath, fs.constants.F_OK, (accessErr) => {
                if (accessErr) {
                    getLogger()
                      .error(`HDFSCientUtility:copyFileToHdfs, ${localFilePath} doesn't exist locally`);
                    reject('file not exist!');

                    return;
                }

                try {
                    const localFileStream: fs.ReadStream = fs.createReadStream(localFilePath);
                    const hdfsFileStream: any = hdfsClient.createWriteStream(hdfsFilePath);

                    // Without this listener a local read failure (e.g. the path is a directory,
                    // or the file vanished after the check) is an unhandled 'error' event and
                    // the promise would never settle.
                    localFileStream.on('error', (err : any) => {
                        getLogger()
                          .error(`HDFSCientUtility:copyFileToHdfs, read local file failed, err is ${err.message}`);
                        reject(err);
                    });
                    hdfsFileStream.on('finish', () => {
                        resolve();
                    });
                    hdfsFileStream.on('error', (err : any) => {
                        getLogger()
                          .error(`HDFSCientUtility:copyFileToHdfs, copy file failed, err is ${err.message}`);
                        reject(err);
                    });

                    // Attach listeners first, then start the transfer.
                    localFileStream.pipe(hdfsFileStream);
                } catch (err) {
                    // A synchronous failure while creating the streams must reject the promise
                    // instead of escaping from this callback as an uncaught exception.
                    reject(err);
                }
            });
        });
    }

    /**
     * Recursively copy the contents of a local directory to an HDFS directory.
     *
     * Entries are processed one after another. An entry that `lstat` reports as
     * a regular file is copied with `copyFileToHdfs`; every other entry is
     * treated as a directory and copied recursively. The copy stops at the
     * first failure and the returned promise rejects with that error.
     *
     * @param localDirectory local directory
     * @param hdfsDirectory HDFS directory
     * @param hdfsClient   HDFS client
     */
    export async function copyDirectoryToHdfs(localDirectory : string, hdfsDirectory : string, hdfsClient : any) : Promise<void> {
        // TODO: fs.readdirSync doesn't support ~($HOME)
        const fileNameArray: string[] = fs.readdirSync(localDirectory);

        for (const fileName of fileNameArray) {
            // Local paths use the platform separator ...
            const fullFilePath: string = path.join(localDirectory, fileName);
            // ... but HDFS paths are always '/'-separated, regardless of the local platform.
            const hdfsTargetPath: string = path.posix.join(hdfsDirectory, fileName);

            // No try/catch: a failure propagates out of the loop, so the remaining
            // entries are not copied once the operation has already failed.
            if (fs.lstatSync(fullFilePath)
                .isFile()) {
                await copyFileToHdfs(fullFilePath, hdfsTargetPath, hdfsClient);
            } else {
                // If filePath is a directory, recursively copy it to remote directory
                await copyDirectoryToHdfs(fullFilePath, hdfsTargetPath, hdfsClient);
            }
        }
    }

    /**
     * Read the whole content of an HDFS file into a Buffer.
     *
     * The path is first checked with `pathExists`; when it does not exist the
     * promise rejects with the string `<hdfsPath> doesn't exists` and no read
     * stream is opened. Otherwise the chunks emitted by the client's read
     * stream are collected and concatenated once the stream emits 'finish'.
     * A stream 'error' rejects the promise with that error.
     *
     * @param hdfsPath HDFS file path
     * @param hdfsClient HDFS client; must provide `createReadStream(path)`
     * @returns the file content
     */
    export async function readFileFromHDFS(hdfsPath : string, hdfsClient : any) : Promise<Buffer> {
        const exist : boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            // Stop here: there is nothing to read from a missing path.
            return Promise.reject(`${hdfsPath} doesn't exists`);
        }

        return new Promise<Buffer>((resolve, reject) => {
            // Collect chunks and concatenate once at the end instead of
            // re-copying the whole buffer for every chunk.
            const chunks : Buffer[] = [];
            const remoteFileStream : any = hdfsClient.createReadStream(hdfsPath);

            remoteFileStream.on('error', (err : any) => {
                // Reject with the error
                reject(err);
            });

            remoteFileStream.on('data', (chunk : any) => {
                chunks.push(chunk);
            });

            // NOTE(review): standard Node readable streams signal completion with 'end';
            // presumably the HDFS client emits 'finish' on its read stream - confirm.
            remoteFileStream.on('finish', () => {
                // Download is done, resolve with the full content
                resolve(Buffer.concat(chunks));
            });
        });
    }

    /**
     * Check if an HDFS path already exists.
     *
     * Calls `hdfsClient.exists(hdfsPath, callback)` and resolves with the boolean
     * passed to the callback. If the callback has not been invoked within
     * `timeoutMs` milliseconds, the promise rejects with the string
     * `Check HDFS path <hdfsPath> exists timeout`. The timer is cleared as soon
     * as the client answers or throws.
     *
     * @param hdfsPath target path need to check in HDFS
     * @param hdfsClient HDFS client
     * @param timeoutMs maximum time to wait for the client's answer, in milliseconds (default 5000)
     */
    export async function pathExists(hdfsPath : string, hdfsClient : any, timeoutMs : number = 5000) : Promise<boolean> {
        return new Promise<boolean>((resolve, reject) => {
            // Arm the timeout before calling the client so that a synchronous
            // callback can still clear it.
            const timeoutId : ReturnType<typeof setTimeout> = setTimeout(() => {
                reject(`Check HDFS path ${hdfsPath} exists timeout`);
            }, timeoutMs);

            try {
                hdfsClient.exists(hdfsPath, (exist : boolean) => {
                    clearTimeout(timeoutId);
                    resolve(exist);
                });
            } catch (err) {
                // Do not leave a pending timer behind when the client throws synchronously.
                clearTimeout(timeoutId);
                reject(err);
            }
        });
    }

    /**
     * Create a directory in HDFS via `hdfsClient.mkdir(hdfsPath, callback)`.
     *
     * No permission mode is passed to the client, so the client's default
     * applies. NOTE(review): the previous comment stated the default is 755 -
     * confirm against the HDFS client in use.
     *
     * Resolves with `true` when the client reports no error. Otherwise rejects
     * with the error's `message`, or with the raw error value when it carries
     * no `message` (so the failure reason is never `undefined`). A synchronous
     * throw from the client also surfaces as a rejection.
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     */
    export function mkdir(hdfsPath : string, hdfsClient : any) : Promise<boolean> {
        return new Promise<boolean>((resolve, reject) => {
            hdfsClient.mkdir(hdfsPath, (err : any) => {
                if (err) {
                    reject(err.message === undefined ? err : err.message);

                    return;
                }
                resolve(true);
            });
        });
    }

    /**
     * Read directory contents from HDFS.
     *
     * The path is first checked with `pathExists`; when it does not exist the
     * promise rejects with the string `<hdfsPath> doesn't exists` and the
     * client is not asked for a listing. Otherwise the promise resolves with
     * the entries reported by `hdfsClient.readdir`, or rejects with the error
     * the client reports.
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @returns the entries passed to the client's readdir callback
     */
    export async function readdir(hdfsPath : string, hdfsClient : any) : Promise<string[]> {
        const exist : boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            // Stop here instead of still issuing the readdir request.
            return Promise.reject(`${hdfsPath} doesn't exists`);
        }

        return new Promise<string[]>((resolve, reject) => {
            hdfsClient.readdir(hdfsPath, (err : any, files : any[]) => {
                if (err) {
                    reject(err);

                    return;
                }
                resolve(files);
            });
        });
    }

    /**
     * Delete an HDFS path via `hdfsClient.unlink(hdfsPath, recursive, callback)`.
     *
     * Resolves with `true` when the client reports no error. Otherwise rejects
     * with the error's `message`, or with the raw error value when it carries
     * no `message` (so the failure reason is never `undefined`). A synchronous
     * throw from the client also surfaces as a rejection.
     *
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @param recursive Mark if need to delete recursively (default true)
     */
    export function deletePath(hdfsPath : string, hdfsClient : any, recursive : boolean = true) : Promise<boolean> {
        return new Promise<boolean>((resolve, reject) => {
            hdfsClient.unlink(hdfsPath, recursive, (err : any) => {
                if (err) {
                    reject(err.message === undefined ? err : err.message);

                    return;
                }
                resolve(true);
            });
        });
    }
230
    // tslint:enable: no-unsafe-any non-literal-fs-path no-any
231
}