scheduler.cc 3.11 KB
Newer Older
1
2
3
4
5
6
/*!
 *  Copyright (c) 2018 by Contributors
 * \file scheduler/scheduler.cc
 * \brief DGL Scheduler implementation
 */
#include <dgl/scheduler.h>

#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

namespace dgl {
namespace sched {

13
14
15
16
std::vector<IdArray> DegreeBucketing(const IdArray& msg_ids, const IdArray& vids,
        const IdArray& recv_ids) {
    auto n_msgs = msg_ids->shape[0];

Lingfan Yu's avatar
Lingfan Yu committed
17
    const int64_t* vid_data = static_cast<int64_t*>(vids->data);
18
19
    const int64_t* msg_id_data = static_cast<int64_t*>(msg_ids->data);
    const int64_t* recv_id_data = static_cast<int64_t*>(recv_ids->data);
Lingfan Yu's avatar
Lingfan Yu committed
20

21
    // in edge: dst->msgs
Lingfan Yu's avatar
Lingfan Yu committed
22
    std::unordered_map<int64_t, std::vector<int64_t>> in_edges;
23
24
    for (int64_t i = 0; i < n_msgs; ++i) {
        in_edges[vid_data[i]].push_back(msg_id_data[i]);
Lingfan Yu's avatar
Lingfan Yu committed
25
26
27
28
    }

    // bkt: deg->dsts
    std::unordered_map<int64_t, std::vector<int64_t>> bkt;
29
    for (const auto& it : in_edges) {
Lingfan Yu's avatar
Lingfan Yu committed
30
31
32
        bkt[it.second.size()].push_back(it.first);
    }

33
34
35
36
37
38
39
40
41
    std::unordered_set<int64_t> zero_deg_nodes;
    for (int64_t i = 0; i < recv_ids->shape[0]; ++i) {
        if (in_edges.find(recv_id_data[i]) == in_edges.end()) {
            zero_deg_nodes.insert(recv_id_data[i]);
        }
    }
    auto n_zero_deg = zero_deg_nodes.size();

    // calc output size
Lingfan Yu's avatar
Lingfan Yu committed
42
43
    int64_t n_deg = bkt.size();
    int64_t n_dst = in_edges.size();
44
45
46
47
48
49
50
    int64_t n_mid_sec = bkt.size();  // zero deg won't affect message size
    if (n_zero_deg > 0) {
        n_deg += 1;
        n_dst += n_zero_deg;
    }

    // initialize output
Lingfan Yu's avatar
Lingfan Yu committed
51
52
53
54
    IdArray degs = IdArray::Empty({n_deg}, vids->dtype, vids->ctx);
    IdArray nids = IdArray::Empty({n_dst}, vids->dtype, vids->ctx);
    IdArray nid_section = IdArray::Empty({n_deg}, vids->dtype, vids->ctx);
    IdArray mids = IdArray::Empty({n_msgs}, vids->dtype, vids->ctx);
55
    IdArray mid_section = IdArray::Empty({n_mid_sec}, vids->dtype, vids->ctx);
Lingfan Yu's avatar
Lingfan Yu committed
56
57
58
59
60
61
62
    int64_t* deg_ptr = static_cast<int64_t*>(degs->data);
    int64_t* nid_ptr = static_cast<int64_t*>(nids->data);
    int64_t* nsec_ptr = static_cast<int64_t*>(nid_section->data);
    int64_t* mid_ptr = static_cast<int64_t*>(mids->data);
    int64_t* msec_ptr = static_cast<int64_t*>(mid_section->data);

    // fill in bucketing ordering
63
64
    for (const auto& it : bkt) {  // for each bucket
        const int64_t deg = it.first;
65
        const int64_t bucket_size = it.second.size();
Lingfan Yu's avatar
Lingfan Yu committed
66
        *deg_ptr++ = deg;
67
68
        *nsec_ptr++ = bucket_size;
        *msec_ptr++ = deg * bucket_size;
69
        for (const auto dst : it.second) {  // for each dst in this bucket
Lingfan Yu's avatar
Lingfan Yu committed
70
            *nid_ptr++ = dst;
71
            for (const auto mid : in_edges[dst]) {  // for each in edge of dst
Lingfan Yu's avatar
Lingfan Yu committed
72
73
74
75
76
                *mid_ptr++ = mid;
            }
        }
    }

77
78
79
80
81
82
83
84
    if (n_zero_deg > 0) {
        *deg_ptr = 0;
        *nsec_ptr = n_zero_deg;
        for (const auto dst : zero_deg_nodes) {
            *nid_ptr++ = dst;
        }
    }

Lingfan Yu's avatar
Lingfan Yu committed
85
86
87
88
89
90
91
92
93
94
    std::vector<IdArray> ret;
    ret.push_back(std::move(degs));
    ret.push_back(std::move(nids));
    ret.push_back(std::move(nid_section));
    ret.push_back(std::move(mids));
    ret.push_back(std::move(mid_section));

    return std::move(ret);
}

95
}  // namespace sched
Lingfan Yu's avatar
Lingfan Yu committed
96

97
}  // namespace dgl