test_blackscholes.py 3.93 KB
Newer Older
dugupeiwen's avatar
dugupeiwen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import numpy as np
import math
from numba import cuda, double, void
from numba.cuda.testing import unittest, CUDATestCase


RISKFREE = 0.02
VOLATILITY = 0.30

A1 = 0.31938153
A2 = -0.356563782
A3 = 1.781477937
A4 = -1.821255978
A5 = 1.330274429
RSQRT2PI = 0.39894228040143267793994605993438


def cnd(d):
    K = 1.0 / (1.0 + 0.2316419 * np.abs(d))
    ret_val = (RSQRT2PI * np.exp(-0.5 * d * d) *
               (K * (A1 + K * (A2 + K * (A3 + K * (A4 + K * A5))))))
    return np.where(d > 0, 1.0 - ret_val, ret_val)


def black_scholes(callResult, putResult, stockPrice, optionStrike, optionYears,
                  Riskfree, Volatility):
    S = stockPrice
    X = optionStrike
    T = optionYears
    R = Riskfree
    V = Volatility
    sqrtT = np.sqrt(T)
    d1 = (np.log(S / X) + (R + 0.5 * V * V) * T) / (V * sqrtT)
    d2 = d1 - V * sqrtT
    cndd1 = cnd(d1)
    cndd2 = cnd(d2)

    expRT = np.exp(- R * T)
    callResult[:] = (S * cndd1 - X * expRT * cndd2)
    putResult[:] = (X * expRT * (1.0 - cndd2) - S * (1.0 - cndd1))


def randfloat(rand_var, low, high):
    return (1.0 - rand_var) * low + rand_var * high


class TestBlackScholes(CUDATestCase):
    def test_blackscholes(self):
        OPT_N = 400
        iterations = 2

        stockPrice = randfloat(np.random.random(OPT_N), 5.0, 30.0)
        optionStrike = randfloat(np.random.random(OPT_N), 1.0, 100.0)
        optionYears = randfloat(np.random.random(OPT_N), 0.25, 10.0)

        callResultNumpy = np.zeros(OPT_N)
        putResultNumpy = -np.ones(OPT_N)

        callResultNumba = np.zeros(OPT_N)
        putResultNumba = -np.ones(OPT_N)

        # numpy
        for i in range(iterations):
            black_scholes(callResultNumpy, putResultNumpy, stockPrice,
                          optionStrike, optionYears, RISKFREE, VOLATILITY)

        @cuda.jit(double(double), device=True, inline=True)
        def cnd_cuda(d):
            K = 1.0 / (1.0 + 0.2316419 * math.fabs(d))
            ret_val = (RSQRT2PI * math.exp(-0.5 * d * d) *
                       (K * (A1 + K * (A2 + K * (A3 + K * (A4 + K * A5))))))
            if d > 0:
                ret_val = 1.0 - ret_val
            return ret_val

        @cuda.jit(void(double[:], double[:], double[:], double[:], double[:],
                       double, double))
        def black_scholes_cuda(callResult, putResult, S, X, T, R, V):
            i = cuda.threadIdx.x + cuda.blockIdx.x * cuda.blockDim.x
            if i >= S.shape[0]:
                return
            sqrtT = math.sqrt(T[i])
            d1 = ((math.log(S[i] / X[i]) + (R + 0.5 * V * V) * T[i])
                  / (V * sqrtT))
            d2 = d1 - V * sqrtT
            cndd1 = cnd_cuda(d1)
            cndd2 = cnd_cuda(d2)

            expRT = math.exp((-1. * R) * T[i])
            callResult[i] = (S[i] * cndd1 - X[i] * expRT * cndd2)
            putResult[i] = (X[i] * expRT * (1.0 - cndd2) - S[i] * (1.0 - cndd1))

        # numba
        blockdim = 512, 1
        griddim = int(math.ceil(float(OPT_N) / blockdim[0])), 1
        stream = cuda.stream()
        d_callResult = cuda.to_device(callResultNumba, stream)
        d_putResult = cuda.to_device(putResultNumba, stream)
        d_stockPrice = cuda.to_device(stockPrice, stream)
        d_optionStrike = cuda.to_device(optionStrike, stream)
        d_optionYears = cuda.to_device(optionYears, stream)

        for i in range(iterations):
            black_scholes_cuda[griddim, blockdim, stream](
                d_callResult, d_putResult, d_stockPrice, d_optionStrike,
                d_optionYears, RISKFREE, VOLATILITY)
        d_callResult.copy_to_host(callResultNumba, stream)
        d_putResult.copy_to_host(putResultNumba, stream)
        stream.synchronize()

        delta = np.abs(callResultNumpy - callResultNumba)
        L1norm = delta.sum() / np.abs(callResultNumpy).sum()

        max_abs_err = delta.max()
        self.assertTrue(L1norm < 1e-13)
        self.assertTrue(max_abs_err < 1e-13)


if __name__ == '__main__':
    unittest.main()