"R-package/tests/vscode:/vscode.git/clone" did not exist on "eded794efb576da165e5dadb73f71ca77592bcd4"
file_io.cpp 5.31 KB
Newer Older
1
2
/*!
 * Copyright (c) 2018 Microsoft Corporation. All rights reserved.
Guolin Ke's avatar
Guolin Ke committed
3
4
 * Licensed under the MIT License. See LICENSE file in the project root for
 * license information.
5
 */
6
7
8
9

#include <algorithm>
#include <sstream>
#include <unordered_map>
10

11
12
13
14
#include <LightGBM/utils/file_io.h>

#include <LightGBM/utils/log.h>

15
16
17
18
#ifdef USE_HDFS
#include <hdfs.h>
#endif

19
namespace LightGBM {
20
21

/*!
 * \brief File reader/writer backed by a C stdio stream (FILE*).
 *
 * The stream is opened lazily by Init() and closed by the destructor.
 * The object owns the FILE handle, so copying is disabled: a copy would
 * close the same stream a second time when it is destroyed.
 */
struct LocalFile : VirtualFileReader, VirtualFileWriter {
  /*!
   * \brief Store the path and open mode; nothing is opened here.
   * \param filename Path later handed to fopen
   * \param mode fopen mode string (this file uses "rb" and "wb")
   */
  LocalFile(const std::string& filename, const std::string& mode)
      : filename_(filename), mode_(mode) {}

  // Owning a raw FILE* makes the implicit copy operations unsafe.
  LocalFile(const LocalFile&) = delete;
  LocalFile& operator=(const LocalFile&) = delete;

  /*! \brief Close the stream if Init() managed to open one */
  virtual ~LocalFile() {
    if (file_ != nullptr) {
      fclose(file_);
    }
  }

  /*!
   * \brief Open the file if it is not open yet.
   * \return true when a stream is available after the call
   */
  bool Init() {
    if (file_ == nullptr) {
#if _MSC_VER
      // Do not rely on fopen_s leaving a usable value behind on failure.
      if (fopen_s(&file_, filename_.c_str(), mode_.c_str()) != 0) {
        file_ = nullptr;
      }
#else
      file_ = fopen(filename_.c_str(), mode_.c_str());
#endif
    }
    return file_ != nullptr;
  }

  /*!
   * \brief Check whether the stored path can be opened for binary reading.
   * \return true if a separate "rb" open of the same path succeeds
   */
  bool Exists() const {
    // A throw-away object is used so this instance's stream is untouched.
    LocalFile file(filename_, "rb");
    return file.Init();
  }

  /*!
   * \brief Read up to `bytes` bytes into `buffer`.
   * \return Number of bytes read; 0 if the file has not been opened
   */
  size_t Read(void* buffer, size_t bytes) const {
    if (file_ == nullptr) {
      return 0;  // passing a null stream to fread is undefined behaviour
    }
    return fread(buffer, 1, bytes, file_);
  }

  /*!
   * \brief Write exactly `bytes` bytes from `buffer`.
   * \return `bytes` when the whole buffer was written, otherwise 0
   */
  size_t Write(const void* buffer, size_t bytes) const {
    if (file_ == nullptr) {
      return 0;  // passing a null stream to fwrite is undefined behaviour
    }
    // The buffer is written as one item of size `bytes`, so the result is
    // all-or-nothing rather than a partial byte count.
    return fwrite(buffer, bytes, 1, file_) == 1 ? bytes : 0;
  }

 private:
  FILE* file_ = nullptr;
  const std::string filename_;
  const std::string mode_;
};

Guolin Ke's avatar
Guolin Ke committed
60
/*! \brief URI scheme prefix; paths starting with it are treated as HDFS paths */
const char* kHdfsProto = "hdfs://";
61
62

#ifdef USE_HDFS
63
64
/*! \brief Length of kHdfsProto in characters, not counting the terminator */
const size_t kHdfsProtoLength = strlen(kHdfsProto);

Qiwei Ye's avatar
Qiwei Ye committed
65
struct HDFSFile : VirtualFileReader, VirtualFileWriter {
Guolin Ke's avatar
Guolin Ke committed
66
67
  HDFSFile(const std::string& filename, int flags)
      : filename_(filename), flags_(flags) {}
Qiwei Ye's avatar
Qiwei Ye committed
68
  ~HDFSFile() {
69
70
71
72
73
74
75
76
    if (file_ != NULL) {
      hdfsCloseFile(fs_, file_);
    }
  }

  bool Init() {
    if (file_ == NULL) {
      if (fs_ == NULL) {
Qiwei Ye's avatar
Qiwei Ye committed
77
        fs_ = GetHDFSFileSystem(filename_);
78
      }
Guolin Ke's avatar
Guolin Ke committed
79
80
      if (fs_ != NULL &&
          (flags_ == O_WRONLY || 0 == hdfsExists(fs_, filename_.c_str()))) {
81
82
83
84
85
86
87
88
        file_ = hdfsOpenFile(fs_, filename_.c_str(), flags_, 0, 0, 0);
      }
    }
    return file_ != NULL;
  }

  bool Exists() const {
    if (fs_ == NULL) {
Qiwei Ye's avatar
Qiwei Ye committed
89
      fs_ = GetHDFSFileSystem(filename_);
90
91
92
93
94
95
96
97
98
99
100
101
    }
    return fs_ != NULL && 0 == hdfsExists(fs_, filename_.c_str());
  }

  size_t Read(void* data, size_t bytes) const {
    return FileOperation<void*>(data, bytes, &hdfsRead);
  }

  size_t Write(const void* data, size_t bytes) const {
    return FileOperation<const void*>(data, bytes, &hdfsWrite);
  }

Nikita Titov's avatar
Nikita Titov committed
102
 private:
103
  template <typename BufferType>
Guolin Ke's avatar
Guolin Ke committed
104
  using fileOp = tSize (*)(hdfsFS, hdfsFile, BufferType, tSize);
105
106

  template <typename BufferType>
Guolin Ke's avatar
Guolin Ke committed
107
108
  inline size_t FileOperation(BufferType data, size_t bytes,
                              fileOp<BufferType> op) const {
Nikita Titov's avatar
Nikita Titov committed
109
    char* buffer = const_cast<char*>(static_cast<const char*>(data));
110
111
112
113
114
115
116
117
118
119
120
    size_t remain = bytes;
    while (remain != 0) {
      size_t nmax = static_cast<size_t>(std::numeric_limits<tSize>::max());
      tSize ret = op(fs_, file_, buffer, std::min(nmax, remain));
      if (ret > 0) {
        size_t n = static_cast<size_t>(ret);
        remain -= n;
        buffer += n;
      } else if (ret == 0) {
        break;
      } else if (errno != EINTR) {
121
        Log::Fatal("Failed HDFS file operation [%s]", strerror(errno));
122
123
124
125
126
      }
    }
    return bytes - remain;
  }

Qiwei Ye's avatar
Qiwei Ye committed
127
  static hdfsFS GetHDFSFileSystem(const std::string& uri) {
128
    size_t end = uri.find("/", kHdfsProtoLength);
129
    if (uri.find(kHdfsProto) != 0 || end == std::string::npos) {
130
      Log::Warning("Bad HDFS uri, no namenode found [%s]", uri.c_str());
131
132
      return NULL;
    }
133
    std::string hostport = uri.substr(kHdfsProtoLength, end - kHdfsProtoLength);
134
    if (fs_cache_.count(hostport) == 0) {
Qiwei Ye's avatar
Qiwei Ye committed
135
      fs_cache_[hostport] = MakeHDFSFileSystem(hostport);
136
137
138
139
    }
    return fs_cache_[hostport];
  }

Qiwei Ye's avatar
Qiwei Ye committed
140
  static hdfsFS MakeHDFSFileSystem(const std::string& hostport) {
141
142
143
144
145
146
147
    std::istringstream iss(hostport);
    std::string host;
    tPort port = 0;
    std::getline(iss, host, ':');
    iss >> port;
    hdfsFS fs = iss.eof() ? hdfsConnect(host.c_str(), port) : NULL;
    if (fs == NULL) {
148
      Log::Warning("Could not connect to HDFS namenode [%s]", hostport.c_str());
149
150
151
152
153
154
155
156
157
158
159
    }
    return fs;
  }

  mutable hdfsFS fs_ = NULL;
  hdfsFile file_ = NULL;
  const std::string filename_;
  const int flags_;
  static std::unordered_map<std::string, hdfsFS> fs_cache_;
};

Guolin Ke's avatar
Guolin Ke committed
160
161
/*! \brief Definition of the static HDFSFile connection cache; starts empty */
std::unordered_map<std::string, hdfsFS> HDFSFile::fs_cache_;
162
163
164

// HDFS build: WITH_HDFS(x) expands to the wrapped statement(s) unchanged.
#define WITH_HDFS(x) x
#else
165
// Build without USE_HDFS: the wrapped statement(s) are discarded and replaced
// by a call to Log::Fatal.
#define WITH_HDFS(x) Log::Fatal("HDFS support is not enabled")
166
#endif  // USE_HDFS
167

Guolin Ke's avatar
Guolin Ke committed
168
169
170
/*!
 * \brief Create a reader for `filename`.
 *
 * When built with USE_HDFS, a name starting with kHdfsProto yields an
 * HDFSFile opened with O_RDONLY; every other name yields a LocalFile in "rb"
 * mode. Construction does not open anything; call Init() on the result.
 *
 * \param filename Local path or HDFS URI
 * \return Owning pointer to the new reader
 */
std::unique_ptr<VirtualFileReader> VirtualFileReader::Make(
    const std::string& filename) {
#ifdef USE_HDFS
  const bool on_hdfs = (filename.find(kHdfsProto) == 0);
  if (on_hdfs) {
    WITH_HDFS(return std::unique_ptr<VirtualFileReader>(
        new HDFSFile(filename, O_RDONLY)));
  }
#endif
  std::unique_ptr<VirtualFileReader> reader(new LocalFile(filename, "rb"));
  return reader;
}

Guolin Ke's avatar
Guolin Ke committed
179
180
181
/*!
 * \brief Create a writer for `filename`.
 *
 * When built with USE_HDFS, a name starting with kHdfsProto yields an
 * HDFSFile opened with O_WRONLY; every other name yields a LocalFile in "wb"
 * mode. Construction does not open anything; call Init() on the result.
 *
 * \param filename Local path or HDFS URI
 * \return Owning pointer to the new writer
 */
std::unique_ptr<VirtualFileWriter> VirtualFileWriter::Make(
    const std::string& filename) {
#ifdef USE_HDFS
  const bool on_hdfs = (filename.find(kHdfsProto) == 0);
  if (on_hdfs) {
    WITH_HDFS(return std::unique_ptr<VirtualFileWriter>(
        new HDFSFile(filename, O_WRONLY)));
  }
#endif
  std::unique_ptr<VirtualFileWriter> writer(new LocalFile(filename, "wb"));
  return writer;
}

bool VirtualFileWriter::Exists(const std::string& filename) {
Guolin Ke's avatar
Guolin Ke committed
191
#ifdef USE_HDFS
192
  if (0 == filename.find(kHdfsProto)) {
Qiwei Ye's avatar
Qiwei Ye committed
193
    WITH_HDFS(HDFSFile file(filename, O_RDONLY); return file.Exists());
194
  }
195
196
197
#endif
  LocalFile file(filename, "rb");
  return file.Exists();
198
199
200
}

}  // namespace LightGBM