Unverified commit 432c71ef authored by kylasa, committed by GitHub

Code changes to fix order sensitivity of the pipeline (#5288)



The following changes are made in this PR.
1. In dataset_utils.py, edges are now read from disk in the order defined by the STR_EDGE_TYPE key in the metadata.json file. Because this order implicitly assigns edge ids to edge types, reading the edges in the same order keeps the ids consistent.
2. The unit test framework now also randomizes the order in which edges are read from disk when building the test datasets (a minimal sketch of the idea follows below).
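The randomization in item 2 amounts to shuffling the edge-type order before the test data is chunked, which is what the np.random.shuffle change in the diff below does. A minimal, self-contained sketch of the idea (shuffled_etype_order is an illustrative helper, not code from this PR):

import numpy as np

def shuffled_etype_order(canonical_etypes, seed=None):
    # Return the edge types in a random order so the unit tests exercise
    # any order sensitivity in the downstream partitioning pipeline.
    rng = np.random.default_rng(seed)
    idxes = np.arange(len(canonical_etypes))
    rng.shuffle(idxes)
    return [canonical_etypes[i] for i in idxes]

# Usage: iterate edge types in a randomized order when writing test chunks.
# for etype in shuffled_etype_order(
#     [("paper", "cites", "paper"), ("author", "writes", "paper")], seed=0
# ):
#     ...  # chunk and write this edge type's edge index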
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>
parent f5afc6ea
import json
import logging
import os

import dgl
import numpy as np
import torch

from distpartitioning import array_readwriter
from distpartitioning.array_readwriter.parquet import ParquetArrayParser
from files import setdir
@@ -16,12 +16,16 @@ def _chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt, vector_rows=False):
    for j, n in enumerate(chunk_sizes):
        path = os.path.abspath(path_fmt % j)
        arr_chunk = arr[offset : offset + n]
        shape = arr_chunk.shape
        logging.info("Chunking %d-%d" % (offset, offset + n))
        # If requested we write multi-column arrays as single-column vector Parquet files
        array_parser = array_readwriter.get_array_parser(**fmt_meta)
        if (
            isinstance(array_parser, ParquetArrayParser)
            and len(shape) > 1
            and shape[1] > 1
        ):
            array_parser.write(path, arr_chunk, vector_rows=vector_rows)
        else:
            array_parser.write(path, arr_chunk)
@@ -83,8 +87,15 @@ def _initialize_num_chunks(g, num_chunks, kwargs=None):
def _chunk_graph(
    g,
    name,
    ndata_paths,
    edata_paths,
    num_chunks,
    data_fmt,
    edges_format,
    vector_rows=False,
    **kwargs,
):
    # First deal with ndata and edata that are homogeneous
    # (i.e. not a dict-of-dict)
@@ -139,16 +150,24 @@ def _chunk_graph(
        k: v for k, v in zip(g.canonical_etypes, num_edges_per_chunk)
    }

    idxes_etypestr = {
        idx: (etype, etypestrs[etype])
        for idx, etype in enumerate(g.canonical_etypes)
    }
    idxes = np.arange(len(idxes_etypestr))

    # Split edge index
    metadata["edges"] = {}
    with setdir("edge_index"):
        np.random.shuffle(idxes)
        for idx in idxes:
            etype = idxes_etypestr[idx][0]
            etypestr = idxes_etypestr[idx][1]
            logging.info("Chunking edge index for %s" % etypestr)
            edges_meta = {}
            if edges_format == "csv":
                fmt_meta = {"name": edges_format, "delimiter": " "}
            elif edges_format == "parquet":
                fmt_meta = {"name": edges_format}
            else:
                raise RuntimeError(f"Invalid edges_fmt: {edges_format}")
@@ -259,7 +278,7 @@ def chunk_graph(
    num_chunks,
    output_path,
    data_fmt="numpy",
    edges_fmt="csv",
    vector_rows=False,
    **kwargs,
):
@@ -302,14 +321,26 @@ def chunk_graph(
            edata[key] = os.path.abspath(edata[key])
    with setdir(output_path):
        _chunk_graph(
            g,
            name,
            ndata_paths,
            edata_paths,
            num_chunks,
            data_fmt,
            edges_fmt,
            vector_rows,
            **kwargs,
        )


def create_chunked_dataset(
    root_dir,
    num_chunks,
    data_fmt="numpy",
    edges_fmt="csv",
    vector_rows=False,
    **kwargs,
):
    """
    This function creates a sample dataset, based on MAG240 dataset.
...
dataset_utils.py
@@ -529,7 +529,8 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
    ]:
        edge_datadict[col] = []

    for etype_name, etype_id in etype_name_idmap.items():
        etype_info = edge_data[etype_name]
        edge_info = etype_info[constants.STR_DATA]

        # edgetype strings are in canonical format, src_node_type:edge_type:dst_node_type
...
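For item 1, get_dataset now iterates edge types via etype_name_idmap, whose ordering mirrors the STR_EDGE_TYPE list in metadata.json, so edge-type ids stay consistent with the order in which edges are read. A hedged sketch of how such a map could be derived (assuming STR_EDGE_TYPE refers to the "edge_type" list in metadata.json; build_etype_name_idmap is illustrative, not code from this PR):

import json

def build_etype_name_idmap(metadata_path):
    # Map each canonical edge-type string to the edge-type id implied by its
    # position in metadata.json's edge-type list; reading edges in this same
    # order keeps edge ids and edge-type ids consistent across runs.
    with open(metadata_path, "r") as f:
        metadata = json.load(f)
    return {name: idx for idx, name in enumerate(metadata["edge_type"])}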