"git@developer.sourcefind.cn:OpenDAS/vision.git" did not exist on "5f54fd14bc55fc0eba2777b4c47864d951eee516"
Unverified Commit ad7be8be authored by Minjie Wang, committed by GitHub

[Distributed][Feature] New distributed partitioning pipeline (#4439)

parents ee672c0b 7e2ed9f8
@@ -30,7 +30,7 @@ export CUDA_VISIBLE_DEVICES=-1
 conda activate ${DGLBACKEND}-ci
-python3 -m pip install pytest psutil pyyaml pydantic pandas rdflib ogb filelock || fail "pip install"
+python3 -m pip install pytest psutil pyyaml pydantic pandas rdflib ogb filelock pyarrow || fail "pip install"
 export PYTHONUNBUFFERED=1
 export OMP_NUM_THREADS=1
import argparse
import dgl
import json
import numpy as np
import os
import sys
import tempfile
import torch
from dgl.data.utils import load_tensors, load_graphs
from chunk_graph import chunk_graph
def test_part_pipeline():
# Step0: prepare chunked graph data format
# A synthetic mini MAG240
num_institutions = 20
num_authors = 100
num_papers = 600
def rand_edges(num_src, num_dst, num_edges):
eids = np.random.choice(num_src * num_dst, num_edges, replace=False)
src = torch.from_numpy(eids // num_dst)
dst = torch.from_numpy(eids % num_dst)
return src, dst
num_cite_edges = 2000
num_write_edges = 1000
num_affiliate_edges = 200
# Structure
data_dict = {
('paper', 'cites', 'paper'): rand_edges(num_papers, num_papers, num_cite_edges),
('author', 'writes', 'paper'): rand_edges(num_authors, num_papers, num_write_edges),
('author', 'affiliated_with', 'institution'): rand_edges(num_authors, num_institutions, num_affiliate_edges)
}
src, dst = data_dict[('author', 'writes', 'paper')]
data_dict[('paper', 'rev_writes', 'author')] = (dst, src)
g = dgl.heterograph(data_dict)
# paper feat, label, year
num_paper_feats = 3
paper_feat = np.random.randn(num_papers, num_paper_feats)
num_classes = 4
paper_label = np.random.choice(num_classes, num_papers)
paper_year = np.random.choice(2022, num_papers)
# edge features
cite_count = np.random.choice(10, num_cite_edges)
write_year = np.random.choice(2022, num_write_edges)
# Save features
with tempfile.TemporaryDirectory() as root_dir:
print('root_dir', root_dir)
input_dir = os.path.join(root_dir, 'data_test')
os.makedirs(input_dir)
for sub_d in ['paper', 'cites', 'writes']:
os.makedirs(os.path.join(input_dir, sub_d))
paper_feat_path = os.path.join(input_dir, 'paper/feat.npy')
with open(paper_feat_path, 'wb') as f:
np.save(f, paper_feat)
paper_label_path = os.path.join(input_dir, 'paper/label.npy')
with open(paper_label_path, 'wb') as f:
np.save(f, paper_label)
paper_year_path = os.path.join(input_dir, 'paper/year.npy')
with open(paper_year_path, 'wb') as f:
np.save(f, paper_year)
cite_count_path = os.path.join(input_dir, 'cites/count.npy')
with open(cite_count_path, 'wb') as f:
np.save(f, cite_count)
write_year_path = os.path.join(input_dir, 'writes/year.npy')
with open(write_year_path, 'wb') as f:
np.save(f, write_year)
output_dir = os.path.join(root_dir, 'chunked-data')
num_chunks = 2
chunk_graph(
g,
'mag240m',
{'paper':
{
'feat': paper_feat_path,
'label': paper_label_path,
'year': paper_year_path
}
},
{
'cites': {'count': cite_count_path},
'writes': {'year': write_year_path},
# You can reuse the same data file if the features are indeed shared.
'rev_writes': {'year': write_year_path}
},
num_chunks=num_chunks,
output_path=output_dir)
# check metadata.json
json_file = os.path.join(output_dir, 'metadata.json')
assert os.path.isfile(json_file)
with open(json_file, 'rb') as f:
meta_data = json.load(f)
assert meta_data['graph_name'] == 'mag240m'
assert len(meta_data['num_nodes_per_chunk'][0]) == num_chunks
# check edge_index
output_edge_index_dir = os.path.join(output_dir, 'edge_index')
for utype, etype, vtype in data_dict.keys():
fname = ':'.join([utype, etype, vtype])
for i in range(num_chunks):
chunk_f_name = os.path.join(output_edge_index_dir, fname + str(i) + '.txt')
assert os.path.isfile(chunk_f_name)
with open(chunk_f_name, 'r') as f:
header = f.readline()
num1, num2 = header.rstrip().split(' ')
assert isinstance(int(num1), int)
assert isinstance(int(num2), int)
# check node_data
output_node_data_dir = os.path.join(output_dir, 'node_data', 'paper')
for feat in ['feat', 'label', 'year']:
for i in range(num_chunks):
chunk_f_name = '{}-{}.npy'.format(feat, i)
chunk_f_name = os.path.join(output_node_data_dir, chunk_f_name)
assert os.path.isfile(chunk_f_name)
feat_array = np.load(chunk_f_name)
assert feat_array.shape[0] == num_papers // num_chunks
# check edge_data
num_edges = {
'paper:cites:paper': num_cite_edges,
'author:writes:paper': num_write_edges,
'paper:rev_writes:author': num_write_edges
}
output_edge_data_dir = os.path.join(output_dir, 'edge_data')
for etype, feat in [
['paper:cites:paper', 'count'],
['author:writes:paper', 'year'],
['paper:rev_writes:author', 'year']
]:
output_edge_sub_dir = os.path.join(output_edge_data_dir, etype)
for i in range(num_chunks):
chunk_f_name = '{}-{}.npy'.format(feat, i)
chunk_f_name = os.path.join(output_edge_sub_dir, chunk_f_name)
assert os.path.isfile(chunk_f_name)
feat_array = np.load(chunk_f_name)
assert feat_array.shape[0] == num_edges[etype] // num_chunks
# Step1: graph partition
in_dir = os.path.join(root_dir, 'chunked-data')
output_dir = os.path.join(root_dir, '2parts')
os.system('python tools/partition_algo/random_partition.py '\
'--metadata {}/metadata.json --output_path {} --num_partitions {}'.format(
in_dir, output_dir, num_chunks))
for ntype in ['author', 'institution', 'paper']:
fname = os.path.join(output_dir, '{}.txt'.format(ntype))
with open(fname, 'r') as f:
header = f.readline().rstrip()
assert isinstance(int(header), int)
# Step2: data dispatch
partition_dir = os.path.join(root_dir, '2parts')
out_dir = os.path.join(root_dir, 'partitioned')
ip_config = os.path.join(root_dir, 'ip_config.txt')
with open(ip_config, 'w') as f:
f.write('127.0.0.1\n')
f.write('127.0.0.2\n')
os.system('python tools/dispatch_data.py '\
'--in-dir {} --partitions-dir {} --out-dir {} --ip-config {}'.format(
in_dir, partition_dir, out_dir, ip_config))
# check metadata.json
meta_fname = os.path.join(out_dir, 'metadata.json')
with open(meta_fname, 'rb') as f:
meta_data = json.load(f)
all_etypes = ['affiliated_with', 'writes', 'cites', 'rev_writes']
for etype in all_etypes:
assert len(meta_data['edge_map'][etype]) == num_chunks
assert meta_data['etypes'].keys() == set(all_etypes)
assert meta_data['graph_name'] == 'mag240m'
all_ntypes = ['author', 'institution', 'paper']
for ntype in all_ntypes:
assert len(meta_data['node_map'][ntype]) == num_chunks
assert meta_data['ntypes'].keys() == set(all_ntypes)
assert meta_data['num_edges'] == 4200
assert meta_data['num_nodes'] == 720
assert meta_data['num_parts'] == num_chunks
for i in range(num_chunks):
sub_dir = 'part-' + str(i)
assert meta_data[sub_dir]['node_feats'] == 'part{}/node_feat.dgl'.format(i)
assert meta_data[sub_dir]['edge_feats'] == 'part{}/edge_feat.dgl'.format(i)
assert meta_data[sub_dir]['part_graph'] == 'part{}/graph.dgl'.format(i)
# check data
sub_dir = os.path.join(out_dir, 'part' + str(i))
# graph.dgl
fname = os.path.join(sub_dir, 'graph.dgl')
assert os.path.isfile(fname)
g_list, data_dict = load_graphs(fname)
g = g_list[0]
assert isinstance(g, dgl.DGLGraph)
# node_feat.dgl
fname = os.path.join(sub_dir, 'node_feat.dgl')
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
all_tensors = ['paper/feat', 'paper/label', 'paper/year']
assert tensor_dict.keys() == set(all_tensors)
for key in all_tensors:
assert isinstance(tensor_dict[key], torch.Tensor)
# edge_feat.dgl
fname = os.path.join(sub_dir, 'edge_feat.dgl')
assert os.path.isfile(fname)
tensor_dict = load_tensors(fname)
if __name__ == '__main__':
test_part_pipeline()
# DGL Utility Scripts
This folder contains the utilities that do not belong to DGL core package as standalone executable
scripts.
## Graph Chunking
`chunk_graph.py` provides an example of chunking an existing DGLGraph object into the on-disk
[chunked graph format](http://13.231.216.217/guide/distributed-preprocessing.html#chunked-graph-format).
<!-- TODO: change the link of documentation once it's merged to master -->
An example of chunking the OGB MAG240M dataset:
```python
import ogb.lsc
dataset = ogb.lsc.MAG240MDataset('.')
etypes = [
('paper', 'cites', 'paper'),
('author', 'writes', 'paper'),
('author', 'affiliated_with', 'institution')]
g = dgl.heterograph({k: tuple(dataset.edge_index(*k)) for k in etypes})
chunk_graph(
g,
'mag240m',
{'paper': {
'feat': 'mag240m_kddcup2021/processed/paper/node_feat.npy',
'label': 'mag240m_kddcup2021/processed/paper/node_label.npy',
'year': 'mag240m_kddcup2021/processed/paper/node_year.npy'}},
{},
4,
'output')
```
The output chunked graph metadata will look as follows (assuming the current directory is `/home/user`):
```json
{
"graph_name": "mag240m",
"node_type": [
"author",
"institution",
"paper"
],
"num_nodes_per_chunk": [
[
30595778,
30595778,
30595778,
30595778
],
[
6431,
6430,
6430,
6430
],
[
30437917,
30437917,
30437916,
30437916
]
],
"edge_type": [
"author:affiliated_with:institution",
"author:writes:paper",
"paper:cites:paper"
],
"num_edges_per_chunk": [
[
11148147,
11148147,
11148146,
11148146
],
[
96505680,
96505680,
96505680,
96505680
],
[
324437232,
324437232,
324437231,
324437231
]
],
"edges": {
"author:affiliated_with:institution": {
"format": {
"name": "csv",
"delimiter": " "
},
"data": [
"/home/user/output/edge_index/author:affiliated_with:institution0.txt",
"/home/user/output/edge_index/author:affiliated_with:institution1.txt",
"/home/user/output/edge_index/author:affiliated_with:institution2.txt",
"/home/user/output/edge_index/author:affiliated_with:institution3.txt"
]
},
"author:writes:paper": {
"format": {
"name": "csv",
"delimiter": " "
},
"data": [
"/home/user/output/edge_index/author:writes:paper0.txt",
"/home/user/output/edge_index/author:writes:paper1.txt",
"/home/user/output/edge_index/author:writes:paper2.txt",
"/home/user/output/edge_index/author:writes:paper3.txt"
]
},
"paper:cites:paper": {
"format": {
"name": "csv",
"delimiter": " "
},
"data": [
"/home/user/output/edge_index/paper:cites:paper0.txt",
"/home/user/output/edge_index/paper:cites:paper1.txt",
"/home/user/output/edge_index/paper:cites:paper2.txt",
"/home/user/output/edge_index/paper:cites:paper3.txt"
]
}
},
"node_data": {
"paper": {
"feat": {
"format": {
"name": "numpy"
},
"data": [
"/home/user/output/node_data/paper/feat-0.npy",
"/home/user/output/node_data/paper/feat-1.npy",
"/home/user/output/node_data/paper/feat-2.npy",
"/home/user/output/node_data/paper/feat-3.npy"
]
},
"label": {
"format": {
"name": "numpy"
},
"data": [
"/home/user/output/node_data/paper/label-0.npy",
"/home/user/output/node_data/paper/label-1.npy",
"/home/user/output/node_data/paper/label-2.npy",
"/home/user/output/node_data/paper/label-3.npy"
]
},
"year": {
"format": {
"name": "numpy"
},
"data": [
"/home/user/output/node_data/paper/year-0.npy",
"/home/user/output/node_data/paper/year-1.npy",
"/home/user/output/node_data/paper/year-2.npy",
"/home/user/output/node_data/paper/year-3.npy"
]
}
}
},
"edge_data": {}
}
```
# See the __main__ block for usage of chunk_graph().
import pathlib
import json
from contextlib import contextmanager
import logging
import os
import torch
import dgl
from utils import setdir
from utils import array_readwriter
def chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt):
paths = []
offset = 0
for j, n in enumerate(chunk_sizes):
path = os.path.abspath(path_fmt % j)
arr_chunk = arr[offset:offset + n]
logging.info('Chunking %d-%d' % (offset, offset + n))
array_readwriter.get_array_parser(**fmt_meta).write(path, arr_chunk)
offset += n
paths.append(path)
return paths
def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
# First deal with ndata and edata that are homogeneous (i.e. not a dict-of-dict)
if len(g.ntypes) == 1 and not isinstance(next(iter(ndata_paths.values())), dict):
ndata_paths = {g.ntypes[0]: ndata_paths}
if len(g.etypes) == 1 and not isinstance(next(iter(edata_paths.values())), dict):
edata_paths = {g.etypes[0]: edata_paths}
# Then convert all edge types to canonical edge types
etypestrs = {etype: ':'.join(etype) for etype in g.canonical_etypes}
edata_paths = {':'.join(g.to_canonical_etype(k)): v for k, v in edata_paths.items()}
metadata = {}
metadata['graph_name'] = name
metadata['node_type'] = g.ntypes
# Compute the number of nodes per chunk per node type
metadata['num_nodes_per_chunk'] = num_nodes_per_chunk = []
for ntype in g.ntypes:
num_nodes = g.num_nodes(ntype)
num_nodes_list = []
for i in range(num_chunks):
n = num_nodes // num_chunks + (i < num_nodes % num_chunks)
num_nodes_list.append(n)
num_nodes_per_chunk.append(num_nodes_list)
num_nodes_per_chunk_dict = {k: v for k, v in zip(g.ntypes, num_nodes_per_chunk)}
metadata['edge_type'] = [etypestrs[etype] for etype in g.canonical_etypes]
# Compute the number of edges per chunk per edge type
metadata['num_edges_per_chunk'] = num_edges_per_chunk = []
for etype in g.canonical_etypes:
num_edges = g.num_edges(etype)
num_edges_list = []
for i in range(num_chunks):
n = num_edges // num_chunks + (i < num_edges % num_chunks)
num_edges_list.append(n)
num_edges_per_chunk.append(num_edges_list)
num_edges_per_chunk_dict = {k: v for k, v in zip(g.canonical_etypes, num_edges_per_chunk)}
# Split edge index
metadata['edges'] = {}
with setdir('edge_index'):
for etype in g.canonical_etypes:
etypestr = etypestrs[etype]
logging.info('Chunking edge index for %s' % etypestr)
edges_meta = {}
fmt_meta = {"name": "csv", "delimiter": " "}
edges_meta['format'] = fmt_meta
srcdst = torch.stack(g.edges(etype=etype), 1)
edges_meta['data'] = chunk_numpy_array(
srcdst.numpy(), fmt_meta, num_edges_per_chunk_dict[etype],
etypestr + '%d.txt')
metadata['edges'][etypestr] = edges_meta
# Chunk node data
metadata['node_data'] = {}
with setdir('node_data'):
for ntype, ndata_per_type in ndata_paths.items():
ndata_meta = {}
with setdir(ntype):
for key, path in ndata_per_type.items():
logging.info('Chunking node data for type %s key %s' % (ntype, key))
ndata_key_meta = {}
reader_fmt_meta = writer_fmt_meta = {"name": "numpy"}
arr = array_readwriter.get_array_parser(**reader_fmt_meta).read(path)
ndata_key_meta['format'] = writer_fmt_meta
ndata_key_meta['data'] = chunk_numpy_array(
arr, writer_fmt_meta, num_nodes_per_chunk_dict[ntype],
key + '-%d.npy')
ndata_meta[key] = ndata_key_meta
metadata['node_data'][ntype] = ndata_meta
# Chunk edge data
metadata['edge_data'] = {}
with setdir('edge_data'):
for etypestr, edata_per_type in edata_paths.items():
edata_meta = {}
with setdir(etypestr):
for key, path in edata_per_type.items():
logging.info('Chunking edge data for type %s key %s' % (etypestr, key))
edata_key_meta = {}
reader_fmt_meta = writer_fmt_meta = {"name": "numpy"}
arr = array_readwriter.get_array_parser(**reader_fmt_meta).read(path)
edata_key_meta['format'] = writer_fmt_meta
etype = tuple(etypestr.split(':'))
edata_key_meta['data'] = chunk_numpy_array(
arr, writer_fmt_meta, num_edges_per_chunk_dict[etype],
key + '-%d.npy')
edata_meta[key] = edata_key_meta
metadata['edge_data'][etypestr] = edata_meta
metadata_path = 'metadata.json'
with open(metadata_path, 'w') as f:
json.dump(metadata, f)
logging.info('Saved metadata in %s' % os.path.abspath(metadata_path))
def chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
"""
Split the graph into multiple chunks.
A directory will be created at :attr:`output_path` with the metadata and chunked
edge list as well as the node/edge data.
Parameters
----------
g : DGLGraph
The graph.
name : str
The name of the graph, to be used later in DistDGL training.
ndata_paths : dict[str, pathlike] or dict[ntype, dict[str, pathlike]]
The dictionary of paths pointing to the corresponding numpy array file for each
node data key.
edata_paths : dict[str, pathlike] or dict[etype, dict[str, pathlike]]
The dictionary of paths pointing to the corresponding numpy array file for each
edge data key.
num_chunks : int
The number of chunks.
output_path : pathlike
The output directory where the chunked graph is saved.
"""
for ntype, ndata in ndata_paths.items():
for key in ndata.keys():
ndata[key] = os.path.abspath(ndata[key])
for etype, edata in edata_paths.items():
for key in edata.keys():
edata[key] = os.path.abspath(edata[key])
with setdir(output_path):
_chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path)
if __name__ == '__main__':
logging.basicConfig(level='INFO')
input_dir = '/data'
output_dir = '/chunked-data'
(g,), _ = dgl.load_graphs(os.path.join(input_dir, 'graph.dgl'))
chunk_graph(
g,
'mag240m',
{'paper': {
'feat': os.path.join(input_dir, 'paper/feat.npy'),
'label': os.path.join(input_dir, 'paper/label.npy'),
'year': os.path.join(input_dir, 'paper/year.npy')}},
{'cites': {'count': os.path.join(input_dir, 'cites/count.npy')},
'writes': {'year': os.path.join(input_dir, 'writes/year.npy')},
# You can reuse the same data file if the features are indeed shared.
'rev_writes': {'year': os.path.join(input_dir, 'writes/year.npy')}},
4,
output_dir)
# The generated metadata is the same as in tools/sample-config/mag240m-metadata.json.
import os
import json
import time
import argparse
import numpy as np
import dgl
import torch as th
import pyarrow
import pandas as pd
from pyarrow import csv
parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
help='The directory path that contains the partition results.')
parser.add_argument('--graph-name', required=True, type=str,
help='The graph name')
parser.add_argument('--schema', required=True, type=str,
help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
help='The number of partitions')
parser.add_argument('--num-node-weights', required=True, type=int,
help='The number of node weights used by METIS.')
parser.add_argument('--workspace', type=str, default='/tmp',
help='The directory to store the intermediate results')
parser.add_argument('--node-attr-dtype', type=str, default=None,
help='The data type of the node attributes')
parser.add_argument('--edge-attr-dtype', type=str, default=None,
help='The data type of the edge attributes')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
parser.add_argument('--removed-edges', help='a file that contains the removed self-loops and duplicated edges',
default=None, type=str)
args = parser.parse_args()
input_dir = args.input_dir
graph_name = args.graph_name
num_parts = args.num_parts
num_node_weights = args.num_node_weights
node_attr_dtype = args.node_attr_dtype
edge_attr_dtype = args.edge_attr_dtype
workspace_dir = args.workspace
output_dir = args.output
self_loop_edges = None
duplicate_edges = None
if args.removed_edges is not None:
removed_file = '{}/{}'.format(input_dir, args.removed_edges)
removed_df = csv.read_csv(removed_file, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
assert removed_df.num_columns == 4
src_id = removed_df['f0'].to_numpy()
dst_id = removed_df['f1'].to_numpy()
orig_id = removed_df['f2'].to_numpy()
etype = removed_df['f3'].to_numpy()
self_loop_idx = src_id == dst_id
not_self_loop_idx = src_id != dst_id
self_loop_edges = [src_id[self_loop_idx], dst_id[self_loop_idx],
orig_id[self_loop_idx], etype[self_loop_idx]]
duplicate_edges = [src_id[not_self_loop_idx], dst_id[not_self_loop_idx],
orig_id[not_self_loop_idx], etype[not_self_loop_idx]]
print('There are {} self-loops and {} duplicated edges in the removed edges'.format(len(self_loop_edges[0]),
len(duplicate_edges[0])))
with open(args.schema) as json_file:
schema = json.load(json_file)
nid_ranges = schema['nid']
eid_ranges = schema['eid']
nid_ranges = {key: np.array(nid_ranges[key]).reshape(
1, 2) for key in nid_ranges}
eid_ranges = {key: np.array(eid_ranges[key]).reshape(
1, 2) for key in eid_ranges}
id_map = dgl.distributed.id_map.IdMap(nid_ranges)
ntypes = [(key, nid_ranges[key][0, 0]) for key in nid_ranges]
ntypes.sort(key=lambda e: e[1])
ntype_offset_np = np.array([e[1] for e in ntypes])
ntypes = [e[0] for e in ntypes]
ntypes_map = {e: i for i, e in enumerate(ntypes)}
etypes = [(key, eid_ranges[key][0, 0]) for key in eid_ranges]
etypes.sort(key=lambda e: e[1])
etype_offset_np = np.array([e[1] for e in etypes])
etypes = [e[0] for e in etypes]
etypes_map = {e: i for i, e in enumerate(etypes)}
def read_feats(file_name):
attrs = csv.read_csv(file_name, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
num_cols = len(attrs.columns)
return np.stack([attrs.columns[i].to_numpy() for i in range(num_cols)], 1)
max_nid = np.iinfo(np.int32).max
num_edges = 0
num_nodes = 0
node_map_val = {ntype: [] for ntype in ntypes}
edge_map_val = {etype: [] for etype in etypes}
for part_id in range(num_parts):
part_dir = output_dir + '/part' + str(part_id)
os.makedirs(part_dir, exist_ok=True)
node_file = 'p{:03}-{}_nodes.txt'.format(part_id, graph_name)
# The format of each line in the node file:
# <node_id> <node_type> <weight1> ... <orig_type_node_id> <attributes>
# The node file contains nodes that belong to a partition. It doesn't include HALO nodes.
orig_type_nid_col = 3 + num_node_weights
first_attr_col = 4 + num_node_weights
# Get the node ID, node type and original per-type node ID columns.
tmp_output = workspace_dir + '/' + node_file + '.tmp'
os.system('awk \'{print $1, $2, $' + str(orig_type_nid_col) + '}\''
+ ' {} > {}'.format(input_dir + '/' + node_file, tmp_output))
nodes = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
nids, ntype_ids, orig_type_nid = nodes.columns[0].to_numpy(), nodes.columns[1].to_numpy(), \
nodes.columns[2].to_numpy()
orig_homo_nid = ntype_offset_np[ntype_ids] + orig_type_nid
assert np.all(nids[1:] - nids[:-1] == 1)
nid_range = (nids[0], nids[-1])
num_nodes += len(nodes)
if node_attr_dtype is not None:
# Get node attributes
# Here we just assume all nodes have the same attributes.
# In practice, this may not hold, in which case we need a more complex solution to
# encode and decode node attributes.
os.system('cut -d\' \' -f {}- {} > {}'.format(first_attr_col,
input_dir + '/' + node_file,
tmp_output))
node_attrs = read_feats(tmp_output)
node_feats = {}
# Nodes in a partition have been sorted by node type.
for ntype_name in nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = nids[ntype_ids == ntype_id]
assert np.all(type_nids == np.arange(
type_nids[0], type_nids[-1] + 1))
node_feats[ntype_name +
'/feat'] = th.as_tensor(node_attrs[ntype_ids == ntype_id])
dgl.data.utils.save_tensors(os.path.join(
part_dir, "node_feat.dgl"), node_feats)
# Determine the node ID ranges of different node types.
for ntype_name in nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = nids[ntype_ids == ntype_id]
node_map_val[ntype_name].append(
[int(type_nids[0]), int(type_nids[-1]) + 1])
edge_file = 'p{:03}-{}_edges.txt'.format(part_id, graph_name)
# The format of each line in the edge file:
# <src_id> <dst_id> <orig_src_id> <orig_dst_id> <orig_edge_id> <edge_type> <attributes>
tmp_output = workspace_dir + '/' + edge_file + '.tmp'
os.system('awk \'{print $1, $2, $3, $4, $5, $6}\'' + ' {} > {}'.format(input_dir + '/' + edge_file,
tmp_output))
edges = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
num_cols = len(edges.columns)
src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = [
edges.columns[i].to_numpy() for i in range(num_cols)]
# Let's merge the self-loops and duplicated edges to the partition.
src_id_list, dst_id_list = [src_id], [dst_id]
orig_src_id_list, orig_dst_id_list = [orig_src_id], [orig_dst_id]
orig_edge_id_list, etype_id_list = [orig_edge_id], [etype_ids]
if self_loop_edges is not None and len(self_loop_edges[0]) > 0:
uniq_orig_nids, idx = np.unique(orig_dst_id, return_index=True)
common_nids, common_idx1, common_idx2 = np.intersect1d(
uniq_orig_nids, self_loop_edges[0], return_indices=True)
idx = idx[common_idx1]
# the IDs after ID assignment
src_id_list.append(dst_id[idx])
dst_id_list.append(dst_id[idx])
# homogeneous IDs in the input graph.
orig_src_id_list.append(self_loop_edges[0][common_idx2])
orig_dst_id_list.append(self_loop_edges[0][common_idx2])
# edge IDs and edge type.
orig_edge_id_list.append(self_loop_edges[2][common_idx2])
etype_id_list.append(self_loop_edges[3][common_idx2])
print('Add {} self-loops in partition {}'.format(len(idx), part_id))
if duplicate_edges is not None and len(duplicate_edges[0]) > 0:
part_ids = orig_src_id.astype(
np.int64) * max_nid + orig_dst_id.astype(np.int64)
uniq_orig_ids, idx = np.unique(part_ids, return_index=True)
duplicate_ids = duplicate_edges[0].astype(
np.int64) * max_nid + duplicate_edges[1].astype(np.int64)
common_nids, common_idx1, common_idx2 = np.intersect1d(
uniq_orig_ids, duplicate_ids, return_indices=True)
idx = idx[common_idx1]
# the IDs after ID assignment
src_id_list.append(src_id[idx])
dst_id_list.append(dst_id[idx])
# homogeneous IDs in the input graph.
orig_src_id_list.append(duplicate_edges[0][common_idx2])
orig_dst_id_list.append(duplicate_edges[1][common_idx2])
# edge IDs and edge type.
orig_edge_id_list.append(duplicate_edges[2][common_idx2])
etype_id_list.append(duplicate_edges[3][common_idx2])
print('Add {} duplicated edges in partition {}'.format(len(idx), part_id))
src_id = np.concatenate(src_id_list) if len(
src_id_list) > 1 else src_id_list[0]
dst_id = np.concatenate(dst_id_list) if len(
dst_id_list) > 1 else dst_id_list[0]
orig_src_id = np.concatenate(orig_src_id_list) if len(
orig_src_id_list) > 1 else orig_src_id_list[0]
orig_dst_id = np.concatenate(orig_dst_id_list) if len(
orig_dst_id_list) > 1 else orig_dst_id_list[0]
orig_edge_id = np.concatenate(orig_edge_id_list) if len(
orig_edge_id_list) > 1 else orig_edge_id_list[0]
etype_ids = np.concatenate(etype_id_list) if len(
etype_id_list) > 1 else etype_id_list[0]
print('There are {} edges in partition {}'.format(len(src_id), part_id))
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
sort_idx = np.argsort(etype_ids)
src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = src_id[sort_idx], dst_id[sort_idx], \
orig_src_id[sort_idx], orig_dst_id[sort_idx], orig_edge_id[sort_idx], etype_ids[sort_idx]
assert np.all(np.diff(etype_ids) >= 0)
if edge_attr_dtype is not None:
# Get edge attributes
# Here we just assume all edges have the same attributes.
# In practice, this may not hold, in which case we need a more complex solution to
# encode and decode edge attributes.
os.system('cut -d\' \' -f 7- {} > {}'.format(input_dir +
'/' + edge_file, tmp_output))
edge_attrs = th.as_tensor(read_feats(tmp_output))[sort_idx]
edge_feats = {}
for etype_name in eid_ranges:
etype_id = etypes_map[etype_name]
edge_feats[etype_name +
'/feat'] = th.as_tensor(edge_attrs[etype_ids == etype_id])
dgl.data.utils.save_tensors(os.path.join(
part_dir, "edge_feat.dgl"), edge_feats)
# Determine the edge ID range of different edge types.
edge_id_start = num_edges
for etype_name in eid_ranges:
etype_id = etypes_map[etype_name]
edge_map_val[etype_name].append([int(edge_id_start),
int(edge_id_start + np.sum(etype_ids == etype_id))])
edge_id_start += np.sum(etype_ids == etype_id)
# Here we want to compute the unique IDs in the edge list.
# It is possible that a node belongs to the partition but doesn't appear
# in the edge list. That is, the node is assigned to this partition, but its neighbor
# belongs to another partition, so the edge is assigned to the other partition.
# This happens in a directed graph.
# To avoid such nodes being removed from the graph, we add the node IDs that
# belong to this partition.
ids = np.concatenate(
[src_id, dst_id, np.arange(nid_range[0], nid_range[1] + 1)])
uniq_ids, idx, inverse_idx = np.unique(
ids, return_index=True, return_inverse=True)
assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range.
local_src_id, local_dst_id = np.split(inverse_idx[:len(src_id) * 2], 2)
compact_g = dgl.graph((local_src_id, local_dst_id))
compact_g.edata['orig_id'] = th.as_tensor(orig_edge_id)
compact_g.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
compact_g.edata['inner_edge'] = th.ones(
compact_g.number_of_edges(), dtype=th.bool)
# The original IDs are homogeneous IDs.
# Similarly, we need to add the original homogeneous node IDs
orig_ids = np.concatenate([orig_src_id, orig_dst_id, orig_homo_nid])
orig_homo_ids = orig_ids[idx]
ntype, per_type_ids = id_map(orig_homo_ids)
compact_g.ndata['orig_id'] = th.as_tensor(per_type_ids)
compact_g.ndata[dgl.NTYPE] = th.as_tensor(ntype)
compact_g.ndata[dgl.NID] = th.as_tensor(uniq_ids)
compact_g.ndata['inner_node'] = th.as_tensor(np.logical_and(
uniq_ids >= nid_range[0], uniq_ids <= nid_range[1]))
local_nids = compact_g.ndata[dgl.NID][compact_g.ndata['inner_node'].bool()]
assert np.all((local_nids == th.arange(
local_nids[0], local_nids[-1] + 1)).numpy())
print('|V|={}'.format(compact_g.number_of_nodes()))
print('|E|={}'.format(compact_g.number_of_edges()))
# We need to reshuffle nodes in a partition so that all local nodes are labelled starting from 0.
reshuffle_nodes = th.arange(compact_g.number_of_nodes())
reshuffle_nodes = th.cat([reshuffle_nodes[compact_g.ndata['inner_node'].bool()],
reshuffle_nodes[compact_g.ndata['inner_node'] == 0]])
compact_g1 = dgl.node_subgraph(compact_g, reshuffle_nodes)
compact_g1.ndata['orig_id'] = compact_g.ndata['orig_id'][reshuffle_nodes]
compact_g1.ndata[dgl.NTYPE] = compact_g.ndata[dgl.NTYPE][reshuffle_nodes]
compact_g1.ndata[dgl.NID] = compact_g.ndata[dgl.NID][reshuffle_nodes]
compact_g1.ndata['inner_node'] = compact_g.ndata['inner_node'][reshuffle_nodes]
compact_g1.edata['orig_id'] = compact_g.edata['orig_id'][compact_g1.edata[dgl.EID]]
compact_g1.edata[dgl.ETYPE] = compact_g.edata[dgl.ETYPE][compact_g1.edata[dgl.EID]]
compact_g1.edata['inner_edge'] = compact_g.edata['inner_edge'][compact_g1.edata[dgl.EID]]
# reshuffle edges on ETYPE as node_subgraph relabels edges
idx = th.argsort(compact_g1.edata[dgl.ETYPE])
u, v = compact_g1.edges()
u = u[idx]
v = v[idx]
compact_g2 = dgl.graph((u, v))
compact_g2.ndata['orig_id'] = compact_g1.ndata['orig_id']
compact_g2.ndata[dgl.NTYPE] = compact_g1.ndata[dgl.NTYPE]
compact_g2.ndata[dgl.NID] = compact_g1.ndata[dgl.NID]
compact_g2.ndata['inner_node'] = compact_g1.ndata['inner_node']
compact_g2.edata['orig_id'] = compact_g1.edata['orig_id'][idx]
compact_g2.edata[dgl.ETYPE] = compact_g1.edata[dgl.ETYPE][idx]
compact_g2.edata['inner_edge'] = compact_g1.edata['inner_edge'][idx]
compact_g2.edata[dgl.EID] = th.arange(
num_edges, num_edges + compact_g2.number_of_edges())
num_edges += compact_g2.number_of_edges()
dgl.save_graphs(part_dir + '/graph.dgl', [compact_g2])
part_metadata = {'graph_name': graph_name,
'num_nodes': num_nodes,
'num_edges': num_edges,
'part_method': 'metis',
'num_parts': num_parts,
'halo_hops': 1,
'node_map': node_map_val,
'edge_map': edge_map_val,
'ntypes': ntypes_map,
'etypes': etypes_map}
for part_id in range(num_parts):
part_dir = 'part' + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
'edge_feats': edge_feat_file,
'part_graph': part_graph_file}
with open('{}/{}.json'.format(output_dir, graph_name), 'w') as outfile:
json.dump(part_metadata, outfile, sort_keys=True, indent=4)
"""Launching distributed graph partitioning pipeline """
import os
import sys
import argparse
import logging
import json
INSTALL_DIR = os.path.abspath(os.path.join(__file__, '..'))
LAUNCH_SCRIPT = "distgraphlaunch.py"
PIPELINE_SCRIPT = "distpartitioning/data_proc_pipeline.py"
UDF_WORLD_SIZE = "world-size"
UDF_PART_DIR = "partitions-dir"
UDF_INPUT_DIR = "input-dir"
UDF_GRAPH_NAME = "graph-name"
UDF_SCHEMA = "schema"
UDF_NUM_PARTS = "num-parts"
UDF_OUT_DIR = "output"
LARG_PROCS_MACHINE = "num_proc_per_machine"
LARG_IPCONF = "ip_config"
LARG_MASTER_PORT = "master_port"
def get_launch_cmd(args) -> str:
cmd = sys.executable + " " + os.path.join(INSTALL_DIR, LAUNCH_SCRIPT)
cmd = f"{cmd} --{LARG_PROCS_MACHINE} 1 "
cmd = f"{cmd} --{LARG_IPCONF} {args.ip_config} "
cmd = f"{cmd} --{LARG_MASTER_PORT} {args.master_port} "
return cmd
def submit_jobs(args) -> str:
wrapper_command = os.path.join(INSTALL_DIR, LAUNCH_SCRIPT)
# Read the json file and get the remaining arguments here.
schema_path = "metadata.json"
with open(os.path.join(args.in_dir, schema_path)) as schema:
schema_map = json.load(schema)
num_parts = len(schema_map["num_nodes_per_chunk"][0])
graph_name = schema_map["graph_name"]
argslist = ""
argslist += "--world-size {} ".format(num_parts)
argslist += "--partitions-dir {} ".format(os.path.abspath(args.partitions_dir))
argslist += "--input-dir {} ".format(os.path.abspath(args.in_dir))
argslist += "--graph-name {} ".format(graph_name)
argslist += "--schema {} ".format(schema_path)
argslist += "--num-parts {} ".format(num_parts)
argslist += "--output {} ".format(os.path.abspath(args.out_dir))
# (BarclayII) Is it safe to assume all the workers have the Python executable at the same path?
pipeline_cmd = os.path.join(INSTALL_DIR, PIPELINE_SCRIPT)
udf_cmd = f"{args.python_path} {pipeline_cmd} {argslist}"
launch_cmd = get_launch_cmd(args)
launch_cmd += '\"'+udf_cmd+'\"'
print(launch_cmd)
os.system(launch_cmd)
def main():
parser = argparse.ArgumentParser(description='Dispatch edge index and data to partitions', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--in-dir', type=str, help='Location of the input directory where the dataset is located')
parser.add_argument('--partitions-dir', type=str, help='Location of the partition-id mapping files which define node-ids and their respective partition-ids, relative to the input directory')
parser.add_argument('--out-dir', type=str, help='Location of the output directory where the graph partitions will be created by this pipeline')
parser.add_argument('--ip-config', type=str, help='File location of IP configuration for server processes')
parser.add_argument('--master-port', type=int, default=12345, help='Port used by the gloo group to create the rendezvous point')
parser.add_argument('--python-path', type=str, default=sys.executable, help='Path to the Python executable on all workers')
args, udf_command = parser.parse_known_args()
assert os.path.isdir(args.in_dir)
assert os.path.isdir(args.partitions_dir)
assert os.path.isfile(args.ip_config)
assert isinstance(args.master_port, int)
tokens = sys.executable.split(os.sep)
submit_jobs(args)
if __name__ == '__main__':
fmt = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)
main()
"""Launching tool for DGL distributed training"""
import os
import stat
import sys
import subprocess
import argparse
import signal
import logging
import time
import json
import multiprocessing
import re
from functools import partial
from threading import Thread
from typing import Optional
DEFAULT_PORT = 30050
def cleanup_proc(get_all_remote_pids, conn):
'''This process tries to clean up the remote training tasks.
'''
print('cleanup process runs')
# This process should not handle SIGINT.
signal.signal(signal.SIGINT, signal.SIG_IGN)
data = conn.recv()
# If the launch process exits normally, this process doesn't need to do anything.
if data == 'exit':
sys.exit(0)
else:
remote_pids = get_all_remote_pids()
# Otherwise, we need to ssh to each machine and kill the training jobs.
for (ip, port), pids in remote_pids.items():
kill_process(ip, port, pids)
print('cleanup process exits')
def kill_process(ip, port, pids):
'''ssh to a remote machine and kill the specified processes.
'''
curr_pid = os.getpid()
killed_pids = []
# If we kill child processes first, the parent process may create more again. This happens
# to Python's process pool. After sorting, we always kill parent processes first.
pids.sort()
for pid in pids:
assert curr_pid != pid
print('kill process {} on {}:{}'.format(pid, ip, port), flush=True)
kill_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'kill {}\''.format(pid)
subprocess.run(kill_cmd, shell=True)
killed_pids.append(pid)
# It's possible that some of the processes are not killed. Let's try again.
for i in range(3):
killed_pids = get_killed_pids(ip, port, killed_pids)
if len(killed_pids) == 0:
break
else:
killed_pids.sort()
for pid in killed_pids:
print('kill process {} on {}:{}'.format(pid, ip, port), flush=True)
kill_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'kill -9 {}\''.format(pid)
subprocess.run(kill_cmd, shell=True)
def get_killed_pids(ip, port, killed_pids):
'''Get the process IDs that we want to kill but are still alive.
'''
killed_pids = [str(pid) for pid in killed_pids]
killed_pids = ','.join(killed_pids)
ps_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'ps -p {} -h\''.format(killed_pids)
res = subprocess.run(ps_cmd, shell=True, stdout=subprocess.PIPE)
pids = []
for p in res.stdout.decode('utf-8').split('\n'):
l = p.split()
if len(l) > 0:
pids.append(int(l[0]))
return pids
def execute_remote(
cmd: str,
ip: str,
port: int,
username: Optional[str] = ""
) -> Thread:
"""Execute command line on remote machine via ssh.
Args:
cmd: User-defined command (udf) to execute on the remote host.
ip: The ip-address of the host to run the command on.
port: Port number that the host is listening on.
thread_list:
username: Optional. If given, this will specify a username to use when issuing commands over SSH.
Useful when your infra requires you to explicitly specify a username to avoid permission issues.
Returns:
thread: The Thread whose run() is to run the `cmd` on the remote host. Returns when the cmd completes
on the remote host.
"""
ip_prefix = ""
if username:
ip_prefix += "{username}@".format(username=username)
# Construct ssh command that executes `cmd` on the remote host
ssh_cmd = "ssh -o StrictHostKeyChecking=no -p {port} {ip_prefix}{ip} '{cmd}'".format(
port=str(port),
ip_prefix=ip_prefix,
ip=ip,
cmd=cmd,
)
# thread func to run the job
def run(ssh_cmd):
subprocess.check_call(ssh_cmd, shell=True)
thread = Thread(target=run, args=(ssh_cmd,))
thread.daemon = True
thread.start()
return thread
def get_remote_pids(ip, port, cmd_regex):
"""Get the process IDs that run the command in the remote machine.
"""
pids = []
curr_pid = os.getpid()
# Here we want to get the python processes. We may get some ssh processes, so we should filter them out.
ps_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'ps -aux | grep python | grep -v StrictHostKeyChecking\''
res = subprocess.run(ps_cmd, shell=True, stdout=subprocess.PIPE)
for p in res.stdout.decode('utf-8').split('\n'):
l = p.split()
if len(l) < 2:
continue
# We only get the processes that run the specified command.
res = re.search(cmd_regex, p)
if res is not None and int(l[1]) != curr_pid:
pids.append(l[1])
pid_str = ','.join([str(pid) for pid in pids])
ps_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'pgrep -P {}\''.format(pid_str)
res = subprocess.run(ps_cmd, shell=True, stdout=subprocess.PIPE)
pids1 = res.stdout.decode('utf-8').split('\n')
all_pids = []
for pid in set(pids + pids1):
if pid == '' or int(pid) == curr_pid:
continue
all_pids.append(int(pid))
all_pids.sort()
return all_pids
def get_all_remote_pids(hosts, ssh_port, udf_command):
'''Get all remote processes.
'''
remote_pids = {}
for node_id, host in enumerate(hosts):
ip, _ = host
# When creating training processes in remote machines, we may insert some arguments
# in the commands. We need to use regular expressions to match the modified command.
cmds = udf_command.split()
new_udf_command = ' .*'.join(cmds)
pids = get_remote_pids(ip, ssh_port, new_udf_command)
remote_pids[(ip, ssh_port)] = pids
return remote_pids
def construct_torch_dist_launcher_cmd(
num_trainers: int,
num_nodes: int,
node_rank: int,
master_addr: str,
master_port: int
) -> str:
"""Constructs the torch distributed launcher command.
Helper function.
Args:
num_trainers:
num_nodes:
node_rank:
master_addr:
master_port:
Returns:
cmd_str.
"""
torch_cmd_template = "-m torch.distributed.launch " \
"--nproc_per_node={nproc_per_node} " \
"--nnodes={nnodes} " \
"--node_rank={node_rank} " \
"--master_addr={master_addr} " \
"--master_port={master_port}"
return torch_cmd_template.format(
nproc_per_node=num_trainers,
nnodes=num_nodes,
node_rank=node_rank,
master_addr=master_addr,
master_port=master_port
)
def wrap_udf_in_torch_dist_launcher(
udf_command: str,
num_trainers: int,
num_nodes: int,
node_rank: int,
master_addr: str,
master_port: int,
) -> str:
"""Wraps the user-defined function (udf_command) with the torch.distributed.launch module.
Example: if udf_command is "python3 run/some/trainer.py arg1 arg2", then new_udf_command becomes:
"python3 -m torch.distributed.launch <TORCH DIST ARGS> run/some/trainer.py arg1 arg2"
udf_command is assumed to consist of pre-commands (optional) followed by the python launcher script (required):
Examples:
# simple
python3.7 path/to/some/trainer.py arg1 arg2
# multi-commands
(cd some/dir && python3.7 path/to/some/trainer.py arg1 arg2)
IMPORTANT: If udf_command consists of multiple python commands, then this will result in undefined behavior.
Args:
udf_command:
num_trainers:
num_nodes:
node_rank:
master_addr:
master_port:
Returns:
"""
torch_dist_cmd = construct_torch_dist_launcher_cmd(
num_trainers=num_trainers,
num_nodes=num_nodes,
node_rank=node_rank,
master_addr=master_addr,
master_port=master_port
)
# Auto-detect the python binary that kicks off the distributed trainer code.
# Note: This allowlist order matters, this will match with the FIRST matching entry. Thus, please add names to this
# from most-specific to least-specific order eg:
# (python3.7, python3.8) -> (python3)
# The allowed python versions are from this: https://www.dgl.ai/pages/start.html
python_bin_allowlist = (
"python3.6", "python3.7", "python3.8", "python3.9", "python3",
# for backwards compatibility, accept python2 but technically DGL is a py3 library, so this is not recommended
"python2.7", "python2",
)
# If none of the candidate python bins match, then we go with the default `python`
python_bin = "python"
for candidate_python_bin in python_bin_allowlist:
if candidate_python_bin in udf_command:
python_bin = candidate_python_bin
break
# transforms the udf_command from:
# python path/to/dist_trainer.py arg0 arg1
# to:
# python -m torch.distributed.launch [DIST TORCH ARGS] path/to/dist_trainer.py arg0 arg1
# Note: if there are multiple python commands in `udf_command`, this may do the Wrong Thing, eg launch each
# python command within the torch distributed launcher.
new_udf_command = udf_command.replace(python_bin, f"{python_bin} {torch_dist_cmd}")
return new_udf_command
def construct_dgl_server_env_vars(
ip_config: str,
num_proc_per_machine: int,
pythonpath: Optional[str] = "",
) -> str:
"""Constructs the DGL server-specific env vars string that are required for DGL code to behave in the correct
server role.
Convenience function.
Args:
ip_config: IP config file containing IP addresses of cluster hosts.
Relative path to workspace.
num_proc_per_machine:
pythonpath: Optional. If given, this will pass this as PYTHONPATH.
Returns:
server_env_vars: The server-specific env-vars in a string format, friendly for CLI execution.
"""
server_env_vars_template = (
"DGL_IP_CONFIG={DGL_IP_CONFIG} "
"DGL_NUM_SERVER={DGL_NUM_SERVER} "
"{suffix_optional_envvars}"
)
suffix_optional_envvars = ""
if pythonpath:
suffix_optional_envvars += f"PYTHONPATH={pythonpath} "
return server_env_vars_template.format(
DGL_IP_CONFIG=ip_config,
DGL_NUM_SERVER=num_proc_per_machine,
suffix_optional_envvars=suffix_optional_envvars,
)
def wrap_cmd_with_local_envvars(cmd: str, env_vars: str) -> str:
"""Wraps a CLI command with desired env vars with the following properties:
(1) env vars persist for the entire `cmd`, even if it consists of multiple "chained" commands like:
cmd = "ls && pwd && python run/something.py"
(2) env vars don't pollute the environment after `cmd` completes.
Example:
>>> cmd = "ls && pwd"
>>> env_vars = "VAR1=value1 VAR2=value2"
>>> wrap_cmd_with_local_envvars(cmd, env_vars)
"(export VAR1=value1 VAR2=value2; ls && pwd)"
Args:
cmd:
env_vars: A string containing env vars, eg "VAR1=val1 VAR2=val2"
Returns:
cmd_with_env_vars:
"""
# use `export` to persist env vars for entire cmd block. required if udf_command is a chain of commands
# also: wrap in parens to not pollute env:
# https://stackoverflow.com/a/45993803
return f"(export {env_vars}; {cmd})"
def wrap_cmd_with_extra_envvars(cmd: str, env_vars: list) -> str:
"""Wraps a CLI command with extra env vars
Example:
>>> cmd = "ls && pwd"
>>> env_vars = ["VAR1=value1", "VAR2=value2"]
>>> wrap_cmd_with_extra_envvars(cmd, env_vars)
"(export VAR1=value1 VAR2=value2; ls && pwd)"
Args:
cmd:
env_vars: A list of strings containing env vars, e.g., ["VAR1=value1", "VAR2=value2"]
Returns:
cmd_with_env_vars:
"""
env_vars = " ".join(env_vars)
return wrap_cmd_with_local_envvars(cmd, env_vars)
def submit_jobs(args, udf_command):
"""Submit distributed jobs (server and client processes) via ssh"""
hosts = []
thread_list = []
server_count_per_machine = 0
# Get the IP addresses of the cluster.
#ip_config = os.path.join(args.workspace, args.ip_config)
ip_config = args.ip_config
with open(ip_config) as f:
for line in f:
result = line.strip().split()
if len(result) == 2:
ip = result[0]
port = int(result[1])
hosts.append((ip, port))
elif len(result) == 1:
ip = result[0]
port = DEFAULT_PORT
hosts.append((ip, port))
else:
raise RuntimeError("Format error of ip_config.")
server_count_per_machine = args.num_proc_per_machine
# launch server tasks
server_env_vars = construct_dgl_server_env_vars(
ip_config=args.ip_config,
num_proc_per_machine=args.num_proc_per_machine,
pythonpath=os.environ.get("PYTHONPATH", ""),
)
for i in range(len(hosts) * server_count_per_machine):
ip, _ = hosts[int(i / server_count_per_machine)]
server_env_vars_cur = f"{server_env_vars} RANK={i} MASTER_ADDR={hosts[0][0]} MASTER_PORT={args.master_port}"
cmd = wrap_cmd_with_local_envvars(udf_command, server_env_vars_cur)
print(cmd)
thread_list.append(execute_remote(cmd, ip, args.ssh_port, username=args.ssh_username))
# Start a cleanup process dedicated for cleaning up remote training jobs.
conn1,conn2 = multiprocessing.Pipe()
func = partial(get_all_remote_pids, hosts, args.ssh_port, udf_command)
process = multiprocessing.Process(target=cleanup_proc, args=(func, conn1))
process.start()
def signal_handler(signal, frame):
logging.info('Stop launcher')
# We need to tell the cleanup process to kill remote training jobs.
conn2.send('cleanup')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
for thread in thread_list:
thread.join()
# The training processes complete. We should tell the cleanup process to exit.
conn2.send('exit')
process.join()
def main():
parser = argparse.ArgumentParser(description='Launch a distributed job')
parser.add_argument('--ssh_port', type=int, default=22, help='SSH Port.')
parser.add_argument(
"--ssh_username", default="",
help="Optional. When issuing commands (via ssh) to cluster, use the provided username in the ssh cmd. "
"Example: If you provide --ssh_username=bob, then the ssh command will be like: 'ssh bob@1.2.3.4 CMD' "
"instead of 'ssh 1.2.3.4 CMD'"
)
parser.add_argument('--num_proc_per_machine', type=int,
help='The number of server processes per machine')
parser.add_argument('--master_port', type=int,
help='This port is used to form the gloo group (rendezvous server)')
parser.add_argument('--ip_config', type=str,
help='The file (in workspace) of IP configuration for server processes')
args, udf_command = parser.parse_known_args()
assert len(udf_command) == 1, 'Please provide user command line.'
assert args.num_proc_per_machine is not None and args.num_proc_per_machine > 0, \
'--num_proc_per_machine must be a positive number.'
assert args.ip_config is not None, \
'A user has to specify an IP configuration file with --ip_config.'
udf_command = str(udf_command[0])
if 'python' not in udf_command:
raise RuntimeError("DGL launching script can only support Python executable file.")
submit_jobs(args, udf_command)
if __name__ == '__main__':
fmt = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)
main()
### xxx_nodes.txt format
This file is used to provide node information to this framework. Following is the format for each line in this file:
```
<node_type> <weight1> <weight2> <weight3> <weight4> <global_type_node_id> <attributes>
```
where node_type is the type id of this node, the weights can be any number of columns as determined by the user, and global_type_node_id contains contiguous ids starting from `0` for a particular node_type. Attributes can be any number of columns at the end of each line.
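For concreteness, a hypothetical node file (four weight columns, one attribute column; all values are made up for illustration) could look like this:
```
0 1 0 0 0 0 0.8
0 0 1 0 0 1 0.2
1 1 0 0 0 0 0.4
```
Here the first two lines describe nodes of type `0` with global_type_node_ids `0` and `1`, and the last line describes a node of type `1` with global_type_node_id `0`.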
### xxx_edges.txt format
This file is used to provide edge information to this framework. Following is the format for each line in this file:
```
<global_src_id> <global_dst_id> <global_type_edge_id> <edge_type> <attributes>
```
where global_src_id and global_dst_id are the two end points of an edge, and global_type_edge_id is a unique id assigned to each edge; these ids are contiguous and start from 0 for each edge_type. Attributes can be any number of columns at the end of each line.
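Similarly, a hypothetical edge file with a single attribute column (values made up for illustration) could look like this:
```
0 2 0 0 0.5
1 2 1 0 0.7
2 0 0 1 1.0
```
The first two lines describe edges of edge_type `0` with global_type_edge_ids `0` and `1`; the last line describes an edge of edge_type `1` with global_type_edge_id `0`.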
### Naming convention
The `global_` prefix (for any node or edge ids) indicates that these ids are read from the graph input files. These ids are allocated to nodes and edges before `data shuffling`. These ids are globally unique across all partitions.
The `shuffle_global_` prefix (for any node or edge ids) indicates that these ids are assigned after the `data shuffling` is completed. These ids are globally unique across all partitions.
The `part_local_` prefix (for any node or edge ids) indicates that these ids are assigned after the `data shuffling` and are unique within a given partition.
For instance, if a variable is named `global_src_id`, it means that this id is read from the graph input file and is assumed to be globally unique across all partitions. Similarly, if a variable is named `part_local_node_id`, it means that this node_id is assigned after the data shuffling is complete and is unique within a given partition.
### High-level description of the algorithm
#### Single file format for graph input files
Here we assume that all the node-related data is present in one single file and, similarly, all the edges are in one single file.
In this case the following steps are executed to write DGL objects for each partition, as assigned by any partitioning algorithm, for example METIS.
##### Step 1 (Data Loading):
The Rank-0 process reads in all the graph files: xxx_nodes.txt, xxx_edges.txt, node_feats.dgl, edge_feats.dgl and xxx_removed_edges.txt.
The Rank-0 process determines the ownership of nodes by using the output of the partitioning algorithm (here, we expect the output of the partitioning step to be a mapping from each node in the entire graph to its partition id). Edge ownership is determined by the `destination` node-id of the edge: each edge belongs to the partition that owns its destination node.
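The ownership rule can be sketched as follows (a minimal illustration with made-up arrays, not the pipeline's actual implementation):
```python
import numpy as np

# Hypothetical output of the partitioning step: node_part_id[v] is the
# partition id that owns node v (for the entire graph).
node_part_id = np.array([0, 0, 1, 1, 0])

# Hypothetical edge list given as global source/destination node ids.
global_src_id = np.array([0, 2, 4, 3])
global_dst_id = np.array([1, 0, 2, 4])

# Each edge is owned by the partition of its destination node.
edge_part_id = node_part_id[global_dst_id]
print(edge_part_id)  # [0 0 1 0]
```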
##### Step 2 (Data Shuffling):
The Rank-0 process sends node data, edge data, node features and edge features to their respective processes using the ownership rules described in Step 1. Non-Rank-0 processes receive their own nodes, edges, node features and edge features and store them in local data structures. Once this information has been sent, the Rank-0 process deletes the nodes, edges, node features and edge features that are not owned by rank 0.
##### Step 3 (ID assignment and resolution):
At this point all ranks have their own local information in their respective data structures. Each process then performs the following steps: a) assign shuffle_global_xxx ids (where xxx is node_ids and edge_ids) to nodes and edges by performing a prefix sum across all ranks, b) assign part_local_xxx ids (xxx means node_ids and edge_ids) to nodes and edges so that they can be used to index into the node and edge features, and c) retrieve shuffle_global_node_ids by using global_node_ids to determine the ownership of any given node. This last step is done for the node_ids (present locally on any given rank) whose shuffle_global_node_ids were assigned on a different-ranked process.
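Step (a) above can be illustrated with a small numpy sketch (made-up counts; the real pipeline exchanges the per-rank counts over the gloo process group rather than hard-coding them):
```python
import numpy as np

# Hypothetical number of locally owned nodes on each rank.
num_nodes_per_rank = np.array([4, 3, 5])

# Exclusive prefix sum gives each rank's starting shuffle_global_node_id.
offsets = np.concatenate(([0], np.cumsum(num_nodes_per_rank)[:-1]))

rank = 1  # this process's rank
shuffle_global_nids = offsets[rank] + np.arange(num_nodes_per_rank[rank])
print(shuffle_global_nids)  # [4 5 6]
```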
##### Step 4 (Serialization):
After every rank has global ids, shuffle_global ids and part_local ids for all the nodes and edges present locally, it proceeds with DGL object creation. Finally, the Rank-0 process aggregates graph-level metadata and creates a json file with graph-level information.
### How to use this tool
To run this code on a single machine using multiple processes, use the following command
```
python3 data_proc_pipeline.py --world-size 2 --nodes-file mag_nodes.txt --edges-file mag_edges.txt --node-feats-file node_feat.dgl --metis-partitions mag_part.2 --input-dir /home/ubuntu/data --graph-name mag --schema mag.json --num-parts 2 --num-node-weights 4 --workspace /home/ubuntu/data --node-attr-dtype float --output /home/ubuntu/data/outputs --removed-edges mag_removed_edges.txt
```
The above command assumes that there are `2` partitions and that the number of node weights is `4`. All other command line arguments are self-explanatory.
GLOBAL_NID = "global_node_id"
GLOBAL_EID = "global_edge_id"
SHUFFLE_GLOBAL_NID = "shuffle_global_node_id"
SHUFFLE_GLOBAL_EID = "shuffle_global_edge_id"
NTYPE_ID = "node_type_id"
ETYPE_ID = "edge_type_id"
GLOBAL_TYPE_NID = "global_type_node_id"
GLOBAL_TYPE_EID = "global_type_edge_id"
GLOBAL_SRC_ID = "global_src_id"
GLOBAL_DST_ID = "global_dst_id"
SHUFFLE_GLOBAL_SRC_ID = "shuffle_global_src_id"
SHUFFLE_GLOBAL_DST_ID = "shuffle_global_dst_id"
OWNER_PROCESS = "owner_proc_id"
PART_LOCAL_NID = "part_local_nid"
GLOO_MESSAGING_TIMEOUT = 60 #seconds
STR_NODE_TYPE = "node_type"
STR_NUM_NODES_PER_CHUNK = "num_nodes_per_chunk"
STR_EDGE_TYPE = "edge_type"
STR_NUM_EDGES_PER_CHUNK = "num_edges_per_chunk"
STR_EDGES = "edges"
STR_FORMAT = "format"
STR_DATA = "data"
STR_NODE_DATA = "node_data"
STR_EDGE_DATA = "edge_data"
STR_NUMPY = "numpy"
STR_CSV = "csv"
STR_NAME = "name"
import os
import json
import time
import argparse
import numpy as np
import dgl
import torch as th
import pyarrow
import pandas as pd
import constants
from pyarrow import csv
from utils import read_json, get_idranges
def create_dgl_object(graph_name, num_parts, \
schema, part_id, node_data, \
edge_data, nodeid_offset, edgeid_offset):
"""
This function creates dgl objects for a given graph partition, as in function
arguments.
The "schema" argument is a dictionary, which contains the metadata related to node ids
and edge ids. It contains two keys: "nid" and "eid", whose value is also a dictionary
with the following structure.
1. The key-value pairs in the "nid" dictionary have the following format.
"ntype-name" is the user-assigned name of this node type, "format" describes the
format of the contents of the files, and "data" is a list of lists; each list has
3 elements: file-name, start_id and end_id. File-name can be either an absolute or
relative path to this file, and the starting and ending ids are the type ids of the
nodes contained in this file. These type ids are later used to compute the global ids
of these nodes, which are used throughout the processing of this pipeline.
"ntype-name" : {
"format" : "csv",
"data" : [
[ <path-to-file>/ntype0-name-0.csv, start_id0, end_id0],
[ <path-to-file>/ntype0-name-1.csv, start_id1, end_id1],
...
[ <path-to-file>/ntype0-name-<p-1>.csv, start_id<p-1>, end_id<p-1>],
]
}
2. The key-value pairs in the "eid" dictionary have the following format.
As with the "nid" dictionary, the "eid" dictionary is similarly structured,
except that these entries are for edges.
"etype-name" : {
"format" : "csv",
"data" : [
[ <path-to-file>/etype0-name-0, start_id0, end_id0],
[ <path-to-file>/etype0-name-1, start_id1, end_id1],
...
[ <path-to-file>/etype0-name-<p-1>, start_id<p-1>, end_id<p-1>]
]
}
In "nid" dictionary, the type_nids are specified that
should be assigned to nodes which are read from the corresponding nodes file.
Along the same lines dictionary for the key "eid" is used for edges in the
input graph.
These type ids, for nodes and edges, are used to compute global ids for nodes
and edges which are stored in the graph object.
Parameters:
-----------
graph_name : string
name of the graph
num_parts : int
total no. of partitions (of the original graph)
schema : json object
json object created by reading the graph metadata json file
part_id : int
partition id of the graph partition for which dgl object is to be created
node_data : numpy ndarray
node_data, where each row is of the following format:
<global_nid> <ntype_id> <global_type_nid>
edge_data : numpy ndarray
edge_data, where each row is of the following format:
<global_src_id> <global_dst_id> <etype_id> <global_type_eid>
nodeid_offset : int
offset to be used when assigning node global ids in the current partition
edgeid_offset : int
offset to be used when assigning edge global ids in the current partition
Returns:
--------
dgl object
dgl object created for the current graph partition
dictionary
map between node types and the range of global node ids used
dictionary
map between edge types and the range of global edge ids used
dictionary
map between node type(string) and node_type_id(int)
dictionary
map between edge type(string) and edge_type_id(int)
"""
#create auxiliary data structures from the schema object
ntid_dict, global_nid_ranges = get_idranges(schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK])
etid_dict, global_eid_ranges = get_idranges(schema[constants.STR_EDGE_TYPE],
schema[constants.STR_NUM_EDGES_PER_CHUNK])
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
ntypes = [(key, global_nid_ranges[key][0, 0]) for key in global_nid_ranges]
ntypes.sort(key=lambda e: e[1])
ntype_offset_np = np.array([e[1] for e in ntypes])
ntypes = [e[0] for e in ntypes]
ntypes_map = {e: i for i, e in enumerate(ntypes)}
etypes = [(key, global_eid_ranges[key][0, 0]) for key in global_eid_ranges]
etypes.sort(key=lambda e: e[1])
etype_offset_np = np.array([e[1] for e in etypes])
etypes = [e[0] for e in etypes]
etypes_map = {e.split(":")[1]: i for i, e in enumerate(etypes)}
node_map_val = {ntype: [] for ntype in ntypes}
edge_map_val = {etype.split(":")[1]: [] for etype in etypes}
shuffle_global_nids, ntype_ids, global_type_nid = node_data[constants.SHUFFLE_GLOBAL_NID], \
node_data[constants.NTYPE_ID], node_data[constants.GLOBAL_TYPE_NID]
global_homo_nid = ntype_offset_np[ntype_ids] + global_type_nid
assert np.all(shuffle_global_nids[1:] - shuffle_global_nids[:-1] == 1)
shuffle_global_nid_range = (shuffle_global_nids[0], shuffle_global_nids[-1])
# Determine the node ID ranges of different node types.
for ntype_name in global_nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = shuffle_global_nids[ntype_ids == ntype_id]
node_map_val[ntype_name].append(
[int(type_nids[0]), int(type_nids[-1]) + 1])
#process edges
shuffle_global_src_id, shuffle_global_dst_id, global_src_id, global_dst_id, global_edge_id, etype_ids = \
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID], edge_data[constants.SHUFFLE_GLOBAL_DST_ID], \
edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID], \
edge_data[constants.GLOBAL_TYPE_EID], edge_data[constants.ETYPE_ID]
print('There are {} edges in partition {}'.format(len(shuffle_global_src_id), part_id))
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
sort_idx = np.argsort(etype_ids)
shuffle_global_src_id, shuffle_global_dst_id, global_src_id, global_dst_id, global_edge_id, etype_ids = \
shuffle_global_src_id[sort_idx], shuffle_global_dst_id[sort_idx], global_src_id[sort_idx], \
global_dst_id[sort_idx], global_edge_id[sort_idx], etype_ids[sort_idx]
assert np.all(np.diff(etype_ids) >= 0)
# Determine the edge ID range of different edge types.
edge_id_start = edgeid_offset
for etype_name in global_eid_ranges:
tokens = etype_name.split(":")
assert len(tokens) == 3
etype_id = etypes_map[tokens[1]]
edge_map_val[tokens[1]].append([edge_id_start,
edge_id_start + np.sum(etype_ids == etype_id)])
edge_id_start += np.sum(etype_ids == etype_id)
# get the edge list in some order and then reshuffle.
# Here the order of nodes is defined by the `np.unique` function
# node order is as listed in the uniq_ids array
ids = np.concatenate(
[shuffle_global_src_id, shuffle_global_dst_id,
np.arange(shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1)])
uniq_ids, idx, inverse_idx = np.unique(
ids, return_index=True, return_inverse=True)
assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range.
part_local_src_id, part_local_dst_id = np.split(inverse_idx[:len(shuffle_global_src_id) * 2], 2)
inner_nodes = th.as_tensor(np.logical_and(
uniq_ids >= shuffle_global_nid_range[0],
uniq_ids <= shuffle_global_nid_range[1]))
#get the list of indices, from inner_nodes, which will sort inner_nodes as [True, True, ...., False, False, ...]
#essentially local nodes will be placed before non-local nodes.
reshuffle_nodes = th.arange(len(uniq_ids))
reshuffle_nodes = th.cat([reshuffle_nodes[inner_nodes.bool()],
reshuffle_nodes[inner_nodes == 0]])
'''
The following procedure is used to map part_local_src_id and part_local_dst_id to account for
the reshuffling of nodes (locally owned nodes are ordered before non-local nodes in a partition).
1. Form a node_map, in this case a numpy array, which will be used to map old node-ids (pre-reshuffling)
to post-reshuffling ids.
2. Once the map is created, use this map to map all the node-ids in the part_local_src_id
and part_local_dst_id list to their appropriate `new` node-ids (post-reshuffle order).
3. Since only the node order is changed, we have to re-order the node-related information when
creating the DGL object: this includes orig_id, dgl.NTYPE, dgl.NID and inner_node.
4. The edge order is not changed. At this point in the execution path, edges are still ordered by their etype-ids.
5. Create the DGL object appropriately and return it.
Here is a simple example to understand the above flow better.
part_local_nids = [0, 1, 2, 3, 4, 5]
part_local_src_ids = [0, 0, 0, 0, 2, 3, 4]
part_local_dst_ids = [1, 2, 3, 4, 4, 4, 5]
Assume that nodes {1, 5} are halo-nodes, which are not owned by this partition.
reshuffle_nodes = [0, 2, 3, 4, 1, 5]
A node_map, which maps node-ids from old to reshuffled order is as follows:
node_map = np.zeros((len(reshuffle_nodes,)))
node_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))
Using the above map, part_local_src_ids and part_local_dst_ids are mapped as follows:
part_local_src_ids = [0, 0, 0, 0, 1, 2, 3]
part_local_dst_ids = [4, 1, 2, 3, 3, 3, 5]
In the graph above, note that nodes {0, 1, 2, 3} are inner nodes and {4, 5} are NON-inner nodes.
Since the edges are NOT re-ordered in any way, no reordering of edge-related data is required
during the DGL object creation.
'''
#create the mappings to generate mapped part_local_src_id and part_local_dst_id
#This map will map from unshuffled node-ids to reshuffled-node-ids (which are ordered to prioritize
#locally owned nodes).
nid_map = np.zeros((len(reshuffle_nodes,)))
nid_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))
#Now map the edge end points to reshuffled_values.
part_local_src_id, part_local_dst_id = nid_map[part_local_src_id], nid_map[part_local_dst_id]
#create the graph here now.
part_graph = dgl.graph(data=(part_local_src_id, part_local_dst_id), num_nodes=len(uniq_ids))
part_graph.edata[dgl.EID] = th.arange(
edgeid_offset, edgeid_offset + part_graph.number_of_edges(), dtype=th.int64)
part_graph.edata['orig_id'] = th.as_tensor(global_edge_id)
part_graph.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
part_graph.edata['inner_edge'] = th.ones(part_graph.number_of_edges(), dtype=th.bool)
#compute per_type_ids and ntype for all the nodes in the graph.
global_ids = np.concatenate(
[global_src_id, global_dst_id, global_homo_nid])
part_global_ids = global_ids[idx]
part_global_ids = part_global_ids[reshuffle_nodes]
ntype, per_type_ids = id_map(part_global_ids)
#continue with the graph creation
part_graph.ndata['orig_id'] = th.as_tensor(per_type_ids)
part_graph.ndata[dgl.NTYPE] = th.as_tensor(ntype)
part_graph.ndata[dgl.NID] = th.as_tensor(uniq_ids[reshuffle_nodes])
part_graph.ndata['inner_node'] = inner_nodes[reshuffle_nodes]
return part_graph, node_map_val, edge_map_val, ntypes_map, etypes_map
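The node-reshuffling step above is easiest to see with the worked example from the docstring; the sketch below simply replays it with NumPy:
```
import numpy as np

# Worked example from the create_dgl_object docstring.
part_local_src_id = np.array([0, 0, 0, 0, 2, 3, 4])
part_local_dst_id = np.array([1, 2, 3, 4, 4, 4, 5])
# Nodes {1, 5} are halo nodes, so reshuffling places locally owned nodes first.
reshuffle_nodes = np.array([0, 2, 3, 4, 1, 5])

nid_map = np.zeros(len(reshuffle_nodes), dtype=np.int64)
nid_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))

print(nid_map[part_local_src_id])  # [0 0 0 0 1 2 3]
print(nid_map[part_local_dst_id])  # [4 1 2 3 3 3 5]
```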
def create_metadata_json(graph_name, num_nodes, num_edges, part_id, num_parts, node_map_val, \
edge_map_val, ntypes_map, etypes_map, output_dir ):
"""
Auxiliary function to create json file for the graph partition metadata
Parameters:
-----------
graph_name : string
name of the graph
num_nodes : int
no. of nodes in the graph partition
num_edges : int
no. of edges in the graph partition
part_id : int
integer indicating the partition id
num_parts : int
total no. of partitions of the original graph
node_map_val : dictionary
map between node types and the range of global node ids used
edge_map_val : dictionary
map between edge types and the range of global edge ids used
ntypes_map : dictionary
map between node type(string) and node_type_id(int)
etypes_map : dictionary
map between edge type(string) and edge_type_id(int)
output_dir : string
directory where the output files are to be stored
Returns:
--------
dictionary
map describing the graph information
"""
part_metadata = {'graph_name': graph_name,
'num_nodes': num_nodes,
'num_edges': num_edges,
'part_method': 'metis',
'num_parts': num_parts,
'halo_hops': 1,
'node_map': node_map_val,
'edge_map': edge_map_val,
'ntypes': ntypes_map,
'etypes': etypes_map}
part_dir = 'part' + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
'edge_feats': edge_feat_file,
'part_graph': part_graph_file}
return part_metadata
import argparse
import numpy as np
import torch.multiprocessing as mp
import logging
import platform
import os
from data_shuffle import single_machine_run, multi_machine_run
def log_params(params):
""" Print all the command line arguments for debugging purposes.
Parameters:
-----------
params: argparse object
Argument Parser structure listing all the pre-defined parameters
"""
print('Input Dir: ', params.input_dir)
print('Graph Name: ', params.graph_name)
print('Schema File: ', params.schema)
print('No. partitions: ', params.num_parts)
print('Output Dir: ', params.output)
print('WorldSize: ', params.world_size)
print('Metis partitions dir: ', params.partitions_dir)
if __name__ == "__main__":
"""
Start of execution from this point.
Invoke the appropriate function to begin execution
"""
#arguments which are already needed by the existing implementation of convert_partition.py
parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
help='The directory path that contains the partition results.')
parser.add_argument('--graph-name', required=True, type=str,
help='The graph name')
parser.add_argument('--schema', required=True, type=str,
help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
help='The number of partitions')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
parser.add_argument('--partitions-dir', help='directory of the partition-ids for each node type',
default=None, type=str)
#arguments needed for the distributed implementation
parser.add_argument('--world-size', help='no. of processes to spawn',
default=1, type=int, required=True)
params = parser.parse_args()
#invoke the pipeline function
logging.basicConfig(level='INFO', format=f"[{platform.node()} %(levelname)s %(asctime)s PID:%(process)d] %(message)s")
multi_machine_run(params)
import os
import numpy as np
import constants
import torch
import logging
import pyarrow
from pyarrow import csv
from utils import get_idranges
def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
"""
Function to read the multiple file formatted dataset.
Parameters:
-----------
input_dir : string
root directory where dataset is located.
graph_name : string
graph name string
rank : int
rank of the current process
world_size : int
total number of process in the current execution
schema_map : dictionary
this is the dictionary created by reading the graph metadata json file
for the input graph dataset
Return:
-------
dictionary
where keys are node-type names and values are tuples. Each tuple represents the
range of type ids read from a file by the current process. Please note that the node
data for each node type is split into "p" files and each one of these "p" files is
read by a process in the distributed graph partitioning pipeline
dictionary
Data read from numpy files for all the node features in this dataset. Dictionary built
using this data has keys as node feature names and values as tensor data representing
node features
dictionary
in which keys are node-type names and values are triplets. Each triplet has the node-feature name
and the range of tids for the node feature data read from files by the current process. Each
node-type may have multiple features and associated tensor data.
dictionary
Data read from edges.txt file and used to build a dictionary with keys as column names
and values as columns in the csv file.
dictionary
in which keys are edge-type names and values are triplets. Each triplet has the edge-feature name
and the range of tids for the edge feature data read from the files by the current process. Each
edge-type may have several edge features and associated tensor data.
"""
#node features dictionary
#TODO: With the new file format, it is guaranteed that the no. of nodes in any node-feature
#file and the no. of nodes in the corresponding nodes metadata file will always be the same.
#With this guarantee, we can eliminate the `node_feature_tids` dictionary since the same
#information is also populated in the `node_tids` dictionary. This will be removed in the
#next iteration of code changes.
node_features = {}
node_feature_tids = {}
'''
The structure of the node_data is as follows, which is present in the input metadata json file.
"node_data" : {
"ntype0-name" : {
"feat0-name" : {
"format" : {"name": "numpy"},
"data" : [ #list
"<path>/feat-0.npy",
"<path>/feat-1.npy",
....
"<path>/feat-<p-1>.npy"
]
},
"feat1-name" : {
"format" : {"name": "numpy"},
"data" : [ #list
"<path>/feat-0.npy",
"<path>/feat-1.npy",
....
"<path>/feat-<p-1>.npy"
]
}
}
}
As shown above, the value for the key "node_data" is a dictionary object which describes
the feature data for each node type. Keys in this top-level dictionary are node-type names
and each value is a dictionary which captures all the features of that node type. Feature
data is captured with the feature names as keys; each value is a dictionary with 2 keys,
namely "format" and "data". The "format" entry gives the storage format of the node
features and "data" lists all the files present for the given node feature.
Data read from each node-feature file is a multi-dimensional tensor, read in numpy format,
which is also the storage format of node features on permanent storage.
'''
#iterate over the "node_data" dictionary in the schema_map
#read the node features if exists
#also keep track of the type_nids for which the node_features are read.
dataset_features = schema_map[constants.STR_NODE_DATA]
if((dataset_features is not None) and (len(dataset_features) > 0)):
for ntype_name, ntype_feature_data in dataset_features.items():
#ntype_feature_data is a dictionary
#where key: feature_name, value: dictionary in which keys are "format", "data"
node_feature_tids[ntype_name] = []
for feat_name, feat_data in ntype_feature_data.items():
assert len(feat_data[constants.STR_DATA]) == world_size
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
my_feat_data_fname = feat_data[constants.STR_DATA][rank] #this will be just the file name
if (os.path.isabs(my_feat_data_fname)):
logging.info(f'Loading numpy from {my_feat_data_fname}')
node_features[ntype_name+'/'+feat_name] = \
torch.from_numpy(np.load(my_feat_data_fname))
else:
numpy_path = os.path.join(input_dir, my_feat_data_fname)
logging.info(f'Loading numpy from {numpy_path}')
node_features[ntype_name+'/'+feat_name] = \
torch.from_numpy(np.load(numpy_path))
node_feature_tids[ntype_name].append([feat_name, -1, -1])
'''
"node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
"num_nodes_per_chunk" : [
[a0, a1, ...a<p-1>], #p partitions
[b0, b1, ... b<p-1>],
....
[c0, c1, ..., c<p-1>] # m rows in total, one per node type
],
The "node_type" points to a list of all the node names present in the graph
And "num_nodes_per_chunk" is used to mention no. of nodes present in each of the
input nodes files. These node counters are used to compute the type_node_ids as
well as global node-ids by using a simple cumulative summation and maitaining an
offset counter to store the end of the current.
Since nodes are NOT actually associated with any additional metadata, w.r.t to the processing
involved in this pipeline this information is not needed to be stored in files. This optimization
saves a considerable amount of time when loading massively large datasets for paritioning.
As opposed to reading from files and performing shuffling process each process/rank generates nodes
which are owned by that particular rank. And using the "num_nodes_per_chunk" information each
process can easily compute any nodes per-type node_id and global node_id.
The node-ids are treated as int64's in order to support billions of nodes in the input graph.
'''
#read my nodes for each node type
node_tids, ntype_gnid_offset = get_idranges(schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK])
for ntype_name in schema_map[constants.STR_NODE_TYPE]:
if ntype_name in node_feature_tids:
for item in node_feature_tids[ntype_name]:
item[1] = node_tids[ntype_name][rank][0]
item[2] = node_tids[ntype_name][rank][1]
#done building node_features locally.
if len(node_features) <= 0:
logging.info(f'[Rank: {rank}] This dataset does not have any node features')
else:
for k, v in node_features.items():
logging.info(f'[Rank: {rank}] node feature name: {k}, feature data shape: {v.size()}')
'''
Code below is used to read edges from the input dataset with the help of the metadata json file
for the input graph dataset.
In the metadata json file, we expect the following key-value pairs to help read the edges of the
input graph.
"edge_type" : [ # a total of n edge types
canonical_etype_0,
canonical_etype_1,
...,
canonical_etype_n-1
]
The value for the key is a list of strings, each string is associated with an edgetype in the input graph.
Note that these strings are in canonical edgetypes format. This means, these edge type strings follow the
following naming convention: src_ntype:etype:dst_ntype. src_ntype and dst_ntype are node type names of the
src and dst end points of this edge type, and etype is the relation name between src and dst ntypes.
The files in which edges are present and their storage format are present in the following key-value pair:
"edges" : {
"canonical_etype_0" : {
"format" : { "name" : "csv", "delimiter" : " " },
"data" : [
filename_0,
filename_1,
filename_2,
....
filename_<p-1>
]
},
}
As shown above the "edges" dictionary value has canonical edgetypes as keys and for each canonical edgetype
we have "format" and "data" which describe the storage format of the edge files and actual filenames respectively.
Please note that each edgetype data is split in to `p` files, where p is the no. of partitions to be made of
the input graph.
Each edge file contains two columns representing the source per-type node_ids and destination per-type node_ids
of any given edge. Since these are node-ids as well they are read in as int64's.
'''
#read my edges for each edge type
etype_names = schema_map[constants.STR_EDGE_TYPE]
etype_name_idmap = {e : idx for idx, e in enumerate(etype_names)}
edge_tids, _ = get_idranges(schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK])
edge_datadict = {}
edge_data = schema_map[constants.STR_EDGES]
#read the edges files and store this data in memory.
for col in [constants.GLOBAL_SRC_ID, constants.GLOBAL_DST_ID, \
constants.GLOBAL_TYPE_EID, constants.ETYPE_ID]:
edge_datadict[col] = []
for etype_name, etype_info in edge_data.items():
assert etype_info[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_CSV
edge_info = etype_info[constants.STR_DATA]
assert len(edge_info) == world_size
#edgetype strings are in canonical format, src_node_type:edge_type:dst_node_type
tokens = etype_name.split(":")
assert len(tokens) == 3
src_ntype_name = tokens[0]
rel_name = tokens[1]
dst_ntype_name = tokens[2]
logging.info(f'Reading csv files from {edge_info[rank]}')
data_df = csv.read_csv(edge_info[rank], read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
#currently these are just type_edge_ids... which will be converted to global ids
edge_datadict[constants.GLOBAL_SRC_ID].append(data_df['f0'].to_numpy() + ntype_gnid_offset[src_ntype_name][0, 0])
edge_datadict[constants.GLOBAL_DST_ID].append(data_df['f1'].to_numpy() + ntype_gnid_offset[dst_ntype_name][0, 0])
edge_datadict[constants.GLOBAL_TYPE_EID].append(np.arange(edge_tids[etype_name][rank][0],\
edge_tids[etype_name][rank][1] ,dtype=np.int64))
edge_datadict[constants.ETYPE_ID].append(etype_name_idmap[etype_name] * \
np.ones(shape=(data_df['f0'].to_numpy().shape), dtype=np.int64))
#stitch together to create the final data on the local machine
for col in [constants.GLOBAL_SRC_ID, constants.GLOBAL_DST_ID, constants.GLOBAL_TYPE_EID, constants.ETYPE_ID]:
edge_datadict[col] = np.concatenate(edge_datadict[col])
assert edge_datadict[constants.GLOBAL_SRC_ID].shape == edge_datadict[constants.GLOBAL_DST_ID].shape
assert edge_datadict[constants.GLOBAL_DST_ID].shape == edge_datadict[constants.GLOBAL_TYPE_EID].shape
assert edge_datadict[constants.GLOBAL_TYPE_EID].shape == edge_datadict[constants.ETYPE_ID].shape
logging.info(f'[Rank: {rank}] Done reading edge_file: {len(edge_datadict)}, {edge_datadict[constants.GLOBAL_SRC_ID].shape}')
return node_tids, node_features, node_feature_tids, edge_datadict, edge_tids
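Putting the docstring fragments above together, a metadata file for a two-chunk (world_size == 2) graph with a single node type and edge type could look like the dictionary below. This is a hand-written illustration only; names and paths are placeholders, not output of the pipeline:
```
# Illustrative schema_map; all names and file paths are placeholders.
schema_map = {
    "node_type": ["ntype0"],
    "num_nodes_per_chunk": [[100, 100]],
    "edge_type": ["ntype0:etype0:ntype0"],
    "num_edges_per_chunk": [[500, 500]],
    "edges": {
        "ntype0:etype0:ntype0": {
            "format": {"name": "csv", "delimiter": " "},
            "data": ["edges/etype0-part0.csv", "edges/etype0-part1.csv"],
        }
    },
    "node_data": {
        "ntype0": {
            "feat0": {
                "format": {"name": "numpy"},
                "data": ["node_data/feat0-part0.npy", "node_data/feat0-part1.npy"],
            }
        }
    },
}
```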
import numpy as np
import os
import pyarrow
import torch
from pyarrow import csv
from gloo_wrapper import alltoallv_cpu
class DistLookupService:
'''
This is an implementation of a Distributed Lookup Service which provides the following
services to its users: it maps 1) global node-ids to partition-ids, and 2) global node-ids
to shuffle global node-ids (which are contiguous within each node type and across
all the partitions).
This service initializes itself with the node-id to partition-id mappings, which are inputs
to this service. The node-id to partition-id mappings are assumed to be in one file for each
node type. These node-id-to-partition-id mappings are split among the service processes so that
each process ends up with a contiguous chunk: the no. of mappings (node-id to
partition-id) for each node type is divided into equal chunks across all the service processes,
so each service process is the owner of a set of node-id-to-partition-id mappings. This class
has two functions, which are as follows:
1) `get_partition_ids` function which returns the node-id to partition-id mappings to the user
2) `get_shuffle_nids` function which returns the node-id to shuffle-node-id mapping to the user
Parameters:
-----------
input_dir : string
string representing the input directory where the node-type partition-id
files are located
ntype_names : list of strings
list of strings which are used to read files located within the input_dir
directory and these files contents are partition-id's for the node-ids which
are of a particular node type
id_map : dgl.distributed.id_map instance
this id_map is used to retrieve ntype-ids, node type ids, and type_nids, per type
node ids, for any given global node id
rank : integer
integer indicating the rank of a given process
world_size : integer
integer indicating the total no. of processes
'''
def __init__(self, input_dir, ntype_names, id_map, rank, world_size):
assert os.path.isdir(input_dir)
assert ntype_names is not None
assert len(ntype_names) > 0
# These lists are indexed by ntype_ids.
type_nid_begin = []
type_nid_end = []
partid_list = []
ntype_count = []
# Iterate over the node types and extract the partition id mappings.
for ntype in ntype_names:
print('[Rank: ', rank, '] Reading file: ', os.path.join(input_dir, '{}.txt'.format(ntype)))
df = csv.read_csv(os.path.join(input_dir, '{}.txt'.format(ntype)), \
read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True), \
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
ntype_partids = df['f0'].to_numpy()
count = len(ntype_partids)
ntype_count.append(count)
# Each rank assumes a contiguous set of partition-ids which are equally split
# across all the processes.
split_size = np.ceil(count/np.int64(world_size)).astype(np.int64)
start, end = np.int64(rank)*split_size, np.int64(rank+1)*split_size
if rank == (world_size-1):
end = count
type_nid_begin.append(start)
type_nid_end.append(end)
# Slice the partition-ids which belong to the current instance.
partid_list.append(ntype_partids[start:end])
# Store all the information in the object instance variable.
self.id_map = id_map
self.type_nid_begin = np.array(type_nid_begin, dtype=np.int64)
self.type_nid_end = np.array(type_nid_end, dtype=np.int64)
self.partid_list = partid_list
self.ntype_count = np.array(ntype_count, dtype=np.int64)
self.rank = rank
self.world_size = world_size
def get_partition_ids(self, global_nids):
'''
This function is used to get the partition-ids for a given set of global node ids
global_nids <-> partition-ids mappings are deterministically distributed across
all the participating processes within the service. A contiguous range of global-nids
(ntype-ids, per-type-nids) is stored within each process, and this range is determined
by the total no. of nodes of a given ntype-id and the rank of the process.
The process where the global_nid <-> partition-id mapping is stored can thus be easily computed.
Once this is determined, we perform an alltoallv to send the requests.
On the receiving side, each process receives a set of global_nids and retrieves corresponding
partition-ids using locally stored lookup tables. It builds responses to all the other
processes and performs alltoallv.
Once the responses (partition-ids) are received, they are re-ordered to match the order of
the incoming global-nids and returned to the caller.
Parameters:
-----------
self : instance of this class
instance of this class, which is passed by the runtime implicitly
global_nids : numpy array
an array of global node-ids for which partition-ids are to be retrieved by
the distributed lookup service.
Returns:
--------
list of integers :
list of integers, which are the partition-ids of the global-node-ids (which is the
function argument)
'''
# Find the process where global_nid --> partition-id(owner) is stored.
ntype_ids, type_nids = self.id_map(global_nids)
ntype_ids, type_nids = ntype_ids.numpy(), type_nids.numpy()
assert len(ntype_ids) == len(global_nids)
# For each node-type, the per-type-node-id <-> partition-id mappings are
# stored as contiguous chunks by this lookup service.
# The no. of these mappings stored by each process, in the lookup service, are
# equally split among all the processes in the lookup service, deterministically.
typeid_counts = self.ntype_count[ntype_ids]
chunk_sizes = np.ceil(typeid_counts/self.world_size).astype(np.int64)
service_owners = np.floor_divide(type_nids, chunk_sizes).astype(np.int64)
# Now `service_owners` is a list of ranks (process-ids) which own the corresponding
# global-nid <-> partition-id mapping.
# Split the input global_nids into a list of lists where each list will be
# sent to the respective rank/process
# We also need to store the indices, in the indices_list, so that we can re-order
# the final result (partition-ids) in the same order as the global-nids (function argument)
send_list = []
indices_list = []
for idx in range(self.world_size):
idxes = np.where(service_owners == idx)
ll = global_nids[idxes[0]]
send_list.append(torch.from_numpy(ll))
indices_list.append(idxes[0])
assert len(np.concatenate(indices_list)) == len(global_nids)
assert np.all(np.sort(np.concatenate(indices_list)) == np.arange(len(global_nids)))
# Send the request to everyone else.
# As a result of this operation, the current process also receives a list of lists
# from all the other processes.
# These lists are global-node-ids whose global-node-ids <-> partition-id mappings
# are owned/stored by the current process
owner_req_list = alltoallv_cpu(self.rank, self.world_size, send_list)
# Create the response list here for each of the request list received in the previous
# step. Populate the respective partition-ids in this response lists appropriately
out_list = []
for idx in range(self.world_size):
if owner_req_list[idx] is None:
out_list.append(torch.empty((0,), dtype=torch.int64))
continue
# Get the node_type_ids and per_type_nids for the incoming global_nids.
ntype_ids, type_nids = self.id_map(owner_req_list[idx].numpy())
ntype_ids, type_nids = ntype_ids.numpy(), type_nids.numpy()
# Lists to store partition-ids for the incoming global-nids.
type_id_lookups = []
local_order_idx = []
# Now iterate over all the node_types and accumulate all the partition-ids.
# Since the accumulated partition-ids follow the node_type order, they
# must be re-ordered as per the order of the input, which may be different.
for tid in range(len(self.partid_list)):
cond = ntype_ids == tid
local_order_idx.append(np.where(cond)[0])
global_type_nids = type_nids[cond]
if len(global_type_nids) <= 0:
continue
local_type_nids = global_type_nids - self.type_nid_begin[tid]
assert np.all(local_type_nids >= 0)
assert np.all(local_type_nids <= (self.type_nid_end[tid] + 1 - self.type_nid_begin[tid]))
cur_owners = self.partid_list[tid][local_type_nids]
type_id_lookups.append(cur_owners)
# Reorder the partition-ids, so that it agrees with the input order --
# which is the order in which the incoming message is received.
if len(type_id_lookups) <= 0:
out_list.append(torch.empty((0,), dtype=torch.int64))
else:
# Now reorder results for each request.
sort_order_idx = np.argsort(np.concatenate(local_order_idx))
lookups = np.concatenate(type_id_lookups)[sort_order_idx]
out_list.append(torch.from_numpy(lookups))
# Send the partition-ids to their respective requesting processes.
owner_resp_list = alltoallv_cpu(self.rank, self.world_size, out_list)
# owner_resp_list is a list of tensors where each tensor holds the
# partition-ids which the current process requested from one rank.
# Now we need to re-order them so that the partition-ids correspond to the
# global_nids which were passed into this function, i.e. restore the
# requesting order: owner_resp_list holds the owner-ids for the
# global_nids (function argument).
owner_ids = torch.cat([x for x in owner_resp_list if x is not None]).numpy()
assert len(owner_ids) == len(global_nids)
global_nids_order = np.concatenate(indices_list)
sort_order_idx = np.argsort(global_nids_order)
owner_ids = owner_ids[sort_order_idx]
global_nids_order = global_nids_order[sort_order_idx]
assert np.all(np.arange(len(global_nids)) == global_nids_order)
# Now the owner_ids (partition-ids) correspond, in order, to the global_nids.
return owner_ids
def get_shuffle_nids(self, global_nids, my_global_nids, my_shuffle_global_nids):
'''
This function is used to retrieve shuffle_global_nids for a given set of incoming
global_nids. Note that global_nids are of random order and will contain duplicates
This function first retrieves the partition-ids of the incoming global_nids.
These partition-ids which are also the ranks of processes which own the respective
global-nids as well as shuffle-global-nids. alltoallv is performed to send the
global-nids to respective ranks/partition-ids where the mapping
global-nids <-> shuffle-global-nid is located.
On the receiving side, once the global-nids are received associated shuffle-global-nids
are retrieved and an alltoallv is performed to send the responses to all the other
processes.
Once the responses (shuffle-global-nids) are received, they are re-ordered according
to the incoming global-nids order and returned to the caller.
Parameters:
-----------
self : instance of this class
instance of this class, which is passed by the runtime implicitly
global_nids : numpy array
an array of global node-ids for which partition-ids are to be retrieved by
the distributed lookup service.
my_global_nids: numpy ndarray
array of global_nids which are owned by the current partition/rank/process
This process has the node <-> partition id mapping
my_shuffle_global_nids : numpy ndarray
array of shuffle_global_nids which are assigned by the current process/rank
Returns:
--------
list of integers:
list of shuffle_global_nids which correspond to the incoming node-ids in the
global_nids.
'''
# Get the owner_ids (partition-ids or rank).
owner_ids = self.get_partition_ids(global_nids)
# Ask these owners to supply for the shuffle_global_nids.
send_list = []
id_list = []
for idx in range(self.world_size):
cond = owner_ids == idx
idxes = np.where(cond)
ll = global_nids[idxes[0]]
send_list.append(torch.from_numpy(ll))
id_list.append(idxes[0])
assert len(np.concatenate(id_list)) == len(global_nids)
cur_global_nids = alltoallv_cpu(self.rank, self.world_size, send_list)
# At this point, current process received a list of lists each containing
# a list of global-nids whose corresponding shuffle_global_nids are located
# in the current process.
shuffle_nids_list = []
for idx in range(self.world_size):
if cur_global_nids[idx] is None:
shuffle_nids_list.append(torch.empty((0,), dtype=torch.int64))
continue
uniq_ids, inverse_idx = np.unique(cur_global_nids[idx], return_inverse=True)
common, idx1, idx2 = np.intersect1d(uniq_ids, my_global_nids, assume_unique=True, return_indices=True)
assert len(common) == len(uniq_ids)
req_shuffle_global_nids = my_shuffle_global_nids[idx2][inverse_idx]
assert len(req_shuffle_global_nids) == len(cur_global_nids[idx])
shuffle_nids_list.append(torch.from_numpy(req_shuffle_global_nids))
# Send the shuffle-global-nids to their respective ranks.
mapped_global_nids = alltoallv_cpu(self.rank, self.world_size, shuffle_nids_list)
# Reorder to match global_nids (function parameter).
global_nids_order = np.concatenate(id_list)
shuffle_global_nids = torch.cat(mapped_global_nids).numpy()
assert len(shuffle_global_nids) == len(global_nids)
sorted_idx = np.argsort(global_nids_order)
shuffle_global_nids = shuffle_global_nids[ sorted_idx ]
global_nids_ordered = global_nids_order[sorted_idx]
assert np.all(global_nids_ordered == np.arange(len(global_nids)))
return shuffle_global_nids
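The ownership rule used by `get_partition_ids` is plain arithmetic, so it can be illustrated without starting the service (a sketch with made-up counts; no process group is needed):
```
import numpy as np

world_size = 4
# One node type with 10 per-type ids; ceil(10 / 4) == 3, so the service ranks
# own the per-type id chunks [0, 3), [3, 6), [6, 9) and [9, 10).
ntype_count = np.array([10], dtype=np.int64)
type_nids = np.array([0, 2, 3, 7, 9], dtype=np.int64)
ntype_ids = np.zeros_like(type_nids)

chunk_sizes = np.ceil(ntype_count[ntype_ids] / world_size).astype(np.int64)
service_owners = np.floor_divide(type_nids, chunk_sizes).astype(np.int64)
print(service_owners)  # [0 0 1 2 3]
```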
import numpy as np
import torch
import operator
import itertools
import constants
from gloo_wrapper import allgather_sizes, alltoallv_cpu
from dist_lookup import DistLookupService
def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
"""
For nodes which are not owned by the current rank, i.e. whose global_nid <-> shuffle_global_nid mapping
is not present at the current rank, this function retrieves their shuffle_global_nids from the owner rank.
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
total no. of ranks configured
global_nids_ranks : list
list of numpy arrays (of global_nids), index of the list is the rank of the process
where global_nid <-> shuffle_global_nid mapping is located.
node_data : dictionary
node_data is a dictionary with keys as column names and values as numpy arrays
Returns:
--------
numpy ndarray
where the column-0 are global_nids and column-1 are shuffle_global_nids which are retrieved
from other processes.
"""
#build a list of sizes (lengths of lists)
global_nids_ranks = [torch.from_numpy(x) for x in global_nids_ranks]
recv_nodes = alltoallv_cpu(rank, world_size, global_nids_ranks)
# Use node_data to lookup global id to send over.
send_nodes = []
for proc_i_nodes in recv_nodes:
#list of node-ids to lookup
if proc_i_nodes is not None:
global_nids = proc_i_nodes.numpy()
if(len(global_nids) != 0):
common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
send_nodes.append(torch.from_numpy(shuffle_global_nids).type(dtype=torch.int64))
else:
send_nodes.append(torch.empty((0), dtype=torch.int64))
else:
send_nodes.append(torch.empty((0), dtype=torch.int64))
#send receive global-ids
recv_shuffle_global_nids = alltoallv_cpu(rank, world_size, send_nodes)
shuffle_global_nids = np.concatenate([x.numpy() if x is not None else [] for x in recv_shuffle_global_nids])
global_nids = np.concatenate([x for x in global_nids_ranks])
ret_val = np.column_stack([global_nids, shuffle_global_nids])
return ret_val
def lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, node_data):
'''
This function is a helper function used to lookup shuffle-global-nids for a given set of
global-nids using a distributed lookup service.
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
total number of processes used in the process group
edge_data : dictionary
edge_data is a dictionary with keys as column names and values as numpy arrays representing
all the edges present in the current graph partition
id_lookup : instance of DistLookupService class
instance of a distributed lookup service class which is used to retrieve partition-ids and
shuffle-global-nids for any given set of global-nids
node_data : dictionary
node_data is a dictionary with keys as column names and values as numpy arrays representing
all the nodes owned by the current process
Returns:
--------
dictionary :
dictionary where keys are column names and values are numpy arrays representing all the
edges present in the current graph partition
'''
node_list = np.concatenate([edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID]])
shuffle_ids = id_lookup.get_shuffle_nids(node_list,
node_data[constants.GLOBAL_NID],
node_data[constants.SHUFFLE_GLOBAL_NID])
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID], edge_data[constants.SHUFFLE_GLOBAL_DST_ID] = np.split(shuffle_ids, 2)
return edge_data
def assign_shuffle_global_nids_nodes(rank, world_size, node_data):
"""
Utility function to assign shuffle global ids to nodes at a given rank
node_data gets converted from [ntype, global_type_nid, global_nid]
to [shuffle_global_nid, ntype, global_type_nid, global_nid, part_local_type_nid]
where shuffle_global_nid : global id of the node after data shuffle
ntype : node-type as read from xxx_nodes.txt
global_type_nid : node-type-id as read from xxx_nodes.txt
global_nid : node-id as read from xxx_nodes.txt, implicitly
this is the line no. in the file
part_local_type_nid : type_nid assigned by the current rank within its scope
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
total number of processes used in the process group
node_data : dictionary
node_data is a dictionary with keys as column names and values as numpy arrays
"""
# Compute prefix sum to determine node-id offsets
prefix_sum_nodes = allgather_sizes([node_data[constants.GLOBAL_NID].shape[0]], world_size)
# assigning node-ids from localNodeStartId to (localNodeEndId - 1)
# Assuming here that the nodeDataArr is sorted based on the nodeType.
shuffle_global_nid_start = prefix_sum_nodes[rank]
shuffle_global_nid_end = prefix_sum_nodes[rank + 1]
# add a column with global-ids (after data shuffle)
shuffle_global_nids = np.arange(shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64)
node_data[constants.SHUFFLE_GLOBAL_NID] = shuffle_global_nids
def assign_shuffle_global_nids_edges(rank, world_size, edge_data):
"""
Utility function to assign shuffle_global_eids to edges
edge_data gets converted from [global_src_nid, global_dst_nid, global_type_eid, etype]
to [shuffle_global_src_nid, shuffle_global_dst_nid, global_src_nid, global_dst_nid, global_type_eid, etype]
Parameters:
-----------
rank : integer
rank of the current process
world_size : integer
total count of processes in execution
edge_data : dictionary
edge_data is a dictionary with keys as column names and values as numpy arrays representing
all the edges present in the current graph partition
Returns:
--------
integer
shuffle_global_eid_start, which indicates the starting value from which shuffle_global-ids are assigned to edges
on this rank
"""
#get prefix sum of edge counts per rank to locate the starting point
#from which global-ids to edges are assigned in the current rank
prefix_sum_edges = allgather_sizes([edge_data[constants.GLOBAL_SRC_ID].shape[0]], world_size)
shuffle_global_eid_start = prefix_sum_edges[rank]
shuffle_global_eid_end = prefix_sum_edges[rank + 1]
# assigning edge-ids from localEdgeStart to (localEdgeEndId - 1)
# Assuming here that the edge_data is sorted by edge_type
shuffle_global_eids = np.arange(shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64)
edge_data[constants.SHUFFLE_GLOBAL_EID] = shuffle_global_eids
return shuffle_global_eid_start
import numpy as np
import torch
import torch.distributed as dist
def allgather_sizes(send_data, world_size):
"""
Perform an all-gather on the per-rank counts in `send_data` and compute their prefix sums
to determine the offset of each rank. This is used to allocate
global ids for edges/nodes on each rank.
Parameters
----------
send_data : numpy array
Data on which allgather is performed.
world_size : integer
No. of processes configured for execution
Returns :
---------
numpy array
array with the prefix sum
"""
#compute the length of the local data
send_length = len(send_data)
out_tensor = torch.as_tensor(send_data, dtype=torch.int64)
in_tensor = [torch.zeros(send_length, dtype=torch.int64)
for _ in range(world_size)]
#all_gather message
dist.all_gather(in_tensor, out_tensor)
#gather sizes in on array to return to the invoking function
rank_sizes = np.zeros(world_size + 1, dtype=np.int64)
count = rank_sizes[0]
for i, t in enumerate(in_tensor):
count += t.item()
rank_sizes[i+1] = count
return rank_sizes
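The prefix-sum array returned by `allgather_sizes` is what `assign_shuffle_global_nids_nodes` and `assign_shuffle_global_nids_edges` slice into. The collective itself needs an initialized process group, but the arithmetic can be sketched offline with made-up per-rank counts:
```
import numpy as np

# Suppose 3 ranks report local node counts of 4, 2 and 3.
per_rank_counts = [4, 2, 3]
rank_sizes = np.concatenate([[0], np.cumsum(per_rank_counts)])
print(rank_sizes)  # [0 4 6 9]

# Rank 1 then assigns shuffle-global node ids arange(4, 6) to its nodes.
rank = 1
print(np.arange(rank_sizes[rank], rank_sizes[rank + 1], dtype=np.int64))  # [4 5]
```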
def __alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list):
"""
Each process scatters its list of input tensors to all processes in the cluster
and returns the gathered list of tensors in the output list. The tensors should have the same shape.
Parameters
----------
rank : int
The rank of current worker
world_size : int
The size of the entire process group (world size)
output_tensor_list : List of tensor
The received tensors
input_tensor_list : List of tensor
The tensors to exchange
"""
input_tensor_list = [tensor.to(torch.device('cpu')) for tensor in input_tensor_list]
for i in range(world_size):
dist.scatter(output_tensor_list[i], input_tensor_list if i == rank else [], src=i)
def alltoallv_cpu(rank, world_size, input_tensor_list):
"""
Wrapper function providing alltoallv functionality on top of the underlying alltoall
messaging primitive. In its current implementation, this function supports exchanging
messages of arbitrary (but matching trailing) dimensions, independent of the caller.
This function pads the input tensors, except the largest one, so that all the messages are of
the same size. Once the messages are padded, it first sends a vector whose first two elements are
1) the actual (useful) message size along the first dimension, and 2) the padded message size along
the first dimension which is used for communication. The rest of the dimensions are assumed to
be the same across all the input tensors. After receiving the message sizes, the receiving end
creates buffers of the appropriate sizes, then slices the received messages to remove the added
padding, if any, and returns them to the caller.
Parameters:
-----------
rank : int
The rank of current worker
world_size : int
The size of the entire process group (world size)
input_tensor_list : List of tensor
The tensors to exchange
Returns:
--------
list :
list of tensors received from other processes during alltoall message
"""
#ensure len of input_tensor_list is same as the world_size.
assert input_tensor_list != None
assert len(input_tensor_list) == world_size
#ensure that all the tensors in the input_tensor_list are of same size.
sizes = [list(x.size()) for x in input_tensor_list]
for idx in range(1,len(sizes)):
assert len(sizes[idx-1]) == len(sizes[idx]) #no. of dimensions should be same
assert input_tensor_list[idx-1].dtype == input_tensor_list[idx].dtype # dtype should be same
assert sizes[idx-1][1:] == sizes[idx][1:] #except first dimension remaining dimensions should all be the same
#decide how much to pad.
#always use the first-dimension for padding.
ll = [ x[0] for x in sizes ]
#dims of the padding needed, if any
#these dims are used for padding purposes.
diff_dims = [ [np.amax(ll) - l[0]] + l[1:] for l in sizes ]
#pad the actual message
input_tensor_list = [torch.cat((x, torch.zeros(diff_dims[idx]).type(x.dtype))) for idx, x in enumerate(input_tensor_list)]
#send useful message sizes to all
send_counts = []
recv_counts = []
for idx in range(world_size):
#send a vector of at least 3 elements, [a, b, ....], where
#a = useful message size along the first dimension, b = padded (outgoing) message size along the
#first dimension, and the remaining elements are the remaining dimensions of the tensor
send_counts.append(torch.from_numpy(np.array([sizes[idx][0]] + [np.amax(ll)] + sizes[idx][1:] )).type(torch.int64))
recv_counts.append(torch.zeros((1 + len(sizes[idx])), dtype=torch.int64))
__alltoall_cpu(rank, world_size, recv_counts, send_counts)
#allocate buffers for receiving message
output_tensor_list = []
recv_counts = [ tsize.numpy() for tsize in recv_counts]
for idx, tsize in enumerate(recv_counts):
output_tensor_list.append(torch.zeros(tuple(tsize[1:])).type(input_tensor_list[idx].dtype))
#send actual message itself.
__alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list)
#extract un-padded message from the output_tensor_list and return it
return_vals = []
for s, t in zip(recv_counts, output_tensor_list):
if s[0] == 0:
return_vals.append(None)
else:
return_vals.append(t[0:s[0]])
return return_vals
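The padding bookkeeping described above can be shown on a toy set of outgoing tensors (only the size computation is sketched here; the actual exchange requires an initialized torch.distributed process group):
```
import numpy as np
import torch

# Outgoing tensors to 3 ranks with first dimensions 2, 5 and 3 (feature dim 4).
input_tensor_list = [torch.zeros(n, 4) for n in (2, 5, 3)]
sizes = [list(x.size()) for x in input_tensor_list]
ll = [s[0] for s in sizes]

# Every tensor is padded along the first dimension to the largest size (5 here) ...
padded = [torch.cat((x, torch.zeros([int(np.amax(ll)) - s[0]] + s[1:])))
          for x, s in zip(input_tensor_list, sizes)]
print([list(p.size()) for p in padded])  # [[5, 4], [5, 4], [5, 4]]

# ... and the size vector sent ahead of it is [useful_dim0, padded_dim0, *rest].
headers = [[s[0], int(np.amax(ll))] + s[1:] for s in sizes]
print(headers)  # [[2, 5, 4], [5, 5, 4], [3, 5, 4]]
```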
def gather_metadata_json(metadata, rank, world_size):
"""
Gather the per-rank json schema objects on rank 0.
Parameters:
-----------
metadata : json dictionary object
json schema formed on each rank with graph level data.
This will be used as input to the distributed training in the later steps.
Returns:
--------
list : list of json dictionary objects
The result of the gather operation, which is the list of json dictionary
objects from each rank in the world (populated on rank 0, None on other ranks)
"""
#Populate input obj and output obj list on rank-0 and non-rank-0 machines
input_obj = None if rank == 0 else metadata
output_objs = [None for _ in range(world_size)] if rank == 0 else None
#invoke the gloo method to perform gather on rank-0
dist.gather_object(input_obj, output_objs, dst=0)
return output_objs
import os
import torch
import numpy as np
import json
import dgl
import constants
import pyarrow
from pyarrow import csv
def read_ntype_partition_files(schema_map, input_dir):
"""
Utility method to read the partition id mapping for each node.
For each node type, there will be a file in the input directory
containing the partition id mapping for each node id of that type.
Parameters:
-----------
schema_map : dictionary
dictionary created by reading the input metadata json file
input_dir : string
directory in which the node-id to partition-id mappings files are
located for each of the node types in the input graph
Returns:
--------
numpy array :
array of integers representing the mapped partition-ids for each node-id.
The line number in each of these files is used as the type_node_id.
The index into this array is the homogenized node-id and the
value is the partition-id for that node-id (index). Please note that
the partition-ids of each node-type are stacked together vertically and
in this way heterogeneous node-ids are converted to homogeneous node-ids.
"""
assert os.path.isdir(input_dir)
#iterate over the node types and extract the partition id mappings
part_ids = []
ntype_names = schema_map[constants.STR_NODE_TYPE]
for ntype in ntype_names:
df = csv.read_csv(os.path.join(input_dir, '{}.txt'.format(ntype)), \
read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True), \
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
ntype_partids = df['f0'].to_numpy()
part_ids.append(ntype_partids)
return np.concatenate(part_ids)
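The effect of stacking the per-node-type partition-id arrays can be seen with two small node types (the partition ids are made up):
```
import numpy as np

# Partition ids read for node type 0 (3 nodes) and node type 1 (2 nodes).
ntype0_partids = np.array([0, 1, 0], dtype=np.int64)
ntype1_partids = np.array([1, 1], dtype=np.int64)

part_ids = np.concatenate([ntype0_partids, ntype1_partids])
# The array index is the homogenized node id, so type-1 node 0 sits at offset 3.
print(part_ids)     # [0 1 0 1 1]
print(part_ids[3])  # 1
```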
def read_json(json_file):
"""
Utility method to read a json file schema
Parameters:
-----------
json_file : string
file name for the json schema
Returns:
--------
dictionary, as serialized in the json_file
"""
with open(json_file) as schema:
val = json.load(schema)
return val
def get_ntype_featnames(ntype_name, schema_map):
"""
Retrieves node feature names for a given node_type
Parameters:
-----------
ntype_name : string
a string specifying a node_type name
schema : dictionary
metadata json object as a dictionary, which is read from the input
metadata file from the input dataset
Returns:
--------
list :
a list of feature names for a given node_type
"""
ntype_featdict = schema_map[constants.STR_NODE_DATA]
if (ntype_name in ntype_featdict):
featnames = []
ntype_info = ntype_featdict[ntype_name]
for k, v in ntype_info.items():
featnames.append(k)
return featnames
else:
return []
def get_node_types(schema_map):
"""
Utility method to extract node_typename -> node_type mappings
as defined by the input schema
Parameters:
-----------
schema_map : dictionary
Input schema from which the node_typename -> node_type
dictionary is created.
Returns:
--------
dictionary
with keys as node type names and values as ids (integers)
list
list of ntype name strings
dictionary
with keys as ntype ids (integers) and values as node type names
"""
ntypes = schema_map[constants.STR_NODE_TYPE]
ntype_ntypeid_map = {e : i for i, e in enumerate(ntypes)}
ntypeid_ntype_map = {i : e for i, e in enumerate(ntypes)}
return ntype_ntypeid_map, ntypes, ntypeid_ntype_map
def get_gnid_range_map(node_tids):
"""
Retrieves auxiliary dictionaries from the metadata json object
Parameters:
-----------
node_tids: dictionary
This dictionary contains the information about nodes for each node_type.
Typically this information contains p-entries, where each entry has a file-name,
starting and ending type_node_ids for the nodes in this file. Keys in this dictionary
are the node_type and value is a list of lists. Each individual entry in this list has
three items: file-name, starting type_nid and ending type_nid
Returns:
--------
dictionary :
a dictionary where keys are node_type names and values are global_nid range, which is a tuple.
"""
ntypes_gid_range = {}
offset = 0
for k, v in node_tids.items():
ntypes_gid_range[k] = [offset + int(v[0][0]), offset + int(v[-1][1])]
offset += int(v[-1][1])
return ntypes_gid_range
def write_metadata_json(metadata_list, output_dir, graph_name):
"""
Merge the json schemas from each rank on rank-0.
This utility function is to be used on rank-0 to create the aggregated json file.
Parameters:
-----------
metadata_list : list of json (dictionaries)
a list of json dictionaries to merge on rank-0
output_dir : string
output directory path in which results are stored (as a json file)
graph-name : string
a string specifying the graph name
"""
#Initialize global metadata
graph_metadata = {}
#Merge global_edge_ids from each json object in the input list
edge_map = {}
x = metadata_list[0]["edge_map"]
for k in x:
edge_map[k] = []
for idx in range(len(metadata_list)):
edge_map[k].append([int(metadata_list[idx]["edge_map"][k][0][0]),int(metadata_list[idx]["edge_map"][k][0][1])])
graph_metadata["edge_map"] = edge_map
graph_metadata["etypes"] = metadata_list[0]["etypes"]
graph_metadata["graph_name"] = metadata_list[0]["graph_name"]
graph_metadata["halo_hops"] = metadata_list[0]["halo_hops"]
#Merge global_nodeids from each of json object in the input list
node_map = {}
x = metadata_list[0]["node_map"]
for k in x:
node_map[k] = []
for idx in range(len(metadata_list)):
node_map[k].append([int(metadata_list[idx]["node_map"][k][0][0]), int(metadata_list[idx]["node_map"][k][0][1])])
graph_metadata["node_map"] = node_map
graph_metadata["ntypes"] = metadata_list[0]["ntypes"]
graph_metadata["num_edges"] = int(sum([metadata_list[i]["num_edges"] for i in range(len(metadata_list))]))
graph_metadata["num_nodes"] = int(sum([metadata_list[i]["num_nodes"] for i in range(len(metadata_list))]))
graph_metadata["num_parts"] = metadata_list[0]["num_parts"]
graph_metadata["part_method"] = metadata_list[0]["part_method"]
for i in range(len(metadata_list)):
graph_metadata["part-{}".format(i)] = metadata_list[i]["part-{}".format(i)]
with open('{}/metadata.json'.format(output_dir), 'w') as outfile:
json.dump(graph_metadata, outfile, sort_keys=False, indent=4)
def augment_edge_data(edge_data, lookup_service, edge_tids, rank, world_size):
"""
Add partition-id (rank which owns an edge) column to the edge_data.
Parameters:
-----------
edge_data : numpy ndarray
Edge information as read from the xxx_edges.txt file
lookup_service : instance of class DistLookupService
Distributed lookup service used to map global-nids to respective partition-ids and
shuffle-global-nids
edge_tids: dictionary
dictionary where keys are canonical edge types and values are list of tuples
which indicate the range of edges assigned to each of the partitions
rank : integer
rank of the current process
world_size : integer
total no. of process participating in the communication primitives
Returns:
--------
dictionary :
dictionary with keys as column names and values as numpy arrays and this information is
loaded from input dataset files. In addition to this we include additional columns which
aid this pipelines computation, like constants.OWNER_PROCESS
"""
#add global_eids to the edge_data
etype_offset = {}
offset = 0
for etype_name, tid_range in edge_tids.items():
assert int(tid_range[0][0]) == 0
assert len(tid_range) == world_size
etype_offset[etype_name] = offset + int(tid_range[0][0])
offset += int(tid_range[-1][1])
global_eids = []
for etype_name, tid_range in edge_tids.items():
global_eid_start = etype_offset[etype_name]
begin = global_eid_start + int(tid_range[rank][0])
end = global_eid_start + int(tid_range[rank][1])
global_eids.append(np.arange(begin, end, dtype=np.int64))
global_eids = np.concatenate(global_eids)
assert global_eids.shape[0] == edge_data[constants.ETYPE_ID].shape[0]
edge_data[constants.GLOBAL_EID] = global_eids
#assign the owner process/rank for each edge
edge_data[constants.OWNER_PROCESS] = lookup_service.get_partition_ids(edge_data[constants.GLOBAL_DST_ID])
return edge_data
def read_edges_file(edge_file, edge_data_dict):
"""
Utility function to read xxx_edges.txt file
Parameters:
-----------
edge_file : string
Graph file for edges in the input graph
Returns:
--------
dictionary
edge data as read from xxx_edges.txt file and columns are stored
in a dictionary with key-value pairs as column-names and column-data.
"""
if edge_file == "" or edge_file == None:
return None
#Read the file from here.
#<global_src_id> <global_dst_id> <type_eid> <etype> <attributes>
# global_src_id -- global idx for the source node ... line # in the graph_nodes.txt
# global_dst_id -- global idx for the destination id node ... line # in the graph_nodes.txt
edge_data_df = csv.read_csv(edge_file, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
edge_data_dict = {}
edge_data_dict[constants.GLOBAL_SRC_ID] = edge_data_df['f0'].to_numpy()
edge_data_dict[constants.GLOBAL_DST_ID] = edge_data_df['f1'].to_numpy()
edge_data_dict[constants.GLOBAL_TYPE_EID] = edge_data_df['f2'].to_numpy()
edge_data_dict[constants.ETYPE_ID] = edge_data_df['f3'].to_numpy()
return edge_data_dict
def read_node_features_file(nodes_features_file):
"""
Utility function to load tensors from a file
Parameters:
-----------
nodes_features_file : string
Features file for nodes in the graph
Returns:
--------
dictionary
mappings between ntype and list of features
"""
node_features = dgl.data.utils.load_tensors(nodes_features_file, False)
return node_features
def read_edge_features_file(edge_features_file):
"""
Utility function to load tensors from a file
Parameters:
-----------
edge_features_file : string
Features file for edges in the graph
Returns:
--------
dictionary
mappings between etype and list of features
"""
edge_features = dgl.data.utils.load_tensors(edge_features_file, True)
return edge_features
def write_node_features(node_features, node_file):
"""
    Utility function to serialize node_features to the file specified by node_file
Parameters:
-----------
node_features : dictionary
dictionary storing ntype <-> list of features
node_file : string
File in which the node information is serialized
"""
dgl.data.utils.save_tensors(node_file, node_features)
def write_edge_features(edge_features, edge_file):
"""
    Utility function to serialize edge_features to the file specified by edge_file
Parameters:
-----------
edge_features : dictionary
dictionary storing etype <-> list of features
edge_file : string
File in which the edge information is serialized
"""
dgl.data.utils.save_tensors(edge_file, edge_features)
def write_graph_dgl(graph_file, graph_obj):
"""
Utility function to serialize graph dgl objects
Parameters:
-----------
    graph_file : string
        File name in which graph object is serialized
    graph_obj : dgl graph object
        graph dgl object, as created in convert_partition.py, which is to be serialized
"""
dgl.save_graphs(graph_file, [graph_obj])
def write_dgl_objects(graph_obj, node_features, edge_features, output_dir, part_id):
"""
Wrapper function to create dgl objects for graph, node-features and edge-features
Parameters:
-----------
graph_obj : dgl object
graph dgl object as created in convert_partition.py file
    node_features : dictionary
        dictionary storing ntype <-> tensor data for node features
    edge_features : dictionary
        dictionary storing etype <-> tensor data for edge features
output_dir : string
location where the output files will be located
part_id : int
integer indicating the partition-id
"""
    part_dir = os.path.join(output_dir, 'part' + str(part_id))
    os.makedirs(part_dir, exist_ok=True)
    write_graph_dgl(os.path.join(part_dir, 'graph.dgl'), graph_obj)
    if node_features is not None:
        write_node_features(node_features, os.path.join(part_dir, "node_feat.dgl"))
    if edge_features is not None:
        write_edge_features(edge_features, os.path.join(part_dir, "edge_feat.dgl"))
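# Illustrative sketch (not part of the pipeline): writing a toy partition with the
# helpers above. The tiny graph, the feature names and the use of torch tensors
# here are made up for demonstration only.
def _example_write_dgl_objects():
    import tempfile
    import torch
    graph = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])), num_nodes=3)
    node_feats = {'paper/feat': torch.randn(3, 4)}
    edge_feats = {'cites/count': torch.arange(2)}
    with tempfile.TemporaryDirectory() as out_dir:
        write_dgl_objects(graph, node_feats, edge_feats, out_dir, part_id=0)
        # expected layout: <out_dir>/part0/{graph.dgl, node_feat.dgl, edge_feat.dgl}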
def get_idranges(names, counts):
"""
    Utility function to compute type_id/global_id ranges for both nodes and edges.
Parameters:
-----------
names : list of strings
list of node/edge types as strings
counts : list of lists
        each inner list contains the per-chunk counts of nodes/edges for the corresponding type
Returns:
--------
dictionary
dictionary where the keys are node-/edge-type names and values are
list of tuples where each tuple indicates the range of values for
corresponding type-ids.
    dictionary
        dictionary where the keys are node-/edge-type names and each value is a
        1x2 array with the [start, end) range of global-ids for that type.
"""
gnid_start = 0
gnid_end = gnid_start
tid_dict = {}
gid_dict = {}
for idx, typename in enumerate(names):
type_counts = counts[idx]
tid_start = np.cumsum([0] + type_counts[:-1])
tid_end = np.cumsum(type_counts)
tid_ranges = list(zip(tid_start, tid_end))
gnid_end += tid_ranges[-1][1]
tid_dict[typename] = tid_ranges
gid_dict[typename] = np.array([gnid_start, gnid_end]).reshape([1,2])
gnid_start = gnid_end
return tid_dict, gid_dict
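# Illustrative sketch (not part of the pipeline): get_idranges on two node types,
# each split into two chunks. The type names and counts are synthetic.
def _example_get_idranges():
    tid_dict, gid_dict = get_idranges(['author', 'paper'], [[3, 2], [4, 4]])
    # per-type type-id ranges, one (start, end) tuple per chunk
    assert tid_dict['author'] == [(0, 3), (3, 5)]
    assert tid_dict['paper'] == [(0, 4), (4, 8)]
    # global-ids are laid out type after type: authors first, then papers
    assert gid_dict['author'].tolist() == [[0, 5]]
    assert gid_dict['paper'].tolist() == [[5, 13]]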
# Requires setting PYTHONPATH=${GIT_ROOT_DIR}/tools
import json
import logging
import sys
import os
import numpy as np
import argparse
from utils import setdir
from utils import array_readwriter
def _random_partition(metadata, num_parts):
num_nodes_per_type = [sum(_) for _ in metadata['num_nodes_per_chunk']]
ntypes = metadata['node_type']
for ntype, n in zip(ntypes, num_nodes_per_type):
        logging.info('Generating partition for node type %s', ntype)
parts = np.random.randint(0, num_parts, (n,))
array_readwriter.get_array_parser(name='csv').write(ntype + '.txt', parts)
def random_partition(metadata, num_parts, output_path):
"""
Randomly partition the graph described in metadata and generate partition ID mapping
in :attr:`output_path`.
A directory will be created at :attr:`output_path` containing the partition ID
mapping files named "<node-type>.txt" (e.g. "author.txt", "paper.txt" and
"institution.txt" for OGB-MAG240M). Each file contains one line per node representing
the partition ID the node belongs to.
"""
with setdir(output_path):
_random_partition(metadata, num_parts)
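# Illustrative sketch (not part of the tool): calling random_partition directly with
# a minimal, synthetic metadata dict instead of going through the CLI below. Only
# the 'node_type' and 'num_nodes_per_chunk' fields are consumed here.
def _example_random_partition():
    import tempfile
    toy_metadata = {
        'node_type': ['author', 'paper'],
        'num_nodes_per_chunk': [[3, 2], [4, 4]],
    }
    with tempfile.TemporaryDirectory() as out_dir:
        random_partition(toy_metadata, num_parts=2, output_path=out_dir)
        # writes <out_dir>/author.txt and <out_dir>/paper.txt,
        # one partition id (0 or 1) per line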
# Run with PYTHONPATH=${GIT_ROOT_DIR}/tools
# where ${GIT_ROOT_DIR} is the directory to the DGL git repository.
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--metadata', type=str, help='input metadata file of the chunked graph format')
parser.add_argument(
'--output_path', type=str, help='output directory')
parser.add_argument(
'--num_partitions', type=int, help='number of partitions')
logging.basicConfig(level='INFO')
args = parser.parse_args()
with open(args.metadata) as f:
metadata = json.load(f)
output_path = args.output_path
num_parts = args.num_partitions
random_partition(metadata, num_parts, output_path)
from .files import *
from . import array_readwriter
from .registry import register_array_parser, get_array_parser
from . import csv
from . import numpy_array