file_io.cpp 4.93 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <LightGBM/utils/log.h>
#include <LightGBM/utils/file_io.h>

#include <algorithm>
#include <sstream>
#include <unordered_map>
#ifdef USE_HDFS
#include <hdfs.h>
#endif

namespace LightGBM{

// File on the local filesystem, accessed through C stdio.
//
// Implements both the reader and the writer interface; the fopen mode string
// passed at construction decides which operations can actually succeed.
// Constructing the object does not touch the filesystem: the stream is opened
// by Init() and closed by the destructor.
struct LocalFile : VirtualFileReader, VirtualFileWriter {
  // filename: path handed to fopen; mode: fopen mode string such as "rb" or "wb".
  LocalFile(const std::string& filename, const std::string& mode) : filename_(filename), mode_(mode) {}

  // The object owns a FILE*; a copy would make two destructors fclose the same stream.
  LocalFile(const LocalFile&) = delete;
  LocalFile& operator=(const LocalFile&) = delete;

  virtual ~LocalFile() {
    if (file_ != NULL) {
      fclose(file_);
    }
  }

  // Opens the file if it is not open yet. Returns true when a stream is available.
  bool Init() {
    if (file_ == NULL) {
#ifdef _MSC_VER
      // Do not rely on fopen_s leaving the out-parameter null on failure.
      if (fopen_s(&file_, filename_.c_str(), mode_.c_str()) != 0) {
        file_ = NULL;
      }
#else
      file_ = fopen(filename_.c_str(), mode_.c_str());
#endif
    }
    return file_ != NULL;
  }

  // Returns true when the file can be opened for binary reading.
  // Uses a temporary handle, so the stream held by this object is not affected.
  bool Exists() const {
    LocalFile file(filename_, "rb");
    return file.Init();
  }

  // Reads up to `bytes` bytes into `buffer` and returns the number of bytes read.
  // Returns 0 when the file has not been opened successfully.
  size_t Read(void* buffer, size_t bytes) const {
    if (file_ == NULL) {
      return 0;
    }
    return fread(buffer, 1, bytes, file_);
  }

  // Writes `bytes` bytes from `buffer` as a single item: returns `bytes` when the
  // whole buffer was written and 0 otherwise (including when the file is not open).
  size_t Write(const void* buffer, size_t bytes) const {
    if (file_ == NULL) {
      return 0;
    }
    return fwrite(buffer, bytes, 1, file_) == 1 ? bytes : 0;
  }

private:
  FILE* file_ = NULL;           // open stream, or NULL before Init() / after a failed open
  const std::string filename_;  // path passed to fopen
  const std::string mode_;      // fopen mode string
};

// URI prefix that marks a filename as an HDFS path; filenames without it are
// treated as local files by the factory functions below.
const std::string kHdfsProto = "hdfs://";

#ifdef USE_HDFS
Qiwei Ye's avatar
Qiwei Ye committed
54
55
56
struct HDFSFile : VirtualFileReader, VirtualFileWriter {
  HDFSFile(const std::string& filename, int flags) : filename_(filename), flags_(flags) {}
  ~HDFSFile() {
57
58
59
60
61
62
63
64
    if (file_ != NULL) {
      hdfsCloseFile(fs_, file_);
    }
  }

  bool Init() {
    if (file_ == NULL) {
      if (fs_ == NULL) {
Qiwei Ye's avatar
Qiwei Ye committed
65
        fs_ = GetHDFSFileSystem(filename_);
66
67
68
69
70
71
72
73
74
75
      }
      if (fs_ != NULL && (flags_ == O_WRONLY || 0 == hdfsExists(fs_, filename_.c_str()))) {
        file_ = hdfsOpenFile(fs_, filename_.c_str(), flags_, 0, 0, 0);
      }
    }
    return file_ != NULL;
  }

  bool Exists() const {
    if (fs_ == NULL) {
Qiwei Ye's avatar
Qiwei Ye committed
76
      fs_ = GetHDFSFileSystem(filename_);
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
    }
    return fs_ != NULL && 0 == hdfsExists(fs_, filename_.c_str());
  }

  size_t Read(void* data, size_t bytes) const {
    return FileOperation<void*>(data, bytes, &hdfsRead);
  }

  size_t Write(const void* data, size_t bytes) const {
    return FileOperation<const void*>(data, bytes, &hdfsWrite);
  }

private:
  template <typename BufferType>
  using fileOp = tSize(*)(hdfsFS, hdfsFile, BufferType, tSize);

  template <typename BufferType>
  inline size_t FileOperation(BufferType data, size_t bytes, fileOp<BufferType> op) const {
    char* buffer = (char *)data;
    size_t remain = bytes;
    while (remain != 0) {
      size_t nmax = static_cast<size_t>(std::numeric_limits<tSize>::max());
      tSize ret = op(fs_, file_, buffer, std::min(nmax, remain));
      if (ret > 0) {
        size_t n = static_cast<size_t>(ret);
        remain -= n;
        buffer += n;
      } else if (ret == 0) {
        break;
      } else if (errno != EINTR) {
        Log::Fatal("Failed hdfs file operation [%s]", strerror(errno));
      }
    }
    return bytes - remain;
  }

Qiwei Ye's avatar
Qiwei Ye committed
113
  static hdfsFS GetHDFSFileSystem(const std::string& uri) {
114
115
116
117
118
119
120
    size_t end = uri.find("/", kHdfsProto.length());
    if (uri.find(kHdfsProto) != 0 || end == std::string::npos) {
      Log::Warning("Bad hdfs uri, no namenode found [%s]", uri.c_str());
      return NULL;
    }
    std::string hostport = uri.substr(kHdfsProto.length(), end - kHdfsProto.length());
    if (fs_cache_.count(hostport) == 0) {
Qiwei Ye's avatar
Qiwei Ye committed
121
      fs_cache_[hostport] = MakeHDFSFileSystem(hostport);
122
123
124
125
    }
    return fs_cache_[hostport];
  }

Qiwei Ye's avatar
Qiwei Ye committed
126
  static hdfsFS MakeHDFSFileSystem(const std::string& hostport) {
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
    std::istringstream iss(hostport);
    std::string host;
    tPort port = 0;
    std::getline(iss, host, ':');
    iss >> port;
    hdfsFS fs = iss.eof() ? hdfsConnect(host.c_str(), port) : NULL;
    if (fs == NULL) {
      Log::Warning("Could not connect to hdfs namenode [%s]", hostport.c_str());
    }
    return fs;
  }

  mutable hdfsFS fs_ = NULL;
  hdfsFile file_ = NULL;
  const std::string filename_;
  const int flags_;
  static std::unordered_map<std::string, hdfsFS> fs_cache_;
};

Qiwei Ye's avatar
Qiwei Ye committed
146
// Out-of-class definition of the static connection cache declared in HDFSFile;
// it starts out empty.
std::unordered_map<std::string, hdfsFS> HDFSFile::fs_cache_ = std::unordered_map<std::string, hdfsFS>();
147
148
149
150
151
152
153
154

// Built with USE_HDFS: WITH_HDFS(x) expands to the statement(s) x unchanged.
#define WITH_HDFS(x) x
#else
// Built without USE_HDFS: the argument is discarded and replaced by a fatal log call.
#define WITH_HDFS(x) Log::Fatal("HDFS Support not enabled.")
#endif // USE_HDFS

std::unique_ptr<VirtualFileReader> VirtualFileReader::Make(const std::string& filename) {
  if (0 == filename.find(kHdfsProto)) {
Qiwei Ye's avatar
Qiwei Ye committed
155
    WITH_HDFS(return std::unique_ptr<VirtualFileReader>(new HDFSFile(filename, O_RDONLY)));
156
157
158
159
160
161
162
  } else {
    return std::unique_ptr<VirtualFileReader>(new LocalFile(filename, "rb"));
  }
}

std::unique_ptr<VirtualFileWriter> VirtualFileWriter::Make(const std::string& filename) {
  if (0 == filename.find(kHdfsProto)) {
Qiwei Ye's avatar
Qiwei Ye committed
163
    WITH_HDFS(return std::unique_ptr<VirtualFileWriter>(new HDFSFile(filename, O_WRONLY)));
164
165
166
167
168
169
170
  } else {
    return std::unique_ptr<VirtualFileWriter>(new LocalFile(filename, "wb"));
  }
}

bool VirtualFileWriter::Exists(const std::string& filename) {
  if (0 == filename.find(kHdfsProto)) {
Qiwei Ye's avatar
Qiwei Ye committed
171
    WITH_HDFS(HDFSFile file(filename, O_RDONLY); return file.Exists());
172
173
174
175
176
177
178
  } else {
      LocalFile file(filename, "rb");
      return file.Exists();
  }
}

}  // namespace LightGBM