Unverified Commit 9948ef4d authored by kylasa, committed by GitHub

Added code to support multiple-file-support feature and removed singl… (#4188)

* Added code to support the multiple-file-format feature and removed the single-file-format code

1. Added code to read the dataset in the multiple-file format
2. Removed the code for the single-file format

* Added files missing in the previous commit

This commit includes dataset_utils.py, which reads the dataset in the multiple-file format, gloo_wrapper function calls that support exchanging dictionaries as objects, and helper functions in utils.py.

* Update convert_partition.py

Updated the "create_metadata_json" function call to include part_id so that each rank creates only its own metadata object; these are later gathered on rank-0 to create the graph-level metadata json file.

* Addressing code review comments received during the CI process

Code changes resulting from the code review comments received during the CI process.

* Code reorganization

Addressing CI comments and reorganizing the code for easier understanding.

* Removed commented-out line

Removed a commented-out line.
parent b7187dd3
......@@ -130,7 +130,6 @@ def create_dgl_object(graph_name, num_parts, \
assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range.
part_local_src_id, part_local_dst_id = np.split(inverse_idx[:len(shuffle_global_src_id) * 2], 2)
compact_g = dgl.graph(data=(part_local_src_id, part_local_dst_id), num_nodes=len(idx))
compact_g.edata['orig_id'] = th.as_tensor(global_edge_id)
compact_g.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
......@@ -185,7 +184,7 @@ def create_dgl_object(graph_name, num_parts, \
return compact_g2, node_map_val, edge_map_val, ntypes_map, etypes_map
def create_metadata_json(graph_name, num_nodes, num_edges, num_parts, node_map_val, \
def create_metadata_json(graph_name, num_nodes, num_edges, part_id, num_parts, node_map_val, \
edge_map_val, ntypes_map, etypes_map, output_dir ):
"""
Auxiliary function to create json file for the graph partition metadata
......@@ -198,6 +197,8 @@ def create_metadata_json(graph_name, num_nodes, num_edges, num_parts, node_map_v
no. of nodes in the graph partition
num_edges : int
no. of edges in the graph partition
part_id : int
integer indicating the partition id
num_parts : int
total no. of partitions of the original graph
node_map_val : dictionary
......@@ -228,7 +229,6 @@ def create_metadata_json(graph_name, num_nodes, num_edges, num_parts, node_map_v
'ntypes': ntypes_map,
'etypes': etypes_map}
for part_id in range(num_parts):
part_dir = 'part' + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
......
import argparse
import numpy as np
import torch.multiprocessing as mp
from initialize import proc_exec, single_dev_init, multi_dev_init
from data_shuffle import single_machine_run, multi_machine_run
def log_params(params):
""" Print all the command line arguments for debugging purposes.
......@@ -29,29 +29,6 @@ def log_params(params):
print('Metis partitions: ', params.partitions_file)
print('Exec Type: ', params.exec_type)
def start_local_run(params):
""" Main function for distributed implementation on a single machine
Parameters:
-----------
params : argparser object
Argument Parser structure with pre-determined arguments as defined
at the bottom of this file.
"""
log_params(params)
processes = []
mp.set_start_method("spawn")
#Invoke `target` function from each of the spawned process for distributed
#implementation
for rank in range(params.world_size):
p = mp.Process(target=single_dev_init, args=(rank, params.world_size, proc_exec, params))
p.start()
processes.append(p)
for p in processes:
p.join()
if __name__ == "__main__":
"""
Start of execution from this point.
......@@ -99,6 +76,6 @@ if __name__ == "__main__":
#invoke the starting function here.
if(params.exec_type == 0):
start_local_run(params)
single_machine_run(params)
else:
multi_dev_init(params)
multi_machine_run(params)
import os
import numpy as np
import constants
import torch
def get_dataset(input_dir, graph_name, rank, num_node_weights):
"""
Function to read the dataset stored in the multiple-file format.
Parameters:
-----------
input_dir : string
root directory where dataset is located.
graph_name : string
graph name string
rank : int
rank of the current process
num_node_weights : int
no. of weight columns attributed to each node in the nodes file
Return:
-------
dictionary
Node data read from the xxx_nodes.txt file, returned as a dictionary with keys as column names
and values as the corresponding columns.
dictionary
Node feature data read from the numpy feature files of this dataset, returned as a dictionary
with keys as node feature names and values as tensors holding the node features.
dictionary
Edge data read from the xxx_edges.txt file, returned as a dictionary with keys as column names
and values as the corresponding columns.
"""
#node features dictionary
node_features = {}
#iterate over the sub-dirs and extract the nodetypes
#in each nodetype folder read all the features assigned to
#current rank
siblings = os.listdir(input_dir)
for s in siblings:
if s.startswith("nodes-"):
tokens = s.split("-")
ntype = tokens[1]
num_feats = tokens[2]
for idx in range(int(num_feats)):
feat_file = s +'/node-feat-'+'{:02d}'.format(idx) +'/'+ str(rank)+'.npy'
if (os.path.exists(input_dir+'/'+feat_file)):
features = np.load(input_dir+'/'+feat_file)
node_features[ntype+'/feat'] = torch.tensor(features)
#done building node_features locally.
if len(node_features) <= 0:
print('[Rank: ', rank, '] This dataset does not have any node features')
else:
for k, v in node_features.items():
print('[Rank: ', rank, '] node feature name: ', k, ', feature data shape: ', v.size())
#read (split) xxx_nodes.txt file
node_file = input_dir+'/'+graph_name+'_nodes'+'_{:02d}.txt'.format(rank)
node_data = np.loadtxt(node_file, delimiter=' ', dtype='int64')
nodes_datadict = {}
nodes_datadict[constants.NTYPE_ID] = node_data[:,0]
#column layout: <ntype_id>, then num_node_weights weight columns, then <global_type_nid>
type_idx = num_node_weights + 1
nodes_datadict[constants.GLOBAL_TYPE_NID] = node_data[:,type_idx]
print('[Rank: ', rank, '] Done reading node_data: ', len(nodes_datadict), nodes_datadict[constants.NTYPE_ID].shape)
#read (split) xxx_edges.txt file
edge_datadict = {}
edge_file = input_dir+'/'+graph_name+'_edges'+'_{:02d}.txt'.format(rank)
edge_data = np.loadtxt(edge_file, delimiter=' ', dtype='int64')
edge_datadict[constants.GLOBAL_SRC_ID] = edge_data[:,0]
edge_datadict[constants.GLOBAL_DST_ID] = edge_data[:,1]
edge_datadict[constants.GLOBAL_TYPE_EID] = edge_data[:,2]
edge_datadict[constants.ETYPE_ID] = edge_data[:,3]
print('[Rank: ', rank, '] Done reading edge_file: ', len(edge_datadict), edge_datadict[constants.GLOBAL_SRC_ID].shape)
return nodes_datadict, node_features, edge_datadict
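# A minimal usage sketch of get_dataset: the input directory, graph name and the
# number of node weight columns below are hypothetical; the expected on-disk
# layout (nodes-<ntype>-<num_feats>/node-feat-<idx>/<rank>.npy plus
# <graph>_nodes_<rank>.txt and <graph>_edges_<rank>.txt) mirrors the parsing above.
def _example_read_rank_shard(rank):
    node_data, node_features, edge_data = get_dataset(
        input_dir='/path/to/dataset',   # hypothetical dataset root
        graph_name='my_graph',          # hypothetical graph name
        rank=rank,                      # rank of the calling process
        num_node_weights=4)             # hypothetical no. of weight columns
    # node_data / edge_data are dictionaries of int64 numpy arrays keyed by the
    # column constants; node_features maps '<ntype>/feat' to a torch tensor.
    return node_data, node_features, edge_data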
......@@ -31,7 +31,7 @@ def allgather_sizes(send_data, world_size):
dist.all_gather(in_tensor, out_tensor)
#gather sizes in on array to return to the invoking function
rank_sizes = np.zeros(world_size + 1)
rank_sizes = np.zeros(world_size + 1, dtype=np.int64)
count = rank_sizes[0]
for i, t in enumerate(in_tensor):
count += t.item()
......@@ -59,6 +59,40 @@ def alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list):
for i in range(world_size):
dist.scatter(output_tensor_list[i], input_tensor_list if i == rank else [], src=i)
def alltoall_cpu_object_lst(rank, world_size, input_list):
"""
Each process scatters its list of input objects to all processes in the cluster
and returns the gathered objects as an output list.
Parameters
----------
rank : int
The rank of the current worker
world_size : int
The total no. of processes in the process group
input_list : list of objects
The objects to scatter, one per destination rank
Returns
-------
list : list of objects received from other processes
This is the list of objects sent to the current process by the
other processes as part of this exchange
"""
rcv_list = []
output_list = [None] * world_size
for i in range(world_size):
rcv_list.clear()
rcv_list.append(None)
if (i == rank):
dist.scatter_object_list(rcv_list, input_list, src = rank)
else:
send_list = [None] * world_size
dist.scatter_object_list(rcv_list, send_list, src = i)
output_list[i] = rcv_list[0]
return output_list
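# A minimal sketch of exchanging per-rank dictionaries with alltoall_cpu_object_lst;
# the payload below is hypothetical and assumes the default (gloo) process group
# has already been initialized.
def _example_exchange_dicts(rank, world_size):
    #one object destined for every rank (including this one)
    send_objs = [{'from': rank, 'to': dst} for dst in range(world_size)]
    recv_objs = alltoall_cpu_object_lst(rank, world_size, send_objs)
    #recv_objs[i] holds the object that rank i addressed to this rank
    return recv_objs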
def alltoallv_cpu(rank, world_size, output_tensor_list, input_tensor_list):
"""
Each process scatters list of input tensors to all processes in a cluster
......
import os
import json
import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import constants
from timeit import default_timer as timer
from datetime import timedelta
from utils import augment_node_data, augment_edge_data,\
read_nodes_file, read_edges_file,\
read_node_features_file, read_edge_features_file,\
read_partitions_file, read_json,\
get_node_types, write_metadata_json, write_dgl_objects
from globalids import assign_shuffle_global_nids_nodes, assign_shuffle_global_nids_edges,\
get_shuffle_global_nids_edges
from gloo_wrapper import gather_metadata_json
from convert_partition import create_dgl_object, create_metadata_json
def send_node_data(rank, node_data, part_ids):
"""
Function to send node_data to non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
node_data : numpy ndarray
node_data, in the augmented form
part_ids : python list
list of unique ranks/partition-ids
"""
for part_id in part_ids:
if part_id == rank:
continue
#extract <node_type>, <global_type_nid>, <global_nid>
#which belong to `part_id`
send_data_idx = (node_data[constants.OWNER_PROCESS] == part_id)
idx = send_data_idx.reshape(node_data[constants.GLOBAL_NID].shape[0])
filt_data = np.column_stack((node_data[constants.NTYPE_ID], \
node_data[constants.GLOBAL_TYPE_NID], \
node_data[constants.GLOBAL_NID]))
filt_data = filt_data[idx == 1]
#prepare tensor to send
send_size = filt_data.shape
size_tensor = torch.tensor(filt_data.shape, dtype=torch.int64)
# Send size first, so that the part-id (rank)
# can create appropriately sized buffers
dist.send(size_tensor, dst=part_id)
#send actual node_data to part-id rank
start = timer()
send_tensor = torch.from_numpy(filt_data.astype(np.int64))
dist.send(send_tensor, dst=part_id)
end = timer()
print('Rank: ', rank, ' Sent data size: ', filt_data.shape, \
', to Process: ', part_id, 'in: ', timedelta(seconds = end - start))
def send_edge_data(rank, edge_data, part_ids):
"""
Function to send edge data to non-rank-0 processes
Parameters:
-----------
rank : integer
rank of the process
edge_data : numpy ndarray
edge_data, in the augmented form
part_ids : python list
list of unique ranks/partition-ids
"""
for part_id in part_ids:
if part_id == rank:
continue
#extract global_src_id, global_dst_id, global_type_eid, etype_id
send_data = (edge_data[constants.OWNER_PROCESS] == part_id)
idx = send_data.reshape(edge_data[constants.GLOBAL_SRC_ID].shape[0])
filt_data = np.column_stack((edge_data[constants.GLOBAL_SRC_ID][idx == 1], \
edge_data[constants.GLOBAL_DST_ID][idx == 1], \
edge_data[constants.GLOBAL_TYPE_EID][idx == 1], \
edge_data[constants.ETYPE_ID][idx == 1]))
#send shape
send_size = filt_data.shape
size_tensor = torch.tensor(filt_data.shape, dtype=torch.int64)
# Send size first, so that the receiving process can create an appropriately sized tensor
dist.send(size_tensor, dst=part_id)
start = timer()
send_tensor = torch.from_numpy(filt_data)
dist.send(send_tensor, dst=part_id)
end = timer()
print('Rank: ', rank, ' Time to send Edges to proc: ', part_id, \
' is : ', timedelta(seconds = end - start))
def send_node_features(rank, node_data, node_features, part_ids, ntype_map):
"""
Function to send node_features data to non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
node_data : numpy ndarray of int64
node_data, read from the xxx_nodes.txt file
node_features : numpy ndarray of floats
node_features, data from the node_feats.dgl
part_ids : list
list of unique ranks/partition-ids
ntype_map : dictionary
mappings between ntype_name -> ntype
"""
node_features_out = []
for part_id in part_ids:
if part_id == rank:
node_features_out.append(None)
continue
part_node_features = {}
for ntype_name, ntype in ntype_map.items():
if (ntype_name +'/feat' in node_features) and (node_features[ntype_name+'/feat'].shape[0] > 0):
#extract orig_type_node_id
idx = (node_data[constants.OWNER_PROCESS] == part_id) & (node_data[constants.NTYPE_ID] == ntype)
filt_global_type_nids = node_data[constants.GLOBAL_TYPE_NID][idx] # extract global_ntype_id here
part_node_features[ntype_name+'/feat'] = node_features[ntype_name+'/feat'][filt_global_type_nids]
#accumulate the subset of node_features targeted for the part-id rank
node_features_out.append(part_node_features)
#send data
output_list = [None]
start = timer()
dist.scatter_object_list(output_list, node_features_out, src=0)
end = timer()
print('Rank: ', rank, ', Done scattering node features to all ranks in: ', \
timedelta(seconds = end - start))
def send_data(rank, node_data, node_features, edge_data, node_part_ids, ntypes_map):
"""
Wrapper function to send graph data to non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
node_data : numpy ndarray
node_data, augmented, from xxx_nodes.txt file
node_features : numpy ndarray
node_features, data from the node_feats.dgl
edge_data : numpy ndarray
edge_data, augmented, from xxx_edges.txt file
node_part_ids : ndarray
array of part_ids indexed by global_nid
ntypes_map : dictionary
mappings between ntype_name -> ntype_id
"""
part_ids = np.unique(node_part_ids)
part_ids.sort()
print('Rank: ', rank, ', Unique partitions: ', part_ids)
send_node_data(rank, node_data, part_ids)
send_edge_data(rank, edge_data, part_ids)
send_node_features(rank, node_data, node_features, part_ids, ntypes_map)
def recv_data(rank, shape, dtype):
"""
Auxiliary function to receive a multi-dimensional tensor, used by the
non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
shape : integer
no. of dimensions of the data to be received (length of the size message)
dtype : torch dtype
data type of the tensor to be received
Returns:
--------
numpy array
received data after completing 'recv' gloo primitive.
"""
#First receive the size of the data to be received from rank-0 process
recv_tensor_shape = torch.zeros(shape, dtype=torch.int64)
dist.recv(recv_tensor_shape, src=0)
recv_shape = list(map(lambda x: int(x), recv_tensor_shape))
#Receive the data message here for nodes here.
recv_tensor_data = torch.zeros(recv_shape, dtype=dtype)
dist.recv(recv_tensor_data, src=0)
return recv_tensor_data.numpy()
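# A minimal sketch of the size-then-payload handshake used above: rank-0 first
# sends a shape tensor and then the data tensor (as in send_node_data), while a
# non-rank-0 process receives both via recv_data. The tensor contents and the
# destination rank are hypothetical.
def _example_size_then_payload(rank):
    if rank == 0:
        payload = torch.arange(12, dtype=torch.int64).reshape(4, 3)
        dist.send(torch.tensor(payload.shape, dtype=torch.int64), dst=1)  #shape first
        dist.send(payload, dst=1)                                         #then the payload
    elif rank == 1:
        #2 == no. of dimensions expected in the incoming data
        return recv_data(rank, 2, torch.int64)  #numpy array of shape (4, 3)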
def recv_node_data(rank, shape, dtype):
"""
Function to receive node_data, used by non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
shape : integer
no. of dimensions of the data to be received
dtype : torch dtype
data type of the tensor to be received
Returns:
--------
numpy array
result of the 'recv' gloo primitive
"""
return recv_data(rank, shape, dtype)
def recv_edge_data(rank, shape, dtype):
"""
Function to receive edge_data, used by non-rank0 processes.
Parameters:
-----------
rank : integer
rank of the process
shape : integer
no. of dimensions of the data to be received
dtype : torch dtype
data type of the tensor to be received
Returns:
--------
numpy array
result of the 'recv' operation
"""
return recv_data(rank, shape, dtype)
def recv_node_features_obj(rank, world_size):
"""
Function to receive node_features as an object, as read from the node_feats.dgl file.
This is used by non-rank-0 processes.
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
no. of processes used
Returns:
--------
dictionary
node feature data (tensors keyed by feature name), as received via the scatter_object_list function
"""
send_objs = [None for _ in range(world_size)]
recv_obj = [None]
dist.scatter_object_list(recv_obj, send_objs, src=0)
node_features = recv_obj[0]
return node_features
def read_graph_files(rank, params, node_part_ids):
"""
Read the files and return the data structures
Node data as read from files, which is in the following format:
<node_type> <weight1> <weight2> <weight3> <weight4> <global_type_nid> <attributes>
is converted to
<node_type> <weight1> <weight2> <weight3> <weight4> <global_type_nid> <nid> <recv_proc>
Edge data as read from files, which is in the following format:
<global_src_id> <global_dst_id> <global_type_eid> <edge_type> <attributes>
is converted to the following format in this function:
<global_src_id> <global_dst_id> <global_type_eid> <edge_type> <recv_proc>
Parameters:
-----------
rank : integer
rank of the process
params : argparser object
argument parser data structure to access command line arguments
node_part_ids : numpy array
array of part_ids indexed by global_nid
Returns:
--------
numpy ndarray
integer node_data with additional columns added
numpy ndarray
floats, node_features as read from the node features file
numpy ndarray
integer, edge_data with additional columns added
numpy ndarray
floats, edge_features are read from the edge feature file
"""
node_data = read_nodes_file(params.input_dir+'/'+params.nodes_file)
augment_node_data(node_data, node_part_ids)
print('Rank: ', rank, ', Completed loading nodes data: ', node_data[constants.GLOBAL_TYPE_NID].shape)
edge_data = read_edges_file(params.input_dir+'/'+params.edges_file, None)
print('Rank: ', rank, ', Completed loading edge data: ', edge_data[constants.GLOBAL_SRC_ID].shape)
edge_data = read_edges_file(params.input_dir+'/'+params.removed_edges, edge_data)
augment_edge_data(edge_data, node_part_ids)
print('Rank: ', rank, ', Completed adding removed edges : ', edge_data[constants.GLOBAL_SRC_ID].shape)
node_features = {}
node_features = read_node_features_file( params.input_dir+'/'+params.node_feats_file )
print('Rank: ', rank, ', Completed loading node features reading from file ', len(node_features))
edge_features = {}
#edge_features = read_edge_features_file( params.input_dir+'/'+params.edge_feats_file )
#print( 'Rank: ', rank, ', Completed edge features reading from file ', len(edge_features) )
return node_data, node_features, edge_data, edge_features
def proc_exec(rank, world_size, params):
"""
`main` function for each rank in the distributed implementation.
This function is used when a single machine executes the entire pipeline.
In this case, all the gloo processes exist on the same machine. This function also
expects the graph input files in the single-file format: nodes, edges, node-features and
edge-features each have their own file describing the corresponding parts of the input graph.
Parameters:
-----------
rank : integer
rank of the current process
world_size : integer
total no. of ranks
params : argparser object
argument parser structure to access values passed from command line
"""
#Read METIS partitions
node_part_ids = read_partitions_file(params.input_dir+'/'+params.partitions_file)
print('Rank: ', rank, ', Completed loading metis partitions: ', len(node_part_ids))
#read graph schema, get ntype_map(dict for ntype to ntype-id lookups) and ntypes list
schema_map = read_json(params.input_dir+'/'+params.schema)
ntypes_map, ntypes = get_node_types(schema_map)
# Rank-0 process will read the graph input files (nodes, edges, node-features and edge-features).
# It uses the metis partitions (node-id to partition-id mappings) to determine node and edge
# ownership and sends the data out to all the other non-rank-0 processes.
if rank == 0:
#read input graph files
node_data, node_features, edge_data, edge_features = read_graph_files(rank, params, node_part_ids)
# order node_data by node_type before extracting node features.
# once this is ordered, node_features are automatically ordered and
# can be assigned contiguous ids starting from 0 for each type.
#node_data = node_data[node_data[:, 0].argsort()]
sorted_idx = node_data[constants.NTYPE_ID].argsort()
for k, v in node_data.items():
node_data[k] = v[sorted_idx]
print('Rank: ', rank, ', node_data: ', len(node_data))
print('Rank: ', rank, ', node_features: ', len(node_features))
print('Rank: ', rank, ', edge_data: ', len(edge_data))
#print('Rank: ', rank, ', edge_features : ',len( edge_features))
print('Rank: ', rank, ', partitions : ', len(node_part_ids))
# shuffle data
send_data(rank, node_data, node_features, edge_data, node_part_ids, ntypes_map)
#extract features here for rank-0
for name, ntype_id in ntypes_map.items():
ntype = name + '/feat'
if(ntype in node_features):
idx = node_data[constants.GLOBAL_TYPE_NID][(node_data[constants.NTYPE_ID] == ntype_id) & (node_data[constants.OWNER_PROCESS] == rank)]
node_features[ntype] = node_features[ntype][idx]
# Filter data owned by rank-0
#extract only ntype, global_type_nid, global_nid
idx = np.where(node_data[constants.OWNER_PROCESS] == 0)[0]
for k, v in node_data.items():
node_data[k] = v[idx]
#extract only global_src_id, global_dst_id, global_type_eid etype
idx = np.where(edge_data[constants.OWNER_PROCESS] == 0)[0]
for k, v in edge_data.items():
edge_data[k] = v[idx]
else:
#Non-rank-0 processes receive nodes, edges, node-features and edge-features from the rank-0
# process and create the appropriate data structures.
rcvd_node_data = recv_node_data(rank, 2, torch.int64)
node_data = {}
node_data[constants.NTYPE_ID] = rcvd_node_data[:,0]
node_data[constants.GLOBAL_TYPE_NID] = rcvd_node_data[:,1]
node_data[constants.GLOBAL_NID] = rcvd_node_data[:,2]
rcvd_edge_data = recv_edge_data(rank, 2, torch.int64)
edge_data = {}
edge_data[constants.GLOBAL_SRC_ID] = rcvd_edge_data[:,0]
edge_data[constants.GLOBAL_DST_ID] = rcvd_edge_data[:,1]
edge_data[constants.GLOBAL_TYPE_EID] = rcvd_edge_data[:,2]
edge_data[constants.ETYPE_ID] = rcvd_edge_data[:,3]
node_features = recv_node_features_obj(rank, world_size)
edge_features = {}
# From this point onwards, all the processes will follow the same execution logic and
# process the data which is owned by the current process.
# At this time, all the processes will have all the data which it owns (nodes, edges,
# node-features and edge-features).
#synchronize
dist.barrier()
# assign shuffle_global ids to nodes
assign_shuffle_global_nids_nodes(rank, world_size, node_data)
print('Rank: ', rank, ' Done assigning shuffle-global ids to nodes...')
#sort edge_data by etype
sorted_idx = edge_data[constants.ETYPE_ID].argsort()
for k, v in edge_data.items():
edge_data[k] = v[sorted_idx]
# assign shuffle_global ids to edges
shuffle_global_eid_start = assign_shuffle_global_nids_edges(rank, world_size, edge_data)
print('Rank: ', rank, ' Done assigning shuffle-global ids to edges...')
# resolve shuffle_global ids for nodes which are not locally owned
get_shuffle_global_nids_edges(rank, world_size, edge_data, node_part_ids, node_data)
print('Rank: ', rank, ' Done retrieving Global Node Ids for non-local nodes... ')
#create dgl objects
print('Rank: ', rank, ' Creating DGL objects for all partitions')
num_nodes = 0
num_edges = shuffle_global_eid_start
with open('{}/{}'.format(params.input_dir, params.schema)) as json_file:
schema = json.load(json_file)
graph_obj, ntypes_map_val, etypes_map_val, ntypes_map, etypes_map = create_dgl_object(\
params.graph_name, params.num_parts, \
schema, rank, node_data, edge_data, num_nodes, num_edges)
write_dgl_objects(graph_obj, node_features, edge_features, params.output, rank)
#get the meta-data
json_metadata = create_metadata_json(params.graph_name, num_nodes, num_edges, rank, params.num_parts, ntypes_map_val, \
etypes_map_val, ntypes_map, etypes_map, params.output)
if (rank == 0):
#get meta-data from all partitions and merge them on rank-0
metadata_list = gather_metadata_json(json_metadata, rank, world_size)
metadata_list[0] = json_metadata
write_metadata_json(metadata_list, params.output, params.graph_name)
else:
#send meta-data to Rank-0 process
gather_metadata_json(json_metadata, rank, world_size)
def single_dev_init(rank, world_size, func_exec, params, backend="gloo"):
"""
Init. function which is run by each process in the Gloo ProcessGroup
Parameters:
-----------
rank : integer
rank of the process
world_size : integer
number of processes configured in the Process Group
func_exec : function
function to invoke, containing the logic for each process in the group
params : argparser object
argument parser object to access the command line arguments
backend : string
string specifying the type of backend to use for communication
"""
os.environ["MASTER_ADDR"] = '127.0.0.1'
os.environ["MASTER_PORT"] = '29500'
#create Gloo Process Group
dist.init_process_group(backend, rank=rank, world_size=world_size)
#Invoke the main function to kick-off each process
func_exec(rank, world_size, params)
def multi_dev_init(params):
"""
Function to be invoked when executing data loading pipeline on multiple machines
Parameters:
-----------
params : argparser object
argparser object providing access to command line arguments.
"""
#init the gloo process group here.
dist.init_process_group("gloo", rank=params.rank, world_size=params.world_size)
print('[Rank: ', params.rank, '] Done with process group initialization...')
#invoke the main function here.
proc_exec(params.rank, params.world_size, params)
print('[Rank: ', params.rank, '] Done with the distributed data processing pipeline.')
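# A minimal sketch of driving the single-machine path: spawn one process per rank
# and let each call single_dev_init, which creates the gloo group and then runs
# proc_exec. The world size below is hypothetical.
def _example_spawn_single_machine(params, world_size=4):
    mp.set_start_method("spawn", force=True)
    processes = []
    for rank in range(world_size):
        p = mp.Process(target=single_dev_init, args=(rank, world_size, proc_exec, params))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()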
......@@ -79,6 +79,54 @@ def get_node_types(schema):
return ntypes_map, ntypes
def get_edge_types(schema):
"""
Utility function to build the dictionary mapping edge_type names to edge_type ids
Parameters
----------
schema : dictionary
Input schema from which the edge_typename -> edge_type_id
dictionary is built
Returns
-------
dictionary:
a map between edgetype_names and ids
list:
list of edgetype_names
"""
global_eid_ranges = schema['eid']
global_eid_ranges = {key: np.array(global_eid_ranges[key]).reshape(1,2)
for key in global_eid_ranges}
etypes = [(key, global_eid_ranges[key][0, 0]) for key in global_eid_ranges]
etypes.sort(key=lambda e: e[1])
etypes = [e[0] for e in etypes]
etypes_map = {e: i for i, e in enumerate(etypes)}
return etypes_map, etypes
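# A minimal sketch of the schema fragment consumed above: 'eid' maps each edge
# type name to its [start, end) global edge-id range, and get_edge_types numbers
# the types by the start of their ranges. The type names and ranges below are
# hypothetical.
def _example_get_edge_types():
    schema = {'eid': {'user:follows:user': [0, 1000],
                      'user:likes:item': [1000, 5000]}}
    etypes_map, etypes = get_edge_types(schema)
    #etypes == ['user:follows:user', 'user:likes:item']
    #etypes_map == {'user:follows:user': 0, 'user:likes:item': 1}
    return etypes_map, etypes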
def get_ntypes_map(schema):
"""
Utility function to return the global node-id ranges from the input schema
as well as the node count for each node type
Parameters:
-----------
schema : dictionary
Input schema where the requested dictionaries are defined
Returns:
--------
dictionary :
map between the node_types and global_id ranges for each node_type
dictionary :
map between the node_type and total node count for that type
"""
return schema["nid"], schema["node_type_id_count"]
def write_metadata_json(metadata_list, output_dir, graph_name):
"""
Merge the json metadata from each of the ranks on rank-0.
......@@ -102,7 +150,7 @@ def write_metadata_json(metadata_list, output_dir, graph_name):
for k in x:
edge_map[k] = []
for idx in range(len(metadata_list)):
edge_map[k].append(metadata_list[idx]["edge_map"][k][0])
edge_map[k].append([int(metadata_list[idx]["edge_map"][k][0][0]),int(metadata_list[idx]["edge_map"][k][0][1])])
graph_metadata["edge_map"] = edge_map
graph_metadata["etypes"] = metadata_list[0]["etypes"]
......@@ -115,12 +163,12 @@ def write_metadata_json(metadata_list, output_dir, graph_name):
for k in x:
node_map[k] = []
for idx in range(len(metadata_list)):
node_map[k].append(metadata_list[idx]["node_map"][k][0])
node_map[k].append([int(metadata_list[idx]["node_map"][k][0][0]), int(metadata_list[idx]["node_map"][k][0][1])])
graph_metadata["node_map"] = node_map
graph_metadata["ntypes"] = metadata_list[0]["ntypes"]
graph_metadata["num_edges"] = sum([metadata_list[i]["num_edges"] for i in range(len(metadata_list))])
graph_metadata["num_nodes"] = sum([metadata_list[i]["num_nodes"] for i in range(len(metadata_list))])
graph_metadata["num_edges"] = int(sum([metadata_list[i]["num_edges"] for i in range(len(metadata_list))]))
graph_metadata["num_nodes"] = int(sum([metadata_list[i]["num_nodes"] for i in range(len(metadata_list))]))
graph_metadata["num_parts"] = metadata_list[0]["num_parts"]
graph_metadata["part_method"] = metadata_list[0]["part_method"]
......@@ -130,7 +178,7 @@ def write_metadata_json(metadata_list, output_dir, graph_name):
with open('{}/{}.json'.format(output_dir, graph_name), 'w') as outfile:
json.dump(graph_metadata, outfile, sort_keys=True, indent=4)
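# A small sketch of why the int(...) casts above are needed when building the
# merged metadata: numpy integer scalars (such as the per-rank counts and id
# range boundaries) are not JSON serializable, whereas plain Python ints are.
def _example_numpy_json_cast():
    count = np.int64(42)
    try:
        json.dumps({'num_nodes': count})          #raises TypeError: not JSON serializable
    except TypeError:
        pass
    return json.dumps({'num_nodes': int(count)})  #serializes fine after casting to int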
def augment_edge_data(edge_data, part_ids):
def augment_edge_data(edge_data, part_ids, id_offset):
"""
Add partition-id (rank which owns an edge) column to the edge_data.
......@@ -141,21 +189,26 @@ def augment_edge_data(edge_data, part_ids):
part_ids : numpy array
array of part_ids indexed by global_nid
"""
#add global_eids to the edge_data
global_eids = np.arange(id_offset, id_offset + len(edge_data[constants.GLOBAL_TYPE_EID]), dtype=np.int64)
edge_data[constants.GLOBAL_EID] = global_eids
edge_data[constants.OWNER_PROCESS] = part_ids[edge_data[constants.GLOBAL_DST_ID]]
def augment_node_data(node_data, part_ids):
def augment_node_data(node_data, part_ids, offset):
"""
Utility function to add auxiliary columns to the node_data dictionary.
Parameters:
-----------
node_data : numpy ndarray
Node information as read from xxx_nodes.txt file
node_data : dictionary
Node information read from the xxx_nodes.txt file, stored as a dictionary with keys as column
names and values as the corresponding column data.
part_ids : numpy array
array of part_ids indexed by global_nid
"""
#add global_nids to the node_data
global_nids = np.arange(len(node_data[constants.GLOBAL_TYPE_NID]), dtype=np.int64)
global_nids = np.arange(offset, offset + len(node_data[constants.GLOBAL_TYPE_NID]), dtype=np.int64)
node_data[constants.GLOBAL_NID] = global_nids
#add owner proc_ids to the node_data
......@@ -323,7 +376,7 @@ def write_dgl_objects(graph_obj, node_features, edge_features, output_dir, part_
part_dir = output_dir + '/part' + str(part_id)
os.makedirs(part_dir, exist_ok=True)
write_graph_dgl(os.path.join(part_dir ,'part'+str(part_id)), graph_obj)
write_graph_dgl(os.path.join(part_dir ,'graph.dgl'), graph_obj)
if node_features != None:
write_node_features(node_features, os.path.join(part_dir, "node_feat.dgl"))
......