file_io.cpp 5.35 KB
Newer Older
1
2
/*!
 * Copyright (c) 2018 Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See LICENSE file in the project root for
 * license information.
 */
6
#include <LightGBM/utils/file_io.h>
7

Guolin Ke's avatar
Guolin Ke committed
8
#include <LightGBM/utils/log.h>
9
10
11
12

#include <algorithm>
#include <sstream>
#include <unordered_map>
13

14
15
16
17
#ifdef USE_HDFS
#include <hdfs.h>
#endif

18
namespace LightGBM {
19
20

/*!
 * \brief File on the local filesystem, accessed through C stdio.
 *
 * Implements both the reader and the writer interface; which operations make
 * sense depends on the fopen mode string supplied at construction. The file
 * is opened lazily by Init() and closed by the destructor.
 */
struct LocalFile : VirtualFileReader, VirtualFileWriter {
  /*!
   * \param filename Path handed to fopen
   * \param mode fopen mode string (this file uses "rb" and "wb")
   */
  LocalFile(const std::string& filename, const std::string& mode)
      : filename_(filename), mode_(mode) {}

  // The destructor fclose()s file_, so a copied object would close the same
  // FILE* a second time. Forbid copying instead of relying on callers.
  LocalFile(const LocalFile&) = delete;
  LocalFile& operator=(const LocalFile&) = delete;

  virtual ~LocalFile() {
    if (file_ != NULL) {
      fclose(file_);
    }
  }

  /*!
   * \brief Open the file if it is not open yet
   * \return true when the file is open after the call
   */
  bool Init() {
    if (file_ == NULL) {
#if _MSC_VER
      // The return code is not inspected; failure is detected through file_.
      fopen_s(&file_, filename_.c_str(), mode_.c_str());
#else
      file_ = fopen(filename_.c_str(), mode_.c_str());
#endif
    }
    return file_ != NULL;
  }

  /*!
   * \brief Check whether the file can be opened for reading
   *
   * Uses a separate temporary handle opened with "rb" instead of mode_, so
   * this object's own handle is not touched.
   */
  bool Exists() const {
    LocalFile file(filename_, "rb");
    return file.Init();
  }

  /*!
   * \brief Read up to `bytes` bytes into `buffer`
   * \return Number of bytes actually read, as reported by fread
   * \note file_ is passed straight to fread, so Init() must have succeeded.
   */
  size_t Read(void* buffer, size_t bytes) const {
    return fread(buffer, 1, bytes, file_);
  }

  /*!
   * \brief Write `bytes` bytes from `buffer`
   * \return `bytes` if the whole buffer was written, 0 otherwise
   * \note The buffer is written as one fwrite item, so a partial write is
   *       reported as 0. file_ is passed straight to fwrite, so Init() must
   *       have succeeded.
   */
  size_t Write(const void* buffer, size_t bytes) const {
    return fwrite(buffer, bytes, 1, file_) == 1 ? bytes : 0;
  }

 private:
  FILE* file_ = NULL;  // NULL until Init() succeeds
  const std::string filename_;
  const std::string mode_;
};

Guolin Ke's avatar
Guolin Ke committed
59
// URI prefix identifying an HDFS path; filenames that start with it are
// handled by HDFSFile instead of LocalFile.
const char* kHdfsProto = "hdfs://";
60
61

#ifdef USE_HDFS
62
63
// Length of kHdfsProto in characters, i.e. the offset at which the
// namenode "host:port" part of an HDFS URI begins.
const size_t kHdfsProtoLength = static_cast<size_t>(strlen(kHdfsProto));

Qiwei Ye's avatar
Qiwei Ye committed
64
struct HDFSFile : VirtualFileReader, VirtualFileWriter {
Guolin Ke's avatar
Guolin Ke committed
65
66
  HDFSFile(const std::string& filename, int flags)
      : filename_(filename), flags_(flags) {}
Qiwei Ye's avatar
Qiwei Ye committed
67
  ~HDFSFile() {
68
69
70
71
72
73
74
75
    if (file_ != NULL) {
      hdfsCloseFile(fs_, file_);
    }
  }

  bool Init() {
    if (file_ == NULL) {
      if (fs_ == NULL) {
Qiwei Ye's avatar
Qiwei Ye committed
76
        fs_ = GetHDFSFileSystem(filename_);
77
      }
Guolin Ke's avatar
Guolin Ke committed
78
79
      if (fs_ != NULL &&
          (flags_ == O_WRONLY || 0 == hdfsExists(fs_, filename_.c_str()))) {
80
81
82
83
84
85
86
87
        file_ = hdfsOpenFile(fs_, filename_.c_str(), flags_, 0, 0, 0);
      }
    }
    return file_ != NULL;
  }

  bool Exists() const {
    if (fs_ == NULL) {
Qiwei Ye's avatar
Qiwei Ye committed
88
      fs_ = GetHDFSFileSystem(filename_);
89
90
91
92
93
94
95
96
97
98
99
100
    }
    return fs_ != NULL && 0 == hdfsExists(fs_, filename_.c_str());
  }

  size_t Read(void* data, size_t bytes) const {
    return FileOperation<void*>(data, bytes, &hdfsRead);
  }

  size_t Write(const void* data, size_t bytes) const {
    return FileOperation<const void*>(data, bytes, &hdfsWrite);
  }

Nikita Titov's avatar
Nikita Titov committed
101
 private:
102
  template <typename BufferType>
Guolin Ke's avatar
Guolin Ke committed
103
  using fileOp = tSize (*)(hdfsFS, hdfsFile, BufferType, tSize);
104
105

  template <typename BufferType>
Guolin Ke's avatar
Guolin Ke committed
106
107
  inline size_t FileOperation(BufferType data, size_t bytes,
                              fileOp<BufferType> op) const {
Nikita Titov's avatar
Nikita Titov committed
108
    char* buffer = const_cast<char*>(static_cast<const char*>(data));
109
110
111
112
113
114
115
116
117
118
119
    size_t remain = bytes;
    while (remain != 0) {
      size_t nmax = static_cast<size_t>(std::numeric_limits<tSize>::max());
      tSize ret = op(fs_, file_, buffer, std::min(nmax, remain));
      if (ret > 0) {
        size_t n = static_cast<size_t>(ret);
        remain -= n;
        buffer += n;
      } else if (ret == 0) {
        break;
      } else if (errno != EINTR) {
120
        Log::Fatal("Failed HDFS file operation [%s]", strerror(errno));
121
122
123
124
125
      }
    }
    return bytes - remain;
  }

Qiwei Ye's avatar
Qiwei Ye committed
126
  static hdfsFS GetHDFSFileSystem(const std::string& uri) {
127
    size_t end = uri.find("/", kHdfsProtoLength);
128
    if (uri.find(kHdfsProto) != 0 || end == std::string::npos) {
129
      Log::Warning("Bad HDFS uri, no namenode found [%s]", uri.c_str());
130
131
      return NULL;
    }
132
    std::string hostport = uri.substr(kHdfsProtoLength, end - kHdfsProtoLength);
133
    if (fs_cache_.count(hostport) == 0) {
Qiwei Ye's avatar
Qiwei Ye committed
134
      fs_cache_[hostport] = MakeHDFSFileSystem(hostport);
135
136
137
138
    }
    return fs_cache_[hostport];
  }

Qiwei Ye's avatar
Qiwei Ye committed
139
  static hdfsFS MakeHDFSFileSystem(const std::string& hostport) {
140
141
142
143
144
145
146
    std::istringstream iss(hostport);
    std::string host;
    tPort port = 0;
    std::getline(iss, host, ':');
    iss >> port;
    hdfsFS fs = iss.eof() ? hdfsConnect(host.c_str(), port) : NULL;
    if (fs == NULL) {
147
      Log::Warning("Could not connect to HDFS namenode [%s]", hostport.c_str());
148
149
150
151
152
153
154
155
156
157
158
    }
    return fs;
  }

  mutable hdfsFS fs_ = NULL;
  hdfsFile file_ = NULL;
  const std::string filename_;
  const int flags_;
  static std::unordered_map<std::string, hdfsFS> fs_cache_;
};

Guolin Ke's avatar
Guolin Ke committed
159
160
// Definition of the "host:port" -> filesystem handle cache; starts out empty.
std::unordered_map<std::string, hdfsFS> HDFSFile::fs_cache_;
161
162
163

// HDFS build: WITH_HDFS(x) expands to x unchanged.
#define WITH_HDFS(x) x
#else
164
// Non-HDFS build: WITH_HDFS(x) discards x and expands to a fatal log call.
#define WITH_HDFS(x) Log::Fatal("HDFS support is not enabled")
165
#endif  // USE_HDFS
166

Guolin Ke's avatar
Guolin Ke committed
167
168
169
std::unique_ptr<VirtualFileReader> VirtualFileReader::Make(
    const std::string& filename) {
#ifdef USE_HDFS
170
  if (0 == filename.find(kHdfsProto)) {
Guolin Ke's avatar
Guolin Ke committed
171
172
173
174
175
    WITH_HDFS(return std::unique_ptr<VirtualFileReader>(
        new HDFSFile(filename, O_RDONLY)));
  } else
#endif
  {
176
177
178
179
    return std::unique_ptr<VirtualFileReader>(new LocalFile(filename, "rb"));
  }
}

Guolin Ke's avatar
Guolin Ke committed
180
181
182
std::unique_ptr<VirtualFileWriter> VirtualFileWriter::Make(
    const std::string& filename) {
#ifdef USE_HDFS
183
  if (0 == filename.find(kHdfsProto)) {
Guolin Ke's avatar
Guolin Ke committed
184
185
186
187
188
    WITH_HDFS(return std::unique_ptr<VirtualFileWriter>(
        new HDFSFile(filename, O_WRONLY)));
  } else
#endif
  {
189
190
191
192
193
    return std::unique_ptr<VirtualFileWriter>(new LocalFile(filename, "wb"));
  }
}

bool VirtualFileWriter::Exists(const std::string& filename) {
Guolin Ke's avatar
Guolin Ke committed
194
#ifdef USE_HDFS
195
  if (0 == filename.find(kHdfsProto)) {
Qiwei Ye's avatar
Qiwei Ye committed
196
    WITH_HDFS(HDFSFile file(filename, O_RDONLY); return file.Exists());
Guolin Ke's avatar
Guolin Ke committed
197
198
199
200
201
  } else
#endif
  {
    LocalFile file(filename, "rb");
    return file.Exists();
202
203
204
205
  }
}

}  // namespace LightGBM