Commit 2af23342 authored by rusty1s

update HGT sampling

parent f54d6b04
#include "hgt_sample_cpu.h" #include "hgt_sample_cpu.h"
#include <chrono> // TODO #include "utils.h"
#include <random>
#include <ATen/Parallel.h> #define MAX_NEIGHBORS 50
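// Cap on the number of neighbors visited per node: for nodes with a larger
// degree, a random subset of `MAX_NEIGHBORS` neighbors is sampled without
// replacement instead (see `update_budget_` and the adjacency reconstruction
// in `hgt_sample_cpu` below).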
edge_t split(const rel_t &rel_type) {
  vector<string> result(3);
  int start = 0, end;
  for (int i = 0; i < 3; i++) {
    end = rel_type.find("__", start);
    result[i] = rel_type.substr(start, end - start);
    start = end + 2;
  }
  return make_tuple(result[0], result[1], result[2]);
}
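// For example, split("paper__written_by__author") returns the triplet
// ("paper", "written_by", "author"). (Illustrative relation name only.)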
void update_budget_(
    unordered_map<node_t, unordered_map<int64_t, float>> *budget_dict,
    const node_t &node_type, //
    const vector<int64_t> &samples,
    const unordered_map<node_t, unordered_map<int64_t, int64_t>>
        &to_local_node_dict,
    const unordered_map<rel_t, edge_t> &to_edge_type,
    const c10::Dict<rel_t, torch::Tensor> &colptr_dict,
    const c10::Dict<rel_t, torch::Tensor> &row_dict) {

  if (samples.empty())
    return;
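  // Each sampled node `w` spreads a total probability mass of (at most) 1.0
  // over its source neighbors: every neighbor that is not yet part of the
  // sampled output receives an extra `1/deg(w)` budget (with the degree
  // capped at `MAX_NEIGHBORS` for very dense nodes).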
  for (const auto &kv : colptr_dict) {
    const auto &rel_type = kv.key();
    const auto &edge_type = to_edge_type.at(rel_type);
    const auto &src_node_type = get<0>(edge_type);
    const auto &dst_node_type = get<2>(edge_type);

    if (node_type != dst_node_type)
      continue;

    const auto &to_local_src_node = to_local_node_dict.at(src_node_type);
    const auto *colptr_data = kv.value().data_ptr<int64_t>();
    const auto *row_data = row_dict.at(rel_type).data_ptr<int64_t>();

    auto &src_budget = budget_dict->at(src_node_type);

    for (const auto &w : samples) {
      const auto &col_start = colptr_data[w], &col_end = colptr_data[w + 1];
      if (col_end - col_start > MAX_NEIGHBORS) {
        // There might be some nodes with large neighborhood sizes.
        // In order to prevent that we fill our budget with many values of low
        // probability, we instead sample a fixed amount without replacement:
        auto indices = choice(col_end - col_start, MAX_NEIGHBORS, false);
        const auto *indices_data = indices.data_ptr<int64_t>();
        for (int64_t i = 0; i < indices.numel(); i++) {
          const auto &v = row_data[col_start + indices_data[i]];
          // Only add the neighbor in case we have not yet seen it before:
          if (to_local_src_node.find(v) == to_local_src_node.end())
            src_budget[v] += 1.f / float(MAX_NEIGHBORS);
        }
      } else if (col_end != col_start) {
        const auto inv_deg = 1.f / float(col_end - col_start);
        for (int64_t i = col_start; i < col_end; i++) {
          const auto &v = row_data[i];
          // Only add the neighbor in case we have not yet seen it before:
          if (to_local_src_node.find(v) == to_local_src_node.end())
            src_budget[v] += inv_deg;
        }
      }
    }
  }
}
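// Sample `num_samples` nodes from `budget` without replacement, where each
// node is drawn with probability proportional to its squared budget value: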
vector<int64_t> sample_from(const unordered_map<int64_t, float> &budget,
                            const int64_t num_samples) {

  vector<int64_t> indices;
  vector<float> weights;
  indices.reserve(budget.size());
  weights.reserve(budget.size());
  for (const auto &kv : budget) {
    indices.push_back(kv.first);
    weights.push_back(kv.second * kv.second);
  }

  const auto weight = from_vector(weights, true);
  const auto sample = choice(budget.size(), num_samples, false, weight);
  const auto *sample_data = sample.data_ptr<int64_t>();

  vector<int64_t> out(sample.numel());
  for (int64_t i = 0; i < sample.numel(); i++) {
    out[i] = indices[sample_data[i]];
  }
  return out;
}
tuple<c10::Dict<node_t, torch::Tensor>, c10::Dict<rel_t, torch::Tensor>,
      c10::Dict<rel_t, torch::Tensor>, c10::Dict<rel_t, torch::Tensor>>
hgt_sample_cpu(const c10::Dict<rel_t, torch::Tensor> &colptr_dict,
               const c10::Dict<rel_t, torch::Tensor> &row_dict,
               const c10::Dict<node_t, torch::Tensor> &input_node_dict,
               const c10::Dict<node_t, vector<int64_t>> &num_samples_dict,
               const int64_t num_hops) {

  // Create a mapping to convert single string relations to edge type triplets:
  unordered_map<rel_t, edge_t> to_edge_type;
  for (const auto &kv : colptr_dict) {
    const auto &rel_type = kv.key();
    to_edge_type[rel_type] = split(rel_type);
  }

  // Initialize some necessary data structures for the sampling process:
  unordered_map<node_t, vector<int64_t>> nodes_dict;
  unordered_map<node_t, unordered_map<int64_t, int64_t>> to_local_node_dict;
  unordered_map<node_t, unordered_map<int64_t, float>> budget_dict;
  for (const auto &kv : num_samples_dict) {
    const auto &node_type = kv.key();
    nodes_dict[node_type];
    to_local_node_dict[node_type];
    budget_dict[node_type];
  }
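  // `nodes_dict` collects the sampled global node ids per node type,
  // `to_local_node_dict` maps each sampled global node id to its local
  // position in `nodes_dict`, and `budget_dict` holds the degree-normalized
  // sampling budget of candidate nodes per node type.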
  // Add the input nodes to the sampled output nodes (line 1):
  for (const auto &kv : input_node_dict) {
    const auto &node_type = kv.key();
    const auto &input_node = kv.value();
    const auto *input_node_data = input_node.data_ptr<int64_t>();

    auto &nodes = nodes_dict.at(node_type);
    auto &to_local_node = to_local_node_dict.at(node_type);
    for (int64_t i = 0; i < input_node.numel(); i++) {
      const auto &v = input_node_data[i];
      nodes.push_back(v);
      to_local_node[v] = i;
    }
  }
  // Update the budget based on the initial input set (line 3-5):
  for (const auto &kv : nodes_dict) {
    const auto &node_type = kv.first;
    const auto &last_samples = kv.second;
    update_budget_(&budget_dict, node_type, last_samples, to_local_node_dict,
                   to_edge_type, colptr_dict, row_dict);
  }

  // Sample nodes for each node type in each hop (line 6-18):
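  // The `line ...` markers in the comments presumably refer to Algorithm 1
  // (HGSampling) of the HGT paper ("Heterogeneous Graph Transformer",
  // Hu et al., WWW 2020).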
  for (int64_t ell = 0; ell < num_hops; ell++) {
    unordered_map<node_t, vector<int64_t>> samples_dict;
    for (auto &kv : budget_dict) {
      const auto &node_type = kv.first;
      auto &budget = kv.second;
      const auto num_samples = num_samples_dict.at(node_type)[ell];

      // Sample `num_samples` nodes, according to the budget (line 9-11):
      const auto samples = sample_from(budget, num_samples);
      samples_dict[node_type] = samples;

      // Add samples to the sampled output nodes, and erase them from the
      // budget (line 13/15):
      auto &nodes = nodes_dict.at(node_type);
      auto &to_local_node = to_local_node_dict.at(node_type);
      for (const auto &v : samples) {
        to_local_node[v] = nodes.size();
        nodes.push_back(v);
        budget.erase(v);
      }
    }

    if (ell < num_hops - 1) {
      // Add neighbors of newly sampled nodes to the budget (line 14).
      // Note that we do not need to update the budget in the last iteration:
      for (const auto &kv : samples_dict) {
        const auto &node_type = kv.first;
        const auto &last_samples = kv.second;
        update_budget_(&budget_dict, node_type, last_samples,
                       to_local_node_dict, to_edge_type, colptr_dict,
                       row_dict);
      }
    }
  }
  c10::Dict<node_t, torch::Tensor> out_node_dict;
  c10::Dict<rel_t, torch::Tensor> out_row_dict;
  c10::Dict<rel_t, torch::Tensor> out_col_dict;
  c10::Dict<rel_t, torch::Tensor> out_edge_dict;

  // Reconstruct the sampled adjacency matrix among the sampled nodes
  // (line 19):
  for (const auto &kv : colptr_dict) {
    const auto &rel_type = kv.key();
    const auto &edge_type = to_edge_type.at(rel_type);
    const auto &src_node_type = get<0>(edge_type);
    const auto &dst_node_type = get<2>(edge_type);

    const auto *colptr_data = kv.value().data_ptr<int64_t>();
    const auto *row_data = row_dict.at(rel_type).data_ptr<int64_t>();

    const auto &dst_nodes = nodes_dict.at(dst_node_type);
    const auto &to_local_src_node = to_local_node_dict.at(src_node_type);

    vector<int64_t> rows, cols, edges;
    for (int64_t i = 0; i < (int64_t)dst_nodes.size(); i++) {
      const auto &w = dst_nodes[i];
      const auto &col_start = colptr_data[w], &col_end = colptr_data[w + 1];
      if (col_end - col_start > MAX_NEIGHBORS) {
        auto indices = choice(col_end - col_start, MAX_NEIGHBORS, false);
        const auto *indices_data = indices.data_ptr<int64_t>();
        for (int64_t j = 0; j < indices.numel(); j++) {
          const auto &v = row_data[col_start + indices_data[j]];
          if (to_local_src_node.find(v) != to_local_src_node.end()) {
            rows.push_back(to_local_src_node.at(v));
            cols.push_back(i);
            edges.push_back(col_start + indices_data[j]);
          }
        }
      } else {
        for (int64_t j = col_start; j < col_end; j++) {
          const auto &v = row_data[j];
          if (to_local_src_node.find(v) != to_local_src_node.end()) {
            rows.push_back(to_local_src_node.at(v));
            cols.push_back(i);
            edges.push_back(j);
          }
        }
      }
    }

    if (rows.size() > 0) {
      out_row_dict.insert(rel_type, from_vector<int64_t>(rows));
      out_col_dict.insert(rel_type, from_vector<int64_t>(cols));
      out_edge_dict.insert(rel_type, from_vector<int64_t>(edges));
    }
  }
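  // Note that `rows` and `cols` hold local indices into the per-type lists of
  // sampled nodes, while `edges` holds the original edge ids of the parent
  // graph (so that, e.g., edge features can be gathered later on).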
  // Generate tensor-valued output node dictionary (line 20):
  for (const auto &kv : nodes_dict) {
    const auto &node_type = kv.first;
    const auto &nodes = kv.second;
    if (!nodes.empty())
      out_node_dict.insert(node_type, from_vector<int64_t>(nodes));
  }
  return make_tuple(out_node_dict, out_row_dict, out_col_dict, out_edge_dict);
}
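For reference, a minimal toy invocation of the updated sampler could look as follows (illustrative only, not part of the commit; the relation name, graph, and sample sizes are made up):

// Single relation "paper__cites__paper" over three nodes, stored in CSC form
// with edges 1->0, 2->0 and 2->1; sample up to two "paper" nodes per hop:
void hgt_sample_example() {
  c10::Dict<rel_t, torch::Tensor> colptr_dict, row_dict;
  colptr_dict.insert("paper__cites__paper",
                     torch::tensor({0, 2, 3, 3}, torch::kLong));
  row_dict.insert("paper__cites__paper",
                  torch::tensor({1, 2, 2}, torch::kLong));

  c10::Dict<node_t, torch::Tensor> input_node_dict;
  input_node_dict.insert("paper", torch::tensor({0}, torch::kLong));

  c10::Dict<node_t, std::vector<int64_t>> num_samples_dict;
  num_samples_dict.insert("paper", std::vector<int64_t>({2, 2}));

  const auto out = hgt_sample_cpu(colptr_dict, row_dict, input_node_dict,
                                  num_samples_dict, /*num_hops=*/2);
  // `out` is a tuple of (node_dict, row_dict, col_dict, edge_dict), holding
  // the sampled nodes per type and the induced local adjacency per relation.
}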
hgt_sample_cpu.h

@@ -6,12 +6,10 @@
typedef std::string node_t;
typedef std::string rel_t;
typedef std::tuple<std::string, std::string, std::string> edge_t;

std::tuple<c10::Dict<node_t, torch::Tensor>, c10::Dict<rel_t, torch::Tensor>,
           c10::Dict<rel_t, torch::Tensor>, c10::Dict<rel_t, torch::Tensor>>
hgt_sample_cpu(const c10::Dict<rel_t, torch::Tensor> &colptr_dict,
               const c10::Dict<rel_t, torch::Tensor> &row_dict,
               const c10::Dict<node_t, torch::Tensor> &input_node_dict,
               const c10::Dict<node_t, std::vector<int64_t>> &num_samples_dict,
               const int64_t num_hops);
utils.h

@@ -56,7 +56,7 @@ torch::Tensor choice(int64_t population, int64_t num_samples,
  auto *out_data = out.data_ptr<int64_t>();
  int64_t i = 0;
  for (const auto &value : values) {
    out_data[i] = value;
    i++;
  }
  return out;
test_hgt_sample.py

@@ -2,7 +2,7 @@
import torch

# from torch import Tensor
import torch_sparse  # noqa


def test_hgt_sample():