Unverified Commit 4b87e47f authored by kylasa, committed by GitHub

[Distributed Training Pipeline] Initial implementation of Distributed data processing step in the Dis… (#3926)

* Initial implementation of Distributed data processing step in the Distributed Training pipeline

Implemented the following:
1) Read the output of parmetis (node-id to partition-id mappings)
2) Read the original graph files
3) Shuffle the node/edge metadata and features
4) Output the partition-specific files in DGL format using the convert_partition.py functionality
5) Graph metadata is serialized in JSON format on the rank-0 machine.

* Bug Fixes identified during verification of the dataset

1. When sending out global-id lookups for non-local nodes in msg_alltoall.py, a conditional filter was used to identify the indices in node_data, which is incorrect. Replaced the conditional filter with intersect1d to find the common node ids and the appropriate indices, which are later used to identify the information that needs to be communicated.

2. When writing the graph-level JSON file in distributed processing, the edge_offset on non-rank-0 machines started from 0 instead of the appropriate offset. Added code so that edges start from the correct offset instead of always 0.

* Restructuring and consolidation of code

1) Fixed an issue when running verify_mag_dataset.py: we now read xxx_removed_edges.txt and add these edges to `edge_data`. This ensures that self-loops and duplicate edges are handled appropriately when compared to the original dataset.

2) Consolidated code into fewer files and changed the code to follow the Python naming convention.

* Code changes addressing code review comments

The following changes are made in this commit.
1) A naming convention is defined and the code is changed accordingly. The various global_ids are defined, along with how to read them.
2) All the code review comments are addressed.
3) Files are moved to a new directory under the dgl/tools directory, as suggested.
4) A README.md file is included; it contains detailed information about the naming convention adopted by the code, a high-level overview of the algorithm used in data shuffling, and an example command line for a single machine.

* Addressing GitHub review comments

Made code changes addressing all the review comments from GitHub.

* Addressing latest code review comments

Addressed all the latest code review comments. One of the major changes is treating the node and edge metadata as dictionary objects and replacing all Python lists with NumPy arrays.

* Update README.md

Text rendering corrections

* Addressed code review comments

Addressed comments from the latest code review.
Co-authored-by: xiang song (charlie.song) <classicxsong@gmail.com>
parent cba465f2
import os
import json
import time
import argparse
import numpy as np
import dgl
import torch as th
import pyarrow
import pandas as pd
from pyarrow import csv
parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
help='The directory path that contains the partition results.')
parser.add_argument('--graph-name', required=True, type=str,
help='The graph name')
parser.add_argument('--schema', required=True, type=str,
help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
help='The number of partitions')
parser.add_argument('--num-node-weights', required=True, type=int,
help='The number of node weights used by METIS.')
parser.add_argument('--workspace', type=str, default='/tmp',
help='The directory to store the intermediate results')
parser.add_argument('--node-attr-dtype', type=str, default=None,
help='The data type of the node attributes')
parser.add_argument('--edge-attr-dtype', type=str, default=None,
help='The data type of the edge attributes')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
parser.add_argument('--removed-edges', help='a file that contains the removed self-loops and duplicated edges',
default=None, type=str)
args = parser.parse_args()
input_dir = args.input_dir
graph_name = args.graph_name
num_parts = args.num_parts
num_node_weights = args.num_node_weights
node_attr_dtype = args.node_attr_dtype
edge_attr_dtype = args.edge_attr_dtype
workspace_dir = args.workspace
output_dir = args.output
self_loop_edges = None
duplicate_edges = None
if args.removed_edges is not None:
removed_file = '{}/{}'.format(input_dir, args.removed_edges)
removed_df = csv.read_csv(removed_file, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
assert removed_df.num_columns == 4
src_id = removed_df['f0'].to_numpy()
dst_id = removed_df['f1'].to_numpy()
orig_id = removed_df['f2'].to_numpy()
etype = removed_df['f3'].to_numpy()
self_loop_idx = src_id == dst_id
not_self_loop_idx = src_id != dst_id
self_loop_edges = [src_id[self_loop_idx], dst_id[self_loop_idx],
orig_id[self_loop_idx], etype[self_loop_idx]]
duplicate_edges = [src_id[not_self_loop_idx], dst_id[not_self_loop_idx],
orig_id[not_self_loop_idx], etype[not_self_loop_idx]]
print('There are {} self-loops and {} duplicated edges in the removed edges'.format(len(self_loop_edges[0]),
len(duplicate_edges[0])))
with open(args.schema) as json_file:
schema = json.load(json_file)
nid_ranges = schema['nid']
eid_ranges = schema['eid']
nid_ranges = {key: np.array(nid_ranges[key]).reshape(
1, 2) for key in nid_ranges}
eid_ranges = {key: np.array(eid_ranges[key]).reshape(
1, 2) for key in eid_ranges}
id_map = dgl.distributed.id_map.IdMap(nid_ranges)
ntypes = [(key, nid_ranges[key][0, 0]) for key in nid_ranges]
ntypes.sort(key=lambda e: e[1])
ntype_offset_np = np.array([e[1] for e in ntypes])
ntypes = [e[0] for e in ntypes]
ntypes_map = {e: i for i, e in enumerate(ntypes)}
etypes = [(key, eid_ranges[key][0, 0]) for key in eid_ranges]
etypes.sort(key=lambda e: e[1])
etype_offset_np = np.array([e[1] for e in etypes])
etypes = [e[0] for e in etypes]
etypes_map = {e: i for i, e in enumerate(etypes)}
def read_feats(file_name):
attrs = csv.read_csv(file_name, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
num_cols = len(attrs.columns)
return np.stack([attrs.columns[i].to_numpy() for i in range(num_cols)], 1)
max_nid = np.iinfo(np.int32).max
num_edges = 0
num_nodes = 0
node_map_val = {ntype: [] for ntype in ntypes}
edge_map_val = {etype: [] for etype in etypes}
for part_id in range(num_parts):
part_dir = output_dir + '/part' + str(part_id)
os.makedirs(part_dir, exist_ok=True)
node_file = 'p{:03}-{}_nodes.txt'.format(part_id, graph_name)
# The format of each line in the node file:
# <node_id> <node_type> <weight1> ... <orig_type_node_id> <attributes>
# The node file contains nodes that belong to a partition. It doesn't include HALO nodes.
orig_type_nid_col = 3 + num_node_weights
first_attr_col = 4 + num_node_weights
# Get the node ID, node type, and original per-type node ID columns.
tmp_output = workspace_dir + '/' + node_file + '.tmp'
os.system('awk \'{print $1, $2, $' + str(orig_type_nid_col) + '}\''
+ ' {} > {}'.format(input_dir + '/' + node_file, tmp_output))
nodes = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
nids, ntype_ids, orig_type_nid = nodes.columns[0].to_numpy(), nodes.columns[1].to_numpy(), \
nodes.columns[2].to_numpy()
orig_homo_nid = ntype_offset_np[ntype_ids] + orig_type_nid
assert np.all(nids[1:] - nids[:-1] == 1)
nid_range = (nids[0], nids[-1])
num_nodes += len(nodes)
if node_attr_dtype is not None:
# Get node attributes
# Here we just assume all nodes have the same attributes.
# In practice, this may not be the case, in which case we need a more complex
# solution to encode and decode node attributes.
os.system('cut -d\' \' -f {}- {} > {}'.format(first_attr_col,
input_dir + '/' + node_file,
tmp_output))
node_attrs = read_feats(tmp_output)
node_feats = {}
# Nodes in a partition have been sorted based on node type.
for ntype_name in nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = nids[ntype_ids == ntype_id]
assert np.all(type_nids == np.arange(
type_nids[0], type_nids[-1] + 1))
node_feats[ntype_name +
'/feat'] = th.as_tensor(node_attrs[ntype_ids == ntype_id])
dgl.data.utils.save_tensors(os.path.join(
part_dir, "node_feat.dgl"), node_feats)
# Determine the node ID ranges of different node types.
for ntype_name in nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = nids[ntype_ids == ntype_id]
node_map_val[ntype_name].append(
[int(type_nids[0]), int(type_nids[-1]) + 1])
edge_file = 'p{:03}-{}_edges.txt'.format(part_id, graph_name)
# The format of each line in the edge file:
# <src_id> <dst_id> <orig_src_id> <orig_dst_id> <orig_edge_id> <edge_type> <attributes>
tmp_output = workspace_dir + '/' + edge_file + '.tmp'
os.system('awk \'{print $1, $2, $3, $4, $5, $6}\'' + ' {} > {}'.format(input_dir + '/' + edge_file,
tmp_output))
edges = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
num_cols = len(edges.columns)
src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = [
edges.columns[i].to_numpy() for i in range(num_cols)]
# Let's merge the self-loops and duplicated edges to the partition.
src_id_list, dst_id_list = [src_id], [dst_id]
orig_src_id_list, orig_dst_id_list = [orig_src_id], [orig_dst_id]
orig_edge_id_list, etype_id_list = [orig_edge_id], [etype_ids]
if self_loop_edges is not None and len(self_loop_edges[0]) > 0:
uniq_orig_nids, idx = np.unique(orig_dst_id, return_index=True)
common_nids, common_idx1, common_idx2 = np.intersect1d(
uniq_orig_nids, self_loop_edges[0], return_indices=True)
idx = idx[common_idx1]
# the IDs after ID assignment
src_id_list.append(dst_id[idx])
dst_id_list.append(dst_id[idx])
# homogeneous IDs in the input graph.
orig_src_id_list.append(self_loop_edges[0][common_idx2])
orig_dst_id_list.append(self_loop_edges[0][common_idx2])
# edge IDs and edge type.
orig_edge_id_list.append(self_loop_edges[2][common_idx2])
etype_id_list.append(self_loop_edges[3][common_idx2])
print('Add {} self-loops in partition {}'.format(len(idx), part_id))
if duplicate_edges is not None and len(duplicate_edges[0]) > 0:
part_ids = orig_src_id.astype(
np.int64) * max_nid + orig_dst_id.astype(np.int64)
uniq_orig_ids, idx = np.unique(part_ids, return_index=True)
duplicate_ids = duplicate_edges[0].astype(
np.int64) * max_nid + duplicate_edges[1].astype(np.int64)
common_nids, common_idx1, common_idx2 = np.intersect1d(
uniq_orig_ids, duplicate_ids, return_indices=True)
idx = idx[common_idx1]
# the IDs after ID assignment
src_id_list.append(src_id[idx])
dst_id_list.append(dst_id[idx])
# homogeneous IDs in the input graph.
orig_src_id_list.append(duplicate_edges[0][common_idx2])
orig_dst_id_list.append(duplicate_edges[1][common_idx2])
# edge IDs and edge type.
orig_edge_id_list.append(duplicate_edges[2][common_idx2])
etype_id_list.append(duplicate_edges[3][common_idx2])
print('Add {} duplicated edges in partition {}'.format(len(idx), part_id))
src_id = np.concatenate(src_id_list) if len(
src_id_list) > 1 else src_id_list[0]
dst_id = np.concatenate(dst_id_list) if len(
dst_id_list) > 1 else dst_id_list[0]
orig_src_id = np.concatenate(orig_src_id_list) if len(
orig_src_id_list) > 1 else orig_src_id_list[0]
orig_dst_id = np.concatenate(orig_dst_id_list) if len(
orig_dst_id_list) > 1 else orig_dst_id_list[0]
orig_edge_id = np.concatenate(orig_edge_id_list) if len(
orig_edge_id_list) > 1 else orig_edge_id_list[0]
etype_ids = np.concatenate(etype_id_list) if len(
etype_id_list) > 1 else etype_id_list[0]
print('There are {} edges in partition {}'.format(len(src_id), part_id))
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
sort_idx = np.argsort(etype_ids)
src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = src_id[sort_idx], dst_id[sort_idx], \
orig_src_id[sort_idx], orig_dst_id[sort_idx], orig_edge_id[sort_idx], etype_ids[sort_idx]
assert np.all(np.diff(etype_ids) >= 0)
if edge_attr_dtype is not None:
# Get edge attributes
# Here we just assume all edges have the same attributes.
# In practice, this may not be the case, in which case we need a more complex
# solution to encode and decode edge attributes.
os.system('cut -d\' \' -f 7- {} > {}'.format(input_dir +
'/' + edge_file, tmp_output))
edge_attrs = th.as_tensor(read_feats(tmp_output))[sort_idx]
edge_feats = {}
for etype_name in eid_ranges:
etype_id = etypes_map[etype_name]
edge_feats[etype_name +
'/feat'] = th.as_tensor(edge_attrs[etype_ids == etype_id])
dgl.data.utils.save_tensors(os.path.join(
part_dir, "edge_feat.dgl"), edge_feats)
# Determine the edge ID range of different edge types.
edge_id_start = num_edges
for etype_name in eid_ranges:
etype_id = etypes_map[etype_name]
edge_map_val[etype_name].append([int(edge_id_start),
int(edge_id_start + np.sum(etype_ids == etype_id))])
edge_id_start += np.sum(etype_ids == etype_id)
# Here we want to compute the unique IDs in the edge list.
# It is possible that a node belongs to the partition but doesn't appear
# in the edge list. That is, the node is assigned to this partition, but its neighbor
# belongs to another partition, so the edge is assigned to that other partition.
# This happens in a directed graph.
# To avoid such nodes being dropped from the graph, we add the node IDs that
# belong to this partition.
ids = np.concatenate(
[src_id, dst_id, np.arange(nid_range[0], nid_range[1] + 1)])
uniq_ids, idx, inverse_idx = np.unique(
ids, return_index=True, return_inverse=True)
assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range.
local_src_id, local_dst_id = np.split(inverse_idx[:len(src_id) * 2], 2)
compact_g = dgl.graph((local_src_id, local_dst_id))
compact_g.edata['orig_id'] = th.as_tensor(orig_edge_id)
compact_g.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
compact_g.edata['inner_edge'] = th.ones(
compact_g.number_of_edges(), dtype=th.bool)
# The original IDs are homogeneous IDs.
# Similarly, we need to add the original homogeneous node IDs
orig_ids = np.concatenate([orig_src_id, orig_dst_id, orig_homo_nid])
orig_homo_ids = orig_ids[idx]
ntype, per_type_ids = id_map(orig_homo_ids)
compact_g.ndata['orig_id'] = th.as_tensor(per_type_ids)
compact_g.ndata[dgl.NTYPE] = th.as_tensor(ntype)
compact_g.ndata[dgl.NID] = th.as_tensor(uniq_ids)
compact_g.ndata['inner_node'] = th.as_tensor(np.logical_and(
uniq_ids >= nid_range[0], uniq_ids <= nid_range[1]))
local_nids = compact_g.ndata[dgl.NID][compact_g.ndata['inner_node'].bool()]
assert np.all((local_nids == th.arange(
local_nids[0], local_nids[-1] + 1)).numpy())
print('|V|={}'.format(compact_g.number_of_nodes()))
print('|E|={}'.format(compact_g.number_of_edges()))
# We need to reshuffle nodes in a partition so that all local nodes are labelled starting from 0.
reshuffle_nodes = th.arange(compact_g.number_of_nodes())
reshuffle_nodes = th.cat([reshuffle_nodes[compact_g.ndata['inner_node'].bool()],
reshuffle_nodes[compact_g.ndata['inner_node'] == 0]])
compact_g1 = dgl.node_subgraph(compact_g, reshuffle_nodes)
compact_g1.ndata['orig_id'] = compact_g.ndata['orig_id'][reshuffle_nodes]
compact_g1.ndata[dgl.NTYPE] = compact_g.ndata[dgl.NTYPE][reshuffle_nodes]
compact_g1.ndata[dgl.NID] = compact_g.ndata[dgl.NID][reshuffle_nodes]
compact_g1.ndata['inner_node'] = compact_g.ndata['inner_node'][reshuffle_nodes]
compact_g1.edata['orig_id'] = compact_g.edata['orig_id'][compact_g1.edata[dgl.EID]]
compact_g1.edata[dgl.ETYPE] = compact_g.edata[dgl.ETYPE][compact_g1.edata[dgl.EID]]
compact_g1.edata['inner_edge'] = compact_g.edata['inner_edge'][compact_g1.edata[dgl.EID]]
# reshuffle edges on ETYPE as node_subgraph relabels edges
idx = th.argsort(compact_g1.edata[dgl.ETYPE])
u, v = compact_g1.edges()
u = u[idx]
v = v[idx]
compact_g2 = dgl.graph((u, v))
compact_g2.ndata['orig_id'] = compact_g1.ndata['orig_id']
compact_g2.ndata[dgl.NTYPE] = compact_g1.ndata[dgl.NTYPE]
compact_g2.ndata[dgl.NID] = compact_g1.ndata[dgl.NID]
compact_g2.ndata['inner_node'] = compact_g1.ndata['inner_node']
compact_g2.edata['orig_id'] = compact_g1.edata['orig_id'][idx]
compact_g2.edata[dgl.ETYPE] = compact_g1.edata[dgl.ETYPE][idx]
compact_g2.edata['inner_edge'] = compact_g1.edata['inner_edge'][idx]
compact_g2.edata[dgl.EID] = th.arange(
num_edges, num_edges + compact_g2.number_of_edges())
num_edges += compact_g2.number_of_edges()
dgl.save_graphs(part_dir + '/graph.dgl', [compact_g2])
part_metadata = {'graph_name': graph_name,
'num_nodes': num_nodes,
'num_edges': num_edges,
'part_method': 'metis',
'num_parts': num_parts,
'halo_hops': 1,
'node_map': node_map_val,
'edge_map': edge_map_val,
'ntypes': ntypes_map,
'etypes': etypes_map}
for part_id in range(num_parts):
part_dir = 'part' + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
'edge_feats': edge_feat_file,
'part_graph': part_graph_file}
with open('{}/{}.json'.format(output_dir, graph_name), 'w') as outfile:
json.dump(part_metadata, outfile, sort_keys=True, indent=4)
### xxx_nodes.txt format
This file is used to provide node information to this framework. Each line in this file has the following format:
```
<node_type> <weight1> <weight2> <weight3> <weight4> <global_type_node_id> <attributes>
```
where node_type is the type id of the node, the weights can be any number of columns as determined by the user, and global_type_node_id is a contiguous id starting from `0` for each node_type. The attributes can be any number of columns at the end of each line.
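For illustration, a hypothetical xxx_nodes.txt with two node types, four weight columns and one attribute column could look like the following (all values are made up):
```
0 1 0 0 0 0 0.21
0 1 0 0 0 1 0.87
1 0 1 0 0 0 0.05
1 0 1 0 0 1 0.43
```
Note how global_type_node_id restarts from `0` when the node_type changes.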
### xxx_edges.txt format
This file is used to provide edge information to this framework. Each line in this file has the following format:
```
<global_src_id> <global_dst_id> <global_type_edge_id> <edge_type> <attributes>
```
where global_src_id and global_dst_id are the two end points of an edge, and global_type_edge_id is the id assigned to each edge within its edge_type; these ids are contiguous and start from 0 for each edge_type. Attributes can be any number of columns at the end of each line.
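Similarly, a hypothetical xxx_edges.txt with two edge types and no attribute columns could look like this (values are made up):
```
0 2 0 0
1 3 1 0
2 0 0 1
3 1 1 1
```
Here global_type_edge_id also restarts from 0 for each edge_type.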
### Naming convention
The `global_` prefix (for any node or edge ids) indicates that these ids are read from the graph input files. These ids are allocated to nodes and edges before `data shuffling` and are globally unique across all partitions.
The `shuffle_global_` prefix (for any node or edge ids) indicates that these ids are assigned after `data shuffling` is completed. These ids are globally unique across all partitions.
The `part_local_` prefix (for any node or edge ids) indicates that these ids are assigned after `data shuffling` and are unique only within a given partition.
For instance, if a variable is named `global_src_id`, it means that this id is read from the graph input file and is assumed to be globally unique across all partitions. Similarly, if a variable is named `part_local_node_id`, it means that this node id is assigned after the data shuffling is complete and is unique within a given partition.
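As a toy illustration of how the three namespaces relate (not part of the pipeline; the node ids and partition assignment below are made up), consider a 6-node graph split across 2 partitions:
```
import numpy as np

global_nid = np.arange(6)                    # ids as read from the input files
part_id = np.array([0, 1, 0, 1, 0, 1])       # partition assignment, e.g. from METIS

# After data shuffling, nodes are renumbered contiguously per partition; the
# prefix sum of per-partition counts gives each partition's starting offset.
counts = np.bincount(part_id)                # nodes owned by each partition
offsets = np.concatenate(([0], np.cumsum(counts)))
order = np.argsort(part_id, kind='stable')   # nodes grouped by owning partition
shuffle_global_nid = np.empty_like(global_nid)
shuffle_global_nid[order] = np.arange(len(global_nid))

# part_local ids simply start from 0 within each partition.
part_local_nid = shuffle_global_nid - offsets[part_id]
print(shuffle_global_nid)   # [0 3 1 4 2 5]
print(part_local_nid)       # [0 0 1 1 2 2]
```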
### High-level description of the algorithm
#### Single file format for graph input files
Here we assume that all the node-related data is present in a single file and, similarly, that all the edges are in a single file.
In this case the following steps are executed to write the dgl objects for each partition, as assigned by a partitioning algorithm such as METIS.
##### Step 1 (Data Loading):
The rank-0 process reads in all the graph files, which are xxx_nodes.txt, xxx_edges.txt, node_feats.dgl, edge_feats.dgl and xxx_removed_edges.txt.
The rank-0 process determines the ownership of nodes by using the output of the partitioning algorithm (here, we expect the output of the partitioning step to be a mapping between every node of the graph and its partition id). Edge ownership is determined by the `destination` node of the edge: an edge is assigned to the partition that owns its destination node, as sketched below.
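A minimal sketch of this ownership rule (hypothetical values; `node_part_ids` plays the role of the partitioning output):
```
import numpy as np

node_part_ids = np.array([0, 1, 0, 1])      # partition id indexed by global node id
global_dst_id = np.array([1, 3, 0, 2, 1])   # destination node of each edge
edge_owner = node_part_ids[global_dst_id]   # -> array([1, 1, 0, 0, 1])
```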
##### Step 2 (Data Shuffling):
The rank-0 process sends node data, edge data, node features and edge features to their respective processes using the ownership rules described in Step 1. Non-rank-0 processes receive their own nodes, edges, node features and edge features and store them in local data structures. Upon completion of sending, the rank-0 process deletes the nodes, edges, node features and edge features that are not owned by rank 0.
##### Step 3 (ID assignment and resolution):
At this point every rank has its own local information in its respective data structures. Each process then performs the following steps: a) assign shuffle_global_xxx ids (where xxx is node_ids and edge_ids) to nodes and edges by performing a prefix sum across all ranks, as sketched below, b) assign part_local_xxx ids (where xxx is node_ids and edge_ids) to nodes and edges so that they can be used to index into the node and edge features, and c) retrieve shuffle_global_node_ids by using global_node_ids to determine the ownership of any given node. This last step is done for the node_ids (present locally on a given rank) whose shuffle_global_node_ids were assigned on a different rank.
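A minimal sketch of the prefix-sum assignment in step (a), assuming a gloo process group has already been initialized (the actual implementation lives in `allgather_sizes` and `assign_shuffle_global_nids_nodes` further down):
```
import numpy as np
import torch
import torch.distributed as dist

def assign_shuffle_ids_sketch(local_count, rank, world_size):
    # Gather the per-rank counts, prefix-sum them, and hand this rank a
    # disjoint, contiguous range of shuffle_global ids.
    counts = [torch.zeros(1, dtype=torch.int64) for _ in range(world_size)]
    dist.all_gather(counts, torch.tensor([local_count], dtype=torch.int64))
    offsets = np.concatenate(([0], np.cumsum([c.item() for c in counts])))
    return np.arange(offsets[rank], offsets[rank + 1], dtype=np.int64)
```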
##### Step 4 (Serialization):
After every rank has global ids, shuffle_global ids and part_local ids for all the nodes and edges present locally, it proceeds with DGL object creation. Finally, the rank-0 process aggregates the graph-level metadata and creates a JSON file with the graph-level information.
### How to use this tool
To run this code on a single machine using multiple processes, use the following command:
```
python3 data_proc_pipeline.py --world-size 2 --nodes-file mag_nodes.txt --edges-file mag_edges.txt --node-feats-file node_feat.dgl --metis-partitions mag_part.2 --input-dir /home/ubuntu/data --graph-name mag --schema mag.json --num-parts 2 --num-node-weights 4 --workspace /home/ubuntu/data --node-attr-dtype float --output /home/ubuntu/data/outputs --removed-edges mag_removed_edges.txt
```
The above command assumes that there are `2` partitions and that the number of node weights is `4`. All other command-line arguments are self-explanatory.
GLOBAL_NID = "global_node_id"
GLOBAL_EID = "global_edge_id"
SHUFFLE_GLOBAL_NID = "shuffle_global_node_id"
SHUFFLE_GLOBAL_EID = "shuffle_global_edge_id"
NTYPE_ID = "node_type_id"
ETYPE_ID = "edge_type_id"
GLOBAL_TYPE_NID = "global_type_node_id"
GLOBAL_TYPE_EID = "global_type_edge_id"
GLOBAL_SRC_ID = "global_src_id"
GLOBAL_DST_ID = "global_dst_id"
SHUFFLE_GLOBAL_SRC_ID = "shuffle_global_src_id"
SHUFFLE_GLOBAL_DST_ID = "shuffle_global_dst_id"
OWNER_PROCESS = "owner_proc_id"
PART_LOCAL_NID = "part_local_nid"
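# Example (hypothetical) of how these keys are used downstream: node_data and
# edge_data are dictionaries that map these constant names to numpy arrays of
# equal length, e.g.
#   node_data = {GLOBAL_NID: ..., NTYPE_ID: ..., GLOBAL_TYPE_NID: ...}
# and a column is then accessed as node_data[constants.GLOBAL_NID].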
import os
import json
import time
import argparse
import numpy as np
import dgl
import torch as th
import pyarrow
import pandas as pd
import constants
from pyarrow import csv
def create_dgl_object(graph_name, num_parts, \
schema, part_id, node_data, \
edge_data, nodeid_offset, edgeid_offset):
"""
This function creates the dgl object for a given graph partition, as specified by the
function arguments.
Parameters:
-----------
graph_name : string
name of the graph
num_parts : int
total no. of partitions (of the original graph)
schema : json object
json object created by reading the graph metadata json file
part_id : int
partition id of the graph partition for which dgl object is to be created
node_data : numpy ndarray
node_data, where each row is of the following format:
<global_nid> <ntype_id> <global_type_nid>
edge_data : numpy ndarray
edge_data, where each row is of the following format:
<global_src_id> <global_dst_id> <etype_id> <global_type_eid>
nodeid_offset : int
offset to be used when assigning node global ids in the current partition
edgeid_offset : int
offset to be used when assigning edge global ids in the current partition
Returns:
--------
dgl object
dgl object created for the current graph partition
dictionary
map between node types and the range of global node ids used
dictionary
map between edge types and the range of global edge ids used
dictionary
map between node type(string) and node_type_id(int)
dictionary
map between edge type(string) and edge_type_id(int)
"""
#create auxiliary data structures from the schema object
global_nid_ranges = schema['nid']
global_eid_ranges = schema['eid']
global_nid_ranges = {key: np.array(global_nid_ranges[key]).reshape(
1, 2) for key in global_nid_ranges}
global_eid_ranges = {key: np.array(global_eid_ranges[key]).reshape(
1, 2) for key in global_eid_ranges}
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
ntypes = [(key, global_nid_ranges[key][0, 0]) for key in global_nid_ranges]
ntypes.sort(key=lambda e: e[1])
ntype_offset_np = np.array([e[1] for e in ntypes])
ntypes = [e[0] for e in ntypes]
ntypes_map = {e: i for i, e in enumerate(ntypes)}
etypes = [(key, global_eid_ranges[key][0, 0]) for key in global_eid_ranges]
etypes.sort(key=lambda e: e[1])
etype_offset_np = np.array([e[1] for e in etypes])
etypes = [e[0] for e in etypes]
etypes_map = {e: i for i, e in enumerate(etypes)}
node_map_val = {ntype: [] for ntype in ntypes}
edge_map_val = {etype: [] for etype in etypes}
shuffle_global_nids, ntype_ids, global_type_nid = node_data[constants.SHUFFLE_GLOBAL_NID], \
node_data[constants.NTYPE_ID], node_data[constants.GLOBAL_TYPE_NID]
global_homo_nid = ntype_offset_np[ntype_ids] + global_type_nid
assert np.all(shuffle_global_nids[1:] - shuffle_global_nids[:-1] == 1)
shuffle_global_nid_range = (shuffle_global_nids[0], shuffle_global_nids[-1])
# Determine the node ID ranges of different node types.
for ntype_name in global_nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = shuffle_global_nids[ntype_ids == ntype_id]
node_map_val[ntype_name].append(
[int(type_nids[0]), int(type_nids[-1]) + 1])
#process edges
shuffle_global_src_id, shuffle_global_dst_id, global_src_id, global_dst_id, global_edge_id, etype_ids = \
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID], edge_data[constants.SHUFFLE_GLOBAL_DST_ID], \
edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID], \
edge_data[constants.GLOBAL_TYPE_EID], edge_data[constants.ETYPE_ID]
print('There are {} edges in partition {}'.format(len(shuffle_global_src_id), part_id))
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
sort_idx = np.argsort(etype_ids)
shuffle_global_src_id, shuffle_global_dst_id, global_src_id, global_dst_id, global_edge_id, etype_ids = \
shuffle_global_src_id[sort_idx], shuffle_global_dst_id[sort_idx], global_src_id[sort_idx], \
global_dst_id[sort_idx], global_edge_id[sort_idx], etype_ids[sort_idx]
assert np.all(np.diff(etype_ids) >= 0)
# Determine the edge ID range of different edge types.
edge_id_start = edgeid_offset
for etype_name in global_eid_ranges:
etype_id = etypes_map[etype_name]
edge_map_val[etype_name].append([edge_id_start,
edge_id_start + np.sum(etype_ids == etype_id)])
edge_id_start += np.sum(etype_ids == etype_id)
# Here we want to compute the unique IDs in the edge list.
# It is possible that a node belongs to the partition but doesn't appear
# in the edge list. That is, the node is assigned to this partition, but its neighbor
# belongs to another partition, so the edge is assigned to that other partition.
# This happens in a directed graph.
# To avoid such nodes being dropped from the graph, we add the node IDs that
# belong to this partition.
ids = np.concatenate(
[shuffle_global_src_id, shuffle_global_dst_id, np.arange(shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1)])
uniq_ids, idx, inverse_idx = np.unique(
ids, return_index=True, return_inverse=True)
assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range.
part_local_src_id, part_local_dst_id = np.split(inverse_idx[:len(shuffle_global_src_id) * 2], 2)
compact_g = dgl.graph((part_local_src_id, part_local_dst_id))
compact_g.edata['orig_id'] = th.as_tensor(global_edge_id)
compact_g.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
compact_g.edata['inner_edge'] = th.ones(
compact_g.number_of_edges(), dtype=th.bool)
# The original IDs are homogeneous IDs.
# Similarly, we need to add the original homogeneous node IDs
global_nids = np.concatenate([global_src_id, global_dst_id, global_homo_nid])
global_homo_ids = global_nids[idx]
ntype, per_type_ids = id_map(global_homo_ids)
compact_g.ndata['orig_id'] = th.as_tensor(per_type_ids)
compact_g.ndata[dgl.NTYPE] = th.as_tensor(ntype)
compact_g.ndata[dgl.NID] = th.as_tensor(uniq_ids)
compact_g.ndata['inner_node'] = th.as_tensor(np.logical_and(
uniq_ids >= shuffle_global_nid_range[0], uniq_ids <= shuffle_global_nid_range[1]))
part_local_nids = compact_g.ndata[dgl.NID][compact_g.ndata['inner_node'].bool()]
assert np.all((part_local_nids == th.arange(
part_local_nids[0], part_local_nids[-1] + 1)).numpy())
print('|V|={}'.format(compact_g.number_of_nodes()))
print('|E|={}'.format(compact_g.number_of_edges()))
# We need to reshuffle nodes in a partition so that all local nodes are labelled starting from 0.
reshuffle_nodes = th.arange(compact_g.number_of_nodes())
reshuffle_nodes = th.cat([reshuffle_nodes[compact_g.ndata['inner_node'].bool()],
reshuffle_nodes[compact_g.ndata['inner_node'] == 0]])
compact_g1 = dgl.node_subgraph(compact_g, reshuffle_nodes)
compact_g1.ndata['orig_id'] = compact_g.ndata['orig_id'][reshuffle_nodes]
compact_g1.ndata[dgl.NTYPE] = compact_g.ndata[dgl.NTYPE][reshuffle_nodes]
compact_g1.ndata[dgl.NID] = compact_g.ndata[dgl.NID][reshuffle_nodes]
compact_g1.ndata['inner_node'] = compact_g.ndata['inner_node'][reshuffle_nodes]
compact_g1.edata['orig_id'] = compact_g.edata['orig_id'][compact_g1.edata[dgl.EID]]
compact_g1.edata[dgl.ETYPE] = compact_g.edata[dgl.ETYPE][compact_g1.edata[dgl.EID]]
compact_g1.edata['inner_edge'] = compact_g.edata['inner_edge'][compact_g1.edata[dgl.EID]]
# reshuffle edges on ETYPE as node_subgraph relabels edges
idx = th.argsort(compact_g1.edata[dgl.ETYPE])
u, v = compact_g1.edges()
u = u[idx]
v = v[idx]
compact_g2 = dgl.graph((u, v))
compact_g2.ndata['orig_id'] = compact_g1.ndata['orig_id']
compact_g2.ndata[dgl.NTYPE] = compact_g1.ndata[dgl.NTYPE]
compact_g2.ndata[dgl.NID] = compact_g1.ndata[dgl.NID]
compact_g2.ndata['inner_node'] = compact_g1.ndata['inner_node']
compact_g2.edata['orig_id'] = compact_g1.edata['orig_id'][idx]
compact_g2.edata[dgl.ETYPE] = compact_g1.edata[dgl.ETYPE][idx]
compact_g2.edata['inner_edge'] = compact_g1.edata['inner_edge'][idx]
compact_g2.edata[dgl.EID] = th.arange(
edgeid_offset, edgeid_offset + compact_g2.number_of_edges(), dtype=th.int64)
edgeid_offset += compact_g2.number_of_edges()
return compact_g2, node_map_val, edge_map_val, ntypes_map, etypes_map
def create_metadata_json(graph_name, num_nodes, num_edges, num_parts, node_map_val, \
edge_map_val, ntypes_map, etypes_map, output_dir ):
"""
Auxiliary function to create json file for the graph partition metadata
Parameters:
-----------
graph_name : string
name of the graph
num_nodes : int
no. of nodes in the graph partition
num_edges : int
no. of edges in the graph partition
num_parts : int
total no. of partitions of the original graph
node_map_val : dictionary
map between node types and the range of global node ids used
edge_map_val : dictionary
map between edge types and the range of global edge ids used
ntypes_map : dictionary
map between node type(string) and node_type_id(int)
etypes_map : dictionary
map between edge type(string) and edge_type_id(int)
output_dir : string
directory where the output files are to be stored
Returns:
--------
dictionary
map describing the graph information
"""
part_metadata = {'graph_name': graph_name,
'num_nodes': num_nodes,
'num_edges': num_edges,
'part_method': 'metis',
'num_parts': num_parts,
'halo_hops': 1,
'node_map': node_map_val,
'edge_map': edge_map_val,
'ntypes': ntypes_map,
'etypes': etypes_map}
for part_id in range(num_parts):
part_dir = 'part' + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
'edge_feats': edge_feat_file,
'part_graph': part_graph_file}
return part_metadata
import argparse
import numpy as np
import torch.multiprocessing as mp
from initialize import proc_exec, single_dev_init, multi_dev_init
def log_params(params):
""" Print all the command line arguments for debugging purposes.
Parameters:
-----------
params: argparse object
Argument Parser structure listing all the pre-defined parameters
"""
print('Input Dir: ', params.input_dir)
print('Graph Name: ', params.graph_name)
print('Schema File: ', params.schema)
print('No. partitions: ', params.num_parts)
print('No. node weights: ', params.num_node_weights)
print('Workspace dir: ', params.workspace)
print('Node Attr Type: ', params.node_attr_dtype)
print('Edge Attr Dtype: ', params.edge_attr_dtype)
print('Output Dir: ', params.output)
print('Removed Edges File: ', params.removed_edges)
print('WorldSize: ', params.world_size)
print('Nodes File: ', params.nodes_file)
print('Edges File: ', params.edges_file)
print('Node feats: ', params.node_feats_file)
print('Edge feats: ', params.edge_feats_file)
print('Metis partitions: ', params.partitions_file)
print('Exec Type: ', params.exec_type)
def start_local_run(params):
""" Main function for distributed implementation on a single machine
Parameters:
-----------
params : argparser object
Argument Parser structure with pre-determined arguments as defined
at the bottom of this file.
"""
log_params(params)
processes = []
mp.set_start_method("spawn")
#Invoke `target` function from each of the spawned process for distributed
#implementation
for rank in range(params.world_size):
p = mp.Process(target=single_dev_init, args=(rank, params.world_size, proc_exec, params))
p.start()
processes.append(p)
for p in processes:
p.join()
if __name__ == "__main__":
"""
Start of execution from this point.
Invoke the appropriate function to begin execution
"""
#arguments which are already needed by the existing implementation of convert_partition.py
parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
help='The directory path that contains the partition results.')
parser.add_argument('--graph-name', required=True, type=str,
help='The graph name')
parser.add_argument('--schema', required=True, type=str,
help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
help='The number of partitions')
parser.add_argument('--num-node-weights', required=True, type=int,
help='The number of node weights used by METIS.')
parser.add_argument('--workspace', type=str, default='/tmp',
help='The directory to store the intermediate results')
parser.add_argument('--node-attr-dtype', type=str, default=None,
help='The data type of the node attributes')
parser.add_argument('--edge-attr-dtype', type=str, default=None,
help='The data type of the edge attributes')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
parser.add_argument('--removed-edges', help='a file that contains the removed self-loops and duplicated edges',
default=None, type=str)
parser.add_argument('--exec-type', type=int, default=0,
help='Use 0 for single machine run and 1 for distributed execution')
#arguments needed for the distributed implementation
parser.add_argument('--world-size', help='no. of processes to spawn',
default=1, type=int, required=True)
parser.add_argument('--nodes-file', help='filename of the nodes metadata',
default=None, type=str, required=True)
parser.add_argument('--edges-file', help='filename of the edges metadata',
default=None, type=str, required=True)
parser.add_argument('--node-feats-file', help='filename of the nodes features',
default=None, type=str, required=True)
parser.add_argument('--edge-feats-file', help='filename of the edge features',
default=None, type=str )
parser.add_argument('--partitions-file', help='filename of the output of dgl_part2 (metis partitions)',
default=None, type=str)
params = parser.parse_args()
#invoke the starting function here.
if(params.exec_type == 0):
start_local_run(params)
else:
multi_dev_init(params)
import numpy as np
import torch
import operator
import itertools
import constants
from gloo_wrapper import allgather_sizes, alltoall_cpu, alltoallv_cpu
def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
"""
For nodes which are not owned by the current rank, whose global_nid <-> shuffle_global_nid mapping
is not present at the current rank, this function retrieves their shuffle_global_nids from the owner rank.
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
total no. of ranks configured
global_nids_ranks : list
list of numpy arrays (of global_nids), index of the list is the rank of the process
where global_nid <-> shuffle_global_nid mapping is located.
node_data : dictionary
node_data is a dictionary with keys as column names and values as numpy arrays
Returns:
--------
numpy ndarray
where the column-0 are global_nids and column-1 are shuffle_global_nids which are retrieved
from other processes.
"""
#build a list of sizes (lengths of lists)
sizes = [len(x) for x in global_nids_ranks]
#compute total_nodes whose global_nid <-> shuffle_global_nid mappings should be resolved
total_nodes = np.sum(sizes)
if (total_nodes == 0):
print('Rank: ', rank, ' -- All mappings are present locally... No need to send any info.')
return None
#determine the no. of global_node_ids to send and receive and perform alltoall
send_counts = list(torch.Tensor(sizes).type(dtype=torch.int64).chunk(world_size))
recv_counts = list(torch.zeros([world_size], dtype=torch.int64).chunk(world_size))
alltoall_cpu(rank, world_size, recv_counts, send_counts)
#allocate buffers to receive node-ids
recv_nodes = []
for i in recv_counts:
recv_nodes.append(torch.zeros([i.item()], dtype=torch.int64))
#form the outgoing message
send_nodes = []
for i in range(world_size):
send_nodes.append(torch.Tensor(global_nids_ranks[i]).type(dtype=torch.int64))
#send-receive messages
alltoallv_cpu(rank, world_size, recv_nodes, send_nodes)
# allocate buffers to receive global-ids
recv_shuffle_global_nids = []
for i in sizes:
recv_shuffle_global_nids.append(torch.zeros((i), dtype=torch.int64))
# Use node_data to lookup global id to send over.
send_nodes = []
for proc_i_nodes in recv_nodes:
#list of node-ids to lookup
global_nids = proc_i_nodes.numpy()
if (len(global_nids) != 0):
common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
values = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
send_nodes.append(torch.Tensor(values).type(dtype=torch.int64))
else:
send_nodes.append(torch.empty((0,), dtype=torch.int64))
#send receive global-ids
alltoallv_cpu(rank, world_size, recv_shuffle_global_nids, send_nodes)
shuffle_global_nids = [x.numpy() for x in recv_shuffle_global_nids]
global_nids = [x for x in global_nids_ranks]
return np.column_stack((np.concatenate(global_nids), np.concatenate(shuffle_global_nids)))
def get_shuffle_global_nids_edges(rank, world_size, edge_data, node_part_ids, node_data):
"""
Edges which are owned by this rank, may have global_nids whose shuffle_global_nids are NOT present locally.
This function retrieves shuffle_global_nids for such global_nids.
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
total no. of processes used
edge_data : numpy ndarray
edge_data (augmented) as read from the xxx_edges.txt file
node_part_ids : numpy array
list of partition ids indexed by global node ids.
node_data : dictionary
node_data, is a dictionary with keys as column_names and values as numpy arrays
"""
#determine unique node-ids present locally
global_nids = np.sort(np.unique(np.concatenate((edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID], node_data[constants.GLOBAL_NID]))))
#determine the rank which owns orig-node-id <-> partition/rank mappings
part_ids = node_part_ids[global_nids]
#form list of lists, each list includes global_nids whose mappings (shuffle_global_nids) needs to be retrieved.
#and rank will be the process which owns mappings of these global_nids
global_nids_ranks = []
for i in range(world_size):
if (i == rank):
global_nids_ranks.append(np.empty(shape=(0)))
continue
#not_owned_nodes = part_ids[:,0][part_ids[:,1] == i]
not_owned_node_ids = np.where(part_ids == i)[0]
if not_owned_node_ids.shape[0] == 0:
not_owned_nodes = np.empty(shape=(0))
else:
not_owned_nodes = global_nids[not_owned_node_ids]
global_nids_ranks.append(not_owned_nodes)
#Retrieve Global-ids for respective node owners
resolved_global_nids = get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data)
#Add global_nid <-> shuffle_global_nid mappings to the received data
for i in range(world_size):
if (i == rank):
own_node_ids = np.where(part_ids == i)[0]
own_global_nids = global_nids[own_node_ids]
common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], own_global_nids, return_indices=True)
my_shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
local_mappings = np.column_stack((own_global_nids, my_shuffle_global_nids))
resolved_global_nids = np.concatenate((resolved_global_nids, local_mappings))
#form a dictionary of mappings between global_nids and shuffle_global_nids
resolved_mappings = dict(zip(resolved_global_nids[:,0], resolved_global_nids[:,1]))
#determine shuffle_global_nids for the global_src_ids and global_dst_ids
shuffle_global_src_id = [resolved_mappings[x] for x in edge_data[constants.GLOBAL_SRC_ID]]
shuffle_global_dst_id = [resolved_mappings[x] for x in edge_data[constants.GLOBAL_DST_ID]]
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID] = np.array(shuffle_global_src_id, dtype=np.int64)
edge_data[constants.SHUFFLE_GLOBAL_DST_ID] = np.array(shuffle_global_dst_id, dtype=np.int64)
def assign_shuffle_global_nids_nodes(rank, world_size, node_data):
"""
Utility function to assign shuffle global ids to nodes at a given rank
node_data gets converted from [ntype, global_type_nid, global_nid]
to [shuffle_global_nid, ntype, global_type_nid, global_nid, part_local_type_nid]
where shuffle_global_nid : global id of the node after data shuffle
ntype : node-type as read from xxx_nodes.txt
global_type_nid : node-type-id as read from xxx_nodes.txt
global_nid : node-id as read from xxx_nodes.txt, implicitly
this is the line no. in the file
part_local_type_nid : type_nid assigned by the current rank within its scope
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
total number of processes used in the process group
node_data : dictionary
node_data is a dictionary with keys as column names and values as numpy arrays
"""
# Compute prefix sum to determine node-id offsets
prefix_sum_nodes = allgather_sizes([node_data[constants.GLOBAL_NID].shape[0]], world_size)
# assigning node ids from shuffle_global_nid_start to (shuffle_global_nid_end - 1)
# Assuming here that node_data is sorted based on node type.
shuffle_global_nid_start = prefix_sum_nodes[rank]
shuffle_global_nid_end = prefix_sum_nodes[rank + 1]
# add a column with global-ids (after data shuffle)
shuffle_global_nids = np.arange(shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64)
node_data[constants.SHUFFLE_GLOBAL_NID] = shuffle_global_nids
def assign_shuffle_global_nids_edges(rank, world_size, edge_data):
"""
Utility function to assign shuffle_global_eids to edges
edge_data gets converted from [global_src_nid, global_dst_nid, global_type_eid, etype]
to [shuffle_global_src_nid, shuffle_global_dst_nid, global_src_nid, global_dst_nid, global_type_eid, etype]
Parameters:
-----------
rank : integer
rank of the current process
world_size : integer
total count of processes in execution
edge_data : numpy ndarray
edge data as read from xxx_edges.txt file
Returns:
--------
integer
shuffle_global_eid_start, which indicates the starting value from which shuffle_global-ids are assigned to edges
on this rank
"""
#get prefix sum of edge counts per rank to locate the starting point
#from which global-ids to edges are assigned in the current rank
prefix_sum_edges = allgather_sizes([edge_data[constants.GLOBAL_SRC_ID].shape[0]], world_size)
shuffle_global_eid_start = prefix_sum_edges[rank]
shuffle_global_eid_end = prefix_sum_edges[rank + 1]
# assigning edge ids from shuffle_global_eid_start to (shuffle_global_eid_end - 1)
# Assuming here that the edge_data is sorted by edge_type
shuffle_global_eids = np.arange(shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64)
edge_data[constants.SHUFFLE_GLOBAL_EID] = shuffle_global_eids
return shuffle_global_eid_start
import numpy as np
import torch
import torch.distributed as dist
def allgather_sizes(send_data, world_size):
"""
Perform an all-gather on the list lengths; the result is used to compute prefix
sums that determine the offsets on each rank. This is used to allocate
global ids for edges/nodes on each rank.
Parameters
----------
send_data : numpy array
Data on which allgather is performed.
world_size : integer
No. of processes configured for execution
Returns :
---------
numpy array
array with the prefix sum
"""
#compute the length of the local data
send_length = len(send_data)
out_tensor = torch.as_tensor(send_data, dtype=torch.int64)
in_tensor = [torch.zeros(send_length, dtype=torch.int64)
for _ in range(world_size)]
#all_gather message
dist.all_gather(in_tensor, out_tensor)
#gather sizes into one array to return to the invoking function
rank_sizes = np.zeros(world_size + 1)
count = rank_sizes[0]
for i, t in enumerate(in_tensor):
count += t.item()
rank_sizes[i+1] = count
return rank_sizes
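# Example (hypothetical): with world_size=3 and per-rank inputs [4], [2] and [5],
# every rank returns the prefix sums [0, 4, 6, 11]; rank r then assigns ids in the
# half-open range [rank_sizes[r], rank_sizes[r+1]).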
def alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list):
"""
Each process scatters its list of input tensors to all processes in the cluster
and returns the gathered list of tensors in the output list. The tensors should have the same shape.
Parameters
----------
rank : int
The rank of current worker
world_size : int
The total number of processes in the cluster
output_tensor_list : List of tensor
The received tensors
input_tensor_list : List of tensor
The tensors to exchange
"""
input_tensor_list = [tensor.to(torch.device('cpu')) for tensor in input_tensor_list]
for i in range(world_size):
dist.scatter(output_tensor_list[i], input_tensor_list if i == rank else [], src=i)
def alltoallv_cpu(rank, world_size, output_tensor_list, input_tensor_list):
"""
Each process scatters its list of input tensors to all processes in the cluster
and returns the gathered list of tensors in the output list.
Parameters
----------
rank : int
The rank of current worker
world_size : int
The total number of processes in the cluster
output_tensor_list : List of tensor
The received tensors
input_tensor_list : List of tensor
The tensors to exchange
"""
# send tensor to each target trainer using torch.distributed.isend
# isend is async
senders = []
for i in range(world_size):
if i == rank:
output_tensor_list[i] = input_tensor_list[i].to(torch.device('cpu'))
else:
sender = dist.isend(input_tensor_list[i].to(torch.device('cpu')), dst=i)
senders.append(sender)
for i in range(world_size):
if i != rank:
dist.recv(output_tensor_list[i], src=i)
torch.distributed.barrier()
def gather_metadata_json(metadata, rank, world_size):
"""
Gather the json metadata objects from all ranks onto rank 0.
Parameters:
-----------
metadata : json dictionary object
json schema formed on each rank with graph level data.
This will be used as input to the distributed training in the later steps.
Returns:
--------
list : list of json dictionary objects
The result of the gather operation, which is the list of json dictionary
objects from each rank in the world
"""
#Populate input obj and output obj list on rank-0 and non-rank-0 machines
input_obj = None if rank == 0 else metadata
output_objs = [None for _ in range(world_size)] if rank == 0 else None
#invoke the gloo method to perform gather on rank-0
dist.gather_object(input_obj, output_objs, dst=0)
return output_objs
import os
import json
import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import constants
from timeit import default_timer as timer
from datetime import timedelta
from utils import augment_node_data, augment_edge_data,\
read_nodes_file, read_edges_file,\
read_node_features_file, read_edge_features_file,\
read_partitions_file, read_json,\
get_node_types, write_metadata_json, write_dgl_objects
from globalids import assign_shuffle_global_nids_nodes, assign_shuffle_global_nids_edges,\
get_shuffle_global_nids_edges
from gloo_wrapper import gather_metadata_json
from convert_partition import create_dgl_object, create_metadata_json
def send_node_data(rank, node_data, part_ids):
"""
Function to send node_data to non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
node_data : numpy ndarray
node_data, in the augmented form
part_ids : python list
list of unique ranks/partition-ids
"""
for part_id in part_ids:
if part_id == rank:
continue
#extract <node_type>, <global_type_nid>, <global_nid>
#which belong to `part_id`
send_data_idx = (node_data[constants.OWNER_PROCESS] == part_id)
idx = send_data_idx.reshape(node_data[constants.GLOBAL_NID].shape[0])
filt_data = np.column_stack((node_data[constants.NTYPE_ID], \
node_data[constants.GLOBAL_TYPE_NID], \
node_data[constants.GLOBAL_NID]))
filt_data = filt_data[idx == 1]
#prepare tensor to send
send_size = filt_data.shape
size_tensor = torch.tensor(filt_data.shape, dtype=torch.int64)
# Send size first, so that the part-id (rank)
# can create appropriately sized buffers
dist.send(size_tensor, dst=part_id)
#send actual node_data to part-id rank
start = timer()
send_tensor = torch.from_numpy(filt_data.astype(np.int64))
dist.send(send_tensor, dst=part_id)
end = timer()
print('Rank: ', rank, ' Sent data size: ', filt_data.shape, \
', to Process: ', part_id, 'in: ', timedelta(seconds = end - start))
def send_edge_data(rank, edge_data, part_ids):
"""
Function to send edge data to non-rank-0 processes
Parameters:
-----------
rank : integer
rank of the process
edge_data : numpy ndarray
edge_data, in the augmented form
part_ids : python list
list of unique ranks/partition-ids
"""
for part_id in part_ids:
if part_id == rank:
continue
#extract global_src_id, global_dst_id, global_type_eid, etype_id
send_data = (edge_data[constants.OWNER_PROCESS] == part_id)
idx = send_data.reshape(edge_data[constants.GLOBAL_SRC_ID].shape[0])
filt_data = np.column_stack((edge_data[constants.GLOBAL_SRC_ID][idx == 1], \
edge_data[constants.GLOBAL_DST_ID][idx == 1], \
edge_data[constants.GLOBAL_TYPE_EID][idx == 1], \
edge_data[constants.ETYPE_ID][idx == 1]))
#send shape
send_size = filt_data.shape
size_tensor = torch.tensor(filt_data.shape, dtype=torch.int64)
# Send size first, so that the receiving process can create an appropriately sized tensor
dist.send(size_tensor, dst=part_id)
start = timer()
send_tensor = torch.from_numpy(filt_data)
dist.send(send_tensor, dst=part_id)
end = timer()
print('Rank: ', rank, ' Time to send Edges to proc: ', part_id, \
' is : ', timedelta(seconds = end - start))
def send_node_features(rank, node_data, node_features, part_ids, ntype_map):
"""
Function to send node_features data to non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
node_data : numpy ndarray of int64
node_data, read from the xxx_nodes.txt file
node_features : numpy ndarray of floats
node_features, data from the node_feats.dgl
part_ids : list
list of unique ranks/partition-ids
ntype_map : dictionary
mappings between ntype_name -> ntype
"""
node_features_out = []
for part_id in part_ids:
if part_id == rank:
node_features_out.append(None)
continue
part_node_features = {}
for ntype_name, ntype in ntype_map.items():
if (ntype_name +'/feat' in node_features) and (node_features[ntype_name+'/feat'].shape[0] > 0):
#extract orig_type_node_id
idx = (node_data[constants.OWNER_PROCESS] == part_id) & (node_data[constants.NTYPE_ID] == ntype)
filt_global_type_nids = node_data[constants.GLOBAL_TYPE_NID][idx] # extract global_ntype_id here
part_node_features[ntype_name+'/feat'] = node_features[ntype_name+'/feat'][filt_global_type_nids]
#accumulate the subset of node_features targeted for the part_id rank
node_features_out.append(part_node_features)
#send data
output_list = [None]
start = timer ()
dist.scatter_object_list(output_list, node_features_out, src=0)
end = timer ()
print('Rank: ', rank, ', Done sending Node Features to: ', part_id, \
' in: ', timedelta(seconds = end - start))
def send_data(rank, node_data, node_features, edge_data, node_part_ids, ntypes_map):
"""
Wrapper function to send graph data to non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
node_data : numpy ndarray
node_data, augmented, from xxx_nodes.txt file
node_features : numpy ndarray
node_features, data from the node_feats.dgl
edge_data : numpy ndarray
edge_data, augmented, from xxx_edges.txt file
node_part_ids : ndarray
array of part_ids indexed by global_nid
ntype_map : dictionary
mappings between ntype_name -> ntype_id
"""
part_ids = np.unique(node_part_ids)
part_ids.sort ()
print('Rank: ', rank, ', Unique partitions: ', part_ids)
send_node_data(rank, node_data, part_ids)
send_edge_data(rank, edge_data, part_ids)
send_node_features(rank, node_data, node_features, part_ids, ntypes_map)
def recv_data(rank, shape, dtype):
"""
Auxiliary function to receive a multi-dimensional tensor, used by the
non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
shape : tuple of integers
shape of the received data
dtype : integer
type of the received data
Returns:
--------
numpy array
received data after completing 'recv' gloo primitive.
"""
#First receive the size of the data to be received from rank-0 process
recv_tensor_shape = torch.zeros(shape, dtype=torch.int64)
dist.recv(recv_tensor_shape, src=0)
recv_shape = list(map(lambda x: int(x), recv_tensor_shape))
#Receive the data message here for nodes here.
recv_tensor_data = torch.zeros(recv_shape, dtype=dtype)
dist.recv(recv_tensor_data, src=0)
return recv_tensor_data.numpy()
def recv_node_data(rank, shape, dtype):
"""
Function to receive node_data, used by non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
shape : tuple of integers
shape of the received data
dtype : integer
type of the received data
Returns:
--------
numpy array
result of the 'recv' gloo primitive
"""
return recv_data(rank, shape, dtype)
def recv_edge_data(rank, shape, dtype):
"""
Function to receive edge_data, used by non-rank0 processes.
Parameters:
-----------
rank : integer
rank of the process
shape : tuple of integers
shape of the received data
dtype : integer
type of the received data
Returns:
--------
numpy array
result of the 'recv' operation
"""
return recv_data(rank, shape, dtype)
def recv_node_features_obj(rank, world_size):
"""
Function to receive node_features as an object, as read from the node_feats.dgl file.
This is used by non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
no. of processes used
Returns:
--------
numpy ndarray
node_feature data, of floats, as received by the scatter_object_list function
"""
send_objs = [None for _ in range(world_size)]
recv_obj = [None]
dist.scatter_object_list(recv_obj, send_objs, src=0)
node_features = recv_obj[0]
return node_features
def read_graph_files(rank, params, node_part_ids):
"""
Read the files and return the data structures
Node data as read from files, which is in the following format:
<node_type> <weight1> <weight2> <weight3> <weight4> <global_type_nid> <attributes>
is converted to
<node_type> <weight1> <weight2> <weight3> <weight4> <global_type_nid> <nid> <recv_proc>
Edge data as read from files, which is in the following format:
<global_src_id> <global_dst_id> <global_type_eid> <edge_type> <attributes>
is converted to the following format in this function:
<global_src_id> <global_dst_id> <global_type_eid> <edge_type> <recv_proc>
Parameters:
-----------
rank : integer
rank of the process
params : argparser object
argument parser data structure to access command line arguments
node_part_ids : numpy array
array of part_ids indexed by global_nid
Returns:
--------
dictionary
node_data (int64 columns) with additional columns added; keys are column names and values are numpy arrays
dictionary
node_features as read from the node features file
dictionary
edge_data (int64 columns) with additional columns added; keys are column names and values are numpy arrays
dictionary
edge_features as read from the edge features file (currently left empty)
"""
node_data = read_nodes_file(params.input_dir+'/'+params.nodes_file)
augment_node_data(node_data, node_part_ids)
print('Rank: ', rank, ', Completed loading nodes data: ', node_data[constants.GLOBAL_TYPE_NID].shape)
edge_data = read_edges_file(params.input_dir+'/'+params.edges_file, None)
print('Rank: ', rank, ', Completed loading edge data: ', edge_data[constants.GLOBAL_SRC_ID].shape)
edge_data = read_edges_file(params.input_dir+'/'+params.removed_edges, edge_data)
augment_edge_data(edge_data, node_part_ids)
print('Rank: ', rank, ', Completed adding removed edges : ', edge_data[constants.GLOBAL_SRC_ID].shape)
node_features = {}
node_features = read_node_features_file(params.input_dir+'/'+params.node_feats_file)
print('Rank: ', rank, ', Completed loading node features from file: ', len(node_features))
edge_features = {}
#edge_features = read_edge_features_file( params.input_dir+'/'+params.edge_feats_file )
#print( 'Rank: ', rank, ', Completed edge features reading from file ', len(edge_features) )
return node_data, node_features, edge_data, edge_features
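#Illustrative example of the returned structures for a toy graph (values are made up):
#node_data and edge_data are dictionaries keyed by the constants defined in constants.py, e.g.
#
#   node_data[constants.NTYPE_ID]        -> np.array([0, 0, 1, ...])
#   node_data[constants.GLOBAL_TYPE_NID] -> np.array([0, 1, 0, ...])
#   node_data[constants.GLOBAL_NID]      -> np.array([0, 1, 2, ...])  (added by augment_node_data)
#   node_data[constants.OWNER_PROCESS]   -> np.array([0, 1, 0, ...])  (added by augment_node_data)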
def proc_exec(rank, world_size, params):
"""
`main` function for each rank in the distributed implementation.
This function is used when a single machine executes the entire pipeline; in that case
all the gloo processes run on the same machine. It also expects the graph input to be in
single-file format: nodes, edges, node features and edge features each have their own
file describing the corresponding part of the input graph.
Parameters:
-----------
rank : integer
rank of the current process
world_size : integer
total no. of ranks
params : argparser object
argument parser structure to access values passed from command line
"""
#Read METIS partitions
node_part_ids = read_partitions_file(params.input_dir+'/'+params.partitions_file)
print('Rank: ', rank, ', Completed loading metis partitions: ', len(node_part_ids))
#read graph schema, get ntype_map(dict for ntype to ntype-id lookups) and ntypes list
schema_map = read_json(params.input_dir+'/'+params.schema)
ntypes_map, ntypes = get_node_types(schema_map)
# The rank-0 process reads the graph input files (nodes, edges, node features and edge features).
# It uses the metis partitions (node-id to partition-id mappings) to determine node and edge
# ownership and sends the data out to all the non-rank-0 processes.
if rank == 0:
#read input graph files
node_data, node_features, edge_data, edge_features = read_graph_files(rank, params, node_part_ids)
# order node_data by node_type before extracting node features.
# once this is ordered, node_features are automatically ordered and
# can be assigned contiguous ids starting from 0 for each type.
#node_data = node_data[node_data[:, 0].argsort()]
sorted_idx = node_data[constants.NTYPE_ID].argsort()
for k, v in node_data.items():
node_data[k] = v[sorted_idx]
print('Rank: ', rank, ', node_data: ', len(node_data))
print('Rank: ', rank, ', node_features: ', len(node_features))
print('Rank: ', rank, ', edge_data: ', len(edge_data))
#print('Rank: ', rank, ', edge_features : ',len( edge_features))
print('Rank: ', rank, ', partitions : ', len(node_part_ids))
# shuffle data
send_data(rank, node_data, node_features, edge_data, node_part_ids, ntypes_map)
#extract features here for rank-0
for name, ntype_id in ntypes_map.items():
ntype = name + '/feat'
if ntype in node_features:
idx = node_data[constants.GLOBAL_TYPE_NID][(node_data[constants.NTYPE_ID] == ntype_id) & (node_data[constants.OWNER_PROCESS] == rank)]
node_features[ntype] = node_features[ntype][idx]
# Filter data owned by rank-0
#extract only ntype, global_type_nid, global_nid
idx = np.where(node_data[constants.OWNER_PROCESS] == 0)[0]
for k, v in node_data.items():
node_data[k] = v[idx]
#extract only global_src_id, global_dst_id, global_type_eid etype
idx = np.where(edge_data[constants.OWNER_PROCESS] == 0)[0]
for k, v in edge_data.items():
edge_data[k] = v[idx]
else:
#Non-rank-0 processes receive nodes, edges, node-features and edge-features from the rank-0
# process and create the appropriate data structures.
rcvd_node_data = recv_node_data(rank, 2, torch.int64)
node_data = {}
node_data[constants.NTYPE_ID] = rcvd_node_data[:,0]
node_data[constants.GLOBAL_TYPE_NID] = rcvd_node_data[:,1]
node_data[constants.GLOBAL_NID] = rcvd_node_data[:,2]
rcvd_edge_data = recv_edge_data(rank, 2, torch.int64)
edge_data = {}
edge_data[constants.GLOBAL_SRC_ID] = rcvd_edge_data[:,0]
edge_data[constants.GLOBAL_DST_ID] = rcvd_edge_data[:,1]
edge_data[constants.GLOBAL_TYPE_EID] = rcvd_edge_data[:,2]
edge_data[constants.ETYPE_ID] = rcvd_edge_data[:,3]
node_features = recv_node_features_obj(rank, world_size)
edge_features = {}
# From this point onwards, all the processes will follow the same execution logic and
# process the data which is owned by the current process.
# At this time, all the processes will have all the data which it owns (nodes, edges,
# node-features and edge-features).
#synchronize
dist.barrier()
# assign shuffle_global ids to nodes
assign_shuffle_global_nids_nodes(rank, world_size, node_data)
print('Rank: ', rank, ' Done assigning global ids to nodes...')
#sort edge_data by etype
sorted_idx = edge_data[constants.ETYPE_ID].argsort()
for k, v in edge_data.items():
edge_data[k] = v[sorted_idx]
# assign shuffle_global ids to edges
shuffle_global_eid_start = assign_shuffle_global_nids_edges(rank, world_size, edge_data)
print('Rank: ', rank, ' Done assigning global ids to edges...')
# resolve shuffle_global ids for nodes which are not locally owned
get_shuffle_global_nids_edges(rank, world_size, edge_data, node_part_ids, node_data)
print('Rank: ', rank, ' Done retrieving Global Node Ids for non-local nodes... ')
#create dgl objects
print('Rank: ', rank, ' Creating DGL objects for all partitions')
num_nodes = 0
num_edges = shuffle_global_eid_start
with open('{}/{}'.format(params.input_dir, params.schema)) as json_file:
schema = json.load(json_file)
graph_obj, ntypes_map_val, etypes_map_val, ntypes_map, etypes_map = create_dgl_object(\
params.graph_name, params.num_parts, \
schema, rank, node_data, edge_data, num_nodes, num_edges)
write_dgl_objects(graph_obj, node_features, edge_features, params.output, rank)
#get the meta-data
json_metadata = create_metadata_json(params.graph_name, num_nodes, num_edges, params.num_parts, ntypes_map_val, \
etypes_map_val, ntypes_map, etypes_map, params.output)
if rank == 0:
#get meta-data from all partitions and merge them on rank-0
metadata_list = gather_metadata_json(json_metadata, rank, world_size)
metadata_list[0] = json_metadata
write_metadata_json(metadata_list, params.output, params.graph_name)
else:
#send meta-data to Rank-0 process
gather_metadata_json(json_metadata, rank, world_size)
def single_dev_init(rank, world_size, func_exec, params, backend="gloo"):
"""
Init. function which is run by each process in the Gloo ProcessGroup
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
number of processes configured in the Process Group
func_exec : function
function that will be invoked and contains the logic executed by each process in the group
params : argparser object
argument parser object to access the command line arguments
backend : string
string specifying the type of backend to use for communication
"""
os.environ["MASTER_ADDR"] = '127.0.0.1'
os.environ["MASTER_PORT"] = '29500'
#create Gloo Process Group
dist.init_process_group(backend, rank=rank, world_size=world_size)
#Invoke the main function to kick-off each process
func_exec(rank, world_size, params)
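#Illustrative usage sketch (not part of this module): single_dev_init is intended to be
#launched once per rank, e.g. via torch.multiprocessing.spawn; `args` is assumed to be
#the parsed argparse namespace and `num_parts` the number of partitions/processes.
#
#   import torch.multiprocessing as mp
#   mp.spawn(single_dev_init, args=(num_parts, proc_exec, args), nprocs=num_parts)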
def multi_dev_init(params):
"""
Function to be invoked when executing the data loading pipeline on multiple machines
Parameters:
-----------
params : argparser object
argparser object providing access to command line arguments.
"""
#init the gloo process group here.
dist.init_process_group("gloo", rank=params.rank, world_size=params.world_size)
print('[Rank: ', params.rank, '] Done with process group initialization...')
#invoke the main function here.
proc_exec(params.rank, params.world_size, params)
print('[Rank: ', params.rank, '] Done with the distributed data processing pipeline.')
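#Illustrative launch sketch (not part of this module): init_process_group with the default
#env:// initialization expects MASTER_ADDR and MASTER_PORT to be set in the environment of
#every participating process; host, port, script name and flag names below are placeholders.
#
#   MASTER_ADDR=<rank-0-host> MASTER_PORT=29500 python <this_script.py> --rank 0 --world-size 4 ...
#   MASTER_ADDR=<rank-0-host> MASTER_PORT=29500 python <this_script.py> --rank 1 --world-size 4 ...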
import os
import torch
import numpy as np
import json
import dgl
import constants
def read_partitions_file(part_file):
"""
Utility method to read metis partitions, which is the output of
pm_dglpart2
Parameters:
-----------
part_file : string
file name which is the output of metis partitioning
algorithm (pm_dglpart2, in the METIS installation).
This function expects each line in `part_file` to be formatted as
<global_nid> <part_id>
and the contents of this file are sorted by <global_nid>.
Returns:
--------
numpy array
array of part_ids and the idx is the <global_nid>
"""
partitions_map = np.loadtxt(part_file, delimiter=' ', dtype=np.int64)
#as a precaution sort the lines based on the <global_nid>
partitions_map = partitions_map[partitions_map[:,0].argsort()]
return partitions_map[:,1]
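#Illustrative example: a part_file containing
#
#   0 1
#   1 0
#   2 1
#
#yields np.array([1, 0, 1]); global_nid 0 and 2 belong to partition 1 and global_nid 1
#belongs to partition 0.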
def read_json(json_file):
"""
Utility method to read a json schema file
Parameters:
-----------
json_file : string
file name for the json schema
Returns:
--------
dictionary, as serialized in the json_file
"""
with open(json_file) as schema:
val = json.load(schema)
return val
def get_node_types(schema):
"""
Utility method to extract node_typename -> node_type mappings
as defined by the input schema
Parameters:
-----------
schema : dictionary
Input schema from which the node_typename -> node_type
dictionary is created.
Returns:
--------
dictionary, list
dictionary with ntype <-> type_nid mappings
list of ntype strings
"""
#Get the node_id ranges from the schema
global_nid_ranges = schema['nid']
global_nid_ranges = {key: np.array(global_nid_ranges[key]).reshape(1,2)
for key in global_nid_ranges}
#Create an array with the starting id for each node_type and sort
ntypes = [(key, global_nid_ranges[key][0,0]) for key in global_nid_ranges]
ntypes.sort(key=lambda e: e[1])
#Create node_typename -> node_type dictionary
ntypes = [e[0] for e in ntypes]
ntypes_map = {e: i for i, e in enumerate(ntypes)}
return ntypes_map, ntypes
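#Illustrative example: with schema['nid'] equal to
#   {'paper': [0, 100], 'author': [100, 250]}
#this function returns ({'paper': 0, 'author': 1}, ['paper', 'author']), because node
#types are numbered in the order of their starting global node id.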
def write_metadata_json(metadata_list, output_dir, graph_name):
"""
Merge json schemas from each of the ranks on rank-0.
This utility function is used on rank-0 to create the aggregated json file.
Parameters:
-----------
metadata_list : list of json (dictionaries)
a list of json dictionaries to merge on rank-0
output_dir : string
output directory path in which results are stored (as a json file)
graph_name : string
a string specifying the graph name
"""
#Initialize global metadata
graph_metadata = {}
#Merge global_edge_ids from each json object in the input list
edge_map = {}
x = metadata_list[0]["edge_map"]
for k in x:
edge_map[k] = []
for idx in range(len(metadata_list)):
edge_map[k].append(metadata_list[idx]["edge_map"][k][0])
graph_metadata["edge_map"] = edge_map
graph_metadata["etypes"] = metadata_list[0]["etypes"]
graph_metadata["graph_name"] = metadata_list[0]["graph_name"]
graph_metadata["halo_hops"] = metadata_list[0]["halo_hops"]
#Merge global_nodeids from each of json object in the input list
node_map = {}
x = metadata_list[0]["node_map"]
for k in x:
node_map[k] = []
for idx in range(len(metadata_list)):
node_map[k].append(metadata_list[idx]["node_map"][k][0])
graph_metadata["node_map"] = node_map
graph_metadata["ntypes"] = metadata_list[0]["ntypes"]
graph_metadata["num_edges"] = sum([metadata_list[i]["num_edges"] for i in range(len(metadata_list))])
graph_metadata["num_nodes"] = sum([metadata_list[i]["num_nodes"] for i in range(len(metadata_list))])
graph_metadata["num_parts"] = metadata_list[0]["num_parts"]
graph_metadata["part_method"] = metadata_list[0]["part_method"]
for i in range(len(metadata_list)):
graph_metadata["part-{}".format(i)] = metadata_list[i]["part-{}".format(i)]
with open('{}/{}.json'.format(output_dir, graph_name), 'w') as outfile:
json.dump(graph_metadata, outfile, sort_keys=True, indent=4)
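#Illustrative example of the merge (assuming each per-rank metadata dictionary holds a
#single id range per type): if part-0 reports node_map = {'paper': [[0, 50]]} and part-1
#reports node_map = {'paper': [[50, 100]]}, the merged metadata contains
#node_map = {'paper': [[0, 50], [50, 100]]}, i.e. one shuffle-global id range per
#partition; edge_map is merged the same way and num_nodes/num_edges are summed.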
def augment_edge_data(edge_data, part_ids):
"""
Add partition-id (rank which owns an edge) column to the edge_data.
Parameters:
-----------
edge_data : dictionary
Edge information as read from the xxx_edges.txt file; keys are column names and values are numpy arrays
part_ids : numpy array
array of part_ids indexed by global_nid
"""
edge_data[constants.OWNER_PROCESS] = part_ids[edge_data[constants.GLOBAL_DST_ID]]
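#Illustrative example: with part_ids = np.array([1, 0, 1]) and
#edge_data[constants.GLOBAL_DST_ID] = np.array([0, 2, 1]), the owner column becomes
#np.array([1, 1, 0]); each edge is owned by the partition of its destination node.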
def augment_node_data(node_data, part_ids):
"""
Utility function to add auxiliary columns to the node_data dictionary.
Parameters:
-----------
node_data : dictionary
Node information as read from the xxx_nodes.txt file; keys are column names and values are numpy arrays
part_ids : numpy array
array of part_ids indexed by global_nid
"""
#add global_nids to the node_data
global_nids = np.arange(len(node_data[constants.GLOBAL_TYPE_NID]), dtype=np.int64)
node_data[constants.GLOBAL_NID] = global_nids
#add owner proc_ids to the node_data
proc_ids = part_ids[node_data[constants.GLOBAL_NID]]
node_data[constants.OWNER_PROCESS] = proc_ids
def read_nodes_file(nodes_file):
"""
Utility function to read xxx_nodes.txt file
Parameters:
-----------
nodes_file : string
Graph file for nodes in the input graph
Returns:
--------
dictionary
Nodes data stored in dictionary where keys are column names
and values are the columns from the numpy ndarray as read from the
xxx_nodes.txt file
"""
if nodes_file == "" or nodes_file == None:
return None
# Read the file here.
# The nodes file is a whitespace-separated text file loaded with numpy.loadtxt,
# with each line in the following format:
# <node_type> <weight1> <weight2> <weight3> <weight4> <global_type_nid> <attributes>
# For the ogb-mag dataset, nodes.txt follows the above format.
nodes_data = np.loadtxt(nodes_file, delimiter=' ', dtype='int64')
nodes_datadict = {}
nodes_datadict[constants.NTYPE_ID] = nodes_data[:,0]
nodes_datadict[constants.GLOBAL_TYPE_NID] = nodes_data[:,5]
return nodes_datadict
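#Illustrative example: a line such as
#   0 1 0 0 0 7
#is parsed as ntype_id = 0 (column 0) and global_type_nid = 7 (column 5); the weight
#columns in between are ignored by this function.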
def read_edges_file(edge_file, edge_data_dict):
"""
Utility function to read xxx_edges.txt file
Parameters:
-----------
edge_file : string
Graph file for edges in the input graph
edge_data_dict : dictionary
existing edge data dictionary to which the newly read edges are appended; pass None to create a new dictionary
Returns:
--------
dictionary
edge data as read from xxx_edges.txt file and columns are stored
in a dictionary with key-value pairs as column-names and column-data.
"""
if edge_file == "" or edge_file == None:
return None
#Read the file from here.
#<global_src_id> <global_dst_id> <type_eid> <etype> <attributes>
# global_src_id -- global idx for the source node ... line # in the graph_nodes.txt
# global_dst_id -- global idx for the destination id node ... line # in the graph_nodes.txt
edge_data = np.loadtxt(edge_file, delimiter=' ', dtype='int64')
if edge_data_dict is None:
edge_data_dict = {}
edge_data_dict[constants.GLOBAL_SRC_ID] = edge_data[:,0]
edge_data_dict[constants.GLOBAL_DST_ID] = edge_data[:,1]
edge_data_dict[constants.GLOBAL_TYPE_EID] = edge_data[:,2]
edge_data_dict[constants.ETYPE_ID] = edge_data[:,3]
else:
edge_data_dict[constants.GLOBAL_SRC_ID] = \
np.concatenate((edge_data_dict[constants.GLOBAL_SRC_ID], edge_data[:,0]))
edge_data_dict[constants.GLOBAL_DST_ID] = \
np.concatenate((edge_data_dict[constants.GLOBAL_DST_ID], edge_data[:,1]))
edge_data_dict[constants.GLOBAL_TYPE_EID] = \
np.concatenate((edge_data_dict[constants.GLOBAL_TYPE_EID], edge_data[:,2]))
edge_data_dict[constants.ETYPE_ID] = \
np.concatenate((edge_data_dict[constants.ETYPE_ID], edge_data[:,3]))
return edge_data_dict
def read_node_features_file(nodes_features_file):
"""
Utility function to load tensors from a file
Parameters:
-----------
nodes_features_file : string
Features file for nodes in the graph
Returns:
--------
dictionary
mappings between ntype and list of features
"""
node_features = dgl.data.utils.load_tensors(nodes_features_file, False)
return node_features
def read_edge_features_file(edge_features_file):
"""
Utility function to load tensors from a file
Parameters:
-----------
edge_features_file : string
Features file for edges in the graph
Returns:
--------
dictionary
mappings between etype and list of features
"""
edge_features = dgl.data.utils.load_tensors(edge_features_file, True)
return edge_features
def write_node_features(node_features, node_file):
"""
Utility function to serialize node_features to node_file
Parameters:
-----------
node_features : dictionary
dictionary storing ntype <-> list of features
node_file : string
File in which the node information is serialized
"""
dgl.data.utils.save_tensors(node_file, node_features)
def write_edge_features(edge_features, edge_file):
"""
Utility function to serialize edge_features to edge_file
Parameters:
-----------
edge_features : dictionary
dictionary storing etype <-> list of features
edge_file : string
File in which the edge information is serialized
"""
dgl.data.utils.save_tensors(edge_file, edge_features)
def write_graph_dgl(graph_file, graph_obj):
"""
Utility function to serialize graph dgl objects
Parameters:
-----------
graph_obj : dgl graph object
graph dgl object, as created in convert_partition.py, which is to be serialized
graph_file : string
File name in which graph object is serialized
"""
dgl.save_graphs(graph_file, [graph_obj])
def write_dgl_objects(graph_obj, node_features, edge_features, output_dir, part_id):
"""
Wrapper function to write the dgl objects (graph, node features and edge features) to disk
Parameters:
-----------
graph_obj : dgl object
graph dgl object as created in convert_partition.py file
node_features : dgl object
Tensor data for node features
edge_features : dgl object
Tensor data for edge features
output_dir : string
output directory in which the per-partition sub-directories are created
part_id : integer
partition id of the partition being written
part_dir = output_dir + '/part' + str(part_id)
os.makedirs(part_dir, exist_ok=True)
write_graph_dgl(os.path.join(part_dir, 'part' + str(part_id)), graph_obj)
if node_features is not None:
write_node_features(node_features, os.path.join(part_dir, "node_feat.dgl"))
if edge_features is not None:
write_edge_features(edge_features, os.path.join(part_dir, "edge_feat.dgl"))
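#The on-disk layout produced by write_dgl_objects under output_dir (illustrative, for
#part_id = 0):
#
#   <output_dir>/part0/part0            (serialized DGLGraph)
#   <output_dir>/part0/node_feat.dgl    (node feature tensors, if present)
#   <output_dir>/part0/edge_feat.dgl    (edge feature tensors, if present)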