Unverified Commit 63541c88 authored by Rhett Ying's avatar Rhett Ying Committed by GitHub
Browse files

[DistGB] enable int32 id ranges in IdMap (#7144)

parent 7c51cd16
"""Module for mapping between node/edge IDs and node/edge types.""" """Module for mapping between node/edge IDs and node/edge types."""
import numpy as np import numpy as np
from .. import backend as F, utils from .. import backend as F, utils
...@@ -6,6 +7,9 @@ from .. import backend as F, utils ...@@ -6,6 +7,9 @@ from .. import backend as F, utils
from .._ffi.function import _init_api from .._ffi.function import _init_api
__all__ = ["IdMap"]
class IdMap: class IdMap:
"""A map for converting node/edge IDs to their type IDs and type-wise IDs. """A map for converting node/edge IDs to their type IDs and type-wise IDs.
...@@ -100,22 +104,36 @@ class IdMap: ...@@ -100,22 +104,36 @@ class IdMap:
""" """
def __init__(self, id_ranges):
    """Build the ID map from per-type, per-partition ID ranges.

    Parameters
    ----------
    id_ranges : dict[str, numpy.ndarray]
        Maps each node/edge type name to a ``(num_parts, 2)`` array whose
        rows are the ``[start, end)`` homogeneous-ID range of that type in
        each partition.  All arrays must share one integer dtype
        (``int32`` or ``int64``), which determines the dtype of every
        tensor this map produces.
    """
    id_ranges_values = list(id_ranges.values())
    assert isinstance(
        id_ranges_values[0], np.ndarray
    ), "id_ranges should be a dict of numpy arrays."
    self.num_parts = id_ranges_values[0].shape[0]
    # Propagate the caller's dtype so int32 graphs stay int32 end to end.
    self.dtype = id_ranges_values[0].dtype
    self.dtype_str = "int32" if self.dtype == np.int32 else "int64"
    self.num_types = len(id_ranges)
    # Interleaved layout: row (p * num_types + t) holds the range of
    # type t in partition p, so ranges are globally sorted by start ID.
    ranges = np.zeros((self.num_parts * self.num_types, 2), dtype=self.dtype)
    typed_map = []
    id_ranges = id_ranges_values
    # Sort types by their first partition's start so the interleave below
    # yields monotonically increasing ranges (verified by the asserts).
    id_ranges.sort(key=lambda a: a[0, 0])
    for i, id_range in enumerate(id_ranges):
        ranges[i :: self.num_types] = id_range
        # Running count of IDs of this type up to each partition; forcing
        # dtype keeps the cumsum from silently widening int32 to int64.
        map1 = np.cumsum(id_range[:, 1] - id_range[:, 0], dtype=self.dtype)
        typed_map.append(map1)
    assert np.all(np.diff(ranges[:, 0]) >= 0)
    assert np.all(np.diff(ranges[:, 1]) >= 0)
    # end is stored inclusively (hence the -1) for the C API's search.
    self.range_start = utils.toindex(
        np.ascontiguousarray(ranges[:, 0]), dtype=self.dtype_str
    )
    self.range_end = utils.toindex(
        np.ascontiguousarray(ranges[:, 1]) - 1, dtype=self.dtype_str
    )
    self.typed_map = utils.toindex(
        np.concatenate(typed_map), dtype=self.dtype_str
    )
def __call__(self, ids): def __call__(self, ids):
"""Convert the homogeneous IDs to (type_id, type_wise_id). """Convert the homogeneous IDs to (type_id, type_wise_id).
...@@ -137,7 +155,7 @@ class IdMap: ...@@ -137,7 +155,7 @@ class IdMap:
if len(ids) == 0: if len(ids) == 0:
return ids, ids return ids, ids
ids = utils.toindex(ids) ids = utils.toindex(ids, dtype=self.dtype_str)
ret = _CAPI_DGLHeteroMapIds( ret = _CAPI_DGLHeteroMapIds(
ids.todgltensor(), ids.todgltensor(),
self.range_start.todgltensor(), self.range_start.todgltensor(),
...@@ -146,7 +164,7 @@ class IdMap: ...@@ -146,7 +164,7 @@ class IdMap:
self.num_parts, self.num_parts,
self.num_types, self.num_types,
) )
ret = utils.toindex(ret).tousertensor() ret = utils.toindex(ret, dtype=self.dtype_str).tousertensor()
return ret[: len(ids)], ret[len(ids) :] return ret[: len(ids)], ret[len(ids) :]
......
...@@ -30,8 +30,12 @@ from .graph_partition_book import ( ...@@ -30,8 +30,12 @@ from .graph_partition_book import (
RESERVED_FIELD_DTYPE = { RESERVED_FIELD_DTYPE = {
"inner_node": F.uint8, # A flag indicates whether the node is inside a partition. "inner_node": (
"inner_edge": F.uint8, # A flag indicates whether the edge is inside a partition. F.uint8
), # A flag indicates whether the node is inside a partition.
"inner_edge": (
F.uint8
), # A flag indicates whether the edge is inside a partition.
NID: F.int64, NID: F.int64,
EID: F.int64, EID: F.int64,
NTYPE: F.int16, NTYPE: F.int16,
...@@ -512,6 +516,23 @@ def load_partition_book(part_config, part_id): ...@@ -512,6 +516,23 @@ def load_partition_book(part_config, part_id):
node_map = _get_part_ranges(node_map) node_map = _get_part_ranges(node_map)
edge_map = _get_part_ranges(edge_map) edge_map = _get_part_ranges(edge_map)
# Format dtype of node/edge map if dtype is specified.
def _format_node_edge_map(part_metadata, map_type, data):
key = f"{map_type}_map_dtype"
if key not in part_metadata:
return data
dtype = part_metadata[key]
assert dtype in ["int32", "int64"], (
f"The {map_type} map dtype should be either int32 or int64, "
f"but got {dtype}."
)
for key in data:
data[key] = data[key].astype(dtype)
return data
node_map = _format_node_edge_map(part_metadata, "node", node_map)
edge_map = _format_node_edge_map(part_metadata, "edge", edge_map)
# Sort the node/edge maps by the node/edge type ID. # Sort the node/edge maps by the node/edge type ID.
node_map = dict(sorted(node_map.items(), key=lambda x: ntypes[x[0]])) node_map = dict(sorted(node_map.items(), key=lambda x: ntypes[x[0]]))
edge_map = dict(sorted(edge_map.items(), key=lambda x: etypes[x[0]])) edge_map = dict(sorted(edge_map.items(), key=lambda x: etypes[x[0]]))
......
...@@ -282,6 +282,14 @@ def check_hetero_partition( ...@@ -282,6 +282,14 @@ def check_hetero_partition(
src_ntype_ids, part_src_ids = gpb.map_to_per_ntype(part_src_ids) src_ntype_ids, part_src_ids = gpb.map_to_per_ntype(part_src_ids)
dst_ntype_ids, part_dst_ids = gpb.map_to_per_ntype(part_dst_ids) dst_ntype_ids, part_dst_ids = gpb.map_to_per_ntype(part_dst_ids)
etype_ids, part_eids = gpb.map_to_per_etype(part_eids) etype_ids, part_eids = gpb.map_to_per_etype(part_eids)
# `IdMap` is in int64 by default.
assert src_ntype_ids.dtype == F.int64
assert dst_ntype_ids.dtype == F.int64
assert etype_ids.dtype == F.int64
with pytest.raises(dgl.utils.internal.InconsistentDtypeException):
gpb.map_to_per_ntype(F.tensor([0], F.int32))
with pytest.raises(dgl.utils.internal.InconsistentDtypeException):
gpb.map_to_per_etype(F.tensor([0], F.int32))
# These are original per-type IDs. # These are original per-type IDs.
for etype_id, etype in enumerate(hg.canonical_etypes): for etype_id, etype in enumerate(hg.canonical_etypes):
part_src_ids1 = F.boolean_mask(part_src_ids, etype_ids == etype_id) part_src_ids1 = F.boolean_mask(part_src_ids, etype_ids == etype_id)
...@@ -541,13 +549,21 @@ def test_partition( ...@@ -541,13 +549,21 @@ def test_partition(
reset_envs() reset_envs()
def test_RangePartitionBook(): @pytest.mark.parametrize("node_map_dtype", [F.int32, F.int64])
@pytest.mark.parametrize("edge_map_dtype", [F.int32, F.int64])
def test_RangePartitionBook(node_map_dtype, edge_map_dtype):
part_id = 1 part_id = 1
num_parts = 2 num_parts = 2
# homogeneous # homogeneous
node_map = {DEFAULT_NTYPE: F.tensor([[0, 1000], [1000, 2000]])} node_map = {
edge_map = {DEFAULT_ETYPE: F.tensor([[0, 5000], [5000, 10000]])} DEFAULT_NTYPE: F.tensor([[0, 1000], [1000, 2000]], dtype=node_map_dtype)
}
edge_map = {
DEFAULT_ETYPE: F.tensor(
[[0, 5000], [5000, 10000]], dtype=edge_map_dtype
)
}
ntypes = {DEFAULT_NTYPE: 0} ntypes = {DEFAULT_NTYPE: 0}
etypes = {DEFAULT_ETYPE: 0} etypes = {DEFAULT_ETYPE: 0}
gpb = RangePartitionBook( gpb = RangePartitionBook(
...@@ -556,6 +572,21 @@ def test_RangePartitionBook(): ...@@ -556,6 +572,21 @@ def test_RangePartitionBook():
assert gpb.etypes == [DEFAULT_ETYPE[1]] assert gpb.etypes == [DEFAULT_ETYPE[1]]
assert gpb.canonical_etypes == [DEFAULT_ETYPE] assert gpb.canonical_etypes == [DEFAULT_ETYPE]
assert gpb.to_canonical_etype(DEFAULT_ETYPE[1]) == DEFAULT_ETYPE assert gpb.to_canonical_etype(DEFAULT_ETYPE[1]) == DEFAULT_ETYPE
ntype_ids, per_ntype_ids = gpb.map_to_per_ntype(
F.tensor([0, 1000], dtype=node_map_dtype)
)
assert ntype_ids.dtype == node_map_dtype
assert per_ntype_ids.dtype == node_map_dtype
assert np.all(F.asnumpy(ntype_ids) == 0)
assert np.all(F.asnumpy(per_ntype_ids) == [0, 1000])
etype_ids, per_etype_ids = gpb.map_to_per_etype(
F.tensor([0, 5000], dtype=edge_map_dtype)
)
assert etype_ids.dtype == edge_map_dtype
assert per_etype_ids.dtype == edge_map_dtype
assert np.all(F.asnumpy(etype_ids) == 0)
assert np.all(F.asnumpy(per_etype_ids) == [0, 5000])
node_policy = NodePartitionPolicy(gpb, DEFAULT_NTYPE) node_policy = NodePartitionPolicy(gpb, DEFAULT_NTYPE)
assert node_policy.type_name == DEFAULT_NTYPE assert node_policy.type_name == DEFAULT_NTYPE
...@@ -564,10 +595,12 @@ def test_RangePartitionBook(): ...@@ -564,10 +595,12 @@ def test_RangePartitionBook():
# Init via etype is not supported # Init via etype is not supported
node_map = { node_map = {
"node1": F.tensor([[0, 1000], [1000, 2000]]), "node1": F.tensor([[0, 1000], [1000, 2000]], dtype=node_map_dtype),
"node2": F.tensor([[0, 1000], [1000, 2000]]), "node2": F.tensor([[0, 1000], [1000, 2000]], dtype=node_map_dtype),
}
edge_map = {
"edge1": F.tensor([[0, 5000], [5000, 10000]], dtype=edge_map_dtype)
} }
edge_map = {"edge1": F.tensor([[0, 5000], [5000, 10000]])}
ntypes = {"node1": 0, "node2": 1} ntypes = {"node1": 0, "node2": 1}
etypes = {"edge1": 0} etypes = {"edge1": 0}
expect_except = False expect_except = False
...@@ -587,11 +620,13 @@ def test_RangePartitionBook(): ...@@ -587,11 +620,13 @@ def test_RangePartitionBook():
# heterogeneous, init via canonical etype # heterogeneous, init via canonical etype
node_map = { node_map = {
"node1": F.tensor([[0, 1000], [1000, 2000]]), "node1": F.tensor([[0, 1000], [1000, 2000]], dtype=node_map_dtype),
"node2": F.tensor([[0, 1000], [1000, 2000]]), "node2": F.tensor([[0, 1000], [1000, 2000]], dtype=node_map_dtype),
} }
edge_map = { edge_map = {
("node1", "edge1", "node2"): F.tensor([[0, 5000], [5000, 10000]]) ("node1", "edge1", "node2"): F.tensor(
[[0, 5000], [5000, 10000]], dtype=edge_map_dtype
)
} }
ntypes = {"node1": 0, "node2": 1} ntypes = {"node1": 0, "node2": 1}
etypes = {("node1", "edge1", "node2"): 0} etypes = {("node1", "edge1", "node2"): 0}
...@@ -603,6 +638,23 @@ def test_RangePartitionBook(): ...@@ -603,6 +638,23 @@ def test_RangePartitionBook():
assert gpb.canonical_etypes == [c_etype] assert gpb.canonical_etypes == [c_etype]
assert gpb.to_canonical_etype("edge1") == c_etype assert gpb.to_canonical_etype("edge1") == c_etype
assert gpb.to_canonical_etype(c_etype) == c_etype assert gpb.to_canonical_etype(c_etype) == c_etype
ntype_ids, per_ntype_ids = gpb.map_to_per_ntype(
F.tensor([0, 1000], dtype=node_map_dtype)
)
assert ntype_ids.dtype == node_map_dtype
assert per_ntype_ids.dtype == node_map_dtype
assert np.all(F.asnumpy(ntype_ids) == 0)
assert np.all(F.asnumpy(per_ntype_ids) == [0, 1000])
etype_ids, per_etype_ids = gpb.map_to_per_etype(
F.tensor([0, 5000], dtype=edge_map_dtype)
)
assert etype_ids.dtype == edge_map_dtype
assert per_etype_ids.dtype == edge_map_dtype
assert np.all(F.asnumpy(etype_ids) == 0)
assert np.all(F.asnumpy(per_etype_ids) == [0, 5000])
expect_except = False expect_except = False
try: try:
gpb.to_canonical_etype(("node1", "edge2", "node2")) gpb.to_canonical_etype(("node1", "edge2", "node2"))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment