Unverified Commit 41f8a162 authored by xiang song(charlie.song)'s avatar xiang song(charlie.song) Committed by GitHub
Browse files

[Feature Improvement]Edge Sampler w and w/o replacement (#1116)



* Add weight based edge sampler

* Can run, edge weight work.
TODO: test node weight

* Fix node weight sample

* Fix y

* Update doc

* Fix syntex

* Fix

* Fix GPU test for sampler

* Fix test

* Fix

* Refactor EdgeSampler to act as class object not function that it
can record its own private states.

* clean

* Fix

* Fix

* Fix run bug on kg app

* update

* update test

* test

* Simply python API and fix some C code

* Fix

* Fix

* Fix syntex

* Fix

* Update API description

* add replacement for edge sampler

* Now edge sampler support replacement and no-replacement

* Fix

* Fix

* change kg/app to use edge sampler with replacement config

* Update replacement algo

* Fix syntax

* Update

* Update
Co-authored-by: default avatarDa Zheng <zhengda1936@gmail.com>
parent e890a899
......@@ -4,6 +4,7 @@ import scipy as sp
import dgl.backend as F
import dgl
import os
import sys
import pickle
import time
......
......@@ -530,10 +530,17 @@ class EdgeSampler(object):
edge only if the triple (source node, destination node and relation)
matches one of the edges in the graph.
For uniform sampling, the sampler generates only num_of_edges/batch_size
samples.
For uniform sampling, the sampler generates samples infinitely.
This sampler samples positive edges without replacement by default, which means
it returns a fixed number of batches (i.e., num_edges/batch_size), and the
positive edges sampled will not be duplicated. However, one can explicitly
specify sampling with replacement (replacement = True), that the sampler treats
each sampling of a single positive edge as a standalone event.
To control how many samples the sampler can return, a reset parameter can be used.
If it is set to true, the sampler will generate samples infinitely. For the sampler
with replacement, it will reshuffle the seed edges each time it consumes all the
edges and reset the replacement state. If it is set to false, the sampler will only
generate num_edges/batch_size samples.
Parameters
----------
......@@ -554,6 +561,14 @@ class EdgeSampler(object):
The number of workers to sample edges in parallel.
prefetch : bool, optional
If true, prefetch the samples in the next batch. Default: False
replacement: bool, optional
    Whether the sampler samples edges with or without replacement. Default: False
reset: bool, optional
If true, the sampler will generate samples infinitely, and for the sampler with
replacement, it will reshuffle the edges each time it consumes all the edges and
reset the replacement state.
If false, the sampler will only generate num_edges/batch_size samples by default.
Default: False.
negative_mode : string, optional
The method used to construct negative edges. Possible values are 'head', 'tail'.
neg_sample_size : int, optional
......@@ -590,6 +605,8 @@ class EdgeSampler(object):
shuffle=False,
num_workers=1,
prefetch=False,
replacement=False,
reset=False,
negative_mode="",
neg_sample_size=0,
exclude_positive=False,
......@@ -617,6 +634,7 @@ class EdgeSampler(object):
self._seed_edges = F.arange(0, g.number_of_edges())
else:
self._seed_edges = seed_edges
if shuffle:
self._seed_edges = F.rand_shuffle(self._seed_edges)
if edge_weight is None:
......@@ -634,6 +652,8 @@ class EdgeSampler(object):
if prefetch:
self._prefetching_wrapper_class = ThreadPrefetchingWrapper
self._num_prefetch = num_workers * 2 if prefetch else 0
self._replacement = replacement
self._reset = reset
self._num_workers = int(num_workers)
self._negative_mode = negative_mode
......@@ -643,8 +663,10 @@ class EdgeSampler(object):
self._sampler = _CAPI_CreateUniformEdgeSampler(
self.g._graph,
self.seed_edges.todgltensor(),
self.batch_size, # batch size
self._batch_size, # batch size
self._num_workers, # num batches
self._replacement,
self._reset,
self._negative_mode,
self._neg_sample_size,
self._exclude_positive,
......@@ -658,6 +680,8 @@ class EdgeSampler(object):
self._node_weight,
self._batch_size, # batch size
self._num_workers, # num batches
self._replacement,
self._reset,
self._negative_mode,
self._neg_sample_size,
self._exclude_positive,
......@@ -678,7 +702,11 @@ class EdgeSampler(object):
Returns
-------
list[GraphIndex] or list[(GraphIndex, GraphIndex)]
Next "bunch" of edges to be processed.
Next "bunch" of edges to be processed.
If negative_mode is specified, a list of (pos_subg, neg_subg) pairs is returned.
If return_false_neg is specified as True, the true negative edges and
false negative edges in neg_subg is identified in neg_subg.edata['false_neg'].
'''
if self._is_uniform:
subgs = _CAPI_FetchUniformEdgeSample(
......@@ -708,6 +736,13 @@ class EdgeSampler(object):
def __iter__(self):
it = SamplerIter(self)
if self._is_uniform:
subgs = _CAPI_ResetUniformEdgeSample(
self._sampler)
else:
subgs = _CAPI_ResetWeightedEdgeSample(
self._sampler)
if self._num_prefetch:
return self._prefetching_wrapper_class(it, self._num_prefetch)
else:
......
......@@ -73,6 +73,7 @@ class ArrayHeap {
* Sample from arrayHeap
*/
size_t Sample() {
// heap_ is empty
ValueType xi = heap_[1] * RandomEngine::ThreadLocal()->Uniform<float>();
size_t i = 1;
while (i < limit_) {
......@@ -88,12 +89,19 @@ class ArrayHeap {
/*
* Sample a vector by given the size n
*/
void SampleWithoutReplacement(size_t n, std::vector<size_t>* samples) {
size_t SampleWithoutReplacement(size_t n, std::vector<size_t>* samples) {
// sample n elements
for (size_t i = 0; i < n; ++i) {
size_t i = 0;
for (; i < n; ++i) {
// heap is empty
if (heap_[1] == 0) {
break;
}
samples->at(i) = this->Sample();
this->Delete(samples->at(i));
}
return i;
}
private:
......@@ -110,6 +118,8 @@ class EdgeSamplerObject: public Object {
IdArray seed_edges,
const int64_t batch_size,
const int64_t num_workers,
const bool replacement,
const bool reset,
const std::string neg_mode,
const int64_t neg_sample_size,
const bool exclude_positive,
......@@ -121,6 +131,8 @@ class EdgeSamplerObject: public Object {
batch_size_ = batch_size;
num_workers_ = num_workers;
replacement_ = replacement;
reset_ = reset;
neg_mode_ = neg_mode;
neg_sample_size_ = neg_sample_size;
exclude_positive_ = exclude_positive;
......@@ -130,6 +142,7 @@ class EdgeSamplerObject: public Object {
~EdgeSamplerObject() {}
virtual void Fetch(DGLRetValue* rv) = 0;
virtual void Reset() = 0;
protected:
virtual void randomSample(size_t set_size, size_t num, std::vector<size_t>* out) = 0;
......@@ -153,6 +166,8 @@ class EdgeSamplerObject: public Object {
int64_t batch_size_;
int64_t num_workers_;
bool replacement_;
int64_t reset_;
std::string neg_mode_;
int64_t neg_sample_size_;
bool exclude_positive_;
......@@ -1403,6 +1418,8 @@ public:
IdArray seed_edges,
const int64_t batch_size,
const int64_t num_workers,
const bool replacement,
const bool reset,
const std::string neg_mode,
const int64_t neg_sample_size,
const bool exclude_positive,
......@@ -1412,6 +1429,8 @@ public:
seed_edges,
batch_size,
num_workers,
replacement,
reset,
neg_mode,
neg_sample_size,
exclude_positive,
......@@ -1420,6 +1439,7 @@ public:
batch_curr_id_ = 0;
num_seeds_ = seed_edges->shape[0];
max_batch_id_ = (num_seeds_ + batch_size - 1) / batch_size;
// TODO(song): Tricky thing here to make sure gptr_ has coo cache
gptr_->FindEdge(0);
}
......@@ -1436,8 +1456,21 @@ public:
const int64_t start = (batch_curr_id_ + i) * batch_size_;
const int64_t end = std::min(start + batch_size_, num_seeds_);
const int64_t num_edges = end - start;
IdArray worker_seeds = seed_edges_.CreateView({num_edges}, DLDataType{kDLInt, 64, 1},
sizeof(dgl_id_t) * start);
IdArray worker_seeds;
if (replacement_ == false) {
worker_seeds = seed_edges_.CreateView({num_edges}, DLDataType{kDLInt, 64, 1},
sizeof(dgl_id_t) * start);
} else {
std::vector<dgl_id_t> seeds;
// sampling of each edge is a standalone event
for (int64_t i = 0; i < num_edges; ++i) {
seeds.push_back(RandomEngine::ThreadLocal()->RandInt(num_seeds_));
}
worker_seeds = aten::VecToIdArray(seeds);
}
EdgeArray arr = gptr_->FindEdges(worker_seeds);
const dgl_id_t *src_ids = static_cast<const dgl_id_t *>(arr.src->data);
const dgl_id_t *dst_ids = static_cast<const dgl_id_t *>(arr.dst->data);
......@@ -1468,9 +1501,23 @@ public:
}
batch_curr_id_ += num_workers;
if (batch_curr_id_ >= max_batch_id_ && reset_ == true) {
Reset();
}
*rv = List<SubgraphRef>(positive_subgs);
}
void Reset() {
  // Rewind the sampler so iteration restarts from the first batch.
  batch_curr_id_ = 0;
  if (replacement_ == false) {
    // Without replacement the seed edges are consumed in order, so they must
    // be reshuffled before the next pass.  Seed the engine from
    // std::random_device: a default-constructed std::default_random_engine
    // has a fixed seed, so every run (and every freshly-constructed engine)
    // would apply the same permutation.
    // NOTE(review): ideally this would draw from DGL's RandomEngine so that
    // dgl.random.seed() also controls the reshuffle -- confirm intent.
    dgl_id_t *seed_ids = static_cast<dgl_id_t *>(seed_edges_->data);
    std::shuffle(seed_ids, seed_ids + seed_edges_->shape[0],
                 std::default_random_engine(std::random_device{}()));
  }
}
DGL_DECLARE_OBJECT_TYPE_INFO(UniformEdgeSamplerObject, Object);
private:
......@@ -1512,11 +1559,13 @@ DGL_REGISTER_GLOBAL("sampling._CAPI_CreateUniformEdgeSampler")
IdArray seed_edges = args[1];
const int64_t batch_size = args[2];
const int64_t max_num_workers = args[3];
const std::string neg_mode = args[4];
const int neg_sample_size = args[5];
const bool exclude_positive = args[6];
const bool check_false_neg = args[7];
IdArray relations = args[8];
const bool replacement = args[4];
const bool reset = args[5];
const std::string neg_mode = args[6];
const int neg_sample_size = args[7];
const bool exclude_positive = args[8];
const bool check_false_neg = args[9];
IdArray relations = args[10];
// process args
auto gptr = std::dynamic_pointer_cast<ImmutableGraph>(g.sptr());
CHECK(gptr) << "sampling isn't implemented in mutable graph";
......@@ -1527,6 +1576,8 @@ DGL_REGISTER_GLOBAL("sampling._CAPI_CreateUniformEdgeSampler")
seed_edges,
batch_size,
max_num_workers,
replacement,
reset,
neg_mode,
neg_sample_size,
exclude_positive,
......@@ -1541,15 +1592,23 @@ DGL_REGISTER_GLOBAL("sampling._CAPI_FetchUniformEdgeSample")
sampler->Fetch(rv);
});
// Expose UniformEdgeSamplerObject::Reset() to the Python frontend as
// sampling._CAPI_ResetUniformEdgeSample.  args[0] is the sampler handle
// created by _CAPI_CreateUniformEdgeSampler.
DGL_REGISTER_GLOBAL("sampling._CAPI_ResetUniformEdgeSample")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
UniformEdgeSampler sampler = args[0];
// Rewind the sampler's batch cursor (and reshuffle when sampling
// without replacement) so iteration can start over.
sampler->Reset();
});
template<typename ValueType>
class WeightedEdgeSamplerObject: public EdgeSamplerObject {
public:
public:
explicit WeightedEdgeSamplerObject(const GraphPtr gptr,
IdArray seed_edges,
NDArray edge_weight,
NDArray node_weight,
const int64_t batch_size,
const int64_t num_workers,
const bool replacement,
const bool reset,
const std::string neg_mode,
const int64_t neg_sample_size,
const bool exclude_positive,
......@@ -1559,18 +1618,21 @@ public:
seed_edges,
batch_size,
num_workers,
replacement,
reset,
neg_mode,
neg_sample_size,
exclude_positive,
check_false_neg,
relations) {
const size_t num_edges = edge_weight->shape[0];
const int64_t num_edges = edge_weight->shape[0];
const ValueType *edge_prob = static_cast<const ValueType*>(edge_weight->data);
std::vector<ValueType> eprob(num_edges);
for (size_t i = 0; i < num_edges; ++i) {
for (int64_t i = 0; i < num_edges; ++i) {
eprob[i] = edge_prob[i];
}
edge_selector_ = std::make_shared<ArrayHeap<ValueType>>(eprob);
edge_weight_ = edge_weight;
const size_t num_nodes = node_weight->shape[0];
if (num_nodes == 0) {
......@@ -1584,6 +1646,10 @@ public:
node_selector_ = std::make_shared<ArrayHeap<ValueType>>(nprob);
}
curr_batch_id_ = 0;
// handle int64 overflow here
max_batch_id_ = (num_edges + batch_size - 1) / batch_size;
// TODO(song): Tricky thing here to make sure gptr_ has coo cache
gptr_->FindEdge(0);
}
......@@ -1592,17 +1658,30 @@ public:
}
void Fetch(DGLRetValue* rv) {
const int64_t num_workers = std::min(num_workers_, max_batch_id_ - curr_batch_id_);
// generate subgraphs.
std::vector<SubgraphRef> positive_subgs(num_workers_);
std::vector<SubgraphRef> negative_subgs(num_workers_);
std::vector<SubgraphRef> positive_subgs(num_workers);
std::vector<SubgraphRef> negative_subgs(num_workers);
#pragma omp parallel for
for (int i = 0; i < num_workers_; i++) {
for (int i = 0; i < num_workers; i++) {
const dgl_id_t *seed_edge_ids = static_cast<const dgl_id_t *>(seed_edges_->data);
std::vector<int64_t> edge_ids(batch_size_);
for (int i = 0; i < batch_size_; ++i) {
int64_t edge_id = edge_selector_->Sample();
edge_ids[i] = seed_edge_ids[edge_id];
std::vector<size_t> edge_ids(batch_size_);
if (replacement_ == false) {
size_t n = batch_size_;
size_t num_ids = 0;
#pragma omp critical
num_ids = edge_selector_->SampleWithoutReplacement(n, &edge_ids);
while (edge_ids.size() > num_ids) {
edge_ids.pop_back();
}
} else {
// sampling of each edge is a standalone event
for (int i = 0; i < batch_size_; ++i) {
size_t edge_id = edge_selector_->Sample();
edge_ids[i] = seed_edge_ids[edge_id];
}
}
auto worker_seeds = aten::VecToIdArray(edge_ids, seed_edges_->dtype.bits);
......@@ -1631,14 +1710,33 @@ public:
negative_subgs[i] = ConvertRef(neg_subg);
}
}
curr_batch_id_ += num_workers;
if (curr_batch_id_ >= max_batch_id_ && reset_ == true) {
Reset();
}
if (neg_mode_.size() > 0) {
positive_subgs.insert(positive_subgs.end(), negative_subgs.begin(), negative_subgs.end());
}
*rv = List<SubgraphRef>(positive_subgs);
}
void Reset() {
curr_batch_id_ = 0;
if (replacement_ == false) {
const int64_t num_edges = edge_weight_->shape[0];
const ValueType *edge_prob = static_cast<const ValueType*>(edge_weight_->data);
std::vector<ValueType> eprob(num_edges);
for (int64_t i = 0; i < num_edges; ++i) {
eprob[i] = edge_prob[i];
}
// rebuild the edge_selector_
edge_selector_ = std::make_shared<ArrayHeap<ValueType>>(eprob);
}
}
DGL_DECLARE_OBJECT_TYPE_INFO(WeightedEdgeSamplerObject<ValueType>, Object);
private:
......@@ -1699,6 +1797,10 @@ private:
private:
std::shared_ptr<ArrayHeap<ValueType>> edge_selector_;
std::shared_ptr<ArrayHeap<ValueType>> node_selector_;
NDArray edge_weight_;
int64_t curr_batch_id_;
int64_t max_batch_id_;
};
template class WeightedEdgeSamplerObject<float>;
......@@ -1729,11 +1831,13 @@ DGL_REGISTER_GLOBAL("sampling._CAPI_CreateWeightedEdgeSampler")
NDArray node_weight = args[3];
const int64_t batch_size = args[4];
const int64_t max_num_workers = args[5];
const std::string neg_mode = args[6];
const int64_t neg_sample_size = args[7];
const bool exclude_positive = args[8];
const bool check_false_neg = args[9];
IdArray relations = args[10];
const bool replacement = args[6];
const bool reset = args[7];
const std::string neg_mode = args[8];
const int64_t neg_sample_size = args[9];
const bool exclude_positive = args[10];
const bool check_false_neg = args[11];
IdArray relations = args[12];
auto gptr = std::dynamic_pointer_cast<ImmutableGraph>(g.sptr());
CHECK(gptr) << "sampling isn't implemented in mutable graph";
......@@ -1756,6 +1860,8 @@ DGL_REGISTER_GLOBAL("sampling._CAPI_CreateWeightedEdgeSampler")
node_weight,
batch_size,
num_workers,
replacement,
reset,
neg_mode,
neg_sample_size,
exclude_positive,
......@@ -1770,4 +1876,10 @@ DGL_REGISTER_GLOBAL("sampling._CAPI_FetchWeightedEdgeSample")
sampler->Fetch(rv);
});
// Expose WeightedEdgeSamplerObject::Reset() to the Python frontend as
// sampling._CAPI_ResetWeightedEdgeSample.  args[0] is the sampler handle
// created by _CAPI_CreateWeightedEdgeSampler.
DGL_REGISTER_GLOBAL("sampling._CAPI_ResetWeightedEdgeSample")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
FloatWeightedEdgeSampler sampler = args[0];
// Rewind the batch cursor and, for without-replacement sampling,
// rebuild the weight heap so all edges become samplable again.
sampler->Reset();
});
} // namespace dgl
......@@ -252,11 +252,12 @@ def check_negative_sampler(mode, exclude_positive, neg_size):
EdgeSampler = getattr(dgl.contrib.sampling, 'EdgeSampler')
# Test the homogeneous graph.
total_samples = 0
batch_size = 50
max_samples = num_edges
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
negative_mode=mode,
reset=False,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
......@@ -289,15 +290,80 @@ def check_negative_sampler(mode, exclude_positive, neg_size):
assert np.sum(F.asnumpy(exist) == 0) == len(exist)
else:
assert F.array_equal(g.has_edges_between(neg_src, neg_dst), exist)
total_samples += batch_size
assert total_samples <= num_edges
# check replacement = True
# with reset = False (default setting)
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
reset=False,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
assert total_samples == num_edges
# check replacement = False
# with reset = False (default setting)
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=False,
reset=False,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
assert total_samples == num_edges
# check replacement = True
# with reset = True
total_samples = 0
max_samples = 2 * num_edges
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
reset=True,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) <= batch_size
total_samples += len(pos_leid)
if (total_samples >= max_samples):
break
assert total_samples >= max_samples
# check replacement = False
# with reset = True
total_samples = 0
max_samples = 2 * num_edges
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=False,
reset=True,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) <= batch_size
total_samples += len(pos_leid)
if (total_samples >= max_samples):
break
assert total_samples >= max_samples
# Test the knowledge graph.
total_samples = 0
for _, neg_edges in EdgeSampler(g, batch_size,
negative_mode=mode,
reset=False,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
relations=g.edata['etype'],
......@@ -316,8 +382,7 @@ def check_negative_sampler(mode, exclude_positive, neg_size):
exist = neg_edges.edata['etype'][i] == etype
assert F.asnumpy(exists[i]) == F.asnumpy(exist)
total_samples += batch_size
if (total_samples >= max_samples):
break
assert total_samples <= num_edges
def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
g = generate_rand_graph(100)
......@@ -339,9 +404,10 @@ def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
# Correctness check
# Test the homogeneous graph.
batch_size = 50
# Test the knowledge graph with edge weight provided.
total_samples = 0
max_samples = num_edges
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
reset=False,
edge_weight=edge_weight,
negative_mode=mode,
neg_sample_size=neg_size,
......@@ -376,12 +442,12 @@ def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
else:
assert F.array_equal(g.has_edges_between(neg_src, neg_dst), exist)
total_samples += batch_size
if (total_samples >= max_samples):
break
assert total_samples <= num_edges
# Test the knowledge graph with edge weight provided.
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
reset=False,
edge_weight=edge_weight,
negative_mode=mode,
neg_sample_size=neg_size,
......@@ -402,12 +468,12 @@ def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
exist = neg_edges.edata['etype'][i] == etype
assert F.asnumpy(exists[i]) == F.asnumpy(exist)
total_samples += batch_size
if (total_samples >= max_samples):
break
assert total_samples <= num_edges
# Test the knowledge graph with edge/node weight provided.
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
reset=False,
edge_weight=edge_weight,
node_weight=node_weight,
negative_mode=mode,
......@@ -429,8 +495,80 @@ def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
exist = neg_edges.edata['etype'][i] == etype
assert F.asnumpy(exists[i]) == F.asnumpy(exist)
total_samples += batch_size
if (total_samples >= max_samples):
assert total_samples <= num_edges
# check replacement = True with pos edges no-uniform sample
# with reset = False
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
reset=False,
edge_weight=edge_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
assert total_samples == num_edges
# check replacement = True with pos edges no-uniform sample
# with reset = True
total_samples = 0
max_samples = 4 * num_edges
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
reset=True,
edge_weight=edge_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
if total_samples >= max_samples:
break
assert total_samples == max_samples
# check replacement = False with pos/neg edges no-uniform sample
# reset = False
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=False,
reset=False,
edge_weight=edge_weight,
node_weight=node_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
relations=g.edata['etype'],
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
assert total_samples == num_edges
# check replacement = False with pos/neg edges no-uniform sample
# reset = True
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=False,
reset=True,
edge_weight=edge_weight,
node_weight=node_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
relations=g.edata['etype'],
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
if total_samples >= max_samples:
break
assert total_samples == max_samples
# Check Rate
dgl.random.seed(0)
......@@ -445,12 +583,13 @@ def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
g.edata['etype'] = F.copy_to(F.tensor(etype), F.cpu())
# Test w/o node weight.
max_samples = num_edges / 5
# Test the knowledge graph with edge weight provided.
max_samples = num_edges // 5
total_samples = 0
# Test the knowledge graph with edge weight provided.
edge_sampled = np.full((num_edges,), 0, dtype=np.int32)
node_sampled = np.full((num_nodes,), 0, dtype=np.int32)
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
edge_weight=edge_weight,
negative_mode=mode,
neg_sample_size=neg_size,
......@@ -465,11 +604,11 @@ def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
else:
neg_dst = neg_edges.parent_nid[neg_ldst]
np.add.at(node_sampled, F.asnumpy(neg_dst), 1)
np.add.at(edge_sampled, F.asnumpy(F.gather_row(pos_edges.parent_eid, pos_leid)), 1)
np.add.at(edge_sampled, F.asnumpy(pos_edges.parent_eid[pos_leid]), 1)
total_samples += batch_size
if (total_samples >= max_samples):
if total_samples > max_samples:
break
# Check rate here
edge_rate_0 = edge_sampled[0] / edge_sampled.sum()
edge_tail_half_cnt = edge_sampled[edge_sampled.shape[0] // 2:-1].sum()
......@@ -484,10 +623,11 @@ def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
assert np.allclose(node_rate_tail_half, 0.5, atol=0.02)
# Test the knowledge graph with edge/node weight provided.
total_samples = 0
edge_sampled = np.full((num_edges,), 0, dtype=np.int32)
node_sampled = np.full((num_nodes,), 0, dtype=np.int32)
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
edge_weight=edge_weight,
node_weight=node_weight,
negative_mode=mode,
......@@ -503,10 +643,9 @@ def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
else:
neg_dst = F.gather_row(neg_edges.parent_nid, neg_ldst)
np.add.at(node_sampled, F.asnumpy(neg_dst), 1)
np.add.at(edge_sampled, F.asnumpy(F.gather_row(pos_edges.parent_eid, pos_leid)), 1)
np.add.at(edge_sampled, F.asnumpy(pos_edges.parent_eid[pos_leid]), 1)
total_samples += batch_size
if (total_samples >= max_samples):
if total_samples > max_samples:
break
# Check rate here
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment