functional.py 41.2 KB
Newer Older
1
from __future__ import absolute_import, division, print_function, unicode_literals
Vincent QB's avatar
Vincent QB committed
2

3
import math
Vincent QB's avatar
Vincent QB committed
4

Jason Lian's avatar
Jason Lian committed
5
6
import torch

Jason Lian's avatar
pre  
Jason Lian committed
7
__all__ = [
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    "istft",
    "spectrogram",
    "amplitude_to_DB",
    "create_fb_matrix",
    "create_dct",
    "mu_law_encoding",
    "mu_law_decoding",
    "complex_norm",
    "angle",
    "magphase",
    "phase_vocoder",
    "lfilter",
    "lowpass_biquad",
    "highpass_biquad",
xinyang0's avatar
xinyang0 committed
22
    "equalizer_biquad",
23
    "biquad",
24
25
    'mask_along_axis',
    'mask_along_axis_iid'
Jason Lian's avatar
pre  
Jason Lian committed
26
27
]

Vincent QB's avatar
Vincent QB committed
28

Jason Lian's avatar
Jason Lian committed
29
# TODO: remove this once https://github.com/pytorch/pytorch/issues/21478 gets solved
Jason Lian's avatar
more  
Jason Lian committed
30
@torch.jit.ignore
31
32
33
34
35
36
37
38
39
40
41
def _stft(
    waveform,
    n_fft,
    hop_length,
    win_length,
    window,
    center,
    pad_mode,
    normalized,
    onesided,
):
42
    # type: (Tensor, int, Optional[int], Optional[int], Optional[Tensor], bool, str, bool, bool) -> Tensor
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
    return torch.stft(
        waveform,
        n_fft,
        hop_length,
        win_length,
        window,
        center,
        pad_mode,
        normalized,
        onesided,
    )


def istft(
    stft_matrix,  # type: Tensor
    n_fft,  # type: int
    hop_length=None,  # type: Optional[int]
    win_length=None,  # type: Optional[int]
    window=None,  # type: Optional[Tensor]
    center=True,  # type: bool
    pad_mode="reflect",  # type: str
    normalized=False,  # type: bool
    onesided=True,  # type: bool
    length=None,  # type: Optional[int]
):
jamarshon's avatar
jamarshon committed
68
    # type: (...) -> Tensor
69
    r"""Inverse short time Fourier Transform. This is expected to be the inverse of torch.stft.
jamarshon's avatar
jamarshon committed
70
    It has the same parameters (+ additional optional parameter of ``length``) and it should return the
jamarshon's avatar
jamarshon committed
71
72
    least squares estimation of the original signal. The algorithm will check using the NOLA condition (
    nonzero overlap).
jamarshon's avatar
jamarshon committed
73
74

    Important consideration in the parameters ``window`` and ``center`` so that the envelop
jamarshon's avatar
jamarshon committed
75
    created by the summation of all the windows is never zero at certain point in time. Specifically,
jamarshon's avatar
jamarshon committed
76
77
    :math:`\sum_{t=-\infty}^{\infty} w^2[n-t\times hop\_length] \cancel{=} 0`.

jamarshon's avatar
jamarshon committed
78
    Since stft discards elements at the end of the signal if they do not fit in a frame, the
79
    istft may return a shorter signal than the original signal (can occur if ``center`` is False
jamarshon's avatar
jamarshon committed
80
    since the signal isn't padded).
jamarshon's avatar
jamarshon committed
81
82

    If ``center`` is True, then there will be padding e.g. 'constant', 'reflect', etc. Left padding
jamarshon's avatar
jamarshon committed
83
84
    can be trimmed off exactly because they can be calculated but right padding cannot be calculated
    without additional information.
jamarshon's avatar
jamarshon committed
85

jamarshon's avatar
jamarshon committed
86
87
    Example: Suppose the last window is:
    [17, 18, 0, 0, 0] vs [18, 0, 0, 0, 0]
jamarshon's avatar
jamarshon committed
88

Vincent QB's avatar
Vincent QB committed
89
    The n_frame, hop_length, win_length are all the same which prevents the calculation of right padding.
jamarshon's avatar
jamarshon committed
90
91
92
93
    These additional values could be zeros or a reflection of the signal so providing ``length``
    could be useful. If ``length`` is ``None`` then padding will be aggressively removed
    (some loss of signal).

94
95
    [1] D. W. Griffin and J. S. Lim, "Signal estimation from modified short-time Fourier transform,"
    IEEE Trans. ASSP, vol.32, no.2, pp.236-243, Apr. 1984.
jamarshon's avatar
jamarshon committed
96
97

    Args:
98
        stft_matrix (torch.Tensor): Output of stft where each row of a channel is a frequency and each
99
            column is a window. it has a size of either (..., fft_size, n_frame, 2)
jamarshon's avatar
jamarshon committed
100
101
102
103
104
105
106
        n_fft (int): Size of Fourier transform
        hop_length (Optional[int]): The distance between neighboring sliding window frames.
            (Default: ``win_length // 4``)
        win_length (Optional[int]): The size of window frame and STFT filter. (Default: ``n_fft``)
        window (Optional[torch.Tensor]): The optional window function.
            (Default: ``torch.ones(win_length)``)
        center (bool): Whether ``input`` was padded on both sides so
107
108
109
110
111
112
            that the :math:`t`-th frame is centered at time :math:`t \times \text{hop\_length}`.
            (Default: ``True``)
        pad_mode (str): Controls the padding method used when ``center`` is True. (Default:
            ``'reflect'``)
        normalized (bool): Whether the STFT was normalized. (Default: ``False``)
        onesided (bool): Whether the STFT is onesided. (Default: ``True``)
jamarshon's avatar
jamarshon committed
113
        length (Optional[int]): The amount to trim the signal by (i.e. the
jamarshon's avatar
jamarshon committed
114
            original signal length). (Default: whole signal)
jamarshon's avatar
jamarshon committed
115
116

    Returns:
Vincent QB's avatar
Vincent QB committed
117
        torch.Tensor: Least squares estimation of the original signal of size (..., signal_length)
jamarshon's avatar
jamarshon committed
118
119
    """
    stft_matrix_dim = stft_matrix.dim()
Vincent QB's avatar
Vincent QB committed
120
121
    assert 3 <= stft_matrix_dim, "Incorrect stft dimension: %d" % (stft_matrix_dim)
    assert stft_matrix.nelement() > 0
jamarshon's avatar
jamarshon committed
122
123

    if stft_matrix_dim == 3:
124
        # add a channel dimension
jamarshon's avatar
jamarshon committed
125
126
        stft_matrix = stft_matrix.unsqueeze(0)

Vincent QB's avatar
Vincent QB committed
127
128
129
130
    # pack batch
    shape = stft_matrix.size()
    stft_matrix = stft_matrix.reshape(-1, *shape[-3:])

131
    dtype = stft_matrix.dtype
jamarshon's avatar
jamarshon committed
132
133
    device = stft_matrix.device
    fft_size = stft_matrix.size(1)
134
135
136
137
138
139
140
    assert (onesided and n_fft // 2 + 1 == fft_size) or (
        not onesided and n_fft == fft_size
    ), (
        "one_sided implies that n_fft // 2 + 1 == fft_size and not one_sided implies n_fft == fft_size. "
        + "Given values were onesided: %s, n_fft: %d, fft_size: %d"
        % ("True" if onesided else False, n_fft, fft_size)
    )
jamarshon's avatar
jamarshon committed
141
142
143
144
145
146
147
148
149
150
151
152
153

    # use stft defaults for Optionals
    if win_length is None:
        win_length = n_fft

    if hop_length is None:
        hop_length = int(win_length // 4)

    # There must be overlap
    assert 0 < hop_length <= win_length
    assert 0 < win_length <= n_fft

    if window is None:
154
        window = torch.ones(win_length, requires_grad=False, device=device, dtype=dtype)
jamarshon's avatar
jamarshon committed
155
156
157
158
159
160
161
162
163
164

    assert window.dim() == 1 and window.size(0) == win_length

    if win_length != n_fft:
        # center window with pad left and right zeros
        left = (n_fft - win_length) // 2
        window = torch.nn.functional.pad(window, (left, n_fft - win_length - left))
        assert window.size(0) == n_fft
    # win_length and n_fft are synonymous from here on

Vincent QB's avatar
Vincent QB committed
165
    stft_matrix = stft_matrix.transpose(1, 2)  # size (channel, n_frame, fft_size, 2)
166
167
    stft_matrix = torch.irfft(
        stft_matrix, 1, normalized, onesided, signal_sizes=(n_fft,)
Vincent QB's avatar
Vincent QB committed
168
    )  # size (channel, n_frame, n_fft)
jamarshon's avatar
jamarshon committed
169
170

    assert stft_matrix.size(2) == n_fft
Vincent QB's avatar
Vincent QB committed
171
    n_frame = stft_matrix.size(1)
jamarshon's avatar
jamarshon committed
172

Vincent QB's avatar
Vincent QB committed
173
    ytmp = stft_matrix * window.view(1, 1, n_fft)  # size (channel, n_frame, n_fft)
174
    # each column of a channel is a frame which needs to be overlap added at the right place
Vincent QB's avatar
Vincent QB committed
175
    ytmp = ytmp.transpose(1, 2)  # size (channel, n_fft, n_frame)
jamarshon's avatar
jamarshon committed
176

177
178
179
    eye = torch.eye(n_fft, requires_grad=False, device=device, dtype=dtype).unsqueeze(
        1
    )  # size (n_fft, 1, n_fft)
jamarshon's avatar
jamarshon committed
180
181
182
183

    # this does overlap add where the frames of ytmp are added such that the i'th frame of
    # ytmp is added starting at i*hop_length in the output
    y = torch.nn.functional.conv_transpose1d(
184
185
        ytmp, eye, stride=hop_length, padding=0
    )  # size (channel, 1, expected_signal_len)
jamarshon's avatar
jamarshon committed
186
187

    # do the same for the window function
188
    window_sq = (
Vincent QB's avatar
Vincent QB committed
189
190
        window.pow(2).view(n_fft, 1).repeat((1, n_frame)).unsqueeze(0)
    )  # size (1, n_fft, n_frame)
jamarshon's avatar
jamarshon committed
191
    window_envelop = torch.nn.functional.conv_transpose1d(
192
193
        window_sq, eye, stride=hop_length, padding=0
    )  # size (1, 1, expected_signal_len)
jamarshon's avatar
jamarshon committed
194

Vincent QB's avatar
Vincent QB committed
195
    expected_signal_len = n_fft + hop_length * (n_frame - 1)
jamarshon's avatar
jamarshon committed
196
197
198
199
200
201
202
203
204
205
206
207
208
    assert y.size(2) == expected_signal_len
    assert window_envelop.size(2) == expected_signal_len

    half_n_fft = n_fft // 2
    # we need to trim the front padding away if center
    start = half_n_fft if center else 0
    end = -half_n_fft if length is None else start + length

    y = y[:, :, start:end]
    window_envelop = window_envelop[:, :, start:end]

    # check NOLA non-zero overlap condition
    window_envelop_lowest = window_envelop.abs().min()
209
210
211
    assert window_envelop_lowest > 1e-11, "window overlap add min: %f" % (
        window_envelop_lowest
    )
jamarshon's avatar
jamarshon committed
212

213
    y = (y / window_envelop).squeeze(1)  # size (channel, expected_signal_len)
jamarshon's avatar
jamarshon committed
214

Vincent QB's avatar
Vincent QB committed
215
216
217
    # unpack batch
    y = y.reshape(shape[:-3] + y.shape[-1:])

218
    if stft_matrix_dim == 3:  # remove the channel dimension
jamarshon's avatar
jamarshon committed
219
        y = y.squeeze(0)
Vincent QB's avatar
Vincent QB committed
220

jamarshon's avatar
jamarshon committed
221
222
223
    return y


224
225
226
def spectrogram(
    waveform, pad, window, n_fft, hop_length, win_length, power, normalized
):
227
    # type: (Tensor, int, Tensor, int, int, int, Optional[int], bool) -> Tensor
Vincent QB's avatar
Vincent QB committed
228
    r"""Create a spectrogram or a batch of spectrograms from a raw audio signal.
229
    The spectrogram can be either magnitude-only or complex.
jamarshon's avatar
jamarshon committed
230
231

    Args:
232
        waveform (torch.Tensor): Tensor of audio of dimension (..., channel, time)
jamarshon's avatar
jamarshon committed
233
        pad (int): Two sided padding of signal
234
        window (torch.Tensor): Window tensor that is applied/multiplied to each frame/window
235
        n_fft (int): Size of FFT
236
237
238
        hop_length (int): Length of hop between STFT windows
        win_length (int): Window size
        power (int): Exponent for the magnitude spectrogram,
jamarshon's avatar
jamarshon committed
239
            (must be > 0) e.g., 1 for energy, 2 for power, etc.
240
            If None, then the complex spectrum is returned instead.
241
        normalized (bool): Whether to normalize by magnitude after stft
jamarshon's avatar
jamarshon committed
242
243

    Returns:
244
        torch.Tensor: Dimension (..., channel, freq, time), where channel
Vincent QB's avatar
Vincent QB committed
245
246
        is unchanged, freq is ``n_fft // 2 + 1`` and ``n_fft`` is the number of
        Fourier bins, and time is the number of window hops (n_frame).
Jason Lian's avatar
Jason Lian committed
247
    """
Jason Lian's avatar
Jason Lian committed
248
249

    if pad > 0:
250
        # TODO add "with torch.no_grad():" back when JIT supports it
251
        waveform = torch.nn.functional.pad(waveform, (pad, pad), "constant")
Jason Lian's avatar
Jason Lian committed
252

253
254
255
256
    # pack batch
    shape = waveform.size()
    waveform = waveform.reshape(-1, shape[-1])

Jason Lian's avatar
Jason Lian committed
257
    # default values are consistent with librosa.core.spectrum._spectrogram
258
259
260
    spec_f = _stft(
        waveform, n_fft, hop_length, win_length, window, True, "reflect", False, True
    )
261

262
263
264
    # unpack batch
    spec_f = spec_f.reshape(shape[:-1] + spec_f.shape[-3:])

265
    if normalized:
Jason Lian's avatar
Jason Lian committed
266
        spec_f /= window.pow(2).sum().sqrt()
267
268
269
    if power is not None:
        spec_f = spec_f.pow(power).sum(-1)  # get power of "complex" tensor

Jason Lian's avatar
Jason Lian committed
270
    return spec_f
Jason Lian's avatar
more  
Jason Lian committed
271
272


273
def amplitude_to_DB(x, multiplier, amin, db_multiplier, top_db=None):
274
    # type: (Tensor, float, float, float, Optional[float]) -> Tensor
Vincent QB's avatar
Vincent QB committed
275
    r"""Turn a tensor from the power/amplitude scale to the decibel scale.
276

277
    This output depends on the maximum value in the input tensor, and so
278
279
280
281
    may return different values for an audio clip split into snippets vs. a
    a full clip.

    Args:
282
        x (torch.Tensor): Input tensor before being converted to decibel scale
283
        multiplier (float): Use 10. for power and 20. for amplitude
284
        amin (float): Number to clamp ``x``
285
286
        db_multiplier (float): Log10(max(reference value and amin))
        top_db (Optional[float]): Minimum negative cut-off in decibels. A reasonable number
287
            is 80. (Default: ``None``)
288
289

    Returns:
290
        torch.Tensor: Output tensor in decibel scale
291
    """
292
293
    x_db = multiplier * torch.log10(torch.clamp(x, min=amin))
    x_db -= multiplier * db_multiplier
294
295

    if top_db is not None:
296
297
298
        new_x_db_max = torch.tensor(
            float(x_db.max()) - top_db, dtype=x_db.dtype, device=x_db.device
        )
299
        x_db = torch.max(x_db, new_x_db_max)
300

301
    return x_db
302
303


engineerchuan's avatar
engineerchuan committed
304
305
def create_fb_matrix(n_freqs, f_min, f_max, n_mels, sample_rate):
    # type: (int, float, float, int, int) -> Tensor
Vincent QB's avatar
Vincent QB committed
306
    r"""Create a frequency bin conversion matrix.
Jason Lian's avatar
more  
Jason Lian committed
307

jamarshon's avatar
jamarshon committed
308
    Args:
309
        n_freqs (int): Number of frequencies to highlight/apply
engineerchuan's avatar
engineerchuan committed
310
311
        f_min (float): Minimum frequency (Hz)
        f_max (float): Maximum frequency (Hz)
312
        n_mels (int): Number of mel filterbanks
engineerchuan's avatar
engineerchuan committed
313
        sample_rate (int): Sample rate of the audio waveform
Jason Lian's avatar
more  
Jason Lian committed
314

jamarshon's avatar
jamarshon committed
315
    Returns:
316
        torch.Tensor: Triangular filter banks (fb matrix) of size (``n_freqs``, ``n_mels``)
317
318
        meaning number of frequencies to highlight/apply to x the number of filterbanks.
        Each column is a filterbank so that assuming there is a matrix A of
319
320
        size (..., ``n_freqs``), the applied result would be
        ``A * create_fb_matrix(A.size(-1), ...)``.
321
    """
322
    # freq bins
engineerchuan's avatar
engineerchuan committed
323
324
325
326
327
    # Equivalent filterbank construction by Librosa
    all_freqs = torch.linspace(0, sample_rate // 2, n_freqs)
    i_freqs = all_freqs.ge(f_min) & all_freqs.le(f_max)
    freqs = all_freqs[i_freqs]

Jason Lian's avatar
more  
Jason Lian committed
328
    # calculate mel freq bins
329
    # hertz to mel(f) is 2595. * math.log10(1. + (f / 700.))
engineerchuan's avatar
engineerchuan committed
330
    m_min = 2595.0 * math.log10(1.0 + (f_min / 700.0))
331
    m_max = 2595.0 * math.log10(1.0 + (f_max / 700.0))
Jason Lian's avatar
more  
Jason Lian committed
332
    m_pts = torch.linspace(m_min, m_max, n_mels + 2)
333
    # mel to hertz(mel) is 700. * (10**(mel / 2595.) - 1.)
334
    f_pts = 700.0 * (10 ** (m_pts / 2595.0) - 1.0)
Jason Lian's avatar
more  
Jason Lian committed
335
336
    # calculate the difference between each mel point and each stft freq point in hertz
    f_diff = f_pts[1:] - f_pts[:-1]  # (n_mels + 1)
engineerchuan's avatar
engineerchuan committed
337
    slopes = f_pts.unsqueeze(0) - all_freqs.unsqueeze(1)  # (n_freqs, n_mels + 2)
Jason Lian's avatar
more  
Jason Lian committed
338
    # create overlapping triangles
339
    zero = torch.zeros(1)
340
    down_slopes = (-1.0 * slopes[:, :-2]) / f_diff[:-1]  # (n_freqs, n_mels)
341
342
    up_slopes = slopes[:, 2:] / f_diff[1:]  # (n_freqs, n_mels)
    fb = torch.max(zero, torch.min(down_slopes, up_slopes))
Jason Lian's avatar
more  
Jason Lian committed
343
344
345
    return fb


Jason Lian's avatar
more  
Jason Lian committed
346
def create_dct(n_mfcc, n_mels, norm):
347
    # type: (int, int, Optional[str]) -> Tensor
Vincent QB's avatar
Vincent QB committed
348
    r"""Create a DCT transformation matrix with shape (``n_mels``, ``n_mfcc``),
jamarshon's avatar
jamarshon committed
349
    normalized depending on norm.
Jason Lian's avatar
Jason Lian committed
350

jamarshon's avatar
jamarshon committed
351
    Args:
352
353
354
        n_mfcc (int): Number of mfc coefficients to retain
        n_mels (int): Number of mel filterbanks
        norm (Optional[str]): Norm to use (either 'ortho' or None)
Jason Lian's avatar
Jason Lian committed
355

jamarshon's avatar
jamarshon committed
356
    Returns:
357
        torch.Tensor: The transformation matrix, to be right-multiplied to
358
        row-wise data of size (``n_mels``, ``n_mfcc``).
Jason Lian's avatar
more  
Jason Lian committed
359
360
    """
    # http://en.wikipedia.org/wiki/Discrete_cosine_transform#DCT-II
361
362
363
    n = torch.arange(float(n_mels))
    k = torch.arange(float(n_mfcc)).unsqueeze(1)
    dct = torch.cos(math.pi / float(n_mels) * (n + 0.5) * k)  # size (n_mfcc, n_mels)
364
365
    if norm is None:
        dct *= 2.0
Jason Lian's avatar
more  
Jason Lian committed
366
    else:
367
        assert norm == "ortho"
368
        dct[0] *= 1.0 / math.sqrt(2.0)
369
        dct *= math.sqrt(2.0 / float(n_mels))
370
    return dct.t()
Jason Lian's avatar
more  
Jason Lian committed
371
372


373
def mu_law_encoding(x, quantization_channels):
374
    # type: (Tensor, int) -> Tensor
Vincent QB's avatar
Vincent QB committed
375
    r"""Encode signal based on mu-law companding.  For more info see the
Jason Lian's avatar
Jason Lian committed
376
377
378
    `Wikipedia Entry <https://en.wikipedia.org/wiki/%CE%9C-law_algorithm>`_

    This algorithm assumes the signal has been scaled to between -1 and 1 and
jamarshon's avatar
jamarshon committed
379
    returns a signal encoded with values from 0 to quantization_channels - 1.
Jason Lian's avatar
Jason Lian committed
380

jamarshon's avatar
jamarshon committed
381
382
    Args:
        x (torch.Tensor): Input tensor
383
        quantization_channels (int): Number of channels
Jason Lian's avatar
Jason Lian committed
384

jamarshon's avatar
jamarshon committed
385
    Returns:
386
        torch.Tensor: Input after mu-law encoding
Jason Lian's avatar
Jason Lian committed
387
    """
388
    mu = quantization_channels - 1.0
389
    if not x.is_floating_point():
390
391
        x = x.to(torch.float)
    mu = torch.tensor(mu, dtype=x.dtype)
392
    x_mu = torch.sign(x) * torch.log1p(mu * torch.abs(x)) / torch.log1p(mu)
Jason Lian's avatar
Jason Lian committed
393
    x_mu = ((x_mu + 1) / 2 * mu + 0.5).to(torch.int64)
Jason Lian's avatar
more  
Jason Lian committed
394
395
396
    return x_mu


397
def mu_law_decoding(x_mu, quantization_channels):
398
    # type: (Tensor, int) -> Tensor
Vincent QB's avatar
Vincent QB committed
399
    r"""Decode mu-law encoded signal.  For more info see the
Jason Lian's avatar
Jason Lian committed
400
401
402
403
404
    `Wikipedia Entry <https://en.wikipedia.org/wiki/%CE%9C-law_algorithm>`_

    This expects an input with values between 0 and quantization_channels - 1
    and returns a signal scaled between -1 and 1.

jamarshon's avatar
jamarshon committed
405
406
    Args:
        x_mu (torch.Tensor): Input tensor
407
        quantization_channels (int): Number of channels
Jason Lian's avatar
Jason Lian committed
408

jamarshon's avatar
jamarshon committed
409
    Returns:
410
        torch.Tensor: Input after mu-law decoding
Jason Lian's avatar
Jason Lian committed
411
    """
412
    mu = quantization_channels - 1.0
413
    if not x_mu.is_floating_point():
414
415
        x_mu = x_mu.to(torch.float)
    mu = torch.tensor(mu, dtype=x_mu.dtype)
416
417
    x = ((x_mu) / mu) * 2 - 1.0
    x = torch.sign(x) * (torch.exp(torch.abs(x) * torch.log1p(mu)) - 1.0) / mu
Jason Lian's avatar
more  
Jason Lian committed
418
    return x
419
420
421


def complex_norm(complex_tensor, power=1.0):
422
    # type: (Tensor, float) -> Tensor
423
    r"""Compute the norm of complex tensor input.
424
425

    Args:
426
        complex_tensor (torch.Tensor): Tensor shape of `(..., complex=2)`
427
        power (float): Power of the norm. (Default: `1.0`).
428
429

    Returns:
430
        torch.Tensor: Power of the normed input tensor. Shape of `(..., )`
431
432
433
434
435
436
437
    """
    if power == 1.0:
        return torch.norm(complex_tensor, 2, -1)
    return torch.norm(complex_tensor, 2, -1).pow(power)


def angle(complex_tensor):
438
    # type: (Tensor) -> Tensor
439
440
441
    r"""Compute the angle of complex tensor input.

    Args:
442
        complex_tensor (torch.Tensor): Tensor shape of `(..., complex=2)`
443
444

    Return:
445
        torch.Tensor: Angle of a complex tensor. Shape of `(..., )`
446
447
448
449
    """
    return torch.atan2(complex_tensor[..., 1], complex_tensor[..., 0])


450
def magphase(complex_tensor, power=1.0):
451
    # type: (Tensor, float) -> Tuple[Tensor, Tensor]
452
    r"""Separate a complex-valued spectrogram with shape `(..., 2)` into its magnitude and phase.
453
454

    Args:
455
        complex_tensor (torch.Tensor): Tensor shape of `(..., complex=2)`
456
457
458
        power (float): Power of the norm. (Default: `1.0`)

    Returns:
459
        Tuple[torch.Tensor, torch.Tensor]: The magnitude and phase of the complex tensor
460
461
462
463
464
465
466
    """
    mag = complex_norm(complex_tensor, power)
    phase = angle(complex_tensor)
    return mag, phase


def phase_vocoder(complex_specgrams, rate, phase_advance):
467
    # type: (Tensor, float, Tensor) -> Tensor
468
    r"""Given a STFT tensor, speed up in time without modifying pitch by a
469
    factor of ``rate``.
Vincent QB's avatar
Vincent QB committed
470

471
    Args:
472
        complex_specgrams (torch.Tensor): Dimension of `(..., freq, time, complex=2)`
473
        rate (float): Speed-up factor
474
475
        phase_advance (torch.Tensor): Expected phase advance in each bin. Dimension
            of (freq, 1)
Vincent QB's avatar
Vincent QB committed
476

477
    Returns:
478
        complex_specgrams_stretch (torch.Tensor): Dimension of `(...,
479
        freq, ceil(time/rate), complex=2)`
Vincent QB's avatar
Vincent QB committed
480

481
    Example
482
483
484
485
        >>> freq, hop_length = 1025, 512
        >>> # (channel, freq, time, complex=2)
        >>> complex_specgrams = torch.randn(2, freq, 300, 2)
        >>> rate = 1.3 # Speed up by 30%
486
        >>> phase_advance = torch.linspace(
487
        >>>    0, math.pi * hop_length, freq)[..., None]
488
489
        >>> x = phase_vocoder(complex_specgrams, rate, phase_advance)
        >>> x.shape # with 231 == ceil(300 / 1.3)
490
        torch.Size([2, 1025, 231, 2])
491
    """
492

493
494
495
496
    # pack batch
    shape = complex_specgrams.size()
    complex_specgrams = complex_specgrams.reshape([-1] + list(shape[-3:]))

497
498
499
500
501
    time_steps = torch.arange(0,
                              complex_specgrams.size(-2),
                              rate,
                              device=complex_specgrams.device,
                              dtype=complex_specgrams.dtype)
502

503
    alphas = time_steps % 1.0
Vincent QB's avatar
Vincent QB committed
504
    phase_0 = angle(complex_specgrams[..., :1, :])
505
506
507
508

    # Time Padding
    complex_specgrams = torch.nn.functional.pad(complex_specgrams, [0, 0, 0, 2])

509
    # (new_bins, freq, 2)
Vincent QB's avatar
Vincent QB committed
510
511
    complex_specgrams_0 = complex_specgrams.index_select(-2, time_steps.long())
    complex_specgrams_1 = complex_specgrams.index_select(-2, (time_steps + 1).long())
512
513
514
515

    angle_0 = angle(complex_specgrams_0)
    angle_1 = angle(complex_specgrams_1)

516
517
    norm_0 = torch.norm(complex_specgrams_0, p=2, dim=-1)
    norm_1 = torch.norm(complex_specgrams_1, p=2, dim=-1)
518
519
520
521
522
523

    phase = angle_1 - angle_0 - phase_advance
    phase = phase - 2 * math.pi * torch.round(phase / (2 * math.pi))

    # Compute Phase Accum
    phase = phase + phase_advance
Vincent QB's avatar
Vincent QB committed
524
    phase = torch.cat([phase_0, phase[..., :-1]], dim=-1)
525
526
527
528
529
530
531
532
533
    phase_acc = torch.cumsum(phase, -1)

    mag = alphas * norm_1 + (1 - alphas) * norm_0

    real_stretch = mag * torch.cos(phase_acc)
    imag_stretch = mag * torch.sin(phase_acc)

    complex_specgrams_stretch = torch.stack([real_stretch, imag_stretch], dim=-1)

534
535
536
    # unpack batch
    complex_specgrams_stretch = complex_specgrams_stretch.reshape(shape[:-3] + complex_specgrams_stretch.shape[1:])

537
    return complex_specgrams_stretch
538
539
540
541


def lfilter(waveform, a_coeffs, b_coeffs):
    # type: (Tensor, Tensor, Tensor) -> Tensor
Vincent QB's avatar
Vincent QB committed
542
    r"""Perform an IIR filter by evaluating difference equation.
543
544

    Args:
Vincent QB's avatar
Vincent QB committed
545
        waveform (torch.Tensor): audio waveform of dimension of `(..., time)`.  Must be normalized to -1 to 1.
546
547
548
549
550
551
552
553
        a_coeffs (torch.Tensor): denominator coefficients of difference equation of dimension of `(n_order + 1)`.
                                Lower delays coefficients are first, e.g. `[a0, a1, a2, ...]`.
                                Must be same size as b_coeffs (pad with 0's as necessary).
        b_coeffs (torch.Tensor): numerator coefficients of difference equation of dimension of `(n_order + 1)`.
                                 Lower delays coefficients are first, e.g. `[b0, b1, b2, ...]`.
                                 Must be same size as a_coeffs (pad with 0's as necessary).

    Returns:
Vincent QB's avatar
Vincent QB committed
554
        output_waveform (torch.Tensor): Dimension of `(..., time)`.  Output will be clipped to -1 to 1.
555
556
557

    """

Vincent QB's avatar
Vincent QB committed
558
559
560
561
562
563
    dim = waveform.dim()

    # pack batch
    shape = waveform.size()
    waveform = waveform.reshape(-1, shape[-1])

564
565
    assert(a_coeffs.size(0) == b_coeffs.size(0))
    assert(len(waveform.size()) == 2)
566
567
    assert(waveform.device == a_coeffs.device)
    assert(b_coeffs.device == a_coeffs.device)
568

569
570
    device = waveform.device
    dtype = waveform.dtype
Vincent QB's avatar
Vincent QB committed
571
    n_channel, n_sample = waveform.size()
572
573
574
575
    n_order = a_coeffs.size(0)
    assert(n_order > 0)

    # Pad the input and create output
Vincent QB's avatar
Vincent QB committed
576
    padded_waveform = torch.zeros(n_channel, n_sample + n_order - 1, dtype=dtype, device=device)
577
    padded_waveform[:, (n_order - 1):] = waveform
Vincent QB's avatar
Vincent QB committed
578
    padded_output_waveform = torch.zeros(n_channel, n_sample + n_order - 1, dtype=dtype, device=device)
579
580
581

    # Set up the coefficients matrix
    # Flip order, repeat, and transpose
Vincent QB's avatar
Vincent QB committed
582
583
    a_coeffs_filled = a_coeffs.flip(0).repeat(n_channel, 1).t()
    b_coeffs_filled = b_coeffs.flip(0).repeat(n_channel, 1).t()
584
585

    # Set up a few other utilities
Vincent QB's avatar
Vincent QB committed
586
587
    a0_repeated = torch.ones(n_channel, dtype=dtype, device=device) * a_coeffs[0]
    ones = torch.ones(n_channel, n_sample, dtype=dtype, device=device)
588

Vincent QB's avatar
Vincent QB committed
589
    for i_sample in range(n_sample):
590

Vincent QB's avatar
Vincent QB committed
591
        o0 = torch.zeros(n_channel, dtype=dtype, device=device)
592

Vincent QB's avatar
Vincent QB committed
593
594
        windowed_input_signal = padded_waveform[:, i_sample:(i_sample + n_order)]
        windowed_output_signal = padded_output_waveform[:, i_sample:(i_sample + n_order)]
595
596
597
598
599
600

        o0.add_(torch.diag(torch.mm(windowed_input_signal, b_coeffs_filled)))
        o0.sub_(torch.diag(torch.mm(windowed_output_signal, a_coeffs_filled)))

        o0.div_(a0_repeated)

Vincent QB's avatar
Vincent QB committed
601
        padded_output_waveform[:, i_sample + n_order - 1] = o0
602

Vincent QB's avatar
Vincent QB committed
603
604
605
606
607
608
609
610
    output = torch.min(
        ones, torch.max(ones * -1, padded_output_waveform[:, (n_order - 1):])
    )

    # unpack batch
    output = output.reshape(shape[:-1] + output.shape[-1:])

    return output
611
612
613
614


def biquad(waveform, b0, b1, b2, a0, a1, a2):
    # type: (Tensor, float, float, float, float, float, float) -> Tensor
Vincent QB's avatar
Vincent QB committed
615
    r"""Perform a biquad filter of input tensor.  Initial conditions set to 0.
616
617
618
    https://en.wikipedia.org/wiki/Digital_biquad_filter

    Args:
Vincent QB's avatar
Vincent QB committed
619
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
620
621
622
623
624
625
626
627
        b0 (float): numerator coefficient of current input, x[n]
        b1 (float): numerator coefficient of input one time step ago x[n-1]
        b2 (float): numerator coefficient of input two time steps ago x[n-2]
        a0 (float): denominator coefficient of current output y[n], typically 1
        a1 (float): denominator coefficient of current output y[n-1]
        a2 (float): denominator coefficient of current output y[n-2]

    Returns:
Vincent QB's avatar
Vincent QB committed
628
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
629
630
    """

631
632
    device = waveform.device
    dtype = waveform.dtype
633
634

    output_waveform = lfilter(
635
636
637
        waveform,
        torch.tensor([a0, a1, a2], dtype=dtype, device=device),
        torch.tensor([b0, b1, b2], dtype=dtype, device=device)
638
639
640
641
642
    )
    return output_waveform


def _dB2Linear(x):
643
    # type: (float) -> float
644
645
646
647
    return math.exp(x * math.log(10) / 20.0)


def highpass_biquad(waveform, sample_rate, cutoff_freq, Q=0.707):
648
    # type: (Tensor, int, float, float) -> Tensor
Vincent QB's avatar
Vincent QB committed
649
    r"""Design biquad highpass filter and perform filtering.  Similar to SoX implementation.
650
651

    Args:
Vincent QB's avatar
Vincent QB committed
652
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
653
654
655
656
657
        sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
        cutoff_freq (float): filter cutoff frequency
        Q (float): https://en.wikipedia.org/wiki/Q_factor

    Returns:
Vincent QB's avatar
Vincent QB committed
658
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
659
660
    """

661
    GAIN = 1.
662
663
    w0 = 2 * math.pi * cutoff_freq / sample_rate
    A = math.exp(GAIN / 40.0 * math.log(10))
664
    alpha = math.sin(w0) / 2. / Q
665
666
667
668
669
670
671
672
673
674
675
676
    mult = _dB2Linear(max(GAIN, 0))

    b0 = (1 + math.cos(w0)) / 2
    b1 = -1 - math.cos(w0)
    b2 = b0
    a0 = 1 + alpha
    a1 = -2 * math.cos(w0)
    a2 = 1 - alpha
    return biquad(waveform, b0, b1, b2, a0, a1, a2)


def lowpass_biquad(waveform, sample_rate, cutoff_freq, Q=0.707):
677
    # type: (Tensor, int, float, float) -> Tensor
Vincent QB's avatar
Vincent QB committed
678
    r"""Design biquad lowpass filter and perform filtering.  Similar to SoX implementation.
679
680

    Args:
Vincent QB's avatar
Vincent QB committed
681
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
682
683
684
685
686
        sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
        cutoff_freq (float): filter cutoff frequency
        Q (float): https://en.wikipedia.org/wiki/Q_factor

    Returns:
Vincent QB's avatar
Vincent QB committed
687
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
688
689
    """

690
    GAIN = 1.
691
692
693
694
695
696
697
698
699
700
701
702
    w0 = 2 * math.pi * cutoff_freq / sample_rate
    A = math.exp(GAIN / 40.0 * math.log(10))
    alpha = math.sin(w0) / 2 / Q
    mult = _dB2Linear(max(GAIN, 0))

    b0 = (1 - math.cos(w0)) / 2
    b1 = 1 - math.cos(w0)
    b2 = b0
    a0 = 1 + alpha
    a1 = -2 * math.cos(w0)
    a2 = 1 - alpha
    return biquad(waveform, b0, b1, b2, a0, a1, a2)
Vincent QB's avatar
Vincent QB committed
703
704


xinyang0's avatar
xinyang0 committed
705
706
def equalizer_biquad(waveform, sample_rate, center_freq, gain, Q=0.707):
    # type: (Tensor, int, float, float, float) -> Tensor
Vincent QB's avatar
Vincent QB committed
707
    r"""Design biquad peaking equalizer filter and perform filtering.  Similar to SoX implementation.
xinyang0's avatar
xinyang0 committed
708
709
710
711

    Args:
        waveform (torch.Tensor): audio waveform of dimension of `(channel, time)`
        sample_rate (int): sampling rate of the waveform, e.g. 44100 (Hz)
712
        center_freq (float): filter's central frequency
xinyang0's avatar
xinyang0 committed
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
        gain (float): desired gain at the boost (or attenuation) in dB
        q_factor (float): https://en.wikipedia.org/wiki/Q_factor

    Returns:
        output_waveform (torch.Tensor): Dimension of `(channel, time)`
    """
    w0 = 2 * math.pi * center_freq / sample_rate
    A = math.exp(gain / 40.0 * math.log(10))
    alpha = math.sin(w0) / 2 / Q

    b0 = 1 + alpha * A
    b1 = -2 * math.cos(w0)
    b2 = 1 - alpha * A
    a0 = 1 + alpha / A
    a1 = -2 * math.cos(w0)
    a2 = 1 - alpha / A
    return biquad(waveform, b0, b1, b2, a0, a1, a2)


732
733
734
735
736
737
738
739
def mask_along_axis_iid(specgrams, mask_param, mask_value, axis):
    # type: (Tensor, int, float, int) -> Tensor
    r"""
    Apply a mask along ``axis``. Mask will be applied from indices ``[v_0, v_0 + v)``, where
    ``v`` is sampled from ``uniform(0, mask_param)``, and ``v_0`` from ``uniform(0, max_v - v)``.
    All examples will have the same mask interval.

    Args:
Vincent QB's avatar
Vincent QB committed
740
        specgrams (Tensor): Real spectrograms (batch, channel, freq, time)
741
742
743
744
745
        mask_param (int): Number of columns to be masked will be uniformly sampled from [0, mask_param]
        mask_value (float): Value to assign to the masked columns
        axis (int): Axis to apply masking on (2 -> frequency, 3 -> time)

    Returns:
Vincent QB's avatar
Vincent QB committed
746
        torch.Tensor: Masked spectrograms of dimensions (batch, channel, freq, time)
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
    """

    if axis != 2 and axis != 3:
        raise ValueError('Only Frequency and Time masking are supported')

    value = torch.rand(specgrams.shape[:2]) * mask_param
    min_value = torch.rand(specgrams.shape[:2]) * (specgrams.size(axis) - value)

    # Create broadcastable mask
    mask_start = (min_value.long())[..., None, None].float()
    mask_end = (min_value.long() + value.long())[..., None, None].float()
    mask = torch.arange(0, specgrams.size(axis)).float()

    # Per batch example masking
    specgrams = specgrams.transpose(axis, -1)
    specgrams.masked_fill_((mask >= mask_start) & (mask < mask_end), mask_value)
    specgrams = specgrams.transpose(axis, -1)

    return specgrams


def mask_along_axis(specgram, mask_param, mask_value, axis):
    # type: (Tensor, int, float, int) -> Tensor
    r"""
    Apply a mask along ``axis``. Mask will be applied from indices ``[v_0, v_0 + v)``, where
    ``v`` is sampled from ``uniform(0, mask_param)``, and ``v_0`` from ``uniform(0, max_v - v)``.
    All examples will have the same mask interval.

    Args:
Vincent QB's avatar
Vincent QB committed
776
        specgram (Tensor): Real spectrogram (channel, freq, time)
777
778
779
780
781
        mask_param (int): Number of columns to be masked will be uniformly sampled from [0, mask_param]
        mask_value (float): Value to assign to the masked columns
        axis (int): Axis to apply masking on (1 -> frequency, 2 -> time)

    Returns:
Vincent QB's avatar
Vincent QB committed
782
        torch.Tensor: Masked spectrogram of dimensions (channel, freq, time)
783
784
    """

785
786
787
788
    # pack batch
    shape = specgram.size()
    specgram = specgram.reshape([-1] + list(shape[-2:]))

789
790
791
792
793
794
795
796
797
798
799
800
801
802
    value = torch.rand(1) * mask_param
    min_value = torch.rand(1) * (specgram.size(axis) - value)

    mask_start = (min_value.long()).squeeze()
    mask_end = (min_value.long() + value.long()).squeeze()

    assert mask_end - mask_start < mask_param
    if axis == 1:
        specgram[:, mask_start:mask_end] = mask_value
    elif axis == 2:
        specgram[:, :, mask_start:mask_end] = mask_value
    else:
        raise ValueError('Only Frequency and Time masking are supported')

803
804
805
806
    # unpack batch
    specgram = specgram.reshape(shape[:-2] + specgram.shape[-2:])

    return specgram.reshape(shape[:-2] + specgram.shape[-2:])
807
808


Vincent QB's avatar
Vincent QB committed
809
810
811
812
813
814
815
816
817
818
819
820
def compute_deltas(specgram, win_length=5, mode="replicate"):
    # type: (Tensor, int, str) -> Tensor
    r"""Compute delta coefficients of a tensor, usually a spectrogram:

    .. math::
        d_t = \frac{\sum_{n=1}^{\text{N}} n (c_{t+n} - c_{t-n})}{2 \sum_{n=1}^{\text{N} n^2}

    where :math:`d_t` is the deltas at time :math:`t`,
    :math:`c_t` is the spectrogram coeffcients at time :math:`t`,
    :math:`N` is (`win_length`-1)//2.

    Args:
Vincent QB's avatar
Vincent QB committed
821
        specgram (torch.Tensor): Tensor of audio of dimension (..., freq, time)
Vincent QB's avatar
Vincent QB committed
822
823
824
825
        win_length (int): The window length used for computing delta
        mode (str): Mode parameter passed to padding

    Returns:
Vincent QB's avatar
Vincent QB committed
826
        deltas (torch.Tensor): Tensor of audio of dimension (..., freq, time)
Vincent QB's avatar
Vincent QB committed
827
828
829
830
831
832
833

    Example
        >>> specgram = torch.randn(1, 40, 1000)
        >>> delta = compute_deltas(specgram)
        >>> delta2 = compute_deltas(delta)
    """

Vincent QB's avatar
Vincent QB committed
834
835
836
837
    # pack batch
    shape = specgram.size()
    specgram = specgram.reshape(1, -1, shape[-1])

Vincent QB's avatar
Vincent QB committed
838
839
840
841
842
843
844
845
846
847
848
849
    assert win_length >= 3

    n = (win_length - 1) // 2

    # twice sum of integer squared
    denom = n * (n + 1) * (2 * n + 1) / 3

    specgram = torch.nn.functional.pad(specgram, (n, n), mode=mode)

    kernel = (
        torch
        .arange(-n, n + 1, 1, device=specgram.device, dtype=specgram.dtype)
Vincent QB's avatar
Vincent QB committed
850
        .repeat(specgram.shape[1], 1, 1)
Vincent QB's avatar
Vincent QB committed
851
852
    )

Vincent QB's avatar
Vincent QB committed
853
854
855
856
857
858
    output = torch.nn.functional.conv1d(specgram, kernel, groups=specgram.shape[1]) / denom

    # unpack batch
    output = output.reshape(shape)

    return output
Vincent QB's avatar
Vincent QB committed
859
860


861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
def gain(waveform, gain_db=1.0):
    # type: (Tensor, float) -> Tensor
    r"""Apply amplification or attenuation to the whole waveform.

    Args:
       waveform (torch.Tensor): Tensor of audio of dimension (channel, time).
       gain_db (float) Gain adjustment in decibels (dB) (Default: `1.0`).

    Returns:
       torch.Tensor: the whole waveform amplified by gain_db.
    """
    if (gain_db == 0):
        return waveform

    ratio = 10 ** (gain_db / 20)

    return waveform * ratio


def _scale_to_interval(waveform, interval_max=1.0):
    # type: (Tensor, float) -> Tensor
    r"""Scale the waveform to the interval [-interval_max, interval_max] across all dimensions.

    Args:
       waveform (torch.Tensor): Tensor of audio of dimension (channel, time).
       interval_max (float): The bounds of the interval, where the float indicates
           the upper bound and the negative of the float indicates the lower
           bound (Default: `1.0`).
           Example: interval=1.0 -> [-1.0, 1.0]

    Returns:
       torch.Tensor: the whole waveform scaled to interval.
    """
    abs_max = torch.max(torch.abs(waveform))
    ratio = abs_max / interval_max
    waveform /= ratio

    return waveform


def _add_noise_shaping(dithered_waveform, waveform):
    r"""Noise shaping is calculated by error:
    error[n] = dithered[n] - original[n]
    noise_shaped_waveform[n] = dithered[n] + error[n-1]
    """
    wf_shape = waveform.size()
    waveform = waveform.reshape(-1, wf_shape[-1])

    dithered_shape = dithered_waveform.size()
    dithered_waveform = dithered_waveform.reshape(-1, dithered_shape[-1])

    error = dithered_waveform - waveform

    # add error[n-1] to dithered_waveform[n], so offset the error by 1 index
    for index in range(error.size()[0]):
        err = error[index]
        error_offset = torch.cat((torch.zeros(1), err))
        error[index] = error_offset[:waveform.size()[1]]

    noise_shaped = dithered_waveform + error
    return noise_shaped.reshape(dithered_shape[:-1] + noise_shaped.shape[-1:])


def _apply_probability_distribution(waveform, density_function="TPDF"):
    # type: (Tensor, str) -> Tensor
    r"""Apply a probability distribution function on a waveform.

    Triangular probability density function (TPDF) dither noise has a
    triangular distribution; values in the center of the range have a higher
    probability of occurring.

    Rectangular probability density function (RPDF) dither noise has a
    uniform distribution; any value in the specified range has the same
    probability of occurring.

    Gaussian probability density function (GPDF) has a normal distribution.
    The relationship of probabilities of results follows a bell-shaped,
    or Gaussian curve, typical of dither generated by analog sources.
    Args:
        waveform (torch.Tensor): Tensor of audio of dimension (channel, time)
        probability_density_function (string): The density function of a
           continuous random variable (Default: `TPDF`)
           Options: Triangular Probability Density Function - `TPDF`
                    Rectangular Probability Density Function - `RPDF`
                    Gaussian Probability Density Function - `GPDF`
    Returns:
        torch.Tensor: waveform dithered with TPDF
    """
    shape = waveform.size()
    waveform = waveform.reshape(-1, shape[-1])

    channel_size = waveform.size()[0] - 1
    time_size = waveform.size()[-1] - 1

    random_channel = int(torch.randint(channel_size, [1, ]).item()) if channel_size > 0 else 0
    random_time = int(torch.randint(time_size, [1, ]).item()) if time_size > 0 else 0

    number_of_bits = 16
    up_scaling = 2 ** (number_of_bits - 1) - 2
    signal_scaled = waveform * up_scaling
    down_scaling = 2 ** (number_of_bits - 1)

    signal_scaled_dis = waveform
    if (density_function == "RPDF"):
        RPDF = waveform[random_channel][random_time] - 0.5

        signal_scaled_dis = signal_scaled + RPDF
    elif (density_function == "GPDF"):
        # TODO Replace by distribution code once
        # https://github.com/pytorch/pytorch/issues/29843 is resolved
        # gaussian = torch.distributions.normal.Normal(torch.mean(waveform, -1), 1).sample()

        num_rand_variables = 6

        gaussian = waveform[random_channel][random_time]
        for ws in num_rand_variables * [time_size]:
            rand_chan = int(torch.randint(channel_size, [1, ]).item())
            gaussian += waveform[rand_chan][int(torch.randint(ws, [1, ]).item())]

        signal_scaled_dis = signal_scaled + gaussian
    else:
        TPDF = torch.bartlett_window(time_size + 1)
        TPDF = TPDF.repeat((channel_size + 1), 1)
        signal_scaled_dis = signal_scaled + TPDF

    quantised_signal_scaled = torch.round(signal_scaled_dis)
    quantised_signal = quantised_signal_scaled / down_scaling
    return quantised_signal.reshape(shape[:-1] + quantised_signal.shape[-1:])


def dither(waveform, density_function="TPDF", noise_shaping=False):
    # type: (Tensor, str, bool) -> Tensor
    r"""Dither increases the perceived dynamic range of audio stored at a
    particular bit-depth by eliminating nonlinear truncation distortion
    (i.e. adding minimally perceived noise to mask distortion caused by quantization).
    Args:
       waveform (torch.Tensor): Tensor of audio of dimension (channel, time)
       density_function (string): The density function of a
           continuous random variable (Default: `TPDF`)
           Options: Triangular Probability Density Function - `TPDF`
                    Rectangular Probability Density Function - `RPDF`
                    Gaussian Probability Density Function - `GPDF`
       noise_shaping (boolean): a filtering process that shapes the spectral
           energy of quantisation error (Default: `False`)

    Returns:
       torch.Tensor: waveform dithered
    """
    dithered = _apply_probability_distribution(waveform, density_function=density_function)

    if noise_shaping:
        return _add_noise_shaping(dithered, waveform)
    else:
        return dithered


Vincent QB's avatar
Vincent QB committed
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
def _compute_nccf(waveform, sample_rate, frame_time, freq_low):
    # type: (Tensor, int, float, int) -> Tensor
    r"""
    Compute Normalized Cross-Correlation Function (NCCF).

    .. math::
        \phi_i(m) = \frac{\sum_{n=b_i}^{b_i + N-1} w(n) w(m+n)}{\sqrt{E(b_i) E(m+b_i)}},

    where
    :math:`\phi_i(m)` is the NCCF at frame :math:`i` with lag :math:`m`,
    :math:`w` is the waveform,
    :math:`N` is the lenght of a frame,
    :math:`b_i` is the beginning of frame :math:`i`,
    :math:`E(j)` is the energy :math:`\sum_{n=j}^{j+N-1} w^2(n)`.
    """

    EPSILON = 10 ** (-9)

    # Number of lags to check
    lags = math.ceil(sample_rate / freq_low)

    frame_size = int(math.ceil(sample_rate * frame_time))

    waveform_length = waveform.size()[-1]
    num_of_frames = math.ceil(waveform_length / frame_size)

    p = lags + num_of_frames * frame_size - waveform_length
    waveform = torch.nn.functional.pad(waveform, (0, p))

    # Compute lags
    output_lag = []
    for lag in range(1, lags + 1):
        s1 = waveform[..., :-lag].unfold(-1, frame_size, frame_size)[
            ..., :num_of_frames, :
        ]
        s2 = waveform[..., lag:].unfold(-1, frame_size, frame_size)[
            ..., :num_of_frames, :
        ]

        output_frames = (
            (s1 * s2).sum(-1)
            / (EPSILON + torch.norm(s1, p=2, dim=-1)).pow(2)
            / (EPSILON + torch.norm(s2, p=2, dim=-1)).pow(2)
        )

        output_lag.append(output_frames.unsqueeze(-1))

    nccf = torch.cat(output_lag, -1)

    return nccf


def _combine_max(a, b, thresh=0.99):
    # type: (Tuple[Tensor, Tensor], Tuple[Tensor, Tensor], float) -> Tuple[Tensor, Tensor]
    """
    Take value from first if bigger than a multiplicative factor of the second, elementwise.
    """
    mask = (a[0] > thresh * b[0])
    values = mask * a[0] + ~mask * b[0]
    indices = mask * a[1] + ~mask * b[1]
    return values, indices


def _find_max_per_frame(nccf, sample_rate, freq_high):
    # type: (Tensor, int, int) -> Tensor
    r"""
    For each frame, take the highest value of NCCF,
    apply centered median smoothing, and convert to frequency.

    Note: If the max among all the lags is very close
    to the first half of lags, then the latter is taken.
    """

    lag_min = math.ceil(sample_rate / freq_high)

    # Find near enough max that is smallest

    best = torch.max(nccf[..., lag_min:], -1)

    half_size = nccf.shape[-1] // 2
    half = torch.max(nccf[..., lag_min:half_size], -1)

    best = _combine_max(half, best)
    indices = best[1]

    # Add back minimal lag
    indices += lag_min
    # Add 1 empirical calibration offset
    indices += 1

    return indices


def _median_smoothing(indices, win_length):
    # type: (Tensor, int) -> Tensor
    r"""
    Apply median smoothing to the 1D tensor over the given window.
    """

    # Centered windowed
    pad_length = (win_length - 1) // 2

    # "replicate" padding in any dimension
    indices = torch.nn.functional.pad(
        indices, (pad_length, 0), mode="constant", value=0.
    )

    indices[..., :pad_length] = torch.cat(pad_length * [indices[..., pad_length].unsqueeze(-1)], dim=-1)
    roll = indices.unfold(-1, win_length, 1)

    values, _ = torch.median(roll, -1)
    return values


def detect_pitch_frequency(
    waveform,
    sample_rate,
    frame_time=10 ** (-2),
    win_length=30,
    freq_low=85,
    freq_high=3400,
):
    # type: (Tensor, int, float, int, int, int) -> Tensor
    r"""Detect pitch frequency.

    It is implemented using normalized cross-correlation function and median smoothing.

    Args:
Vincent QB's avatar
Vincent QB committed
1145
        waveform (torch.Tensor): Tensor of audio of dimension (..., freq, time)
Vincent QB's avatar
Vincent QB committed
1146
1147
1148
1149
1150
1151
        sample_rate (int): The sample rate of the waveform (Hz)
        win_length (int): The window length for median smoothing (in number of frames)
        freq_low (int): Lowest frequency that can be detected (Hz)
        freq_high (int): Highest frequency that can be detected (Hz)

    Returns:
Vincent QB's avatar
Vincent QB committed
1152
        freq (torch.Tensor): Tensor of audio of dimension (..., frame)
Vincent QB's avatar
Vincent QB committed
1153
1154
    """

Vincent QB's avatar
Vincent QB committed
1155
1156
1157
    dim = waveform.dim()

    # pack batch
1158
    shape = list(waveform.size())
Vincent QB's avatar
Vincent QB committed
1159
1160
    waveform = waveform.reshape([-1] + shape[-1:])

Vincent QB's avatar
Vincent QB committed
1161
1162
1163
1164
1165
1166
1167
1168
    nccf = _compute_nccf(waveform, sample_rate, frame_time, freq_low)
    indices = _find_max_per_frame(nccf, sample_rate, freq_high)
    indices = _median_smoothing(indices, win_length)

    # Convert indices to frequency
    EPSILON = 10 ** (-9)
    freq = sample_rate / (EPSILON + indices.to(torch.float))

Vincent QB's avatar
Vincent QB committed
1169
    # unpack batch
1170
    freq = freq.reshape(shape[:-1] + list(freq.shape[-1:]))
Vincent QB's avatar
Vincent QB committed
1171

Vincent QB's avatar
Vincent QB committed
1172
    return freq