Unverified Commit 60bc0b76 authored by kylasa's avatar kylasa Committed by GitHub
Browse files

[Distributed] reduce memory consumption in distributed graph partitioning. (#4338)

* Fix for node_subgraph function, which seems to generate segmentation fault for very large partitions

1. Removed three graph dgl objects and we create the final dgl object directly by maintaining the following constraints
a) nodes are reordered so that local nodes are placed in the beginning of the nodes list compared to non-local nodes.
b)Edges order are maintained as passed into this function.
c) src/dst end points are mapped to target values based on the reshuffle'd nodes order.

* Code changes addressing CI comments for this PR

1. Used Da's suggested map to map nodes from old to new order.
This is much simpler and mem. efficient.

* Addressing CI Comments.

1. Reduced the amount of documentation to reflect the actual implementation.
2. named the mapping object appropriately.
parent c1e01b1d
...@@ -157,73 +157,94 @@ def create_dgl_object(graph_name, num_parts, \ ...@@ -157,73 +157,94 @@ def create_dgl_object(graph_name, num_parts, \
edge_id_start + np.sum(etype_ids == etype_id)]) edge_id_start + np.sum(etype_ids == etype_id)])
edge_id_start += np.sum(etype_ids == etype_id) edge_id_start += np.sum(etype_ids == etype_id)
# Here we want to compute the unique IDs in the edge list. # get the edge list in some order and then reshuffle.
# It is possible that a node that belongs to the partition but it doesn't appear # Here the order of nodes is defined by the `np.unique` function
# in the edge list. That is, the node is assigned to this partition, but its neighbor # node order is as listed in the uniq_ids array
# belongs to another partition so that the edge is assigned to another partition.
# This happens in a directed graph.
# To avoid this kind of nodes being removed from the graph, we add node IDs that
# belong to this partition.
ids = np.concatenate( ids = np.concatenate(
[shuffle_global_src_id, shuffle_global_dst_id, np.arange(shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1)]) [shuffle_global_src_id, shuffle_global_dst_id,
np.arange(shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1)])
uniq_ids, idx, inverse_idx = np.unique( uniq_ids, idx, inverse_idx = np.unique(
ids, return_index=True, return_inverse=True) ids, return_index=True, return_inverse=True)
assert len(uniq_ids) == len(idx) assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range. # We get the edge list with their node IDs mapped to a contiguous ID range.
part_local_src_id, part_local_dst_id = np.split(inverse_idx[:len(shuffle_global_src_id) * 2], 2) part_local_src_id, part_local_dst_id = np.split(inverse_idx[:len(shuffle_global_src_id) * 2], 2)
compact_g = dgl.graph(data=(part_local_src_id, part_local_dst_id), num_nodes=len(idx)) inner_nodes = th.as_tensor(np.logical_and(
compact_g.edata['orig_id'] = th.as_tensor(global_edge_id) uniq_ids >= shuffle_global_nid_range[0],
compact_g.edata[dgl.ETYPE] = th.as_tensor(etype_ids) uniq_ids <= shuffle_global_nid_range[1]))
compact_g.edata['inner_edge'] = th.ones(
compact_g.number_of_edges(), dtype=th.bool) #get the list of indices, from inner_nodes, which will sort inner_nodes as [True, True, ...., False, False, ...]
#essentially local nodes will be placed before non-local nodes.
# The original IDs are homogeneous IDs. reshuffle_nodes = th.arange(len(uniq_ids))
# Similarly, we need to add the original homogeneous node IDs reshuffle_nodes = th.cat([reshuffle_nodes[inner_nodes.bool()],
global_nids = np.concatenate([global_src_id, global_dst_id, global_homo_nid]) reshuffle_nodes[inner_nodes == 0]])
global_homo_ids = global_nids[idx]
ntype, per_type_ids = id_map(global_homo_ids) '''
compact_g.ndata['orig_id'] = th.as_tensor(per_type_ids) Following procedure is used to map the part_local_src_id, part_local_dst_id to account for
compact_g.ndata[dgl.NTYPE] = th.as_tensor(ntype) reshuffling of nodes (to order localy owned nodes prior to non-local nodes in a partition)
compact_g.ndata[dgl.NID] = th.as_tensor(uniq_ids) 1. Form a node_map, in this case a numpy array, which will be used to map old node-ids (pre-reshuffling)
compact_g.ndata['inner_node'] = th.as_tensor(np.logical_and( to post-reshuffling ids.
uniq_ids >= shuffle_global_nid_range[0], uniq_ids <= shuffle_global_nid_range[1])) 2. Once the map is created, use this map to map all the node-ids in the part_local_src_id
part_local_nids = compact_g.ndata[dgl.NID][compact_g.ndata['inner_node'].bool()] and part_local_dst_id list to their appropriate `new` node-ids (post-reshuffle order).
assert np.all((part_local_nids == th.arange( 3. Since only the node's order is changed, we will have to re-order nodes related information when
part_local_nids[0], part_local_nids[-1] + 1)).numpy()) creating dgl object: this includes orig_id, dgl.NTYPE, dgl.NID and inner_node.
print('|V|={}'.format(compact_g.number_of_nodes())) 4. Edge's order is not changed. At this point in the execution path edges are still ordered by their etype-ids.
print('|E|={}'.format(compact_g.number_of_edges())) 5. Create the dgl object appropriately and return the dgl object.
# We need to reshuffle nodes in a partition so that all local nodes are labelled starting from 0. Here is a simple example to understand the above flow better.
reshuffle_nodes = th.arange(compact_g.number_of_nodes())
reshuffle_nodes = th.cat([reshuffle_nodes[compact_g.ndata['inner_node'].bool()], part_local_nids = [0, 1, 2, 3, 4, 5]
reshuffle_nodes[compact_g.ndata['inner_node'] == 0]]) part_local_src_ids = [0, 0, 0, 0, 2, 3, 4]
compact_g1 = dgl.node_subgraph(compact_g, reshuffle_nodes) part_local_dst_ids = [1, 2, 3, 4, 4, 4, 5]
compact_g1.ndata['orig_id'] = compact_g.ndata['orig_id'][reshuffle_nodes]
compact_g1.ndata[dgl.NTYPE] = compact_g.ndata[dgl.NTYPE][reshuffle_nodes] Assume that nodes {1, 5} are halo-nodes, which are not owned by this partition.
compact_g1.ndata[dgl.NID] = compact_g.ndata[dgl.NID][reshuffle_nodes]
compact_g1.ndata['inner_node'] = compact_g.ndata['inner_node'][reshuffle_nodes] reshuffle_nodes = [0, 2, 3, 4, 1, 5]
compact_g1.edata['orig_id'] = compact_g.edata['orig_id'][compact_g1.edata[dgl.EID]]
compact_g1.edata[dgl.ETYPE] = compact_g.edata[dgl.ETYPE][compact_g1.edata[dgl.EID]] A node_map, which maps node-ids from old to reshuffled order is as follows:
compact_g1.edata['inner_edge'] = compact_g.edata['inner_edge'][compact_g1.edata[dgl.EID]] node_map = np.zeros((len(reshuffle_nodes,)))
node_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))
# reshuffle edges on ETYPE as node_subgraph relabels edges
idx = th.argsort(compact_g1.edata[dgl.ETYPE]) Using the above map, we have mapped part_local_src_ids and part_local_dst_ids as follows:
u, v = compact_g1.edges() part_local_src_ids = [0, 0, 0, 0, 1, 2, 3]
u = u[idx] part_local_dst_ids = [4, 1, 2, 3, 3, 3, 5]
v = v[idx]
compact_g2 = dgl.graph((u, v)) In this graph above, note that nodes {0, 1, 2, 3} are inner_nodes and {4, 5} are NON-inner-nodes
compact_g2.ndata['orig_id'] = compact_g1.ndata['orig_id']
compact_g2.ndata[dgl.NTYPE] = compact_g1.ndata[dgl.NTYPE] Since the edge are re-ordered in any way, there is no reordering required for edge related data
compact_g2.ndata[dgl.NID] = compact_g1.ndata[dgl.NID] during the DGL object creation.
compact_g2.ndata['inner_node'] = compact_g1.ndata['inner_node'] '''
compact_g2.edata['orig_id'] = compact_g1.edata['orig_id'][idx] #create the mappings to generate mapped part_local_src_id and part_local_dst_id
compact_g2.edata[dgl.ETYPE] = compact_g1.edata[dgl.ETYPE][idx] #This map will map from unshuffled node-ids to reshuffled-node-ids (which are ordered to prioritize
compact_g2.edata['inner_edge'] = compact_g1.edata['inner_edge'][idx] #locally owned nodes).
compact_g2.edata[dgl.EID] = th.arange( nid_map = np.zeros((len(reshuffle_nodes,)))
edgeid_offset, edgeid_offset + compact_g2.number_of_edges(), dtype=th.int64) nid_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))
edgeid_offset += compact_g2.number_of_edges()
#Now map the edge end points to reshuffled_values.
return compact_g2, node_map_val, edge_map_val, ntypes_map, etypes_map part_local_src_id, part_local_dst_id = nid_map[part_local_src_id], nid_map[part_local_dst_id]
#create the graph here now.
part_graph = dgl.graph(data=(part_local_src_id, part_local_dst_id), num_nodes=len(uniq_ids))
part_graph.edata[dgl.EID] = th.arange(
edgeid_offset, edgeid_offset + part_graph.number_of_edges(), dtype=th.int64)
part_graph.edata['orig_id'] = th.as_tensor(global_edge_id)
part_graph.edata[dgl.ETYPE] = th.as_tensor(etype_ids)
part_graph.edata['inner_edge'] = th.ones(part_graph.number_of_edges(), dtype=th.bool)
#compute per_type_ids and ntype for all the nodes in the graph.
global_ids = np.concatenate(
[global_src_id, global_dst_id, global_homo_nid])
part_global_ids = global_ids[idx]
part_global_ids = part_global_ids[reshuffle_nodes]
ntype, per_type_ids = id_map(part_global_ids)
#continue with the graph creation
part_graph.ndata['orig_id'] = th.as_tensor(per_type_ids)
part_graph.ndata[dgl.NTYPE] = th.as_tensor(ntype)
part_graph.ndata[dgl.NID] = th.as_tensor(uniq_ids[reshuffle_nodes])
part_graph.ndata['inner_node'] = inner_nodes[reshuffle_nodes]
return part_graph, node_map_val, edge_map_val, ntypes_map, etypes_map
def create_metadata_json(graph_name, num_nodes, num_edges, part_id, num_parts, node_map_val, \ def create_metadata_json(graph_name, num_nodes, num_edges, part_id, num_parts, node_map_val, \
edge_map_val, ntypes_map, etypes_map, output_dir ): edge_map_val, ntypes_map, etypes_map, output_dir ):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment