Commit b69d826f authored by rusty1s

rename
from .spline_conv import spline_conv
__all__ = ['spline_conv']
import torch
from .edgewise_spline_weighting_cpu import EdgewiseSplineWeightingCPU
if torch.cuda.is_available():
from .edgewise_spline_weighting_gpu import EdgewiseSplineWeightingGPU
def edgewise_spline_weighting(input, weight, amount, index):
if input.is_cuda:
return EdgewiseSplineWeightingGPU(amount, index)(input, weight)
else:
return EdgewiseSplineWeightingCPU(amount, index)(input, weight)
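# A minimal usage sketch with hypothetical shapes (4 edges, M_in=2, M_out=1,
# K=12 kernel matrices, k_max=2 B-spline supports per edge):
#
#   input = torch.rand(4, 2)                       # [|E| x M_in]
#   weight = torch.rand(12, 2, 1)                  # [K x M_in x M_out]
#   amount = torch.rand(4, 2)                      # [|E| x k_max]
#   index = torch.LongTensor(4, 2).random_(0, 12)  # [|E| x k_max]
#   out = edgewise_spline_weighting(input, weight, amount, index)  # [|E| x M_out]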
import torch
from torch.autograd import Function
class EdgewiseSplineWeightingCPU(Function):
def __init__(self, amount, index):
super(EdgewiseSplineWeightingCPU, self).__init__()
self.amount = amount
self.index = index
def forward(self, input, weight):
self.save_for_backward(input, weight)
_, M_in, M_out = weight.size()
k_max = self.amount.size(1)
output = input.new(input.size(0), M_out).fill_(0)
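        # The loops below accumulate, for every edge e,
        #   output[e] = sum_k amount[e, k] * (input[e] @ weight[index[e, k]]),
        # i.e. each edge feature vector is transformed by a B-spline-weighted
        # mixture of the K kernel matrices.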
for k in range(k_max):
b = self.amount[:, k] # [|E|]
c = self.index[:, k] # [|E|]
for i in range(M_in):
w = weight[:, i] # [K x M_out]
w = w[c] # [|E| x M_out]
f = input[:, i] # [|E|]
# Need to transpose twice, so we can make use of broadcasting.
output += (f * b * w.t()).t() # [|E| x M_out]
return output
def backward(self, grad_output):
input, weight = self.saved_tensors
K, M_in, M_out = weight.size()
k_max = self.amount.size(1)
num_edges = input.size(0)
grad_input = grad_output.new(num_edges, M_in).fill_(0)
grad_weight = grad_output.new(K, M_in, M_out).fill_(0)
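        # By the chain rule:
        #   grad_input[e, i] = sum_k amount[e, k] * <grad_output[e], weight[index[e, k], i]>
        #   grad_weight[c, i] = sum over all (e, k) with index[e, k] == c of
        #                       amount[e, k] * input[e, i] * grad_output[e]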
for k in range(k_max):
b = self.amount[:, k] # [|E|]
c = self.index[:, k] # [|E|]
c_expand = c.contiguous().view(-1, 1).expand(c.size(0), M_out)
for i in range(M_in):
w = weight[:, i] # [K x M_out]
w = w[c] # [|E| x M_out]
f = b * torch.sum(grad_output * w, dim=1) # [|E|]
grad_input[:, i] += f
f = input[:, i] # [|E|]
                w_grad = (f * b * grad_output.t()).t()  # [|E| x M_out]
grad_weight[:, i, :].scatter_add_(0, c_expand, w_grad)
return grad_input, grad_weight
import torch
from torch.autograd import Function
from ....utils.cuda import (cuda_num_threads, Stream, Dtype, load_kernel,
kernel_loop, get_blocks)
_edgewise_spline_weighting_forward_kernel = kernel_loop + '''
extern "C"
__global__ void edgewise_spline_weighting_forward_kernel(
const ${Dtype}* input, const ${Dtype}* weight, ${Dtype}* output,
const ${Dtype}* amount, const long* index) {
CUDA_KERNEL_LOOP(idx, ${num_threads}) {
const int e_idx = idx / ${M_out};
const int m_out_idx = idx % ${M_out};
${Dtype} result = 0.0;
${Dtype} w;
${Dtype} f;
int k;
${Dtype} b;
long c;
long w_idx;
for (int k_idx = 0; k_idx < ${k_max}; k_idx++) {
k = e_idx * ${k_max} + k_idx;
b = amount[k];
c = index[k];
for (int m_in_idx = 0; m_in_idx < ${M_in}; m_in_idx++) {
w_idx = c * ${M_out} * ${M_in} +
m_in_idx * ${M_out} +
m_out_idx;
w = weight[w_idx];
f = input[e_idx * ${M_in} + m_in_idx];
result += b * w * f;
}
}
output[idx] = result;
}
}
'''
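# One CUDA thread is launched per (edge, output channel) pair: `e_idx` and
# `m_out_idx` are recovered from the flat thread index, and every thread
# accumulates its single output entry over all k_max B-spline supports and all
# M_in input channels.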
_edgewise_spline_weighting_backward_kernel = kernel_loop + '''
extern "C"
__global__ void edgewise_spline_weighting_backward_kernel(
const ${Dtype}* grad_output, ${Dtype}* grad_input, ${Dtype}* grad_weight,
const ${Dtype}* input, const ${Dtype}* weight, const ${Dtype}* amount,
const long* index) {
CUDA_KERNEL_LOOP(idx, ${num_threads}) {
const int e_idx = idx / ${M_out};
const int m_out_idx = idx % ${M_out};
${Dtype} w;
${Dtype} g;
${Dtype} f;
${Dtype} w_grad;
int k;
${Dtype} b;
long c;
long w_idx;
for (int k_idx = 0; k_idx < ${k_max}; k_idx++) {
k = e_idx * ${k_max} + k_idx;
b = amount[k];
c = index[k];
for (int m_in_idx = 0; m_in_idx < ${M_in}; m_in_idx++) {
w_idx = c * ${M_out} * ${M_in} +
m_in_idx * ${M_out} +
m_out_idx;
w = weight[w_idx];
// Calculate input gradient.
g = grad_output[e_idx * ${M_out} + m_out_idx];
atomicAdd(&(grad_input[e_idx * ${M_in} + m_in_idx]), b * w * g);
// This is inefficient: `reduce_sum` shouldn't be done like this.
// Looping over `M_out` would be better to avoid the `atomicAdd`.
// Calculate weight gradient.
f = input[e_idx * ${M_in} + m_in_idx];
w_grad = f * b * grad_output[e_idx * ${M_out} + m_out_idx];
atomicAdd(&(grad_weight[w_idx]), w_grad);
// Not so efficient either, but not avoidable.
}
}
}
}
'''
class EdgewiseSplineWeightingGPU(Function):
def __init__(self, amount, index):
super(EdgewiseSplineWeightingGPU, self).__init__()
assert amount.is_cuda and index.is_cuda
self.amount = amount
self.index = index
def forward(self, input, weight):
assert input.is_cuda and weight.is_cuda
self.save_for_backward(input, weight)
_, M_in, M_out = weight.size()
k_max = self.amount.size(1)
output = input.new(input.size(0), M_out)
num_threads = output.numel()
with torch.cuda.device_of(input):
f = load_kernel(
'edgewise_spline_weighting_forward_kernel',
_edgewise_spline_weighting_forward_kernel,
Dtype=Dtype(input),
num_threads=num_threads,
M_in=M_in,
M_out=M_out,
k_max=k_max)
f(block=(cuda_num_threads, 1, 1),
grid=(get_blocks(num_threads), 1, 1),
args=[
input.data_ptr(),
weight.data_ptr(),
output.data_ptr(),
self.amount.data_ptr(),
self.index.data_ptr()
],
stream=Stream(ptr=torch.cuda.current_stream().cuda_stream))
return output
def backward(self, grad_output):
input, weight = self.saved_tensors
K, M_in, M_out = weight.size()
k_max = self.amount.size(1)
num_edges = input.size(0)
grad_input = grad_output.new(num_edges, M_in).fill_(0)
grad_weight = grad_output.new(K, M_in, M_out).fill_(0)
num_threads = grad_output.numel()
with torch.cuda.device_of(grad_output):
f = load_kernel(
'edgewise_spline_weighting_backward_kernel',
_edgewise_spline_weighting_backward_kernel,
Dtype=Dtype(input),
num_threads=num_threads,
num_edges=num_edges,
M_in=M_in,
M_out=M_out,
k_max=k_max,
K=K)
f(block=(cuda_num_threads, 1, 1),
grid=(get_blocks(num_threads), 1, 1),
args=[
grad_output.data_ptr(),
grad_input.data_ptr(),
grad_weight.data_ptr(),
input.data_ptr(),
weight.data_ptr(),
self.amount.data_ptr(),
self.index.data_ptr()
],
stream=Stream(ptr=torch.cuda.current_stream().cuda_stream))
return grad_input, grad_weight
import unittest
import torch
from torch.autograd import Variable, gradcheck
from numpy.testing import assert_equal
from .spline import spline
if torch.cuda.is_available():
from .edgewise_spline_weighting_gpu import EdgewiseSplineWeightingGPU
class EdgewiseSplineWeightingGPUTest(unittest.TestCase):
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_forward(self):
input = [[0.25, 0.125], [0.25, 0.375], [0.75, 0.625], [0.75, 0.875]]
input = torch.FloatTensor(input)
kernel_size = torch.LongTensor([3, 4])
is_open_spline = torch.LongTensor([1, 0])
amount, index = spline(input, kernel_size, is_open_spline, 12, 1)
amount, index = amount.cuda(), index.cuda()
input = torch.FloatTensor([[1, 2], [3, 4], [5, 6], [7, 8]])
weight = torch.arange(0.5, 0.5 * 25, step=0.5).view(12, 2, 1)
input, weight = input.cuda(), weight.cuda()
input, weight = Variable(input), Variable(weight)
op = EdgewiseSplineWeightingGPU(amount, index)
out = op(input, weight)
expected_out = [
[0.25 * (1 * (0.5 + 1.5 + 4.5 + 5.5) + 2 * (1 + 2 + 5 + 6))],
[0.25 * (3 * (1.5 + 2.5 + 5.5 + 6.5) + 4 * (2 + 3 + 6 + 7))],
[0.25 * (5 * (6.5 + 7.5 + 10.5 + 11.5) + 6 * (7 + 8 + 11 + 12))],
[0.25 * (7 * (4.5 + 7.5 + 8.5 + 11.5) + 8 * (5 + 8 + 9 + 12))],
]
assert_equal(out.cpu().data.numpy(), expected_out)
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_backward(self):
input = [[0.25, 0.125], [0.25, 0.375], [0.75, 0.625], [0.75, 0.875]]
input = torch.DoubleTensor(input)
kernel_size = torch.LongTensor([3, 4])
is_open_spline = torch.LongTensor([1, 0])
amount, index = spline(input, kernel_size, is_open_spline, 12, 1)
amount, index = amount.cuda(), index.cuda()
input = torch.randn(4, 2).double()
weight = torch.randn(12, 2, 1).double()
input, weight = input.cuda(), weight.cuda()
input = Variable(input, requires_grad=True)
weight = Variable(weight, requires_grad=True)
op = EdgewiseSplineWeightingGPU(amount, index)
test = gradcheck(op, (input, weight), eps=1e-6, atol=1e-4)
self.assertTrue(test)
import torch
from .spline_cpu import spline_cpu
if torch.cuda.is_available():
from .spline_linear_gpu import spline_linear_gpu
from .spline_quadratic_gpu import spline_quadratic_gpu
from .spline_cubic_gpu import spline_cubic_gpu
def spline(input, kernel_size, is_open_spline, K, degree):
if input.is_cuda:
        if degree == 1:
            return spline_linear_gpu(input, kernel_size, is_open_spline, K)
        elif degree == 2:
            return spline_quadratic_gpu(input, kernel_size, is_open_spline, K)
        elif degree == 3:
            return spline_cubic_gpu(input, kernel_size, is_open_spline, K)
        else:
            raise NotImplementedError()
else:
return spline_cpu(input, kernel_size, is_open_spline, degree)
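# Independent of the backend, `spline` returns two [|E| x k_max] tensors:
# `amount` holds the products of the B-spline basis values and `index` the
# corresponding flattened kernel indices, with k_max = (degree + 1)^dim.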
import torch
from torch.autograd import Variable
from .spline import spline
from .edgewise_spline_weighting import edgewise_spline_weighting
def spline_conv(
adj, # Tensor
input, # Variable
weight, # Variable
kernel_size,
is_open_spline,
K,
degree=1,
bias=None):
values = adj._values()
row, col = adj._indices()
# Get features for every end vertex with shape [|E| x M_in].
output = input[col]
    # Compute B-spline basis products and kernel indices for each edge, then
    # transform the [|E| x M_in] features into an [|E| x M_out] matrix.
amount, index = spline(values, kernel_size, is_open_spline, K, degree)
output = edgewise_spline_weighting(output, weight[:-1], amount, index)
# Convolution via `scatter_add`. Converts [|E| x M_out] feature matrix to
# [n x M_out] feature matrix.
zero = output.data.new(adj.size(1), output.size(1)).fill_(0.0)
zero = Variable(zero) if not torch.is_tensor(output) else zero
r = row.view(-1, 1).expand(row.size(0), output.size(1))
output = zero.scatter_add_(0, Variable(r), output)
    # Weight root node features by the designated root weight matrix
    # (the last slice of `weight`).
output += torch.mm(input, weight[-1])
# Normalize output by degree.
ones = values.new(values.size(0)).fill_(1)
zero = values.new(output.size(0)).fill_(0)
degree = zero.scatter_add_(0, row, ones)
degree = torch.clamp(degree, min=1)
output = output / Variable(degree.view(-1, 1))
if bias is not None:
output += bias
return output
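# A minimal end-to-end sketch (hypothetical graph, not part of the test suite):
# a sparse adjacency tensor carries 2-dimensional pseudo-coordinates as values,
# `weight` holds K + 1 kernel matrices, the last one being reserved for the
# root nodes.
#
#   edges = torch.LongTensor([[0, 1], [1, 2]])
#   pseudo = torch.FloatTensor([[0.25, 0.5], [0.75, 0.5]])
#   adj = torch.sparse.FloatTensor(edges, pseudo, torch.Size([3, 3, 2]))
#   input = Variable(torch.rand(3, 8))        # [n x M_in]
#   weight = Variable(torch.rand(13, 8, 16))  # [(K + 1) x M_in x M_out]
#   kernel_size = torch.LongTensor([3, 4])
#   is_open_spline = torch.LongTensor([1, 0])
#   out = spline_conv(adj, input, weight, kernel_size, is_open_spline, K=12)
#   # out has shape [n x M_out] = [3 x 16]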
from __future__ import division
from unittest import TestCase
import torch
from torch.autograd import Variable
from numpy.testing import assert_almost_equal
from .spline_conv import spline_conv
class SplineConvTest(TestCase):
def test_forward_cpu(self):
edges = torch.LongTensor([[0, 0, 0, 0], [1, 2, 3, 4]])
values = [[0.25, 0.125], [0.25, 0.375], [0.75, 0.625], [0.75, 0.875]]
values = torch.FloatTensor(values)
adj = torch.sparse.FloatTensor(edges, values, torch.Size([5, 5, 2]))
kernel_size = torch.LongTensor([3, 4])
is_open_spline = torch.LongTensor([1, 0])
input = torch.FloatTensor([[9, 10], [1, 2], [3, 4], [5, 6], [7, 8]])
weight = torch.arange(0.5, 0.5 * 27, step=0.5).view(13, 2, 1)
input, weight = Variable(input), Variable(weight)
output = spline_conv(
adj, input, weight, kernel_size, is_open_spline, K=12, degree=1)
expected_output = [
[(12.5 * 9 + 13 * 10 + 266) / 4],
[12.5 * 1 + 13 * 2],
[12.5 * 3 + 13 * 4],
[12.5 * 5 + 13 * 6],
[12.5 * 7 + 13 * 8],
]
assert_almost_equal(output.cpu().data.numpy(), expected_output, 1)
def test_forward_gpu(self):
if not torch.cuda.is_available():
return
edges = torch.LongTensor([[0, 0, 0, 0], [1, 2, 3, 4]])
values = [[0.25, 0.125], [0.25, 0.375], [0.75, 0.625], [0.75, 0.875]]
values = torch.FloatTensor(values)
adj = torch.sparse.FloatTensor(edges, values, torch.Size([5, 5, 2]))
kernel_size = torch.cuda.LongTensor([3, 4])
is_open_spline = torch.cuda.LongTensor([1, 0])
input = torch.FloatTensor([[9, 10], [1, 2], [3, 4], [5, 6], [7, 8]])
weight = torch.arange(0.5, 0.5 * 27, step=0.5).view(13, 2, 1)
adj, input, weight = adj.cuda(), input.cuda(), weight.cuda()
input, weight = Variable(input), Variable(weight)
output = spline_conv(
adj, input, weight, kernel_size, is_open_spline, K=12, degree=1)
expected_output = [
[(12.5 * 9 + 13 * 10 + 266) / 4],
[12.5 * 1 + 13 * 2],
[12.5 * 3 + 13 * 4],
[12.5 * 5 + 13 * 6],
[12.5 * 7 + 13 * 8],
]
assert_almost_equal(output.cpu().data.numpy(), expected_output, 1)
def test_backward_cpu(self):
pass
def test_backward_gpu(self):
pass
from functools import reduce
from itertools import product
import torch
def _spline_cpu(input, kernel_size, is_open_spline, degree):
"""
Args:
input (Tensor): 1d or 2d tensor.
kernel_size (list)
is_open_spline (list)
spline_degree (int, optional): B-Spline degree. (default: 1)
"""
if degree != 1:
raise NotImplementedError()
input = input.unsqueeze(1) if len(input.size()) < 2 else input
input = input * (kernel_size - is_open_spline).type_as(input)
amount = input.frac()
amount = torch.stack([amount, 1 - amount], dim=len(input.size()))
bot = input.floor().long()
top = (bot + 1) % kernel_size
bot %= kernel_size
index = torch.stack([top, bot], dim=len(input.size()))
return amount, index
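# For degree 1, `_spline_cpu` returns per-dimension pairs [frac, 1 - frac] in
# `amount` and [top, bot] in `index`; `_create_mask` below builds the gather
# pattern that `spline_cpu` uses to combine these per-dimension pairs into all
# m**dim products and summed indices.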
def _create_mask(dim, m, type=torch.LongTensor):
mask = list(product(*[range(m) for _ in range(dim)]))
mask = torch.LongTensor(mask).type(type)
mask += torch.arange(0, dim * m, m).type_as(mask)
return mask
def spline_cpu(input, kernel_size, is_open_spline, degree):
amount, index = _spline_cpu(input, kernel_size, is_open_spline, degree)
dim = amount.size(1)
m = amount.size(2)
mask = _create_mask(dim, m, index.type())
amount = amount.view(-1, m * dim)
amount = amount[:, mask.view(-1)]
amount = amount.view(-1, m**dim, dim)
amount = amount.prod(2)
off = [reduce(lambda x, y: x * y, kernel_size[i:]) for i in range(1, dim)]
off.append(1)
off = torch.LongTensor([off]).type_as(index).t()
index = off * index
index = index.view(-1, m * dim)
index = index[:, mask.view(-1)]
index = index.view(-1, m**dim, dim)
index = index.sum(2)
return amount, index
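# A small worked example (hypothetical values): for a single 2-d point with
# per-dimension fractions f1, f2, the four combined amounts are
#   f1 * f2, f1 * (1 - f2), (1 - f1) * f2, (1 - f1) * (1 - f2)
# and the four combined indices are
#   top1 * k2 + top2, top1 * k2 + bot2, bot1 * k2 + top2, bot1 * k2 + bot2
# with k2 = kernel_size[1], matching a row-major kernel layout.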
from unittest import TestCase
import torch
from numpy.testing import assert_equal, assert_almost_equal
from .spline_cpu import spline_cpu
class SplineCPUTest(TestCase):
def test_open_spline(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
kernel_size = torch.LongTensor([5])
is_open_spline = torch.LongTensor([1])
amount, index = spline_cpu(input, kernel_size, is_open_spline, 1)
a = [[0, 1], [0.2, 0.8], [0, 1], [0, 1], [0, 1], [0.8, 0.2], [0, 1]]
i = [[1, 0], [1, 0], [2, 1], [3, 2], [4, 3], [4, 3], [0, 4]]
assert_almost_equal(amount.numpy(), a, 1)
assert_equal(index.numpy(), i)
def test_closed_spline(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
kernel_size = torch.LongTensor([4])
is_open_spline = torch.LongTensor([0])
amount, index = spline_cpu(input, kernel_size, is_open_spline, 1)
a = [[0, 1], [0.2, 0.8], [0, 1], [0, 1], [0, 1], [0.8, 0.2], [0, 1]]
i = [[1, 0], [1, 0], [2, 1], [3, 2], [0, 3], [0, 3], [1, 0]]
assert_almost_equal(amount.numpy(), a, 1)
assert_equal(index.numpy(), i)
def test_spline_2d(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
input = torch.stack([input, input], dim=1)
kernel_size = torch.LongTensor([5, 4])
is_open_spline = torch.LongTensor([1, 0])
amount, index = spline_cpu(input, kernel_size, is_open_spline, 1)
expected_amount = [
[0, 0, 0, 1],
[0.04, 0.16, 0.16, 0.64],
[0, 0, 0, 1],
[0, 0, 0, 1],
[0, 0, 0, 1],
[0.64, 0.16, 0.16, 0.04],
[0, 0, 0, 1],
]
expected_index = [
[5, 4, 1, 0],
[5, 4, 1, 0],
[10, 9, 6, 5],
[15, 14, 11, 10],
[16, 19, 12, 15],
[16, 19, 12, 15],
[1, 0, 17, 16],
]
assert_almost_equal(amount.numpy(), expected_amount, 2)
assert_equal(index.numpy(), expected_index)
def test_spline_3d(self):
input = torch.FloatTensor([0.05, 0.25, 0.5, 0.75, 0.95, 1])
input = torch.stack([input, input, input], dim=1)
kernel_size = torch.LongTensor([5, 4, 4])
is_open_spline = torch.LongTensor([1, 0, 0])
amount, index = spline_cpu(input, kernel_size, is_open_spline, 1)
self.assertEqual(list(amount.size()), [6, 8])
self.assertEqual(list(index.size()), [6, 8])
self.assertLess(index.max(), 5 * 4 * 4)
import torch
from ....utils.cuda import (cuda_num_threads, Stream, Dtype, load_kernel,
kernel_loop, get_blocks)
_spline_kernel = kernel_loop + '''
extern "C"
__global__ void spline_kernel(
const ${Dtype}* input, ${Dtype}* amount, long* index,
const long* kernel_size, const long* is_open_spline) {
CUDA_KERNEL_LOOP(idx, ${num_threads}) {
const int e_idx = idx / ${k_max};
int k_idx = idx % ${k_max};
int K = ${K};
int k_idx_mod;
int pos;
${Dtype} value;
${Dtype} frac;
${Dtype} a = 1.0;
long i = 0;
for (int d_idx = 0; d_idx < ${dim}; d_idx++) {
K /= kernel_size[d_idx];
k_idx_mod = k_idx % 4;
k_idx /= 4;
value = input[e_idx * ${dim} + d_idx] *
(kernel_size[d_idx] - (3 * is_open_spline[d_idx]));
frac = value - floor(value);
if (k_idx_mod == 0) a *= (1 - frac) * (1 - frac) * (1 - frac) / 6.0;
else if (k_idx_mod == 1)
a *= (3 * frac * frac * frac - 6 * frac * frac + 4) / 6.0;
else if (k_idx_mod == 2)
a *= (-3 * frac * frac * frac + 3 * frac * frac + 3 * frac + 1) / 6.0;
else a *= frac * frac * frac / 6.0;
pos = int(floor(value)) + k_idx_mod;
pos %= kernel_size[d_idx];
i += pos * K;
}
amount[idx] = a;
index[idx] = i;
}
}
'''
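# The four branches above evaluate the cubic B-spline basis segments at `frac`:
# (1 - t)^3 / 6, (3t^3 - 6t^2 + 4) / 6, (-3t^3 + 3t^2 + 3t + 1) / 6 and t^3 / 6,
# one per neighboring kernel position, multiplied across dimensions via base-4
# digits of `k_idx`.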
def spline_cubic_gpu(input, kernel_size, is_open_spline, K):
assert input.is_cuda and kernel_size.is_cuda and is_open_spline.is_cuda
input = input.unsqueeze(1) if len(input.size()) < 2 else input
num_edges, dim = input.size()
k_max = 4**dim
amount = input.new(num_edges, k_max)
index = input.new(num_edges, k_max).long()
num_threads = amount.numel()
with torch.cuda.device_of(input):
f = load_kernel(
'spline_kernel',
_spline_kernel,
Dtype=Dtype(input),
num_threads=num_threads,
k_max=k_max,
dim=dim,
K=K)
f(block=(cuda_num_threads, 1, 1),
grid=(get_blocks(num_threads), 1, 1),
args=[
input.data_ptr(),
amount.data_ptr(),
index.data_ptr(),
kernel_size.data_ptr(),
is_open_spline.data_ptr()
],
stream=Stream(ptr=torch.cuda.current_stream().cuda_stream))
return amount, index
import unittest
import torch
from numpy.testing import assert_equal, assert_almost_equal
if torch.cuda.is_available():
from .spline_cubic_gpu import spline_cubic_gpu
class SplineCubicGPUTest(unittest.TestCase):
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_open_spline(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
kernel_size = torch.LongTensor([7])
is_open_spline = torch.LongTensor([1])
a1, i1 = spline_cubic_gpu(input.cuda(),
kernel_size.cuda(), is_open_spline.cuda(), 7)
a2 = [
[0.1667, 0.6667, 0.1667, 0],
[0.0853, 0.6307, 0.2827, 0.0013],
[0.1667, 0.6667, 0.1667, 0],
[0.1667, 0.6667, 0.1667, 0],
[0.1667, 0.6667, 0.1667, 0],
[0.0013, 0.2827, 0.6307, 0.0853],
[0.1667, 0.6667, 0.1667, 0],
]
i2 = [[0, 1, 2, 3], [0, 1, 2, 3], [1, 2, 3, 4], [2, 3, 4, 5],
[3, 4, 5, 6], [3, 4, 5, 6], [4, 5, 6, 0]]
assert_almost_equal(a1.cpu().numpy(), a2, 4)
assert_equal(i1.cpu().numpy(), i2)
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_closed_spline(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
kernel_size = torch.LongTensor([4])
is_open_spline = torch.LongTensor([0])
a1, i1 = spline_cubic_gpu(input.cuda(),
kernel_size.cuda(), is_open_spline.cuda(), 4)
a2 = [
[0.1667, 0.6667, 0.1667, 0],
[0.0853, 0.6307, 0.2827, 0.0013],
[0.1667, 0.6667, 0.1667, 0],
[0.1667, 0.6667, 0.1667, 0],
[0.1667, 0.6667, 0.1667, 0],
[0.0013, 0.2827, 0.6307, 0.0853],
[0.1667, 0.6667, 0.1667, 0],
]
i2 = [[0, 1, 2, 3], [0, 1, 2, 3], [1, 2, 3, 0], [2, 3, 0, 1],
[3, 0, 1, 2], [3, 0, 1, 2], [0, 1, 2, 3]]
assert_almost_equal(a1.cpu().numpy(), a2, 4)
assert_equal(i1.cpu().numpy(), i2)
import torch
from ....utils.cuda import (cuda_num_threads, Stream, Dtype, load_kernel,
kernel_loop, get_blocks)
_spline_kernel = kernel_loop + '''
extern "C"
__global__ void spline_kernel(
const ${Dtype}* input, ${Dtype}* amount, long* index,
const long* kernel_size, const long* is_open_spline) {
CUDA_KERNEL_LOOP(idx, ${num_threads}) {
const int e_idx = idx / ${k_max};
int k_idx = idx % ${k_max};
int K = ${K};
int k_idx_mod;
int bot;
int top;
${Dtype} value;
${Dtype} frac;
${Dtype} a = 1.0;
long i = 0;
for (int d_idx = 0; d_idx < ${dim}; d_idx++) {
K /= kernel_size[d_idx];
k_idx_mod = k_idx % 2;
k_idx >>= 1;
value = input[e_idx * ${dim} + d_idx] *
(kernel_size[d_idx] - is_open_spline[d_idx]);
frac = value - floor(value);
a *= (1 - k_idx_mod) * frac + k_idx_mod * (1 - frac);
bot = int(floor(value));
top = (bot + 1) % kernel_size[d_idx];
bot %= kernel_size[d_idx];
i += (k_idx_mod * bot + (1 - k_idx_mod) * top) * K;
}
amount[idx] = a;
index[idx] = i;
}
}
'''
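# `k_idx` is interpreted as a bitmask: in every dimension its lowest bit selects
# between the upper knot (`top`, weight frac) and the lower knot (`bot`, weight
# 1 - frac), mirroring the CPU implementation in `spline_cpu`.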
def spline_linear_gpu(input, kernel_size, is_open_spline, K):
assert input.is_cuda and kernel_size.is_cuda and is_open_spline.is_cuda
input = input.unsqueeze(1) if len(input.size()) < 2 else input
num_edges, dim = input.size()
k_max = 2**dim
amount = input.new(num_edges, k_max)
index = input.new(num_edges, k_max).long()
num_threads = amount.numel()
with torch.cuda.device_of(input):
f = load_kernel(
'spline_kernel',
_spline_kernel,
Dtype=Dtype(input),
num_threads=num_threads,
k_max=k_max,
dim=dim,
K=K)
f(block=(cuda_num_threads, 1, 1),
grid=(get_blocks(num_threads), 1, 1),
args=[
input.data_ptr(),
amount.data_ptr(),
index.data_ptr(),
kernel_size.data_ptr(),
is_open_spline.data_ptr()
],
stream=Stream(ptr=torch.cuda.current_stream().cuda_stream))
return amount, index
import unittest
import torch
from numpy.testing import assert_equal
from .spline_cpu import spline_cpu
if torch.cuda.is_available():
from .spline_linear_gpu import spline_linear_gpu
class SplineLinearGPUTest(unittest.TestCase):
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_open_spline(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
kernel_size = torch.LongTensor([5])
is_open_spline = torch.LongTensor([1])
a1, i1 = spline_cpu(input, kernel_size, is_open_spline, 1)
a2, i2 = spline_linear_gpu(input.cuda(),
kernel_size.cuda(), is_open_spline.cuda(),
5)
assert_equal(a1.numpy(), a2.cpu().numpy())
assert_equal(i1.numpy(), i2.cpu().numpy())
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_closed_spline(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
kernel_size = torch.LongTensor([4])
is_open_spline = torch.LongTensor([0])
a1, i1 = spline_cpu(input, kernel_size, is_open_spline, 1)
a2, i2 = spline_linear_gpu(input.cuda(),
kernel_size.cuda(), is_open_spline.cuda(),
4)
assert_equal(a1.numpy(), a2.cpu().numpy())
assert_equal(i1.numpy(), i2.cpu().numpy())
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_spline_2d(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
input = torch.stack([input, input], dim=1)
kernel_size = torch.LongTensor([5, 4])
is_open_spline = torch.LongTensor([1, 0])
a1, i1 = spline_cpu(input, kernel_size, is_open_spline, 1)
a2, i2 = spline_linear_gpu(input.cuda(),
kernel_size.cuda(),
is_open_spline.cuda(), 20)
assert_equal(a1.numpy(), a2.cpu().numpy())
# assert_equal(i1.numpy(), i2.cpu().numpy())
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_spline_3d(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
input = torch.stack([input, input, input], dim=1)
kernel_size = torch.LongTensor([5, 4, 4])
is_open_spline = torch.LongTensor([1, 0, 0])
a1, i1 = spline_cpu(input, kernel_size, is_open_spline, 1)
a2, i2 = spline_linear_gpu(input.cuda(),
kernel_size.cuda(),
is_open_spline.cuda(), 80)
# assert_equal(a1.numpy(), a2.cpu().numpy())
# assert_equal(i1.numpy(), i2.cpu().numpy())
import torch
from ....utils.cuda import (cuda_num_threads, Stream, Dtype, load_kernel,
kernel_loop, get_blocks)
_spline_kernel = kernel_loop + '''
extern "C"
__global__ void spline_kernel(
const ${Dtype}* input, ${Dtype}* amount, long* index,
const long* kernel_size, const long* is_open_spline) {
CUDA_KERNEL_LOOP(idx, ${num_threads}) {
const int e_idx = idx / ${k_max};
int k_idx = idx % ${k_max};
int K = ${K};
int k_idx_mod;
int pos;
${Dtype} value;
${Dtype} frac;
${Dtype} a = 1.0;
long i = 0;
for (int d_idx = 0; d_idx < ${dim}; d_idx++) {
K /= kernel_size[d_idx];
k_idx_mod = k_idx % 3;
k_idx /= 3;
value = input[e_idx * ${dim} + d_idx] *
(kernel_size[d_idx] - (2 * is_open_spline[d_idx]));
frac = value - floor(value);
      if (k_idx_mod == 0) a *= 0.5 * (1 - frac) * (1 - frac);
else if (k_idx_mod == 1) a *= -frac * frac + frac + 0.5;
else a *= 0.5 * frac * frac;
pos = int(floor(value)) + k_idx_mod;
pos %= kernel_size[d_idx];
i += pos * K;
}
amount[idx] = a;
index[idx] = i;
}
}
'''
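# The three branches above evaluate the quadratic B-spline basis segments at
# `frac`: (1 - t)^2 / 2, -t^2 + t + 1/2 and t^2 / 2, multiplied across
# dimensions via base-3 digits of `k_idx`.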
def spline_quadratic_gpu(input, kernel_size, is_open_spline, K):
assert input.is_cuda and kernel_size.is_cuda and is_open_spline.is_cuda
input = input.unsqueeze(1) if len(input.size()) < 2 else input
num_edges, dim = input.size()
k_max = 3**dim
amount = input.new(num_edges, k_max)
index = input.new(num_edges, k_max).long()
num_threads = amount.numel()
with torch.cuda.device_of(input):
f = load_kernel(
'spline_kernel',
_spline_kernel,
Dtype=Dtype(input),
num_threads=num_threads,
k_max=k_max,
dim=dim,
K=K)
f(block=(cuda_num_threads, 1, 1),
grid=(get_blocks(num_threads), 1, 1),
args=[
input.data_ptr(),
amount.data_ptr(),
index.data_ptr(),
kernel_size.data_ptr(),
is_open_spline.data_ptr()
],
stream=Stream(ptr=torch.cuda.current_stream().cuda_stream))
return amount, index
import unittest
import torch
from numpy.testing import assert_equal, assert_almost_equal
if torch.cuda.is_available():
from .spline_quadratic_gpu import spline_quadratic_gpu
class SplineQuadraticGPUTest(unittest.TestCase):
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_open_spline(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
kernel_size = torch.LongTensor([6])
is_open_spline = torch.LongTensor([1])
a1, i1 = spline_quadratic_gpu(input.cuda(),
kernel_size.cuda(),
is_open_spline.cuda(), 6)
a2 = [[0.5, 0.5, 0], [0.32, 0.66, 0.02], [0.5, 0.5, 0], [0.5, 0.5, 0],
[0.5, 0.5, 0], [0.02, 0.66, 0.32], [0.5, 0.5, 0]]
i2 = [[0, 1, 2], [0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5], [3, 4, 5],
[4, 5, 0]]
assert_almost_equal(a1.cpu().numpy(), a2, 2)
assert_equal(i1.cpu().numpy(), i2)
@unittest.skipIf(not torch.cuda.is_available(), 'no GPU')
def test_closed_spline(self):
input = torch.FloatTensor([0, 0.05, 0.25, 0.5, 0.75, 0.95, 1])
kernel_size = torch.LongTensor([4])
is_open_spline = torch.LongTensor([0])
a1, i1 = spline_quadratic_gpu(input.cuda(),
kernel_size.cuda(),
is_open_spline.cuda(), 4)
a2 = [[0.5, 0.5, 0], [0.32, 0.66, 0.02], [0.5, 0.5, 0], [0.5, 0.5, 0],
[0.5, 0.5, 0], [0.02, 0.66, 0.32], [0.5, 0.5, 0]]
i2 = [[0, 1, 2], [0, 1, 2], [1, 2, 3], [2, 3, 0], [3, 0, 1], [3, 0, 1],
[0, 1, 2]]
assert_almost_equal(a1.cpu().numpy(), a2, 2)
assert_equal(i1.cpu().numpy(), i2)