Unverified Commit 2a757d4a authored by Rhett Ying, committed by GitHub

Remove self-loops and duplicate edges before ParMETIS and restore when converting to DGLGraph (#3472)

* save self-loops and duplicated edges separately.

* [BugFix] sort graph by dgl.ETYPE

* fix bugs in verify script

* fix verify logic

* refine README
Co-authored-by: Da Zheng <zhengda1936@gmail.com>
parent 96cd2ee6
@@ -156,6 +156,10 @@ More details about the four steps are explained in our
The graph structure should be written as a node file and an edge file. The node features and edge features
can be written as DGL tensors. `write_mag.py` shows an example of writing the OGB MAG graph into files.
As `pm_dglpart` cannot handle self-loops and duplicate edges correctly, these edges are removed and stored
in `mag_removed_edges.txt` when `write_mag.py` is run. When converting the ParMETIS outputs into a DGLGraph
in the next step, `mag_removed_edges.txt` should be passed in. Refer to Step 3 for more details.
```bash
python3 write_mag.py
```
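Each line of `mag_removed_edges.txt` stores one removed edge as `<src_id> <dst_id> <orig_edge_id> <etype_id>`,
the same column layout as `mag_edges.txt`. The following is a minimal sketch (not part of the pipeline) for
checking how many self-loops and duplicate edges were removed, assuming `write_mag.py` has been run in the
current directory:

```python
import numpy as np

# Columns: src, dst, original edge ID, edge type (all integers).
removed = np.loadtxt('mag_removed_edges.txt', dtype=np.int64, ndmin=2)
num_self_loops = int((removed[:, 0] == removed[:, 1]).sum())
print('removed edges: {} ({} self-loops, {} duplicates)'.format(
    len(removed), num_self_loops, len(removed) - num_self_loops))
```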
@@ -186,10 +190,11 @@ write the files on NFS.
### Step 3: Convert the ParMETIS partitions into DGLGraph
DGL provides a tool called `convert_partition.py` to load one partition at a time and convert it into a DGLGraph
and save it into a file. As mentioned in Step 1, please pass `mag_removed_edges.txt` via `--removed-edges`
if any self-loops and duplicate edges were removed.
```bash
python3 ~/workspace/dgl/tools/convert_partition.py --input-dir . --graph-name mag --schema mag.json --num-parts 2 --num-node-weights 4 --output outputs --removed-edges mag_removed_edges.txt
```
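As a quick sanity check after the conversion, you can count the edges stored in the partitioned `graph.dgl`
files; with the removed edges restored, the totals should add up to the number of edges in the original
homogeneous graph. A minimal sketch, assuming two partitions were written to `outputs/`:

```python
import dgl

# Sum the edges stored in each partition; every original edge (including the
# restored self-loops and duplicates) should appear in exactly one partition.
total_edges = 0
for part_id in range(2):
    part_g = dgl.load_graphs('outputs/part{}/graph.dgl'.format(part_id))[0][0]
    total_edges += part_g.number_of_edges()
print('total edges across partitions:', total_edges)
```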
### Step 4: Read node data and edge data for each partition
...
@@ -5,7 +5,9 @@ import dgl
import torch as th
from ogb.nodeproppred import DglNodePropPredDataset

partitions_folder = 'outputs'
graph_name = 'mag'
with open('{}/{}.json'.format(partitions_folder, graph_name)) as json_file:
    metadata = json.load(json_file)
num_parts = metadata['num_parts']
@@ -24,8 +26,10 @@ hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']
node_feats = {}
edge_feats = {}
for partid in range(num_parts):
    part_node_feats = dgl.data.utils.load_tensors(
        '{}/part{}/node_feat.dgl'.format(partitions_folder, partid))
    part_edge_feats = dgl.data.utils.load_tensors(
        '{}/part{}/edge_feat.dgl'.format(partitions_folder, partid))
    for key in part_node_feats:
        if key in node_feats:
            node_feats[key].append(part_node_feats[key])
@@ -52,6 +56,9 @@ for key in etype_map:
    etype_id = etype_map[key]
    etypes[etype_id] = key
etype2canonical = {etype: (srctype, etype, dsttype)
                   for srctype, etype, dsttype in hg.canonical_etypes}

node_map = metadata['node_map']
for key in node_map:
    node_map[key] = th.stack([th.tensor(row) for row in node_map[key]], 0)
@@ -61,14 +68,58 @@ for key in edge_map:
    edge_map[key] = th.stack([th.tensor(row) for row in edge_map[key]], 0)
eid_map = dgl.distributed.id_map.IdMap(edge_map)

for ntype in node_map:
    assert hg.number_of_nodes(ntype) == th.sum(
        node_map[ntype][:, 1] - node_map[ntype][:, 0])
for etype in edge_map:
    assert hg.number_of_edges(etype) == th.sum(
        edge_map[etype][:, 1] - edge_map[etype][:, 0])
# verify part_0 with graph_partition_book
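# Map the first edge (type-local ID 0) of every edge type to a homogeneous
# edge ID; all of them should be assigned to partition 0, and their local IDs
# in partition 0 should equal the global IDs.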
eid = []
gpb = dgl.distributed.graph_partition_book.RangePartitionBook(
    0, num_parts, node_map, edge_map,
    {ntype: i for i, ntype in enumerate(hg.ntypes)},
    {etype: i for i, etype in enumerate(hg.etypes)})
subg0 = dgl.load_graphs('{}/part0/graph.dgl'.format(partitions_folder))[0][0]
for etype in hg.etypes:
    type_eid = th.zeros((1,), dtype=th.int64)
    eid.append(gpb.map_to_homo_eid(type_eid, etype))
eid = th.cat(eid)
part_id = gpb.eid2partid(eid)
assert th.all(part_id == 0)
local_eid = gpb.eid2localeid(eid, 0)
assert th.all(local_eid == eid)
assert th.all(subg0.edata[dgl.EID][local_eid] == eid)
lsrc, ldst = subg0.find_edges(local_eid)
gsrc, gdst = subg0.ndata[dgl.NID][lsrc], subg0.ndata[dgl.NID][ldst]
assert th.all(gsrc == lsrc)
# A gdst that is not assigned to the current partition is not required to equal ldst.
assert th.all(th.logical_or(
    gdst == ldst, subg0.ndata['inner_node'][ldst] == 0))
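# Recover the per-type IDs of these edges and their endpoints, rebuild the
# canonical edge types, and check that each one exists in the original graph.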
etids, _ = gpb.map_to_per_etype(eid)
src_tids, _ = gpb.map_to_per_ntype(gsrc)
dst_tids, _ = gpb.map_to_per_ntype(gdst)
canonical_etypes = []
etype_ids = th.arange(0, len(etypes))
for src_tid, etype_id, dst_tid in zip(src_tids, etype_ids, dst_tids):
    canonical_etypes.append(
        (ntypes[src_tid], etypes[etype_id], ntypes[dst_tid]))
for etype in canonical_etypes:
    assert etype in hg.canonical_etypes

# Load the graph partition structure.
orig_node_ids = {ntype: [] for ntype in hg.ntypes}
orig_edge_ids = {etype: [] for etype in hg.etypes}
for partid in range(num_parts):
    print('test part', partid)
    part_file = '{}/part{}/graph.dgl'.format(partitions_folder, partid)
    subg = dgl.load_graphs(part_file)[0][0]
    subg_src_id, subg_dst_id = subg.edges()
    orig_src_id = subg.ndata['orig_id'][subg_src_id]
    orig_dst_id = subg.ndata['orig_id'][subg_dst_id]
    global_src_id = subg.ndata[dgl.NID][subg_src_id]
    global_dst_id = subg.ndata[dgl.NID][subg_dst_id]
    subg_ntype = subg.ndata[dgl.NTYPE]
    subg_etype = subg.edata[dgl.ETYPE]
    for ntype_id in th.unique(subg_ntype):
@@ -78,8 +129,12 @@ for partid in range(num_parts):
        nid = subg.ndata[dgl.NID][idx]
        ntype_ids1, type_nid = nid_map(nid)
        orig_type_nid = subg.ndata['orig_id'][idx]
        inner_node = subg.ndata['inner_node'][idx]
        # All nodes should have the same node type.
        assert np.all(ntype_ids1.numpy() == int(ntype_id))
        assert np.all(nid[inner_node == 1].numpy() == np.arange(
            node_map[ntype][partid, 0], node_map[ntype][partid, 1]))
        orig_node_ids[ntype].append(orig_type_nid[inner_node == 1])
        # Check node data.
        for name in hg.nodes[ntype].data:
@@ -89,21 +144,43 @@ for partid in range(num_parts):
    for etype_id in th.unique(subg_etype):
        etype = etypes[etype_id]
        srctype, _, dsttype = etype2canonical[etype]
        idx = subg_etype == etype_id
        exist = hg[etype].has_edges_between(orig_src_id[idx], orig_dst_id[idx])
        assert np.all(exist.numpy())
        eid = hg[etype].edge_ids(orig_src_id[idx], orig_dst_id[idx])
        assert np.all(eid.numpy() == subg.edata['orig_id'][idx].numpy())
        ntype_ids, type_nid = nid_map(global_src_id[idx])
        assert len(th.unique(ntype_ids)) == 1
        assert ntypes[ntype_ids[0]] == srctype
        ntype_ids, type_nid = nid_map(global_dst_id[idx])
        assert len(th.unique(ntype_ids)) == 1
        assert ntypes[ntype_ids[0]] == dsttype
        # These are the global IDs after reshuffling.
        eid = subg.edata[dgl.EID][idx]
        etype_ids1, type_eid = eid_map(eid)
        orig_type_eid = subg.edata['orig_id'][idx]
        inner_edge = subg.edata['inner_edge'][idx]
        # All edges should have the same edge type.
        assert np.all(etype_ids1.numpy() == int(etype_id))
        assert np.all(np.sort(eid[inner_edge == 1].numpy()) == np.arange(
            edge_map[etype][partid, 0], edge_map[etype][partid, 1]))
        orig_edge_ids[etype].append(orig_type_eid[inner_edge == 1])
        # Check edge data.
        for name in hg.edges[etype].data:
            local_data = edge_feats[etype + '/' + name][type_eid]
            local_data1 = hg.edges[etype].data[name][orig_type_eid]
            assert np.all(local_data.numpy() == local_data1.numpy())
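
# After visiting all partitions, the inner nodes and edges collected above
# should cover every original node ID and edge ID exactly once.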
for ntype in orig_node_ids:
    nids = th.cat(orig_node_ids[ntype])
    nids = th.sort(nids)[0]
    assert np.all((nids == th.arange(hg.number_of_nodes(ntype))).numpy())
for etype in orig_edge_ids:
    eids = th.cat(orig_edge_ids[etype])
    eids = th.sort(eids)[0]
    assert np.all((eids == th.arange(hg.number_of_edges(etype))).numpy())
@@ -15,10 +15,6 @@ for etype in hg_orig.canonical_etypes:
hg = dgl.heterograph(subgs)
hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']
print(hg)

# OGB-MAG is stored in heterogeneous format. We need to convert it into homogeneous format.
g = dgl.to_homogeneous(hg)
@@ -46,11 +42,44 @@ for ntype in hg.ntypes:
dgl.data.utils.save_tensors("node_feat.dgl", node_feats)

# Store the metadata of edges.
# ParMETIS cannot handle duplicated edges and self-loops. We should remove them
# in the preprocessing.
src_id, dst_id = g.edges()
# Remove self-loops.
self_loop_idx = src_id == dst_id
not_self_loop_idx = src_id != dst_id
self_loop_src_id = src_id[self_loop_idx]
self_loop_dst_id = dst_id[self_loop_idx]
self_loop_orig_id = g.edata['orig_id'][self_loop_idx]
self_loop_etype = g.edata[dgl.ETYPE][self_loop_idx]
src_id = src_id[not_self_loop_idx]
dst_id = dst_id[not_self_loop_idx]
orig_id = g.edata['orig_id'][not_self_loop_idx]
etype = g.edata[dgl.ETYPE][not_self_loop_idx]
# Remove duplicated edges.
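# Encoding each (src, dst) pair as src * num_nodes + dst gives every pair a
# unique integer, so np.unique keeps the first occurrence of each pair and
# the remaining indices identify the duplicated edges.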
ids = (src_id * g.number_of_nodes() + dst_id).numpy()
uniq_ids, idx = np.unique(ids, return_index=True)
duplicate_idx = np.setdiff1d(np.arange(len(ids)), idx)
duplicate_src_id = src_id[duplicate_idx]
duplicate_dst_id = dst_id[duplicate_idx]
duplicate_orig_id = orig_id[duplicate_idx]
duplicate_etype = etype[duplicate_idx]
src_id = src_id[idx]
dst_id = dst_id[idx]
orig_id = orig_id[idx]
etype = etype[idx]
edge_data = th.stack([src_id, dst_id, orig_id, etype], 1)
np.savetxt('mag_edges.txt', edge_data.numpy(), fmt='%d', delimiter=' ')
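
# Save the removed edges in the same column layout as mag_edges.txt:
# <src_id> <dst_id> <orig_edge_id> <etype_id>.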
removed_edge_data = th.stack([th.cat([self_loop_src_id, duplicate_src_id]),
                              th.cat([self_loop_dst_id, duplicate_dst_id]),
                              th.cat([self_loop_orig_id, duplicate_orig_id]),
                              th.cat([self_loop_etype, duplicate_etype])], 1)
np.savetxt('mag_removed_edges.txt',
           removed_edge_data.numpy(), fmt='%d', delimiter=' ')
print('There are {} edges in total; removed {} self-loops and {} duplicated edges'.format(
    g.number_of_edges(), len(self_loop_src_id), len(duplicate_src_id)))

# Store the edge features
edge_feats = {}
@@ -60,9 +89,10 @@ for etype in hg.etypes:
dgl.data.utils.save_tensors("edge_feat.dgl", edge_feats)

# Store the basic metadata of the graph.
graph_stats = [g.number_of_nodes(), len(src_id), num_node_weights]
with open('mag_stats.txt', 'w') as filehandle:
    filehandle.writelines("{} {} {}".format(
        graph_stats[0], graph_stats[1], graph_stats[2]))

# Store the ID ranges of nodes and edges of the entire graph.
nid_ranges = {}
...
@@ -29,7 +29,8 @@ parser.add_argument('--edge-attr-dtype', type=str, default=None,
                    help='The data type of the edge attributes')
parser.add_argument('--output', required=True, type=str,
                    help='The output directory of the partitioned results')
parser.add_argument('--removed-edges', help='a file that contains the removed self-loops and duplicated edges',
                    default=None, type=str)
args = parser.parse_args()
@@ -42,47 +43,47 @@ edge_attr_dtype = args.edge_attr_dtype
workspace_dir = args.workspace
output_dir = args.output

self_loop_edges = None
duplicate_edges = None
if args.removed_edges is not None:
    removed_file = '{}/{}'.format(input_dir, args.removed_edges)
    removed_df = csv.read_csv(removed_file, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
                              parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
    assert removed_df.num_columns == 4
    src_id = removed_df['f0'].to_numpy()
    dst_id = removed_df['f1'].to_numpy()
    orig_id = removed_df['f2'].to_numpy()
    etype = removed_df['f3'].to_numpy()
    self_loop_idx = src_id == dst_id
    not_self_loop_idx = src_id != dst_id
    self_loop_edges = [src_id[self_loop_idx], dst_id[self_loop_idx],
                       orig_id[self_loop_idx], etype[self_loop_idx]]
    duplicate_edges = [src_id[not_self_loop_idx], dst_id[not_self_loop_idx],
                       orig_id[not_self_loop_idx], etype[not_self_loop_idx]]
    print('There are {} self-loops and {} duplicated edges in the removed edges'.format(
        len(self_loop_edges[0]), len(duplicate_edges[0])))
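
# self_loop_edges / duplicate_edges each hold four aligned arrays:
# (src_id, dst_id, original edge ID, edge type). They are merged back into
# the matching partitions in the per-partition loop below.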

with open(args.schema) as json_file:
    schema = json.load(json_file)
nid_ranges = schema['nid']
eid_ranges = schema['eid']
nid_ranges = {key: np.array(nid_ranges[key]).reshape(1, 2) for key in nid_ranges}
eid_ranges = {key: np.array(eid_ranges[key]).reshape(1, 2) for key in eid_ranges}
id_map = dgl.distributed.id_map.IdMap(nid_ranges)

ntypes = [(key, nid_ranges[key][0, 0]) for key in nid_ranges]
ntypes.sort(key=lambda e: e[1])
ntype_offset_np = np.array([e[1] for e in ntypes])
ntypes = [e[0] for e in ntypes]
ntypes_map = {e: i for i, e in enumerate(ntypes)}

etypes = [(key, eid_ranges[key][0, 0]) for key in eid_ranges]
etypes.sort(key=lambda e: e[1])
etype_offset_np = np.array([e[1] for e in etypes])
etypes = [e[0] for e in etypes]
etypes_map = {e: i for i, e in enumerate(etypes)}

def read_feats(file_name):
    attrs = csv.read_csv(file_name, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
@@ -90,11 +91,16 @@ def read_feats(file_name):
    num_cols = len(attrs.columns)
    return np.stack([attrs.columns[i].to_numpy() for i in range(num_cols)], 1)

max_nid = np.iinfo(np.int32).max

num_edges = 0
num_nodes = 0
node_map_val = {ntype: [] for ntype in ntypes}
edge_map_val = {etype: [] for etype in etypes}
for part_id in range(num_parts):
    part_dir = output_dir + '/part' + str(part_id)
    os.makedirs(part_dir, exist_ok=True)
    node_file = 'p{:03}-{}_nodes.txt'.format(part_id, graph_name)
    # The format of each line in the node file:
    # <node_id> <node_type> <weight1> ... <orig_type_node_id> <attributes>
@@ -129,15 +135,19 @@ for part_id in range(num_parts):
    for ntype_name in nid_ranges:
        ntype_id = ntypes_map[ntype_name]
        type_nids = nids[ntype_ids == ntype_id]
        assert np.all(type_nids == np.arange(type_nids[0], type_nids[-1] + 1))
        node_feats[ntype_name + '/feat'] = th.as_tensor(node_attrs[ntype_ids == ntype_id])
    dgl.data.utils.save_tensors(os.path.join(part_dir, "node_feat.dgl"), node_feats)

    # Determine the node ID ranges of different node types.
    for ntype_name in nid_ranges:
        ntype_id = ntypes_map[ntype_name]
        type_nids = nids[ntype_ids == ntype_id]
        node_map_val[ntype_name].append([int(type_nids[0]), int(type_nids[-1]) + 1])

    edge_file = 'p{:03}-{}_edges.txt'.format(part_id, graph_name)
    # The format of each line in the edge file:
@@ -149,7 +159,61 @@ for part_id in range(num_parts):
    edges = csv.read_csv(tmp_output, read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True),
                         parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
    num_cols = len(edges.columns)
    src_id, dst_id, orig_src_id, orig_dst_id, orig_edge_id, etype_ids = [
        edges.columns[i].to_numpy() for i in range(num_cols)]
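
    # Match removed edges against this partition's edge list: a removed
    # self-loop (u, u) is attached to a partition in which u appears as a
    # destination node, and a removed duplicated edge (u, v) is attached to a
    # partition that kept another copy of (u, v).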
    # Let's merge the self-loops and duplicated edges to the partition.
    src_id_list, dst_id_list = [src_id], [dst_id]
    orig_src_id_list, orig_dst_id_list = [orig_src_id], [orig_dst_id]
    orig_edge_id_list, etype_id_list = [orig_edge_id], [etype_ids]
    if self_loop_edges is not None and len(self_loop_edges[0]) > 0:
        uniq_orig_nids, idx = np.unique(orig_dst_id, return_index=True)
        common_nids, common_idx1, common_idx2 = np.intersect1d(
            uniq_orig_nids, self_loop_edges[0], return_indices=True)
        idx = idx[common_idx1]
        # the IDs after ID assignment
        src_id_list.append(dst_id[idx])
        dst_id_list.append(dst_id[idx])
        # homogeneous IDs in the input graph.
        orig_src_id_list.append(self_loop_edges[0][common_idx2])
        orig_dst_id_list.append(self_loop_edges[0][common_idx2])
        # edge IDs and edge type.
        orig_edge_id_list.append(self_loop_edges[2][common_idx2])
        etype_id_list.append(self_loop_edges[3][common_idx2])
        print('Add {} self-loops in partition {}'.format(len(idx), part_id))
    if duplicate_edges is not None and len(duplicate_edges[0]) > 0:
        part_ids = orig_src_id.astype(np.int64) * max_nid + orig_dst_id.astype(np.int64)
        uniq_orig_ids, idx = np.unique(part_ids, return_index=True)
        duplicate_ids = duplicate_edges[0].astype(np.int64) * max_nid + duplicate_edges[1].astype(np.int64)
        common_nids, common_idx1, common_idx2 = np.intersect1d(
            uniq_orig_ids, duplicate_ids, return_indices=True)
        idx = idx[common_idx1]
        # the IDs after ID assignment
        src_id_list.append(src_id[idx])
        dst_id_list.append(dst_id[idx])
        # homogeneous IDs in the input graph.
        orig_src_id_list.append(duplicate_edges[0][common_idx2])
        orig_dst_id_list.append(duplicate_edges[1][common_idx2])
        # edge IDs and edge type.
        orig_edge_id_list.append(duplicate_edges[2][common_idx2])
        etype_id_list.append(duplicate_edges[3][common_idx2])
        print('Add {} duplicated edges in partition {}'.format(len(idx), part_id))
    src_id = np.concatenate(src_id_list) if len(src_id_list) > 1 else src_id_list[0]
    dst_id = np.concatenate(dst_id_list) if len(dst_id_list) > 1 else dst_id_list[0]
    orig_src_id = np.concatenate(orig_src_id_list) if len(orig_src_id_list) > 1 else orig_src_id_list[0]
    orig_dst_id = np.concatenate(orig_dst_id_list) if len(orig_dst_id_list) > 1 else orig_dst_id_list[0]
    orig_edge_id = np.concatenate(orig_edge_id_list) if len(orig_edge_id_list) > 1 else orig_edge_id_list[0]
    etype_ids = np.concatenate(etype_id_list) if len(etype_id_list) > 1 else etype_id_list[0]
    print('There are {} edges in partition {}'.format(len(src_id), part_id))
    # It's not guaranteed that the edges are sorted based on edge type.
    # Let's sort edges and all attributes on the edges.
    sort_idx = np.argsort(etype_ids)
@@ -162,13 +226,16 @@ for part_id in range(num_parts):
    # Here we just assume all edges have the same attributes.
    # In practice, this may not be the case, and we would need a more complex
    # solution to encode and decode edge attributes.
    os.system('cut -d\' \' -f 7- {} > {}'.format(input_dir + '/' + edge_file, tmp_output))
    edge_attrs = th.as_tensor(read_feats(tmp_output))[sort_idx]
    edge_feats = {}
    for etype_name in eid_ranges:
        etype_id = etypes_map[etype_name]
        edge_feats[etype_name + '/feat'] = th.as_tensor(edge_attrs[etype_ids == etype_id])
    dgl.data.utils.save_tensors(os.path.join(part_dir, "edge_feat.dgl"), edge_feats)

    # Determine the edge ID range of different edge types.
    edge_id_start = num_edges
@@ -185,17 +252,18 @@ for part_id in range(num_parts):
    # This happens in a directed graph.
    # To avoid such nodes being removed from the graph, we add node IDs that
    # belong to this partition.
    ids = np.concatenate([src_id, dst_id, np.arange(nid_range[0], nid_range[1] + 1)])
    uniq_ids, idx, inverse_idx = np.unique(ids, return_index=True, return_inverse=True)
    assert len(uniq_ids) == len(idx)
    # We get the edge list with node IDs mapped to a contiguous ID range.
    local_src_id, local_dst_id = np.split(inverse_idx[:len(src_id) * 2], 2)
    compact_g = dgl.graph((local_src_id, local_dst_id))
    compact_g.edata['orig_id'] = th.as_tensor(orig_edge_id)
    compact_g.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
    compact_g.edata['inner_edge'] = th.ones(compact_g.number_of_edges(), dtype=th.bool)

    # The original IDs are homogeneous IDs.
    # Similarly, we need to add the original homogeneous node IDs
@@ -205,9 +273,11 @@ for part_id in range(num_parts):
    compact_g.ndata['orig_id'] = th.as_tensor(per_type_ids)
    compact_g.ndata[dgl.NTYPE] = th.as_tensor(ntype)
    compact_g.ndata[dgl.NID] = th.as_tensor(uniq_ids)
    compact_g.ndata['inner_node'] = th.as_tensor(np.logical_and(
        uniq_ids >= nid_range[0], uniq_ids <= nid_range[1]))
    local_nids = compact_g.ndata[dgl.NID][compact_g.ndata['inner_node'].bool()]
    assert np.all((local_nids == th.arange(local_nids[0], local_nids[-1] + 1)).numpy())

    print('|V|={}'.format(compact_g.number_of_nodes()))
    print('|E|={}'.format(compact_g.number_of_edges()))
@@ -223,11 +293,25 @@ for part_id in range(num_parts):
    compact_g1.edata['orig_id'] = compact_g.edata['orig_id'][compact_g1.edata[dgl.EID]]
    compact_g1.edata[dgl.ETYPE] = compact_g.edata[dgl.ETYPE][compact_g1.edata[dgl.EID]]
    compact_g1.edata['inner_edge'] = compact_g.edata['inner_edge'][compact_g1.edata[dgl.EID]]

    # Reorder the edges by ETYPE because node_subgraph relabels edges.
    idx = th.argsort(compact_g1.edata[dgl.ETYPE])
    u, v = compact_g1.edges()
    u = u[idx]
    v = v[idx]
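    # The edges are now grouped by edge type; global edge IDs are assigned
    # after this reordering so that each edge type gets a contiguous ID range
    # within the partition.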
    compact_g2 = dgl.graph((u, v))
    compact_g2.ndata['orig_id'] = compact_g1.ndata['orig_id']
    compact_g2.ndata[dgl.NTYPE] = compact_g1.ndata[dgl.NTYPE]
    compact_g2.ndata[dgl.NID] = compact_g1.ndata[dgl.NID]
    compact_g2.ndata['inner_node'] = compact_g1.ndata['inner_node']
    compact_g2.edata['orig_id'] = compact_g1.edata['orig_id'][idx]
    compact_g2.edata[dgl.ETYPE] = compact_g1.edata[dgl.ETYPE][idx]
    compact_g2.edata['inner_edge'] = compact_g1.edata['inner_edge'][idx]
    compact_g2.edata[dgl.EID] = th.arange(num_edges, num_edges + compact_g2.number_of_edges())
    num_edges += compact_g2.number_of_edges()

    dgl.save_graphs(part_dir + '/graph.dgl', [compact_g2])

part_metadata = {'graph_name': graph_name,
                 'num_nodes': num_nodes,
...