"vscode:/vscode.git/clone" did not exist on "f879018b50c2b323ca0409fb5cc338e67ff648f4"
file_io.cpp 5.2 KB
Newer Older
1
2
3
4
/*!
 * Copyright (c) 2018 Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See LICENSE file in the project root for license information.
 */
5
#include <LightGBM/utils/file_io.h>

#include <LightGBM/utils/log.h>

#include <algorithm>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>

#ifdef USE_HDFS
#include <hdfs.h>
#endif


17
namespace LightGBM {
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

/*!
 * \brief File on the local filesystem, accessed through C stdio.
 *
 * The same type backs both the reader and the writer interface; the fopen
 * mode string given to the constructor decides which operations can succeed.
 */
struct LocalFile : VirtualFileReader, VirtualFileWriter {
  /*!
   * \brief Remember the path and mode; the file itself is opened by Init().
   * \param filename Path handed to fopen
   * \param mode fopen mode string (e.g. "rb", "wb")
   */
  LocalFile(const std::string& filename, const std::string& mode) : filename_(filename), mode_(mode) {}

  // This object owns file_. A copy would close the same FILE* a second time
  // in its destructor, so copying is disabled.
  LocalFile(const LocalFile&) = delete;
  LocalFile& operator=(const LocalFile&) = delete;

  /*! \brief Close the file if Init() opened it. */
  virtual ~LocalFile() {
    if (file_ != nullptr) {
      std::fclose(file_);
    }
  }

  /*!
   * \brief Open the file on first call; later calls reuse the open handle.
   * \return true when a file handle is available
   */
  bool Init() {
    if (file_ == nullptr) {
#ifdef _MSC_VER
      // Do not rely on fopen_s leaving the out-parameter null on failure.
      if (fopen_s(&file_, filename_.c_str(), mode_.c_str()) != 0) {
        file_ = nullptr;
      }
#else
      file_ = std::fopen(filename_.c_str(), mode_.c_str());
#endif
    }
    return file_ != nullptr;
  }

  /*!
   * \brief Check whether the path can be opened for binary reading.
   * \return true when a temporary "rb" handle could be opened
   */
  bool Exists() const {
    // Probe with a separate object so this object's handle and mode are untouched.
    LocalFile file(filename_, "rb");
    return file.Init();
  }

  /*!
   * \brief Read up to `bytes` bytes into `buffer`.
   * \return Number of bytes actually read; 0 when the file is not open
   */
  size_t Read(void* buffer, size_t bytes) const {
    if (file_ == nullptr) {
      return 0;
    }
    return std::fread(buffer, 1, bytes, file_);
  }

  /*!
   * \brief Write `bytes` bytes from `buffer` as a single item.
   * \return `bytes` when the whole item was written, otherwise 0
   */
  size_t Write(const void* buffer, size_t bytes) const {
    if (file_ == nullptr) {
      return 0;
    }
    // Item count 1 with item size `bytes`: the write is all-or-nothing.
    return std::fwrite(buffer, bytes, 1, file_) == 1 ? bytes : 0;
  }

 private:
  std::FILE* file_ = nullptr;
  const std::string filename_;
  const std::string mode_;
};

Guolin Ke's avatar
Guolin Ke committed
57
/*! \brief URI scheme prefix that marks a filename as an HDFS location. */
const char* kHdfsProto = "hdfs://";

#ifdef USE_HDFS
/*! \brief Length of kHdfsProto, i.e. the offset at which the host part of an HDFS URI begins. */
const size_t kHdfsProtoLength = std::strlen(kHdfsProto);

Qiwei Ye's avatar
Qiwei Ye committed
62
63
64
struct HDFSFile : VirtualFileReader, VirtualFileWriter {
  HDFSFile(const std::string& filename, int flags) : filename_(filename), flags_(flags) {}
  ~HDFSFile() {
65
66
67
68
69
70
71
72
    if (file_ != NULL) {
      hdfsCloseFile(fs_, file_);
    }
  }

  bool Init() {
    if (file_ == NULL) {
      if (fs_ == NULL) {
Qiwei Ye's avatar
Qiwei Ye committed
73
        fs_ = GetHDFSFileSystem(filename_);
74
75
76
77
78
79
80
81
82
83
      }
      if (fs_ != NULL && (flags_ == O_WRONLY || 0 == hdfsExists(fs_, filename_.c_str()))) {
        file_ = hdfsOpenFile(fs_, filename_.c_str(), flags_, 0, 0, 0);
      }
    }
    return file_ != NULL;
  }

  bool Exists() const {
    if (fs_ == NULL) {
Qiwei Ye's avatar
Qiwei Ye committed
84
      fs_ = GetHDFSFileSystem(filename_);
85
86
87
88
89
90
91
92
93
94
95
96
    }
    return fs_ != NULL && 0 == hdfsExists(fs_, filename_.c_str());
  }

  size_t Read(void* data, size_t bytes) const {
    return FileOperation<void*>(data, bytes, &hdfsRead);
  }

  size_t Write(const void* data, size_t bytes) const {
    return FileOperation<const void*>(data, bytes, &hdfsWrite);
  }

Nikita Titov's avatar
Nikita Titov committed
97
 private:
98
99
100
101
102
  template <typename BufferType>
  using fileOp = tSize(*)(hdfsFS, hdfsFile, BufferType, tSize);

  template <typename BufferType>
  inline size_t FileOperation(BufferType data, size_t bytes, fileOp<BufferType> op) const {
Nikita Titov's avatar
Nikita Titov committed
103
    char* buffer = const_cast<char*>(static_cast<const char*>(data));
104
105
106
107
108
109
110
111
112
113
114
    size_t remain = bytes;
    while (remain != 0) {
      size_t nmax = static_cast<size_t>(std::numeric_limits<tSize>::max());
      tSize ret = op(fs_, file_, buffer, std::min(nmax, remain));
      if (ret > 0) {
        size_t n = static_cast<size_t>(ret);
        remain -= n;
        buffer += n;
      } else if (ret == 0) {
        break;
      } else if (errno != EINTR) {
115
        Log::Fatal("Failed HDFS file operation [%s]", strerror(errno));
116
117
118
119
120
      }
    }
    return bytes - remain;
  }

Qiwei Ye's avatar
Qiwei Ye committed
121
  static hdfsFS GetHDFSFileSystem(const std::string& uri) {
122
    size_t end = uri.find("/", kHdfsProtoLength);
123
    if (uri.find(kHdfsProto) != 0 || end == std::string::npos) {
124
      Log::Warning("Bad HDFS uri, no namenode found [%s]", uri.c_str());
125
126
      return NULL;
    }
127
    std::string hostport = uri.substr(kHdfsProtoLength, end - kHdfsProtoLength);
128
    if (fs_cache_.count(hostport) == 0) {
Qiwei Ye's avatar
Qiwei Ye committed
129
      fs_cache_[hostport] = MakeHDFSFileSystem(hostport);
130
131
132
133
    }
    return fs_cache_[hostport];
  }

Qiwei Ye's avatar
Qiwei Ye committed
134
  static hdfsFS MakeHDFSFileSystem(const std::string& hostport) {
135
136
137
138
139
140
141
    std::istringstream iss(hostport);
    std::string host;
    tPort port = 0;
    std::getline(iss, host, ':');
    iss >> port;
    hdfsFS fs = iss.eof() ? hdfsConnect(host.c_str(), port) : NULL;
    if (fs == NULL) {
142
      Log::Warning("Could not connect to HDFS namenode [%s]", hostport.c_str());
143
144
145
146
147
148
149
150
151
152
153
    }
    return fs;
  }

  mutable hdfsFS fs_ = NULL;
  hdfsFile file_ = NULL;
  const std::string filename_;
  const int flags_;
  static std::unordered_map<std::string, hdfsFS> fs_cache_;
};

Qiwei Ye's avatar
Qiwei Ye committed
154
/*! \brief Storage for the cache of HDFS filesystem handles shared by all HDFSFile objects. */
std::unordered_map<std::string, hdfsFS> HDFSFile::fs_cache_;

// With HDFS support compiled in, WITH_HDFS emits its argument unchanged.
// Variadic so that an argument containing a top-level comma is still passed whole.
#define WITH_HDFS(...) __VA_ARGS__
#else
// Without HDFS support, every use of WITH_HDFS becomes a fatal error report.
#define WITH_HDFS(...) Log::Fatal("HDFS support is not enabled")
#endif  // USE_HDFS
160
161
162

std::unique_ptr<VirtualFileReader> VirtualFileReader::Make(const std::string& filename) {
  if (0 == filename.find(kHdfsProto)) {
Qiwei Ye's avatar
Qiwei Ye committed
163
    WITH_HDFS(return std::unique_ptr<VirtualFileReader>(new HDFSFile(filename, O_RDONLY)));
164
165
166
167
168
169
170
  } else {
    return std::unique_ptr<VirtualFileReader>(new LocalFile(filename, "rb"));
  }
}

std::unique_ptr<VirtualFileWriter> VirtualFileWriter::Make(const std::string& filename) {
  if (0 == filename.find(kHdfsProto)) {
Qiwei Ye's avatar
Qiwei Ye committed
171
    WITH_HDFS(return std::unique_ptr<VirtualFileWriter>(new HDFSFile(filename, O_WRONLY)));
172
173
174
175
176
177
178
  } else {
    return std::unique_ptr<VirtualFileWriter>(new LocalFile(filename, "wb"));
  }
}

bool VirtualFileWriter::Exists(const std::string& filename) {
  if (0 == filename.find(kHdfsProto)) {
Qiwei Ye's avatar
Qiwei Ye committed
179
    WITH_HDFS(HDFSFile file(filename, O_RDONLY); return file.Exists());
180
181
182
183
184
185
186
  } else {
      LocalFile file(filename, "rb");
      return file.Exists();
  }
}

}  // namespace LightGBM