Unverified Commit a697e791 authored by yxy235's avatar yxy235 Committed by GitHub
Browse files

[GraphBolt] Support numpy for edges when constructing graph in `preprocess_ondisk_dataset`. (#6916)


Co-authored-by: default avatarUbuntu <ubuntu@ip-172-31-0-133.us-west-2.compute.internal>
parent b3224ce8
...@@ -4,7 +4,6 @@ import os ...@@ -4,7 +4,6 @@ import os
from copy import deepcopy from copy import deepcopy
from typing import Dict, List, Union from typing import Dict, List, Union
import pandas as pd
import torch import torch
import yaml import yaml
...@@ -14,7 +13,12 @@ from ...base import dgl_warning ...@@ -14,7 +13,12 @@ from ...base import dgl_warning
from ...data.utils import download, extract_archive from ...data.utils import download, extract_archive
from ..base import etype_str_to_tuple from ..base import etype_str_to_tuple
from ..dataset import Dataset, Task from ..dataset import Dataset, Task
from ..internal import copy_or_convert_data, get_attributes, read_data from ..internal import (
copy_or_convert_data,
get_attributes,
read_data,
read_edges,
)
from ..itemset import ItemSet, ItemSetDict from ..itemset import ItemSet, ItemSetDict
from ..sampling_graph import SamplingGraph from ..sampling_graph import SamplingGraph
from .fused_csc_sampling_graph import from_dglgraph, FusedCSCSamplingGraph from .fused_csc_sampling_graph import from_dglgraph, FusedCSCSamplingGraph
...@@ -86,14 +90,9 @@ def preprocess_ondisk_dataset( ...@@ -86,14 +90,9 @@ def preprocess_ondisk_dataset(
if is_homogeneous: if is_homogeneous:
# Homogeneous graph. # Homogeneous graph.
num_nodes = input_config["graph"]["nodes"][0]["num"] num_nodes = input_config["graph"]["nodes"][0]["num"]
edge_data = pd.read_csv( edge_fmt = input_config["graph"]["edges"][0]["format"]
os.path.join( edge_path = input_config["graph"]["edges"][0]["path"]
dataset_dir, input_config["graph"]["edges"][0]["path"] src, dst = read_edges(dataset_dir, edge_fmt, edge_path)
),
names=["src", "dst"],
)
src, dst = edge_data["src"].to_numpy(), edge_data["dst"].to_numpy()
g = dgl.graph((src, dst), num_nodes=num_nodes) g = dgl.graph((src, dst), num_nodes=num_nodes)
else: else:
# Heterogeneous graph. # Heterogeneous graph.
...@@ -104,12 +103,9 @@ def preprocess_ondisk_dataset( ...@@ -104,12 +103,9 @@ def preprocess_ondisk_dataset(
# Construct the data dict. # Construct the data dict.
data_dict = {} data_dict = {}
for edge_info in input_config["graph"]["edges"]: for edge_info in input_config["graph"]["edges"]:
edge_data = pd.read_csv( edge_fmt = edge_info["format"]
os.path.join(dataset_dir, edge_info["path"]), edge_path = edge_info["path"]
names=["src", "dst"], src, dst = read_edges(dataset_dir, edge_fmt, edge_path)
)
src = torch.tensor(edge_data["src"])
dst = torch.tensor(edge_data["dst"])
data_dict[etype_str_to_tuple(edge_info["type"])] = (src, dst) data_dict[etype_str_to_tuple(edge_info["type"])] = (src, dst)
# Construct the heterograph. # Construct the heterograph.
g = dgl.heterograph(data_dict, num_nodes_dict) g = dgl.heterograph(data_dict, num_nodes_dict)
......
...@@ -4,6 +4,7 @@ import os ...@@ -4,6 +4,7 @@ import os
import shutil import shutil
import numpy as np import numpy as np
import pandas as pd
import torch import torch
from numpy.lib.format import read_array_header_1_0, read_array_header_2_0 from numpy.lib.format import read_array_header_1_0, read_array_header_2_0
...@@ -120,3 +121,27 @@ def get_attributes(_obj) -> list: ...@@ -120,3 +121,27 @@ def get_attributes(_obj) -> list:
and not callable(getattr(_obj, attribute)) and not callable(getattr(_obj, attribute))
] ]
return attributes return attributes
def read_edges(dataset_dir, edge_fmt, edge_path):
    """Read edge data from a numpy or csv file.

    Parameters
    ----------
    dataset_dir : str
        Root directory against which ``edge_path`` is resolved.
    edge_fmt : str
        Storage format of the edge file, either ``"numpy"`` or ``"csv"``.
    edge_path : str
        Path of the edge file, relative to ``dataset_dir``.

    Returns
    -------
    tuple
        ``(src, dst)`` arrays of source and destination node IDs, one
        entry per edge.
    """
    assert edge_fmt in [
        "numpy",
        "csv",
    ], f"`numpy` or `csv` is expected when reading edges but got `{edge_fmt}`."
    if edge_fmt == "numpy":
        edge_data = read_data(
            os.path.join(dataset_dir, edge_path),
            edge_fmt,
        )
        # Numpy-format edges are stored as a (2, N) array:
        # row 0 holds src IDs, row 1 holds dst IDs.
        assert (
            edge_data.shape[0] == 2 and len(edge_data.shape) == 2
        ), f"The shape of edges should be (2, N), but got {edge_data.shape}."
        src, dst = edge_data.numpy()
    else:
        # CSV files carry no header; each row is "src,dst".
        edge_data = pd.read_csv(
            os.path.join(dataset_dir, edge_path),
            names=["src", "dst"],
        )
        src, dst = edge_data["src"].to_numpy(), edge_data["dst"].to_numpy()
    return (src, dst)
...@@ -88,22 +88,31 @@ def random_hetero_graph(num_nodes, num_edges, num_ntypes, num_etypes): ...@@ -88,22 +88,31 @@ def random_hetero_graph(num_nodes, num_edges, num_ntypes, num_etypes):
def random_homo_graphbolt_graph( def random_homo_graphbolt_graph(
test_dir, dataset_name, num_nodes, num_edges, num_classes test_dir, dataset_name, num_nodes, num_edges, num_classes, edge_fmt="csv"
): ):
"""Generate random graphbolt version homograph""" """Generate random graphbolt version homograph"""
# Generate random edges. # Generate random edges.
nodes = np.repeat(np.arange(num_nodes), 5) nodes = np.repeat(np.arange(num_nodes), 5)
neighbors = np.random.randint(0, num_nodes, size=(num_edges)) neighbors = np.random.randint(0, num_nodes, size=(num_edges))
edges = np.stack([nodes, neighbors], axis=1) edges = np.stack([nodes, neighbors], axis=1)
# Wrtie into edges/edge.csv
os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True) os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
edges = pd.DataFrame(edges, columns=["src", "dst"]) assert edge_fmt in ["numpy", "csv"], print(
edge_path = os.path.join("edges", "edge.csv") "only numpy and csv are supported for edges."
edges.to_csv(
os.path.join(test_dir, edge_path),
index=False,
header=False,
) )
if edge_fmt == "csv":
# Wrtie into edges/edge.csv
edges = pd.DataFrame(edges, columns=["src", "dst"])
edge_path = os.path.join("edges", "edge.csv")
edges.to_csv(
os.path.join(test_dir, edge_path),
index=False,
header=False,
)
else:
# Wrtie into edges/edge.npy
edges = edges.T
edge_path = os.path.join("edges", "edge.npy")
np.save(os.path.join(test_dir, edge_path), edges)
# Generate random graph edge-feats. # Generate random graph edge-feats.
edge_feats = np.random.rand(num_edges, num_classes) edge_feats = np.random.rand(num_edges, num_classes)
...@@ -153,7 +162,7 @@ def random_homo_graphbolt_graph( ...@@ -153,7 +162,7 @@ def random_homo_graphbolt_graph(
nodes: nodes:
- num: {num_nodes} - num: {num_nodes}
edges: edges:
- format: csv - format: {edge_fmt}
path: {edge_path} path: {edge_path}
feature_data: feature_data:
- domain: edge - domain: edge
...@@ -203,7 +212,7 @@ def random_homo_graphbolt_graph( ...@@ -203,7 +212,7 @@ def random_homo_graphbolt_graph(
def genereate_raw_data_for_hetero_dataset( def genereate_raw_data_for_hetero_dataset(
test_dir, dataset_name, num_nodes, num_edges, num_classes test_dir, dataset_name, num_nodes, num_edges, num_classes, edge_fmt="csv"
): ):
# Generate edges. # Generate edges.
edges_path = {} edges_path = {}
...@@ -211,17 +220,25 @@ def genereate_raw_data_for_hetero_dataset( ...@@ -211,17 +220,25 @@ def genereate_raw_data_for_hetero_dataset(
src_ntype, etype_str, dst_ntype = etype src_ntype, etype_str, dst_ntype = etype
src = torch.randint(0, num_nodes[src_ntype], (num_edge,)) src = torch.randint(0, num_nodes[src_ntype], (num_edge,))
dst = torch.randint(0, num_nodes[dst_ntype], (num_edge,)) dst = torch.randint(0, num_nodes[dst_ntype], (num_edge,))
# Write into edges/edge.csv
os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True) os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
edges = pd.DataFrame( assert edge_fmt in ["numpy", "csv"], print(
np.stack([src, dst], axis=1), columns=["src", "dst"] "only numpy and csv are supported for edges."
)
edge_path = os.path.join("edges", f"{etype_str}.csv")
edges.to_csv(
os.path.join(test_dir, edge_path),
index=False,
header=False,
) )
if edge_fmt == "csv":
# Write into edges/edge.csv
edges = pd.DataFrame(
np.stack([src, dst], axis=1), columns=["src", "dst"]
)
edge_path = os.path.join("edges", f"{etype_str}.csv")
edges.to_csv(
os.path.join(test_dir, edge_path),
index=False,
header=False,
)
else:
edges = np.stack([src, dst], axis=1).T
edge_path = os.path.join("edges", f"{etype_str}.npy")
np.save(os.path.join(test_dir, edge_path), edges)
edges_path[etype_str] = edge_path edges_path[etype_str] = edge_path
# Generate node features. # Generate node features.
...@@ -263,10 +280,10 @@ def genereate_raw_data_for_hetero_dataset( ...@@ -263,10 +280,10 @@ def genereate_raw_data_for_hetero_dataset(
num: {num_nodes["item"]} num: {num_nodes["item"]}
edges: edges:
- type: "user:follow:user" - type: "user:follow:user"
format: csv format: {edge_fmt}
path: {edges_path["follow"]} path: {edges_path["follow"]}
- type: "user:click:item" - type: "user:click:item"
format: csv format: {edge_fmt}
path: {edges_path["click"]} path: {edges_path["click"]}
feature_data: feature_data:
- domain: node - domain: node
......
...@@ -1095,7 +1095,8 @@ def test_OnDiskDataset_Metadata(): ...@@ -1095,7 +1095,8 @@ def test_OnDiskDataset_Metadata():
assert dataset.dataset_name == dataset_name assert dataset.dataset_name == dataset_name
def test_OnDiskDataset_preprocess_homogeneous(): @pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_OnDiskDataset_preprocess_homogeneous(edge_fmt):
"""Test preprocess of OnDiskDataset.""" """Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir: with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified. # All metadata fields are specified.
...@@ -1111,6 +1112,7 @@ def test_OnDiskDataset_preprocess_homogeneous(): ...@@ -1111,6 +1112,7 @@ def test_OnDiskDataset_preprocess_homogeneous():
num_nodes, num_nodes,
num_edges, num_edges,
num_classes, num_classes,
edge_fmt=edge_fmt,
) )
yaml_file = os.path.join(test_dir, "metadata.yaml") yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f: with open(yaml_file, "w") as f:
...@@ -1160,6 +1162,7 @@ def test_OnDiskDataset_preprocess_homogeneous(): ...@@ -1160,6 +1162,7 @@ def test_OnDiskDataset_preprocess_homogeneous():
num_nodes, num_nodes,
num_edges, num_edges,
num_classes, num_classes,
edge_fmt=edge_fmt,
) )
yaml_file = os.path.join(test_dir, "metadata.yaml") yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f: with open(yaml_file, "w") as f:
...@@ -1527,7 +1530,8 @@ def test_OnDiskDataset_preprocess_yaml_content_windows(): ...@@ -1527,7 +1530,8 @@ def test_OnDiskDataset_preprocess_yaml_content_windows():
) )
def test_OnDiskDataset_load_name(): @pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_OnDiskDataset_load_name(edge_fmt):
"""Test preprocess of OnDiskDataset.""" """Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir: with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified. # All metadata fields are specified.
...@@ -1543,6 +1547,7 @@ def test_OnDiskDataset_load_name(): ...@@ -1543,6 +1547,7 @@ def test_OnDiskDataset_load_name():
num_nodes, num_nodes,
num_edges, num_edges,
num_classes, num_classes,
edge_fmt=edge_fmt,
) )
yaml_file = os.path.join(test_dir, "metadata.yaml") yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f: with open(yaml_file, "w") as f:
...@@ -1556,7 +1561,8 @@ def test_OnDiskDataset_load_name(): ...@@ -1556,7 +1561,8 @@ def test_OnDiskDataset_load_name():
dataset = None dataset = None
def test_OnDiskDataset_load_feature(): @pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_OnDiskDataset_load_feature(edge_fmt):
"""Test preprocess of OnDiskDataset.""" """Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir: with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified. # All metadata fields are specified.
...@@ -1572,6 +1578,7 @@ def test_OnDiskDataset_load_feature(): ...@@ -1572,6 +1578,7 @@ def test_OnDiskDataset_load_feature():
num_nodes, num_nodes,
num_edges, num_edges,
num_classes, num_classes,
edge_fmt=edge_fmt,
) )
yaml_file = os.path.join(test_dir, "metadata.yaml") yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f: with open(yaml_file, "w") as f:
...@@ -1640,7 +1647,8 @@ def test_OnDiskDataset_load_feature(): ...@@ -1640,7 +1647,8 @@ def test_OnDiskDataset_load_feature():
dataset = None dataset = None
def test_OnDiskDataset_load_graph(): @pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_OnDiskDataset_load_graph(edge_fmt):
"""Test preprocess of OnDiskDataset.""" """Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir: with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified. # All metadata fields are specified.
...@@ -1656,6 +1664,7 @@ def test_OnDiskDataset_load_graph(): ...@@ -1656,6 +1664,7 @@ def test_OnDiskDataset_load_graph():
num_nodes, num_nodes,
num_edges, num_edges,
num_classes, num_classes,
edge_fmt=edge_fmt,
) )
yaml_file = os.path.join(test_dir, "metadata.yaml") yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f: with open(yaml_file, "w") as f:
...@@ -1723,6 +1732,7 @@ def test_OnDiskDataset_load_graph(): ...@@ -1723,6 +1732,7 @@ def test_OnDiskDataset_load_graph():
num_nodes, num_nodes,
num_edges, num_edges,
num_classes, num_classes,
edge_fmt=edge_fmt,
) )
yaml_file = os.path.join(test_dir, "metadata.yaml") yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f: with open(yaml_file, "w") as f:
...@@ -1739,7 +1749,8 @@ def test_OnDiskDataset_load_graph(): ...@@ -1739,7 +1749,8 @@ def test_OnDiskDataset_load_graph():
dataset = None dataset = None
def test_OnDiskDataset_load_tasks(): @pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_OnDiskDataset_load_tasks(edge_fmt):
"""Test preprocess of OnDiskDataset.""" """Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir: with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified. # All metadata fields are specified.
...@@ -1755,6 +1766,7 @@ def test_OnDiskDataset_load_tasks(): ...@@ -1755,6 +1766,7 @@ def test_OnDiskDataset_load_tasks():
num_nodes, num_nodes,
num_edges, num_edges,
num_classes, num_classes,
edge_fmt=edge_fmt,
) )
yaml_file = os.path.join(test_dir, "metadata.yaml") yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f: with open(yaml_file, "w") as f:
...@@ -2028,7 +2040,8 @@ def test_BuiltinDataset(): ...@@ -2028,7 +2040,8 @@ def test_BuiltinDataset():
@pytest.mark.parametrize("include_original_edge_id", [True, False]) @pytest.mark.parametrize("include_original_edge_id", [True, False])
def test_OnDiskDataset_homogeneous(include_original_edge_id): @pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_OnDiskDataset_homogeneous(include_original_edge_id, edge_fmt):
"""Preprocess and instantiate OnDiskDataset for homogeneous graph.""" """Preprocess and instantiate OnDiskDataset for homogeneous graph."""
with tempfile.TemporaryDirectory() as test_dir: with tempfile.TemporaryDirectory() as test_dir:
# All metadata fields are specified. # All metadata fields are specified.
...@@ -2044,6 +2057,7 @@ def test_OnDiskDataset_homogeneous(include_original_edge_id): ...@@ -2044,6 +2057,7 @@ def test_OnDiskDataset_homogeneous(include_original_edge_id):
num_nodes, num_nodes,
num_edges, num_edges,
num_classes, num_classes,
edge_fmt=edge_fmt,
) )
yaml_file = os.path.join(test_dir, "metadata.yaml") yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f: with open(yaml_file, "w") as f:
...@@ -2095,7 +2109,8 @@ def test_OnDiskDataset_homogeneous(include_original_edge_id): ...@@ -2095,7 +2109,8 @@ def test_OnDiskDataset_homogeneous(include_original_edge_id):
@pytest.mark.parametrize("include_original_edge_id", [True, False]) @pytest.mark.parametrize("include_original_edge_id", [True, False])
def test_OnDiskDataset_heterogeneous(include_original_edge_id): @pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_OnDiskDataset_heterogeneous(include_original_edge_id, edge_fmt):
"""Preprocess and instantiate OnDiskDataset for heterogeneous graph.""" """Preprocess and instantiate OnDiskDataset for heterogeneous graph."""
with tempfile.TemporaryDirectory() as test_dir: with tempfile.TemporaryDirectory() as test_dir:
dataset_name = "OnDiskDataset_hetero" dataset_name = "OnDiskDataset_hetero"
...@@ -2114,6 +2129,7 @@ def test_OnDiskDataset_heterogeneous(include_original_edge_id): ...@@ -2114,6 +2129,7 @@ def test_OnDiskDataset_heterogeneous(include_original_edge_id):
num_nodes, num_nodes,
num_edges, num_edges,
num_classes, num_classes,
edge_fmt=edge_fmt,
) )
dataset = gb.OnDiskDataset( dataset = gb.OnDiskDataset(
......
import os import os
import re
import tempfile import tempfile
import dgl.graphbolt.internal as internal import dgl.graphbolt.internal as internal
import numpy as np import numpy as np
import pandas as pd
import pytest import pytest
import torch import torch
...@@ -141,3 +143,60 @@ def test_copy_or_convert_data(data_fmt, save_fmt, is_feature): ...@@ -141,3 +143,60 @@ def test_copy_or_convert_data(data_fmt, save_fmt, is_feature):
data = None data = None
tensor_data = None tensor_data = None
out_data = None out_data = None
@pytest.mark.parametrize("edge_fmt", ["csv", "numpy"])
def test_read_edges(edge_fmt):
    """``read_edges`` should round-trip the (src, dst) pairs that were saved."""
    with tempfile.TemporaryDirectory() as test_dir:
        num_nodes = 40
        num_edges = 200
        nodes = np.repeat(np.arange(num_nodes), 5)
        neighbors = np.random.randint(0, num_nodes, size=(num_edges))
        edges = np.stack([nodes, neighbors], axis=1)
        os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
        if edge_fmt == "csv":
            # Write into edges/edge.csv.
            edges = pd.DataFrame(edges, columns=["src", "dst"])
            edge_path = os.path.join("edges", "edge.csv")
            edges.to_csv(
                os.path.join(test_dir, edge_path),
                index=False,
                header=False,
            )
        else:
            # Write into edges/edge.npy as a (2, N) array.
            edges = edges.T
            edge_path = os.path.join("edges", "edge.npy")
            np.save(os.path.join(test_dir, edge_path), edges)
        src, dst = internal.read_edges(test_dir, edge_fmt, edge_path)
        # Compare element-wise. The previous `src.all() == nodes.all()`
        # only compared two reduced booleans, so mismatched data would
        # still pass.
        assert np.array_equal(src, nodes)
        assert np.array_equal(dst, neighbors)
def test_read_edges_error():
    """Invalid format strings and malformed numpy shapes must be rejected."""
    # Case 1: an unsupported file format trips the format assertion before
    # any file access happens, so dummy paths are fine here.
    with pytest.raises(
        AssertionError,
        match="`numpy` or `csv` is expected when reading edges but got `fake-type`.",
    ):
        internal.read_edges("test_dir", "fake-type", "edge_path")
    # Case 2: a numpy edge file whose array is not shaped (2, N).
    with tempfile.TemporaryDirectory() as test_dir:
        total_nodes = 40
        total_edges = 200
        src_ids = np.repeat(np.arange(total_nodes), 5)
        dst_ids = np.random.randint(0, total_nodes, size=(total_edges))
        # Stacking three columns and transposing yields shape (3, 200),
        # which violates the expected (2, N) layout.
        bad_edges = np.stack([src_ids, dst_ids, src_ids], axis=1).T
        os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
        edge_path = os.path.join("edges", "edge.npy")
        np.save(os.path.join(test_dir, edge_path), bad_edges)
        with pytest.raises(
            AssertionError,
            match=re.escape(
                "The shape of edges should be (2, N), but got torch.Size([3, 200])."
            ),
        ):
            internal.read_edges(test_dir, "numpy", edge_path)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment