"vscode:/vscode.git/clone" did not exist on "acd094e702a2463f602995abe344254cc31ff49d"
Unverified Commit 894ad1e3 authored by kylasa, committed by GitHub

Support for no. of chunks smaller than no. of partitions. (#5390)

* Support no. of chunks smaller than no. of partitions, and add appropriate test cases.

The following changes are made in this PR:
1. Code changes to handle no. of chunks smaller than no. of partitions (see the sketch after this list).
2. Re-added test cases, previously deleted, for no. of chunks smaller than no. of partitions.
3. New test cases where multiple partitions are handled by a single process.
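
Below is a minimal, self-contained sketch of the idea behind change 1; rows_for_partition is a hypothetical helper for illustration, not code from this PR. When there are fewer chunk files than partitions, partition boundaries can no longer coincide with chunk boundaries, so global row ranges are assigned to partitions independently of how the rows are chunked on disk:

def rows_for_partition(total_rows: int, num_parts: int, part_id: int):
    """Return the [start, end) global row range owned by part_id."""
    base, rem = divmod(total_rows, num_parts)
    start = part_id * base + min(part_id, rem)
    return start, start + base + (1 if part_id < rem else 0)

# Example: 10 rows stored in 3 chunk files, partitioned 4 ways.
# Partition boundaries fall mid-chunk, which the new code tolerates.
print([rows_for_partition(10, 4, p) for p in range(4)])
# [(0, 3), (3, 6), (6, 8), (8, 10)]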

* Commit the missing files.

* lintrunner patch.

* lintrunner check

* lintrunner patch here.

* Address CI review comments.
parent 851d66fa
@@ -166,6 +166,42 @@ def test_chunk_graph_vector_rows(num_chunks, vector_rows):
)
@pytest.mark.parametrize(
"num_chunks, "
"num_chunks_nodes, "
"num_chunks_edges, "
"num_chunks_node_data, "
"num_chunks_edge_data",
[
[1, None, None, None, None],
[8, None, None, None, None],
[4, 4, 4, 8, 12],
[4, 4, 4, {"paper": 10}, {("author", "writes", "paper"): 24}],
[
4,
4,
4,
{"paper": {"feat": 10}},
{("author", "writes", "paper"): {"year": 24}},
],
],
)
def test_chunk_graph_arbitrary_chunks(
num_chunks,
num_chunks_nodes,
num_chunks_edges,
num_chunks_node_data,
num_chunks_edge_data,
):
_test_chunk_graph(
num_chunks,
num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
)
def _test_pipeline(
num_chunks,
num_parts,
@@ -178,9 +214,6 @@ def _test_pipeline(
num_chunks_edge_data=None,
use_verify_partitions=False,
):
if num_chunks < num_parts:
# num_parts should be less than or equal to num_chunks
return
if num_parts % world_size != 0:
# num_parts should be a multiple of world_size
@@ -288,6 +321,46 @@ def test_pipeline_formats(graph_formats):
_test_pipeline(4, 4, 4, graph_formats)
@pytest.mark.parametrize(
"num_chunks, "
"num_parts, "
"world_size, "
"num_chunks_node_data, "
"num_chunks_edge_data",
[
# Test cases where no. of chunks is greater than
# no. of partitions
[8, 4, 4, 8, 8],
[8, 4, 2, 8, 8],
[9, 7, 5, 9, 9],
[8, 8, 4, 8, 8],
# Test cases where no. of chunks is smaller
# than no. of partitions
[7, 8, 4, 7, 7],
[1, 8, 4, 1, 1],
[1, 4, 4, 1, 1],
[3, 4, 4, 3, 3],
[1, 4, 2, 1, 1],
[3, 4, 2, 3, 3],
[1, 5, 3, 1, 1],
],
)
def test_pipeline_arbitrary_chunks(
num_chunks,
num_parts,
world_size,
num_chunks_node_data,
num_chunks_edge_data,
):
_test_pipeline(
num_chunks,
num_parts,
world_size,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
)
@pytest.mark.parametrize(
"graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
)
@@ -24,6 +24,7 @@ from globalids import (
from gloo_wrapper import allgather_sizes, alltoallv_cpu, gather_metadata_json
from utils import (
augment_edge_data,
DATA_TYPE_ID,
get_edge_types,
get_etype_featnames,
get_gid_offsets,
@@ -36,6 +37,7 @@ from utils import (
memory_snapshot,
read_json,
read_ntype_partition_files,
REV_DATA_TYPE_ID,
write_dgl_objects,
write_metadata_json,
)
@@ -420,6 +422,7 @@ def exchange_feature(
# Ownership is determined by the destination node.
assert data is not None
global_eids = np.arange(gid_start, gid_end, dtype=np.int64)
if data[constants.GLOBAL_EID].shape[0] > 0:
logging.info(
f"[Rank: {rank}] disk read global eids - min - {np.amin(data[constants.GLOBAL_EID])}, max - {np.amax(data[constants.GLOBAL_EID])}, count - {data[constants.GLOBAL_EID].shape}"
)
@@ -436,6 +439,42 @@ def exchange_feature(
assert np.all(global_eids == data[constants.GLOBAL_EID][idx1])
partid_slice = id_lookup.get_partition_ids(global_dst_nids)
# Determine the shape of the feature-data. This is needed so that
# ranks where feature-data is not present use the correct shape
# when sending the padded vector.
# Exchange the shape length (no. of dimensions) first.
feat_dim_len = 0
if featdata_key is not None:
feat_dim_len = len(featdata_key.shape)
all_lens = allgather_sizes(
[feat_dim_len], world_size, num_parts, return_sizes=True
)
if all_lens[0] <= 0:
logging.info(
f"[Rank: {rank} No process has any feature data to shuffle for {local_feat_key}"
)
return cur_features, cur_global_ids
rank0_shape_len = all_lens[0]
for idx in range(1, world_size):
assert (all_lens[idx] == 0) or (all_lens[idx] == rank0_shape_len), (
f"feature: {local_feat_key} shapes does not match "
f"at rank - {idx} and rank - 0"
)
# Now exchange the actual shape and dtype information.
if featdata_key is not None:
feat_dims_dtype = list(featdata_key.shape)
feat_dims_dtype.append(DATA_TYPE_ID[featdata_key.dtype])
else:
feat_dims_dtype = list(np.zeros((rank0_shape_len), dtype=np.int64))
feat_dims_dtype.append(DATA_TYPE_ID[torch.float32])
logging.info(f"Sending the feature shape information - {feat_dims_dtype}")
all_dims_dtype = allgather_sizes(
feat_dims_dtype, world_size, num_parts, return_sizes=True
)
for idx in range(world_size):
cond = partid_slice == (idx + local_part_id * world_size)
gids_per_partid = gids_feat[cond]
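The indexing idx + local_part_id * world_size above implies a cyclic assignment of partitions to ranks when multiple partitions are handled by a single process (change 3 in the description). A sketch of that mapping under that assumption; owner_rank and local_part_index are hypothetical names, the tool itself uses map_partid_rank:

def owner_rank(part_id: int, world_size: int) -> int:
    # Partition p is handled by rank p % world_size ...
    return part_id % world_size

def local_part_index(part_id: int, world_size: int) -> int:
    # ... as that rank's (p // world_size)-th local partition.
    return part_id // world_size

# num_parts = 8, world_size = 4: each rank owns two partitions.
print([(p, owner_rank(p, 4), local_part_index(p, 4)) for p in range(8)])
# [(0, 0, 0), (1, 1, 0), (2, 2, 0), (3, 3, 0),
#  (4, 0, 1), (5, 1, 1), (6, 2, 1), (7, 3, 1)]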
@@ -443,7 +482,14 @@ def exchange_feature(
local_idx_partid = local_idx[cond]
if gids_per_partid.shape[0] == 0:
feats_per_rank.append(torch.empty((0, 1), dtype=torch.float))
assert len(all_dims_dtype) % world_size == 0
dim_len = int(len(all_dims_dtype) / world_size)
rank0_shape = tuple(list(np.zeros((dim_len - 1), dtype=np.int32)))
rank0_dtype = REV_DATA_TYPE_ID[all_dims_dtype[dim_len - 1]]
data = torch.empty(rank0_shape, dtype=rank0_dtype)
feats_per_rank.append(data)
global_id_per_rank.append(torch.empty((0,), dtype=torch.int64))
else:
feats_per_rank.append(featdata_key[local_idx_partid])
@@ -463,6 +509,9 @@ def exchange_feature(
output_id_list = alltoallv_cpu(
rank, world_size, global_id_per_rank, retain_nones=False
)
logging.info(
f"[Rank: {rank}] feats - {output_feat_list}, ids - {output_id_list}"
)
assert len(output_feat_list) == len(output_id_list), (
"Length of feature list and id list are expected to be equal while "
f"got {len(output_feat_list)} and {len(output_id_list)}."
@@ -12,34 +12,15 @@ import torch
import torch.distributed as dist
from gloo_wrapper import alltoallv_cpu
from utils import (
DATA_TYPE_ID,
generate_read_list,
get_gid_offsets,
get_idranges,
map_partid_rank,
REV_DATA_TYPE_ID,
)
DATA_TYPE_ID = {
data_type: id
for id, data_type in enumerate(
[
torch.float32,
torch.float64,
torch.float16,
torch.uint8,
torch.int8,
torch.int16,
torch.int32,
torch.int64,
torch.bool,
]
)
}
REV_DATA_TYPE_ID = {id: data_type for data_type, id in DATA_TYPE_ID.items()}
def _broadcast_shape(
data, rank, world_size, num_parts, is_feat_data, feat_name
):
@@ -443,7 +424,7 @@ def get_dataset(
edge_feature_tids[data_key] = [(start, end)]
else:
edge_features[data_key] = None
edge_feature_tids[data_key] = []
edge_feature_tids[data_key] = [(0, 0)]
# Done with building node_features locally.
if len(edge_features) <= 0:
@@ -166,6 +166,13 @@ class DistLookupService:
[local_rows], self.world_size, self.num_parts, return_sizes=True
)
max_count = np.amax(all_sizes)
if max_count <= 0:
logging.info(
f"[Rank: {self.rank}] No process has global_nids to process !!!"
)
return
num_splits = np.ceil(max_count / CHUNK_SIZE).astype(np.uint16)
LOCAL_CHUNK_SIZE = np.ceil(local_rows / num_splits).astype(np.int64)
agg_partition_ids = []
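
The arithmetic above keeps every rank in lock-step: the number of collective rounds is driven by the busiest rank, and each rank then slices its own rows into that many (possibly smaller) local chunks. A self-contained sketch under those assumptions; local_chunks is a hypothetical helper, not part of the class's API:

import numpy as np

def local_chunks(local_rows: int, max_rows: int, chunk_size: int):
    # All ranks agree on the round count from the global maximum ...
    num_splits = int(np.ceil(max_rows / chunk_size))
    # ... then size their own slices so every round has work (or a
    # harmless empty slice) on every rank.
    local_chunk = int(np.ceil(local_rows / num_splits))
    return [
        (i * local_chunk, min((i + 1) * local_chunk, local_rows))
        for i in range(num_splits)
    ]

# The busiest rank holds 1M rows, this rank holds 300k, CHUNK_SIZE is
# 250k: 4 rounds of 75k local rows each.
print(local_chunks(300_000, 1_000_000, 250_000))
# [(0, 75000), (75000, 150000), (150000, 225000), (225000, 300000)]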
@@ -8,9 +8,30 @@ import dgl
import numpy as np
import psutil
import pyarrow
import torch
from dgl.distributed.partition import _dump_part_config
from pyarrow import csv
DATA_TYPE_ID = {
data_type: id
for id, data_type in enumerate(
[
torch.float32,
torch.float64,
torch.float16,
torch.uint8,
torch.int8,
torch.int16,
torch.int32,
torch.int64,
torch.bool,
]
)
}
REV_DATA_TYPE_ID = {id: data_type for data_type, id in DATA_TYPE_ID.items()}
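# Usage note (illustration only, not part of the file): these tables
# let a tensor's dtype travel through integer-only collectives such as
# allgather_sizes and be recovered losslessly on the receiving rank:
#   dtype_id = DATA_TYPE_ID[torch.float16]   # dtype -> small int
#   assert REV_DATA_TYPE_ID[dtype_id] is torch.float16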
def read_ntype_partition_files(schema_map, input_dir):
"""