Unverified Commit 9731e023 authored by Rhett Ying, committed by GitHub

[Dist] enable to chunk node/edge data into arbitrary number of chunks (#4930)



* [Dist] enable to chunk node/edge data into arbitrary number of chunks

* [Dist] enable to split node/edge data into arbitrary parts

* refine code

* Format boolean to uint8 forcibly to avoid dist.scatter failure

* convert boolean to int8 before scatter and revert it after scatter

* refine code

* fix test

* refine code

* move test utilities into utils.py

* update comment

* fix empty data

* update

* update

* fix empty data issue

* release unnecessary mem

* release unnecessary mem

* release unnecessary mem

* release unnecessary mem

* release unnecessary mem

* remove unnecessary shuffle data

* separate array_split into standalone utility

* add example
Co-authored-by: xiang song(charlie.song) <classicxsong@gmail.com>
parent 08b60eb1
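In brief: the graph structure, node data, and edge data can now each be split into a different number of chunks, down to per-type and per-feature granularity. A minimal sketch of what the updated tests exercise, using the test helper and keyword names from this change (illustrative only, not the full chunk_graph API):

import tempfile

from utils import create_chunked_dataset  # test helper moved into tests/tools/utils.py in this change

with tempfile.TemporaryDirectory() as root_dir:
    # 4 chunks for the graph structure, 10 chunks for the 'paper' node feature,
    # and 24 chunks for the 'year' data of the ('author', 'writes', 'paper') edges.
    g = create_chunked_dataset(
        root_dir,
        num_chunks=4,
        num_chunks_node_data={'paper': {'feat': 10}},
        num_chunks_edge_data={('author', 'writes', 'paper'): {'year': 24}},
    )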
@@ -38,4 +38,4 @@ export DMLC_LOG_DEBUG=1
python3 -m pytest -v --capture=tee-sys --junitxml=pytest_distributed.xml --durations=100 tests/distributed/*.py || fail "distributed"
PYTHONPATH=tools:$PYTHONPATH python3 -m pytest -v --capture=tee-sys --junitxml=pytest_tools.xml --durations=100 tests/tools/*.py || fail "tools"
PYTHONPATH=tools:tools/distpartitioning:$PYTHONPATH python3 -m pytest -v --capture=tee-sys --junitxml=pytest_tools.xml --durations=100 tests/tools/*.py || fail "tools"
import argparse
import json
import logging
import os
import platform
import sys
import tempfile
import dgl
import numpy as np
import torch
from chunk_graph import chunk_graph
from dgl.data.utils import load_graphs, load_tensors
def create_chunked_dataset(
root_dir, num_chunks, include_masks=False, data_fmt='numpy'
):
"""
This function creates a sample dataset based on the MAG240M dataset.
Parameters:
-----------
root_dir : string
Directory in which all the files for the chunked dataset will be stored.
num_chunks : int
Number of chunks to split the graph structure and data into.
include_masks : bool
Whether to also generate train/test/val masks for the node types.
data_fmt : string
Storage format for node/edge data, 'numpy' or 'parquet'.
"""
# Step0: prepare chunked graph data format.
# A synthetic mini MAG240.
num_institutions = 1200
num_authors = 1200
num_papers = 1200
def rand_edges(num_src, num_dst, num_edges):
eids = np.random.choice(num_src * num_dst, num_edges, replace=False)
src = torch.from_numpy(eids // num_dst)
dst = torch.from_numpy(eids % num_dst)
return src, dst
num_cite_edges = 24 * 1000
num_write_edges = 12 * 1000
num_affiliate_edges = 2400
# Structure.
data_dict = {
('paper', 'cites', 'paper'): rand_edges(
num_papers, num_papers, num_cite_edges
),
('author', 'writes', 'paper'): rand_edges(
num_authors, num_papers, num_write_edges
),
('author', 'affiliated_with', 'institution'): rand_edges(
num_authors, num_institutions, num_affiliate_edges
),
('institution', 'writes', 'paper'): rand_edges(
num_institutions, num_papers, num_write_edges
),
}
src, dst = data_dict[('author', 'writes', 'paper')]
data_dict[('paper', 'rev_writes', 'author')] = (dst, src)
g = dgl.heterograph(data_dict)
# paper feat, label, year
num_paper_feats = 3
paper_feat = np.random.randn(num_papers, num_paper_feats)
num_classes = 4
paper_label = np.random.choice(num_classes, num_papers)
paper_year = np.random.choice(2022, num_papers)
paper_orig_ids = np.arange(0, num_papers)
writes_orig_ids = np.arange(0, num_write_edges)
# masks.
if include_masks:
paper_train_mask = np.random.randint(0, 2, num_papers)
paper_test_mask = np.random.randint(0, 2, num_papers)
paper_val_mask = np.random.randint(0, 2, num_papers)
author_train_mask = np.random.randint(0, 2, num_authors)
author_test_mask = np.random.randint(0, 2, num_authors)
author_val_mask = np.random.randint(0, 2, num_authors)
inst_train_mask = np.random.randint(0, 2, num_institutions)
inst_test_mask = np.random.randint(0, 2, num_institutions)
inst_val_mask = np.random.randint(0, 2, num_institutions)
# Edge features.
cite_count = np.random.choice(10, num_cite_edges)
write_year = np.random.choice(2022, num_write_edges)
write2_year = np.random.choice(2022, num_write_edges)
# Save features.
input_dir = os.path.join(root_dir, 'data_test')
os.makedirs(input_dir)
for sub_d in ['paper', 'cites', 'writes', 'writes2']:
os.makedirs(os.path.join(input_dir, sub_d))
paper_feat_path = os.path.join(input_dir, 'paper/feat.npy')
with open(paper_feat_path, 'wb') as f:
np.save(f, paper_feat)
g.nodes['paper'].data['feat'] = torch.from_numpy(paper_feat)
paper_label_path = os.path.join(input_dir, 'paper/label.npy')
with open(paper_label_path, 'wb') as f:
np.save(f, paper_label)
g.nodes['paper'].data['label'] = torch.from_numpy(paper_label)
paper_year_path = os.path.join(input_dir, 'paper/year.npy')
with open(paper_year_path, 'wb') as f:
np.save(f, paper_year)
g.nodes['paper'].data['year'] = torch.from_numpy(paper_year)
paper_orig_ids_path = os.path.join(input_dir, 'paper/orig_ids.npy')
with open(paper_orig_ids_path, 'wb') as f:
np.save(f, paper_orig_ids)
g.nodes['paper'].data['orig_ids'] = torch.from_numpy(paper_orig_ids)
cite_count_path = os.path.join(input_dir, 'cites/count.npy')
with open(cite_count_path, 'wb') as f:
np.save(f, cite_count)
g.edges['cites'].data['count'] = torch.from_numpy(cite_count)
write_year_path = os.path.join(input_dir, 'writes/year.npy')
with open(write_year_path, 'wb') as f:
np.save(f, write_year)
g.edges[('author', 'writes', 'paper')].data['year'] = torch.from_numpy(write_year)
g.edges['rev_writes'].data['year'] = torch.from_numpy(write_year)
writes_orig_ids_path = os.path.join(input_dir, 'writes/orig_ids.npy')
with open(writes_orig_ids_path, 'wb') as f:
np.save(f, writes_orig_ids)
g.edges[('author', 'writes', 'paper')].data['orig_ids'] = torch.from_numpy(writes_orig_ids)
write2_year_path = os.path.join(input_dir, 'writes2/year.npy')
with open(write2_year_path, 'wb') as f:
np.save(f, write2_year)
g.edges[('institution', 'writes', 'paper')].data['year'] = torch.from_numpy(write2_year)
node_data = None
if include_masks:
for sub_d in ['author', 'institution']:
os.makedirs(os.path.join(input_dir, sub_d))
paper_train_mask_path = os.path.join(input_dir, 'paper/train_mask.npy')
with open(paper_train_mask_path, 'wb') as f:
np.save(f, paper_train_mask)
paper_test_mask_path = os.path.join(input_dir, 'paper/test_mask.npy')
with open(paper_test_mask_path, 'wb') as f:
np.save(f, paper_test_mask)
paper_val_mask_path = os.path.join(input_dir, 'paper/val_mask.npy')
with open(paper_val_mask_path, 'wb') as f:
np.save(f, paper_val_mask)
author_train_mask_path = os.path.join(
input_dir, 'author/train_mask.npy'
)
with open(author_train_mask_path, 'wb') as f:
np.save(f, author_train_mask)
author_test_mask_path = os.path.join(input_dir, 'author/test_mask.npy')
with open(author_test_mask_path, 'wb') as f:
np.save(f, author_test_mask)
author_val_mask_path = os.path.join(input_dir, 'author/val_mask.npy')
with open(author_val_mask_path, 'wb') as f:
np.save(f, author_val_mask)
inst_train_mask_path = os.path.join(
input_dir, 'institution/train_mask.npy'
)
with open(inst_train_mask_path, 'wb') as f:
np.save(f, inst_train_mask)
inst_test_mask_path = os.path.join(
input_dir, 'institution/test_mask.npy'
)
with open(inst_test_mask_path, 'wb') as f:
np.save(f, inst_test_mask)
inst_val_mask_path = os.path.join(input_dir, 'institution/val_mask.npy')
with open(inst_val_mask_path, 'wb') as f:
np.save(f, inst_val_mask)
node_data = {
'paper': {
'feat': paper_feat_path,
'train_mask': paper_train_mask_path,
'test_mask': paper_test_mask_path,
'val_mask': paper_val_mask_path,
'label': paper_label_path,
'year': paper_year_path,
'orig_ids': paper_orig_ids_path,
},
'author': {
'train_mask': author_train_mask_path,
'test_mask': author_test_mask_path,
'val_mask': author_val_mask_path,
},
'institution': {
'train_mask': inst_train_mask_path,
'test_mask': inst_test_mask_path,
'val_mask': inst_val_mask_path,
},
}
else:
node_data = {
'paper': {
'feat': paper_feat_path,
'label': paper_label_path,
'year': paper_year_path,
'orig_ids': paper_orig_ids_path,
}
}
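# edge_data mirrors node_data: it maps edge types (strings or canonical
# (src, rel, dst) tuples) to {feature name: file path} entries consumed by
# chunk_graph below.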
edge_data = {
'cites': {'count': cite_count_path},
('author', 'writes', 'paper'): {
'year': write_year_path,
'orig_ids': writes_orig_ids_path
},
'rev_writes': {'year': write_year_path},
('institution', 'writes', 'paper'): {
'year': write2_year_path,
},
}
output_dir = os.path.join(root_dir, 'chunked-data')
chunk_graph(
g,
'mag240m',
node_data,
edge_data,
num_chunks=num_chunks,
output_path=output_dir,
data_fmt=data_fmt,
)
print('Done with creating chunked graph')
return g
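For orientation, the chunked output lands under <root_dir>/chunked-data and is inspected by the tests below roughly like this (a minimal sketch; paths follow the test assertions):

import json
import os
import tempfile

from utils import create_chunked_dataset  # the helper defined above

with tempfile.TemporaryDirectory() as root_dir:
    g = create_chunked_dataset(root_dir, num_chunks=8, data_fmt='numpy')
    output_dir = os.path.join(root_dir, 'chunked-data')
    with open(os.path.join(output_dir, 'metadata.json')) as f:
        meta = json.load(f)
    # Edge index chunks live under chunked-data/edge_index/; node/edge data
    # chunks live under chunked-data/node_data/ and chunked-data/edge_data/.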
import json
import os
import tempfile
import unittest
import numpy as np
import pytest
import torch
from chunk_graph import chunk_graph
from create_chunked_dataset import create_chunked_dataset
from utils import create_chunked_dataset
from distpartitioning import array_readwriter
from distpartitioning.utils import generate_read_list
import dgl
from dgl.data.utils import load_graphs, load_tensors
@@ -55,7 +55,7 @@ def _verify_graph_feats(
continue
true_feats = g.nodes[ntype].data[name][orig_id]
ndata = node_feats[ntype + "/" + name][local_nids]
assert torch.equal(ndata, true_feats)
assert np.array_equal(ndata.numpy(), true_feats.numpy())
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
@@ -74,16 +74,26 @@ def _verify_graph_feats(
continue
true_feats = g.edges[etype].data[name][orig_id]
edata = edge_feats[_etype_tuple_to_str(etype) + "/" + name][local_eids]
assert torch.equal(edata, true_feats)
assert np.array_equal(edata.numpy(), true_feats.numpy())
@pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("data_fmt", ['numpy', 'parquet'])
def test_chunk_graph(num_chunks, data_fmt):
def _test_chunk_graph(
num_chunks,
data_fmt = 'numpy',
num_chunks_nodes = None,
num_chunks_edges = None,
num_chunks_node_data = None,
num_chunks_edge_data = None
):
with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(root_dir, num_chunks, data_fmt=data_fmt)
g = create_chunked_dataset(root_dir, num_chunks,
data_fmt=data_fmt,
num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data
)
# check metadata.json
output_dir = os.path.join(root_dir, "chunked-data")
@@ -98,7 +108,11 @@ def test_chunk_graph(num_chunks, data_fmt):
output_edge_index_dir = os.path.join(output_dir, "edge_index")
for c_etype in g.canonical_etypes:
c_etype_str = _etype_tuple_to_str(c_etype)
for i in range(num_chunks):
if num_chunks_edges is None:
n_chunks = num_chunks
else:
n_chunks = num_chunks_edges
for i in range(n_chunks):
fname = os.path.join(
output_edge_index_dir, f'{c_etype_str}{i}.txt'
)
@@ -112,11 +126,13 @@ def test_chunk_graph(num_chunks, data_fmt):
# check node/edge_data
suffix = 'npy' if data_fmt=='numpy' else 'parquet'
reader_fmt_meta = {"name": data_fmt}
def test_data(sub_dir, feat, expected_data, expected_shape):
def test_data(
sub_dir, feat, expected_data, expected_shape, num_chunks
):
data = []
for i in range(num_chunks):
fname = os.path.join(sub_dir, f'{feat}-{i}.{suffix}')
assert os.path.isfile(fname)
assert os.path.isfile(fname), f'{fname} cannot be found.'
feat_array = array_readwriter.get_array_parser(
**reader_fmt_meta
).read(fname)
@@ -128,18 +144,87 @@ def test_chunk_graph(num_chunks, data_fmt):
output_node_data_dir = os.path.join(output_dir, "node_data")
for ntype in g.ntypes:
sub_dir = os.path.join(output_node_data_dir, ntype)
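# Resolve the expected chunk count for this node type: an int applies to all
# types, a dict can override per type (and, nested, per feature); anything
# left unspecified falls back to num_chunks.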
if isinstance(num_chunks_node_data, int):
chunks_data = num_chunks_node_data
elif isinstance(num_chunks_node_data, dict):
chunks_data = num_chunks_node_data.get(ntype, num_chunks)
else:
chunks_data = num_chunks
for feat, data in g.nodes[ntype].data.items():
test_data(sub_dir, feat, data, g.num_nodes(ntype) // num_chunks)
if isinstance(chunks_data, dict):
n_chunks = chunks_data.get(feat, num_chunks)
else:
n_chunks = chunks_data
test_data(sub_dir, feat, data, g.num_nodes(ntype) // n_chunks,
n_chunks)
output_edge_data_dir = os.path.join(output_dir, "edge_data")
for c_etype in g.canonical_etypes:
c_etype_str = _etype_tuple_to_str(c_etype)
sub_dir = os.path.join(output_edge_data_dir, c_etype_str)
if isinstance(num_chunks_edge_data, int):
chunks_data = num_chunks_edge_data
elif isinstance(num_chunks_edge_data, dict):
chunks_data = num_chunks_edge_data.get(c_etype, num_chunks)
else:
chunks_data = num_chunks
for feat, data in g.edges[c_etype].data.items():
test_data(sub_dir, feat, data, g.num_edges(c_etype) // num_chunks)
if isinstance(chunks_data, dict):
n_chunks = chunks_data.get(feat, num_chunks)
else:
n_chunks = chunks_data
test_data(sub_dir, feat, data, g.num_edges(c_etype) // n_chunks,
n_chunks)
def _test_pipeline(num_chunks, num_parts, world_size, graph_formats=None, data_fmt='numpy'):
@pytest.mark.parametrize("num_chunks", [1, 8])
@pytest.mark.parametrize("data_fmt", ['numpy', 'parquet'])
def test_chunk_graph_basics(num_chunks, data_fmt):
_test_chunk_graph(num_chunks, data_fmt=data_fmt)
@pytest.mark.parametrize(
"num_chunks, "
"num_chunks_nodes, "
"num_chunks_edges, "
"num_chunks_node_data, "
"num_chunks_edge_data",
[
[1, None, None, None, None],
[8, None, None, None, None],
[4, 4, 4, 8, 12],
[4, 4, 4, {'paper': 10}, {('author', 'writes', 'paper'): 24}],
[4, 4, 4, {'paper': {'feat': 10}},
{('author', 'writes', 'paper'): {'year': 24}}],
]
)
def test_chunk_graph_arbitrary_chunks(
num_chunks,
num_chunks_nodes,
num_chunks_edges,
num_chunks_node_data,
num_chunks_edge_data
):
_test_chunk_graph(
num_chunks,
num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data
)
def _test_pipeline(
num_chunks,
num_parts,
world_size,
graph_formats=None,
data_fmt='numpy',
num_chunks_nodes=None,
num_chunks_edges=None,
num_chunks_node_data=None,
num_chunks_edge_data=None
):
if num_chunks < num_parts:
# num_parts should be less than or equal to num_chunks
return
@@ -150,7 +235,13 @@ def _test_pipeline(num_chunks, num_parts, world_size, graph_formats=None, data_f
with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(root_dir, num_chunks, data_fmt=data_fmt)
g = create_chunked_dataset(root_dir, num_chunks,
data_fmt=data_fmt,
num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data
)
# Step1: graph partition
in_dir = os.path.join(root_dir, "chunked-data")
@@ -216,7 +307,9 @@ def _test_pipeline(num_chunks, num_parts, world_size, graph_formats=None, data_f
)
@pytest.mark.parametrize("num_chunks, num_parts, world_size", [[8, 4, 2], [9, 6, 3], [11, 11, 1], [11, 4, 2], [5, 3, 1]])
@pytest.mark.parametrize("num_chunks, num_parts, world_size",
[[4, 4, 4], [8, 4, 2], [8, 4, 4], [9, 6, 3], [11, 11, 1], [11, 4, 1]]
)
def test_pipeline_basics(num_chunks, num_parts, world_size):
_test_pipeline(num_chunks, num_parts, world_size)
@@ -229,7 +322,52 @@ def test_pipeline_formats(graph_formats):
@pytest.mark.parametrize(
"data_fmt", ['numpy', "parquet"]
"num_chunks, "
"num_parts, "
"world_size, "
"num_chunks_node_data, "
"num_chunks_edge_data",
[
[8, 4, 2, 20, 25],
[9, 7, 5, 3, 11],
[8, 8, 4, 3, 5],
[8, 4, 2, {'paper': {'feat': 11, 'year': 1}},
{('author', 'writes', 'paper'): {'year': 24}}],
]
)
def test_pipeline_arbitrary_chunks(
num_chunks,
num_parts,
world_size,
num_chunks_node_data,
num_chunks_edge_data,
):
_test_pipeline(
num_chunks,
num_parts,
world_size,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
)
@pytest.mark.parametrize(
"graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
)
def test_pipeline_formats(graph_formats):
_test_pipeline(4, 4, 4, graph_formats)
@pytest.mark.parametrize(
"data_fmt", ["numpy", "parquet"]
)
def test_pipeline_feature_format(data_fmt):
_test_pipeline(4, 4, 4, data_fmt=data_fmt)
def test_utils_generate_read_list():
read_list = generate_read_list(10, 4)
assert np.array_equal(read_list[0], np.array([0, 1, 2]))
assert np.array_equal(read_list[1], np.array([3, 4, 5]))
assert np.array_equal(read_list[2], np.array([6, 7]))
assert np.array_equal(read_list[3], np.array([8, 9]))
@@ -8,10 +8,9 @@ import unittest
import dgl
import numpy as np
import torch
from chunk_graph import chunk_graph
from dgl.data.utils import load_graphs, load_tensors
from create_chunked_dataset import create_chunked_dataset
from utils import create_chunked_dataset
from partition_algo.base import load_partition_meta
"""
@@ -24,7 +23,7 @@ not yet configured in the CI framework.
def test_parmetis_preprocessing():
with tempfile.TemporaryDirectory() as root_dir:
num_chunks = 2
g = create_chunked_dataset(root_dir, num_chunks, include_masks=True)
g = create_chunked_dataset(root_dir, num_chunks)
# Trigger ParMETIS pre-processing here.
schema_path = os.path.join(root_dir, 'chunked-data/metadata.json')
@@ -115,7 +114,7 @@ def test_parmetis_preprocessing():
def test_parmetis_postprocessing():
with tempfile.TemporaryDirectory() as root_dir:
num_chunks = 2
g = create_chunked_dataset(root_dir, num_chunks, include_masks=True)
g = create_chunked_dataset(root_dir, num_chunks)
num_nodes = g.number_of_nodes()
num_institutions = g.number_of_nodes('institution')
@@ -182,7 +181,7 @@ def test_parmetis_wrapper():
with tempfile.TemporaryDirectory() as root_dir:
num_chunks = 2
graph_name = "mag240m"
g = create_chunked_dataset(root_dir, num_chunks, include_masks=True)
g = create_chunked_dataset(root_dir, num_chunks)
all_ntypes = g.ntypes
all_etypes = g.etypes
num_constraints = len(all_ntypes) + 3
This diff is collapsed.
@@ -172,7 +172,7 @@ def exchange_edge_data(rank, world_size, num_parts, edge_data):
input_list.append(torch.from_numpy(filt_data))
dist.barrier ()
output_list = alltoallv_cpu(rank, world_size, input_list)
output_list = alltoallv_cpu(rank, world_size, input_list, retain_nones=False)
# Replace the values of edge_data with the data received from all the other processes.
rcvd_edge_data = torch.cat(output_list).numpy()
@@ -292,28 +292,32 @@ def exchange_feature(rank, data, id_lookup, feat_type, feat_key, featdata_key, g
if (gids_per_partid.shape[0] == 0):
feats_per_rank.append(torch.empty((0,1), dtype=torch.float))
global_id_per_rank.append(np.empty((0,1), dtype=np.int64))
global_id_per_rank.append(torch.empty((0,1), dtype=torch.int64))
else:
feats_per_rank.append(featdata_key[local_idx_partid])
global_id_per_rank.append(torch.from_numpy(gids_per_partid).type(torch.int64))
#features (and global nids) per rank to be sent out are ready
#for transmission, perform alltoallv here.
output_feat_list = alltoallv_cpu(rank, world_size, feats_per_rank)
output_id_list = alltoallv_cpu(rank, world_size, global_id_per_rank)
output_feat_list = alltoallv_cpu(rank, world_size, feats_per_rank, retain_nones=False)
output_id_list = alltoallv_cpu(rank, world_size, global_id_per_rank, retain_nones=False)
assert len(output_feat_list) == len(output_id_list), (
"Length of feature list and id list are expected to be equal while "
f"got {len(output_feat_list)} and {len(output_id_list)}."
)
#stitch node_features together to form one large feature tensor
output_feat_list = torch.cat(output_feat_list)
output_id_list = torch.cat(output_id_list)
if local_feat_key in cur_features:
temp = cur_features[local_feat_key]
cur_features[local_feat_key] = torch.cat([temp, output_feat_list])
temp = cur_global_ids[local_feat_key]
cur_global_ids[local_feat_key] = torch.cat([temp, output_id_list])
else:
cur_features[local_feat_key] = output_feat_list
cur_global_ids[local_feat_key] = output_id_list
if len(output_feat_list) > 0:
output_feat_list = torch.cat(output_feat_list)
output_id_list = torch.cat(output_id_list)
if local_feat_key in cur_features:
temp = cur_features[local_feat_key]
cur_features[local_feat_key] = torch.cat([temp, output_feat_list])
temp = cur_global_ids[local_feat_key]
cur_global_ids[local_feat_key] = torch.cat([temp, output_id_list])
else:
cur_features[local_feat_key] = output_feat_list
cur_global_ids[local_feat_key] = output_id_list
return cur_features, cur_global_ids
import logging
import os
import gc
import numpy as np
import pyarrow
import torch
from pyarrow import csv
import array_readwriter
import torch.distributed as dist
import array_readwriter
import constants
from utils import get_idranges, map_partid_rank
from utils import get_idranges, map_partid_rank, generate_read_list
from gloo_wrapper import alltoallv_cpu
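# Map torch dtypes to small integer codes so each rank's data shape and dtype
# can be exchanged together through dist.all_gather in _shuffle_data below;
# REV_DATA_TYPE_ID decodes the received code back into a dtype.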
DATA_TYPE_ID = {
data_type: id for id, data_type in enumerate([
torch.float32,
torch.float64,
torch.float16,
torch.uint8,
torch.int8,
torch.int16,
torch.int32,
torch.int64,
torch.bool,
])
}
REV_DATA_TYPE_ID = {
id: data_type for data_type, id in DATA_TYPE_ID.items()
}
def _shuffle_data(data, rank, world_size, tids, num_parts):
'''Each process scatters its loaded data to all processes in the group and
returns the gathered data.
Parameters
----------
data: tensor
Loaded node or edge data.
rank: int
Rank of current process.
world_size: int
Total number of processes in group.
tids: list[tuple]
Type-wise node/edge IDs.
num_parts: int
Number of partitions.
Returns
-------
shuffled_data: tensor
Shuffled node or edge data.
'''
# Broadcast basic information of loaded data:
# 1. number of data lines
# 2. data dimension
# 3. data type
assert len(data.shape) in [1, 2], (
f"Data is expected to be 1-D or 2-D but got {data.shape}."
)
data_shape = list(data.shape)
if len(data_shape) == 1:
data_shape.append(1)
data_shape.append(DATA_TYPE_ID[data.dtype])
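# e.g., a (100, 16) float32 tensor is encoded as
# [100, 16, DATA_TYPE_ID[torch.float32]]; 1-D data gets a placeholder dim of 1.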
data_shape = torch.tensor(data_shape, dtype=torch.int64)
data_shape_output = [
torch.zeros_like(data_shape) for _ in range(world_size)
]
dist.all_gather(data_shape_output, data_shape)
# Rank 0 is always guaranteed to load non-empty data, so we fetch the data dimension and dtype from it.
data_dim = data_shape_output[0][1].item()
data_type = REV_DATA_TYPE_ID[data_shape_output[0][2].item()]
data_lines = [data_shape[0].item() for data_shape in data_shape_output]
data_lines.insert(0, 0)
data_lines = np.cumsum(data_lines)
# prepare for scatter
data_list = [None] * world_size
if data.shape[0] > 0:
for local_part_id in range(num_parts):
target_rank = map_partid_rank(local_part_id, world_size)
start, end = tids[local_part_id]
global_start = data_lines[rank]
global_end = data_lines[rank + 1]
if start >= global_end or end <= global_start:
continue
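# Intersect this rank's loaded row range [global_start, global_end) with the
# target partition's ID range [start, end); read_start/read_end are offsets
# into the locally loaded `data`.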
read_start = max(0, start - global_start)
read_end = min(data.shape[0], end - global_start)
if data_list[target_rank] is None:
data_list[target_rank] = []
data_list[target_rank].append(data[read_start:read_end])
data_input = [None] * world_size
for i, data in enumerate(data_list):
if data is None or len(data) == 0:
if data_dim == 1:
data_input[i] = torch.zeros((0,), dtype=data_type)
else:
data_input[i] = torch.zeros((0, data_dim), dtype=data_type)
else:
data_input[i] = torch.cat(data).to(dtype=data_type)
del data_list
gc.collect()
local_data = data_input[rank]
if data_dim == 1:
data_input[rank] = torch.zeros((0,), dtype=data_type)
else:
data_input[rank] = torch.zeros((0, data_dim), dtype=data_type)
# scatter and gather data
data_output = alltoallv_cpu(rank, world_size, data_input)
data_output[rank] = local_data
data_output = [data for data in data_output if data is not None]
data_output = torch.cat(data_output)
return data_output
def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
@@ -147,28 +260,51 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
assert (feat_data[constants.STR_FORMAT][constants.STR_NAME]
in [constants.STR_NUMPY, constants.STR_PARQUET])
# It is guaranteed that num_chunks is always greater
# than num_partitions.
num_chunks = len(feat_data[constants.STR_DATA])
read_list = np.array_split(np.arange(num_chunks), num_parts)
reader_fmt_meta = {"name": feat_data[constants.STR_FORMAT][constants.STR_NAME]}
node_data = []
num_files = len(feat_data[constants.STR_DATA])
if num_files == 0:
continue
reader_fmt_meta = {
"name": feat_data[constants.STR_FORMAT][constants.STR_NAME]
}
read_list = generate_read_list(num_files, world_size)
for idx in read_list[rank]:
data_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(data_file):
data_file = os.path.join(input_dir, data_file)
node_data.append(
array_readwriter.get_array_parser(
**reader_fmt_meta
).read(data_file)
)
if len(node_data) > 0:
node_data = np.concatenate(node_data)
else:
node_data = np.array([])
node_data = torch.from_numpy(node_data)
# scatter and gather data.
node_data = _shuffle_data(
node_data,
rank,
world_size,
node_tids[ntype_name],
num_parts)
# collect data on current rank.
offset = 0
for local_part_id in range(num_parts):
if map_partid_rank(local_part_id, world_size) == rank:
nfeat = []
nfeat_tids = []
for idx in read_list[local_part_id]:
nfeat_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(nfeat_file):
nfeat_file = os.path.join(input_dir, nfeat_file)
logging.info(f'Loading node feature[{feat_name}] of ntype[{ntype_name}] from {nfeat_file}')
nfeat.append(
array_readwriter.get_array_parser(
**reader_fmt_meta
).read(nfeat_file)
)
nfeat = np.concatenate(nfeat) if len(nfeat) != 0 else np.array([])
node_features[ntype_name+"/"+feat_name+"/"+str(local_part_id//world_size)] = torch.from_numpy(nfeat)
start, end = node_tids[ntype_name][local_part_id]
nfeat = node_data[offset : offset + end - start]
data_key = f"{ntype_name}/{feat_name}/{local_part_id//world_size}"
node_features[data_key] = nfeat
nfeat_tids.append(node_tids[ntype_name][local_part_id])
node_feature_tids[ntype_name+"/"+feat_name+"/"+str(local_part_id//world_size)] = nfeat_tids
node_feature_tids[data_key] = nfeat_tids
offset += end - start
#done building node_features locally.
if len(node_features) <= 0:
@@ -193,7 +329,7 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
# Iterate over the range of type ids for the current node feature
# and count the number of features for this feature name.
count = tids[0][1] - tids[0][0]
assert count == feat_info.size()[0]
assert count == feat_info.size()[0], f"{feat_name}, {count} vs {feat_info.size()[0]}."
'''
@@ -247,29 +383,53 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
if dataset_features and (len(dataset_features) > 0):
for etype_name, etype_feature_data in dataset_features.items():
for feat_name, feat_data in etype_feature_data.items():
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] in [constants.STR_NUMPY, constants.STR_PARQUET]
reader_fmt_meta = {"name": feat_data[constants.STR_FORMAT][constants.STR_NAME]}
num_chunks = len(feat_data[constants.STR_DATA])
read_list = np.array_split(np.arange(num_chunks), num_parts)
assert (feat_data[constants.STR_FORMAT][constants.STR_NAME]
in [constants.STR_NUMPY, constants.STR_PARQUET])
edge_data = []
num_files = len(feat_data[constants.STR_DATA])
if num_files == 0:
continue
reader_fmt_meta = {
"name": feat_data[constants.STR_FORMAT][constants.STR_NAME]
}
read_list = generate_read_list(num_files, world_size)
for idx in read_list[rank]:
data_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(data_file):
data_file = os.path.join(input_dir, data_file)
edge_data.append(
array_readwriter.get_array_parser(
**reader_fmt_meta
).read(data_file)
)
if len(edge_data) > 0:
edge_data = np.concatenate(edge_data)
else:
edge_data = np.array([])
edge_data = torch.from_numpy(edge_data)
# scatter and gather data.
edge_data = _shuffle_data(
edge_data,
rank,
world_size,
edge_tids[etype_name],
num_parts)
# collect data on current rank.
offset = 0
for local_part_id in range(num_parts):
if map_partid_rank(local_part_id, world_size) == rank:
efeats = []
efeat_tids = []
for idx in read_list[local_part_id]:
efeat_file = feat_data[constants.STR_DATA][idx]
if not os.path.isabs(efeat_file):
efeat_file = os.path.join(input_dir, efeat_file)
logging.info(
f'Loading edge feature[{feat_name}] of etype[{etype_name}] from {efeat_file}'
)
efeats.append(
array_readwriter.get_array_parser(
**reader_fmt_meta
).read(efeat_file)
)
start, end = edge_tids[etype_name][local_part_id]
efeats = edge_data[offset : offset + end - start]
efeat_tids.append(edge_tids[etype_name][local_part_id])
edge_features[etype_name+'/'+feat_name+"/"+str(local_part_id//world_size)] = torch.from_numpy(np.concatenate(efeats))
edge_feature_tids[etype_name+"/"+feat_name+"/"+str(local_part_id//world_size)] = efeat_tids
data_key = f"{etype_name}/{feat_name}/{local_part_id//world_size}"
edge_features[data_key] = efeats
edge_feature_tids[data_key] = efeat_tids
offset += end - start
# Done with building edge_features locally.
if len(edge_features) <= 0:
@@ -353,7 +513,7 @@ def get_dataset(input_dir, graph_name, rank, world_size, num_parts, schema_map):
dst_ntype_name = tokens[2]
num_chunks = len(edge_info)
read_list = np.array_split(np.arange(num_chunks), num_parts)
read_list = generate_read_list(num_chunks, num_parts)
src_ids = []
dst_ids = []
@@ -73,10 +73,23 @@ def __alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list):
The tensors to exchange
"""
input_tensor_list = [tensor.to(torch.device('cpu')) for tensor in input_tensor_list]
# TODO(#5002): As Boolean data is not supported in
# ``torch.distributed.scatter()``, we convert boolean into int8 before
# scatter and convert it back afterwards.
dtypes = [t.dtype for t in input_tensor_list]
for i, dtype in enumerate(dtypes):
if dtype == torch.bool:
input_tensor_list[i] = input_tensor_list[i].to(torch.int8)
output_tensor_list[i] = output_tensor_list[i].to(torch.int8)
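# Keep the receive buffers in the same converted dtype so the tensors
# exchanged by dist.scatter below agree in type on both sides.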
for i in range(world_size):
dist.scatter(output_tensor_list[i], input_tensor_list if i == rank else [], src=i)
# Convert back to original dtype
for i, dtype in enumerate(dtypes):
if dtype == torch.bool:
input_tensor_list[i] = input_tensor_list[i].to(dtype)
output_tensor_list[i] = output_tensor_list[i].to(dtype)
def alltoallv_cpu(rank, world_size, input_tensor_list):
def alltoallv_cpu(rank, world_size, input_tensor_list, retain_nones=True):
"""
Wrapper function providing the alltoallv functionality using the underlying alltoall
messaging primitive. This function, in its current implementation, supports exchanging
@@ -98,6 +111,8 @@ def alltoallv_cpu(rank, world_size, input_tensor_list):
The size of the entire group.
input_tensor_list : List of tensor
The tensors to exchange
retain_nones : bool
Indicates whether to retain ``None`` entries in the returned list.
Returns:
--------
@@ -151,7 +166,8 @@ def alltoallv_cpu(rank, world_size, input_tensor_list):
return_vals = []
for s, t in zip(recv_counts, output_tensor_list):
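# A zero receive count means this peer sent nothing: keep a None placeholder
# so results stay positionally aligned with ranks, or drop the entry entirely
# when retain_nones=False.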
if s[0] == 0:
return_vals.append(None)
if retain_nones:
return_vals.append(None)
else:
return_vals.append(t[0:s[0]])
return return_vals
@@ -557,3 +557,26 @@ def map_partid_rank(partid, world_size):
id.
"""
return partid % world_size
def generate_read_list(num_files, world_size):
"""Generate the file IDs to read for each rank.
Parameters:
-----------
num_files : int
Total number of files.
world_size : int
World size of group.
Returns:
--------
read_list : list of numpy.ndarray
Per-rank arrays of the file IDs to read.
Examples
--------
>>> tools.distpartitioning.utils.generate_read_list(10, 4)
[array([0, 1, 2]), array([3, 4, 5]), array([6, 7]), array([8, 9])]
"""
return np.array_split(np.arange(num_files), world_size)