Unverified commit ffe58983 authored by xiang song (charlie.song), committed by GitHub

[Optimization][KG] Several optimizations on DGL-KG (#1233)

* Several optimizations on DGL-KG:
1. Sorted positive edges for sampling, which reduces random
   memory access during positive sampling
2. Asynchronous node embedding update
3. Balanced relation partition that gives a balanced number of
   edges in each partition. When there is no cross-partition
   relation, relation embeddings can be pinned in GPU memory
4. Tunable neg_sample_size instead of a fixed neg_sample_size

* Fix test

* Fix test and eval.py

* Now TransR is OK

* Fix single GPU with mix_cpu_gpu

* Add app tests

* Fix test script

* fix mxnet

* Fix sample

* Add docstrings

* Fix

* Default value for num_workers

* Upd

* upd
parent f103bbf9
# To reproduce the results reported in the README, you can run the models with the following commands:
# for FB15k
# DistMult 1GPU
DGLBACKEND=pytorch python3 train.py --model DistMult --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.08 --batch_size_eval 16 \
--valid --test -adv --mix_cpu_gpu --eval_interval 100000 --gpu 0 \
--num_worker=8 --max_step 40000
# DistMult 8GPU
DGLBACKEND=pytorch python3 train.py --model DistMult --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.08 --batch_size_eval 16 \
--valid --test -adv --mix_cpu_gpu --eval_interval 100000 --num_proc 8 --gpu 0 1 2 3 4 5 6 7 \
--num_worker=4 --max_step 10000 --rel_part --async_update
# ComplEx 1GPU
DGLBACKEND=pytorch python3 train.py --model ComplEx --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.1 --regularization_coef 2.00E-06 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 \
--gpu 0 --num_worker=8 --max_step 32000
# ComplEx 8GPU
DGLBACKEND=pytorch python3 train.py --model ComplEx --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.1 --regularization_coef 2.00E-06 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 --num_proc 8 \
--gpu 0 1 2 3 4 5 6 7 --num_worker=4 --max_step 4000 --rel_part --async_update
# TransE_l1 1GPU
DGLBACKEND=pytorch python3 train.py --model TransE_l1 --dataset FB15k --batch_size 1024 \
--neg_sample_size 64 --regularization_coef 1e-07 --hidden_dim 400 --gamma 16.0 --lr 0.01 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 \
--gpu 0 --num_worker=8 --max_step 48000
# TransE_l1 8GPU
DGLBACKEND=pytorch python3 train.py --model TransE_l1 --dataset FB15k --batch_size 1024 \
--neg_sample_size 64 --regularization_coef 1e-07 --hidden_dim 400 --gamma 16.0 --lr 0.01 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 --num_proc 8 \
--gpu 0 1 2 3 4 5 6 7 --num_worker=4 --max_step 6000 --rel_part --async_update
# TransE_l2 1GPU
DGLBACKEND=pytorch python3 train.py --model TransE_l2 --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 2000 --gamma 12.0 --lr 0.1 --max_step 30000 \
--batch_size_eval 16 --gpu 0 --valid --test -adv --regularization_coef=2e-7
# RESCAL 1GPU
DGLBACKEND=pytorch python3 train.py --model RESCAL --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 500 --gamma 24.0 --lr 0.03 --max_step 30000 \
--batch_size_eval 16 --gpu 0 --valid --test -adv
# TransR 1GPU
DGLBACKEND=pytorch python3 train.py --model TransR --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --regularization_coef 5e-8 --hidden_dim 200 --gamma 8.0 --lr 0.015 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 \
--gpu 0 --num_worker=8 --max_step 32000
# TransR 8GPU
DGLBACKEND=pytorch python3 train.py --model TransR --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --regularization_coef 5e-8 --hidden_dim 200 --gamma 8.0 --lr 0.015 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 --num_proc 8 \
--gpu 0 1 2 3 4 5 6 7 --num_worker=4 --max_step 4000 --rel_part --async_update
# RotatE 1GPU
DGLBACKEND=pytorch python3 train.py --model RotatE --dataset FB15k --batch_size 2048 \
--neg_sample_size 256 --regularization_coef 1e-07 --hidden_dim 200 --gamma 12.0 --lr 0.009 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 -de \
--mix_cpu_gpu --max_step 40000 --gpu 0 --num_worker=4
# RotatE 8GPU
DGLBACKEND=pytorch python3 train.py --model RotatE --dataset FB15k --batch_size 2048 \
--neg_sample_size 256 --regularization_coef 1e-07 --hidden_dim 200 --gamma 12.0 --lr 0.009 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 -de \
--mix_cpu_gpu --max_step 5000 --num_proc 8 --gpu 0 1 2 3 4 5 6 7 --num_worker=4 \
--rel_part --async_update
# for wn18
DGLBACKEND=pytorch python3 train.py --model TransE_l1 --dataset wn18 --batch_size 1024 \
--neg_sample_size 512 --hidden_dim 500 --gamma 12.0 --adversarial_temperature 0.5 \
--lr 0.01 --max_step 40000 --batch_size_eval 16 --gpu 0 --valid --test -adv \
......
......@@ -8,9 +8,34 @@ import sys
import pickle
import time
def BalancedRelationPartition(edges, n):
"""This partitions a list of edges based on relations to make sure
each partition has roughly the same number of edges and relations.
Algo:
For r in relations:
Find partition with fewest edges
if r.size() > number of empty slots in this partition
put edges of r into this partition to fill the partition,
find next partition with fewest edges to put r in.
else
put edges of r into this partition.
Parameters
----------
edges : (heads, rels, tails) triple
Edge list to partition
n : int
number of partitions
Returns
-------
List of np.array
Edges of each partition
List of np.array
Edge types of each partition
bool
Whether any relation is split across multiple partitions
"""
heads, rels, tails = edges
print('relation partition {} edges into {} parts'.format(len(heads), n))
uniq, cnts = np.unique(rels, return_counts=True)
......@@ -24,33 +49,88 @@ def RelationPartition(edges, n):
rel_parts = []
for _ in range(n):
rel_parts.append([])
max_edges = (len(rels) // n) + 1
num_cross_part = 0
for i in range(len(cnts)):
cnt = cnts[i]
r = uniq[i]
r_parts = []
while cnt > 0:
idx = np.argmin(edge_cnts)
if edge_cnts[idx] + cnt <= max_edges:
r_parts.append([idx, cnt])
rel_parts[idx].append(r)
edge_cnts[idx] += cnt
rel_cnts[idx] += 1
cnt = 0
else:
cur_cnt = max_edges - edge_cnts[idx]
r_parts.append([idx, cur_cnt])
rel_parts[idx].append(r)
edge_cnts[idx] += cur_cnt
rel_cnts[idx] += 1
num_cross_part += 1
cnt -= cur_cnt
rel_dict[r] = r_parts
for i, edge_cnt in enumerate(edge_cnts):
print('part {} has {} edges and {} relations'.format(i, edge_cnt, rel_cnts[i]))
print('{}/{} duplicated relation across partitions'.format(num_cross_part, len(cnts)))
parts = []
for i in range(n):
parts.append([])
rel_parts[i] = np.array(rel_parts[i])
# let's store the edge index to each partition first.
for i, r in enumerate(rels):
r_part = rel_dict[r][0]
part_idx = r_part[0]
cnt = r_part[1]
parts[part_idx].append(i)
cnt -= 1
if cnt == 0:
rel_dict[r].pop(0)
else:
rel_dict[r][0][1] = cnt
for i, part in enumerate(parts):
parts[i] = np.array(part, dtype=np.int64)
shuffle_idx = np.concatenate(parts)
heads[:] = heads[shuffle_idx]
rels[:] = rels[shuffle_idx]
tails[:] = tails[shuffle_idx]
off = 0
for i, part in enumerate(parts):
parts[i] = np.arange(off, off + len(part))
off += len(part)
return parts, rel_parts, num_cross_part > 0
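A small usage sketch of the partitioner above on made-up triples (six edges, three relation types, two parts); the arrays and sizes here are placeholders, not a real dataset:

import numpy as np

heads = np.array([0, 1, 2, 3, 4, 5])
rels  = np.array([0, 0, 0, 1, 1, 2])
tails = np.array([1, 2, 3, 4, 5, 0])

parts, rel_parts, cross_part = BalancedRelationPartition((heads, rels, tails), 2)
# parts[i] holds the (now contiguous) edge indices of partition i,
# rel_parts[i] lists the relation types assigned to partition i, and
# cross_part reports whether any relation had to be split across partitions.

Note that the function shuffles heads, rels and tails in place so that each partition's edges become one contiguous index range, which is what enables the sorted positive-edge sampling mentioned in the commit message.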
def RandomPartition(edges, n):
"""This partitions a list of edges randomly across n partitions
Parameters
----------
edges : (heads, rels, tails) triple
Edge list to partition
n : int
number of partitions
Returns
-------
List of np.array
Edges of each partition
"""
heads, rels, tails = edges
print('random partition {} edges into {} parts'.format(len(heads), n))
idx = np.random.permutation(len(heads))
heads[:] = heads[idx]
rels[:] = rels[idx]
tails[:] = tails[idx]
part_size = int(math.ceil(len(idx) / n))
parts = []
for i in range(n):
......@@ -61,6 +141,17 @@ def RandomPartition(edges, n):
return parts
def ConstructGraph(edges, n_entities, args):
"""Construct Graph for training
Parameters
----------
edges : (heads, rels, tails) triple
Edge list
n_entities : int
number of entities
args :
Global configs.
"""
pickle_name = 'graph_train.pickle'
if args.pickle_graph and os.path.exists(os.path.join(args.data_path, args.dataset, pickle_name)):
with open(os.path.join(args.data_path, args.dataset, pickle_name), 'rb') as graph_file:
......@@ -77,47 +168,73 @@ def ConstructGraph(edges, n_entities, args):
return g
class TrainDataset(object):
"""Dataset for training
Parameters
----------
dataset : KGDataset
Original dataset.
args :
Global configs.
ranks:
Number of partitions.
"""
def __init__(self, dataset, args, ranks=64):
triples = dataset.train
self.g = ConstructGraph(triples, dataset.n_entities, args)
num_train = len(triples[0])
print('|Train|:', num_train)
if ranks > 1 and args.rel_part:
self.edge_parts, self.rel_parts, self.cross_part = \
BalancedRelationPartition(triples, ranks)
elif ranks > 1:
self.edge_parts = RandomPartition(triples, ranks)
self.cross_part = True
else:
self.edge_parts = [np.arange(num_train)]
self.rel_parts = [np.arange(dataset.n_relations)]
self.cross_part = False
def create_sampler(self, batch_size, neg_sample_size=2, neg_chunk_size=None, mode='head', num_workers=32,
shuffle=True, exclude_positive=False, rank=0):
"""Create sampler for training
Parameters
----------
batch_size : int
Batch size of each mini batch.
neg_sample_size : int
How many negative edges are sampled for each node.
neg_chunk_size : int
How many edges are in one chunk. We split one batch into chunks.
mode : str
Sampling mode.
num_workers : int
Number of workers used in parallel for this sampler.
shuffle : bool
If True, shuffle the seed edges.
If False, do not shuffle the seed edges.
Default: True
exclude_positive : bool
If True, exclude true positive edges from the sampled negative edges.
If False, return all sampled negative edges even if they include positive edges.
Default: False
rank : int
Which partition to sample.
Returns
-------
dgl.contrib.sampling.EdgeSampler
Edge sampler
"""
EdgeSampler = getattr(dgl.contrib.sampling, 'EdgeSampler')
assert batch_size % neg_sample_size == 0, 'batch_size should be divisible by B'
return EdgeSampler(self.g,
seed_edges=F.tensor(self.edge_parts[rank]),
batch_size=batch_size,
neg_sample_size=int(neg_sample_size/neg_chunk_size),
chunk_size=neg_chunk_size,
negative_mode=mode,
num_workers=num_workers,
......@@ -127,6 +244,22 @@ class TrainDataset(object):
class ChunkNegEdgeSubgraph(dgl.subgraph.DGLSubGraph):
"""Wrapper for negative graph
Parameters
----------
neg_g : DGLGraph
Graph holding negative edges.
num_chunks : int
Number of chunks in sampled graph.
chunk_size : int
Chunk size of the negative subgraph.
neg_sample_size : int
Negative sample size of the negative subgraph.
neg_head : bool
If True, negative_mode is 'head'
If False, negative_mode is 'tail'
"""
def __init__(self, subg, num_chunks, chunk_size,
neg_sample_size, neg_head):
super(ChunkNegEdgeSubgraph, self).__init__(subg._parent, subg.sgi)
......@@ -145,13 +278,37 @@ class ChunkNegEdgeSubgraph(dgl.subgraph.DGLSubGraph):
return self.subg.tail_nid
def create_neg_subgraph(pos_g, neg_g, chunk_size, neg_sample_size, is_chunked,
neg_head, num_nodes):
"""KG models need to know the number of chunks, the chunk size and negative sample size
of a negative subgraph to perform the computation more efficiently.
This function tries to infer all of these information of the negative subgraph
and create a wrapper class that contains all of the information.
Parameters
----------
pos_g : DGLGraph
Graph holding positive edges.
neg_g : DGLGraph
Graph holding negative edges.
chunk_size : int
Chunk size of the negative subgraph.
neg_sample_size : int
Negative sample size of the negative subgraph.
is_chunked : bool
If True, the sampled batch is chunked.
neg_head : bool
If True, negative_mode is 'head'
If False, negative_mode is 'tail'
num_nodes: int
Total number of nodes in the whole graph.
Returns
-------
ChunkNegEdgeSubgraph
Negative graph wrapper
"""
assert neg_g.number_of_edges() % pos_g.number_of_edges() == 0
# We use all nodes to create negative edges. Regardless of the sampling algorithm,
# we can always view the subgraph with one chunk.
if (neg_head and len(neg_g.head_nid) == num_nodes) \
......@@ -160,15 +317,13 @@ def create_neg_subgraph(pos_g, neg_g, chunk_size, is_chunked, neg_head, num_node
chunk_size = pos_g.number_of_edges()
elif is_chunked:
if pos_g.number_of_edges() < chunk_size:
return None
else:
# This is probably the last batch. Let's ignore it.
if pos_g.number_of_edges() % chunk_size > 0:
return None
num_chunks = int(pos_g.number_of_edges() / chunk_size)
assert num_chunks * chunk_size == pos_g.number_of_edges()
assert num_chunks * neg_sample_size * chunk_size == neg_g.number_of_edges()
else:
num_chunks = pos_g.number_of_edges()
chunk_size = 1
......@@ -176,8 +331,31 @@ def create_neg_subgraph(pos_g, neg_g, chunk_size, is_chunked, neg_head, num_node
neg_sample_size, neg_head)
class EvalSampler(object):
"""Sampler for validation and testing
Parameters
----------
g : DGLGraph
Graph containing KG graph
edges : tensor
Seed edges
batch_size : int
Batch size of each mini batch.
neg_sample_size : int
How many negative edges sampled for each node.
neg_chunk_size : int
How many edges in one chunk. We split one batch into chunks.
mode : str
Sampling mode.
num_workers : int
Number of workers used in parallel for this sampler.
filter_false_neg : bool
If True, exclude true positive edges from the sampled negative edges.
If False, return all sampled negative edges even if they include positive edges.
Default: True
"""
def __init__(self, g, edges, batch_size, neg_sample_size, neg_chunk_size, mode, num_workers=32,
filter_false_neg=True):
EdgeSampler = getattr(dgl.contrib.sampling, 'EdgeSampler')
self.sampler = EdgeSampler(g,
batch_size=batch_size,
......@@ -196,17 +374,31 @@ class EvalSampler(object):
self.g = g
self.filter_false_neg = filter_false_neg
self.neg_chunk_size = neg_chunk_size
self.neg_sample_size = neg_sample_size
def __iter__(self):
return self
def __next__(self):
"""Get next batch
Returns
-------
DGLGraph
Sampled positive graph
ChunkNegEdgeSubgraph
Negative graph wrapper
"""
while True:
pos_g, neg_g = next(self.sampler_iter)
if self.filter_false_neg:
neg_positive = neg_g.edata['false_neg']
neg_g = create_neg_subgraph(pos_g, neg_g,
self.neg_chunk_size,
self.neg_sample_size,
'chunk' in self.mode,
self.neg_head,
self.g.number_of_nodes())
if neg_g is not None:
break
......@@ -218,10 +410,21 @@ class EvalSampler(object):
return pos_g, neg_g
def reset(self):
"""Reset the sampler
"""
self.sampler_iter = iter(self.sampler)
return self
class EvalDataset(object):
"""Dataset for validation or testing
Parameters
----------
dataset : KGDataset
Original dataset.
args :
Global configs.
"""
def __init__(self, dataset, args):
pickle_name = 'graph_all.pickle'
if args.pickle_graph and os.path.exists(os.path.join(args.data_path, args.dataset, pickle_name)):
......@@ -259,10 +462,19 @@ class EvalDataset(object):
self.test = np.arange(self.num_train + self.num_valid, self.g.number_of_edges())
print('|test|:', len(self.test))
self.num_valid = len(self.valid)
self.num_test = len(self.test)
def get_edges(self, eval_type):
""" Get all edges in this dataset
Parameters
----------
eval_type : str
Sampling type, 'valid' for validation and 'test' for testing
Returns
-------
np.array
Edges
"""
if eval_type == 'valid':
return self.valid
elif eval_type == 'test':
......@@ -270,29 +482,37 @@ class EvalDataset(object):
else:
raise Exception('get invalid type: ' + eval_type)
def check(self, eval_type):
edges = self.get_edges(eval_type)
subg = self.g.edge_subgraph(edges)
if eval_type == 'valid':
data = self.valid
elif eval_type == 'test':
data = self.test
subg.copy_from_parent()
src, dst, eid = subg.all_edges('all', order='eid')
src_id = subg.ndata['id'][src]
dst_id = subg.ndata['id'][dst]
etype = subg.edata['id'][eid]
orig_src = np.array([t[0] for t in data])
orig_etype = np.array([t[1] for t in data])
orig_dst = np.array([t[2] for t in data])
np.testing.assert_equal(F.asnumpy(src_id), orig_src)
np.testing.assert_equal(F.asnumpy(dst_id), orig_dst)
np.testing.assert_equal(F.asnumpy(etype), orig_etype)
def create_sampler(self, eval_type, batch_size, neg_sample_size, neg_chunk_size,
filter_false_neg, mode='head', num_workers=32, rank=0, ranks=1):
"""Create sampler for validation or testing
Parameters
----------
eval_type : str
Sampling type, 'valid' for validation and 'test' for testing
batch_size : int
Batch size of each mini batch.
neg_sample_size : int
How many negative edges are sampled for each node.
neg_chunk_size : int
How many edges are in one chunk. We split one batch into chunks.
filter_false_neg : bool
If True, exclude true positive edges from the sampled negative edges.
If False, return all sampled negative edges even if they include positive edges.
mode : str
Sampling mode.
num_workers : int
Number of workers used in parallel for this sampler
rank : int
Which partition to sample.
ranks : int
Total number of partitions.
Returns
-------
dgl.contrib.sampling.EdgeSampler
Edge sampler
"""
edges = self.get_edges(eval_type)
beg = edges.shape[0] * rank // ranks
end = min(edges.shape[0] * (rank + 1) // ranks, edges.shape[0])
......@@ -301,12 +521,32 @@ class EvalDataset(object):
mode, num_workers, filter_false_neg)
class NewBidirectionalOneShotIterator:
"""Grouped sampler iterator
Parameters
----------
dataloader_head : dgl.contrib.sampling.EdgeSampler
EdgeSampler in head mode
dataloader_tail : dgl.contrib.sampling.EdgeSampler
EdgeSampler in tail mode
neg_chunk_size : int
How many edges in one chunk. We split one batch into chunks.
neg_sample_size : int
How many negative edges sampled for each node.
is_chunked : bool
If True, the sampled batch is chunked.
num_nodes : int
Total number of nodes in the whole graph.
"""
def __init__(self, dataloader_head, dataloader_tail, neg_chunk_size, neg_sample_size,
is_chunked, num_nodes):
self.sampler_head = dataloader_head
self.sampler_tail = dataloader_tail
self.iterator_head = self.one_shot_iterator(dataloader_head, neg_chunk_size,
neg_sample_size, is_chunked,
True, num_nodes)
self.iterator_tail = self.one_shot_iterator(dataloader_tail, neg_chunk_size,
neg_sample_size, is_chunked,
False, num_nodes)
self.step = 0
......@@ -319,11 +559,12 @@ class NewBidirectionalOneShotIterator:
return pos_g, neg_g
@staticmethod
def one_shot_iterator(dataloader, neg_chunk_size, neg_sample_size, is_chunked,
neg_head, num_nodes):
while True:
for pos_g, neg_g in dataloader:
neg_g = create_neg_subgraph(pos_g, neg_g, neg_chunk_size, neg_sample_size,
is_chunked, neg_head, num_nodes)
if neg_g is None:
continue
......
......@@ -15,7 +15,7 @@ if backend.lower() == 'mxnet':
else:
import torch.multiprocessing as mp
from train_pytorch import load_model_from_checkpoint
from train_pytorch import test, test_mp
class ArgParser(argparse.ArgumentParser):
def __init__(self):
......@@ -100,7 +100,8 @@ def main(args):
args.train = False
args.valid = False
args.test = True
args.rel_part = False
args.strict_rel_part = False
args.async_update = False
args.batch_size_eval = args.batch_size
logger = get_logger(args)
......@@ -172,28 +173,33 @@ def main(args):
queue = mp.Queue(args.num_proc)
procs = []
for i in range(args.num_proc):
proc = mp.Process(target=test_mp, args=(args,
model,
[test_sampler_heads[i], test_sampler_tails[i]],
i,
'Test',
queue))
procs.append(proc)
proc.start()
metrics = {}
logs = []
for i in range(args.num_proc):
log = queue.get()
logs = logs + log
for metric in logs[0].keys():
metrics[metric] = sum([log[metric] for log in logs]) / len(logs)
for k, v in metrics.items():
print('Test average {} at [{}/{}]: {}'.format(k, args.step, args.max_step, v))
for proc in procs:
proc.join()
else:
test(args, model, [test_sampler_head, test_sampler_tail])
print('Test takes {:.3f} seconds'.format(time.time() - start))
if __name__ == '__main__':
args = ArgParser().parse_args()
main(args)
......
"""
Graph Embedding Model
1. TransE
2. TransR
3. RESCAL
4. DistMult
5. ComplEx
6. RotatE
"""
import os
import numpy as np
import dgl.backend as F
......@@ -23,6 +32,30 @@ else:
from .pytorch.score_fun import *
class KEModel(object):
""" DGL Knowledge Embedding Model.
Parameters
----------
args:
Global configs.
model_name : str
Which KG model to use, including 'TransE_l1', 'TransE_l2', 'TransR',
'RESCAL', 'DistMult', 'ComplEx', 'RotatE'
n_entities : int
Num of entities.
n_relations : int
Num of relations.
hidden_dim : int
Dimension size of the embedding.
gamma : float
Gamma for score function.
double_entity_emb : bool
If True, entity embedding size will be 2 * hidden_dim.
Default: False
double_relation_emb : bool
If True, relation embedding size will be 2 * hidden_dim.
Default: False
"""
def __init__(self, args, model_name, n_entities, n_relations, hidden_dim, gamma,
double_entity_emb=False, double_relation_emb=False):
super(KEModel, self).__init__()
......@@ -47,15 +80,24 @@ class KEModel(object):
rel_dim = relation_dim
self.rel_dim = rel_dim
self.entity_dim = entity_dim
self.strict_rel_part = args.strict_rel_part
if not self.strict_rel_part:
self.relation_emb = ExternalEmbedding(args, n_relations, rel_dim,
F.cpu() if args.mix_cpu_gpu else device)
else:
self.global_relation_emb = ExternalEmbedding(args, n_relations, rel_dim, F.cpu())
if model_name == 'TransE' or model_name == 'TransE_l2':
self.score_func = TransEScore(gamma, 'l2')
elif model_name == 'TransE_l1':
self.score_func = TransEScore(gamma, 'l1')
elif model_name == 'TransR':
projection_emb = ExternalEmbedding(args,
n_relations,
entity_dim * relation_dim,
F.cpu() if args.mix_cpu_gpu else device)
self.score_func = TransRScore(gamma, projection_emb, relation_dim, entity_dim)
elif model_name == 'DistMult':
self.score_func = DistMultScore()
......@@ -66,6 +108,7 @@ class KEModel(object):
elif model_name == 'RotatE':
self.score_func = RotatEScore(gamma, self.emb_init)
self.model_name = model_name
self.head_neg_score = self.score_func.create_neg(True)
self.tail_neg_score = self.score_func.create_neg(False)
self.head_neg_prepare = self.score_func.create_neg_prepare(True)
......@@ -74,31 +117,101 @@ class KEModel(object):
self.reset_parameters()
def share_memory(self):
"""Use torch.tensor.share_memory_() to allow cross process embeddings access.
"""
self.entity_emb.share_memory()
if self.strict_rel_part:
self.global_relation_emb.share_memory()
else:
self.relation_emb.share_memory()
if self.model_name == 'TransR':
self.score_func.share_memory()
def save_emb(self, path, dataset):
"""Save the model.
Parameters
----------
path : str
Directory to save the model.
dataset : str
Dataset name as prefix to the saved embeddings.
"""
self.entity_emb.save(path, dataset+'_'+self.model_name+'_entity')
if self.strict_rel_part:
self.global_relation_emb.save(path, dataset+'_'+self.model_name+'_relation')
else:
self.relation_emb.save(path, dataset+'_'+self.model_name+'_relation')
self.score_func.save(path, dataset+'_'+self.model_name)
def load_emb(self, path, dataset):
"""Load the model.
Parameters
----------
path : str
Directory to load the model.
dataset : str
Dataset name as prefix to the saved embeddings.
"""
self.entity_emb.load(path, dataset+'_'+self.model_name+'_entity')
self.relation_emb.load(path, dataset+'_'+self.model_name+'_relation')
self.score_func.load(path, dataset+'_'+self.model_name)
def reset_parameters(self):
"""Re-initialize the model.
"""
self.entity_emb.init(self.emb_init)
self.score_func.reset_parameters()
if not self.strict_rel_part:
self.relation_emb.init(self.emb_init)
def predict_score(self, g):
"""Predict the positive score.
Parameters
----------
g : DGLGraph
Graph holding positive edges.
Returns
-------
tensor
The positive score
"""
self.score_func(g)
return g.edata['score']
def predict_neg_score(self, pos_g, neg_g, to_device=None, gpu_id=-1, trace=False,
neg_deg_sample=False):
"""Calculate the negative score.
Parameters
----------
pos_g : DGLGraph
Graph holding positive edges.
neg_g : DGLGraph
Graph holding negative edges.
to_device : func
Function to move data into device.
gpu_id : int
Which gpu to move data to.
trace : bool
If True, trace the computation. This is required in training.
If False, do not trace the computation.
Default: False
neg_deg_sample : bool
If True, we use the head and tail nodes of the positive edges to
construct negative edges.
Default: False
Returns
-------
tensor
The negative score
"""
num_chunks = neg_g.num_chunks
chunk_size = neg_g.chunk_size
neg_sample_size = neg_g.neg_sample_size
......@@ -160,6 +273,19 @@ class KEModel(object):
return neg_score
def forward_test(self, pos_g, neg_g, logs, gpu_id=-1):
"""Do the forward and generate ranking results.
Parameters
----------
pos_g : DGLGraph
Graph holding positive edges.
neg_g : DGLGraph
Graph holding negative edges.
logs : List
Where to put results in.
gpu_id : int
Which gpu to accelerate the calculation. if -1 is provided, cpu is used.
"""
pos_g.ndata['emb'] = self.entity_emb(pos_g.ndata['id'], gpu_id, False)
pos_g.edata['emb'] = self.relation_emb(pos_g.edata['id'], gpu_id, False)
......@@ -183,7 +309,7 @@ class KEModel(object):
# To compute the rank of a positive edge among all negative edges,
# we need to know how many negative edges have higher scores than
# the positive edge.
rankings = F.sum(neg_scores >= pos_scores, dim=1) + 1
rankings = F.asnumpy(rankings)
for i in range(batch_size):
ranking = rankings[i]
......@@ -197,6 +323,24 @@ class KEModel(object):
# @profile
def forward(self, pos_g, neg_g, gpu_id=-1):
"""Do the forward.
Parameters
----------
pos_g : DGLGraph
Graph holding positive edges.
neg_g : DGLGraph
Graph holding negative edges.
gpu_id : int
Which gpu to accelerate the calculation. if -1 is provided, cpu is used.
Returns
-------
tensor
loss value
dict
loss info
"""
pos_g.ndata['emb'] = self.entity_emb(pos_g.ndata['id'], gpu_id, True)
pos_g.edata['emb'] = self.relation_emb(pos_g.edata['id'], gpu_id, True)
......@@ -248,7 +392,63 @@ class KEModel(object):
return loss, log
def update(self, gpu_id=-1):
""" Update the embeddings in the model
gpu_id : int
Which gpu to accelerate the calculation. if -1 is provided, cpu is used.
"""
self.entity_emb.update(gpu_id)
self.relation_emb.update(gpu_id)
self.score_func.update(gpu_id)
def prepare_relation(self, device=None):
""" Prepare relation embeddings in multi-process multi-gpu training model.
device : th.device
Which device (GPU) to put relation embeddings in.
"""
self.relation_emb = ExternalEmbedding(self.args, self.n_relations, self.rel_dim, device)
self.relation_emb.init(self.emb_init)
if self.model_name == 'TransR':
local_projection_emb = ExternalEmbedding(self.args, self.n_relations,
self.entity_dim * self.rel_dim, device)
self.score_func.prepare_local_emb(local_projection_emb)
self.score_func.reset_parameters()
def writeback_relation(self, rank=0, rel_parts=None):
""" Writeback relation embeddings in a specific process to global relation embedding.
Used in multi-process multi-gpu training model.
rank : int
Process id.
rel_parts : List of tensor
List of tensor stroing edge types of each partition.
"""
idx = rel_parts[rank]
self.global_relation_emb.emb[idx] = F.copy_to(self.relation_emb.emb, F.cpu())[idx]
if self.model_name == 'TransR':
self.score_func.writeback_local_emb(idx)
def load_relation(self, device=None):
""" Sync global relation embeddings into local relation embeddings.
Used in multi-process multi-gpu training model.
device : th.device
Which device (GPU) to put relation embeddings in.
"""
self.relation_emb = ExternalEmbedding(self.args, self.n_relations, self.rel_dim, device)
self.relation_emb.emb = F.copy_to(self.global_relation_emb.emb, device)
if self.model_name == 'TransR':
local_projection_emb = ExternalEmbedding(self.args, self.n_relations,
self.entity_dim * self.rel_dim, device)
self.score_func.load_local_emb(local_projection_emb)
def create_async_update(self):
"""Set up the async update for entity embedding.
"""
self.entity_emb.create_async_update()
def finish_async_update(self):
"""Terminate the async update for entity embedding.
"""
self.entity_emb.finish_async_update()
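The relation-partition and async-update hooks above are driven by the per-process training loop in train_pytorch.py. A rough sketch of that flow, with the sampler, step count and gpu-id handling simplified as placeholders rather than the project's exact code:

def train_one_process_sketch(args, model, train_sampler, rank, rel_parts, device):
    # pin a process-local copy of the relation embeddings on this device
    if args.strict_rel_part:
        model.prepare_relation(device)
    # spawn the background process that applies entity-embedding gradients
    if args.async_update:
        model.create_async_update()
    for step in range(args.max_step):
        pos_g, neg_g = next(train_sampler)
        loss, log = model.forward(pos_g, neg_g, gpu_id=rank)  # assumes gpu_id == rank for brevity
        loss.backward()
        model.update(gpu_id=rank)   # sparse (and possibly asynchronous) embedding update
    if args.async_update:
        model.finish_async_update()
    if args.strict_rel_part:
        # sync this process's relation embeddings back into the shared global copy
        model.writeback_relation(rank, rel_parts)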
......@@ -21,6 +21,9 @@ def batched_l1_dist(a, b):
return res
class TransEScore(nn.Block):
""" TransE score function
Paper link: https://papers.nips.cc/paper/5071-translating-embeddings-for-modeling-multi-relational-data
"""
def __init__(self, gamma, dist_func='l2'):
super(TransEScore, self).__init__()
self.gamma = gamma
......@@ -81,6 +84,9 @@ class TransEScore(nn.Block):
return fn
class TransRScore(nn.Block):
"""TransR score function
Paper link: https://www.aaai.org/ocs/index.php/AAAI/AAAI15/paper/download/9571/9523
"""
def __init__(self, gamma, projection_emb, relation_dim, entity_dim):
super(TransRScore, self).__init__()
self.gamma = gamma
......@@ -180,6 +186,18 @@ class TransRScore(nn.Block):
def load(self, path, name):
self.projection_emb.load(path, name+'projection')
def prepare_local_emb(self, projection_emb):
self.global_projection_emb = self.projection_emb
self.projection_emb = projection_emb
def writeback_local_emb(self, idx):
self.global_projection_emb.emb[idx] = self.projection_emb.emb.as_in_context(mx.cpu())[idx]
def load_local_emb(self, projection_emb):
context = projection_emb.emb.context
projection_emb.emb = self.projection_emb.emb.as_in_context(context)
self.projection_emb = projection_emb
def create_neg(self, neg_head):
gamma = self.gamma
if neg_head:
......@@ -200,6 +218,9 @@ class TransRScore(nn.Block):
return fn
class DistMultScore(nn.Block):
"""DistMult score function
Paper link: https://arxiv.org/abs/1412.6575
"""
def __init__(self):
super(DistMultScore, self).__init__()
......@@ -253,6 +274,9 @@ class DistMultScore(nn.Block):
return fn
class ComplExScore(nn.Block):
"""ComplEx score function
Paper link: https://arxiv.org/abs/1606.06357
"""
def __init__(self):
super(ComplExScore, self).__init__()
......@@ -321,6 +345,9 @@ class ComplExScore(nn.Block):
return fn
class RESCALScore(nn.Block):
"""RESCAL score function
Paper link: http://www.icml-2011.org/papers/438_icmlpaper.pdf
"""
def __init__(self, relation_dim, entity_dim):
super(RESCALScore, self).__init__()
self.relation_dim = relation_dim
......@@ -384,6 +411,9 @@ class RESCALScore(nn.Block):
return fn
class RotatEScore(nn.Block):
"""RotatE score function
Paper link: https://arxiv.org/abs/1902.10197
"""
def __init__(self, gamma, emb_init, eps=1e-10):
super(RotatEScore, self).__init__()
self.gamma = gamma
......
"""
KG Sparse embedding
"""
import os
import numpy as np
import mxnet as mx
......@@ -22,6 +25,20 @@ reshape = lambda arr, x, y: arr.reshape(x, y)
cuda = lambda arr, gpu: arr.as_in_context(mx.gpu(gpu))
class ExternalEmbedding:
"""Sparse Embedding for Knowledge Graph
It is used to store both entity embeddings and relation embeddings.
Parameters
----------
args :
Global configs.
num : int
Number of embeddings.
dim : int
Embedding dimension size.
ctx : mx.ctx
Device context to store the embedding.
"""
def __init__(self, args, num, dim, ctx):
self.gpu = args.gpu
self.args = args
......@@ -32,6 +49,13 @@ class ExternalEmbedding:
self.state_step = 0
def init(self, emb_init):
"""Initializing the embeddings.
Parameters
----------
emb_init : float
The initial embedding range should be [-emb_init, emb_init].
"""
nd.random.uniform(-emb_init, emb_init,
shape=self.emb.shape, dtype=self.emb.dtype,
ctx=self.emb.context, out=self.emb)
......@@ -41,6 +65,19 @@ class ExternalEmbedding:
pass
def __call__(self, idx, gpu_id=-1, trace=True):
""" Return sliced tensor.
Parameters
----------
idx : th.tensor
Slicing index
gpu_id : int
Which gpu to put sliced data in.
trace : bool
If True, trace the computation. This is required in training.
If False, do not trace the computation.
Default: True
"""
if self.emb.context != idx.context:
idx = idx.as_in_context(self.emb.context)
data = nd.take(self.emb, idx)
......@@ -52,6 +89,15 @@ class ExternalEmbedding:
return data
def update(self, gpu_id=-1):
""" Update embeddings in a sparse manner
Sparse embeddings are updated in mini-batches. We maintain gradient states for
each embedding so they can be updated separately.
Parameters
----------
gpu_id : int
Which gpu to accelerate the calculation. if -1 is provided, cpu is used.
"""
self.state_step += 1
for idx, data in self.trace:
grad = data.grad
......@@ -82,13 +128,33 @@ class ExternalEmbedding:
self.trace = []
def curr_emb(self):
"""Return embeddings in trace.
"""
data = [data for _, data in self.trace]
return nd.concat(*data, dim=0)
def save(self, path, name):
"""Save embeddings.
Parameters
----------
path : str
Directory to save the embedding.
name : str
Embedding name.
"""
emb_fname = os.path.join(path, name+'.npy')
np.save(emb_fname, self.emb.asnumpy())
def load(self, path, name):
"""Load embeddings.
Parameters
----------
path : str
Directory to load the embedding.
name : str
Embedding name.
"""
emb_fname = os.path.join(path, name+'.npy')
self.emb = nd.array(np.load(emb_fname))
......@@ -19,6 +19,9 @@ def batched_l1_dist(a, b):
return res
class TransEScore(nn.Module):
"""TransE score function
Paper link: https://papers.nips.cc/paper/5071-translating-embeddings-for-modeling-multi-relational-data
"""
def __init__(self, gamma, dist_func='l2'):
super(TransEScore, self).__init__()
self.gamma = gamma
......@@ -79,6 +82,9 @@ class TransEScore(nn.Module):
return fn
class TransRScore(nn.Module):
"""TransR score function
Paper link: https://www.aaai.org/ocs/index.php/AAAI/AAAI15/paper/download/9571/9523
"""
def __init__(self, gamma, projection_emb, relation_dim, entity_dim):
super(TransRScore, self).__init__()
self.gamma = gamma
......@@ -141,12 +147,27 @@ class TransRScore(nn.Module):
def update(self, gpu_id=-1):
self.projection_emb.update(gpu_id)
def save(self, path, name):
self.projection_emb.save(path, name+'projection')
def load(self, path, name):
self.projection_emb.load(path, name+'projection')
def prepare_local_emb(self, projection_emb):
self.global_projection_emb = self.projection_emb
self.projection_emb = projection_emb
def writeback_local_emb(self, idx):
self.global_projection_emb.emb[idx] = self.projection_emb.emb.cpu()[idx]
def load_local_emb(self, projection_emb):
device = projection_emb.emb.device
projection_emb.emb = self.projection_emb.emb.to(device)
self.projection_emb = projection_emb
def share_memory(self):
self.projection_emb.share_memory()
def create_neg(self, neg_head):
gamma = self.gamma
if neg_head:
......@@ -167,6 +188,9 @@ class TransRScore(nn.Module):
return fn
class DistMultScore(nn.Module):
"""DistMult score function
Paper link: https://arxiv.org/abs/1412.6575
"""
def __init__(self):
super(DistMultScore, self).__init__()
......@@ -220,6 +244,9 @@ class DistMultScore(nn.Module):
return fn
class ComplExScore(nn.Module):
"""ComplEx score function
Paper link: https://arxiv.org/abs/1606.06357
"""
def __init__(self):
super(ComplExScore, self).__init__()
......@@ -291,6 +318,9 @@ class ComplExScore(nn.Module):
return fn
class RESCALScore(nn.Module):
"""RESCAL score function
Paper link: http://www.icml-2011.org/papers/438_icmlpaper.pdf
"""
def __init__(self, relation_dim, entity_dim):
super(RESCALScore, self).__init__()
self.relation_dim = relation_dim
......@@ -354,6 +384,9 @@ class RESCALScore(nn.Module):
return fn
class RotatEScore(nn.Module):
"""RotatE score function
Paper link: https://arxiv.org/abs/1902.10197
"""
def __init__(self, gamma, emb_init):
super(RotatEScore, self).__init__()
self.gamma = gamma
......
"""
KG Sparse embedding
"""
import os
import numpy as np
......@@ -18,6 +9,12 @@ import torch.nn as nn
import torch.nn.functional as functional
import torch.nn.init as INIT
import torch.multiprocessing as mp
from torch.multiprocessing import Queue
from _thread import start_new_thread
import traceback
from functools import wraps
from .. import *
logsigmoid = functional.logsigmoid
......@@ -26,14 +23,97 @@ def get_device(args):
return th.device('cpu') if args.gpu[0] < 0 else th.device('cuda:' + str(args.gpu[0]))
norm = lambda x, p: x.norm(p=p)**p
get_scalar = lambda x: x.detach().item()
reshape = lambda arr, x, y: arr.view(x, y)
cuda = lambda arr, gpu: arr.cuda(gpu)
def thread_wrapped_func(func):
"""Wrapped func for torch.multiprocessing.Process.
With this wrapper we can use OMP threads in subprocesses;
otherwise, OMP_NUM_THREADS=1 is mandatory.
How to use:
@thread_wrapped_func
def func_to_wrap(args ...):
"""
@wraps(func)
def decorated_function(*args, **kwargs):
queue = Queue()
def _queue_result():
exception, trace, res = None, None, None
try:
res = func(*args, **kwargs)
except Exception as e:
exception = e
trace = traceback.format_exc()
queue.put((res, exception, trace))
start_new_thread(_queue_result, ())
result, exception, trace = queue.get()
if exception is None:
return result
else:
assert isinstance(exception, Exception)
raise exception.__class__(trace)
return decorated_function
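A hypothetical usage of the wrapper above (the worker function is a placeholder, not project code): the decorated function runs in a fresh native thread inside the spawned process, which keeps OMP multi-threading usable there.

@thread_wrapped_func
def _demo_worker(rank):
    th.set_num_threads(4)          # multiple OMP threads stay usable in the subprocess
    x = th.randn(1000, 1000)
    return float((x @ x).sum())

if __name__ == '__main__':
    p = mp.Process(target=_demo_worker, args=(0,))
    p.start()
    p.join()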
@thread_wrapped_func
def async_update(args, emb, queue):
"""Asynchronous embedding update for entity embeddings.
How it works:
1. The trainer process pushes entity embedding update requests into the queue.
2. The async_update process pulls requests from the queue, computes the gradient
state and the gradient, and writes them into the entity embeddings.
Parameters
----------
args :
Global configs.
emb : ExternalEmbedding
The entity embeddings.
queue:
The request queue.
"""
th.set_num_threads(args.num_thread)
while True:
(grad_indices, grad_values, gpu_id) = queue.get()
clr = emb.args.lr
if grad_indices is None:
return
with th.no_grad():
grad_sum = (grad_values * grad_values).mean(1)
device = emb.state_sum.device
if device != grad_indices.device:
grad_indices = grad_indices.to(device)
if device != grad_sum.device:
grad_sum = grad_sum.to(device)
emb.state_sum.index_add_(0, grad_indices, grad_sum)
std = emb.state_sum[grad_indices] # _sparse_mask
if gpu_id >= 0:
std = std.cuda(gpu_id)
std_values = std.sqrt_().add_(1e-10).unsqueeze(1)
tmp = (-clr * grad_values / std_values)
if tmp.device != device:
tmp = tmp.to(device)
emb.emb.index_add_(0, grad_indices, tmp)
class ExternalEmbedding:
"""Sparse Embedding for Knowledge Graph
It is used to store both entity embeddings and relation embeddings.
Parameters
----------
args :
Global configs.
num : int
Number of embeddings.
dim : int
Embedding dimension size.
device : th.device
Device to store the embedding.
"""
def __init__(self, args, num, dim, device):
self.gpu = args.gpu
self.args = args
......@@ -42,16 +122,42 @@ class ExternalEmbedding:
self.emb = th.empty(num, dim, dtype=th.float32, device=device)
self.state_sum = self.emb.new().resize_(self.emb.size(0)).zero_()
self.state_step = 0
# queue used by asynchronous update
self.async_q = None
# asynchronous update process
self.async_p = None
def init(self, emb_init):
"""Initializing the embeddings.
Parameters
----------
emb_init : float
The initial embedding range should be [-emb_init, emb_init].
"""
INIT.uniform_(self.emb, -emb_init, emb_init)
INIT.zeros_(self.state_sum)
def share_memory(self):
"""Use torch.tensor.share_memory_() to allow cross process tensor access
"""
self.emb.share_memory_()
self.state_sum.share_memory_()
def __call__(self, idx, gpu_id=-1, trace=True):
""" Return sliced tensor.
Parameters
----------
idx : th.tensor
Slicing index
gpu_id : int
Which gpu to put sliced data in.
trace : bool
If True, trace the computation. This is required in training.
If False, do not trace the computation.
Default: True
"""
s = self.emb[idx]
if gpu_id >= 0:
s = s.cuda(gpu_id)
......@@ -65,6 +171,15 @@ class ExternalEmbedding:
return data
def update(self, gpu_id=-1):
""" Update embeddings in a sparse manner
Sparse embeddings are updated in mini-batches. We maintain gradient states for
each embedding so they can be updated separately.
Parameters
----------
gpu_id : int
Which gpu to accelerate the calculation. if -1 is provided, cpu is used.
"""
self.state_step += 1
with th.no_grad():
for idx, data in self.trace:
......@@ -76,33 +191,70 @@ class ExternalEmbedding:
# the update is non-linear so indices must be unique
grad_indices = idx
grad_values = grad
if self.async_q is not None:
grad_indices.share_memory_()
grad_values.share_memory_()
self.async_q.put((grad_indices, grad_values, gpu_id))
else:
grad_sum = (grad_values * grad_values).mean(1)
device = self.state_sum.device
if device != grad_indices.device:
grad_indices = grad_indices.to(device)
if device != grad_sum.device:
grad_sum = grad_sum.to(device)
self.state_sum.index_add_(0, grad_indices, grad_sum)
std = self.state_sum[grad_indices] # _sparse_mask
if gpu_id >= 0:
std = std.cuda(gpu_id)
std_values = std.sqrt_().add_(1e-10).unsqueeze(1)
tmp = (-clr * grad_values / std_values)
if tmp.device != device:
tmp = tmp.to(device)
# TODO(zhengda) the overhead is here.
self.emb.index_add_(0, grad_indices, tmp)
self.trace = []
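A toy numeric check (made-up values) of the Adagrad-style rule applied above: the running sum of squared row gradients grows, so the effective step size of a frequently updated row shrinks over time.

import torch as th

lr = 0.1
grad = th.tensor([[0.2, -0.4]])                 # gradient of one embedding row
state_sum = th.zeros(1)
state_sum += (grad * grad).mean(1)              # 0.5 * (0.04 + 0.16) = 0.10
std = state_sum.sqrt().add_(1e-10).unsqueeze(1)
step = -lr * grad / std                         # roughly [[-0.063, 0.126]]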
def create_async_update(self):
"""Set up the async update subprocess.
"""
self.async_q = Queue(1)
self.async_p = mp.Process(target=async_update, args=(self.args, self, self.async_q))
self.async_p.start()
def finish_async_update(self):
"""Notify the async update subprocess to quit.
"""
self.async_q.put((None, None, None))
self.async_p.join()
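A minimal lifecycle sketch for the asynchronous path above; the sizes, the args object and the index tensor are hypothetical placeholders, and the loss is a stand-in for a real score function.

emb = ExternalEmbedding(args, num=10000, dim=400, device=th.device('cpu'))
emb.init(emb_init=0.1)
emb.share_memory()            # embedding and optimizer state must be visible to the updater
emb.create_async_update()     # spawn the async_update() subprocess

idx = th.tensor([1, 5, 7])
vec = emb(idx, gpu_id=-1, trace=True)   # slice rows and record them in the trace
loss = vec.sum()                        # placeholder loss
loss.backward()
emb.update(gpu_id=-1)         # ships (indices, gradients) to the queue instead of updating inline

emb.finish_async_update()     # send the stop signal and join the subprocess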
def curr_emb(self):
"""Return embeddings in trace.
"""
data = [data for _, data in self.trace]
return th.cat(data, 0)
def save(self, path, name):
"""Save embeddings.
Parameters
----------
path : str
Directory to save the embedding.
name : str
Embedding name.
"""
file_name = os.path.join(path, name+'.npy')
np.save(file_name, self.emb.cpu().detach().numpy())
def load(self, path, name):
"""Load embeddings.
Parameters
----------
path : str
Directory to load the embedding.
name : str
Embedding name.
"""
file_name = os.path.join(path, name+'.npy')
self.emb = th.Tensor(np.load(file_name))
......@@ -144,7 +144,13 @@ def check_score_func(func_name):
return_false_neg=False)
for pos_g, neg_g in sampler:
neg_g = create_neg_subgraph(pos_g,
neg_g,
neg_sample_size,
neg_sample_size,
True,
True,
g.number_of_nodes())
pos_g.copy_from_parent()
neg_g.copy_from_parent()
score1 = F.reshape(model.predict_score(neg_g), (batch_size, -1))
......
......@@ -15,8 +15,8 @@ if backend.lower() == 'mxnet':
else:
import torch.multiprocessing as mp
from train_pytorch import load_model
from train_pytorch import train, train_mp
from train_pytorch import test, test_mp
class ArgParser(argparse.ArgumentParser):
def __init__(self):
......@@ -98,7 +98,7 @@ class ArgParser(argparse.ArgumentParser):
help='set value > 0.0 if regularization is used')
self.add_argument('-rn', '--regularization_norm', type=int, default=3,
help='norm used in regularization')
self.add_argument('--num_worker', type=int, default=32,
help='number of workers used for loading data')
self.add_argument('--non_uni_weight', action='store_true',
help='if use uniform weight when computing loss')
......@@ -112,6 +112,12 @@ class ArgParser(argparse.ArgumentParser):
help='number of process used')
self.add_argument('--rel_part', action='store_true',
help='enable relation partitioning')
self.add_argument('--nomp_thread_per_process', type=int, default=-1,
help='num of omp threads used per process in multi-process training')
self.add_argument('--async_update', action='store_true',
help='allow async_update on node embedding')
self.add_argument('--force_sync_interval', type=int, default=-1,
help='We force a synchronization between processes every x steps')
def get_logger(args):
......@@ -162,50 +168,70 @@ def run(args, logger):
if args.neg_chunk_size_test < 0:
args.neg_chunk_size_test = args.neg_sample_size_test
num_workers = args.num_worker
train_data = TrainDataset(dataset, args, ranks=args.num_proc)
args.strict_rel_part = args.mix_cpu_gpu and (train_data.cross_part == False)
# Automatically set number of OMP threads for each process if it is not provided
# The value for GPU is evaluated in AWS p3.16xlarge
# The value for CPU is evaluated in AWS x1.32xlarge
if args.nomp_thread_per_process == -1:
if len(args.gpu) > 0:
# GPU training
args.num_thread = 4
else:
# CPU training
args.num_thread = mp.cpu_count() // args.num_proc + 1
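# e.g. on a 64-core machine (hypothetical) with --num_proc 8, each process gets 64 // 8 + 1 = 9 OMP threads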
else:
args.num_thread = args.nomp_thread_per_process
if args.num_proc > 1:
train_samplers = []
for i in range(args.num_proc):
train_sampler_head = train_data.create_sampler(args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
mode='head',
num_workers=num_workers,
shuffle=True,
exclude_positive=False,
rank=i)
train_sampler_tail = train_data.create_sampler(args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
mode='tail',
num_workers=num_workers,
shuffle=True,
exclude_positive=False,
rank=i)
train_samplers.append(NewBidirectionalOneShotIterator(train_sampler_head, train_sampler_tail,
args.neg_chunk_size, args.neg_sample_size,
True, n_entities))
else:
train_sampler_head = train_data.create_sampler(args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
mode='head',
num_workers=num_workers,
shuffle=True,
exclude_positive=False)
train_sampler_tail = train_data.create_sampler(args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
mode='tail',
num_workers=num_workers,
shuffle=True,
exclude_positive=False)
train_sampler = NewBidirectionalOneShotIterator(train_sampler_head, train_sampler_tail,
args.neg_chunk_size, args.neg_sample_size,
True, n_entities)
# for multiprocessing evaluation, we don't need to sample multiple batches at a time
# in each process.
num_workers = args.num_worker
if args.num_proc > 1:
num_workers = 1
if args.valid or args.test:
args.num_test_proc = args.num_proc if args.num_proc < len(args.gpu) else len(args.gpu)
eval_dataset = EvalDataset(dataset, args)
if args.valid:
# Here we want to use the regualr negative sampler because we need to ensure that
......@@ -248,24 +274,25 @@ def run(args, logger):
if args.test:
# Here we want to use the regualr negative sampler because we need to ensure that
# all positive edges are excluded.
# We use a maximum of num_gpu in test stage to save GPU memory.
if args.num_test_proc > 1:
test_sampler_tails = []
test_sampler_heads = []
for i in range(args.num_test_proc):
test_sampler_head = eval_dataset.create_sampler('test', args.batch_size_eval,
args.neg_sample_size_test,
args.neg_chunk_size_test,
args.eval_filter,
mode='chunk-head',
num_workers=num_workers,
rank=i, ranks=args.num_test_proc)
test_sampler_tail = eval_dataset.create_sampler('test', args.batch_size_eval,
args.neg_sample_size_test,
args.neg_chunk_size_test,
args.eval_filter,
mode='chunk-tail',
num_workers=num_workers,
rank=i, ranks=args.num_test_proc)
test_sampler_heads.append(test_sampler_head)
test_sampler_tails.append(test_sampler_tail)
else:
......@@ -290,24 +317,31 @@ def run(args, logger):
# load model
model = load_model(logger, args, n_entities, n_relations)
if args.num_proc > 1 or args.async_update:
model.share_memory()
# train
start = time.time()
rel_parts = train_data.rel_parts if args.strict_rel_part else None
if args.num_proc > 1:
procs = []
barrier = mp.Barrier(args.num_proc)
for i in range(args.num_proc):
valid_sampler = [valid_sampler_heads[i], valid_sampler_tails[i]] if args.valid else None
proc = mp.Process(target=train_mp, args=(args,
model,
train_samplers[i],
valid_sampler,
i,
rel_parts,
barrier))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
else:
valid_samplers = [valid_sampler_head, valid_sampler_tail] if args.valid else None
train(args, model, train_sampler, valid_samplers, rel_parts=rel_parts)
print('training takes {} seconds'.format(time.time() - start))
if args.save_emb is not None:
......@@ -318,23 +352,28 @@ def run(args, logger):
# test
if args.test:
start = time.time()
if args.num_test_proc > 1:
queue = mp.Queue(args.num_test_proc)
procs = []
for i in range(args.num_test_proc):
proc = mp.Process(target=test_mp, args=(args,
model,
[test_sampler_heads[i], test_sampler_tails[i]],
i,
'Test',
queue))
procs.append(proc)
proc.start()
metrics = {}
logs = []
for i in range(args.num_test_proc):
log = queue.get()
logs = logs + log
for metric in logs[0].keys():
metrics[metric] = sum([log[metric] for log in logs]) / len(logs)
for k, v in metrics.items():
print('Test average {} at [{}/{}]: {}'.format(k, args.step, args.max_step, v))
......
......@@ -24,8 +24,8 @@ def load_model_from_checkpoint(logger, args, n_entities, n_relations, ckpt_path)
model.load_emb(ckpt_path, args.dataset)
return model
def train(args, model, train_sampler, valid_samplers=None, rank=0, rel_parts=None, barrier=None):
assert args.num_proc <= 1, "MXNet KGE does not support multi-process now"
assert args.rel_part == False, "No need for relation partition in single process for MXNet KGE"
logs = []
......@@ -37,6 +37,9 @@ def train(args, model, train_sampler, rank=0, rel_parts=None, valid_samplers=Non
else:
gpu_id = -1
if args.strict_rel_part:
model.prepare_relation(mx.gpu(gpu_id))
start = time.time()
for step in range(args.init_step, args.max_step):
pos_g, neg_g = next(train_sampler)
......@@ -59,11 +62,14 @@ def train(args, model, train_sampler, rank=0, rel_parts=None, valid_samplers=Non
start = time.time()
test(args, model, valid_samplers, mode='Valid')
print('test:', time.time() - start)
if args.strict_rel_part:
model.writeback_relation(rank, rel_parts)
# clear cache
logs = []
def test(args, model, test_samplers, rank=0, mode='Test', queue=None):
assert args.num_proc <= 1, "MXNet KGE does not support multi-process now"
logs = []
if len(args.gpu) > 0:
......@@ -71,6 +77,9 @@ def test(args, model, test_samplers, rank=0, mode='Test', queue=None):
else:
gpu_id = -1
if args.strict_rel_part:
model.load_relation(mx.gpu(gpu_id))
for sampler in test_samplers:
#print('Number of tests: ' + len(sampler))
count = 0
......
......@@ -3,42 +3,18 @@ from models import KEModel
from torch.utils.data import DataLoader
import torch.optim as optim
import torch as th
import torch.multiprocessing as mp
from torch.multiprocessing import Queue
from _thread import start_new_thread
from distutils.version import LooseVersion
TH_VERSION = LooseVersion(th.__version__)
if TH_VERSION.version[0] == 1 and TH_VERSION.version[1] < 2:
raise Exception("DGL-ke has to work with Pytorch version >= 1.2")
from models.pytorch.tensor_models import thread_wrapped_func
import os
import logging
import time
from functools import wraps
def thread_wrapped_func(func):
@wraps(func)
def decorated_function(*args, **kwargs):
queue = Queue()
def _queue_result():
exception, trace, res = None, None, None
try:
res = func(*args, **kwargs)
except Exception as e:
exception = e
trace = traceback.format_exc()
queue.put((res, exception, trace))
start_new_thread(_queue_result, ())
result, exception, trace = queue.get()
if exception is None:
return result
else:
assert isinstance(exception, Exception)
raise exception.__class__(trace)
return decorated_function
def load_model(logger, args, n_entities, n_relations, ckpt=None):
model = KEModel(args, args.model_name, n_entities, n_relations,
args.hidden_dim, args.gamma,
@@ -53,10 +29,7 @@ def load_model_from_checkpoint(logger, args, n_entities, n_relations, ckpt_path)
model.load_emb(ckpt_path, args.dataset)
return model
@thread_wrapped_func
def train(args, model, train_sampler, rank=0, rel_parts=None, valid_samplers=None):
if args.num_proc > 1:
th.set_num_threads(4)
def train(args, model, train_sampler, valid_samplers=None, rank=0, rel_parts=None, barrier=None):
logs = []
for arg in vars(args):
logging.info('{:20}:{}'.format(arg, getattr(args, arg)))
@@ -66,6 +39,11 @@ def train(args, model, train_sampler, rank=0, rel_parts=None, valid_samplers=Non
else:
gpu_id = -1
if args.async_update:
model.create_async_update()
if args.strict_rel_part:
model.prepare_relation(th.device('cuda:' + str(gpu_id)))
start = time.time()
sample_time = 0
update_time = 0
@@ -90,52 +68,78 @@ def train(args, model, train_sampler, rank=0, rel_parts=None, valid_samplers=Non
update_time += time.time() - start1
logs.append(log)
if step % args.log_interval == 0:
# force synchronize embedding across processes every X steps
if args.force_sync_interval > 0 and \
(step + 1) % args.force_sync_interval == 0:
barrier.wait()
if (step + 1) % args.log_interval == 0:
for k in logs[0].keys():
v = sum(l[k] for l in logs) / len(logs)
print('[Train]({}/{}) average {}: {}'.format(step, args.max_step, k, v))
print('[{}][Train]({}/{}) average {}: {}'.format(rank, (step + 1), args.max_step, k, v))
logs = []
print('[Train] {} steps take {:.3f} seconds'.format(args.log_interval,
print('[{}][Train] {} steps take {:.3f} seconds'.format(rank, args.log_interval,
time.time() - start))
print('sample: {:.3f}, forward: {:.3f}, backward: {:.3f}, update: {:.3f}'.format(
sample_time, forward_time, backward_time, update_time))
print('[{}]sample: {:.3f}, forward: {:.3f}, backward: {:.3f}, update: {:.3f}'.format(
rank, sample_time, forward_time, backward_time, update_time))
sample_time = 0
update_time = 0
forward_time = 0
backward_time = 0
start = time.time()
if args.valid and step % args.eval_interval == 0 and step > 1 and valid_samplers is not None:
start = time.time()
test(args, model, valid_samplers, mode='Valid')
print('test:', time.time() - start)
if args.valid and (step + 1) % args.eval_interval == 0 and step > 1 and valid_samplers is not None:
valid_start = time.time()
if args.strict_rel_part:
model.writeback_relation(rank, rel_parts)
# forced sync for validation
if barrier is not None:
barrier.wait()
test(args, model, valid_samplers, rank, mode='Valid')
print('test:', time.time() - valid_start)
if barrier is not None:
barrier.wait()
print('train {} takes {:.3f} seconds'.format(rank, time.time() - start))
if args.async_update:
model.finish_async_update()
if args.strict_rel_part:
model.writeback_relation(rank, rel_parts)
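Taken together, the hooks added above give each PyTorch training worker the following lifecycle. This is a condensed sketch (loop body elided, validation omitted), assuming only the methods that appear in this diff: create_async_update, prepare_relation, finish_async_update, and writeback_relation.

def train_worker_outline(args, model, train_sampler, rank, rel_parts, barrier, device):
    if args.async_update:
        model.create_async_update()        # background thread applies entity-embedding updates
    if args.strict_rel_part:
        model.prepare_relation(device)     # pin this partition's relation embeddings on the GPU
    for step in range(args.init_step, args.max_step):
        # pos_g, neg_g = next(train_sampler); forward, backward and update as in train() above
        if args.force_sync_interval > 0 and (step + 1) % args.force_sync_interval == 0:
            barrier.wait()                 # force all workers to synchronize shared embeddings
    if args.async_update:
        model.finish_async_update()        # drain pending asynchronous updates
    if args.strict_rel_part:
        model.writeback_relation(rank, rel_parts)  # copy relation embeddings back to shared storage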
@thread_wrapped_func
def test(args, model, test_samplers, rank=0, mode='Test', queue=None):
if args.num_proc > 1:
th.set_num_threads(4)
if len(args.gpu) > 0:
gpu_id = args.gpu[rank % len(args.gpu)] if args.mix_cpu_gpu and args.num_proc > 1 else args.gpu[0]
else:
gpu_id = -1
if args.strict_rel_part:
model.load_relation(th.device('cuda:' + str(gpu_id)))
with th.no_grad():
logs = []
for sampler in test_samplers:
count = 0
for pos_g, neg_g in sampler:
with th.no_grad():
model.forward_test(pos_g, neg_g, logs, gpu_id)
model.forward_test(pos_g, neg_g, logs, gpu_id)
metrics = {}
if len(logs) > 0:
for metric in logs[0].keys():
metrics[metric] = sum([log[metric] for log in logs]) / len(logs)
if queue is not None:
queue.put(metrics)
queue.put(logs)
else:
for k, v in metrics.items():
print('{} average {} at [{}/{}]: {}'.format(mode, k, args.step, args.max_step, v))
print('[{}]{} average {} at [{}/{}]: {}'.format(rank, mode, k, args.step, args.max_step, v))
test_samplers[0] = test_samplers[0].reset()
test_samplers[1] = test_samplers[1].reset()
@thread_wrapped_func
def train_mp(args, model, train_sampler, valid_samplers=None, rank=0, rel_parts=None, barrier=None):
if args.num_proc > 1:
th.set_num_threads(args.num_thread)
train(args, model, train_sampler, valid_samplers, rank, rel_parts, barrier)
@thread_wrapped_func
def test_mp(args, model, test_samplers, rank=0, mode='Test', queue=None):
test(args, model, test_samplers, rank, mode, queue)
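For reference, a hypothetical driver that wires these wrappers together, mirroring the spawn/join pattern shown for the test processes earlier in this diff. The barrier is assumed to be a torch.multiprocessing Barrier shared by all workers; launch_training itself is not part of this PR.

import torch.multiprocessing as mp

def launch_training(args, model, train_samplers, rel_parts):
    barrier = mp.Barrier(args.num_proc) if args.num_proc > 1 else None
    procs = []
    for rank in range(args.num_proc):
        proc = mp.Process(target=train_mp,
                          args=(args, model, train_samplers[rank],
                                None, rank, rel_parts, barrier))
        procs.append(proc)
        proc.start()
    for proc in procs:
        proc.join()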
@@ -1227,13 +1227,7 @@ NegSubgraph EdgeSamplerObject::genNegEdgeSubgraph(const Subgraph &pos_subg,
neg_subg.graph = GraphPtr(new ImmutableGraph(neg_coo));
neg_subg.induced_vertices = induced_neg_vid;
neg_subg.induced_edges = induced_neg_eid;
// If we didn't sample all nodes to form negative edges, some of the nodes
// in the vector might be redundant.
if (neg_sample_size < num_tot_nodes) {
std::sort(neg_vids.begin(), neg_vids.end());
auto it = std::unique(neg_vids.begin(), neg_vids.end());
neg_vids.resize(it - neg_vids.begin());
}
if (IsNegativeHeadMode(neg_mode)) {
neg_subg.head_nid = aten::VecToIdArray(Global2Local(neg_vids, neg_map));
neg_subg.tail_nid = aten::VecToIdArray(local_pos_vids);
@@ -228,12 +228,10 @@ def check_head_tail(g):
lsrc = np.unique(F.asnumpy(lsrc))
head_nid = np.unique(F.asnumpy(g.head_nid))
assert len(head_nid) == len(g.head_nid)
np.testing.assert_equal(lsrc, head_nid)
ldst = np.unique(F.asnumpy(ldst))
tail_nid = np.unique(F.asnumpy(g.tail_nid))
assert len(tail_nid) == len(g.tail_nid)
np.testing.assert_equal(tail_nid, ldst)
@@ -60,12 +60,45 @@ elif [ "$2" == "gpu" ]; then
python3 train.py --model DistMult --dataset FB15k --batch_size 128 \
--neg_sample_size 16 --hidden_dim 100 --gamma 500.0 --lr 0.1 --max_step 100 \
--batch_size_eval 16 --gpu 0 --valid --test -adv --mix_cpu_gpu --eval_percent 0.01 \
--save_emb DistMult_FB15k_emb --data_path /data/kg || fail "run mix CPU/GPU DistMult"
--save_emb DistMult_FB15k_emb --data_path /data/kg || fail "run mix with async CPU/GPU DistMult"
# verify saving training result
python3 eval.py --model_name DistMult --dataset FB15k --hidden_dim 100 \
--gamma 500.0 --batch_size 16 --gpu 0 --model_path DistMult_FB15k_emb/ \
--eval_percent 0.01 --data_path /data/kg || fail "eval DistMult on $2"
if [ "$1" == "pytorch" ]; then
# verify mixed CPU GPU training with async_update
python3 train.py --model DistMult --dataset FB15k --batch_size 128 \
--neg_sample_size 16 --hidden_dim 100 --gamma 500.0 --lr 0.1 --max_step 100 \
--batch_size_eval 16 --gpu 0 --valid --test -adv --mix_cpu_gpu --eval_percent 0.01 \
--async_update --data_path /data/kg || fail "run mix CPU/GPU DistMult"
# verify mixed CPU GPU training with random partition and async_update
python3 train.py --model DistMult --dataset FB15k --batch_size 128 \
--neg_sample_size 16 --hidden_dim 100 --gamma 500.0 --lr 0.1 --max_step 100 \
--batch_size_eval 16 --num_proc 2 --gpu 0 --valid --test -adv --mix_cpu_gpu \
--eval_percent 0.01 --async_update --force_sync_interval 100 \
--data_path /data/kg || fail "run multiprocess async CPU/GPU DistMult"
# verify mixed CPU GPU training with relation partition and async_update
python3 train.py --model DistMult --dataset FB15k --batch_size 128 \
--neg_sample_size 16 --hidden_dim 100 --gamma 500.0 --lr 0.1 --max_step 100 \
--batch_size_eval 16 --num_proc 2 --gpu 0 --valid --test -adv --mix_cpu_gpu \
--eval_percent 0.01 --rel_part --async_update --force_sync_interval 100 \
--data_path /data/kg || fail "run multiprocess async CPU/GPU DistMult with rel_part"
# multi process training TransR
python3 train.py --model TransR --dataset FB15k --batch_size 128 \
--neg_sample_size 16 --hidden_dim 100 --gamma 500.0 --lr 0.1 --max_step 100 \
--batch_size_eval 16 --num_proc 2 --gpu 0 --valid --test -adv --eval_interval 30 \
--eval_percent 0.01 --data_path /data/kg --mix_cpu_gpu --rel_part --async_update \
--save_emb TransR_FB15k_emb || fail "run multiprocess TransR on $2"
python3 eval.py --model_name TransR --dataset FB15k --hidden_dim 100 \
--gamma 500.0 --batch_size 16 --num_proc 2 --gpu 0 --model_path TransR_FB15k_emb/ \
--eval_percent 0.01 --mix_cpu_gpu --data_path /data/kg || fail "eval multiprocess TransR on $2"
fi
fi
popd > /dev/null