Unverified Commit 25ac3344 authored by Da Zheng, committed by GitHub

[Distributed] Heterogeneous graph support (#2457)

* Distributed heterograph (#3)

* heterogeneous graph partition.

* fix graph partition book for heterograph.

* load heterograph partitions.

* update DistGraphServer to support heterograph.

* make DistGraph runnable for heterograph.

* partition a graph and store parts with homogeneous graph structure.

* update DistGraph server&client to use homogeneous graph.

* shuffle node Ids based on node types.

* load mag in heterograph.

* fix per-node-type mapping.

* balance node types.

* fix for homogeneous graph

* store etype for now.

* fix data name.

* fix a bug in example.

* add profiler in rgcn.

* heterogeneous RGCN.

* map homogeneous node ids to hetero node ids.

* fix graph partition book.

* fix DistGraph.

* shuffle eids.

* verify eids and their mappings when loading a partition.

* ID map from homogeneous IDs to per-type IDs.

* verify partitioned results.

* add test for distributed sampler....
parent aa884d43
......@@ -39,6 +39,8 @@ the number of nodes, the number of edges and the number of labelled nodes.
python3 partition_graph.py --dataset ogb-product --num_parts 4 --balance_train --balance_edges
```
This script generates the partitioned graphs and stores them in a directory called `data`.
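The output directory typically has a layout like the following (shown for illustration; the exact file names follow DGL's partition format):
```
data/
|-- ogb-product.json        # partition configuration (metadata for the servers)
|-- part0/
|   |-- graph.dgl           # graph structure of partition 0
|   |-- node_feat.dgl       # node features of partition 0
|   `-- edge_feat.dgl       # edge features of partition 0
|-- part1/
|   `-- ...
```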
### Step 2: copy the partitioned data and files to the cluster
DGL provides a script for copying partitioned data and files to the cluster. Before that, copy the training script to a local folder:
......
## Distributed training
This is an example of training RGCN node classification in a distributed fashion. Currently, the example only support training RGCN graphs with no input features. The current implementation follows ../rgcn/entity_claasify_mp.py.
This is an example of training RGCN node classification in a distributed fashion. Currently, the example trains RGCN on graphs with input node features. The current implementation follows ../rgcn/entity_classify_mp.py.
Before training, please install some Python libraries with pip:
......@@ -36,6 +36,8 @@ the number of nodes, the number of edges and the number of labelled nodes.
python3 partition_graph.py --dataset ogbn-mag --num_parts 4 --balance_train --balance_edges
```
This script generates the partitioned graphs and stores them in a directory called `data`.
### Step 2: copy the partitioned data to the cluster
DGL provides a script for copying partitioned data to the cluster. Before that, copy the training script to a local folder:
......@@ -78,7 +80,7 @@ python3 ~/dgl/tools/launch.py \
--num_samplers 4 \
--part_config data/ogbn-mag.json \
--ip_config ip_config.txt \
"python3 dgl_code/entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 512 --n-hidden 64 --lr 0.01 --eval-batch-size 16 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --num-workers 4 --num-servers 1 --sparse-embedding --sparse-lr 0.06"
"python3 dgl_code/entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 512 --n-hidden 64 --lr 0.01 --eval-batch-size 16 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --num-workers 4 --num-servers 1 --sparse-embedding --sparse-lr 0.06 --node-feats"
```
We can get the performance score at the second epoch:
......@@ -98,5 +100,5 @@ python3 partition_graph.py --dataset ogbn-mag --num_parts 1
### Step 2: run the training script
```bash
python3 entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 256 --n-hidden 64 --lr 0.01 --eval-batch-size 8 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --conf-path 'data/ogbn-mag.json' --standalone
python3 entity_classify_dist.py --graph-name ogbn-mag --dataset ogbn-mag --fanout='25,25' --batch-size 512 --n-hidden 64 --lr 0.01 --eval-batch-size 128 --low-mem --dropout 0.5 --use-self-loop --n-bases 2 --n-epochs 3 --layer-norm --ip-config ip_config.txt --conf-path 'data/ogbn-mag.json' --standalone --sparse-embedding --sparse-lr 0.06 --node-feats
```
......@@ -106,7 +106,7 @@ class EntityClassify(nn.Module):
h = feats
for layer, block in zip(self.layers, blocks):
block = block.to(self.device)
h = layer(block, h, block.edata['etype'], block.edata['norm'])
h = layer(block, h, block.edata[dgl.ETYPE], block.edata['norm'])
return h
def init_emb(shape, dtype):
......@@ -122,8 +122,6 @@ class DistEmbedLayer(nn.Module):
Device to run the layer.
g : DistGraph
training graph
num_of_ntype : int
Number of node types
embed_size : int
Output embed size
sparse_emb: bool
......@@ -138,55 +136,74 @@ class DistEmbedLayer(nn.Module):
def __init__(self,
dev_id,
g,
num_of_ntype,
embed_size,
sparse_emb=False,
dgl_sparse_emb=False,
feat_name='feat',
embed_name='node_emb'):
super(DistEmbedLayer, self).__init__()
self.dev_id = dev_id
self.num_of_ntype = num_of_ntype
self.embed_size = embed_size
self.embed_name = embed_name
self.feat_name = feat_name
self.sparse_emb = sparse_emb
self.g = g
self.ntype_id_map = {g.get_ntype_id(ntype):ntype for ntype in g.ntypes}
self.node_projs = nn.ModuleDict()
for ntype in g.ntypes:
if feat_name in g.nodes[ntype].data:
self.node_projs[ntype] = nn.Linear(g.nodes[ntype].data[feat_name].shape[1], embed_size)
nn.init.xavier_uniform_(self.node_projs[ntype].weight)
print('node {} has data {}'.format(ntype, feat_name))
if sparse_emb:
if dgl_sparse_emb:
self.node_embeds = dgl.distributed.DistEmbedding(g.number_of_nodes(),
self.embed_size,
embed_name,
init_emb)
self.node_embeds = {}
for ntype in g.ntypes:
# We only create embeddings for nodes without node features.
if feat_name not in g.nodes[ntype].data:
part_policy = g.get_node_partition_policy(ntype)
self.node_embeds[ntype] = dgl.distributed.DistEmbedding(g.number_of_nodes(ntype),
self.embed_size,
embed_name + '_' + ntype,
init_emb,
part_policy)
else:
self.node_embeds = th.nn.Embedding(g.number_of_nodes(), self.embed_size, sparse=self.sparse_emb)
nn.init.uniform_(self.node_embeds.weight, -1.0, 1.0)
self.node_embeds = nn.ModuleDict()
for ntype in g.ntypes:
# We only create embeddings for nodes without node features.
if feat_name not in g.nodes[ntype].data:
self.node_embeds[ntype] = th.nn.Embedding(g.number_of_nodes(ntype), self.embed_size, sparse=self.sparse_emb)
nn.init.uniform_(self.node_embeds[ntype].weight, -1.0, 1.0)
else:
self.node_embeds = th.nn.Embedding(g.number_of_nodes(), self.embed_size)
nn.init.uniform_(self.node_embeds.weight, -1.0, 1.0)
def forward(self, node_ids, node_tids, features):
self.node_embeds = nn.ModuleDict()
for ntype in g.ntypes:
# We only create embeddings for nodes without node features.
if feat_name not in g.nodes[ntype].data:
self.node_embeds[ntype] = th.nn.Embedding(g.number_of_nodes(ntype), self.embed_size)
nn.init.uniform_(self.node_embeds[ntype].weight, -1.0, 1.0)
def forward(self, node_ids, ntype_ids):
"""Forward computation
Parameters
----------
node_ids : tensor
node_ids : Tensor
node ids to generate embedding for.
node_ids : tensor
ntype_ids : Tensor
node type ids
features : list of features
list of initial features for nodes belong to different node type.
If None, the corresponding features is an one-hot encoding feature,
else use the features directly as input feature and matmul a
projection matrix.
Returns
-------
tensor
embeddings as the input of the next layer
"""
embeds = th.empty(node_ids.shape[0], self.embed_size)
for ntype in range(self.num_of_ntype):
assert features[ntype] is None, 'Currently Dist RGCN only support non input feature'
loc = node_tids == ntype
embeds[loc] = self.node_embeds(node_ids[loc])
embeds = th.empty(node_ids.shape[0], self.embed_size, device=self.dev_id)
for ntype_id in th.unique(ntype_ids).tolist():
ntype = self.ntype_id_map[int(ntype_id)]
loc = ntype_ids == ntype_id
if self.feat_name in self.g.nodes[ntype].data:
embeds[loc] = self.node_projs[ntype](self.g.nodes[ntype].data[self.feat_name][node_ids[ntype_ids == ntype_id]].to(self.dev_id))
else:
embeds[loc] = self.node_embeds[ntype](node_ids[ntype_ids == ntype_id]).to(self.dev_id)
return embeds
def compute_acc(results, labels):
......@@ -196,7 +213,15 @@ def compute_acc(results, labels):
labels = labels.long()
return (results == labels).float().sum() / len(results)
def evaluate(g, model, embed_layer, labels, eval_loader, test_loader, node_feats, global_val_nid, global_test_nid):
def gen_norm(g):
_, v, eid = g.all_edges(form='all')
_, inverse_index, count = th.unique(v, return_inverse=True, return_counts=True)
degrees = count[inverse_index]
norm = th.ones(eid.shape[0], device=eid.device) / degrees
norm = norm.unsqueeze(1)
g.edata['norm'] = norm
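# A quick illustration of what gen_norm computes (a sketch; each edge gets
# 1 / in_degree(dst) as its normalization factor):
#
#   import dgl
#   import torch as th
#   toy = dgl.graph(([0, 1, 2], [1, 1, 2]))  # two edges into node 1, one into node 2
#   gen_norm(toy)
#   print(toy.edata['norm'])                 # tensor([[0.5000], [0.5000], [1.0000]])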
def evaluate(g, model, embed_layer, labels, eval_loader, test_loader, all_val_nid, all_test_nid):
model.eval()
embed_layer.eval()
eval_logits = []
......@@ -207,11 +232,12 @@ def evaluate(g, model, embed_layer, labels, eval_loader, test_loader, node_feats
with th.no_grad():
for sample_data in tqdm.tqdm(eval_loader):
seeds, blocks = sample_data
feats = embed_layer(blocks[0].srcdata[dgl.NID],
blocks[0].srcdata[dgl.NTYPE],
node_feats)
for block in blocks:
gen_norm(block)
feats = embed_layer(blocks[0].srcdata[dgl.NID], blocks[0].srcdata[dgl.NTYPE])
logits = model(blocks, feats)
eval_logits.append(logits.cpu().detach())
assert np.all(seeds.numpy() < g.number_of_nodes('paper'))
eval_seeds.append(seeds.cpu().detach())
eval_logits = th.cat(eval_logits)
eval_seeds = th.cat(eval_seeds)
......@@ -222,11 +248,12 @@ def evaluate(g, model, embed_layer, labels, eval_loader, test_loader, node_feats
with th.no_grad():
for sample_data in tqdm.tqdm(test_loader):
seeds, blocks = sample_data
feats = embed_layer(blocks[0].srcdata[dgl.NID],
blocks[0].srcdata[dgl.NTYPE],
node_feats)
for block in blocks:
gen_norm(block)
feats = embed_layer(blocks[0].srcdata[dgl.NID], blocks[0].srcdata[dgl.NTYPE])
logits = model(blocks, feats)
test_logits.append(logits.cpu().detach())
assert np.all(seeds.numpy() < g.number_of_nodes('paper'))
test_seeds.append(seeds.cpu().detach())
test_logits = th.cat(test_logits)
test_seeds = th.cat(test_seeds)
......@@ -234,8 +261,8 @@ def evaluate(g, model, embed_layer, labels, eval_loader, test_loader, node_feats
g.barrier()
if g.rank() == 0:
return compute_acc(global_results[global_val_nid], labels[global_val_nid]), \
compute_acc(global_results[global_test_nid], labels[global_test_nid])
return compute_acc(global_results[all_val_nid], labels[all_val_nid]), \
compute_acc(global_results[all_test_nid], labels[all_test_nid])
else:
return -1, -1
......@@ -274,29 +301,35 @@ class NeighborSampler:
norms = []
ntypes = []
seeds = th.LongTensor(np.asarray(seeds))
cur = seeds
gpb = self.g.get_partition_book()
# We need to map the per-type node IDs to homogeneous IDs.
cur = gpb.map_to_homo_nid(seeds, 'paper')
for fanout in self.fanouts:
frontier = self.sample_neighbors(self.g, cur, fanout, replace=True)
etypes = self.g.edata[dgl.ETYPE][frontier.edata[dgl.EID]]
norm = self.g.edata['norm'][frontier.edata[dgl.EID]]
# For a heterogeneous input graph, the returned frontier is stored in
# the homogeneous graph format.
frontier = self.sample_neighbors(self.g, cur, fanout, replace=False)
block = dgl.to_block(frontier, cur)
block.srcdata[dgl.NTYPE] = self.g.ndata[dgl.NTYPE][block.srcdata[dgl.NID]]
block.edata['etype'] = etypes
block.edata['norm'] = norm
cur = block.srcdata[dgl.NID]
block.edata[dgl.EID] = frontier.edata[dgl.EID]
# Map the homogeneous edge IDs to their edge types and per-type IDs.
block.edata[dgl.ETYPE], block.edata[dgl.EID] = gpb.map_to_per_etype(block.edata[dgl.EID])
# Map the homogeneous node IDs to their node types and per-type IDs.
block.srcdata[dgl.NTYPE], block.srcdata[dgl.NID] = gpb.map_to_per_ntype(block.srcdata[dgl.NID])
block.dstdata[dgl.NTYPE], block.dstdata[dgl.NID] = gpb.map_to_per_ntype(block.dstdata[dgl.NID])
blocks.insert(0, block)
return seeds, blocks
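# For reference, the ID-mapping round trip used above (a sketch, assuming gpb
# is the partition book of a heterograph partition):
#
#   homo_nids = gpb.map_to_homo_nid(per_type_nids, 'paper')     # per-type -> homogeneous
#   ntype_ids, per_type_nids = gpb.map_to_per_ntype(homo_nids)  # homogeneous -> per-type
#   etype_ids, per_type_eids = gpb.map_to_per_etype(homo_eids)  # same for edges
#
# map_to_per_ntype/map_to_per_etype return a (type ID, per-type ID) pair of
# tensors, which is exactly what sample_blocks stores in block.srcdata/edata.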
def run(args, device, data):
g, node_feats, num_of_ntype, num_classes, num_rels, \
train_nid, val_nid, test_nid, labels, global_val_nid, global_test_nid = data
g, num_classes, train_nid, val_nid, test_nid, labels, all_val_nid, all_test_nid = data
num_rels = len(g.etypes)
fanouts = [int(fanout) for fanout in args.fanout.split(',')]
val_fanouts = [int(fanout) for fanout in args.validation_fanout.split(',')]
sampler = NeighborSampler(g, fanouts, dgl.distributed.sample_neighbors)
# Create DataLoader for constructing blocks
dataloader = DistDataLoader(
dataset=train_nid.numpy(),
dataset=train_nid,
batch_size=args.batch_size,
collate_fn=sampler.sample_blocks,
shuffle=True,
......@@ -305,7 +338,7 @@ def run(args, device, data):
valid_sampler = NeighborSampler(g, val_fanouts, dgl.distributed.sample_neighbors)
# Create DataLoader for constructing blocks
valid_dataloader = DistDataLoader(
dataset=val_nid.numpy(),
dataset=val_nid,
batch_size=args.batch_size,
collate_fn=valid_sampler.sample_blocks,
shuffle=False,
......@@ -314,7 +347,7 @@ def run(args, device, data):
test_sampler = NeighborSampler(g, [-1] * args.n_layers, dgl.distributed.sample_neighbors)
# Create DataLoader for constructing blocks
test_dataloader = DistDataLoader(
dataset=test_nid.numpy(),
dataset=test_nid,
batch_size=args.batch_size,
collate_fn=test_sampler.sample_blocks,
shuffle=False,
......@@ -322,10 +355,10 @@ def run(args, device, data):
embed_layer = DistEmbedLayer(device,
g,
num_of_ntype,
args.n_hidden,
sparse_emb=args.sparse_embedding,
dgl_sparse_emb=args.dgl_sparse)
dgl_sparse_emb=args.dgl_sparse,
feat_name='feat')
model = EntityClassify(device,
args.n_hidden,
......@@ -340,15 +373,33 @@ def run(args, device, data):
model = model.to(device)
if not args.standalone:
model = th.nn.parallel.DistributedDataParallel(model)
if args.sparse_embedding and not args.dgl_sparse:
# If there are dense parameters in the embedding layer
# or we use PyTorch sparse embeddings.
if len(embed_layer.node_projs) > 0 or not args.dgl_sparse:
embed_layer = DistributedDataParallel(embed_layer, device_ids=None, output_device=None)
if args.sparse_embedding:
if args.dgl_sparse:
emb_optimizer = dgl.distributed.SparseAdagrad([embed_layer.node_embeds], lr=args.sparse_lr)
if args.dgl_sparse and args.standalone:
emb_optimizer = dgl.distributed.SparseAdagrad(list(embed_layer.node_embeds.values()), lr=args.sparse_lr)
print('optimize DGL sparse embedding:', embed_layer.node_embeds.keys())
elif args.dgl_sparse:
emb_optimizer = dgl.distributed.SparseAdagrad(list(embed_layer.module.node_embeds.values()), lr=args.sparse_lr)
print('optimize DGL sparse embedding:', embed_layer.module.node_embeds.keys())
elif args.standalone:
emb_optimizer = th.optim.SparseAdam(embed_layer.node_embeds.parameters(), lr=args.sparse_lr)
print('optimize Pytorch sparse embedding:', embed_layer.node_embeds)
else:
emb_optimizer = th.optim.SparseAdam(embed_layer.module.node_embeds.parameters(), lr=args.sparse_lr)
optimizer = th.optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.l2norm)
print('optimize Pytorch sparse embedding:', embed_layer.module.node_embeds)
dense_params = list(model.parameters())
if args.node_feats:
if args.standalone:
dense_params += list(embed_layer.node_projs.parameters())
print('optimize dense projection:', embed_layer.node_projs)
else:
dense_params += list(embed_layer.module.node_projs.parameters())
print('optimize dense projection:', embed_layer.module.node_projs)
optimizer = th.optim.Adam(dense_params, lr=args.lr, weight_decay=args.l2norm)
else:
all_params = list(model.parameters()) + list(embed_layer.parameters())
optimizer = th.optim.Adam(all_params, lr=args.lr, weight_decay=args.l2norm)
......@@ -385,9 +436,9 @@ def run(args, device, data):
sample_time += tic_step - start
sample_t.append(tic_step - start)
feats = embed_layer(blocks[0].srcdata[dgl.NID],
blocks[0].srcdata[dgl.NTYPE],
node_feats)
for block in blocks:
gen_norm(block)
feats = embed_layer(blocks[0].srcdata[dgl.NID], blocks[0].srcdata[dgl.NTYPE])
label = labels[seeds]
copy_time = time.time()
feat_copy_t.append(copy_time - tic_step)
......@@ -410,15 +461,16 @@ def run(args, device, data):
backward_t.append(compute_end - forward_end)
# Aggregate gradients in multiple nodes.
optimizer.step()
update_t.append(time.time() - compute_end)
step_t = time.time() - start
step_time.append(step_t)
train_acc = th.sum(logits.argmax(dim=1) == label).item() / len(seeds)
if step % args.log_every == 0:
print('[{}] Epoch {:05d} | Step {:05d} | Loss {:.4f} | time {:.3f} s' \
print('[{}] Epoch {:05d} | Step {:05d} | Train acc {:.4f} | Loss {:.4f} | time {:.3f} s' \
'| sample {:.3f} | copy {:.3f} | forward {:.3f} | backward {:.3f} | update {:.3f}'.format(
g.rank(), epoch, step, loss.item(), np.sum(step_time[-args.log_every:]),
g.rank(), epoch, step, train_acc, loss.item(), np.sum(step_time[-args.log_every:]),
np.sum(sample_t[-args.log_every:]), np.sum(feat_copy_t[-args.log_every:]), np.sum(forward_t[-args.log_every:]),
np.sum(backward_t[-args.log_every:]), np.sum(update_t[-args.log_every:])))
start = time.time()
......@@ -430,7 +482,7 @@ def run(args, device, data):
start = time.time()
g.barrier()
val_acc, test_acc = evaluate(g, model, embed_layer, labels,
valid_dataloader, test_dataloader, node_feats, global_val_nid, global_test_nid)
valid_dataloader, test_dataloader, all_val_nid, all_test_nid)
if val_acc >= 0:
print('Val Acc {:.4f}, Test Acc {:.4f}, time: {:.4f}'.format(val_acc, test_acc,
time.time() - start))
......@@ -442,34 +494,24 @@ def main(args):
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.conf_path)
print('rank:', g.rank())
print('number of edges', g.number_of_edges())
pb = g.get_partition_book()
train_nid = dgl.distributed.node_split(g.ndata['train_mask'], pb, force_even=True)
val_nid = dgl.distributed.node_split(g.ndata['val_mask'], pb, force_even=True)
test_nid = dgl.distributed.node_split(g.ndata['test_mask'], pb, force_even=True)
local_nid = pb.partid2nids(pb.partid).detach().numpy()
train_nid = dgl.distributed.node_split(g.nodes['paper'].data['train_mask'], pb, ntype='paper', force_even=True)
val_nid = dgl.distributed.node_split(g.nodes['paper'].data['val_mask'], pb, ntype='paper', force_even=True)
test_nid = dgl.distributed.node_split(g.nodes['paper'].data['test_mask'], pb, ntype='paper', force_even=True)
local_nid = pb.partid2nids(pb.partid, 'paper').detach().numpy()
print('part {}, train: {} (local: {}), val: {} (local: {}), test: {} (local: {})'.format(
g.rank(), len(train_nid), len(np.intersect1d(train_nid.numpy(), local_nid)),
len(val_nid), len(np.intersect1d(val_nid.numpy(), local_nid)),
len(test_nid), len(np.intersect1d(test_nid.numpy(), local_nid))))
device = th.device('cpu')
labels = g.ndata['labels'][np.arange(g.number_of_nodes())]
global_val_nid = th.LongTensor(np.nonzero(g.ndata['val_mask'][np.arange(g.number_of_nodes())])).squeeze()
global_test_nid = th.LongTensor(np.nonzero(g.ndata['test_mask'][np.arange(g.number_of_nodes())])).squeeze()
labels = g.nodes['paper'].data['labels'][np.arange(g.number_of_nodes('paper'))]
all_val_nid = th.LongTensor(np.nonzero(g.nodes['paper'].data['val_mask'][np.arange(g.number_of_nodes('paper'))])).squeeze()
all_test_nid = th.LongTensor(np.nonzero(g.nodes['paper'].data['test_mask'][np.arange(g.number_of_nodes('paper'))])).squeeze()
n_classes = len(th.unique(labels[labels >= 0]))
print(labels.shape)
print('#classes:', n_classes)
# This information should have a better place to be stored and retrieved.
num_of_ntype = len(th.unique(g.ndata[dgl.NTYPE][np.arange(g.number_of_nodes())]))
num_rels = len(th.unique(g.edata[dgl.ETYPE][np.arange(g.number_of_edges())]))
# no initial node features
node_feats = [None] * num_of_ntype
run(args, device, (g, node_feats, num_of_ntype, n_classes, num_rels,
train_nid, val_nid, test_nid, labels, global_val_nid, global_test_nid))
run(args, device, (g, n_classes, train_nid, val_nid, test_nid, labels, all_val_nid, all_test_nid))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='RGCN')
......@@ -527,8 +569,6 @@ if __name__ == '__main__':
help='Whether to use DGL sparse embedding')
parser.add_argument('--node-feats', default=False, action='store_true',
help='Whether to use node features')
parser.add_argument('--global-norm', default=False, action='store_true',
help='User global norm instead of per node type norm')
parser.add_argument('--layer-norm', default=False, action='store_true',
help='Use layer norm')
parser.add_argument('--local_rank', type=int, help='get rank of the process')
......
......@@ -6,7 +6,7 @@ import time
from ogb.nodeproppred import DglNodePropPredDataset
def load_ogb(dataset, global_norm):
def load_ogb(dataset):
if dataset == 'ogbn-mag':
dataset = DglNodePropPredDataset(name=dataset)
split_idx = dataset.get_idx_split()
......@@ -33,54 +33,24 @@ def load_ogb(dataset, global_norm):
print('Number of valid: {}'.format(len(val_idx)))
print('Number of test: {}'.format(len(test_idx)))
# currently we do not support node features in the mag dataset.
# calculate norm for each edge type and store in edge
if global_norm is False:
for canonical_etype in hg.canonical_etypes:
u, v, eid = hg.all_edges(form='all', etype=canonical_etype)
_, inverse_index, count = th.unique(v, return_inverse=True, return_counts=True)
degrees = count[inverse_index]
norm = th.ones(eid.shape[0]) / degrees
norm = norm.unsqueeze(1)
hg.edges[canonical_etype].data['norm'] = norm
# get target category id
category_id = len(hg.ntypes)
for i, ntype in enumerate(hg.ntypes):
if ntype == category:
category_id = i
g = dgl.to_homogeneous(hg, edata=['norm'])
if global_norm:
u, v, eid = g.all_edges(form='all')
_, inverse_index, count = th.unique(v, return_inverse=True, return_counts=True)
degrees = count[inverse_index]
norm = th.ones(eid.shape[0]) / degrees
norm = norm.unsqueeze(1)
g.edata['norm'] = norm
node_ids = th.arange(g.number_of_nodes())
# find out the target node ids
node_tids = g.ndata[dgl.NTYPE]
loc = (node_tids == category_id)
target_idx = node_ids[loc]
train_idx = target_idx[train_idx]
val_idx = target_idx[val_idx]
test_idx = target_idx[test_idx]
train_mask = th.zeros((g.number_of_nodes(),), dtype=th.bool)
train_mask = th.zeros((hg.number_of_nodes('paper'),), dtype=th.bool)
train_mask[train_idx] = True
val_mask = th.zeros((g.number_of_nodes(),), dtype=th.bool)
val_mask = th.zeros((hg.number_of_nodes('paper'),), dtype=th.bool)
val_mask[val_idx] = True
test_mask = th.zeros((g.number_of_nodes(),), dtype=th.bool)
test_mask = th.zeros((hg.number_of_nodes('paper'),), dtype=th.bool)
test_mask[test_idx] = True
g.ndata['train_mask'] = train_mask
g.ndata['val_mask'] = val_mask
g.ndata['test_mask'] = test_mask
hg.nodes['paper'].data['train_mask'] = train_mask
hg.nodes['paper'].data['val_mask'] = val_mask
hg.nodes['paper'].data['test_mask'] = test_mask
labels = th.full((g.number_of_nodes(),), -1, dtype=paper_labels.dtype)
labels[target_idx] = paper_labels
g.ndata['labels'] = labels
return g
hg.nodes['paper'].data['labels'] = paper_labels
return hg
else:
raise("Do not support other ogbn datasets.")
......@@ -98,21 +68,19 @@ if __name__ == '__main__':
help='turn the graph into an undirected graph.')
argparser.add_argument('--balance_edges', action='store_true',
help='balance the number of edges in each partition.')
argparser.add_argument('--global-norm', default=False, action='store_true',
help='User global norm instead of per node type norm')
args = argparser.parse_args()
start = time.time()
g = load_ogb(args.dataset, args.global_norm)
g = load_ogb(args.dataset)
print('load {} takes {:.3f} seconds'.format(args.dataset, time.time() - start))
print('|V|={}, |E|={}'.format(g.number_of_nodes(), g.number_of_edges()))
print('train: {}, valid: {}, test: {}'.format(th.sum(g.ndata['train_mask']),
th.sum(g.ndata['val_mask']),
th.sum(g.ndata['test_mask'])))
print('train: {}, valid: {}, test: {}'.format(th.sum(g.nodes['paper'].data['train_mask']),
th.sum(g.nodes['paper'].data['val_mask']),
th.sum(g.nodes['paper'].data['test_mask'])))
if args.balance_train:
balance_ntypes = g.ndata['train_mask']
balance_ntypes = {'paper': g.nodes['paper'].data['train_mask']}
else:
balance_ntypes = None
......
......@@ -355,6 +355,22 @@ def sum(input, dim, keepdims=False):
"""
pass
def floor_div(in1, in2):
"""Element-wise integer division, rounding each quotient towards zero.
Parameters
----------
in1 : Tensor
The input tensor
in2 : Tensor or integer
The divisor
Returns
-------
Tensor
A framework-specific tensor.
"""
pass
def reduce_sum(input):
"""Returns the sum of all elements in the input tensor.
......
......@@ -149,6 +149,9 @@ def sum(input, dim, keepdims=False):
return nd.array([0.], dtype=input.dtype, ctx=input.context)
return nd.sum(input, axis=dim, keepdims=keepdims)
def floor_div(in1, in2):
return in1 / in2
def reduce_sum(input):
return input.sum()
......
......@@ -117,6 +117,9 @@ def copy_to(input, ctx, **kwargs):
def sum(input, dim, keepdims=False):
return th.sum(input, dim=dim, keepdim=keepdims)
def floor_div(in1, in2):
return in1 // in2
def reduce_sum(input):
return input.sum()
......
......@@ -168,6 +168,8 @@ def sum(input, dim, keepdims=False):
input = tf.cast(input, tf.int32)
return tf.reduce_sum(input, axis=dim, keepdims=keepdims)
def floor_div(in1, in2):
return astype(in1 / in2, dtype(in1))
def reduce_sum(input):
if input.dtype == tf.bool:
......
......@@ -184,9 +184,9 @@ class CitationGraphDataset(DGLBuiltinDataset):
self._graph = nx.DiGraph(graph)
self._num_classes = info['num_classes']
self._g.ndata['train_mask'] = generate_mask_tensor(self._g.ndata['train_mask'].numpy())
self._g.ndata['val_mask'] = generate_mask_tensor(self._g.ndata['val_mask'].numpy())
self._g.ndata['test_mask'] = generate_mask_tensor(self._g.ndata['test_mask'].numpy())
self._g.ndata['train_mask'] = generate_mask_tensor(F.asnumpy(self._g.ndata['train_mask']))
self._g.ndata['val_mask'] = generate_mask_tensor(F.asnumpy(self._g.ndata['val_mask']))
self._g.ndata['test_mask'] = generate_mask_tensor(F.asnumpy(self._g.ndata['test_mask']))
# hack for mxnet compatibility
if self.verbose:
......
......@@ -133,7 +133,7 @@ class DistDataLoader:
if not self.drop_last and len(dataset) % self.batch_size != 0:
self.expected_idxs += 1
# We need to have a unique Id for each data loader to identify itself
# We need to have a unique ID for each data loader to identify itself
# in the sampler processes.
global DATALOADER_ID
self.name = "dataloader-" + str(DATALOADER_ID)
......
"""Define distributed graph."""
from collections.abc import MutableMapping
from collections import namedtuple
import os
import numpy as np
from ..heterograph import DGLHeteroGraph
from .. import heterograph_index
from .. import backend as F
from ..base import NID, EID
from ..base import NID, EID, NTYPE, ETYPE
from .kvstore import KVServer, get_kvstore
from .._ffi.ndarray import empty_shared_mem
from ..frame import infer_scheme
from .partition import load_partition, load_partition_book
from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book
from .graph_partition_book import NODE_PART_POLICY, EDGE_PART_POLICY
from .graph_partition_book import HeteroDataName, parse_hetero_data_name
from .graph_partition_book import NodePartitionPolicy, EdgePartitionPolicy
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from . import rpc
from . import role
from .server_state import ServerState
from .rpc_server import start_server
from .graph_services import find_edges as dist_find_edges
from .dist_tensor import DistTensor, _get_data_name
from .dist_tensor import DistTensor
INIT_GRAPH = 800001
......@@ -61,26 +64,21 @@ def _copy_graph_to_shared_mem(g, graph_name):
new_g = g.shared_memory(graph_name, formats='csc')
# We should share the node/edge data with the client explicitly instead of putting them
# in the KVStore, because some of the node/edge data may be duplicated.
local_node_path = _get_ndata_path(graph_name, 'inner_node')
new_g.ndata['inner_node'] = _to_shared_mem(g.ndata['inner_node'], local_node_path)
local_edge_path = _get_edata_path(graph_name, 'inner_edge')
new_g.edata['inner_edge'] = _to_shared_mem(g.edata['inner_edge'], local_edge_path)
new_g.ndata['inner_node'] = _to_shared_mem(g.ndata['inner_node'],
_get_ndata_path(graph_name, 'inner_node'))
new_g.ndata[NID] = _to_shared_mem(g.ndata[NID], _get_ndata_path(graph_name, NID))
new_g.edata['inner_edge'] = _to_shared_mem(g.edata['inner_edge'],
_get_edata_path(graph_name, 'inner_edge'))
new_g.edata[EID] = _to_shared_mem(g.edata[EID], _get_edata_path(graph_name, EID))
return new_g
FIELD_DICT = {'inner_node': F.int64,
'inner_edge': F.int64,
FIELD_DICT = {'inner_node': F.int32, # A flag indicating whether the node is inside a partition.
'inner_edge': F.int32, # A flag indicating whether the edge is inside a partition.
NID: F.int64,
EID: F.int64}
def _is_ndata_name(name):
''' Is this node data in the kvstore '''
return name[:5] == NODE_PART_POLICY + ':'
def _is_edata_name(name):
''' Is this edge data in the kvstore '''
return name[:5] == EDGE_PART_POLICY + ':'
EID: F.int64,
NTYPE: F.int16,
ETYPE: F.int16}
def _get_shared_mem_ndata(g, graph_name, name):
''' Get shared-memory node data from DistGraph server.
......@@ -119,29 +117,64 @@ def _get_graph_from_shared_mem(graph_name):
if g is None:
return None
g = DGLHeteroGraph(g, ntypes, etypes)
g.ndata['inner_node'] = _get_shared_mem_ndata(g, graph_name, 'inner_node')
g.edata['inner_edge'] = _get_shared_mem_edata(g, graph_name, 'inner_edge')
g.ndata[NID] = _get_shared_mem_ndata(g, graph_name, NID)
g.edata['inner_edge'] = _get_shared_mem_edata(g, graph_name, 'inner_edge')
g.edata[EID] = _get_shared_mem_edata(g, graph_name, EID)
return g
NodeSpace = namedtuple('NodeSpace', ['data'])
EdgeSpace = namedtuple('EdgeSpace', ['data'])
class HeteroNodeView(object):
"""A NodeView class to act as G.nodes for a DistGraph."""
__slots__ = ['_graph']
def __init__(self, graph):
self._graph = graph
def __getitem__(self, key):
assert isinstance(key, str)
return NodeSpace(data=NodeDataView(self._graph, key))
class HeteroEdgeView(object):
"""A NodeView class to act as G.nodes for a DistGraph."""
__slots__ = ['_graph']
def __init__(self, graph):
self._graph = graph
def __getitem__(self, key):
assert isinstance(key, str)
return EdgeSpace(data=EdgeDataView(self._graph, key))
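# Usage sketch (mirrors the standard DGLHeteroGraph API; 'paper' and 'writes'
# are node/edge types from the ogbn-mag example above, feature names are
# illustrative):
#
#   g.nodes['paper'].data['train_mask']   # per-type node data via NodeDataView
#   g.edges['writes'].data['year']        # per-type edge data via EdgeDataView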
class NodeDataView(MutableMapping):
"""The data view class when dist_graph.ndata[...].data is called.
"""
__slots__ = ['_graph', '_data']
def __init__(self, g):
def __init__(self, g, ntype=None):
self._graph = g
# When this is created, the server may have already loaded node data. We need to
# initialize the node data in advance.
names = g._get_all_ndata_names()
policy = PartitionPolicy(NODE_PART_POLICY, g.get_partition_book())
self._data = {}
names = g._get_ndata_names(ntype)
if ntype is None:
self._data = g._ndata_store
else:
if ntype in g._ndata_store:
self._data = g._ndata_store[ntype]
else:
self._data = {}
g._ndata_store[ntype] = self._data
for name in names:
name1 = _get_data_name(name, policy.policy_str)
dtype, shape, _ = g._client.get_data_meta(name1)
assert name.is_node()
policy = PartitionPolicy(name.policy_str, g.get_partition_book())
dtype, shape, _ = g._client.get_data_meta(str(name))
# We create a wrapper on the existing tensor in the kvstore.
self._data[name] = DistTensor(shape, dtype, name, part_policy=policy)
self._data[name.get_name()] = DistTensor(shape, dtype, name.get_name(),
part_policy=policy)
def _get_names(self):
return list(self._data.keys())
......@@ -176,18 +209,26 @@ class EdgeDataView(MutableMapping):
"""
__slots__ = ['_graph', '_data']
def __init__(self, g):
def __init__(self, g, etype=None):
self._graph = g
# When this is created, the server may have already loaded edge data. We need to
# initialize the edge data in advance.
names = g._get_all_edata_names()
policy = PartitionPolicy(EDGE_PART_POLICY, g.get_partition_book())
self._data = {}
names = g._get_edata_names(etype)
if etype is None:
self._data = g._edata_store
else:
if etype in g._edata_store:
self._data = g._edata_store[etype]
else:
self._data = {}
g._edata_store[etype] = self._data
for name in names:
name1 = _get_data_name(name, policy.policy_str)
dtype, shape, _ = g._client.get_data_meta(name1)
assert name.is_edge()
policy = PartitionPolicy(name.policy_str, g.get_partition_book())
dtype, shape, _ = g._client.get_data_meta(str(name))
# We create a wrapper on the existing tensor in the kvstore.
self._data[name] = DistTensor(shape, dtype, name, part_policy=policy)
self._data[name.get_name()] = DistTensor(shape, dtype, name.get_name(),
part_policy=policy)
def _get_names(self):
return list(self._data.keys())
......@@ -260,11 +301,11 @@ class DistGraphServer(KVServer):
# Load graph partition data.
if self.is_backup_server():
# The backup server doesn't load the graph partition. It'll be initialized afterwards.
self.gpb, graph_name = load_partition_book(part_config, self.part_id)
self.gpb, graph_name, ntypes, etypes = load_partition_book(part_config, self.part_id)
self.client_g = None
else:
self.client_g, node_feats, edge_feats, self.gpb, \
graph_name = load_partition(part_config, self.part_id)
self.client_g, node_feats, edge_feats, self.gpb, graph_name, \
ntypes, etypes = load_partition(part_config, self.part_id)
print('load ' + graph_name)
if not disable_shared_mem:
self.client_g = _copy_graph_to_shared_mem(self.client_g, graph_name)
......@@ -272,17 +313,27 @@ class DistGraphServer(KVServer):
if not disable_shared_mem:
self.gpb.shared_memory(graph_name)
assert self.gpb.partid == self.part_id
self.add_part_policy(PartitionPolicy(NODE_PART_POLICY, self.gpb))
self.add_part_policy(PartitionPolicy(EDGE_PART_POLICY, self.gpb))
for ntype in ntypes:
node_name = HeteroDataName(True, ntype, None)
self.add_part_policy(PartitionPolicy(node_name.policy_str, self.gpb))
for etype in etypes:
edge_name = HeteroDataName(False, etype, None)
self.add_part_policy(PartitionPolicy(edge_name.policy_str, self.gpb))
if not self.is_backup_server():
for name in node_feats:
self.init_data(name=_get_data_name(name, NODE_PART_POLICY),
policy_str=NODE_PART_POLICY,
# The feature name has the following format: node_type + "/" + feature_name to avoid
# feature name collision for different node types.
ntype, feat_name = name.split('/')
data_name = HeteroDataName(True, ntype, feat_name)
self.init_data(name=str(data_name), policy_str=data_name.policy_str,
data_tensor=node_feats[name])
for name in edge_feats:
self.init_data(name=_get_data_name(name, EDGE_PART_POLICY),
policy_str=EDGE_PART_POLICY,
# The feature name has the following format: edge_type + "/" + feature_name to avoid
# feature name collision for different edge types.
etype, feat_name = name.split('/')
data_name = HeteroDataName(False, etype, feat_name)
self.init_data(name=str(data_name), policy_str=data_name.policy_str,
data_tensor=edge_feats[name])
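# For example, a loaded node feature named 'paper/feat' is split into
# ntype='paper' and feat_name='feat', then registered in the KVStore under
# str(HeteroDataName(True, 'paper', 'feat')) with the per-type partition policy.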
def start(self):
......@@ -385,16 +436,24 @@ class DistGraph:
assert self._client is not None, \
'Distributed module is not initialized. Please call dgl.distributed.initialize.'
# Load graph partition data.
g, node_feats, edge_feats, self._gpb, _ = load_partition(part_config, 0)
g, node_feats, edge_feats, self._gpb, _, _, _ = load_partition(part_config, 0)
assert self._gpb.num_partitions() == 1, \
'The standalone mode can only work with the graph data with one partition'
if self._gpb is None:
self._gpb = gpb
self._g = g
for name in node_feats:
self._client.add_data(_get_data_name(name, NODE_PART_POLICY), node_feats[name])
# The feature name has the following format: node_type + "/" + feature_name.
ntype, feat_name = name.split('/')
self._client.add_data(str(HeteroDataName(True, ntype, feat_name)),
node_feats[name],
NodePartitionPolicy(self._gpb, ntype=ntype))
for name in edge_feats:
self._client.add_data(_get_data_name(name, EDGE_PART_POLICY), edge_feats[name])
# The feature name has the following format: edge_type + "/" + feature_name.
etype, feat_name = name.split('/')
self._client.add_data(str(HeteroDataName(False, etype, feat_name)),
edge_feats[name],
EdgePartitionPolicy(self._gpb, etype=etype))
self._client.map_shared_data(self._gpb)
rpc.set_num_client(1)
else:
......@@ -406,6 +465,8 @@ class DistGraph:
rpc.recv_response()
self._client.barrier()
self._ndata_store = {}
self._edata_store = {}
self._ndata = NodeDataView(self)
self._edata = EdgeDataView(self)
......@@ -415,6 +476,10 @@ class DistGraph:
self._num_nodes += int(part_md['num_nodes'])
self._num_edges += int(part_md['num_edges'])
# When we store node/edge types in a list, they are stored in the order of type IDs.
self._ntype_map = {ntype:i for i, ntype in enumerate(self.ntypes)}
self._etype_map = {etype:i for i, etype in enumerate(self.etypes)}
def _init(self):
self._client = get_kvstore()
assert self._client is not None, \
......@@ -432,6 +497,8 @@ class DistGraph:
self.graph_name, self._gpb_input = state
self._init()
self._ndata_store = {}
self._edata_store = {}
self._ndata = NodeDataView(self)
self._edata = EdgeDataView(self)
self._num_nodes = 0
......@@ -456,6 +523,18 @@ class DistGraph:
'''
return self._g
@property
def nodes(self):
'''Return a node view
'''
return HeteroNodeView(self)
@property
def edges(self):
'''Return an edge view
'''
return HeteroEdgeView(self)
@property
def ndata(self):
"""Return the data view of all the nodes.
......@@ -465,6 +544,7 @@ class DistGraph:
NodeDataView
The data view in the distributed graph storage.
"""
assert len(self.ntypes) == 1, "ndata only works for a graph with one node type."
return self._ndata
@property
......@@ -476,6 +556,7 @@ class DistGraph:
EdgeDataView
The data view in the distributed graph storage.
"""
assert len(self.etypes) == 1, "edata only works for a graph with one edge type."
return self._edata
@property
......@@ -532,8 +613,7 @@ class DistGraph:
>>> g.ntypes
['_U']
"""
# Currently, we only support a graph with one node type.
return ['_U']
return self._gpb.ntypes
@property
def etypes(self):
......@@ -551,19 +631,69 @@ class DistGraph:
['_E']
"""
# Currently, we only support a graph with one edge type.
return ['_E']
return self._gpb.etypes
def get_ntype_id(self, ntype):
"""Return the ID of the given node type.
ntype can also be None. If so, there should be only one node type in the
graph.
Parameters
----------
ntype : str
Node type
Returns
-------
int
"""
if ntype is None:
if len(self._ntype_map) != 1:
raise DGLError('Node type name must be specified if there are more than one '
'node types.')
return 0
return self._ntype_map[ntype]
def get_etype_id(self, etype):
"""Return the id of the given edge type.
etype can also be None. If so, there should be only one edge type in the
graph.
Parameters
----------
etype : str or tuple of str
Edge type
Returns
-------
int
"""
if etype is None:
if len(self._etype_map) != 1:
raise DGLError('Edge type name must be specified if there are more than one '
'edge types.')
return 0
return self._etype_map[etype]
def number_of_nodes(self):
def number_of_nodes(self, ntype=None):
"""Alias of :func:`num_nodes`"""
return self.num_nodes()
return self.num_nodes(ntype)
def number_of_edges(self):
def number_of_edges(self, etype=None):
"""Alias of :func:`num_edges`"""
return self.num_edges()
return self.num_edges(etype)
def num_nodes(self):
def num_nodes(self, ntype=None):
"""Return the total number of nodes in the distributed graph.
Parameters
----------
ntype : str, optional
The node type name. If given, it returns the number of nodes of the
type. If not given (default), it returns the total number of nodes of all types.
Returns
-------
int
......@@ -575,11 +705,28 @@ class DistGraph:
>>> print(g.num_nodes())
2449029
"""
return self._num_nodes
def num_edges(self):
if ntype is None:
if len(self.ntypes) == 1:
return self._gpb._num_nodes(self.ntypes[0])
else:
return sum([self._gpb._num_nodes(ntype) for ntype in self.ntypes])
return self._gpb._num_nodes(ntype)
def num_edges(self, etype=None):
"""Return the total number of edges in the distributed graph.
Parameters
----------
etype : str or (str, str, str), optional
The type name of the edges. The allowed type name formats are:
* ``(str, str, str)`` for source node type, edge type and destination node type.
* or one ``str`` edge type name if the name can uniquely identify a
triplet format in the graph.
If not provided, return the total number of edges regardless of the types
in the graph.
Returns
-------
int
......@@ -591,7 +738,12 @@ class DistGraph:
>>> print(g.num_edges())
123718280
"""
return self._num_edges
if etype is None:
if len(self.etypes) == 1:
return self._gpb._num_edges(self.etypes[0])
else:
return sum([self._gpb._num_edges(etype) for etype in self.etypes])
return self._gpb._num_edges(etype)
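# Example (per-type counts; 'paper' and 'writes' are node/edge types in
# ogbn-mag):
#
#   g.num_nodes('paper')    # number of 'paper' nodes only
#   g.num_edges('writes')   # number of 'writes' edges only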
def node_attr_schemes(self):
"""Return the node feature schemes.
......@@ -677,6 +829,7 @@ class DistGraph:
tensor
The destination node ID array.
"""
assert len(self.etypes) == 1, 'find_edges does not support heterogeneous graphs for now.'
return dist_find_edges(self, edges)
def get_partition_book(self):
......@@ -689,6 +842,48 @@ class DistGraph:
"""
return self._gpb
def get_node_partition_policy(self, ntype):
"""Get the partition policy for a node type.
When creating a new distributed tensor, we need to provide a partition policy
that indicates how to distribute data of the distributed tensor in a cluster
of machines. When we load a distributed graph in the cluster, we have pre-defined
partition policies for each node type and each edge type. By providing
the node type, we can refer to the pre-defined partition policy for that node type.
Parameters
----------
ntype : str
The node type
Returns
-------
PartitionPolicy
The partition policy for the node type.
"""
return NodePartitionPolicy(self.get_partition_book(), ntype)
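# Sketch: using the pre-defined policy to create a per-type distributed tensor
# (the tensor name and shape here are illustrative):
#
#   policy = g.get_node_partition_policy('paper')
#   emb = DistTensor((g.num_nodes('paper'), 16), F.float32,
#                    'paper_emb', part_policy=policy)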
def get_edge_partition_policy(self, etype):
"""Get the partition policy for an edge type.
When creating a new distributed tensor, we need to provide a partition policy
that indicates how to distribute data of the distributed tensor in a cluster
of machines. When we load a distributed graph in the cluster, we have pre-defined
partition policies for each node type and each edge type. By providing
the edge type, we can refer to the pre-defined partition policy for that edge type.
Parameters
----------
etype : str
The edge type
Returns
-------
PartitionPolicy
The partition policy for the edge type.
"""
return EdgePartitionPolicy(self.get_partition_book(), etype)
def barrier(self):
'''Barrier for all client nodes.
......@@ -697,46 +892,48 @@ class DistGraph:
'''
self._client.barrier()
def _get_all_ndata_names(self):
def _get_ndata_names(self, ntype=None):
''' Get the names of all node data, optionally restricted to one node type.
'''
names = self._client.data_name_list()
ndata_names = []
for name in names:
if _is_ndata_name(name):
# Remove the prefix "node:"
ndata_names.append(name[5:])
name = parse_hetero_data_name(name)
right_type = (name.get_type() == ntype) if ntype is not None else True
if name.is_node() and right_type:
ndata_names.append(name)
return ndata_names
def _get_all_edata_names(self):
def _get_edata_names(self, etype=None):
''' Get the names of all edge data, optionally restricted to one edge type.
'''
names = self._client.data_name_list()
edata_names = []
for name in names:
if _is_edata_name(name):
# Remove the prefix "edge:"
edata_names.append(name[5:])
name = parse_hetero_data_name(name)
right_type = (name.get_type() == etype) if etype is not None else True
if name.is_edge() and right_type:
edata_names.append(name)
return edata_names
def _get_overlap(mask_arr, ids):
""" Select the Ids given a boolean mask array.
""" Select the IDs given a boolean mask array.
The boolean mask array indicates all of the Ids to be selected. We want to
find the overlap between the Ids selected by the boolean mask array and
the Id array.
The boolean mask array indicates all of the IDs to be selected. We want to
find the overlap between the IDs selected by the boolean mask array and
the ID array.
Parameters
----------
mask_arr : 1D tensor
A boolean mask array.
ids : 1D tensor
A vector with Ids.
A vector with IDs.
Returns
-------
1D tensor
The selected Ids.
The selected IDs.
"""
if isinstance(mask_arr, DistTensor):
masks = mask_arr[ids]
......@@ -812,7 +1009,7 @@ def _split_even(partition_book, rank, elements):
return eles[offsets[rank-1]:offsets[rank]]
def node_split(nodes, partition_book=None, rank=None, force_even=True):
def node_split(nodes, partition_book=None, ntype='_N', rank=None, force_even=True):
''' Split nodes and return a subset for the local rank.
This function splits the input nodes based on the partition book and
......@@ -825,10 +1022,10 @@ def node_split(nodes, partition_book=None, rank=None, force_even=True):
There are two strategies to split the nodes. By default, it splits the nodes
in a way to maximize data locality. That is, all nodes that belong to a process
are returned. If `force_even` is set to true, the nodes are split evenly so
are returned. If ``force_even`` is set to true, the nodes are split evenly so
that each process gets almost the same number of nodes.
When `force_even` is True, the data locality is still preserved if a graph is partitioned
When ``force_even`` is True, the data locality is still preserved if a graph is partitioned
with Metis and the node/edge IDs are shuffled.
In this case, majority of the nodes returned for a process are the ones that
belong to the process. If node/edge IDs are not shuffled, data locality is not guaranteed.
......@@ -837,26 +1034,26 @@ def node_split(nodes, partition_book=None, rank=None, force_even=True):
----------
nodes : 1D tensor or DistTensor
A boolean mask vector that indicates input nodes.
partition_book : GraphPartitionBook
partition_book : GraphPartitionBook, optional
The graph partition book
rank : int
ntype : str, optional
The node type of the input nodes.
rank : int, optional
The rank of a process. If not given, the rank of the current process is used.
force_even : bool
force_even : bool, optional
Force the nodes to be split evenly.
Returns
-------
1D-tensor
The vector of node Ids that belong to the rank.
The vector of node IDs that belong to the rank.
'''
num_nodes = 0
if not isinstance(nodes, DistTensor):
assert partition_book is not None, 'Regular tensor requires a partition book.'
elif partition_book is None:
partition_book = nodes.part_policy.partition_book
for part in partition_book.metadata():
num_nodes += part['num_nodes']
assert len(nodes) == num_nodes, \
assert len(nodes) == partition_book._num_nodes(ntype), \
'The length of boolean mask vector should be the number of nodes in the graph.'
if force_even:
return _split_even(partition_book, rank, nodes)
......@@ -865,7 +1062,7 @@ def node_split(nodes, partition_book=None, rank=None, force_even=True):
local_nids = partition_book.partid2nids(partition_book.partid)
return _split_local(partition_book, rank, nodes, local_nids)
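# Usage sketch (as in the training script above): split the per-type boolean
# mask so that each trainer gets an even share of the 'paper' training nodes.
#
#   pb = g.get_partition_book()
#   train_nid = node_split(g.nodes['paper'].data['train_mask'], pb,
#                          ntype='paper', force_even=True)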
def edge_split(edges, partition_book=None, rank=None, force_even=True):
def edge_split(edges, partition_book=None, etype='_E', rank=None, force_even=True):
''' Split edges and return a subset for the local rank.
This function splits the input edges based on the partition book and
......@@ -878,10 +1075,10 @@ def edge_split(edges, partition_book=None, rank=None, force_even=True):
There are two strategies to split the edges. By default, it splits the edges
in a way to maximize data locality. That is, all edges that belong to a process
are returned. If `force_even` is set to true, the edges are split evenly so
are returned. If ``force_even`` is set to true, the edges are split evenly so
that each process gets almost the same number of edges.
When `force_even` is True, the data locality is still preserved if a graph is partitioned
When ``force_even`` is True, the data locality is still preserved if a graph is partitioned
with Metis and the node/edge IDs are shuffled.
In this case, majority of the nodes returned for a process are the ones that
belong to the process. If node/edge IDs are not shuffled, data locality is not guaranteed.
......@@ -890,26 +1087,25 @@ def edge_split(edges, partition_book=None, rank=None, force_even=True):
----------
edges : 1D tensor or DistTensor
A boolean mask vector that indicates input edges.
partition_book : GraphPartitionBook
partition_book : GraphPartitionBook, optional
The graph partition book
rank : int
etype : str, optional
The edge type of the input edges.
rank : int, optional
The rank of a process. If not given, the rank of the current process is used.
force_even : bool
force_even : bool, optional
Force the edges to be split evenly.
Returns
-------
1D-tensor
The vector of edge Ids that belong to the rank.
The vector of edge IDs that belong to the rank.
'''
num_edges = 0
if not isinstance(edges, DistTensor):
assert partition_book is not None, 'Regular tensor requires a partition book.'
elif partition_book is None:
partition_book = edges.part_policy.partition_book
for part in partition_book.metadata():
num_edges += part['num_edges']
assert len(edges) == num_edges, \
assert len(edges) == partition_book._num_edges(etype), \
'The length of boolean mask vector should be the number of edges in the graph.'
if force_even:
......
......@@ -8,17 +8,10 @@ from .role import get_role
from .. import utils
from .. import backend as F
def _get_data_name(name, part_policy):
''' This is to get the name of data in the kvstore.
KVStore doesn't understand node data or edge data. We'll use a prefix to distinguish them.
'''
return part_policy + ':' + name
def _default_init_data(shape, dtype):
return F.zeros(shape, dtype, F.cpu())
# These Ids can identify the anonymous distributed tensors.
# These IDs can identify the anonymous distributed tensors.
DIST_TENSOR_ID = 0
class DistTensor:
......@@ -144,10 +137,12 @@ class DistTensor:
assert not persistent, 'We cannot generate anonymous persistent distributed tensors'
global DIST_TENSOR_ID
# All processes of the same role should create DistTensor synchronously.
# Thus, all of them should have the same Ids.
# Thus, all of them should have the same IDs.
name = 'anonymous-' + get_role() + '-' + str(DIST_TENSOR_ID)
DIST_TENSOR_ID += 1
self._name = _get_data_name(name, part_policy.policy_str)
assert isinstance(name, str), 'name {} is type {}'.format(name, type(name))
data_name = part_policy.get_data_name(name)
self._name = str(data_name)
self._persistent = persistent
if self._name not in exist_names:
self.kvstore.init_data(self._name, shape, dtype, part_policy, init_func)
......
"""Define graph partition book."""
import pickle
from abc import ABC
import numpy as np
from .. import backend as F
......@@ -8,45 +10,89 @@ from .. import utils
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from .._ffi.ndarray import empty_shared_mem
from ..ndarray import exist_shared_mem_array
from .id_map import IdMap
def _move_metadata_to_shared_mem(graph_name, num_nodes, num_edges, part_id,
num_partitions, node_map, edge_map, is_range_part):
''' Move all metadata of the partition book to the shared memory.
We need these metadata to construct graph partition book.
This metadata will be used to construct the graph partition book.
Parameters
----------
graph_name : str
The name of the graph
num_nodes : int
The total number of nodes
num_edges : int
The total number of edges
part_id : int
The partition ID.
num_partitions : int
The number of physical partitions generated for the graph.
node_map : Tensor
It stores the mapping information from node IDs to partitions. With range partitioning,
the tensor stores the serialized result of partition ranges.
edge_map : Tensor
It stores the mapping information from edge IDs to partitions. With range partitioning,
the tensor stores the serialized result of partition ranges.
is_range_part : bool
Indicates whether range partitioning is used. This is needed to deserialize the data
in node_map and edge_map.
Returns
-------
(Tensor, Tensor, Tensor)
The first tensor stores the serialized metadata, the second tensor stores the serialized
node map and the third tensor stores the serialized edge map. All tensors are stored in
shared memory.
'''
meta = _to_shared_mem(F.tensor([int(is_range_part), num_nodes, num_edges,
num_partitions, part_id]),
num_partitions, part_id,
len(node_map), len(edge_map)]),
_get_ndata_path(graph_name, 'meta'))
node_map = _to_shared_mem(node_map, _get_ndata_path(graph_name, 'node_map'))
edge_map = _to_shared_mem(edge_map, _get_edata_path(graph_name, 'edge_map'))
return meta, node_map, edge_map
def _get_shared_mem_metadata(graph_name):
''' Get the metadata of the graph through shared memory.
''' Get the metadata of the graph from shared memory.
The server serializes the metadata of a graph and stores it in shared memory.
The client needs to deserialize the data in shared memory and get the metadata
of the graph.
The metadata includes the number of nodes and the number of edges. In the future,
we can add more information, especially for heterograph.
Parameters
----------
graph_name : str
The name of the graph. We can use the graph name to find the shared memory name.
Returns
-------
(bool, int, int, Tensor, Tensor)
The first element indicates whether it is range partitioning;
the second element is the partition ID;
the third element is the number of partitions;
the fourth element is the tensor that stores the serialized result of node maps;
the fifth element is the tensor that stores the serialized result of edge maps.
'''
# The metadata has 5 elements: is_range_part, num_nodes, num_edges, num_partitions, part_id
# We might need to extend the list in the future.
shape = (5,)
# The metadata has 7 elements: is_range_part, num_nodes, num_edges, num_partitions, part_id,
# the length of the node map and the length of the edge map.
shape = (7,)
dtype = F.int64
dtype = DTYPE_DICT[dtype]
data = empty_shared_mem(_get_ndata_path(graph_name, 'meta'), False, shape, dtype)
dlpack = data.to_dlpack()
meta = F.asnumpy(F.zerocopy_from_dlpack(dlpack))
is_range_part, num_nodes, num_edges, num_partitions, part_id = meta
is_range_part, _, _, num_partitions, part_id, node_map_len, edge_map_len = meta
# Load node map
length = num_partitions if is_range_part else num_nodes
data = empty_shared_mem(_get_ndata_path(graph_name, 'node_map'), False, (length,), dtype)
data = empty_shared_mem(_get_ndata_path(graph_name, 'node_map'), False, (node_map_len,), dtype)
dlpack = data.to_dlpack()
node_map = F.zerocopy_from_dlpack(dlpack)
# Load edge_map
length = num_partitions if is_range_part else num_edges
data = empty_shared_mem(_get_edata_path(graph_name, 'edge_map'), False, (length,), dtype)
data = empty_shared_mem(_get_edata_path(graph_name, 'edge_map'), False, (edge_map_len,), dtype)
dlpack = data.to_dlpack()
edge_map = F.zerocopy_from_dlpack(dlpack)
......@@ -73,13 +119,32 @@ def get_shared_mem_partition_book(graph_name, graph_part):
'''
if not exist_shared_mem_array(_get_ndata_path(graph_name, 'meta')):
return None
is_range_part, part_id, num_parts, node_map, edge_map = _get_shared_mem_metadata(graph_name)
is_range_part, part_id, num_parts, node_map_data, edge_map_data = \
_get_shared_mem_metadata(graph_name)
if is_range_part == 1:
return RangePartitionBook(part_id, num_parts, node_map, edge_map)
# node ID ranges and edge ID ranges are stored in the order of node type IDs
# and edge type IDs.
node_map = {}
ntypes = {}
# node_map_data and edge_map_data were serialized with pickle and converted into
# a list of bytes and then stored in a numpy array before being placed in shared
# memory. To deserialize, we need to reverse the process.
node_map_data = pickle.loads(bytes(F.asnumpy(node_map_data).tolist()))
for i, (ntype, nid_range) in enumerate(node_map_data):
ntypes[ntype] = i
node_map[ntype] = nid_range
edge_map = {}
etypes = {}
edge_map_data = pickle.loads(bytes(F.asnumpy(edge_map_data).tolist()))
for i, (etype, eid_range) in enumerate(edge_map_data):
etypes[etype] = i
edge_map[etype] = eid_range
return RangePartitionBook(part_id, num_parts, node_map, edge_map, ntypes, etypes)
else:
return BasicPartitionBook(part_id, num_parts, node_map, edge_map, graph_part)
return BasicPartitionBook(part_id, num_parts, node_map_data, edge_map_data, graph_part)
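# A plausible serialization counterpart on the server side (a sketch, not the
# actual code): the per-type ID ranges are pickled, viewed as raw bytes and
# stored in a tensor so they can be placed in shared memory:
#
#   buf = pickle.dumps(list(node_map.items()))
#   node_map_data = F.tensor(np.frombuffer(buf, dtype=np.uint8).astype(np.int64))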
class GraphPartitionBook:
class GraphPartitionBook(ABC):
""" The base class of the graph partition book.
For distributed training, a graph is partitioned into multiple parts and is loaded
......@@ -93,13 +158,13 @@ class GraphPartitionBook:
* the node IDs and the edge IDs that a partition has.
* the local IDs of nodes and edges in a partition.
Currently, there are two classes that implement `GraphPartitionBook`:
`BasicGraphPartitionBook` and `RangePartitionBook`. `BasicGraphPartitionBook`
Currently, there are two classes that implement ``GraphPartitionBook``:
``BasicGraphPartitionBook`` and ``RangePartitionBook``. ``BasicGraphPartitionBook``
stores the mappings between every individual node/edge ID and partition ID on
every machine, which usually consumes a lot of memory, while `RangePartitionBook`
every machine, which usually consumes a lot of memory, while ``RangePartitionBook``
calculates the mapping between node/edge IDs and partition IDs based on some small
metadata because nodes/edges have been relabeled so that all IDs in the same partition
fall in a contiguous ID range. `RangePartitionBook` is usually a preferred way to
fall in a contiguous ID range. ``RangePartitionBook`` is usually the preferred way to
provide mappings between node/edge IDs and partition IDs.
A graph partition book is constructed automatically when a graph is partitioned.
......@@ -149,13 +214,15 @@ class GraphPartitionBook:
Meta data of each partition.
"""
def nid2partid(self, nids):
def nid2partid(self, nids, ntype):
"""From global node IDs to partition IDs
Parameters
----------
nids : tensor
global node IDs
ntype : str
The node type
Returns
-------
......@@ -163,13 +230,15 @@ class GraphPartitionBook:
partition IDs
"""
def eid2partid(self, eids):
def eid2partid(self, eids, etype):
"""From global edge IDs to partition IDs
Parameters
----------
eids : tensor
global edge IDs
etype : str
The edge type
Returns
-------
......@@ -177,13 +246,15 @@ class GraphPartitionBook:
partition IDs
"""
def partid2nids(self, partid):
def partid2nids(self, partid, ntype):
"""From partition id to global node IDs
Parameters
----------
partid : int
partition id
ntype : str
The node type
Returns
-------
......@@ -191,13 +262,15 @@ class GraphPartitionBook:
node IDs
"""
def partid2eids(self, partid):
def partid2eids(self, partid, etype):
"""From partition id to global edge IDs
Parameters
----------
partid : int
partition id
etype : str
The edge type
Returns
-------
......@@ -205,7 +278,7 @@ class GraphPartitionBook:
edge IDs
"""
def nid2localnid(self, nids, partid):
def nid2localnid(self, nids, partid, ntype):
"""Get local node IDs within the given partition.
Parameters
......@@ -214,6 +287,8 @@ class GraphPartitionBook:
global node IDs
partid : int
partition ID
ntype : str
The node type
Returns
-------
......@@ -221,30 +296,42 @@ class GraphPartitionBook:
local node IDs
"""
def eid2localeid(self, eids, partid):
def eid2localeid(self, eids, partid, etype):
"""Get the local edge ids within the given partition.
Parameters
----------
eids : tensor
global edge ids
global edge IDs
partid : int
partition ID
etype : str
The edge type
Returns
-------
tensor
local edge ids
local edge IDs
"""
@property
def partid(self):
"""Get the current partition id
"""Get the current partition ID
Return
------
int
The partition id of current machine
The partition ID of current machine
"""
@property
def ntypes(self):
"""Get the list of node types
"""
@property
def etypes(self):
"""Get the list of edge types
"""
class BasicPartitionBook(GraphPartitionBook):
......@@ -258,13 +345,13 @@ class BasicPartitionBook(GraphPartitionBook):
Parameters
----------
part_id : int
partition id of current partition book
partition ID of current partition book
num_parts : int
number of total partitions
node_map : tensor
global node id mapping to partition id
global node ID mapping to partition ID
edge_map : tensor
global edge id mapping to partition id
global edge ID mapping to partition ID
part_graph : DGLGraph
The graph partition structure.
"""
......@@ -342,47 +429,81 @@ class BasicPartitionBook(GraphPartitionBook):
"""
return self._partition_meta_data
def _num_nodes(self):
def _num_nodes(self, ntype='_N'):
""" The total number of nodes
"""
assert ntype == '_N', 'Base partition book only supports homogeneous graph.'
return len(self._nid2partid)
def _num_edges(self):
def _num_edges(self, etype='_E'):
""" The total number of edges
"""
assert etype == '_E', 'Base partition book only supports homogeneous graph.'
return len(self._eid2partid)
def nid2partid(self, nids):
def map_to_per_ntype(self, ids):
"""Map global homogeneous node IDs to node type IDs.
Returns
-------
type_ids, per_type_ids
"""
return F.zeros((len(ids),), F.int32, F.cpu()), ids
def map_to_per_etype(self, ids):
"""Map global homogeneous edge IDs to edge type IDs.
Returns
-------
type_ids, per_type_ids
"""
return F.zeros((len(ids),), F.int32, F.cpu()), ids
def map_to_homo_nid(self, ids, ntype):
"""Map per-node-type IDs to global node IDs in the homogeneous format.
"""
assert ntype == '_N', 'Base partition book only supports homogeneous graph.'
return ids
def map_to_homo_eid(self, ids, etype):
"""Map per-edge-type IDs to global edge IDs in the homoenegeous format.
"""
assert etype == '_E', 'Base partition book only supports homogeneous graph.'
return ids
def nid2partid(self, nids, ntype='_N'):
"""From global node IDs to partition IDs
"""
assert ntype == '_N', 'Base partition book only supports homogeneous graph.'
return F.gather_row(self._nid2partid, nids)
def eid2partid(self, eids):
def eid2partid(self, eids, etype='_E'):
"""From global edge IDs to partition IDs
"""
assert etype == '_E', 'Base partition book only supports homogeneous graph.'
return F.gather_row(self._eid2partid, eids)
def partid2nids(self, partid):
def partid2nids(self, partid, ntype='_N'):
"""From partition id to global node IDs
"""
assert ntype == '_N', 'Base partition book only supports homogeneous graph.'
return self._partid2nids[partid]
def partid2eids(self, partid):
def partid2eids(self, partid, etype='_E'):
"""From partition id to global edge IDs
"""
assert etype == '_E', 'Base partition book only supports homogeneous graph.'
return self._partid2eids[partid]
def nid2localnid(self, nids, partid):
def nid2localnid(self, nids, partid, ntype='_N'):
"""Get local node IDs within the given partition.
"""
assert ntype == '_N', 'Base partition book only supports homogeneous graph.'
if partid != self._part_id:
raise RuntimeError('Now GraphPartitionBook does not support \
getting remote tensor of nid2localnid.')
return F.gather_row(self._nidg2l[partid], nids)
def eid2localeid(self, eids, partid):
def eid2localeid(self, eids, partid, etype='_E'):
"""Get the local edge ids within the given partition.
"""
assert etype == '_E', 'Base partition book only supports homogeneous graph.'
if partid != self._part_id:
raise RuntimeError('Now GraphPartitionBook does not support \
getting remote tensor of eid2localeid.')
......@@ -390,10 +511,22 @@ class BasicPartitionBook(GraphPartitionBook):
@property
def partid(self):
"""Get the current partition id
"""Get the current partition ID
"""
return self._part_id
@property
def ntypes(self):
"""Get the list of node types
"""
return ['_N']
@property
def etypes(self):
"""Get the list of edge types
"""
return ['_E']
class RangePartitionBook(GraphPartitionBook):
"""This partition book supports more efficient storage of partition information.
......@@ -405,44 +538,131 @@ class RangePartitionBook(GraphPartitionBook):
Parameters
----------
part_id : int
partition id of current partition book
partition ID of current partition book
num_parts : int
number of total partitions
node_map : tensor
map global node id to partition id
edge_map : tensor
map global edge id to partition id
node_map : dict[str, Tensor]
Global node ID ranges within partitions for each node type. The key is the node type
name in string. The value is a tensor of shape :math:`(K, 2)`, where :math:`K` is
the number of partitions. Each row has two integers: the starting and the ending IDs
for a particular node type in a partition. For example, all nodes of type ``"T"`` in
partition ``i`` have IDs in the range ``node_map["T"][i][0]`` to ``node_map["T"][i][1]``.
edge_map : dict[str, Tensor]
Global edge ID ranges within partitions for each edge type. The key is the edge type
name in string. The value is a tensor of shape :math:`(K, 2)`, where :math:`K` is
the number of partitions. Each row has two integers: the starting and the ending IDs
for a particular edge type in a partition. For example, all edges of type ``"T"`` in
partition ``i`` have IDs in the range ``edge_map["T"][i][0]`` to ``edge_map["T"][i][1]``.
ntypes : dict[str, int]
map ntype strings to ntype IDs.
etypes : dict[str, int]
map etype strings to etype IDs.
"""
def __init__(self, part_id, num_parts, node_map, edge_map):
def __init__(self, part_id, num_parts, node_map, edge_map, ntypes, etypes):
assert part_id >= 0, 'part_id cannot be a negative number.'
assert num_parts > 0, 'num_parts must be greater than zero.'
self._partid = part_id
self._num_partitions = num_parts
if not isinstance(node_map, np.ndarray):
node_map = F.asnumpy(node_map)
if not isinstance(edge_map, np.ndarray):
edge_map = F.asnumpy(edge_map)
self._node_map = node_map
self._edge_map = edge_map
self._ntypes = [None] * len(ntypes)
self._etypes = [None] * len(etypes)
for ntype in ntypes:
ntype_id = ntypes[ntype]
self._ntypes[ntype_id] = ntype
assert all([ntype is not None for ntype in self._ntypes]), \
"The node types have invalid IDs."
for etype in etypes:
etype_id = etypes[etype]
self._etypes[etype_id] = etype
assert all([etype is not None for etype in self._etypes]), \
"The edge types have invalid IDs."
# This stores the node ID ranges for each node type in each partition.
# The key is the node type, the value is a NumPy matrix with two columns, in which
# each row indicates the start and the end of the node ID range in a partition.
# The node IDs are global node IDs in the homogeneous representation.
self._typed_nid_range = {}
# This stores the node ID map for per-node-type IDs in each partition.
# The key is the node type, the value is a NumPy vector which indicates
# the last node ID in a partition.
self._typed_max_node_ids = {}
max_node_map = np.zeros((num_parts,), dtype=np.int64)
for key in node_map:
if not isinstance(node_map[key], np.ndarray):
node_map[key] = F.asnumpy(node_map[key])
assert node_map[key].shape == (num_parts, 2)
self._typed_nid_range[key] = node_map[key]
# This is used for per-node-type lookup.
self._typed_max_node_ids[key] = np.cumsum(self._typed_nid_range[key][:, 1]
- self._typed_nid_range[key][:, 0])
# This is used for homogeneous node ID lookup.
max_node_map = np.maximum(self._typed_nid_range[key][:, 1], max_node_map)
# This is a vector that indicates the last node ID in each partition.
# The ID is the global ID in the homogeneous representation.
self._max_node_ids = max_node_map
# Similar to _typed_nid_range.
self._typed_eid_range = {}
# similar to _typed_max_node_ids.
self._typed_max_edge_ids = {}
max_edge_map = np.zeros((num_parts,), dtype=np.int64)
for key in edge_map:
if not isinstance(edge_map[key], np.ndarray):
edge_map[key] = F.asnumpy(edge_map[key])
assert edge_map[key].shape == (num_parts, 2)
self._typed_eid_range[key] = edge_map[key]
# This is used for per-edge-type lookup.
self._typed_max_edge_ids[key] = np.cumsum(self._typed_eid_range[key][:, 1]
- self._typed_eid_range[key][:, 0])
# This is used for homogeneous edge ID lookup.
max_edge_map = np.maximum(self._typed_eid_range[key][:, 1], max_edge_map)
# Similar to _max_node_ids
self._max_edge_ids = max_edge_map
# These two are map functions that map node/edge IDs to node/edge type IDs.
self._nid_map = IdMap(self._typed_nid_range)
self._eid_map = IdMap(self._typed_eid_range)
# Get meta data of the partition book
self._partition_meta_data = []
for partid in range(self._num_partitions):
nrange_start = node_map[partid - 1] if partid > 0 else 0
nrange_end = node_map[partid]
erange_start = edge_map[partid - 1] if partid > 0 else 0
erange_end = edge_map[partid]
nrange_start = max_node_map[partid - 1] if partid > 0 else 0
nrange_end = max_node_map[partid]
num_nodes = nrange_end - nrange_start
erange_start = max_edge_map[partid - 1] if partid > 0 else 0
erange_end = max_edge_map[partid]
num_edges = erange_end - erange_start
part_info = {}
part_info['machine_id'] = partid
part_info['num_nodes'] = int(nrange_end - nrange_start)
part_info['num_edges'] = int(erange_end - erange_start)
part_info['num_nodes'] = int(num_nodes)
part_info['num_edges'] = int(num_edges)
self._partition_meta_data.append(part_info)
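The role of the cumulative sums above can be checked with a small NumPy-only sketch (toy numbers, not DGL code): suppose one node type owns homogeneous IDs [0, 100) in partition 0 and [200, 300) in partition 1.
```
import numpy as np

typed_nid_range = np.array([[0, 100], [200, 300]])
# Cumulative per-type counts: the last per-type ID (exclusive) in each partition.
typed_max = np.cumsum(typed_nid_range[:, 1] - typed_nid_range[:, 0])  # [100, 200]
# Per-type ID 150 therefore falls in partition 1.
assert np.searchsorted(typed_max, 150, side='right') == 1
```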
def shared_memory(self, graph_name):
"""Move data to shared memory.
"""
self._meta = _move_metadata_to_shared_mem(
graph_name, self._num_nodes(), self._num_edges(), self._partid,
self._num_partitions, F.tensor(self._node_map), F.tensor(self._edge_map), True)
# we need to store the nid ranges and eid ranges of different types in the order defined
# by type IDs.
nid_range = [None] * len(self.ntypes)
for i, ntype in enumerate(self.ntypes):
nid_range[i] = (ntype, self._typed_nid_range[ntype])
nid_range_pickle = pickle.dumps(nid_range)
nid_range_pickle = [e for e in nid_range_pickle]
eid_range = [None] * len(self.etypes)
for i, etype in enumerate(self.etypes):
eid_range[i] = (etype, self._typed_eid_range[etype])
eid_range_pickle = pickle.dumps(eid_range)
eid_range_pickle = [e for e in eid_range_pickle]
self._meta = _move_metadata_to_shared_mem(graph_name,
0, # We don't need to provide the number of nodes
0, # We don't need to provide the number of edges
self._partid, self._num_partitions,
F.tensor(nid_range_pickle),
F.tensor(eid_range_pickle),
True)
def num_partitions(self):
"""Return the number of partitions.
......@@ -450,59 +670,109 @@ class RangePartitionBook(GraphPartitionBook):
return self._num_partitions
def _num_nodes(self):
def _num_nodes(self, ntype='_N'):
""" The total number of nodes
"""
return int(self._node_map[-1])
if ntype == '_N':
return int(self._max_node_ids[-1])
else:
return int(self._typed_max_node_ids[ntype][-1])
def _num_edges(self):
def _num_edges(self, etype='_E'):
""" The total number of edges
"""
return int(self._edge_map[-1])
if etype == '_E':
return int(self._max_edge_ids[-1])
else:
return int(self._typed_max_edge_ids[etype][-1])
def metadata(self):
"""Return the partition meta data.
"""
return self._partition_meta_data
def map_to_per_ntype(self, ids):
"""Map global homogeneous node IDs to node type IDs.
Returns
-------
type_ids, per_type_ids
"""
return self._nid_map(ids)
def map_to_per_etype(self, ids):
"""Map global homogeneous edge IDs to edge type IDs.
Returns
-------
type_ids, per_type_ids
"""
return self._eid_map(ids)
def nid2partid(self, nids):
def map_to_homo_nid(self, ids, ntype):
"""Map per-node-type IDs to global node IDs in the homogeneous format.
"""
ids = utils.toindex(ids).tousertensor()
partids = self.nid2partid(ids, ntype)
end_diff = F.tensor(self._typed_max_node_ids[ntype])[partids] - ids
return F.tensor(self._typed_nid_range[ntype][:, 1])[partids] - end_diff
def map_to_homo_eid(self, ids, etype):
"""Map per-edge-type IDs to global edge IDs in the homoenegeous format.
"""
ids = utils.toindex(ids).tousertensor()
partids = self.eid2partid(ids, etype)
end_diff = F.tensor(self._typed_max_edge_ids[etype][partids]) - ids
return F.tensor(self._typed_eid_range[etype][:, 1])[partids] - end_diff
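The arithmetic in ``map_to_homo_nid``/``map_to_homo_eid`` can be traced by hand with the same toy layout (assumed numbers): per-type IDs [100, 200) of some type live in partition 1, which owns the homogeneous ID range [200, 300).
```
import numpy as np

typed_nid_range = np.array([[0, 100], [200, 300]])
typed_max = np.cumsum(typed_nid_range[:, 1] - typed_nid_range[:, 0])  # [100, 200]
per_type_id = 150
partid = np.searchsorted(typed_max, per_type_id, side='right')  # partition 1
end_diff = typed_max[partid] - per_type_id                      # 200 - 150 = 50
homo_id = typed_nid_range[partid, 1] - end_diff                 # 300 - 50 = 250
assert homo_id == 250
```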
def nid2partid(self, nids, ntype='_N'):
"""From global node IDs to partition IDs
"""
nids = utils.toindex(nids)
ret = np.searchsorted(self._node_map, nids.tonumpy(), side='right')
if ntype == '_N':
ret = np.searchsorted(self._max_node_ids, nids.tonumpy(), side='right')
else:
ret = np.searchsorted(self._typed_max_node_ids[ntype], nids.tonumpy(), side='right')
ret = utils.toindex(ret)
return ret.tousertensor()
def eid2partid(self, eids):
def eid2partid(self, eids, etype='_E'):
"""From global edge IDs to partition IDs
"""
eids = utils.toindex(eids)
ret = np.searchsorted(self._edge_map, eids.tonumpy(), side='right')
if etype == '_E':
ret = np.searchsorted(self._max_edge_ids, eids.tonumpy(), side='right')
else:
ret = np.searchsorted(self._typed_max_edge_ids[etype], eids.tonumpy(), side='right')
ret = utils.toindex(ret)
return ret.tousertensor()
def partid2nids(self, partid):
"""From partition id to global node IDs
def partid2nids(self, partid, ntype='_N'):
"""From partition ID to global node IDs
"""
# TODO do we need to cache it?
start = self._node_map[partid - 1] if partid > 0 else 0
end = self._node_map[partid]
return F.arange(start, end)
if ntype == '_N':
start = self._max_node_ids[partid - 1] if partid > 0 else 0
end = self._max_node_ids[partid]
return F.arange(start, end)
else:
start = self._typed_max_node_ids[ntype][partid - 1] if partid > 0 else 0
end = self._typed_max_node_ids[ntype][partid]
return F.arange(start, end)
def partid2eids(self, partid):
"""From partition id to global edge IDs
def partid2eids(self, partid, etype='_E'):
"""From partition ID to global edge IDs
"""
# TODO do we need to cache it?
start = self._edge_map[partid - 1] if partid > 0 else 0
end = self._edge_map[partid]
return F.arange(start, end)
if etype == '_E':
start = self._max_edge_ids[partid - 1] if partid > 0 else 0
end = self._max_edge_ids[partid]
return F.arange(start, end)
else:
start = self._typed_max_edge_ids[etype][partid - 1] if partid > 0 else 0
end = self._typed_max_edge_ids[etype][partid]
return F.arange(start, end)
def nid2localnid(self, nids, partid):
def nid2localnid(self, nids, partid, ntype='_N'):
"""Get local node IDs within the given partition.
"""
if partid != self._partid:
......@@ -511,12 +781,15 @@ class RangePartitionBook(GraphPartitionBook):
nids = utils.toindex(nids)
nids = nids.tousertensor()
start = self._node_map[partid - 1] if partid > 0 else 0
if ntype == '_N':
start = self._max_node_ids[partid - 1] if partid > 0 else 0
else:
start = self._typed_max_node_ids[ntype][partid - 1] if partid > 0 else 0
return nids - int(start)
def eid2localeid(self, eids, partid):
"""Get the local edge ids within the given partition.
def eid2localeid(self, eids, partid, etype='_E'):
"""Get the local edge IDs within the given partition.
"""
if partid != self._partid:
raise RuntimeError('Now RangePartitionBook does not support \
......@@ -524,16 +797,31 @@ class RangePartitionBook(GraphPartitionBook):
eids = utils.toindex(eids)
eids = eids.tousertensor()
start = self._edge_map[partid - 1] if partid > 0 else 0
if etype == '_E':
start = self._max_edge_ids[partid - 1] if partid > 0 else 0
else:
start = self._typed_max_edge_ids[etype][partid - 1] if partid > 0 else 0
return eids - int(start)
@property
def partid(self):
"""Get the current partition id
"""Get the current partition ID.
"""
return self._partid
@property
def ntypes(self):
"""Get the list of node types
"""
return self._ntypes
@property
def etypes(self):
"""Get the list of edge types
"""
return self._etypes
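Putting the pieces together, a ``RangePartitionBook`` can be constructed directly from the documented map format. The sketch below assumes a DGL build that contains this patch; the numbers are illustrative only.
```
import numpy as np
from dgl.distributed.graph_partition_book import RangePartitionBook

node_map = {'_N': np.array([[0, 100], [100, 200]])}
edge_map = {'_E': np.array([[0, 500], [500, 1000]])}
gpb = RangePartitionBook(part_id=0, num_parts=2,
                         node_map=node_map, edge_map=edge_map,
                         ntypes={'_N': 0}, etypes={'_E': 0})
print(gpb.nid2partid(np.array([5, 150])))  # -> tensor([0, 1])
```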
NODE_PART_POLICY = 'node'
EDGE_PART_POLICY = 'edge'
......@@ -550,14 +838,19 @@ class PartitionPolicy(object):
Parameters
----------
policy_str : str
Partition policy name, e.g., 'edge' or 'node'.
Partition policy name, e.g., 'edge:_E' or 'node:_N'.
partition_book : GraphPartitionBook
A graph partition book
"""
def __init__(self, policy_str, partition_book):
# TODO(chao): support more policies for HeteroGraph
assert policy_str in (EDGE_PART_POLICY, NODE_PART_POLICY), \
'policy_str must be \'edge\' or \'node\'.'
splits = policy_str.split(':')
if len(splits) == 1:
assert policy_str in (EDGE_PART_POLICY, NODE_PART_POLICY), \
'policy_str must contain \'edge\' or \'node\'.'
if NODE_PART_POLICY == policy_str:
policy_str = NODE_PART_POLICY + ":_N"
else:
policy_str = EDGE_PART_POLICY + ":_E"
self._policy_str = policy_str
self._part_id = partition_book.partid
self._partition_book = partition_book
......@@ -595,6 +888,12 @@ class PartitionPolicy(object):
"""
return self._partition_book
def get_data_name(self, name):
"""Get HeteroDataName
"""
is_node = NODE_PART_POLICY in self._policy_str
return HeteroDataName(is_node, self._policy_str[5:], name)
def to_local(self, id_tensor):
"""Mapping global ID to local ID.
......@@ -608,10 +907,10 @@ class PartitionPolicy(object):
tensor
local ID tensor
"""
if self._policy_str == EDGE_PART_POLICY:
return self._partition_book.eid2localeid(id_tensor, self._part_id)
elif self._policy_str == NODE_PART_POLICY:
return self._partition_book.nid2localnid(id_tensor, self._part_id)
if EDGE_PART_POLICY in self._policy_str:
return self._partition_book.eid2localeid(id_tensor, self._part_id, self._policy_str[5:])
elif NODE_PART_POLICY in self._policy_str:
return self._partition_book.nid2localnid(id_tensor, self._part_id, self._policy_str[5:])
else:
raise RuntimeError('Cannot support policy: %s ' % self._policy_str)
......@@ -628,10 +927,10 @@ class PartitionPolicy(object):
tensor
partition ID
"""
if self._policy_str == EDGE_PART_POLICY:
return self._partition_book.eid2partid(id_tensor)
elif self._policy_str == NODE_PART_POLICY:
return self._partition_book.nid2partid(id_tensor)
if EDGE_PART_POLICY in self._policy_str:
return self._partition_book.eid2partid(id_tensor, self._policy_str[5:])
elif NODE_PART_POLICY in self._policy_str:
return self._partition_book.nid2partid(id_tensor, self._policy_str[5:])
else:
raise RuntimeError('Cannot support policy: %s ' % self._policy_str)
......@@ -643,10 +942,10 @@ class PartitionPolicy(object):
int
data size
"""
if self._policy_str == EDGE_PART_POLICY:
return len(self._partition_book.partid2eids(self._part_id))
elif self._policy_str == NODE_PART_POLICY:
return len(self._partition_book.partid2nids(self._part_id))
if EDGE_PART_POLICY in self._policy_str:
return len(self._partition_book.partid2eids(self._part_id, self._policy_str[5:]))
elif NODE_PART_POLICY in self._policy_str:
return len(self._partition_book.partid2nids(self._part_id, self._policy_str[5:]))
else:
raise RuntimeError('Cannot support policy: %s ' % self._policy_str)
......@@ -658,9 +957,94 @@ class PartitionPolicy(object):
int
data size
"""
if self._policy_str == EDGE_PART_POLICY:
return self._partition_book._num_edges()
elif self._policy_str == NODE_PART_POLICY:
return self._partition_book._num_nodes()
if EDGE_PART_POLICY in self._policy_str:
return self._partition_book._num_edges(self._policy_str[5:])
elif NODE_PART_POLICY in self._policy_str:
return self._partition_book._num_nodes(self._policy_str[5:])
else:
raise RuntimeError('Cannot support policy: %s ' % self._policy_str)
class NodePartitionPolicy(PartitionPolicy):
'''Partition policy for nodes.
'''
def __init__(self, partition_book, ntype='_N'):
super(NodePartitionPolicy, self).__init__(NODE_PART_POLICY + ':' + ntype, partition_book)
class EdgePartitionPolicy(PartitionPolicy):
'''Partition policy for edges.
'''
def __init__(self, partition_book, etype='_E'):
super(EdgePartitionPolicy, self).__init__(EDGE_PART_POLICY + ':' + etype, partition_book)
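Note that both the convenience classes above and the ``self._policy_str[5:]`` slicing in ``PartitionPolicy`` rely on the prefixes ``'node:'`` and ``'edge:'`` being exactly five characters long. A quick self-contained check (type names are made up):
```
for s in ('node:_N', 'edge:_E', 'node:paper'):
    role, type_name = s.split(':')
    assert s[5:] == type_name  # 'node:' and 'edge:' are both 5 chars
```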
class HeteroDataName(object):
''' The data name in a heterogeneous graph.
A unique data name has three components:
* whether it is node data or edge data.
* the node/edge type.
* the name of the data.
Parameters
----------
is_node : bool
Indicate whether it's node data or edge data.
entity_type : str
The type of the node/edge.
data_name : str
The name of the data.
'''
def __init__(self, is_node, entity_type, data_name):
self.policy_str = NODE_PART_POLICY if is_node else EDGE_PART_POLICY
self.policy_str = self.policy_str + ':' + entity_type
self.data_name = data_name
def is_node(self):
''' Is this the name of node data
'''
return NODE_PART_POLICY in self.policy_str
def is_edge(self):
''' Is this the name of edge data
'''
return EDGE_PART_POLICY in self.policy_str
def get_type(self):
''' The type of the node/edge.
This is only meaningful in a heterogeneous graph.
In a homogeneous graph, the type is '_N' for nodes and '_E' for edges.
'''
return self.policy_str[5:]
def get_name(self):
''' The name of the data.
'''
return self.data_name
def __str__(self):
''' The full name of the data.
The full name is used as the key in the KVStore.
'''
return self.policy_str + ':' + self.data_name
def parse_hetero_data_name(name):
'''Parse data name and create HeteroDataName.
The data name has a specialized format with three fields separated by ":".
Parsing the name tells us whether it is node or edge data, its node/edge type,
and its actual name.
Parameters
----------
name : str
The data name
Returns
-------
HeteroDataName
'''
names = name.split(':')
assert len(names) == 3, '{} is not a valid heterograph data name'.format(name)
assert names[0] in (NODE_PART_POLICY, EDGE_PART_POLICY), \
'{} is not a valid heterograph data name'.format(name)
return HeteroDataName(names[0] == NODE_PART_POLICY, names[1], names[2])
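A quick round trip through ``HeteroDataName`` and ``parse_hetero_data_name`` shows the naming scheme (runnable against this module; the type and data names are made up):
```
dn = HeteroDataName(is_node=True, entity_type='paper', data_name='feat')
assert str(dn) == 'node:paper:feat'
parsed = parse_hetero_data_name(str(dn))
assert parsed.is_node()
assert parsed.get_type() == 'paper' and parsed.get_name() == 'feat'
```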
......@@ -47,10 +47,10 @@ class FindEdgeResponse(Response):
def _sample_neighbors(local_g, partition_book, seed_nodes, fan_out, edge_dir, prob, replace):
""" Sample from local partition.
The input nodes use global Ids. We need to map the global node Ids to local node Ids,
perform sampling and map the sampled results to the global Ids space again.
The input nodes use global IDs. We need to map the global node IDs to local node IDs,
perform sampling and map the sampled results to the global IDs space again.
The sampled results are stored in three vectors that store source nodes, destination nodes
and edge Ids.
and edge IDs.
"""
local_ids = partition_book.nid2localnid(seed_nodes, partition_book.partid)
local_ids = F.astype(local_ids, local_g.idtype)
......@@ -59,7 +59,8 @@ def _sample_neighbors(local_g, partition_book, seed_nodes, fan_out, edge_dir, pr
local_g, local_ids, fan_out, edge_dir, prob, replace, _dist_training=True)
global_nid_mapping = local_g.ndata[NID]
src, dst = sampled_graph.edges()
global_src, global_dst = global_nid_mapping[src], global_nid_mapping[dst]
global_src, global_dst = F.gather_row(global_nid_mapping, src), \
F.gather_row(global_nid_mapping, dst)
global_eids = F.gather_row(local_g.edata[EID], sampled_graph.edata[EID])
return global_src, global_dst, global_eids
......@@ -78,10 +79,10 @@ def _find_edges(local_g, partition_book, seed_edges):
def _in_subgraph(local_g, partition_book, seed_nodes):
""" Get in subgraph from local partition.
The input nodes use global Ids. We need to map the global node Ids to local node Ids,
get in-subgraph and map the sampled results to the global Ids space again.
The input nodes use global IDs. We need to map the global node IDs to local node IDs,
get in-subgraph and map the sampled results to the global IDs space again.
The results are stored in three vectors that store source nodes, destination nodes
and edge Ids.
and edge IDs.
"""
local_ids = partition_book.nid2localnid(seed_nodes, partition_book.partid)
local_ids = F.astype(local_ids, local_g.idtype)
......@@ -254,7 +255,19 @@ def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False):
Node/edge features are not preserved. The original IDs of
the sampled edges are stored as the `dgl.EID` feature in the returned graph.
For now, we only support the input graph with one node type and one edge type.
This version provides an experimental support for heterogeneous graphs.
When the input graph is heterogeneous, the sampled subgraph is still stored in
the homogeneous graph format. That is, all nodes and edges are assigned
unique IDs (in contrast, we typically use a type name and a node/edge ID to
identify a node or an edge in ``DGLGraph``). We refer to this type of IDs
as *homogeneous ID*.
Users can use :func:`dgl.distributed.GraphPartitionBook.map_to_per_ntype`
and :func:`dgl.distributed.GraphPartitionBook.map_to_per_etype`
to identify their node/edge types and node/edge IDs of that type.
For heterogeneous graphs, ``nodes`` can be a dictionary whose keys are node types
and whose values are type-specific node IDs; ``nodes`` can also be a tensor of
*homogeneous IDs*.
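A hypothetical call pattern (the graph, seed tensor, and type names are assumed, not part of this diff) looks like this:
```
# Seeds are given per node type; the sampled edges can be decoded back to types.
frontier = dgl.distributed.sample_neighbors(g, {'paper': paper_nids}, fanout=10)
gpb = g.get_partition_book()
etype_ids, per_type_eids = gpb.map_to_per_etype(frontier.edata[dgl.EID])
```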
Parameters
----------
......@@ -292,9 +305,17 @@ def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False):
DGLGraph
A sampled subgraph containing only the sampled neighboring edges. It is on CPU.
"""
gpb = g.get_partition_book()
if isinstance(nodes, dict):
assert len(nodes) == 1, 'The distributed sampler only supports one node type for now.'
nodes = list(nodes.values())[0]
homo_nids = []
for ntype in nodes:
assert ntype in g.ntypes, 'The sampled node type does not exist in the input graph'
if F.is_tensor(nodes[ntype]):
typed_nodes = nodes[ntype]
else:
typed_nodes = toindex(nodes[ntype]).tousertensor()
homo_nids.append(gpb.map_to_homo_nid(typed_nodes, ntype))
nodes = F.cat(homo_nids, 0)
def issue_remote_req(node_ids):
return SamplingRequest(node_ids, fanout, edge_dir=edge_dir,
prob=prob, replace=replace)
......
"""Module for mapping between node/edge IDs and node/edge types."""
import numpy as np
from .._ffi.function import _init_api
from .. import backend as F
from .. import utils
class IdMap:
'''A map for converting node/edge IDs to their type IDs and type-wise IDs.
For a heterogeneous graph, DGL assigns an integer ID to each node/edge type;
node and edge of different types have independent IDs starting from zero.
Therefore, a node/edge can be uniquely identified by an ID pair,
``(type_id, type_wise_id)``. To make it convenient for distributed processing,
DGL further encodes the ID pair into one integer ID, which we refer to
as *homogeneous ID*.
DGL arranges nodes and edges so that all nodes of the same type have contiguous
homogeneous IDs. If the graph is partitioned, the nodes/edges of the same type
within a partition have contiguous homogeneous IDs.
Below is an example adjacency matrix of an unpartitioned heterogeneous graph
stored using the above ID assignment. Here, the graph has two types of nodes
(``T0`` and ``T1``), and four types of edges (``R0``, ``R1``, ``R2``, ``R3``).
There are a total of 400 nodes in the graph and each type has 200 nodes. Nodes
of type 0 have IDs in [0,200), while nodes of type 1 have IDs in [200, 400).
```
0 <- T0 -> 200 <- T1 -> 400
0 +-----------+------------+
| | |
^ | R0 | R1 |
T0 | | |
v | | |
200 +-----------+------------+
| | |
^ | R2 | R3 |
T1 | | |
v | | |
400 +-----------+------------+
```
Below shows the adjacency matrix after the graph is partitioned into two.
Note that each partition still has two node types and four edge types,
and nodes/edges of the same type have contiguous IDs.
```
partition 0 partition 1
0 <- T0 -> 100 <- T1 -> 200 <- T0 -> 300 <- T1 -> 400
0 +-----------+------------+-----------+------------+
| | | |
^ | R0 | R1 | |
T0 | | | |
v | | | |
100 +-----------+------------+ |
| | | |
^ | R2 | R3 | |
T1 | | | |
v | | | |
200 +-----------+------------+-----------+------------+
| | | |
^ | | R0 | R1 |
T0 | | | |
v | | | |
100 | +-----------+------------+
| | | |
^ | | R2 | R3 |
T1 | | | |
v | | | |
200 +-----------+------------+-----------+------------+
```
The following table is an alternative way to represent the above ID assignments.
It is easy to see that the homogeneous ID range [0, 100) is used for nodes of type 0
in partition 0, [100, 200) is used for nodes of type 1 in partition 0, and so on.
```
+-----------+------+-----------+
| range     | type | partition |
+-----------+------+-----------+
| [0, 100)  | 0    | 0         |
| [100,200) | 1    | 0         |
| [200,300) | 0    | 1         |
| [300,400) | 1    | 1         |
+-----------+------+-----------+
```
The goal of this class is to convert a node's homogeneous ID into the
ID pair ``(type_id, type_wise_id)``. For example, homogeneous node ID 90 is mapped
to (0, 90); homogeneous node ID 201 is mapped to (0, 101).
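The mapping itself is done in C for speed, but its logic can be sketched in pure NumPy; the numbers below follow the table above (two partitions, two types):
```
import numpy as np

starts = np.array([0, 100, 200, 300])  # range starts, ordered as in the table
ends = np.array([100, 200, 300, 400])  # range ends
num_types = 2

def map_id(homo_id):
    row = np.searchsorted(ends, homo_id, side='right')
    type_id = row % num_types
    # Count nodes of this type in earlier partitions, then add the local offset.
    prev = sum(ends[r] - starts[r] for r in range(type_id, row, num_types))
    return type_id, prev + homo_id - starts[row]

assert map_id(90) == (0, 90)
assert map_id(201) == (0, 101)
```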
Parameters
----------
id_ranges : dict[str, Tensor].
Node ID ranges within partitions for each node type. The key is the node type
name in string. The value is a tensor of shape :math:`(K, 2)`, where :math:`K` is
the number of partitions. Each row has two integers: the starting and the ending IDs
for a particular node type in a partition. For example, all nodes of type ``"T"`` in
partition ``i`` have IDs in the range ``id_ranges["T"][i][0]`` to ``id_ranges["T"][i][1]``.
It is the same as the ``node_map`` argument in ``RangePartitionBook``.
'''
def __init__(self, id_ranges):
self.num_parts = list(id_ranges.values())[0].shape[0]
self.num_types = len(id_ranges)
ranges = np.zeros((self.num_parts * self.num_types, 2), dtype=np.int64)
typed_map = []
id_ranges = list(id_ranges.values())
id_ranges.sort(key=lambda a: a[0, 0])
for i, id_range in enumerate(id_ranges):
ranges[i::self.num_types] = id_range
map1 = np.cumsum(id_range[:, 1] - id_range[:, 0])
typed_map.append(map1)
assert np.all(np.diff(ranges[:, 0]) >= 0)
assert np.all(np.diff(ranges[:, 1]) >= 0)
self.range_start = utils.toindex(np.ascontiguousarray(ranges[:, 0]))
self.range_end = utils.toindex(np.ascontiguousarray(ranges[:, 1]) - 1)
self.typed_map = utils.toindex(np.concatenate(typed_map))
def __call__(self, ids):
'''Convert the homogeneous IDs to (type_id, type_wise_id).
Parameters
----------
ids : 1D tensor
The homogeneous ID.
Returns
-------
type_ids : Tensor
Type IDs
per_type_ids : Tensor
Type-wise IDs
'''
if self.num_types == 0:
return F.zeros((len(ids),), F.dtype(ids), F.cpu()), ids
if len(ids) == 0:
return ids, ids
ids = utils.toindex(ids)
ret = _CAPI_DGLHeteroMapIds(ids.todgltensor(),
self.range_start.todgltensor(),
self.range_end.todgltensor(),
self.typed_map.todgltensor(),
self.num_parts, self.num_types)
ret = utils.toindex(ret).tousertensor()
return ret[:len(ids)], ret[len(ids):]
_init_api("dgl.distributed.id_map")
......@@ -886,9 +886,9 @@ class KVClient(object):
def push_handler(data_store, name, local_offset, data)
```
`data_store` is a dict that contains all tensors in the kvstore. `name` is the name
of the tensor where new data is pushed to. `local_offset` is the offset where new
data should be written in the tensor in the local partition. `data` is the new data
``data_store`` is a dict that contains all tensors in the kvstore. ``name`` is the name
of the tensor where new data is pushed to. ``local_offset`` is the offset where new
data should be written in the tensor in the local partition. ``data`` is the new data
to be written.
Parameters
......@@ -919,8 +919,8 @@ class KVClient(object):
def pull_handler(data_store, name, local_offset)
```
`data_store` is a dict that contains all tensors in the kvstore. `name` is the name
of the tensor where new data is pushed to. `local_offset` is the offset where new
``data_store`` is a dict that contains all tensors in the kvstore. ``name`` is the name
of the tensor to read data from. ``local_offset`` is the offset from which data
should be read in the tensor in the local partition.
Parameters
......
......@@ -6,12 +6,42 @@ import time
import numpy as np
from .. import backend as F
from ..base import NID, EID
from ..base import NID, EID, NTYPE, ETYPE, dgl_warning
from ..convert import to_homogeneous
from ..random import choice as random_choice
from ..data.utils import load_graphs, save_graphs, load_tensors, save_tensors
from ..transform import metis_partition_assignment, partition_graph_with_halo
from .graph_partition_book import BasicPartitionBook, RangePartitionBook
def _get_inner_node_mask(graph, ntype_id):
if NTYPE in graph.ndata:
dtype = F.dtype(graph.ndata['inner_node'])
return graph.ndata['inner_node'] * F.astype(graph.ndata[NTYPE] == ntype_id, dtype) == 1
else:
return graph.ndata['inner_node'] == 1
def _get_inner_edge_mask(graph, etype_id):
if ETYPE in graph.edata:
dtype = F.dtype(graph.edata['inner_edge'])
return graph.edata['inner_edge'] * F.astype(graph.edata[ETYPE] == etype_id, dtype) == 1
else:
return graph.edata['inner_edge'] == 1
def _get_part_ranges(id_ranges):
res = {}
for key in id_ranges:
# Normally, each element has two values that represent the starting ID and the ending ID
# of the ID range in a partition.
# If not, the data is probably still in the old format, in which only the ending ID is
# stored. We need to convert it to the format we expect.
if not isinstance(id_ranges[key][0], list):
start = 0
for i, end in enumerate(id_ranges[key]):
id_ranges[key][i] = [start, end]
start = end
res[key] = np.concatenate([np.array(l) for l in id_ranges[key]]).reshape(-1, 2)
return res
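For instance (assumed numbers), an old-format entry that stored only cumulative ending IDs is converted into explicit (start, end) pairs:
```
old = {'_N': [100, 250]}   # old format: cumulative ends per partition
new = _get_part_ranges(old)
# new['_N'] is [[0, 100], [100, 250]] as a (num_parts, 2) array
```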
def load_partition(part_config, part_id):
''' Load data of a partition from the data path.
......@@ -30,20 +60,24 @@ def load_partition(part_config, part_id):
part_config : str
The path of the partition config file.
part_id : int
The partition Id.
The partition ID.
Returns
-------
DGLGraph
The graph partition structure.
dict of tensors
Dict[str, Tensor]
Node features.
dict of tensors
Dict[str, Tensor]
Edge features.
GraphPartitionBook
The graph partition information.
str
The graph name
List[str]
The node types
List[str]
The edge types
'''
with open(part_config) as conf_f:
part_metadata = json.load(conf_f)
......@@ -55,15 +89,46 @@ def load_partition(part_config, part_id):
node_feats = load_tensors(part_files['node_feats'])
edge_feats = load_tensors(part_files['edge_feats'])
graph = load_graphs(part_files['part_graph'])[0][0]
assert NID in graph.ndata, "the partition graph should contain node mapping to global node Id"
assert EID in graph.edata, "the partition graph should contain edge mapping to global edge Id"
gpb, graph_name = load_partition_book(part_config, part_id, graph)
nids = F.boolean_mask(graph.ndata[NID], graph.ndata['inner_node'])
partids = gpb.nid2partid(nids)
assert np.all(F.asnumpy(partids == part_id)), 'load a wrong partition'
return graph, node_feats, edge_feats, gpb, graph_name
# In the old format, the feature name doesn't contain node/edge type.
# For compatibility, let's add node/edge types to the feature names.
node_feats1 = {}
edge_feats1 = {}
for name in node_feats:
feat = node_feats[name]
if name.find('/') == -1:
name = '_N/' + name
node_feats1[name] = feat
for name in edge_feats:
feat = edge_feats[name]
if name.find('/') == -1:
name = '_E/' + name
edge_feats1[name] = feat
node_feats = node_feats1
edge_feats = edge_feats1
assert NID in graph.ndata, "the partition graph should contain node mapping to global node ID"
assert EID in graph.edata, "the partition graph should contain edge mapping to global edge ID"
gpb, graph_name, ntypes, etypes = load_partition_book(part_config, part_id, graph)
for ntype in ntypes:
ntype_id = ntypes[ntype]
# graph.ndata[NID] are global homogeneous node IDs.
nids = F.boolean_mask(graph.ndata[NID], _get_inner_node_mask(graph, ntype_id))
partids1 = gpb.nid2partid(nids)
_, per_type_nids = gpb.map_to_per_ntype(nids)
partids2 = gpb.nid2partid(per_type_nids, ntype)
assert np.all(F.asnumpy(partids1 == part_id)), 'load a wrong partition'
assert np.all(F.asnumpy(partids2 == part_id)), 'load a wrong partition'
for etype in etypes:
etype_id = etypes[etype]
# graph.edata[EID] are global homogeneous edge IDs.
eids = F.boolean_mask(graph.edata[EID], _get_inner_edge_mask(graph, etype_id))
partids1 = gpb.eid2partid(eids)
_, per_type_eids = gpb.map_to_per_etype(eids)
partids2 = gpb.eid2partid(per_type_eids, etype)
assert np.all(F.asnumpy(partids1 == part_id)), 'load a wrong partition'
assert np.all(F.asnumpy(partids2 == part_id)), 'load a wrong partition'
return graph, node_feats, edge_feats, gpb, graph_name, ntypes, etypes
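A sketch of consuming the extended return values (the config path and feature key are assumed, not prescribed by this diff):
```
g_part, node_feats, edge_feats, gpb, graph_name, ntypes, etypes = \
    load_partition('data/ogbn-mag.json', 0)
# Feature keys are now type-qualified, e.g. 'paper/feat' instead of 'feat'.
feat = node_feats.get('paper/feat')
```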
def load_partition_book(part_config, part_id, graph=None):
''' Load a graph partition book from the partition config file.
......@@ -73,7 +138,7 @@ def load_partition_book(part_config, part_id, graph=None):
part_config : str
The path of the partition config file.
part_id : int
The partition Id.
The partition ID.
graph : DGLGraph
The graph structure
......@@ -83,6 +148,10 @@ def load_partition_book(part_config, part_id, graph=None):
The global partition information.
str
The graph name
dict
The node types
dict
The edge types
'''
with open(part_config) as conf_f:
part_metadata = json.load(conf_f)
......@@ -99,18 +168,44 @@ def load_partition_book(part_config, part_id, graph=None):
# If this is a range partitioning, node_map actually stores a list, whose elements
# indicate the boundary of range partitioning. Otherwise, node_map stores a filename
# that contains node map in a NumPy array.
is_range_part = isinstance(part_metadata['node_map'], list)
node_map = part_metadata['node_map'] if is_range_part else np.load(part_metadata['node_map'])
edge_map = part_metadata['edge_map'] if is_range_part else np.load(part_metadata['edge_map'])
assert isinstance(node_map, list) == isinstance(edge_map, list), \
"The node map and edge map need to have the same format"
node_map = part_metadata['node_map']
edge_map = part_metadata['edge_map']
if isinstance(node_map, dict):
for key in node_map:
is_range_part = isinstance(node_map[key], list)
break
elif isinstance(node_map, list):
is_range_part = True
node_map = {'_N': node_map}
else:
is_range_part = False
if isinstance(edge_map, list):
edge_map = {'_E': edge_map}
ntypes = {'_N': 0}
etypes = {'_E': 0}
if 'ntypes' in part_metadata:
ntypes = part_metadata['ntypes']
if 'etypes' in part_metadata:
etypes = part_metadata['etypes']
if isinstance(node_map, dict):
for key in node_map:
assert key in ntypes, 'The node type {} is invalid'.format(key)
if isinstance(edge_map, dict):
for key in edge_map:
assert key in etypes, 'The edge type {} is invalid'.format(key)
if is_range_part:
return RangePartitionBook(part_id, num_parts, np.array(node_map),
np.array(edge_map)), part_metadata['graph_name']
node_map = _get_part_ranges(node_map)
edge_map = _get_part_ranges(edge_map)
return RangePartitionBook(part_id, num_parts, node_map, edge_map, ntypes, etypes), \
part_metadata['graph_name'], ntypes, etypes
else:
return BasicPartitionBook(part_id, num_parts, node_map, edge_map,
graph), part_metadata['graph_name']
node_map = np.load(node_map)
edge_map = np.load(edge_map)
return BasicPartitionBook(part_id, num_parts, node_map, edge_map, graph), \
part_metadata['graph_name'], ntypes, etypes
def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis",
reshuffle=True, balance_ntypes=None, balance_edges=False):
......@@ -121,9 +216,8 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
the node assignment; 3) split the node features and edge features based on
the partition result.
When a graph is partitioned, each partition can contain *HALO* nodes and edges, which are
the ones that belong to
other partitions but are included in this partition for integrity or efficiency concerns.
When a graph is partitioned, each partition can contain *HALO* nodes, which are assigned
to other partitions but are included in this partition for efficiency purposes.
In this document, *local nodes/edges* refer to the nodes and edges that truly belong to
a partition. The rest are "HALO nodes/edges".
......@@ -145,7 +239,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
|-- graph.dgl
First, the metadata of the original graph and the partitioning is stored in a JSON file
named after `graph_name`. This JSON file contains the information of the original graph
named after ``graph_name``. This JSON file contains the information of the original graph
as well as the path of the files that store each partition. Below show an example.
.. code-block:: none
......@@ -155,8 +249,16 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
"part_method" : "metis",
"num_parts" : 2,
"halo_hops" : 1,
"node_map" : "data_root_dir/node_map.npy",
"edge_map" : "data_root_dir/edge_map.npy"
"node_map": {
"_U": [ [ 0, 1261310 ],
[ 1261310, 2449029 ] ]
},
"edge_map": {
"_V": [ [ 0, 62539528 ],
[ 62539528, 123718280 ] ]
},
"etypes": { "_V": 0 },
"ntypes": { "_U": 0 },
"num_nodes" : 1000000,
"num_edges" : 52000000,
"part-0" : {
......@@ -173,36 +275,64 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
Here are the definition of the fields in the partition configuration file:
* `graph_name` is the name of the graph given by a user.
* `part_method` is the method used to assign nodes to partitions.
* ``graph_name`` is the name of the graph given by a user.
* ``part_method`` is the method used to assign nodes to partitions.
Currently, it supports "random" and "metis".
* `num_parts` is the number of partitions.
* `halo_hops` is the number of HALO nodes we want to include in a partition.
* `node_map` is the node assignment map, which tells the partition Id a node is assigned to.
* `edge_map` is the edge assignment map, which tells the partition Id an edge is assigned to.
* `num_nodes` is the number of nodes in the global graph.
* `num_edges` is the number of edges in the global graph.
* ``num_parts`` is the number of partitions.
* ``halo_hops`` is the number of hops of nodes we include in a partition as HALO nodes.
* ``node_map`` is the node assignment map, which gives the partition ID that each node is assigned to.
The format of ``node_map`` is described below.
* ``edge_map`` is the edge assignment map, which gives the partition ID that each edge is assigned to.
* ``num_nodes`` is the number of nodes in the global graph.
* ``num_edges`` is the number of edges in the global graph.
* ``part-*`` stores the data of a partition.
If node IDs and edge IDs are not shuffled to ensure that all nodes/edges in a partition
fall into a contiguous ID range, DGL needs to store node/edge mappings (from
If ``reshuffle=False``, node IDs and edge IDs of a partition do not fall into contiguous
ID ranges. In this case, DGL stores node/edge mappings (from
node/edge IDs to partition IDs) in separate NumPy files (node_map.npy and edge_map.npy).
.. warning::
this format is deprecated and will not be supported by the next release. In other words,
the future release will always shuffle node IDs and edge IDs when partitioning a graph.
If ``reshuffle=True``, ``node_map`` and ``edge_map`` contain the information
for mapping global node/edge IDs to partition-local node/edge IDs.
For heterogeneous graphs, the information in ``node_map`` and ``edge_map`` can also be used
to compute node types and edge types. The format of the data in ``node_map`` and ``edge_map``
is as follows:
.. code-block:: none
{
"node_type": [ [ part1_start, part1_end ],
[ part2_start, part2_end ],
... ],
...
},
Essentially, ``node_map`` and ``edge_map`` are dictionaries whose keys are
node/edge types. Each value is a list with one pair per partition; the pair stores
the start and the end of the ID range for that node/edge type in the partition.
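For instance, the per-partition node counts of a type follow directly from these pairs (toy values taken from the JSON example above):
```
node_map = {'_U': [[0, 1261310], [1261310, 2449029]]}
counts = {ntype: [end - start for start, end in ranges]
          for ntype, ranges in node_map.items()}
# counts == {'_U': [1261310, 1187719]}
```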
The graph structure of a partition is stored in a file with the DGLGraph format.
Nodes in each partition are *relabeled* to always start with zero. We call the node
ID in the original graph the *global ID*, and the relabeled ID in each partition the
*local ID*. Each partition graph has an integer node data tensor stored under name
`dgl.NID` and each value is the node's global ID. Similarly, edges are relabeled too
and the mapping from local ID to global ID is stored as an integer edge data tensor
under name `dgl.EID`.
under name `dgl.EID`. For a heterogeneous graph, the DGLGraph also contains node
data `dgl.NTYPE` storing the node types and edge data `dgl.ETYPE` storing the edge types.
The partition graph contains additional node data ("inner_node" and "orig_id") and
edge data ("inner_edge"):
* "inner_node" indicates whether a node belongs to a partition.
* "inner_edge" indicates whether an edge belongs to a partition.
* "orig_id" exists when reshuffle=True. It indicates the original node Ids in the original
* "orig_id" exists when reshuffle=True. It indicates the original node IDs in the original
graph before reshuffling.
Node and edge features are split and stored together with each graph partition.
......@@ -214,8 +344,8 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
Currently, it supports two constraints to balance the partitioning. By default, Metis
always tries to balance the number of nodes in each partition.
* `balance_ntypes` balances the number of nodes of different types in each partition.
* `balance_edges` balances the number of edges in each partition.
* ``balance_ntypes`` balances the number of nodes of different types in each partition.
* ``balance_edges`` balances the number of edges in each partition.
To balance the node types, a user needs to pass a vector of N elements to indicate
the type of each node. N is the number of nodes in the input graph.
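For example (assuming the graph carries a boolean ``train_mask`` node feature; this call pattern follows the Examples section below), the mask itself can serve as the type vector so that training nodes are spread evenly across partitions:
>>> dgl.distributed.partition_graph(g, 'mygraph', 2, out_path='output',
...                                 balance_ntypes=g.ndata['train_mask'],
...                                 balance_edges=True)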
......@@ -238,7 +368,8 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
The partition method. It supports "random" and "metis". The default value is "metis".
reshuffle : bool, optional
Reshuffle nodes and edges so that nodes and edges in a partition are in
contiguous Id range. The default value is True
a contiguous ID range. The default value is True. This argument is deprecated
and will be removed in the next release.
balance_ntypes : tensor, optional
Node type of each node. This is a 1D array of integers. Its values indicate the node
type of each node. This argument is used by Metis partition. When the argument is
......@@ -258,26 +389,115 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
>>> g, node_feats, edge_feats, gpb, graph_name = dgl.distributed.load_partition(
... 'output/test.json', 0)
'''
def get_homogeneous(g, balance_ntypes):
if len(g.etypes) == 1:
sim_g = g
if isinstance(balance_ntypes, dict):
assert len(balance_ntypes) == 1
bal_ntypes = list(balance_ntypes.values())[0]
else:
bal_ntypes = balance_ntypes
elif isinstance(balance_ntypes, dict):
# Here we assign node types for load balancing.
# The new node types include the ones provided by users.
num_ntypes = 0
for key in g.ntypes:
if key in balance_ntypes:
g.nodes[key].data['bal_ntype'] = F.astype(balance_ntypes[key],
F.int32) + num_ntypes
uniq_ntypes = F.unique(balance_ntypes[key])
assert np.all(F.asnumpy(uniq_ntypes) == np.arange(len(uniq_ntypes)))
num_ntypes += len(uniq_ntypes)
else:
g.nodes[key].data['bal_ntype'] = F.ones((g.number_of_nodes(key),), F.int32,
F.cpu()) * num_ntypes
num_ntypes += 1
sim_g = to_homogeneous(g, ndata=['bal_ntype'])
bal_ntypes = sim_g.ndata['bal_ntype']
print('The graph has {} node types and is balanced among {} types'.format(
len(g.ntypes), len(F.unique(bal_ntypes))))
# We now no longer need them.
for key in g.ntypes:
del g.nodes[key].data['bal_ntype']
del sim_g.ndata['bal_ntype']
else:
sim_g = to_homogeneous(g)
bal_ntypes = sim_g.ndata[NTYPE]
return sim_g, bal_ntypes
if not reshuffle:
dgl_warning("The argument reshuffle will be deprecated in the next release. "
"For heterogeneous graphs, reshuffle must be enabled.")
if num_parts == 1:
parts = {0: g}
node_parts = F.zeros((g.number_of_nodes(),), F.int64, F.cpu())
g.ndata[NID] = F.arange(0, g.number_of_nodes())
g.edata[EID] = F.arange(0, g.number_of_edges())
g.ndata['inner_node'] = F.ones((g.number_of_nodes(),), F.int8, F.cpu())
g.edata['inner_edge'] = F.ones((g.number_of_edges(),), F.int8, F.cpu())
sim_g = to_homogeneous(g)
node_parts = F.zeros((sim_g.number_of_nodes(),), F.int64, F.cpu())
parts = {}
if reshuffle:
g.ndata['orig_id'] = F.arange(0, g.number_of_nodes())
g.edata['orig_id'] = F.arange(0, g.number_of_edges())
parts[0] = sim_g.clone()
parts[0].ndata[NID] = parts[0].ndata['orig_id'] = F.arange(0, sim_g.number_of_nodes())
parts[0].edata[EID] = parts[0].edata['orig_id'] = F.arange(0, sim_g.number_of_edges())
else:
parts[0] = sim_g.clone()
parts[0].ndata[NID] = F.arange(0, sim_g.number_of_nodes())
parts[0].edata[EID] = F.arange(0, sim_g.number_of_edges())
parts[0].ndata['inner_node'] = F.ones((sim_g.number_of_nodes(),), F.int8, F.cpu())
parts[0].edata['inner_edge'] = F.ones((sim_g.number_of_edges(),), F.int8, F.cpu())
elif part_method == 'metis':
node_parts = metis_partition_assignment(g, num_parts, balance_ntypes=balance_ntypes,
sim_g, balance_ntypes = get_homogeneous(g, balance_ntypes)
node_parts = metis_partition_assignment(sim_g, num_parts, balance_ntypes=balance_ntypes,
balance_edges=balance_edges)
parts = partition_graph_with_halo(g, node_parts, num_hops, reshuffle=reshuffle)
parts = partition_graph_with_halo(sim_g, node_parts, num_hops, reshuffle=reshuffle)
elif part_method == 'random':
node_parts = random_choice(num_parts, g.number_of_nodes())
parts = partition_graph_with_halo(g, node_parts, num_hops, reshuffle=reshuffle)
sim_g, _ = get_homogeneous(g, balance_ntypes)
node_parts = random_choice(num_parts, sim_g.number_of_nodes())
parts = partition_graph_with_halo(sim_g, node_parts, num_hops, reshuffle=reshuffle)
else:
raise Exception('Unknown partitioning method: ' + part_method)
# If the input is a heterogeneous graph, get the original node types and original node IDs.
# Each partition graph in `parts` has three types of node data at this point.
# NTYPE: the node type.
# orig_id: the global node IDs in the homogeneous version of input graph.
# NID: the global node IDs in the reshuffled homogeneous version of the input graph.
if len(g.etypes) > 1:
if reshuffle:
for name in parts:
orig_ids = parts[name].ndata['orig_id']
ntype = F.gather_row(sim_g.ndata[NTYPE], orig_ids)
parts[name].ndata[NTYPE] = F.astype(ntype, F.int32)
assert np.all(F.asnumpy(ntype) == F.asnumpy(parts[name].ndata[NTYPE]))
# Get the original edge types and original edge IDs.
orig_ids = parts[name].edata['orig_id']
etype = F.gather_row(sim_g.edata[ETYPE], orig_ids)
parts[name].edata[ETYPE] = F.astype(etype, F.int32)
assert np.all(F.asnumpy(etype) == F.asnumpy(parts[name].edata[ETYPE]))
# Calculate the global node IDs to per-node IDs mapping.
inner_ntype = F.boolean_mask(parts[name].ndata[NTYPE],
parts[name].ndata['inner_node'] == 1)
inner_nids = F.boolean_mask(parts[name].ndata[NID],
parts[name].ndata['inner_node'] == 1)
for ntype in g.ntypes:
inner_ntype_mask = inner_ntype == g.get_ntype_id(ntype)
typed_nids = F.boolean_mask(inner_nids, inner_ntype_mask)
# inner node IDs are in a contiguous ID range.
expected_range = np.arange(int(F.as_scalar(typed_nids[0])),
int(F.as_scalar(typed_nids[-1])) + 1)
assert np.all(F.asnumpy(typed_nids) == expected_range)
# Calculate the global edge IDs to per-edge IDs mapping.
inner_etype = F.boolean_mask(parts[name].edata[ETYPE],
parts[name].edata['inner_edge'] == 1)
inner_eids = F.boolean_mask(parts[name].edata[EID],
parts[name].edata['inner_edge'] == 1)
for etype in g.etypes:
inner_etype_mask = inner_etype == g.get_etype_id(etype)
typed_eids = np.sort(F.asnumpy(F.boolean_mask(inner_eids, inner_etype_mask)))
assert np.all(typed_eids == np.arange(int(typed_eids[0]),
int(typed_eids[-1]) + 1))
else:
raise NotImplementedError('Partitioning a heterogeneous graph requires reshuffle=True.')
# Let's calculate edge assignment.
if not reshuffle:
start = time.time()
......@@ -285,7 +505,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
edge_parts = np.zeros((g.number_of_edges(),), dtype=np.int64) - 1
for part_id in parts:
part = parts[part_id]
# To get the edges in the input graph, we should use original node Ids.
# To get the edges in the input graph, we should use original node IDs.
local_edges = F.boolean_mask(part.edata[EID], part.edata['inner_edge'])
edge_parts[F.asnumpy(local_edges)] = part_id
print('Calculate edge assignment: {:.3f} seconds'.format(time.time() - start))
......@@ -304,21 +524,61 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
edge_map_val = edge_part_file + ".npy"
else:
# With reshuffling, we can ensure that all nodes and edges are reshuffled
# and are in contiguous Id space.
# and are in contiguous ID space.
if num_parts > 1:
node_map_val = [F.as_scalar(F.sum(F.astype(parts[i].ndata['inner_node'], F.int64),
0)) for i in parts]
node_map_val = np.cumsum(node_map_val).tolist()
assert node_map_val[-1] == g.number_of_nodes()
edge_map_val = [F.as_scalar(F.sum(F.astype(parts[i].edata['inner_edge'], F.int64),
0)) for i in parts]
edge_map_val = np.cumsum(edge_map_val).tolist()
assert edge_map_val[-1] == g.number_of_edges()
node_map_val = {}
edge_map_val = {}
for ntype in g.ntypes:
ntype_id = g.get_ntype_id(ntype)
val = []
node_map_val[ntype] = []
for i in parts:
inner_node_mask = _get_inner_node_mask(parts[i], ntype_id)
val.append(F.as_scalar(F.sum(F.astype(inner_node_mask, F.int64), 0)))
inner_nids = F.boolean_mask(parts[i].ndata[NID], inner_node_mask)
node_map_val[ntype].append([int(F.as_scalar(inner_nids[0])),
int(F.as_scalar(inner_nids[-1])) + 1])
val = np.cumsum(val).tolist()
assert val[-1] == g.number_of_nodes(ntype)
for etype in g.etypes:
etype_id = g.get_etype_id(etype)
val = []
edge_map_val[etype] = []
for i in parts:
inner_edge_mask = _get_inner_edge_mask(parts[i], etype_id)
val.append(F.as_scalar(F.sum(F.astype(inner_edge_mask, F.int64), 0)))
inner_eids = np.sort(F.asnumpy(F.boolean_mask(parts[i].edata[EID],
inner_edge_mask)))
edge_map_val[etype].append([int(inner_eids[0]), int(inner_eids[-1]) + 1])
val = np.cumsum(val).tolist()
assert val[-1] == g.number_of_edges(etype)
else:
node_map_val = [g.number_of_nodes()]
edge_map_val = [g.number_of_edges()]
node_map_val = {}
edge_map_val = {}
for ntype in g.ntypes:
ntype_id = g.get_ntype_id(ntype)
inner_node_mask = _get_inner_node_mask(parts[0], ntype_id)
inner_nids = F.boolean_mask(parts[0].ndata[NID], inner_node_mask)
node_map_val[ntype] = [[int(F.as_scalar(inner_nids[0])),
int(F.as_scalar(inner_nids[-1])) + 1]]
for etype in g.etypes:
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(parts[0], etype_id)
inner_eids = F.boolean_mask(parts[0].edata[EID], inner_edge_mask)
edge_map_val[etype] = [[int(F.as_scalar(inner_eids[0])),
int(F.as_scalar(inner_eids[-1])) + 1]]
# Double check that the node IDs in the global ID space are sorted.
for ntype in node_map_val:
val = np.concatenate([np.array(l) for l in node_map_val[ntype]])
assert np.all(val[:-1] <= val[1:])
for etype in edge_map_val:
val = np.concatenate([np.array(l) for l in edge_map_val[etype]])
assert np.all(val[:-1] <= val[1:])
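For illustration, this is roughly the shape `node_map_val` takes for a hypothetical graph with two node types split into two partitions; the numbers are made up, but the structure (one `[start, end)` range in the global ID space per type per partition) is exactly what the double check above validates:

```
import numpy as np

# Hypothetical global-ID ranges: partition 0 owns 'paper' IDs [0, 100) and
# 'author' IDs [100, 150); partition 1 owns 'paper' IDs [150, 250) and
# 'author' IDs [250, 300).
node_map_val = {
    'paper':  [[0, 100], [150, 250]],
    'author': [[100, 150], [250, 300]],
}

# The same double check as above: flattened per-type ranges must be sorted
# in the global ID space.
for ntype, ranges in node_map_val.items():
    val = np.concatenate([np.array(r) for r in ranges])
    assert np.all(val[:-1] <= val[1:])
```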
start = time.time()
ntypes = {ntype:g.get_ntype_id(ntype) for ntype in g.ntypes}
etypes = {etype:g.get_etype_id(etype) for etype in g.etypes}
part_metadata = {'graph_name': graph_name,
'num_nodes': g.number_of_nodes(),
'num_edges': g.number_of_edges(),
......@@ -326,7 +586,9 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
'num_parts': num_parts,
'halo_hops': num_hops,
'node_map': node_map_val,
'edge_map': edge_map_val}
'edge_map': edge_map_val,
'ntypes': ntypes,
'etypes': etypes}
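The metadata is then dumped as `<graph_name>.json` under `out_path`. As a sketch, the dictionary for the hypothetical two-type graph above might look like this (all values illustrative; only keys visible in this diff are shown):

```
part_metadata = {
    'graph_name': 'mygraph',    # hypothetical name
    'num_nodes': 300,
    'num_edges': 1200,
    'num_parts': 2,
    'halo_hops': 1,
    'node_map': {'paper': [[0, 100], [150, 250]],
                 'author': [[100, 150], [250, 300]]},
    'edge_map': {'cites': [[0, 600], [600, 1200]]},
    'ntypes': {'paper': 0, 'author': 1},
    'etypes': {'cites': 0},
}
```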
for part_id in range(num_parts):
part = parts[part_id]
......@@ -334,33 +596,92 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
node_feats = {}
edge_feats = {}
if num_parts > 1:
# To get the edges in the input graph, we should use original node Ids.
ndata_name = 'orig_id' if reshuffle else NID
edata_name = 'orig_id' if reshuffle else EID
local_nodes = F.boolean_mask(part.ndata[ndata_name], part.ndata['inner_node'])
local_edges = F.boolean_mask(part.edata[edata_name], part.edata['inner_edge'])
print('part {} has {} nodes and {} edges.'.format(
part_id, part.number_of_nodes(), part.number_of_edges()))
print('{} nodes and {} edges are inside the partition'.format(
len(local_nodes), len(local_edges)))
tot_num_inner_edges += len(local_edges)
for name in g.ndata:
if name in [NID, 'inner_node']:
continue
node_feats[name] = F.gather_row(g.ndata[name], local_nodes)
for name in g.edata:
if name in [EID, 'inner_edge']:
continue
edge_feats[name] = F.gather_row(g.edata[name], local_edges)
for ntype in g.ntypes:
ntype_id = g.get_ntype_id(ntype)
# To get the edges in the input graph, we should use original node IDs.
# Both orig_id and NID store the per-node-type IDs.
ndata_name = 'orig_id' if reshuffle else NID
inner_node_mask = _get_inner_node_mask(part, ntype_id)
# These are global node IDs.
local_nodes = F.boolean_mask(part.ndata[ndata_name], inner_node_mask)
if len(g.ntypes) > 1:
# If the input is a heterogeneous graph.
local_nodes = F.gather_row(sim_g.ndata[NID], local_nodes)
print('part {} has {} nodes of type {} and {} are inside the partition'.format(
part_id, F.as_scalar(F.sum(part.ndata[NTYPE] == ntype_id, 0)),
ntype, len(local_nodes)))
else:
print('part {} has {} nodes and {} are inside the partition'.format(
part_id, part.number_of_nodes(), len(local_nodes)))
for name in g.nodes[ntype].data:
if name in [NID, 'inner_node']:
continue
node_feats[ntype + '/' + name] = F.gather_row(g.nodes[ntype].data[name],
local_nodes)
for etype in g.etypes:
etype_id = g.get_etype_id(etype)
edata_name = 'orig_id' if reshuffle else EID
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
# These are global edge IDs.
local_edges = F.boolean_mask(part.edata[edata_name], inner_edge_mask)
if len(g.etypes) > 1:
local_edges = F.gather_row(sim_g.edata[EID], local_edges)
print('part {} has {} edges of type {} and {} are inside the partition'.format(
part_id, F.as_scalar(F.sum(part.edata[ETYPE] == etype_id, 0)),
etype, len(local_edges)))
else:
print('part {} has {} edges and {} are inside the partition'.format(
part_id, part.number_of_edges(), len(local_edges)))
tot_num_inner_edges += len(local_edges)
for name in g.edges[etype].data:
if name in [EID, 'inner_edge']:
continue
edge_feats[etype + '/' + name] = F.gather_row(g.edges[etype].data[name],
local_edges)
else:
for name in g.ndata:
if name in [NID, 'inner_node']:
continue
node_feats[name] = g.ndata[name]
for name in g.edata:
if name in [EID, 'inner_edge']:
continue
edge_feats[name] = g.edata[name]
for ntype in g.ntypes:
if reshuffle and len(g.ntypes) > 1:
ndata_name = 'orig_id'
ntype_id = g.get_ntype_id(ntype)
inner_node_mask = _get_inner_node_mask(part, ntype_id)
# These are global node IDs.
local_nodes = F.boolean_mask(part.ndata[ndata_name], inner_node_mask)
local_nodes = F.gather_row(sim_g.ndata[NID], local_nodes)
elif reshuffle:
local_nodes = sim_g.ndata[NID]
for name in g.nodes[ntype].data:
if name in [NID, 'inner_node']:
continue
if reshuffle:
node_feats[ntype + '/' + name] = F.gather_row(g.nodes[ntype].data[name],
local_nodes)
else:
node_feats[ntype + '/' + name] = g.nodes[ntype].data[name]
for etype in g.etypes:
if reshuffle and len(g.etypes) > 1:
edata_name = 'orig_id'
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
# These are global edge IDs.
local_edges = F.boolean_mask(part.edata[edata_name], inner_edge_mask)
local_edges = F.gather_row(sim_g.edata[EID], local_edges)
elif reshuffle:
local_edges = sim_g.edata[EID]
for name in g.edges[etype].data:
if name in [EID, 'inner_edge']:
continue
if reshuffle:
edge_feats[etype + '/' + name] = F.gather_row(g.edges[etype].data[name],
local_edges)
else:
edge_feats[etype + '/' + name] = g.edges[etype].data[name]
# Some adjustments for heterogeneous graphs.
if len(g.etypes) > 1:
part.ndata['orig_id'] = F.gather_row(sim_g.ndata[NID], part.ndata['orig_id'])
part.edata['orig_id'] = F.gather_row(sim_g.edata[EID], part.edata['orig_id'])
part_dir = os.path.join(out_path, "part" + str(part_id))
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
......@@ -372,13 +693,14 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
os.makedirs(part_dir, mode=0o775, exist_ok=True)
save_tensors(node_feat_file, node_feats)
save_tensors(edge_feat_file, edge_feats)
save_graphs(part_graph_file, [part])
with open('{}/{}.json'.format(out_path, graph_name), 'w') as outfile:
json.dump(part_metadata, outfile, sort_keys=True, indent=4)
print('Save partitions: {:.3f} seconds'.format(time.time() - start))
num_cuts = g.number_of_edges() - tot_num_inner_edges
num_cuts = sim_g.number_of_edges() - tot_num_inner_edges
if num_parts == 1:
num_cuts = 0
print('There are {} edges in the graph and {} edge cuts for {} partitions.'.format(
......
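Since the per-partition feature dictionaries are flat, the node/edge type is folded into the tensor name as `'<type name>/<feature name>'`. A minimal sketch of what a partition's saved dictionaries might contain (names and shapes are hypothetical):

```
import numpy as np

# Hypothetical contents of part0/node_feat.dgl and part0/edge_feat.dgl.
node_feats = {
    'paper/feat':  np.random.rand(100, 128),      # 100 'paper' nodes in part 0
    'paper/label': np.random.randint(0, 10, 100),
}
edge_feats = {
    'cites/weight': np.random.rand(600),          # 600 'cites' edges in part 0
}
```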
......@@ -4,7 +4,6 @@ This kvstore is used when running in the standalone mode
"""
from .. import backend as F
from .graph_partition_book import PartitionPolicy, NODE_PART_POLICY, EDGE_PART_POLICY
class KVClient(object):
''' The fake KVStore client.
......@@ -34,9 +33,11 @@ class KVClient(object):
'''register pull handler'''
self._pull_handlers[name] = func
def add_data(self, name, tensor):
def add_data(self, name, tensor, part_policy):
'''add data to the client'''
self._data[name] = tensor
if part_policy.policy_str not in self._all_possible_part_policy:
self._all_possible_part_policy[part_policy.policy_str] = part_policy
def init_data(self, name, shape, dtype, part_policy, init_func):
'''add new data to the client'''
......@@ -72,7 +73,3 @@ class KVClient(object):
def map_shared_data(self, partition_book):
'''Mapping shared-memory tensor from server to client.'''
self._all_possible_part_policy[NODE_PART_POLICY] = PartitionPolicy(NODE_PART_POLICY,
partition_book)
self._all_possible_part_policy[EDGE_PART_POLICY] = PartitionPolicy(EDGE_PART_POLICY,
partition_book)
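With this change the standalone client remembers, for each tensor, the partition policy it was registered under. A self-contained sketch of the new bookkeeping with stand-in classes (the real `KVClient` and `PartitionPolicy` live in `dgl.distributed`; everything here is for illustration only):

```
# Stand-in classes, for illustration only.
class FakePolicy:
    def __init__(self, policy_str):
        self.policy_str = policy_str

class FakeKVClient:
    def __init__(self):
        self._data = {}
        self._all_possible_part_policy = {}

    def add_data(self, name, tensor, part_policy):
        '''Add data and record the policy it is partitioned by.'''
        self._data[name] = tensor
        if part_policy.policy_str not in self._all_possible_part_policy:
            self._all_possible_part_policy[part_policy.policy_str] = part_policy

kv = FakeKVClient()
kv.add_data('feat', [1.0, 2.0], FakePolicy('node'))
assert 'node' in kv._all_possible_part_policy
```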
......@@ -6,16 +6,16 @@ from ._ffi.function import _init_api
from .heterograph import DGLHeteroGraph
from . import backend as F
from . import utils
from .base import EID, NID
from .base import EID, NID, NTYPE, ETYPE
__all__ = ["metis_partition", "metis_partition_assignment",
"partition_graph_with_halo"]
def reorder_nodes(g, new_node_ids):
""" Generate a new graph with new node Ids.
""" Generate a new graph with new node IDs.
We assign each node in the input graph with a new node Id. This results in
We assign each node in the input graph with a new node ID. This results in
a new graph.
Parameters
......@@ -23,11 +23,11 @@ def reorder_nodes(g, new_node_ids):
g : DGLGraph
The input graph
new_node_ids : a tensor
The new node Ids
The new node IDs
Returns
-------
DGLGraph
The graph with new node Ids.
The graph with new node IDs.
"""
assert len(new_node_ids) == g.number_of_nodes(), \
"The number of new node ids must match #nodes in the graph."
......@@ -35,7 +35,7 @@ def reorder_nodes(g, new_node_ids):
sorted_ids, idx = F.sort_1d(new_node_ids.tousertensor())
assert F.asnumpy(sorted_ids[0]) == 0 \
and F.asnumpy(sorted_ids[-1]) == g.number_of_nodes() - 1, \
"The new node Ids are incorrect."
"The new node IDs are incorrect."
new_gidx = _CAPI_DGLReorderGraph_Hetero(
g._graph, new_node_ids.todgltensor())
new_g = DGLHeteroGraph(gidx=new_gidx, ntypes=['_N'], etypes=['_E'])
......@@ -46,6 +46,74 @@ def reorder_nodes(g, new_node_ids):
def _get_halo_heterosubgraph_inner_node(halo_subg):
return _CAPI_GetHaloSubgraphInnerNodes_Hetero(halo_subg)
def reshuffle_graph(g, node_part=None):
'''Reshuffle node IDs and edge IDs of a graph.
This function reshuffles nodes and edges in a graph so that all nodes/edges of the same type
have contiguous IDs. If a graph is partitioned and nodes are assigned to different partitions,
all nodes/edges in a partition get contiguous IDs; within a partition, all nodes/edges
of the same type have contiguous IDs.
Parameters
----------
g : DGLGraph
The input graph.
node_part : Tensor
This is a vector whose length is the same as the number of nodes in the input graph.
Each element indicates the partition ID the corresponding node is assigned to.
Returns
-------
(DGLGraph, Tensor)
The graph whose nodes and edges are reshuffled.
The 1D tensor that indicates the partition IDs of the nodes in the reshuffled graph.
'''
# In this case, we don't need to reshuffle node IDs and edge IDs.
if node_part is None:
g.ndata['orig_id'] = F.arange(0, g.number_of_nodes())
g.edata['orig_id'] = F.arange(0, g.number_of_edges())
return g, None
start = time.time()
if node_part is not None:
node_part = utils.toindex(node_part)
node_part = node_part.tousertensor()
if NTYPE in g.ndata:
is_hetero = len(F.unique(g.ndata[NTYPE])) > 1
else:
is_hetero = False
if is_hetero:
num_node_types = F.max(g.ndata[NTYPE], 0) + 1
if node_part is not None:
sorted_part, new2old_map = F.sort_1d(node_part * num_node_types + g.ndata[NTYPE])
else:
sorted_part, new2old_map = F.sort_1d(g.ndata[NTYPE])
sorted_part = F.floor_div(sorted_part, num_node_types)
elif node_part is not None:
sorted_part, new2old_map = F.sort_1d(node_part)
else:
g.ndata['orig_id'] = g.ndata[NID]
g.edata['orig_id'] = g.edata[EID]
return g, None
new_node_ids = np.zeros((g.number_of_nodes(),), dtype=np.int64)
new_node_ids[F.asnumpy(new2old_map)] = np.arange(0, g.number_of_nodes())
# If the input graph is homogeneous, we only need to create an empty array, so that
# _CAPI_DGLReassignEdges_Hetero knows how to handle it.
etype = g.edata[ETYPE] if ETYPE in g.edata else F.zeros((0), F.dtype(sorted_part), F.cpu())
g = reorder_nodes(g, new_node_ids)
node_part = utils.toindex(sorted_part)
# We reassign edges in the in-CSR format. In this way, after partitioning, we can ensure
# that all edges in a partition are in the contiguous ID space.
etype_idx = utils.toindex(etype)
orig_eids = _CAPI_DGLReassignEdges_Hetero(g._graph, etype_idx.todgltensor(),
node_part.todgltensor(), True)
orig_eids = utils.toindex(orig_eids)
orig_eids = orig_eids.tousertensor()
g.edata['orig_id'] = orig_eids
print('Reshuffle nodes and edges: {:.3f} seconds'.format(time.time() - start))
return g, node_part.tousertensor()
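The core of `reshuffle_graph` is the composite sort key `node_part * num_node_types + ntype`: sorting by it groups nodes by partition first and by node type within each partition. A small NumPy sketch with made-up assignments:

```
import numpy as np

# Made-up per-node partition assignments and node types.
node_part = np.array([1, 0, 1, 0, 0, 1])
ntype = np.array([0, 1, 1, 0, 1, 0])
num_node_types = ntype.max() + 1

# Composite key: partition is the major key, node type the minor key.
key = node_part * num_node_types + ntype
new2old = np.argsort(key, kind='stable')

# After sorting, partition-0 nodes come first (type 0 before type 1),
# then partition-1 nodes, again grouped by type.
print(node_part[new2old])  # [0 0 0 1 1 1]
print(ntype[new2old])      # [0 1 1 0 0 1]

# The inverse permutation gives each old node its new ID, as in the code above.
new_node_ids = np.zeros(len(key), dtype=np.int64)
new_node_ids[new2old] = np.arange(len(key))
```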
def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
'''Partition a graph.
......@@ -55,10 +123,10 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
not belong to the partition of a subgraph but are connected to the nodes
in the partition within a fixed number of hops.
If `reshuffle` is turned on, the function reshuffles node Ids and edge Ids
If `reshuffle` is turned on, the function reshuffles node IDs and edge IDs
of the input graph before partitioning. After reshuffling, all nodes and edges
in a partition fall in a contiguous Id range in the input graph.
The partitioend subgraphs have node data 'orig_id', which stores the node Ids
in a partition fall in a contiguous ID range in the input graph.
The partitioned subgraphs have node data 'orig_id', which stores the node IDs
in the original input graph.
Parameters
......@@ -68,37 +136,24 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
node_part: 1D tensor
Specify which partition a node is assigned to. The length of this tensor
needs to be the same as the number of nodes of the graph. Each element
indicates the partition Id of a node.
indicates the partition ID of a node.
extra_cached_hops: int
The number of hops a HALO node can be accessed.
reshuffle : bool
Resuffle nodes so that nodes in the same partition are in the same Id range.
Reshuffle nodes so that nodes in the same partition are in the same ID range.
Returns
--------
a dict of DGLGraphs
The key is the partition Id and the value is the DGLGraph of the partition.
The key is the partition ID and the value is the DGLGraph of the partition.
'''
assert len(node_part) == g.number_of_nodes()
node_part = utils.toindex(node_part)
if reshuffle:
start = time.time()
node_part = node_part.tousertensor()
sorted_part, new2old_map = F.sort_1d(node_part)
new_node_ids = np.zeros((g.number_of_nodes(),), dtype=np.int64)
new_node_ids[F.asnumpy(new2old_map)] = np.arange(
0, g.number_of_nodes())
g = reorder_nodes(g, new_node_ids)
node_part = utils.toindex(sorted_part)
# We reassign edges in in-CSR. In this way, after partitioning, we can ensure
# that all edges in a partition are in the contiguous Id space.
orig_eids = _CAPI_DGLReassignEdges_Hetero(g._graph, True)
orig_eids = utils.toindex(orig_eids)
orig_eids = orig_eids.tousertensor()
g, node_part = reshuffle_graph(g, node_part)
orig_nids = g.ndata['orig_id']
print('Reshuffle nodes and edges: {:.3f} seconds'.format(
time.time() - start))
orig_eids = g.edata['orig_id']
node_part = utils.toindex(node_part)
start = time.time()
subgs = _CAPI_DGLPartitionWithHalo_Hetero(
g._graph, node_part.todgltensor(), extra_cached_hops)
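A minimal usage sketch for `partition_graph_with_halo` on a toy homogeneous graph. The random assignment stands in for a real METIS result, and the import path assumes the module above; the exact return signature may differ by DGL version:

```
import torch as th
import dgl
from dgl.partition import partition_graph_with_halo

# A toy graph and a made-up 2-way node assignment.
g = dgl.rand_graph(100, 500)
node_part = th.randint(0, 2, (g.number_of_nodes(),))

# Per the docstring above, the result maps partition ID -> partition DGLGraph.
parts = partition_graph_with_halo(g, node_part, extra_cached_hops=1, reshuffle=True)
```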
......@@ -171,7 +226,7 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
Returns
-------
a 1-D tensor
A vector with each element that indicates the partition Id of a vertex.
A vector with each element that indicates the partition ID of a vertex.
'''
# METIS works only on symmetric graphs.
# METIS runs on the symmetric graph to generate the node assignment to partitions.
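A usage sketch for `metis_partition_assignment` (assumes DGL was built with METIS support and that the function is imported from the module above):

```
import dgl
from dgl.partition import metis_partition_assignment

g = dgl.rand_graph(1000, 5000)
# One partition ID per node, as described in the Returns section above.
node_part = metis_partition_assignment(g, k=4)
assert len(node_part) == g.number_of_nodes()
```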
......@@ -252,10 +307,10 @@ def metis_partition(g, k, extra_cached_hops=0, reshuffle=False,
To balance the node types, a user needs to pass a vector of N elements to indicate
the type of each node. N is the number of nodes in the input graph.
If `reshuffle` is turned on, the function reshuffles node Ids and edge Ids
If `reshuffle` is turned on, the function reshuffles node IDs and edge IDs
of the input graph before partitioning. After reshuffling, all nodes and edges
in a partition fall in a contiguous Id range in the input graph.
The partitioend subgraphs have node data 'orig_id', which stores the node Ids
in a partition fall in a contiguous ID range in the input graph.
The partitioned subgraphs have node data 'orig_id', which stores the node IDs
in the original input graph.
The partitioned subgraph is stored in DGLGraph. The DGLGraph has the `part_id`
......@@ -271,7 +326,7 @@ def metis_partition(g, k, extra_cached_hops=0, reshuffle=False,
extra_cached_hops: int
The number of hops a HALO node can be accessed.
reshuffle : bool
Resuffle nodes so that nodes in the same partition are in the same Id range.
Reshuffle nodes so that nodes in the same partition are in the same ID range.
balance_ntypes : tensor
Node type of each node
balance_edges : bool
......@@ -280,7 +335,7 @@ def metis_partition(g, k, extra_cached_hops=0, reshuffle=False,
Returns
--------
a dict of DGLGraphs
The key is the partition Id and the value is the DGLGraph of the partition.
The key is the partition ID and the value is the DGLGraph of the partition.
'''
node_part = metis_partition_assignment(g, k, balance_ntypes, balance_edges)
if node_part is None:
......@@ -289,5 +344,4 @@ def metis_partition(g, k, extra_cached_hops=0, reshuffle=False,
# Then we split the original graph into parts based on the METIS partitioning results.
return partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle)
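And a corresponding sketch for `metis_partition`, which combines the assignment with `partition_graph_with_halo` (again assuming a METIS-enabled build and the import path of the module above):

```
import dgl
from dgl.partition import metis_partition

g = dgl.rand_graph(1000, 5000)
parts = metis_partition(g, k=4, extra_cached_hops=1, reshuffle=True)
for part_id, subg in parts.items():
    # 'inner_node' marks nodes owned by this partition; halo nodes are 0.
    print(part_id, int(subg.ndata['inner_node'].sum()))
```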
_init_api("dgl.partition")
......@@ -719,4 +719,61 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLMapSubgraphNID")
*rv = GraphOp::MapParentIdToSubgraphId(parent_vids, query);
});
template<class IdType>
IdArray MapIds(IdArray ids, IdArray range_starts, IdArray range_ends, IdArray typed_map,
int num_parts, int num_types) {
int64_t num_ids = ids->shape[0];
int64_t num_ranges = range_starts->shape[0];
// ret packs two length-num_ids arrays: each ID's type, then its per-type ID.
IdArray ret = IdArray::Empty({num_ids * 2}, ids->dtype, ids->ctx);
const IdType *range_start_data = static_cast<IdType *>(range_starts->data);
const IdType *range_end_data = static_cast<IdType *>(range_ends->data);
const IdType *ids_data = static_cast<IdType *>(ids->data);
const IdType *typed_map_data = static_cast<IdType *>(typed_map->data);
IdType *types_data = static_cast<IdType *>(ret->data);
IdType *per_type_ids_data = static_cast<IdType *>(ret->data) + num_ids;
#pragma omp parallel for
for (int64_t i = 0; i < ids->shape[0]; i++) {
IdType id = ids_data[i];
auto it = std::lower_bound(range_end_data, range_end_data + num_ranges, id);
// The range must exist.
BUG_ON(it != range_end_data + num_ranges);
size_t range_id = it - range_end_data;
int type_id = range_id % num_types;
types_data[i] = type_id;
int part_id = range_id / num_types;
BUG_ON(part_id < num_parts);
if (part_id == 0) {
per_type_ids_data[i] = id - range_start_data[range_id];
} else {
// Offset by the cumulative count of type_id elements in partitions
// 0 .. part_id-1, stored row-major in typed_map (num_types x num_parts).
per_type_ids_data[i] = id - range_start_data[range_id]
+ typed_map_data[num_parts * type_id + part_id - 1];
}
}
return ret;
}
DGL_REGISTER_GLOBAL("distributed.id_map._CAPI_DGLHeteroMapIds")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
const IdArray ids = args[0];
const IdArray range_starts = args[1];
const IdArray range_ends = args[2];
const IdArray typed_map = args[3];
int num_parts = args[4];
int num_types = args[5];
int num_ranges = range_starts->shape[0];
CHECK_EQ(range_starts->dtype.bits, ids->dtype.bits);
CHECK_EQ(range_ends->dtype.bits, ids->dtype.bits);
CHECK_EQ(typed_map->dtype.bits, ids->dtype.bits);
CHECK_EQ(num_ranges, num_parts * num_types);
CHECK_EQ(num_ranges, range_ends->shape[0]);
IdArray ret;
ATEN_ID_TYPE_SWITCH(ids->dtype, IdType, {
ret = MapIds<IdType>(ids, range_starts, range_ends, typed_map, num_parts, num_types);
});
*rv = ret;
});
} // namespace dgl
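The mapping `MapIds` implements can be sketched in NumPy: the per-type `[start, end)` ranges of all partitions are flattened so that `range_id = part_id * num_types + type_id`; a binary search over the range ends locates each homogeneous ID's range, which yields its type, and the per-type cumulative counts in `typed_map` turn its offset into a per-type ID. A hypothetical example (values made up; `searchsorted(..., side='right')` plays the role of the `lower_bound` call, assuming exclusive range ends):

```
import numpy as np

# Hypothetical layout: 2 partitions x 2 node types, ranges flattened so that
# range_id = part_id * num_types + type_id (matching MapIds above).
num_parts, num_types = 2, 2
range_starts = np.array([0, 100, 150, 250])    # [p0/t0, p0/t1, p1/t0, p1/t1]
range_ends   = np.array([100, 150, 250, 300])  # exclusive ends
# Per-type cumulative counts across partitions (num_types x num_parts).
typed_map = np.array([[100, 200],   # type 0: 100 in part 0, 100 in part 1
                      [50, 100]])   # type 1: 50 in part 0, 50 in part 1

def map_ids(ids):
    # Binary search over exclusive range ends locates each ID's range.
    range_id = np.searchsorted(range_ends, ids, side='right')
    type_id = range_id % num_types
    part_id = range_id // num_types
    per_type = ids - range_starts[range_id]
    # IDs in partition p of type t are offset by type-t counts of parts 0..p-1.
    offset = np.where(part_id > 0, typed_map[type_id, part_id - 1], 0)
    return type_id, per_type + offset

# Global ID 160 lies in partition 1's type-0 range [150, 250) and is the 11th
# type-0 node there, so its per-type ID is 100 + 10 = 110.
types, per_type_ids = map_ids(np.array([5, 120, 160, 260]))
print(types)         # [0 1 0 1]
print(per_type_ids)  # [  5  20 110  60]
```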