Unverified Commit 1ab0170a authored by Theodore Vasiloudis, committed by GitHub

[Distributed] Ensure round-robin edge file downloads, reduce logging, other improvements. (#5578)


Co-authored-by: Rhett Ying <85214957+Rhett-Ying@users.noreply.github.com>
parent e6eefd1a
"""Functions for partitions. """
import json
import logging
import os
import time
......@@ -191,13 +192,15 @@ def load_partition(part_config, part_id, load_feats=True):
"part_graph" in part_files
), "the partition does not contain graph structure."
partition_path = relative_to_config(part_files["part_graph"])
print(
f"Start to load partition from {partition_path} which is "
f"{os.path.getsize(partition_path)} bytes. It may take non-trivial "
"time for large partition."
logging.info(
"Start to load partition from %s which is "
"%d bytes. It may take non-trivial "
"time for large partition.",
partition_path,
os.path.getsize(partition_path),
)
graph = load_graphs(partition_path)[0][0]
print("Finished loading partition.")
logging.info("Finished loading partition.")
assert (
NID in graph.ndata
......@@ -302,21 +305,23 @@ def load_partition_feats(
node_feats = None
if load_nodes:
feat_path = relative_to_config(part_files["node_feats"])
print(
f"Start to load node data from {feat_path} which is "
f"{os.path.getsize(feat_path)} bytes."
logging.debug(
"Start to load node data from %s which is " "%d bytes.",
feat_path,
os.path.getsize(feat_path),
)
node_feats = load_tensors(feat_path)
print("Finished loading node data.")
logging.info("Finished loading node data.")
edge_feats = None
if load_edges:
feat_path = relative_to_config(part_files["edge_feats"])
print(
f"Start to load edge data from {feat_path} which is "
f"{os.path.getsize(feat_path)} bytes."
logging.debug(
"Start to load edge data from %s which is " "%d bytes.",
feat_path,
os.path.getsize(feat_path),
)
edge_feats = load_tensors(feat_path)
print("Finished loading edge data.")
logging.info("Finished loading edge data.")
# In the old format, the feature name doesn't contain node/edge type.
# For compatibility, let's add node/edge types to the feature names.
if node_feats is not None:
......
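# Illustrative sketch (not part of the patch): the print() -> logging
# conversions above pass %-style arguments so the message is only interpolated
# when the record passes the logger's level check, whereas an f-string is
# always built before logging is even called.
import logging

logging.basicConfig(level=logging.WARNING)
path, size = "part0/graph.dgl", 123456789  # hypothetical values

logging.debug("Start to load node data from %s which is %d bytes.", path, size)  # suppressed, never formatted
logging.debug(f"Start to load node data from {path} which is {size} bytes.")  # string built, then dropped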
......@@ -588,6 +588,6 @@ def create_chunked_dataset(
vector_rows=vector_rows,
**kwargs,
)
print("Done with creating chunked graph")
logging.debug("Done with creating chunked graph")
return g
......@@ -6,67 +6,11 @@ import numpy as np
import pytest
from distpartitioning import array_readwriter, constants
from distpartitioning.parmetis_preprocess import gen_edge_files
from distpartitioning.utils import generate_read_list
from distpartitioning.utils import generate_roundrobin_read_list
from numpy.testing import assert_array_equal
def _get_edge_files(schema_map, rank, num_parts):
"""Returns the edge files processed by each rank.
This function returns only the file names, which are
expected to be created by the function (gen_edge_files())
from the tools/distpartitioning/parmetis_preproc.py module.
Parameters:
----------
schema_map : dict, json
dictionary object created by reading the input graph's
metadata.json file
rank : integer
specifying the rank of the process
num_parts : integer
no. of partitions for the input graph
Returns:
--------
list, str :
specifying the edge file names
list, tuples :
each tuple containing edge file type and delimiter used in the
corresponding edge file
list, str :
specifying the edge type for each of the edge files
"""
edge_data = schema_map[constants.STR_EDGES]
edge_files = [] # used for file names
meta_files = [] # used for storing file types and delimiter
edge_types = [] # used for storing the edge type name
# Iterate over the `edges` key in the input metadata
# its value is a dictionary whose keys are edge names
# and value is a dictionary as well.
for etype_name, etype_info in edge_data.items():
# Get the list of files for this edge type
edge_data_files = etype_info[constants.STR_DATA]
# Get the file type, 'csv' or 'parquet'
edges_format = etype_info[constants.STR_FORMAT][constants.STR_NAME]
# Delimiter used for the edge files
edges_delimiter = None
if edges_format == constants.STR_CSV:
edges_delimiter = etype_info[constants.STR_FORMAT][
constants.STR_FORMAT_DELIMITER
]
# Split the files among the no. of workers
file_idxes = generate_read_list(len(edge_data_files), num_parts)
for idx in file_idxes[rank]:
edge_files.append(edge_data_files[idx])
meta_files.append((edges_format, edges_delimiter))
edge_types.append(etype_name)
# Return the edge file names, format information and file types
return edge_files, meta_files, edge_types
NODE_TYPE = "n1"
EDGE_TYPE = f"{NODE_TYPE}:e1:{NODE_TYPE}"
def _read_file(fname, fmt_name, fmt_delimiter):
......@@ -90,7 +34,7 @@ def _read_file(fname, fmt_name, fmt_delimiter):
return data_df
def _get_test_data(edges_dir, num_chunks, edge_fmt="csv", edge_fmt_del=" "):
def _get_test_data(edges_dir, num_chunks, edge_fmt, edge_fmt_del):
"""Creates unit test input which are a set of edge files
in the following format "src_node_id<delimiter>dst_node_id"
......@@ -113,20 +57,21 @@ def _get_test_data(edges_dir, num_chunks, edge_fmt="csv", edge_fmt_del=" "):
"""
schema = {}
schema["num_nodes_per_type"] = [10]
schema["edge_type"] = ["n1:e1:n1"]
schema["node_type"] = ["n1"]
schema["edge_type"] = [EDGE_TYPE]
schema["node_type"] = [NODE_TYPE]
edges = {}
edges["n1:e1:n1"] = {}
edges["n1:e1:n1"]["format"] = {}
edges["n1:e1:n1"]["format"]["name"] = edge_fmt
edges["n1:e1:n1"]["format"]["delimiter"] = edge_fmt_del
edges[EDGE_TYPE] = {}
edges[EDGE_TYPE]["format"] = {}
edges[EDGE_TYPE]["format"]["name"] = edge_fmt
edges[EDGE_TYPE]["format"]["delimiter"] = edge_fmt_del
os.makedirs(edges_dir, exist_ok=True)
fmt_meta = {"name": edge_fmt}
if edge_fmt == "csv":
fmt_meta["delimiter"] = edge_fmt_del
edge_files = []
for idx in range(num_chunks):
path = os.path.join(edges_dir, f"test_file_{idx}.{fmt_meta['name']}")
array_parser = array_readwriter.get_array_parser(**fmt_meta)
......@@ -135,8 +80,9 @@ def _get_test_data(edges_dir, num_chunks, edge_fmt="csv", edge_fmt_del=" "):
)
array_parser.write(path, edge_data)
edge_files = [path]
edges["n1:e1:n1"]["data"] = edge_files
edge_files.append(path)
edges[EDGE_TYPE]["data"] = edge_files
schema["edges"] = edges
return schema
......@@ -163,79 +109,69 @@ def test_gen_edge_files(num_chunks, num_parts, edges_fmt, edges_delimiter):
# Create the input dataset
with tempfile.TemporaryDirectory() as root_dir:
# Prepare the state information for firing unit test
# Create expected environment for test
input_dir = os.path.join(root_dir, "chunked-data")
output_dir = os.path.join(root_dir, "preproc_dir")
# Get the parser object
# Mock a parser object
fn_params = namedtuple("fn_params", "input_dir output_dir num_parts")
fn_params.input_dir = input_dir
fn_params.output_dir = output_dir
fn_params.num_parts = num_parts
# Read the input schema
# Create test files and get corresponding file schema
schema_map = _get_test_data(
input_dir, num_chunks, edges_fmt, edges_delimiter
)
edges_file_list = schema_map["edges"][EDGE_TYPE]["data"]
# This breaks encapsulation, but there is no other good way to get the file list
rank_assignments = generate_roundrobin_read_list(
len(edges_file_list), num_parts
)
# Get the global node id offsets for each node type
# There is only one node-type in the test graph
# which range from 0 thru 9.
ntype_gnid_offset = {}
ntype_gnid_offset["n1"] = np.array([0, 10 * num_chunks]).reshape(1, 2)
ntype_gnid_offset[NODE_TYPE] = np.array([0, 10 * num_chunks]).reshape(
1, 2
)
# Iterate over no. of partitions
for rank in range(num_parts):
# Fire the unit test case and get the results
actual_results = gen_edge_files(rank, schema_map, fn_params)
# Get the gold results for baseline comparision, expected results
baseline_results, fmt_results, edge_types = _get_edge_files(
schema_map, rank, num_parts
)
# Get the original files
original_files = [
edges_file_list[file_idx] for file_idx in rank_assignments[rank]
]
# Validate the results with the baseline results
# Test 1. no. of files should have the same count per rank
assert len(baseline_results) == len(actual_results)
assert len(original_files) == len(actual_results)
assert len(actual_results) > 0
# Test 2. Check the contents of each file and verify the
# file contents match with the expected results.
for idx, fname in enumerate(actual_results):
# Check the file exists
assert os.path.isfile(fname)
edge_file = fname.split("/")[-1]
# ``edgetype`` strings are in canonical format,
# src_node_type:edge_type:dst_node_type
tokens = edge_types[idx].split(":")
src_ntype_name = tokens[0]
dst_ntype_name = tokens[2]
for actual_fname, original_fname in zip(
actual_results, original_files
):
# Check the actual file exists
assert os.path.isfile(actual_fname)
# Read both files and compare the edges
# Here note that the src and dst end points are global_node_ids
target_file = os.path.join(output_dir, f"{edge_file}")
target_data = _read_file(target_file, "csv", " ")
actual_data = _read_file(actual_fname, "csv", " ")
expected_data = _read_file(
original_fname, edges_fmt, edges_delimiter
)
# Subtract the global node id offsets, so that we get type node ids
# In the current unit test, the graph has only one node type,
# which means the type node ids are the same as the global node ids.
# The two lines below only take effect when the graph has
# more than one node type.
target_data[:, 0] -= ntype_gnid_offset[src_ntype_name][0, 0]
target_data[:, 1] -= ntype_gnid_offset[dst_ntype_name][0, 0]
# Now compare with the edge files from the input dataset
fmt_type = fmt_results[idx][0]
fmt_delimiter = fmt_results[idx][1]
# Extract the source file name here.
# it should have a prefix `edges_`<edge_file>
source_file = os.path.join(
input_dir, "".join(edge_file.split("edges_"))
)
source_data = _read_file(source_file, fmt_type, fmt_delimiter)
actual_data[:, 0] -= ntype_gnid_offset[NODE_TYPE][0, 0]
actual_data[:, 1] -= ntype_gnid_offset[NODE_TYPE][0, 0]
# Verify that the contents are equal
assert np.all(target_data == source_data)
assert_array_equal(expected_data, actual_data)
......@@ -130,9 +130,11 @@ def main():
)
parser.add_argument(
"--log-level",
required=False,
type=str,
default="info",
help="To enable log level for debugging purposes. Available options: (Critical, Error, Warning, Info, Debug, Notset)",
help="Log level to use for execution.",
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
)
parser.add_argument(
"--python-path",
......@@ -167,19 +169,21 @@ def main():
"from high to low is ``coo``, ``csc``, ``csr``.",
)
args, udf_command = parser.parse_known_args()
args, _ = parser.parse_known_args()
fmt = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(
format=fmt,
level=getattr(logging, args.log_level, None),
)
assert os.path.isdir(args.in_dir)
assert os.path.isdir(args.partitions_dir)
assert os.path.isfile(args.ip_config)
assert isinstance(args.log_level, str)
assert isinstance(args.master_port, int)
tokens = sys.executable.split(os.sep)
submit_jobs(args)
if __name__ == "__main__":
fmt = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(format=fmt, level=logging.INFO)
main()
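# Illustrative sketch (not part of the patch): the uppercase --log-level
# choices map directly onto the stdlib logging constants via getattr, which is
# what level=getattr(logging, args.log_level, None) above relies on.
import logging

for name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"):
    assert getattr(logging, name) == logging.getLevelName(name)
# e.g. getattr(logging, "INFO") == logging.INFO == 20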
......@@ -13,7 +13,7 @@ class CSVArrayParser(object):
self.delimiter = delimiter
def read(self, path):
logging.info(
logging.debug(
"Reading from %s using CSV format with configuration %s"
% (path, self.__dict__)
)
......@@ -23,11 +23,11 @@ class CSVArrayParser(object):
arr = pyarrow.csv.read_csv(
path, read_options=read_options, parse_options=parse_options
)
logging.info("Done reading from %s" % path)
logging.debug("Done reading from %s" % path)
return arr.to_pandas().to_numpy()
def write(self, path, arr):
logging.info(
logging.debug(
"Writing to %s using CSV format with configuration %s"
% (path, self.__dict__)
)
......@@ -36,4 +36,4 @@ class CSVArrayParser(object):
)
arr = pyarrow.Table.from_pandas(pd.DataFrame(arr))
pyarrow.csv.write_csv(arr, path, write_options=write_options)
logging.info("Done writing to %s" % path)
logging.debug("Done writing to %s" % path)
......@@ -12,15 +12,15 @@ class NumpyArrayParser(object):
pass
def read(self, path):
logging.info("Reading from %s using numpy format" % path)
logging.debug("Reading from %s using numpy format" % path)
arr = np.load(path, mmap_mode="r")
logging.info("Done reading from %s" % path)
logging.debug("Done reading from %s" % path)
return arr
def write(self, path, arr):
logging.info("Writing to %s using numpy format" % path)
logging.debug("Writing to %s using numpy format" % path)
# np.save would load the entire memmap array up into CPU. So we manually open
# an empty npy file with memmap mode and manually flush it instead.
new_arr = open_memmap(path, mode="w+", dtype=arr.dtype, shape=arr.shape)
new_arr[:] = arr[:]
logging.info("Done writing to %s" % path)
logging.debug("Done writing to %s" % path)
......@@ -14,7 +14,7 @@ class ParquetArrayParser(object):
pass
def read(self, path):
logging.info("Reading from %s using parquet format" % path)
logging.debug("Reading from %s using parquet format" % path)
metadata = pyarrow.parquet.read_metadata(path)
metadata = metadata.schema.to_arrow_schema().metadata
......@@ -36,16 +36,16 @@ class ParquetArrayParser(object):
else:
arr = table.to_pandas().to_numpy()
if not shape:
logging.warning(
logging.debug(
"Shape information not found in the metadata, read the data as "
"a 2 dim array."
)
logging.info("Done reading from %s" % path)
logging.debug("Done reading from %s" % path)
shape = tuple(eval(shape.decode())) if shape else arr.shape
return arr.reshape(shape)
def write(self, path, array, vector_rows=False):
logging.info("Writing to %s using parquet format" % path)
logging.debug("Writing to %s using parquet format" % path)
shape = array.shape
if len(shape) > 2:
array = array.reshape(shape[0], -1)
......@@ -53,10 +53,10 @@ class ParquetArrayParser(object):
table = pyarrow.table(
[pyarrow.array(array.tolist())], names=["vector"]
)
logging.info("Writing to %s using single-vector rows..." % path)
logging.debug("Writing to %s using single-vector rows..." % path)
else:
table = pyarrow.Table.from_pandas(pd.DataFrame(array))
table = table.replace_schema_metadata({"shape": str(shape)})
pyarrow.parquet.write_table(table, path)
logging.info("Done writing to %s" % path)
logging.debug("Done writing to %s" % path)
......@@ -23,7 +23,7 @@ def log_params(params):
print("No. partitions: ", params.num_parts)
print("Output Dir: ", params.output)
print("WorldSize: ", params.world_size)
print("Metis partitions: ", params.partitions_file)
print("Metis partitions: ", params.partitions_dir)
if __name__ == "__main__":
......
......@@ -204,7 +204,7 @@ def exchange_edge_data(rank, world_size, num_parts, edge_data, id_lookup):
dist.barrier()
# Prepare data for each rank in the cluster.
start = timer()
timer_start = timer()
CHUNK_SIZE = 100 * 1000 * 1000  # 100M edges * 8 bytes * 5 arrays = 4 GB/message/node
num_edges = edge_data[constants.GLOBAL_SRC_ID].shape[0]
......@@ -219,7 +219,7 @@ def exchange_edge_data(rank, world_size, num_parts, edge_data, id_lookup):
LOCAL_CHUNK_SIZE = (num_edges // num_chunks) + (
0 if (num_edges % num_chunks == 0) else 1
)
logging.info(
logging.debug(
f"[Rank: {rank} Edge Data Shuffle - max_edges: {max_edges}, \
local_edges: {num_edges} and num_chunks: {num_chunks} \
Total edges: {all_edges} Local_CHUNK_SIZE: {LOCAL_CHUNK_SIZE}"
......@@ -233,18 +233,24 @@ def exchange_edge_data(rank, world_size, num_parts, edge_data, id_lookup):
local_eids = []
for chunk in range(num_chunks):
start = chunk * LOCAL_CHUNK_SIZE
end = (chunk + 1) * LOCAL_CHUNK_SIZE
chunk_start = chunk * LOCAL_CHUNK_SIZE
chunk_end = (chunk + 1) * LOCAL_CHUNK_SIZE
logging.info(
logging.debug(
f"[Rank: {rank}] EdgeData Shuffle: processing \
local_part_id: {local_part_id} and chunkid: {chunk}"
)
cur_src_id = edge_data[constants.GLOBAL_SRC_ID][start:end]
cur_dst_id = edge_data[constants.GLOBAL_DST_ID][start:end]
cur_type_eid = edge_data[constants.GLOBAL_TYPE_EID][start:end]
cur_etype_id = edge_data[constants.ETYPE_ID][start:end]
cur_eid = edge_data[constants.GLOBAL_EID][start:end]
cur_src_id = edge_data[constants.GLOBAL_SRC_ID][
chunk_start:chunk_end
]
cur_dst_id = edge_data[constants.GLOBAL_DST_ID][
chunk_start:chunk_end
]
cur_type_eid = edge_data[constants.GLOBAL_TYPE_EID][
chunk_start:chunk_end
]
cur_etype_id = edge_data[constants.ETYPE_ID][chunk_start:chunk_end]
cur_eid = edge_data[constants.GLOBAL_EID][chunk_start:chunk_end]
input_list = []
owner_ids = id_lookup.get_partition_ids(cur_dst_id)
......@@ -307,9 +313,9 @@ def exchange_edge_data(rank, world_size, num_parts, edge_data, id_lookup):
shuffle_edge_total = np.sum(shuffle_edge_counts)
assert shuffle_edge_total == all_edges
end = timer()
timer_end = timer()
logging.info(
f"[Rank: {rank}] Time to send/rcv edge data: {timedelta(seconds=end-start)}"
f"[Rank: {rank}] Time to send/rcv edge data: {timedelta(seconds=timer_end-timer_start)}"
)
# Clean up.
......@@ -406,7 +412,7 @@ def exchange_feature(
assert len(tokens) == 3
local_feat_key = "/".join(tokens[:-1]) + "/" + str(local_part_id)
logging.info(
logging.debug(
f"[Rank: {rank} feature: {feat_key}, gid_start - {gid_start} and gid_end - {gid_end}"
)
......@@ -423,7 +429,7 @@ def exchange_feature(
assert data is not None
global_eids = np.arange(gid_start, gid_end, dtype=np.int64)
if data[constants.GLOBAL_EID].shape[0] > 0:
logging.info(
logging.debug(
f"[Rank: {rank} disk read global eids - min - {np.amin(data[constants.GLOBAL_EID])}, max - {np.amax(data[constants.GLOBAL_EID])}, count - {data[constants.GLOBAL_EID].shape}"
)
......@@ -432,8 +438,12 @@ def exchange_feature(
common, idx1, idx2 = np.intersect1d(
data[constants.GLOBAL_EID], global_eids, return_indices=True
)
assert common.shape[0] == idx2.shape[0]
assert common.shape[0] == global_eids.shape[0]
assert (
common.shape[0] == idx2.shape[0]
), f"Rank {rank}: {common.shape[0]} != {idx2.shape[0]}"
assert (
common.shape[0] == global_eids.shape[0]
), f"Rank {rank}: {common.shape[0]} != {global_eids.shape[0]}"
global_dst_nids = data[constants.GLOBAL_DST_ID][idx1]
assert np.all(global_eids == data[constants.GLOBAL_EID][idx1])
......@@ -450,7 +460,7 @@ def exchange_feature(
[feat_dim_len], world_size, num_parts, return_sizes=True
)
if all_lens[0] <= 0:
logging.info(
logging.debug(
f"[Rank: {rank} No process has any feature data to shuffle for {local_feat_key}"
)
return cur_features, cur_global_ids
......@@ -470,7 +480,7 @@ def exchange_feature(
feat_dims_dtype = list(np.zeros((rank0_shape_len), dtype=np.int64))
feat_dims_dtype.append(DATA_TYPE_ID[torch.float32])
logging.info(f"Sending the feature shape information - {feat_dims_dtype}")
logging.debug(f"Sending the feature shape information - {feat_dims_dtype}")
all_dims_dtype = allgather_sizes(
feat_dims_dtype, world_size, num_parts, return_sizes=True
)
......@@ -497,7 +507,7 @@ def exchange_feature(
torch.from_numpy(gids_per_partid).type(torch.int64)
)
for idx, tt in enumerate(feats_per_rank):
logging.info(
logging.debug(
f"[Rank: {rank} features shape - {tt.shape} and ids - {global_id_per_rank[idx].shape}"
)
......@@ -509,7 +519,7 @@ def exchange_feature(
output_id_list = alltoallv_cpu(
rank, world_size, global_id_per_rank, retain_nones=False
)
logging.info(
logging.debug(
f"[Rank : {rank} feats - {output_feat_list}, ids - {output_id_list}"
)
assert len(output_feat_list) == len(output_id_list), (
......@@ -632,7 +642,7 @@ def exchange_features(
assert len(tokens) == 3
type_name = tokens[0]
feat_name = tokens[1]
logging.info(f"[Rank: {rank}] processing feature: {feat_key}")
logging.debug(f"[Rank: {rank}] processing feature: {feat_key}")
for feat_info in type_info:
# Compute the global_id range for this feature data
......@@ -675,7 +685,7 @@ def exchange_features(
f"[Rank: {rank}] Total time for feature exchange: {timedelta(seconds = end - start)}"
)
for k, v in own_features.items():
logging.info(f"Rank: {rank}] Key - {k} Value - {v.shape}")
logging.debug(f"Rank: {rank}] Key - {k} Value - {v.shape}")
return own_features, own_global_ids
......@@ -761,7 +771,7 @@ def exchange_graph_data(
was performed in the `exchange_features` function call
"""
memory_snapshot("ShuffleNodeFeaturesBegin: ", rank)
logging.info(f"[Rank: {rank} - node_feat_tids - {node_feat_tids}")
logging.debug(f"[Rank: {rank} - node_feat_tids - {node_feat_tids}")
rcvd_node_features, rcvd_global_nids = exchange_features(
rank,
world_size,
......@@ -775,7 +785,7 @@ def exchange_graph_data(
)
dist.barrier()
memory_snapshot("ShuffleNodeFeaturesComplete: ", rank)
logging.info(f"[Rank: {rank}] Done with node features exchange.")
logging.debug(f"[Rank: {rank}] Done with node features exchange.")
rcvd_edge_features, rcvd_global_eids = exchange_features(
rank,
......@@ -789,7 +799,7 @@ def exchange_graph_data(
edge_data,
)
dist.barrier()
logging.info(f"[Rank: {rank}] Done with edge features exchange.")
logging.debug(f"[Rank: {rank}] Done with edge features exchange.")
node_data = gen_node_data(
rank, world_size, num_parts, id_lookup, ntid_ntype_map, schema_map
......@@ -881,13 +891,12 @@ def read_dataset(rank, world_size, id_lookup, params, schema_map, ntype_counts):
# Synchronize so that everybody completes reading dataset from disk
dist.barrier()
logging.info(f"[Rank: {rank}] Done reading dataset {params.input_dir}")
dist.barrier() # SYNCH
edge_data = augment_edge_data(
edge_data, id_lookup, edge_tids, rank, world_size, params.num_parts
)
dist.barrier() # SYNCH
logging.info(
logging.debug(
f"[Rank: {rank}] Done augmenting edge_data: {len(edge_data)}, {edge_data[constants.GLOBAL_SRC_ID].shape}"
)
......@@ -1164,14 +1173,14 @@ def gen_dist_partitions(rank, world_size, params):
schema_map,
)
gc.collect()
logging.info(f"[Rank: {rank}] Done with data shuffling...")
logging.debug(f"[Rank: {rank}] Done with data shuffling...")
memory_snapshot("DataShuffleComplete: ", rank)
# sort node_data by ntype
node_data = reorder_data(
params.num_parts, world_size, node_data, constants.NTYPE_ID
)
logging.info(f"[Rank: {rank}] Sorted node_data by node_type")
logging.debug(f"[Rank: {rank}] Sorted node_data by node_type")
memory_snapshot("NodeDataSortComplete: ", rank)
# resolve global_ids for nodes
......@@ -1180,7 +1189,7 @@ def gen_dist_partitions(rank, world_size, params):
assign_shuffle_global_nids_nodes(
rank, world_size, params.num_parts, node_data
)
logging.info(f"[Rank: {rank}] Done assigning global-ids to nodes...")
logging.debug(f"[Rank: {rank}] Done assigning global-ids to nodes...")
memory_snapshot("ShuffleGlobalID_Nodes_Complete: ", rank)
# shuffle node feature according to the node order on each rank.
......@@ -1215,7 +1224,7 @@ def gen_dist_partitions(rank, world_size, params):
edge_data = reorder_data(
params.num_parts, world_size, edge_data, constants.ETYPE_ID
)
logging.info(f"[Rank: {rank}] Sorted edge_data by edge_type")
logging.debug(f"[Rank: {rank}] Sorted edge_data by edge_type")
memory_snapshot("EdgeDataSortComplete: ", rank)
# Synchronize before assigning shuffle-global-nids for edges end points.
......@@ -1223,7 +1232,7 @@ def gen_dist_partitions(rank, world_size, params):
shuffle_global_eid_offsets = assign_shuffle_global_nids_edges(
rank, world_size, params.num_parts, edge_data
)
logging.info(f"[Rank: {rank}] Done assigning global_ids to edges ...")
logging.debug(f"[Rank: {rank}] Done assigning global_ids to edges ...")
memory_snapshot("ShuffleGlobalID_Edges_Complete: ", rank)
......@@ -1258,7 +1267,7 @@ def gen_dist_partitions(rank, world_size, params):
edge_data = lookup_shuffle_global_nids_edges(
rank, world_size, params.num_parts, edge_data, id_lookup, node_data
)
logging.info(
logging.debug(
f"[Rank: {rank}] Done resolving orig_node_id for local node_ids..."
)
memory_snapshot("ShuffleGlobalID_Lookup_Complete: ", rank)
......@@ -1388,7 +1397,6 @@ def single_machine_run(params):
Argument Parser structure with pre-determined arguments as defined
at the bottom of this file.
"""
log_params(params)
processes = []
mp.set_start_method("spawn")
......
......@@ -65,14 +65,14 @@ def _broadcast_shape(
torch.zeros_like(data_shape) for _ in range(world_size)
]
dist.all_gather(data_shape_output, data_shape)
logging.info(
logging.debug(
f"[Rank: {rank} Received shapes from all ranks: {data_shape_output}"
)
shapes = [x.numpy() for x in data_shape_output if x[0] != 0]
shapes = np.vstack(shapes)
if is_feat_data:
logging.info(
logging.debug(
f"shapes: {shapes}, condition: {all(shapes[0,2] == s for s in shapes[:,2])}"
)
assert all(
......@@ -84,7 +84,7 @@ def _broadcast_shape(
tid_start = np.cumsum([0] + type_counts[:-1])
tid_end = np.cumsum(type_counts)
tid_ranges = list(zip(tid_start, tid_end))
logging.info(f"starts -> {tid_start} ... end -> {tid_end}")
logging.debug(f"starts -> {tid_start} ... end -> {tid_end}")
return tid_ranges
......@@ -220,11 +220,11 @@ def get_dataset(
num_chunks=num_parts,
)
"""
logging.info(f"[Rank: {rank} ntype_counts: {ntype_counts}")
logging.debug(f"[Rank: {rank} ntype_counts: {ntype_counts}")
ntype_gnid_offset = get_gid_offsets(
schema_map[constants.STR_NODE_TYPE], ntype_counts
)
logging.info(f"[Rank: {rank} - ntype_gnid_offset = {ntype_gnid_offset}")
logging.debug(f"[Rank: {rank} - ntype_gnid_offset = {ntype_gnid_offset}")
# iterate over the "node_data" dictionary in the schema_map
# read the node features if exists
......@@ -237,6 +237,7 @@ def get_dataset(
constants.STR_NUMPY,
constants.STR_PARQUET,
]
# It is guaranteed that num_chunks is always greater
# than num_partitions.
node_data = []
......@@ -269,7 +270,7 @@ def get_dataset(
True,
f"{ntype_name}/{feat_name}",
)
logging.info(f"[Rank: {rank} - cur_tids: {cur_tids}")
logging.debug(f"[Rank: {rank} - cur_tids: {cur_tids}")
# collect data on current rank.
for local_part_id in range(num_parts):
......@@ -290,7 +291,7 @@ def get_dataset(
# done building node_features locally.
if len(node_features) <= 0:
logging.info(
logging.debug(
f"[Rank: {rank}] This dataset does not have any node features"
)
else:
......@@ -305,7 +306,7 @@ def get_dataset(
if feat_info == None:
continue
logging.info(
logging.debug(
f"[Rank: {rank}] node feature name: {feat_name}, feature data shape: {feat_info.size()}"
)
tokens = feat_name.split("/")
......@@ -385,7 +386,7 @@ def get_dataset(
data_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(data_file):
data_file = os.path.join(input_dir, data_file)
logging.info(
logging.debug(
f"[Rank: {rank}] Loading edges-feats of {etype_name}[{feat_name}] from {data_file}"
)
edge_data.append(
......@@ -428,7 +429,7 @@ def get_dataset(
# Done with building node_features locally.
if len(edge_features) <= 0:
logging.info(
logging.debug(
f"[Rank: {rank}] This dataset does not have any edge features"
)
else:
......@@ -437,7 +438,7 @@ def get_dataset(
for k, v in edge_features.items():
if v == None:
continue
logging.info(
logging.debug(
f"[Rank: {rank}] edge feature name: {k}, feature data shape: {v.shape}"
)
tids = edge_feature_tids[k]
......@@ -516,7 +517,6 @@ def get_dataset(
dst_ntype_name = tokens[2]
num_chunks = len(edge_info)
# read_list = generate_read_list(num_chunks, num_parts)
read_list = generate_read_list(num_chunks, world_size)
src_ids = []
dst_ids = []
......@@ -533,7 +533,7 @@ def get_dataset(
edge_file = edge_info[idx]
if not os.path.isabs(edge_file):
edge_file = os.path.join(input_dir, edge_file)
logging.info(
logging.debug(
f"[Rank: {rank}] Loading edges of etype[{etype_name}] from {edge_file}"
)
......@@ -635,7 +635,7 @@ def get_dataset(
edge_datadict[constants.GLOBAL_TYPE_EID].shape
== edge_datadict[constants.ETYPE_ID].shape
)
logging.info(
logging.debug(
f"[Rank: {rank}] Done reading edge_file: {len(edge_datadict)}, {edge_datadict[constants.GLOBAL_SRC_ID].shape}"
)
else:
......@@ -648,7 +648,7 @@ def get_dataset(
edge_datadict[constants.GLOBAL_TYPE_EID] = np.array([], dtype=np.int64)
edge_datadict[constants.ETYPE_ID] = np.array([], dtype=np.int64)
logging.info(f"Rank: {rank} edge_feat_tids: {edge_feature_tids}")
logging.debug(f"Rank: {rank} edge_feat_tids: {edge_feature_tids}")
return (
node_features,
......
......@@ -64,7 +64,7 @@ class DistLookupService:
for ntype in ntype_names:
filename = f"{ntype}.txt"
logging.info(
logging.debug(
f"[Rank: {rank}] Reading file: {os.path.join(input_dir, filename)}"
)
......@@ -109,7 +109,7 @@ class DistLookupService:
# Explicitly release the array read from the file.
del ntype_partids
logging.info(
logging.debug(
f"[Rank: {rank}] ntypeid begin - {type_nid_begin} - {type_nid_end}"
)
......@@ -168,7 +168,7 @@ class DistLookupService:
max_count = np.amax(all_sizes)
if max_count <= 0:
logging.info(
logging.debug(
f"[Rank: {self.rank}] No process has global_nids to process !!!"
)
return
......@@ -177,7 +177,7 @@ class DistLookupService:
LOCAL_CHUNK_SIZE = np.ceil(local_rows / num_splits).astype(np.int64)
agg_partition_ids = []
logging.info(
logging.debug(
f"[Rank: {self.rank}] BatchSize: {CHUNK_SIZE}, \
max_count: {max_count}, \
splits: {num_splits}, \
......
......@@ -2,10 +2,7 @@ import argparse
import logging
import os
import platform
import sys
from datetime import timedelta
from pathlib import Path
from timeit import default_timer as timer
import array_readwriter
......@@ -14,9 +11,13 @@ import constants
import numpy as np
import pyarrow
import pyarrow.csv as csv
import pyarrow.parquet as pq
import torch
from utils import generate_read_list, get_idranges, get_node_types, read_json
from utils import (
generate_read_list,
generate_roundrobin_read_list,
get_idranges,
get_node_types,
read_json,
)
def get_proc_info():
......@@ -43,6 +44,26 @@ def get_proc_info():
return 0
def get_world_size():
"""Helper function to get the world size from the
environment when `mpirun` is used to run this python program.
Returns:
--------
integer :
Number of processes created by the executor that launched this process.
"""
env_variables = dict(os.environ)
# mpich
if "PMI_SIZE" in env_variables:
return int(env_variables["PMI_SIZE"])
# openmpi
elif "OMPI_COMM_WORLD_SIZE" in env_variables:
return int(env_variables["OMPI_COMM_WORLD_SIZE"])
else:
return 1
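# Usage sketch (hypothetical environment values, not part of the patch): under
# `mpirun -np 4 python parmetis_preprocess.py ...` OpenMPI exports
# OMPI_COMM_WORLD_SIZE=4 (MPICH exports PMI_SIZE), so get_world_size() returns
# 4; without an MPI launcher it falls back to a single process.
import os

os.environ["OMPI_COMM_WORLD_SIZE"] = "4"
assert get_world_size() == 4
del os.environ["OMPI_COMM_WORLD_SIZE"]
assert get_world_size() == 1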
def gen_edge_files(rank, schema_map, params):
"""Function to create edges files to be consumed by ParMETIS
for partitioning purposes.
......@@ -64,7 +85,7 @@ def gen_edge_files(rank, schema_map, params):
output : string
Location of storing the node-weights and edge files for ParMETIS.
"""
type_nid_dict, ntype_gnid_offset = get_idranges(
_, ntype_gnid_offset = get_idranges(
schema_map[constants.STR_NODE_TYPE],
dict(
zip(
......@@ -76,24 +97,9 @@ def gen_edge_files(rank, schema_map, params):
# Regenerate edge files here.
edge_data = schema_map[constants.STR_EDGES]
etype_names = schema_map[constants.STR_EDGE_TYPE]
etype_name_idmap = {e: idx for idx, e in enumerate(etype_names)}
outdir = Path(params.output_dir)
os.makedirs(outdir, exist_ok=True)
edge_files = []
num_parts = params.num_parts
for etype_name, etype_info in edge_data.items():
edges_format = etype_info[constants.STR_FORMAT][constants.STR_NAME]
edge_data_files = etype_info[constants.STR_DATA]
# ``edgetype`` strings are in canonical format, src_node_type:edge_type:dst_node_type
tokens = etype_name.split(":")
assert len(tokens) == 3
src_ntype_name = tokens[0]
rel_name = tokens[1]
dst_ntype_name = tokens[2]
def process_and_write_back(data_df, idx):
data_f0 = data_df[:, 0]
......@@ -104,12 +110,13 @@ def gen_edge_files(rank, schema_map, params):
cols = [global_src_id, global_dst_id]
col_names = ["global_src_id", "global_dst_id"]
out_file = edge_data_files[idx].split("/")[-1]
out_file = os.path.join(outdir, "edges_{}".format(out_file))
out_file_name = Path(edge_data_files[idx]).stem.split(".")[0]
out_file = os.path.join(
outdir, etype_name, f"edges_{out_file_name}.csv"
)
os.makedirs(os.path.dirname(out_file), exist_ok=True)
# TODO(thvasilo): We should support writing to the same format as the input
options = csv.WriteOptions(include_header=False, delimiter=" ")
options.delimiter = " "
csv.write_csv(
pyarrow.Table.from_arrays(cols, names=col_names),
out_file,
......@@ -117,9 +124,22 @@ def gen_edge_files(rank, schema_map, params):
)
return out_file
# handle any no. of files case here.
file_idxes = generate_read_list(len(edge_data_files), params.num_parts)
for idx in file_idxes[rank]:
edge_files = []
for etype_name, etype_info in edge_data.items():
edge_data_files = etype_info[constants.STR_DATA]
# ``edgetype`` strings are in canonical format, src_node_type:edge_type:dst_node_type
tokens = etype_name.split(":")
assert len(tokens) == 3
src_ntype_name = tokens[0]
dst_ntype_name = tokens[2]
rank_assignments = generate_roundrobin_read_list(
len(edge_data_files), params.num_parts
)
for file_idx in rank_assignments[rank]:
reader_fmt_meta = {
"name": etype_info[constants.STR_FORMAT][constants.STR_NAME],
}
......@@ -128,9 +148,9 @@ def gen_edge_files(rank, schema_map, params):
constants.STR_FORMAT_DELIMITER
]
data_df = array_readwriter.get_array_parser(**reader_fmt_meta).read(
os.path.join(params.input_dir, edge_data_files[idx])
os.path.join(params.input_dir, edge_data_files[file_idx])
)
out_file = process_and_write_back(data_df, idx)
out_file = process_and_write_back(data_df, file_idx)
edge_files.append(out_file)
return edge_files
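# Illustrative sketch (hypothetical helper, not part of the patch): both
# gen_edge_files above and gen_parmetis_input_args below must derive the same
# path for a given input edge file, i.e. <output_dir>/<etype>/edges_<stem>.csv.
import os
from pathlib import Path

def _expected_parmetis_edge_path(output_dir, etype_name, edge_file_path):
    stem = Path(edge_file_path).stem.split(".")[0]
    return os.path.join(output_dir, etype_name, f"edges_{stem}.csv")

# e.g. -> "preproc_dir/n1:e1:n1/edges_test_file_0.csv"
print(_expected_parmetis_edge_path("preproc_dir", "n1:e1:n1", "chunked-data/test_file_0.csv"))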
......@@ -315,11 +335,11 @@ def gen_parmetis_input_args(params, schema_map):
logging.info(f" tid-start = {tid_start}, tid-end = {tid_end}")
logging.info(f" per_rank_range - {per_rank_range}")
for rank in range(params.num_parts):
local_tid_start = tid_start[rank]
local_tid_end = tid_end[rank]
for part_idx in range(params.num_parts):
local_tid_start = tid_start[part_idx]
local_tid_end = tid_end[part_idx]
out_file = os.path.join(
outdir, "node_weights_{}_{}.txt".format(ntype_name, rank)
outdir, "node_weights_{}_{}.txt".format(ntype_name, part_idx)
)
node_files.append(
(
......@@ -329,27 +349,33 @@ def gen_parmetis_input_args(params, schema_map):
)
)
nfile = open(os.path.join(params.output_dir, "parmetis_nfiles.txt"), "w")
for f in node_files:
with open(
os.path.join(params.output_dir, "parmetis_nfiles.txt"), "w"
) as parmetis_nf:
for node_file in node_files:
# format: filename global_node_id_start global_node_id_end(exclusive)
nfile.write("{} {} {}\n".format(f[0], f[1], f[2]))
nfile.close()
parmetis_nf.write(
"{} {} {}\n".format(node_file[0], node_file[1], node_file[2])
)
# Regenerate edge files here.
# NOTE: The file names need to match the ones generated by gen_edge_files function
edge_data = schema_map[constants.STR_EDGES]
edge_files = []
for etype_name, etype_info in edge_data.items():
edge_data_files = etype_info[constants.STR_DATA]
for edge_file_path in edge_data_files:
out_file = os.path.basename(edge_file_path)
out_file = os.path.join(outdir, "edges_{}".format(out_file))
out_file_name = Path(edge_file_path).stem.split(".")[0]
out_file = os.path.join(
outdir, etype_name, "edges_{}.csv".format(out_file_name)
)
edge_files.append(out_file)
with open(
os.path.join(params.output_dir, "parmetis_efiles.txt"), "w"
) as efile:
for f in edge_files:
efile.write("{}\n".format(f))
) as parmetis_efile:
for edge_file in edge_files:
parmetis_efile.write("{}\n".format(edge_file))
def run_preprocess_data(params):
......@@ -360,23 +386,23 @@ def run_preprocess_data(params):
params : argparser object
An instance of argparser class which stores command line arguments.
"""
logging.info(f"Starting to generate ParMETIS files...")
logging.info("Starting to generate ParMETIS files...")
rank = get_proc_info()
assert os.path.isdir(
params.input_dir
), f"Please check `input_dir` argument."
), f"Please check `input_dir` argument: {params.input_dit}."
schema_map = read_json(os.path.join(params.input_dir, params.schema_file))
gen_node_weights_files(schema_map, params)
logging.info(f"Done with node weights....")
logging.info("Done with node weights....")
gen_edge_files(rank, schema_map, params)
logging.info(f"Done with edge weights...")
logging.info("Done with edge weights...")
if rank == 0:
gen_parmetis_input_args(params, schema_map)
logging.info(f"Done generating files for ParMETIS run ..")
logging.info("Done generating files for ParMETIS run ..")
if __name__ == "__main__":
......@@ -414,11 +440,19 @@ if __name__ == "__main__":
type=int,
help="Total no. of output graph partitions.",
)
parser.add_argument(
"--log_level",
required=False,
type=str,
help="Log level to use for execution.",
default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
)
params = parser.parse_args()
# Configure logging.
logging.basicConfig(
level="INFO",
level=getattr(logging, params.log_level, None),
format=f"[{platform.node()} \
%(levelname)s %(asctime)s PID:%(process)d] %(message)s",
)
......
import json
import logging
import os
from itertools import cycle
import constants
......@@ -689,7 +690,10 @@ def map_partid_rank(partid, world_size):
def generate_read_list(num_files, world_size):
"""Generate the file IDs to read for each rank.
"""
Generate the file IDs to read for each rank
using sequential assignment.
Parameters:
-----------
......@@ -701,7 +705,11 @@ def generate_read_list(num_files, world_size):
Returns:
--------
read_list : np.array
Array of target file IDs to read.
Array of target file IDs to read. Each worker reads the files
whose indexes appear at its rank's position in the list,
e.g. rank 0 reads the files indexed in read_list[0], rank 1 the
ones in read_list[1], and so on.
Examples
--------
......@@ -709,3 +717,35 @@ def generate_read_list(num_files, world_size):
[array([0, 1, 2]), array([3, 4, 5]), array([6, 7]), array([8, 9])]
"""
return np.array_split(np.arange(num_files), world_size)
def generate_roundrobin_read_list(num_files, world_size):
"""
Generate the file IDs to read for each rank
using round robin assignment.
Parameters:
-----------
num_files : int
Total number of files.
world_size : int
World size of group.
Returns:
--------
read_list : list of lists
List of target file IDs to read. Each worker reads the files
whose indexes appear at its rank's position in the list,
e.g. rank 0 reads the files indexed in read_list[0], rank 1 the
ones in read_list[1], and so on.
Examples
--------
>>> tools.distpartitioning.utils.generate_roundrobin_read_list(10, 4)
[[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]
"""
assignment_lists = [[] for _ in range(world_size)]
for rank, part_idx in zip(cycle(range(world_size)), range(num_files)):
assignment_lists[rank].append(part_idx)
return assignment_lists
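# Comparison sketch (illustration only, not part of the patch): the round-robin
# assignment interleaves file indexes across ranks, whereas generate_read_list
# hands out contiguous blocks; both cover every file exactly once.
print(generate_read_list(10, 4))
# -> [array([0, 1, 2]), array([3, 4, 5]), array([6, 7]), array([8, 9])]
print(generate_roundrobin_read_list(10, 4))
# -> [[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]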
......@@ -3,7 +3,6 @@ import argparse
import json
import logging
import os
import sys
import numpy as np
from base import dump_partition_meta, PartitionMeta
......@@ -12,7 +11,7 @@ from files import setdir
def _random_partition(metadata, num_parts):
num_nodes_per_type = [sum(_) for _ in metadata["num_nodes_per_chunk"]]
num_nodes_per_type = metadata["num_nodes_per_type"]
ntypes = metadata["node_type"]
for ntype, n in zip(ntypes, num_nodes_per_type):
logging.info("Generating partition for node type %s" % ntype)
......