test_knn.py 2.63 KB
Newer Older
rusty1s's avatar
rusty1s committed
1
2
3
4
from itertools import product

import pytest
import torch
rusty1s's avatar
rusty1s committed
5
from torch_cluster import knn, knn_graph
6
import pickle
rusty1s's avatar
rusty1s committed
7
from .utils import grad_dtypes, devices, tensor
rusty1s's avatar
rusty1s committed
8
9
10


@pytest.mark.parametrize('dtype,device', product(grad_dtypes, devices))
rusty1s's avatar
rusty1s committed
11
def test_knn(dtype, device):
rusty1s's avatar
rusty1s committed
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    x = tensor([
        [-1, -1],
        [-1, +1],
        [+1, +1],
        [+1, -1],
        [-1, -1],
        [-1, +1],
        [+1, +1],
        [+1, -1],
    ], dtype, device)
    y = tensor([
        [1, 0],
        [-1, 0],
    ], dtype, device)

    batch_x = tensor([0, 0, 0, 0, 1, 1, 1, 1], torch.long, device)
    batch_y = tensor([0, 1], torch.long, device)

rusty1s's avatar
rusty1s committed
30
31
32
33
34
35
    row, col = knn(x, y, 2, batch_x, batch_y)
    col = col.view(-1, 2).sort(dim=-1)[0].view(-1)

    assert row.tolist() == [0, 0, 1, 1]
    assert col.tolist() == [2, 3, 4, 5]

rusty1s's avatar
rusty1s committed
36
37
38
39
40
    if x.is_cuda:
        row, col = knn(x, y, 2, batch_x, batch_y, cosine=True)
        assert row.tolist() == [0, 0, 1, 1]
        assert col.tolist() == [0, 1, 4, 5]

rusty1s's avatar
rusty1s committed
41
42
43
44
45
46
47
48
49
50

@pytest.mark.parametrize('dtype,device', product(grad_dtypes, devices))
def test_knn_graph(dtype, device):
    x = tensor([
        [-1, -1],
        [-1, +1],
        [+1, +1],
        [+1, -1],
    ], dtype, device)

rusty1s's avatar
rusty1s committed
51
    row, col = knn_graph(x, k=2, flow='target_to_source')
rusty1s's avatar
rusty1s committed
52
53
54
    col = col.view(-1, 2).sort(dim=-1)[0].view(-1)
    assert row.tolist() == [0, 0, 1, 1, 2, 2, 3, 3]
    assert col.tolist() == [1, 3, 0, 2, 1, 3, 0, 2]
rusty1s's avatar
rusty1s committed
55
56
57
58
59

    row, col = knn_graph(x, k=2, flow='source_to_target')
    row = row.view(-1, 2).sort(dim=-1)[0].view(-1)
    assert row.tolist() == [1, 3, 0, 2, 1, 3, 0, 2]
    assert col.tolist() == [0, 0, 1, 1, 2, 2, 3, 3]
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91


@pytest.mark.parametrize('dtype,device', product(grad_dtypes, devices))
def test_knn_graph_large(dtype, device):
    d = pickle.load(open("test/knn_test_large.pkl", "rb"))
    x = d['x'].to(device)
    k = d['k']
    truth = d['edges']

    row, col = knn_graph(x, k=k, flow='source_to_target',
                         batch=None, n_threads=24)

    edges = set([(i, j) for (i, j) in zip(list(row.cpu().numpy()),
                                          list(col.cpu().numpy()))])

    assert(truth == edges)

    row, col = knn_graph(x, k=k, flow='source_to_target',
                         batch=None, n_threads=12)

    edges = set([(i, j) for (i, j) in zip(list(row.cpu().numpy()),
                                          list(col.cpu().numpy()))])

    assert(truth == edges)

    row, col = knn_graph(x, k=k, flow='source_to_target',
                         batch=None, n_threads=1)

    edges = set([(i, j) for (i, j) in zip(list(row.cpu().numpy()),
                                          list(col.cpu().numpy()))])

    assert(truth == edges)