hgt_sample_cpu.cpp 11.3 KB
Newer Older
rusty1s's avatar
rusty1s committed
1
2
#include "hgt_sample_cpu.h"

rusty1s's avatar
update  
rusty1s committed
3
#include <chrono> // TODO
rusty1s's avatar
rusty1s committed
4
5
#include <random>

rusty1s's avatar
update  
rusty1s committed
6
7
#include <ATen/Parallel.h>

rusty1s's avatar
rusty1s committed
8
9
10
11
12
13
14
15
16
17
18
// Splits a relation string of the form `<src><delim><rel><delim><dst>` into
// its three components and returns them as an edge type triplet.
//
// Components are separated by `delim`. If fewer than two delimiters are
// present, the remainder of the string becomes the last non-empty component
// and all following components are left empty. Anything after a third
// delimiter is ignored.
edge_t split(const rel_t &rel_type) {
  // Materialize the delimiter once so that its real length is used when
  // skipping over it (instead of a hard-coded width).
  const std::string separator(delim);

  std::vector<std::string> result(3);
  std::string::size_type start = 0;
  for (int i = 0; i < 3; i++) {
    // Keep the `find` result in its native unsigned type: `npos` must not be
    // squeezed into an `int`.
    const auto end = rel_type.find(separator, start);
    if (end == std::string::npos) {
      // No further delimiter: the rest of the string is the final component.
      result[i] = rel_type.substr(start);
      break;
    }
    result[i] = rel_type.substr(start, end - start);
    start = end + separator.size();
  }
  return std::make_tuple(result[0], result[1], result[2]);
}

rusty1s's avatar
update  
rusty1s committed
19
// Converts a vector of 64-bit indices into a one-dimensional `at::kLong`
// tensor holding the same values.
torch::Tensor vec_to_tensor(const std::vector<int64_t> &v) {
  const auto numel = static_cast<int64_t>(v.size());
  // `from_blob` expects a mutable pointer; the buffer is only read here.
  auto *buffer = const_cast<int64_t *>(v.data());
  const auto view = torch::from_blob(buffer, {numel}, at::kLong);
  // Clone so that the returned tensor does not alias the storage of `v`.
  return view.clone();
}

template <typename Container>
rusty1s's avatar
rusty1s committed
26
27
28
void update_budget(
    std::unordered_map<node_t, std::unordered_map<int64_t, float>> *budget_dict,
    const node_t &node_type, //
rusty1s's avatar
update  
rusty1s committed
29
    const Container &sampled_nodes,
rusty1s's avatar
rusty1s committed
30
31
32
    const std::unordered_map<node_t, std::unordered_map<int64_t, int64_t>>
        &global_to_local_node_dict,
    const std::unordered_map<rel_t, edge_t> &rel_to_edge_type,
rusty1s's avatar
update  
rusty1s committed
33
34
    const c10::Dict<rel_t, torch::Tensor> &colptr_dict,
    const c10::Dict<rel_t, torch::Tensor> &row_dict) {
rusty1s's avatar
rusty1s committed
35

rusty1s's avatar
update  
rusty1s committed
36
  for (const auto &kv : colptr_dict) {
rusty1s's avatar
rusty1s committed
37
38
39
40
41
42
43
44
45
46
    const auto &rel_type = kv.key();
    const auto &edge_type = rel_to_edge_type.at(rel_type);
    const auto &src_node_type = std::get<0>(edge_type);
    const auto &dst_node_type = std::get<2>(edge_type);

    if (node_type != dst_node_type)
      continue;

    const auto &global_to_local_node =
        global_to_local_node_dict.at(src_node_type);
rusty1s's avatar
update  
rusty1s committed
47
48
    const auto *colptr_data = kv.value().data_ptr<int64_t>();
    const auto *row_data = row_dict.at(rel_type).data_ptr<int64_t>();
rusty1s's avatar
rusty1s committed
49
50
51
    auto &budget = (*budget_dict)[src_node_type];

    for (const auto &v : sampled_nodes) {
rusty1s's avatar
update  
rusty1s committed
52
      const int64_t col_start = colptr_data[v], col_end = colptr_data[v + 1];
rusty1s's avatar
update  
rusty1s committed
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
      const auto col_count = col_end - col_start;
      if (col_count > 520) { // TODO
        // There might be same neighbors with large neighborhood sizes.
        // In order to prevent that we fill our budget stare with many values
        // of low probability, we simply sample a subset without replacement.
        std::unordered_set<int64_t> perm;
        for (int64_t j = col_count - 520; j < col_count; j++) {
          if (!perm.insert(rand() % j).second)
            perm.insert(j);
        }
        const auto inv_deg = 1.f / 520.f;
        for (const auto &p : perm) {
          const auto w = row_data[col_start + p];
          // Only add the neighbor in case we have not yet seen it before:
          if (global_to_local_node.find(w) == global_to_local_node.end())
            budget[w] += inv_deg;
        }
      } else if (col_count > 0) {
rusty1s's avatar
update  
rusty1s committed
71
72
73
74
        const auto inv_deg = 1.f / float(col_end - col_start);
        for (int64_t j = col_start; j < col_end; j++) {
          const auto w = row_data[j];
          // Only add the neighbor in case we have not yet seen it before:
rusty1s's avatar
rusty1s committed
75
          if (global_to_local_node.find(w) == global_to_local_node.end())
rusty1s's avatar
update  
rusty1s committed
76
            budget[w] += inv_deg;
rusty1s's avatar
rusty1s committed
77
78
79
80
81
        }
      }
    }
  }

rusty1s's avatar
update  
rusty1s committed
82
83
84
  auto &budget = (*budget_dict)[node_type];
  for (const auto &v : sampled_nodes)
    budget.erase(v);
rusty1s's avatar
rusty1s committed
85
86
87
88
89
90
}

// Draws up to `num_samples` distinct nodes from `budget`, where each node is
// picked with probability proportional to its squared budget value.
//
// Args:
//   budget: Maps node index -> accumulated budget weight.
//   num_samples: Number of random draws. Several draws may hit the same node,
//     so the result may hold fewer than `num_samples` nodes.
//
// Returns:
//   The set of drawn node indices. Empty if `num_samples <= 0`, if `budget`
//   is empty, or if all weights are zero.
std::unordered_set<int64_t>
sample_from(const std::unordered_map<int64_t, float> &budget,
            const int64_t num_samples) {

  std::unordered_set<int64_t> output;

  // Nothing to draw (also guards the vector allocation below against
  // negative sizes):
  if (num_samples <= 0)
    return output;

  // Compute the squared L2 norm:
  auto norm = 0.f;
  for (const auto &kv : budget)
    norm += kv.second * kv.second;

  if (norm == 0.f) // No need to sample if there are no nodes in the budget:
    return output;

  // Generate `num_samples` sorted random values between `[0., norm)`:
  std::default_random_engine gen{std::random_device{}()};
  std::uniform_real_distribution<float> dis(0.f, norm);
  std::vector<float> samples(num_samples);
  for (auto &sample : samples)
    sample = dis(gen);
  std::sort(samples.begin(), samples.end());

  // Iterate through the budget to compute the cumulative probability
  // `cum_prob[i]` for node `i`. The j-th sample is assigned to node `i` iff
  // `cum_prob[i-1] <= samples[j] < cum_prob[i]`.
  // The implementation assigns two iterators on budget and samples,
  // respectively, and then computes the node samples in linear time by
  // alternatingly incrementing the two iterators based on their values.
  output.reserve(num_samples);
  auto j = samples.begin();
  auto cum_prob = 0.f;
  for (const auto &kv : budget) {
    cum_prob += kv.second * kv.second;

    // Consume all samples that fall below `cum_prob`. The end check has to
    // come first so that `j` is never dereferenced past the last sample:
    while (j != samples.end() && *j < cum_prob) {
      output.insert(kv.first);
      ++j;
    }

    // Terminate early in case we have completed the sampling:
    if (j == samples.end())
      break;
  }

  return output;
}

std::tuple<c10::Dict<node_t, torch::Tensor>, c10::Dict<rel_t, torch::Tensor>,
           c10::Dict<rel_t, torch::Tensor>, c10::Dict<rel_t, torch::Tensor>>
rusty1s's avatar
update  
rusty1s committed
145
146
hgt_sample_cpu(const c10::Dict<rel_t, torch::Tensor> &colptr_dict,
               const c10::Dict<rel_t, torch::Tensor> &row_dict,
rusty1s's avatar
rusty1s committed
147
148
149
150
               const c10::Dict<node_t, torch::Tensor> &input_node_dict,
               const c10::Dict<node_t, std::vector<int64_t>> &num_samples_dict,
               int64_t num_hops) {

rusty1s's avatar
update  
rusty1s committed
151
  std::chrono::steady_clock::time_point a = std::chrono::steady_clock::now();
rusty1s's avatar
rusty1s committed
152
153
  // Create mapping to convert single string relations to edge type triplets:
  std::unordered_map<rel_t, edge_t> rel_to_edge_type;
rusty1s's avatar
update  
rusty1s committed
154
  for (const auto &kv : colptr_dict) {
rusty1s's avatar
rusty1s committed
155
156
157
158
    const auto &rel_type = kv.key();
    rel_to_edge_type[rel_type] = split(rel_type);
  }

rusty1s's avatar
fix  
rusty1s committed
159
  // Initialize various data structures for the sampling process:
rusty1s's avatar
rusty1s committed
160
161
162
  std::unordered_map<node_t, std::vector<int64_t>> sampled_nodes_dict;
  std::unordered_map<node_t, std::unordered_map<int64_t, int64_t>>
      global_to_local_node_dict;
rusty1s's avatar
fix  
rusty1s committed
163
164
165
166
167
168
169
  std::unordered_map<node_t, std::unordered_map<int64_t, float>> budget_dict;
  for (const auto &kv : num_samples_dict) {
    const auto &node_type = kv.key();
    sampled_nodes_dict[node_type];
    global_to_local_node_dict[node_type];
    budget_dict[node_type];
  }
rusty1s's avatar
rusty1s committed
170

rusty1s's avatar
update  
rusty1s committed
171
172
  // Add all input nodes of every node type to the sampled output set, and
  // compute initial budget (line 1-5):
rusty1s's avatar
rusty1s committed
173
174
175
176
177
  for (const auto &kv : input_node_dict) {
    const auto &node_type = kv.key();
    const auto &input_node = kv.value();
    const auto *input_node_data = input_node.data_ptr<int64_t>();

rusty1s's avatar
fix  
rusty1s committed
178
179
    auto &sampled_nodes = sampled_nodes_dict.at(node_type);
    auto &global_to_local_node = global_to_local_node_dict.at(node_type);
rusty1s's avatar
rusty1s committed
180

rusty1s's avatar
update  
rusty1s committed
181
    // Add each origin node to the sampled output nodes (line 1):
rusty1s's avatar
rusty1s committed
182
183
184
185
186
    for (int64_t i = 0; i < input_node.numel(); i++) {
      const auto v = input_node_data[i];
      sampled_nodes.push_back(v);
      global_to_local_node[v] = i;
    }
rusty1s's avatar
update  
rusty1s committed
187
188
189
190
191
192
193
  }

  // Update budget after input nodes have been added to the sampled output set
  // (line 2-5):
  for (const auto &kv : input_node_dict) {
    const auto &node_type = kv.key();
    const auto &sampled_nodes = sampled_nodes_dict.at(node_type);
rusty1s's avatar
rusty1s committed
194

rusty1s's avatar
update  
rusty1s committed
195
196
197
    update_budget<std::vector<int64_t>>(
        &budget_dict, node_type, sampled_nodes, global_to_local_node_dict,
        rel_to_edge_type, colptr_dict, row_dict);
rusty1s's avatar
rusty1s committed
198
199
  }

rusty1s's avatar
update  
rusty1s committed
200
201
202
203
204
205
206
  std::chrono::steady_clock::time_point b = std::chrono::steady_clock::now();
  std::cout
      << "[1] = "
      << std::chrono::duration_cast<std::chrono::microseconds>(b - a).count()
      << "[µs]" << std::endl;

  a = std::chrono::steady_clock::now();
rusty1s's avatar
rusty1s committed
207
208
  // Sample nodes for each node type in each layer (line 6 - 18):
  for (int64_t ell = 0; ell < num_hops; ell++) {
rusty1s's avatar
update  
rusty1s committed
209
210
211
212
213
    std::vector<node_t> node_types; // Only iterate over non-empty budgets.
    for (const auto &kv : budget_dict) {
      if (kv.second.size() > 0)
        node_types.push_back(kv.first);
    }
rusty1s's avatar
rusty1s committed
214

rusty1s's avatar
update  
rusty1s committed
215
216
217
218
219
220
221
    std::unordered_map<node_t, std::unordered_set<int64_t>>
        tmp_sampled_nodes_dict;
    at::parallel_for(0, node_types.size(), 1, [&](int64_t begin, int64_t end) {
      for (int64_t i = begin; i < end; i++) {
        const auto &node_type = node_types[i];
        const auto &budget = budget_dict.at(node_type);
        const auto num_samples = num_samples_dict.at(node_type)[ell];
rusty1s's avatar
rusty1s committed
222

rusty1s's avatar
update  
rusty1s committed
223
224
225
226
227
228
        // Sample `num_samples` nodes of `node_type` according to the budget
        // (line 9-11):
        const auto tmp_sampled_nodes = sample_from(budget, num_samples);
        tmp_sampled_nodes_dict[node_type] = tmp_sampled_nodes;

        // Add intermediate samples to the sampled output set (line 13):
rusty1s's avatar
update  
rusty1s committed
229
230
        auto &sampled_nodes = sampled_nodes_dict.at(node_type);
        auto &global_to_local_node = global_to_local_node_dict.at(node_type);
rusty1s's avatar
update  
rusty1s committed
231
        for (const auto &v : tmp_sampled_nodes) {
rusty1s's avatar
update  
rusty1s committed
232
233
234
235
          sampled_nodes.push_back(v);
          global_to_local_node[v] = sampled_nodes.size();
        }
      }
rusty1s's avatar
update  
rusty1s committed
236
237
238
239
240
241
242
    });

    for (const auto &kv : tmp_sampled_nodes_dict) {
      // Add neighbors of newly sampled nodes to the bucket (line 14-15):
      update_budget<std::unordered_set<int64_t>>(
          &budget_dict, kv.first, kv.second, global_to_local_node_dict,
          rel_to_edge_type, colptr_dict, row_dict);
rusty1s's avatar
rusty1s committed
243
244
    }
  }
rusty1s's avatar
update  
rusty1s committed
245
246
247
248
249
  b = std::chrono::steady_clock::now();
  std::cout
      << "[2] = "
      << std::chrono::duration_cast<std::chrono::microseconds>(b - a).count()
      << "[µs]" << std::endl;
rusty1s's avatar
rusty1s committed
250

rusty1s's avatar
update  
rusty1s committed
251
  a = std::chrono::steady_clock::now();
rusty1s's avatar
rusty1s committed
252
253
254
255
  // Reconstruct the sampled adjacency matrix among the sampled nodes (line 19):
  c10::Dict<rel_t, torch::Tensor> output_row_dict;
  c10::Dict<rel_t, torch::Tensor> output_col_dict;
  c10::Dict<rel_t, torch::Tensor> output_edge_dict;
rusty1s's avatar
update  
rusty1s committed
256
257
258
259

  // TODO: Parallelize across edge types?
  //
  // at::parallel_for(0, edge_types.size(), 1, [&](int64_t begin, int64_t end) {
rusty1s's avatar
update  
rusty1s committed
260
  for (const auto &kv : colptr_dict) {
rusty1s's avatar
rusty1s committed
261
262
263
264
265
    const auto &rel_type = kv.key();
    const auto &edge_type = rel_to_edge_type.at(rel_type);
    const auto &src_node_type = std::get<0>(edge_type);
    const auto &dst_node_type = std::get<2>(edge_type);

rusty1s's avatar
update  
rusty1s committed
266
267
    const auto *colptr_data = kv.value().data_ptr<int64_t>();
    const auto *row_data = row_dict.at(rel_type).data_ptr<int64_t>();
rusty1s's avatar
rusty1s committed
268
269
270
271
272
273
274

    const auto &sampled_dst_nodes = sampled_nodes_dict[dst_node_type];
    const auto &global_to_local_src = global_to_local_node_dict[src_node_type];

    std::vector<int64_t> rows, cols, edges;
    for (int64_t i = 0; i < (int64_t)sampled_dst_nodes.size(); i++) {
      const auto v = sampled_dst_nodes[i];
rusty1s's avatar
update  
rusty1s committed
275
276
277
      const int64_t col_start = colptr_data[v], col_end = colptr_data[v + 1];
      for (int64_t j = col_start; j < col_end; j++) {
        const auto w = row_data[j];
rusty1s's avatar
rusty1s committed
278
        if (global_to_local_src.find(w) != global_to_local_src.end()) {
rusty1s's avatar
update  
rusty1s committed
279
280
          rows.push_back(global_to_local_src.at(w));
          cols.push_back(i);
rusty1s's avatar
rusty1s committed
281
282
283
284
285
          edges.push_back(j);
        }
      }
    }

rusty1s's avatar
update  
rusty1s committed
286
287
288
289
290
    if (rows.size() > 0) {
      output_row_dict.insert(rel_type, vec_to_tensor(rows));
      output_col_dict.insert(rel_type, vec_to_tensor(cols));
      output_edge_dict.insert(rel_type, vec_to_tensor(edges));
    }
rusty1s's avatar
rusty1s committed
291
292
293
294
295
  }

  // Generate tensor-valued output node dict (line 20):
  c10::Dict<node_t, torch::Tensor> output_node_dict;
  for (const auto &kv : sampled_nodes_dict) {
rusty1s's avatar
update  
rusty1s committed
296
297
    if (kv.second.size() > 0)
      output_node_dict.insert(kv.first, vec_to_tensor(kv.second));
rusty1s's avatar
rusty1s committed
298
  }
rusty1s's avatar
update  
rusty1s committed
299
300
301
302
303
  b = std::chrono::steady_clock::now();
  std::cout
      << "[3] = "
      << std::chrono::duration_cast<std::chrono::microseconds>(b - a).count()
      << "[µs]" << std::endl;
rusty1s's avatar
rusty1s committed
304
305
306
307

  return std::make_tuple(output_node_dict, output_row_dict, output_col_dict,
                         output_edge_dict);
}