"vscode:/vscode.git/clone" did not exist on "ccc06d1902a98284619a0e2ea95ef4d7bd046c0f"
text_reader.h 11.5 KB
Newer Older
1
2
3
4
/*!
 * Copyright (c) 2016 Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See LICENSE file in the project root for license information.
 */
5
6
#ifndef LIGHTGBM_INCLUDE_LIGHTGBM_UTILS_TEXT_READER_H_
#define LIGHTGBM_INCLUDE_LIGHTGBM_UTILS_TEXT_READER_H_
Guolin Ke's avatar
Guolin Ke committed
7

8
9
10
11
#include <LightGBM/utils/log.h>
#include <LightGBM/utils/pipeline_reader.h>
#include <LightGBM/utils/random.h>

#include <cstdint>
#include <cstdio>
#include <functional>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

namespace LightGBM {

20
21
/*! \brief Number of bytes in one GB (2^30); used to report read progress in GBs */
const size_t kGbs = static_cast<size_t>(1) << 30;

Guolin Ke's avatar
Guolin Ke committed
22
23
24
25
26
/*!
* \brief Read text data from file
*/
template<typename INDEX_T>
class TextReader {
Nikita Titov's avatar
Nikita Titov committed
27
 public:
Guolin Ke's avatar
Guolin Ke committed
28
29
30
  /*!
  * \brief Constructor
  * \param filename Filename of data
Guolin Ke's avatar
Guolin Ke committed
31
  * \param is_skip_first_line True if need to skip header
Guolin Ke's avatar
Guolin Ke committed
32
  */
33
34
  TextReader(const char* filename, bool is_skip_first_line, size_t progress_interval_bytes = SIZE_MAX):
    filename_(filename), is_skip_first_line_(is_skip_first_line), read_progress_interval_bytes_(progress_interval_bytes) {
Guolin Ke's avatar
Guolin Ke committed
35
    if (is_skip_first_line_) {
36
37
      auto reader = VirtualFileReader::Make(filename);
      if (!reader->Init()) {
38
        Log::Fatal("Could not open %s", filename);
Guolin Ke's avatar
Guolin Ke committed
39
      }
Guolin Ke's avatar
Guolin Ke committed
40
      std::stringstream str_buf;
41
42
43
44
      char read_c;
      size_t nread = reader->Read(&read_c, 1);
      while (nread == 1) {
        if (read_c == '\n' || read_c == '\r') {
Guolin Ke's avatar
Guolin Ke committed
45
46
          break;
        }
47
        str_buf << read_c;
Guolin Ke's avatar
Guolin Ke committed
48
        ++skip_bytes_;
49
        nread = reader->Read(&read_c, 1);
Guolin Ke's avatar
Guolin Ke committed
50
      }
51
52
      if (read_c == '\r') {
        reader->Read(&read_c, 1);
Guolin Ke's avatar
Guolin Ke committed
53
54
        ++skip_bytes_;
      }
55
56
      if (read_c == '\n') {
        reader->Read(&read_c, 1);
Guolin Ke's avatar
Guolin Ke committed
57
58
        ++skip_bytes_;
      }
Guolin Ke's avatar
Guolin Ke committed
59
      first_line_ = str_buf.str();
60
      Log::Debug("Skipped header \"%s\" in file %s", first_line_.c_str(), filename_);
Guolin Ke's avatar
Guolin Ke committed
61
    }
Guolin Ke's avatar
Guolin Ke committed
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
  }
  /*!
  * \brief Destructor
  */
  ~TextReader() {
    Clear();
  }
  /*!
  * \brief Clear cached data
  */
  inline void Clear() {
    lines_.clear();
    lines_.shrink_to_fit();
  }
  /*!
Guolin Ke's avatar
Guolin Ke committed
77
78
79
80
81
82
  * \brief return first line of data
  */
  inline std::string first_line() {
    return first_line_;
  }
  /*!
Guolin Ke's avatar
Guolin Ke committed
83
84
85
86
  * \brief Get text data that read from file
  * \return Text data, store in std::vector by line
  */
  inline std::vector<std::string>& Lines() { return lines_; }
87
88
89
90
91
92
93
94
95
96
97
  /*!
  * \brief Get joined text data that read from file
  * \return Text data, store in std::string, joined all lines by delimiter
  */
  inline std::string JoinedLines(std::string delimiter = "\n") {
    std::stringstream ss;
    for (auto line : lines_) {
      ss << line << delimiter;
    }
    return ss.str();
  }
Guolin Ke's avatar
Guolin Ke committed
98
99
100
101

  INDEX_T ReadAllAndProcess(const std::function<void(INDEX_T, const char*, size_t)>& process_fun) {
    last_line_ = "";
    INDEX_T total_cnt = 0;
102
    size_t bytes_read = 0;
Guolin Ke's avatar
Guolin Ke committed
103
    PipelineReader::Read(filename_, skip_bytes_,
Guolin Ke's avatar
Guolin Ke committed
104
        [&process_fun, &bytes_read, &total_cnt, this]
Guolin Ke's avatar
Guolin Ke committed
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
    (const char* buffer_process, size_t read_cnt) {
      size_t cnt = 0;
      size_t i = 0;
      size_t last_i = 0;
      // skip the break between \r and \n
      if (last_line_.size() == 0 && buffer_process[0] == '\n') {
        i = 1;
        last_i = i;
      }
      while (i < read_cnt) {
        if (buffer_process[i] == '\n' || buffer_process[i] == '\r') {
          if (last_line_.size() > 0) {
            last_line_.append(buffer_process + last_i, i - last_i);
            process_fun(total_cnt, last_line_.c_str(), last_line_.size());
            last_line_ = "";
120
          } else {
Guolin Ke's avatar
Guolin Ke committed
121
122
123
124
125
126
            process_fun(total_cnt, buffer_process + last_i, i - last_i);
          }
          ++cnt;
          ++i;
          ++total_cnt;
          // skip end of line
127
128
129
          while ((buffer_process[i] == '\n' || buffer_process[i] == '\r') && i < read_cnt) {
            ++i;
          }
Guolin Ke's avatar
Guolin Ke committed
130
          last_i = i;
131
        } else {
Guolin Ke's avatar
Guolin Ke committed
132
133
134
135
          ++i;
        }
      }
      if (last_i != read_cnt) {
136
        last_line_.append(buffer_process + last_i, read_cnt - last_i);
Guolin Ke's avatar
Guolin Ke committed
137
      }
138
139
140

      size_t prev_bytes_read = bytes_read;
      bytes_read += read_cnt;
141
142
      if (prev_bytes_read / read_progress_interval_bytes_ < bytes_read / read_progress_interval_bytes_) {
        Log::Debug("Read %.1f GBs from %s.", 1.0 * bytes_read / kGbs, filename_);
143
144
      }

Guolin Ke's avatar
Guolin Ke committed
145
146
147
148
      return cnt;
    });
    // if last line of file doesn't contain end of line
    if (last_line_.size() > 0) {
149
      Log::Info("Warning: last line of %s has no end of line, still using this line", filename_);
Guolin Ke's avatar
Guolin Ke committed
150
151
152
153
154
155
156
157
158
159
160
161
162
      process_fun(total_cnt, last_line_.c_str(), last_line_.size());
      ++total_cnt;
      last_line_ = "";
    }
    return total_cnt;
  }

  /*!
  * \brief Read all text data from file in memory
  * \return number of lines of text data
  */
  INDEX_T ReadAllLines() {
    return ReadAllAndProcess(
163
      [=](INDEX_T, const char* buffer, size_t size) {
Guolin Ke's avatar
Guolin Ke committed
164
165
166
167
      lines_.emplace_back(buffer, size);
    });
  }

168
169
170
  std::vector<char> ReadContent(size_t* out_len) {
    std::vector<char> ret;
    *out_len = 0;
171
172
    auto reader = VirtualFileReader::Make(filename_);
    if (!reader->Init()) {
173
174
175
176
177
178
      return ret;
    }
    const size_t buffer_size = 16 * 1024 * 1024;
    auto buffer_read = std::vector<char>(buffer_size);
    size_t read_cnt = 0;
    do {
179
      read_cnt = reader->Read(buffer_read.data(), buffer_size);
180
181
182
183
184
185
      ret.insert(ret.end(), buffer_read.begin(), buffer_read.begin() + read_cnt);
      *out_len += read_cnt;
    } while (read_cnt > 0);
    return ret;
  }

Guolin Ke's avatar
Guolin Ke committed
186
  INDEX_T SampleFromFile(Random* random, INDEX_T sample_cnt, std::vector<std::string>* out_sampled_data) {
Guolin Ke's avatar
Guolin Ke committed
187
    INDEX_T cur_sample_cnt = 0;
Guolin Ke's avatar
Guolin Ke committed
188
189
    return ReadAllAndProcess([=, &random, &cur_sample_cnt,
                              &out_sampled_data]
Guolin Ke's avatar
Guolin Ke committed
190
191
192
193
    (INDEX_T line_idx, const char* buffer, size_t size) {
      if (cur_sample_cnt < sample_cnt) {
        out_sampled_data->emplace_back(buffer, size);
        ++cur_sample_cnt;
194
      } else {
Guolin Ke's avatar
Guolin Ke committed
195
        const size_t idx = static_cast<size_t>(random->NextInt(0, static_cast<int>(line_idx + 1)));
196
        if (idx < static_cast<size_t>(sample_cnt)) {
Guolin Ke's avatar
Guolin Ke committed
197
198
199
200
201
202
203
204
205
206
207
208
209
210
          out_sampled_data->operator[](idx) = std::string(buffer, size);
        }
      }
    });
  }
  /*!
  * \brief Read part of text data from file in memory, use filter_fun to filter data
  * \param filter_fun Function that perform data filter
  * \param out_used_data_indices Store line indices that read text data
  * \return The number of total data
  */
  INDEX_T ReadAndFilterLines(const std::function<bool(INDEX_T)>& filter_fun, std::vector<INDEX_T>* out_used_data_indices) {
    out_used_data_indices->clear();
    INDEX_T total_cnt = ReadAllAndProcess(
Guolin Ke's avatar
Guolin Ke committed
211
        [&filter_fun, &out_used_data_indices, this]
Guolin Ke's avatar
Guolin Ke committed
212
213
    (INDEX_T line_idx , const char* buffer, size_t size) {
      bool is_used = filter_fun(line_idx);
214
215
216
217
      if (is_used) {
        out_used_data_indices->push_back(line_idx);
        lines_.emplace_back(buffer, size);
      }
Guolin Ke's avatar
Guolin Ke committed
218
219
220
221
222
    });
    return total_cnt;
  }

  INDEX_T SampleAndFilterFromFile(const std::function<bool(INDEX_T)>& filter_fun, std::vector<INDEX_T>* out_used_data_indices,
Guolin Ke's avatar
Guolin Ke committed
223
    Random* random, INDEX_T sample_cnt, std::vector<std::string>* out_sampled_data) {
Guolin Ke's avatar
Guolin Ke committed
224
225
226
    INDEX_T cur_sample_cnt = 0;
    out_used_data_indices->clear();
    INDEX_T total_cnt = ReadAllAndProcess(
Guolin Ke's avatar
Guolin Ke committed
227
228
        [=, &filter_fun, &out_used_data_indices, &random, &cur_sample_cnt,
         &out_sampled_data]
Guolin Ke's avatar
Guolin Ke committed
229
230
231
    (INDEX_T line_idx, const char* buffer, size_t size) {
      bool is_used = filter_fun(line_idx);
      if (is_used) {
232
        out_used_data_indices->push_back(line_idx);
Guolin Ke's avatar
Guolin Ke committed
233
234
235
        if (cur_sample_cnt < sample_cnt) {
          out_sampled_data->emplace_back(buffer, size);
          ++cur_sample_cnt;
236
        } else {
Guolin Ke's avatar
Guolin Ke committed
237
          const size_t idx = static_cast<size_t>(random->NextInt(0, static_cast<int>(out_used_data_indices->size())));
238
          if (idx < static_cast<size_t>(sample_cnt)) {
Guolin Ke's avatar
Guolin Ke committed
239
240
241
242
243
244
245
246
247
248
            out_sampled_data->operator[](idx) = std::string(buffer, size);
          }
        }
      }
    });
    return total_cnt;
  }

  INDEX_T CountLine() {
    return ReadAllAndProcess(
249
      [=](INDEX_T, const char*, size_t) {
Guolin Ke's avatar
Guolin Ke committed
250
251
252
    });
  }

253
  INDEX_T ReadAllAndProcessParallelWithFilter(const std::function<void(INDEX_T, const std::vector<std::string>&)>& process_fun, const std::function<bool(INDEX_T, INDEX_T)>& filter_fun) {
Guolin Ke's avatar
Guolin Ke committed
254
255
    last_line_ = "";
    INDEX_T total_cnt = 0;
256
    size_t bytes_read = 0;
Guolin Ke's avatar
Guolin Ke committed
257
    INDEX_T used_cnt = 0;
Guolin Ke's avatar
Guolin Ke committed
258
    PipelineReader::Read(filename_, skip_bytes_,
Guolin Ke's avatar
Guolin Ke committed
259
        [&process_fun, &filter_fun, &total_cnt, &bytes_read, &used_cnt, this]
Guolin Ke's avatar
Guolin Ke committed
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
    (const char* buffer_process, size_t read_cnt) {
      size_t cnt = 0;
      size_t i = 0;
      size_t last_i = 0;
      INDEX_T start_idx = used_cnt;
      // skip the break between \r and \n
      if (last_line_.size() == 0 && buffer_process[0] == '\n') {
        i = 1;
        last_i = i;
      }
      while (i < read_cnt) {
        if (buffer_process[i] == '\n' || buffer_process[i] == '\r') {
          if (last_line_.size() > 0) {
            last_line_.append(buffer_process + last_i, i - last_i);
            if (filter_fun(used_cnt, total_cnt)) {
              lines_.push_back(last_line_);
              ++used_cnt;
            }
            last_line_ = "";
279
          } else {
Guolin Ke's avatar
Guolin Ke committed
280
281
282
283
284
285
286
287
288
            if (filter_fun(used_cnt, total_cnt)) {
              lines_.emplace_back(buffer_process + last_i, i - last_i);
              ++used_cnt;
            }
          }
          ++cnt;
          ++i;
          ++total_cnt;
          // skip end of line
289
290
291
          while ((buffer_process[i] == '\n' || buffer_process[i] == '\r') && i < read_cnt) {
            ++i;
          }
Guolin Ke's avatar
Guolin Ke committed
292
          last_i = i;
293
        } else {
Guolin Ke's avatar
Guolin Ke committed
294
295
296
297
298
299
          ++i;
        }
      }
      process_fun(start_idx, lines_);
      lines_.clear();
      if (last_i != read_cnt) {
300
        last_line_.append(buffer_process + last_i, read_cnt - last_i);
Guolin Ke's avatar
Guolin Ke committed
301
      }
302
303
304

      size_t prev_bytes_read = bytes_read;
      bytes_read += read_cnt;
305
306
      if (prev_bytes_read / read_progress_interval_bytes_ < bytes_read / read_progress_interval_bytes_) {
        Log::Debug("Read %.1f GBs from %s.", 1.0 * bytes_read / kGbs, filename_);
307
308
      }

Guolin Ke's avatar
Guolin Ke committed
309
310
311
312
      return cnt;
    });
    // if last line of file doesn't contain end of line
    if (last_line_.size() > 0) {
313
      Log::Info("Warning: last line of %s has no end of line, still using this line", filename_);
Guolin Ke's avatar
Guolin Ke committed
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
      if (filter_fun(used_cnt, total_cnt)) {
        lines_.push_back(last_line_);
        process_fun(used_cnt, lines_);
      }
      lines_.clear();
      ++total_cnt;
      ++used_cnt;
      last_line_ = "";
    }
    return total_cnt;
  }

  INDEX_T ReadAllAndProcessParallel(const std::function<void(INDEX_T, const std::vector<std::string>&)>& process_fun) {
    return ReadAllAndProcessParallelWithFilter(process_fun, [](INDEX_T, INDEX_T) { return true; });
  }

  INDEX_T ReadPartAndProcessParallel(const std::vector<INDEX_T>& used_data_indices, const std::function<void(INDEX_T, const std::vector<std::string>&)>& process_fun) {
331
    return ReadAllAndProcessParallelWithFilter(process_fun,
332
      [&used_data_indices](INDEX_T used_cnt, INDEX_T total_cnt) {
333
      if (static_cast<size_t>(used_cnt) < used_data_indices.size() && total_cnt == used_data_indices[used_cnt]) {
Guolin Ke's avatar
Guolin Ke committed
334
        return true;
335
      } else {
Guolin Ke's avatar
Guolin Ke committed
336
337
338
339
340
        return false;
      }
    });
  }

Nikita Titov's avatar
Nikita Titov committed
341
 private:
Guolin Ke's avatar
Guolin Ke committed
342
343
344
345
346
347
  /*! \brief Filename of text data */
  const char* filename_;
  /*! \brief Cache the read text data */
  std::vector<std::string> lines_;
  /*! \brief Buffer for last line */
  std::string last_line_;
Guolin Ke's avatar
Guolin Ke committed
348
  /*! \brief first line */
349
  std::string first_line_ = "";
Guolin Ke's avatar
Guolin Ke committed
350
351
  /*! \brief is skip first line */
  bool is_skip_first_line_ = false;
352
  size_t read_progress_interval_bytes_;
Guolin Ke's avatar
Guolin Ke committed
353
354
  /*! \brief is skip first line */
  int skip_bytes_ = 0;
Guolin Ke's avatar
Guolin Ke committed
355
356
357
358
};

}  // namespace LightGBM

359
#endif   // LIGHTGBM_INCLUDE_LIGHTGBM_UTILS_TEXT_READER_H_