// pipeline_reader.h (committed by Guolin Ke)
#ifndef LIGHTGBM_UTILS_PIPELINE_READER_H_
#define LIGHTGBM_UTILS_PIPELINE_READER_H_

#include <LightGBM/utils/log.h>

#include <cstdio>

#include <algorithm>
#include <functional>
#include <memory>
#include <thread>

namespace LightGBM{

/*!
* \brief A pipelined file reader using two threads: a worker thread reads the
*        next block from the file while the calling thread processes the current one.
*/
class PipelineReader {
public:
  /*!
  * \brief Read a file block by block, overlapping file reads with processing
  * \param filename Path of the file to read; it is opened in binary mode
  * \param skip_bytes Number of leading bytes to discard before the first block;
  *        values <= 0 skip nothing
  * \param process_fun Invoked once per non-empty block, in file order, with
  *        (block data, block length). The data is not null-terminated and is
  *        only valid for the duration of the call. Its return values are summed.
  * \param buffer_size Size in bytes of each of the two block buffers
  *        (defaults to 16 MB)
  * \return Sum of all process_fun return values; 0 if filename is null,
  *         buffer_size is 0, or the file cannot be opened
  * \note If process_fun throws, the read thread is joined, the file and the
  *       buffers are released, and the exception propagates to the caller.
  */
  static size_t Read(const char* filename, int skip_bytes,
                     const std::function<size_t (const char*, size_t)>& process_fun,
                     size_t buffer_size = 16 * 1024 * 1024) {
    if (filename == nullptr || buffer_size == 0) {
      return 0;
    }
    FILE* raw_file = nullptr;
#ifdef _MSC_VER
    fopen_s(&raw_file, filename, "rb");
#else
    raw_file = fopen(filename, "rb");
#endif
    if (raw_file == nullptr) {
      return 0;
    }
    // Owns the file so it is closed on every exit path, including exceptions.
    std::unique_ptr<FILE, int (*)(FILE*)> file(raw_file, &fclose);
    // buffer handed to process_fun
    std::unique_ptr<char[]> buffer_process(new char[buffer_size]);
    // buffer filled by the read thread
    std::unique_ptr<char[]> buffer_read(new char[buffer_size]);

    if (skip_bytes > 0) {
      // Skip in buffer-sized chunks: a single fread of skip_bytes into the
      // buffer would overflow it whenever skip_bytes > buffer_size.
      size_t remaining = static_cast<size_t>(skip_bytes);
      while (remaining > 0) {
        const size_t chunk = std::min(remaining, buffer_size);
        const size_t skipped = fread(buffer_process.get(), 1, chunk, file.get());
        if (skipped == 0) {
          break;  // EOF or read error: the block read below will return 0
        }
        remaining -= skipped;
      }
    }

    size_t cnt = 0;
    // read first block
    size_t read_cnt = fread(buffer_process.get(), 1, buffer_size, file.get());
    while (read_cnt > 0) {
      size_t next_read_cnt = 0;
      FILE* file_ptr = file.get();
      char* read_ptr = buffer_read.get();
      // start reading the next block in the background
      std::thread read_worker(
        [file_ptr, read_ptr, buffer_size, &next_read_cnt] {
        next_read_cnt = fread(read_ptr, 1, buffer_size, file_ptr);
      });
      // Process the current block. A joinable std::thread must never be
      // destroyed (that calls std::terminate), so join before rethrowing.
      try {
        cnt += process_fun(buffer_process.get(), read_cnt);
      } catch (...) {
        read_worker.join();
        throw;
      }
      // wait for the read thread
      read_worker.join();
      // the block just read becomes the next one to process
      std::swap(buffer_process, buffer_read);
      read_cnt = next_read_cnt;
    }
    return cnt;
  }
};

}  // namespace LightGBM

#endif   // LIGHTGBM_UTILS_PIPELINE_READER_H_