// !!! This is a file automatically generated by hipify!!!
/**
 * Copyright (c) 2023 by Contributors
 * Copyright (c) 2023, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
 * @file cuda/unique_and_compact_impl.cu
 * @brief Unique and compact operator implementation on CUDA.
 */
#include <graphbolt/cuda_ops.h>
#include <thrust/binary_search.h>
#include <thrust/functional.h>
#include <thrust/gather.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/logical.h>

#include <hipcub/hipcub.hpp>
#include <mutex>
#include <unordered_map>

#include "./common.h"
#include "./extension/unique_and_compact.h"
#include "./utils.h"

namespace graphbolt {
namespace ops {

template <typename scalar_t>
struct EqualityFunc {
  const scalar_t* sorted_order;
  const scalar_t* found_locations;
  const scalar_t* searched_items;
  __host__ __device__ auto operator()(int64_t i) {
    return sorted_order[found_locations[i]] == searched_items[i];
  }
};

#define DefineCubReductionFunction(cub_reduce_fn, name)           \
  template <typename scalar_iterator_t>                           \
  auto name(const scalar_iterator_t input, int64_t size) {        \
    using scalar_t = std::remove_reference_t<decltype(input[0])>; \
    cuda::CopyScalar<scalar_t> result;                            \
    CUB_CALL(cub_reduce_fn, input, result.get(), size);           \
    return result;                                                \
  }

DefineCubReductionFunction(DeviceReduce::Max, Max);
DefineCubReductionFunction(DeviceReduce::Min, Min);

std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>
UniqueAndCompactBatchedSortBased(
    const std::vector<torch::Tensor>& src_ids,
    const std::vector<torch::Tensor>& dst_ids,
    const std::vector<torch::Tensor>& unique_dst_ids, int num_bits) {
  auto allocator = cuda::GetAllocator();
  auto stream = cuda::GetCurrentStream();
  auto scalar_type = src_ids.at(0).scalar_type();
  return AT_DISPATCH_INDEX_TYPES(
      scalar_type, "unique_and_compact", ([&] {
        std::vector<index_t*> src_ids_ptr, dst_ids_ptr, unique_dst_ids_ptr;
        for (std::size_t i = 0; i < src_ids.size(); i++) {
          src_ids_ptr.emplace_back(src_ids[i].data_ptr<index_t>());
          dst_ids_ptr.emplace_back(dst_ids[i].data_ptr<index_t>());
          unique_dst_ids_ptr.emplace_back(
              unique_dst_ids[i].data_ptr<index_t>());
        }

        // If num_bits is not given, compute the maximum vertex ids now so
        // that num_bits can be derived later, speeding up the expensive sort
        // operations.
        std::vector<cuda::CopyScalar<index_t>> max_id_src;
        std::vector<cuda::CopyScalar<index_t>> max_id_dst;
        for (std::size_t i = 0; num_bits == 0 && i < src_ids.size(); i++) {
          max_id_src.emplace_back(Max(src_ids_ptr[i], src_ids[i].size(0)));
          max_id_dst.emplace_back(
              Max(unique_dst_ids_ptr[i], unique_dst_ids[i].size(0)));
        }

        // Sort the unique_dst_ids tensor.
        std::vector<torch::Tensor> sorted_unique_dst_ids;
        std::vector<index_t*> sorted_unique_dst_ids_ptr;
        for (std::size_t i = 0; i < unique_dst_ids.size(); i++) {
          sorted_unique_dst_ids.emplace_back(Sort<false>(
              unique_dst_ids_ptr[i], unique_dst_ids[i].size(0), num_bits));
          sorted_unique_dst_ids_ptr.emplace_back(
              sorted_unique_dst_ids[i].data_ptr<index_t>());
        }

        // Mark dst nodes in the src_ids tensor.
        std::vector<decltype(allocator.AllocateStorage<bool>(0))> is_dst;
        for (std::size_t i = 0; i < src_ids.size(); i++) {
          is_dst.emplace_back(
              allocator.AllocateStorage<bool>(src_ids[i].size(0)));
          THRUST_CALL(
              binary_search, sorted_unique_dst_ids_ptr[i],
              sorted_unique_dst_ids_ptr[i] + unique_dst_ids[i].size(0),
              src_ids_ptr[i], src_ids_ptr[i] + src_ids[i].size(0),
              is_dst[i].get());
        }

        // Filter the non-dst nodes in the src_ids tensor, hence only_src.
        std::vector<torch::Tensor> only_src;
        {
          std::vector<cuda::CopyScalar<int64_t>> only_src_size;
          for (std::size_t i = 0; i < src_ids.size(); i++) {
            only_src.emplace_back(torch::empty(
                src_ids[i].size(0), sorted_unique_dst_ids[i].options()));
            auto is_src = thrust::make_transform_iterator(
                is_dst[i].get(), thrust::logical_not<bool>{});
            only_src_size.emplace_back(cuda::CopyScalar<int64_t>{});
            CUB_CALL(
                DeviceSelect::Flagged, src_ids_ptr[i], is_src,
                only_src[i].data_ptr<index_t>(), only_src_size[i].get(),
                src_ids[i].size(0));
          }
          stream.synchronize();
          for (std::size_t i = 0; i < only_src.size(); i++) {
            only_src[i] = only_src[i].slice(
                0, 0, static_cast<int64_t>(only_src_size[i]));
          }
        }

        // The code block above synchronizes, ensuring safe access to
        // max_id_src and max_id_dst.
        if (num_bits == 0) {
          index_t max_id = 0;
          for (std::size_t i = 0; i < max_id_src.size(); i++) {
            max_id = std::max(max_id, static_cast<index_t>(max_id_src[i]));
            max_id = std::max(max_id, static_cast<index_t>(max_id_dst[i]));
          }
          num_bits = cuda::NumberOfBits(1ll + max_id);
        }

        // Sort the only_src tensor so that we can unique it later.
        std::vector<torch::Tensor> sorted_only_src;
        for (auto& only_src_i : only_src) {
          sorted_only_src.emplace_back(Sort<false>(
              only_src_i.data_ptr<index_t>(), only_src_i.size(0), num_bits));
        }

        std::vector<torch::Tensor> unique_only_src;
        std::vector<index_t*> unique_only_src_ptr;
        std::vector<cuda::CopyScalar<int64_t>> unique_only_src_size;
        for (std::size_t i = 0; i < src_ids.size(); i++) {
          // Compute the unique operation on the only_src tensor.
          unique_only_src.emplace_back(
              torch::empty(only_src[i].size(0), src_ids[i].options()));
          unique_only_src_ptr.emplace_back(
              unique_only_src[i].data_ptr<index_t>());
          unique_only_src_size.emplace_back(cuda::CopyScalar<int64_t>{});
          CUB_CALL(
              DeviceSelect::Unique, sorted_only_src[i].data_ptr<index_t>(),
              unique_only_src_ptr[i], unique_only_src_size[i].get(),
              only_src[i].size(0));
        }
        stream.synchronize();
        for (std::size_t i = 0; i < unique_only_src.size(); i++) {
          unique_only_src[i] = unique_only_src[i].slice(
              0, 0, static_cast<int64_t>(unique_only_src_size[i]));
        }

        std::vector<torch::Tensor> real_order;
        for (std::size_t i = 0; i < unique_dst_ids.size(); i++) {
          real_order.emplace_back(
              torch::cat({unique_dst_ids[i], unique_only_src[i]}));
        }
        // Sort here so that binary search can be used to look up new_ids.
        std::vector<torch::Tensor> sorted_order, new_ids;
        std::vector<index_t*> sorted_order_ptr;
        std::vector<int64_t*> new_ids_ptr;
        for (std::size_t i = 0; i < real_order.size(); i++) {
          auto [sorted_order_i, new_ids_i] = Sort(real_order[i], num_bits);
          sorted_order_ptr.emplace_back(sorted_order_i.data_ptr<index_t>());
          new_ids_ptr.emplace_back(new_ids_i.data_ptr<int64_t>());
          sorted_order.emplace_back(std::move(sorted_order_i));
          new_ids.emplace_back(std::move(new_ids_i));
        }
        // Holds the found locations of the src and dst ids in sorted_order.
        // These locations are later used to look up the new ids of the
        // src_ids and dst_ids tensors.
        std::vector<decltype(allocator.AllocateStorage<index_t>(0))>
            new_dst_ids_loc;
        for (std::size_t i = 0; i < sorted_order.size(); i++) {
          new_dst_ids_loc.emplace_back(
              allocator.AllocateStorage<index_t>(dst_ids[i].size(0)));
          THRUST_CALL(
              lower_bound, sorted_order_ptr[i],
              sorted_order_ptr[i] + sorted_order[i].size(0), dst_ids_ptr[i],
              dst_ids_ptr[i] + dst_ids[i].size(0), new_dst_ids_loc[i].get());
        }

        std::vector<cuda::CopyScalar<bool>> all_exist;
        at::cuda::CUDAEvent all_exist_event;
        bool should_record = false;
        // Check if unique_dst_ids includes all dst_ids.
        for (std::size_t i = 0; i < dst_ids.size(); i++) {
          if (dst_ids[i].size(0) > 0) {
            thrust::counting_iterator<int64_t> iota(0);
            auto equal_it = thrust::make_transform_iterator(
                iota, EqualityFunc<index_t>{
                          sorted_order_ptr[i], new_dst_ids_loc[i].get(),
                          dst_ids_ptr[i]});
            all_exist.emplace_back(Min(equal_it, dst_ids[i].size(0)));
            should_record = true;
          } else {
            all_exist.emplace_back(cuda::CopyScalar<bool>{});
          }
        }
        if (should_record) all_exist_event.record();

        std::vector<decltype(allocator.AllocateStorage<index_t>(0))>
            new_src_ids_loc;
        for (std::size_t i = 0; i < sorted_order.size(); i++) {
          new_src_ids_loc.emplace_back(
              allocator.AllocateStorage<index_t>(src_ids[i].size(0)));
          THRUST_CALL(
              lower_bound, sorted_order_ptr[i],
              sorted_order_ptr[i] + sorted_order[i].size(0), src_ids_ptr[i],
              src_ids_ptr[i] + src_ids[i].size(0), new_src_ids_loc[i].get());
        }

        // Finally, look up the new compact ids of the src and dst tensors
        // via gather operations.
        std::vector<torch::Tensor> new_src_ids;
        for (std::size_t i = 0; i < src_ids.size(); i++) {
          new_src_ids.emplace_back(torch::empty_like(src_ids[i]));
          THRUST_CALL(
              gather, new_src_ids_loc[i].get(),
              new_src_ids_loc[i].get() + src_ids[i].size(0),
              new_ids[i].data_ptr<int64_t>(),
              new_src_ids[i].data_ptr<index_t>());
        }
        // Perform the check before we gather for the dst indices.
        for (std::size_t i = 0; i < dst_ids.size(); i++) {
          if (dst_ids[i].size(0) > 0) {
            if (should_record) {
              all_exist_event.synchronize();
              should_record = false;
            }
            if (!static_cast<bool>(all_exist[i])) {
              throw std::out_of_range("Some ids not found.");
            }
          }
        }
        std::vector<torch::Tensor> new_dst_ids;
        for (std::size_t i = 0; i < dst_ids.size(); i++) {
          new_dst_ids.emplace_back(torch::empty_like(dst_ids[i]));
          THRUST_CALL(
              gather, new_dst_ids_loc[i].get(),
              new_dst_ids_loc[i].get() + dst_ids[i].size(0),
              new_ids[i].data_ptr<int64_t>(),
              new_dst_ids[i].data_ptr<index_t>());
        }
        std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>
            results;
        for (std::size_t i = 0; i < src_ids.size(); i++) {
          results.emplace_back(
              std::move(real_order[i]), std::move(new_src_ids[i]),
              std::move(new_dst_ids[i]));
        }
        return results;
      }));
}

std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>
UniqueAndCompactBatched(
    const std::vector<torch::Tensor>& src_ids,
    const std::vector<torch::Tensor>& dst_ids,
    const std::vector<torch::Tensor>& unique_dst_ids, int num_bits) {
  auto dev_id = cuda::GetCurrentStream().device_index();
  static std::mutex mtx;
  static std::unordered_map<int, int> compute_capability_cache;
  const auto compute_capability_major = [&] {
    std::lock_guard lock(mtx);
    auto it = compute_capability_cache.find(dev_id);
    if (it != compute_capability_cache.end()) {
      return it->second;
    } else {
      int major;
      CUDA_RUNTIME_CHECK(hipDeviceGetAttribute(
          &major, hipDeviceAttributeComputeCapabilityMajor, dev_id));
      return compute_capability_cache[dev_id] = major;
    }
  }();
  if (compute_capability_major >= 7) {
    // Utilizes a hash table based implementation: the mapped id of a vertex
    // is monotonically increasing with its first occurrence index in
    // torch.cat([unique_dst_ids, src_ids]). Thus, it is deterministic.
    // The hash map based path is disabled in this hipified variant; fall
    // through to the sort based implementation below.
    // return UniqueAndCompactBatchedHashMapBased(
    //     src_ids, dst_ids, unique_dst_ids);
  }
  // Utilizes a sort based algorithm: the mapped id of a vertex that is part
  // of src_ids but not of unique_dst_ids is monotonically increasing with
  // the actual vertex id. Thus, it is deterministic.
  return UniqueAndCompactBatchedSortBased(
      src_ids, dst_ids, unique_dst_ids, num_bits);
}

std::tuple<torch::Tensor, torch::Tensor, torch::Tensor> UniqueAndCompact(
    const torch::Tensor src_ids, const torch::Tensor dst_ids,
    const torch::Tensor unique_dst_ids, int num_bits) {
  return UniqueAndCompactBatched(
      {src_ids}, {dst_ids}, {unique_dst_ids}, num_bits)[0];
}

}  // namespace ops
}  // namespace graphbolt
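
// Illustrative usage sketch (comment only, not part of the library): shows
// how the single-graph entry point UniqueAndCompact above might be called,
// assuming a GPU build of graphbolt and device-resident int64 tensors. The
// tensor values below are hypothetical; unique_dst_ids must contain every id
// present in dst_ids, otherwise std::out_of_range is thrown.
//
//   auto opts = torch::dtype(torch::kInt64).device(torch::kCUDA);
//   auto src_ids = torch::tensor({5, 9, 5, 7}, opts);
//   auto dst_ids = torch::tensor({9, 9, 7, 7}, opts);
//   auto unique_dst_ids = torch::tensor({7, 9}, opts);
//   // Passing num_bits = 0 lets the sort based path derive the number of
//   // radix-sort bits from the maximum vertex id it observes.
//   auto [unique_ids, compacted_src, compacted_dst] =
//       graphbolt::ops::UniqueAndCompact(src_ids, dst_ids, unique_dst_ids, 0);
//   // unique_ids is torch.cat([unique_dst_ids, ids found only in src_ids]);
//   // compacted_src and compacted_dst map each original id to its position
//   // in unique_ids.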