Unverified Commit b1319200 authored by Ankit Garg, committed by GitHub

[Feature] Add removed edges in distributed graph partitioning to handle heterogeneous graph (#3137)



* Added code to fix "TypeError: unhashable type: 'slice'" when copying files

* 1) Added distributed preprocessing code to create the ParMETIS input from CSV files
2) Added code to run pm_dglpart on multiple machines
3) Added support for recreating the heterogeneous graph from the homogeneous graph based on the dropped edges, as ParMETIS currently only supports homogeneous graphs

* move to pandas

* Added comments and removed drop_duplicates as it was redundant

* Addressed PR comments

* Rename variable

* Added comment

* Added comment

* Updated README
Co-authored-by: Ankit Garg <gaank@amazon.com>
Co-authored-by: Da Zheng <zhengda1936@gmail.com>
parent b78acd67
......@@ -37,6 +37,8 @@ DGL provides a plenty of learning materials for all kinds of users from ML resea
It is convenient to train models using DGL on large-scale graphs across multiple GPUs or multiple machines. DGL extensively optimizes the whole stack to reduce the overhead in communication, memory consumption and synchronization. As a result, DGL can easily scale to billion-sized graphs. See the [system performance note](https://docs.dgl.ai/performance.html) for the comparison with the other tools.
DistDGL's ParMETIS-based partitioning now also supports heterogeneous graphs: the edges dropped by ParMETIS (which only handles homogeneous graphs) are added back into the partitioned graph, covering the heterogeneous graph use case.
## Get Started
Users can install DGL from [pip and conda](https://www.dgl.ai/pages/start.html). Advanced users can follow the [instructions](https://docs.dgl.ai/install/index.html#install-from-source) to install from source.
......
import pandas as pd
import os
import glob
import json
import argparse
from collections import defaultdict
path = os.getcwd()
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--name", help="name of graph to create", default="order")
parser.add_argument("-nc", "--node_column", nargs="+", default=['order_id', 'entity_index', 'order_datetime', 'cid'])
parser.add_argument("-nk", "--node_key", default='entity_index')
parser.add_argument("-ec", "--edge_column", nargs="+", default=['predicate_type', 'predicate_index', 'entity_index', 'entity_index_y'])
parser.add_argument("-es", "--edge_start", default="entity_index")
parser.add_argument("-en", "--edge_end", default="entity_index_y")
args = parser.parse_args()
# Each node type is stored as a folder of CSV files under nodes/
nodes_list = sorted(glob.glob(os.path.join(path, "nodes/*")))
if os.path.exists("{}_nodes.txt".format(args.name)):
    os.remove("{}_nodes.txt".format(args.name))
schema_dict = defaultdict(dict)
node_type_id = 0
all_nodes_count = 0
for node_type_name in nodes_list:
    nodes_count = 0
    csv_files = sorted(glob.glob(os.path.join(node_type_name, "*.csv")))
    for file_name in csv_files:
        df = pd.read_csv(file_name, error_bad_lines=False, escapechar='\\', names=args.node_column, usecols=[*range(len(args.node_column))])
        df_entity = pd.DataFrame(df[args.node_key], columns=[args.node_key])
        df_entity['type'] = node_type_id
        column_list = ['type']
        # One weight column per node type: 1 for this node's own type, 0 otherwise.
        for weight_index in range(len(nodes_list)):
            weight_num = "weight{}".format(weight_index)
            column_list.append(weight_num)
            if weight_index == node_type_id:
                df_entity[weight_num] = 1
            else:
                df_entity[weight_num] = 0
        nodes_count += len(df_entity.index)
        column_list.append(args.node_key)
        # Append these nodes to the file that serves as the node input for ParMETIS.
        # Each line is "<node_type_id> <weight_0> ... <weight_k> <node_id>".
        # More details about the ParMETIS input format can be found here:
        # https://docs.dgl.ai/en/0.6.x/guide/distributed-preprocessing.html#input-format-for-parmetis
        df_entity.to_csv("{}_nodes.txt".format(args.name), columns=column_list, sep=" ", index=False, header=False, mode='a')
    schema_dict['nid'][os.path.basename(node_type_name)] = [all_nodes_count, nodes_count + all_nodes_count]
    all_nodes_count += nodes_count
    node_type_id += 1
if os.path.exists("{}_edges.txt".format(args.name)):
    os.remove("{}_edges.txt".format(args.name))
# Each edge type is stored as a folder of CSV files under edges/
edges_list = sorted(glob.glob(os.path.join(path, "edges/*")))
all_edges_count = 0
edge_type_id = 0
for edge_type_name in edges_list:
    edge_count = 0
    csv_files = sorted(glob.glob(os.path.join(edge_type_name, "*.csv")))
    for file_name in csv_files:
        df = pd.read_csv(file_name, error_bad_lines=False, escapechar='\\', names=args.edge_column, usecols=[*range(len(args.edge_column))])
        df_entity = pd.DataFrame(df[[args.edge_start, args.edge_end]], columns=[args.edge_start, args.edge_end])
        df_entity['type'] = edge_type_id
        df_entity = df_entity.reset_index()
        # Assign a running edge id within this edge type.
        df_entity['number'] = df_entity.index + edge_count
        edge_count += len(df_entity.index)
        # Append these edges to the file that serves as the edge input for ParMETIS.
        # Each line is "<src_node_id> <dst_node_id> <edge_id> <edge_type_id>".
        # More details about the ParMETIS input format can be found here:
        # https://docs.dgl.ai/en/0.6.x/guide/distributed-preprocessing.html#input-format-for-parmetis
        df_entity.to_csv("{}_edges.txt".format(args.name), columns=[args.edge_start, args.edge_end, 'number', 'type'], sep=" ", index=False, header=False, mode='a')
    schema_dict['eid'][os.path.basename(edge_type_name)] = [all_edges_count, all_edges_count + edge_count]
    edge_type_id += 1
    all_edges_count += edge_count
if os.path.exists("{}_stats.txt".format(args.name)):
    os.remove("{}_stats.txt".format(args.name))
# The stats file holds the total node count, total edge count and the number of node weights.
df = pd.DataFrame([[all_nodes_count, all_edges_count, len(nodes_list)]], columns=['nodes_count', 'edges_count', 'weight_count'])
df.to_csv("{}_stats.txt".format(args.name), columns=['nodes_count', 'edges_count', 'weight_count'], sep=" ", index=False, header=False)
if os.path.exists("{}.json".format(args.name)):
    os.remove("{}.json".format(args.name))
# The JSON schema maps each node/edge type name to its global id range.
with open("{}.json".format(args.name), "w", encoding="utf8") as json_file:
    json.dump(schema_dict, json_file, ensure_ascii=False)
171-0000102-1785122,0,2021-06-01 21:15:33,18604601535
171-0000550-1206725,1,2021-06-08 12:53:53,19613747325
171-0000784-4201160,2,2021-06-05 16:27:42,8348611025
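The rows above are a sample node CSV with the default --node_column layout (order_id, entity_index, order_datetime, cid). For illustration only, assuming they sit in a single node-type folder such as nodes/order/ (so there is exactly one weight column), metis_creation.py would append lines in the "<node_type_id> <weight0> <node_id>" format to order_nodes.txt:
0 1 0
0 1 1
0 1 2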
#!/bin/bash
cur_dir=$(pwd)
host_count=`cat hostfile | wc -l`
graph_name="order"
perhost_part=2
current_host=`ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'`
echo "metis creation start"
##Nodes
python3 metis_creation.py -n ${graph_name}
echo "metis creation ends"
echo "directory creation starts"
while read p; do
    if [ "$p" != "$current_host" ]; then
        ssh ${p} "mkdir -p ${cur_dir}" < /dev/null
    fi
done <hostfile
echo "directory creation ends"
echo "partioning starts"
`mpirun --hostfile hostfile -np ${host_count} pm_dglpart ${graph_name} ${perhost_part} > mpirun.out`
echo "partioning ends"
echo "scp starts"
while read p; do
    if [ "$p" != "$current_host" ]; then
        scp ${p}:${cur_dir}/* ./ < /dev/null
    fi
done <hostfile
echo "scp ends"
echo "fetching removed edges starts"
`cat mpirun.out | grep "Duplicate edges with metadata" | awk -F'[][]' '{print $4}' > remove.csv`
echo "fetching removed edges ends"
echo "homo graph to herto graph starts"
`python3 substitute_to_hetero.py -n order -r remove.csv`
echo "homo graph to herto graph ends"
......@@ -6,8 +6,10 @@ import numpy as np
import dgl
import torch as th
import pyarrow
import pandas as pd
from pyarrow import csv
parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
help='The directory path that contains the partition results.')
......@@ -27,6 +29,8 @@ parser.add_argument('--edge-attr-dtype', type=str, default=None,
help='The data type of the edge attributes')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
parser.add_argument('--removed-edges', help='The file (under the input directory) that lists the edges dropped by ParMETIS', default=None, type=str)
args = parser.parse_args()
input_dir = args.input_dir
......@@ -38,6 +42,29 @@ edge_attr_dtype = args.edge_attr_dtype
workspace_dir = args.workspace
output_dir = args.output
if args.removed_edges is not None:
    removed_file = '{}/{}'.format(input_dir, args.removed_edges)
    remove_column_index = [0, 1, 2, 3]
    remove_column_name = ["distributed_src_id", "distributed_dest_id", "src_id", "dest_id"]
    removed_df = pd.read_csv(removed_file, sep=" ", header=None)
    removed_df.rename(columns={0: "src_id", 1: "dest_id"}, inplace=True)
    # Add the removed edges back into the partitioned files so that every edge
    # dropped during ParMETIS partitioning is retained and no edges are lost.
    print('Adding the edges removed during ParMETIS partitioning back into the partitioned files')
    for part_id in range(num_parts):
        edge_file = '{}/p{:03}-{}_edges.txt'.format(input_dir, part_id, graph_name)
        part_df = pd.read_csv(edge_file, sep=" ", usecols=remove_column_index, names=remove_column_name)
        # Select the removed edges whose (src_id, dest_id) pair appears in this
        # partition's edge file and append them to that partition.
        merge_df = pd.merge(part_df, removed_df, how='inner', on=["src_id", "dest_id"])
        merge_df.to_csv(edge_file, mode='a', header=False, index=False, sep=" ")
    print('All dropped edges were added back; the partitioned files now contain all edges')
with open(args.schema) as json_file:
    schema = json.load(json_file)
nid_ranges = schema['nid']
......
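With the new --removed-edges option, the partition-conversion step can consume the remove.csv file produced by the run script above. A hedged example invocation, assuming the conversion script shown in this diff is tools/convert_partition.py and that its other required flags are supplied as before:
python3 convert_partition.py --input-dir <parmetis_output_dir> --output <partitioned_graph_dir> --removed-edges remove.csv [other existing flags]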