scheduler.cc 6.15 KB
Newer Older
1
2
3
4
5
6
/*!
 *  Copyright (c) 2018 by Contributors
 * \file scheduler/scheduler.cc
 * \brief DGL Scheduler implementation
 */
#include <dgl/scheduler.h>
Lingfan Yu's avatar
Lingfan Yu committed
7
8
9
10
11
12
#include <cstdint>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

namespace dgl {
namespace sched {

13
template <class IdType>
14
15
16
17
std::vector<IdArray> DegreeBucketing(const IdArray& msg_ids, const IdArray& vids,
        const IdArray& recv_ids) {
    auto n_msgs = msg_ids->shape[0];

18
19
20
    const IdType* vid_data = static_cast<IdType*>(vids->data);
    const IdType* msg_id_data = static_cast<IdType*>(msg_ids->data);
    const IdType* recv_id_data = static_cast<IdType*>(recv_ids->data);
Lingfan Yu's avatar
Lingfan Yu committed
21

22
    // in edge: dst->msgs
23
24
    std::unordered_map<IdType, std::vector<IdType>> in_edges;
    for (IdType i = 0; i < n_msgs; ++i) {
25
        in_edges[vid_data[i]].push_back(msg_id_data[i]);
Lingfan Yu's avatar
Lingfan Yu committed
26
27
28
    }

    // bkt: deg->dsts
29
    std::unordered_map<IdType, std::vector<IdType>> bkt;
30
    for (const auto& it : in_edges) {
Lingfan Yu's avatar
Lingfan Yu committed
31
32
33
        bkt[it.second.size()].push_back(it.first);
    }

34
35
    std::unordered_set<IdType> zero_deg_nodes;
    for (IdType i = 0; i < recv_ids->shape[0]; ++i) {
36
37
38
39
40
41
42
        if (in_edges.find(recv_id_data[i]) == in_edges.end()) {
            zero_deg_nodes.insert(recv_id_data[i]);
        }
    }
    auto n_zero_deg = zero_deg_nodes.size();

    // calc output size
43
44
45
    IdType n_deg = bkt.size();
    IdType n_dst = in_edges.size();
    IdType n_mid_sec = bkt.size();  // zero deg won't affect message size
46
47
48
49
50
51
    if (n_zero_deg > 0) {
        n_deg += 1;
        n_dst += n_zero_deg;
    }

    // initialize output
Lingfan Yu's avatar
Lingfan Yu committed
52
53
54
55
    IdArray degs = IdArray::Empty({n_deg}, vids->dtype, vids->ctx);
    IdArray nids = IdArray::Empty({n_dst}, vids->dtype, vids->ctx);
    IdArray nid_section = IdArray::Empty({n_deg}, vids->dtype, vids->ctx);
    IdArray mids = IdArray::Empty({n_msgs}, vids->dtype, vids->ctx);
56
    IdArray mid_section = IdArray::Empty({n_mid_sec}, vids->dtype, vids->ctx);
57
58
59
60
61
    IdType* deg_ptr = static_cast<IdType*>(degs->data);
    IdType* nid_ptr = static_cast<IdType*>(nids->data);
    IdType* nsec_ptr = static_cast<IdType*>(nid_section->data);
    IdType* mid_ptr = static_cast<IdType*>(mids->data);
    IdType* msec_ptr = static_cast<IdType*>(mid_section->data);
Lingfan Yu's avatar
Lingfan Yu committed
62
63

    // fill in bucketing ordering
64
    for (const auto& it : bkt) {  // for each bucket
65
66
        const IdType deg = it.first;
        const IdType bucket_size = it.second.size();
Lingfan Yu's avatar
Lingfan Yu committed
67
        *deg_ptr++ = deg;
68
69
        *nsec_ptr++ = bucket_size;
        *msec_ptr++ = deg * bucket_size;
70
        for (const auto dst : it.second) {  // for each dst in this bucket
Lingfan Yu's avatar
Lingfan Yu committed
71
            *nid_ptr++ = dst;
72
            for (const auto mid : in_edges[dst]) {  // for each in edge of dst
Lingfan Yu's avatar
Lingfan Yu committed
73
74
75
76
77
                *mid_ptr++ = mid;
            }
        }
    }

78
79
80
81
82
83
84
85
    if (n_zero_deg > 0) {
        *deg_ptr = 0;
        *nsec_ptr = n_zero_deg;
        for (const auto dst : zero_deg_nodes) {
            *nid_ptr++ = dst;
        }
    }

Lingfan Yu's avatar
Lingfan Yu committed
86
87
88
89
90
91
92
93
94
95
    std::vector<IdArray> ret;
    ret.push_back(std::move(degs));
    ret.push_back(std::move(nids));
    ret.push_back(std::move(nid_section));
    ret.push_back(std::move(mids));
    ret.push_back(std::move(mid_section));

    return std::move(ret);
}

96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Explicit instantiations of DegreeBucketing for 32-bit and 64-bit id types.
template std::vector<IdArray> DegreeBucketing<int32_t>(
    const IdArray& msg_ids, const IdArray& vids, const IdArray& recv_ids);

template std::vector<IdArray> DegreeBucketing<int64_t>(
    const IdArray& msg_ids, const IdArray& vids, const IdArray& recv_ids);

template <class IdType>
std::vector<IdArray> GroupEdgeByNodeDegree(const IdArray& uids,
                                           const IdArray& vids,
                                           const IdArray& eids) {
  auto n_edge = eids->shape[0];
  const IdType* eid_data = static_cast<IdType*>(eids->data);
  const IdType* uid_data = static_cast<IdType*>(uids->data);
  const IdType* vid_data = static_cast<IdType*>(vids->data);

  // node2edge: group_by nodes uid -> (eid, the other end vid)
  std::unordered_map<IdType, std::vector<std::pair<IdType, IdType>>> node2edge;
  for (IdType i = 0; i < n_edge; ++i) {
    node2edge[uid_data[i]].emplace_back(eid_data[i], vid_data[i]);
  }

  // bkt: deg -> group_by node uid
  std::unordered_map<IdType, std::vector<IdType>> bkt;
  for (const auto& it : node2edge) {
    bkt[it.second.size()].push_back(it.first);
  }

  // number of unique degree
  IdType n_deg = bkt.size();

  // initialize output
  IdArray degs = IdArray::Empty({n_deg}, eids->dtype, eids->ctx);
  IdArray new_uids = IdArray::Empty({n_edge}, uids->dtype, uids->ctx);
  IdArray new_vids = IdArray::Empty({n_edge}, vids->dtype, vids->ctx);
  IdArray new_eids = IdArray::Empty({n_edge}, eids->dtype, eids->ctx);
  IdArray sections = IdArray::Empty({n_deg}, eids->dtype, eids->ctx);
  IdType* deg_ptr = static_cast<IdType*>(degs->data);
  IdType* uid_ptr = static_cast<IdType*>(new_uids->data);
  IdType* vid_ptr = static_cast<IdType*>(new_vids->data);
  IdType* eid_ptr = static_cast<IdType*>(new_eids->data);
  IdType* sec_ptr = static_cast<IdType*>(sections->data);

  // fill in bucketing ordering
  for (const auto& it : bkt) {  // for each bucket
    // degree of this bucket
    const IdType deg = it.first;
    // number of edges in this bucket
    const IdType bucket_size = it.second.size();
    *deg_ptr++ = deg;
    *sec_ptr++ = deg * bucket_size;
    for (const auto u : it.second) {           // for uid in this bucket
      for (const auto& pair : node2edge[u]) {  // for each edge of uid
        *uid_ptr++ = u;
        *vid_ptr++ = pair.second;
        *eid_ptr++ = pair.first;
      }
154
    }
155
  }
156

157
158
159
160
161
162
  std::vector<IdArray> ret;
  ret.push_back(std::move(degs));
  ret.push_back(std::move(new_uids));
  ret.push_back(std::move(new_vids));
  ret.push_back(std::move(new_eids));
  ret.push_back(std::move(sections));
163

164
165
  return std::move(ret);
}
166

167
168
// Explicit instantiation of GroupEdgeByNodeDegree for 32-bit ids.
template std::vector<IdArray> GroupEdgeByNodeDegree<int32_t>(
    const IdArray& uids, const IdArray& vids, const IdArray& eids);
169

170
171
// Explicit instantiation of GroupEdgeByNodeDegree for 64-bit ids.
template std::vector<IdArray> GroupEdgeByNodeDegree<int64_t>(
    const IdArray& uids, const IdArray& vids, const IdArray& eids);
172

173
}  // namespace sched
Lingfan Yu's avatar
Lingfan Yu committed
174

175
}  // namespace dgl