#include #include #include #include #include "events.h" #include "parser.h" #include "process.h" namespace bio = boost::iostreams; log_parser::log_parser() : inf(nullptr), gz_file(nullptr), gz_in(nullptr) { buf = new char[block_size]; } log_parser::~log_parser() { if (inf) delete inf; if (gz_file) { delete gz_in; delete gz_file; } delete[] buf; } bool log_parser::next_block() { if (buf_pos == buf_len) { buf_pos = 0; } else { memmove(buf, buf + buf_pos, buf_len - buf_pos); buf_pos = buf_len - buf_pos; } inf->read(buf + buf_pos, block_size - buf_pos); size_t newlen = inf->gcount(); buf_len = buf_pos + newlen; buf_pos = 0; return newlen != 0; } void log_parser::open(const char *path) { inf = new std::ifstream(path, std::ios_base::in); } void log_parser::open_gz(const char *path) { gz_file = new std::ifstream(path, std::ios_base::in | std::ios_base::binary); gz_in = new bio::filtering_streambuf(); gz_in->push(bio::gzip_decompressor()); gz_in->push(*gz_file); inf = new std::istream(gz_in); } size_t log_parser::try_line() { size_t pos = buf_pos; size_t line_len = 0; for (; pos < buf_len && buf[pos] != '\n'; pos++, line_len++); if (pos >= buf_len) { // line is incomplete return 0; } process_line(buf + buf_pos, line_len); return pos + 1; } bool log_parser::next_event() { cur_event = nullptr; if (buf_len == 0 && !next_block()) { std::cerr << "escape 0" << std::endl; return false; } do { size_t newpos = try_line(); if (!newpos) { if (!next_block()) { std::cerr << "escape 1" << std::endl; return false; } newpos = try_line(); if (!newpos) { std::cerr << "escape 2" << std::endl; return false; } } buf_pos = newpos; } while (!cur_event); return true; }