import os
import json

import numpy as np
import dgl
import torch as th
from ogb.nodeproppred import DglNodePropPredDataset

# Where the partitions live and the name of the partitioned graph.
partitions_folder = 'outputs'
graph_name = 'mag'

# Read the partition metadata stored next to the partitions.
with open('{}/{}.json'.format(partitions_folder, graph_name)) as json_file:
    metadata = json.load(json_file)
num_parts = metadata['num_parts']

# Load OGB-MAG and rebuild it with one extra 'rev-' edge type per original
# edge type, whose endpoints are swapped.
dataset = DglNodePropPredDataset(name='ogbn-mag')
hg_orig, labels = dataset[0]
subgs = {}
for canonical in hg_orig.canonical_etypes:
    src_type, rel_name, dst_type = canonical
    src, dst = hg_orig.all_edges(etype=canonical)
    subgs[canonical] = (src, dst)
    subgs[(dst_type, 'rev-' + rel_name, src_type)] = (dst, src)
hg = dgl.heterograph(subgs)
hg.nodes['paper'].data['feat'] = hg_orig.nodes['paper'].data['feat']

# Construct node data and edge data after reshuffling: gather each feature
# tensor from every partition (in partition order), then concatenate.
node_feats = {}
edge_feats = {}
for partid in range(num_parts):
    part_node_feats = dgl.data.utils.load_tensors(
        '{}/part{}/node_feat.dgl'.format(partitions_folder, partid))
    part_edge_feats = dgl.data.utils.load_tensors(
        '{}/part{}/edge_feat.dgl'.format(partitions_folder, partid))
    for feat_name, tensor in part_node_feats.items():
        node_feats.setdefault(feat_name, []).append(tensor)
    for feat_name, tensor in part_edge_feats.items():
        edge_feats.setdefault(feat_name, []).append(tensor)
for feat_name, chunks in node_feats.items():
    node_feats[feat_name] = th.cat(chunks)
for feat_name, chunks in edge_feats.items():
    edge_feats[feat_name] = th.cat(chunks)

# Invert the name -> ID tables from the metadata into ID -> name lists.
ntype_map = metadata['ntypes']
ntypes = [None] * len(ntype_map)
for type_name, type_id in ntype_map.items():
    ntypes[type_id] = type_name
etype_map = metadata['etypes']
etypes = [None] * len(etype_map)
for type_name, type_id in etype_map.items():
    etypes[type_id] = type_name

# Look up the full (source type, edge type, destination type) triple of the
# rebuilt graph by edge type name.
etype2canonical = {
    rel: (src_type, rel, dst_type)
    for src_type, rel, dst_type in hg.canonical_etypes
}

# Turn each node type's list of rows into one stacked tensor (in place, so
# the metadata dict sees the tensors too) and build the node ID map from it.
node_map = metadata['node_map']
for type_name, rows in node_map.items():
    node_map[type_name] = th.stack([th.tensor(row) for row in rows], 0)
nid_map = dgl.distributed.id_map.IdMap(node_map)
edge_map = metadata['edge_map']
# Turn each edge type's list of rows into one stacked tensor (in place) and
# build the edge ID map from it.
for type_name, rows in edge_map.items():
    edge_map[type_name] = th.stack([th.tensor(row) for row in rows], 0)
eid_map = dgl.distributed.id_map.IdMap(edge_map)

# Summing (column 1 - column 0) over the rows of each map must give the
# number of nodes / edges of that type in the rebuilt graph.
for ntype, ranges in node_map.items():
    node_total = th.sum(ranges[:, 1] - ranges[:, 0])
    assert hg.number_of_nodes(ntype) == node_total
for etype, ranges in edge_map.items():
    edge_total = th.sum(ranges[:, 1] - ranges[:, 0])
    assert hg.number_of_edges(etype) == edge_total

# verify part_0 with graph_partition_book
gpb_ntype_ids = {ntype: i for i, ntype in enumerate(hg.ntypes)}
gpb_etype_ids = {etype: i for i, etype in enumerate(hg.etypes)}
gpb = dgl.distributed.graph_partition_book.RangePartitionBook(
    0, num_parts, node_map, edge_map, gpb_ntype_ids, gpb_etype_ids)
subg0 = dgl.load_graphs('{}/part0/graph.dgl'.format(partitions_folder))[0][0]

# Take the per-type edge ID 0 of every edge type and map it to a homogeneous
# edge ID.
eid = th.cat([
    gpb.map_to_homo_eid(th.zeros((1,), dtype=th.int64), etype)
    for etype in hg.etypes
])

# Each of those edges must be reported as belonging to partition 0, with a
# local ID equal to its homogeneous ID, and part 0 must store that same ID.
part_id = gpb.eid2partid(eid)
assert th.all(part_id == 0)
local_eid = gpb.eid2localeid(eid, 0)
assert th.all(local_eid == eid)
assert th.all(subg0.edata[dgl.EID][local_eid] == eid)

lsrc, ldst = subg0.find_edges(local_eid)
gsrc = subg0.ndata[dgl.NID][lsrc]
gdst = subg0.ndata[dgl.NID][ldst]
# The destination nodes are owned by the partition.
assert th.all(gdst == ldst)
# gdst which is not assigned into current partition is not required to equal ldst
# NOTE(review): the strict assertion above already implies this relaxed one,
# so this check can never fail on its own. If the relaxed condition is the
# intended contract, the strict assertion should be dropped -- confirm.
dst_matches = gdst == ldst
dst_not_inner = subg0.ndata['inner_node'][ldst] == 0
assert th.all(th.logical_or(dst_matches, dst_not_inner))

# Recover the per-type IDs and check that the (source type, edge type,
# destination type) triple of every sampled edge exists in the rebuilt graph.
etids, _ = gpb.map_to_per_etype(eid)
src_tids, _ = gpb.map_to_per_ntype(gsrc)
dst_tids, _ = gpb.map_to_per_ntype(gdst)
etype_ids = th.arange(0, len(etypes))
canonical_etypes = [
    (ntypes[src_tid], etypes[etype_id], ntypes[dst_tid])
    for src_tid, etype_id, dst_tid in zip(src_tids, etype_ids, dst_tids)
]
for canonical in canonical_etypes:
    assert canonical in hg.canonical_etypes

# Load the graph partition structure.
# Original IDs of the nodes / edges flagged as inner in each partition,
# collected per type so coverage can be checked once all parts are visited.
orig_node_ids = {ntype: [] for ntype in hg.ntypes}
orig_edge_ids = {etype: [] for etype in hg.etypes}

for partid in range(num_parts):
    print('test part', partid)
    part_file = '{}/part{}/graph.dgl'.format(partitions_folder, partid)
    subg = dgl.load_graphs(part_file)[0][0]

    # Per-node and per-edge tensors stored on the partition graph.
    node_orig_ids = subg.ndata['orig_id']
    node_global_ids = subg.ndata[dgl.NID]
    node_inner_flags = subg.ndata['inner_node']
    subg_ntype = subg.ndata[dgl.NTYPE]
    subg_etype = subg.edata[dgl.ETYPE]

    # Endpoints of every edge in the partition, as original IDs and as
    # global IDs.
    subg_src_id, subg_dst_id = subg.edges()
    orig_src_id = node_orig_ids[subg_src_id]
    orig_dst_id = node_orig_ids[subg_dst_id]
    global_src_id = node_global_ids[subg_src_id]
    global_dst_id = node_global_ids[subg_dst_id]

    for ntype_id in th.unique(subg_ntype):
        ntype = ntypes[ntype_id]
        idx = subg_ntype == ntype_id
        # This is global IDs after reshuffle.
        nid = node_global_ids[idx]
        ntype_ids1, type_nid = nid_map(nid)
        orig_type_nid = node_orig_ids[idx]
        inner_node = node_inner_flags[idx]
        is_inner = inner_node == 1

        # All nodes should have the same node type.
        assert np.all(ntype_ids1.numpy() == int(ntype_id))
        # The inner nodes' global IDs must be exactly the range recorded for
        # this node type and partition in the node map.
        expected_nids = np.arange(
            node_map[ntype][partid, 0], node_map[ntype][partid, 1])
        assert np.all(nid[is_inner].numpy() == expected_nids)
        orig_node_ids[ntype].append(orig_type_nid[is_inner])

        # Check the degree of the inner nodes.
        inner_mask = th.logical_and(idx, node_inner_flags)
        inner_nids = th.nonzero(inner_mask, as_tuple=True)[0]
        subg_deg = subg.in_degrees(inner_nids)
        orig_nids = node_orig_ids[inner_nids]
        # Calculate the in-degrees of nodes of a particular node type by
        # summing over every edge type whose destination is this node type.
        glob_deg = th.zeros(len(subg_deg), dtype=th.int64)
        for canonical in hg.canonical_etypes:
            if canonical[2] != ntype:
                continue
            glob_deg += hg.in_degrees(orig_nids, etype=canonical)
        assert np.all(glob_deg.numpy() == subg_deg.numpy())

        # Check node data: the gathered partition features indexed by
        # per-type ID must equal the rebuilt graph's features indexed by
        # original ID.
        for name in hg.nodes[ntype].data:
            part_values = node_feats[ntype + '/' + name][type_nid]
            orig_values = hg.nodes[ntype].data[name][orig_type_nid]
            assert np.all(part_values.numpy() == orig_values.numpy())

    for etype_id in th.unique(subg_etype):
        etype = etypes[etype_id]
        srctype, _, dsttype = etype2canonical[etype]
        idx = subg_etype == etype_id
        edge_orig_ids = subg.edata['orig_id'][idx]

        # Every edge of this type must exist in the rebuilt graph, with the
        # same edge ID as the partition's stored original ID.
        rel_graph = hg[etype]
        exist = rel_graph.has_edges_between(orig_src_id[idx], orig_dst_id[idx])
        assert np.all(exist.numpy())
        eid = rel_graph.edge_ids(orig_src_id[idx], orig_dst_id[idx])
        assert np.all(eid.numpy() == edge_orig_ids.numpy())

        # All source endpoints must share one node type, and it must be the
        # edge type's source type; likewise for destinations.
        src_ntype_ids = nid_map(global_src_id[idx])[0]
        assert len(th.unique(src_ntype_ids)) == 1
        assert ntypes[src_ntype_ids[0]] == srctype
        dst_ntype_ids = nid_map(global_dst_id[idx])[0]
        assert len(th.unique(dst_ntype_ids)) == 1
        assert ntypes[dst_ntype_ids[0]] == dsttype

        # This is global IDs after reshuffle.
        eid = subg.edata[dgl.EID][idx]
        etype_ids1, type_eid = eid_map(eid)
        orig_type_eid = edge_orig_ids
        inner_edge = subg.edata['inner_edge'][idx]
        is_inner = inner_edge == 1

        # All edges should have the same edge type.
        assert np.all(etype_ids1.numpy() == int(etype_id))
        # Once sorted, the inner edges' global IDs must be exactly the range
        # recorded for this edge type and partition in the edge map.
        expected_eids = np.arange(
            edge_map[etype][partid, 0], edge_map[etype][partid, 1])
        assert np.all(np.sort(eid[is_inner].numpy()) == expected_eids)
        orig_edge_ids[etype].append(orig_type_eid[is_inner])

        # Check edge data.
        for name in hg.edges[etype].data:
            part_values = edge_feats[etype + '/' + name][type_eid]
            orig_values = hg.edges[etype].data[name][orig_type_eid]
            assert np.all(part_values.numpy() == orig_values.numpy())

# Across all partitions, the collected original IDs of each type must be
# exactly 0 .. N-1 once sorted.
for ntype, chunks in orig_node_ids.items():
    nids = th.sort(th.cat(chunks))[0]
    assert np.all((nids == th.arange(hg.number_of_nodes(ntype))).numpy())
for etype, chunks in orig_edge_ids.items():
    eids = th.sort(th.cat(chunks))[0]
    assert np.all((eids == th.arange(hg.number_of_edges(etype))).numpy())