Commit 4a6b928c authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

trace: use boost co-routines for log processing

parent 8c9c3ede
......@@ -26,9 +26,12 @@
#include <string>
class log_parser;
class event {
public:
uint64_t ts;
log_parser *source;
explicit event(uint64_t ts_) : ts(ts_) {
}
......
......@@ -53,7 +53,7 @@ void gem5_parser::process_msg(uint64_t ts, char *comp_name,
return;
if (const std::string *s = syms.lookup(addr)) {
cur_event = new EHostCall(ts, *s);
cur_event = std::make_shared<EHostCall>(ts, *s);
}
} else if (comp_name_len == 18 &&
!memcmp(comp_name, "system.pc.ethernet", 18)) {
......@@ -70,40 +70,40 @@ void gem5_parser::process_msg(uint64_t ts, char *comp_name,
uint64_t size = 0;
if (p.consume_str("received ")) {
if (p.consume_str("MSI-X intr vec ") && p.consume_dec(id)) {
cur_event = new EHostMsiX(ts, id);
cur_event = std::make_shared<EHostMsiX>(ts, id);
} else if (p.consume_str("DMA read id ") && p.consume_dec(id) &&
p.consume_str(" addr ") && p.consume_hex(addr) &&
p.consume_str(" size ") && p.consume_dec(size)) {
// cosim: received DMA read id 94113551511792 addr 23697ad60
// size 20
cur_event = new EHostDmaR(ts, id, addr, size);
cur_event = std::make_shared<EHostDmaR>(ts, id, addr, size);
} else if (p.consume_str("DMA write id ") && p.consume_dec(id) &&
p.consume_str(" addr ") && p.consume_hex(addr) &&
p.consume_str(" size ") && p.consume_dec(size)) {
// cosim: received DMA write id 94113551528032 addr 236972000
// size 4
cur_event = new EHostDmaW(ts, id, addr, size);
cur_event = std::make_shared<EHostDmaW>(ts, id, addr, size);
} else if (p.consume_str("read completion id ") && p.consume_dec(id)) {
// cosim: received read completion id 94583743418112
cur_event = new EHostMmioC(ts, id);
cur_event = std::make_shared<EHostMmioC>(ts, id);
} else if (p.consume_str("write completion id ") && p.consume_dec(id)) {
// cosim: received write completion id 94583743418736
cur_event = new EHostMmioC(ts, id);
cur_event = std::make_shared<EHostMmioC>(ts, id);
}
} else if (p.consume_str("sending ")) {
if (p.consume_str("read addr ") && p.consume_hex(addr) &&
p.consume_str(" size ") && p.consume_dec(size) &&
p.consume_str(" id ") && p.consume_dec(id)) {
// cosim: sending read addr c012a500 size 4 id 94583743418112
cur_event = new EHostMmioR(ts, id, addr, size);
cur_event = std::make_shared<EHostMmioR>(ts, id, addr, size);
} else if (p.consume_str("write addr ") && p.consume_hex(addr) &&
p.consume_str(" size ") && p.consume_dec(size) &&
p.consume_str(" id ") && p.consume_dec(id)) {
// cosim: sending write addr c0108000 size 4 id 94584005188256
cur_event = new EHostMmioW(ts, id, addr, size);
cur_event = std::make_shared<EHostMmioW>(ts, id, addr, size);
}
} else if (p.consume_str("completed DMA id ") && p.consume_dec(id)) {
cur_event = new EHostDmaC(ts, id);
cur_event = std::make_shared<EHostDmaC>(ts, id);
}
}
......
......@@ -33,7 +33,9 @@
namespace bio = boost::iostreams;
log_parser::log_parser() : inf(nullptr), gz_file(nullptr), gz_in(nullptr) {
log_parser::log_parser() : inf(nullptr), gz_file(nullptr), gz_in(nullptr),
buf_len(0), buf_pos(0)
{
buf = new char[block_size];
}
......@@ -95,7 +97,7 @@ size_t log_parser::try_line() {
}
bool log_parser::next_event() {
cur_event = nullptr;
cur_event.reset();
if (buf_len == 0 && !next_block()) {
std::cerr << "escape 0" << std::endl;
......
......@@ -47,35 +47,35 @@ void nicbm_parser::process_line(char *line, size_t line_len) {
if (p.consume_str("read(off=0x")) {
if (p.consume_hex(addr) && p.consume_str(", len=") && p.consume_dec(len) &&
p.consume_str(", val=0x") && p.consume_hex(val)) {
cur_event = new e_nic_mmio_r(ts, addr, len, val);
cur_event = std::make_shared<e_nic_mmio_r>(ts, addr, len, val);
}
} else if (p.consume_str("write(off=0x")) {
if (p.consume_hex(addr) && p.consume_str(", len=") && p.consume_dec(len) &&
p.consume_str(", val=0x") && p.consume_hex(val)) {
cur_event = new e_nic_mmio_w(ts, addr, len, val);
cur_event = std::make_shared<e_nic_mmio_w>(ts, addr, len, val);
}
} else if (p.consume_str("issuing dma op 0x")) {
if (p.consume_hex(id) && p.consume_str(" addr ") && p.consume_hex(addr) &&
p.consume_str(" len ") && p.consume_hex(len)) {
cur_event = new e_nic_dma_i(ts, id, addr, len);
cur_event = std::make_shared<e_nic_dma_i>(ts, id, addr, len);
}
} else if (p.consume_str("completed dma read op 0x") ||
p.consume_str("completed dma write op 0x")) {
if (p.consume_hex(id) && p.consume_str(" addr ") && p.consume_hex(addr) &&
p.consume_str(" len ") && p.consume_hex(len)) {
cur_event = new e_nic_dma_c(ts, id);
cur_event = std::make_shared<e_nic_dma_c>(ts, id);
}
} else if (p.consume_str("issue MSI-X interrupt vec ")) {
if (p.consume_dec(id)) {
cur_event = new e_nic_msix(ts, id);
cur_event = std::make_shared<e_nic_msix>(ts, id);
}
} else if (p.consume_str("eth tx: len ")) {
if (p.consume_dec(len)) {
cur_event = new e_nic_tx(ts, len);
cur_event = std::make_shared<e_nic_tx>(ts, len);
}
} else if (p.consume_str("eth rx: port 0 len ")) {
if (p.consume_dec(len)) {
cur_event = new e_nic_rx(ts, len);
cur_event = std::make_shared<e_nic_rx>(ts, len);
}
#if 1
}
......
......@@ -24,18 +24,97 @@
#include "trace/process.h"
#include <boost/bind.hpp>
#include <boost/coroutine2/all.hpp>
#include <boost/foreach.hpp>
#include <iostream>
#include <memory>
#include "trace/events.h"
#include "trace/parser.h"
struct log_parser_cmp {
bool operator()(const log_parser *l, const log_parser *r) const {
return l->cur_event->ts < r->cur_event->ts;
template <typename T>
struct event_pair_cmp {
bool operator()(const std::pair<T, std::shared_ptr<event>> l,
const std::pair<T, std::shared_ptr<event>> r) const {
return l.second->ts < r.second->ts;
}
};
typedef boost::coroutines2::asymmetric_coroutine<std::shared_ptr<event>> coro_t;
void ReadEvents(coro_t::push_type &sink, log_parser &lp) {
while (lp.next_event() && lp.cur_event) {
lp.cur_event->source = &lp;
sink(lp.cur_event);
}
}
/** merge multiple event streams into one ordered by timestamp */
void MergeEvents(coro_t::push_type &sink,
std::set<coro_t::pull_type *> &all_parsers) {
typedef std::pair<coro_t::pull_type *, std::shared_ptr<event>> itpair_t;
// create set of pairs of source and next event, ordered by timestamp of next
// event.
std::set<itpair_t, event_pair_cmp<coro_t::pull_type *>> active;
// initially populate the set
for (auto p : all_parsers) {
if (*p) {
auto ev = p->get();
(*p)();
active.insert(std::make_pair(p, ev));
}
}
// iterate until there are no more active sources
while (!active.empty()) {
// grab event with lowest timestamp
auto i = active.begin();
itpair_t p = *i;
active.erase(i);
// emit event
sink(p.second);
// check if there is another event in the source, if so, re-enqueue
if (*p.first) {
auto ev = p.first->get();
(*p.first)();
active.insert(std::make_pair(p.first, ev));
}
}
}
void Printer(coro_t::pull_type &source) {
uint64_t ts_off = 0;
for (auto ev: source) {
std::shared_ptr<EHostCall> hc;
if ((hc = std::dynamic_pointer_cast<EHostCall>(ev)) &&
strcmp(ev->source->label, "C") &&
hc->fun == "__sys_sendto") {
std::cout << "---------- REQ START:" << ev->ts << std::endl;
ts_off = ev->ts;
}
std::cout << ev->source->label << " ";
ev->ts -= ts_off;
ev->ts /= 1000;
ev->dump(std::cout);
}
}
int main(int argc, char *argv[]) {
if (argc != 5) {
std::cerr << "Usage: process CLIENT_HLOG CLIENT_NLOG SERVER_HLOG "
"SERVER_CLOG" << std::endl;
return 1;
}
sym_map syms;
syms.add_filter("entry_SYSCALL_64");
syms.add_filter("__do_sys_gettimeofday");
......@@ -74,36 +153,14 @@ int main(int argc, char *argv[]) {
all_parsers.insert(&sh);
all_parsers.insert(&sn);
std::set<log_parser *, log_parser_cmp> active_parsers;
std::cerr << "Opened all" << std::endl;
std::set<coro_t::pull_type *> sources;
for (auto p : all_parsers) {
if (p->next_event() && p->cur_event)
active_parsers.insert(p);
sources.insert(new coro_t::pull_type(
boost::bind(ReadEvents, _1, boost::ref(*p))));
}
uint64_t ts_off = 0;
while (!active_parsers.empty()) {
auto i = active_parsers.begin();
log_parser *p = *i;
active_parsers.erase(i);
EHostCall *hc;
event *ev = p->cur_event;
if (p == &ch && (hc = dynamic_cast<EHostCall *>(ev)) &&
hc->fun == "__sys_sendto") {
std::cout << "---------- REQ START:" << ev->ts << std::endl;
ts_off = ev->ts;
}
std::cout << p->label << " ";
ev->ts -= ts_off;
ev->ts /= 1000;
ev->dump(std::cout);
delete ev;
if (p->next_event() && p->cur_event)
active_parsers.insert(p);
}
coro_t::pull_type merged(boost::bind(MergeEvents, _1, boost::ref(sources)));
Printer(merged);
}
......@@ -26,6 +26,7 @@
#include <boost/iostreams/filtering_streambuf.hpp>
#include <map>
#include <memory>
#include <set>
#include <string>
......@@ -73,7 +74,7 @@ class log_parser {
public:
const char *label;
event *cur_event;
std::shared_ptr<event> cur_event;
log_parser();
virtual ~log_parser();
......
......@@ -26,7 +26,8 @@ bin_trace_process := $(d)process
OBJS := $(addprefix $(d), process.o sym_map.o log_parser.o gem5.o nicbm.o)
$(bin_trace_process): $(OBJS) -lboost_iostreams
$(bin_trace_process): $(OBJS) -lboost_iostreams -lboost_coroutine \
-lboost_context
CLEAN := $(bin_trace_process) $(OBJS)
ALL := $(bin_trace_process)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment