Unverified Commit 774709d3 authored by Theodore Vasiloudis, committed by GitHub

[Dist] Add support for Parquet-formatted edges files, remove some assumptions on edge file number. (#5051)

* [Dist] Add support for Parquet-formatted edges files, remove some assumptions on edge file number.

* [Dist] Add parquet edges option to unit tests.

Co-authored-by: xiang song (charlie.song) <classicxsong@gmail.com>
parent cd817a1a
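For context, the change threads a new `edges_fmt` option through the chunking pipeline and its tests. A minimal usage sketch, assuming the test helper import shown in the diff below; the chunk count and temporary directory are arbitrary example values:

```python
# Sketch only: exercises the new edges_fmt option through the test helper
# touched in this diff. num_chunks=4 is an arbitrary example value.
import tempfile

from utils import create_chunked_dataset  # test helper, as imported in the tests below

with tempfile.TemporaryDirectory() as root_dir:
    g = create_chunked_dataset(
        root_dir,
        num_chunks=4,
        data_fmt="numpy",     # feature files stay NumPy
        edges_fmt="parquet",  # new: emit edge index chunks as Parquet
    )
```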
@@ -5,6 +5,7 @@ import tempfile
 import numpy as np
 import pytest
 import torch
+import pyarrow.parquet as pq
 from utils import create_chunked_dataset
 from distpartitioning import array_readwriter
@@ -80,6 +81,7 @@ def _verify_graph_feats(
 def _test_chunk_graph(
     num_chunks,
     data_fmt = 'numpy',
+    edges_fmt = 'csv',
     num_chunks_nodes = None,
     num_chunks_edges = None,
     num_chunks_node_data = None,
@@ -88,7 +90,7 @@ def _test_chunk_graph(
     with tempfile.TemporaryDirectory() as root_dir:
         g = create_chunked_dataset(root_dir, num_chunks,
-            data_fmt=data_fmt,
+            data_fmt=data_fmt, edges_fmt=edges_fmt,
             num_chunks_nodes=num_chunks_nodes,
             num_chunks_edges=num_chunks_edges,
             num_chunks_node_data=num_chunks_node_data,
@@ -117,11 +119,17 @@ def _test_chunk_graph(
                 output_edge_index_dir, f'{c_etype_str}{i}.txt'
             )
             assert os.path.isfile(fname)
+            if edges_fmt == 'csv':
                 with open(fname, "r") as f:
                     header = f.readline()
                     num1, num2 = header.rstrip().split(" ")
                     assert isinstance(int(num1), int)
                     assert isinstance(int(num2), int)
+            elif edges_fmt == 'parquet':
+                metadata = pq.read_metadata(fname)
+                assert metadata.num_columns == 2
+            else:
+                assert False, f"Invalid edges_fmt: {edges_fmt}"

     # check node/edge_data
     suffix = 'npy' if data_fmt=='numpy' else 'parquet'
@@ -179,8 +187,9 @@ def _test_chunk_graph(
 @pytest.mark.parametrize("num_chunks", [1, 8])
 @pytest.mark.parametrize("data_fmt", ['numpy', 'parquet'])
-def test_chunk_graph_basics(num_chunks, data_fmt):
-    _test_chunk_graph(num_chunks, data_fmt=data_fmt)
+@pytest.mark.parametrize("edges_fmt", ['csv', 'parquet'])
+def test_chunk_graph_basics(num_chunks, data_fmt, edges_fmt):
+    _test_chunk_graph(num_chunks, data_fmt=data_fmt, edges_fmt=edges_fmt)

 @pytest.mark.parametrize(
......
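The Parquet branch of the test above only inspects the file footer via `pyarrow.parquet.read_metadata`. A self-contained sketch of that check, with a hypothetical file name and edge list:

```python
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical two-column edge chunk, one row per edge.
table = pa.table({"src_id": np.array([0, 1, 2]), "dst_id": np.array([1, 2, 0])})
pq.write_table(table, "paper_cites_paper0.txt")  # chunked edge files keep the .txt suffix

# Same verification the test performs: only the footer is read.
metadata = pq.read_metadata("paper_cites_paper0.txt")
assert metadata.num_columns == 2  # src and dst columns
assert metadata.num_rows == 3
```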
@@ -76,7 +76,7 @@ def _initialize_num_chunks(g, num_chunks, kwargs=None):
 def _chunk_graph(
-    g, name, ndata_paths, edata_paths, num_chunks, data_fmt, **kwargs
+    g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_format, **kwargs
 ):
     # First deal with ndata and edata that are homogeneous
     # (i.e. not a dict-of-dict)
@@ -138,7 +138,12 @@ def _chunk_graph(
         etypestr = etypestrs[etype]
         logging.info("Chunking edge index for %s" % etypestr)
         edges_meta = {}
-        fmt_meta = {"name": "csv", "delimiter": " "}
+        if edges_format == 'csv':
+            fmt_meta = {"name": edges_format, "delimiter": " "}
+        elif edges_format == 'parquet':
+            fmt_meta = {"name": edges_format}
+        else:
+            raise RuntimeError(f"Invalid edges_fmt: {edges_format}")
         edges_meta["format"] = fmt_meta

         srcdst = torch.stack(g.edges(etype=etype), 1)
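For reference, the `fmt_meta` built above lands under each edge type's "format" key next to its "data" file list (per the constants and readers elsewhere in this diff). A sketch of the two variants, with illustrative etype and chunk file names:

```python
# Inferred shape of one per-edge-type entry in the emitted metadata; the
# etype string and chunk file names are illustrative only.
edges_meta_csv = {
    "format": {"name": "csv", "delimiter": " "},
    "data": ["edges/paper:cites:paper0.txt", "edges/paper:cites:paper1.txt"],
}
edges_meta_parquet = {
    "format": {"name": "parquet"},  # no delimiter key for Parquet
    "data": ["edges/paper:cites:paper0.txt", "edges/paper:cites:paper1.txt"],
}
```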
@@ -244,6 +249,7 @@ def chunk_graph(
     num_chunks,
     output_path,
     data_fmt="numpy",
+    edges_fmt='csv',
     **kwargs,
 ):
     """
@@ -281,11 +287,12 @@ def chunk_graph(
             edata[key] = os.path.abspath(edata[key])
     with setdir(output_path):
         _chunk_graph(
-            g, name, ndata_paths, edata_paths, num_chunks, data_fmt, **kwargs
+            g, name, ndata_paths, edata_paths, num_chunks, data_fmt, edges_fmt, **kwargs
         )

-def create_chunked_dataset(root_dir, num_chunks, data_fmt="numpy", **kwargs):
+def create_chunked_dataset(
+        root_dir, num_chunks, data_fmt="numpy", edges_fmt='csv', **kwargs):
     """
     This function creates a sample dataset, based on MAG240 dataset.
@@ -523,6 +530,7 @@ def create_chunked_dataset(root_dir, num_chunks, data_fmt="numpy", **kwargs):
         num_chunks=num_chunks,
         output_path=output_dir,
         data_fmt=data_fmt,
+        edges_fmt=edges_fmt,
         **kwargs,
     )
     print("Done with creating chunked graph")
......
@@ -25,6 +25,7 @@ STR_EDGE_TYPE = "edge_type"
 STR_NUM_EDGES_PER_CHUNK = "num_edges_per_chunk"
 STR_EDGES = "edges"
 STR_FORMAT = "format"
+STR_FORMAT_DELIMITER = "delimiter"
 STR_DATA = "data"
 STR_NODE_DATA = "node_data"
 STR_EDGE_DATA = "edge_data"
......
@@ -4,6 +4,7 @@ import gc
 import numpy as np
 import pyarrow
+import pyarrow.parquet as pq
 import torch
 import torch.distributed as dist
@@ -501,8 +502,6 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
             edge_datadict[col] = []
     for etype_name, etype_info in edge_data.items():
-        assert etype_info[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_CSV
         edge_info = etype_info[constants.STR_DATA]

         #edgetype strings are in canonical format, src_node_type:edge_type:dst_node_type
@@ -528,6 +527,7 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
             edge_file = os.path.join(input_dir, edge_file)
             logging.info(f'Loading edges of etype[{etype_name}] from {edge_file}')
+            if etype_info[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_CSV:
                 read_options=pyarrow.csv.ReadOptions(use_threads=True, block_size=4096, autogenerate_column_names=True)
                 parse_options=pyarrow.csv.ParseOptions(delimiter=' ')
                 with pyarrow.csv.open_csv(edge_file, read_options=read_options, parse_options=parse_options) as reader:
@@ -538,6 +538,13 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
                         next_table = pyarrow.Table.from_batches([next_chunk])
                         src_ids.append(next_table['f0'].to_numpy())
                         dst_ids.append(next_table['f1'].to_numpy())
+            elif etype_info[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_PARQUET:
+                data_df = pq.read_table(edge_file)
+                data_df = data_df.rename_columns(["f0", "f1"])
+                src_ids.append(data_df['f0'].to_numpy())
+                dst_ids.append(data_df['f1'].to_numpy())
+            else:
+                raise ValueError(f'Unknown edge format {etype_info[constants.STR_FORMAT][constants.STR_NAME]} for edge type {etype_name}')
         src_ids = np.concatenate(src_ids)
         dst_ids = np.concatenate(dst_ids)
......
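A standalone sketch of what the new Parquet branch in the loader does: read the two-column table, normalize column names to f0/f1, and extract NumPy id arrays. The file name and contents below are hypothetical:

```python
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical edge chunk with arbitrary column names.
pq.write_table(
    pa.table(
        {
            "src": np.array([0, 1, 2, 3, 4], dtype=np.int64),
            "dst": np.array([4, 3, 2, 1, 0], dtype=np.int64),
        }
    ),
    "edges_chunk0.parquet",
)

# Mirror of the loader's Parquet branch: normalize column names to f0/f1 so
# downstream code can address the columns uniformly, then pull NumPy arrays.
data_df = pq.read_table("edges_chunk0.parquet")
data_df = data_df.rename_columns(["f0", "f1"])
src_ids = data_df["f0"].to_numpy()
dst_ids = data_df["f1"].to_numpy()
assert src_ids.shape == dst_ids.shape == (5,)
```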
@@ -7,6 +7,7 @@ from pathlib import Path
 import numpy as np
 import pyarrow
 import pyarrow.csv as csv
+import pyarrow.parquet as pq
 import torch
 import torch.distributed as dist
@@ -77,7 +78,8 @@ def gen_edge_files(schema_map, output):
     num_parts = len(schema_map[constants.STR_NUM_EDGES_PER_CHUNK][0])
     for etype_name, etype_info in edge_data.items():
-        edge_info = etype_info[constants.STR_DATA]
+        edges_format = etype_info[constants.STR_FORMAT][constants.STR_NAME]
+        edge_data_files = etype_info[constants.STR_DATA]

         # ``edgetype`` strings are in canonical format, src_node_type:edge_type:dst_node_type
         tokens = etype_name.split(":")
@@ -87,13 +89,7 @@ def gen_edge_files(schema_map, output):
         rel_name = tokens[1]
         dst_ntype_name = tokens[2]
-        data_df = csv.read_csv(
-            edge_info[rank],
-            read_options=pyarrow.csv.ReadOptions(
-                autogenerate_column_names=True
-            ),
-            parse_options=pyarrow.csv.ParseOptions(delimiter=" "),
-        )
+        def convert_to_numpy_and_write_back(data_df):
             data_f0 = data_df["f0"].to_numpy()
             data_f1 = data_df["f1"].to_numpy()
@@ -102,14 +98,33 @@ def gen_edge_files(schema_map, output):
             cols = [global_src_id, global_dst_id]
             col_names = ["global_src_id", "global_dst_id"]

-            out_file = edge_info[rank].split("/")[-1]
+            out_file = edge_data_files[rank].split("/")[-1]
             out_file = os.path.join(outdir, "edges_{}".format(out_file))
+            # TODO(thvasilo): We should support writing to the same format as the input
             options = csv.WriteOptions(include_header=False, delimiter=" ")
             options.delimiter = " "
             csv.write_csv(
                 pyarrow.Table.from_arrays(cols, names=col_names), out_file, options
             )
+            return out_file
+
+        if edges_format == constants.STR_CSV:
+            delimiter = etype_info[constants.STR_FORMAT][constants.STR_FORMAT_DELIMITER]
+            data_df = csv.read_csv(
+                edge_data_files[rank],
+                read_options=pyarrow.csv.ReadOptions(
+                    autogenerate_column_names=True
+                ),
+                parse_options=pyarrow.csv.ParseOptions(delimiter=delimiter),
+            )
+        elif edges_format == constants.STR_PARQUET:
+            data_df = pq.read_table(edge_data_files[rank])
+            data_df = data_df.rename_columns(["f0", "f1"])
+        else:
+            raise NotImplementedError(f"Unknown edge format {edges_format}")
+
+        out_file = convert_to_numpy_and_write_back(data_df)
         edge_files.append(out_file)

     return edge_files
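gen_edge_files now reads either input format but still writes the space-delimited, header-less CSV that the ParMETIS step consumes. A self-contained sketch of that round trip, with a hypothetical helper name and an identity mapping standing in for the real type-id to global-id conversion:

```python
import numpy as np
import pyarrow
import pyarrow.csv as csv
import pyarrow.parquet as pq

def edges_to_parmetis_csv(in_file, out_file, edges_format, delimiter=" "):
    # Read the chunk in whichever format the metadata declares.
    if edges_format == "csv":
        data_df = csv.read_csv(
            in_file,
            read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
            parse_options=pyarrow.csv.ParseOptions(delimiter=delimiter),
        )
    elif edges_format == "parquet":
        data_df = pq.read_table(in_file).rename_columns(["f0", "f1"])
    else:
        raise NotImplementedError(f"Unknown edge format {edges_format}")

    # Placeholder for the type-id -> global-id conversion done in the real code.
    global_src_id = data_df["f0"].to_numpy()
    global_dst_id = data_df["f1"].to_numpy()

    # Write the space-delimited, header-less layout ParMETIS expects.
    options = csv.WriteOptions(include_header=False, delimiter=delimiter)
    csv.write_csv(
        pyarrow.Table.from_arrays(
            [global_src_id, global_dst_id], names=["global_src_id", "global_dst_id"]
        ),
        out_file,
        options,
    )
    return out_file
```

Under the metadata layout in this change, `edges_format` and `delimiter` would come from the edge type's "format" entry.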
@@ -277,7 +292,8 @@ def gen_parmetis_input_args(params, schema_map):
     """
     num_nodes_per_chunk = schema_map[constants.STR_NUM_NODES_PER_CHUNK]
-    num_parts = len(num_nodes_per_chunk[0])
+    # TODO: This makes the assumption that all node files have the same number of chunks
+    num_node_parts = len(num_nodes_per_chunk[0])
     ntypes_ntypeid_map, ntypes, ntid_ntype_map = get_node_types(schema_map)
     type_nid_dict, ntype_gnid_offset = get_idranges(
         schema_map[constants.STR_NODE_TYPE],
@@ -319,7 +335,7 @@ def gen_parmetis_input_args(params, schema_map):
     os.makedirs(outdir, exist_ok=True)
     for ntype_id, ntype_name in ntid_ntype_map.items():
         global_nid_offset = ntype_gnid_offset[ntype_name][0, 0]
-        for r in range(num_parts):
+        for r in range(num_node_parts):
             type_start, type_end = (
                 type_nid_dict[ntype_name][r][0],
                 type_nid_dict[ntype_name][r][1],
@@ -345,9 +361,9 @@ def gen_parmetis_input_args(params, schema_map):
     edge_data = schema_map[constants.STR_EDGES]
     edge_files = []
     for etype_name, etype_info in edge_data.items():
-        edge_info = etype_info[constants.STR_DATA]
-        for r in range(num_parts):
-            out_file = edge_info[r].split("/")[-1]
+        edge_data_files = etype_info[constants.STR_DATA]
+        for edge_file_path in edge_data_files:
+            out_file = os.path.basename(edge_file_path)
             out_file = os.path.join(outdir, "edges_{}".format(out_file))
             edge_files.append(out_file)
......
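The last hunk stops assuming every edge type ships the same number of chunk files; the per-type file list drives the loop instead. A tiny illustration with made-up type names and unequal chunk counts:

```python
import os

# Hypothetical schema fragment: edge types may ship different numbers of chunks.
edge_data = {
    "author:writes:paper": {"data": ["/in/writes-part0.parquet", "/in/writes-part1.parquet"]},
    "paper:cites:paper": {"data": ["/in/cites-part0.parquet"]},
}

outdir = "/tmp/parmetis_input"  # example output directory
edge_files = []
for etype_name, etype_info in edge_data.items():
    # Iterate the files that actually exist for this type, whatever their count.
    for edge_file_path in etype_info["data"]:
        out_file = os.path.basename(edge_file_path)
        edge_files.append(os.path.join(outdir, "edges_{}".format(out_file)))
print(edge_files)
```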