"examples/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "e82f174624663b0d72f6fcc13a1d3339d54ffc75"
Unverified commit 08fd6cf8 authored by peizhou001, committed by GitHub

[Feature] Add parquet support for node/edge features in chunked data (#4933)

parent e0fc038d
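As a minimal usage sketch of the new data_fmt keyword (mirroring the updated tests below; the chunk count is arbitrary), the chunked dataset can now be written with parquet-formatted node/edge features instead of the default numpy (.npy) files:

import tempfile
from create_chunked_dataset import create_chunked_dataset

# Write the sample dataset with node/edge features stored as parquet files.
with tempfile.TemporaryDirectory() as root_dir:
    g = create_chunked_dataset(root_dir, num_chunks=4, data_fmt='parquet')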
......@@ -15,7 +15,7 @@ from dgl.data.utils import load_graphs, load_tensors
def create_chunked_dataset(
root_dir, num_chunks, include_masks=False
root_dir, num_chunks, include_masks=False, data_fmt='numpy'
):
"""
This function creates a sample dataset based on the MAG240 dataset.
......@@ -233,6 +233,7 @@ def create_chunked_dataset(
edge_data,
num_chunks=num_chunks,
output_path=output_dir,
data_fmt=data_fmt,
)
print('Done with creating chunked graph')
......
import os
import tempfile
import numpy as np
import pytest
from distpartitioning import array_readwriter
@pytest.mark.parametrize(
"shape", [[500], [300, 10], [200, 5, 5], [100, 5, 5, 5]]
)
@pytest.mark.parametrize("format", ["numpy", "parquet"])
def test_array_readwriter(format, shape):
original_array = np.random.rand(*shape)
fmt_meta = {"name": format}
with tempfile.TemporaryDirectory() as test_dir:
path = os.path.join(test_dir, f"nodes.{format}")
array_readwriter.get_array_parser(**fmt_meta).write(
path, original_array
)
array = array_readwriter.get_array_parser(**fmt_meta).read(path)
assert original_array.shape == array.shape
assert np.array_equal(original_array, array)
......@@ -8,16 +8,14 @@ import pytest
import torch
from chunk_graph import chunk_graph
from create_chunked_dataset import create_chunked_dataset
from distpartitioning import array_readwriter
import dgl
from dgl.data.utils import load_graphs, load_tensors
from dgl.distributed.partition import (
RESERVED_FIELD_DTYPE,
load_partition,
_get_inner_node_mask,
_get_inner_edge_mask,
_etype_tuple_to_str,
)
from dgl.distributed.partition import (RESERVED_FIELD_DTYPE,
_etype_tuple_to_str,
_get_inner_edge_mask,
_get_inner_node_mask, load_partition)
def _verify_partition_data_types(part_g):
......@@ -80,11 +78,12 @@ def _verify_graph_feats(
@pytest.mark.parametrize("num_chunks", [1, 8])
def test_chunk_graph(num_chunks):
@pytest.mark.parametrize("data_fmt", ['numpy', 'parquet'])
def test_chunk_graph(num_chunks, data_fmt):
with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(root_dir, num_chunks)
g = create_chunked_dataset(root_dir, num_chunks, data_fmt=data_fmt)
# check metadata.json
output_dir = os.path.join(root_dir, "chunked-data")
......@@ -111,12 +110,16 @@ def test_chunk_graph(num_chunks):
assert isinstance(int(num2), int)
# check node/edge_data
suffix = 'npy' if data_fmt=='numpy' else 'parquet'
reader_fmt_meta = {"name": data_fmt}
def test_data(sub_dir, feat, expected_data, expected_shape):
data = []
for i in range(num_chunks):
fname = os.path.join(sub_dir, f'{feat}-{i}.npy')
fname = os.path.join(sub_dir, f'{feat}-{i}.{suffix}')
assert os.path.isfile(fname)
feat_array = np.load(fname)
feat_array = array_readwriter.get_array_parser(
**reader_fmt_meta
).read(fname)
assert feat_array.shape[0] == expected_shape
data.append(feat_array)
data = np.concatenate(data, 0)
......@@ -136,7 +139,7 @@ def test_chunk_graph(num_chunks):
test_data(sub_dir, feat, data, g.num_edges(c_etype) // num_chunks)
def _test_pipeline(num_chunks, num_parts, world_size, graph_formats=None):
def _test_pipeline(num_chunks, num_parts, world_size, graph_formats=None, data_fmt='numpy'):
if num_chunks < num_parts:
# num_parts should be less than or equal to num_chunks
return
......@@ -147,7 +150,7 @@ def _test_pipeline(num_chunks, num_parts, world_size, graph_formats=None):
with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(root_dir, num_chunks)
g = create_chunked_dataset(root_dir, num_chunks, data_fmt=data_fmt)
# Step1: graph partition
in_dir = os.path.join(root_dir, "chunked-data")
......@@ -224,3 +227,9 @@ def test_pipeline_basics(num_chunks, num_parts, world_size):
def test_pipeline_formats(graph_formats):
_test_pipeline(4, 4, 4, graph_formats)
@pytest.mark.parametrize(
"data_fmt", ['numpy', "parquet"]
)
def test_pipeline_feature_format(data_fmt):
_test_pipeline(4, 4, 4, data_fmt=data_fmt)
......@@ -6,7 +6,8 @@ import pathlib
from contextlib import contextmanager
import torch
from utils import array_readwriter, setdir
from distpartitioning import array_readwriter
from files import setdir
import dgl
......@@ -25,7 +26,7 @@ def chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt):
return paths
def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt):
# First deal with ndata and edata that are homogeneous (i.e. not a dict-of-dict)
if len(g.ntypes) == 1 and not isinstance(
next(iter(ndata_paths.values())), dict
......@@ -94,6 +95,8 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
metadata["edges"][etypestr] = edges_meta
# Chunk node data
reader_fmt_meta, writer_fmt_meta = {"name": "numpy"}, {"name": data_fmt}
file_suffix = 'npy' if data_fmt == 'numpy' else 'parquet'
metadata["node_data"] = {}
with setdir("node_data"):
for ntype, ndata_per_type in ndata_paths.items():
......@@ -104,7 +107,6 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
"Chunking node data for type %s key %s" % (ntype, key)
)
ndata_key_meta = {}
reader_fmt_meta = writer_fmt_meta = {"name": "numpy"}
arr = array_readwriter.get_array_parser(
**reader_fmt_meta
).read(path)
......@@ -113,7 +115,7 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
arr,
writer_fmt_meta,
num_nodes_per_chunk_dict[ntype],
key + "-%d.npy",
key + "-%d." + file_suffix,
)
ndata_meta[key] = ndata_key_meta
......@@ -131,7 +133,6 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
% (etypestr, key)
)
edata_key_meta = {}
reader_fmt_meta = writer_fmt_meta = {"name": "numpy"}
arr = array_readwriter.get_array_parser(
**reader_fmt_meta
).read(path)
......@@ -141,7 +142,7 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
arr,
writer_fmt_meta,
num_edges_per_chunk_dict[etype],
key + "-%d.npy",
key + "-%d." + file_suffix,
)
edata_meta[key] = edata_key_meta
......@@ -153,7 +154,7 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
logging.info("Saved metadata in %s" % os.path.abspath(metadata_path))
def chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
def chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt='numpy'):
"""
Split the graph into multiple chunks.
......@@ -184,7 +185,7 @@ def chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
for key in edata.keys():
edata[key] = os.path.abspath(edata[key])
with setdir(output_path):
_chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path)
_chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt)
if __name__ == "__main__":
......
from . import csv, numpy_array
from . import csv, numpy_array, parquet
from .registry import get_array_parser, register_array_parser
import logging
import pandas as pd
import pyarrow
import pyarrow.parquet
from .registry import register_array_parser
@register_array_parser("parquet")
class ParquetArrayParser(object):
def __init__(self):
pass
def read(self, path):
logging.info("Reading from %s using parquet format" % path)
metadata = pyarrow.parquet.read_metadata(path)
metadata = metadata.schema.to_arrow_schema().metadata
# As parquet data is tabular, we assume the ndarray is 2-dimensional.
# If not, its shape should be explicitly specified in the file as metadata.
shape = metadata.get(b"shape", None)
table = pyarrow.parquet.read_table(path, memory_map=True)
logging.info("Done reading from %s" % path)
arr = table.to_pandas().to_numpy()
if not shape:
logging.warning(
"Shape information not found in the metadata; reading the data as "
"a 2-dim array."
)
shape = tuple(eval(shape.decode())) if shape else arr.shape
return arr.reshape(shape)
def write(self, path, array):
logging.info("Writing to %s using parquet format" % path)
shape = array.shape
if len(shape) > 2:
array = array.reshape(shape[0], -1)
table = pyarrow.Table.from_pandas(pd.DataFrame(array))
table = table.replace_schema_metadata({"shape": str(shape)})
pyarrow.parquet.write_table(table, path)
logging.info("Done writing to %s" % path)
......@@ -30,6 +30,7 @@ STR_NODE_DATA = "node_data"
STR_EDGE_DATA = "edge_data"
STR_NUMPY = "numpy"
STR_PARQUET = "parquet"
STR_CSV = "csv"
STR_NAME = "name"
......
......@@ -5,6 +5,7 @@ import numpy as np
import pyarrow
import torch
from pyarrow import csv
import array_readwriter
import constants
from utils import get_idranges, map_partid_rank
......@@ -106,7 +107,7 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
to mention all the files present for this given node feature.
Data read from each node feature file is a multi-dimensional tensor and is read
in numpy format, which is also the storage format of node features on the permanent storage.
in numpy or parquet format, which is also the storage format of node features on the permanent storage.
"node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
"num_nodes_per_chunk" : [
......@@ -143,12 +144,13 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
if((dataset_features is not None) and (len(dataset_features) > 0)):
for ntype_name, ntype_feature_data in dataset_features.items():
for feat_name, feat_data in ntype_feature_data.items():
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
assert (feat_data[constants.STR_FORMAT][constants.STR_NAME]
in [constants.STR_NUMPY, constants.STR_PARQUET])
# It is guaranteed that num_chunks is always greater
# than num_partitions.
num_chunks = len(feat_data[constants.STR_DATA])
read_list = np.array_split(np.arange(num_chunks), num_parts)
reader_fmt_meta = {"name": feat_data[constants.STR_FORMAT][constants.STR_NAME]}
for local_part_id in range(num_parts):
if map_partid_rank(local_part_id, world_size) == rank:
nfeat = []
......@@ -158,7 +160,11 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
if not os.path.isabs(nfeat_file):
nfeat_file = os.path.join(input_dir, nfeat_file)
logging.info(f'Loading node feature[{feat_name}] of ntype[{ntype_name}] from {nfeat_file}')
nfeat.append(np.load(nfeat_file))
nfeat.append(
array_readwriter.get_array_parser(
**reader_fmt_meta
).read(nfeat_file)
)
nfeat = np.concatenate(nfeat) if len(nfeat) != 0 else np.array([])
node_features[ntype_name+"/"+feat_name+"/"+str(local_part_id//world_size)] = torch.from_numpy(nfeat)
nfeat_tids.append(node_tids[ntype_name][local_part_id])
......@@ -241,7 +247,8 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
if dataset_features and (len(dataset_features) > 0):
for etype_name, etype_feature_data in dataset_features.items():
for feat_name, feat_data in etype_feature_data.items():
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] in [constants.STR_NUMPY, constants.STR_PARQUET]
reader_fmt_meta = {"name": feat_data[constants.STR_FORMAT][constants.STR_NAME]}
num_chunks = len(feat_data[constants.STR_DATA])
read_list = np.array_split(np.arange(num_chunks), num_parts)
for local_part_id in range(num_parts):
......@@ -249,14 +256,17 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
efeats = []
efeat_tids = []
for idx in read_list[local_part_id]:
feature_fname = feat_data[constants.STR_DATA][idx]
if (os.path.isabs(feature_fname)):
logging.info(f'Loading numpy from {feature_fname}')
efeats.append(torch.from_numpy(np.load(feature_fname)))
else:
numpy_path = os.path.join(input_dir, feature_fname)
logging.info(f'Loading numpy from {numpy_path}')
efeats.append(torch.from_numpy(np.load(numpy_path)))
efeat_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(efeat_file):
efeat_file = os.path.join(input_dir, efeat_file)
logging.info(
f'Loading edge feature[{feat_name}] of etype[{etype_name}] from {efeat_file}'
)
efeats.append(
array_readwriter.get_array_parser(
**reader_fmt_meta
).read(efeat_file)
)
efeat_tids.append(edge_tids[etype_name][local_part_id])
edge_features[etype_name+'/'+feat_name+"/"+str(local_part_id//world_size)] = torch.from_numpy(np.concatenate(efeats))
edge_feature_tids[etype_name+"/"+feat_name+"/"+str(local_part_id//world_size)] = efeat_tids
......
......@@ -2,17 +2,15 @@ import json
import logging
import os
import dgl
import constants
import numpy as np
import psutil
import pyarrow
import torch
from pyarrow import csv
import constants
from dgl.distributed.partition import (
_dump_part_config
)
import dgl
from dgl.distributed.partition import _dump_part_config
def read_ntype_partition_files(schema_map, input_dir):
"""
......
......@@ -7,8 +7,8 @@ import sys
import numpy as np
from base import PartitionMeta, dump_partition_meta
from utils import array_readwriter, setdir
from distpartitioning import array_readwriter
from files import setdir
def _random_partition(metadata, num_parts):
num_nodes_per_type = [sum(_) for _ in metadata["num_nodes_per_chunk"]]
......
from . import array_readwriter
from .files import *