Unverified Commit 942b17ab authored by Mingbang Wang's avatar Mingbang Wang Committed by GitHub
Browse files

[GraphBolt] Add hardcode testcases for `gb.preprocess_ondisk_dataset` (#7035)

parent 7a976098
......@@ -1191,6 +1191,355 @@ def test_OnDiskDataset_preprocess_homogeneous(edge_fmt):
fused_csc_sampling_graph = None
def test_OnDiskDataset_preprocess_homogeneous_hardcode(edge_fmt="numpy"):
"""Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir:
"""Original graph in COO:
0 1 1 0 0
0 0 1 1 0
0 0 0 1 1
1 0 0 0 1
1 1 0 0 0
node_feats: [0.0, 1.9, 2.8, 3.7, 4.6]
edge_feats: [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]
"""
dataset_name = "graphbolt_test"
num_nodes = 5
num_edges = 10
num_classes = 1
# Generate edges.
edges = np.array(
[[0, 0, 1, 1, 2, 2, 3, 3, 4, 4], [1, 2, 2, 3, 3, 4, 4, 0, 0, 1]]
).T
os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
edges = edges.T
edge_path = os.path.join("edges", "edge.npy")
np.save(os.path.join(test_dir, edge_path), edges)
# Generate graph edge-feats.
edge_feats = np.array(
[0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]
)
os.makedirs(os.path.join(test_dir, "data"), exist_ok=True)
edge_feat_path = os.path.join("data", "edge-feat.npy")
np.save(os.path.join(test_dir, edge_feat_path), edge_feats)
# Generate node-feats.
node_feats = np.array([0.0, 1.9, 2.8, 3.7, 4.6])
node_feat_path = os.path.join("data", "node-feat.npy")
np.save(os.path.join(test_dir, node_feat_path), node_feats)
# Generate train/test/valid set.
os.makedirs(os.path.join(test_dir, "set"), exist_ok=True)
train_data = np.array([0, 1, 2, 3, 4])
train_path = os.path.join("set", "train.npy")
np.save(os.path.join(test_dir, train_path), train_data)
valid_data = np.array([0, 1, 2, 3, 4])
valid_path = os.path.join("set", "valid.npy")
np.save(os.path.join(test_dir, valid_path), valid_data)
test_data = np.array([0, 1, 2, 3, 4])
test_path = os.path.join("set", "test.npy")
np.save(os.path.join(test_dir, test_path), test_data)
yaml_content = (
f"dataset_name: {dataset_name}\n"
f"graph:\n"
f" nodes:\n"
f" - num: {num_nodes}\n"
f" edges:\n"
f" - format: {edge_fmt}\n"
f" path: {edge_path}\n"
f" feature_data:\n"
f" - domain: node\n"
f" type: null\n"
f" name: feat\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {node_feat_path}\n"
f" - domain: edge\n"
f" type: null\n"
f" name: feat\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {edge_feat_path}\n"
f"feature_data:\n"
f" - domain: node\n"
f" type: null\n"
f" name: feat\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {node_feat_path}\n"
f" - domain: edge\n"
f" type: null\n"
f" name: feat\n"
f" format: numpy\n"
f" path: {edge_feat_path}\n"
f"tasks:\n"
f" - name: node_classification\n"
f" num_classes: {num_classes}\n"
f" train_set:\n"
f" - type: null\n"
f" data:\n"
f" - name: node_pairs\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {train_path}\n"
f" validation_set:\n"
f" - type: null\n"
f" data:\n"
f" - name: node_pairs\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {valid_path}\n"
f" test_set:\n"
f" - type: null\n"
f" data:\n"
f" - name: node_pairs\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {test_path}\n"
)
yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f:
f.write(yaml_content)
output_file = gb.ondisk_dataset.preprocess_ondisk_dataset(
test_dir,
include_original_edge_id=True,
)
with open(output_file, "rb") as f:
processed_dataset = yaml.load(f, Loader=yaml.Loader)
assert processed_dataset["dataset_name"] == dataset_name
assert processed_dataset["tasks"][0]["num_classes"] == num_classes
assert "graph" not in processed_dataset
assert "graph_topology" in processed_dataset
fused_csc_sampling_graph = torch.load(
os.path.join(test_dir, processed_dataset["graph_topology"]["path"])
)
assert fused_csc_sampling_graph.total_num_nodes == num_nodes
assert fused_csc_sampling_graph.total_num_edges == num_edges
assert torch.equal(
fused_csc_sampling_graph.csc_indptr,
torch.tensor([0, 2, 4, 6, 8, 10]),
)
assert torch.equal(
fused_csc_sampling_graph.indices,
torch.tensor([3, 4, 0, 4, 0, 1, 1, 2, 2, 3]),
)
assert torch.equal(
fused_csc_sampling_graph.node_attributes["feat"],
torch.tensor([0.0, 1.9, 2.8, 3.7, 4.6], dtype=torch.float64),
)
assert torch.equal(
fused_csc_sampling_graph.edge_attributes["feat"],
torch.tensor(
[0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9],
dtype=torch.float64,
),
)
assert torch.equal(
fused_csc_sampling_graph.edge_attributes[gb.ORIGINAL_EDGE_ID],
torch.tensor([7, 8, 0, 9, 1, 2, 3, 4, 5, 6]),
)
num_samples = 5
fanout = 1
subgraph = fused_csc_sampling_graph.sample_neighbors(
torch.arange(num_samples),
torch.tensor([fanout]),
)
assert len(subgraph.sampled_csc.indices) <= num_samples
def test_OnDiskDataset_preprocess_heterogeneous_hardcode(edge_fmt="numpy"):
"""Test preprocess of OnDiskDataset."""
with tempfile.TemporaryDirectory() as test_dir:
"""Original graph in COO:
0 1 1 0 0
0 0 1 1 0
0 0 0 1 1
1 0 0 0 1
1 1 0 0 0
node_type_0: [0, 1]
node_type_1: [2, 3, 4]
edge_type_0: node_type_0 -> node_type_0
edge_type_1: node_type_0 -> node_type_1
edge_type_2: node_type_1 -> node_type_1
edge_type_3: node_type_1 -> node_type_0
node_feats: [0.0, 1.9, 2.8, 3.7, 4.6]
edge_feats: [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9]
"""
dataset_name = "graphbolt_test"
num_nodes = {
"A": 2,
"B": 3,
}
num_edges = {
("A", "a_a", "A"): 1,
("A", "a_b", "B"): 3,
("B", "b_b", "A"): 3,
("B", "b_a", "B"): 3,
}
num_classes = 1
# Generate edges.
os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
np.save(
os.path.join(test_dir, "edges", "a_a.npy"), np.array([[0], [1]])
)
np.save(
os.path.join(test_dir, "edges", "a_b.npy"),
np.array([[0, 1, 1], [0, 0, 1]]),
)
np.save(
os.path.join(test_dir, "edges", "b_b.npy"),
np.array([[0, 0, 1], [1, 2, 2]]),
)
np.save(
os.path.join(test_dir, "edges", "b_a.npy"),
np.array([[1, 2, 2], [0, 0, 1]]),
)
# Generate node features.
os.makedirs(os.path.join(test_dir, "data"), exist_ok=True)
np.save(
os.path.join(test_dir, "data", "A-feat.npy"), np.array([0.0, 1.9])
)
np.save(
os.path.join(test_dir, "data", "B-feat.npy"),
np.array([2.8, 3.7, 4.6]),
)
# Generate edge features.
os.makedirs(os.path.join(test_dir, "data"), exist_ok=True)
np.save(os.path.join(test_dir, "data", "a_a-feat.npy"), np.array([0.0]))
np.save(
os.path.join(test_dir, "data", "a_b-feat.npy"),
np.array([1.1, 2.2, 3.3]),
)
np.save(
os.path.join(test_dir, "data", "b_b-feat.npy"),
np.array([4.4, 5.5, 6.6]),
)
np.save(
os.path.join(test_dir, "data", "b_a-feat.npy"),
np.array([7.7, 8.8, 9.9]),
)
yaml_content = (
f"dataset_name: {dataset_name}\n"
f"graph:\n"
f" nodes:\n"
f" - type: A\n"
f" num: 2\n"
f" - type: B\n"
f" num: 3\n"
f" edges:\n"
f" - type: A:a_a:A\n"
f" format: {edge_fmt}\n"
f" path: {os.path.join('edges', 'a_a.npy')}\n"
f" - type: A:a_b:B\n"
f" format: {edge_fmt}\n"
f" path: {os.path.join('edges', 'a_b.npy')}\n"
f" - type: B:b_b:B\n"
f" format: {edge_fmt}\n"
f" path: {os.path.join('edges', 'b_b.npy')}\n"
f" - type: B:b_a:A\n"
f" format: {edge_fmt}\n"
f" path: {os.path.join('edges', 'b_a.npy')}\n"
f" feature_data:\n"
f" - domain: node\n"
f" type: A\n"
f" name: feat\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {os.path.join(test_dir, 'data', 'A-feat.npy')}\n"
f" - domain: node\n"
f" type: B\n"
f" name: feat\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {os.path.join(test_dir, 'data', 'B-feat.npy')}\n"
f" - domain: edge\n"
f" type: A:a_a:A\n"
f" name: feat\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {os.path.join(test_dir, 'data', 'a_a-feat.npy')}\n"
f" - domain: edge\n"
f" type: A:a_b:B\n"
f" name: feat\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {os.path.join(test_dir, 'data', 'a_b-feat.npy')}\n"
f" - domain: edge\n"
f" type: B:b_b:B\n"
f" name: feat\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {os.path.join(test_dir, 'data', 'b_b-feat.npy')}\n"
f" - domain: edge\n"
f" type: B:b_a:A\n"
f" name: feat\n"
f" format: numpy\n"
f" in_memory: true\n"
f" path: {os.path.join(test_dir, 'data', 'b_a-feat.npy')}\n"
)
yaml_file = os.path.join(test_dir, "metadata.yaml")
with open(yaml_file, "w") as f:
f.write(yaml_content)
output_file = gb.ondisk_dataset.preprocess_ondisk_dataset(
test_dir,
include_original_edge_id=True,
)
with open(output_file, "rb") as f:
processed_dataset = yaml.load(f, Loader=yaml.Loader)
assert processed_dataset["dataset_name"] == dataset_name
assert "graph" not in processed_dataset
assert "graph_topology" in processed_dataset
fused_csc_sampling_graph = torch.load(
os.path.join(test_dir, processed_dataset["graph_topology"]["path"])
)
assert fused_csc_sampling_graph.total_num_nodes == 5
assert fused_csc_sampling_graph.total_num_edges == 10
assert torch.equal(
fused_csc_sampling_graph.csc_indptr,
torch.tensor([0, 2, 4, 6, 8, 10]),
)
assert torch.equal(
fused_csc_sampling_graph.indices,
torch.tensor([3, 4, 0, 4, 0, 1, 1, 2, 2, 3]),
)
assert torch.equal(
fused_csc_sampling_graph.node_attributes["feat"],
torch.tensor([0.0, 1.9, 2.8, 3.7, 4.6], dtype=torch.float64),
)
assert torch.equal(
fused_csc_sampling_graph.edge_attributes["feat"],
torch.tensor(
[0.0, 1.1, 2.2, 3.3, 7.7, 8.8, 9.9, 4.4, 5.5, 6.6],
dtype=torch.float64,
),
)
assert torch.equal(
fused_csc_sampling_graph.type_per_edge,
torch.tensor([2, 2, 0, 2, 1, 1, 1, 3, 3, 3]),
)
assert torch.equal(
fused_csc_sampling_graph.edge_attributes[gb.ORIGINAL_EDGE_ID],
torch.tensor([0, 1, 0, 2, 0, 1, 2, 0, 1, 2]),
)
def test_OnDiskDataset_preprocess_path():
"""Test if the preprocess function can catch the path error."""
with tempfile.TemporaryDirectory() as test_dir:
......@@ -1244,7 +1593,7 @@ def test_OnDiskDataset_preprocess_yaml_content_unix():
nodes = np.repeat(np.arange(num_nodes), 5)
neighbors = np.random.randint(0, num_nodes, size=(num_edges))
edges = np.stack([nodes, neighbors], axis=1)
# Wrtie into edges/edge.csv
# Write into edges/edge.csv
os.makedirs(os.path.join(test_dir, "edges/"), exist_ok=True)
edges = pd.DataFrame(edges, columns=["src", "dst"])
edges.to_csv(
......@@ -1398,7 +1747,7 @@ def test_OnDiskDataset_preprocess_yaml_content_windows():
nodes = np.repeat(np.arange(num_nodes), 5)
neighbors = np.random.randint(0, num_nodes, size=(num_edges))
edges = np.stack([nodes, neighbors], axis=1)
# Wrtie into edges/edge.csv
# Write into edges/edge.csv
os.makedirs(os.path.join(test_dir, "edges\\"), exist_ok=True)
edges = pd.DataFrame(edges, columns=["src", "dst"])
edges.to_csv(
......@@ -2143,7 +2492,7 @@ def test_OnDiskDataset_load_1D_feature(fmt):
nodes = np.repeat(np.arange(num_nodes), 5)
neighbors = np.random.randint(0, num_nodes, size=(num_edges))
edges = np.stack([nodes, neighbors], axis=1)
# Wrtie into edges/edge.csv
# Write into edges/edge.csv
os.makedirs(os.path.join(test_dir, "edges"), exist_ok=True)
edges = pd.DataFrame(edges, columns=["src", "dst"])
edge_path = os.path.join("edges", "edge.csv")
......@@ -2756,7 +3105,7 @@ def test_OnDiskDataset_preprocess_graph_with_single_type():
nodes = np.repeat(np.arange(num_nodes), 5)
neighbors = np.random.randint(0, num_nodes, size=(num_edges))
edges = np.stack([nodes, neighbors], axis=1)
# Wrtie into edges/edge.csv
# Write into edges/edge.csv
os.makedirs(os.path.join(test_dir, "edges/"), exist_ok=True)
edges = pd.DataFrame(edges, columns=["src", "dst"])
edges.to_csv(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment