Commit 73dbdd9a authored by Ilya Mironov's avatar Ilya Mironov
Browse files

Renaming gaussian_moments -> rdp_accountant.py.

parent 9ee773e3
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""A standalone utility for computing the log moments.
The utility for computing the log moments. It consists of two methods.
compute_log_moment(q, sigma, T, lmbd) computes the log moment with sampling
probability q, noise sigma, order lmbd, and T steps. get_privacy_spent computes
delta (or eps) given log moments and eps (or delta).
Example use:
Suppose that we have run an algorithm with parameters, an array of
(q1, sigma1, T1) ... (qk, sigmak, Tk), and we wish to compute eps for a given
delta. The example code would be:
max_lmbd = 32
lmbds = xrange(1, max_lmbd + 1)
log_moments = []
for lmbd in lmbds:
log_moment = 0
for q, sigma, T in parameters:
log_moment += compute_log_moment(q, sigma, T, lmbd)
log_moments.append((lmbd, log_moment))
eps, delta = get_privacy_spent(log_moments, target_delta=delta)
To verify that the I1 >= I2 (see comments in GaussianMomentsAccountant in
accountant.py for the context), run the same loop above with verify=True
passed to compute_log_moment.
"""
from __future__ import print_function
import math
import sys
import numpy as np
import scipy.integrate as integrate
import scipy.stats
from six.moves import xrange
from sympy.mpmath import mp
def _to_np_float64(v):
if math.isnan(v) or math.isinf(v):
return np.inf
return np.float64(v)
######################
# FLOAT64 ARITHMETIC #
######################
def pdf_gauss(x, sigma, mean=0):
return scipy.stats.norm.pdf(x, loc=mean, scale=sigma)
def cropped_ratio(a, b):
if a < 1E-50 and b < 1E-50:
return 1.
else:
return a / b
def integral_inf(fn):
integral, _ = integrate.quad(fn, -np.inf, np.inf)
return integral
def integral_bounded(fn, lb, ub):
integral, _ = integrate.quad(fn, lb, ub)
return integral
def distributions(sigma, q):
mu0 = lambda y: pdf_gauss(y, sigma=sigma, mean=0.0)
mu1 = lambda y: pdf_gauss(y, sigma=sigma, mean=1.0)
mu = lambda y: (1 - q) * mu0(y) + q * mu1(y)
return mu0, mu1, mu
def compute_a(sigma, q, lmbd, verbose=False):
lmbd_int = int(math.ceil(lmbd))
if lmbd_int == 0:
return 1.0
a_lambda_first_term_exact = 0
a_lambda_second_term_exact = 0
for i in xrange(lmbd_int + 1):
coef_i = scipy.special.binom(lmbd_int, i) * (q ** i)
s1, s2 = 0, 0
for j in xrange(i + 1):
coef_j = scipy.special.binom(i, j) * (-1) ** (i - j)
s1 += coef_j * np.exp((j * j - j) / (2.0 * (sigma ** 2)))
s2 += coef_j * np.exp((j * j + j) / (2.0 * (sigma ** 2)))
a_lambda_first_term_exact += coef_i * s1
a_lambda_second_term_exact += coef_i * s2
a_lambda_exact = ((1.0 - q) * a_lambda_first_term_exact +
q * a_lambda_second_term_exact)
if verbose:
print("A: by binomial expansion {} = {} + {}".format(
a_lambda_exact,
(1.0 - q) * a_lambda_first_term_exact,
q * a_lambda_second_term_exact))
return _to_np_float64(a_lambda_exact)
def compute_b(sigma, q, lmbd, verbose=False):
mu0, _, mu = distributions(sigma, q)
b_lambda_fn = lambda z: mu0(z) * np.power(cropped_ratio(mu0(z), mu(z)), lmbd)
b_lambda = integral_inf(b_lambda_fn)
m = sigma ** 2 * (np.log((2. - q) / (1. - q)) + 1. / (2 * sigma ** 2))
b_fn = lambda z: (np.power(mu0(z) / mu(z), lmbd) -
np.power(mu(-z) / mu0(z), lmbd))
if verbose:
print("M =", m)
print("f(-M) = {} f(M) = {}".format(b_fn(-m), b_fn(m)))
assert b_fn(-m) < 0 and b_fn(m) < 0
b_lambda_int1_fn = lambda z: (mu0(z) *
np.power(cropped_ratio(mu0(z), mu(z)), lmbd))
b_lambda_int2_fn = lambda z: (mu0(z) *
np.power(cropped_ratio(mu(z), mu0(z)), lmbd))
b_int1 = integral_bounded(b_lambda_int1_fn, -m, m)
b_int2 = integral_bounded(b_lambda_int2_fn, -m, m)
a_lambda_m1 = compute_a(sigma, q, lmbd - 1)
b_bound = a_lambda_m1 + b_int1 - b_int2
if verbose:
print("B: by numerical integration", b_lambda)
print("B must be no more than ", b_bound)
print(b_lambda, b_bound)
return _to_np_float64(b_lambda)
###########################
# MULTIPRECISION ROUTINES #
###########################
def pdf_gauss_mp(x, sigma, mean):
return mp.mpf(1.) / mp.sqrt(mp.mpf("2.") * sigma ** 2 * mp.pi) * mp.exp(
- (x - mean) ** 2 / (mp.mpf("2.") * sigma ** 2))
def integral_inf_mp(fn):
integral, _ = mp.quad(fn, [-mp.inf, mp.inf], error=True)
return integral
def integral_bounded_mp(fn, lb, ub):
integral, _ = mp.quad(fn, [lb, ub], error=True)
return integral
def distributions_mp(sigma, q):
mu0 = lambda y: pdf_gauss_mp(y, sigma=sigma, mean=mp.mpf(0))
mu1 = lambda y: pdf_gauss_mp(y, sigma=sigma, mean=mp.mpf(1))
mu = lambda y: (1 - q) * mu0(y) + q * mu1(y)
return mu0, mu1, mu
def compute_a_mp(sigma, q, lmbd, verbose=False):
lmbd_int = int(math.ceil(lmbd))
if lmbd_int == 0:
return 1.0
mu0, mu1, mu = distributions_mp(sigma, q)
a_lambda_fn = lambda z: mu(z) * (mu(z) / mu0(z)) ** lmbd_int
a_lambda_first_term_fn = lambda z: mu0(z) * (mu(z) / mu0(z)) ** lmbd_int
a_lambda_second_term_fn = lambda z: mu1(z) * (mu(z) / mu0(z)) ** lmbd_int
a_lambda = integral_inf_mp(a_lambda_fn)
a_lambda_first_term = integral_inf_mp(a_lambda_first_term_fn)
a_lambda_second_term = integral_inf_mp(a_lambda_second_term_fn)
if verbose:
print("A: by numerical integration {} = {} + {}".format(
a_lambda,
(1 - q) * a_lambda_first_term,
q * a_lambda_second_term))
return _to_np_float64(a_lambda)
def compute_b_mp(sigma, q, lmbd, verbose=False):
lmbd_int = int(math.ceil(lmbd))
if lmbd_int == 0:
return 1.0
mu0, _, mu = distributions_mp(sigma, q)
b_lambda_fn = lambda z: mu0(z) * (mu0(z) / mu(z)) ** lmbd_int
b_lambda = integral_inf_mp(b_lambda_fn)
m = sigma ** 2 * (mp.log((2 - q) / (1 - q)) + 1 / (2 * (sigma ** 2)))
b_fn = lambda z: ((mu0(z) / mu(z)) ** lmbd_int -
(mu(-z) / mu0(z)) ** lmbd_int)
if verbose:
print("M =", m)
print("f(-M) = {} f(M) = {}".format(b_fn(-m), b_fn(m)))
assert b_fn(-m) < 0 and b_fn(m) < 0
b_lambda_int1_fn = lambda z: mu0(z) * (mu0(z) / mu(z)) ** lmbd_int
b_lambda_int2_fn = lambda z: mu0(z) * (mu(z) / mu0(z)) ** lmbd_int
b_int1 = integral_bounded_mp(b_lambda_int1_fn, -m, m)
b_int2 = integral_bounded_mp(b_lambda_int2_fn, -m, m)
a_lambda_m1 = compute_a_mp(sigma, q, lmbd - 1)
b_bound = a_lambda_m1 + b_int1 - b_int2
if verbose:
print("B by numerical integration", b_lambda)
print("B must be no more than ", b_bound)
assert b_lambda < b_bound + 1e-5
return _to_np_float64(b_lambda)
def _compute_delta(log_moments, eps):
"""Compute delta for given log_moments and eps.
Args:
log_moments: the log moments of privacy loss, in the form of pairs
of (moment_order, log_moment)
eps: the target epsilon.
Returns:
delta
"""
min_delta = 1.0
for moment_order, log_moment in log_moments:
if moment_order == 0:
continue
if math.isinf(log_moment) or math.isnan(log_moment):
sys.stderr.write("The %d-th order is inf or Nan\n" % moment_order)
continue
if log_moment < moment_order * eps:
min_delta = min(min_delta,
math.exp(log_moment - moment_order * eps))
return min_delta
def _compute_eps(log_moments, delta):
"""Compute epsilon for given log_moments and delta.
Args:
log_moments: the log moments of privacy loss, in the form of pairs
of (moment_order, log_moment)
delta: the target delta.
Returns:
epsilon
"""
min_eps = float("inf")
for moment_order, log_moment in log_moments:
if moment_order == 0:
continue
if math.isinf(log_moment) or math.isnan(log_moment):
sys.stderr.write("The %d-th order is inf or Nan\n" % moment_order)
continue
min_eps = min(min_eps, (log_moment - math.log(delta)) / moment_order)
return min_eps
def compute_log_moment(q, sigma, steps, lmbd, verify=False, verbose=False):
"""Compute the log moment of Gaussian mechanism for given parameters.
Args:
q: the sampling ratio.
sigma: the noise sigma.
steps: the number of steps.
lmbd: the moment order.
verify: if False, only compute the symbolic version. If True, computes
both symbolic and numerical solutions and verifies the results match.
verbose: if True, print out debug information.
Returns:
the log moment with type np.float64, could be np.inf.
"""
moment = compute_a(sigma, q, lmbd, verbose=verbose)
if verify:
mp.dps = 50
moment_a_mp = compute_a_mp(sigma, q, lmbd, verbose=verbose)
moment_b_mp = compute_b_mp(sigma, q, lmbd, verbose=verbose)
np.testing.assert_allclose(moment, moment_a_mp, rtol=1e-10)
if not np.isinf(moment_a_mp):
# The following test fails for (1, np.inf)!
np.testing.assert_array_less(moment_b_mp, moment_a_mp)
if np.isinf(moment):
return np.inf
else:
return np.log(moment) * steps
def get_privacy_spent(log_moments, target_eps=None, target_delta=None):
"""Compute delta (or eps) for given eps (or delta) from log moments.
Args:
log_moments: array of (moment_order, log_moment) pairs.
target_eps: if not None, the epsilon for which we would like to compute
corresponding delta value.
target_delta: if not None, the delta for which we would like to compute
corresponding epsilon value. Exactly one of target_eps and target_delta
is None.
Returns:
eps, delta pair
"""
assert (target_eps is None) ^ (target_delta is None)
assert not ((target_eps is None) and (target_delta is None))
if target_eps is not None:
return (target_eps, _compute_delta(log_moments, target_eps))
else:
return (_compute_eps(log_moments, target_delta), target_delta)
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""A standalone utility for tracking the RDP accountant.
The utility for computing Renyi differential privacy. Its public interface
consists of two methods:
compute_rdp(q, sigma, T, order) computes RDP with sampling probability q,
noise sigma, T steps at order.
get_privacy_spent computes delta (or eps) given RDP and eps (or delta).
Example use:
Suppose that we have run an algorithm with parameters, an array of
(q1, sigma1, T1) ... (qk, sigmak, Tk), and we wish to compute eps for a given
delta. The example code would be:
max_order = 32
orders = range(1, max_order + 1)
rdp_list = []
for order in orders:
rdp = 0
for q, sigma, T in parameters:
rdp += compute_rdp(q, sigma, T, order)
rdp_list.append((order, rdp))
eps, delta, opt_order = get_privacy_spent(rdp_list, target_delta=delta)
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import sys
import numpy as np
from scipy import special
######################
# FLOAT64 ARITHMETIC #
######################
def _log_add(logx, logy):
"""Add two numbers in the log space."""
a, b = min(logx, logy), max(logx, logy)
if a == -np.inf: # adding 0
return b
# Apply exp(a) + exp(b) = (exp(a - b) + 1.) * exp(b)
return np.log(np.exp(a - b) + 1.) + b
def _log_sub(logx, logy):
"""Subtract two numbers in the log space. Answer must be positive."""
if logy == -np.inf: # subtracting 0
return logx
assert logx > logy
with np.errstate(over="raise"):
try:
return np.log(np.exp(logx - logy) - 1.) + logy
except FloatingPointError:
return logx
def _log_print(logx):
"""Pretty print."""
if logx < np.log(sys.float_info.max):
return "{}".format(np.exp(logx))
else:
return "exp({})".format(logx)
def _compute_log_a_int(q, sigma, alpha, verbose=False):
"""Compute log(A_alpha) for integer alpha."""
assert isinstance(alpha, (int, long))
log_a1, log_a2 = -np.inf, -np.inf # log of the first and second terms of A_alpha
for i in range(alpha + 1):
# Do computation in the log space. Extra care needed for q = 0 or 1.
log_coef_i = np.log(special.binom(alpha, i))
if q > 0:
log_coef_i += i * np.log(q)
elif i > 0:
continue # the term is 0, skip the rest
if q < 1.0:
log_coef_i += (alpha - i) * np.log(1 - q)
elif i < alpha:
continue # the term is 0, skip the rest
s1 = log_coef_i + (i * i - i) / (2.0 * (sigma**2))
s2 = log_coef_i + (i * i + i) / (2.0 * (sigma**2))
log_a1 = _log_add(log_a1, s1)
log_a2 = _log_add(log_a2, s2)
log_a = _log_add(np.log(1.0 - q) + log_a1, np.log(q) + log_a2)
if verbose:
print("A: by binomial expansion {} = {} + {}".format(
_log_print(log_a),
_log_print(np.log(1.0 - q) + log_a1), _log_print(np.log(q) + log_a2)))
return np.float64(log_a)
def _compute_log_a_frac(q, sigma, alpha, verbose=False):
"""Compute log(A_alpha) for fractional alpha."""
# The four parts of A_alpha:
log_a11, log_a12 = -np.inf, -np.inf
log_a21, log_a22 = -np.inf, -np.inf
i = 0
z0, _ = _compute_zs(sigma, q)
while i == 0 or max(log_s11, log_s21, log_s21, log_s22) > -30:
coef = special.binom(alpha, i)
log_coef = np.log(abs(coef))
j = alpha - i
log_t1 = log_coef + i * np.log(q) + j * np.log(1 - q)
log_t2 = log_coef + j * np.log(q) + i * np.log(1 - q)
log_e11 = np.log(.5) + _log_erfc((i - z0) / (2**.5 * sigma))
log_e12 = np.log(.5) + _log_erfc((z0 - j) / (2**.5 * sigma))
log_e21 = np.log(.5) + _log_erfc((i - (z0 - 1)) / (2**.5 * sigma))
log_e22 = np.log(.5) + _log_erfc((z0 - 1 - j) / (2**.5 * sigma))
log_s11 = log_t1 + (i * i - i) / (2.0 * (sigma**2)) + log_e11
log_s12 = log_t2 + (j * j - j) / (2.0 * (sigma**2)) + log_e12
log_s21 = log_t1 + (i * i + i) / (2.0 * (sigma**2)) + log_e21
log_s22 = log_t2 + (j * j + j) / (2.0 * (sigma**2)) + log_e22
if coef > 0:
log_a11 = _log_add(log_a11, log_s11)
log_a12 = _log_add(log_a12, log_s12)
log_a21 = _log_add(log_a21, log_s21)
log_a22 = _log_add(log_a22, log_s22)
else:
log_a11 = _log_sub(log_a11, log_s11)
log_a12 = _log_sub(log_a12, log_s12)
log_a21 = _log_sub(log_a21, log_s21)
log_a22 = _log_sub(log_a22, log_s22)
i += 1
log_a = _log_add(
np.log(1. - q) + _log_add(log_a11, log_a12),
np.log(q) + _log_add(log_a21, log_a22))
return np.float64(log_a)
def compute_log_a(q, sigma, alpha, verbose=False):
if float(alpha).is_integer():
return _compute_log_a_int(q, sigma, int(alpha), verbose)
else:
return _compute_log_a_frac(q, sigma, alpha, verbose)
def _log_erfc(x):
# Can be replaced with a single call to log_ntdr if available:
# return np.log(2.) + special.log_ntdr(-x * 2**.5)
r = special.erfc(x)
if r == 0.0:
# Using the Laurent series at infinity for the tail of the erfc function:
# erfc(x) ~ exp(-x^2-.5/x^2+.625/x^4)/(x*pi^.5)
# To verify in Mathematica:
# Series[Log[Erfc[x]] + Log[x] + Log[Pi]/2 + x^2, {x, Infinity, 6}]
return (-np.log(np.pi) / 2 - np.log(x) - x**2 - .5 * x**-2 + .625 * x**-4 -
37. / 24. * x**-6 + 353. / 64. * x**-8)
else:
return np.log(r)
def _compute_zs(sigma, q):
z0 = sigma**2 * np.log(1. / q - 1) + .5
z1 = min(z0 - 2, z0 / 2)
return z0, z1
def _compute_log_b0(sigma, q, alpha, z1):
"""Return an approximation to B0 or None if failed to converge."""
z0, _ = _compute_zs(sigma, q)
s, log_term, log_b0, k, sign, max_log_term = 0, 1., 0, 0, 1, -np.inf
# Keep adding new terms until precision is no longer preserved.
# Don't stop on the negative.
while k < alpha or (log_term > max_log_term - 36 and log_term > -30
) or sign < 0.:
log_b1 = k * (k - 2 * z0) / (2 * sigma**2)
log_b2 = _log_erfc((k - z1) / (np.sqrt(2) * sigma))
log_term = log_b0 + log_b1 + log_b2
max_log_term = max(max_log_term, log_term)
s += sign * np.exp(log_term)
k += 1
# Maintain invariant: sign * exp(log_b0) = {-alpha choose k}
log_b0 += np.log(np.abs(-alpha - k + 1)) - np.log(k)
sign *= -1
if s == 0: # May happen if all terms are < 1e-324.
return -np.inf
if s < 0 or np.log(s) < max_log_term - 25: # the series failed to converge
return None
c = np.log(.5) - np.log(1 - q) * alpha
return c + np.log(s)
def _bound_log_b1(sigma, q, alpha, z1):
log_c = _log_add(np.log(1 - q), np.log(q) + (2 * z1 - 1.) / (2 * sigma**2))
return np.log(.5) - log_c * alpha + _log_erfc(z1 / (2**.5 * sigma))
def bound_log_b(q, sigma, alpha, verbose=False):
"""Compute a numerically stable bound on log(B_alpha)."""
if q == 1.: # If the sampling rate is 100%, A and B are symmetric.
return compute_log_a(q, sigma, alpha, verbose)
z0, z1 = _compute_zs(sigma, q)
log_b_bound = np.inf
# log_b1 cannot be less than its value at z0
log_lb_b1 = _bound_log_b1(sigma, q, alpha, z0)
while z0 - z1 > 1e-3:
m = (z0 + z1) / 2
log_b0 = _compute_log_b0(sigma, q, alpha, m)
if log_b0 is None:
z0 = m
continue
log_b1 = _bound_log_b1(sigma, q, alpha, m)
log_b_bound = min(log_b_bound, _log_add(log_b0, log_b1))
log_b_min_bound = _log_add(log_b0, log_lb_b1)
if log_b_bound < 0 or log_b_min_bound < 0 or log_b_bound > log_b_min_bound + .01:
# If the bound is likely to be too loose, move z1 closer to z0 and repeat.
z1 = m
else:
break
return np.float64(log_b_bound)
def _log_bound_b_elementary(q, alpha):
return -np.log(1 - q) * alpha
def _compute_delta(log_moments, eps):
"""Compute delta for given log_moments and eps.
Args:
log_moments: the log moments of privacy loss, in the form of pairs
of (moment_order, log_moment)
eps: the target epsilon.
Returns:
delta, order
"""
min_delta, opt_order = 1.0, float("NaN")
for moment_order, log_moment in log_moments:
if moment_order == 0:
continue
if math.isinf(log_moment) or math.isnan(log_moment):
sys.stderr.write("The %d-th order is inf or Nan\n" % moment_order)
continue
if log_moment < moment_order * eps:
delta = math.exp(log_moment - moment_order * eps)
if delta < min_delta:
min_delta, opt_order = delta, moment_order
return min_delta, opt_order
def _compute_eps(log_moments, delta):
"""Compute epsilon for given log_moments and delta.
Args:
log_moments: the log moments of privacy loss, in the form of pairs
of (moment_order, log_moment)
delta: the target delta.
Returns:
epsilon, order
"""
min_eps, opt_order = float("inf"), float("NaN")
for moment_order, log_moment in log_moments:
if moment_order == 0:
continue
if math.isinf(log_moment) or math.isnan(log_moment):
sys.stderr.write("The %d-th order is inf or Nan\n" % moment_order)
continue
eps = (log_moment - math.log(delta)) / moment_order
if eps < min_eps:
min_eps, opt_order = eps, moment_order
return min_eps, opt_order
def compute_log_moment(q, sigma, steps, alpha, verbose=False):
"""Compute the log moment of Gaussian mechanism for given parameters.
Args:
q: the sampling ratio.
sigma: the noise sigma.
steps: the number of steps.
alpha: the moment order.
verbose: if True, print out debug information.
Returns:
the log moment with type np.float64, could be np.inf.
"""
log_moment_a = compute_log_a(q, sigma, alpha, verbose=verbose)
log_bound_b = _log_bound_b_elementary(q, alpha) # does not require sigma
if log_bound_b < log_moment_a:
if verbose:
print("Elementary bound suffices : {} < {}".format(
_log_print(log_bound_b), _log_print(log_moment_a)))
else:
log_bound_b2 = bound_log_b(q, sigma, alpha, verbose=verbose)
if np.isnan(log_bound_b2):
if verbose:
print("B bound failed to converge")
else:
if verbose and (log_bound_b2 < log_bound_b):
print("Elementary bound is stronger: {} < {}".format(
_log_print(log_bound_b2), _log_print(log_bound_b)))
log_bound_b = min(log_bound_b, log_bound_b2)
return max(log_moment_a, log_bound_b) * steps
def get_privacy_spent(log_moments, target_eps=None, target_delta=None):
"""Compute delta (or eps) for given eps (or delta) from log moments.
Args:
log_moments: array of (moment_order, log_moment) pairs.
target_eps: if not None, the epsilon for which we would like to compute
corresponding delta value.
target_delta: if not None, the delta for which we would like to compute
corresponding epsilon value. Exactly one of target_eps and target_delta
is None.
Returns:
eps, delta, opt_order
"""
assert bool(target_eps is None) ^ bool(target_delta is None)
if target_eps is not None:
delta, opt_order = _compute_delta(log_moments, target_eps)
return target_eps, delta, opt_order
else:
eps, opt_order = _compute_eps(log_moments, target_delta)
return eps, target_delta, opt_order
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for rdp_accountant.py."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import mpmath as mp
from absl.testing import absltest
import rdp_accountant
class TestGaussianMoments(absltest.TestCase):
# MULTI-PRECISION ROUTINES
def _pdf_gauss_mp(self, x, sigma, mean):
return 1. / mp.sqrt(2. * sigma ** 2 * mp.pi) * mp.exp(-(x - mean)
** 2 / (
2. * sigma ** 2))
def _integral_inf_mp(self, fn):
integral, _ = mp.quad(
fn, [-mp.inf, mp.inf], error=True,
maxdegree=7) # maxdegree = 6 is not enough
return integral
def _integral_bounded_mp(self, fn, lb, ub):
integral, _ = mp.quad(fn, [lb, ub], error=True)
return integral
def _distributions_mp(self, sigma, q):
mu0 = lambda y: self._pdf_gauss_mp(y, sigma=sigma, mean=0)
mu1 = lambda y: self._pdf_gauss_mp(y, sigma=sigma, mean=1.)
mu = lambda y: (1 - q) * mu0(y) + q * mu1(y)
return mu0, mu1, mu
def compute_a_mp(self, sigma, q, order, verbose=False):
"""Compute A_lambda for arbitrary lambda by numerical integration."""
mp.dps = 100
mu0, mu1, mu = self._distributions_mp(sigma, q)
a_lambda_fn = lambda z: mu(z) * (mu(z) / mu0(z)) ** order
a_lambda = self._integral_inf_mp(a_lambda_fn)
if verbose:
a_lambda_first_term_fn = lambda z: mu0(z) * (mu(z) / mu0(z)) ** order
a_lambda_second_term_fn = lambda z: mu1(z) * (mu(z) / mu0(z)) ** order
a_lambda_first_term = self._integral_inf_mp(a_lambda_first_term_fn)
a_lambda_second_term = self._integral_inf_mp(a_lambda_second_term_fn)
print("A: by numerical integration {} = {} + {}".format(
a_lambda, (1 - q) * a_lambda_first_term, q * a_lambda_second_term))
return a_lambda
def compute_b_mp(self, sigma, q, order, verbose=False):
"""Compute B_lambda for arbitrary lambda by numerical integration."""
mu0, _, mu = self._distributions_mp(sigma, q)
b_lambda_fn = lambda z: mu0(z) * (mu0(z) / mu(z)) ** order
b_numeric = self._integral_inf_mp(b_lambda_fn)
if verbose:
z0, z1 = rdp_accountant._compute_zs(sigma, q)
print("z1 = ", z1)
print("x in the Taylor series = ", q / (1 - q) * np.exp(
(2 * z1 - 1) / (2 * sigma ** 2)))
b0_numeric = self._integral_bounded_mp(b_lambda_fn, -np.inf, z1)
b1_numeric = self._integral_bounded_mp(b_lambda_fn, z1, +np.inf)
print("B: numerically {} = {} + {}".format(b_numeric, b0_numeric,
b1_numeric))
return np.float64(b_numeric)
def _compute_log_moment_mp(self, q, sigma, order):
log_a_mp = np.float64(mp.log(self.compute_a_mp(sigma, q, order)))
log_b_mp = np.float64(mp.log(self.compute_b_mp(sigma, q, order)))
return log_a_mp, log_b_mp
# TEST ROUTINES
def _almost_equal(self, a, b, rtol):
# Analogue of np.testing.assert_allclose(a, b, rtol).
self.assertBetween(a, b * (1 - rtol), b * (1 + rtol))
def _compare_bounds(self, q, sigma, order):
log_a_mp, log_b_mp = self._compute_log_moment_mp(q, sigma, order)
log_a = rdp_accountant.compute_log_a(q, sigma, order)
log_bound_b = rdp_accountant.bound_log_b(q, sigma, order)
if log_a_mp < 1000 and log_a_mp > 1e-6:
self._almost_equal(log_a, log_a_mp, rtol=1e-6)
else: # be more tolerant for _very_ large or small logarithms
if log_a_mp > 1e-12:
self._almost_equal(log_a, log_a_mp, rtol=1e-2)
else:
print("Bounds on A are too small to compare: {}, {}".format(
log_a, log_a_mp))
if np.isfinite(log_bound_b) and log_bound_b > 1e-12:
# Ignore divergence between the bound and exact value of B if
# they don't matter anyway (bound on A is larger) or q > .5
if log_bound_b > log_a and q <= .5:
self._almost_equal(log_b_mp, log_bound_b, rtol=1e-2)
if np.isfinite(log_a_mp) and np.isfinite(log_b_mp):
# We hypothesize that this assertion is always true; no proof yet.
self.assertLessEqual(log_b_mp, log_a_mp + 1e-6)
def test_compute_log_moments(self):
log_moment = rdp_accountant.compute_log_moment(0.1, 2, 10, 4)
self.assertAlmostEqual(log_moment, 0.30948, places=5)
self._compare_bounds(q=.01, sigma=.1, order=.5)
self._compare_bounds(q=.1, sigma=1., order=5)
self._compare_bounds(q=.5, sigma=2., order=32.5)
# Compare the cheap computation with expensive, multi-precision
# computation for a few parameters. Takes about a minute.
# for q in (1e-6, .1, .999):
# for sigma in (.1, 10.):
# for order in (.5, 1., 1.5, 256.):
# self._compare_bounds(q, sigma, order)
def test_get_privacy_spent(self):
orders = range(1, 33)
log_moments = []
for order in orders:
log_moment = rdp_accountant.compute_log_moment(0.01, 4, 10000, order)
log_moments.append((order, log_moment))
eps, delta, opt_order = rdp_accountant.get_privacy_spent(log_moments,
target_delta=1e-5)
self.assertAlmostEqual(eps, 1.258575, places=5)
self.assertEqual(opt_order, 19)
eps, delta, _ = rdp_accountant.get_privacy_spent(log_moments,
target_eps=1.258575)
self.assertAlmostEqual(delta, 1e-5)
def test_compute_privacy_loss(self):
parameters = [(0.01, 4, 10000), (0.1, 2, 100)]
delta = 1e-5
orders = (
1, 1.25, 1.5, 1.75, 2., 2.5, 3., 4., 5., 6., 7., 8., 10., 12., 14.,
16., 20., 24., 28., 32., 64., 256.)
log_moments = []
for order in orders:
log_moment = 0
for q, sigma, steps in parameters:
log_moment += rdp_accountant.compute_log_moment(q, sigma, steps, order)
log_moments.append((order, log_moment))
eps, delta, opt_order = rdp_accountant.get_privacy_spent(
log_moments, target_delta=delta)
self.assertAlmostEqual(eps, 3.276237, places=5)
self.assertEqual(opt_order, 7)
if __name__ == "__main__":
absltest.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment