Unverified Commit a14f69c9 authored by kylasa, committed by GitHub

[DistDGL][Feature_Request]Changes in the metadata.json file for input graph dataset. (#5310)

* Implemented the following changes.

* Remove NUM_NODES_PER_CHUNK
* Remove NUM_EDGES_PER_CHUNK
* Remove the dependency between no. of edge files per edge type and no. of partitions
* Remove the dependency between no. of edge feature files per edge type and no. of partitions
* Remove the dependency between no. of edge feature files and no. of edge files per edge type.
* Remove the dependency between no. of node feature files and no. of partitions
* Add “node_type_counts”. This will be a list of integers; each integer is the total count of a node type. For a given node type, the index in this list matches the index in the “node_type” list.
* Add “edge_type_counts”. This will be a list of integers; each integer is the total count of an edge type. For a given edge type, the index in this list matches the index in the “edge_type” list. A sketch of the resulting metadata.json layout follows this list.
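
For illustration only, the sketch below shows how a minimal metadata.json could look under the new layout, written as a Python dict so it can be dumped with json.dumps. All graph/type names, counts, and file paths are invented; note that the diff below registers the new keys as "num_nodes_per_type" and "num_edges_per_type", and the per-chunk count lists are gone entirely.

import json

# Hypothetical metadata.json under the reworked layout (names, counts and
# file paths are invented; only the key structure mirrors the diff below).
metadata = {
    "graph_name": "toy_graph",
    "node_type": ["author", "paper"],
    # One total per node type, index-aligned with the "node_type" list.
    "num_nodes_per_type": [3, 5],
    "edge_type": ["author:writes:paper"],
    # One total per edge type, index-aligned with the "edge_type" list.
    "num_edges_per_type": [10],
    "edges": {
        "author:writes:paper": {
            "format": {"name": "csv", "delimiter": " "},
            # The no. of edge files no longer has to match the no. of partitions.
            "data": [
                "edges/author_writes_paper_0.csv",
                "edges/author_writes_paper_1.csv",
            ],
        }
    },
}
print(json.dumps(metadata, indent=2))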

* Applying lintrunner patch.

* Adding missing keys to the metadata in the unit test framework.

* lintrunner patch.

* Resolving CI test failures due to merge conflicts.

* Applying lintrunner patch

* applying lintrunner patch

* Replacing tabs with spaces to satisfy lintrunner.

* Fixing the CI Test Failure cases.

* Applying lintrunner patch

* lintrunner complaining about a blank line.

* Resolving issues with print statement for NoneType

* Removed the arbitrary-chunks tests, since this functionality is no longer supported.

* Addressing CI review comments.

* addressing CI review comments

* lintrunner patch

* lintrunner patch.

* Addressing CI review comments.

* lintrunner patch.
parent fcf5ad5f
......@@ -166,42 +166,6 @@ def test_chunk_graph_vector_rows(num_chunks, vector_rows):
)
@pytest.mark.parametrize(
"num_chunks, "
"num_chunks_nodes, "
"num_chunks_edges, "
"num_chunks_node_data, "
"num_chunks_edge_data",
[
[1, None, None, None, None],
[8, None, None, None, None],
[4, 4, 4, 8, 12],
[4, 4, 4, {"paper": 10}, {("author", "writes", "paper"): 24}],
[
4,
4,
4,
{"paper": {"feat": 10}},
{("author", "writes", "paper"): {"year": 24}},
],
],
)
def test_chunk_graph_arbitray_chunks(
num_chunks,
num_chunks_nodes,
num_chunks_edges,
num_chunks_node_data,
num_chunks_edge_data,
):
_test_chunk_graph(
num_chunks,
num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
)
def _test_pipeline(
num_chunks,
num_parts,
......@@ -324,41 +288,6 @@ def test_pipeline_formats(graph_formats):
_test_pipeline(4, 4, 4, graph_formats)
@pytest.mark.parametrize(
"num_chunks, "
"num_parts, "
"world_size, "
"num_chunks_node_data, "
"num_chunks_edge_data",
[
[8, 4, 2, 20, 25],
[9, 7, 5, 3, 11],
[8, 8, 4, 3, 5],
[
8,
4,
2,
{"paper": {"feat": 11, "year": 1}},
{("author", "writes", "paper"): {"year": 24}},
],
],
)
def test_pipeline_arbitray_chunks(
num_chunks,
num_parts,
world_size,
num_chunks_node_data,
num_chunks_edge_data,
):
_test_pipeline(
num_chunks,
num_parts,
world_size,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
)
@pytest.mark.parametrize(
"graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
)
......
......@@ -118,6 +118,9 @@ def _chunk_graph(
metadata["graph_name"] = name
metadata["node_type"] = g.ntypes
# Add num_nodes_per_type: total node count per node type.
metadata["num_nodes_per_type"] = [g.num_nodes(ntype) for ntype in g.ntypes]
# Initialize num_chunks for each node/edge.
num_chunks_details = _initialize_num_chunks(g, num_chunks, kwargs=kwargs)
......@@ -134,6 +137,9 @@ def _chunk_graph(
num_nodes_per_chunk.append(num_nodes_list)
metadata["edge_type"] = [etypestrs[etype] for etype in g.canonical_etypes]
metadata["num_edges_per_type"] = [
g.num_edges(etype) for etype in g.canonical_etypes
]
# Compute the number of edges per chunk per edge type
metadata["num_edges_per_chunk"] = num_edges_per_chunk = []
......
......@@ -44,16 +44,15 @@ def submit_jobs(args) -> str:
graph_name = schema_map["graph_name"]
# retrieve num_parts
num_chunks = len(schema_map["num_nodes_per_chunk"][0])
num_parts = num_chunks
num_parts = 0
partition_path = os.path.join(args.partitions_dir, "partition_meta.json")
if os.path.isfile(partition_path):
part_meta = load_partition_meta(partition_path)
num_parts = part_meta.num_parts
if num_parts > num_chunks:
raise Exception(
"Number of partitions should be less/equal than number of chunks."
)
assert (
num_parts != 0
), f"Invalid value for no. of partitions. Please check partition_meta.json file."
# verify ip_config
with open(args.ip_config, "r") as f:
......
......@@ -20,9 +20,7 @@ OWNER_PROCESS = "owner_proc_id"
PART_LOCAL_NID = "part_local_nid"
STR_NODE_TYPE = "node_type"
STR_NUM_NODES_PER_CHUNK = "num_nodes_per_chunk"
STR_EDGE_TYPE = "edge_type"
STR_NUM_EDGES_PER_CHUNK = "num_edges_per_chunk"
STR_EDGES = "edges"
STR_FORMAT = "format"
STR_FORMAT_DELIMITER = "delimiter"
......@@ -38,3 +36,8 @@ STR_NAME = "name"
STR_GRAPH_NAME = "graph_name"
STR_NODE_FEATURES = "node_features"
STR_EDGE_FEATURES = "edge_features"
STR_NUM_NODES_PER_TYPE = "num_nodes_per_type"
STR_NUM_EDGES_PER_TYPE = "num_edges_per_type"
STR_NTYPES = "ntypes"
......@@ -27,6 +27,8 @@ def create_dgl_object(
node_data,
edge_data,
edgeid_offset,
node_typecounts,
edge_typecounts,
return_orig_nids=False,
return_orig_eids=False,
):
......@@ -117,12 +119,10 @@ def create_dgl_object(
# create auxiliary data structures from the schema object
memory_snapshot("CreateDGLObj_Begin", part_id)
_, global_nid_ranges = get_idranges(
schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK],
schema[constants.STR_NODE_TYPE], node_typecounts
)
_, global_eid_ranges = get_idranges(
schema[constants.STR_EDGE_TYPE],
schema[constants.STR_NUM_EDGES_PER_CHUNK],
schema[constants.STR_EDGE_TYPE], edge_typecounts
)
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
......
......@@ -26,9 +26,11 @@ from utils import (
augment_edge_data,
get_edge_types,
get_etype_featnames,
get_gid_offsets,
get_gnid_range_map,
get_idranges,
get_node_types,
get_ntype_counts_map,
get_ntype_featnames,
map_partid_rank,
memory_snapshot,
......@@ -122,7 +124,10 @@ def gen_node_data(
# dictionary and range of global-nids in the second dictionary.
type_nid_dict, global_nid_dict = get_idranges(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK],
get_ntype_counts_map(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_TYPE],
),
num_chunks=num_parts,
)
......@@ -183,7 +188,8 @@ def exchange_edge_data(rank, world_size, num_parts, edge_data, id_lookup):
edge_data : dictionary
edge information, as a dictionary which stores column names as keys and values
as column data. This information is read from the edges.txt file.
id_lookup : DistLookService object
id_lookup : DistLookupService instance
this object will be used to retrieve ownership information of nodes
Returns:
--------
......@@ -217,7 +223,6 @@ def exchange_edge_data(rank, world_size, num_parts, edge_data, id_lookup):
Total edges: {all_edges} Local_CHUNK_SIZE: {LOCAL_CHUNK_SIZE}"
)
# Start sending the chunks to the rest of the processes
for local_part_id in range(num_parts // world_size):
local_src_ids = []
local_dst_ids = []
......@@ -397,8 +402,12 @@ def exchange_feature(
tokens = feat_key.split("/")
assert len(tokens) == 3
local_feat_key = "/".join(tokens[:-1]) + "/" + str(local_part_id)
logging.info(
f"[Rank: {rank} feature: {feat_key}, gid_start - {gid_start} and gid_end - {gid_end}"
)
# Get the partition ids for the range of global nids.
if feat_type == constants.STR_NODE_FEATURES:
# Retrieve the partition ids for the node features.
......@@ -411,6 +420,9 @@ def exchange_feature(
# Ownership is determined by the destination node.
assert data is not None
global_eids = np.arange(gid_start, gid_end, dtype=np.int64)
logging.info(
f"[Rank: {rank} disk read global eids - min - {np.amin(data[constants.GLOBAL_EID])}, max - {np.amax(data[constants.GLOBAL_EID])}, count - {data[constants.GLOBAL_EID].shape}"
)
# Now use `data` to extract destination nodes' global id
# and use that to get the ownership
......@@ -418,6 +430,7 @@ def exchange_feature(
data[constants.GLOBAL_EID], global_eids, return_indices=True
)
assert common.shape[0] == idx2.shape[0]
assert common.shape[0] == global_eids.shape[0]
global_dst_nids = data[constants.GLOBAL_DST_ID][idx1]
assert np.all(global_eids == data[constants.GLOBAL_EID][idx1])
......@@ -431,12 +444,16 @@ def exchange_feature(
if gids_per_partid.shape[0] == 0:
feats_per_rank.append(torch.empty((0, 1), dtype=torch.float))
global_id_per_rank.append(torch.empty((0, 1), dtype=torch.int64))
global_id_per_rank.append(torch.empty((0,), dtype=torch.int64))
else:
feats_per_rank.append(featdata_key[local_idx_partid])
global_id_per_rank.append(
torch.from_numpy(gids_per_partid).type(torch.int64)
)
for idx, tt in enumerate(feats_per_rank):
logging.info(
f"[Rank: {rank} features shape - {tt.shape} and ids - {global_id_per_rank[idx].shape}"
)
# features (and global nids) per rank to be sent out are ready
# for transmission, perform alltoallv here.
......@@ -608,6 +625,8 @@ def exchange_features(
logging.info(
f"[Rank: {rank}] Total time for feature exchange: {timedelta(seconds = end - start)}"
)
for k, v in own_features.items():
logging.info(f"Rank: {rank}] Key - {k} Value - {v.shape}")
return own_features, own_global_ids
......@@ -693,6 +712,7 @@ def exchange_graph_data(
was performed in the `exchange_features` function call
"""
memory_snapshot("ShuffleNodeFeaturesBegin: ", rank)
logging.info(f"[Rank: {rank} - node_feat_tids - {node_feat_tids}")
rcvd_node_features, rcvd_global_nids = exchange_features(
rank,
world_size,
......@@ -743,7 +763,7 @@ def exchange_graph_data(
)
def read_dataset(rank, world_size, id_lookup, params, schema_map):
def read_dataset(rank, world_size, id_lookup, params, schema_map, ntype_counts):
"""
This function gets the dataset and performs post-processing on the data which is read from files.
Additional information(columns) are added to nodes metadata like owner_process, global_nid which
......@@ -793,10 +813,10 @@ def read_dataset(rank, world_size, id_lookup, params, schema_map):
"""
edge_features = {}
(
node_tids,
node_features,
node_feat_tids,
edge_data,
edge_typecounts,
edge_tids,
edge_features,
edge_feat_tids,
......@@ -807,25 +827,27 @@ def read_dataset(rank, world_size, id_lookup, params, schema_map):
world_size,
params.num_parts,
schema_map,
ntype_counts,
)
# Synchronize so that everybody completes reading dataset from disk
dist.barrier()
logging.info(f"[Rank: {rank}] Done reading dataset {params.input_dir}")
dist.barrier() # SYNCH
edge_data = augment_edge_data(
edge_data, id_lookup, edge_tids, rank, world_size, params.num_parts
)
dist.barrier() # SYNCH
logging.info(
f"[Rank: {rank}] Done augmenting edge_data: {len(edge_data)}, {edge_data[constants.GLOBAL_SRC_ID].shape}"
)
return (
node_tids,
node_features,
node_feat_tids,
edge_data,
edge_typecounts,
edge_features,
edge_tids,
edge_feat_tids,
)
......@@ -994,47 +1016,60 @@ def gen_dist_partitions(rank, world_size, params):
f"[Rank: {rank}] Starting distributed data processing pipeline..."
)
memory_snapshot("Pipeline Begin: ", rank)
# init processing
schema_map = read_json(os.path.join(params.input_dir, params.schema))
# Initialize distributed lookup service for partition-id and shuffle-global-nids mappings
# for global-nids
_, global_nid_ranges = get_idranges(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK],
params.num_parts,
)
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
# The resources, which are node-id to partition-id mappings, are split
# into `world_size` number of parts, where each part can be mapped to
# each physical node.
id_lookup = DistLookupService(
os.path.join(params.input_dir, params.partitions_dir),
schema_map[constants.STR_NODE_TYPE],
id_map,
rank,
world_size,
params.num_parts,
)
# get the id to name mappings here.
ntypes_ntypeid_map, ntypes, ntypeid_ntypes_map = get_node_types(schema_map)
etypes_etypeid_map, etypes, etypeid_etypes_map = get_edge_types(schema_map)
logging.info(
f"[Rank: {rank}] Initialized metis partitions and node_types map..."
)
# Initialize distributed lookup service for partition-id and shuffle-global-nids mappings
# for global-nids
_, global_nid_ranges = get_idranges(
schema_map[constants.STR_NODE_TYPE],
get_ntype_counts_map(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_TYPE],
),
)
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
id_lookup.set_idMap(id_map)
# read input graph files and augment these datastructures with
# appropriate information (global_nid and owner process) for node and edge data
(
node_tids,
node_features,
node_feat_tids,
edge_data,
edge_typecounts,
edge_features,
edge_tids,
edge_feat_tids,
) = read_dataset(rank, world_size, id_lookup, params, schema_map)
) = read_dataset(
rank,
world_size,
id_lookup,
params,
schema_map,
get_ntype_counts_map(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_TYPE],
),
)
logging.info(
f"[Rank: {rank}] Done augmenting file input data with auxilary columns"
)
......@@ -1043,8 +1078,19 @@ def gen_dist_partitions(rank, world_size, params):
# send out node and edge data --- and appropriate features.
# this function will also stitch the data recvd from other processes
# and return the aggregated data
ntypes_gnid_range_map = get_gnid_range_map(node_tids)
etypes_geid_range_map = get_gnid_range_map(edge_tids)
# ntypes_gnid_range_map = get_gnid_range_map(node_tids)
# etypes_geid_range_map = get_gnid_range_map(edge_tids)
ntypes_gnid_range_map = get_gid_offsets(
schema_map[constants.STR_NODE_TYPE],
get_ntype_counts_map(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_TYPE],
),
)
etypes_geid_range_map = get_gid_offsets(
schema_map[constants.STR_EDGE_TYPE], edge_typecounts
)
(
node_data,
rcvd_node_features,
......@@ -1211,6 +1257,11 @@ def gen_dist_partitions(rank, world_size, params):
local_node_data,
local_edge_data,
num_edges,
get_ntype_counts_map(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_TYPE],
),
edge_typecounts,
params.save_orig_nids,
params.save_orig_eids,
)
......
......@@ -11,7 +11,12 @@ import pyarrow.parquet as pq
import torch
import torch.distributed as dist
from gloo_wrapper import alltoallv_cpu
from utils import generate_read_list, get_idranges, map_partid_rank
from utils import (
generate_read_list,
get_gid_offsets,
get_idranges,
map_partid_rank,
)
DATA_TYPE_ID = {
......@@ -35,99 +40,77 @@ DATA_TYPE_ID = {
REV_DATA_TYPE_ID = {id: data_type for data_type, id in DATA_TYPE_ID.items()}
def _shuffle_data(data, rank, world_size, tids, num_parts):
"""Each process scatters loaded data to all processes in a group and
return gathered data.
def _broadcast_shape(
data, rank, world_size, num_parts, is_feat_data, feat_name
):
"""Auxiliary function to broadcast the shape of a feature data.
This information is used to figure out the type-ids for the
local features.
Parameters
----------
data: tensor
Loaded node or edge data.
rank: int
Rank of current process.
world_size: int
Total number of processes in group.
tids: list[tuple]
Type-wise node/edge IDs.
num_parts: int
Number of partitions.
Returns
Parameters:
-----------
data : numpy array
which is the feature data read from the disk
rank : integer
which represents the id of the process in the process group
world_size : integer
represents the total no. of processes in the process group
num_parts : integer
specifying the no. of partitions
is_feat_data : bool
flag used to separate feature data and edge data
feat_name : string
name of the feature
Returns:
-------
shuffled_data: tensor
Shuffled node or edge data.
list of tuples :
which represents the range of type-ids for the data array.
"""
# Broadcast basic information of loaded data:
# 1. number of data lines
# 2. data dimension
# 3. data type
assert len(data.shape) in [
1,
2,
], f"Data is expected to be 1-D or 2-D but got {data.shape}."
data_shape = list(data.shape)
if len(data_shape) == 1:
data_shape.append(1)
if is_feat_data:
data_shape.append(DATA_TYPE_ID[data.dtype])
data_shape = torch.tensor(data_shape, dtype=torch.int64)
data_shape = torch.tensor(data_shape, dtype=torch.int64)
data_shape_output = [
torch.zeros_like(data_shape) for _ in range(world_size)
]
dist.all_gather(data_shape_output, data_shape)
logging.info(
f"[Rank: {rank} Received shapes from all ranks: {data_shape_output}"
)
shapes = [x.numpy() for x in data_shape_output if x[0] != 0]
shapes = np.vstack(shapes)
# Rank~0 always succeeds to load non-empty data, so we fetch info from it.
data_dim = data_shape_output[0][1].item()
data_type = REV_DATA_TYPE_ID[data_shape_output[0][2].item()]
data_lines = [data_shape[0].item() for data_shape in data_shape_output]
data_lines.insert(0, 0)
data_lines = np.cumsum(data_lines)
# prepare for scatter
data_list = [None] * world_size
if data.shape[0] > 0:
for local_part_id in range(num_parts):
target_rank = map_partid_rank(local_part_id, world_size)
start, end = tids[local_part_id]
global_start = data_lines[rank]
global_end = data_lines[rank + 1]
if start >= global_end or end <= global_start:
continue
read_start = max(0, start - global_start)
read_end = min(data.shape[0], end - global_start)
if data_list[target_rank] is None:
data_list[target_rank] = []
data_list[target_rank].append(data[read_start:read_end])
data_input = [None] * world_size
for i, data in enumerate(data_list):
if data is None or len(data) == 0:
if data_dim == 1:
data_input[i] = torch.zeros((0,), dtype=data_type)
else:
data_input[i] = torch.zeros((0, data_dim), dtype=data_type)
else:
data_input[i] = torch.cat(data).to(dtype=data_type)
del data_list
gc.collect()
local_data = data_input[rank]
if data_dim == 1:
data_input[rank] = torch.zeros((0,), dtype=data_type)
else:
data_input[rank] = torch.zeros((0, data_dim), dtype=data_type)
if is_feat_data:
logging.info(
f"shapes: {shapes}, condition: {all(shapes[0,2] == s for s in shapes[:,2])}"
)
assert all(
shapes[0, 2] == s for s in shapes[:, 2]
), f"dtypes for {feat_name} does not match on all ranks"
# scatter and gather data
data_output = alltoallv_cpu(rank, world_size, data_input)
data_output[rank] = local_data
data_output = [data for data in data_output if data is not None]
data_output = torch.cat(data_output)
# compute tids here.
type_counts = list(shapes[:, 0])
tid_start = np.cumsum([0] + type_counts[:-1])
tid_end = np.cumsum(type_counts)
tid_ranges = list(zip(tid_start, tid_end))
logging.info(f"starts -> {tid_start} ... end -> {tid_end}")
return data_output
return tid_ranges
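# Minimal sketch of what _broadcast_shape computes (row counts invented):
# if four ranks read feature arrays with 10, 10, 10 and 5 rows respectively,
# the all-gathered shapes yield cumulative type-id ranges
#     [(0, 10), (10, 20), (20, 30), (30, 35)]
# so each rank learns which slice of the type's id space its locally read
# rows cover, without exchanging the feature data itself.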
def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
def get_dataset(
input_dir, graph_name, rank, world_size, num_parts, schema_map, ntype_counts
):
"""
Function to read the multiple file formatted dataset.
......@@ -249,11 +232,18 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
"""
# read my nodes for each node type
"""
node_tids, ntype_gnid_offset = get_idranges(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK],
num_chunks=num_parts,
)
"""
logging.info(f"[Rank: {rank} ntype_counts: {ntype_counts}")
ntype_gnid_offset = get_gid_offsets(
schema_map[constants.STR_NODE_TYPE], ntype_counts
)
logging.info(f"[Rank: {rank} - ntype_gnid_offset = {ntype_gnid_offset}")
# iterate over the "node_data" dictionary in the schema_map
# read the node features if exists
......@@ -290,29 +280,32 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
else:
node_data = np.array([])
node_data = torch.from_numpy(node_data)
# scatter and gather data.
node_data = _shuffle_data(
cur_tids = _broadcast_shape(
node_data,
rank,
world_size,
node_tids[ntype_name],
num_parts,
True,
f"{ntype_name}/{feat_name}",
)
logging.info(f"[Rank: {rank} - cur_tids: {cur_tids}")
# collect data on current rank.
offset = 0
for local_part_id in range(num_parts):
data_key = (
f"{ntype_name}/{feat_name}/{local_part_id//world_size}"
)
if map_partid_rank(local_part_id, world_size) == rank:
nfeat = []
nfeat_tids = []
start, end = node_tids[ntype_name][local_part_id]
nfeat = node_data[offset : offset + end - start]
data_key = f"{ntype_name}/{feat_name}/{local_part_id//world_size}"
node_features[data_key] = nfeat
nfeat_tids.append(node_tids[ntype_name][local_part_id])
node_feature_tids[data_key] = nfeat_tids
offset += end - start
if len(cur_tids) > local_part_id:
start, end = cur_tids[local_part_id]
assert node_data.shape[0] == (
end - start
), f"Node feature data, {data_key}, shape = {node_data.shape} does not match with tids = ({start},{end})"
node_features[data_key] = node_data
node_feature_tids[data_key] = [(start, end)]
else:
node_features[data_key] = None
node_feature_tids[data_key] = [(0, 0)]
# done building node_features locally.
if len(node_features) <= 0:
......@@ -328,10 +321,12 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
# local_part_id indicates the partition-id, in the context of current
# process which take the values 0, 1, 2, ....
for feat_name, feat_info in node_features.items():
if feat_info == None:
continue
logging.info(
f"[Rank: {rank}] node feature name: {feat_name}, feature data shape: {feat_info.size()}"
)
tokens = feat_name.split("/")
assert len(tokens) == 3
......@@ -385,13 +380,6 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
edge_features = {}
edge_feature_tids = {}
# Read edges for each edge type that are processed by the current process.
edge_tids, _ = get_idranges(
schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK],
num_parts,
)
# Iterate over the "edge_data" dictionary in the schema_map.
# Read the edge features if exists.
# Also keep track of the type_eids for which the edge_features are read.
......@@ -416,6 +404,9 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
data_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(data_file):
data_file = os.path.join(input_dir, data_file)
logging.info(
f"[Rank: {rank}] Loading edges-feats of {etype_name}[{feat_name}] from {data_file}"
)
edge_data.append(
array_readwriter.get_array_parser(
**reader_fmt_meta
......@@ -427,28 +418,32 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
edge_data = np.array([])
edge_data = torch.from_numpy(edge_data)
# scatter and gather data.
edge_data = _shuffle_data(
# exchange the amount of data read from the disk.
edge_tids = _broadcast_shape(
edge_data,
rank,
world_size,
edge_tids[etype_name],
num_parts,
True,
f"{etype_name}/{feat_name}",
)
# collect data on current rank.
offset = 0
for local_part_id in range(num_parts):
data_key = (
f"{etype_name}/{feat_name}/{local_part_id//world_size}"
)
if map_partid_rank(local_part_id, world_size) == rank:
efeats = []
efeat_tids = []
start, end = edge_tids[etype_name][local_part_id]
efeats = edge_data[offset : offset + end - start]
efeat_tids.append(edge_tids[etype_name][local_part_id])
data_key = f"{etype_name}/{feat_name}/{local_part_id//world_size}"
edge_features[data_key] = efeats
edge_feature_tids[data_key] = efeat_tids
offset += end - start
if len(edge_tids) > local_part_id:
start, end = edge_tids[local_part_id]
assert edge_data.shape[0] == (
end - start
), f"Edge Feature data, for {data_key}, of shape = {edge_data.shape} does not match with tids = ({start}, {end})"
edge_features[data_key] = edge_data
edge_feature_tids[data_key] = [(start, end)]
else:
edge_features[data_key] = None
edge_feature_tids[data_key] = []
# Done with building edge_features locally.
if len(edge_features) <= 0:
......@@ -459,6 +454,8 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
assert len(edge_features) == len(edge_feature_tids)
for k, v in edge_features.items():
if v == None:
continue
logging.info(
f"[Rank: {rank}] edge feature name: {k}, feature data shape: {v.shape}"
)
......@@ -511,12 +508,9 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
# read my edges for each edge type
etype_names = schema_map[constants.STR_EDGE_TYPE]
etype_name_idmap = {e: idx for idx, e in enumerate(etype_names)}
edge_tids, _ = get_idranges(
schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK],
num_chunks=num_parts,
)
edge_tids = {}
edge_typecounts = {}
edge_datadict = {}
edge_data = schema_map[constants.STR_EDGES]
......@@ -541,21 +535,25 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
dst_ntype_name = tokens[2]
num_chunks = len(edge_info)
read_list = generate_read_list(num_chunks, num_parts)
# read_list = generate_read_list(num_chunks, num_parts)
read_list = generate_read_list(num_chunks, world_size)
src_ids = []
dst_ids = []
"""
curr_partids = []
for part_id in range(num_parts):
if map_partid_rank(part_id, world_size) == rank:
curr_partids.append(read_list[part_id])
for idx in np.concatenate(curr_partids):
"""
for idx in read_list[rank]:
edge_file = edge_info[idx]
if not os.path.isabs(edge_file):
edge_file = os.path.join(input_dir, edge_file)
logging.info(
f"Loading edges of etype[{etype_name}] from {edge_file}"
f"[Rank: {rank}] Loading edges of etype[{etype_name}] from {edge_file}"
)
if (
......@@ -593,30 +591,45 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
f"Unknown edge format {etype_info[constants.STR_FORMAT][constants.STR_NAME]} for edge type {etype_name}"
)
if len(src_ids) > 0:
src_ids = np.concatenate(src_ids)
dst_ids = np.concatenate(dst_ids)
# currently these are just type_edge_ids... which will be converted to global ids
edge_datadict[constants.GLOBAL_SRC_ID].append(
src_ids + ntype_gnid_offset[src_ntype_name][0, 0]
src_ids + ntype_gnid_offset[src_ntype_name][0]
)
edge_datadict[constants.GLOBAL_DST_ID].append(
dst_ids + ntype_gnid_offset[dst_ntype_name][0, 0]
dst_ids + ntype_gnid_offset[dst_ntype_name][0]
)
edge_datadict[constants.ETYPE_ID].append(
etype_name_idmap[etype_name]
* np.ones(shape=(src_ids.shape), dtype=np.int64)
)
else:
src_ids = np.array([])
# broadcast shape to compute the etype_id, and global_eid's later.
cur_tids = _broadcast_shape(
src_ids, rank, world_size, num_parts, False, None
)
edge_typecounts[etype_name] = cur_tids[-1][1]
edge_tids[etype_name] = cur_tids
for local_part_id in range(num_parts):
if map_partid_rank(local_part_id, world_size) == rank:
if len(cur_tids) > local_part_id:
edge_datadict[constants.GLOBAL_TYPE_EID].append(
np.arange(
edge_tids[etype_name][local_part_id][0],
edge_tids[etype_name][local_part_id][1],
cur_tids[local_part_id][0],
cur_tids[local_part_id][1],
dtype=np.int64,
)
)
# edge_tids[etype_name] = [(cur_tids[local_part_id][0], cur_tids[local_part_id][1])]
assert len(edge_datadict[constants.GLOBAL_SRC_ID]) == len(
edge_datadict[constants.GLOBAL_TYPE_EID]
), f"Error while reading edges from the disk, local_part_id = {local_part_id}, num_parts = {num_parts}, world_size = {world_size} cur_tids = {cur_tids}"
# stitch together to create the final data on the local machine
for col in [
......@@ -625,8 +638,10 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
constants.GLOBAL_TYPE_EID,
constants.ETYPE_ID,
]:
if len(edge_datadict[col]) > 0:
edge_datadict[col] = np.concatenate(edge_datadict[col])
if len(edge_datadict[constants.GLOBAL_SRC_ID]) > 0:
assert (
edge_datadict[constants.GLOBAL_SRC_ID].shape
== edge_datadict[constants.GLOBAL_DST_ID].shape
......@@ -642,13 +657,23 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
logging.info(
f"[Rank: {rank}] Done reading edge_file: {len(edge_datadict)}, {edge_datadict[constants.GLOBAL_SRC_ID].shape}"
)
else:
assert edge_datadict[constants.GLOBAL_SRC_ID] == []
assert edge_datadict[constants.GLOBAL_DST_ID] == []
assert edge_datadict[constants.GLOBAL_TYPE_EID] == []
edge_datadict[constants.GLOBAL_SRC_ID] = np.array([], dtype=np.int64)
edge_datadict[constants.GLOBAL_DST_ID] = np.array([], dtype=np.int64)
edge_datadict[constants.GLOBAL_TYPE_EID] = np.array([], dtype=np.int64)
edge_datadict[constants.ETYPE_ID] = np.array([], dtype=np.int64)
logging.info(f"Rank: {rank} edge_feat_tids: {edge_feature_tids}")
return (
node_tids,
node_features,
node_feature_tids,
edge_datadict,
edge_typecounts,
edge_tids,
edge_features,
edge_feature_tids,
......
......@@ -49,9 +49,7 @@ class DistLookupService:
integer representing the no. of partitions
"""
def __init__(
self, input_dir, ntype_names, id_map, rank, world_size, num_parts
):
def __init__(self, input_dir, ntype_names, rank, world_size, num_parts):
assert os.path.isdir(input_dir)
assert ntype_names is not None
assert len(ntype_names) > 0
......@@ -61,6 +59,7 @@ class DistLookupService:
type_nid_end = []
partid_list = []
ntype_count = []
ntypes = []
# Iterate over the node types and extract the partition id mappings.
for ntype in ntype_names:
......@@ -90,6 +89,7 @@ class DistLookupService:
ntype_partids = np.concatenate(ntype_partids)
count = len(ntype_partids)
ntype_count.append(count)
ntypes.append(ntype)
# Each rank assumes a contiguous set of partition-ids which are equally split
# across all the processes.
......@@ -109,16 +109,23 @@ class DistLookupService:
# Explicitly release the array read from the file.
del ntype_partids
logging.info(
f"[Rank: {rank}] ntypeid begin - {type_nid_begin} - {type_nid_end}"
)
# Store all the information in the object instance variable.
self.id_map = id_map
self.type_nid_begin = np.array(type_nid_begin, dtype=np.int64)
self.type_nid_end = np.array(type_nid_end, dtype=np.int64)
self.partid_list = partid_list
self.ntype_count = np.array(ntype_count, dtype=np.int64)
self.ntypes = ntypes
self.rank = rank
self.world_size = world_size
self.num_parts = num_parts
def set_idMap(self, id_map):
self.id_map = id_map
def get_partition_ids(self, agg_global_nids):
"""
This function is used to get the partition-ids for a given set of global node ids
......@@ -302,7 +309,10 @@ class DistLookupService:
agg_partition_ids.append(owner_ids)
# Stitch the list of partition-ids and return to the caller
if len(agg_partition_ids) > 0:
agg_partition_ids = np.concatenate(agg_partition_ids)
else:
agg_partition_ids = np.array([], dtype=np.int64)
assert agg_global_nids.shape[0] == agg_partition_ids.shape[0]
# Now the owner_ids (partition-ids) correspond to the global_nids.
......
......@@ -49,7 +49,12 @@ def post_process(params):
ntypes_ntypeid_map, ntypes, ntid_ntype_map = get_node_types(schema)
type_nid_dict, ntype_gnid_offset = get_idranges(
schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK],
dict(
zip(
schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_TYPE],
)
),
)
outdir = Path(params.partitions_dir)
......
import argparse
import logging
import os
import platform
import sys
from datetime import timedelta
from pathlib import Path
from timeit import default_timer as timer
import array_readwriter
......@@ -13,8 +16,7 @@ import pyarrow
import pyarrow.csv as csv
import pyarrow.parquet as pq
import torch
import torch.distributed as dist
from utils import get_idranges, get_node_types, read_json
from utils import generate_read_list, get_idranges, get_node_types, read_json
def get_proc_info():
......@@ -41,7 +43,7 @@ def get_proc_info():
return 0
def gen_edge_files(schema_map, output):
def gen_edge_files(schema_map, params):
"""Function to create edges files to be consumed by ParMETIS
for partitioning purposes.
......@@ -63,22 +65,23 @@ def gen_edge_files(schema_map, output):
rank = get_proc_info()
type_nid_dict, ntype_gnid_offset = get_idranges(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK],
dict(
zip(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_TYPE],
)
),
)
# Regenerate edge files here.
edge_data = schema_map[constants.STR_EDGES]
etype_names = schema_map[constants.STR_EDGE_TYPE]
etype_name_idmap = {e: idx for idx, e in enumerate(etype_names)}
edge_tids, _ = get_idranges(
schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK],
)
outdir = Path(output)
outdir = Path(params.output_dir)
os.makedirs(outdir, exist_ok=True)
edge_files = []
num_parts = len(schema_map[constants.STR_NUM_EDGES_PER_CHUNK][0])
num_parts = params.num_parts
for etype_name, etype_info in edge_data.items():
edges_format = etype_info[constants.STR_FORMAT][constants.STR_NAME]
edge_data_files = etype_info[constants.STR_DATA]
......@@ -91,16 +94,16 @@ def gen_edge_files(schema_map, output):
rel_name = tokens[1]
dst_ntype_name = tokens[2]
def convert_to_numpy_and_write_back(data_df):
data_f0 = data_df["f0"].to_numpy()
data_f1 = data_df["f1"].to_numpy()
def process_and_write_back(data_df, idx):
data_f0 = data_df[:, 0]
data_f1 = data_df[:, 1]
global_src_id = data_f0 + ntype_gnid_offset[src_ntype_name][0, 0]
global_dst_id = data_f1 + ntype_gnid_offset[dst_ntype_name][0, 0]
cols = [global_src_id, global_dst_id]
col_names = ["global_src_id", "global_dst_id"]
out_file = edge_data_files[rank].split("/")[-1]
out_file = edge_data_files[idx].split("/")[-1]
out_file = os.path.join(outdir, "edges_{}".format(out_file))
# TODO(thvasilo): We should support writing to the same format as the input
......@@ -113,80 +116,22 @@ def gen_edge_files(schema_map, output):
)
return out_file
if edges_format == constants.STR_CSV:
delimiter = etype_info[constants.STR_FORMAT][
constants.STR_FORMAT_DELIMITER
]
data_df = csv.read_csv(
edge_data_files[rank],
read_options=pyarrow.csv.ReadOptions(
autogenerate_column_names=True
),
parse_options=pyarrow.csv.ParseOptions(delimiter=delimiter),
# handle any no. of files case here.
file_idxes = generate_read_list(len(edge_data_files), params.num_parts)
for idx in file_idxes[rank]:
reader_fmt_meta = {
"name": etype_info[constants.STR_FORMAT][constants.STR_NAME]
}
data_df = array_readwriter.get_array_parser(**reader_fmt_meta).read(
edge_data_files[idx]
)
elif edges_format == constants.STR_PARQUET:
data_df = pq.read_table(edge_data_files[rank])
data_df = data_df.rename_columns(["f0", "f1"])
else:
raise NotImplementedError(f"Unknown edge format {edges_format}")
out_file = convert_to_numpy_and_write_back(data_df)
out_file = process_and_write_back(data_df, idx)
edge_files.append(out_file)
return edge_files
def read_node_features(schema_map, tgt_ntype_name, feat_names, input_dir):
"""Helper function to read the node features.
Only node features which are requested are read from the input dataset.
Parameters:
-----------
schema_map : json dictionary
Dictionary created by reading the metadata.json file for the input dataset.
tgt_ntype_name : string
node-type name, for which node features will be read from the input dataset.
feat_names : set
A set of strings, feature names, which will be read for a given node type.
input_dir : str
The input directory where the dataset is located.
Returns:
--------
dictionary :
A dictionary where key is the feature-name and value is the numpy array.
"""
rank = get_proc_info()
node_features = {}
if constants.STR_NODE_DATA in schema_map:
dataset_features = schema_map[constants.STR_NODE_DATA]
if dataset_features and (len(dataset_features) > 0):
for ntype_name, ntype_feature_data in dataset_features.items():
if ntype_name != tgt_ntype_name:
continue
# ntype_feature_data is a dictionary
# where key: feature_name, value: dictionary in which keys are "format", "data".
for feat_name, feat_data in ntype_feature_data.items():
if feat_name in feat_names:
feat_data_fname = feat_data[constants.STR_DATA][rank]
if not os.path.isabs(feat_data_fname):
feat_data_fname = os.path.join(
input_dir, feat_data_fname
)
logging.info(f"Reading: {feat_data_fname}")
file_suffix = Path(feat_data_fname).suffix
reader_fmt_meta = {"name": file_suffix[1:]}
node_features[
feat_name
] = array_readwriter.get_array_parser(
**reader_fmt_meta
).read(
feat_data_fname
)
return node_features
def gen_node_weights_files(schema_map, input_dir, output):
def gen_node_weights_files(schema_map, params):
"""Function to create node weight files for ParMETIS along with the edge files.
This function generates node-data files, which will be read by the ParMETIS
......@@ -205,8 +150,6 @@ def gen_node_weights_files(schema_map, input_dir, output):
-----------
schema_map : json dictionary
Dictionary created by reading the metadata.json file for the input dataset.
input_dir : str
The input directory where the dataset is located.
output : string
Location of storing the node-weights and edge files for ParMETIS.
......@@ -221,28 +164,45 @@ def gen_node_weights_files(schema_map, input_dir, output):
ntypes_ntypeid_map, ntypes, ntid_ntype_map = get_node_types(schema_map)
type_nid_dict, ntype_gnid_offset = get_idranges(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK],
dict(
zip(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_TYPE],
)
),
)
node_files = []
outdir = Path(output)
outdir = Path(params.output_dir)
os.makedirs(outdir, exist_ok=True)
for ntype_id, ntype_name in ntid_ntype_map.items():
type_start, type_end = (
type_nid_dict[ntype_name][rank][0],
type_nid_dict[ntype_name][rank][1],
# This ntype does not have any train/test/val masks...
# Each rank will generate equal no. of rows for this node type.
total_count = schema_map[constants.STR_NUM_NODES_PER_TYPE][ntype_id]
per_rank_range = np.ones((params.num_parts,), dtype=np.int64) * (
total_count // params.num_parts
)
count = type_end - type_start
sz = (count,)
for i in range(total_count % params.num_parts):
per_rank_range[i] += 1
tid_start = np.cumsum([0] + list(per_rank_range[:-1]))
tid_end = np.cumsum(list(per_rank_range))
local_tid_start = tid_start[rank]
local_tid_end = tid_end[rank]
sz = local_tid_end - local_tid_start
cols = []
col_names = []
# ntype-id
cols.append(
pyarrow.array(np.ones(sz, dtype=np.int64) * np.int64(ntype_id))
)
col_names.append("ntype")
# one-hot vector for ntype-id here.
for i in range(len(ntypes)):
if i == ntype_id:
cols.append(pyarrow.array(np.ones(sz, dtype=np.int64)))
......@@ -250,23 +210,10 @@ def gen_node_weights_files(schema_map, input_dir, output):
cols.append(pyarrow.array(np.zeros(sz, dtype=np.int64)))
col_names.append("w{}".format(i))
# Add train/test/validation masks if present. node-degree will be added when this file
# is read by ParMETIS to mimic the exisiting single process pipeline present in dgl.
node_feats = read_node_features(
schema_map,
ntype_name,
set(["train_mask", "val_mask", "test_mask"]),
input_dir,
)
for k, v in node_feats.items():
assert sz == v.shape
cols.append(pyarrow.array(v))
col_names.append(k)
# `type_nid` should be the very last column in the node weights files.
cols.append(
pyarrow.array(
np.arange(count, dtype=np.int64) + np.int64(type_start)
np.arange(local_tid_start, local_tid_end, dtype=np.int64)
)
)
col_names.append("type_nid")
......@@ -282,8 +229,8 @@ def gen_node_weights_files(schema_map, input_dir, output):
)
node_files.append(
(
ntype_gnid_offset[ntype_name][0, 0] + type_start,
ntype_gnid_offset[ntype_name][0, 0] + type_end,
ntype_gnid_offset[ntype_name][0, 0] + local_tid_start,
ntype_gnid_offset[ntype_name][0, 0] + local_tid_end,
out_file,
)
)
......@@ -311,13 +258,16 @@ def gen_parmetis_input_args(params, schema_map):
Dictionary object created after reading the graph metadata.json file.
"""
num_nodes_per_chunk = schema_map[constants.STR_NUM_NODES_PER_CHUNK]
# TODO: This makes the assumption that all node files have the same number of chunks
num_node_parts = len(num_nodes_per_chunk[0])
ntypes_ntypeid_map, ntypes, ntid_ntype_map = get_node_types(schema_map)
type_nid_dict, ntype_gnid_offset = get_idranges(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK],
dict(
zip(
schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_TYPE],
)
),
)
# Check if <graph-name>_stats.txt exists, if not create one using metadata.
......@@ -331,27 +281,11 @@ def gen_parmetis_input_args(params, schema_map):
), "Graph name is not present in the json file"
graph_name = schema_map[constants.STR_GRAPH_NAME]
if not os.path.isfile(f"{graph_name}_stats.txt"):
num_nodes = np.sum(
np.concatenate(schema_map[constants.STR_NUM_NODES_PER_CHUNK])
)
num_edges = np.sum(
np.concatenate(schema_map[constants.STR_NUM_EDGES_PER_CHUNK])
)
num_nodes = np.sum(schema_map[constants.STR_NUM_NODES_PER_TYPE])
num_edges = np.sum(schema_map[constants.STR_NUM_EDGES_PER_TYPE])
num_ntypes = len(schema_map[constants.STR_NODE_TYPE])
train_mask = test_mask = val_mask = 0
node_feats = schema_map[constants.STR_NODE_DATA]
for ntype, ntype_data in node_feats.items():
if "train_mask" in ntype_data:
train_mask += 1
if "test_mask" in ntype_data:
test_mask += 1
if "val_mask" in ntype_data:
val_mask += 1
train_mask = train_mask // num_ntypes
test_mask = test_mask // num_ntypes
val_mask = val_mask // num_ntypes
num_constraints = num_ntypes + train_mask + test_mask + val_mask
num_constraints = num_ntypes
with open(f"{graph_name}_stats.txt", "w") as sf:
sf.write(f"{num_nodes} {num_edges} {num_constraints}")
......@@ -361,19 +295,28 @@ def gen_parmetis_input_args(params, schema_map):
os.makedirs(outdir, exist_ok=True)
for ntype_id, ntype_name in ntid_ntype_map.items():
global_nid_offset = ntype_gnid_offset[ntype_name][0, 0]
for r in range(num_node_parts):
type_start, type_end = (
type_nid_dict[ntype_name][r][0],
type_nid_dict[ntype_name][r][1],
total_count = schema_map[constants.STR_NUM_NODES_PER_TYPE][ntype_id]
per_rank_range = np.ones((params.num_parts,), dtype=np.int64) * (
total_count // params.num_parts
)
for i in range(total_count % params.num_parts):
per_rank_range[i] += 1
tid_start = np.cumsum([0] + list(per_rank_range[:-1]))
tid_end = np.cumsum(per_rank_range)
logging.info(f" tid-start = {tid_start}, tid-end = {tid_end}")
logging.info(f" per_rank_range - {per_rank_range}")
for rank in range(params.num_parts):
local_tid_start = tid_start[rank]
local_tid_end = tid_end[rank]
out_file = os.path.join(
outdir, "node_weights_{}_{}.txt".format(ntype_name, r)
outdir, "node_weights_{}_{}.txt".format(ntype_name, rank)
)
node_files.append(
(
out_file,
global_nid_offset + type_start,
global_nid_offset + type_end,
global_nid_offset + local_tid_start,
global_nid_offset + local_tid_end,
)
)
......@@ -409,15 +352,12 @@ def run_preprocess_data(params):
An instance of argparser class which stores command line arguments.
"""
logging.info(f"Starting to generate ParMETIS files...")
rank = get_proc_info()
schema_map = read_json(params.schema_file)
num_nodes_per_chunk = schema_map[constants.STR_NUM_NODES_PER_CHUNK]
num_parts = len(num_nodes_per_chunk[0])
gen_node_weights_files(schema_map, params.input_dir, params.output_dir)
gen_node_weights_files(schema_map, params)
logging.info(f"Done with node weights....")
gen_edge_files(schema_map, params.output_dir)
gen_edge_files(schema_map, params)
logging.info(f"Done with edge weights...")
if rank == 0:
......@@ -444,8 +384,9 @@ if __name__ == "__main__":
)
parser.add_argument(
"--input_dir",
required=True,
type=str,
help="The input directory where the dataset is located",
help="This directory will be used as the relative directory to locate files, if absolute paths are not used",
)
parser.add_argument(
"--output_dir",
......@@ -453,7 +394,20 @@ if __name__ == "__main__":
type=str,
help="The output directory for the node weights files and auxiliary files for ParMETIS.",
)
parser.add_argument(
"--num_parts",
required=True,
type=int,
help="Total no. of output graph partitions.",
)
params = parser.parse_args()
# Configure logging.
logging.basicConfig(
level="INFO",
format=f"[{platform.node()} \
%(levelname)s %(asctime)s PID:%(process)d] %(message)s",
)
# Invoke the function to generate files for parmetis
run_preprocess_data(params)
......@@ -169,6 +169,52 @@ def get_node_types(schema_map):
return ntype_ntypeid_map, ntypes, ntypeid_ntype_map
def get_gid_offsets(typenames, typecounts):
"""
Builds a map where the key-value pairs are typenames and their respective
global-id offsets.
Parameters:
-----------
typenames : list of strings
a list of strings which can be either node typenames or edge typenames
typecounts : dict
a dictionary mapping each typename to the total number of nodes/edges
of that type
Returns:
--------
dictionary :
a dictionary where keys are the type names and values are the
corresponding global-id ranges as [start, end] pairs
"""
assert len(typenames) == len(
typecounts
), f"No. of typenames does not match with its type counts names = {typenames}, counts = {typecounts}"
counts = []
for name in typenames:
counts.append(typecounts[name])
starts = np.cumsum([0] + counts[:-1])
ends = np.cumsum(counts)
gid_offsets = {}
for idx, name in enumerate(typenames):
gid_offsets[name] = [starts[idx], ends[idx]]
return gid_offsets
"""
starts = np.cumsum([0] + type_counts[:-1])
ends = np.cumsum(type_counts)
gid_offsets = {}
for idx, name in enumerate(typenames):
gid_offsets[name] = [start[idx], ends[idx]]
return gid_offsets
"""
def get_gnid_range_map(node_tids):
"""
Retrieves auxiliary dictionaries from the metadata json object
......@@ -312,8 +358,6 @@ def augment_edge_data(
etype_offset = {}
offset = 0
for etype_name, tid_range in edge_tids.items():
assert int(tid_range[0][0]) == 0
assert len(tid_range) == num_parts
etype_offset[etype_name] = offset + int(tid_range[0][0])
offset += int(tid_range[-1][1])
......@@ -321,14 +365,19 @@ def augment_edge_data(
for etype_name, tid_range in edge_tids.items():
for idx in range(num_parts):
if map_partid_rank(idx, world_size) == rank:
if len(tid_range) > idx:
global_eid_start = etype_offset[etype_name]
begin = global_eid_start + int(tid_range[idx][0])
end = global_eid_start + int(tid_range[idx][1])
global_eids.append(np.arange(begin, end, dtype=np.int64))
global_eids = np.concatenate(global_eids)
global_eids = (
np.concatenate(global_eids)
if len(global_eids) > 0
else np.array([], dtype=np.int64)
)
assert global_eids.shape[0] == edge_data[constants.ETYPE_ID].shape[0]
edge_data[constants.GLOBAL_EID] = global_eids
return edge_data
......@@ -514,19 +563,18 @@ def write_dgl_objects(
def get_idranges(names, counts, num_chunks=None):
"""
Utility function to compute type_id/global_id ranges for both nodes and edges.
counts is a dictionary mapping each type name to its total count.
Parameters:
-----------
names : list of strings
list of node/edge types as strings
counts : list of lists
each list contains no. of nodes/edges in a given chunk
which are either node-types or edge-types
counts : dict
a dictionary mapping each node or edge type name to the total no. of
nodes or edges of that type
num_chunks : int, optional
In distributed partition pipeline, ID ranges are grouped into chunks.
In some scenarios, we'd like to merge ID ranges into specific number
of chunks. This parameter indicates the expected number of chunks.
If not specified, no merge is applied.
specifying the no. of chunks
Returns:
--------
......@@ -542,38 +590,35 @@ def get_idranges(names, counts, num_chunks=None):
gnid_end = gnid_start
tid_dict = {}
gid_dict = {}
orig_num_chunks = 0
for idx, typename in enumerate(names):
type_counts = counts[idx]
tid_start = np.cumsum([0] + type_counts[:-1])
tid_end = np.cumsum(type_counts)
tid_ranges = list(zip(tid_start, tid_end))
gnid_end += tid_ranges[-1][1]
tid_dict[typename] = tid_ranges
for idx, typename in enumerate(names):
gnid_end += counts[typename]
tid_dict[typename] = [[0, counts[typename]]]
gid_dict[typename] = np.array([gnid_start, gnid_end]).reshape([1, 2])
gnid_start = gnid_end
orig_num_chunks = len(tid_start)
if num_chunks is None:
return tid_dict, gid_dict
assert (
num_chunks <= orig_num_chunks
), "Specified number of chunks should be less/euqual than original numbers of ID ranges."
chunk_list = np.array_split(np.arange(orig_num_chunks), num_chunks)
for typename in tid_dict:
orig_tid_ranges = tid_dict[typename]
tid_ranges = []
for idx in chunk_list:
tid_ranges.append(
(orig_tid_ranges[idx[0]][0], orig_tid_ranges[idx[-1]][-1])
)
tid_dict[typename] = tid_ranges
return tid_dict, gid_dict
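# Usage sketch of the reworked get_idranges (hypothetical counts), assuming
# the global-id space starts at zero and num_chunks is left as None:
#   tid_dict, gid_dict = get_idranges(
#       ["author", "paper"], {"author": 3, "paper": 5}
#   )
#   tid_dict == {"author": [[0, 3]], "paper": [[0, 5]]}
#   gid_dict == {"author": array([[0, 3]]), "paper": array([[3, 8]])}
# i.e. a single per-type range instead of one range per chunk.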
def get_ntype_counts_map(ntypes, ntype_counts):
"""
Return a dictionary whose keys are node type names and whose values are the
total no. of nodes of each type in the input graph.
Parameters:
-----------
ntypes : list of strings
where each string is a node-type name
ntype_counts : list of integers
where each integer is the total no. of nodes for the node type at the same index
Returns:
--------
dictionary :
a dictionary where node-type names are keys and the values are the
total no. of nodes for the corresponding node type
"""
return dict(zip(ntypes, ntype_counts))
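# Usage sketch (hypothetical values):
#   get_ntype_counts_map(["author", "paper"], [3, 5])
#   == {"author": 3, "paper": 5}
# which is the dictionary form now expected by get_idranges and get_gid_offsets.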
def memory_snapshot(tag, rank):
......