// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import * as fs from 'fs';
5
import * as path from 'path';
6
import { Deferred } from 'ts-deferred';
7
import { getExperimentId } from '../../common/experimentStartupInfo';
8
import { getLogger } from '../../common/log';
9
import { unixPathJoin } from '../../common/utils';
10
11
12
13
14

/**
 * HDFS client utility, including copy file/directory
 */
export namespace HDFSClientUtility {
15
16
17
18
    /**
     * Get NNI experiment root directory
     * @param hdfsUserName HDFS user name
     */
19
    export function hdfsExpRootDir(hdfsUserName: string): string {
20
        return '/' + unixPathJoin(hdfsUserName, 'nni', 'experiments', getExperimentId());
21
22
23
24
25
26
27
    }

    /**
     * Get NNI experiment code directory
     * @param hdfsUserName HDFS user name
     */
    export function getHdfsExpCodeDir(hdfsUserName: string): string {
28
        return unixPathJoin(hdfsExpRootDir(hdfsUserName), 'codeDir');
29
30
31
32
33
34
35
36
    }

    /**
     * Get NNI trial working directory
     * @param hdfsUserName HDFS user name
     * @param trialId NNI trial ID
     */
    export function getHdfsTrialWorkDir(hdfsUserName: string, trialId: string): string {
37
38
        const root: string = hdfsExpRootDir(hdfsUserName);

39
        return unixPathJoin(root, 'trials', trialId);
40
41
    }

42
43
    /**
     * Copy a local file to hdfs directory
44
     *
45
46
47
48
     * @param localFilePath local file path(source)
     * @param hdfsFilePath hdfs file path(target)
     * @param hdfsClient hdfs client
     */
chicm-ms's avatar
chicm-ms committed
49
    export async function copyFileToHdfs(localFilePath: string, hdfsFilePath: string, hdfsClient: any): Promise<void> {
50
        const deferred: Deferred<void> = new Deferred<void>();
chicm-ms's avatar
chicm-ms committed
51
        fs.exists(localFilePath, (exists: boolean) => {
52
53
            // Detect if local file exist
            if (exists) {
54
55
                const localFileStream: fs.ReadStream = fs.createReadStream(localFilePath);
                const hdfsFileStream: any = hdfsClient.createWriteStream(hdfsFilePath);
56
                localFileStream.pipe(hdfsFileStream);
57
                hdfsFileStream.on('finish', () => {
58
59
                    deferred.resolve();
                });
chicm-ms's avatar
chicm-ms committed
60
                hdfsFileStream.on('error', (err: any) => {
61
62
                    getLogger()
                      .error(`HDFSCientUtility:copyFileToHdfs, copy file failed, err is ${err.message}`);
63
64
65
                    deferred.reject(err);
                });
            } else {
66
67
                getLogger()
                  .error(`HDFSCientUtility:copyFileToHdfs, ${localFilePath} doesn't exist locally`);
68
69
70
                deferred.reject('file not exist!');
            }
        });
71

72
73
74
75
76
        return deferred.promise;
    }

    /**
     * Recursively copy local directory to hdfs directory
77
     *
78
79
80
81
     * @param localDirectory local directory
     * @param hdfsDirectory HDFS directory
     * @param hdfsClient   HDFS client
     */
chicm-ms's avatar
chicm-ms committed
82
    export async function copyDirectoryToHdfs(localDirectory: string, hdfsDirectory: string, hdfsClient: any): Promise<void> {
83
84
85
86
        const deferred: Deferred<void> = new Deferred<void>();
        // TODO: fs.readdirSync doesn't support ~($HOME)
        const fileNameArray: string[] = fs.readdirSync(localDirectory);

87
        for (const fileName of fileNameArray) {
88
89
            const fullFilePath: string = path.join(localDirectory, fileName);
            try {
90
91
                if (fs.lstatSync(fullFilePath)
                    .isFile()) {
92
93
94
95
96
                    await copyFileToHdfs(fullFilePath, path.join(hdfsDirectory, fileName), hdfsClient);
                } else {
                    // If filePath is a directory, recuisively copy it to remote directory
                    await copyDirectoryToHdfs(fullFilePath, path.join(hdfsDirectory, fileName), hdfsClient);
                }
97
            } catch (error) {
98
99
100
101
102
103
104
105
106
                deferred.reject(error);
            }
        }
        // All files/directories are copied successfully, resolve
        deferred.resolve();

        return deferred.promise;
    }

chicm-ms's avatar
chicm-ms committed
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
    /**
     * Check if an HDFS path already exists
     *
     * @param hdfsPath target path need to check in HDFS
     * @param hdfsClient HDFS client
     */
    export async function pathExists(hdfsPath: string, hdfsClient: any): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.exists(hdfsPath, (exist: boolean) => {
             deferred.resolve(exist);
        });

        let timeoutId: NodeJS.Timer;

        const delayTimeout: Promise<boolean> = new Promise<boolean>((resolve: Function, reject: Function): void => {
            // Set timeout and reject the promise once reach timeout (5 seconds)
            timeoutId = setTimeout(() => { reject(`Check HDFS path ${hdfsPath} exists timeout`); }, 5000);
        });

        return Promise.race([deferred.promise, delayTimeout])
          .finally(() => { clearTimeout(timeoutId); });
    }

130
131
    /**
     * Read content from HDFS file
132
     *
133
134
135
     * @param hdfsPath HDFS file path
     * @param hdfsClient HDFS client
     */
chicm-ms's avatar
chicm-ms committed
136
    export async function readFileFromHDFS(hdfsPath: string, hdfsClient: any): Promise<Buffer> {
137
        const deferred: Deferred<Buffer> = new Deferred<Buffer>();
chicm-ms's avatar
chicm-ms committed
138
        let buffer: Buffer = Buffer.alloc(0);
139

chicm-ms's avatar
chicm-ms committed
140
        const exist: boolean = await pathExists(hdfsPath, hdfsClient);
141
        if (!exist) {
142
143
144
            deferred.reject(`${hdfsPath} doesn't exists`);
        }

145
        const remoteFileStream: any = hdfsClient.createReadStream(hdfsPath);
chicm-ms's avatar
chicm-ms committed
146
        remoteFileStream.on('error', (err: any) => {
147
148
149
150
            // Reject with the error
            deferred.reject(err);
        });

chicm-ms's avatar
chicm-ms committed
151
        remoteFileStream.on('data', (chunk: any) => {
152
153
154
            // Concat the data chunk to buffer
            buffer = Buffer.concat([buffer, chunk]);
        });
155

156
        remoteFileStream.on('finish', () => {
157
158
159
160
161
162
163
164
165
            // Upload is done, resolve
            deferred.resolve(buffer);
        });

        return deferred.promise;
    }

    /**
     * Mkdir in HDFS, use default permission 755
166
     *
167
     * @param hdfsPath the path in HDFS. It could be either file or directory
168
     * @param hdfsClient HDFS client
169
     */
chicm-ms's avatar
chicm-ms committed
170
171
    export function mkdir(hdfsPath: string, hdfsClient: any): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
172

chicm-ms's avatar
chicm-ms committed
173
        hdfsClient.mkdir(hdfsPath, (err: any) => {
174
            if (!err) {
175
176
177
178
179
180
181
182
183
184
185
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });

        return deferred.promise;
    }

    /**
     * Read directory contents
186
     *
187
     * @param hdfsPath the path in HDFS. It could be either file or directory
188
     * @param hdfsClient HDFS client
189
     */
chicm-ms's avatar
chicm-ms committed
190
191
192
    export async function readdir(hdfsPath: string, hdfsClient: any): Promise<string[]> {
        const deferred: Deferred<string[]> = new Deferred<string[]>();
        const exist: boolean = await pathExists(hdfsPath, hdfsClient);
193
        if (!exist) {
194
195
196
            deferred.reject(`${hdfsPath} doesn't exists`);
        }

chicm-ms's avatar
chicm-ms committed
197
        hdfsClient.readdir(hdfsPath, (err: any, files: any[]) => {
198
            if (err) {
199
200
201
202
203
204
205
206
207
208
209
210
                deferred.reject(err);
            }

            deferred.resolve(files);
        });

        return deferred.promise;
    }

    /**
     * Delete HDFS path
     * @param hdfsPath the path in HDFS. It could be either file or directory
211
     * @param hdfsClient HDFS client
212
213
     * @param recursive Mark if need to delete recursively
     */
chicm-ms's avatar
chicm-ms committed
214
215
216
    export function deletePath(hdfsPath: string, hdfsClient: any, recursive: boolean = true): Promise<boolean> {
        const deferred: Deferred<boolean> = new Deferred<boolean>();
        hdfsClient.unlink(hdfsPath, recursive, (err: any) => {
217
            if (!err) {
218
219
220
221
222
                deferred.resolve(true);
            } else {
                deferred.reject(err.message);
            }
        });
223

224
225
226
        return deferred.promise;
    }
}