Unverified commit 18d89b5d authored by Minjie Wang, committed by GitHub

Revert "[Dist] New distributed data preparation pipeline (#4386)" (#4391)

This reverts commit 71ce1749.
parent 71ce1749
# DGL Utility Scripts
This folder contains the utilities that do not belong to DGL core package as standalone executable
scripts.
## Graph Chunking
`chunk_graph.py` provides an example of chunking an existing DGLGraph object into the on-disk
[chunked graph format](http://13.231.216.217/guide/distributed-preprocessing.html#chunked-graph-format).
<!-- TODO: change the link of documentation once it's merged to master -->
An example of chunking the OGB MAG240M dataset:
```python
import dgl
import ogb.lsc

from chunk_graph import chunk_graph  # chunk_graph.py lives in this folder

dataset = ogb.lsc.MAG240MDataset('.')
etypes = [
('paper', 'cites', 'paper'),
('author', 'writes', 'paper'),
('author', 'affiliated_with', 'institution')]
g = dgl.heterograph({k: tuple(dataset.edge_index(*k)) for k in etypes})
chunk_graph(
g,
'mag240m',
{'paper': {
'feat': 'mag240m_kddcup2021/processed/paper/node_feat.npy',
'label': 'mag240m_kddcup2021/processed/paper/node_label.npy',
'year': 'mag240m_kddcup2021/processed/paper/node_year.npy'}},
{},
4,
'output')
```
The output chunked graph metadata will look like the following (assuming the current directory is `/home/user`):
```json
{
"graph_name": "mag240m",
"node_type": [
"author",
"institution",
"paper"
],
"num_nodes_per_chunk": [
[
30595778,
30595778,
30595778,
30595778
],
[
6431,
6430,
6430,
6430
],
[
30437917,
30437917,
30437916,
30437916
]
],
"edge_type": [
"author:affiliated_with:institution",
"author:writes:paper",
"paper:cites:paper"
],
"num_edges_per_chunk": [
[
11148147,
11148147,
11148146,
11148146
],
[
96505680,
96505680,
96505680,
96505680
],
[
324437232,
324437232,
324437231,
324437231
]
],
"edges": {
"author:affiliated_with:institution": {
"format": {
"name": "csv",
"delimiter": " "
},
"data": [
"/home/user/output/edge_index/author:affiliated_with:institution0.txt",
"/home/user/output/edge_index/author:affiliated_with:institution1.txt",
"/home/user/output/edge_index/author:affiliated_with:institution2.txt",
"/home/user/output/edge_index/author:affiliated_with:institution3.txt"
]
},
"author:writes:paper": {
"format": {
"name": "csv",
"delimiter": " "
},
"data": [
"/home/user/output/edge_index/author:writes:paper0.txt",
"/home/user/output/edge_index/author:writes:paper1.txt",
"/home/user/output/edge_index/author:writes:paper2.txt",
"/home/user/output/edge_index/author:writes:paper3.txt"
]
},
"paper:cites:paper": {
"format": {
"name": "csv",
"delimiter": " "
},
"data": [
"/home/user/output/edge_index/paper:cites:paper0.txt",
"/home/user/output/edge_index/paper:cites:paper1.txt",
"/home/user/output/edge_index/paper:cites:paper2.txt",
"/home/user/output/edge_index/paper:cites:paper3.txt"
]
}
},
"node_data": {
"paper": {
"feat": {
"format": {
"name": "numpy"
},
"data": [
"/home/user/output/node_data/paper/feat-0.npy",
"/home/user/output/node_data/paper/feat-1.npy",
"/home/user/output/node_data/paper/feat-2.npy",
"/home/user/output/node_data/paper/feat-3.npy"
]
},
"label": {
"format": {
"name": "numpy"
},
"data": [
"/home/user/output/node_data/paper/label-0.npy",
"/home/user/output/node_data/paper/label-1.npy",
"/home/user/output/node_data/paper/label-2.npy",
"/home/user/output/node_data/paper/label-3.npy"
]
},
"year": {
"format": {
"name": "numpy"
},
"data": [
"/home/user/output/node_data/paper/year-0.npy",
"/home/user/output/node_data/paper/year-1.npy",
"/home/user/output/node_data/paper/year-2.npy",
"/home/user/output/node_data/paper/year-3.npy"
]
}
}
},
"edge_data": {}
}
```
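For illustration only, here is a minimal sketch (not part of the utility itself) of how downstream code could consume this metadata, assuming `numpy` is available and using the paths and keys from the example output above:
```python
import json

import numpy as np

# Load the metadata produced by chunk_graph (path taken from the example above).
with open('/home/user/output/metadata.json') as f:
    metadata = json.load(f)

# Read the first chunk of the 'feat' tensor for 'paper' nodes.
feat_meta = metadata['node_data']['paper']['feat']
assert feat_meta['format']['name'] == 'numpy'
feat_chunk0 = np.load(feat_meta['data'][0])

# Each chunk's length matches the corresponding entry in num_nodes_per_chunk.
paper_idx = metadata['node_type'].index('paper')
assert len(feat_chunk0) == metadata['num_nodes_per_chunk'][paper_idx][0]
```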
# See the __main__ block for usage of chunk_graph().
import pathlib
import json
from contextlib import contextmanager
import logging
import os
import torch
import dgl
from utils import setdir
from utils import array_readwriter
def chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt):
paths = []
offset = 0
for j, n in enumerate(chunk_sizes):
path = os.path.abspath(path_fmt % j)
arr_chunk = arr[offset:offset + n]
logging.info('Chunking %d-%d' % (offset, offset + n))
array_readwriter.get_array_parser(**fmt_meta).write(path, arr_chunk)
offset += n
paths.append(path)
return paths
def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
# First deal with ndata and edata that are homogeneous (i.e. not a dict-of-dict)
if len(g.ntypes) == 1 and not isinstance(next(iter(ndata_paths.values())), dict):
ndata_paths = {g.ntypes[0]: ndata_paths}
if len(g.etypes) == 1 and not isinstance(next(iter(edata_paths.values())), dict):
edata_paths = {g.etypes[0]: edata_paths}
# Then convert all edge types to canonical edge types
etypestrs = {etype: ':'.join(etype) for etype in g.canonical_etypes}
edata_paths = {':'.join(g.to_canonical_etype(k)): v for k, v in edata_paths.items()}
metadata = {}
metadata['graph_name'] = name
metadata['node_type'] = g.ntypes
# Compute the number of nodes per chunk per node type
metadata['num_nodes_per_chunk'] = num_nodes_per_chunk = []
for ntype in g.ntypes:
num_nodes = g.num_nodes(ntype)
num_nodes_list = []
for i in range(num_chunks):
n = num_nodes // num_chunks + (i < num_nodes % num_chunks)
num_nodes_list.append(n)
num_nodes_per_chunk.append(num_nodes_list)
num_nodes_per_chunk_dict = {k: v for k, v in zip(g.ntypes, num_nodes_per_chunk)}
metadata['edge_type'] = [etypestrs[etype] for etype in g.canonical_etypes]
# Compute the number of edges per chunk per edge type
metadata['num_edges_per_chunk'] = num_edges_per_chunk = []
for etype in g.canonical_etypes:
num_edges = g.num_edges(etype)
num_edges_list = []
for i in range(num_chunks):
n = num_edges // num_chunks + (i < num_edges % num_chunks)
num_edges_list.append(n)
num_edges_per_chunk.append(num_edges_list)
num_edges_per_chunk_dict = {k: v for k, v in zip(g.canonical_etypes, num_edges_per_chunk)}
# Split edge index
metadata['edges'] = {}
with setdir('edge_index'):
for etype in g.canonical_etypes:
etypestr = etypestrs[etype]
logging.info('Chunking edge index for %s' % etypestr)
edges_meta = {}
fmt_meta = {"name": "csv", "delimiter": " "}
edges_meta['format'] = fmt_meta
srcdst = torch.stack(g.edges(etype=etype), 1)
edges_meta['data'] = chunk_numpy_array(
srcdst.numpy(), fmt_meta, num_edges_per_chunk_dict[etype],
etypestr + '%d.txt')
metadata['edges'][etypestr] = edges_meta
# Chunk node data
metadata['node_data'] = {}
with setdir('node_data'):
for ntype, ndata_per_type in ndata_paths.items():
ndata_meta = {}
with setdir(ntype):
for key, path in ndata_per_type.items():
logging.info('Chunking node data for type %s key %s' % (ntype, key))
ndata_key_meta = {}
reader_fmt_meta = writer_fmt_meta = {"name": "numpy"}
arr = array_readwriter.get_array_parser(**reader_fmt_meta).read(path)
ndata_key_meta['format'] = writer_fmt_meta
ndata_key_meta['data'] = chunk_numpy_array(
arr, writer_fmt_meta, num_nodes_per_chunk_dict[ntype],
key + '-%d.npy')
ndata_meta[key] = ndata_key_meta
metadata['node_data'][ntype] = ndata_meta
# Chunk edge data
metadata['edge_data'] = {}
with setdir('edge_data'):
for etypestr, edata_per_type in edata_paths.items():
edata_meta = {}
with setdir(etypestr):
for key, path in edata_per_type.items():
logging.info('Chunking edge data for type %s key %s' % (etypestr, key))
edata_key_meta = {}
reader_fmt_meta = writer_fmt_meta = {"name": "numpy"}
arr = array_readwriter.get_array_parser(**reader_fmt_meta).read(path)
edata_key_meta['format'] = writer_fmt_meta
# etypestr is the ':'-joined canonical edge type; recover the tuple key.
edata_key_meta['data'] = chunk_numpy_array(
arr, writer_fmt_meta,
num_edges_per_chunk_dict[tuple(etypestr.split(':'))],
key + '-%d.npy')
edata_meta[key] = edata_key_meta
metadata['edge_data'][etypestr] = edata_meta
metadata_path = 'metadata.json'
with open(metadata_path, 'w') as f:
json.dump(metadata, f)
logging.info('Saved metadata in %s' % os.path.abspath(metadata_path))
def chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
"""
Split the graph into multiple chunks.
A directory will be created at :attr:`output_path` with the metadata and chunked
edge list as well as the node/edge data.
Parameters
----------
g : DGLGraph
The graph.
name : str
The name of the graph, to be used later in DistDGL training.
ndata_paths : dict[str, pathlike] or dict[ntype, dict[str, pathlike]]
The dictionary of paths pointing to the corresponding numpy array file for each
node data key.
edata_paths : dict[str, pathlike] or dict[etype, dict[str, pathlike]]
The dictionary of paths pointing to the corresponding numpy array file for each
edge data key.
num_chunks : int
The number of chunks
output_path : pathlike
The output directory saving the chunked graph.
"""
for ntype, ndata in ndata_paths.items():
for key in ndata.keys():
ndata[key] = os.path.abspath(ndata[key])
for etype, edata in edata_paths.items():
for key in edata.keys():
edata[key] = os.path.abspath(edata[key])
with setdir(output_path):
_chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path)
if __name__ == '__main__':
logging.basicConfig(level='INFO')
input_dir = '/data'
output_dir = '/chunked-data'
(g,), _ = dgl.load_graphs(os.path.join(input_dir, 'graph.dgl'))
chunk_graph(
g,
'mag240m',
{'paper': {
'feat': os.path.join(input_dir, 'paper/feat.npy'),
'label': os.path.join(input_dir, 'paper/label.npy'),
'year': os.path.join(input_dir, 'paper/year.npy')}},
{'cites': {'count': os.path.join(input_dir, 'cites/count.npy')},
'writes': {'year': os.path.join(input_dir, 'writes/year.npy')},
# you can put the same data file if they indeed share the features.
'rev_writes': {'year': os.path.join(input_dir, 'writes/year.npy')}},
4,
output_dir)
# The generated metadata goes as in tools/sample-config/mag240m-metadata.json.
import os
import json
import time
import argparse
import numpy as np
import dgl
import torch as th
import pyarrow
import pandas as pd
from pyarrow import csv
parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
help='The directory path that contains the partition results.')
parser.add_argument('--graph-name', required=True, type=str,
help='The graph name')
parser.add_argument('--schema', required=True, type=str,
help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
help='The number of partitions')
parser.add_argument('--num-node-weights', required=True, type=int,
help='The number of node weights used by METIS.')
parser.add_argument('--workspace', type=str, default='/tmp',
help='The directory to store the intermediate results')
parser.add_argument('--node-attr-dtype', type=str, default=None,
help='The data type of the node attributes')
parser.add_argument('--edge-attr-dtype', type=str, default=None,
help='The data type of the edge attributes')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
parser.add_argument('--removed-edges', help='a file that contains the removed self-loops and duplicated edges',
default=None, type=str)
args = parser.parse_args()
input_dir = args.input_dir
graph_name = args.graph_name
num_parts = args.num_parts
num_node_weights = args.num_node_weights
node_attr_dtype = args.node_attr_dtype
edge_attr_dtype = args.edge_attr_dtype
workspace_dir = args.workspace
output_dir = args.output
self_loop_edges = None
duplicate_edges = None
if args.removed_edges is not None:
removed_file = '{}/{}'.format(input_dir, args.removed_edges)
removed_df = csv.read_csv(removed_file, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
assert removed_df.num_columns == 4
src_id = removed_df['f0'].to_numpy()
dst_id = removed_df['f1'].to_numpy()
orig_id = removed_df['f2'].to_numpy()
etype = removed_df['f3'].to_numpy()
self_loop_idx = src_id == dst_id
not_self_loop_idx = src_id != dst_id
self_loop_edges = [src_id[self_loop_idx], dst_id[self_loop_idx],
orig_id[self_loop_idx], etype[self_loop_idx]]
duplicate_edges = [src_id[not_self_loop_idx], dst_id[not_self_loop_idx],
orig_id[not_self_loop_idx], etype[not_self_loop_idx]]
print('There are {} self-loops and {} duplicated edges in the removed edges'.format(len(self_loop_edges[0]),
len(duplicate_edges[0])))
with open(args.schema) as json_file:
schema = json.load(json_file)
nid_ranges = schema['nid']
eid_ranges = schema['eid']
nid_ranges = {key: np.array(nid_ranges[key]).reshape(
1, 2) for key in nid_ranges}
eid_ranges = {key: np.array(eid_ranges[key]).reshape(
1, 2) for key in eid_ranges}
id_map = dgl.distributed.id_map.IdMap(nid_ranges)
ntypes = [(key, nid_ranges[key][0, 0]) for key in nid_ranges]
ntypes.sort(key=lambda e: e[1])
ntype_offset_np = np.array([e[1] for e in ntypes])
ntypes = [e[0] for e in ntypes]
ntypes_map = {e: i for i, e in enumerate(ntypes)}
etypes = [(key, eid_ranges[key][0, 0]) for key in eid_ranges]
etypes.sort(key=lambda e: e[1])
etype_offset_np = np.array([e[1] for e in etypes])
etypes = [e[0] for e in etypes]
etypes_map = {e: i for i, e in enumerate(etypes)}
def read_feats(file_name):
attrs = csv.read_csv(file_name, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
num_cols = len(attrs.columns)
return np.stack([attrs.columns[i].to_numpy() for i in range(num_cols)], 1)
max_nid = np.iinfo(np.int32).max
num_edges = 0
num_nodes = 0
node_map_val = {ntype: [] for ntype in ntypes}
edge_map_val = {etype: [] for etype in etypes}
for part_id in range(num_parts):
part_dir = output_dir + '/part' + str(part_id)
os.makedirs(part_dir, exist_ok=True)
node_file = 'p{:03}-{}_nodes.txt'.format(part_id, graph_name)
# The format of each line in the node file:
# <node_id> <node_type> <weight1> ... <orig_type_node_id> <attributes>
# The node file contains nodes that belong to a partition. It doesn't include HALO nodes.
orig_type_nid_col = 3 + num_node_weights
first_attr_col = 4 + num_node_weights
# Get the node ID, node type and original per-type node ID columns.
tmp_output = workspace_dir + '/' + node_file + '.tmp'
os.system('awk \'{print $1, $2, $' + str(orig_type_nid_col) + '}\''
+ ' {} > {}'.format(input_dir + '/' + node_file, tmp_output))
nodes = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
nids, ntype_ids, orig_type_nid = nodes.columns[0].to_numpy(), nodes.columns[1].to_numpy(), \
nodes.columns[2].to_numpy()
orig_homo_nid = ntype_offset_np[ntype_ids] + orig_type_nid
assert np.all(nids[1:] - nids[:-1] == 1)
nid_range = (nids[0], nids[-1])
num_nodes += len(nodes)
if node_attr_dtype is not None:
# Get node attributes.
# Here we assume all nodes have the same attributes.
# In practice this may not hold, in which case a more complex solution is
# needed to encode and decode node attributes.
os.system('cut -d\' \' -f {}- {} > {}'.format(first_attr_col,
input_dir + '/' + node_file,
tmp_output))
node_attrs = read_feats(tmp_output)
node_feats = {}
# Nodes in a partition have been sorted by node type.
for ntype_name in nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = nids[ntype_ids == ntype_id]
assert np.all(type_nids == np.arange(
type_nids[0], type_nids[-1] + 1))
node_feats[ntype_name +
'/feat'] = th.as_tensor(node_attrs[ntype_ids == ntype_id])
dgl.data.utils.save_tensors(os.path.join(
part_dir, "node_feat.dgl"), node_feats)
# Determine the node ID ranges of different node types.
for ntype_name in nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = nids[ntype_ids == ntype_id]
node_map_val[ntype_name].append(
[int(type_nids[0]), int(type_nids[-1]) + 1])
edge_file = 'p{:03}-{}_edges.txt'.format(part_id, graph_name)
# The format of each line in the edge file:
# <src_id> <dst_id> <orig_src_id> <orig_dst_id> <orig_edge_id> <edge_type> <attributes>
tmp_output = workspace_dir + '/' + edge_file + '.tmp'
os.system('awk \'{print $1, $2, $3, $4, $5, $6}\'' + ' {} > {}'.format(input_dir + '/' + edge_file,
tmp_output))
edges = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
num_cols = len(edges.columns)
src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = [
edges.columns[i].to_numpy() for i in range(num_cols)]
# Let's merge the self-loops and duplicated edges to the partition.
src_id_list, dst_id_list = [src_id], [dst_id]
orig_src_id_list, orig_dst_id_list = [orig_src_id], [orig_dst_id]
orig_edge_id_list, etype_id_list = [orig_edge_id], [etype_ids]
if self_loop_edges is not None and len(self_loop_edges[0]) > 0:
uniq_orig_nids, idx = np.unique(orig_dst_id, return_index=True)
common_nids, common_idx1, common_idx2 = np.intersect1d(
uniq_orig_nids, self_loop_edges[0], return_indices=True)
idx = idx[common_idx1]
# the IDs after ID assignment
src_id_list.append(dst_id[idx])
dst_id_list.append(dst_id[idx])
# homogeneous IDs in the input graph.
orig_src_id_list.append(self_loop_edges[0][common_idx2])
orig_dst_id_list.append(self_loop_edges[0][common_idx2])
# edge IDs and edge type.
orig_edge_id_list.append(self_loop_edges[2][common_idx2])
etype_id_list.append(self_loop_edges[3][common_idx2])
print('Add {} self-loops in partition {}'.format(len(idx), part_id))
if duplicate_edges is not None and len(duplicate_edges[0]) > 0:
part_ids = orig_src_id.astype(
np.int64) * max_nid + orig_dst_id.astype(np.int64)
uniq_orig_ids, idx = np.unique(part_ids, return_index=True)
duplicate_ids = duplicate_edges[0].astype(
np.int64) * max_nid + duplicate_edges[1].astype(np.int64)
common_nids, common_idx1, common_idx2 = np.intersect1d(
uniq_orig_ids, duplicate_ids, return_indices=True)
idx = idx[common_idx1]
# the IDs after ID assignment
src_id_list.append(src_id[idx])
dst_id_list.append(dst_id[idx])
# homogeneous IDs in the input graph.
orig_src_id_list.append(duplicate_edges[0][common_idx2])
orig_dst_id_list.append(duplicate_edges[1][common_idx2])
# edge IDs and edge type.
orig_edge_id_list.append(duplicate_edges[2][common_idx2])
etype_id_list.append(duplicate_edges[3][common_idx2])
print('Add {} duplicated edges in partition {}'.format(len(idx), part_id))
src_id = np.concatenate(src_id_list) if len(
src_id_list) > 1 else src_id_list[0]
dst_id = np.concatenate(dst_id_list) if len(
dst_id_list) > 1 else dst_id_list[0]
orig_src_id = np.concatenate(orig_src_id_list) if len(
orig_src_id_list) > 1 else orig_src_id_list[0]
orig_dst_id = np.concatenate(orig_dst_id_list) if len(
orig_dst_id_list) > 1 else orig_dst_id_list[0]
orig_edge_id = np.concatenate(orig_edge_id_list) if len(
orig_edge_id_list) > 1 else orig_edge_id_list[0]
etype_ids = np.concatenate(etype_id_list) if len(
etype_id_list) > 1 else etype_id_list[0]
print('There are {} edges in partition {}'.format(len(src_id), part_id))
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
sort_idx = np.argsort(etype_ids)
src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = src_id[sort_idx], dst_id[sort_idx], \
orig_src_id[sort_idx], orig_dst_id[sort_idx], orig_edge_id[sort_idx], etype_ids[sort_idx]
assert np.all(np.diff(etype_ids) >= 0)
if edge_attr_dtype is not None:
# Get edge attributes.
# Here we assume all edges have the same attributes.
# In practice this may not hold, in which case a more complex solution is
# needed to encode and decode edge attributes.
os.system('cut -d\' \' -f 7- {} > {}'.format(input_dir +
'/' + edge_file, tmp_output))
edge_attrs = th.as_tensor(read_feats(tmp_output))[sort_idx]
edge_feats = {}
for etype_name in eid_ranges:
etype_id = etypes_map[etype_name]
edge_feats[etype_name +
'/feat'] = th.as_tensor(edge_attrs[etype_ids == etype_id])
dgl.data.utils.save_tensors(os.path.join(
part_dir, "edge_feat.dgl"), edge_feats)
# Determine the edge ID range of different edge types.
edge_id_start = num_edges
for etype_name in eid_ranges:
etype_id = etypes_map[etype_name]
edge_map_val[etype_name].append([int(edge_id_start),
int(edge_id_start + np.sum(etype_ids == etype_id))])
edge_id_start += np.sum(etype_ids == etype_id)
# Here we want to compute the unique node IDs in the edge list.
# It is possible that a node belongs to the partition but doesn't appear
# in the edge list. That is, the node is assigned to this partition, but its
# neighbor belongs to another partition, so the edge is assigned to that
# other partition. This happens in a directed graph.
# To avoid such nodes being removed from the graph, we also add the node IDs
# that belong to this partition.
ids = np.concatenate(
[src_id, dst_id, np.arange(nid_range[0], nid_range[1] + 1)])
uniq_ids, idx, inverse_idx = np.unique(
ids, return_index=True, return_inverse=True)
assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range.
local_src_id, local_dst_id = np.split(inverse_idx[:len(src_id) * 2], 2)
compact_g = dgl.graph((local_src_id, local_dst_id))
compact_g.edata['orig_id'] = th.as_tensor(orig_edge_id)
compact_g.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
compact_g.edata['inner_edge'] = th.ones(
compact_g.number_of_edges(), dtype=th.bool)
# The original IDs are homogeneous IDs.
# Similarly, we need to add the original homogeneous node IDs
orig_ids = np.concatenate([orig_src_id, orig_dst_id, orig_homo_nid])
orig_homo_ids = orig_ids[idx]
ntype, per_type_ids = id_map(orig_homo_ids)
compact_g.ndata['orig_id'] = th.as_tensor(per_type_ids)
compact_g.ndata[dgl.NTYPE] = th.as_tensor(ntype)
compact_g.ndata[dgl.NID] = th.as_tensor(uniq_ids)
compact_g.ndata['inner_node'] = th.as_tensor(np.logical_and(
uniq_ids >= nid_range[0], uniq_ids <= nid_range[1]))
local_nids = compact_g.ndata[dgl.NID][compact_g.ndata['inner_node'].bool()]
assert np.all((local_nids == th.arange(
local_nids[0], local_nids[-1] + 1)).numpy())
print('|V|={}'.format(compact_g.number_of_nodes()))
print('|E|={}'.format(compact_g.number_of_edges()))
# We need to reshuffle nodes in a partition so that all local nodes are labelled starting from 0.
reshuffle_nodes = th.arange(compact_g.number_of_nodes())
reshuffle_nodes = th.cat([reshuffle_nodes[compact_g.ndata['inner_node'].bool()],
reshuffle_nodes[compact_g.ndata['inner_node'] == 0]])
compact_g1 = dgl.node_subgraph(compact_g, reshuffle_nodes)
compact_g1.ndata['orig_id'] = compact_g.ndata['orig_id'][reshuffle_nodes]
compact_g1.ndata[dgl.NTYPE] = compact_g.ndata[dgl.NTYPE][reshuffle_nodes]
compact_g1.ndata[dgl.NID] = compact_g.ndata[dgl.NID][reshuffle_nodes]
compact_g1.ndata['inner_node'] = compact_g.ndata['inner_node'][reshuffle_nodes]
compact_g1.edata['orig_id'] = compact_g.edata['orig_id'][compact_g1.edata[dgl.EID]]
compact_g1.edata[dgl.ETYPE] = compact_g.edata[dgl.ETYPE][compact_g1.edata[dgl.EID]]
compact_g1.edata['inner_edge'] = compact_g.edata['inner_edge'][compact_g1.edata[dgl.EID]]
# reshuffle edges on ETYPE as node_subgraph relabels edges
idx = th.argsort(compact_g1.edata[dgl.ETYPE])
u, v = compact_g1.edges()
u = u[idx]
v = v[idx]
compact_g2 = dgl.graph((u, v))
compact_g2.ndata['orig_id'] = compact_g1.ndata['orig_id']
compact_g2.ndata[dgl.NTYPE] = compact_g1.ndata[dgl.NTYPE]
compact_g2.ndata[dgl.NID] = compact_g1.ndata[dgl.NID]
compact_g2.ndata['inner_node'] = compact_g1.ndata['inner_node']
compact_g2.edata['orig_id'] = compact_g1.edata['orig_id'][idx]
compact_g2.edata[dgl.ETYPE] = compact_g1.edata[dgl.ETYPE][idx]
compact_g2.edata['inner_edge'] = compact_g1.edata['inner_edge'][idx]
compact_g2.edata[dgl.EID] = th.arange(
num_edges, num_edges + compact_g2.number_of_edges())
num_edges += compact_g2.number_of_edges()
dgl.save_graphs(part_dir + '/graph.dgl', [compact_g2])
part_metadata = {'graph_name': graph_name,
'num_nodes': num_nodes,
'num_edges': num_edges,
'part_method': 'metis',
'num_parts': num_parts,
'halo_hops': 1,
'node_map': node_map_val,
'edge_map': edge_map_val,
'ntypes': ntypes_map,
'etypes': etypes_map}
for part_id in range(num_parts):
part_dir = 'part' + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
'edge_feats': edge_feat_file,
'part_graph': part_graph_file}
with open('{}/{}.json'.format(output_dir, graph_name), 'w') as outfile:
json.dump(part_metadata, outfile, sort_keys=True, indent=4)
"""Launching distributed graph partitioning pipeline """
import os
import sys
import argparse
import logging
import json
INSTALL_DIR = os.path.abspath(os.path.join(__file__, '..'))
LAUNCH_SCRIPT = "distgraphlaunch.py"
PIPELINE_SCRIPT = "distpartitioning/data_proc_pipeline.py"
UDF_WORLD_SIZE = "world-size"
UDF_PART_DIR = "partitions-dir"
UDF_INPUT_DIR = "input-dir"
UDF_GRAPH_NAME = "graph-name"
UDF_SCHEMA = "schema"
UDF_NUM_PARTS = "num-parts"
UDF_OUT_DIR = "output"
LARG_PROCS_MACHINE = "num_proc_per_machine"
LARG_IPCONF = "ip_config"
LARG_MASTER_PORT = "master_port"
def get_launch_cmd(args) -> str:
cmd = sys.executable + " " + os.path.join(INSTALL_DIR, LAUNCH_SCRIPT)
cmd = f"{cmd} --{LARG_PROCS_MACHINE} 1 "
cmd = f"{cmd} --{LARG_IPCONF} {args.ip_config} "
cmd = f"{cmd} --{LARG_MASTER_PORT} {args.master_port} "
return cmd
def submit_jobs(args) -> str:
wrapper_command = os.path.join(INSTALL_DIR, LAUNCH_SCRIPT)
#read the json file and get the remaining argument here.
schema_path = os.path.join(args.in_dir, "metadata.json")
with open(schema_path) as schema:
schema_map = json.load(schema)
num_parts = len(schema_map["num_nodes_per_chunk"][0])
graph_name = schema_map["graph_name"]
argslist = ""
argslist += "--world-size {} ".format(num_parts)
argslist += "--partitions-dir {} ".format(args.partitions_dir)
argslist += "--input-dir {} ".format(args.in_dir)
argslist += "--graph-name {} ".format(graph_name)
argslist += "--schema {} ".format(schema_path)
argslist += "--num-parts {} ".format(num_parts)
argslist += "--output {} ".format(args.out_dir)
# (BarclayII) Is it safe to assume all the workers have the Python executable at the same path?
pipeline_cmd = os.path.join(INSTALL_DIR, PIPELINE_SCRIPT)
udf_cmd = f"{args.python_path} {pipeline_cmd} {argslist}"
launch_cmd = get_launch_cmd(args)
launch_cmd += '\"'+udf_cmd+'\"'
print(launch_cmd)
os.system(launch_cmd)
def main():
parser = argparse.ArgumentParser(description='Dispatch edge index and data to partitions', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--in-dir', type=str, help='Location of the input directory where the dataset is located')
parser.add_argument('--partitions-dir', type=str, help='Location of the partition-id mapping files which define node-ids and their respective partition-ids, relative to the input directory')
parser.add_argument('--out-dir', type=str, help='Location of the output directory where the graph partitions will be created by this pipeline')
parser.add_argument('--ip-config', type=str, help='File location of IP configuration for server processes')
parser.add_argument('--master-port', type=int, default=12345, help='Port used by the gloo group to create a rendezvous point')
parser.add_argument('--python-path', type=str, default=sys.executable, help='Path to the Python executable on all workers')
args, udf_command = parser.parse_known_args()
assert os.path.isdir(args.in_dir)
assert os.path.isdir(args.partitions_dir)
assert os.path.isfile(args.ip_config)
assert isinstance(args.master_port, int)
tokens = sys.executable.split(os.sep)
submit_jobs(args)
if __name__ == '__main__':
fmt = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)
main()
"""Launching tool for DGL distributed training"""
import os
import stat
import sys
import subprocess
import argparse
import signal
import logging
import time
import json
import multiprocessing
import re
from functools import partial
from threading import Thread
from typing import Optional
DEFAULT_PORT = 30050
def cleanup_proc(get_all_remote_pids, conn):
'''This process tries to clean up the remote training tasks.
'''
print('cleanup process runs')
# This process should not handle SIGINT.
signal.signal(signal.SIGINT, signal.SIG_IGN)
data = conn.recv()
# If the launch process exits normally, this process doesn't need to do anything.
if data == 'exit':
sys.exit(0)
else:
remote_pids = get_all_remote_pids()
# Otherwise, we need to ssh to each machine and kill the training jobs.
for (ip, port), pids in remote_pids.items():
kill_process(ip, port, pids)
print('cleanup process exits')
def kill_process(ip, port, pids):
'''ssh to a remote machine and kill the specified processes.
'''
curr_pid = os.getpid()
killed_pids = []
# If we kill child processes first, the parent process may create more again. This happens
# to Python's process pool. After sorting, we always kill parent processes first.
pids.sort()
for pid in pids:
assert curr_pid != pid
print('kill process {} on {}:{}'.format(pid, ip, port), flush=True)
kill_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'kill {}\''.format(pid)
subprocess.run(kill_cmd, shell=True)
killed_pids.append(pid)
# It's possible that some of the processes are not killed. Let's try again.
for i in range(3):
killed_pids = get_killed_pids(ip, port, killed_pids)
if len(killed_pids) == 0:
break
else:
killed_pids.sort()
for pid in killed_pids:
print('kill process {} on {}:{}'.format(pid, ip, port), flush=True)
kill_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'kill -9 {}\''.format(pid)
subprocess.run(kill_cmd, shell=True)
def get_killed_pids(ip, port, killed_pids):
'''Get the process IDs that we want to kill but are still alive.
'''
killed_pids = [str(pid) for pid in killed_pids]
killed_pids = ','.join(killed_pids)
ps_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'ps -p {} -h\''.format(killed_pids)
res = subprocess.run(ps_cmd, shell=True, stdout=subprocess.PIPE)
pids = []
for p in res.stdout.decode('utf-8').split('\n'):
l = p.split()
if len(l) > 0:
pids.append(int(l[0]))
return pids
def execute_remote(
cmd: str,
ip: str,
port: int,
username: Optional[str] = ""
) -> Thread:
"""Execute command line on remote machine via ssh.
Args:
cmd: User-defined command (udf) to execute on the remote host.
ip: The ip-address of the host to run the command on.
port: Port number that the host is listening on.
username: Optional. If given, this will specify a username to use when issuing commands over SSH.
Useful when your infra requires you to explicitly specify a username to avoid permission issues.
Returns:
thread: The Thread whose run() is to run the `cmd` on the remote host. Returns when the cmd completes
on the remote host.
"""
ip_prefix = ""
if username:
ip_prefix += "{username}@".format(username=username)
# Construct ssh command that executes `cmd` on the remote host
ssh_cmd = "ssh -o StrictHostKeyChecking=no -p {port} {ip_prefix}{ip} '{cmd}'".format(
port=str(port),
ip_prefix=ip_prefix,
ip=ip,
cmd=cmd,
)
# thread func to run the job
def run(ssh_cmd):
subprocess.check_call(ssh_cmd, shell=True)
thread = Thread(target=run, args=(ssh_cmd,))
thread.daemon = True
thread.start()
return thread
def get_remote_pids(ip, port, cmd_regex):
"""Get the process IDs that run the command in the remote machine.
"""
pids = []
curr_pid = os.getpid()
# Here we want to get the python processes. We may get some ssh processes, so we should filter them out.
ps_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'ps -aux | grep python | grep -v StrictHostKeyChecking\''
res = subprocess.run(ps_cmd, shell=True, stdout=subprocess.PIPE)
for p in res.stdout.decode('utf-8').split('\n'):
l = p.split()
if len(l) < 2:
continue
# We only get the processes that run the specified command.
res = re.search(cmd_regex, p)
if res is not None and int(l[1]) != curr_pid:
pids.append(l[1])
pid_str = ','.join([str(pid) for pid in pids])
ps_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'pgrep -P {}\''.format(pid_str)
res = subprocess.run(ps_cmd, shell=True, stdout=subprocess.PIPE)
pids1 = res.stdout.decode('utf-8').split('\n')
all_pids = []
for pid in set(pids + pids1):
if pid == '' or int(pid) == curr_pid:
continue
all_pids.append(int(pid))
all_pids.sort()
return all_pids
def get_all_remote_pids(hosts, ssh_port, udf_command):
'''Get all remote processes.
'''
remote_pids = {}
for node_id, host in enumerate(hosts):
ip, _ = host
# When creating training processes in remote machines, we may insert some arguments
# in the commands. We need to use regular expressions to match the modified command.
cmds = udf_command.split()
new_udf_command = ' .*'.join(cmds)
pids = get_remote_pids(ip, ssh_port, new_udf_command)
remote_pids[(ip, ssh_port)] = pids
return remote_pids
def construct_torch_dist_launcher_cmd(
num_trainers: int,
num_nodes: int,
node_rank: int,
master_addr: str,
master_port: int
) -> str:
"""Constructs the torch distributed launcher command.
Helper function.
Args:
num_trainers:
num_nodes:
node_rank:
master_addr:
master_port:
Returns:
cmd_str.
"""
torch_cmd_template = "-m torch.distributed.launch " \
"--nproc_per_node={nproc_per_node} " \
"--nnodes={nnodes} " \
"--node_rank={node_rank} " \
"--master_addr={master_addr} " \
"--master_port={master_port}"
return torch_cmd_template.format(
nproc_per_node=num_trainers,
nnodes=num_nodes,
node_rank=node_rank,
master_addr=master_addr,
master_port=master_port
)
def wrap_udf_in_torch_dist_launcher(
udf_command: str,
num_trainers: int,
num_nodes: int,
node_rank: int,
master_addr: str,
master_port: int,
) -> str:
"""Wraps the user-defined function (udf_command) with the torch.distributed.launch module.
Example: if udf_command is "python3 run/some/trainer.py arg1 arg2", then new_udf_command becomes:
"python3 -m torch.distributed.launch <TORCH DIST ARGS> run/some/trainer.py arg1 arg2"
udf_command is assumed to consist of pre-commands (optional) followed by the python launcher script (required):
Examples:
# simple
python3.7 path/to/some/trainer.py arg1 arg2
# multi-commands
(cd some/dir && python3.7 path/to/some/trainer.py arg1 arg2)
IMPORTANT: If udf_command consists of multiple python commands, then this will result in undefined behavior.
Args:
udf_command:
num_trainers:
num_nodes:
node_rank:
master_addr:
master_port:
Returns:
"""
torch_dist_cmd = construct_torch_dist_launcher_cmd(
num_trainers=num_trainers,
num_nodes=num_nodes,
node_rank=node_rank,
master_addr=master_addr,
master_port=master_port
)
# Auto-detect the python binary that kicks off the distributed trainer code.
# Note: This allowlist order matters, this will match with the FIRST matching entry. Thus, please add names to this
# from most-specific to least-specific order eg:
# (python3.7, python3.8) -> (python3)
# The allowed python versions are from this: https://www.dgl.ai/pages/start.html
python_bin_allowlist = (
"python3.6", "python3.7", "python3.8", "python3.9", "python3",
# for backwards compatibility, accept python2 but technically DGL is a py3 library, so this is not recommended
"python2.7", "python2",
)
# If none of the candidate python bins match, then we go with the default `python`
python_bin = "python"
for candidate_python_bin in python_bin_allowlist:
if candidate_python_bin in udf_command:
python_bin = candidate_python_bin
break
# transforms the udf_command from:
# python path/to/dist_trainer.py arg0 arg1
# to:
# python -m torch.distributed.launch [DIST TORCH ARGS] path/to/dist_trainer.py arg0 arg1
# Note: if there are multiple python commands in `udf_command`, this may do the Wrong Thing, eg launch each
# python command within the torch distributed launcher.
new_udf_command = udf_command.replace(python_bin, f"{python_bin} {torch_dist_cmd}")
return new_udf_command
def construct_dgl_server_env_vars(
ip_config: str,
num_proc_per_machine: int,
pythonpath: Optional[str] = "",
) -> str:
"""Constructs the DGL server-specific env vars string that are required for DGL code to behave in the correct
server role.
Convenience function.
Args:
ip_config: IP config file containing IP addresses of cluster hosts.
Relative path to workspace.
num_proc_per_machine:
pythonpath: Optional. If given, this will pass this as PYTHONPATH.
Returns:
server_env_vars: The server-specific env-vars in a string format, friendly for CLI execution.
"""
server_env_vars_template = (
"DGL_IP_CONFIG={DGL_IP_CONFIG} "
"DGL_NUM_SERVER={DGL_NUM_SERVER} "
"{suffix_optional_envvars}"
)
suffix_optional_envvars = ""
if pythonpath:
suffix_optional_envvars += f"PYTHONPATH={pythonpath} "
return server_env_vars_template.format(
DGL_IP_CONFIG=ip_config,
DGL_NUM_SERVER=num_proc_per_machine,
suffix_optional_envvars=suffix_optional_envvars,
)
def wrap_cmd_with_local_envvars(cmd: str, env_vars: str) -> str:
"""Wraps a CLI command with desired env vars with the following properties:
(1) env vars persist for the entire `cmd`, even if it consists of multiple "chained" commands like:
cmd = "ls && pwd && python run/something.py"
(2) env vars don't pollute the environment after `cmd` completes.
Example:
>>> cmd = "ls && pwd"
>>> env_vars = "VAR1=value1 VAR2=value2"
>>> wrap_cmd_with_local_envvars(cmd, env_vars)
"(export VAR1=value1 VAR2=value2; ls && pwd)"
Args:
cmd:
env_vars: A string containing env vars, eg "VAR1=val1 VAR2=val2"
Returns:
cmd_with_env_vars:
"""
# use `export` to persist env vars for entire cmd block. required if udf_command is a chain of commands
# also: wrap in parens to not pollute env:
# https://stackoverflow.com/a/45993803
return f"(export {env_vars}; {cmd})"
def wrap_cmd_with_extra_envvars(cmd: str, env_vars: list) -> str:
"""Wraps a CLI command with extra env vars
Example:
>>> cmd = "ls && pwd"
>>> env_vars = ["VAR1=value1", "VAR2=value2"]
>>> wrap_cmd_with_extra_envvars(cmd, env_vars)
"(export VAR1=value1 VAR2=value2; ls && pwd)"
Args:
cmd:
env_vars: A list of strings containing env vars, e.g., ["VAR1=value1", "VAR2=value2"]
Returns:
cmd_with_env_vars:
"""
env_vars = " ".join(env_vars)
return wrap_cmd_with_local_envvars(cmd, env_vars)
def submit_jobs(args, udf_command):
"""Submit distributed jobs (server and client processes) via ssh"""
hosts = []
thread_list = []
server_count_per_machine = 0
# Get the IP addresses of the cluster.
#ip_config = os.path.join(args.workspace, args.ip_config)
ip_config = args.ip_config
with open(ip_config) as f:
for line in f:
result = line.strip().split()
if len(result) == 2:
ip = result[0]
port = int(result[1])
hosts.append((ip, port))
elif len(result) == 1:
ip = result[0]
port = DEFAULT_PORT
hosts.append((ip, port))
else:
raise RuntimeError("Format error of ip_config.")
server_count_per_machine = args.num_proc_per_machine
# launch server tasks
server_env_vars = construct_dgl_server_env_vars(
ip_config=args.ip_config,
num_proc_per_machine=args.num_proc_per_machine,
pythonpath=os.environ.get("PYTHONPATH", ""),
)
for i in range(len(hosts) * server_count_per_machine):
ip, _ = hosts[int(i / server_count_per_machine)]
server_env_vars_cur = f"{server_env_vars} RANK={i} MASTER_ADDR={hosts[0][0]} MASTER_PORT={args.master_port}"
cmd = wrap_cmd_with_local_envvars(udf_command, server_env_vars_cur)
print(cmd)
thread_list.append(execute_remote(cmd, ip, args.ssh_port, username=args.ssh_username))
# Start a cleanup process dedicated for cleaning up remote training jobs.
conn1,conn2 = multiprocessing.Pipe()
func = partial(get_all_remote_pids, hosts, args.ssh_port, udf_command)
process = multiprocessing.Process(target=cleanup_proc, args=(func, conn1))
process.start()
def signal_handler(signal, frame):
logging.info('Stop launcher')
# We need to tell the cleanup process to kill remote training jobs.
conn2.send('cleanup')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
for thread in thread_list:
thread.join()
# The training processes complete. We should tell the cleanup process to exit.
conn2.send('exit')
process.join()
def main():
parser = argparse.ArgumentParser(description='Launch a distributed job')
parser.add_argument('--ssh_port', type=int, default=22, help='SSH Port.')
parser.add_argument(
"--ssh_username", default="",
help="Optional. When issuing commands (via ssh) to cluster, use the provided username in the ssh cmd. "
"Example: If you provide --ssh_username=bob, then the ssh command will be like: 'ssh bob@1.2.3.4 CMD' "
"instead of 'ssh 1.2.3.4 CMD'"
)
parser.add_argument('--num_proc_per_machine', type=int,
help='The number of server processes per machine')
parser.add_argument('--master_port', type=int,
help='This port is used to form the gloo group (rendezvous server)')
parser.add_argument('--ip_config', type=str,
help='The file (in workspace) of IP configuration for server processes')
args, udf_command = parser.parse_known_args()
assert len(udf_command) == 1, 'Please provide user command line.'
assert args.num_proc_per_machine is not None and args.num_proc_per_machine > 0, \
'--num_proc_per_machine must be a positive number.'
assert args.ip_config is not None, \
'A user has to specify an IP configuration file with --ip_config.'
udf_command = str(udf_command[0])
if 'python' not in udf_command:
raise RuntimeError("DGL launching script can only support Python executable file.")
submit_jobs(args, udf_command)
if __name__ == '__main__':
fmt = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)
main()
### xxx_nodes.txt format
This file is used to provide node information to this framework. Following is the format for each line in this file:
```
<node_type> <weight1> <weight2> <weight3> <weight4> <global_type_node_id> <attributes>
```
where `node_type` is the type id of the node, the weight columns can be any number of columns determined by the user, `global_type_node_id` is a contiguous id starting from `0` within a particular node_type, and the attributes can be any number of columns at the end of each line.
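For example, a hypothetical nodes file for a graph with two node types, four node weights (matching `--num-node-weights 4` in the command shown later), and a single attribute column could start like this (all values are purely illustrative):
```
0 1 0 0 0 0 0.84
0 1 0 0 0 1 0.13
1 0 1 0 0 0 0.75
```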
### xxx_edges.txt format
This file is used to provide edge information to this framework. Following is the format for each line in this file:
```
<global_src_id> <global_dst_id> <global_type_edge_id> <edge_type> <attributes>
```
where `global_src_id` and `global_dst_id` are the two end points of an edge, `global_type_edge_id` is a unique id that is contiguous and starts from `0` within each edge_type, and the attributes can be any number of columns at the end of each line.
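A hypothetical edges file following this format, with one attribute column, could look like this (values are purely illustrative):
```
0 3 0 0 0.5
1 3 1 0 0.1
3 2 0 1 0.9
```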
### Naming convention
The `global_` prefix (for any node or edge ids) indicates that these ids are read from the graph input files. These ids are assigned to nodes and edges before `data shuffling` and are globally unique across all partitions.
The `shuffle_global_` prefix indicates that these ids are assigned after `data shuffling` is completed. These ids are globally unique across all partitions.
The `part_local_` prefix indicates that these ids are assigned after `data shuffling` and are unique only within a given partition.
For instance, a variable named `global_src_id` holds an id that is read from the graph input file and is globally unique across all partitions. Similarly, a variable named `part_local_node_id` holds a node id that is assigned after data shuffling is complete and is unique within a given partition.
### High-level description of the algorithm
#### Single file format for graph input files
Here we assume that all node-related data is present in a single file and, similarly, all edges are in a single file.
In this case, the following steps are executed to write DGL objects for each partition, as assigned by any partitioning algorithm, for example METIS.
##### Step 1 (Data Loading):
The rank-0 process reads in all the graph files, which are xxx_nodes.txt, xxx_edges.txt, node_feats.dgl, edge_feats.dgl and xxx_removed_edges.txt.
The rank-0 process determines node ownership using the output of the partitioning algorithm (here, we expect the output of the partitioning step to be a mapping between every node and its partition id for the entire graph). Edge ownership is determined by the `destination` node of each edge: an edge belongs to the partition that owns its destination node, as illustrated below.
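A minimal illustration of this ownership rule (the array names here are hypothetical, not the pipeline's internal names): each edge is assigned to the partition that owns its destination node.
```python
import numpy as np

# Partition id per global node id, as produced by the partitioning step.
node_part_id = np.array([0, 0, 1, 1])

# Edge end points expressed as global node ids.
global_src_id = np.array([0, 2, 3, 1])
global_dst_id = np.array([1, 0, 2, 3])

# Every edge goes to the partition of its destination node.
edge_part_id = node_part_id[global_dst_id]   # -> array([0, 0, 1, 1])
```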
##### Step 2 (Data Shuffling):
The rank-0 process sends node data, edge data, node features and edge features to their respective processes using the ownership rules described in Step 1. Non-rank-0 processes receive their own nodes, edges, node features and edge features and store them in local data structures. After sending this information, the rank-0 process deletes the nodes, edges, node features and edge features that are not owned by rank 0.
##### Step 3 (ID assignment and resolution):
At this point all ranks have their own local information in their respective data structures. Each process then performs the following steps: a) assign shuffle_global_xxx ids (here xxx is node_ids or edge_ids) to nodes and edges by performing a prefix sum across all ranks (see the sketch below); b) assign part_local_xxx ids to nodes and edges so that they can be used to index into the node and edge features; and c) retrieve shuffle_global_node_ids by using global_node_ids to determine the ownership of any given node. Step (c) is needed for the node_ids present locally on a given rank whose shuffle_global_node_ids were assigned on a different rank.
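A minimal sketch of the prefix-sum assignment in step (a), assuming a `torch.distributed` gloo process group has already been initialized; the helper name is illustrative and not part of the pipeline:
```python
import torch
import torch.distributed as dist

def assign_shuffle_global_ids(num_local_items: int) -> torch.Tensor:
    """Assign globally unique, contiguous ids to the items owned by this rank.

    Each rank contributes its local item count; an exclusive prefix sum over
    the gathered counts gives the starting id for this rank.
    """
    world_size = dist.get_world_size()
    rank = dist.get_rank()
    counts = [torch.zeros(1, dtype=torch.int64) for _ in range(world_size)]
    dist.all_gather(counts, torch.tensor([num_local_items], dtype=torch.int64))
    start = int(sum(c.item() for c in counts[:rank]))
    return torch.arange(start, start + num_local_items, dtype=torch.int64)
```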
##### Step 4 (Serialization):
After every rank has global ids, shuffle_global ids and part_local ids for all the nodes and edges present locally, it proceeds with DGL object creation. Finally, the rank-0 process aggregates graph-level metadata and creates a JSON file with graph-level information.
### How to use this tool
To run this code on a single machine using multiple processes, use the following command:
```
python3 data_proc_pipeline.py --world-size 2 --nodes-file mag_nodes.txt --edges-file mag_edges.txt --node-feats-file node_feat.dgl --metis-partitions mag_part.2 --input-dir /home/ubuntu/data --graph-name mag --schema mag.json --num-parts 2 --num-node-weights 4 --workspace /home/ubuntu/data --node-attr-dtype float --output /home/ubuntu/data/outputs --removed-edges mag_removed_edges.txt
```
The above command assumes that there are `2` partitions and `4` node weights. All other command line arguments are self-explanatory.
GLOBAL_NID = "global_node_id"
GLOBAL_EID = "global_edge_id"
SHUFFLE_GLOBAL_NID = "shuffle_global_node_id"
SHUFFLE_GLOBAL_EID = "shuffle_global_edge_id"
NTYPE_ID = "node_type_id"
ETYPE_ID = "edge_type_id"
GLOBAL_TYPE_NID = "global_type_node_id"
GLOBAL_TYPE_EID = "global_type_edge_id"
GLOBAL_SRC_ID = "global_src_id"
GLOBAL_DST_ID = "global_dst_id"
SHUFFLE_GLOBAL_SRC_ID = "shuffle_global_src_id"
SHUFFLE_GLOBAL_DST_ID = "shuffle_global_dst_id"
OWNER_PROCESS = "owner_proc_id"
PART_LOCAL_NID = "part_local_nid"
GLOO_MESSAGING_TIMEOUT = 60 #seconds
STR_NODE_TYPE = "node_type"
STR_NUM_NODES_PER_CHUNK = "num_nodes_per_chunk"
STR_EDGE_TYPE = "edge_type"
STR_NUM_EDGES_PER_CHUNK = "num_edges_per_chunk"
STR_EDGES = "edges"
STR_FORMAT = "format"
STR_DATA = "data"
STR_NODE_DATA = "node_data"
STR_EDGE_DATA = "edge_data"
STR_NUMPY = "numpy"
STR_CSV = "csv"
STR_NAME = "name"
import os
import json
import time
import argparse
import numpy as np
import dgl
import torch as th
import pyarrow
import pandas as pd
import constants
from pyarrow import csv
from utils import read_json, get_idranges
def create_dgl_object(graph_name, num_parts, \
schema, part_id, node_data, \
edge_data, nodeid_offset, edgeid_offset):
"""
This function creates the DGL objects for a given graph partition, as specified
by the function arguments.
The "schema" argument is a dictionary, which contains the metadata related to node ids
and edge ids. It contains two keys: "nid" and "eid", whose value is also a dictionary
with the following structure.
1. The key-value pairs in the "nid" dictionary have the following format.
"ntype-name" is the user-assigned name of this node type, "format" describes the
format of the contents of the files, and "data" is a list of lists; each list has
3 elements: file-name, start_id and end_id. The file-name can be an absolute or
relative path to the file, and the starting and ending ids are the type ids of the
nodes contained in that file. These type ids are later used to compute the global
ids of these nodes, which are used throughout the processing of this pipeline.
"ntype-name" : {
"format" : "csv",
"data" : [
[ <path-to-file>/ntype0-name-0.csv, start_id0, end_id0],
[ <path-to-file>/ntype0-name-1.csv, start_id1, end_id1],
...
[ <path-to-file>/ntype0-name-<p-1>.csv, start_id<p-1>, end_id<p-1>],
]
}
2. The key-value pairs in the "eid" dictionary have the following format.
As described for the "nid" dictionary, the "eid" dictionary is similarly structured
except that these entries are for edges.
"etype-name" : {
"format" : "csv",
"data" : [
[ <path-to-file>/etype0-name-0, start_id0, end_id0],
[ <path-to-file>/etype0-name-1, start_id1, end_id1],
...
[ <path-to-file>/etype0-name-<p-1>, start_id<p-1>, end_id<p-1>]
]
}
In "nid" dictionary, the type_nids are specified that
should be assigned to nodes which are read from the corresponding nodes file.
Along the same lines dictionary for the key "eid" is used for edges in the
input graph.
These type ids, for nodes and edges, are used to compute global ids for nodes
and edges which are stored in the graph object.
Parameters:
-----------
graph_name : string
name of the graph
num_parts : int
total no. of partitions (of the original graph)
schema : json object
json object created by reading the graph metadata json file
part_id : int
partition id of the graph partition for which dgl object is to be created
node_data : numpy ndarray
node_data, where each row is of the following format:
<global_nid> <ntype_id> <global_type_nid>
edge_data : numpy ndarray
edge_data, where each row is of the following format:
<global_src_id> <global_dst_id> <etype_id> <global_type_eid>
nodeid_offset : int
offset to be used when assigning node global ids in the current partition
edgeid_offset : int
offset to be used when assigning edge global ids in the current partition
Returns:
--------
dgl object
dgl object created for the current graph partition
dictionary
map between node types and the range of global node ids used
dictionary
map between edge types and the range of global edge ids used
dictionary
map between node type(string) and node_type_id(int)
dictionary
map between edge type(string) and edge_type_id(int)
"""
#create auxiliary data structures from the schema object
ntid_dict, global_nid_ranges = get_idranges(schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK])
etid_dict, global_eid_ranges = get_idranges(schema[constants.STR_EDGE_TYPE],
schema[constants.STR_NUM_EDGES_PER_CHUNK])
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
ntypes = [(key, global_nid_ranges[key][0, 0]) for key in global_nid_ranges]
ntypes.sort(key=lambda e: e[1])
ntype_offset_np = np.array([e[1] for e in ntypes])
ntypes = [e[0] for e in ntypes]
ntypes_map = {e: i for i, e in enumerate(ntypes)}
etypes = [(key, global_eid_ranges[key][0, 0]) for key in global_eid_ranges]
etypes.sort(key=lambda e: e[1])
etype_offset_np = np.array([e[1] for e in etypes])
etypes = [e[0] for e in etypes]
etypes_map = {e.split(":")[1]: i for i, e in enumerate(etypes)}
node_map_val = {ntype: [] for ntype in ntypes}
edge_map_val = {etype.split(":")[1]: [] for etype in etypes}
shuffle_global_nids, ntype_ids, global_type_nid = node_data[constants.SHUFFLE_GLOBAL_NID], \
node_data[constants.NTYPE_ID], node_data[constants.GLOBAL_TYPE_NID]
global_homo_nid = ntype_offset_np[ntype_ids] + global_type_nid
assert np.all(shuffle_global_nids[1:] - shuffle_global_nids[:-1] == 1)
shuffle_global_nid_range = (shuffle_global_nids[0], shuffle_global_nids[-1])
# Determine the node ID ranges of different node types.
for ntype_name in global_nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = shuffle_global_nids[ntype_ids == ntype_id]
node_map_val[ntype_name].append(
[int(type_nids[0]), int(type_nids[-1]) + 1])
#process edges
shuffle_global_src_id, shuffle_global_dst_id, global_src_id, global_dst_id, global_edge_id, etype_ids = \
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID], edge_data[constants.SHUFFLE_GLOBAL_DST_ID], \
edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID], \
edge_data[constants.GLOBAL_TYPE_EID], edge_data[constants.ETYPE_ID]
print('There are {} edges in partition {}'.format(len(shuffle_global_src_id), part_id))
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
sort_idx = np.argsort(etype_ids)
shuffle_global_src_id, shuffle_global_dst_id, global_src_id, global_dst_id, global_edge_id, etype_ids = \
shuffle_global_src_id[sort_idx], shuffle_global_dst_id[sort_idx], global_src_id[sort_idx], \
global_dst_id[sort_idx], global_edge_id[sort_idx], etype_ids[sort_idx]
assert np.all(np.diff(etype_ids) >= 0)
# Determine the edge ID range of different edge types.
edge_id_start = edgeid_offset
for etype_name in global_eid_ranges:
tokens = etype_name.split(":")
assert len(tokens) == 3
etype_id = etypes_map[tokens[1]]
edge_map_val[tokens[1]].append([edge_id_start,
edge_id_start + np.sum(etype_ids == etype_id)])
edge_id_start += np.sum(etype_ids == etype_id)
# get the edge list in some order and then reshuffle.
# Here the order of nodes is defined by the `np.unique` function
# node order is as listed in the uniq_ids array
ids = np.concatenate(
[shuffle_global_src_id, shuffle_global_dst_id,
np.arange(shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1)])
uniq_ids, idx, inverse_idx = np.unique(
ids, return_index=True, return_inverse=True)
assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range.
part_local_src_id, part_local_dst_id = np.split(inverse_idx[:len(shuffle_global_src_id) * 2], 2)
inner_nodes = th.as_tensor(np.logical_and(
uniq_ids >= shuffle_global_nid_range[0],
uniq_ids <= shuffle_global_nid_range[1]))
#get the list of indices, from inner_nodes, which will sort inner_nodes as [True, True, ...., False, False, ...]
#essentially local nodes will be placed before non-local nodes.
reshuffle_nodes = th.arange(len(uniq_ids))
reshuffle_nodes = th.cat([reshuffle_nodes[inner_nodes.bool()],
reshuffle_nodes[inner_nodes == 0]])
'''
The following procedure is used to map part_local_src_id and part_local_dst_id to account for the
reshuffling of nodes (locally owned nodes are ordered before non-local nodes in a partition):
1. Form a node_map, in this case a numpy array, which will be used to map old node-ids (pre-reshuffling)
to post-reshuffling ids.
2. Once the map is created, use this map to map all the node-ids in the part_local_src_id
and part_local_dst_id list to their appropriate `new` node-ids (post-reshuffle order).
3. Since only the node order is changed, we have to re-order the node-related information when
creating the DGL object: this includes orig_id, dgl.NTYPE, dgl.NID and inner_node.
4. The edge order is not changed. At this point in the execution path, edges are still ordered by their etype-ids.
5. Create the dgl object appropriately and return the dgl object.
Here is a simple example to understand the above flow better.
part_local_nids = [0, 1, 2, 3, 4, 5]
part_local_src_ids = [0, 0, 0, 0, 2, 3, 4]
part_local_dst_ids = [1, 2, 3, 4, 4, 4, 5]
Assume that nodes {1, 5} are halo-nodes, which are not owned by this partition.
reshuffle_nodes = [0, 2, 3, 4, 1, 5]
A node_map, which maps node-ids from old to reshuffled order is as follows:
node_map = np.zeros((len(reshuffle_nodes,)))
node_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))
Using the above map, we have mapped part_local_src_ids and part_local_dst_ids as follows:
part_local_src_ids = [0, 0, 0, 0, 1, 2, 3]
part_local_dst_ids = [4, 1, 2, 3, 3, 3, 5]
In the graph above, note that nodes {0, 1, 2, 3} are inner nodes and {4, 5} are non-inner nodes.
Since the edges are not re-ordered in any way, no reordering is required for edge-related data
during DGL object creation.
'''
#create the mappings to generate mapped part_local_src_id and part_local_dst_id
#This map will map from unshuffled node-ids to reshuffled-node-ids (which are ordered to prioritize
#locally owned nodes).
nid_map = np.zeros((len(reshuffle_nodes),), dtype=np.int64)
nid_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))
#Now map the edge end points to reshuffled_values.
part_local_src_id, part_local_dst_id = nid_map[part_local_src_id], nid_map[part_local_dst_id]
#create the graph here now.
part_graph = dgl.graph(data=(part_local_src_id, part_local_dst_id), num_nodes=len(uniq_ids))
part_graph.edata[dgl.EID] = th.arange(
edgeid_offset, edgeid_offset + part_graph.number_of_edges(), dtype=th.int64)
part_graph.edata['orig_id'] = th.as_tensor(global_edge_id)
part_graph.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
part_graph.edata['inner_edge'] = th.ones(part_graph.number_of_edges(), dtype=th.bool)
#compute per_type_ids and ntype for all the nodes in the graph.
global_ids = np.concatenate(
[global_src_id, global_dst_id, global_homo_nid])
part_global_ids = global_ids[idx]
part_global_ids = part_global_ids[reshuffle_nodes]
ntype, per_type_ids = id_map(part_global_ids)
#continue with the graph creation
part_graph.ndata['orig_id'] = th.as_tensor(per_type_ids)
part_graph.ndata[dgl.NTYPE] = th.as_tensor(ntype)
part_graph.ndata[dgl.NID] = th.as_tensor(uniq_ids[reshuffle_nodes])
part_graph.ndata['inner_node'] = inner_nodes[reshuffle_nodes]
return part_graph, node_map_val, edge_map_val, ntypes_map, etypes_map
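# A minimal numpy-only sketch (not part of the original pipeline) of the reshuffling
# trick used above; the numbers mirror the worked example in the comment block, and
# all names below are local to this illustrative helper.
def _example_reshuffle_edge_endpoints():
    import numpy as np
    src = np.array([0, 0, 0, 0, 2, 3, 4])
    dst = np.array([1, 2, 3, 4, 4, 4, 5])
    # inner nodes {0, 2, 3, 4} come first, halo nodes {1, 5} last
    reshuffle_nodes = np.array([0, 2, 3, 4, 1, 5])
    node_map = np.zeros(len(reshuffle_nodes), dtype=np.int64)
    node_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))
    # remapped endpoints: [0 0 0 0 1 2 3] and [4 1 2 3 3 3 5]
    return node_map[src], node_map[dst]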
def create_metadata_json(graph_name, num_nodes, num_edges, part_id, num_parts, node_map_val, \
edge_map_val, ntypes_map, etypes_map, output_dir ):
"""
Auxiliary function to create json file for the graph partition metadata
Parameters:
-----------
graph_name : string
name of the graph
num_nodes : int
no. of nodes in the graph partition
num_edges : int
no. of edges in the graph partition
part_id : int
integer indicating the partition id
num_parts : int
total no. of partitions of the original graph
node_map_val : dictionary
map between node types and the range of global node ids used
edge_map_val : dictionary
map between edge types and the range of global edge ids used
ntypes_map : dictionary
map between node type(string) and node_type_id(int)
etypes_map : dictionary
map between edge type(string) and edge_type_id(int)
output_dir : string
directory where the output files are to be stored
Returns:
--------
dictionary
map describing the graph information
"""
part_metadata = {'graph_name': graph_name,
'num_nodes': num_nodes,
'num_edges': num_edges,
'part_method': 'metis',
'num_parts': num_parts,
'halo_hops': 1,
'node_map': node_map_val,
'edge_map': edge_map_val,
'ntypes': ntypes_map,
'etypes': etypes_map}
part_dir = 'part' + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
'edge_feats': edge_feat_file,
'part_graph': part_graph_file}
return part_metadata
import argparse
import numpy as np
import torch.multiprocessing as mp
import logging
import platform
import os
from data_shuffle import single_machine_run, multi_machine_run
def log_params(params):
""" Print all the command line arguments for debugging purposes.
Parameters:
-----------
params: argparse object
Argument Parser structure listing all the pre-defined parameters
"""
print('Input Dir: ', params.input_dir)
print('Graph Name: ', params.graph_name)
print('Schema File: ', params.schema)
print('No. partitions: ', params.num_parts)
print('Output Dir: ', params.output)
print('WorldSize: ', params.world_size)
print('Metis partitions: ', params.partitions_dir)
if __name__ == "__main__":
"""
Start of execution from this point.
Invoke the appropriate function to begin execution
"""
#arguments which are already needed by the existing implementation of convert_partition.py
parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
help='The directory path that contains the partition results.')
parser.add_argument('--graph-name', required=True, type=str,
help='The graph name')
parser.add_argument('--schema', required=True, type=str,
help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
help='The number of partitions')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
parser.add_argument('--partitions-dir', help='directory of the partition-ids for each node type',
default=None, type=str)
#arguments needed for the distributed implementation
parser.add_argument('--world-size', help='no. of processes to spawn',
default=1, type=int, required=True)
params = parser.parse_args()
#invoke the pipeline function
logging.basicConfig(level='INFO', format=f"[{platform.node()} %(levelname)s %(asctime)s PID:%(process)d] %(message)s")
multi_machine_run(params)
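# A hypothetical invocation of this driver script (the script name, paths and counts are
# placeholders, not values taken from the repository), shown only to illustrate the
# arguments parsed above:
#   python <this_script>.py --input-dir /path/to/chunked_graph \
#       --graph-name mag240m --schema /path/to/metadata.json \
#       --num-parts 4 --output /path/to/partitioned_output \
#       --partitions-dir /path/to/partition_ids --world-size 4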
import os
import numpy as np
import constants
import torch
import logging
import pyarrow
from pyarrow import csv
from utils import get_idranges
def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
"""
Function to read the multiple file formatted dataset.
Parameters:
-----------
input_dir : string
root directory where dataset is located.
graph_name : string
graph name string
rank : int
rank of the current process
world_size : int
total number of processes in the current execution
schema_map : dictionary
this is the dictionary created by reading the graph metadata json file
for the input graph dataset
Return:
-------
dictionary
where keys are node-type names and values are tuples. Each tuple represents the
range of type ids read from a file by the current process. Please note that node
data for each node type is split into "p" files and each of these "p" files is
read by one process in the distributed graph partitioning pipeline
dictionary
Data read from numpy files for all the node features in this dataset. Dictionary built
using this data has keys as node feature names and values as tensor data representing
node features
dictionary
in which keys are node-type names and values are triplets. Each triplet holds a node-feature name
and the range of tids for the node feature data read from files by the current process. Each
node-type may have multiple features and associated tensor data.
dictionary
Data read from edges.txt file and used to build a dictionary with keys as column names
and values as columns in the csv file.
dictionary
in which keys are edge-type names and values are triplets. Each triplet holds an edge-feature name
and the range of tids for the edge feature data read from the files by the current process. Each
edge-type may have several edge features and associated tensor data.
"""
#node features dictionary
#TODO: With the new file format, it is guaranteed that the no. of nodes in any node-feature
#file and the no. of nodes in the corresponding nodes metadata file will always be the same.
#With this guarantee, we can eliminate the `node_feature_tids` dictionary, since the same
#information is also populated in the `node_tids` dictionary. This will be removed in the
#next iteration of code changes.
node_features = {}
node_feature_tids = {}
'''
The structure of the node_data is as follows, which is present in the input metadata json file.
"node_data" : {
"ntype0-name" : {
"feat0-name" : {
"format" : {"name": "numpy"},
"data" : [ #list
"<path>/feat-0.npy",
"<path>/feat-1.npy",
....
"<path>/feat-<p-1>.npy"
]
},
"feat1-name" : {
"format" : {"name": "numpy"},
"data" : [ #list
"<path>/feat-0.npy",
"<path>/feat-1.npy",
....
"<path>/feat-<p-1>.npy"
]
}
}
}
As shown above, the value for the key "node_data" is a dictionary object, which is
used to describe the feature data for each of the node-type names. Keys in this top-level
dictionary are node-type names and values are dictionaries which capture all the features
for the current node-type. Feature data is captured with keys being the feature-names and
values being dictionary objects with 2 keys, namely "format" and "data". The "format" entry
describes the storage format of the node features themselves and "data" lists all the files
present for this given node feature.
Data read from each node feature file is a multi-dimensional tensor and is read
in numpy format, which is also the storage format of node features on permanent storage.
'''
#iterate over the "node_data" dictionary in the schema_map
#read the node features, if they exist
#also keep track of the type_nids for which the node_features are read.
dataset_features = schema_map[constants.STR_NODE_DATA]
if((dataset_features is not None) and (len(dataset_features) > 0)):
for ntype_name, ntype_feature_data in dataset_features.items():
#ntype_feature_data is a dictionary
#where key: feature_name, value: dictionary in which keys are "format", "data"
node_feature_tids[ntype_name] = []
for feat_name, feat_data in ntype_feature_data.items():
assert len(feat_data[constants.STR_DATA]) == world_size
assert feat_data[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_NUMPY
my_feat_data_fname = feat_data[constants.STR_DATA][rank] #this may be an absolute path or a path relative to input_dir
if (os.path.isabs(my_feat_data_fname)):
logging.info(f'Loading numpy from {my_feat_data_fname}')
node_features[ntype_name+'/'+feat_name] = \
torch.from_numpy(np.load(my_feat_data_fname))
else:
numpy_path = os.path.join(input_dir, my_feat_data_fname)
logging.info(f'Loading numpy from {numpy_path}')
node_features[ntype_name+'/'+feat_name] = \
torch.from_numpy(np.load(numpy_path))
node_feature_tids[ntype_name].append([feat_name, -1, -1])
'''
"node_type" : ["ntype0-name", "ntype1-name", ....], #m node types
"num_nodes_per_chunk" : [
[a0, a1, ...a<p-1>], #p partitions
[b0, b1, ... b<p-1>],
....
[c0, c1, ..., c<p-1>] #m lists, one per node type
],
The "node_type" key points to a list of all the node type names present in the graph,
and "num_nodes_per_chunk" gives the no. of nodes present in each of the input node
chunks. These node counters are used to compute the type_node_ids as well as the global
node-ids by a simple cumulative summation, maintaining an offset counter that stores the
end of the current type's id range.
Since nodes are NOT associated with any additional metadata, w.r.t. the processing
involved in this pipeline this information does not need to be stored in files. This optimization
saves a considerable amount of time when loading massively large datasets for partitioning.
Instead of reading node files and shuffling them, each process/rank generates the nodes
which are owned by that particular rank, and using the "num_nodes_per_chunk" information each
process can easily compute any node's per-type node_id and global node_id.
The node-ids are treated as int64's in order to support billions of nodes in the input graph.
'''
#read my nodes for each node type
node_tids, ntype_gnid_offset = get_idranges(schema_map[constants.STR_NODE_TYPE],
schema_map[constants.STR_NUM_NODES_PER_CHUNK])
for ntype_name in schema_map[constants.STR_NODE_TYPE]:
if ntype_name in node_feature_tids:
for item in node_feature_tids[ntype_name]:
item[1] = node_tids[ntype_name][rank][0]
item[2] = node_tids[ntype_name][rank][1]
#done building node_features locally.
if len(node_features) <= 0:
logging.info(f'[Rank: {rank}] This dataset does not have any node features')
else:
for k, v in node_features.items():
logging.info(f'[Rank: {rank}] node feature name: {k}, feature data shape: {v.size()}')
'''
Code below is used to read edges from the input dataset with the help of the metadata json file
for the input graph dataset.
In the metadata json file, we expect the following key-value pairs to help read the edges of the
input graph.
"edge_type" : [ # a total of n edge types
canonical_etype_0,
canonical_etype_1,
...,
canonical_etype_n-1
]
The value for this key is a list of strings, where each string is associated with an edge type in the input graph.
Note that these strings are in the canonical edge type format, i.e. they follow the
naming convention src_ntype:etype:dst_ntype. src_ntype and dst_ntype are the node type names of the
src and dst end points of this edge type, and etype is the relation name between the src and dst ntypes.
The files in which edges are present and their storage format are present in the following key-value pair:
"edges" : {
"canonical_etype_0" : {
"format" : { "name" : "csv", "delimiter" : " " },
"data" : [
filename_0,
filename_1,
filename_2,
....
filename_<p-1>
]
},
}
As shown above the "edges" dictionary value has canonical edgetypes as keys and for each canonical edgetype
we have "format" and "data" which describe the storage format of the edge files and actual filenames respectively.
Please note that each edgetype data is split in to `p` files, where p is the no. of partitions to be made of
the input graph.
Each edge file contains two columns representing the source per-type node_ids and destination per-type node_ids
of any given edge. Since these are node-ids as well they are read in as int64's.
'''
#read my edges for each edge type
etype_names = schema_map[constants.STR_EDGE_TYPE]
etype_name_idmap = {e : idx for idx, e in enumerate(etype_names)}
edge_tids, _ = get_idranges(schema_map[constants.STR_EDGE_TYPE],
schema_map[constants.STR_NUM_EDGES_PER_CHUNK])
edge_datadict = {}
edge_data = schema_map[constants.STR_EDGES]
#read the edges files and store this data in memory.
for col in [constants.GLOBAL_SRC_ID, constants.GLOBAL_DST_ID, \
constants.GLOBAL_TYPE_EID, constants.ETYPE_ID]:
edge_datadict[col] = []
for etype_name, etype_info in edge_data.items():
assert etype_info[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_CSV
edge_info = etype_info[constants.STR_DATA]
assert len(edge_info) == world_size
#edgetype strings are in canonical format, src_node_type:edge_type:dst_node_type
tokens = etype_name.split(":")
assert len(tokens) == 3
src_ntype_name = tokens[0]
rel_name = tokens[1]
dst_ntype_name = tokens[2]
logging.info(f'Reading csv files from {edge_info[rank]}')
data_df = csv.read_csv(edge_info[rank], read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
#currently these are just type_edge_ids... which will be converted to global ids
edge_datadict[constants.GLOBAL_SRC_ID].append(data_df['f0'].to_numpy() + ntype_gnid_offset[src_ntype_name][0, 0])
edge_datadict[constants.GLOBAL_DST_ID].append(data_df['f1'].to_numpy() + ntype_gnid_offset[dst_ntype_name][0, 0])
edge_datadict[constants.GLOBAL_TYPE_EID].append(np.arange(edge_tids[etype_name][rank][0],\
edge_tids[etype_name][rank][1] ,dtype=np.int64))
edge_datadict[constants.ETYPE_ID].append(etype_name_idmap[etype_name] * \
np.ones(shape=(data_df['f0'].to_numpy().shape), dtype=np.int64))
#stitch together to create the final data on the local machine
for col in [constants.GLOBAL_SRC_ID, constants.GLOBAL_DST_ID, constants.GLOBAL_TYPE_EID, constants.ETYPE_ID]:
edge_datadict[col] = np.concatenate(edge_datadict[col])
assert edge_datadict[constants.GLOBAL_SRC_ID].shape == edge_datadict[constants.GLOBAL_DST_ID].shape
assert edge_datadict[constants.GLOBAL_DST_ID].shape == edge_datadict[constants.GLOBAL_TYPE_EID].shape
assert edge_datadict[constants.GLOBAL_TYPE_EID].shape == edge_datadict[constants.ETYPE_ID].shape
logging.info(f'[Rank: {rank}] Done reading edge_file: {len(edge_datadict)}, {edge_datadict[constants.GLOBAL_SRC_ID].shape}')
return node_tids, node_features, node_feature_tids, edge_datadict, edge_tids
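# A small hypothetical sketch (with made-up offsets) of the type-id -> global-id
# arithmetic used when reading the edge files above: per-type source/destination
# node ids are shifted by the first global node id of their node type.
def _example_type_to_global_nid():
    import numpy as np
    # e.g. authors occupy global ids [0, 8) and papers occupy [8, 14)
    ntype_gnid_offset = {'author': np.array([[0, 8]]), 'paper': np.array([[8, 14]])}
    per_type_src = np.array([0, 2, 5])  # 'paper' type node ids from one csv chunk
    global_src = per_type_src + ntype_gnid_offset['paper'][0, 0]
    return global_src  # array([ 8, 10, 13])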
import numpy as np
import torch
import operator
import itertools
import constants
from gloo_wrapper import allgather_sizes, alltoallv_cpu
def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
"""
For nodes which are not owned by the current rank, whose global_nid <-> shuffle_global_nid mapping
is not present at the current rank, this function retrieves their shuffle_global_nids from the owner rank.
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
total no. of ranks configured
global_nids_ranks : list
list of numpy arrays (of global_nids), index of the list is the rank of the process
where global_nid <-> shuffle_global_nid mapping is located.
node_data : dictionary
node_data is a dictionary with keys as column names and values as numpy arrays
Returns:
--------
numpy ndarray
where the column-0 are global_nids and column-1 are shuffle_global_nids which are retrieved
from other processes.
"""
#convert the per-rank lists of global_nids to tensors and exchange them with their owner ranks
global_nids_ranks = [torch.from_numpy(x) for x in global_nids_ranks]
recv_nodes = alltoallv_cpu(rank, world_size, global_nids_ranks)
# Use node_data to lookup global id to send over.
send_nodes = []
for proc_i_nodes in recv_nodes:
#list of node-ids to lookup
if proc_i_nodes is not None:
global_nids = proc_i_nodes.numpy()
if(len(global_nids) != 0):
common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
send_nodes.append(torch.from_numpy(shuffle_global_nids).type(dtype=torch.int64))
else:
send_nodes.append(torch.empty((0), dtype=torch.int64))
else:
send_nodes.append(torch.empty((0), dtype=torch.int64))
#send receive global-ids
recv_shuffle_global_nids = alltoallv_cpu(rank, world_size, send_nodes)
shuffle_global_nids = np.concatenate([x.numpy() if x is not None else [] for x in recv_shuffle_global_nids])
global_nids = np.concatenate([x for x in global_nids_ranks])
ret_val = np.column_stack([global_nids, shuffle_global_nids])
return ret_val
def get_shuffle_global_nids_edges(rank, world_size, edge_data, node_part_ids, node_data):
"""
Edges which are owned by this rank may have global_nids whose shuffle_global_nids are NOT present locally.
This function retrieves the shuffle_global_nids for such global_nids.
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
total no. of processes used
edge_data : numpy ndarray
edge_data (augmented) as read from the xxx_edges.txt file
node_part_ids : numpy array
list of partition ids indexed by global node ids.
node_data : dictionary
node_data, is a dictionary with keys as column_names and values as numpy arrays
"""
#determine unique node-ids present locally
global_nids = np.sort(np.unique(np.concatenate((edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID], node_data[constants.GLOBAL_NID]))))
#determine the rank which owns orig-node-id <-> partition/rank mappings
part_ids = node_part_ids[global_nids]
#form a list of lists; each list contains the global_nids whose mappings (shuffle_global_nids) need to be retrieved,
#and the list index is the rank of the process which owns the mappings of these global_nids
global_nids_ranks = []
for i in range(world_size):
if (i == rank):
global_nids_ranks.append(np.empty(shape=(0), dtype=np.int64))
continue
#not_owned_nodes = part_ids[:,0][part_ids[:,1] == i]
not_owned_node_ids = np.where(part_ids == i)[0]
if not_owned_node_ids.shape[0] == 0:
not_owned_nodes = np.empty(shape=(0), dtype=np.int64)
else:
not_owned_nodes = global_nids[not_owned_node_ids]
global_nids_ranks.append(not_owned_nodes)
#Retrieve Global-ids for respective node owners
non_local_nids = get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data)
#Add global_nid <-> shuffle_global_nid mappings to the received data
for i in range(world_size):
if (i == rank):
own_node_ids = np.where(part_ids == i)[0]
own_global_nids = global_nids[own_node_ids]
common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], own_global_nids, return_indices=True)
my_shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
local_mappings = np.column_stack((own_global_nids, my_shuffle_global_nids))
resolved_global_nids = np.concatenate((non_local_nids, local_mappings))
#form a dictionary of mappings between orig-node-ids and global-ids
resolved_mappings = dict(zip(resolved_global_nids[:,0], resolved_global_nids[:,1]))
#determine global-ids for the orig-src-id and orig-dst-id
shuffle_global_src_id = [resolved_mappings[x] for x in edge_data[constants.GLOBAL_SRC_ID]]
shuffle_global_dst_id = [resolved_mappings[x] for x in edge_data[constants.GLOBAL_DST_ID]]
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID] = np.array(shuffle_global_src_id, dtype=np.int64)
edge_data[constants.SHUFFLE_GLOBAL_DST_ID] = np.array(shuffle_global_dst_id, dtype=np.int64)
def assign_shuffle_global_nids_nodes(rank, world_size, node_data):
"""
Utility function to assign shuffle global ids to nodes at a given rank
node_data gets converted from [ntype, global_type_nid, global_nid]
to [shuffle_global_nid, ntype, global_type_nid, global_nid, part_local_type_nid]
where shuffle_global_nid : global id of the node after data shuffle
ntype : node-type as read from xxx_nodes.txt
global_type_nid : node-type-id as read from xxx_nodes.txt
global_nid : node-id as read from xxx_nodes.txt, implicitly
this is the line no. in the file
part_local_type_nid : type_nid assigned by the current rank within its scope
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
total number of processes used in the process group
node_data : dictionary
node_data is a dictionary with keys as column names and values as numpy arrays
"""
# Compute prefix sum to determine node-id offsets
prefix_sum_nodes = allgather_sizes([node_data[constants.GLOBAL_NID].shape[0]], world_size)
# assigning node-ids from shuffle_global_nid_start to (shuffle_global_nid_end - 1)
# Assuming here that node_data is sorted based on the node type.
shuffle_global_nid_start = prefix_sum_nodes[rank]
shuffle_global_nid_end = prefix_sum_nodes[rank + 1]
# add a column with global-ids (after data shuffle)
shuffle_global_nids = np.arange(shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64)
node_data[constants.SHUFFLE_GLOBAL_NID] = shuffle_global_nids
def assign_shuffle_global_nids_edges(rank, world_size, edge_data):
"""
Utility function to assign shuffle_global_eids to edges
edge_data gets converted from [global_src_nid, global_dst_nid, global_type_eid, etype]
to [shuffle_global_src_nid, shuffle_global_dst_nid, global_src_nid, global_dst_nid, global_type_eid, etype]
Parameters:
-----------
rank : integer
rank of the current process
world_size : integer
total count of processes in execution
edge_data : numpy ndarray
edge data as read from xxx_edges.txt file
Returns:
--------
integer
shuffle_global_eid_start, which indicates the starting value from which shuffle_global-ids are assigned to edges
on this rank
"""
#get prefix sum of edge counts per rank to locate the starting point
#from which global-ids to edges are assigned in the current rank
prefix_sum_edges = allgather_sizes([edge_data[constants.GLOBAL_SRC_ID].shape[0]], world_size)
shuffle_global_eid_start = prefix_sum_edges[rank]
shuffle_global_eid_end = prefix_sum_edges[rank + 1]
# assigning edge-ids from shuffle_global_eid_start to (shuffle_global_eid_end - 1)
# Assuming here that the edge_data is sorted by edge_type
shuffle_global_eids = np.arange(shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64)
edge_data[constants.SHUFFLE_GLOBAL_EID] = shuffle_global_eids
return shuffle_global_eid_start
import numpy as np
import torch
import torch.distributed as dist
def allgather_sizes(send_data, world_size):
"""
Perform an all-gather on the local counts, which are used to compute the prefix sums
that determine the ID offsets on each rank. This is used to allocate
global ids for edges/nodes on each rank.
Parameters
----------
send_data : list or numpy array
Per-rank counts on which the allgather is performed.
world_size : integer
No. of processes configured for execution
Returns:
--------
numpy array
array with the prefix sums; entry i is the starting offset for rank i and the last entry is the total count
"""
#compute the length of the local data
send_length = len(send_data)
out_tensor = torch.as_tensor(send_data, dtype=torch.int64)
in_tensor = [torch.zeros(send_length, dtype=torch.int64)
for _ in range(world_size)]
#all_gather message
dist.all_gather(in_tensor, out_tensor)
#gather sizes into one array to return to the invoking function
rank_sizes = np.zeros(world_size + 1, dtype=np.int64)
count = rank_sizes[0]
for i, t in enumerate(in_tensor):
count += t.item()
rank_sizes[i+1] = count
return rank_sizes
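# A purely local illustration (no torch.distributed needed, hypothetical counts) of the
# prefix-sum layout returned above: with 3 ranks owning 4, 2 and 5 items, allgather_sizes
# would return [0, 4, 6, 11], so rank r is assigned the contiguous range [result[r], result[r + 1]).
def _example_prefix_sum_layout():
    import numpy as np
    per_rank_counts = [4, 2, 5]
    prefix = np.zeros(len(per_rank_counts) + 1, dtype=np.int64)
    prefix[1:] = np.cumsum(per_rank_counts)
    return prefix  # array([ 0,  4,  6, 11])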
def __alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list):
"""
Each process scatters its list of input tensors to all processes in the cluster
and returns the gathered list of tensors in the output list. The tensors should have the same shape.
Parameters
----------
rank : int
The rank of current worker
world_size : int
The total number of processes in the group
output_tensor_list : List of tensor
The received tensors
input_tensor_list : List of tensor
The tensors to exchange
"""
input_tensor_list = [tensor.to(torch.device('cpu')) for tensor in input_tensor_list]
for i in range(world_size):
dist.scatter(output_tensor_list[i], input_tensor_list if i == rank else [], src=i)
def alltoallv_cpu(rank, world_size, input_tensor_list):
"""
Wrapper function providing alltoallv functionality on top of the underlying alltoall
messaging primitive. In its current implementation it supports exchanging tensors of
arbitrary shape (as long as all dimensions except the first match) and is not tied to
any particular caller.
This function pads all input tensors, except the largest one, so that all the messages are of the same
size. Once the messages are padded, it first sends a vector whose first two elements are
1) the useful message size along the first dimension, and 2) the padded message size along the first
dimension that is actually used for communication; the remaining elements are the other dimensions,
which are assumed to be the same across all the input tensors. After receiving the message sizes, the
receiving end creates buffers of appropriate sizes, slices the received messages to remove any added
padding, and returns them to the caller.
Parameters:
-----------
rank : int
The rank of current worker
world_size : int
The total number of processes in the group
input_tensor_list : List of tensor
The tensors to exchange
Returns:
--------
list :
list of tensors received from other processes during alltoall message
"""
#ensure len of input_tensor_list is same as the world_size.
assert input_tensor_list is not None
assert len(input_tensor_list) == world_size
#ensure that all the tensors in the input_tensor_list are of same size.
sizes = [list(x.size()) for x in input_tensor_list]
for idx in range(1,len(sizes)):
assert len(sizes[idx-1]) == len(sizes[idx]) #no. of dimensions should be same
assert input_tensor_list[idx-1].dtype == input_tensor_list[idx].dtype # dtype should be same
assert sizes[idx-1][1:] == sizes[idx][1:] #except first dimension remaining dimensions should all be the same
#decide how much to pad.
#always use the first-dimension for padding.
ll = [ x[0] for x in sizes ]
#dims of the padding needed, if any
#these dims are used for padding purposes.
diff_dims = [ [np.amax(ll) - l[0]] + l[1:] for l in sizes ]
#pad the actual message
input_tensor_list = [torch.cat((x, torch.zeros(diff_dims[idx]).type(x.dtype))) for idx, x in enumerate(input_tensor_list)]
#send useful message sizes to all
send_counts = []
recv_counts = []
for idx in range(world_size):
#send a vector of at least 3 elements, [a, b, ....] where
#a = useful message size along the first dimension, b = padded (outgoing) message size along the first dimension
#and remaining elements are the remaining dimensions of the tensor
send_counts.append(torch.from_numpy(np.array([sizes[idx][0]] + [np.amax(ll)] + sizes[idx][1:] )).type(torch.int64))
recv_counts.append(torch.zeros((1 + len(sizes[idx])), dtype=torch.int64))
__alltoall_cpu(rank, world_size, recv_counts, send_counts)
#allocate buffers for receiving message
output_tensor_list = []
recv_counts = [ tsize.numpy() for tsize in recv_counts]
for idx, tsize in enumerate(recv_counts):
output_tensor_list.append(torch.zeros(tuple(tsize[1:])).type(input_tensor_list[idx].dtype))
#send actual message itself.
__alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list)
#extract un-padded message from the output_tensor_list and return it
return_vals = []
for s, t in zip(recv_counts, output_tensor_list):
if s[0] == 0:
return_vals.append(None)
else:
return_vals.append(t[0:s[0]])
return return_vals
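# A minimal local sketch (hypothetical shapes, no communication) of the padding step
# performed in alltoallv_cpu above: tensors that differ only in their first dimension
# are zero-padded to the largest first dimension so every message has the same shape.
def _example_pad_to_max_first_dim():
    import torch
    tensors = [torch.ones(2, 3), torch.ones(5, 3)]
    max_rows = max(t.size(0) for t in tensors)
    padded = [torch.cat((t, torch.zeros(max_rows - t.size(0), 3).type(t.dtype)))
              for t in tensors]
    return [tuple(t.size()) for t in padded]  # [(5, 3), (5, 3)]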
def gather_metadata_json(metadata, rank, world_size):
"""
Gather the per-rank metadata json objects onto rank-0.
Parameters:
-----------
metadata : json dictionary object
json schema formed on each rank with graph level data.
This will be used as input to the distributed training in the later steps.
Returns:
--------
list : list of json dictionary objects
The result of the gather operation, which is the list of json dictionary
objects from each rank in the world
"""
#Populate input obj and output obj list on rank-0 and non-rank-0 machines
input_obj = None if rank == 0 else metadata
output_objs = [None for _ in range(world_size)] if rank == 0 else None
#invoke the gloo method to perform gather on rank-0
dist.gather_object(input_obj, output_objs, dst=0)
return output_objs
import os
import torch
import numpy as np
import json
import dgl
import constants
import pyarrow
from pyarrow import csv
def read_ntype_partition_files(schema_map, input_dir):
"""
Utility method to read the partition id mapping for each node.
For each node type, there will be a file in the input directory
containing the partition id mapping for each node id of that type.
Parameters:
-----------
schema_map : dictionary
dictionary created by reading the input metadata json file
input_dir : string
directory in which the node-id to partition-id mappings files are
located for each of the node types in the input graph
Returns:
--------
numpy array :
array of integers representing the mapped partition-ids for each node-id.
The line number in these files is used as the type_node_id within each of
the files. The index into this array is the homogenized node-id and the
value is the partition-id for that node-id (index). Please note that
the partition-ids of each node-type are stacked together vertically and
in this way heterogeneous node-ids are converted to homogeneous node-ids.
"""
assert os.path.isdir(input_dir)
#iterate over the node types and extract the partition id mappings
part_ids = []
ntype_names = schema_map[constants.STR_NODE_TYPE]
for ntype in ntype_names:
df = csv.read_csv(os.path.join(input_dir, '{}.txt'.format(ntype)), \
read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True), \
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
ntype_partids = df['f0'].to_numpy()
part_ids.append(ntype_partids)
return np.concatenate(part_ids)
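# A tiny hypothetical illustration of the stacking performed above: per-type partition
# ids are concatenated in node-type order, so the index into the result is the
# homogeneous (global) node id.
def _example_stacked_partition_ids():
    import numpy as np
    author_part_ids = np.array([0, 1])     # author nodes 0..1
    paper_part_ids = np.array([1, 0, 1])   # paper nodes 0..2
    part_ids = np.concatenate([author_part_ids, paper_part_ids])
    # part_ids == [0, 1, 1, 0, 1]; e.g. global node id 3 (paper node 1) is on partition 0
    return part_ids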
def read_json(json_file):
"""
Utility method to read a json file schema
Parameters:
-----------
json_file : string
file name for the json schema
Returns:
--------
dictionary, as serialized in the json_file
"""
with open(json_file) as schema:
val = json.load(schema)
return val
def get_ntype_featnames(ntype_name, schema_map):
"""
Retrieves node feature names for a given node_type
Parameters:
-----------
ntype_name : string
a string specifying a node_type name
schema_map : dictionary
metadata json object as a dictionary, which is read from the input
metadata file from the input dataset
Returns:
--------
list :
a list of feature names for a given node_type
"""
ntype_featdict = schema_map[constants.STR_NODE_DATA]
if (ntype_name in ntype_featdict):
featnames = []
ntype_info = ntype_featdict[ntype_name]
for k, v in ntype_info.items():
featnames.append(k)
return featnames
else:
return []
def get_node_types(schema_map):
"""
Utility method to extract node_typename -> node_type mappings
as defined by the input schema
Parameters:
-----------
schema_map : dictionary
Input schema from which the node_typename -> node_type
dictionary is created.
Returns:
--------
dictionary
with keys as node type names and values as ids (integers)
list
list of ntype name strings
dictionary
with keys as ntype ids (integers) and values as node type names
"""
ntypes = schema_map[constants.STR_NODE_TYPE]
ntype_ntypeid_map = {e : i for i, e in enumerate(ntypes)}
ntypeid_ntype_map = {i : e for i, e in enumerate(ntypes)}
return ntype_ntypeid_map, ntypes, ntypeid_ntype_map
def get_gnid_range_map(node_tids):
"""
Retrieves auxiliary dictionaries from the metadata json object
Parameters:
-----------
node_tids: dictionary
This dictionary contains the per-chunk node information for each node_type.
Keys in this dictionary are node_type names and values are lists with p entries
(one per partition/rank). Each entry is the (starting type_nid, ending type_nid)
range of the nodes handled by the corresponding rank, as produced by get_idranges.
Returns:
--------
dictionary :
a dictionary where keys are node_type names and values are global_nid range, which is a tuple.
"""
ntypes_gid_range = {}
offset = 0
for k, v in node_tids.items():
ntypes_gid_range[k] = [offset + int(v[0][0]), offset + int(v[-1][1])]
offset += int(v[-1][1])
return ntypes_gid_range
def write_metadata_json(metadata_list, output_dir, graph_name):
"""
Merge the json schemas from all ranks on rank-0.
This utility function is to be used on rank-0 to create the aggregated json file.
Parameters:
-----------
metadata_list : list of json (dictionaries)
a list of json dictionaries to merge on rank-0
output_dir : string
output directory path in which results are stored (as a json file)
graph_name : string
a string specifying the graph name
"""
#Initialize global metadata
graph_metadata = {}
#Merge global_edge_ids from each json object in the input list
edge_map = {}
x = metadata_list[0]["edge_map"]
for k in x:
edge_map[k] = []
for idx in range(len(metadata_list)):
edge_map[k].append([int(metadata_list[idx]["edge_map"][k][0][0]),int(metadata_list[idx]["edge_map"][k][0][1])])
graph_metadata["edge_map"] = edge_map
graph_metadata["etypes"] = metadata_list[0]["etypes"]
graph_metadata["graph_name"] = metadata_list[0]["graph_name"]
graph_metadata["halo_hops"] = metadata_list[0]["halo_hops"]
#Merge global_nodeids from each of json object in the input list
node_map = {}
x = metadata_list[0]["node_map"]
for k in x:
node_map[k] = []
for idx in range(len(metadata_list)):
node_map[k].append([int(metadata_list[idx]["node_map"][k][0][0]), int(metadata_list[idx]["node_map"][k][0][1])])
graph_metadata["node_map"] = node_map
graph_metadata["ntypes"] = metadata_list[0]["ntypes"]
graph_metadata["num_edges"] = int(sum([metadata_list[i]["num_edges"] for i in range(len(metadata_list))]))
graph_metadata["num_nodes"] = int(sum([metadata_list[i]["num_nodes"] for i in range(len(metadata_list))]))
graph_metadata["num_parts"] = metadata_list[0]["num_parts"]
graph_metadata["part_method"] = metadata_list[0]["part_method"]
for i in range(len(metadata_list)):
graph_metadata["part-{}".format(i)] = metadata_list[i]["part-{}".format(i)]
with open('{}/metadata.json'.format(output_dir), 'w') as outfile:
json.dump(graph_metadata, outfile, sort_keys=False, indent=4)
def augment_edge_data(edge_data, part_ids, edge_tids, rank, world_size):
"""
Add partition-id (rank which owns an edge) column to the edge_data.
Parameters:
-----------
edge_data : dictionary
edge information as a dictionary with keys as column names and values as numpy arrays
part_ids : numpy array
array of part_ids indexed by global_nid
edge_tids : dictionary
per-rank (start, end) type_eid ranges keyed by edge-type name
rank : integer
rank of the current process
world_size : integer
total no. of processes used
"""
#add global_eids to the edge_data
etype_offset = {}
offset = 0
for etype_name, tid_range in edge_tids.items():
assert int(tid_range[0][0]) == 0
assert len(tid_range) == world_size
etype_offset[etype_name] = offset + int(tid_range[0][0])
offset += int(tid_range[-1][1])
global_eids = []
for etype_name, tid_range in edge_tids.items():
global_eid_start = etype_offset[etype_name]
begin = global_eid_start + int(tid_range[rank][0])
end = global_eid_start + int(tid_range[rank][1])
global_eids.append(np.arange(begin, end, dtype=np.int64))
global_eids = np.concatenate(global_eids)
assert global_eids.shape[0] == edge_data[constants.ETYPE_ID].shape[0]
edge_data[constants.GLOBAL_EID] = global_eids
#assign the owner process/rank for each edge
edge_data[constants.OWNER_PROCESS] = part_ids[edge_data[constants.GLOBAL_DST_ID]]
def read_edges_file(edge_file, edge_data_dict):
"""
Utility function to read xxx_edges.txt file
Parameters:
-----------
edge_file : string
Graph file for edges in the input graph
Returns:
--------
dictionary
edge data as read from xxx_edges.txt file and columns are stored
in a dictionary with key-value pairs as column-names and column-data.
"""
if edge_file == "" or edge_file == None:
return None
#Read the file from here.
#<global_src_id> <global_dst_id> <type_eid> <etype> <attributes>
# global_src_id -- global idx for the source node ... line # in the graph_nodes.txt
# global_dst_id -- global idx for the destination id node ... line # in the graph_nodes.txt
edge_data_df = csv.read_csv(edge_file, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
edge_data_dict = {}
edge_data_dict[constants.GLOBAL_SRC_ID] = edge_data_df['f0'].to_numpy()
edge_data_dict[constants.GLOBAL_DST_ID] = edge_data_df['f1'].to_numpy()
edge_data_dict[constants.GLOBAL_TYPE_EID] = edge_data_df['f2'].to_numpy()
edge_data_dict[constants.ETYPE_ID] = edge_data_df['f3'].to_numpy()
return edge_data_dict
def read_node_features_file(nodes_features_file):
"""
Utility function to load tensors from a file
Parameters:
-----------
nodes_features_file : string
Features file for nodes in the graph
Returns:
--------
dictionary
mappings between ntype and list of features
"""
node_features = dgl.data.utils.load_tensors(nodes_features_file, False)
return node_features
def read_edge_features_file(edge_features_file):
"""
Utility function to load tensors from a file
Parameters:
-----------
edge_features_file : string
Features file for edges in the graph
Returns:
--------
dictionary
mappings between etype and list of features
"""
edge_features = dgl.data.utils.load_tensors(edge_features_file, True)
return edge_features
def write_node_features(node_features, node_file):
"""
Utility function to serialize node_features in node_file file
Parameters:
-----------
node_features : dictionary
dictionary storing ntype <-> list of features
node_file : string
File in which the node information is serialized
"""
dgl.data.utils.save_tensors(node_file, node_features)
def write_edge_features(edge_features, edge_file):
"""
Utility function to serialize edge_features in edge_file file
Parameters:
-----------
edge_features : dictionary
dictionary storing etype <-> list of features
edge_file : string
File in which the edge information is serialized
"""
dgl.data.utils.save_tensors(edge_file, edge_features)
def write_graph_dgl(graph_file, graph_obj):
"""
Utility function to serialize graph dgl objects
Parameters:
-----------
graph_obj : dgl graph object
graph dgl object, as created in convert_partition.py, which is to be serialized
graph_file : string
File name in which graph object is serialized
"""
dgl.save_graphs(graph_file, [graph_obj])
def write_dgl_objects(graph_obj, node_features, edge_features, output_dir, part_id):
"""
Wrapper function to create dgl objects for graph, node-features and edge-features
Parameters:
-----------
graph_obj : dgl object
graph dgl object as created in convert_partition.py file
node_features : dictionary
Tensor data for node features
edge_features : dictionary
Tensor data for edge features
output_dir : string
location where the output files will be located
part_id : int
integer indicating the partition-id
"""
part_dir = output_dir + '/part' + str(part_id)
os.makedirs(part_dir, exist_ok=True)
write_graph_dgl(os.path.join(part_dir ,'graph.dgl'), graph_obj)
if node_features != None:
write_node_features(node_features, os.path.join(part_dir, "node_feat.dgl"))
if (edge_features != None):
write_edge_features(edge_features, os.path.join(part_dir, "edge_feat.dgl"))
def get_idranges(names, counts):
"""
Utility function to compute type_id/global_id ranges for both nodes and edges.
Parameters:
-----------
names : list of strings
list of node/edge types as strings
counts : list of lists
each list contains no. of nodes/edges in a given chunk
Returns:
--------
dictionary
dictionary where the keys are node-/edge-type names and values are
list of tuples where each tuple indicates the range of values for
corresponding type-ids.
dictionary
dictionary where the keys are node-/edge-type names and each value is a 1x2 array.
This array holds the [start, end) global-id range for the associated node-/edge-type.
"""
gnid_start = 0
gnid_end = gnid_start
tid_dict = {}
gid_dict = {}
for idx, typename in enumerate(names):
type_counts = counts[idx]
tid_start = np.cumsum([0] + type_counts[:-1])
tid_end = np.cumsum(type_counts)
tid_ranges = list(zip(tid_start, tid_end))
type_start = tid_ranges[0][0]
type_end = tid_ranges[-1][1]
gnid_end += tid_ranges[-1][1]
tid_dict[typename] = tid_ranges
gid_dict[typename] = np.array([gnid_start, gnid_end]).reshape([1,2])
gnid_start = gnid_end
return tid_dict, gid_dict
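# A minimal illustration of get_idranges() with made-up chunk counts: two node types,
# each split into two chunks, yield per-rank type-id ranges and per-type global-id ranges.
def _example_get_idranges():
    tid_dict, gid_dict = get_idranges(['author', 'paper'], [[4, 4], [3, 3]])
    # tid_dict (shown as plain ints): {'author': [(0, 4), (4, 8)], 'paper': [(0, 3), (3, 6)]}
    # gid_dict: {'author': [[0, 8]], 'paper': [[8, 14]]}
    return tid_dict, gid_dict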
# Requires setting PYTHONPATH=${GITROOT}/tools
import json
import logging
import sys
import os
import numpy as np
import argparse
from utils import setdir
from utils import array_readwriter
def _random_partition(metadata, num_parts):
num_nodes_per_type = [sum(_) for _ in metadata['num_nodes_per_chunk']]
ntypes = metadata['node_type']
for ntype, n in zip(ntypes, num_nodes_per_type):
logging.info('Generating partition for node type %s' % ntype)
parts = np.random.randint(0, num_parts, (n,))
array_readwriter.get_array_parser(name='csv').write(ntype + '.txt', parts)
def random_partition(metadata, num_parts, output_path):
"""
Randomly partition the graph described in metadata and generate partition ID mapping
in :attr:`output_path`.
A directory will be created at :attr:`output_path` containing the partition ID
mapping files named "<node-type>.txt" (e.g. "author.txt", "paper.txt" and
"institution.txt" for OGB-MAG240M). Each file contains one line per node representing
the partition ID the node belongs to.
"""
with setdir(output_path):
_random_partition(metadata, num_parts)
# Run with PYTHONPATH=${GIT_ROOT_DIR}/tools
# where ${GIT_ROOT_DIR} is the root directory of the DGL git repository.
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'metadata', type=str, help='input metadata file of the chunked graph format')
parser.add_argument(
'output_path', type=str, help='output directory')
parser.add_argument(
'num_partitions', type=int, help='number of partitions')
logging.basicConfig(level='INFO')
args = parser.parse_args()
with open(args.metadata) as f:
metadata = json.load(f)
output_path = args.output_path
num_parts = args.num_partitions
random_partition(metadata, num_parts, output_path)
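# A hypothetical invocation (the script path and file names are placeholders):
#   PYTHONPATH=${GIT_ROOT_DIR}/tools python <this_script>.py metadata.json partition_output 4
# which writes one "<node-type>.txt" partition-assignment file per node type under
# partition_output/.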
from .files import *
from . import array_readwriter
from .registry import register_array_parser, get_array_parser
from . import csv
from . import numpy_array
import logging
import pandas as pd
import pyarrow
import pyarrow.csv
from .registry import register_array_parser
@register_array_parser("csv")
class CSVArrayParser(object):
def __init__(self, delimiter=','):
self.delimiter = delimiter
def read(self, path):
logging.info('Reading from %s using CSV format with configuration %s' % (
path, self.__dict__))
# do not read the first line as header
read_options = pyarrow.csv.ReadOptions(autogenerate_column_names=True)
parse_options = pyarrow.csv.ParseOptions(delimiter=self.delimiter)
arr = pyarrow.csv.read_csv(path, read_options=read_options, parse_options=parse_options)
logging.info('Done reading from %s' % path)
return arr.to_pandas().to_numpy()
def write(self, path, arr):
logging.info('Writing to %s using CSV format with configuration %s' % (
path, self.__dict__))
write_options = pyarrow.csv.WriteOptions(include_header=False, delimiter=self.delimiter)
arr = pyarrow.Table.from_pandas(pd.DataFrame(arr))
pyarrow.csv.write_csv(arr, path, write_options=write_options)
logging.info('Done writing to %s' % path)
import logging
import numpy as np
from numpy.lib.format import open_memmap
from .registry import register_array_parser
@register_array_parser("numpy")
class NumpyArrayParser(object):
def __init__(self):
pass
def read(self, path):
logging.info('Reading from %s using numpy format' % path)
arr = np.load(path, mmap_mode='r')
logging.info('Done reading from %s' % path)
return arr
def write(self, path, arr):
logging.info('Writing to %s using numpy format' % path)
# np.save would load the entire memmap array into CPU memory. So we manually open
# an empty npy file in memmap mode, copy into it and flush it instead.
new_arr = open_memmap(path, mode='w+', dtype=arr.dtype, shape=arr.shape)
new_arr[:] = arr[:]
new_arr.flush()
logging.info('Done writing to %s' % path)
REGISTRY = {}
def register_array_parser(name):
def _deco(cls):
REGISTRY[name] = cls
return cls
return _deco
def get_array_parser(**fmt_meta):
cls = REGISTRY[fmt_meta.pop('name')]
return cls(**fmt_meta)
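# A minimal usage sketch (hypothetical file name): the registry maps a "format"
# dictionary from the metadata json onto a parser instance, which can then round-trip
# an array to and from disk. This assumes the csv parser module has been imported
# (as done in the package __init__) so that "csv" is registered.
def _example_roundtrip(path='example_edges.txt'):
    import numpy as np
    parser = get_array_parser(name='csv', delimiter=' ')
    parser.write(path, np.arange(6).reshape(3, 2))
    return parser.read(path)  # 3 x 2 numpy array read back from disk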