Unverified Commit 92e22995 authored by kylasa, committed by GitHub

[DistDGL][Robustness]Replacing numpy's unique with custom implementation (#5391)



* Replacing numpy's unique with custom implementation

* Added docstring to the new function.

* Adding unit tests

* Numpy's version issues with the 'kind' argument.

* Addressing CI Test Failure.

* Addressing CI review comments.

* revised implementation, optimized for time.

* added missing arguments for fallback case.

* Addressing CI test failures.

* Resolving issues with PYTHONPATH

* Fix CI Test Failure issues.

* fix CI test failures.

---------
Co-authored-by: Rhett Ying <85214957+Rhett-Ying@users.noreply.github.com>
parent d2ed57e9
import os
import tempfile
import numpy as np
import pytest
import utils
from convert_partition import _get_unique_invidx


@pytest.mark.parametrize(
    "num_nodes, num_edges, nid_begin, nid_end",
    [
        [4000, 40000, 0, 1000],
        [4000, 40000, 1000, 2000],
        [4000, 40000, 2000, 3000],
        [4000, 40000, 3000, 4000],
        [4000, 100, 0, 1000],
        [4000, 100, 1000, 2000],
        [4000, 100, 2000, 3000],
        [4000, 100, 3000, 4000],
        [1, 1, 0, 1],
    ],
)
def test_get_unique_invidx_with_numpy(num_nodes, num_edges, nid_begin, nid_end):
    # prepare data for the function
    # generate synthetic edges
    if num_edges > 0:
        srcids = np.random.randint(0, num_nodes, (num_edges,))  # exclusive
        dstids = np.random.randint(
            nid_begin, nid_end, (num_edges,)
        )  # exclusive
    else:
        srcids = np.array([])
        dstids = np.array([])
    assert nid_begin <= nid_end

    # generate unique node-ids for any
    # partition. This list should be sorted.
    # This is equivalent to shuffle_nids in a partition
    unique_nids = np.arange(nid_begin, nid_end)  # exclusive

    # test with numpy unique here
    orig_srcids = srcids.copy()
    orig_dstids = dstids.copy()
    input_arr = np.concatenate([srcids, dstids, unique_nids])

    # test
    uniques, idxes, srcids, dstids = _get_unique_invidx(
        srcids, dstids, unique_nids
    )
    assert len(uniques) == len(idxes)
    assert np.all(srcids < len(uniques))
    assert np.all(dstids < len(uniques))
    assert np.all(np.sort(uniques[srcids]) == np.sort(orig_srcids))
    assert np.all(uniques[dstids] == orig_dstids)
    assert np.all(uniques == input_arr[idxes])
    # numpy
    np_uniques, np_idxes, np_inv_idxes = np.unique(
        np.concatenate([orig_srcids, orig_dstids, unique_nids]),
        return_index=True,
        return_inverse=True,
    )

    # test uniques
    assert np.all(np_uniques == uniques)

    # test idxes array
    assert np.all(np.sort(input_arr[idxes]) == np.sort(input_arr[np_idxes]))

    # test srcids, inv_indices
    assert np.all(
        np.sort(uniques[srcids])
        == np.sort(np_uniques[np_inv_idxes[0 : len(srcids)]])
    )

    # test dstids, inv_indices
    assert np.all(
        np.sort(uniques[dstids])
        == np.sort(
            np_uniques[np_inv_idxes[len(srcids) : len(srcids) + len(dstids)]]
        )
    )


@pytest.mark.parametrize(
    "num_nodes, num_edges, nid_begin, nid_end",
    [
        # dense networks, no. of edges more than no. of nodes
        [4000, 40000, 0, 1000],
        [4000, 40000, 1000, 2000],
        [4000, 40000, 2000, 3000],
        [4000, 40000, 3000, 4000],
        # sparse networks, no. of edges smaller than no. of nodes
        [4000, 100, 0, 1000],
        [4000, 100, 1000, 2000],
        [4000, 100, 2000, 3000],
        [4000, 100, 3000, 4000],
        # corner case
        [1, 1, 0, 1],
    ],
)
def test_get_unique_invidx(num_nodes, num_edges, nid_begin, nid_end):
    # prepare data for the function
    # generate synthetic edges
    if num_edges > 0:
        srcids = np.random.randint(0, num_nodes, (num_edges,))
        dstids = np.random.randint(nid_begin, nid_end, (num_edges,))
    else:
        srcids = np.array([])
        dstids = np.array([])
    assert nid_begin <= nid_end

    # generate unique node-ids for any
    # partition. This list should be sorted.
    # This is equivalent to shuffle_nids in a partition
    unique_nids = np.arange(nid_begin, nid_end)

    # invoke the test target
    uniques, idxes, src_ids, dst_ids = _get_unique_invidx(
        srcids, dstids, unique_nids
    )
    # validate the outputs of this function
    # array uniques should be a sorted list of integers.
    assert np.all(
        np.diff(uniques) >= 0
    ), "Output parameter uniques is not sorted."

    # idxes is a list of integers; these are indices into the
    # concatenated list (srcids, dstids, unique_nids)
    max_idx = len(src_ids) + len(dst_ids) + len(unique_nids)
    assert np.all(idxes >= 0), "Output parameter idxes has negative values."
    assert np.all(
        idxes < max_idx
    ), "Output parameter idxes exceeds its valid maximum value."

    # srcids and dstids will be inverse indices into the uniques list
    min_src = np.amin(src_ids)
    max_src = np.amax(src_ids)
    min_dst = np.amin(dst_ids)
    max_dst = np.amax(dst_ids)
    assert (
        len(uniques) > max_src
    ), "Inverse idx, src_ids, has an invalid max value."
    assert min_src >= 0, "Inverse idx, src_ids, has negative values."
    assert len(uniques) > max_dst, "Inverse idx, dst_ids, has an invalid max value."
    assert min_dst >= 0, "Inverse idx, dst_ids, has negative values."
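A note on running these tests: the bare imports of utils and convert_partition above resolve only when the distributed-partitioning sources are on PYTHONPATH (one of the commit-log entries above mentions fixing exactly this). A hypothetical invocation, assuming the DGL layout that keeps convert_partition.py under tools/distpartitioning (the test-file path here is a placeholder):

    PYTHONPATH=tools/distpartitioning python -m pytest path/to/this/test_file.py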
@@ -21,6 +21,160 @@ from pyarrow import csv
from utils import get_idranges, memory_snapshot, read_json

def _get_unique_invidx(srcids, dstids, nids):
    """Compute the list of unique elements, and their indices, in the
    input list formed by concatenating srcids, dstids and nids. In
    addition, this function computes inverse indices, into the list of
    unique elements, for the elements of the srcids and dstids arrays;
    srcids and dstids are over-written to contain these inverse indices.
    Essentially, this function mimics the functionality of numpy's
    unique function. The problem with numpy's unique is its high memory
    requirement: for an input list of 3 billion edges it consumes about
    550GB of system memory, which limits the capability of the
    partitioning pipeline.

    Numpy's unique function returns 3 values, which are
    . the list of unique elements
    . a list of indices, into the input argument list, marking the first
      occurrence of the corresponding element of the uniques list
    . a list of inverse indices, which are indices into the uniques list
      and can be used to rebuild the original input array

    Compared to the above numpy return values, this workaround returns
    4 values
    . the list of unique elements
    . a list of indices, which may not mark the first occurrence of the
      corresponding element of the uniques list
    . two lists of inverse indices; inverse indices are built only for
      the srcids and dstids input arguments, since the current use case
      needs no others

    Parameters:
    -----------
    srcids : numpy array
        a list of numbers, which are the src-ids of the edges
    dstids : numpy array
        a list of numbers, which are the dst-ids of the edges
    nids : numpy array
        a list of unique shuffle-global-nids. This list is guaranteed to
        be sorted and consecutive, and to be a `superset` of the list of
        dstids. The current implementation of the pipeline guarantees
        these assumptions, which are used to simplify this workaround.

    Returns:
    --------
    numpy array :
        a sorted list of unique elements, computed from the input arguments
    numpy array :
        a list of integers. These are indices into the concatenated list
        [srcids, dstids, nids] of the input arguments to this function
    numpy array :
        a list of integers. These are inverse indices, i.e. indices into
        the unique elements list, identifying the elements of the input
        array srcids
    numpy array :
        a list of integers. These are inverse indices, i.e. indices into
        the unique elements list, identifying the elements of the input
        array dstids
    """
    assert len(srcids) == len(
        dstids
    ), "The srcids and dstids arrays must be of the same length."
    assert len(srcids) != 0, "Please provide a non-empty edge-list."

    # note: comparing version strings lexicographically is unreliable
    # (e.g. "1.9.0" > "1.24.0"), so compare parsed components instead.
    if tuple(map(int, np.__version__.split(".")[:2])) < (1, 24):
        logging.warning(
            f"Numpy version, {np.__version__}, is lower than expected. "
            "Falling back to numpy's native unique function. "
            "This function's memory overhead will limit the size of the "
            "partitioned graph objects processed by each node in the cluster."
        )
        uniques, idxes, inv_idxes = np.unique(
            np.concatenate([srcids, dstids, nids]),
            return_index=True,
            return_inverse=True,
        )
        src_len = len(srcids)
        dst_len = len(dstids)
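        # the inverse indices of the concatenated input are split back
        # into the srcids portion and the dstids portion (the nids
        # portion is not needed by the caller)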
        return (
            uniques,
            idxes,
            inv_idxes[:src_len],
            inv_idxes[src_len : (src_len + dst_len)],
        )
    # find uniques which appear only in the srcids list
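    # np.isin with kind="table" (available from numpy 1.24) uses a
    # boolean lookup table over the value range, trading memory for speed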
    mask = np.isin(srcids, nids, invert=True, kind="table")
    srcids_only = srcids[mask]
    srcids_idxes = np.where(mask == 1)[0]
    # sort and deduplicate the srcids-only values
    uniques, unique_srcids_idx = np.unique(srcids_only, return_index=True)
    idxes = srcids_idxes[unique_srcids_idx]

    # build uniques and idxes, the first and second return parameters
    uniques = np.concatenate([uniques, nids])
    idxes = np.concatenate(
        [idxes, len(srcids) + len(dstids) + np.arange(len(nids))]
    )

    # sort uniques, and permute idxes to match
    sort_idx = np.argsort(uniques)
    uniques = uniques[sort_idx]
    idxes = idxes[sort_idx]
    # uniques and idxes are built
    assert len(uniques) == len(idxes), "Error building the idxes array."

    # build inverse idxes for srcids and dstids,
    # over-writing the srcids and dstids arrays.
    sort_ids = np.argsort(srcids)
    srcids = srcids[sort_ids]
    # TODO: check if wrapping this while loop in a c++ wrapper
    # helps in speeding up the code
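    # merge-style scan: srcids (now sorted) and uniques are walked
    # together in a single pass; every srcid is guaranteed to occur in
    # uniques, and each match overwrites that srcid with its index in
    # the uniques array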
    idx1 = 0
    idx2 = 0
    while (idx1 < len(srcids)) and (idx2 < len(uniques)):
        if srcids[idx1] == uniques[idx2]:
            srcids[idx1] = idx2
            idx1 += 1
        elif srcids[idx1] < uniques[idx2]:
            idx1 += 1
        else:
            idx2 += 1
    assert idx1 >= len(srcids), (
        f"Failed to locate all srcids in the uniques array "
        f" len(srcids) = {len(srcids)}, idx1 = {idx1} "
        f" len(uniques) = {len(uniques)}, idx2 = {idx2}"
    )
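    # scatter the inverse indices back into the original (unsorted)
    # order of the srcids array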
    srcids[sort_ids] = srcids
    # process dstids now.
    # dstids is guaranteed to be a subset of the `nids` list.
    # here we compute the index in the list of uniques for each element
    # in the list of dstids, in a two step process:
    # 1. locate the position, `offset`, of the first element of nids in
    #    the list of uniques - dstids cannot appear to the left of this
    #    position, they are guaranteed to be on its right side.
    # 2. dstids = dstids - nids[0] + offset
    #    Since nids is sorted and consecutive, dstids - nids[0] is the
    #    position of each dstid within nids; shifting by `offset`, the
    #    starting position of nids[0] in uniques, turns it into the
    #    position within uniques. Note that nids will ALWAYS be a
    #    SUPERSET of dstids.
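    # a small worked example with assumed values: if nids = [10, 11, 12],
    # dstids = [12, 10] and uniques = [3, 7, 10, 11, 12], then
    # offset = searchsorted(uniques, 10) = 2 and
    # dstids - nids[0] + offset = [4, 2]; indeed uniques[[4, 2]] = [12, 10]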
    offset = np.searchsorted(uniques, nids[0], side="left")
    dstids = dstids - nids[0] + offset

    # return the values
    return uniques, idxes, srcids, dstids
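A minimal usage sketch of the contract above (not part of the commit; the array values are made up, and the inputs are copied because the function over-writes srcids and dstids in place):

    import numpy as np

    srcids = np.array([3, 12, 7, 12])
    dstids = np.array([12, 10, 11, 10])
    nids = np.arange(10, 13)  # sorted, consecutive, superset of dstids

    uniques, idxes, inv_src, inv_dst = _get_unique_invidx(
        srcids.copy(), dstids.copy(), nids
    )
    # uniques matches numpy's sorted unique of the concatenated input
    assert np.all(uniques == np.unique(np.concatenate([srcids, dstids, nids])))
    # the inverse indices rebuild the original edge endpoints
    assert np.all(uniques[inv_src] == srcids)
    assert np.all(uniques[inv_dst] == dstids)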
def create_dgl_object(
    schema,
    part_id,

@@ -232,26 +386,13 @@ def create_dgl_object(
     memory_snapshot("CreateDGLObj_UniqueNodeIds: ", part_id)

     # get the edge list in some order and then reshuffle.
-    # Here the order of nodes is defined by the `np.unique` function
-    # node order is as listed in the uniq_ids array
-    ids = np.concatenate(
-        [
-            shuffle_global_src_id,
-            shuffle_global_dst_id,
-            np.arange(
-                shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1
-            ),
-        ]
-    )
-    uniq_ids, idx, inverse_idx = np.unique(
-        ids, return_index=True, return_inverse=True
-    )
-    assert len(uniq_ids) == len(idx)
-    # We get the edge list with their node IDs mapped to a contiguous ID range.
-    part_local_src_id, part_local_dst_id = np.split(
-        inverse_idx[: len(shuffle_global_src_id) * 2], 2
-    )
+    # Here the order of nodes is defined by the sorted order.
+    uniq_ids, idx, part_local_src_id, part_local_dst_id = _get_unique_invidx(
+        shuffle_global_src_id,
+        shuffle_global_dst_id,
+        np.arange(shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1),
+    )
     inner_nodes = th.as_tensor(
         np.logical_and(
             uniq_ids >= shuffle_global_nid_range[0],
...