hdfsClientUtility.ts 8.06 KB
Newer Older
liuzhe-lz's avatar
liuzhe-lz committed
1
2
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
3
4

import * as fs from 'fs';
5
import * as path from 'path';
6
import { Deferred } from 'ts-deferred';
7
import { getExperimentId } from '../../common/experimentStartupInfo';
8
import { getLogger } from '../../common/log';
9
import { unixPathJoin } from '../../common/utils';
10
11
12
13
14

/**
 * HDFS client utility, including copy file/directory
 */
export namespace HDFSClientUtility {
    /**
     * Get NNI experiment root directory on HDFS.
     *
     * The result is '/' followed by unixPathJoin(hdfsUserName, 'nni', 'experiments', <current experiment id>).
     * @param hdfsUserName HDFS user name
     * @returns HDFS path of the current experiment's root directory
     */
    export function hdfsExpRootDir(hdfsUserName: string): string {
        // tslint:disable-next-line:prefer-template
        return '/' + unixPathJoin(hdfsUserName, 'nni', 'experiments', getExperimentId());
    }

    /**
     * Get NNI experiment code directory ('codeDir' under the experiment root directory)
     * @param hdfsUserName HDFS user name
     */
    export function getHdfsExpCodeDir(hdfsUserName: string): string {
        return unixPathJoin(hdfsExpRootDir(hdfsUserName), 'codeDir');
    }

    /**
     * Get NNI trial working directory ('trials/<trialId>' under the experiment root directory)
     * @param hdfsUserName HDFS user name
     * @param trialId NNI trial ID
     */
    export function getHdfsTrialWorkDir(hdfsUserName: string, trialId: string): string {
        const root: string = hdfsExpRootDir(hdfsUserName);

        return unixPathJoin(root, 'trials', trialId);
    }

    // tslint:disable: no-unsafe-any non-literal-fs-path no-any

    /**
     * Copy a local file to an HDFS file.
     *
     * Rejects with the string 'file not exist!' when the local file is missing, or with the
     * stream error when reading the local file or writing to HDFS fails.
     * @param localFilePath local file path (source)
     * @param hdfsFilePath HDFS file path (target)
     * @param hdfsClient HDFS client; used via createWriteStream(path)
     */
    export async function copyFileToHdfs(localFilePath: string, hdfsFilePath: string, hdfsClient: any): Promise<void> {
        const deferred: Deferred<void> = new Deferred<void>();

        // Detect if the local file exists (fs.exists is deprecated)
        if (!fs.existsSync(localFilePath)) {
            getLogger()
              .error(`HDFSCientUtility:copyFileToHdfs, ${localFilePath} doesn't exist locally`);
            deferred.reject('file not exist!');

            return deferred.promise;
        }

        const localFileStream: fs.ReadStream = fs.createReadStream(localFilePath);
        const hdfsFileStream: any = hdfsClient.createWriteStream(hdfsFilePath);

        // Without this listener, an 'error' on the local read stream (e.g. EACCES) would be an
        // unhandled stream error event and crash the process instead of rejecting this promise.
        localFileStream.on('error', (err: Error) => {
            getLogger()
              .error(`HDFSCientUtility:copyFileToHdfs, read local file failed, err is ${err.message}`);
            deferred.reject(err);
        });
        hdfsFileStream.on('finish', () => {
            deferred.resolve();
        });
        hdfsFileStream.on('error', (err: any) => {
            getLogger()
              .error(`HDFSCientUtility:copyFileToHdfs, copy file failed, err is ${err.message}`);
            // pipe() does not close the source when the destination fails; release the local file.
            localFileStream.destroy();
            deferred.reject(err);
        });
        localFileStream.pipe(hdfsFileStream);

        return deferred.promise;
    }

    /**
     * Recursively copy a local directory to an HDFS directory.
     *
     * Entries are copied sequentially; the first failure stops the copy and the returned
     * promise rejects with that error.
     * @param localDirectory local directory
     * @param hdfsDirectory HDFS directory
     * @param hdfsClient HDFS client
     */
    export async function copyDirectoryToHdfs(localDirectory: string, hdfsDirectory: string, hdfsClient: any): Promise<void> {
        // TODO: fs.readdirSync doesn't support ~($HOME)
        const fileNameArray: string[] = fs.readdirSync(localDirectory);

        for (const fileName of fileNameArray) {
            const fullFilePath: string = path.join(localDirectory, fileName);
            // HDFS paths are always '/'-separated; path.join would emit '\' on Windows.
            const hdfsFilePath: string = path.posix.join(hdfsDirectory, fileName);
            if (fs.lstatSync(fullFilePath)
                .isFile()) {
                await copyFileToHdfs(fullFilePath, hdfsFilePath, hdfsClient);
            } else {
                // Anything that is not a regular file is treated as a directory and copied recursively
                await copyDirectoryToHdfs(fullFilePath, hdfsFilePath, hdfsClient);
            }
        }
    }

    /**
     * Check if an HDFS path already exists.
     *
     * Rejects with a timeout message if the client has not answered within 5 seconds.
     * @param hdfsPath target path to check in HDFS
     * @param hdfsClient HDFS client
     */
    export async function pathExists(hdfsPath: string, hdfsClient: any): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.exists(hdfsPath, (exist: boolean) => {
            deferred.resolve(exist);
        });

        let timeoutId: NodeJS.Timer;

        const delayTimeout: Promise<boolean> = new Promise<boolean>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(() => { reject(`Check HDFS path ${hdfsPath} exists timeout`); }, 5000);
        });

        // Whichever settles first wins; the timer is always cleared so it cannot keep the process alive
        return Promise.race([deferred.promise, delayTimeout])
          .finally(() => { clearTimeout(timeoutId); });
    }

    /**
     * Read the whole content of an HDFS file into memory.
     *
     * Rejects with "<hdfsPath> doesn't exists" when the path is missing, or with the stream error.
     * @param hdfsPath HDFS file path
     * @param hdfsClient HDFS client
     */
    export async function readFileFromHDFS(hdfsPath: string, hdfsClient: any): Promise<Buffer> {
        const deferred: Deferred<Buffer> = new Deferred<Buffer>();

        const exist: boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            deferred.reject(`${hdfsPath} doesn't exists`);

            // Do not open a read stream on a path known to be missing
            return deferred.promise;
        }

        // Collect chunks and concatenate once at the end (avoids quadratic re-copying)
        const chunks: Buffer[] = [];
        const remoteFileStream: any = hdfsClient.createReadStream(hdfsPath);
        remoteFileStream.on('error', (err: any) => {
            // Reject with the error
            deferred.reject(err);
        });

        remoteFileStream.on('data', (chunk: any) => {
            chunks.push(chunk);
        });

        // NOTE(review): completion is signalled via 'finish' on the client's read stream, as the
        // original code did; confirm the HDFS client emits 'finish' (not only 'end') for reads.
        remoteFileStream.on('finish', () => {
            deferred.resolve(Buffer.concat(chunks));
        });

        return deferred.promise;
    }

    /**
     * Create a directory in HDFS.
     *
     * No explicit mode is passed, so the client's default permission applies
     * (previously documented as 755 — confirm against the client library).
     * Rejects with the client error's message on failure.
     * @param hdfsPath the directory path in HDFS
     * @param hdfsClient HDFS client
     */
    export function mkdir(hdfsPath: string, hdfsClient: any): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();

        hdfsClient.mkdir(hdfsPath, (err: any) => {
            if (!err) {
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });

        return deferred.promise;
    }

    /**
     * Read directory contents.
     *
     * Entries are passed through unchanged from hdfsClient.readdir; their element type is not
     * checked here. Rejects with "<hdfsPath> doesn't exists" when the path is missing.
     * @param hdfsPath the directory path in HDFS
     * @param hdfsClient HDFS client
     */
    export async function readdir(hdfsPath: string, hdfsClient: any): Promise<string[]> {
        const deferred: Deferred<string[]> = new Deferred<string[]>();
        const exist: boolean = await pathExists(hdfsPath, hdfsClient);
        if (!exist) {
            deferred.reject(`${hdfsPath} doesn't exists`);

            // Do not query a path known to be missing
            return deferred.promise;
        }

        hdfsClient.readdir(hdfsPath, (err: any, files: any[]) => {
            if (err) {
                deferred.reject(err);
            } else {
                deferred.resolve(files);
            }
        });

        return deferred.promise;
    }

    /**
     * Delete an HDFS path.
     *
     * Rejects with the client error's message on failure.
     * @param hdfsPath the path in HDFS. It could be either file or directory
     * @param hdfsClient HDFS client
     * @param recursive whether to delete recursively (default true)
     */
    export function deletePath(hdfsPath: string, hdfsClient: any, recursive: boolean = true): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.unlink(hdfsPath, recursive, (err: any) => {
            if (!err) {
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });

        return deferred.promise;
    }
    // tslint:enable: no-unsafe-any non-literal-fs-path no-any
}